Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use macros to define messages rather than eval #45

Merged
merged 22 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
c6aabb1
refactor(msg): use macros to define messages rather than eval (WIP)
mchitre Jan 23, 2024
a3c5363
refactor: improve message implementation
mchitre Jan 28, 2024
d64b36a
fix: register messages defined in Fjage
mchitre Jan 29, 2024
f3eb9cb
style: cosmetic improvements to code
mchitre Jan 29, 2024
630f264
refactor: improve GenericMessages and JSONification
mchitre Jan 29, 2024
d429569
test: fix test cases and cosmetic improvements
mchitre Jan 29, 2024
02fb6ba
refactor: make invalid properties for messages throw errors unless in…
mchitre Jan 31, 2024
1b2a489
refactor: drop unnecessary property handling code
mchitre Jan 31, 2024
12c3fe9
test: fix fjage version for testing
mchitre Jan 31, 2024
eee6eef
fix(msg): drop eval for performative and let it be interpolated in
mchitre Feb 1, 2024
fe5470a
test: add test case that uses performative in message definition
mchitre Feb 1, 2024
51bcd26
fix(msg): only assume Fjage symbol to be in scope of caller
mchitre Feb 1, 2024
7a15935
feat(msg): improve pretty printing
mchitre Feb 1, 2024
bba1083
doc: update docs to reflect new message definition API
mchitre Feb 1, 2024
993e261
refactor: Escape more narrowly in @message
ettersi Feb 2, 2024
030e6c8
fix(kwdef): Copy over Base.isexpr
ettersi Feb 4, 2024
868f916
fix(container): update container to use new message format
mchitre Feb 4, 2024
0088453
fix(msg): handle NaNs, inheritance and fix readonly property in Param…
mchitre Feb 4, 2024
55c290e
refactor(msg): use _Concrete prefix and use applicable() instead of t…
mchitre Feb 5, 2024
9af5a95
refactor: replace reporterror with logerror
mchitre Feb 5, 2024
80bdf6c
fix(kwdef): Delete @show that was added for debugging
ettersi Feb 7, 2024
7c5afd6
docs(kwdef): xref PR
ettersi Feb 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/Fjage.jl
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,8 @@ include("container.jl")
include("fsm.jl")
include("coroutine_behavior.jl")

function __init__()
registermessages()
end

end
7 changes: 6 additions & 1 deletion src/const.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,9 @@ module Services
end

"Shell command execution request message."
const ShellExecReq = MessageClass(@__MODULE__, "org.arl.fjage.shell.ShellExecReq")
@message "org.arl.fjage.shell.ShellExecReq" struct ShellExecReq
cmd::Union{String,Nothing} = nothing
script::Union{String,Nothing} = nothing
args::Vector{String} = String[]
ans::Bool = false
end
76 changes: 38 additions & 38 deletions src/gw.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ struct Gateway
sock::Ref{TCPSocket}
subscriptions::Set{String}
pending::Dict{String,Channel}

msgqueue::Vector
tasks_waiting_for_msg::Vector{Tuple{Task,#=receive_id::=#Int}}
msgqueue_lock::ReentrantLock # Protects both msgqueue and tasks_waiting_for_msg

host::String
port::Int
reconnect::Ref{Bool}
Expand Down Expand Up @@ -87,7 +85,7 @@ _alive(gw::Gateway) = nothing

function _deliver(gw::Gateway, msg::Message, relay::Bool)
lock(gw.msgqueue_lock) do
for (idx, (task, _)) in pairs(gw.tasks_waiting_for_msg)
for (idx, (task, _)) pairs(gw.tasks_waiting_for_msg)
# Check if message matches the filter. This has to happen on the receiver
# task because this task may run in a different world age.
schedule(task, (current_task(), msg))
Expand Down Expand Up @@ -195,7 +193,7 @@ end
function agentsforservice(gw::Gateway, svc::String)
rq = Dict("action" => "agentsForService", "service" => svc)
rsp = _ask(gw, rq)
[AgentID(a, false, gw) for a in rsp["agentIDs"]]
[AgentID(a, false, gw) for a rsp["agentIDs"]]
end

"Subscribe to receive all messages sent to the given topic."
Expand Down Expand Up @@ -225,14 +223,20 @@ function Base.close(gw::Gateway)
end

# prepares a message to be sent to the server
function _prepare!(msg::Message)
for k in keys(msg.__data__)
v = msg.__data__[k]
function _prepare(msg::Message)
data = Dict{Symbol,Any}()
for k ∈ keys(msg)
v = msg[k]
if typeof(v) <: Array && typeof(v).parameters[1] <: Complex
btype = typeof(v).parameters[1].parameters[1]
msg.__data__[k] = reinterpret(btype, v)
data[k] = reinterpret(btype, v)
elseif v !== nothing
k === :performative && (k = :perf)
k === :messageID && (k = :msgID)
data[k] = v
end
end
classname(msg), data
end

# converts Base64 encoded arrays to Julia arrays
Expand Down Expand Up @@ -261,45 +265,44 @@ function _b64toarray(v)
end

# creates a message object from a JSON representation of the object
function _inflate(json)
function _inflate(json::AbstractDict)
function inflate_recursively!(d)
for (k,v) in d
for (k, v) d
if typeof(v) <: Dict && haskey(v, "clazz") && match(r"^\[.$", v["clazz"]) != nothing
v = _b64toarray(v)
end
if typeof(v) <: Array && length(v) > 0
t = typeof(v[1])
v = Array{t}(v)
kcplx = k*"__isComplex"
kcplx = k * "__isComplex"
if haskey(d, kcplx) && d[kcplx]
v = Array{Complex{t}}(reinterpret(Complex{t}, v))
delete!(d, kcplx)
end
end
if typeof(v) <: Dict
v = inflate_recursively!(v)
end
d[k] = v
d[k] = typeof(v) <: AbstractDict ? inflate_recursively!(v) : v
end
return d
end

if typeof(json) == String
json = JSON.parse(json)
d
end
clazz = json["clazz"]
data = inflate_recursively!(json["data"])
stype = _messageclass_lookup(clazz)
obj = @eval $stype()
for (k,v) in data
obj = _messageclass_lookup(clazz)()
for (k, v) ∈ data
if k == "sender" || k == "recipient"
v = AgentID(v)
elseif k == "perf"
k = "performative"
v = Symbol(v)
elseif k == "msgID"
k = "messageID"
end
obj.__data__[k] = v
setproperty!(obj, Symbol(k), v; ignore_missingfields=true)
end
obj
end

_inflate(json::String) = _inflate(JSON.parse(json))

"""
send(gw, msg)

Expand All @@ -310,13 +313,13 @@ function send(gw::Gateway, msg)
isopen(gw.sock[]) || return false
msg.sender = gw.agentID
msg.sentAt = Dates.value(now())
_prepare!(msg)
clazz, data = _prepare(msg)
json = JSON.json(Dict(
"action" => "send",
"relay" => true,
"message" => Dict(
"clazz" => msg.__clazz__,
"data" => msg.__data__
:action => :send,
:relay => true,
:message => Dict(
:clazz => clazz,
:data => data
)
))
_println(gw.sock[], json)
Expand All @@ -340,10 +343,11 @@ it must take in a message and return `true` or `false`. A message for which it r
receive(gw::Gateway, timeout::Int=0) = receive(gw, msg->true, timeout)

const receive_counter = Threads.Atomic{Int}(0)

function receive(gw::Gateway, filt, timeout=0)
receive_id = (receive_counter[] += 1)
maybe_msg = lock(gw.msgqueue_lock) do
for (idx, msg) in pairs(gw.msgqueue)
for (idx, msg) pairs(gw.msgqueue)
if _matches(filt, msg)
return Some(popat!(gw.msgqueue, idx))
end
Expand All @@ -355,7 +359,7 @@ function receive(gw::Gateway, filt, timeout=0)
@async begin
sleep(timeout/1e3)
lock(gw.msgqueue_lock) do
for (idx, (_, id)) in pairs(gw.tasks_waiting_for_msg)
for (idx, (_, id)) pairs(gw.tasks_waiting_for_msg)
# We must identify the receive to remove from the waiting list
# based on the receive ID and not the task because the task which
# started this one may have had its previous receive satisfied and
Expand All @@ -372,14 +376,10 @@ function receive(gw::Gateway, filt, timeout=0)
push!(gw.tasks_waiting_for_msg, (current_task(), receive_id))
return nothing
end
if !isnothing(maybe_msg)
return something(maybe_msg)
end
isnothing(maybe_msg) || return something(maybe_msg)
while true
maybe_task_and_msg = wait()
if isnothing(maybe_task_and_msg)
return nothing
end
isnothing(maybe_task_and_msg) && return nothing
delivery_task, msg = maybe_task_and_msg
if _matches(filt, msg)
schedule(delivery_task, true)
Expand Down
Loading
Loading