517 lines
14 KiB
Go
517 lines
14 KiB
Go
package cisco_telemetry_gnmi
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/influxdata/telegraf/metric"
|
|
|
|
"google.golang.org/grpc/credentials"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/internal"
|
|
internaltls "github.com/influxdata/telegraf/internal/tls"
|
|
"github.com/influxdata/telegraf/plugins/inputs"
|
|
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
|
|
"github.com/openconfig/gnmi/proto/gnmi"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/metadata"
|
|
)
|
|
|
|
// CiscoTelemetryGNMI plugin instance
|
|
type CiscoTelemetryGNMI struct {
|
|
Addresses []string `toml:"addresses"`
|
|
Subscriptions []Subscription `toml:"subscription"`
|
|
Aliases map[string]string `toml:"aliases"`
|
|
|
|
// Optional subscription configuration
|
|
Encoding string
|
|
Origin string
|
|
Prefix string
|
|
Target string
|
|
UpdatesOnly bool `toml:"updates_only"`
|
|
|
|
// Cisco IOS XR credentials
|
|
Username string
|
|
Password string
|
|
|
|
// Redial
|
|
Redial internal.Duration
|
|
|
|
// GRPC TLS settings
|
|
EnableTLS bool `toml:"enable_tls"`
|
|
internaltls.ClientConfig
|
|
|
|
// Internal state
|
|
aliases map[string]string
|
|
acc telegraf.Accumulator
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// Subscription for a GNMI client
|
|
type Subscription struct {
|
|
Name string
|
|
Origin string
|
|
Path string
|
|
|
|
// Subscription mode and interval
|
|
SubscriptionMode string `toml:"subscription_mode"`
|
|
SampleInterval internal.Duration `toml:"sample_interval"`
|
|
|
|
// Duplicate suppression
|
|
SuppressRedundant bool `toml:"suppress_redundant"`
|
|
HeartbeatInterval internal.Duration `toml:"heartbeat_interval"`
|
|
}
|
|
|
|
// Start the http listener service
|
|
func (c *CiscoTelemetryGNMI) Start(acc telegraf.Accumulator) error {
|
|
var err error
|
|
var ctx context.Context
|
|
var tlscfg *tls.Config
|
|
var request *gnmi.SubscribeRequest
|
|
c.acc = acc
|
|
ctx, c.cancel = context.WithCancel(context.Background())
|
|
|
|
// Validate configuration
|
|
if request, err = c.newSubscribeRequest(); err != nil {
|
|
return err
|
|
} else if c.Redial.Duration.Nanoseconds() <= 0 {
|
|
return fmt.Errorf("redial duration must be positive")
|
|
}
|
|
|
|
// Parse TLS config
|
|
if c.EnableTLS {
|
|
if tlscfg, err = c.ClientConfig.TLSConfig(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if len(c.Username) > 0 {
|
|
ctx = metadata.AppendToOutgoingContext(ctx, "username", c.Username, "password", c.Password)
|
|
}
|
|
|
|
// Invert explicit alias list and prefill subscription names
|
|
c.aliases = make(map[string]string, len(c.Subscriptions)+len(c.Aliases))
|
|
for _, subscription := range c.Subscriptions {
|
|
path := subscription.Path
|
|
if len(subscription.Origin) > 0 {
|
|
path = subscription.Origin + ":" + path
|
|
}
|
|
|
|
name := subscription.Name
|
|
if len(name) == 0 {
|
|
name = path[strings.LastIndexByte(path, '/')+1:]
|
|
}
|
|
if len(name) > 0 {
|
|
c.aliases[path] = name
|
|
}
|
|
}
|
|
for alias, path := range c.Aliases {
|
|
c.aliases[path] = alias
|
|
}
|
|
|
|
// Create a goroutine for each device, dial and subscribe
|
|
c.wg.Add(len(c.Addresses))
|
|
for _, addr := range c.Addresses {
|
|
go func(address string) {
|
|
defer c.wg.Done()
|
|
for ctx.Err() == nil {
|
|
if err := c.subscribeGNMI(ctx, address, tlscfg, request); err != nil && ctx.Err() == nil {
|
|
acc.AddError(err)
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
case <-time.After(c.Redial.Duration):
|
|
}
|
|
}
|
|
}(addr)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Create a new GNMI SubscribeRequest
|
|
func (c *CiscoTelemetryGNMI) newSubscribeRequest() (*gnmi.SubscribeRequest, error) {
|
|
// Create subscription objects
|
|
subscriptions := make([]*gnmi.Subscription, len(c.Subscriptions))
|
|
for i, subscription := range c.Subscriptions {
|
|
gnmiPath, err := parsePath(subscription.Origin, subscription.Path, "")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
mode, ok := gnmi.SubscriptionMode_value[strings.ToUpper(subscription.SubscriptionMode)]
|
|
if !ok {
|
|
return nil, fmt.Errorf("invalid subscription mode %s", subscription.SubscriptionMode)
|
|
}
|
|
subscriptions[i] = &gnmi.Subscription{
|
|
Path: gnmiPath,
|
|
Mode: gnmi.SubscriptionMode(mode),
|
|
SampleInterval: uint64(subscription.SampleInterval.Duration.Nanoseconds()),
|
|
SuppressRedundant: subscription.SuppressRedundant,
|
|
HeartbeatInterval: uint64(subscription.HeartbeatInterval.Duration.Nanoseconds()),
|
|
}
|
|
}
|
|
|
|
// Construct subscribe request
|
|
gnmiPath, err := parsePath(c.Origin, c.Prefix, c.Target)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if c.Encoding != "proto" && c.Encoding != "json" && c.Encoding != "json_ietf" {
|
|
return nil, fmt.Errorf("unsupported encoding %s", c.Encoding)
|
|
}
|
|
|
|
return &gnmi.SubscribeRequest{
|
|
Request: &gnmi.SubscribeRequest_Subscribe{
|
|
Subscribe: &gnmi.SubscriptionList{
|
|
Prefix: gnmiPath,
|
|
Mode: gnmi.SubscriptionList_STREAM,
|
|
Encoding: gnmi.Encoding(gnmi.Encoding_value[strings.ToUpper(c.Encoding)]),
|
|
Subscription: subscriptions,
|
|
UpdatesOnly: c.UpdatesOnly,
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// SubscribeGNMI and extract telemetry data
|
|
func (c *CiscoTelemetryGNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Config, request *gnmi.SubscribeRequest) error {
|
|
var opt grpc.DialOption
|
|
if tlscfg != nil {
|
|
opt = grpc.WithTransportCredentials(credentials.NewTLS(tlscfg))
|
|
} else {
|
|
opt = grpc.WithInsecure()
|
|
}
|
|
|
|
client, err := grpc.DialContext(ctx, address, opt)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to dial: %v", err)
|
|
}
|
|
defer client.Close()
|
|
|
|
subscribeClient, err := gnmi.NewGNMIClient(client).Subscribe(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to setup subscription: %v", err)
|
|
}
|
|
|
|
if err = subscribeClient.Send(request); err != nil {
|
|
return fmt.Errorf("failed to send subscription request: %v", err)
|
|
}
|
|
|
|
log.Printf("D! [inputs.cisco_telemetry_gnmi]: Connection to GNMI device %s established", address)
|
|
defer log.Printf("D! [inputs.cisco_telemetry_gnmi]: Connection to GNMI device %s closed", address)
|
|
for ctx.Err() == nil {
|
|
var reply *gnmi.SubscribeResponse
|
|
if reply, err = subscribeClient.Recv(); err != nil {
|
|
if err != io.EOF && ctx.Err() == nil {
|
|
return fmt.Errorf("aborted GNMI subscription: %v", err)
|
|
}
|
|
break
|
|
}
|
|
|
|
c.handleSubscribeResponse(address, reply)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// HandleSubscribeResponse message from GNMI and parse contained telemetry data
|
|
func (c *CiscoTelemetryGNMI) handleSubscribeResponse(address string, reply *gnmi.SubscribeResponse) {
|
|
// Check if response is a GNMI Update and if we have a prefix to derive the measurement name
|
|
response, ok := reply.Response.(*gnmi.SubscribeResponse_Update)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
var prefix, prefixAliasPath string
|
|
grouper := metric.NewSeriesGrouper()
|
|
timestamp := time.Unix(0, response.Update.Timestamp)
|
|
prefixTags := make(map[string]string)
|
|
|
|
if response.Update.Prefix != nil {
|
|
prefix, prefixAliasPath = c.handlePath(response.Update.Prefix, prefixTags, "")
|
|
}
|
|
prefixTags["source"], _, _ = net.SplitHostPort(address)
|
|
prefixTags["path"] = prefix
|
|
|
|
// Parse individual Update message and create measurements
|
|
var name, lastAliasPath string
|
|
for _, update := range response.Update.Update {
|
|
// Prepare tags from prefix
|
|
tags := make(map[string]string, len(prefixTags))
|
|
for key, val := range prefixTags {
|
|
tags[key] = val
|
|
}
|
|
aliasPath, fields := c.handleTelemetryField(update, tags, prefix)
|
|
|
|
// Inherent valid alias from prefix parsing
|
|
if len(prefixAliasPath) > 0 && len(aliasPath) == 0 {
|
|
aliasPath = prefixAliasPath
|
|
}
|
|
|
|
// Lookup alias if alias-path has changed
|
|
if aliasPath != lastAliasPath {
|
|
name = prefix
|
|
if alias, ok := c.aliases[aliasPath]; ok {
|
|
name = alias
|
|
} else {
|
|
log.Printf("D! [inputs.cisco_telemetry_gnmi]: No measurement alias for GNMI path: %s", name)
|
|
}
|
|
}
|
|
|
|
// Group metrics
|
|
for key, val := range fields {
|
|
grouper.Add(name, tags, timestamp, key[len(aliasPath)+1:], val)
|
|
}
|
|
|
|
lastAliasPath = aliasPath
|
|
}
|
|
|
|
// Add grouped measurements
|
|
for _, metric := range grouper.Metrics() {
|
|
c.acc.AddMetric(metric)
|
|
}
|
|
}
|
|
|
|
// HandleTelemetryField and add it to a measurement
|
|
func (c *CiscoTelemetryGNMI) handleTelemetryField(update *gnmi.Update, tags map[string]string, prefix string) (string, map[string]interface{}) {
|
|
path, aliasPath := c.handlePath(update.Path, tags, prefix)
|
|
|
|
var value interface{}
|
|
var jsondata []byte
|
|
|
|
switch val := update.Val.Value.(type) {
|
|
case *gnmi.TypedValue_AsciiVal:
|
|
value = val.AsciiVal
|
|
case *gnmi.TypedValue_BoolVal:
|
|
value = val.BoolVal
|
|
case *gnmi.TypedValue_BytesVal:
|
|
value = val.BytesVal
|
|
case *gnmi.TypedValue_DecimalVal:
|
|
value = val.DecimalVal
|
|
case *gnmi.TypedValue_FloatVal:
|
|
value = val.FloatVal
|
|
case *gnmi.TypedValue_IntVal:
|
|
value = val.IntVal
|
|
case *gnmi.TypedValue_StringVal:
|
|
value = val.StringVal
|
|
case *gnmi.TypedValue_UintVal:
|
|
value = val.UintVal
|
|
case *gnmi.TypedValue_JsonIetfVal:
|
|
jsondata = val.JsonIetfVal
|
|
case *gnmi.TypedValue_JsonVal:
|
|
jsondata = val.JsonVal
|
|
}
|
|
|
|
name := strings.Replace(path, "-", "_", -1)
|
|
fields := make(map[string]interface{})
|
|
if value != nil {
|
|
fields[name] = value
|
|
} else if jsondata != nil {
|
|
if err := json.Unmarshal(jsondata, &value); err != nil {
|
|
c.acc.AddError(fmt.Errorf("failed to parse JSON value: %v", err))
|
|
} else {
|
|
flattener := jsonparser.JSONFlattener{Fields: fields}
|
|
flattener.FullFlattenJSON(name, value, true, true)
|
|
}
|
|
}
|
|
return aliasPath, fields
|
|
}
|
|
|
|
// Parse path to path-buffer and tag-field
|
|
func (c *CiscoTelemetryGNMI) handlePath(path *gnmi.Path, tags map[string]string, prefix string) (string, string) {
|
|
var aliasPath string
|
|
builder := bytes.NewBufferString(prefix)
|
|
|
|
// Prefix with origin
|
|
if len(path.Origin) > 0 {
|
|
builder.WriteString(path.Origin)
|
|
builder.WriteRune(':')
|
|
}
|
|
|
|
// Parse generic keys from prefix
|
|
for _, elem := range path.Elem {
|
|
builder.WriteRune('/')
|
|
builder.WriteString(elem.Name)
|
|
name := builder.String()
|
|
|
|
if _, exists := c.aliases[name]; exists {
|
|
aliasPath = name
|
|
}
|
|
|
|
for key, val := range elem.Key {
|
|
key = strings.Replace(key, "-", "_", -1)
|
|
|
|
// Use short-form of key if possible
|
|
if _, exists := tags[key]; exists {
|
|
tags[name+"/"+key] = val
|
|
} else {
|
|
tags[key] = val
|
|
}
|
|
}
|
|
}
|
|
|
|
return builder.String(), aliasPath
|
|
}
|
|
|
|
//ParsePath from XPath-like string to GNMI path structure
|
|
func parsePath(origin string, path string, target string) (*gnmi.Path, error) {
|
|
var err error
|
|
gnmiPath := gnmi.Path{Origin: origin, Target: target}
|
|
|
|
if len(path) > 0 && path[0] != '/' {
|
|
return nil, fmt.Errorf("path does not start with a '/': %s", path)
|
|
}
|
|
|
|
elem := &gnmi.PathElem{}
|
|
start, name, value, end := 0, -1, -1, -1
|
|
|
|
path = path + "/"
|
|
|
|
for i := 0; i < len(path); i++ {
|
|
if path[i] == '[' {
|
|
if name >= 0 {
|
|
break
|
|
}
|
|
if end < 0 {
|
|
end = i
|
|
elem.Key = make(map[string]string)
|
|
}
|
|
name = i + 1
|
|
} else if path[i] == '=' {
|
|
if name <= 0 || value >= 0 {
|
|
break
|
|
}
|
|
value = i + 1
|
|
} else if path[i] == ']' {
|
|
if name <= 0 || value <= name {
|
|
break
|
|
}
|
|
elem.Key[path[name:value-1]] = strings.Trim(path[value:i], "'\"")
|
|
name, value = -1, -1
|
|
} else if path[i] == '/' {
|
|
if name < 0 {
|
|
if end < 0 {
|
|
end = i
|
|
}
|
|
|
|
if end > start {
|
|
elem.Name = path[start:end]
|
|
gnmiPath.Elem = append(gnmiPath.Elem, elem)
|
|
gnmiPath.Element = append(gnmiPath.Element, path[start:i])
|
|
}
|
|
|
|
start, name, value, end = i+1, -1, -1, -1
|
|
elem = &gnmi.PathElem{}
|
|
}
|
|
}
|
|
}
|
|
|
|
if name >= 0 || value >= 0 {
|
|
err = fmt.Errorf("Invalid GNMI path: %s", path)
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &gnmiPath, nil
|
|
}
|
|
|
|
// Stop listener and cleanup
|
|
func (c *CiscoTelemetryGNMI) Stop() {
|
|
c.cancel()
|
|
c.wg.Wait()
|
|
}
|
|
|
|
// sampleConfig is the example configuration returned by SampleConfig.
const sampleConfig = `
## Address and port of the GNMI GRPC server
addresses = ["10.49.234.114:57777"]

## define credentials
username = "cisco"
password = "cisco"

## GNMI encoding requested (one of: "proto", "json", "json_ietf")
# encoding = "proto"

## redial in case of failures after
redial = "10s"

## enable client-side TLS and define CA to authenticate the device
# enable_tls = true
# tls_ca = "/etc/telegraf/ca.pem"
# insecure_skip_verify = true

## define client-side TLS certificate & key to authenticate to the device
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"

## GNMI subscription prefix (optional, can usually be left empty)
## See: https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-specification.md#222-paths
# origin = ""
# prefix = ""
# target = ""

## Define additional aliases to map telemetry encoding paths to simple measurement names
#[inputs.cisco_telemetry_gnmi.aliases]
# ifcounters = "openconfig:/interfaces/interface/state/counters"

[[inputs.cisco_telemetry_gnmi.subscription]]
## Name of the measurement that will be emitted
name = "ifcounters"

## Origin and path of the subscription
## See: https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-specification.md#222-paths
##
## origin usually refers to a (YANG) data model implemented by the device
## and path to a specific substructe inside it that should be subscribed to (similar to an XPath)
## YANG models can be found e.g. here: https://github.com/YangModels/yang/tree/master/vendor/cisco/xr
origin = "openconfig-interfaces"
path = "/interfaces/interface/state/counters"

# Subscription mode (one of: "target_defined", "sample", "on_change") and interval
subscription_mode = "sample"
sample_interval = "10s"

## Suppress redundant transmissions when measured values are unchanged
# suppress_redundant = false

## If suppression is enabled, send updates at least every X seconds anyway
# heartbeat_interval = "60s"
`
|
|
|
|
// SampleConfig of plugin
|
|
func (c *CiscoTelemetryGNMI) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
// Description of plugin
|
|
func (c *CiscoTelemetryGNMI) Description() string {
|
|
return "Cisco GNMI telemetry input plugin based on GNMI telemetry data produced in IOS XR"
|
|
}
|
|
|
|
// Gather plugin measurements (unused)
|
|
func (c *CiscoTelemetryGNMI) Gather(_ telegraf.Accumulator) error {
|
|
return nil
|
|
}
|
|
|
|
func init() {
|
|
inputs.Add("cisco_telemetry_gnmi", func() telegraf.Input {
|
|
return &CiscoTelemetryGNMI{
|
|
Encoding: "proto",
|
|
Redial: internal.Duration{Duration: 10 * time.Second},
|
|
}
|
|
})
|
|
}
|