package influx import ( "bytes" "io" "github.com/influxdata/telegraf" ) // reader is an io.Reader for line protocol. type reader struct { metrics []telegraf.Metric serializer *Serializer offset int buf *bytes.Buffer } // NewReader creates a new reader over the given metrics. func NewReader(metrics []telegraf.Metric, serializer *Serializer) io.Reader { return &reader{ metrics: metrics, serializer: serializer, offset: 0, buf: bytes.NewBuffer(make([]byte, 0, serializer.maxLineBytes)), } } // SetMetrics changes the metrics to be read. func (r *reader) SetMetrics(metrics []telegraf.Metric) { r.metrics = metrics r.offset = 0 r.buf.Reset() } // Read reads up to len(p) bytes of the current metric into p, each call will // only serialize at most one metric so the number of bytes read may be less // than p. Subsequent calls to Read will read the next metric until all are // emitted. If a metric cannot be serialized, an error will be returned, you // may resume with the next metric by calling Read again. When all metrics // are emitted the err is io.EOF. func (r *reader) Read(p []byte) (int, error) { if r.buf.Len() > 0 { return r.buf.Read(p) } if r.offset >= len(r.metrics) { return 0, io.EOF } _, err := r.serializer.Write(r.buf, r.metrics[r.offset]) r.offset += 1 if err != nil { r.buf.Reset() return 0, err } return r.buf.Read(p) }