Use last path element as field key if path fully specified (#6848)

This commit is contained in:
Daniel Nelson 2020-01-08 10:52:36 -08:00 committed by GitHub
parent 69d9c10572
commit 73488eb61c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 353 additions and 130 deletions

View File

@ -280,11 +280,26 @@ func (c *CiscoTelemetryGNMI) handleSubscribeResponse(address string, reply *gnmi
} }
// Group metrics // Group metrics
for key, val := range fields { for k, v := range fields {
if len(aliasPath) > 0 { key := k
if len(aliasPath) < len(key) {
// This may not be an exact prefix, due to naming style
// conversion on the key.
key = key[len(aliasPath)+1:] key = key[len(aliasPath)+1:]
} else {
// Otherwise use the last path element as the field key.
key = path.Base(key)
// If there are no elements skip the item; this would be an
// invalid message.
key = strings.TrimLeft(key, "/.")
if key == "" {
c.Log.Errorf("invalid empty path: %q", k)
continue
}
} }
grouper.Add(name, tags, timestamp, key, val)
grouper.Add(name, tags, timestamp, key, v)
} }
lastAliasPath = aliasPath lastAliasPath = aliasPath

View File

@ -5,9 +5,11 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"sync"
"testing" "testing"
"time" "time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/openconfig/gnmi/proto/gnmi" "github.com/openconfig/gnmi/proto/gnmi"
@ -37,89 +39,124 @@ func TestParsePath(t *testing.T) {
assert.Equal(t, errors.New("Invalid GNMI path: /foo[[/"), err) assert.Equal(t, errors.New("Invalid GNMI path: /foo[[/"), err)
} }
type mockGNMIServer struct { type MockServer struct {
t *testing.T SubscribeF func(gnmi.GNMI_SubscribeServer) error
acc *testutil.Accumulator GRPCServer *grpc.Server
server *grpc.Server
scenario int
} }
func (m *mockGNMIServer) Capabilities(context.Context, *gnmi.CapabilityRequest) (*gnmi.CapabilityResponse, error) { func (s *MockServer) Capabilities(context.Context, *gnmi.CapabilityRequest) (*gnmi.CapabilityResponse, error) {
return nil, nil return nil, nil
} }
func (m *mockGNMIServer) Get(context.Context, *gnmi.GetRequest) (*gnmi.GetResponse, error) { func (s *MockServer) Get(context.Context, *gnmi.GetRequest) (*gnmi.GetResponse, error) {
return nil, nil return nil, nil
} }
func (m *mockGNMIServer) Set(context.Context, *gnmi.SetRequest) (*gnmi.SetResponse, error) { func (s *MockServer) Set(context.Context, *gnmi.SetRequest) (*gnmi.SetResponse, error) {
return nil, nil return nil, nil
} }
func (m *mockGNMIServer) Subscribe(server gnmi.GNMI_SubscribeServer) error { func (s *MockServer) Subscribe(server gnmi.GNMI_SubscribeServer) error {
metadata, ok := metadata.FromIncomingContext(server.Context()) return s.SubscribeF(server)
require.Equal(m.t, ok, true)
require.Equal(m.t, metadata.Get("username"), []string{"theuser"})
require.Equal(m.t, metadata.Get("password"), []string{"thepassword"})
// Must read request before sending a response; even though we don't check
// the request itself currently.
_, err := server.Recv()
if err != nil {
panic(err)
}
switch m.scenario {
case 0:
return fmt.Errorf("testerror")
case 1:
notification := mockGNMINotification()
server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}})
notification.Prefix.Elem[0].Key["foo"] = "bar2"
notification.Update[0].Path.Elem[1].Key["name"] = "str2"
notification.Update[0].Val = &gnmi.TypedValue{Value: &gnmi.TypedValue_JsonVal{JsonVal: []byte{'"', '1', '2', '3', '"'}}}
server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
return nil
case 2:
notification := mockGNMINotification()
server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
return nil
case 3:
notification := mockGNMINotification()
notification.Prefix.Elem[0].Key["foo"] = "bar2"
notification.Update[0].Path.Elem[1].Key["name"] = "str2"
notification.Update[0].Val = &gnmi.TypedValue{Value: &gnmi.TypedValue_BoolVal{BoolVal: false}}
server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
return nil
default:
return fmt.Errorf("test not implemented ;)")
}
} }
func TestGNMIError(t *testing.T) { func TestWaitError(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0") listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err) require.NoError(t, err)
server := grpc.NewServer()
acc := &testutil.Accumulator{}
gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 0, server: server, acc: acc})
c := &CiscoTelemetryGNMI{ grpcServer := grpc.NewServer()
gnmiServer := &MockServer{
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
return fmt.Errorf("testerror")
},
GRPCServer: grpcServer,
}
gnmi.RegisterGNMIServer(grpcServer, gnmiServer)
plugin := &CiscoTelemetryGNMI{
Log: testutil.Logger{}, Log: testutil.Logger{},
Addresses: []string{listener.Addr().String()}, Addresses: []string{listener.Addr().String()},
Username: "theuser", Password: "thepassword", Encoding: "proto", Encoding: "proto",
Redial: internal.Duration{Duration: 1 * time.Second}} Redial: internal.Duration{Duration: 1 * time.Second},
}
require.NoError(t, c.Start(acc)) var acc testutil.Accumulator
err = plugin.Start(&acc)
require.NoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
go func() { go func() {
err := server.Serve(listener) defer wg.Done()
err := grpcServer.Serve(listener)
require.NoError(t, err) require.NoError(t, err)
}() }()
acc.WaitError(1)
c.Stop()
server.Stop()
require.Contains(t, acc.Errors, errors.New("aborted GNMI subscription: rpc error: code = Unknown desc = testerror")) acc.WaitError(1)
plugin.Stop()
grpcServer.Stop()
wg.Wait()
require.Contains(t, acc.Errors,
errors.New("aborted GNMI subscription: rpc error: code = Unknown desc = testerror"))
}
func TestUsernamePassword(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
grpcServer := grpc.NewServer()
gnmiServer := &MockServer{
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
metadata, ok := metadata.FromIncomingContext(server.Context())
if !ok {
return errors.New("failed to get metadata")
}
username := metadata.Get("username")
if len(username) != 1 || username[0] != "theusername" {
return errors.New("wrong username")
}
password := metadata.Get("password")
if len(password) != 1 || password[0] != "thepassword" {
return errors.New("wrong password")
}
return errors.New("success")
},
GRPCServer: grpcServer,
}
gnmi.RegisterGNMIServer(grpcServer, gnmiServer)
plugin := &CiscoTelemetryGNMI{
Log: testutil.Logger{},
Addresses: []string{listener.Addr().String()},
Username: "theusername",
Password: "thepassword",
Encoding: "proto",
Redial: internal.Duration{Duration: 1 * time.Second},
}
var acc testutil.Accumulator
err = plugin.Start(&acc)
require.NoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := grpcServer.Serve(listener)
require.NoError(t, err)
}()
acc.WaitError(1)
plugin.Stop()
grpcServer.Stop()
wg.Wait()
require.Contains(t, acc.Errors,
errors.New("aborted GNMI subscription: rpc error: code = Unknown desc = success"))
} }
func mockGNMINotification() *gnmi.Notification { func mockGNMINotification() *gnmi.Notification {
@ -169,97 +206,268 @@ func mockGNMINotification() *gnmi.Notification {
} }
} }
func TestGNMIMultiple(t *testing.T) { func TestNotification(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0") tests := []struct {
require.NoError(t, err) name string
server := grpc.NewServer() plugin *CiscoTelemetryGNMI
acc := &testutil.Accumulator{} server *MockServer
gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 1, server: server, acc: acc}) expected []telegraf.Metric
}{
c := &CiscoTelemetryGNMI{ {
Log: testutil.Logger{}, name: "multiple metrics",
Addresses: []string{listener.Addr().String()}, plugin: &CiscoTelemetryGNMI{
Username: "theuser", Password: "thepassword", Encoding: "proto", Log: testutil.Logger{},
Redial: internal.Duration{Duration: 1 * time.Second}, Encoding: "proto",
Subscriptions: []Subscription{{Name: "alias", Origin: "type", Path: "/model", SubscriptionMode: "sample"}}, Redial: internal.Duration{Duration: 1 * time.Second},
Subscriptions: []Subscription{
{
Name: "alias",
Origin: "type",
Path: "/model",
SubscriptionMode: "sample",
},
},
},
server: &MockServer{
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
notification := mockGNMINotification()
server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}})
notification.Prefix.Elem[0].Key["foo"] = "bar2"
notification.Update[0].Path.Elem[1].Key["name"] = "str2"
notification.Update[0].Val = &gnmi.TypedValue{Value: &gnmi.TypedValue_JsonVal{JsonVal: []byte{'"', '1', '2', '3', '"'}}}
server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
return nil
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"alias",
map[string]string{
"path": "type:/model",
"source": "127.0.0.1",
"foo": "bar",
"name": "str",
"uint64": "1234",
},
map[string]interface{}{
"some/path": int64(5678),
},
time.Unix(0, 0),
),
testutil.MustMetric(
"alias",
map[string]string{
"path": "type:/model",
"source": "127.0.0.1",
"foo": "bar",
},
map[string]interface{}{
"other/path": "foobar",
"other/this": "that",
},
time.Unix(0, 0),
),
testutil.MustMetric(
"alias",
map[string]string{
"path": "type:/model",
"foo": "bar2",
"source": "127.0.0.1",
"name": "str2",
"uint64": "1234",
},
map[string]interface{}{
"some/path": "123",
},
time.Unix(0, 0),
),
testutil.MustMetric(
"alias",
map[string]string{
"path": "type:/model",
"source": "127.0.0.1",
"foo": "bar2",
},
map[string]interface{}{
"other/path": "foobar",
"other/this": "that",
},
time.Unix(0, 0),
),
},
},
{
name: "full path field key",
plugin: &CiscoTelemetryGNMI{
Log: testutil.Logger{},
Encoding: "proto",
Redial: internal.Duration{Duration: 1 * time.Second},
Subscriptions: []Subscription{
{
Name: "PHY_COUNTERS",
Origin: "type",
Path: "/state/port[port-id=*]/ethernet/oper-speed",
SubscriptionMode: "sample",
},
},
},
server: &MockServer{
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
response := &gnmi.SubscribeResponse{
Response: &gnmi.SubscribeResponse_Update{
Update: &gnmi.Notification{
Timestamp: 1543236572000000000,
Prefix: &gnmi.Path{
Origin: "type",
Elem: []*gnmi.PathElem{
{
Name: "state",
},
{
Name: "port",
Key: map[string]string{"port-id": "1"},
},
{
Name: "ethernet",
},
{
Name: "oper-speed",
},
},
Target: "subscription",
},
Update: []*gnmi.Update{
{
Path: &gnmi.Path{},
Val: &gnmi.TypedValue{
Value: &gnmi.TypedValue_IntVal{IntVal: 42},
},
},
},
},
},
}
server.Send(response)
return nil
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"PHY_COUNTERS",
map[string]string{
"path": "type:/state/port/ethernet/oper-speed",
"source": "127.0.0.1",
"port_id": "1",
},
map[string]interface{}{
"oper_speed": 42,
},
time.Unix(0, 0),
),
},
},
} }
require.NoError(t, c.Start(acc)) for _, tt := range tests {
go func() { t.Run(tt.name, func(t *testing.T) {
err := server.Serve(listener) listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err) require.NoError(t, err)
}()
acc.Wait(4)
c.Stop()
server.Stop()
require.Empty(t, acc.Errors) tt.plugin.Addresses = []string{listener.Addr().String()}
tags := map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar", "name": "str", "uint64": "1234"} grpcServer := grpc.NewServer()
fields := map[string]interface{}{"some/path": int64(5678)} tt.server.GRPCServer = grpcServer
acc.AssertContainsTaggedFields(t, "alias", fields, tags) gnmi.RegisterGNMIServer(grpcServer, tt.server)
tags = map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar"} var acc testutil.Accumulator
fields = map[string]interface{}{"other/path": "foobar", "other/this": "that"} err = tt.plugin.Start(&acc)
acc.AssertContainsTaggedFields(t, "alias", fields, tags) require.NoError(t, err)
tags = map[string]string{"path": "type:/model", "foo": "bar2", "source": "127.0.0.1", "name": "str2", "uint64": "1234"} var wg sync.WaitGroup
fields = map[string]interface{}{"some/path": "123"} wg.Add(1)
acc.AssertContainsTaggedFields(t, "alias", fields, tags) go func() {
defer wg.Done()
err := grpcServer.Serve(listener)
require.NoError(t, err)
}()
tags = map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar2"} acc.Wait(len(tt.expected))
fields = map[string]interface{}{"other/path": "foobar", "other/this": "that"} tt.plugin.Stop()
acc.AssertContainsTaggedFields(t, "alias", fields, tags) grpcServer.Stop()
wg.Wait()
testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(),
testutil.IgnoreTime())
})
}
} }
func TestGNMIMultipleRedial(t *testing.T) { func TestRedial(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0") listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err) require.NoError(t, err)
server := grpc.NewServer()
acc := &testutil.Accumulator{}
gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 2, server: server, acc: acc})
c := &CiscoTelemetryGNMI{ plugin := &CiscoTelemetryGNMI{
Log: testutil.Logger{}, Log: testutil.Logger{},
Addresses: []string{listener.Addr().String()}, Addresses: []string{listener.Addr().String()},
Username: "theuser", Password: "thepassword", Encoding: "proto", Encoding: "proto",
Redial: internal.Duration{Duration: 10 * time.Millisecond}, Redial: internal.Duration{Duration: 10 * time.Millisecond},
Subscriptions: []Subscription{{Name: "alias", Origin: "type", Path: "/model", SubscriptionMode: "sample"}},
} }
require.NoError(t, c.Start(acc)) grpcServer := grpc.NewServer()
gnmiServer := &MockServer{
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
notification := mockGNMINotification()
server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
return nil
},
GRPCServer: grpcServer,
}
gnmi.RegisterGNMIServer(grpcServer, gnmiServer)
var wg sync.WaitGroup
wg.Add(1)
go func() { go func() {
err := server.Serve(listener) defer wg.Done()
err := grpcServer.Serve(listener)
require.NoError(t, err) require.NoError(t, err)
}() }()
var acc testutil.Accumulator
err = plugin.Start(&acc)
require.NoError(t, err)
acc.Wait(2) acc.Wait(2)
server.Stop() grpcServer.Stop()
wg.Wait()
listener, _ = net.Listen("tcp", listener.Addr().String()) // Restart GNMI server at the same address
server = grpc.NewServer() listener, err = net.Listen("tcp", listener.Addr().String())
gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 3, server: server, acc: acc}) require.NoError(t, err)
grpcServer = grpc.NewServer()
gnmiServer = &MockServer{
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
notification := mockGNMINotification()
notification.Prefix.Elem[0].Key["foo"] = "bar2"
notification.Update[0].Path.Elem[1].Key["name"] = "str2"
notification.Update[0].Val = &gnmi.TypedValue{Value: &gnmi.TypedValue_BoolVal{BoolVal: false}}
server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
return nil
},
GRPCServer: grpcServer,
}
gnmi.RegisterGNMIServer(grpcServer, gnmiServer)
wg.Add(1)
go func() { go func() {
err := server.Serve(listener) defer wg.Done()
err := grpcServer.Serve(listener)
require.NoError(t, err) require.NoError(t, err)
}() }()
acc.Wait(4) acc.Wait(4)
c.Stop() plugin.Stop()
server.Stop() grpcServer.Stop()
wg.Wait()
tags := map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar", "name": "str", "uint64": "1234"}
fields := map[string]interface{}{"some/path": int64(5678)}
acc.AssertContainsTaggedFields(t, "alias", fields, tags)
tags = map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar"}
fields = map[string]interface{}{"other/path": "foobar", "other/this": "that"}
acc.AssertContainsTaggedFields(t, "alias", fields, tags)
tags = map[string]string{"path": "type:/model", "foo": "bar2", "source": "127.0.0.1", "name": "str2", "uint64": "1234"}
fields = map[string]interface{}{"some/path": false}
acc.AssertContainsTaggedFields(t, "alias", fields, tags)
tags = map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar2"}
fields = map[string]interface{}{"other/path": "foobar", "other/this": "that"}
acc.AssertContainsTaggedFields(t, "alias", fields, tags)
} }