diff --git a/docs/src/msg.md b/docs/src/msg.md index 66fc5dd..11b8251 100644 --- a/docs/src/msg.md +++ b/docs/src/msg.md @@ -1,18 +1,106 @@ # Messages -Messages are dictionary-like containers that carry information between agents. -Keys can be accessed on messages using the property notation (`msg.key`), and -keys that are absent yield `nothing`. When interacting with Java/Groovy agents, -messages are mapped to Java/Groovy message classes with fields with the same -name as the keys in the message. +Messages are data containers that carry information between agents. When +interacting with Java/Groovy agents, messages are mapped to Java/Groovy +message classes with fields with the same name as the keys in the message. -Message types are defined using the `MessageClass` function. For example: +Message types are defined using the `@message` macro. For example: ```julia -const ShellExecReq = MessageClass(@__MODULE__, "org.arl.fjage.shell.ShellExecReq") +@message "org.arl.fjage.shell.ShellExecReq" struct ShellExecReq + cmd::Union{String,Nothing} = nothing + script::Union{String,Nothing} = nothing + args::Vector{String} = String[] + ans::Bool = false +end ``` defines a `ShellExecReq` message type that maps to a Java class with the package `org.arl.fjage.shell.ShellExecReq`. +All messages are mutable. The `@message` macro also automatically adds a few fields: + +- `performative::Symbol` +- `messageID::String` +- `inReplyTo::String` +- `sender::AgentID` +- `recipient::AgentID` +- `sentAt::Int64` + +Messages can subtype other messages: +```julia +julia> abstract type MyAbstractMessage <: Message end + +julia> @message "org.arl.fjage.demo.MyConcreteMessage" struct MyConcreteMessage <: MyAbstractMessage + a::Int + end + +julia> MyConcreteMessage(a=1) +MyConcreteMessage:INFORM[a:1] + +julia> MyConcreteMessage(a=1) isa MyAbstractMessage +true +``` + +It is also possible to have a concrete message type that can also be a supertype +of another message: +```julia +julia> abstract type SomeMessage <: Message end + +julia> @message "org.arl.fjage.demo.SomeMessage" struct SomeMessage <: SomeMessage + a::Int + end + +julia> @message "org.arl.fjage.demo.SomeExtMessage" struct SomeExtMessage <: SomeMessage + a::Int + b::Int + end + +julia> SomeMessage(a=1) isa SomeMessage +true + +julia> SomeExtMessage(a=1, b=2) isa SomeMessage +true +``` + +Performatives are guessed automatically based on message classname. By default, +the performative is `Performative.INFORM`. If a message classname ends with a +`Req`, the default performative changes to `Performative.REQUEST`. Performatives +may be overridden at declaration or at construction (and are mutable): +```julia +julia> @message "org.arl.fjage.demo.SomeReq" struct SomeReq end; +julia> @message "org.arl.fjage.demo.SomeRsp" Performative.AGREE struct SomeRsp end; + +julia> SomeReq().performative +:REQUEST + +julia> SomeRsp().performative +:AGREE + +julia> SomeRsp(performative=Performative.INFORM).performative +:INFORM +``` + +When strict typing is not required, one can use the dictionary-like +`GenericMessage` message type: +```julia +julia> msg = GenericMessage("org.arl.fjage.demo.DynamicMessage") +DynamicMessage:INFORM + +julia> msg.a = 1 +1 + +julia> msg.b = "xyz" +"xyz" + +julia> msg +DynamicMessage:INFORM[a:1 b:"xyz"] + +julia> classname(msg) +"org.arl.fjage.demo.DynamicMessage" + +julia> msg isa GenericMessage +true +``` + ## API ```@autodocs diff --git a/pom.xml b/pom.xml index d687aac..65c3610 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ com.github.org-arl fjage - 1.10.0-SNAPSHOT + 1.12.2 diff --git a/src/Fjage.jl b/src/Fjage.jl index 81b0fec..1884087 100644 --- a/src/Fjage.jl +++ b/src/Fjage.jl @@ -34,4 +34,8 @@ include("container.jl") include("fsm.jl") include("coroutine_behavior.jl") +function __init__() + registermessages() +end + end diff --git a/src/const.jl b/src/const.jl index 4f8101a..cae867c 100644 --- a/src/const.jl +++ b/src/const.jl @@ -6,4 +6,9 @@ module Services end "Shell command execution request message." -const ShellExecReq = MessageClass(@__MODULE__, "org.arl.fjage.shell.ShellExecReq") +@message "org.arl.fjage.shell.ShellExecReq" struct ShellExecReq + cmd::Union{String,Nothing} = nothing + script::Union{String,Nothing} = nothing + args::Vector{String} = String[] + ans::Bool = false +end diff --git a/src/container.jl b/src/container.jl index 0c15dac..de12b8b 100644 --- a/src/container.jl +++ b/src/container.jl @@ -673,13 +673,13 @@ function _deliver(c::SlaveContainer, msg::Message, relay::Bool) if msg.recipient.name ∈ keys(c.agents) _deliver(c.agents[msg.recipient.name], msg) elseif relay - _prepare!(msg) + clazz, data = _prepare(msg) json = JSON.json(Dict( - "action" => "send", - "relay" => true, - "message" => Dict( - "clazz" => msg.__clazz__, - "data" => msg.__data__ + :action => :send, + :relay => true, + :message => Dict( + :clazz => clazz, + :data => data ) )) try @@ -701,22 +701,32 @@ _deliver(c::SlaveContainer, msg::Message) = _deliver(c, msg, true) ### stacktrace pretty printing & auto-reconnection -function reporterror(src, ex) - fname = basename(@__FILE__) - bt = String[] - for s ∈ stacktrace(catch_backtrace()) - push!(bt, " $s") - basename(string(s.file)) == fname && s.func == :run && break - end - bts = join(bt, '\n') - if src === nothing - @error "$(ex)\n Stack trace:\n$(bts)" - else - @error "[$(src)] $(ex)\n Stack trace:\n$(bts)" +""" + logerror(f::Function) + logerror(f::Function, src) + +Run function `f()` and log any errors that occur. +""" +function logerror(f::Function, src=nothing) + try + f() + catch ex + logerror(ex, src) end end -reporterror(ex) = reporterror(nothing, ex) +""" + logerror(err::Exception) + logerror(err::Exception, src) + +Log error `err` with a simple stack trace. +""" +function logerror(ex::Exception, src=nothing) + io = IOBuffer() + src === nothing || print(io, "[$src] ") + Base.showerror(IOContext(io, :limit => true), ex, Base.catch_backtrace()) + @error String(take!(io)) +end reconnect(c::StandaloneContainer, ex) = false function reconnect(c::SlaveContainer, ex) @@ -1443,7 +1453,7 @@ function action(b::OneShotBehavior) b.action === nothing || _mutex_call(b.action, b.agent, b) b.onend === nothing || _mutex_call(b.onend, b.agent, b) catch ex - reconnect(container(b.agent), ex) || reporterror(b.agent, ex) + reconnect(container(b.agent), ex) || logerror(ex, b.agent) end b.done = true delete!(b.agent._behaviors, b) @@ -1496,7 +1506,7 @@ function action(b::CyclicBehavior) try b.action === nothing || _mutex_call(b.action, b.agent, b) catch ex - reconnect(container(b.agent), ex) || reporterror(b.agent, ex) + reconnect(container(b.agent), ex) || logerror(ex, b.agent) end yield() else @@ -1505,7 +1515,7 @@ function action(b::CyclicBehavior) end b.onend === nothing || _mutex_call(b.onend, b.agent, b) catch ex - reconnect(container(b.agent), ex) || reporterror(b.agent, ex) + reconnect(container(b.agent), ex) || logerror(ex, b.agent) end b.done = true delete!(b.agent._behaviors, b) @@ -1594,7 +1604,7 @@ function action(b::WakerBehavior) end b.onend === nothing || _mutex_call(b.onend, b.agent, b) catch ex - reconnect(container(b.agent), ex) || reporterror(b.agent, ex) + reconnect(container(b.agent), ex) || logerror(ex, b.agent) end b.done = true delete!(b.agent._behaviors, b) @@ -1666,12 +1676,12 @@ function action(b::TickerBehavior) try b.done || b.action === nothing || _mutex_call(b.action, b.agent, b) catch ex - reconnect(container(b.agent), ex) || reporterror(b.agent, ex) + reconnect(container(b.agent), ex) || logerror(ex, b.agent) end end b.onend === nothing || _mutex_call(b.onend, b.agent, b) catch ex - reconnect(container(b.agent), ex) || reporterror(b.agent, ex) + reconnect(container(b.agent), ex) || logerror(ex, b.agent) end b.done = true delete!(b.agent._behaviors, b) @@ -1733,12 +1743,12 @@ function action(b::PoissonBehavior) try b.done || b.action === nothing || _mutex_call(b.action, b.agent, b) catch ex - reconnect(container(b.agent), ex) || reporterror(b.agent, ex) + reconnect(container(b.agent), ex) || logerror(ex, b.agent) end end b.onend === nothing || _mutex_call(b.onend, b.agent, b) catch ex - reconnect(container(b.agent), ex) || reporterror(b.agent, ex) + reconnect(container(b.agent), ex) || logerror(ex, b.agent) end b.done = true delete!(b.agent._behaviors, b) @@ -1814,12 +1824,12 @@ function action(b::MessageBehavior) msg = take!(ch) msg === nothing || b.action === nothing || _mutex_call(b.action, b.agent, b, msg) catch ex - reconnect(container(b.agent), ex) || reporterror(b.agent, ex) + reconnect(container(b.agent), ex) || logerror(ex, b.agent) end end b.onend === nothing || _mutex_call(b.onend, b.agent, b) catch ex - reconnect(container(b.agent), ex) || reporterror(b.agent, ex) + reconnect(container(b.agent), ex) || logerror(ex, b.agent) finally _dont_listen(b.agent, ch) close(ch) @@ -2056,10 +2066,10 @@ function _paramreq_action(a::Agent, b::MessageBehavior, msg::ParameterReq) end end catch ex - reconnect(container(a), ex) || reporterror(a, ex) + reconnect(container(a), ex) || logerror(ex, a) end end - rmsg = ParameterRsp(perf=Performative.INFORM, inReplyTo=msg.messageID, recipient=msg.sender, readonly=ro, index=ndx) + rmsg = ParameterRsp(performative=Performative.INFORM, inReplyTo=msg.messageID, recipient=msg.sender, readonly=ro, index=ndx) length(rsp) > 0 && ((rmsg.param, rmsg.value) = popfirst!(rsp)) length(rsp) > 0 && (rmsg.values = Dict(rsp)) send(a, rmsg) diff --git a/src/coroutine_behavior.jl b/src/coroutine_behavior.jl index a467ee5..e995819 100644 --- a/src/coroutine_behavior.jl +++ b/src/coroutine_behavior.jl @@ -52,15 +52,13 @@ end function action(b::CoroutineBehavior) b.control_task = current_task() b.action_task = Task() do - try + logerror(b.agent) do b.action(b.agent, b) - catch e - reporterror(b.agent, e) end b.done = true yieldto(b.control_task) end - try + logerror(b.agent) do while !b.done if !isnothing(b.block) lock(() -> wait(b.block), b.block) @@ -69,8 +67,6 @@ function action(b::CoroutineBehavior) yieldto(b.action_task) end end - catch ex - reporterror(b.agent, ex) end b.done = true b.control_task = nothing diff --git a/src/fsm.jl b/src/fsm.jl index 72bf756..5b8b056 100644 --- a/src/fsm.jl +++ b/src/fsm.jl @@ -214,7 +214,7 @@ function action(b::FSMBehavior) _mutex_call(onenter, b.agent, b, b.state) end catch ex - reconnect(container(b.agent), ex) || reporterror(b.agent, ex) + reconnect(container(b.agent), ex) || logerror(ex, b.agent) end yield() else @@ -222,7 +222,7 @@ function action(b::FSMBehavior) end end catch ex - reconnect(container(b.agent), ex) || reporterror(b.agent, ex) + reconnect(container(b.agent), ex) || logerror(ex, b.agent) end delete!(b.agent._behaviors, b) b.agent = nothing diff --git a/src/gw.jl b/src/gw.jl index 4cac869..b4a01b2 100644 --- a/src/gw.jl +++ b/src/gw.jl @@ -11,11 +11,9 @@ struct Gateway sock::Ref{TCPSocket} subscriptions::Set{String} pending::Dict{String,Channel} - msgqueue::Vector tasks_waiting_for_msg::Vector{Tuple{Task,#=receive_id::=#Int}} msgqueue_lock::ReentrantLock # Protects both msgqueue and tasks_waiting_for_msg - host::String port::Int reconnect::Ref{Bool} @@ -87,7 +85,7 @@ _alive(gw::Gateway) = nothing function _deliver(gw::Gateway, msg::Message, relay::Bool) lock(gw.msgqueue_lock) do - for (idx, (task, _)) in pairs(gw.tasks_waiting_for_msg) + for (idx, (task, _)) ∈ pairs(gw.tasks_waiting_for_msg) # Check if message matches the filter. This has to happen on the receiver # task because this task may run in a different world age. schedule(task, (current_task(), msg)) @@ -195,7 +193,7 @@ end function agentsforservice(gw::Gateway, svc::String) rq = Dict("action" => "agentsForService", "service" => svc) rsp = _ask(gw, rq) - [AgentID(a, false, gw) for a in rsp["agentIDs"]] + [AgentID(a, false, gw) for a ∈ rsp["agentIDs"]] end "Subscribe to receive all messages sent to the given topic." @@ -225,14 +223,20 @@ function Base.close(gw::Gateway) end # prepares a message to be sent to the server -function _prepare!(msg::Message) - for k in keys(msg.__data__) - v = msg.__data__[k] +function _prepare(msg::Message) + data = Dict{Symbol,Any}() + for k ∈ keys(msg) + v = msg[k] if typeof(v) <: Array && typeof(v).parameters[1] <: Complex btype = typeof(v).parameters[1].parameters[1] - msg.__data__[k] = reinterpret(btype, v) + data[k] = reinterpret(btype, v) + elseif v !== nothing + k === :performative && (k = :perf) + k === :messageID && (k = :msgID) + data[k] = v end end + classname(msg), data end # converts Base64 encoded arrays to Julia arrays @@ -261,45 +265,44 @@ function _b64toarray(v) end # creates a message object from a JSON representation of the object -function _inflate(json) +function _inflate(json::AbstractDict) function inflate_recursively!(d) - for (k,v) in d + for (k, v) ∈ d if typeof(v) <: Dict && haskey(v, "clazz") && match(r"^\[.$", v["clazz"]) != nothing v = _b64toarray(v) end if typeof(v) <: Array && length(v) > 0 t = typeof(v[1]) v = Array{t}(v) - kcplx = k*"__isComplex" + kcplx = k * "__isComplex" if haskey(d, kcplx) && d[kcplx] v = Array{Complex{t}}(reinterpret(Complex{t}, v)) delete!(d, kcplx) end end - if typeof(v) <: Dict - v = inflate_recursively!(v) - end - d[k] = v + d[k] = typeof(v) <: AbstractDict ? inflate_recursively!(v) : v end - return d - end - - if typeof(json) == String - json = JSON.parse(json) + d end clazz = json["clazz"] data = inflate_recursively!(json["data"]) - stype = _messageclass_lookup(clazz) - obj = @eval $stype() - for (k,v) in data + obj = _messageclass_lookup(clazz)() + for (k, v) ∈ data if k == "sender" || k == "recipient" v = AgentID(v) + elseif k == "perf" + k = "performative" + v = Symbol(v) + elseif k == "msgID" + k = "messageID" end - obj.__data__[k] = v + trysetproperty!(obj, Symbol(k), v) end obj end +_inflate(json::String) = _inflate(JSON.parse(json)) + """ send(gw, msg) @@ -310,13 +313,13 @@ function send(gw::Gateway, msg) isopen(gw.sock[]) || return false msg.sender = gw.agentID msg.sentAt = Dates.value(now()) - _prepare!(msg) + clazz, data = _prepare(msg) json = JSON.json(Dict( - "action" => "send", - "relay" => true, - "message" => Dict( - "clazz" => msg.__clazz__, - "data" => msg.__data__ + :action => :send, + :relay => true, + :message => Dict( + :clazz => clazz, + :data => data ) )) _println(gw.sock[], json) @@ -340,10 +343,11 @@ it must take in a message and return `true` or `false`. A message for which it r receive(gw::Gateway, timeout::Int=0) = receive(gw, msg->true, timeout) const receive_counter = Threads.Atomic{Int}(0) + function receive(gw::Gateway, filt, timeout=0) receive_id = (receive_counter[] += 1) maybe_msg = lock(gw.msgqueue_lock) do - for (idx, msg) in pairs(gw.msgqueue) + for (idx, msg) ∈ pairs(gw.msgqueue) if _matches(filt, msg) return Some(popat!(gw.msgqueue, idx)) end @@ -355,7 +359,7 @@ function receive(gw::Gateway, filt, timeout=0) @async begin sleep(timeout/1e3) lock(gw.msgqueue_lock) do - for (idx, (_, id)) in pairs(gw.tasks_waiting_for_msg) + for (idx, (_, id)) ∈ pairs(gw.tasks_waiting_for_msg) # We must identify the receive to remove from the waiting list # based on the receive ID and not the task because the task which # started this one may have had its previous receive satisfied and @@ -372,14 +376,10 @@ function receive(gw::Gateway, filt, timeout=0) push!(gw.tasks_waiting_for_msg, (current_task(), receive_id)) return nothing end - if !isnothing(maybe_msg) - return something(maybe_msg) - end + isnothing(maybe_msg) || return something(maybe_msg) while true maybe_task_and_msg = wait() - if isnothing(maybe_task_and_msg) - return nothing - end + isnothing(maybe_task_and_msg) && return nothing delivery_task, msg = maybe_task_and_msg if _matches(filt, msg) schedule(delivery_task, true) diff --git a/src/kwdef.jl b/src/kwdef.jl new file mode 100644 index 0000000..7dde8c0 --- /dev/null +++ b/src/kwdef.jl @@ -0,0 +1,124 @@ +# Temporary patch for @kwdef. +# Submitted to Base Julia in https://github.com/JuliaLang/julia/pull/53230 + +isexpr(@nospecialize(ex), head::Symbol) = isa(ex, Expr) && ex.head === head + +macro kwdef(expr) + expr = macroexpand(__module__, expr) # to expand @static + isexpr(expr, :struct) || error("Invalid usage of @kwdef") + T = expr.args[2] + if T isa Expr && T.head === :<: + T = T.args[1] + end + + params_ex = Expr(:parameters) + call_args = Any[] + + _kwdef!(expr.args[3], params_ex.args, call_args) + # Only define a constructor if the type has fields, otherwise we'll get a stack + # overflow on construction + if !isempty(params_ex.args) + T_no_esc,_ = strip_esc(T) + if T_no_esc isa Symbol + sig = :(($(esc(T)))($params_ex)) + call = :(($(esc(T)))($(call_args...))) + body = Expr(:block, __source__, call) + kwdefs = Expr(:function, sig, body) + elseif isexpr(T_no_esc, :curly) + # if T == S{A<:AA,B<:BB}, define two methods + # S(...) = ... + # S{A,B}(...) where {A<:AA,B<:BB} = ... + S = T.args[1] + P = T.args[2:end] + Q = Any[isexpr(U, :<:) ? U.args[1] : U for U in P] + SQ = :($S{$(Q...)}) + body1 = Expr(:block, __source__, :(($(esc(S)))($(call_args...)))) + sig1 = :(($(esc(S)))($params_ex)) + def1 = Expr(:function, sig1, body1) + body2 = Expr(:block, __source__, :(($(esc(SQ)))($(call_args...)))) + sig2 = :(($(esc(SQ)))($params_ex) where {$(esc.(P)...)}) + def2 = Expr(:function, sig2, body2) + kwdefs = Expr(:block, def1, def2) + else + error("Invalid usage of @kwdef") + end + else + kwdefs = nothing + end + return quote + $(esc(:($Base.@__doc__ $expr))) + $kwdefs + end +end + +# @kwdef helper function +# mutates arguments inplace +function _kwdef!(blk, params_args, call_args, esc_count = 0) + for i in eachindex(blk.args) + ei = blk.args[i] + if ei isa Symbol + # var + push!(params_args, ei) + push!(call_args, ei) + elseif ei isa Expr + is_atomic = ei.head === :atomic + ei = is_atomic ? first(ei.args) : ei # strip "@atomic" and add it back later + is_const = ei.head === :const + ei = is_const ? first(ei.args) : ei # strip "const" and add it back later + # Note: `@atomic const ..` isn't valid, but reconstruct it anyway to serve a nice error + if ei isa Symbol + # const var + push!(params_args, ei) + push!(call_args, ei) + elseif ei.head === :(=) + lhs = ei.args[1] + lhs_no_esc, lhs_esc_count = strip_esc(lhs) + if lhs_no_esc isa Symbol + # var = defexpr + var = lhs_no_esc + elseif lhs_no_esc isa Expr && lhs_no_esc.head === :(::) && strip_esc(lhs_no_esc.args[1])[1] isa Symbol + # var::T = defexpr + var = strip_esc(lhs_no_esc.args[1])[1] + else + # something else, e.g. inline inner constructor + # F(...) = ... + continue + end + defexpr = ei.args[2] # defexpr + defexpr = wrap_esc(defexpr, esc_count + lhs_esc_count) + push!(params_args, Expr(:kw, var, esc(defexpr))) + push!(call_args, var) + lhs = is_const ? Expr(:const, lhs) : lhs + lhs = is_atomic ? Expr(:atomic, lhs) : lhs + blk.args[i] = lhs # overrides arg + elseif ei.head === :(::) && strip_esc(ei.args[1])[1] isa Symbol + # var::Typ + var,_ = strip_esc(ei.args[1]) + push!(params_args, var) + push!(call_args, var) + elseif ei.head === :block + # can arise with use of @static inside type decl + _kwdef!(ei, params_args, call_args) + elseif ei.head === :escape + _kwdef!(ei, params_args, call_args, esc_count + 1) + end + end + end + blk +end + +function strip_esc(expr) + count = 0 + while isexpr(expr, :escape) + expr = expr.args[1] + count += 1 + end + return (expr, count) +end + +function wrap_esc(expr, count) + for _ = 1:count + expr = esc(expr) + end + return expr +end \ No newline at end of file diff --git a/src/msg.jl b/src/msg.jl index 758711d..fa4b317 100644 --- a/src/msg.jl +++ b/src/msg.jl @@ -1,29 +1,89 @@ -export Performative, Message, GenericMessage, MessageClass, AbstractMessageClass, clone, ParameterReq, ParameterRsp, set! +export Performative, Message, GenericMessage, @message, classname, clone, ParameterReq, ParameterRsp, set! # global variables const _messageclasses = Dict{String,DataType}() "An action represented by a message." module Performative - const REQUEST = "REQUEST" - const AGREE = "AGREE" - const REFUSE = "REFUSE" - const FAILURE = "FAILURE" - const INFORM = "INFORM" - const CONFIRM = "CONFIRM" - const DISCONFIRM = "DISCONFIRM" - const QUERY_IF = "QUERY_IF" - const NOT_UNDERSTOOD = "NOT_UNDERSTOOD" - const CFP = "CFP" - const PROPOSE = "PROPOSE" - const CANCEL = "CANCEL" + const REQUEST = :REQUEST + const AGREE = :AGREE + const REFUSE = :REFUSE + const FAILURE = :FAILURE + const INFORM = :INFORM + const CONFIRM = :CONFIRM + const DISCONFIRM = :DISCONFIRM + const QUERY_IF = :QUERY_IF + const NOT_UNDERSTOOD = :NOT_UNDERSTOOD + const CFP = :CFP + const PROPOSE = :PROPOSE + const CANCEL = :CANCEL end "Base class for messages transmitted by one agent to another." abstract type Message <: AbstractDict{Symbol,Any} end """ - mtype = MessageClass(context, clazz[, superclass[, performative]]) + classname(msg::Message) + +Return the fully qualified class name of a message. +""" +function classname end + +include("kwdef.jl") + +function _message(classname, perf, sdef) + if @capture(sdef, struct T_ <: P_ fields__ end) + if T == P + extra2 = :( Fjage.classname(::Type{$(T)}) = $(classname) ) + T = Symbol("_Concrete" * string(T)) + extra1 = :( $(P)(; kwargs...) = $(T)(; kwargs...) ) + else + extra1 = :() + extra2 = :() + end + T = esc(T) + P = esc(P) + fields .= esc.(fields) + extra1 = esc(extra1) + extra2 = esc(extra2) + push!(fields, :(messageID::String = string(uuid4()))) + push!(fields, :(performative::Symbol = $perf)) + push!(fields, :(sender::Union{AgentID,Nothing} = nothing)) + push!(fields, :(recipient::Union{AgentID,Nothing} = nothing)) + push!(fields, :(inReplyTo::Union{String,Nothing} = nothing)) + push!(fields, :(sentAt::Int64 = 0)) + quote + @kwdef mutable struct $(T) <: $(P); $(fields...); end + Fjage.classname(::Type{$(T)}) = $(classname) + Fjage.classname(::$(T)) = $(classname) + Fjage._messageclasses[$(classname)] = $(T) + $extra1 + $extra2 + end + elseif @capture(sdef, struct T_ fields__ end) + T = esc(T) + fields .= esc.(fields) + push!(fields, :(messageID::String = string(uuid4()))) + push!(fields, :(performative::Symbol = $perf)) + push!(fields, :(sender::Union{AgentID,Nothing} = nothing)) + push!(fields, :(recipient::Union{AgentID,Nothing} = nothing)) + push!(fields, :(inReplyTo::Union{String,Nothing} = nothing)) + push!(fields, :(sentAt::Int64 = 0)) + quote + @kwdef mutable struct $(T) <: Message; $(fields...); end + Fjage.classname(::Type{$(T)}) = $(classname) + Fjage.classname(::$(T)) = $(classname) + Fjage._messageclasses[$(classname)] = $(T) + end + else + @error "Bad message definition" + end +end + +""" + @message classname [performative] struct mtype [<: supertype] + fields... + end Create a message class from a fully qualified class name. If a performative is not specified, it is guessed based on the class name. For class names ending with "Req", @@ -32,77 +92,25 @@ the performative is assumed to be REQUEST, and for all other messages, INFORM. # Examples ```julia-repl -julia> MyShellExecReq = MessageClass(@__MODULE__, "org.arl.fjage.shell.ShellExecReq"); +julia> @message "org.arl.fjage.shell.MyShellExecReq" struct MyShellExecReq + cmd::String + end julia> req = MyShellExecReq(cmd="ps") -ShellExecReq: REQUEST [cmd:"ps"] +MyShellExecReq:REQUEST[cmd:"ps"] ``` """ -function MessageClass(context, clazz::String, superclass=nothing, performative=nothing) - sname = replace(string(clazz), "." => "_") - tname = sname - if performative === nothing - performative = match(r"Req$",string(clazz))==nothing ? Performative.INFORM : Performative.REQUEST - end - if superclass === nothing - superclass = "$(@__MODULE__).Message" - else - scname = string(superclass) - ndx = findlast(isequal('.'), scname) - if ndx !== nothing - scname = scname[ndx+1:end] - end - if scname == tname - tname = "$(scname)_" - end - end - expr = Expr(:toplevel) - expr.args = [Meta.parse(""" - struct $(tname) <: $(superclass) - clazz::String - data::Dict{String,Any} - $(tname)(c::String, d::Dict{String,Any}) = new(c, d) - end - """), - Meta.parse(""" - function $(sname)(d::Dict{String, Any}) - get!(d, "msgID", string($(@__MODULE__).uuid4())) - get!(d, "perf", "$performative") - return $(tname)("$(clazz)", d) - end - """), - Meta.parse(""" - function $(sname)(; kwargs...) - dict = Dict{String,Any}( - "msgID" => string($(@__MODULE__).uuid4()), - "perf" => "$(performative)" - ) - for k in keys(kwargs) - dict[string(k)] = kwargs[k] - end - return $(tname)("$(clazz)", dict) - end - """), - Meta.parse(""" - $(@__MODULE__)._messageclasses["$(clazz)"] = $(tname) - """)] - if sname != tname - push!(expr.args, Meta.parse("$(tname)(; kwargs...) = $(sname)(; kwargs...)")) - end - return context.eval(expr) +macro message(classname, perf, sdef) + _message(classname, perf, sdef) end -function AbstractMessageClass(context, clazz::String, performative=nothing) - sname = replace(string(clazz), "." => "_") - expr = Expr(:toplevel) - expr.args = [Meta.parse("abstract type $sname <: $(@__MODULE__).Message end"), Meta.parse("$sname")] - rv = context.eval(expr) - MessageClass(context, clazz, rv, performative) - return rv +macro message(classname, sdef) + perf = endswith(classname, "Req") ? :(:REQUEST) : :(:INFORM) + _message(classname, perf, sdef) end function clone(original::Message) - cloned = _messageclass_lookup(original.__clazz__)(original.__clazz__, deepcopy(original.__data__)) - cloned.msgID = string(uuid4()) + cloned = deepcopy(original) + cloned.messageID = string(uuid4()) return cloned end @@ -111,23 +119,25 @@ end registermessages(messageclasses) Register message classes with Fjage. Usually message classes are automatically registered on -creation with `MessageClass()`. However, when developing packages, if `MessageClass()` is used -at the module level, the types may be precompiled and the code to register the classes may not -get executed at runtime. In such cases, you may need to explicitly call `registermessages()` -in the `__init()__` funciton for the module. +creation with `@message`. However, when developing packages, if `@message` is used at the module level, +the types may be precompiled and the code to register the classes may not get executed at runtime. +In such cases, you may need to explicitly call `registermessages()` in the `__init()__` function +for the module. """ -function registermessages(msg=subtypes(Fjage.Message)) - for t ∈ msg - endswith(string(t), '_') && continue - s = t().__clazz__ - Fjage._messageclasses[s] = t - registermessages(subtypes(t)) +function registermessages(msg=subtypes(Message)) + for T ∈ msg + T <: GenericMessage && continue + if applicable(classname, T) + s = classname(T) + _messageclasses[s] = T + registermessages(subtypes(T)) + end end end -function _messageclass_lookup(clazz::String) - haskey(_messageclasses, clazz) && return _messageclasses[clazz] - Message +function _messageclass_lookup(classname::String) + haskey(_messageclasses, classname) && return _messageclasses[classname] + GenericMessage{Symbol(classname)} end # helper function to see if a message matches a filter @@ -136,32 +146,26 @@ function _matches(filt, msg) if typeof(filt) == DataType return typeof(msg) <: filt elseif typeof(filt) <: Message - return msg.inReplyTo == filt.msgID + return msg.inReplyTo == filt.messageID elseif typeof(filt) <: Function return filt(msg) end false end -# adds notation message.field -function Base.getproperty(s::Message, p::Symbol) - if p == :__clazz__ - return getfield(s, :clazz) - elseif p == :__data__ - return getfield(s, :data) - else - p1 = string(p) - if p1 == "performative" - p1 = "perf" - elseif p1 == "messageID" - p1 = "msgID" - end - v = getfield(s, :data) - if !haskey(v, p1) - return nothing - end - v = v[p1] - return v +# like Base.setproperty!, but does not throw an error if the property does not exist +function trysetproperty!(s::Message, p::Symbol, v) + hasfield(typeof(s), p) || return s + ftype = fieldtype(typeof(s), p) + ftype === Symbol && (v = Symbol(v)) + if v === nothing + ftype === Float32 && (v = NaN32) + ftype === Float64 && (v = NaN64) + end + try + setfield!(s, p, convert(ftype, v)) + catch ex + @warn "Error setting field $p to $v: $ex" end end @@ -174,34 +178,21 @@ function Base.get(s::Message, p::Symbol, default) end Base.getindex(s::Message, p::Symbol) = getproperty(s, p) -Base.keys(s::Message) = Symbol.(keys(getfield(s, :data))) -Base.values(s::Message) = values(getfield(s, :data)) +Base.keys(s::Message) = fieldnames(typeof(s)) +Base.values(s::Message) = getfield.(Ref(s), fieldnames(typeof(s))) Base.eltype(s::Message) = Pair{Symbol,Any} -Base.length(s::Message) = length(getfield(s, :data)) +Base.length(s::Message) = fieldcount(typeof(s)) function Base.iterate(s::Message) - it = iterate(getfield(s, :data)) - it === nothing && return nothing - (Symbol(it[1][1]) => it[1][2], it[2]) + f = fieldnames(typeof(s)) + isempty(f) && return nothing + v = getfield.(Ref(s), f) + (f[1] => v[1], (f[2:end], v[2:end])) end function Base.iterate(s::Message, state) - it = iterate(getfield(s, :data), state) - it === nothing && return nothing - (Symbol(it[1][1]) => it[1][2], it[2]) -end - -# adds notation message.field -function Base.setproperty!(s::Message, p::Symbol, v) - (p == :__clazz__ || p == :__data__) && throw(ArgumentError("read-only property cannot be set")) - p1 = string(p) - if p1 == "performative" - p1 = "perf" - elseif p1 == "messageID" - p1 = "msgID" - end - getfield(s, :data)[p1] = v - nothing + isempty(state[1]) && return nothing + (state[1][1] => state[2][1], (state[1][2:end], state[2][2:end])) end # pretty prints arrays without type names @@ -209,38 +200,42 @@ function _repr(x) x = repr(x) m = match(r"[A-Za-z0-9]+(\[.+\])", x) m !== nothing && (x = m[1]) + m = match(r"^\w+(\[.*)$", x) + m !== nothing && (x = m[1]) x end # pretty printing of messages function Base.show(io::IO, msg::Message) - ndx = findlast(".", msg.__clazz__) - s = ndx === nothing ? msg.__clazz__ : msg.__clazz__[ndx[1]+1:end] + s = classname(msg) + ndx = findlast(".", s) + ndx === nothing || (s = s[ndx[1]+1:end]) p = "" data_suffix = "" signal_suffix = "" suffix = "" - data = msg.__data__ - for k in keys(data) - x = data[k] - if k == "perf" - s *= ": " * x - elseif k == "data" + for k in keys(msg) + x = msg[k] + if k == :performative + s *= ":" * string(x) + elseif k == :data if typeof(x) <: Array data_suffix *= "($(length(x)) bytes)" - else - p *= " $k:" * _repr(data[k]) + elseif msg[k] !== nothing + p *= " $k:" * _repr(msg[k]) end - elseif k == "signal" + elseif k == :signal if typeof(x) <: Array signal_suffix *= "($(length(x)) samples)" - else - p *= " $k:" * _repr(data[k]) + elseif msg[k] !== nothing + p *= " $k:" * _repr(msg[k]) end - elseif k != "sender" && k != "recipient" && k != "msgID" && k != "inReplyTo" && k != "sentAt" - if typeof(x) <: Number || typeof(x) == String || typeof(x) <: Array || typeof(x) == Bool + elseif k != :sender && k != :recipient && k != :messageID && k != :inReplyTo && k != :sentAt + if typeof(x) <: Number + isnan(x) || (p *= " $k:" * _repr(x)) + elseif typeof(x) == String || typeof(x) <: Array || typeof(x) == Bool p *= " $k:" * _repr(x) - else + elseif x !== nothing && x !== missing suffix = "..." end end @@ -249,22 +244,83 @@ function Base.show(io::IO, msg::Message) length(signal_suffix) > 0 && (p *= " " * signal_suffix) length(data_suffix) > 0 && (p *= " " * data_suffix) p = strip(p) - length(p) > 0 && (s *= " [$p]") - if msg.__clazz__ == "org.arl.fjage.GenericMessage" - m = match(r"^GenericMessage: (.*)$", s) - m === nothing || (s = m[1]) - end + length(p) > 0 && (s *= "[$p]") print(io, s) end +# concrete message without data +@message "org.arl.fjage.Message" struct _Message end + "Generic message type that can carry arbitrary name-value pairs as data." -GenericMessage = MessageClass(@__MODULE__, "org.arl.fjage.GenericMessage") +Base.@kwdef mutable struct GenericMessage{T} <: Message + __data__::Dict{Symbol,Any} = Dict{Symbol,Any}() + messageID::String = string(Fjage.uuid4()) + performative::Symbol = Performative.INFORM + sender::Union{AgentID,Nothing} = nothing + recipient::Union{AgentID,Nothing} = nothing + inReplyTo::Union{String,Nothing} = nothing + sentAt::Int64 = 0 +end -"Parameter request message." -ParameterReq = MessageClass(@__MODULE__, "org.arl.fjage.param.ParameterReq") +Fjage.classname(::Type{GenericMessage{T}}) where T = string(T) +Fjage.classname(::GenericMessage{T}) where T = string(T) -"Parameter response message." -ParameterRsp = MessageClass(@__MODULE__, "org.arl.fjage.param.ParameterRsp") +GenericMessage(args...) = GenericMessage{Symbol("org.arl.fjage.GenericMessage")}(args...) +GenericMessage(clazz::String, perf::Symbol=Performative.INFORM; kwargs...) = GenericMessage{Symbol(clazz)}(; performative=perf, kwargs...) + +# adds notation message.field + +function Base.getproperty(s::GenericMessage, p::Symbol) + hasfield(typeof(s), p) && return getfield(s, p) + haskey(s.__data__, p) && return s.__data__[p] + clazz = classname(s) + ndx = findlast(".", clazz) + ndx === nothing || (clazz = clazz[ndx[1]+1:end]) + error("message $(clazz) has no field $(p)") +end + +function Base.setproperty!(s::GenericMessage, p::Symbol, v) + if hasfield(typeof(s), p) + setfield!(s, p, v) + else + s.__data__[p] = v + end +end + +trysetproperty!(s::GenericMessage, p::Symbol, v) = setproperty!(s, p, v) + +# dictionary interface for GenericMessages + +function Base.get(s::GenericMessage, p::Symbol, default) + hasfield(typeof(s), p) && return getfield(s, p) + haskey(s.__data__, p) && return s.__data__[p] + default +end + +function Base.keys(s::GenericMessage) + k = Set(keys(s.__data__)) + for f ∈ fieldnames(typeof(s)) + f == :__data__ && continue + push!(k, f) + end + k +end + +function Base.values(s::GenericMessage) + v = Any[] + for k ∈ keys(s) + push!(v, s[k]) + end + v +end + +Base.length(s::GenericMessage) = fieldcount(typeof(s)) - 1 + length(s.__data__) + +function Base.iterate(s::GenericMessage) + f = keys(s) + v = values(s) + (f[1] => v[1], (f[2:end], v[2:end])) +end """ msg = Message([perf]) @@ -274,15 +330,29 @@ Create a message with just a performative (`perf`) and no data. If the performat is not specified, it defaults to INFORM. If the inreplyto is specified, the message `inReplyTo` and `recipient` fields are set accordingly. """ -Message(perf::String=Performative.INFORM) = _Message(perf=perf) -Message(inreplyto::Message, perf::String=Performative.INFORM) = _Message(perf=perf, inReplyTo=inreplyto.msgID, recipient=inreplyto.sender) +Message(perf::Symbol=Performative.INFORM) = _Message(performative=perf) +Message(inreplyto::Message, perf::Symbol=Performative.INFORM) = _Message(performative=perf, inReplyTo=inreplyto.messageID, recipient=inreplyto.sender) + +"Parameter request message." +@message "org.arl.fjage.param.ParameterReq" struct ParameterReq + index::Int = -1 + param::Union{String,Nothing} = nothing + value::Union{Any,Nothing} = nothing + requests::Union{Vector{Dict{String,Any}},Nothing} = nothing +end -# message base class -_Message = MessageClass(@__MODULE__, "org.arl.fjage.Message") +"Parameter response message." +@message "org.arl.fjage.param.ParameterRsp" struct ParameterRsp + index::Int = -1 + param::Union{String,Nothing} = nothing + value::Union{Any,Nothing} = nothing + values::Union{Dict{String,Any},Nothing} = nothing + readonly::Vector{String} = String[] +end # convenience methods and pretty printing for parameters -function org_arl_fjage_param_ParameterReq(vals...; index=-1) +function ParameterReq(vals...; index=-1) req = ParameterReq(index=index) qlist = Pair{String,Any}[] for v ∈ vals @@ -325,6 +395,7 @@ ParameterReq[index=1 modulation=? ...] ``` """ function Base.get!(p::ParameterReq, param) + param = string(param) if p.param === nothing p.param = param else @@ -351,6 +422,7 @@ ParameterReq[index=1 modulation=ofdm ...] ``` """ function set!(p::ParameterReq, param, value) + param = string(param) if p.param === nothing p.param = param p.value = value @@ -381,16 +453,22 @@ end function Base.show(io::IO, p::ParameterReq) print(io, "ParameterReq[") - p.index !== nothing && p.index ≥ 0 && print(io, "index=", p.index, ' ') - p.param !== nothing && print(io, p.param, '=', (p.value === nothing ? "?" : string(p.value))) + if p.index !== nothing && p.index ≥ 0 + print(io, "index=", p.index) + p.param === nothing || print(io, ' ') + end + p.param === nothing || print(io, p.param, '=', (p.value === nothing ? "?" : string(p.value))) p.requests === nothing || print(io, " ...") print(io, ']') end function Base.show(io::IO, p::ParameterRsp) print(io, "ParameterRsp[") - p.index !== nothing && p.index ≥ 0 && print(io, "index=", p.index, ' ') - p.param !== nothing && print(io, p.param, '=', p.value) + if p.index !== nothing && p.index ≥ 0 + print(io, "index=", p.index) + p.param === nothing || print(io, ' ') + end + p.param === nothing || print(io, p.param, '=', p.value) p.values === nothing || print(io, " ...") print(io, ']') end diff --git a/test/runtests.jl b/test/runtests.jl index 71107cc..189b552 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -18,6 +18,7 @@ try @testset "Fjage" begin gw = Gateway("localhost", 5081) + @testset "Gateway" begin @test typeof(gw) <: Gateway @test typeof(gw.agentID) <: AgentID @@ -25,6 +26,7 @@ try end shell = agentforservice(gw, "org.arl.fjage.shell.Services.SHELL") + @testset "agentforservice (+)" begin @test typeof(shell) <: AgentID @test shell.name == "shell" @@ -51,14 +53,17 @@ try @test length(alist) == 0 end - MyAbstractReq = AbstractMessageClass(@__MODULE__, "org.arl.fjage.test.MyAbstractReq") - MyReq = MessageClass(@__MODULE__, "org.arl.fjage.test.MyReq", MyAbstractReq) - @testset "MessageClass" begin + abstract type MyAbstractReq <: Message end + @message "org.arl.fjage.test.MyAbstractReq" struct MyAbstractReq <: MyAbstractReq end + @message "org.arl.fjage.test.MyReq" Performative.AGREE struct MyReq <: MyAbstractReq end + + @testset "@message" begin @test MyAbstractReq <: Message @test MyReq <: Message @test MyReq <: MyAbstractReq @test isa(MyAbstractReq(), MyAbstractReq) @test isa(MyReq(), MyAbstractReq) + @test MyReq().performative == Performative.AGREE end @testset "send & receive (gw, blocking)" begin @@ -69,9 +74,8 @@ try send(gw, ShellExecReq(recipient=shell, cmd="1+2")) rsp = take!(channel) @test typeof(rsp) <: Message - @test rsp.performative == "AGREE" - - # Make sure response is removed + @test rsp.performative == Performative.AGREE + # make sure response is removed rsp = receive(gw) @test isnothing(rsp) end @@ -88,9 +92,8 @@ try sleep(0.1) end @test typeof(rsp) <: Message - @test rsp.performative == "AGREE" - - # Make sure response is removed + @test rsp.performative == Performative.AGREE + # make sure response is removed rsp = receive(gw) @test isnothing(rsp) end @@ -100,31 +103,32 @@ try send(shell, ShellExecReq(cmd="1+2")) rsp = receive(gw, 1000) @test typeof(rsp) <: Message - @test rsp.performative == "AGREE" + @test rsp.performative == Performative.AGREE end @testset "request (gw)" begin flush(gw) rsp = request(gw, ShellExecReq(recipient=shell, cmd="1+2")) @test typeof(rsp) <: Message - @test rsp.performative == "AGREE" + @test rsp.performative == Performative.AGREE end @testset "request (aid)" begin flush(gw) rsp = request(shell, ShellExecReq(cmd="1+2")) @test typeof(rsp) <: Message - @test rsp.performative == "AGREE" + @test rsp.performative == Performative.AGREE end @testset "<< (aid, +)" begin flush(gw) rsp = shell << ShellExecReq(cmd="1+2") @test typeof(rsp) <: Message - @test rsp.performative == "AGREE" + @test rsp.performative == Performative.AGREE end dummy = agent(gw, "dummy") + @testset "agent" begin @test typeof(dummy) <: AgentID end @@ -144,6 +148,7 @@ try end ntf = topic(gw, "broadcast") + @testset "topic" begin @test typeof(ntf) <: AgentID end @@ -158,7 +163,7 @@ try @testset "subscribe (+)" begin flush(gw) subscribe(gw, ntf) - send(ntf, ShellExecReq(cmd="1+2")) + send(ntf, ShellExecReq(cmd="1+2+3")) msg = receive(gw, 1000) @test typeof(msg) <: ShellExecReq end @@ -171,8 +176,7 @@ try send(ntf, ShellExecReq(cmd="1+2")) msg = take!(channel) @test typeof(msg) <: ShellExecReq - - # Make sure response is removed + # make sure response is removed msg = receive(gw, ShellExecReq) @test isnothing(msg) end @@ -195,7 +199,8 @@ try @test isnothing(msg) end - UnknownReq = MessageClass(@__MODULE__, "org.arl.fjage.shell.UnknownReq") + @message "org.arl.fjage.shell.UnknownReq" struct UnknownReq end + @testset "receive (filt, -)" begin flush(gw) send(ntf, ShellExecReq(cmd="1+2")) @@ -271,10 +276,10 @@ try sleep(1.0) end @test typeof(rsp) <: Message - @test rsp.performative == "AGREE" + @test rsp.performative == Performative.AGREE end - # Issue 21 + # issue #21 @testset "inflate json" begin vec_f64 = [1.0, 2.0, 3.4] vec_c64 = [1.0 + 2.0im, 3.4 + 2.1im] @@ -309,24 +314,22 @@ try end finally - -# stop fjåge - + # stop fjåge println("Stopping fjåge...") kill(master) - end @testset "CoroutineBehavior" begin + c = Container() start(c) - @agent struct MyAgent; end + @agent struct MyAgent end a = MyAgent() add(c, a) @testset "delay" begin - # Test that delay indeed delays for at least as long as promised + # test that delay indeed delays for at least as long as promised dt = 100 t = zeros(Int, 10) cond = Threads.Condition() @@ -335,10 +338,10 @@ end t[i] = currenttimemillis(a) delay(b, dt) end - lock(()->notify(cond), cond) + lock(() -> notify(cond), cond) end add(a, b) - lock(()->wait(cond), cond) + lock(() -> wait(cond), cond) @test done(b) @test all(diff(t) .>= dt) @@ -346,7 +349,7 @@ end end @testset "stop" begin - # Test that CoroutineBehaviors can be stopped during delays + # test that CoroutineBehaviors can be stopped during delays flag = false b = CoroutineBehavior() do a, b delay(b, 1000) @@ -361,7 +364,7 @@ end end @testset "lock" begin - # Test that CoroutineBehaviors lock the agent while they are running + # test that CoroutineBehaviors lock the agent while they are running dt = 1000 t0 = currenttimemillis(a) t1 = -1 @@ -370,16 +373,19 @@ end sleep(0.5 + dt*1e-3) @test t1 - t0 > dt end + end @testset "clone(::Message)" begin + original = GenericMessage() original.data = [1,2,3] cloned = clone(original) - @test cloned.__clazz__ == original.__clazz__ + @test classname(cloned) == classname(original) @test typeof(cloned) == typeof(original) - @test cloned.msgID != original.msgID + @test cloned.messageID != original.messageID @test cloned.data == original.data @test cloned.data !== original.data -end \ No newline at end of file + +end