WIP Ensemble and some other fixes (#103)
Co-authored-by: Christopher Rackauckas <[email protected]>
joshday and ChrisRackauckas authored Jul 19, 2023
1 parent 05063a3 commit 13e8715
Showing 4 changed files with 202 additions and 64 deletions.
2 changes: 1 addition & 1 deletion Project.toml
@@ -1,7 +1,7 @@
name = "SimulationService"
uuid = "e66378d9-a322-4933-8764-0ce0bcab4993"
authors = ["Five Grant <[email protected]>"]
version = "0.14.5"
version = "0.15.0"

[deps]
AMQPClient = "79c8b4cd-a41a-55fa-907c-fab5288e1383"
97 changes: 61 additions & 36 deletions src/SimulationService.jl
@@ -59,9 +59,8 @@ function __init__()
if Threads.nthreads() == 1
@warn "SimulationService.jl expects `Threads.nthreads() > 1`. Use e.g. `julia --threads=auto`."
end
simulation_api_spec_main = "https://raw.githubusercontent.com/DARPA-ASKEM/simulation-api-spec/main"
openapi_spec[] = YAML.load_file(download("$simulation_api_spec_main/openapi.yaml"))
simulation_schema[] = get_json("$simulation_api_spec_main/schemas/simulation.json")
openapi_spec[] = YAML.load_file(download("https://raw.githubusercontent.com/DARPA-ASKEM/simulation-api-spec/main/openapi.yaml"))
simulation_schema[] = get_json("https://raw.githubusercontent.com/DARPA-ASKEM/simulation-api-spec/main/schemas/simulation.json")
petrinet_schema[] = get_json("https://raw.githubusercontent.com/DARPA-ASKEM/Model-Representations/main/petrinet/petrinet_schema.json")

HOST[] = get(ENV, "SIMSERVICE_HOST", "0.0.0.0")
@@ -97,11 +96,16 @@ function start!(; host=HOST[], port=PORT[], kw...)
JobSchedulers.scheduler_start()
JobSchedulers.set_scheduler(max_cpu=JobSchedulers.SCHEDULER_MAX_CPU, max_mem=0.5, update_second=0.05, max_job=5000)
Oxygen.resetstate()
Oxygen.@get "/" health
Oxygen.@get "/status/{id}" job_status
Oxygen.@post "/{operation_name}" operation

Oxygen.@get "/" health
Oxygen.@get "/status/{id}" job_status
Oxygen.@post "/kill/{id}" job_kill

Oxygen.@post "/simulate" req -> operation(req, "simulate")
Oxygen.@post "/calibrate" req -> operation(req, "calibrate")
Oxygen.@post "/ensemble-simulate" req -> operation(req, "ensemble-simulate")
Oxygen.@post "/ensemble-calibrate" req -> operation(req, "ensemble-calibrate")

# For /docs
Oxygen.mergeschema(openapi_spec[])

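For orientation, a minimal client sketch against the routes registered above. The host, port, placeholder ids, and request fields are illustrative assumptions, not part of this commit; the authoritative request schemas are in the simulation-api-spec OpenAPI file.

using HTTP, JSON3

base = "http://localhost:8000"   # assumes a local server started with SimulationService.start!()

# Illustrative body; field names are assumptions — consult the OpenAPI spec for the real schema.
body = """{"model_config_id": "<config-id>", "timespan": {"start": 0, "end": 100}, "engine": "sciml"}"""

res = HTTP.post("$base/simulate", ["Content-Type" => "application/json"], body)
sim_id = JSON3.read(res.body).simulation_id   # the service responds with (; simulation_id)

HTTP.get("$base/status/$sim_id")    # poll job status
HTTP.post("$base/kill/$sim_id")     # cancel the job
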
@@ -126,17 +130,6 @@ get_json(url::String) = JSON3.read(HTTP.get(url, json_header).body)

timestamp() = Dates.format(now(), "yyyy-mm-ddTHH:MM:SS")

# Dump all the info we can get about a simulation `id`
function debug_data(id::String)
@assert ENABLE_TDS[]
data_service_model = DataServiceModel(id::String)
request_json = data_service_model.execution_payload
amr = get_model(data_service_model.execution_payload.model_config_id)
route = String(Dict(v => k for (k,v) in operation_to_dsm_type)[data_service_model.type])
operation_request = OperationRequest(HTTP.Request("POST", "", [], JSON3.write(request_json)), route)
job_status = get_job_status(get_job(id))
return (; request_json, amr, data_service_model, operation_request, job_status)
end

#-----------------------------------------------------------------------------# job endpoints
get_job(id::String) = JobSchedulers.job_query(jobhash(id))
@@ -170,6 +163,7 @@ function job_kill(::HTTP.Request, id::String)
if isnothing(job)
return NO_JOB
else
# TODO: update simulation model in TDS with status="cancelled"
JobSchedulers.cancel!(job)
return HTTP.Response(200)
end
@@ -187,7 +181,7 @@ health(::HTTP.Request) = (
Base.@kwdef mutable struct OperationRequest
obj::JSON3.Object = JSON3.Object() # untouched JSON from request sent by HMI
id::String = "sciml-$(UUIDs.uuid4())" # matches DataServiceModel :id
operation::Symbol = :unknown # :simulate, :calibrate, etc.
route::String = "unknown" # :simulate, :calibrate, etc.
model::Union{Nothing, JSON3.Object} = nothing # ASKEM Model Representation (AMR)
models::Union{Nothing, Vector{JSON3.Object}} = nothing # Multiple models (in AMR)
timespan::Union{Nothing, NTuple{2, Float64}} = nothing # (start, end)
@@ -196,16 +190,16 @@ Base.@kwdef mutable struct OperationRequest
end

function Base.show(io::IO, o::OperationRequest)
println(io, "OperationRequest(id=$(repr(o.id)), operation=$(repr(o.operation)))")
println(io, "OperationRequest(id=$(repr(o.id)), route=$(repr(o.route)))")
end

function OperationRequest(req::HTTP.Request, operation_name::String)
function OperationRequest(req::HTTP.Request, route::String)
o = OperationRequest()
@info "[$(o.id)] OperationRequest recieved to route /$operation_name: $(String(copy(req.body)))"
@info "[$(o.id)] OperationRequest recieved to route /$route: $(String(copy(req.body)))"
o.obj = JSON3.read(req.body)
o.operation = Symbol(operation_name)
o.route = route
for (k,v) in o.obj
if !ENABLE_TDS[] && k in [:model_config_id, :model_config_ids, :dataset]
if !ENABLE_TDS[] && k in [:model_config_id, :model_config_ids, :dataset, :model_configs]
@warn "TDS Disabled - ignoring key `$k` from request with id: $(repr(o.id))"
continue
end
@@ -215,6 +209,9 @@ function OperationRequest(req::HTTP.Request, operation_name::String)
k == :dataset ? (o.df = get_dataset(v)) :
k == :model ? (o.model = v) :

# For ensemble, we get objects with {id, solution_mappings, weight}
k == :model_configs ? (o.models = [get_model(m.id) for m in v]) :

# For testing only:
k == :local_model_configuration_file ? (o.model = JSON.read(v).configuration) :
k == :local_model_file ? (o.model = JSON3.read(v)) :
@@ -224,9 +221,11 @@ function OperationRequest(req::HTTP.Request, operation_name::String)
return o
end
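
To make the new ensemble branch concrete, here is a sketch of a request body carrying `model_configs` entries of the shape described above ({id, solution_mappings, weight}); the ids and mapping values are placeholders, and with `ENABLE_TDS[]` disabled the key is skipped with a warning instead of being resolved through `get_model`.

ensemble_body = """
{
  "model_configs": [
    {"id": "<config-id-1>", "solution_mappings": {"S": "susceptible"}, "weight": 0.5},
    {"id": "<config-id-2>", "solution_mappings": {"S": "susceptible"}, "weight": 0.5}
  ]
}
"""
req = HTTP.Request("POST", "", [], ensemble_body)
o = OperationRequest(req, "ensemble-simulate")
# With TDS enabled, o.models now holds one AMR (JSON3.Object) per listed configuration.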



function solve(o::OperationRequest)
callback = get_callback(o)
T = operations2type[o.operation]
T = route2operation_type[o.route]
op = T(o)
o.result = solve(op; callback)
end
@@ -259,23 +258,47 @@ end
StructTypes.StructType(::Type{DataServiceModel}) = StructTypes.Mutable()

# PIRACY to fix JSON3.read(str, DataServiceModel)
# TODO make upstream issue in JSON3
JSON3.Object(x::AbstractDict) = JSON3.read(JSON3.write(x))

# Initialize a DataServiceModel
operation_to_dsm_type = Dict(
:simulate => "simulation",
:calibrate => "calibration_simulation",
:ensemble => "ensemble"
# translate route (in OpenAPI spec) to type (in TDS)
# route: https://raw.githubusercontent.com/DARPA-ASKEM/simulation-api-spec/main/openapi.yaml
# type: https://raw.githubusercontent.com/DARPA-ASKEM/simulation-api-spec/main/schemas/simulation.json
route2type = Dict(
"simulate" => "simulation",
"calibrate" => "calibration",
"ensemble-simulate" => "ensemble",
"ensemble-calibrate" => "ensemble"
)

# Initialize a DataServiceModel
function DataServiceModel(o::OperationRequest)
m = DataServiceModel()
m.id = o.id
m.type = operation_to_dsm_type[o.operation]
m.type = route2type[o.route]
m.execution_payload = o.obj
return m
end

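As a small illustration of the mapping above: both ensemble routes collapse to the single TDS type "ensemble", which is why the originating route cannot be recovered later (see the debug helpers below). A sketch, assuming an OperationRequest built from an ensemble-simulate request:

o = OperationRequest(route = "ensemble-simulate")    # other fields left at their defaults
m = DataServiceModel(o)
m.type                                        # "ensemble"
route2type["ensemble-calibrate"] == m.type    # true — the two ensemble routes share one TDS type
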
#-----------------------------------------------------------------------------# debug_data
# For debugging: Try to recreate the OperationRequest from TDS data
# NOTE: TDS does not save which route was used...
function OperationRequest(m::DataServiceModel)
req = HTTP.Request("POST", "", [], JSON3.write(m.execution_payload))
return OperationRequest(req, "DataServiceModel: $(m.type)")
end

# Dump all the info we can get about a simulation `id`
function debug_data(id::String)
@assert ENABLE_TDS[]
data_service_model = DataServiceModel(id::String)
request_json = data_service_model.execution_payload
amr = get_model(data_service_model.execution_payload.model_config_id)
operation_request = OperationRequest(data_service_model)
job = get_job(id)
return (; request_json, amr, data_service_model, operation_request, job)
end

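With TDS enabled, the relocated debug helper pulls everything it can about a stored simulation back into local objects, e.g.:

dbg = debug_data("sciml-<uuid>")   # placeholder id of an existing simulation in TDS
dbg.operation_request              # request reconstructed from the stored execution_payload
dbg.job                            # the scheduler's record of the job, if one was submitted
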
#-----------------------------------------------------------------------------# publish_to_rabbitmq
# published as JSON3.write(content)
function publish_to_rabbitmq(content)
@@ -318,6 +341,8 @@ function get_dataset(obj::JSON3.Object)
@info "`get_dataset` (dataset id=$(repr(obj.id))) rename! $k => $v"
rename!(df, k => v)
end
"timestep" in names(df) && rename!(df, "timestep" => "timestep") # hack to get df in our "schema"
@info "get_dataset (id=$(repr(obj.id))) with names: $(names(df))"
return df
end

@@ -385,7 +410,7 @@ function complete(o::OperationRequest)
tds_url = "$(TDS_URL[])/simulations/$(o.id)/upload-url?filename=$filename"
s3_url = get_json(tds_url).url
HTTP.put(s3_url, header; body=body)
update(o; status = "complete", completed_time = timestamp(), result_files = [s3_url])
update(o; status = "complete", completed_time = timestamp(), result_files = [filename])
end


@@ -404,9 +429,9 @@ last_job = Ref{JobSchedulers.Job}()
# 5) Job finishes: We get url from TDS where we can store results: GET /simulations/$sim_id/upload-url?filename=result.csv
# 6) We upload results to S3: PUT $url
# 7) We update simulation in TDS(status="complete", complete_time=<datetime>): PUT /simulations/$id
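
A hedged sketch of steps 5–7 as plain HTTP calls, mirroring the `complete`/`update` code elsewhere in this file; the id is a placeholder and the final PUT body is abbreviated relative to whatever `update` actually sends:

sim_id = "sciml-<uuid>"   # placeholder

# 5) ask TDS for a presigned upload URL
s3_url = get_json("$(TDS_URL[])/simulations/$sim_id/upload-url?filename=result.csv").url

# 6) upload the result file to S3 (header contents assumed)
HTTP.put(s3_url, ["Content-Type" => "text/csv"]; body = read("result.csv"))

# 7) mark the simulation complete in TDS
HTTP.put("$(TDS_URL[])/simulations/$sim_id", ["Content-Type" => "application/json"],
    JSON3.write((; status = "complete", completed_time = timestamp(), result_files = ["result.csv"])))
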
function operation(request::HTTP.Request, operation_name::String)
@info "Creating OperationRequest from POST to route $operation_name"
o = OperationRequest(request, operation_name) # 1, 3
function operation(request::HTTP.Request, route::String)
@info "Creating OperationRequest from POST to route $route"
o = OperationRequest(request, route) # 1, 3
create(o) # 2
job = JobSchedulers.Job(
@task begin
@@ -423,7 +448,7 @@ function operation(request::HTTP.Request, operation_name::String)
job.id = jobhash(o.id)
last_operation[] = o
last_job[] = job
@info "Submitting job..."
@info "Submitting job $(o.id)"
JobSchedulers.submit!(job)

body = JSON3.write((; simulation_id = o.id))
Expand All @@ -433,6 +458,6 @@ end
#-----------------------------------------------------------------------------# operations.jl
include("operations.jl")

include("precompile.jl")
get(ENV, "SIMSERVICE_PRECOMPILE", "true") == "true" && include("precompile.jl")

end # module
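
One usage note on the precompilation change: the workload can now be skipped via an environment variable. The flag must be visible when the package precompiles, so set it in the shell (a sketch; only the variable name comes from this commit):

# SIMSERVICE_PRECOMPILE=false julia --threads=auto --project -e 'using SimulationService'
ENV["SIMSERVICE_PRECOMPILE"] = "false"   # or set it in-session before the package first precompiles
using SimulationService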