Add support for non-transparent framing of syslog messages (#5148)

This commit is contained in:
Leonardo Di Donato
2018-12-18 19:54:38 +01:00
committed by Daniel Nelson
parent 1334919224
commit 697381d4b5
10 changed files with 601 additions and 120 deletions

View File

@@ -2,7 +2,8 @@
The syslog plugin listens for syslog messages transmitted over
[UDP](https://tools.ietf.org/html/rfc5426) or
[TCP](https://tools.ietf.org/html/rfc5425).
[TCP](https://tools.ietf.org/html/rfc6587) or
[TLS](https://tools.ietf.org/html/rfc5425), with or without the octet counting framing.
Syslog messages should be formatted according to
[RFC 5424](https://tools.ietf.org/html/rfc5424).
@@ -37,6 +38,16 @@ Syslog messages should be formatted according to
## 0 means unlimited.
# read_timeout = "5s"
## The framing technique with which it is expected that messages are transported (default = "octet-counting").
## Whether the messages come using the octect-counting (RFC5425#section-4.3.1, RFC6587#section-3.4.1),
## or the non-transparent framing technique (RFC6587#section-3.4.2).
## Must be one of "octect-counting", "non-transparent".
# framing = "octet-counting"
## The trailer to be expected in case of non-trasparent framing (default = "LF").
## Must be one of "LF", or "NUL".
# trailer = "LF"
## Whether to parse in best effort mode or not (default = false).
## By default best effort parsing is off.
# best_effort = false
@@ -49,11 +60,18 @@ Syslog messages should be formatted according to
# sdparam_separator = "_"
```
#### Best Effort
#### Message transport
The `framing` option only applies to streams. It governs the way we expect to receive messages within the stream.
Namely, with the [`"octet counting"`](https://tools.ietf.org/html/rfc5425#section-4.3) technique (default) or with the [`"non-transparent"`](https://tools.ietf.org/html/rfc6587#section-3.4.2) framing.
The `trailer` option only applies when `framing` option is `"non-transparent"`. It must have one of the following values: `"LF"` (default), or `"NUL"`.
#### Best effort
The [`best_effort`](https://github.com/influxdata/go-syslog#best-effort-mode)
option instructs the parser to extract partial but valid info from syslog
messages. If unset only full messages will be collected.
messages. If unset only full messages will be collected.
#### Rsyslog Integration

View File

@@ -0,0 +1,62 @@
package syslog
import (
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"
"time"
)
var (
pki = testutil.NewPKI("../../../testutil/pki")
)
type testCasePacket struct {
name string
data []byte
wantBestEffort *testutil.Metric
wantStrict *testutil.Metric
werr bool
}
type testCaseStream struct {
name string
data []byte
wantBestEffort []testutil.Metric
wantStrict []testutil.Metric
werr int // how many errors we expect in the strict mode?
}
func newUDPSyslogReceiver(address string, bestEffort bool) *Syslog {
return &Syslog{
Address: address,
now: func() time.Time {
return defaultTime
},
BestEffort: bestEffort,
Separator: "_",
}
}
func newTCPSyslogReceiver(address string, keepAlive *internal.Duration, maxConn int, bestEffort bool, f Framing) *Syslog {
d := &internal.Duration{
Duration: defaultReadTimeout,
}
s := &Syslog{
Address: address,
now: func() time.Time {
return defaultTime
},
Framing: f,
ReadTimeout: d,
BestEffort: bestEffort,
Separator: "_",
}
if keepAlive != nil {
s.KeepAlivePeriod = keepAlive
}
if maxConn > 0 {
s.MaxConnections = maxConn
}
return s
}

View File

@@ -0,0 +1,64 @@
package syslog
import (
"fmt"
"strings"
)
// Framing represents the framing technique we expect the messages to come.
type Framing int
const (
// OctetCounting indicates the transparent framing technique for syslog transport.
OctetCounting Framing = iota
// NonTransparent indicates the non-transparent framing technique for syslog transport.
NonTransparent
)
func (f Framing) String() string {
switch f {
case OctetCounting:
return "OCTET-COUNTING"
case NonTransparent:
return "NON-TRANSPARENT"
}
return ""
}
// UnmarshalTOML implements ability to unmarshal framing from TOML files.
func (f *Framing) UnmarshalTOML(data []byte) (err error) {
return f.UnmarshalText(data)
}
// UnmarshalText implements encoding.TextUnmarshaler
func (f *Framing) UnmarshalText(data []byte) (err error) {
s := string(data)
switch strings.ToUpper(s) {
case `OCTET-COUNTING`:
fallthrough
case `"OCTET-COUNTING"`:
fallthrough
case `'OCTET-COUNTING'`:
*f = OctetCounting
return
case `NON-TRANSPARENT`:
fallthrough
case `"NON-TRANSPARENT"`:
fallthrough
case `'NON-TRANSPARENT'`:
*f = NonTransparent
return
}
*f = -1
return fmt.Errorf("unknown framing")
}
// MarshalText implements encoding.TextMarshaler
func (f Framing) MarshalText() ([]byte, error) {
s := f.String()
if s != "" {
return []byte(s), nil
}
return nil, fmt.Errorf("unknown framing")
}

View File

@@ -0,0 +1,37 @@
package syslog
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestFraming(t *testing.T) {
var f1 Framing
f1.UnmarshalTOML([]byte(`"non-transparent"`))
assert.Equal(t, NonTransparent, f1)
var f2 Framing
f2.UnmarshalTOML([]byte(`non-transparent`))
assert.Equal(t, NonTransparent, f2)
var f3 Framing
f3.UnmarshalTOML([]byte(`'non-transparent'`))
assert.Equal(t, NonTransparent, f3)
var f4 Framing
f4.UnmarshalTOML([]byte(`"octet-counting"`))
assert.Equal(t, OctetCounting, f4)
var f5 Framing
f5.UnmarshalTOML([]byte(`octet-counting`))
assert.Equal(t, OctetCounting, f5)
var f6 Framing
f6.UnmarshalTOML([]byte(`'octet-counting'`))
assert.Equal(t, OctetCounting, f6)
var f7 Framing
err := f7.UnmarshalTOML([]byte(`nope`))
assert.Equal(t, Framing(-1), f7)
assert.Error(t, err)
}

View File

@@ -0,0 +1,308 @@
package syslog
import (
"crypto/tls"
"io/ioutil"
"net"
"os"
"path/filepath"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func getTestCasesForNonTransparent() []testCaseStream {
testCases := []testCaseStream{
{
name: "1st/avg/ok",
data: []byte(`<29>1 2016-02-21T04:32:57+00:00 web1 someservice 2341 2 [origin][meta sequence="14125553" service="someservice"] "GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`),
wantStrict: []testutil.Metric{
{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(1),
"timestamp": time.Unix(1456029177, 0).UnixNano(),
"procid": "2341",
"msgid": "2",
"message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`,
"origin": true,
"meta_sequence": "14125553",
"meta_service": "someservice",
"severity_code": 5,
"facility_code": 3,
},
Tags: map[string]string{
"severity": "notice",
"facility": "daemon",
"hostname": "web1",
"appname": "someservice",
},
Time: defaultTime,
},
},
wantBestEffort: []testutil.Metric{
{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(1),
"timestamp": time.Unix(1456029177, 0).UnixNano(),
"procid": "2341",
"msgid": "2",
"message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`,
"origin": true,
"meta_sequence": "14125553",
"meta_service": "someservice",
"severity_code": 5,
"facility_code": 3,
},
Tags: map[string]string{
"severity": "notice",
"facility": "daemon",
"hostname": "web1",
"appname": "someservice",
},
Time: defaultTime,
},
},
werr: 1,
},
{
name: "1st/min/ok//2nd/min/ok",
data: []byte("<1>2 - - - - - -\n<4>11 - - - - - -\n"),
wantStrict: []testutil.Metric{
{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(2),
"severity_code": 1,
"facility_code": 0,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: defaultTime,
},
{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(11),
"severity_code": 4,
"facility_code": 0,
},
Tags: map[string]string{
"severity": "warning",
"facility": "kern",
},
Time: defaultTime.Add(time.Nanosecond),
},
},
wantBestEffort: []testutil.Metric{
{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(2),
"severity_code": 1,
"facility_code": 0,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: defaultTime,
},
{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(11),
"severity_code": 4,
"facility_code": 0,
},
Tags: map[string]string{
"severity": "warning",
"facility": "kern",
},
Time: defaultTime.Add(time.Nanosecond),
},
},
},
}
return testCases
}
func testStrictNonTransparent(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *internal.Duration) {
for _, tc := range getTestCasesForNonTransparent() {
t.Run(tc.name, func(t *testing.T) {
// Creation of a strict mode receiver
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false, NonTransparent)
require.NotNil(t, receiver)
if wantTLS {
receiver.ServerConfig = *pki.TLSServerConfig()
}
require.Equal(t, receiver.KeepAlivePeriod, keepAlive)
acc := &testutil.Accumulator{}
require.NoError(t, receiver.Start(acc))
defer receiver.Stop()
// Connect
var conn net.Conn
var err error
if wantTLS {
config, e := pki.TLSClientConfig().TLSConfig()
require.NoError(t, e)
config.ServerName = "localhost"
conn, err = tls.Dial(protocol, address, config)
} else {
conn, err = net.Dial(protocol, address)
defer conn.Close()
}
require.NotNil(t, conn)
require.NoError(t, err)
// Clear
acc.ClearMetrics()
acc.Errors = make([]error, 0)
// Write
_, err = conn.Write(tc.data)
conn.Close()
require.NoError(t, err)
// Wait that the the number of data points is accumulated
// Since the receiver is running concurrently
if tc.wantStrict != nil {
acc.Wait(len(tc.wantStrict))
}
// Wait the parsing error
acc.WaitError(tc.werr)
// Verify
if len(acc.Errors) != tc.werr {
t.Fatalf("Got unexpected errors. want error = %v, errors = %v\n", tc.werr, acc.Errors)
}
var got []testutil.Metric
for _, metric := range acc.Metrics {
got = append(got, *metric)
}
if !cmp.Equal(tc.wantStrict, got) {
t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(tc.wantStrict, got))
}
})
}
}
func testBestEffortNonTransparent(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *internal.Duration) {
for _, tc := range getTestCasesForNonTransparent() {
t.Run(tc.name, func(t *testing.T) {
// Creation of a best effort mode receiver
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true, NonTransparent)
require.NotNil(t, receiver)
if wantTLS {
receiver.ServerConfig = *pki.TLSServerConfig()
}
require.Equal(t, receiver.KeepAlivePeriod, keepAlive)
acc := &testutil.Accumulator{}
require.NoError(t, receiver.Start(acc))
defer receiver.Stop()
// Connect
var conn net.Conn
var err error
if wantTLS {
config, e := pki.TLSClientConfig().TLSConfig()
require.NoError(t, e)
config.ServerName = "localhost"
conn, err = tls.Dial(protocol, address, config)
} else {
conn, err = net.Dial(protocol, address)
}
require.NotNil(t, conn)
require.NoError(t, err)
// Clear
acc.ClearMetrics()
acc.Errors = make([]error, 0)
// Write
_, err = conn.Write(tc.data)
require.NoError(t, err)
conn.Close()
// Wait that the the number of data points is accumulated
// Since the receiver is running concurrently
if tc.wantBestEffort != nil {
acc.Wait(len(tc.wantBestEffort))
}
// Verify
var got []testutil.Metric
for _, metric := range acc.Metrics {
got = append(got, *metric)
}
if !cmp.Equal(tc.wantBestEffort, got) {
t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(tc.wantBestEffort, got))
}
})
}
}
func TestNonTransparentStrict_tcp(t *testing.T) {
testStrictNonTransparent(t, "tcp", address, false, nil)
}
func TestNonTransparentBestEffort_tcp(t *testing.T) {
testBestEffortNonTransparent(t, "tcp", address, false, nil)
}
func TestNonTransparentStrict_tcp_tls(t *testing.T) {
testStrictNonTransparent(t, "tcp", address, true, nil)
}
func TestNonTransparentBestEffort_tcp_tls(t *testing.T) {
testBestEffortNonTransparent(t, "tcp", address, true, nil)
}
func TestNonTransparentStrictWithKeepAlive_tcp_tls(t *testing.T) {
testStrictNonTransparent(t, "tcp", address, true, &internal.Duration{Duration: time.Minute})
}
func TestNonTransparentStrictWithZeroKeepAlive_tcp_tls(t *testing.T) {
testStrictNonTransparent(t, "tcp", address, true, &internal.Duration{Duration: 0})
}
func TestNonTransparentStrict_unix(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "telegraf")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
sock := filepath.Join(tmpdir, "syslog.TestStrict_unix.sock")
testStrictNonTransparent(t, "unix", sock, false, nil)
}
func TestNonTransparentBestEffort_unix(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "telegraf")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
sock := filepath.Join(tmpdir, "syslog.TestBestEffort_unix.sock")
testBestEffortNonTransparent(t, "unix", sock, false, nil)
}
func TestNonTransparentStrict_unix_tls(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "telegraf")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
sock := filepath.Join(tmpdir, "syslog.TestStrict_unix_tls.sock")
testStrictNonTransparent(t, "unix", sock, true, nil)
}
func TestNonTransparentBestEffort_unix_tls(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "telegraf")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
sock := filepath.Join(tmpdir, "syslog.TestBestEffort_unix_tls.sock")
testBestEffortNonTransparent(t, "unix", sock, true, nil)
}

View File

@@ -16,20 +16,8 @@ import (
"github.com/stretchr/testify/require"
)
var (
pki = testutil.NewPKI("../../../testutil/pki")
)
type testCase5425 struct {
name string
data []byte
wantBestEffort []testutil.Metric
wantStrict []testutil.Metric
werr int // how many errors we expect in the strict mode?
}
func getTestCasesForRFC5425() []testCase5425 {
testCases := []testCase5425{
func getTestCasesForOctetCounting() []testCaseStream {
testCases := []testCaseStream{
{
name: "1st/avg/ok",
data: []byte(`188 <29>1 2016-02-21T04:32:57+00:00 web1 someservice 2341 2 [origin][meta sequence="14125553" service="someservice"] "GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`),
@@ -346,34 +334,11 @@ func getTestCasesForRFC5425() []testCase5425 {
return testCases
}
func newTCPSyslogReceiver(address string, keepAlive *internal.Duration, maxConn int, bestEffort bool) *Syslog {
d := &internal.Duration{
Duration: defaultReadTimeout,
}
s := &Syslog{
Address: address,
now: func() time.Time {
return defaultTime
},
ReadTimeout: d,
BestEffort: bestEffort,
Separator: "_",
}
if keepAlive != nil {
s.KeepAlivePeriod = keepAlive
}
if maxConn > 0 {
s.MaxConnections = maxConn
}
return s
}
func testStrictRFC5425(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *internal.Duration) {
for _, tc := range getTestCasesForRFC5425() {
func testStrictOctetCounting(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *internal.Duration) {
for _, tc := range getTestCasesForOctetCounting() {
t.Run(tc.name, func(t *testing.T) {
// Creation of a strict mode receiver
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false)
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false, OctetCounting)
require.NotNil(t, receiver)
if wantTLS {
receiver.ServerConfig = *pki.TLSServerConfig()
@@ -431,11 +396,11 @@ func testStrictRFC5425(t *testing.T, protocol string, address string, wantTLS bo
}
}
func testBestEffortRFC5425(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *internal.Duration) {
for _, tc := range getTestCasesForRFC5425() {
func testBestEffortOctetCounting(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *internal.Duration) {
for _, tc := range getTestCasesForOctetCounting() {
t.Run(tc.name, func(t *testing.T) {
// Creation of a best effort mode receiver
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true)
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true, OctetCounting)
require.NotNil(t, receiver)
if wantTLS {
receiver.ServerConfig = *pki.TLSServerConfig()
@@ -486,58 +451,58 @@ func testBestEffortRFC5425(t *testing.T, protocol string, address string, wantTL
}
}
func TestStrict_tcp(t *testing.T) {
testStrictRFC5425(t, "tcp", address, false, nil)
func TestOctetCountingStrict_tcp(t *testing.T) {
testStrictOctetCounting(t, "tcp", address, false, nil)
}
func TestBestEffort_tcp(t *testing.T) {
testBestEffortRFC5425(t, "tcp", address, false, nil)
func TestOctetCountingBestEffort_tcp(t *testing.T) {
testBestEffortOctetCounting(t, "tcp", address, false, nil)
}
func TestStrict_tcp_tls(t *testing.T) {
testStrictRFC5425(t, "tcp", address, true, nil)
func TestOctetCountingStrict_tcp_tls(t *testing.T) {
testStrictOctetCounting(t, "tcp", address, true, nil)
}
func TestBestEffort_tcp_tls(t *testing.T) {
testBestEffortRFC5425(t, "tcp", address, true, nil)
func TestOctetCountingBestEffort_tcp_tls(t *testing.T) {
testBestEffortOctetCounting(t, "tcp", address, true, nil)
}
func TestStrictWithKeepAlive_tcp_tls(t *testing.T) {
testStrictRFC5425(t, "tcp", address, true, &internal.Duration{Duration: time.Minute})
func TestOctetCountingStrictWithKeepAlive_tcp_tls(t *testing.T) {
testStrictOctetCounting(t, "tcp", address, true, &internal.Duration{Duration: time.Minute})
}
func TestStrictWithZeroKeepAlive_tcp_tls(t *testing.T) {
testStrictRFC5425(t, "tcp", address, true, &internal.Duration{Duration: 0})
func TestOctetCountingStrictWithZeroKeepAlive_tcp_tls(t *testing.T) {
testStrictOctetCounting(t, "tcp", address, true, &internal.Duration{Duration: 0})
}
func TestStrict_unix(t *testing.T) {
func TestOctetCountingStrict_unix(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "telegraf")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
sock := filepath.Join(tmpdir, "syslog.TestStrict_unix.sock")
testStrictRFC5425(t, "unix", sock, false, nil)
testStrictOctetCounting(t, "unix", sock, false, nil)
}
func TestBestEffort_unix(t *testing.T) {
func TestOctetCountingBestEffort_unix(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "telegraf")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
sock := filepath.Join(tmpdir, "syslog.TestBestEffort_unix.sock")
testBestEffortRFC5425(t, "unix", sock, false, nil)
testBestEffortOctetCounting(t, "unix", sock, false, nil)
}
func TestStrict_unix_tls(t *testing.T) {
func TestOctetCountingStrict_unix_tls(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "telegraf")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
sock := filepath.Join(tmpdir, "syslog.TestStrict_unix_tls.sock")
testStrictRFC5425(t, "unix", sock, true, nil)
testStrictOctetCounting(t, "unix", sock, true, nil)
}
func TestBestEffort_unix_tls(t *testing.T) {
func TestOctetCountingBestEffort_unix_tls(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "telegraf")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
sock := filepath.Join(tmpdir, "syslog.TestBestEffort_unix_tls.sock")
testBestEffortRFC5425(t, "unix", sock, true, nil)
testBestEffortOctetCounting(t, "unix", sock, true, nil)
}

View File

@@ -15,16 +15,8 @@ import (
"github.com/stretchr/testify/require"
)
type testCase5426 struct {
name string
data []byte
wantBestEffort *testutil.Metric
wantStrict *testutil.Metric
werr bool
}
func getTestCasesForRFC5426() []testCase5426 {
testCases := []testCase5426{
func getTestCasesForRFC5426() []testCasePacket {
testCases := []testCasePacket{
{
name: "empty",
data: []byte(""),
@@ -239,17 +231,6 @@ func getTestCasesForRFC5426() []testCase5426 {
return testCases
}
func newUDPSyslogReceiver(address string, bestEffort bool) *Syslog {
return &Syslog{
Address: address,
now: func() time.Time {
return defaultTime
},
BestEffort: bestEffort,
Separator: "_",
}
}
func testRFC5426(t *testing.T, protocol string, address string, bestEffort bool) {
for _, tc := range getTestCasesForRFC5426() {
t.Run(tc.name, func(t *testing.T) {

View File

@@ -12,8 +12,10 @@ import (
"time"
"unicode"
"github.com/influxdata/go-syslog"
"github.com/influxdata/go-syslog/nontransparent"
"github.com/influxdata/go-syslog/octetcounting"
"github.com/influxdata/go-syslog/rfc5424"
"github.com/influxdata/go-syslog/rfc5425"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
tlsConfig "github.com/influxdata/telegraf/internal/tls"
@@ -28,8 +30,10 @@ type Syslog struct {
tlsConfig.ServerConfig
Address string `toml:"server"`
KeepAlivePeriod *internal.Duration
ReadTimeout *internal.Duration
MaxConnections int
ReadTimeout *internal.Duration
Framing Framing
Trailer nontransparent.TrailerType
BestEffort bool
Separator string `toml:"sdparam_separator"`
@@ -76,6 +80,16 @@ var sampleConfig = `
## 0 means unlimited.
# read_timeout = "5s"
## The framing technique with which it is expected that messages are transported (default = "octet-counting").
## Whether the messages come using the octect-counting (RFC5425#section-4.3.1, RFC6587#section-3.4.1),
## or the non-transparent framing technique (RFC6587#section-3.4.2).
## Must be one of "octect-counting", "non-transparent".
# framing = "octet-counting"
## The trailer to be expected in case of non-trasparent framing (default = "LF").
## Must be one of "LF", or "NUL".
# trailer = "LF"
## Whether to parse in best effort mode or not (default = false).
## By default best effort parsing is off.
# best_effort = false
@@ -95,7 +109,7 @@ func (s *Syslog) SampleConfig() string {
// Description returns the plugin description
func (s *Syslog) Description() string {
return "Accepts syslog messages per RFC5425"
return "Accepts syslog messages following RFC5424 format with transports as per RFC5426, RFC5425, or RFC6587"
}
// Gather ...
@@ -203,7 +217,12 @@ func getAddressParts(a string) (string, string, error) {
func (s *Syslog) listenPacket(acc telegraf.Accumulator) {
defer s.wg.Done()
b := make([]byte, ipMaxPacketSize)
p := rfc5424.NewParser()
var p syslog.Machine
if s.BestEffort {
p = rfc5424.NewParser(rfc5424.WithBestEffort())
} else {
p = rfc5424.NewParser()
}
for {
n, _, err := s.udpListener.ReadFrom(b)
if err != nil {
@@ -213,9 +232,9 @@ func (s *Syslog) listenPacket(acc telegraf.Accumulator) {
break
}
message, err := p.Parse(b[:n], &s.BestEffort)
message, err := p.Parse(b[:n])
if message != nil {
acc.AddFields("syslog", fields(*message, s), tags(*message), s.time())
acc.AddFields("syslog", fields(message, s), tags(message), s.time())
}
if err != nil {
acc.AddError(err)
@@ -276,24 +295,38 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) {
conn.Close()
}()
var p *rfc5425.Parser
var p syslog.Parser
if s.BestEffort {
p = rfc5425.NewParser(conn, rfc5425.WithBestEffort())
} else {
p = rfc5425.NewParser(conn)
}
if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 {
conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration))
}
p.ParseExecuting(func(r *rfc5425.Result) {
emit := func(r *syslog.Result) {
s.store(*r, acc)
if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 {
conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration))
}
})
}
// Create parser options
opts := []syslog.ParserOption{
syslog.WithListener(emit),
}
if s.BestEffort {
opts = append(opts, syslog.WithBestEffort())
}
// Select the parser to use depeding on transport framing
if s.Framing == OctetCounting {
// Octet counting transparent framing
p = octetcounting.NewParser(opts...)
} else {
// Non-transparent framing
opts = append(opts, nontransparent.WithTrailer(s.Trailer))
p = nontransparent.NewParser(opts...)
}
p.Parse(conn)
if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 {
conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration))
}
}
func (s *Syslog) setKeepAlive(c *net.TCPConn) error {
@@ -310,20 +343,16 @@ func (s *Syslog) setKeepAlive(c *net.TCPConn) error {
return c.SetKeepAlivePeriod(s.KeepAlivePeriod.Duration)
}
func (s *Syslog) store(res rfc5425.Result, acc telegraf.Accumulator) {
func (s *Syslog) store(res syslog.Result, acc telegraf.Accumulator) {
if res.Error != nil {
acc.AddError(res.Error)
}
if res.MessageError != nil {
acc.AddError(res.MessageError)
}
if res.Message != nil {
msg := *res.Message
acc.AddFields("syslog", fields(msg, s), tags(msg), s.time())
acc.AddFields("syslog", fields(res.Message, s), tags(res.Message), s.time())
}
}
func tags(msg rfc5424.SyslogMessage) map[string]string {
func tags(msg syslog.Message) map[string]string {
ts := map[string]string{}
// Not checking assuming a minimally valid message
@@ -341,7 +370,7 @@ func tags(msg rfc5424.SyslogMessage) map[string]string {
return ts
}
func fields(msg rfc5424.SyslogMessage, s *Syslog) map[string]interface{} {
func fields(msg syslog.Message, s *Syslog) map[string]interface{} {
// Not checking assuming a minimally valid message
flds := map[string]interface{}{
"version": msg.Version(),
@@ -415,6 +444,8 @@ func init() {
ReadTimeout: &internal.Duration{
Duration: defaultReadTimeout,
},
Framing: OctetCounting,
Trailer: nontransparent.LF,
Separator: "_",
}