From 2fe26223278f841b4184449929587251a6572e21 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Thu, 29 Mar 2018 13:31:43 -0700 Subject: [PATCH] Add influx uint support as a runtime option (#3948) --- Makefile | 5 +-- docs/DATA_FORMATS_OUTPUT.md | 28 +++++++++--- internal/config/config.go | 13 ++++++ metric/metric.go | 31 ++----------- plugins/outputs/influxdb/README.md | 6 +++ plugins/outputs/influxdb/http.go | 3 +- plugins/outputs/influxdb/influxdb.go | 10 +++++ plugins/parsers/influx/handler.go | 10 ++--- plugins/parsers/influx/parser_test.go | 52 ++++++++++++++++------ plugins/serializers/influx/influx.go | 31 ++++++++++--- plugins/serializers/influx/influx_test.go | 53 ++++++++++++++--------- plugins/serializers/registry.go | 10 +++++ 12 files changed, 168 insertions(+), 84 deletions(-) diff --git a/Makefile b/Makefile index ffa3be0bf..b9832fb6a 100644 --- a/Makefile +++ b/Makefile @@ -36,7 +36,7 @@ deps: gdm restore telegraf: - go build -i -o $(TELEGRAF) -ldflags "$(LDFLAGS)" $(BUILDFLAGS) ./cmd/telegraf/telegraf.go + go build -i -o $(TELEGRAF) -ldflags "$(LDFLAGS)" ./cmd/telegraf/telegraf.go go-install: go install -ldflags "-w -s $(LDFLAGS)" ./cmd/telegraf @@ -62,9 +62,6 @@ fmtcheck: fi @echo '[INFO] done.' -uint64: - BUILDFLAGS="-tags uint64" $(MAKE) all - lint: golint ./... diff --git a/docs/DATA_FORMATS_OUTPUT.md b/docs/DATA_FORMATS_OUTPUT.md index 136780381..5716b0aec 100644 --- a/docs/DATA_FORMATS_OUTPUT.md +++ b/docs/DATA_FORMATS_OUTPUT.md @@ -2,12 +2,12 @@ Telegraf is able to serialize metrics into the following output data formats: -1. [InfluxDB Line Protocol](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#influx) -1. [JSON](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#json) -1. [Graphite](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite) +1. [InfluxDB Line Protocol](#influx) +1. [JSON](#json) +1. [Graphite](#graphite) Telegraf metrics, like InfluxDB -[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/), +[points](https://docs.influxdata.com/influxdb/latest/concepts/glossary/#point), are a combination of four basic parts: 1. Measurement Name @@ -49,8 +49,10 @@ I'll go over below. # Influx: -There are no additional configuration options for InfluxDB line-protocol. The -metrics are serialized directly into InfluxDB line-protocol. +The `influx` format outputs data as +[InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/latest/write_protocols/line_protocol_tutorial/). +This is the recommended format to use unless another format is required for +interoperability. ### Influx Configuration: @@ -64,6 +66,20 @@ metrics are serialized directly into InfluxDB line-protocol. ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "influx" + + ## Maximum line length in bytes. Useful only for debugging. + # influx_max_line_bytes = 0 + + ## When true, fields will be output in ascending lexical order. Enabling + ## this option will result in decreased performance and is only recommended + ## when you need predictable ordering while debugging. + # influx_sort_fields = false + + ## When true, Telegraf will output unsigned integers as unsigned values, + ## i.e.: `42u`. You will need a version of InfluxDB supporting unsigned + ## integer values. Enabling this option will result in field type errors if + ## existing data has been written. + # influx_uint_support = false ``` # Graphite: diff --git a/internal/config/config.go b/internal/config/config.go index db1e1f82b..fe8cac5ae 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1391,6 +1391,18 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error } } + if node, ok := tbl.Fields["influx_uint_support"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if b, ok := kv.Value.(*ast.Boolean); ok { + var err error + c.InfluxUintSupport, err = b.Boolean() + if err != nil { + return nil, err + } + } + } + } + if node, ok := tbl.Fields["json_timestamp_units"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { @@ -1409,6 +1421,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error delete(tbl.Fields, "influx_max_line_bytes") delete(tbl.Fields, "influx_sort_fields") + delete(tbl.Fields, "influx_uint_support") delete(tbl.Fields, "data_format") delete(tbl.Fields, "prefix") delete(tbl.Fields, "template") diff --git a/metric/metric.go b/metric/metric.go index 7c8ca1134..23c48697f 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -9,18 +9,6 @@ import ( "github.com/influxdata/telegraf" ) -const MaxInt = int(^uint(0) >> 1) - -// enableUint64Support will enable uint64 support if set to true. -var enableUint64Support = false - -// EnableUintSupport manually enables uint support for convertValue. -// This function will be removed in the future and only exists for unit tests during the -// transition. -func EnableUintSupport() { - enableUint64Support = true -} - type metric struct { name string tags []*telegraf.Tag @@ -269,19 +257,8 @@ func convertField(v interface{}) interface{} { case int: return int64(v) case uint: - if v <= uint(MaxInt) { - return int64(v) - } else { - return int64(MaxInt) - } + return uint64(v) case uint64: - if enableUint64Support == false { - if v <= uint64(MaxInt) { - return int64(v) - } else { - return int64(MaxInt) - } - } return uint64(v) case []byte: return string(v) @@ -292,11 +269,11 @@ func convertField(v interface{}) interface{} { case int8: return int64(v) case uint32: - return int64(v) + return uint64(v) case uint16: - return int64(v) + return uint64(v) case uint8: - return int64(v) + return uint64(v) case float32: return float64(v) default: diff --git a/plugins/outputs/influxdb/README.md b/plugins/outputs/influxdb/README.md index aafe83da7..bac25cf53 100644 --- a/plugins/outputs/influxdb/README.md +++ b/plugins/outputs/influxdb/README.md @@ -59,4 +59,10 @@ This InfluxDB output plugin writes metrics to the [InfluxDB](https://github.com/ ## HTTP Content-Encoding for write request body, can be set to "gzip" to ## compress body or "identity" to apply no encoding. # content_encoding = "identity" + + ## When true, Telegraf will output unsigned integers as unsigned values, + ## i.e.: "42u". You will need a version of InfluxDB supporting unsigned + ## integer values. Enabling this option will result in field type errors if + ## existing data has been written. + # influx_uint_support = false ``` diff --git a/plugins/outputs/influxdb/http.go b/plugins/outputs/influxdb/http.go index fdbb5bd8c..f32054f40 100644 --- a/plugins/outputs/influxdb/http.go +++ b/plugins/outputs/influxdb/http.go @@ -102,7 +102,8 @@ type HTTPConfig struct { RetentionPolicy string Consistency string - Serializer *influx.Serializer + InfluxUintSupport bool `toml:"influx_uint_support"` + Serializer *influx.Serializer } type httpClient struct { diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 2e6e0d9c6..3d1a895a2 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -45,6 +45,7 @@ type InfluxDB struct { HTTPHeaders map[string]string `toml:"http_headers"` ContentEncoding string `toml:"content_encoding"` SkipDatabaseCreation bool `toml:"skip_database_creation"` + InfluxUintSupport bool `toml:"influx_uint_support"` // Path to CA file SSLCA string `toml:"ssl_ca"` @@ -119,6 +120,12 @@ var sampleConfig = ` ## HTTP Content-Encoding for write request body, can be set to "gzip" to ## compress body or "identity" to apply no encoding. # content_encoding = "identity" + + ## When true, Telegraf will output unsigned integers as unsigned values, + ## i.e.: "42u". You will need a version of InfluxDB supporting unsigned + ## integer values. Enabling this option will result in field type errors if + ## existing data has been written. + # influx_uint_support = false ` func (i *InfluxDB) Connect() error { @@ -135,6 +142,9 @@ func (i *InfluxDB) Connect() error { } i.serializer = influx.NewSerializer() + if i.InfluxUintSupport { + i.serializer.SetFieldTypeSupport(influx.UintSupport) + } for _, u := range urls { u, err := url.Parse(u) diff --git a/plugins/parsers/influx/handler.go b/plugins/parsers/influx/handler.go index 8c17cf9cf..a75883797 100644 --- a/plugins/parsers/influx/handler.go +++ b/plugins/parsers/influx/handler.go @@ -48,7 +48,7 @@ func (h *MetricHandler) AddInt(key []byte, value []byte) { fk := unescape(key) fv, err := parseIntBytes(bytes.TrimSuffix(value, []byte("i")), 10, 64) if err != nil { - log.Errorf("E! Received unparseable int value: %q", value) + log.Errorf("E! Received unparseable int value: %q: %v", value, err) return } h.builder.AddField(fk, fv) @@ -58,7 +58,7 @@ func (h *MetricHandler) AddUint(key []byte, value []byte) { fk := unescape(key) fv, err := parseUintBytes(bytes.TrimSuffix(value, []byte("u")), 10, 64) if err != nil { - log.Errorf("E! Received unparseable uint value: %q", value) + log.Errorf("E! Received unparseable uint value: %q: %v", value, err) return } h.builder.AddField(fk, fv) @@ -68,7 +68,7 @@ func (h *MetricHandler) AddFloat(key []byte, value []byte) { fk := unescape(key) fv, err := parseFloatBytes(value, 64) if err != nil { - log.Errorf("E! Received unparseable float value: %q", value) + log.Errorf("E! Received unparseable float value: %q: %v", value, err) return } h.builder.AddField(fk, fv) @@ -84,7 +84,7 @@ func (h *MetricHandler) AddBool(key []byte, value []byte) { fk := unescape(key) fv, err := parseBoolBytes(value) if err != nil { - log.Errorf("E! Received unparseable boolean value: %q", value) + log.Errorf("E! Received unparseable boolean value: %q: %v", value, err) return } h.builder.AddField(fk, fv) @@ -93,7 +93,7 @@ func (h *MetricHandler) AddBool(key []byte, value []byte) { func (h *MetricHandler) SetTimestamp(tm []byte) { v, err := parseIntBytes(tm, 10, 64) if err != nil { - log.Errorf("E! Received unparseable timestamp: %q", tm) + log.Errorf("E! Received unparseable timestamp: %q: %v", tm, err) return } ns := v * int64(h.precision) diff --git a/plugins/parsers/influx/parser_test.go b/plugins/parsers/influx/parser_test.go index b57eacfa0..59d4d7f16 100644 --- a/plugins/parsers/influx/parser_test.go +++ b/plugins/parsers/influx/parser_test.go @@ -16,12 +16,6 @@ func Metric(v telegraf.Metric, err error) telegraf.Metric { return v } -const ( - Uint64Overflow uint64 = 9223372036854775808 - Uint64Max uint64 = 18446744073709551615 - Uint64Test uint64 = 42 -) - var DefaultTime = func() time.Time { return time.Unix(42, 0) } @@ -262,6 +256,38 @@ var ptests = []struct { }, err: nil, }, + { + name: "field int overflow dropped", + input: []byte("cpu value=9223372036854775808i"), + metrics: []telegraf.Metric{ + Metric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(42, 0), + ), + ), + }, + err: nil, + }, + { + name: "field int max value", + input: []byte("cpu value=9223372036854775807i"), + metrics: []telegraf.Metric{ + Metric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 9223372036854775807, + }, + time.Unix(42, 0), + ), + ), + }, + err: nil, + }, { name: "field uint", input: []byte("cpu value=42u"), @@ -271,7 +297,7 @@ var ptests = []struct { "cpu", map[string]string{}, map[string]interface{}{ - "value": Uint64Test, + "value": uint64(42), }, time.Unix(42, 0), ), @@ -280,16 +306,14 @@ var ptests = []struct { err: nil, }, { - name: "field uint int overflow", - input: []byte("cpu value=9223372036854775808u"), + name: "field uint overflow dropped", + input: []byte("cpu value=18446744073709551616u"), metrics: []telegraf.Metric{ Metric( metric.New( "cpu", map[string]string{}, - map[string]interface{}{ - "value": Uint64Overflow, - }, + map[string]interface{}{}, time.Unix(42, 0), ), ), @@ -297,7 +321,7 @@ var ptests = []struct { err: nil, }, { - name: "field uint maximum", + name: "field uint max value", input: []byte("cpu value=18446744073709551615u"), metrics: []telegraf.Metric{ Metric( @@ -305,7 +329,7 @@ var ptests = []struct { "cpu", map[string]string{}, map[string]interface{}{ - "value": Uint64Max, + "value": uint64(18446744073709551615), }, time.Unix(42, 0), ), diff --git a/plugins/serializers/influx/influx.go b/plugins/serializers/influx/influx.go index 321380013..7d68ed3df 100644 --- a/plugins/serializers/influx/influx.go +++ b/plugins/serializers/influx/influx.go @@ -20,6 +20,12 @@ const ( SortFields ) +type FieldTypeSupport int + +const ( + UintSupport FieldTypeSupport = 1 << iota +) + var ( ErrNeedMoreSpace = errors.New("need more space") ErrInvalidName = errors.New("invalid name") @@ -32,9 +38,10 @@ var ( // Serializer is a serializer for line protocol. type Serializer struct { - maxLineBytes int - bytesWritten int - fieldSortOrder FieldSortOrder + maxLineBytes int + bytesWritten int + fieldSortOrder FieldSortOrder + fieldTypeSupport FieldTypeSupport buf bytes.Buffer header []byte @@ -61,6 +68,10 @@ func (s *Serializer) SetFieldSortOrder(order FieldSortOrder) { s.fieldSortOrder = order } +func (s *Serializer) SetFieldTypeSupport(typeSupport FieldTypeSupport) { + s.fieldTypeSupport = typeSupport +} + // Serialize writes the telegraf.Metric to a byte slice. May produce multiple // lines of output if longer than maximum line length. Lines are terminated // with a newline (LF) char. @@ -142,7 +153,7 @@ func (s *Serializer) buildFieldPair(key string, value interface{}) error { s.pair = append(s.pair, key...) s.pair = append(s.pair, '=') - pair, err := appendFieldValue(s.pair, value) + pair, err := s.appendFieldValue(s.pair, value) if err != nil { return err } @@ -235,10 +246,18 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error { } -func appendFieldValue(buf []byte, value interface{}) ([]byte, error) { +func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, error) { switch v := value.(type) { case uint64: - return appendUintField(buf, v), nil + if s.fieldTypeSupport&UintSupport != 0 { + return appendUintField(buf, v), nil + } else { + if v <= uint64(MaxInt) { + return appendIntField(buf, int64(v)), nil + } else { + return appendIntField(buf, int64(MaxInt)), nil + } + } case int64: return appendIntField(buf, v), nil case float64: diff --git a/plugins/serializers/influx/influx_test.go b/plugins/serializers/influx/influx_test.go index eaf55a9e2..e31516462 100644 --- a/plugins/serializers/influx/influx_test.go +++ b/plugins/serializers/influx/influx_test.go @@ -11,26 +11,19 @@ import ( ) func MustMetric(v telegraf.Metric, err error) telegraf.Metric { - // Force uint support to be enabled for testing. - metric.EnableUintSupport() if err != nil { panic(err) } return v } -const ( - Uint64Overflow uint64 = 9223372036854775808 - Uint64Max uint64 = 18446744073709551615 - Uint64Test uint64 = 42 -) - var tests = []struct { - name string - maxBytes int - input telegraf.Metric - output []byte - err error + name string + maxBytes int + typeSupport FieldTypeSupport + input telegraf.Metric + output []byte + err error }{ { name: "minimal", @@ -143,40 +136,56 @@ var tests = []struct { "cpu", map[string]string{}, map[string]interface{}{ - "value": Uint64Test, + "value": uint64(42), }, time.Unix(0, 0), ), ), - output: []byte("cpu value=42u 0\n"), + output: []byte("cpu value=42u 0\n"), + typeSupport: UintSupport, }, { - name: "uint field int64 overflow", + name: "uint field max value", input: MustMetric( metric.New( "cpu", map[string]string{}, map[string]interface{}{ - "value": Uint64Overflow, + "value": uint64(18446744073709551615), }, time.Unix(0, 0), ), ), - output: []byte("cpu value=9223372036854775808u 0\n"), + output: []byte("cpu value=18446744073709551615u 0\n"), + typeSupport: UintSupport, }, { - name: "uint field uint64 max", + name: "uint field no uint support", input: MustMetric( metric.New( "cpu", map[string]string{}, map[string]interface{}{ - "value": Uint64Max, + "value": uint64(42), }, time.Unix(0, 0), ), ), - output: []byte("cpu value=18446744073709551615u 0\n"), + output: []byte("cpu value=42i 0\n"), + }, + { + name: "uint field no uint support overflow", + input: MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": uint64(18446744073709551615), + }, + time.Unix(0, 0), + ), + ), + output: []byte("cpu value=9223372036854775807i 0\n"), }, { name: "bool field", @@ -358,6 +367,7 @@ func TestSerializer(t *testing.T) { serializer := NewSerializer() serializer.SetMaxLineBytes(tt.maxBytes) serializer.SetFieldSortOrder(SortFields) + serializer.SetFieldTypeSupport(tt.typeSupport) output, err := serializer.Serialize(tt.input) require.Equal(t, tt.err, err) require.Equal(t, string(tt.output), string(output)) @@ -370,6 +380,7 @@ func BenchmarkSerializer(b *testing.B) { b.Run(tt.name, func(b *testing.B) { serializer := NewSerializer() serializer.SetMaxLineBytes(tt.maxBytes) + serializer.SetFieldTypeSupport(tt.typeSupport) for n := 0; n < b.N; n++ { output, err := serializer.Serialize(tt.input) _ = err diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index 3389ec59c..431112b20 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -40,6 +40,9 @@ type Config struct { // than unsorted fields; influx format only InfluxSortFields bool + // Support unsigned integer output; influx format only + InfluxUintSupport bool + // Prefix to add to all measurements, only supports Graphite Prefix string @@ -77,9 +80,16 @@ func NewInfluxSerializerConfig(config *Config) (Serializer, error) { if config.InfluxSortFields { sort = influx.SortFields } + + var typeSupport influx.FieldTypeSupport + if config.InfluxUintSupport { + typeSupport = typeSupport + influx.UintSupport + } + s := influx.NewSerializer() s.SetMaxLineBytes(config.InfluxMaxLineBytes) s.SetFieldSortOrder(sort) + s.SetFieldTypeSupport(typeSupport) return s, nil }