Skip to content

Commit

Permalink
local attrbiutes (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
duoertai authored Nov 30, 2023
1 parent d1c0df9 commit 9b9ccac
Show file tree
Hide file tree
Showing 23 changed files with 402 additions and 54 deletions.
124 changes: 97 additions & 27 deletions engine/immediate_task_conrrent_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ package engine
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/xcherryio/apis/goapi/xcapi"
"github.com/xcherryio/xcherry/common/decision"
"github.com/xcherryio/xcherry/common/httperror"
"github.com/xcherryio/xcherry/persistence/data_models"
"io/ioutil"
"net/http"
"time"

"github.com/xcherryio/xcherry/common/log"
"github.com/xcherryio/xcherry/common/log/tag"
Expand Down Expand Up @@ -354,34 +355,69 @@ func (w *immediateTaskConcurrentProcessor) processExecuteTask(
var resp *xcapi.AsyncStateExecuteResponse
var httpResp *http.Response
loadedGlobalAttributesResp, errToCheck := w.loadGlobalAttributesIfNeeded(ctx, prep, task)
if errToCheck == nil {
req := apiClient.DefaultAPI.ApiV1XcherryWorkerAsyncStateExecutePost(ctx)
resp, httpResp, errToCheck = req.AsyncStateExecuteRequest(
xcapi.AsyncStateExecuteRequest{
Context: createApiContext(
prep,
task,
prep.Info.RecoverFromStateExecutionId,
prep.Info.RecoverFromApi),
ProcessType: prep.Info.ProcessType,
StateId: task.StateId,
StateInput: &xcapi.EncodedObject{
Encoding: prep.Input.Encoding,
Data: prep.Input.Data,
},
CommandResults: &prep.WaitUntilCommandResults,
LoadedGlobalAttributes: &loadedGlobalAttributesResp.Response,
},
).Execute()
if httpResp != nil {
defer httpResp.Body.Close()
}
if errToCheck != nil {
if httperror.CheckHttpResponseAndError(errToCheck, httpResp, w.logger) {
status, details := w.composeHttpError(errToCheck, httpResp, prep.Info, task)

if errToCheck == nil {
errToCheck = decision.ValidateDecision(resp.StateDecision)
nextIntervalSecs, shouldRetry := w.checkRetry(task, prep.Info)
if shouldRetry {
return w.retryTask(ctx, task, prep, nextIntervalSecs, status, details)
}
return w.applyStateFailureRecoveryPolicy(ctx,
task,
prep,
status,
details,
task.ImmediateTaskInfo.WorkerTaskBackoffInfo.CompletedAttempts,
xcapi.EXECUTE_API)
}
}
loadedLocalAttributesResp, errToCheck := w.loadLocalAttributesIfNeeded(ctx, prep, task)
if errToCheck != nil {
if httperror.CheckHttpResponseAndError(errToCheck, httpResp, w.logger) {
status, details := w.composeHttpError(errToCheck, httpResp, prep.Info, task)

nextIntervalSecs, shouldRetry := w.checkRetry(task, prep.Info)
if shouldRetry {
return w.retryTask(ctx, task, prep, nextIntervalSecs, status, details)
}
return w.applyStateFailureRecoveryPolicy(ctx,
task,
prep,
status,
details,
task.ImmediateTaskInfo.WorkerTaskBackoffInfo.CompletedAttempts,
xcapi.EXECUTE_API)
}
}

req := apiClient.DefaultAPI.ApiV1XcherryWorkerAsyncStateExecutePost(ctx)
resp, httpResp, errToCheck = req.AsyncStateExecuteRequest(
xcapi.AsyncStateExecuteRequest{
Context: createApiContext(
prep,
task,
prep.Info.RecoverFromStateExecutionId,
prep.Info.RecoverFromApi),
ProcessType: prep.Info.ProcessType,
StateId: task.StateId,
StateInput: &xcapi.EncodedObject{
Encoding: prep.Input.Encoding,
Data: prep.Input.Data,
},
CommandResults: &prep.WaitUntilCommandResults,
LoadedGlobalAttributes: &loadedGlobalAttributesResp.Response,
LoadedLocalAttributes: &loadedLocalAttributesResp.Response,
},
).Execute()
if httpResp != nil {
defer httpResp.Body.Close()
}

if errToCheck == nil {
errToCheck = decision.ValidateDecision(resp.StateDecision)
}

if httperror.CheckHttpResponseAndError(errToCheck, httpResp, w.logger) {
status, details := w.composeHttpError(errToCheck, httpResp, prep.Info, task)

Expand Down Expand Up @@ -411,6 +447,8 @@ func (w *immediateTaskConcurrentProcessor) processExecuteTask(
TaskSequence: task.GetTaskSequence(),
GlobalAttributeTableConfig: prep.Info.GlobalAttributeConfig,
UpdateGlobalAttributes: resp.WriteToGlobalAttributes,
LocalAttributeConfig: prep.Info.LocalAttributeConfig,
UpdateLocalAttributes: resp.WriteToLocalAttributes,
})
if err != nil {
return err
Expand Down Expand Up @@ -595,3 +633,35 @@ func (w *immediateTaskConcurrentProcessor) loadGlobalAttributesIfNeeded(

return resp, err
}

func (w *immediateTaskConcurrentProcessor) loadLocalAttributesIfNeeded(
ctx context.Context, prep data_models.PrepareStateExecutionResponse, task data_models.ImmediateTask,
) (*data_models.LoadLocalAttributesResponse, error) {
if prep.Info.StateConfig == nil ||
prep.Info.StateConfig.LoadLocalAttributesRequest == nil {
return &data_models.LoadLocalAttributesResponse{}, nil
}

if prep.Info.LocalAttributeConfig == nil {
return &data_models.LoadLocalAttributesResponse{},
fmt.Errorf("local attribute config is not available")
}

w.logger.Debug("loading local attributes for state execute",
tag.StateExecutionId(task.GetStateExecutionId()),
tag.JsonValue(prep.Info.StateConfig),
tag.JsonValue(prep.Info.LocalAttributeConfig))

resp, err := w.store.LoadLocalAttributes(ctx, data_models.LoadLocalAttributesRequest{
ProcessExecutionId: task.ProcessExecutionId,
AllLocalAttributeKeys: *prep.Info.LocalAttributeConfig,
Request: *prep.Info.StateConfig.LoadLocalAttributesRequest,
})

w.logger.Debug("loaded local attributes for state execute",
tag.StateExecutionId(task.GetStateExecutionId()),
tag.JsonValue(resp),
tag.Error(err))

return resp, err
}
10 changes: 9 additions & 1 deletion extensions/data_models_row.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
package extensions

import (
"github.com/xcherryio/xcherry/persistence/data_models"
"time"

"github.com/xcherryio/xcherry/persistence/data_models"

"github.com/jmoiron/sqlx/types"
"github.com/xcherryio/xcherry/common/uuid"
)
Expand Down Expand Up @@ -250,4 +251,11 @@ type (
CustomTableRowSelect struct {
ColumnToValue map[string]string
}

LocalAttributeRow struct {
ProcessExecutionId uuid.UUID
ProcessExecutionIdString string
Key string
Value types.JSONText
}
)
18 changes: 18 additions & 0 deletions extensions/postgres/non_transactional.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,21 @@ func (d dbSession) SelectCustomTableByPK(
ColumnToValue: columnToValue,
}, nil
}

const selectLocalAttributesQuery = `SELECT
process_execution_id, key, value
FROM xcherry_sys_local_attributes WHERE process_execution_id = ? AND key IN (?)
`

func (d dbSession) SelectLocalAttributes(
ctx context.Context, processExecutionId uuid.UUID, keys []string,
) ([]extensions.LocalAttributeRow, error) {
var rows []extensions.LocalAttributeRow
query, args, err := sqlx.In(selectLocalAttributesQuery, processExecutionId.String(), keys)
if err != nil {
return nil, err
}
query = d.db.Rebind(query)
err = d.db.SelectContext(ctx, &rows, query, args...)
return rows, err
}
9 changes: 8 additions & 1 deletion extensions/postgres/schema/xcherry_sys_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,11 @@ CREATE TABLE xcherry_sys_local_queue_messages(
queue_name VARCHAR(31),
payload jsonb,
PRIMARY KEY (process_execution_id, dedup_id)
);
);

CREATE TABLE xcherry_sys_local_attributes(
process_execution_id uuid NOT NULL,
key VARCHAR(31) NOT NULL,
value jsonb,
PRIMARY KEY (process_execution_id, key)
);
23 changes: 23 additions & 0 deletions extensions/postgres/transactional.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,3 +335,26 @@ func (d dbTx) UpsertCustomTableByPK(
ON CONFLICT (`+pkName+`) DO `+updateClause)
return err
}

const insertLocalAttributeQuery = `INSERT INTO xcherry_sys_local_attributes
(process_execution_id, key, value)
VALUES (:process_execution_id_string, :key, :value)
`

func (d dbTx) InsertLocalAttribute(ctx context.Context, row extensions.LocalAttributeRow) error {
row.ProcessExecutionIdString = row.ProcessExecutionId.String()
_, err := d.tx.NamedExecContext(ctx, insertLocalAttributeQuery, row)
return err
}

const upsertLocalAttributeQuery = `INSERT INTO xcherry_sys_local_attributes
(process_execution_id, key, value)
VALUES (:process_execution_id_string, :key, :value)
ON CONFLICT (process_execution_id, key) DO UPDATE SET value = :value
`

func (d dbTx) UpsertLocalAttribute(ctx context.Context, row extensions.LocalAttributeRow) error {
row.ProcessExecutionIdString = row.ProcessExecutionId.String()
_, err := d.tx.NamedExecContext(ctx, upsertLocalAttributeQuery, row)
return err
}
7 changes: 7 additions & 0 deletions extensions/sql_db_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type transactionalCRUD interface {
UpsertCustomTableByPK(
ctx context.Context, tableName string, pkName, pkValue string, colToValue map[string]string,
) error

InsertLocalAttribute(ctx context.Context, insert LocalAttributeRow) error
UpsertLocalAttribute(ctx context.Context, row LocalAttributeRow) error
}

type nonTransactionalCRUD interface {
Expand All @@ -102,6 +105,10 @@ type nonTransactionalCRUD interface {
SelectCustomTableByPK(
ctx context.Context, tableName string, pkName, pkValue string, columns []string,
) (*CustomTableRowSelect, error)

SelectLocalAttributes(
ctx context.Context, processExecutionId uuid.UUID, keys []string,
) ([]LocalAttributeRow, error)
}

type ErrorChecker interface {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/urfave/cli/v2 v2.25.7
github.com/xcherryio/apis v0.0.2
github.com/xcherryio/sdk-go v0.0.0-20231121001803-5542cc80706e
github.com/xcherryio/sdk-go v0.0.0-20231130204036-991635cecd77
go.uber.org/multierr v1.10.0
go.uber.org/zap v1.26.0
gopkg.in/yaml.v3 v3.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs=
github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
github.com/xcherryio/apis v0.0.2 h1:sbxlWCrMrjmSQOLYOSHPiKZe+/LvpW0A5A6/PanUqRk=
github.com/xcherryio/apis v0.0.2/go.mod h1:7peiYpRUjmq0rl/8F0MmvFH8Vp7Y8Dq5OpRgpH0cMJU=
github.com/xcherryio/sdk-go v0.0.0-20231121001803-5542cc80706e h1:O1kd3ezvfB4TfN4hfKidqHDG/WpybCVXSIQY3FLL27A=
github.com/xcherryio/sdk-go v0.0.0-20231121001803-5542cc80706e/go.mod h1:3f+aG6R1WAurpTaZgemfnDqwIrp4BS3WJYM32ekm28k=
github.com/xcherryio/sdk-go v0.0.0-20231130204036-991635cecd77 h1:3DIl/bLd9X3CjIhuqd8IEKZpwRODeOdSJ31SHqwRoeE=
github.com/xcherryio/sdk-go v0.0.0-20231130204036-991635cecd77/go.mod h1:3f+aG6R1WAurpTaZgemfnDqwIrp4BS3WJYM32ekm28k=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
Expand Down
5 changes: 5 additions & 0 deletions integTests/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package integTests

import (
"github.com/xcherryio/sdk-go/integTests/local_attribute"
"testing"

"github.com/xcherryio/sdk-go/integTests/global_attribute"
Expand Down Expand Up @@ -83,6 +84,10 @@ func TestGlobalAttributesWithMultiTables(t *testing.T) {
global_attribute.TestGlobalAttributesWithMultiTables(t, client)
}

func TestLocalAttributes(t *testing.T) {
local_attribute.TestLocalAttributes(t, client)
}

func TestProcessTimeoutCase1(t *testing.T) {
process_timeout.TestStartTimeoutProcessCase1(t, client)
}
Expand Down
2 changes: 2 additions & 0 deletions integTests/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/xcherryio/sdk-go/integTests/basic"
"github.com/xcherryio/sdk-go/integTests/failure_recovery"
"github.com/xcherryio/sdk-go/integTests/global_attribute"
"github.com/xcherryio/sdk-go/integTests/local_attribute"
"github.com/xcherryio/sdk-go/integTests/multi_states"
"github.com/xcherryio/sdk-go/integTests/process_timeout"
"github.com/xcherryio/sdk-go/integTests/state_decision"
Expand All @@ -28,6 +29,7 @@ func init() {
&failure_recovery.StateFailureRecoveryTestExecuteNoWaitUntilProcess{},
&failure_recovery.StateFailureRecoveryTestExecuteFailedAtStartProcess{},
&multi_states.MultiStatesProcess{},
&local_attribute.LocalAttributeTestProcess{},
&state_decision.GracefulCompleteProcess{},
&state_decision.ForceCompleteProcess{},
&state_decision.ForceFailProcess{},
Expand Down
2 changes: 2 additions & 0 deletions persistence/data_models/async_state_execution_info_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type AsyncStateExecutionInfoJson struct {
RecoverFromStateExecutionId *string `json:"recoverFromStateExecutionId,omitempty"`
RecoverFromApi *xcapi.StateApiType `json:"recoverFromApi,omitempty"`
GlobalAttributeConfig *InternalGlobalAttributeConfig `json:"globalAttributeConfig"`
LocalAttributeConfig *InternalLocalAttributeConfig `json:"localAttributeConfig"`
}

func FromStartRequestToStateInfoBytes(req xcapi.ProcessExecutionStartRequest) ([]byte, error) {
Expand All @@ -27,6 +28,7 @@ func FromStartRequestToStateInfoBytes(req xcapi.ProcessExecutionStartRequest) ([
WorkerURL: req.GetWorkerUrl(),
StateConfig: req.StartStateConfig,
GlobalAttributeConfig: getInternalGlobalAttributeConfig(req),
LocalAttributeConfig: getInternalLocalAttributeConfig(req),
}

return infoJson.ToBytes()
Expand Down
17 changes: 16 additions & 1 deletion persistence/data_models/data_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ package data_models

import (
"fmt"
"github.com/xcherryio/apis/goapi/xcapi"
"strconv"
"strings"

"github.com/xcherryio/apis/goapi/xcapi"
"github.com/xcherryio/xcherry/common/uuid"
)

Expand Down Expand Up @@ -257,6 +257,9 @@ type (
GlobalAttributeTableConfig *InternalGlobalAttributeConfig
UpdateGlobalAttributes []xcapi.GlobalAttributeTableRowUpdate

LocalAttributeConfig *InternalLocalAttributeConfig
UpdateLocalAttributes []xcapi.KeyValue

TaskShardId int32
TaskSequence int64
}
Expand All @@ -265,6 +268,8 @@ type (
HasNewImmediateTask bool
FailAtUpdatingGlobalAttributes bool
UpdatingGlobalAttributesError error
FailAtUpdatingLocalAttributes bool
UpdatingLocalAttributesError error
}

PublishToLocalQueueRequest struct {
Expand Down Expand Up @@ -322,6 +327,16 @@ type (
FailAtUpdatingGlobalAttributes bool
UpdatingGlobalAttributesError error
}

LoadLocalAttributesRequest struct {
ProcessExecutionId uuid.UUID
AllLocalAttributeKeys InternalLocalAttributeConfig
Request xcapi.LoadLocalAttributesRequest
}

LoadLocalAttributesResponse struct {
Response xcapi.LoadLocalAttributesResponse
}
)

func (t ImmediateTask) GetTaskSequence() int64 {
Expand Down
Loading

0 comments on commit 9b9ccac

Please sign in to comment.