Use the `prototransform` library to simplify your data transformation and collection. The package lets the caller convert a given message data blob from one format to another by referring to a type schema on the Buf Schema Registry.
- No need to bake in proto files
- Supports Binary, JSON and Text formats
- Extensible for other/custom formats
`prototransform` is designed to be flexible enough to fit quickly into your development environment.
Here's an example of how you could use `prototransform` to transform messages received from a Pub/Sub topic.
While `prototransform` has various applications, converting messages off some kind of message queue is a primary use case. This can take many forms; for simplicity, we will look at it abstractly in a pub/sub model where we want to:
- Open a subscription to a topic with the Pub/Sub service of your choice
- Start a `SchemaWatcher` to fetch a module from the Buf Schema Registry
- Receive, transform, and acknowledge messages from the topic
```go
import (
	"context"
	"fmt"
	"time"

	"github.com/bufbuild/prototransform"
	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/<driver>"
)

...

	subs, err := pubsub.OpenSubscription(ctx, "<driver-url>")
	if err != nil {
		return fmt.Errorf("could not open topic subscription: %v", err)
	}
	defer subs.Shutdown(ctx)

	// Supply auth credentials to the BSR.
	client := prototransform.NewDefaultFileDescriptorSetServiceClient("<bsr-token>")

	// Configure the module for the schema watcher.
	cfg := &prototransform.SchemaWatcherConfig{
		SchemaPoller: prototransform.NewSchemaPoller(
			client,
			"buf.build/someuser/somerepo", // BSR module
			"some-tag",                    // tag or draft name, or leave blank for "latest"
		),
	}
	watcher, err := prototransform.NewSchemaWatcher(ctx, cfg)
	if err != nil {
		return fmt.Errorf("failed to create schema watcher: %v", err)
	}
	defer watcher.Stop()

	// Before we start processing messages, make sure the schema
	// has been downloaded.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	if err := watcher.AwaitReady(ctx); err != nil {
		return fmt.Errorf("schema watcher never became ready: %v", err)
	}

...
```
A `SchemaWatcher` is the entrypoint of `prototransform`. It is created first so your code can connect to the Buf Schema Registry and fetch a schema to be used to transform and/or filter payloads.
A `Converter` implements the functionality to convert payloads to different formats, and optionally to filter/mutate messages during this transformation. In the following example, we initialize a `*prototransform.Converter` that expects binary input and returns JSON.
```go
...
	converter := &prototransform.Converter{
		Resolver:     watcher,
		InputFormat:  prototransform.BinaryInputFormat(proto.UnmarshalOptions{}),
		OutputFormat: prototransform.JSONOutputFormat(protojson.MarshalOptions{}),
	}
...
```
Out of the box, you can supply `proto`, `protojson`, and `prototext` formats here, but feel free to supply your own custom formats as well.
| Format | InputFormat | OutputFormat |
|---|---|---|
| JSON | `prototransform.JSONInputFormat()` | `prototransform.JSONOutputFormat()` |
| Text | `prototransform.TEXTInputFormat()` | `prototransform.TEXTOutputFormat()` |
| Binary | `prototransform.BinaryInputFormat()` | `prototransform.BinaryOutputFormat()` |
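For instance, reversing the direction of the earlier example, a converter that accepts JSON payloads and emits the binary wire format could be configured as in the sketch below (it reuses the `watcher` from above; the options structs passed in are assumed to be the standard `protojson` and `proto` ones):

```go
// A sketch: accept JSON payloads and emit the binary wire format.
jsonToBinary := &prototransform.Converter{
	Resolver:     watcher,
	InputFormat:  prototransform.JSONInputFormat(protojson.UnmarshalOptions{}),
	OutputFormat: prototransform.BinaryOutputFormat(proto.MarshalOptions{}),
}
```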
Now that we have an active subscription, schema watcher, and converter, we can start processing messages. A simple subscriber that transforms received messages looks like this:
```go
...
	// Loop on received messages.
	for {
		msg, err := subs.Receive(ctx)
		if err != nil {
			log.Printf("receiving message: %v", err)
			break
		}
		// Do the transformation based on the message name.
		convertedMessage, err := converter.ConvertMessage("<message-name>", msg.Body)
		if err != nil {
			log.Printf("converting message: %v", err)
			break
		}
		fmt.Printf("Converted message: %q\n", convertedMessage)
		msg.Ack()
	}
...
```
For illustrative purposes, let's assume that the topic we have subscribed to is `buf.connect.demo.eliza.v1`, and that we have the module stored on the BSR. We would configure the message name as `buf.connect.demo.eliza.v1.ConverseRequest`.
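Plugged into the receive loop above, the conversion call then becomes:

```go
// Use the fully-qualified message name in place of "<message-name>".
convertedMessage, err := converter.ConvertMessage(
	"buf.connect.demo.eliza.v1.ConverseRequest", msg.Body,
)
```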
A `SchemaWatcher` can be configured with a user-supplied cache implementation, to act as a fallback when fetching schemas. The interface is of the form:
```go
type Cache interface {
	Load(ctx context.Context, key string) ([]byte, error)
	Save(ctx context.Context, key string, data []byte) error
}
```
This repo provides three implementations that you can use:
- `filecache`: Cache schemas in local files.
- `rediscache`: Cache schemas in a shared Redis server.
- `memcache`: Cache schemas in a shared memcached server.
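If none of these fits your environment, you can implement the interface yourself. Here is a minimal in-memory sketch (the `memCache` type and `newMemCache` constructor are hypothetical names, for illustration only):

```go
import (
	"context"
	"fmt"
	"sync"
)

// memCache is a hypothetical in-memory Cache implementation,
// shown only to illustrate the interface above.
type memCache struct {
	mu      sync.RWMutex
	entries map[string][]byte
}

func newMemCache() *memCache {
	return &memCache{entries: map[string][]byte{}}
}

// Load returns the cached schema bytes for key, or an error on a miss.
func (c *memCache) Load(_ context.Context, key string) ([]byte, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	data, ok := c.entries[key]
	if !ok {
		return nil, fmt.Errorf("cache miss for %q", key)
	}
	return data, nil
}

// Save stores the schema bytes under key, overwriting any prior entry.
func (c *memCache) Save(_ context.Context, key string, data []byte) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[key] = data
	return nil
}
```

An instance would then be supplied through the `SchemaWatcherConfig` so the watcher can fall back to it when the BSR is unreachable.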
There are use cases where the values within the output message should differ from the input according to some set of defined rules. For example, you may want to remove Personally Identifiable Information (PII) from a message before it is piped into a sink. For this reason, we have supplied Filters.
Here's an example where we have defined a custom annotation to mark fields as `sensitive`:
syntax = "proto3";
package foo.v1;
// ...
extend google.protobuf.FieldOptions {
bool sensitive = 30000;
}
// ...
message FooMessage {
string name = 1 [(sensitive) = true];
}
We then use `prototransform.Redact()` to create a filter and supply it to our converter via its `Filters` field:
```go
...
	isSensitive := func(in protoreflect.FieldDescriptor) bool {
		return proto.GetExtension(in.Options(), foov1.E_Sensitive).(bool)
	}
	filter := prototransform.Redact(isSensitive)
	converter.Filters = prototransform.Filters{filter}
...
```
Now, any field marked as "sensitive" will be omitted from the output produced by the converter.
This package also includes a predicate named `HasDebugRedactOption` that can be used to redact data for fields that have the `debug_redact` standard option set (this option was introduced in `protoc` v22.0).
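Used together with `Redact`, that might look like the following sketch (assuming `HasDebugRedactOption` matches the predicate signature that `Redact` accepts):

```go
// Redact any field whose descriptor sets the standard debug_redact option.
converter.Filters = prototransform.Filters{
	prototransform.Redact(prototransform.HasDebugRedactOption),
}
```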
For help and discussion around Protobuf, best practices, and more, join us on Slack.
This project is currently in alpha. The API should be considered unstable and likely to change.
Offered under the Apache 2 license.