Metrics that cannot be serialized are logged at debug level, and the
rest of the batch is still sent. Unserializable metrics can occur during
normal operation, for example when all fields have been removed from a
metric or when the metric cannot fit within the line size limit.
(cherry picked from commit 29b37e67c2)
322 lines
6.5 KiB
Go
322 lines
6.5 KiB
Go
package influx
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"math"
|
|
"sort"
|
|
"strconv"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
)
|
|
|
|
// MaxInt is the largest value representable by the platform-sized int type.
const MaxInt = int(^uint(0) >> 1)
|
|
|
|
// FieldSortOrder selects whether a metric's fields are sorted by key before
// they are written.
type FieldSortOrder int

const (
	// NoSortFields writes fields in the order the metric provides them.
	NoSortFields FieldSortOrder = iota
	// SortFields sorts fields by key before writing them.
	SortFields
)
|
|
|
|
// FieldTypeSupport is a bit set of optional field types the serializer is
// allowed to emit.
type FieldTypeSupport int

const (
	// UintSupport allows uint64 fields to be written as unsigned integers
	// instead of being converted to signed integers.
	UintSupport FieldTypeSupport = 1 << iota
)
|
|
|
|
// MetricError reports a problem that makes an entire metric
// unserializable.
type MetricError struct {
	s string
}

// Error returns the error message, satisfying the error interface.
func (e MetricError) Error() string {
	return e.s
}
|
|
|
|
// FieldError reports a problem that makes a single field unserializable;
// the rest of the metric may still be usable.
type FieldError struct {
	s string
}

// Error returns the error message, satisfying the error interface.
func (e FieldError) Error() string {
	return e.s
}
|
|
|
|
var (
|
|
ErrNeedMoreSpace = &MetricError{"need more space"}
|
|
ErrInvalidName = &MetricError{"invalid name"}
|
|
ErrNoFields = &MetricError{"no serializable fields"}
|
|
)
|
|
|
|
// Serializer is a serializer for line protocol.
|
|
type Serializer struct {
|
|
maxLineBytes int
|
|
bytesWritten int
|
|
fieldSortOrder FieldSortOrder
|
|
fieldTypeSupport FieldTypeSupport
|
|
|
|
buf bytes.Buffer
|
|
header []byte
|
|
footer []byte
|
|
pair []byte
|
|
}
|
|
|
|
func NewSerializer() *Serializer {
|
|
serializer := &Serializer{
|
|
fieldSortOrder: NoSortFields,
|
|
|
|
header: make([]byte, 0, 50),
|
|
footer: make([]byte, 0, 21),
|
|
pair: make([]byte, 0, 50),
|
|
}
|
|
return serializer
|
|
}
|
|
|
|
func (s *Serializer) SetMaxLineBytes(bytes int) {
|
|
s.maxLineBytes = bytes
|
|
}
|
|
|
|
func (s *Serializer) SetFieldSortOrder(order FieldSortOrder) {
|
|
s.fieldSortOrder = order
|
|
}
|
|
|
|
func (s *Serializer) SetFieldTypeSupport(typeSupport FieldTypeSupport) {
|
|
s.fieldTypeSupport = typeSupport
|
|
}
|
|
|
|
// Serialize writes the telegraf.Metric to a byte slice. May produce multiple
|
|
// lines of output if longer than maximum line length. Lines are terminated
|
|
// with a newline (LF) char.
|
|
func (s *Serializer) Serialize(m telegraf.Metric) ([]byte, error) {
|
|
s.buf.Reset()
|
|
err := s.writeMetric(&s.buf, m)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
out := make([]byte, s.buf.Len())
|
|
copy(out, s.buf.Bytes())
|
|
return out, nil
|
|
}
|
|
|
|
func (s *Serializer) Write(w io.Writer, m telegraf.Metric) (int, error) {
|
|
err := s.writeMetric(w, m)
|
|
return s.bytesWritten, err
|
|
}
|
|
|
|
func (s *Serializer) writeString(w io.Writer, str string) error {
|
|
n, err := io.WriteString(w, str)
|
|
s.bytesWritten += n
|
|
return err
|
|
}
|
|
|
|
func (s *Serializer) write(w io.Writer, b []byte) error {
|
|
n, err := w.Write(b)
|
|
s.bytesWritten += n
|
|
return err
|
|
}
|
|
|
|
func (s *Serializer) buildHeader(m telegraf.Metric) error {
|
|
s.header = s.header[:0]
|
|
|
|
name := nameEscape(m.Name())
|
|
if name == "" {
|
|
return ErrInvalidName
|
|
}
|
|
|
|
s.header = append(s.header, name...)
|
|
|
|
for _, tag := range m.TagList() {
|
|
key := escape(tag.Key)
|
|
value := escape(tag.Value)
|
|
|
|
// Some keys and values are not encodeable as line protocol, such as
|
|
// those with a trailing '\' or empty strings.
|
|
if key == "" || value == "" {
|
|
continue
|
|
}
|
|
|
|
s.header = append(s.header, ',')
|
|
s.header = append(s.header, key...)
|
|
s.header = append(s.header, '=')
|
|
s.header = append(s.header, value...)
|
|
}
|
|
|
|
s.header = append(s.header, ' ')
|
|
return nil
|
|
}
|
|
|
|
func (s *Serializer) buildFooter(m telegraf.Metric) {
|
|
s.footer = s.footer[:0]
|
|
s.footer = append(s.footer, ' ')
|
|
s.footer = strconv.AppendInt(s.footer, m.Time().UnixNano(), 10)
|
|
s.footer = append(s.footer, '\n')
|
|
}
|
|
|
|
func (s *Serializer) buildFieldPair(key string, value interface{}) error {
|
|
s.pair = s.pair[:0]
|
|
key = escape(key)
|
|
|
|
// Some keys are not encodeable as line protocol, such as those with a
|
|
// trailing '\' or empty strings.
|
|
if key == "" {
|
|
return &FieldError{"invalid field key"}
|
|
}
|
|
|
|
s.pair = append(s.pair, key...)
|
|
s.pair = append(s.pair, '=')
|
|
pair, err := s.appendFieldValue(s.pair, value)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.pair = pair
|
|
return nil
|
|
}
|
|
|
|
func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error {
|
|
var err error
|
|
|
|
err = s.buildHeader(m)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.buildFooter(m)
|
|
|
|
if s.fieldSortOrder == SortFields {
|
|
sort.Slice(m.FieldList(), func(i, j int) bool {
|
|
return m.FieldList()[i].Key < m.FieldList()[j].Key
|
|
})
|
|
}
|
|
|
|
pairsLen := 0
|
|
firstField := true
|
|
for _, field := range m.FieldList() {
|
|
err = s.buildFieldPair(field.Key, field.Value)
|
|
if err != nil {
|
|
log.Printf(
|
|
"D! [serializers.influx] could not serialize field %q: %v; discarding field",
|
|
field.Key, err)
|
|
continue
|
|
}
|
|
|
|
bytesNeeded := len(s.header) + pairsLen + len(s.pair) + len(s.footer)
|
|
|
|
// Additional length needed for field separator `,`
|
|
if !firstField {
|
|
bytesNeeded += 1
|
|
}
|
|
|
|
if s.maxLineBytes > 0 && bytesNeeded > s.maxLineBytes {
|
|
// Need at least one field per line
|
|
if firstField {
|
|
return ErrNeedMoreSpace
|
|
}
|
|
|
|
err = s.write(w, s.footer)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
bytesNeeded = len(s.header) + len(s.pair) + len(s.footer)
|
|
|
|
if s.maxLineBytes > 0 && bytesNeeded > s.maxLineBytes {
|
|
return ErrNeedMoreSpace
|
|
}
|
|
|
|
err = s.write(w, s.header)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.write(w, s.pair)
|
|
pairsLen += len(s.pair)
|
|
firstField = false
|
|
continue
|
|
}
|
|
|
|
if firstField {
|
|
err = s.write(w, s.header)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
err = s.writeString(w, ",")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
s.write(w, s.pair)
|
|
|
|
pairsLen += len(s.pair)
|
|
firstField = false
|
|
}
|
|
|
|
if firstField {
|
|
return ErrNoFields
|
|
}
|
|
|
|
return s.write(w, s.footer)
|
|
|
|
}
|
|
|
|
func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, error) {
|
|
switch v := value.(type) {
|
|
case uint64:
|
|
if s.fieldTypeSupport&UintSupport != 0 {
|
|
return appendUintField(buf, v), nil
|
|
} else {
|
|
if v <= uint64(MaxInt) {
|
|
return appendIntField(buf, int64(v)), nil
|
|
} else {
|
|
return appendIntField(buf, int64(MaxInt)), nil
|
|
}
|
|
}
|
|
case int64:
|
|
return appendIntField(buf, v), nil
|
|
case float64:
|
|
if math.IsNaN(v) {
|
|
return nil, &FieldError{"is NaN"}
|
|
}
|
|
|
|
if math.IsInf(v, 0) {
|
|
return nil, &FieldError{"is Inf"}
|
|
}
|
|
|
|
return appendFloatField(buf, v), nil
|
|
case string:
|
|
return appendStringField(buf, v), nil
|
|
case bool:
|
|
return appendBoolField(buf, v), nil
|
|
default:
|
|
return buf, &FieldError{fmt.Sprintf("invalid value type: %T", v)}
|
|
}
|
|
}
|
|
|
|
// appendUintField appends value in decimal followed by the `u` suffix that
// marks an unsigned integer field, and returns the extended slice.
func appendUintField(buf []byte, value uint64) []byte {
	buf = strconv.AppendUint(buf, value, 10)
	return append(buf, 'u')
}
|
|
|
|
// appendIntField appends value in decimal followed by the `i` suffix that
// marks a signed integer field, and returns the extended slice.
func appendIntField(buf []byte, value int64) []byte {
	buf = strconv.AppendInt(buf, value, 10)
	return append(buf, 'i')
}
|
|
|
|
// appendFloatField appends value in plain decimal notation (no exponent),
// using the fewest digits that represent the float64 exactly, and returns
// the extended slice.
func appendFloatField(buf []byte, value float64) []byte {
	const (
		format    = 'f'
		precision = -1
		bitSize   = 64
	)
	return strconv.AppendFloat(buf, value, format, precision, bitSize)
}
|
|
|
|
// appendBoolField appends "true" or "false" according to value and returns
// the extended slice.
func appendBoolField(buf []byte, value bool) []byte {
	buf = strconv.AppendBool(buf, value)
	return buf
}
|
|
|
|
func appendStringField(buf []byte, value string) []byte {
|
|
buf = append(buf, '"')
|
|
buf = append(buf, stringFieldEscape(value)...)
|
|
buf = append(buf, '"')
|
|
return buf
|
|
}
|