Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The program has bug when the steps in a stage are set to concurrent #12

Open
liuchenailq opened this issue Nov 27, 2021 · 3 comments
Open

Comments

@liuchenailq
Copy link

liuchenailq commented Nov 27, 2021

When I set up concurrent execution for the steps of a stage, I find that program always execute the last step.
Looking at the source code, I found closure functions used for concurrent execution, which I guess is where the bug came from.

// file  stage.go
for _, step := range st.Steps {
			step.Status("begin")
			g.run(func() *Result {

				defer step.Status("end")

				//disables strict mode. g.run will wait for all steps to finish
				if st.DisableStrictMode {
					return step.Exec(request)
				}
				fmt.Println(step.getCtx().index, step.getCtx().name)
				resultChan := make(chan *Result, 1)
@adnaan
Copy link
Contributor

adnaan commented Nov 28, 2021

Hey there. Could you please provide me an example code for the buggy behaviour ?

@liuchenailq
Copy link
Author

When I run the sample code you gave, the sample code is pipeline/examples/advanced/main.go

The sample code creates a pipeline with three stages. The first stage is executed sequentially, the second and third stages are executed concurrently, and the first step of the third stage returns an error.

I found that the first stage executed sequentially worked well, while the second and third stages executed concurrently failed, running the last step three times in both cases.

The result is as follows,You can find that stage1 and stage2 both run step2, and run it three times.

[pipeline][getfiles]: begin
[stage-0][stage]: begin
[stage-0][stage]: is not concurrent
[step-0][getfiles.con_stage.*main.downloadStep]: begin
[step-0][getfiles.con_stage.*main.downloadStep]: &{Data:<nil> KeyVal:map[]}
[step-0][getfiles.con_stage.*main.downloadStep]: Started downloading file 1mbfile
[step-0][getfiles.con_stage.*main.downloadStep]: Successfully downloaded file 1mbfile
[step-0][getfiles.con_stage.*main.downloadStep]: end
[step-1][getfiles.con_err_stage.*main.downloadStep]: begin
[step-1][getfiles.con_err_stage.*main.downloadStep]: &{Data:{bytesDownloaded:102400} KeyVal:map[bytesDownloaded:102400]}
[step-1][getfiles.con_err_stage.*main.downloadStep]: Started downloading file 5mbfile
[step-1][getfiles.con_err_stage.*main.downloadStep]: Successfully downloaded file 5mbfile
[step-1][getfiles.con_err_stage.*main.downloadStep]: end
[step-2][getfiles.con_err_stage.*main.downloadStep]: begin
[step-2][getfiles.con_err_stage.*main.downloadStep]: &{Data:{bytesDownloaded:102400} KeyVal:map[bytesDownloaded:102400]}
[step-2][getfiles.con_err_stage.*main.downloadStep]: Started downloading file 10mbfile
[step-2][getfiles.con_err_stage.*main.downloadStep]: Successfully downloaded file 10mbfile
[step-2][getfiles.con_err_stage.*main.downloadStep]: end
[stage-0][stage]: end
[stage-1][con_stage]: begin
[stage-1][con_stage]: is concurrent
[step-0][getfiles.con_stage.*main.downloadStep]: begin
[step-1][getfiles.con_err_stage.*main.downloadStep]: begin
[step-2][getfiles.con_err_stage.*main.downloadStep]: begin
[step-2][getfiles.con_err_stage.*main.downloadStep]: &{Data:{bytesDownloaded:102400} KeyVal:map[bytesDownloaded:102400]}
[step-2][getfiles.con_err_stage.*main.downloadStep]: &{Data:{bytesDownloaded:102400} KeyVal:map[bytesDownloaded:102400]}
[step-2][getfiles.con_err_stage.*main.downloadStep]: Started downloading file 10mbfile
[step-2][getfiles.con_err_stage.*main.downloadStep]: Started downloading file 10mbfile
[step-2][getfiles.con_err_stage.*main.downloadStep]: &{Data:{bytesDownloaded:102400} KeyVal:map[bytesDownloaded:102400]}
[step-2][getfiles.con_err_stage.*main.downloadStep]: Started downloading file 10mbfile
[step-2][getfiles.con_err_stage.*main.downloadStep]: Successfully downloaded file 10mbfile
[step-2][getfiles.con_err_stage.*main.downloadStep]: end
[step-2][getfiles.con_err_stage.*main.downloadStep]: Successfully downloaded file 10mbfile
[step-2][getfiles.con_err_stage.*main.downloadStep]: end
[step-2][getfiles.con_err_stage.*main.downloadStep]: Successfully downloaded file 10mbfile
[step-2][getfiles.con_err_stage.*main.downloadStep]: end
[stage-1][con_stage]: end
[stage-2][con_err_stage]: begin
[stage-2][con_err_stage]: is concurrent
[step-0][getfiles.con_err_stage.*main.downloadStep]: begin
[step-1][getfiles.con_err_stage.*main.downloadStep]: begin
[step-2][getfiles.con_err_stage.*main.downloadStep]: begin
[step-2][getfiles.con_err_stage.*main.downloadStep]: &{Data:<nil> KeyVal:map[]}
[step-2][getfiles.con_err_stage.*main.downloadStep]: Started downloading file 10mbfile
[step-2][getfiles.con_err_stage.*main.downloadStep]: &{Data:<nil> KeyVal:map[]}
[step-2][getfiles.con_err_stage.*main.downloadStep]: Started downloading file 10mbfile
[step-2][getfiles.con_err_stage.*main.downloadStep]: &{Data:<nil> KeyVal:map[]}
[step-2][getfiles.con_err_stage.*main.downloadStep]: Started downloading file 10mbfile
[step-2][getfiles.con_err_stage.*main.downloadStep]: Successfully downloaded file 10mbfile
[step-2][getfiles.con_err_stage.*main.downloadStep]: end
[step-2][getfiles.con_err_stage.*main.downloadStep]: Successfully downloaded file 10mbfile
[step-2][getfiles.con_err_stage.*main.downloadStep]: end
[step-2][getfiles.con_err_stage.*main.downloadStep]: Successfully downloaded file 10mbfile
[step-2][getfiles.con_err_stage.*main.downloadStep]: end
[stage-2][con_err_stage]: end
[pipeline][getfiles]: end
timeTaken: 9.252079992s

Looking at the source code, I found that the stage.run() function uses closure functions, which I guess is where the bug came from.

// Run the stage execution sequentially
func (st *Stage) run(request *Request) *Result {
	if len(st.Steps) == 0 {
		return &Result{Error: fmt.Errorf("No steps to be executed")}
	}
	st.status("begin")
	defer st.status("end")

	if st.Concurrent {
		st.status("is concurrent")
		g, ctx := withContext(context.Background())
		for _, step := range st.Steps {
			step.Status("begin")
			g.run(func() *Result {

				defer step.Status("end")
				//disables strict mode. g.run will wait for all steps to finish
				if st.DisableStrictMode {
					return step.Exec(request)
				}

@glaye
Copy link

glaye commented Apr 20, 2023

Have you solved it?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants