Automate NiFi dataflows
Dataflows can be fully automated using the NiFi REST API. This project taps and un-taps a dataflow by tracking its incoming and outgoing flowfiles.
More specifically, it turns on an initial processor and records the flowfiles it generates by inspecting the incoming connection. It then turns on a middle processor. When all the recorded flowfiles have arrived in the outgoing connection, it turns on a final processor. Before finishing, it returns the pipeline to its initial state.
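Turning a processor on or off and inspecting a connection correspond to two NiFi REST API operations: `PUT /processors/{id}/run-status` and `GET /connections/{id}`. The sketch below only builds the request and parses a response, so no HTTP call is made. It assumes the field names of the NiFi REST API (`revision`, `state`, `status.aggregateSnapshot.flowFilesQueued`); the helper names are hypothetical and not part of nifi_api, whose internals may differ.

```python
import json
from typing import Tuple

# Hypothetical helpers, not the nifi_api internals: they only build a request
# and parse a response. Paths and fields follow the NiFi REST API; check them
# against the REST API docs of your NiFi version.


def run_status_request(base_url: str, processor_id: str,
                       version: int, running: bool) -> Tuple[str, str]:
    """Return (url, json_body) for PUT /processors/{id}/run-status."""
    url = f"{base_url.rstrip('/')}/processors/{processor_id}/run-status"
    body = {
        # NiFi uses optimistic locking: the version must match the revision
        # returned by the most recent GET /processors/{id}.
        "revision": {"version": version},
        "state": "RUNNING" if running else "STOPPED",
    }
    return url, json.dumps(body)


def queued_flowfiles(connection_entity: dict) -> int:
    """Number of queued flowfiles, read from a GET /connections/{id} response body."""
    return connection_entity["status"]["aggregateSnapshot"]["flowFilesQueued"]


# Usage with a trimmed, made-up response body:
sample = {"status": {"aggregateSnapshot": {"flowFilesQueued": 7}}}
print(queued_flowfiles(sample))
```

Keeping request building separate from sending makes it easy to test the payloads without a running cluster.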
The following example shows the actions on the dataflow that the nifi_api library automates.
- The initial state of the dataflow.
- Turn on the "Initial" processor and turn off the "Final" processor.
- Turn off the "Initial" processor, record the flowfiles in the "Initial" connection, then turn on the "Middle" processor.
- Record the flowfiles in the "Final" connection; when they match the flowfiles recorded in the "Initial" connection, turn off the "Middle" processor and turn on the "Final" processor.
- The "Final" processor consumes the flowfiles in the "Final" connection, and the dataflow returns to its initial state.
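The steps above can be simulated end to end with an in-memory stand-in for NiFi. This is a minimal sketch, not the library's implementation: `FakeNiFi`, `tap_untap`, the assumed initial state (only "Final" running), and the "one flowfile moved per poll" behaviour are all hypothetical. It stops "Final" before starting "Initial" so that nothing is consumed before the count is recorded.

```python
from typing import Dict, List


class FakeNiFi:
    """Hypothetical in-memory stand-in for a NiFi cluster (illustration only)."""

    def __init__(self, generated: int) -> None:
        self.generated = generated  # flowfiles "Initial" emits when started
        # Assumed initial state: only "Final" is running and both queues are empty.
        self.running: Dict[str, bool] = {"Initial": False, "Middle": False, "Final": True}
        self.queues: Dict[str, int] = {"Initial": 0, "Final": 0}
        self.events: List[str] = []

    def set_running(self, processor: str, on: bool) -> None:
        self.running[processor] = on
        self.events.append(f"{processor}:{'on' if on else 'off'}")
        if processor == "Initial" and on:
            self.queues["Initial"] += self.generated  # "Initial" emits flowfiles
        if processor == "Final" and on:
            self.queues["Final"] = 0  # "Final" consumes its whole queue

    def queued(self, connection: str) -> int:
        # While "Middle" runs, it moves one flowfile downstream per poll.
        if self.running["Middle"] and self.queues["Initial"] > 0:
            self.queues["Initial"] -= 1
            self.queues["Final"] += 1
        return self.queues[connection]


def tap_untap(nifi: FakeNiFi) -> int:
    """Run the tap/untap steps; return how many flowfiles were tracked."""
    nifi.set_running("Final", False)   # let the "Final" connection fill up
    nifi.set_running("Initial", True)  # tap: generate flowfiles
    nifi.set_running("Initial", False)
    expected = nifi.queued("Initial")  # record the "Initial" connection
    nifi.set_running("Middle", True)
    while nifi.queued("Final") < expected:  # real code would sleep and time out
        pass
    nifi.set_running("Middle", False)  # untap
    nifi.set_running("Final", True)    # drain and restore the initial state
    return expected
```

Against a real cluster, `set_running` and `queued` would be REST calls and the loop would need a delay between checks and a timeout.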
Here is a recording of the NiFi UI while the tool runs on this dataflow:
The NiFi cluster used for testing runs on Cloudera Public Cloud and requires basic authentication. The following environment variables must be set to access the cluster:
- CLOUDERA_USER=user
- CLOUDERA_PASS=password
- CLOUDERA_CLUSTER=https://<url_clustername>.cloudera.site/<clustername>
- CLOUDERA_NIFI_REST=/cdp-proxy-api/nifi-app/nifi-api/
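One way to combine these variables is sketched below: the REST base URL is assumed to be `CLOUDERA_CLUSTER` followed by `CLOUDERA_NIFI_REST`, and the credentials become an HTTP Basic `Authorization` header (RFC 7617). The helper name `nifi_settings` is hypothetical; how nifi_api itself reads the variables may differ.

```python
import base64
import os


def nifi_settings(env=os.environ):
    """Hypothetical helper: derive the NiFi REST base URL and a Basic auth
    header from the CLOUDERA_* environment variables."""
    user = env["CLOUDERA_USER"]
    password = env["CLOUDERA_PASS"]
    # Join cluster URL and REST path without doubling or dropping the slash.
    base_url = (env["CLOUDERA_CLUSTER"].rstrip("/") + "/"
                + env["CLOUDERA_NIFI_REST"].lstrip("/"))
    # Basic auth: base64("user:password") prefixed with "Basic ".
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return base_url, {"Authorization": f"Basic {token}"}
```

Passing a plain dict instead of `os.environ` makes the helper easy to check without touching the real environment.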
For a pip installation, run:
pip install nifi-api
Consider the template Test_API.json in the root folder; it is the template used in the "What automates?" section.
Write a dictionary with the NiFi IDs of the processors and connections (shown in View configuration -> Settings -> Id):
from nifi_api.environment import DataFlowIds

# Each entry pairs a component's NiFi Id with its name in the template
ids = {
    "in_connection": {
        "Id": "cc549c6e-0177-1000-ffff-ffffb5d2aba2",
        "name": "First"
    },
    "out_connection": {
        "Id": "51ab3b24-084f-1309-0000-00001946f2c7",
        "name": "Final"
    },
    "in_processor": {
        "Id": "36c62ad6-d606-3b04-9743-d77b6249608c",
        "name": "First"
    },
    "middle_processor": {
        "Id": "cc54862f-0177-1000-ffff-ffffe7325a20",
        "name": "Middle"
    },
    "out_processor": {
        "Id": "51ab3b1e-084f-1309-a135-aa0100d7186b",
        "name": "Final"
    },
}
data_ids = DataFlowIds(ids)
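Before passing the dictionary to `DataFlowIds`, it can help to check that all five entries are present and that each Id is well formed. The `check_ids` function below is a hypothetical sanity check, not part of nifi_api; it assumes the five keys used above and relies on NiFi component IDs being UUIDs.

```python
import uuid

# Keys used in the ids dictionary above (assumed to be the complete set).
REQUIRED_KEYS = ("in_connection", "out_connection", "in_processor",
                 "middle_processor", "out_processor")


def check_ids(ids: dict) -> None:
    """Hypothetical check: raise if an entry is missing or an Id is not a UUID."""
    for key in REQUIRED_KEYS:
        if key not in ids:
            raise KeyError(f"missing entry: {key}")
        entry = ids[key]
        if "Id" not in entry or "name" not in entry:
            raise ValueError(f"{key} needs both 'Id' and 'name'")
        # uuid.UUID raises ValueError for a badly formed string.
        uuid.UUID(entry["Id"])
```

Calling `check_ids(ids)` right before `DataFlowIds(ids)` surfaces copy-paste mistakes early, before any processor is started.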
Instantiate and run:
from nifi_api.dataflow import DataFlow
dataflow = DataFlow(
    dataflow_ids=data_ids,
    delay_seconds_after_start=5,
    delay_seconds_between_checks=5,
)
dataflow.run()
Console output:
pipeline watching has started..
Pipeline watching has finished ...
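The two delay parameters suggest a polling loop: wait once after starting a processor, then re-check a condition at a fixed interval. The sketch below shows that pattern under this assumption; `wait_until` and its `timeout_seconds` and `sleep` parameters are hypothetical and not taken from nifi_api.

```python
import time
from typing import Callable


def wait_until(condition: Callable[[], bool],
               delay_seconds_after_start: float,
               delay_seconds_between_checks: float,
               timeout_seconds: float = 600.0,
               sleep: Callable[[float], None] = time.sleep) -> int:
    """Poll `condition` until it returns True; return the number of checks made.

    Hypothetical reading of the DataFlow delay parameters, with a timeout added.
    """
    sleep(delay_seconds_after_start)  # give the processor time to start
    waited = delay_seconds_after_start
    checks = 0
    while True:
        checks += 1
        if condition():
            return checks
        if waited >= timeout_seconds:
            raise TimeoutError(f"condition not met after {waited} s")
        sleep(delay_seconds_between_checks)
        waited += delay_seconds_between_checks
```

Injecting `sleep` lets tests run instantly; in practice `condition` would compare the flowfiles queued in the "Final" connection with the count recorded in the "Initial" connection.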