Add Arista LANZ consumer input plugin (#4112)

This commit is contained in:
Tim Hughes
2020-03-12 23:45:35 +00:00
committed by GitHub
parent 42804b7c56
commit e6f0644128
7 changed files with 370 additions and 0 deletions

View File

@@ -80,6 +80,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/kinesis_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/kube_inventory"
_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
_ "github.com/influxdata/telegraf/plugins/inputs/lanz"
_ "github.com/influxdata/telegraf/plugins/inputs/leofs"
_ "github.com/influxdata/telegraf/plugins/inputs/linux_sysctl_fs"
_ "github.com/influxdata/telegraf/plugins/inputs/logparser"

View File

@@ -0,0 +1,87 @@
# Arista LANZ Consumer Input Plugin
This plugin provides a consumer for use with Arista Networks' Latency Analyzer (LANZ).
Metrics are read from a stream of data via TCP through port 50001 on the
switch's management IP. The data is in Protocol Buffers format. For more information on Arista LANZ, see:
- https://www.arista.com/en/um-eos/eos-latency-analyzer-lanz
This plugin uses Arista's SDK:
- https://github.com/aristanetworks/goarista
### Configuration
You will need to configure LANZ and enable streaming LANZ data.
- https://www.arista.com/en/um-eos/eos-section-44-3-configuring-lanz
- https://www.arista.com/en/um-eos/eos-section-44-3-configuring-lanz#ww1149292
```toml
[[inputs.lanz]]
servers = [
"tcp://switch1.int.example.com:50001",
"tcp://switch2.int.example.com:50001",
]
```
### Metrics
For more details on the metrics see https://github.com/aristanetworks/goarista/blob/master/lanz/proto/lanz.proto
- lanz_congestion_record:
- tags:
- intf_name
- switch_id
- port_id
- entry_type
- traffic_class
- fabric_peer_intf_name
- source
- port
- fields:
- timestamp (integer)
- queue_size (integer)
- time_of_max_qlen (integer)
- tx_latency (integer)
- q_drop_count (integer)
- lanz_global_buffer_usage_record:
- tags:
- entry_type
- source
- port
- fields:
- timestamp (integer)
- buffer_size (integer)
- duration (integer)
### Sample Queries
Get the max tx_latency for the last hour for all interfaces on all switches.
```
SELECT max("tx_latency") AS "max_tx_latency" FROM "lanz_congestion_record" WHERE time > now() - 1h GROUP BY time(10s), "source", "intf_name"
```
Get the max queue_size for the last hour for all interfaces on all switches.
```
SELECT max("queue_size") AS "max_queue_size" FROM "lanz_congestion_record" WHERE time > now() - 1h GROUP BY time(10s), "source", "intf_name"
```
Get the max buffer_size over the last hour for all switches.
```
SELECT max("buffer_size") AS "max_buffer_size" FROM "lanz_global_buffer_usage_record" WHERE time > now() - 1h GROUP BY time(10s), "source"
```
### Example output
```
lanz_global_buffer_usage_record,entry_type=2,host=telegraf.int.example.com,port=50001,source=switch01.int.example.com timestamp=158334105824919i,buffer_size=505i,duration=0i 1583341058300643815
lanz_congestion_record,entry_type=2,host=telegraf.int.example.com,intf_name=Ethernet36,port=50001,port_id=61,source=switch01.int.example.com,switch_id=0,traffic_class=1 time_of_max_qlen=0i,tx_latency=564480i,q_drop_count=0i,timestamp=158334105824919i,queue_size=225i 1583341058300636045
lanz_global_buffer_usage_record,entry_type=2,host=telegraf.int.example.com,port=50001,source=switch01.int.example.com timestamp=158334105824919i,buffer_size=589i,duration=0i 1583341058300457464
lanz_congestion_record,entry_type=1,host=telegraf.int.example.com,intf_name=Ethernet36,port=50001,port_id=61,source=switch01.int.example.com,switch_id=0,traffic_class=1 q_drop_count=0i,timestamp=158334105824919i,queue_size=232i,time_of_max_qlen=0i,tx_latency=584640i 1583341058300450302
```

137
plugins/inputs/lanz/lanz.go Normal file
View File

@@ -0,0 +1,137 @@
package lanz
import (
"net/url"
"strconv"
"sync"
"time"
"github.com/aristanetworks/goarista/lanz"
pb "github.com/aristanetworks/goarista/lanz/proto"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
// sampleConfig is the example TOML configuration returned by SampleConfig.
var sampleConfig = `
## URL to Arista LANZ endpoint
servers = [
"tcp://127.0.0.1:50001"
]
`
func init() {
inputs.Add("lanz", func() telegraf.Input {
return NewLanz()
})
}
// Lanz is a telegraf input that reads Arista LANZ records from one or more
// servers and turns them into metrics.
type Lanz struct {
// Servers lists the LANZ endpoints as URLs, for example "tcp://127.0.0.1:50001".
Servers []string `toml:"servers"`
// clients holds one LANZ client per server; Start appends to it and Stop
// stops every entry.
clients []lanz.Client
// wg is incremented by Start for each receiver goroutine and waited on by Stop.
wg sync.WaitGroup
}
// NewLanz returns a zero-valued Lanz plugin with no servers configured.
func NewLanz() *Lanz {
	return new(Lanz)
}
// SampleConfig returns the example configuration for this plugin.
func (l *Lanz) SampleConfig() string {
return sampleConfig
}
// Description returns a one-line summary of what the plugin does.
func (l *Lanz) Description() string {
return "Read metrics off Arista LANZ, via socket"
}
// Gather does nothing and always returns nil; metrics are added to the
// accumulator by the goroutines launched in Start rather than on each
// collection interval.
func (l *Lanz) Gather(acc telegraf.Accumulator) error {
return nil
}
// Start launches one LANZ client per configured server, together with a
// goroutine that forwards the records that client produces to acc. When no
// servers are configured, "tcp://127.0.0.1:50001" is used.
//
// Every server entry is parsed before any client is created. If an entry is
// not a valid URL, the parse error is returned and nothing is started.
func (l *Lanz) Start(acc telegraf.Accumulator) error {
	if len(l.Servers) == 0 {
		l.Servers = append(l.Servers, "tcp://127.0.0.1:50001")
	}

	// Validate everything up front so a bad entry cannot leave the clients of
	// earlier entries running after Start has reported failure.
	deviceURLs := make([]*url.URL, 0, len(l.Servers))
	for _, server := range l.Servers {
		deviceURL, err := url.Parse(server)
		if err != nil {
			return err
		}
		deviceURLs = append(deviceURLs, deviceURL)
	}

	for _, deviceURL := range deviceURLs {
		// Per-iteration copy: the goroutine below captures this variable, and
		// before Go 1.22 the range variable is shared across iterations.
		deviceURL := deviceURL

		client := lanz.New(
			lanz.WithAddr(deviceURL.Host),
			lanz.WithBackoff(1*time.Second),
			lanz.WithTimeout(10*time.Second),
		)
		l.clients = append(l.clients, client)

		in := make(chan *pb.LanzRecord)
		go client.Run(in)

		l.wg.Add(1)
		go func() {
			// Signal completion only after receive has returned, so that
			// Stop's Wait really blocks until the receiver is finished.
			// receive returns when the channel is closed.
			// NOTE(review): this relies on the LANZ client closing the channel
			// once it has been stopped — confirm against the goarista client.
			defer l.wg.Done()
			receive(acc, in, deviceURL)
		}()
	}
	return nil
}
// Stop tells every client created by Start to stop and then blocks until the
// plugin's wait group is released.
func (l *Lanz) Stop() {
	for i := range l.clients {
		l.clients[i].Stop()
	}
	l.wg.Wait()
}
// receive passes every record read from in to msgToAccumulator, along with
// acc and deviceUrl. It blocks until in has been closed and drained, then
// returns.
func receive(acc telegraf.Accumulator, in <-chan *pb.LanzRecord, deviceUrl *url.URL) {
	for msg := range in {
		msgToAccumulator(acc, msg, deviceUrl)
	}
}
// msgToAccumulator translates one LANZ record into telegraf metrics.
//
// A congestion record, when present, is added as "lanz_congestion_record"; a
// global buffer usage record, when present, is added as
// "lanz_global_buffer_usage_record". Readings are stored as int64 fields,
// identifiers are rendered as base-10 string tags, and the host name and port
// of deviceUrl are attached as the "source" and "port" tags. A record carrying
// neither payload adds nothing to acc.
func msgToAccumulator(acc telegraf.Accumulator, msg *pb.LanzRecord, deviceUrl *url.URL) {
	// decimal renders an integer identifier as a base-10 tag value.
	decimal := func(v int64) string { return strconv.FormatInt(v, 10) }

	if congestion := msg.GetCongestionRecord(); congestion != nil {
		fields := map[string]interface{}{
			"timestamp":        int64(congestion.GetTimestamp()),
			"queue_size":       int64(congestion.GetQueueSize()),
			"time_of_max_qlen": int64(congestion.GetTimeOfMaxQLen()),
			"tx_latency":       int64(congestion.GetTxLatency()),
			"q_drop_count":     int64(congestion.GetQDropCount()),
		}
		labels := map[string]string{
			"intf_name":             congestion.GetIntfName(),
			"switch_id":             decimal(int64(congestion.GetSwitchId())),
			"port_id":               decimal(int64(congestion.GetPortId())),
			"entry_type":            decimal(int64(congestion.GetEntryType())),
			"traffic_class":         decimal(int64(congestion.GetTrafficClass())),
			"fabric_peer_intf_name": congestion.GetFabricPeerIntfName(),
			"source":                deviceUrl.Hostname(),
			"port":                  deviceUrl.Port(),
		}
		acc.AddFields("lanz_congestion_record", fields, labels)
	}

	if usage := msg.GetGlobalBufferUsageRecord(); usage != nil {
		fields := map[string]interface{}{
			"timestamp":   int64(usage.GetTimestamp()),
			"buffer_size": int64(usage.GetBufferSize()),
			"duration":    int64(usage.GetDuration()),
		}
		labels := map[string]string{
			"entry_type": decimal(int64(usage.GetEntryType())),
			"source":     deviceUrl.Hostname(),
			"port":       deviceUrl.Port(),
		}
		acc.AddFields("lanz_global_buffer_usage_record", fields, labels)
	}
}

View File

@@ -0,0 +1,137 @@
package lanz
import (
"net/url"
"strconv"
"testing"
pb "github.com/aristanetworks/goarista/lanz/proto"
"github.com/golang/protobuf/proto"
"github.com/influxdata/telegraf/testutil"
)
// testProtoBufCongestionRecord1 is a LANZ record fixture carrying a congestion
// record for interface "eth1", with every field that msgToAccumulator reads
// set to a known value.
var testProtoBufCongestionRecord1 = &pb.LanzRecord{
CongestionRecord: &pb.CongestionRecord{
Timestamp: proto.Uint64(100000000000000),
IntfName: proto.String("eth1"),
SwitchId: proto.Uint32(1),
PortId: proto.Uint32(1),
QueueSize: proto.Uint32(1),
EntryType: pb.CongestionRecord_EntryType.Enum(1),
TrafficClass: proto.Uint32(1),
TimeOfMaxQLen: proto.Uint64(100000000000000),
TxLatency: proto.Uint32(100),
QDropCount: proto.Uint32(1),
FabricPeerIntfName: proto.String("FabricPeerIntfName1"),
},
}
// testProtoBufCongestionRecord2 is a second congestion record fixture, for
// interface "eth2", whose values all differ from those of
// testProtoBufCongestionRecord1.
var testProtoBufCongestionRecord2 = &pb.LanzRecord{
CongestionRecord: &pb.CongestionRecord{
Timestamp: proto.Uint64(200000000000000),
IntfName: proto.String("eth2"),
SwitchId: proto.Uint32(2),
PortId: proto.Uint32(2),
QueueSize: proto.Uint32(2),
EntryType: pb.CongestionRecord_EntryType.Enum(2),
TrafficClass: proto.Uint32(2),
TimeOfMaxQLen: proto.Uint64(200000000000000),
TxLatency: proto.Uint32(200),
QDropCount: proto.Uint32(2),
FabricPeerIntfName: proto.String("FabricPeerIntfName2"),
},
}
// testProtoBufGlobalBufferUsageRecord is a LANZ record fixture carrying only a
// global buffer usage record.
var testProtoBufGlobalBufferUsageRecord = &pb.LanzRecord{
GlobalBufferUsageRecord: &pb.GlobalBufferUsageRecord{
EntryType: pb.GlobalBufferUsageRecord_EntryType.Enum(1),
Timestamp: proto.Uint64(100000000000000),
BufferSize: proto.Uint32(1),
Duration: proto.Uint32(10),
},
}
// TestLanzGeneratesMetrics feeds the fixture records through msgToAccumulator
// and checks that each one yields the expected measurement name, fields and
// tags, including the "source" and "port" tags taken from the server URL.
func TestLanzGeneratesMetrics(t *testing.T) {
	var acc testutil.Accumulator

	l := NewLanz()
	l.Servers = append(l.Servers, "tcp://switch01.int.example.com:50001")
	l.Servers = append(l.Servers, "tcp://switch02.int.example.com:50001")

	// Abort immediately on a parse failure: carrying on with a nil URL would
	// panic inside msgToAccumulator and hide the real cause.
	deviceUrl1, err := url.Parse(l.Servers[0])
	if err != nil {
		t.Fatalf("parsing %q: %v", l.Servers[0], err)
	}
	deviceUrl2, err := url.Parse(l.Servers[1])
	if err != nil {
		t.Fatalf("parsing %q: %v", l.Servers[1], err)
	}

	// First congestion record, attributed to switch01.
	msgToAccumulator(&acc, testProtoBufCongestionRecord1, deviceUrl1)
	acc.Wait(1)

	vals1 := map[string]interface{}{
		"timestamp":        int64(100000000000000),
		"queue_size":       int64(1),
		"time_of_max_qlen": int64(100000000000000),
		"tx_latency":       int64(100),
		"q_drop_count":     int64(1),
	}
	tags1 := map[string]string{
		"intf_name":             "eth1",
		"switch_id":             strconv.FormatInt(int64(1), 10),
		"port_id":               strconv.FormatInt(int64(1), 10),
		"entry_type":            strconv.FormatInt(int64(1), 10),
		"traffic_class":         strconv.FormatInt(int64(1), 10),
		"fabric_peer_intf_name": "FabricPeerIntfName1",
		"source":                "switch01.int.example.com",
		"port":                  "50001",
	}

	acc.AssertContainsFields(t, "lanz_congestion_record", vals1)
	acc.AssertContainsTaggedFields(t, "lanz_congestion_record", vals1, tags1)

	// Second congestion record, attributed to switch02.
	acc.ClearMetrics()
	msgToAccumulator(&acc, testProtoBufCongestionRecord2, deviceUrl2)
	acc.Wait(1)

	vals2 := map[string]interface{}{
		"timestamp":        int64(200000000000000),
		"queue_size":       int64(2),
		"time_of_max_qlen": int64(200000000000000),
		"tx_latency":       int64(200),
		"q_drop_count":     int64(2),
	}
	tags2 := map[string]string{
		"intf_name":             "eth2",
		"switch_id":             strconv.FormatInt(int64(2), 10),
		"port_id":               strconv.FormatInt(int64(2), 10),
		"entry_type":            strconv.FormatInt(int64(2), 10),
		"traffic_class":         strconv.FormatInt(int64(2), 10),
		"fabric_peer_intf_name": "FabricPeerIntfName2",
		"source":                "switch02.int.example.com",
		"port":                  "50001",
	}

	acc.AssertContainsFields(t, "lanz_congestion_record", vals2)
	acc.AssertContainsTaggedFields(t, "lanz_congestion_record", vals2, tags2)

	// Global buffer usage record, attributed to switch01.
	acc.ClearMetrics()
	msgToAccumulator(&acc, testProtoBufGlobalBufferUsageRecord, deviceUrl1)
	acc.Wait(1)

	gburVals1 := map[string]interface{}{
		"timestamp":   int64(100000000000000),
		"buffer_size": int64(1),
		"duration":    int64(10),
	}
	gburTags1 := map[string]string{
		"entry_type": strconv.FormatInt(int64(1), 10),
		"source":     "switch01.int.example.com",
		"port":       "50001",
	}

	acc.AssertContainsFields(t, "lanz_global_buffer_usage_record", gburVals1)
	acc.AssertContainsTaggedFields(t, "lanz_global_buffer_usage_record", gburVals1, gburTags1)
}