Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timestamps comparison in do_if #654

Merged
merged 18 commits into from
Sep 5, 2024
126 changes: 113 additions & 13 deletions fd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ var (
"byte_len_cmp": {},
"array_len_cmp": {},
}
doIfTimestampCmpOpNodes = map[string]struct{}{
"ts_cmp": {},
}
)

func extractFieldOpVals(jsonNode *simplejson.Json) [][]byte {
Expand Down Expand Up @@ -265,41 +268,136 @@ func noRequiredFieldError(field string) error {
return fmt.Errorf("no required field: %s", field)
}

func requiredString(jsonNode *simplejson.Json, fieldName string) (string, error) {
node, has := jsonNode.CheckGet(fieldName)
if !has {
return "", noRequiredFieldError(fieldName)
}

result, err := node.String()
if err != nil {
return "", err
}

return result, nil
}

func requiredInt(jsonNode *simplejson.Json, fieldName string) (int, error) {
node, has := jsonNode.CheckGet(fieldName)
if !has {
return 0, noRequiredFieldError(fieldName)
}

result, err := node.Int()
if err != nil {
return 0, err
}

return result, nil
}

const (
fieldNameField = "field"
fieldNameCmpOp = "cmp_op"
fieldNameCmpValue = "value"
)

func extractLengthCmpOpNode(opName string, jsonNode *simplejson.Json) (doif.Node, error) {
fieldPathNode, has := jsonNode.CheckGet(fieldNameField)
if !has {
return nil, noRequiredFieldError(fieldNameField)
fieldPath, err := requiredString(jsonNode, fieldNameField)
if err != nil {
return nil, err
}
fieldPath, err := fieldPathNode.String()

cmpOp, err := requiredString(jsonNode, fieldNameCmpOp)
if err != nil {
return nil, err
}

cmpOpNode, has := jsonNode.CheckGet(fieldNameCmpOp)
if !has {
return nil, noRequiredFieldError(fieldNameCmpOp)
cmpValue, err := requiredInt(jsonNode, fieldNameCmpValue)
if err != nil {
return nil, err
}
cmpOp, err := cmpOpNode.String()

return doif.NewLenCmpOpNode(opName, fieldPath, cmpOp, cmpValue)
}

const (
fieldNameFormat = "format"
fieldNameUpdateInterval = "update_interval"
fieldNameCmpValueShift = "value_shift"
)

const (
tsCmpModeNowTag = "now"
tsCmpModeConstTag = "const"

tsCmpValueNowTag = "now"
tsCmpValueStartTag = "file_d_start"
)

const (
defaultTsCmpValUpdateInterval = 10 * time.Second
defaultTsFormat = time.RFC3339Nano
)

func extractTsCmpOpNode(_ string, jsonNode *simplejson.Json) (doif.Node, error) {
fieldPath, err := requiredString(jsonNode, fieldNameField)
if err != nil {
return nil, err
}

cmpValueNode, has := jsonNode.CheckGet(fieldNameCmpValue)
if !has {
return nil, noRequiredFieldError(fieldNameCmpValue)
cmpOp, err := requiredString(jsonNode, fieldNameCmpOp)
if err != nil {
return nil, err
}
cmpValue, err := cmpValueNode.Int()

rawCmpValue, err := requiredString(jsonNode, fieldNameCmpValue)
if err != nil {
return nil, err
}

return doif.NewLenCmpOpNode(opName, fieldPath, cmpOp, cmpValue)
var cmpMode string
var cmpValue time.Time

switch rawCmpValue {
case tsCmpValueNowTag:
cmpMode = tsCmpModeNowTag
case tsCmpValueStartTag:
cmpMode = tsCmpModeConstTag
cmpValue = time.Now()
default:
cmpMode = tsCmpModeConstTag
cmpValue, err = time.Parse(time.RFC3339Nano, rawCmpValue)
if err != nil {
return nil, fmt.Errorf("parse ts cmp value: %w", err)
}
}

format := defaultTsFormat
str := jsonNode.Get(fieldNameFormat).MustString()
if str != "" {
format = str
}

cmpValueShift := time.Duration(0)
str = jsonNode.Get(fieldNameCmpValueShift).MustString()
if str != "" {
cmpValueShift, err = time.ParseDuration(str)
if err != nil {
return nil, fmt.Errorf("parse cmp value shift: %w", err)
}
}

updateInterval := defaultTsCmpValUpdateInterval
str = jsonNode.Get(fieldNameUpdateInterval).MustString()
if str != "" {
updateInterval, err = time.ParseDuration(str)
if err != nil {
return nil, fmt.Errorf("parse update interval: %w", err)
}
}

return doif.NewTsCmpOpNode(fieldPath, format, cmpOp, cmpMode, cmpValue, cmpValueShift, updateInterval)
}

func extractLogicalOpNode(opName string, jsonNode *simplejson.Json) (doif.Node, error) {
Expand Down Expand Up @@ -334,6 +432,8 @@ func extractDoIfNode(jsonNode *simplejson.Json) (doif.Node, error) {
return extractFieldOpNode(opName, jsonNode)
} else if _, has := doIfLengthCmpOpNodes[opName]; has {
return extractLengthCmpOpNode(opName, jsonNode)
} else if _, has := doIfTimestampCmpOpNodes[opName]; has {
return extractTsCmpOpNode(opName, jsonNode)
} else {
return nil, fmt.Errorf("unknown op %q", opName)
}
Expand Down
160 changes: 160 additions & 0 deletions fd/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"testing"
"time"

"github.com/bitly/go-simplejson"
"github.com/ozontech/file.d/pipeline"
Expand Down Expand Up @@ -51,6 +52,13 @@ type doIfTreeNode struct {
lenCmpOp string
cmpOp string
cmpValue int

tsCmpOp bool
tsFormat string
tsCmpValChangeMode string
tsCmpValue time.Time
tsCmpValueShift time.Duration
tsUpdateInterval time.Duration
}

// nolint:gocritic
Expand Down Expand Up @@ -78,6 +86,16 @@ func buildDoIfTree(node *doIfTreeNode) (doif.Node, error) {
)
case node.lenCmpOp != "":
return doif.NewLenCmpOpNode(node.lenCmpOp, node.fieldName, node.cmpOp, node.cmpValue)
case node.tsCmpOp:
return doif.NewTsCmpOpNode(
node.fieldName,
node.tsFormat,
node.cmpOp,
node.tsCmpValChangeMode,
node.tsCmpValue,
node.tsCmpValueShift,
node.tsUpdateInterval,
)
default:
return nil, errors.New("unknown type of node")
}
Expand Down Expand Up @@ -128,6 +146,14 @@ func Test_extractDoIfChecker(t *testing.T) {
"cmp_op": "lt",
"value": 100
},
{
"op": "ts_cmp",
"field": "timestamp",
"cmp_op": "lt",
"value": "2009-11-10T23:00:00Z",
"format": "2006-01-02T15:04:05.999999999Z07:00",
"update_interval": "15s"
},
{
"op": "or",
"operands": [
Expand Down Expand Up @@ -185,6 +211,15 @@ func Test_extractDoIfChecker(t *testing.T) {
fieldName: "items",
cmpValue: 100,
},
{
tsCmpOp: true,
cmpOp: "lt",
fieldName: "timestamp",
tsFormat: time.RFC3339Nano,
tsCmpValue: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
tsCmpValChangeMode: tsCmpModeConstTag,
tsUpdateInterval: 15 * time.Second,
},
{
logicalOp: "or",
operands: []*doIfTreeNode{
Expand Down Expand Up @@ -244,6 +279,48 @@ func Test_extractDoIfChecker(t *testing.T) {
cmpValue: 10,
},
},
{
name: "ok_ts_cmp_op",
args: args{
cfgStr: `{
"op": "ts_cmp",
"field": "timestamp",
"cmp_op": "lt",
"value": "2009-11-10T23:00:00Z",
"value_shift": "-24h",
"format": "2006-01-02T15:04:05Z07:00",
"update_interval": "15s"}`,
},
want: &doIfTreeNode{
tsCmpOp: true,
cmpOp: "lt",
fieldName: "timestamp",
tsFormat: time.RFC3339,
tsCmpValue: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
tsCmpValueShift: -24 * time.Hour,
tsCmpValChangeMode: tsCmpModeConstTag,
tsUpdateInterval: 15 * time.Second,
},
},
{
name: "ok_ts_cmp_op_default_settings",
args: args{
cfgStr: `{
"op": "ts_cmp",
"field": "timestamp",
"cmp_op": "lt",
"value": "now"}`,
},
want: &doIfTreeNode{
tsCmpOp: true,
cmpOp: "lt",
fieldName: "timestamp",
tsCmpValChangeMode: tsCmpModeNowTag,
tsFormat: defaultTsFormat,
tsCmpValueShift: 0,
tsUpdateInterval: defaultTsCmpValUpdateInterval,
},
},
{
name: "ok_single_val",
args: args{
Expand Down Expand Up @@ -375,6 +452,89 @@ func Test_extractDoIfChecker(t *testing.T) {
args: args{cfgStr: `{"op":"byte_len_cmp","field":"data","cmp_op":"lt","value":-1}`},
wantErr: true,
},
{
name: "error_ts_cmp_op_no_field",
args: args{
cfgStr: `{"op": "ts_cmp","cmp_op": "lt"}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_field_is_not_string",
args: args{
cfgStr: `{"op":"ts_cmp","field":123}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_no_format",
args: args{
cfgStr: `{"op":"ts_cmp","field":"timestamp"}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_format_is_not_string",
args: args{
cfgStr: `{"op":"ts_cmp","field":"timestamp","format":123}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_no_cmp_op",
args: args{
cfgStr: `{"op":"ts_cmp","field":"timestamp","format":"2006-01-02T15:04:05.999999999Z07:00"}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_cmp_op_is_not_string",
args: args{
cfgStr: `{"op":"ts_cmp","field":"timestamp","format":"2006-01-02T15:04:05.999999999Z07:00","cmp_op":123}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_no_cmp_value",
args: args{
cfgStr: `{"op":"ts_cmp","field":"timestamp","format":"2006-01-02T15:04:05.999999999Z07:00","cmp_op":"lt"}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_cmp_value_is_not_string",
args: args{
cfgStr: `{"op":"ts_cmp","field":"timestamp","format":"2006-01-02T15:04:05.999999999Z07:00","cmp_op":"lt","value":123}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_invalid_cmp_value",
args: args{
cfgStr: `{"op":"ts_cmp","field":"timestamp","format":"2006-01-02T15:04:05.999999999Z07:00","cmp_op":"lt","value":"qwe"}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_invalid_cmp_op",
args: args{
cfgStr: `{"op":"ts_cmp","field":"timestamp","format":"2006-01-02T15:04:05.999999999Z07:00","cmp_op":"qwe","value":"2009-11-10T23:00:00Z"}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_invalid_update_interval",
args: args{
cfgStr: `{
"op": "ts_cmp",
"field": "timestamp",
"cmp_op": "lt",
"value": "2009-11-10T23:00:00Z",
"format": "2006-01-02T15:04:05.999999999Z07:00",
"update_interval": "qwe"}`,
},
wantErr: true,
},
}
for _, tt := range tests {
tt := tt
Expand Down
3 changes: 3 additions & 0 deletions pipeline/doif/README.idoc.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ the chain of Match func calls are performed across the whole tree.

### Length comparison op node
@do-if-len-cmp-op-node

### Timestamp comparison op node
@do-if-ts-cmp-op-node
Loading
Loading