Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use macros to define messages rather than eval #45

Merged
merged 22 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
c6aabb1
refactor(msg): use macros to define messages rather than eval (WIP)
mchitre Jan 23, 2024
a3c5363
refactor: improve message implementation
mchitre Jan 28, 2024
d64b36a
fix: register messages defined in Fjage
mchitre Jan 29, 2024
f3eb9cb
style: cosmetic improvements to code
mchitre Jan 29, 2024
630f264
refactor: improve GenericMessages and JSONification
mchitre Jan 29, 2024
d429569
test: fix test cases and cosmetic improvements
mchitre Jan 29, 2024
02fb6ba
refactor: make invalid properties for messages throw errors unless in…
mchitre Jan 31, 2024
1b2a489
refactor: drop unnecessary property handling code
mchitre Jan 31, 2024
12c3fe9
test: fix fjage version for testing
mchitre Jan 31, 2024
eee6eef
fix(msg): drop eval for performative and let it be interpolated in
mchitre Feb 1, 2024
fe5470a
test: add test case that uses performative in message definition
mchitre Feb 1, 2024
51bcd26
fix(msg): only assume Fjage symbol to be in scope of caller
mchitre Feb 1, 2024
7a15935
feat(msg): improve pretty printing
mchitre Feb 1, 2024
bba1083
doc: update docs to reflect new message definition API
mchitre Feb 1, 2024
993e261
refactor: Escape more narrowly in @message
ettersi Feb 2, 2024
030e6c8
fix(kwdef): Copy over Base.isexpr
ettersi Feb 4, 2024
868f916
fix(container): update container to use new message format
mchitre Feb 4, 2024
0088453
fix(msg): handle NaNs, inheritance and fix readonly property in Param…
mchitre Feb 4, 2024
55c290e
refactor(msg): use _Concrete prefix and use applicable() instead of t…
mchitre Feb 5, 2024
9af5a95
refactor: replace reporterror with logerror
mchitre Feb 5, 2024
80bdf6c
fix(kwdef): Delete @show that was added for debugging
ettersi Feb 7, 2024
7c5afd6
docs(kwdef): xref PR
ettersi Feb 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 95 additions & 7 deletions docs/src/msg.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,106 @@
# Messages

Messages are dictionary-like containers that carry information between agents.
Keys can be accessed on messages using the property notation (`msg.key`), and
keys that are absent yield `nothing`. When interacting with Java/Groovy agents,
messages are mapped to Java/Groovy message classes with fields with the same
name as the keys in the message.
Messages are data containers that carry information between agents. When
interacting with Java/Groovy agents, messages are mapped to Java/Groovy
message classes with fields with the same name as the keys in the message.

Message types are defined using the `MessageClass` function. For example:
Message types are defined using the `@message` macro. For example:
```julia
const ShellExecReq = MessageClass(@__MODULE__, "org.arl.fjage.shell.ShellExecReq")
@message "org.arl.fjage.shell.ShellExecReq" struct ShellExecReq
cmd::Union{String,Nothing} = nothing
script::Union{String,Nothing} = nothing
args::Vector{String} = String[]
ans::Bool = false
end
```
defines a `ShellExecReq` message type that maps to a Java class with the
package `org.arl.fjage.shell.ShellExecReq`.

All messages are mutable. The `@message` macro also automatically adds a few fields:

- `performative::Symbol`
- `messageID::String`
- `inReplyTo::String`
- `sender::AgentID`
- `recipient::AgentID`
- `sentAt::Int64`

Messages can subtype other messages:
```julia
julia> abstract type MyAbstractMessage <: Message end

julia> @message "org.arl.fjage.demo.MyConcreteMessage" struct MyConcreteMessage <: MyAbstractMessage
a::Int
end

julia> MyConcreteMessage(a=1)
MyConcreteMessage:INFORM[a:1]

julia> MyConcreteMessage(a=1) isa MyAbstractMessage
true
```

It is also possible to have a concrete message type that can also be a supertype
of another message:
```julia
julia> abstract type SomeMessage <: Message end

julia> @message "org.arl.fjage.demo.SomeMessage" struct SomeMessage <: SomeMessage
a::Int
end

julia> @message "org.arl.fjage.demo.SomeExtMessage" struct SomeExtMessage <: SomeMessage
a::Int
b::Int
end

julia> SomeMessage(a=1) isa SomeMessage
true

julia> SomeExtMessage(a=1, b=2) isa SomeMessage
true
```

Performatives are guessed automatically based on message classname. By default,
the performative is `Performative.INFORM`. If a message classname ends with a
`Req`, the default performative changes to `Performative.REQUEST`. Performatives
may be overridden at declaration or at construction (and are mutable):
```julia
julia> @message "org.arl.fjage.demo.SomeReq" struct SomeReq end;
julia> @message "org.arl.fjage.demo.SomeRsp" Performative.AGREE struct SomeRsp end;

julia> SomeReq().performative
:REQUEST

julia> SomeRsp().performative
:AGREE

julia> SomeRsp(performative=Performative.INFORM).performative
:INFORM
```

When strict typing is not required, one can use the dictionary-like
`GenericMessage` message type:
```julia
julia> msg = GenericMessage("org.arl.fjage.demo.DynamicMessage")
DynamicMessage:INFORM

julia> msg.a = 1
1

julia> msg.b = "xyz"
"xyz"

julia> msg
DynamicMessage:INFORM[a:1 b:"xyz"]

julia> classname(msg)
"org.arl.fjage.demo.DynamicMessage"

julia> msg isa GenericMessage
true
```

## API

```@autodocs
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<dependency>
<groupId>com.github.org-arl</groupId>
<artifactId>fjage</artifactId>
<version>1.10.0-SNAPSHOT</version>
<version>1.12.2</version>
</dependency>
</dependencies>
</project>
4 changes: 4 additions & 0 deletions src/Fjage.jl
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,8 @@ include("container.jl")
include("fsm.jl")
include("coroutine_behavior.jl")

function __init__()
registermessages()
end

end
7 changes: 6 additions & 1 deletion src/const.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,9 @@ module Services
end

"Shell command execution request message."
const ShellExecReq = MessageClass(@__MODULE__, "org.arl.fjage.shell.ShellExecReq")
@message "org.arl.fjage.shell.ShellExecReq" struct ShellExecReq
cmd::Union{String,Nothing} = nothing
script::Union{String,Nothing} = nothing
args::Vector{String} = String[]
ans::Bool = false
end
14 changes: 7 additions & 7 deletions src/container.jl
Original file line number Diff line number Diff line change
Expand Up @@ -673,13 +673,13 @@ function _deliver(c::SlaveContainer, msg::Message, relay::Bool)
if msg.recipient.name ∈ keys(c.agents)
_deliver(c.agents[msg.recipient.name], msg)
elseif relay
_prepare!(msg)
clazz, data = _prepare(msg)
json = JSON.json(Dict(
"action" => "send",
"relay" => true,
"message" => Dict(
"clazz" => msg.__clazz__,
"data" => msg.__data__
:action => :send,
:relay => true,
:message => Dict(
:clazz => clazz,
:data => data
)
))
try
Expand Down Expand Up @@ -2059,7 +2059,7 @@ function _paramreq_action(a::Agent, b::MessageBehavior, msg::ParameterReq)
reconnect(container(a), ex) || reporterror(a, ex)
end
end
rmsg = ParameterRsp(perf=Performative.INFORM, inReplyTo=msg.messageID, recipient=msg.sender, readonly=ro, index=ndx)
rmsg = ParameterRsp(performative=Performative.INFORM, inReplyTo=msg.messageID, recipient=msg.sender, readonly=ro, index=ndx)
length(rsp) > 0 && ((rmsg.param, rmsg.value) = popfirst!(rsp))
length(rsp) > 0 && (rmsg.values = Dict(rsp))
send(a, rmsg)
Expand Down
76 changes: 38 additions & 38 deletions src/gw.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ struct Gateway
sock::Ref{TCPSocket}
subscriptions::Set{String}
pending::Dict{String,Channel}

msgqueue::Vector
tasks_waiting_for_msg::Vector{Tuple{Task,#=receive_id::=#Int}}
msgqueue_lock::ReentrantLock # Protects both msgqueue and tasks_waiting_for_msg

host::String
port::Int
reconnect::Ref{Bool}
Expand Down Expand Up @@ -87,7 +85,7 @@ _alive(gw::Gateway) = nothing

function _deliver(gw::Gateway, msg::Message, relay::Bool)
lock(gw.msgqueue_lock) do
for (idx, (task, _)) in pairs(gw.tasks_waiting_for_msg)
for (idx, (task, _)) pairs(gw.tasks_waiting_for_msg)
# Check if message matches the filter. This has to happen on the receiver
# task because this task may run in a different world age.
schedule(task, (current_task(), msg))
Expand Down Expand Up @@ -195,7 +193,7 @@ end
function agentsforservice(gw::Gateway, svc::String)
rq = Dict("action" => "agentsForService", "service" => svc)
rsp = _ask(gw, rq)
[AgentID(a, false, gw) for a in rsp["agentIDs"]]
[AgentID(a, false, gw) for a rsp["agentIDs"]]
end

"Subscribe to receive all messages sent to the given topic."
Expand Down Expand Up @@ -225,14 +223,20 @@ function Base.close(gw::Gateway)
end

# prepares a message to be sent to the server
function _prepare!(msg::Message)
for k in keys(msg.__data__)
v = msg.__data__[k]
function _prepare(msg::Message)
data = Dict{Symbol,Any}()
for k ∈ keys(msg)
v = msg[k]
if typeof(v) <: Array && typeof(v).parameters[1] <: Complex
btype = typeof(v).parameters[1].parameters[1]
msg.__data__[k] = reinterpret(btype, v)
data[k] = reinterpret(btype, v)
elseif v !== nothing
k === :performative && (k = :perf)
k === :messageID && (k = :msgID)
data[k] = v
end
end
classname(msg), data
end

# converts Base64 encoded arrays to Julia arrays
Expand Down Expand Up @@ -261,45 +265,44 @@ function _b64toarray(v)
end

# creates a message object from a JSON representation of the object
function _inflate(json)
function _inflate(json::AbstractDict)
function inflate_recursively!(d)
for (k,v) in d
for (k, v) d
if typeof(v) <: Dict && haskey(v, "clazz") && match(r"^\[.$", v["clazz"]) != nothing
v = _b64toarray(v)
end
if typeof(v) <: Array && length(v) > 0
t = typeof(v[1])
v = Array{t}(v)
kcplx = k*"__isComplex"
kcplx = k * "__isComplex"
if haskey(d, kcplx) && d[kcplx]
v = Array{Complex{t}}(reinterpret(Complex{t}, v))
delete!(d, kcplx)
end
end
if typeof(v) <: Dict
v = inflate_recursively!(v)
end
d[k] = v
d[k] = typeof(v) <: AbstractDict ? inflate_recursively!(v) : v
end
return d
end

if typeof(json) == String
json = JSON.parse(json)
d
end
clazz = json["clazz"]
data = inflate_recursively!(json["data"])
stype = _messageclass_lookup(clazz)
obj = @eval $stype()
for (k,v) in data
obj = _messageclass_lookup(clazz)()
for (k, v) ∈ data
if k == "sender" || k == "recipient"
v = AgentID(v)
elseif k == "perf"
k = "performative"
v = Symbol(v)
elseif k == "msgID"
k = "messageID"
end
obj.__data__[k] = v
trysetproperty!(obj, Symbol(k), v)
end
obj
end

_inflate(json::String) = _inflate(JSON.parse(json))

"""
send(gw, msg)

Expand All @@ -310,13 +313,13 @@ function send(gw::Gateway, msg)
isopen(gw.sock[]) || return false
msg.sender = gw.agentID
msg.sentAt = Dates.value(now())
_prepare!(msg)
clazz, data = _prepare(msg)
json = JSON.json(Dict(
"action" => "send",
"relay" => true,
"message" => Dict(
"clazz" => msg.__clazz__,
"data" => msg.__data__
:action => :send,
:relay => true,
:message => Dict(
:clazz => clazz,
:data => data
)
))
_println(gw.sock[], json)
Expand All @@ -340,10 +343,11 @@ it must take in a message and return `true` or `false`. A message for which it r
receive(gw::Gateway, timeout::Int=0) = receive(gw, msg->true, timeout)

const receive_counter = Threads.Atomic{Int}(0)

function receive(gw::Gateway, filt, timeout=0)
receive_id = (receive_counter[] += 1)
maybe_msg = lock(gw.msgqueue_lock) do
for (idx, msg) in pairs(gw.msgqueue)
for (idx, msg) pairs(gw.msgqueue)
if _matches(filt, msg)
return Some(popat!(gw.msgqueue, idx))
end
Expand All @@ -355,7 +359,7 @@ function receive(gw::Gateway, filt, timeout=0)
@async begin
sleep(timeout/1e3)
lock(gw.msgqueue_lock) do
for (idx, (_, id)) in pairs(gw.tasks_waiting_for_msg)
for (idx, (_, id)) pairs(gw.tasks_waiting_for_msg)
# We must identify the receive to remove from the waiting list
# based on the receive ID and not the task because the task which
# started this one may have had its previous receive satisfied and
Expand All @@ -372,14 +376,10 @@ function receive(gw::Gateway, filt, timeout=0)
push!(gw.tasks_waiting_for_msg, (current_task(), receive_id))
return nothing
end
if !isnothing(maybe_msg)
return something(maybe_msg)
end
isnothing(maybe_msg) || return something(maybe_msg)
while true
maybe_task_and_msg = wait()
if isnothing(maybe_task_and_msg)
return nothing
end
isnothing(maybe_task_and_msg) && return nothing
delivery_task, msg = maybe_task_and_msg
if _matches(filt, msg)
schedule(delivery_task, true)
Expand Down
Loading
Loading