[bug] Fix issue where metadata got discarded in SourceAsset conversion #22862
Merged
Conversation
sryza approved these changes (Jul 8, 2024)
danielgafni added a commit to danielgafni/dagster that referenced this pull request (Jul 9, 2024):
remove vestigial T_Handle (dagster-io#22772)

This `TypeVar` is unused in `graph_definition.py` and has no effect in `op_definition.py`.

[bug] Fix issue where metadata got discarded in SourceAsset conversion (dagster-io#22862)

Resolves: dagster-io#22789

Fixes an issue which I believe was introduced by dagster-io#22165 (didn't dig that hard). The core problem is this conversion: https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/dagster/dagster/_core/definitions/external_asset.py?L176

`observable_source_asset` does not set metadata on the `OutputDefinition` of its internal op. So when we initially convert a `SourceAsset` into an ol' fashioned `AssetsDefinition`, then call `to_source_assets()` on that, we end up losing the metadata field, as we were trying to pull it off of the `OutputDefinition`. This PR just pulls it off of the spec instead. Added test fails before, passes now.

[record] pickle-ability (dagster-io#22861)

Support pickling for records, since we need it in some places. Added test.

[RFC] Dagster Celery Runner (dagster-io#22601)

Hello, I would like to propose a new feature for the Dagster Celery library by introducing a Dagster Celery Runner. The PR also includes the changes described in this RFC. We have been running this code in production for a while already and are confident that it is polished enough to be considered for inclusion in Dagster.

Our workload consists of:
* Multiple assets, each with on average 300 partitions.
* An hourly schedule to materialize these assets (for each partition).
* Materialization jobs that each take between 10 and 50 seconds.

Using the Dagster K8s run launcher along with the single-process executor, we noticed that:
* The workload puts too much load on our K8s cluster.
* Startup times become significant due to the small size of our jobs.

Without overcomplicating the solution, this setup did not seem like the way to go.

![K8s Runner drawio](https://github.com/dagster-io/dagster/assets/4254771/8a6805b6-accd-45e3-b8dc-37cec54e8561)

Our goal was to create a setup that:
* Supports our workload.
* Minimizes the run-launching overhead on our infrastructure.
* Maintains low job startup latency.
* Provides an at-least-once execution strategy (as runners like K8s do).
* Can scale horizontally.
* Offers run-level isolation.
* Minimizes downtime during deployment.

Non-goals:
* Op-level isolation.

The original [dagster-celery](https://docs.dagster.io/deployment/guides/celery) package provides an alternative method of launching runs by dividing op execution across different workers. This approach still relies on a runner to start the run, within which the executor passes the tasks on to individual Celery workers. Using it together with the K8s executor would not solve our problem, and combining it with the Default Runner (which starts runs on the code server) would introduce new scalability and deployment issues (see diagram).

![Celery Executor drawio](https://github.com/dagster-io/dagster/assets/4254771/01e5e73e-c2af-4b33-a401-435bbb07061a)

While measuring the performance of the Dagster Celery executor, we concluded that it did have the desired run startup latency and potential scalability. Our proposed approach introduces a Dagster Celery runner which mimics the way the K8s runner works, but starts the runs directly on Dagster Celery workers (see diagram).

![Celery Runner drawio](https://github.com/dagster-io/dagster/assets/4254771/3005bbd4-097b-4761-addd-28fb08bd182e)

The main difference in this approach is that instead of launching a process containing an executor to monitor the steps, the task is submitted directly to the Celery broker, which in turn finds a worker to start the execution. Once a task has been submitted to Celery, the daemon monitors the process using standard job-monitoring requests which run at a configured interval. From the Celery backend, the monitoring can read whether a job is "in progress", "done", or in an alternative state.

Our production setup runs workers on K8s and uses stateless Redis as both the Celery broker and backend. There are three edge cases when using Celery with a stateless broker and backend:
1. The Redis broker crashes midway before starting the job, causing the queue to drop jobs before they are picked up.
2. The Redis backend crashes before the success result is communicated.
3. The worker is forcibly terminated, thus never communicating the result to the Celery backend.

We mitigate case (1) by setting a sensible `start_timeout_seconds` for run monitoring: if a job is not started in time, it is marked as failed and caught by the retry policy. Case (2) is handled out of the box, as the worker still communicates the job status to Dagster run storage; once a job is marked as completed in Dagster, the monitoring stops. To handle case (3), we enforce a mandatory `dagster/max_runtime` tag on all jobs, as it is otherwise impossible to distinguish a very slow job from a terminated one.

Because the workers facilitate job execution, scaling is as easy as adding more workers. Job monitoring is cheap because it only requires a call to the Redis backend from the monitoring daemon. To make the solution more flexible, we add a run-level tag specifying the Celery queue to start the job on, allowing different workers to process different runs (akin to the current op-level queue tag).

Celery supports warm shutdown, meaning a worker will stop accepting new tasks once it receives a SIGTERM signal but will finish existing ones. We combine this feature with the K8s termination grace period and the default rolling update to spin up new workers that pick up the new tasks. Using Celery instead of K8s loses the resource isolation that jobs would otherwise have; we use the default prefork pool along with the `max-memory-per-child` and `max-tasks-per-child` settings to introduce soft limits on worker processes.

* Created new tests based on the default runner tests.
* Created an example docker-compose.

Signed-off-by: Egor Dmitriev <[email protected]>

[asset differ] add "asset removed" change reason (dagster-io#22870)

Allows the asset graph differ to mark an asset as removed in the case that an asset is present in the base branch but not the comparison branch. In the branch deployment case this is currently not used, but it is useful for making the asset differ applicable in other use cases (e.g. tracking change history on a prod branch). New unit test.
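The three stateless-broker edge cases and their mitigations can be sketched as a single classification step in the monitoring daemon. This is a hypothetical, self-contained model, not the dagster-celery API: the function name, the string statuses, and the way timestamps are passed in are all illustrative assumptions.

```python
# Hypothetical sketch of the edge-case handling described above. The
# monitor sees the Celery backend status, whether a worker ever picked
# the run up, and the run's timeouts (`start_timeout_seconds` for run
# monitoring, the mandatory `dagster/max_runtime` tag).

def classify_run(backend_status: str, started: bool,
                 submitted_at: float, now: float,
                 start_timeout_seconds: float,
                 max_runtime_seconds: float) -> str:
    """Return the action the monitoring daemon should take for one run."""
    # Case 1: the broker dropped the task before any worker picked it up.
    # Fail after the start timeout so the retry policy can take over.
    if not started and now - submitted_at > start_timeout_seconds:
        return "fail_start_timeout"
    # Case 2: the backend may have lost the result, but the worker already
    # reported completion to Dagster run storage, so monitoring stops.
    if backend_status == "done":
        return "stop_monitoring"
    # Case 3: a forcibly terminated worker never reports back; only the
    # max-runtime bound distinguishes it from a very slow run.
    if started and now - submitted_at > max_runtime_seconds:
        return "fail_max_runtime"
    return "keep_polling"
```

The key design point mirrored here is that cases (1) and (3) are detected purely from timeouts, because a stateless broker/backend gives the daemon no negative signal, only the absence of a positive one.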
Summary & Motivation
Resolves: #22789
Fixes an issue which I believe was introduced by #22165 (didn't dig that hard).
The core problem is this conversion: https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/dagster/dagster/_core/definitions/external_asset.py?L176
`observable_source_asset` does not set metadata on the `OutputDefinition` of its internal op. So when we initially convert a `SourceAsset` into an ol' fashioned `AssetsDefinition`, then call `to_source_assets()` on that, we end up losing the metadata field, as we were trying to pull it off of the `OutputDefinition`. This PR just pulls it off of the spec instead.
How I Tested These Changes
Added test fails before, passes now.
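The shape of the bug can be illustrated with stand-in classes. These dataclasses are hypothetical simplifications of Dagster's `AssetSpec` and `OutputDefinition`, and the two helper functions are illustrative names, not the actual conversion code in `external_asset.py`; they only show where the metadata is read before and after the fix.

```python
from dataclasses import dataclass, field

# Hypothetical minimal model of the round trip described above. In the
# real code, observable_source_asset populates the spec's metadata but
# leaves the internal op's OutputDefinition metadata empty.

@dataclass
class AssetSpec:
    key: str
    metadata: dict = field(default_factory=dict)

@dataclass
class OutputDefinition:
    key: str
    metadata: dict = field(default_factory=dict)  # never populated here

def source_asset_metadata_before_fix(spec: AssetSpec,
                                     output_def: OutputDefinition) -> dict:
    # Before: metadata pulled off the OutputDefinition, which the
    # conversion never filled in, so it silently came back empty.
    return output_def.metadata

def source_asset_metadata_after_fix(spec: AssetSpec,
                                    output_def: OutputDefinition) -> dict:
    # After: metadata pulled off the spec, where it actually survived
    # the SourceAsset -> AssetsDefinition conversion.
    return spec.metadata
```

Under this toy model, converting a spec carrying metadata and reading it back reproduces the before/after behavior the PR describes: the old path returns an empty dict, the new path returns the original metadata.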