Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Goroutine leak after Worker.Stop() #1129

Open
asido opened this issue Sep 16, 2021 · 2 comments
Open

Goroutine leak after Worker.Stop() #1129

asido opened this issue Sep 16, 2021 · 2 comments

Comments

@asido
Copy link

asido commented Sep 16, 2021

Describe the bug
Worker.Stop() doesn't await for goroutines to stop, which goleak identifies as a goroutine leak.

To Reproduce
Is the issue reproducible?

  • Yes

Steps to reproduce the behavior:

go.mod:

module cadence-goleak

go 1.16

require (
	go.uber.org/cadence v0.17.0
	go.uber.org/fx v1.14.2
	go.uber.org/goleak v1.1.10
	go.uber.org/yarpc v1.57.1
)

main.go:

package main

import (
	"context"

	"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
	"go.uber.org/cadence/worker"
	"go.uber.org/cadence/workflow"
	"go.uber.org/fx"
	"go.uber.org/yarpc/api/transport"
	"go.uber.org/yarpc/yarpctest"
)

var opts = fx.Options(
	fx.Provide(newCadenceWorker),
	fx.Invoke(registerCadenceWorker),
)

func main() {
	fx.New(opts).Run()
}

func newCadenceWorker() worker.Worker {
	serviceClient := workflowserviceclient.New(&transport.OutboundConfig{
		CallerName: "caller-name",
		Outbounds: transport.Outbounds{
			ServiceName: "service-name",
			Unary:       yarpctest.NewFakeTransport().NewOutbound(yarpctest.NewFakePeerList()),
		},
	})

	w := worker.New(serviceClient, "cadence-domain", "task-list", worker.Options{})
	w.RegisterWorkflowWithOptions(noopWorkflow, workflow.RegisterOptions{Name: "noop-workflow"})
	return w
}

func registerCadenceWorker(worker worker.Worker, lifecycle fx.Lifecycle) {
	lifecycle.Append(fx.Hook{
		OnStart: func(context.Context) error {
			return worker.Start()
		},
		OnStop: func(context.Context) error {
			worker.Stop()
			return nil
		},
	})
}

func noopWorkflow(ctx workflow.Context) error {
	return nil
}

main_test.go:

package main

import (
	"testing"
	"time"

	"go.uber.org/fx/fxtest"
	"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}

func Test_app_runs(t *testing.T) {
	app := fxtest.New(t, opts)
	app.RequireStart()
	// Give time for worker polling to begin.
	time.Sleep(time.Second)
	app.RequireStop()
}
$ go test .
goleak: Errors on successful test run: found unexpected goroutines:
[Goroutine 24 in state sleep, with time.Sleep on top of the stack:
goroutine 24 [sleep]:
time.Sleep(0x2035a97a4)
	/usr/local/Cellar/go/1.16.6/libexec/src/runtime/time.go:193 +0xd2
go.uber.org/cadence/internal/common/backoff.(*ConcurrentRetrier).throttleInternal(0xc000160aa0, 0xc00007cd20)
	/Users/asido/go/pkg/mod/go.uber.org/[email protected]/internal/common/backoff/retry.go:62 +0x7e
go.uber.org/cadence/internal/common/backoff.(*ConcurrentRetrier).Throttle(...)
	/Users/asido/go/pkg/mod/go.uber.org/[email protected]/internal/common/backoff/retry.go:48
go.uber.org/cadence/internal.(*baseWorker).pollTask(0xc0003581c0)
	/Users/asido/go/pkg/mod/go.uber.org/[email protected]/internal/internal_worker_base.go:261 +0x4a
go.uber.org/cadence/internal.(*baseWorker).runPoller(0xc0003581c0)
	/Users/asido/go/pkg/mod/go.uber.org/[email protected]/internal/internal_worker_base.go:227 +0xb5
created by go.uber.org/cadence/internal.(*baseWorker).Start
	/Users/asido/go/pkg/mod/go.uber.org/[email protected]/internal/internal_worker_base.go:190 +0xbb

 Goroutine 13 in state semacquire, with sync.runtime_Semacquire on top of the stack:
goroutine 13 [semacquire]:
sync.runtime_Semacquire(0xc000358238)
	/usr/local/Cellar/go/1.16.6/libexec/src/runtime/sema.go:56 +0x45
sync.(*WaitGroup).Wait(0xc000358230)
	/usr/local/Cellar/go/1.16.6/libexec/src/sync/waitgroup.go:130 +0x65
go.uber.org/cadence/internal/common/util.AwaitWaitGroup.func1(0xc000358230, 0xc0000b2360)
	/Users/asido/go/pkg/mod/go.uber.org/[email protected]/internal/common/util/util.go:52 +0x2b
created by go.uber.org/cadence/internal/common/util.AwaitWaitGroup
	/Users/asido/go/pkg/mod/go.uber.org/[email protected]/internal/common/util/util.go:51 +0x6e
]
FAIL	cadence-goleak	10.567s

Expected behavior

$ go test .
ok  	cadence-goleak

Additional Context
Used cadence-client v0.17.0, because with the program above v0.18.2 download fails. But the reported problem is in both releases.

$ go get .
# go.uber.org/cadence/internal/common
../../../go/pkg/mod/go.uber.org/[email protected]/internal/common/thrift_util.go:31:38: not enough arguments in call to thrift.NewTSerializer().Write
	have (thrift.TStruct)
	want (context.Context, thrift.TStruct)
../../../go/pkg/mod/go.uber.org/[email protected]/internal/common/thrift_util.go:53:27: not enough arguments in call to t.Protocol.Flush
	have ()
	want (context.Context)
../../../go/pkg/mod/go.uber.org/[email protected]/internal/common/thrift_util.go:57:28: not enough arguments in call to t.Transport.Flush
	have ()
	want (context.Context)
@Groxx
Copy link
Contributor

Groxx commented Oct 1, 2021

Yeah, AFAICT the shutdown process just closes a channel and returns :| so there's no way to wait for a clean shutdown.

There are a few other shutdown leaks throughout the client, unfortunately they're definitely not all simple fixes.


As far as this error:

$ go get .
# go.uber.org/cadence/internal/common
../../../go/pkg/mod/go.uber.org/[email protected]/internal/common/thrift_util.go:31:38: not enough arguments in call to thrift.NewTSerializer().Write
	have (thrift.TStruct)
	want (context.Context, thrift.TStruct)
../../../go/pkg/mod/go.uber.org/[email protected]/internal/common/thrift_util.go:53:27: not enough arguments in call to t.Protocol.Flush
	have ()
	want (context.Context)
../../../go/pkg/mod/go.uber.org/[email protected]/internal/common/thrift_util.go:57:28: not enough arguments in call to t.Transport.Flush
	have ()
	want (context.Context)

^ that means you already have a version of thrift that's too new for cadence. They made a breaking change without releasing a new version, and go modules do not allow specifying upper limits, so unfortunately you need to downgrade it to something compatible. If you do a "clean" go get, it works, because that downloads the minimum versions we specify in our go.mod file.

@longquanzheng
Copy link
Collaborator

You can pin the thrift version: like this: https://github.com/uber/cadence/blob/d3d06825adcf11c20ec3fc58e329f1d9560bb729/go.mod#L92

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants