Output pipe disappeared! when trying to load data with flows #150

Open
3 tasks done
zelima opened this issue Oct 17, 2018 · 3 comments

@zelima
Contributor

zelima commented Oct 17, 2018

In order to submit an issue, please ensure you can check the following. Thanks!

  • python version 3.6
  • operating system Ubuntu
  • Dpp version 1.7.2

I have a piece of code with data flows that works fine if I execute it directly with python my_flow.py

# my_flow.py
from dataflows import Flow,  add_metadata, dump_to_path, load, printer

Flow(
    add_metadata(name="finance-vix"),
    load(
        load_source='http://www.cboe.com/publish/ScheduledTask/MktData/datahouse/vixcurrent.csv',
        headers=2
    ), printer(), dump_to_path(),
).process()

# Output
vixcurrent:
#     Date          VIX Open    VIX High     VIX Low    VIX Close
      (string)      (number)    (number)    (number)     (number)
----  ----------  ----------  ----------  ----------  -----------
1     1/2/2004         17.96       18.68       17.54        18.22
2     1/5/2004         18.45       18.49       17.44        17.49
3     1/6/2004         17.66       17.67       16.19        16.73
4     1/7/2004         16.72       16.75       15.5         15.5
5     1/8/2004         15.42       15.68       15.32        15.61
6     1/9/2004         16.15       16.88       15.57        16.75

However, if I wrap it in a flow() function and run the pipeline via dpp run ./my-pipeline, or run it inside a Docker container via dpp server, it fails silently: the only error shown is Output pipe disappeared!.

# modified my_flow.py
from dataflows import Flow,  add_metadata, dump_to_path, load, printer

def flow(parameters, datapackage, resources, stats):
    return Flow(
        add_metadata(name="finance-vix"),
        load(
            load_source='http://www.cboe.com/publish/ScheduledTask/MktData/datahouse/vixcurrent.csv',
            headers=2
        ), printer(), dump_to_path()
    )

my pipeline-spec.yaml:

finance-vix-flow:
  pipeline:
    - flow: my_flow

my Dockerfile:

FROM frictionlessdata/datapackage-pipelines:latest


RUN apk --update --no-cache add libpq postgresql-dev libffi libffi-dev build-base python3-dev ca-certificates
RUN update-ca-certificates

WORKDIR /app
RUN apk add --update postgresql-client

ADD requirements.txt /app/requirements.txt
RUN pip install -r /app/requirements.txt

ADD . /app

CMD ["server"]

Error log on server:

(sink): >>> PROCESSED ROWS: 0
flow: DEBUG   :Starting new HTTP connection (1): www.cboe.com:80
flow: DEBUG   :http://www.cboe.com:80 "GET /publish/ScheduledTask/MktData/datahouse/vixcurrent.csv HTTP/1.1" 200 135448
flow: DEBUG   :Starting new HTTP connection (2): www.cboe.com:80
flow: DEBUG   :http://www.cboe.com:80 "GET /publish/ScheduledTask/MktData/datahouse/vixcurrent.csv HTTP/1.1" 200 135448
flow: DEBUG   :Starting new HTTP connection (1): www.cboe.com:80
flow: DEBUG   :http://www.cboe.com:80 "GET /publish/ScheduledTask/MktData/datahouse/vixcurrent.csv HTTP/1.1" 200 135448
flow: DEBUG   :Starting new HTTP connection (2): www.cboe.com:80
flow: DEBUG   :http://www.cboe.com:80 "GET /publish/ScheduledTask/MktData/datahouse/vixcurrent.csv HTTP/1.1" 200 135448
(sink): >>> PROCESSED ROWS: 0
flow: ERROR   :Output pipe disappeared!

Error log via dpp run ./finance-vix-flow

./finance-vix-flow: FAILURE, processed 0 rows
INFO    :RESULTS:
INFO    :FAILURE: ./finance-vix-flow 
ERROR log from processor flow:
+--------
| ERROR   :Output pipe disappeared!
+--------

@OriHoch
Contributor

OriHoch commented Oct 23, 2018

the implementation is still a bit flaky, hopefully it will be improved in dpp v2

there are 3 possible problems with your code (or rather, with dpp..):

  1. need to set dpp:streaming: True on the resource
  2. stdout redirect is still buggy, so better not to use the printer
  3. not sure if required, but better to load the input datapackage and resources from the flow

fixed implementation (haven't tested):

# modified my_flow.py
from dataflows import Flow, add_metadata, dump_to_path, load, printer, update_resource

def flow(parameters, datapackage, resources, stats):
    return Flow(
        # pass through the datapackage and resources received from dpp
        load((datapackage, resources)),
        add_metadata(name="finance-vix"),
        load(
            load_source='http://www.cboe.com/publish/ScheduledTask/MktData/datahouse/vixcurrent.csv',
            headers=2
        ),
        # mark the loaded resource as streaming so dpp passes its rows on
        update_resource('vixcurrent', **{'dpp:streaming': True}),
        dump_to_path()
    )

# printer() is only used when running the file directly, not under dpp
if __name__ == '__main__':
    Flow(flow({}, {'resources': []}, [], {}), printer()).process()
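
Regarding point 2: if you still want to see a few rows while running under dpp, one option is to log them to stderr instead of using printer(). This is only a sketch and untested inside dpp. It assumes dataflows accepts a plain generator function whose single parameter is named rows as a flow step; the names log_rows, ROW_LOG_LIMIT and the logger name are made up for illustration.

```python
import logging
import sys

# Send log output to stderr so nothing extra is written to stdout.
logging.basicConfig(stream=sys.stderr, level=logging.INFO)
logger = logging.getLogger("my_flow")  # hypothetical logger name

ROW_LOG_LIMIT = 5  # hypothetical: how many rows to log per resource


def log_rows(rows):
    """Yield every row unchanged, logging the first few to stderr."""
    for index, row in enumerate(rows):
        if index < ROW_LOG_LIMIT:
            logger.info("row %d: %r", index + 1, row)
        yield row
```

Under that assumption, log_rows would go in the Flow(...) call in the position where printer() used to be.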

@akariv
Member

akariv commented Oct 23, 2018

This is true for version 1.7.2
Version 2.0.0 has introduced some modifications, including:

@zelima does this reproduce in v2.0.0?

@zelima
Contributor Author

zelima commented Oct 24, 2018

@zelima yes, at the time the issue was opened, this was happening in v2.0.0 as well. Setting dpp:streaming: True helped in both versions, though.
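
For anyone hitting the same error, here is a rough sketch of what the fix amounts to at the descriptor level: every resource in the datapackage should carry dpp:streaming: True. It works on plain dicts only and does not call dpp or dataflows. The helper names and the path value in the sample descriptor are made up for illustration.

```python
def resources_missing_streaming(descriptor):
    """Return the names of resources that lack a truthy 'dpp:streaming' flag."""
    return [
        resource.get("name")
        for resource in descriptor.get("resources", [])
        if not resource.get("dpp:streaming")
    ]


def mark_streaming(descriptor):
    """Set 'dpp:streaming': True on every resource and return the descriptor."""
    for resource in descriptor.get("resources", []):
        resource["dpp:streaming"] = True
    return descriptor


# Sample descriptor; the path is a hypothetical placeholder.
descriptor = {
    "name": "finance-vix",
    "resources": [{"name": "vixcurrent", "path": "vixcurrent.csv"}],
}

missing_before = resources_missing_streaming(descriptor)
mark_streaming(descriptor)
missing_after = resources_missing_streaming(descriptor)
```

In the flow itself, the update_resource step with 'dpp:streaming': True plays the role of mark_streaming for the vixcurrent resource.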
