diff --git a/plugins/inputs/EXAMPLE_README.md b/plugins/inputs/EXAMPLE_README.md index ffe1be7cc..6b86615b0 100644 --- a/plugins/inputs/EXAMPLE_README.md +++ b/plugins/inputs/EXAMPLE_README.md @@ -4,7 +4,7 @@ The `example` plugin gathers metrics about example things. This description explains at a high level what the plugin does and provides links to where additional information can be found. -Telegraf minimum version: Telegraf x.x +Telegraf minimum version: Telegraf x.x Plugin minimum tested version: x.x ### Configuration diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 3e1f959fa..33038ab72 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -140,6 +140,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/riak" _ "github.com/influxdata/telegraf/plugins/inputs/salesforce" _ "github.com/influxdata/telegraf/plugins/inputs/sensors" + _ "github.com/influxdata/telegraf/plugins/inputs/sflow" _ "github.com/influxdata/telegraf/plugins/inputs/smart" _ "github.com/influxdata/telegraf/plugins/inputs/snmp" _ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy" diff --git a/plugins/inputs/sflow/README.md b/plugins/inputs/sflow/README.md new file mode 100644 index 000000000..07cd2024b --- /dev/null +++ b/plugins/inputs/sflow/README.md @@ -0,0 +1,92 @@ +# SFlow Input Plugin + +The SFlow Input Plugin provides support for acting as an SFlow V5 collector in +accordance with the specification from [sflow.org](https://sflow.org/). + +Currently only Flow Samples of Ethernet / IPv4 & IPv4 TCP & UDP headers are +turned into metrics. Counters and other header samples are ignored. + +### Configuration + +```toml +[[inputs.sflow]] + ## Address to listen for sFlow packets. + ## example: service_address = "udp://:6343" + ## service_address = "udp4://:6343" + ## service_address = "udp6://:6343" + service_address = "udp://:6343" + + ## Set the size of the operating system's receive buffer. + ## example: read_buffer_size = "64KiB" + # read_buffer_size = "" +``` + +### Metrics + +- sflow + - tags: + - agent_address (IP address of the agent that obtained the sflow sample and sent it to this collector) + - source_id_type(source_id_type field of flow_sample or flow_sample_expanded structures) + - source_id_index(source_id_index field of flow_sample or flow_sample_expanded structures) + - input_ifindex (value (input) field of flow_sample or flow_sample_expanded structures) + - output_ifindex (value (output) field of flow_sample or flow_sample_expanded structures) + - sample_direction (source_id_index, netif_index_in and netif_index_out) + - header_protocol (header_protocol field of sampled_header structures) + - ether_type (eth_type field of an ETHERNET-ISO88023 header) + - src_ip (source_ipaddr field of IPv4 or IPv6 structures) + - src_port (src_port field of TCP or UDP structures) + - src_port_name (src_port) + - src_mac (source_mac_addr field of an ETHERNET-ISO88023 header) + - src_vlan (src_vlan field of extended_switch structure) + - src_priority (src_priority field of extended_switch structure) + - src_mask_len (src_mask_len field of extended_router structure) + - dst_ip (destination_ipaddr field of IPv4 or IPv6 structures) + - dst_port (dst_port field of TCP or UDP structures) + - dst_port_name (dst_port) + - dst_mac (destination_mac_addr field of an ETHERNET-ISO88023 header) + - dst_vlan (dst_vlan field of extended_switch structure) + - dst_priority (dst_priority field of extended_switch structure) + - dst_mask_len (dst_mask_len field of extended_router structure) + - next_hop (next_hop field of extended_router structure) + - ip_version (ip_ver field of IPv4 or IPv6 structures) + - ip_protocol (ip_protocol field of IPv4 or IPv6 structures) + - ip_dscp (ip_dscp field of IPv4 or IPv6 structures) + - ip_ecn (ecn field of IPv4 or IPv6 structures) + - tcp_urgent_pointer (urgent_pointer field of TCP structure) + - fields: + - bytes (integer, the product of frame_length and packets) + - drops (integer, drops field of flow_sample or flow_sample_expanded structures) + - packets (integer, sampling_rate field of flow_sample or flow_sample_expanded structures) + - frame_length (integer, frame_length field of sampled_header structures) + - header_size (integer, header_size field of sampled_header structures) + - ip_fragment_offset (integer, ip_ver field of IPv4 structures) + - ip_header_length (integer, ip_ver field of IPv4 structures) + - ip_total_length (integer, ip_total_len field of IPv4 structures) + - ip_ttl (integer, ip_ttl field of IPv4 structures or ip_hop_limit field IPv6 structures) + - tcp_header_length (integer, size field of TCP structure. This value is specified in 32-bit words. It must be multiplied by 4 to produce a value in bytes.) + - tcp_window_size (integer, window_size field of TCP structure) + - udp_length (integer, length field of UDP structures) + - ip_flags (integer, ip_ver field of IPv4 structures) + - tcp_flags (integer, TCP flags of TCP IP header (IPv4 or IPv6)) + +### Troubleshooting + +The [sflowtool][] utility can be used to print sFlow packets, and compared +against the metrics produced by Telegraf. +``` +sflowtool -p 6343 +``` + +If opening an issue, in addition to the output of sflowtool it will also be +helpful to collect a packet capture. Adjust the interface, host and port as +needed: +``` +$ sudo tcpdump -s 0 -i eth0 -w telegraf-sflow.pcap host 127.0.0.1 and port 6343 +``` + +[sflowtool]: https://github.com/sflow/sflowtool + +### Example Output +``` +sflow,agent_address=0.0.0.0,dst_ip=10.0.0.2,dst_mac=ff:ff:ff:ff:ff:ff,dst_port=40042,ether_type=IPv4,header_protocol=ETHERNET-ISO88023,input_ifindex=6,ip_dscp=27,ip_ecn=0,output_ifindex=1073741823,source_id_index=3,source_id_type=0,src_ip=10.0.0.1,src_mac=ff:ff:ff:ff:ff:ff,src_port=443 bytes=1570i,drops=0i,frame_length=157i,header_length=128i,ip_flags=2i,ip_fragment_offset=0i,ip_total_length=139i,ip_ttl=42i,sampling_rate=10i,tcp_header_length=0i,tcp_urgent_pointer=0i,tcp_window_size=14i 1584473704793580447 +``` diff --git a/plugins/inputs/sflow/decoder.go b/plugins/inputs/sflow/decoder.go new file mode 100644 index 000000000..51a534881 --- /dev/null +++ b/plugins/inputs/sflow/decoder.go @@ -0,0 +1,306 @@ +package sflow + +import ( + "fmt" + "math" + "net" + + "github.com/influxdata/telegraf/plugins/inputs/sflow/decoder" +) + +const ( + addressTypeIPv4 = uint32(1) // line: 1383 + addressTypeIPv6 = uint32(2) // line: 1384 + + sampleTypeFlowSample = uint32(1) // line: 1614 + sampleTypeFlowSampleExpanded = uint32(3) // line: 1698 + + flowDataRawPacketHeaderFormat = uint32(1) // line: 1938 + + headerProtocolEthernetIso88023 = uint32(1) // line: 1920 + + ipProtocolTCP = byte(6) + ipProtocolUDP = byte(17) + + metricName = "sflow" +) + +var headerProtocolMap = map[uint32]string{ + headerProtocolEthernetIso88023: "ETHERNET-ISO88023", // line: 1920 +} + +var etypeMap = map[uint16]string{ + 0x0800: "IPv4", + 0x86DD: "IPv6", +} + +func bytesToIPStr(b []byte) string { + return net.IP(b).String() +} + +func bytesToMACStr(b []byte) string { + return fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x", b[0], b[1], b[2], b[3], b[4], b[5]) +} + +var ipvMap = map[uint32]string{ + 1: "IPV4", // line: 1383 + 2: "IPV6", // line: 1384 +} + +// V5FormatOptions captures configuration for controlling the processing of an SFlow V5 packet. +type V5FormatOptions struct { + MaxFlowsPerSample uint32 + MaxSamplesPerPacket uint32 + MaxFlowHeaderLength uint32 + MaxSampleLength uint32 +} + +// NewDefaultV5FormatOptions answers a new V5FormatOptions with default values initialised +func NewDefaultV5FormatOptions() V5FormatOptions { + return V5FormatOptions{ + MaxFlowsPerSample: math.MaxUint32, + MaxSamplesPerPacket: math.MaxUint32, + MaxFlowHeaderLength: math.MaxUint32, + MaxSampleLength: math.MaxUint32, + } +} + +// V5Format answers and decoder.Directive capable of decoding sFlow v5 packets in accordance +// with SFlow v5 specification at https://sflow.org/sflow_version_5.txt +func V5Format(options V5FormatOptions) decoder.Directive { + return decoder.Seq( // line: 1823 + decoder.U32().Do(decoder.U32Assert(func(v uint32) bool { return v == 5 }, "Version %d not supported, only version 5")), + decoder.U32().Switch( // agent_address line: 1787 + decoder.Case(addressTypeIPv4, decoder.Bytes(4).Do(decoder.BytesToStr(4, bytesToIPStr).AsT("agent_address"))), // line: 1390 + decoder.Case(addressTypeIPv6, decoder.Bytes(16).Do(decoder.BytesToStr(16, bytesToIPStr).AsT("agent_address"))), // line: 1393 + ), + decoder.U32(), // sub_agent_id line: 1790 + decoder.U32(), // sequence_number line: 1801 + decoder.U32(), // uptime line: 1804 + decoder.U32().Iter(options.MaxSamplesPerPacket, sampleRecord(options)), // samples line: 1812 + ) +} + +func sampleRecord(options V5FormatOptions) decoder.Directive { + var sampleType interface{} + return decoder.Seq( // line: 1760 + decoder.U32().Ref(&sampleType), // sample_type line: 1761 + decoder.U32().Encapsulated(options.MaxSampleLength, // sample_data line: 1762 + decoder.Ref(sampleType).Switch( + decoder.Case(sampleTypeFlowSample, flowSample(sampleType, options)), // line: 1614 + decoder.Case(sampleTypeFlowSampleExpanded, flowSampleExpanded(sampleType, options)), // line: 1698 + decoder.DefaultCase(nil), // this allows other cases to just be ignored rather than cause an error + ), + ), + ) +} + +func flowSample(sampleType interface{}, options V5FormatOptions) decoder.Directive { + var samplingRate = new(uint32) + var sourceIDIndex = new(uint32) + return decoder.Seq( // line: 1616 + decoder.U32(), // sequence_number line: 1617 + decoder.U32(). // source_id line: 1622 + Do(decoder.U32ToU32(func(v uint32) uint32 { return v >> 24 }).AsT("source_id_type")). // source_id_type Line 1465 + Do(decoder.U32ToU32(func(v uint32) uint32 { return v & 0x00ffffff }).Set(sourceIDIndex).AsT("source_id_index")), // line: 1468 + decoder.U32().Do(decoder.Set(samplingRate).AsF("sampling_rate")), // line: 1631 + decoder.U32(), // samplePool: Line 1632 + decoder.U32().Do(decoder.AsF("drops")), // Line 1636 + decoder.U32(). // line: 1651 + Do(decoder.U32ToU32(func(v uint32) uint32 { return v & 0x3fffffff }).AsT("input_ifindex")). // line: 1477 + Do(decoder.U32ToU32(func(v uint32) uint32 { return v & 0x3fffffff }). + ToString(func(v uint32) string { + if v == *sourceIDIndex { + return "ingress" + } + return "" + }). + BreakIf(""). + AsT("sample_direction")), + decoder.U32(). // line: 1652 + Do(decoder.U32ToU32(func(v uint32) uint32 { return v & 0x3fffffff }).AsT("output_ifindex")). // line: 1477 + Do(decoder.U32ToU32(func(v uint32) uint32 { return v & 0x3fffffff }). + ToString(func(v uint32) string { + if v == *sourceIDIndex { + return "egress" + } + return "" + }). + BreakIf(""). + AsT("sample_direction")), + decoder.U32().Iter(options.MaxFlowsPerSample, flowRecord(samplingRate, options)), // line: 1654 + ) +} + +func flowSampleExpanded(sampleType interface{}, options V5FormatOptions) decoder.Directive { + var samplingRate = new(uint32) + var sourceIDIndex = new(uint32) + return decoder.Seq( // line: 1700 + decoder.U32(), // sequence_number line: 1701 + decoder.U32().Do(decoder.AsT("source_id_type")), // line: 1706 + 16878 + decoder.U32().Do(decoder.Set(sourceIDIndex).AsT("source_id_index")), // line 1689 + decoder.U32().Do(decoder.Set(samplingRate).AsF("sampling_rate")), // sample_rate line: 1707 + decoder.U32(), // saple_pool line: 1708 + decoder.U32().Do(decoder.AsF("drops")), // line: 1712 + decoder.U32(), // inputt line: 1727 + decoder.U32(). // input line: 1727 + Do(decoder.AsT("input_ifindex")). // line: 1728 + Do(decoder.U32ToStr(func(v uint32) string { + if v == *sourceIDIndex { + return "ingress" + } + return "" + }). + BreakIf(""). + AsT("sample_direction")), + decoder.U32(), // output line: 1728 + decoder.U32(). // outpuit line: 1728 + Do(decoder.AsT("output_ifindex")). // line: 1729 CHANFE AS FOR NON EXPANDED + Do(decoder.U32ToStr(func(v uint32) string { + if v == *sourceIDIndex { + return "egress" + } + return "" + }). + BreakIf(""). + AsT("sample_direction")), + decoder.U32().Iter(options.MaxFlowsPerSample, flowRecord(samplingRate, options)), // line: 1730 + ) +} + +func flowRecord(samplingRate *uint32, options V5FormatOptions) decoder.Directive { + var flowFormat interface{} + return decoder.Seq( // line: 1597 + decoder.U32().Ref(&flowFormat), // line 1598 + decoder.U32().Encapsulated(options.MaxFlowHeaderLength, // line 1599 + decoder.Ref(flowFormat).Switch( + decoder.Case(flowDataRawPacketHeaderFormat, rawPacketHeaderFlowData(samplingRate, options)), // line: 1938 + decoder.DefaultCase(nil), + ), + ), + ) +} + +func rawPacketHeaderFlowData(samplingRate *uint32, options V5FormatOptions) decoder.Directive { + var protocol interface{} + var headerLength interface{} + return decoder.Seq( // line: 1940 + decoder.U32().Ref(&protocol).Do(decoder.MapU32ToStr(headerProtocolMap).AsT("header_protocol")), // line: 1941 + decoder.U32(). // line: 1942 + Do(decoder.AsF("frame_length")). + Do(decoder.U32ToU32(func(in uint32) uint32 { + return in * (*samplingRate) + }).AsF("bytes")), + decoder.U32(), // stripped line: 1967 + decoder.U32().Ref(&headerLength).Do(decoder.AsF("header_length")), + decoder.Ref(headerLength).Encapsulated(options.MaxFlowHeaderLength, + decoder.Ref(protocol).Switch( + decoder.Case(headerProtocolEthernetIso88023, ethHeader(options)), + decoder.DefaultCase(nil), + )), + ) +} + +// ethHeader answers a decode Directive that will decode an ethernet frame header +// according to https://en.wikipedia.org/wiki/Ethernet_frame +func ethHeader(options V5FormatOptions) decoder.Directive { + var tagOrEType interface{} + etype := new(uint16) + return decoder.Seq( + decoder.OpenMetric(metricName), + decoder.Bytes(6).Do(decoder.BytesToStr(6, bytesToMACStr).AsT("dst_mac")), + decoder.Bytes(6).Do(decoder.BytesToStr(6, bytesToMACStr).AsT("src_mac")), + decoder.U16().Ref(&tagOrEType).Switch( + decoder.Case(uint16(0x8100), + decoder.Seq( + decoder.U16(), + decoder.U16().Do(decoder.Set(etype)), // just follows on from vlan id + ), + ), + decoder.DefaultCase( // Not an 802.1Q VLAN Tag, just treat as an ether type + decoder.Ref(tagOrEType).Do(decoder.Set(etype)), + ), + ), + decoder.U16Value(etype).Do(decoder.MapU16ToStr(etypeMap).AsT("ether_type")), + decoder.U16Value(etype).Switch( + decoder.Case(uint16(0x0800), ipv4Header(options)), + decoder.Case(uint16(0x86DD), ipv6Header(options)), + decoder.DefaultCase(nil), + ), + decoder.CloseMetric(), + ) + +} + +// ipv4Header answers a decode Directive that decode an IPv4 header according to +// https://en.wikipedia.org/wiki/IPv4 +func ipv4Header(options V5FormatOptions) decoder.Directive { + var proto interface{} + return decoder.Seq( + decoder.U16(). + Do(decoder.U16ToU16(func(in uint16) uint16 { return (in & 0xFC) >> 2 }).AsT("ip_dscp")). + Do(decoder.U16ToU16(func(in uint16) uint16 { return in & 0x3 }).AsT("ip_ecn")), + decoder.U16().Do(decoder.AsF("ip_total_length")), + decoder.U16(), + decoder.U16(). + Do(decoder.U16ToU16(func(v uint16) uint16 { return (v & 0xE000) >> 13 }).AsF("ip_flags")). + Do(decoder.U16ToU16(func(v uint16) uint16 { return v & 0x1FFF }).AsF("ip_fragment_offset")), + decoder.Bytes(1).Do(decoder.BytesTo(1, func(b []byte) interface{} { return uint8(b[0]) }).AsF("ip_ttl")), + decoder.Bytes(1).Ref(&proto), + decoder.U16(), + decoder.Bytes(4).Do(decoder.BytesToStr(4, bytesToIPStr).AsT("src_ip")), + decoder.Bytes(4).Do(decoder.BytesToStr(4, bytesToIPStr).AsT("dst_ip")), + decoder.Ref(proto).Switch( // Does not consider IHL and Options + decoder.Case(ipProtocolTCP, tcpHeader(options)), + decoder.Case(ipProtocolUDP, udpHeader(options)), + decoder.DefaultCase(nil), + ), + ) +} + +// ipv6Header answers a decode Directive that decode an IPv6 header according to +// https://en.wikipedia.org/wiki/IPv6_packet +func ipv6Header(options V5FormatOptions) decoder.Directive { + nextHeader := new(uint16) + return decoder.Seq( + decoder.U32(). + Do(decoder.U32ToU32(func(in uint32) uint32 { return (in & 0xFC00000) >> 22 }).AsF("ip_dscp")). + Do(decoder.U32ToU32(func(in uint32) uint32 { return (in & 0x300000) >> 20 }).AsF("ip_ecn")), + decoder.U16(), + decoder.U16(). + Do(decoder.U16ToU16(func(in uint16) uint16 { return (in & 0xFF00) >> 8 }).Set(nextHeader)), + decoder.Bytes(16).Do(decoder.BytesToStr(16, bytesToIPStr).AsT("src_ip")), + decoder.Bytes(16).Do(decoder.BytesToStr(16, bytesToIPStr).AsT("dst_ip")), + decoder.U16Value(nextHeader).Switch( + decoder.Case(uint16(ipProtocolTCP), tcpHeader(options)), + decoder.Case(uint16(ipProtocolUDP), udpHeader(options)), + decoder.DefaultCase(nil), + ), + ) +} + +func tcpHeader(options V5FormatOptions) decoder.Directive { + return decoder.Seq( + decoder.U16(). + Do(decoder.AsT("src_port")), + decoder.U16(). + Do(decoder.AsT("dst_port")), + decoder.U32(), //"sequence"), + decoder.U32(), //"ack_number"), + decoder.Bytes(2). + Do(decoder.BytesToU32(2, func(b []byte) uint32 { return uint32((b[0] & 0xF0) * 4) }).AsF("tcp_header_length")), + decoder.U16().Do(decoder.AsF("tcp_window_size")), + decoder.U16(), // "checksum"), + decoder.U16().Do(decoder.AsF("tcp_urgent_pointer")), + ) +} + +func udpHeader(options V5FormatOptions) decoder.Directive { + return decoder.Seq( + decoder.U16(). + Do(decoder.AsT("src_port")), + decoder.U16(). + Do(decoder.AsT("dst_port")), + decoder.U16().Do(decoder.AsF("udp_length")), + ) +} diff --git a/plugins/inputs/sflow/decoder/directives.go b/plugins/inputs/sflow/decoder/directives.go new file mode 100644 index 000000000..9b20e1c33 --- /dev/null +++ b/plugins/inputs/sflow/decoder/directives.go @@ -0,0 +1,402 @@ +package decoder + +import ( + "bytes" + "encoding/binary" + "fmt" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +// Directive is a Decode Directive, the basic building block of a decoder +type Directive interface { + + // Execute performs the function of the decode directive. If DecodeContext is nil then the + // ask is to check that a subsequent execution (with non nill DecodeContext) is expted to work. + Execute(*bytes.Buffer, *DecodeContext) error +} + +type IterOption struct { + EOFTerminateIter bool + RemainingToGreaterEqualOrTerminate uint32 +} + +// ValueDirective is a decode directive that extracts some data from the packet, an integer or byte maybe, +// which it then processes by using it, for example, as the counter for the number of iterations to perform +// of downstream decode directives. +// +// A ValueDirective can be used to either Switch, Iter(ate), Encapsulate or Do mutually exclusively. +type ValueDirective interface { + Directive + + // Switch attaches a set of conditional decode directives downstream of this decode directive + Switch(paths ...CaseValueDirective) ValueDirective + + // Iter attaches a single downstream decode directive that will be executed repeatedly according to the iteration count + Iter(maxIterations uint32, dd Directive, iterOptions ...IterOption) ValueDirective + + // Encapsulated will form a new buffer of the encapsulated length and pass that buffer on to the downsstream decode directive + Encapsulated(maxSize uint32, dd Directive) ValueDirective + + // Ref records this decode directive in the passed reference + Ref(*interface{}) ValueDirective + + // Do attaches a Decode Operation - these are uses of the decoded information to perform work on, transform, write out etc. + Do(ddo DirectiveOp) ValueDirective +} + +type valueDirective struct { + reference *valueDirective + + value interface{} + noDecode bool + + cases []CaseValueDirective + iter Directive + maxIterations uint32 + encapsulated Directive + maxEncapsulation uint32 + ops []DirectiveOp + err error + + iterOption IterOption +} + +func valueToString(in interface{}) string { + switch v := in.(type) { + case *uint16: + return fmt.Sprintf("%d", *v) + case uint16: + return fmt.Sprintf("%d", v) + case *uint32: + return fmt.Sprintf("%d", *v) + case uint32: + return fmt.Sprintf("%d", v) + default: + return fmt.Sprintf("%v", in) + } +} + +func (dd *valueDirective) Execute(buffer *bytes.Buffer, dc *DecodeContext) error { + if dd.reference == nil && !dd.noDecode { + if e := binary.Read(buffer, binary.BigEndian, dd.value); e != nil { + return e + } + } + + // Switch downstream? + if dd.cases != nil && len(dd.cases) > 0 { + for _, c := range dd.cases { + if c.Equals(dd.value) { + return c.Execute(buffer, dc) + } + } + switch v := dd.value.(type) { + case *uint32: + return fmt.Errorf("(%T).Switch,unmatched case %d", v, *v) + case *uint16: + return fmt.Errorf("(%T).Switch,unmatched case %d", v, *v) + default: + return fmt.Errorf("(%T).Switch,unmatched case %v", dd.value, dd.value) + } + } + + // Iter downstream? + if dd.iter != nil { + fn := func(id interface{}) error { + if dd.iterOption.RemainingToGreaterEqualOrTerminate > 0 && uint32(buffer.Len()) < dd.iterOption.RemainingToGreaterEqualOrTerminate { + return nil + } + if dd.iterOption.EOFTerminateIter && buffer.Len() == 0 { + return nil + } + if e := dd.iter.Execute(buffer, dc); e != nil { + return e + } + return nil + } + switch v := dd.value.(type) { + case *uint32: + if *v > dd.maxIterations { + return fmt.Errorf("iter exceeds configured max - value %d, limit %d", *v, dd.maxIterations) + } + for i := uint32(0); i < *v; i++ { + if e := fn(i); e != nil { + return e + } + } + case *uint16: + if *v > uint16(dd.maxIterations) { + return fmt.Errorf("iter exceeds configured max - value %d, limit %d", *v, dd.maxIterations) + } + for i := uint16(0); i < *v; i++ { + if e := fn(i); e != nil { + return e + } + } + default: + // Can't actually get here if .Iter method check types (and it does) + return fmt.Errorf("(%T).Iter, cannot iterator over this type", dd.value) + } + } + + // Encapsualted downstream> + if dd.encapsulated != nil { + switch v := dd.value.(type) { + case *uint32: + if *v > dd.maxEncapsulation { + return fmt.Errorf("encap exceeds configured max - value %d, limit %d", *v, dd.maxEncapsulation) + } + return dd.encapsulated.Execute(bytes.NewBuffer(buffer.Next(int(*v))), dc) + case *uint16: + if *v > uint16(dd.maxEncapsulation) { + return fmt.Errorf("encap exceeds configured max - value %d, limit %d", *v, dd.maxEncapsulation) + } + return dd.encapsulated.Execute(bytes.NewBuffer(buffer.Next(int(*v))), dc) + } + } + + // Perform the attached operations + for _, op := range dd.ops { + if err := op.process(dc, dd.value); err != nil { + return err + } + } + + return nil +} + +// panickIfNotBlackCanvas checks the state of this value directive to see if it is has +// alrady been configured in a manner inconsistent with another configuration change +func (dd *valueDirective) panickIfNotBlackCanvas(change string, checkDOs bool) { + if dd.cases != nil { + panic(fmt.Sprintf("already have switch cases assigned, cannot assign %s", change)) + } + if dd.iter != nil { + panic(fmt.Sprintf("already have iter assigned, cannot assign %s", change)) + } + if dd.encapsulated != nil { + panic(fmt.Sprintf("already have encap assigned, cannot assign %s @", change)) + } + if checkDOs && dd.ops != nil && len(dd.ops) > 0 { + panic(fmt.Sprintf("already have do assigned, cannot assign %s", change)) + } +} + +func (dd *valueDirective) Switch(paths ...CaseValueDirective) ValueDirective { + dd.panickIfNotBlackCanvas("new switch", true) + dd.cases = paths + return dd +} + +func (dd *valueDirective) Iter(maxIterations uint32, iter Directive, iterOptions ...IterOption) ValueDirective { + dd.panickIfNotBlackCanvas("new iter", true) + switch dd.value.(type) { + case *uint32: + case *uint16: + default: + panic(fmt.Sprintf("cannot iterate a %T", dd.value)) + } + + dd.iter = iter + dd.maxIterations = maxIterations + for _, io := range iterOptions { + dd.iterOption = io + } + return dd +} + +func (dd *valueDirective) Encapsulated(maxSize uint32, encapsulated Directive) ValueDirective { + dd.panickIfNotBlackCanvas("new encapsulated", true) + switch dd.value.(type) { + case *uint32: + case *uint16: + default: + panic(fmt.Sprintf("cannot encapsulated on a %T", dd.value)) + } + + dd.encapsulated = encapsulated + dd.maxEncapsulation = maxSize + return dd +} + +func (dd *valueDirective) Do(ddo DirectiveOp) ValueDirective { + dd.panickIfNotBlackCanvas("new do", false) + for { + if ddo.prev() == nil { + break + } + ddo = ddo.prev() + } + if err := ddo.process(nil, dd.value); err != nil { + panic(fmt.Sprintf("directive operation %T cannot process %T - %s", ddo, dd.value, err)) + } + if dd.ops == nil { + dd.ops = make([]DirectiveOp, 0, 5) + } + dd.ops = append(dd.ops, ddo) + + return dd +} + +func (dd *valueDirective) Ref(ref *interface{}) ValueDirective { + if *ref != nil { + panic("ref already assigned, not overwritting") + } + *ref = dd + return dd +} + +// errorDirective a decode directive that reports an error +type errorDirective struct { + Directive +} + +func (dd *errorDirective) Execute(buffer *bytes.Buffer, dc *DecodeContext) error { + return fmt.Errorf("Error Directive") +} + +// CaseValueDirective is a decode directive that also has a switch/case test +type CaseValueDirective interface { + Directive + Equals(interface{}) bool +} + +type caseValueDirective struct { + caseValue interface{} + isDefault bool + equalsDd Directive +} + +func (dd *caseValueDirective) Execute(buffer *bytes.Buffer, dc *DecodeContext) error { + if dd.equalsDd == nil { + return nil + } + return dd.equalsDd.Execute(buffer, dc) +} + +func (dd *caseValueDirective) Equals(value interface{}) bool { + if dd.isDefault { + return true + } + switch ourV := dd.caseValue.(type) { + case uint32: + ov, ok := value.(*uint32) + if ok { + return ourV == *ov + } + case uint16: + ov, ok := value.(*uint16) + if ok { + return ourV == *ov + } + case byte: + ov, ok := value.([]byte) + if ok { + if len(ov) == 1 { + return ourV == ov[0] + } + } + } + return false +} + +// sequenceDirective is a decode directive that is a simple sequentially executed list of other decode directives +type sequenceDirective struct { + decoders []Directive +} + +func (di *sequenceDirective) Execute(buffer *bytes.Buffer, dc *DecodeContext) error { + for _, innerDD := range di.decoders { + if err := innerDD.Execute(buffer, dc); err != nil { + return err + } + } + return nil +} + +// openMetric a decode directive that opens the recording of new fields and tags +type openMetric struct { + name string +} + +func (di *openMetric) Execute(buffer *bytes.Buffer, dc *DecodeContext) error { + dc.openMetric(di.name) + return nil +} + +// closeMetric a decode directive that closes the current open metric +type closeMetric struct { +} + +func (di *closeMetric) Execute(buffer *bytes.Buffer, dc *DecodeContext) error { + dc.closeMetric() + return nil +} + +// DecodeContext provides context for the decoding of a packet and primarily acts +// as a repository for metrics that are collected during the packet decode process +type DecodeContext struct { + metrics []telegraf.Metric + timeHasBeenSet bool + + // oreMetric is used to capture tags or fields that may be recored before a metric has been openned + // these fields and tags are then copied into metrics that are then subsequently opened + preMetric telegraf.Metric + current telegraf.Metric + nano int +} + +func (dc *DecodeContext) openMetric(name string) { + t := dc.preMetric.Time() + if !dc.timeHasBeenSet { + t = time.Now().Add(time.Duration(dc.nano)) + } + m, _ := metric.New(name, make(map[string]string), make(map[string]interface{}), t) + dc.nano++ + // make sure to copy any fields and tags that were capture prior to the metric being openned + for t, v := range dc.preMetric.Tags() { + m.AddTag(t, v) + } + for f, v := range dc.preMetric.Fields() { + m.AddField(f, v) + } + dc.current = m +} + +func (dc *DecodeContext) closeMetric() { + if dc.current != nil { + dc.metrics = append(dc.metrics, dc.current) + } + dc.current = nil +} + +func (dc *DecodeContext) currentMetric() telegraf.Metric { + if dc.current == nil { + return dc.preMetric + } + return dc.current +} + +// Decode initiates the decoding of the supplied buffer according to the root decode directive that is provided +func (dc *DecodeContext) Decode(dd Directive, buffer *bytes.Buffer) error { + return dd.Execute(buffer, dc) +} + +// GetMetrics answers the metrics that have been collected during the packet decode +func (dc *DecodeContext) GetMetrics() []telegraf.Metric { + return dc.metrics +} + +type notifyDirective struct { + fn func() +} + +func (nd *notifyDirective) Execute(_ *bytes.Buffer, dc *DecodeContext) error { + if dc != nil { + nd.fn() + } + return nil +} diff --git a/plugins/inputs/sflow/decoder/directives_test.go b/plugins/inputs/sflow/decoder/directives_test.go new file mode 100644 index 000000000..0a8d99e7a --- /dev/null +++ b/plugins/inputs/sflow/decoder/directives_test.go @@ -0,0 +1,582 @@ +package decoder + +import ( + "bytes" + "encoding/binary" + "fmt" + "math" + "testing" + + "github.com/influxdata/telegraf" + "github.com/stretchr/testify/require" +) + +// Execute will ececute the decode directive relative to the supplied buffer +func Execute(dd Directive, buffer *bytes.Buffer) error { + dc := &DecodeContext{} + return dd.Execute(buffer, dc) +} + +func Test_basicUI32NotEnoughBytes(t *testing.T) { + dd := U32() + value := uint16(1001) // not enough bytes to read a U32 out as only a U16 in + var buffer bytes.Buffer + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value)) + require.Error(t, Execute(dd, &buffer)) +} + +func Test_basicUI32(t *testing.T) { + dd := U32() + value := uint32(1001) + var buffer bytes.Buffer + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value)) + require.NoError(t, Execute(dd, &buffer)) + require.Equal(t, 0, buffer.Len()) + x, _ := dd.(*valueDirective) + require.Equal(t, &value, x.value) +} + +func Test_basicBytes(t *testing.T) { + dd := Bytes(4) + value := []byte{0x01, 0x02, 0x03, 0x04} + var buffer bytes.Buffer + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value)) + require.NoError(t, Execute(dd, &buffer)) + require.Equal(t, 0, buffer.Len()) + x, _ := dd.(*valueDirective) + require.Equal(t, value, x.value) +} + +func Test_basicSeq(t *testing.T) { + + // Seq with no members compiles and executed but buffer is left untouched + dd := Seq() + value := uint32(1001) + var buffer bytes.Buffer + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value)) + originalLen := buffer.Len() + require.NoError(t, Execute(dd, &buffer)) + require.Equal(t, originalLen, buffer.Len()) + + u := U32() + dd = Seq( + u, + ) + value = uint32(1001) + buffer.Reset() + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value)) + require.NoError(t, Execute(dd, &buffer)) + require.Equal(t, 0, buffer.Len()) + x, _ := u.(*valueDirective) + require.Equal(t, &value, x.value) +} + +func Test_basicSeqOf(t *testing.T) { + // SeqOf with no members compiles and executed but buffer is left untouched + dd := SeqOf([]Directive{}) + value := uint32(1001) + var buffer bytes.Buffer + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value)) + originalLen := buffer.Len() + require.NoError(t, Execute(dd, &buffer)) + require.Equal(t, originalLen, buffer.Len()) + + u := U32() + dd = SeqOf( + []Directive{u}, + ) + value = uint32(1001) + buffer.Reset() + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value)) + require.NoError(t, Execute(dd, &buffer)) + require.Equal(t, 0, buffer.Len()) + x, _ := u.(*valueDirective) + require.Equal(t, &value, x.value) +} + +func Test_errorInSeq(t *testing.T) { + // Seq with no members compiles and executed but buffer is left untouched + dd := Seq(U32(), ErrorDirective()) + value := uint32(1001) + var buffer bytes.Buffer + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value)) + require.Error(t, Execute(dd, &buffer)) +} + +func Test_basicU32Switch(t *testing.T) { + c1 := U32() + c2 := U32() + dd := U32().Switch( + Case(uint32(1), c1), + Case(uint32(2), c2), + ) + + value1 := uint32(3) + var buffer bytes.Buffer + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value1)) + value2 := uint32(4) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value2)) + require.Error(t, Execute(dd, &buffer)) // should error as no path + + value1 = uint32(1) + buffer.Reset() + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value1)) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value2)) + require.NoError(t, Execute(dd, &buffer)) + x, _ := c1.(*valueDirective) + y, _ := c2.(*valueDirective) + value0 := uint32(0) + require.Equal(t, &value2, x.value) + require.Equal(t, &value0, y.value) + + // bad path shoudl raise error + // path 1 should be able to fina value in c1 and not in c2 + // then other way around +} + +func Test_basicBinSwitch(t *testing.T) { + c1 := U32() + c2 := U32() + dd := Bytes(1).Switch( + Case(byte(1), c1), + Case(byte(2), c2), + ) + + value1 := byte(3) + var buffer bytes.Buffer + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value1)) + value2 := uint32(4) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value2)) + require.Error(t, Execute(dd, &buffer)) // should error as no path + + value1 = byte(1) + buffer.Reset() + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value1)) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value2)) + require.NoError(t, Execute(dd, &buffer)) + x, _ := c1.(*valueDirective) + y, _ := c2.(*valueDirective) + value0 := uint32(0) + require.Equal(t, &value2, x.value) + require.Equal(t, &value0, y.value) + + // bad path shoudl raise error + // path 1 should be able to fina value in c1 and not in c2 + // then other way around +} + +func Test_basicIter(t *testing.T) { + innerDD := U32() + dd := U32().Iter(math.MaxInt32, innerDD) + + var buffer bytes.Buffer + iterations := uint32(2) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &iterations)) + it1Val := uint32(3) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it1Val)) + it2Val := uint32(4) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it2Val)) + require.NoError(t, Execute(dd, &buffer)) + x, _ := dd.(*valueDirective) + require.Equal(t, &iterations, x.value) + y, _ := innerDD.(*valueDirective) + // we can't test it1Val as it gets overwritten! + require.Equal(t, &it2Val, y.value) +} + +func Test_IterLimit(t *testing.T) { + innerDD := U32() + dd := U32().Iter(1, innerDD) // limit set at 1 + var buffer bytes.Buffer + iterations := uint32(2) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &iterations)) + it1Val := uint32(3) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it1Val)) + it2Val := uint32(4) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it2Val)) + require.Error(t, Execute(dd, &buffer)) +} + +func Test_errorWithinIter(t *testing.T) { + dd := U32().Iter(math.MaxInt32, ErrorDirective()) + + var buffer bytes.Buffer + iterations := uint32(1) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &iterations)) + + require.Error(t, Execute(dd, &buffer)) +} + +func Test_errorWithinIter2(t *testing.T) { + dd := U32().Iter(math.MaxInt32, U32().Do(ErrorOp(false))) + var buffer bytes.Buffer + iterations := uint32(1) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &iterations)) + innerValue := uint32(1) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &innerValue)) + require.Error(t, Execute(dd, &buffer)) +} + +func Test_errorWithinIter3(t *testing.T) { + defer expectPanic(t, "Test_cantIterBytes") + U32().Iter(math.MaxInt32, U32().Do(ErrorOp(true))) +} + +func Test_alreadyEncapsulated(t *testing.T) { + defer expectPanic(t, "Test_cantIterBytes") + u := U32() + inner := U32() + u.Encapsulated(math.MaxInt32, inner) + u.Encapsulated(math.MaxInt32, inner) +} + +func Test_alreadyDoAssigned(t *testing.T) { + defer expectPanic(t, "Test_cantIterBytes") + u := U32() + u.Do(AsF("foo")) + inner := U32() + u.Encapsulated(math.MaxInt32, inner) +} + +func Test_cantIterBytes(t *testing.T) { + defer expectPanic(t, "Test_cantIterBytes") + _ = Bytes(1).Iter(math.MaxInt32, U32()) +} + +// then open metric +func Test_OpenMetric(t *testing.T) { + innerDD := U32() + dd := U32().Iter(math.MaxInt32, Seq( + OpenMetric(""), + innerDD, + CloseMetric(), + )) + + var buffer bytes.Buffer + iterations := uint32(2) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &iterations)) + it1Val := uint32(3) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it1Val)) + it2Val := uint32(3) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it2Val)) + dc := NewDecodeContext() + require.NoError(t, dc.Decode(dd, &buffer)) + require.Equal(t, 2, len(dc.GetMetrics())) +} + +func Test_AsF(t *testing.T) { + innerDD := U32().Do(AsF("foo")) + dd := U32().Iter(math.MaxInt32, Seq( + OpenMetric(""), + innerDD, + CloseMetric(), + )) + + var buffer bytes.Buffer + iterations := uint32(2) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &iterations)) + it1Val := uint32(3) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it1Val)) + it2Val := uint32(3) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it2Val)) + dc := NewDecodeContext() + require.NoError(t, dc.Decode(dd, &buffer)) + require.Equal(t, 2, len(dc.GetMetrics())) + m := dc.GetMetrics() + require.Equal(t, uint64(it1Val), getField(m[0], "foo")) + require.Equal(t, uint64(it2Val), getField(m[1], "foo")) +} + +func Test_AsT(t *testing.T) { + innerDD := U32().Do(AsT("foo")) + dd := U32().Iter(math.MaxInt32, Seq( + OpenMetric(""), + innerDD, + CloseMetric(), + )) + + var buffer bytes.Buffer + iterations := uint32(2) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &iterations)) + it1Val := uint32(3) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it1Val)) + it2Val := uint32(3) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it2Val)) + dc := NewDecodeContext() + require.NoError(t, dc.Decode(dd, &buffer)) + require.Equal(t, 2, len(dc.GetMetrics())) + m := dc.GetMetrics() + require.Equal(t, fmt.Sprintf("%d", it1Val), getTag(m[0], "foo")) + require.Equal(t, fmt.Sprintf("%d", it2Val), getTag(m[1], "foo")) +} + +func getField(m telegraf.Metric, name string) interface{} { + v, _ := m.GetField(name) + return v +} + +func getTag(m telegraf.Metric, name string) string { + v, _ := m.GetTag(name) + return v +} + +func Test_preMetricNesting(t *testing.T) { + innerDD := U32().Do(AsF("foo")) + dd := Seq( + U32().Do(AsF("bar")), + U32().Do(AsT("baz")), + U32().Iter(math.MaxInt32, + Seq( + OpenMetric(""), + innerDD, + CloseMetric(), + ), + ), + ) + + var buffer bytes.Buffer + barVal := uint32(55) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &barVal)) + bazVal := uint32(56) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &bazVal)) + iterations := uint32(2) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &iterations)) + it1Val := uint32(3) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it1Val)) + it2Val := uint32(3) + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &it2Val)) + dc := NewDecodeContext() + require.NoError(t, dc.Decode(dd, &buffer)) + require.Equal(t, 2, len(dc.GetMetrics())) + m := dc.GetMetrics() + require.Equal(t, uint64(barVal), getField(m[0], "bar")) + require.Equal(t, fmt.Sprintf("%d", bazVal), getTag(m[0], "baz")) + require.Equal(t, uint64(it1Val), getField(m[0], "foo")) + require.Equal(t, uint64(barVal), getField(m[1], "bar")) + require.Equal(t, fmt.Sprintf("%d", bazVal), getTag(m[1], "baz")) + require.Equal(t, uint64(it2Val), getField(m[1], "foo")) +} + +func Test_BasicEncapsulated(t *testing.T) { + + encap1Value := uint32(2) + encap2Value := uint32(3) + var encapBuffer bytes.Buffer + require.NoError(t, binary.Write(&encapBuffer, binary.BigEndian, &encap1Value)) + require.NoError(t, binary.Write(&encapBuffer, binary.BigEndian, &encap2Value)) + + encapSize := uint32(encapBuffer.Len()) + envelopeValue := uint32(4) + var envelopeBuffer bytes.Buffer + + require.NoError(t, binary.Write(&envelopeBuffer, binary.BigEndian, &encapSize)) + l, e := envelopeBuffer.Write(encapBuffer.Bytes()) + require.NoError(t, e) + require.Equal(t, encapSize, uint32(l)) + require.NoError(t, binary.Write(&envelopeBuffer, binary.BigEndian, &envelopeValue)) + + innerDD := U32() + envelopeDD := U32() // the buffer contains another U32 but the encpaultation will ignore it + dd := Seq( + U32().Encapsulated(math.MaxInt32, innerDD), + envelopeDD, + ) + require.NoError(t, Execute(dd, &envelopeBuffer)) + + require.Equal(t, 0, envelopeBuffer.Len()) + x, _ := envelopeDD.(*valueDirective) + require.Equal(t, &envelopeValue, x.value) + y, _ := innerDD.(*valueDirective) + require.Equal(t, &encap1Value, y.value) +} + +func Test_EncapsulationLimit(t *testing.T) { + + encap1Value := uint32(2) + encap2Value := uint32(3) + var encapBuffer bytes.Buffer + require.NoError(t, binary.Write(&encapBuffer, binary.BigEndian, &encap1Value)) + require.NoError(t, binary.Write(&encapBuffer, binary.BigEndian, &encap2Value)) + + encapSize := uint32(encapBuffer.Len()) + envelopeValue := uint32(4) + var envelopeBuffer bytes.Buffer + + require.NoError(t, binary.Write(&envelopeBuffer, binary.BigEndian, &encapSize)) + l, e := envelopeBuffer.Write(encapBuffer.Bytes()) + require.NoError(t, e) + require.Equal(t, encapSize, uint32(l)) + require.NoError(t, binary.Write(&envelopeBuffer, binary.BigEndian, &envelopeValue)) + + innerDD := U32() + envelopeDD := U32() + dd := Seq( + U32().Encapsulated(4, innerDD), // 4 bytes, not 8 bytes or higher as max + envelopeDD, + ) + require.Error(t, Execute(dd, &envelopeBuffer)) +} + +func Test_cantEncapulatedBytes(t *testing.T) { + defer expectPanic(t, "cantEncapulatedBytes") + _ = Bytes(1).Encapsulated(math.MaxInt32, U32()) +} + +func Test_BasicRef(t *testing.T) { + var x interface{} + dd1 := U32().Ref(&x) + dd2 := Ref(x) + dd := Seq( + dd1, + dd2, + ) + y, ok := dd2.(*valueDirective) + require.True(t, ok) + require.Equal(t, y.reference, x) + + value := uint32(1001) + var buffer bytes.Buffer + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value)) + require.NoError(t, Execute(dd, &buffer)) + + y, _ = dd1.(*valueDirective) + require.Equal(t, &value, y.value) + + y, _ = dd2.(*valueDirective) + require.Equal(t, &value, y.value) +} + +func Test_RefReassignError(t *testing.T) { + defer expectPanic(t, "iter iter") + var x interface{} + U32().Ref(&x) + U32().Ref(&x) +} + +func Test_ToU32(t *testing.T) { + u := U32().Do(U32ToU32(func(in uint32) uint32 { return in >> 2 }).AsF("x")) + dd := Seq(OpenMetric(""), u, CloseMetric()) + + value := uint32(1001) + var buffer bytes.Buffer + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value)) + + dc := NewDecodeContext() + require.NoError(t, dc.Decode(dd, &buffer)) + + // require original value decoded + x, _ := u.(*valueDirective) + require.Equal(t, &value, x.value) + + // require field ejected + require.Equal(t, 1, len(dc.GetMetrics())) + m := dc.GetMetrics() + require.Equal(t, uint64(value>>2), getField(m[0], "x")) +} + +func expectPanic(t *testing.T, msg string) { + if r := recover(); r == nil { + t.Errorf(msg) + } +} + +func Test_U32BlankCanvasIter(t *testing.T) { + u := U32().Iter(math.MaxInt32, U32()) + func() { + defer expectPanic(t, "iter iter") + u.Iter(math.MaxInt32, U32()) + }() + func() { + defer expectPanic(t, "iter switch") + u.Switch(Case(uint32(0), U32())) + }() + func() { + defer expectPanic(t, "iter encap") + u.Encapsulated(math.MaxInt32, U32()) + }() + func() { + defer expectPanic(t, "iter do") + u.Do(AsF("foo")) + }() +} +func Test_U32BlankCanvasSwitch(t *testing.T) { + u := U32().Switch(Case(uint32(0), U32())) + func() { + defer expectPanic(t, "switch iter") + u.Iter(math.MaxInt32, U32()) + }() + func() { + defer expectPanic(t, "switch switch") + u.Switch(Case(uint32(0), U32())) + }() + func() { + defer expectPanic(t, "switch encap") + u.Encapsulated(math.MaxInt32, U32()) + }() + func() { + defer expectPanic(t, "switch do") + u.Do(AsF("foo")) + }() +} + +func Test_U32BasicSwitch(t *testing.T) { + s := U32().Switch(Case(uint32(0), nil)) + value := uint32(0) + var buffer bytes.Buffer + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value)) + dc := NewDecodeContext() + require.NoError(t, dc.Decode(s, &buffer)) +} + +func Test_U32BasicSwitchDefault(t *testing.T) { + s := U32().Switch(Case(uint32(0), nil), DefaultCase(nil)) + value := uint32(2) + var buffer bytes.Buffer + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value)) + dc := NewDecodeContext() + require.NoError(t, dc.Decode(s, &buffer)) +} + +func Test_U16(t *testing.T) { + dd := U16() + value := uint16(1001) + var buffer bytes.Buffer + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value)) + require.NoError(t, Execute(dd, &buffer)) + require.Equal(t, 0, buffer.Len()) + x, _ := dd.(*valueDirective) + require.Equal(t, &value, x.value) +} + +func Test_U16Value(t *testing.T) { + myU16 := uint16(5) + dd := U16Value(&myU16) + var buffer bytes.Buffer + require.NoError(t, Execute(dd, &buffer)) + x, _ := dd.(*valueDirective) + require.Equal(t, &myU16, x.value) +} + +func Test_Bytes(t *testing.T) { + dd := Bytes(4) + value := []byte{0x01, 0x02, 0x03, 0x04} + var buffer bytes.Buffer + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value)) + require.NoError(t, Execute(dd, &buffer)) + require.Equal(t, 0, buffer.Len()) + x, _ := dd.(*valueDirective) + require.Equal(t, value, x.value) +} + +func Test_nilRefAnfWongTypeRef(t *testing.T) { + func() { + defer expectPanic(t, "Test_nilRef") + Ref(nil) + }() + + func() { + defer expectPanic(t, "Test_nilRef") + f := new(uint32) + Ref(f) + }() +} diff --git a/plugins/inputs/sflow/decoder/funcs.go b/plugins/inputs/sflow/decoder/funcs.go new file mode 100644 index 000000000..c90e1488f --- /dev/null +++ b/plugins/inputs/sflow/decoder/funcs.go @@ -0,0 +1,216 @@ +package decoder + +import ( + "fmt" + "time" + + "github.com/influxdata/telegraf/metric" +) + +// U32 answers a directive for 32bit Unsigned Integers +func U32() ValueDirective { + return &valueDirective{value: new(uint32)} +} + +// U64 answers a directive for 64bit Unsigned Integers +func U64() ValueDirective { + return &valueDirective{value: new(uint64)} +} + +// U8 answers a directive for 8bit Unsigned Integers +func U8() ValueDirective { + return &valueDirective{value: new(uint8)} +} + +// U16 answers a directive for 32bit Unsigned Integers +func U16() ValueDirective { + return &valueDirective{value: new(uint16)} +} + +// U16Value answers a directive that doesn't actually decode itself but reused a value previously decoded of type uint16 +func U16Value(value *uint16) ValueDirective { + return &valueDirective{value: value, noDecode: true} +} + +// Bytes answers a value directive that will decode the specified number (len) of bytes from the packet +func Bytes(len int) ValueDirective { + return &valueDirective{value: make([]byte, len)} +} + +// Case answers a directive to be used within a Switch clause of a U32 directive +func Case(caseValue interface{}, dd Directive) CaseValueDirective { + return &caseValueDirective{caseValue: caseValue, isDefault: false, equalsDd: dd} +} + +// DefaultCase answers a case decoder directive that can be used as the default, catch all, of a Switch +func DefaultCase(dd Directive) CaseValueDirective { + return &caseValueDirective{caseValue: nil, isDefault: true, equalsDd: dd} +} + +// Ref answers a decoder that reuses, through referal, an existing U32 directive +func Ref(target interface{}) ValueDirective { + if target == nil { + panic("Ref given a nil reference") + } + r, ok := target.(*valueDirective) + if !ok { + panic(fmt.Sprintf("Ref not given a ValueDirective reference but a %T", target)) + } + return &valueDirective{reference: r, value: r.value} +} + +// Seq ansers a directive that sequentially executes a list of provided directives +func Seq(decoders ...Directive) Directive { + return &sequenceDirective{decoders: decoders} +} + +func SeqOf(decoders []Directive) Directive { + return &sequenceDirective{decoders: decoders} +} + +// OpenMetric answers a directive that opens a new metrics for collecting tags and fields +func OpenMetric(name string) Directive { + return &openMetric{name: name} +} + +// CloseMetric answers a directive that close the current metrics +func CloseMetric() Directive { + return &closeMetric{} +} + +// NewDecodeContext ansewers a new Decode Contect to support the process of decoding +func NewDecodeContext() *DecodeContext { + m, _ := metric.New("sflow", make(map[string]string), make(map[string]interface{}), time.Now()) + return &DecodeContext{preMetric: m} +} + +// U32ToU32 answers a decode operation that transforms a uint32 to a uint32 via the supplied fn +func U32ToU32(fn func(uint32) uint32) *U32ToU32DOp { + result := &U32ToU32DOp{fn: fn, baseDOp: baseDOp{}} + result.do = result + return result +} + +// U32ToStr answers a decode operation that transforms a uint32 to a string via the supplied fn +func U32ToStr(fn func(uint32) string) *U32ToStrDOp { + result := &U32ToStrDOp{baseDOp: baseDOp{}, fn: fn} + result.do = result + return result +} + +// U16ToStr answers a decode operation that transforms a uint16 to a string via the supplied fn +func U16ToStr(fn func(uint16) string) *U16ToStrDOp { + result := &U16ToStrDOp{baseDOp: baseDOp{}, fn: fn} + result.do = result + return result +} + +// U16ToU16 answers a decode operation that transforms a uint16 to a uint16 via the supplied fn +func U16ToU16(fn func(uint16) uint16) *U16ToU16DOp { + result := &U16ToU16DOp{baseDOp: baseDOp{}, fn: fn} + result.do = result + return result +} + +// AsF answers a decode operation that will output a field into the open metric with the given name +func AsF(name string) *AsFDOp { + result := &AsFDOp{baseDOp: baseDOp{}, name: name} + result.do = result + return result +} + +// AsT answers a decode operation that will output a tag into the open metric with the given name +func AsT(name string) *AsTDOp { + result := &AsTDOp{name: name, baseDOp: baseDOp{}} + result.do = result + return result +} + +// AsTimestamp answers a decode operation that will set the tiemstamp on the metric +func AsTimestamp() *AsTimestampDOp { + result := &AsTimestampDOp{baseDOp: baseDOp{}} + result.do = result + return result +} + +// BytesToStr answers a decode operation that transforms a []bytes to a string via the supplied fn +func BytesToStr(len int, fn func([]byte) string) *BytesToStrDOp { + result := &BytesToStrDOp{baseDOp: baseDOp{}, len: len, fn: fn} + result.do = result + return result +} + +// BytesTo answers a decode operation that transforms a []bytes to a interface{} via the supplied fn +func BytesTo(len int, fn func([]byte) interface{}) *BytesToDOp { + result := &BytesToDOp{baseDOp: baseDOp{}, len: len, fn: fn} + result.do = result + return result +} + +// BytesToU32 answers a decode operation that transforms a []bytes to an uint32 via the supplied fn +func BytesToU32(len int, fn func([]byte) uint32) *BytesToU32DOp { + result := &BytesToU32DOp{baseDOp: baseDOp{}, len: len, fn: fn} + result.do = result + return result +} + +// MapU32ToStr answers a decode operation that maps an uint32 to a string via the supplied map +func MapU32ToStr(m map[uint32]string) *U32ToStrDOp { + result := &U32ToStrDOp{fn: func(in uint32) string { + return m[in] + }, baseDOp: baseDOp{}} + result.do = result + return result +} + +// U32Assert answers a decode operation that will assert the uint32 is a particulr value or generate an error +func U32Assert(fn func(v uint32) bool, fmtStr string) *U32AssertDOp { + result := &U32AssertDOp{baseDOp: baseDOp{}, fn: fn, fmtStr: fmtStr} + result.do = result + return result +} + +func U16Assert(fn func(v uint16) bool, fmtStr string) *U16AssertDOp { + result := &U16AssertDOp{baseDOp: baseDOp{}, fn: fn, fmtStr: fmtStr} + result.do = result + return result +} + +// MapU16ToStr answers a decode operation that maps an uint16 to a string via the supplied map +func MapU16ToStr(m map[uint16]string) *U16ToStrDOp { + result := &U16ToStrDOp{baseDOp: baseDOp{}, fn: func(in uint16) string { + return m[in] + }} + result.do = result + return result +} + +// Set answers a decode operation that will set the supplied *value to the value passed through the operation +func Set(ptr interface{}) *SetDOp { + result := &SetDOp{ptr: ptr, baseDOp: baseDOp{}} + result.do = result + return result +} + +// ErrorDirective answers a decode directive that will generate an error +func ErrorDirective() Directive { + return &errorDirective{} +} + +// ErrorOp answers a decode operation that will generate an error +func ErrorOp(errorOnTestProcess bool) *ErrorDOp { + result := &ErrorDOp{baseDOp: baseDOp{}, errorOnTestProcess: errorOnTestProcess} + result.do = result + return result + +} + +// Notify answers a decode directive that will notify the supplied function upon execution +func Notify(fn func()) Directive { + return ¬ifyDirective{fn} +} + +// Nop answer a decode directive that is the null, benign, deocder +func Nop() Directive { + return Notify(func() {}) +} diff --git a/plugins/inputs/sflow/decoder/ops.go b/plugins/inputs/sflow/decoder/ops.go new file mode 100644 index 000000000..2a1e0c72b --- /dev/null +++ b/plugins/inputs/sflow/decoder/ops.go @@ -0,0 +1,490 @@ +package decoder + +import ( + "fmt" + "time" + + "github.com/influxdata/telegraf" +) + +// DirectiveOp are operations that are performed on values that have been decoded. +// They are expected to be chained together, in a flow programming style, and the +// Decode Directive that they are assigned to then walks back up the linked list to find the root +// operation that will then be performed (passing the value down through various transformations) +type DirectiveOp interface { + prev() DirectiveOp + // process method can be executed in two contexts, one to check that the given type + // of upstream value can be processed (not to process it) and then to actually process + // the upstream value. The difference in reqwuired behaviour is signalled by the presence + // of the DecodeContect - if nil. just test, if !nil process + process(dc *DecodeContext, upstreamValue interface{}) error +} + +type baseDOp struct { + p DirectiveOp + do DirectiveOp + n DirectiveOp +} + +func (op *baseDOp) prev() DirectiveOp { + return op.p +} + +func (op *baseDOp) AsF(name string) DirectiveOp { + result := &AsFDOp{baseDOp: baseDOp{p: op.do}, name: name} + result.do = result + op.n = result + return result +} + +func (op *baseDOp) AsT(name string) DirectiveOp { + result := &AsTDOp{baseDOp: baseDOp{p: op.do}, name: name} + result.do = result + op.n = result + return result +} + +func (op *baseDOp) Set(ptr interface{}) *SetDOp { + result := &SetDOp{baseDOp: baseDOp{p: op.do}, ptr: ptr} + result.do = result + op.n = result + return result +} + +// U32ToU32DOp is a deode operation that can process U32 to U32 +type U32ToU32DOp struct { + baseDOp + fn func(uint32) uint32 +} + +func (op *U32ToU32DOp) process(dc *DecodeContext, upstreamValue interface{}) error { + var out uint32 + switch v := upstreamValue.(type) { + case *uint32: + if dc != nil { + out = op.fn(*v) + } + default: + return fmt.Errorf("cannot process %T", v) + } + + if dc != nil && op.n != nil { + return op.n.process(dc, out) + } + return nil +} + +// ToString answers a U32ToStr decode operation that will transform this output of thie U32ToU32 into a string +func (op *U32ToU32DOp) ToString(fn func(uint32) string) *U32ToStrDOp { + result := &U32ToStrDOp{baseDOp: baseDOp{p: op}, fn: fn} + result.do = result + op.n = result + return result +} + +// AsFDOp is a deode operation that writes fields to metrics +type AsFDOp struct { + baseDOp + name string +} + +func (op *AsFDOp) process(dc *DecodeContext, upstreamValue interface{}) error { + var m telegraf.Metric + if dc != nil { + m = dc.currentMetric() + } + switch v := upstreamValue.(type) { + case *uint64: + if dc != nil { + m.AddField(op.name, *v) + } + case *uint32: + if dc != nil { + m.AddField(op.name, *v) + } + case uint32: + if dc != nil { + m.AddField(op.name, v) + } + case *uint16: + if dc != nil { + m.AddField(op.name, *v) + } + case uint16: + if dc != nil { + m.AddField(op.name, v) + } + case *uint8: + if dc != nil { + m.AddField(op.name, *v) + } + case uint8: + if dc != nil { + m.AddField(op.name, v) + } + case string: + if dc != nil { + m.AddField(op.name, v) + } + default: + return fmt.Errorf("AsF cannot process %T", v) + } + return nil +} + +// AsTimestampDOp is a deode operation that sets the timestamp on the metric +type AsTimestampDOp struct { + baseDOp +} + +func (op *AsTimestampDOp) process(dc *DecodeContext, upstreamValue interface{}) error { + var m telegraf.Metric + if dc != nil { + m = dc.currentMetric() + } + switch v := upstreamValue.(type) { + case *uint32: + if dc != nil { + m.SetTime(time.Unix(int64(*v), 0)) + dc.timeHasBeenSet = true + } + default: + return fmt.Errorf("can't process %T", upstreamValue) + } + return nil +} + +// AsTDOp is a deode operation that writes tags to metrics +type AsTDOp struct { + baseDOp + name string + skipEmpty bool +} + +func (op *AsTDOp) process(dc *DecodeContext, upstreamValue interface{}) error { + var m telegraf.Metric + if dc != nil { + m = dc.currentMetric() + } + switch v := upstreamValue.(type) { + case *uint32: + if dc != nil { + m.AddTag(op.name, fmt.Sprintf("%d", *v)) + } + case uint32: + if dc != nil { + m.AddTag(op.name, fmt.Sprintf("%d", v)) + } + case *uint16: + if dc != nil { + m.AddTag(op.name, fmt.Sprintf("%d", *v)) + } + case uint16: + if dc != nil { + m.AddTag(op.name, fmt.Sprintf("%d", v)) + } + case *uint8: + if dc != nil { + m.AddTag(op.name, fmt.Sprintf("%d", *v)) + } + case uint8: + if dc != nil { + m.AddTag(op.name, fmt.Sprintf("%d", v)) + } + case string: + if dc != nil { + if !op.skipEmpty || v != "" { + m.AddTag(op.name, v) + } + } + default: + return fmt.Errorf("can't process %T", upstreamValue) + } + return nil +} + +func (op *AsTDOp) prev() DirectiveOp { + return op.p +} + +// BytesToStrDOp is a decode operation that transforms []bytes to strings +type BytesToStrDOp struct { + baseDOp + len int + fn func([]byte) string +} + +func (op *BytesToStrDOp) process(dc *DecodeContext, upstreamValue interface{}) error { + switch v := upstreamValue.(type) { + case []byte: + if len(v) == op.len { + if dc != nil { + out := op.fn(v) + if op.n != nil { + return op.n.process(dc, out) + } + } + } else { + return fmt.Errorf("cannot process len(%d) as requrire %d", len(v), op.len) + } + default: + return fmt.Errorf("cannot process %T", upstreamValue) + } + return nil +} + +// U32AssertDOp is a decode operation that asserts a particular uint32 value +type U32AssertDOp struct { + baseDOp + fn func(uint32) bool + fmtStr string +} + +func (op *U32AssertDOp) process(dc *DecodeContext, upstreamValue interface{}) error { + switch v := upstreamValue.(type) { + case *uint32: + if dc != nil && !op.fn(*v) { + return fmt.Errorf(op.fmtStr, *v) + } + default: + return fmt.Errorf("cannot process %T", upstreamValue) + } + return nil +} + +// U16AssertDOp is a decode operation that asserts a particular uint32 value +type U16AssertDOp struct { + baseDOp + fn func(uint16) bool + fmtStr string +} + +func (op *U16AssertDOp) process(dc *DecodeContext, upstreamValue interface{}) error { + switch v := upstreamValue.(type) { + case *uint16: + if dc != nil && !op.fn(*v) { + return fmt.Errorf(op.fmtStr, *v) + } + default: + return fmt.Errorf("cannot process %T", upstreamValue) + } + return nil +} + +// U32ToStrDOp is a decod eoperation that transforms a uint32 to a string +type U32ToStrDOp struct { + baseDOp + fn func(uint32) string +} + +func (op *U32ToStrDOp) process(dc *DecodeContext, upstreamValue interface{}) error { + switch v := upstreamValue.(type) { + case uint32: + if dc != nil && op.n != nil { + op.n.process(dc, (op.fn(v))) + } + case *uint32: + if dc != nil && op.n != nil { + return op.n.process(dc, (op.fn(*v))) + } + default: + return fmt.Errorf("cannot process %T", upstreamValue) + } + return nil +} + +// BreakIf answers a BreakIf operation that will break the current decode operation chain, without an error, if the value processed +// is the supplied value +func (op *U32ToStrDOp) BreakIf(value string) *BreakIfDOp { + result := &BreakIfDOp{baseDOp: baseDOp{p: op}, value: value} + result.do = result + op.n = result + return result +} + +// U16ToStrDOp is a decode operation that transforms a uint16 to a string +type U16ToStrDOp struct { + baseDOp + fn func(uint16) string +} + +func (op *U16ToStrDOp) process(dc *DecodeContext, upstreamValue interface{}) error { + switch v := upstreamValue.(type) { + case *uint16: + if dc != nil { + return op.n.process(dc, (op.fn(*v))) + } + default: + return fmt.Errorf("cannot process %T", upstreamValue) + } + return nil +} + +// BreakIfDOp is a decode operation that will break the current outer iteration +type BreakIfDOp struct { + baseDOp + value string +} + +func (op *BreakIfDOp) process(dc *DecodeContext, upstreamValue interface{}) error { + switch v := upstreamValue.(type) { + case string: + if dc != nil { + if v != op.value { + op.n.process(dc, v) + } + } + default: + return fmt.Errorf("cannot process %T", upstreamValue) + } + return nil +} + +// U16ToU16DOp is a decode operation that transfirms one uint16 to another uint16 +type U16ToU16DOp struct { + baseDOp + fn func(uint16) uint16 +} + +func (op *U16ToU16DOp) process(dc *DecodeContext, upstreamValue interface{}) error { + var out uint16 + var err error + switch v := upstreamValue.(type) { + case *uint16: + if dc != nil { + out = op.fn(*v) + } + default: + return fmt.Errorf("cannot process %T", upstreamValue) + } + if err != nil { + return err + } + if op.n != nil && dc != nil { + return op.n.process(dc, out) + } + return nil +} + +// BytesToU32DOp is a decode operation that transforms a []byte to a uint32 +type BytesToU32DOp struct { + baseDOp + len int + fn func([]byte) uint32 +} + +func (op *BytesToU32DOp) process(dc *DecodeContext, upstreamValue interface{}) error { + switch v := upstreamValue.(type) { + case []byte: + if len(v) == op.len { + out := op.fn(v) + if op.n != nil { + return op.n.process(dc, out) + } + } else { + return fmt.Errorf("cannot process %T as len(%d) != %d", upstreamValue, v, op.len) + } + default: + return fmt.Errorf("cannot process %T", upstreamValue) + } + return nil +} + +// SetDOp is a decode operation that will Set a pointer to a value to be the value processed +type SetDOp struct { + baseDOp + ptr interface{} +} + +func (op *SetDOp) process(dc *DecodeContext, upstreamValue interface{}) error { + switch v := upstreamValue.(type) { + case *uint32: + ptr, ok := op.ptr.(*uint32) + if ok { + if dc != nil { + *ptr = *v + } + } else { + return fmt.Errorf("cannot process as ptr %T and not *uint32", op.ptr) + } + case uint32: + ptr, ok := op.ptr.(*uint32) + if ok { + if dc != nil { + *ptr = v + } + } else { + return fmt.Errorf("cannot process as ptr %T and not *uint32", op.ptr) + } + case *uint16: + ptr, ok := op.ptr.(*uint16) + if ok { + if dc != nil { + *ptr = *v + } + } else { + return fmt.Errorf("cannot process as ptr %T and not *uint16", op.ptr) + } + case uint16: + ptr, ok := op.ptr.(*uint16) + if ok { + if dc != nil { + *ptr = v + } + } else { + return fmt.Errorf("cannot process as ptr %T and not *uint16", op.ptr) + } + case string: + ptr, ok := op.ptr.(*string) + if ok { + if dc != nil { + *ptr = v + } + } else { + return fmt.Errorf("cannot process as ptr %T and not *string", op.ptr) + } + default: + return fmt.Errorf("cannot process %T", upstreamValue) + } + if op.n != nil && dc != nil { + return op.n.process(dc, upstreamValue) + } + return nil +} + +// BytesToDOp is a decode operation that will transform []byte to interface{} according to a suppied function +type BytesToDOp struct { + baseDOp + len int + fn func([]byte) interface{} +} + +func (op *BytesToDOp) process(dc *DecodeContext, upstreamValue interface{}) error { + switch v := upstreamValue.(type) { + case []byte: + if len(v) == op.len { + if dc != nil { + out := op.fn(v) + return op.n.process(dc, out) + } + } else { + return fmt.Errorf("cannot process as len:%d required %d", len(v), op.len) + } + default: + return fmt.Errorf("cannot process %T", upstreamValue) + } + return nil +} + +// ErrorDOp is a decode operation that will generate an error +type ErrorDOp struct { + baseDOp + errorOnTestProcess bool +} + +func (op *ErrorDOp) process(dc *DecodeContext, upstreamValue interface{}) error { + if dc == nil && !op.errorOnTestProcess { + return nil + } + return fmt.Errorf("Error Op") +} diff --git a/plugins/inputs/sflow/decoder/ops_test.go b/plugins/inputs/sflow/decoder/ops_test.go new file mode 100644 index 000000000..2b626b55d --- /dev/null +++ b/plugins/inputs/sflow/decoder/ops_test.go @@ -0,0 +1,383 @@ +package decoder + +import ( + "bytes" + "encoding/binary" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func Test_U64AsF(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + ddo := AsF("out") + in := uint64(5) + require.NoError(t, ddo.process(dc, &in)) + m := dc.currentMetric() + require.Equal(t, in, getField(m, "out")) +} + +func Test_U32AsF(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + ddo := AsF("out") + in := uint32(5) + require.NoError(t, ddo.process(dc, &in)) + m := dc.currentMetric() + require.Equal(t, uint64(in), getField(m, "out")) +} + +func Test_U16PtrAsF(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + ddo := AsF("out") + in := uint16(5) + require.NoError(t, ddo.process(dc, &in)) + m := dc.currentMetric() + require.Equal(t, uint64(in), getField(m, "out")) +} + +func Test_U16AsF(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + ddo := AsF("out") + in := uint16(5) + require.NoError(t, ddo.process(dc, in)) + m := dc.currentMetric() + require.Equal(t, uint64(in), getField(m, "out")) +} + +func Test_U8AsF(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + ddo := AsF("out") + in := uint8(5) + require.NoError(t, ddo.process(dc, in)) + m := dc.currentMetric() + require.Equal(t, uint64(in), getField(m, "out")) +} + +func Test_U8PtrAsF(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + ddo := AsF("out") + in := uint8(5) + require.NoError(t, ddo.process(dc, &in)) + m := dc.currentMetric() + require.Equal(t, uint64(in), getField(m, "out")) +} + +func Test_U32AsT(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + ddo := AsT("out") + in := uint32(5) + require.NoError(t, ddo.process(dc, in)) + m := dc.currentMetric() + require.Equal(t, fmt.Sprintf("%d", in), getTag(m, "out")) +} + +func Test_U32PtrAsT(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + ddo := AsT("out") + in := uint32(5) + require.NoError(t, ddo.process(dc, &in)) + m := dc.currentMetric() + require.Equal(t, fmt.Sprintf("%d", in), getTag(m, "out")) +} + +func Test_U16AsT(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + ddo := AsT("out") + in := uint16(5) + require.NoError(t, ddo.process(dc, in)) + m := dc.currentMetric() + require.Equal(t, fmt.Sprintf("%d", in), getTag(m, "out")) +} + +func Test_U16PtrAsT(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + ddo := AsT("out") + in := uint16(5) + require.NoError(t, ddo.process(dc, &in)) + m := dc.currentMetric() + require.Equal(t, fmt.Sprintf("%d", in), getTag(m, "out")) +} + +func Test_U8AsT(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + ddo := AsT("out") + in := uint8(5) + require.NoError(t, ddo.process(dc, in)) + m := dc.currentMetric() + require.Equal(t, fmt.Sprintf("%d", in), getTag(m, "out")) +} + +func Test_U8PtrAsT(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + ddo := AsT("out") + in := uint8(5) + require.NoError(t, ddo.process(dc, &in)) + m := dc.currentMetric() + require.Equal(t, fmt.Sprintf("%d", in), getTag(m, "out")) +} + +func Test_U32ToU32AsF(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + ddo := U32ToU32(func(i uint32) uint32 { return i * 2 }) + ddo2 := ddo.AsF("out") + require.Equal(t, ddo, ddo2.prev()) + in := uint32(5) + require.NoError(t, ddo.process(dc, &in)) + m := dc.currentMetric() + require.Equal(t, uint64(in*2), getField(m, "out")) +} + +func Test_U16ToU16AsF(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + ddo := U16ToU16(func(i uint16) uint16 { return i * 2 }) + ddo2 := ddo.AsF("out") + require.Equal(t, ddo, ddo2.prev()) + in := uint16(5) + require.NoError(t, ddo.process(dc, &in)) + m := dc.currentMetric() + require.Equal(t, uint64(in*2), getField(m, "out")) +} + +func Test_U32ToStrAsT(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + ddo := U32ToStr(func(i uint32) string { return fmt.Sprintf("%d", i*2) }) + ddo2 := ddo.AsT("out") + require.Equal(t, ddo, ddo2.prev()) + in := uint32(5) + require.NoError(t, ddo.process(dc, &in)) + m := dc.currentMetric() + require.Equal(t, fmt.Sprintf("%d", (in*2)), getTag(m, "out")) +} + +func Test_U16ToStrAsT(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + ddo := U16ToStr(func(i uint16) string { return fmt.Sprintf("%d", i*2) }) + ddo2 := ddo.AsT("out") + require.Equal(t, ddo, ddo2.prev()) + in := uint16(5) + require.NoError(t, ddo.process(dc, &in)) + m := dc.currentMetric() + require.Equal(t, fmt.Sprintf("%d", (in*2)), getTag(m, "out")) +} + +func Test_MapU32ToStrAsT(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + myMap := map[uint32]string{5: "five"} + ddo := MapU32ToStr(myMap) + ddo2 := ddo.AsT("out") + require.Equal(t, ddo, ddo2.prev()) + in := uint32(5) + require.NoError(t, ddo.process(dc, &in)) + m := dc.currentMetric() + require.Equal(t, "five", getTag(m, "out")) +} + +func Test_MapU16ToStrAsT(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + myMap := map[uint16]string{5: "five"} + ddo := MapU16ToStr(myMap) + ddo2 := ddo.AsT("out") + require.Equal(t, ddo, ddo2.prev()) + in := uint16(5) + require.NoError(t, ddo.process(dc, &in)) + m := dc.currentMetric() + require.Equal(t, "five", getTag(m, "out")) +} + +func Test_DecDir_ToU32(t *testing.T) { + u := U32(). + Do(U32ToU32(func(in uint32) uint32 { return in >> 2 }).AsF("out1")). + Do(U32ToU32(func(in uint32) uint32 { return in * 2 }).AsF("out2")) + dd := Seq(OpenMetric(""), u, CloseMetric()) + + value := uint32(1001) + var buffer bytes.Buffer + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value)) + + dc := NewDecodeContext() + require.NoError(t, dc.Decode(dd, &buffer)) + + x, _ := u.(*valueDirective) + require.Equal(t, &value, x.value) + + // require field ejected + require.Equal(t, 1, len(dc.GetMetrics())) + m := dc.GetMetrics() + require.Equal(t, uint64(value>>2), getField(m[0], "out1")) + require.Equal(t, uint64(value*2), getField(m[0], "out2")) +} + +func Test_BytesToStrAsT(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + f := func(b []byte) string { return fmt.Sprintf("%d:%d", b[0], b[1]) } + ddo := BytesToStr(2, f) + ddo2 := ddo.AsT("out") + require.Equal(t, ddo, ddo2.prev()) + in := []byte{0x01, 0x02} + require.NoError(t, ddo.process(dc, in)) + m := dc.currentMetric() + require.Equal(t, fmt.Sprintf("%d:%d", in[0], in[1]), getTag(m, "out")) +} + +func Test_BytesToAsT(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + f := func(b []byte) interface{} { return fmt.Sprintf("%d:%d", b[0], b[1]) } + ddo := BytesTo(2, f) + ddo2 := ddo.AsT("out") + require.Equal(t, ddo, ddo2.prev()) + in := []byte{0x01, 0x02} + require.NoError(t, ddo.process(dc, in)) + m := dc.currentMetric() + require.Equal(t, fmt.Sprintf("%d:%d", in[0], in[1]), getTag(m, "out")) +} + +func Test_BytesToU32AsF(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + f := func(b []byte) uint32 { return uint32(b[0] * b[1]) } + ddo := BytesToU32(2, f) + ddo2 := ddo.AsF("out") + require.Equal(t, ddo, ddo2.prev()) + in := []byte{0x01, 0x02} + require.NoError(t, ddo.process(dc, in)) + m := dc.currentMetric() + require.Equal(t, uint64(in[0]*in[1]), getField(m, "out")) +} + +func Test_U32require(t *testing.T) { + dc := NewDecodeContext() + ddo := U32Assert(func(in uint32) bool { return false }, "bad") + in := uint32(5) + require.Error(t, ddo.process(dc, &in)) +} + +func Test_U16require(t *testing.T) { + dc := NewDecodeContext() + ddo := U16Assert(func(in uint16) bool { return false }, "bad") + in := uint16(5) + require.Error(t, ddo.process(dc, &in)) +} + +func Test_Set(t *testing.T) { + dc := NewDecodeContext() + ptr := new(uint32) + ddo := Set(ptr) + in := uint32(5) + require.NoError(t, ddo.process(dc, &in)) + require.Equal(t, *ptr, in) +} + +func Test_U16Set(t *testing.T) { + dc := NewDecodeContext() + ptr := new(uint16) + ddo := Set(ptr) + in := uint16(5) + require.NoError(t, ddo.process(dc, in)) + require.Equal(t, *ptr, in) +} + +func Test_U16PtrSet(t *testing.T) { + dc := NewDecodeContext() + ptr := new(uint16) + ddo := Set(ptr) + in := uint16(5) + require.NoError(t, ddo.process(dc, &in)) + require.Equal(t, *ptr, in) +} + +func Test_U32toU32Set(t *testing.T) { + dc := NewDecodeContext() + ptr := new(uint32) + ddo := U32ToU32(func(in uint32) uint32 { return in * 2 }).Set(ptr).prev() + in := uint32(5) + require.NoError(t, ddo.process(dc, &in)) + require.Equal(t, *ptr, in*2) +} + +func Test_U32toU32toString(t *testing.T) { + dc := NewDecodeContext() + ptr := new(string) + ddo := U32ToU32(func(in uint32) uint32 { return in * 2 }).ToString(func(in uint32) string { return fmt.Sprintf("%d", in*2) }).Set(ptr).prev().prev() + in := uint32(2) + require.NoError(t, ddo.process(dc, &in)) + require.Equal(t, "8", *ptr) +} + +func Test_U32toU32toStringBreakIf(t *testing.T) { + dc := NewDecodeContext() + ptr := new(string) + ddo := U32ToU32(func(in uint32) uint32 { return in * 2 }).ToString(func(in uint32) string { return fmt.Sprintf("%d", in*2) }).BreakIf("8").Set(ptr).prev().prev().prev() + in := uint32(2) + require.NoError(t, ddo.process(dc, &in)) + require.Equal(t, "", *ptr) + + in = uint32(1) + require.NoError(t, ddo.process(dc, &in)) + require.Equal(t, "4", *ptr) +} + +func Test_notify(t *testing.T) { + value := uint32(1001) + var buffer bytes.Buffer + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value)) + + ptr := new(uint32) + *ptr = uint32(2002) + var notificationOne uint32 + var notificationTwo uint32 + dd := Seq( + Notify(func() { notificationOne = *ptr }), + U32().Do(Set(ptr)), + Notify(func() { notificationTwo = *ptr }), + ) + + require.NoError(t, Execute(dd, &buffer)) + require.Equal(t, uint32(2002), notificationOne) + require.Equal(t, uint32(1001), notificationTwo) +} + +func Test_nop(t *testing.T) { + value := uint32(1001) + var buffer bytes.Buffer + require.NoError(t, binary.Write(&buffer, binary.BigEndian, &value)) + originalLen := buffer.Len() + dd := Seq( + Nop(), + ) + + require.NoError(t, Execute(dd, &buffer)) + require.Equal(t, originalLen, buffer.Len()) +} + +func Test_AsTimestamp(t *testing.T) { + dc := NewDecodeContext() + dc.openMetric("") + ddo := AsTimestamp() + now := time.Now() + in := uint32(now.Unix()) // only handles as uin32 (not uint64) + require.NoError(t, ddo.process(dc, &in)) + m := dc.currentMetric() + require.Equal(t, now.Unix(), m.Time().Unix()) +} diff --git a/plugins/inputs/sflow/decoder_test.go b/plugins/inputs/sflow/decoder_test.go new file mode 100644 index 000000000..33db1d1d2 --- /dev/null +++ b/plugins/inputs/sflow/decoder_test.go @@ -0,0 +1,975 @@ +package sflow + +import ( + "bytes" + "encoding/hex" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs/sflow/decoder" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestIPv4SW(t *testing.T) { + packet, err := hex.DecodeString("0000000500000001c0a80102000000100000f3d40bfa047f0000000200000001000000d00001210a000001fe000004000484240000000000000001fe00000200000000020000000100000090000000010000010b0000000400000080000c2936d3d694c691aa97600800450000f9f19040004011b4f5c0a80913c0a8090a00a1ba0500e5641f3081da02010104066d6f746f6770a281cc02047b46462e0201000201003081bd3012060d2b06010201190501010281dc710201003013060d2b06010201190501010281e66802025acc3012060d2b0601020119050101000003e9000000100000000900000000000000090000000000000001000000d00000e3cc000002100000400048eb740000000000000002100000020000000002000000010000009000000001000000970000000400000080000c2936d3d6fcecda44008f81000009080045000081186440003f119098c0a80815c0a8090a9a690202006d23083c33303e4170722031312030393a33333a3031206b6e6f64653120736e6d70645b313039385d3a20436f6e6e656374696f6e2066726f6d205544503a205b3139322e3136382e392e31305d3a34393233362d000003e90000001000000009000000000000000900000000") + require.NoError(t, err) + + dc := decoder.NewDecodeContext() + err = dc.Decode(V5Format(NewDefaultV5FormatOptions()), bytes.NewBuffer(packet)) + require.NoError(t, err) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "192.168.1.2", + "dst_ip": "192.168.9.10", + "dst_mac": "00:0c:29:36:d3:d6", + "dst_port": "47621", + "ether_type": "IPv4", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "510", + "ip_dscp": "0", + "ip_ecn": "0", + "output_ifindex": "512", + "sample_direction": "ingress", + "source_id_index": "510", + "source_id_type": "0", + "src_ip": "192.168.9.19", + "src_mac": "94:c6:91:aa:97:60", + "src_port": "161", + }, + map[string]interface{}{ + "bytes": uint64(0x042c00), + "drops": uint64(0x00), + "frame_length": uint64(0x010b), + "header_length": uint64(0x80), + "ip_flags": uint64(0x02), + "ip_fragment_offset": uint64(0x00), + "ip_total_length": uint64(0xf9), + "ip_ttl": uint64(0x40), + "sampling_rate": uint64(0x0400), + "udp_length": uint64(0xe5), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "192.168.1.2", + "dst_ip": "192.168.9.10", + "dst_mac": "00:0c:29:36:d3:d6", + "dst_port": "514", + "ether_type": "IPv4", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "528", + "ip_dscp": "0", + "ip_ecn": "0", + "output_ifindex": "512", + "sample_direction": "ingress", + "source_id_index": "528", + "source_id_type": "0", + "src_ip": "192.168.8.21", + "src_mac": "fc:ec:da:44:00:8f", + "src_port": "39529", + }, + map[string]interface{}{ + "bytes": uint64(0x25c000), + "drops": uint64(0x00), + "frame_length": uint64(0x97), + "header_length": uint64(0x80), + "ip_flags": uint64(0x02), + "ip_fragment_offset": uint64(0x00), + "ip_total_length": uint64(0x81), + "ip_ttl": uint64(0x3f), + "sampling_rate": uint64(0x4000), + "udp_length": uint64(0x6d), + }, + time.Unix(0, 0), + ), + } + actual := dc.GetMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) +} + +func BenchmarkDecodeIPv4SW(b *testing.B) { + packet, err := hex.DecodeString("0000000500000001c0a80102000000100000f3d40bfa047f0000000200000001000000d00001210a000001fe000004000484240000000000000001fe00000200000000020000000100000090000000010000010b0000000400000080000c2936d3d694c691aa97600800450000f9f19040004011b4f5c0a80913c0a8090a00a1ba0500e5641f3081da02010104066d6f746f6770a281cc02047b46462e0201000201003081bd3012060d2b06010201190501010281dc710201003013060d2b06010201190501010281e66802025acc3012060d2b0601020119050101000003e9000000100000000900000000000000090000000000000001000000d00000e3cc000002100000400048eb740000000000000002100000020000000002000000010000009000000001000000970000000400000080000c2936d3d6fcecda44008f81000009080045000081186440003f119098c0a80815c0a8090a9a690202006d23083c33303e4170722031312030393a33333a3031206b6e6f64653120736e6d70645b313039385d3a20436f6e6e656374696f6e2066726f6d205544503a205b3139322e3136382e392e31305d3a34393233362d000003e90000001000000009000000000000000900000000") + require.NoError(b, err) + + dc := decoder.NewDecodeContext() + err = dc.Decode(V5Format(NewDefaultV5FormatOptions()), bytes.NewBuffer(packet)) + require.NoError(b, err) + + format := V5Format(NewDefaultV5FormatOptions()) + b.ResetTimer() + for n := 0; n < b.N; n++ { + err := dc.Decode(format, bytes.NewBuffer(packet)) + if err != nil { + panic(err) + } + + _ = dc.GetMetrics() + } +} + +func BenchmarkNewV5FormatDirective(b *testing.B) { + for n := 0; n < b.N; n++ { + _ = V5Format(NewDefaultV5FormatOptions()) + } +} + +func TestExpandFlow(t *testing.T) { + packet, err := hex.DecodeString("00000005000000010a00015000000000000f58998ae119780000000300000003000000c4000b62a90000000000100c840000040024fb7e1e0000000000000000001017840000000000100c8400000001000000010000009000000001000005bc0000000400000080001b17000130001201f58d44810023710800450205a6305440007e06ee92ac100016d94d52f505997e701fa1e17aff62574a50100200355f000000ffff00000b004175746f72697a7a6174610400008040ffff000400008040050031303030320500313030302004000000000868a200000000000000000860a200000000000000000003000000c40003cecf000000000010170400004000a168ac1c000000000000000000101784000000000010170400000001000000010000009000000001000005f200000004000000800024e8324338d4ae52aa0b54810020060800450005dc5420400080061397c0a8060cc0a806080050efcfbb25bad9a21c839a501000fff54000008a55f70975a0ff88b05735597ae274bd81fcba17e6e9206b8ea0fb07d05fc27dad06cfe3fdba5d2fc4d057b0add711e596cbe5e9b4bbe8be59cd77537b7a89f7414a628b736d00000003000000c0000c547a0000000000100c04000004005bc3c3b50000000000000000001017840000000000100c0400000001000000010000008c000000010000007e000000040000007a001b17000130001201f58d448100237108004500006824ea4000ff32c326d94d5105501018f02e88d003000001dd39b1d025d1c68689583b2ab21522d5b5a959642243804f6d51e63323091cc04544285433eb3f6b29e1046a6a2fa7806319d62041d8fa4bd25b7cd85b8db54202054a077ac11de84acbe37a550004") + require.NoError(t, err) + + dc := decoder.NewDecodeContext() + err = dc.Decode(V5Format(NewDefaultV5FormatOptions()), bytes.NewBuffer(packet)) + require.NoError(t, err) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "10.0.1.80", + "dst_ip": "217.77.82.245", + "dst_mac": "00:1b:17:00:01:30", + "dst_port": "32368", + "ether_type": "IPv4", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "1054596", + "ip_dscp": "0", + "ip_ecn": "2", + "output_ifindex": "1051780", + "sample_direction": "egress", + "source_id_index": "1051780", + "source_id_type": "0", + "src_ip": "172.16.0.22", + "src_mac": "00:12:01:f5:8d:44", + "src_port": "1433", + }, + map[string]interface{}{ + "bytes": uint64(0x16f000), + "drops": uint64(0x00), + "frame_length": uint64(0x05bc), + "header_length": uint64(0x80), + "ip_flags": uint64(0x02), + "ip_fragment_offset": uint64(0x00), + "ip_total_length": uint64(0x05a6), + "ip_ttl": uint64(0x7e), + "sampling_rate": uint64(0x0400), + "tcp_header_length": uint64(0x40), + "tcp_urgent_pointer": uint64(0x00), + "tcp_window_size": uint64(0x0200), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "10.0.1.80", + "dst_ip": "192.168.6.8", + "dst_mac": "00:24:e8:32:43:38", + "dst_port": "61391", + "ether_type": "IPv4", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "1054596", + "ip_dscp": "0", + "ip_ecn": "0", + "output_ifindex": "1054468", + "sample_direction": "egress", + "source_id_index": "1054468", + "source_id_type": "0", + "src_ip": "192.168.6.12", + "src_mac": "d4:ae:52:aa:0b:54", + "src_port": "80", + }, + map[string]interface{}{ + "bytes": uint64(0x017c8000), + "drops": uint64(0x00), + "frame_length": uint64(0x05f2), + "header_length": uint64(0x80), + "ip_flags": uint64(0x02), + "ip_fragment_offset": uint64(0x00), + "ip_total_length": uint64(0x05dc), + "ip_ttl": uint64(0x80), + "sampling_rate": uint64(0x4000), + "tcp_header_length": uint64(0x40), + "tcp_urgent_pointer": uint64(0x00), + "tcp_window_size": uint64(0xff), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "10.0.1.80", + "dst_ip": "80.16.24.240", + "dst_mac": "00:1b:17:00:01:30", + "ether_type": "IPv4", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "1054596", + "ip_dscp": "0", + "ip_ecn": "0", + "output_ifindex": "1051652", + "sample_direction": "egress", + "source_id_index": "1051652", + "source_id_type": "0", + "src_ip": "217.77.81.5", + "src_mac": "00:12:01:f5:8d:44", + }, + map[string]interface{}{ + "bytes": uint64(0x01f800), + "drops": uint64(0x00), + "frame_length": uint64(0x7e), + "header_length": uint64(0x7a), + "ip_flags": uint64(0x02), + "ip_fragment_offset": uint64(0x00), + "ip_total_length": uint64(0x68), + "ip_ttl": uint64(0xff), + "sampling_rate": uint64(0x0400), + }, + time.Unix(0, 0), + ), + } + actual := dc.GetMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) +} + +func TestIPv4SWRT(t *testing.T) { + packet, err := hex.DecodeString("000000050000000189dd4f010000000000003d4f21151ad40000000600000001000000bc354b97090000020c000013b175792bea000000000000028f0000020c0000000300000001000000640000000100000058000000040000005408b2587a57624c16fc0b61a5080045000046c3e440003a1118a0052aada7569e5ab367a6e35b0032d7bbf1f2fb2eb2490a97f87abc31e135834be367000002590000ffffffffffffffff02add830d51e0aec14cf000003e90000001000000000000000000000000000000000000003ea0000001000000001c342e32a000000160000000b00000001000000a88b8ffb57000002a2000013b12e344fd800000000000002a20000028f0000000300000001000000500000000100000042000000040000003e4c16fc0b6202c03e0fdecafe080045000030108000007d11fe45575185a718693996f0570e8c001c20614ad602003fd6d4afa6a6d18207324000271169b00000000003e90000001000000000000000000000000000000000000003ea000000100000000189dd4f210000000f0000001800000001000000e8354b970a0000020c000013b175793f9b000000000000028f0000020c00000003000000010000009000000001000001a500000004000000800231466d0b2c4c16fc0b61a5080045000193198f40003a114b75052aae1f5f94c778678ef24d017f50ea7622287c30799e1f7d45932d01ca92c46d930000927c0000ffffffffffffffff02ad0eea6498953d1c7ebb6dbdf0525c80e1a9a62bacfea92f69b7336c2f2f60eba0593509e14eef167eb37449f05ad70b8241c1a46d000003e90000001000000000000000000000000000000000000003ea0000001000000001c342e1fd000000160000001000000001000000e8354b970b0000020c000013b17579534c000000000000028f0000020c00000003000000010000009000000001000000b500000004000000800231466d0b2c4c16fc0b61a50800450000a327c240003606fd67b93c706a021ff365045fe8a0976d624df8207083501800edb31b0000485454502f312e3120323030204f4b0d0a5365727665723a2050726f746f636f6c20485454500d0a436f6e74656e742d4c656e6774683a20313430340d0a436f6e6e656374696f6e3a20000003e90000001000000000000000000000000000000000000003ea0000001000000001c342e1fd000000170000001000000001000000e8354b970c0000020c000013b1757966fd000000000000028f0000020c000000030000000100000090000000010000018e00000004000000800231466d0b2c4c16fc0b61a508004500017c7d2c40003a116963052abd8d021c940e67e7e0d501682342dbe7936bd47ef487dee5591ec1b24d83622e000072250000ffffffffffffffff02ad0039d8ba86a90017071d76b177de4d8c4e23bcaaaf4d795f77b032f959e0fb70234d4c28922d4e08dd3330c66e34bff51cc8ade5000003e90000001000000000000000000000000000000000000003ea0000001000000001c342e1fd000000160000001000000001000000e80d6146ac000002a1000013b17880b49d00000000000002a10000028f00000003000000010000009000000001000005ee00000004000000804c16fc0b6201d8b122766a2c0800450005dc04574000770623a11fcd80a218691d4cf2fe01bbd4f47482065fd63a5010fabd7987000052a20002c8c43ea91ca1eaa115663f5218a37fbb409dfbbedff54731ef41199b35535905ac2366a05a803146ced544abf45597f3714327d59f99e30c899c39fc5a4b67d12087bf8db2bc000003e90000001000000000000000000000000000000000000003ea000000100000000189dd4f210000001000000018") + require.NoError(t, err) + + dc := decoder.NewDecodeContext() + err = dc.Decode(V5Format(NewDefaultV5FormatOptions()), bytes.NewBuffer(packet)) + require.NoError(t, err) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "137.221.79.1", + "dst_ip": "86.158.90.179", + "dst_mac": "08:b2:58:7a:57:62", + "dst_port": "58203", + "ether_type": "IPv4", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "655", + "ip_dscp": "0", + "ip_ecn": "0", + "output_ifindex": "524", + "sample_direction": "egress", + "source_id_index": "524", + "source_id_type": "0", + "src_ip": "5.42.173.167", + "src_mac": "4c:16:fc:0b:61:a5", + "src_port": "26534", + }, + map[string]interface{}{ + "bytes": uint64(0x06c4d8), + "drops": uint64(0x00), + "frame_length": uint64(0x58), + "header_length": uint64(0x54), + "ip_flags": uint64(0x02), + "ip_fragment_offset": uint64(0x00), + "ip_total_length": uint64(0x46), + "ip_ttl": uint64(0x3a), + "sampling_rate": uint64(0x13b1), + "udp_length": uint64(0x32), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "137.221.79.1", + "dst_ip": "24.105.57.150", + "dst_mac": "4c:16:fc:0b:62:02", + "dst_port": "3724", + "ether_type": "IPv4", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "674", + "ip_dscp": "0", + "ip_ecn": "0", + "output_ifindex": "655", + "sample_direction": "ingress", + "source_id_index": "674", + "source_id_type": "0", + "src_ip": "87.81.133.167", + "src_mac": "c0:3e:0f:de:ca:fe", + "src_port": "61527", + }, + map[string]interface{}{ + "bytes": uint64(0x0513a2), + "drops": uint64(0x00), + "frame_length": uint64(0x42), + "header_length": uint64(0x3e), + "ip_flags": uint64(0x00), + "ip_fragment_offset": uint64(0x00), + "ip_total_length": uint64(0x30), + "ip_ttl": uint64(0x7d), + "sampling_rate": uint64(0x13b1), + "udp_length": uint64(0x1c), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "137.221.79.1", + "dst_ip": "95.148.199.120", + "dst_mac": "02:31:46:6d:0b:2c", + "dst_port": "62029", + "ether_type": "IPv4", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "655", + "ip_dscp": "0", + "ip_ecn": "0", + "output_ifindex": "524", + "sample_direction": "egress", + "source_id_index": "524", + "source_id_type": "0", + "src_ip": "5.42.174.31", + "src_mac": "4c:16:fc:0b:61:a5", + "src_port": "26510", + }, + map[string]interface{}{ + "bytes": uint64(0x206215), + "drops": uint64(0x00), + "frame_length": uint64(0x01a5), + "header_length": uint64(0x80), + "ip_flags": uint64(0x02), + "ip_fragment_offset": uint64(0x00), + "ip_total_length": uint64(0x0193), + "ip_ttl": uint64(0x3a), + "sampling_rate": uint64(0x13b1), + "udp_length": uint64(0x017f), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "137.221.79.1", + "dst_ip": "2.31.243.101", + "dst_mac": "02:31:46:6d:0b:2c", + "dst_port": "59552", + "ether_type": "IPv4", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "655", + "ip_dscp": "0", + "ip_ecn": "0", + "output_ifindex": "524", + "sample_direction": "egress", + "source_id_index": "524", + "source_id_type": "0", + "src_ip": "185.60.112.106", + "src_mac": "4c:16:fc:0b:61:a5", + "src_port": "1119", + }, + map[string]interface{}{ + "bytes": uint64(0x0dec25), + "drops": uint64(0x00), + "frame_length": uint64(0xb5), + "header_length": uint64(0x80), + "ip_flags": uint64(0x02), + "ip_fragment_offset": uint64(0x00), + "ip_total_length": uint64(0xa3), + "ip_ttl": uint64(0x36), + "sampling_rate": uint64(0x13b1), + "tcp_header_length": uint64(0x40), + "tcp_urgent_pointer": uint64(0x00), + "tcp_window_size": uint64(0xed), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "137.221.79.1", + "dst_ip": "2.28.148.14", + "dst_mac": "02:31:46:6d:0b:2c", + "dst_port": "57557", + "ether_type": "IPv4", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "655", + "ip_dscp": "0", + "ip_ecn": "0", + "output_ifindex": "524", + "sample_direction": "egress", + "source_id_index": "524", + "source_id_type": "0", + "src_ip": "5.42.189.141", + "src_mac": "4c:16:fc:0b:61:a5", + "src_port": "26599", + }, + map[string]interface{}{ + "bytes": uint64(0x1e9d2e), + "drops": uint64(0x00), + "frame_length": uint64(0x018e), + "header_length": uint64(0x80), + "ip_flags": uint64(0x02), + "ip_fragment_offset": uint64(0x00), + "ip_total_length": uint64(0x017c), + "ip_ttl": uint64(0x3a), + "sampling_rate": uint64(0x13b1), + "udp_length": uint64(0x0168), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "137.221.79.1", + "dst_ip": "24.105.29.76", + "dst_mac": "4c:16:fc:0b:62:01", + "dst_port": "443", + "ether_type": "IPv4", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "673", + "ip_dscp": "0", + "ip_ecn": "0", + "output_ifindex": "655", + "sample_direction": "ingress", + "source_id_index": "673", + "source_id_type": "0", + "src_ip": "31.205.128.162", + "src_mac": "d8:b1:22:76:6a:2c", + "src_port": "62206", + }, + map[string]interface{}{ + "bytes": uint64(0x74c38e), + "drops": uint64(0x00), + "frame_length": uint64(0x05ee), + "header_length": uint64(0x80), + "ip_flags": uint64(0x02), + "ip_fragment_offset": uint64(0x00), + "ip_total_length": uint64(0x05dc), + "ip_ttl": uint64(0x77), + "sampling_rate": uint64(0x13b1), + "tcp_header_length": uint64(0x40), + "tcp_urgent_pointer": uint64(0x00), + "tcp_window_size": uint64(0xfabd), + }, + time.Unix(0, 0), + ), + } + actual := dc.GetMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) +} + +func TestIPv6SW(t *testing.T) { + packet, err := hex.DecodeString("00000005000000010ae0648100000002000093d824ac82340000000100000001000000d000019f94000001010000100019f94000000000000000010100000000000000020000000100000090000000010000058c00000008000000800008e3fffc10d4f4be04612486dd60000000054e113a2607f8b0400200140000000000000008262000edc000e804a25e30c581af36fa01bbfa6f054e249810b584bcbf12926c2e29a779c26c72db483e8191524fe2288bfdaceaf9d2e724d04305706efcfdef70db86873bbacf29698affe4e7d6faa21d302f9b4b023291a05a000003e90000001000000001000000000000000100000000") + require.NoError(t, err) + + dc := decoder.NewDecodeContext() + err = dc.Decode(V5Format(NewDefaultV5FormatOptions()), bytes.NewBuffer(packet)) + require.NoError(t, err) + + expected := []telegraf.Metric{ + + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "10.224.100.129", + "dst_ip": "2620:ed:c000:e804:a25e:30c5:81af:36fa", + "dst_mac": "00:08:e3:ff:fc:10", + "dst_port": "64111", + "ether_type": "IPv6", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "257", + "output_ifindex": "0", + "sample_direction": "ingress", + "source_id_index": "257", + "source_id_type": "0", + "src_ip": "2607:f8b0:4002:14::8", + "src_mac": "d4:f4:be:04:61:24", + "src_port": "443", + }, + map[string]interface{}{ + "bytes": uint64(0x58c000), + "drops": uint64(0x00), + "frame_length": uint64(0x058c), + "header_length": uint64(0x80), + "ip_dscp": uint64(0x00), + "ip_ecn": uint64(0x00), + "sampling_rate": uint64(0x1000), + "udp_length": uint64(0x054e), + }, + time.Unix(0, 0), + ), + } + actual := dc.GetMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) +} + +func TestExpandFlowCounter(t *testing.T) { + packet, err := hex.DecodeString("00000005000000010a00015000000000000f58898ae0fa380000000700000004000000ec00006ece0000000000101784000000030000000200000034000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000100000058001017840000000600000002540be400000000010000000300007b8ebd37b97e61ff94860803e8e908ffb2b500000000000000000000000000018e7c31ee7ba4195f041874579ff021ba936300000000000000000000000100000007000000380011223344550003f8b15645e7e7d6960000002fe2fc02fc01edbf580000000000000000000000000000000001dcb9cf000000000000000000000004000000ec00006ece0000000000100184000000030000000200000034000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000100000058001001840000000600000002540be400000000010000000300000841131d1fd9f850bfb103617cb401e6598900000000000000000000000000000bec1902e5da9212e3e96d7996e922513250000000000000000000000001000000070000003800112233445500005c260acbddb3000100000003e2fc02fc01ee414f0000000000000000000000000000000001dccdd30000000000000000000000030000008400004606000000000010030400004000ad9dc19b0000000000000000001017840000000000100304000000010000000100000050000000010000004400000004000000400012815116c4001517cf426d8100200608004500002895da40008006d74bc0a8060ac0a8064f04ef04aab1797122cf7eaf4f5010ffff7727000000000000000000000003000000b0001bd698000000000010148400000400700b180f000000000000000000101504000000000010148400000001000000010000007c000000010000006f000000040000006b001b17000131f0f755b9afc081000439080045000059045340005206920c1f0d4703d94d52e201bbf14977d1e9f15498af36801800417f1100000101080afdf3c70400e043871503010020ff268cfe2e2fd5fffe1d3d704a91d57b895f174c4b4428c66679d80a307294303f00000003000000c40003ceca000000000010170400004000a166aa7a000000000000000000101784000000000010170400000001000000010000009000000001000005f200000004000000800024e8369e2bd4ae52aa0b54810020060800450005dc4c71400080061b45c0a8060cc0a806090050f855692a7a94a1154ae1801001046b6a00000101080a6869a48d151016d046a84a7aa1c6743fa05179f7ecbd4e567150cb6f2077ff89480ae730637d26d2237c08548806f672c7476eb1b5a447b42cb9ce405994d152fa3e000000030000008c001bd699000000000010148400000400700b180f0000000000000000001015040000000000101484000000010000000100000058000000010000004a0000000400000046001b17000131f0f755b9afc0810004390800450000340ce040003a06bea5c1ce8793d94d528f00504c3b08b18f275b83d5df8010054586ad00000101050a5b83d5de5b83d5df11d800000003000000c400004e07000000000010028400004000c7ec97f2000000000000000000100784000000000010028400000001000000010000009000000001000005f2000000040000008000005e0001ff005056800dd18100000a0800450005dc5a42400040066ef70a000ac8c0a8967201bbe17c81597908caf8a05f5010010328610000f172263da0ba5d6223c079b8238bc841256bf17c4ffb08ad11c4fbff6f87ae1624a6b057b8baa9342114e5f5b46179083020cb560c4e9eadcec6dfd83e102ddbc27024803eb5") + require.NoError(t, err) + + dc := decoder.NewDecodeContext() + err = dc.Decode(V5Format(NewDefaultV5FormatOptions()), bytes.NewBuffer(packet)) + require.NoError(t, err) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "10.0.1.80", + "dst_ip": "192.168.6.79", + "dst_mac": "00:12:81:51:16:c4", + "dst_port": "1194", + "ether_type": "IPv4", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "1054596", + "ip_dscp": "0", + "ip_ecn": "0", + "output_ifindex": "1049348", + "sample_direction": "egress", + "source_id_index": "1049348", + "source_id_type": "0", + "src_ip": "192.168.6.10", + "src_mac": "00:15:17:cf:42:6d", + "src_port": "1263", + }, + map[string]interface{}{ + "bytes": uint64(0x110000), + "drops": uint64(0x00), + "frame_length": uint64(0x44), + "header_length": uint64(0x40), + "ip_flags": uint64(0x02), + "ip_fragment_offset": uint64(0x00), + "ip_total_length": uint64(0x28), + "ip_ttl": uint64(0x80), + "sampling_rate": uint64(0x4000), + "tcp_header_length": uint64(0x40), + "tcp_urgent_pointer": uint64(0x00), + "tcp_window_size": uint64(0xffff), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "10.0.1.80", + "dst_ip": "217.77.82.226", + "dst_mac": "00:1b:17:00:01:31", + "dst_port": "61769", + "ether_type": "IPv4", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "1053956", + "ip_dscp": "0", + "ip_ecn": "0", + "output_ifindex": "1053828", + "sample_direction": "egress", + "source_id_index": "1053828", + "source_id_type": "0", + "src_ip": "31.13.71.3", + "src_mac": "f0:f7:55:b9:af:c0", + "src_port": "443", + }, + map[string]interface{}{ + "bytes": uint64(0x01bc00), + "drops": uint64(0x00), + "frame_length": uint64(0x6f), + "header_length": uint64(0x6b), + "ip_flags": uint64(0x02), + "ip_fragment_offset": uint64(0x00), + "ip_total_length": uint64(0x59), + "ip_ttl": uint64(0x52), + "sampling_rate": uint64(0x0400), + "tcp_header_length": uint64(0x00), + "tcp_urgent_pointer": uint64(0x00), + "tcp_window_size": uint64(0x41), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "10.0.1.80", + "dst_ip": "192.168.6.9", + "dst_mac": "00:24:e8:36:9e:2b", + "dst_port": "63573", + "ether_type": "IPv4", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "1054596", + "ip_dscp": "0", + "ip_ecn": "0", + "output_ifindex": "1054468", + "sample_direction": "egress", + "source_id_index": "1054468", + "source_id_type": "0", + "src_ip": "192.168.6.12", + "src_mac": "d4:ae:52:aa:0b:54", + "src_port": "80", + }, + map[string]interface{}{ + "bytes": uint64(0x017c8000), + "drops": uint64(0x00), + "frame_length": uint64(0x05f2), + "header_length": uint64(0x80), + "ip_flags": uint64(0x02), + "ip_fragment_offset": uint64(0x00), + "ip_total_length": uint64(0x05dc), + "ip_ttl": uint64(0x80), + "sampling_rate": uint64(0x4000), + "tcp_header_length": uint64(0x00), + "tcp_urgent_pointer": uint64(0x00), + "tcp_window_size": uint64(0x0104), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "10.0.1.80", + "dst_ip": "217.77.82.143", + "dst_mac": "00:1b:17:00:01:31", + "dst_port": "19515", + "ether_type": "IPv4", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "1053956", + "ip_dscp": "0", + "ip_ecn": "0", + "output_ifindex": "1053828", + "sample_direction": "egress", + "source_id_index": "1053828", + "source_id_type": "0", + "src_ip": "193.206.135.147", + "src_mac": "f0:f7:55:b9:af:c0", + "src_port": "80", + }, + map[string]interface{}{ + "bytes": uint64(0x012800), + "drops": uint64(0x00), + "frame_length": uint64(0x4a), + "header_length": uint64(0x46), + "ip_flags": uint64(0x02), + "ip_fragment_offset": uint64(0x00), + "ip_total_length": uint64(0x34), + "ip_ttl": uint64(0x3a), + "sampling_rate": uint64(0x0400), + "tcp_header_length": uint64(0x00), + "tcp_urgent_pointer": uint64(0x00), + "tcp_window_size": uint64(0x0545), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "10.0.1.80", + "dst_ip": "192.168.150.114", + "dst_mac": "00:00:5e:00:01:ff", + "dst_port": "57724", + "ether_type": "IPv4", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "1050500", + "ip_dscp": "0", + "ip_ecn": "0", + "output_ifindex": "1049220", + "sample_direction": "egress", + "source_id_index": "1049220", + "source_id_type": "0", + "src_ip": "10.0.10.200", + "src_mac": "00:50:56:80:0d:d1", + "src_port": "443", + }, + map[string]interface{}{ + "bytes": uint64(0x017c8000), + "drops": uint64(0x00), + "frame_length": uint64(0x05f2), + "header_length": uint64(0x80), + "ip_flags": uint64(0x02), + "ip_fragment_offset": uint64(0x00), + "ip_total_length": uint64(0x05dc), + "ip_ttl": uint64(0x40), + "sampling_rate": uint64(0x4000), + "tcp_header_length": uint64(0x40), + "tcp_urgent_pointer": uint64(0x00), + "tcp_window_size": uint64(0x0103), + }, + time.Unix(0, 0), + ), + } + actual := dc.GetMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) +} + +func TestFlowExpandCounter(t *testing.T) { + packet, err := hex.DecodeString("00000005000000010a000150000000000006d14d8ae0fe200000000200000004000000ac00006d15000000004b00ca000000000200000002000000340000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000000584b00ca0000000001000000000000000000000001000000010000308ae33bb950eb92a8a3004d0bb406899571000000000000000000000000000012f7ed9c9db8c24ed90604eaf0bd04636edb00000000000000000000000100000004000000ac00006d15000000004b0054000000000200000002000000340000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000000584b00540000000001000000003b9aca000000000100000003000067ba8e64fd23fa65f26d0215ec4a0021086600000000000000000000000000002002c3b21045c2378ad3001fb2f300061872000000000000000000000001") + require.NoError(t, err) + + dc := decoder.NewDecodeContext() + err = dc.Decode(V5Format(NewDefaultV5FormatOptions()), bytes.NewBuffer(packet)) + require.NoError(t, err) + + // we don't do anything with samples yet + expected := []telegraf.Metric{} + actual := dc.GetMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) +} + +func TestUDPHeader(t *testing.T) { + options := NewDefaultV5FormatOptions() + octets := bytes.NewBuffer([]byte{ + 0x00, 0x01, // src_port + 0x00, 0x02, // dst_port + 0x00, 0x03, // udp_length + }) + + directive := decoder.Seq( + decoder.OpenMetric("sflow"), + udpHeader(options), + decoder.CloseMetric(), + ) + dc := decoder.NewDecodeContext() + err := directive.Execute(octets, dc) + require.NoError(t, err) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "sflow", + map[string]string{ + "src_port": "1", + "dst_port": "2", + }, + map[string]interface{}{ + "udp_length": uint64(3), + }, + time.Unix(0, 0), + ), + } + + testutil.RequireMetricsEqual(t, expected, dc.GetMetrics(), testutil.IgnoreTime()) +} + +func BenchmarkUDPHeader(b *testing.B) { + options := NewDefaultV5FormatOptions() + octets := bytes.NewBuffer([]byte{ + 0x00, 0x01, // src_port + 0x00, 0x02, // dst_port + 0x00, 0x03, // udp_length + }) + + directive := decoder.Seq( + decoder.OpenMetric("sflow"), + udpHeader(options), + decoder.CloseMetric(), + ) + dc := decoder.NewDecodeContext() + + b.ResetTimer() + for n := 0; n < b.N; n++ { + _ = directive.Execute(octets, dc) + } +} + +func TestIPv4Header(t *testing.T) { + octets := bytes.NewBuffer( + []byte{ + 0x45, // version + IHL + 0x00, // ip_dscp + ip_ecn + 0x00, 0x00, // total length + 0x00, 0x00, // identification + 0x00, 0x00, // flags + frag offset + 0x00, // ttl + 0x11, // protocol; 0x11 = udp + 0x00, 0x00, // header checksum + 0x7f, 0x00, 0x00, 0x01, // src ip + 0x7f, 0x00, 0x00, 0x02, // dst ip + 0x00, 0x01, // src_port + 0x00, 0x02, // dst_port + 0x00, 0x03, // udp_length + }, + ) + dc := decoder.NewDecodeContext() + + options := NewDefaultV5FormatOptions() + directive := decoder.Seq( + decoder.OpenMetric("sflow"), + ipv4Header(options), + decoder.CloseMetric(), + ) + + err := directive.Execute(octets, dc) + require.NoError(t, err) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "sflow", + map[string]string{ + "src_ip": "127.0.0.1", + "dst_ip": "127.0.0.2", + "ip_dscp": "0", + "ip_ecn": "0", + "src_port": "1", + "dst_port": "2", + }, + map[string]interface{}{ + "ip_flags": uint64(0), + "ip_fragment_offset": uint64(0), + "ip_total_length": uint64(0), + "ip_ttl": uint64(0), + "udp_length": uint64(3), + }, + time.Unix(0, 0), + ), + } + + testutil.RequireMetricsEqual(t, expected, dc.GetMetrics(), testutil.IgnoreTime()) +} + +// Using the same Directive instance, prior paths through the parse tree should +// not affect the latest parse. +func TestIPv4HeaderSwitch(t *testing.T) { + options := NewDefaultV5FormatOptions() + directive := decoder.Seq( + decoder.OpenMetric("sflow"), + ipv4Header(options), + decoder.CloseMetric(), + ) + + octets := bytes.NewBuffer( + []byte{ + 0x45, // version + IHL + 0x00, // ip_dscp + ip_ecn + 0x00, 0x00, // total length + 0x00, 0x00, // identification + 0x00, 0x00, // flags + frag offset + 0x00, // ttl + 0x11, // protocol; 0x11 = udp + 0x00, 0x00, // header checksum + 0x7f, 0x00, 0x00, 0x01, // src ip + 0x7f, 0x00, 0x00, 0x02, // dst ip + 0x00, 0x01, // src_port + 0x00, 0x02, // dst_port + 0x00, 0x03, // udp_length + }, + ) + dc := decoder.NewDecodeContext() + err := directive.Execute(octets, dc) + require.NoError(t, err) + + octets = bytes.NewBuffer( + []byte{ + 0x45, // version + IHL + 0x00, // ip_dscp + ip_ecn + 0x00, 0x00, // total length + 0x00, 0x00, // identification + 0x00, 0x00, // flags + frag offset + 0x00, // ttl + 0x06, // protocol; 0x06 = tcp + 0x00, 0x00, // header checksum + 0x7f, 0x00, 0x00, 0x01, // src ip + 0x7f, 0x00, 0x00, 0x02, // dst ip + 0x00, 0x01, // src_port + 0x00, 0x02, // dst_port + 0x00, 0x00, 0x00, 0x00, // sequence + 0x00, 0x00, 0x00, 0x00, // ack_number + 0x00, 0x00, // tcp_header_length + 0x00, 0x00, // tcp_window_size + 0x00, 0x00, // checksum + 0x00, 0x00, // tcp_urgent_pointer + }, + ) + dc = decoder.NewDecodeContext() + err = directive.Execute(octets, dc) + require.NoError(t, err) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "sflow", + map[string]string{ + "src_ip": "127.0.0.1", + "dst_ip": "127.0.0.2", + "ip_dscp": "0", + "ip_ecn": "0", + "src_port": "1", + "dst_port": "2", + }, + map[string]interface{}{ + "ip_flags": uint64(0), + "ip_fragment_offset": uint64(0), + "ip_total_length": uint64(0), + "ip_ttl": uint64(0), + "tcp_header_length": uint64(0), + "tcp_window_size": uint64(0), + "tcp_urgent_pointer": uint64(0), + }, + time.Unix(0, 0), + ), + } + + // check that udp fields are not set on the tcp metric + testutil.RequireMetricsEqual(t, expected, dc.GetMetrics(), testutil.IgnoreTime()) +} + +func TestUnknownProtocol(t *testing.T) { + octets := bytes.NewBuffer( + []byte{ + 0x45, // version + IHL + 0x00, // ip_dscp + ip_ecn + 0x00, 0x00, // total length + 0x00, 0x00, // identification + 0x00, 0x00, // flags + frag offset + 0x00, // ttl + 0x99, // protocol + 0x00, 0x00, // header checksum + 0x7f, 0x00, 0x00, 0x01, // src ip + 0x7f, 0x00, 0x00, 0x02, // dst ip + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + }, + ) + dc := decoder.NewDecodeContext() + + options := NewDefaultV5FormatOptions() + directive := decoder.Seq( + decoder.OpenMetric("sflow"), + ipv4Header(options), + decoder.CloseMetric(), + ) + + err := directive.Execute(octets, dc) + require.NoError(t, err) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "sflow", + map[string]string{ + "src_ip": "127.0.0.1", + "dst_ip": "127.0.0.2", + "ip_dscp": "0", + "ip_ecn": "0", + }, + map[string]interface{}{ + "ip_flags": uint64(0), + "ip_fragment_offset": uint64(0), + "ip_total_length": uint64(0), + "ip_ttl": uint64(0), + }, + time.Unix(0, 0), + ), + } + + testutil.RequireMetricsEqual(t, expected, dc.GetMetrics(), testutil.IgnoreTime()) +} diff --git a/plugins/inputs/sflow/sflow.go b/plugins/inputs/sflow/sflow.go new file mode 100644 index 000000000..7d113dd1e --- /dev/null +++ b/plugins/inputs/sflow/sflow.go @@ -0,0 +1,154 @@ +package sflow + +import ( + "bytes" + "context" + "fmt" + "io" + "net" + "net/url" + "strings" + "sync" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/inputs/sflow/decoder" +) + +const sampleConfig = ` + ## Address to listen for sFlow packets. + ## example: service_address = "udp://:6343" + ## service_address = "udp4://:6343" + ## service_address = "udp6://:6343" + service_address = "udp://:6343" + + ## Set the size of the operating system's receive buffer. + ## example: read_buffer_size = "64KiB" + # read_buffer_size = "" +` + +const ( + maxPacketSize = 64 * 1024 +) + +type SFlow struct { + ServiceAddress string `toml:"service_address"` + ReadBufferSize internal.Size `toml:"read_buffer_size"` + + Log telegraf.Logger `toml:"-"` + + addr net.Addr + decoderOpts decoder.Directive + closer io.Closer + cancel context.CancelFunc + wg sync.WaitGroup +} + +// Description answers a description of this input plugin +func (s *SFlow) Description() string { + return "SFlow V5 Protocol Listener" +} + +// SampleConfig answers a sample configuration +func (s *SFlow) SampleConfig() string { + return sampleConfig +} + +func (s *SFlow) Init() error { + + config := NewDefaultV5FormatOptions() + s.decoderOpts = V5Format(config) + return nil +} + +// Start starts this sFlow listener listening on the configured network for sFlow packets +func (s *SFlow) Start(acc telegraf.Accumulator) error { + u, err := url.Parse(s.ServiceAddress) + if err != nil { + return err + } + + conn, err := listenUDP(u.Scheme, u.Host) + if err != nil { + return err + } + s.closer = conn + s.addr = conn.LocalAddr() + + if s.ReadBufferSize.Size > 0 { + conn.SetReadBuffer(int(s.ReadBufferSize.Size)) + } + + s.Log.Infof("Listening on %s://%s", s.addr.Network(), s.addr.String()) + + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.read(acc, conn) + }() + + return nil +} + +// Gather is a NOOP for sFlow as it receives, asynchronously, sFlow network packets +func (s *SFlow) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (s *SFlow) Stop() { + if s.closer != nil { + s.closer.Close() + } + s.wg.Wait() +} + +func (s *SFlow) Address() net.Addr { + return s.addr +} + +func (s *SFlow) read(acc telegraf.Accumulator, conn net.PacketConn) { + buf := make([]byte, maxPacketSize) + for { + n, _, err := conn.ReadFrom(buf) + if err != nil { + if !strings.HasSuffix(err.Error(), ": use of closed network connection") { + acc.AddError(err) + } + break + } + s.process(acc, buf[:n]) + } +} + +func (s *SFlow) process(acc telegraf.Accumulator, buf []byte) { + decoder := decoder.NewDecodeContext() + if err := decoder.Decode(s.decoderOpts, bytes.NewBuffer(buf)); err != nil { + acc.AddError(fmt.Errorf("unable to parse incoming packet: %s", err)) + } + + metrics := decoder.GetMetrics() + for _, m := range metrics { + acc.AddMetric(m) + } +} + +func listenUDP(network string, address string) (*net.UDPConn, error) { + switch network { + case "udp", "udp4", "udp6": + addr, err := net.ResolveUDPAddr(network, address) + if err != nil { + return nil, err + } + return net.ListenUDP(network, addr) + default: + return nil, fmt.Errorf("unsupported network type: %s", network) + } +} + +// init registers this SFlow input plug in with the Telegraf framework +func init() { + inputs.Add("sflow", func() telegraf.Input { + return &SFlow{} + }) +} diff --git a/plugins/inputs/sflow/sflow_test.go b/plugins/inputs/sflow/sflow_test.go new file mode 100644 index 000000000..90f3a7c6d --- /dev/null +++ b/plugins/inputs/sflow/sflow_test.go @@ -0,0 +1,135 @@ +package sflow + +import ( + "encoding/hex" + "net" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestSFlow(t *testing.T) { + sflow := &SFlow{ + ServiceAddress: "udp://127.0.0.1:0", + Log: testutil.Logger{}, + } + err := sflow.Init() + require.NoError(t, err) + + var acc testutil.Accumulator + err = sflow.Start(&acc) + require.NoError(t, err) + defer sflow.Stop() + + client, err := net.Dial(sflow.Address().Network(), sflow.Address().String()) + require.NoError(t, err) + + packetBytes, err := hex.DecodeString("0000000500000001c0a80102000000100000f3d40bfa047f0000000200000001000000d00001210a000001fe000004000484240000000000000001fe00000200000000020000000100000090000000010000010b0000000400000080000c2936d3d694c691aa97600800450000f9f19040004011b4f5c0a80913c0a8090a00a1ba0500e5641f3081da02010104066d6f746f6770a281cc02047b46462e0201000201003081bd3012060d2b06010201190501010281dc710201003013060d2b06010201190501010281e66802025acc3012060d2b0601020119050101000003e9000000100000000900000000000000090000000000000001000000d00000e3cc000002100000400048eb740000000000000002100000020000000002000000010000009000000001000000970000000400000080000c2936d3d6fcecda44008f81000009080045000081186440003f119098c0a80815c0a8090a9a690202006d23083c33303e4170722031312030393a33333a3031206b6e6f64653120736e6d70645b313039385d3a20436f6e6e656374696f6e2066726f6d205544503a205b3139322e3136382e392e31305d3a34393233362d000003e90000001000000009000000000000000900000000") + require.NoError(t, err) + client.Write(packetBytes) + + acc.Wait(2) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "192.168.1.2", + "dst_ip": "192.168.9.10", + "dst_mac": "00:0c:29:36:d3:d6", + "dst_port": "47621", + "ether_type": "IPv4", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "510", + "ip_dscp": "0", + "ip_ecn": "0", + "output_ifindex": "512", + "sample_direction": "ingress", + "source_id_index": "510", + "source_id_type": "0", + "src_ip": "192.168.9.19", + "src_mac": "94:c6:91:aa:97:60", + "src_port": "161", + }, + map[string]interface{}{ + "bytes": uint64(273408), + "drops": uint64(0), + "frame_length": uint64(267), + "header_length": uint64(128), + "ip_flags": uint64(2), + "ip_fragment_offset": uint64(0), + "ip_total_length": uint64(249), + "ip_ttl": uint64(64), + "sampling_rate": uint64(1024), + "udp_length": uint64(229), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "sflow", + map[string]string{ + "agent_address": "192.168.1.2", + "dst_ip": "192.168.9.10", + "dst_mac": "00:0c:29:36:d3:d6", + "dst_port": "514", + "ether_type": "IPv4", + "header_protocol": "ETHERNET-ISO88023", + "input_ifindex": "528", + "ip_dscp": "0", + "ip_ecn": "0", + "output_ifindex": "512", + "sample_direction": "ingress", + "source_id_index": "528", + "source_id_type": "0", + "src_ip": "192.168.8.21", + "src_mac": "fc:ec:da:44:00:8f", + "src_port": "39529", + }, + map[string]interface{}{ + "bytes": uint64(2473984), + "drops": uint64(0), + "frame_length": uint64(151), + "header_length": uint64(128), + "ip_flags": uint64(2), + "ip_fragment_offset": uint64(0), + "ip_total_length": uint64(129), + "ip_ttl": uint64(63), + "sampling_rate": uint64(16384), + "udp_length": uint64(109), + }, + time.Unix(0, 0), + ), + } + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), + testutil.IgnoreTime()) +} + +func BenchmarkSFlow(b *testing.B) { + sflow := &SFlow{ + ServiceAddress: "udp://127.0.0.1:0", + Log: testutil.Logger{}, + } + err := sflow.Init() + require.NoError(b, err) + + var acc testutil.Accumulator + err = sflow.Start(&acc) + require.NoError(b, err) + defer sflow.Stop() + + client, err := net.Dial(sflow.Address().Network(), sflow.Address().String()) + require.NoError(b, err) + + packetBytes, err := hex.DecodeString("0000000500000001c0a80102000000100000f3d40bfa047f0000000200000001000000d00001210a000001fe000004000484240000000000000001fe00000200000000020000000100000090000000010000010b0000000400000080000c2936d3d694c691aa97600800450000f9f19040004011b4f5c0a80913c0a8090a00a1ba0500e5641f3081da02010104066d6f746f6770a281cc02047b46462e0201000201003081bd3012060d2b06010201190501010281dc710201003013060d2b06010201190501010281e66802025acc3012060d2b0601020119050101000003e9000000100000000900000000000000090000000000000001000000d00000e3cc000002100000400048eb740000000000000002100000020000000002000000010000009000000001000000970000000400000080000c2936d3d6fcecda44008f81000009080045000081186440003f119098c0a80815c0a8090a9a690202006d23083c33303e4170722031312030393a33333a3031206b6e6f64653120736e6d70645b313039385d3a20436f6e6e656374696f6e2066726f6d205544503a205b3139322e3136382e392e31305d3a34393233362d000003e90000001000000009000000000000000900000000") + require.NoError(b, err) + + b.ResetTimer() + for n := 0; n < b.N; n++ { + client.Write(packetBytes) + acc.Wait(2) + } +}