package influxdb_test import ( "bytes" "context" "fmt" "log" "net" "net/url" "sync" "testing" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/outputs/influxdb" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) var ( metricString = "cpu value=42 0\n" ) func getMetric() telegraf.Metric { metric, err := metric.New( "cpu", map[string]string{}, map[string]interface{}{ "value": 42.0, }, time.Unix(0, 0), ) if err != nil { panic(err) } return metric } func getURL() *url.URL { u, err := url.Parse("udp://localhost:0") if err != nil { panic(err) } return u } type MockConn struct { WriteF func(b []byte) (n int, err error) CloseF func() error } func (c *MockConn) Write(b []byte) (n int, err error) { return c.WriteF(b) } func (c *MockConn) Close() error { return c.CloseF() } type MockDialer struct { DialContextF func(network, address string) (influxdb.Conn, error) } func (d *MockDialer) DialContext(ctx context.Context, network string, address string) (influxdb.Conn, error) { return d.DialContextF(network, address) } func TestUDP_NewUDPClientNoURL(t *testing.T) { config := influxdb.UDPConfig{} _, err := influxdb.NewUDPClient(config) require.Equal(t, err, influxdb.ErrMissingURL) } func TestUDP_URL(t *testing.T) { u := getURL() config := influxdb.UDPConfig{ URL: u, } client, err := influxdb.NewUDPClient(config) require.NoError(t, err) require.Equal(t, u.String(), client.URL()) } func TestUDP_Simple(t *testing.T) { var buffer bytes.Buffer config := influxdb.UDPConfig{ URL: getURL(), Dialer: &MockDialer{ DialContextF: func(network, address string) (influxdb.Conn, error) { conn := &MockConn{ WriteF: func(b []byte) (n int, err error) { buffer.Write(b) return 0, nil }, } return conn, nil }, }, } client, err := influxdb.NewUDPClient(config) require.NoError(t, err) ctx := context.Background() err = client.Write(ctx, []telegraf.Metric{ getMetric(), getMetric(), }) require.NoError(t, err) require.Equal(t, metricString+metricString, buffer.String()) } func TestUDP_DialError(t *testing.T) { u, err := url.Parse("invalid://127.0.0.1:9999") require.NoError(t, err) config := influxdb.UDPConfig{ URL: u, Dialer: &MockDialer{ DialContextF: func(network, address string) (influxdb.Conn, error) { return nil, fmt.Errorf( `unsupported scheme [invalid://localhost:9999]: "invalid"`) }, }, } client, err := influxdb.NewUDPClient(config) require.NoError(t, err) ctx := context.Background() err = client.Write(ctx, []telegraf.Metric{getMetric()}) require.Error(t, err) } func TestUDP_WriteError(t *testing.T) { closed := false config := influxdb.UDPConfig{ URL: getURL(), Dialer: &MockDialer{ DialContextF: func(network, address string) (influxdb.Conn, error) { conn := &MockConn{ WriteF: func(b []byte) (n int, err error) { return 0, fmt.Errorf( "write udp 127.0.0.1:52190->127.0.0.1:9999: write: connection refused") }, CloseF: func() error { closed = true return nil }, } return conn, nil }, }, } client, err := influxdb.NewUDPClient(config) require.NoError(t, err) ctx := context.Background() err = client.Write(ctx, []telegraf.Metric{getMetric()}) require.Error(t, err) require.True(t, closed) } func TestUDP_ErrorLogging(t *testing.T) { tests := []struct { name string config influxdb.UDPConfig metrics []telegraf.Metric logContains string }{ { name: "logs need more space", config: influxdb.UDPConfig{ MaxPayloadSize: 1, URL: getURL(), Dialer: &MockDialer{ DialContextF: func(network, address string) (influxdb.Conn, error) { conn := &MockConn{} return conn, nil }, }, Log: testutil.Logger{}, }, metrics: []telegraf.Metric{getMetric()}, logContains: `could not serialize metric: "cpu": need more space`, }, { name: "logs series name", config: influxdb.UDPConfig{ URL: getURL(), Dialer: &MockDialer{ DialContextF: func(network, address string) (influxdb.Conn, error) { conn := &MockConn{} return conn, nil }, }, Log: testutil.Logger{}, }, metrics: []telegraf.Metric{ func() telegraf.Metric { metric, _ := metric.New( "cpu", map[string]string{ "host": "example.org", }, map[string]interface{}{}, time.Unix(0, 0), ) return metric }(), }, logContains: `could not serialize metric: "cpu,host=example.org": no serializable fields`, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { var b bytes.Buffer log.SetOutput(&b) client, err := influxdb.NewUDPClient(tt.config) require.NoError(t, err) ctx := context.Background() err = client.Write(ctx, tt.metrics) require.NoError(t, err) require.Contains(t, b.String(), tt.logContains) }) } } func TestUDP_WriteWithRealConn(t *testing.T) { conn, err := net.ListenPacket("udp", "127.0.0.1:0") require.NoError(t, err) metrics := []telegraf.Metric{ getMetric(), getMetric(), } buf := make([]byte, 200) var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() var total int for range metrics { n, _, err := conn.ReadFrom(buf[total:]) if err != nil { break } total += n } buf = buf[:total] }() addr := conn.LocalAddr() u, err := url.Parse(fmt.Sprintf("%s://%s", addr.Network(), addr)) require.NoError(t, err) config := influxdb.UDPConfig{ URL: u, } client, err := influxdb.NewUDPClient(config) require.NoError(t, err) ctx := context.Background() err = client.Write(ctx, metrics) require.NoError(t, err) wg.Wait() require.Equal(t, metricString+metricString, string(buf)) }