Commit 3e06d67: Remove internal retry from AddTask

Shaddoll committed Jul 30, 2024
1 parent f3350d0 commit 3e06d67

Showing 3 changed files with 31 additions and 76 deletions.
97 changes: 26 additions & 71 deletions service/matching/tasklist/task_list_manager.go
@@ -34,7 +34,6 @@ import (

"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
@@ -54,11 +53,15 @@ const (
// Time budget for empty task to propagate through the function stack and be returned to
// pollForActivityTask or pollForDecisionTask handler.
returnEmptyTaskTimeBudget = time.Second
// maxSyncMatchWaitTime is the max amount of time that we are willing to wait for a sync match to happen
maxSyncMatchWaitTime = 200 * time.Millisecond
)

var (
// ErrNoTasks is exported temporarily for integration test
ErrNoTasks = errors.New("no tasks")
ErrRemoteSyncMatchFailed = &types.RemoteSyncMatchedError{Message: "remote sync match failed"}
ErrTooManyOutstandingTasks = &types.ServiceBusyError{Message: "Too many outstanding appends to the TaskList"}
_stickyPollerUnavailableError = &types.StickyWorkerUnavailableError{Message: "sticky worker is unavailable, please use non-sticky task list."}
persistenceOperationRetryPolicy = common.CreatePersistenceRetryPolicy()
taskListActivityTypeTag = metrics.TaskListTypeTag("activity")
@@ -120,13 +123,6 @@ type (
}
)

const (
// maxSyncMatchWaitTime is the max amount of time that we are willing to wait for a sync match to happen
maxSyncMatchWaitTime = 200 * time.Millisecond
)

var errRemoteSyncMatchFailed = &types.RemoteSyncMatchedError{Message: "remote sync match failed"}

func NewManager(
domainCache cache.DomainCache,
logger log.Logger,
Expand Down Expand Up @@ -250,7 +246,6 @@ func (c *taskListManagerImpl) handleErr(err error) error {
// be written to database and later asynchronously matched with a poller
func (c *taskListManagerImpl) AddTask(ctx context.Context, params AddTaskParams) (bool, error) {
c.startWG.Wait()

if c.shouldReload() {
c.Stop()
return false, errShutdown
@@ -259,57 +254,39 @@ func (c *taskListManagerImpl) AddTask(ctx context.Context, params AddTaskParams)
// request sent by history service
c.liveness.MarkAlive()
}
var syncMatch bool
_, err := c.executeWithRetry(func() (interface{}, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

domainEntry, err := c.domainCache.GetDomainByID(params.TaskInfo.DomainID)
if err != nil {
return nil, err
}

isForwarded := params.ForwardedFrom != ""

if _, err := domainEntry.IsActiveIn(c.clusterMetadata.GetCurrentClusterName()); err != nil {
// standby task, only persist when task is not forwarded from child partition
syncMatch = false
if isForwarded {
return &persistence.CreateTasksResponse{}, errRemoteSyncMatchFailed
}

r, err := c.taskWriter.appendTask(params.TaskInfo)
return r, err
}
domainEntry, err := c.domainCache.GetDomainByID(params.TaskInfo.DomainID)
if err != nil {
return false, err
}

if isActive, err := domainEntry.IsActiveIn(c.clusterMetadata.GetCurrentClusterName()); err == nil && isActive {
isolationGroup, err := c.getIsolationGroupForTask(ctx, params.TaskInfo)
if err != nil {
return false, err
}
// active task, try sync match first
syncMatch, err = c.trySyncMatch(ctx, params, isolationGroup)
syncMatch, err := c.trySyncMatch(ctx, params, isolationGroup)
if syncMatch {
return &persistence.CreateTasksResponse{}, err
}
if params.ActivityTaskDispatchInfo != nil {
return false, errRemoteSyncMatchFailed
return syncMatch, err
}
}

if isForwarded {
// forwarded from child partition - only do sync match
// child partition will persist the task when sync match fails
return &persistence.CreateTasksResponse{}, errRemoteSyncMatchFailed
}
if params.ActivityTaskDispatchInfo != nil {
return false, ErrRemoteSyncMatchFailed
}

return c.taskWriter.appendTask(params.TaskInfo)
})
if params.ForwardedFrom != "" {
// forwarded from child partition - only do sync match
// child partition will persist the task when sync match fails
return false, ErrRemoteSyncMatchFailed
}

if err == nil && !syncMatch {
_, err = c.taskWriter.appendTask(ctx, params.TaskInfo)
if err == nil {
c.taskReader.Signal()
}

return syncMatch, err
return false, c.handleErr(err)
}

// DispatchTask dispatches a task to a poller. When there are no pollers to pick
Expand Down Expand Up @@ -358,7 +335,7 @@ func (c *taskListManagerImpl) getTask(ctx context.Context, maxDispatchPerSecond
// reached, instead of emptyTask, context timeout error is returned to the frontend by the rpc stack,
// which counts against our SLO. By shortening the timeout by a very small amount, the emptyTask can be
// returned to the handler before a context timeout error is generated.
childCtx, cancel := c.newChildContext(ctx, c.config.LongPollExpirationInterval(), returnEmptyTaskTimeBudget)
childCtx, cancel := newChildContext(ctx, c.config.LongPollExpirationInterval(), returnEmptyTaskTimeBudget)
defer cancel()

isolationGroup := IsolationGroupFromContext(ctx)
@@ -494,24 +471,6 @@ func (c *taskListManagerImpl) TaskListID() *Identifier {
return c.taskListID
}

// Retry operation on transient error. On rangeID update by another process calls c.Stop().
func (c *taskListManagerImpl) executeWithRetry(
operation func() (interface{}, error),
) (result interface{}, err error) {

op := func() error {
result, err = operation()
return err
}

throttleRetry := backoff.NewThrottleRetry(
backoff.WithRetryPolicy(persistenceOperationRetryPolicy),
backoff.WithRetryableError(persistence.IsTransientError),
)
err = c.handleErr(throttleRetry.Do(context.Background(), op))
return
}

func (c *taskListManagerImpl) trySyncMatch(ctx context.Context, params AddTaskParams, isolationGroup string) (bool, error) {
task := newInternalTask(params.TaskInfo, nil, params.Source, params.ForwardedFrom, true, params.ActivityTaskDispatchInfo, isolationGroup)
childCtx := ctx
@@ -523,7 +482,7 @@ func (c *taskListManagerImpl) trySyncMatch(ctx context.Context, params AddTaskPa
if !task.IsForwarded() {
// when task is forwarded from another matching host, we trust the context as is
// otherwise, we override to limit the amount of time we can block on sync match
childCtx, cancel = c.newChildContext(ctx, waitTime, time.Second)
childCtx, cancel = newChildContext(ctx, waitTime, time.Second)
}
var matched bool
var err error
@@ -542,7 +501,7 @@ func (c *taskListManagerImpl) trySyncMatch(ctx context.Context, params AddTaskPa
// method to create child context when childContext cannot use
// all of parent's deadline but instead there is a need to leave
// some time for parent to do some post-work
func (c *taskListManagerImpl) newChildContext(
func newChildContext(
parent context.Context,
timeout time.Duration,
tailroom time.Duration,
@@ -649,10 +608,6 @@ func getTaskListTypeTag(taskListType int) metrics.Tag {
}
}

func createServiceBusyError(msg string) *types.ServiceBusyError {
return &types.ServiceBusyError{Message: msg}
}

func rangeIDToTaskIDBlock(rangeID, rangeSize int64) taskIDBlock {
return taskIDBlock{
start: (rangeID-1)*rangeSize + 1,
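Note on the newChildContext change above: the diff only shows the receiver being dropped (the helper becomes a package-level function); its body is outside the visible hunks. As a rough sketch of what a tailroom-aware helper like this typically does, assuming only the standard library context and time packages (an illustration, not the actual Cadence implementation):

func newChildContext(
	parent context.Context,
	timeout time.Duration,
	tailroom time.Duration,
) (context.Context, context.CancelFunc) {
	// Sketch only. If the parent has a deadline, leave `tailroom` of it unused
	// so the caller can still do post-work (e.g. return an empty task to the
	// handler) before the RPC deadline fires; otherwise use the plain timeout.
	if deadline, ok := parent.Deadline(); ok {
		usable := time.Until(deadline) - tailroom
		if usable < 0 {
			usable = 0
		}
		if usable < timeout {
			timeout = usable
		}
	}
	return context.WithTimeout(parent, timeout)
}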
4 changes: 2 additions & 2 deletions service/matching/tasklist/task_reader.go
@@ -355,10 +355,10 @@ func (tr *taskReader) completeTask(task *persistence.TaskInfo, err error) {
// Note that RecordTaskStarted only fails after retrying for a long time, so a single task will not be
// re-written to persistence frequently.
op := func() error {
_, err := tr.taskWriter.appendTask(task)
_, err := tr.taskWriter.appendTask(tr.cancelCtx, task)
return err
}
err = tr.throttleRetry.Do(context.Background(), op)
err = tr.throttleRetry.Do(tr.cancelCtx, op)
if err != nil {
// OK, we also failed to write to persistence.
// This should only happen in very extreme cases where persistence is completely down.
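The task_reader.go change threads tr.cancelCtx, rather than context.Background(), into both appendTask and throttleRetry.Do, so the backfill write and its retries can be cut short when the task list shuts down. A minimal sketch of a context-aware retry loop in the same spirit, assuming only the standard library (the function name, attempt budget, and fixed backoff are illustrative, not the Cadence backoff package):

func retryWithContext(ctx context.Context, op func() error, attempts int, wait time.Duration) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			// the task list is stopping (or the caller gave up); stop retrying
			return ctx.Err()
		case <-time.After(wait):
			// back off before the next attempt
		}
	}
	return err
}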
6 changes: 3 additions & 3 deletions service/matching/tasklist/task_writer.go
@@ -119,7 +119,7 @@ func (w *taskWriter) isStopped() bool {
return atomic.LoadInt64(&w.stopped) == 1
}

func (w *taskWriter) appendTask(taskInfo *persistence.TaskInfo) (*persistence.CreateTasksResponse, error) {
func (w *taskWriter) appendTask(ctx context.Context, taskInfo *persistence.TaskInfo) (*persistence.CreateTasksResponse, error) {
if w.isStopped() {
return nil, errShutdown
}
@@ -140,8 +140,8 @@ func (w *taskWriter) appendTask(taskInfo *persistence.TaskInfo) (*persistence.Cr
// it to cassandra, just bail out and fail this request
return nil, errShutdown
}
default: // channel is full, throttle
return nil, createServiceBusyError("Too many outstanding appends to the TaskList")
case <-ctx.Done(): // channel is full, throttle
return nil, ErrTooManyOutstandingTasks
}
}

Expand Down
