Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

local attrbiutes #102

Merged
merged 7 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 97 additions & 27 deletions engine/immediate_task_conrrent_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ package engine
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/xcherryio/apis/goapi/xcapi"
"github.com/xcherryio/xcherry/common/decision"
"github.com/xcherryio/xcherry/common/httperror"
"github.com/xcherryio/xcherry/persistence/data_models"
"io/ioutil"
"net/http"
"time"

"github.com/xcherryio/xcherry/common/log"
"github.com/xcherryio/xcherry/common/log/tag"
Expand Down Expand Up @@ -354,34 +355,69 @@ func (w *immediateTaskConcurrentProcessor) processExecuteTask(
var resp *xcapi.AsyncStateExecuteResponse
var httpResp *http.Response
loadedGlobalAttributesResp, errToCheck := w.loadGlobalAttributesIfNeeded(ctx, prep, task)
if errToCheck == nil {
req := apiClient.DefaultAPI.ApiV1XcherryWorkerAsyncStateExecutePost(ctx)
resp, httpResp, errToCheck = req.AsyncStateExecuteRequest(
xcapi.AsyncStateExecuteRequest{
Context: createApiContext(
prep,
task,
prep.Info.RecoverFromStateExecutionId,
prep.Info.RecoverFromApi),
ProcessType: prep.Info.ProcessType,
StateId: task.StateId,
StateInput: &xcapi.EncodedObject{
Encoding: prep.Input.Encoding,
Data: prep.Input.Data,
},
CommandResults: &prep.WaitUntilCommandResults,
LoadedGlobalAttributes: &loadedGlobalAttributesResp.Response,
},
).Execute()
if httpResp != nil {
defer httpResp.Body.Close()
}
if errToCheck != nil {
if httperror.CheckHttpResponseAndError(errToCheck, httpResp, w.logger) {
status, details := w.composeHttpError(errToCheck, httpResp, prep.Info, task)

if errToCheck == nil {
errToCheck = decision.ValidateDecision(resp.StateDecision)
nextIntervalSecs, shouldRetry := w.checkRetry(task, prep.Info)
if shouldRetry {
return w.retryTask(ctx, task, prep, nextIntervalSecs, status, details)
}
return w.applyStateFailureRecoveryPolicy(ctx,
task,
prep,
status,
details,
task.ImmediateTaskInfo.WorkerTaskBackoffInfo.CompletedAttempts,
xcapi.EXECUTE_API)
}
}
loadedLocalAttributesResp, errToCheck := w.loadLocalAttributesIfNeeded(ctx, prep, task)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's okay to use separate txn to load for now. but fyi, we will need to evolve to this later:
#93
and
#103

if errToCheck != nil {
if httperror.CheckHttpResponseAndError(errToCheck, httpResp, w.logger) {
status, details := w.composeHttpError(errToCheck, httpResp, prep.Info, task)

nextIntervalSecs, shouldRetry := w.checkRetry(task, prep.Info)
if shouldRetry {
return w.retryTask(ctx, task, prep, nextIntervalSecs, status, details)
}
return w.applyStateFailureRecoveryPolicy(ctx,
task,
prep,
status,
details,
task.ImmediateTaskInfo.WorkerTaskBackoffInfo.CompletedAttempts,
xcapi.EXECUTE_API)
}
}

req := apiClient.DefaultAPI.ApiV1XcherryWorkerAsyncStateExecutePost(ctx)
resp, httpResp, errToCheck = req.AsyncStateExecuteRequest(
xcapi.AsyncStateExecuteRequest{
Context: createApiContext(
prep,
task,
prep.Info.RecoverFromStateExecutionId,
prep.Info.RecoverFromApi),
ProcessType: prep.Info.ProcessType,
StateId: task.StateId,
StateInput: &xcapi.EncodedObject{
Encoding: prep.Input.Encoding,
Data: prep.Input.Data,
},
CommandResults: &prep.WaitUntilCommandResults,
LoadedGlobalAttributes: &loadedGlobalAttributesResp.Response,
LoadedLocalAttributes: &loadedLocalAttributesResp.Response,
},
).Execute()
if httpResp != nil {
defer httpResp.Body.Close()
}

if errToCheck == nil {
errToCheck = decision.ValidateDecision(resp.StateDecision)
}

if httperror.CheckHttpResponseAndError(errToCheck, httpResp, w.logger) {
status, details := w.composeHttpError(errToCheck, httpResp, prep.Info, task)

Expand Down Expand Up @@ -411,6 +447,8 @@ func (w *immediateTaskConcurrentProcessor) processExecuteTask(
TaskSequence: task.GetTaskSequence(),
GlobalAttributeTableConfig: prep.Info.GlobalAttributeConfig,
UpdateGlobalAttributes: resp.WriteToGlobalAttributes,
LocalAttributeConfig: prep.Info.LocalAttributeConfig,
UpdateLocalAttributes: resp.WriteToLocalAttributes,
})
if err != nil {
return err
Expand Down Expand Up @@ -595,3 +633,35 @@ func (w *immediateTaskConcurrentProcessor) loadGlobalAttributesIfNeeded(

return resp, err
}

func (w *immediateTaskConcurrentProcessor) loadLocalAttributesIfNeeded(
ctx context.Context, prep data_models.PrepareStateExecutionResponse, task data_models.ImmediateTask,
) (*data_models.LoadLocalAttributesResponse, error) {
if prep.Info.StateConfig == nil ||
prep.Info.StateConfig.LoadLocalAttributesRequest == nil {
return &data_models.LoadLocalAttributesResponse{}, nil
}

if prep.Info.LocalAttributeConfig == nil {
return &data_models.LoadLocalAttributesResponse{},
fmt.Errorf("local attribute config is not available")
}

w.logger.Debug("loading local attributes for state execute",
tag.StateExecutionId(task.GetStateExecutionId()),
tag.JsonValue(prep.Info.StateConfig),
tag.JsonValue(prep.Info.LocalAttributeConfig))

resp, err := w.store.LoadLocalAttributes(ctx, data_models.LoadLocalAttributesRequest{
ProcessExecutionId: task.ProcessExecutionId,
AllLocalAttributeKeys: *prep.Info.LocalAttributeConfig,
Request: *prep.Info.StateConfig.LoadLocalAttributesRequest,
})

w.logger.Debug("loaded local attributes for state execute",
tag.StateExecutionId(task.GetStateExecutionId()),
tag.JsonValue(resp),
tag.Error(err))

return resp, err
}
12 changes: 11 additions & 1 deletion extensions/data_models_row.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
package extensions

import (
"github.com/xcherryio/xcherry/persistence/data_models"
"time"

"github.com/xcherryio/xcherry/persistence/data_models"

"github.com/jmoiron/sqlx/types"
"github.com/xcherryio/xcherry/common/uuid"
)
Expand Down Expand Up @@ -250,4 +251,13 @@ type (
CustomTableRowSelect struct {
ColumnToValue map[string]string
}

LocalAttributeRow struct {
ProcessExecutionId uuid.UUID
ProcessExecutionIdString string
Key string
Value types.JSONText
Namespace string
ProcessId string
}
)
18 changes: 18 additions & 0 deletions extensions/postgres/non_transactional.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,21 @@ func (d dbSession) SelectCustomTableByPK(
ColumnToValue: columnToValue,
}, nil
}

const selectLocalAttributesQuery = `SELECT
process_execution_id, key, value, namespace, process_id
FROM xdb_sys_local_attributes WHERE process_execution_id = ? AND key IN (?)
`

func (d dbSession) SelectLocalAttributes(
ctx context.Context, processExecutionId uuid.UUID, keys []string,
) ([]extensions.LocalAttributeRow, error) {
var rows []extensions.LocalAttributeRow
query, args, err := sqlx.In(selectLocalAttributesQuery, processExecutionId.String(), keys)
if err != nil {
return nil, err
}
query = d.db.Rebind(query)
err = d.db.SelectContext(ctx, &rows, query, args...)
return rows, err
}
11 changes: 10 additions & 1 deletion extensions/postgres/schema/xcherry_sys_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,13 @@ CREATE TABLE xcherry_sys_local_queue_messages(
queue_name VARCHAR(31),
payload jsonb,
PRIMARY KEY (process_execution_id, dedup_id)
);
);

CREATE TABLE xdb_sys_local_attributes(
process_execution_id uuid NOT NULL,
key VARCHAR(31) NOT NULL,
value jsonb,
namespace VARCHAR(31),
duoertai marked this conversation as resolved.
Show resolved Hide resolved
process_id VARCHAR(255),
PRIMARY KEY (process_execution_id, key)
);
22 changes: 22 additions & 0 deletions extensions/postgres/transactional.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,3 +335,25 @@ func (d dbTx) UpsertCustomTableByPK(
ON CONFLICT (`+pkName+`) DO `+updateClause)
return err
}

const insertLocalAttributeQuery = `INSERT INTO xdb_sys_local_attributes
(process_execution_id, key, value, namespace, process_id)
VALUES (:process_execution_id_string, :key, :value, :namespace, :process_id)
`

func (d dbTx) InsertLocalAttribute(ctx context.Context, row extensions.LocalAttributeRow) error {
row.ProcessExecutionIdString = row.ProcessExecutionId.String()
_, err := d.tx.NamedExecContext(ctx, insertLocalAttributeQuery, row)
return err
}

const updateLocalAttributeQuery = `UPDATE xdb_sys_local_attributes SET
duoertai marked this conversation as resolved.
Show resolved Hide resolved
value = :value
WHERE process_execution_id=:process_execution_id_string AND key=:key
`

func (d dbTx) UpdateLocalAttribute(ctx context.Context, row extensions.LocalAttributeRow) error {
row.ProcessExecutionIdString = row.ProcessExecutionId.String()
_, err := d.tx.NamedExecContext(ctx, updateLocalAttributeQuery, row)
return err
}
7 changes: 7 additions & 0 deletions extensions/sql_db_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type transactionalCRUD interface {
UpsertCustomTableByPK(
ctx context.Context, tableName string, pkName, pkValue string, colToValue map[string]string,
) error

InsertLocalAttribute(ctx context.Context, insert LocalAttributeRow) error
UpdateLocalAttribute(ctx context.Context, row LocalAttributeRow) error
}

type nonTransactionalCRUD interface {
Expand All @@ -102,6 +105,10 @@ type nonTransactionalCRUD interface {
SelectCustomTableByPK(
ctx context.Context, tableName string, pkName, pkValue string, columns []string,
) (*CustomTableRowSelect, error)

SelectLocalAttributes(
ctx context.Context, processExecutionId uuid.UUID, keys []string,
) ([]LocalAttributeRow, error)
}

type ErrorChecker interface {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/urfave/cli/v2 v2.25.7
github.com/xcherryio/apis v0.0.2
github.com/xcherryio/sdk-go v0.0.0-20231121001803-5542cc80706e
github.com/xcherryio/sdk-go v0.0.0-20231123063400-1b6cac11a27e
go.uber.org/multierr v1.10.0
go.uber.org/zap v1.26.0
gopkg.in/yaml.v3 v3.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs=
github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
github.com/xcherryio/apis v0.0.2 h1:sbxlWCrMrjmSQOLYOSHPiKZe+/LvpW0A5A6/PanUqRk=
github.com/xcherryio/apis v0.0.2/go.mod h1:7peiYpRUjmq0rl/8F0MmvFH8Vp7Y8Dq5OpRgpH0cMJU=
github.com/xcherryio/sdk-go v0.0.0-20231121001803-5542cc80706e h1:O1kd3ezvfB4TfN4hfKidqHDG/WpybCVXSIQY3FLL27A=
github.com/xcherryio/sdk-go v0.0.0-20231121001803-5542cc80706e/go.mod h1:3f+aG6R1WAurpTaZgemfnDqwIrp4BS3WJYM32ekm28k=
github.com/xcherryio/sdk-go v0.0.0-20231123063400-1b6cac11a27e h1:Jjmfz4loEYMkKZk3HlqliD6hVH/8EZfNU00ggMCZrDA=
github.com/xcherryio/sdk-go v0.0.0-20231123063400-1b6cac11a27e/go.mod h1:3f+aG6R1WAurpTaZgemfnDqwIrp4BS3WJYM32ekm28k=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
Expand Down
2 changes: 2 additions & 0 deletions persistence/data_models/async_state_execution_info_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type AsyncStateExecutionInfoJson struct {
RecoverFromStateExecutionId *string `json:"recoverFromStateExecutionId,omitempty"`
RecoverFromApi *xcapi.StateApiType `json:"recoverFromApi,omitempty"`
GlobalAttributeConfig *InternalGlobalAttributeConfig `json:"globalAttributeConfig"`
LocalAttributeConfig *InternalLocalAttributeConfig `json:"localAttributeConfig"`
}

func FromStartRequestToStateInfoBytes(req xcapi.ProcessExecutionStartRequest) ([]byte, error) {
Expand All @@ -27,6 +28,7 @@ func FromStartRequestToStateInfoBytes(req xcapi.ProcessExecutionStartRequest) ([
WorkerURL: req.GetWorkerUrl(),
StateConfig: req.StartStateConfig,
GlobalAttributeConfig: getInternalGlobalAttributeConfig(req),
LocalAttributeConfig: getInternalLocalAttributeConfig(req),
}

return infoJson.ToBytes()
Expand Down
19 changes: 18 additions & 1 deletion persistence/data_models/data_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ package data_models

import (
"fmt"
"github.com/xcherryio/apis/goapi/xcapi"
"strconv"
"strings"

"github.com/xcherryio/apis/goapi/xcapi"
"github.com/xcherryio/xcherry/common/uuid"
)

Expand All @@ -25,6 +25,8 @@ type (
HasNewImmediateTask bool
FailedAtWriteInitGlobalAttributes bool
GlobalAttributeWriteError error
FailedAtWriteInitLocalAttributes bool
LocalAttributeWriteError error
}

StopProcessRequest struct {
Expand Down Expand Up @@ -257,6 +259,9 @@ type (
GlobalAttributeTableConfig *InternalGlobalAttributeConfig
UpdateGlobalAttributes []xcapi.GlobalAttributeTableRowUpdate

LocalAttributeConfig *InternalLocalAttributeConfig
UpdateLocalAttributes []xcapi.KeyValue

TaskShardId int32
TaskSequence int64
}
Expand All @@ -265,6 +270,8 @@ type (
HasNewImmediateTask bool
FailAtUpdatingGlobalAttributes bool
UpdatingGlobalAttributesError error
FailAtUpdatingLocalAttributes bool
UpdatingLocalAttributesError error
}

PublishToLocalQueueRequest struct {
Expand Down Expand Up @@ -322,6 +329,16 @@ type (
FailAtUpdatingGlobalAttributes bool
UpdatingGlobalAttributesError error
}

LoadLocalAttributesRequest struct {
ProcessExecutionId uuid.UUID
AllLocalAttributeKeys InternalLocalAttributeConfig
Request xcapi.LoadLocalAttributesRequest
}

LoadLocalAttributesResponse struct {
Response xcapi.LoadLocalAttributesResponse
}
)

func (t ImmediateTask) GetTaskSequence() int64 {
Expand Down
13 changes: 13 additions & 0 deletions persistence/data_models/internal_global_attribute_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,16 @@ type InternalGlobalAttributeConfig struct {
// key is the table name, value is the primary key name and value
TablePrimaryKeys map[string]xcapi.TableColumnValue `json:"tablePrimaryKeys"`
}

func getInternalGlobalAttributeConfig(req xcapi.ProcessExecutionStartRequest) *InternalGlobalAttributeConfig {
if req.ProcessStartConfig != nil && req.ProcessStartConfig.GlobalAttributeConfig != nil {
primaryKeys := map[string]xcapi.TableColumnValue{}
for _, cfg := range req.ProcessStartConfig.GlobalAttributeConfig.TableConfigs {
primaryKeys[cfg.TableName] = cfg.PrimaryKey
}
return &InternalGlobalAttributeConfig{
TablePrimaryKeys: primaryKeys,
}
}
return nil
}
Loading
Loading