Allow metrics to be unserializable in influx.Reader (#4047)

Metrics that are unserializable will be logged at debug level, but the
rest of the batch will be sent.  Unserializable metrics can occur during
normal operation such as if you remove all fields from a metric or the
metric cannot fit within the line size limit.
This commit is contained in:
Daniel Nelson 2018-04-19 16:24:31 -07:00 committed by GitHub
parent 42fee824f8
commit 29b37e67c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 108 additions and 17 deletions

View File

@ -2,8 +2,9 @@ package influx
import (
"bytes"
"errors"
"fmt"
"io"
"log"
"math"
"sort"
"strconv"
@ -26,14 +27,28 @@ const (
UintSupport FieldTypeSupport = 1 << iota
)
// MetricError is an error causing a metric to be unserializable.
type MetricError struct {
s string
}
func (e MetricError) Error() string {
return e.s
}
// FieldError is an error causing a field to be unserializable.
type FieldError struct {
s string
}
func (e FieldError) Error() string {
return e.s
}
var (
ErrNeedMoreSpace = errors.New("need more space")
ErrInvalidName = errors.New("invalid name")
ErrInvalidFieldKey = errors.New("invalid field key")
ErrInvalidFieldType = errors.New("invalid field type")
ErrFieldIsNaN = errors.New("is NaN")
ErrFieldIsInf = errors.New("is Inf")
ErrNoFields = errors.New("no fields")
ErrNeedMoreSpace = &MetricError{"need more space"}
ErrInvalidName = &MetricError{"invalid name"}
ErrNoFields = &MetricError{"no serializable fields"}
)
// Serializer is a serializer for line protocol.
@ -148,7 +163,7 @@ func (s *Serializer) buildFieldPair(key string, value interface{}) error {
// Some keys are not encodeable as line protocol, such as those with a
// trailing '\' or empty strings.
if key == "" {
return ErrInvalidFieldKey
return &FieldError{"invalid field key"}
}
s.pair = append(s.pair, key...)
@ -182,6 +197,9 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error {
for _, field := range m.FieldList() {
err = s.buildFieldPair(field.Key, field.Value)
if err != nil {
log.Printf(
"D! [serializers.influx] could not serialize field %q: %v; discarding field",
field.Key, err)
continue
}
@ -262,11 +280,11 @@ func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, er
return appendIntField(buf, v), nil
case float64:
if math.IsNaN(v) {
return nil, ErrFieldIsNaN
return nil, &FieldError{"is NaN"}
}
if math.IsInf(v, 0) {
return nil, ErrFieldIsInf
return nil, &FieldError{"is Inf"}
}
return appendFloatField(buf, v), nil
@ -274,8 +292,9 @@ func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, er
return appendStringField(buf, v), nil
case bool:
return appendBoolField(buf, v), nil
default:
return buf, &FieldError{fmt.Sprintf("invalid value type: %T", v)}
}
return buf, ErrInvalidFieldType
}
func appendUintField(buf []byte, value uint64) []byte {

View File

@ -2,7 +2,9 @@ package influx
import (
"bytes"
"fmt"
"io"
"log"
"github.com/influxdata/telegraf"
)
@ -47,11 +49,25 @@ func (r *reader) Read(p []byte) (int, error) {
return 0, io.EOF
}
_, err := r.serializer.Write(r.buf, r.metrics[r.offset])
r.offset += 1
if err != nil {
r.buf.Reset()
return 0, err
for _, metric := range r.metrics[r.offset:] {
_, err := r.serializer.Write(r.buf, metric)
r.offset += 1
if err != nil {
r.buf.Reset()
switch err.(type) {
case *MetricError:
// Since we are serializing an array of metrics, don't fail
// the entire batch just because of one unserializable metric.
log.Printf(
"D! [serializers.influx] could not serialize metric %q: %v; discarding metric",
metric.Name(), err)
continue
default:
fmt.Println(err)
return 0, err
}
}
break
}
return r.buf.Read(p)

View File

@ -83,6 +83,62 @@ func TestReader(t *testing.T) {
},
expected: []byte("cpu value=42 0\n"),
},
{
name: "continue on failed metrics",
maxLineBytes: 4096,
bufferSize: 15,
input: []telegraf.Metric{
MustMetric(
metric.New(
"",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
),
MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
),
},
expected: []byte("cpu value=42 0\n"),
},
{
name: "last metric failed regression",
maxLineBytes: 4096,
bufferSize: 15,
input: []telegraf.Metric{
MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
),
MustMetric(
metric.New(
"",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
),
},
expected: []byte("cpu value=42 0\n"),
},
}
for _, tt := range tests {