Add collectd parser (#2654)

This commit is contained in:
Daniel Nelson 2017-04-12 10:41:26 -07:00 committed by GitHub
parent 198ef8de3a
commit c4634c1743
12 changed files with 592 additions and 4 deletions

View File

@ -67,6 +67,7 @@ be deprecated eventually.
- [#1667](https://github.com/influxdata/telegraf/pull/1667): dmcache input plugin
- [#2637](https://github.com/influxdata/telegraf/issues/2637): Add support for precision in http_listener
- [#2636](https://github.com/influxdata/telegraf/pull/2636): Add `message_len_max` option to `kafka_consumer` input
- [#1100](https://github.com/influxdata/telegraf/issues/1100): Add collectd parser
### Bugfixes

1
Godeps
View File

@ -1,3 +1,4 @@
collectd.org 2ce144541b8903101fb8f1483cc0497a68798122
github.com/Shopify/sarama 574d3147eee384229bf96a5d12c207fe7b5234f3
github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d
github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c

View File

@ -195,6 +195,16 @@ Telegraf can also collect metrics via the following service plugins:
* [mandrill](./plugins/inputs/webhooks/mandrill)
* [rollbar](./plugins/inputs/webhooks/rollbar)
Telegraf is able to parse the following input data formats into metrics, these
formats may be used with input plugins supporting the `data_format` option:
* [InfluxDB Line Protocol](./docs/DATA_FORMATS_INPUT.md#influx)
* [JSON](./docs/DATA_FORMATS_INPUT.md#json)
* [Graphite](./docs/DATA_FORMATS_INPUT.md#graphite)
* [Value](./docs/DATA_FORMATS_INPUT.md#value)
* [Nagios](./docs/DATA_FORMATS_INPUT.md#nagios)
* [Collectd](./docs/DATA_FORMATS_INPUT.md#collectd)
## Processor Plugins
* [printer](./plugins/processors/printer)

View File

@ -7,6 +7,7 @@ Telegraf is able to parse the following input data formats into metrics:
1. [Graphite](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite)
1. [Value](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#value), ie: 45 or "booyah"
1. [Nagios](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#nagios) (exec input only)
1. [Collectd](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#collectd)
Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
@ -438,3 +439,43 @@ Note: Nagios Input Data Formats is only supported in `exec` input plugin.
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "nagios"
```
# Collectd:
The collectd format parses the collectd binary network protocol. Tags are
created for host, instance, type, and type instance. All collectd values are
added as float64 fields.
For more information about the binary network protocol see
[here](https://collectd.org/wiki/index.php/Binary_protocol).
You can control the cryptographic settings with parser options. Create an
authentication file and set `collectd_auth_file` to the path of the file, then
set the desired security level in `collectd_security_level`.
Additional information including client setup can be found
[here](https://collectd.org/wiki/index.php/Networking_introduction#Cryptographic_setup).
You can also change the path to the typesdb or add additional typesdb using
`collectd_typesdb`.
#### Collectd Configuration:
```toml
[[inputs.socket_listener]]
service_address = "udp://127.0.0.1:25826"
name_prefix = "collectd_"
## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "collectd"
## Authentication file for cryptographic security levels
collectd_auth_file = "/etc/collectd/auth_file"
## One of none (default), sign, or encrypt
collectd_security_level = "encrypt"
## Path of to TypesDB specifications
collectd_typesdb = ["/usr/share/collectd/types.db"]
```

View File

@ -1,4 +1,5 @@
# List
- collectd.org [MIT LICENSE](https://github.com/collectd/go-collectd/blob/master/LICENSE)
- github.com/Shopify/sarama [MIT LICENSE](https://github.com/Shopify/sarama/blob/master/MIT-LICENSE)
- github.com/Sirupsen/logrus [MIT LICENSE](https://github.com/Sirupsen/logrus/blob/master/LICENSE)
- github.com/armon/go-metrics [MIT LICENSE](https://github.com/armon/go-metrics/blob/master/LICENSE)
@ -30,4 +31,3 @@
- gopkg.in/dancannon/gorethink.v1 [APACHE LICENSE](https://github.com/dancannon/gorethink/blob/v1.1.2/LICENSE)
- gopkg.in/mgo.v2 [BSD LICENSE](https://github.com/go-mgo/mgo/blob/v2/LICENSE)
- golang.org/x/crypto/ [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE)

View File

@ -1230,6 +1230,34 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
}
}
if node, ok := tbl.Fields["collectd_auth_file"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CollectdAuthFile = str.Value
}
}
}
if node, ok := tbl.Fields["collectd_security_level"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CollectdSecurityLevel = str.Value
}
}
}
if node, ok := tbl.Fields["collectd_typesdb"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.CollectdTypesDB = append(c.CollectdTypesDB, str.Value)
}
}
}
}
}
c.MetricName = name
delete(tbl.Fields, "data_format")
@ -1237,6 +1265,9 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "templates")
delete(tbl.Fields, "tag_keys")
delete(tbl.Fields, "data_type")
delete(tbl.Fields, "collectd_auth_file")
delete(tbl.Fields, "collectd_security_level")
delete(tbl.Fields, "collectd_typesdb")
return parsers.NewParser(c)
}

View File

@ -4,11 +4,14 @@ import (
"io"
"log"
"os"
"regexp"
"time"
"github.com/influxdata/wlog"
)
var prefixRegex = regexp.MustCompile("^[DIWE]!")
// newTelegrafWriter returns a logging-wrapped writer.
func newTelegrafWriter(w io.Writer) io.Writer {
return &telegrafLog{
@ -21,7 +24,13 @@ type telegrafLog struct {
}
func (t *telegrafLog) Write(b []byte) (n int, err error) {
return t.writer.Write(append([]byte(time.Now().UTC().Format(time.RFC3339)+" "), b...))
var line []byte
if !prefixRegex.Match(b) {
line = append([]byte(time.Now().UTC().Format(time.RFC3339)+" I! "), b...)
} else {
line = append([]byte(time.Now().UTC().Format(time.RFC3339)+" "), b...)
}
return t.writer.Write(line)
}
// SetupLogging configures the logging output.

View File

@ -51,6 +51,19 @@ func TestErrorWriteLogToFile(t *testing.T) {
assert.Equal(t, f[19:], []byte("Z E! TEST\n"))
}
func TestAddDefaultLogLevel(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
assert.NoError(t, err)
defer func() { os.Remove(tmpfile.Name()) }()
SetupLogging(true, false, tmpfile.Name())
log.Printf("TEST")
f, err := ioutil.ReadFile(tmpfile.Name())
assert.NoError(t, err)
assert.Equal(t, f[19:], []byte("Z I! TEST\n"))
}
func BenchmarkTelegrafLogWrite(b *testing.B) {
var msg = []byte("test")
var buf bytes.Buffer

View File

@ -71,7 +71,7 @@ func (ssl *streamSocketListener) read(c net.Conn) {
for scnr.Scan() {
metrics, err := ssl.Parse(scnr.Bytes())
if err != nil {
ssl.AddError(fmt.Errorf("unable to parse incoming line"))
ssl.AddError(fmt.Errorf("unable to parse incoming line: %s", err))
//TODO rate limit
continue
}
@ -105,7 +105,7 @@ func (psl *packetSocketListener) listen() {
metrics, err := psl.Parse(buf[:n])
if err != nil {
psl.AddError(fmt.Errorf("unable to parse incoming packet"))
psl.AddError(fmt.Errorf("unable to parse incoming packet: %s", err))
//TODO rate limit
continue
}

View File

@ -0,0 +1,165 @@
package collectd
import (
"errors"
"fmt"
"log"
"os"
"collectd.org/api"
"collectd.org/network"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
const (
DefaultAuthFile = "/etc/collectd/auth_file"
)
type CollectdParser struct {
// DefaultTags will be added to every parsed metric
DefaultTags map[string]string
popts network.ParseOpts
}
func (p *CollectdParser) SetParseOpts(popts *network.ParseOpts) {
p.popts = *popts
}
func NewCollectdParser(
authFile string,
securityLevel string,
typesDB []string,
) (*CollectdParser, error) {
popts := network.ParseOpts{}
switch securityLevel {
case "none":
popts.SecurityLevel = network.None
case "sign":
popts.SecurityLevel = network.Sign
case "encrypt":
popts.SecurityLevel = network.Encrypt
default:
popts.SecurityLevel = network.None
}
if authFile == "" {
authFile = DefaultAuthFile
}
popts.PasswordLookup = network.NewAuthFile(authFile)
for _, path := range typesDB {
db, err := LoadTypesDB(path)
if err != nil {
return nil, err
}
if popts.TypesDB != nil {
popts.TypesDB.Merge(db)
} else {
popts.TypesDB = db
}
}
parser := CollectdParser{popts: popts}
return &parser, nil
}
func (p *CollectdParser) Parse(buf []byte) ([]telegraf.Metric, error) {
valueLists, err := network.Parse(buf, p.popts)
if err != nil {
return nil, fmt.Errorf("Collectd parser error: %s", err)
}
metrics := []telegraf.Metric{}
for _, valueList := range valueLists {
metrics = append(metrics, UnmarshalValueList(valueList)...)
}
if len(p.DefaultTags) > 0 {
for _, m := range metrics {
for k, v := range p.DefaultTags {
// only set the default tag if it doesn't already exist:
if !m.HasTag(k) {
m.AddTag(k, v)
}
}
}
}
return metrics, nil
}
func (p *CollectdParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line))
if err != nil {
return nil, err
}
if len(metrics) != 1 {
return nil, errors.New("Line contains multiple metrics")
}
return metrics[0], nil
}
func (p *CollectdParser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
// UnmarshalValueList translates a ValueList into a Telegraf metric.
func UnmarshalValueList(vl *api.ValueList) []telegraf.Metric {
timestamp := vl.Time.UTC()
var metrics []telegraf.Metric
for i := range vl.Values {
var name string
name = fmt.Sprintf("%s_%s", vl.Identifier.Plugin, vl.DSName(i))
tags := make(map[string]string)
fields := make(map[string]interface{})
// Convert interface back to actual type, then to float64
switch value := vl.Values[i].(type) {
case api.Gauge:
fields["value"] = float64(value)
case api.Derive:
fields["value"] = float64(value)
case api.Counter:
fields["value"] = float64(value)
}
if vl.Identifier.Host != "" {
tags["host"] = vl.Identifier.Host
}
if vl.Identifier.PluginInstance != "" {
tags["instance"] = vl.Identifier.PluginInstance
}
if vl.Identifier.Type != "" {
tags["type"] = vl.Identifier.Type
}
if vl.Identifier.TypeInstance != "" {
tags["type_instance"] = vl.Identifier.TypeInstance
}
// Drop invalid points
m, err := metric.New(name, tags, fields, timestamp)
if err != nil {
log.Printf("E! Dropping metric %v: %v", name, err)
continue
}
metrics = append(metrics, m)
}
return metrics
}
func LoadTypesDB(path string) (*api.TypesDB, error) {
reader, err := os.Open(path)
if err != nil {
return nil, err
}
return api.NewTypesDB(reader)
}

View File

@ -0,0 +1,298 @@
package collectd
import (
"context"
"testing"
"collectd.org/api"
"collectd.org/network"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
)
type AuthMap struct {
Passwd map[string]string
}
func (p *AuthMap) Password(user string) (string, error) {
return p.Passwd[user], nil
}
type metricData struct {
name string
tags map[string]string
fields map[string]interface{}
}
type testCase struct {
vl []api.ValueList
expected []metricData
}
var singleMetric = testCase{
[]api.ValueList{
api.ValueList{
Identifier: api.Identifier{
Host: "xyzzy",
Plugin: "cpu",
PluginInstance: "1",
Type: "cpu",
TypeInstance: "user",
},
Values: []api.Value{
api.Counter(42),
},
DSNames: []string(nil),
},
},
[]metricData{
metricData{
"cpu_value",
map[string]string{
"type_instance": "user",
"host": "xyzzy",
"instance": "1",
"type": "cpu",
},
map[string]interface{}{
"value": float64(42),
},
},
},
}
var multiMetric = testCase{
[]api.ValueList{
api.ValueList{
Identifier: api.Identifier{
Host: "xyzzy",
Plugin: "cpu",
PluginInstance: "0",
Type: "cpu",
TypeInstance: "user",
},
Values: []api.Value{
api.Derive(42),
api.Gauge(42),
},
DSNames: []string(nil),
},
},
[]metricData{
metricData{
"cpu_0",
map[string]string{
"type_instance": "user",
"host": "xyzzy",
"instance": "0",
"type": "cpu",
},
map[string]interface{}{
"value": float64(42),
},
},
metricData{
"cpu_1",
map[string]string{
"type_instance": "user",
"host": "xyzzy",
"instance": "0",
"type": "cpu",
},
map[string]interface{}{
"value": float64(42),
},
},
},
}
func TestNewCollectdParser(t *testing.T) {
parser, err := NewCollectdParser("", "", []string{})
require.Nil(t, err)
require.Equal(t, parser.popts.SecurityLevel, network.None)
require.NotNil(t, parser.popts.PasswordLookup)
require.Nil(t, parser.popts.TypesDB)
}
func TestParse(t *testing.T) {
cases := []testCase{singleMetric, multiMetric}
for _, tc := range cases {
buf, err := writeValueList(tc.vl)
require.Nil(t, err)
bytes, err := buf.Bytes()
require.Nil(t, err)
parser := &CollectdParser{}
require.Nil(t, err)
metrics, err := parser.Parse(bytes)
require.Nil(t, err)
assertEqualMetrics(t, tc.expected, metrics)
}
}
func TestParse_DefaultTags(t *testing.T) {
buf, err := writeValueList(singleMetric.vl)
require.Nil(t, err)
bytes, err := buf.Bytes()
require.Nil(t, err)
parser := &CollectdParser{}
parser.SetDefaultTags(map[string]string{
"foo": "bar",
})
require.Nil(t, err)
metrics, err := parser.Parse(bytes)
require.Nil(t, err)
require.Equal(t, "bar", metrics[0].Tags()["foo"])
}
func TestParse_SignSecurityLevel(t *testing.T) {
parser := &CollectdParser{}
popts := &network.ParseOpts{
SecurityLevel: network.Sign,
PasswordLookup: &AuthMap{
map[string]string{
"user0": "bar",
},
},
}
parser.SetParseOpts(popts)
// Signed data
buf, err := writeValueList(singleMetric.vl)
require.Nil(t, err)
buf.Sign("user0", "bar")
bytes, err := buf.Bytes()
require.Nil(t, err)
metrics, err := parser.Parse(bytes)
require.Nil(t, err)
assertEqualMetrics(t, singleMetric.expected, metrics)
// Encrypted data
buf, err = writeValueList(singleMetric.vl)
require.Nil(t, err)
buf.Encrypt("user0", "bar")
bytes, err = buf.Bytes()
require.Nil(t, err)
metrics, err = parser.Parse(bytes)
require.Nil(t, err)
assertEqualMetrics(t, singleMetric.expected, metrics)
// Plain text data skipped
buf, err = writeValueList(singleMetric.vl)
require.Nil(t, err)
bytes, err = buf.Bytes()
require.Nil(t, err)
metrics, err = parser.Parse(bytes)
require.Nil(t, err)
require.Equal(t, []telegraf.Metric{}, metrics)
// Wrong password error
buf, err = writeValueList(singleMetric.vl)
require.Nil(t, err)
buf.Sign("x", "y")
bytes, err = buf.Bytes()
require.Nil(t, err)
metrics, err = parser.Parse(bytes)
require.NotNil(t, err)
}
func TestParse_EncryptSecurityLevel(t *testing.T) {
parser := &CollectdParser{}
popts := &network.ParseOpts{
SecurityLevel: network.Encrypt,
PasswordLookup: &AuthMap{
map[string]string{
"user0": "bar",
},
},
}
parser.SetParseOpts(popts)
// Signed data skipped
buf, err := writeValueList(singleMetric.vl)
require.Nil(t, err)
buf.Sign("user0", "bar")
bytes, err := buf.Bytes()
require.Nil(t, err)
metrics, err := parser.Parse(bytes)
require.Nil(t, err)
require.Equal(t, []telegraf.Metric{}, metrics)
// Encrypted data
buf, err = writeValueList(singleMetric.vl)
require.Nil(t, err)
buf.Encrypt("user0", "bar")
bytes, err = buf.Bytes()
require.Nil(t, err)
metrics, err = parser.Parse(bytes)
require.Nil(t, err)
assertEqualMetrics(t, singleMetric.expected, metrics)
// Plain text data skipped
buf, err = writeValueList(singleMetric.vl)
require.Nil(t, err)
bytes, err = buf.Bytes()
require.Nil(t, err)
metrics, err = parser.Parse(bytes)
require.Nil(t, err)
require.Equal(t, []telegraf.Metric{}, metrics)
// Wrong password error
buf, err = writeValueList(singleMetric.vl)
require.Nil(t, err)
buf.Sign("x", "y")
bytes, err = buf.Bytes()
require.Nil(t, err)
metrics, err = parser.Parse(bytes)
require.NotNil(t, err)
}
func TestParseLine(t *testing.T) {
buf, err := writeValueList(singleMetric.vl)
require.Nil(t, err)
bytes, err := buf.Bytes()
require.Nil(t, err)
parser, err := NewCollectdParser("", "", []string{})
require.Nil(t, err)
metric, err := parser.ParseLine(string(bytes))
require.Nil(t, err)
assertEqualMetrics(t, singleMetric.expected, []telegraf.Metric{metric})
}
func writeValueList(valueLists []api.ValueList) (*network.Buffer, error) {
buffer := network.NewBuffer(0)
ctx := context.Background()
for _, vl := range valueLists {
err := buffer.Write(ctx, &vl)
if err != nil {
return nil, err
}
}
return buffer, nil
}
func assertEqualMetrics(t *testing.T, expected []metricData, received []telegraf.Metric) {
require.Equal(t, len(expected), len(received))
for i, m := range received {
require.Equal(t, expected[i].name, m.Name())
require.Equal(t, expected[i].tags, m.Tags())
require.Equal(t, expected[i].fields, m.Fields())
}
}

View File

@ -5,6 +5,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers/collectd"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json"
@ -53,6 +54,13 @@ type Config struct {
// MetricName applies to JSON & value. This will be the name of the measurement.
MetricName string
// Authentication file for collectd
CollectdAuthFile string
// One of none (default), sign, or encrypt
CollectdSecurityLevel string
// Dataset specification for collectd
CollectdTypesDB []string
// DataType only applies to value, this will be the type to parse value to
DataType string
@ -78,6 +86,9 @@ func NewParser(config *Config) (Parser, error) {
case "graphite":
parser, err = NewGraphiteParser(config.Separator,
config.Templates, config.DefaultTags)
case "collectd":
parser, err = NewCollectdParser(config.CollectdAuthFile,
config.CollectdSecurityLevel, config.CollectdTypesDB)
default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
}
@ -124,3 +135,11 @@ func NewValueParser(
DefaultTags: defaultTags,
}, nil
}
func NewCollectdParser(
authFile string,
securityLevel string,
typesDB []string,
) (Parser, error) {
return collectd.NewCollectdParser(authFile, securityLevel, typesDB)
}