Add Cisco model-driven telemetry & GNMI inputs (#5852)

This commit is contained in:
Steven Barth
2019-06-04 23:39:46 +02:00
committed by Daniel Nelson
parent 25471f6722
commit e18393fabf
11 changed files with 1672 additions and 0 deletions

View File

@@ -0,0 +1,41 @@
# Cisco model-driven telemetry (MDT)
Cisco model-driven telemetry (MDT) is an input plugin that consumes
telemetry data from Cisco IOS XR, IOS XE and NX-OS platforms. It supports TCP & GRPC dialout transports.
GRPC-based transport can utilize TLS for authentication and encryption.
Telemetry data is expected to be GPB-KV (self-describing-gpb) encoded.
The GRPC dialout transport is supported on various IOS XR (64-bit) 6.1.x and later, IOS XE 16.10 and later, as well as NX-OS 7.x and later platforms.
The TCP dialout transport is supported on IOS XR (32-bit and 64-bit) 6.1.x and later.
### Configuration:
This is a sample configuration for the plugin.
```toml
[[inputs.cisco_telemetry_mdt]]
## Telemetry transport (one of: tcp, grpc)
transport = "grpc"
## Address and port to host telemetry listener
service_address = ":57000"
## Enable TLS for GRPC transport
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Enable TLS client authentication and define allowed CA certificates
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
## Define aliases to map telemetry encoding paths to simple measurement names
[inputs.cisco_telemetry_mdt.aliases]
ifstats = "ietf-interfaces:interfaces-state/interface/statistics"
```
### Example Output:
```
ifstats,path=ietf-interfaces:interfaces-state/interface/statistics,host=linux,name=GigabitEthernet2,source=csr1kv,subscription=101 in-unicast-pkts=27i,in-multicast-pkts=0i,discontinuity-time="2019-05-23T07:40:23.000362+00:00",in-octets=5233i,in-errors=0i,out-multicast-pkts=0i,out-discards=0i,in-broadcast-pkts=0i,in-discards=0i,in-unknown-protos=0i,out-unicast-pkts=0i,out-broadcast-pkts=0i,out-octets=0i,out-errors=0i 1559150462624000000
ifstats,path=ietf-interfaces:interfaces-state/interface/statistics,host=linux,name=GigabitEthernet1,source=csr1kv,subscription=101 in-octets=3394770806i,in-broadcast-pkts=0i,in-multicast-pkts=0i,out-broadcast-pkts=0i,in-unknown-protos=0i,out-octets=350212i,in-unicast-pkts=9477273i,in-discards=0i,out-unicast-pkts=2726i,out-discards=0i,discontinuity-time="2019-05-23T07:40:23.000363+00:00",in-errors=30i,out-multicast-pkts=0i,out-errors=0i 1559150462624000000
```

View File

@@ -0,0 +1,391 @@
package cisco_telemetry_mdt
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"log"
"net"
"strings"
"sync"
"time"
dialout "github.com/cisco-ie/nx-telemetry-proto/mdt_dialout"
telemetry "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis"
"github.com/golang/protobuf/proto"
"github.com/influxdata/telegraf"
internaltls "github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/peer"
)
const (
// Maximum telemetry payload size (in bytes) to accept for GRPC dialout transport
tcpMaxMsgLen uint32 = 1024 * 1024
)
// CiscoTelemetryMDT plugin for IOS XR, IOS XE and NXOS platforms
type CiscoTelemetryMDT struct {
// Common configuration
Transport string
ServiceAddress string `toml:"service_address"`
MaxMsgSize int `toml:"max_msg_size"`
Aliases map[string]string `toml:"aliases"`
// GRPC TLS settings
internaltls.ServerConfig
// Internal listener / client handle
grpcServer *grpc.Server
listener net.Listener
// Internal state
aliases map[string]string
acc telegraf.Accumulator
wg sync.WaitGroup
}
// Start the Cisco MDT service
func (c *CiscoTelemetryMDT) Start(acc telegraf.Accumulator) error {
var err error
c.acc = acc
c.listener, err = net.Listen("tcp", c.ServiceAddress)
if err != nil {
return err
}
// Invert aliases list
c.aliases = make(map[string]string, len(c.Aliases))
for alias, path := range c.Aliases {
c.aliases[path] = alias
}
switch c.Transport {
case "tcp":
// TCP dialout server accept routine
c.wg.Add(1)
go func() {
c.acceptTCPClients()
c.wg.Done()
}()
case "grpc":
var opts []grpc.ServerOption
tlsConfig, err := c.ServerConfig.TLSConfig()
if err != nil {
return err
} else if tlsConfig != nil {
opts = append(opts, grpc.Creds(credentials.NewTLS(tlsConfig)))
}
if c.MaxMsgSize > 0 {
opts = append(opts, grpc.MaxRecvMsgSize(c.MaxMsgSize))
}
c.grpcServer = grpc.NewServer(opts...)
dialout.RegisterGRPCMdtDialoutServer(c.grpcServer, c)
c.wg.Add(1)
go func() {
c.grpcServer.Serve(c.listener)
c.wg.Done()
}()
default:
c.listener.Close()
return fmt.Errorf("invalid Cisco MDT transport: %s", c.Transport)
}
return nil
}
// AcceptTCPDialoutClients defines the TCP dialout server main routine
func (c *CiscoTelemetryMDT) acceptTCPClients() {
// Keep track of all active connections, so we can close them if necessary
var mutex sync.Mutex
clients := make(map[net.Conn]struct{})
for {
conn, err := c.listener.Accept()
if neterr, ok := err.(*net.OpError); ok && (neterr.Timeout() || neterr.Temporary()) {
continue
} else if err != nil {
break // Stop() will close the connection so Accept() will fail here
}
mutex.Lock()
clients[conn] = struct{}{}
mutex.Unlock()
// Individual client connection routine
c.wg.Add(1)
go func() {
log.Printf("D! [inputs.cisco_telemetry_mdt]: Accepted Cisco MDT TCP dialout connection from %s", conn.RemoteAddr())
if err := c.handleTCPClient(conn); err != nil {
c.acc.AddError(err)
}
log.Printf("D! [inputs.cisco_telemetry_mdt]: Closed Cisco MDT TCP dialout connection from %s", conn.RemoteAddr())
mutex.Lock()
delete(clients, conn)
mutex.Unlock()
conn.Close()
c.wg.Done()
}()
}
// Close all remaining client connections
mutex.Lock()
for client := range clients {
if err := client.Close(); err != nil {
log.Printf("E! [inputs.cisco_telemetry_mdt]: Failed to close TCP dialout client: %v", err)
}
}
mutex.Unlock()
}
// Handle a TCP telemetry client
func (c *CiscoTelemetryMDT) handleTCPClient(conn net.Conn) error {
// TCP Dialout telemetry framing header
var hdr struct {
MsgType uint16
MsgEncap uint16
MsgHdrVersion uint16
MsgFlags uint16
MsgLen uint32
}
var payload bytes.Buffer
for {
// Read and validate dialout telemetry header
if err := binary.Read(conn, binary.BigEndian, &hdr); err != nil {
return err
}
maxMsgSize := tcpMaxMsgLen
if c.MaxMsgSize > 0 {
maxMsgSize = uint32(c.MaxMsgSize)
}
if hdr.MsgLen > maxMsgSize {
return fmt.Errorf("dialout packet too long: %v", hdr.MsgLen)
} else if hdr.MsgFlags != 0 {
return fmt.Errorf("invalid dialout flags: %v", hdr.MsgFlags)
}
// Read and handle telemetry packet
payload.Reset()
if size, err := payload.ReadFrom(io.LimitReader(conn, int64(hdr.MsgLen))); size != int64(hdr.MsgLen) {
if err != nil {
return err
}
return fmt.Errorf("TCP dialout premature EOF")
}
c.handleTelemetry(payload.Bytes())
}
}
// MdtDialout RPC server method for grpc-dialout transport
func (c *CiscoTelemetryMDT) MdtDialout(stream dialout.GRPCMdtDialout_MdtDialoutServer) error {
peer, peerOK := peer.FromContext(stream.Context())
if peerOK {
log.Printf("D! [inputs.cisco_telemetry_mdt]: Accepted Cisco MDT GRPC dialout connection from %s", peer.Addr)
}
for {
packet, err := stream.Recv()
if err != nil {
if err != io.EOF {
c.acc.AddError(fmt.Errorf("GRPC dialout receive error: %v", err))
}
break
}
if len(packet.Data) == 0 && len(packet.Errors) != 0 {
c.acc.AddError(fmt.Errorf("GRPC dialout error: %s", packet.Errors))
break
}
c.handleTelemetry(packet.Data)
}
if peerOK {
log.Printf("D! [inputs.cisco_telemetry_mdt]: Closed Cisco MDT GRPC dialout connection from %s", peer.Addr)
}
return nil
}
// Handle telemetry packet from any transport, decode and add as measurement
func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) {
var namebuf bytes.Buffer
telemetry := &telemetry.Telemetry{}
err := proto.Unmarshal(data, telemetry)
if err != nil {
c.acc.AddError(fmt.Errorf("Cisco MDT failed to decode: %v", err))
return
}
for _, gpbkv := range telemetry.DataGpbkv {
var fields map[string]interface{}
// Produce metadata tags
var tags map[string]string
// Top-level field may have measurement timestamp, if not use message timestamp
measured := gpbkv.Timestamp
if measured == 0 {
measured = telemetry.MsgTimestamp
}
timestamp := time.Unix(int64(measured/1000), int64(measured%1000)*1000000)
// Populate tags and fields from toplevel GPBKV fields "keys" and "content"
for _, field := range gpbkv.Fields {
switch field.Name {
case "keys":
tags = make(map[string]string, len(field.Fields)+2)
tags["source"] = telemetry.GetNodeIdStr()
tags["subscription"] = telemetry.GetSubscriptionIdStr()
for _, subfield := range field.Fields {
c.parseGPBKVField(subfield, &namebuf, telemetry.EncodingPath, timestamp, tags, nil)
}
case "content":
fields = make(map[string]interface{}, len(field.Fields))
for _, subfield := range field.Fields {
c.parseGPBKVField(subfield, &namebuf, telemetry.EncodingPath, timestamp, tags, fields)
}
default:
log.Printf("I! [inputs.cisco_telemetry_mdt]: Unexpected top-level MDT field: %s", field.Name)
}
}
// Find best alias for encoding path and emit measurement
if len(fields) > 0 && len(tags) > 0 && len(telemetry.EncodingPath) > 0 {
name := telemetry.EncodingPath
if alias, ok := c.aliases[name]; ok {
tags["path"] = name
name = alias
} else {
log.Printf("D! [inputs.cisco_telemetry_mdt]: No measurement alias for encoding path: %s", name)
}
c.acc.AddFields(name, fields, tags, timestamp)
} else {
c.acc.AddError(fmt.Errorf("empty encoding path or measurement"))
}
}
}
// Recursively parse GPBKV field structure into fields or tags
func (c *CiscoTelemetryMDT) parseGPBKVField(field *telemetry.TelemetryField, namebuf *bytes.Buffer,
path string, timestamp time.Time, tags map[string]string, fields map[string]interface{}) {
namelen := namebuf.Len()
if namelen > 0 {
namebuf.WriteRune('/')
}
namebuf.WriteString(strings.Replace(field.Name, "-", "_", -1))
// Decode Telemetry field value if set
var value interface{}
switch val := field.ValueByType.(type) {
case *telemetry.TelemetryField_BytesValue:
value = val.BytesValue
case *telemetry.TelemetryField_StringValue:
value = val.StringValue
case *telemetry.TelemetryField_BoolValue:
value = val.BoolValue
case *telemetry.TelemetryField_Uint32Value:
value = val.Uint32Value
case *telemetry.TelemetryField_Uint64Value:
value = val.Uint64Value
case *telemetry.TelemetryField_Sint32Value:
value = val.Sint32Value
case *telemetry.TelemetryField_Sint64Value:
value = val.Sint64Value
case *telemetry.TelemetryField_DoubleValue:
value = val.DoubleValue
case *telemetry.TelemetryField_FloatValue:
value = val.FloatValue
}
if value != nil {
// Distinguish between tags (keys) and fields (data) to write to
if fields != nil {
fields[namebuf.String()] = value
} else {
if _, exists := tags[field.Name]; !exists { // Use short keys whenever possible
tags[field.Name] = fmt.Sprint(value)
} else {
tags[namebuf.String()] = fmt.Sprint(value)
}
}
}
for _, subfield := range field.Fields {
c.parseGPBKVField(subfield, namebuf, path, timestamp, tags, fields)
}
namebuf.Truncate(namelen)
}
// Stop listener and cleanup
func (c *CiscoTelemetryMDT) Stop() {
if c.grpcServer != nil {
// Stop server and terminate all running dialout routines
c.grpcServer.Stop()
}
if c.listener != nil {
c.listener.Close()
}
c.wg.Wait()
}
const sampleConfig = `
## Telemetry transport (one of: tcp, grpc)
transport = "grpc"
## Address and port to host telemetry listener
service_address = ":57000"
## Enable TLS for GRPC transport
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Enable TLS client authentication and define allowed CA certificates
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
## Define aliases to map telemetry encoding paths to simple measurement names
[inputs.cisco_telemetry_mdt.aliases]
ifstats = "ietf-interfaces:interfaces-state/interface/statistics"
`
// SampleConfig of plugin
func (c *CiscoTelemetryMDT) SampleConfig() string {
return sampleConfig
}
// Description of plugin
func (c *CiscoTelemetryMDT) Description() string {
return "Cisco model-driven telemetry (MDT) input plugin for IOS XR, IOS XE and NX-OS platforms"
}
// Gather plugin measurements (unused)
func (c *CiscoTelemetryMDT) Gather(_ telegraf.Accumulator) error {
return nil
}
func init() {
inputs.Add("cisco_telemetry_mdt", func() telegraf.Input {
return &CiscoTelemetryMDT{
Transport: "grpc",
ServiceAddress: "127.0.0.1:57000",
}
})
}

View File

@@ -0,0 +1,362 @@
package cisco_telemetry_mdt
import (
"context"
"encoding/binary"
"errors"
"net"
"testing"
"github.com/golang/protobuf/proto"
dialout "github.com/cisco-ie/nx-telemetry-proto/mdt_dialout"
telemetry "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)
func TestHandleTelemetryEmpty(t *testing.T) {
c := &CiscoTelemetryMDT{Transport: "dummy"}
acc := &testutil.Accumulator{}
c.Start(acc)
telemetry := &telemetry.Telemetry{
DataGpbkv: []*telemetry.TelemetryField{
{},
},
}
data, _ := proto.Marshal(telemetry)
c.handleTelemetry(data)
assert.Contains(t, acc.Errors, errors.New("empty encoding path or measurement"))
assert.Empty(t, acc.Metrics)
}
func TestHandleTelemetryTwoSimple(t *testing.T) {
c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"alias": "type:model/some/path"}}
acc := &testutil.Accumulator{}
c.Start(acc)
telemetry := &telemetry.Telemetry{
MsgTimestamp: 1543236572000,
EncodingPath: "type:model/some/path",
NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
DataGpbkv: []*telemetry.TelemetryField{
{
Fields: []*telemetry.TelemetryField{
{
Name: "keys",
Fields: []*telemetry.TelemetryField{
{
Name: "name",
ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "str"},
},
{
Name: "uint64",
ValueByType: &telemetry.TelemetryField_Uint64Value{Uint64Value: 1234},
},
},
},
{
Name: "content",
Fields: []*telemetry.TelemetryField{
{
Name: "bool",
ValueByType: &telemetry.TelemetryField_BoolValue{BoolValue: true},
},
},
},
},
},
{
Fields: []*telemetry.TelemetryField{
{
Name: "keys",
Fields: []*telemetry.TelemetryField{
{
Name: "name",
ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "str2"},
},
},
},
{
Name: "content",
Fields: []*telemetry.TelemetryField{
{
Name: "bool",
ValueByType: &telemetry.TelemetryField_BoolValue{BoolValue: false},
},
},
},
},
},
},
}
data, _ := proto.Marshal(telemetry)
c.handleTelemetry(data)
assert.Empty(t, acc.Errors)
tags := map[string]string{"path": "type:model/some/path", "name": "str", "uint64": "1234", "source": "hostname", "subscription": "subscription"}
fields := map[string]interface{}{"bool": true}
acc.AssertContainsTaggedFields(t, "alias", fields, tags)
tags = map[string]string{"path": "type:model/some/path", "name": "str2", "source": "hostname", "subscription": "subscription"}
fields = map[string]interface{}{"bool": false}
acc.AssertContainsTaggedFields(t, "alias", fields, tags)
}
func TestHandleTelemetrySingleNested(t *testing.T) {
c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"nested": "type:model/nested/path"}}
acc := &testutil.Accumulator{}
c.Start(acc)
telemetry := &telemetry.Telemetry{
MsgTimestamp: 1543236572000,
EncodingPath: "type:model/nested/path",
NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
DataGpbkv: []*telemetry.TelemetryField{
{
Fields: []*telemetry.TelemetryField{
{
Name: "keys",
Fields: []*telemetry.TelemetryField{
{
Name: "nested",
Fields: []*telemetry.TelemetryField{
{
Name: "key",
Fields: []*telemetry.TelemetryField{
{
Name: "level",
ValueByType: &telemetry.TelemetryField_DoubleValue{DoubleValue: 3},
},
},
},
},
},
},
},
{
Name: "content",
Fields: []*telemetry.TelemetryField{
{
Name: "nested",
Fields: []*telemetry.TelemetryField{
{
Name: "value",
Fields: []*telemetry.TelemetryField{
{
Name: "foo",
ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"},
},
},
},
},
},
},
},
},
},
},
}
data, _ := proto.Marshal(telemetry)
c.handleTelemetry(data)
assert.Empty(t, acc.Errors)
tags := map[string]string{"path": "type:model/nested/path", "level": "3", "source": "hostname", "subscription": "subscription"}
fields := map[string]interface{}{"nested/value/foo": "bar"}
acc.AssertContainsTaggedFields(t, "nested", fields, tags)
}
func TestTCPDialoutOverflow(t *testing.T) {
c := &CiscoTelemetryMDT{Transport: "tcp", ServiceAddress: "127.0.0.1:57000"}
acc := &testutil.Accumulator{}
assert.Nil(t, c.Start(acc))
hdr := struct {
MsgType uint16
MsgEncap uint16
MsgHdrVersion uint16
MsgFlags uint16
MsgLen uint32
}{MsgLen: uint32(1000000000)}
conn, _ := net.Dial("tcp", "127.0.0.1:57000")
binary.Write(conn, binary.BigEndian, hdr)
conn.Read([]byte{0})
conn.Close()
c.Stop()
assert.Contains(t, acc.Errors, errors.New("dialout packet too long: 1000000000"))
}
func mockTelemetryMessage() *telemetry.Telemetry {
return &telemetry.Telemetry{
MsgTimestamp: 1543236572000,
EncodingPath: "type:model/some/path",
NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
DataGpbkv: []*telemetry.TelemetryField{
{
Fields: []*telemetry.TelemetryField{
{
Name: "keys",
Fields: []*telemetry.TelemetryField{
{
Name: "name",
ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "str"},
},
},
},
{
Name: "content",
Fields: []*telemetry.TelemetryField{
{
Name: "value",
ValueByType: &telemetry.TelemetryField_Sint64Value{Sint64Value: -1},
},
},
},
},
},
},
}
}
func TestTCPDialoutMultiple(t *testing.T) {
c := &CiscoTelemetryMDT{Transport: "tcp", ServiceAddress: "127.0.0.1:57000", Aliases: map[string]string{
"some": "type:model/some/path", "parallel": "type:model/parallel/path", "other": "type:model/other/path"}}
acc := &testutil.Accumulator{}
assert.Nil(t, c.Start(acc))
telemetry := mockTelemetryMessage()
hdr := struct {
MsgType uint16
MsgEncap uint16
MsgHdrVersion uint16
MsgFlags uint16
MsgLen uint32
}{}
conn, _ := net.Dial("tcp", "127.0.0.1:57000")
data, _ := proto.Marshal(telemetry)
hdr.MsgLen = uint32(len(data))
binary.Write(conn, binary.BigEndian, hdr)
conn.Write(data)
conn2, _ := net.Dial("tcp", "127.0.0.1:57000")
telemetry.EncodingPath = "type:model/parallel/path"
data, _ = proto.Marshal(telemetry)
hdr.MsgLen = uint32(len(data))
binary.Write(conn2, binary.BigEndian, hdr)
conn2.Write(data)
conn2.Write([]byte{0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0})
conn2.Read([]byte{0})
conn2.Close()
telemetry.EncodingPath = "type:model/other/path"
data, _ = proto.Marshal(telemetry)
hdr.MsgLen = uint32(len(data))
binary.Write(conn, binary.BigEndian, hdr)
conn.Write(data)
conn.Write([]byte{0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0})
conn.Read([]byte{0})
c.Stop()
conn.Close()
// We use the invalid dialout flags to let the server close the connection
assert.Equal(t, acc.Errors, []error{errors.New("invalid dialout flags: 257"), errors.New("invalid dialout flags: 257")})
tags := map[string]string{"path": "type:model/some/path", "name": "str", "source": "hostname", "subscription": "subscription"}
fields := map[string]interface{}{"value": int64(-1)}
acc.AssertContainsTaggedFields(t, "some", fields, tags)
tags = map[string]string{"path": "type:model/parallel/path", "name": "str", "source": "hostname", "subscription": "subscription"}
fields = map[string]interface{}{"value": int64(-1)}
acc.AssertContainsTaggedFields(t, "parallel", fields, tags)
tags = map[string]string{"path": "type:model/other/path", "name": "str", "source": "hostname", "subscription": "subscription"}
fields = map[string]interface{}{"value": int64(-1)}
acc.AssertContainsTaggedFields(t, "other", fields, tags)
}
func TestGRPCDialoutError(t *testing.T) {
c := &CiscoTelemetryMDT{Transport: "grpc", ServiceAddress: "127.0.0.1:57001"}
acc := &testutil.Accumulator{}
assert.Nil(t, c.Start(acc))
conn, _ := grpc.Dial("127.0.0.1:57001", grpc.WithInsecure())
client := dialout.NewGRPCMdtDialoutClient(conn)
stream, _ := client.MdtDialout(context.Background())
args := &dialout.MdtDialoutArgs{Errors: "foobar"}
stream.Send(args)
// Wait for the server to close
stream.Recv()
c.Stop()
assert.Equal(t, acc.Errors, []error{errors.New("GRPC dialout error: foobar")})
}
func TestGRPCDialoutMultiple(t *testing.T) {
c := &CiscoTelemetryMDT{Transport: "grpc", ServiceAddress: "127.0.0.1:57001", Aliases: map[string]string{
"some": "type:model/some/path", "parallel": "type:model/parallel/path", "other": "type:model/other/path"}}
acc := &testutil.Accumulator{}
assert.Nil(t, c.Start(acc))
telemetry := mockTelemetryMessage()
conn, _ := grpc.Dial("127.0.0.1:57001", grpc.WithInsecure(), grpc.WithBlock())
client := dialout.NewGRPCMdtDialoutClient(conn)
stream, _ := client.MdtDialout(context.TODO())
data, _ := proto.Marshal(telemetry)
args := &dialout.MdtDialoutArgs{Data: data, ReqId: 456}
stream.Send(args)
conn2, _ := grpc.Dial("127.0.0.1:57001", grpc.WithInsecure(), grpc.WithBlock())
client2 := dialout.NewGRPCMdtDialoutClient(conn2)
stream2, _ := client2.MdtDialout(context.TODO())
telemetry.EncodingPath = "type:model/parallel/path"
data, _ = proto.Marshal(telemetry)
args = &dialout.MdtDialoutArgs{Data: data}
stream2.Send(args)
stream2.Send(&dialout.MdtDialoutArgs{Errors: "testclose"})
stream2.Recv()
conn2.Close()
telemetry.EncodingPath = "type:model/other/path"
data, _ = proto.Marshal(telemetry)
args = &dialout.MdtDialoutArgs{Data: data}
stream.Send(args)
stream.Send(&dialout.MdtDialoutArgs{Errors: "testclose"})
stream.Recv()
c.Stop()
conn.Close()
assert.Equal(t, acc.Errors, []error{errors.New("GRPC dialout error: testclose"), errors.New("GRPC dialout error: testclose")})
tags := map[string]string{"path": "type:model/some/path", "name": "str", "source": "hostname", "subscription": "subscription"}
fields := map[string]interface{}{"value": int64(-1)}
acc.AssertContainsTaggedFields(t, "some", fields, tags)
tags = map[string]string{"path": "type:model/parallel/path", "name": "str", "source": "hostname", "subscription": "subscription"}
fields = map[string]interface{}{"value": int64(-1)}
acc.AssertContainsTaggedFields(t, "parallel", fields, tags)
tags = map[string]string{"path": "type:model/other/path", "name": "str", "source": "hostname", "subscription": "subscription"}
fields = map[string]interface{}{"value": int64(-1)}
acc.AssertContainsTaggedFields(t, "other", fields, tags)
}