Add syslog input plugin (#4181)

This commit is contained in:
Leonardo Di Donato 2018-05-25 20:40:12 +02:00 committed by Daniel Nelson
parent 70519e9a3a
commit 54c59f8688
7 changed files with 1520 additions and 0 deletions

1
Godeps
View File

@ -32,6 +32,7 @@ github.com/go-redis/redis 73b70592cdaa9e6abdfcfbf97b4a90d80728c836
github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034
github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
github.com/hashicorp/consul 5174058f0d2bda63fa5198ab96c33d9a909c58ed github.com/hashicorp/consul 5174058f0d2bda63fa5198ab96c33d9a909c58ed
github.com/influxdata/go-syslog 84f3b60009444d298f97454feb1f20cf91d1fa6e
github.com/influxdata/tail c43482518d410361b6c383d7aebce33d0471d7bc github.com/influxdata/tail c43482518d410361b6c383d7aebce33d0471d7bc
github.com/influxdata/toml 5d1d907f22ead1cd47adde17ceec5bda9cacaf8f github.com/influxdata/toml 5d1d907f22ead1cd47adde17ceec5bda9cacaf8f
github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec

View File

@ -97,6 +97,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/solr" _ "github.com/influxdata/telegraf/plugins/inputs/solr"
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
_ "github.com/influxdata/telegraf/plugins/inputs/statsd" _ "github.com/influxdata/telegraf/plugins/inputs/statsd"
_ "github.com/influxdata/telegraf/plugins/inputs/syslog"
_ "github.com/influxdata/telegraf/plugins/inputs/sysstat" _ "github.com/influxdata/telegraf/plugins/inputs/sysstat"
_ "github.com/influxdata/telegraf/plugins/inputs/system" _ "github.com/influxdata/telegraf/plugins/inputs/system"
_ "github.com/influxdata/telegraf/plugins/inputs/tail" _ "github.com/influxdata/telegraf/plugins/inputs/tail"

View File

@ -0,0 +1,119 @@
# syslog input plugin
Collects syslog messages as per RFC5425 or RFC5426.
It can act as a syslog transport receiver over TLS (or TCP) - ie., RFC5425 - or over UDP - ie., RFC5426.
This plugin listens for syslog messages following RFC5424 format. When received it parses them extracting metrics.
### Configuration
```toml
[[inputs.syslog]]
## Specify an ip or hostname with port - eg., tcp://localhost:6514, tcp://10.0.0.1:6514
## Protocol, address and port to host the syslog receiver.
## If no host is specified, then localhost is used.
## If no port is specified, 6514 is used (RFC5425#section-4.1).
server = "tcp://:6514"
## TLS Config
# tls_allowed_cacerts = ["/etc/telegraf/ca.pem"]
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Period between keep alive probes.
## 0 disables keep alive probes.
## Defaults to the OS configuration.
## Only applies to stream sockets (e.g. TCP).
# keep_alive_period = "5m"
## Maximum number of concurrent connections (default = 0).
## 0 means unlimited.
## Only applies to stream sockets (e.g. TCP).
# max_connections = 1024
## Read timeout (default = 500ms).
## 0 means unlimited.
# read_timeout = 500ms
## Whether to parse in best effort mode or not (default = false).
## By default best effort parsing is off.
# best_effort = false
## Character to prepend to SD-PARAMs (default = "_").
## A syslog message can contain multiple parameters and multiple identifiers within structured data section.
## Eg., [id1 name1="val1" name2="val2"][id2 name1="val1" nameA="valA"]
## For each combination a field is created.
## Its name is created concatenating identifier, sdparam_separator, and parameter name.
# sdparam_separator = "_"
```
#### Other configs
Other available configurations are:
- `keep_alive_period`, `max_connections` for stream sockets
- `read_timeout`
- `best_effort` to tell the parser to work until it is able to do and extract partial but valid info (more [here](https://github.com/influxdata/go-syslog#best-effort-mode))
- `sdparam_separator` to choose how to separate structured data param name from its structured data identifier
### Metrics
- syslog
- fields
- **version** (`uint16`)
- **severity_code** (`int`)
- **facility_code** (`int`)
- timestamp (`int`)
- procid (`string`)
- msgid (`string`)
- *sdid* (`bool`)
- *sdid . sdparam_separator . sdparam_name* (`string`)
- tags
- **severity** (`string`)
- **facility** (`string`)
- hostname (`string`)
- appname (`string`)
The name of fields in _italic_ corresponds to their runtime value.
The fields/tags which name is in **bold** will always be present when a valid Syslog message has been received.
### RSYSLOG integration
The following instructions illustrate how to configure a syslog transport sender as per RFC5425 - ie., using the octect framing technique - via RSYSLOG.
Install `rsyslog`.
Give it a configuration - ie., `/etc/rsyslog.conf`.
```
$ModLoad imuxsock # provides support for local system logging
$ModLoad imklog # provides kernel logging support
$ModLoad immark # provides heart-beat logs
$FileOwner root
$FileGroup root
$FileCreateMode 0640
$DirCreateMode 0755
$Umask 0022
$WorkDirectory /var/spool/rsyslog # default location for work (spool) files
$ActionQueueType LinkedList # use asynchronous processing
$ActionQueueFileName srvrfwd # set file name, also enables disk mode
$ActionResumeRetryCount -1 # infinite retries on insert failure
$ActionQueueSaveOnShutdown on # save in-memory data if rsyslog shuts down
$IncludeConfig /etc/rsyslog.d/*.conf
```
Specify you want the octet framing technique enabled and the format of each syslog message to follow the RFC5424.
Create a file - eg., `/etc/rsyslog.d/50-default.conf` - containing:
```
*.* @@(o)127.0.0.1:6514;RSYSLOG_SyslogProtocol23Format
```
To complete the TLS setup please refer to [rsyslog docs](https://www.rsyslog.com/doc/v8-stable/tutorials/tls.html).
Notice that this configuration tells `rsyslog` to broadcast messages to `127.0.0.1>6514`.
So you have to configure this plugin accordingly.

View File

@ -0,0 +1,520 @@
package syslog
import (
"crypto/tls"
"fmt"
"net"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
var (
pki = testutil.NewPKI("../../../testutil/pki")
)
type testCase5425 struct {
name string
data []byte
wantBestEffort []testutil.Metric
wantStrict []testutil.Metric
werr int // how many errors we expect in the strict mode?
}
func getTestCasesForRFC5425() []testCase5425 {
testCases := []testCase5425{
{
name: "1st/avg/ok",
data: []byte(`188 <29>1 2016-02-21T04:32:57+00:00 web1 someservice 2341 2 [origin][meta sequence="14125553" service="someservice"] "GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`),
wantStrict: []testutil.Metric{
testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(1),
"timestamp": time.Unix(1456029177, 0).UnixNano(),
"procid": "2341",
"msgid": "2",
"message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`,
"origin": true,
"meta_sequence": "14125553",
"meta_service": "someservice",
"severity_code": 5,
"facility_code": 3,
},
Tags: map[string]string{
"severity": "notice",
"facility": "daemon",
"hostname": "web1",
"appname": "someservice",
},
Time: defaultTime,
},
},
wantBestEffort: []testutil.Metric{
testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(1),
"timestamp": time.Unix(1456029177, 0).UnixNano(),
"procid": "2341",
"msgid": "2",
"message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`,
"origin": true,
"meta_sequence": "14125553",
"meta_service": "someservice",
"severity_code": 5,
"facility_code": 3,
},
Tags: map[string]string{
"severity": "notice",
"facility": "daemon",
"hostname": "web1",
"appname": "someservice",
},
Time: defaultTime,
},
},
},
{
name: "1st/min/ok//2nd/min/ok",
data: []byte("16 <1>2 - - - - - -17 <4>11 - - - - - -"),
wantStrict: []testutil.Metric{
testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(2),
"severity_code": 1,
"facility_code": 0,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: defaultTime,
},
testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(11),
"severity_code": 4,
"facility_code": 0,
},
Tags: map[string]string{
"severity": "warning",
"facility": "kern",
},
Time: defaultTime.Add(time.Nanosecond),
},
},
wantBestEffort: []testutil.Metric{
testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(2),
"severity_code": 1,
"facility_code": 0,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: defaultTime,
},
testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(11),
"severity_code": 4,
"facility_code": 0,
},
Tags: map[string]string{
"severity": "warning",
"facility": "kern",
},
Time: defaultTime.Add(time.Nanosecond),
},
},
},
{
name: "1st/utf8/ok",
data: []byte("23 <1>1 - - - - - - hellø"),
wantStrict: []testutil.Metric{
testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(1),
"message": "hellø",
"severity_code": 1,
"facility_code": 0,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: defaultTime,
},
},
wantBestEffort: []testutil.Metric{
testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(1),
"message": "hellø",
"severity_code": 1,
"facility_code": 0,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: defaultTime,
},
},
},
{
name: "1st/nl/ok", // newline
data: []byte("28 <1>3 - - - - - - hello\nworld"),
wantStrict: []testutil.Metric{
testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(3),
"message": "hello\nworld",
"severity_code": 1,
"facility_code": 0,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: defaultTime,
},
},
wantBestEffort: []testutil.Metric{
testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(3),
"message": "hello\nworld",
"severity_code": 1,
"facility_code": 0,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: defaultTime,
},
},
},
{
name: "1st/uf/ko", // underflow (msglen less than provided octets)
data: []byte("16 <1>2"),
wantStrict: nil,
wantBestEffort: []testutil.Metric{
testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(2),
"severity_code": 1,
"facility_code": 0,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: defaultTime,
},
},
werr: 1,
},
{
name: "1st/min/ok",
data: []byte("16 <1>1 - - - - - -"),
wantStrict: []testutil.Metric{
testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(1),
"severity_code": 1,
"facility_code": 0,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: defaultTime,
},
},
wantBestEffort: []testutil.Metric{
testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(1),
"severity_code": 1,
"facility_code": 0,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: defaultTime,
},
},
},
{
name: "1st/uf/mf", // The first "underflow" message breaks also the second one
data: []byte("16 <1>217 <11>1 - - - - - -"),
wantStrict: nil,
wantBestEffort: []testutil.Metric{
testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(217),
"severity_code": 1,
"facility_code": 0,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: defaultTime,
},
},
werr: 1,
},
// {
// name: "1st/of/ko", // overflow (msglen greather then max allowed octets)
// data: []byte(fmt.Sprintf("8193 <%d>%d %s %s %s %s %s 12 %s", maxP, maxV, maxTS, maxH, maxA, maxPID, maxMID, message7681)),
// want: []testutil.Metric{},
// },
{
name: "1st/max/ok",
data: []byte(fmt.Sprintf("8192 <%d>%d %s %s %s %s %s - %s", maxP, maxV, maxTS, maxH, maxA, maxPID, maxMID, message7681)),
wantStrict: []testutil.Metric{
testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": maxV,
"timestamp": time.Unix(1514764799, 999999000).UnixNano(),
"message": message7681,
"procid": maxPID,
"msgid": maxMID,
"facility_code": 23,
"severity_code": 7,
},
Tags: map[string]string{
"severity": "debug",
"facility": "local7",
"hostname": maxH,
"appname": maxA,
},
Time: defaultTime,
},
},
wantBestEffort: []testutil.Metric{
testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": maxV,
"timestamp": time.Unix(1514764799, 999999000).UnixNano(),
"message": message7681,
"procid": maxPID,
"msgid": maxMID,
"facility_code": 23,
"severity_code": 7,
},
Tags: map[string]string{
"severity": "debug",
"facility": "local7",
"hostname": maxH,
"appname": maxA,
},
Time: defaultTime,
},
},
},
}
return testCases
}
func newTCPSyslogReceiver(address string, keepAlive *internal.Duration, maxConn int, bestEffort bool) *Syslog {
d := &internal.Duration{
Duration: defaultReadTimeout,
}
s := &Syslog{
Address: address,
now: func() time.Time {
return defaultTime
},
ReadTimeout: d,
BestEffort: bestEffort,
Separator: "_",
}
if keepAlive != nil {
s.KeepAlivePeriod = keepAlive
}
if maxConn > 0 {
s.MaxConnections = maxConn
}
return s
}
func testStrictRFC5425(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *internal.Duration) {
for _, tc := range getTestCasesForRFC5425() {
t.Run(tc.name, func(t *testing.T) {
// Creation of a strict mode receiver
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false)
require.NotNil(t, receiver)
if wantTLS {
receiver.ServerConfig = *pki.TLSServerConfig()
}
require.Equal(t, receiver.KeepAlivePeriod, keepAlive)
acc := &testutil.Accumulator{}
require.NoError(t, receiver.Start(acc))
defer receiver.Stop()
// Connect
var conn net.Conn
var err error
if wantTLS {
config, e := pki.TLSClientConfig().TLSConfig()
require.NoError(t, e)
config.ServerName = "localhost"
conn, err = tls.Dial(protocol, address, config)
} else {
conn, err = net.Dial(protocol, address)
defer conn.Close()
}
require.NotNil(t, conn)
require.NoError(t, err)
// Clear
acc.ClearMetrics()
acc.Errors = make([]error, 0)
// Write
conn.Write(tc.data)
// Wait that the the number of data points is accumulated
// Since the receiver is running concurrently
if tc.wantStrict != nil {
acc.Wait(len(tc.wantStrict))
}
// Wait the parsing error
acc.WaitError(tc.werr)
// Verify
if len(acc.Errors) != tc.werr {
t.Fatalf("Got unexpected errors. want error = %v, errors = %v\n", tc.werr, acc.Errors)
}
var got []testutil.Metric
for _, metric := range acc.Metrics {
got = append(got, *metric)
}
if !cmp.Equal(tc.wantStrict, got) {
t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(tc.wantStrict, got))
}
})
}
}
func testBestEffortRFC5425(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *internal.Duration) {
for _, tc := range getTestCasesForRFC5425() {
t.Run(tc.name, func(t *testing.T) {
// Creation of a best effort mode receiver
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true)
require.NotNil(t, receiver)
if wantTLS {
receiver.ServerConfig = *pki.TLSServerConfig()
}
require.Equal(t, receiver.KeepAlivePeriod, keepAlive)
acc := &testutil.Accumulator{}
require.NoError(t, receiver.Start(acc))
defer receiver.Stop()
// Connect
var conn net.Conn
var err error
if wantTLS {
config, e := pki.TLSClientConfig().TLSConfig()
require.NoError(t, e)
config.ServerName = "localhost"
conn, err = tls.Dial(protocol, address, config)
} else {
conn, err = net.Dial(protocol, address)
defer conn.Close()
}
require.NotNil(t, conn)
require.NoError(t, err)
// Clear
acc.ClearMetrics()
acc.Errors = make([]error, 0)
// Write
conn.Write(tc.data)
// Wait that the the number of data points is accumulated
// Since the receiver is running concurrently
if tc.wantBestEffort != nil {
acc.Wait(len(tc.wantBestEffort))
}
// Verify
var got []testutil.Metric
for _, metric := range acc.Metrics {
got = append(got, *metric)
}
if !cmp.Equal(tc.wantBestEffort, got) {
t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(tc.wantBestEffort, got))
}
})
}
}
func TestStrict_tcp(t *testing.T) {
testStrictRFC5425(t, "tcp", address, false, nil)
}
func TestBestEffort_tcp(t *testing.T) {
testBestEffortRFC5425(t, "tcp", address, false, nil)
}
func TestStrict_tcp_tls(t *testing.T) {
testStrictRFC5425(t, "tcp", address, true, nil)
}
func TestBestEffort_tcp_tls(t *testing.T) {
testBestEffortRFC5425(t, "tcp", address, true, nil)
}
func TestStrictWithKeepAlive_tcp_tls(t *testing.T) {
testStrictRFC5425(t, "tcp", address, true, &internal.Duration{Duration: time.Minute})
}
func TestStrictWithZeroKeepAlive_tcp_tls(t *testing.T) {
testStrictRFC5425(t, "tcp", address, true, &internal.Duration{Duration: 0})
}
func TestStrict_unix(t *testing.T) {
testStrictRFC5425(t, "unix", "/tmp/telegraf_test.sock", false, nil)
}
func TestBestEffort_unix(t *testing.T) {
testBestEffortRFC5425(t, "unix", "/tmp/telegraf_test.sock", false, nil)
}
func TestStrict_unix_tls(t *testing.T) {
testStrictRFC5425(t, "unix", "/tmp/telegraf_test.sock", true, nil)
}
func TestBestEffort_unix_tls(t *testing.T) {
testBestEffortRFC5425(t, "unix", "/tmp/telegraf_test.sock", true, nil)
}

View File

@ -0,0 +1,400 @@
package syslog
import (
"fmt"
"net"
"os"
"sync/atomic"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
type testCase5426 struct {
name string
data []byte
wantBestEffort *testutil.Metric
wantStrict *testutil.Metric
werr bool
}
func getTestCasesForRFC5426() []testCase5426 {
testCases := []testCase5426{
{
name: "empty",
data: []byte(""),
werr: true,
},
{
name: "complete",
data: []byte("<1>1 - - - - - - A"),
wantBestEffort: &testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(1),
"message": "A",
"facility_code": 0,
"severity_code": 1,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: defaultTime,
},
wantStrict: &testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(1),
"message": "A",
"facility_code": 0,
"severity_code": 1,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: defaultTime,
},
},
{
name: "one/per/packet",
data: []byte("<1>3 - - - - - - A<1>4 - - - - - - B"),
wantBestEffort: &testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(3),
"message": "A<1>4 - - - - - - B",
"severity_code": 1,
"facility_code": 0,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: defaultTime,
},
wantStrict: &testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(3),
"message": "A<1>4 - - - - - - B",
"severity_code": 1,
"facility_code": 0,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: defaultTime,
},
},
{
name: "average",
data: []byte(`<29>1 2016-02-21T04:32:57+00:00 web1 someservice 2341 2 [origin][meta sequence="14125553" service="someservice"] "GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`),
wantBestEffort: &testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(1),
"timestamp": time.Unix(1456029177, 0).UnixNano(),
"procid": "2341",
"msgid": "2",
"message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`,
"origin": true,
"meta_sequence": "14125553",
"meta_service": "someservice",
"severity_code": 5,
"facility_code": 3,
},
Tags: map[string]string{
"severity": "notice",
"facility": "daemon",
"hostname": "web1",
"appname": "someservice",
},
Time: defaultTime,
},
wantStrict: &testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(1),
"timestamp": time.Unix(1456029177, 0).UnixNano(),
"procid": "2341",
"msgid": "2",
"message": `"GET /v1/ok HTTP/1.1" 200 145 "-" "hacheck 0.9.0" 24306 127.0.0.1:40124 575`,
"origin": true,
"meta_sequence": "14125553",
"meta_service": "someservice",
"severity_code": 5,
"facility_code": 3,
},
Tags: map[string]string{
"severity": "notice",
"facility": "daemon",
"hostname": "web1",
"appname": "someservice",
},
Time: defaultTime,
},
},
{
name: "max",
data: []byte(fmt.Sprintf("<%d>%d %s %s %s %s %s - %s", maxP, maxV, maxTS, maxH, maxA, maxPID, maxMID, message7681)),
wantBestEffort: &testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": maxV,
"timestamp": time.Unix(1514764799, 999999000).UnixNano(),
"message": message7681,
"procid": maxPID,
"msgid": maxMID,
"severity_code": 7,
"facility_code": 23,
},
Tags: map[string]string{
"severity": "debug",
"facility": "local7",
"hostname": maxH,
"appname": maxA,
},
Time: defaultTime,
},
wantStrict: &testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": maxV,
"timestamp": time.Unix(1514764799, 999999000).UnixNano(),
"message": message7681,
"procid": maxPID,
"msgid": maxMID,
"severity_code": 7,
"facility_code": 23,
},
Tags: map[string]string{
"severity": "debug",
"facility": "local7",
"hostname": maxH,
"appname": maxA,
},
Time: defaultTime,
},
},
{
name: "minimal/incomplete",
data: []byte("<1>2"),
wantBestEffort: &testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(2),
"facility_code": 0,
"severity_code": 1,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: defaultTime,
},
werr: true,
},
}
return testCases
}
func newUDPSyslogReceiver(address string, bestEffort bool) *Syslog {
return &Syslog{
Address: address,
now: func() time.Time {
return defaultTime
},
BestEffort: bestEffort,
Separator: "_",
}
}
func testRFC5426(t *testing.T, protocol string, address string, bestEffort bool) {
for _, tc := range getTestCasesForRFC5426() {
t.Run(tc.name, func(t *testing.T) {
// Create receiver
receiver := newUDPSyslogReceiver(protocol+"://"+address, bestEffort)
acc := &testutil.Accumulator{}
require.NoError(t, receiver.Start(acc))
defer receiver.Stop()
// Clear
acc.ClearMetrics()
acc.Errors = make([]error, 0)
// Connect
conn, err := net.Dial(protocol, address)
require.NotNil(t, conn)
defer conn.Close()
require.Nil(t, err)
// Write
_, e := conn.Write(tc.data)
require.Nil(t, e)
// Waiting ...
if tc.wantStrict == nil && tc.werr || bestEffort && tc.werr {
acc.WaitError(1)
}
if tc.wantBestEffort != nil && bestEffort || tc.wantStrict != nil && !bestEffort {
acc.Wait(1) // RFC5426 mandates a syslog message per UDP packet
}
// Compare
var got *testutil.Metric
var want *testutil.Metric
if len(acc.Metrics) > 0 {
got = acc.Metrics[0]
}
if bestEffort {
want = tc.wantBestEffort
} else {
want = tc.wantStrict
}
if !cmp.Equal(want, got) {
t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(want, got))
}
})
}
}
func TestBestEffort_udp(t *testing.T) {
testRFC5426(t, "udp", address, true)
}
func TestStrict_udp(t *testing.T) {
testRFC5426(t, "udp", address, false)
}
func TestBestEffort_unixgram(t *testing.T) {
sockname := "/tmp/telegraf_test.sock"
os.Create(sockname)
testRFC5426(t, "unixgram", sockname, true)
}
func TestStrict_unixgram(t *testing.T) {
sockname := "/tmp/telegraf_test.sock"
os.Create(sockname)
testRFC5426(t, "unixgram", sockname, false)
}
func TestTimeIncrement_udp(t *testing.T) {
var i int64
atomic.StoreInt64(&i, 0)
getNow := func() time.Time {
if atomic.LoadInt64(&i)%2 == 0 {
return time.Unix(1, 0)
}
return time.Unix(1, 1)
}
// Create receiver
receiver := &Syslog{
Address: "udp://" + address,
now: getNow,
BestEffort: false,
Separator: "_",
}
acc := &testutil.Accumulator{}
require.NoError(t, receiver.Start(acc))
defer receiver.Stop()
// Connect
conn, err := net.Dial("udp", address)
require.NotNil(t, conn)
defer conn.Close()
require.Nil(t, err)
// Write
_, e := conn.Write([]byte("<1>1 - - - - - -"))
require.Nil(t, e)
// Wait
acc.Wait(1)
want := &testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(1),
"facility_code": 0,
"severity_code": 1,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: getNow(),
}
if !cmp.Equal(want, acc.Metrics[0]) {
t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(want, acc.Metrics[0]))
}
// New one with different time
atomic.StoreInt64(&i, atomic.LoadInt64(&i)+1)
// Clear
acc.ClearMetrics()
// Write
_, e = conn.Write([]byte("<1>1 - - - - - -"))
require.Nil(t, e)
// Wait
acc.Wait(1)
want = &testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(1),
"facility_code": 0,
"severity_code": 1,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: getNow(),
}
if !cmp.Equal(want, acc.Metrics[0]) {
t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(want, acc.Metrics[0]))
}
// New one with same time as previous one
// Clear
acc.ClearMetrics()
// Write
_, e = conn.Write([]byte("<1>1 - - - - - -"))
require.Nil(t, e)
// Wait
acc.Wait(1)
want = &testutil.Metric{
Measurement: "syslog",
Fields: map[string]interface{}{
"version": uint16(1),
"facility_code": 0,
"severity_code": 1,
},
Tags: map[string]string{
"severity": "alert",
"facility": "kern",
},
Time: getNow().Add(time.Nanosecond),
}
if !cmp.Equal(want, acc.Metrics[0]) {
t.Fatalf("Got (+) / Want (-)\n %s", cmp.Diff(want, acc.Metrics[0]))
}
}

View File

@ -0,0 +1,419 @@
package syslog
import (
"crypto/tls"
"fmt"
"io"
"net"
"net/url"
"os"
"strings"
"sync"
"time"
"github.com/influxdata/go-syslog/rfc5424"
"github.com/influxdata/go-syslog/rfc5425"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
tlsConfig "github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
const defaultReadTimeout = time.Millisecond * 500
const ipMaxPacketSize = 64 * 1024
// Syslog is a syslog plugin
type Syslog struct {
tlsConfig.ServerConfig
Address string `toml:"server"`
KeepAlivePeriod *internal.Duration
ReadTimeout *internal.Duration
MaxConnections int
BestEffort bool
Separator string `toml:"sdparam_separator"`
now func() time.Time
lastTime time.Time
mu sync.Mutex
wg sync.WaitGroup
io.Closer
isStream bool
tcpListener net.Listener
tlsConfig *tls.Config
connections map[string]net.Conn
connectionsMu sync.Mutex
udpListener net.PacketConn
}
var sampleConfig = `
## Specify an ip or hostname with port - eg., tcp://localhost:6514, tcp://10.0.0.1:6514
## Protocol, address and port to host the syslog receiver.
## If no host is specified, then localhost is used.
## If no port is specified, 6514 is used (RFC5425#section-4.1).
server = "tcp://:6514"
## TLS Config
# tls_allowed_cacerts = ["/etc/telegraf/ca.pem"]
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Period between keep alive probes.
## 0 disables keep alive probes.
## Defaults to the OS configuration.
## Only applies to stream sockets (e.g. TCP).
# keep_alive_period = "5m"
## Maximum number of concurrent connections (default = 0).
## 0 means unlimited.
## Only applies to stream sockets (e.g. TCP).
# max_connections = 1024
## Read timeout (default = 500ms).
## 0 means unlimited.
# read_timeout = 500ms
## Whether to parse in best effort mode or not (default = false).
## By default best effort parsing is off.
# best_effort = false
## Character to prepend to SD-PARAMs (default = "_").
## A syslog message can contain multiple parameters and multiple identifiers within structured data section.
## Eg., [id1 name1="val1" name2="val2"][id2 name1="val1" nameA="valA"]
## For each combination a field is created.
## Its name is created concatenating identifier, sdparam_separator, and parameter name.
# sdparam_separator = "_"
`
// SampleConfig returns sample configuration message
func (s *Syslog) SampleConfig() string {
return sampleConfig
}
// Description returns the plugin description
func (s *Syslog) Description() string {
return "Accepts syslog messages per RFC5425"
}
// Gather ...
func (s *Syslog) Gather(_ telegraf.Accumulator) error {
return nil
}
// Start starts the service.
func (s *Syslog) Start(acc telegraf.Accumulator) error {
s.mu.Lock()
defer s.mu.Unlock()
scheme, host, err := getAddressParts(s.Address)
if err != nil {
return err
}
s.Address = host
switch scheme {
case "tcp", "tcp4", "tcp6", "unix", "unixpacket":
s.isStream = true
case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram":
s.isStream = false
default:
return fmt.Errorf("unknown protocol '%s' in '%s'", scheme, s.Address)
}
if scheme == "unix" || scheme == "unixpacket" || scheme == "unixgram" {
os.Remove(s.Address)
}
if s.isStream {
l, err := net.Listen(scheme, s.Address)
if err != nil {
return err
}
s.Closer = l
s.tcpListener = l
s.tlsConfig, err = s.TLSConfig()
if err != nil {
return err
}
s.wg.Add(1)
go s.listenStream(acc)
} else {
l, err := net.ListenPacket(scheme, s.Address)
if err != nil {
return err
}
s.Closer = l
s.udpListener = l
s.wg.Add(1)
go s.listenPacket(acc)
}
if scheme == "unix" || scheme == "unixpacket" || scheme == "unixgram" {
s.Closer = unixCloser{path: s.Address, closer: s.Closer}
}
return nil
}
// Stop cleans up all resources
func (s *Syslog) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
if s.Closer != nil {
s.Close()
}
s.wg.Wait()
}
// getAddressParts returns the address scheme and host
// it also sets defaults for them when missing
// when the input address does not specify the protocol it returns an error
func getAddressParts(a string) (string, string, error) {
parts := strings.SplitN(a, "://", 2)
if len(parts) != 2 {
return "", "", fmt.Errorf("missing protocol within address '%s'", a)
}
u, _ := url.Parse(a)
switch u.Scheme {
case "unix", "unixpacket", "unixgram":
return parts[0], parts[1], nil
}
var host string
if u.Hostname() != "" {
host = u.Hostname()
}
host += ":"
if u.Port() == "" {
host += "6514"
} else {
host += u.Port()
}
return u.Scheme, host, nil
}
func (s *Syslog) listenPacket(acc telegraf.Accumulator) {
defer s.wg.Done()
b := make([]byte, ipMaxPacketSize)
p := rfc5424.NewParser()
for {
n, _, err := s.udpListener.ReadFrom(b)
if err != nil {
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
acc.AddError(err)
}
break
}
if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 {
s.udpListener.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration))
}
message, err := p.Parse(b[:n], &s.BestEffort)
if message != nil {
acc.AddFields("syslog", fields(*message, s), tags(*message), s.time())
}
if err != nil {
acc.AddError(err)
}
}
}
func (s *Syslog) listenStream(acc telegraf.Accumulator) {
defer s.wg.Done()
s.connections = map[string]net.Conn{}
for {
conn, err := s.tcpListener.Accept()
if err != nil {
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
acc.AddError(err)
}
break
}
var tcpConn, _ = conn.(*net.TCPConn)
if s.tlsConfig != nil {
conn = tls.Server(conn, s.tlsConfig)
}
s.connectionsMu.Lock()
if s.MaxConnections > 0 && len(s.connections) >= s.MaxConnections {
s.connectionsMu.Unlock()
conn.Close()
continue
}
s.connections[conn.RemoteAddr().String()] = conn
s.connectionsMu.Unlock()
if err := s.setKeepAlive(tcpConn); err != nil {
acc.AddError(fmt.Errorf("unable to configure keep alive (%s): %s", s.Address, err))
}
go s.handle(conn, acc)
}
s.connectionsMu.Lock()
for _, c := range s.connections {
c.Close()
}
s.connectionsMu.Unlock()
}
func (s *Syslog) removeConnection(c net.Conn) {
s.connectionsMu.Lock()
delete(s.connections, c.RemoteAddr().String())
s.connectionsMu.Unlock()
}
func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) {
defer func() {
s.removeConnection(conn)
conn.Close()
}()
if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 {
conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration))
}
var p *rfc5425.Parser
if s.BestEffort {
p = rfc5425.NewParser(conn, rfc5425.WithBestEffort())
} else {
p = rfc5425.NewParser(conn)
}
p.ParseExecuting(func(r *rfc5425.Result) {
s.store(*r, acc)
})
}
func (s *Syslog) setKeepAlive(c *net.TCPConn) error {
if s.KeepAlivePeriod == nil {
return nil
}
if s.KeepAlivePeriod.Duration == 0 {
return c.SetKeepAlive(false)
}
if err := c.SetKeepAlive(true); err != nil {
return err
}
return c.SetKeepAlivePeriod(s.KeepAlivePeriod.Duration)
}
func (s *Syslog) store(res rfc5425.Result, acc telegraf.Accumulator) {
if res.Error != nil {
acc.AddError(res.Error)
}
if res.MessageError != nil {
acc.AddError(res.MessageError)
}
if res.Message != nil {
msg := *res.Message
acc.AddFields("syslog", fields(msg, s), tags(msg), s.time())
}
}
func tags(msg rfc5424.SyslogMessage) map[string]string {
ts := map[string]string{}
// Not checking assuming a minimally valid message
ts["severity"] = *msg.SeverityShortLevel()
ts["facility"] = *msg.FacilityLevel()
if msg.Hostname() != nil {
ts["hostname"] = *msg.Hostname()
}
if msg.Appname() != nil {
ts["appname"] = *msg.Appname()
}
return ts
}
func fields(msg rfc5424.SyslogMessage, s *Syslog) map[string]interface{} {
// Not checking assuming a minimally valid message
flds := map[string]interface{}{
"version": msg.Version(),
}
flds["severity_code"] = int(*msg.Severity())
flds["facility_code"] = int(*msg.Facility())
if msg.Timestamp() != nil {
flds["timestamp"] = (*msg.Timestamp()).UnixNano()
}
if msg.ProcID() != nil {
flds["procid"] = *msg.ProcID()
}
if msg.MsgID() != nil {
flds["msgid"] = *msg.MsgID()
}
if msg.Message() != nil {
flds["message"] = *msg.Message()
}
if msg.StructuredData() != nil {
for sdid, sdparams := range *msg.StructuredData() {
if len(sdparams) == 0 {
// When SD-ID does not have params we indicate its presence with a bool
flds[sdid] = true
continue
}
for name, value := range sdparams {
// Using whitespace as separator since it is not allowed by the grammar within SDID
flds[sdid+s.Separator+name] = value
}
}
}
return flds
}
type unixCloser struct {
path string
closer io.Closer
}
func (uc unixCloser) Close() error {
err := uc.closer.Close()
os.Remove(uc.path) // ignore error
return err
}
func (s *Syslog) time() time.Time {
t := s.now()
if t == s.lastTime {
t = t.Add(time.Nanosecond)
}
s.lastTime = t
return t
}
func getNanoNow() time.Time {
return time.Unix(0, time.Now().UnixNano())
}
func init() {
receiver := &Syslog{
Address: ":6514",
now: getNanoNow,
ReadTimeout: &internal.Duration{
Duration: defaultReadTimeout,
},
Separator: "_",
}
inputs.Add("syslog", func() telegraf.Input { return receiver })
}

View File

@ -0,0 +1,60 @@
package syslog
import (
"strings"
"testing"
"time"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
const (
address = ":6514"
)
var defaultTime = time.Unix(0, 0)
var maxP = uint8(191)
var maxV = uint16(999)
var maxTS = "2017-12-31T23:59:59.999999+00:00"
var maxH = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabc"
var maxA = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdef"
var maxPID = "abcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzabcdefghilmnopqrstuvzab"
var maxMID = "abcdefghilmnopqrstuvzabcdefghilm"
var message7681 = strings.Repeat("l", 7681)
func TestAddress(t *testing.T) {
var err error
var rec *Syslog
rec = &Syslog{
Address: "localhost:6514",
}
err = rec.Start(&testutil.Accumulator{})
require.EqualError(t, err, "missing protocol within address 'localhost:6514'")
require.Error(t, err)
rec = &Syslog{
Address: "unsupported://example.com:6514",
}
err = rec.Start(&testutil.Accumulator{})
require.EqualError(t, err, "unknown protocol 'unsupported' in 'example.com:6514'")
require.Error(t, err)
rec = &Syslog{
Address: "unixgram:///tmp/telegraf.sock",
}
err = rec.Start(&testutil.Accumulator{})
require.NoError(t, err)
require.Equal(t, "/tmp/telegraf.sock", rec.Address)
rec.Stop()
// Default port is 6514
rec = &Syslog{
Address: "tcp://localhost",
}
err = rec.Start(&testutil.Accumulator{})
require.NoError(t, err)
require.Equal(t, "localhost:6514", rec.Address)
rec.Stop()
}