Commit: Remove internal retry from AddTask

Shaddoll committed Aug 1, 2024
1 parent 06e5a6d commit 2b23bc8
Showing 4 changed files with 165 additions and 79 deletions.
103 changes: 29 additions & 74 deletions service/matching/tasklist/task_list_manager.go
@@ -34,7 +34,6 @@ import (

"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
@@ -55,11 +54,15 @@ const (
// Time budget for empty task to propagate through the function stack and be returned to
// pollForActivityTask or pollForDecisionTask handler.
returnEmptyTaskTimeBudget = time.Second
// maxSyncMatchWaitTime is the max amount of time that we are willing to wait for a sync match to happen
maxSyncMatchWaitTime = 200 * time.Millisecond
)

var (
// ErrNoTasks is exported temporarily for integration test
ErrNoTasks = errors.New("no tasks")
ErrRemoteSyncMatchFailed = &types.RemoteSyncMatchedError{Message: "remote sync match failed"}
ErrTooManyOutstandingTasks = &types.ServiceBusyError{Message: "Too many outstanding appends to the TaskList"}
_stickyPollerUnavailableError = &types.StickyWorkerUnavailableError{Message: "sticky worker is unavailable, please use non-sticky task list."}
persistenceOperationRetryPolicy = common.CreatePersistenceRetryPolicy()
taskListActivityTypeTag = metrics.TaskListTypeTag("activity")
@@ -120,13 +123,6 @@ type (
}
)

const (
// maxSyncMatchWaitTime is the max amount of time that we are willing to wait for a sync match to happen
maxSyncMatchWaitTime = 200 * time.Millisecond
)

var errRemoteSyncMatchFailed = &types.RemoteSyncMatchedError{Message: "remote sync match failed"}

func NewManager(
domainCache cache.DomainCache,
logger log.Logger,
@@ -250,7 +246,6 @@ func (c *taskListManagerImpl) handleErr(err error) error {
// be written to database and later asynchronously matched with a poller
func (c *taskListManagerImpl) AddTask(ctx context.Context, params AddTaskParams) (bool, error) {
c.startWG.Wait()

if c.shouldReload() {
c.Stop()
return false, errShutdown
@@ -259,69 +254,51 @@ func (c *taskListManagerImpl) AddTask(ctx context.Context, params AddTaskParams)
// request sent by history service
c.liveness.MarkAlive()
}
var syncMatch bool
e := event.E{
TaskListName: c.taskListID.GetName(),
TaskListKind: &c.taskListKind,
TaskListType: c.taskListID.GetType(),
TaskInfo: *params.TaskInfo,
}
_, err := c.executeWithRetry(func() (interface{}, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

domainEntry, err := c.domainCache.GetDomainByID(params.TaskInfo.DomainID)
if err != nil {
return nil, err
}

isForwarded := params.ForwardedFrom != ""

if _, err := domainEntry.IsActiveIn(c.clusterMetadata.GetCurrentClusterName()); err != nil {
// standby task, only persist when task is not forwarded from child partition
syncMatch = false
if isForwarded {
return &persistence.CreateTasksResponse{}, errRemoteSyncMatchFailed
}

r, err := c.taskWriter.appendTask(params.TaskInfo)
return r, err
}
domainEntry, err := c.domainCache.GetDomainByID(params.TaskInfo.DomainID)
if err != nil {
return false, err
}

if isActive, err := domainEntry.IsActiveIn(c.clusterMetadata.GetCurrentClusterName()); err == nil && isActive {
isolationGroup, err := c.getIsolationGroupForTask(ctx, params.TaskInfo)
if err != nil {
return false, err
}
// active task, try sync match first
syncMatch, err = c.trySyncMatch(ctx, params, isolationGroup)
syncMatch, err := c.trySyncMatch(ctx, params, isolationGroup)
if syncMatch {
e.EventName = "SyncMatched so not persisted"
event.Log(e)
return &persistence.CreateTasksResponse{}, err
}
if params.ActivityTaskDispatchInfo != nil {
return false, errRemoteSyncMatchFailed
return syncMatch, err
}
}

if isForwarded {
// forwarded from child partition - only do sync match
// child partition will persist the task when sync match fails
e.EventName = "Could not SyncMatched Forwarded Task so not persisted"
event.Log(e)
return &persistence.CreateTasksResponse{}, errRemoteSyncMatchFailed
}
if params.ActivityTaskDispatchInfo != nil {
return false, ErrRemoteSyncMatchFailed
}

e.EventName = "Task Sent to Writer"
if params.ForwardedFrom != "" {
// forwarded from child partition - only do sync match
// child partition will persist the task when sync match fails
e.EventName = "Could not SyncMatched Forwarded Task so not persisted"
event.Log(e)
return c.taskWriter.appendTask(params.TaskInfo)
})
return false, ErrRemoteSyncMatchFailed
}

if err == nil && !syncMatch {
e.EventName = "Task Sent to Writer"
event.Log(e)
_, err = c.taskWriter.appendTask(ctx, params.TaskInfo)
if err == nil {
c.taskReader.Signal()
}

return syncMatch, err
return false, c.handleErr(err)
}

// DispatchTask dispatches a task to a poller. When there are no pollers to pick
@@ -370,7 +347,7 @@ func (c *taskListManagerImpl) getTask(ctx context.Context, maxDispatchPerSecond
// reached, instead of emptyTask, context timeout error is returned to the frontend by the rpc stack,
// which counts against our SLO. By shortening the timeout by a very small amount, the emptyTask can be
// returned to the handler before a context timeout error is generated.
childCtx, cancel := c.newChildContext(ctx, c.config.LongPollExpirationInterval(), returnEmptyTaskTimeBudget)
childCtx, cancel := newChildContext(ctx, c.config.LongPollExpirationInterval(), returnEmptyTaskTimeBudget)
defer cancel()

isolationGroup := IsolationGroupFromContext(ctx)
@@ -506,24 +483,6 @@ func (c *taskListManagerImpl) TaskListID() *Identifier {
return c.taskListID
}

// Retry operation on transient error. On rangeID update by another process calls c.Stop().
func (c *taskListManagerImpl) executeWithRetry(
operation func() (interface{}, error),
) (result interface{}, err error) {

op := func() error {
result, err = operation()
return err
}

throttleRetry := backoff.NewThrottleRetry(
backoff.WithRetryPolicy(persistenceOperationRetryPolicy),
backoff.WithRetryableError(persistence.IsTransientError),
)
err = c.handleErr(throttleRetry.Do(context.Background(), op))
return
}

func (c *taskListManagerImpl) trySyncMatch(ctx context.Context, params AddTaskParams, isolationGroup string) (bool, error) {
task := newInternalTask(params.TaskInfo, nil, params.Source, params.ForwardedFrom, true, params.ActivityTaskDispatchInfo, isolationGroup)
childCtx := ctx
@@ -535,7 +494,7 @@ func (c *taskListManagerImpl) trySyncMatch(ctx context.Context, params AddTaskPa
if !task.IsForwarded() {
// when task is forwarded from another matching host, we trust the context as is
// otherwise, we override to limit the amount of time we can block on sync match
childCtx, cancel = c.newChildContext(ctx, waitTime, time.Second)
childCtx, cancel = newChildContext(ctx, waitTime, time.Second)
}
var matched bool
var err error
@@ -554,7 +513,7 @@ func (c *taskListManagerImpl) trySyncMatch(ctx context.Context, params AddTaskPa
// method to create child context when childContext cannot use
// all of parent's deadline but instead there is a need to leave
// some time for parent to do some post-work
func (c *taskListManagerImpl) newChildContext(
func newChildContext(
parent context.Context,
timeout time.Duration,
tailroom time.Duration,
@@ -661,10 +620,6 @@ func getTaskListTypeTag(taskListType int) metrics.Tag {
}
}

func createServiceBusyError(msg string) *types.ServiceBusyError {
return &types.ServiceBusyError{Message: msg}
}

func rangeIDToTaskIDBlock(rangeID, rangeSize int64) taskIDBlock {
return taskIDBlock{
start: (rangeID-1)*rangeSize + 1,
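
With this change, AddTask no longer wraps its body in executeWithRetry: the domain lookup, the sync-match attempt, and a single appendTask call run once against the caller's context, and any persistence error is passed through handleErr straight back to the caller. Below is a minimal caller-side sketch of how the old retry behavior could be reinstated outside the manager, reusing the retry policy and transient-error predicate from the deleted helper. The wrapper name, the tasklist import path, and the premise that a caller wants to retry at all are assumptions for illustration, not part of this commit.

package example

import (
	"context"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/backoff"
	"github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/service/matching/tasklist"
)

// addTaskWithRetry is a hypothetical caller-side wrapper: it retries only
// transient persistence errors, and it honors the caller's context instead of
// the context.Background() used by the removed executeWithRetry helper.
func addTaskWithRetry(ctx context.Context, mgr tasklist.Manager, params tasklist.AddTaskParams) (bool, error) {
	var syncMatch bool
	op := func() error {
		var err error
		syncMatch, err = mgr.AddTask(ctx, params)
		return err
	}
	throttleRetry := backoff.NewThrottleRetry(
		backoff.WithRetryPolicy(common.CreatePersistenceRetryPolicy()),
		backoff.WithRetryableError(persistence.IsTransientError),
	)
	return syncMatch, throttleRetry.Do(ctx, op)
}
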
131 changes: 131 additions & 0 deletions service/matching/tasklist/task_list_manager_test.go
@@ -368,6 +368,137 @@ func TestAddTaskStandby(t *testing.T) {
require.False(t, syncMatch)
}

func TestAddTaskStandbyTimeout(t *testing.T) {
controller := gomock.NewController(t)
logger := testlogger.New(t)

cfg := config.NewConfig(dynamicconfig.NewNopCollection(), "some random hostname")
cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)

tlm := createTestTaskListManagerWithConfig(t, logger, controller, cfg)
require.NoError(t, tlm.Start())

// always block on the channel
tlm.taskWriter.appendCh = nil

domainID := uuid.New()
workflowID := "some random workflowID"
runID := "some random runID"

addTaskParam := AddTaskParams{
TaskInfo: &persistence.TaskInfo{
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
ScheduleID: 2,
ScheduleToStartTimeout: 5,
CreatedTime: time.Now(),
},
}

testStandbyDomainEntry := cache.NewGlobalDomainCacheEntryForTest(
&persistence.DomainInfo{ID: domainID, Name: "some random domain name"},
&persistence.DomainConfig{Retention: 1},
&persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestAlternativeClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
{ClusterName: cluster.TestCurrentClusterName},
{ClusterName: cluster.TestAlternativeClusterName},
},
},
1234,
)
mockDomainCache := tlm.domainCache.(*cache.MockDomainCache)
mockDomainCache.EXPECT().GetDomainByID(domainID).Return(testStandbyDomainEntry, nil).AnyTimes()

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
syncMatch, err := tlm.AddTask(ctx, addTaskParam)
require.Equal(t, ErrTooManyOutstandingTasks, err)
require.False(t, syncMatch)
}

func TestAddTask(t *testing.T) {
testCases := []struct {
name string
request AddTaskParams
mockSetup func(*cache.MockDomainCache)
want bool
wantErr bool
}{
{
name: "failed to get domain by id",
request: AddTaskParams{
TaskInfo: &persistence.TaskInfo{
DomainID: "domainID",
WorkflowID: "workflowID",
RunID: "runID",
ScheduleID: 2,
ScheduleToStartTimeout: 5,
CreatedTime: time.Now(),
},
},
mockSetup: func(mockDomainCache *cache.MockDomainCache) {
mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(nil, errors.New("test error"))
},
wantErr: true,
},
{
name: "activity remote sync match failed",
request: AddTaskParams{
TaskInfo: &persistence.TaskInfo{
DomainID: "domainID",
WorkflowID: "workflowID",
RunID: "runID",
ScheduleID: 2,
ScheduleToStartTimeout: 5,
CreatedTime: time.Now(),
},
ActivityTaskDispatchInfo: &types.ActivityTaskDispatchInfo{},
},
mockSetup: func(mockDomainCache *cache.MockDomainCache) {
mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(cache.CreateDomainCacheEntry("domain"), nil)
},
wantErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
controller := gomock.NewController(t)
logger := testlogger.New(t)
cfg := config.NewConfig(dynamicconfig.NewNopCollection(), "some random hostname")
cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)

tm := NewTestTaskManager(t, logger, clock.NewRealTimeSource())
mockPartitioner := partition.NewMockPartitioner(controller)
mockDomainCache := cache.NewMockDomainCache(controller)
mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("domain", nil)
tl := "tl"
dID := "domain"
tlID, err := NewIdentifier(dID, tl, persistence.TaskListTypeActivity)
require.NoError(t, err)
tlKind := types.TaskListKindNormal
tlm, err := NewManager(mockDomainCache, logger, metrics.NewClient(tally.NoopScope, metrics.Matching), tm, cluster.GetTestClusterMetadata(true), mockPartitioner, nil, func(Manager) {}, tlID, &tlKind, cfg, clock.NewRealTimeSource(), time.Now())
require.NoError(t, err)
err = tlm.Start()
require.NoError(t, err)

tc.mockSetup(mockDomainCache)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
syncMatch, err := tlm.AddTask(ctx, tc.request)
if tc.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, tc.want, syncMatch)
}
})
}
}

func TestGetPollerIsolationGroup(t *testing.T) {
controller := gomock.NewController(t)
logger := testlogger.New(t)
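
A note on TestAddTaskStandbyTimeout above: it sets tlm.taskWriter.appendCh to nil, and a send on a nil channel blocks forever in Go, so the writer's select can only complete through its ctx.Done() case and AddTask surfaces ErrTooManyOutstandingTasks. A standalone sketch of that mechanism, with illustrative names only:

package example

import "context"

// trySend mirrors the shape of the writer's select: when ch is nil the first
// case can never fire, so the call returns only once the context expires.
func trySend(ctx context.Context, ch chan int, v int) error {
	select {
	case ch <- v: // blocks forever if ch == nil
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
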
4 changes: 2 additions & 2 deletions service/matching/tasklist/task_reader.go
@@ -363,10 +363,10 @@ func (tr *taskReader) completeTask(task *persistence.TaskInfo, err error) {
// Note that RecordTaskStarted only fails after retrying for a long time, so a single task will not be
// re-written to persistence frequently.
op := func() error {
_, err := tr.taskWriter.appendTask(task)
_, err := tr.taskWriter.appendTask(tr.cancelCtx, task)
return err
}
err = tr.throttleRetry.Do(context.Background(), op)
err = tr.throttleRetry.Do(tr.cancelCtx, op)
if err != nil {
// OK, we also failed to write to persistence.
// This should only happen in very extreme cases where persistence is completely down.
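
The reader change swaps context.Background() for tr.cancelCtx in both the appendTask call and the surrounding retry, so re-writing a task that failed to start is abandoned once the task list shuts down instead of continuing in a detached context. A generic sketch of a cancellation-aware retry loop with that behavior (an illustration, not the cadence backoff implementation):

package example

import (
	"context"
	"time"
)

// retryUntilCancelled keeps re-running op after failures, but gives up as soon
// as ctx (for example, a task list's cancel context) is closed.
func retryUntilCancelled(ctx context.Context, op func() error, interval time.Duration) error {
	for {
		if err := op(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // shutting down: stop rewriting the task
		case <-time.After(interval):
			// back off briefly, then try again
		}
	}
}
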
6 changes: 3 additions & 3 deletions service/matching/tasklist/task_writer.go
@@ -121,7 +121,7 @@ func (w *taskWriter) isStopped() bool {
return atomic.LoadInt64(&w.stopped) == 1
}

func (w *taskWriter) appendTask(taskInfo *persistence.TaskInfo) (*persistence.CreateTasksResponse, error) {
func (w *taskWriter) appendTask(ctx context.Context, taskInfo *persistence.TaskInfo) (*persistence.CreateTasksResponse, error) {
if w.isStopped() {
return nil, errShutdown
}
@@ -142,8 +142,8 @@ func (w *taskWriter) appendTask(taskInfo *persistence.TaskInfo) (*persistence.Cr
// it to cassandra, just bail out and fail this request
return nil, errShutdown
}
default: // channel is full, throttle
return nil, createServiceBusyError("Too many outstanding appends to the TaskList")
case <-ctx.Done(): // channel is full, throttle
return nil, ErrTooManyOutstandingTasks
}
}

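
In the writer, the non-blocking default branch that immediately returned a service-busy error when appendCh was full is replaced by a ctx.Done() case: appendTask now waits for queue space for as long as the caller's context allows and only then reports ErrTooManyOutstandingTasks. A condensed, generic sketch of that backpressure pattern (the channel and error names here are placeholders, not the repository's):

package example

import (
	"context"
	"errors"
)

// errQueueFull stands in for ErrTooManyOutstandingTasks.
var errQueueFull = errors.New("too many outstanding appends")

// enqueue blocks until the bounded queue accepts the item or the caller's
// context expires, rather than failing the moment the buffer is full.
func enqueue[T any](ctx context.Context, queue chan<- T, item T) error {
	select {
	case queue <- item:
		return nil // a background write loop drains the queue and persists items
	case <-ctx.Done():
		return errQueueFull
	}
}
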
