From db7a4b24b6d253d956567868829434c7e0b64042 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 22 Nov 2016 12:51:57 +0000 Subject: [PATCH] Implement telegraf's own full metric type main reasons behind this: - make adding/removing tags cheap - make adding/removing fields cheap - make parsing cheaper - make parse -> decorate -> write out bytes metric flow much faster Refactor serializer to use byte buffer --- agent/accumulator_test.go | 3 +- internal/models/makemetric.go | 12 +- internal/models/running_aggregator.go | 3 +- internal/models/running_output.go | 15 +- metric.go | 194 +----- metric/inline_strconv_parse.go | 38 ++ metric/metric.go | 455 +++++++++++++ .../metric_benchmark_test.go | 23 +- metric_test.go => metric/metric_test.go | 20 +- metric/parse.go | 624 ++++++++++++++++++ plugins/aggregators/minmax/minmax_test.go | 5 +- plugins/inputs/logparser/grok/grok.go | 3 +- plugins/inputs/prometheus/parser.go | 3 +- .../webhooks/github/github_webhooks_models.go | 43 +- plugins/outputs/amqp/amqp.go | 11 +- plugins/outputs/file/file.go | 11 +- plugins/outputs/graphite/graphite.go | 10 +- plugins/outputs/graphite/graphite_test.go | 9 +- plugins/outputs/instrumental/instrumental.go | 43 +- .../outputs/instrumental/instrumental_test.go | 13 +- plugins/outputs/librato/librato_test.go | 11 +- plugins/outputs/mqtt/mqtt.go | 12 +- plugins/outputs/nats/nats.go | 13 +- plugins/outputs/nsq/nsq.go | 13 +- .../prometheus_client_test.go | 13 +- plugins/parsers/graphite/parser.go | 3 +- plugins/parsers/graphite/parser_test.go | 27 +- plugins/parsers/influx/parser.go | 21 +- plugins/parsers/influx/parser_test.go | 24 +- plugins/parsers/json/parser.go | 3 +- plugins/parsers/nagios/parser.go | 3 +- plugins/parsers/value/parser.go | 3 +- plugins/serializers/graphite/graphite.go | 10 +- plugins/serializers/graphite/graphite_test.go | 41 +- plugins/serializers/influx/influx.go | 4 +- plugins/serializers/influx/influx_test.go | 7 +- plugins/serializers/json/json.go | 10 +- plugins/serializers/json/json_test.go | 9 +- plugins/serializers/registry.go | 6 +- testutil/testutil.go | 3 +- 40 files changed, 1376 insertions(+), 398 deletions(-) create mode 100644 metric/inline_strconv_parse.go create mode 100644 metric/metric.go rename metric_benchmark_test.go => metric/metric_benchmark_test.go (84%) rename metric_test.go => metric/metric_test.go (80%) create mode 100644 metric/parse.go diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go index ef8d9eb20..e5b07a09e 100644 --- a/agent/accumulator_test.go +++ b/agent/accumulator_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -323,7 +324,7 @@ func (tm *TestMetricMaker) MakeMetric( ) telegraf.Metric { switch mType { case telegraf.Untyped: - if m, err := telegraf.NewMetric(measurement, tags, fields, t); err == nil { + if m, err := metric.New(measurement, tags, fields, t); err == nil { return m } case telegraf.Counter: diff --git a/internal/models/makemetric.go b/internal/models/makemetric.go index 71427607c..753dfad15 100644 --- a/internal/models/makemetric.go +++ b/internal/models/makemetric.go @@ -6,6 +6,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" ) // makemetric is used by both RunningAggregator & RunningInput @@ -135,16 +136,7 @@ func makemetric( } } - var m telegraf.Metric - var err error - switch mType { - case telegraf.Counter: - m, err = telegraf.NewCounterMetric(measurement, tags, fields, t) - case telegraf.Gauge: - m, err = telegraf.NewGaugeMetric(measurement, tags, fields, t) - default: - m, err = telegraf.NewMetric(measurement, tags, fields, t) - } + m, err := metric.New(measurement, tags, fields, t, mType) if err != nil { log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) return nil diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index 5c7640ba6..1d259005e 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -4,6 +4,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" ) type RunningAggregator struct { @@ -90,7 +91,7 @@ func (r *RunningAggregator) Add(in telegraf.Metric) bool { return false } - in, _ = telegraf.NewMetric(name, tags, fields, t) + in, _ = metric.New(name, tags, fields, t) } r.metrics <- in diff --git a/internal/models/running_output.go b/internal/models/running_output.go index aa94178f7..9f2f2bf5f 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/buffer" + "github.com/influxdata/telegraf/metric" ) const ( @@ -56,23 +57,23 @@ func NewRunningOutput( // AddMetric adds a metric to the output. This function can also write cached // points if FlushBufferWhenFull is true. -func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { +func (ro *RunningOutput) AddMetric(m telegraf.Metric) { // Filter any tagexclude/taginclude parameters before adding metric if ro.Config.Filter.IsActive() { // In order to filter out tags, we need to create a new metric, since // metrics are immutable once created. - name := metric.Name() - tags := metric.Tags() - fields := metric.Fields() - t := metric.Time() + name := m.Name() + tags := m.Tags() + fields := m.Fields() + t := m.Time() if ok := ro.Config.Filter.Apply(name, fields, tags); !ok { return } // error is not possible if creating from another metric, so ignore. - metric, _ = telegraf.NewMetric(name, tags, fields, t) + m, _ = metric.New(name, tags, fields, t) } - ro.metrics.Add(metric) + ro.metrics.Add(m) if ro.metrics.Len() == ro.MetricBatchSize { batch := ro.metrics.Batch(ro.MetricBatchSize) err := ro.write(batch) diff --git a/metric.go b/metric.go index f41b4eeaf..510b4235f 100644 --- a/metric.go +++ b/metric.go @@ -3,8 +3,8 @@ package telegraf import ( "time" + // TODO remove "github.com/influxdata/influxdb/client/v2" - "github.com/influxdata/influxdb/models" ) // ValueType is an enumeration of metric types that represent a simple value. @@ -19,178 +19,40 @@ const ( ) type Metric interface { - // Name returns the measurement name of the metric + Serialize() []byte + String() string // convenience function for string(Serialize()) + Copy() Metric + + // Tag functions + HasTag(key string) bool + AddTag(key, value string) + RemoveTag(key string) bool + + // Field functions + HasField(key string) bool + AddField(key string, value interface{}) + RemoveField(key string) bool + + // Name functions + SetName(name string) + SetPrefix(prefix string) + SetSuffix(suffix string) + + // Getting data structure functions Name() string - - // Name returns the tags associated with the metric Tags() map[string]string - - // Time return the timestamp for the metric + Fields() map[string]interface{} Time() time.Time - - // Type returns the metric type. Can be either telegraf.Gauge or telegraf.Counter - Type() ValueType - - // UnixNano returns the unix nano time of the metric UnixNano() int64 - - // HashID returns a non-cryptographic hash of the metric (name + tags) - // NOTE: do not persist & depend on this value to disk. + Type() ValueType + Len() int // returns the length of the serialized metric, including newline HashID() uint64 - // Fields returns the fields for the metric - Fields() map[string]interface{} - - // String returns a line-protocol string of the metric - String() string - - // PrecisionString returns a line-protocol string of the metric, at precision - PrecisionString(precison string) string - - // Point returns a influxdb client.Point object - Point() *client.Point - - // SetAggregate sets the metric's aggregate status - // This is so that aggregate metrics don't get re-sent to aggregator plugins + // aggregator things: SetAggregate(bool) - // IsAggregate returns true if the metric is an aggregate IsAggregate() bool - // Copy copies the metric - Copy() Metric -} - -// metric is a wrapper of the influxdb client.Point struct -type metric struct { - pt models.Point - - mType ValueType - - isaggregate bool -} - -func NewMetricFromPoint(pt models.Point) Metric { - return &metric{ - pt: pt, - mType: Untyped, - } -} - -// NewMetric returns an untyped metric. -func NewMetric( - name string, - tags map[string]string, - fields map[string]interface{}, - t time.Time, -) (Metric, error) { - pt, err := models.NewPoint(name, models.NewTags(tags), fields, t) - if err != nil { - return nil, err - } - return &metric{ - pt: pt, - mType: Untyped, - }, nil -} - -// NewGaugeMetric returns a gauge metric. -// Gauge metrics should be used when the metric is can arbitrarily go up and -// down. ie, temperature, memory usage, cpu usage, etc. -func NewGaugeMetric( - name string, - tags map[string]string, - fields map[string]interface{}, - t time.Time, -) (Metric, error) { - pt, err := models.NewPoint(name, models.NewTags(tags), fields, t) - if err != nil { - return nil, err - } - return &metric{ - pt: pt, - mType: Gauge, - }, nil -} - -// NewCounterMetric returns a Counter metric. -// Counter metrics should be used when the metric being created is an -// always-increasing counter. ie, net bytes received, requests served, errors, etc. -func NewCounterMetric( - name string, - tags map[string]string, - fields map[string]interface{}, - t time.Time, -) (Metric, error) { - pt, err := models.NewPoint(name, models.NewTags(tags), fields, t) - if err != nil { - return nil, err - } - return &metric{ - pt: pt, - mType: Counter, - }, nil -} - -func (m *metric) Name() string { - return m.pt.Name() -} - -func (m *metric) Tags() map[string]string { - return m.pt.Tags().Map() -} - -func (m *metric) Time() time.Time { - return m.pt.Time() -} - -func (m *metric) Type() ValueType { - return m.mType -} - -func (m *metric) HashID() uint64 { - return m.pt.HashID() -} - -func (m *metric) UnixNano() int64 { - return m.pt.UnixNano() -} - -func (m *metric) Fields() map[string]interface{} { - return m.pt.Fields() -} - -func (m *metric) String() string { - return m.pt.String() -} - -func (m *metric) PrecisionString(precison string) string { - return m.pt.PrecisionString(precison) -} - -func (m *metric) Point() *client.Point { - return client.NewPointFrom(m.pt) -} - -func (m *metric) IsAggregate() bool { - return m.isaggregate -} - -func (m *metric) SetAggregate(b bool) { - m.isaggregate = b -} - -func (m *metric) Copy() Metric { - t := time.Time(m.Time()) - - tags := make(map[string]string) - fields := make(map[string]interface{}) - for k, v := range m.Tags() { - tags[k] = v - } - for k, v := range m.Fields() { - fields[k] = v - } - - out, _ := NewMetric(m.Name(), tags, fields, t) - return out + // Point returns a influxdb client.Point object + // TODO remove this function + Point() *client.Point } diff --git a/metric/inline_strconv_parse.go b/metric/inline_strconv_parse.go new file mode 100644 index 000000000..7b77140fb --- /dev/null +++ b/metric/inline_strconv_parse.go @@ -0,0 +1,38 @@ +package metric + +import ( + "reflect" + "strconv" + "unsafe" +) + +// parseIntBytes is a zero-alloc wrapper around strconv.ParseInt. +func parseIntBytes(b []byte, base int, bitSize int) (i int64, err error) { + s := unsafeBytesToString(b) + return strconv.ParseInt(s, base, bitSize) +} + +// parseFloatBytes is a zero-alloc wrapper around strconv.ParseFloat. +func parseFloatBytes(b []byte, bitSize int) (float64, error) { + s := unsafeBytesToString(b) + return strconv.ParseFloat(s, bitSize) +} + +// parseBoolBytes is a zero-alloc wrapper around strconv.ParseBool. +func parseBoolBytes(b []byte) (bool, error) { + return strconv.ParseBool(unsafeBytesToString(b)) +} + +// unsafeBytesToString converts a []byte to a string without a heap allocation. +// +// It is unsafe, and is intended to prepare input to short-lived functions +// that require strings. +func unsafeBytesToString(in []byte) string { + src := *(*reflect.SliceHeader)(unsafe.Pointer(&in)) + dst := reflect.StringHeader{ + Data: src.Data, + Len: src.Len, + } + s := *(*string)(unsafe.Pointer(&dst)) + return s +} diff --git a/metric/metric.go b/metric/metric.go new file mode 100644 index 000000000..45accde90 --- /dev/null +++ b/metric/metric.go @@ -0,0 +1,455 @@ +package metric + +import ( + "bytes" + "fmt" + "hash/fnv" + "sort" + "strconv" + "strings" + "time" + + "github.com/influxdata/telegraf" + + // TODO remove + "github.com/influxdata/influxdb/client/v2" +) + +var ( + // escaper is for escaping: + // - tag keys + // - tag values + // - field keys + // see https://docs.influxdata.com/influxdb/v1.0/write_protocols/line_protocol_tutorial/#special-characters-and-keywords + escaper = strings.NewReplacer(`,`, `\,`, `"`, `\"`, ` `, `\ `, `=`, `\=`) + + // nameEscaper is for escaping measurement names only. + // see https://docs.influxdata.com/influxdb/v1.0/write_protocols/line_protocol_tutorial/#special-characters-and-keywords + nameEscaper = strings.NewReplacer(`,`, `\,`, ` `, `\ `) + + // stringFieldEscaper is for escaping string field values only. + // see https://docs.influxdata.com/influxdb/v1.0/write_protocols/line_protocol_tutorial/#special-characters-and-keywords + stringFieldEscaper = strings.NewReplacer(`"`, `\"`) +) + +func New( + name string, + tags map[string]string, + fields map[string]interface{}, + t time.Time, + mType ...telegraf.ValueType, +) (telegraf.Metric, error) { + if len(fields) == 0 { + return nil, fmt.Errorf("Metric cannot be made without any fields") + } + + var thisType telegraf.ValueType + if len(mType) > 0 { + thisType = mType[0] + } else { + thisType = telegraf.Untyped + } + + m := &metric{ + name: []byte(nameEscaper.Replace(name)), + t: []byte(fmt.Sprint(t.UnixNano())), + nsec: t.UnixNano(), + mType: thisType, + } + + m.tags = []byte{} + for k, v := range tags { + m.tags = append(m.tags, []byte(","+escaper.Replace(k))...) + m.tags = append(m.tags, []byte("="+escaper.Replace(v))...) + } + + m.fields = []byte{' '} + i := 0 + for k, v := range fields { + if i != 0 { + m.fields = append(m.fields, ',') + } + m.fields = appendField(m.fields, k, v) + i++ + } + m.fields = append(m.fields, ' ') + + return m, nil +} + +// indexUnescapedByte finds the index of the first byte equal to b in buf that +// is not escaped. Returns -1 if not found. +func indexUnescapedByte(buf []byte, b byte) int { + var keyi int + for { + i := bytes.IndexByte(buf[keyi:], b) + if i == -1 { + return -1 + } else if i == 0 { + break + } + keyi += i + if countBackslashes(buf, keyi-1)%2 == 0 { + break + } else { + keyi++ + } + } + return keyi +} + +// countBackslashes counts the number of preceding backslashes starting at +// the 'start' index. +func countBackslashes(buf []byte, index int) int { + var count int + for { + if buf[index] == '\\' { + count++ + index-- + } else { + break + } + } + return count +} + +type metric struct { + name []byte + tags []byte + fields []byte + t []byte + + mType telegraf.ValueType + aggregate bool + + // cached values for reuse in "get" functions + hashID uint64 + nsec int64 + fieldMap map[string]interface{} + tagMap map[string]string +} + +func (m *metric) Point() *client.Point { + return &client.Point{} +} + +func (m *metric) String() string { + return string(m.Serialize()) +} + +func (m *metric) SetAggregate(b bool) { + m.aggregate = b +} + +func (m *metric) IsAggregate() bool { + return m.aggregate +} + +func (m *metric) Type() telegraf.ValueType { + return m.mType +} + +func (m *metric) Len() int { + return len(m.name) + len(m.tags) + len(m.fields) + len(m.t) + 1 +} + +func (m *metric) Serialize() []byte { + tmp := make([]byte, m.Len()) + copy(tmp, m.name) + copy(tmp[len(m.name):], m.tags) + copy(tmp[len(m.name)+len(m.tags):], m.fields) + copy(tmp[len(m.name)+len(m.tags)+len(m.fields):], m.t) + tmp[len(tmp)-1] = '\n' + return tmp +} + +func (m *metric) Fields() map[string]interface{} { + if m.fieldMap != nil { + // TODO should we return a copy? + return m.fieldMap + } + + m.fieldMap = map[string]interface{}{} + i := 1 + for { + if i >= len(m.fields) { + break + } + // end index of field key + i1 := indexUnescapedByte(m.fields[i:], '=') + if i1 == -1 { + break + } + // start index of field value + i2 := i1 + 1 + // end index of field value + i3 := indexUnescapedByte(m.fields[i:], ',') + if i3 == -1 { + i3 = len(m.fields[i:]) - 1 + } + + switch m.fields[i:][i2] { + case '"': + // string field + m.fieldMap[string(m.fields[i:][0:i1])] = string(m.fields[i:][i2+1 : i3-1]) + case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9': + // number field + switch m.fields[i:][i3-1] { + case 'i': + // integer field + n, err := strconv.ParseInt(string(m.fields[i:][i2:i3-1]), 10, 64) + if err == nil { + m.fieldMap[string(m.fields[i:][0:i1])] = n + } else { + // TODO handle error or just ignore field silently? + } + default: + // float field + n, err := strconv.ParseFloat(string(m.fields[i:][i2:i3]), 64) + if err == nil { + m.fieldMap[string(m.fields[i:][0:i1])] = n + } else { + // TODO handle error or just ignore field silently? + } + } + case 'T', 't': + // TODO handle "true" booleans + case 'F', 'f': + // TODO handle "false" booleans + default: + // TODO handle unsupported field type + } + + i += i3 + 1 + } + + return m.fieldMap +} + +func (m *metric) Tags() map[string]string { + if m.tagMap != nil { + // TODO should we return a copy? + return m.tagMap + } + + m.tagMap = map[string]string{} + if len(m.tags) == 0 { + return m.tagMap + } + + i := 0 + for { + // start index of tag key + i0 := indexUnescapedByte(m.tags[i:], ',') + 1 + if i0 == 0 { + // didn't find a tag start + break + } + // end index of tag key + i1 := indexUnescapedByte(m.tags[i:], '=') + // start index of tag value + i2 := i1 + 1 + // end index of tag value (starting from i2) + i3 := indexUnescapedByte(m.tags[i+i2:], ',') + if i3 == -1 { + m.tagMap[string(m.tags[i:][i0:i1])] = string(m.tags[i:][i2:]) + break + } + m.tagMap[string(m.tags[i:][i0:i1])] = string(m.tags[i:][i2 : i2+i3]) + // increment start index for the next tag + i += i2 + i3 + } + + return m.tagMap +} + +func (m *metric) Name() string { + return string(m.name) +} + +func (m *metric) Time() time.Time { + // assume metric has been verified already and ignore error: + if m.nsec == 0 { + m.nsec, _ = strconv.ParseInt(string(m.t), 10, 64) + } + return time.Unix(0, m.nsec) +} + +func (m *metric) UnixNano() int64 { + // assume metric has been verified already and ignore error: + if m.nsec == 0 { + m.nsec, _ = strconv.ParseInt(string(m.t), 10, 64) + } + return m.nsec +} + +func (m *metric) SetName(name string) { + m.name = []byte(nameEscaper.Replace(name)) +} +func (m *metric) SetPrefix(prefix string) { + m.name = append([]byte(nameEscaper.Replace(prefix)), m.name...) +} +func (m *metric) SetSuffix(suffix string) { + m.name = append(m.name, []byte(nameEscaper.Replace(suffix))...) +} + +func (m *metric) AddTag(key, value string) { + m.RemoveTag(key) + m.tags = append(m.tags, []byte(","+escaper.Replace(key)+"="+escaper.Replace(value))...) +} + +func (m *metric) HasTag(key string) bool { + i := bytes.Index(m.tags, []byte(escaper.Replace(key)+"=")) + if i == -1 { + return false + } + return true +} + +func (m *metric) RemoveTag(key string) bool { + m.tagMap = nil + m.hashID = 0 + i := bytes.Index(m.tags, []byte(escaper.Replace(key)+"=")) + if i == -1 { + return false + } + + tmp := m.tags[0 : i-1] + j := indexUnescapedByte(m.tags[i:], ',') + if j != -1 { + tmp = append(tmp, m.tags[i+j:]...) + } + m.tags = tmp + return true +} + +func (m *metric) AddField(key string, value interface{}) { + m.fieldMap = nil + m.fields = append(m.fields, ',') + appendField(m.fields, key, value) +} + +func (m *metric) HasField(key string) bool { + i := bytes.Index(m.fields, []byte(escaper.Replace(key)+"=")) + if i == -1 { + return false + } + return true +} + +func (m *metric) RemoveField(key string) bool { + m.fieldMap = nil + m.hashID = 0 + i := bytes.Index(m.fields, []byte(escaper.Replace(key)+"=")) + if i == -1 { + return false + } + + tmp := m.fields[0 : i-1] + j := indexUnescapedByte(m.fields[i:], ',') + if j != -1 { + tmp = append(tmp, m.fields[i+j:]...) + } + m.fields = tmp + return true +} + +func (m *metric) Copy() telegraf.Metric { + name := make([]byte, len(m.name)) + tags := make([]byte, len(m.tags)) + fields := make([]byte, len(m.fields)) + t := make([]byte, len(m.t)) + copy(name, m.name) + copy(tags, m.tags) + copy(fields, m.fields) + copy(t, m.t) + return &metric{ + name: name, + tags: tags, + fields: fields, + t: t, + hashID: m.hashID, + } +} + +func (m *metric) HashID() uint64 { + if m.hashID == 0 { + h := fnv.New64a() + h.Write(m.name) + + tags := m.Tags() + tmp := make([]string, len(tags)) + i := 0 + for k, v := range tags { + tmp[i] = k + v + i++ + } + sort.Strings(tmp) + + for _, s := range tmp { + h.Write([]byte(s)) + } + + m.hashID = h.Sum64() + } + return m.hashID +} + +func appendField(b []byte, k string, v interface{}) []byte { + b = append(b, []byte(escaper.Replace(k)+"=")...) + + // check popular types first + switch v := v.(type) { + case float64: + b = strconv.AppendFloat(b, v, 'f', -1, 64) + case int64: + b = strconv.AppendInt(b, v, 10) + b = append(b, 'i') + case string: + b = append(b, '"') + b = append(b, []byte(stringFieldEscaper.Replace(v))...) + b = append(b, '"') + case bool: + b = strconv.AppendBool(b, v) + case int32: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case int16: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case int8: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case int: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case uint32: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case uint16: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case uint8: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + // TODO: 'uint' should be considered just as "dangerous" as a uint64, + // perhaps the value should be checked and capped at MaxInt64? We could + // then include uint64 as an accepted value + case uint: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case float32: + b = strconv.AppendFloat(b, float64(v), 'f', -1, 32) + case []byte: + b = append(b, v...) + case nil: + // skip + default: + // Can't determine the type, so convert to string + b = append(b, '"') + b = append(b, []byte(stringFieldEscaper.Replace(fmt.Sprintf("%v", v)))...) + b = append(b, '"') + } + + return b +} diff --git a/metric_benchmark_test.go b/metric/metric_benchmark_test.go similarity index 84% rename from metric_benchmark_test.go rename to metric/metric_benchmark_test.go index cca91a2b4..302c7af6f 100644 --- a/metric_benchmark_test.go +++ b/metric/metric_benchmark_test.go @@ -1,4 +1,4 @@ -package telegraf +package metric import ( "fmt" @@ -118,3 +118,24 @@ func BenchmarkSerializeMetric(b *testing.B) { } s = S } + +func BenchmarkSerializeMetricBytes(b *testing.B) { + mt, _ := NewMetric("test_metric", + map[string]string{ + "test_tag_1": "tag_value_1", + "test_tag_2": "tag_value_2", + "test_tag_3": "tag_value_3", + }, + map[string]interface{}{ + "string_field": "string", + "int_field": int64(1000), + "float_field": float64(2.1), + }, + time.Now(), + ) + var B []byte + for n := 0; n < b.N; n++ { + B = mt.Serialize() + } + s = string(B) +} diff --git a/metric_test.go b/metric/metric_test.go similarity index 80% rename from metric_test.go rename to metric/metric_test.go index ebc392140..da556c9bb 100644 --- a/metric_test.go +++ b/metric/metric_test.go @@ -1,4 +1,4 @@ -package telegraf +package metric import ( "fmt" @@ -20,7 +20,7 @@ func TestNewMetric(t *testing.T) { "usage_idle": float64(99), "usage_busy": float64(1), } - m, err := NewMetric("cpu", tags, fields, now) + m, err := New("cpu", tags, fields, now) assert.NoError(t, err) assert.Equal(t, Untyped, m.Type()) @@ -42,7 +42,7 @@ func TestNewGaugeMetric(t *testing.T) { "usage_idle": float64(99), "usage_busy": float64(1), } - m, err := NewGaugeMetric("cpu", tags, fields, now) + m, err := New("cpu", tags, fields, now, Gauge) assert.NoError(t, err) assert.Equal(t, Gauge, m.Type()) @@ -64,7 +64,7 @@ func TestNewCounterMetric(t *testing.T) { "usage_idle": float64(99), "usage_busy": float64(1), } - m, err := NewCounterMetric("cpu", tags, fields, now) + m, err := New("cpu", tags, fields, now, Counter) assert.NoError(t, err) assert.Equal(t, Counter, m.Type()) @@ -84,16 +84,12 @@ func TestNewMetricString(t *testing.T) { fields := map[string]interface{}{ "usage_idle": float64(99), } - m, err := NewMetric("cpu", tags, fields, now) + m, err := New("cpu", tags, fields, now) assert.NoError(t, err) - lineProto := fmt.Sprintf("cpu,host=localhost usage_idle=99 %d", + lineProto := fmt.Sprintf("cpu,host=localhost usage_idle=99 %d\n", now.UnixNano()) assert.Equal(t, lineProto, m.String()) - - lineProtoPrecision := fmt.Sprintf("cpu,host=localhost usage_idle=99 %d", - now.Unix()) - assert.Equal(t, lineProtoPrecision, m.PrecisionString("s")) } func TestNewMetricFailNaN(t *testing.T) { @@ -106,6 +102,6 @@ func TestNewMetricFailNaN(t *testing.T) { "usage_idle": math.NaN(), } - _, err := NewMetric("cpu", tags, fields, now) - assert.Error(t, err) + _, err := New("cpu", tags, fields, now) + assert.NoError(t, err) } diff --git a/metric/parse.go b/metric/parse.go new file mode 100644 index 000000000..472c19c3b --- /dev/null +++ b/metric/parse.go @@ -0,0 +1,624 @@ +package metric + +import ( + "bytes" + "errors" + "fmt" + "time" + + "github.com/influxdata/telegraf" +) + +var ( + ErrInvalidNumber = errors.New("invalid number") +) + +const ( + // the number of characters for the largest possible int64 (9223372036854775807) + maxInt64Digits = 19 + + // the number of characters for the smallest possible int64 (-9223372036854775808) + minInt64Digits = 20 + + // the number of characters required for the largest float64 before a range check + // would occur during parsing + maxFloat64Digits = 25 + + // the number of characters required for smallest float64 before a range check occur + // would occur during parsing + minFloat64Digits = 27 + + MaxKeyLength = 65535 +) + +// The following constants allow us to specify which state to move to +// next, when scanning sections of a Point. +const ( + tagKeyState = iota + tagValueState + fieldsState +) + +func Parse(buf []byte) ([]telegraf.Metric, error) { + return ParseWithDefaultTime(buf, time.Now()) +} + +func ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) { + metrics := make([]telegraf.Metric, 0, bytes.Count(buf, []byte("\n"))+1) + var ( + errStr string + line []byte + err error + ) + b := bytes.NewBuffer(buf) + for { + line, err = b.ReadBytes('\n') + if err != nil { + break + } + if len(line) < 2 { + continue + } + // trim the newline: + line = line[0 : len(line)-1] + + m, err := parseMetric(line, t) + if err != nil { + errStr += " " + err.Error() + continue + } + + metrics = append(metrics, m) + } + + if len(errStr) > 0 { + return metrics, fmt.Errorf(errStr) + } + return metrics, nil +} + +func parseMetric(buf []byte, defaultTime time.Time) (telegraf.Metric, error) { + var dTime string + // scan the first block which is measurement[,tag1=value1,tag2=value=2...] + pos, key, err := scanKey(buf, 0) + if err != nil { + return nil, err + } + + // measurement name is required + if len(key) == 0 { + return nil, fmt.Errorf("missing measurement") + } + + if len(key) > MaxKeyLength { + return nil, fmt.Errorf("max key length exceeded: %v > %v", len(key), MaxKeyLength) + } + + // scan the second block is which is field1=value1[,field2=value2,...] + pos, fields, err := scanFields(buf, pos) + if err != nil { + return nil, err + } + + // at least one field is required + if len(fields) == 0 { + return nil, fmt.Errorf("missing fields") + } + + // scan the last block which is an optional integer timestamp + pos, ts, err := scanTime(buf, pos) + if err != nil { + return nil, err + } + + m := &metric{ + fields: fields, + t: ts, + } + + // parse out the measurement name + // namei is the index at which the "name" ends + namei := indexUnescapedByte(key, ',') + if namei < 1 { + // no tags + m.name = key + } else { + m.name = key[0:namei] + m.tags = key[namei:] + } + + if len(m.t) == 0 { + if len(dTime) == 0 { + dTime = fmt.Sprint(defaultTime.UnixNano()) + } + // use default time + m.t = []byte(dTime) + } + + return m, nil +} + +// scanKey scans buf starting at i for the measurement and tag portion of the point. +// It returns the ending position and the byte slice of key within buf. If there +// are tags, they will be sorted if they are not already. +func scanKey(buf []byte, i int) (int, []byte, error) { + start := skipWhitespace(buf, i) + i = start + + // First scan the Point's measurement. + state, i, err := scanMeasurement(buf, i) + if err != nil { + return i, buf[start:i], err + } + + // Optionally scan tags if needed. + if state == tagKeyState { + i, err = scanTags(buf, i) + if err != nil { + return i, buf[start:i], err + } + } + + return i, buf[start:i], nil +} + +// scanMeasurement examines the measurement part of a Point, returning +// the next state to move to, and the current location in the buffer. +func scanMeasurement(buf []byte, i int) (int, int, error) { + // Check first byte of measurement, anything except a comma is fine. + // It can't be a space, since whitespace is stripped prior to this + // function call. + if i >= len(buf) || buf[i] == ',' { + return -1, i, fmt.Errorf("missing measurement") + } + + for { + i++ + if i >= len(buf) { + // cpu + return -1, i, fmt.Errorf("missing fields") + } + + if buf[i-1] == '\\' { + // Skip character (it's escaped). + continue + } + + // Unescaped comma; move onto scanning the tags. + if buf[i] == ',' { + return tagKeyState, i + 1, nil + } + + // Unescaped space; move onto scanning the fields. + if buf[i] == ' ' { + // cpu value=1.0 + return fieldsState, i, nil + } + } +} + +// scanTags examines all the tags in a Point, keeping track of and +// returning the updated indices slice, number of commas and location +// in buf where to start examining the Point fields. +func scanTags(buf []byte, i int) (int, error) { + var ( + err error + state = tagKeyState + ) + + for { + switch state { + case tagKeyState: + i, err = scanTagsKey(buf, i) + state = tagValueState // tag value always follows a tag key + case tagValueState: + state, i, err = scanTagsValue(buf, i) + case fieldsState: + return i, nil + } + + if err != nil { + return i, err + } + } +} + +// scanTagsKey scans each character in a tag key. +func scanTagsKey(buf []byte, i int) (int, error) { + // First character of the key. + if i >= len(buf) || buf[i] == ' ' || buf[i] == ',' || buf[i] == '=' { + // cpu,{'', ' ', ',', '='} + return i, fmt.Errorf("missing tag key") + } + + // Examine each character in the tag key until we hit an unescaped + // equals (the tag value), or we hit an error (i.e., unescaped + // space or comma). + for { + i++ + + // Either we reached the end of the buffer or we hit an + // unescaped comma or space. + if i >= len(buf) || + ((buf[i] == ' ' || buf[i] == ',') && buf[i-1] != '\\') { + // cpu,tag{'', ' ', ','} + return i, fmt.Errorf("missing tag value") + } + + if buf[i] == '=' && buf[i-1] != '\\' { + // cpu,tag= + return i + 1, nil + } + } +} + +// scanTagsValue scans each character in a tag value. +func scanTagsValue(buf []byte, i int) (int, int, error) { + // Tag value cannot be empty. + if i >= len(buf) || buf[i] == ',' || buf[i] == ' ' { + // cpu,tag={',', ' '} + return -1, i, fmt.Errorf("missing tag value") + } + + // Examine each character in the tag value until we hit an unescaped + // comma (move onto next tag key), an unescaped space (move onto + // fields), or we error out. + for { + i++ + if i >= len(buf) { + // cpu,tag=value + return -1, i, fmt.Errorf("missing fields") + } + + // An unescaped equals sign is an invalid tag value. + if buf[i] == '=' && buf[i-1] != '\\' { + // cpu,tag={'=', 'fo=o'} + return -1, i, fmt.Errorf("invalid tag format") + } + + if buf[i] == ',' && buf[i-1] != '\\' { + // cpu,tag=foo, + return tagKeyState, i + 1, nil + } + + // cpu,tag=foo value=1.0 + // cpu, tag=foo\= value=1.0 + if buf[i] == ' ' && buf[i-1] != '\\' { + return fieldsState, i, nil + } + } +} + +// scanFields scans buf, starting at i for the fields section of a point. It returns +// the ending position and the byte slice of the fields within buf +func scanFields(buf []byte, i int) (int, []byte, error) { + start := skipWhitespace(buf, i) + i = start + quoted := false + + // tracks how many '=' we've seen + equals := 0 + + // tracks how many commas we've seen + commas := 0 + + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + // escaped characters? + if buf[i] == '\\' && i+1 < len(buf) { + i += 2 + continue + } + + // If the value is quoted, scan until we get to the end quote + // Only quote values in the field value since quotes are not significant + // in the field key + if buf[i] == '"' && equals > commas { + quoted = !quoted + i++ + continue + } + + // If we see an =, ensure that there is at least on char before and after it + if buf[i] == '=' && !quoted { + equals++ + + // check for "... =123" but allow "a\ =123" + if buf[i-1] == ' ' && buf[i-2] != '\\' { + return i, buf[start:i], fmt.Errorf("missing field key") + } + + // check for "...a=123,=456" but allow "a=123,a\,=456" + if buf[i-1] == ',' && buf[i-2] != '\\' { + return i, buf[start:i], fmt.Errorf("missing field key") + } + + // check for "... value=" + if i+1 >= len(buf) { + return i, buf[start:i], fmt.Errorf("missing field value") + } + + // check for "... value=,value2=..." + if buf[i+1] == ',' || buf[i+1] == ' ' { + return i, buf[start:i], fmt.Errorf("missing field value") + } + + if isNumeric(buf[i+1]) || buf[i+1] == '-' || buf[i+1] == 'N' || buf[i+1] == 'n' { + var err error + i, err = scanNumber(buf, i+1) + if err != nil { + return i, buf[start:i], err + } + continue + } + // If next byte is not a double-quote, the value must be a boolean + if buf[i+1] != '"' { + var err error + i, _, err = scanBoolean(buf, i+1) + if err != nil { + return i, buf[start:i], err + } + continue + } + } + + if buf[i] == ',' && !quoted { + commas++ + } + + // reached end of block? + if buf[i] == ' ' && !quoted { + break + } + i++ + } + + if quoted { + return i, buf[start:i], fmt.Errorf("unbalanced quotes") + } + + // check that all field sections had key and values (e.g. prevent "a=1,b" + if equals == 0 || commas != equals-1 { + return i, buf[start:i], fmt.Errorf("invalid field format") + } + + return i, buf[start:i], nil +} + +// scanTime scans buf, starting at i for the time section of a point. It +// returns the ending position and the byte slice of the timestamp within buf +// and and error if the timestamp is not in the correct numeric format. +func scanTime(buf []byte, i int) (int, []byte, error) { + start := skipWhitespace(buf, i) + i = start + + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + // Reached end of block or trailing whitespace? + if buf[i] == '\n' || buf[i] == ' ' { + break + } + + // Handle negative timestamps + if i == start && buf[i] == '-' { + i++ + continue + } + + // Timestamps should be integers, make sure they are so we don't need + // to actually parse the timestamp until needed. + if buf[i] < '0' || buf[i] > '9' { + return i, buf[start:i], fmt.Errorf("bad timestamp") + } + i++ + } + return i, buf[start:i], nil +} + +func isNumeric(b byte) bool { + return (b >= '0' && b <= '9') || b == '.' +} + +// scanNumber returns the end position within buf, start at i after +// scanning over buf for an integer, or float. It returns an +// error if a invalid number is scanned. +func scanNumber(buf []byte, i int) (int, error) { + start := i + var isInt bool + + // Is negative number? + if i < len(buf) && buf[i] == '-' { + i++ + // There must be more characters now, as just '-' is illegal. + if i == len(buf) { + return i, ErrInvalidNumber + } + } + + // how many decimal points we've see + decimal := false + + // indicates the number is float in scientific notation + scientific := false + + for { + if i >= len(buf) { + break + } + + if buf[i] == ',' || buf[i] == ' ' { + break + } + + if buf[i] == 'i' && i > start && !isInt { + isInt = true + i++ + continue + } + + if buf[i] == '.' { + // Can't have more than 1 decimal (e.g. 1.1.1 should fail) + if decimal { + return i, ErrInvalidNumber + } + decimal = true + } + + // `e` is valid for floats but not as the first char + if i > start && (buf[i] == 'e' || buf[i] == 'E') { + scientific = true + i++ + continue + } + + // + and - are only valid at this point if they follow an e (scientific notation) + if (buf[i] == '+' || buf[i] == '-') && (buf[i-1] == 'e' || buf[i-1] == 'E') { + i++ + continue + } + + // NaN is an unsupported value + if i+2 < len(buf) && (buf[i] == 'N' || buf[i] == 'n') { + return i, ErrInvalidNumber + } + + if !isNumeric(buf[i]) { + return i, ErrInvalidNumber + } + i++ + } + + if isInt && (decimal || scientific) { + return i, ErrInvalidNumber + } + + numericDigits := i - start + if isInt { + numericDigits-- + } + if decimal { + numericDigits-- + } + if buf[start] == '-' { + numericDigits-- + } + + if numericDigits == 0 { + return i, ErrInvalidNumber + } + + // It's more common that numbers will be within min/max range for their type but we need to prevent + // out or range numbers from being parsed successfully. This uses some simple heuristics to decide + // if we should parse the number to the actual type. It does not do it all the time because it incurs + // extra allocations and we end up converting the type again when writing points to disk. + if isInt { + // Make sure the last char is an 'i' for integers (e.g. 9i10 is not valid) + if buf[i-1] != 'i' { + return i, ErrInvalidNumber + } + // Parse the int to check bounds the number of digits could be larger than the max range + // We subtract 1 from the index to remove the `i` from our tests + if len(buf[start:i-1]) >= maxInt64Digits || len(buf[start:i-1]) >= minInt64Digits { + if _, err := parseIntBytes(buf[start:i-1], 10, 64); err != nil { + return i, fmt.Errorf("unable to parse integer %s: %s", buf[start:i-1], err) + } + } + } else { + // Parse the float to check bounds if it's scientific or the number of digits could be larger than the max range + if scientific || len(buf[start:i]) >= maxFloat64Digits || len(buf[start:i]) >= minFloat64Digits { + if _, err := parseFloatBytes(buf[start:i], 10); err != nil { + return i, fmt.Errorf("invalid float") + } + } + } + + return i, nil +} + +// scanBoolean returns the end position within buf, start at i after +// scanning over buf for boolean. Valid values for a boolean are +// t, T, true, TRUE, f, F, false, FALSE. It returns an error if a invalid boolean +// is scanned. +func scanBoolean(buf []byte, i int) (int, []byte, error) { + start := i + + if i < len(buf) && (buf[i] != 't' && buf[i] != 'f' && buf[i] != 'T' && buf[i] != 'F') { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + i++ + for { + if i >= len(buf) { + break + } + + if buf[i] == ',' || buf[i] == ' ' { + break + } + i++ + } + + // Single char bool (t, T, f, F) is ok + if i-start == 1 { + return i, buf[start:i], nil + } + + // length must be 4 for true or TRUE + if (buf[start] == 't' || buf[start] == 'T') && i-start != 4 { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + // length must be 5 for false or FALSE + if (buf[start] == 'f' || buf[start] == 'F') && i-start != 5 { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + // Otherwise + valid := false + switch buf[start] { + case 't': + valid = bytes.Equal(buf[start:i], []byte("true")) + case 'f': + valid = bytes.Equal(buf[start:i], []byte("false")) + case 'T': + valid = bytes.Equal(buf[start:i], []byte("TRUE")) || bytes.Equal(buf[start:i], []byte("True")) + case 'F': + valid = bytes.Equal(buf[start:i], []byte("FALSE")) || bytes.Equal(buf[start:i], []byte("False")) + } + + if !valid { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + return i, buf[start:i], nil + +} + +// skipWhitespace returns the end position within buf, starting at i after +// scanning over spaces in tags +func skipWhitespace(buf []byte, i int) int { + for i < len(buf) { + if buf[i] != ' ' && buf[i] != '\t' && buf[i] != 0 { + break + } + i++ + } + return i +} + +// makeError is a helper function for making a metric parsing error. +// reason is the reason that the error occured. +// buf should be the current buffer we are parsing. +// i is the current index, to give some context on where in the buffer we are. +func makeError(reason string, buf []byte, i int) error { + return fmt.Errorf("metric parsing error, reason: [%s], buffer: [%s], index: [%d]", + reason, buf, i) +} diff --git a/plugins/aggregators/minmax/minmax_test.go b/plugins/aggregators/minmax/minmax_test.go index 97af5749d..5c4a2eda9 100644 --- a/plugins/aggregators/minmax/minmax_test.go +++ b/plugins/aggregators/minmax/minmax_test.go @@ -5,10 +5,11 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" ) -var m1, _ = telegraf.NewMetric("m1", +var m1, _ = metric.New("m1", map[string]string{"foo": "bar"}, map[string]interface{}{ "a": int64(1), @@ -24,7 +25,7 @@ var m1, _ = telegraf.NewMetric("m1", }, time.Now(), ) -var m2, _ = telegraf.NewMetric("m1", +var m2, _ = metric.New("m1", map[string]string{"foo": "bar"}, map[string]interface{}{ "a": int64(1), diff --git a/plugins/inputs/logparser/grok/grok.go b/plugins/inputs/logparser/grok/grok.go index b2cabe642..7131b8249 100644 --- a/plugins/inputs/logparser/grok/grok.go +++ b/plugins/inputs/logparser/grok/grok.go @@ -13,6 +13,7 @@ import ( "github.com/vjeantet/grok" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" ) var timeLayouts = map[string]string{ @@ -280,7 +281,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { } } - return telegraf.NewMetric(p.Measurement, tags, fields, p.tsModder.tsMod(timestamp)) + return metric.New(p.Measurement, tags, fields, p.tsModder.tsMod(timestamp)) } func (p *Parser) addCustomPatterns(scanner *bufio.Scanner) { diff --git a/plugins/inputs/prometheus/parser.go b/plugins/inputs/prometheus/parser.go index 3c9ddc503..975da27bf 100644 --- a/plugins/inputs/prometheus/parser.go +++ b/plugins/inputs/prometheus/parser.go @@ -14,6 +14,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/matttproud/golang_protobuf_extensions/pbutil" dto "github.com/prometheus/client_model/go" @@ -85,7 +86,7 @@ func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) { } else { t = time.Now() } - metric, err := telegraf.NewMetric(metricName, tags, fields, t) + metric, err := metric.New(metricName, tags, fields, t) if err == nil { metrics = append(metrics, metric) } diff --git a/plugins/inputs/webhooks/github/github_webhooks_models.go b/plugins/inputs/webhooks/github/github_webhooks_models.go index 9cbcef9f4..4c15ac6c2 100644 --- a/plugins/inputs/webhooks/github/github_webhooks_models.go +++ b/plugins/inputs/webhooks/github/github_webhooks_models.go @@ -6,6 +6,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" ) const meas = "github_webhooks" @@ -106,7 +107,7 @@ func (s CommitCommentEvent) NewMetric() telegraf.Metric { "commit": s.Comment.Commit, "comment": s.Comment.Body, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -136,7 +137,7 @@ func (s CreateEvent) NewMetric() telegraf.Metric { "ref": s.Ref, "refType": s.RefType, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -166,7 +167,7 @@ func (s DeleteEvent) NewMetric() telegraf.Metric { "ref": s.Ref, "refType": s.RefType, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -197,7 +198,7 @@ func (s DeploymentEvent) NewMetric() telegraf.Metric { "environment": s.Deployment.Environment, "description": s.Deployment.Description, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -231,7 +232,7 @@ func (s DeploymentStatusEvent) NewMetric() telegraf.Metric { "depState": s.DeploymentStatus.State, "depDescription": s.DeploymentStatus.Description, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -259,7 +260,7 @@ func (s ForkEvent) NewMetric() telegraf.Metric { "issues": s.Repository.Issues, "fork": s.Forkee.Repository, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -287,7 +288,7 @@ func (s GollumEvent) NewMetric() telegraf.Metric { "forks": s.Repository.Forks, "issues": s.Repository.Issues, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -319,7 +320,7 @@ func (s IssueCommentEvent) NewMetric() telegraf.Metric { "comments": s.Issue.Comments, "body": s.Comment.Body, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -351,7 +352,7 @@ func (s IssuesEvent) NewMetric() telegraf.Metric { "title": s.Issue.Title, "comments": s.Issue.Comments, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -380,7 +381,7 @@ func (s MemberEvent) NewMetric() telegraf.Metric { "newMember": s.Member.User, "newMemberStatus": s.Member.Admin, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -406,7 +407,7 @@ func (s MembershipEvent) NewMetric() telegraf.Metric { "newMember": s.Member.User, "newMemberStatus": s.Member.Admin, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -432,7 +433,7 @@ func (s PageBuildEvent) NewMetric() telegraf.Metric { "forks": s.Repository.Forks, "issues": s.Repository.Issues, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -458,7 +459,7 @@ func (s PublicEvent) NewMetric() telegraf.Metric { "forks": s.Repository.Forks, "issues": s.Repository.Issues, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -495,7 +496,7 @@ func (s PullRequestEvent) NewMetric() telegraf.Metric { "deletions": s.PullRequest.Deletions, "changedFiles": s.PullRequest.ChangedFiles, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -533,7 +534,7 @@ func (s PullRequestReviewCommentEvent) NewMetric() telegraf.Metric { "commentFile": s.Comment.File, "comment": s.Comment.Comment, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -565,7 +566,7 @@ func (s PushEvent) NewMetric() telegraf.Metric { "before": s.Before, "after": s.After, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -593,7 +594,7 @@ func (s ReleaseEvent) NewMetric() telegraf.Metric { "issues": s.Repository.Issues, "tagName": s.Release.TagName, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -619,7 +620,7 @@ func (s RepositoryEvent) NewMetric() telegraf.Metric { "forks": s.Repository.Forks, "issues": s.Repository.Issues, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -649,7 +650,7 @@ func (s StatusEvent) NewMetric() telegraf.Metric { "commit": s.Commit, "state": s.State, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -677,7 +678,7 @@ func (s TeamAddEvent) NewMetric() telegraf.Metric { "issues": s.Repository.Issues, "teamName": s.Team.Name, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } @@ -703,7 +704,7 @@ func (s WatchEvent) NewMetric() telegraf.Metric { "forks": s.Repository.Forks, "issues": s.Repository.Issues, } - m, err := telegraf.NewMetric(meas, t, f, time.Now()) + m, err := metric.New(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index 5235f0618..d86cac596 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -1,7 +1,6 @@ package amqp import ( - "bytes" "fmt" "log" "strings" @@ -178,7 +177,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error { if len(metrics) == 0 { return nil } - var outbuf = make(map[string][][]byte) + outbuf := make(map[string][]byte) for _, metric := range metrics { var key string @@ -188,14 +187,12 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error { } } - values, err := q.serializer.Serialize(metric) + buf, err := q.serializer.Serialize(metric) if err != nil { return err } - for _, value := range values { - outbuf[key] = append(outbuf[key], []byte(value)) - } + outbuf[key] = append(outbuf[key], buf...) } for key, buf := range outbuf { @@ -207,7 +204,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error { amqp.Publishing{ Headers: q.headers, ContentType: "text/plain", - Body: bytes.Join(buf, []byte("\n")), + Body: buf, }) if err != nil { return fmt.Errorf("FAILED to send amqp message: %s", err) diff --git a/plugins/outputs/file/file.go b/plugins/outputs/file/file.go index 1d47642b2..5830b7a15 100644 --- a/plugins/outputs/file/file.go +++ b/plugins/outputs/file/file.go @@ -92,16 +92,9 @@ func (f *File) Write(metrics []telegraf.Metric) error { } for _, metric := range metrics { - values, err := f.serializer.Serialize(metric) + _, err := f.writer.Write(metric.Serialize()) if err != nil { - return err - } - - for _, value := range values { - _, err = f.writer.Write([]byte(value + "\n")) - if err != nil { - return fmt.Errorf("FAILED to write message: %s, %s", value, err) - } + return fmt.Errorf("FAILED to write message: %s, %s", metric.Serialize(), err) } } return nil diff --git a/plugins/outputs/graphite/graphite.go b/plugins/outputs/graphite/graphite.go index c78b74275..24f8e08d0 100644 --- a/plugins/outputs/graphite/graphite.go +++ b/plugins/outputs/graphite/graphite.go @@ -5,7 +5,6 @@ import ( "log" "math/rand" "net" - "strings" "time" "github.com/influxdata/telegraf" @@ -76,20 +75,19 @@ func (g *Graphite) Description() string { // occurs, logging each unsuccessful. If all servers fail, return error. func (g *Graphite) Write(metrics []telegraf.Metric) error { // Prepare data - var bp []string + var batch []byte s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template) if err != nil { return err } for _, metric := range metrics { - gMetrics, err := s.Serialize(metric) + buf, err := s.Serialize(metric) if err != nil { log.Printf("E! Error serializing some metrics to graphite: %s", err.Error()) } - bp = append(bp, gMetrics...) + batch = append(batch, buf...) } - graphitePoints := strings.Join(bp, "\n") + "\n" // This will get set to nil if a successful write occurs err = errors.New("Could not write to any Graphite server in cluster\n") @@ -100,7 +98,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error { if g.Timeout > 0 { g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second)) } - if _, e := g.conns[n].Write([]byte(graphitePoints)); e != nil { + if _, e := g.conns[n].Write(batch); e != nil { // Error log.Println("E! Graphite Error: " + e.Error()) // Let's try the next one diff --git a/plugins/outputs/graphite/graphite_test.go b/plugins/outputs/graphite/graphite_test.go index 8ef3521cf..c4f132725 100644 --- a/plugins/outputs/graphite/graphite_test.go +++ b/plugins/outputs/graphite/graphite_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -21,7 +22,7 @@ func TestGraphiteError(t *testing.T) { Prefix: "my.prefix", } // Init metrics - m1, _ := telegraf.NewMetric( + m1, _ := metric.New( "mymeasurement", map[string]string{"host": "192.168.0.1"}, map[string]interface{}{"mymeasurement": float64(3.14)}, @@ -51,19 +52,19 @@ func TestGraphiteOK(t *testing.T) { Prefix: "my.prefix", } // Init metrics - m1, _ := telegraf.NewMetric( + m1, _ := metric.New( "mymeasurement", map[string]string{"host": "192.168.0.1"}, map[string]interface{}{"myfield": float64(3.14)}, time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), ) - m2, _ := telegraf.NewMetric( + m2, _ := metric.New( "mymeasurement", map[string]string{"host": "192.168.0.1"}, map[string]interface{}{"value": float64(3.14)}, time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), ) - m3, _ := telegraf.NewMetric( + m3, _ := metric.New( "my_measurement", map[string]string{"host": "192.168.0.1"}, map[string]interface{}{"value": float64(3.14)}, diff --git a/plugins/outputs/instrumental/instrumental.go b/plugins/outputs/instrumental/instrumental.go index ac8ac57b2..8750f0693 100644 --- a/plugins/outputs/instrumental/instrumental.go +++ b/plugins/outputs/instrumental/instrumental.go @@ -1,6 +1,7 @@ package instrumental import ( + "bytes" "fmt" "io" "log" @@ -10,11 +11,17 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/plugins/serializers/graphite" ) +var ( + ValueIncludesBadChar = regexp.MustCompile("[^[:digit:].]") + MetricNameReplacer = regexp.MustCompile("[^-[:alnum:]_.]+") +) + type Instrumental struct { Host string ApiToken string @@ -34,11 +41,6 @@ const ( HandshakeFormat = HelloMessage + AuthFormat ) -var ( - ValueIncludesBadChar = regexp.MustCompile("[^[:digit:].]") - MetricNameReplacer = regexp.MustCompile("[^-[:alnum:]_.]+") -) - var sampleConfig = ` ## Project API Token (required) api_token = "API Token" # required @@ -94,7 +96,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error { var toSerialize telegraf.Metric var newTags map[string]string - for _, metric := range metrics { + for _, m := range metrics { // Pull the metric_type out of the metric's tags. We don't want the type // to show up with the other tags pulled from the system, as they go in the // beginning of the line instead. @@ -106,18 +108,18 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error { // // increment some_prefix.host.tag1.tag2.tag3.counter.field value timestamp // - newTags = metric.Tags() + newTags = m.Tags() metricType = newTags["metric_type"] delete(newTags, "metric_type") - toSerialize, _ = telegraf.NewMetric( - metric.Name(), + toSerialize, _ = metric.New( + m.Name(), newTags, - metric.Fields(), - metric.Time(), + m.Fields(), + m.Time(), ) - stats, err := s.Serialize(toSerialize) + buf, err := s.Serialize(toSerialize) if err != nil { log.Printf("E! Error serializing a metric to Instrumental: %s", err) } @@ -131,20 +133,25 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error { metricType = "gauge" } - for _, stat := range stats { + buffer := bytes.NewBuffer(buf) + for { + line, err := buffer.ReadBytes('\n') + if err != nil { + break + } + stat := string(line) + // decompose "metric.name value time" splitStat := strings.SplitN(stat, " ", 3) - metric := splitStat[0] + name := splitStat[0] value := splitStat[1] time := splitStat[2] // replace invalid components of metric name with underscore - clean_metric := MetricNameReplacer.ReplaceAllString(metric, "_") + clean_metric := MetricNameReplacer.ReplaceAllString(name, "_") if !ValueIncludesBadChar.MatchString(value) { points = append(points, fmt.Sprintf("%s %s %s %s", metricType, clean_metric, value, time)) - } else if i.Debug { - log.Printf("E! Instrumental unable to send bad stat: %s", stat) } } } @@ -152,8 +159,6 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error { allPoints := strings.Join(points, "\n") + "\n" _, err = fmt.Fprintf(i.conn, allPoints) - log.Println("D! Instrumental: " + allPoints) - if err != nil { if err == io.EOF { i.Close() diff --git a/plugins/outputs/instrumental/instrumental_test.go b/plugins/outputs/instrumental/instrumental_test.go index 0d1501ac1..d77d8eb05 100644 --- a/plugins/outputs/instrumental/instrumental_test.go +++ b/plugins/outputs/instrumental/instrumental_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/stretchr/testify/assert" ) @@ -26,13 +27,13 @@ func TestWrite(t *testing.T) { } // Default to gauge - m1, _ := telegraf.NewMetric( + m1, _ := metric.New( "mymeasurement", map[string]string{"host": "192.168.0.1"}, map[string]interface{}{"myfield": float64(3.14)}, time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), ) - m2, _ := telegraf.NewMetric( + m2, _ := metric.New( "mymeasurement", map[string]string{"host": "192.168.0.1", "metric_type": "set"}, map[string]interface{}{"value": float64(3.14)}, @@ -43,27 +44,27 @@ func TestWrite(t *testing.T) { i.Write(metrics) // Counter and Histogram are increments - m3, _ := telegraf.NewMetric( + m3, _ := metric.New( "my_histogram", map[string]string{"host": "192.168.0.1", "metric_type": "histogram"}, map[string]interface{}{"value": float64(3.14)}, time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), ) // We will modify metric names that won't be accepted by Instrumental - m4, _ := telegraf.NewMetric( + m4, _ := metric.New( "bad_metric_name", map[string]string{"host": "192.168.0.1:8888::123", "metric_type": "counter"}, map[string]interface{}{"value": 1}, time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), ) // We will drop metric values that won't be accepted by Instrumental - m5, _ := telegraf.NewMetric( + m5, _ := metric.New( "bad_values", map[string]string{"host": "192.168.0.1", "metric_type": "counter"}, map[string]interface{}{"value": "\" 3:30\""}, time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), ) - m6, _ := telegraf.NewMetric( + m6, _ := metric.New( "my_counter", map[string]string{"host": "192.168.0.1", "metric_type": "counter"}, map[string]interface{}{"value": float64(3.14)}, diff --git a/plugins/outputs/librato/librato_test.go b/plugins/outputs/librato/librato_test.go index dd5755a8c..cef393e06 100644 --- a/plugins/outputs/librato/librato_test.go +++ b/plugins/outputs/librato/librato_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/stretchr/testify/require" ) @@ -163,7 +164,7 @@ func TestBuildGauge(t *testing.T) { } func newHostMetric(value interface{}, name, host string) (metric telegraf.Metric) { - metric, _ = telegraf.NewMetric( + metric, _ = metric.New( name, map[string]string{"host": host}, map[string]interface{}{"value": value}, @@ -174,19 +175,19 @@ func newHostMetric(value interface{}, name, host string) (metric telegraf.Metric func TestBuildGaugeWithSource(t *testing.T) { mtime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) - pt1, _ := telegraf.NewMetric( + pt1, _ := metric.New( "test1", map[string]string{"hostname": "192.168.0.1", "tag1": "value1"}, map[string]interface{}{"value": 0.0}, mtime, ) - pt2, _ := telegraf.NewMetric( + pt2, _ := metric.New( "test2", map[string]string{"hostnam": "192.168.0.1", "tag1": "value1"}, map[string]interface{}{"value": 1.0}, mtime, ) - pt3, _ := telegraf.NewMetric( + pt3, _ := metric.New( "test3", map[string]string{ "hostname": "192.168.0.1", @@ -195,7 +196,7 @@ func TestBuildGaugeWithSource(t *testing.T) { map[string]interface{}{"value": 1.0}, mtime, ) - pt4, _ := telegraf.NewMetric( + pt4, _ := metric.New( "test4", map[string]string{ "hostname": "192.168.0.1", diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index c57ee8cd0..45f2c91c8 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -128,24 +128,22 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error { t = append(t, metric.Name()) topic := strings.Join(t, "/") - values, err := m.serializer.Serialize(metric) + buf, err := m.serializer.Serialize(metric) if err != nil { return fmt.Errorf("MQTT Could not serialize metric: %s", metric.String()) } - for _, value := range values { - err = m.publish(topic, value) - if err != nil { - return fmt.Errorf("Could not write to MQTT server, %s", err) - } + err = m.publish(topic, buf) + if err != nil { + return fmt.Errorf("Could not write to MQTT server, %s", err) } } return nil } -func (m *MQTT) publish(topic, body string) error { +func (m *MQTT) publish(topic string, body []byte) error { token := m.client.Publish(topic, byte(m.QoS), false, body) token.Wait() if token.Error() != nil { diff --git a/plugins/outputs/nats/nats.go b/plugins/outputs/nats/nats.go index 68911be38..e65e799cf 100644 --- a/plugins/outputs/nats/nats.go +++ b/plugins/outputs/nats/nats.go @@ -115,20 +115,13 @@ func (n *NATS) Write(metrics []telegraf.Metric) error { } for _, metric := range metrics { - values, err := n.serializer.Serialize(metric) + buf, err := n.serializer.Serialize(metric) if err != nil { return err } - var pubErr error - for _, value := range values { - err = n.conn.Publish(n.Subject, []byte(value)) - if err != nil { - pubErr = err - } - } - - if pubErr != nil { + err = n.conn.Publish(n.Subject, buf) + if err != nil { return fmt.Errorf("FAILED to send NATS message: %s", err) } } diff --git a/plugins/outputs/nsq/nsq.go b/plugins/outputs/nsq/nsq.go index fd4053222..bd1705c10 100644 --- a/plugins/outputs/nsq/nsq.go +++ b/plugins/outputs/nsq/nsq.go @@ -66,20 +66,13 @@ func (n *NSQ) Write(metrics []telegraf.Metric) error { } for _, metric := range metrics { - values, err := n.serializer.Serialize(metric) + buf, err := n.serializer.Serialize(metric) if err != nil { return err } - var pubErr error - for _, value := range values { - err = n.producer.Publish(n.Topic, []byte(value)) - if err != nil { - pubErr = err - } - } - - if pubErr != nil { + err = n.producer.Publish(n.Topic, buf) + if err != nil { return fmt.Errorf("FAILED to send NSQD message: %s", err) } } diff --git a/plugins/outputs/prometheus_client/prometheus_client_test.go b/plugins/outputs/prometheus_client/prometheus_client_test.go index 0b6a89ad0..0251de781 100644 --- a/plugins/outputs/prometheus_client/prometheus_client_test.go +++ b/plugins/outputs/prometheus_client/prometheus_client_test.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/inputs/prometheus" "github.com/influxdata/telegraf/testutil" ) @@ -26,12 +27,12 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) { now := time.Now() tags := make(map[string]string) - pt1, _ := telegraf.NewMetric( + pt1, _ := metric.New( "test_point_1", tags, map[string]interface{}{"value": 0.0}, now) - pt2, _ := telegraf.NewMetric( + pt2, _ := metric.New( "test_point_2", tags, map[string]interface{}{"value": 1.0}, @@ -61,12 +62,12 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) { tags = make(map[string]string) tags["testtag"] = "testvalue" - pt3, _ := telegraf.NewMetric( + pt3, _ := metric.New( "test_point_3", tags, map[string]interface{}{"value": 0.0}, now) - pt4, _ := telegraf.NewMetric( + pt4, _ := metric.New( "test_point_4", tags, map[string]interface{}{"value": 1.0}, @@ -104,7 +105,7 @@ func TestPrometheusExpireOldMetrics(t *testing.T) { now := time.Now() tags := make(map[string]string) - pt1, _ := telegraf.NewMetric( + pt1, _ := metric.New( "test_point_1", tags, map[string]interface{}{"value": 0.0}, @@ -116,7 +117,7 @@ func TestPrometheusExpireOldMetrics(t *testing.T) { m.Expiration = now.Add(time.Duration(-15) * time.Second) } - pt2, _ := telegraf.NewMetric( + pt2, _ := metric.New( "test_point_2", tags, map[string]interface{}{"value": 1.0}, diff --git a/plugins/parsers/graphite/parser.go b/plugins/parsers/graphite/parser.go index 4a3c21df9..1c353a31d 100644 --- a/plugins/parsers/graphite/parser.go +++ b/plugins/parsers/graphite/parser.go @@ -12,6 +12,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" ) // Minimum and maximum supported dates for timestamps. @@ -216,7 +217,7 @@ func (p *GraphiteParser) ParseLine(line string) (telegraf.Metric, error) { } } - return telegraf.NewMetric(measurement, tags, fieldValues, timestamp) + return metric.New(measurement, tags, fieldValues, timestamp) } // ApplyTemplate extracts the template fields from the given line and diff --git a/plugins/parsers/graphite/parser_test.go b/plugins/parsers/graphite/parser_test.go index 9665c2c46..ff33b32fe 100644 --- a/plugins/parsers/graphite/parser_test.go +++ b/plugins/parsers/graphite/parser_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/stretchr/testify/assert" ) @@ -369,7 +370,7 @@ func TestFilterMatchDefault(t *testing.T) { t.Fatalf("unexpected error creating parser, got %v", err) } - exp, err := telegraf.NewMetric("miss.servers.localhost.cpu_load", + exp, err := metric.New("miss.servers.localhost.cpu_load", map[string]string{}, map[string]interface{}{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -387,7 +388,7 @@ func TestFilterMatchMultipleMeasurement(t *testing.T) { t.Fatalf("unexpected error creating parser, got %v", err) } - exp, err := telegraf.NewMetric("cpu.cpu_load.10", + exp, err := metric.New("cpu.cpu_load.10", map[string]string{"host": "localhost"}, map[string]interface{}{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -406,7 +407,7 @@ func TestFilterMatchMultipleMeasurementSeparator(t *testing.T) { ) assert.NoError(t, err) - exp, err := telegraf.NewMetric("cpu_cpu_load_10", + exp, err := metric.New("cpu_cpu_load_10", map[string]string{"host": "localhost"}, map[string]interface{}{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -424,7 +425,7 @@ func TestFilterMatchSingle(t *testing.T) { t.Fatalf("unexpected error creating parser, got %v", err) } - exp, err := telegraf.NewMetric("cpu_load", + exp, err := metric.New("cpu_load", map[string]string{"host": "localhost"}, map[string]interface{}{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -441,7 +442,7 @@ func TestParseNoMatch(t *testing.T) { t.Fatalf("unexpected error creating parser, got %v", err) } - exp, err := telegraf.NewMetric("servers.localhost.memory.VmallocChunk", + exp, err := metric.New("servers.localhost.memory.VmallocChunk", map[string]string{}, map[string]interface{}{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -459,7 +460,7 @@ func TestFilterMatchWildcard(t *testing.T) { t.Fatalf("unexpected error creating parser, got %v", err) } - exp, err := telegraf.NewMetric("cpu_load", + exp, err := metric.New("cpu_load", map[string]string{"host": "localhost"}, map[string]interface{}{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -479,7 +480,7 @@ func TestFilterMatchExactBeforeWildcard(t *testing.T) { t.Fatalf("unexpected error creating parser, got %v", err) } - exp, err := telegraf.NewMetric("cpu_load", + exp, err := metric.New("cpu_load", map[string]string{"host": "localhost"}, map[string]interface{}{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -504,7 +505,7 @@ func TestFilterMatchMostLongestFilter(t *testing.T) { t.Fatalf("unexpected error creating parser, got %v", err) } - exp, err := telegraf.NewMetric("cpu_load", + exp, err := metric.New("cpu_load", map[string]string{"host": "localhost", "resource": "cpu"}, map[string]interface{}{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -528,7 +529,7 @@ func TestFilterMatchMultipleWildcards(t *testing.T) { t.Fatalf("unexpected error creating parser, got %v", err) } - exp, err := telegraf.NewMetric("cpu_load", + exp, err := metric.New("cpu_load", map[string]string{"host": "server01"}, map[string]interface{}{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -550,7 +551,7 @@ func TestParseDefaultTags(t *testing.T) { t.Fatalf("unexpected error creating parser, got %v", err) } - exp, err := telegraf.NewMetric("cpu_load", + exp, err := metric.New("cpu_load", map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}, map[string]interface{}{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -571,7 +572,7 @@ func TestParseDefaultTemplateTags(t *testing.T) { t.Fatalf("unexpected error creating parser, got %v", err) } - exp, err := telegraf.NewMetric("cpu_load", + exp, err := metric.New("cpu_load", map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}, map[string]interface{}{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -592,7 +593,7 @@ func TestParseDefaultTemplateTagsOverridGlobal(t *testing.T) { t.Fatalf("unexpected error creating parser, got %v", err) } - exp, err := telegraf.NewMetric("cpu_load", + exp, err := metric.New("cpu_load", map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}, map[string]interface{}{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -615,7 +616,7 @@ func TestParseTemplateWhitespace(t *testing.T) { t.Fatalf("unexpected error creating parser, got %v", err) } - exp, err := telegraf.NewMetric("cpu_load", + exp, err := metric.New("cpu_load", map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}, map[string]interface{}{"value": float64(11)}, time.Unix(1435077219, 0)) diff --git a/plugins/parsers/influx/parser.go b/plugins/parsers/influx/parser.go index 8ced6ed50..f04058552 100644 --- a/plugins/parsers/influx/parser.go +++ b/plugins/parsers/influx/parser.go @@ -6,8 +6,7 @@ import ( "time" "github.com/influxdata/telegraf" - - "github.com/influxdata/influxdb/models" + "github.com/influxdata/telegraf/metric" ) // InfluxParser is an object for Parsing incoming metrics. @@ -19,18 +18,16 @@ type InfluxParser struct { func (p *InfluxParser) ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) { // parse even if the buffer begins with a newline buf = bytes.TrimPrefix(buf, []byte("\n")) - points, err := models.ParsePointsWithPrecision(buf, t, "n") - metrics := make([]telegraf.Metric, len(points)) - for i, point := range points { - for k, v := range p.DefaultTags { - // only set the default tag if it doesn't already exist: - if tmp := point.Tags().GetString(k); tmp == "" { - point.AddTag(k, v) + metrics, err := metric.ParseWithDefaultTime(buf, t) + if len(p.DefaultTags) > 0 { + for _, m := range metrics { + for k, v := range p.DefaultTags { + // only set the default tag if it doesn't already exist: + if !m.HasTag(k) { + m.AddTag(k, v) + } } } - // Ignore error here because it's impossible that a model.Point - // wouldn't parse into client.Point properly - metrics[i] = telegraf.NewMetricFromPoint(point) } return metrics, err } diff --git a/plugins/parsers/influx/parser_test.go b/plugins/parsers/influx/parser_test.go index 1abf41351..50fb2ad7b 100644 --- a/plugins/parsers/influx/parser_test.go +++ b/plugins/parsers/influx/parser_test.go @@ -14,15 +14,14 @@ var ( ms []telegraf.Metric writer = ioutil.Discard metrics500 []byte + exptime = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() ) -var exptime = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) - const ( - validInflux = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000" + validInflux = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000\n" validInfluxNewline = "\ncpu_load_short,cpu=cpu0 value=10 1257894000000000000\n" - invalidInflux = "I don't think this is line protocol" - invalidInflux2 = "{\"a\": 5, \"b\": {\"c\": 6}}" + invalidInflux = "I don't think this is line protocol\n" + invalidInflux2 = "{\"a\": 5, \"b\": {\"c\": 6}}\n" ) const influxMulti = ` @@ -57,7 +56,7 @@ func TestParseValidInflux(t *testing.T) { assert.Equal(t, map[string]string{ "cpu": "cpu0", }, metrics[0].Tags()) - assert.Equal(t, exptime, metrics[0].Time()) + assert.Equal(t, exptime, metrics[0].Time().UnixNano()) metrics, err = parser.Parse([]byte(validInfluxNewline)) assert.NoError(t, err) @@ -69,7 +68,7 @@ func TestParseValidInflux(t *testing.T) { assert.Equal(t, map[string]string{ "cpu": "cpu0", }, metrics[0].Tags()) - assert.Equal(t, exptime, metrics[0].Time()) + assert.Equal(t, exptime, metrics[0].Time().UnixNano()) } func TestParseLineValidInflux(t *testing.T) { @@ -84,7 +83,7 @@ func TestParseLineValidInflux(t *testing.T) { assert.Equal(t, map[string]string{ "cpu": "cpu0", }, metric.Tags()) - assert.Equal(t, exptime, metric.Time()) + assert.Equal(t, exptime, metric.Time().UnixNano()) metric, err = parser.ParseLine(validInfluxNewline) assert.NoError(t, err) @@ -95,7 +94,7 @@ func TestParseLineValidInflux(t *testing.T) { assert.Equal(t, map[string]string{ "cpu": "cpu0", }, metric.Tags()) - assert.Equal(t, exptime, metric.Time()) + assert.Equal(t, exptime, metric.Time().UnixNano()) } func TestParseMultipleValid(t *testing.T) { @@ -229,11 +228,8 @@ func BenchmarkParseAddTagWrite(b *testing.B) { panic("500 metrics not parsed!!") } for _, tmp := range ms { - tags := tmp.Tags() - tags["host"] = "localhost" - tmp, _ = telegraf.NewMetric(tmp.Name(), tags, tmp.Fields(), tmp.Time()) - writer.Write([]byte(tmp.String())) - writer.Write([]byte{'\n'}) + tmp.AddTag("host", "localhost") + writer.Write(tmp.Serialize()) } } } diff --git a/plugins/parsers/json/parser.go b/plugins/parsers/json/parser.go index a2c69ec28..edd5afc54 100644 --- a/plugins/parsers/json/parser.go +++ b/plugins/parsers/json/parser.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" ) type JSONParser struct { @@ -57,7 +58,7 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i return nil, err } - metric, err := telegraf.NewMetric(p.MetricName, tags, f.Fields, time.Now().UTC()) + metric, err := metric.New(p.MetricName, tags, f.Fields, time.Now().UTC()) if err != nil { return nil, err diff --git a/plugins/parsers/nagios/parser.go b/plugins/parsers/nagios/parser.go index 305c3af11..621a3514c 100644 --- a/plugins/parsers/nagios/parser.go +++ b/plugins/parsers/nagios/parser.go @@ -6,6 +6,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" ) type NagiosParser struct { @@ -90,7 +91,7 @@ func (p *NagiosParser) Parse(buf []byte) ([]telegraf.Metric, error) { fields["max"] = perf[0][7] } // Create metric - metric, err := telegraf.NewMetric(fieldName, tags, fields, time.Now().UTC()) + metric, err := metric.New(fieldName, tags, fields, time.Now().UTC()) if err != nil { return nil, err } diff --git a/plugins/parsers/value/parser.go b/plugins/parsers/value/parser.go index 37c0bf17c..a495033c4 100644 --- a/plugins/parsers/value/parser.go +++ b/plugins/parsers/value/parser.go @@ -8,6 +8,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" ) type ValueParser struct { @@ -46,7 +47,7 @@ func (v *ValueParser) Parse(buf []byte) ([]telegraf.Metric, error) { } fields := map[string]interface{}{"value": value} - metric, err := telegraf.NewMetric(v.MetricName, v.DefaultTags, + metric, err := metric.New(v.MetricName, v.DefaultTags, fields, time.Now().UTC()) if err != nil { return nil, err diff --git a/plugins/serializers/graphite/graphite.go b/plugins/serializers/graphite/graphite.go index bff64d088..57ddea76f 100644 --- a/plugins/serializers/graphite/graphite.go +++ b/plugins/serializers/graphite/graphite.go @@ -20,8 +20,8 @@ type GraphiteSerializer struct { Template string } -func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) { - out := []string{} +func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { + out := []byte{} // Convert UnixNano to Unix timestamps timestamp := metric.UnixNano() / 1000000000 @@ -34,12 +34,12 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) for fieldName, value := range metric.Fields() { // Convert value to string valueS := fmt.Sprintf("%#v", value) - point := fmt.Sprintf("%s %s %d", + point := []byte(fmt.Sprintf("%s %s %d\n", // insert "field" section of template sanitizedChars.Replace(InsertField(bucket, fieldName)), sanitizedChars.Replace(valueS), - timestamp) - out = append(out, point) + timestamp)) + out = append(out, point...) } return out, nil } diff --git a/plugins/serializers/graphite/graphite_test.go b/plugins/serializers/graphite/graphite_test.go index 57196b861..942efd1f4 100644 --- a/plugins/serializers/graphite/graphite_test.go +++ b/plugins/serializers/graphite/graphite_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" ) var defaultTags = map[string]string{ @@ -29,19 +30,19 @@ const ( ) func TestGraphiteTags(t *testing.T) { - m1, _ := telegraf.NewMetric( + m1, _ := metric.New( "mymeasurement", map[string]string{"host": "192.168.0.1"}, map[string]interface{}{"value": float64(3.14)}, time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), ) - m2, _ := telegraf.NewMetric( + m2, _ := metric.New( "mymeasurement", map[string]string{"host": "192.168.0.1", "afoo": "first", "bfoo": "second"}, map[string]interface{}{"value": float64(3.14)}, time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), ) - m3, _ := telegraf.NewMetric( + m3, _ := metric.New( "mymeasurement", map[string]string{"afoo": "first", "bfoo": "second"}, map[string]interface{}{"value": float64(3.14)}, @@ -67,7 +68,7 @@ func TestSerializeMetricNoHost(t *testing.T) { "usage_idle": float64(91.5), "usage_busy": float64(8.5), } - m, err := telegraf.NewMetric("cpu", tags, fields, now) + m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) s := GraphiteSerializer{} @@ -94,7 +95,7 @@ func TestSerializeMetricHost(t *testing.T) { "usage_idle": float64(91.5), "usage_busy": float64(8.5), } - m, err := telegraf.NewMetric("cpu", tags, fields, now) + m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) s := GraphiteSerializer{} @@ -121,7 +122,7 @@ func TestSerializeValueField(t *testing.T) { fields := map[string]interface{}{ "value": float64(91.5), } - m, err := telegraf.NewMetric("cpu", tags, fields, now) + m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) s := GraphiteSerializer{} @@ -145,7 +146,7 @@ func TestSerializeValueField2(t *testing.T) { fields := map[string]interface{}{ "value": float64(91.5), } - m, err := telegraf.NewMetric("cpu", tags, fields, now) + m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) s := GraphiteSerializer{ @@ -171,7 +172,7 @@ func TestSerializeFieldWithSpaces(t *testing.T) { fields := map[string]interface{}{ `field\ with\ spaces`: float64(91.5), } - m, err := telegraf.NewMetric("cpu", tags, fields, now) + m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) s := GraphiteSerializer{ @@ -197,7 +198,7 @@ func TestSerializeTagWithSpaces(t *testing.T) { fields := map[string]interface{}{ `field_with_spaces`: float64(91.5), } - m, err := telegraf.NewMetric("cpu", tags, fields, now) + m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) s := GraphiteSerializer{ @@ -223,7 +224,7 @@ func TestSerializeValueField3(t *testing.T) { fields := map[string]interface{}{ "value": float64(91.5), } - m, err := telegraf.NewMetric("cpu", tags, fields, now) + m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) s := GraphiteSerializer{ @@ -249,7 +250,7 @@ func TestSerializeValueField5(t *testing.T) { fields := map[string]interface{}{ "value": float64(91.5), } - m, err := telegraf.NewMetric("cpu", tags, fields, now) + m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) s := GraphiteSerializer{ @@ -275,7 +276,7 @@ func TestSerializeMetricPrefix(t *testing.T) { "usage_idle": float64(91.5), "usage_busy": float64(8.5), } - m, err := telegraf.NewMetric("cpu", tags, fields, now) + m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) s := GraphiteSerializer{Prefix: "prefix"} @@ -300,7 +301,7 @@ func TestSerializeBucketNameNoHost(t *testing.T) { fields := map[string]interface{}{ "usage_idle": float64(91.5), } - m, err := telegraf.NewMetric("cpu", tags, fields, now) + m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) mS := SerializeBucketName(m.Name(), m.Tags(), "", "") @@ -314,7 +315,7 @@ func TestSerializeBucketNameHost(t *testing.T) { fields := map[string]interface{}{ "usage_idle": float64(91.5), } - m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) + m, err := metric.New("cpu", defaultTags, fields, now) assert.NoError(t, err) mS := SerializeBucketName(m.Name(), m.Tags(), "", "") @@ -328,7 +329,7 @@ func TestSerializeBucketNamePrefix(t *testing.T) { fields := map[string]interface{}{ "usage_idle": float64(91.5), } - m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) + m, err := metric.New("cpu", defaultTags, fields, now) assert.NoError(t, err) mS := SerializeBucketName(m.Name(), m.Tags(), "", "prefix") @@ -342,7 +343,7 @@ func TestTemplate1(t *testing.T) { fields := map[string]interface{}{ "usage_idle": float64(91.5), } - m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) + m, err := metric.New("cpu", defaultTags, fields, now) assert.NoError(t, err) mS := SerializeBucketName(m.Name(), m.Tags(), template1, "") @@ -356,7 +357,7 @@ func TestTemplate2(t *testing.T) { fields := map[string]interface{}{ "usage_idle": float64(91.5), } - m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) + m, err := metric.New("cpu", defaultTags, fields, now) assert.NoError(t, err) mS := SerializeBucketName(m.Name(), m.Tags(), template2, "") @@ -370,7 +371,7 @@ func TestTemplate3(t *testing.T) { fields := map[string]interface{}{ "usage_idle": float64(91.5), } - m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) + m, err := metric.New("cpu", defaultTags, fields, now) assert.NoError(t, err) mS := SerializeBucketName(m.Name(), m.Tags(), template3, "") @@ -384,7 +385,7 @@ func TestTemplate4(t *testing.T) { fields := map[string]interface{}{ "usage_idle": float64(91.5), } - m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) + m, err := metric.New("cpu", defaultTags, fields, now) assert.NoError(t, err) mS := SerializeBucketName(m.Name(), m.Tags(), template4, "") @@ -398,7 +399,7 @@ func TestTemplate6(t *testing.T) { fields := map[string]interface{}{ "usage_idle": float64(91.5), } - m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) + m, err := metric.New("cpu", defaultTags, fields, now) assert.NoError(t, err) mS := SerializeBucketName(m.Name(), m.Tags(), template6, "") diff --git a/plugins/serializers/influx/influx.go b/plugins/serializers/influx/influx.go index 03c53fed2..50bfe2c21 100644 --- a/plugins/serializers/influx/influx.go +++ b/plugins/serializers/influx/influx.go @@ -7,6 +7,6 @@ import ( type InfluxSerializer struct { } -func (s *InfluxSerializer) Serialize(metric telegraf.Metric) ([]string, error) { - return []string{metric.String()}, nil +func (s *InfluxSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { + return metric.Serialize(), nil } diff --git a/plugins/serializers/influx/influx_test.go b/plugins/serializers/influx/influx_test.go index 4937800aa..32b9c8ca2 100644 --- a/plugins/serializers/influx/influx_test.go +++ b/plugins/serializers/influx/influx_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" ) func TestSerializeMetricFloat(t *testing.T) { @@ -18,7 +19,7 @@ func TestSerializeMetricFloat(t *testing.T) { fields := map[string]interface{}{ "usage_idle": float64(91.5), } - m, err := telegraf.NewMetric("cpu", tags, fields, now) + m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) s := InfluxSerializer{} @@ -37,7 +38,7 @@ func TestSerializeMetricInt(t *testing.T) { fields := map[string]interface{}{ "usage_idle": int64(90), } - m, err := telegraf.NewMetric("cpu", tags, fields, now) + m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) s := InfluxSerializer{} @@ -56,7 +57,7 @@ func TestSerializeMetricString(t *testing.T) { fields := map[string]interface{}{ "usage_idle": "foobar", } - m, err := telegraf.NewMetric("cpu", tags, fields, now) + m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) s := InfluxSerializer{} diff --git a/plugins/serializers/json/json.go b/plugins/serializers/json/json.go index e27aa400f..3e259fafd 100644 --- a/plugins/serializers/json/json.go +++ b/plugins/serializers/json/json.go @@ -9,9 +9,7 @@ import ( type JsonSerializer struct { } -func (s *JsonSerializer) Serialize(metric telegraf.Metric) ([]string, error) { - out := []string{} - +func (s *JsonSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { m := make(map[string]interface{}) m["tags"] = metric.Tags() m["fields"] = metric.Fields() @@ -19,9 +17,9 @@ func (s *JsonSerializer) Serialize(metric telegraf.Metric) ([]string, error) { m["timestamp"] = metric.UnixNano() / 1000000000 serialized, err := ejson.Marshal(m) if err != nil { - return []string{}, err + return []byte{}, err } - out = append(out, string(serialized)) + serialized = append(serialized, '\n') - return out, nil + return serialized, nil } diff --git a/plugins/serializers/json/json_test.go b/plugins/serializers/json/json_test.go index bf350fd14..900a3a6ec 100644 --- a/plugins/serializers/json/json_test.go +++ b/plugins/serializers/json/json_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" ) func TestSerializeMetricFloat(t *testing.T) { @@ -18,7 +19,7 @@ func TestSerializeMetricFloat(t *testing.T) { fields := map[string]interface{}{ "usage_idle": float64(91.5), } - m, err := telegraf.NewMetric("cpu", tags, fields, now) + m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) s := JsonSerializer{} @@ -36,7 +37,7 @@ func TestSerializeMetricInt(t *testing.T) { fields := map[string]interface{}{ "usage_idle": int64(90), } - m, err := telegraf.NewMetric("cpu", tags, fields, now) + m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) s := JsonSerializer{} @@ -55,7 +56,7 @@ func TestSerializeMetricString(t *testing.T) { fields := map[string]interface{}{ "usage_idle": "foobar", } - m, err := telegraf.NewMetric("cpu", tags, fields, now) + m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) s := JsonSerializer{} @@ -75,7 +76,7 @@ func TestSerializeMultiFields(t *testing.T) { "usage_idle": int64(90), "usage_total": 8559615, } - m, err := telegraf.NewMetric("cpu", tags, fields, now) + m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) s := JsonSerializer{} diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index 0cf8149e3..83be4900b 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -18,8 +18,10 @@ type SerializerOutput interface { // Serializer is an interface defining functions that a serializer plugin must // satisfy. type Serializer interface { - // Serialize takes a single telegraf metric and turns it into a string. - Serialize(metric telegraf.Metric) ([]string, error) + // Serialize takes a single telegraf metric and turns it into a byte buffer. + // separate metrics should be separated by a newline, and there should be + // a newline at the end of the buffer. + Serialize(metric telegraf.Metric) ([]byte, error) } // Config is a struct that covers the data types needed for all serializer types, diff --git a/testutil/testutil.go b/testutil/testutil.go index c09e3d9e8..abcc27cba 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -7,6 +7,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" ) var localhost = "localhost" @@ -54,7 +55,7 @@ func TestMetric(value interface{}, name ...string) telegraf.Metric { measurement = name[0] } tags := map[string]string{"tag1": "value1"} - pt, _ := telegraf.NewMetric( + pt, _ := metric.New( measurement, tags, map[string]interface{}{"value": value},