still unfinished logparser changes

This commit is contained in:
Max U 2018-07-03 16:27:11 -07:00
parent bfc13a744b
commit 67db14332f
2 changed files with 34 additions and 73 deletions

View File

@ -3,9 +3,7 @@
package logparser package logparser
import ( import (
"fmt"
"log" "log"
"reflect"
"strings" "strings"
"sync" "sync"
@ -35,9 +33,10 @@ type logEntry struct {
// LogParserPlugin is the primary struct to implement the interface for logparser plugin // LogParserPlugin is the primary struct to implement the interface for logparser plugin
type LogParserPlugin struct { type LogParserPlugin struct {
Files []string Files []string
FromBeginning bool FromBeginning bool
WatchMethod string WatchMethod string
MeasurementName string `toml:"measurement"`
tailers map[string]*tail.Tail tailers map[string]*tail.Tail
lines chan logEntry lines chan logEntry
@ -48,7 +47,7 @@ type LogParserPlugin struct {
sync.Mutex sync.Mutex
GrokParser *parsers.Parser `toml:"grok"` GrokParser parsers.Parser `toml:"grok"`
Patterns []string Patterns []string
NamedPatterns []string NamedPatterns []string
@ -146,38 +145,11 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
DataFormat: "grok", DataFormat: "grok",
} }
var err error var err error
*l.GrokParser, err = parsers.NewParser(config) l.GrokParser, err = parsers.NewParser(config)
if err != nil { if err != nil {
return err return err
} }
s := reflect.ValueOf(l).Elem()
for i := 0; i < s.NumField(); i++ {
f := s.Field(i)
log.Printf("got field %v: %v", i, f)
if !f.CanInterface() {
continue
}
if lpPlugin, ok := f.Interface().(parsers.Parser); ok {
if reflect.ValueOf(lpPlugin).IsNil() {
continue
}
l.parsers = append(l.parsers, lpPlugin)
}
}
if len(l.parsers) == 0 {
return fmt.Errorf("logparser input plugin: no parser defined")
}
// //compile log parser patterns:
// for _, parser := range l.parsers {
// if err := parser.Compile(); err != nil {
// return err
// }
// }
l.wg.Add(1) l.wg.Add(1)
go l.parser() go l.parser()
@ -284,18 +256,17 @@ func (l *LogParserPlugin) parser() {
continue continue
} }
} }
for _, parser := range l.parsers { m, err = l.GrokParser.ParseLine(entry.line)
m, err = parser.ParseLine(entry.line) if err == nil {
if err == nil { if m != nil {
if m != nil { tags := m.Tags()
tags := m.Tags() tags["path"] = entry.path
tags["path"] = entry.path l.acc.AddFields(l.MeasurementName, m.Fields(), tags, m.Time())
l.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
}
} else {
log.Println("E! Error parsing log line: " + err.Error())
} }
} else {
log.Println("E! Error parsing log line: " + err.Error())
} }
} }
} }

View File

@ -2,6 +2,7 @@ package logparser
import ( import (
"io/ioutil" "io/ioutil"
"log"
"os" "os"
"runtime" "runtime"
"strings" "strings"
@ -36,7 +37,7 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
logparser := &LogParserPlugin{ logparser := &LogParserPlugin{
FromBeginning: true, FromBeginning: true,
Files: []string{thisdir + "grok/testdata/*.log"}, Files: []string{thisdir + "grok/testdata/*.log"},
GrokParser: &p, GrokParser: p,
} }
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
@ -46,26 +47,23 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
func TestGrokParseLogFiles(t *testing.T) { func TestGrokParseLogFiles(t *testing.T) {
thisdir := getCurrentDir() thisdir := getCurrentDir()
c := parsers.Config{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
DataFormat: "grok",
}
p, _ := parsers.NewParser(&c)
logparser := &LogParserPlugin{ logparser := &LogParserPlugin{
FromBeginning: true, Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
Files: []string{thisdir + "grok/testdata/*.log"}, CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
GrokParser: &p, FromBeginning: true,
Files: []string{thisdir + "grok/testdata/*.log"},
MeasurementName: "logparser_grok",
} }
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
assert.NoError(t, logparser.Start(&acc)) assert.NoError(t, logparser.Start(&acc))
//acc.Wait(2) acc.Wait(2)
logparser.Stop() logparser.Stop()
log.Printf("metric[0] %v, tags: %v, fields: %v", acc.Metrics[0].Measurement, acc.Metrics[0].Tags, acc.Metrics[0].Fields)
acc.AssertContainsTaggedFields(t, "logparser_grok", acc.AssertContainsTaggedFields(t, "logparser_grok",
map[string]interface{}{ map[string]interface{}{
"clientip": "192.168.1.1", "clientip": "192.168.1.1",
@ -95,17 +93,13 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
thisdir := getCurrentDir() thisdir := getCurrentDir()
c := &parsers.Config{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
DataFormat: "grok",
}
p, err := parsers.NewParser(c)
logparser := &LogParserPlugin{ logparser := &LogParserPlugin{
FromBeginning: true, FromBeginning: true,
Files: []string{emptydir + "/*.log"}, Files: []string{emptydir + "/*.log"},
GrokParser: &p, Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
MeasurementName: "logparser_grok",
} }
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
@ -136,17 +130,13 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) {
// pattern available for test_b.log // pattern available for test_b.log
func TestGrokParseLogFilesOneBad(t *testing.T) { func TestGrokParseLogFilesOneBad(t *testing.T) {
thisdir := getCurrentDir() thisdir := getCurrentDir()
c := &parsers.Config{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_BAD}"},
CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
DataFormat: "grok",
}
p, _ := parsers.NewParser(c)
logparser := &LogParserPlugin{ logparser := &LogParserPlugin{
FromBeginning: true, FromBeginning: true,
Files: []string{thisdir + "grok/testdata/test_a.log"}, Files: []string{thisdir + "grok/testdata/test_a.log"},
GrokParser: &p, Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_BAD}"},
CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
MeasurementName: "logparser_grok",
} }
acc := testutil.Accumulator{} acc := testutil.Accumulator{}