Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance incremental computation support in Texera #2165

Open
wants to merge 22 commits into
base: master
Choose a base branch
from

Conversation

zuozhiw
Copy link
Collaborator

@zuozhiw zuozhiw commented Sep 26, 2023

This PR enhances incremental computation support in Texera, including:

  1. Added an incremental aggregation framework. Specifically:
  • PartialAggregateOpExec and FinalAggregateOpExec are updated to use incremental computation. They will perdoically emit partial results to downstream.
  • Aggregate and LineChart operators now use the new aggregation framework. Other aggregate-based visualizations are not using it as they are now implemented with Python UDFs and HTML visualizations.
  • WordCloud is not using the new framework, as WordCloud is a special top-k aggregation.
  1. Added a general incremental computation option for all operators. Specifically:
  • Added a new option supportRetractableInput to indicate whether an operator support retractions as input tuples.
  • Added a new incremental computation enforcer that rewrites the workflow based on incremental computation requirements. It propagate the incremental properties and adds a "consolidate" operator if necessary.
  1. Added a simple incremental join operator.

For detailed technical presentation on incremental computation, see this slide and descriptions in this PR

@zuozhiw zuozhiw changed the title Enhance incremental computation support in Texera [WIP] Enhance incremental computation support in Texera Sep 29, 2023
Copy link
Collaborator

@Yicong-Huang Yicong-Huang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR looks good and clean! Left some small comments in code. Although, I am not very sure about the new incremental join operator, what is it behavior if the inputs are already retractable?

Comment on lines +90 to +95
var rewrittenLogicalPlan =
WorkflowCacheRewriter.transform(logicalPlan, opResultStorage, opsToReuseCache)
rewrittenLogicalPlan.operatorMap.values.foreach(initOperator)

// perform rewrite to enforce progressive computation constraints
rewrittenLogicalPlan = ProgressiveRetractionEnforcer.enforceDelta(rewrittenLogicalPlan, context)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest creating a new variable name for each step of the rewrite, as they are rewrites with different purposes.

Comment on lines +69 to +78
private def shouldEmitOutput(): Boolean = {
System.currentTimeMillis - lastUpdatedTime > UPDATE_INTERVAL_MS
}

private def emitOutputAndResetState(): scala.Iterator[Tuple] = {
lastUpdatedTime = System.currentTimeMillis
val resultIterator = getPartialOutputs()
this.partialObjectsPerKey = new mutable.HashMap[List[Object], List[Object]]()
resultIterator
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see similar code for partial and final aggregate operators to do time-based snapshots to push partial results out. If the time-based snapshot is a universal strategy for incremental operators to push out partial results, is it better to make it a standard framework?

@@ -272,9 +272,9 @@ public BuilderV2 add(String attributeName, AttributeType attributeType, Object f
*/
public BuilderV2 addSequentially(Object[] fields) {
checkNotNull(fields);
checkSchemaMatchesFields(schema.getAttributes(), Lists.newArrayList(fields));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need such an assertion for the normal tuple fields. if we need to add new fields (e.g., retraction or not), we can treat it separately? If so, I can do it in a future PR.


import scala.collection.mutable.ArrayBuffer

object ProgressiveRetractionEnforcer {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add some doc to explain this enforcer's duty?

@@ -71,6 +72,7 @@ class SpecializedAggregateOpDesc extends AggregateOpDesc {
}
Schema
.newBuilder()
.add(ProgressiveUtils.insertRetractFlagAttr)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, we can have a Builder.allowRetract() to add this attribute internally for users?

Comment on lines +65 to +67
val builder = Tuple
.newBuilder(operatorSchemaInfo.outputSchemas(0))
.add(left)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a case where the input left tuples and/or right tuples are already supporting retraction?

@Yicong-Huang
Copy link
Collaborator

will revisit after complier refactoring.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants