diff --git a/dlt/cli/utils.py b/dlt/cli/utils.py
index e829ff9a53..37508d8af4 100644
--- a/dlt/cli/utils.py
+++ b/dlt/cli/utils.py
@@ -1,19 +1,15 @@
 import ast
-import inspect
 import os
 import tempfile
-import time
-import contextlib
-from typing import Any, Callable, Tuple
+from typing import Callable

 from dlt.common import git
 from dlt.common.reflection.utils import set_ast_parents
 from dlt.common.storages import FileStorage
 from dlt.common.typing import TFun
-from dlt.common.runtime.telemetry import start_telemetry
-from dlt.common.runtime.segment import track
 from dlt.common.configuration import resolve_configuration
 from dlt.common.configuration.specs import RunConfiguration
+from dlt.common.runtime.telemetry import with_telemetry

 from dlt.reflection.script_visitor import PipelineScriptVisitor

@@ -62,39 +58,7 @@ def ensure_git_command(command: str) -> None:


 def track_command(command: str, track_before: bool, *args: str) -> Callable[[TFun], TFun]:
-    """Adds telemetry to f: TFun and add optional f *args values to `properties` of telemetry event"""
-    def decorator(f: TFun) -> TFun:
-        sig: inspect.Signature = inspect.signature(f)
-        def _wrap(*f_args: Any, **f_kwargs: Any) -> Any:
-            # look for additional arguments
-            bound_args = sig.bind(*f_args, **f_kwargs)
-            props = {p:bound_args.arguments[p] for p in args if p in bound_args.arguments}
-            start_ts = time.time()
-
-            def _track(success: bool) -> None:
-                with contextlib.suppress(Exception):
-                    props["elapsed"] = time.time() - start_ts
-                    props["success"] = success
-                    # resolve runtime config and init telemetry
-                    c = resolve_configuration(RunConfiguration())
-                    start_telemetry(c)
-                    track("command", command, props)
-
-            # some commands should be tracked before execution
-            if track_before:
-                _track(True)
-                return f(*f_args, **f_kwargs)
-            # some commands we track after, where we can pass the success
-            try:
-                rv = f(*f_args, **f_kwargs)
-                _track(rv == 0)
-                return rv
-            except Exception:
-                _track(False)
-                raise
-
-        return _wrap  # type: ignore
-    return decorator
+    return with_telemetry("command", command, track_before, *args)


 def get_telemetry_status() -> bool:
diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py
index 2df1590ae1..6162dbce59 100644
--- a/dlt/common/pipeline.py
+++ b/dlt/common/pipeline.py
@@ -1,7 +1,6 @@
 import os
 import datetime  # noqa: 251
 import humanize
-import inspect
 import contextlib
 from typing import Any, Callable, ClassVar, Dict, List, NamedTuple, Optional, Protocol, Sequence, TYPE_CHECKING, Tuple, TypedDict

@@ -25,8 +24,16 @@
 from dlt.common.data_writers.writers import TLoaderFileFormat


+class ExtractDataInfo(TypedDict):
+    name: str
+    data_type: str
+
+
 class ExtractInfo(NamedTuple):
     """A tuple holding information on extracted data items. Returned by pipeline `extract` method."""
+
+    extract_data_info: List[ExtractDataInfo]
+
     def asdict(self) -> DictStrAny:
         return {}

@@ -209,7 +216,8 @@ def __call__(
         table_name: str = None,
         write_disposition: TWriteDisposition = None,
         columns: Sequence[TColumnSchema] = None,
-        schema: Schema = None
+        schema: Schema = None,
+        loader_file_format: TLoaderFileFormat = None
     ) -> LoadInfo:
         ...

diff --git a/dlt/common/runtime/segment.py b/dlt/common/runtime/segment.py
index 9885c1355c..b8d533cccb 100644
--- a/dlt/common/runtime/segment.py
+++ b/dlt/common/runtime/segment.py
@@ -13,13 +13,14 @@
 from dlt.common.configuration.paths import get_dlt_data_dir
 from dlt.common.runtime import logger
+
 from dlt.common.configuration.specs import RunConfiguration
 from dlt.common.runtime.exec_info import exec_info_names, in_continuous_integration
 from dlt.common.typing import DictStrAny, StrAny
 from dlt.common.utils import uniq_id
 from dlt.version import __version__, DLT_PKG_NAME

-TEventCategory = Literal["pipeline", "command"]
+TEventCategory = Literal["pipeline", "command", "helper"]

 _THREAD_POOL: ThreadPoolExecutor = None
 _SESSION: requests.Session = None
@@ -202,9 +203,10 @@ def _send_event(
     headers = _segment_request_header(_WRITE_KEY)

     def _future_send() -> None:
+        # import time
         # start_ts = time.time()
         resp = _SESSION.post(_SEGMENT_ENDPOINT, headers=headers, json=payload, timeout=_SEGMENT_REQUEST_TIMEOUT)
-        # print(f"sending to Segment done {resp.status_code} {time.time() - start_ts}")
+        # print(f"SENDING TO Segment done {resp.status_code} {time.time() - start_ts} {base64.b64decode(_WRITE_KEY)}")
         # handle different failure cases
         if resp.status_code != 200:
             logger.debug(
diff --git a/dlt/common/runtime/telemetry.py b/dlt/common/runtime/telemetry.py
index e1488d37bb..5cd3d835fa 100644
--- a/dlt/common/runtime/telemetry.py
+++ b/dlt/common/runtime/telemetry.py
@@ -1,17 +1,24 @@
+import time
+import contextlib
+import inspect
+from typing import Any, Callable
+
 from dlt.common.configuration.specs import RunConfiguration
-from dlt.common.runtime.segment import init_segment, disable_segment
+from dlt.common.typing import TFun
+from dlt.common.configuration import resolve_configuration
+from dlt.common.runtime.segment import TEventCategory, init_segment, disable_segment, track
 from dlt.common.runtime.sentry import init_sentry, disable_sentry


-_TELEMETRY_ENABLED = False
+_TELEMETRY_STARTED = False


 def start_telemetry(config: RunConfiguration) -> None:
     # enable telemetry only once
-    global _TELEMETRY_ENABLED
-    if _TELEMETRY_ENABLED:
+    global _TELEMETRY_STARTED
+    if _TELEMETRY_STARTED:
         return

     if config.sentry_dsn:
@@ -20,15 +27,62 @@ def start_telemetry(config: RunConfiguration) -> None:
     if config.dlthub_telemetry:
         init_segment(config)

-    _TELEMETRY_ENABLED = True
+    _TELEMETRY_STARTED = True


 def stop_telemetry() -> None:
-    global _TELEMETRY_ENABLED
-    if not _TELEMETRY_ENABLED:
+    global _TELEMETRY_STARTED
+    if not _TELEMETRY_STARTED:
         return

     disable_sentry()
     disable_segment()

-    _TELEMETRY_ENABLED = False
+    _TELEMETRY_STARTED = False
+
+
+def is_telemetry_started() -> bool:
+    return _TELEMETRY_STARTED
+
+
+def with_telemetry(category: TEventCategory, command: str, track_before: bool, *args: str) -> Callable[[TFun], TFun]:
+    """Adds telemetry to f: TFun and adds optional f *args values to the `properties` of the telemetry event"""
+    def decorator(f: TFun) -> TFun:
+        sig: inspect.Signature = inspect.signature(f)
+        def _wrap(*f_args: Any, **f_kwargs: Any) -> Any:
+            # look for additional arguments
+            bound_args = sig.bind(*f_args, **f_kwargs)
+            props = {p: bound_args.arguments[p] for p in args if p in bound_args.arguments}
+            start_ts = time.time()
+
+            def _track(success: bool) -> None:
+                with contextlib.suppress(Exception):
+                    props["elapsed"] = time.time() - start_ts
+                    props["success"] = success
+                    # resolve runtime config and init telemetry
+                    if not _TELEMETRY_STARTED:
+                        c = resolve_configuration(RunConfiguration())
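+                        # start_telemetry() flips _TELEMETRY_STARTED, so the runtime config is resolved at most once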
+                        start_telemetry(c)
+                    track(category, command, props)
+
+            # some commands should be tracked before execution
+            if track_before:
+                _track(True)
+                return f(*f_args, **f_kwargs)
+            # other commands are tracked after execution, so the success status can be passed
+            try:
+                rv = f(*f_args, **f_kwargs)
+                # if the decorated function returns an int, 0 is a success - used to track dlt commands
+                if isinstance(rv, int):
+                    success = rv == 0
+                else:
+                    success = True
+                _track(success)
+                return rv
+            except Exception:
+                _track(False)
+                raise
+
+        return _wrap  # type: ignore
+    return decorator
\ No newline at end of file
diff --git a/dlt/helpers/airflow_helper.py b/dlt/helpers/airflow_helper.py
index 456fdbe05f..cd6c4fdf5b 100644
--- a/dlt/helpers/airflow_helper.py
+++ b/dlt/helpers/airflow_helper.py
@@ -4,11 +4,11 @@
 from tenacity import retry_if_exception, wait_exponential, stop_after_attempt, Retrying, RetryCallState

 from dlt.common.exceptions import MissingDependencyException
+from dlt.common.runtime.telemetry import with_telemetry

 try:
     from airflow.configuration import conf
     from airflow.utils.task_group import TaskGroup
-    #from airflow.decorators import task
     from airflow.operators.python import PythonOperator
     from airflow.operators.python import get_current_context
 except ImportError:
@@ -118,6 +118,7 @@ def __init__(
         if ConfigProvidersContext in Container():
             del Container()[ConfigProvidersContext]

+    @with_telemetry("helper", "airflow_add_run", False, "decompose")
     def add_run(
         self,
         pipeline: Pipeline,
diff --git a/dlt/helpers/dbt/runner.py b/dlt/helpers/dbt/runner.py
index 4c8a7920d0..2e857b2256 100644
--- a/dlt/helpers/dbt/runner.py
+++ b/dlt/helpers/dbt/runner.py
@@ -19,6 +19,8 @@
 from dlt.helpers.dbt.configuration import DBTRunnerConfiguration
 from dlt.helpers.dbt.exceptions import IncrementalSchemaOutOfSyncError, PrerequisitesException, DBTNodeResult, DBTProcessingError

+from dlt.common.runtime.telemetry import with_telemetry
+

 class DBTPackageRunner:
     """A Python wrapper over a dbt package
@@ -256,6 +258,7 @@ def run_all(self,
         raise


+@with_telemetry("helper", "dbt_create_runner", False, "package_profile_name")
 @with_config(spec=DBTRunnerConfiguration, sections=(known_sections.DBT_PACKAGE_RUNNER,))
 def create_runner(
     venv: Venv,
diff --git a/dlt/pipeline/helpers.py b/dlt/pipeline/helpers.py
index 51aab50a14..dc2a24ccc3 100644
--- a/dlt/pipeline/helpers.py
+++ b/dlt/pipeline/helpers.py
@@ -1,19 +1,18 @@
 import contextlib
-from typing import Callable, Sequence, Iterable, Optional, Any, List, Iterator, Dict, Union, TypedDict
+from typing import Callable, Sequence, Iterable, Optional, Any, List, Dict, Tuple, Union, TypedDict
 from itertools import chain

 from dlt.common.jsonpath import resolve_paths, TAnyJsonPath, compile_paths
-
 from dlt.common.exceptions import TerminalException
-from dlt.common.schema.utils import get_child_tables, group_tables_by_resource, compile_simple_regexes, compile_simple_regex
+from dlt.common.schema.utils import group_tables_by_resource, compile_simple_regexes, compile_simple_regex
 from dlt.common.schema.typing import TSimpleRegex
 from dlt.common.typing import REPattern

-from dlt.destinations.exceptions import DatabaseUndefinedRelation
+from dlt.common.pipeline import TSourceState, _reset_resource_state, _sources_state, _delete_source_state_keys, _get_matching_resources
+from dlt.destinations.exceptions import DatabaseUndefinedRelation
 from dlt.pipeline.exceptions import PipelineStepFailed, PipelineHasPendingDataException
 from dlt.pipeline.typing import TPipelineStep
 from dlt.pipeline import Pipeline
-from dlt.common.pipeline import TSourceState, _reset_resource_state, _sources_state, _delete_source_state_keys, _get_matching_resources


 def retry_load(retry_on_pipeline_steps: Sequence[TPipelineStep] = ("load",)) -> Callable[[BaseException], bool]:
diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py
index ce82e9be3b..d061285e4e 100644
--- a/dlt/pipeline/pipeline.py
+++ b/dlt/pipeline/pipeline.py
@@ -24,6 +24,7 @@
 from dlt.common.storages import LiveSchemaStorage, NormalizeStorage, LoadStorage, SchemaStorage, FileStorage, NormalizeStorageConfiguration, SchemaStorageConfiguration, LoadStorageConfiguration
 from dlt.common.destination import DestinationCapabilitiesContext
 from dlt.common.destination.reference import DestinationReference, JobClientBase, DestinationClientConfiguration, DestinationClientDwhConfiguration, TDestinationReferenceArg, DestinationClientStagingConfiguration, DestinationClientDwhConfiguration
+from dlt.common.destination.capabilities import INTERNAL_LOADER_FILE_FORMATS
 from dlt.common.pipeline import ExtractInfo, LoadInfo, NormalizeInfo, PipelineContext, SupportsPipeline, TPipelineLocalState, TPipelineState, StateInjectableContext
 from dlt.common.schema import Schema
 from dlt.common.utils import is_interactive
@@ -44,12 +45,10 @@
 from dlt.pipeline.configuration import PipelineConfiguration
 from dlt.pipeline.progress import _Collector, _NULL_COLLECTOR
 from dlt.pipeline.exceptions import CannotRestorePipelineException, InvalidPipelineName, PipelineConfigMissing, PipelineNotActive, PipelineStepFailed, SqlClientNotAvailable
-from dlt.pipeline.trace import PipelineTrace, PipelineStepTrace, load_trace, merge_traces, start_trace, start_trace_step, end_trace_step, end_trace
+from dlt.pipeline.trace import PipelineTrace, PipelineStepTrace, load_trace, merge_traces, start_trace, start_trace_step, end_trace_step, end_trace, describe_extract_data
 from dlt.pipeline.typing import TPipelineStep
 from dlt.pipeline.state_sync import STATE_ENGINE_VERSION, load_state_from_destination, merge_state_if_changed, migrate_state, state_resource, json_encode_state, json_decode_state
-from dlt.common.destination.capabilities import INTERNAL_LOADER_FILE_FORMATS
-


 def with_state_sync(may_extract_state: bool = False) -> Callable[[TFun], TFun]:
@@ -285,10 +284,10 @@ def extract(
             # TODO: if we fail here we should probably wipe out the whole extract folder
             for extract_id in extract_ids:
                 storage.commit_extract_files(extract_id)
-            return ExtractInfo()
+            return ExtractInfo(describe_extract_data(data))
         except Exception as exc:
             # TODO: provide metrics from extractor
-            raise PipelineStepFailed(self, "extract", exc, ExtractInfo()) from exc
+            raise PipelineStepFailed(self, "extract", exc, ExtractInfo(describe_extract_data(data))) from exc

     @with_runtime_trace
     @with_schemas_sync
diff --git a/dlt/pipeline/trace.py b/dlt/pipeline/trace.py
index 84fc7f4e97..53a1c20a45 100644
--- a/dlt/pipeline/trace.py
+++ b/dlt/pipeline/trace.py
@@ -2,18 +2,19 @@
 import pickle
 import datetime  # noqa: 251
 import dataclasses
-from typing import Any, List, NamedTuple, Optional, Protocol, Sequence
-
+from collections.abc import Sequence as C_Sequence
+from typing import Any, List, Tuple, NamedTuple, Optional, Protocol, Sequence
 import humanize

 from dlt.common import pendulum
 from dlt.common.runtime.logger import suppress_and_warn
 from dlt.common.configuration import is_secret_hint
 from dlt.common.configuration.utils import _RESOLVED_TRACES
-from dlt.common.pipeline import SupportsPipeline
+from dlt.common.pipeline import ExtractDataInfo, SupportsPipeline
 from dlt.common.typing import StrAny
 from dlt.common.utils import uniq_id

+from dlt.extract.source import DltResource, DltSource
 from dlt.pipeline.typing import TPipelineStep
 from dlt.pipeline.exceptions import PipelineStepFailed

@@ -212,3 +213,35 @@ def load_trace(trace_path: str) -> PipelineTrace:
     except (AttributeError, FileNotFoundError):
         # on incompatible pickling / file not found return no trace
         return None
+
+
+def describe_extract_data(data: Any) -> List[ExtractDataInfo]:
+    """Extract source and resource names from data passed to extract"""
+    data_info: List[ExtractDataInfo] = []
+
+    def add_item(item: Any) -> bool:
+        if isinstance(item, (DltResource, DltSource)):
+            # record names of sources/resources
+            data_info.append({
+                "name": item.name,
+                "data_type": "resource" if isinstance(item, DltResource) else "source"
+            })
+            return False
+        else:
+            # anything else
+            data_info.append({
+                "name": "",
+                "data_type": type(item).__name__
+            })
+            return True
+
+    item: Any = data
+    if isinstance(data, C_Sequence) and len(data) > 0:
+        for item in data:
+            # add_item returns True if an unnamed item was found; in that case we stop
+            if add_item(item):
+                break
+        return data_info
+
+    add_item(item)
+    return data_info
diff --git a/dlt/pipeline/track.py b/dlt/pipeline/track.py
index 7ab1e0553f..8e474b8195 100644
--- a/dlt/pipeline/track.py
+++ b/dlt/pipeline/track.py
@@ -10,7 +10,7 @@
 from dlt.common.runtime.exec_info import github_info
 from dlt.common.runtime.segment import track as dlthub_telemetry_track
 from dlt.common.runtime.slack import send_slack_message
-from dlt.common.pipeline import LoadInfo, SupportsPipeline
+from dlt.common.pipeline import LoadInfo, ExtractInfo, SupportsPipeline
 from dlt.common.destination import DestinationReference

 from dlt.pipeline.typing import TPipelineStep
@@ -80,12 +80,17 @@ def on_end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: S
     # if step.step == "load":
     #     if pipeline.runtime_config.slack_incoming_hook and step.step_exception is None:
     #         slack_notify_load_success(pipeline.runtime_config.slack_incoming_hook, step_info, trace)
-    dlthub_telemetry_track("pipeline", step.step, {
+    props = {
         "elapsed": (step.finished_at - trace.started_at).total_seconds(),
         "success": step.step_exception is None,
         "destination_name": DestinationReference.to_name(pipeline.destination) if pipeline.destination else None,
         "transaction_id": trace.transaction_id
-    })
+    }
+    # for the extract step, also report what data was extracted
+    if step.step == "extract" and step_info:
+        assert isinstance(step_info, ExtractInfo)
+        props["extract_data"] = step_info.extract_data_info
+    dlthub_telemetry_track("pipeline", step.step, props)


 def on_end_trace(trace: PipelineTrace, pipeline: SupportsPipeline) -> None:
diff --git a/docs/website/docs/reference/telemetry.md b/docs/website/docs/reference/telemetry.md
index 46a55d2d24..d995d7aebd 100644
--- a/docs/website/docs/reference/telemetry.md
+++ b/docs/website/docs/reference/telemetry.md
@@ -52,6 +52,7 @@ Anonymous telemetry is sent when:
 - When `pipeline.run` is called, we send information when [extract, normalize and load](explainers/how-dlt-works.md)
   steps are completed. The data contains the destination name (e.g. `duckdb`), elapsed time, and if the step
   succeeded or not.
+- When the `dbt` and `airflow` helpers are used.

 Here is an example `dlt init` telemetry message:
diff --git a/pyproject.toml b/pyproject.toml
index 26ac80e89d..dd67353ada 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "dlt"
-version = "0.3.3"
+version = "0.3.4"
 description = "DLT is an open-source python-native scalable data loading framework that does not require any devops efforts to run."
 authors = ["dltHub Inc. "]
 maintainers = [ "Marcin Rudolf ", "Adrian Brudaru ", "Ty Dunn "]
diff --git a/tests/conftest.py b/tests/conftest.py
index 042bdb2f7c..960305669f 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -27,6 +27,7 @@ def pytest_configure(config):

     test_storage_root = "_storage"
     run_configuration.RunConfiguration.config_files_storage_path = os.path.join(test_storage_root, "config/")
+    # push telemetry to CI
     run_configuration.RunConfiguration.dlthub_telemetry_segment_write_key = "TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB"

     storage_configuration.LoadStorageConfiguration.load_volume_path = os.path.join(test_storage_root, "load")
@@ -55,9 +56,6 @@ def _create_pipeline_instance_id(self) -> str:
         return pendulum.now().format("_YYYYMMDDhhmmssSSSS")

     Pipeline._create_pipeline_instance_id = _create_pipeline_instance_id
-
-    # push telemetry to CI
-    # os.environ["RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY"] = "TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB"

     # push sentry to ci
     os.environ["RUNTIME__SENTRY_DSN"] = "https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4504819859914752"
diff --git a/tests/pipeline/test_pipeline_trace.py b/tests/pipeline/test_pipeline_trace.py
index 174785d308..b9bb8bb33a 100644
--- a/tests/pipeline/test_pipeline_trace.py
+++ b/tests/pipeline/test_pipeline_trace.py
@@ -13,13 +13,16 @@
 from dlt.common.configuration.specs import CredentialsConfiguration
 from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext
 from dlt.common.pipeline import ExtractInfo
+from dlt.common.schema import Schema
 from dlt.common.runtime.telemetry import stop_telemetry
 from dlt.common.typing import DictStrAny, StrStr, TSecretValue

 from dlt.pipeline.exceptions import PipelineStepFailed
 from dlt.pipeline.pipeline import Pipeline
-from dlt.pipeline.trace import PipelineTrace, SerializableResolvedValueTrace, load_trace
+from dlt.pipeline.trace import PipelineTrace, SerializableResolvedValueTrace, describe_extract_data, load_trace
 from dlt.pipeline.track import slack_notify_load_success
+from dlt.extract.source import DltResource, DltSource
+from dlt.extract.pipe import Pipe

 from tests.utils import start_test_telemetry
 from tests.common.configuration.utils import toml_providers, environment
@@ -51,7 +54,8 @@ def data():
     assert step.step == "extract"
     assert isinstance(step.started_at, datetime.datetime)
     assert isinstance(step.finished_at, datetime.datetime)
-    assert step.step_info is extract_info
+    assert isinstance(step.step_info, ExtractInfo)
+    assert step.step_info.extract_data_info == [{"name": "inject_tomls", "data_type": "source"}]
     # check config trace
     resolved = _find_resolved_value(trace.resolved_config_values, "api_type", [])
     assert resolved.config_type_name == "TestCreateTraceInjectTomlsConfiguration"
@@ -97,6 +101,7 @@ def data():
     assert step.step == "extract"
     assert isinstance(step.step_exception, str)
     assert isinstance(step.step_info, ExtractInfo)
+    assert step.step_info.extract_data_info == [{"name": "async_exception", "data_type": "source"}]
     assert_trace_printable(trace)

     # normalize
@@ -224,6 +229,9 @@ def test_trace_telemetry() -> None:
         assert event["properties"]["destination_name"] == "dummy"
         assert isinstance(event["properties"]["elapsed"], float)
         assert isinstance(event["properties"]["transaction_id"], str)
+        # check extract info
+        if step == "extract":
+            assert event["properties"]["extract_data"] == [{"name": "", "data_type": "int"}]

     # we have two failed files (state and data) that should be logged by sentry
     assert len(SENTRY_SENT_ITEMS) == 2
@@ -243,10 +251,36 @@ def data():
         assert event["properties"]["success"] is False
         assert event["properties"]["destination_name"] == "dummy"
         assert isinstance(event["properties"]["elapsed"], float)
+        # check extract info
+        if step == "extract":
+            assert event["properties"]["extract_data"] == [{"name": "data", "data_type": "resource"}]
     # we didn't log any errors
     assert len(SENTRY_SENT_ITEMS) == 0


+def test_extract_data_describe() -> None:
+    schema = Schema("test")
+    assert describe_extract_data(DltSource("sss_extract", "sect", schema)) == [{"name": "sss_extract", "data_type": "source"}]
+    assert describe_extract_data(DltResource(Pipe("rrr_extract"), None, False)) == [{"name": "rrr_extract", "data_type": "resource"}]
+    assert describe_extract_data([DltSource("sss_extract", "sect", schema)]) == [{"name": "sss_extract", "data_type": "source"}]
+    assert describe_extract_data([DltResource(Pipe("rrr_extract"), None, False)]) == [{"name": "rrr_extract", "data_type": "resource"}]
+    assert describe_extract_data(
+        [DltResource(Pipe("rrr_extract"), None, False), DltSource("sss_extract", "sect", schema)]
+    ) == [
+        {"name": "rrr_extract", "data_type": "resource"}, {"name": "sss_extract", "data_type": "source"}
+    ]
+    assert describe_extract_data([{"a": "b"}]) == [{"name": "", "data_type": "dict"}]
+    from pandas import DataFrame
+    # we assume that list content has the same type
+    assert describe_extract_data([DataFrame(), {"a": "b"}]) == [{"name": "", "data_type": "DataFrame"}]
+    # the first unnamed element in the list stops the inspection
+    assert describe_extract_data(
+        [DltResource(Pipe("rrr_extract"), None, False), DataFrame(), DltSource("sss_extract", "sect", schema)]
+    ) == [
+        {"name": "rrr_extract", "data_type": "resource"}, {"name": "", "data_type": "DataFrame"}
+    ]
+
+
 def test_slack_hook(environment: StrStr) -> None:
     stop_telemetry()
     hook_url = "https://hooks.slack.com/services/T04DHMAF13Q/B04E7B1MQ1H/TDHEI123WUEE"