From 29b37e67c28eecc1a0cb9ae83db746d67addd290 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Thu, 19 Apr 2018 16:24:31 -0700 Subject: [PATCH] Allow metrics to be unserializable in influx.Reader (#4047) Metrics that are unserializable will be logged at debug level, but the rest of the batch will be sent. Unserializable metrics can occur during normal operation such as if you remove all fields from a metric or the metric cannot fit within the line size limit. --- plugins/serializers/influx/influx.go | 43 ++++++++++++----- plugins/serializers/influx/reader.go | 26 +++++++++-- plugins/serializers/influx/reader_test.go | 56 +++++++++++++++++++++++ 3 files changed, 108 insertions(+), 17 deletions(-) diff --git a/plugins/serializers/influx/influx.go b/plugins/serializers/influx/influx.go index 7d68ed3df..926bfcb34 100644 --- a/plugins/serializers/influx/influx.go +++ b/plugins/serializers/influx/influx.go @@ -2,8 +2,9 @@ package influx import ( "bytes" - "errors" + "fmt" "io" + "log" "math" "sort" "strconv" @@ -26,14 +27,28 @@ const ( UintSupport FieldTypeSupport = 1 << iota ) +// MetricError is an error causing a metric to be unserializable. +type MetricError struct { + s string +} + +func (e MetricError) Error() string { + return e.s +} + +// FieldError is an error causing a field to be unserializable. +type FieldError struct { + s string +} + +func (e FieldError) Error() string { + return e.s +} + var ( - ErrNeedMoreSpace = errors.New("need more space") - ErrInvalidName = errors.New("invalid name") - ErrInvalidFieldKey = errors.New("invalid field key") - ErrInvalidFieldType = errors.New("invalid field type") - ErrFieldIsNaN = errors.New("is NaN") - ErrFieldIsInf = errors.New("is Inf") - ErrNoFields = errors.New("no fields") + ErrNeedMoreSpace = &MetricError{"need more space"} + ErrInvalidName = &MetricError{"invalid name"} + ErrNoFields = &MetricError{"no serializable fields"} ) // Serializer is a serializer for line protocol. @@ -148,7 +163,7 @@ func (s *Serializer) buildFieldPair(key string, value interface{}) error { // Some keys are not encodeable as line protocol, such as those with a // trailing '\' or empty strings. if key == "" { - return ErrInvalidFieldKey + return &FieldError{"invalid field key"} } s.pair = append(s.pair, key...) @@ -182,6 +197,9 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error { for _, field := range m.FieldList() { err = s.buildFieldPair(field.Key, field.Value) if err != nil { + log.Printf( + "D! [serializers.influx] could not serialize field %q: %v; discarding field", + field.Key, err) continue } @@ -262,11 +280,11 @@ func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, er return appendIntField(buf, v), nil case float64: if math.IsNaN(v) { - return nil, ErrFieldIsNaN + return nil, &FieldError{"is NaN"} } if math.IsInf(v, 0) { - return nil, ErrFieldIsInf + return nil, &FieldError{"is Inf"} } return appendFloatField(buf, v), nil @@ -274,8 +292,9 @@ func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, er return appendStringField(buf, v), nil case bool: return appendBoolField(buf, v), nil + default: + return buf, &FieldError{fmt.Sprintf("invalid value type: %T", v)} } - return buf, ErrInvalidFieldType } func appendUintField(buf []byte, value uint64) []byte { diff --git a/plugins/serializers/influx/reader.go b/plugins/serializers/influx/reader.go index 590e2fafd..4a755c88d 100644 --- a/plugins/serializers/influx/reader.go +++ b/plugins/serializers/influx/reader.go @@ -2,7 +2,9 @@ package influx import ( "bytes" + "fmt" "io" + "log" "github.com/influxdata/telegraf" ) @@ -47,11 +49,25 @@ func (r *reader) Read(p []byte) (int, error) { return 0, io.EOF } - _, err := r.serializer.Write(r.buf, r.metrics[r.offset]) - r.offset += 1 - if err != nil { - r.buf.Reset() - return 0, err + for _, metric := range r.metrics[r.offset:] { + _, err := r.serializer.Write(r.buf, metric) + r.offset += 1 + if err != nil { + r.buf.Reset() + switch err.(type) { + case *MetricError: + // Since we are serializing an array of metrics, don't fail + // the entire batch just because of one unserializable metric. + log.Printf( + "D! [serializers.influx] could not serialize metric %q: %v; discarding metric", + metric.Name(), err) + continue + default: + fmt.Println(err) + return 0, err + } + } + break } return r.buf.Read(p) diff --git a/plugins/serializers/influx/reader_test.go b/plugins/serializers/influx/reader_test.go index 5dca21595..642b71b1c 100644 --- a/plugins/serializers/influx/reader_test.go +++ b/plugins/serializers/influx/reader_test.go @@ -83,6 +83,62 @@ func TestReader(t *testing.T) { }, expected: []byte("cpu value=42 0\n"), }, + { + name: "continue on failed metrics", + maxLineBytes: 4096, + bufferSize: 15, + input: []telegraf.Metric{ + MustMetric( + metric.New( + "", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + ), + MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + ), + }, + expected: []byte("cpu value=42 0\n"), + }, + { + name: "last metric failed regression", + maxLineBytes: 4096, + bufferSize: 15, + input: []telegraf.Metric{ + MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + ), + MustMetric( + metric.New( + "", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + ), + }, + expected: []byte("cpu value=42 0\n"), + }, } for _, tt := range tests {