Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split out zstandard compression (split from #5995) #6121

Closed
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ in development

Python 3.6 is no longer supported; Stackstorm requires at least Python 3.8.

* implemented zstandard compression for parameters and results. #5995
contributed by @guzzijones12

Fixed
~~~~~

Expand Down
3 changes: 3 additions & 0 deletions conf/st2.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ connection_timeout = 3000
db_name = st2
# host of db server
host = 127.0.0.1
# compression for parameter and result storage in liveaction and execution models
# Valid values: zstandard, none
parameter_result_compression = zstandard
# password for db login
password = None
# port of db server
Expand Down
1 change: 1 addition & 0 deletions conf/st2.dev.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Config used by local development environment (tools/launch.dev.sh)
[database]
host = 127.0.0.1
parameter_result_compression = zstandard

[api]
# Host and port to bind the API server.
Expand Down
14 changes: 12 additions & 2 deletions contrib/runners/orquesta_runner/orquesta_runner/orquesta_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,22 @@ def start_workflow(self, action_parameters):
wf_def, self.execution, st2_ctx, notify_cfg=notify_cfg
)
except wf_exc.WorkflowInspectionError as e:
_, ex, tb = sys.exc_info()
status = ac_const.LIVEACTION_STATUS_FAILED
result = {"errors": e.args[1], "output": None}
result = {
"errors": e.args[1],
"output": None,
"traceback": "".join(traceback.format_tb(tb, 20)),
}
return (status, result, self.context)
except Exception as e:
_, ex, tb = sys.exc_info()
status = ac_const.LIVEACTION_STATUS_FAILED
result = {"errors": [{"message": six.text_type(e)}], "output": None}
result = {
"errors": [{"message": six.text_type(e)}],
"output": None,
"traceback": "".join(traceback.format_tb(tb, 20)),
}
return (status, result, self.context)

return self._handle_workflow_return_value(wf_ex_db)
Expand Down
44 changes: 39 additions & 5 deletions contrib/runners/orquesta_runner/tests/unit/test_error_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,21 @@ def test_fail_start_task_input_value_type(self):
workflow_execution=str(wf_ex_db.id)
)[0]
self.assertEqual(tk_ex_db.status, wf_statuses.FAILED)
self.assertDictEqual(tk_ex_db.result, {"errors": expected_errors})
self.assertEqual(
tk_ex_db.result["errors"][0]["type"], expected_errors[0]["type"]
)
self.assertEqual(
tk_ex_db.result["errors"][0]["message"], expected_errors[0]["message"]
)
self.assertEqual(
tk_ex_db.result["errors"][0]["task_id"], expected_errors[0]["task_id"]
)
self.assertEqual(
tk_ex_db.result["errors"][0]["route"], expected_errors[0]["route"]
)

lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
self.assertDictEqual(lv_ac_db.result, expected_result)

ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(ac_ex_db.id))
self.assertEqual(ac_ex_db.status, ac_const.LIVEACTION_STATUS_FAILED)
Expand Down Expand Up @@ -522,13 +532,37 @@ def test_fail_next_task_input_value_type(self):
# Assert workflow execution and task2 execution failed.
wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(str(wf_ex_db.id))
self.assertEqual(wf_ex_db.status, wf_statuses.FAILED)
self.assertListEqual(
self.sort_workflow_errors(wf_ex_db.errors), expected_errors
self.assertEqual(
self.sort_workflow_errors(wf_ex_db.errors)[0]["type"],
expected_errors[0]["type"],
)
self.assertEqual(
self.sort_workflow_errors(wf_ex_db.errors)[0]["message"],
expected_errors[0]["message"],
)
self.assertEqual(
self.sort_workflow_errors(wf_ex_db.errors)[0]["task_id"],
expected_errors[0]["task_id"],
)
self.assertEqual(
self.sort_workflow_errors(wf_ex_db.errors)[0]["route"],
expected_errors[0]["route"],
)

tk2_ex_db = wf_db_access.TaskExecution.query(task_id="task2")[0]
self.assertEqual(tk2_ex_db.status, wf_statuses.FAILED)
self.assertDictEqual(tk2_ex_db.result, {"errors": expected_errors})
self.assertEqual(
tk2_ex_db.result["errors"][0]["type"], expected_errors[0]["type"]
)
self.assertEqual(
tk2_ex_db.result["errors"][0]["message"], expected_errors[0]["message"]
)
self.assertEqual(
tk2_ex_db.result["errors"][0]["task_id"], expected_errors[0]["task_id"]
)
self.assertEqual(
tk2_ex_db.result["errors"][0]["route"], expected_errors[0]["route"]
)

lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
Expand Down
6 changes: 5 additions & 1 deletion contrib/runners/orquesta_runner/tests/unit/test_notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,11 @@ def test_notify_task_list_nonexistent_task(self):
}

self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_FAILED)
self.assertDictEqual(lv_ac_db.result, expected_result)
self.assertEqual(
lv_ac_db.result["errors"][0]["message"],
expected_result["errors"][0]["message"],
)
self.assertIsNone(lv_ac_db.result["output"], expected_result["output"])

def test_notify_task_list_item_value(self):
wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml")
Expand Down
5 changes: 4 additions & 1 deletion st2actions/st2actions/container/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,13 @@ def _do_run(self, runner):
):
queries.setup_query(runner.liveaction.id, runner.runner_type, context)
except:
LOG.exception("Failed to run action.")
_, ex, tb = sys.exc_info()
# mark execution as failed.
status = action_constants.LIVEACTION_STATUS_FAILED
LOG.exception(
"Failed to run action. traceback: %s"
% "".join(traceback.format_tb(tb, 20))
)
# include the error message and traceback to try and provide some hints.
result = {
"error": str(ex),
Expand Down
57 changes: 34 additions & 23 deletions st2actions/st2actions/policies/concurrency_by_attr.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@

from __future__ import absolute_import

import six

from st2common.constants import action as action_constants
from st2common import log as logging
from st2common.fields import JSONDictEscapedFieldCompatibilityField
from st2common.persistence import action as action_access
from st2common.services import action as action_service
from st2common.policies.concurrency import BaseConcurrencyApplicator
Expand All @@ -41,31 +40,43 @@ def __init__(
)
self.attributes = attributes or []

def _get_filters(self, target):
filters = {
("parameters__%s" % k): v
for k, v in six.iteritems(target.parameters)
if k in self.attributes
}

filters["action"] = target.action
filters["status"] = None

return filters

def _apply_before(self, target):
# Get the count of scheduled and running instances of the action.
filters = self._get_filters(target)

# Get the count of scheduled instances of the action.
filters["status"] = action_constants.LIVEACTION_STATUS_SCHEDULED
scheduled = action_access.LiveAction.count(**filters)
scheduled_filters = {
"status": action_constants.LIVEACTION_STATUS_SCHEDULED,
"action": target.action,
}
scheduled = [i for i in action_access.LiveAction.query(**scheduled_filters)]

# Get the count of running instances of the action.
filters["status"] = action_constants.LIVEACTION_STATUS_RUNNING
running = action_access.LiveAction.count(**filters)
running_filters = {
"status": action_constants.LIVEACTION_STATUS_RUNNING,
"action": target.action,
}
running = [i for i in action_access.LiveAction.query(**running_filters)]
running.extend(scheduled)
count = 0
target_parameters = JSONDictEscapedFieldCompatibilityField().parse_field_value(
target.parameters
)
target_key_value_policy_attributes = {
k: v for k, v in target_parameters.items() if k in self.attributes
}

count = scheduled + running
for i in running:
running_event_parameters = (
JSONDictEscapedFieldCompatibilityField().parse_field_value(i.parameters)
)
# list of event parameter values that are also in policy
running_event_policy_item_key_value_attributes = {
k: v
for k, v in running_event_parameters.items()
if k in self.attributes
}
if (
running_event_policy_item_key_value_attributes
== target_key_value_policy_attributes
):
count += 1

# Mark the execution as scheduled if threshold is not reached or delayed otherwise.
if count < self.threshold:
Expand Down
37 changes: 13 additions & 24 deletions st2api/st2api/controllers/v1/actionexecutions.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from st2common.exceptions import apivalidation as validation_exc
from st2common.exceptions import param as param_exc
from st2common.exceptions import trace as trace_exc
from st2common.fields import JSONDictEscapedFieldCompatibilityField
from st2common.models.api.action import LiveActionAPI
from st2common.models.api.action import LiveActionCreateAPI
from st2common.models.api.base import cast_argument_value
Expand Down Expand Up @@ -416,36 +417,18 @@ def get(

:rtype: ``str``
"""
# NOTE: Here we intentionally use as_pymongo() to avoid mongoengine layer even for old style
# data
# NOTE: we need to use to_python() to uncompress the data
try:
result = (
self.access.impl.model.objects.filter(id=id)
.only("result")
.as_pymongo()[0]
self.access.impl.model.objects.filter(id=id).only("result")[0].result
)
except IndexError:
raise NotFoundException("Execution with id %s not found" % (id))

if isinstance(result["result"], dict):
# For backward compatibility we also support old non JSON field storage format
if pretty_format:
response_body = orjson.dumps(
result["result"], option=orjson.OPT_INDENT_2
)
else:
response_body = orjson.dumps(result["result"])
if pretty_format:
response_body = orjson.dumps(result, option=orjson.OPT_INDENT_2)
else:
# For new JSON storage format we just use raw value since it's already JSON serialized
# string
response_body = result["result"]

if pretty_format:
# Pretty format is not a default behavior since it adds quite some overhead (e.g.
# 10-30ms for non pretty format for 4 MB json vs ~120 ms for pretty formatted)
response_body = orjson.dumps(
orjson.loads(result["result"]), option=orjson.OPT_INDENT_2
)
response_body = orjson.dumps(result)

response = Response()
response.headers["Content-Type"] = "text/json"
Expand Down Expand Up @@ -634,8 +617,14 @@ def post(self, spec_api, id, requester_user, no_merge=False, show_secrets=False)

# Merge in any parameters provided by the user
new_parameters = {}
original_parameters = getattr(existing_execution, "parameters", b"{}")
original_params_decoded = (
JSONDictEscapedFieldCompatibilityField().parse_field_value(
original_parameters
)
)
if not no_merge:
new_parameters.update(getattr(existing_execution, "parameters", {}))
new_parameters.update(original_params_decoded)
new_parameters.update(spec_api.parameters)

# Create object for the new execution
Expand Down
Loading
Loading