From ee9d0fc4932c86b210f201ac78e949435c25df68 Mon Sep 17 00:00:00 2001 From: Steven Barth Date: Tue, 17 Sep 2019 01:57:25 +0200 Subject: [PATCH] Support NX-OS telemetry extensions in cisco_telemetry_mdt (#6177) --- Gopkg.lock | 2 + plugins/inputs/cisco_telemetry_mdt/README.md | 3 + .../cisco_telemetry_mdt.go | 305 ++++++++++++++---- .../cisco_telemetry_mdt_test.go | 244 +++++++++++++- 4 files changed, 466 insertions(+), 88 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 1260d471e..22520af3a 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1486,6 +1486,7 @@ "credentials", "credentials/oauth", "encoding", + "encoding/gzip", "encoding/proto", "grpclog", "internal", @@ -1813,6 +1814,7 @@ "google.golang.org/grpc", "google.golang.org/grpc/codes", "google.golang.org/grpc/credentials", + "google.golang.org/grpc/encoding/gzip", "google.golang.org/grpc/metadata", "google.golang.org/grpc/peer", "google.golang.org/grpc/status", diff --git a/plugins/inputs/cisco_telemetry_mdt/README.md b/plugins/inputs/cisco_telemetry_mdt/README.md index 2848d0493..3545c6120 100644 --- a/plugins/inputs/cisco_telemetry_mdt/README.md +++ b/plugins/inputs/cisco_telemetry_mdt/README.md @@ -29,6 +29,9 @@ The TCP dialout transport is supported on IOS XR (32-bit and 64-bit) 6.1.x and l ## transport only. # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + ## Define (for certain nested telemetry measurements with embedded tags) which fields are tags + # embedded_tags = ["Cisco-IOS-XR-qos-ma-oper:qos/interface-table/interface/input/service-policy-names/service-policy-instance/statistics/class-stats/class-name"] + ## Define aliases to map telemetry encoding paths to simple measurement names [inputs.cisco_telemetry_mdt.aliases] ifstats = "ietf-interfaces:interfaces-state/interface/statistics" diff --git a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go index 74480cb8a..ddca8247d 100644 --- a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go +++ b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go @@ -7,10 +7,14 @@ import ( "io" "log" "net" + "path" + "strconv" "strings" "sync" "time" + "github.com/influxdata/telegraf/metric" + dialout "github.com/cisco-ie/nx-telemetry-proto/mdt_dialout" telemetry "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis" "github.com/golang/protobuf/proto" @@ -19,6 +23,9 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + + // Register GRPC gzip decoder to support compressed telemetry + _ "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/peer" ) @@ -34,6 +41,7 @@ type CiscoTelemetryMDT struct { ServiceAddress string `toml:"service_address"` MaxMsgSize int `toml:"max_msg_size"` Aliases map[string]string `toml:"aliases"` + EmbeddedTags []string `toml:"embedded_tags"` // GRPC TLS settings internaltls.ServerConfig @@ -43,9 +51,12 @@ type CiscoTelemetryMDT struct { listener net.Listener // Internal state - aliases map[string]string - acc telegraf.Accumulator - wg sync.WaitGroup + aliases map[string]string + warned map[string]struct{} + extraTags map[string]map[string]struct{} + mutex sync.Mutex + acc telegraf.Accumulator + wg sync.WaitGroup } // Start the Cisco MDT service @@ -58,11 +69,22 @@ func (c *CiscoTelemetryMDT) Start(acc telegraf.Accumulator) error { } // Invert aliases list + c.warned = make(map[string]struct{}) c.aliases = make(map[string]string, len(c.Aliases)) for alias, path := range c.Aliases { c.aliases[path] = alias } + // Fill extra tags + c.extraTags = make(map[string]map[string]struct{}) + for _, tag := range c.EmbeddedTags { + dir := path.Dir(tag) + if _, hasKey := c.extraTags[dir]; !hasKey { + c.extraTags[dir] = make(map[string]struct{}) + } + c.extraTags[dir][path.Base(tag)] = struct{}{} + } + switch c.Transport { case "tcp": // TCP dialout server accept routine @@ -76,6 +98,7 @@ func (c *CiscoTelemetryMDT) Start(acc telegraf.Accumulator) error { var opts []grpc.ServerOption tlsConfig, err := c.ServerConfig.TLSConfig() if err != nil { + c.listener.Close() return err } else if tlsConfig != nil { opts = append(opts, grpc.Creds(credentials.NewTLS(tlsConfig))) @@ -198,6 +221,8 @@ func (c *CiscoTelemetryMDT) MdtDialout(stream dialout.GRPCMdtDialout_MdtDialoutS log.Printf("D! [inputs.cisco_telemetry_mdt]: Accepted Cisco MDT GRPC dialout connection from %s", peer.Addr) } + var chunkBuffer bytes.Buffer + for { packet, err := stream.Recv() if err != nil { @@ -212,7 +237,18 @@ func (c *CiscoTelemetryMDT) MdtDialout(stream dialout.GRPCMdtDialout_MdtDialoutS break } - c.handleTelemetry(packet.Data) + // Reassemble chunked telemetry data received from NX-OS + if packet.TotalSize == 0 { + c.handleTelemetry(packet.Data) + } else if int(packet.TotalSize) <= c.MaxMsgSize { + chunkBuffer.Write(packet.Data) + if chunkBuffer.Len() >= int(packet.TotalSize) { + c.handleTelemetry(chunkBuffer.Bytes()) + chunkBuffer.Reset() + } + } else { + c.acc.AddError(fmt.Errorf("dropped too large packet: %dB > %dB", packet.TotalSize, c.MaxMsgSize)) + } } if peerOK { @@ -224,115 +260,239 @@ func (c *CiscoTelemetryMDT) MdtDialout(stream dialout.GRPCMdtDialout_MdtDialoutS // Handle telemetry packet from any transport, decode and add as measurement func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) { - var namebuf bytes.Buffer - telemetry := &telemetry.Telemetry{} - err := proto.Unmarshal(data, telemetry) + msg := &telemetry.Telemetry{} + err := proto.Unmarshal(data, msg) if err != nil { c.acc.AddError(fmt.Errorf("Cisco MDT failed to decode: %v", err)) return } - for _, gpbkv := range telemetry.DataGpbkv { - var fields map[string]interface{} - + grouper := metric.NewSeriesGrouper() + for _, gpbkv := range msg.DataGpbkv { // Produce metadata tags var tags map[string]string // Top-level field may have measurement timestamp, if not use message timestamp measured := gpbkv.Timestamp if measured == 0 { - measured = telemetry.MsgTimestamp + measured = msg.MsgTimestamp } timestamp := time.Unix(int64(measured/1000), int64(measured%1000)*1000000) - // Populate tags and fields from toplevel GPBKV fields "keys" and "content" + // Find toplevel GPBKV fields "keys" and "content" + var keys, content *telemetry.TelemetryField = nil, nil for _, field := range gpbkv.Fields { - switch field.Name { - case "keys": - tags = make(map[string]string, len(field.Fields)+2) - tags["source"] = telemetry.GetNodeIdStr() - tags["subscription"] = telemetry.GetSubscriptionIdStr() - for _, subfield := range field.Fields { - c.parseGPBKVField(subfield, &namebuf, telemetry.EncodingPath, timestamp, tags, nil) - } - case "content": - fields = make(map[string]interface{}, len(field.Fields)) - for _, subfield := range field.Fields { - c.parseGPBKVField(subfield, &namebuf, telemetry.EncodingPath, timestamp, tags, fields) - } - default: - log.Printf("I! [inputs.cisco_telemetry_mdt]: Unexpected top-level MDT field: %s", field.Name) + if field.Name == "keys" { + keys = field + } else if field.Name == "content" { + content = field } } - // Find best alias for encoding path and emit measurement - if len(fields) > 0 && len(tags) > 0 && len(telemetry.EncodingPath) > 0 { - name := telemetry.EncodingPath - if alias, ok := c.aliases[name]; ok { - tags["path"] = name - name = alias - } else { - log.Printf("D! [inputs.cisco_telemetry_mdt]: No measurement alias for encoding path: %s", name) - } - c.acc.AddFields(name, fields, tags, timestamp) - } else { - c.acc.AddError(fmt.Errorf("empty encoding path or measurement")) + if keys == nil || content == nil { + log.Printf("I! [inputs.cisco_telemetry_mdt]: Message from %s missing keys or content", msg.GetNodeIdStr()) + continue } + + // Parse keys + tags = make(map[string]string, len(keys.Fields)+3) + tags["source"] = msg.GetNodeIdStr() + tags["subscription"] = msg.GetSubscriptionIdStr() + tags["path"] = msg.GetEncodingPath() + + for _, subfield := range keys.Fields { + c.parseKeyField(tags, subfield, "") + } + + // Parse values + for _, subfield := range content.Fields { + c.parseContentField(grouper, subfield, "", msg.EncodingPath, tags, timestamp) + } + } + + for _, metric := range grouper.Metrics() { + c.acc.AddMetric(metric) } } -// Recursively parse GPBKV field structure into fields or tags -func (c *CiscoTelemetryMDT) parseGPBKVField(field *telemetry.TelemetryField, namebuf *bytes.Buffer, - path string, timestamp time.Time, tags map[string]string, fields map[string]interface{}) { - - namelen := namebuf.Len() - if namelen > 0 { - namebuf.WriteRune('/') - } - namebuf.WriteString(strings.Replace(field.Name, "-", "_", -1)) - - // Decode Telemetry field value if set - var value interface{} +func decodeValue(field *telemetry.TelemetryField) interface{} { switch val := field.ValueByType.(type) { case *telemetry.TelemetryField_BytesValue: - value = val.BytesValue + return val.BytesValue case *telemetry.TelemetryField_StringValue: - value = val.StringValue + if len(val.StringValue) > 0 { + return val.StringValue + } case *telemetry.TelemetryField_BoolValue: - value = val.BoolValue + return val.BoolValue case *telemetry.TelemetryField_Uint32Value: - value = val.Uint32Value + return val.Uint32Value case *telemetry.TelemetryField_Uint64Value: - value = val.Uint64Value + return val.Uint64Value case *telemetry.TelemetryField_Sint32Value: - value = val.Sint32Value + return val.Sint32Value case *telemetry.TelemetryField_Sint64Value: - value = val.Sint64Value + return val.Sint64Value case *telemetry.TelemetryField_DoubleValue: - value = val.DoubleValue + return val.DoubleValue case *telemetry.TelemetryField_FloatValue: - value = val.FloatValue + return val.FloatValue + } + return nil +} + +func decodeTag(field *telemetry.TelemetryField) string { + switch val := field.ValueByType.(type) { + case *telemetry.TelemetryField_BytesValue: + return string(val.BytesValue) + case *telemetry.TelemetryField_StringValue: + return val.StringValue + case *telemetry.TelemetryField_BoolValue: + if val.BoolValue { + return "true" + } + return "false" + case *telemetry.TelemetryField_Uint32Value: + return strconv.FormatUint(uint64(val.Uint32Value), 10) + case *telemetry.TelemetryField_Uint64Value: + return strconv.FormatUint(val.Uint64Value, 10) + case *telemetry.TelemetryField_Sint32Value: + return strconv.FormatInt(int64(val.Sint32Value), 10) + case *telemetry.TelemetryField_Sint64Value: + return strconv.FormatInt(val.Sint64Value, 10) + case *telemetry.TelemetryField_DoubleValue: + return strconv.FormatFloat(val.DoubleValue, 'f', -1, 64) + case *telemetry.TelemetryField_FloatValue: + return strconv.FormatFloat(float64(val.FloatValue), 'f', -1, 32) + default: + return "" + } +} + +// Recursively parse tag fields +func (c *CiscoTelemetryMDT) parseKeyField(tags map[string]string, field *telemetry.TelemetryField, prefix string) { + localname := strings.Replace(field.Name, "-", "_", -1) + name := localname + if len(localname) == 0 { + name = prefix + } else if len(prefix) > 0 { + name = prefix + "/" + localname } - if value != nil { - // Distinguish between tags (keys) and fields (data) to write to - if fields != nil { - fields[namebuf.String()] = value + if tag := decodeTag(field); len(name) > 0 && len(tag) > 0 { + if _, exists := tags[localname]; !exists { // Use short keys whenever possible + tags[localname] = tag } else { - if _, exists := tags[field.Name]; !exists { // Use short keys whenever possible - tags[field.Name] = fmt.Sprint(value) - } else { - tags[namebuf.String()] = fmt.Sprint(value) - } + tags[name] = tag } } for _, subfield := range field.Fields { - c.parseGPBKVField(subfield, namebuf, path, timestamp, tags, fields) + c.parseKeyField(tags, subfield, name) + } +} + +func (c *CiscoTelemetryMDT) parseContentField(grouper *metric.SeriesGrouper, field *telemetry.TelemetryField, prefix string, + path string, tags map[string]string, timestamp time.Time) { + name := strings.Replace(field.Name, "-", "_", -1) + if len(name) == 0 { + name = prefix + } else if len(prefix) > 0 { + name = prefix + "/" + name } - namebuf.Truncate(namelen) + extraTags := c.extraTags[path+"/"+name] + + if value := decodeValue(field); value != nil { + // Do alias lookup, to shorten measurement names + measurement := path + if alias, ok := c.aliases[path]; ok { + measurement = alias + } else { + c.mutex.Lock() + if _, haveWarned := c.warned[path]; !haveWarned { + log.Printf("D! [inputs.cisco_telemetry_mdt]: No measurement alias for encoding path: %s", path) + c.warned[path] = struct{}{} + } + c.mutex.Unlock() + } + + grouper.Add(measurement, tags, timestamp, name, value) + return + } + + if len(extraTags) > 0 { + for _, subfield := range field.Fields { + if _, isExtraTag := extraTags[subfield.Name]; isExtraTag { + tags[name+"/"+subfield.Name] = decodeTag(subfield) + } + } + } + + var nxAttributes, nxChildren, nxRows *telemetry.TelemetryField + isNXOS := !strings.ContainsRune(path, ':') // IOS-XR and IOS-XE have a colon in their encoding path, NX-OS does not + for _, subfield := range field.Fields { + if isNXOS && subfield.Name == "attributes" && len(subfield.Fields) > 0 { + nxAttributes = subfield.Fields[0] + } else if isNXOS && subfield.Name == "children" && len(subfield.Fields) > 0 { + nxChildren = subfield + } else if isNXOS && strings.HasPrefix(subfield.Name, "ROW_") { + nxRows = subfield + } else if _, isExtraTag := extraTags[subfield.Name]; !isExtraTag { // Regular telemetry decoding + c.parseContentField(grouper, subfield, name, path, tags, timestamp) + } + } + + if nxAttributes == nil && nxRows == nil { + return + } else if nxRows != nil { + // NXAPI structure: https://developer.cisco.com/docs/cisco-nexus-9000-series-nx-api-cli-reference-release-9-2x/ + for _, row := range nxRows.Fields { + for i, subfield := range row.Fields { + if i == 0 { // First subfield contains the index, promote it from value to tag + tags[prefix] = decodeTag(subfield) + } else { + c.parseContentField(grouper, subfield, "", path, tags, timestamp) + } + } + delete(tags, prefix) + } + return + } + + // DME structure: https://developer.cisco.com/site/nxapi-dme-model-reference-api/ + rn := "" + dn := false + + for _, subfield := range nxAttributes.Fields { + if subfield.Name == "rn" { + rn = decodeTag(subfield) + } else if subfield.Name == "dn" { + dn = true + } + } + + if len(rn) > 0 { + tags[prefix] = rn + } else if !dn { // Check for distinguished name being present + c.acc.AddError(fmt.Errorf("NX-OS decoding failed: missing dn field")) + return + } + + for _, subfield := range nxAttributes.Fields { + if subfield.Name != "rn" { + c.parseContentField(grouper, subfield, "", path, tags, timestamp) + } + } + + if nxChildren != nil { + // This is a nested structure, children will inherit relative name keys of parent + for _, subfield := range nxChildren.Fields { + c.parseContentField(grouper, subfield, prefix, path, tags, timestamp) + } + } + delete(tags, prefix) } // Stop listener and cleanup @@ -363,6 +523,9 @@ const sampleConfig = ` ## transport only. # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + ## Define (for certain nested telemetry measurements with embedded tags) which fields are tags + # embedded_tags = ["Cisco-IOS-XR-qos-ma-oper:qos/interface-table/interface/input/service-policy-names/service-policy-instance/statistics/class-stats/class-name"] + ## Define aliases to map telemetry encoding paths to simple measurement names [inputs.cisco_telemetry_mdt.aliases] ifstats = "ietf-interfaces:interfaces-state/interface/statistics" diff --git a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go index d2c686c69..3736a8531 100644 --- a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go +++ b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go @@ -17,23 +17,6 @@ import ( "google.golang.org/grpc" ) -func TestHandleTelemetryEmpty(t *testing.T) { - c := &CiscoTelemetryMDT{Transport: "dummy"} - acc := &testutil.Accumulator{} - c.Start(acc) - - telemetry := &telemetry.Telemetry{ - DataGpbkv: []*telemetry.TelemetryField{ - {}, - }, - } - data, _ := proto.Marshal(telemetry) - - c.handleTelemetry(data) - assert.Contains(t, acc.Errors, errors.New("empty encoding path or measurement")) - assert.Empty(t, acc.Metrics) -} - func TestHandleTelemetryTwoSimple(t *testing.T) { c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"alias": "type:model/some/path"}} acc := &testutil.Accumulator{} @@ -174,6 +157,233 @@ func TestHandleTelemetrySingleNested(t *testing.T) { acc.AssertContainsTaggedFields(t, "nested", fields, tags) } +func TestHandleEmbeddedTags(t *testing.T) { + c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"extra": "type:model/extra"}, EmbeddedTags: []string{"type:model/extra/list/name"}} + acc := &testutil.Accumulator{} + c.Start(acc) + + telemetry := &telemetry.Telemetry{ + MsgTimestamp: 1543236572000, + EncodingPath: "type:model/extra", + NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, + Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, + DataGpbkv: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "keys", + Fields: []*telemetry.TelemetryField{ + { + Name: "foo", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"}, + }, + }, + }, + { + Name: "content", + Fields: []*telemetry.TelemetryField{ + { + Name: "list", + Fields: []*telemetry.TelemetryField{ + { + Name: "name", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "entry1"}, + }, + { + Name: "test", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "foo"}, + }, + }, + }, + { + Name: "list", + Fields: []*telemetry.TelemetryField{ + { + Name: "name", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "entry2"}, + }, + { + Name: "test", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"}, + }, + }, + }, + }, + }, + }, + }, + }, + } + data, _ := proto.Marshal(telemetry) + + c.handleTelemetry(data) + assert.Empty(t, acc.Errors) + + tags1 := map[string]string{"path": "type:model/extra", "foo": "bar", "source": "hostname", "subscription": "subscription", "list/name": "entry1"} + fields1 := map[string]interface{}{"list/test": "foo"} + tags2 := map[string]string{"path": "type:model/extra", "foo": "bar", "source": "hostname", "subscription": "subscription", "list/name": "entry2"} + fields2 := map[string]interface{}{"list/test": "bar"} + acc.AssertContainsTaggedFields(t, "extra", fields1, tags1) + acc.AssertContainsTaggedFields(t, "extra", fields2, tags2) +} + +func TestHandleNXAPI(t *testing.T) { + c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"nxapi": "show nxapi"}} + acc := &testutil.Accumulator{} + c.Start(acc) + + telemetry := &telemetry.Telemetry{ + MsgTimestamp: 1543236572000, + EncodingPath: "show nxapi", + NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, + Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, + DataGpbkv: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "keys", + Fields: []*telemetry.TelemetryField{ + { + Name: "foo", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"}, + }, + }, + }, + { + Name: "content", + Fields: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "TABLE_nxapi", + Fields: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "ROW_nxapi", + Fields: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "index", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "i1"}, + }, + { + Name: "value", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "foo"}, + }, + }, + }, + { + Fields: []*telemetry.TelemetryField{ + { + Name: "index", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "i2"}, + }, + { + Name: "value", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + data, _ := proto.Marshal(telemetry) + + c.handleTelemetry(data) + assert.Empty(t, acc.Errors) + + tags1 := map[string]string{"path": "show nxapi", "foo": "bar", "TABLE_nxapi": "i1", "source": "hostname", "subscription": "subscription"} + fields1 := map[string]interface{}{"value": "foo"} + tags2 := map[string]string{"path": "show nxapi", "foo": "bar", "TABLE_nxapi": "i2", "source": "hostname", "subscription": "subscription"} + fields2 := map[string]interface{}{"value": "bar"} + acc.AssertContainsTaggedFields(t, "nxapi", fields1, tags1) + acc.AssertContainsTaggedFields(t, "nxapi", fields2, tags2) +} + +func TestHandleNXDME(t *testing.T) { + c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"dme": "sys/dme"}} + acc := &testutil.Accumulator{} + c.Start(acc) + + telemetry := &telemetry.Telemetry{ + MsgTimestamp: 1543236572000, + EncodingPath: "sys/dme", + NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, + Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, + DataGpbkv: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "keys", + Fields: []*telemetry.TelemetryField{ + { + Name: "foo", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"}, + }, + }, + }, + { + Name: "content", + Fields: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "fooEntity", + Fields: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "attributes", + Fields: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "rn", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "some-rn"}, + }, + { + Name: "value", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "foo"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + data, _ := proto.Marshal(telemetry) + + c.handleTelemetry(data) + assert.Empty(t, acc.Errors) + + tags1 := map[string]string{"path": "sys/dme", "foo": "bar", "fooEntity": "some-rn", "source": "hostname", "subscription": "subscription"} + fields1 := map[string]interface{}{"value": "foo"} + acc.AssertContainsTaggedFields(t, "dme", fields1, tags1) +} + func TestTCPDialoutOverflow(t *testing.T) { c := &CiscoTelemetryMDT{Transport: "tcp", ServiceAddress: "127.0.0.1:57000"} acc := &testutil.Accumulator{}