Files
telegraf/plugins/serializers/influx/reader.go
Daniel Nelson 07760b2758 Allow metrics to be unserializable in influx.Reader (#4047)
Metrics that are unserializable will be logged at debug level, but the
rest of the batch will be sent.  Unserializable metrics can occur during
normal operation such as if you remove all fields from a metric or the
metric cannot fit within the line size limit.
2018-04-19 16:24:31 -07:00

75 lines
1.8 KiB
Go

package influx
import (
"bytes"
"fmt"
"io"
"log"
"github.com/influxdata/telegraf"
)
// reader is an io.Reader for line protocol.
type reader struct {
metrics []telegraf.Metric
serializer *Serializer
offset int
buf *bytes.Buffer
}
// NewReader creates a new reader over the given metrics.
func NewReader(metrics []telegraf.Metric, serializer *Serializer) io.Reader {
return &reader{
metrics: metrics,
serializer: serializer,
offset: 0,
buf: bytes.NewBuffer(make([]byte, 0, serializer.maxLineBytes)),
}
}
// SetMetrics changes the metrics to be read.
func (r *reader) SetMetrics(metrics []telegraf.Metric) {
r.metrics = metrics
r.offset = 0
r.buf.Reset()
}
// Read reads up to len(p) bytes of the current metric into p, each call will
// only serialize at most one metric so the number of bytes read may be less
// than p. Subsequent calls to Read will read the next metric until all are
// emitted. If a metric cannot be serialized, an error will be returned, you
// may resume with the next metric by calling Read again. When all metrics
// are emitted the err is io.EOF.
func (r *reader) Read(p []byte) (int, error) {
if r.buf.Len() > 0 {
return r.buf.Read(p)
}
if r.offset >= len(r.metrics) {
return 0, io.EOF
}
for _, metric := range r.metrics[r.offset:] {
_, err := r.serializer.Write(r.buf, metric)
r.offset += 1
if err != nil {
r.buf.Reset()
switch err.(type) {
case *MetricError:
// Since we are serializing an array of metrics, don't fail
// the entire batch just because of one unserializable metric.
log.Printf(
"D! [serializers.influx] could not serialize metric %q: %v; discarding metric",
metric.Name(), err)
continue
default:
fmt.Println(err)
return 0, err
}
}
break
}
return r.buf.Read(p)
}