Cassandra keyspace name. Capillaries uses keyspaces as part of a script execution context. For example, if a script is expected to be executed every hour, producing time-of-day-specific data, corresponding keyspaces may be "hourly_summary_2000_05_25_09_00", "hourly_summary_2000_05_25_10_00" and so on. Each keyspace will hold isolated data, index, and workflow tables.
Capillaries creates keyspaces on-the-fly when they are referenced for the first time.
Capillaries scripts allow the following types:
Golang string, Cassandra TEXT
Golang int64, Cassandra BIGINT
Golang float64, Cassandra DOUBLE
Golang bool, Cassandra BOOLEAN
Golang time.Time, Cassandra TIMESTAMP
Golang github.com/shopspring/decimal, Cassandra DECIMAL (values trimmed to 2 decimal digits on both sides)
Execution of a subset (or all) of script nodes. Runs help cover supervised script execution scenarios in which an operator may want to wait for some nodes to complete, check result data quality, and only then initiate the next run that will use those validated results. Runs are numbered starting from 1.
- Subset of rows from the source data table
- All data lines from one source data file read by the file-table processor
Ability of a script node to split input data into batches and process multiple batches simultaneously using multiple instances of the daemon. See the expected_batches_total setting.
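As a minimal illustration (the node structure, table name, and values here are assumptions, not taken verbatim from the script reference), a table reader that requests batching might contain:

```json
{
  "r": {
    "table": "orders",
    "expected_batches_total": 10
  }
}
```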
Complete set of instructions on how to process data using Capillaries for a specific business task. On the top level, it's a map of script nodes and a map of dependency policies.
A logical step in the script that calls a specific processor and supplies it with data produced by other script nodes or by external components (for example, via files).
Nodes that allow parallelism eventually trigger multiple instances of processors on multiple machines.
SQL-style joins. Only two types of joins are supported:
- left: SQL LEFT OUTER JOIN
- inner: SQL INNER JOIN
Used by table_lookup_table script nodes
Cassandra tables. There are three types of tables used by Capillaries. The Cassandra setup section has a cqlsh screenshot showing all of them.
Holds data results of a table-creating node. Data tables cannot have names that start with `idx_` or `wf_`.
Holds an index built as the result of a table-creating node. One data table can have zero or many indexes, each defined using an index definition. Marked with the `idx_` prefix.
Holds information about the execution status of each batch, node, and run in this keyspace. Marked with the `wf_` prefix.
Data/index tables are run-specific, so table names are suffixed with the corresponding run id.
Capillaries code that reads source data from source tables/files, performs data processing, and writes data to target data tables or files.
The following processor types are supported out-of-the-box:
Reads data from the source data file and writes processed data to the target table
Reads data from the source table and writes processed data to the target table
Reads data from the source table, finds matching rows in the lookup table, performs join operations, and writes processed data to a table. These nodes support SQL-like aggregate functions in Go expressions.
Reads records from the source table, makes sure each record is unique using the supplied unique index (only one unique index definition is allowed, and it is required), and writes the record to a table if it is unique
Reads data from the source table and writes processed data to the target file
A custom processor that can be implemented by a third party. The following custom processors are part of this repository:
A custom processor must meet the following requirements:
- Must implement interfaces:
- CustomProcessorDef
- CustomProcessorRunner
- When processing a source data row, it should not make any assumptions about other source or target data rows.
Sample custom processor implementation in pkg/custom/py_calc. Performs arbitrary data processing on input data using user-supplied Python formulas. The functionality is well-covered in the py_calc integration test. Toolbelt and Daemon environment configuration settings:
- full path to the Python interpreter
- parameters passed to the Python interpreter
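For illustration only, a sketch of how these interpreter settings might appear in the environment configuration JSON; the section and key names below are assumptions and should be checked against the repository:

```json
{
  "custom_processors": {
    "py_calc": {
      "python_interpreter_path": "python3",
      "python_interpreter_params": ["-u", "-"]
    }
  }
}
```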
Sample custom processor implementation in pkg/custom/tag_and_denormalize. Denormalizes input data by checking tag criteria and producing a new data row for each matching tag. The functionality is well-covered in tag_and_denormalize integration test and in the "What it is" section.
Toolbelt and Daemon environment configuration settings are discussed below.
The field in the target table where the tag value will be written
tag->criteria_expression map. Expressions are allowed to use reader fields only (`r.*`).
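For example, a hypothetical tag_criteria map (tag names and reader fields are made up for illustration; only r.* fields are referenced, as required):

```json
{
  "tag_criteria": {
    "heavy": "r.weight_kg > 10.0",
    "fragile": "r.category == \"glassware\""
  }
}
```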
Same as tag_criteria, but in a separate JSON file. This is the preferred way to specify tag criteria because the list of tags:
- may contain thousands of entries, and it's not a good idea to pollute the script file with those
- may be generated dynamically by some workflow component, and it's good to have it contained in a single file without touching the script
One-line Go snippets used in script settings:
- field expressions
- writer "having" expressions
- lookup "filter" expressions
For the list of supported operations, see the `Eval(exp ast.Expr)` implementation in eval_ctx.go.
For the list of supported Go functions, see the `EvalFunc(callExp *ast.CallExpr, funcName string, args []interface{})` implementation in eval_ctx.go.
At the moment, Capillaries supports only a limited subset of the standard Go library. Additions are welcome. Keep in mind that the Capillaries expression engine:
- supports only primitive types (see Capillaries data types)
- does not support class member function calls
- does not support statements or multi-line expressions
- supports aggregate functions used in table_lookup_table nodes
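To illustrate where such expressions live (this is not a verbatim script fragment; the keys, field names, and the w./l. prefixes shown for the having/filter expressions are assumptions), one-line Go snippets are embedded as plain strings in the script JSON:

```json
{
  "field_expression": "r.first_name + \" \" + r.last_name",
  "having_expression": "w.total_amount > 100",
  "lookup_filter_expression": "l.status == \"shipped\""
}
```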
RabbitMQ queue containing messages for a processor. The name of the queue is given by the handler_executable_type setting.
Graphviz DOT language - a simple text-based diagram-drawing language. There are a number of free online tools that can visualize DOT language documents.
A command-line executable that performs common Capillaries operations by:
- reading Capillaries script files
- sending commands to the processor queue
The Toolbelt:
- can start/stop runs, so solution developers can use it in their scripts
- gives very basic access to the workflow tables, see `get_*_history` commands
- can produce rudimentary visuals using DOT diagram language - see `validate_script`, `get_run_status_diagram` commands
See Toolbelt and Daemon configuration for configuration settings.
One of the main purposes of the toolbelt is to give system integrators easy access to Capillaries API. Also, the toolbelt can be useful for visualizing scripts and the status of their execution with DOT diagrams, for example:
# Can be executed anytime
go run capitoolbelt.go validate_script -script_file=../../../test/data/cfg/lookup_quicktest/script.json -params_file=../../../test/data/cfg/lookup_quicktest/script_params_two_runs.json -idx_dag=true
# Can be executed when the lookup script is running using two runs
go run capitoolbelt.go get_run_status_diagram -script_file=../../../test/data/cfg/lookup_quicktest/script.json -params_file=../../../test/data/cfg/lookup_quicktest/script_params_two_runs.json -keyspace=lookup_quicktest -run_id=1
Capideploy is not part of the Capillaries framework. It's a command-line tool that can be used to deploy a complete Capillaries-based solution in a public or private cloud that implements the Openstack API, or in the AWS cloud. See the Capideploy repository.
An executable that implements one or more processors. Capillaries source code comes with a stock daemon that implements all supported processor types, including py_calc processor implemented as a custom processor.
The daemon consumes all messages from the processor queue specified by the handler_executable_type setting.
After processing all batches for a node, the daemon sends batch messages to dependent nodes, using the RabbitMQ exchange specified by the exchange setting.
For example, the stock daemon coming as part of the Capillaries source code uses:
- queue name (handler_executable_type): "capi_daemon"
- exchange name (exchange):"capillaries"
Third-party daemons may use other names for either or both the queue and the exchange, but in this case the developers are in charge of creating all corresponding RabbitMQ infrastructure for that queue name, including the dead-letter exchange.
See Toolbelt and Daemon configuration for configuration settings.
A simple application that provides web service access to the Capillaries environment (similar to the Toolbelt, but it speaks HTTP instead of the command line). Can be used by Capillaries-UI, by integration tests (see the lookup Webapi test), or by third-party applications.
Please note that Webapi lacks user authorization capabilities.
A simple web UI application that provides user access to Capillaries environment (RabbitMQ queues and Cassandra storage) using Webapi. See Capillaries-UI readme for details.
Directed Acyclic Graph. Used in workflow descriptions and defines a collection of all tasks to run, organized in a way that reflects their relationships and dependencies.
Capillaries uses zap by Uber for logging. Logger settings can be changed in environment config JSON file. For log analysis, use free or commercial tools of your choice.
Unique int64 identifier assigned by Capillaries to every data row. Used internally in data table reader and lookup implementation.
Defines how file reader reads columns from the source file (CSV, Parquet).
- `col_default_value`: default value (specified as a string in this setting: "0.0", "true", etc.) to be used if the source file contains no value for this field; if omitted, the default Go value for this type is used
- `col_type`: one of the supported types
- `csv.col_idx`: zero-based column index in the source file; prohibited if col_hdr is specified
- `csv.col_hdr`: source file column header; prohibited if col_idx is specified
- `csv.col_format`: depends on the field type:
  - `int`: must include `%d`, `fmt.Sscanf()` is used internally
  - `float`: must include `%f`, `fmt.Sscanf()` is used internally
  - `decimal2`: must include `%f`, `fmt.Sscanf()` is used internally
  - `datetime`: must include a Go `2006-01-02 15:04:05`-style format specifier, `time.Parse()` is used internally
  - `string`: should not specify a format, the whole field contents will be loaded
  - `bool`: should not specify a format, `strconv.ParseBool()` is used internally
- `parquet.col_name`: column name
Parquet types supported by Parquet Reader (from Parquet to Capillaries/Go):
Parquet Type/Logical | Capillaries (Go) |
---|---|
BYTE_ARRAY/UTF8 | string |
INT_64, INT_32 | int64 |
FLOAT, DOUBLE | float64 |
BOOLEAN | bool |
INT_32/DECIMAL, INT_64/DECIMAL, FIXED_LEN_BYTE_ARRAY/DECIMAL (up to 8 bytes only) | decimal2 |
INT_96, INT_32/DATE, INT_32/TIMESTAMP(MILLIS,MICROS), INT_64/TIMESTAMP(MILLIS,MICROS) | datetime |
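A minimal sketch of a single CSV reader column definition combining the settings above (the column key, header, and values are hypothetical, and the surrounding script structure may differ):

```json
{
  "col_order_date": {
    "csv": {
      "col_hdr": "order_purchase_timestamp",
      "col_format": "2006-01-02 15:04:05"
    },
    "col_type": "datetime",
    "col_default_value": "2000-01-01 00:00:00"
  }
}
```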
Defines how table writer saves values to the target table.
- `expression`: Go expression, can use reader (`r.*`), lookup (`l.*`), and custom processor (`p.*`) fields
- `type`: one of the supported types
- `default_value`: default value (specified as a string in this setting: "0.0", "true", etc.) to be used if a left outer lookup produced no value on the right; if omitted, the default Go value for this type is used
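A hedged sketch of a single table writer field using these settings (the field name and the referenced r.*/l.* fields are hypothetical):

```json
{
  "total_value": {
    "expression": "r.price + l.shipping_cost",
    "type": "decimal2",
    "default_value": "0.0"
  }
}
```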
Defines how file writer saves values to the target file (CSV, Parquet).
- `name`: column name to be used in having
- `type`: one of the supported types
- `expression`: Go expression, can use reader fields only (`r.*`)
- `format`: Go format string to be used when writing a value as text to the file, depends on the column type:
  - `int`: must include `%d`
  - `float`: must include `%f`
  - `decimal2`: must include `%s`
  - `datetime`: must include a Go `2006-01-02 15:04:05`-style format specifier
  - `string`: must include `%s`
  - `bool`: must include `%t`
- `header`: column header to be used in the target file
- `column_name`: column name
Parquet writer types:
Capillaries (Go) | Parquet Type/Logical |
---|---|
string | BYTE_ARRAY/UTF8 |
int64 | INT_64 |
float64 | DOUBLE |
bool | BOOLEAN |
decimal2 | INT_64/DECIMAL |
datetime | INT_64/TIMESTAMP(MILLIS) |
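For illustration, a single CSV file writer column might combine these settings as follows (names and values are hypothetical; note the %s format for decimal2 per the list above):

```json
{
  "name": "total_value",
  "expression": "r.total_value",
  "type": "decimal2",
  "format": "%s",
  "header": "Total value"
}
```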
Used in w.indexes. Syntax:
[unique|non_unique](order_expression)
where order_expression is an order expression.
A unique index enforces key uniqueness on the database level. Key uniqueness does not affect lookup behaviour.
Used in index definitions, top/order and dependency policy event_priority_order settings. Syntax:
[<field_name>([case_modifier|sort_modifier,...]),...]
where
case_modifier: case_sensitive|ignore_case
sort_modifier: asc|desc
All index, sorting and dependency policy event_priority_order logic implemented by order expressions revolves around string keys built by `BuildKey()` in key.go.
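As an example (the index and field names are hypothetical), an index definition inside w.indexes using this syntax could look like:

```json
{
  "indexes": {
    "idx_orders_by_customer": "unique(customer_email(case_sensitive,asc))"
  }
}
```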
The mechanism for complementing a data row from the primary source table with matching data in the secondary (lookup) source table. Capillaries supports two types of SQL-style lookups: inner and left outer (see Join operations).
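A hedged sketch of how a table_lookup_table node might reference a lookup (the key names and values below are assumptions, not taken from the script reference):

```json
{
  "lookup": {
    "index_name": "idx_order_items_order_id",
    "join_on": "r.order_id",
    "join_type": "left"
  }
}
```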
There is no need to perform any setup steps beyond specifying the RabbitMQ URL in Toolbelt and Daemon configuration. Toolbelt and Daemon will create all required exchanges and queues on the fly. Below is a sample view of RabbitMQ Management Plugin after Toolbelt/Daemon has successfully initialized RabbitMQ:
There is no need to perform any setup steps beyond specifying Cassandra connectivity settings in Toolbelt and Daemon configuration. Toolbelt and Daemon will create keyspaces and tables on the fly. Below is a sample cqlsh session after the lookup integration test completed 2 runs by executing:
cd ./test/code/lookup/quicktest_local_fs
./1_create_data.sh
./2_two_runs.sh
All data, index and workflow tables are in place here.
Capillaries supports reading source data and configuration files, and writing result data files via SFTP. SFTP URI format used is as follows:
sftp://user@host[:port]/path/to/file
where
- `user`: name of the user on the target host; password authentication is not supported, only private key authentication; the path to the private key is given by the private_keys map: a `user` entry corresponds to a path to the private key
- `host`: target host name or IP address
- `port`: optional, default is 22
- `/path/to/file`: full absolute path to the file
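For example, a hypothetical private_keys map entry in the Toolbelt/Daemon configuration (user, host, and paths here are made up):

```json
{
  "private_keys": {
    "sftpuser": "/home/capiuser/.ssh/sftpuser_rsa"
  }
}
```

With such an entry, a source URI like sftp://sftpuser@10.5.0.5/tmp/capi_in/orders.csv would authenticate as sftpuser using that private key.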
Here is the full list of configuration settings where SFTP URIs can be used:
- file reader source URIs
- script_file and script_params URIs used in Capillaries API and exposed via the Toolbelt or the Webapi
- file writer target URIs
Capillaries supports reading source data and configuration files from S3 buckets, and writing result data files to S3 buckets. S3 URI format used is as follows:
https://<bucket_name>.s3.<aws_region>.amazonaws.com/<path_to_file>
The examples below assume your AWS setup is based on the AWS account <aws_account_id> and has an IAM user called sampledeployment005-internaluser.
When accessing S3 buckets, Capillaries performs AWS authentication using credentials stored in ~/.aws/credentials file:
[default]
aws_access_key_id=<AKIA...sampledeployment005-internaluser key>
aws_secret_access_key=<...sampledeployment005-internaluser secret>
and configuration in ~/.aws/config file:
[default]
region=us-east-1
output=json
Alternatively, you can use environment variables:
export AWS_ACCESS_KEY_ID=<AKIA...sampledeployment005-internaluser key>
export AWS_SECRET_ACCESS_KEY=<...sampledeployment005-internaluser secret>
export AWS_DEFAULT_REGION=us-east-1
Example of bucket capillaries-sampledeployment005 permissions setup:
- Block all public access.
- Bucket policy:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::<aws_account_id>:user/sampledeployment005-internaluser"
},
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::capillaries-sampledeployment005"
},
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::<aws_account_id>:user/sampledeployment005-internaluser"
},
"Action": [
"s3:DeleteObject",
"s3:GetObject",
"s3:PutObject"
],
"Resource": "arn:aws:s3:::capillaries-sampledeployment005/*"
}
]
}
If you need to allow an IAM user to access bucket some-external-bucket that belongs to a different AWS account (not the <aws_account_id> of your IAM user), you may need to add a corresponding policy to sampledeployment005-internaluser. See https://stackoverflow.com/questions/77637011/how-to-provide-access-to-s3-buckets-in-a-different-aws-account.
Sample aws CLI commands for working with the bucket:
aws s3 ls s3://capillaries-sampledeployment005 --recursive
aws s3 cp test/data/in/lookup_quicktest/olist_orders_dataset.csv s3://capillaries-sampledeployment005/capi_in/lookup_quicktest/
aws s3 cp s3://capillaries-sampledeployment005/capi_out/lookup_quicktest/order_item_date_left_outer.csv .
aws s3 cp /tmp/capi_in/fannie_mae_bigtest/ s3://capillaries-sampledeployment005/capi_in/fannie_mae_bigtest --recursive --exclude "*" --include "CAS_2023_R08_G1_*.parquet"
aws s3 cp /tmp/capi_cfg/fannie_mae_bigtest/ s3://capillaries-sampledeployment005/capi_cfg/fannie_mae_bigtest --recursive