Data processing pipeline #5

Open
sagikazarmark opened this issue May 27, 2017 · 15 comments

Comments

@sagikazarmark

Hey there,

Thanks for the great library, the Go ecosystem really lacked something like it.

Currently I am working on a data processing pipeline which imports data from various sources. This means that I have a data source with rows of data which I would like to process.

Any ideas how I could do it with this library?

(I know https://github.com/dailyburn/ratchet but I just can't get used to its syntax)

Thanks in advance!

@adnaan
Contributor

adnaan commented May 31, 2017

Hey,
Sorry for the delayed response.

Well, establishing data processing pipelines was never a primary goal of this library. For one, it provides only the flow control and leaves the processing to the implementer, so you would probably have to write most of the data processing part yourself. At the moment I am not sure how that would work, but I am definitely going to think about it.

@sagikazarmark
Author

sagikazarmark commented May 31, 2017

Thanks for your response.

I've been thinking about some solutions and I keep coming back to the same one: make the pipeline stateless (currently it's not).

Being able to call the same pipeline multiple times would help a lot, and not just for a data processing pipeline. Currently you have to create a new pipeline each time you want to run one, even if you want to run the same steps on the context. I think this would be a great addition, and it can probably be done in a BC manner.

One way of doing that would be to pass the initial context/first request to the workflow.Run method. This would require a change in the library.

Another solution (more of a workaround) I have in mind is a first step which reads from a channel every time I call Run, but that doesn't sound like a safe solution. An improved version would use two channels: one carrying the context, the other just signaling that new data has arrived and that the process should call workflow.Run.

I am pretty much in favor of option one. What do you think?

@adnaan
Contributor

adnaan commented May 31, 2017

That sounds reasonable. About initial request:

How about we have another API?

func RunRequest(request *Request) *Result 

But you would still need to have a new pipeline, because the following methods depend on it: Out, GetProgessPercent, GetDuration

If you do want to reuse the same steps, one might do this:

newPipeline.AddStage(oldstages...)
result := newPipeline.RunRequest(request)

I definitely think this can be implemented in a much better way, but I don't see how it can be done in a BC manner. Maybe we can have a better implementation in v2.

Please let me know if I am totally missing the point.

Also, could you elaborate more about the pipeline being stateless?

@sagikazarmark
Author

> How about we have another API?

Yeah, I was thinking about something similar. As for naming, I had something like RunWithContext or RunWithRequest in mind.

Since the workflow is currently not stateless, it could also be part of the pipeline:

pipeline.InitialRequest(request)
pipeline.Run()

But this is probably less ideal.

> Also, could you elaborate more about the pipeline being stateless?

By stateless I mean there is no internal state in the pipeline, like initial data in a stage or like the ones mentioned (progress and duration). This would mean that subsequent (or even concurrent) calls to the same pipeline would be possible.
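
A minimal sketch of what "stateless" could mean in practice, assuming hypothetical `Request`, `Result`, `Stage`, and `Pipeline` types that are not the library's actual API. The pipeline holds only its list of stages, and per-run details such as duration travel in the returned result, so the same pipeline can be run repeatedly and concurrently.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Request and Result are hypothetical stand-ins for the library's types.
type Request struct{ Data int }

type Result struct {
	Data     int
	Duration time.Duration // per-run state lives in the result, not the pipeline
}

// Stage transforms a value; real stages would be richer.
type Stage func(int) int

// Pipeline holds only its list of stages, which never changes after New.
type Pipeline struct{ stages []Stage }

// New copies the stages so later changes to the caller's slice cannot
// affect the pipeline.
func New(stages ...Stage) *Pipeline {
	return &Pipeline{stages: append([]Stage(nil), stages...)}
}

// Run only reads p.stages and keeps everything else in local variables,
// so it is safe to call repeatedly and from multiple goroutines.
func (p *Pipeline) Run(req Request) Result {
	start := time.Now()
	v := req.Data
	for _, s := range p.stages {
		v = s(v)
	}
	return Result{Data: v, Duration: time.Since(start)}
}

func main() {
	p := New(
		func(v int) int { return v + 1 },
		func(v int) int { return v * 2 },
	)

	// Run the same pipeline concurrently for several requests.
	results := make([]Result, 3)
	var wg sync.WaitGroup
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = p.Run(Request{Data: i})
		}(i)
	}
	wg.Wait()

	for i, r := range results {
		fmt.Println(i, r.Data)
	}
}
```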

If making the pipeline stateless is not possible because of the details you mentioned (for example, if we cannot return something else from Run along with Result), then we could create a PipelineBuilder holding the API for adding stages, and then create the stateful pipeline from it: pipeline := pipelineBuilder.Build()

This way, building pipelines for batch use cases becomes a lot easier. The pipeline can stay stateful, and you reuse not the pipeline itself but its builder.

Some code presenting the two solutions:

result, pipelineResult := pipeline.Run()

pipelineResult.Out()

This might not be perfect, as it would require returning the result asynchronously.

pipelineBuilder.AddStage(...)

// loop
pipeline := pipelineBuilder.Build()
result := pipeline.Run()
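
Fleshed out a little, the builder variant might look like the sketch below. Everything in it is hypothetical and written only to illustrate the idea: `PipelineBuilder`, `Build`, `ProgressPercent`, and the function-typed `Stage` are assumptions, not part of the library.

```go
package main

import "fmt"

// Stage is a hypothetical, simplified stage: a plain function.
type Stage func(int) int

// PipelineBuilder (hypothetical) only collects stages; it holds no run state.
type PipelineBuilder struct{ stages []Stage }

// AddStage appends stages and returns the builder to allow chaining.
func (b *PipelineBuilder) AddStage(s ...Stage) *PipelineBuilder {
	b.stages = append(b.stages, s...)
	return b
}

// Build returns a new Pipeline with its own copy of the stages and
// fresh run state.
func (b *PipelineBuilder) Build() *Pipeline {
	return &Pipeline{stages: append([]Stage(nil), b.stages...)}
}

// Pipeline is stateful: it tracks how many stages have completed.
type Pipeline struct {
	stages []Stage
	done   int
}

// Run executes the stages in order and updates the progress counter.
func (p *Pipeline) Run(input int) int {
	v := input
	for _, s := range p.stages {
		v = s(v)
		p.done++
	}
	return v
}

// ProgressPercent reports the progress of this pipeline instance only.
func (p *Pipeline) ProgressPercent() int {
	if len(p.stages) == 0 {
		return 100
	}
	return p.done * 100 / len(p.stages)
}

func main() {
	b := &PipelineBuilder{}
	b.AddStage(
		func(v int) int { return v + 1 },
		func(v int) int { return v * 10 },
	)

	// One builder, one fresh pipeline per row.
	for _, row := range []int{1, 2, 3} {
		p := b.Build()
		fmt.Println(p.Run(row), p.ProgressPercent())
	}
}
```

Because each Build call returns an independent instance, progress and similar per-run state never leak between rows.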

Note that I haven't looked at the library code in-depth, so I might not be right about everything above, just trying to present the use-case and a few possible directions.

Right now, for v1, the pipeline builder might be the way to go (while deprecating adding stages directly to the pipeline).

@adnaan adnaan closed this as completed May 31, 2017
@adnaan adnaan reopened this May 31, 2017
@adnaan
Contributor

adnaan commented May 31, 2017

I have a running engagement which is keeping me busy. Allow me some time to think more about this.

@sagikazarmark
Author

Sure thing.

@adnaan
Contributor

adnaan commented Jul 17, 2017

Hey there. I have just started looking at this again. Do you have an example of a library in another language that suits your purposes?

Also, have you looked at Kasper: https://movio.co/en/blog/Kasper-process-library/

@sagikazarmark
Author

sagikazarmark commented Jul 21, 2017 via email

@adnaan
Contributor

adnaan commented Jul 24, 2017

portphp explains a lot. Going through it.

@sagikazarmark
Author

Did you have the chance to give this a thought?

@adnaan
Contributor

adnaan commented Sep 1, 2017

Sorry about the delay. Went through portphp as far as I could. Would love to connect over voice if possible. Please let me know: badr.adnaan at gmail

@sagikazarmark
Author

I suggest that you sign up for Slack so we can talk there: http://slack.portphp.org/

@adnaan
Contributor

adnaan commented Sep 11, 2017

For the issue log: have started working on this.

@adnaan
Contributor

adnaan commented Sep 13, 2017

Incomplete PR for early feedback: #7

@sagikazarmark
Author

Sorry, I didn't have too much time to review it so far, I'll try to do so.

In the meantime, here is a library with similar goals, probably with a smaller feature set: https://github.com/mitchellh/multistep
