Skip to content

Commit

Permalink
RSDK-8819: Instrument robot with FTDC. Gated on startup flag. (#4484)
Browse files Browse the repository at this point in the history
Co-authored-by: Benjamin Rewis <[email protected]>
  • Loading branch information
dgottlieb and benjirewis authored Oct 25, 2024
1 parent e84feb2 commit 259a398
Show file tree
Hide file tree
Showing 12 changed files with 419 additions and 94 deletions.
125 changes: 77 additions & 48 deletions ftdc/custom_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,35 +32,32 @@ type schema struct {
// writeSchema writes down names for metrics in the form of a json array. All subsequent calls to
// `writeDatum` will assume this "header" representation until the next call to `writeSchema`. A
// full description of the file format is recorded in `doc.go`.
func writeSchema(schema *schema, output io.Writer) {
func writeSchema(schema *schema, output io.Writer) error {
// New schema byte
if _, err := output.Write([]byte{0x1}); err != nil {
panic(err)
return fmt.Errorf("Error writing schema bit: %w", err)
}

encoder := json.NewEncoder(output)
// `json.Encoder.Encode` assumes it convenient to append a newline character at the very
// end. This newline has been included in the format specification. Parsers must read over that.
if err := encoder.Encode(schema.fieldOrder); err != nil {
panic(err)
return fmt.Errorf("Error writing schema: %w", err)
}

return nil
}

// writeDatum writes out the three data format parts associated with every reading: the time, the
// diff bits and the values. See `writeSchema` for a full decsription of the file format.
//
// This may only call this when `len(curr) > 0`. `prev` may be nil or empty. If `prev` is non-empty,
// `len(prev)` must equal `len(curr)`.
func writeDatum(time int64, prev, curr []float32, output io.Writer) {
func writeDatum(time int64, prev, curr []float32, output io.Writer) error {
numPts := len(curr)
if numPts == 0 {
// Handled by the caller.
panic("No points?")
}

if len(prev) != 0 && numPts != len(prev) {
// Handled by the caller.
panic(fmt.Sprintf("Bad input sizes. Prev: %v Curr: %v", len(prev), len(curr)))
//nolint:stylecheck
return fmt.Errorf("Bad input sizes. Prev: %v Curr: %v", len(prev), len(curr))
}

// We first have to calculate the "diff bits".
Expand All @@ -70,13 +67,18 @@ func writeDatum(time int64, prev, curr []float32, output io.Writer) {
copy(diffs, curr)
} else {
for idx := range curr {
// We record the difference in the current reading compared to the previous reading for
// each metric.
diffs[idx] = curr[idx] - prev[idx]
}
}

// One bit per datapoint. And one leading bit for the "metric document identifier" bit.
numBits := numPts + 1

// Some math magic to calculate the number of bytes required to write out `numBits`. Use the
// following examples to build an understanding of how this works:
//
// If numBits < 8 then numBytes = 1,
// ElseIf numBits < 16 then numBytes = 2,
// ElseIf numBits < 24 then numBytes = 3, etc...
Expand All @@ -86,8 +88,8 @@ func writeDatum(time int64, prev, curr []float32, output io.Writer) {
// (and metric document identifier), we create a byte array to bitwise-or into.
diffBits := make([]byte, numBytes)
for diffIdx := range diffs {
// Leading bit is the "schema change" bit with a value of `0`. For a "data header", the
// "schema bit" value is 0. Start "diff bits" at index 1.
// Leading bit is the "schema change" bit. For a "data header", the "schema bit" value is 0.
// Start "diff bits" at index 1.
bitIdx := diffIdx + 1
byteIdx := bitIdx / 8
bitOffset := bitIdx % 8
Expand All @@ -100,22 +102,24 @@ func writeDatum(time int64, prev, curr []float32, output io.Writer) {
}

if _, err := output.Write(diffBits); err != nil {
panic(err)
return fmt.Errorf("Error writing diff bits: %w", err)
}

// Write time between diff bits and values.
if err := binary.Write(output, binary.BigEndian, time); err != nil {
panic(err)
return fmt.Errorf("Error writing time: %w", err)
}

// Write out values for metrics that changed across reading.
for idx, diff := range diffs {
if diff > epsilon {
if err := binary.Write(output, binary.BigEndian, curr[idx]); err != nil {
panic(err)
return fmt.Errorf("Error writing values: %w", err)
}
}
}

return nil
}

var errNotStruct = errors.New("stats object is not a struct")
Expand All @@ -131,13 +135,20 @@ func isNumeric(kind reflect.Kind) bool {

func flattenStruct(item reflect.Value) ([]float32, error) {
flattenPtr := func(inp reflect.Value) reflect.Value {
for inp.Kind() == reflect.Pointer {
for inp.Kind() == reflect.Pointer || inp.Kind() == reflect.Interface {
if inp.IsNil() {
return inp
}

inp = inp.Elem()
}
return inp
}

rVal := flattenPtr(item)
if rVal.Kind() != reflect.Struct {
return []float32{}, nil
}

var numbers []float32
// Use reflection to walk the member fields of an individual set of metric readings. We rely
Expand All @@ -156,19 +167,26 @@ func flattenStruct(item reflect.Value) ([]float32, error) {
numbers = append(numbers, float32(rField.Int()))
case rField.CanFloat():
numbers = append(numbers, float32(rField.Float()))
case rField.Kind() == reflect.Struct:
case rField.Kind() == reflect.Bool:
if rField.Bool() {
numbers = append(numbers, 1)
} else {
numbers = append(numbers, 0)
}
case rField.Kind() == reflect.Struct ||
rField.Kind() == reflect.Pointer ||
rField.Kind() == reflect.Interface:
subNumbers, err := flattenStruct(rField)
if err != nil {
return nil, err
}
numbers = append(numbers, subNumbers...)
case isNumeric(rField.Kind()):
//nolint:stylecheck
return nil, fmt.Errorf("A numeric type was forgotten to be included. Kind: %v", rField.Kind())
default:
// Embedded structs? Just grab a global logger for now. A second pass will better
// validate inputs/remove limitations. And thread through a proper logger if still
// necessary.
logging.Global().Warn("Bad number type. Type:", rField.Type())
// Ignore via writing a 0 and continue.
numbers = append(numbers, 0)
// Getting the keys for a structure will ignore these types. Such as the antagonistic
// `channel`, or `string`. We follow suit in ignoring these types.
}
}

Expand All @@ -185,33 +203,43 @@ func flattenStruct(item reflect.Value) ([]float32, error) {
//
// Will return `["PowerPct", "Pos"]`.
//
// The function right now does not recursively walk data structures. We assume for now that the
// caller will only feed "already flat" structures into FTDC. Later commits will better validate
// input and remove limitations.
func getFieldsForStruct(item reflect.Type) ([]string, error) {
flattenPtr := func(inp reflect.Type) reflect.Type {
for inp.Kind() == reflect.Pointer {
// Nested structures will walk and return a "dot delimited" name. E.g:
//
// type ParentFoo {
// Healthy Bool
// FooField Foo
// }
//
// Will return `["Healthy", "FooField.PowerPct", "FooField.Pos"]`.
func getFieldsForStruct(item reflect.Value) ([]string, error) {
flattenPtr := func(inp reflect.Value) reflect.Value {
for inp.Kind() == reflect.Pointer || inp.Kind() == reflect.Interface {
if inp.IsNil() {
return inp
}
inp = inp.Elem()
}
return inp
}

rType := flattenPtr(item)
if rType.Kind() != reflect.Struct {
return nil, fmt.Errorf("%w Type: %T", errNotStruct, item)
rVal := flattenPtr(item)
if rVal.Kind() != reflect.Struct {
return nil, errNotStruct
}

rType := rVal.Type()
var fields []string
for memberIdx := 0; memberIdx < rType.NumField(); memberIdx++ {
for memberIdx := 0; memberIdx < rVal.NumField(); memberIdx++ {
structField := rType.Field(memberIdx)
fieldType := flattenPtr(structField.Type)
if isNumeric(fieldType.Kind()) {
fieldVal := rVal.Field(memberIdx)
derefedVal := flattenPtr(fieldVal)
if isNumeric(derefedVal.Kind()) {
fields = append(fields, structField.Name)
continue
}

if fieldType.Kind() == reflect.Struct {
subFields, err := getFieldsForStruct(fieldType)
if derefedVal.Kind() == reflect.Struct {
subFields, err := getFieldsForStruct(derefedVal)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -247,7 +275,7 @@ func getSchema(data map[string]any) (*schema, *schemaError) {

for name, stats := range data {
mapOrder = append(mapOrder, name)
fieldsForItem, err := getFieldsForStruct(reflect.TypeOf(stats))
fieldsForItem, err := getFieldsForStruct(reflect.ValueOf(stats))
if err != nil {
return nil, &schemaError{name, err}
}
Expand All @@ -268,7 +296,7 @@ func getSchema(data map[string]any) (*schema, *schemaError) {
// flatten takes an input `Datum` and a `mapOrder` from the current `Schema` and returns a list of
// `float32`s representing the readings. Similar to `getFieldsForItem`, there are constraints on
// input data shape that this code currently does not validate.
func flatten(datum datum, schema *schema) ([]float32, error) {
func flatten(datum Datum, schema *schema) ([]float32, error) {
ret := make([]float32, 0, len(schema.fieldOrder))

for _, key := range schema.mapOrder {
Expand All @@ -290,18 +318,19 @@ func flatten(datum datum, schema *schema) ([]float32, error) {
return ret, nil
}

func parse(rawReader io.Reader) ([]datum, error) {
// Parse reads the entire contents from `rawReader` and returns a list of `Datum`. If an error
// occurs, the []Datum parsed up until the place of the error will be returned, in addition to a
// non-nil error.
func Parse(rawReader io.Reader) ([]Datum, error) {
logger := logging.NewLogger("")
logger.SetLevel(logging.ERROR)

return parseWithLogger(rawReader, logger)
return ParseWithLogger(rawReader, logger)
}

// parse reads the entire contents from `rawReader` and returns a list of `Datum`. If an error
// occurs, the []Datum parsed up until the place of the error will be returned, in addition to a
// non-nil error.
func parseWithLogger(rawReader io.Reader, logger logging.Logger) ([]datum, error) {
ret := make([]datum, 0)
// ParseWithLogger parses with a logger for output.
func ParseWithLogger(rawReader io.Reader, logger logging.Logger) ([]Datum, error) {
ret := make([]Datum, 0)

// prevValues are the previous values used for producing the diff bits. This is overwritten when
// a new metrics reading is made. and nilled out when the schema changes.
Expand Down Expand Up @@ -378,7 +407,7 @@ func parseWithLogger(rawReader io.Reader, logger logging.Logger) ([]datum, error

// Construct a `Datum` that hydrates/merged the full set of float32 metrics with the metric
// names as written in the most recent schema document.
ret = append(ret, datum{
ret = append(ret, Datum{
Time: dataTime,
Data: schema.Hydrate(data),
})
Expand Down
Loading

0 comments on commit 259a398

Please sign in to comment.