Glue with HUDI: access to model input and output row counts #363

Open
pstrasser opened this issue Mar 19, 2024 · 0 comments

Labels
enhancement New feature or request

@pstrasser
Describe the feature

To monitor data processing with dbt, we need access to information about the processed data. For each model in the dbt project, it would be useful to expose figures such as the number of updated rows (for upserts) or the total number of rows written (for inserts), as well as the size of the referenced source models. Since we plan to process a large amount of data daily with dbt-glue, we would like to track the record count that goes into and comes out of each model.

Describe alternatives you've considered

Other dbt adapters expose this through the result object, whose adapter_response field carries additional information such as rows_affected.

This seems to be impossible for all Spark-based implementations, as mentioned [here](https://github.com/dbt-labs/dbt-spark/issues/812), right?
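For comparison, where an adapter does populate adapter_response, the counts can be read from the run results, for example programmatically. A sketch using dbt-core's Python entry point (available since dbt 1.5); whether rows_affected is filled in depends entirely on the adapter, and for Spark-based adapters it currently is not:

from dbt.cli.main import dbtRunner

# Sketch: run the project and inspect adapter_response per model.
# rows_affected is only populated by adapters that report it;
# Spark-based adapters (including dbt-glue) currently do not.
res = dbtRunner().invoke(["run"])
for result in res.result.results:
    print(result.node.name, result.adapter_response.get("rows_affected"))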

I found this section in impl.py:

inputDf = spark.sql(request)
outputDf = inputDf.drop("dbt_unique_key").withColumn("update_hudi_ts", current_timestamp())
if outputDf.count() > 0:
    parallelism = spark.conf.get("spark.default.parallelism")
    print("spark.default.parallelism: %s", parallelism)
    hudi_parallelism_options = {{
        "hoodie.upsert.shuffle.parallelism": parallelism,
        "hoodie.bulkinsert.shuffle.parallelism": parallelism,
    }}
    combinedConf = {{**{str(combined_config)}, **hudi_parallelism_options, **{str(hudi_config)}}}
    {self.hudi_write(write_mode, session, target_relation, custom_location)}

spark.sql("""REFRESH TABLE {target_relation.schema}.{target_relation.name}""")
SqlWrapper2.execute("""SELECT * FROM {target_relation.schema}.{target_relation.name} LIMIT 1""")
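Note that this template already pays for a full count: outputDf.count() materializes the number of output rows and then throws the value away after the emptiness check. Capturing it would already cover the output side without any accumulator. A minimal sketch of the idea (plain PySpark, not the actual template; the metric line format is made up):

# Minimal sketch: count once, keep the value, and log it in a form
# that dbt-glue (or a log scraper) could later surface as a metric.
output_count = outputDf.count()
if output_count > 0:
    print("dbt-glue metric: output_rows=%d" % output_count)
    # ... existing HUDI parallelism setup and write unchanged ...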

Here, something like an accumulator could be added that tracks the number of rows in the input and output DataFrames, which would at least provide a bit of monitoring for dbt-glue. I was thinking of something like this:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.util.LongAccumulator

def countWithAccumulator(acc: LongAccumulator)(df: DataFrame): DataFrame = {
  val encoder = RowEncoder(df.schema)
  df.map(r => {
    acc.add(1) // count each row as it passes through
    r
  })(encoder)
}

Code adaptation:

val inputCounter = spark.sparkContext.longAccumulator("model input rows")
val inputDf = spark.sql(request).transform(countWithAccumulator(inputCounter))

// integrate this somehow, e.g. after the HUDI write has forced evaluation
println(s"number of model input records: ${inputCounter.value}")

Who will this benefit?

Everybody who wants monitoring of processed records. Currently there is no way to access this information at all. The only alternative I can think of is writing custom SQL statements that track the counts in tables, but we do not want to couple this kind of dev monitoring with the actual business logic.

Are you interested in contributing this feature?

Yes, but further coordination is necessary on how to integrate this.

pstrasser added the enhancement label on Mar 19, 2024