Fix race conditions in gnmi telemetry tests (#5953)

This commit is contained in:
Daniel Nelson 2019-06-04 23:00:24 -07:00 committed by GitHub
parent 476f7fb9c5
commit ba0b0c02f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 44 deletions

View File

@ -13,17 +13,15 @@ import (
"sync" "sync"
"time" "time"
"github.com/influxdata/telegraf/metric"
"google.golang.org/grpc/credentials"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
internaltls "github.com/influxdata/telegraf/internal/tls" internaltls "github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/openconfig/gnmi/proto/gnmi" "github.com/openconfig/gnmi/proto/gnmi"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
) )

View File

@ -8,14 +8,13 @@ import (
"testing" "testing"
"time" "time"
"google.golang.org/grpc/metadata"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"google.golang.org/grpc"
"github.com/openconfig/gnmi/proto/gnmi" "github.com/openconfig/gnmi/proto/gnmi"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
) )
func TestParsePath(t *testing.T) { func TestParsePath(t *testing.T) {
@ -58,24 +57,10 @@ func (m *mockGNMIServer) Set(context.Context, *gnmi.SetRequest) (*gnmi.SetRespon
} }
func (m *mockGNMIServer) Subscribe(server gnmi.GNMI_SubscribeServer) error { func (m *mockGNMIServer) Subscribe(server gnmi.GNMI_SubscribeServer) error {
// Avoid race conditions
go func() {
if m.scenario == 0 {
m.acc.WaitError(1)
} else if m.scenario == 1 || m.scenario == 3 {
m.acc.Wait(4)
} else if m.scenario == 2 {
m.acc.Wait(2)
}
if m.scenario >= 0 {
m.server.Stop()
}
}()
metadata, ok := metadata.FromIncomingContext(server.Context()) metadata, ok := metadata.FromIncomingContext(server.Context())
assert.Equal(m.t, ok, true) require.Equal(m.t, ok, true)
assert.Equal(m.t, metadata.Get("username"), []string{"theuser"}) require.Equal(m.t, metadata.Get("username"), []string{"theuser"})
assert.Equal(m.t, metadata.Get("password"), []string{"thepassword"}) require.Equal(m.t, metadata.Get("password"), []string{"thepassword"})
switch m.scenario { switch m.scenario {
case 0: case 0:
@ -106,20 +91,22 @@ func (m *mockGNMIServer) Subscribe(server gnmi.GNMI_SubscribeServer) error {
} }
func TestGNMIError(t *testing.T) { func TestGNMIError(t *testing.T) {
listener, _ := net.Listen("tcp", "127.0.0.1:57003") listener, _ := net.Listen("tcp", "127.0.0.1:0")
server := grpc.NewServer() server := grpc.NewServer()
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 0, server: server, acc: acc}) gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 0, server: server, acc: acc})
c := &CiscoTelemetryGNMI{Addresses: []string{"127.0.0.1:57003"}, c := &CiscoTelemetryGNMI{Addresses: []string{listener.Addr().String()},
Username: "theuser", Password: "thepassword", Encoding: "proto", Username: "theuser", Password: "thepassword", Encoding: "proto",
Redial: internal.Duration{Duration: 1 * time.Second}} Redial: internal.Duration{Duration: 1 * time.Second}}
assert.Nil(t, c.Start(acc)) require.Nil(t, c.Start(acc))
server.Serve(listener) go server.Serve(listener)
acc.WaitError(1)
c.Stop() c.Stop()
server.Stop()
assert.Contains(t, acc.Errors, errors.New("aborted GNMI subscription: rpc error: code = Unknown desc = testerror")) require.Contains(t, acc.Errors, errors.New("aborted GNMI subscription: rpc error: code = Unknown desc = testerror"))
} }
func mockGNMINotification() *gnmi.Notification { func mockGNMINotification() *gnmi.Notification {
@ -170,23 +157,25 @@ func mockGNMINotification() *gnmi.Notification {
} }
func TestGNMIMultiple(t *testing.T) { func TestGNMIMultiple(t *testing.T) {
listener, _ := net.Listen("tcp", "127.0.0.1:57004") listener, _ := net.Listen("tcp", "127.0.0.1:0")
server := grpc.NewServer() server := grpc.NewServer()
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 1, server: server, acc: acc}) gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 1, server: server, acc: acc})
c := &CiscoTelemetryGNMI{Addresses: []string{"127.0.0.1:57004"}, c := &CiscoTelemetryGNMI{Addresses: []string{listener.Addr().String()},
Username: "theuser", Password: "thepassword", Encoding: "proto", Username: "theuser", Password: "thepassword", Encoding: "proto",
Redial: internal.Duration{Duration: 1 * time.Second}, Redial: internal.Duration{Duration: 1 * time.Second},
Subscriptions: []Subscription{{Name: "alias", Origin: "type", Path: "/model", SubscriptionMode: "sample"}}, Subscriptions: []Subscription{{Name: "alias", Origin: "type", Path: "/model", SubscriptionMode: "sample"}},
} }
assert.Nil(t, c.Start(acc)) require.Nil(t, c.Start(acc))
server.Serve(listener) go server.Serve(listener)
acc.Wait(4)
c.Stop() c.Stop()
server.Stop()
assert.Empty(t, acc.Errors) require.Empty(t, acc.Errors)
tags := map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar", "name": "str", "uint64": "1234"} tags := map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar", "name": "str", "uint64": "1234"}
fields := map[string]interface{}{"some/path": int64(5678)} fields := map[string]interface{}{"some/path": int64(5678)}
@ -206,28 +195,31 @@ func TestGNMIMultiple(t *testing.T) {
} }
func TestGNMIMultipleRedial(t *testing.T) { func TestGNMIMultipleRedial(t *testing.T) {
listener, _ := net.Listen("tcp", "127.0.0.1:57004") listener, _ := net.Listen("tcp", "127.0.0.1:0")
server := grpc.NewServer() server := grpc.NewServer()
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 2, server: server, acc: acc}) gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 2, server: server, acc: acc})
c := &CiscoTelemetryGNMI{Addresses: []string{"127.0.0.1:57004"}, c := &CiscoTelemetryGNMI{Addresses: []string{listener.Addr().String()},
Username: "theuser", Password: "thepassword", Encoding: "proto", Username: "theuser", Password: "thepassword", Encoding: "proto",
Redial: internal.Duration{Duration: 500 * time.Millisecond}, Redial: internal.Duration{Duration: 10 * time.Millisecond},
Subscriptions: []Subscription{{Name: "alias", Origin: "type", Path: "/model", SubscriptionMode: "sample"}}, Subscriptions: []Subscription{{Name: "alias", Origin: "type", Path: "/model", SubscriptionMode: "sample"}},
} }
assert.Nil(t, c.Start(acc)) require.Nil(t, c.Start(acc))
server.Serve(listener)
listener, _ = net.Listen("tcp", "127.0.0.1:57004") go server.Serve(listener)
acc.Wait(2)
server.Stop()
listener, _ = net.Listen("tcp", listener.Addr().String())
server = grpc.NewServer() server = grpc.NewServer()
gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 3, server: server, acc: acc}) gnmi.RegisterGNMIServer(server, &mockGNMIServer{t: t, scenario: 3, server: server, acc: acc})
server.Serve(listener) go server.Serve(listener)
acc.Wait(4)
c.Stop() c.Stop()
server.Stop()
assert.Empty(t, acc.Errors)
tags := map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar", "name": "str", "uint64": "1234"} tags := map[string]string{"path": "type:/model", "source": "127.0.0.1", "foo": "bar", "name": "str", "uint64": "1234"}
fields := map[string]interface{}{"some/path": int64(5678)} fields := map[string]interface{}{"some/path": int64(5678)}