Add service input plugin for OpenConfig streaming telemetry (#2292)

This commit is contained in:
Ajay Kumar Chintala 2018-05-11 17:58:19 -07:00 committed by Daniel Nelson
parent 7e0e664860
commit fdd899e9d4
10 changed files with 2625 additions and 1 deletions

4
Godeps
View File

@ -83,9 +83,11 @@ github.com/wvanbergen/kazoo-go 968957352185472eacb69215fa3dbfcfdbac1096
github.com/yuin/gopher-lua 66c871e454fcf10251c61bf8eff02d0978cae75a
github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363
golang.org/x/crypto dc137beb6cce2043eb6b5f223ab8bf51c32459f4
golang.org/x/net f2499483f923065a842d38eb4c7f1927e6fc6e6d
golang.org/x/net a337091b0525af65de94df2eb7e98bd9962dcbe2
golang.org/x/sys 739734461d1c916b6c72a63d7efda2b27edb369f
golang.org/x/text 506f9d5c962f284575e88337e7d9296d27e729d3
google.golang.org/genproto 11c7f9e547da6db876260ce49ea7536985904c9b
google.golang.org/grpc de2209a968d48e8970546c8a710189f7461370f7
gopkg.in/asn1-ber.v1 4e86f4367175e39f69d9358a5f17b4dda270378d
gopkg.in/fatih/pool.v2 6e328e67893eb46323ad06f0e92cb9536babbabc
gopkg.in/gorethink/gorethink.v3 7ab832f7b65573104a555d84a27992ae9ea1f659

View File

@ -42,6 +42,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/iptables"
_ "github.com/influxdata/telegraf/plugins/inputs/jolokia"
_ "github.com/influxdata/telegraf/plugins/inputs/jolokia2"
_ "github.com/influxdata/telegraf/plugins/inputs/jti_openconfig_telemetry"
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer_legacy"
_ "github.com/influxdata/telegraf/plugins/inputs/kapacitor"

View File

@ -0,0 +1,59 @@
# JTI OpenConfig Telemetry Input Plugin
This plugin reads Juniper Networks implementation of OpenConfig telemetry data from listed sensors using Junos Telemetry Interface. Refer to
[openconfig.net](http://openconfig.net/) for more details about OpenConfig and [Junos Telemetry Interface (JTI)](https://www.juniper.net/documentation/en_US/junos/topics/concept/junos-telemetry-interface-oveview.html).
### Configuration:
```toml
# Subscribe and receive OpenConfig Telemetry data using JTI
[[inputs.jti_openconfig_telemetry]]
## List of device addresses to collect telemetry from
servers = ["localhost:1883"]
## Authentication details. Username and password are must if device expects
## authentication. Client ID must be unique when connecting from multiple instances
## of telegraf to the same device
username = "user"
password = "pass"
client_id = "telegraf"
## Frequency to get data
sample_frequency = "1000ms"
## Sensors to subscribe for
## A identifier for each sensor can be provided in path by separating with space
## Else sensor path will be used as identifier
## When identifier is used, we can provide a list of space separated sensors.
## A single subscription will be created with all these sensors and data will
## be saved to measurement with this identifier name
sensors = [
"/interfaces/",
"collection /components/ /lldp",
]
## We allow specifying sensor group level reporting rate. To do this, specify the
## reporting rate in Duration at the beginning of sensor paths / collection
## name. For entries without reporting rate, we use configured sample frequency
sensors = [
"1000ms customReporting /interfaces /lldp",
"2000ms collection /components",
"/interfaces",
]
## x509 Certificate to use with TLS connection. If it is not provided, an insecure
## channel will be opened with server
ssl_cert = "/etc/telegraf/cert.pem"
## Delay between retry attempts of failed RPC calls or streams. Defaults to 1000ms.
## Failed streams/calls will not be retried if 0 is provided
retry_delay = "1000ms"
## To treat all string values as tags, set this to true
str_as_tags = false
```
### Tags:
- All measurements are tagged appropriately using the identifier information
in incoming data

View File

@ -0,0 +1,182 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: authentication_service.proto
/*
Package authentication is a generated protocol buffer package.
It is generated from these files:
authentication_service.proto
It has these top-level messages:
LoginRequest
LoginReply
*/
package authentication
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
// The request message containing the user's name, password and client id
type LoginRequest struct {
UserName string `protobuf:"bytes,1,opt,name=user_name,json=userName" json:"user_name,omitempty"`
Password string `protobuf:"bytes,2,opt,name=password" json:"password,omitempty"`
ClientId string `protobuf:"bytes,3,opt,name=client_id,json=clientId" json:"client_id,omitempty"`
}
func (m *LoginRequest) Reset() { *m = LoginRequest{} }
func (m *LoginRequest) String() string { return proto.CompactTextString(m) }
func (*LoginRequest) ProtoMessage() {}
func (*LoginRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *LoginRequest) GetUserName() string {
if m != nil {
return m.UserName
}
return ""
}
func (m *LoginRequest) GetPassword() string {
if m != nil {
return m.Password
}
return ""
}
func (m *LoginRequest) GetClientId() string {
if m != nil {
return m.ClientId
}
return ""
}
// The response message containing the result of login attempt.
// result value of true indicates success and false indicates
// failure
type LoginReply struct {
Result bool `protobuf:"varint,1,opt,name=result" json:"result,omitempty"`
}
func (m *LoginReply) Reset() { *m = LoginReply{} }
func (m *LoginReply) String() string { return proto.CompactTextString(m) }
func (*LoginReply) ProtoMessage() {}
func (*LoginReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *LoginReply) GetResult() bool {
if m != nil {
return m.Result
}
return false
}
func init() {
proto.RegisterType((*LoginRequest)(nil), "authentication.LoginRequest")
proto.RegisterType((*LoginReply)(nil), "authentication.LoginReply")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// Client API for Login service
type LoginClient interface {
LoginCheck(ctx context.Context, in *LoginRequest, opts ...grpc.CallOption) (*LoginReply, error)
}
type loginClient struct {
cc *grpc.ClientConn
}
func NewLoginClient(cc *grpc.ClientConn) LoginClient {
return &loginClient{cc}
}
func (c *loginClient) LoginCheck(ctx context.Context, in *LoginRequest, opts ...grpc.CallOption) (*LoginReply, error) {
out := new(LoginReply)
err := grpc.Invoke(ctx, "/authentication.Login/LoginCheck", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Login service
type LoginServer interface {
LoginCheck(context.Context, *LoginRequest) (*LoginReply, error)
}
func RegisterLoginServer(s *grpc.Server, srv LoginServer) {
s.RegisterService(&_Login_serviceDesc, srv)
}
func _Login_LoginCheck_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(LoginRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(LoginServer).LoginCheck(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/authentication.Login/LoginCheck",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(LoginServer).LoginCheck(ctx, req.(*LoginRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Login_serviceDesc = grpc.ServiceDesc{
ServiceName: "authentication.Login",
HandlerType: (*LoginServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "LoginCheck",
Handler: _Login_LoginCheck_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "authentication_service.proto",
}
func init() { proto.RegisterFile("authentication_service.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 200 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x49, 0x2c, 0x2d, 0xc9,
0x48, 0xcd, 0x2b, 0xc9, 0x4c, 0x4e, 0x2c, 0xc9, 0xcc, 0xcf, 0x8b, 0x2f, 0x4e, 0x2d, 0x2a, 0xcb,
0x4c, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x43, 0x95, 0x55, 0x4a, 0xe1, 0xe2,
0xf1, 0xc9, 0x4f, 0xcf, 0xcc, 0x0b, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0x92, 0xe6, 0xe2,
0x2c, 0x2d, 0x4e, 0x2d, 0x8a, 0xcf, 0x4b, 0xcc, 0x4d, 0x95, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c,
0xe2, 0x00, 0x09, 0xf8, 0x25, 0xe6, 0xa6, 0x0a, 0x49, 0x71, 0x71, 0x14, 0x24, 0x16, 0x17, 0x97,
0xe7, 0x17, 0xa5, 0x48, 0x30, 0x41, 0xe4, 0x60, 0x7c, 0x90, 0xc6, 0xe4, 0x9c, 0xcc, 0xd4, 0xbc,
0x92, 0xf8, 0xcc, 0x14, 0x09, 0x66, 0x88, 0x24, 0x44, 0xc0, 0x33, 0x45, 0x49, 0x85, 0x8b, 0x0b,
0x6a, 0x4b, 0x41, 0x4e, 0xa5, 0x90, 0x18, 0x17, 0x5b, 0x51, 0x6a, 0x71, 0x69, 0x4e, 0x09, 0xd8,
0x02, 0x8e, 0x20, 0x28, 0xcf, 0x28, 0x90, 0x8b, 0x15, 0xac, 0x4a, 0xc8, 0x03, 0xaa, 0xdc, 0x39,
0x23, 0x35, 0x39, 0x5b, 0x48, 0x46, 0x0f, 0xd5, 0xcd, 0x7a, 0xc8, 0x0e, 0x96, 0x92, 0xc2, 0x21,
0x5b, 0x90, 0x53, 0xa9, 0xc4, 0x90, 0xc4, 0x06, 0xf6, 0xb5, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff,
0x11, 0x57, 0x52, 0xd2, 0x15, 0x01, 0x00, 0x00,
}

View File

@ -0,0 +1,48 @@
//
// Copyrights (c) 2017, Juniper Networks, Inc.
// All rights reserved.
//
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
syntax = "proto3";
package authentication;
// The Login service definition.
service Login {
rpc LoginCheck (LoginRequest) returns (LoginReply) {}
}
// The request message containing the user's name, password and client id
message LoginRequest {
string user_name = 1;
string password = 2;
string client_id = 3;
}
/*
* The response message containing the result of login attempt.
* result value of true indicates success and false indicates
* failure
*/
message LoginReply {
bool result = 1;
}

View File

@ -0,0 +1,63 @@
package jti_openconfig_telemetry
import "sort"
type DataGroup struct {
numKeys int
tags map[string]string
data map[string]interface{}
}
// Sort the data groups by number of keys
type CollectionByKeys []DataGroup
func (a CollectionByKeys) Len() int { return len(a) }
func (a CollectionByKeys) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a CollectionByKeys) Less(i, j int) bool { return a[i].numKeys < a[j].numKeys }
// Checks to see if there is already a group with these tags and returns its index. Returns -1 if unavailable.
func (a CollectionByKeys) IsAvailable(tags map[string]string) *DataGroup {
sort.Sort(CollectionByKeys(a))
// Iterate through all the groups and see if we have group with these tags
for _, group := range a {
// Since already sorted, match with only groups with N keys
if group.numKeys < len(tags) {
continue
} else if group.numKeys > len(tags) {
break
}
matchFound := true
for k, v := range tags {
if val, ok := group.tags[k]; ok {
if val != v {
matchFound = false
break
}
} else {
matchFound = false
break
}
}
if matchFound {
return &group
}
}
return nil
}
// Inserts into already existing group or creates a new group
func (a CollectionByKeys) Insert(tags map[string]string, data map[string]interface{}) CollectionByKeys {
// If there is already a group with this set of tags, insert into it. Otherwise create a new group and insert
if group := a.IsAvailable(tags); group != nil {
for k, v := range data {
group.data[k] = v
}
} else {
a = append(a, DataGroup{len(tags), tags, data})
}
return a
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,319 @@
//
// Copyrights (c) 2016, Juniper Networks, Inc.
// All rights reserved.
//
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
//
// Nitin Kumar 04/07/2016
// Abbas Sakarwala 04/07/2016
//
// This file defines the Openconfig Telemetry RPC APIs (for gRPC).
//
// https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang
//
// Version 1.0
//
syntax = "proto3";
package telemetry;
// Interface exported by Agent
service OpenConfigTelemetry {
// Request an inline subscription for data at the specified path.
// The device should send telemetry data back on the same
// connection as the subscription request.
rpc telemetrySubscribe(SubscriptionRequest) returns (stream OpenConfigData) {}
// Terminates and removes an exisiting telemetry subscription
rpc cancelTelemetrySubscription(CancelSubscriptionRequest) returns (CancelSubscriptionReply) {}
// Get the list of current telemetry subscriptions from the
// target. This command returns a list of existing subscriptions
// not including those that are established via configuration.
rpc getTelemetrySubscriptions(GetSubscriptionsRequest) returns (GetSubscriptionsReply) {}
// Get Telemetry Agent Operational States
rpc getTelemetryOperationalState(GetOperationalStateRequest) returns (GetOperationalStateReply) {}
// Return the set of data encodings supported by the device for
// telemetry data
rpc getDataEncodings(DataEncodingRequest) returns (DataEncodingReply) {}
}
// Message sent for a telemetry subscription request
message SubscriptionRequest {
// Data associated with a telemetry subscription
SubscriptionInput input = 1;
// List of data models paths and filters
// which are used in a telemetry operation.
repeated Path path_list = 2;
// The below configuration is not defined in Openconfig RPC.
// It is a proposed extension to configure additional
// subscription request features.
SubscriptionAdditionalConfig additional_config = 3;
}
// Data associated with a telemetry subscription
message SubscriptionInput {
// List of optional collector endpoints to send data for
// this subscription.
// If no collector destinations are specified, the collector
// destination is assumed to be the requester on the rpc channel.
repeated Collector collector_list = 1;
}
// Collector endpoints to send data specified as an ip+port combination.
message Collector {
// IP address of collector endpoint
string address = 1;
// Transport protocol port number for the collector destination.
uint32 port = 2;
}
// Data model path
message Path {
// Data model path of interest
// Path specification for elements of OpenConfig data models
string path = 1;
// Regular expression to be used in filtering state leaves
string filter = 2;
// If this is set to true, the target device will only send
// updates to the collector upon a change in data value
bool suppress_unchanged = 3;
// Maximum time in ms the target device may go without sending
// a message to the collector. If this time expires with
// suppress-unchanged set, the target device must send an update
// message regardless if the data values have changed.
uint32 max_silent_interval = 4;
// Time in ms between collection and transmission of the
// specified data to the collector platform. The target device
// will sample the corresponding data (e.g,. a counter) and
// immediately send to the collector destination.
//
// If sample-frequency is set to 0, then the network device
// must emit an update upon every datum change.
uint32 sample_frequency = 5;
// EOM needed for each walk cycle of this path?
// For periodic sensor, applicable for each complete reap
// For event sensor, applicable when initial dump is over
// (same as EOS)
// This feature is not implemented currently.
bool need_eom = 6;
}
// Configure subscription request additional features.
message SubscriptionAdditionalConfig {
// limit the number of records sent in the stream
int32 limit_records = 1;
// limit the time the stream remains open
int32 limit_time_seconds = 2;
// EOS needed for this subscription?
bool need_eos = 3;
}
// Reply to inline subscription for data at the specified path is done in
// two-folds.
// 1. Reply data message sent out using out-of-band channel.
// 2. Telemetry data send back on the same connection as the
// subscription request.
// 1. Reply data message sent out using out-of-band channel.
message SubscriptionReply {
// Response message to a telemetry subscription creation or
// get request.
SubscriptionResponse response = 1;
// List of data models paths and filters
// which are used in a telemetry operation.
repeated Path path_list = 2;
}
// Response message to a telemetry subscription creation or get request.
message SubscriptionResponse {
// Unique id for the subscription on the device. This is
// generated by the device and returned in a subscription
// request or when listing existing subscriptions
uint32 subscription_id = 1;
}
// 2. Telemetry data send back on the same connection as the
// subscription request.
message OpenConfigData {
// router name:export IP address
string system_id = 1;
// line card / RE (slot number)
uint32 component_id = 2;
// PFE (if applicable)
uint32 sub_component_id = 3;
// Path specification for elements of OpenConfig data models
string path = 4;
// Sequence number, monotonically increasing for each
// system_id, component_id, sub_component_id + path.
uint64 sequence_number = 5;
// timestamp (milliseconds since epoch)
uint64 timestamp = 6;
// List of key-value pairs
repeated KeyValue kv = 7;
// For delete. If filled, it indicates delete
repeated Delete delete = 8;
// If filled, it indicates end of marker for the
// respective path in the list.
repeated Eom eom = 9;
// If filled, it indicates end of sync for complete subscription
bool sync_response = 10;
}
// Simple Key-value, where value could be one of scalar types
message KeyValue {
// Key
string key = 1;
// One of possible values
oneof value {
double double_value = 5;
int64 int_value = 6;
uint64 uint_value = 7;
sint64 sint_value = 8;
bool bool_value = 9;
string str_value = 10;
bytes bytes_value = 11;
}
}
// Message indicating delete for a particular path
message Delete {
string path = 1;
}
// Message indicating EOM for a particular path
message Eom {
string path = 1;
}
// Message sent for a telemetry subscription cancellation request
message CancelSubscriptionRequest {
// Subscription identifier as returned by the device when
// subscription was requested
uint32 subscription_id = 1;
}
// Reply to telemetry subscription cancellation request
message CancelSubscriptionReply {
// Return code
ReturnCode code = 1;
// Return code string
string code_str = 2;
};
// Result of the operation
enum ReturnCode {
SUCCESS = 0;
NO_SUBSCRIPTION_ENTRY = 1;
UNKNOWN_ERROR = 2;
}
// Message sent for a telemetry get request
message GetSubscriptionsRequest {
// Subscription identifier as returned by the device when
// subscription was requested
// --- or ---
// 0xFFFFFFFF for all subscription identifiers
uint32 subscription_id = 1;
}
// Reply to telemetry subscription get request
message GetSubscriptionsReply {
// List of current telemetry subscriptions
repeated SubscriptionReply subscription_list = 1;
}
// Message sent for telemetry agent operational states request
message GetOperationalStateRequest {
// Per-subscription_id level operational state can be requested.
//
// Subscription identifier as returned by the device when
// subscription was requested
// --- or ---
// 0xFFFFFFFF for all subscription identifiers including agent-level
// operational stats
// --- or ---
// If subscription_id is not present then sent only agent-level
// operational stats
uint32 subscription_id = 1;
// Control verbosity of the output
VerbosityLevel verbosity = 2;
}
// Verbosity Level
enum VerbosityLevel {
DETAIL = 0;
TERSE = 1;
BRIEF = 2;
}
// Reply to telemetry agent operational states request
message GetOperationalStateReply {
// List of key-value pairs where
// key = operational state definition
// value = operational state value
repeated KeyValue kv = 1;
}
// Message sent for a data encoding request
message DataEncodingRequest {
}
// Reply to data encodings supported request
message DataEncodingReply {
repeated EncodingType encoding_list = 1;
}
// Encoding Type Supported
enum EncodingType {
UNDEFINED = 0;
XML = 1;
JSON_IETF = 2;
PROTO3 = 3;
}

View File

@ -0,0 +1,422 @@
package jti_openconfig_telemetry
import (
"fmt"
"log"
"net"
"regexp"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/jti_openconfig_telemetry/auth"
"github.com/influxdata/telegraf/plugins/inputs/jti_openconfig_telemetry/oc"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
)
type OpenConfigTelemetry struct {
Servers []string
Sensors []string
Username string
Password string
ClientID string `toml:"client_id"`
SampleFrequency internal.Duration `toml:"sample_frequency"`
SSLCert string `toml:"ssl_cert"`
StrAsTags bool `toml:"str_as_tags"`
RetryDelay internal.Duration `toml:"retry_delay"`
sensorsConfig []sensorConfig
grpcClientConns []*grpc.ClientConn
wg *sync.WaitGroup
}
var (
// Regex to match and extract data points from path value in received key
keyPathRegex = regexp.MustCompile("\\/([^\\/]*)\\[([A-Za-z0-9\\-\\/]*\\=[^\\[]*)\\]")
sampleConfig = `
## List of device addresses to collect telemetry from
servers = ["localhost:1883"]
## Authentication details. Username and password are must if device expects
## authentication. Client ID must be unique when connecting from multiple instances
## of telegraf to the same device
username = "user"
password = "pass"
client_id = "telegraf"
## Frequency to get data
sample_frequency = "1000ms"
## Sensors to subscribe for
## A identifier for each sensor can be provided in path by separating with space
## Else sensor path will be used as identifier
## When identifier is used, we can provide a list of space separated sensors.
## A single subscription will be created with all these sensors and data will
## be saved to measurement with this identifier name
sensors = [
"/interfaces/",
"collection /components/ /lldp",
]
## We allow specifying sensor group level reporting rate. To do this, specify the
## reporting rate in Duration at the beginning of sensor paths / collection
## name. For entries without reporting rate, we use configured sample frequency
sensors = [
"1000ms customReporting /interfaces /lldp",
"2000ms collection /components",
"/interfaces",
]
## x509 Certificate to use with TLS connection. If it is not provided, an insecure
## channel will be opened with server
ssl_cert = "/etc/telegraf/cert.pem"
## Delay between retry attempts of failed RPC calls or streams. Defaults to 1000ms.
## Failed streams/calls will not be retried if 0 is provided
retry_delay = "1000ms"
## To treat all string values as tags, set this to true
str_as_tags = false
`
)
func (m *OpenConfigTelemetry) SampleConfig() string {
return sampleConfig
}
func (m *OpenConfigTelemetry) Description() string {
return "Read JTI OpenConfig Telemetry from listed sensors"
}
func (m *OpenConfigTelemetry) Gather(acc telegraf.Accumulator) error {
return nil
}
func (m *OpenConfigTelemetry) Stop() {
for _, grpcClientConn := range m.grpcClientConns {
grpcClientConn.Close()
}
m.wg.Wait()
}
// Takes in XML path with predicates and returns list of tags+values along with a final
// XML path without predicates. If /events/event[id=2]/attributes[key='message']/value
// is given input, this function will emit /events/event/attributes/value as xmlpath and
// { /events/event/@id=2, /events/event/attributes/@key='message' } as tags
func spitTagsNPath(xmlpath string) (string, map[string]string) {
subs := keyPathRegex.FindAllStringSubmatch(xmlpath, -1)
tags := make(map[string]string)
// Given XML path, this will spit out final path without predicates
if len(subs) > 0 {
for _, sub := range subs {
tagKey := strings.Split(xmlpath, sub[0])[0] + "/" + strings.TrimSpace(sub[1]) + "/@"
// If we have multiple keys in give path like /events/event[id=2 and type=3]/,
// we must emit multiple tags
for _, kv := range strings.Split(sub[2], " and ") {
key := tagKey + strings.TrimSpace(strings.Split(kv, "=")[0])
tagValue := strings.Replace(strings.Split(kv, "=")[1], "'", "", -1)
tags[key] = tagValue
}
xmlpath = strings.Replace(xmlpath, sub[0], "/"+strings.TrimSpace(sub[1]), 1)
}
}
return xmlpath, tags
}
// Takes in a OC response, extracts tag information from keys and returns a
// list of groups with unique sets of tags+values
func (m *OpenConfigTelemetry) extractData(r *telemetry.OpenConfigData, grpcServer string) []DataGroup {
// Use empty prefix. We will update this when we iterate over key-value pairs
prefix := ""
dgroups := []DataGroup{}
for _, v := range r.Kv {
kv := make(map[string]interface{})
if v.Key == "__prefix__" {
prefix = v.GetStrValue()
continue
}
// Also, lets use prefix if there is one
xmlpath, finaltags := spitTagsNPath(prefix + v.Key)
finaltags["device"] = grpcServer
switch v.Value.(type) {
case *telemetry.KeyValue_StrValue:
// If StrAsTags is set, we treat all string values as tags
if m.StrAsTags {
finaltags[xmlpath] = v.GetStrValue()
} else {
kv[xmlpath] = v.GetStrValue()
}
break
case *telemetry.KeyValue_DoubleValue:
kv[xmlpath] = v.GetDoubleValue()
break
case *telemetry.KeyValue_IntValue:
kv[xmlpath] = v.GetIntValue()
break
case *telemetry.KeyValue_UintValue:
kv[xmlpath] = v.GetUintValue()
break
case *telemetry.KeyValue_SintValue:
kv[xmlpath] = v.GetSintValue()
break
case *telemetry.KeyValue_BoolValue:
kv[xmlpath] = v.GetBoolValue()
break
case *telemetry.KeyValue_BytesValue:
kv[xmlpath] = v.GetBytesValue()
break
}
// Insert other tags from message
finaltags["system_id"] = r.SystemId
finaltags["path"] = r.Path
// Insert derived key and value
dgroups = CollectionByKeys(dgroups).Insert(finaltags, kv)
// Insert data from message header
dgroups = CollectionByKeys(dgroups).Insert(finaltags,
map[string]interface{}{"_sequence": r.SequenceNumber})
dgroups = CollectionByKeys(dgroups).Insert(finaltags,
map[string]interface{}{"_timestamp": r.Timestamp})
dgroups = CollectionByKeys(dgroups).Insert(finaltags,
map[string]interface{}{"_component_id": r.ComponentId})
dgroups = CollectionByKeys(dgroups).Insert(finaltags,
map[string]interface{}{"_subcomponent_id": r.SubComponentId})
}
return dgroups
}
// Structure to hold sensors path list and measurement name
type sensorConfig struct {
measurementName string
pathList []*telemetry.Path
}
// Takes in sensor configuration and converts it into slice of sensorConfig objects
func (m *OpenConfigTelemetry) splitSensorConfig() int {
var pathlist []*telemetry.Path
var measurementName string
var reportingRate uint32
m.sensorsConfig = make([]sensorConfig, 0)
for _, sensor := range m.Sensors {
spathSplit := strings.Fields(sensor)
reportingRate = uint32(m.SampleFrequency.Duration / time.Millisecond)
// Extract measurement name and custom reporting rate if specified. Custom
// reporting rate will be specified at the beginning of sensor list,
// followed by measurement name like "1000ms interfaces /interfaces"
// where 1000ms is the custom reporting rate and interfaces is the
// measurement name. If 1000ms is not given, we use global reporting rate
// from sample_frequency. if measurement name is not given, we use first
// sensor name as the measurement name. If first or the word after custom
// reporting rate doesn't start with /, we treat it as measurement name
// and exclude it from list of sensors to subscribe
duration, err := time.ParseDuration(spathSplit[0])
if err == nil {
reportingRate = uint32(duration / time.Millisecond)
spathSplit = spathSplit[1:]
}
if len(spathSplit) == 0 {
log.Printf("E! No sensors are specified")
continue
}
// Word after custom reporting rate is treated as measurement name
measurementName = spathSplit[0]
// If our word after custom reporting rate doesn't start with /, we treat
// it as measurement name. Else we treat it as sensor
if !strings.HasPrefix(measurementName, "/") {
spathSplit = spathSplit[1:]
}
if len(spathSplit) == 0 {
log.Printf("E! No valid sensors are specified")
continue
}
// Iterate over our sensors and create pathlist to subscribe
pathlist = make([]*telemetry.Path, 0)
for _, path := range spathSplit {
pathlist = append(pathlist, &telemetry.Path{Path: path,
SampleFrequency: reportingRate})
}
m.sensorsConfig = append(m.sensorsConfig, sensorConfig{
measurementName: measurementName, pathList: pathlist,
})
}
return len(m.sensorsConfig)
}
// Subscribes and collects OpenConfig telemetry data from given server
func (m *OpenConfigTelemetry) collectData(ctx context.Context,
grpcServer string, grpcClientConn *grpc.ClientConn,
acc telegraf.Accumulator) error {
c := telemetry.NewOpenConfigTelemetryClient(grpcClientConn)
for _, sensor := range m.sensorsConfig {
m.wg.Add(1)
go func(ctx context.Context, sensor sensorConfig) {
defer m.wg.Done()
for {
stream, err := c.TelemetrySubscribe(ctx,
&telemetry.SubscriptionRequest{PathList: sensor.pathList})
if err != nil {
rpcStatus, _ := status.FromError(err)
// If service is currently unavailable and may come back later, retry
if rpcStatus.Code() != codes.Unavailable {
acc.AddError(fmt.Errorf("E! Could not subscribe to %s: %v", grpcServer,
err))
return
} else {
// Retry with delay. If delay is not provided, use default
if m.RetryDelay.Duration > 0 {
log.Printf("D! Retrying %s with timeout %v", grpcServer,
m.RetryDelay.Duration)
time.Sleep(m.RetryDelay.Duration)
continue
} else {
return
}
}
}
for {
r, err := stream.Recv()
if err != nil {
// If we encounter error in the stream, break so we can retry
// the connection
acc.AddError(fmt.Errorf("E! Failed to read from %s: %v", err, grpcServer))
break
}
log.Printf("D! Received from %s: %v", grpcServer, r)
// Create a point and add to batch
tags := make(map[string]string)
// Insert additional tags
tags["device"] = grpcServer
dgroups := m.extractData(r, grpcServer)
// Print final data collection
log.Printf("D! Available collection for %s is: %v", grpcServer, dgroups)
tnow := time.Now()
// Iterate through data groups and add them
for _, group := range dgroups {
if len(group.tags) == 0 {
acc.AddFields(sensor.measurementName, group.data, tags, tnow)
} else {
acc.AddFields(sensor.measurementName, group.data, group.tags, tnow)
}
}
}
}
}(ctx, sensor)
}
return nil
}
func (m *OpenConfigTelemetry) Start(acc telegraf.Accumulator) error {
// Build sensors config
if m.splitSensorConfig() == 0 {
return fmt.Errorf("E! No valid sensor configuration available")
}
// If SSL certificate is provided, use transport credentials
var err error
var transportCredentials credentials.TransportCredentials
if m.SSLCert != "" {
transportCredentials, err = credentials.NewClientTLSFromFile(m.SSLCert, "")
if err != nil {
return fmt.Errorf("E! Failed to read certificate: %v", err)
}
} else {
transportCredentials = nil
}
// Connect to given list of servers and start collecting data
var grpcClientConn *grpc.ClientConn
var wg sync.WaitGroup
ctx := context.Background()
m.wg = &wg
for _, server := range m.Servers {
// Extract device address and port
grpcServer, grpcPort, err := net.SplitHostPort(server)
if err != nil {
log.Printf("E! Invalid server address: %v", err)
continue
}
// If a certificate is provided, open a secure channel. Else open insecure one
if transportCredentials != nil {
grpcClientConn, err = grpc.Dial(server, grpc.WithTransportCredentials(transportCredentials))
} else {
grpcClientConn, err = grpc.Dial(server, grpc.WithInsecure())
}
if err != nil {
log.Printf("E! Failed to connect to %s: %v", server, err)
} else {
log.Printf("D! Opened a new gRPC session to %s on port %s", grpcServer, grpcPort)
}
// Add to the list of client connections
m.grpcClientConns = append(m.grpcClientConns, grpcClientConn)
if m.Username != "" && m.Password != "" && m.ClientID != "" {
lc := authentication.NewLoginClient(grpcClientConn)
loginReply, loginErr := lc.LoginCheck(ctx,
&authentication.LoginRequest{UserName: m.Username,
Password: m.Password, ClientId: m.ClientID})
if loginErr != nil {
log.Printf("E! Could not initiate login check for %s: %v", server, err)
continue
}
// Check if the user is authenticated. Bail if auth error
if !loginReply.Result {
log.Printf("E! Failed to authenticate the user for %s", server)
continue
}
}
// Subscribe and gather telemetry data
m.collectData(ctx, grpcServer, grpcClientConn, acc)
}
return nil
}
func init() {
inputs.Add("jti_openconfig_telemetry", func() telegraf.Input {
return &OpenConfigTelemetry{
RetryDelay: internal.Duration{Duration: time.Second},
StrAsTags: false,
}
})
}

View File

@ -0,0 +1,225 @@
package jti_openconfig_telemetry
import (
"log"
"net"
"os"
"testing"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs/jti_openconfig_telemetry/oc"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
var cfg = &OpenConfigTelemetry{
Servers: []string{"127.0.0.1:50051"},
SampleFrequency: internal.Duration{Duration: time.Second * 2},
}
var data = &telemetry.OpenConfigData{
Path: "/sensor",
Kv: []*telemetry.KeyValue{{Key: "/sensor[tag='tagValue']/intKey", Value: &telemetry.KeyValue_IntValue{IntValue: 10}}},
}
var data_with_prefix = &telemetry.OpenConfigData{
Path: "/sensor_with_prefix",
Kv: []*telemetry.KeyValue{{Key: "__prefix__", Value: &telemetry.KeyValue_StrValue{StrValue: "/sensor/prefix/"}},
{Key: "intKey", Value: &telemetry.KeyValue_IntValue{IntValue: 10}}},
}
var data_with_multiple_tags = &telemetry.OpenConfigData{
Path: "/sensor_with_multiple_tags",
Kv: []*telemetry.KeyValue{{Key: "__prefix__", Value: &telemetry.KeyValue_StrValue{StrValue: "/sensor/prefix/"}},
{Key: "tagKey[tag='tagValue']/boolKey", Value: &telemetry.KeyValue_BoolValue{BoolValue: false}},
{Key: "intKey", Value: &telemetry.KeyValue_IntValue{IntValue: 10}}},
}
var data_with_string_values = &telemetry.OpenConfigData{
Path: "/sensor_with_string_values",
Kv: []*telemetry.KeyValue{{Key: "__prefix__", Value: &telemetry.KeyValue_StrValue{StrValue: "/sensor/prefix/"}},
{Key: "strKey[tag='tagValue']/strValue", Value: &telemetry.KeyValue_StrValue{StrValue: "10"}}},
}
type openConfigTelemetryServer struct {
}
func (s *openConfigTelemetryServer) TelemetrySubscribe(req *telemetry.SubscriptionRequest, stream telemetry.OpenConfigTelemetry_TelemetrySubscribeServer) error {
path := req.PathList[0].Path
if path == "/sensor" {
stream.Send(data)
} else if path == "/sensor_with_prefix" {
stream.Send(data_with_prefix)
} else if path == "/sensor_with_multiple_tags" {
stream.Send(data_with_multiple_tags)
} else if path == "/sensor_with_string_values" {
stream.Send(data_with_string_values)
}
return nil
}
func (s *openConfigTelemetryServer) CancelTelemetrySubscription(ctx context.Context, req *telemetry.CancelSubscriptionRequest) (*telemetry.CancelSubscriptionReply, error) {
return nil, nil
}
func (s *openConfigTelemetryServer) GetTelemetrySubscriptions(ctx context.Context, req *telemetry.GetSubscriptionsRequest) (*telemetry.GetSubscriptionsReply, error) {
return nil, nil
}
func (s *openConfigTelemetryServer) GetTelemetryOperationalState(ctx context.Context, req *telemetry.GetOperationalStateRequest) (*telemetry.GetOperationalStateReply, error) {
return nil, nil
}
func (s *openConfigTelemetryServer) GetDataEncodings(ctx context.Context, req *telemetry.DataEncodingRequest) (*telemetry.DataEncodingReply, error) {
return nil, nil
}
func newServer() *openConfigTelemetryServer {
s := new(openConfigTelemetryServer)
return s
}
func TestOpenConfigTelemetryData(t *testing.T) {
var acc testutil.Accumulator
cfg.Sensors = []string{"/sensor"}
err := cfg.Start(&acc)
require.NoError(t, err)
tags := map[string]string{
"device": "127.0.0.1",
"/sensor/@tag": "tagValue",
"system_id": "",
"path": "/sensor",
}
fields := map[string]interface{}{
"/sensor/intKey": int64(10),
"_sequence": uint64(0),
"_timestamp": uint64(0),
"_component_id": uint32(0),
"_subcomponent_id": uint32(0),
}
// Give sometime for gRPC channel to be established
time.Sleep(2 * time.Second)
acc.AssertContainsTaggedFields(t, "/sensor", fields, tags)
}
func TestOpenConfigTelemetryDataWithPrefix(t *testing.T) {
var acc testutil.Accumulator
cfg.Sensors = []string{"/sensor_with_prefix"}
err := cfg.Start(&acc)
require.NoError(t, err)
tags := map[string]string{
"device": "127.0.0.1",
"system_id": "",
"path": "/sensor_with_prefix",
}
fields := map[string]interface{}{
"/sensor/prefix/intKey": int64(10),
"_sequence": uint64(0),
"_timestamp": uint64(0),
"_component_id": uint32(0),
"_subcomponent_id": uint32(0),
}
// Give sometime for gRPC channel to be established
time.Sleep(2 * time.Second)
acc.AssertContainsTaggedFields(t, "/sensor_with_prefix", fields, tags)
}
func TestOpenConfigTelemetryDataWithMultipleTags(t *testing.T) {
var acc testutil.Accumulator
cfg.Sensors = []string{"/sensor_with_multiple_tags"}
err := cfg.Start(&acc)
require.NoError(t, err)
tags1 := map[string]string{
"/sensor/prefix/tagKey/@tag": "tagValue",
"device": "127.0.0.1",
"system_id": "",
"path": "/sensor_with_multiple_tags",
}
fields1 := map[string]interface{}{
"/sensor/prefix/tagKey/boolKey": false,
"_sequence": uint64(0),
"_timestamp": uint64(0),
"_component_id": uint32(0),
"_subcomponent_id": uint32(0),
}
tags2 := map[string]string{
"device": "127.0.0.1",
"system_id": "",
"path": "/sensor_with_multiple_tags",
}
fields2 := map[string]interface{}{
"/sensor/prefix/intKey": int64(10),
"_sequence": uint64(0),
"_timestamp": uint64(0),
"_component_id": uint32(0),
"_subcomponent_id": uint32(0),
}
// Give sometime for gRPC channel to be established
time.Sleep(2 * time.Second)
acc.AssertContainsTaggedFields(t, "/sensor_with_multiple_tags", fields1, tags1)
acc.AssertContainsTaggedFields(t, "/sensor_with_multiple_tags", fields2, tags2)
}
func TestOpenConfigTelemetryDataWithStringValues(t *testing.T) {
var acc testutil.Accumulator
cfg.Sensors = []string{"/sensor_with_string_values"}
err := cfg.Start(&acc)
require.NoError(t, err)
tags := map[string]string{
"/sensor/prefix/strKey/@tag": "tagValue",
"device": "127.0.0.1",
"system_id": "",
"path": "/sensor_with_string_values",
}
fields := map[string]interface{}{
"/sensor/prefix/strKey/strValue": "10",
"_sequence": uint64(0),
"_timestamp": uint64(0),
"_component_id": uint32(0),
"_subcomponent_id": uint32(0),
}
// Give sometime for gRPC channel to be established
time.Sleep(2 * time.Second)
acc.AssertContainsTaggedFields(t, "/sensor_with_string_values", fields, tags)
}
func TestMain(m *testing.M) {
lis, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
cfg.Servers = []string{lis.Addr().String()}
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)
telemetry.RegisterOpenConfigTelemetryServer(grpcServer, newServer())
go func() {
grpcServer.Serve(lis)
}()
defer grpcServer.Stop()
os.Exit(m.Run())
}