diff --git a/docs/DATA_FORMATS_OUTPUT.md b/docs/DATA_FORMATS_OUTPUT.md index 5716b0aec..d21aca91d 100644 --- a/docs/DATA_FORMATS_OUTPUT.md +++ b/docs/DATA_FORMATS_OUTPUT.md @@ -1,35 +1,15 @@ -# Telegraf Output Data Formats +# Output Data Formats -Telegraf is able to serialize metrics into the following output data formats: +In addition to output specific data formats, Telegraf supports a set of +standard data formats that may be selected from when configuring many output +plugins. 1. [InfluxDB Line Protocol](#influx) 1. [JSON](#json) 1. [Graphite](#graphite) -Telegraf metrics, like InfluxDB -[points](https://docs.influxdata.com/influxdb/latest/concepts/glossary/#point), -are a combination of four basic parts: - -1. Measurement Name -1. Tags -1. Fields -1. Timestamp - -In InfluxDB line protocol, these 4 parts are easily defined in textual form: - -``` -measurement_name[,tag1=val1,...] field1=val1[,field2=val2,...] [timestamp] -``` - -For Telegraf outputs that write textual data (such as `kafka`, `mqtt`, and `file`), -InfluxDB line protocol was originally the only available output format. But now -we are normalizing telegraf metric "serializers" into a -[plugin-like interface](https://github.com/influxdata/telegraf/tree/master/plugins/serializers) -across all output plugins that can support it. -You will be able to identify a plugin that supports different data formats -by the presence of a `data_format` -config option, for example, in the `file` output plugin: - +You will be able to identify the plugins with support by the presence of a +`data_format` config option, for example, in the `file` output plugin: ```toml [[outputs.file]] ## Files to write to, "stdout" is a specially handled file. 
@@ -40,22 +20,16 @@ config option, for example, in the `file` output plugin: ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "influx" - - ## Additional configuration options go here ``` -Each data_format has an additional set of configuration options available, which -I'll go over below. +## Influx -# Influx: - -The `influx` format outputs data as +The `influx` data format outputs metrics using [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/latest/write_protocols/line_protocol_tutorial/). -This is the recommended format to use unless another format is required for +This is the recommended format unless another format is required for interoperability. -### Influx Configuration: - +### Influx Configuration ```toml [[outputs.file]] ## Files to write to, "stdout" is a specially handled file. @@ -82,7 +56,7 @@ interoperability. # influx_uint_support = false ``` -# Graphite: +## Graphite The Graphite data format translates Telegraf metrics into _dot_ buckets. A template can be specified for the output of Telegraf metrics into Graphite @@ -115,7 +89,7 @@ tars.cpu-total.us-east-1.cpu.usage_idle 98.09 1455320690 Fields with string values will be skipped. Boolean fields will be converted to 1 (true) or 0 (false). -### Graphite Configuration: +### Graphite Configuration ```toml [[outputs.file]] @@ -134,27 +108,63 @@ to 1 (true) or 0 (false). template = "host.tags.measurement.field" ``` -# JSON: - -The JSON data format serialized Telegraf metrics in json format. 
The format is: +## JSON +The JSON data format serializes a single metric in the following +form: ```json { - "fields":{ - "field_1":30, - "field_2":4, - "field_N":59, - "n_images":660 - }, - "name":"docker", - "tags":{ - "host":"raynor" - }, - "timestamp":1458229140 + "fields": { + "field_1": 30, + "field_2": 4, + "field_N": 59, + "n_images": 660 + }, + "name": "docker", + "tags": { + "host": "raynor" + }, + "timestamp": 1458229140 } ``` -### JSON Configuration: +When an output plugin needs to emit multiple metrics at one time, it may use +the batch format. Whether the batch format is used is determined by the +plugin; refer to the documentation for the specific plugin. +```json +{ + "metrics": [ + { + "fields": { + "field_1": 30, + "field_2": 4, + "field_N": 59, + "n_images": 660 + }, + "name": "docker", + "tags": { + "host": "raynor" + }, + "timestamp": 1458229140 + }, + { + "fields": { + "field_1": 30, + "field_2": 4, + "field_N": 59, + "n_images": 660 + }, + "name": "docker", + "tags": { + "host": "raynor" + }, + "timestamp": 1458229140 + } + ] +} +``` + +### JSON Configuration ```toml [[outputs.file]] @@ -166,14 +176,9 @@ The JSON data format serialized Telegraf metrics in json format. The format is: ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "json" - json_timestamp_units = "1ns" -``` -By default, the timestamp that is output in JSON data format serialized Telegraf -metrics is in seconds. The precision of this timestamp can be adjusted for any output -by adding the optional `json_timestamp_units` parameter to the configuration for -that output. This parameter can be used to set the timestamp units to nanoseconds (`ns`), -microseconds (`us` or `µs`), milliseconds (`ms`), or seconds (`s`). 
Note that this -parameter will be truncated to the nearest power of 10 that, so if the `json_timestamp_units` -are set to `15ms` the timestamps for the JSON format serialized Telegraf metrics will be -output in hundredths of a second (`10ms`). + ## The resolution to use for the metric timestamp. Must be a duration string + ## such as "1ns", "1us", "1ms", "10ms", "1s". Durations are truncated down + ## to the nearest power of 10, e.g. "15ms" is treated as "10ms". + json_timestamp_units = "1s" +``` diff --git a/plugins/outputs/influxdb/udp_test.go b/plugins/outputs/influxdb/udp_test.go index fd05c6905..017ee0be9 100644 --- a/plugins/outputs/influxdb/udp_test.go +++ b/plugins/outputs/influxdb/udp_test.go @@ -66,13 +66,18 @@ func (d *MockDialer) DialContext(ctx context.Context, network string, address st } type MockSerializer struct { - SerializeF func(metric telegraf.Metric) ([]byte, error) + SerializeF func(metric telegraf.Metric) ([]byte, error) + SerializeBatchF func(metrics []telegraf.Metric) ([]byte, error) } func (s *MockSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { return s.SerializeF(metric) } +func (s *MockSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { + return s.SerializeBatchF(metrics) +} + func TestUDP_NewUDPClientNoURL(t *testing.T) { config := &influxdb.UDPConfig{} _, err := influxdb.NewUDPClient(config) diff --git a/plugins/serializers/graphite/graphite.go b/plugins/serializers/graphite/graphite.go index 0f7fcd8f5..c06ba67ff 100644 --- a/plugins/serializers/graphite/graphite.go +++ b/plugins/serializers/graphite/graphite.go @@ -1,6 +1,7 @@ package graphite import ( + "bytes" "fmt" "math" "regexp" @@ -60,6 +61,21 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { return out, nil } +func (s *GraphiteSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { + var batch bytes.Buffer + for _, m := range metrics { + buf, err := s.Serialize(m) + if err != nil { + return nil, err + } + _, 
err = batch.Write(buf) + if err != nil { + return nil, err + } + } + return batch.Bytes(), nil +} + func formatValue(value interface{}) string { switch v := value.(type) { case string: diff --git a/plugins/serializers/graphite/graphite_test.go b/plugins/serializers/graphite/graphite_test.go index e08f47593..85e7a6458 100644 --- a/plugins/serializers/graphite/graphite_test.go +++ b/plugins/serializers/graphite/graphite_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" ) @@ -577,3 +578,32 @@ func TestClean(t *testing.T) { }) } } + +func TestSerializeBatch(t *testing.T) { + now := time.Unix(1234567890, 0) + tests := []struct { + name string + metric_name string + tags map[string]string + fields map[string]interface{} + expected string + }{ + { + "Base metric", + "cpu", + map[string]string{"host": "localhost"}, + map[string]interface{}{"usage_busy": float64(8.5)}, + "localhost.cpu.usage_busy 8.5 1234567890\nlocalhost.cpu.usage_busy 8.5 1234567890\n", + }, + } + + s := GraphiteSerializer{} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m, err := metric.New(tt.metric_name, tt.tags, tt.fields, now) + assert.NoError(t, err) + actual, _ := s.SerializeBatch([]telegraf.Metric{m, m}) + require.Equal(t, tt.expected, string(actual)) + }) + } +} diff --git a/plugins/serializers/influx/influx.go b/plugins/serializers/influx/influx.go index 819701c2e..f052c9c93 100644 --- a/plugins/serializers/influx/influx.go +++ b/plugins/serializers/influx/influx.go @@ -102,6 +102,17 @@ func (s *Serializer) Serialize(m telegraf.Metric) ([]byte, error) { return out, nil } +func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { + var batch bytes.Buffer + for _, m := range metrics { + _, err := s.Write(&batch, m) + if err != nil { + return nil, err + } + } + return batch.Bytes(), nil +} + func (s *Serializer) Write(w 
io.Writer, m telegraf.Metric) (int, error) { err := s.writeMetric(w, m) return s.bytesWritten, err diff --git a/plugins/serializers/influx/influx_test.go b/plugins/serializers/influx/influx_test.go index dca56dcf7..74bffe5e4 100644 --- a/plugins/serializers/influx/influx_test.go +++ b/plugins/serializers/influx/influx_test.go @@ -447,3 +447,24 @@ func BenchmarkSerializer(b *testing.B) { }) } } + +func TestSerialize_SerializeBatch(t *testing.T) { + m := MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + ) + + metrics := []telegraf.Metric{m, m} + + serializer := NewSerializer() + serializer.SetFieldSortOrder(SortFields) + output, err := serializer.SerializeBatch(metrics) + require.NoError(t, err) + require.Equal(t, []byte("cpu value=42 0\ncpu value=42 0\n"), output) +} diff --git a/plugins/serializers/json/json.go b/plugins/serializers/json/json.go index f988bf40e..bfb84f9a7 100644 --- a/plugins/serializers/json/json.go +++ b/plugins/serializers/json/json.go @@ -1,29 +1,26 @@ package json import ( - ejson "encoding/json" + "encoding/json" "time" "github.com/influxdata/telegraf" ) -type JsonSerializer struct { +type serializer struct { TimestampUnits time.Duration } -func (s *JsonSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { - m := make(map[string]interface{}) - units_nanoseconds := s.TimestampUnits.Nanoseconds() - // if the units passed in were less than or equal to zero, - // then serialize the timestamp in seconds (the default) - if units_nanoseconds <= 0 { - units_nanoseconds = 1000000000 +func NewSerializer(timestampUnits time.Duration) (*serializer, error) { + s := &serializer{ + TimestampUnits: truncateDuration(timestampUnits), } - m["tags"] = metric.Tags() - m["fields"] = metric.Fields() - m["name"] = metric.Name() - m["timestamp"] = metric.Time().UnixNano() / units_nanoseconds - serialized, err := ejson.Marshal(m) + return s, nil +} + +func (s *serializer) 
// truncateDuration rounds units down to the largest power of ten, measured in
// nanoseconds, that does not exceed it (15ms -> 10ms, 1s -> 1s).
//
// A zero or negative duration selects the default precision of one second.
func truncateDuration(units time.Duration) time.Duration {
	// Default precision is 1s
	if units <= 0 {
		return time.Second
	}

	// Grow d by factors of ten while the next power still fits in units.
	// The bound is written as units/10 rather than d*10 so the comparison
	// cannot overflow int64 for very large durations (>= 1e18ns), which
	// previously wrapped around and prevented the loop from terminating
	// correctly.
	d := time.Nanosecond
	for d <= units/10 {
		d *= 10
	}
	return d
}
err) - s := JsonSerializer{} + s, _ := NewSerializer(0) var buf []byte buf, err = s.Serialize(m) assert.NoError(t, err) @@ -29,6 +38,63 @@ func TestSerializeMetricFloat(t *testing.T) { assert.Equal(t, string(expS), string(buf)) } +func TestSerialize_TimestampUnits(t *testing.T) { + tests := []struct { + name string + timestampUnits time.Duration + expected string + }{ + { + name: "default of 1s", + timestampUnits: 0, + expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":1525478795}`, + }, + { + name: "1ns", + timestampUnits: 1 * time.Nanosecond, + expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":1525478795123456789}`, + }, + { + name: "1ms", + timestampUnits: 1 * time.Millisecond, + expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":1525478795123}`, + }, + { + name: "10ms", + timestampUnits: 10 * time.Millisecond, + expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":152547879512}`, + }, + { + name: "15ms is reduced to 10ms", + timestampUnits: 15 * time.Millisecond, + expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":152547879512}`, + }, + { + name: "65ms is reduced to 10ms", + timestampUnits: 65 * time.Millisecond, + expected: `{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":152547879512}`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(1525478795, 123456789), + ), + ) + s, _ := NewSerializer(tt.timestampUnits) + actual, err := s.Serialize(m) + require.NoError(t, err) + require.Equal(t, tt.expected+"\n", string(actual)) + }) + } +} + func TestSerializeMetricInt(t *testing.T) { now := time.Now() tags := map[string]string{ @@ -40,7 +106,7 @@ func TestSerializeMetricInt(t *testing.T) { m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) - s := JsonSerializer{} + s, _ := NewSerializer(0) var buf 
[]byte buf, err = s.Serialize(m) assert.NoError(t, err) @@ -60,7 +126,7 @@ func TestSerializeMetricString(t *testing.T) { m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) - s := JsonSerializer{} + s, _ := NewSerializer(0) var buf []byte buf, err = s.Serialize(m) assert.NoError(t, err) @@ -81,7 +147,7 @@ func TestSerializeMultiFields(t *testing.T) { m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) - s := JsonSerializer{} + s, _ := NewSerializer(0) var buf []byte buf, err = s.Serialize(m) assert.NoError(t, err) @@ -101,10 +167,29 @@ func TestSerializeMetricWithEscapes(t *testing.T) { m, err := metric.New("My CPU", tags, fields, now) assert.NoError(t, err) - s := JsonSerializer{} + s, _ := NewSerializer(0) buf, err := s.Serialize(m) assert.NoError(t, err) expS := []byte(fmt.Sprintf(`{"fields":{"U,age=Idle":90},"name":"My CPU","tags":{"cpu tag":"cpu0"},"timestamp":%d}`, now.Unix()) + "\n") assert.Equal(t, string(expS), string(buf)) } + +func TestSerializeBatch(t *testing.T) { + m := MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + ) + + metrics := []telegraf.Metric{m, m} + s, _ := NewSerializer(0) + buf, err := s.SerializeBatch(metrics) + require.NoError(t, err) + require.Equal(t, []byte(`{"metrics":[{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":0},{"fields":{"value":42},"name":"cpu","tags":{},"timestamp":0}]}`), buf) +} diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index 431112b20..59050d089 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -25,6 +25,11 @@ type Serializer interface { // separate metrics should be separated by a newline, and there should be // a newline at the end of the buffer. Serialize(metric telegraf.Metric) ([]byte, error) + + // SerializeBatch takes an array of telegraf metric and serializes it into + // a byte buffer. 
This method is not required to be suitable for use with + // line oriented framing. + SerializeBatch(metrics []telegraf.Metric) ([]byte, error) } // Config is a struct that covers the data types needed for all serializer types, @@ -72,7 +77,7 @@ func NewSerializer(config *Config) (Serializer, error) { } func NewJsonSerializer(timestampUnits time.Duration) (Serializer, error) { - return &json.JsonSerializer{TimestampUnits: timestampUnits}, nil + return json.NewSerializer(timestampUnits) } func NewInfluxSerializerConfig(config *Config) (Serializer, error) {