From 16cabaf19d6bfc94f22b4e863063e4dbc2cbd09f Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Mon, 29 Apr 2024 23:38:08 -0700 Subject: [PATCH] Goldsky asset ingest (#1321) * many goldsky changes * ensure working goldsky asset * clean up --- .dockerignore | 30 +- docker/cloudquery-ts-base.Dockerfile | 2 +- ops/tf-modules/warehouse-cluster/main.tf | 12 +- poetry.lock | 573 +++++++++++++- pyproject.toml | 7 +- warehouse/oso_dagster/__init__.py | 45 -- warehouse/oso_dagster/assets.py | 378 ++------- warehouse/oso_dagster/cbt/__init__.py | 25 + warehouse/oso_dagster/definitions.py | 47 ++ warehouse/oso_dagster/goldsky.py | 927 ++++++++++------------- warehouse/oso_dagster/goldsky_dask.py | 195 +++++ warehouse/oso_dagster/models/dedupe.sql | 3 + 12 files changed, 1318 insertions(+), 926 deletions(-) create mode 100644 warehouse/oso_dagster/cbt/__init__.py create mode 100644 warehouse/oso_dagster/definitions.py create mode 100644 warehouse/oso_dagster/goldsky_dask.py create mode 100644 warehouse/oso_dagster/models/dedupe.sql diff --git a/.dockerignore b/.dockerignore index 853df907d..9cbecdaee 100644 --- a/.dockerignore +++ b/.dockerignore @@ -20,13 +20,13 @@ coverage/ .eslintcache # builds -out/ -build/ -dist/ -data/ -.turbo/ -.next/ -.docusaurus/ +**/out/ +**/build/ +**/dist/ +**/data/ +**/.turbo/ +**/.next/ +**/.docusaurus/ # files .DS_Store @@ -36,6 +36,12 @@ data/ *.log coverage.json +**/.env +**/.next +**/.meltano + +**/.venv + # typescript *.tsbuildinfo next-env.d.ts @@ -46,9 +52,11 @@ graph/.test.subgraph.yaml # playwright playwright-report/ node_modules +target .pnpm-store test_only.* **/test_only.* +**/test_only Dockerfile *.Dockerfile **/Dockerfile @@ -60,11 +68,13 @@ Dockerfile **/__pycache__ # dbt -target/ -dbt_packages/ +**/target/ +**/dbt_packages/ # Cloudquery .cq/ +**/.cq # Github directory -.github/scripts \ No newline at end of file +.github/scripts +.git \ No newline at end of file diff --git a/docker/cloudquery-ts-base.Dockerfile b/docker/cloudquery-ts-base.Dockerfile index 44dbf18a4..e5e477c08 100644 --- a/docker/cloudquery-ts-base.Dockerfile +++ b/docker/cloudquery-ts-base.Dockerfile @@ -1,4 +1,4 @@ -FROM node:18 as build +FROM node:20 as build RUN npm install -g pnpm@^9.0.0 diff --git a/ops/tf-modules/warehouse-cluster/main.tf b/ops/tf-modules/warehouse-cluster/main.tf index 78740dafb..8d3345ab4 100644 --- a/ops/tf-modules/warehouse-cluster/main.tf +++ b/ops/tf-modules/warehouse-cluster/main.tf @@ -7,13 +7,13 @@ locals { node_pools = concat([ { name = "${var.cluster_name}-default-node-pool" - machine_type = "e2-medium" + machine_type = "e2-standard-2" node_locations = join(",", var.cluster_zones) min_count = 0 max_count = 3 local_ssd_count = 0 spot = false - disk_size_gb = 75 + disk_size_gb = 50 disk_type = "pd-standard" image_type = "COS_CONTAINERD" enable_gcfs = false @@ -25,6 +25,7 @@ locals { preemptible = false initial_node_count = 1 }, + # The spot pool is for workloads that need spot { name = "${var.cluster_name}-spot-node-pool" machine_type = "n1-standard-16" @@ -45,9 +46,10 @@ locals { preemptible = false initial_node_count = 0 }, + # The preemptible pool should be used only if spot can't be used { name = "${var.cluster_name}-preemptible-node-pool" - machine_type = "n1-standard-64" + machine_type = "n1-standard-16" node_locations = join(",", var.cluster_zones) min_count = 0 max_count = 16 @@ -93,14 +95,14 @@ locals { { key = "pool_type" value = "spot" - effect = "PREFER_NO_SCHEDULE" + effect = "NO_SCHEDULE" }, ] "${var.cluster_name}-preemptible-node-pool" = [ 
{ key = "pool_type" value = "preemptible" - effect = "PREFER_NO_SCHEDULE" + effect = "NO_SCHEDULE" }, ] }, var.extra_node_taints) diff --git a/poetry.lock b/poetry.lock index cf62b4acb..26c863359 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. [[package]] name = "agate" @@ -255,6 +255,20 @@ six = ">=1.12.0" astroid = ["astroid (>=1,<2)", "astroid (>=2,<4)"] test = ["astroid (>=1,<2)", "astroid (>=2,<4)", "pytest"] +[[package]] +name = "asyncache" +version = "0.3.1" +description = "Helpers to use cachetools with async code." +optional = false +python-versions = ">=3.8,<4.0" +files = [ + {file = "asyncache-0.3.1-py3-none-any.whl", hash = "sha256:ef20a1024d265090dd1e0785c961cf98b9c32cc7d9478973dcf25ac1b80011f5"}, + {file = "asyncache-0.3.1.tar.gz", hash = "sha256:9a1e60a75668e794657489bdea6540ee7e3259c483517b934670db7600bf5035"}, +] + +[package.dependencies] +cachetools = ">=5.2.0,<6.0.0" + [[package]] name = "attrs" version = "23.2.0" @@ -588,6 +602,17 @@ pg8000 = ["pg8000 (>=1.30.4)"] pymysql = ["PyMySQL (>=1.1.0)"] pytds = ["python-tds (>=1.15.0)"] +[[package]] +name = "cloudpickle" +version = "3.0.0" +description = "Pickler class to extend the standard pickle.Pickler functionality" +optional = false +python-versions = ">=3.8" +files = [ + {file = "cloudpickle-3.0.0-py3-none-any.whl", hash = "sha256:246ee7d0c295602a036e86369c77fecda4ab17b506496730f2f576d9016fd9c7"}, + {file = "cloudpickle-3.0.0.tar.gz", hash = "sha256:996d9a482c6fb4f33c1a35335cf8afd065d2a56e973270364840712d9131a882"}, +] + [[package]] name = "cloudquery-plugin-pb" version = "0.0.24" @@ -908,6 +933,57 @@ uvicorn = {version = "*", extras = ["standard"]} notebook = ["nbconvert"] test = ["starlette[full]"] +[[package]] +name = "dask" +version = "2024.4.2" +description = "Parallel PyData with Task Scheduling" +optional = false +python-versions = ">=3.9" +files = [ + {file = "dask-2024.4.2-py3-none-any.whl", hash = "sha256:56fbe92472e3b323ab7beaf2dc8437d48066ac21aa9c2c17ac40d2b6f7b4c414"}, + {file = "dask-2024.4.2.tar.gz", hash = "sha256:3d7a516468d96e72581b84c7bb00172366f30d24c689ea4e9bd1334ab6d98f8a"}, +] + +[package.dependencies] +click = ">=8.1" +cloudpickle = ">=1.5.0" +distributed = {version = "2024.4.2", optional = true, markers = "extra == \"distributed\""} +fsspec = ">=2021.09.0" +importlib-metadata = {version = ">=4.13.0", markers = "python_version < \"3.12\""} +packaging = ">=20.0" +partd = ">=1.2.0" +pyyaml = ">=5.3.1" +toolz = ">=0.10.0" + +[package.extras] +array = ["numpy (>=1.21)"] +complete = ["dask[array,dataframe,diagnostics,distributed]", "lz4 (>=4.3.2)", "pyarrow (>=7.0)", "pyarrow-hotfix"] +dataframe = ["dask-expr (>=1.0,<1.1)", "dask[array]", "pandas (>=1.3)"] +diagnostics = ["bokeh (>=2.4.2)", "jinja2 (>=2.10.3)"] +distributed = ["distributed (==2024.4.2)"] +test = ["pandas[test]", "pre-commit", "pytest", "pytest-cov", "pytest-rerunfailures", "pytest-timeout", "pytest-xdist"] + +[[package]] +name = "dask-kubernetes" +version = "2024.4.2" +description = "Native Kubernetes integration for Dask" +optional = false +python-versions = ">=3.9" +files = [ + {file = "dask-kubernetes-2024.4.2.tar.gz", hash = "sha256:2b88cb401bc2e59ce92a41dafe2d1247e990b83cfcf50ac848f58482abcef3b6"}, + {file = "dask_kubernetes-2024.4.2-py3-none-any.whl", hash = "sha256:2b848e07bd05da4e63a0312e13d8f4d43c9309aad175aced873dffc299348e74"}, +] + 
+[package.dependencies] +dask = ">=2022.08.1" +distributed = ">=2022.08.1" +kopf = ">=1.35.3" +kr8s = "==0.14.*" +kubernetes = ">=12.0.1" +kubernetes-asyncio = ">=12.0.1" +pykube-ng = ">=22.9.0" +rich = ">=12.5.1" + [[package]] name = "db-dtypes" version = "1.2.0" @@ -1057,6 +1133,34 @@ Pygments = ">=2.9.0,<3.0.0" [package.extras] toml = ["tomli (>=1.2.1)"] +[[package]] +name = "distributed" +version = "2024.4.2" +description = "Distributed scheduler for Dask" +optional = false +python-versions = ">=3.9" +files = [ + {file = "distributed-2024.4.2-py3-none-any.whl", hash = "sha256:801d3b5e5fe5273d0da335db527a12568ee9ad3368eba6a735bb852cf6753dac"}, + {file = "distributed-2024.4.2.tar.gz", hash = "sha256:58e86c407f499b782ecc62b712723d05265c7e8c76ee870b1c3c32e4dfcf1893"}, +] + +[package.dependencies] +click = ">=8.0" +cloudpickle = ">=1.5.0" +dask = "2024.4.2" +jinja2 = ">=2.10.3" +locket = ">=1.0.0" +msgpack = ">=1.0.0" +packaging = ">=20.0" +psutil = ">=5.7.2" +pyyaml = ">=5.3.1" +sortedcontainers = ">=2.0.5" +tblib = ">=1.6.0" +toolz = ">=0.10.0" +tornado = ">=6.0.4" +urllib3 = ">=1.24.3" +zict = ">=3.0.0" + [[package]] name = "docstring-parser" version = "0.16" @@ -1124,23 +1228,6 @@ files = [ {file = "duckdb-0.10.2.tar.gz", hash = "sha256:0f609c9d5f941f1ecde810f010dd9321cd406a552c1df20318a13fa64247f67f"}, ] -[[package]] -name = "dune-contract-usage" -version = "1.0.0" -description = "Collect contract usage from dune" -optional = false -python-versions = "^3.11" -files = [] -develop = true - -[package.dependencies] -arrow = "^1.3.0" -cloudquery-plugin-sdk = "^0.1.12" - -[package.source] -type = "directory" -url = "warehouse/cloudquery-dune-contract-usage" - [[package]] name = "example-plugin" version = "1.0.0" @@ -1982,6 +2069,27 @@ files = [ {file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"}, ] +[[package]] +name = "httpcore" +version = "1.0.5" +description = "A minimal low-level HTTP client." +optional = false +python-versions = ">=3.8" +files = [ + {file = "httpcore-1.0.5-py3-none-any.whl", hash = "sha256:421f18bac248b25d310f3cacd198d55b8e6125c107797b609ff9b7a6ba7991b5"}, + {file = "httpcore-1.0.5.tar.gz", hash = "sha256:34a38e2f9291467ee3b44e89dd52615370e152954ba21721378a87b2960f7a61"}, +] + +[package.dependencies] +certifi = "*" +h11 = ">=0.13,<0.15" + +[package.extras] +asyncio = ["anyio (>=4.0,<5.0)"] +http2 = ["h2 (>=3,<5)"] +socks = ["socksio (==1.*)"] +trio = ["trio (>=0.22.0,<0.26.0)"] + [[package]] name = "httplib2" version = "0.22.0" @@ -2044,6 +2152,47 @@ files = [ [package.extras] test = ["Cython (>=0.29.24,<0.30.0)"] +[[package]] +name = "httpx" +version = "0.27.0" +description = "The next generation HTTP client." 
+optional = false +python-versions = ">=3.8" +files = [ + {file = "httpx-0.27.0-py3-none-any.whl", hash = "sha256:71d5465162c13681bff01ad59b2cc68dd838ea1f10e51574bac27103f00c91a5"}, + {file = "httpx-0.27.0.tar.gz", hash = "sha256:a0cb88a46f32dc874e04ee956e4c2764aba2aa228f650b06788ba6bda2962ab5"}, +] + +[package.dependencies] +anyio = "*" +certifi = "*" +httpcore = "==1.*" +idna = "*" +sniffio = "*" + +[package.extras] +brotli = ["brotli", "brotlicffi"] +cli = ["click (==8.*)", "pygments (==2.*)", "rich (>=10,<14)"] +http2 = ["h2 (>=3,<5)"] +socks = ["socksio (==1.*)"] + +[[package]] +name = "httpx-ws" +version = "0.6.0" +description = "WebSockets support for HTTPX" +optional = false +python-versions = ">=3.8" +files = [ + {file = "httpx_ws-0.6.0-py3-none-any.whl", hash = "sha256:437cfca94519a4e6ae06eb5573192df6c0da85c22b1a19cc1ea0b02b05a51d25"}, + {file = "httpx_ws-0.6.0.tar.gz", hash = "sha256:60218f531fb474a2143af38568f4b7d94ba356780973443365c8e2c87882bb8c"}, +] + +[package.dependencies] +anyio = ">=4" +httpcore = ">=1.0.4" +httpx = ">=0.23.1" +wsproto = "*" + [[package]] name = "humanfriendly" version = "10.0" @@ -2134,6 +2283,17 @@ qtconsole = ["qtconsole"] test = ["pickleshare", "pytest (<8)", "pytest-asyncio (<0.22)", "testpath"] test-extra = ["curio", "ipython[test]", "matplotlib (!=3.2.0)", "nbformat", "numpy (>=1.23)", "pandas", "trio"] +[[package]] +name = "iso8601" +version = "2.1.0" +description = "Simple module to parse ISO 8601 dates" +optional = false +python-versions = ">=3.7,<4.0" +files = [ + {file = "iso8601-2.1.0-py3-none-any.whl", hash = "sha256:aac4145c4dcb66ad8b648a02830f5e2ff6c24af20f4f482689be402db2429242"}, + {file = "iso8601-2.1.0.tar.gz", hash = "sha256:6b1d3829ee8921c4301998c909f7829fa9ed3cbdac0d3b16af2d743aed1ba8df"}, +] + [[package]] name = "isodate" version = "0.6.1" @@ -2233,6 +2393,105 @@ files = [ [package.dependencies] referencing = ">=0.31.0" +[[package]] +name = "kopf" +version = "1.37.2" +description = "Kubernetes Operator Pythonic Framework (Kopf)" +optional = false +python-versions = ">=3.8" +files = [ + {file = "kopf-1.37.2-py3-none-any.whl", hash = "sha256:aa7a5470dd7655adae1aebe8426fc050ce7f21e13d49a3b88ef209176527e28f"}, + {file = "kopf-1.37.2.tar.gz", hash = "sha256:f551db1772b0583e82d33bd1f95eec631000f686965500bb066ef3f5b47960a0"}, +] + +[package.dependencies] +aiohttp = [ + {version = "*", markers = "python_version < \"3.12\""}, + {version = ">=3.9.0", markers = "python_version >= \"3.12\""}, +] +click = "*" +iso8601 = "*" +python-json-logger = "*" +pyyaml = "*" +typing-extensions = "*" + +[package.extras] +dev = ["certbuilder", "certvalidator", "oscrypto", "pyngrok"] +full-auth = ["kubernetes", "pykube-ng"] +uvloop = ["uvloop", "uvloop (>=0.18.0)"] + +[[package]] +name = "kr8s" +version = "0.14.3" +description = "A Kubernetes API library" +optional = false +python-versions = ">=3.8" +files = [ + {file = "kr8s-0.14.3-py3-none-any.whl", hash = "sha256:6f653cf73bf775d3e596a13f10a021dffa3de6c96b5fdb3c6b674a726e920adb"}, + {file = "kr8s-0.14.3.tar.gz", hash = "sha256:79487da3547374e31e7f882aeff26c95285b77bd9ed2716d895ff0bf18896bae"}, +] + +[package.dependencies] +anyio = ">=3.7.0" +asyncache = ">=0.3.1" +cryptography = ">=35" +exceptiongroup = {version = ">=1.2.0", markers = "python_version < \"3.12\""} +httpx = ">=0.24.1" +httpx-ws = ">=0.5.1" +python-box = ">=7.0.1" +python-jsonpath = ">=0.7.1" +pyyaml = ">=6.0" + +[package.extras] +docs = ["furo (>=2023.3.27)", "myst-parser (>=1.0.0)", "sphinx (>=5.3.0)", "sphinx-autoapi (>=2.1.0)", 
"sphinx-autobuild (>=2021.3.14)", "sphinx-copybutton (>=0.5.1)", "sphinx-design (>=0.3.0)", "sphinxcontrib-mermaid (>=0.8.1)"] +test = ["kubernetes (>=26.1.0)", "kubernetes-asyncio (>=24.2.3)", "kubernetes-validate (>=1.28.0)", "lightkube (>=0.13.0)", "pykube-ng (>=23.6.0)", "pytest (>=7.2.2)", "pytest-asyncio (>=0.20.3)", "pytest-cov (>=4.0.0)", "pytest-kind (>=22.11.1)", "pytest-rerunfailures (>=11.1.2)", "pytest-timeout (>=2.1.0)", "trio (>=0.22.0)"] + +[[package]] +name = "kubernetes" +version = "29.0.0" +description = "Kubernetes python client" +optional = false +python-versions = ">=3.6" +files = [ + {file = "kubernetes-29.0.0-py2.py3-none-any.whl", hash = "sha256:ab8cb0e0576ccdfb71886366efb102c6a20f268d817be065ce7f9909c631e43e"}, + {file = "kubernetes-29.0.0.tar.gz", hash = "sha256:c4812e227ae74d07d53c88293e564e54b850452715a59a927e7e1bc6b9a60459"}, +] + +[package.dependencies] +certifi = ">=14.05.14" +google-auth = ">=1.0.1" +oauthlib = ">=3.2.2" +python-dateutil = ">=2.5.3" +pyyaml = ">=5.4.1" +requests = "*" +requests-oauthlib = "*" +six = ">=1.9.0" +urllib3 = ">=1.24.2" +websocket-client = ">=0.32.0,<0.40.0 || >0.40.0,<0.41.dev0 || >=0.43.dev0" + +[package.extras] +adal = ["adal (>=1.0.2)"] + +[[package]] +name = "kubernetes-asyncio" +version = "29.0.0" +description = "Kubernetes asynchronous python client" +optional = false +python-versions = "*" +files = [ + {file = "kubernetes_asyncio-29.0.0-py3-none-any.whl", hash = "sha256:9dd911260e89d984479f23b1000b8713d35b68fbd92873473449b5558da1a3f4"}, + {file = "kubernetes_asyncio-29.0.0.tar.gz", hash = "sha256:99ff1fea4df062adb7a6e48962ad2504459165ffd7542013bc3d72d9ab3817be"}, +] + +[package.dependencies] +aiohttp = ">=3.9.0,<4.0.0" +certifi = ">=14.05.14" +python-dateutil = ">=2.5.3" +pyyaml = ">=3.12" +setuptools = ">=21.0.0" +six = ">=1.9.0" +urllib3 = ">=1.24.2" + [[package]] name = "leather" version = "0.4.0" @@ -2267,6 +2526,17 @@ dev = ["black", "flake8", "isort", "pre-commit", "pyproject-flake8"] doc = ["myst-parser", "sphinx", "sphinx-book-theme"] test = ["coverage", "pytest", "pytest-cov"] +[[package]] +name = "locket" +version = "1.0.0" +description = "File-based locks for Python on Linux and Windows" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" +files = [ + {file = "locket-1.0.0-py2.py3-none-any.whl", hash = "sha256:b6c819a722f7b6bd955b80781788e4a66a55628b858d347536b7e81325a3a5e3"}, + {file = "locket-1.0.0.tar.gz", hash = "sha256:5c0d4c052a8bbbf750e056a8e65ccd309086f4f0f18a2eac306a8dfa4112a632"}, +] + [[package]] name = "logbook" version = "1.5.3" @@ -2296,6 +2566,56 @@ sqlalchemy = ["sqlalchemy"] test = ["mock", "pytest", "pytest-cov (<2.6)"] zmq = ["pyzmq"] +[[package]] +name = "lz4" +version = "4.3.3" +description = "LZ4 Bindings for Python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "lz4-4.3.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b891880c187e96339474af2a3b2bfb11a8e4732ff5034be919aa9029484cd201"}, + {file = "lz4-4.3.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:222a7e35137d7539c9c33bb53fcbb26510c5748779364014235afc62b0ec797f"}, + {file = "lz4-4.3.3-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f76176492ff082657ada0d0f10c794b6da5800249ef1692b35cf49b1e93e8ef7"}, + {file = "lz4-4.3.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f1d18718f9d78182c6b60f568c9a9cec8a7204d7cb6fad4e511a2ef279e4cb05"}, + {file = 
"lz4-4.3.3-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6cdc60e21ec70266947a48839b437d46025076eb4b12c76bd47f8e5eb8a75dcc"}, + {file = "lz4-4.3.3-cp310-cp310-win32.whl", hash = "sha256:c81703b12475da73a5d66618856d04b1307e43428a7e59d98cfe5a5d608a74c6"}, + {file = "lz4-4.3.3-cp310-cp310-win_amd64.whl", hash = "sha256:43cf03059c0f941b772c8aeb42a0813d68d7081c009542301637e5782f8a33e2"}, + {file = "lz4-4.3.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:30e8c20b8857adef7be045c65f47ab1e2c4fabba86a9fa9a997d7674a31ea6b6"}, + {file = "lz4-4.3.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2f7b1839f795315e480fb87d9bc60b186a98e3e5d17203c6e757611ef7dcef61"}, + {file = "lz4-4.3.3-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:edfd858985c23523f4e5a7526ca6ee65ff930207a7ec8a8f57a01eae506aaee7"}, + {file = "lz4-4.3.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0e9c410b11a31dbdc94c05ac3c480cb4b222460faf9231f12538d0074e56c563"}, + {file = "lz4-4.3.3-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d2507ee9c99dbddd191c86f0e0c8b724c76d26b0602db9ea23232304382e1f21"}, + {file = "lz4-4.3.3-cp311-cp311-win32.whl", hash = "sha256:f180904f33bdd1e92967923a43c22899e303906d19b2cf8bb547db6653ea6e7d"}, + {file = "lz4-4.3.3-cp311-cp311-win_amd64.whl", hash = "sha256:b14d948e6dce389f9a7afc666d60dd1e35fa2138a8ec5306d30cd2e30d36b40c"}, + {file = "lz4-4.3.3-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:e36cd7b9d4d920d3bfc2369840da506fa68258f7bb176b8743189793c055e43d"}, + {file = "lz4-4.3.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:31ea4be9d0059c00b2572d700bf2c1bc82f241f2c3282034a759c9a4d6ca4dc2"}, + {file = "lz4-4.3.3-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:33c9a6fd20767ccaf70649982f8f3eeb0884035c150c0b818ea660152cf3c809"}, + {file = "lz4-4.3.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bca8fccc15e3add173da91be8f34121578dc777711ffd98d399be35487c934bf"}, + {file = "lz4-4.3.3-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e7d84b479ddf39fe3ea05387f10b779155fc0990125f4fb35d636114e1c63a2e"}, + {file = "lz4-4.3.3-cp312-cp312-win32.whl", hash = "sha256:337cb94488a1b060ef1685187d6ad4ba8bc61d26d631d7ba909ee984ea736be1"}, + {file = "lz4-4.3.3-cp312-cp312-win_amd64.whl", hash = "sha256:5d35533bf2cee56f38ced91f766cd0038b6abf46f438a80d50c52750088be93f"}, + {file = "lz4-4.3.3-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:363ab65bf31338eb364062a15f302fc0fab0a49426051429866d71c793c23394"}, + {file = "lz4-4.3.3-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:0a136e44a16fc98b1abc404fbabf7f1fada2bdab6a7e970974fb81cf55b636d0"}, + {file = "lz4-4.3.3-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:abc197e4aca8b63f5ae200af03eb95fb4b5055a8f990079b5bdf042f568469dd"}, + {file = "lz4-4.3.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:56f4fe9c6327adb97406f27a66420b22ce02d71a5c365c48d6b656b4aaeb7775"}, + {file = "lz4-4.3.3-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f0e822cd7644995d9ba248cb4b67859701748a93e2ab7fc9bc18c599a52e4604"}, + {file = "lz4-4.3.3-cp38-cp38-win32.whl", hash = "sha256:24b3206de56b7a537eda3a8123c644a2b7bf111f0af53bc14bed90ce5562d1aa"}, + {file = 
"lz4-4.3.3-cp38-cp38-win_amd64.whl", hash = "sha256:b47839b53956e2737229d70714f1d75f33e8ac26e52c267f0197b3189ca6de24"}, + {file = "lz4-4.3.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:6756212507405f270b66b3ff7f564618de0606395c0fe10a7ae2ffcbbe0b1fba"}, + {file = "lz4-4.3.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ee9ff50557a942d187ec85462bb0960207e7ec5b19b3b48949263993771c6205"}, + {file = "lz4-4.3.3-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2b901c7784caac9a1ded4555258207d9e9697e746cc8532129f150ffe1f6ba0d"}, + {file = "lz4-4.3.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b6d9ec061b9eca86e4dcc003d93334b95d53909afd5a32c6e4f222157b50c071"}, + {file = "lz4-4.3.3-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f4c7bf687303ca47d69f9f0133274958fd672efaa33fb5bcde467862d6c621f0"}, + {file = "lz4-4.3.3-cp39-cp39-win32.whl", hash = "sha256:054b4631a355606e99a42396f5db4d22046a3397ffc3269a348ec41eaebd69d2"}, + {file = "lz4-4.3.3-cp39-cp39-win_amd64.whl", hash = "sha256:eac9af361e0d98335a02ff12fb56caeb7ea1196cf1a49dbf6f17828a131da807"}, + {file = "lz4-4.3.3.tar.gz", hash = "sha256:01fe674ef2889dbb9899d8a67361e0c4a2c833af5aeb37dd505727cf5d2a131e"}, +] + +[package.extras] +docs = ["sphinx (>=1.6.0)", "sphinx-bootstrap-theme"] +flake8 = ["flake8"] +tests = ["psutil", "pytest (!=3.3.0)", "pytest-cov"] + [[package]] name = "mako" version = "1.3.3" @@ -2756,6 +3076,22 @@ pyasn1-modules = ">=0.0.5" rsa = ">=3.1.4" six = ">=1.6.1" +[[package]] +name = "oauthlib" +version = "3.2.2" +description = "A generic, spec-compliant, thorough implementation of the OAuth request-signing logic" +optional = false +python-versions = ">=3.6" +files = [ + {file = "oauthlib-3.2.2-py3-none-any.whl", hash = "sha256:8139f29aac13e25d502680e9e19963e83f16838d48a0d71c287fe40e7067fbca"}, + {file = "oauthlib-3.2.2.tar.gz", hash = "sha256:9859c40929662bec5d64f34d01c99e093149682a3f38915dc0655d5a633dd918"}, +] + +[package.extras] +rsa = ["cryptography (>=3.0.0)"] +signals = ["blinker (>=1.4.0)"] +signedtoken = ["cryptography (>=3.0.0)", "pyjwt (>=2.0.0,<3)"] + [[package]] name = "orjson" version = "3.10.1" @@ -2925,6 +3261,24 @@ files = [ qa = ["flake8 (==3.8.3)", "mypy (==0.782)"] testing = ["docopt", "pytest (<6.0.0)"] +[[package]] +name = "partd" +version = "1.4.1" +description = "Appendable key-value storage" +optional = false +python-versions = ">=3.7" +files = [ + {file = "partd-1.4.1-py3-none-any.whl", hash = "sha256:27e766663d36c161e2827aa3e28541c992f0b9527d3cca047e13fb3acdb989e6"}, + {file = "partd-1.4.1.tar.gz", hash = "sha256:56c25dd49e6fea5727e731203c466c6e092f308d8f0024e199d02f6aa2167f67"}, +] + +[package.dependencies] +locket = "*" +toolz = "*" + +[package.extras] +complete = ["blosc", "numpy (>=1.9.0)", "pandas (>=0.19.0)", "pyzmq"] + [[package]] name = "pathspec" version = "0.11.2" @@ -3407,6 +3761,26 @@ files = [ plugins = ["importlib-metadata"] windows-terminal = ["colorama (>=0.4.6)"] +[[package]] +name = "pykube-ng" +version = "23.6.0" +description = "Python client library for Kubernetes" +optional = false +python-versions = ">=3.8,<4" +files = [ + {file = "pykube-ng-23.6.0.tar.gz", hash = "sha256:46de8e17ed87c1a1014667d60e7d94a1f3fa2b8037b41e67d32c28b5869af35d"}, + {file = "pykube_ng-23.6.0-py3-none-any.whl", hash = "sha256:63f20f634bfcd83966edec32f892286f75dffb817a2c097434ecc039e558ec8f"}, +] + +[package.dependencies] +pyyaml = "*" +requests = ">=2.12" +urllib3 = ">=1.26.9" + 
+[package.extras] +gcp = ["google-auth", "jsonpath-ng"] +oidc = ["requests-oauthlib (>=1.3.0,<2.0.0)"] + [[package]] name = "pyparsing" version = "3.1.2" @@ -3452,6 +3826,38 @@ pluggy = ">=1.3.0,<2.0" [package.extras] testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "nose", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] +[[package]] +name = "python-box" +version = "7.1.1" +description = "Advanced Python dictionaries with dot notation access" +optional = false +python-versions = ">=3.8" +files = [ + {file = "python-box-7.1.1.tar.gz", hash = "sha256:2a3df244a5a79ac8f8447b5d11b5be0f2747d7b141cb2866060081ae9b53cc50"}, + {file = "python_box-7.1.1-cp310-cp310-macosx_11_0_x86_64.whl", hash = "sha256:81ed1ec0f0ff2370227fc07277c5baca46d190a4747631bad7eb6ab1630fb7d9"}, + {file = "python_box-7.1.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8891735b4148e84d348c6eadd2f127152f751c9603e35d43a1f496183a291ac4"}, + {file = "python_box-7.1.1-cp310-cp310-win_amd64.whl", hash = "sha256:0036fd47d388deaca8ebd65aea905f88ee6ef91d1d8ce34898b66f1824afbe80"}, + {file = "python_box-7.1.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:aabf8b9ae5dbc8ba431d8cbe0d4cfe737a25d52d68b0f5f2ff34915c21a2c1db"}, + {file = "python_box-7.1.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c046608337e723ae4de3206db5d1e1202ed166da2dfdc70c1f9361e72ace5633"}, + {file = "python_box-7.1.1-cp311-cp311-win_amd64.whl", hash = "sha256:f9266795e9c233874fb5b34fa994054b4fb0371881678e6ec45aec17fc95feac"}, + {file = "python_box-7.1.1-cp38-cp38-macosx_11_0_x86_64.whl", hash = "sha256:f76b5b7f0cdc07bfdd4200dc24e6e33189bb2ae322137a2b7110fd41891a3157"}, + {file = "python_box-7.1.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4ea13c98e05a3ec0ff26f254986a17290b69b5ade209fad081fd628f8fcfaa08"}, + {file = "python_box-7.1.1-cp38-cp38-win_amd64.whl", hash = "sha256:1b3f346e332dba16df0b0543d319d9e7ce07d93e5ae152175302894352aa2d28"}, + {file = "python_box-7.1.1-cp39-cp39-macosx_11_0_x86_64.whl", hash = "sha256:24c4ec0ee0278f66321100aaa9c615413da27a14ff43d376a2a3b4665e1d9494"}, + {file = "python_box-7.1.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d95e5eec4fc8f3fc5c9cc7347fc2eb4f9187c853d34c90b1658d1eff96cd4eac"}, + {file = "python_box-7.1.1-cp39-cp39-win_amd64.whl", hash = "sha256:a0f1333c42e81529b6f68c192050df9d4505b803be7ac47f114036b98707f7cf"}, + {file = "python_box-7.1.1-py3-none-any.whl", hash = "sha256:63b609555554d7a9d4b6e725f8e78ef1717c67e7d386200e03422ad612338df8"}, +] + +[package.extras] +all = ["msgpack", "ruamel.yaml (>=0.17)", "toml"] +msgpack = ["msgpack"] +pyyaml = ["PyYAML"] +ruamel-yaml = ["ruamel.yaml (>=0.17)"] +toml = ["toml"] +tomli = ["tomli", "tomli-w"] +yaml = ["ruamel.yaml (>=0.17)"] + [[package]] name = "python-dateutil" version = "2.8.2" @@ -3480,6 +3886,28 @@ files = [ [package.extras] cli = ["click (>=5.0)"] +[[package]] +name = "python-json-logger" +version = "2.0.7" +description = "A python library adding a json log formatter" +optional = false +python-versions = ">=3.6" +files = [ + {file = "python-json-logger-2.0.7.tar.gz", hash = "sha256:23e7ec02d34237c5aa1e29a070193a4ea87583bb4e7f8fd06d3de8264c4b2e1c"}, + {file = "python_json_logger-2.0.7-py3-none-any.whl", hash = "sha256:f380b826a991ebbe3de4d897aeec42760035ac760345e57b812938dc8b35e2bd"}, +] + +[[package]] +name = "python-jsonpath" +version = "1.1.1" +description = "JSONPath, JSON Pointer and JSON Patch 
for Python." +optional = false +python-versions = ">=3.7" +files = [ + {file = "python_jsonpath-1.1.1-py3-none-any.whl", hash = "sha256:43f2622b7aaaf4f45dd873e80cfd181058503e08ffdeac5218135f3a97bd0aec"}, + {file = "python_jsonpath-1.1.1.tar.gz", hash = "sha256:d2944e1f7a1d6c8fa958724f9570b8f04a4e00ab6bf1e4733346ab8dcef1f74f"}, +] + [[package]] name = "python-slugify" version = "8.0.4" @@ -3740,6 +4168,24 @@ urllib3 = ">=1.21.1,<3" socks = ["PySocks (>=1.5.6,!=1.5.7)"] use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] +[[package]] +name = "requests-oauthlib" +version = "2.0.0" +description = "OAuthlib authentication support for Requests." +optional = false +python-versions = ">=3.4" +files = [ + {file = "requests-oauthlib-2.0.0.tar.gz", hash = "sha256:b3dffaebd884d8cd778494369603a9e7b58d29111bf6b41bdc2dcd87203af4e9"}, + {file = "requests_oauthlib-2.0.0-py2.py3-none-any.whl", hash = "sha256:7dd8a5c40426b779b0868c404bdef9768deccf22749cde15852df527e6269b36"}, +] + +[package.dependencies] +oauthlib = ">=3.0.0" +requests = ">=2.0.0" + +[package.extras] +rsa = ["oauthlib[signedtoken] (>=3.0.0)"] + [[package]] name = "requests-toolbelt" version = "1.0.0" @@ -4056,6 +4502,17 @@ files = [ {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, ] +[[package]] +name = "sortedcontainers" +version = "2.4.0" +description = "Sorted Containers -- Sorted List, Sorted Dict, Sorted Set" +optional = false +python-versions = "*" +files = [ + {file = "sortedcontainers-2.4.0-py2.py3-none-any.whl", hash = "sha256:a163dcaede0f1c021485e957a39245190e74249897e2ae4b2aa38595db237ee0"}, + {file = "sortedcontainers-2.4.0.tar.gz", hash = "sha256:25caa5a06cc30b6b83d11423433f65d1f9d76c4c6a0c90e3379eaa43b9bfdb88"}, +] + [[package]] name = "sqlalchemy" version = "2.0.28" @@ -4412,6 +4869,17 @@ files = [ {file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"}, ] +[[package]] +name = "toolz" +version = "0.12.1" +description = "List processing tools and functional utilities" +optional = false +python-versions = ">=3.7" +files = [ + {file = "toolz-0.12.1-py3-none-any.whl", hash = "sha256:d22731364c07d72eea0a0ad45bafb2c2937ab6fd38a3507bf55eae8744aa7d85"}, + {file = "toolz-0.12.1.tar.gz", hash = "sha256:ecca342664893f177a13dac0e6b41cbd8ac25a358e5f215316d43e2100224f4d"}, +] + [[package]] name = "toposort" version = "1.10" @@ -4423,6 +4891,26 @@ files = [ {file = "toposort-1.10.tar.gz", hash = "sha256:bfbb479c53d0a696ea7402601f4e693c97b0367837c8898bc6471adfca37a6bd"}, ] +[[package]] +name = "tornado" +version = "6.4" +description = "Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed." 
+optional = false +python-versions = ">= 3.8" +files = [ + {file = "tornado-6.4-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:02ccefc7d8211e5a7f9e8bc3f9e5b0ad6262ba2fbb683a6443ecc804e5224ce0"}, + {file = "tornado-6.4-cp38-abi3-macosx_10_9_x86_64.whl", hash = "sha256:27787de946a9cffd63ce5814c33f734c627a87072ec7eed71f7fc4417bb16263"}, + {file = "tornado-6.4-cp38-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f7894c581ecdcf91666a0912f18ce5e757213999e183ebfc2c3fdbf4d5bd764e"}, + {file = "tornado-6.4-cp38-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e43bc2e5370a6a8e413e1e1cd0c91bedc5bd62a74a532371042a18ef19e10579"}, + {file = "tornado-6.4-cp38-abi3-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f0251554cdd50b4b44362f73ad5ba7126fc5b2c2895cc62b14a1c2d7ea32f212"}, + {file = "tornado-6.4-cp38-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:fd03192e287fbd0899dd8f81c6fb9cbbc69194d2074b38f384cb6fa72b80e9c2"}, + {file = "tornado-6.4-cp38-abi3-musllinux_1_1_i686.whl", hash = "sha256:88b84956273fbd73420e6d4b8d5ccbe913c65d31351b4c004ae362eba06e1f78"}, + {file = "tornado-6.4-cp38-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:71ddfc23a0e03ef2df1c1397d859868d158c8276a0603b96cf86892bff58149f"}, + {file = "tornado-6.4-cp38-abi3-win32.whl", hash = "sha256:6f8a6c77900f5ae93d8b4ae1196472d0ccc2775cc1dfdc9e7727889145c45052"}, + {file = "tornado-6.4-cp38-abi3-win_amd64.whl", hash = "sha256:10aeaa8006333433da48dec9fe417877f8bcc21f48dda8d661ae79da357b2a63"}, + {file = "tornado-6.4.tar.gz", hash = "sha256:72291fa6e6bc84e626589f1c29d90a5a6d593ef5ae68052ee2ef000dfd273dee"}, +] + [[package]] name = "tqdm" version = "4.66.2" @@ -4477,13 +4965,13 @@ typing-extensions = ">=3.7.4.3" [[package]] name = "types-python-dateutil" -version = "2.8.19.20240311" +version = "2.9.0.20240316" description = "Typing stubs for python-dateutil" optional = false python-versions = ">=3.8" files = [ - {file = "types-python-dateutil-2.8.19.20240311.tar.gz", hash = "sha256:51178227bbd4cbec35dc9adffbf59d832f20e09842d7dcb8c73b169b8780b7cb"}, - {file = "types_python_dateutil-2.8.19.20240311-py3-none-any.whl", hash = "sha256:ef813da0809aca76472ca88807addbeea98b19339aebe56159ae2f4b4f70857a"}, + {file = "types-python-dateutil-2.9.0.20240316.tar.gz", hash = "sha256:5d2f2e240b86905e40944dd787db6da9263f0deabef1076ddaed797351ec0202"}, + {file = "types_python_dateutil-2.9.0.20240316-py3-none-any.whl", hash = "sha256:6b8cb66d960771ce5ff974e9dd45e38facb81718cc1e208b10b1baccbfdbee3b"}, ] [[package]] @@ -4775,6 +5263,22 @@ files = [ {file = "wcwidth-0.2.13.tar.gz", hash = "sha256:72ea0c06399eb286d978fdedb6923a9eb47e1c486ce63e9b4e64fc18303972b5"}, ] +[[package]] +name = "websocket-client" +version = "1.8.0" +description = "WebSocket client for Python with low level API options" +optional = false +python-versions = ">=3.8" +files = [ + {file = "websocket_client-1.8.0-py3-none-any.whl", hash = "sha256:17b44cc997f5c498e809b22cdf2d9c7a9e71c02c8cc2b6c56e7c2d1239bfa526"}, + {file = "websocket_client-1.8.0.tar.gz", hash = "sha256:3239df9f44da632f96012472805d40a23281a991027ce11d2f45a6f24ac4c3da"}, +] + +[package.extras] +docs = ["Sphinx (>=6.0)", "myst-parser (>=2.0.0)", "sphinx-rtd-theme (>=1.1.0)"] +optional = ["python-socks", "wsaccel"] +test = ["websockets"] + [[package]] name = "websockets" version = "12.0" @@ -4856,6 +5360,20 @@ files = [ {file = "websockets-12.0.tar.gz", hash = 
"sha256:81df9cbcbb6c260de1e007e58c011bfebe2dafc8435107b0537f393dd38c8b1b"}, ] +[[package]] +name = "wsproto" +version = "1.2.0" +description = "WebSockets state-machine based protocol implementation" +optional = false +python-versions = ">=3.7.0" +files = [ + {file = "wsproto-1.2.0-py3-none-any.whl", hash = "sha256:b9acddd652b585d75b20477888c56642fdade28bdfd3579aa24a4d2c037dd736"}, + {file = "wsproto-1.2.0.tar.gz", hash = "sha256:ad565f26ecb92588a3e43bc3d96164de84cd9902482b130d0ddbaa9664a85065"}, +] + +[package.dependencies] +h11 = ">=0.9.0,<1" + [[package]] name = "yarl" version = "1.9.4" @@ -4959,6 +5477,17 @@ files = [ idna = ">=2.0" multidict = ">=4.0" +[[package]] +name = "zict" +version = "3.0.0" +description = "Mutable mapping tools" +optional = false +python-versions = ">=3.8" +files = [ + {file = "zict-3.0.0-py2.py3-none-any.whl", hash = "sha256:5796e36bd0e0cc8cf0fbc1ace6a68912611c1dbd74750a3f3026b9b9d6a327ae"}, + {file = "zict-3.0.0.tar.gz", hash = "sha256:e321e263b6a97aafc0790c3cfb3c04656b7066e6738c37fffcca95d803c9fba5"}, +] + [[package]] name = "zipp" version = "3.17.0" @@ -4977,4 +5506,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "^3.11,<3.13" -content-hash = "b006ed4a7ebdef3a8d3a59f927e5677642e60899cb61ba1d1a77a4592d59a89e" +content-hash = "6fd4761fabc34b6e4c21109215d5624377bc51bca6e63ebd85e20b0499848985" diff --git a/pyproject.toml b/pyproject.toml index d5fd6ff8a..c0dd77259 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,6 @@ packages = [ [tool.poetry.dependencies] python = "^3.11,<3.13" example-plugin = { path = "warehouse/cloudquery-example-plugin", develop = true } -dune-contract-usage = { path = "warehouse/cloudquery-dune-contract-usage", develop = true } google-cloud-bigquery = "^3.17.1" pendulum = "^3.0.0" google-api-python-client = "^2.116.0" @@ -34,6 +33,10 @@ dagster-dbt = "^0.23.2" dagster-webserver = "^1.7.2" dagster-gcp = "^0.23.2" duckdb = "^0.10.2" +dask-kubernetes = "^2024.4.2" +dask = { extras = ["distributed"], version = "^2024.4.2" } +lz4 = "^4.3.3" +arrow = "^1.3.0" [tool.poetry.scripts] @@ -71,4 +74,4 @@ project_dir = "." 
target = "playground" [tool.dagster] -module_name = "oso_dagster" +module_name = "oso_dagster.definitions" diff --git a/warehouse/oso_dagster/__init__.py b/warehouse/oso_dagster/__init__.py index ddab213e0..e69de29bb 100644 --- a/warehouse/oso_dagster/__init__.py +++ b/warehouse/oso_dagster/__init__.py @@ -1,45 +0,0 @@ -import os - -from dagster import Definitions -from dagster_dbt import DbtCliResource -from dagster_gcp import BigQueryResource, GCSResource - -from .assets import ( - main_dbt_assets, - karma3_globaltrust, - karma3_globaltrust_config, - karma3_localtrust, - testing_goldsky, - async_asset, -) -from .constants import main_dbt_project_dir -from .schedules import schedules - -from dotenv import load_dotenv - -load_dotenv() - -defs = Definitions( - assets=[main_dbt_assets, testing_goldsky, async_asset] - + karma3_globaltrust.assets - + karma3_globaltrust_config.assets - + karma3_localtrust.assets, - schedules=schedules, - resources={ - "main_dbt": DbtCliResource(project_dir=os.fspath(main_dbt_project_dir)), - "bigquery": BigQueryResource( - project=os.environ.get("GOOGLE_PROJECT_ID"), # required - ), - "gcs": GCSResource( - project=os.environ.get("GOOGLE_PROJECT_ID"), # required - ), - }, - jobs=[] - + karma3_globaltrust.jobs - + karma3_globaltrust_config.jobs - + karma3_localtrust.jobs, - sensors=[] - + karma3_globaltrust.sensors - + karma3_globaltrust_config.sensors - + karma3_localtrust.sensors, -) diff --git a/warehouse/oso_dagster/assets.py b/warehouse/oso_dagster/assets.py index 2a1164cda..456e74f31 100644 --- a/warehouse/oso_dagster/assets.py +++ b/warehouse/oso_dagster/assets.py @@ -1,6 +1,7 @@ from concurrent.futures import ProcessPoolExecutor import os import uuid +import time import re import arrow import asyncio @@ -40,14 +41,10 @@ from dagster_dbt import DbtCliResource, dbt_assets, DagsterDbtTranslator from .constants import main_dbt_manifest_path from .goldsky import ( - GoldskyDuckDB, - GoldskyQueueItem, GoldskyConfig, - GoldskyContext, - GoldskyQueue, - GoldskyQueues, - mp_load_goldsky_worker, + GoldskyAsset, ) +from .goldsky_dask import RetryTaskManager class Interval(Enum): @@ -142,292 +139,68 @@ def parse_interval_prefix(interval: Interval, prefix: str) -> arrow.Arrow: return arrow.get(prefix, "YYYYMMDD") -async def load_goldsky_worker( - job_id: str, - context: AssetExecutionContext, - config: GoldskyConfig, - gs_context: GoldskyContext, - gs_duckdb: GoldskyDuckDB, - worker: str, - queue: GoldskyQueue, - last_checkpoint_from_previous_run: Optional[int] = None, -): - context.log.info(f"starting the worker for {worker}") - item = queue.dequeue() - if not item: - context.log.info(f"nothing to load for worker {worker}") - return - last_checkpoint = item.checkpoint - 1 - batch_to_load: List[GoldskyQueueItem] = [item] - current_batch = 0 - - while item: - if item.checkpoint > last_checkpoint: - if item.checkpoint - 1 != last_checkpoint: - context.log.info( - "potentially missing or checkpoints number jumped unexpectedly. 
not erroring" - ) - else: - raise Exception( - f"Unexpected out of order checkpoints current: {item.checkpoint} last: {last_checkpoint}" - ) - if item.checkpoint % 10 == 0: - context.log.info(f"Processing {item.blob_name}") - last_checkpoint = item.checkpoint - - item = queue.dequeue() - if not item: - break - batch_to_load.append(item) - - if len(batch_to_load) > config.size: - gs_duckdb.load_and_merge( - worker, - current_batch, - batch_to_load, - ) - current_batch += 1 - batch_to_load = [] - if len(batch_to_load) > 0: - gs_duckdb.load_and_merge( - worker, - current_batch, - batch_to_load, - ) - current_batch += 1 - batch_to_load = [] - - # Load all of the tables into bigquery - with gs_context.bigquery.get_client() as client: - dest_table_ref = client.get_dataset(config.dataset_name).table( - f"{config.table_name}_{worker}" - ) - new = last_checkpoint_from_previous_run is None - try: - client.get_table(dest_table_ref) - except NotFound as exc: - if last_checkpoint_from_previous_run is not None: - raise exc - new = True - - if not new: - context.log.info("Merging into worker table") - client.query_and_wait( - f""" - LOAD DATA OVERWRITE `{config.project_id}.{config.dataset_name}.{config.table_name}_{worker}_{job_id}` - FROM FILES ( - format = "PARQUET", - uris = ["{gs_duckdb.wildcard_path(worker)}"] - ); - """ - ) - tx_query = f""" - BEGIN - BEGIN TRANSACTION; - INSERT INTO `{config.project_id}.{config.dataset_name}.{config.table_name}_{worker}` - SELECT * FROM `{config.project_id}.{config.dataset_name}.{config.table_name}_{worker}_{job_id}`; - - INSERT INTO `{config.project_id}.{config.dataset_name}.{config.table_name}_pointer_state` (worker, last_checkpoint) - VALUES ('{worker}', {last_checkpoint}); - COMMIT TRANSACTION; - EXCEPTION WHEN ERROR THEN - -- Roll back the transaction inside the exception handler. 
-                SELECT @@error.message;
-                ROLLBACK TRANSACTION;
-            END;
-            """
-            context.log.debug(f"query: {tx_query}")
-            client.query_and_wait(tx_query)
-            client.query_and_wait(
-                f"""
-                DROP TABLE `{config.project_id}.{config.dataset_name}.{config.table_name}_{worker}_{job_id}`;
-                """
-            )
-        else:
-            context.log.info("Creating new worker table")
-            query1 = f"""
-                LOAD DATA OVERWRITE `{config.project_id}.{config.dataset_name}.{config.table_name}_{worker}`
-                FROM FILES (
-                    format = "PARQUET",
-                    uris = ["{gs_duckdb.wildcard_path(worker)}"]
-                );
-            """
-            context.log.debug(f"query: {query1}")
-            client.query_and_wait(query1)
-            rows = client.query_and_wait(
-                f"""
-                INSERT INTO `{config.project_id}.{config.dataset_name}.{config.table_name}_pointer_state` (worker, last_checkpoint)
-                VALUES ('{worker}', {last_checkpoint});
-                """
-            )
-            context.log.info(rows)
-
-
-def load_goldsky_queue_item(
-    context: AssetExecutionContext,
-    project_id: str,
-    bucket_name: str,
-    dataset_name: str,
-    table_name: str,
-    partition_column_name: str,
-    bigquery: BigQueryResource,
-    worker: str,
-    item: GoldskyQueueItem,
-):
-    with bigquery.get_client() as bq_client:
-        dataset = bq_client.dataset(project_id)
-        temp_table_name = f"{project_id}.{dataset_name}.{table_name}_worker_{worker}_checkpoint_premerge"
-
-        bq_client.query_and_wait(
-            f"""
-            LOAD DATA OVERWRITE `{temp_table_name}`
-            FROM FILES (
-                format = "PARQUET",
-                uris = ["gs://{bucket_name}/{item.blob_name}"]
-            );
-            """
-        )
-
-
-@asset(key="optimism_traces")
-async def testing_goldsky(
-    context: AssetExecutionContext, bigquery: BigQueryResource, gcs: GCSResource
-) -> MaterializeResult:
-    goldsky_re = re.compile(
-        os.path.join("goldsky", "optimism-traces")
-        + r"/(?P<job_id>\d+-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})-(?P<worker>\d+)-(?P<checkpoint>\d+).parquet"
-    )
-    gs_config = GoldskyConfig(
+optimism_traces_parallel = GoldskyAsset.setup_asset(
+    name="optimism_traces_parallel",
+    config=GoldskyConfig(
+        "optimism-traces",
         "opensource-observer",
         "oso-dataset-transfer-bucket",
         "oso_raw_sources",
         "optimism_traces",
         "block_number",
-        int(os.environ.get("GOLDSKY_BATCH_SIZE", "40")),
-        int(os.environ.get("GOLDSKY_CHECKPOINT_SIZE", "1000")),
+        int(os.environ.get("GOLDSKY_BATCH_SIZE", "10")),
+        int(os.environ.get("GOLDSKY_CHECKPOINT_SIZE", "100")),
         os.environ.get("DUCKDB_GCS_KEY_ID"),
         os.environ.get("DUCKDB_GCS_SECRET"),
-    )
-
-    gcs_client = gcs.get_client()
-    blobs = gcs_client.list_blobs("oso-dataset-transfer-bucket", prefix="goldsky")
-
-    parsed_files = []
-    gs_job_ids = set()
-    queues = GoldskyQueues(max_size=int(os.environ.get("GOLDSKY_MAX_QUEUE_SIZE", 200)))
-
-    worker_status: Mapping[str, int] = {}
-    # Get the current state
-    with bigquery.get_client() as client:
-        try:
-            rows = client.query_and_wait(
-                f"""
-                SELECT worker, MAX(last_checkpoint) AS last_checkpoint
-                FROM `{gs_config.project_id}.{gs_config.dataset_name}.{gs_config.table_name}_pointer_state`
-                GROUP BY 1;
-                """
-            )
-            for row in rows:
-                context.log.debug(row)
-                worker_status[row.worker] = row.last_checkpoint
-        except NotFound:
-            context.log.info("No pointer status found. 
Will create the table later") - - for blob in blobs: - match = goldsky_re.match(blob.name) - if not match: - context.log.debug(f"skipping {blob.name}") - continue - parsed_files.append(match) - worker = match.group("worker") - gs_job_ids.add(match.group("job_id")) - checkpoint = int(match.group("checkpoint")) - if checkpoint <= worker_status.get(worker, -1): - context.log.debug(f"skipping {blob.name} as it was already processed") - continue - queues.enqueue( - worker, - GoldskyQueueItem( - checkpoint, - blob.name, - ), - ) - - if len(gs_job_ids) > 1: - raise Exception("We aren't currently handling multiple job ids") - - gs_context = GoldskyContext(bigquery, gcs) - - job_id = arrow.now().format("YYYYMMDDHHmm") - - pointer_table = f"{gs_config.project_id}.{gs_config.dataset_name}.{gs_config.table_name}_pointer_state" - - with bigquery.get_client() as client: - dataset = client.get_dataset(gs_config.dataset_name) - table_name = f"{gs_config.table_name}_pointer_state" - pointer_table_ref = dataset.table(table_name) - try: - client.get_table(pointer_table_ref) - except NotFound as exc: - if table_name in exc.message: - context.log.info("Pointer table not found.") - client.query_and_wait( - f""" - CREATE TABLE {pointer_table} (worker STRING, last_checkpoint INT64); - """ - ) - else: - raise exc - - # gs_duckdb = GoldskyDuckDB.connect( - # f"_temp/{job_id}", - # gs_config.bucket_name, - # gs_config.bucket_key_id, - # gs_config.bucket_secret, - # os.environ.get("DAGSTER_DUCKDB_PATH"), - # context.log, - # os.environ.get("DUCKDB_MEMORY_LIMIT", "16GB"), - # ) - - worker_coroutines = [] - - # For each worker - for worker, queue in queues.worker_queues(): - context.log.info(f"Creating coroutines for worker {worker}") - await mp_load_goldsky_worker( - job_id, - context, - gs_config, - gs_context, - worker, - queue, - last_checkpoint_from_previous_run=worker_status.get(worker, None), - ) - - # await asyncio.gather(*worker_coroutines) - - # Create a temporary table to load the current checkpoint - - # Check for duplicates between the current checkpoint and the existing table - # of data - - # In a transaction - # -- Merge the dataset into the main table - # -- Update a source pointer for the current checkpoint (so we don't reprocess) - # -- Delete the temporary table + ), +) - # TODOS - # Move the data into cold storage - - return MaterializeResult( - metadata=dict( - job_id_count=len(gs_job_ids), - job_ids=list(gs_job_ids), - worker_count=len(queues.workers()), - workers=list(queues.workers()), - status=queues.status(), - ) - ) +# @asset(key="optimism_traces_parallel") +# def optimism_traces_parallel( +# context: AssetExecutionContext, bigquery: BigQueryResource, gcs: GCSResource +# ) -> MaterializeResult: +# config = GoldskyConfig( +# "optimism-traces", +# "opensource-observer", +# "oso-dataset-transfer-bucket", +# "oso_raw_sources", +# "optimism_traces", +# "block_number", +# int(os.environ.get("GOLDSKY_BATCH_SIZE", "10")), +# int(os.environ.get("GOLDSKY_CHECKPOINT_SIZE", "100")), +# os.environ.get("DUCKDB_GCS_KEY_ID"), +# os.environ.get("DUCKDB_GCS_SECRET"), +# ) +# loop = asyncio.new_event_loop() + +# last_restart = time.time() +# retries = 0 +# while True: +# try: +# asset = GoldskyAsset(gcs, bigquery, config) +# task_manager = RetryTaskManager.setup( +# loop, +# config.bucket_key_id, +# config.bucket_secret, +# asset.cluster_spec, +# context.log, +# ) +# try: +# loop.run_until_complete(asset.materialize(task_manager, context)) +# return +# finally: +# task_manager.close() +# except Exception as e: 
+# now = time.time() +# if now > last_restart + 120: +# last_restart = now +# retries = 1 +# continue +# else: +# if retries > 3: +# raise e +# context.log.error("kube cluster probably disconnected retrying") +# retries += 1 def interval_gcs_import_asset(key: str, config: IntervalGCSAsset, **kwargs): @@ -582,43 +355,6 @@ def gcs_clean_up_sensor( return AssetFactoryResponse([gcs_asset], [gcs_clean_up_sensor], [gcs_clean_up_job]) -async def sleep_and_print(msg: str, sleep: float): - await asyncio.sleep(sleep) - print(msg) - - -@dataclass -class Boop: - foo: str - bar: str - - -official_name = None - - -def mp_test(x: Boop, num: int): - print(f"hi from multiproc {num} {x} {official_name}") - - -def mp_init(x: int, y: str): - global official_name - official_name = uuid.uuid4() - print(f"hi from the multiproc init {x} {y} {official_name}") - - -@asset -async def async_asset() -> MaterializeResult: - with ProcessPoolExecutor(8, initializer=mp_init, initargs=(1, "hello")) as executor: - futures = [] - for i in range(10): - future = executor.submit(mp_test, Boop("a", "b"), i) - futures.append(asyncio.wrap_future(future)) - print("wait for the pool to finish") - await asyncio.gather(*futures) - - return MaterializeResult(metadata={"boop": True}) - - karma3_globaltrust = interval_gcs_import_asset( "karma3_globaltrust", IntervalGCSAsset( diff --git a/warehouse/oso_dagster/cbt/__init__.py b/warehouse/oso_dagster/cbt/__init__.py new file mode 100644 index 000000000..b59ca24e3 --- /dev/null +++ b/warehouse/oso_dagster/cbt/__init__.py @@ -0,0 +1,25 @@ +# CBT - cheap build tool (only works for bigquery) +# +# A poor excuse for a dbt replacement when calling sql as a library +from dagster_gcp import BigQueryResource +from google.cloud.bigquery import TableReference +from jinja2 import Environment, FileSystemLoader, select_autoescape + + +class CBT: + def __init__(self, bigquery: BigQueryResource, search_paths: List[str]): + self.bigquery = bigquery + loader = FileSystemLoader(search_paths) + self.env = Environment( + loader=loader, + ) + + def run_load( + self, query_file: str, destination_table: TableReference, *args, **kwargs + ): + select_query_template = self.env.get_template(query_file) + select_query = select_query_template.render(*args, **kwargs) + with self.bigquery.get_client() as client: + job = client.query(select_query) + job.destination(destination_table) + job.result() diff --git a/warehouse/oso_dagster/definitions.py b/warehouse/oso_dagster/definitions.py new file mode 100644 index 000000000..dc4acdfe8 --- /dev/null +++ b/warehouse/oso_dagster/definitions.py @@ -0,0 +1,47 @@ +import os + +from dagster import Definitions +from dagster_dbt import DbtCliResource +from dagster_gcp import BigQueryResource, GCSResource + +from .assets import ( + main_dbt_assets, + karma3_globaltrust, + karma3_globaltrust_config, + karma3_localtrust, + optimism_traces_parallel, +) +from .constants import main_dbt_project_dir +from .schedules import schedules + +from dotenv import load_dotenv + +load_dotenv() + +defs = Definitions( + assets=[ + main_dbt_assets, + optimism_traces_parallel, + ] + + karma3_globaltrust.assets + + karma3_globaltrust_config.assets + + karma3_localtrust.assets, + schedules=schedules, + resources={ + "main_dbt": DbtCliResource(project_dir=os.fspath(main_dbt_project_dir)), + "bigquery": BigQueryResource( + project=os.environ.get("GOOGLE_PROJECT_ID"), # required + ), + "gcs": GCSResource( + project=os.environ.get("GOOGLE_PROJECT_ID"), # required + ), + }, + jobs=[] + + karma3_globaltrust.jobs 
+ + karma3_globaltrust_config.jobs + + karma3_localtrust.jobs, + sensors=[] + + karma3_globaltrust.sensors + + karma3_globaltrust_config.sensors + + karma3_localtrust.sensors, +) diff --git a/warehouse/oso_dagster/goldsky.py b/warehouse/oso_dagster/goldsky.py index 3cfe34374..3bc503baf 100644 --- a/warehouse/oso_dagster/goldsky.py +++ b/warehouse/oso_dagster/goldsky.py @@ -2,20 +2,24 @@ import asyncio import os import arrow -from concurrent.futures import ProcessPoolExecutor -from queue import Empty -import multiprocessing as mp +import re +import json +from dask.distributed import get_worker +from dask_kubernetes.operator import make_cluster_spec from dataclasses import dataclass -from typing import List, Mapping, Optional +from typing import List, Mapping, Tuple import heapq import duckdb -from dagster import DagsterLogManager, AssetExecutionContext, MaterializeResult +from dagster import asset, DagsterLogManager, AssetExecutionContext, MaterializeResult from dagster_gcp import BigQueryResource, GCSResource from google.api_core.exceptions import NotFound +from .goldsky_dask import setup_kube_cluster_client, DuckDBGCSPlugin, RetryTaskManager @dataclass class GoldskyConfig: + # This is the name of the asset within the goldsky directory path in gcs + asset_name: str project_id: str bucket_name: str dataset_name: str @@ -25,17 +29,10 @@ class GoldskyConfig: pointer_size: int bucket_key_id: str bucket_secret: str - - -@dataclass -class GoldskyContext: - bigquery: BigQueryResource - gcs: GCSResource - - -class GoldskyWorkerLoader: - def __init__(self, worker: str): - pass + worker_memory: str = "4096Mi" + scheduler_memory: str = "2560Mi" + goldsky_dir_path: str = "goldsky" + temp_path: str = "_temp" @dataclass @@ -97,422 +94,229 @@ def worker_queues(self): return self.queues.items() -class GoldskyProcess: - @classmethod - def start( - cls, - queue: mp.Queue, - log_queue: mp.Queue, - gcs_destination_path: str, - config: GoldskyConfig, - ): - proc = mp.Process( - target=goldsky_process_worker, - args=( - gcs_destination_path, - queue, - log_queue, - config, - ), - kwargs={ - "memory_limit": os.environ.get("GOLDSKY_PROCESS_DUCKDB_MEMORY", "2GB") - }, - ) - proc.start() - return cls(proc, queue) - - def __init__(self, process: mp.Process, queue: mp.Queue): - self._process = process - self._queue = queue - - @property - def process(self): - return self._process - - @property - def queue(self): - return self._queue - - -class MultiProcessLogger: - def __init__(self, queue: mp.Queue): - self.queue = queue - - def debug(self, message: str): - self.queue.put(["debug", message]) - - def info(self, message: str): - self.queue.put(["info", message]) - - def warn(self, message: str): - self.queue.put(["warn", message]) - - @dataclass -class MPWorkerItem: - worker: str - blob_name: str +class GoldskyProcessItem: + source: str + destination: str checkpoint: int -def goldsky_process_worker( - destination_path: str, - queue: mp.Queue, - log_queue: mp.Queue, - config: GoldskyConfig, - memory_limit: str = "16GB", -): - gs_duckdb = MPGoldskyDuckDB.connect( - destination_path, - config.bucket_name, - config.bucket_key_id, - config.bucket_secret, - "", - MultiProcessLogger(log_queue), - memory_limit=memory_limit, - ) - - item: MPWorkerItem = queue.get() - while True: - gs_duckdb.load_and_add_checkpoint( - item.worker, item.checkpoint, item.blob_name, item.checkpoint - ) - try: - item = queue.get(block=False) - except Empty: - break +def process_goldsky_file(item: GoldskyProcessItem): + worker = get_worker() + 
plugin: DuckDBGCSPlugin = worker.plugins["duckdb-gcs"] + query = f""" + COPY ( + SELECT {item.checkpoint} as _checkpoint, * + FROM read_parquet('{item.source}') + ) TO '{item.destination}'; + """ + print(f"Querying with: {query}") + worker.log_event("info", {"message": "running query", "query": query}) + plugin.conn.sql(query) + return True -class GoldskyDuckDB: - @classmethod - def connect( - cls, - destination_path: str, - bucket_name: str, - key_id: str, - secret: str, - path: str, - log: DagsterLogManager, - memory_limit: str = "16GB", - ): - conn = duckdb.connect() - conn.sql( - f""" - CREATE SECRET ( - TYPE GCS, - KEY_ID '{key_id}', - SECRET '{secret}' - ); - """ - ) - conn.sql(f"SET memory_limit = '{memory_limit}';") - return cls(bucket_name, destination_path, log, conn) - +class GoldskyWorker: def __init__( self, - bucket_name: str, - destination_path: str, - log: DagsterLogManager, - conn: duckdb.DuckDBPyConnection, + worker: str, + job_id: str, + pointer_table: str, + latest_checkpoint: int | None, + gcs: GCSResource, + bigquery: BigQueryResource, + config: GoldskyConfig, + queue: GoldskyQueue, + task_manager: RetryTaskManager, ): - self.destination_path = destination_path - self.bucket_name = bucket_name - self.conn = conn - self.log = log - - def full_dest_table_path(self, worker: str, batch_id: int): - return f"gs://{self.bucket_name}/{self.destination_path}/{worker}/table_{batch_id}.parquet" - - def full_dest_delete_path(self, worker: str, batch_id: int): - return f"gs://{self.bucket_name}/{self.destination_path}/{worker}/delete_{batch_id}.parquet" - - def full_dest_deduped_path(self, worker: str, batch_id: int): - return f"gs://{self.bucket_name}/{self.destination_path}/{worker}/deduped_{batch_id}.parquet" - - def wildcard_path(self, worker: str): - return ( - f"gs://{self.bucket_name}/{self.destination_path}/{worker}/table_*.parquet" - ) + self.worker = worker + self.config = config + self.gcs = gcs + self.queue = queue + self.bigquery = bigquery + self.task_manager = task_manager + self.job_id = job_id + self.latest_checkpoint = latest_checkpoint + self.pointer_table = pointer_table - def remove_dupes(self, worker: str, batches: List[int]): - for batch_id in batches[:-1]: - self.remove_dupe_for_batch(worker, batch_id) - self.remove_dupe_for_batch(worker, batches[-1], last=True) - - def remove_dupe_for_batch(self, worker: str, batch_id: int, last: bool = False): - self.log.info(f"removing duplicates for batch {batch_id}") - self.conn.sql( - f""" - CREATE OR REPLACE TABLE deduped_{worker}_{batch_id} - AS - SELECT * FROM read_parquet('{self.full_dest_table_path(worker, batch_id)}') - """ - ) + def worker_destination_path(self, filename: str): + return f"gs://{self.config.bucket_name}/{self.config.temp_path}/{self.job_id}/{self.worker}/{filename}" - if not last: - self.conn.sql( - f""" - DELETE FROM deduped_{worker}_{batch_id} - WHERE id in ( - SELECT id FROM read_parquet('{self.full_dest_delete_path(worker, batch_id)}') + @property + def worker_wildcard_path(self): + return self.worker_destination_path("table_*.parquet") + + async def process(self, context: AssetExecutionContext): + count = 0 + item = self.queue.dequeue() + current_checkpoint = item.checkpoint + in_flight = [] + while item is not None: + source = f"gs://{self.config.bucket_name}/{item.blob_name}" + destination = self.worker_destination_path( + f"table_{item.checkpoint}.parquet" ) - """ + in_flight.append( + self.task_manager.submit( + process_goldsky_file, + args=( + GoldskyProcessItem( + source=source, + 
destination=destination, + checkpoint=item.checkpoint, + ), + ), + pure=False, + ) ) + count += 1 + if count >= self.config.pointer_size: + context.log.debug( + f"Worker {self.worker} waiting for {len(in_flight)} blobs to process" + ) + progress = 0 + for coro in asyncio.as_completed(in_flight): + await coro + progress += 1 + context.log.debug( + f"Worker[{self.worker}] progress: {progress}/{count}" + ) + context.log.debug(f"Worker[{self.worker}] done waiting for blobs") - self.conn.sql( - f""" - COPY deduped_{worker}_{batch_id} TO '{self.full_dest_deduped_path(worker, batch_id)}'; - """ - ) - - def load_and_merge( - self, worker: str, batch_id: int, batch_items: List[GoldskyQueueItem] - ): - conn = self.conn - bucket_name = self.bucket_name - - base = f"gs://{bucket_name}" - - size = len(batch_items) + # Update the pointer table to the latest item's checkpoint + await self.update_pointer_table(context, current_checkpoint) - merged_table = f"merged_{worker}_{batch_id}" + in_flight = [] + count = 0 - # Start in reverse order and insert into the table - conn.sql( - f""" - CREATE TEMP TABLE {merged_table} - AS - SELECT {batch_items[0].checkpoint} AS _checkpoint, * - FROM read_parquet('{base}/{batch_items[-1].blob_name}') - """ - ) + current_checkpoint = item.checkpoint + item = self.queue.dequeue() - for batch_item in batch_items: - self.log.info(f"Inserting all items in {base}/{batch_item.blob_name}") - file_ref = f"{base}/{batch_item.blob_name}" - # TO DO CHECK FOR DUPES IN THE SAME CHECKPOINT - conn.sql( - f""" - INSERT INTO {merged_table} - SELECT {batch_item.checkpoint} AS _checkpoint, * - FROM read_parquet('{file_ref}') - """ + if len(in_flight) > 0: + context.log.debug( + f"Finalizing worker {self.worker} waiting for {len(in_flight)} blobs to process. Last checkpoint {current_checkpoint}", ) + progress = 0 + for coro in asyncio.as_completed(in_flight): + await coro + progress += 1 + context.log.debug(f"Worker[{self.worker}] progress: {progress}/{count}") + await self.update_pointer_table(context, current_checkpoint) + return self.worker + + def old_update_pointer_table(self, context: AssetExecutionContext, checkpoint: int): + context.log.debug(f"updating pointer table for {self.worker}") + config = self.config + worker = self.worker + job_id = self.job_id + + with self.bigquery.get_client() as client: + dest_table_ref = client.get_dataset(config.dataset_name).table( + f"{config.table_name}_{worker}" + ) + new = self.latest_checkpoint is None + try: + client.get_table(dest_table_ref) + except NotFound as exc: + if self.latest_checkpoint is not None: + raise exc + new = True + + wildcard_path = self.worker_wildcard_path + + if not new: + context.log.info("Merging into worker table") + client.query_and_wait( + f""" + LOAD DATA OVERWRITE `{config.project_id}.{config.dataset_name}.{config.table_name}_{worker}_{job_id}` + FROM FILES ( + format = "PARQUET", + uris = ["{wildcard_path}"] + ); + """ + ) + tx_query = f""" + BEGIN + BEGIN TRANSACTION; + INSERT INTO `{config.project_id}.{config.dataset_name}.{config.table_name}_{worker}` + SELECT * FROM `{config.project_id}.{config.dataset_name}.{config.table_name}_{worker}_{job_id}`; + + INSERT INTO `{self.pointer_table}` (worker, last_checkpoint) + VALUES ('{worker}', {checkpoint}); + COMMIT TRANSACTION; + EXCEPTION WHEN ERROR THEN + -- Roll back the transaction inside the exception handler. 
+ SELECT @@error.message; + ROLLBACK TRANSACTION; + END; + """ + context.log.debug(f"query: {tx_query}") + client.query_and_wait(tx_query) + client.query_and_wait( + f""" + DROP TABLE `{config.project_id}.{config.dataset_name}.{config.table_name}_{worker}_{job_id}`; + """ + ) + else: + context.log.info("Creating new worker table") + query1 = f""" + LOAD DATA OVERWRITE `{config.project_id}.{config.dataset_name}.{config.table_name}_{worker}` + FROM FILES ( + format = "PARQUET", + uris = ["{wildcard_path}"] + ); + """ + context.log.debug(f"query: {query1}") + client.query_and_wait(query1) + rows = client.query_and_wait( + f""" + INSERT INTO `{self.pointer_table}` (worker, last_checkpoint) + VALUES ('{worker}', {checkpoint}); + """ + ) + context.log.info(rows) - conn.sql( - f""" - COPY {merged_table} TO '{self.full_dest_table_path(worker, batch_id)}'; - """ - ) - - conn.sql( - f""" - DROP TABLE {merged_table}; - """ - ) - self.log.info(f"Completed load and merge {batch_id}") - - -class MPGoldskyDuckDB: - @classmethod - def connect( - cls, - config: GoldskyConfig, - destination_path: str, - memory_limit: str = "1GB", - ): - conn = duckdb.connect() - conn.sql( - f""" - CREATE SECRET ( - TYPE GCS, - KEY_ID '{config.bucket_key_id}', - SECRET '{config.bucket_secret}' - ); - """ - ) - conn.sql(f"SET memory_limit = '{memory_limit}';") - conn.sql(f"SET enable_progress_bar = false;") - return cls(config.bucket_name, destination_path, conn) - - def __init__( - self, - bucket_name: str, - destination_path: str, - conn: duckdb.DuckDBPyConnection, + async def update_pointer_table( + self, context: AssetExecutionContext, checkpoint: int ): - self.destination_path = destination_path - self.bucket_name = bucket_name - self.conn = conn - - def full_dest_table_path(self, worker: str, batch_id: int): - return f"gs://{self.bucket_name}/{self.destination_path}/{worker}/table_{batch_id}.parquet" - - def full_dest_delete_path(self, worker: str, batch_id: int): - return f"gs://{self.bucket_name}/{self.destination_path}/{worker}/delete_{batch_id}.parquet" - - def full_dest_deduped_path(self, worker: str, batch_id: int): - return f"gs://{self.bucket_name}/{self.destination_path}/{worker}/deduped_{batch_id}.parquet" - - def wildcard_path(self, worker: str): - return ( - f"gs://{self.bucket_name}/{self.destination_path}/{worker}/table_*.parquet" + await asyncio.to_thread( + blocking_update_pointer_table, + context, + self.config, + self.bigquery, + self.job_id, + self.worker, + self.pointer_table, + checkpoint, + self.latest_checkpoint, + self.worker_wildcard_path, ) + self.latest_checkpoint = checkpoint - def load_and_add_checkpoint( - self, - worker: str, - batch_id: int, - blob_name: str, - checkpoint: int, - log: MultiProcessLogger, - ): - conn = self.conn - bucket_name = self.bucket_name - base = f"gs://{bucket_name}" - - # Start in reverse order and insert into the table - query = f""" - COPY ( - SELECT {checkpoint} as _checkpoint, * - FROM read_parquet('{base}/{blob_name}') - ) TO '{self.full_dest_table_path(worker, batch_id)}'; - """ - conn.sql(query) - - log.info(f"Completed load {blob_name}") - - -glob_gs_duck: MPGoldskyDuckDB | None = None -dagster_log: MultiProcessLogger | None = None - - -def mp_init(log: MultiProcessLogger, config: GoldskyConfig, destination_path: str): - global glob_gs_duck - global dagster_log - dagster_log = log - glob_gs_duck = MPGoldskyDuckDB.connect(config, destination_path, "2GB") - log.debug("inititalized worker") - - -def mp_run_load(item: MPWorkerItem): - 
glob_gs_duck.load_and_add_checkpoint( - item.worker, - item.checkpoint, - item.blob_name, - item.checkpoint, - dagster_log, - ) - - -async def mp_load_goldsky_worker( - job_id: str, +def blocking_update_pointer_table( context: AssetExecutionContext, config: GoldskyConfig, - gs_context: GoldskyContext, + bigquery: BigQueryResource, + job_id: str, worker: str, - queue: GoldskyQueue, - last_checkpoint_from_previous_run: Optional[int] = None, + pointer_table: str, + new_checkpoint: int, + latest_checkpoint: int | None, + wildcard_path: str, ): - context.log.info(f"starting the worker for {worker}") - item = queue.dequeue() - if not item: - context.log.info(f"nothing to load for worker {worker}") - return - last_checkpoint = item.checkpoint - 1 - destination_path = f"_temp/{job_id}" - log_queue = mp.Queue() - - # Create the pool - with ProcessPoolExecutor( - int(os.environ.get("GOLDSKY_PROCESS_POOL_SIZE", "10")), - initializer=mp_init, - initargs=( - MultiProcessLogger(log_queue), - config, - destination_path, - ), - ) as executor: - futures = [] - context.log.info("Starting the processing pool") - time.sleep(5) - - async def handle_logs(): - context.log.debug("starting multiproc log handler") - while True: - try: - log_item = log_queue.get(block=False) - if type(log_item) != list: - context.log.warn( - "received unexpected object in the multiproc logs" - ) - continue - else: - if log_item[0] == "debug": - context.log.debug(log_item[1]) - if log_item[0] == "info": - context.log.info(log_item[1]) - if log_item[0] == "warn": - context.log.warn(log_item[1]) - - except Empty: - pass - try: - await asyncio.sleep(0.1) - except asyncio.CancelledError: - break - - log_task = asyncio.create_task(handle_logs()) - - while item: - if item.checkpoint > last_checkpoint: - if item.checkpoint - 1 != last_checkpoint: - context.log.info( - "potentially missing or checkpoints number jumped unexpectedly. 
not erroring" - ) - else: - raise Exception( - f"Unexpected out of order checkpoints current: {item.checkpoint} last: {last_checkpoint}" - ) - if item.checkpoint % 10 == 0: - context.log.info(f"Processing {item.blob_name}") - last_checkpoint = item.checkpoint - - item = queue.dequeue() - if not item: - break - future = executor.submit( - mp_run_load, - MPWorkerItem( - worker=worker, - blob_name=item.blob_name, - checkpoint=item.checkpoint, - ), - ) - context.log.debug(f"wrapping future for {item.blob_name}") - futures.append(asyncio.wrap_future(future)) - await asyncio.gather(*futures) - - log_task.cancel() - - # Load all of the tables into bigquery - with gs_context.bigquery.get_client() as client: + with bigquery.get_client() as client: dest_table_ref = client.get_dataset(config.dataset_name).table( f"{config.table_name}_{worker}" ) - new = last_checkpoint_from_previous_run is None + new = latest_checkpoint is None try: client.get_table(dest_table_ref) except NotFound as exc: - if last_checkpoint_from_previous_run is not None: + if latest_checkpoint is not None: raise exc new = True - wildcard_path = ( - f"gs://{config.bucket_name}/{destination_path}/{worker}/table_*.parquet" - ) - if not new: context.log.info("Merging into worker table") client.query_and_wait( @@ -530,8 +334,8 @@ async def handle_logs(): INSERT INTO `{config.project_id}.{config.dataset_name}.{config.table_name}_{worker}` SELECT * FROM `{config.project_id}.{config.dataset_name}.{config.table_name}_{worker}_{job_id}`; - INSERT INTO `{config.project_id}.{config.dataset_name}.{config.table_name}_pointer_state` (worker, last_checkpoint) - VALUES ('{worker}', {last_checkpoint}); + INSERT INTO `{pointer_table}` (worker, last_checkpoint) + VALUES ('{worker}', {new_checkpoint}); COMMIT TRANSACTION; EXCEPTION WHEN ERROR THEN -- Roll back the transaction inside the exception handler. 
@@ -559,146 +363,229 @@ async def handle_logs(): client.query_and_wait(query1) rows = client.query_and_wait( f""" - INSERT INTO `{config.project_id}.{config.dataset_name}.{config.table_name}_pointer_state` (worker, last_checkpoint) - VALUES ('{worker}', {last_checkpoint}); + INSERT INTO `{pointer_table}` (worker, last_checkpoint) + VALUES ('{worker}', {new_checkpoint}); """ ) context.log.info(rows) -async def testing_goldsky( - context: AssetExecutionContext, bigquery: BigQueryResource, gcs: GCSResource -) -> MaterializeResult: - goldsky_re = re.compile( - os.path.join("goldsky", "optimism-traces") - + r"/(?P\d+-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})-(?P\d+)-(?P\d+).parquet" - ) - gs_config = GoldskyConfig( - "opensource-observer", - "oso-dataset-transfer-bucket", - "oso_raw_sources", - "optimism_traces", - "block_number", - int(os.environ.get("GOLDSKY_BATCH_SIZE", "40")), - int(os.environ.get("GOLDSKY_CHECKPOINT_SIZE", "1000")), - os.environ.get("DUCKDB_GCS_KEY_ID"), - os.environ.get("DUCKDB_GCS_SECRET"), - ) - - gcs_client = gcs.get_client() - blobs = gcs_client.list_blobs("oso-dataset-transfer-bucket", prefix="goldsky") - - parsed_files = [] - gs_job_ids = set() - queues = GoldskyQueues(max_size=int(os.environ.get("GOLDSKY_MAX_QUEUE_SIZE", 200))) - - worker_status: Mapping[str, int] = {} - # Get the current state - with bigquery.get_client() as client: - try: - rows = client.query_and_wait( - f""" - SELECT worker, MAX(last_checkpoint) AS last_checkpoint - FROM `{gs_config.project_id}.{gs_config.dataset_name}.{gs_config.table_name}_pointer_state` - GROUP BY 1; - """ - ) - for row in rows: - context.log.debug(row) - worker_status[row.worker] = row.last_checkpoint - except NotFound: - context.log.info("No pointer status found. 
Will create the table later") - - for blob in blobs: - match = goldsky_re.match(blob.name) - if not match: - context.log.debug(f"skipping {blob.name}") - continue - parsed_files.append(match) - worker = match.group("worker") - gs_job_ids.add(match.group("job_id")) - checkpoint = int(match.group("checkpoint")) - if checkpoint <= worker_status.get(worker, -1): - context.log.debug(f"skipping {blob.name} as it was already processed") - continue - queues.enqueue( - worker, - GoldskyQueueItem( - checkpoint, - blob.name, - ), - ) +class GoldskyAsset: + @classmethod + def setup_asset(cls, name: str, config: GoldskyConfig): + @asset(name=name) + def goldsky_asset( + context: AssetExecutionContext, bigquery: BigQueryResource, gcs: GCSResource + ): + loop = asyncio.new_event_loop() + context.log.info(f"Job name?: {context.job_name}") + + last_restart = time.time() + retries = 0 + while True: + try: + gs_asset = cls(gcs, bigquery, config) + context.log.info(gs_asset.cluster_spec) + task_manager = RetryTaskManager.setup( + loop, + config.bucket_key_id, + config.bucket_secret, + gs_asset.cluster_spec, + context.log, + ) + try: + loop.run_until_complete( + gs_asset.materialize(task_manager, context) + ) + return + finally: + task_manager.close() + except Exception as e: + now = time.time() + if now > last_restart + 120: + last_restart = now + retries = 1 + continue + else: + if retries > 3: + raise e + context.log.error("kube cluster probably disconnected retrying") + retries += 1 - if len(gs_job_ids) > 1: - raise Exception("We aren't currently handling multiple job ids") + return goldsky_asset - gs_context = GoldskyContext(bigquery, gcs) + def __init__( + self, gcs: GCSResource, bigquery: BigQueryResource, config: GoldskyConfig + ): + self.config = config + self.gcs = gcs + self.bigquery = bigquery + self._task_manager = None + self._job_id = arrow.now().format("YYYYMMDDHHmm") + + async def materialize( + self, task_manager: RetryTaskManager, context: AssetExecutionContext + ): + context.log.info({"info": "starting goldsky asset", "config": self.config}) + self.ensure_pointer_table(context) - job_id = arrow.now().format("YYYYMMDDHHmm") + worker_status, queues = self.load_queues(context) - pointer_table = f"{gs_config.project_id}.{gs_config.dataset_name}.{gs_config.table_name}_pointer_state" + context.log.debug(f"spec: ${json.dumps(self.cluster_spec)}") - with bigquery.get_client() as client: - dataset = client.get_dataset(gs_config.dataset_name) - table_name = f"{gs_config.table_name}_pointer_state" - pointer_table_ref = dataset.table(table_name) - try: - client.get_table(pointer_table_ref) - except NotFound as exc: - if table_name in exc.message: - context.log.info("Pointer table not found.") - client.query_and_wait( + job_id = self._job_id + + worker_coroutines = [] + for worker_name, queue in queues.worker_queues(): + worker = GoldskyWorker( + worker_name, + job_id, + self.pointer_table, + worker_status.get(worker_name, None), + self.gcs, + self.bigquery, + self.config, + queue, + task_manager, + ) + worker_coroutines.append(worker.process(context)) + for coro in asyncio.as_completed(worker_coroutines): + worker = await coro + context.log.info(f"Worker[{worker}] Completed") + + # Dedupe and partition the current worker table into a deduped and partitioned table + # + + def dedupe(self): + pass + + def get_worker_status(self, context: AssetExecutionContext): + worker_status: Mapping[str, int] = {} + # Get the current state + with self.bigquery.get_client() as client: + try: + rows = 
client.query_and_wait( f""" - CREATE TABLE {pointer_table} (worker STRING, last_checkpoint INT64); + SELECT worker, MAX(last_checkpoint) AS last_checkpoint + FROM `{self.pointer_table}` + GROUP BY 1; """ ) - else: - raise exc + for row in rows: + context.log.debug(row) + worker_status[row.worker] = row.last_checkpoint + except NotFound: + context.log.info("No pointer status found. Will create the table later") + return worker_status - # gs_duckdb = GoldskyDuckDB.connect( - # f"_temp/{job_id}", - # gs_config.bucket_name, - # gs_config.bucket_key_id, - # gs_config.bucket_secret, - # os.environ.get("DAGSTER_DUCKDB_PATH"), - # context.log, - # os.environ.get("DUCKDB_MEMORY_LIMIT", "16GB"), - # ) - - # For each worker - for worker, queue in queues.worker_queues(): - context.log.info(f"Creating coroutines for worker {worker}") - await mp_load_goldsky_worker( - job_id, - context, - gs_config, - gs_context, - worker, - queue, - last_checkpoint_from_previous_run=worker_status.get(worker, None), + @property + def pointer_table(self): + return f"{self.config.project_id}.{self.config.dataset_name}.{self.config.table_name}_pointer_state" + + def ensure_pointer_table(self, context: AssetExecutionContext): + config = self.config + pointer_table = f"{config.project_id}.{config.dataset_name}.{config.table_name}_pointer_state" + + with self.bigquery.get_client() as client: + dataset = client.get_dataset(config.dataset_name) + table_name = f"{config.table_name}_pointer_state" + pointer_table_ref = dataset.table(table_name) + try: + client.get_table(pointer_table_ref) + except NotFound as exc: + if table_name in exc.message: + context.log.info("Pointer table not found.") + client.query_and_wait( + f""" + CREATE TABLE {pointer_table} (worker STRING, last_checkpoint INT64); + """ + ) + else: + raise exc + + @property + def cluster_spec(self): + spec = make_cluster_spec( + name=f"{self.config.asset_name.replace('_', '-')}-{self._job_id}", + resources={ + "requests": {"memory": self.config.scheduler_memory}, + "limits": {"memory": self.config.scheduler_memory}, + }, + image="ghcr.io/opensource-observer/dagster-dask:distributed-test-9", + ) + spec["spec"]["worker"]["spec"]["tolerations"] = [ + { + "key": "pool_type", + "effect": "NoSchedule", + "operator": "Equal", + "value": "spot", + } + ] + spec["spec"]["worker"]["spec"]["nodeSelector"] = {"pool_type": "spot"} + + # Give the workers a different resource allocation + for container in spec["spec"]["worker"]["spec"]["containers"]: + container["resources"] = { + "limits": { + "memory": self.config.worker_memory, + }, + "requests": { + "memory": self.config.worker_memory, + }, + } + if container["name"] == "worker": + args: List[str] = container["args"] + args.append("--nthreads") + args.append("1") + args.append("--nworkers") + args.append("1") + args.append("--memory-limit") + args.append("0") + return spec + + @property + def goldsky_re(self): + return re.compile( + os.path.join(self.config.goldsky_dir_path, self.config.asset_name) + + r"/(?P\d+-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})-(?P\d+)-(?P\d+).parquet" ) - # await asyncio.gather(*worker_coroutines) + def load_queues(self, context: AssetExecutionContext) -> Tuple[dict, GoldskyQueues]: + gcs_client = self.gcs.get_client() - # Create a temporary table to load the current checkpoint + blobs = gcs_client.list_blobs("oso-dataset-transfer-bucket", prefix="goldsky") - # Check for duplicates between the current checkpoint and the existing table - # of data + parsed_files = [] + gs_job_ids = 
set() + queues = GoldskyQueues( + max_size=int(os.environ.get("GOLDSKY_MAX_QUEUE_SIZE", 10000)) + ) - # In a transaction - # -- Merge the dataset into the main table - # -- Update a source pointer for the current checkpoint (so we don't reprocess) - # -- Delete the temporary table + worker_status = self.get_worker_status(context) + + for blob in blobs: + match = self.goldsky_re.match(blob.name) + if not match: + # context.log.debug(f"skipping {blob.name}") + continue + parsed_files.append(match) + worker = match.group("worker") + gs_job_ids.add(match.group("job_id")) + checkpoint = int(match.group("checkpoint")) + if checkpoint <= worker_status.get(worker, -1): + # context.log.debug(f"skipping {blob.name} as it was already processed") + continue + queues.enqueue( + worker, + GoldskyQueueItem( + checkpoint, + blob.name, + ), + ) - # TODOS - # Move the data into cold storage + for worker, queue in queues.worker_queues(): + context.log.debug(f"Worker[{worker}] queue size: {queue.len()}") - return MaterializeResult( - metadata=dict( - job_id_count=len(gs_job_ids), - job_ids=list(gs_job_ids), - worker_count=len(queues.workers()), - workers=list(queues.workers()), - status=queues.status(), - ) - ) + if len(gs_job_ids) > 1: + raise Exception("We aren't currently handling multiple job ids") + return (worker_status, queues) diff --git a/warehouse/oso_dagster/goldsky_dask.py b/warehouse/oso_dagster/goldsky_dask.py new file mode 100644 index 000000000..50315b652 --- /dev/null +++ b/warehouse/oso_dagster/goldsky_dask.py @@ -0,0 +1,195 @@ +import time +import asyncio +import duckdb +import copy +from dataclasses import dataclass, field +from typing import Any +from dask.distributed import ( + Client, + Worker, + WorkerPlugin, + get_worker, + Future as DaskFuture, +) +from dask_kubernetes.operator import KubeCluster, make_cluster_spec +from dagster import DagsterLogManager +from concurrent.futures import CancelledError + + +class RetryTaskManager: + client: Client | None + loop: asyncio.AbstractEventLoop + cluster: KubeCluster | None + log: DagsterLogManager + bucket_key_id: str + bucket_secret: str + cluster_spec: dict + + @classmethod + def setup( + cls, + loop: asyncio.AbstractEventLoop, + bucket_key_id: str, + bucket_secret: str, + cluster_spec: dict, + log: DagsterLogManager, + ): + task_manager = cls( + loop, + bucket_key_id, + bucket_secret, + cluster_spec, + log, + ) + task_manager.reconnect() + return task_manager + + def __init__( + self, + loop: asyncio.AbstractEventLoop, + bucket_key_id: str, + bucket_secret: str, + cluster_spec: dict, + log: DagsterLogManager, + ): + self.log = log + self.loop = loop + self.bucket_key_id = bucket_key_id + self.bucket_secret = bucket_secret + self.cluster_spec = cluster_spec + self.client = None + self._reconnection_time = 0 + + def reconnect(self): + self.log.debug("reconnecting") + if self.client: + self.client.close() + self.cluster.close() + + cluster_spec = copy.deepcopy(self.cluster_spec) + cluster_spec["metadata"]["name"] = ( + cluster_spec["metadata"]["name"] + f"-{time.time()}" + ) + + self.cluster = KubeCluster(custom_cluster_spec=self.cluster_spec) + self.cluster.adapt(minimum=8, maximum=50) + self.client = Client(self.cluster) + self.client.register_plugin( + DuckDBGCSPlugin( + self.bucket_key_id, + self.bucket_secret, + ), + name="duckdb-gcs", + ) + self.reconnection_time = time.time() + self._reconnection_queued = False + self.log.debug("reconnection completed") + + def wrap_future(self, f: DaskFuture): + loop = self.loop + aio_future = 
loop.create_future() + + def on_done(*_): + try: + result = f.result() + except Exception as e: + loop.call_soon_threadsafe(aio_future.set_exception, e) + else: + loop.call_soon_threadsafe(aio_future.set_result, result) + + f.add_done_callback(on_done) + return aio_future + + async def submit( + self, + func, + args: Any | None = None, + kwargs: Any | None = None, + pure: bool = True, + retries: int = 3, + retry_delay: float = 0.5, + ): + kwargs = kwargs or {} + # return self.wrap_future(self.client.submit(func, *args, **kwargs, pure=pure)) + retry_count = 0 + last_err = None + while True: + try: + return await self.wrap_future( + self.client.submit(func, *args, **kwargs, pure=pure) + ) + except CancelledError as e: + # If this is cancelled treat that as an immediate bail out + raise e + except asyncio.CancelledError as e: + # If this is cancelled treat that as an immediate bail out + raise e + except Exception as e: + self.log.error(f"Caught an error on a worker. Retrying {e} {type(e)}") + last_err = e + if retry_count >= retries: + if last_err: + raise last_err + retry_count += 1 + await asyncio.sleep(retry_delay) + retry_delay += retry_delay + + def close(self): + if self.client: + self.client.close() + if self.cluster: + self.cluster.close() + + +async def setup_kube_cluster_client( + bucket_key_id: str, bucket_secret: str, cluster_spec: dict +): + cluster = KubeCluster(custom_cluster_spec=cluster_spec) + cluster.adapt(minimum=8, maximum=50) + client = Client(cluster) + client.register_plugin( + DuckDBGCSPlugin( + bucket_key_id, + bucket_secret, + ), + name="duckdb-gcs", + ) + return client + + +class DuckDBGCSPlugin(WorkerPlugin): + def __init__(self, key_id: str, secret: str, memory_limit: str = "2GB"): + self.key_id = key_id + self.secret = secret + self.memory_limit = memory_limit + self._conn = None + + def setup(self, worker: Worker): + conn = duckdb.connect() + worker.log_event( + "info", {"message": "Initializing worker", "key_id": self.key_id} + ) + conn.sql( + f""" + CREATE SECRET ( + TYPE GCS, + KEY_ID '{self.key_id}', + SECRET '{self.secret}' + ); + """ + ) + conn.sql(f"SET memory_limit = '{self.memory_limit}';") + conn.sql(f"SET enable_progress_bar = false;") + worker.log_event( + "info", {"message": "duckdb ready", "memory_limit": self.memory_limit} + ) + self._conn = conn + + def teardown(self, worker: Worker): + self.conn.close() + + @property + def conn(self) -> duckdb.DuckDBPyConnection: + if not self._conn: + raise Exception("Duckdb connection not established") + return self._conn diff --git a/warehouse/oso_dagster/models/dedupe.sql b/warehouse/oso_dagster/models/dedupe.sql new file mode 100644 index 000000000..4a8cfd95c --- /dev/null +++ b/warehouse/oso_dagster/models/dedupe.sql @@ -0,0 +1,3 @@ +SELECT {{ source.sql_columns(exclude=["_checkpoint"]) }} +FROM {{ source.sql_from() }} +QUALIFY ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `_checkpoint` DESC) = 1 \ No newline at end of file
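The asset discovers pending work by listing the bucket under goldsky_dir_path and parsing each blob name into a job id, a worker, and a checkpoint, which load_queues then turns into per-worker queues. Below is a minimal standalone sketch of that parsing step; the prefix "goldsky/optimism_traces" and the helper parse_blob_name are illustrative only, and the named groups are assumed to be job_id, worker, and checkpoint so that they line up with the match.group(...) calls in load_queues.

import re

# Illustrative prefix; the real one is built from goldsky_dir_path and asset_name.
GOLDSKY_BLOB_RE = re.compile(
    r"goldsky/optimism_traces"
    r"/(?P<job_id>\d+-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
    r"-(?P<worker>\d+)-(?P<checkpoint>\d+)\.parquet"
)


def parse_blob_name(blob_name: str) -> tuple[str, str, int] | None:
    """Hypothetical helper: split a Goldsky export blob name into its parts."""
    match = GOLDSKY_BLOB_RE.match(blob_name)
    if not match:
        return None  # not an export for this asset
    return (
        match.group("job_id"),
        match.group("worker"),
        int(match.group("checkpoint")),
    )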
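Each queued blob is handled by process_goldsky_file on a Dask worker, which reads the source parquet through DuckDB's GCS support and rewrites it with a _checkpoint column stamped onto every row. A condensed, one-shot sketch of that step is below; copy_with_checkpoint is an illustrative helper, and the connection setup mirrors what DuckDBGCSPlugin.setup does once per worker.

import duckdb


def copy_with_checkpoint(
    source: str, destination: str, checkpoint: int, key_id: str, secret: str
) -> None:
    """Illustrative one-shot version of the per-blob copy step."""
    conn = duckdb.connect()
    # Same GCS credential setup the worker plugin performs on startup.
    conn.sql(
        f"""
        CREATE SECRET (
            TYPE GCS,
            KEY_ID '{key_id}',
            SECRET '{secret}'
        );
        """
    )
    # Stamp the checkpoint onto every row so the dedupe model can later keep
    # only the newest copy of each id.
    conn.sql(
        f"""
        COPY (
            SELECT {checkpoint} AS _checkpoint, *
            FROM read_parquet('{source}')
        ) TO '{destination}';
        """
    )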
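RetryTaskManager.wrap_future is what lets the asset await Dask work from the asyncio loop driving the Dagster asset body: the Dask done-callback fires on a communication thread, so the result or exception has to be handed back to the loop with call_soon_threadsafe. A self-contained sketch of the same bridge, with wrap_dask_future as an illustrative name:

import asyncio
from dask.distributed import Future as DaskFuture


def wrap_dask_future(loop: asyncio.AbstractEventLoop, f: DaskFuture) -> asyncio.Future:
    """Illustrative equivalent of RetryTaskManager.wrap_future."""
    aio_future = loop.create_future()

    def on_done(*_):
        # Runs on a Dask thread, so marshal the outcome back onto the loop.
        try:
            result = f.result()
        except Exception as exc:
            loop.call_soon_threadsafe(aio_future.set_exception, exc)
        else:
            loop.call_soon_threadsafe(aio_future.set_result, result)

    f.add_done_callback(on_done)
    return aio_future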
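RetryTaskManager.submit resubmits failed tasks with a doubling delay and treats cancellation as an immediate bail-out. The retry shape, separated from the Dask specifics, looks roughly like this; submit_with_retries and run_once are illustrative names, not part of the patch.

import asyncio
from typing import Awaitable, Callable


async def submit_with_retries(
    run_once: Callable[[], Awaitable], retries: int = 3, retry_delay: float = 0.5
):
    """Retry an awaitable-producing callable, doubling the delay each attempt."""
    attempt = 0
    while True:
        try:
            return await run_once()
        except asyncio.CancelledError:
            # Cancellation means the caller is shutting down; don't retry.
            raise
        except Exception:
            if attempt >= retries:
                raise
            attempt += 1
            await asyncio.sleep(retry_delay)
            retry_delay += retry_delay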
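blocking_update_pointer_table first LOAD DATA OVERWRITEs the staged parquet into a per-job table and then, inside one BigQuery transaction, appends it to the worker table and records the new checkpoint in the pointer table, so the data and the pointer advance together or not at all. A sketch of how that transaction might be rendered is below; render_merge_tx and every identifier it builds are placeholders.

def render_merge_tx(
    project: str, dataset: str, table: str, worker: str,
    job_id: str, pointer_table: str, checkpoint: int,
) -> str:
    """Illustrative rendering of the append-and-advance-pointer transaction."""
    staging = f"{project}.{dataset}.{table}_{worker}_{job_id}"
    target = f"{project}.{dataset}.{table}_{worker}"
    return f"""
    BEGIN
      BEGIN TRANSACTION;
        INSERT INTO `{target}`
        SELECT * FROM `{staging}`;

        INSERT INTO `{pointer_table}` (worker, last_checkpoint)
        VALUES ('{worker}', {checkpoint});
      COMMIT TRANSACTION;
    EXCEPTION WHEN ERROR THEN
      -- Roll back inside the exception handler, as in the patch.
      SELECT @@error.message;
      ROLLBACK TRANSACTION;
    END;
    """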
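Finally, models/dedupe.sql collapses the checkpoint-stamped rows back down to one row per id by keeping only the highest _checkpoint in each partition. The same QUALIFY pattern can be exercised locally in DuckDB; the toy table below is purely illustrative.

import duckdb

conn = duckdb.connect()
# id=1 arrives in two checkpoints; only the newer payload should survive.
conn.sql(
    """
    CREATE TABLE raw AS
    SELECT * FROM (VALUES
        (1, 'old', 100),
        (1, 'new', 101),
        (2, 'only', 100)
    ) AS t(id, payload, _checkpoint);
    """
)
deduped = conn.sql(
    """
    SELECT id, payload
    FROM raw
    QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY _checkpoint DESC) = 1
    ORDER BY id
    """
).fetchall()
print(deduped)  # [(1, 'new'), (2, 'only')]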