From fa30f568ecd224ee64baf4399c22fe2aa9d5d532 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 14 Aug 2018 13:36:29 -0700 Subject: [PATCH] Skip unserializable metric in influxDB UDP output (#4534) --- plugins/outputs/influxdb/udp.go | 12 ++-- plugins/outputs/influxdb/udp_test.go | 88 +++++++++++++++-------- plugins/serializers/influx/influx.go | 73 ++++++++++--------- plugins/serializers/influx/influx_test.go | 15 ++-- plugins/serializers/influx/reader.go | 13 +--- 5 files changed, 119 insertions(+), 82 deletions(-) diff --git a/plugins/outputs/influxdb/udp.go b/plugins/outputs/influxdb/udp.go index 5b3f5ce51..62f2a6ab7 100644 --- a/plugins/outputs/influxdb/udp.go +++ b/plugins/outputs/influxdb/udp.go @@ -3,11 +3,11 @@ package influxdb import ( "context" "fmt" + "log" "net" "net/url" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/plugins/serializers/influx" ) @@ -28,7 +28,7 @@ type Conn interface { type UDPConfig struct { MaxPayloadSize int URL *url.URL - Serializer serializers.Serializer + Serializer *influx.Serializer Dialer Dialer } @@ -65,7 +65,7 @@ func NewUDPClient(config *UDPConfig) (*udpClient, error) { type udpClient struct { conn Conn dialer Dialer - serializer serializers.Serializer + serializer *influx.Serializer url *url.URL } @@ -89,7 +89,11 @@ func (c *udpClient) Write(ctx context.Context, metrics []telegraf.Metric) error for _, metric := range metrics { octets, err := c.serializer.Serialize(metric) if err != nil { - return fmt.Errorf("could not serialize metric: %v", err) + // Since we are serializing multiple metrics, don't fail the + // entire batch just because of one unserializable metric. + log.Printf("E! [outputs.influxdb] when writing to [%s] could not serialize metric: %v", + c.URL(), err) + continue } _, err = c.conn.Write(octets) diff --git a/plugins/outputs/influxdb/udp_test.go b/plugins/outputs/influxdb/udp_test.go index 9bced4262..61b3f1ded 100644 --- a/plugins/outputs/influxdb/udp_test.go +++ b/plugins/outputs/influxdb/udp_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "log" "net" "net/url" "sync" @@ -13,7 +14,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/outputs/influxdb" - "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/stretchr/testify/require" ) @@ -65,19 +65,6 @@ func (d *MockDialer) DialContext(ctx context.Context, network string, address st return d.DialContextF(network, address) } -type MockSerializer struct { - SerializeF func(metric telegraf.Metric) ([]byte, error) - SerializeBatchF func(metrics []telegraf.Metric) ([]byte, error) -} - -func (s *MockSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { - return s.SerializeF(metric) -} - -func (s *MockSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { - return s.SerializeBatchF(metrics) -} - func TestUDP_NewUDPClientNoURL(t *testing.T) { config := &influxdb.UDPConfig{} _, err := influxdb.NewUDPClient(config) @@ -177,28 +164,69 @@ func TestUDP_WriteError(t *testing.T) { require.True(t, closed) } -func TestUDP_SerializeError(t *testing.T) { - config := &influxdb.UDPConfig{ - URL: getURL(), - Dialer: &MockDialer{ - DialContextF: func(network, address string) (influxdb.Conn, error) { - conn := &MockConn{} - return conn, nil +func TestUDP_ErrorLogging(t *testing.T) { + tests := []struct { + name string + config *influxdb.UDPConfig + metrics []telegraf.Metric + logContains string + }{ + { + name: "logs need more space", + config: &influxdb.UDPConfig{ + MaxPayloadSize: 1, + URL: getURL(), + Dialer: &MockDialer{ + DialContextF: func(network, address string) (influxdb.Conn, error) { + conn := &MockConn{} + return conn, nil + }, + }, }, + metrics: []telegraf.Metric{getMetric()}, + logContains: `could not serialize metric: "cpu": need more space`, }, - Serializer: &MockSerializer{ - SerializeF: func(metric telegraf.Metric) ([]byte, error) { - return nil, influx.ErrNeedMoreSpace + { + name: "logs series name", + config: &influxdb.UDPConfig{ + URL: getURL(), + Dialer: &MockDialer{ + DialContextF: func(network, address string) (influxdb.Conn, error) { + conn := &MockConn{} + return conn, nil + }, + }, }, + metrics: []telegraf.Metric{ + func() telegraf.Metric { + metric, _ := metric.New( + "cpu", + map[string]string{ + "host": "example.org", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ) + return metric + }(), + }, + logContains: `could not serialize metric: "cpu,host=example.org": no serializable fields`, }, } - client, err := influxdb.NewUDPClient(config) - require.NoError(t, err) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var b bytes.Buffer + log.SetOutput(&b) - ctx := context.Background() - err = client.Write(ctx, []telegraf.Metric{getMetric()}) - require.Error(t, err) - require.Contains(t, err.Error(), influx.ErrNeedMoreSpace.Error()) + client, err := influxdb.NewUDPClient(tt.config) + require.NoError(t, err) + + ctx := context.Background() + err = client.Write(ctx, tt.metrics) + require.NoError(t, err) + require.Contains(t, b.String(), tt.logContains) + }) + } } func TestUDP_WriteWithRealConn(t *testing.T) { diff --git a/plugins/serializers/influx/influx.go b/plugins/serializers/influx/influx.go index f052c9c93..2989e44e9 100644 --- a/plugins/serializers/influx/influx.go +++ b/plugins/serializers/influx/influx.go @@ -27,30 +27,34 @@ const ( UintSupport FieldTypeSupport = 1 << iota ) -// MetricError is an error causing a metric to be unserializable. +var ( + NeedMoreSpace = "need more space" + InvalidName = "invalid name" + NoFields = "no serializable fields" +) + +// MetricError is an error causing an entire metric to be unserializable. type MetricError struct { - s string + series string + reason string } func (e MetricError) Error() string { - return e.s + if e.series != "" { + return fmt.Sprintf("%q: %s", e.series, e.reason) + } + return e.reason } // FieldError is an error causing a field to be unserializable. type FieldError struct { - s string + reason string } func (e FieldError) Error() string { - return e.s + return e.reason } -var ( - ErrNeedMoreSpace = &MetricError{"need more space"} - ErrInvalidName = &MetricError{"invalid name"} - ErrNoFields = &MetricError{"no serializable fields"} -) - // Serializer is a serializer for line protocol. type Serializer struct { maxLineBytes int @@ -102,17 +106,20 @@ func (s *Serializer) Serialize(m telegraf.Metric) ([]byte, error) { return out, nil } +// SerializeBatch writes the slice of metrics and returns a byte slice of the +// results. The returned byte slice may contain multiple lines of data. func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { - var batch bytes.Buffer + s.buf.Reset() for _, m := range metrics { - _, err := s.Write(&batch, m) + _, err := s.Write(&s.buf, m) if err != nil { return nil, err } } - return batch.Bytes(), nil + out := make([]byte, s.buf.Len()) + copy(out, s.buf.Bytes()) + return out, nil } - func (s *Serializer) Write(w io.Writer, m telegraf.Metric) (int, error) { err := s.writeMetric(w, m) return s.bytesWritten, err @@ -135,7 +142,7 @@ func (s *Serializer) buildHeader(m telegraf.Metric) error { name := nameEscape(m.Name()) if name == "" { - return ErrInvalidName + return s.newMetricError(InvalidName) } s.header = append(s.header, name...) @@ -222,9 +229,10 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error { } if s.maxLineBytes > 0 && bytesNeeded > s.maxLineBytes { - // Need at least one field per line + // Need at least one field per line, this metric cannot be fit + // into the max line bytes. if firstField { - return ErrNeedMoreSpace + return s.newMetricError(NeedMoreSpace) } err = s.write(w, s.footer) @@ -232,21 +240,12 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error { return err } + firstField = true bytesNeeded = len(s.header) + len(s.pair) + len(s.footer) - if s.maxLineBytes > 0 && bytesNeeded > s.maxLineBytes { - return ErrNeedMoreSpace + if bytesNeeded > s.maxLineBytes { + return s.newMetricError(NeedMoreSpace) } - - err = s.write(w, s.header) - if err != nil { - return err - } - - s.write(w, s.pair) - pairsLen += len(s.pair) - firstField = false - continue } if firstField { @@ -261,18 +260,28 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error { } } - s.write(w, s.pair) + err = s.write(w, s.pair) + if err != nil { + return err + } pairsLen += len(s.pair) firstField = false } if firstField { - return ErrNoFields + return s.newMetricError(NoFields) } return s.write(w, s.footer) +} +func (s *Serializer) newMetricError(reason string) *MetricError { + if len(s.header) != 0 { + series := bytes.TrimRight(s.header, " ") + return &MetricError{series: string(series), reason: reason} + } + return &MetricError{reason: reason} } func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, error) { diff --git a/plugins/serializers/influx/influx_test.go b/plugins/serializers/influx/influx_test.go index 74bffe5e4..2c1cbd587 100644 --- a/plugins/serializers/influx/influx_test.go +++ b/plugins/serializers/influx/influx_test.go @@ -23,7 +23,7 @@ var tests = []struct { typeSupport FieldTypeSupport input telegraf.Metric output []byte - err error + errReason string }{ { name: "minimal", @@ -98,7 +98,7 @@ var tests = []struct { time.Unix(0, 0), ), ), - err: ErrNoFields, + errReason: NoFields, }, { name: "float Inf", @@ -333,8 +333,8 @@ var tests = []struct { time.Unix(1519194109, 42), ), ), - output: nil, - err: ErrNeedMoreSpace, + output: nil, + errReason: NeedMoreSpace, }, { name: "no fields", @@ -346,7 +346,7 @@ var tests = []struct { time.Unix(0, 0), ), ), - err: ErrNoFields, + errReason: NoFields, }, { name: "procstat", @@ -427,7 +427,10 @@ func TestSerializer(t *testing.T) { serializer.SetFieldSortOrder(SortFields) serializer.SetFieldTypeSupport(tt.typeSupport) output, err := serializer.Serialize(tt.input) - require.Equal(t, tt.err, err) + if tt.errReason != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tt.errReason) + } require.Equal(t, string(tt.output), string(output)) }) } diff --git a/plugins/serializers/influx/reader.go b/plugins/serializers/influx/reader.go index 4a755c88d..d0dad8eeb 100644 --- a/plugins/serializers/influx/reader.go +++ b/plugins/serializers/influx/reader.go @@ -2,7 +2,6 @@ package influx import ( "bytes" - "fmt" "io" "log" @@ -54,17 +53,11 @@ func (r *reader) Read(p []byte) (int, error) { r.offset += 1 if err != nil { r.buf.Reset() - switch err.(type) { - case *MetricError: - // Since we are serializing an array of metrics, don't fail + if err != nil { + // Since we are serializing multiple metrics, don't fail the // the entire batch just because of one unserializable metric. - log.Printf( - "D! [serializers.influx] could not serialize metric %q: %v; discarding metric", - metric.Name(), err) + log.Printf("E! [serializers.influx] could not serialize metric: %v; discarding metric", err) continue - default: - fmt.Println(err) - return 0, err } } break