Add SerializeBatch method to the Serializer interface (#4107)

This commit is contained in:
Daniel Nelson 2018-05-04 18:27:31 -07:00 committed by GitHub
parent de355b76d6
commit 73c22a8189
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 302 additions and 84 deletions

View File

@ -1,35 +1,15 @@
# Telegraf Output Data Formats
# Output Data Formats
Telegraf is able to serialize metrics into the following output data formats:
In addition to output specific data formats, Telegraf supports a set of
standard data formats that may be selected from when configuring many output
plugins.
1. [InfluxDB Line Protocol](#influx)
1. [JSON](#json)
1. [Graphite](#graphite)
Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/latest/concepts/glossary/#point),
are a combination of four basic parts:
1. Measurement Name
1. Tags
1. Fields
1. Timestamp
In InfluxDB line protocol, these 4 parts are easily defined in textual form:
```
measurement_name[,tag1=val1,...] field1=val1[,field2=val2,...] [timestamp]
```
For Telegraf outputs that write textual data (such as `kafka`, `mqtt`, and `file`),
InfluxDB line protocol was originally the only available output format. But now
we are normalizing telegraf metric "serializers" into a
[plugin-like interface](https://github.com/influxdata/telegraf/tree/master/plugins/serializers)
across all output plugins that can support it.
You will be able to identify a plugin that supports different data formats
by the presence of a `data_format`
config option, for example, in the `file` output plugin:
You will be able to identify the plugins with support by the presence of a
`data_format` config option, for example, in the `file` output plugin:
```toml
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
@ -40,22 +20,16 @@ config option, for example, in the `file` output plugin:
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
## Additional configuration options go here
```
Each data_format has an additional set of configuration options available, which
I'll go over below.
## Influx
# Influx:
The `influx` format outputs data as
The `influx` data format outputs metrics using
[InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/latest/write_protocols/line_protocol_tutorial/).
This is the recommended format to use unless another format is required for
This is the recommended format unless another format is required for
interoperability.
### Influx Configuration:
### Influx Configuration
```toml
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
@ -82,7 +56,7 @@ interoperability.
# influx_uint_support = false
```
# Graphite:
## Graphite
The Graphite data format translates Telegraf metrics into _dot_ buckets. A
template can be specified for the output of Telegraf metrics into Graphite
@ -115,7 +89,7 @@ tars.cpu-total.us-east-1.cpu.usage_idle 98.09 1455320690
Fields with string values will be skipped. Boolean fields will be converted
to 1 (true) or 0 (false).
### Graphite Configuration:
### Graphite Configuration
```toml
[[outputs.file]]
@ -134,27 +108,63 @@ to 1 (true) or 0 (false).
template = "host.tags.measurement.field"
```
# JSON:
The JSON data format serialized Telegraf metrics in json format. The format is:
## JSON
The JSON output data format output for a single metric is in the
form:
```json
{
"fields":{
"field_1":30,
"field_2":4,
"field_N":59,
"n_images":660
},
"name":"docker",
"tags":{
"host":"raynor"
},
"timestamp":1458229140
"fields": {
"field_1": 30,
"field_2": 4,
"field_N": 59,
"n_images": 660
},
"name": "docker",
"tags": {
"host": "raynor"
},
"timestamp": 1458229140
}
```
### JSON Configuration:
When an output plugin needs to emit multiple metrics at one time, it may use
the batch format. The use of batch format is determined by the plugin,
reference the documentation for the specific plugin.
```json
{
"metrics": [
{
"fields": {
"field_1": 30,
"field_2": 4,
"field_N": 59,
"n_images": 660
},
"name": "docker",
"tags": {
"host": "raynor"
},
"timestamp": 1458229140
},
{
"fields": {
"field_1": 30,
"field_2": 4,
"field_N": 59,
"n_images": 660
},
"name": "docker",
"tags": {
"host": "raynor"
},
"timestamp": 1458229140
}
]
}
```
### JSON Configuration
```toml
[[outputs.file]]
@ -166,14 +176,9 @@ The JSON data format serialized Telegraf metrics in json format. The format is:
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "json"
json_timestamp_units = "1ns"
```
By default, the timestamp that is output in JSON data format serialized Telegraf
metrics is in seconds. The precision of this timestamp can be adjusted for any output
by adding the optional `json_timestamp_units` parameter to the configuration for
that output. This parameter can be used to set the timestamp units to nanoseconds (`ns`),
microseconds (`us` or `µs`), milliseconds (`ms`), or seconds (`s`). Note that this
parameter will be truncated to the nearest power of 10 that, so if the `json_timestamp_units`
are set to `15ms` the timestamps for the JSON format serialized Telegraf metrics will be
output in hundredths of a second (`10ms`).
## The resolution to use for the metric timestamp. Must be a duration string
## such as "1ns", "1us", "1ms", "10ms", "1s". Durations are truncated to
## the power of 10 less than the specified units.
json_timestamp_units = "1s"
```

View File

@ -66,13 +66,18 @@ func (d *MockDialer) DialContext(ctx context.Context, network string, address st
}
type MockSerializer struct {
SerializeF func(metric telegraf.Metric) ([]byte, error)
SerializeF func(metric telegraf.Metric) ([]byte, error)
SerializeBatchF func(metrics []telegraf.Metric) ([]byte, error)
}
func (s *MockSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
return s.SerializeF(metric)
}
func (s *MockSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
return s.SerializeBatchF(metrics)
}
func TestUDP_NewUDPClientNoURL(t *testing.T) {
config := &influxdb.UDPConfig{}
_, err := influxdb.NewUDPClient(config)

View File

@ -1,6 +1,7 @@
package graphite
import (
"bytes"
"fmt"
"math"
"regexp"
@ -60,6 +61,21 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
return out, nil
}
func (s *GraphiteSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var batch bytes.Buffer
for _, m := range metrics {
buf, err := s.Serialize(m)
if err != nil {
return nil, err
}
_, err = batch.Write(buf)
if err != nil {
return nil, err
}
}
return batch.Bytes(), nil
}
func formatValue(value interface{}) string {
switch v := value.(type) {
case string:

View File

@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
@ -577,3 +578,32 @@ func TestClean(t *testing.T) {
})
}
}
func TestSerializeBatch(t *testing.T) {
now := time.Unix(1234567890, 0)
tests := []struct {
name string
metric_name string
tags map[string]string
fields map[string]interface{}
expected string
}{
{
"Base metric",
"cpu",
map[string]string{"host": "localhost"},
map[string]interface{}{"usage_busy": float64(8.5)},
"localhost.cpu.usage_busy 8.5 1234567890\nlocalhost.cpu.usage_busy 8.5 1234567890\n",
},
}
s := GraphiteSerializer{}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m, err := metric.New(tt.metric_name, tt.tags, tt.fields, now)
assert.NoError(t, err)
actual, _ := s.SerializeBatch([]telegraf.Metric{m, m})
require.Equal(t, tt.expected, string(actual))
})
}
}

View File

@ -102,6 +102,17 @@ func (s *Serializer) Serialize(m telegraf.Metric) ([]byte, error) {
return out, nil
}
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var batch bytes.Buffer
for _, m := range metrics {
_, err := s.Write(&batch, m)
if err != nil {
return nil, err
}
}
return batch.Bytes(), nil
}
func (s *Serializer) Write(w io.Writer, m telegraf.Metric) (int, error) {
err := s.writeMetric(w, m)
return s.bytesWritten, err

View File

@ -447,3 +447,24 @@ func BenchmarkSerializer(b *testing.B) {
})
}
}
func TestSerialize_SerializeBatch(t *testing.T) {
m := MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
)
metrics := []telegraf.Metric{m, m}
serializer := NewSerializer()
serializer.SetFieldSortOrder(SortFields)
output, err := serializer.SerializeBatch(metrics)
require.NoError(t, err)
require.Equal(t, []byte("cpu value=42 0\ncpu value=42 0\n"), output)
}

View File

@ -1,29 +1,26 @@
package json
import (
ejson "encoding/json"
"encoding/json"
"time"
"github.com/influxdata/telegraf"
)
type JsonSerializer struct {
type serializer struct {
TimestampUnits time.Duration
}
func (s *JsonSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
m := make(map[string]interface{})
units_nanoseconds := s.TimestampUnits.Nanoseconds()
// if the units passed in were less than or equal to zero,
// then serialize the timestamp in seconds (the default)
if units_nanoseconds <= 0 {
units_nanoseconds = 1000000000
func NewSerializer(timestampUnits time.Duration) (*serializer, error) {
s := &serializer{
TimestampUnits: truncateDuration(timestampUnits),
}
m["tags"] = metric.Tags()
m["fields"] = metric.Fields()
m["name"] = metric.Name()
m["timestamp"] = metric.Time().UnixNano() / units_nanoseconds
serialized, err := ejson.Marshal(m)
return s, nil
}
func (s *serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
m := s.createObject(metric)
serialized, err := json.Marshal(m)
if err != nil {
return []byte{}, err
}
@ -31,3 +28,46 @@ func (s *JsonSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
return serialized, nil
}
func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
objects := make([]interface{}, 0, len(metrics))
for _, metric := range metrics {
m := s.createObject(metric)
objects = append(objects, m)
}
obj := map[string]interface{}{
"metrics": objects,
}
serialized, err := json.Marshal(obj)
if err != nil {
return []byte{}, err
}
return serialized, nil
}
func (s *serializer) createObject(metric telegraf.Metric) map[string]interface{} {
m := make(map[string]interface{}, 4)
m["tags"] = metric.Tags()
m["fields"] = metric.Fields()
m["name"] = metric.Name()
m["timestamp"] = metric.Time().UnixNano() / int64(s.TimestampUnits)
return m
}
func truncateDuration(units time.Duration) time.Duration {
// Default precision is 1s
if units <= 0 {
return time.Second
}
// Search for the power of ten less than the duration
d := time.Nanosecond
for {
if d*10 > units {
return d
}
d = d * 10
}
}

View File

@ -6,10 +6,19 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
func MustMetric(v telegraf.Metric, err error) telegraf.Metric {
if err != nil {
panic(err)
}
return v
}
func TestSerializeMetricFloat(t *testing.T) {
now := time.Now()
tags := map[string]string{
@ -21,7 +30,7 @@ func TestSerializeMetricFloat(t *testing.T) {
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s := JsonSerializer{}
s, _ := NewSerializer(0)
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
@ -29,6 +38,63 @@ func TestSerializeMetricFloat(t *testing.T) {
assert.Equal(t, string(expS), string(buf))
}
func TestSerialize_TimestampUnits(t *testing.T) {
tests := []struct {
name string
timestampUnits time.Duration
expected string
}{
{
name: "default of 1s",
timestampUnits: 0,
expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":1525478795}`,
},
{
name: "1ns",
timestampUnits: 1 * time.Nanosecond,
expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":1525478795123456789}`,
},
{
name: "1ms",
timestampUnits: 1 * time.Millisecond,
expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":1525478795123}`,
},
{
name: "10ms",
timestampUnits: 10 * time.Millisecond,
expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":152547879512}`,
},
{
name: "15ms is reduced to 10ms",
timestampUnits: 15 * time.Millisecond,
expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":152547879512}`,
},
{
name: "65ms is reduced to 10ms",
timestampUnits: 65 * time.Millisecond,
expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":152547879512}`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(1525478795, 123456789),
),
)
s, _ := NewSerializer(tt.timestampUnits)
actual, err := s.Serialize(m)
require.NoError(t, err)
require.Equal(t, tt.expected+"\n", string(actual))
})
}
}
func TestSerializeMetricInt(t *testing.T) {
now := time.Now()
tags := map[string]string{
@ -40,7 +106,7 @@ func TestSerializeMetricInt(t *testing.T) {
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s := JsonSerializer{}
s, _ := NewSerializer(0)
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
@ -60,7 +126,7 @@ func TestSerializeMetricString(t *testing.T) {
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s := JsonSerializer{}
s, _ := NewSerializer(0)
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
@ -81,7 +147,7 @@ func TestSerializeMultiFields(t *testing.T) {
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s := JsonSerializer{}
s, _ := NewSerializer(0)
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
@ -101,10 +167,29 @@ func TestSerializeMetricWithEscapes(t *testing.T) {
m, err := metric.New("My CPU", tags, fields, now)
assert.NoError(t, err)
s := JsonSerializer{}
s, _ := NewSerializer(0)
buf, err := s.Serialize(m)
assert.NoError(t, err)
expS := []byte(fmt.Sprintf(`{"fields":{"U,age=Idle":90},"name":"My CPU","tags":{"cpu tag":"cpu0"},"timestamp":%d}`, now.Unix()) + "\n")
assert.Equal(t, string(expS), string(buf))
}
func TestSerializeBatch(t *testing.T) {
m := MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
)
metrics := []telegraf.Metric{m, m}
s, _ := NewSerializer(0)
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
require.Equal(t, []byte(`{"metrics":[{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":0},{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":0}]}`), buf)
}

View File

@ -25,6 +25,11 @@ type Serializer interface {
// separate metrics should be separated by a newline, and there should be
// a newline at the end of the buffer.
Serialize(metric telegraf.Metric) ([]byte, error)
// SerializeBatch takes an array of telegraf metric and serializes it into
// a byte buffer. This method is not required to be suitable for use with
// line oriented framing.
SerializeBatch(metrics []telegraf.Metric) ([]byte, error)
}
// Config is a struct that covers the data types needed for all serializer types,
@ -72,7 +77,7 @@ func NewSerializer(config *Config) (Serializer, error) {
}
func NewJsonSerializer(timestampUnits time.Duration) (Serializer, error) {
return &json.JsonSerializer{TimestampUnits: timestampUnits}, nil
return json.NewSerializer(timestampUnits)
}
func NewInfluxSerializerConfig(config *Config) (Serializer, error) {