Add sFlow input plugin (#7188)
This commit is contained in:
402
plugins/inputs/sflow/decoder/directives.go
Normal file
402
plugins/inputs/sflow/decoder/directives.go
Normal file
@@ -0,0 +1,402 @@
|
||||
package decoder
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
)
|
||||
|
||||
// Directive is a Decode Directive, the basic building block of a decoder
|
||||
type Directive interface {
|
||||
|
||||
// Execute performs the function of the decode directive. If DecodeContext is nil then the
|
||||
// ask is to check that a subsequent execution (with non nill DecodeContext) is expted to work.
|
||||
Execute(*bytes.Buffer, *DecodeContext) error
|
||||
}
|
||||
|
||||
type IterOption struct {
|
||||
EOFTerminateIter bool
|
||||
RemainingToGreaterEqualOrTerminate uint32
|
||||
}
|
||||
|
||||
// ValueDirective is a decode directive that extracts some data from the packet, an integer or byte maybe,
|
||||
// which it then processes by using it, for example, as the counter for the number of iterations to perform
|
||||
// of downstream decode directives.
|
||||
//
|
||||
// A ValueDirective can be used to either Switch, Iter(ate), Encapsulate or Do mutually exclusively.
|
||||
type ValueDirective interface {
|
||||
Directive
|
||||
|
||||
// Switch attaches a set of conditional decode directives downstream of this decode directive
|
||||
Switch(paths ...CaseValueDirective) ValueDirective
|
||||
|
||||
// Iter attaches a single downstream decode directive that will be executed repeatedly according to the iteration count
|
||||
Iter(maxIterations uint32, dd Directive, iterOptions ...IterOption) ValueDirective
|
||||
|
||||
// Encapsulated will form a new buffer of the encapsulated length and pass that buffer on to the downsstream decode directive
|
||||
Encapsulated(maxSize uint32, dd Directive) ValueDirective
|
||||
|
||||
// Ref records this decode directive in the passed reference
|
||||
Ref(*interface{}) ValueDirective
|
||||
|
||||
// Do attaches a Decode Operation - these are uses of the decoded information to perform work on, transform, write out etc.
|
||||
Do(ddo DirectiveOp) ValueDirective
|
||||
}
|
||||
|
||||
type valueDirective struct {
|
||||
reference *valueDirective
|
||||
|
||||
value interface{}
|
||||
noDecode bool
|
||||
|
||||
cases []CaseValueDirective
|
||||
iter Directive
|
||||
maxIterations uint32
|
||||
encapsulated Directive
|
||||
maxEncapsulation uint32
|
||||
ops []DirectiveOp
|
||||
err error
|
||||
|
||||
iterOption IterOption
|
||||
}
|
||||
|
||||
func valueToString(in interface{}) string {
|
||||
switch v := in.(type) {
|
||||
case *uint16:
|
||||
return fmt.Sprintf("%d", *v)
|
||||
case uint16:
|
||||
return fmt.Sprintf("%d", v)
|
||||
case *uint32:
|
||||
return fmt.Sprintf("%d", *v)
|
||||
case uint32:
|
||||
return fmt.Sprintf("%d", v)
|
||||
default:
|
||||
return fmt.Sprintf("%v", in)
|
||||
}
|
||||
}
|
||||
|
||||
func (dd *valueDirective) Execute(buffer *bytes.Buffer, dc *DecodeContext) error {
|
||||
if dd.reference == nil && !dd.noDecode {
|
||||
if e := binary.Read(buffer, binary.BigEndian, dd.value); e != nil {
|
||||
return e
|
||||
}
|
||||
}
|
||||
|
||||
// Switch downstream?
|
||||
if dd.cases != nil && len(dd.cases) > 0 {
|
||||
for _, c := range dd.cases {
|
||||
if c.Equals(dd.value) {
|
||||
return c.Execute(buffer, dc)
|
||||
}
|
||||
}
|
||||
switch v := dd.value.(type) {
|
||||
case *uint32:
|
||||
return fmt.Errorf("(%T).Switch,unmatched case %d", v, *v)
|
||||
case *uint16:
|
||||
return fmt.Errorf("(%T).Switch,unmatched case %d", v, *v)
|
||||
default:
|
||||
return fmt.Errorf("(%T).Switch,unmatched case %v", dd.value, dd.value)
|
||||
}
|
||||
}
|
||||
|
||||
// Iter downstream?
|
||||
if dd.iter != nil {
|
||||
fn := func(id interface{}) error {
|
||||
if dd.iterOption.RemainingToGreaterEqualOrTerminate > 0 && uint32(buffer.Len()) < dd.iterOption.RemainingToGreaterEqualOrTerminate {
|
||||
return nil
|
||||
}
|
||||
if dd.iterOption.EOFTerminateIter && buffer.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
if e := dd.iter.Execute(buffer, dc); e != nil {
|
||||
return e
|
||||
}
|
||||
return nil
|
||||
}
|
||||
switch v := dd.value.(type) {
|
||||
case *uint32:
|
||||
if *v > dd.maxIterations {
|
||||
return fmt.Errorf("iter exceeds configured max - value %d, limit %d", *v, dd.maxIterations)
|
||||
}
|
||||
for i := uint32(0); i < *v; i++ {
|
||||
if e := fn(i); e != nil {
|
||||
return e
|
||||
}
|
||||
}
|
||||
case *uint16:
|
||||
if *v > uint16(dd.maxIterations) {
|
||||
return fmt.Errorf("iter exceeds configured max - value %d, limit %d", *v, dd.maxIterations)
|
||||
}
|
||||
for i := uint16(0); i < *v; i++ {
|
||||
if e := fn(i); e != nil {
|
||||
return e
|
||||
}
|
||||
}
|
||||
default:
|
||||
// Can't actually get here if .Iter method check types (and it does)
|
||||
return fmt.Errorf("(%T).Iter, cannot iterator over this type", dd.value)
|
||||
}
|
||||
}
|
||||
|
||||
// Encapsualted downstream>
|
||||
if dd.encapsulated != nil {
|
||||
switch v := dd.value.(type) {
|
||||
case *uint32:
|
||||
if *v > dd.maxEncapsulation {
|
||||
return fmt.Errorf("encap exceeds configured max - value %d, limit %d", *v, dd.maxEncapsulation)
|
||||
}
|
||||
return dd.encapsulated.Execute(bytes.NewBuffer(buffer.Next(int(*v))), dc)
|
||||
case *uint16:
|
||||
if *v > uint16(dd.maxEncapsulation) {
|
||||
return fmt.Errorf("encap exceeds configured max - value %d, limit %d", *v, dd.maxEncapsulation)
|
||||
}
|
||||
return dd.encapsulated.Execute(bytes.NewBuffer(buffer.Next(int(*v))), dc)
|
||||
}
|
||||
}
|
||||
|
||||
// Perform the attached operations
|
||||
for _, op := range dd.ops {
|
||||
if err := op.process(dc, dd.value); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// panickIfNotBlackCanvas checks the state of this value directive to see if it is has
|
||||
// alrady been configured in a manner inconsistent with another configuration change
|
||||
func (dd *valueDirective) panickIfNotBlackCanvas(change string, checkDOs bool) {
|
||||
if dd.cases != nil {
|
||||
panic(fmt.Sprintf("already have switch cases assigned, cannot assign %s", change))
|
||||
}
|
||||
if dd.iter != nil {
|
||||
panic(fmt.Sprintf("already have iter assigned, cannot assign %s", change))
|
||||
}
|
||||
if dd.encapsulated != nil {
|
||||
panic(fmt.Sprintf("already have encap assigned, cannot assign %s @", change))
|
||||
}
|
||||
if checkDOs && dd.ops != nil && len(dd.ops) > 0 {
|
||||
panic(fmt.Sprintf("already have do assigned, cannot assign %s", change))
|
||||
}
|
||||
}
|
||||
|
||||
func (dd *valueDirective) Switch(paths ...CaseValueDirective) ValueDirective {
|
||||
dd.panickIfNotBlackCanvas("new switch", true)
|
||||
dd.cases = paths
|
||||
return dd
|
||||
}
|
||||
|
||||
func (dd *valueDirective) Iter(maxIterations uint32, iter Directive, iterOptions ...IterOption) ValueDirective {
|
||||
dd.panickIfNotBlackCanvas("new iter", true)
|
||||
switch dd.value.(type) {
|
||||
case *uint32:
|
||||
case *uint16:
|
||||
default:
|
||||
panic(fmt.Sprintf("cannot iterate a %T", dd.value))
|
||||
}
|
||||
|
||||
dd.iter = iter
|
||||
dd.maxIterations = maxIterations
|
||||
for _, io := range iterOptions {
|
||||
dd.iterOption = io
|
||||
}
|
||||
return dd
|
||||
}
|
||||
|
||||
func (dd *valueDirective) Encapsulated(maxSize uint32, encapsulated Directive) ValueDirective {
|
||||
dd.panickIfNotBlackCanvas("new encapsulated", true)
|
||||
switch dd.value.(type) {
|
||||
case *uint32:
|
||||
case *uint16:
|
||||
default:
|
||||
panic(fmt.Sprintf("cannot encapsulated on a %T", dd.value))
|
||||
}
|
||||
|
||||
dd.encapsulated = encapsulated
|
||||
dd.maxEncapsulation = maxSize
|
||||
return dd
|
||||
}
|
||||
|
||||
func (dd *valueDirective) Do(ddo DirectiveOp) ValueDirective {
|
||||
dd.panickIfNotBlackCanvas("new do", false)
|
||||
for {
|
||||
if ddo.prev() == nil {
|
||||
break
|
||||
}
|
||||
ddo = ddo.prev()
|
||||
}
|
||||
if err := ddo.process(nil, dd.value); err != nil {
|
||||
panic(fmt.Sprintf("directive operation %T cannot process %T - %s", ddo, dd.value, err))
|
||||
}
|
||||
if dd.ops == nil {
|
||||
dd.ops = make([]DirectiveOp, 0, 5)
|
||||
}
|
||||
dd.ops = append(dd.ops, ddo)
|
||||
|
||||
return dd
|
||||
}
|
||||
|
||||
func (dd *valueDirective) Ref(ref *interface{}) ValueDirective {
|
||||
if *ref != nil {
|
||||
panic("ref already assigned, not overwritting")
|
||||
}
|
||||
*ref = dd
|
||||
return dd
|
||||
}
|
||||
|
||||
// errorDirective a decode directive that reports an error
|
||||
type errorDirective struct {
|
||||
Directive
|
||||
}
|
||||
|
||||
func (dd *errorDirective) Execute(buffer *bytes.Buffer, dc *DecodeContext) error {
|
||||
return fmt.Errorf("Error Directive")
|
||||
}
|
||||
|
||||
// CaseValueDirective is a decode directive that also has a switch/case test
|
||||
type CaseValueDirective interface {
|
||||
Directive
|
||||
Equals(interface{}) bool
|
||||
}
|
||||
|
||||
type caseValueDirective struct {
|
||||
caseValue interface{}
|
||||
isDefault bool
|
||||
equalsDd Directive
|
||||
}
|
||||
|
||||
func (dd *caseValueDirective) Execute(buffer *bytes.Buffer, dc *DecodeContext) error {
|
||||
if dd.equalsDd == nil {
|
||||
return nil
|
||||
}
|
||||
return dd.equalsDd.Execute(buffer, dc)
|
||||
}
|
||||
|
||||
func (dd *caseValueDirective) Equals(value interface{}) bool {
|
||||
if dd.isDefault {
|
||||
return true
|
||||
}
|
||||
switch ourV := dd.caseValue.(type) {
|
||||
case uint32:
|
||||
ov, ok := value.(*uint32)
|
||||
if ok {
|
||||
return ourV == *ov
|
||||
}
|
||||
case uint16:
|
||||
ov, ok := value.(*uint16)
|
||||
if ok {
|
||||
return ourV == *ov
|
||||
}
|
||||
case byte:
|
||||
ov, ok := value.([]byte)
|
||||
if ok {
|
||||
if len(ov) == 1 {
|
||||
return ourV == ov[0]
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// sequenceDirective is a decode directive that is a simple sequentially executed list of other decode directives
|
||||
type sequenceDirective struct {
|
||||
decoders []Directive
|
||||
}
|
||||
|
||||
func (di *sequenceDirective) Execute(buffer *bytes.Buffer, dc *DecodeContext) error {
|
||||
for _, innerDD := range di.decoders {
|
||||
if err := innerDD.Execute(buffer, dc); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// openMetric a decode directive that opens the recording of new fields and tags
|
||||
type openMetric struct {
|
||||
name string
|
||||
}
|
||||
|
||||
func (di *openMetric) Execute(buffer *bytes.Buffer, dc *DecodeContext) error {
|
||||
dc.openMetric(di.name)
|
||||
return nil
|
||||
}
|
||||
|
||||
// closeMetric a decode directive that closes the current open metric
|
||||
type closeMetric struct {
|
||||
}
|
||||
|
||||
func (di *closeMetric) Execute(buffer *bytes.Buffer, dc *DecodeContext) error {
|
||||
dc.closeMetric()
|
||||
return nil
|
||||
}
|
||||
|
||||
// DecodeContext provides context for the decoding of a packet and primarily acts
|
||||
// as a repository for metrics that are collected during the packet decode process
|
||||
type DecodeContext struct {
|
||||
metrics []telegraf.Metric
|
||||
timeHasBeenSet bool
|
||||
|
||||
// oreMetric is used to capture tags or fields that may be recored before a metric has been openned
|
||||
// these fields and tags are then copied into metrics that are then subsequently opened
|
||||
preMetric telegraf.Metric
|
||||
current telegraf.Metric
|
||||
nano int
|
||||
}
|
||||
|
||||
func (dc *DecodeContext) openMetric(name string) {
|
||||
t := dc.preMetric.Time()
|
||||
if !dc.timeHasBeenSet {
|
||||
t = time.Now().Add(time.Duration(dc.nano))
|
||||
}
|
||||
m, _ := metric.New(name, make(map[string]string), make(map[string]interface{}), t)
|
||||
dc.nano++
|
||||
// make sure to copy any fields and tags that were capture prior to the metric being openned
|
||||
for t, v := range dc.preMetric.Tags() {
|
||||
m.AddTag(t, v)
|
||||
}
|
||||
for f, v := range dc.preMetric.Fields() {
|
||||
m.AddField(f, v)
|
||||
}
|
||||
dc.current = m
|
||||
}
|
||||
|
||||
func (dc *DecodeContext) closeMetric() {
|
||||
if dc.current != nil {
|
||||
dc.metrics = append(dc.metrics, dc.current)
|
||||
}
|
||||
dc.current = nil
|
||||
}
|
||||
|
||||
func (dc *DecodeContext) currentMetric() telegraf.Metric {
|
||||
if dc.current == nil {
|
||||
return dc.preMetric
|
||||
}
|
||||
return dc.current
|
||||
}
|
||||
|
||||
// Decode initiates the decoding of the supplied buffer according to the root decode directive that is provided
|
||||
func (dc *DecodeContext) Decode(dd Directive, buffer *bytes.Buffer) error {
|
||||
return dd.Execute(buffer, dc)
|
||||
}
|
||||
|
||||
// GetMetrics answers the metrics that have been collected during the packet decode
|
||||
func (dc *DecodeContext) GetMetrics() []telegraf.Metric {
|
||||
return dc.metrics
|
||||
}
|
||||
|
||||
type notifyDirective struct {
|
||||
fn func()
|
||||
}
|
||||
|
||||
func (nd *notifyDirective) Execute(_ *bytes.Buffer, dc *DecodeContext) error {
|
||||
if dc != nil {
|
||||
nd.fn()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
582
plugins/inputs/sflow/decoder/directives_test.go
Normal file
582
plugins/inputs/sflow/decoder/directives_test.go
Normal file
@@ -0,0 +1,582 @@
|
||||
package decoder
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Execute will ececute the decode directive relative to the supplied buffer
|
||||
func Execute(dd Directive, buffer *bytes.Buffer) error {
|
||||
dc := &DecodeContext{}
|
||||
return dd.Execute(buffer, dc)
|
||||
}
|
||||
|
||||
func Test_basicUI32NotEnoughBytes(t *testing.T) {
|
||||
dd := U32()
|
||||
value := uint16(1001) // not enough bytes to read a U32 out as only a U16 in
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value))
|
||||
require.Error(t, Execute(dd, &buffer))
|
||||
}
|
||||
|
||||
func Test_basicUI32(t *testing.T) {
|
||||
dd := U32()
|
||||
value := uint32(1001)
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value))
|
||||
require.NoError(t, Execute(dd, &buffer))
|
||||
require.Equal(t, 0, buffer.Len())
|
||||
x, _ := dd.(*valueDirective)
|
||||
require.Equal(t, &value, x.value)
|
||||
}
|
||||
|
||||
func Test_basicBytes(t *testing.T) {
|
||||
dd := Bytes(4)
|
||||
value := []byte{0x01, 0x02, 0x03, 0x04}
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value))
|
||||
require.NoError(t, Execute(dd, &buffer))
|
||||
require.Equal(t, 0, buffer.Len())
|
||||
x, _ := dd.(*valueDirective)
|
||||
require.Equal(t, value, x.value)
|
||||
}
|
||||
|
||||
func Test_basicSeq(t *testing.T) {
|
||||
|
||||
// Seq with no members compiles and executed but buffer is left untouched
|
||||
dd := Seq()
|
||||
value := uint32(1001)
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value))
|
||||
originalLen := buffer.Len()
|
||||
require.NoError(t, Execute(dd, &buffer))
|
||||
require.Equal(t, originalLen, buffer.Len())
|
||||
|
||||
u := U32()
|
||||
dd = Seq(
|
||||
u,
|
||||
)
|
||||
value = uint32(1001)
|
||||
buffer.Reset()
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value))
|
||||
require.NoError(t, Execute(dd, &buffer))
|
||||
require.Equal(t, 0, buffer.Len())
|
||||
x, _ := u.(*valueDirective)
|
||||
require.Equal(t, &value, x.value)
|
||||
}
|
||||
|
||||
func Test_basicSeqOf(t *testing.T) {
|
||||
// SeqOf with no members compiles and executed but buffer is left untouched
|
||||
dd := SeqOf([]Directive{})
|
||||
value := uint32(1001)
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value))
|
||||
originalLen := buffer.Len()
|
||||
require.NoError(t, Execute(dd, &buffer))
|
||||
require.Equal(t, originalLen, buffer.Len())
|
||||
|
||||
u := U32()
|
||||
dd = SeqOf(
|
||||
[]Directive{u},
|
||||
)
|
||||
value = uint32(1001)
|
||||
buffer.Reset()
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value))
|
||||
require.NoError(t, Execute(dd, &buffer))
|
||||
require.Equal(t, 0, buffer.Len())
|
||||
x, _ := u.(*valueDirective)
|
||||
require.Equal(t, &value, x.value)
|
||||
}
|
||||
|
||||
func Test_errorInSeq(t *testing.T) {
|
||||
// Seq with no members compiles and executed but buffer is left untouched
|
||||
dd := Seq(U32(), ErrorDirective())
|
||||
value := uint32(1001)
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value))
|
||||
require.Error(t, Execute(dd, &buffer))
|
||||
}
|
||||
|
||||
func Test_basicU32Switch(t *testing.T) {
|
||||
c1 := U32()
|
||||
c2 := U32()
|
||||
dd := U32().Switch(
|
||||
Case(uint32(1), c1),
|
||||
Case(uint32(2), c2),
|
||||
)
|
||||
|
||||
value1 := uint32(3)
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value1))
|
||||
value2 := uint32(4)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value2))
|
||||
require.Error(t, Execute(dd, &buffer)) // should error as no path
|
||||
|
||||
value1 = uint32(1)
|
||||
buffer.Reset()
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value1))
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value2))
|
||||
require.NoError(t, Execute(dd, &buffer))
|
||||
x, _ := c1.(*valueDirective)
|
||||
y, _ := c2.(*valueDirective)
|
||||
value0 := uint32(0)
|
||||
require.Equal(t, &value2, x.value)
|
||||
require.Equal(t, &value0, y.value)
|
||||
|
||||
// bad path shoudl raise error
|
||||
// path 1 should be able to fina value in c1 and not in c2
|
||||
// then other way around
|
||||
}
|
||||
|
||||
func Test_basicBinSwitch(t *testing.T) {
|
||||
c1 := U32()
|
||||
c2 := U32()
|
||||
dd := Bytes(1).Switch(
|
||||
Case(byte(1), c1),
|
||||
Case(byte(2), c2),
|
||||
)
|
||||
|
||||
value1 := byte(3)
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value1))
|
||||
value2 := uint32(4)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value2))
|
||||
require.Error(t, Execute(dd, &buffer)) // should error as no path
|
||||
|
||||
value1 = byte(1)
|
||||
buffer.Reset()
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value1))
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value2))
|
||||
require.NoError(t, Execute(dd, &buffer))
|
||||
x, _ := c1.(*valueDirective)
|
||||
y, _ := c2.(*valueDirective)
|
||||
value0 := uint32(0)
|
||||
require.Equal(t, &value2, x.value)
|
||||
require.Equal(t, &value0, y.value)
|
||||
|
||||
// bad path shoudl raise error
|
||||
// path 1 should be able to fina value in c1 and not in c2
|
||||
// then other way around
|
||||
}
|
||||
|
||||
func Test_basicIter(t *testing.T) {
|
||||
innerDD := U32()
|
||||
dd := U32().Iter(math.MaxInt32, innerDD)
|
||||
|
||||
var buffer bytes.Buffer
|
||||
iterations := uint32(2)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &iterations))
|
||||
it1Val := uint32(3)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it1Val))
|
||||
it2Val := uint32(4)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it2Val))
|
||||
require.NoError(t, Execute(dd, &buffer))
|
||||
x, _ := dd.(*valueDirective)
|
||||
require.Equal(t, &iterations, x.value)
|
||||
y, _ := innerDD.(*valueDirective)
|
||||
// we can't test it1Val as it gets overwritten!
|
||||
require.Equal(t, &it2Val, y.value)
|
||||
}
|
||||
|
||||
func Test_IterLimit(t *testing.T) {
|
||||
innerDD := U32()
|
||||
dd := U32().Iter(1, innerDD) // limit set at 1
|
||||
var buffer bytes.Buffer
|
||||
iterations := uint32(2)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &iterations))
|
||||
it1Val := uint32(3)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it1Val))
|
||||
it2Val := uint32(4)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it2Val))
|
||||
require.Error(t, Execute(dd, &buffer))
|
||||
}
|
||||
|
||||
func Test_errorWithinIter(t *testing.T) {
|
||||
dd := U32().Iter(math.MaxInt32, ErrorDirective())
|
||||
|
||||
var buffer bytes.Buffer
|
||||
iterations := uint32(1)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &iterations))
|
||||
|
||||
require.Error(t, Execute(dd, &buffer))
|
||||
}
|
||||
|
||||
func Test_errorWithinIter2(t *testing.T) {
|
||||
dd := U32().Iter(math.MaxInt32, U32().Do(ErrorOp(false)))
|
||||
var buffer bytes.Buffer
|
||||
iterations := uint32(1)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &iterations))
|
||||
innerValue := uint32(1)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &innerValue))
|
||||
require.Error(t, Execute(dd, &buffer))
|
||||
}
|
||||
|
||||
func Test_errorWithinIter3(t *testing.T) {
|
||||
defer expectPanic(t, "Test_cantIterBytes")
|
||||
U32().Iter(math.MaxInt32, U32().Do(ErrorOp(true)))
|
||||
}
|
||||
|
||||
func Test_alreadyEncapsulated(t *testing.T) {
|
||||
defer expectPanic(t, "Test_cantIterBytes")
|
||||
u := U32()
|
||||
inner := U32()
|
||||
u.Encapsulated(math.MaxInt32, inner)
|
||||
u.Encapsulated(math.MaxInt32, inner)
|
||||
}
|
||||
|
||||
func Test_alreadyDoAssigned(t *testing.T) {
|
||||
defer expectPanic(t, "Test_cantIterBytes")
|
||||
u := U32()
|
||||
u.Do(AsF("foo"))
|
||||
inner := U32()
|
||||
u.Encapsulated(math.MaxInt32, inner)
|
||||
}
|
||||
|
||||
func Test_cantIterBytes(t *testing.T) {
|
||||
defer expectPanic(t, "Test_cantIterBytes")
|
||||
_ = Bytes(1).Iter(math.MaxInt32, U32())
|
||||
}
|
||||
|
||||
// then open metric
|
||||
func Test_OpenMetric(t *testing.T) {
|
||||
innerDD := U32()
|
||||
dd := U32().Iter(math.MaxInt32, Seq(
|
||||
OpenMetric(""),
|
||||
innerDD,
|
||||
CloseMetric(),
|
||||
))
|
||||
|
||||
var buffer bytes.Buffer
|
||||
iterations := uint32(2)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &iterations))
|
||||
it1Val := uint32(3)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it1Val))
|
||||
it2Val := uint32(3)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it2Val))
|
||||
dc := NewDecodeContext()
|
||||
require.NoError(t, dc.Decode(dd, &buffer))
|
||||
require.Equal(t, 2, len(dc.GetMetrics()))
|
||||
}
|
||||
|
||||
func Test_AsF(t *testing.T) {
|
||||
innerDD := U32().Do(AsF("foo"))
|
||||
dd := U32().Iter(math.MaxInt32, Seq(
|
||||
OpenMetric(""),
|
||||
innerDD,
|
||||
CloseMetric(),
|
||||
))
|
||||
|
||||
var buffer bytes.Buffer
|
||||
iterations := uint32(2)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &iterations))
|
||||
it1Val := uint32(3)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it1Val))
|
||||
it2Val := uint32(3)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it2Val))
|
||||
dc := NewDecodeContext()
|
||||
require.NoError(t, dc.Decode(dd, &buffer))
|
||||
require.Equal(t, 2, len(dc.GetMetrics()))
|
||||
m := dc.GetMetrics()
|
||||
require.Equal(t, uint64(it1Val), getField(m[0], "foo"))
|
||||
require.Equal(t, uint64(it2Val), getField(m[1], "foo"))
|
||||
}
|
||||
|
||||
func Test_AsT(t *testing.T) {
|
||||
innerDD := U32().Do(AsT("foo"))
|
||||
dd := U32().Iter(math.MaxInt32, Seq(
|
||||
OpenMetric(""),
|
||||
innerDD,
|
||||
CloseMetric(),
|
||||
))
|
||||
|
||||
var buffer bytes.Buffer
|
||||
iterations := uint32(2)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &iterations))
|
||||
it1Val := uint32(3)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it1Val))
|
||||
it2Val := uint32(3)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it2Val))
|
||||
dc := NewDecodeContext()
|
||||
require.NoError(t, dc.Decode(dd, &buffer))
|
||||
require.Equal(t, 2, len(dc.GetMetrics()))
|
||||
m := dc.GetMetrics()
|
||||
require.Equal(t, fmt.Sprintf("%d", it1Val), getTag(m[0], "foo"))
|
||||
require.Equal(t, fmt.Sprintf("%d", it2Val), getTag(m[1], "foo"))
|
||||
}
|
||||
|
||||
func getField(m telegraf.Metric, name string) interface{} {
|
||||
v, _ := m.GetField(name)
|
||||
return v
|
||||
}
|
||||
|
||||
func getTag(m telegraf.Metric, name string) string {
|
||||
v, _ := m.GetTag(name)
|
||||
return v
|
||||
}
|
||||
|
||||
func Test_preMetricNesting(t *testing.T) {
|
||||
innerDD := U32().Do(AsF("foo"))
|
||||
dd := Seq(
|
||||
U32().Do(AsF("bar")),
|
||||
U32().Do(AsT("baz")),
|
||||
U32().Iter(math.MaxInt32,
|
||||
Seq(
|
||||
OpenMetric(""),
|
||||
innerDD,
|
||||
CloseMetric(),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
var buffer bytes.Buffer
|
||||
barVal := uint32(55)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &barVal))
|
||||
bazVal := uint32(56)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &bazVal))
|
||||
iterations := uint32(2)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &iterations))
|
||||
it1Val := uint32(3)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it1Val))
|
||||
it2Val := uint32(3)
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it2Val))
|
||||
dc := NewDecodeContext()
|
||||
require.NoError(t, dc.Decode(dd, &buffer))
|
||||
require.Equal(t, 2, len(dc.GetMetrics()))
|
||||
m := dc.GetMetrics()
|
||||
require.Equal(t, uint64(barVal), getField(m[0], "bar"))
|
||||
require.Equal(t, fmt.Sprintf("%d", bazVal), getTag(m[0], "baz"))
|
||||
require.Equal(t, uint64(it1Val), getField(m[0], "foo"))
|
||||
require.Equal(t, uint64(barVal), getField(m[1], "bar"))
|
||||
require.Equal(t, fmt.Sprintf("%d", bazVal), getTag(m[1], "baz"))
|
||||
require.Equal(t, uint64(it2Val), getField(m[1], "foo"))
|
||||
}
|
||||
|
||||
func Test_BasicEncapsulated(t *testing.T) {
|
||||
|
||||
encap1Value := uint32(2)
|
||||
encap2Value := uint32(3)
|
||||
var encapBuffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&encapBuffer, binary.BigEndian, &encap1Value))
|
||||
require.NoError(t, binary.Write(&encapBuffer, binary.BigEndian, &encap2Value))
|
||||
|
||||
encapSize := uint32(encapBuffer.Len())
|
||||
envelopeValue := uint32(4)
|
||||
var envelopeBuffer bytes.Buffer
|
||||
|
||||
require.NoError(t, binary.Write(&envelopeBuffer, binary.BigEndian, &encapSize))
|
||||
l, e := envelopeBuffer.Write(encapBuffer.Bytes())
|
||||
require.NoError(t, e)
|
||||
require.Equal(t, encapSize, uint32(l))
|
||||
require.NoError(t, binary.Write(&envelopeBuffer, binary.BigEndian, &envelopeValue))
|
||||
|
||||
innerDD := U32()
|
||||
envelopeDD := U32() // the buffer contains another U32 but the encpaultation will ignore it
|
||||
dd := Seq(
|
||||
U32().Encapsulated(math.MaxInt32, innerDD),
|
||||
envelopeDD,
|
||||
)
|
||||
require.NoError(t, Execute(dd, &envelopeBuffer))
|
||||
|
||||
require.Equal(t, 0, envelopeBuffer.Len())
|
||||
x, _ := envelopeDD.(*valueDirective)
|
||||
require.Equal(t, &envelopeValue, x.value)
|
||||
y, _ := innerDD.(*valueDirective)
|
||||
require.Equal(t, &encap1Value, y.value)
|
||||
}
|
||||
|
||||
func Test_EncapsulationLimit(t *testing.T) {
|
||||
|
||||
encap1Value := uint32(2)
|
||||
encap2Value := uint32(3)
|
||||
var encapBuffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&encapBuffer, binary.BigEndian, &encap1Value))
|
||||
require.NoError(t, binary.Write(&encapBuffer, binary.BigEndian, &encap2Value))
|
||||
|
||||
encapSize := uint32(encapBuffer.Len())
|
||||
envelopeValue := uint32(4)
|
||||
var envelopeBuffer bytes.Buffer
|
||||
|
||||
require.NoError(t, binary.Write(&envelopeBuffer, binary.BigEndian, &encapSize))
|
||||
l, e := envelopeBuffer.Write(encapBuffer.Bytes())
|
||||
require.NoError(t, e)
|
||||
require.Equal(t, encapSize, uint32(l))
|
||||
require.NoError(t, binary.Write(&envelopeBuffer, binary.BigEndian, &envelopeValue))
|
||||
|
||||
innerDD := U32()
|
||||
envelopeDD := U32()
|
||||
dd := Seq(
|
||||
U32().Encapsulated(4, innerDD), // 4 bytes, not 8 bytes or higher as max
|
||||
envelopeDD,
|
||||
)
|
||||
require.Error(t, Execute(dd, &envelopeBuffer))
|
||||
}
|
||||
|
||||
func Test_cantEncapulatedBytes(t *testing.T) {
|
||||
defer expectPanic(t, "cantEncapulatedBytes")
|
||||
_ = Bytes(1).Encapsulated(math.MaxInt32, U32())
|
||||
}
|
||||
|
||||
func Test_BasicRef(t *testing.T) {
|
||||
var x interface{}
|
||||
dd1 := U32().Ref(&x)
|
||||
dd2 := Ref(x)
|
||||
dd := Seq(
|
||||
dd1,
|
||||
dd2,
|
||||
)
|
||||
y, ok := dd2.(*valueDirective)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, y.reference, x)
|
||||
|
||||
value := uint32(1001)
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value))
|
||||
require.NoError(t, Execute(dd, &buffer))
|
||||
|
||||
y, _ = dd1.(*valueDirective)
|
||||
require.Equal(t, &value, y.value)
|
||||
|
||||
y, _ = dd2.(*valueDirective)
|
||||
require.Equal(t, &value, y.value)
|
||||
}
|
||||
|
||||
func Test_RefReassignError(t *testing.T) {
|
||||
defer expectPanic(t, "iter iter")
|
||||
var x interface{}
|
||||
U32().Ref(&x)
|
||||
U32().Ref(&x)
|
||||
}
|
||||
|
||||
func Test_ToU32(t *testing.T) {
|
||||
u := U32().Do(U32ToU32(func(in uint32) uint32 { return in >> 2 }).AsF("x"))
|
||||
dd := Seq(OpenMetric(""), u, CloseMetric())
|
||||
|
||||
value := uint32(1001)
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value))
|
||||
|
||||
dc := NewDecodeContext()
|
||||
require.NoError(t, dc.Decode(dd, &buffer))
|
||||
|
||||
// require original value decoded
|
||||
x, _ := u.(*valueDirective)
|
||||
require.Equal(t, &value, x.value)
|
||||
|
||||
// require field ejected
|
||||
require.Equal(t, 1, len(dc.GetMetrics()))
|
||||
m := dc.GetMetrics()
|
||||
require.Equal(t, uint64(value>>2), getField(m[0], "x"))
|
||||
}
|
||||
|
||||
func expectPanic(t *testing.T, msg string) {
|
||||
if r := recover(); r == nil {
|
||||
t.Errorf(msg)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_U32BlankCanvasIter(t *testing.T) {
|
||||
u := U32().Iter(math.MaxInt32, U32())
|
||||
func() {
|
||||
defer expectPanic(t, "iter iter")
|
||||
u.Iter(math.MaxInt32, U32())
|
||||
}()
|
||||
func() {
|
||||
defer expectPanic(t, "iter switch")
|
||||
u.Switch(Case(uint32(0), U32()))
|
||||
}()
|
||||
func() {
|
||||
defer expectPanic(t, "iter encap")
|
||||
u.Encapsulated(math.MaxInt32, U32())
|
||||
}()
|
||||
func() {
|
||||
defer expectPanic(t, "iter do")
|
||||
u.Do(AsF("foo"))
|
||||
}()
|
||||
}
|
||||
func Test_U32BlankCanvasSwitch(t *testing.T) {
|
||||
u := U32().Switch(Case(uint32(0), U32()))
|
||||
func() {
|
||||
defer expectPanic(t, "switch iter")
|
||||
u.Iter(math.MaxInt32, U32())
|
||||
}()
|
||||
func() {
|
||||
defer expectPanic(t, "switch switch")
|
||||
u.Switch(Case(uint32(0), U32()))
|
||||
}()
|
||||
func() {
|
||||
defer expectPanic(t, "switch encap")
|
||||
u.Encapsulated(math.MaxInt32, U32())
|
||||
}()
|
||||
func() {
|
||||
defer expectPanic(t, "switch do")
|
||||
u.Do(AsF("foo"))
|
||||
}()
|
||||
}
|
||||
|
||||
func Test_U32BasicSwitch(t *testing.T) {
|
||||
s := U32().Switch(Case(uint32(0), nil))
|
||||
value := uint32(0)
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value))
|
||||
dc := NewDecodeContext()
|
||||
require.NoError(t, dc.Decode(s, &buffer))
|
||||
}
|
||||
|
||||
func Test_U32BasicSwitchDefault(t *testing.T) {
|
||||
s := U32().Switch(Case(uint32(0), nil), DefaultCase(nil))
|
||||
value := uint32(2)
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value))
|
||||
dc := NewDecodeContext()
|
||||
require.NoError(t, dc.Decode(s, &buffer))
|
||||
}
|
||||
|
||||
func Test_U16(t *testing.T) {
|
||||
dd := U16()
|
||||
value := uint16(1001)
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value))
|
||||
require.NoError(t, Execute(dd, &buffer))
|
||||
require.Equal(t, 0, buffer.Len())
|
||||
x, _ := dd.(*valueDirective)
|
||||
require.Equal(t, &value, x.value)
|
||||
}
|
||||
|
||||
func Test_U16Value(t *testing.T) {
|
||||
myU16 := uint16(5)
|
||||
dd := U16Value(&myU16)
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, Execute(dd, &buffer))
|
||||
x, _ := dd.(*valueDirective)
|
||||
require.Equal(t, &myU16, x.value)
|
||||
}
|
||||
|
||||
func Test_Bytes(t *testing.T) {
|
||||
dd := Bytes(4)
|
||||
value := []byte{0x01, 0x02, 0x03, 0x04}
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value))
|
||||
require.NoError(t, Execute(dd, &buffer))
|
||||
require.Equal(t, 0, buffer.Len())
|
||||
x, _ := dd.(*valueDirective)
|
||||
require.Equal(t, value, x.value)
|
||||
}
|
||||
|
||||
func Test_nilRefAnfWongTypeRef(t *testing.T) {
|
||||
func() {
|
||||
defer expectPanic(t, "Test_nilRef")
|
||||
Ref(nil)
|
||||
}()
|
||||
|
||||
func() {
|
||||
defer expectPanic(t, "Test_nilRef")
|
||||
f := new(uint32)
|
||||
Ref(f)
|
||||
}()
|
||||
}
|
||||
216
plugins/inputs/sflow/decoder/funcs.go
Normal file
216
plugins/inputs/sflow/decoder/funcs.go
Normal file
@@ -0,0 +1,216 @@
|
||||
package decoder
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
)
|
||||
|
||||
// U32 answers a directive for 32bit Unsigned Integers
|
||||
func U32() ValueDirective {
|
||||
return &valueDirective{value: new(uint32)}
|
||||
}
|
||||
|
||||
// U64 answers a directive for 64bit Unsigned Integers
|
||||
func U64() ValueDirective {
|
||||
return &valueDirective{value: new(uint64)}
|
||||
}
|
||||
|
||||
// U8 answers a directive for 8bit Unsigned Integers
|
||||
func U8() ValueDirective {
|
||||
return &valueDirective{value: new(uint8)}
|
||||
}
|
||||
|
||||
// U16 answers a directive for 32bit Unsigned Integers
|
||||
func U16() ValueDirective {
|
||||
return &valueDirective{value: new(uint16)}
|
||||
}
|
||||
|
||||
// U16Value answers a directive that doesn't actually decode itself but reused a value previously decoded of type uint16
|
||||
func U16Value(value *uint16) ValueDirective {
|
||||
return &valueDirective{value: value, noDecode: true}
|
||||
}
|
||||
|
||||
// Bytes answers a value directive that will decode the specified number (len) of bytes from the packet
|
||||
func Bytes(len int) ValueDirective {
|
||||
return &valueDirective{value: make([]byte, len)}
|
||||
}
|
||||
|
||||
// Case answers a directive to be used within a Switch clause of a U32 directive
|
||||
func Case(caseValue interface{}, dd Directive) CaseValueDirective {
|
||||
return &caseValueDirective{caseValue: caseValue, isDefault: false, equalsDd: dd}
|
||||
}
|
||||
|
||||
// DefaultCase answers a case decoder directive that can be used as the default, catch all, of a Switch
|
||||
func DefaultCase(dd Directive) CaseValueDirective {
|
||||
return &caseValueDirective{caseValue: nil, isDefault: true, equalsDd: dd}
|
||||
}
|
||||
|
||||
// Ref answers a decoder that reuses, through referal, an existing U32 directive
|
||||
func Ref(target interface{}) ValueDirective {
|
||||
if target == nil {
|
||||
panic("Ref given a nil reference")
|
||||
}
|
||||
r, ok := target.(*valueDirective)
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("Ref not given a ValueDirective reference but a %T", target))
|
||||
}
|
||||
return &valueDirective{reference: r, value: r.value}
|
||||
}
|
||||
|
||||
// Seq ansers a directive that sequentially executes a list of provided directives
|
||||
func Seq(decoders ...Directive) Directive {
|
||||
return &sequenceDirective{decoders: decoders}
|
||||
}
|
||||
|
||||
func SeqOf(decoders []Directive) Directive {
|
||||
return &sequenceDirective{decoders: decoders}
|
||||
}
|
||||
|
||||
// OpenMetric answers a directive that opens a new metrics for collecting tags and fields
|
||||
func OpenMetric(name string) Directive {
|
||||
return &openMetric{name: name}
|
||||
}
|
||||
|
||||
// CloseMetric answers a directive that close the current metrics
|
||||
func CloseMetric() Directive {
|
||||
return &closeMetric{}
|
||||
}
|
||||
|
||||
// NewDecodeContext ansewers a new Decode Contect to support the process of decoding
|
||||
func NewDecodeContext() *DecodeContext {
|
||||
m, _ := metric.New("sflow", make(map[string]string), make(map[string]interface{}), time.Now())
|
||||
return &DecodeContext{preMetric: m}
|
||||
}
|
||||
|
||||
// U32ToU32 answers a decode operation that transforms a uint32 to a uint32 via the supplied fn
|
||||
func U32ToU32(fn func(uint32) uint32) *U32ToU32DOp {
|
||||
result := &U32ToU32DOp{fn: fn, baseDOp: baseDOp{}}
|
||||
result.do = result
|
||||
return result
|
||||
}
|
||||
|
||||
// U32ToStr answers a decode operation that transforms a uint32 to a string via the supplied fn
|
||||
func U32ToStr(fn func(uint32) string) *U32ToStrDOp {
|
||||
result := &U32ToStrDOp{baseDOp: baseDOp{}, fn: fn}
|
||||
result.do = result
|
||||
return result
|
||||
}
|
||||
|
||||
// U16ToStr answers a decode operation that transforms a uint16 to a string via the supplied fn
|
||||
func U16ToStr(fn func(uint16) string) *U16ToStrDOp {
|
||||
result := &U16ToStrDOp{baseDOp: baseDOp{}, fn: fn}
|
||||
result.do = result
|
||||
return result
|
||||
}
|
||||
|
||||
// U16ToU16 answers a decode operation that transforms a uint16 to a uint16 via the supplied fn
|
||||
func U16ToU16(fn func(uint16) uint16) *U16ToU16DOp {
|
||||
result := &U16ToU16DOp{baseDOp: baseDOp{}, fn: fn}
|
||||
result.do = result
|
||||
return result
|
||||
}
|
||||
|
||||
// AsF answers a decode operation that will output a field into the open metric with the given name
|
||||
func AsF(name string) *AsFDOp {
|
||||
result := &AsFDOp{baseDOp: baseDOp{}, name: name}
|
||||
result.do = result
|
||||
return result
|
||||
}
|
||||
|
||||
// AsT answers a decode operation that will output a tag into the open metric with the given name
|
||||
func AsT(name string) *AsTDOp {
|
||||
result := &AsTDOp{name: name, baseDOp: baseDOp{}}
|
||||
result.do = result
|
||||
return result
|
||||
}
|
||||
|
||||
// AsTimestamp answers a decode operation that will set the tiemstamp on the metric
|
||||
func AsTimestamp() *AsTimestampDOp {
|
||||
result := &AsTimestampDOp{baseDOp: baseDOp{}}
|
||||
result.do = result
|
||||
return result
|
||||
}
|
||||
|
||||
// BytesToStr answers a decode operation that transforms a []bytes to a string via the supplied fn
|
||||
func BytesToStr(len int, fn func([]byte) string) *BytesToStrDOp {
|
||||
result := &BytesToStrDOp{baseDOp: baseDOp{}, len: len, fn: fn}
|
||||
result.do = result
|
||||
return result
|
||||
}
|
||||
|
||||
// BytesTo answers a decode operation that transforms a []bytes to a interface{} via the supplied fn
|
||||
func BytesTo(len int, fn func([]byte) interface{}) *BytesToDOp {
|
||||
result := &BytesToDOp{baseDOp: baseDOp{}, len: len, fn: fn}
|
||||
result.do = result
|
||||
return result
|
||||
}
|
||||
|
||||
// BytesToU32 answers a decode operation that transforms a []bytes to an uint32 via the supplied fn
|
||||
func BytesToU32(len int, fn func([]byte) uint32) *BytesToU32DOp {
|
||||
result := &BytesToU32DOp{baseDOp: baseDOp{}, len: len, fn: fn}
|
||||
result.do = result
|
||||
return result
|
||||
}
|
||||
|
||||
// MapU32ToStr answers a decode operation that maps an uint32 to a string via the supplied map
|
||||
func MapU32ToStr(m map[uint32]string) *U32ToStrDOp {
|
||||
result := &U32ToStrDOp{fn: func(in uint32) string {
|
||||
return m[in]
|
||||
}, baseDOp: baseDOp{}}
|
||||
result.do = result
|
||||
return result
|
||||
}
|
||||
|
||||
// U32Assert answers a decode operation that will assert the uint32 is a particulr value or generate an error
|
||||
func U32Assert(fn func(v uint32) bool, fmtStr string) *U32AssertDOp {
|
||||
result := &U32AssertDOp{baseDOp: baseDOp{}, fn: fn, fmtStr: fmtStr}
|
||||
result.do = result
|
||||
return result
|
||||
}
|
||||
|
||||
func U16Assert(fn func(v uint16) bool, fmtStr string) *U16AssertDOp {
|
||||
result := &U16AssertDOp{baseDOp: baseDOp{}, fn: fn, fmtStr: fmtStr}
|
||||
result.do = result
|
||||
return result
|
||||
}
|
||||
|
||||
// MapU16ToStr answers a decode operation that maps an uint16 to a string via the supplied map
|
||||
func MapU16ToStr(m map[uint16]string) *U16ToStrDOp {
|
||||
result := &U16ToStrDOp{baseDOp: baseDOp{}, fn: func(in uint16) string {
|
||||
return m[in]
|
||||
}}
|
||||
result.do = result
|
||||
return result
|
||||
}
|
||||
|
||||
// Set answers a decode operation that will set the supplied *value to the value passed through the operation
|
||||
func Set(ptr interface{}) *SetDOp {
|
||||
result := &SetDOp{ptr: ptr, baseDOp: baseDOp{}}
|
||||
result.do = result
|
||||
return result
|
||||
}
|
||||
|
||||
// ErrorDirective answers a decode directive that will generate an error
|
||||
func ErrorDirective() Directive {
|
||||
return &errorDirective{}
|
||||
}
|
||||
|
||||
// ErrorOp answers a decode operation that will generate an error
|
||||
func ErrorOp(errorOnTestProcess bool) *ErrorDOp {
|
||||
result := &ErrorDOp{baseDOp: baseDOp{}, errorOnTestProcess: errorOnTestProcess}
|
||||
result.do = result
|
||||
return result
|
||||
|
||||
}
|
||||
|
||||
// Notify answers a decode directive that will notify the supplied function upon execution
|
||||
func Notify(fn func()) Directive {
|
||||
return ¬ifyDirective{fn}
|
||||
}
|
||||
|
||||
// Nop answer a decode directive that is the null, benign, deocder
|
||||
func Nop() Directive {
|
||||
return Notify(func() {})
|
||||
}
|
||||
490
plugins/inputs/sflow/decoder/ops.go
Normal file
490
plugins/inputs/sflow/decoder/ops.go
Normal file
@@ -0,0 +1,490 @@
|
||||
package decoder
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
// DirectiveOp are operations that are performed on values that have been decoded.
|
||||
// They are expected to be chained together, in a flow programming style, and the
|
||||
// Decode Directive that they are assigned to then walks back up the linked list to find the root
|
||||
// operation that will then be performed (passing the value down through various transformations)
|
||||
type DirectiveOp interface {
|
||||
prev() DirectiveOp
|
||||
// process method can be executed in two contexts, one to check that the given type
|
||||
// of upstream value can be processed (not to process it) and then to actually process
|
||||
// the upstream value. The difference in reqwuired behaviour is signalled by the presence
|
||||
// of the DecodeContect - if nil. just test, if !nil process
|
||||
process(dc *DecodeContext, upstreamValue interface{}) error
|
||||
}
|
||||
|
||||
type baseDOp struct {
|
||||
p DirectiveOp
|
||||
do DirectiveOp
|
||||
n DirectiveOp
|
||||
}
|
||||
|
||||
func (op *baseDOp) prev() DirectiveOp {
|
||||
return op.p
|
||||
}
|
||||
|
||||
func (op *baseDOp) AsF(name string) DirectiveOp {
|
||||
result := &AsFDOp{baseDOp: baseDOp{p: op.do}, name: name}
|
||||
result.do = result
|
||||
op.n = result
|
||||
return result
|
||||
}
|
||||
|
||||
func (op *baseDOp) AsT(name string) DirectiveOp {
|
||||
result := &AsTDOp{baseDOp: baseDOp{p: op.do}, name: name}
|
||||
result.do = result
|
||||
op.n = result
|
||||
return result
|
||||
}
|
||||
|
||||
func (op *baseDOp) Set(ptr interface{}) *SetDOp {
|
||||
result := &SetDOp{baseDOp: baseDOp{p: op.do}, ptr: ptr}
|
||||
result.do = result
|
||||
op.n = result
|
||||
return result
|
||||
}
|
||||
|
||||
// U32ToU32DOp is a deode operation that can process U32 to U32
|
||||
type U32ToU32DOp struct {
|
||||
baseDOp
|
||||
fn func(uint32) uint32
|
||||
}
|
||||
|
||||
func (op *U32ToU32DOp) process(dc *DecodeContext, upstreamValue interface{}) error {
|
||||
var out uint32
|
||||
switch v := upstreamValue.(type) {
|
||||
case *uint32:
|
||||
if dc != nil {
|
||||
out = op.fn(*v)
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("cannot process %T", v)
|
||||
}
|
||||
|
||||
if dc != nil && op.n != nil {
|
||||
return op.n.process(dc, out)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ToString answers a U32ToStr decode operation that will transform this output of thie U32ToU32 into a string
|
||||
func (op *U32ToU32DOp) ToString(fn func(uint32) string) *U32ToStrDOp {
|
||||
result := &U32ToStrDOp{baseDOp: baseDOp{p: op}, fn: fn}
|
||||
result.do = result
|
||||
op.n = result
|
||||
return result
|
||||
}
|
||||
|
||||
// AsFDOp is a deode operation that writes fields to metrics
|
||||
type AsFDOp struct {
|
||||
baseDOp
|
||||
name string
|
||||
}
|
||||
|
||||
func (op *AsFDOp) process(dc *DecodeContext, upstreamValue interface{}) error {
|
||||
var m telegraf.Metric
|
||||
if dc != nil {
|
||||
m = dc.currentMetric()
|
||||
}
|
||||
switch v := upstreamValue.(type) {
|
||||
case *uint64:
|
||||
if dc != nil {
|
||||
m.AddField(op.name, *v)
|
||||
}
|
||||
case *uint32:
|
||||
if dc != nil {
|
||||
m.AddField(op.name, *v)
|
||||
}
|
||||
case uint32:
|
||||
if dc != nil {
|
||||
m.AddField(op.name, v)
|
||||
}
|
||||
case *uint16:
|
||||
if dc != nil {
|
||||
m.AddField(op.name, *v)
|
||||
}
|
||||
case uint16:
|
||||
if dc != nil {
|
||||
m.AddField(op.name, v)
|
||||
}
|
||||
case *uint8:
|
||||
if dc != nil {
|
||||
m.AddField(op.name, *v)
|
||||
}
|
||||
case uint8:
|
||||
if dc != nil {
|
||||
m.AddField(op.name, v)
|
||||
}
|
||||
case string:
|
||||
if dc != nil {
|
||||
m.AddField(op.name, v)
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("AsF cannot process %T", v)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AsTimestampDOp is a deode operation that sets the timestamp on the metric
|
||||
type AsTimestampDOp struct {
|
||||
baseDOp
|
||||
}
|
||||
|
||||
func (op *AsTimestampDOp) process(dc *DecodeContext, upstreamValue interface{}) error {
|
||||
var m telegraf.Metric
|
||||
if dc != nil {
|
||||
m = dc.currentMetric()
|
||||
}
|
||||
switch v := upstreamValue.(type) {
|
||||
case *uint32:
|
||||
if dc != nil {
|
||||
m.SetTime(time.Unix(int64(*v), 0))
|
||||
dc.timeHasBeenSet = true
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("can't process %T", upstreamValue)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AsTDOp is a deode operation that writes tags to metrics
|
||||
type AsTDOp struct {
|
||||
baseDOp
|
||||
name string
|
||||
skipEmpty bool
|
||||
}
|
||||
|
||||
func (op *AsTDOp) process(dc *DecodeContext, upstreamValue interface{}) error {
|
||||
var m telegraf.Metric
|
||||
if dc != nil {
|
||||
m = dc.currentMetric()
|
||||
}
|
||||
switch v := upstreamValue.(type) {
|
||||
case *uint32:
|
||||
if dc != nil {
|
||||
m.AddTag(op.name, fmt.Sprintf("%d", *v))
|
||||
}
|
||||
case uint32:
|
||||
if dc != nil {
|
||||
m.AddTag(op.name, fmt.Sprintf("%d", v))
|
||||
}
|
||||
case *uint16:
|
||||
if dc != nil {
|
||||
m.AddTag(op.name, fmt.Sprintf("%d", *v))
|
||||
}
|
||||
case uint16:
|
||||
if dc != nil {
|
||||
m.AddTag(op.name, fmt.Sprintf("%d", v))
|
||||
}
|
||||
case *uint8:
|
||||
if dc != nil {
|
||||
m.AddTag(op.name, fmt.Sprintf("%d", *v))
|
||||
}
|
||||
case uint8:
|
||||
if dc != nil {
|
||||
m.AddTag(op.name, fmt.Sprintf("%d", v))
|
||||
}
|
||||
case string:
|
||||
if dc != nil {
|
||||
if !op.skipEmpty || v != "" {
|
||||
m.AddTag(op.name, v)
|
||||
}
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("can't process %T", upstreamValue)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (op *AsTDOp) prev() DirectiveOp {
|
||||
return op.p
|
||||
}
|
||||
|
||||
// BytesToStrDOp is a decode operation that transforms []bytes to strings
|
||||
type BytesToStrDOp struct {
|
||||
baseDOp
|
||||
len int
|
||||
fn func([]byte) string
|
||||
}
|
||||
|
||||
func (op *BytesToStrDOp) process(dc *DecodeContext, upstreamValue interface{}) error {
|
||||
switch v := upstreamValue.(type) {
|
||||
case []byte:
|
||||
if len(v) == op.len {
|
||||
if dc != nil {
|
||||
out := op.fn(v)
|
||||
if op.n != nil {
|
||||
return op.n.process(dc, out)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("cannot process len(%d) as requrire %d", len(v), op.len)
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("cannot process %T", upstreamValue)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// U32AssertDOp is a decode operation that asserts a particular uint32 value
|
||||
type U32AssertDOp struct {
|
||||
baseDOp
|
||||
fn func(uint32) bool
|
||||
fmtStr string
|
||||
}
|
||||
|
||||
func (op *U32AssertDOp) process(dc *DecodeContext, upstreamValue interface{}) error {
|
||||
switch v := upstreamValue.(type) {
|
||||
case *uint32:
|
||||
if dc != nil && !op.fn(*v) {
|
||||
return fmt.Errorf(op.fmtStr, *v)
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("cannot process %T", upstreamValue)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// U16AssertDOp is a decode operation that asserts a particular uint32 value
|
||||
type U16AssertDOp struct {
|
||||
baseDOp
|
||||
fn func(uint16) bool
|
||||
fmtStr string
|
||||
}
|
||||
|
||||
func (op *U16AssertDOp) process(dc *DecodeContext, upstreamValue interface{}) error {
|
||||
switch v := upstreamValue.(type) {
|
||||
case *uint16:
|
||||
if dc != nil && !op.fn(*v) {
|
||||
return fmt.Errorf(op.fmtStr, *v)
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("cannot process %T", upstreamValue)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// U32ToStrDOp is a decod eoperation that transforms a uint32 to a string
|
||||
type U32ToStrDOp struct {
|
||||
baseDOp
|
||||
fn func(uint32) string
|
||||
}
|
||||
|
||||
func (op *U32ToStrDOp) process(dc *DecodeContext, upstreamValue interface{}) error {
|
||||
switch v := upstreamValue.(type) {
|
||||
case uint32:
|
||||
if dc != nil && op.n != nil {
|
||||
op.n.process(dc, (op.fn(v)))
|
||||
}
|
||||
case *uint32:
|
||||
if dc != nil && op.n != nil {
|
||||
return op.n.process(dc, (op.fn(*v)))
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("cannot process %T", upstreamValue)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BreakIf answers a BreakIf operation that will break the current decode operation chain, without an error, if the value processed
|
||||
// is the supplied value
|
||||
func (op *U32ToStrDOp) BreakIf(value string) *BreakIfDOp {
|
||||
result := &BreakIfDOp{baseDOp: baseDOp{p: op}, value: value}
|
||||
result.do = result
|
||||
op.n = result
|
||||
return result
|
||||
}
|
||||
|
||||
// U16ToStrDOp is a decode operation that transforms a uint16 to a string
|
||||
type U16ToStrDOp struct {
|
||||
baseDOp
|
||||
fn func(uint16) string
|
||||
}
|
||||
|
||||
func (op *U16ToStrDOp) process(dc *DecodeContext, upstreamValue interface{}) error {
|
||||
switch v := upstreamValue.(type) {
|
||||
case *uint16:
|
||||
if dc != nil {
|
||||
return op.n.process(dc, (op.fn(*v)))
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("cannot process %T", upstreamValue)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BreakIfDOp is a decode operation that will break the current outer iteration
|
||||
type BreakIfDOp struct {
|
||||
baseDOp
|
||||
value string
|
||||
}
|
||||
|
||||
func (op *BreakIfDOp) process(dc *DecodeContext, upstreamValue interface{}) error {
|
||||
switch v := upstreamValue.(type) {
|
||||
case string:
|
||||
if dc != nil {
|
||||
if v != op.value {
|
||||
op.n.process(dc, v)
|
||||
}
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("cannot process %T", upstreamValue)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// U16ToU16DOp is a decode operation that transfirms one uint16 to another uint16
|
||||
type U16ToU16DOp struct {
|
||||
baseDOp
|
||||
fn func(uint16) uint16
|
||||
}
|
||||
|
||||
func (op *U16ToU16DOp) process(dc *DecodeContext, upstreamValue interface{}) error {
|
||||
var out uint16
|
||||
var err error
|
||||
switch v := upstreamValue.(type) {
|
||||
case *uint16:
|
||||
if dc != nil {
|
||||
out = op.fn(*v)
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("cannot process %T", upstreamValue)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if op.n != nil && dc != nil {
|
||||
return op.n.process(dc, out)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BytesToU32DOp is a decode operation that transforms a []byte to a uint32
|
||||
type BytesToU32DOp struct {
|
||||
baseDOp
|
||||
len int
|
||||
fn func([]byte) uint32
|
||||
}
|
||||
|
||||
func (op *BytesToU32DOp) process(dc *DecodeContext, upstreamValue interface{}) error {
|
||||
switch v := upstreamValue.(type) {
|
||||
case []byte:
|
||||
if len(v) == op.len {
|
||||
out := op.fn(v)
|
||||
if op.n != nil {
|
||||
return op.n.process(dc, out)
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("cannot process %T as len(%d) != %d", upstreamValue, v, op.len)
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("cannot process %T", upstreamValue)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetDOp is a decode operation that will Set a pointer to a value to be the value processed
|
||||
type SetDOp struct {
|
||||
baseDOp
|
||||
ptr interface{}
|
||||
}
|
||||
|
||||
func (op *SetDOp) process(dc *DecodeContext, upstreamValue interface{}) error {
|
||||
switch v := upstreamValue.(type) {
|
||||
case *uint32:
|
||||
ptr, ok := op.ptr.(*uint32)
|
||||
if ok {
|
||||
if dc != nil {
|
||||
*ptr = *v
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("cannot process as ptr %T and not *uint32", op.ptr)
|
||||
}
|
||||
case uint32:
|
||||
ptr, ok := op.ptr.(*uint32)
|
||||
if ok {
|
||||
if dc != nil {
|
||||
*ptr = v
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("cannot process as ptr %T and not *uint32", op.ptr)
|
||||
}
|
||||
case *uint16:
|
||||
ptr, ok := op.ptr.(*uint16)
|
||||
if ok {
|
||||
if dc != nil {
|
||||
*ptr = *v
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("cannot process as ptr %T and not *uint16", op.ptr)
|
||||
}
|
||||
case uint16:
|
||||
ptr, ok := op.ptr.(*uint16)
|
||||
if ok {
|
||||
if dc != nil {
|
||||
*ptr = v
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("cannot process as ptr %T and not *uint16", op.ptr)
|
||||
}
|
||||
case string:
|
||||
ptr, ok := op.ptr.(*string)
|
||||
if ok {
|
||||
if dc != nil {
|
||||
*ptr = v
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("cannot process as ptr %T and not *string", op.ptr)
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("cannot process %T", upstreamValue)
|
||||
}
|
||||
if op.n != nil && dc != nil {
|
||||
return op.n.process(dc, upstreamValue)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BytesToDOp is a decode operation that will transform []byte to interface{} according to a suppied function
|
||||
type BytesToDOp struct {
|
||||
baseDOp
|
||||
len int
|
||||
fn func([]byte) interface{}
|
||||
}
|
||||
|
||||
func (op *BytesToDOp) process(dc *DecodeContext, upstreamValue interface{}) error {
|
||||
switch v := upstreamValue.(type) {
|
||||
case []byte:
|
||||
if len(v) == op.len {
|
||||
if dc != nil {
|
||||
out := op.fn(v)
|
||||
return op.n.process(dc, out)
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("cannot process as len:%d required %d", len(v), op.len)
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("cannot process %T", upstreamValue)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ErrorDOp is a decode operation that will generate an error
|
||||
type ErrorDOp struct {
|
||||
baseDOp
|
||||
errorOnTestProcess bool
|
||||
}
|
||||
|
||||
func (op *ErrorDOp) process(dc *DecodeContext, upstreamValue interface{}) error {
|
||||
if dc == nil && !op.errorOnTestProcess {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Error Op")
|
||||
}
|
||||
383
plugins/inputs/sflow/decoder/ops_test.go
Normal file
383
plugins/inputs/sflow/decoder/ops_test.go
Normal file
@@ -0,0 +1,383 @@
|
||||
package decoder
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func Test_U64AsF(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
ddo := AsF("out")
|
||||
in := uint64(5)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, in, getField(m, "out"))
|
||||
}
|
||||
|
||||
func Test_U32AsF(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
ddo := AsF("out")
|
||||
in := uint32(5)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, uint64(in), getField(m, "out"))
|
||||
}
|
||||
|
||||
func Test_U16PtrAsF(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
ddo := AsF("out")
|
||||
in := uint16(5)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, uint64(in), getField(m, "out"))
|
||||
}
|
||||
|
||||
func Test_U16AsF(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
ddo := AsF("out")
|
||||
in := uint16(5)
|
||||
require.NoError(t, ddo.process(dc, in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, uint64(in), getField(m, "out"))
|
||||
}
|
||||
|
||||
func Test_U8AsF(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
ddo := AsF("out")
|
||||
in := uint8(5)
|
||||
require.NoError(t, ddo.process(dc, in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, uint64(in), getField(m, "out"))
|
||||
}
|
||||
|
||||
func Test_U8PtrAsF(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
ddo := AsF("out")
|
||||
in := uint8(5)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, uint64(in), getField(m, "out"))
|
||||
}
|
||||
|
||||
func Test_U32AsT(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
ddo := AsT("out")
|
||||
in := uint32(5)
|
||||
require.NoError(t, ddo.process(dc, in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, fmt.Sprintf("%d", in), getTag(m, "out"))
|
||||
}
|
||||
|
||||
func Test_U32PtrAsT(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
ddo := AsT("out")
|
||||
in := uint32(5)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, fmt.Sprintf("%d", in), getTag(m, "out"))
|
||||
}
|
||||
|
||||
func Test_U16AsT(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
ddo := AsT("out")
|
||||
in := uint16(5)
|
||||
require.NoError(t, ddo.process(dc, in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, fmt.Sprintf("%d", in), getTag(m, "out"))
|
||||
}
|
||||
|
||||
func Test_U16PtrAsT(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
ddo := AsT("out")
|
||||
in := uint16(5)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, fmt.Sprintf("%d", in), getTag(m, "out"))
|
||||
}
|
||||
|
||||
func Test_U8AsT(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
ddo := AsT("out")
|
||||
in := uint8(5)
|
||||
require.NoError(t, ddo.process(dc, in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, fmt.Sprintf("%d", in), getTag(m, "out"))
|
||||
}
|
||||
|
||||
func Test_U8PtrAsT(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
ddo := AsT("out")
|
||||
in := uint8(5)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, fmt.Sprintf("%d", in), getTag(m, "out"))
|
||||
}
|
||||
|
||||
func Test_U32ToU32AsF(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
ddo := U32ToU32(func(i uint32) uint32 { return i * 2 })
|
||||
ddo2 := ddo.AsF("out")
|
||||
require.Equal(t, ddo, ddo2.prev())
|
||||
in := uint32(5)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, uint64(in*2), getField(m, "out"))
|
||||
}
|
||||
|
||||
func Test_U16ToU16AsF(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
ddo := U16ToU16(func(i uint16) uint16 { return i * 2 })
|
||||
ddo2 := ddo.AsF("out")
|
||||
require.Equal(t, ddo, ddo2.prev())
|
||||
in := uint16(5)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, uint64(in*2), getField(m, "out"))
|
||||
}
|
||||
|
||||
func Test_U32ToStrAsT(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
ddo := U32ToStr(func(i uint32) string { return fmt.Sprintf("%d", i*2) })
|
||||
ddo2 := ddo.AsT("out")
|
||||
require.Equal(t, ddo, ddo2.prev())
|
||||
in := uint32(5)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, fmt.Sprintf("%d", (in*2)), getTag(m, "out"))
|
||||
}
|
||||
|
||||
func Test_U16ToStrAsT(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
ddo := U16ToStr(func(i uint16) string { return fmt.Sprintf("%d", i*2) })
|
||||
ddo2 := ddo.AsT("out")
|
||||
require.Equal(t, ddo, ddo2.prev())
|
||||
in := uint16(5)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, fmt.Sprintf("%d", (in*2)), getTag(m, "out"))
|
||||
}
|
||||
|
||||
func Test_MapU32ToStrAsT(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
myMap := map[uint32]string{5: "five"}
|
||||
ddo := MapU32ToStr(myMap)
|
||||
ddo2 := ddo.AsT("out")
|
||||
require.Equal(t, ddo, ddo2.prev())
|
||||
in := uint32(5)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, "five", getTag(m, "out"))
|
||||
}
|
||||
|
||||
func Test_MapU16ToStrAsT(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
myMap := map[uint16]string{5: "five"}
|
||||
ddo := MapU16ToStr(myMap)
|
||||
ddo2 := ddo.AsT("out")
|
||||
require.Equal(t, ddo, ddo2.prev())
|
||||
in := uint16(5)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, "five", getTag(m, "out"))
|
||||
}
|
||||
|
||||
func Test_DecDir_ToU32(t *testing.T) {
|
||||
u := U32().
|
||||
Do(U32ToU32(func(in uint32) uint32 { return in >> 2 }).AsF("out1")).
|
||||
Do(U32ToU32(func(in uint32) uint32 { return in * 2 }).AsF("out2"))
|
||||
dd := Seq(OpenMetric(""), u, CloseMetric())
|
||||
|
||||
value := uint32(1001)
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value))
|
||||
|
||||
dc := NewDecodeContext()
|
||||
require.NoError(t, dc.Decode(dd, &buffer))
|
||||
|
||||
x, _ := u.(*valueDirective)
|
||||
require.Equal(t, &value, x.value)
|
||||
|
||||
// require field ejected
|
||||
require.Equal(t, 1, len(dc.GetMetrics()))
|
||||
m := dc.GetMetrics()
|
||||
require.Equal(t, uint64(value>>2), getField(m[0], "out1"))
|
||||
require.Equal(t, uint64(value*2), getField(m[0], "out2"))
|
||||
}
|
||||
|
||||
func Test_BytesToStrAsT(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
f := func(b []byte) string { return fmt.Sprintf("%d:%d", b[0], b[1]) }
|
||||
ddo := BytesToStr(2, f)
|
||||
ddo2 := ddo.AsT("out")
|
||||
require.Equal(t, ddo, ddo2.prev())
|
||||
in := []byte{0x01, 0x02}
|
||||
require.NoError(t, ddo.process(dc, in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, fmt.Sprintf("%d:%d", in[0], in[1]), getTag(m, "out"))
|
||||
}
|
||||
|
||||
func Test_BytesToAsT(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
f := func(b []byte) interface{} { return fmt.Sprintf("%d:%d", b[0], b[1]) }
|
||||
ddo := BytesTo(2, f)
|
||||
ddo2 := ddo.AsT("out")
|
||||
require.Equal(t, ddo, ddo2.prev())
|
||||
in := []byte{0x01, 0x02}
|
||||
require.NoError(t, ddo.process(dc, in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, fmt.Sprintf("%d:%d", in[0], in[1]), getTag(m, "out"))
|
||||
}
|
||||
|
||||
func Test_BytesToU32AsF(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
f := func(b []byte) uint32 { return uint32(b[0] * b[1]) }
|
||||
ddo := BytesToU32(2, f)
|
||||
ddo2 := ddo.AsF("out")
|
||||
require.Equal(t, ddo, ddo2.prev())
|
||||
in := []byte{0x01, 0x02}
|
||||
require.NoError(t, ddo.process(dc, in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, uint64(in[0]*in[1]), getField(m, "out"))
|
||||
}
|
||||
|
||||
func Test_U32require(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
ddo := U32Assert(func(in uint32) bool { return false }, "bad")
|
||||
in := uint32(5)
|
||||
require.Error(t, ddo.process(dc, &in))
|
||||
}
|
||||
|
||||
func Test_U16require(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
ddo := U16Assert(func(in uint16) bool { return false }, "bad")
|
||||
in := uint16(5)
|
||||
require.Error(t, ddo.process(dc, &in))
|
||||
}
|
||||
|
||||
func Test_Set(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
ptr := new(uint32)
|
||||
ddo := Set(ptr)
|
||||
in := uint32(5)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
require.Equal(t, *ptr, in)
|
||||
}
|
||||
|
||||
func Test_U16Set(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
ptr := new(uint16)
|
||||
ddo := Set(ptr)
|
||||
in := uint16(5)
|
||||
require.NoError(t, ddo.process(dc, in))
|
||||
require.Equal(t, *ptr, in)
|
||||
}
|
||||
|
||||
func Test_U16PtrSet(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
ptr := new(uint16)
|
||||
ddo := Set(ptr)
|
||||
in := uint16(5)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
require.Equal(t, *ptr, in)
|
||||
}
|
||||
|
||||
func Test_U32toU32Set(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
ptr := new(uint32)
|
||||
ddo := U32ToU32(func(in uint32) uint32 { return in * 2 }).Set(ptr).prev()
|
||||
in := uint32(5)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
require.Equal(t, *ptr, in*2)
|
||||
}
|
||||
|
||||
func Test_U32toU32toString(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
ptr := new(string)
|
||||
ddo := U32ToU32(func(in uint32) uint32 { return in * 2 }).ToString(func(in uint32) string { return fmt.Sprintf("%d", in*2) }).Set(ptr).prev().prev()
|
||||
in := uint32(2)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
require.Equal(t, "8", *ptr)
|
||||
}
|
||||
|
||||
func Test_U32toU32toStringBreakIf(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
ptr := new(string)
|
||||
ddo := U32ToU32(func(in uint32) uint32 { return in * 2 }).ToString(func(in uint32) string { return fmt.Sprintf("%d", in*2) }).BreakIf("8").Set(ptr).prev().prev().prev()
|
||||
in := uint32(2)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
require.Equal(t, "", *ptr)
|
||||
|
||||
in = uint32(1)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
require.Equal(t, "4", *ptr)
|
||||
}
|
||||
|
||||
func Test_notify(t *testing.T) {
|
||||
value := uint32(1001)
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value))
|
||||
|
||||
ptr := new(uint32)
|
||||
*ptr = uint32(2002)
|
||||
var notificationOne uint32
|
||||
var notificationTwo uint32
|
||||
dd := Seq(
|
||||
Notify(func() { notificationOne = *ptr }),
|
||||
U32().Do(Set(ptr)),
|
||||
Notify(func() { notificationTwo = *ptr }),
|
||||
)
|
||||
|
||||
require.NoError(t, Execute(dd, &buffer))
|
||||
require.Equal(t, uint32(2002), notificationOne)
|
||||
require.Equal(t, uint32(1001), notificationTwo)
|
||||
}
|
||||
|
||||
func Test_nop(t *testing.T) {
|
||||
value := uint32(1001)
|
||||
var buffer bytes.Buffer
|
||||
require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value))
|
||||
originalLen := buffer.Len()
|
||||
dd := Seq(
|
||||
Nop(),
|
||||
)
|
||||
|
||||
require.NoError(t, Execute(dd, &buffer))
|
||||
require.Equal(t, originalLen, buffer.Len())
|
||||
}
|
||||
|
||||
func Test_AsTimestamp(t *testing.T) {
|
||||
dc := NewDecodeContext()
|
||||
dc.openMetric("")
|
||||
ddo := AsTimestamp()
|
||||
now := time.Now()
|
||||
in := uint32(now.Unix()) // only handles as uin32 (not uint64)
|
||||
require.NoError(t, ddo.process(dc, &in))
|
||||
m := dc.currentMetric()
|
||||
require.Equal(t, now.Unix(), m.Time().Unix())
|
||||
}
|
||||
Reference in New Issue
Block a user