Allow metrics to be unserializable in influx.Reader (#4047)
Metrics that are unserializable will be logged at debug level, but the rest of the batch will be sent. Unserializable metrics can occur during normal operation such as if you remove all fields from a metric or the metric cannot fit within the line size limit.
This commit is contained in:
parent
6e0e6db1ee
commit
07760b2758
|
@ -2,8 +2,9 @@ package influx
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"errors"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"log"
|
||||||
"math"
|
"math"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -26,14 +27,28 @@ const (
|
||||||
UintSupport FieldTypeSupport = 1 << iota
|
UintSupport FieldTypeSupport = 1 << iota
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// MetricError is an error causing a metric to be unserializable.
|
||||||
|
type MetricError struct {
|
||||||
|
s string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e MetricError) Error() string {
|
||||||
|
return e.s
|
||||||
|
}
|
||||||
|
|
||||||
|
// FieldError is an error causing a field to be unserializable.
|
||||||
|
type FieldError struct {
|
||||||
|
s string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e FieldError) Error() string {
|
||||||
|
return e.s
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrNeedMoreSpace = errors.New("need more space")
|
ErrNeedMoreSpace = &MetricError{"need more space"}
|
||||||
ErrInvalidName = errors.New("invalid name")
|
ErrInvalidName = &MetricError{"invalid name"}
|
||||||
ErrInvalidFieldKey = errors.New("invalid field key")
|
ErrNoFields = &MetricError{"no serializable fields"}
|
||||||
ErrInvalidFieldType = errors.New("invalid field type")
|
|
||||||
ErrFieldIsNaN = errors.New("is NaN")
|
|
||||||
ErrFieldIsInf = errors.New("is Inf")
|
|
||||||
ErrNoFields = errors.New("no fields")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Serializer is a serializer for line protocol.
|
// Serializer is a serializer for line protocol.
|
||||||
|
@ -148,7 +163,7 @@ func (s *Serializer) buildFieldPair(key string, value interface{}) error {
|
||||||
// Some keys are not encodeable as line protocol, such as those with a
|
// Some keys are not encodeable as line protocol, such as those with a
|
||||||
// trailing '\' or empty strings.
|
// trailing '\' or empty strings.
|
||||||
if key == "" {
|
if key == "" {
|
||||||
return ErrInvalidFieldKey
|
return &FieldError{"invalid field key"}
|
||||||
}
|
}
|
||||||
|
|
||||||
s.pair = append(s.pair, key...)
|
s.pair = append(s.pair, key...)
|
||||||
|
@ -182,6 +197,9 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error {
|
||||||
for _, field := range m.FieldList() {
|
for _, field := range m.FieldList() {
|
||||||
err = s.buildFieldPair(field.Key, field.Value)
|
err = s.buildFieldPair(field.Key, field.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Printf(
|
||||||
|
"D! [serializers.influx] could not serialize field %q: %v; discarding field",
|
||||||
|
field.Key, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -262,11 +280,11 @@ func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, er
|
||||||
return appendIntField(buf, v), nil
|
return appendIntField(buf, v), nil
|
||||||
case float64:
|
case float64:
|
||||||
if math.IsNaN(v) {
|
if math.IsNaN(v) {
|
||||||
return nil, ErrFieldIsNaN
|
return nil, &FieldError{"is NaN"}
|
||||||
}
|
}
|
||||||
|
|
||||||
if math.IsInf(v, 0) {
|
if math.IsInf(v, 0) {
|
||||||
return nil, ErrFieldIsInf
|
return nil, &FieldError{"is Inf"}
|
||||||
}
|
}
|
||||||
|
|
||||||
return appendFloatField(buf, v), nil
|
return appendFloatField(buf, v), nil
|
||||||
|
@ -274,8 +292,9 @@ func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, er
|
||||||
return appendStringField(buf, v), nil
|
return appendStringField(buf, v), nil
|
||||||
case bool:
|
case bool:
|
||||||
return appendBoolField(buf, v), nil
|
return appendBoolField(buf, v), nil
|
||||||
|
default:
|
||||||
|
return buf, &FieldError{fmt.Sprintf("invalid value type: %T", v)}
|
||||||
}
|
}
|
||||||
return buf, ErrInvalidFieldType
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func appendUintField(buf []byte, value uint64) []byte {
|
func appendUintField(buf []byte, value uint64) []byte {
|
||||||
|
|
|
@ -2,7 +2,9 @@ package influx
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"log"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
)
|
)
|
||||||
|
@ -47,11 +49,25 @@ func (r *reader) Read(p []byte) (int, error) {
|
||||||
return 0, io.EOF
|
return 0, io.EOF
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := r.serializer.Write(r.buf, r.metrics[r.offset])
|
for _, metric := range r.metrics[r.offset:] {
|
||||||
r.offset += 1
|
_, err := r.serializer.Write(r.buf, metric)
|
||||||
if err != nil {
|
r.offset += 1
|
||||||
r.buf.Reset()
|
if err != nil {
|
||||||
return 0, err
|
r.buf.Reset()
|
||||||
|
switch err.(type) {
|
||||||
|
case *MetricError:
|
||||||
|
// Since we are serializing an array of metrics, don't fail
|
||||||
|
// the entire batch just because of one unserializable metric.
|
||||||
|
log.Printf(
|
||||||
|
"D! [serializers.influx] could not serialize metric %q: %v; discarding metric",
|
||||||
|
metric.Name(), err)
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
fmt.Println(err)
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
return r.buf.Read(p)
|
return r.buf.Read(p)
|
||||||
|
|
|
@ -83,6 +83,62 @@ func TestReader(t *testing.T) {
|
||||||
},
|
},
|
||||||
expected: []byte("cpu value=42 0\n"),
|
expected: []byte("cpu value=42 0\n"),
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "continue on failed metrics",
|
||||||
|
maxLineBytes: 4096,
|
||||||
|
bufferSize: 15,
|
||||||
|
input: []telegraf.Metric{
|
||||||
|
MustMetric(
|
||||||
|
metric.New(
|
||||||
|
"",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": 42.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
MustMetric(
|
||||||
|
metric.New(
|
||||||
|
"cpu",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": 42.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
expected: []byte("cpu value=42 0\n"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "last metric failed regression",
|
||||||
|
maxLineBytes: 4096,
|
||||||
|
bufferSize: 15,
|
||||||
|
input: []telegraf.Metric{
|
||||||
|
MustMetric(
|
||||||
|
metric.New(
|
||||||
|
"cpu",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": 42.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
MustMetric(
|
||||||
|
metric.New(
|
||||||
|
"",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": 42.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
expected: []byte("cpu value=42 0\n"),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
|
|
Loading…
Reference in New Issue