From e6f06441282ffd994fd1b13b2413c024c34cf90b Mon Sep 17 00:00:00 2001 From: Tim Hughes Date: Thu, 12 Mar 2020 23:45:35 +0000 Subject: [PATCH] Add Arista LANZ consumer input plugin (#4112) --- README.md | 1 + go.mod | 2 + go.sum | 5 ++ plugins/inputs/all/all.go | 1 + plugins/inputs/lanz/README.md | 87 ++++++++++++++++++++ plugins/inputs/lanz/lanz.go | 137 +++++++++++++++++++++++++++++++ plugins/inputs/lanz/lanz_test.go | 137 +++++++++++++++++++++++++++++++ 7 files changed, 370 insertions(+) create mode 100644 plugins/inputs/lanz/README.md create mode 100644 plugins/inputs/lanz/lanz.go create mode 100644 plugins/inputs/lanz/lanz_test.go diff --git a/README.md b/README.md index 3fc101154..787ade0af 100644 --- a/README.md +++ b/README.md @@ -225,6 +225,7 @@ For documentation on the latest development code see the [documentation index][d * [kibana](./plugins/inputs/kibana) * [kubernetes](./plugins/inputs/kubernetes) * [kube_inventory](./plugins/inputs/kube_inventory) +* [lanz](./plugins/inputs/lanz) * [leofs](./plugins/inputs/leofs) * [linux_sysctl_fs](./plugins/inputs/linux_sysctl_fs) * [logparser](./plugins/inputs/logparser) diff --git a/go.mod b/go.mod index 03fb8d9e9..062e70f50 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,8 @@ require ( github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9 github.com/apache/thrift v0.12.0 + github.com/aristanetworks/glog v0.0.0-20191112221043-67e8567f59f3 // indirect + github.com/aristanetworks/goarista v0.0.0-20190325233358-a123909ec740 github.com/armon/go-metrics v0.3.0 // indirect github.com/aws/aws-sdk-go v1.19.41 github.com/bitly/go-hostpool v0.1.0 // indirect diff --git a/go.sum b/go.sum index 719043dbd..a0d8fd479 100644 --- a/go.sum +++ b/go.sum @@ -60,6 +60,10 @@ github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9 h1:FXrPTd8Rdlc94dKccl github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9/go.mod h1:eliMa/PW+RDr2QLWRmLH1R1ZA4RInpmvOzDDXtaIZkc= github.com/apache/thrift v0.12.0 h1:pODnxUFNcjP9UTLZGTdeh+j16A8lJbRvD3rOtrk/7bs= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= +github.com/aristanetworks/glog v0.0.0-20191112221043-67e8567f59f3 h1:Bmjk+DjIi3tTAU0wxGaFbfjGUqlxxSXARq9A96Kgoos= +github.com/aristanetworks/glog v0.0.0-20191112221043-67e8567f59f3/go.mod h1:KASm+qXFKs/xjSoWn30NrWBBvdTTQq+UjkhjEJHfSFA= +github.com/aristanetworks/goarista v0.0.0-20190325233358-a123909ec740 h1:FD4/ikKOFxwP8muWDypbmBWc634+YcAs3eBrYAmRdZY= +github.com/aristanetworks/goarista v0.0.0-20190325233358-a123909ec740/go.mod h1:D/tb0zPVXnP7fmsLZjtdUhSsumbK/ij54UXjjVgMGxQ= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-metrics v0.3.0 h1:B7AQgHi8QSEi4uHu7Sbsga+IJDU+CENgjxoo81vDUqU= github.com/armon/go-metrics v0.3.0/go.mod h1:zXjbSimjXTd7vOpY8B0/2LpvNvDoXBuplAD+gJD3GYs= @@ -538,6 +542,7 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 h1:z99zHgr7hKfrUcX/KsoJk5 golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/time v0.0.0-20181108054448-85acf8d2951c h1:fqgJT0MGcGpPgpWU7VRdRjuArfcOvC4AoJmILihzhDg= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools 
v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go
index ace0d0044..6624053df 100644
--- a/plugins/inputs/all/all.go
+++ b/plugins/inputs/all/all.go
@@ -80,6 +80,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/inputs/kinesis_consumer"
 	_ "github.com/influxdata/telegraf/plugins/inputs/kube_inventory"
 	_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
+	_ "github.com/influxdata/telegraf/plugins/inputs/lanz"
 	_ "github.com/influxdata/telegraf/plugins/inputs/leofs"
 	_ "github.com/influxdata/telegraf/plugins/inputs/linux_sysctl_fs"
 	_ "github.com/influxdata/telegraf/plugins/inputs/logparser"
diff --git a/plugins/inputs/lanz/README.md b/plugins/inputs/lanz/README.md
new file mode 100644
index 000000000..95fe02a9b
--- /dev/null
+++ b/plugins/inputs/lanz/README.md
@@ -0,0 +1,87 @@
+# Arista LANZ Consumer Input Plugin
+
+This plugin provides a consumer for use with Arista Networks’ Latency Analyzer (LANZ).
+
+Metrics are read from a stream of data via TCP on port 50001 of the
+switch's management IP. The data is in Protocol Buffers format. For more information on Arista LANZ, see:
+
+- https://www.arista.com/en/um-eos/eos-latency-analyzer-lanz
+
+This plugin uses Arista's Go SDK:
+
+- https://github.com/aristanetworks/goarista
+
+### Configuration
+
+You will need to configure LANZ and enable streaming of LANZ data.
+
+- https://www.arista.com/en/um-eos/eos-section-44-3-configuring-lanz
+- https://www.arista.com/en/um-eos/eos-section-44-3-configuring-lanz#ww1149292
+
+```toml
+[[inputs.lanz]]
+  servers = [
+    "tcp://switch1.int.example.com:50001",
+    "tcp://switch2.int.example.com:50001",
+  ]
+```
+
+### Metrics
+
+For more details on the metrics, see https://github.com/aristanetworks/goarista/blob/master/lanz/proto/lanz.proto
+
+- lanz_congestion_record
+  - tags:
+    - intf_name
+    - switch_id
+    - port_id
+    - entry_type
+    - traffic_class
+    - fabric_peer_intf_name
+    - source
+    - port
+  - fields:
+    - timestamp (integer)
+    - queue_size (integer)
+    - time_of_max_qlen (integer)
+    - tx_latency (integer)
+    - q_drop_count (integer)
+
+- lanz_global_buffer_usage_record
+  - tags:
+    - entry_type
+    - source
+    - port
+  - fields:
+    - timestamp (integer)
+    - buffer_size (integer)
+    - duration (integer)
+
+### Sample Queries
+
+Get the max tx_latency for the last hour for all interfaces on all switches.
+```
+SELECT max("tx_latency") AS "max_tx_latency" FROM "lanz_congestion_record" WHERE time > now() - 1h GROUP BY time(10s), "source", "intf_name"
+```
+
+Get the max queue_size for the last hour for all interfaces on all switches.
+```
+SELECT max("queue_size") AS "max_queue_size" FROM "lanz_congestion_record" WHERE time > now() - 1h GROUP BY time(10s), "source", "intf_name"
+```
+
+Get the max buffer_size over the last hour for all switches.
+```
+SELECT max("buffer_size") AS "max_buffer_size" FROM "lanz_global_buffer_usage_record" WHERE time > now() - 1h GROUP BY time(10s), "source"
+```
+
+### Example output
+```
+lanz_global_buffer_usage_record,entry_type=2,host=telegraf.int.example.com,port=50001,source=switch01.int.example.com timestamp=158334105824919i,buffer_size=505i,duration=0i 1583341058300643815
+lanz_congestion_record,entry_type=2,host=telegraf.int.example.com,intf_name=Ethernet36,port=50001,port_id=61,source=switch01.int.example.com,switch_id=0,traffic_class=1 time_of_max_qlen=0i,tx_latency=564480i,q_drop_count=0i,timestamp=158334105824919i,queue_size=225i 1583341058300636045
+lanz_global_buffer_usage_record,entry_type=2,host=telegraf.int.example.com,port=50001,source=switch01.int.example.com timestamp=158334105824919i,buffer_size=589i,duration=0i 1583341058300457464
+lanz_congestion_record,entry_type=1,host=telegraf.int.example.com,intf_name=Ethernet36,port=50001,port_id=61,source=switch01.int.example.com,switch_id=0,traffic_class=1 q_drop_count=0i,timestamp=158334105824919i,queue_size=232i,time_of_max_qlen=0i,tx_latency=584640i 1583341058300450302
+```
+
diff --git a/plugins/inputs/lanz/lanz.go b/plugins/inputs/lanz/lanz.go
new file mode 100644
index 000000000..7553c33c7
--- /dev/null
+++ b/plugins/inputs/lanz/lanz.go
@@ -0,0 +1,137 @@
+package lanz
+
+import (
+	"net/url"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/aristanetworks/goarista/lanz"
+	pb "github.com/aristanetworks/goarista/lanz/proto"
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/plugins/inputs"
+)
+
+var sampleConfig = `
+  ## URL to Arista LANZ endpoint
+  servers = [
+    "tcp://127.0.0.1:50001"
+  ]
+`
+
+func init() {
+	inputs.Add("lanz", func() telegraf.Input {
+		return NewLanz()
+	})
+}
+
+type Lanz struct {
+	Servers []string `toml:"servers"`
+	clients []lanz.Client
+	wg      sync.WaitGroup
+}
+
+func NewLanz() *Lanz {
+	return &Lanz{}
+}
+
+func (l *Lanz) SampleConfig() string {
+	return sampleConfig
+}
+
+func (l *Lanz) Description() string {
+	return "Read metrics off Arista LANZ, via socket"
+}
+
+// Gather is a no-op; this is a service input that pushes metrics from Start.
+func (l *Lanz) Gather(acc telegraf.Accumulator) error {
+	return nil
+}
+
+func (l *Lanz) Start(acc telegraf.Accumulator) error {
+
+	if len(l.Servers) == 0 {
+		l.Servers = append(l.Servers, "tcp://127.0.0.1:50001")
+	}
+
+	for _, server := range l.Servers {
+		deviceUrl, err := url.Parse(server)
+		if err != nil {
+			return err
+		}
+		client := lanz.New(
+			lanz.WithAddr(deviceUrl.Host),
+			lanz.WithBackoff(1*time.Second),
+			lanz.WithTimeout(10*time.Second),
+		)
+		l.clients = append(l.clients, client)
+
+		in := make(chan *pb.LanzRecord)
+		go func() {
+			client.Run(in)
+		}()
+		l.wg.Add(1)
+		go func() {
+			defer l.wg.Done()
+			receive(acc, in, deviceUrl)
+		}()
+	}
+	return nil
+}
+
+func (l *Lanz) Stop() {
+	for _, client := range l.clients {
+		client.Stop()
+	}
+	l.wg.Wait()
+}
+
+func receive(acc telegraf.Accumulator, in <-chan *pb.LanzRecord, deviceUrl *url.URL) {
+	for {
+		select {
+		case msg, ok := <-in:
+			if !ok {
+				return
+			}
+			msgToAccumulator(acc, msg, deviceUrl)
+		}
+	}
+}
+
+func msgToAccumulator(acc telegraf.Accumulator, msg *pb.LanzRecord, deviceUrl *url.URL) {
+	cr := msg.GetCongestionRecord()
+	if cr != nil {
+		vals := map[string]interface{}{
+			"timestamp":        int64(cr.GetTimestamp()),
+			"queue_size":       int64(cr.GetQueueSize()),
+			"time_of_max_qlen": int64(cr.GetTimeOfMaxQLen()),
+			"tx_latency":       int64(cr.GetTxLatency()),
+			"q_drop_count":     int64(cr.GetQDropCount()),
+		}
+		tags := map[string]string{
+			"intf_name": cr.GetIntfName(),
+			
"switch_id": strconv.FormatInt(int64(cr.GetSwitchId()), 10), + "port_id": strconv.FormatInt(int64(cr.GetPortId()), 10), + "entry_type": strconv.FormatInt(int64(cr.GetEntryType()), 10), + "traffic_class": strconv.FormatInt(int64(cr.GetTrafficClass()), 10), + "fabric_peer_intf_name": cr.GetFabricPeerIntfName(), + "source": deviceUrl.Hostname(), + "port": deviceUrl.Port(), + } + acc.AddFields("lanz_congestion_record", vals, tags) + } + + gbur := msg.GetGlobalBufferUsageRecord() + if gbur != nil { + vals := map[string]interface{}{ + "timestamp": int64(gbur.GetTimestamp()), + "buffer_size": int64(gbur.GetBufferSize()), + "duration": int64(gbur.GetDuration()), + } + tags := map[string]string{ + "entry_type": strconv.FormatInt(int64(gbur.GetEntryType()), 10), + "source": deviceUrl.Hostname(), + "port": deviceUrl.Port(), + } + acc.AddFields("lanz_global_buffer_usage_record", vals, tags) + } +} diff --git a/plugins/inputs/lanz/lanz_test.go b/plugins/inputs/lanz/lanz_test.go new file mode 100644 index 000000000..5f9c7ab24 --- /dev/null +++ b/plugins/inputs/lanz/lanz_test.go @@ -0,0 +1,137 @@ +package lanz + +import ( + "net/url" + "strconv" + "testing" + + pb "github.com/aristanetworks/goarista/lanz/proto" + "github.com/golang/protobuf/proto" + "github.com/influxdata/telegraf/testutil" +) + +var testProtoBufCongestionRecord1 = &pb.LanzRecord{ + CongestionRecord: &pb.CongestionRecord{ + Timestamp: proto.Uint64(100000000000000), + IntfName: proto.String("eth1"), + SwitchId: proto.Uint32(1), + PortId: proto.Uint32(1), + QueueSize: proto.Uint32(1), + EntryType: pb.CongestionRecord_EntryType.Enum(1), + TrafficClass: proto.Uint32(1), + TimeOfMaxQLen: proto.Uint64(100000000000000), + TxLatency: proto.Uint32(100), + QDropCount: proto.Uint32(1), + FabricPeerIntfName: proto.String("FabricPeerIntfName1"), + }, +} +var testProtoBufCongestionRecord2 = &pb.LanzRecord{ + CongestionRecord: &pb.CongestionRecord{ + Timestamp: proto.Uint64(200000000000000), + IntfName: proto.String("eth2"), + SwitchId: proto.Uint32(2), + PortId: proto.Uint32(2), + QueueSize: proto.Uint32(2), + EntryType: pb.CongestionRecord_EntryType.Enum(2), + TrafficClass: proto.Uint32(2), + TimeOfMaxQLen: proto.Uint64(200000000000000), + TxLatency: proto.Uint32(200), + QDropCount: proto.Uint32(2), + FabricPeerIntfName: proto.String("FabricPeerIntfName2"), + }, +} + +var testProtoBufGlobalBufferUsageRecord = &pb.LanzRecord{ + GlobalBufferUsageRecord: &pb.GlobalBufferUsageRecord{ + EntryType: pb.GlobalBufferUsageRecord_EntryType.Enum(1), + Timestamp: proto.Uint64(100000000000000), + BufferSize: proto.Uint32(1), + Duration: proto.Uint32(10), + }, +} + +func TestLanzGeneratesMetrics(t *testing.T) { + + var acc testutil.Accumulator + + l := NewLanz() + + l.Servers = append(l.Servers, "tcp://switch01.int.example.com:50001") + l.Servers = append(l.Servers, "tcp://switch02.int.example.com:50001") + deviceUrl1, err := url.Parse(l.Servers[0]) + if err != nil { + t.Fail() + } + deviceUrl2, err := url.Parse(l.Servers[1]) + if err != nil { + t.Fail() + } + + msgToAccumulator(&acc, testProtoBufCongestionRecord1, deviceUrl1) + acc.Wait(1) + + vals1 := map[string]interface{}{ + "timestamp": int64(100000000000000), + "queue_size": int64(1), + "time_of_max_qlen": int64(100000000000000), + "tx_latency": int64(100), + "q_drop_count": int64(1), + } + tags1 := map[string]string{ + "intf_name": "eth1", + "switch_id": strconv.FormatInt(int64(1), 10), + "port_id": strconv.FormatInt(int64(1), 10), + "entry_type": strconv.FormatInt(int64(1), 10), + "traffic_class": 
strconv.FormatInt(int64(1), 10), + "fabric_peer_intf_name": "FabricPeerIntfName1", + "source": "switch01.int.example.com", + "port": "50001", + } + + acc.AssertContainsFields(t, "lanz_congestion_record", vals1) + acc.AssertContainsTaggedFields(t, "lanz_congestion_record", vals1, tags1) + + acc.ClearMetrics() + msgToAccumulator(&acc, testProtoBufCongestionRecord2, deviceUrl2) + acc.Wait(1) + + vals2 := map[string]interface{}{ + "timestamp": int64(200000000000000), + "queue_size": int64(2), + "time_of_max_qlen": int64(200000000000000), + "tx_latency": int64(200), + "q_drop_count": int64(2), + } + tags2 := map[string]string{ + "intf_name": "eth2", + "switch_id": strconv.FormatInt(int64(2), 10), + "port_id": strconv.FormatInt(int64(2), 10), + "entry_type": strconv.FormatInt(int64(2), 10), + "traffic_class": strconv.FormatInt(int64(2), 10), + "fabric_peer_intf_name": "FabricPeerIntfName2", + "source": "switch02.int.example.com", + "port": "50001", + } + + acc.AssertContainsFields(t, "lanz_congestion_record", vals2) + acc.AssertContainsTaggedFields(t, "lanz_congestion_record", vals2, tags2) + + acc.ClearMetrics() + msgToAccumulator(&acc, testProtoBufGlobalBufferUsageRecord, deviceUrl1) + acc.Wait(1) + + gburVals1 := map[string]interface{}{ + "timestamp": int64(100000000000000), + "buffer_size": int64(1), + "duration": int64(10), + } + gburTags1 := map[string]string{ + "entry_type": strconv.FormatInt(int64(1), 10), + "source": "switch01.int.example.com", + "port": "50001", + } + + acc.AssertContainsFields(t, "lanz_global_buffer_usage_record", gburVals1) + acc.AssertContainsTaggedFields(t, "lanz_global_buffer_usage_record", gburVals1, gburTags1) + +}
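
For readers who want to exercise the LANZ stream outside of Telegraf, the sketch below is a minimal standalone illustration (not part of this patch) using the same goarista client calls the plugin's `Start()` relies on (`lanz.New` with `WithAddr`/`WithBackoff`/`WithTimeout`, and `Run` feeding a `*pb.LanzRecord` channel). The switch address is a placeholder, and the printed fields are just a sample of the getters the plugin maps to metric fields.

```go
// Minimal standalone sketch (not part of this patch): stream LANZ records from
// one switch with the same goarista client options the plugin uses, and print
// a few congestion-record fields. The address below is a placeholder.
package main

import (
	"fmt"
	"time"

	"github.com/aristanetworks/goarista/lanz"
	pb "github.com/aristanetworks/goarista/lanz/proto"
)

func main() {
	client := lanz.New(
		lanz.WithAddr("switch01.int.example.com:50001"), // placeholder switch address
		lanz.WithBackoff(1*time.Second),
		lanz.WithTimeout(10*time.Second),
	)

	in := make(chan *pb.LanzRecord)
	go client.Run(in) // streams records into the channel until client.Stop() is called

	for msg := range in {
		if cr := msg.GetCongestionRecord(); cr != nil {
			fmt.Printf("%s queue_size=%d tx_latency=%d\n",
				cr.GetIntfName(), cr.GetQueueSize(), cr.GetTxLatency())
		}
	}
}
```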