Add parse_multivalue to collectd parser (#4403)
This commit is contained in:
		
							parent
							
								
									7b73b0db3a
								
							
						
					
					
						commit
						9ebf16636d
					
				|  | @ -479,6 +479,12 @@ You can also change the path to the typesdb or add additional typesdb using | ||||||
|   collectd_security_level = "encrypt" |   collectd_security_level = "encrypt" | ||||||
|   ## Path of to TypesDB specifications |   ## Path of to TypesDB specifications | ||||||
|   collectd_typesdb = ["/usr/share/collectd/types.db"] |   collectd_typesdb = ["/usr/share/collectd/types.db"] | ||||||
|  | 
 | ||||||
|  |   # Multi-value plugins can be handled two ways.   | ||||||
|  |   # "split" will parse and store the multi-value plugin data into separate measurements | ||||||
|  |   # "join" will parse and store the multi-value plugin as a single multi-value measurement.   | ||||||
|  |   # "split" is the default behavior for backward compatability with previous versions of influxdb. | ||||||
|  |   collectd_parse_multivalue = "split" | ||||||
| ``` | ``` | ||||||
| 
 | 
 | ||||||
| # Dropwizard: | # Dropwizard: | ||||||
|  |  | ||||||
|  | @ -1285,6 +1285,14 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	if node, ok := tbl.Fields["collectd_parse_multivalue"]; ok { | ||||||
|  | 		if kv, ok := node.(*ast.KeyValue); ok { | ||||||
|  | 			if str, ok := kv.Value.(*ast.String); ok { | ||||||
|  | 				c.CollectdSplit = str.Value | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	if node, ok := tbl.Fields["collectd_typesdb"]; ok { | 	if node, ok := tbl.Fields["collectd_typesdb"]; ok { | ||||||
| 		if kv, ok := node.(*ast.KeyValue); ok { | 		if kv, ok := node.(*ast.KeyValue); ok { | ||||||
| 			if ary, ok := kv.Value.(*ast.Array); ok { | 			if ary, ok := kv.Value.(*ast.Array); ok { | ||||||
|  | @ -1348,6 +1356,7 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { | ||||||
| 	delete(tbl.Fields, "collectd_auth_file") | 	delete(tbl.Fields, "collectd_auth_file") | ||||||
| 	delete(tbl.Fields, "collectd_security_level") | 	delete(tbl.Fields, "collectd_security_level") | ||||||
| 	delete(tbl.Fields, "collectd_typesdb") | 	delete(tbl.Fields, "collectd_typesdb") | ||||||
|  | 	delete(tbl.Fields, "collectd_parse_multivalue") | ||||||
| 	delete(tbl.Fields, "dropwizard_metric_registry_path") | 	delete(tbl.Fields, "dropwizard_metric_registry_path") | ||||||
| 	delete(tbl.Fields, "dropwizard_time_path") | 	delete(tbl.Fields, "dropwizard_time_path") | ||||||
| 	delete(tbl.Fields, "dropwizard_time_format") | 	delete(tbl.Fields, "dropwizard_time_format") | ||||||
|  |  | ||||||
|  | @ -21,6 +21,9 @@ type CollectdParser struct { | ||||||
| 	// DefaultTags will be added to every parsed metric
 | 	// DefaultTags will be added to every parsed metric
 | ||||||
| 	DefaultTags map[string]string | 	DefaultTags map[string]string | ||||||
| 
 | 
 | ||||||
|  | 	//whether or not to split multi value metric into multiple metrics
 | ||||||
|  | 	//default value is split
 | ||||||
|  | 	ParseMultiValue string | ||||||
| 	popts           network.ParseOpts | 	popts           network.ParseOpts | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -32,6 +35,7 @@ func NewCollectdParser( | ||||||
| 	authFile string, | 	authFile string, | ||||||
| 	securityLevel string, | 	securityLevel string, | ||||||
| 	typesDB []string, | 	typesDB []string, | ||||||
|  | 	split string, | ||||||
| ) (*CollectdParser, error) { | ) (*CollectdParser, error) { | ||||||
| 	popts := network.ParseOpts{} | 	popts := network.ParseOpts{} | ||||||
| 
 | 
 | ||||||
|  | @ -64,7 +68,8 @@ func NewCollectdParser( | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	parser := CollectdParser{popts: popts} | 	parser := CollectdParser{popts: popts, | ||||||
|  | 		ParseMultiValue: split} | ||||||
| 	return &parser, nil | 	return &parser, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -76,7 +81,7 @@ func (p *CollectdParser) Parse(buf []byte) ([]telegraf.Metric, error) { | ||||||
| 
 | 
 | ||||||
| 	metrics := []telegraf.Metric{} | 	metrics := []telegraf.Metric{} | ||||||
| 	for _, valueList := range valueLists { | 	for _, valueList := range valueLists { | ||||||
| 		metrics = append(metrics, UnmarshalValueList(valueList)...) | 		metrics = append(metrics, UnmarshalValueList(valueList, p.ParseMultiValue)...) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if len(p.DefaultTags) > 0 { | 	if len(p.DefaultTags) > 0 { | ||||||
|  | @ -111,10 +116,17 @@ func (p *CollectdParser) SetDefaultTags(tags map[string]string) { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // UnmarshalValueList translates a ValueList into a Telegraf metric.
 | // UnmarshalValueList translates a ValueList into a Telegraf metric.
 | ||||||
| func UnmarshalValueList(vl *api.ValueList) []telegraf.Metric { | func UnmarshalValueList(vl *api.ValueList, multiValue string) []telegraf.Metric { | ||||||
| 	timestamp := vl.Time.UTC() | 	timestamp := vl.Time.UTC() | ||||||
| 
 | 
 | ||||||
| 	var metrics []telegraf.Metric | 	var metrics []telegraf.Metric | ||||||
|  | 
 | ||||||
|  | 	//set multiValue to default "split" if nothing is specified
 | ||||||
|  | 	if multiValue == "" { | ||||||
|  | 		multiValue = "split" | ||||||
|  | 	} | ||||||
|  | 	switch multiValue { | ||||||
|  | 	case "split": | ||||||
| 		for i := range vl.Values { | 		for i := range vl.Values { | ||||||
| 			var name string | 			var name string | ||||||
| 			name = fmt.Sprintf("%s_%s", vl.Identifier.Plugin, vl.DSName(i)) | 			name = fmt.Sprintf("%s_%s", vl.Identifier.Plugin, vl.DSName(i)) | ||||||
|  | @ -153,6 +165,43 @@ func UnmarshalValueList(vl *api.ValueList) []telegraf.Metric { | ||||||
| 
 | 
 | ||||||
| 			metrics = append(metrics, m) | 			metrics = append(metrics, m) | ||||||
| 		} | 		} | ||||||
|  | 	case "join": | ||||||
|  | 		name := vl.Identifier.Plugin | ||||||
|  | 		tags := make(map[string]string) | ||||||
|  | 		fields := make(map[string]interface{}) | ||||||
|  | 		for i := range vl.Values { | ||||||
|  | 			switch value := vl.Values[i].(type) { | ||||||
|  | 			case api.Gauge: | ||||||
|  | 				fields[vl.DSName(i)] = float64(value) | ||||||
|  | 			case api.Derive: | ||||||
|  | 				fields[vl.DSName(i)] = float64(value) | ||||||
|  | 			case api.Counter: | ||||||
|  | 				fields[vl.DSName(i)] = float64(value) | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			if vl.Identifier.Host != "" { | ||||||
|  | 				tags["host"] = vl.Identifier.Host | ||||||
|  | 			} | ||||||
|  | 			if vl.Identifier.PluginInstance != "" { | ||||||
|  | 				tags["instance"] = vl.Identifier.PluginInstance | ||||||
|  | 			} | ||||||
|  | 			if vl.Identifier.Type != "" { | ||||||
|  | 				tags["type"] = vl.Identifier.Type | ||||||
|  | 			} | ||||||
|  | 			if vl.Identifier.TypeInstance != "" { | ||||||
|  | 				tags["type_instance"] = vl.Identifier.TypeInstance | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		m, err := metric.New(name, tags, fields, timestamp) | ||||||
|  | 		if err != nil { | ||||||
|  | 			log.Printf("E! Dropping metric %v: %v", name, err) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		metrics = append(metrics, m) | ||||||
|  | 	default: | ||||||
|  | 		log.Printf("parse-multi-value config can only be 'split' or 'join'") | ||||||
|  | 	} | ||||||
| 	return metrics | 	return metrics | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -6,6 +6,7 @@ import ( | ||||||
| 
 | 
 | ||||||
| 	"collectd.org/api" | 	"collectd.org/api" | ||||||
| 	"collectd.org/network" | 	"collectd.org/network" | ||||||
|  | 	"github.com/stretchr/testify/assert" | ||||||
| 	"github.com/stretchr/testify/require" | 	"github.com/stretchr/testify/require" | ||||||
| 
 | 
 | ||||||
| 	"github.com/influxdata/telegraf" | 	"github.com/influxdata/telegraf" | ||||||
|  | @ -76,7 +77,7 @@ var multiMetric = testCase{ | ||||||
| 				api.Derive(42), | 				api.Derive(42), | ||||||
| 				api.Gauge(42), | 				api.Gauge(42), | ||||||
| 			}, | 			}, | ||||||
| 			DSNames: []string(nil), | 			DSNames: []string{"t1", "t2"}, | ||||||
| 		}, | 		}, | ||||||
| 	}, | 	}, | ||||||
| 	[]metricData{ | 	[]metricData{ | ||||||
|  | @ -108,7 +109,7 @@ var multiMetric = testCase{ | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func TestNewCollectdParser(t *testing.T) { | func TestNewCollectdParser(t *testing.T) { | ||||||
| 	parser, err := NewCollectdParser("", "", []string{}) | 	parser, err := NewCollectdParser("", "", []string{}, "join") | ||||||
| 	require.Nil(t, err) | 	require.Nil(t, err) | ||||||
| 	require.Equal(t, parser.popts.SecurityLevel, network.None) | 	require.Equal(t, parser.popts.SecurityLevel, network.None) | ||||||
| 	require.NotNil(t, parser.popts.PasswordLookup) | 	require.NotNil(t, parser.popts.PasswordLookup) | ||||||
|  | @ -133,6 +134,19 @@ func TestParse(t *testing.T) { | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func TestParseMultiValueSplit(t *testing.T) { | ||||||
|  | 	buf, err := writeValueList(multiMetric.vl) | ||||||
|  | 	require.Nil(t, err) | ||||||
|  | 	bytes, err := buf.Bytes() | ||||||
|  | 	require.Nil(t, err) | ||||||
|  | 
 | ||||||
|  | 	parser := &CollectdParser{ParseMultiValue: "split"} | ||||||
|  | 	metrics, err := parser.Parse(bytes) | ||||||
|  | 	require.Nil(t, err) | ||||||
|  | 
 | ||||||
|  | 	assert.Equal(t, 2, len(metrics)) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func TestParse_DefaultTags(t *testing.T) { | func TestParse_DefaultTags(t *testing.T) { | ||||||
| 	buf, err := writeValueList(singleMetric.vl) | 	buf, err := writeValueList(singleMetric.vl) | ||||||
| 	require.Nil(t, err) | 	require.Nil(t, err) | ||||||
|  | @ -266,7 +280,7 @@ func TestParseLine(t *testing.T) { | ||||||
| 	bytes, err := buf.Bytes() | 	bytes, err := buf.Bytes() | ||||||
| 	require.Nil(t, err) | 	require.Nil(t, err) | ||||||
| 
 | 
 | ||||||
| 	parser, err := NewCollectdParser("", "", []string{}) | 	parser, err := NewCollectdParser("", "", []string{}, "split") | ||||||
| 	require.Nil(t, err) | 	require.Nil(t, err) | ||||||
| 	metric, err := parser.ParseLine(string(bytes)) | 	metric, err := parser.ParseLine(string(bytes)) | ||||||
| 	require.Nil(t, err) | 	require.Nil(t, err) | ||||||
|  |  | ||||||
|  | @ -66,6 +66,9 @@ type Config struct { | ||||||
| 	// Dataset specification for collectd
 | 	// Dataset specification for collectd
 | ||||||
| 	CollectdTypesDB []string | 	CollectdTypesDB []string | ||||||
| 
 | 
 | ||||||
|  | 	// whether to split or join multivalue metrics
 | ||||||
|  | 	CollectdSplit string | ||||||
|  | 
 | ||||||
| 	// DataType only applies to value, this will be the type to parse value to
 | 	// DataType only applies to value, this will be the type to parse value to
 | ||||||
| 	DataType string | 	DataType string | ||||||
| 
 | 
 | ||||||
|  | @ -109,7 +112,7 @@ func NewParser(config *Config) (Parser, error) { | ||||||
| 			config.Templates, config.DefaultTags) | 			config.Templates, config.DefaultTags) | ||||||
| 	case "collectd": | 	case "collectd": | ||||||
| 		parser, err = NewCollectdParser(config.CollectdAuthFile, | 		parser, err = NewCollectdParser(config.CollectdAuthFile, | ||||||
| 			config.CollectdSecurityLevel, config.CollectdTypesDB) | 			config.CollectdSecurityLevel, config.CollectdTypesDB, config.CollectdSplit) | ||||||
| 	case "dropwizard": | 	case "dropwizard": | ||||||
| 		parser, err = NewDropwizardParser( | 		parser, err = NewDropwizardParser( | ||||||
| 			config.DropwizardMetricRegistryPath, | 			config.DropwizardMetricRegistryPath, | ||||||
|  | @ -172,8 +175,9 @@ func NewCollectdParser( | ||||||
| 	authFile string, | 	authFile string, | ||||||
| 	securityLevel string, | 	securityLevel string, | ||||||
| 	typesDB []string, | 	typesDB []string, | ||||||
|  | 	split string, | ||||||
| ) (Parser, error) { | ) (Parser, error) { | ||||||
| 	return collectd.NewCollectdParser(authFile, securityLevel, typesDB) | 	return collectd.NewCollectdParser(authFile, securityLevel, typesDB, split) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func NewDropwizardParser( | func NewDropwizardParser( | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue