
Add tiled implementation of the Flink app #627

Merged
merged 46 commits into main on Feb 28, 2024

Conversation

@caiocamatta-stripe (Collaborator) commented on Nov 30, 2023

Summary

Adds a tiled implementation of the Flink app.

The tiled version of the Flink job adds a window / tiling operator. It has roughly the following DAG (sketched in code after the list):

  1. Kafka source - Reads objects of type T (specific case class, Thrift / Proto) from a Kafka topic
  2. Spark expression eval - Evaluates the Spark SQL expression in the GroupBy and projects and filters the input data
  3. Window/tiling - This window aggregates incoming events, keeps track of the IRs, and sends them forward so they are written out to the KV store
  4. Avro conversion - Finishes converting the output of the window (the IRs) to a form that can be written out to the KV store (PutRequest object)
  5. KV store writer - Writes the PutRequest objects to the KV store using the AsyncDataStream API
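
For illustration, here is a rough sketch of how these five stages could be wired with Flink's DataStream API. Everything below (TimestampedTile, PutRequest, the parameter names) is a simplified stand-in, not the exact classes in this PR:

import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.AsyncFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Illustrative stand-ins for the PR's actual types.
case class TimestampedTile(keys: List[Any], tileBytes: Array[Byte], ts: Long)
case class PutRequest(keyBytes: Array[Byte], valueBytes: Array[Byte], dataset: String)

object TiledDagSketch {
  def buildTiledDag(
      projected: DataStream[Map[String, Any]],     // output of step 2 (Spark expression eval)
      keyOf: Map[String, Any] => String,           // selects the GroupBy's entity key(s)
      tileAggregator: AggregateFunction[Map[String, Any], Array[Any], TimestampedTile],
      avroCodec: TimestampedTile => PutRequest,    // step 4 (Avro conversion)
      kvWriter: AsyncFunction[PutRequest, Boolean] // step 5 (async KV store writer)
  ): DataStream[Boolean] = {
    // Step 3: the tiling window aggregates events per entity key and emits the IRs.
    val tiled: DataStream[TimestampedTile] = projected
      .keyBy(keyOf)
      .window(TumblingEventTimeWindows.of(Time.minutes(5))) // tile size: just an example
      .aggregate(tileAggregator)
    // Steps 4 and 5: convert the IRs to PutRequests and write them out asynchronously.
    val puts: DataStream[PutRequest] = tiled.map(avroCodec)
    AsyncDataStream.unorderedWait(puts, kvWriter, 1000L, TimeUnit.MILLISECONDS, 100)
  }
}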

In a subsequent PR I will add documentation on tiling in general -- probably in a markdown file. This documentation will be similar to the "Flink and Tiling" presentation I gave at the Stripe <> AirBnB summit we had.

Why / Goal

In our experience at Stripe, the tiled version of Chronon (Flink + Fetcher) has much lower latency and significantly better scalability than the untiled/regular version.

Test Plan

  • Added Unit Tests
  • Integration tested

I added

  • an end-to-end test for the untiled app that confirms the events are processed correctly.
  • tests for the key selector function (see the sketch after this list)
  • tests for the row aggregators
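
For flavor, the key-selector tests check (roughly) that rows with the same GroupBy key columns map to the same Flink partition key, so they land in the same tile. A hypothetical sketch, not the PR's exact test code:

import org.junit.Assert.assertEquals
import org.junit.Test

class KeySelectorSketchTest {
  @Test
  def rowsWithSameKeyColumnsGetTheSameFlinkKey(): Unit = {
    val keyColumns = Seq("id") // hypothetical GroupBy key columns
    val keyOf: Map[String, Any] => List[Any] = row => keyColumns.map(row(_)).toList

    val rowA = Map[String, Any]("id" -> "id1", "double_val" -> 1.5)
    val rowB = Map[String, Any]("id" -> "id1", "double_val" -> 2.5)
    // Same entity key => same partition key => same tile.
    assertEquals(keyOf(rowA), keyOf(rowB))
  }
}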

Checklist

  • [~] Documentation update
    • Comments in the code serve as immediate documentation, but as mentioned before I will be adding additional documentation

Reviewers

@piyushn-stripe @nikhilsimha @cristianfr

@piyushn-stripe (Collaborator) left a comment:

Largely looks good, few minor comments

extends RichFlatMapFunction[Map[String, Any], PutRequest] {
@transient lazy val logger = LoggerFactory.getLogger(getClass)
// This utility contains common code for AvroCodecFn and TiledAvroCodecFn
sealed trait AvroCodecFnUtility {
Collaborator:

how about a name like BaseAvroCodecFn? I tend to associate "utility" with singletons, which isn't what you're getting at here

Collaborator:

to make things explicit, we could make this an abstract class that extends from RichFlatMapFunction (so then subclasses know their contract is to provide groupByServingInfoParsed and fill out the open / flatMap methods)
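
A minimal sketch of that suggested shape (hedged; the member name is taken from the excerpt above):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import ai.chronon.online.GroupByServingInfoParsed

// Subclasses (AvroCodecFn, TiledAvroCodecFn) supply the serving info and
// implement flatMap; they can also override open() to set up their codecs.
abstract class BaseAvroCodecFn[IN, OUT] extends RichFlatMapFunction[IN, OUT] {
  def groupByServingInfoParsed: GroupByServingInfoParsed
}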

Collaborator (author):

agreed - BaseAvroCodecFn is a better name and making the contracts clearer is a good idea.

case class AvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed)
extends RichFlatMapFunction[Map[String, Any], PutRequest] {
@transient lazy val logger = LoggerFactory.getLogger(getClass)
// This utility contains common code for AvroCodecFn and TiledAvroCodecFn
Collaborator:

let's also highlight what the subclasses need to override / specialize

* Spark expression eval - Evaluates the Spark SQL expression in the GroupBy and projects and filters the input data
* Avro conversion - Converts the Spark expr eval output to a form that can be written out to the KV store (PutRequest object)
* KV store writer - Writes the PutRequest objects to the KV store using the AsyncDataStream API
* Kafka source -> Spark expression eval -> Avro conversion -> KV store writer
Collaborator:

this changes now with the new tiling operators, right? Should we add those too, or maybe move the operator breakdown to the run* methods?

@@ -125,11 +88,76 @@ class FlinkJobIntegrationTest {
// capture the datastream of the 'created' timestamps of all the written out events
val writeEventCreatedDS = CollectSink.values.asScala

println(writeEventCreatedDS.size)
Collaborator:

remove?

* Find the smallest tail window resolution in a GroupBy. Returns None if the GroupBy does not define any windows.
* The window resolutions are: 5 min for a window < 12 hrs, 1 hr for < 12 days, 1 day for > 12 days.
* */
def getSmallestWindowResolutionInMillis(groupBy: GroupBy): Option[Long] =
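
A hedged sketch of what the body could compute, using FiveMinuteResolution.calculateTailHop (discussed in the thread below); imports and helpers may differ from the PR:

import scala.collection.JavaConverters._
import ai.chronon.api.GroupBy
import ai.chronon.aggregator.windowing.FiveMinuteResolution

object ResolutionSketch {
  def getSmallestWindowResolutionInMillis(groupBy: GroupBy): Option[Long] = {
    // Collect every window across the GroupBy's aggregations (windows may be null).
    val allWindows = groupBy.aggregations.asScala
      .flatMap(agg => Option(agg.windows).map(_.asScala).getOrElse(Nil))
    // Map each window to its tail-hop resolution and take the smallest.
    if (allWindows.isEmpty) None
    else Some(allWindows.map(FiveMinuteResolution.calculateTailHop).min)
  }
}
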
Contributor:

This could live in Extensions - we already have helper functions like MaxWindow etc on GroupBy as implicits: https://github.com/airbnb/chronon/blob/master/api/src/main/scala/ai/chronon/api/Extensions.scala#L416

Collaborator (author):

Extensions does seem like a good place for this, but it would require api to depend on aggregator so it can use FiveMinuteResolution.calculateTailHop. Or duplicating calculateTailHop in Extensions. What do you think?

Contributor:

Aggregator should already be able to access api/Extensions.scala

Contributor:

@caiocamatta-stripe if IntelliJ doesn't auto-complete or detect the functions in Aggregator you might need to install a scala extension, because the implicits need a little help to work in IDE I think. But it should still compile from sbt.

Collaborator (author):

Hmm, I may be doing something silly..

Contributor:

Feel free to move resolution & 5minResolution into api if you want to achieve this. But not necessary in this PR IMO

@ezvz (Contributor) left a comment:

Deleting my last comment, I see that you did create some tests. Will continue reviewing shortly!

// We use Flink "Side Outputs" to track any late events that aren't computed.
val tilingLateEventsTag = OutputTag[Map[String, Any]]("tiling-late-events")

// The tiling operator works the following way:
Contributor:

Nice comments!
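
For readers unfamiliar with the pattern, a rough sketch of how Flink side outputs capture late events (simplified; not the PR's exact operator):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TilingSideOutputSketch {
  val tilingLateEventsTag = OutputTag[Map[String, Any]]("tiling-late-events")

  def tileWithLateEvents(keyed: KeyedStream[Map[String, Any], String])
      : (DataStream[Map[String, Any]], DataStream[Map[String, Any]]) = {
    val main = keyed
      .window(TumblingEventTimeWindows.of(Time.minutes(5)))
      .sideOutputLateData(tilingLateEventsTag) // events behind the watermark go here
      .reduce((a, b) => a ++ b)                // stand-in for the real tile aggregation
    (main, main.getSideOutput(tilingLateEventsTag)) // main tiles + late-event stream
  }
}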

@ezvz (Contributor) left a comment:

Everything looks pretty good to me. Might be good to get a stamp from @nikhilsimha too for this one.

List("id1") -> List(4.0), // Add up the double_val of the two 'id1' events
List("id2") -> List(10.0)
)

Contributor:

It does not necessarily need to be part of this PR, but eventually having a test case with events that fall outside of the tail of the window might be interesting to make sure that we tail-exclude properly.

Collaborator (author):

Good idea! cc @piyushn-stripe - one of us can include it when we get a chance.

Collaborator:

Yeah +1 to that. Would be a good follow up (for both the tiled & untiled)

@caiocamatta-stripe (author):

hey @nikhilsimha can I get your review too pls?

@@ -75,6 +75,8 @@ class AsyncKVStoreWriter(onlineImpl: Api, featureGroupName: String)
// The context used for the future callbacks
implicit lazy val executor: ExecutionContext = AsyncKVStoreWriter.ExecutionContextInstance

// One may want to use different KV stores depending on whether tiling is on.
// The untiled version of Chronon works on "append" store semantics, and the tiled version works on "overwrite".
Contributor:

❤️
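
To make the append vs. overwrite distinction concrete, a toy model (illustrative only; not Chronon's KV store API):

import scala.collection.mutable

object StoreSemanticsSketch {
  // Untiled, "append" semantics: every event IR is appended under the entity key,
  // and the fetcher replays all of them at query time.
  val appendStore = mutable.Map.empty[String, Vector[Array[Byte]]]
  def appendPut(entityKey: String, eventIr: Array[Byte]): Unit =
    appendStore(entityKey) = appendStore.getOrElse(entityKey, Vector.empty) :+ eventIr

  // Tiled, "overwrite" semantics: one IR per (entity key, tile start); each write
  // replaces the previous value for that tile, so the store stays compact.
  val tileStore = mutable.Map.empty[(String, Long), Array[Byte]]
  def tilePut(entityKey: String, tileStartMs: Long, tileIr: Array[Byte]): Unit =
    tileStore((entityKey, tileStartMs)) = tileIr
}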

Comment on lines +21 to 23
* @tparam IN The input data type which contains the data to be avro-converted to bytes.
* @tparam OUT The output data type (generally a PutRequest).
*/
Contributor:

I think the naming itself makes a lot of what is written here obvious.

I personally feel auto-generated doc strings reduce readability of code, but it is subjective.

Collaborator (author):

I don't think it hurts here, but noted!

I'll leave it for now but don't mind removing it if anyone has strong opinions.

@@ -47,7 +58,25 @@ class FlinkJob[T](eventSrc: FlinkSource[T],
// The source of our Flink application is a Kafka topic
val kafkaTopic: String = groupByServingInfoParsed.groupBy.streamingSource.get.topic

/**
* The "untiled" version of the Flink app.
Contributor:

I really like this style of comments - over the autogen doc strings

Comment on lines 96 to 97
if (debug) {
logger.info(
Contributor:

why not logger.debug?

Collaborator (author):

The idea was to copy what's done in BaseFetcher, but I do prefer logger.debug

Collaborator (author):

I changed it to logger.debug; I prefer it that way too.

@nikhilsimha (Contributor) left a comment:

Solid PR!

Can you also add a Readme to the chronon/flink dir with a rough control flow diagram?

@caiocamatta-stripe (author):

@nikhilsimha, I can add a control flow diagram in a follow-up PR. For now we have the doc on the Tiled Architecture (which I'm merging along with this PR).

@caiocamatta-stripe merged commit dcb3750 into main on Feb 28, 2024
7 checks passed
@caiocamatta-stripe deleted the caiocamatta--add-flink-tiling-oss branch on February 28, 2024 17:04