Add SerializeBatch method to the Serializer interface (#4107)

Daniel Nelson 2018-05-04 18:27:31 -07:00 committed by GitHub
parent 55b4fcb40d
commit 0ede70a2bd
9 changed files with 302 additions and 84 deletions

View File

@@ -1,35 +1,15 @@
# Output Data Formats

In addition to output-specific data formats, Telegraf supports a set of
standard data formats that may be selected when configuring many output
plugins.

1. [InfluxDB Line Protocol](#influx)
1. [JSON](#json)
1. [Graphite](#graphite)

You can identify the plugins that support these data formats by the presence
of a `data_format` config option, for example, in the `file` output plugin:
```toml
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
@@ -40,22 +20,16 @@ config option, for example, in the `file` output plugin:
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
```

## Influx

The `influx` data format outputs metrics using
[InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/latest/write_protocols/line_protocol_tutorial/).
This is the recommended format unless another format is required for
interoperability.

### Influx Configuration

```toml
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
@@ -82,7 +56,7 @@ interoperability.
# influx_uint_support = false
```
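For reference, a minimal sketch (not part of these docs) showing the line protocol text the `influx` serializer produces for a single metric; `metric.New` and `influx.NewSerializer()` are the same constructors used by the serializer tests later in this commit.

```go
package main

import (
	"fmt"
	"time"

	"github.com/influxdata/telegraf/metric"
	"github.com/influxdata/telegraf/plugins/serializers/influx"
)

func main() {
	// Build a single metric; metric.New is the same helper used in the tests below.
	m, err := metric.New(
		"cpu",
		map[string]string{"host": "localhost"},
		map[string]interface{}{"usage_idle": 98.1},
		time.Unix(1455320660, 0),
	)
	if err != nil {
		panic(err)
	}

	// Serialize it to line protocol; prints something like:
	//   cpu,host=localhost usage_idle=98.1 1455320660000000000
	s := influx.NewSerializer()
	out, err := s.Serialize(m)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out))
}
```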
## Graphite

The Graphite data format translates Telegraf metrics into _dot_ buckets. A
template can be specified for the output of Telegraf metrics into Graphite
@@ -115,7 +89,7 @@ tars.cpu-total.us-east-1.cpu.usage_idle 98.09 1455320690
Fields with string values will be skipped. Boolean fields will be converted
to 1 (true) or 0 (false).

### Graphite Configuration
```toml
[[outputs.file]]
@@ -134,10 +108,10 @@ to 1 (true) or 0 (false).
template = "host.tags.measurement.field"
```

## JSON

The JSON output data format serializes a single metric in the following form:

```json
{
"fields": {
@@ -154,7 +128,43 @@ The JSON data format serialized Telegraf metrics in json format. The format is:
}
```
When an output plugin needs to emit multiple metrics at one time, it may use
the batch format. Whether the batch format is used is determined by the
plugin; refer to the documentation for the specific plugin.
```json
{
"metrics": [
{
"fields": {
"field_1": 30,
"field_2": 4,
"field_N": 59,
"n_images": 660
},
"name": "docker",
"tags": {
"host": "raynor"
},
"timestamp": 1458229140
},
{
"fields": {
"field_1": 30,
"field_2": 4,
"field_N": 59,
"n_images": 660
},
"name": "docker",
"tags": {
"host": "raynor"
},
"timestamp": 1458229140
}
]
}
```
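As an illustration, a minimal sketch (not part of these docs) of producing the batch object above with the JSON serializer added by this commit; `json.NewSerializer` and `SerializeBatch` appear in the serializer diff further down.

```go
package main

import (
	"fmt"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/metric"
	"github.com/influxdata/telegraf/plugins/serializers/json"
)

func main() {
	// One "docker" metric, reused twice to mimic the example above.
	m, err := metric.New(
		"docker",
		map[string]string{"host": "raynor"},
		map[string]interface{}{"n_images": 660},
		time.Unix(1458229140, 0),
	)
	if err != nil {
		panic(err)
	}

	// A timestamp resolution of 0 falls back to the default of one second.
	s, err := json.NewSerializer(0)
	if err != nil {
		panic(err)
	}

	// SerializeBatch wraps every metric in a single {"metrics": [...]} object.
	out, err := s.SerializeBatch([]telegraf.Metric{m, m})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```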
### JSON Configuration
```toml
[[outputs.file]]
@@ -166,14 +176,9 @@ The JSON data format serialized Telegraf metrics in json format. The format is:
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "json"

## The resolution to use for the metric timestamp. Must be a duration string
## such as "1ns", "1us", "1ms", "10ms", "1s". Durations are truncated to
## the power of 10 less than the specified units.
json_timestamp_units = "1s"
```
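To make the truncation rule concrete, a small sketch (again not part of these docs) using the same values as the serializer tests later in this commit: a resolution of 15ms is truncated to 10ms, so the timestamp is emitted in hundredths of a second.

```go
package main

import (
	"fmt"
	"time"

	"github.com/influxdata/telegraf/metric"
	"github.com/influxdata/telegraf/plugins/serializers/json"
)

func main() {
	m, err := metric.New(
		"cpu",
		map[string]string{},
		map[string]interface{}{"value": 42.0},
		time.Unix(1525478795, 123456789),
	)
	if err != nil {
		panic(err)
	}

	// 15ms is not a power of ten, so it is truncated down to 10ms.
	s, err := json.NewSerializer(15 * time.Millisecond)
	if err != nil {
		panic(err)
	}

	out, err := s.Serialize(m)
	if err != nil {
		panic(err)
	}
	// The timestamp field is emitted in units of 10ms: 152547879512
	fmt.Print(string(out))
}
```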

View File

@@ -67,12 +67,17 @@ func (d *MockDialer) DialContext(ctx context.Context, network string, address st
type MockSerializer struct {
SerializeF func(metric telegraf.Metric) ([]byte, error)
SerializeBatchF func(metrics []telegraf.Metric) ([]byte, error)
}
func (s *MockSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
return s.SerializeF(metric)
}
func (s *MockSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
return s.SerializeBatchF(metrics)
}
func TestUDP_NewUDPClientNoURL(t *testing.T) {
config := &influxdb.UDPConfig{}
_, err := influxdb.NewUDPClient(config)

View File

@@ -1,6 +1,7 @@
package graphite
import (
"bytes"
"fmt"
"math"
"regexp"
@@ -60,6 +61,21 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
return out, nil
}
func (s *GraphiteSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var batch bytes.Buffer
for _, m := range metrics {
buf, err := s.Serialize(m)
if err != nil {
return nil, err
}
_, err = batch.Write(buf)
if err != nil {
return nil, err
}
}
return batch.Bytes(), nil
}
func formatValue(value interface{}) string {
switch v := value.(type) {
case string:

View File

@@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
@@ -577,3 +578,32 @@ func TestClean(t *testing.T) {
})
}
}
func TestSerializeBatch(t *testing.T) {
now := time.Unix(1234567890, 0)
tests := []struct {
name string
metric_name string
tags map[string]string
fields map[string]interface{}
expected string
}{
{
"Base metric",
"cpu",
map[string]string{"host": "localhost"},
map[string]interface{}{"usage_busy": float64(8.5)},
"localhost.cpu.usage_busy 8.5 1234567890\nlocalhost.cpu.usage_busy 8.5 1234567890\n",
},
}
s := GraphiteSerializer{}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m, err := metric.New(tt.metric_name, tt.tags, tt.fields, now)
assert.NoError(t, err)
actual, _ := s.SerializeBatch([]telegraf.Metric{m, m})
require.Equal(t, tt.expected, string(actual))
})
}
}

View File

@@ -102,6 +102,17 @@ func (s *Serializer) Serialize(m telegraf.Metric) ([]byte, error) {
return out, nil
}
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var batch bytes.Buffer
for _, m := range metrics {
_, err := s.Write(&batch, m)
if err != nil {
return nil, err
}
}
return batch.Bytes(), nil
}
func (s *Serializer) Write(w io.Writer, m telegraf.Metric) (int, error) {
err := s.writeMetric(w, m)
return s.bytesWritten, err

View File

@@ -447,3 +447,24 @@ func BenchmarkSerializer(b *testing.B) {
})
}
}
func TestSerialize_SerializeBatch(t *testing.T) {
m := MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
)
metrics := []telegraf.Metric{m, m}
serializer := NewSerializer()
serializer.SetFieldSortOrder(SortFields)
output, err := serializer.SerializeBatch(metrics)
require.NoError(t, err)
require.Equal(t, []byte("cpu value=42 0\ncpu value=42 0\n"), output)
}

View File

@@ -1,29 +1,26 @@
package json
import (
"encoding/json"
"time"
"github.com/influxdata/telegraf"
)
type serializer struct {
TimestampUnits time.Duration
}
func NewSerializer(timestampUnits time.Duration) (*serializer, error) {
s := &serializer{
TimestampUnits: truncateDuration(timestampUnits),
}
return s, nil
}
func (s *serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
m := s.createObject(metric)
serialized, err := json.Marshal(m)
if err != nil {
return []byte{}, err
}
@@ -31,3 +28,46 @@ func (s *JsonSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
return serialized, nil
}
func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
objects := make([]interface{}, 0, len(metrics))
for _, metric := range metrics {
m := s.createObject(metric)
objects = append(objects, m)
}
obj := map[string]interface{}{
"metrics": objects,
}
serialized, err := json.Marshal(obj)
if err != nil {
return []byte{}, err
}
return serialized, nil
}
func (s *serializer) createObject(metric telegraf.Metric) map[string]interface{} {
m := make(map[string]interface{}, 4)
m["tags"] = metric.Tags()
m["fields"] = metric.Fields()
m["name"] = metric.Name()
m["timestamp"] = metric.Time().UnixNano() / int64(s.TimestampUnits)
return m
}
func truncateDuration(units time.Duration) time.Duration {
// Default precision is 1s
if units <= 0 {
return time.Second
}
// Search for the power of ten less than the duration
d := time.Nanosecond
for {
if d*10 > units {
return d
}
d = d * 10
}
}

View File

@@ -6,10 +6,19 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
func MustMetric(v telegraf.Metric, err error) telegraf.Metric {
if err != nil {
panic(err)
}
return v
}
func TestSerializeMetricFloat(t *testing.T) {
now := time.Now()
tags := map[string]string{
@@ -21,7 +30,7 @@ func TestSerializeMetricFloat(t *testing.T) {
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s, _ := NewSerializer(0)
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
@@ -29,6 +38,63 @@ func TestSerializeMetricFloat(t *testing.T) {
assert.Equal(t, string(expS), string(buf))
}
func TestSerialize_TimestampUnits(t *testing.T) {
tests := []struct {
name string
timestampUnits time.Duration
expected string
}{
{
name: "default of 1s",
timestampUnits: 0,
expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":1525478795}`,
},
{
name: "1ns",
timestampUnits: 1 * time.Nanosecond,
expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":1525478795123456789}`,
},
{
name: "1ms",
timestampUnits: 1 * time.Millisecond,
expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":1525478795123}`,
},
{
name: "10ms",
timestampUnits: 10 * time.Millisecond,
expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":152547879512}`,
},
{
name: "15ms is reduced to 10ms",
timestampUnits: 15 * time.Millisecond,
expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":152547879512}`,
},
{
name: "65ms is reduced to 10ms",
timestampUnits: 65 * time.Millisecond,
expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":152547879512}`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(1525478795, 123456789),
),
)
s, _ := NewSerializer(tt.timestampUnits)
actual, err := s.Serialize(m)
require.NoError(t, err)
require.Equal(t, tt.expected+"\n", string(actual))
})
}
}
func TestSerializeMetricInt(t *testing.T) {
now := time.Now()
tags := map[string]string{
@@ -40,7 +106,7 @@ func TestSerializeMetricInt(t *testing.T) {
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s, _ := NewSerializer(0)
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
@@ -60,7 +126,7 @@ func TestSerializeMetricString(t *testing.T) {
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s, _ := NewSerializer(0)
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
@@ -81,7 +147,7 @@ func TestSerializeMultiFields(t *testing.T) {
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s, _ := NewSerializer(0)
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
@@ -101,10 +167,29 @@ func TestSerializeMetricWithEscapes(t *testing.T) {
m, err := metric.New("My CPU", tags, fields, now)
assert.NoError(t, err)
s, _ := NewSerializer(0)
buf, err := s.Serialize(m)
assert.NoError(t, err)
expS := []byte(fmt.Sprintf(`{"fields":{"U,age=Idle":90},"name":"My CPU","tags":{"cpu tag":"cpu0"},"timestamp":%d}`, now.Unix()) + "\n")
assert.Equal(t, string(expS), string(buf))
}
func TestSerializeBatch(t *testing.T) {
m := MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
)
metrics := []telegraf.Metric{m, m}
s, _ := NewSerializer(0)
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
require.Equal(t, []byte(`{"metrics":[{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":0},{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":0}]}`), buf)
}

View File

@@ -25,6 +25,11 @@ type Serializer interface {
// separate metrics should be separated by a newline, and there should be
// a newline at the end of the buffer.
Serialize(metric telegraf.Metric) ([]byte, error)
// SerializeBatch takes an array of telegraf metrics and serializes them
// into a byte buffer. This method is not required to be suitable for use
// with line oriented framing.
SerializeBatch(metrics []telegraf.Metric) ([]byte, error)
}
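For illustration, a minimal sketch (not part of this commit) of a custom serializer satisfying the extended interface; the `lineSerializer` name and its output format are hypothetical. Like the graphite and influx serializers above, it builds `SerializeBatch` by concatenating per-metric output; formats that need a wrapping document, such as the JSON `"metrics"` object, cannot do this, which is why batch serialization is part of the interface rather than a generic helper.

```go
package example

import (
	"bytes"
	"fmt"

	"github.com/influxdata/telegraf"
)

// lineSerializer is a hypothetical serializer used only for illustration.
type lineSerializer struct{}

func (s *lineSerializer) Serialize(m telegraf.Metric) ([]byte, error) {
	var buf bytes.Buffer
	for name, value := range m.Fields() {
		// One line per field: measurement.field value unix_timestamp
		fmt.Fprintf(&buf, "%s.%s %v %d\n", m.Name(), name, value, m.Time().Unix())
	}
	return buf.Bytes(), nil
}

func (s *lineSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
	var batch bytes.Buffer
	for _, m := range metrics {
		buf, err := s.Serialize(m)
		if err != nil {
			return nil, err
		}
		batch.Write(buf)
	}
	return batch.Bytes(), nil
}
```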
// Config is a struct that covers the data types needed for all serializer types,
@@ -72,7 +77,7 @@ func NewSerializer(config *Config) (Serializer, error) {
}
func NewJsonSerializer(timestampUnits time.Duration) (Serializer, error) {
return json.NewSerializer(timestampUnits)
}
func NewInfluxSerializerConfig(config *Config) (Serializer, error) {