From fdd899e9d4fbe0662b51ca4cb0470de0e9473817 Mon Sep 17 00:00:00 2001 From: Ajay Kumar Chintala Date: Fri, 11 May 2018 17:58:19 -0700 Subject: [PATCH] Add service input plugin for OpenConfig streaming telemetry (#2292) --- Godeps | 4 +- plugins/inputs/all/all.go | 1 + .../inputs/jti_openconfig_telemetry/README.md | 59 + .../auth/authentication_service.pb.go | 182 +++ .../auth/authentication_service.proto | 48 + .../jti_openconfig_telemetry/collection.go | 63 + .../jti_openconfig_telemetry/oc/oc.pb.go | 1303 +++++++++++++++++ .../jti_openconfig_telemetry/oc/oc.proto | 319 ++++ .../openconfig_telemetry.go | 422 ++++++ .../openconfig_telemetry_test.go | 225 +++ 10 files changed, 2625 insertions(+), 1 deletion(-) create mode 100644 plugins/inputs/jti_openconfig_telemetry/README.md create mode 100644 plugins/inputs/jti_openconfig_telemetry/auth/authentication_service.pb.go create mode 100644 plugins/inputs/jti_openconfig_telemetry/auth/authentication_service.proto create mode 100644 plugins/inputs/jti_openconfig_telemetry/collection.go create mode 100644 plugins/inputs/jti_openconfig_telemetry/oc/oc.pb.go create mode 100644 plugins/inputs/jti_openconfig_telemetry/oc/oc.proto create mode 100644 plugins/inputs/jti_openconfig_telemetry/openconfig_telemetry.go create mode 100644 plugins/inputs/jti_openconfig_telemetry/openconfig_telemetry_test.go diff --git a/Godeps b/Godeps index 34fb3fe0c..12cf9cb92 100644 --- a/Godeps +++ b/Godeps @@ -83,9 +83,11 @@ github.com/wvanbergen/kazoo-go 968957352185472eacb69215fa3dbfcfdbac1096 github.com/yuin/gopher-lua 66c871e454fcf10251c61bf8eff02d0978cae75a github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363 golang.org/x/crypto dc137beb6cce2043eb6b5f223ab8bf51c32459f4 -golang.org/x/net f2499483f923065a842d38eb4c7f1927e6fc6e6d +golang.org/x/net a337091b0525af65de94df2eb7e98bd9962dcbe2 golang.org/x/sys 739734461d1c916b6c72a63d7efda2b27edb369f golang.org/x/text 506f9d5c962f284575e88337e7d9296d27e729d3 +google.golang.org/genproto 11c7f9e547da6db876260ce49ea7536985904c9b +google.golang.org/grpc de2209a968d48e8970546c8a710189f7461370f7 gopkg.in/asn1-ber.v1 4e86f4367175e39f69d9358a5f17b4dda270378d gopkg.in/fatih/pool.v2 6e328e67893eb46323ad06f0e92cb9536babbabc gopkg.in/gorethink/gorethink.v3 7ab832f7b65573104a555d84a27992ae9ea1f659 diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index f4a630ecc..bb2316c95 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -42,6 +42,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/iptables" _ "github.com/influxdata/telegraf/plugins/inputs/jolokia" _ "github.com/influxdata/telegraf/plugins/inputs/jolokia2" + _ "github.com/influxdata/telegraf/plugins/inputs/jti_openconfig_telemetry" _ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer_legacy" _ "github.com/influxdata/telegraf/plugins/inputs/kapacitor" diff --git a/plugins/inputs/jti_openconfig_telemetry/README.md b/plugins/inputs/jti_openconfig_telemetry/README.md new file mode 100644 index 000000000..7c30aaa8d --- /dev/null +++ b/plugins/inputs/jti_openconfig_telemetry/README.md @@ -0,0 +1,59 @@ +# JTI OpenConfig Telemetry Input Plugin + +This plugin reads Juniper Networks implementation of OpenConfig telemetry data from listed sensors using Junos Telemetry Interface. Refer to +[openconfig.net](http://openconfig.net/) for more details about OpenConfig and [Junos Telemetry Interface (JTI)](https://www.juniper.net/documentation/en_US/junos/topics/concept/junos-telemetry-interface-oveview.html). + +### Configuration: + +```toml +# Subscribe and receive OpenConfig Telemetry data using JTI +[[inputs.jti_openconfig_telemetry]] + ## List of device addresses to collect telemetry from + servers = ["localhost:1883"] + + ## Authentication details. Username and password are must if device expects + ## authentication. Client ID must be unique when connecting from multiple instances + ## of telegraf to the same device + username = "user" + password = "pass" + client_id = "telegraf" + + ## Frequency to get data + sample_frequency = "1000ms" + + ## Sensors to subscribe for + ## A identifier for each sensor can be provided in path by separating with space + ## Else sensor path will be used as identifier + ## When identifier is used, we can provide a list of space separated sensors. + ## A single subscription will be created with all these sensors and data will + ## be saved to measurement with this identifier name + sensors = [ + "/interfaces/", + "collection /components/ /lldp", + ] + + ## We allow specifying sensor group level reporting rate. To do this, specify the + ## reporting rate in Duration at the beginning of sensor paths / collection + ## name. For entries without reporting rate, we use configured sample frequency + sensors = [ + "1000ms customReporting /interfaces /lldp", + "2000ms collection /components", + "/interfaces", + ] + + ## x509 Certificate to use with TLS connection. If it is not provided, an insecure + ## channel will be opened with server + ssl_cert = "/etc/telegraf/cert.pem" + + ## Delay between retry attempts of failed RPC calls or streams. Defaults to 1000ms. + ## Failed streams/calls will not be retried if 0 is provided + retry_delay = "1000ms" + + ## To treat all string values as tags, set this to true + str_as_tags = false +``` + +### Tags: + +- All measurements are tagged appropriately using the identifier information + in incoming data diff --git a/plugins/inputs/jti_openconfig_telemetry/auth/authentication_service.pb.go b/plugins/inputs/jti_openconfig_telemetry/auth/authentication_service.pb.go new file mode 100644 index 000000000..7ddeefaca --- /dev/null +++ b/plugins/inputs/jti_openconfig_telemetry/auth/authentication_service.pb.go @@ -0,0 +1,182 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: authentication_service.proto + +/* +Package authentication is a generated protocol buffer package. + +It is generated from these files: + authentication_service.proto + +It has these top-level messages: + LoginRequest + LoginReply +*/ +package authentication + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +// The request message containing the user's name, password and client id +type LoginRequest struct { + UserName string `protobuf:"bytes,1,opt,name=user_name,json=userName" json:"user_name,omitempty"` + Password string `protobuf:"bytes,2,opt,name=password" json:"password,omitempty"` + ClientId string `protobuf:"bytes,3,opt,name=client_id,json=clientId" json:"client_id,omitempty"` +} + +func (m *LoginRequest) Reset() { *m = LoginRequest{} } +func (m *LoginRequest) String() string { return proto.CompactTextString(m) } +func (*LoginRequest) ProtoMessage() {} +func (*LoginRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *LoginRequest) GetUserName() string { + if m != nil { + return m.UserName + } + return "" +} + +func (m *LoginRequest) GetPassword() string { + if m != nil { + return m.Password + } + return "" +} + +func (m *LoginRequest) GetClientId() string { + if m != nil { + return m.ClientId + } + return "" +} + +// The response message containing the result of login attempt. +// result value of true indicates success and false indicates +// failure +type LoginReply struct { + Result bool `protobuf:"varint,1,opt,name=result" json:"result,omitempty"` +} + +func (m *LoginReply) Reset() { *m = LoginReply{} } +func (m *LoginReply) String() string { return proto.CompactTextString(m) } +func (*LoginReply) ProtoMessage() {} +func (*LoginReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *LoginReply) GetResult() bool { + if m != nil { + return m.Result + } + return false +} + +func init() { + proto.RegisterType((*LoginRequest)(nil), "authentication.LoginRequest") + proto.RegisterType((*LoginReply)(nil), "authentication.LoginReply") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// Client API for Login service + +type LoginClient interface { + LoginCheck(ctx context.Context, in *LoginRequest, opts ...grpc.CallOption) (*LoginReply, error) +} + +type loginClient struct { + cc *grpc.ClientConn +} + +func NewLoginClient(cc *grpc.ClientConn) LoginClient { + return &loginClient{cc} +} + +func (c *loginClient) LoginCheck(ctx context.Context, in *LoginRequest, opts ...grpc.CallOption) (*LoginReply, error) { + out := new(LoginReply) + err := grpc.Invoke(ctx, "/authentication.Login/LoginCheck", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Login service + +type LoginServer interface { + LoginCheck(context.Context, *LoginRequest) (*LoginReply, error) +} + +func RegisterLoginServer(s *grpc.Server, srv LoginServer) { + s.RegisterService(&_Login_serviceDesc, srv) +} + +func _Login_LoginCheck_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(LoginRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(LoginServer).LoginCheck(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/authentication.Login/LoginCheck", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(LoginServer).LoginCheck(ctx, req.(*LoginRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Login_serviceDesc = grpc.ServiceDesc{ + ServiceName: "authentication.Login", + HandlerType: (*LoginServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "LoginCheck", + Handler: _Login_LoginCheck_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "authentication_service.proto", +} + +func init() { proto.RegisterFile("authentication_service.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 200 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x49, 0x2c, 0x2d, 0xc9, + 0x48, 0xcd, 0x2b, 0xc9, 0x4c, 0x4e, 0x2c, 0xc9, 0xcc, 0xcf, 0x8b, 0x2f, 0x4e, 0x2d, 0x2a, 0xcb, + 0x4c, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x43, 0x95, 0x55, 0x4a, 0xe1, 0xe2, + 0xf1, 0xc9, 0x4f, 0xcf, 0xcc, 0x0b, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0x92, 0xe6, 0xe2, + 0x2c, 0x2d, 0x4e, 0x2d, 0x8a, 0xcf, 0x4b, 0xcc, 0x4d, 0x95, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, + 0xe2, 0x00, 0x09, 0xf8, 0x25, 0xe6, 0xa6, 0x0a, 0x49, 0x71, 0x71, 0x14, 0x24, 0x16, 0x17, 0x97, + 0xe7, 0x17, 0xa5, 0x48, 0x30, 0x41, 0xe4, 0x60, 0x7c, 0x90, 0xc6, 0xe4, 0x9c, 0xcc, 0xd4, 0xbc, + 0x92, 0xf8, 0xcc, 0x14, 0x09, 0x66, 0x88, 0x24, 0x44, 0xc0, 0x33, 0x45, 0x49, 0x85, 0x8b, 0x0b, + 0x6a, 0x4b, 0x41, 0x4e, 0xa5, 0x90, 0x18, 0x17, 0x5b, 0x51, 0x6a, 0x71, 0x69, 0x4e, 0x09, 0xd8, + 0x02, 0x8e, 0x20, 0x28, 0xcf, 0x28, 0x90, 0x8b, 0x15, 0xac, 0x4a, 0xc8, 0x03, 0xaa, 0xdc, 0x39, + 0x23, 0x35, 0x39, 0x5b, 0x48, 0x46, 0x0f, 0xd5, 0xcd, 0x7a, 0xc8, 0x0e, 0x96, 0x92, 0xc2, 0x21, + 0x5b, 0x90, 0x53, 0xa9, 0xc4, 0x90, 0xc4, 0x06, 0xf6, 0xb5, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, + 0x11, 0x57, 0x52, 0xd2, 0x15, 0x01, 0x00, 0x00, +} diff --git a/plugins/inputs/jti_openconfig_telemetry/auth/authentication_service.proto b/plugins/inputs/jti_openconfig_telemetry/auth/authentication_service.proto new file mode 100644 index 000000000..a41e13a09 --- /dev/null +++ b/plugins/inputs/jti_openconfig_telemetry/auth/authentication_service.proto @@ -0,0 +1,48 @@ +// +// Copyrights (c) 2017, Juniper Networks, Inc. +// All rights reserved. +// + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +syntax = "proto3"; + +package authentication; + +// The Login service definition. +service Login { + rpc LoginCheck (LoginRequest) returns (LoginReply) {} +} + +// The request message containing the user's name, password and client id +message LoginRequest { + string user_name = 1; + string password = 2; + string client_id = 3; +} + +/* + * The response message containing the result of login attempt. + * result value of true indicates success and false indicates + * failure + */ +message LoginReply { + bool result = 1; +} diff --git a/plugins/inputs/jti_openconfig_telemetry/collection.go b/plugins/inputs/jti_openconfig_telemetry/collection.go new file mode 100644 index 000000000..ffd9019f5 --- /dev/null +++ b/plugins/inputs/jti_openconfig_telemetry/collection.go @@ -0,0 +1,63 @@ +package jti_openconfig_telemetry + +import "sort" + +type DataGroup struct { + numKeys int + tags map[string]string + data map[string]interface{} +} + +// Sort the data groups by number of keys +type CollectionByKeys []DataGroup + +func (a CollectionByKeys) Len() int { return len(a) } +func (a CollectionByKeys) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a CollectionByKeys) Less(i, j int) bool { return a[i].numKeys < a[j].numKeys } + +// Checks to see if there is already a group with these tags and returns its index. Returns -1 if unavailable. +func (a CollectionByKeys) IsAvailable(tags map[string]string) *DataGroup { + sort.Sort(CollectionByKeys(a)) + + // Iterate through all the groups and see if we have group with these tags + for _, group := range a { + // Since already sorted, match with only groups with N keys + if group.numKeys < len(tags) { + continue + } else if group.numKeys > len(tags) { + break + } + + matchFound := true + for k, v := range tags { + if val, ok := group.tags[k]; ok { + if val != v { + matchFound = false + break + } + } else { + matchFound = false + break + } + } + + if matchFound { + return &group + } + } + return nil +} + +// Inserts into already existing group or creates a new group +func (a CollectionByKeys) Insert(tags map[string]string, data map[string]interface{}) CollectionByKeys { + // If there is already a group with this set of tags, insert into it. Otherwise create a new group and insert + if group := a.IsAvailable(tags); group != nil { + for k, v := range data { + group.data[k] = v + } + } else { + a = append(a, DataGroup{len(tags), tags, data}) + } + + return a +} diff --git a/plugins/inputs/jti_openconfig_telemetry/oc/oc.pb.go b/plugins/inputs/jti_openconfig_telemetry/oc/oc.pb.go new file mode 100644 index 000000000..a4cd76cc4 --- /dev/null +++ b/plugins/inputs/jti_openconfig_telemetry/oc/oc.pb.go @@ -0,0 +1,1303 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: oc.proto + +/* +Package telemetry is a generated protocol buffer package. + +It is generated from these files: + oc.proto + +It has these top-level messages: + SubscriptionRequest + SubscriptionInput + Collector + Path + SubscriptionAdditionalConfig + SubscriptionReply + SubscriptionResponse + OpenConfigData + KeyValue + Delete + Eom + CancelSubscriptionRequest + CancelSubscriptionReply + GetSubscriptionsRequest + GetSubscriptionsReply + GetOperationalStateRequest + GetOperationalStateReply + DataEncodingRequest + DataEncodingReply +*/ +package telemetry + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +// Result of the operation +type ReturnCode int32 + +const ( + ReturnCode_SUCCESS ReturnCode = 0 + ReturnCode_NO_SUBSCRIPTION_ENTRY ReturnCode = 1 + ReturnCode_UNKNOWN_ERROR ReturnCode = 2 +) + +var ReturnCode_name = map[int32]string{ + 0: "SUCCESS", + 1: "NO_SUBSCRIPTION_ENTRY", + 2: "UNKNOWN_ERROR", +} +var ReturnCode_value = map[string]int32{ + "SUCCESS": 0, + "NO_SUBSCRIPTION_ENTRY": 1, + "UNKNOWN_ERROR": 2, +} + +func (x ReturnCode) String() string { + return proto.EnumName(ReturnCode_name, int32(x)) +} +func (ReturnCode) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +// Verbosity Level +type VerbosityLevel int32 + +const ( + VerbosityLevel_DETAIL VerbosityLevel = 0 + VerbosityLevel_TERSE VerbosityLevel = 1 + VerbosityLevel_BRIEF VerbosityLevel = 2 +) + +var VerbosityLevel_name = map[int32]string{ + 0: "DETAIL", + 1: "TERSE", + 2: "BRIEF", +} +var VerbosityLevel_value = map[string]int32{ + "DETAIL": 0, + "TERSE": 1, + "BRIEF": 2, +} + +func (x VerbosityLevel) String() string { + return proto.EnumName(VerbosityLevel_name, int32(x)) +} +func (VerbosityLevel) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +// Encoding Type Supported +type EncodingType int32 + +const ( + EncodingType_UNDEFINED EncodingType = 0 + EncodingType_XML EncodingType = 1 + EncodingType_JSON_IETF EncodingType = 2 + EncodingType_PROTO3 EncodingType = 3 +) + +var EncodingType_name = map[int32]string{ + 0: "UNDEFINED", + 1: "XML", + 2: "JSON_IETF", + 3: "PROTO3", +} +var EncodingType_value = map[string]int32{ + "UNDEFINED": 0, + "XML": 1, + "JSON_IETF": 2, + "PROTO3": 3, +} + +func (x EncodingType) String() string { + return proto.EnumName(EncodingType_name, int32(x)) +} +func (EncodingType) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +// Message sent for a telemetry subscription request +type SubscriptionRequest struct { + // Data associated with a telemetry subscription + Input *SubscriptionInput `protobuf:"bytes,1,opt,name=input" json:"input,omitempty"` + // List of data models paths and filters + // which are used in a telemetry operation. + PathList []*Path `protobuf:"bytes,2,rep,name=path_list,json=pathList" json:"path_list,omitempty"` + // The below configuration is not defined in Openconfig RPC. + // It is a proposed extension to configure additional + // subscription request features. + AdditionalConfig *SubscriptionAdditionalConfig `protobuf:"bytes,3,opt,name=additional_config,json=additionalConfig" json:"additional_config,omitempty"` +} + +func (m *SubscriptionRequest) Reset() { *m = SubscriptionRequest{} } +func (m *SubscriptionRequest) String() string { return proto.CompactTextString(m) } +func (*SubscriptionRequest) ProtoMessage() {} +func (*SubscriptionRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *SubscriptionRequest) GetInput() *SubscriptionInput { + if m != nil { + return m.Input + } + return nil +} + +func (m *SubscriptionRequest) GetPathList() []*Path { + if m != nil { + return m.PathList + } + return nil +} + +func (m *SubscriptionRequest) GetAdditionalConfig() *SubscriptionAdditionalConfig { + if m != nil { + return m.AdditionalConfig + } + return nil +} + +// Data associated with a telemetry subscription +type SubscriptionInput struct { + // List of optional collector endpoints to send data for + // this subscription. + // If no collector destinations are specified, the collector + // destination is assumed to be the requester on the rpc channel. + CollectorList []*Collector `protobuf:"bytes,1,rep,name=collector_list,json=collectorList" json:"collector_list,omitempty"` +} + +func (m *SubscriptionInput) Reset() { *m = SubscriptionInput{} } +func (m *SubscriptionInput) String() string { return proto.CompactTextString(m) } +func (*SubscriptionInput) ProtoMessage() {} +func (*SubscriptionInput) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *SubscriptionInput) GetCollectorList() []*Collector { + if m != nil { + return m.CollectorList + } + return nil +} + +// Collector endpoints to send data specified as an ip+port combination. +type Collector struct { + // IP address of collector endpoint + Address string `protobuf:"bytes,1,opt,name=address" json:"address,omitempty"` + // Transport protocol port number for the collector destination. + Port uint32 `protobuf:"varint,2,opt,name=port" json:"port,omitempty"` +} + +func (m *Collector) Reset() { *m = Collector{} } +func (m *Collector) String() string { return proto.CompactTextString(m) } +func (*Collector) ProtoMessage() {} +func (*Collector) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *Collector) GetAddress() string { + if m != nil { + return m.Address + } + return "" +} + +func (m *Collector) GetPort() uint32 { + if m != nil { + return m.Port + } + return 0 +} + +// Data model path +type Path struct { + // Data model path of interest + // Path specification for elements of OpenConfig data models + Path string `protobuf:"bytes,1,opt,name=path" json:"path,omitempty"` + // Regular expression to be used in filtering state leaves + Filter string `protobuf:"bytes,2,opt,name=filter" json:"filter,omitempty"` + // If this is set to true, the target device will only send + // updates to the collector upon a change in data value + SuppressUnchanged bool `protobuf:"varint,3,opt,name=suppress_unchanged,json=suppressUnchanged" json:"suppress_unchanged,omitempty"` + // Maximum time in ms the target device may go without sending + // a message to the collector. If this time expires with + // suppress-unchanged set, the target device must send an update + // message regardless if the data values have changed. + MaxSilentInterval uint32 `protobuf:"varint,4,opt,name=max_silent_interval,json=maxSilentInterval" json:"max_silent_interval,omitempty"` + // Time in ms between collection and transmission of the + // specified data to the collector platform. The target device + // will sample the corresponding data (e.g,. a counter) and + // immediately send to the collector destination. + // + // If sample-frequency is set to 0, then the network device + // must emit an update upon every datum change. + SampleFrequency uint32 `protobuf:"varint,5,opt,name=sample_frequency,json=sampleFrequency" json:"sample_frequency,omitempty"` + // EOM needed for each walk cycle of this path? + // For periodic sensor, applicable for each complete reap + // For event sensor, applicable when initial dump is over + // (same as EOS) + // This feature is not implemented currently. + NeedEom bool `protobuf:"varint,6,opt,name=need_eom,json=needEom" json:"need_eom,omitempty"` +} + +func (m *Path) Reset() { *m = Path{} } +func (m *Path) String() string { return proto.CompactTextString(m) } +func (*Path) ProtoMessage() {} +func (*Path) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func (m *Path) GetPath() string { + if m != nil { + return m.Path + } + return "" +} + +func (m *Path) GetFilter() string { + if m != nil { + return m.Filter + } + return "" +} + +func (m *Path) GetSuppressUnchanged() bool { + if m != nil { + return m.SuppressUnchanged + } + return false +} + +func (m *Path) GetMaxSilentInterval() uint32 { + if m != nil { + return m.MaxSilentInterval + } + return 0 +} + +func (m *Path) GetSampleFrequency() uint32 { + if m != nil { + return m.SampleFrequency + } + return 0 +} + +func (m *Path) GetNeedEom() bool { + if m != nil { + return m.NeedEom + } + return false +} + +// Configure subscription request additional features. +type SubscriptionAdditionalConfig struct { + // limit the number of records sent in the stream + LimitRecords int32 `protobuf:"varint,1,opt,name=limit_records,json=limitRecords" json:"limit_records,omitempty"` + // limit the time the stream remains open + LimitTimeSeconds int32 `protobuf:"varint,2,opt,name=limit_time_seconds,json=limitTimeSeconds" json:"limit_time_seconds,omitempty"` + // EOS needed for this subscription? + NeedEos bool `protobuf:"varint,3,opt,name=need_eos,json=needEos" json:"need_eos,omitempty"` +} + +func (m *SubscriptionAdditionalConfig) Reset() { *m = SubscriptionAdditionalConfig{} } +func (m *SubscriptionAdditionalConfig) String() string { return proto.CompactTextString(m) } +func (*SubscriptionAdditionalConfig) ProtoMessage() {} +func (*SubscriptionAdditionalConfig) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + +func (m *SubscriptionAdditionalConfig) GetLimitRecords() int32 { + if m != nil { + return m.LimitRecords + } + return 0 +} + +func (m *SubscriptionAdditionalConfig) GetLimitTimeSeconds() int32 { + if m != nil { + return m.LimitTimeSeconds + } + return 0 +} + +func (m *SubscriptionAdditionalConfig) GetNeedEos() bool { + if m != nil { + return m.NeedEos + } + return false +} + +// 1. Reply data message sent out using out-of-band channel. +type SubscriptionReply struct { + // Response message to a telemetry subscription creation or + // get request. + Response *SubscriptionResponse `protobuf:"bytes,1,opt,name=response" json:"response,omitempty"` + // List of data models paths and filters + // which are used in a telemetry operation. + PathList []*Path `protobuf:"bytes,2,rep,name=path_list,json=pathList" json:"path_list,omitempty"` +} + +func (m *SubscriptionReply) Reset() { *m = SubscriptionReply{} } +func (m *SubscriptionReply) String() string { return proto.CompactTextString(m) } +func (*SubscriptionReply) ProtoMessage() {} +func (*SubscriptionReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } + +func (m *SubscriptionReply) GetResponse() *SubscriptionResponse { + if m != nil { + return m.Response + } + return nil +} + +func (m *SubscriptionReply) GetPathList() []*Path { + if m != nil { + return m.PathList + } + return nil +} + +// Response message to a telemetry subscription creation or get request. +type SubscriptionResponse struct { + // Unique id for the subscription on the device. This is + // generated by the device and returned in a subscription + // request or when listing existing subscriptions + SubscriptionId uint32 `protobuf:"varint,1,opt,name=subscription_id,json=subscriptionId" json:"subscription_id,omitempty"` +} + +func (m *SubscriptionResponse) Reset() { *m = SubscriptionResponse{} } +func (m *SubscriptionResponse) String() string { return proto.CompactTextString(m) } +func (*SubscriptionResponse) ProtoMessage() {} +func (*SubscriptionResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } + +func (m *SubscriptionResponse) GetSubscriptionId() uint32 { + if m != nil { + return m.SubscriptionId + } + return 0 +} + +// 2. Telemetry data send back on the same connection as the +// subscription request. +type OpenConfigData struct { + // router name:export IP address + SystemId string `protobuf:"bytes,1,opt,name=system_id,json=systemId" json:"system_id,omitempty"` + // line card / RE (slot number) + ComponentId uint32 `protobuf:"varint,2,opt,name=component_id,json=componentId" json:"component_id,omitempty"` + // PFE (if applicable) + SubComponentId uint32 `protobuf:"varint,3,opt,name=sub_component_id,json=subComponentId" json:"sub_component_id,omitempty"` + // Path specification for elements of OpenConfig data models + Path string `protobuf:"bytes,4,opt,name=path" json:"path,omitempty"` + // Sequence number, monotonically increasing for each + // system_id, component_id, sub_component_id + path. + SequenceNumber uint64 `protobuf:"varint,5,opt,name=sequence_number,json=sequenceNumber" json:"sequence_number,omitempty"` + // timestamp (milliseconds since epoch) + Timestamp uint64 `protobuf:"varint,6,opt,name=timestamp" json:"timestamp,omitempty"` + // List of key-value pairs + Kv []*KeyValue `protobuf:"bytes,7,rep,name=kv" json:"kv,omitempty"` + // For delete. If filled, it indicates delete + Delete []*Delete `protobuf:"bytes,8,rep,name=delete" json:"delete,omitempty"` + // If filled, it indicates end of marker for the + // respective path in the list. + Eom []*Eom `protobuf:"bytes,9,rep,name=eom" json:"eom,omitempty"` + // If filled, it indicates end of sync for complete subscription + SyncResponse bool `protobuf:"varint,10,opt,name=sync_response,json=syncResponse" json:"sync_response,omitempty"` +} + +func (m *OpenConfigData) Reset() { *m = OpenConfigData{} } +func (m *OpenConfigData) String() string { return proto.CompactTextString(m) } +func (*OpenConfigData) ProtoMessage() {} +func (*OpenConfigData) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } + +func (m *OpenConfigData) GetSystemId() string { + if m != nil { + return m.SystemId + } + return "" +} + +func (m *OpenConfigData) GetComponentId() uint32 { + if m != nil { + return m.ComponentId + } + return 0 +} + +func (m *OpenConfigData) GetSubComponentId() uint32 { + if m != nil { + return m.SubComponentId + } + return 0 +} + +func (m *OpenConfigData) GetPath() string { + if m != nil { + return m.Path + } + return "" +} + +func (m *OpenConfigData) GetSequenceNumber() uint64 { + if m != nil { + return m.SequenceNumber + } + return 0 +} + +func (m *OpenConfigData) GetTimestamp() uint64 { + if m != nil { + return m.Timestamp + } + return 0 +} + +func (m *OpenConfigData) GetKv() []*KeyValue { + if m != nil { + return m.Kv + } + return nil +} + +func (m *OpenConfigData) GetDelete() []*Delete { + if m != nil { + return m.Delete + } + return nil +} + +func (m *OpenConfigData) GetEom() []*Eom { + if m != nil { + return m.Eom + } + return nil +} + +func (m *OpenConfigData) GetSyncResponse() bool { + if m != nil { + return m.SyncResponse + } + return false +} + +// Simple Key-value, where value could be one of scalar types +type KeyValue struct { + // Key + Key string `protobuf:"bytes,1,opt,name=key" json:"key,omitempty"` + // One of possible values + // + // Types that are valid to be assigned to Value: + // *KeyValue_DoubleValue + // *KeyValue_IntValue + // *KeyValue_UintValue + // *KeyValue_SintValue + // *KeyValue_BoolValue + // *KeyValue_StrValue + // *KeyValue_BytesValue + Value isKeyValue_Value `protobuf_oneof:"value"` +} + +func (m *KeyValue) Reset() { *m = KeyValue{} } +func (m *KeyValue) String() string { return proto.CompactTextString(m) } +func (*KeyValue) ProtoMessage() {} +func (*KeyValue) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } + +type isKeyValue_Value interface { + isKeyValue_Value() +} + +type KeyValue_DoubleValue struct { + DoubleValue float64 `protobuf:"fixed64,5,opt,name=double_value,json=doubleValue,oneof"` +} +type KeyValue_IntValue struct { + IntValue int64 `protobuf:"varint,6,opt,name=int_value,json=intValue,oneof"` +} +type KeyValue_UintValue struct { + UintValue uint64 `protobuf:"varint,7,opt,name=uint_value,json=uintValue,oneof"` +} +type KeyValue_SintValue struct { + SintValue int64 `protobuf:"zigzag64,8,opt,name=sint_value,json=sintValue,oneof"` +} +type KeyValue_BoolValue struct { + BoolValue bool `protobuf:"varint,9,opt,name=bool_value,json=boolValue,oneof"` +} +type KeyValue_StrValue struct { + StrValue string `protobuf:"bytes,10,opt,name=str_value,json=strValue,oneof"` +} +type KeyValue_BytesValue struct { + BytesValue []byte `protobuf:"bytes,11,opt,name=bytes_value,json=bytesValue,proto3,oneof"` +} + +func (*KeyValue_DoubleValue) isKeyValue_Value() {} +func (*KeyValue_IntValue) isKeyValue_Value() {} +func (*KeyValue_UintValue) isKeyValue_Value() {} +func (*KeyValue_SintValue) isKeyValue_Value() {} +func (*KeyValue_BoolValue) isKeyValue_Value() {} +func (*KeyValue_StrValue) isKeyValue_Value() {} +func (*KeyValue_BytesValue) isKeyValue_Value() {} + +func (m *KeyValue) GetValue() isKeyValue_Value { + if m != nil { + return m.Value + } + return nil +} + +func (m *KeyValue) GetKey() string { + if m != nil { + return m.Key + } + return "" +} + +func (m *KeyValue) GetDoubleValue() float64 { + if x, ok := m.GetValue().(*KeyValue_DoubleValue); ok { + return x.DoubleValue + } + return 0 +} + +func (m *KeyValue) GetIntValue() int64 { + if x, ok := m.GetValue().(*KeyValue_IntValue); ok { + return x.IntValue + } + return 0 +} + +func (m *KeyValue) GetUintValue() uint64 { + if x, ok := m.GetValue().(*KeyValue_UintValue); ok { + return x.UintValue + } + return 0 +} + +func (m *KeyValue) GetSintValue() int64 { + if x, ok := m.GetValue().(*KeyValue_SintValue); ok { + return x.SintValue + } + return 0 +} + +func (m *KeyValue) GetBoolValue() bool { + if x, ok := m.GetValue().(*KeyValue_BoolValue); ok { + return x.BoolValue + } + return false +} + +func (m *KeyValue) GetStrValue() string { + if x, ok := m.GetValue().(*KeyValue_StrValue); ok { + return x.StrValue + } + return "" +} + +func (m *KeyValue) GetBytesValue() []byte { + if x, ok := m.GetValue().(*KeyValue_BytesValue); ok { + return x.BytesValue + } + return nil +} + +// XXX_OneofFuncs is for the internal use of the proto package. +func (*KeyValue) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { + return _KeyValue_OneofMarshaler, _KeyValue_OneofUnmarshaler, _KeyValue_OneofSizer, []interface{}{ + (*KeyValue_DoubleValue)(nil), + (*KeyValue_IntValue)(nil), + (*KeyValue_UintValue)(nil), + (*KeyValue_SintValue)(nil), + (*KeyValue_BoolValue)(nil), + (*KeyValue_StrValue)(nil), + (*KeyValue_BytesValue)(nil), + } +} + +func _KeyValue_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { + m := msg.(*KeyValue) + // value + switch x := m.Value.(type) { + case *KeyValue_DoubleValue: + b.EncodeVarint(5<<3 | proto.WireFixed64) + b.EncodeFixed64(math.Float64bits(x.DoubleValue)) + case *KeyValue_IntValue: + b.EncodeVarint(6<<3 | proto.WireVarint) + b.EncodeVarint(uint64(x.IntValue)) + case *KeyValue_UintValue: + b.EncodeVarint(7<<3 | proto.WireVarint) + b.EncodeVarint(uint64(x.UintValue)) + case *KeyValue_SintValue: + b.EncodeVarint(8<<3 | proto.WireVarint) + b.EncodeZigzag64(uint64(x.SintValue)) + case *KeyValue_BoolValue: + t := uint64(0) + if x.BoolValue { + t = 1 + } + b.EncodeVarint(9<<3 | proto.WireVarint) + b.EncodeVarint(t) + case *KeyValue_StrValue: + b.EncodeVarint(10<<3 | proto.WireBytes) + b.EncodeStringBytes(x.StrValue) + case *KeyValue_BytesValue: + b.EncodeVarint(11<<3 | proto.WireBytes) + b.EncodeRawBytes(x.BytesValue) + case nil: + default: + return fmt.Errorf("KeyValue.Value has unexpected type %T", x) + } + return nil +} + +func _KeyValue_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { + m := msg.(*KeyValue) + switch tag { + case 5: // value.double_value + if wire != proto.WireFixed64 { + return true, proto.ErrInternalBadWireType + } + x, err := b.DecodeFixed64() + m.Value = &KeyValue_DoubleValue{math.Float64frombits(x)} + return true, err + case 6: // value.int_value + if wire != proto.WireVarint { + return true, proto.ErrInternalBadWireType + } + x, err := b.DecodeVarint() + m.Value = &KeyValue_IntValue{int64(x)} + return true, err + case 7: // value.uint_value + if wire != proto.WireVarint { + return true, proto.ErrInternalBadWireType + } + x, err := b.DecodeVarint() + m.Value = &KeyValue_UintValue{x} + return true, err + case 8: // value.sint_value + if wire != proto.WireVarint { + return true, proto.ErrInternalBadWireType + } + x, err := b.DecodeZigzag64() + m.Value = &KeyValue_SintValue{int64(x)} + return true, err + case 9: // value.bool_value + if wire != proto.WireVarint { + return true, proto.ErrInternalBadWireType + } + x, err := b.DecodeVarint() + m.Value = &KeyValue_BoolValue{x != 0} + return true, err + case 10: // value.str_value + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + x, err := b.DecodeStringBytes() + m.Value = &KeyValue_StrValue{x} + return true, err + case 11: // value.bytes_value + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + x, err := b.DecodeRawBytes(true) + m.Value = &KeyValue_BytesValue{x} + return true, err + default: + return false, nil + } +} + +func _KeyValue_OneofSizer(msg proto.Message) (n int) { + m := msg.(*KeyValue) + // value + switch x := m.Value.(type) { + case *KeyValue_DoubleValue: + n += proto.SizeVarint(5<<3 | proto.WireFixed64) + n += 8 + case *KeyValue_IntValue: + n += proto.SizeVarint(6<<3 | proto.WireVarint) + n += proto.SizeVarint(uint64(x.IntValue)) + case *KeyValue_UintValue: + n += proto.SizeVarint(7<<3 | proto.WireVarint) + n += proto.SizeVarint(uint64(x.UintValue)) + case *KeyValue_SintValue: + n += proto.SizeVarint(8<<3 | proto.WireVarint) + n += proto.SizeVarint(uint64(uint64(x.SintValue<<1) ^ uint64((int64(x.SintValue) >> 63)))) + case *KeyValue_BoolValue: + n += proto.SizeVarint(9<<3 | proto.WireVarint) + n += 1 + case *KeyValue_StrValue: + n += proto.SizeVarint(10<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(len(x.StrValue))) + n += len(x.StrValue) + case *KeyValue_BytesValue: + n += proto.SizeVarint(11<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(len(x.BytesValue))) + n += len(x.BytesValue) + case nil: + default: + panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) + } + return n +} + +// Message indicating delete for a particular path +type Delete struct { + Path string `protobuf:"bytes,1,opt,name=path" json:"path,omitempty"` +} + +func (m *Delete) Reset() { *m = Delete{} } +func (m *Delete) String() string { return proto.CompactTextString(m) } +func (*Delete) ProtoMessage() {} +func (*Delete) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} } + +func (m *Delete) GetPath() string { + if m != nil { + return m.Path + } + return "" +} + +// Message indicating EOM for a particular path +type Eom struct { + Path string `protobuf:"bytes,1,opt,name=path" json:"path,omitempty"` +} + +func (m *Eom) Reset() { *m = Eom{} } +func (m *Eom) String() string { return proto.CompactTextString(m) } +func (*Eom) ProtoMessage() {} +func (*Eom) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} } + +func (m *Eom) GetPath() string { + if m != nil { + return m.Path + } + return "" +} + +// Message sent for a telemetry subscription cancellation request +type CancelSubscriptionRequest struct { + // Subscription identifier as returned by the device when + // subscription was requested + SubscriptionId uint32 `protobuf:"varint,1,opt,name=subscription_id,json=subscriptionId" json:"subscription_id,omitempty"` +} + +func (m *CancelSubscriptionRequest) Reset() { *m = CancelSubscriptionRequest{} } +func (m *CancelSubscriptionRequest) String() string { return proto.CompactTextString(m) } +func (*CancelSubscriptionRequest) ProtoMessage() {} +func (*CancelSubscriptionRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} } + +func (m *CancelSubscriptionRequest) GetSubscriptionId() uint32 { + if m != nil { + return m.SubscriptionId + } + return 0 +} + +// Reply to telemetry subscription cancellation request +type CancelSubscriptionReply struct { + // Return code + Code ReturnCode `protobuf:"varint,1,opt,name=code,enum=telemetry.ReturnCode" json:"code,omitempty"` + // Return code string + CodeStr string `protobuf:"bytes,2,opt,name=code_str,json=codeStr" json:"code_str,omitempty"` +} + +func (m *CancelSubscriptionReply) Reset() { *m = CancelSubscriptionReply{} } +func (m *CancelSubscriptionReply) String() string { return proto.CompactTextString(m) } +func (*CancelSubscriptionReply) ProtoMessage() {} +func (*CancelSubscriptionReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} } + +func (m *CancelSubscriptionReply) GetCode() ReturnCode { + if m != nil { + return m.Code + } + return ReturnCode_SUCCESS +} + +func (m *CancelSubscriptionReply) GetCodeStr() string { + if m != nil { + return m.CodeStr + } + return "" +} + +// Message sent for a telemetry get request +type GetSubscriptionsRequest struct { + // Subscription identifier as returned by the device when + // subscription was requested + // --- or --- + // 0xFFFFFFFF for all subscription identifiers + SubscriptionId uint32 `protobuf:"varint,1,opt,name=subscription_id,json=subscriptionId" json:"subscription_id,omitempty"` +} + +func (m *GetSubscriptionsRequest) Reset() { *m = GetSubscriptionsRequest{} } +func (m *GetSubscriptionsRequest) String() string { return proto.CompactTextString(m) } +func (*GetSubscriptionsRequest) ProtoMessage() {} +func (*GetSubscriptionsRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} } + +func (m *GetSubscriptionsRequest) GetSubscriptionId() uint32 { + if m != nil { + return m.SubscriptionId + } + return 0 +} + +// Reply to telemetry subscription get request +type GetSubscriptionsReply struct { + // List of current telemetry subscriptions + SubscriptionList []*SubscriptionReply `protobuf:"bytes,1,rep,name=subscription_list,json=subscriptionList" json:"subscription_list,omitempty"` +} + +func (m *GetSubscriptionsReply) Reset() { *m = GetSubscriptionsReply{} } +func (m *GetSubscriptionsReply) String() string { return proto.CompactTextString(m) } +func (*GetSubscriptionsReply) ProtoMessage() {} +func (*GetSubscriptionsReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{14} } + +func (m *GetSubscriptionsReply) GetSubscriptionList() []*SubscriptionReply { + if m != nil { + return m.SubscriptionList + } + return nil +} + +// Message sent for telemetry agent operational states request +type GetOperationalStateRequest struct { + // Per-subscription_id level operational state can be requested. + // + // Subscription identifier as returned by the device when + // subscription was requested + // --- or --- + // 0xFFFFFFFF for all subscription identifiers including agent-level + // operational stats + // --- or --- + // If subscription_id is not present then sent only agent-level + // operational stats + SubscriptionId uint32 `protobuf:"varint,1,opt,name=subscription_id,json=subscriptionId" json:"subscription_id,omitempty"` + // Control verbosity of the output + Verbosity VerbosityLevel `protobuf:"varint,2,opt,name=verbosity,enum=telemetry.VerbosityLevel" json:"verbosity,omitempty"` +} + +func (m *GetOperationalStateRequest) Reset() { *m = GetOperationalStateRequest{} } +func (m *GetOperationalStateRequest) String() string { return proto.CompactTextString(m) } +func (*GetOperationalStateRequest) ProtoMessage() {} +func (*GetOperationalStateRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{15} } + +func (m *GetOperationalStateRequest) GetSubscriptionId() uint32 { + if m != nil { + return m.SubscriptionId + } + return 0 +} + +func (m *GetOperationalStateRequest) GetVerbosity() VerbosityLevel { + if m != nil { + return m.Verbosity + } + return VerbosityLevel_DETAIL +} + +// Reply to telemetry agent operational states request +type GetOperationalStateReply struct { + // List of key-value pairs where + // key = operational state definition + // value = operational state value + Kv []*KeyValue `protobuf:"bytes,1,rep,name=kv" json:"kv,omitempty"` +} + +func (m *GetOperationalStateReply) Reset() { *m = GetOperationalStateReply{} } +func (m *GetOperationalStateReply) String() string { return proto.CompactTextString(m) } +func (*GetOperationalStateReply) ProtoMessage() {} +func (*GetOperationalStateReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{16} } + +func (m *GetOperationalStateReply) GetKv() []*KeyValue { + if m != nil { + return m.Kv + } + return nil +} + +// Message sent for a data encoding request +type DataEncodingRequest struct { +} + +func (m *DataEncodingRequest) Reset() { *m = DataEncodingRequest{} } +func (m *DataEncodingRequest) String() string { return proto.CompactTextString(m) } +func (*DataEncodingRequest) ProtoMessage() {} +func (*DataEncodingRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{17} } + +// Reply to data encodings supported request +type DataEncodingReply struct { + EncodingList []EncodingType `protobuf:"varint,1,rep,packed,name=encoding_list,json=encodingList,enum=telemetry.EncodingType" json:"encoding_list,omitempty"` +} + +func (m *DataEncodingReply) Reset() { *m = DataEncodingReply{} } +func (m *DataEncodingReply) String() string { return proto.CompactTextString(m) } +func (*DataEncodingReply) ProtoMessage() {} +func (*DataEncodingReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} } + +func (m *DataEncodingReply) GetEncodingList() []EncodingType { + if m != nil { + return m.EncodingList + } + return nil +} + +func init() { + proto.RegisterType((*SubscriptionRequest)(nil), "telemetry.SubscriptionRequest") + proto.RegisterType((*SubscriptionInput)(nil), "telemetry.SubscriptionInput") + proto.RegisterType((*Collector)(nil), "telemetry.Collector") + proto.RegisterType((*Path)(nil), "telemetry.Path") + proto.RegisterType((*SubscriptionAdditionalConfig)(nil), "telemetry.SubscriptionAdditionalConfig") + proto.RegisterType((*SubscriptionReply)(nil), "telemetry.SubscriptionReply") + proto.RegisterType((*SubscriptionResponse)(nil), "telemetry.SubscriptionResponse") + proto.RegisterType((*OpenConfigData)(nil), "telemetry.OpenConfigData") + proto.RegisterType((*KeyValue)(nil), "telemetry.KeyValue") + proto.RegisterType((*Delete)(nil), "telemetry.Delete") + proto.RegisterType((*Eom)(nil), "telemetry.Eom") + proto.RegisterType((*CancelSubscriptionRequest)(nil), "telemetry.CancelSubscriptionRequest") + proto.RegisterType((*CancelSubscriptionReply)(nil), "telemetry.CancelSubscriptionReply") + proto.RegisterType((*GetSubscriptionsRequest)(nil), "telemetry.GetSubscriptionsRequest") + proto.RegisterType((*GetSubscriptionsReply)(nil), "telemetry.GetSubscriptionsReply") + proto.RegisterType((*GetOperationalStateRequest)(nil), "telemetry.GetOperationalStateRequest") + proto.RegisterType((*GetOperationalStateReply)(nil), "telemetry.GetOperationalStateReply") + proto.RegisterType((*DataEncodingRequest)(nil), "telemetry.DataEncodingRequest") + proto.RegisterType((*DataEncodingReply)(nil), "telemetry.DataEncodingReply") + proto.RegisterEnum("telemetry.ReturnCode", ReturnCode_name, ReturnCode_value) + proto.RegisterEnum("telemetry.VerbosityLevel", VerbosityLevel_name, VerbosityLevel_value) + proto.RegisterEnum("telemetry.EncodingType", EncodingType_name, EncodingType_value) +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// Client API for OpenConfigTelemetry service + +type OpenConfigTelemetryClient interface { + // Request an inline subscription for data at the specified path. + // The device should send telemetry data back on the same + // connection as the subscription request. + TelemetrySubscribe(ctx context.Context, in *SubscriptionRequest, opts ...grpc.CallOption) (OpenConfigTelemetry_TelemetrySubscribeClient, error) + // Terminates and removes an exisiting telemetry subscription + CancelTelemetrySubscription(ctx context.Context, in *CancelSubscriptionRequest, opts ...grpc.CallOption) (*CancelSubscriptionReply, error) + // Get the list of current telemetry subscriptions from the + // target. This command returns a list of existing subscriptions + // not including those that are established via configuration. + GetTelemetrySubscriptions(ctx context.Context, in *GetSubscriptionsRequest, opts ...grpc.CallOption) (*GetSubscriptionsReply, error) + // Get Telemetry Agent Operational States + GetTelemetryOperationalState(ctx context.Context, in *GetOperationalStateRequest, opts ...grpc.CallOption) (*GetOperationalStateReply, error) + // Return the set of data encodings supported by the device for + // telemetry data + GetDataEncodings(ctx context.Context, in *DataEncodingRequest, opts ...grpc.CallOption) (*DataEncodingReply, error) +} + +type openConfigTelemetryClient struct { + cc *grpc.ClientConn +} + +func NewOpenConfigTelemetryClient(cc *grpc.ClientConn) OpenConfigTelemetryClient { + return &openConfigTelemetryClient{cc} +} + +func (c *openConfigTelemetryClient) TelemetrySubscribe(ctx context.Context, in *SubscriptionRequest, opts ...grpc.CallOption) (OpenConfigTelemetry_TelemetrySubscribeClient, error) { + stream, err := grpc.NewClientStream(ctx, &_OpenConfigTelemetry_serviceDesc.Streams[0], c.cc, "/telemetry.OpenConfigTelemetry/telemetrySubscribe", opts...) + if err != nil { + return nil, err + } + x := &openConfigTelemetryTelemetrySubscribeClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type OpenConfigTelemetry_TelemetrySubscribeClient interface { + Recv() (*OpenConfigData, error) + grpc.ClientStream +} + +type openConfigTelemetryTelemetrySubscribeClient struct { + grpc.ClientStream +} + +func (x *openConfigTelemetryTelemetrySubscribeClient) Recv() (*OpenConfigData, error) { + m := new(OpenConfigData) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *openConfigTelemetryClient) CancelTelemetrySubscription(ctx context.Context, in *CancelSubscriptionRequest, opts ...grpc.CallOption) (*CancelSubscriptionReply, error) { + out := new(CancelSubscriptionReply) + err := grpc.Invoke(ctx, "/telemetry.OpenConfigTelemetry/cancelTelemetrySubscription", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *openConfigTelemetryClient) GetTelemetrySubscriptions(ctx context.Context, in *GetSubscriptionsRequest, opts ...grpc.CallOption) (*GetSubscriptionsReply, error) { + out := new(GetSubscriptionsReply) + err := grpc.Invoke(ctx, "/telemetry.OpenConfigTelemetry/getTelemetrySubscriptions", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *openConfigTelemetryClient) GetTelemetryOperationalState(ctx context.Context, in *GetOperationalStateRequest, opts ...grpc.CallOption) (*GetOperationalStateReply, error) { + out := new(GetOperationalStateReply) + err := grpc.Invoke(ctx, "/telemetry.OpenConfigTelemetry/getTelemetryOperationalState", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *openConfigTelemetryClient) GetDataEncodings(ctx context.Context, in *DataEncodingRequest, opts ...grpc.CallOption) (*DataEncodingReply, error) { + out := new(DataEncodingReply) + err := grpc.Invoke(ctx, "/telemetry.OpenConfigTelemetry/getDataEncodings", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for OpenConfigTelemetry service + +type OpenConfigTelemetryServer interface { + // Request an inline subscription for data at the specified path. + // The device should send telemetry data back on the same + // connection as the subscription request. + TelemetrySubscribe(*SubscriptionRequest, OpenConfigTelemetry_TelemetrySubscribeServer) error + // Terminates and removes an exisiting telemetry subscription + CancelTelemetrySubscription(context.Context, *CancelSubscriptionRequest) (*CancelSubscriptionReply, error) + // Get the list of current telemetry subscriptions from the + // target. This command returns a list of existing subscriptions + // not including those that are established via configuration. + GetTelemetrySubscriptions(context.Context, *GetSubscriptionsRequest) (*GetSubscriptionsReply, error) + // Get Telemetry Agent Operational States + GetTelemetryOperationalState(context.Context, *GetOperationalStateRequest) (*GetOperationalStateReply, error) + // Return the set of data encodings supported by the device for + // telemetry data + GetDataEncodings(context.Context, *DataEncodingRequest) (*DataEncodingReply, error) +} + +func RegisterOpenConfigTelemetryServer(s *grpc.Server, srv OpenConfigTelemetryServer) { + s.RegisterService(&_OpenConfigTelemetry_serviceDesc, srv) +} + +func _OpenConfigTelemetry_TelemetrySubscribe_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(SubscriptionRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(OpenConfigTelemetryServer).TelemetrySubscribe(m, &openConfigTelemetryTelemetrySubscribeServer{stream}) +} + +type OpenConfigTelemetry_TelemetrySubscribeServer interface { + Send(*OpenConfigData) error + grpc.ServerStream +} + +type openConfigTelemetryTelemetrySubscribeServer struct { + grpc.ServerStream +} + +func (x *openConfigTelemetryTelemetrySubscribeServer) Send(m *OpenConfigData) error { + return x.ServerStream.SendMsg(m) +} + +func _OpenConfigTelemetry_CancelTelemetrySubscription_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CancelSubscriptionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(OpenConfigTelemetryServer).CancelTelemetrySubscription(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/telemetry.OpenConfigTelemetry/CancelTelemetrySubscription", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(OpenConfigTelemetryServer).CancelTelemetrySubscription(ctx, req.(*CancelSubscriptionRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _OpenConfigTelemetry_GetTelemetrySubscriptions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetSubscriptionsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(OpenConfigTelemetryServer).GetTelemetrySubscriptions(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/telemetry.OpenConfigTelemetry/GetTelemetrySubscriptions", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(OpenConfigTelemetryServer).GetTelemetrySubscriptions(ctx, req.(*GetSubscriptionsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _OpenConfigTelemetry_GetTelemetryOperationalState_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetOperationalStateRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(OpenConfigTelemetryServer).GetTelemetryOperationalState(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/telemetry.OpenConfigTelemetry/GetTelemetryOperationalState", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(OpenConfigTelemetryServer).GetTelemetryOperationalState(ctx, req.(*GetOperationalStateRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _OpenConfigTelemetry_GetDataEncodings_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DataEncodingRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(OpenConfigTelemetryServer).GetDataEncodings(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/telemetry.OpenConfigTelemetry/GetDataEncodings", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(OpenConfigTelemetryServer).GetDataEncodings(ctx, req.(*DataEncodingRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _OpenConfigTelemetry_serviceDesc = grpc.ServiceDesc{ + ServiceName: "telemetry.OpenConfigTelemetry", + HandlerType: (*OpenConfigTelemetryServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "cancelTelemetrySubscription", + Handler: _OpenConfigTelemetry_CancelTelemetrySubscription_Handler, + }, + { + MethodName: "getTelemetrySubscriptions", + Handler: _OpenConfigTelemetry_GetTelemetrySubscriptions_Handler, + }, + { + MethodName: "getTelemetryOperationalState", + Handler: _OpenConfigTelemetry_GetTelemetryOperationalState_Handler, + }, + { + MethodName: "getDataEncodings", + Handler: _OpenConfigTelemetry_GetDataEncodings_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "telemetrySubscribe", + Handler: _OpenConfigTelemetry_TelemetrySubscribe_Handler, + ServerStreams: true, + }, + }, + Metadata: "oc.proto", +} + +func init() { proto.RegisterFile("oc.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 1254 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x56, 0xcd, 0x6e, 0xdb, 0x46, + 0x17, 0x15, 0x25, 0xd9, 0x12, 0xaf, 0x7e, 0x42, 0x8d, 0xe3, 0x2f, 0xb2, 0xa3, 0xaf, 0x71, 0xe8, + 0x16, 0x71, 0x82, 0xd4, 0x28, 0x94, 0x45, 0x51, 0xa4, 0x40, 0x10, 0xcb, 0x74, 0xac, 0xc6, 0x95, + 0xdc, 0xa1, 0x9c, 0xb6, 0x2b, 0x82, 0x22, 0x27, 0x36, 0x11, 0xfe, 0x95, 0x33, 0x12, 0xc2, 0x4d, + 0x9e, 0xa0, 0xe8, 0x9b, 0x75, 0xdd, 0x97, 0xe8, 0x23, 0x74, 0x51, 0xcc, 0x90, 0x94, 0x46, 0x89, + 0x94, 0x34, 0x2b, 0x91, 0xe7, 0x9e, 0xb9, 0xf7, 0xcc, 0xbd, 0x67, 0x86, 0x82, 0x7a, 0xe4, 0x1c, + 0xc7, 0x49, 0xc4, 0x22, 0xa4, 0x32, 0xe2, 0x93, 0x80, 0xb0, 0x24, 0xd5, 0xff, 0x54, 0x60, 0xc7, + 0x9c, 0x4d, 0xa9, 0x93, 0x78, 0x31, 0xf3, 0xa2, 0x10, 0x93, 0xdf, 0x66, 0x84, 0x32, 0xd4, 0x87, + 0x2d, 0x2f, 0x8c, 0x67, 0xac, 0xab, 0x1c, 0x28, 0x47, 0x8d, 0x7e, 0xef, 0x78, 0xb1, 0xe4, 0x58, + 0xa6, 0x0f, 0x39, 0x07, 0x67, 0x54, 0xf4, 0x18, 0xd4, 0xd8, 0x66, 0x37, 0x96, 0xef, 0x51, 0xd6, + 0x2d, 0x1f, 0x54, 0x8e, 0x1a, 0xfd, 0x5b, 0xd2, 0xba, 0x4b, 0x9b, 0xdd, 0xe0, 0x3a, 0x67, 0x5c, + 0x78, 0x94, 0xa1, 0x09, 0x74, 0x6c, 0xd7, 0xf5, 0x78, 0x16, 0xdb, 0xb7, 0x9c, 0x28, 0x7c, 0xed, + 0x5d, 0x77, 0x2b, 0xa2, 0xda, 0x83, 0x0d, 0xd5, 0x9e, 0x2f, 0xf8, 0x03, 0x41, 0xc7, 0x9a, 0xfd, + 0x1e, 0xa2, 0x5f, 0x42, 0xe7, 0x03, 0x7d, 0xe8, 0x29, 0xb4, 0x9d, 0xc8, 0xf7, 0x89, 0xc3, 0xa2, + 0x24, 0x53, 0xa7, 0x08, 0x75, 0xb7, 0xa5, 0x3a, 0x83, 0x82, 0x80, 0x5b, 0x0b, 0x2e, 0xd7, 0xa9, + 0x7f, 0x07, 0xea, 0x22, 0x86, 0xba, 0x50, 0xb3, 0x5d, 0x37, 0x21, 0x94, 0x8a, 0xc6, 0xa8, 0xb8, + 0x78, 0x45, 0x08, 0xaa, 0x71, 0x94, 0xf0, 0x7d, 0x2b, 0x47, 0x2d, 0x2c, 0x9e, 0xf5, 0xbf, 0x14, + 0xa8, 0xf2, 0x5d, 0x8b, 0xa0, 0xcd, 0x6e, 0xf2, 0x35, 0xe2, 0x19, 0xfd, 0x0f, 0xb6, 0x5f, 0x7b, + 0x3e, 0x23, 0x89, 0x58, 0xa2, 0xe2, 0xfc, 0x0d, 0x7d, 0x0d, 0x88, 0xce, 0xe2, 0x98, 0x27, 0xb5, + 0x66, 0xa1, 0x73, 0x63, 0x87, 0xd7, 0xc4, 0x15, 0x8d, 0xa9, 0xe3, 0x4e, 0x11, 0xb9, 0x2a, 0x02, + 0xe8, 0x18, 0x76, 0x02, 0xfb, 0xad, 0x45, 0x3d, 0x9f, 0x84, 0xcc, 0xf2, 0x42, 0x46, 0x92, 0xb9, + 0xed, 0x77, 0xab, 0x42, 0x46, 0x27, 0xb0, 0xdf, 0x9a, 0x22, 0x32, 0xcc, 0x03, 0xe8, 0x21, 0x68, + 0xd4, 0x0e, 0x62, 0x9f, 0x58, 0xaf, 0x13, 0x3e, 0xeb, 0xd0, 0x49, 0xbb, 0x5b, 0x82, 0x7c, 0x2b, + 0xc3, 0xcf, 0x0a, 0x18, 0xed, 0x41, 0x3d, 0x24, 0xc4, 0xb5, 0x48, 0x14, 0x74, 0xb7, 0x45, 0xfd, + 0x1a, 0x7f, 0x37, 0xa2, 0x40, 0xff, 0x5d, 0x81, 0xde, 0xc7, 0x26, 0x83, 0x0e, 0xa1, 0xe5, 0x7b, + 0x81, 0xc7, 0xac, 0x84, 0x38, 0x51, 0xe2, 0x66, 0xed, 0xda, 0xc2, 0x4d, 0x01, 0xe2, 0x0c, 0x43, + 0x8f, 0x01, 0x65, 0x24, 0xe6, 0x05, 0xc4, 0xa2, 0xc4, 0x89, 0x42, 0x97, 0x8a, 0x76, 0x6c, 0x61, + 0x4d, 0x44, 0x26, 0x5e, 0x40, 0xcc, 0x0c, 0x97, 0xe4, 0xd0, 0xbc, 0x1d, 0xb9, 0x1c, 0xaa, 0xbf, + 0x5b, 0x9d, 0x3a, 0x26, 0xb1, 0x9f, 0xa2, 0xa7, 0x50, 0x4f, 0x08, 0x8d, 0xa3, 0x90, 0x92, 0xdc, + 0xc5, 0xf7, 0x36, 0xf8, 0x0a, 0xe7, 0x34, 0xbc, 0x58, 0xf0, 0x79, 0x5e, 0xd6, 0x9f, 0xc1, 0xed, + 0x75, 0xf9, 0xd0, 0x03, 0xb8, 0x45, 0x25, 0xdc, 0xf2, 0x5c, 0xa1, 0xa4, 0x85, 0xdb, 0x32, 0x3c, + 0x74, 0xf5, 0xbf, 0xcb, 0xd0, 0x1e, 0xc7, 0x24, 0xcc, 0xba, 0x77, 0x6a, 0x33, 0x1b, 0xdd, 0x05, + 0x95, 0xa6, 0x94, 0x91, 0xa0, 0x58, 0xa5, 0xe2, 0x7a, 0x06, 0x0c, 0x5d, 0x74, 0x1f, 0x9a, 0x4e, + 0x14, 0xc4, 0x51, 0x28, 0x86, 0xee, 0xe6, 0xae, 0x6b, 0x2c, 0xb0, 0xa1, 0x8b, 0x8e, 0x40, 0xa3, + 0xb3, 0xa9, 0xb5, 0x42, 0xab, 0x2c, 0x8a, 0x0f, 0x24, 0x66, 0xe1, 0xce, 0xaa, 0xe4, 0x4e, 0xae, + 0x3c, 0xf3, 0x01, 0xb1, 0xc2, 0x59, 0x30, 0x25, 0x89, 0x70, 0x49, 0x15, 0xb7, 0x0b, 0x78, 0x24, + 0x50, 0xd4, 0x03, 0x95, 0x4f, 0x8f, 0x32, 0x3b, 0x88, 0x85, 0x4b, 0xaa, 0x78, 0x09, 0xa0, 0x43, + 0x28, 0xbf, 0x99, 0x77, 0x6b, 0xa2, 0x7f, 0x3b, 0x52, 0xff, 0x5e, 0x92, 0xf4, 0x95, 0xed, 0xcf, + 0x08, 0x2e, 0xbf, 0x99, 0xa3, 0x87, 0xb0, 0xed, 0x12, 0x9f, 0x30, 0xd2, 0xad, 0x0b, 0x62, 0x47, + 0x22, 0x9e, 0x8a, 0x00, 0xce, 0x09, 0xe8, 0x00, 0x2a, 0xdc, 0x8d, 0xaa, 0xe0, 0xb5, 0x25, 0x9e, + 0x11, 0x05, 0x98, 0x87, 0xb8, 0xf1, 0x68, 0x1a, 0x3a, 0xd6, 0x62, 0xf4, 0x20, 0xac, 0xd2, 0xe4, + 0x60, 0x31, 0x17, 0xfd, 0x8f, 0x32, 0xd4, 0x0b, 0x09, 0x48, 0x83, 0xca, 0x1b, 0x92, 0xe6, 0x2d, + 0xe6, 0x8f, 0xe8, 0x10, 0x9a, 0x6e, 0x34, 0x9b, 0xfa, 0xc4, 0x9a, 0x73, 0x86, 0xd8, 0xb9, 0x72, + 0x5e, 0xc2, 0x8d, 0x0c, 0xcd, 0x96, 0xfd, 0x1f, 0x54, 0x2f, 0x64, 0x39, 0x83, 0x6f, 0xbc, 0x72, + 0x5e, 0xc2, 0x75, 0x2f, 0x64, 0x59, 0xf8, 0x1e, 0xc0, 0x6c, 0x19, 0xaf, 0xf1, 0xc6, 0x9c, 0x97, + 0xb0, 0x3a, 0x93, 0x09, 0x74, 0x49, 0xa8, 0x1f, 0x28, 0x47, 0x88, 0x13, 0xa8, 0x4c, 0x98, 0x46, + 0x91, 0x9f, 0x13, 0x54, 0xbe, 0x0d, 0x4e, 0xe0, 0xd8, 0x42, 0x01, 0x65, 0x49, 0x1e, 0xe7, 0xdb, + 0x54, 0xb9, 0x02, 0xca, 0x92, 0x2c, 0x7c, 0x1f, 0x1a, 0xd3, 0x94, 0x11, 0x9a, 0x13, 0x1a, 0x07, + 0xca, 0x51, 0xf3, 0xbc, 0x84, 0x41, 0x80, 0x82, 0x72, 0x52, 0x83, 0x2d, 0x11, 0xd4, 0x7b, 0xb0, + 0x9d, 0x75, 0x7a, 0xdd, 0x55, 0xa5, 0xef, 0x41, 0xc5, 0x88, 0x82, 0xb5, 0xa1, 0x53, 0xd8, 0x1b, + 0xd8, 0xa1, 0x43, 0xfc, 0x75, 0x1f, 0x91, 0xff, 0x6c, 0x7f, 0x0b, 0xee, 0xac, 0xcb, 0xc2, 0x4f, + 0xf1, 0x43, 0xa8, 0x3a, 0x91, 0x9b, 0x9d, 0xe0, 0x76, 0x7f, 0x57, 0x1a, 0x39, 0x26, 0x6c, 0x96, + 0x84, 0x83, 0xc8, 0x25, 0x58, 0x50, 0xf8, 0x05, 0xc1, 0x7f, 0x2d, 0xca, 0x8a, 0x3b, 0xb5, 0xc6, + 0xdf, 0x4d, 0x96, 0xe8, 0x27, 0x70, 0xe7, 0x05, 0x61, 0x72, 0x76, 0xfa, 0xd9, 0x22, 0xa7, 0xb0, + 0xfb, 0x61, 0x0e, 0x2e, 0x71, 0x08, 0x9d, 0x95, 0x0c, 0xd2, 0x17, 0xa6, 0xb7, 0xf1, 0xc6, 0x89, + 0xfd, 0x14, 0x6b, 0xf2, 0x32, 0x71, 0x91, 0xbc, 0x83, 0xfd, 0x17, 0x84, 0x8d, 0x63, 0x92, 0xd8, + 0xd9, 0x75, 0x6a, 0x32, 0x9b, 0x91, 0xcf, 0x95, 0x8a, 0xbe, 0x05, 0x75, 0x4e, 0x92, 0x69, 0x44, + 0x3d, 0x96, 0x8a, 0x56, 0xb4, 0xfb, 0x7b, 0x92, 0x92, 0x57, 0x45, 0xec, 0x82, 0xcc, 0x89, 0x8f, + 0x97, 0x5c, 0xfd, 0x19, 0x74, 0xd7, 0xd6, 0xe7, 0xdb, 0xcc, 0xce, 0xb2, 0xf2, 0xd1, 0xb3, 0xac, + 0xef, 0xc2, 0x0e, 0xbf, 0xbd, 0x8c, 0xd0, 0x89, 0x5c, 0x2f, 0xbc, 0xce, 0x95, 0xeb, 0x3f, 0x41, + 0x67, 0x15, 0xe6, 0x09, 0xbf, 0x87, 0x16, 0xc9, 0x81, 0x65, 0xcf, 0xda, 0xfd, 0x3b, 0xf2, 0xb1, + 0xce, 0xe3, 0x93, 0x34, 0x26, 0xb8, 0x59, 0xb0, 0x79, 0xab, 0x1e, 0xbd, 0x00, 0x58, 0x3a, 0x00, + 0x35, 0xa0, 0x66, 0x5e, 0x0d, 0x06, 0x86, 0x69, 0x6a, 0x25, 0xb4, 0x07, 0xbb, 0xa3, 0xb1, 0x65, + 0x5e, 0x9d, 0x98, 0x03, 0x3c, 0xbc, 0x9c, 0x0c, 0xc7, 0x23, 0xcb, 0x18, 0x4d, 0xf0, 0xaf, 0x9a, + 0x82, 0x3a, 0xd0, 0xba, 0x1a, 0xbd, 0x1c, 0x8d, 0x7f, 0x1e, 0x59, 0x06, 0xc6, 0x63, 0xac, 0x95, + 0x1f, 0xf5, 0xa1, 0xbd, 0xda, 0x10, 0x04, 0xb0, 0x7d, 0x6a, 0x4c, 0x9e, 0x0f, 0x2f, 0xb4, 0x12, + 0x52, 0x61, 0x6b, 0x62, 0x60, 0xd3, 0xd0, 0x14, 0xfe, 0x78, 0x82, 0x87, 0xc6, 0x99, 0x56, 0x7e, + 0xf4, 0x1c, 0x9a, 0xb2, 0x34, 0xd4, 0x02, 0xf5, 0x6a, 0x74, 0x6a, 0x9c, 0x0d, 0x47, 0xc6, 0xa9, + 0x56, 0x42, 0x35, 0xa8, 0xfc, 0xf2, 0xe3, 0x85, 0xa6, 0x70, 0xfc, 0x07, 0x73, 0x3c, 0xb2, 0x86, + 0xc6, 0xe4, 0x4c, 0x2b, 0xf3, 0xc4, 0x97, 0x78, 0x3c, 0x19, 0x3f, 0xd1, 0x2a, 0xfd, 0x7f, 0x2a, + 0xb0, 0xb3, 0xbc, 0xf2, 0x27, 0xc5, 0x96, 0x91, 0x09, 0x68, 0xb1, 0xff, 0xdc, 0x32, 0x53, 0x82, + 0xbe, 0xd8, 0x68, 0x24, 0xd1, 0xe0, 0x7d, 0x79, 0xbc, 0xab, 0x1f, 0x12, 0xbd, 0xf4, 0x8d, 0x82, + 0x3c, 0xb8, 0xeb, 0x88, 0x03, 0x36, 0x79, 0x2f, 0xb5, 0x48, 0x82, 0xbe, 0x94, 0xff, 0x08, 0x6d, + 0x3a, 0xce, 0xfb, 0xfa, 0x27, 0x58, 0xb1, 0x9f, 0xea, 0x25, 0xe4, 0xc0, 0xde, 0x35, 0x61, 0x6b, + 0xeb, 0x50, 0x24, 0xa7, 0xd8, 0x70, 0x20, 0xf7, 0x0f, 0x3e, 0xca, 0xc9, 0x8a, 0xf8, 0xd0, 0x93, + 0x8b, 0xbc, 0x6f, 0x58, 0xf4, 0xd5, 0x6a, 0x8e, 0x0d, 0x07, 0x6a, 0xff, 0xf0, 0x53, 0xb4, 0xac, + 0x1a, 0x06, 0xed, 0x9a, 0x30, 0xd9, 0xc0, 0x74, 0x65, 0x20, 0x6b, 0x1c, 0xbf, 0xdf, 0xdb, 0x18, + 0x17, 0x39, 0xa7, 0xdb, 0xe2, 0xaf, 0xf8, 0x93, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xc9, 0xe3, + 0x4f, 0x0d, 0x96, 0x0b, 0x00, 0x00, +} diff --git a/plugins/inputs/jti_openconfig_telemetry/oc/oc.proto b/plugins/inputs/jti_openconfig_telemetry/oc/oc.proto new file mode 100644 index 000000000..38ce9b422 --- /dev/null +++ b/plugins/inputs/jti_openconfig_telemetry/oc/oc.proto @@ -0,0 +1,319 @@ +// +// Copyrights (c) 2016, Juniper Networks, Inc. +// All rights reserved. +// + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +// +// Nitin Kumar 04/07/2016 +// Abbas Sakarwala 04/07/2016 +// +// This file defines the Openconfig Telemetry RPC APIs (for gRPC). +// +// https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang +// +// Version 1.0 +// + +syntax = "proto3"; + +package telemetry; + +// Interface exported by Agent +service OpenConfigTelemetry { + // Request an inline subscription for data at the specified path. + // The device should send telemetry data back on the same + // connection as the subscription request. + rpc telemetrySubscribe(SubscriptionRequest) returns (stream OpenConfigData) {} + + // Terminates and removes an exisiting telemetry subscription + rpc cancelTelemetrySubscription(CancelSubscriptionRequest) returns (CancelSubscriptionReply) {} + + // Get the list of current telemetry subscriptions from the + // target. This command returns a list of existing subscriptions + // not including those that are established via configuration. + rpc getTelemetrySubscriptions(GetSubscriptionsRequest) returns (GetSubscriptionsReply) {} + + // Get Telemetry Agent Operational States + rpc getTelemetryOperationalState(GetOperationalStateRequest) returns (GetOperationalStateReply) {} + + // Return the set of data encodings supported by the device for + // telemetry data + rpc getDataEncodings(DataEncodingRequest) returns (DataEncodingReply) {} +} + +// Message sent for a telemetry subscription request +message SubscriptionRequest { + // Data associated with a telemetry subscription + SubscriptionInput input = 1; + + // List of data models paths and filters + // which are used in a telemetry operation. + repeated Path path_list = 2; + + // The below configuration is not defined in Openconfig RPC. + // It is a proposed extension to configure additional + // subscription request features. + SubscriptionAdditionalConfig additional_config = 3; +} + +// Data associated with a telemetry subscription +message SubscriptionInput { + // List of optional collector endpoints to send data for + // this subscription. + // If no collector destinations are specified, the collector + // destination is assumed to be the requester on the rpc channel. + repeated Collector collector_list = 1; +} + +// Collector endpoints to send data specified as an ip+port combination. +message Collector { + // IP address of collector endpoint + string address = 1; + + // Transport protocol port number for the collector destination. + uint32 port = 2; +} + +// Data model path +message Path { + // Data model path of interest + // Path specification for elements of OpenConfig data models + string path = 1; + + // Regular expression to be used in filtering state leaves + string filter = 2; + + // If this is set to true, the target device will only send + // updates to the collector upon a change in data value + bool suppress_unchanged = 3; + + // Maximum time in ms the target device may go without sending + // a message to the collector. If this time expires with + // suppress-unchanged set, the target device must send an update + // message regardless if the data values have changed. + uint32 max_silent_interval = 4; + + // Time in ms between collection and transmission of the + // specified data to the collector platform. The target device + // will sample the corresponding data (e.g,. a counter) and + // immediately send to the collector destination. + // + // If sample-frequency is set to 0, then the network device + // must emit an update upon every datum change. + uint32 sample_frequency = 5; + + // EOM needed for each walk cycle of this path? + // For periodic sensor, applicable for each complete reap + // For event sensor, applicable when initial dump is over + // (same as EOS) + // This feature is not implemented currently. + bool need_eom = 6; +} + +// Configure subscription request additional features. +message SubscriptionAdditionalConfig { + // limit the number of records sent in the stream + int32 limit_records = 1; + + // limit the time the stream remains open + int32 limit_time_seconds = 2; + + // EOS needed for this subscription? + bool need_eos = 3; +} + +// Reply to inline subscription for data at the specified path is done in +// two-folds. +// 1. Reply data message sent out using out-of-band channel. +// 2. Telemetry data send back on the same connection as the +// subscription request. + +// 1. Reply data message sent out using out-of-band channel. +message SubscriptionReply { + // Response message to a telemetry subscription creation or + // get request. + SubscriptionResponse response = 1; + + // List of data models paths and filters + // which are used in a telemetry operation. + repeated Path path_list = 2; +} + +// Response message to a telemetry subscription creation or get request. +message SubscriptionResponse { + // Unique id for the subscription on the device. This is + // generated by the device and returned in a subscription + // request or when listing existing subscriptions + uint32 subscription_id = 1; +} + +// 2. Telemetry data send back on the same connection as the +// subscription request. +message OpenConfigData { + // router name:export IP address + string system_id = 1; + + // line card / RE (slot number) + uint32 component_id = 2; + + // PFE (if applicable) + uint32 sub_component_id = 3; + + // Path specification for elements of OpenConfig data models + string path = 4; + + // Sequence number, monotonically increasing for each + // system_id, component_id, sub_component_id + path. + uint64 sequence_number = 5; + + // timestamp (milliseconds since epoch) + uint64 timestamp = 6; + + // List of key-value pairs + repeated KeyValue kv = 7; + + // For delete. If filled, it indicates delete + repeated Delete delete = 8; + + // If filled, it indicates end of marker for the + // respective path in the list. + repeated Eom eom = 9; + + // If filled, it indicates end of sync for complete subscription + bool sync_response = 10; +} + +// Simple Key-value, where value could be one of scalar types +message KeyValue { + // Key + string key = 1; + + // One of possible values + oneof value { + double double_value = 5; + int64 int_value = 6; + uint64 uint_value = 7; + sint64 sint_value = 8; + bool bool_value = 9; + string str_value = 10; + bytes bytes_value = 11; + } +} + +// Message indicating delete for a particular path +message Delete { + string path = 1; +} + +// Message indicating EOM for a particular path +message Eom { + string path = 1; +} + +// Message sent for a telemetry subscription cancellation request +message CancelSubscriptionRequest { + // Subscription identifier as returned by the device when + // subscription was requested + uint32 subscription_id = 1; +} + +// Reply to telemetry subscription cancellation request +message CancelSubscriptionReply { + // Return code + ReturnCode code = 1; + + // Return code string + string code_str = 2; +}; + +// Result of the operation +enum ReturnCode { + SUCCESS = 0; + NO_SUBSCRIPTION_ENTRY = 1; + UNKNOWN_ERROR = 2; +} + +// Message sent for a telemetry get request +message GetSubscriptionsRequest { + // Subscription identifier as returned by the device when + // subscription was requested + // --- or --- + // 0xFFFFFFFF for all subscription identifiers + uint32 subscription_id = 1; +} + +// Reply to telemetry subscription get request +message GetSubscriptionsReply { + // List of current telemetry subscriptions + repeated SubscriptionReply subscription_list = 1; +} + +// Message sent for telemetry agent operational states request +message GetOperationalStateRequest { + // Per-subscription_id level operational state can be requested. + // + // Subscription identifier as returned by the device when + // subscription was requested + // --- or --- + // 0xFFFFFFFF for all subscription identifiers including agent-level + // operational stats + // --- or --- + // If subscription_id is not present then sent only agent-level + // operational stats + uint32 subscription_id = 1; + + // Control verbosity of the output + VerbosityLevel verbosity = 2; +} + +// Verbosity Level +enum VerbosityLevel { + DETAIL = 0; + TERSE = 1; + BRIEF = 2; +} + +// Reply to telemetry agent operational states request +message GetOperationalStateReply { + // List of key-value pairs where + // key = operational state definition + // value = operational state value + repeated KeyValue kv = 1; +} + +// Message sent for a data encoding request +message DataEncodingRequest { +} + +// Reply to data encodings supported request +message DataEncodingReply { + repeated EncodingType encoding_list = 1; +} + +// Encoding Type Supported +enum EncodingType { + UNDEFINED = 0; + XML = 1; + JSON_IETF = 2; + PROTO3 = 3; +} + diff --git a/plugins/inputs/jti_openconfig_telemetry/openconfig_telemetry.go b/plugins/inputs/jti_openconfig_telemetry/openconfig_telemetry.go new file mode 100644 index 000000000..49a593a08 --- /dev/null +++ b/plugins/inputs/jti_openconfig_telemetry/openconfig_telemetry.go @@ -0,0 +1,422 @@ +package jti_openconfig_telemetry + +import ( + "fmt" + "log" + "net" + "regexp" + "strings" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/inputs/jti_openconfig_telemetry/auth" + "github.com/influxdata/telegraf/plugins/inputs/jti_openconfig_telemetry/oc" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/status" +) + +type OpenConfigTelemetry struct { + Servers []string + Sensors []string + Username string + Password string + ClientID string `toml:"client_id"` + SampleFrequency internal.Duration `toml:"sample_frequency"` + SSLCert string `toml:"ssl_cert"` + StrAsTags bool `toml:"str_as_tags"` + RetryDelay internal.Duration `toml:"retry_delay"` + + sensorsConfig []sensorConfig + grpcClientConns []*grpc.ClientConn + wg *sync.WaitGroup +} + +var ( + // Regex to match and extract data points from path value in received key + keyPathRegex = regexp.MustCompile("\\/([^\\/]*)\\[([A-Za-z0-9\\-\\/]*\\=[^\\[]*)\\]") + sampleConfig = ` + ## List of device addresses to collect telemetry from + servers = ["localhost:1883"] + + ## Authentication details. Username and password are must if device expects + ## authentication. Client ID must be unique when connecting from multiple instances + ## of telegraf to the same device + username = "user" + password = "pass" + client_id = "telegraf" + + ## Frequency to get data + sample_frequency = "1000ms" + + ## Sensors to subscribe for + ## A identifier for each sensor can be provided in path by separating with space + ## Else sensor path will be used as identifier + ## When identifier is used, we can provide a list of space separated sensors. + ## A single subscription will be created with all these sensors and data will + ## be saved to measurement with this identifier name + sensors = [ + "/interfaces/", + "collection /components/ /lldp", + ] + + ## We allow specifying sensor group level reporting rate. To do this, specify the + ## reporting rate in Duration at the beginning of sensor paths / collection + ## name. For entries without reporting rate, we use configured sample frequency + sensors = [ + "1000ms customReporting /interfaces /lldp", + "2000ms collection /components", + "/interfaces", + ] + + ## x509 Certificate to use with TLS connection. If it is not provided, an insecure + ## channel will be opened with server + ssl_cert = "/etc/telegraf/cert.pem" + + ## Delay between retry attempts of failed RPC calls or streams. Defaults to 1000ms. + ## Failed streams/calls will not be retried if 0 is provided + retry_delay = "1000ms" + + ## To treat all string values as tags, set this to true + str_as_tags = false +` +) + +func (m *OpenConfigTelemetry) SampleConfig() string { + return sampleConfig +} + +func (m *OpenConfigTelemetry) Description() string { + return "Read JTI OpenConfig Telemetry from listed sensors" +} + +func (m *OpenConfigTelemetry) Gather(acc telegraf.Accumulator) error { + return nil +} + +func (m *OpenConfigTelemetry) Stop() { + for _, grpcClientConn := range m.grpcClientConns { + grpcClientConn.Close() + } + m.wg.Wait() +} + +// Takes in XML path with predicates and returns list of tags+values along with a final +// XML path without predicates. If /events/event[id=2]/attributes[key='message']/value +// is given input, this function will emit /events/event/attributes/value as xmlpath and +// { /events/event/@id=2, /events/event/attributes/@key='message' } as tags +func spitTagsNPath(xmlpath string) (string, map[string]string) { + subs := keyPathRegex.FindAllStringSubmatch(xmlpath, -1) + tags := make(map[string]string) + + // Given XML path, this will spit out final path without predicates + if len(subs) > 0 { + for _, sub := range subs { + tagKey := strings.Split(xmlpath, sub[0])[0] + "/" + strings.TrimSpace(sub[1]) + "/@" + + // If we have multiple keys in give path like /events/event[id=2 and type=3]/, + // we must emit multiple tags + for _, kv := range strings.Split(sub[2], " and ") { + key := tagKey + strings.TrimSpace(strings.Split(kv, "=")[0]) + tagValue := strings.Replace(strings.Split(kv, "=")[1], "'", "", -1) + tags[key] = tagValue + } + + xmlpath = strings.Replace(xmlpath, sub[0], "/"+strings.TrimSpace(sub[1]), 1) + } + } + + return xmlpath, tags +} + +// Takes in a OC response, extracts tag information from keys and returns a +// list of groups with unique sets of tags+values +func (m *OpenConfigTelemetry) extractData(r *telemetry.OpenConfigData, grpcServer string) []DataGroup { + // Use empty prefix. We will update this when we iterate over key-value pairs + prefix := "" + + dgroups := []DataGroup{} + + for _, v := range r.Kv { + kv := make(map[string]interface{}) + + if v.Key == "__prefix__" { + prefix = v.GetStrValue() + continue + } + + // Also, lets use prefix if there is one + xmlpath, finaltags := spitTagsNPath(prefix + v.Key) + finaltags["device"] = grpcServer + + switch v.Value.(type) { + case *telemetry.KeyValue_StrValue: + // If StrAsTags is set, we treat all string values as tags + if m.StrAsTags { + finaltags[xmlpath] = v.GetStrValue() + } else { + kv[xmlpath] = v.GetStrValue() + } + break + case *telemetry.KeyValue_DoubleValue: + kv[xmlpath] = v.GetDoubleValue() + break + case *telemetry.KeyValue_IntValue: + kv[xmlpath] = v.GetIntValue() + break + case *telemetry.KeyValue_UintValue: + kv[xmlpath] = v.GetUintValue() + break + case *telemetry.KeyValue_SintValue: + kv[xmlpath] = v.GetSintValue() + break + case *telemetry.KeyValue_BoolValue: + kv[xmlpath] = v.GetBoolValue() + break + case *telemetry.KeyValue_BytesValue: + kv[xmlpath] = v.GetBytesValue() + break + } + + // Insert other tags from message + finaltags["system_id"] = r.SystemId + finaltags["path"] = r.Path + + // Insert derived key and value + dgroups = CollectionByKeys(dgroups).Insert(finaltags, kv) + + // Insert data from message header + dgroups = CollectionByKeys(dgroups).Insert(finaltags, + map[string]interface{}{"_sequence": r.SequenceNumber}) + dgroups = CollectionByKeys(dgroups).Insert(finaltags, + map[string]interface{}{"_timestamp": r.Timestamp}) + dgroups = CollectionByKeys(dgroups).Insert(finaltags, + map[string]interface{}{"_component_id": r.ComponentId}) + dgroups = CollectionByKeys(dgroups).Insert(finaltags, + map[string]interface{}{"_subcomponent_id": r.SubComponentId}) + } + + return dgroups +} + +// Structure to hold sensors path list and measurement name +type sensorConfig struct { + measurementName string + pathList []*telemetry.Path +} + +// Takes in sensor configuration and converts it into slice of sensorConfig objects +func (m *OpenConfigTelemetry) splitSensorConfig() int { + var pathlist []*telemetry.Path + var measurementName string + var reportingRate uint32 + + m.sensorsConfig = make([]sensorConfig, 0) + for _, sensor := range m.Sensors { + spathSplit := strings.Fields(sensor) + reportingRate = uint32(m.SampleFrequency.Duration / time.Millisecond) + + // Extract measurement name and custom reporting rate if specified. Custom + // reporting rate will be specified at the beginning of sensor list, + // followed by measurement name like "1000ms interfaces /interfaces" + // where 1000ms is the custom reporting rate and interfaces is the + // measurement name. If 1000ms is not given, we use global reporting rate + // from sample_frequency. if measurement name is not given, we use first + // sensor name as the measurement name. If first or the word after custom + // reporting rate doesn't start with /, we treat it as measurement name + // and exclude it from list of sensors to subscribe + duration, err := time.ParseDuration(spathSplit[0]) + if err == nil { + reportingRate = uint32(duration / time.Millisecond) + spathSplit = spathSplit[1:] + } + + if len(spathSplit) == 0 { + log.Printf("E! No sensors are specified") + continue + } + + // Word after custom reporting rate is treated as measurement name + measurementName = spathSplit[0] + + // If our word after custom reporting rate doesn't start with /, we treat + // it as measurement name. Else we treat it as sensor + if !strings.HasPrefix(measurementName, "/") { + spathSplit = spathSplit[1:] + } + + if len(spathSplit) == 0 { + log.Printf("E! No valid sensors are specified") + continue + } + + // Iterate over our sensors and create pathlist to subscribe + pathlist = make([]*telemetry.Path, 0) + for _, path := range spathSplit { + pathlist = append(pathlist, &telemetry.Path{Path: path, + SampleFrequency: reportingRate}) + } + + m.sensorsConfig = append(m.sensorsConfig, sensorConfig{ + measurementName: measurementName, pathList: pathlist, + }) + + } + + return len(m.sensorsConfig) +} + +// Subscribes and collects OpenConfig telemetry data from given server +func (m *OpenConfigTelemetry) collectData(ctx context.Context, + grpcServer string, grpcClientConn *grpc.ClientConn, + acc telegraf.Accumulator) error { + c := telemetry.NewOpenConfigTelemetryClient(grpcClientConn) + for _, sensor := range m.sensorsConfig { + m.wg.Add(1) + go func(ctx context.Context, sensor sensorConfig) { + defer m.wg.Done() + + for { + stream, err := c.TelemetrySubscribe(ctx, + &telemetry.SubscriptionRequest{PathList: sensor.pathList}) + if err != nil { + rpcStatus, _ := status.FromError(err) + // If service is currently unavailable and may come back later, retry + if rpcStatus.Code() != codes.Unavailable { + acc.AddError(fmt.Errorf("E! Could not subscribe to %s: %v", grpcServer, + err)) + return + } else { + // Retry with delay. If delay is not provided, use default + if m.RetryDelay.Duration > 0 { + log.Printf("D! Retrying %s with timeout %v", grpcServer, + m.RetryDelay.Duration) + time.Sleep(m.RetryDelay.Duration) + continue + } else { + return + } + } + } + for { + r, err := stream.Recv() + if err != nil { + // If we encounter error in the stream, break so we can retry + // the connection + acc.AddError(fmt.Errorf("E! Failed to read from %s: %v", err, grpcServer)) + break + } + + log.Printf("D! Received from %s: %v", grpcServer, r) + + // Create a point and add to batch + tags := make(map[string]string) + + // Insert additional tags + tags["device"] = grpcServer + + dgroups := m.extractData(r, grpcServer) + + // Print final data collection + log.Printf("D! Available collection for %s is: %v", grpcServer, dgroups) + + tnow := time.Now() + // Iterate through data groups and add them + for _, group := range dgroups { + if len(group.tags) == 0 { + acc.AddFields(sensor.measurementName, group.data, tags, tnow) + } else { + acc.AddFields(sensor.measurementName, group.data, group.tags, tnow) + } + } + } + } + }(ctx, sensor) + } + + return nil +} + +func (m *OpenConfigTelemetry) Start(acc telegraf.Accumulator) error { + // Build sensors config + if m.splitSensorConfig() == 0 { + return fmt.Errorf("E! No valid sensor configuration available") + } + + // If SSL certificate is provided, use transport credentials + var err error + var transportCredentials credentials.TransportCredentials + if m.SSLCert != "" { + transportCredentials, err = credentials.NewClientTLSFromFile(m.SSLCert, "") + if err != nil { + return fmt.Errorf("E! Failed to read certificate: %v", err) + } + } else { + transportCredentials = nil + } + + // Connect to given list of servers and start collecting data + var grpcClientConn *grpc.ClientConn + var wg sync.WaitGroup + ctx := context.Background() + m.wg = &wg + for _, server := range m.Servers { + // Extract device address and port + grpcServer, grpcPort, err := net.SplitHostPort(server) + if err != nil { + log.Printf("E! Invalid server address: %v", err) + continue + } + + // If a certificate is provided, open a secure channel. Else open insecure one + if transportCredentials != nil { + grpcClientConn, err = grpc.Dial(server, grpc.WithTransportCredentials(transportCredentials)) + } else { + grpcClientConn, err = grpc.Dial(server, grpc.WithInsecure()) + } + if err != nil { + log.Printf("E! Failed to connect to %s: %v", server, err) + } else { + log.Printf("D! Opened a new gRPC session to %s on port %s", grpcServer, grpcPort) + } + + // Add to the list of client connections + m.grpcClientConns = append(m.grpcClientConns, grpcClientConn) + + if m.Username != "" && m.Password != "" && m.ClientID != "" { + lc := authentication.NewLoginClient(grpcClientConn) + loginReply, loginErr := lc.LoginCheck(ctx, + &authentication.LoginRequest{UserName: m.Username, + Password: m.Password, ClientId: m.ClientID}) + if loginErr != nil { + log.Printf("E! Could not initiate login check for %s: %v", server, err) + continue + } + + // Check if the user is authenticated. Bail if auth error + if !loginReply.Result { + log.Printf("E! Failed to authenticate the user for %s", server) + continue + } + } + + // Subscribe and gather telemetry data + m.collectData(ctx, grpcServer, grpcClientConn, acc) + } + + return nil +} + +func init() { + inputs.Add("jti_openconfig_telemetry", func() telegraf.Input { + return &OpenConfigTelemetry{ + RetryDelay: internal.Duration{Duration: time.Second}, + StrAsTags: false, + } + }) +} diff --git a/plugins/inputs/jti_openconfig_telemetry/openconfig_telemetry_test.go b/plugins/inputs/jti_openconfig_telemetry/openconfig_telemetry_test.go new file mode 100644 index 000000000..8b0abd883 --- /dev/null +++ b/plugins/inputs/jti_openconfig_telemetry/openconfig_telemetry_test.go @@ -0,0 +1,225 @@ +package jti_openconfig_telemetry + +import ( + "log" + "net" + "os" + "testing" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" + + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs/jti_openconfig_telemetry/oc" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +var cfg = &OpenConfigTelemetry{ + Servers: []string{"127.0.0.1:50051"}, + SampleFrequency: internal.Duration{Duration: time.Second * 2}, +} + +var data = &telemetry.OpenConfigData{ + Path: "/sensor", + Kv: []*telemetry.KeyValue{{Key: "/sensor[tag='tagValue']/intKey", Value: &telemetry.KeyValue_IntValue{IntValue: 10}}}, +} + +var data_with_prefix = &telemetry.OpenConfigData{ + Path: "/sensor_with_prefix", + Kv: []*telemetry.KeyValue{{Key: "__prefix__", Value: &telemetry.KeyValue_StrValue{StrValue: "/sensor/prefix/"}}, + {Key: "intKey", Value: &telemetry.KeyValue_IntValue{IntValue: 10}}}, +} + +var data_with_multiple_tags = &telemetry.OpenConfigData{ + Path: "/sensor_with_multiple_tags", + Kv: []*telemetry.KeyValue{{Key: "__prefix__", Value: &telemetry.KeyValue_StrValue{StrValue: "/sensor/prefix/"}}, + {Key: "tagKey[tag='tagValue']/boolKey", Value: &telemetry.KeyValue_BoolValue{BoolValue: false}}, + {Key: "intKey", Value: &telemetry.KeyValue_IntValue{IntValue: 10}}}, +} + +var data_with_string_values = &telemetry.OpenConfigData{ + Path: "/sensor_with_string_values", + Kv: []*telemetry.KeyValue{{Key: "__prefix__", Value: &telemetry.KeyValue_StrValue{StrValue: "/sensor/prefix/"}}, + {Key: "strKey[tag='tagValue']/strValue", Value: &telemetry.KeyValue_StrValue{StrValue: "10"}}}, +} + +type openConfigTelemetryServer struct { +} + +func (s *openConfigTelemetryServer) TelemetrySubscribe(req *telemetry.SubscriptionRequest, stream telemetry.OpenConfigTelemetry_TelemetrySubscribeServer) error { + path := req.PathList[0].Path + if path == "/sensor" { + stream.Send(data) + } else if path == "/sensor_with_prefix" { + stream.Send(data_with_prefix) + } else if path == "/sensor_with_multiple_tags" { + stream.Send(data_with_multiple_tags) + } else if path == "/sensor_with_string_values" { + stream.Send(data_with_string_values) + } + return nil +} + +func (s *openConfigTelemetryServer) CancelTelemetrySubscription(ctx context.Context, req *telemetry.CancelSubscriptionRequest) (*telemetry.CancelSubscriptionReply, error) { + return nil, nil +} + +func (s *openConfigTelemetryServer) GetTelemetrySubscriptions(ctx context.Context, req *telemetry.GetSubscriptionsRequest) (*telemetry.GetSubscriptionsReply, error) { + return nil, nil +} + +func (s *openConfigTelemetryServer) GetTelemetryOperationalState(ctx context.Context, req *telemetry.GetOperationalStateRequest) (*telemetry.GetOperationalStateReply, error) { + return nil, nil +} + +func (s *openConfigTelemetryServer) GetDataEncodings(ctx context.Context, req *telemetry.DataEncodingRequest) (*telemetry.DataEncodingReply, error) { + return nil, nil +} + +func newServer() *openConfigTelemetryServer { + s := new(openConfigTelemetryServer) + return s +} + +func TestOpenConfigTelemetryData(t *testing.T) { + var acc testutil.Accumulator + + cfg.Sensors = []string{"/sensor"} + err := cfg.Start(&acc) + require.NoError(t, err) + + tags := map[string]string{ + "device": "127.0.0.1", + "/sensor/@tag": "tagValue", + "system_id": "", + "path": "/sensor", + } + + fields := map[string]interface{}{ + "/sensor/intKey": int64(10), + "_sequence": uint64(0), + "_timestamp": uint64(0), + "_component_id": uint32(0), + "_subcomponent_id": uint32(0), + } + + // Give sometime for gRPC channel to be established + time.Sleep(2 * time.Second) + + acc.AssertContainsTaggedFields(t, "/sensor", fields, tags) +} + +func TestOpenConfigTelemetryDataWithPrefix(t *testing.T) { + var acc testutil.Accumulator + cfg.Sensors = []string{"/sensor_with_prefix"} + err := cfg.Start(&acc) + require.NoError(t, err) + + tags := map[string]string{ + "device": "127.0.0.1", + "system_id": "", + "path": "/sensor_with_prefix", + } + + fields := map[string]interface{}{ + "/sensor/prefix/intKey": int64(10), + "_sequence": uint64(0), + "_timestamp": uint64(0), + "_component_id": uint32(0), + "_subcomponent_id": uint32(0), + } + + // Give sometime for gRPC channel to be established + time.Sleep(2 * time.Second) + + acc.AssertContainsTaggedFields(t, "/sensor_with_prefix", fields, tags) +} + +func TestOpenConfigTelemetryDataWithMultipleTags(t *testing.T) { + var acc testutil.Accumulator + cfg.Sensors = []string{"/sensor_with_multiple_tags"} + err := cfg.Start(&acc) + require.NoError(t, err) + + tags1 := map[string]string{ + "/sensor/prefix/tagKey/@tag": "tagValue", + "device": "127.0.0.1", + "system_id": "", + "path": "/sensor_with_multiple_tags", + } + + fields1 := map[string]interface{}{ + "/sensor/prefix/tagKey/boolKey": false, + "_sequence": uint64(0), + "_timestamp": uint64(0), + "_component_id": uint32(0), + "_subcomponent_id": uint32(0), + } + + tags2 := map[string]string{ + "device": "127.0.0.1", + "system_id": "", + "path": "/sensor_with_multiple_tags", + } + + fields2 := map[string]interface{}{ + "/sensor/prefix/intKey": int64(10), + "_sequence": uint64(0), + "_timestamp": uint64(0), + "_component_id": uint32(0), + "_subcomponent_id": uint32(0), + } + + // Give sometime for gRPC channel to be established + time.Sleep(2 * time.Second) + + acc.AssertContainsTaggedFields(t, "/sensor_with_multiple_tags", fields1, tags1) + acc.AssertContainsTaggedFields(t, "/sensor_with_multiple_tags", fields2, tags2) +} + +func TestOpenConfigTelemetryDataWithStringValues(t *testing.T) { + var acc testutil.Accumulator + cfg.Sensors = []string{"/sensor_with_string_values"} + err := cfg.Start(&acc) + require.NoError(t, err) + + tags := map[string]string{ + "/sensor/prefix/strKey/@tag": "tagValue", + "device": "127.0.0.1", + "system_id": "", + "path": "/sensor_with_string_values", + } + + fields := map[string]interface{}{ + "/sensor/prefix/strKey/strValue": "10", + "_sequence": uint64(0), + "_timestamp": uint64(0), + "_component_id": uint32(0), + "_subcomponent_id": uint32(0), + } + + // Give sometime for gRPC channel to be established + time.Sleep(2 * time.Second) + + acc.AssertContainsTaggedFields(t, "/sensor_with_string_values", fields, tags) +} + +func TestMain(m *testing.M) { + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + log.Fatalf("Failed to listen: %v", err) + } + + cfg.Servers = []string{lis.Addr().String()} + + var opts []grpc.ServerOption + grpcServer := grpc.NewServer(opts...) + telemetry.RegisterOpenConfigTelemetryServer(grpcServer, newServer()) + go func() { + grpcServer.Serve(lis) + }() + defer grpcServer.Stop() + os.Exit(m.Run()) +}