package jti_openconfig_telemetry

import (
	"fmt"
	"net"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	internaltls "github.com/influxdata/telegraf/internal/tls"
	"github.com/influxdata/telegraf/plugins/inputs"
	"github.com/influxdata/telegraf/plugins/inputs/jti_openconfig_telemetry/auth"
	"github.com/influxdata/telegraf/plugins/inputs/jti_openconfig_telemetry/oc"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/status"
)

// OpenConfigTelemetry is an input plugin that subscribes to JTI OpenConfig
// telemetry sensors on a list of gRPC servers and forwards every received
// key/value pair to the accumulator handed to Start.
type OpenConfigTelemetry struct {
	// Servers lists the "host:port" addresses to connect to.
	Servers []string `toml:"servers"`
	// Sensors holds the raw sensor definitions; see splitSensorConfig for the
	// accepted syntax.
	Sensors []string `toml:"sensors"`
	// Username, Password and ClientID must all be non-empty for Start to
	// perform a login check against a server.
	Username string `toml:"username"`
	Password string `toml:"password"`
	ClientID string `toml:"client_id"`
	// SampleFrequency is the reporting rate used for sensor entries that do
	// not carry their own rate.
	SampleFrequency internal.Duration `toml:"sample_frequency"`
	// StrAsTags makes extractData emit string values as tags instead of fields.
	StrAsTags bool `toml:"str_as_tags"`
	// RetryDelay is the pause between subscription attempts while a server
	// reports Unavailable. A value of 0 disables retrying.
	RetryDelay internal.Duration `toml:"retry_delay"`
	// EnableTLS selects TLS transport credentials built from ClientConfig.
	EnableTLS bool `toml:"enable_tls"`
	internaltls.ClientConfig

	Log telegraf.Logger

	sensorsConfig   []sensorConfig
	grpcClientConns []*grpc.ClientConn
	wg              *sync.WaitGroup
}

var (
	// Regex to match and extract data points from path value in received key
	keyPathRegex = regexp.MustCompile("\\/([^\\/]*)\\[([A-Za-z0-9\\-\\/]*\\=[^\\[]*)\\]")

	sampleConfig = `
  ## List of device addresses to collect telemetry from
  servers = ["localhost:1883"]

  ## Authentication details. Username and password are must if device expects
  ## authentication. Client ID must be unique when connecting from multiple instances
  ## of telegraf to the same device
  username = "user"
  password = "pass"
  client_id = "telegraf"

  ## Frequency to get data
  sample_frequency = "1000ms"

  ## Sensors to subscribe for
  ## A identifier for each sensor can be provided in path by separating with space
  ## Else sensor path will be used as identifier
  ## When identifier is used, we can provide a list of space separated sensors.
  ## A single subscription will be created with all these sensors and data will
  ## be saved to measurement with this identifier name
  sensors = [
   "/interfaces/",
   "collection /components/ /lldp",
  ]

  ## We allow specifying sensor group level reporting rate. To do this, specify the
  ## reporting rate in Duration at the beginning of sensor paths / collection
  ## name. For entries without reporting rate, we use configured sample frequency
  sensors = [
   "1000ms customReporting /interfaces /lldp",
   "2000ms collection /components",
   "/interfaces",
  ]

  ## Optional TLS Config
  # enable_tls = true
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Delay between retry attempts of failed RPC calls or streams. Defaults to 1000ms.
  ## Failed streams/calls will not be retried if 0 is provided
  retry_delay = "1000ms"

  ## To treat all string values as tags, set this to true
  str_as_tags = false
`
)

// SampleConfig returns the example configuration for this plugin.
func (m *OpenConfigTelemetry) SampleConfig() string {
	return sampleConfig
}

// Description returns a one-line summary of the plugin.
func (m *OpenConfigTelemetry) Description() string {
	return "Read JTI OpenConfig Telemetry from listed sensors"
}

// Gather does nothing and always returns nil. Metrics are added to the
// accumulator by the goroutines that collectData starts.
func (m *OpenConfigTelemetry) Gather(acc telegraf.Accumulator) error {
	return nil
}

// Stop closes every gRPC connection opened by Start and then waits for the
// collector goroutines to finish. It is safe to call when Start was never
// called or returned before any connection was opened.
func (m *OpenConfigTelemetry) Stop() {
	for _, conn := range m.grpcClientConns {
		// Guard against nil entries so that Close never dereferences nil.
		if conn == nil {
			continue
		}
		// The Close error is not actionable during shutdown.
		conn.Close()
	}

	// wg is only assigned by Start after the sensor configuration is
	// validated, so it may still be nil here.
	if m.wg != nil {
		m.wg.Wait()
	}
}

// Takes in XML path with predicates and returns list of tags+values along with
// a final XML path without predicates. If
// /events/event[id=2]/attributes[key='message']/value is given as input, this
// function will emit /events/event/attributes/value as xmlpath and
// { /events/event/@id=2, /events/event/attributes/@key=message } as tags.
// Single quotes are stripped from tag values. A predicate part that has no
// "=" is skipped, and a value keeps any "=" characters it contains.
func spitTagsNPath(xmlpath string) (string, map[string]string) {
	subs := keyPathRegex.FindAllStringSubmatch(xmlpath, -1)
	tags := make(map[string]string)

	// Given XML path, this will spit out final path without predicates
	for _, sub := range subs {
		element := "/" + strings.TrimSpace(sub[1])
		// Everything before the matched element, plus the element itself,
		// forms the prefix of the tag key.
		tagKey := strings.Split(xmlpath, sub[0])[0] + element + "/@"

		// If we have multiple keys in given path like
		// /events/event[id=2 and type=3]/, we must emit multiple tags
		for _, kv := range strings.Split(sub[2], " and ") {
			// Split on the first "=" only, so values containing "=" stay
			// intact and parts without "=" can be detected.
			parts := strings.SplitN(kv, "=", 2)
			if len(parts) != 2 {
				continue
			}
			key := tagKey + strings.TrimSpace(parts[0])
			tags[key] = strings.Replace(parts[1], "'", "", -1)
		}

		xmlpath = strings.Replace(xmlpath, sub[0], element, 1)
	}

	return xmlpath, tags
}

// Takes in a OC response, extracts tag information from keys and returns a
// list of groups with unique sets of tags+values. grpcServer is stored in the
// "device" tag of every group.
func (m *OpenConfigTelemetry) extractData(r *telemetry.OpenConfigData, grpcServer string) []DataGroup {
	// Use empty prefix. We will update this when we iterate over key-value pairs
	prefix := ""
	dgroups := []DataGroup{}

	for _, v := range r.Kv {
		// A "__prefix__" entry carries no data of its own; it sets the prefix
		// prepended to the keys that follow it.
		if v.Key == "__prefix__" {
			prefix = v.GetStrValue()
			continue
		}

		kv := make(map[string]interface{})

		// Also, lets use prefix if there is one
		xmlpath, finaltags := spitTagsNPath(prefix + v.Key)
		finaltags["device"] = grpcServer

		switch v.Value.(type) {
		case *telemetry.KeyValue_StrValue:
			// If StrAsTags is set, we treat all string values as tags
			if m.StrAsTags {
				finaltags[xmlpath] = v.GetStrValue()
			} else {
				kv[xmlpath] = v.GetStrValue()
			}
		case *telemetry.KeyValue_DoubleValue:
			kv[xmlpath] = v.GetDoubleValue()
		case *telemetry.KeyValue_IntValue:
			kv[xmlpath] = v.GetIntValue()
		case *telemetry.KeyValue_UintValue:
			kv[xmlpath] = v.GetUintValue()
		case *telemetry.KeyValue_SintValue:
			kv[xmlpath] = v.GetSintValue()
		case *telemetry.KeyValue_BoolValue:
			kv[xmlpath] = v.GetBoolValue()
		case *telemetry.KeyValue_BytesValue:
			kv[xmlpath] = v.GetBytesValue()
		}

		// Insert other tags from message
		finaltags["system_id"] = r.SystemId
		finaltags["path"] = r.Path

		// Insert derived key and value
		dgroups = CollectionByKeys(dgroups).Insert(finaltags, kv)

		// Insert data from message header
		dgroups = CollectionByKeys(dgroups).Insert(finaltags,
			map[string]interface{}{"_sequence": r.SequenceNumber})
		dgroups = CollectionByKeys(dgroups).Insert(finaltags,
			map[string]interface{}{"_timestamp": r.Timestamp})
		dgroups = CollectionByKeys(dgroups).Insert(finaltags,
			map[string]interface{}{"_component_id": r.ComponentId})
		dgroups = CollectionByKeys(dgroups).Insert(finaltags,
			map[string]interface{}{"_subcomponent_id": r.SubComponentId})
	}

	return dgroups
}

// Structure to hold sensors path list and measurement name
type sensorConfig struct {
	measurementName string
	pathList        []*telemetry.Path
}

// Takes in sensor configuration and converts it into slice of sensorConfig
// objects stored in m.sensorsConfig. Entries that are empty or contain no
// sensor path are logged and skipped. Returns the number of usable entries.
func (m *OpenConfigTelemetry) splitSensorConfig() int {
	m.sensorsConfig = make([]sensorConfig, 0)

	for _, sensor := range m.Sensors {
		spathSplit := strings.Fields(sensor)
		// An empty or whitespace-only entry has no fields at all; indexing it
		// below would panic.
		if len(spathSplit) == 0 {
			m.Log.Error("No sensors are specified")
			continue
		}

		reportingRate := uint32(m.SampleFrequency.Duration / time.Millisecond)

		// Extract measurement name and custom reporting rate if specified. Custom
		// reporting rate will be specified at the beginning of sensor list,
		// followed by measurement name like "1000ms interfaces /interfaces"
		// where 1000ms is the custom reporting rate and interfaces is the
		// measurement name. If 1000ms is not given, we use global reporting rate
		// from sample_frequency. if measurement name is not given, we use first
		// sensor name as the measurement name. If first or the word after custom
		// reporting rate doesn't start with /, we treat it as measurement name
		// and exclude it from list of sensors to subscribe
		if duration, err := time.ParseDuration(spathSplit[0]); err == nil {
			reportingRate = uint32(duration / time.Millisecond)
			spathSplit = spathSplit[1:]
		}

		if len(spathSplit) == 0 {
			m.Log.Error("No sensors are specified")
			continue
		}

		// Word after custom reporting rate is treated as measurement name
		measurementName := spathSplit[0]

		// If our word after custom reporting rate doesn't start with /, we treat
		// it as measurement name. Else we treat it as sensor
		if !strings.HasPrefix(measurementName, "/") {
			spathSplit = spathSplit[1:]
		}

		if len(spathSplit) == 0 {
			m.Log.Error("No valid sensors are specified")
			continue
		}

		// Iterate over our sensors and create pathlist to subscribe
		pathlist := make([]*telemetry.Path, 0, len(spathSplit))
		for _, path := range spathSplit {
			pathlist = append(pathlist, &telemetry.Path{Path: path, SampleFrequency: reportingRate})
		}

		m.sensorsConfig = append(m.sensorsConfig, sensorConfig{
			measurementName: measurementName,
			pathList:        pathlist,
		})
	}

	return len(m.sensorsConfig)
}

// Subscribes and collects OpenConfig telemetry data from given server. One
// goroutine is started per configured sensor group and registered with m.wg.
// The function itself does not block and always returns nil; subscription and
// stream errors are reported through acc.AddError.
func (m *OpenConfigTelemetry) collectData(ctx context.Context,
	grpcServer string, grpcClientConn *grpc.ClientConn,
	acc telegraf.Accumulator) error {
	c := telemetry.NewOpenConfigTelemetryClient(grpcClientConn)

	for _, sensor := range m.sensorsConfig {
		m.wg.Add(1)
		go func(ctx context.Context, sensor sensorConfig) {
			defer m.wg.Done()

			for {
				stream, err := c.TelemetrySubscribe(ctx,
					&telemetry.SubscriptionRequest{PathList: sensor.pathList})
				if err != nil {
					// The ok result is ignored on purpose: only the status
					// code is inspected below.
					rpcStatus, _ := status.FromError(err)

					// If service is currently unavailable and may come back
					// later, retry. Anything else ends this goroutine.
					if rpcStatus.Code() != codes.Unavailable {
						acc.AddError(fmt.Errorf("could not subscribe to %s: %v", grpcServer, err))
						return
					}

					// A retry delay of 0 disables retrying.
					if m.RetryDelay.Duration <= 0 {
						return
					}

					m.Log.Debugf("Retrying %s with timeout %v", grpcServer, m.RetryDelay.Duration)
					time.Sleep(m.RetryDelay.Duration)
					continue
				}

				for {
					r, err := stream.Recv()
					if err != nil {
						// If we encounter error in the stream, break so we can
						// retry the connection
						acc.AddError(fmt.Errorf("failed to read from %s: %s", grpcServer, err))
						break
					}

					m.Log.Debugf("Received from %s: %v", grpcServer, r)

					// Fallback tags, used only for groups that carry no tags
					// of their own.
					tags := make(map[string]string)
					tags["device"] = grpcServer

					dgroups := m.extractData(r, grpcServer)

					// Print final data collection
					m.Log.Debugf("Available collection for %s is: %v", grpcServer, dgroups)

					tnow := time.Now()
					// Iterate through data groups and add them
					for _, group := range dgroups {
						if len(group.tags) == 0 {
							acc.AddFields(sensor.measurementName, group.data, tags, tnow)
						} else {
							acc.AddFields(sensor.measurementName, group.data, group.tags, tnow)
						}
					}
				}
			}
		}(ctx, sensor)
	}

	return nil
}

// Start parses the sensor configuration, opens a gRPC connection to every
// configured server, performs the login check when Username, Password and
// ClientID are all set, and starts collecting telemetry into acc.
//
// It returns an error when no sensor entry is usable or when the TLS
// configuration cannot be built. A server whose address is invalid, whose
// connection cannot be created or whose login fails is logged and skipped.
func (m *OpenConfigTelemetry) Start(acc telegraf.Accumulator) error {
	// Build sensors config
	if m.splitSensorConfig() == 0 {
		return fmt.Errorf("no valid sensor configuration available")
	}

	// Parse TLS config
	var opts []grpc.DialOption
	if m.EnableTLS {
		tlscfg, err := m.ClientConfig.TLSConfig()
		if err != nil {
			return err
		}
		opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlscfg)))
	} else {
		opts = append(opts, grpc.WithInsecure())
	}

	// Connect to given list of servers and start collecting data
	var wg sync.WaitGroup
	ctx := context.Background()
	m.wg = &wg

	for _, server := range m.Servers {
		// Extract device address and port
		grpcServer, grpcPort, err := net.SplitHostPort(server)
		if err != nil {
			m.Log.Errorf("Invalid server address: %s", err.Error())
			continue
		}

		grpcClientConn, err := grpc.Dial(server, opts...)
		if err != nil {
			// There is no connection to use or to close for this server, so
			// move on instead of passing a nil connection further down.
			m.Log.Errorf("Failed to connect to %s: %s", server, err.Error())
			continue
		}
		m.Log.Debugf("Opened a new gRPC session to %s on port %s", grpcServer, grpcPort)

		// Add to the list of client connections so that Stop closes it, even
		// when the login check below fails.
		m.grpcClientConns = append(m.grpcClientConns, grpcClientConn)

		if m.Username != "" && m.Password != "" && m.ClientID != "" {
			lc := authentication.NewLoginClient(grpcClientConn)
			loginReply, loginErr := lc.LoginCheck(ctx,
				&authentication.LoginRequest{
					UserName: m.Username,
					Password: m.Password,
					ClientId: m.ClientID,
				})
			if loginErr != nil {
				m.Log.Errorf("Could not initiate login check for %s: %v", server, loginErr)
				continue
			}

			// Check if the user is authenticated. Bail if auth error
			if !loginReply.Result {
				m.Log.Errorf("Failed to authenticate the user for %s", server)
				continue
			}
		}

		// Subscribe and gather telemetry data. collectData always returns nil;
		// failures are reported through the accumulator.
		m.collectData(ctx, grpcServer, grpcClientConn, acc)
	}

	return nil
}

// init registers the plugin under the name "jti_openconfig_telemetry" with a
// default retry delay of one second and string values kept as fields.
func init() {
	inputs.Add("jti_openconfig_telemetry", func() telegraf.Input {
		return &OpenConfigTelemetry{
			RetryDelay: internal.Duration{Duration: time.Second},
			StrAsTags:  false,
		}
	})
}