Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

local attrbiutes #102

Merged
merged 7 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 97 additions & 27 deletions engine/immediate_task_conrrent_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ package engine
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/xcherryio/apis/goapi/xcapi"
"github.com/xcherryio/xcherry/common/decision"
"github.com/xcherryio/xcherry/common/httperror"
"github.com/xcherryio/xcherry/persistence/data_models"
"io/ioutil"
"net/http"
"time"

"github.com/xcherryio/xcherry/common/log"
"github.com/xcherryio/xcherry/common/log/tag"
Expand Down Expand Up @@ -354,34 +355,69 @@ func (w *immediateTaskConcurrentProcessor) processExecuteTask(
var resp *xcapi.AsyncStateExecuteResponse
var httpResp *http.Response
loadedGlobalAttributesResp, errToCheck := w.loadGlobalAttributesIfNeeded(ctx, prep, task)
if errToCheck == nil {
req := apiClient.DefaultAPI.ApiV1XcherryWorkerAsyncStateExecutePost(ctx)
resp, httpResp, errToCheck = req.AsyncStateExecuteRequest(
xcapi.AsyncStateExecuteRequest{
Context: createApiContext(
prep,
task,
prep.Info.RecoverFromStateExecutionId,
prep.Info.RecoverFromApi),
ProcessType: prep.Info.ProcessType,
StateId: task.StateId,
StateInput: &xcapi.EncodedObject{
Encoding: prep.Input.Encoding,
Data: prep.Input.Data,
},
CommandResults: &prep.WaitUntilCommandResults,
LoadedGlobalAttributes: &loadedGlobalAttributesResp.Response,
},
).Execute()
if httpResp != nil {
defer httpResp.Body.Close()
}
if errToCheck != nil {
if httperror.CheckHttpResponseAndError(errToCheck, httpResp, w.logger) {
status, details := w.composeHttpError(errToCheck, httpResp, prep.Info, task)

if errToCheck == nil {
errToCheck = decision.ValidateDecision(resp.StateDecision)
nextIntervalSecs, shouldRetry := w.checkRetry(task, prep.Info)
if shouldRetry {
return w.retryTask(ctx, task, prep, nextIntervalSecs, status, details)
}
return w.applyStateFailureRecoveryPolicy(ctx,
task,
prep,
status,
details,
task.ImmediateTaskInfo.WorkerTaskBackoffInfo.CompletedAttempts,
xcapi.EXECUTE_API)
}
}
loadedLocalAttributesResp, errToCheck := w.loadLocalAttributesIfNeeded(ctx, prep, task)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's okay to use separate txn to load for now. but fyi, we will need to evolve to this later:
#93
and
#103

if errToCheck != nil {
if httperror.CheckHttpResponseAndError(errToCheck, httpResp, w.logger) {
status, details := w.composeHttpError(errToCheck, httpResp, prep.Info, task)

nextIntervalSecs, shouldRetry := w.checkRetry(task, prep.Info)
if shouldRetry {
return w.retryTask(ctx, task, prep, nextIntervalSecs, status, details)
}
return w.applyStateFailureRecoveryPolicy(ctx,
task,
prep,
status,
details,
task.ImmediateTaskInfo.WorkerTaskBackoffInfo.CompletedAttempts,
xcapi.EXECUTE_API)
}
}

req := apiClient.DefaultAPI.ApiV1XcherryWorkerAsyncStateExecutePost(ctx)
resp, httpResp, errToCheck = req.AsyncStateExecuteRequest(
xcapi.AsyncStateExecuteRequest{
Context: createApiContext(
prep,
task,
prep.Info.RecoverFromStateExecutionId,
prep.Info.RecoverFromApi),
ProcessType: prep.Info.ProcessType,
StateId: task.StateId,
StateInput: &xcapi.EncodedObject{
Encoding: prep.Input.Encoding,
Data: prep.Input.Data,
},
CommandResults: &prep.WaitUntilCommandResults,
LoadedGlobalAttributes: &loadedGlobalAttributesResp.Response,
LoadedLocalAttributes: &loadedLocalAttributesResp.Response,
},
).Execute()
if httpResp != nil {
defer httpResp.Body.Close()
}

if errToCheck == nil {
errToCheck = decision.ValidateDecision(resp.StateDecision)
}

if httperror.CheckHttpResponseAndError(errToCheck, httpResp, w.logger) {
status, details := w.composeHttpError(errToCheck, httpResp, prep.Info, task)

Expand Down Expand Up @@ -411,6 +447,8 @@ func (w *immediateTaskConcurrentProcessor) processExecuteTask(
TaskSequence: task.GetTaskSequence(),
GlobalAttributeTableConfig: prep.Info.GlobalAttributeConfig,
UpdateGlobalAttributes: resp.WriteToGlobalAttributes,
LocalAttributeConfig: prep.Info.LocalAttributeConfig,
UpdateLocalAttributes: resp.WriteToLocalAttributes,
})
if err != nil {
return err
Expand Down Expand Up @@ -595,3 +633,35 @@ func (w *immediateTaskConcurrentProcessor) loadGlobalAttributesIfNeeded(

return resp, err
}

func (w *immediateTaskConcurrentProcessor) loadLocalAttributesIfNeeded(
ctx context.Context, prep data_models.PrepareStateExecutionResponse, task data_models.ImmediateTask,
) (*data_models.LoadLocalAttributesResponse, error) {
if prep.Info.StateConfig == nil ||
prep.Info.StateConfig.LoadLocalAttributesRequest == nil {
return &data_models.LoadLocalAttributesResponse{}, nil
}

if prep.Info.LocalAttributeConfig == nil {
return &data_models.LoadLocalAttributesResponse{},
fmt.Errorf("local attribute config is not available")
}

w.logger.Debug("loading local attributes for state execute",
tag.StateExecutionId(task.GetStateExecutionId()),
tag.JsonValue(prep.Info.StateConfig),
tag.JsonValue(prep.Info.LocalAttributeConfig))

resp, err := w.store.LoadLocalAttributes(ctx, data_models.LoadLocalAttributesRequest{
ProcessExecutionId: task.ProcessExecutionId,
AllLocalAttributeKeys: *prep.Info.LocalAttributeConfig,
Request: *prep.Info.StateConfig.LoadLocalAttributesRequest,
})

w.logger.Debug("loaded local attributes for state execute",
tag.StateExecutionId(task.GetStateExecutionId()),
tag.JsonValue(resp),
tag.Error(err))

return resp, err
}
10 changes: 9 additions & 1 deletion extensions/data_models_row.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
package extensions

import (
"github.com/xcherryio/xcherry/persistence/data_models"
"time"

"github.com/xcherryio/xcherry/persistence/data_models"

"github.com/jmoiron/sqlx/types"
"github.com/xcherryio/xcherry/common/uuid"
)
Expand Down Expand Up @@ -250,4 +251,11 @@ type (
CustomTableRowSelect struct {
ColumnToValue map[string]string
}

LocalAttributeRow struct {
ProcessExecutionId uuid.UUID
ProcessExecutionIdString string
Key string
Value types.JSONText
}
)
18 changes: 18 additions & 0 deletions extensions/postgres/non_transactional.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,21 @@ func (d dbSession) SelectCustomTableByPK(
ColumnToValue: columnToValue,
}, nil
}

const selectLocalAttributesQuery = `SELECT
process_execution_id, key, value
FROM xcherry_sys_local_attributes WHERE process_execution_id = ? AND key IN (?)
`

func (d dbSession) SelectLocalAttributes(
ctx context.Context, processExecutionId uuid.UUID, keys []string,
) ([]extensions.LocalAttributeRow, error) {
var rows []extensions.LocalAttributeRow
query, args, err := sqlx.In(selectLocalAttributesQuery, processExecutionId.String(), keys)
if err != nil {
return nil, err
}
query = d.db.Rebind(query)
err = d.db.SelectContext(ctx, &rows, query, args...)
return rows, err
}
9 changes: 8 additions & 1 deletion extensions/postgres/schema/xcherry_sys_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,11 @@ CREATE TABLE xcherry_sys_local_queue_messages(
queue_name VARCHAR(31),
payload jsonb,
PRIMARY KEY (process_execution_id, dedup_id)
);
);

CREATE TABLE xcherry_sys_local_attributes(
process_execution_id uuid NOT NULL,
key VARCHAR(31) NOT NULL,
value jsonb,
PRIMARY KEY (process_execution_id, key)
);
23 changes: 23 additions & 0 deletions extensions/postgres/transactional.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,3 +335,26 @@ func (d dbTx) UpsertCustomTableByPK(
ON CONFLICT (`+pkName+`) DO `+updateClause)
return err
}

const insertLocalAttributeQuery = `INSERT INTO xcherry_sys_local_attributes
(process_execution_id, key, value)
VALUES (:process_execution_id_string, :key, :value)
`

func (d dbTx) InsertLocalAttribute(ctx context.Context, row extensions.LocalAttributeRow) error {
row.ProcessExecutionIdString = row.ProcessExecutionId.String()
_, err := d.tx.NamedExecContext(ctx, insertLocalAttributeQuery, row)
return err
}

const upsertLocalAttributeQuery = `INSERT INTO xcherry_sys_local_attributes
(process_execution_id, key, value)
VALUES (:process_execution_id_string, :key, :value)
ON CONFLICT (process_execution_id, key) DO UPDATE SET value = :value
`

func (d dbTx) UpsertLocalAttribute(ctx context.Context, row extensions.LocalAttributeRow) error {
row.ProcessExecutionIdString = row.ProcessExecutionId.String()
_, err := d.tx.NamedExecContext(ctx, upsertLocalAttributeQuery, row)
return err
}
7 changes: 7 additions & 0 deletions extensions/sql_db_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type transactionalCRUD interface {
UpsertCustomTableByPK(
ctx context.Context, tableName string, pkName, pkValue string, colToValue map[string]string,
) error

InsertLocalAttribute(ctx context.Context, insert LocalAttributeRow) error
UpsertLocalAttribute(ctx context.Context, row LocalAttributeRow) error
}

type nonTransactionalCRUD interface {
Expand All @@ -102,6 +105,10 @@ type nonTransactionalCRUD interface {
SelectCustomTableByPK(
ctx context.Context, tableName string, pkName, pkValue string, columns []string,
) (*CustomTableRowSelect, error)

SelectLocalAttributes(
ctx context.Context, processExecutionId uuid.UUID, keys []string,
) ([]LocalAttributeRow, error)
}

type ErrorChecker interface {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/urfave/cli/v2 v2.25.7
github.com/xcherryio/apis v0.0.2
github.com/xcherryio/sdk-go v0.0.0-20231121001803-5542cc80706e
github.com/xcherryio/sdk-go v0.0.0-20231130204036-991635cecd77
go.uber.org/multierr v1.10.0
go.uber.org/zap v1.26.0
gopkg.in/yaml.v3 v3.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs=
github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
github.com/xcherryio/apis v0.0.2 h1:sbxlWCrMrjmSQOLYOSHPiKZe+/LvpW0A5A6/PanUqRk=
github.com/xcherryio/apis v0.0.2/go.mod h1:7peiYpRUjmq0rl/8F0MmvFH8Vp7Y8Dq5OpRgpH0cMJU=
github.com/xcherryio/sdk-go v0.0.0-20231121001803-5542cc80706e h1:O1kd3ezvfB4TfN4hfKidqHDG/WpybCVXSIQY3FLL27A=
github.com/xcherryio/sdk-go v0.0.0-20231121001803-5542cc80706e/go.mod h1:3f+aG6R1WAurpTaZgemfnDqwIrp4BS3WJYM32ekm28k=
github.com/xcherryio/sdk-go v0.0.0-20231130204036-991635cecd77 h1:3DIl/bLd9X3CjIhuqd8IEKZpwRODeOdSJ31SHqwRoeE=
github.com/xcherryio/sdk-go v0.0.0-20231130204036-991635cecd77/go.mod h1:3f+aG6R1WAurpTaZgemfnDqwIrp4BS3WJYM32ekm28k=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
Expand Down
5 changes: 5 additions & 0 deletions integTests/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package integTests

import (
"github.com/xcherryio/sdk-go/integTests/local_attribute"
"testing"

"github.com/xcherryio/sdk-go/integTests/global_attribute"
Expand Down Expand Up @@ -83,6 +84,10 @@ func TestGlobalAttributesWithMultiTables(t *testing.T) {
global_attribute.TestGlobalAttributesWithMultiTables(t, client)
}

func TestLocalAttributes(t *testing.T) {
local_attribute.TestLocalAttributes(t, client)
}

func TestProcessTimeoutCase1(t *testing.T) {
process_timeout.TestStartTimeoutProcessCase1(t, client)
}
Expand Down
2 changes: 2 additions & 0 deletions integTests/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/xcherryio/sdk-go/integTests/basic"
"github.com/xcherryio/sdk-go/integTests/failure_recovery"
"github.com/xcherryio/sdk-go/integTests/global_attribute"
"github.com/xcherryio/sdk-go/integTests/local_attribute"
"github.com/xcherryio/sdk-go/integTests/multi_states"
"github.com/xcherryio/sdk-go/integTests/process_timeout"
"github.com/xcherryio/sdk-go/integTests/state_decision"
Expand All @@ -28,6 +29,7 @@ func init() {
&failure_recovery.StateFailureRecoveryTestExecuteNoWaitUntilProcess{},
&failure_recovery.StateFailureRecoveryTestExecuteFailedAtStartProcess{},
&multi_states.MultiStatesProcess{},
&local_attribute.LocalAttributeTestProcess{},
&state_decision.GracefulCompleteProcess{},
&state_decision.ForceCompleteProcess{},
&state_decision.ForceFailProcess{},
Expand Down
2 changes: 2 additions & 0 deletions persistence/data_models/async_state_execution_info_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type AsyncStateExecutionInfoJson struct {
RecoverFromStateExecutionId *string `json:"recoverFromStateExecutionId,omitempty"`
RecoverFromApi *xcapi.StateApiType `json:"recoverFromApi,omitempty"`
GlobalAttributeConfig *InternalGlobalAttributeConfig `json:"globalAttributeConfig"`
LocalAttributeConfig *InternalLocalAttributeConfig `json:"localAttributeConfig"`
}

func FromStartRequestToStateInfoBytes(req xcapi.ProcessExecutionStartRequest) ([]byte, error) {
Expand All @@ -27,6 +28,7 @@ func FromStartRequestToStateInfoBytes(req xcapi.ProcessExecutionStartRequest) ([
WorkerURL: req.GetWorkerUrl(),
StateConfig: req.StartStateConfig,
GlobalAttributeConfig: getInternalGlobalAttributeConfig(req),
LocalAttributeConfig: getInternalLocalAttributeConfig(req),
}

return infoJson.ToBytes()
Expand Down
17 changes: 16 additions & 1 deletion persistence/data_models/data_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ package data_models

import (
"fmt"
"github.com/xcherryio/apis/goapi/xcapi"
"strconv"
"strings"

"github.com/xcherryio/apis/goapi/xcapi"
"github.com/xcherryio/xcherry/common/uuid"
)

Expand Down Expand Up @@ -257,6 +257,9 @@ type (
GlobalAttributeTableConfig *InternalGlobalAttributeConfig
UpdateGlobalAttributes []xcapi.GlobalAttributeTableRowUpdate

LocalAttributeConfig *InternalLocalAttributeConfig
UpdateLocalAttributes []xcapi.KeyValue

TaskShardId int32
TaskSequence int64
}
Expand All @@ -265,6 +268,8 @@ type (
HasNewImmediateTask bool
FailAtUpdatingGlobalAttributes bool
UpdatingGlobalAttributesError error
FailAtUpdatingLocalAttributes bool
UpdatingLocalAttributesError error
}

PublishToLocalQueueRequest struct {
Expand Down Expand Up @@ -322,6 +327,16 @@ type (
FailAtUpdatingGlobalAttributes bool
UpdatingGlobalAttributesError error
}

LoadLocalAttributesRequest struct {
ProcessExecutionId uuid.UUID
AllLocalAttributeKeys InternalLocalAttributeConfig
Request xcapi.LoadLocalAttributesRequest
}

LoadLocalAttributesResponse struct {
Response xcapi.LoadLocalAttributesResponse
}
)

func (t ImmediateTask) GetTaskSequence() int64 {
Expand Down
Loading
Loading