Add influx uint support as a runtime option (#3948)
This commit is contained in:
@@ -59,4 +59,10 @@ This InfluxDB output plugin writes metrics to the [InfluxDB](https://github.com/
|
||||
## HTTP Content-Encoding for write request body, can be set to "gzip" to
|
||||
## compress body or "identity" to apply no encoding.
|
||||
# content_encoding = "identity"
|
||||
|
||||
## When true, Telegraf will output unsigned integers as unsigned values,
|
||||
## i.e.: "42u". You will need a version of InfluxDB supporting unsigned
|
||||
## integer values. Enabling this option will result in field type errors if
|
||||
## existing data has been written.
|
||||
# influx_uint_support = false
|
||||
```
|
||||
|
||||
@@ -102,7 +102,8 @@ type HTTPConfig struct {
|
||||
RetentionPolicy string
|
||||
Consistency string
|
||||
|
||||
Serializer *influx.Serializer
|
||||
InfluxUintSupport bool `toml:"influx_uint_support"`
|
||||
Serializer *influx.Serializer
|
||||
}
|
||||
|
||||
type httpClient struct {
|
||||
|
||||
@@ -45,6 +45,7 @@ type InfluxDB struct {
|
||||
HTTPHeaders map[string]string `toml:"http_headers"`
|
||||
ContentEncoding string `toml:"content_encoding"`
|
||||
SkipDatabaseCreation bool `toml:"skip_database_creation"`
|
||||
InfluxUintSupport bool `toml:"influx_uint_support"`
|
||||
|
||||
// Path to CA file
|
||||
SSLCA string `toml:"ssl_ca"`
|
||||
@@ -119,6 +120,12 @@ var sampleConfig = `
|
||||
## HTTP Content-Encoding for write request body, can be set to "gzip" to
|
||||
## compress body or "identity" to apply no encoding.
|
||||
# content_encoding = "identity"
|
||||
|
||||
## When true, Telegraf will output unsigned integers as unsigned values,
|
||||
## i.e.: "42u". You will need a version of InfluxDB supporting unsigned
|
||||
## integer values. Enabling this option will result in field type errors if
|
||||
## existing data has been written.
|
||||
# influx_uint_support = false
|
||||
`
|
||||
|
||||
func (i *InfluxDB) Connect() error {
|
||||
@@ -135,6 +142,9 @@ func (i *InfluxDB) Connect() error {
|
||||
}
|
||||
|
||||
i.serializer = influx.NewSerializer()
|
||||
if i.InfluxUintSupport {
|
||||
i.serializer.SetFieldTypeSupport(influx.UintSupport)
|
||||
}
|
||||
|
||||
for _, u := range urls {
|
||||
u, err := url.Parse(u)
|
||||
|
||||
@@ -48,7 +48,7 @@ func (h *MetricHandler) AddInt(key []byte, value []byte) {
|
||||
fk := unescape(key)
|
||||
fv, err := parseIntBytes(bytes.TrimSuffix(value, []byte("i")), 10, 64)
|
||||
if err != nil {
|
||||
log.Errorf("E! Received unparseable int value: %q", value)
|
||||
log.Errorf("E! Received unparseable int value: %q: %v", value, err)
|
||||
return
|
||||
}
|
||||
h.builder.AddField(fk, fv)
|
||||
@@ -58,7 +58,7 @@ func (h *MetricHandler) AddUint(key []byte, value []byte) {
|
||||
fk := unescape(key)
|
||||
fv, err := parseUintBytes(bytes.TrimSuffix(value, []byte("u")), 10, 64)
|
||||
if err != nil {
|
||||
log.Errorf("E! Received unparseable uint value: %q", value)
|
||||
log.Errorf("E! Received unparseable uint value: %q: %v", value, err)
|
||||
return
|
||||
}
|
||||
h.builder.AddField(fk, fv)
|
||||
@@ -68,7 +68,7 @@ func (h *MetricHandler) AddFloat(key []byte, value []byte) {
|
||||
fk := unescape(key)
|
||||
fv, err := parseFloatBytes(value, 64)
|
||||
if err != nil {
|
||||
log.Errorf("E! Received unparseable float value: %q", value)
|
||||
log.Errorf("E! Received unparseable float value: %q: %v", value, err)
|
||||
return
|
||||
}
|
||||
h.builder.AddField(fk, fv)
|
||||
@@ -84,7 +84,7 @@ func (h *MetricHandler) AddBool(key []byte, value []byte) {
|
||||
fk := unescape(key)
|
||||
fv, err := parseBoolBytes(value)
|
||||
if err != nil {
|
||||
log.Errorf("E! Received unparseable boolean value: %q", value)
|
||||
log.Errorf("E! Received unparseable boolean value: %q: %v", value, err)
|
||||
return
|
||||
}
|
||||
h.builder.AddField(fk, fv)
|
||||
@@ -93,7 +93,7 @@ func (h *MetricHandler) AddBool(key []byte, value []byte) {
|
||||
func (h *MetricHandler) SetTimestamp(tm []byte) {
|
||||
v, err := parseIntBytes(tm, 10, 64)
|
||||
if err != nil {
|
||||
log.Errorf("E! Received unparseable timestamp: %q", tm)
|
||||
log.Errorf("E! Received unparseable timestamp: %q: %v", tm, err)
|
||||
return
|
||||
}
|
||||
ns := v * int64(h.precision)
|
||||
|
||||
@@ -16,12 +16,6 @@ func Metric(v telegraf.Metric, err error) telegraf.Metric {
|
||||
return v
|
||||
}
|
||||
|
||||
const (
|
||||
Uint64Overflow uint64 = 9223372036854775808
|
||||
Uint64Max uint64 = 18446744073709551615
|
||||
Uint64Test uint64 = 42
|
||||
)
|
||||
|
||||
var DefaultTime = func() time.Time {
|
||||
return time.Unix(42, 0)
|
||||
}
|
||||
@@ -262,6 +256,38 @@ var ptests = []struct {
|
||||
},
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
name: "field int overflow dropped",
|
||||
input: []byte("cpu value=9223372036854775808i"),
|
||||
metrics: []telegraf.Metric{
|
||||
Metric(
|
||||
metric.New(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{},
|
||||
time.Unix(42, 0),
|
||||
),
|
||||
),
|
||||
},
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
name: "field int max value",
|
||||
input: []byte("cpu value=9223372036854775807i"),
|
||||
metrics: []telegraf.Metric{
|
||||
Metric(
|
||||
metric.New(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"value": 9223372036854775807,
|
||||
},
|
||||
time.Unix(42, 0),
|
||||
),
|
||||
),
|
||||
},
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
name: "field uint",
|
||||
input: []byte("cpu value=42u"),
|
||||
@@ -271,7 +297,7 @@ var ptests = []struct {
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"value": Uint64Test,
|
||||
"value": uint64(42),
|
||||
},
|
||||
time.Unix(42, 0),
|
||||
),
|
||||
@@ -280,16 +306,14 @@ var ptests = []struct {
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
name: "field uint int overflow",
|
||||
input: []byte("cpu value=9223372036854775808u"),
|
||||
name: "field uint overflow dropped",
|
||||
input: []byte("cpu value=18446744073709551616u"),
|
||||
metrics: []telegraf.Metric{
|
||||
Metric(
|
||||
metric.New(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"value": Uint64Overflow,
|
||||
},
|
||||
map[string]interface{}{},
|
||||
time.Unix(42, 0),
|
||||
),
|
||||
),
|
||||
@@ -297,7 +321,7 @@ var ptests = []struct {
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
name: "field uint maximum",
|
||||
name: "field uint max value",
|
||||
input: []byte("cpu value=18446744073709551615u"),
|
||||
metrics: []telegraf.Metric{
|
||||
Metric(
|
||||
@@ -305,7 +329,7 @@ var ptests = []struct {
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"value": Uint64Max,
|
||||
"value": uint64(18446744073709551615),
|
||||
},
|
||||
time.Unix(42, 0),
|
||||
),
|
||||
|
||||
@@ -20,6 +20,12 @@ const (
|
||||
SortFields
|
||||
)
|
||||
|
||||
type FieldTypeSupport int
|
||||
|
||||
const (
|
||||
UintSupport FieldTypeSupport = 1 << iota
|
||||
)
|
||||
|
||||
var (
|
||||
ErrNeedMoreSpace = errors.New("need more space")
|
||||
ErrInvalidName = errors.New("invalid name")
|
||||
@@ -32,9 +38,10 @@ var (
|
||||
|
||||
// Serializer is a serializer for line protocol.
|
||||
type Serializer struct {
|
||||
maxLineBytes int
|
||||
bytesWritten int
|
||||
fieldSortOrder FieldSortOrder
|
||||
maxLineBytes int
|
||||
bytesWritten int
|
||||
fieldSortOrder FieldSortOrder
|
||||
fieldTypeSupport FieldTypeSupport
|
||||
|
||||
buf bytes.Buffer
|
||||
header []byte
|
||||
@@ -61,6 +68,10 @@ func (s *Serializer) SetFieldSortOrder(order FieldSortOrder) {
|
||||
s.fieldSortOrder = order
|
||||
}
|
||||
|
||||
func (s *Serializer) SetFieldTypeSupport(typeSupport FieldTypeSupport) {
|
||||
s.fieldTypeSupport = typeSupport
|
||||
}
|
||||
|
||||
// Serialize writes the telegraf.Metric to a byte slice. May produce multiple
|
||||
// lines of output if longer than maximum line length. Lines are terminated
|
||||
// with a newline (LF) char.
|
||||
@@ -142,7 +153,7 @@ func (s *Serializer) buildFieldPair(key string, value interface{}) error {
|
||||
|
||||
s.pair = append(s.pair, key...)
|
||||
s.pair = append(s.pair, '=')
|
||||
pair, err := appendFieldValue(s.pair, value)
|
||||
pair, err := s.appendFieldValue(s.pair, value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -235,10 +246,18 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error {
|
||||
|
||||
}
|
||||
|
||||
func appendFieldValue(buf []byte, value interface{}) ([]byte, error) {
|
||||
func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, error) {
|
||||
switch v := value.(type) {
|
||||
case uint64:
|
||||
return appendUintField(buf, v), nil
|
||||
if s.fieldTypeSupport&UintSupport != 0 {
|
||||
return appendUintField(buf, v), nil
|
||||
} else {
|
||||
if v <= uint64(MaxInt) {
|
||||
return appendIntField(buf, int64(v)), nil
|
||||
} else {
|
||||
return appendIntField(buf, int64(MaxInt)), nil
|
||||
}
|
||||
}
|
||||
case int64:
|
||||
return appendIntField(buf, v), nil
|
||||
case float64:
|
||||
|
||||
@@ -11,26 +11,19 @@ import (
|
||||
)
|
||||
|
||||
func MustMetric(v telegraf.Metric, err error) telegraf.Metric {
|
||||
// Force uint support to be enabled for testing.
|
||||
metric.EnableUintSupport()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
const (
|
||||
Uint64Overflow uint64 = 9223372036854775808
|
||||
Uint64Max uint64 = 18446744073709551615
|
||||
Uint64Test uint64 = 42
|
||||
)
|
||||
|
||||
var tests = []struct {
|
||||
name string
|
||||
maxBytes int
|
||||
input telegraf.Metric
|
||||
output []byte
|
||||
err error
|
||||
name string
|
||||
maxBytes int
|
||||
typeSupport FieldTypeSupport
|
||||
input telegraf.Metric
|
||||
output []byte
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "minimal",
|
||||
@@ -143,40 +136,56 @@ var tests = []struct {
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"value": Uint64Test,
|
||||
"value": uint64(42),
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
),
|
||||
output: []byte("cpu value=42u 0\n"),
|
||||
output: []byte("cpu value=42u 0\n"),
|
||||
typeSupport: UintSupport,
|
||||
},
|
||||
{
|
||||
name: "uint field int64 overflow",
|
||||
name: "uint field max value",
|
||||
input: MustMetric(
|
||||
metric.New(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"value": Uint64Overflow,
|
||||
"value": uint64(18446744073709551615),
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
),
|
||||
output: []byte("cpu value=9223372036854775808u 0\n"),
|
||||
output: []byte("cpu value=18446744073709551615u 0\n"),
|
||||
typeSupport: UintSupport,
|
||||
},
|
||||
{
|
||||
name: "uint field uint64 max",
|
||||
name: "uint field no uint support",
|
||||
input: MustMetric(
|
||||
metric.New(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"value": Uint64Max,
|
||||
"value": uint64(42),
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
),
|
||||
output: []byte("cpu value=18446744073709551615u 0\n"),
|
||||
output: []byte("cpu value=42i 0\n"),
|
||||
},
|
||||
{
|
||||
name: "uint field no uint support overflow",
|
||||
input: MustMetric(
|
||||
metric.New(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"value": uint64(18446744073709551615),
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
),
|
||||
output: []byte("cpu value=9223372036854775807i 0\n"),
|
||||
},
|
||||
{
|
||||
name: "bool field",
|
||||
@@ -358,6 +367,7 @@ func TestSerializer(t *testing.T) {
|
||||
serializer := NewSerializer()
|
||||
serializer.SetMaxLineBytes(tt.maxBytes)
|
||||
serializer.SetFieldSortOrder(SortFields)
|
||||
serializer.SetFieldTypeSupport(tt.typeSupport)
|
||||
output, err := serializer.Serialize(tt.input)
|
||||
require.Equal(t, tt.err, err)
|
||||
require.Equal(t, string(tt.output), string(output))
|
||||
@@ -370,6 +380,7 @@ func BenchmarkSerializer(b *testing.B) {
|
||||
b.Run(tt.name, func(b *testing.B) {
|
||||
serializer := NewSerializer()
|
||||
serializer.SetMaxLineBytes(tt.maxBytes)
|
||||
serializer.SetFieldTypeSupport(tt.typeSupport)
|
||||
for n := 0; n < b.N; n++ {
|
||||
output, err := serializer.Serialize(tt.input)
|
||||
_ = err
|
||||
|
||||
@@ -40,6 +40,9 @@ type Config struct {
|
||||
// than unsorted fields; influx format only
|
||||
InfluxSortFields bool
|
||||
|
||||
// Support unsigned integer output; influx format only
|
||||
InfluxUintSupport bool
|
||||
|
||||
// Prefix to add to all measurements, only supports Graphite
|
||||
Prefix string
|
||||
|
||||
@@ -77,9 +80,16 @@ func NewInfluxSerializerConfig(config *Config) (Serializer, error) {
|
||||
if config.InfluxSortFields {
|
||||
sort = influx.SortFields
|
||||
}
|
||||
|
||||
var typeSupport influx.FieldTypeSupport
|
||||
if config.InfluxUintSupport {
|
||||
typeSupport = typeSupport + influx.UintSupport
|
||||
}
|
||||
|
||||
s := influx.NewSerializer()
|
||||
s.SetMaxLineBytes(config.InfluxMaxLineBytes)
|
||||
s.SetFieldSortOrder(sort)
|
||||
s.SetFieldTypeSupport(typeSupport)
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user