From 73488eb61cf71d41e45df6f7b91cfc6d4e95a067 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 8 Jan 2020 10:52:36 -0800 Subject: [PATCH] Use last path element as field key if path fully specified (#6848) --- .../cisco_telemetry_gnmi.go | 21 +- .../cisco_telemetry_gnmi_test.go | 462 +++++++++++++----- 2 files changed, 353 insertions(+), 130 deletions(-) diff --git a/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi.go b/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi.go index 38297b976..c8c50e368 100644 --- a/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi.go +++ b/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi.go @@ -280,11 +280,26 @@ func (c *CiscoTelemetryGNMI) handleSubscribeResponse(address string, reply *gnmi } // Group metrics - for key, val := range fields { - if len(aliasPath) > 0 { + for k, v := range fields { + key := k + if len(aliasPath) < len(key) { + // This may not be an exact prefix, due to naming style + // conversion on the key. key = key[len(aliasPath)+1:] + } else { + // Otherwise use the last path element as the field key. + key = path.Base(key) + + // If there are no elements skip the item; this would be an + // invalid message. + key = strings.TrimLeft(key, "/.") + if key == "" { + c.Log.Errorf("invalid empty path: %q", k) + continue + } } - grouper.Add(name, tags, timestamp, key, val) + + grouper.Add(name, tags, timestamp, key, v) } lastAliasPath = aliasPath diff --git a/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi_test.go b/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi_test.go index 7a62bcd14..1b12886b9 100644 --- a/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi_test.go +++ b/plugins/inputs/cisco_telemetry_gnmi/cisco_telemetry_gnmi_test.go @@ -5,9 +5,11 @@ import ( "errors" "fmt" "net" + "sync" "testing" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/testutil" "github.com/openconfig/gnmi/proto/gnmi" @@ -37,89 +39,124 @@ func TestParsePath(t *testing.T) { assert.Equal(t, errors.New("Invalid GNMI path: /foo[[/"), err) } -type mockGNMIServer struct { - t *testing.T - acc *testutil.Accumulator - server *grpc.Server - scenario int +type MockServer struct { + SubscribeF func(gnmi.GNMI_SubscribeServer) error + GRPCServer *grpc.Server } -func (m *mockGNMIServer) Capabilities(context.Context, *gnmi.CapabilityRequest) (*gnmi.CapabilityResponse, error) { +func (s *MockServer) Capabilities(context.Context, *gnmi.CapabilityRequest) (*gnmi.CapabilityResponse, error) { return nil, nil } -func (m *mockGNMIServer) Get(context.Context, *gnmi.GetRequest) (*gnmi.GetResponse, error) { +func (s *MockServer) Get(context.Context, *gnmi.GetRequest) (*gnmi.GetResponse, error) { return nil, nil } -func (m *mockGNMIServer) Set(context.Context, *gnmi.SetRequest) (*gnmi.SetResponse, error) { +func (s *MockServer) Set(context.Context, *gnmi.SetRequest) (*gnmi.SetResponse, error) { return nil, nil } -func (m *mockGNMIServer) Subscribe(server gnmi.GNMI_SubscribeServer) error { - metadata, ok := metadata.FromIncomingContext(server.Context()) - require.Equal(m.t, ok, true) - require.Equal(m.t, metadata.Get("username"), []string{"theuser"}) - require.Equal(m.t, metadata.Get("password"), []string{"thepassword"}) - - // Must read request before sending a response; even though we don't check - // the request itself currently. - _, err := server.Recv() - if err != nil { - panic(err) - } - - switch m.scenario { - case 0: - return fmt.Errorf("testerror") - case 1: - notification := mockGNMINotification() - server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}}) - server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}}) - notification.Prefix.Elem[0].Key["foo"] = "bar2" - notification.Update[0].Path.Elem[1].Key["name"] = "str2" - notification.Update[0].Val = &gnmi.TypedValue{Value: &gnmi.TypedValue_JsonVal{JsonVal: []byte{'"', '1', '2', '3', '"'}}} - server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}}) - return nil - case 2: - notification := mockGNMINotification() - server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}}) - return nil - case 3: - notification := mockGNMINotification() - notification.Prefix.Elem[0].Key["foo"] = "bar2" - notification.Update[0].Path.Elem[1].Key["name"] = "str2" - notification.Update[0].Val = &gnmi.TypedValue{Value: &gnmi.TypedValue_BoolVal{BoolVal: false}} - server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}}) - return nil - default: - return fmt.Errorf("test not implemented ;)") - } +func (s *MockServer) Subscribe(server gnmi.GNMI_SubscribeServer) error { + return s.SubscribeF(server) } -func TestGNMIError(t *testing.T) { +func TestWaitError(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - server := grpc.NewServer() - acc := &testutil.Accumulator{} - gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 0, server: server, acc: acc}) - c := &CiscoTelemetryGNMI{ + grpcServer := grpc.NewServer() + gnmiServer := &MockServer{ + SubscribeF: func(server gnmi.GNMI_SubscribeServer) error { + return fmt.Errorf("testerror") + }, + GRPCServer: grpcServer, + } + gnmi.RegisterGNMIServer(grpcServer, gnmiServer) + + plugin := &CiscoTelemetryGNMI{ Log: testutil.Logger{}, Addresses: []string{listener.Addr().String()}, - Username: "theuser", Password: "thepassword", Encoding: "proto", - Redial: internal.Duration{Duration: 1 * time.Second}} + Encoding: "proto", + Redial: internal.Duration{Duration: 1 * time.Second}, + } - require.NoError(t, c.Start(acc)) + var acc testutil.Accumulator + err = plugin.Start(&acc) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) go func() { - err := server.Serve(listener) + defer wg.Done() + err := grpcServer.Serve(listener) require.NoError(t, err) }() - acc.WaitError(1) - c.Stop() - server.Stop() - require.Contains(t, acc.Errors, errors.New("aborted GNMI subscription: rpc error: code = Unknown desc = testerror")) + acc.WaitError(1) + plugin.Stop() + grpcServer.Stop() + wg.Wait() + + require.Contains(t, acc.Errors, + errors.New("aborted GNMI subscription: rpc error: code = Unknown desc = testerror")) +} + +func TestUsernamePassword(t *testing.T) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + grpcServer := grpc.NewServer() + gnmiServer := &MockServer{ + SubscribeF: func(server gnmi.GNMI_SubscribeServer) error { + metadata, ok := metadata.FromIncomingContext(server.Context()) + if !ok { + return errors.New("failed to get metadata") + } + + username := metadata.Get("username") + if len(username) != 1 || username[0] != "theusername" { + return errors.New("wrong username") + } + + password := metadata.Get("password") + if len(password) != 1 || password[0] != "thepassword" { + return errors.New("wrong password") + } + + return errors.New("success") + }, + GRPCServer: grpcServer, + } + gnmi.RegisterGNMIServer(grpcServer, gnmiServer) + + plugin := &CiscoTelemetryGNMI{ + Log: testutil.Logger{}, + Addresses: []string{listener.Addr().String()}, + Username: "theusername", + Password: "thepassword", + Encoding: "proto", + Redial: internal.Duration{Duration: 1 * time.Second}, + } + + var acc testutil.Accumulator + err = plugin.Start(&acc) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := grpcServer.Serve(listener) + require.NoError(t, err) + }() + + acc.WaitError(1) + plugin.Stop() + grpcServer.Stop() + wg.Wait() + + require.Contains(t, acc.Errors, + errors.New("aborted GNMI subscription: rpc error: code = Unknown desc = success")) } func mockGNMINotification() *gnmi.Notification { @@ -169,97 +206,268 @@ func mockGNMINotification() *gnmi.Notification { } } -func TestGNMIMultiple(t *testing.T) { - listener, err := net.Listen("tcp", "127.0.0.1:0") - require.NoError(t, err) - server := grpc.NewServer() - acc := &testutil.Accumulator{} - gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 1, server: server, acc: acc}) - - c := &CiscoTelemetryGNMI{ - Log: testutil.Logger{}, - Addresses: []string{listener.Addr().String()}, - Username: "theuser", Password: "thepassword", Encoding: "proto", - Redial: internal.Duration{Duration: 1 * time.Second}, - Subscriptions: []Subscription{{Name: "alias", Origin: "type", Path: "/model", SubscriptionMode: "sample"}}, +func TestNotification(t *testing.T) { + tests := []struct { + name string + plugin *CiscoTelemetryGNMI + server *MockServer + expected []telegraf.Metric + }{ + { + name: "multiple metrics", + plugin: &CiscoTelemetryGNMI{ + Log: testutil.Logger{}, + Encoding: "proto", + Redial: internal.Duration{Duration: 1 * time.Second}, + Subscriptions: []Subscription{ + { + Name: "alias", + Origin: "type", + Path: "/model", + SubscriptionMode: "sample", + }, + }, + }, + server: &MockServer{ + SubscribeF: func(server gnmi.GNMI_SubscribeServer) error { + notification := mockGNMINotification() + server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}}) + server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}}) + notification.Prefix.Elem[0].Key["foo"] = "bar2" + notification.Update[0].Path.Elem[1].Key["name"] = "str2" + notification.Update[0].Val = &gnmi.TypedValue{Value: &gnmi.TypedValue_JsonVal{JsonVal: []byte{'"', '1', '2', '3', '"'}}} + server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}}) + return nil + }, + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "alias", + map[string]string{ + "path": "type:/model", + "source": "127.0.0.1", + "foo": "bar", + "name": "str", + "uint64": "1234", + }, + map[string]interface{}{ + "some/path": int64(5678), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "alias", + map[string]string{ + "path": "type:/model", + "source": "127.0.0.1", + "foo": "bar", + }, + map[string]interface{}{ + "other/path": "foobar", + "other/this": "that", + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "alias", + map[string]string{ + "path": "type:/model", + "foo": "bar2", + "source": "127.0.0.1", + "name": "str2", + "uint64": "1234", + }, + map[string]interface{}{ + "some/path": "123", + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "alias", + map[string]string{ + "path": "type:/model", + "source": "127.0.0.1", + "foo": "bar2", + }, + map[string]interface{}{ + "other/path": "foobar", + "other/this": "that", + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "full path field key", + plugin: &CiscoTelemetryGNMI{ + Log: testutil.Logger{}, + Encoding: "proto", + Redial: internal.Duration{Duration: 1 * time.Second}, + Subscriptions: []Subscription{ + { + Name: "PHY_COUNTERS", + Origin: "type", + Path: "/state/port[port-id=*]/ethernet/oper-speed", + SubscriptionMode: "sample", + }, + }, + }, + server: &MockServer{ + SubscribeF: func(server gnmi.GNMI_SubscribeServer) error { + response := &gnmi.SubscribeResponse{ + Response: &gnmi.SubscribeResponse_Update{ + Update: &gnmi.Notification{ + Timestamp: 1543236572000000000, + Prefix: &gnmi.Path{ + Origin: "type", + Elem: []*gnmi.PathElem{ + { + Name: "state", + }, + { + Name: "port", + Key: map[string]string{"port-id": "1"}, + }, + { + Name: "ethernet", + }, + { + Name: "oper-speed", + }, + }, + Target: "subscription", + }, + Update: []*gnmi.Update{ + { + Path: &gnmi.Path{}, + Val: &gnmi.TypedValue{ + Value: &gnmi.TypedValue_IntVal{IntVal: 42}, + }, + }, + }, + }, + }, + } + server.Send(response) + return nil + }, + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "PHY_COUNTERS", + map[string]string{ + "path": "type:/state/port/ethernet/oper-speed", + "source": "127.0.0.1", + "port_id": "1", + }, + map[string]interface{}{ + "oper_speed": 42, + }, + time.Unix(0, 0), + ), + }, + }, } - require.NoError(t, c.Start(acc)) - go func() { - err := server.Serve(listener) - require.NoError(t, err) - }() - acc.Wait(4) - c.Stop() - server.Stop() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) - require.Empty(t, acc.Errors) + tt.plugin.Addresses = []string{listener.Addr().String()} - tags := map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar", "name": "str", "uint64": "1234"} - fields := map[string]interface{}{"some/path": int64(5678)} - acc.AssertContainsTaggedFields(t, "alias", fields, tags) + grpcServer := grpc.NewServer() + tt.server.GRPCServer = grpcServer + gnmi.RegisterGNMIServer(grpcServer, tt.server) - tags = map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar"} - fields = map[string]interface{}{"other/path": "foobar", "other/this": "that"} - acc.AssertContainsTaggedFields(t, "alias", fields, tags) + var acc testutil.Accumulator + err = tt.plugin.Start(&acc) + require.NoError(t, err) - tags = map[string]string{"path": "type:/model", "foo": "bar2", "source": "127.0.0.1", "name": "str2", "uint64": "1234"} - fields = map[string]interface{}{"some/path": "123"} - acc.AssertContainsTaggedFields(t, "alias", fields, tags) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := grpcServer.Serve(listener) + require.NoError(t, err) + }() - tags = map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar2"} - fields = map[string]interface{}{"other/path": "foobar", "other/this": "that"} - acc.AssertContainsTaggedFields(t, "alias", fields, tags) + acc.Wait(len(tt.expected)) + tt.plugin.Stop() + grpcServer.Stop() + wg.Wait() + + testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(), + testutil.IgnoreTime()) + }) + } } -func TestGNMIMultipleRedial(t *testing.T) { +func TestRedial(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - server := grpc.NewServer() - acc := &testutil.Accumulator{} - gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 2, server: server, acc: acc}) - c := &CiscoTelemetryGNMI{ + plugin := &CiscoTelemetryGNMI{ Log: testutil.Logger{}, Addresses: []string{listener.Addr().String()}, - Username: "theuser", Password: "thepassword", Encoding: "proto", - Redial: internal.Duration{Duration: 10 * time.Millisecond}, - Subscriptions: []Subscription{{Name: "alias", Origin: "type", Path: "/model", SubscriptionMode: "sample"}}, + Encoding: "proto", + Redial: internal.Duration{Duration: 10 * time.Millisecond}, } - require.NoError(t, c.Start(acc)) + grpcServer := grpc.NewServer() + gnmiServer := &MockServer{ + SubscribeF: func(server gnmi.GNMI_SubscribeServer) error { + notification := mockGNMINotification() + server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}}) + return nil + }, + GRPCServer: grpcServer, + } + gnmi.RegisterGNMIServer(grpcServer, gnmiServer) + + var wg sync.WaitGroup + wg.Add(1) go func() { - err := server.Serve(listener) + defer wg.Done() + err := grpcServer.Serve(listener) require.NoError(t, err) }() + + var acc testutil.Accumulator + err = plugin.Start(&acc) + require.NoError(t, err) + acc.Wait(2) - server.Stop() + grpcServer.Stop() + wg.Wait() - listener, _ = net.Listen("tcp", listener.Addr().String()) - server = grpc.NewServer() - gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 3, server: server, acc: acc}) + // Restart GNMI server at the same address + listener, err = net.Listen("tcp", listener.Addr().String()) + require.NoError(t, err) + grpcServer = grpc.NewServer() + gnmiServer = &MockServer{ + SubscribeF: func(server gnmi.GNMI_SubscribeServer) error { + notification := mockGNMINotification() + notification.Prefix.Elem[0].Key["foo"] = "bar2" + notification.Update[0].Path.Elem[1].Key["name"] = "str2" + notification.Update[0].Val = &gnmi.TypedValue{Value: &gnmi.TypedValue_BoolVal{BoolVal: false}} + server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}}) + return nil + }, + GRPCServer: grpcServer, + } + gnmi.RegisterGNMIServer(grpcServer, gnmiServer) + + wg.Add(1) go func() { - err := server.Serve(listener) + defer wg.Done() + err := grpcServer.Serve(listener) require.NoError(t, err) }() + acc.Wait(4) - c.Stop() - server.Stop() - - tags := map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar", "name": "str", "uint64": "1234"} - fields := map[string]interface{}{"some/path": int64(5678)} - acc.AssertContainsTaggedFields(t, "alias", fields, tags) - - tags = map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar"} - fields = map[string]interface{}{"other/path": "foobar", "other/this": "that"} - acc.AssertContainsTaggedFields(t, "alias", fields, tags) - - tags = map[string]string{"path": "type:/model", "foo": "bar2", "source": "127.0.0.1", "name": "str2", "uint64": "1234"} - fields = map[string]interface{}{"some/path": false} - acc.AssertContainsTaggedFields(t, "alias", fields, tags) - - tags = map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar2"} - fields = map[string]interface{}{"other/path": "foobar", "other/this": "that"} - acc.AssertContainsTaggedFields(t, "alias", fields, tags) + plugin.Stop() + grpcServer.Stop() + wg.Wait() }