Add syslog output plugin (#5802)
This commit is contained in:
parent
dfb1387771
commit
761705c299
|
@ -2,6 +2,7 @@ package syslog
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
framing "github.com/influxdata/telegraf/internal/syslog"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -37,7 +38,7 @@ func newUDPSyslogReceiver(address string, bestEffort bool) *Syslog {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTCPSyslogReceiver(address string, keepAlive *internal.Duration, maxConn int, bestEffort bool, f Framing) *Syslog {
|
func newTCPSyslogReceiver(address string, keepAlive *internal.Duration, maxConn int, bestEffort bool, f framing.Framing) *Syslog {
|
||||||
d := &internal.Duration{
|
d := &internal.Duration{
|
||||||
Duration: defaultReadTimeout,
|
Duration: defaultReadTimeout,
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
framing "github.com/influxdata/telegraf/internal/syslog"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -138,7 +139,7 @@ func testStrictNonTransparent(t *testing.T, protocol string, address string, wan
|
||||||
for _, tc := range getTestCasesForNonTransparent() {
|
for _, tc := range getTestCasesForNonTransparent() {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
// Creation of a strict mode receiver
|
// Creation of a strict mode receiver
|
||||||
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false, NonTransparent)
|
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false, framing.NonTransparent)
|
||||||
require.NotNil(t, receiver)
|
require.NotNil(t, receiver)
|
||||||
if wantTLS {
|
if wantTLS {
|
||||||
receiver.ServerConfig = *pki.TLSServerConfig()
|
receiver.ServerConfig = *pki.TLSServerConfig()
|
||||||
|
@ -200,7 +201,7 @@ func testBestEffortNonTransparent(t *testing.T, protocol string, address string,
|
||||||
for _, tc := range getTestCasesForNonTransparent() {
|
for _, tc := range getTestCasesForNonTransparent() {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
// Creation of a best effort mode receiver
|
// Creation of a best effort mode receiver
|
||||||
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true, NonTransparent)
|
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true, framing.NonTransparent)
|
||||||
require.NotNil(t, receiver)
|
require.NotNil(t, receiver)
|
||||||
if wantTLS {
|
if wantTLS {
|
||||||
receiver.ServerConfig = *pki.TLSServerConfig()
|
receiver.ServerConfig = *pki.TLSServerConfig()
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
framing "github.com/influxdata/telegraf/internal/syslog"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -338,7 +339,7 @@ func testStrictOctetCounting(t *testing.T, protocol string, address string, want
|
||||||
for _, tc := range getTestCasesForOctetCounting() {
|
for _, tc := range getTestCasesForOctetCounting() {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
// Creation of a strict mode receiver
|
// Creation of a strict mode receiver
|
||||||
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false, OctetCounting)
|
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false, framing.OctetCounting)
|
||||||
require.NotNil(t, receiver)
|
require.NotNil(t, receiver)
|
||||||
if wantTLS {
|
if wantTLS {
|
||||||
receiver.ServerConfig = *pki.TLSServerConfig()
|
receiver.ServerConfig = *pki.TLSServerConfig()
|
||||||
|
@ -400,7 +401,7 @@ func testBestEffortOctetCounting(t *testing.T, protocol string, address string,
|
||||||
for _, tc := range getTestCasesForOctetCounting() {
|
for _, tc := range getTestCasesForOctetCounting() {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
// Creation of a best effort mode receiver
|
// Creation of a best effort mode receiver
|
||||||
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true, OctetCounting)
|
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true, framing.OctetCounting)
|
||||||
require.NotNil(t, receiver)
|
require.NotNil(t, receiver)
|
||||||
if wantTLS {
|
if wantTLS {
|
||||||
receiver.ServerConfig = *pki.TLSServerConfig()
|
receiver.ServerConfig = *pki.TLSServerConfig()
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"github.com/influxdata/go-syslog/rfc5424"
|
"github.com/influxdata/go-syslog/rfc5424"
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
framing "github.com/influxdata/telegraf/internal/syslog"
|
||||||
tlsConfig "github.com/influxdata/telegraf/internal/tls"
|
tlsConfig "github.com/influxdata/telegraf/internal/tls"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
)
|
)
|
||||||
|
@ -32,7 +33,7 @@ type Syslog struct {
|
||||||
KeepAlivePeriod *internal.Duration
|
KeepAlivePeriod *internal.Duration
|
||||||
MaxConnections int
|
MaxConnections int
|
||||||
ReadTimeout *internal.Duration
|
ReadTimeout *internal.Duration
|
||||||
Framing Framing
|
Framing framing.Framing
|
||||||
Trailer nontransparent.TrailerType
|
Trailer nontransparent.TrailerType
|
||||||
BestEffort bool
|
BestEffort bool
|
||||||
Separator string `toml:"sdparam_separator"`
|
Separator string `toml:"sdparam_separator"`
|
||||||
|
@ -313,7 +314,7 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Select the parser to use depeding on transport framing
|
// Select the parser to use depeding on transport framing
|
||||||
if s.Framing == OctetCounting {
|
if s.Framing == framing.OctetCounting {
|
||||||
// Octet counting transparent framing
|
// Octet counting transparent framing
|
||||||
p = octetcounting.NewParser(opts...)
|
p = octetcounting.NewParser(opts...)
|
||||||
} else {
|
} else {
|
||||||
|
@ -445,7 +446,7 @@ func init() {
|
||||||
ReadTimeout: &internal.Duration{
|
ReadTimeout: &internal.Duration{
|
||||||
Duration: defaultReadTimeout,
|
Duration: defaultReadTimeout,
|
||||||
},
|
},
|
||||||
Framing: OctetCounting,
|
Framing: framing.OctetCounting,
|
||||||
Trailer: nontransparent.LF,
|
Trailer: nontransparent.LF,
|
||||||
Separator: "_",
|
Separator: "_",
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,5 +30,6 @@ import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy"
|
_ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/socket_writer"
|
_ "github.com/influxdata/telegraf/plugins/outputs/socket_writer"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/stackdriver"
|
_ "github.com/influxdata/telegraf/plugins/outputs/stackdriver"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/outputs/syslog"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/wavefront"
|
_ "github.com/influxdata/telegraf/plugins/outputs/wavefront"
|
||||||
)
|
)
|
||||||
|
|
|
@ -0,0 +1,101 @@
|
||||||
|
# Syslog Output Plugin
|
||||||
|
|
||||||
|
The syslog output plugin sends syslog messages transmitted over
|
||||||
|
[UDP](https://tools.ietf.org/html/rfc5426) or
|
||||||
|
[TCP](https://tools.ietf.org/html/rfc6587) or
|
||||||
|
[TLS](https://tools.ietf.org/html/rfc5425), with or without the octet counting framing.
|
||||||
|
|
||||||
|
Syslog messages are formatted according to
|
||||||
|
[RFC 5424](https://tools.ietf.org/html/rfc5424).
|
||||||
|
|
||||||
|
### Configuration
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[outputs.syslog]]
|
||||||
|
## URL to connect to
|
||||||
|
## ex: address = "tcp://127.0.0.1:8094"
|
||||||
|
## ex: address = "tcp4://127.0.0.1:8094"
|
||||||
|
## ex: address = "tcp6://127.0.0.1:8094"
|
||||||
|
## ex: address = "tcp6://[2001:db8::1]:8094"
|
||||||
|
## ex: address = "udp://127.0.0.1:8094"
|
||||||
|
## ex: address = "udp4://127.0.0.1:8094"
|
||||||
|
## ex: address = "udp6://127.0.0.1:8094"
|
||||||
|
address = "tcp://127.0.0.1:8094"
|
||||||
|
|
||||||
|
## Optional TLS Config
|
||||||
|
# tls_ca = "/etc/telegraf/ca.pem"
|
||||||
|
# tls_cert = "/etc/telegraf/cert.pem"
|
||||||
|
# tls_key = "/etc/telegraf/key.pem"
|
||||||
|
## Use TLS but skip chain & host verification
|
||||||
|
# insecure_skip_verify = false
|
||||||
|
|
||||||
|
## Period between keep alive probes.
|
||||||
|
## Only applies to TCP sockets.
|
||||||
|
## 0 disables keep alive probes.
|
||||||
|
## Defaults to the OS configuration.
|
||||||
|
# keep_alive_period = "5m"
|
||||||
|
|
||||||
|
## The framing technique with which it is expected that messages are transported (default = "octet-counting").
|
||||||
|
## Whether the messages come using the octect-counting (RFC5425#section-4.3.1, RFC6587#section-3.4.1),
|
||||||
|
## or the non-transparent framing technique (RFC6587#section-3.4.2).
|
||||||
|
## Must be one of "octect-counting", "non-transparent".
|
||||||
|
# framing = "octet-counting"
|
||||||
|
|
||||||
|
## The trailer to be expected in case of non-trasparent framing (default = "LF").
|
||||||
|
## Must be one of "LF", or "NUL".
|
||||||
|
# trailer = "LF"
|
||||||
|
|
||||||
|
### SD-PARAMs settings
|
||||||
|
### A syslog message can contain multiple parameters and multiple identifiers within structured data section
|
||||||
|
### A syslog message can contain multiple structured data sections.
|
||||||
|
### For each unrecognised metric tag/field a SD-PARAMS can be created.
|
||||||
|
### Example
|
||||||
|
### Configuration =>
|
||||||
|
### sdparam_separator = "_"
|
||||||
|
### default_sdid = "default@32473"
|
||||||
|
### sdids = ["foo@123", "bar@456"]
|
||||||
|
### input => xyzzy,x=y foo@123_value=42,bar@456_value2=84,something_else=1
|
||||||
|
### output (structured data only) => [foo@123 value=42][bar@456 value2=84][default@32473 something_else=1 x=y]
|
||||||
|
|
||||||
|
## SD-PARAMs separator between the sdid and tag/field key (default = "_")
|
||||||
|
# sdparam_separator = "_"
|
||||||
|
|
||||||
|
## Default sdid used for tags/fields that don't contain a prefix defined in the explict sdids setting below
|
||||||
|
## If no default is specified, no SD-PARAMs will be used for unrecognised field.
|
||||||
|
# default_sdid = "default@32473"
|
||||||
|
|
||||||
|
##List of explicit prefixes to extract from tag/field keys and use as the SDID, if they match (see above example for more details):
|
||||||
|
# sdids = ["foo@123", "bar@456"]
|
||||||
|
###
|
||||||
|
|
||||||
|
## Default severity value. Severity and Facility are used to calculate the message PRI value (RFC5424#section-6.2.1)
|
||||||
|
## Used when no metric field with key "severity_code" is defined.
|
||||||
|
## If unset, 5 (notice) is the default
|
||||||
|
# default_severity_code = 5
|
||||||
|
|
||||||
|
## Default facility value. Facility and Severity are used to calculate the message PRI value (RFC5424#section-6.2.1)
|
||||||
|
## Used when no metric field with key "facility_code" is defined.
|
||||||
|
## If unset, 1 (user-level) is the default
|
||||||
|
# default_facility_code = 1
|
||||||
|
|
||||||
|
## Default APP-NAME value (RFC5424#section-6.2.5)
|
||||||
|
## Used when no metric tag with key "appname" is defined.
|
||||||
|
## If unset, "Telegraf" is the default
|
||||||
|
# default_appname = "Telegraf"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Metric mapping
|
||||||
|
The output plugin expects syslog metrics tags and fields to match up with the ones created in the [syslog input plugin](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/syslog#metrics).
|
||||||
|
|
||||||
|
The following table shows the metric tags, field and defaults used to format syslog messages.
|
||||||
|
|
||||||
|
| Syslog field | Metric Tag | Metric Field | Default value |
|
||||||
|
| --- | --- | --- | --- |
|
||||||
|
| APP-NAME | appname | - | default_appname = "Telegraf" |
|
||||||
|
| TIMESTAMP | - | timestamp | Metric's own timestamp |
|
||||||
|
| VERSION | - | version | 1 |
|
||||||
|
| PRI | - | serverity_code + (8 * facility_code)| default_severity_code=5 (notice), default_facility_code=1 (user-level)|
|
||||||
|
| HOSTNAME | hostname OR source OR host | - | os.Hostname() |
|
||||||
|
| MSGID | - | msgid | Metric name |
|
||||||
|
| PROCID | - | procid | - |
|
||||||
|
| MSG | - | msg | - |
|
|
@ -0,0 +1,245 @@
|
||||||
|
package syslog
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"net"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/influxdata/go-syslog/nontransparent"
|
||||||
|
"github.com/influxdata/go-syslog/rfc5424"
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
framing "github.com/influxdata/telegraf/internal/syslog"
|
||||||
|
tlsint "github.com/influxdata/telegraf/internal/tls"
|
||||||
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Syslog struct {
|
||||||
|
Address string
|
||||||
|
KeepAlivePeriod *internal.Duration
|
||||||
|
DefaultSdid string
|
||||||
|
DefaultSeverityCode uint8
|
||||||
|
DefaultFacilityCode uint8
|
||||||
|
DefaultAppname string
|
||||||
|
Sdids []string
|
||||||
|
Separator string `toml:"sdparam_separator"`
|
||||||
|
Framing framing.Framing
|
||||||
|
Trailer nontransparent.TrailerType
|
||||||
|
net.Conn
|
||||||
|
tlsint.ClientConfig
|
||||||
|
mapper *SyslogMapper
|
||||||
|
}
|
||||||
|
|
||||||
|
var sampleConfig = `
|
||||||
|
## URL to connect to
|
||||||
|
## ex: address = "tcp://127.0.0.1:8094"
|
||||||
|
## ex: address = "tcp4://127.0.0.1:8094"
|
||||||
|
## ex: address = "tcp6://127.0.0.1:8094"
|
||||||
|
## ex: address = "tcp6://[2001:db8::1]:8094"
|
||||||
|
## ex: address = "udp://127.0.0.1:8094"
|
||||||
|
## ex: address = "udp4://127.0.0.1:8094"
|
||||||
|
## ex: address = "udp6://127.0.0.1:8094"
|
||||||
|
address = "tcp://127.0.0.1:8094"
|
||||||
|
|
||||||
|
## Optional TLS Config
|
||||||
|
# tls_ca = "/etc/telegraf/ca.pem"
|
||||||
|
# tls_cert = "/etc/telegraf/cert.pem"
|
||||||
|
# tls_key = "/etc/telegraf/key.pem"
|
||||||
|
## Use TLS but skip chain & host verification
|
||||||
|
# insecure_skip_verify = false
|
||||||
|
|
||||||
|
## Period between keep alive probes.
|
||||||
|
## Only applies to TCP sockets.
|
||||||
|
## 0 disables keep alive probes.
|
||||||
|
## Defaults to the OS configuration.
|
||||||
|
# keep_alive_period = "5m"
|
||||||
|
|
||||||
|
## The framing technique with which it is expected that messages are transported (default = "octet-counting").
|
||||||
|
## Whether the messages come using the octect-counting (RFC5425#section-4.3.1, RFC6587#section-3.4.1),
|
||||||
|
## or the non-transparent framing technique (RFC6587#section-3.4.2).
|
||||||
|
## Must be one of "octect-counting", "non-transparent".
|
||||||
|
# framing = "octet-counting"
|
||||||
|
|
||||||
|
## The trailer to be expected in case of non-trasparent framing (default = "LF").
|
||||||
|
## Must be one of "LF", or "NUL".
|
||||||
|
# trailer = "LF"
|
||||||
|
|
||||||
|
### SD-PARAMs settings
|
||||||
|
### A syslog message can contain multiple parameters and multiple identifiers within structured data section
|
||||||
|
### A syslog message can contain multiple structured data sections.
|
||||||
|
### For each unrecognised metric tag/field a SD-PARAMS can be created.
|
||||||
|
### Example
|
||||||
|
### Configuration =>
|
||||||
|
### sdparam_separator = "_"
|
||||||
|
### default_sdid = "default@32473"
|
||||||
|
### sdids = ["foo@123", "bar@456"]
|
||||||
|
### input => xyzzy,x=y foo@123_value=42,bar@456_value2=84,something_else=1
|
||||||
|
### output (structured data only) => [foo@123 value=42][bar@456 value2=84][default@32473 something_else=1 x=y]
|
||||||
|
|
||||||
|
## SD-PARAMs separator between the sdid and tag/field key (default = "_")
|
||||||
|
# sdparam_separator = "_"
|
||||||
|
|
||||||
|
## Default sdid used for tags/fields that don't contain a prefix defined in the explict sdids setting below
|
||||||
|
## If no default is specified, no SD-PARAMs will be used for unrecognised field.
|
||||||
|
# default_sdid = "default@32473"
|
||||||
|
|
||||||
|
##List of explicit prefixes to extract from tag/field keys and use as the SDID, if they match (see above example for more details):
|
||||||
|
# sdids = ["foo@123", "bar@456"]
|
||||||
|
###
|
||||||
|
|
||||||
|
## Default severity value. Severity and Facility are used to calculate the message PRI value (RFC5424#section-6.2.1)
|
||||||
|
## Used when no metric field with key "severity_code" is defined.
|
||||||
|
## If unset, 5 (notice) is the default
|
||||||
|
# default_severity_code = 5
|
||||||
|
|
||||||
|
## Default facility value. Facility and Severity are used to calculate the message PRI value (RFC5424#section-6.2.1)
|
||||||
|
## Used when no metric field with key "facility_code" is defined.
|
||||||
|
## If unset, 1 (user-level) is the default
|
||||||
|
# default_facility_code = 1
|
||||||
|
|
||||||
|
## Default APP-NAME value (RFC5424#section-6.2.5)
|
||||||
|
## Used when no metric tag with key "appname" is defined.
|
||||||
|
## If unset, "Telegraf" is the default
|
||||||
|
# default_appname = "Telegraf"
|
||||||
|
`
|
||||||
|
|
||||||
|
func (s *Syslog) Connect() error {
|
||||||
|
s.initializeSyslogMapper()
|
||||||
|
|
||||||
|
spl := strings.SplitN(s.Address, "://", 2)
|
||||||
|
if len(spl) != 2 {
|
||||||
|
return fmt.Errorf("invalid address: %s", s.Address)
|
||||||
|
}
|
||||||
|
|
||||||
|
tlsCfg, err := s.ClientConfig.TLSConfig()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var c net.Conn
|
||||||
|
if tlsCfg == nil {
|
||||||
|
c, err = net.Dial(spl[0], spl[1])
|
||||||
|
} else {
|
||||||
|
c, err = tls.Dial(spl[0], spl[1], tlsCfg)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.setKeepAlive(c); err != nil {
|
||||||
|
log.Printf("unable to configure keep alive (%s): %s", s.Address, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Conn = c
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Syslog) setKeepAlive(c net.Conn) error {
|
||||||
|
if s.KeepAlivePeriod == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
tcpc, ok := c.(*net.TCPConn)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot set keep alive on a %s socket", strings.SplitN(s.Address, "://", 2)[0])
|
||||||
|
}
|
||||||
|
if s.KeepAlivePeriod.Duration == 0 {
|
||||||
|
return tcpc.SetKeepAlive(false)
|
||||||
|
}
|
||||||
|
if err := tcpc.SetKeepAlive(true); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return tcpc.SetKeepAlivePeriod(s.KeepAlivePeriod.Duration)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Syslog) Close() error {
|
||||||
|
if s.Conn == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
err := s.Conn.Close()
|
||||||
|
s.Conn = nil
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Syslog) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Syslog) Description() string {
|
||||||
|
return "Configuration for Syslog server to send metrics to"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Syslog) Write(metrics []telegraf.Metric) (err error) {
|
||||||
|
if s.Conn == nil {
|
||||||
|
// previous write failed with permanent error and socket was closed.
|
||||||
|
if err = s.Connect(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, metric := range metrics {
|
||||||
|
var msg *rfc5424.SyslogMessage
|
||||||
|
if msg, err = s.mapper.MapMetricToSyslogMessage(metric); err != nil {
|
||||||
|
log.Printf("E! [outputs.syslog] Failed to create syslog message: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var msgBytesWithFraming []byte
|
||||||
|
if msgBytesWithFraming, err = s.getSyslogMessageBytesWithFraming(msg); err != nil {
|
||||||
|
log.Printf("E! [outputs.syslog] Failed to convert syslog message with framing: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, err = s.Conn.Write(msgBytesWithFraming); err != nil {
|
||||||
|
if netErr, ok := err.(net.Error); !ok || !netErr.Temporary() {
|
||||||
|
s.Close()
|
||||||
|
s.Conn = nil
|
||||||
|
return fmt.Errorf("closing connection: %v", netErr)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Syslog) getSyslogMessageBytesWithFraming(msg *rfc5424.SyslogMessage) ([]byte, error) {
|
||||||
|
var msgString string
|
||||||
|
var err error
|
||||||
|
if msgString, err = msg.String(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
msgBytes := []byte(msgString)
|
||||||
|
|
||||||
|
if s.Framing == framing.OctetCounting {
|
||||||
|
return append([]byte(strconv.Itoa(len(msgBytes))+" "), msgBytes...), nil
|
||||||
|
}
|
||||||
|
// Non-transparent framing
|
||||||
|
return append(msgBytes, byte(s.Trailer)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Syslog) initializeSyslogMapper() {
|
||||||
|
if s.mapper != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.mapper = newSyslogMapper()
|
||||||
|
s.mapper.DefaultFacilityCode = s.DefaultFacilityCode
|
||||||
|
s.mapper.DefaultSeverityCode = s.DefaultSeverityCode
|
||||||
|
s.mapper.DefaultAppname = s.DefaultAppname
|
||||||
|
s.mapper.Separator = s.Separator
|
||||||
|
s.mapper.DefaultSdid = s.DefaultSdid
|
||||||
|
s.mapper.Sdids = s.Sdids
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSyslog() *Syslog {
|
||||||
|
return &Syslog{
|
||||||
|
Framing: framing.OctetCounting,
|
||||||
|
Trailer: nontransparent.LF,
|
||||||
|
Separator: "_",
|
||||||
|
DefaultSeverityCode: uint8(5), // notice
|
||||||
|
DefaultFacilityCode: uint8(1), // user-level
|
||||||
|
DefaultAppname: "Telegraf",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
outputs.Add("syslog", func() telegraf.Output { return newSyslog() })
|
||||||
|
}
|
|
@ -0,0 +1,199 @@
|
||||||
|
package syslog
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"math"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/go-syslog/rfc5424"
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SyslogMapper struct {
|
||||||
|
DefaultSdid string
|
||||||
|
DefaultSeverityCode uint8
|
||||||
|
DefaultFacilityCode uint8
|
||||||
|
DefaultAppname string
|
||||||
|
Sdids []string
|
||||||
|
Separator string
|
||||||
|
reservedKeys map[string]bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// MapMetricToSyslogMessage maps metrics tags/fields to syslog messages
|
||||||
|
func (sm *SyslogMapper) MapMetricToSyslogMessage(metric telegraf.Metric) (*rfc5424.SyslogMessage, error) {
|
||||||
|
msg := &rfc5424.SyslogMessage{}
|
||||||
|
|
||||||
|
sm.mapPriority(metric, msg)
|
||||||
|
sm.mapStructuredData(metric, msg)
|
||||||
|
sm.mapAppname(metric, msg)
|
||||||
|
mapHostname(metric, msg)
|
||||||
|
mapTimestamp(metric, msg)
|
||||||
|
mapMsgID(metric, msg)
|
||||||
|
mapVersion(metric, msg)
|
||||||
|
mapProcID(metric, msg)
|
||||||
|
mapMsg(metric, msg)
|
||||||
|
|
||||||
|
if !msg.Valid() {
|
||||||
|
return nil, errors.New("metric could not produce valid syslog message")
|
||||||
|
}
|
||||||
|
return msg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *SyslogMapper) mapStructuredData(metric telegraf.Metric, msg *rfc5424.SyslogMessage) {
|
||||||
|
for _, tag := range metric.TagList() {
|
||||||
|
sm.mapStructuredDataItem(tag.Key, tag.Value, msg)
|
||||||
|
}
|
||||||
|
for _, field := range metric.FieldList() {
|
||||||
|
sm.mapStructuredDataItem(field.Key, formatValue(field.Value), msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *SyslogMapper) mapStructuredDataItem(key string, value string, msg *rfc5424.SyslogMessage) {
|
||||||
|
if sm.reservedKeys[key] {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
isExplicitSdid := false
|
||||||
|
for _, sdid := range sm.Sdids {
|
||||||
|
k := strings.TrimLeft(key, sdid+sm.Separator)
|
||||||
|
if len(key) > len(k) {
|
||||||
|
isExplicitSdid = true
|
||||||
|
msg.SetParameter(sdid, k, value)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !isExplicitSdid && len(sm.DefaultSdid) > 0 {
|
||||||
|
k := strings.TrimPrefix(key, sm.DefaultSdid+sm.Separator)
|
||||||
|
msg.SetParameter(sm.DefaultSdid, k, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *SyslogMapper) mapAppname(metric telegraf.Metric, msg *rfc5424.SyslogMessage) {
|
||||||
|
if value, ok := metric.GetTag("appname"); ok {
|
||||||
|
msg.SetAppname(formatValue(value))
|
||||||
|
} else {
|
||||||
|
//Use default appname
|
||||||
|
msg.SetAppname(sm.DefaultAppname)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func mapMsgID(metric telegraf.Metric, msg *rfc5424.SyslogMessage) {
|
||||||
|
if value, ok := metric.GetField("msgid"); ok {
|
||||||
|
msg.SetMsgID(formatValue(value))
|
||||||
|
} else {
|
||||||
|
// We default to metric name
|
||||||
|
msg.SetMsgID(metric.Name())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func mapVersion(metric telegraf.Metric, msg *rfc5424.SyslogMessage) {
|
||||||
|
if value, ok := metric.GetField("version"); ok {
|
||||||
|
switch v := value.(type) {
|
||||||
|
case uint64:
|
||||||
|
msg.SetVersion(uint16(v))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
msg.SetVersion(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func mapMsg(metric telegraf.Metric, msg *rfc5424.SyslogMessage) {
|
||||||
|
if value, ok := metric.GetField("msg"); ok {
|
||||||
|
msg.SetMessage(formatValue(value))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func mapProcID(metric telegraf.Metric, msg *rfc5424.SyslogMessage) {
|
||||||
|
if value, ok := metric.GetField("procid"); ok {
|
||||||
|
msg.SetProcID(formatValue(value))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *SyslogMapper) mapPriority(metric telegraf.Metric, msg *rfc5424.SyslogMessage) {
|
||||||
|
severityCode := sm.DefaultSeverityCode
|
||||||
|
facilityCode := sm.DefaultFacilityCode
|
||||||
|
|
||||||
|
if value, ok := getFieldCode(metric, "severity_code"); ok {
|
||||||
|
severityCode = *value
|
||||||
|
}
|
||||||
|
|
||||||
|
if value, ok := getFieldCode(metric, "facility_code"); ok {
|
||||||
|
facilityCode = *value
|
||||||
|
}
|
||||||
|
|
||||||
|
priority := (8 * facilityCode) + severityCode
|
||||||
|
msg.SetPriority(priority)
|
||||||
|
}
|
||||||
|
|
||||||
|
func mapHostname(metric telegraf.Metric, msg *rfc5424.SyslogMessage) {
|
||||||
|
// Try with hostname, then with source, then with host tags, then take OS Hostname
|
||||||
|
if value, ok := metric.GetTag("hostname"); ok {
|
||||||
|
msg.SetHostname(formatValue(value))
|
||||||
|
} else if value, ok := metric.GetTag("source"); ok {
|
||||||
|
msg.SetHostname(formatValue(value))
|
||||||
|
} else if value, ok := metric.GetTag("host"); ok {
|
||||||
|
msg.SetHostname(formatValue(value))
|
||||||
|
} else if value, err := os.Hostname(); err == nil {
|
||||||
|
msg.SetHostname(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func mapTimestamp(metric telegraf.Metric, msg *rfc5424.SyslogMessage) {
|
||||||
|
timestamp := metric.Time()
|
||||||
|
if value, ok := metric.GetField("timestamp"); ok {
|
||||||
|
switch v := value.(type) {
|
||||||
|
case int64:
|
||||||
|
timestamp = time.Unix(0, v).UTC()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
msg.SetTimestamp(timestamp.Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
|
||||||
|
func formatValue(value interface{}) string {
|
||||||
|
switch v := value.(type) {
|
||||||
|
case string:
|
||||||
|
return v
|
||||||
|
case bool:
|
||||||
|
if v {
|
||||||
|
return "1"
|
||||||
|
}
|
||||||
|
return "0"
|
||||||
|
case uint64:
|
||||||
|
return strconv.FormatUint(v, 10)
|
||||||
|
case int64:
|
||||||
|
return strconv.FormatInt(v, 10)
|
||||||
|
case float64:
|
||||||
|
if math.IsNaN(v) {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
if math.IsInf(v, 0) {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return strconv.FormatFloat(v, 'f', -1, 64)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func getFieldCode(metric telegraf.Metric, fieldKey string) (*uint8, bool) {
|
||||||
|
if value, ok := metric.GetField(fieldKey); ok {
|
||||||
|
if v, err := strconv.ParseUint(formatValue(value), 10, 8); err == nil {
|
||||||
|
r := uint8(v)
|
||||||
|
return &r, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSyslogMapper() *SyslogMapper {
|
||||||
|
return &SyslogMapper{
|
||||||
|
reservedKeys: map[string]bool{
|
||||||
|
"version": true, "severity_code": true, "facility_code": true,
|
||||||
|
"procid": true, "msgid": true, "msg": true, "timestamp": true, "sdid": true,
|
||||||
|
"hostname": true, "source": true, "host": true, "severity": true,
|
||||||
|
"facility": true, "appname": true},
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,200 @@
|
||||||
|
package syslog
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSyslogMapperWithDefaults(t *testing.T) {
|
||||||
|
s := newSyslog()
|
||||||
|
s.initializeSyslogMapper()
|
||||||
|
|
||||||
|
// Init metrics
|
||||||
|
m1, _ := metric.New(
|
||||||
|
"testmetric",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
)
|
||||||
|
hostname, err := os.Hostname()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
str, _ := syslogMessage.String()
|
||||||
|
assert.Equal(t, "<13>1 2010-11-10T23:00:00Z "+hostname+" Telegraf - testmetric -", str, "Wrong syslog message")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSyslogMapperWithHostname(t *testing.T) {
|
||||||
|
s := newSyslog()
|
||||||
|
s.initializeSyslogMapper()
|
||||||
|
|
||||||
|
// Init metrics
|
||||||
|
m1, _ := metric.New(
|
||||||
|
"testmetric",
|
||||||
|
map[string]string{
|
||||||
|
"hostname": "testhost",
|
||||||
|
"source": "sourcevalue",
|
||||||
|
"host": "hostvalue",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
)
|
||||||
|
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
str, _ := syslogMessage.String()
|
||||||
|
assert.Equal(t, "<13>1 2010-11-10T23:00:00Z testhost Telegraf - testmetric -", str, "Wrong syslog message")
|
||||||
|
}
|
||||||
|
func TestSyslogMapperWithHostnameSourceFallback(t *testing.T) {
|
||||||
|
s := newSyslog()
|
||||||
|
s.initializeSyslogMapper()
|
||||||
|
|
||||||
|
// Init metrics
|
||||||
|
m1, _ := metric.New(
|
||||||
|
"testmetric",
|
||||||
|
map[string]string{
|
||||||
|
"source": "sourcevalue",
|
||||||
|
"host": "hostvalue",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
)
|
||||||
|
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
str, _ := syslogMessage.String()
|
||||||
|
assert.Equal(t, "<13>1 2010-11-10T23:00:00Z sourcevalue Telegraf - testmetric -", str, "Wrong syslog message")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSyslogMapperWithHostnameHostFallback(t *testing.T) {
|
||||||
|
s := newSyslog()
|
||||||
|
s.initializeSyslogMapper()
|
||||||
|
|
||||||
|
// Init metrics
|
||||||
|
m1, _ := metric.New(
|
||||||
|
"testmetric",
|
||||||
|
map[string]string{
|
||||||
|
"host": "hostvalue",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
)
|
||||||
|
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
str, _ := syslogMessage.String()
|
||||||
|
assert.Equal(t, "<13>1 2010-11-10T23:00:00Z hostvalue Telegraf - testmetric -", str, "Wrong syslog message")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSyslogMapperWithDefaultSdid(t *testing.T) {
|
||||||
|
s := newSyslog()
|
||||||
|
s.DefaultSdid = "default@32473"
|
||||||
|
s.initializeSyslogMapper()
|
||||||
|
|
||||||
|
// Init metrics
|
||||||
|
m1, _ := metric.New(
|
||||||
|
"testmetric",
|
||||||
|
map[string]string{
|
||||||
|
"appname": "testapp",
|
||||||
|
"hostname": "testhost",
|
||||||
|
"tag1": "bar",
|
||||||
|
"default@32473_tag2": "foobar",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"severity_code": uint64(3),
|
||||||
|
"facility_code": uint64(3),
|
||||||
|
"msg": "Test message",
|
||||||
|
"procid": uint64(25),
|
||||||
|
"version": uint16(2),
|
||||||
|
"msgid": int64(555),
|
||||||
|
"timestamp": time.Date(2010, time.November, 10, 23, 30, 0, 0, time.UTC).UnixNano(),
|
||||||
|
"value1": int64(2),
|
||||||
|
"default@32473_value2": "foo",
|
||||||
|
"value3": float64(1.2),
|
||||||
|
},
|
||||||
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
)
|
||||||
|
|
||||||
|
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
str, _ := syslogMessage.String()
|
||||||
|
assert.Equal(t, "<27>2 2010-11-10T23:30:00Z testhost testapp 25 555 [default@32473 tag1=\"bar\" tag2=\"foobar\" value1=\"2\" value2=\"foo\" value3=\"1.2\"] Test message", str, "Wrong syslog message")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSyslogMapperWithDefaultSdidAndOtherSdids(t *testing.T) {
|
||||||
|
s := newSyslog()
|
||||||
|
s.DefaultSdid = "default@32473"
|
||||||
|
s.Sdids = []string{"bar@123", "foo@456"}
|
||||||
|
s.initializeSyslogMapper()
|
||||||
|
|
||||||
|
// Init metrics
|
||||||
|
m1, _ := metric.New(
|
||||||
|
"testmetric",
|
||||||
|
map[string]string{
|
||||||
|
"appname": "testapp",
|
||||||
|
"hostname": "testhost",
|
||||||
|
"tag1": "bar",
|
||||||
|
"default@32473_tag2": "foobar",
|
||||||
|
"bar@123_tag3": "barfoobar",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"severity_code": uint64(1),
|
||||||
|
"facility_code": uint64(3),
|
||||||
|
"msg": "Test message",
|
||||||
|
"procid": uint64(25),
|
||||||
|
"version": uint16(2),
|
||||||
|
"msgid": int64(555),
|
||||||
|
"timestamp": time.Date(2010, time.November, 10, 23, 30, 0, 0, time.UTC).UnixNano(),
|
||||||
|
"value1": int64(2),
|
||||||
|
"default@32473_value2": "default",
|
||||||
|
"bar@123_value3": int64(2),
|
||||||
|
"foo@456_value4": "foo",
|
||||||
|
},
|
||||||
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
)
|
||||||
|
|
||||||
|
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
str, _ := syslogMessage.String()
|
||||||
|
assert.Equal(t, "<25>2 2010-11-10T23:30:00Z testhost testapp 25 555 [bar@123 tag3=\"barfoobar\" value3=\"2\"][default@32473 tag1=\"bar\" tag2=\"foobar\" value1=\"2\" value2=\"default\"][foo@456 value4=\"foo\"] Test message", str, "Wrong syslog message")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSyslogMapperWithNoSdids(t *testing.T) {
|
||||||
|
// Init mapper
|
||||||
|
s := newSyslog()
|
||||||
|
s.initializeSyslogMapper()
|
||||||
|
|
||||||
|
// Init metrics
|
||||||
|
m1, _ := metric.New(
|
||||||
|
"testmetric",
|
||||||
|
map[string]string{
|
||||||
|
"appname": "testapp",
|
||||||
|
"hostname": "testhost",
|
||||||
|
"tag1": "bar",
|
||||||
|
"default@32473_tag2": "foobar",
|
||||||
|
"bar@123_tag3": "barfoobar",
|
||||||
|
"foo@456_tag4": "foobarfoo",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"severity_code": uint64(2),
|
||||||
|
"facility_code": uint64(3),
|
||||||
|
"msg": "Test message",
|
||||||
|
"procid": uint64(25),
|
||||||
|
"version": uint16(2),
|
||||||
|
"msgid": int64(555),
|
||||||
|
"timestamp": time.Date(2010, time.November, 10, 23, 30, 0, 0, time.UTC).UnixNano(),
|
||||||
|
"value1": int64(2),
|
||||||
|
"default@32473_value2": "default",
|
||||||
|
"bar@123_value3": int64(2),
|
||||||
|
"foo@456_value4": "foo",
|
||||||
|
},
|
||||||
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
)
|
||||||
|
|
||||||
|
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
str, _ := syslogMessage.String()
|
||||||
|
assert.Equal(t, "<26>2 2010-11-10T23:30:00Z testhost testapp 25 555 - Test message", str, "Wrong syslog message")
|
||||||
|
}
|
|
@ -0,0 +1,205 @@
|
||||||
|
package syslog
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
framing "github.com/influxdata/telegraf/internal/syslog"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGetSyslogMessageWithFramingOctectCounting(t *testing.T) {
|
||||||
|
// Init plugin
|
||||||
|
s := newSyslog()
|
||||||
|
s.initializeSyslogMapper()
|
||||||
|
|
||||||
|
// Init metrics
|
||||||
|
m1, _ := metric.New(
|
||||||
|
"testmetric",
|
||||||
|
map[string]string{
|
||||||
|
"hostname": "testhost",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
)
|
||||||
|
|
||||||
|
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, "59 <13>1 2010-11-10T23:00:00Z testhost Telegraf - testmetric -", string(messageBytesWithFraming), "Incorrect Octect counting framing")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetSyslogMessageWithFramingNonTransparent(t *testing.T) {
|
||||||
|
// Init plugin
|
||||||
|
s := newSyslog()
|
||||||
|
s.initializeSyslogMapper()
|
||||||
|
s.Framing = framing.NonTransparent
|
||||||
|
|
||||||
|
// Init metrics
|
||||||
|
m1, _ := metric.New(
|
||||||
|
"testmetric",
|
||||||
|
map[string]string{
|
||||||
|
"hostname": "testhost",
|
||||||
|
},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
)
|
||||||
|
|
||||||
|
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, "<13>1 2010-11-10T23:00:00Z testhost Telegraf - testmetric -\x00", string(messageBytesWithFraming), "Incorrect Octect counting framing")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSyslogWriteWithTcp(t *testing.T) {
|
||||||
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
s := newSyslog()
|
||||||
|
s.Address = "tcp://" + listener.Addr().String()
|
||||||
|
|
||||||
|
err = s.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
lconn, err := listener.Accept()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
testSyslogWriteWithStream(t, s, lconn)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSyslogWriteWithUdp(t *testing.T) {
|
||||||
|
listener, err := net.ListenPacket("udp", "127.0.0.1:0")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
s := newSyslog()
|
||||||
|
s.Address = "udp://" + listener.LocalAddr().String()
|
||||||
|
|
||||||
|
err = s.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
testSyslogWriteWithPacket(t, s, listener)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testSyslogWriteWithStream(t *testing.T, s *Syslog, lconn net.Conn) {
|
||||||
|
metrics := []telegraf.Metric{}
|
||||||
|
m1, _ := metric.New(
|
||||||
|
"testmetric",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC))
|
||||||
|
|
||||||
|
metrics = append(metrics, m1)
|
||||||
|
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(metrics[0])
|
||||||
|
require.NoError(t, err)
|
||||||
|
messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = s.Write(metrics)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
buf := make([]byte, 256)
|
||||||
|
n, err := lconn.Read(buf)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, string(messageBytesWithFraming), string(buf[:n]))
|
||||||
|
}
|
||||||
|
|
||||||
|
func testSyslogWriteWithPacket(t *testing.T, s *Syslog, lconn net.PacketConn) {
|
||||||
|
s.Framing = framing.NonTransparent
|
||||||
|
metrics := []telegraf.Metric{}
|
||||||
|
m1, _ := metric.New(
|
||||||
|
"testmetric",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC))
|
||||||
|
|
||||||
|
metrics = append(metrics, m1)
|
||||||
|
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(metrics[0])
|
||||||
|
require.NoError(t, err)
|
||||||
|
messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = s.Write(metrics)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
buf := make([]byte, 256)
|
||||||
|
n, _, err := lconn.ReadFrom(buf)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, string(messageBytesWithFraming), string(buf[:n]))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSyslogWriteErr(t *testing.T) {
|
||||||
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
s := newSyslog()
|
||||||
|
s.Address = "tcp://" + listener.Addr().String()
|
||||||
|
|
||||||
|
err = s.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
s.Conn.(*net.TCPConn).SetReadBuffer(256)
|
||||||
|
|
||||||
|
lconn, err := listener.Accept()
|
||||||
|
require.NoError(t, err)
|
||||||
|
lconn.(*net.TCPConn).SetWriteBuffer(256)
|
||||||
|
|
||||||
|
metrics := []telegraf.Metric{testutil.TestMetric(1, "testerr")}
|
||||||
|
|
||||||
|
// close the socket to generate an error
|
||||||
|
lconn.Close()
|
||||||
|
s.Conn.Close()
|
||||||
|
err = s.Write(metrics)
|
||||||
|
require.Error(t, err)
|
||||||
|
assert.Nil(t, s.Conn)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSyslogWriteReconnect(t *testing.T) {
|
||||||
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
s := newSyslog()
|
||||||
|
s.Address = "tcp://" + listener.Addr().String()
|
||||||
|
|
||||||
|
err = s.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
s.Conn.(*net.TCPConn).SetReadBuffer(256)
|
||||||
|
|
||||||
|
lconn, err := listener.Accept()
|
||||||
|
require.NoError(t, err)
|
||||||
|
lconn.(*net.TCPConn).SetWriteBuffer(256)
|
||||||
|
lconn.Close()
|
||||||
|
s.Conn = nil
|
||||||
|
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
wg.Add(1)
|
||||||
|
var lerr error
|
||||||
|
go func() {
|
||||||
|
lconn, lerr = listener.Accept()
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
metrics := []telegraf.Metric{testutil.TestMetric(1, "testerr")}
|
||||||
|
err = s.Write(metrics)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
assert.NoError(t, lerr)
|
||||||
|
|
||||||
|
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(metrics[0])
|
||||||
|
require.NoError(t, err)
|
||||||
|
messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage)
|
||||||
|
require.NoError(t, err)
|
||||||
|
buf := make([]byte, 256)
|
||||||
|
n, err := lconn.Read(buf)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, string(messageBytesWithFraming), string(buf[:n]))
|
||||||
|
}
|
Loading…
Reference in New Issue