From 9e8aa65434960ba405165c1be8ef2c65cbe016d4 Mon Sep 17 00:00:00 2001 From: David Potter Date: Tue, 18 Jun 2024 14:11:11 -0700 Subject: [PATCH 01/20] improve v2 docs --- unstructured/ingest/v2/README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/unstructured/ingest/v2/README.md b/unstructured/ingest/v2/README.md index c11e2150f0..3a246cb355 100644 --- a/unstructured/ingest/v2/README.md +++ b/unstructured/ingest/v2/README.md @@ -127,3 +127,8 @@ because if enabling async, that code will be run in a single process with the as with a multiprocessing pool fan out. If the underlying code is completely blocking but the async flag is enabled, this will run as if it's a normal for loop and will get worse performance than is simply run via multiprocessing. One option to help support IO heavy tasks that might not support async yet is wrapping it in a `run_in_executor()` call. Otherwise, it is common that the underlying SDKs have an async version to run the same network calls without blocking the event loop. + + +# Creating V2 Connectors + +As compared to V1 connectors, V2 has a few more steps. Most notably the Index and Download steps have been separated. Also there is a Staging step that saves the documents in a format that is ready to \ No newline at end of file From bcc13d34fa9947d5883ee55233520975db1ca2a3 Mon Sep 17 00:00:00 2001 From: David Potter Date: Tue, 18 Jun 2024 14:11:43 -0700 Subject: [PATCH 02/20] more improvement --- unstructured/ingest/v2/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unstructured/ingest/v2/README.md b/unstructured/ingest/v2/README.md index 3a246cb355..e65b17bfa3 100644 --- a/unstructured/ingest/v2/README.md +++ b/unstructured/ingest/v2/README.md @@ -131,4 +131,4 @@ the underlying SDKs have an async version to run the same network calls without # Creating V2 Connectors -As compared to V1 connectors, V2 has a few more steps. Most notably the Index and Download steps have been separated. Also there is a Staging step that saves the documents in a format that is ready to \ No newline at end of file +As compared to V1 connectors, V2 has a few more steps. Most notably for Source Connectors the Index and Download steps have been separated. And in Destination connectors, there is a Staging step that saves the documents in a format that is ready to upload. Ideally the actual Upload step should be doing the minimum amount of processing before uploading. \ No newline at end of file From ae9134459959c2fb8bf0dc99c85e32820b8954e1 Mon Sep 17 00:00:00 2001 From: David Potter Date: Thu, 11 Jul 2024 15:11:01 -0700 Subject: [PATCH 03/20] progress --- unstructured/ingest/v2/README2.md | 91 +++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 unstructured/ingest/v2/README2.md diff --git a/unstructured/ingest/v2/README2.md b/unstructured/ingest/v2/README2.md new file mode 100644 index 0000000000..e62a40cefb --- /dev/null +++ b/unstructured/ingest/v2/README2.md @@ -0,0 +1,91 @@ +# Developing V2 Connectors +## Intro +The Unstructured open source repo processes documents in a pipeline. The Source and Destination connectors sit at the front and back of the pipeline. For more details see below. + +## Simplest Example of a Pipeline +The simplest example of a pipeline would start with a local source connector, followed by a partioner, and then end with a local destination connector. 
Here is what the code to run this would look like: + +`local.py` + +``` +from unstructured.ingest.v2.interfaces import ProcessorConfig +from unstructured.ingest.v2.pipeline.pipeline import Pipeline +from unstructured.ingest.v2.processes.connectors.local import ( + LocalConnectionConfig, + LocalDownloaderConfig, + LocalIndexerConfig, + LocalUploaderConfig, +) +from unstructured.ingest.v2.processes.partitioner import PartitionerConfig + +if __name__ == "__main__": + Pipeline.from_configs( + context=ProcessorConfig( + verbose=True, + work_dir="local-working-dir", + reprocess=True, + re_download=True, + ), + source_connection_config=LocalConnectionConfig(), + indexer_config=LocalIndexerConfig(input_path="example-docs/fake-text.txt"), + downloader_config=LocalDownloaderConfig(), + partitioner_config=PartitionerConfig(), + uploader_config=LocalUploaderConfig(output_dir="local-working-dir/output"), + ).run() +``` +You can run this with `python local.py` (Adjust the `input_path` and `output_path` as appropriate.) + +The result would be a partitioned `fake-text.txt.json` file in the `local-output` directory. + +>This is the type of Python file you'll want to run while developing a new connector so that you can iterate on your connector. + +The ProcessorConfig attributes are optional, but are added to make development easier. + +Notice that the pipeline runs the following: + +* context - The ProcessorConfig runs the pipeline. The arguments are related to the overall pipeline. +* source_connection - Takes arguments needed to connect to the source. Local files don't need anything here. Other connectors will. +* indexer - Takes the files in the `input_path` and creates .json files that point the downloader step to the right files +* downloader - This does the actual downloading of the raw files (for non-blob files it may do something different like create a .txt file for every row in a source table) +* partitioner - Partitions the downloaded file provided it is a partionable file type. +* chunker/embedder - *Not represented here* but often needed to prepare files for upload to a vector database. +* stager - *Not represented here* but is often used to prepare partitioned files for upload. +* uploader - Uploads the blob-like files to the `output_dir`. + + +If you look at the folders/files in `local-working-dir` you will see the files that the pipeline creates as it runs. + +``` +local-working-dir +- index + - a4a1035d57ed.json +- output + - fake-text.txt.json +- partition + - 36caa9b04378.json +``` + +(Note that the index and partition file names are deterministic and based on the BLABLABLA) In the case of the local source connector, it won't download files because they are already local. Also note that the final file is named based on the original file with a .json since it has been partitioned. Not all output files will be named the same as the input file. An example is a table as a file, the output will be BLABLABLA. + + + + + + + + + + + + + + + + + + + +## More Advanced Pipeline (S3 Source) +Here is a more advanced pipeline with an S3 source connector, followed by a partioner, and then ending with a local connector. +>This is the type of Python file you'll want to create while developing a new **source** connector so that you can iterate on your source connector. 
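A sketch of what that S3 script could look like is below. Note that the S3 config class names and import path are assumptions here — check `unstructured/ingest/v2/processes/connectors/` for the exact module and attribute names — but the shape is the same as `local.py`, with the S3 indexer/downloader/connection configs swapped in:

`s3.py`

```
# A sketch, not the repo's official example: verify the S3 import path and
# config fields before running.
from unstructured.ingest.v2.interfaces import ProcessorConfig
from unstructured.ingest.v2.pipeline.pipeline import Pipeline
from unstructured.ingest.v2.processes.connectors.fsspec.s3 import (  # path may differ
    S3AccessConfig,
    S3ConnectionConfig,
    S3DownloaderConfig,
    S3IndexerConfig,
)
from unstructured.ingest.v2.processes.connectors.local import LocalUploaderConfig
from unstructured.ingest.v2.processes.partitioner import PartitionerConfig

if __name__ == "__main__":
    Pipeline.from_configs(
        context=ProcessorConfig(
            verbose=True,
            work_dir="s3-working-dir",
            reprocess=True,
            re_download=True,
        ),
        # Point the indexer at a bucket/prefix; the URL here is illustrative.
        indexer_config=S3IndexerConfig(remote_url="s3://my-example-bucket/small-pdf-set/"),
        downloader_config=S3DownloaderConfig(download_dir="s3-working-dir/download"),
        source_connection_config=S3ConnectionConfig(
            access_config=S3AccessConfig(),  # add key/secret here for private buckets
            anonymous=True,  # assumed flag for public buckets -- confirm in the S3 config
        ),
        partitioner_config=PartitionerConfig(),
        uploader_config=LocalUploaderConfig(output_dir="s3-working-dir/output"),
    ).run()
```

Only the indexer, downloader, and source connection configs change; everything from the partitioner onward is identical to the local example.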
+ From f5cf7006f5ae71d065e596796c9a2c20d73b99d3 Mon Sep 17 00:00:00 2001 From: David Potter Date: Fri, 12 Jul 2024 08:55:01 -0700 Subject: [PATCH 04/20] destination info --- unstructured/ingest/v2/README2.md | 137 +++++++++++++++++++++++++++++- 1 file changed, 133 insertions(+), 4 deletions(-) diff --git a/unstructured/ingest/v2/README2.md b/unstructured/ingest/v2/README2.md index e62a40cefb..a6236d3c50 100644 --- a/unstructured/ingest/v2/README2.md +++ b/unstructured/ingest/v2/README2.md @@ -1,6 +1,6 @@ # Developing V2 Connectors ## Intro -The Unstructured open source repo processes documents in a pipeline. The Source and Destination connectors sit at the front and back of the pipeline. For more details see below. +The Unstructured open source repo processes documents (artifacts) in a pipeline. The Source and Destination connectors sit at the front and back of the pipeline. For more details see below. ## Simplest Example of a Pipeline The simplest example of a pipeline would start with a local source connector, followed by a partioner, and then end with a local destination connector. Here is what the code to run this would look like: @@ -33,11 +33,11 @@ if __name__ == "__main__": uploader_config=LocalUploaderConfig(output_dir="local-working-dir/output"), ).run() ``` -You can run this with `python local.py` (Adjust the `input_path` and `output_path` as appropriate.) +You can run this with `python local.py` (Adjust the `input_path` and `output_dir` as appropriate.) The result would be a partitioned `fake-text.txt.json` file in the `local-output` directory. ->This is the type of Python file you'll want to run while developing a new connector so that you can iterate on your connector. +>*** The above is the type of Python file you'll want to run during development so that you can iterate on your connector. The ProcessorConfig attributes are optional, but are added to make development easier. @@ -65,15 +65,144 @@ local-working-dir - 36caa9b04378.json ``` -(Note that the index and partition file names are deterministic and based on the BLABLABLA) In the case of the local source connector, it won't download files because they are already local. Also note that the final file is named based on the original file with a .json since it has been partitioned. Not all output files will be named the same as the input file. An example is a table as a file, the output will be BLABLABLA. +(Note that the index and partition file names are deterministic and based on the BLABLABLA) In the case of the local source connector, it won't *download* files because they are already local. But for other source connectors there will be a `download` folder. Also note that the final file is named based on the original file with a .json since it has been partitioned. Not all output files will be named the same as the input file. An example is a table as a file, the output will be BLABLABLA. +You can see the source/destination connector file that it runs here: +!!! LINK HERE !!! +If you look through the file you will notice these Classes (actually @dataclasses because BLABLABLA) and functions +* LocalAccessConfig - This usually holds passwords, tokens, etc. This data gets hidden in all logs (and encrypted in our platform solution) +* LocalConnectionConfig - Username, host, port, etc. Anything needed for connecting to the service. 
It also imports the AccessConfig +* LocalIndexerConfig - Holds information +* LocalIndexer +* LocalDownloaderConfig + +* LocalDownloader + +* LocalUploaderConfig + +* LocalUploader + +* local_source_entry + +* local_destination_entry + + + +## Building a Destination Connector +We'll start with a Destination Connector because those are the easier to build. + +In this case we'll use the Chroma vector database destination because: + +* The service can be hosted locally. !!! https://docs.trychroma.com/guides +* We can show off the chunking and embedding step (used for vector database destinations) +* It uses a staging step to prepare the artifacts before uploading +* You can examine the Chroma database file easily since its just a sqlite database + + +The python file to iterate on development looks like this: + +`chroma.py` + +``` +import random # So we get a new Chroma collections on every run + +from unstructured.ingest.v2.interfaces import ProcessorConfig +from unstructured.ingest.v2.pipeline.pipeline import Pipeline +from unstructured.ingest.v2.processes.chunker import ChunkerConfig +from unstructured.ingest.v2.processes.connectors.chroma import ( + ChromaAccessConfig, + ChromaConnectionConfig, + ChromaUploaderConfig, + ChromaUploadStagerConfig, +) +from unstructured.ingest.v2.processes.connectors.local import ( + LocalConnectionConfig, + LocalDownloaderConfig, + LocalIndexerConfig, +) +from unstructured.ingest.v2.processes.embedder import EmbedderConfig +from unstructured.ingest.v2.processes.partitioner import PartitionerConfig + +if __name__ == "__main__": + Pipeline.from_configs( + context=ProcessorConfig( + verbose=True, + work_dir="chroma-working-dir", + reprocess=True, + re_download=True, + ), + source_connection_config=LocalConnectionConfig(), + indexer_config=LocalIndexerConfig(input_path="example-docs/fake-text.txt"), + downloader_config=LocalDownloaderConfig(), + partitioner_config=PartitionerConfig(), + + chunker_config=ChunkerConfig( + chunking_strategy="basic", + ), + embedder_config=EmbedderConfig(embedding_provider="langchain-huggingface"), + + destination_connection_config=ChromaConnectionConfig( + access_config=ChromaAccessConfig(settings=None, headers=None), + host="localhost", + port=8000, + collection_name=f"test-collection-{random.randint(1000,9999)}", + ), + stager_config=ChromaUploadStagerConfig(), + uploader_config=ChromaUploaderConfig(batch_size=10), + ).run() + +``` + +Notice how the top part looks similar to the local connector running file. But now we are adding a chunker and an embedder. And the destination connection is for the Chroma uploader. Also note that there is a stager_config. This is where we prepare the document/artifact in a custom way before uploading BLABLABLA. + +Let's run it. + + + +* blabla make sure you have run `pip install "unstructured[chroma]"` + +* in a separate terminal (with chroma installed) run +`./scripts/chroma-test-helpers/create-and-check-chroma.sh chroma-db-file` +the service should now be running on port 8000 + +* `python chroma.py` +* You can examine the resulting sqlite database (`chroma.sqlite3`) in the `chroma-db-file` directory if you want to see the results. + + +Let's look at the python file that it runs BLABLABLA + +!!! link to file + +* ChromaAccessConfig - Needed for connecting to Chroma. Usually sensitive attributes that will be hidden. + +* ChromaConnectionConfig - Non sensitive attributes. `collection_name` does not have a default value. 
`access_config` imports the ChromaAccessConfig and hides the values via `enhanced_field(sensitive=True)` + +* ChromaUploadStagerConfig - The Stager config. Didn't need anything for Chroma. + +* ChromaUploadStager - The conform_dict is the critical method here. It takes the file we get from the Embedder step and prepares it for upload to the Chroma database. But it does not upload it. It saves the file to the `upload_stage` directory. The file type can be whatever makes sense for the Uploader phase. + +* ChromaUploaderConfig - Attributes that are necessary for the upload stage specifically. The ChromaUploader will be upserting artifacts in batches. + +* ChromaUploader - Connects to the Client. And uploads artifacts. Note that it does the minimum amount of processing possible to the artifacts before uploading. The Stager phase is responsible for preparing artifacts. Chroma wants artifacts in a dictionary of lists so we do have to create that in the Uploader since there is not a practical way to store that in a file. + +* chroma_destination_entry - Registers the Chroma destination connector with the pipeline. (!!! LINK `unstructured/ingest/v2/processes/connectors/__init__.py`) + +Let's take a quick look at the working directory: +``` +chroma-working-dir +- chunk +- embed +- index +- partition +- upload_stage +``` From 71140deb77891c66090c5ab604a90ff0f450a91e Mon Sep 17 00:00:00 2001 From: David Potter Date: Fri, 12 Jul 2024 09:23:23 -0700 Subject: [PATCH 05/20] remove unnecessary --- unstructured/ingest/v2/README2.md | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/unstructured/ingest/v2/README2.md b/unstructured/ingest/v2/README2.md index a6236d3c50..e976c6ed81 100644 --- a/unstructured/ingest/v2/README2.md +++ b/unstructured/ingest/v2/README2.md @@ -194,7 +194,7 @@ Let's look at the python file that it runs BLABLABLA * chroma_destination_entry - Registers the Chroma destination connector with the pipeline. (!!! LINK `unstructured/ingest/v2/processes/connectors/__init__.py`) -Let's take a quick look at the working directory: +Let's take a quick look at the `upload_stage` in working directory: ``` chroma-working-dir - chunk @@ -202,7 +202,9 @@ chroma-working-dir - index - partition - upload_stage + - e17715933baf.json ``` +e17715933baf.json is a json file which is appropriate for this destination connector. But it could very well be a .csv if the uploader is a relational database. Or if the destination is blob(file) storage you @@ -211,10 +213,3 @@ chroma-working-dir - - - -## More Advanced Pipeline (S3 Source) -Here is a more advanced pipeline with an S3 source connector, followed by a partioner, and then ending with a local connector. ->This is the type of Python file you'll want to create while developing a new **source** connector so that you can iterate on your source connector. - From c864735c278e6431e097ed5c4ec45fffa6c4758b Mon Sep 17 00:00:00 2001 From: David Potter Date: Fri, 12 Jul 2024 09:24:26 -0700 Subject: [PATCH 06/20] add coming soon --- unstructured/ingest/v2/README2.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unstructured/ingest/v2/README2.md b/unstructured/ingest/v2/README2.md index e976c6ed81..e15fb37214 100644 --- a/unstructured/ingest/v2/README2.md +++ b/unstructured/ingest/v2/README2.md @@ -1,6 +1,6 @@ # Developing V2 Connectors ## Intro -The Unstructured open source repo processes documents (artifacts) in a pipeline. The Source and Destination connectors sit at the front and back of the pipeline. 
For more details see below. +The Unstructured open source repo processes documents (artifacts) in a pipeline. The Source and Destination connectors sit at the front and back of the pipeline. For more details see below (COMING SOON). ## Simplest Example of a Pipeline The simplest example of a pipeline would start with a local source connector, followed by a partioner, and then end with a local destination connector. Here is what the code to run this would look like: From 972f19e825199223a00a274914519a6325be2d83 Mon Sep 17 00:00:00 2001 From: David Potter Date: Fri, 12 Jul 2024 09:25:35 -0700 Subject: [PATCH 07/20] mod --- unstructured/ingest/v2/README2.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/unstructured/ingest/v2/README2.md b/unstructured/ingest/v2/README2.md index e15fb37214..49e111b981 100644 --- a/unstructured/ingest/v2/README2.md +++ b/unstructured/ingest/v2/README2.md @@ -5,6 +5,8 @@ The Unstructured open source repo processes documents (artifacts) in a pipeline. ## Simplest Example of a Pipeline The simplest example of a pipeline would start with a local source connector, followed by a partioner, and then end with a local destination connector. Here is what the code to run this would look like: +>*** This is the type of Python file you'll want to run during development so that you can iterate on your connector. + `local.py` ``` @@ -37,7 +39,6 @@ You can run this with `python local.py` (Adjust the `input_path` and `output_dir The result would be a partitioned `fake-text.txt.json` file in the `local-output` directory. ->*** The above is the type of Python file you'll want to run during development so that you can iterate on your connector. The ProcessorConfig attributes are optional, but are added to make development easier. From 1cf047cb17eb6c50049f7902035e5387277699ac Mon Sep 17 00:00:00 2001 From: David Potter Date: Fri, 12 Jul 2024 09:34:00 -0700 Subject: [PATCH 08/20] upload stage --- unstructured/ingest/v2/README2.md | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/unstructured/ingest/v2/README2.md b/unstructured/ingest/v2/README2.md index 49e111b981..c2183522e0 100644 --- a/unstructured/ingest/v2/README2.md +++ b/unstructured/ingest/v2/README2.md @@ -40,11 +40,10 @@ You can run this with `python local.py` (Adjust the `input_path` and `output_dir The result would be a partitioned `fake-text.txt.json` file in the `local-output` directory. -The ProcessorConfig attributes are optional, but are added to make development easier. Notice that the pipeline runs the following: -* context - The ProcessorConfig runs the pipeline. The arguments are related to the overall pipeline. +* context - The ProcessorConfig runs the pipeline. The arguments are related to the overall pipeline. We added some optional args to make development easier. * source_connection - Takes arguments needed to connect to the source. Local files don't need anything here. Other connectors will. 
* indexer - Takes the files in the `input_path` and creates .json files that point the downloader step to the right files * downloader - This does the actual downloading of the raw files (for non-blob files it may do something different like create a .txt file for every row in a source table) @@ -80,7 +79,7 @@ If you look through the file you will notice these Classes (actually @dataclasse * LocalIndexerConfig - Holds information -* LocalIndexer +* LocalIndexer - BLABLABLA * LocalDownloaderConfig @@ -97,7 +96,7 @@ If you look through the file you will notice these Classes (actually @dataclasse ## Building a Destination Connector -We'll start with a Destination Connector because those are the easier to build. +We'll start with a Destination Connector because those are the easier to build than Source Connectors. In this case we'll use the Chroma vector database destination because: @@ -199,13 +198,17 @@ Let's take a quick look at the `upload_stage` in working directory: ``` chroma-working-dir - chunk + - f0987c36c3b0.json - embed + - dafc7add1d21.json - index + - a4a1035d57ed.json - partition + - 36caa9b04378.json - upload_stage - e17715933baf.json ``` -e17715933baf.json is a json file which is appropriate for this destination connector. But it could very well be a .csv if the uploader is a relational database. Or if the destination is blob(file) storage you +`e17715933baf.json` in the `upload_stage` is a `.json` file which is appropriate for this destination connector. But it could very well be a `.csv` if the uploader is a relational database. Or if the destination is blob(file) storage, like AWS S3, you may not need the Staging phase. The embed `.json` file would be uploaded directly. From e9a7f922ce910bf98c0a2a85d9e3f585040011f6 Mon Sep 17 00:00:00 2001 From: David Potter Date: Fri, 12 Jul 2024 09:38:04 -0700 Subject: [PATCH 09/20] little fixes --- unstructured/ingest/v2/README2.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/unstructured/ingest/v2/README2.md b/unstructured/ingest/v2/README2.md index c2183522e0..10f0ba1000 100644 --- a/unstructured/ingest/v2/README2.md +++ b/unstructured/ingest/v2/README2.md @@ -65,7 +65,7 @@ local-working-dir - 36caa9b04378.json ``` -(Note that the index and partition file names are deterministic and based on the BLABLABLA) In the case of the local source connector, it won't *download* files because they are already local. But for other source connectors there will be a `download` folder. Also note that the final file is named based on the original file with a .json since it has been partitioned. Not all output files will be named the same as the input file. An example is a table as a file, the output will be BLABLABLA. +(Note that the index and partition file names are deterministic and based on the BLABLABLA) In the case of the local source connector, it won't *download* files because they are already local. But for other source connectors there will be a `download` folder. Also note that the final file is named based on the original file with a `.json` extension since it has been partitioned. Not all output files will be named the same as the input file. An example is a table as a source file, the output will be BLABLABLA. You can see the source/destination connector file that it runs here: @@ -96,7 +96,7 @@ If you look through the file you will notice these Classes (actually @dataclasse ## Building a Destination Connector -We'll start with a Destination Connector because those are the easier to build than Source Connectors. 
+We'll start with building a Destination Connector because those are the easier to build than Source Connectors. In this case we'll use the Chroma vector database destination because: From 3b708a130590977107532109e0f68a27f342bad4 Mon Sep 17 00:00:00 2001 From: David Potter Date: Fri, 12 Jul 2024 09:41:03 -0700 Subject: [PATCH 10/20] add links --- unstructured/ingest/v2/README2.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/unstructured/ingest/v2/README2.md b/unstructured/ingest/v2/README2.md index 10f0ba1000..68a6abe89c 100644 --- a/unstructured/ingest/v2/README2.md +++ b/unstructured/ingest/v2/README2.md @@ -69,7 +69,7 @@ local-working-dir You can see the source/destination connector file that it runs here: -!!! LINK HERE !!! +https://github.com/Unstructured-IO/unstructured/blob/main/unstructured/ingest/v2/processes/connectors/local.py If you look through the file you will notice these Classes (actually @dataclasses because BLABLABLA) and functions @@ -178,7 +178,7 @@ the service should now be running on port 8000 Let's look at the python file that it runs BLABLABLA -!!! link to file +https://github.com/Unstructured-IO/unstructured/blob/main/unstructured/ingest/v2/processes/connectors/chroma.py * ChromaAccessConfig - Needed for connecting to Chroma. Usually sensitive attributes that will be hidden. From 6f340946ab88b423fca653b211393e987b689745 Mon Sep 17 00:00:00 2001 From: David Potter Date: Fri, 12 Jul 2024 09:58:33 -0700 Subject: [PATCH 11/20] new connector --- unstructured/ingest/v2/README2.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/unstructured/ingest/v2/README2.md b/unstructured/ingest/v2/README2.md index 68a6abe89c..737939e668 100644 --- a/unstructured/ingest/v2/README2.md +++ b/unstructured/ingest/v2/README2.md @@ -210,6 +210,17 @@ chroma-working-dir ``` `e17715933baf.json` in the `upload_stage` is a `.json` file which is appropriate for this destination connector. But it could very well be a `.csv` if the uploader is a relational database. Or if the destination is blob(file) storage, like AWS S3, you may not need the Staging phase. The embed `.json` file would be uploaded directly. +When you make a **new** Destination Connector you will need these files first: + +`unstructured/ingest/v2/processes/connectors/your_connector.py` + +BLABLABLA + +And add to: + +`unstructured/ingest/v2/processes/connectors/__init__.py` + +BLABLABLA From 8a3bf3ac17779f74af5904c1ad30b396430059bc Mon Sep 17 00:00:00 2001 From: David Potter Date: Mon, 15 Jul 2024 10:31:51 -0700 Subject: [PATCH 12/20] destination rough done --- examples/ingest/sql/ingest.sh | 2 +- unstructured/ingest/v2/README2.md | 39 +++++++++++++++++++++++++++---- 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/examples/ingest/sql/ingest.sh b/examples/ingest/sql/ingest.sh index 9ac064cfc9..09c279b066 100755 --- a/examples/ingest/sql/ingest.sh +++ b/examples/ingest/sql/ingest.sh @@ -10,7 +10,7 @@ cd "$SCRIPT_DIR"/../../.. || exit 1 PYTHONPATH=. 
./unstructured/ingest/main.py \ local \ --input-path example-docs/book-war-and-peace-1225p.txt \ - --output-dir local-to-pinecone \ + --output-dir local-to-sql \ --strategy fast \ --chunking-strategy by_title \ --embedding-provider "" \ diff --git a/unstructured/ingest/v2/README2.md b/unstructured/ingest/v2/README2.md index 737939e668..0035a64b7f 100644 --- a/unstructured/ingest/v2/README2.md +++ b/unstructured/ingest/v2/README2.md @@ -194,6 +194,8 @@ https://github.com/Unstructured-IO/unstructured/blob/main/unstructured/ingest/v2 * chroma_destination_entry - Registers the Chroma destination connector with the pipeline. (!!! LINK `unstructured/ingest/v2/processes/connectors/__init__.py`) +Note that the `chroma.py` file imports the official Chroma python package when it *creates* the client and not at the top of the file. This is so that BLABLABLA + Let's take a quick look at the `upload_stage` in working directory: ``` chroma-working-dir @@ -212,15 +214,42 @@ chroma-working-dir When you make a **new** Destination Connector you will need these files first: -`unstructured/ingest/v2/processes/connectors/your_connector.py` +* `unstructured/ingest/v2/processes/connectors/your_connector.py` +* And add that to: `unstructured/ingest/v2/processes/connectors/__init__.py` +* Your python file to iterate on development. You can call it `unstructured/ingest/v2/examples/example_your_connector.py` +* And some form of **live connection** to the Destination service. In the case of Chroma we have a local service running. Often we will run a docker container (Elasticsearch). At other times we will use a hosted service if there is no docker image (Pinecone). + +Once the connector is worked out with those files, you will need to add a few more files. + +* `unstructured/ingest/v2/cli/cmds/your_connector.py` +* Add that to: `unstructured/ingest/v2/cli/cmds/__init__.py` +* Makefile +* Manifest.in +* setup.py +* your_connector.in (to create the requirements file) +* Documentation + +The CLI file. This allows the connector to be run via the command line. All the arguments for the connector need to be exposed. + +`unstructured/ingest/v2/cli/cmds/your_connector.py` + + +### Intrgration Test +And lastly we need an executable .sh file that runs in CI/CD as an integration? test. + +`test_unstructured_ingest/dest/weaviate.sh` is a good example because it uses a Docker container to act as the Weaviate service. + +If you run `./test_unstructured_ingest/dest/weaviate.sh` from the root it will spin up a docker container. Create a blank `elements` collection based on the schema. Partition `fake-memo.pdf`. Embed the artifact with vector embeddings. Upload the artifact to the Weaviate vector database. And then it runs `/python/test-ingest-weaviate-output.py` which counts the number of embeddings that were loaded. + +In an ideal world, for a vector database destination, the test will also do a vector search and validate the results. (`scripts/elasticsearch-test-helpers/destination_connector/test-ingest-elasticsearch-output.py` is an example of this.) + +If you can run the integration test successfully then most of the files should be in order. 
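For the Chroma walkthrough above, a bare-bones output check might look like the sketch below. This is an illustration only — the file name, the expected count, and the assumption that `list_collections()` returns collection objects (true for the 2024-era `chromadb` client) are ours — but it shows the kind of validation such a script performs, assuming the local Chroma service from earlier is still running on port 8000:

`check_chroma_output.py`

```
# Sketch of an output-validation script for the Chroma example; adjust the
# expected count to the number of chunks your test document produces.
import chromadb

EXPECTED_MIN_COUNT = 1  # a real test should assert the exact number of embedded chunks

client = chromadb.HttpClient(host="localhost", port=8000)

# The example pipeline generates a random collection name, so look it up
# instead of hard-coding it.
names = [getattr(c, "name", c) for c in client.list_collections()]
test_names = [n for n in names if n.startswith("test-collection-")]
assert test_names, "no test collection found -- did the pipeline run?"

collection = client.get_collection(name=test_names[0])
count = collection.count()
print(f"{test_names[0]} holds {count} records")
assert count >= EXPECTED_MIN_COUNT, f"expected at least {EXPECTED_MIN_COUNT} records, got {count}"

# Spot-check that the documents and metadata actually made it across.
sample = collection.peek(limit=1)
print(sample["ids"], sample["documents"])
```

A real CI test would go one step further and query the collection to confirm the embeddings are searchable, as the Elasticsearch output check referenced above does.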
+ +## Building a Source Connector -BLABLABLA -And add to: -`unstructured/ingest/v2/processes/connectors/__init__.py` -BLABLABLA From e6cd58b97356cad323bbf2d193ee083d3d510a00 Mon Sep 17 00:00:00 2001 From: David Potter Date: Mon, 15 Jul 2024 12:35:43 -0700 Subject: [PATCH 13/20] add source connector instructions --- unstructured/ingest/v2/README2.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/unstructured/ingest/v2/README2.md b/unstructured/ingest/v2/README2.md index 0035a64b7f..dd082b7658 100644 --- a/unstructured/ingest/v2/README2.md +++ b/unstructured/ingest/v2/README2.md @@ -247,6 +247,24 @@ If you can run the integration test successfully then most of the files should b ## Building a Source Connector +The Source Connector example we will use is `onedrive.py`. The S3 connector might be a simpler example, but it relies on the incredibly useful fsspec package. +https://filesystem-spec.readthedocs.io/en/latest/ +If your source connector can take advantage of fsspec, then S3 might be a good example. + + +The Source Connector is similar to the Destination Connector instructions above. + +But the key difference is the Indexer. The Indexer essentially gets a list of the documents/artifacts in the Source service. (in the case of a local connector it would be like a bash `ls` command). It then creates individual files for each artifact that need to be downloaded.This is so that the next phase, the Downloader phase, can be scaled out with multiple workers. The Indexer phase needs to return pointers to those artifacts in the shape of the FileData object, which it then downloads as `.json` files. + +The Downloader then uses the `.json` files that the Indexer created and downloads the raw files (in the case of a blob type file, .pdf, .txt) or as individual rows in a table, or any other needed format. + +Here are some of the file types it can download and partition. +https://github.com/Unstructured-IO/unstructured/blob/0c562d80503f6ef96504c6e38f27cfd9da8761df/unstructured/file_utils/filetype.py + +The Indexer files (resulting `.json` files in the index folder) also contain metadata that will be used to determine if the files have already been processed. + + + From 7a9280c8fc2f12e0c290428eb934dbe268d12884 Mon Sep 17 00:00:00 2001 From: David Potter Date: Mon, 15 Jul 2024 12:51:19 -0700 Subject: [PATCH 14/20] remove unneeded in readme 1 --- unstructured/ingest/v2/README.md | 5 ----- 1 file changed, 5 deletions(-) diff --git a/unstructured/ingest/v2/README.md b/unstructured/ingest/v2/README.md index e65b17bfa3..c11e2150f0 100644 --- a/unstructured/ingest/v2/README.md +++ b/unstructured/ingest/v2/README.md @@ -127,8 +127,3 @@ because if enabling async, that code will be run in a single process with the as with a multiprocessing pool fan out. If the underlying code is completely blocking but the async flag is enabled, this will run as if it's a normal for loop and will get worse performance than is simply run via multiprocessing. One option to help support IO heavy tasks that might not support async yet is wrapping it in a `run_in_executor()` call. Otherwise, it is common that the underlying SDKs have an async version to run the same network calls without blocking the event loop. - - -# Creating V2 Connectors - -As compared to V1 connectors, V2 has a few more steps. Most notably for Source Connectors the Index and Download steps have been separated. And in Destination connectors, there is a Staging step that saves the documents in a format that is ready to upload. 
Ideally the actual Upload step should be doing the minimum amount of processing before uploading. \ No newline at end of file From 3f3d7ac17ff0a67ed239c950d85dbb49e7dbc17d Mon Sep 17 00:00:00 2001 From: David Potter Date: Mon, 15 Jul 2024 14:31:15 -0700 Subject: [PATCH 15/20] cleaning up blablas --- unstructured/ingest/v2/README2.md | 66 +++++++++++++++++++++++-------- 1 file changed, 50 insertions(+), 16 deletions(-) diff --git a/unstructured/ingest/v2/README2.md b/unstructured/ingest/v2/README2.md index dd082b7658..a828e50e2f 100644 --- a/unstructured/ingest/v2/README2.md +++ b/unstructured/ingest/v2/README2.md @@ -65,7 +65,7 @@ local-working-dir - 36caa9b04378.json ``` -(Note that the index and partition file names are deterministic and based on the BLABLABLA) In the case of the local source connector, it won't *download* files because they are already local. But for other source connectors there will be a `download` folder. Also note that the final file is named based on the original file with a `.json` extension since it has been partitioned. Not all output files will be named the same as the input file. An example is a table as a source file, the output will be BLABLABLA. +(Note that the index and partition file names are deterministic and based on the BLABLABLA) In the case of the local source connector, it won't *download* files because they are already local. But for other source connectors there will be a `download` folder. Also note that the final file is named based on the original file with a `.json` extension since it has been partitioned. Not all output files will be named the same as the input file. An example is a table as a source file, the output will be based on a hash of BLABLABLA. You can see the source/destination connector file that it runs here: @@ -77,21 +77,21 @@ If you look through the file you will notice these Classes (actually @dataclasse * LocalConnectionConfig - Username, host, port, etc. Anything needed for connecting to the service. It also imports the AccessConfig -* LocalIndexerConfig - Holds information +* LocalIndexerConfig - Holds arguments that allow Indexer to connect to the service and what kind of documents to filter for. -* LocalIndexer - BLABLABLA +* LocalIndexer - Does the actual file listing and filtering. Note that it yields a FileData object -* LocalDownloaderConfig +* LocalDownloaderConfig - In this case it doesn't need anything that the LocalIndexerConfig already provides. -* LocalDownloader +* LocalDownloader - Does the actual downloading of the raw files. -* LocalUploaderConfig +* LocalUploaderConfig - Arguments for upload location -* LocalUploader +* LocalUploader - Does the actual uploading -* local_source_entry +* local_source_entry - Used to register the source connector here: `unstructured/ingest/v2/processes/connectors/__init__.py` -* local_destination_entry +* local_destination_entry - Used to register the destination connector here: `unstructured/ingest/v2/processes/connectors/__init__.py` @@ -100,7 +100,7 @@ We'll start with building a Destination Connector because those are the easier t In this case we'll use the Chroma vector database destination because: -* The service can be hosted locally. !!! https://docs.trychroma.com/guides +* The service can be hosted locally. 
https://docs.trychroma.com/guides * We can show off the chunking and embedding step (used for vector database destinations) * It uses a staging step to prepare the artifacts before uploading * You can examine the Chroma database file easily since its just a sqlite database @@ -160,23 +160,23 @@ if __name__ == "__main__": ``` -Notice how the top part looks similar to the local connector running file. But now we are adding a chunker and an embedder. And the destination connection is for the Chroma uploader. Also note that there is a stager_config. This is where we prepare the document/artifact in a custom way before uploading BLABLABLA. +Notice how the top part looks similar to the local connector running file. But now we are adding a chunker and an embedder. And the destination connection is for the Chroma uploader. Also note that there is a stager_config. This is where we prepare the document/artifact in a custom way before running the Uploader. Let's run it. -* blabla make sure you have run `pip install "unstructured[chroma]"` +* Make sure your python environment is set up and then run `pip install "unstructured[chroma]"` -* in a separate terminal (with chroma installed) run +* In a separate terminal (with chroma installed) run `./scripts/chroma-test-helpers/create-and-check-chroma.sh chroma-db-file` the service should now be running on port 8000 -* `python chroma.py` +* Run your example file: `python chroma.py` * You can examine the resulting sqlite database (`chroma.sqlite3`) in the `chroma-db-file` directory if you want to see the results. -Let's look at the python file that it runs BLABLABLA +Let's look at the python file in the Unstructured repo https://github.com/Unstructured-IO/unstructured/blob/main/unstructured/ingest/v2/processes/connectors/chroma.py @@ -210,7 +210,7 @@ chroma-working-dir - upload_stage - e17715933baf.json ``` -`e17715933baf.json` in the `upload_stage` is a `.json` file which is appropriate for this destination connector. But it could very well be a `.csv` if the uploader is a relational database. Or if the destination is blob(file) storage, like AWS S3, you may not need the Staging phase. The embed `.json` file would be uploaded directly. +`e17715933baf.json` in the `upload_stage` is a `.json` file which is appropriate for this destination connector. But it could very well be a `.csv` (or file of your choosing) if the uploader is a relational database. Or if the destination is blob(file) storage, like AWS S3, you may not need the Staging phase. The partitioned/embedded `.json` file would be uploaded directly. When you make a **new** Destination Connector you will need these files first: @@ -263,7 +263,41 @@ https://github.com/Unstructured-IO/unstructured/blob/0c562d80503f6ef96504c6e38f2 The Indexer files (resulting `.json` files in the index folder) also contain metadata that will be used to determine if the files have already been processed. +The file to use for iteration would look like this: +>*** This is the type of Python file you'll want to run during development so that you can iterate on your connector. 
+ +`onedrive.py` + +``` +EXAMPLE COMING SOON +from unstructured.ingest.v2.interfaces import ProcessorConfig +from unstructured.ingest.v2.pipeline.pipeline import Pipeline +from unstructured.ingest.v2.processes.connectors.local import ( + LocalConnectionConfig, + LocalDownloaderConfig, + LocalIndexerConfig, + LocalUploaderConfig, +) + +from unstructured.ingest.v2.processes.partitioner import PartitionerConfig + +if __name__ == "__main__": + Pipeline.from_configs( + context=ProcessorConfig( + verbose=True, + work_dir="local-working-dir", + reprocess=True, + re_download=True, + ), + source_connection_config=LocalConnectionConfig(), + indexer_config=LocalIndexerConfig(input_path="example-docs/fake-text.txt"), + downloader_config=LocalDownloaderConfig(), + partitioner_config=PartitionerConfig(), + uploader_config=LocalUploaderConfig(output_dir="local-working-dir/output"), + ).run() +``` +To run this would require service credentials for Onedrive. So we will skip that part. But this still gives a good example of the kind of file you need to iterate with. From fa7db4a8406d8d4eec615ece955941e49c1fdd39 Mon Sep 17 00:00:00 2001 From: David Potter Date: Mon, 15 Jul 2024 14:45:00 -0700 Subject: [PATCH 16/20] add example --- unstructured/ingest/v2/README2.md | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/unstructured/ingest/v2/README2.md b/unstructured/ingest/v2/README2.md index a828e50e2f..071b6527b5 100644 --- a/unstructured/ingest/v2/README2.md +++ b/unstructured/ingest/v2/README2.md @@ -270,34 +270,42 @@ The file to use for iteration would look like this: `onedrive.py` ``` -EXAMPLE COMING SOON +import os + from unstructured.ingest.v2.interfaces import ProcessorConfig from unstructured.ingest.v2.pipeline.pipeline import Pipeline from unstructured.ingest.v2.processes.connectors.local import ( - LocalConnectionConfig, - LocalDownloaderConfig, - LocalIndexerConfig, LocalUploaderConfig, ) - +from unstructured.ingest.v2.processes.connectors.onedrive import ( + OnedriveAccessConfig, + OnedriveConnectionConfig, + OnedriveDownloaderConfig, + OnedriveIndexerConfig, +) from unstructured.ingest.v2.processes.partitioner import PartitionerConfig if __name__ == "__main__": Pipeline.from_configs( context=ProcessorConfig( verbose=True, - work_dir="local-working-dir", + work_dir="onedrive-working-dir", reprocess=True, re_download=True, ), - source_connection_config=LocalConnectionConfig(), - indexer_config=LocalIndexerConfig(input_path="example-docs/fake-text.txt"), - downloader_config=LocalDownloaderConfig(), + indexer_config=OnedriveIndexerConfig(path="/utic-test-ingest-fixtures", recursive=True), + downloader_config=OnedriveDownloaderConfig(download_dir="onedrive-working-dir/download"), + source_connection_config=OnedriveConnectionConfig( + client_id=os.getenv("MS_CLIENT_ID"), + user_pname=os.getenv("MS_USER_PNAME"), + tenant=os.getenv("MS_TENANT_ID"), + access_config=OnedriveAccessConfig(client_cred=os.getenv("MS_CLIENT_CRED")), + ), partitioner_config=PartitionerConfig(), - uploader_config=LocalUploaderConfig(output_dir="local-working-dir/output"), + uploader_config=LocalUploaderConfig(output_dir="onedrive-working-dir/output"), ).run() ``` -To run this would require service credentials for Onedrive. So we will skip that part. But this still gives a good example of the kind of file you need to iterate with. +To run this would require service credentials for Onedrive. And we can't run a Docker container locally. So we will skip that part. 
But this still gives a good example of the kind of file you need to iterate with. From b094bfb9967175fc73d0f3a09a49178fc6c6b037 Mon Sep 17 00:00:00 2001 From: David Potter Date: Tue, 13 Aug 2024 07:21:52 -0700 Subject: [PATCH 17/20] finished readme --- unstructured/ingest/v2/README2.md | 38 +++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/unstructured/ingest/v2/README2.md b/unstructured/ingest/v2/README2.md index 071b6527b5..0000476cb2 100644 --- a/unstructured/ingest/v2/README2.md +++ b/unstructured/ingest/v2/README2.md @@ -65,19 +65,19 @@ local-working-dir - 36caa9b04378.json ``` -(Note that the index and partition file names are deterministic and based on the BLABLABLA) In the case of the local source connector, it won't *download* files because they are already local. But for other source connectors there will be a `download` folder. Also note that the final file is named based on the original file with a `.json` extension since it has been partitioned. Not all output files will be named the same as the input file. An example is a table as a source file, the output will be based on a hash of BLABLABLA. +(Note that the index and partition file names are deterministic and based on the hash of the current step along with the previous hash.) In the case of the local source connector, it won't *download* files because they are already local. But for other source connectors there will be a `download` folder. Also note that the final file is named based on the original file with a `.json` extension since it has been partitioned. Not all output files will be named the same as the input file. An example is a table as a source file, the output will be based on a hash of BLABLABLA. You can see the source/destination connector file that it runs here: https://github.com/Unstructured-IO/unstructured/blob/main/unstructured/ingest/v2/processes/connectors/local.py -If you look through the file you will notice these Classes (actually @dataclasses because BLABLABLA) and functions +If you look through the file you will notice these interfaces and functions * LocalAccessConfig - This usually holds passwords, tokens, etc. This data gets hidden in all logs (and encrypted in our platform solution) * LocalConnectionConfig - Username, host, port, etc. Anything needed for connecting to the service. It also imports the AccessConfig -* LocalIndexerConfig - Holds arguments that allow Indexer to connect to the service and what kind of documents to filter for. +* LocalIndexerConfig - Holds attributes that allow Indexer to connect to the service and what kind of documents to filter for. * LocalIndexer - Does the actual file listing and filtering. Note that it yields a FileData object @@ -194,7 +194,7 @@ https://github.com/Unstructured-IO/unstructured/blob/main/unstructured/ingest/v2 * chroma_destination_entry - Registers the Chroma destination connector with the pipeline. (!!! LINK `unstructured/ingest/v2/processes/connectors/__init__.py`) -Note that the `chroma.py` file imports the official Chroma python package when it *creates* the client and not at the top of the file. This is so that BLABLABLA +Note that the `chroma.py` file imports the official Chroma python package when it *creates* the client and not at the top of the file. This allows the classes to be *instantiated* without error,They will raise a runtime error though if the imports are missing. 
Let's take a quick look at the `upload_stage` in working directory: ``` @@ -247,14 +247,14 @@ If you can run the integration test successfully then most of the files should b ## Building a Source Connector -The Source Connector example we will use is `onedrive.py`. The S3 connector might be a simpler example, but it relies on the incredibly useful fsspec package. +The Source Connector example we will use is `onedrive.py`. The S3 connector might be a simpler example, but it relies on the incredibly useful fsspec package, so it is not a good general example. https://filesystem-spec.readthedocs.io/en/latest/ -If your source connector can take advantage of fsspec, then S3 might be a good example. +If your source connector can take advantage of fsspec, then S3 might be one to check out. The Source Connector is similar to the Destination Connector instructions above. -But the key difference is the Indexer. The Indexer essentially gets a list of the documents/artifacts in the Source service. (in the case of a local connector it would be like a bash `ls` command). It then creates individual files for each artifact that need to be downloaded.This is so that the next phase, the Downloader phase, can be scaled out with multiple workers. The Indexer phase needs to return pointers to those artifacts in the shape of the FileData object, which it then downloads as `.json` files. +But the key difference is the Indexer. The Indexer essentially gets a list of the documents/artifacts in the Source service. (in the case of a local connector it would be like a bash `ls` command). It then creates individual files for each artifact that need to be downloaded. This is so that the next phase, the Downloader phase, can be scaled out with multiple workers. The Indexer phase needs to return pointers to those artifacts in the shape of the FileData object, which it then downloads as `.json` files. The Downloader then uses the `.json` files that the Indexer created and downloads the raw files (in the case of a blob type file, .pdf, .txt) or as individual rows in a table, or any other needed format. @@ -307,11 +307,35 @@ if __name__ == "__main__": ``` To run this would require service credentials for Onedrive. And we can't run a Docker container locally. So we will skip that part. But this still gives a good example of the kind of file you need to iterate with. +Let's look at the python file in the Unstructured repo + +https://github.com/Unstructured-IO/unstructured-ingest/blob/main/unstructured_ingest/v2/processes/connectors/onedrive.py + +If you look through the file you will notice these interfaces and functions + +* OnedriveAccessConfig - Holds client credentials. This data gets hidden in all logs (and encrypted in our platform solution) + +* OnedriveConnectionConfig - Client id, etc. Anything needed for connecting to the service. It also imports the AccessConfig. Notice the `@requires_dependencies` decorator which imports the microsoft `msal` package for that method. + +* OnedriveIndexerConfig - Holds attributes that allow Indexer to connect to the service. Since Onedrive has a folder structure we allow `recursive` indexing. It will go down into all the folders of the `path`. +* OnedriveIndexer - Does the actual file listing. Note that it yields a FileData object + +* OnedriveDownloaderConfig - In this case it doesn't need anything that the OndriveIndexerConfig already provides. + +* OnedriveDownloader - Does the actual downloading of the raw files. + +And those are the basics of a Source Connector. 
Each connector will have its specific problems to sort out. + +### Intrgration Test +We need a test to run in the CI/CD. See the Chroma integration test section above. +## Conclusion +Building a connector is relatively straightforward, especially if there is an existing connector that closely matches the new one. For example, most of the vector destinations are quite similar. The difference was mostly how to prep the data to be uploaded in the necessary schema. +If you have any questions post in the public Slack channel `ask-for-help-open-source-library` From 0cae6d0ce58bb2b19a62f47be1126b1a1ef17a3f Mon Sep 17 00:00:00 2001 From: David Potter Date: Tue, 13 Aug 2024 07:43:51 -0700 Subject: [PATCH 18/20] add diagram --- unstructured/ingest/v2/README2.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/unstructured/ingest/v2/README2.md b/unstructured/ingest/v2/README2.md index 0000476cb2..57b5b5c89f 100644 --- a/unstructured/ingest/v2/README2.md +++ b/unstructured/ingest/v2/README2.md @@ -1,6 +1,6 @@ # Developing V2 Connectors ## Intro -The Unstructured open source repo processes documents (artifacts) in a pipeline. The Source and Destination connectors sit at the front and back of the pipeline. For more details see below (COMING SOON). +The Unstructured open source repo processes documents (artifacts) in a pipeline. The Source and Destination connectors sit at the front and back of the pipeline. For a visual example see the flow diagram at the bottom (link to bottom). ## Simplest Example of a Pipeline The simplest example of a pipeline would start with a local source connector, followed by a partioner, and then end with a local destination connector. Here is what the code to run this would look like: @@ -337,7 +337,11 @@ Building a connector is relatively straightforward, especially if there is an ex If you have any questions post in the public Slack channel `ask-for-help-open-source-library` +### Sequence Diagram +Yellow (without the Uncompressing) represents the steps in a Source Connector. Orange represents a Destination Connector. + +![unstructured ingest diagram](assets/pipeline.png) From 129abd0a14c51194824c3b7f7fad9623b7cbfa54 Mon Sep 17 00:00:00 2001 From: David Potter Date: Tue, 13 Aug 2024 08:09:52 -0700 Subject: [PATCH 19/20] fix small errrors --- unstructured/ingest/v2/README2.md | 38 ++++++++++++++++++------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/unstructured/ingest/v2/README2.md b/unstructured/ingest/v2/README2.md index 57b5b5c89f..4b488a2e9d 100644 --- a/unstructured/ingest/v2/README2.md +++ b/unstructured/ingest/v2/README2.md @@ -3,7 +3,7 @@ The Unstructured open source repo processes documents (artifacts) in a pipeline. The Source and Destination connectors sit at the front and back of the pipeline. For a visual example see the flow diagram at the bottom (link to bottom). ## Simplest Example of a Pipeline -The simplest example of a pipeline would start with a local source connector, followed by a partioner, and then end with a local destination connector. Here is what the code to run this would look like: +The simplest example of a pipeline starts with a local source connector, followed by a partioner, and then ends with a local destination connector. Here is what the code to run this looks like: >*** This is the type of Python file you'll want to run during development so that you can iterate on your connector. 
@@ -37,7 +37,7 @@ if __name__ == "__main__": ``` You can run this with `python local.py` (Adjust the `input_path` and `output_dir` as appropriate.) -The result would be a partitioned `fake-text.txt.json` file in the `local-output` directory. +The result is a partitioned `fake-text.txt.json` file in the `local-output` directory. @@ -47,7 +47,7 @@ Notice that the pipeline runs the following: * source_connection - Takes arguments needed to connect to the source. Local files don't need anything here. Other connectors will. * indexer - Takes the files in the `input_path` and creates .json files that point the downloader step to the right files * downloader - This does the actual downloading of the raw files (for non-blob files it may do something different like create a .txt file for every row in a source table) -* partitioner - Partitions the downloaded file provided it is a partionable file type. +* partitioner - Partitions the downloaded file, provided it is a partionable file type. ([link to file types supported](https://github.com/Unstructured-IO/unstructured/blob/0c562d80503f6ef96504c6e38f27cfd9da8761df/unstructured/file_utils/filetype.py)) * chunker/embedder - *Not represented here* but often needed to prepare files for upload to a vector database. * stager - *Not represented here* but is often used to prepare partitioned files for upload. * uploader - Uploads the blob-like files to the `output_dir`. @@ -65,7 +65,7 @@ local-working-dir - 36caa9b04378.json ``` -(Note that the index and partition file names are deterministic and based on the hash of the current step along with the previous hash.) In the case of the local source connector, it won't *download* files because they are already local. But for other source connectors there will be a `download` folder. Also note that the final file is named based on the original file with a `.json` extension since it has been partitioned. Not all output files will be named the same as the input file. An example is a table as a source file, the output will be based on a hash of BLABLABLA. +(Note that the index and partition file names are deterministic and based on the hash of the current step along with the previous step's hash.) In the case of the local source connector, it won't *download* files because they are already local. But for other source connectors there will be a `download` folder. Also note that the final file is named based on the original file with a `.json` extension since it has been partitioned. Not all output files will be named the same as the input file. This is the case for database like sources. You can see the source/destination connector file that it runs here: @@ -79,9 +79,9 @@ If you look through the file you will notice these interfaces and functions * LocalIndexerConfig - Holds attributes that allow Indexer to connect to the service and what kind of documents to filter for. -* LocalIndexer - Does the actual file listing and filtering. Note that it yields a FileData object +* LocalIndexer - Does the actual file listing and filtering. Note that it yields a FileData object. -* LocalDownloaderConfig - In this case it doesn't need anything that the LocalIndexerConfig already provides. +* LocalDownloaderConfig - In this case it doesn't need anything if the LocalIndexerConfig already provides it. * LocalDownloader - Does the actual downloading of the raw files. 
@@ -96,7 +96,7 @@ If you look through the file you will notice these interfaces and functions ## Building a Destination Connector -We'll start with building a Destination Connector because those are the easier to build than Source Connectors. +We'll start with building a Destination Connector because they are easier to build than Source Connectors. In this case we'll use the Chroma vector database destination because: @@ -160,7 +160,7 @@ if __name__ == "__main__": ``` -Notice how the top part looks similar to the local connector running file. But now we are adding a chunker and an embedder. And the destination connection is for the Chroma uploader. Also note that there is a stager_config. This is where we prepare the document/artifact in a custom way before running the Uploader. +Notice how the top part looks similar to the local connector running file. (link to local connector file above) But now we are adding a **chunker** and an **embedder**. Also note that there is a stager_config. This is where we prepare the document/artifact in a custom way before running the Uploader. Let's run it. @@ -182,15 +182,15 @@ https://github.com/Unstructured-IO/unstructured/blob/main/unstructured/ingest/v2 * ChromaAccessConfig - Needed for connecting to Chroma. Usually sensitive attributes that will be hidden. -* ChromaConnectionConfig - Non sensitive attributes. `collection_name` does not have a default value. `access_config` imports the ChromaAccessConfig and hides the values via `enhanced_field(sensitive=True)` +* ChromaConnectionConfig - Non sensitive attributes needed to connect to the client. `collection_name` does not have a default value. `access_config` imports the ChromaAccessConfig and hides the values via `enhanced_field(sensitive=True)` * ChromaUploadStagerConfig - The Stager config. Didn't need anything for Chroma. -* ChromaUploadStager - The conform_dict is the critical method here. It takes the file we get from the Embedder step and prepares it for upload to the Chroma database. But it does not upload it. It saves the file to the `upload_stage` directory. The file type can be whatever makes sense for the Uploader phase. +* ChromaUploadStager - The conform_dict is the critical method here. It takes the file we get from the Embedder step and prepares it for upload to the Chroma database. But it does not upload it. It saves the file to the `upload_stage` directory. The file type can be whatever makes sense for the Uploader phase (.json, .csv, .txt). * ChromaUploaderConfig - Attributes that are necessary for the upload stage specifically. The ChromaUploader will be upserting artifacts in batches. -* ChromaUploader - Connects to the Client. And uploads artifacts. Note that it does the minimum amount of processing possible to the artifacts before uploading. The Stager phase is responsible for preparing artifacts. Chroma wants artifacts in a dictionary of lists so we do have to create that in the Uploader since there is not a practical way to store that in a file. +* ChromaUploader - Connects to the Client. And uploads artifacts. Note that it does the minimum amount of processing possible to the artifacts before uploading. The Stager phase is responsible for preparing artifacts. Chroma wants artifacts in a dictionary of lists so we do have to create that in the Uploader since there is not a practical way to store that in a .json file. * chroma_destination_entry - Registers the Chroma destination connector with the pipeline. (!!! 
LINK `unstructured/ingest/v2/processes/connectors/__init__.py`) @@ -212,6 +212,8 @@ chroma-working-dir ``` `e17715933baf.json` in the `upload_stage` is a `.json` file which is appropriate for this destination connector. But it could very well be a `.csv` (or file of your choosing) if the uploader is a relational database. Or if the destination is blob(file) storage, like AWS S3, you may not need the Staging phase. The partitioned/embedded `.json` file would be uploaded directly. +### Additional files + When you make a **new** Destination Connector you will need these files first: * `unstructured/ingest/v2/processes/connectors/your_connector.py` @@ -252,9 +254,9 @@ https://filesystem-spec.readthedocs.io/en/latest/ If your source connector can take advantage of fsspec, then S3 might be one to check out. -The Source Connector is similar to the Destination Connector instructions above. +The Source Connector instructions are similar to the Destination Connector above. -But the key difference is the Indexer. The Indexer essentially gets a list of the documents/artifacts in the Source service. (in the case of a local connector it would be like a bash `ls` command). It then creates individual files for each artifact that need to be downloaded. This is so that the next phase, the Downloader phase, can be scaled out with multiple workers. The Indexer phase needs to return pointers to those artifacts in the shape of the FileData object, which it then downloads as `.json` files. +But the key difference is the Indexer. The Indexer essentially gets a list of the documents/artifacts in the Source service. (In the case of a local connector it would be like a bash `ls` command). It then creates individual files for each artifact that need to be downloaded. This is so that the next phase, the Downloader phase, can be scaled out with multiple workers. The Indexer phase needs to return pointers to those artifacts in the shape of the FileData object, which it then downloads as `.json` files. The Downloader then uses the `.json` files that the Indexer created and downloads the raw files (in the case of a blob type file, .pdf, .txt) or as individual rows in a table, or any other needed format. @@ -307,7 +309,7 @@ if __name__ == "__main__": ``` To run this would require service credentials for Onedrive. And we can't run a Docker container locally. So we will skip that part. But this still gives a good example of the kind of file you need to iterate with. -Let's look at the python file in the Unstructured repo +Let's look at the source connector file that it runs. https://github.com/Unstructured-IO/unstructured-ingest/blob/main/unstructured_ingest/v2/processes/connectors/onedrive.py @@ -327,13 +329,17 @@ If you look through the file you will notice these interfaces and functions And those are the basics of a Source Connector. Each connector will have its specific problems to sort out. +### Additional Files + +See the additional files above. (link to section) + ### Intrgration Test -We need a test to run in the CI/CD. See the Chroma integration test section above. +We need a test to run in the CI/CD. See the Chroma integration test section above. (link to section) ## Conclusion -Building a connector is relatively straightforward, especially if there is an existing connector that closely matches the new one. For example, most of the vector destinations are quite similar. The difference was mostly how to prep the data to be uploaded in the necessary schema. 
+Building a connector is relatively straightforward, especially if there is an existing connector that closely matches the new one. For example, most of the vector destinations are quite similar. If you have any questions post in the public Slack channel `ask-for-help-open-source-library` From 26671f8d7c433100e61dc512974209f4d2803c27 Mon Sep 17 00:00:00 2001 From: David Potter Date: Mon, 30 Sep 2024 14:31:34 -0700 Subject: [PATCH 20/20] make suggested fixes --- unstructured/ingest/v2/README2.md | 38 +++++++++++++++---------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/unstructured/ingest/v2/README2.md b/unstructured/ingest/v2/README2.md index 4b488a2e9d..a9cc577de2 100644 --- a/unstructured/ingest/v2/README2.md +++ b/unstructured/ingest/v2/README2.md @@ -1,9 +1,9 @@ # Developing V2 Connectors ## Intro -The Unstructured open source repo processes documents (artifacts) in a pipeline. The Source and Destination connectors sit at the front and back of the pipeline. For a visual example see the flow diagram at the bottom (link to bottom). +The Unstructured open source library processes documents (artifacts) in a pipeline. The source and destination connectors sit at the front and back of the pipeline. For a visual example see the [sequence diagram](#sequence-diagram). -## Simplest Example of a Pipeline -The simplest example of a pipeline starts with a local source connector, followed by a partioner, and then ends with a local destination connector. Here is what the code to run this looks like: +## Basic Example of a Pipeline +The most basic example of a pipeline starts with a local source connector, followed by a partitioner, and then ends with a local destination connector. Here is what the code to run this looks like: >*** This is the type of Python file you'll want to run during development so that you can iterate on your connector. @@ -43,11 +43,11 @@ The result is a partitioned `fake-text.txt.json` file in the `local-output` dire Notice that the pipeline runs the following: -* context - The ProcessorConfig runs the pipeline. The arguments are related to the overall pipeline. We added some optional args to make development easier. +* context - The `ProcessorConfig` runs the pipeline. The arguments are related to the overall pipeline. We added some args (`verbose`, `work_dir`) to make development easier. * source_connection - Takes arguments needed to connect to the source. Local files don't need anything here. Other connectors will. -* indexer - Takes the files in the `input_path` and creates .json files that point the downloader step to the right files -* downloader - This does the actual downloading of the raw files (for non-blob files it may do something different like create a .txt file for every row in a source table) -* partitioner - Partitions the downloaded file, provided it is a partionable file type. ([link to file types supported](https://github.com/Unstructured-IO/unstructured/blob/0c562d80503f6ef96504c6e38f27cfd9da8761df/unstructured/file_utils/filetype.py)) +* indexer - Takes the files in the `input_path` and creates `.json` files that point the downloader step to the right files. +* downloader - This does the actual downloading of the raw files (for non-blob files it may do something different like create a `.txt` file for every row in a source table) +* partitioner - Partitions the downloaded file, provided it is a partionable file type. 
[See the supported file types.](https://github.com/Unstructured-IO/unstructured/blob/0c562d80503f6ef96504c6e38f27cfd9da8761df/unstructured/file_utils/filetype.py) * chunker/embedder - *Not represented here* but often needed to prepare files for upload to a vector database. * stager - *Not represented here* but is often used to prepare partitioned files for upload. * uploader - Uploads the blob-like files to the `output_dir`. @@ -73,9 +73,9 @@ https://github.com/Unstructured-IO/unstructured/blob/main/unstructured/ingest/v2 If you look through the file you will notice these interfaces and functions -* LocalAccessConfig - This usually holds passwords, tokens, etc. This data gets hidden in all logs (and encrypted in our platform solution) +* LocalAccessConfig - This usually holds passwords, tokens, etc. This data gets hidden in all logs (and encrypted in our platform solution). -* LocalConnectionConfig - Username, host, port, etc. Anything needed for connecting to the service. It also imports the AccessConfig +* LocalConnectionConfig - Username, host, port, etc. Anything needed for connecting to the service. It also imports the `AccessConfig`. * LocalIndexerConfig - Holds attributes that allow Indexer to connect to the service and what kind of documents to filter for. @@ -96,7 +96,7 @@ If you look through the file you will notice these interfaces and functions ## Building a Destination Connector -We'll start with building a Destination Connector because they are easier to build than Source Connectors. +We'll start with building a destination connector because they are easier to build than source connectors. In this case we'll use the Chroma vector database destination because: @@ -186,11 +186,11 @@ https://github.com/Unstructured-IO/unstructured/blob/main/unstructured/ingest/v2 * ChromaUploadStagerConfig - The Stager config. Didn't need anything for Chroma. -* ChromaUploadStager - The conform_dict is the critical method here. It takes the file we get from the Embedder step and prepares it for upload to the Chroma database. But it does not upload it. It saves the file to the `upload_stage` directory. The file type can be whatever makes sense for the Uploader phase (.json, .csv, .txt). +* ChromaUploadStager - The conform_dict is the critical method here. It takes the file we get from the Embedder step and prepares it for upload to the Chroma database. But it does not upload it. It saves the file to the `upload_stage` directory. The file type can be whatever makes sense for the Uploader phase (`.json`, `.csv`, `.txt`). * ChromaUploaderConfig - Attributes that are necessary for the upload stage specifically. The ChromaUploader will be upserting artifacts in batches. -* ChromaUploader - Connects to the Client. And uploads artifacts. Note that it does the minimum amount of processing possible to the artifacts before uploading. The Stager phase is responsible for preparing artifacts. Chroma wants artifacts in a dictionary of lists so we do have to create that in the Uploader since there is not a practical way to store that in a .json file. +* ChromaUploader - Connects to the Client. And uploads artifacts. Note that it does the minimum amount of processing possible to the artifacts before uploading. The Stager phase is responsible for preparing artifacts. Chroma wants artifacts in a dictionary of lists so we do have to create that in the Uploader since there is not a practical way to store that in a `.json` file. 
* chroma_destination_entry - Registers the Chroma destination connector with the pipeline. (!!! LINK `unstructured/ingest/v2/processes/connectors/__init__.py`) @@ -214,12 +214,12 @@ chroma-working-dir ### Additional files -When you make a **new** Destination Connector you will need these files first: +When you make a **new** destination connector you will need these files first: * `unstructured/ingest/v2/processes/connectors/your_connector.py` * And add that to: `unstructured/ingest/v2/processes/connectors/__init__.py` * Your python file to iterate on development. You can call it `unstructured/ingest/v2/examples/example_your_connector.py` -* And some form of **live connection** to the Destination service. In the case of Chroma we have a local service running. Often we will run a docker container (Elasticsearch). At other times we will use a hosted service if there is no docker image (Pinecone). +* And some form of **live connection** to the destination service. In the case of Chroma we have a local service running. Often we will run a docker container (Elasticsearch). At other times we will use a hosted service if there is no docker image (Pinecone). Once the connector is worked out with those files, you will need to add a few more files. @@ -249,14 +249,14 @@ If you can run the integration test successfully then most of the files should b ## Building a Source Connector -The Source Connector example we will use is `onedrive.py`. The S3 connector might be a simpler example, but it relies on the incredibly useful fsspec package, so it is not a good general example. +The source connector example we will use is `onedrive.py`. The S3 connector might be a simpler example, but it relies on the incredibly useful fsspec package, so it is not a good general example. https://filesystem-spec.readthedocs.io/en/latest/ If your source connector can take advantage of fsspec, then S3 might be one to check out. -The Source Connector instructions are similar to the Destination Connector above. +The source connector instructions are similar to the destination connector above. -But the key difference is the Indexer. The Indexer essentially gets a list of the documents/artifacts in the Source service. (In the case of a local connector it would be like a bash `ls` command). It then creates individual files for each artifact that need to be downloaded. This is so that the next phase, the Downloader phase, can be scaled out with multiple workers. The Indexer phase needs to return pointers to those artifacts in the shape of the FileData object, which it then downloads as `.json` files. +But the key difference is the Indexer. The Indexer essentially gets a list of the documents/artifacts in the source service. (In the case of a local connector it would be like a bash `ls` command). It then creates individual files for each artifact that need to be downloaded. This is so that the next phase, the Downloader phase, can be scaled out with multiple workers. The Indexer phase needs to return pointers to those artifacts in the shape of the FileData object, which it then downloads as `.json` files. The Downloader then uses the `.json` files that the Indexer created and downloads the raw files (in the case of a blob type file, .pdf, .txt) or as individual rows in a table, or any other needed format. @@ -327,7 +327,7 @@ If you look through the file you will notice these interfaces and functions * OnedriveDownloader - Does the actual downloading of the raw files. -And those are the basics of a Source Connector. 
Each connector will have its specific problems to sort out. +And those are the basics of a source connector. Each connector will have its specific problems to sort out. ### Additional Files @@ -345,7 +345,7 @@ If you have any questions post in the public Slack channel `ask-for-help-open-so ### Sequence Diagram -Yellow (without the Uncompressing) represents the steps in a Source Connector. Orange represents a Destination Connector. +Yellow (without the Uncompressing) represents the steps in a source connector. Orange represents a destination connector. ![unstructured ingest diagram](assets/pipeline.png)
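
To round out the destination-connector walkthrough, here is a rough sketch of the stager/uploader hand-off. It is not the real `ChromaUploadStager`/`ChromaUploader` code — the function names and the element field names are assumptions for illustration.

```
# Conceptual sketch of a destination connector's stager and uploader roles.
# The stager reshapes each partitioned/embedded element into the destination's
# schema and writes it back to disk; the uploader only batches and sends.
import json
import uuid
from pathlib import Path


def conform_element(element: dict) -> dict:
    # Map one element dict to the flat record a vector store expects.
    return {
        "id": element.get("element_id", str(uuid.uuid4())),
        "document": element.get("text", ""),
        "embedding": element.get("embeddings"),
        "metadata": {"filename": element.get("metadata", {}).get("filename")},
    }


def stage(elements_file: Path, output_dir: Path) -> Path:
    # Runs before upload: read the partitioned/embedded .json, conform every
    # element, and save the result so the uploader has minimal work left.
    elements = json.loads(elements_file.read_text())
    staged = [conform_element(e) for e in elements]
    output_dir.mkdir(parents=True, exist_ok=True)
    staged_path = output_dir / elements_file.name
    staged_path.write_text(json.dumps(staged))
    return staged_path


def to_batch(staged_path: Path) -> dict:
    # Chroma-style destinations want a dictionary of parallel lists; building
    # it just before the upsert call is the only work left for the uploader.
    records = json.loads(staged_path.read_text())
    return {
        "ids": [r["id"] for r in records],
        "documents": [r["document"] for r in records],
        "embeddings": [r["embedding"] for r in records],
        "metadatas": [r["metadata"] for r in records],
    }
```

The dictionary-of-lists at the end mirrors the batch shape a Chroma-style client expects, which is why the staging step should leave only that final reshaping for the uploader.
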