From 697381d4b55b3eefa2aed697d38a412c1e48b438 Mon Sep 17 00:00:00 2001 From: Leonardo Di Donato Date: Tue, 18 Dec 2018 19:54:38 +0100 Subject: [PATCH] Add support for non-transparent framing of syslog messages (#5148) --- Gopkg.lock | 25 +- Gopkg.toml | 2 +- plugins/inputs/syslog/README.md | 24 +- plugins/inputs/syslog/commons_test.go | 62 ++++ plugins/inputs/syslog/framing.go | 64 ++++ plugins/inputs/syslog/framing_test.go | 37 +++ plugins/inputs/syslog/nontransparent_test.go | 308 ++++++++++++++++++ ...{rfc5425_test.go => octetcounting_test.go} | 91 ++---- plugins/inputs/syslog/rfc5426_test.go | 23 +- plugins/inputs/syslog/syslog.go | 85 +++-- 10 files changed, 601 insertions(+), 120 deletions(-) create mode 100644 plugins/inputs/syslog/commons_test.go create mode 100644 plugins/inputs/syslog/framing.go create mode 100644 plugins/inputs/syslog/framing_test.go create mode 100644 plugins/inputs/syslog/nontransparent_test.go rename plugins/inputs/syslog/{rfc5425_test.go => octetcounting_test.go} (84%) diff --git a/Gopkg.lock b/Gopkg.lock index d043bccd0..8fd3e81c4 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -583,15 +583,17 @@ version = "v0.8.1" [[projects]] - digest = "1:a39ef049cdeee03a57b132e7d60e32711b9d949c78458da78e702d9864c54369" + digest = "1:824c4cd143ee15735f1c75d9072aad46e51dd27a4ef8bf6ce723a138265b7fb3" name = "github.com/influxdata/go-syslog" packages = [ + ".", + "nontransparent", + "octetcounting", "rfc5424", - "rfc5425", ] pruneopts = "" - revision = "eecd51df3ad85464a2bab9b7d3a45bc1e299059e" - version = "v1.0.1" + revision = "0cd00a9f0a5e5607d5ef9a294c260f77a74e3b5a" + version = "v2.0.0" [[projects]] branch = "master" @@ -689,6 +691,17 @@ pruneopts = "" revision = "b84e30acd515aadc4b783ad4ff83aff3299bdfe0" +[[projects]] + branch = "develop" + digest = "1:3e66a61a57bbbe832c338edb3a623be0deb3dec650c2f3515149658898287e37" + name = "github.com/leodido/ragel-machinery" + packages = [ + ".", + "parser", + ] + pruneopts = "" + revision = "299bdde78165d4ca4bc7d064d8d6a4f39ac6de8c" + [[projects]] branch = "master" digest = "1:7e9956922e349af0190afa0b6621befcd201072679d8e51a9047ff149f2afe93" @@ -1478,8 +1491,10 @@ "github.com/google/go-cmp/cmp", "github.com/gorilla/mux", "github.com/hashicorp/consul/api", + "github.com/influxdata/go-syslog", + "github.com/influxdata/go-syslog/nontransparent", + "github.com/influxdata/go-syslog/octetcounting", "github.com/influxdata/go-syslog/rfc5424", - "github.com/influxdata/go-syslog/rfc5425", "github.com/influxdata/tail", "github.com/influxdata/toml", "github.com/influxdata/toml/ast", diff --git a/Gopkg.toml b/Gopkg.toml index 80df324dc..3b5c1b917 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -72,7 +72,7 @@ [[constraint]] name = "github.com/influxdata/go-syslog" - version = "1.0.1" + version = "2.0.0" [[constraint]] name = "github.com/influxdata/tail" diff --git a/plugins/inputs/syslog/README.md b/plugins/inputs/syslog/README.md index ad9b9b572..8183d2c90 100644 --- a/plugins/inputs/syslog/README.md +++ b/plugins/inputs/syslog/README.md @@ -2,7 +2,8 @@ The syslog plugin listens for syslog messages transmitted over [UDP](https://tools.ietf.org/html/rfc5426) or -[TCP](https://tools.ietf.org/html/rfc5425). +[TCP](https://tools.ietf.org/html/rfc6587) or +[TLS](https://tools.ietf.org/html/rfc5425), with or without the octet counting framing. Syslog messages should be formatted according to [RFC 5424](https://tools.ietf.org/html/rfc5424). @@ -37,6 +38,16 @@ Syslog messages should be formatted according to ## 0 means unlimited. # read_timeout = "5s" + ## The framing technique with which it is expected that messages are transported (default = "octet-counting"). + ## Whether the messages come using the octect-counting (RFC5425#section-4.3.1, RFC6587#section-3.4.1), + ## or the non-transparent framing technique (RFC6587#section-3.4.2). + ## Must be one of "octect-counting", "non-transparent". + # framing = "octet-counting" + + ## The trailer to be expected in case of non-trasparent framing (default = "LF"). + ## Must be one of "LF", or "NUL". + # trailer = "LF" + ## Whether to parse in best effort mode or not (default = false). ## By default best effort parsing is off. # best_effort = false @@ -49,11 +60,18 @@ Syslog messages should be formatted according to # sdparam_separator = "_" ``` -#### Best Effort +#### Message transport + +The `framing` option only applies to streams. It governs the way we expect to receive messages within the stream. +Namely, with the [`"octet counting"`](https://tools.ietf.org/html/rfc5425#section-4.3) technique (default) or with the [`"non-transparent"`](https://tools.ietf.org/html/rfc6587#section-3.4.2) framing. + +The `trailer` option only applies when `framing` option is `"non-transparent"`. It must have one of the following values: `"LF"` (default), or `"NUL"`. + +#### Best effort The [`best_effort`](https://github.com/influxdata/go-syslog#best-effort-mode) option instructs the parser to extract partial but valid info from syslog -messages. If unset only full messages will be collected. +messages. If unset only full messages will be collected. #### Rsyslog Integration diff --git a/plugins/inputs/syslog/commons_test.go b/plugins/inputs/syslog/commons_test.go new file mode 100644 index 000000000..f55d080a1 --- /dev/null +++ b/plugins/inputs/syslog/commons_test.go @@ -0,0 +1,62 @@ +package syslog + +import ( + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/testutil" + "time" +) + +var ( + pki = testutil.NewPKI("../../../testutil/pki") +) + +type testCasePacket struct { + name string + data []byte + wantBestEffort *testutil.Metric + wantStrict *testutil.Metric + werr bool +} + +type testCaseStream struct { + name string + data []byte + wantBestEffort []testutil.Metric + wantStrict []testutil.Metric + werr int // how many errors we expect in the strict mode? +} + +func newUDPSyslogReceiver(address string, bestEffort bool) *Syslog { + return &Syslog{ + Address: address, + now: func() time.Time { + return defaultTime + }, + BestEffort: bestEffort, + Separator: "_", + } +} + +func newTCPSyslogReceiver(address string, keepAlive *internal.Duration, maxConn int, bestEffort bool, f Framing) *Syslog { + d := &internal.Duration{ + Duration: defaultReadTimeout, + } + s := &Syslog{ + Address: address, + now: func() time.Time { + return defaultTime + }, + Framing: f, + ReadTimeout: d, + BestEffort: bestEffort, + Separator: "_", + } + if keepAlive != nil { + s.KeepAlivePeriod = keepAlive + } + if maxConn > 0 { + s.MaxConnections = maxConn + } + + return s +} diff --git a/plugins/inputs/syslog/framing.go b/plugins/inputs/syslog/framing.go new file mode 100644 index 000000000..6edfc7058 --- /dev/null +++ b/plugins/inputs/syslog/framing.go @@ -0,0 +1,64 @@ +package syslog + +import ( + "fmt" + "strings" +) + +// Framing represents the framing technique we expect the messages to come. +type Framing int + +const ( + // OctetCounting indicates the transparent framing technique for syslog transport. + OctetCounting Framing = iota + // NonTransparent indicates the non-transparent framing technique for syslog transport. + NonTransparent +) + +func (f Framing) String() string { + switch f { + case OctetCounting: + return "OCTET-COUNTING" + case NonTransparent: + return "NON-TRANSPARENT" + } + return "" +} + +// UnmarshalTOML implements ability to unmarshal framing from TOML files. +func (f *Framing) UnmarshalTOML(data []byte) (err error) { + return f.UnmarshalText(data) +} + +// UnmarshalText implements encoding.TextUnmarshaler +func (f *Framing) UnmarshalText(data []byte) (err error) { + s := string(data) + switch strings.ToUpper(s) { + case `OCTET-COUNTING`: + fallthrough + case `"OCTET-COUNTING"`: + fallthrough + case `'OCTET-COUNTING'`: + *f = OctetCounting + return + + case `NON-TRANSPARENT`: + fallthrough + case `"NON-TRANSPARENT"`: + fallthrough + case `'NON-TRANSPARENT'`: + *f = NonTransparent + return + } + *f = -1 + return fmt.Errorf("unknown framing") +} + +// MarshalText implements encoding.TextMarshaler +func (f Framing) MarshalText() ([]byte, error) { + s := f.String() + if s != "" { + return []byte(s), nil + } + return nil, fmt.Errorf("unknown framing") +} diff --git a/plugins/inputs/syslog/framing_test.go b/plugins/inputs/syslog/framing_test.go new file mode 100644 index 000000000..1442eba7f --- /dev/null +++ b/plugins/inputs/syslog/framing_test.go @@ -0,0 +1,37 @@ +package syslog + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestFraming(t *testing.T) { + var f1 Framing + f1.UnmarshalTOML([]byte(`"non-transparent"`)) + assert.Equal(t, NonTransparent, f1) + + var f2 Framing + f2.UnmarshalTOML([]byte(`non-transparent`)) + assert.Equal(t, NonTransparent, f2) + + var f3 Framing + f3.UnmarshalTOML([]byte(`'non-transparent'`)) + assert.Equal(t, NonTransparent, f3) + + var f4 Framing + f4.UnmarshalTOML([]byte(`"octet-counting"`)) + assert.Equal(t, OctetCounting, f4) + + var f5 Framing + f5.UnmarshalTOML([]byte(`octet-counting`)) + assert.Equal(t, OctetCounting, f5) + + var f6 Framing + f6.UnmarshalTOML([]byte(`'octet-counting'`)) + assert.Equal(t, OctetCounting, f6) + + var f7 Framing + err := f7.UnmarshalTOML([]byte(`nope`)) + assert.Equal(t, Framing(-1), f7) + assert.Error(t, err) +} diff --git a/plugins/inputs/syslog/nontransparent_test.go b/plugins/inputs/syslog/nontransparent_test.go new file mode 100644 index 000000000..1dea84144 --- /dev/null +++ b/plugins/inputs/syslog/nontransparent_test.go @@ -0,0 +1,308 @@ +package syslog + +import ( + "crypto/tls" + "io/ioutil" + "net" + "os" + "path/filepath" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func getTestCasesForNonTransparent() []testCaseStream { + testCases := []testCaseStream{ + { + name: "1st/avg/ok", + data: []byte(`<29>1 2016-02-21T04:32:57+00:00 web1 someservice 2341 2 [origin][meta sequence="14125553" service="someservice"] "GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`), + wantStrict: []testutil.Metric{ + { + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "timestamp": time.Unix(1456029177, 0).UnixNano(), + "procid": "2341", + "msgid": "2", + "message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`, + "origin": true, + "meta_sequence": "14125553", + "meta_service": "someservice", + "severity_code": 5, + "facility_code": 3, + }, + Tags: map[string]string{ + "severity": "notice", + "facility": "daemon", + "hostname": "web1", + "appname": "someservice", + }, + Time: defaultTime, + }, + }, + wantBestEffort: []testutil.Metric{ + { + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(1), + "timestamp": time.Unix(1456029177, 0).UnixNano(), + "procid": "2341", + "msgid": "2", + "message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`, + "origin": true, + "meta_sequence": "14125553", + "meta_service": "someservice", + "severity_code": 5, + "facility_code": 3, + }, + Tags: map[string]string{ + "severity": "notice", + "facility": "daemon", + "hostname": "web1", + "appname": "someservice", + }, + Time: defaultTime, + }, + }, + werr: 1, + }, + { + name: "1st/min/ok//2nd/min/ok", + data: []byte("<1>2 - - - - - -\n<4>11 - - - - - -\n"), + wantStrict: []testutil.Metric{ + { + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(2), + "severity_code": 1, + "facility_code": 0, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: defaultTime, + }, + { + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(11), + "severity_code": 4, + "facility_code": 0, + }, + Tags: map[string]string{ + "severity": "warning", + "facility": "kern", + }, + Time: defaultTime.Add(time.Nanosecond), + }, + }, + wantBestEffort: []testutil.Metric{ + { + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(2), + "severity_code": 1, + "facility_code": 0, + }, + Tags: map[string]string{ + "severity": "alert", + "facility": "kern", + }, + Time: defaultTime, + }, + { + Measurement: "syslog", + Fields: map[string]interface{}{ + "version": uint16(11), + "severity_code": 4, + "facility_code": 0, + }, + Tags: map[string]string{ + "severity": "warning", + "facility": "kern", + }, + Time: defaultTime.Add(time.Nanosecond), + }, + }, + }, + } + return testCases +} + +func testStrictNonTransparent(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *internal.Duration) { + for _, tc := range getTestCasesForNonTransparent() { + t.Run(tc.name, func(t *testing.T) { + // Creation of a strict mode receiver + receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false, NonTransparent) + require.NotNil(t, receiver) + if wantTLS { + receiver.ServerConfig = *pki.TLSServerConfig() + } + require.Equal(t, receiver.KeepAlivePeriod, keepAlive) + acc := &testutil.Accumulator{} + require.NoError(t, receiver.Start(acc)) + defer receiver.Stop() + + // Connect + var conn net.Conn + var err error + if wantTLS { + config, e := pki.TLSClientConfig().TLSConfig() + require.NoError(t, e) + config.ServerName = "localhost" + conn, err = tls.Dial(protocol, address, config) + } else { + conn, err = net.Dial(protocol, address) + defer conn.Close() + } + require.NotNil(t, conn) + require.NoError(t, err) + + // Clear + acc.ClearMetrics() + acc.Errors = make([]error, 0) + + // Write + _, err = conn.Write(tc.data) + conn.Close() + require.NoError(t, err) + + // Wait that the the number of data points is accumulated + // Since the receiver is running concurrently + if tc.wantStrict != nil { + acc.Wait(len(tc.wantStrict)) + } + + // Wait the parsing error + acc.WaitError(tc.werr) + + // Verify + if len(acc.Errors) != tc.werr { + t.Fatalf("Got unexpected errors. want error = %v, errors = %v\n", tc.werr, acc.Errors) + } + var got []testutil.Metric + for _, metric := range acc.Metrics { + got = append(got, *metric) + } + if !cmp.Equal(tc.wantStrict, got) { + t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(tc.wantStrict, got)) + } + }) + } +} + +func testBestEffortNonTransparent(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *internal.Duration) { + for _, tc := range getTestCasesForNonTransparent() { + t.Run(tc.name, func(t *testing.T) { + // Creation of a best effort mode receiver + receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true, NonTransparent) + require.NotNil(t, receiver) + if wantTLS { + receiver.ServerConfig = *pki.TLSServerConfig() + } + require.Equal(t, receiver.KeepAlivePeriod, keepAlive) + acc := &testutil.Accumulator{} + require.NoError(t, receiver.Start(acc)) + defer receiver.Stop() + + // Connect + var conn net.Conn + var err error + if wantTLS { + config, e := pki.TLSClientConfig().TLSConfig() + require.NoError(t, e) + config.ServerName = "localhost" + conn, err = tls.Dial(protocol, address, config) + } else { + conn, err = net.Dial(protocol, address) + } + require.NotNil(t, conn) + require.NoError(t, err) + + // Clear + acc.ClearMetrics() + acc.Errors = make([]error, 0) + + // Write + _, err = conn.Write(tc.data) + require.NoError(t, err) + conn.Close() + + // Wait that the the number of data points is accumulated + // Since the receiver is running concurrently + if tc.wantBestEffort != nil { + acc.Wait(len(tc.wantBestEffort)) + } + + // Verify + var got []testutil.Metric + for _, metric := range acc.Metrics { + got = append(got, *metric) + } + if !cmp.Equal(tc.wantBestEffort, got) { + t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(tc.wantBestEffort, got)) + } + }) + } +} + +func TestNonTransparentStrict_tcp(t *testing.T) { + testStrictNonTransparent(t, "tcp", address, false, nil) +} + +func TestNonTransparentBestEffort_tcp(t *testing.T) { + testBestEffortNonTransparent(t, "tcp", address, false, nil) +} + +func TestNonTransparentStrict_tcp_tls(t *testing.T) { + testStrictNonTransparent(t, "tcp", address, true, nil) +} + +func TestNonTransparentBestEffort_tcp_tls(t *testing.T) { + testBestEffortNonTransparent(t, "tcp", address, true, nil) +} + +func TestNonTransparentStrictWithKeepAlive_tcp_tls(t *testing.T) { + testStrictNonTransparent(t, "tcp", address, true, &internal.Duration{Duration: time.Minute}) +} + +func TestNonTransparentStrictWithZeroKeepAlive_tcp_tls(t *testing.T) { + testStrictNonTransparent(t, "tcp", address, true, &internal.Duration{Duration: 0}) +} + +func TestNonTransparentStrict_unix(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "telegraf") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + sock := filepath.Join(tmpdir, "syslog.TestStrict_unix.sock") + testStrictNonTransparent(t, "unix", sock, false, nil) +} + +func TestNonTransparentBestEffort_unix(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "telegraf") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + sock := filepath.Join(tmpdir, "syslog.TestBestEffort_unix.sock") + testBestEffortNonTransparent(t, "unix", sock, false, nil) +} + +func TestNonTransparentStrict_unix_tls(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "telegraf") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + sock := filepath.Join(tmpdir, "syslog.TestStrict_unix_tls.sock") + testStrictNonTransparent(t, "unix", sock, true, nil) +} + +func TestNonTransparentBestEffort_unix_tls(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "telegraf") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + sock := filepath.Join(tmpdir, "syslog.TestBestEffort_unix_tls.sock") + testBestEffortNonTransparent(t, "unix", sock, true, nil) +} diff --git a/plugins/inputs/syslog/rfc5425_test.go b/plugins/inputs/syslog/octetcounting_test.go similarity index 84% rename from plugins/inputs/syslog/rfc5425_test.go rename to plugins/inputs/syslog/octetcounting_test.go index d629024b7..c61805131 100644 --- a/plugins/inputs/syslog/rfc5425_test.go +++ b/plugins/inputs/syslog/octetcounting_test.go @@ -16,20 +16,8 @@ import ( "github.com/stretchr/testify/require" ) -var ( - pki = testutil.NewPKI("../../../testutil/pki") -) - -type testCase5425 struct { - name string - data []byte - wantBestEffort []testutil.Metric - wantStrict []testutil.Metric - werr int // how many errors we expect in the strict mode? -} - -func getTestCasesForRFC5425() []testCase5425 { - testCases := []testCase5425{ +func getTestCasesForOctetCounting() []testCaseStream { + testCases := []testCaseStream{ { name: "1st/avg/ok", data: []byte(`188 <29>1 2016-02-21T04:32:57+00:00 web1 someservice 2341 2 [origin][meta sequence="14125553" service="someservice"] "GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`), @@ -346,34 +334,11 @@ func getTestCasesForRFC5425() []testCase5425 { return testCases } -func newTCPSyslogReceiver(address string, keepAlive *internal.Duration, maxConn int, bestEffort bool) *Syslog { - d := &internal.Duration{ - Duration: defaultReadTimeout, - } - s := &Syslog{ - Address: address, - now: func() time.Time { - return defaultTime - }, - ReadTimeout: d, - BestEffort: bestEffort, - Separator: "_", - } - if keepAlive != nil { - s.KeepAlivePeriod = keepAlive - } - if maxConn > 0 { - s.MaxConnections = maxConn - } - - return s -} - -func testStrictRFC5425(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *internal.Duration) { - for _, tc := range getTestCasesForRFC5425() { +func testStrictOctetCounting(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *internal.Duration) { + for _, tc := range getTestCasesForOctetCounting() { t.Run(tc.name, func(t *testing.T) { // Creation of a strict mode receiver - receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false) + receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false, OctetCounting) require.NotNil(t, receiver) if wantTLS { receiver.ServerConfig = *pki.TLSServerConfig() @@ -431,11 +396,11 @@ func testStrictRFC5425(t *testing.T, protocol string, address string, wantTLS bo } } -func testBestEffortRFC5425(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *internal.Duration) { - for _, tc := range getTestCasesForRFC5425() { +func testBestEffortOctetCounting(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *internal.Duration) { + for _, tc := range getTestCasesForOctetCounting() { t.Run(tc.name, func(t *testing.T) { // Creation of a best effort mode receiver - receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true) + receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true, OctetCounting) require.NotNil(t, receiver) if wantTLS { receiver.ServerConfig = *pki.TLSServerConfig() @@ -486,58 +451,58 @@ func testBestEffortRFC5425(t *testing.T, protocol string, address string, wantTL } } -func TestStrict_tcp(t *testing.T) { - testStrictRFC5425(t, "tcp", address, false, nil) +func TestOctetCountingStrict_tcp(t *testing.T) { + testStrictOctetCounting(t, "tcp", address, false, nil) } -func TestBestEffort_tcp(t *testing.T) { - testBestEffortRFC5425(t, "tcp", address, false, nil) +func TestOctetCountingBestEffort_tcp(t *testing.T) { + testBestEffortOctetCounting(t, "tcp", address, false, nil) } -func TestStrict_tcp_tls(t *testing.T) { - testStrictRFC5425(t, "tcp", address, true, nil) +func TestOctetCountingStrict_tcp_tls(t *testing.T) { + testStrictOctetCounting(t, "tcp", address, true, nil) } -func TestBestEffort_tcp_tls(t *testing.T) { - testBestEffortRFC5425(t, "tcp", address, true, nil) +func TestOctetCountingBestEffort_tcp_tls(t *testing.T) { + testBestEffortOctetCounting(t, "tcp", address, true, nil) } -func TestStrictWithKeepAlive_tcp_tls(t *testing.T) { - testStrictRFC5425(t, "tcp", address, true, &internal.Duration{Duration: time.Minute}) +func TestOctetCountingStrictWithKeepAlive_tcp_tls(t *testing.T) { + testStrictOctetCounting(t, "tcp", address, true, &internal.Duration{Duration: time.Minute}) } -func TestStrictWithZeroKeepAlive_tcp_tls(t *testing.T) { - testStrictRFC5425(t, "tcp", address, true, &internal.Duration{Duration: 0}) +func TestOctetCountingStrictWithZeroKeepAlive_tcp_tls(t *testing.T) { + testStrictOctetCounting(t, "tcp", address, true, &internal.Duration{Duration: 0}) } -func TestStrict_unix(t *testing.T) { +func TestOctetCountingStrict_unix(t *testing.T) { tmpdir, err := ioutil.TempDir("", "telegraf") require.NoError(t, err) defer os.RemoveAll(tmpdir) sock := filepath.Join(tmpdir, "syslog.TestStrict_unix.sock") - testStrictRFC5425(t, "unix", sock, false, nil) + testStrictOctetCounting(t, "unix", sock, false, nil) } -func TestBestEffort_unix(t *testing.T) { +func TestOctetCountingBestEffort_unix(t *testing.T) { tmpdir, err := ioutil.TempDir("", "telegraf") require.NoError(t, err) defer os.RemoveAll(tmpdir) sock := filepath.Join(tmpdir, "syslog.TestBestEffort_unix.sock") - testBestEffortRFC5425(t, "unix", sock, false, nil) + testBestEffortOctetCounting(t, "unix", sock, false, nil) } -func TestStrict_unix_tls(t *testing.T) { +func TestOctetCountingStrict_unix_tls(t *testing.T) { tmpdir, err := ioutil.TempDir("", "telegraf") require.NoError(t, err) defer os.RemoveAll(tmpdir) sock := filepath.Join(tmpdir, "syslog.TestStrict_unix_tls.sock") - testStrictRFC5425(t, "unix", sock, true, nil) + testStrictOctetCounting(t, "unix", sock, true, nil) } -func TestBestEffort_unix_tls(t *testing.T) { +func TestOctetCountingBestEffort_unix_tls(t *testing.T) { tmpdir, err := ioutil.TempDir("", "telegraf") require.NoError(t, err) defer os.RemoveAll(tmpdir) sock := filepath.Join(tmpdir, "syslog.TestBestEffort_unix_tls.sock") - testBestEffortRFC5425(t, "unix", sock, true, nil) + testBestEffortOctetCounting(t, "unix", sock, true, nil) } diff --git a/plugins/inputs/syslog/rfc5426_test.go b/plugins/inputs/syslog/rfc5426_test.go index 67966ed1d..ba856b0ac 100644 --- a/plugins/inputs/syslog/rfc5426_test.go +++ b/plugins/inputs/syslog/rfc5426_test.go @@ -15,16 +15,8 @@ import ( "github.com/stretchr/testify/require" ) -type testCase5426 struct { - name string - data []byte - wantBestEffort *testutil.Metric - wantStrict *testutil.Metric - werr bool -} - -func getTestCasesForRFC5426() []testCase5426 { - testCases := []testCase5426{ +func getTestCasesForRFC5426() []testCasePacket { + testCases := []testCasePacket{ { name: "empty", data: []byte(""), @@ -239,17 +231,6 @@ func getTestCasesForRFC5426() []testCase5426 { return testCases } -func newUDPSyslogReceiver(address string, bestEffort bool) *Syslog { - return &Syslog{ - Address: address, - now: func() time.Time { - return defaultTime - }, - BestEffort: bestEffort, - Separator: "_", - } -} - func testRFC5426(t *testing.T, protocol string, address string, bestEffort bool) { for _, tc := range getTestCasesForRFC5426() { t.Run(tc.name, func(t *testing.T) { diff --git a/plugins/inputs/syslog/syslog.go b/plugins/inputs/syslog/syslog.go index 034e03df2..ab2277caa 100644 --- a/plugins/inputs/syslog/syslog.go +++ b/plugins/inputs/syslog/syslog.go @@ -12,8 +12,10 @@ import ( "time" "unicode" + "github.com/influxdata/go-syslog" + "github.com/influxdata/go-syslog/nontransparent" + "github.com/influxdata/go-syslog/octetcounting" "github.com/influxdata/go-syslog/rfc5424" - "github.com/influxdata/go-syslog/rfc5425" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" tlsConfig "github.com/influxdata/telegraf/internal/tls" @@ -28,8 +30,10 @@ type Syslog struct { tlsConfig.ServerConfig Address string `toml:"server"` KeepAlivePeriod *internal.Duration - ReadTimeout *internal.Duration MaxConnections int + ReadTimeout *internal.Duration + Framing Framing + Trailer nontransparent.TrailerType BestEffort bool Separator string `toml:"sdparam_separator"` @@ -76,6 +80,16 @@ var sampleConfig = ` ## 0 means unlimited. # read_timeout = "5s" + ## The framing technique with which it is expected that messages are transported (default = "octet-counting"). + ## Whether the messages come using the octect-counting (RFC5425#section-4.3.1, RFC6587#section-3.4.1), + ## or the non-transparent framing technique (RFC6587#section-3.4.2). + ## Must be one of "octect-counting", "non-transparent". + # framing = "octet-counting" + + ## The trailer to be expected in case of non-trasparent framing (default = "LF"). + ## Must be one of "LF", or "NUL". + # trailer = "LF" + ## Whether to parse in best effort mode or not (default = false). ## By default best effort parsing is off. # best_effort = false @@ -95,7 +109,7 @@ func (s *Syslog) SampleConfig() string { // Description returns the plugin description func (s *Syslog) Description() string { - return "Accepts syslog messages per RFC5425" + return "Accepts syslog messages following RFC5424 format with transports as per RFC5426, RFC5425, or RFC6587" } // Gather ... @@ -203,7 +217,12 @@ func getAddressParts(a string) (string, string, error) { func (s *Syslog) listenPacket(acc telegraf.Accumulator) { defer s.wg.Done() b := make([]byte, ipMaxPacketSize) - p := rfc5424.NewParser() + var p syslog.Machine + if s.BestEffort { + p = rfc5424.NewParser(rfc5424.WithBestEffort()) + } else { + p = rfc5424.NewParser() + } for { n, _, err := s.udpListener.ReadFrom(b) if err != nil { @@ -213,9 +232,9 @@ func (s *Syslog) listenPacket(acc telegraf.Accumulator) { break } - message, err := p.Parse(b[:n], &s.BestEffort) + message, err := p.Parse(b[:n]) if message != nil { - acc.AddFields("syslog", fields(*message, s), tags(*message), s.time()) + acc.AddFields("syslog", fields(message, s), tags(message), s.time()) } if err != nil { acc.AddError(err) @@ -276,24 +295,38 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) { conn.Close() }() - var p *rfc5425.Parser + var p syslog.Parser - if s.BestEffort { - p = rfc5425.NewParser(conn, rfc5425.WithBestEffort()) - } else { - p = rfc5425.NewParser(conn) - } - - if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 { - conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration)) - } - - p.ParseExecuting(func(r *rfc5425.Result) { + emit := func(r *syslog.Result) { s.store(*r, acc) if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 { conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration)) } - }) + } + + // Create parser options + opts := []syslog.ParserOption{ + syslog.WithListener(emit), + } + if s.BestEffort { + opts = append(opts, syslog.WithBestEffort()) + } + + // Select the parser to use depeding on transport framing + if s.Framing == OctetCounting { + // Octet counting transparent framing + p = octetcounting.NewParser(opts...) + } else { + // Non-transparent framing + opts = append(opts, nontransparent.WithTrailer(s.Trailer)) + p = nontransparent.NewParser(opts...) + } + + p.Parse(conn) + + if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 { + conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration)) + } } func (s *Syslog) setKeepAlive(c *net.TCPConn) error { @@ -310,20 +343,16 @@ func (s *Syslog) setKeepAlive(c *net.TCPConn) error { return c.SetKeepAlivePeriod(s.KeepAlivePeriod.Duration) } -func (s *Syslog) store(res rfc5425.Result, acc telegraf.Accumulator) { +func (s *Syslog) store(res syslog.Result, acc telegraf.Accumulator) { if res.Error != nil { acc.AddError(res.Error) } - if res.MessageError != nil { - acc.AddError(res.MessageError) - } if res.Message != nil { - msg := *res.Message - acc.AddFields("syslog", fields(msg, s), tags(msg), s.time()) + acc.AddFields("syslog", fields(res.Message, s), tags(res.Message), s.time()) } } -func tags(msg rfc5424.SyslogMessage) map[string]string { +func tags(msg syslog.Message) map[string]string { ts := map[string]string{} // Not checking assuming a minimally valid message @@ -341,7 +370,7 @@ func tags(msg rfc5424.SyslogMessage) map[string]string { return ts } -func fields(msg rfc5424.SyslogMessage, s *Syslog) map[string]interface{} { +func fields(msg syslog.Message, s *Syslog) map[string]interface{} { // Not checking assuming a minimally valid message flds := map[string]interface{}{ "version": msg.Version(), @@ -415,6 +444,8 @@ func init() { ReadTimeout: &internal.Duration{ Duration: defaultReadTimeout, }, + Framing: OctetCounting, + Trailer: nontransparent.LF, Separator: "_", }