Adds a new json_timestamp_units configuration parameter (#2587)

This commit is contained in:
tjmcs 2017-03-29 17:12:29 -07:00 committed by Daniel Nelson
parent 03ee6022f3
commit fb1c7d0154
4 changed files with 46 additions and 6 deletions

View File

@ -147,4 +147,14 @@ The JSON data format serialized Telegraf metrics in json format. The format is:
## more about them here: ## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "json" data_format = "json"
json_timestamp_units = "1ns"
``` ```
By default, the timestamp that is output in JSON data format serialized Telegraf
metrics is in seconds. The precision of this timestamp can be adjusted for any output
by adding the optional `json_timestamp_units` parameter to the configuration for
that output. This parameter can be used to set the timestamp units to nanoseconds (`ns`),
microseconds (`us` or `µs`), milliseconds (`ms`), or seconds (`s`). Note that this
parameter will be truncated to the nearest power of 10 that, so if the `json_timestamp_units`
are set to `15ms` the timestamps for the JSON format serialized Telegraf metrics will be
output in hundredths of a second (`10ms`).

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"math"
"os" "os"
"path/filepath" "path/filepath"
"regexp" "regexp"
@ -1244,7 +1245,7 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
// a serializers.Serializer object, and creates it, which can then be added onto // a serializers.Serializer object, and creates it, which can then be added onto
// an Output object. // an Output object.
func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error) { func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error) {
c := &serializers.Config{} c := &serializers.Config{TimestampUnits: time.Duration(1 * time.Second)}
if node, ok := tbl.Fields["data_format"]; ok { if node, ok := tbl.Fields["data_format"]; ok {
if kv, ok := node.(*ast.KeyValue); ok { if kv, ok := node.(*ast.KeyValue); ok {
@ -1274,9 +1275,26 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
} }
} }
if node, ok := tbl.Fields["json_timestamp_units"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
timestampVal, err := time.ParseDuration(str.Value)
if err != nil {
return nil, fmt.Errorf("Unable to parse json_timestamp_units as a duration, %s", err)
}
// now that we have a duration, truncate it to the nearest
// power of ten (just in case)
nearest_exponent := int64(math.Log10(float64(timestampVal.Nanoseconds())))
new_nanoseconds := int64(math.Pow(10.0, float64(nearest_exponent)))
c.TimestampUnits = time.Duration(new_nanoseconds)
}
}
}
delete(tbl.Fields, "data_format") delete(tbl.Fields, "data_format")
delete(tbl.Fields, "prefix") delete(tbl.Fields, "prefix")
delete(tbl.Fields, "template") delete(tbl.Fields, "template")
delete(tbl.Fields, "json_timestamp_units")
return serializers.NewSerializer(c) return serializers.NewSerializer(c)
} }

View File

@ -2,19 +2,27 @@ package json
import ( import (
ejson "encoding/json" ejson "encoding/json"
"time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
) )
type JsonSerializer struct { type JsonSerializer struct {
TimestampUnits time.Duration
} }
func (s *JsonSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { func (s *JsonSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
m := make(map[string]interface{}) m := make(map[string]interface{})
units_nanoseconds := s.TimestampUnits.Nanoseconds()
// if the units passed in were less than or equal to zero,
// then serialize the timestamp in seconds (the default)
if units_nanoseconds <= 0 {
units_nanoseconds = 1000000000
}
m["tags"] = metric.Tags() m["tags"] = metric.Tags()
m["fields"] = metric.Fields() m["fields"] = metric.Fields()
m["name"] = metric.Name() m["name"] = metric.Name()
m["timestamp"] = metric.UnixNano() / 1000000000 m["timestamp"] = metric.UnixNano() / units_nanoseconds
serialized, err := ejson.Marshal(m) serialized, err := ejson.Marshal(m)
if err != nil { if err != nil {
return []byte{}, err return []byte{}, err

View File

@ -2,6 +2,7 @@ package serializers
import ( import (
"fmt" "fmt"
"time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -29,7 +30,7 @@ type Serializer interface {
// Config is a struct that covers the data types needed for all serializer types, // Config is a struct that covers the data types needed for all serializer types,
// and can be used to instantiate _any_ of the serializers. // and can be used to instantiate _any_ of the serializers.
type Config struct { type Config struct {
// Dataformat can be one of: influx, graphite // Dataformat can be one of: influx, graphite, or json
DataFormat string DataFormat string
// Prefix to add to all measurements, only supports Graphite // Prefix to add to all measurements, only supports Graphite
@ -38,6 +39,9 @@ type Config struct {
// Template for converting telegraf metrics into Graphite // Template for converting telegraf metrics into Graphite
// only supports Graphite // only supports Graphite
Template string Template string
// Timestamp units to use for JSON formatted output
TimestampUnits time.Duration
} }
// NewSerializer a Serializer interface based on the given config. // NewSerializer a Serializer interface based on the given config.
@ -50,15 +54,15 @@ func NewSerializer(config *Config) (Serializer, error) {
case "graphite": case "graphite":
serializer, err = NewGraphiteSerializer(config.Prefix, config.Template) serializer, err = NewGraphiteSerializer(config.Prefix, config.Template)
case "json": case "json":
serializer, err = NewJsonSerializer() serializer, err = NewJsonSerializer(config.TimestampUnits)
default: default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat) err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
} }
return serializer, err return serializer, err
} }
func NewJsonSerializer() (Serializer, error) { func NewJsonSerializer(timestampUnits time.Duration) (Serializer, error) {
return &json.JsonSerializer{}, nil return &json.JsonSerializer{TimestampUnits: timestampUnits}, nil
} }
func NewInfluxSerializer() (Serializer, error) { func NewInfluxSerializer() (Serializer, error) {