Skip to content

Commit

Permalink
Add custom info field to ExternalResourceInfo (#6115)
Browse files Browse the repository at this point in the history
* Send actor worker assignment in task events (#394)(#393)

This change adds a new `FastTaskAssignment` message type to fast task protos, which is used to send actor assignment info to admin via task events.

While here I also hardened linting on our protos and updates existing protos to conform. This matches the restrictions we have in place in unionai/cloud/idl

- [x] Add and run unittests
- [x] Run locally and verify worker assignment info comes through in task events

```
❯ curl -s http://localhost:8088/api/v1/task_executions/flytesnacks/development/f0dcb5e7cae6d47acaf3/n0\?limit\=100 | jq '.taskExecutions[0].closure.metadata.externalResources'
[
  {
    "externalId": "",
    "index": 0,
    "retryAttempt": 0,
    "phase": "UNDEFINED",
    "cacheStatus": "CACHE_DISABLED",
    "logs": [],
    "customInfo": {
      "assignedWorker": "example-2b377271ee80",
      "environmentId": "flytesnacks_development_example_c183cb6c9868d96"
    }
  }
]

❯ curl -s http://localhost:8088/api/v1/task_executions/flytesnacks/development/f0dcb5e7cae6d47acaf3/n1\?limit\=100 | jq '.taskExecutions[0].closure.metadata.externalResources'
[
  {
    "externalId": "",
    "index": 0,
    "retryAttempt": 0,
    "phase": "UNDEFINED",
    "cacheStatus": "CACHE_DISABLED",
    "logs": [],
    "customInfo": {
      "assignedWorker": "example-2b377271ee80",
      "environmentId": "flytesnacks_development_example_c183cb6c9868d96"
    }
  }
]
```

One merged, bring to cloud. Will be a no-op initially, but we can use this in a few dependent tasks to surface logs or look up other fast task worker info.

Should this change be upstreamed to OSS (flyteorg/flyte)? If not, please uncheck this box, which is used for auditing. Note, it is the responsibility of each developer to actually upstream their changes. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [ ] To be upstreamed to OSS

ref COR-1581

Add custom info field to ExternalResourceInfo
Adds a custom info field to `ExternalResourceInfo`. This is an extension of the [ExternalId](https://github.com/unionai/flyte/blob/2b6dfcaedab7ed6a7606d1434211086d064c6560/flyteidl/protos/flyteidl/event/event.proto#L269) field and can be used to pass plugin specific context or identifiers. We will use this to pass fast task worker assignment.

N/A

Just merge, not used yet

Should this change be upstreamed to OSS (flyteorg/flyte)? If not, please uncheck this box, which is used for auditing. Note, it is the responsibility of each developer to actually upstream their changes. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [x] To be upstreamed to OSS

ref COR-1581

Closes #393

Signed-off-by: Daniel Rammer <[email protected]>

* removed unused time import

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
Co-authored-by: Andrew Dye <[email protected]>
  • Loading branch information
hamersaw and andrewwdye authored Dec 18, 2024
1 parent 87ef4a0 commit 3c03cff
Show file tree
Hide file tree
Showing 13 changed files with 275 additions and 63 deletions.
6 changes: 6 additions & 0 deletions flyteadmin/pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
_struct "github.com/golang/protobuf/ptypes/struct"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/structpb"

"github.com/flyteorg/flyte/flyteadmin/pkg/common"
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
Expand Down Expand Up @@ -310,6 +311,11 @@ func mergeExternalResource(existing, latest *event.ExternalResourceInfo) *event.
}
existing.Logs = mergeLogs(existing.GetLogs(), latest.GetLogs())

// Overwrite custom info if provided
if latest.GetCustomInfo() != nil {
existing.CustomInfo = proto.Clone(latest.GetCustomInfo()).(*structpb.Struct)
}

return existing
}

Expand Down
10 changes: 10 additions & 0 deletions flyteidl/clients/go/assets/admin.swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions flyteidl/gen/pb-es/flyteidl/event/event_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

163 changes: 110 additions & 53 deletions flyteidl/gen/pb-go/flyteidl/event/event.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions flyteidl/gen/pb-js/flyteidl.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3c03cff

Please sign in to comment.