Skip to content

Commit

Permalink
feat(call): handle resume errors as a new 'error' output port
Browse files Browse the repository at this point in the history
  • Loading branch information
hishamhm committed Nov 25, 2024
1 parent c7cb10b commit 229e6f4
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 20 deletions.
2 changes: 2 additions & 0 deletions docs/datakit.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ An HTTP dispatch call.

* `body`: body returned as the dispatch response.
* `headers`: headers returned as the dispatch response.
* `error`: triggered if a dispatch error occurs, such as a DNS resolver timeout, etc.
The port returns the error message.

#### Supported attributes:

Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ mod test {
&[&[], &[]],
&[&[], &[]],
&[&[(5, 0)]],
&[&[(6, 0)], &[]],
&[&[(6, 0)], &[], &[]],
&[],
];
for (i, &output_list) in output_lists.iter().enumerate() {
Expand Down
18 changes: 11 additions & 7 deletions src/nodes/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,16 @@ impl Node for Call {
}

fn resume(&self, ctx: &dyn HttpContext, _inputs: &Input) -> State {
log::debug!("call: resume");
let headers = payload::from_pwm_headers(ctx.get_http_call_response_headers());

let headers = Some(payload::from_pwm_headers(
ctx.get_http_call_response_headers(),
));
if let Some(dispatch_status) = headers.get_str(":dispatch_status") {
if dispatch_status != "ok" {
#[cfg(debug_assertions)]
log::debug!("call: resume failure status: {dispatch_status}");

return Done(vec![None, None, Some(Payload::Raw(dispatch_status.into()))]);
}
}

let body = if let Some(body) = ctx.get_http_call_response_body(0, usize::MAX) {
let content_type = ctx.get_http_call_response_header("Content-Type");
Expand All @@ -117,9 +122,8 @@ impl Node for Call {
};

// TODO only produce an output if it is connected
// TODO produce a Fail() status on HTTP >= 400

Done(vec![body, headers])
Done(vec![body, Some(headers), None])
}
}

Expand All @@ -134,7 +138,7 @@ impl NodeFactory for CallFactory {
}
fn default_output_ports(&self) -> PortConfig {
PortConfig {
defaults: Some(PortConfig::names(&["body", "headers"])),
defaults: Some(PortConfig::names(&["body", "headers", "error"])),
user_defined_ports: false,
}
}
Expand Down
13 changes: 1 addition & 12 deletions src/nodes/exit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,7 @@ impl NodeConfig for ExitConfig {
}

fn default_outputs(&self) -> Option<Vec<NodeDefaultLink>> {
Some(vec![
NodeDefaultLink {
this_port: "body".into(),
other_node: "response".into(),
other_port: "body".into(),
},
NodeDefaultLink {
this_port: "headers".into(),
other_node: "response".into(),
other_port: "headers".into(),
},
])
None
}
}

Expand Down
11 changes: 11 additions & 0 deletions src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,17 @@ impl Payload {
}
}

pub fn get(&self, key: &str) -> Option<&serde_json::Value> {
match self {
Payload::Json(serde_json::Value::Object(map)) => map.get(key),
_ => None,
}
}

pub fn get_str(&self, key: &str) -> Option<&str> {
self.get(key).and_then(serde_json::Value::as_str)
}

pub fn json_null() -> Self {
Self::Json(Json::Null)
}
Expand Down

0 comments on commit 229e6f4

Please sign in to comment.