Add logfmt parser (#4539)
This commit is contained in:
parent
430d7103da
commit
e893dc38a2
|
@ -10,6 +10,7 @@ Telegraf is able to parse the following input data formats into metrics:
|
|||
1. [Collectd](#collectd)
|
||||
1. [Dropwizard](#dropwizard)
|
||||
1. [Grok](#grok)
|
||||
1. [Logfmt](#logfmt)
|
||||
1. [Wavefront](#wavefront)
|
||||
|
||||
Telegraf metrics, like InfluxDB
|
||||
|
@ -882,6 +883,22 @@ the file output will only print once per `flush_interval`.
|
|||
- If successful, add the next token, update the pattern and retest.
|
||||
- Continue one token at a time until the entire line is successfully parsed.
|
||||
|
||||
# Logfmt
|
||||
This parser implements the logfmt format by extracting and converting key-value pairs from log text in the form `<key>=<value>`.
|
||||
At the moment, the plugin will produce one metric per line and all keys
|
||||
are added as fields.
|
||||
A typical log
|
||||
```
|
||||
method=GET host=influxdata.org ts=2018-07-24T19:43:40.275Z
|
||||
connect=4ms service=8ms status=200 bytes=1653
|
||||
```
|
||||
will be converted into
|
||||
```
|
||||
logfmt method="GET",host="influxdata.org",ts="2018-07-24T19:43:40.275Z",connect="4ms",service="8ms",status=200i,bytes=1653i
|
||||
|
||||
```
|
||||
Additional information about the logfmt format can be found [here](https://brandur.org/logfmt).
|
||||
|
||||
# Wavefront:
|
||||
|
||||
Wavefront Data Format is metrics are parsed directly into Telegraf metrics.
|
||||
|
|
|
@ -0,0 +1,111 @@
|
|||
package logfmt
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-logfmt/logfmt"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrNoMetric = fmt.Errorf("no metric in line")
|
||||
)
|
||||
|
||||
// Parser decodes logfmt formatted messages into metrics.
|
||||
type Parser struct {
|
||||
MetricName string
|
||||
DefaultTags map[string]string
|
||||
Now func() time.Time
|
||||
}
|
||||
|
||||
// NewParser creates a parser.
|
||||
func NewParser(metricName string, defaultTags map[string]string) *Parser {
|
||||
return &Parser{
|
||||
MetricName: metricName,
|
||||
DefaultTags: defaultTags,
|
||||
Now: time.Now,
|
||||
}
|
||||
}
|
||||
|
||||
// Parse converts a slice of bytes in logfmt format to metrics.
|
||||
func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) {
|
||||
reader := bytes.NewReader(b)
|
||||
decoder := logfmt.NewDecoder(reader)
|
||||
metrics := make([]telegraf.Metric, 0)
|
||||
for {
|
||||
ok := decoder.ScanRecord()
|
||||
if !ok {
|
||||
err := decoder.Err()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
break
|
||||
}
|
||||
fields := make(map[string]interface{})
|
||||
for decoder.ScanKeyval() {
|
||||
if string(decoder.Value()) == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
//type conversions
|
||||
value := string(decoder.Value())
|
||||
if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
|
||||
fields[string(decoder.Key())] = iValue
|
||||
} else if fValue, err := strconv.ParseFloat(value, 64); err == nil {
|
||||
fields[string(decoder.Key())] = fValue
|
||||
} else if bValue, err := strconv.ParseBool(value); err == nil {
|
||||
fields[string(decoder.Key())] = bValue
|
||||
} else {
|
||||
fields[string(decoder.Key())] = value
|
||||
}
|
||||
}
|
||||
if len(fields) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
m, err := metric.New(p.MetricName, map[string]string{}, fields, p.Now())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
metrics = append(metrics, m)
|
||||
}
|
||||
p.applyDefaultTags(metrics)
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
// ParseLine converts a single line of text in logfmt format to metrics.
|
||||
func (p *Parser) ParseLine(s string) (telegraf.Metric, error) {
|
||||
metrics, err := p.Parse([]byte(s))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(metrics) < 1 {
|
||||
return nil, ErrNoMetric
|
||||
}
|
||||
return metrics[0], nil
|
||||
}
|
||||
|
||||
// SetDefaultTags adds tags to the metrics outputs of Parse and ParseLine.
|
||||
func (p *Parser) SetDefaultTags(tags map[string]string) {
|
||||
p.DefaultTags = tags
|
||||
}
|
||||
|
||||
func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) {
|
||||
if len(p.DefaultTags) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for _, m := range metrics {
|
||||
for k, v := range p.DefaultTags {
|
||||
if !m.HasTag(k) {
|
||||
m.AddTag(k, v)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,231 @@
|
|||
package logfmt
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func MustMetric(t *testing.T, m *testutil.Metric) telegraf.Metric {
|
||||
t.Helper()
|
||||
v, err := metric.New(m.Measurement, m.Tags, m.Fields, m.Time)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func TestParse(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
measurement string
|
||||
now func() time.Time
|
||||
bytes []byte
|
||||
want []testutil.Metric
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "no bytes returns no metrics",
|
||||
now: func() time.Time { return time.Unix(0, 0) },
|
||||
want: []testutil.Metric{},
|
||||
},
|
||||
{
|
||||
name: "test without trailing end",
|
||||
bytes: []byte("foo=\"bar\""),
|
||||
now: func() time.Time { return time.Unix(0, 0) },
|
||||
measurement: "testlog",
|
||||
want: []testutil.Metric{
|
||||
testutil.Metric{
|
||||
Measurement: "testlog",
|
||||
Tags: map[string]string{},
|
||||
Fields: map[string]interface{}{
|
||||
"foo": "bar",
|
||||
},
|
||||
Time: time.Unix(0, 0),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "test with trailing end",
|
||||
bytes: []byte("foo=\"bar\"\n"),
|
||||
now: func() time.Time { return time.Unix(0, 0) },
|
||||
measurement: "testlog",
|
||||
want: []testutil.Metric{
|
||||
testutil.Metric{
|
||||
Measurement: "testlog",
|
||||
Tags: map[string]string{},
|
||||
Fields: map[string]interface{}{
|
||||
"foo": "bar",
|
||||
},
|
||||
Time: time.Unix(0, 0),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "logfmt parser returns all the fields",
|
||||
bytes: []byte(`ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`),
|
||||
now: func() time.Time { return time.Unix(0, 0) },
|
||||
measurement: "testlog",
|
||||
want: []testutil.Metric{
|
||||
testutil.Metric{
|
||||
Measurement: "testlog",
|
||||
Tags: map[string]string{},
|
||||
Fields: map[string]interface{}{
|
||||
"lvl": "info",
|
||||
"msg": "http request",
|
||||
"method": "POST",
|
||||
"ts": "2018-07-24T19:43:40.275Z",
|
||||
},
|
||||
Time: time.Unix(0, 0),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "logfmt parser parses every line",
|
||||
bytes: []byte("ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST\nparent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000"),
|
||||
now: func() time.Time { return time.Unix(0, 0) },
|
||||
measurement: "testlog",
|
||||
want: []testutil.Metric{
|
||||
testutil.Metric{
|
||||
Measurement: "testlog",
|
||||
Tags: map[string]string{},
|
||||
Fields: map[string]interface{}{
|
||||
"lvl": "info",
|
||||
"msg": "http request",
|
||||
"method": "POST",
|
||||
"ts": "2018-07-24T19:43:40.275Z",
|
||||
},
|
||||
Time: time.Unix(0, 0),
|
||||
},
|
||||
testutil.Metric{
|
||||
Measurement: "testlog",
|
||||
Tags: map[string]string{},
|
||||
Fields: map[string]interface{}{
|
||||
"parent_id": "088876RL000",
|
||||
"duration": 7.45,
|
||||
"log_id": "09R4e4Rl000",
|
||||
},
|
||||
Time: time.Unix(0, 0),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "keys without = or values are ignored",
|
||||
now: func() time.Time { return time.Unix(0, 0) },
|
||||
bytes: []byte(`i am no data.`),
|
||||
want: []testutil.Metric{},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "keys without values are ignored",
|
||||
now: func() time.Time { return time.Unix(0, 0) },
|
||||
bytes: []byte(`foo="" bar=`),
|
||||
want: []testutil.Metric{},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "unterminated quote produces error",
|
||||
now: func() time.Time { return time.Unix(0, 0) },
|
||||
measurement: "testlog",
|
||||
bytes: []byte(`bar=baz foo="bar`),
|
||||
want: []testutil.Metric{},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "malformed key",
|
||||
now: func() time.Time { return time.Unix(0, 0) },
|
||||
measurement: "testlog",
|
||||
bytes: []byte(`"foo=" bar=baz`),
|
||||
want: []testutil.Metric{},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
l := Parser{
|
||||
MetricName: tt.measurement,
|
||||
Now: tt.now,
|
||||
}
|
||||
got, err := l.Parse(tt.bytes)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
require.Equal(t, len(tt.want), len(got))
|
||||
for i, m := range got {
|
||||
testutil.MustEqual(t, m, tt.want[i])
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseLine(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
s string
|
||||
measurement string
|
||||
now func() time.Time
|
||||
want testutil.Metric
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "No Metric In line",
|
||||
now: func() time.Time { return time.Unix(0, 0) },
|
||||
want: testutil.Metric{},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "Log parser fmt returns all fields",
|
||||
now: func() time.Time { return time.Unix(0, 0) },
|
||||
measurement: "testlog",
|
||||
s: `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000`,
|
||||
want: testutil.Metric{
|
||||
Measurement: "testlog",
|
||||
Fields: map[string]interface{}{
|
||||
"ts": "2018-07-24T19:43:35.207268Z",
|
||||
"lvl": int64(5),
|
||||
"msg": "Write failed",
|
||||
"log_id": "09R4e4Rl000",
|
||||
},
|
||||
Tags: map[string]string{},
|
||||
Time: time.Unix(0, 0),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ParseLine only returns metrics from first string",
|
||||
now: func() time.Time { return time.Unix(0, 0) },
|
||||
measurement: "testlog",
|
||||
s: "ts=2018-07-24T19:43:35.207268Z lvl=5 msg=\"Write failed\" log_id=09R4e4Rl000\nmethod=POST parent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000",
|
||||
want: testutil.Metric{
|
||||
Measurement: "testlog",
|
||||
Fields: map[string]interface{}{
|
||||
"ts": "2018-07-24T19:43:35.207268Z",
|
||||
"lvl": int64(5),
|
||||
"msg": "Write failed",
|
||||
"log_id": "09R4e4Rl000",
|
||||
},
|
||||
Tags: map[string]string{},
|
||||
Time: time.Unix(0, 0),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
l := Parser{
|
||||
MetricName: tt.measurement,
|
||||
Now: tt.now,
|
||||
}
|
||||
got, err := l.ParseLine(tt.s)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Fatalf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
if got != nil {
|
||||
testutil.MustEqual(t, got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/influxdata/telegraf/plugins/parsers/grok"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/json"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/logfmt"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/nagios"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/value"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/wavefront"
|
||||
|
@ -142,6 +143,8 @@ func NewParser(config *Config) (Parser, error) {
|
|||
config.GrokCustomPatterns,
|
||||
config.GrokCustomPatternFiles,
|
||||
config.GrokTimeZone)
|
||||
case "logfmt":
|
||||
parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags)
|
||||
default:
|
||||
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
|
||||
}
|
||||
|
@ -242,6 +245,11 @@ func NewDropwizardParser(
|
|||
return parser, err
|
||||
}
|
||||
|
||||
// NewLogFmtParser returns a logfmt parser with the default options.
|
||||
func NewLogFmtParser(metricName string, defaultTags map[string]string) (Parser, error) {
|
||||
return logfmt.NewParser(metricName, defaultTags), nil
|
||||
}
|
||||
|
||||
func NewWavefrontParser(defaultTags map[string]string) (Parser, error) {
|
||||
return wavefront.NewWavefrontParser(defaultTags), nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue