Skip to content

Commit

Permalink
Allow filtering event logs by attributes (apache#34417)
Browse files Browse the repository at this point in the history
Co-authored-by: Hussein Awala <[email protected]>
  • Loading branch information
hainenber and hussein-awala authored Sep 27, 2023
1 parent db89a33 commit 3189ebe
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 0 deletions.
22 changes: 22 additions & 0 deletions airflow/api_connexion/endpoints/event_log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
)
from airflow.models import Log
from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:

from sqlalchemy.orm import Session

from airflow.api_connexion.types import APIResponse
Expand All @@ -53,6 +55,12 @@ def get_event_log(*, event_log_id: int, session: Session = NEW_SESSION) -> APIRe
@provide_session
def get_event_logs(
*,
dag_id: str | None = None,
task_id: str | None = None,
owner: str | None = None,
event: str | None = None,
before: str | None = None,
after: str | None = None,
limit: int,
offset: int | None = None,
order_by: str = "event_log_id",
Expand All @@ -72,6 +80,20 @@ def get_event_logs(
]
total_entries = session.scalars(func.count(Log.id)).one()
query = select(Log)

if dag_id:
query = query.where(Log.dag_id == dag_id)
if task_id:
query = query.where(Log.task_id == task_id)
if owner:
query = query.where(Log.owner == owner)
if event:
query = query.where(Log.event == event)
if before:
query = query.where(Log.dttm < timezone.parse(before))
if after:
query = query.where(Log.dttm > timezone.parse(after))

query = apply_sorting(query, order_by, to_replace, allowed_filter_attrs)
event_logs = session.scalars(query.offset(offset).limit(limit)).all()
return event_log_collection_schema.dump(
Expand Down
56 changes: 56 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,12 @@ paths:
- $ref: '#/components/parameters/PageLimit'
- $ref: '#/components/parameters/PageOffset'
- $ref: '#/components/parameters/OrderBy'
- $ref: '#/components/parameters/FilterDAGID'
- $ref: '#/components/parameters/FilterTaskID'
- $ref: '#/components/parameters/Event'
- $ref: '#/components/parameters/Owner'
- $ref: '#/components/parameters/Before'
- $ref: '#/components/parameters/After'
responses:
'200':
description: Success.
Expand Down Expand Up @@ -4810,6 +4816,40 @@ components:
required: true
description: The task ID.

# Optional query parameter carrying an event name. The event-log listing handler
# changed in the same commit compares it for exact equality with the stored
# event value (no prefix or wildcard matching).
Event:
in: query
name: event
schema:
type: string
required: false
description: The name of event log.

# Optional query parameter carrying an owner name. The event-log listing handler
# changed in the same commit compares it for exact equality with the stored
# owner value.
Owner:
in: query
name: owner
schema:
type: string
required: false
description: The owner's name of event log.

# Optional upper time bound, passed as a date-time string. The event-log listing
# handler parses it and applies a strict "less than" comparison, so an entry
# stamped exactly at this instant is NOT returned.
Before:
in: query
name: before
schema:
type: string
format: date-time
required: false
description: Timestamp to select event logs occurring before.

# Optional lower time bound, passed as a date-time string. The event-log listing
# handler parses it and applies a strict "greater than" comparison, so an entry
# stamped exactly at this instant is NOT returned.
After:
in: query
name: after
schema:
type: string
format: date-time
required: false
description: Timestamp to select event logs occurring after.

MapIndex:
in: path
name: map_index
Expand Down Expand Up @@ -5147,6 +5187,22 @@ components:
required: false
description: Only filter the XCom records which have the provided key.

# Optional query-string filter named `dag_id`. In the event-log listing handler
# it is matched by exact equality against the entry's DAG ID.
FilterDAGID:
in: query
name: dag_id
schema:
type: string
required: false
description: Returns objects matched by the DAG ID.

# Optional query-string filter named `task_id`. In the event-log listing handler
# it is matched by exact equality against the entry's task ID.
FilterTaskID:
in: query
name: task_id
schema:
type: string
required: false
description: Returns objects matched by the Task ID.

# Other parameters
FileToken:
in: path
Expand Down
24 changes: 24 additions & 0 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2287,6 +2287,14 @@ export interface components {
DAGID: string;
/** @description The task ID. */
TaskID: string;
/** @description The name of event log. */
Event: string;
/** @description The owner's name of event log. */
Owner: string;
/** @description Timestamp to select event logs occurring before. */
Before: string;
/** @description Timestamp to select event logs occurring after. */
After: string;
/** @description The map index. */
MapIndex: number;
/** @description The DAG run ID. */
Expand Down Expand Up @@ -2424,6 +2432,10 @@ export interface components {
Paused: boolean;
/** @description Only filter the XCom records which have the provided key. */
FilterXcomKey: string;
/** @description Returns objects matched by the DAG ID. */
FilterDAGID: string;
/** @description Returns objects matched by the Task ID. */
FilterTaskID: string;
/**
* @description The key containing the encrypted path to the file. Encryption and decryption take place only on
* the server. This prevents the client from reading an non-DAG file. This also ensures API
Expand Down Expand Up @@ -3187,6 +3199,18 @@ export interface operations {
* *New in version 2.1.0*
*/
order_by?: components["parameters"]["OrderBy"];
/** Returns objects matched by the DAG ID. */
dag_id?: components["parameters"]["FilterDAGID"];
/** Returns objects matched by the Task ID. */
task_id?: components["parameters"]["FilterTaskID"];
/** The name of event log. */
event?: components["parameters"]["Event"];
/** The owner's name of event log. */
owner?: components["parameters"]["Owner"];
/** Timestamp to select event logs occurring before. */
before?: components["parameters"]["Before"];
/** Timestamp to select event logs occurring after. */
after?: components["parameters"]["After"];
};
};
responses: {
Expand Down
41 changes: 41 additions & 0 deletions tests/api_connexion/endpoints/test_event_log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,47 @@ def test_should_raises_401_unauthenticated(self, log_model):

assert_401(response)

def test_should_filter_eventlogs_by_allowed_attributes(self, create_log_model, session):
    """Check that each equality filter on the event-log listing narrows the result.

    Two log rows that differ in every filterable column are stored. The listing
    is then queried once per filter (``dag_id``, ``task_id``, ``owner``,
    ``event``) using the first row's value, and only that value may show up in
    the returned entries.
    """
    first_log = create_log_model(
        event="TEST_EVENT_1",
        dag_id="TEST_DAG_ID_1",
        task_id="TEST_TASK_ID_1",
        owner="TEST_OWNER_1",
        when=self.default_time,
    )
    second_log = create_log_model(
        event="TEST_EVENT_2",
        dag_id="TEST_DAG_ID_2",
        task_id="TEST_TASK_ID_2",
        owner="TEST_OWNER_2",
        when=self.default_time_2,
    )
    session.add_all([first_log, second_log])
    session.commit()

    for filter_name in ("dag_id", "task_id", "owner", "event"):
        # Upper-casing the templated name reproduces the first row's value,
        # e.g. "dag_id" -> "TEST_DAG_ID_1".
        expected_value = f"TEST_{filter_name}_1".upper()
        response = self.client.get(
            f"/api/v1/eventLogs?{filter_name}={expected_value}", environ_overrides={"REMOTE_USER": "test"}
        )
        assert response.status_code == 200
        # Collapsing to a set proves no entry with a different value leaked through.
        returned_values = {entry[filter_name] for entry in response.json["event_logs"]}
        assert returned_values == {expected_value}

def test_should_filter_eventlogs_by_when(self, create_log_model, session):
    """Check the ``before`` / ``after`` time filters on the event-log listing.

    Two log rows are stored at ``self.default_time`` and ``self.default_time_2``.
    The same cutoff timestamp is sent first as ``before`` and then as ``after``;
    the first query must return only ``TEST_EVENT_1`` and the second only
    ``TEST_EVENT_2``.
    """
    first_log = create_log_model(event="TEST_EVENT_1", when=self.default_time)
    second_log = create_log_model(event="TEST_EVENT_2", when=self.default_time_2)
    session.add_all([first_log, second_log])
    session.commit()

    # (query parameter, event names expected in the response)
    scenarios = (
        ("before", {"TEST_EVENT_1"}),
        ("after", {"TEST_EVENT_2"}),
    )
    for when_attr, expected_eventlogs in scenarios:
        response = self.client.get(
            f"/api/v1/eventLogs?{when_attr}=2020-06-10T20%3A00%3A01%2B00%3A00",  # self.default_time + 1s
            environ_overrides={"REMOTE_USER": "test"},
        )
        assert response.status_code == 200
        returned_events = {entry["event"] for entry in response.json["event_logs"]}
        assert returned_events == expected_eventlogs


class TestGetEventLogPagination(TestEventLogEndpoint):
@pytest.mark.parametrize(
Expand Down

0 comments on commit 3189ebe

Please sign in to comment.