Guide to Implement a Java Native Operator
On this page, we explain the basic concepts in Texera and use examples to show how to implement an operator.
Ideally, every operator has three classes, found in its operator package under src/main/scala/edu/uci/ics/texera/workflow/operators/:
- LogicalOp
- OperatorExecutor
- OperatorExecutorConfig
A Texera user builds a workflow, which consists of many operators, in the frontend. Each operator takes input data from its previous operator(s), performs some computation, and outputs the results to the next operator(s).
Suppose we have the following sample records, each of which has an ID and a tweet.
id tweet
1 "today is a good day"
2 "weather is bad during the day"
Each row is called a Tuple, and each column is called a Field.
// get the value of a field by column name
tuple1.getField("id") // result: 1
tuple1.getField("tweet") // result: "today is a good day"
// get the value by column index
tuple1.get(0) // result: 1
In this dataset, we have 2 columns: id, of the integer type, and tweet, of the string type. This information is called a Schema.
A schema contains a list of attributes, and each attribute has a name (the name of the column) and a type (the data type of the column).
val schema = tuple1.getSchema()
schema.getAttributes().get(0) // Attribute("id", AttributeType.INTEGER)
schema.getAttributes().get(1) // Attribute("tweet", AttributeType.STRING)
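The following minimal, self-contained Java sketch makes the tuple, field, and schema concepts concrete. The classes AttrType, Attr, SimpleSchema, SimpleTuple, and TupleDemo are hypothetical stand-ins invented for this illustration. They are not Texera's Tuple, Schema, or Attribute classes. The sketch shows only one idea: a lookup by column name goes through the schema to find the column index, while a lookup by index reads the field directly.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for Texera's AttributeType, Attribute, Schema and Tuple.
enum AttrType { INTEGER, STRING }

// A column: a name plus a data type.
final class Attr {
    final String name;
    final AttrType type;

    Attr(String name, AttrType type) {
        this.name = name;
        this.type = type;
    }

    @Override
    public String toString() {
        return "Attribute(" + name + ", " + type + ")";
    }
}

// A schema is an ordered list of attributes.
final class SimpleSchema {
    private final List<Attr> attributes;

    SimpleSchema(Attr... attributes) { this.attributes = Arrays.asList(attributes); }

    List<Attr> getAttributes() { return attributes; }

    // Position of the column with the given name, or -1 if there is none.
    int indexOf(String name) {
        for (int i = 0; i < attributes.size(); i++) {
            if (attributes.get(i).name.equals(name)) return i;
        }
        return -1;
    }
}

// A tuple is one row: its field values, in schema order.
final class SimpleTuple {
    private final SimpleSchema schema;
    private final Object[] fields;

    SimpleTuple(SimpleSchema schema, Object... fields) {
        if (fields.length != schema.getAttributes().size()) {
            throw new IllegalArgumentException("field count must match the schema");
        }
        this.schema = schema;
        this.fields = fields;
    }

    SimpleSchema getSchema() { return schema; }

    // Lookup by column index.
    Object get(int index) { return fields[index]; }

    // Lookup by column name: ask the schema for the index first.
    Object getField(String name) {
        int i = schema.indexOf(name);
        if (i < 0) throw new IllegalArgumentException("no column named " + name);
        return fields[i];
    }
}

final class TupleDemo {
    public static void main(String[] args) {
        SimpleSchema schema = new SimpleSchema(
                new Attr("id", AttrType.INTEGER), new Attr("tweet", AttrType.STRING));
        SimpleTuple tuple1 = new SimpleTuple(schema, 1, "today is a good day");
        System.out.println(tuple1.getField("tweet")); // prints today is a good day
        System.out.println(tuple1.get(0)); // prints 1
        System.out.println(tuple1.getSchema().getAttributes().get(0)); // prints Attribute(id, INTEGER)
    }
}
```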
A regular expression operator matches a regular expression (regex) against each input tuple. For example, if we search for the regex "weather" in the tweet attribute, only tuple 2 is in the result. In other words, the regular expression operator is a kind of filter() operation, as found in many programming languages.
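The filter() analogy can be checked with a few lines of plain Java. This sketch represents each row as a String[] of {id, tweet}, which is a simplification and not Texera's Tuple. A row is kept when Matcher.find() reports a match anywhere in the chosen column. RegexFilterSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper; rows are String[] {id, tweet}, not Texera tuples.
final class RegexFilterSketch {
    // Keep only the rows whose chosen column contains a match for the regex.
    static List<String[]> filterByRegex(List<String[]> rows, int column, String regex) {
        Pattern pattern = Pattern.compile(regex); // compile once, reuse for every row
        return rows.stream()
                .filter(row -> pattern.matcher(row[column]).find())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] {"1", "today is a good day"},
                new String[] {"2", "weather is bad during the day"});
        for (String[] row : filterByRegex(rows, 1, "weather")) {
            System.out.println(row[0]); // prints 2
        }
    }
}
```

Searching for "day" instead keeps both rows, because find() matches substrings rather than requiring the whole value to match.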
To implement a regular expression operator, you first need to write a LogicalOp subclass, called the operator descriptor (OpDesc). The following code is part of the class RegexOpDesc.
class RegexOpDesc extends FilterOpDesc {
  @JsonProperty(required = true)
  @JsonSchemaTitle("attribute")
  @JsonPropertyDescription("column to search regex on")
  @AutofillAttributeName
  var attribute: String = _

  @JsonProperty(required = true)
  @JsonSchemaTitle("regex")
  @JsonPropertyDescription("regular expression")
  var regex: String = _

  @JsonProperty(required = false, defaultValue = "false")
  @JsonSchemaTitle("Case Insensitive")
  @JsonPropertyDescription("regex match is case insensitive")
  var caseInsensitive: Boolean = _
}
The regular expression operator takes 3 properties from the user: attribute (the name of the column to search on), regex (the regular expression itself), and caseInsensitive (whether the regex match ignores case).
The @JsonProperty annotation tells the system that this property comes from user input, and the system automatically generates the corresponding input form in the frontend.
Inside @JsonProperty, required = true tells the frontend that the user must provide this property. Each property also needs a user-friendly title (in the @JsonSchemaTitle annotation) and a detailed description (in the @JsonPropertyDescription annotation). The @AutofillAttributeName annotation tells the frontend to autocomplete the attribute name (the name of the column).
The operator descriptor also needs to provide information about the operator, including a user-friendly name, a description, the group it belongs to, and the number of input and output ports.
override def operatorInfo: OperatorInfo =
  OperatorInfo(
    userFriendlyName = "Regular Expression",
    operatorDescription = "Search a regular expression in a string column",
    operatorGroupName = OperatorGroupConstants.SEARCH_GROUP,
    numInputPorts = 1,
    numOutputPorts = 1
  )
Finally, the operator descriptor needs to specify its corresponding operator executor. An OperatorExecutor, or OpExec for short, contains the implementation of the operator's processing logic. For the regular expression operator, this is RegexOpExec. The OpDesc supplies an OpExecInitInfo with a function that creates the operator executor, _ => new RegexOpExec(this). When the OpDesc creates a PhysicalOp (here with oneToOnePhysicalOp, the type of physical operator that should be used in most cases), it passes in the OpExecInitInfo for the PhysicalOp to use.
PhysicalOp.oneToOnePhysicalOp(
  executionId,
  operatorIdentifier,
  OpExecInitInfo(_ => new RegexOpExec(this))
)
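Because the PhysicalOp receives a function rather than a ready-made executor, it can create a fresh executor whenever one is needed. The sketch below models this idea with java.util.function.IntFunction. ToyExec and ToyPhysicalOp are hypothetical names. Treating the function's argument as a worker index is an assumption made for illustration and does not describe what OpExecInitInfo actually passes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

// Hypothetical executor; the int it receives is assumed here to be a worker index.
final class ToyExec {
    final int workerIndex;
    ToyExec(int workerIndex) { this.workerIndex = workerIndex; }
}

// Hypothetical physical operator that stores a factory rather than an executor instance.
final class ToyPhysicalOp {
    private final IntFunction<ToyExec> execFactory;

    ToyPhysicalOp(IntFunction<ToyExec> execFactory) { this.execFactory = execFactory; }

    // Each worker gets its own executor, so no mutable executor state is shared.
    List<ToyExec> createExecutors(int numWorkers) {
        List<ToyExec> execs = new ArrayList<>();
        for (int i = 0; i < numWorkers; i++) execs.add(execFactory.apply(i));
        return execs;
    }
}
```

For example, new ToyPhysicalOp(ToyExec::new).createExecutors(3) yields three distinct executor objects. Storing a single shared instance would not give that guarantee.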
The implementation of the regular expression operator executor is simple. Since this operator performs a kind of filter() operation, it extends the pre-defined class FilterOpExec and calls setFilterFunc to specify its filter function, matchRegex. In matchRegex, we first get the string value of the column, and then test whether the regex matches anywhere in that value (Matcher.find).
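import java.util.regex.Pattern

class RegexOpExec(val opDesc: RegexOpDesc) extends FilterOpExec {
  // honor the user's caseInsensitive property when compiling the regex
  val pattern: Pattern =
    if (opDesc.caseInsensitive) Pattern.compile(opDesc.regex, Pattern.CASE_INSENSITIVE)
    else Pattern.compile(opDesc.regex)
  this.setFilterFunc(this.matchRegex)

  // keep the tuple if the regex matches anywhere in the chosen column
  def matchRegex(tuple: Tuple): Boolean = {
    val tupleValue = tuple.getField(opDesc.attribute).toString
    pattern.matcher(tupleValue).find()
  }
}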
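The next sketch shows what a FilterOpExec-style base class provides. The base class owns the keep-or-drop logic, and the subclass only registers a predicate, mirroring setFilterFunc(matchRegex). ToyFilterExec and ToyRegexExec are hypothetical classes written in plain Java. Each tuple is reduced to the String value of the searched column, and the caseInsensitive flag is applied through Pattern.CASE_INSENSITIVE.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Hypothetical base class playing the role of FilterOpExec: subclasses only supply a predicate.
abstract class ToyFilterExec<T> {
    private Predicate<T> filterFunc = t -> true; // keep everything until a predicate is set

    protected void setFilterFunc(Predicate<T> f) {
        this.filterFunc = f;
    }

    // Zero or one output per input: the tuple itself if it passes the predicate.
    List<T> processTuple(T tuple) {
        List<T> out = new ArrayList<>();
        if (filterFunc.test(tuple)) out.add(tuple);
        return out;
    }
}

// Hypothetical regex filter; a "tuple" is just the String value of the searched column.
final class ToyRegexExec extends ToyFilterExec<String> {
    private final Pattern pattern;

    ToyRegexExec(String regex, boolean caseInsensitive) {
        pattern = caseInsensitive
                ? Pattern.compile(regex, Pattern.CASE_INSENSITIVE)
                : Pattern.compile(regex);
        setFilterFunc(this::matchRegex);
    }

    // True if the regex matches anywhere in the value.
    private boolean matchRegex(String value) {
        return pattern.matcher(value).find();
    }
}
```

By default, Pattern.CASE_INSENSITIVE folds only US-ASCII letters. Pattern.UNICODE_CASE can be combined with it if non-ASCII text matters.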
The operator must be registered so that the system knows it exists. In the LogicalOp class, add a new entry to the @JsonSubTypes annotation that specifies the operator descriptor class and a unique operator name.
@JsonSubTypes(
  Array(
    // ... entries for the other registered operators
    new Type(value = classOf[RegexOpDesc], name = "Regex"),
  )
)
abstract class LogicalOp extends PortDescriptor with Serializable {
  // ... (class body omitted)
}
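In essence, registration maps a unique name to a descriptor class, and the operator's icon is later looked up by that same name. This toy registry is a hypothetical plain-Java illustration of that idea. Texera itself relies on Jackson's @JsonSubTypes annotation, and Jackson's resolution logic is not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical descriptor types standing in for LogicalOp and RegexOpDesc.
interface ToyOpDesc {}

final class ToyRegexOpDesc implements ToyOpDesc {}

// Toy registry: unique operator name -> way to create its descriptor.
final class ToyOperatorRegistry {
    private final Map<String, Supplier<ToyOpDesc>> byName = new HashMap<>();

    void register(String name, Supplier<ToyOpDesc> factory) {
        // Names must be unique, like the name in a @JsonSubTypes entry.
        if (byName.putIfAbsent(name, factory) != null) {
            throw new IllegalArgumentException("duplicate operator name: " + name);
        }
    }

    // Resolve a registered name to a new descriptor instance.
    ToyOpDesc create(String name) {
        Supplier<ToyOpDesc> factory = byName.get(name);
        if (factory == null) throw new IllegalArgumentException("unknown operator: " + name);
        return factory.get();
    }

    // The operator's icon uses the same registered name.
    static String iconFileName(String name) {
        return name + ".png";
    }
}
```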
The operator is now automatically available in the frontend, so we can start the system and test it.
To add an image for this operator, go to core/new-gui/src/assets/operator_images and add an image with the SAME NAME as the one specified in the operator registration. The image should be a square PNG file, black and white, with a transparent background.
For example, the regex operator's registration new Type(value = classOf[RegexOpDesc], name = "Regex") specifies the name Regex, so the image file name should be Regex.png.
Summary: we have gone through the steps to implement a simple regular expression operator. Because this operator is a filter() operation, it is built on top of a set of pre-defined classes: FilterOpDesc, FilterOpExec, and FilterOpExecConfig.
A map() operation processes one input tuple and produces exactly one output tuple. Next, we briefly explain map() operators using the Sentiment Analysis operator as an example.
The sentiment analysis operator uses the Stanford NLP package to analyze the sentiment of a text. Given the example dataset above, the output of this operator looks like this:
id tweet sentiment
1 "today is a good day" "positive"
2 "weather is bad during the day" "negative"
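A map() operator produces exactly one output row for each input row. For sentiment analysis, each output row is the input row plus one new column. The sketch below shows that shape in Java. The keyword-based toySentiment classifier is a hypothetical placeholder, chosen so the example has no dependencies. It is not how the Stanford NLP package, or Texera's operator, computes sentiment. MapSketch is also a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class MapSketch {
    // map(): exactly one output row for each input row.
    static <I, O> List<O> mapAll(List<I> input, Function<I, O> f) {
        List<O> out = new ArrayList<>(input.size());
        for (I row : input) out.add(f.apply(row));
        return out;
    }

    // Hypothetical keyword classifier for illustration only; NOT Stanford NLP.
    static String toySentiment(String text) {
        List<String> words = Arrays.asList(text.toLowerCase().split("\\s+"));
        if (words.contains("good")) return "positive";
        if (words.contains("bad")) return "negative";
        return "neutral";
    }

    // Keep every input column of an {id, tweet} row and append the sentiment column.
    static String[] addSentiment(String[] row) {
        String[] out = Arrays.copyOf(row, row.length + 1);
        out[row.length] = toySentiment(row[1]);
        return out;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] {"1", "today is a good day"},
                new String[] {"2", "weather is bad during the day"});
        for (String[] row : mapAll(rows, MapSketch::addSentiment)) {
            // prints "1 | today is a good day | positive", then "2 | ... | negative"
            System.out.println(String.join(" | ", row));
        }
    }
}
```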
The following code is the implementation of the class SentimentAnalysisOpDesc in Java.
public class SentimentAnalysisOpDesc extends MapOpDesc {
    @JsonProperty(required = true)
    @JsonSchemaTitle("attribute")
    @JsonPropertyDescription("column to perform sentiment analysis on")
    @AutofillAttributeName
    public String attribute;

    @JsonProperty(value = "result attribute", required = true, defaultValue = "sentiment")
    @JsonPropertyDescription("column name of the sentiment analysis result")
    public String resultAttribute;

    @Override
    public OneToOneOpExecConfig operatorExecutor() {
        return new OneToOneOpExecConfig(operatorIdentifier(), () -> new SentimentAnalysisOpExec(this));
    }

    @Override
    public OperatorInfo operatorInfo() {
        return new OperatorInfo(
            "Sentiment Analysis",
            "analyze the sentiment of a text using machine learning",
            OperatorGroupConstants.ANALYTICS_GROUP(),
            1, 1
        );
    }

    @Override
    public Schema getOutputSchema(Schema[] schemas) {
        // no result column name yet: the output schema cannot be determined
        if (resultAttribute == null || resultAttribute.trim().isEmpty()) {
            return null;
        }
        // all input columns, plus the new STRING column
        return Schema.newBuilder().add(schemas[0]).add(resultAttribute, AttributeType.STRING).build();
    }
}
You'll notice that this operator implements an additional function, getOutputSchema, because it adds a new column (named sentiment by default). Given an input schema, getOutputSchema returns the output schema produced by this operator.
In this implementation, resultAttribute is the new column name given by the user (the default value is "sentiment"). If the value is empty, we return null to indicate that the output schema cannot be produced. Otherwise, the result schema includes all the attributes from the input schema, plus a new attribute of type string.
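The getOutputSchema rule can be tried in isolation. In this sketch, a schema is simplified to a list of "name:TYPE" strings, which is a hypothetical encoding and not Texera's Schema. OutputSchemaSketch.outputSchema applies the same rule as the Java getOutputSchema method: it returns null for a blank result name, and otherwise returns the input columns followed by one STRING column.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Columns are encoded as "name:TYPE" strings; a hypothetical simplification of Schema.
final class OutputSchemaSketch {
    // Same rule as getOutputSchema: null for a blank name, else input columns + one STRING column.
    static List<String> outputSchema(List<String> inputSchema, String resultAttribute) {
        if (resultAttribute == null || resultAttribute.trim().isEmpty()) {
            return null; // output schema cannot be produced yet
        }
        List<String> out = new ArrayList<>(inputSchema); // keep every input column, in order
        out.add(resultAttribute + ":STRING");
        return Collections.unmodifiableList(out);
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("id:INTEGER", "tweet:STRING");
        System.out.println(outputSchema(input, "sentiment"));
        // prints [id:INTEGER, tweet:STRING, sentiment:STRING]
        System.out.println(outputSchema(input, "  ")); // prints null
    }
}
```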
The regular expression operator does not implement this function because a filter() operation does not add or remove any columns.
SentimentAnalysisOpExec extends MapOpExec and provides a map function. You can check its implementation in the codebase.
Texera currently has 4 pre-defined operations that you can extend:
- filter(): filters out any input tuple that doesn't satisfy a condition.
- map(): transforms each input tuple into exactly one output tuple.
- flatmap(): transforms each input tuple into a list of output tuples.
- aggregate(): performs an aggregation, such as sum, count, or average.
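The other two operations, flatmap() and aggregate(), differ from filter() and map() in how many outputs each input produces. This plain-Java sketch uses a hypothetical class, OpKindsSketch. It shows a flatMap that splits tweets into words and an aggregate that computes an average. It does not use Texera's pre-defined operator classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class OpKindsSketch {
    // flatMap(): each input produces zero or more outputs, concatenated in order.
    static <I, O> List<O> flatMap(List<I> input, Function<I, List<O>> f) {
        List<O> out = new ArrayList<>();
        for (I row : input) out.addAll(f.apply(row));
        return out;
    }

    // aggregate(): many inputs collapse into one result, here an average.
    static double average(List<Integer> values) {
        if (values.isEmpty()) throw new IllegalArgumentException("average of empty input");
        long sum = 0;
        for (int v : values) sum += v;
        return (double) sum / values.size();
    }

    public static void main(String[] args) {
        List<String> tweets = Arrays.asList("today is a good day", "weather is bad during the day");
        List<String> words = flatMap(tweets, t -> Arrays.asList(t.split(" ")));
        System.out.println(words.size()); // prints 11
        System.out.println(average(Arrays.asList(1, 2, 6))); // prints 3.0
    }
}
```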
To implement an operator, first check whether it can be built on one of these 4 pre-defined operations, which you can find under texera/workflow/common/operators. Your own operator implementation should be in texera/workflow/operators/youroperator.
If a more complicated operator cannot be implemented with these operations, you need to implement an OperatorExecutor using the following low-level interface.
trait IOperatorExecutor {
  def open(): Unit
  def close(): Unit
  def processTuple(tuple: Either[ITuple, InputExhausted], input: Int): Iterator[ITuple]
}
The open() and close() functions let you initialize and dispose of resources (such as opened files), respectively. The engine calls open() once before the whole execution and close() once after it. The most important function is processTuple, which implements the processing logic inside the operator.
The processTuple function takes two parameters: tuple and input. An operator can have multiple input ports, and each input port can have multiple input operators connected to it (e.g., Union). The parameter input: Int indicates which input port the current tuple comes from. The parameter tuple is either an ITuple or an InputExhausted, where InputExhausted indicates that all data from an input operator has been exhausted. The function returns an Iterator[ITuple], which means it can produce zero or more output tuples for each input. The engine calls processTuple whenever a new input tuple arrives, and once more when an input is exhausted. When an input port is connected to multiple input operators, InputExhausted is processed multiple times (once per input operator).
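The following Java sketch models the processTuple contract, including the rule that InputExhausted arrives once per upstream operator. InputItem, TupleItem, InputExhaustedItem, and CountExec are hypothetical names. The Either[ITuple, InputExhausted] parameter is modeled as a small class hierarchy, and tuples are plain Strings. The operator has a single input port, so it does not inspect the input argument.

```java
import java.util.Collections;
import java.util.Iterator;

// Hypothetical model of Either[ITuple, InputExhausted]: an input item is a tuple or an end marker.
abstract class InputItem {}

final class TupleItem extends InputItem {
    final String value; // tuples are simplified to a single String
    TupleItem(String value) { this.value = value; }
}

final class InputExhaustedItem extends InputItem {
    static final InputExhaustedItem INSTANCE = new InputExhaustedItem();
    private InputExhaustedItem() {}
}

// Counts its input tuples and emits the count once every upstream operator is exhausted.
final class CountExec {
    private final int numUpstream; // operators connected to the single input port
    private int exhaustedSoFar = 0;
    private long count = 0;
    private boolean opened = false;

    CountExec(int numUpstream) { this.numUpstream = numUpstream; }

    void open() { opened = true; }   // a real operator would acquire resources here
    void close() { opened = false; } // ...and release them here

    // `input` is the input-port index; this sketch has one port, so it is not inspected.
    Iterator<String> processTuple(InputItem item, int input) {
        if (!opened) throw new IllegalStateException("open() must be called first");
        if (item instanceof TupleItem) {
            count++;
            return Collections.emptyIterator(); // zero outputs for this tuple
        }
        // InputExhausted arrives once per upstream operator; wait for the last one.
        exhaustedSoFar++;
        if (exhaustedSoFar < numUpstream) return Collections.emptyIterator();
        return Collections.singletonList("count=" + count).iterator();
    }
}
```

Counting exhaustion signals is what keeps such an operator from emitting a partial result when only one of two upstream operators (for example, the two inputs of a Union) has finished.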
Texera's backend determines which UI elements the frontend shows for an operator's properties. The frontend receives this information and renders the corresponding input widgets.
- Input Box
Here is an example of a user input box with the title “Client Id” and a description.
@JsonProperty(required = true)
@JsonSchemaTitle("Client Id")
@JsonPropertyDescription("Client id used to access the Reddit API")
var clientId: String = _
- Multiple selection
Here is an example of an attribute selection in the aggregate operator.
@JsonProperty(value = "attribute", required = true)
@JsonPropertyDescription("column to calculate average value")
@AutofillAttributeName
var attribute: String = _
In the backend, the @AutofillAttributeName annotation fills the selection options with the list of attribute names. For a multiple selection, the property type needs to be a list rather than a single String.
- Checkbox
For a checkbox, we set the property's data type to Boolean. Here is an example from the Python UDF operator. Because the property is a Boolean, the frontend renders it as a checkbox.
@JsonProperty(required = true, defaultValue = "true")
@JsonSchemaTitle("Retain input columns")
@JsonPropertyDescription("Keep the original input columns?")
var retainInputColumns: Boolean = Boolean.box(false)
- List
The Python UDF operator contains an example of a list, which holds the output schema. In the frontend, clicking the blue button adds another pair of attribute information (a name and a type), and clicking the red button deletes one. In the backend, a list holds the attribute values.
@JsonProperty
@JsonSchemaTitle("Extra output column(s)")
@JsonPropertyDescription("Name of the newly added output columns that the UDF will produce, if any")
var outputColumns: List[Attribute] = List()
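The rule behind these examples, that a property's type decides its widget, can be sketched with reflection. WidgetSketch and ToyUdfDesc are hypothetical classes. The mapping covers only the input box, checkbox, and list cases in the examples. It ignores annotations such as @AutofillAttributeName, which drives the selection options. Texera's actual frontend logic is not reproduced here.

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical mapping from a property's Java type to the widget kind in the examples above.
final class WidgetSketch {
    static String widgetFor(Class<?> type) {
        if (type == boolean.class || type == Boolean.class) return "checkbox";
        if (List.class.isAssignableFrom(type)) return "list";
        if (type == String.class) return "input box";
        return "unsupported";
    }

    // Field name -> widget kind for every field declared on a descriptor class.
    static Map<String, String> widgets(Class<?> descClass) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Field f : descClass.getDeclaredFields()) {
            out.put(f.getName(), widgetFor(f.getType()));
        }
        return out;
    }
}

// Hypothetical descriptor with one property of each kind.
class ToyUdfDesc {
    String clientId;
    boolean retainInputColumns;
    List<String> outputColumns;
}
```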
The file amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/LogicalOp.scala contains a list of all registered operators, with their descriptor classes and names. After you register an operator there, you can assign an icon to it. All operator icons are stored in the core/new-gui/src/assets/operator_images directory. Each icon's file name must match the name the operator is registered with (for example, Regex.png for the name "Regex").