Skip unserializable metric in influxDB UDP output (#4534)

This commit is contained in:
Daniel Nelson 2018-08-14 13:36:29 -07:00 committed by Greg
parent 763dc6990c
commit fa30f568ec
5 changed files with 119 additions and 82 deletions

View File

@ -3,11 +3,11 @@ package influxdb
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"net" "net"
"net/url" "net/url"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/plugins/serializers/influx"
) )
@ -28,7 +28,7 @@ type Conn interface {
type UDPConfig struct { type UDPConfig struct {
MaxPayloadSize int MaxPayloadSize int
URL *url.URL URL *url.URL
Serializer serializers.Serializer Serializer *influx.Serializer
Dialer Dialer Dialer Dialer
} }
@ -65,7 +65,7 @@ func NewUDPClient(config *UDPConfig) (*udpClient, error) {
type udpClient struct { type udpClient struct {
conn Conn conn Conn
dialer Dialer dialer Dialer
serializer serializers.Serializer serializer *influx.Serializer
url *url.URL url *url.URL
} }
@ -89,7 +89,11 @@ func (c *udpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
for _, metric := range metrics { for _, metric := range metrics {
octets, err := c.serializer.Serialize(metric) octets, err := c.serializer.Serialize(metric)
if err != nil { if err != nil {
return fmt.Errorf("could not serialize metric: %v", err) // Since we are serializing multiple metrics, don't fail the
// entire batch just because of one unserializable metric.
log.Printf("E! [outputs.influxdb] when writing to [%s] could not serialize metric: %v",
c.URL(), err)
continue
} }
_, err = c.conn.Write(octets) _, err = c.conn.Write(octets)

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"log"
"net" "net"
"net/url" "net/url"
"sync" "sync"
@ -13,7 +14,6 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs/influxdb" "github.com/influxdata/telegraf/plugins/outputs/influxdb"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -65,19 +65,6 @@ func (d *MockDialer) DialContext(ctx context.Context, network string, address st
return d.DialContextF(network, address) return d.DialContextF(network, address)
} }
type MockSerializer struct {
SerializeF func(metric telegraf.Metric) ([]byte, error)
SerializeBatchF func(metrics []telegraf.Metric) ([]byte, error)
}
func (s *MockSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
return s.SerializeF(metric)
}
func (s *MockSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
return s.SerializeBatchF(metrics)
}
func TestUDP_NewUDPClientNoURL(t *testing.T) { func TestUDP_NewUDPClientNoURL(t *testing.T) {
config := &influxdb.UDPConfig{} config := &influxdb.UDPConfig{}
_, err := influxdb.NewUDPClient(config) _, err := influxdb.NewUDPClient(config)
@ -177,8 +164,17 @@ func TestUDP_WriteError(t *testing.T) {
require.True(t, closed) require.True(t, closed)
} }
func TestUDP_SerializeError(t *testing.T) { func TestUDP_ErrorLogging(t *testing.T) {
config := &influxdb.UDPConfig{ tests := []struct {
name string
config *influxdb.UDPConfig
metrics []telegraf.Metric
logContains string
}{
{
name: "logs need more space",
config: &influxdb.UDPConfig{
MaxPayloadSize: 1,
URL: getURL(), URL: getURL(),
Dialer: &MockDialer{ Dialer: &MockDialer{
DialContextF: func(network, address string) (influxdb.Conn, error) { DialContextF: func(network, address string) (influxdb.Conn, error) {
@ -186,19 +182,51 @@ func TestUDP_SerializeError(t *testing.T) {
return conn, nil return conn, nil
}, },
}, },
Serializer: &MockSerializer{
SerializeF: func(metric telegraf.Metric) ([]byte, error) {
return nil, influx.ErrNeedMoreSpace
}, },
metrics: []telegraf.Metric{getMetric()},
logContains: `could not serialize metric: "cpu": need more space`,
},
{
name: "logs series name",
config: &influxdb.UDPConfig{
URL: getURL(),
Dialer: &MockDialer{
DialContextF: func(network, address string) (influxdb.Conn, error) {
conn := &MockConn{}
return conn, nil
},
},
},
metrics: []telegraf.Metric{
func() telegraf.Metric {
metric, _ := metric.New(
"cpu",
map[string]string{
"host": "example.org",
},
map[string]interface{}{},
time.Unix(0, 0),
)
return metric
}(),
},
logContains: `could not serialize metric: "cpu,host=example.org": no serializable fields`,
}, },
} }
client, err := influxdb.NewUDPClient(config) for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var b bytes.Buffer
log.SetOutput(&b)
client, err := influxdb.NewUDPClient(tt.config)
require.NoError(t, err) require.NoError(t, err)
ctx := context.Background() ctx := context.Background()
err = client.Write(ctx, []telegraf.Metric{getMetric()}) err = client.Write(ctx, tt.metrics)
require.Error(t, err) require.NoError(t, err)
require.Contains(t, err.Error(), influx.ErrNeedMoreSpace.Error()) require.Contains(t, b.String(), tt.logContains)
})
}
} }
func TestUDP_WriteWithRealConn(t *testing.T) { func TestUDP_WriteWithRealConn(t *testing.T) {

View File

@ -27,30 +27,34 @@ const (
UintSupport FieldTypeSupport = 1 << iota UintSupport FieldTypeSupport = 1 << iota
) )
// MetricError is an error causing a metric to be unserializable. var (
NeedMoreSpace = "need more space"
InvalidName = "invalid name"
NoFields = "no serializable fields"
)
// MetricError is an error causing an entire metric to be unserializable.
type MetricError struct { type MetricError struct {
s string series string
reason string
} }
func (e MetricError) Error() string { func (e MetricError) Error() string {
return e.s if e.series != "" {
return fmt.Sprintf("%q: %s", e.series, e.reason)
}
return e.reason
} }
// FieldError is an error causing a field to be unserializable. // FieldError is an error causing a field to be unserializable.
type FieldError struct { type FieldError struct {
s string reason string
} }
func (e FieldError) Error() string { func (e FieldError) Error() string {
return e.s return e.reason
} }
var (
ErrNeedMoreSpace = &MetricError{"need more space"}
ErrInvalidName = &MetricError{"invalid name"}
ErrNoFields = &MetricError{"no serializable fields"}
)
// Serializer is a serializer for line protocol. // Serializer is a serializer for line protocol.
type Serializer struct { type Serializer struct {
maxLineBytes int maxLineBytes int
@ -102,17 +106,20 @@ func (s *Serializer) Serialize(m telegraf.Metric) ([]byte, error) {
return out, nil return out, nil
} }
// SerializeBatch writes the slice of metrics and returns a byte slice of the
// results. The returned byte slice may contain multiple lines of data.
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var batch bytes.Buffer s.buf.Reset()
for _, m := range metrics { for _, m := range metrics {
_, err := s.Write(&batch, m) _, err := s.Write(&s.buf, m)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} }
return batch.Bytes(), nil out := make([]byte, s.buf.Len())
copy(out, s.buf.Bytes())
return out, nil
} }
func (s *Serializer) Write(w io.Writer, m telegraf.Metric) (int, error) { func (s *Serializer) Write(w io.Writer, m telegraf.Metric) (int, error) {
err := s.writeMetric(w, m) err := s.writeMetric(w, m)
return s.bytesWritten, err return s.bytesWritten, err
@ -135,7 +142,7 @@ func (s *Serializer) buildHeader(m telegraf.Metric) error {
name := nameEscape(m.Name()) name := nameEscape(m.Name())
if name == "" { if name == "" {
return ErrInvalidName return s.newMetricError(InvalidName)
} }
s.header = append(s.header, name...) s.header = append(s.header, name...)
@ -222,9 +229,10 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error {
} }
if s.maxLineBytes > 0 && bytesNeeded > s.maxLineBytes { if s.maxLineBytes > 0 && bytesNeeded > s.maxLineBytes {
// Need at least one field per line // Need at least one field per line, this metric cannot be fit
// into the max line bytes.
if firstField { if firstField {
return ErrNeedMoreSpace return s.newMetricError(NeedMoreSpace)
} }
err = s.write(w, s.footer) err = s.write(w, s.footer)
@ -232,21 +240,12 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error {
return err return err
} }
firstField = true
bytesNeeded = len(s.header) + len(s.pair) + len(s.footer) bytesNeeded = len(s.header) + len(s.pair) + len(s.footer)
if s.maxLineBytes > 0 && bytesNeeded > s.maxLineBytes { if bytesNeeded > s.maxLineBytes {
return ErrNeedMoreSpace return s.newMetricError(NeedMoreSpace)
} }
err = s.write(w, s.header)
if err != nil {
return err
}
s.write(w, s.pair)
pairsLen += len(s.pair)
firstField = false
continue
} }
if firstField { if firstField {
@ -261,18 +260,28 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error {
} }
} }
s.write(w, s.pair) err = s.write(w, s.pair)
if err != nil {
return err
}
pairsLen += len(s.pair) pairsLen += len(s.pair)
firstField = false firstField = false
} }
if firstField { if firstField {
return ErrNoFields return s.newMetricError(NoFields)
} }
return s.write(w, s.footer) return s.write(w, s.footer)
}
func (s *Serializer) newMetricError(reason string) *MetricError {
if len(s.header) != 0 {
series := bytes.TrimRight(s.header, " ")
return &MetricError{series: string(series), reason: reason}
}
return &MetricError{reason: reason}
} }
func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, error) { func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, error) {

View File

@ -23,7 +23,7 @@ var tests = []struct {
typeSupport FieldTypeSupport typeSupport FieldTypeSupport
input telegraf.Metric input telegraf.Metric
output []byte output []byte
err error errReason string
}{ }{
{ {
name: "minimal", name: "minimal",
@ -98,7 +98,7 @@ var tests = []struct {
time.Unix(0, 0), time.Unix(0, 0),
), ),
), ),
err: ErrNoFields, errReason: NoFields,
}, },
{ {
name: "float Inf", name: "float Inf",
@ -334,7 +334,7 @@ var tests = []struct {
), ),
), ),
output: nil, output: nil,
err: ErrNeedMoreSpace, errReason: NeedMoreSpace,
}, },
{ {
name: "no fields", name: "no fields",
@ -346,7 +346,7 @@ var tests = []struct {
time.Unix(0, 0), time.Unix(0, 0),
), ),
), ),
err: ErrNoFields, errReason: NoFields,
}, },
{ {
name: "procstat", name: "procstat",
@ -427,7 +427,10 @@ func TestSerializer(t *testing.T) {
serializer.SetFieldSortOrder(SortFields) serializer.SetFieldSortOrder(SortFields)
serializer.SetFieldTypeSupport(tt.typeSupport) serializer.SetFieldTypeSupport(tt.typeSupport)
output, err := serializer.Serialize(tt.input) output, err := serializer.Serialize(tt.input)
require.Equal(t, tt.err, err) if tt.errReason != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tt.errReason)
}
require.Equal(t, string(tt.output), string(output)) require.Equal(t, string(tt.output), string(output))
}) })
} }

View File

@ -2,7 +2,6 @@ package influx
import ( import (
"bytes" "bytes"
"fmt"
"io" "io"
"log" "log"
@ -54,17 +53,11 @@ func (r *reader) Read(p []byte) (int, error) {
r.offset += 1 r.offset += 1
if err != nil { if err != nil {
r.buf.Reset() r.buf.Reset()
switch err.(type) { if err != nil {
case *MetricError: // Since we are serializing multiple metrics, don't fail the
// Since we are serializing an array of metrics, don't fail
// the entire batch just because of one unserializable metric. // the entire batch just because of one unserializable metric.
log.Printf( log.Printf("E! [serializers.influx] could not serialize metric: %v; discarding metric", err)
"D! [serializers.influx] could not serialize metric %q: %v; discarding metric",
metric.Name(), err)
continue continue
default:
fmt.Println(err)
return 0, err
} }
} }
break break