Remove internal retry from AddTask #6200

Open · wants to merge 1 commit into base: master
103 changes: 29 additions & 74 deletions service/matching/tasklist/task_list_manager.go
@@ -34,7 +34,6 @@ import (

"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
@@ -55,11 +54,15 @@ const (
// Time budget for empty task to propagate through the function stack and be returned to
// pollForActivityTask or pollForDecisionTask handler.
returnEmptyTaskTimeBudget = time.Second
// maxSyncMatchWaitTime is the max amount of time that we are willing to wait for a sync match to happen
maxSyncMatchWaitTime = 200 * time.Millisecond
)

var (
// ErrNoTasks is exported temporarily for integration test
ErrNoTasks = errors.New("no tasks")
ErrRemoteSyncMatchFailed = &types.RemoteSyncMatchedError{Message: "remote sync match failed"}
ErrTooManyOutstandingTasks = &types.ServiceBusyError{Message: "Too many outstanding appends to the TaskList"}
_stickyPollerUnavailableError = &types.StickyWorkerUnavailableError{Message: "sticky worker is unavailable, please use non-sticky task list."}
persistenceOperationRetryPolicy = common.CreatePersistenceRetryPolicy()
taskListActivityTypeTag = metrics.TaskListTypeTag("activity")
@@ -120,13 +123,6 @@ type (
}
)

const (
// maxSyncMatchWaitTime is the max amount of time that we are willing to wait for a sync match to happen
maxSyncMatchWaitTime = 200 * time.Millisecond
)

var errRemoteSyncMatchFailed = &types.RemoteSyncMatchedError{Message: "remote sync match failed"}

func NewManager(
domainCache cache.DomainCache,
logger log.Logger,
@@ -250,7 +246,6 @@ func (c *taskListManagerImpl) handleErr(err error) error {
// be written to database and later asynchronously matched with a poller
func (c *taskListManagerImpl) AddTask(ctx context.Context, params AddTaskParams) (bool, error) {
c.startWG.Wait()

if c.shouldReload() {
c.Stop()
return false, errShutdown
@@ -259,69 +254,51 @@ func (c *taskListManagerImpl) AddTask(ctx context.Context, params AddTaskParams)
// request sent by history service
c.liveness.MarkAlive()
}
var syncMatch bool
e := event.E{
TaskListName: c.taskListID.GetName(),
TaskListKind: &c.taskListKind,
TaskListType: c.taskListID.GetType(),
TaskInfo: *params.TaskInfo,
}
_, err := c.executeWithRetry(func() (interface{}, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

domainEntry, err := c.domainCache.GetDomainByID(params.TaskInfo.DomainID)
if err != nil {
return nil, err
}

isForwarded := params.ForwardedFrom != ""

if _, err := domainEntry.IsActiveIn(c.clusterMetadata.GetCurrentClusterName()); err != nil {
// standby task, only persist when task is not forwarded from child partition
syncMatch = false
if isForwarded {
return &persistence.CreateTasksResponse{}, errRemoteSyncMatchFailed
}

r, err := c.taskWriter.appendTask(params.TaskInfo)
return r, err
}
domainEntry, err := c.domainCache.GetDomainByID(params.TaskInfo.DomainID)
if err != nil {
return false, err
}

if isActive, err := domainEntry.IsActiveIn(c.clusterMetadata.GetCurrentClusterName()); err == nil && isActive {
isolationGroup, err := c.getIsolationGroupForTask(ctx, params.TaskInfo)
if err != nil {
return false, err
}
// active task, try sync match first
syncMatch, err = c.trySyncMatch(ctx, params, isolationGroup)
syncMatch, err := c.trySyncMatch(ctx, params, isolationGroup)
if syncMatch {
e.EventName = "SyncMatched so not persisted"
event.Log(e)
return &persistence.CreateTasksResponse{}, err
}
if params.ActivityTaskDispatchInfo != nil {
return false, errRemoteSyncMatchFailed
return syncMatch, err
}
}

if isForwarded {
// forwarded from child partition - only do sync match
// child partition will persist the task when sync match fails
e.EventName = "Could not SyncMatched Forwarded Task so not persisted"
event.Log(e)
return &persistence.CreateTasksResponse{}, errRemoteSyncMatchFailed
}
if params.ActivityTaskDispatchInfo != nil {
return false, ErrRemoteSyncMatchFailed
}

e.EventName = "Task Sent to Writer"
if params.ForwardedFrom != "" {
Contributor review comment:
nit: would it be possible to put this into a method like params.IsForwarded() so that it's clearer what this check is indicating?
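A minimal sketch of the helper the reviewer is suggesting; the method name and receiver are hypothetical and it assumes AddTaskParams keeps its ForwardedFrom field:

```go
// IsForwarded reports whether this task was forwarded from a child partition.
// Hypothetical helper suggested in review; not part of this PR as written.
func (p AddTaskParams) IsForwarded() bool {
	return p.ForwardedFrom != ""
}
```

The check above would then read `if params.IsForwarded() {` instead of comparing the string directly.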

// forwarded from child partition - only do sync match
// child partition will persist the task when sync match fails
e.EventName = "Could not SyncMatched Forwarded Task so not persisted"
event.Log(e)
return c.taskWriter.appendTask(params.TaskInfo)
})
return false, ErrRemoteSyncMatchFailed
}

if err == nil && !syncMatch {
e.EventName = "Task Sent to Writer"
event.Log(e)
_, err = c.taskWriter.appendTask(ctx, params.TaskInfo)
if err == nil {
c.taskReader.Signal()
}

return syncMatch, err
return false, c.handleErr(err)
}

// DispatchTask dispatches a task to a poller. When there are no pollers to pick
@@ -370,7 +347,7 @@ func (c *taskListManagerImpl) getTask(ctx context.Context, maxDispatchPerSecond
// reached, instead of emptyTask, context timeout error is returned to the frontend by the rpc stack,
// which counts against our SLO. By shortening the timeout by a very small amount, the emptyTask can be
// returned to the handler before a context timeout error is generated.
childCtx, cancel := c.newChildContext(ctx, c.config.LongPollExpirationInterval(), returnEmptyTaskTimeBudget)
childCtx, cancel := newChildContext(ctx, c.config.LongPollExpirationInterval(), returnEmptyTaskTimeBudget)
defer cancel()

isolationGroup := IsolationGroupFromContext(ctx)
@@ -506,24 +483,6 @@ func (c *taskListManagerImpl) TaskListID() *Identifier {
return c.taskListID
}

// Retry operation on transient error. On rangeID update by another process calls c.Stop().
func (c *taskListManagerImpl) executeWithRetry(
operation func() (interface{}, error),
) (result interface{}, err error) {

op := func() error {
result, err = operation()
return err
}

throttleRetry := backoff.NewThrottleRetry(
backoff.WithRetryPolicy(persistenceOperationRetryPolicy),
backoff.WithRetryableError(persistence.IsTransientError),
)
err = c.handleErr(throttleRetry.Do(context.Background(), op))
return
}

func (c *taskListManagerImpl) trySyncMatch(ctx context.Context, params AddTaskParams, isolationGroup string) (bool, error) {
task := newInternalTask(params.TaskInfo, nil, params.Source, params.ForwardedFrom, true, params.ActivityTaskDispatchInfo, isolationGroup)
childCtx := ctx
@@ -535,7 +494,7 @@ func (c *taskListManagerImpl) trySyncMatch(ctx context.Context, params AddTaskPa
if !task.IsForwarded() {
// when task is forwarded from another matching host, we trust the context as is
// otherwise, we override to limit the amount of time we can block on sync match
childCtx, cancel = c.newChildContext(ctx, waitTime, time.Second)
childCtx, cancel = newChildContext(ctx, waitTime, time.Second)
}
var matched bool
var err error
@@ -554,7 +513,7 @@ func (c *taskListManagerImpl) trySyncMatch(ctx context.Context, params AddTaskPa
// method to create child context when childContext cannot use
// all of parent's deadline but instead there is a need to leave
// some time for parent to do some post-work
func (c *taskListManagerImpl) newChildContext(
func newChildContext(
parent context.Context,
timeout time.Duration,
tailroom time.Duration,
@@ -661,10 +620,6 @@ func getTaskListTypeTag(taskListType int) metrics.Tag {
}
}

func createServiceBusyError(msg string) *types.ServiceBusyError {
return &types.ServiceBusyError{Message: msg}
}

func rangeIDToTaskIDBlock(rangeID, rangeSize int64) taskIDBlock {
return taskIDBlock{
start: (rangeID-1)*rangeSize + 1,
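The tailroom idea referenced in getTask and in the comment above newChildContext (give the child slightly less than the parent's deadline so an empty task can be returned before the caller's RPC deadline fires) could be implemented roughly as below. This is a sketch under assumptions; the function's real body is not shown in this diff and may differ:

```go
package tasklist

import (
	"context"
	"time"
)

// newChildContextSketch derives a child context whose deadline is capped at
// `timeout` from now and also leaves `tailroom` before the parent's deadline
// for post-processing. Illustrative only; not the actual newChildContext body.
func newChildContextSketch(
	parent context.Context,
	timeout time.Duration,
	tailroom time.Duration,
) (context.Context, context.CancelFunc) {
	deadline, ok := parent.Deadline()
	if !ok {
		// Parent has no deadline: just apply the requested timeout.
		return context.WithTimeout(parent, timeout)
	}
	// Time left on the parent after reserving tailroom for post-work.
	remaining := time.Until(deadline) - tailroom
	if remaining < timeout {
		timeout = remaining
	}
	return context.WithTimeout(parent, timeout)
}
```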
131 changes: 131 additions & 0 deletions service/matching/tasklist/task_list_manager_test.go
@@ -368,6 +368,137 @@ func TestAddTaskStandby(t *testing.T) {
require.False(t, syncMatch)
}

func TestAddTaskStandbyTimeout(t *testing.T) {
controller := gomock.NewController(t)
logger := testlogger.New(t)

cfg := config.NewConfig(dynamicconfig.NewNopCollection(), "some random hostname")
cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)

tlm := createTestTaskListManagerWithConfig(t, logger, controller, cfg)
require.NoError(t, tlm.Start())

// always block on the channel
tlm.taskWriter.appendCh = nil

domainID := uuid.New()
workflowID := "some random workflowID"
runID := "some random runID"

addTaskParam := AddTaskParams{
TaskInfo: &persistence.TaskInfo{
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
ScheduleID: 2,
ScheduleToStartTimeout: 5,
CreatedTime: time.Now(),
},
}

testStandbyDomainEntry := cache.NewGlobalDomainCacheEntryForTest(
&persistence.DomainInfo{ID: domainID, Name: "some random domain name"},
&persistence.DomainConfig{Retention: 1},
&persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestAlternativeClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
{ClusterName: cluster.TestCurrentClusterName},
{ClusterName: cluster.TestAlternativeClusterName},
},
},
1234,
)
mockDomainCache := tlm.domainCache.(*cache.MockDomainCache)
mockDomainCache.EXPECT().GetDomainByID(domainID).Return(testStandbyDomainEntry, nil).AnyTimes()

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
syncMatch, err := tlm.AddTask(ctx, addTaskParam)
require.Equal(t, ErrTooManyOutstandingTasks, err)
require.False(t, syncMatch)
}

func TestAddTask(t *testing.T) {
testCases := []struct {
name string
request AddTaskParams
mockSetup func(*cache.MockDomainCache)
want bool
wantErr bool
}{
{
name: "failed to get domain by id",
request: AddTaskParams{
TaskInfo: &persistence.TaskInfo{
DomainID: "domainID",
WorkflowID: "workflowID",
RunID: "runID",
ScheduleID: 2,
ScheduleToStartTimeout: 5,
CreatedTime: time.Now(),
},
},
mockSetup: func(mockDomainCache *cache.MockDomainCache) {
mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(nil, errors.New("test error"))
},
wantErr: true,
},
{
name: "activity remote sync match failed",
request: AddTaskParams{
TaskInfo: &persistence.TaskInfo{
DomainID: "domainID",
WorkflowID: "workflowID",
RunID: "runID",
ScheduleID: 2,
ScheduleToStartTimeout: 5,
CreatedTime: time.Now(),
},
ActivityTaskDispatchInfo: &types.ActivityTaskDispatchInfo{},
},
mockSetup: func(mockDomainCache *cache.MockDomainCache) {
mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(cache.CreateDomainCacheEntry("domain"), nil)
},
wantErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
controller := gomock.NewController(t)
logger := testlogger.New(t)
cfg := config.NewConfig(dynamicconfig.NewNopCollection(), "some random hostname")
cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)

tm := NewTestTaskManager(t, logger, clock.NewRealTimeSource())
mockPartitioner := partition.NewMockPartitioner(controller)
mockDomainCache := cache.NewMockDomainCache(controller)
mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("domain", nil)
tl := "tl"
dID := "domain"
tlID, err := NewIdentifier(dID, tl, persistence.TaskListTypeActivity)
require.NoError(t, err)
tlKind := types.TaskListKindNormal
tlm, err := NewManager(mockDomainCache, logger, metrics.NewClient(tally.NoopScope, metrics.Matching), tm, cluster.GetTestClusterMetadata(true), mockPartitioner, nil, func(Manager) {}, tlID, &tlKind, cfg, clock.NewRealTimeSource(), time.Now())
require.NoError(t, err)
err = tlm.Start()
require.NoError(t, err)

tc.mockSetup(mockDomainCache)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
syncMatch, err := tlm.AddTask(ctx, tc.request)
if tc.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, tc.want, syncMatch)
}
})
}
}

func TestGetPollerIsolationGroup(t *testing.T) {
controller := gomock.NewController(t)
logger := testlogger.New(t)
4 changes: 2 additions & 2 deletions service/matching/tasklist/task_reader.go
@@ -363,10 +363,10 @@ func (tr *taskReader) completeTask(task *persistence.TaskInfo, err error) {
// Note that RecordTaskStarted only fails after retrying for a long time, so a single task will not be
// re-written to persistence frequently.
op := func() error {
_, err := tr.taskWriter.appendTask(task)
_, err := tr.taskWriter.appendTask(tr.cancelCtx, task)
return err
}
err = tr.throttleRetry.Do(context.Background(), op)
err = tr.throttleRetry.Do(tr.cancelCtx, op)
if err != nil {
// OK, we also failed to write to persistence.
// This should only happen in very extreme cases where persistence is completely down.
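Passing the reader's cancellable context instead of context.Background() means the retry loop (and the now context-aware appendTask call) is abandoned when the task list shuts down, rather than retrying indefinitely. A minimal illustration of that pattern with a hypothetical helper, not the Cadence backoff package itself:

```go
package tasklist

import (
	"context"
	"time"
)

// retryWithContext illustrates why the context matters here: retries stop as
// soon as ctx is cancelled (e.g. on task list shutdown) instead of looping on
// a background context forever. Hypothetical helper for illustration only.
func retryWithContext(ctx context.Context, op func() error, isRetryable func(error) bool) error {
	delay := 50 * time.Millisecond // illustrative initial backoff
	for {
		err := op()
		if err == nil || !isRetryable(err) {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // shutdown or caller timeout: give up
		case <-time.After(delay):
			if delay < time.Second {
				delay *= 2 // simple exponential backoff with a cap
			}
		}
	}
}
```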
6 changes: 3 additions & 3 deletions service/matching/tasklist/task_writer.go
@@ -121,7 +121,7 @@ func (w *taskWriter) isStopped() bool {
return atomic.LoadInt64(&w.stopped) == 1
}

func (w *taskWriter) appendTask(taskInfo *persistence.TaskInfo) (*persistence.CreateTasksResponse, error) {
func (w *taskWriter) appendTask(ctx context.Context, taskInfo *persistence.TaskInfo) (*persistence.CreateTasksResponse, error) {
if w.isStopped() {
return nil, errShutdown
}
@@ -142,8 +142,8 @@ func (w *taskWriter) appendTask(taskInfo *persistence.TaskInfo) (*persistence.Cr
// it to cassandra, just bail out and fail this request
return nil, errShutdown
}
default: // channel is full, throttle
return nil, createServiceBusyError("Too many outstanding appends to the TaskList")
case <-ctx.Done(): // caller's context expired while the append channel was full; throttle
return nil, ErrTooManyOutstandingTasks
}
}
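The switch from a default case to ctx.Done() changes appendTask from rejecting immediately when the buffer is full to waiting until the caller's context expires. A small self-contained illustration of the two channel-send patterns (types and names here are hypothetical, not the writer's actual ones):

```go
package tasklist

import (
	"context"
	"errors"
)

var errBusy = errors.New("too many outstanding appends") // stand-in for ErrTooManyOutstandingTasks

// sendNonBlocking mirrors the old behaviour: if the buffer is full, fail right away.
func sendNonBlocking(ch chan<- int, v int) error {
	select {
	case ch <- v:
		return nil
	default: // buffer full: reject immediately
		return errBusy
	}
}

// sendUntilDeadline mirrors the new behaviour: block on the full buffer until
// either space frees up or the caller's context expires.
func sendUntilDeadline(ctx context.Context, ch chan<- int, v int) error {
	select {
	case ch <- v:
		return nil
	case <-ctx.Done(): // caller ran out of time while the buffer stayed full
		return errBusy
	}
}
```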
