diff --git a/engine/immediate_task_conrrent_processor.go b/engine/immediate_task_conrrent_processor.go index 8a934435..a170ca34 100644 --- a/engine/immediate_task_conrrent_processor.go +++ b/engine/immediate_task_conrrent_processor.go @@ -6,13 +6,14 @@ package engine import ( "context" "fmt" + "io/ioutil" + "net/http" + "time" + "github.com/xcherryio/apis/goapi/xcapi" "github.com/xcherryio/xcherry/common/decision" "github.com/xcherryio/xcherry/common/httperror" "github.com/xcherryio/xcherry/persistence/data_models" - "io/ioutil" - "net/http" - "time" "github.com/xcherryio/xcherry/common/log" "github.com/xcherryio/xcherry/common/log/tag" @@ -354,34 +355,69 @@ func (w *immediateTaskConcurrentProcessor) processExecuteTask( var resp *xcapi.AsyncStateExecuteResponse var httpResp *http.Response loadedGlobalAttributesResp, errToCheck := w.loadGlobalAttributesIfNeeded(ctx, prep, task) - if errToCheck == nil { - req := apiClient.DefaultAPI.ApiV1XcherryWorkerAsyncStateExecutePost(ctx) - resp, httpResp, errToCheck = req.AsyncStateExecuteRequest( - xcapi.AsyncStateExecuteRequest{ - Context: createApiContext( - prep, - task, - prep.Info.RecoverFromStateExecutionId, - prep.Info.RecoverFromApi), - ProcessType: prep.Info.ProcessType, - StateId: task.StateId, - StateInput: &xcapi.EncodedObject{ - Encoding: prep.Input.Encoding, - Data: prep.Input.Data, - }, - CommandResults: &prep.WaitUntilCommandResults, - LoadedGlobalAttributes: &loadedGlobalAttributesResp.Response, - }, - ).Execute() - if httpResp != nil { - defer httpResp.Body.Close() - } + if errToCheck != nil { + if httperror.CheckHttpResponseAndError(errToCheck, httpResp, w.logger) { + status, details := w.composeHttpError(errToCheck, httpResp, prep.Info, task) - if errToCheck == nil { - errToCheck = decision.ValidateDecision(resp.StateDecision) + nextIntervalSecs, shouldRetry := w.checkRetry(task, prep.Info) + if shouldRetry { + return w.retryTask(ctx, task, prep, nextIntervalSecs, status, details) + } + return w.applyStateFailureRecoveryPolicy(ctx, + task, + prep, + status, + details, + task.ImmediateTaskInfo.WorkerTaskBackoffInfo.CompletedAttempts, + xcapi.EXECUTE_API) + } + } + loadedLocalAttributesResp, errToCheck := w.loadLocalAttributesIfNeeded(ctx, prep, task) + if errToCheck != nil { + if httperror.CheckHttpResponseAndError(errToCheck, httpResp, w.logger) { + status, details := w.composeHttpError(errToCheck, httpResp, prep.Info, task) + + nextIntervalSecs, shouldRetry := w.checkRetry(task, prep.Info) + if shouldRetry { + return w.retryTask(ctx, task, prep, nextIntervalSecs, status, details) + } + return w.applyStateFailureRecoveryPolicy(ctx, + task, + prep, + status, + details, + task.ImmediateTaskInfo.WorkerTaskBackoffInfo.CompletedAttempts, + xcapi.EXECUTE_API) } } + req := apiClient.DefaultAPI.ApiV1XcherryWorkerAsyncStateExecutePost(ctx) + resp, httpResp, errToCheck = req.AsyncStateExecuteRequest( + xcapi.AsyncStateExecuteRequest{ + Context: createApiContext( + prep, + task, + prep.Info.RecoverFromStateExecutionId, + prep.Info.RecoverFromApi), + ProcessType: prep.Info.ProcessType, + StateId: task.StateId, + StateInput: &xcapi.EncodedObject{ + Encoding: prep.Input.Encoding, + Data: prep.Input.Data, + }, + CommandResults: &prep.WaitUntilCommandResults, + LoadedGlobalAttributes: &loadedGlobalAttributesResp.Response, + LoadedLocalAttributes: &loadedLocalAttributesResp.Response, + }, + ).Execute() + if httpResp != nil { + defer httpResp.Body.Close() + } + + if errToCheck == nil { + errToCheck = decision.ValidateDecision(resp.StateDecision) + } + if httperror.CheckHttpResponseAndError(errToCheck, httpResp, w.logger) { status, details := w.composeHttpError(errToCheck, httpResp, prep.Info, task) @@ -411,6 +447,8 @@ func (w *immediateTaskConcurrentProcessor) processExecuteTask( TaskSequence: task.GetTaskSequence(), GlobalAttributeTableConfig: prep.Info.GlobalAttributeConfig, UpdateGlobalAttributes: resp.WriteToGlobalAttributes, + LocalAttributeConfig: prep.Info.LocalAttributeConfig, + UpdateLocalAttributes: resp.WriteToLocalAttributes, }) if err != nil { return err @@ -595,3 +633,35 @@ func (w *immediateTaskConcurrentProcessor) loadGlobalAttributesIfNeeded( return resp, err } + +func (w *immediateTaskConcurrentProcessor) loadLocalAttributesIfNeeded( + ctx context.Context, prep data_models.PrepareStateExecutionResponse, task data_models.ImmediateTask, +) (*data_models.LoadLocalAttributesResponse, error) { + if prep.Info.StateConfig == nil || + prep.Info.StateConfig.LoadLocalAttributesRequest == nil { + return &data_models.LoadLocalAttributesResponse{}, nil + } + + if prep.Info.LocalAttributeConfig == nil { + return &data_models.LoadLocalAttributesResponse{}, + fmt.Errorf("local attribute config is not available") + } + + w.logger.Debug("loading local attributes for state execute", + tag.StateExecutionId(task.GetStateExecutionId()), + tag.JsonValue(prep.Info.StateConfig), + tag.JsonValue(prep.Info.LocalAttributeConfig)) + + resp, err := w.store.LoadLocalAttributes(ctx, data_models.LoadLocalAttributesRequest{ + ProcessExecutionId: task.ProcessExecutionId, + AllLocalAttributeKeys: *prep.Info.LocalAttributeConfig, + Request: *prep.Info.StateConfig.LoadLocalAttributesRequest, + }) + + w.logger.Debug("loaded local attributes for state execute", + tag.StateExecutionId(task.GetStateExecutionId()), + tag.JsonValue(resp), + tag.Error(err)) + + return resp, err +} diff --git a/extensions/data_models_row.go b/extensions/data_models_row.go index ccc6ba19..6dbad0b8 100644 --- a/extensions/data_models_row.go +++ b/extensions/data_models_row.go @@ -4,9 +4,10 @@ package extensions import ( - "github.com/xcherryio/xcherry/persistence/data_models" "time" + "github.com/xcherryio/xcherry/persistence/data_models" + "github.com/jmoiron/sqlx/types" "github.com/xcherryio/xcherry/common/uuid" ) @@ -250,4 +251,11 @@ type ( CustomTableRowSelect struct { ColumnToValue map[string]string } + + LocalAttributeRow struct { + ProcessExecutionId uuid.UUID + ProcessExecutionIdString string + Key string + Value types.JSONText + } ) diff --git a/extensions/postgres/non_transactional.go b/extensions/postgres/non_transactional.go index 66ac64df..a4cba31a 100644 --- a/extensions/postgres/non_transactional.go +++ b/extensions/postgres/non_transactional.go @@ -167,3 +167,21 @@ func (d dbSession) SelectCustomTableByPK( ColumnToValue: columnToValue, }, nil } + +const selectLocalAttributesQuery = `SELECT +process_execution_id, key, value +FROM xcherry_sys_local_attributes WHERE process_execution_id = ? AND key IN (?) +` + +func (d dbSession) SelectLocalAttributes( + ctx context.Context, processExecutionId uuid.UUID, keys []string, +) ([]extensions.LocalAttributeRow, error) { + var rows []extensions.LocalAttributeRow + query, args, err := sqlx.In(selectLocalAttributesQuery, processExecutionId.String(), keys) + if err != nil { + return nil, err + } + query = d.db.Rebind(query) + err = d.db.SelectContext(ctx, &rows, query, args...) + return rows, err +} diff --git a/extensions/postgres/schema/xcherry_sys_schema.sql b/extensions/postgres/schema/xcherry_sys_schema.sql index 4ccf1a1f..e5fe43db 100644 --- a/extensions/postgres/schema/xcherry_sys_schema.sql +++ b/extensions/postgres/schema/xcherry_sys_schema.sql @@ -72,4 +72,11 @@ CREATE TABLE xcherry_sys_local_queue_messages( queue_name VARCHAR(31), payload jsonb, PRIMARY KEY (process_execution_id, dedup_id) -); \ No newline at end of file +); + +CREATE TABLE xcherry_sys_local_attributes( + process_execution_id uuid NOT NULL, + key VARCHAR(31) NOT NULL, + value jsonb, + PRIMARY KEY (process_execution_id, key) +); diff --git a/extensions/postgres/transactional.go b/extensions/postgres/transactional.go index ec62b7d2..9d00d5bf 100644 --- a/extensions/postgres/transactional.go +++ b/extensions/postgres/transactional.go @@ -335,3 +335,26 @@ func (d dbTx) UpsertCustomTableByPK( ON CONFLICT (`+pkName+`) DO `+updateClause) return err } + +const insertLocalAttributeQuery = `INSERT INTO xcherry_sys_local_attributes + (process_execution_id, key, value) + VALUES (:process_execution_id_string, :key, :value) +` + +func (d dbTx) InsertLocalAttribute(ctx context.Context, row extensions.LocalAttributeRow) error { + row.ProcessExecutionIdString = row.ProcessExecutionId.String() + _, err := d.tx.NamedExecContext(ctx, insertLocalAttributeQuery, row) + return err +} + +const upsertLocalAttributeQuery = `INSERT INTO xcherry_sys_local_attributes +(process_execution_id, key, value) +VALUES (:process_execution_id_string, :key, :value) +ON CONFLICT (process_execution_id, key) DO UPDATE SET value = :value +` + +func (d dbTx) UpsertLocalAttribute(ctx context.Context, row extensions.LocalAttributeRow) error { + row.ProcessExecutionIdString = row.ProcessExecutionId.String() + _, err := d.tx.NamedExecContext(ctx, upsertLocalAttributeQuery, row) + return err +} diff --git a/extensions/sql_db_interfaces.go b/extensions/sql_db_interfaces.go index 5ee34462..305ad168 100644 --- a/extensions/sql_db_interfaces.go +++ b/extensions/sql_db_interfaces.go @@ -76,6 +76,9 @@ type transactionalCRUD interface { UpsertCustomTableByPK( ctx context.Context, tableName string, pkName, pkValue string, colToValue map[string]string, ) error + + InsertLocalAttribute(ctx context.Context, insert LocalAttributeRow) error + UpsertLocalAttribute(ctx context.Context, row LocalAttributeRow) error } type nonTransactionalCRUD interface { @@ -102,6 +105,10 @@ type nonTransactionalCRUD interface { SelectCustomTableByPK( ctx context.Context, tableName string, pkName, pkValue string, columns []string, ) (*CustomTableRowSelect, error) + + SelectLocalAttributes( + ctx context.Context, processExecutionId uuid.UUID, keys []string, + ) ([]LocalAttributeRow, error) } type ErrorChecker interface { diff --git a/go.mod b/go.mod index 4e634335..f336b9d5 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/urfave/cli/v2 v2.25.7 github.com/xcherryio/apis v0.0.2 - github.com/xcherryio/sdk-go v0.0.0-20231121001803-5542cc80706e + github.com/xcherryio/sdk-go v0.0.0-20231130204036-991635cecd77 go.uber.org/multierr v1.10.0 go.uber.org/zap v1.26.0 gopkg.in/yaml.v3 v3.0.1 diff --git a/go.sum b/go.sum index 5460ded4..d8c706cf 100644 --- a/go.sum +++ b/go.sum @@ -87,8 +87,8 @@ github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs= github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ= github.com/xcherryio/apis v0.0.2 h1:sbxlWCrMrjmSQOLYOSHPiKZe+/LvpW0A5A6/PanUqRk= github.com/xcherryio/apis v0.0.2/go.mod h1:7peiYpRUjmq0rl/8F0MmvFH8Vp7Y8Dq5OpRgpH0cMJU= -github.com/xcherryio/sdk-go v0.0.0-20231121001803-5542cc80706e h1:O1kd3ezvfB4TfN4hfKidqHDG/WpybCVXSIQY3FLL27A= -github.com/xcherryio/sdk-go v0.0.0-20231121001803-5542cc80706e/go.mod h1:3f+aG6R1WAurpTaZgemfnDqwIrp4BS3WJYM32ekm28k= +github.com/xcherryio/sdk-go v0.0.0-20231130204036-991635cecd77 h1:3DIl/bLd9X3CjIhuqd8IEKZpwRODeOdSJ31SHqwRoeE= +github.com/xcherryio/sdk-go v0.0.0-20231130204036-991635cecd77/go.mod h1:3f+aG6R1WAurpTaZgemfnDqwIrp4BS3WJYM32ekm28k= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= diff --git a/integTests/all_test.go b/integTests/all_test.go index 1e292ab4..2d79e4c3 100644 --- a/integTests/all_test.go +++ b/integTests/all_test.go @@ -4,6 +4,7 @@ package integTests import ( + "github.com/xcherryio/sdk-go/integTests/local_attribute" "testing" "github.com/xcherryio/sdk-go/integTests/global_attribute" @@ -83,6 +84,10 @@ func TestGlobalAttributesWithMultiTables(t *testing.T) { global_attribute.TestGlobalAttributesWithMultiTables(t, client) } +func TestLocalAttributes(t *testing.T) { + local_attribute.TestLocalAttributes(t, client) +} + func TestProcessTimeoutCase1(t *testing.T) { process_timeout.TestStartTimeoutProcessCase1(t, client) } diff --git a/integTests/init.go b/integTests/init.go index b006243a..8b68dd3f 100644 --- a/integTests/init.go +++ b/integTests/init.go @@ -7,6 +7,7 @@ import ( "github.com/xcherryio/sdk-go/integTests/basic" "github.com/xcherryio/sdk-go/integTests/failure_recovery" "github.com/xcherryio/sdk-go/integTests/global_attribute" + "github.com/xcherryio/sdk-go/integTests/local_attribute" "github.com/xcherryio/sdk-go/integTests/multi_states" "github.com/xcherryio/sdk-go/integTests/process_timeout" "github.com/xcherryio/sdk-go/integTests/state_decision" @@ -28,6 +29,7 @@ func init() { &failure_recovery.StateFailureRecoveryTestExecuteNoWaitUntilProcess{}, &failure_recovery.StateFailureRecoveryTestExecuteFailedAtStartProcess{}, &multi_states.MultiStatesProcess{}, + &local_attribute.LocalAttributeTestProcess{}, &state_decision.GracefulCompleteProcess{}, &state_decision.ForceCompleteProcess{}, &state_decision.ForceFailProcess{}, diff --git a/persistence/data_models/async_state_execution_info_json.go b/persistence/data_models/async_state_execution_info_json.go index d7c3a455..1d692422 100644 --- a/persistence/data_models/async_state_execution_info_json.go +++ b/persistence/data_models/async_state_execution_info_json.go @@ -17,6 +17,7 @@ type AsyncStateExecutionInfoJson struct { RecoverFromStateExecutionId *string `json:"recoverFromStateExecutionId,omitempty"` RecoverFromApi *xcapi.StateApiType `json:"recoverFromApi,omitempty"` GlobalAttributeConfig *InternalGlobalAttributeConfig `json:"globalAttributeConfig"` + LocalAttributeConfig *InternalLocalAttributeConfig `json:"localAttributeConfig"` } func FromStartRequestToStateInfoBytes(req xcapi.ProcessExecutionStartRequest) ([]byte, error) { @@ -27,6 +28,7 @@ func FromStartRequestToStateInfoBytes(req xcapi.ProcessExecutionStartRequest) ([ WorkerURL: req.GetWorkerUrl(), StateConfig: req.StartStateConfig, GlobalAttributeConfig: getInternalGlobalAttributeConfig(req), + LocalAttributeConfig: getInternalLocalAttributeConfig(req), } return infoJson.ToBytes() diff --git a/persistence/data_models/data_models.go b/persistence/data_models/data_models.go index ab24c0e2..5824fe8f 100644 --- a/persistence/data_models/data_models.go +++ b/persistence/data_models/data_models.go @@ -5,10 +5,10 @@ package data_models import ( "fmt" - "github.com/xcherryio/apis/goapi/xcapi" "strconv" "strings" + "github.com/xcherryio/apis/goapi/xcapi" "github.com/xcherryio/xcherry/common/uuid" ) @@ -257,6 +257,9 @@ type ( GlobalAttributeTableConfig *InternalGlobalAttributeConfig UpdateGlobalAttributes []xcapi.GlobalAttributeTableRowUpdate + LocalAttributeConfig *InternalLocalAttributeConfig + UpdateLocalAttributes []xcapi.KeyValue + TaskShardId int32 TaskSequence int64 } @@ -265,6 +268,8 @@ type ( HasNewImmediateTask bool FailAtUpdatingGlobalAttributes bool UpdatingGlobalAttributesError error + FailAtUpdatingLocalAttributes bool + UpdatingLocalAttributesError error } PublishToLocalQueueRequest struct { @@ -322,6 +327,16 @@ type ( FailAtUpdatingGlobalAttributes bool UpdatingGlobalAttributesError error } + + LoadLocalAttributesRequest struct { + ProcessExecutionId uuid.UUID + AllLocalAttributeKeys InternalLocalAttributeConfig + Request xcapi.LoadLocalAttributesRequest + } + + LoadLocalAttributesResponse struct { + Response xcapi.LoadLocalAttributesResponse + } ) func (t ImmediateTask) GetTaskSequence() int64 { diff --git a/persistence/data_models/internal_global_attribute_config.go b/persistence/data_models/internal_global_attribute_config.go index b5fa2368..c6472cd8 100644 --- a/persistence/data_models/internal_global_attribute_config.go +++ b/persistence/data_models/internal_global_attribute_config.go @@ -9,3 +9,16 @@ type InternalGlobalAttributeConfig struct { // key is the table name, value is the primary key name and value TablePrimaryKeys map[string]xcapi.TableColumnValue `json:"tablePrimaryKeys"` } + +func getInternalGlobalAttributeConfig(req xcapi.ProcessExecutionStartRequest) *InternalGlobalAttributeConfig { + if req.ProcessStartConfig != nil && req.ProcessStartConfig.GlobalAttributeConfig != nil { + primaryKeys := map[string]xcapi.TableColumnValue{} + for _, cfg := range req.ProcessStartConfig.GlobalAttributeConfig.TableConfigs { + primaryKeys[cfg.TableName] = cfg.PrimaryKey + } + return &InternalGlobalAttributeConfig{ + TablePrimaryKeys: primaryKeys, + } + } + return nil +} diff --git a/persistence/data_models/internal_local_attribute_config.go b/persistence/data_models/internal_local_attribute_config.go new file mode 100644 index 00000000..65417881 --- /dev/null +++ b/persistence/data_models/internal_local_attribute_config.go @@ -0,0 +1,24 @@ +// Copyright (c) 2023 XDBLab Organization +// SPDX-License-Identifier: BUSL-1.1 + +package data_models + +import "github.com/xcherryio/apis/goapi/xcapi" + +type InternalLocalAttributeConfig struct { + AttributeKeys map[string]bool `json:"attributeKeys"` +} + +func getInternalLocalAttributeConfig(req xcapi.ProcessExecutionStartRequest) *InternalLocalAttributeConfig { + if req.ProcessStartConfig != nil && req.ProcessStartConfig.LocalAttributeConfig != nil { + attributeKeys := map[string]bool{} + keyValues := req.ProcessStartConfig.LocalAttributeConfig.InitialWrite + for i := range keyValues { + attributeKeys[keyValues[i].Key] = true + } + return &InternalLocalAttributeConfig{ + AttributeKeys: attributeKeys, + } + } + return nil +} diff --git a/persistence/data_models/process_execution_info_json.go b/persistence/data_models/process_execution_info_json.go index 68c35f33..af75093e 100644 --- a/persistence/data_models/process_execution_info_json.go +++ b/persistence/data_models/process_execution_info_json.go @@ -12,6 +12,7 @@ type ProcessExecutionInfoJson struct { ProcessType string `json:"processType"` WorkerURL string `json:"workerURL"` GlobalAttributeConfig *InternalGlobalAttributeConfig `json:"globalAttributeConfig"` + LocalAttributeConfig *InternalLocalAttributeConfig `json:"localAttributeConfig"` } func FromStartRequestToProcessInfoBytes(req xcapi.ProcessExecutionStartRequest) ([]byte, error) { @@ -19,6 +20,7 @@ func FromStartRequestToProcessInfoBytes(req xcapi.ProcessExecutionStartRequest) ProcessType: req.GetProcessType(), WorkerURL: req.GetWorkerUrl(), GlobalAttributeConfig: getInternalGlobalAttributeConfig(req), + LocalAttributeConfig: getInternalLocalAttributeConfig(req), } return json.Marshal(info) } diff --git a/persistence/data_models/utils.go b/persistence/data_models/utils.go index ac401428..d0c5a1b9 100644 --- a/persistence/data_models/utils.go +++ b/persistence/data_models/utils.go @@ -8,19 +8,6 @@ import ( "github.com/xcherryio/apis/goapi/xcapi" ) -func getInternalGlobalAttributeConfig(req xcapi.ProcessExecutionStartRequest) *InternalGlobalAttributeConfig { - if req.ProcessStartConfig != nil && req.ProcessStartConfig.GlobalAttributeConfig != nil { - primaryKeys := map[string]xcapi.TableColumnValue{} - for _, cfg := range req.ProcessStartConfig.GlobalAttributeConfig.TableConfigs { - primaryKeys[cfg.TableName] = cfg.PrimaryKey - } - return &InternalGlobalAttributeConfig{ - TablePrimaryKeys: primaryKeys, - } - } - return nil -} - func FromEncodedObjectIntoBytes(obj *xcapi.EncodedObject) ([]byte, error) { if obj == nil { // set this as default for diff --git a/persistence/interfaces.go b/persistence/interfaces.go index e3fa2eed..5747f3a4 100644 --- a/persistence/interfaces.go +++ b/persistence/interfaces.go @@ -64,6 +64,9 @@ type ( ctx context.Context, request data_models.LoadGlobalAttributesRequest, ) (*data_models.LoadGlobalAttributesResponse, error) + LoadLocalAttributes(ctx context.Context, request data_models.LoadLocalAttributesRequest, + ) (*data_models.LoadLocalAttributesResponse, error) + UpdateProcessExecutionForRpc(ctx context.Context, request data_models.UpdateProcessExecutionForRpcRequest) ( *data_models.UpdateProcessExecutionForRpcResponse, error) } diff --git a/persistence/sql/common.go b/persistence/sql/common.go index 457b6352..b6a74132 100644 --- a/persistence/sql/common.go +++ b/persistence/sql/common.go @@ -5,9 +5,10 @@ package sql import ( "context" + "math" + "github.com/xcherryio/apis/goapi/xcapi" "github.com/xcherryio/xcherry/common/uuid" - "math" "github.com/xcherryio/xcherry/common/ptr" "github.com/xcherryio/xcherry/extensions" @@ -73,6 +74,7 @@ type ( ProcessExecutionId uuid.UUID StateDecision xcapi.StateDecision GlobalAttributeTableConfig *data_models.InternalGlobalAttributeConfig + LocalAttributeConfig *data_models.InternalLocalAttributeConfig WorkerUrl string // for ProcessExecutionRowForUpdate @@ -120,6 +122,7 @@ func (p sqlProcessStoreImpl) handleStateDecision(ctx context.Context, tx extensi WorkerURL: request.WorkerUrl, StateConfig: next.StateConfig, GlobalAttributeConfig: request.GlobalAttributeTableConfig, + LocalAttributeConfig: request.LocalAttributeConfig, } stateInfoBytes, err := stateInfo.ToBytes() diff --git a/persistence/sql/complete_execute.go b/persistence/sql/complete_execute.go index 018fdc87..27c95a13 100644 --- a/persistence/sql/complete_execute.go +++ b/persistence/sql/complete_execute.go @@ -6,6 +6,7 @@ package sql import ( "context" "fmt" + "github.com/xcherryio/xcherry/persistence/data_models" "github.com/xcherryio/xcherry/common/log/tag" @@ -59,6 +60,23 @@ func (p sqlProcessStoreImpl) doCompleteExecuteExecutionTx( }, nil } + err = p.updateLocalAttributesIfNeeded( + ctx, + tx, + request.LocalAttributeConfig, + request.ProcessExecutionId, + request.Prepare.Info.Namespace, + request.Prepare.Info.ProcessId, + request.UpdateLocalAttributes) + if err != nil { + p.logger.Error("checkpoint", tag.Error(err)) + //lint:ignore nilerr reason + return &data_models.CompleteExecuteExecutionResponse{ + FailAtUpdatingLocalAttributes: true, + UpdatingLocalAttributesError: err, + }, nil + } + // Step 2: update state info currStateRow := extensions.AsyncStateExecutionRowForUpdateWithoutCommands{ @@ -104,6 +122,7 @@ func (p sqlProcessStoreImpl) doCompleteExecuteExecutionTx( ProcessExecutionId: request.ProcessExecutionId, StateDecision: request.StateDecision, GlobalAttributeTableConfig: request.GlobalAttributeTableConfig, + LocalAttributeConfig: request.LocalAttributeConfig, WorkerUrl: request.Prepare.Info.WorkerURL, ProcessExecutionRowStateExecutionSequenceMaps: &sequenceMaps, diff --git a/persistence/sql/load_local_attributes.go b/persistence/sql/load_local_attributes.go new file mode 100644 index 00000000..da30ac9e --- /dev/null +++ b/persistence/sql/load_local_attributes.go @@ -0,0 +1,52 @@ +// Copyright (c) 2023 XDBLab Organization +// SPDX-License-Identifier: BUSL-1.1 + +package sql + +import ( + "context" + "fmt" + + "github.com/xcherryio/apis/goapi/xcapi" + "github.com/xcherryio/xcherry/common/ptr" + "github.com/xcherryio/xcherry/extensions" + "github.com/xcherryio/xcherry/persistence/data_models" +) + +func (p sqlProcessStoreImpl) LoadLocalAttributes( + ctx context.Context, + request data_models.LoadLocalAttributesRequest, +) (*data_models.LoadLocalAttributesResponse, error) { + if len(request.Request.KeysToLoadWithLock) != 0 && + request.Request.LockingPolicy != ptr.Any(xcapi.NO_LOCKING) { + return nil, fmt.Errorf("locking policy %v is not supported", request.Request.LockingPolicy) + } + + var noLockRows []extensions.LocalAttributeRow + var err error + if len(request.Request.KeysToLoadNoLock) > 0 { + noLockRows, err = p.session.SelectLocalAttributes( + ctx, request.ProcessExecutionId, request.Request.KeysToLoadNoLock) + if err != nil { + return nil, err + } + } + + var attributes []xcapi.KeyValue + for _, row := range noLockRows { + value, err := data_models.BytesToEncodedObject(row.Value) + if err != nil { + return nil, err + } + attributes = append(attributes, xcapi.KeyValue{ + Key: row.Key, + Value: value, + }) + } + + return &data_models.LoadLocalAttributesResponse{ + Response: xcapi.LoadLocalAttributesResponse{ + Attributes: attributes, + }, + }, nil +} diff --git a/persistence/sql/start_process.go b/persistence/sql/start_process.go index 6b09e087..fee1cd8a 100644 --- a/persistence/sql/start_process.go +++ b/persistence/sql/start_process.go @@ -6,9 +6,10 @@ package sql import ( "context" "fmt" + "time" + "github.com/xcherryio/apis/goapi/xcapi" "github.com/xcherryio/xcherry/persistence/data_models" - "time" "github.com/xcherryio/xcherry/common/log/tag" "github.com/xcherryio/xcherry/common/uuid" @@ -58,20 +59,33 @@ func (p sqlProcessStoreImpl) doStartProcessTx( requestIdReusePolicy = *req.ProcessStartConfig.IdReusePolicy } + var resp *data_models.StartProcessResponse + var errStartProcess error switch requestIdReusePolicy { case xcapi.DISALLOW_REUSE: - return p.applyDisallowReusePolicy(ctx, tx, request) + resp, errStartProcess = p.applyDisallowReusePolicy(ctx, tx, request) case xcapi.ALLOW_IF_NO_RUNNING: - return p.applyAllowIfNoRunningPolicy(ctx, tx, request) + resp, errStartProcess = p.applyAllowIfNoRunningPolicy(ctx, tx, request) case xcapi.ALLOW_IF_PREVIOUS_EXIT_ABNORMALLY: - return p.applyAllowIfPreviousExitAbnormallyPolicy(ctx, tx, request) + resp, errStartProcess = p.applyAllowIfPreviousExitAbnormallyPolicy(ctx, tx, request) case xcapi.TERMINATE_IF_RUNNING: - return p.applyTerminateIfRunningPolicy(ctx, tx, request) + resp, errStartProcess = p.applyTerminateIfRunningPolicy(ctx, tx, request) default: return nil, fmt.Errorf( "unknown id reuse policy %v", req.ProcessStartConfig.IdReusePolicy) } + + if errStartProcess != nil { + return nil, errStartProcess + } + + err = p.handleInitialLocalAttributesWrite(ctx, tx, req, *resp) + if err != nil { + return nil, err + } + + return resp, nil } func (p sqlProcessStoreImpl) applyDisallowReusePolicy( diff --git a/persistence/sql/write_local_attributes.go b/persistence/sql/write_local_attributes.go new file mode 100644 index 00000000..c0b576d5 --- /dev/null +++ b/persistence/sql/write_local_attributes.go @@ -0,0 +1,72 @@ +// Copyright (c) 2023 XDBLab Organization +// SPDX-License-Identifier: BUSL-1.1 + +package sql + +import ( + "context" + + "github.com/xcherryio/apis/goapi/xcapi" + "github.com/xcherryio/xcherry/common/log/tag" + "github.com/xcherryio/xcherry/common/uuid" + "github.com/xcherryio/xcherry/extensions" + "github.com/xcherryio/xcherry/persistence/data_models" +) + +func (p sqlProcessStoreImpl) handleInitialLocalAttributesWrite( + ctx context.Context, + tx extensions.SQLTransaction, + req xcapi.ProcessExecutionStartRequest, + resp data_models.StartProcessResponse, +) error { + if req.ProcessStartConfig == nil || req.ProcessStartConfig.LocalAttributeConfig == nil || + len(req.ProcessStartConfig.LocalAttributeConfig.InitialWrite) == 0 { + return nil + } + + attributes := req.ProcessStartConfig.LocalAttributeConfig.InitialWrite + for i := range attributes { + valueBytes, err := data_models.FromEncodedObjectIntoBytes(&attributes[i].Value) + if err != nil { + return err + } + row := extensions.LocalAttributeRow{ + ProcessExecutionId: resp.ProcessExecutionId, + Key: attributes[i].Key, + Value: valueBytes, + } + + err = tx.InsertLocalAttribute(ctx, row) + if err != nil { + p.logger.Error("error on inserting local attribute", tag.Error(err)) + return err + } + } + + return nil +} + +func (p sqlProcessStoreImpl) updateLocalAttributesIfNeeded( + ctx context.Context, tx extensions.SQLTransaction, config *data_models.InternalLocalAttributeConfig, + processExecutionId uuid.UUID, namespace string, processId string, + localAttributeToUpdate []xcapi.KeyValue, +) error { + if len(localAttributeToUpdate) > 0 { + for _, kv := range localAttributeToUpdate { + valueBytes, err := data_models.FromEncodedObjectIntoBytes(&kv.Value) + if err != nil { + return err + } + row := extensions.LocalAttributeRow{ + ProcessExecutionId: processExecutionId, + Key: kv.Key, + Value: valueBytes, + } + err = tx.UpsertLocalAttribute(ctx, row) + if err != nil { + return err + } + } + } + return nil +} diff --git a/service/api/service_impl.go b/service/api/service_impl.go index 80eb9e3d..28844218 100644 --- a/service/api/service_impl.go +++ b/service/api/service_impl.go @@ -5,13 +5,14 @@ package api import ( "context" + "net/http" + "time" + "github.com/xcherryio/apis/goapi/xcapi" "github.com/xcherryio/xcherry/common/decision" "github.com/xcherryio/xcherry/common/httperror" "github.com/xcherryio/xcherry/common/urlautofix" "github.com/xcherryio/xcherry/persistence/data_models" - "net/http" - "time" "github.com/xcherryio/xcherry/common/log" "github.com/xcherryio/xcherry/common/log/tag" @@ -65,6 +66,7 @@ func (s serviceImpl) StartProcess( http.StatusFailedDependency, "Failed to write global attributes, please check the error message for details: "+resp.GlobalAttributeWriteError.Error()) } + if resp.HasNewImmediateTask { s.notifyRemoteImmediateTaskAsync(ctx, xcapi.NotifyImmediateTasksRequest{ ShardId: persistence.DefaultShardId,