Skip to content

Commit

Permalink
Add waitForProcessComplete (#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
zklgame authored Jun 11, 2024
1 parent a33e9ff commit a7263a5
Show file tree
Hide file tree
Showing 16 changed files with 518 additions and 40 deletions.
4 changes: 4 additions & 0 deletions engine/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ var defaultWorkerTaskBackoffRetryPolicy = xcapi.RetryPolicy{
MaximumAttempts: ptr.Any(int32(0)),
MaximumAttemptsDurationSeconds: ptr.Any(int32(0)),
}

// Constants used by the wait-for-process-completion feature.
const (
	// DEFAULT_WAIT_FOR_TIMEOUT_MAX is the maximum age, in seconds, of a waiting
	// request: it is added to a Unix-seconds creation timestamp to decide
	// whether the request has expired.
	DEFAULT_WAIT_FOR_TIMEOUT_MAX int32 = 30

	// WaitForProcessCompletionResultStop is the result value delivered to
	// waiters when the waiting channels themselves are being stopped.
	WaitForProcessCompletionResultStop string = "STOP"
)
80 changes: 61 additions & 19 deletions engine/immediate_task_concurrent_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"

"github.com/xcherryio/apis/goapi/xcapi"
Expand All @@ -27,14 +28,15 @@ type immediateTaskConcurrentProcessor struct {
rootCtx context.Context
cfg config.Config
taskToProcessChan chan data_models.ImmediateTask
// for quickly checking if the shardId is being processed
currentShards map[int32]bool
// shardId to the channel
// shardId: channel
taskToCommitChans map[int32]chan<- data_models.ImmediateTask
taskNotifier TaskNotifier
processStore persistence.ProcessStore
visibilityStore persistence.VisibilityStore
logger log.Logger
// shardId: WaitForProcessCompletionChannels
waitForProcessCompletionChannelsPerShardMap map[int32]WaitForProcessCompletionChannels
taskNotifier TaskNotifier
processStore persistence.ProcessStore
visibilityStore persistence.VisibilityStore
logger log.Logger
lock sync.RWMutex
}

func NewImmediateTaskConcurrentProcessor(
Expand All @@ -47,12 +49,13 @@ func NewImmediateTaskConcurrentProcessor(
rootCtx: ctx,
cfg: cfg,
taskToProcessChan: make(chan data_models.ImmediateTask, bufferSize),
currentShards: map[int32]bool{},
taskToCommitChans: make(map[int32]chan<- data_models.ImmediateTask),
taskNotifier: notifier,
processStore: processStore,
visibilityStore: visibilityStore,
logger: logger,
waitForProcessCompletionChannelsPerShardMap: make(map[int32]WaitForProcessCompletionChannels),
taskNotifier: notifier,
processStore: processStore,
visibilityStore: visibilityStore,
logger: logger,
lock: sync.RWMutex{},
}
}

Expand All @@ -66,17 +69,44 @@ func (w *immediateTaskConcurrentProcessor) GetTasksToProcessChan() chan<- data_m
func (w *immediateTaskConcurrentProcessor) AddImmediateTaskQueue(
shardId int32, tasksToCommitChan chan<- data_models.ImmediateTask,
) (alreadyExisted bool) {
exists := w.currentShards[shardId]
w.currentShards[shardId] = true
w.taskToCommitChans[shardId] = tasksToCommitChan
w.lock.Lock()
defer w.lock.Unlock()

_, exists := w.taskToCommitChans[shardId]
if !exists {
w.taskToCommitChans[shardId] = tasksToCommitChan
}

return exists
}

func (w *immediateTaskConcurrentProcessor) RemoveImmediateTaskQueue(shardId int32) {
delete(w.currentShards, shardId)
w.lock.Lock()
defer w.lock.Unlock()

delete(w.taskToCommitChans, shardId)
}

// AddWaitForProcessCompletionChannels registers the wait-for-completion
// channels of a shard, unless that shard already has a registration.
//
// It returns true when an entry for shardId was already present (in which case
// the existing entry is kept untouched) and false when the given value was
// stored. The map is accessed under the processor's write lock.
func (w *immediateTaskConcurrentProcessor) AddWaitForProcessCompletionChannels(shardId int32,
	waitForProcessCompletionChannelsPerShard WaitForProcessCompletionChannels) (alreadyExisted bool) {
	w.lock.Lock()
	defer w.lock.Unlock()

	if _, registered := w.waitForProcessCompletionChannelsPerShardMap[shardId]; registered {
		return true
	}

	w.waitForProcessCompletionChannelsPerShardMap[shardId] = waitForProcessCompletionChannelsPerShard
	return false
}

// RemoveWaitForProcessCompletionChannels drops the wait-for-completion
// channels registered for shardId. Removing a shard that was never registered
// is a no-op. The map is modified under the processor's write lock.
func (w *immediateTaskConcurrentProcessor) RemoveWaitForProcessCompletionChannels(shardId int32) {
	w.lock.Lock()
	delete(w.waitForProcessCompletionChannelsPerShardMap, shardId)
	w.lock.Unlock()
}

func (w *immediateTaskConcurrentProcessor) Start() error {
concurrency := w.cfg.AsyncService.ImmediateTaskQueue.ProcessorConcurrency

Expand All @@ -90,15 +120,18 @@ func (w *immediateTaskConcurrentProcessor) Start() error {
if !ok {
return
}
if !w.currentShards[task.ShardId] {

_, exists := w.taskToCommitChans[task.ShardId]
if !exists {
w.logger.Info("skip the stale task that is due to shard movement", tag.Shard(task.ShardId), tag.ID(task.GetTaskId()))
continue
}

err := w.processImmediateTask(w.rootCtx, task)

if w.currentShards[task.ShardId] { // check again
commitChan := w.taskToCommitChans[task.ShardId]
commitChan, exists := w.taskToCommitChans[task.ShardId]

if exists { // check again
if err != nil {
// put it back to the queue for immediate retry
// Note that if the error is because of invoking worker APIs, it will be sent to
Expand Down Expand Up @@ -496,6 +529,15 @@ func (w *immediateTaskConcurrentProcessor) processExecuteTask(
if compResp.HasNewImmediateTask {
w.notifyNewImmediateTask(task.ShardId, prep, task)
}

// signal to the process completion waiting channel
waitForProcessCompletionChannelsPerShard, ok := w.waitForProcessCompletionChannelsPerShardMap[task.ShardId]
if ok && compResp.ProcessStatus != data_models.ProcessExecutionStatusUndefined &&
compResp.ProcessStatus != data_models.ProcessExecutionStatusRunning {

waitForProcessCompletionChannelsPerShard.Signal(task.ProcessExecutionId.String(), compResp.ProcessStatus.String())
}

return nil
}

Expand Down
13 changes: 13 additions & 0 deletions engine/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type ImmediateTaskProcessor interface {
shardId int32, tasksToCommitChan chan<- data_models.ImmediateTask,
) (alreadyExisted bool)
RemoveImmediateTaskQueue(shardId int32)

AddWaitForProcessCompletionChannels(shardId int32,
waitForProcessCompletionChannelsPerShard WaitForProcessCompletionChannels) (alreadyExisted bool)
RemoveWaitForProcessCompletionChannels(shardId int32)
}

type TimerTaskProcessor interface {
Expand All @@ -68,3 +72,12 @@ type TimerTaskProcessor interface {
) (alreadyExisted bool)
RemoveTimerTaskQueue(shardId int32)
}

// WaitForProcessCompletionChannels manages, for one shard, the channels on
// which callers wait for a process execution to finish.
type WaitForProcessCompletionChannels interface {
// Start begins serving; the implementation in this package registers itself
// with the immediate task processor for its shard.
Start()
// Stop ends serving; the implementation in this package unregisters itself
// and signals every known process execution with
// WaitForProcessCompletionResultStop.
Stop()

// Add records a new waiting request for processExecutionId and returns the
// channel on which the result will be delivered.
Add(processExecutionId string) chan string
// Signal delivers result to the requests currently waiting on
// processExecutionId.
Signal(processExecutionId string, result string)
// TerminateWaiting is the counterpart of Add for a request that is no longer
// waiting on processExecutionId.
// NOTE(review): presumably called by the API handler when a wait ends or
// times out; the caller is not visible here, confirm.
TerminateWaiting(processExecutionId string)
}
158 changes: 158 additions & 0 deletions engine/wait_for_process_completion_channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright 2023 xCherryIO organization

// Copyright (c) 2023 xCherryIO Organization
// SPDX-License-Identifier: Apache-2.0

package engine

import (
"fmt"
"github.com/xcherryio/xcherry/common/log"
"sync"
"time"
)

// WaitForProcessCompletionChannelsPerShardImpl is the per-shard implementation
// of WaitForProcessCompletionChannels. It keeps one channel per process
// execution plus the creation timestamps of the requests waiting on it.
type WaitForProcessCompletionChannelsPerShardImpl struct {
// shardId is the shard this instance is registered under with the processor;
// it is also included in log messages.
shardId int32
logger log.Logger

// processor is the immediate task processor this instance registers itself
// with in Start and unregisters from in Stop.
processor ImmediateTaskProcessor

// processExecutionId -> the unbuffered channel shared by all requests waiting
// on that process execution
channelMap map[string]chan string
// processExecutionId -> creation timestamps (Unix seconds) of the waiting
// requests, one entry per Add call
waitingRequestCreatedAt map[string][]int64
// lock is taken by Add, Signal, TerminateWaiting and cleanup around their
// updates of the two maps above.
lock sync.RWMutex
}

// NewWaitForProcessCompletionChannelsPerShardImplImpl builds the per-shard
// wait-for-completion channel manager for shardId, with empty bookkeeping
// maps. The returned value does nothing until Start is called on it.
func NewWaitForProcessCompletionChannelsPerShardImplImpl(
	shardId int32, logger log.Logger, processor ImmediateTaskProcessor) WaitForProcessCompletionChannels {
	channels := &WaitForProcessCompletionChannelsPerShardImpl{
		shardId:   shardId,
		logger:    logger,
		processor: processor,
		lock:      sync.RWMutex{},
	}

	// Both maps start empty and are filled lazily by Add.
	channels.channelMap = make(map[string]chan string)
	channels.waitingRequestCreatedAt = make(map[string][]int64)

	return channels
}

// Start registers this instance with the immediate task processor under its
// shard id. The processor's "already existed" result is discarded.
func (w *WaitForProcessCompletionChannelsPerShardImpl) Start() {
	_ = w.processor.AddWaitForProcessCompletionChannels(w.shardId, w)
}

// Stop unregisters this instance from the immediate task processor and then
// signals every process execution that still has a channel with
// WaitForProcessCompletionResultStop.
func (w *WaitForProcessCompletionChannelsPerShardImpl) Stop() {
	w.processor.RemoveWaitForProcessCompletionChannels(w.shardId)

	// Snapshot the ids under the read lock: channelMap is mutated concurrently
	// by Add and cleanup, and iterating a map while it is written is a data
	// race. The lock is released before calling Signal because Signal takes the
	// write lock itself and the mutex is not reentrant.
	w.lock.RLock()
	procIds := make([]string, 0, len(w.channelMap))
	for procId := range w.channelMap {
		procIds = append(procIds, procId)
	}
	w.lock.RUnlock()

	for _, procId := range procIds {
		w.Signal(procId, WaitForProcessCompletionResultStop)
	}
}

// Add records one more waiting request for processExecutionId and returns the
// channel the result will be sent on.
//
// All requests for the same process execution share a single unbuffered
// channel, created on first use. The creation time of every request is
// appended to waitingRequestCreatedAt.
func (w *WaitForProcessCompletionChannelsPerShardImpl) Add(processExecutionId string) chan string {
	w.logger.Info(fmt.Sprintf("Add process execution completion waiting request for %s in shard %d",
		processExecutionId, w.shardId))

	w.lock.Lock()
	defer w.lock.Unlock()

	waitCh, found := w.channelMap[processExecutionId]
	if !found {
		waitCh = make(chan string)
		w.channelMap[processExecutionId] = waitCh
	}

	createdAt := w.now()
	w.waitingRequestCreatedAt[processExecutionId] = append(w.waitingRequestCreatedAt[processExecutionId], createdAt)

	return waitCh
}

// Signal delivers result to the requests waiting on processExecutionId.
//
// One non-blocking send is attempted per recorded waiting request; a send that
// finds no receiver ready is dropped and logged. Afterwards the recorded
// requests are cleared and the channel is closed by cleanup three seconds
// later. Signal is a no-op when no channel exists for processExecutionId.
func (w *WaitForProcessCompletionChannelsPerShardImpl) Signal(processExecutionId string, result string) {
	// Take the lock before looking the channel up. Reading channelMap unlocked
	// races with Add and cleanup, and cleanup could close the channel between
	// the lookup and the send below, which would panic on a closed channel.
	w.lock.Lock()
	defer w.lock.Unlock()

	channel, ok := w.channelMap[processExecutionId]
	if !ok {
		return
	}

	count := len(w.waitingRequestCreatedAt[processExecutionId])

	for i := 0; i < count; i++ {
		// Non-blocking send: never stall the caller on an absent receiver.
		select {
		case channel <- result:
			w.logger.Info(fmt.Sprintf("Signal process execution completion waiting result %d for %s: %s",
				i, processExecutionId, result))
		default:
			w.logger.Info(fmt.Sprintf("Not signal process execution completion waiting result %d for %s: %s",
				i, processExecutionId, result))
		}
	}

	w.waitingRequestCreatedAt[processExecutionId] = []int64{}

	go func() {
		// sleep 3 seconds before closing the channel
		time.Sleep(time.Second * 3)

		w.cleanup(processExecutionId)
	}()
}

// TerminateWaiting prunes the waiting requests of processExecutionId that are
// older than DEFAULT_WAIT_FOR_TIMEOUT_MAX seconds. When no request remains,
// the bookkeeping entry is removed and the channel is closed.
func (w *WaitForProcessCompletionChannelsPerShardImpl) TerminateWaiting(processExecutionId string) {
	w.logger.Info(fmt.Sprintf("Terminate process execution completion waiting for %s in shard %d",
		processExecutionId, w.shardId))

	w.lock.Lock()
	defer w.lock.Unlock()

	var validWaitingRequestCreatedAt []int64

	now := w.now()
	for _, createdAt := range w.waitingRequestCreatedAt[processExecutionId] {
		if createdAt+int64(DEFAULT_WAIT_FOR_TIMEOUT_MAX) < now {
			w.logger.Info(fmt.Sprintf(
				"Remove process execution completion waiting request created at %d for %s in shard %d",
				createdAt, processExecutionId, w.shardId))
			continue
		}

		validWaitingRequestCreatedAt = append(validWaitingRequestCreatedAt, createdAt)
	}

	// Check the requests of THIS process execution, not the size of the whole
	// map. Use the lock-held helper: calling cleanup here would try to take the
	// non-reentrant lock a second time and deadlock.
	if len(validWaitingRequestCreatedAt) == 0 {
		w.cleanupLocked(processExecutionId)
		return
	}

	w.waitingRequestCreatedAt[processExecutionId] = validWaitingRequestCreatedAt
}

// cleanup removes all bookkeeping for processExecutionId and closes its
// channel, if one exists. It acquires the lock itself, so it must not be
// called while the lock is already held.
func (w *WaitForProcessCompletionChannelsPerShardImpl) cleanup(processExecutionId string) {
	w.lock.Lock()
	defer w.lock.Unlock()

	w.cleanupLocked(processExecutionId)
}

// cleanupLocked does the work of cleanup; the caller must hold w.lock.
func (w *WaitForProcessCompletionChannelsPerShardImpl) cleanupLocked(processExecutionId string) {
	delete(w.waitingRequestCreatedAt, processExecutionId)

	channel, ok := w.channelMap[processExecutionId]
	if !ok {
		return
	}

	// Remove the entry before closing so a later cleanup cannot close twice.
	delete(w.channelMap, processExecutionId)
	close(channel)

	w.logger.Info(fmt.Sprintf("Close process execution completion waiting channel for %s in shard %d",
		processExecutionId, w.shardId))
}

// now returns the current wall-clock time as Unix seconds.
func (w *WaitForProcessCompletionChannelsPerShardImpl) now() int64 {
	current := time.Now()
	return current.Unix()
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b
github.com/stretchr/testify v1.8.4
github.com/urfave/cli/v2 v2.25.7
github.com/xcherryio/apis v0.0.3-0.20240313171434-ae652fc3c70f
github.com/xcherryio/apis v0.0.3-0.20240422013829-2c1c8bfbaa4b
github.com/xcherryio/sdk-go v0.0.0-20240115163029-e21cc0710e61
go.uber.org/multierr v1.10.0
go.uber.org/zap v1.26.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4d
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs=
github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
github.com/xcherryio/apis v0.0.3-0.20240313171434-ae652fc3c70f h1:csBDKtifwAIRXaHpw3xiUqNDdS0As8OSrflQPr0bTm8=
github.com/xcherryio/apis v0.0.3-0.20240313171434-ae652fc3c70f/go.mod h1:7peiYpRUjmq0rl/8F0MmvFH8Vp7Y8Dq5OpRgpH0cMJU=
github.com/xcherryio/apis v0.0.3-0.20240422013829-2c1c8bfbaa4b h1:ea05r21EcjiedKXoADNLNAeOdm1W1mS/EMDWYfgSlnQ=
github.com/xcherryio/apis v0.0.3-0.20240422013829-2c1c8bfbaa4b/go.mod h1:7peiYpRUjmq0rl/8F0MmvFH8Vp7Y8Dq5OpRgpH0cMJU=
github.com/xcherryio/sdk-go v0.0.0-20240115163029-e21cc0710e61 h1:6Xr3S342Di2QuvagFb4uG1AkA8lQLWfED1ynZvnu3V0=
github.com/xcherryio/sdk-go v0.0.0-20240115163029-e21cc0710e61/go.mod h1:Ouc00E061VNVYemKbVQCxB3LSOgIkxV81h//1O1ODws=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type (

// CompleteExecuteExecutionResponse is the result of completing an "execute"
// step of a process execution in the process store.
CompleteExecuteExecutionResponse struct {
// HasNewImmediateTask tells the caller that new immediate tasks were created;
// the immediate task processor uses it to notify the task queue.
HasNewImmediateTask bool
// ProcessStatus is the status of the process execution row after the
// completion; the immediate task processor signals completion waiters when
// it is neither undefined nor running.
ProcessStatus ProcessExecutionStatus
// NOTE(review): presumably set when writing to the application database
// failed, with the cause in AppDatabaseWritingError — not visible here,
// confirm against the store implementation.
FailedAtWritingAppDatabase bool
AppDatabaseWritingError error
}
Expand Down
1 change: 1 addition & 0 deletions persistence/process/complete_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,5 +187,6 @@ func (p sqlProcessStoreImpl) doCompleteExecuteExecutionTx(

return &data_models.CompleteExecuteExecutionResponse{
HasNewImmediateTask: hasNewImmediateTask,
ProcessStatus: prcRow.Status,
}, nil
}
2 changes: 2 additions & 0 deletions service/api/default_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const PathStopProcessExecution = "/api/v1/xcherry/service/process-execution/stop
// HTTP paths of the API service. Each one is registered as a POST route on the
// gin engine in NewDefaultAPIServerWithGin.
const PathPublishToLocalQueue = "/api/v1/xcherry/service/process-execution/publish-to-local-queue"
const PathProcessExecutionRpc = "/api/v1/xcherry/service/process-execution/rpc"
const PathListProcessExecutions = "/api/v1/xcherry/service/process-execution/list"
// PathWaitForProcessCompletion is served by handler.WaitForProcessCompletion.
const PathWaitForProcessCompletion = "/api/v1/xcherry/service/process-execution/wait-for-process-completion"

type defaultSever struct {
rootCtx context.Context
Expand Down Expand Up @@ -50,6 +51,7 @@ func NewDefaultAPIServerWithGin(
engine.POST(PathPublishToLocalQueue, handler.PublishToLocalQueue)
engine.POST(PathProcessExecutionRpc, handler.Rpc)
engine.POST(PathListProcessExecutions, handler.ListProcessExecutions)
engine.POST(PathWaitForProcessCompletion, handler.WaitForProcessCompletion)

svrCfg := cfg.ApiService.HttpServer
httpServer := &http.Server{
Expand Down
Loading

0 comments on commit a7263a5

Please sign in to comment.