From 7a1a91d6fefa7e938bc08877331df17e54d71ec7 Mon Sep 17 00:00:00 2001 From: Tom McSweeney Date: Thu, 8 Dec 2016 13:46:48 +1100 Subject: [PATCH] Adds an 'output_precision' configuration parameter --- docs/CONFIGURATION.md | 2 +- etc/telegraf.conf | 2 +- etc/telegraf_windows.conf | 2 +- internal/config/config.go | 2 +- plugins/outputs/amqp/amqp.go | 14 +++++++++++++- plugins/outputs/file/file.go | 14 +++++++++++++- plugins/outputs/kafka/kafka.go | 14 +++++++++++++- plugins/outputs/mqtt/mqtt.go | 14 +++++++++++++- plugins/outputs/nats/nats.go | 13 ++++++++++++- plugins/outputs/nsq/nsq.go | 18 +++++++++++++++--- plugins/serializers/graphite/graphite.go | 2 +- plugins/serializers/influx/influx.go | 2 +- plugins/serializers/json/json.go | 24 ++++++++++++++++++++++-- plugins/serializers/registry.go | 2 +- 14 files changed, 108 insertions(+), 17 deletions(-) diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 9b2eb99d8..609c770a1 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -59,7 +59,7 @@ ie, a jitter of 5s and flush_interval 10s means flushes will happen every 10-15s * **precision**: By default, precision will be set to the same timestamp order as the collection interval, with the maximum being 1s. Precision will NOT be used for service inputs, such as logparser and statsd. Valid values are -"ns", "us" (or "µs"), "ms", "s". +"1ns", "1us" (or "1µs"), "1ms", "1s". * **logfile**: Specify the log file name. The empty string means to log to stdout. * **debug**: Run telegraf in debug mode. * **quiet**: Run telegraf in quiet mode (error messages only). diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 8ebf0a7a7..57e1418b9 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -58,7 +58,7 @@ ## By default, precision will be set to the same timestamp order as the ## collection interval, with the maximum being 1s. ## Precision will NOT be used for service inputs, such as logparser and statsd. - ## Valid values are "ns", "us" (or "µs"), "ms", "s". + ## Valid values are "1ns", "1us" (or "1µs"), "1ms", "1s". precision = "" ## Logging configuration: diff --git a/etc/telegraf_windows.conf b/etc/telegraf_windows.conf index ca0357ef3..55385560b 100644 --- a/etc/telegraf_windows.conf +++ b/etc/telegraf_windows.conf @@ -67,7 +67,7 @@ urls = ["http://localhost:8086"] # required # The target database for metrics (telegraf will create it if not exists) database = "telegraf" # required - # Precision of writes, valid values are "ns", "us" (or "µs"), "ms", "s", "m", "h". + # Precision of writes, valid values are "1ns", "1us" (or "1µs"), "1ms", "1s", "1m", "1h". # note: using second precision greatly helps InfluxDB compression precision = "s" diff --git a/internal/config/config.go b/internal/config/config.go index 24dec4169..ea16a77a3 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -233,7 +233,7 @@ var header = `# Telegraf Configuration ## By default, precision will be set to the same timestamp order as the ## collection interval, with the maximum being 1s. ## Precision will NOT be used for service inputs, such as logparser and statsd. - ## Valid values are "ns", "us" (or "µs"), "ms", "s". + ## Valid values are "1ns", "1us" (or "1µs"), "1ms", "1s". precision = "" ## Logging configuration: diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index d86cac596..c78485d75 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -30,6 +30,9 @@ type AMQP struct { RetentionPolicy string // InfluxDB precision (DEPRECATED) Precision string + // OutputPrecision string (parsed as a duration, + // only used for JSON output) + OutputPrecision string // Path to CA file SSLCA string `toml:"ssl_ca"` @@ -78,6 +81,15 @@ var sampleConfig = ` ## InfluxDB database # database = "telegraf" + ## The output_precision parameter can be used to specify the units that should + ## be used when creating timestamps and is only used when the data_format is + ## set to "json"; in that case valid values are "1ns", "1us" (or "1µs"), "1ms", + ## or "1s"; for the other supported data_format types, the precision will depend + ## on the data_format (seconds for "graphite" data, nanoseconds for "influx" + ## data); if unspecified, then the timestamps output with "json" data + ## will be in seconds + output_precision = "" + ## Optional SSL Config # ssl_ca = "/etc/telegraf/ca.pem" # ssl_cert = "/etc/telegraf/cert.pem" @@ -187,7 +199,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error { } } - buf, err := q.serializer.Serialize(metric) + buf, err := q.serializer.Serialize(metric, q.OutputPrecision) if err != nil { return err } diff --git a/plugins/outputs/file/file.go b/plugins/outputs/file/file.go index e05d0fe83..d8a7c58fd 100644 --- a/plugins/outputs/file/file.go +++ b/plugins/outputs/file/file.go @@ -12,6 +12,9 @@ import ( type File struct { Files []string + // OutputPrecision string (parsed as a duration, + // only used for JSON output) + OutputPrecision string writer io.Writer closers []io.Closer @@ -23,6 +26,15 @@ var sampleConfig = ` ## Files to write to, "stdout" is a specially handled file. files = ["stdout", "/tmp/metrics.out"] + ## The output_precision parameter can be used to specify the units that should + ## be used when creating timestamps and is only used when the data_format is + ## set to "json"; in that case valid values are "1ns", "1us" (or "1µs"), "1ms", + ## or "1s"; for the other supported data_format types, the precision will depend + ## on the data_format (seconds for "graphite" data, nanoseconds for "influx" + ## data); if unspecified, then the timestamps output with "json" data + ## will be in seconds + output_precision = "" + ## Data format to output. ## Each data format has it's own unique set of configuration options, read ## more about them here: @@ -92,7 +104,7 @@ func (f *File) Write(metrics []telegraf.Metric) error { } for _, metric := range metrics { - b, err := f.serializer.Serialize(metric) + b, err := f.serializer.Serialize(metric, f.OutputPrecision) if err != nil { return fmt.Errorf("failed to serialize message: %s", err) } diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 0bec92812..646648d48 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -25,6 +25,9 @@ type Kafka struct { RequiredAcks int // MaxRetry Tag MaxRetry int + // OutputPrecision string (parsed as a duration, + // only used for JSON output) + OutputPrecision string // Legacy SSL config options // TLS client certificate @@ -85,6 +88,15 @@ var sampleConfig = ` ## The total number of times to retry sending a message max_retry = 3 + ## The output_precision parameter can be used to specify the units that should + ## be used when creating timestamps and is only used when the data_format is + ## set to "json"; in that case valid values are "1ns", "1us" (or "1µs"), "1ms", + ## or "1s"; for the other supported data_format types, the precision will depend + ## on the data_format (seconds for "graphite" data, nanoseconds for "influx" + ## data); if unspecified, then the timestamps output with "json" data + ## will be in seconds + output_precision = "" + ## Optional SSL Config # ssl_ca = "/etc/telegraf/ca.pem" # ssl_cert = "/etc/telegraf/cert.pem" @@ -154,7 +166,7 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { } for _, metric := range metrics { - buf, err := k.serializer.Serialize(metric) + buf, err := k.serializer.Serialize(metric, k.OutputPrecision) if err != nil { return err } diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index 45f2c91c8..5cca994bf 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -25,6 +25,15 @@ var sampleConfig = ` # username = "telegraf" # password = "metricsmetricsmetricsmetrics" + ## The output_precision parameter can be used to specify the units that should + ## be used when creating timestamps and is only used when the data_format is + ## set to "json"; in that case valid values are "1ns", "1us" (or "1µs"), "1ms", + ## or "1s"; for the other supported data_format types, the precision will depend + ## on the data_format (seconds for "graphite" data, nanoseconds for "influx" + ## data); if unspecified, then the timestamps output with "json" data + ## will be in seconds + output_precision = "" + ## Optional SSL Config # ssl_ca = "/etc/telegraf/ca.pem" # ssl_cert = "/etc/telegraf/cert.pem" @@ -47,6 +56,9 @@ type MQTT struct { Timeout internal.Duration TopicPrefix string QoS int `toml:"qos"` + // OutputPrecision string (parsed as a duration, + // only used for JSON output) + OutputPrecision string // Path to CA file SSLCA string `toml:"ssl_ca"` @@ -128,7 +140,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error { t = append(t, metric.Name()) topic := strings.Join(t, "/") - buf, err := m.serializer.Serialize(metric) + buf, err := m.serializer.Serialize(metric, m.OutputPrecision) if err != nil { return fmt.Errorf("MQTT Could not serialize metric: %s", metric.String()) diff --git a/plugins/outputs/nats/nats.go b/plugins/outputs/nats/nats.go index e65e799cf..aeaaee6fc 100644 --- a/plugins/outputs/nats/nats.go +++ b/plugins/outputs/nats/nats.go @@ -19,6 +19,9 @@ type NATS struct { Password string // NATS subject to publish metrics to Subject string + // OutputPrecision string (parsed as a duration, + // only used for JSON output) + OutputPrecision string // Path to CA file SSLCA string `toml:"ssl_ca"` @@ -41,6 +44,14 @@ var sampleConfig = ` # password = "" ## NATS subject for producer messages subject = "telegraf" + ## The output_precision parameter can be used to specify the units that should + ## be used when creating timestamps and is only used when the data_format is + ## set to "json"; in that case valid values are "1ns", "1us" (or "1µs"), "1ms", + ## or "1s"; for the other supported data_format types, the precision will depend + ## on the data_format (seconds for "graphite" data, nanoseconds for "influx" + ## data); if unspecified, then the timestamps output with "json" data + ## will be in seconds + output_precision = "" ## Optional SSL Config # ssl_ca = "/etc/telegraf/ca.pem" @@ -115,7 +126,7 @@ func (n *NATS) Write(metrics []telegraf.Metric) error { } for _, metric := range metrics { - buf, err := n.serializer.Serialize(metric) + buf, err := n.serializer.Serialize(metric, n.OutputPrecision) if err != nil { return err } diff --git a/plugins/outputs/nsq/nsq.go b/plugins/outputs/nsq/nsq.go index bd1705c10..29c4d9c5a 100644 --- a/plugins/outputs/nsq/nsq.go +++ b/plugins/outputs/nsq/nsq.go @@ -11,8 +11,12 @@ import ( ) type NSQ struct { - Server string - Topic string + Server string + Topic string + // OutputPrecision string (parsed as a duration, + // only used for JSON output) + OutputPrecision string + producer *nsq.Producer serializer serializers.Serializer @@ -23,6 +27,14 @@ var sampleConfig = ` server = "localhost:4150" ## NSQ topic for producer messages topic = "telegraf" + ## The output_precision parameter can be used to specify the units that should + ## be used when creating timestamps and is only used when the data_format is + ## set to "json"; in that case valid values are "1ns", "1us" (or "1µs"), "1ms", + ## or "1s"; for the other supported data_format types, the precision will depend + ## on the data_format (seconds for "graphite" data, nanoseconds for "influx" + ## data); if unspecified, then the timestamps output with "json" data + ## will be in seconds + output_precision = "" ## Data format to output. ## Each data format has it's own unique set of configuration options, read @@ -66,7 +78,7 @@ func (n *NSQ) Write(metrics []telegraf.Metric) error { } for _, metric := range metrics { - buf, err := n.serializer.Serialize(metric) + buf, err := n.serializer.Serialize(metric, n.OutputPrecision) if err != nil { return err } diff --git a/plugins/serializers/graphite/graphite.go b/plugins/serializers/graphite/graphite.go index 57ddea76f..1d8a41d7b 100644 --- a/plugins/serializers/graphite/graphite.go +++ b/plugins/serializers/graphite/graphite.go @@ -20,7 +20,7 @@ type GraphiteSerializer struct { Template string } -func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { +func (s *GraphiteSerializer) Serialize(metric telegraf.Metric, output_precision ...string) ([]byte, error) { out := []byte{} // Convert UnixNano to Unix timestamps diff --git a/plugins/serializers/influx/influx.go b/plugins/serializers/influx/influx.go index 459a8a74b..cd2c567eb 100644 --- a/plugins/serializers/influx/influx.go +++ b/plugins/serializers/influx/influx.go @@ -7,6 +7,6 @@ import ( type InfluxSerializer struct { } -func (s *InfluxSerializer) Serialize(m telegraf.Metric) ([]byte, error) { +func (s *InfluxSerializer) Serialize(m telegraf.Metric, output_precision ...string) ([]byte, error) { return m.Serialize(), nil } diff --git a/plugins/serializers/json/json.go b/plugins/serializers/json/json.go index 3e259fafd..8f02b7f85 100644 --- a/plugins/serializers/json/json.go +++ b/plugins/serializers/json/json.go @@ -2,6 +2,7 @@ package json import ( ejson "encoding/json" + "time" "github.com/influxdata/telegraf" ) @@ -9,12 +10,31 @@ import ( type JsonSerializer struct { } -func (s *JsonSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { +func (s *JsonSerializer) Serialize(metric telegraf.Metric, output_precision ...string) ([]byte, error) { + // if no duration is specified, or if the output_precision value passed + // in represents a duration that is not greater than zero, then default + // to an output resolution for our timestamp values of 1 second (timestamps + // will be returned to the nearest second) + default_val := "1s" + var output_resolution time.Duration + if len(output_precision) > 0 && len(output_precision[0]) > 0 { + parsed_val, err := time.ParseDuration(output_precision[0]) + if err != nil { + return nil, err + } + if parsed_val > 0.0 { + output_resolution = parsed_val + } else { + output_resolution, _ = time.ParseDuration(default_val) + } + } else { + output_resolution, _ = time.ParseDuration(default_val) + } m := make(map[string]interface{}) m["tags"] = metric.Tags() m["fields"] = metric.Fields() m["name"] = metric.Name() - m["timestamp"] = metric.UnixNano() / 1000000000 + m["timestamp"] = metric.UnixNano() / output_resolution.Nanoseconds() serialized, err := ejson.Marshal(m) if err != nil { return []byte{}, err diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index 83be4900b..e49ccd102 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -21,7 +21,7 @@ type Serializer interface { // Serialize takes a single telegraf metric and turns it into a byte buffer. // separate metrics should be separated by a newline, and there should be // a newline at the end of the buffer. - Serialize(metric telegraf.Metric) ([]byte, error) + Serialize(metric telegraf.Metric, output_precision ...string) ([]byte, error) } // Config is a struct that covers the data types needed for all serializer types,