From 9ebf16636d42a5bd26888eb284763e70c8daefaa Mon Sep 17 00:00:00 2001
From: maxunt
Date: Wed, 11 Jul 2018 17:29:23 -0700
Subject: [PATCH] Add parse_multivalue to collectd parser (#4403)

---
 docs/DATA_FORMATS_INPUT.md              |   6 ++
 internal/config/config.go               |   9 ++
 plugins/parsers/collectd/parser.go      | 109 +++++++++++++++++-------
 plugins/parsers/collectd/parser_test.go |  20 ++++-
 plugins/parsers/registry.go             |   8 +-
 5 files changed, 117 insertions(+), 35 deletions(-)

diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md
index c1192e72b..88282c846 100644
--- a/docs/DATA_FORMATS_INPUT.md
+++ b/docs/DATA_FORMATS_INPUT.md
@@ -479,6 +479,12 @@ You can also change the path to the typesdb or add additional typesdb using
   collectd_security_level = "encrypt"
   ## Path of to TypesDB specifications
   collectd_typesdb = ["/usr/share/collectd/types.db"]
+
+  # Multi-value plugins can be handled two ways.
+  # "split" will parse and store the multi-value plugin data into separate measurements
+  # "join" will parse and store the multi-value plugin as a single multi-value measurement.
+  # "split" is the default behavior for backward compatibility with previous versions of influxdb.
+  collectd_parse_multivalue = "split"
 ```

 # Dropwizard:

diff --git a/internal/config/config.go b/internal/config/config.go
index 8a31c271e..5b3e53457 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -1285,6 +1285,14 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
 		}
 	}

+	if node, ok := tbl.Fields["collectd_parse_multivalue"]; ok {
+		if kv, ok := node.(*ast.KeyValue); ok {
+			if str, ok := kv.Value.(*ast.String); ok {
+				c.CollectdSplit = str.Value
+			}
+		}
+	}
+
 	if node, ok := tbl.Fields["collectd_typesdb"]; ok {
 		if kv, ok := node.(*ast.KeyValue); ok {
 			if ary, ok := kv.Value.(*ast.Array); ok {
@@ -1348,6 +1356,7 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
 	delete(tbl.Fields, "collectd_auth_file")
 	delete(tbl.Fields, "collectd_security_level")
 	delete(tbl.Fields, "collectd_typesdb")
+	delete(tbl.Fields, "collectd_parse_multivalue")
 	delete(tbl.Fields, "dropwizard_metric_registry_path")
 	delete(tbl.Fields, "dropwizard_time_path")
 	delete(tbl.Fields, "dropwizard_time_format")
diff --git a/plugins/parsers/collectd/parser.go b/plugins/parsers/collectd/parser.go
index 20525610c..6b7fbd756 100644
--- a/plugins/parsers/collectd/parser.go
+++ b/plugins/parsers/collectd/parser.go
@@ -21,7 +21,10 @@ type CollectdParser struct {
 	// DefaultTags will be added to every parsed metric
 	DefaultTags map[string]string

-	popts network.ParseOpts
+	//whether or not to split multi value metric into multiple metrics
+	//default value is split
+	ParseMultiValue string
+	popts           network.ParseOpts
 }

 func (p *CollectdParser) SetParseOpts(popts *network.ParseOpts) {
@@ -32,6 +35,7 @@ func NewCollectdParser(
 	authFile string,
 	securityLevel string,
 	typesDB []string,
+	split string,
 ) (*CollectdParser, error) {
 	popts := network.ParseOpts{}

@@ -64,7 +68,8 @@ func NewCollectdParser(
 		}
 	}

-	parser := CollectdParser{popts: popts}
+	parser := CollectdParser{popts: popts,
+		ParseMultiValue: split}

 	return &parser, nil
 }
@@ -76,7 +81,7 @@ func (p *CollectdParser) Parse(buf []byte) ([]telegraf.Metric, error) {

 	metrics := []telegraf.Metric{}
 	for _, valueList := range valueLists {
-		metrics = append(metrics, UnmarshalValueList(valueList)...)
+		metrics = append(metrics, UnmarshalValueList(valueList, p.ParseMultiValue)...)
 	}

 	if len(p.DefaultTags) > 0 {
@@ -111,47 +116,91 @@ func (p *CollectdParser) SetDefaultTags(tags map[string]string) {
 }

 // UnmarshalValueList translates a ValueList into a Telegraf metric.
-func UnmarshalValueList(vl *api.ValueList) []telegraf.Metric {
+func UnmarshalValueList(vl *api.ValueList, multiValue string) []telegraf.Metric {
 	timestamp := vl.Time.UTC()

 	var metrics []telegraf.Metric
-	for i := range vl.Values {
-		var name string
-		name = fmt.Sprintf("%s_%s", vl.Identifier.Plugin, vl.DSName(i))
+
+	//set multiValue to default "split" if nothing is specified
+	if multiValue == "" {
+		multiValue = "split"
+	}
+	switch multiValue {
+	case "split":
+		for i := range vl.Values {
+			var name string
+			name = fmt.Sprintf("%s_%s", vl.Identifier.Plugin, vl.DSName(i))
+			tags := make(map[string]string)
+			fields := make(map[string]interface{})
+
+			// Convert interface back to actual type, then to float64
+			switch value := vl.Values[i].(type) {
+			case api.Gauge:
+				fields["value"] = float64(value)
+			case api.Derive:
+				fields["value"] = float64(value)
+			case api.Counter:
+				fields["value"] = float64(value)
+			}
+
+			if vl.Identifier.Host != "" {
+				tags["host"] = vl.Identifier.Host
+			}
+			if vl.Identifier.PluginInstance != "" {
+				tags["instance"] = vl.Identifier.PluginInstance
+			}
+			if vl.Identifier.Type != "" {
+				tags["type"] = vl.Identifier.Type
+			}
+			if vl.Identifier.TypeInstance != "" {
+				tags["type_instance"] = vl.Identifier.TypeInstance
+			}
+
+			// Drop invalid points
+			m, err := metric.New(name, tags, fields, timestamp)
+			if err != nil {
+				log.Printf("E! Dropping metric %v: %v", name, err)
+				continue
+			}
+
+			metrics = append(metrics, m)
+		}
+	case "join":
+		name := vl.Identifier.Plugin
 		tags := make(map[string]string)
 		fields := make(map[string]interface{})
+		for i := range vl.Values {
+			switch value := vl.Values[i].(type) {
+			case api.Gauge:
+				fields[vl.DSName(i)] = float64(value)
+			case api.Derive:
+				fields[vl.DSName(i)] = float64(value)
+			case api.Counter:
+				fields[vl.DSName(i)] = float64(value)
+			}

-		// Convert interface back to actual type, then to float64
-		switch value := vl.Values[i].(type) {
-		case api.Gauge:
-			fields["value"] = float64(value)
-		case api.Derive:
-			fields["value"] = float64(value)
-		case api.Counter:
-			fields["value"] = float64(value)
+			if vl.Identifier.Host != "" {
+				tags["host"] = vl.Identifier.Host
+			}
+			if vl.Identifier.PluginInstance != "" {
+				tags["instance"] = vl.Identifier.PluginInstance
+			}
+			if vl.Identifier.Type != "" {
+				tags["type"] = vl.Identifier.Type
+			}
+			if vl.Identifier.TypeInstance != "" {
+				tags["type_instance"] = vl.Identifier.TypeInstance
+			}
 		}

-		if vl.Identifier.Host != "" {
-			tags["host"] = vl.Identifier.Host
-		}
-		if vl.Identifier.PluginInstance != "" {
-			tags["instance"] = vl.Identifier.PluginInstance
-		}
-		if vl.Identifier.Type != "" {
-			tags["type"] = vl.Identifier.Type
-		}
-		if vl.Identifier.TypeInstance != "" {
-			tags["type_instance"] = vl.Identifier.TypeInstance
-		}
-
-		// Drop invalid points
 		m, err := metric.New(name, tags, fields, timestamp)
 		if err != nil {
 			log.Printf("E! Dropping metric %v: %v", name, err)
-			continue
 		}

 		metrics = append(metrics, m)
+	default:
+		log.Printf("parse-multi-value config can only be 'split' or 'join'")
 	}
 	return metrics
 }
diff --git a/plugins/parsers/collectd/parser_test.go b/plugins/parsers/collectd/parser_test.go
index 3aad04013..afd58ec72 100644
--- a/plugins/parsers/collectd/parser_test.go
+++ b/plugins/parsers/collectd/parser_test.go
@@ -6,6 +6,7 @@ import (
 	"collectd.org/api"
 	"collectd.org/network"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

 	"github.com/influxdata/telegraf"
@@ -76,7 +77,7 @@ var multiMetric = testCase{
 			api.Derive(42),
 			api.Gauge(42),
 		},
-		DSNames: []string(nil),
+		DSNames: []string{"t1", "t2"},
 	},
 },
 []metricData{
@@ -108,7 +109,7 @@ var multiMetric = testCase{
 }

 func TestNewCollectdParser(t *testing.T) {
-	parser, err := NewCollectdParser("", "", []string{})
+	parser, err := NewCollectdParser("", "", []string{}, "join")
 	require.Nil(t, err)
 	require.Equal(t, parser.popts.SecurityLevel, network.None)
 	require.NotNil(t, parser.popts.PasswordLookup)
@@ -133,6 +134,19 @@ func TestParse(t *testing.T) {
 	}
 }

+func TestParseMultiValueSplit(t *testing.T) {
+	buf, err := writeValueList(multiMetric.vl)
+	require.Nil(t, err)
+	bytes, err := buf.Bytes()
+	require.Nil(t, err)
+
+	parser := &CollectdParser{ParseMultiValue: "split"}
+	metrics, err := parser.Parse(bytes)
+	require.Nil(t, err)
+
+	assert.Equal(t, 2, len(metrics))
+}
+
 func TestParse_DefaultTags(t *testing.T) {
 	buf, err := writeValueList(singleMetric.vl)
 	require.Nil(t, err)
@@ -266,7 +280,7 @@ func TestParseLine(t *testing.T) {
 	bytes, err := buf.Bytes()
 	require.Nil(t, err)

-	parser, err := NewCollectdParser("", "", []string{})
+	parser, err := NewCollectdParser("", "", []string{}, "split")
 	require.Nil(t, err)
 	metric, err := parser.ParseLine(string(bytes))
 	require.Nil(t, err)
diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go
index 58fce1722..ac6bbbda8 100644
--- a/plugins/parsers/registry.go
+++ b/plugins/parsers/registry.go
@@ -66,6 +66,9 @@ type Config struct {
 	// Dataset specification for collectd
 	CollectdTypesDB []string

+	// whether to split or join multivalue metrics
+	CollectdSplit string
+
 	// DataType only applies to value, this will be the type to parse value to
 	DataType string

@@ -109,7 +112,7 @@ func NewParser(config *Config) (Parser, error) {
 			config.Templates, config.DefaultTags)
 	case "collectd":
 		parser, err = NewCollectdParser(config.CollectdAuthFile,
-			config.CollectdSecurityLevel, config.CollectdTypesDB)
+			config.CollectdSecurityLevel, config.CollectdTypesDB, config.CollectdSplit)
 	case "dropwizard":
 		parser, err = NewDropwizardParser(
 			config.DropwizardMetricRegistryPath,
@@ -172,8 +175,9 @@ func NewCollectdParser(
 	authFile string,
 	securityLevel string,
 	typesDB []string,
+	split string,
 ) (Parser, error) {
-	return collectd.NewCollectdParser(authFile, securityLevel, typesDB)
+	return collectd.NewCollectdParser(authFile, securityLevel, typesDB, split)
 }

 func NewDropwizardParser(
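
For reference, a minimal sketch of how the new option might be enabled in a Telegraf configuration. Only the `collectd_*` settings come from this patch; the `socket_listener` input and its listen address are illustrative assumptions, not part of the change:

```toml
# Hypothetical input plugin choice; any input that supports data_format = "collectd" works.
[[inputs.socket_listener]]
  service_address = "udp://:25826"
  data_format = "collectd"
  collectd_typesdb = ["/usr/share/collectd/types.db"]

  ## "join" stores a multi-value plugin (e.g. load shortterm/midterm/longterm)
  ## as one measurement with one field per data source; "split" (the default)
  ## emits one measurement per value.
  collectd_parse_multivalue = "join"
```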
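And a small Go sketch, not part of the patch, of what the two modes yield for the same ValueList. The host and plugin names are made up for illustration; the example assumes the collectd.org/api types and the exported UnmarshalValueList shown in the diff above:

```go
package main

import (
	"fmt"
	"time"

	"collectd.org/api"

	"github.com/influxdata/telegraf/plugins/parsers/collectd"
)

func main() {
	// A two-value ValueList, similar to multiMetric in parser_test.go.
	vl := &api.ValueList{
		Identifier: api.Identifier{Host: "example.com", Plugin: "load"},
		Time:       time.Now(),
		Values:     []api.Value{api.Gauge(1.5), api.Gauge(0.9)},
		DSNames:    []string{"shortterm", "midterm"},
	}

	// Default ("split"): two metrics, load_shortterm and load_midterm,
	// each carrying a single "value" field.
	for _, m := range collectd.UnmarshalValueList(vl, "split") {
		fmt.Println(m.Name(), m.Fields())
	}

	// "join": one metric named after the plugin ("load") with one field
	// per data source name (shortterm, midterm).
	for _, m := range collectd.UnmarshalValueList(vl, "join") {
		fmt.Println(m.Name(), m.Fields())
	}
}
```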