Merge pull request #31 from scale-vector/quickstart_next_iteration
further simplifying the getting started experience
TyDunn authored Jun 28, 2022
2 parents a533e76 + 922072c commit 598f0f9
Showing 1 changed file, QUICKSTART.md, with 45 additions and 145 deletions.

*Please open a pull request [here](https://github.com/scale-vector/dlt/edit/master/QUICKSTART.md) if there is something you can improve about this quickstart.*

## Grab the demo

Clone the example repository:
```
git clone https://github.com/scale-vector/dlt-quickstart-example.git
```

Enter the directory:
```
cd dlt-quickstart-example
```

Open the files in your favorite IDE / text editor:
- `data.json` (the JSON document you will load)
- `credentials.json` (the credentials for our demo Google BigQuery warehouse)
- `quickstart.py` (the script that uses DLT)
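
For orientation, `data.json` holds a small list of nested documents roughly along these lines. This sketch is reconstructed from the query results shown later in this quickstart; the actual file may differ in details and contain additional fields:
```
[
  {
    "name": "Ana",
    "age": 30,
    "id": 456,
    "children": [
      { "name": "Bill", "id": 625 },
      { "name": "Elli", "id": 591 }
    ]
  },
  {
    "name": "Bob",
    "age": 30,
    "id": 455,
    "children": [
      { "name": "Bill", "id": 625 },
      { "name": "Dave", "id": 621 }
    ]
  }
]
```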

## Set up a virtual environment

Ensure you are using either Python 3.8 or 3.9:
```
python3 --version
```

Create a new virtual environment:
```
python3 -m venv ./env
```

Activate the virtual environment:
```
source ./env/bin/activate
```

## Install DLT and support for the target data warehouse

Install DLT using pip:
```
pip3 install -U python-dlt
```

Install support for Google BigQuery:
```
pip3 install -U python-dlt[gcp]
```
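
Optionally, you can check that the install worked by importing the classes the quickstart uses later (this import path is the one used in `quickstart.py`):
```
python3 -c "from dlt.pipeline import Pipeline, GCPPipelineCredentials"
```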

## Understanding the code

`quickstart.py` performs the following steps, each walked through below:

1. Configure DLT
2. Create a DLT pipeline
3. Load the data from the JSON document
4. Pass the data to the DLT pipeline
5. Use DLT to load the data

## Configure DLT

Import the necessary libraries:
```
import base64
import json
from dlt.common.utils import uniq_id
from dlt.pipeline import Pipeline, GCPPipelineCredentials
```

Create a unique prefix for your demo Google BigQuery table:
```
schema_prefix = 'demo_' + uniq_id()[:4]
```

Name your schema:
```
schema_name = 'example'
```

Name your table:
```
parent_table = 'json_doc'
```

Specify your schema file location:
```
schema_file_path = 'schema.yml'
```

Load the credentials:
```
with open('credentials.json', 'r', encoding="utf-8") as f:
    gcp_credentials_json = json.load(f)
# Private key needs to be decoded (because we don't want to store it as plain text)
gcp_credentials_json["private_key"] = bytes([_a ^ _b for _a, _b in zip(base64.b64decode(gcp_credentials_json["private_key"]), b"quickstart-sv"*150)]).decode("utf-8")
credentials = GCPPipelineCredentials.from_services_dict(gcp_credentials_json, schema_prefix)
```
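
For context, the private key in `credentials.json` is not stored in plain text: it has been XOR-ed with a repeating pad and then base64-encoded, which is why the line above reverses those two steps. A rough sketch of the assumed encoding side, purely for illustration (not part of the quickstart):
```
import base64

def obfuscate_private_key(plain_key: str) -> str:
    # XOR the key bytes with the repeating pad, then base64-encode the result;
    # XOR is its own inverse, so the decoding line above undoes this exactly
    pad = b"quickstart-sv" * 150  # pad must be at least as long as the key
    xored = bytes(a ^ b for a, b in zip(plain_key.encode("utf-8"), pad))
    return base64.b64encode(xored).decode("ascii")
```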

## Create a DLT pipeline

Instantiate a pipeline:
```
pipeline = Pipeline(schema_name)
```

Create the pipeline with your credentials:
```
pipeline.create_pipeline(credentials)
```

## Load the data from the JSON document

Load the JSON document into a dictionary:
```
with open('data.json', 'r', encoding="utf-8") as f:
    data = json.load(f)
```

## Pass the data to the DLT pipeline

Extract the dictionary into a table:
```
pipeline.extract(iter(data), table_name=parent_table)
```

Unpack the pipeline into a relational structure:
```
pipeline.unpack()
```

Save the schema to a `schema.yml` file:
```
schema = pipeline.get_default_schema()
schema_yaml = schema.as_yaml(remove_default=True)
with open(schema_file_path, 'w', encoding="utf-8") as f:
    f.write(schema_yaml)
```

## Use DLT to load the data

Load the data into the warehouse:
```
pipeline.load()
```

Make sure there are no errors:
```
completed_loads = pipeline.list_completed_loads()
# print(completed_loads)
# enumerate all completed loads and check them for failed jobs;
# a load that completes with failed jobs will not raise any exception
for load_id in completed_loads:
    print(f"Checking failed jobs in {load_id}")
    for job, failed_message in pipeline.list_failed_jobs(load_id):
        print(f"JOB: {job}\nMSG: {failed_message}")
```

## Running the code

Run the script:
```
python3 quickstart.py
```

Inspect the `schema.yml` file that has been generated:
```
vim schema.yml
```

## Query the Google BigQuery table

Run SQL queries:
```
def run_query(query):
    df = c._execute_sql(query)
    print(query)
    print(list(df))
    print()

with pipeline.sql_client() as c:
    # Query table for parents
    query = f"SELECT * FROM `{schema_prefix}_example.json_doc`"
    run_query(query)

    # Query table for children
    query = f"SELECT * FROM `{schema_prefix}_example.json_doc__children` LIMIT 1000"
    run_query(query)

    # Join previous two queries via auto generated keys
    query = f"""
        select p.name, p.age, p.id as parent_id,
            c.name as child_name, c.id as child_id, c._dlt_list_idx as child_order_in_list
        from `{schema_prefix}_example.json_doc` as p
        left join `{schema_prefix}_example.json_doc__children` as c
            on p._dlt_id = c._dlt_parent_id
        """
    run_query(query)
```

See results of querying the Google BigQuery table:

`json_doc` table

```
SELECT * FROM `{schema_prefix}_example.json_doc`
```
```
{ "name": "Ana", "age": "30", "id": "456", "_dlt_load_id": "1654787700.406905", "_dlt_id": "5b018c1ba3364279a0ca1a231fbd8d90"}
{ "name": "Bob", "age": "30", "id": "455", "_dlt_load_id": "1654787700.406905", "_dlt_id": "afc8506472a14a529bf3e6ebba3e0a9e"}
```

`json_doc__children` table

```
SELECT * FROM `{schema_prefix}_example.json_doc__children` LIMIT 1000
```
```
# {"name": "Bill", "id": "625", "_dlt_parent_id": "5b018c1ba3364279a0ca1a231fbd8d90", "_dlt_list_idx": "0", "_dlt_root_id": "5b018c1ba3364279a0ca1a231fbd8d90",
# "_dlt_id": "7993452627a98814cc7091f2c51faf5c"}
# ...
# "_dlt_id": "d18172353fba1a492c739a7789a786cf"}
```

Joining the two tables above on the autogenerated keys (i.e. `p._dlt_id = c._dlt_parent_id`):

```
select p.name, p.age, p.id as parent_id,
c.name as child_name, c.id as child_id, c._dlt_list_idx as child_order_in_list
from `{schema_prefix}_example.json_doc` as p
left join `{schema_prefix}_example.json_doc__children` as c
on p._dlt_id = c._dlt_parent_id
```
```
# { "name": "Ana", "age": "30", "parent_id": "456", "child_name": "Bill", "child_id": "625", "child_order_in_list": "0"}
# { "name": "Ana", "age": "30", "parent_id": "456", "child_name": "Elli", "child_id": "591", "child_order_in_list": "1"}
# { "name": "Bob", "age": "30", "parent_id": "455", "child_name": "Bill", "child_id": "625", "child_order_in_list": "0"}
# { "name": "Bob", "age": "30", "parent_id": "455", "child_name": "Dave", "child_id": "621", "child_order_in_list": "1"}
```

## Next steps

1. Replace `data.json` with data you want to explore

2. Check that the inferred types are correct in `schema.yml`

3. Set up your own Google BigQuery warehouse and replace the credentials (see the sketch after this list)

4. Use this new clean staging layer as the starting point for a semantic layer / analytical model (e.g. using dbt)
