diff --git a/Gopkg.lock b/Gopkg.lock index 76c5deb62..1f52f0087 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -219,6 +219,17 @@ revision = "2ea60e5f094469f9e65adb9cd103795b73ae743e" version = "v2.0.0" +[[projects]] + branch = "master" + digest = "1:ed5e77e0626ed76b7e7a2554bc4586aae768612381c5f62738f16a2dfa48763b" + name = "github.com/cisco-ie/nx-telemetry-proto" + packages = [ + "mdt_dialout", + "telemetry_bis", + ] + pruneopts = "" + revision = "82441e232cf6af9be0f808bf0c6421ee8519880e" + [[projects]] branch = "master" digest = "1:298e42868718da06fc0899ae8fdb99c48a14477045234c9274d81caa79af6a8f" @@ -858,6 +869,17 @@ revision = "eee57a3ac4174c55924125bb15eeeda8cffb6e6f" version = "v1.0.7" +[[projects]] + branch = "master" + digest = "1:06ee57a6252cc9c3a1650be9888e8df796d86947ec75bff7e2c4ac5689baa086" + name = "github.com/openconfig/gnmi" + packages = [ + "proto/gnmi", + "proto/gnmi_ext", + ] + pruneopts = "" + revision = "33a1865c302903e7a2e06f35960e6bc31e84b9f6" + [[projects]] digest = "1:5d9b668b0b4581a978f07e7d2e3314af18eb27b3fb5d19b70185b7c575723d11" name = "github.com/opencontainers/go-digest" @@ -1558,6 +1580,8 @@ "github.com/aws/aws-sdk-go/service/dynamodb", "github.com/aws/aws-sdk-go/service/kinesis", "github.com/bsm/sarama-cluster", + "github.com/cisco-ie/nx-telemetry-proto/mdt_dialout", + "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis", "github.com/couchbase/go-couchbase", "github.com/denisenkom/go-mssqldb", "github.com/dgrijalva/jwt-go", @@ -1613,6 +1637,7 @@ "github.com/nats-io/gnatsd/server", "github.com/nats-io/go-nats", "github.com/nsqio/go-nsq", + "github.com/openconfig/gnmi/proto/gnmi", "github.com/openzipkin/zipkin-go-opentracing", "github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore", "github.com/pkg/errors", @@ -1669,6 +1694,7 @@ "google.golang.org/grpc/codes", "google.golang.org/grpc/credentials", "google.golang.org/grpc/metadata", + "google.golang.org/grpc/peer", "google.golang.org/grpc/status", 
"gopkg.in/gorethink/gorethink.v3", "gopkg.in/ldap.v2", diff --git a/Gopkg.toml b/Gopkg.toml index 4e50eb11b..c817c1865 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -288,3 +288,11 @@ [[constraint]] name = "github.com/google/go-github" version = "24.0.1" + +[[constraint]] + branch = "master" + name = "github.com/openconfig/gnmi" + +[[constraint]] + branch = "master" + name = "github.com/cisco-ie/nx-telemetry-proto" diff --git a/README.md b/README.md index d3adf1e2d..60f349e47 100644 --- a/README.md +++ b/README.md @@ -150,6 +150,8 @@ For documentation on the latest development code see the [documentation index][d * [ceph](./plugins/inputs/ceph) * [cgroup](./plugins/inputs/cgroup) * [chrony](./plugins/inputs/chrony) +* [cisco_telemetry_gnmi](./plugins/inputs/cisco_telemetry_gnmi) +* [cisco_telemetry_mdt](./plugins/inputs/cisco_telemetry_mdt) * [cloud_pubsub](./plugins/inputs/cloud_pubsub) Google Cloud Pub/Sub * [cloud_pubsub_push](./plugins/inputs/cloud_pubsub_push) Google Cloud Pub/Sub push endpoint * [conntrack](./plugins/inputs/conntrack) diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index e0531210e..17bac0a1a 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -16,6 +16,7 @@ following works: - github.com/beorn7/perks [MIT License](https://github.com/beorn7/perks/blob/master/LICENSE) - github.com/bsm/sarama-cluster [MIT License](https://github.com/bsm/sarama-cluster/blob/master/LICENSE) - github.com/cenkalti/backoff [MIT License](https://github.com/cenkalti/backoff/blob/master/LICENSE) +- github.com/cisco-ie/nx-telemetry-proto [Apache License 2.0](https://github.com/cisco-ie/nx-telemetry-proto/blob/master/LICENSE) - github.com/couchbase/go-couchbase [MIT License](https://github.com/couchbase/go-couchbase/blob/master/LICENSE) - github.com/couchbase/gomemcached [MIT License](https://github.com/couchbase/gomemcached/blob/master/LICENSE) - github.com/couchbase/goutils [COUCHBASE INC. 
COMMUNITY EDITION LICENSE](https://github.com/couchbase/goutils/blob/master/LICENSE.md) @@ -80,6 +81,7 @@ following works: - github.com/nats-io/go-nats [Apache License 2.0](https://github.com/nats-io/go-nats/blob/master/LICENSE) - github.com/nats-io/nuid [Apache License 2.0](https://github.com/nats-io/nuid/blob/master/LICENSE) - github.com/nsqio/go-nsq [MIT License](https://github.com/nsqio/go-nsq/blob/master/LICENSE) +- github.com/openconfig/gnmi [Apache License 2.0](https://github.com/openconfig/gnmi/blob/master/LICENSE) - github.com/opencontainers/go-digest [Apache License 2.0](https://github.com/opencontainers/go-digest/blob/master/LICENSE) - github.com/opencontainers/image-spec [Apache License 2.0](https://github.com/opencontainers/image-spec/blob/master/LICENSE) - github.com/opentracing-contrib/go-observer [Apache License 2.0](https://github.com/opentracing-contrib/go-observer/blob/master/LICENSE) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index a626fce92..ef032fe47 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -15,6 +15,8 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/ceph" _ "github.com/influxdata/telegraf/plugins/inputs/cgroup" _ "github.com/influxdata/telegraf/plugins/inputs/chrony" + _ "github.com/influxdata/telegraf/plugins/inputs/cisco_telemetry_gnmi" + _ "github.com/influxdata/telegraf/plugins/inputs/cisco_telemetry_mdt" _ "github.com/influxdata/telegraf/plugins/inputs/cloud_pubsub" _ "github.com/influxdata/telegraf/plugins/inputs/cloud_pubsub_push" _ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch" diff --git a/plugins/inputs/cisco_telemetry_gnmi/README.md b/plugins/inputs/cisco_telemetry_gnmi/README.md new file mode 100644 index 000000000..3e53cf0e5 --- /dev/null +++ b/plugins/inputs/cisco_telemetry_gnmi/README.md @@ -0,0 +1,75 @@ +# Cisco GNMI telemetry + +Cisco GNMI telemetry is an input plugin that consumes telemetry data similar to the [GNMI 
specification](https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-specification.md). +This GRPC-based protocol can utilize TLS for authentication and encryption. + +This plugin has been developed to support GNMI telemetry as produced by Cisco IOS XR (64-bit) version 6.5.1 and later. + + +### Configuration: + +This is a sample configuration for the plugin. + +```toml +[[inputs.cisco_telemetry_gnmi]] + ## Address and port of the GNMI GRPC server + addresses = ["10.49.234.114:57777"] + + ## define credentials + username = "cisco" + password = "cisco" + + ## GNMI encoding requested (one of: "proto", "json", "json_ietf") + # encoding = "proto" + + ## redial in case of failures after + redial = "10s" + + ## enable client-side TLS and define CA to authenticate the device + # enable_tls = true + # tls_ca = "/etc/telegraf/ca.pem" + # insecure_skip_verify = true + + ## define client-side TLS certificate & key to authenticate to the device + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## GNMI subscription prefix (optional, can usually be left empty) + ## See: https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-specification.md#222-paths + # origin = "" + # prefix = "" + # target = "" + + ## Define additional aliases to map telemetry encoding paths to simple measurement names + # [inputs.cisco_telemetry_gnmi.aliases] + # ifcounters = "openconfig:/interfaces/interface/state/counters" + + [[inputs.cisco_telemetry_gnmi.subscription]] + ## Name of the measurement that will be emitted + name = "ifcounters" + + ## Origin and path of the subscription + ## See: https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-specification.md#222-paths + ## + ## origin usually refers to a (YANG) data model implemented by the device + ## and path to a specific substructure inside it that should be subscribed to (similar to an XPath) + ## YANG models can be found e.g.
here: https://github.com/YangModels/yang/tree/master/vendor/cisco/xr + origin = "openconfig-interfaces" + path = "/interfaces/interface/state/counters" + + # Subscription mode (one of: "target_defined", "sample", "on_change") and interval + subscription_mode = "sample" + sample_interval = "10s" + + ## Suppress redundant transmissions when measured values are unchanged + # suppress_redundant = false + + ## If suppression is enabled, send updates at least every X seconds anyway + # heartbeat_interval = "60s" +``` + +### Example Output +``` +ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=MgmtEth0/RP0/CPU0/0,source=10.49.234.115 in-multicast-pkts=0i,out-multicast-pkts=0i,out-errors=0i,out-discards=0i,in-broadcast-pkts=0i,out-broadcast-pkts=0i,in-discards=0i,in-unknown-protos=0i,in-errors=0i,out-unicast-pkts=0i,in-octets=0i,out-octets=0i,last-clear="2019-05-22T16:53:21Z",in-unicast-pkts=0i 1559145777425000000 +ifcounters,path=openconfig-interfaces:/interfaces/interface/state/counters,host=linux,name=GigabitEthernet0/0/0/0,source=10.49.234.115 out-multicast-pkts=0i,out-broadcast-pkts=0i,in-errors=0i,out-errors=0i,in-discards=0i,out-octets=0i,in-unknown-protos=0i,in-unicast-pkts=0i,in-octets=0i,in-multicast-pkts=0i,in-broadcast-pkts=0i,last-clear="2019-05-22T16:54:50Z",out-unicast-pkts=0i,out-discards=0i 1559145777425000000 +``` diff --git a/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi.go b/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi.go new file mode 100644 index 000000000..69495d6f6 --- /dev/null +++ b/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi.go @@ -0,0 +1,516 @@ +package cisco_telemetry_gnmi + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "log" + "net" + "strings" + "sync" + "time" + + "github.com/influxdata/telegraf/metric" + + "google.golang.org/grpc/credentials" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + 
internaltls "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/inputs" + jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" + "github.com/openconfig/gnmi/proto/gnmi" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +// CiscoTelemetryGNMI plugin instance +type CiscoTelemetryGNMI struct { + Addresses []string `toml:"addresses"` + Subscriptions []Subscription `toml:"subscription"` + Aliases map[string]string `toml:"aliases"` + + // Optional subscription configuration + Encoding string + Origin string + Prefix string + Target string + UpdatesOnly bool `toml:"updates_only"` + + // Cisco IOS XR credentials + Username string + Password string + + // Redial + Redial internal.Duration + + // GRPC TLS settings + EnableTLS bool `toml:"enable_tls"` + internaltls.ClientConfig + + // Internal state + aliases map[string]string + acc telegraf.Accumulator + cancel context.CancelFunc + wg sync.WaitGroup +} + +// Subscription for a GNMI client +type Subscription struct { + Name string + Origin string + Path string + + // Subscription mode and interval + SubscriptionMode string `toml:"subscription_mode"` + SampleInterval internal.Duration `toml:"sample_interval"` + + // Duplicate suppression + SuppressRedundant bool `toml:"suppress_redundant"` + HeartbeatInterval internal.Duration `toml:"heartbeat_interval"` +} + +// Start the http listener service +func (c *CiscoTelemetryGNMI) Start(acc telegraf.Accumulator) error { + var err error + var ctx context.Context + var tlscfg *tls.Config + var request *gnmi.SubscribeRequest + c.acc = acc + ctx, c.cancel = context.WithCancel(context.Background()) + + // Validate configuration + if request, err = c.newSubscribeRequest(); err != nil { + return err + } else if c.Redial.Duration.Nanoseconds() <= 0 { + return fmt.Errorf("redial duration must be positive") + } + + // Parse TLS config + if c.EnableTLS { + if tlscfg, err = c.ClientConfig.TLSConfig(); err != nil { + return err + } 
+ } + + if len(c.Username) > 0 { + ctx = metadata.AppendToOutgoingContext(ctx, "username", c.Username, "password", c.Password) + } + + // Invert explicit alias list and prefill subscription names + c.aliases = make(map[string]string, len(c.Subscriptions)+len(c.Aliases)) + for _, subscription := range c.Subscriptions { + path := subscription.Path + if len(subscription.Origin) > 0 { + path = subscription.Origin + ":" + path + } + + name := subscription.Name + if len(name) == 0 { + name = path[strings.LastIndexByte(path, '/')+1:] + } + if len(name) > 0 { + c.aliases[path] = name + } + } + for alias, path := range c.Aliases { + c.aliases[path] = alias + } + + // Create a goroutine for each device, dial and subscribe + c.wg.Add(len(c.Addresses)) + for _, addr := range c.Addresses { + go func(address string) { + defer c.wg.Done() + for ctx.Err() == nil { + if err := c.subscribeGNMI(ctx, address, tlscfg, request); err != nil && ctx.Err() == nil { + acc.AddError(err) + } + + select { + case <-ctx.Done(): + case <-time.After(c.Redial.Duration): + } + } + }(addr) + } + return nil +} + +// Create a new GNMI SubscribeRequest +func (c *CiscoTelemetryGNMI) newSubscribeRequest() (*gnmi.SubscribeRequest, error) { + // Create subscription objects + subscriptions := make([]*gnmi.Subscription, len(c.Subscriptions)) + for i, subscription := range c.Subscriptions { + gnmiPath, err := parsePath(subscription.Origin, subscription.Path, "") + if err != nil { + return nil, err + } + mode, ok := gnmi.SubscriptionMode_value[strings.ToUpper(subscription.SubscriptionMode)] + if !ok { + return nil, fmt.Errorf("invalid subscription mode %s", subscription.SubscriptionMode) + } + subscriptions[i] = &gnmi.Subscription{ + Path: gnmiPath, + Mode: gnmi.SubscriptionMode(mode), + SampleInterval: uint64(subscription.SampleInterval.Duration.Nanoseconds()), + SuppressRedundant: subscription.SuppressRedundant, + HeartbeatInterval: uint64(subscription.HeartbeatInterval.Duration.Nanoseconds()), + } + } + + // 
Construct subscribe request + gnmiPath, err := parsePath(c.Origin, c.Prefix, c.Target) + if err != nil { + return nil, err + } + + if c.Encoding != "proto" && c.Encoding != "json" && c.Encoding != "json_ietf" { + return nil, fmt.Errorf("unsupported encoding %s", c.Encoding) + } + + return &gnmi.SubscribeRequest{ + Request: &gnmi.SubscribeRequest_Subscribe{ + Subscribe: &gnmi.SubscriptionList{ + Prefix: gnmiPath, + Mode: gnmi.SubscriptionList_STREAM, + Encoding: gnmi.Encoding(gnmi.Encoding_value[strings.ToUpper(c.Encoding)]), + Subscription: subscriptions, + UpdatesOnly: c.UpdatesOnly, + }, + }, + }, nil +} + +// SubscribeGNMI and extract telemetry data +func (c *CiscoTelemetryGNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Config, request *gnmi.SubscribeRequest) error { + var opt grpc.DialOption + if tlscfg != nil { + opt = grpc.WithTransportCredentials(credentials.NewTLS(tlscfg)) + } else { + opt = grpc.WithInsecure() + } + + client, err := grpc.DialContext(ctx, address, opt) + if err != nil { + return fmt.Errorf("failed to dial: %v", err) + } + defer client.Close() + + subscribeClient, err := gnmi.NewGNMIClient(client).Subscribe(ctx) + if err != nil { + return fmt.Errorf("failed to setup subscription: %v", err) + } + + if err = subscribeClient.Send(request); err != nil { + return fmt.Errorf("failed to send subscription request: %v", err) + } + + log.Printf("D! [inputs.cisco_telemetry_gnmi]: Connection to GNMI device %s established", address) + defer log.Printf("D! 
[inputs.cisco_telemetry_gnmi]: Connection to GNMI device %s closed", address) + for ctx.Err() == nil { + var reply *gnmi.SubscribeResponse + if reply, err = subscribeClient.Recv(); err != nil { + if err != io.EOF && ctx.Err() == nil { + return fmt.Errorf("aborted GNMI subscription: %v", err) + } + break + } + + c.handleSubscribeResponse(address, reply) + } + return nil +} + +// HandleSubscribeResponse message from GNMI and parse contained telemetry data +func (c *CiscoTelemetryGNMI) handleSubscribeResponse(address string, reply *gnmi.SubscribeResponse) { + // Check if response is a GNMI Update and if we have a prefix to derive the measurement name + response, ok := reply.Response.(*gnmi.SubscribeResponse_Update) + if !ok { + return + } + + var prefix, prefixAliasPath string + grouper := metric.NewSeriesGrouper() + timestamp := time.Unix(0, response.Update.Timestamp) + prefixTags := make(map[string]string) + + if response.Update.Prefix != nil { + prefix, prefixAliasPath = c.handlePath(response.Update.Prefix, prefixTags, "") + } + prefixTags["source"], _, _ = net.SplitHostPort(address) + prefixTags["path"] = prefix + + // Parse individual Update message and create measurements + var name, lastAliasPath string + for _, update := range response.Update.Update { + // Prepare tags from prefix + tags := make(map[string]string, len(prefixTags)) + for key, val := range prefixTags { + tags[key] = val + } + aliasPath, fields := c.handleTelemetryField(update, tags, prefix) + + // Inherent valid alias from prefix parsing + if len(prefixAliasPath) > 0 && len(aliasPath) == 0 { + aliasPath = prefixAliasPath + } + + // Lookup alias if alias-path has changed + if aliasPath != lastAliasPath { + name = prefix + if alias, ok := c.aliases[aliasPath]; ok { + name = alias + } else { + log.Printf("D! 
[inputs.cisco_telemetry_gnmi]: No measurement alias for GNMI path: %s", name) + } + } + + // Group metrics + for key, val := range fields { + grouper.Add(name, tags, timestamp, key[len(aliasPath)+1:], val) + } + + lastAliasPath = aliasPath + } + + // Add grouped measurements + for _, metric := range grouper.Metrics() { + c.acc.AddMetric(metric) + } +} + +// HandleTelemetryField and add it to a measurement +func (c *CiscoTelemetryGNMI) handleTelemetryField(update *gnmi.Update, tags map[string]string, prefix string) (string, map[string]interface{}) { + path, aliasPath := c.handlePath(update.Path, tags, prefix) + + var value interface{} + var jsondata []byte + + switch val := update.Val.Value.(type) { + case *gnmi.TypedValue_AsciiVal: + value = val.AsciiVal + case *gnmi.TypedValue_BoolVal: + value = val.BoolVal + case *gnmi.TypedValue_BytesVal: + value = val.BytesVal + case *gnmi.TypedValue_DecimalVal: + value = val.DecimalVal + case *gnmi.TypedValue_FloatVal: + value = val.FloatVal + case *gnmi.TypedValue_IntVal: + value = val.IntVal + case *gnmi.TypedValue_StringVal: + value = val.StringVal + case *gnmi.TypedValue_UintVal: + value = val.UintVal + case *gnmi.TypedValue_JsonIetfVal: + jsondata = val.JsonIetfVal + case *gnmi.TypedValue_JsonVal: + jsondata = val.JsonVal + } + + name := strings.Replace(path, "-", "_", -1) + fields := make(map[string]interface{}) + if value != nil { + fields[name] = value + } else if jsondata != nil { + if err := json.Unmarshal(jsondata, &value); err != nil { + c.acc.AddError(fmt.Errorf("failed to parse JSON value: %v", err)) + } else { + flattener := jsonparser.JSONFlattener{Fields: fields} + flattener.FullFlattenJSON(name, value, true, true) + } + } + return aliasPath, fields +} + +// Parse path to path-buffer and tag-field +func (c *CiscoTelemetryGNMI) handlePath(path *gnmi.Path, tags map[string]string, prefix string) (string, string) { + var aliasPath string + builder := bytes.NewBufferString(prefix) + + // Prefix with origin + if 
len(path.Origin) > 0 { + builder.WriteString(path.Origin) + builder.WriteRune(':') + } + + // Parse generic keys from prefix + for _, elem := range path.Elem { + builder.WriteRune('/') + builder.WriteString(elem.Name) + name := builder.String() + + if _, exists := c.aliases[name]; exists { + aliasPath = name + } + + for key, val := range elem.Key { + key = strings.Replace(key, "-", "_", -1) + + // Use short-form of key if possible + if _, exists := tags[key]; exists { + tags[name+"/"+key] = val + } else { + tags[key] = val + } + } + } + + return builder.String(), aliasPath +} + +//ParsePath from XPath-like string to GNMI path structure +func parsePath(origin string, path string, target string) (*gnmi.Path, error) { + var err error + gnmiPath := gnmi.Path{Origin: origin, Target: target} + + if len(path) > 0 && path[0] != '/' { + return nil, fmt.Errorf("path does not start with a '/': %s", path) + } + + elem := &gnmi.PathElem{} + start, name, value, end := 0, -1, -1, -1 + + path = path + "/" + + for i := 0; i < len(path); i++ { + if path[i] == '[' { + if name >= 0 { + break + } + if end < 0 { + end = i + elem.Key = make(map[string]string) + } + name = i + 1 + } else if path[i] == '=' { + if name <= 0 || value >= 0 { + break + } + value = i + 1 + } else if path[i] == ']' { + if name <= 0 || value <= name { + break + } + elem.Key[path[name:value-1]] = strings.Trim(path[value:i], "'\"") + name, value = -1, -1 + } else if path[i] == '/' { + if name < 0 { + if end < 0 { + end = i + } + + if end > start { + elem.Name = path[start:end] + gnmiPath.Elem = append(gnmiPath.Elem, elem) + gnmiPath.Element = append(gnmiPath.Element, path[start:i]) + } + + start, name, value, end = i+1, -1, -1, -1 + elem = &gnmi.PathElem{} + } + } + } + + if name >= 0 || value >= 0 { + err = fmt.Errorf("Invalid GNMI path: %s", path) + } + + if err != nil { + return nil, err + } + + return &gnmiPath, nil +} + +// Stop listener and cleanup +func (c *CiscoTelemetryGNMI) Stop() { + c.cancel() + 
c.wg.Wait() +} + +const sampleConfig = ` + ## Address and port of the GNMI GRPC server + addresses = ["10.49.234.114:57777"] + + ## define credentials + username = "cisco" + password = "cisco" + + ## GNMI encoding requested (one of: "proto", "json", "json_ietf") + # encoding = "proto" + + ## redial in case of failures after + redial = "10s" + + ## enable client-side TLS and define CA to authenticate the device + # enable_tls = true + # tls_ca = "/etc/telegraf/ca.pem" + # insecure_skip_verify = true + + ## define client-side TLS certificate & key to authenticate to the device + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## GNMI subscription prefix (optional, can usually be left empty) + ## See: https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-specification.md#222-paths + # origin = "" + # prefix = "" + # target = "" + + ## Define additional aliases to map telemetry encoding paths to simple measurement names + #[inputs.cisco_telemetry_gnmi.aliases] + # ifcounters = "openconfig:/interfaces/interface/state/counters" + + [[inputs.cisco_telemetry_gnmi.subscription]] + ## Name of the measurement that will be emitted + name = "ifcounters" + + ## Origin and path of the subscription + ## See: https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-specification.md#222-paths + ## + ## origin usually refers to a (YANG) data model implemented by the device + ## and path to a specific substructe inside it that should be subscribed to (similar to an XPath) + ## YANG models can be found e.g. 
here: https://github.com/YangModels/yang/tree/master/vendor/cisco/xr + origin = "openconfig-interfaces" + path = "/interfaces/interface/state/counters" + + # Subscription mode (one of: "target_defined", "sample", "on_change") and interval + subscription_mode = "sample" + sample_interval = "10s" + + ## Suppress redundant transmissions when measured values are unchanged + # suppress_redundant = false + + ## If suppression is enabled, send updates at least every X seconds anyway + # heartbeat_interval = "60s" +` + +// SampleConfig of plugin +func (c *CiscoTelemetryGNMI) SampleConfig() string { + return sampleConfig +} + +// Description of plugin +func (c *CiscoTelemetryGNMI) Description() string { + return "Cisco GNMI telemetry input plugin based on GNMI telemetry data produced in IOS XR" +} + +// Gather plugin measurements (unused) +func (c *CiscoTelemetryGNMI) Gather(_ telegraf.Accumulator) error { + return nil +} + +func init() { + inputs.Add("cisco_telemetry_gnmi", func() telegraf.Input { + return &CiscoTelemetryGNMI{ + Encoding: "proto", + Redial: internal.Duration{Duration: 10 * time.Second}, + } + }) +} diff --git a/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi_test.go b/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi_test.go new file mode 100644 index 000000000..7e6b527b9 --- /dev/null +++ b/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi_test.go @@ -0,0 +1,247 @@ +package cisco_telemetry_gnmi + +import ( + "context" + "errors" + "fmt" + "net" + "testing" + "time" + + "google.golang.org/grpc/metadata" + + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/testutil" + "google.golang.org/grpc" + + "github.com/openconfig/gnmi/proto/gnmi" + "github.com/stretchr/testify/assert" +) + +func TestParsePath(t *testing.T) { + path := "/foo/bar/bla[shoo=woo][shoop=/woop/]/z" + parsed, err := parsePath("theorigin", path, "thetarget") + + assert.Nil(t, err) + assert.Equal(t, parsed.Origin, "theorigin") + assert.Equal(t, 
parsed.Target, "thetarget") + assert.Equal(t, parsed.Element, []string{"foo", "bar", "bla[shoo=woo][shoop=/woop/]", "z"}) + assert.Equal(t, parsed.Elem, []*gnmi.PathElem{{Name: "foo"}, {Name: "bar"}, + {Name: "bla", Key: map[string]string{"shoo": "woo", "shoop": "/woop/"}}, {Name: "z"}}) + + parsed, err = parsePath("", "", "") + assert.Nil(t, err) + assert.Equal(t, *parsed, gnmi.Path{}) + + parsed, err = parsePath("", "/foo[[", "") + assert.Nil(t, parsed) + assert.Equal(t, errors.New("Invalid GNMI path: /foo[[/"), err) +} + +type mockGNMIServer struct { + t *testing.T + acc *testutil.Accumulator + server *grpc.Server + scenario int +} + +func (m *mockGNMIServer) Capabilities(context.Context, *gnmi.CapabilityRequest) (*gnmi.CapabilityResponse, error) { + return nil, nil +} + +func (m *mockGNMIServer) Get(context.Context, *gnmi.GetRequest) (*gnmi.GetResponse, error) { + return nil, nil +} + +func (m *mockGNMIServer) Set(context.Context, *gnmi.SetRequest) (*gnmi.SetResponse, error) { + return nil, nil +} + +func (m *mockGNMIServer) Subscribe(server gnmi.GNMI_SubscribeServer) error { + // Avoid race conditions + go func() { + if m.scenario == 0 { + m.acc.WaitError(1) + } else if m.scenario == 1 || m.scenario == 3 { + m.acc.Wait(4) + } else if m.scenario == 2 { + m.acc.Wait(2) + } + if m.scenario >= 0 { + m.server.Stop() + } + }() + + metadata, ok := metadata.FromIncomingContext(server.Context()) + assert.Equal(m.t, ok, true) + assert.Equal(m.t, metadata.Get("username"), []string{"theuser"}) + assert.Equal(m.t, metadata.Get("password"), []string{"thepassword"}) + + switch m.scenario { + case 0: + return fmt.Errorf("testerror") + case 1: + notification := mockGNMINotification() + server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}}) + server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}}) + notification.Prefix.Elem[0].Key["foo"] = "bar2" + 
notification.Update[0].Path.Elem[1].Key["name"] = "str2" + notification.Update[0].Val = &gnmi.TypedValue{Value: &gnmi.TypedValue_JsonVal{JsonVal: []byte{'"', '1', '2', '3', '"'}}} + server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}}) + return nil + case 2: + notification := mockGNMINotification() + server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}}) + return nil + case 3: + notification := mockGNMINotification() + notification.Prefix.Elem[0].Key["foo"] = "bar2" + notification.Update[0].Path.Elem[1].Key["name"] = "str2" + notification.Update[0].Val = &gnmi.TypedValue{Value: &gnmi.TypedValue_BoolVal{BoolVal: false}} + server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}}) + return nil + default: + return fmt.Errorf("test not implemented ;)") + } +} + +func TestGNMIError(t *testing.T) { + listener, _ := net.Listen("tcp", "127.0.0.1:57003") + server := grpc.NewServer() + acc := &testutil.Accumulator{} + gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 0, server: server, acc: acc}) + + c := &CiscoTelemetryGNMI{Addresses: []string{"127.0.0.1:57003"}, + Username: "theuser", Password: "thepassword", Encoding: "proto", + Redial: internal.Duration{Duration: 1 * time.Second}} + + assert.Nil(t, c.Start(acc)) + server.Serve(listener) + c.Stop() + + assert.Contains(t, acc.Errors, errors.New("aborted GNMI subscription: rpc error: code = Unknown desc = testerror")) +} + +func mockGNMINotification() *gnmi.Notification { + return &gnmi.Notification{ + Timestamp: 1543236572000000000, + Prefix: &gnmi.Path{ + Origin: "type", + Elem: []*gnmi.PathElem{ + { + Name: "model", + Key: map[string]string{"foo": "bar"}, + }, + }, + Target: "subscription", + }, + Update: []*gnmi.Update{ + { + Path: &gnmi.Path{ + Elem: []*gnmi.PathElem{ + {Name: "some"}, + { + Name: "path", + Key: map[string]string{"name": "str", "uint64": "1234"}}, + }, + 
}, + Val: &gnmi.TypedValue{Value: &gnmi.TypedValue_IntVal{IntVal: 5678}}, + }, + { + Path: &gnmi.Path{ + Elem: []*gnmi.PathElem{ + {Name: "other"}, + {Name: "path"}, + }, + }, + Val: &gnmi.TypedValue{Value: &gnmi.TypedValue_StringVal{StringVal: "foobar"}}, + }, + { + Path: &gnmi.Path{ + Elem: []*gnmi.PathElem{ + {Name: "other"}, + {Name: "this"}, + }, + }, + Val: &gnmi.TypedValue{Value: &gnmi.TypedValue_StringVal{StringVal: "that"}}, + }, + }, + } +} + +func TestGNMIMultiple(t *testing.T) { + listener, _ := net.Listen("tcp", "127.0.0.1:57004") + server := grpc.NewServer() + acc := &testutil.Accumulator{} + gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 1, server: server, acc: acc}) + + c := &CiscoTelemetryGNMI{Addresses: []string{"127.0.0.1:57004"}, + Username: "theuser", Password: "thepassword", Encoding: "proto", + Redial: internal.Duration{Duration: 1 * time.Second}, + Subscriptions: []Subscription{{Name: "alias", Origin: "type", Path: "/model", SubscriptionMode: "sample"}}, + } + + assert.Nil(t, c.Start(acc)) + + server.Serve(listener) + c.Stop() + + assert.Empty(t, acc.Errors) + + tags := map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar", "name": "str", "uint64": "1234"} + fields := map[string]interface{}{"some/path": int64(5678)} + acc.AssertContainsTaggedFields(t, "alias", fields, tags) + + tags = map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar"} + fields = map[string]interface{}{"other/path": "foobar", "other/this": "that"} + acc.AssertContainsTaggedFields(t, "alias", fields, tags) + + tags = map[string]string{"path": "type:/model", "foo": "bar2", "source": "127.0.0.1", "name": "str2", "uint64": "1234"} + fields = map[string]interface{}{"some/path": "123"} + acc.AssertContainsTaggedFields(t, "alias", fields, tags) + + tags = map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar2"} + fields = map[string]interface{}{"other/path": "foobar", "other/this": "that"} + 
acc.AssertContainsTaggedFields(t, "alias", fields, tags) +} + +func TestGNMIMultipleRedial(t *testing.T) { + listener, _ := net.Listen("tcp", "127.0.0.1:57004") + server := grpc.NewServer() + acc := &testutil.Accumulator{} + gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 2, server: server, acc: acc}) + + c := &CiscoTelemetryGNMI{Addresses: []string{"127.0.0.1:57004"}, + Username: "theuser", Password: "thepassword", Encoding: "proto", + Redial: internal.Duration{Duration: 500 * time.Millisecond}, + Subscriptions: []Subscription{{Name: "alias", Origin: "type", Path: "/model", SubscriptionMode: "sample"}}, + } + + assert.Nil(t, c.Start(acc)) + server.Serve(listener) + + listener, _ = net.Listen("tcp", "127.0.0.1:57004") + server = grpc.NewServer() + gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 3, server: server, acc: acc}) + + server.Serve(listener) + c.Stop() + + assert.Empty(t, acc.Errors) + + tags := map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar", "name": "str", "uint64": "1234"} + fields := map[string]interface{}{"some/path": int64(5678)} + acc.AssertContainsTaggedFields(t, "alias", fields, tags) + + tags = map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar"} + fields = map[string]interface{}{"other/path": "foobar", "other/this": "that"} + acc.AssertContainsTaggedFields(t, "alias", fields, tags) + + tags = map[string]string{"path": "type:/model", "foo": "bar2", "source": "127.0.0.1", "name": "str2", "uint64": "1234"} + fields = map[string]interface{}{"some/path": false} + acc.AssertContainsTaggedFields(t, "alias", fields, tags) + + tags = map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar2"} + fields = map[string]interface{}{"other/path": "foobar", "other/this": "that"} + acc.AssertContainsTaggedFields(t, "alias", fields, tags) +} diff --git a/plugins/inputs/cisco_telemetry_mdt/README.md b/plugins/inputs/cisco_telemetry_mdt/README.md new file mode 
100644 index 000000000..4672f4036 --- /dev/null +++ b/plugins/inputs/cisco_telemetry_mdt/README.md @@ -0,0 +1,41 @@ +# Cisco model-driven telemetry (MDT) + +Cisco model-driven telemetry (MDT) is an input plugin that consumes +telemetry data from Cisco IOS XR, IOS XE and NX-OS platforms. It supports TCP & GRPC dialout transports. +GRPC-based transport can utilize TLS for authentication and encryption. +Telemetry data is expected to be GPB-KV (self-describing-gpb) encoded. + +The GRPC dialout transport is supported on various IOS XR (64-bit) 6.1.x and later, IOS XE 16.10 and later, as well as NX-OS 7.x and later platforms. + +The TCP dialout transport is supported on IOS XR (32-bit and 64-bit) 6.1.x and later. + + +### Configuration: + +This is a sample configuration for the plugin. + +```toml +[[inputs.cisco_telemetry_mdt]] + ## Telemetry transport (one of: tcp, grpc) + transport = "grpc" + + ## Address and port to host telemetry listener + service_address = ":57000" + + ## Enable TLS for GRPC transport + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## Enable TLS client authentication and define allowed CA certificates + # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + + ## Define aliases to map telemetry encoding paths to simple measurement names + [inputs.cisco_telemetry_mdt.aliases] + ifstats = "ietf-interfaces:interfaces-state/interface/statistics" +``` + +### Example Output: +``` +ifstats,path=ietf-interfaces:interfaces-state/interface/statistics,host=linux,name=GigabitEthernet2,source=csr1kv,subscription=101 in-unicast-pkts=27i,in-multicast-pkts=0i,discontinuity-time="2019-05-23T07:40:23.000362+00:00",in-octets=5233i,in-errors=0i,out-multicast-pkts=0i,out-discards=0i,in-broadcast-pkts=0i,in-discards=0i,in-unknown-protos=0i,out-unicast-pkts=0i,out-broadcast-pkts=0i,out-octets=0i,out-errors=0i 1559150462624000000 
+ifstats,path=ietf-interfaces:interfaces-state/interface/statistics,host=linux,name=GigabitEthernet1,source=csr1kv,subscription=101 in-octets=3394770806i,in-broadcast-pkts=0i,in-multicast-pkts=0i,out-broadcast-pkts=0i,in-unknown-protos=0i,out-octets=350212i,in-unicast-pkts=9477273i,in-discards=0i,out-unicast-pkts=2726i,out-discards=0i,discontinuity-time="2019-05-23T07:40:23.000363+00:00",in-errors=30i,out-multicast-pkts=0i,out-errors=0i 1559150462624000000 +``` \ No newline at end of file diff --git a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go new file mode 100644 index 000000000..fc018a31e --- /dev/null +++ b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go @@ -0,0 +1,391 @@ +package cisco_telemetry_mdt + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "log" + "net" + "strings" + "sync" + "time" + + dialout "github.com/cisco-ie/nx-telemetry-proto/mdt_dialout" + telemetry "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis" + "github.com/golang/protobuf/proto" + "github.com/influxdata/telegraf" + internaltls "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/inputs" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/peer" +) + +const ( + // Maximum telemetry payload size (in bytes) to accept for TCP dialout transport + // when max_msg_size is not configured + tcpMaxMsgLen uint32 = 1024 * 1024 +) + +// CiscoTelemetryMDT plugin for IOS XR, IOS XE and NXOS platforms +type CiscoTelemetryMDT struct { + // Common configuration + Transport string + ServiceAddress string `toml:"service_address"` + // Maximum message size in bytes; values <= 0 keep the transport default + MaxMsgSize int `toml:"max_msg_size"` + // Measurement alias to telemetry encoding path + Aliases map[string]string `toml:"aliases"` + + // GRPC TLS settings + internaltls.ServerConfig + + // Internal listener / client handle + grpcServer *grpc.Server + listener net.Listener + + // Internal state + // aliases is the inverse of Aliases (encoding path to alias), built in Start + aliases map[string]string + acc telegraf.Accumulator + wg sync.WaitGroup +} + +// Start the Cisco MDT 
service +// +// It opens the listener on ServiceAddress, builds the reverse alias lookup +// (encoding path to measurement name) and launches the accept or serve +// goroutine for the configured transport. When setup fails after the listener +// was opened, the listener is closed again before the error is returned. +func (c *CiscoTelemetryMDT) Start(acc telegraf.Accumulator) error { + var err error + c.acc = acc + c.listener, err = net.Listen("tcp", c.ServiceAddress) + if err != nil { + return err + } + + // Invert aliases list + c.aliases = make(map[string]string, len(c.Aliases)) + for alias, path := range c.Aliases { + c.aliases[path] = alias + } + + switch c.Transport { + case "tcp": + // TCP dialout server accept routine + c.wg.Add(1) + go func() { + c.acceptTCPClients() + c.wg.Done() + }() + + case "grpc": + var opts []grpc.ServerOption + tlsConfig, err := c.ServerConfig.TLSConfig() + if err != nil { + // Do not leak the already opened listener on a TLS configuration error + c.listener.Close() + return err + } + if tlsConfig != nil { + opts = append(opts, grpc.Creds(credentials.NewTLS(tlsConfig))) + } + + if c.MaxMsgSize > 0 { + opts = append(opts, grpc.MaxRecvMsgSize(c.MaxMsgSize)) + } + + c.grpcServer = grpc.NewServer(opts...) + dialout.RegisterGRPCMdtDialoutServer(c.grpcServer, c) + + c.wg.Add(1) + go func() { + c.grpcServer.Serve(c.listener) + c.wg.Done() + }() + + default: + c.listener.Close() + return fmt.Errorf("invalid Cisco MDT transport: %s", c.Transport) + } + + return nil +} + +// acceptTCPClients defines the TCP dialout server main routine +func (c *CiscoTelemetryMDT) acceptTCPClients() { + // Keep track of all active connections, so we can close them if necessary + var mutex sync.Mutex + clients := make(map[net.Conn]struct{}) + + for { + conn, err := c.listener.Accept() + if neterr, ok := err.(*net.OpError); ok && (neterr.Timeout() || neterr.Temporary()) { + continue + } else if err != nil { + break // Stop() will close the connection so Accept() will fail here + } + + mutex.Lock() + clients[conn] = struct{}{} + mutex.Unlock() + + // Individual client connection routine + c.wg.Add(1) + go func() { + log.Printf("D! [inputs.cisco_telemetry_mdt]: Accepted Cisco MDT TCP dialout connection from %s", conn.RemoteAddr()) + if err := c.handleTCPClient(conn); err != nil { + c.acc.AddError(err) + } + log.Printf("D! 
[inputs.cisco_telemetry_mdt]: Closed Cisco MDT TCP dialout connection from %s", conn.RemoteAddr()) + + mutex.Lock() + delete(clients, conn) + mutex.Unlock() + + conn.Close() + c.wg.Done() + }() + } + + // Close all remaining client connections + mutex.Lock() + for client := range clients { + if err := client.Close(); err != nil { + log.Printf("E! [inputs.cisco_telemetry_mdt]: Failed to close TCP dialout client: %v", err) + } + } + mutex.Unlock() +} + +// handleTCPClient reads length-prefixed telemetry messages from a TCP dialout +// connection until the peer disconnects or a framing error occurs. +// +// It returns nil when the peer closes the connection cleanly between two +// messages (io.EOF while waiting for the next header), mirroring the io.EOF +// handling of the GRPC transport. Read failures, oversized messages, non-zero +// header flags and truncated payloads are returned as errors. +func (c *CiscoTelemetryMDT) handleTCPClient(conn net.Conn) error { + // TCP Dialout telemetry framing header + var hdr struct { + MsgType uint16 + MsgEncap uint16 + MsgHdrVersion uint16 + MsgFlags uint16 + MsgLen uint32 + } + + // The size limit does not change while the connection is open. Compare in + // int64 so that a configured limit above the uint32 range is not truncated. + maxMsgSize := int64(tcpMaxMsgLen) + if c.MaxMsgSize > 0 { + maxMsgSize = int64(c.MaxMsgSize) + } + + var payload bytes.Buffer + + for { + // Read and validate dialout telemetry header + if err := binary.Read(conn, binary.BigEndian, &hdr); err != nil { + if err == io.EOF { + // binary.Read reports io.EOF only if no header byte was read + return nil + } + return err + } + + msgLen := int64(hdr.MsgLen) + if msgLen > maxMsgSize { + return fmt.Errorf("dialout packet too long: %v", hdr.MsgLen) + } else if hdr.MsgFlags != 0 { + return fmt.Errorf("invalid dialout flags: %v", hdr.MsgFlags) + } + + // Read and handle telemetry packet + payload.Reset() + if size, err := payload.ReadFrom(io.LimitReader(conn, msgLen)); size != msgLen { + if err != nil { + return err + } + return fmt.Errorf("TCP dialout premature EOF") + } + + c.handleTelemetry(payload.Bytes()) + } +} + +// MdtDialout RPC server method for grpc-dialout transport +func (c *CiscoTelemetryMDT) MdtDialout(stream dialout.GRPCMdtDialout_MdtDialoutServer) error { + peer, peerOK := peer.FromContext(stream.Context()) + if peerOK { + log.Printf("D! 
[inputs.cisco_telemetry_mdt]: Accepted Cisco MDT GRPC dialout connection from %s", peer.Addr) + } + + for { + packet, err := stream.Recv() + if err != nil { + if err != io.EOF { + c.acc.AddError(fmt.Errorf("GRPC dialout receive error: %v", err)) + } + break + } + + if len(packet.Data) == 0 && len(packet.Errors) != 0 { + c.acc.AddError(fmt.Errorf("GRPC dialout error: %s", packet.Errors)) + break + } + + c.handleTelemetry(packet.Data) + } + + if peerOK { + log.Printf("D! [inputs.cisco_telemetry_mdt]: Closed Cisco MDT GRPC dialout connection from %s", peer.Addr) + } + + return nil +} + +// handleTelemetry decodes a telemetry message received from any transport and +// adds one measurement per GPBKV entry to the accumulator. A message that +// fails to unmarshal, and any entry that ends up without tags, fields or an +// encoding path, is reported as an accumulator error instead. +func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) { + var namebuf bytes.Buffer + telemetry := &telemetry.Telemetry{} + err := proto.Unmarshal(data, telemetry) + if err != nil { + c.acc.AddError(fmt.Errorf("Cisco MDT failed to decode: %v", err)) + return + } + + for _, gpbkv := range telemetry.DataGpbkv { + var fields map[string]interface{} + + // Produce metadata tags + var tags map[string]string + + // Top-level field may have measurement timestamp, if not use message timestamp + measured := gpbkv.Timestamp + if measured == 0 { + measured = telemetry.MsgTimestamp + } + + // measured is converted as milliseconds since the Unix epoch + timestamp := time.Unix(int64(measured/1000), int64(measured%1000)*1000000) + + // Populate tags and fields from toplevel GPBKV fields "keys" and "content" + for _, field := range gpbkv.Fields { + switch field.Name { + case "keys": + tags = make(map[string]string, len(field.Fields)+2) + tags["source"] = telemetry.GetNodeIdStr() + tags["subscription"] = telemetry.GetSubscriptionIdStr() + // Passing nil fields makes parseGPBKVField store the values as tags + for _, subfield := range field.Fields { + c.parseGPBKVField(subfield, &namebuf, telemetry.EncodingPath, timestamp, tags, nil) + } + case "content": + fields = make(map[string]interface{}, len(field.Fields)) + for _, subfield := range field.Fields { + c.parseGPBKVField(subfield, &namebuf, telemetry.EncodingPath, timestamp, tags, fields) + } + default: + 
log.Printf("I! [inputs.cisco_telemetry_mdt]: Unexpected top-level MDT field: %s", field.Name) + } + } + + // Find best alias for encoding path and emit measurement + // The "path" tag is only set when an alias replaces the measurement name + if len(fields) > 0 && len(tags) > 0 && len(telemetry.EncodingPath) > 0 { + name := telemetry.EncodingPath + if alias, ok := c.aliases[name]; ok { + tags["path"] = name + name = alias + } else { + log.Printf("D! [inputs.cisco_telemetry_mdt]: No measurement alias for encoding path: %s", name) + } + c.acc.AddFields(name, fields, tags, timestamp) + } else { + c.acc.AddError(fmt.Errorf("empty encoding path or measurement")) + } + } +} + +// parseGPBKVField recursively walks a GPBKV field tree. Values are stored in +// fields under their slash-separated name path (with "-" replaced by "_") when +// fields is non-nil, and in tags otherwise. namebuf carries the name prefix of +// the parent fields and is truncated back to its original length before +// returning. path and timestamp are only passed on to the recursive calls. +func (c *CiscoTelemetryMDT) parseGPBKVField(field *telemetry.TelemetryField, namebuf *bytes.Buffer, + path string, timestamp time.Time, tags map[string]string, fields map[string]interface{}) { + + namelen := namebuf.Len() + if namelen > 0 { + namebuf.WriteRune('/') + } + namebuf.WriteString(strings.Replace(field.Name, "-", "_", -1)) + + // Decode Telemetry field value if set + var value interface{} + switch val := field.ValueByType.(type) { + case *telemetry.TelemetryField_BytesValue: + value = val.BytesValue + case *telemetry.TelemetryField_StringValue: + value = val.StringValue + case *telemetry.TelemetryField_BoolValue: + value = val.BoolValue + case *telemetry.TelemetryField_Uint32Value: + value = val.Uint32Value + case *telemetry.TelemetryField_Uint64Value: + value = val.Uint64Value + case *telemetry.TelemetryField_Sint32Value: + value = val.Sint32Value + case *telemetry.TelemetryField_Sint64Value: + value = val.Sint64Value + case *telemetry.TelemetryField_DoubleValue: + value = val.DoubleValue + case *telemetry.TelemetryField_FloatValue: + value = val.FloatValue + } + + if value != nil { + // Distinguish between tags (keys) and fields (data) to write to + if fields != nil { + fields[namebuf.String()] = value + } else { + if _, exists := tags[field.Name]; !exists { // Use short keys whenever possible + 
tags[field.Name] = fmt.Sprint(value) + } else { + tags[namebuf.String()] = fmt.Sprint(value) + } + } + } + + for _, subfield := range field.Fields { + c.parseGPBKVField(subfield, namebuf, path, timestamp, tags, fields) + } + + namebuf.Truncate(namelen) +} + +// Stop shuts down the GRPC server (if one was started), closes the listener +// and blocks until all goroutines tracked by the wait group have finished. +func (c *CiscoTelemetryMDT) Stop() { + if c.grpcServer != nil { + // Stop server and terminate all running dialout routines + c.grpcServer.Stop() + } + if c.listener != nil { + c.listener.Close() + } + c.wg.Wait() +} + +const sampleConfig = ` + ## Telemetry transport (one of: tcp, grpc) + transport = "grpc" + + ## Address and port to host telemetry listener + service_address = ":57000" + + ## Enable TLS for GRPC transport + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## Enable TLS client authentication and define allowed CA certificates + # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + + ## Define aliases to map telemetry encoding paths to simple measurement names + [inputs.cisco_telemetry_mdt.aliases] + ifstats = "ietf-interfaces:interfaces-state/interface/statistics" +` + +// SampleConfig returns the sample configuration of the plugin +func (c *CiscoTelemetryMDT) SampleConfig() string { + return sampleConfig +} + +// Description returns a one-line description of the plugin +func (c *CiscoTelemetryMDT) Description() string { + return "Cisco model-driven telemetry (MDT) input plugin for IOS XR, IOS XE and NX-OS platforms" +} + +// Gather is a no-op: measurements are added by handleTelemetry as messages +// arrive, not on the collection interval +func (c *CiscoTelemetryMDT) Gather(_ telegraf.Accumulator) error { + return nil +} + +// init registers the plugin, defaulting to the GRPC transport on a loopback +// listener address +func init() { + inputs.Add("cisco_telemetry_mdt", func() telegraf.Input { + return &CiscoTelemetryMDT{ + Transport: "grpc", + ServiceAddress: "127.0.0.1:57000", + } + }) +} diff --git a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go new file mode 100644 index 000000000..d2c686c69 --- /dev/null +++ b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go @@ -0,0 +1,362 @@ 
+package cisco_telemetry_mdt + +import ( + "context" + "encoding/binary" + "errors" + "net" + "testing" + + "github.com/golang/protobuf/proto" + + dialout "github.com/cisco-ie/nx-telemetry-proto/mdt_dialout" + telemetry "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis" + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" +) + +/* TestHandleTelemetryEmpty feeds a message whose only GPBKV entry is empty and expects an accumulator error and no metric. */ +func TestHandleTelemetryEmpty(t *testing.T) { + c := &CiscoTelemetryMDT{Transport: "dummy"} + acc := &testutil.Accumulator{} + /* "dummy" is not a valid transport, so Start returns an error that is ignored here; the call only attaches the accumulator, which Start does before it checks the transport. */ + c.Start(acc) + + telemetry := &telemetry.Telemetry{ + DataGpbkv: []*telemetry.TelemetryField{ + {}, + }, + } + data, _ := proto.Marshal(telemetry) + + c.handleTelemetry(data) + assert.Contains(t, acc.Errors, errors.New("empty encoding path or measurement")) + assert.Empty(t, acc.Metrics) +} + +/* TestHandleTelemetryTwoSimple decodes a message carrying two GPBKV entries for an aliased encoding path. */ +func TestHandleTelemetryTwoSimple(t *testing.T) { + c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"alias": "type:model/some/path"}} + acc := &testutil.Accumulator{} + /* Start fails on the "dummy" transport (error ignored) but still attaches the accumulator and builds the alias lookup. */ + c.Start(acc) + + telemetry := &telemetry.Telemetry{ + MsgTimestamp: 1543236572000, + EncodingPath: "type:model/some/path", + NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, + Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, + DataGpbkv: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "keys", + Fields: []*telemetry.TelemetryField{ + { + Name: "name", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "str"}, + }, + { + Name: "uint64", + ValueByType: &telemetry.TelemetryField_Uint64Value{Uint64Value: 1234}, + }, + }, + }, + { + Name: "content", + Fields: []*telemetry.TelemetryField{ + { + Name: "bool", + ValueByType: &telemetry.TelemetryField_BoolValue{BoolValue: true}, + }, + }, + }, + }, + }, + { + Fields: []*telemetry.TelemetryField{ + { + Name: "keys", + Fields: []*telemetry.TelemetryField{ + { + Name: "name", + ValueByType: 
&telemetry.TelemetryField_StringValue{StringValue: "str2"}, + }, + }, + }, + { + Name: "content", + Fields: []*telemetry.TelemetryField{ + { + Name: "bool", + ValueByType: &telemetry.TelemetryField_BoolValue{BoolValue: false}, + }, + }, + }, + }, + }, + }, + } + data, _ := proto.Marshal(telemetry) + + c.handleTelemetry(data) + assert.Empty(t, acc.Errors) + + tags := map[string]string{"path": "type:model/some/path", "name": "str", "uint64": "1234", "source": "hostname", "subscription": "subscription"} + fields := map[string]interface{}{"bool": true} + acc.AssertContainsTaggedFields(t, "alias", fields, tags) + + tags = map[string]string{"path": "type:model/some/path", "name": "str2", "source": "hostname", "subscription": "subscription"} + fields = map[string]interface{}{"bool": false} + acc.AssertContainsTaggedFields(t, "alias", fields, tags) +} + +func TestHandleTelemetrySingleNested(t *testing.T) { + c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"nested": "type:model/nested/path"}} + acc := &testutil.Accumulator{} + c.Start(acc) + + telemetry := &telemetry.Telemetry{ + MsgTimestamp: 1543236572000, + EncodingPath: "type:model/nested/path", + NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, + Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, + DataGpbkv: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "keys", + Fields: []*telemetry.TelemetryField{ + { + Name: "nested", + Fields: []*telemetry.TelemetryField{ + { + Name: "key", + Fields: []*telemetry.TelemetryField{ + { + Name: "level", + ValueByType: &telemetry.TelemetryField_DoubleValue{DoubleValue: 3}, + }, + }, + }, + }, + }, + }, + }, + { + Name: "content", + Fields: []*telemetry.TelemetryField{ + { + Name: "nested", + Fields: []*telemetry.TelemetryField{ + { + Name: "value", + Fields: []*telemetry.TelemetryField{ + { + Name: "foo", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: 
"bar"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + data, _ := proto.Marshal(telemetry) + + c.handleTelemetry(data) + assert.Empty(t, acc.Errors) + + tags := map[string]string{"path": "type:model/nested/path", "level": "3", "source": "hostname", "subscription": "subscription"} + fields := map[string]interface{}{"nested/value/foo": "bar"} + acc.AssertContainsTaggedFields(t, "nested", fields, tags) +} + +// TestTCPDialoutOverflow sends a framing header announcing a payload larger +// than the default TCP limit and expects the listener to report it as an error. +func TestTCPDialoutOverflow(t *testing.T) { + c := &CiscoTelemetryMDT{Transport: "tcp", ServiceAddress: "127.0.0.1:57000"} + acc := &testutil.Accumulator{} + assert.Nil(t, c.Start(acc)) + + hdr := struct { + MsgType uint16 + MsgEncap uint16 + MsgHdrVersion uint16 + MsgFlags uint16 + MsgLen uint32 + }{MsgLen: uint32(1000000000)} + + conn, err := net.Dial("tcp", "127.0.0.1:57000") + if err != nil { + // A nil connection would panic in the calls below + c.Stop() + t.Fatalf("failed to dial TCP dialout listener: %v", err) + } + binary.Write(conn, binary.BigEndian, hdr) + // Blocks until the listener drops the connection + conn.Read([]byte{0}) + conn.Close() + + c.Stop() + + assert.Contains(t, acc.Errors, errors.New("dialout packet too long: 1000000000")) +} + +// mockTelemetryMessage returns a telemetry message with one GPBKV entry holding +// a single string key and a single signed integer value. +func mockTelemetryMessage() *telemetry.Telemetry { + return &telemetry.Telemetry{ + MsgTimestamp: 1543236572000, + EncodingPath: "type:model/some/path", + NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, + Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, + DataGpbkv: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "keys", + Fields: []*telemetry.TelemetryField{ + { + Name: "name", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "str"}, + }, + }, + }, + { + Name: "content", + Fields: []*telemetry.TelemetryField{ + { + Name: "value", + ValueByType: &telemetry.TelemetryField_Sint64Value{Sint64Value: -1}, + }, + }, + }, + }, + }, + }, + } +} + +func TestTCPDialoutMultiple(t *testing.T) { + c := &CiscoTelemetryMDT{Transport: "tcp", ServiceAddress: "127.0.0.1:57000", Aliases: map[string]string{ + "some": "type:model/some/path", "parallel": "type:model/parallel/path", "other": 
"type:model/other/path"}} + acc := &testutil.Accumulator{} + assert.Nil(t, c.Start(acc)) + + telemetry := mockTelemetryMessage() + + hdr := struct { + MsgType uint16 + MsgEncap uint16 + MsgHdrVersion uint16 + MsgFlags uint16 + MsgLen uint32 + }{} + + conn, _ := net.Dial("tcp", "127.0.0.1:57000") + + data, _ := proto.Marshal(telemetry) + hdr.MsgLen = uint32(len(data)) + binary.Write(conn, binary.BigEndian, hdr) + conn.Write(data) + + conn2, _ := net.Dial("tcp", "127.0.0.1:57000") + telemetry.EncodingPath = "type:model/parallel/path" + data, _ = proto.Marshal(telemetry) + hdr.MsgLen = uint32(len(data)) + binary.Write(conn2, binary.BigEndian, hdr) + conn2.Write(data) + conn2.Write([]byte{0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0}) + conn2.Read([]byte{0}) + conn2.Close() + + telemetry.EncodingPath = "type:model/other/path" + data, _ = proto.Marshal(telemetry) + hdr.MsgLen = uint32(len(data)) + binary.Write(conn, binary.BigEndian, hdr) + conn.Write(data) + conn.Write([]byte{0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0}) + conn.Read([]byte{0}) + c.Stop() + conn.Close() + + // We use the invalid dialout flags to let the server close the connection + assert.Equal(t, acc.Errors, []error{errors.New("invalid dialout flags: 257"), errors.New("invalid dialout flags: 257")}) + + tags := map[string]string{"path": "type:model/some/path", "name": "str", "source": "hostname", "subscription": "subscription"} + fields := map[string]interface{}{"value": int64(-1)} + acc.AssertContainsTaggedFields(t, "some", fields, tags) + + tags = map[string]string{"path": "type:model/parallel/path", "name": "str", "source": "hostname", "subscription": "subscription"} + fields = map[string]interface{}{"value": int64(-1)} + acc.AssertContainsTaggedFields(t, "parallel", fields, tags) + + tags = map[string]string{"path": "type:model/other/path", "name": "str", "source": "hostname", "subscription": "subscription"} + fields = map[string]interface{}{"value": int64(-1)} + acc.AssertContainsTaggedFields(t, "other", fields, 
tags) +} + +// TestGRPCDialoutError sends a dialout message that only carries an error +// string and expects it to be reported through the accumulator. +func TestGRPCDialoutError(t *testing.T) { + c := &CiscoTelemetryMDT{Transport: "grpc", ServiceAddress: "127.0.0.1:57001"} + acc := &testutil.Accumulator{} + assert.Nil(t, c.Start(acc)) + + conn, err := grpc.Dial("127.0.0.1:57001", grpc.WithInsecure()) + if err != nil { + c.Stop() + t.Fatalf("failed to dial GRPC dialout listener: %v", err) + } + // Release the client connection once the test is done + defer conn.Close() + + client := dialout.NewGRPCMdtDialoutClient(conn) + stream, err := client.MdtDialout(context.Background()) + if err != nil { + // A nil stream would panic in Send below + c.Stop() + t.Fatalf("failed to open dialout stream: %v", err) + } + + args := &dialout.MdtDialoutArgs{Errors: "foobar"} + stream.Send(args) + + // Wait for the server to close + stream.Recv() + c.Stop() + + assert.Equal(t, acc.Errors, []error{errors.New("GRPC dialout error: foobar")}) +} + +func TestGRPCDialoutMultiple(t *testing.T) { + c := &CiscoTelemetryMDT{Transport: "grpc", ServiceAddress: "127.0.0.1:57001", Aliases: map[string]string{ + "some": "type:model/some/path", "parallel": "type:model/parallel/path", "other": "type:model/other/path"}} + acc := &testutil.Accumulator{} + assert.Nil(t, c.Start(acc)) + telemetry := mockTelemetryMessage() + + conn, _ := grpc.Dial("127.0.0.1:57001", grpc.WithInsecure(), grpc.WithBlock()) + client := dialout.NewGRPCMdtDialoutClient(conn) + stream, _ := client.MdtDialout(context.TODO()) + + data, _ := proto.Marshal(telemetry) + args := &dialout.MdtDialoutArgs{Data: data, ReqId: 456} + stream.Send(args) + + conn2, _ := grpc.Dial("127.0.0.1:57001", grpc.WithInsecure(), grpc.WithBlock()) + client2 := dialout.NewGRPCMdtDialoutClient(conn2) + stream2, _ := client2.MdtDialout(context.TODO()) + + telemetry.EncodingPath = "type:model/parallel/path" + data, _ = proto.Marshal(telemetry) + args = &dialout.MdtDialoutArgs{Data: data} + stream2.Send(args) + stream2.Send(&dialout.MdtDialoutArgs{Errors: "testclose"}) + stream2.Recv() + conn2.Close() + + telemetry.EncodingPath = "type:model/other/path" + data, _ = proto.Marshal(telemetry) + args = &dialout.MdtDialoutArgs{Data: data} + stream.Send(args) + stream.Send(&dialout.MdtDialoutArgs{Errors: "testclose"}) + stream.Recv() + + c.Stop() + conn.Close() + + assert.Equal(t, acc.Errors, 
[]error{errors.New("GRPC dialout error: testclose"), errors.New("GRPC dialout error: testclose")}) + + tags := map[string]string{"path": "type:model/some/path", "name": "str", "source": "hostname", "subscription": "subscription"} + fields := map[string]interface{}{"value": int64(-1)} + acc.AssertContainsTaggedFields(t, "some", fields, tags) + + tags = map[string]string{"path": "type:model/parallel/path", "name": "str", "source": "hostname", "subscription": "subscription"} + fields = map[string]interface{}{"value": int64(-1)} + acc.AssertContainsTaggedFields(t, "parallel", fields, tags) + + tags = map[string]string{"path": "type:model/other/path", "name": "str", "source": "hostname", "subscription": "subscription"} + fields = map[string]interface{}{"value": int64(-1)} + acc.AssertContainsTaggedFields(t, "other", fields, tags) + +}