From 761705c29902821239bcd8b8d601c5c7281fd8e1 Mon Sep 17 00:00:00 2001 From: javicrespo Date: Wed, 8 May 2019 20:21:51 +0200 Subject: [PATCH] Add syslog output plugin (#5802) --- .../inputs => internal}/syslog/framing.go | 0 .../syslog/framing_test.go | 0 plugins/inputs/syslog/commons_test.go | 3 +- plugins/inputs/syslog/nontransparent_test.go | 5 +- plugins/inputs/syslog/octetcounting_test.go | 5 +- plugins/inputs/syslog/syslog.go | 7 +- plugins/outputs/all/all.go | 1 + plugins/outputs/syslog/README.md | 101 ++++++++ plugins/outputs/syslog/syslog.go | 245 ++++++++++++++++++ plugins/outputs/syslog/syslog_mapper.go | 199 ++++++++++++++ plugins/outputs/syslog/syslog_mapper_test.go | 200 ++++++++++++++ plugins/outputs/syslog/syslog_test.go | 205 +++++++++++++++ 12 files changed, 963 insertions(+), 8 deletions(-) rename {plugins/inputs => internal}/syslog/framing.go (100%) rename {plugins/inputs => internal}/syslog/framing_test.go (100%) create mode 100644 plugins/outputs/syslog/README.md create mode 100644 plugins/outputs/syslog/syslog.go create mode 100644 plugins/outputs/syslog/syslog_mapper.go create mode 100644 plugins/outputs/syslog/syslog_mapper_test.go create mode 100644 plugins/outputs/syslog/syslog_test.go diff --git a/plugins/inputs/syslog/framing.go b/internal/syslog/framing.go similarity index 100% rename from plugins/inputs/syslog/framing.go rename to internal/syslog/framing.go diff --git a/plugins/inputs/syslog/framing_test.go b/internal/syslog/framing_test.go similarity index 100% rename from plugins/inputs/syslog/framing_test.go rename to internal/syslog/framing_test.go diff --git a/plugins/inputs/syslog/commons_test.go b/plugins/inputs/syslog/commons_test.go index f55d080a1..5d5562fc7 100644 --- a/plugins/inputs/syslog/commons_test.go +++ b/plugins/inputs/syslog/commons_test.go @@ -2,6 +2,7 @@ package syslog import ( "github.com/influxdata/telegraf/internal" + framing "github.com/influxdata/telegraf/internal/syslog" "github.com/influxdata/telegraf/testutil" "time" ) @@ -37,7 +38,7 @@ func newUDPSyslogReceiver(address string, bestEffort bool) *Syslog { } } -func newTCPSyslogReceiver(address string, keepAlive *internal.Duration, maxConn int, bestEffort bool, f Framing) *Syslog { +func newTCPSyslogReceiver(address string, keepAlive *internal.Duration, maxConn int, bestEffort bool, f framing.Framing) *Syslog { d := &internal.Duration{ Duration: defaultReadTimeout, } diff --git a/plugins/inputs/syslog/nontransparent_test.go b/plugins/inputs/syslog/nontransparent_test.go index 1dea84144..2bf6aa4ef 100644 --- a/plugins/inputs/syslog/nontransparent_test.go +++ b/plugins/inputs/syslog/nontransparent_test.go @@ -11,6 +11,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/influxdata/telegraf/internal" + framing "github.com/influxdata/telegraf/internal/syslog" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -138,7 +139,7 @@ func testStrictNonTransparent(t *testing.T, protocol string, address string, wan for _, tc := range getTestCasesForNonTransparent() { t.Run(tc.name, func(t *testing.T) { // Creation of a strict mode receiver - receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false, NonTransparent) + receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false, framing.NonTransparent) require.NotNil(t, receiver) if wantTLS { receiver.ServerConfig = *pki.TLSServerConfig() @@ -200,7 +201,7 @@ func testBestEffortNonTransparent(t *testing.T, protocol string, address string, for _, tc := range getTestCasesForNonTransparent() { t.Run(tc.name, func(t *testing.T) { // Creation of a best effort mode receiver - receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true, NonTransparent) + receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true, framing.NonTransparent) require.NotNil(t, receiver) if wantTLS { receiver.ServerConfig = *pki.TLSServerConfig() diff --git a/plugins/inputs/syslog/octetcounting_test.go b/plugins/inputs/syslog/octetcounting_test.go index c61805131..4f8f2d278 100644 --- a/plugins/inputs/syslog/octetcounting_test.go +++ b/plugins/inputs/syslog/octetcounting_test.go @@ -12,6 +12,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/influxdata/telegraf/internal" + framing "github.com/influxdata/telegraf/internal/syslog" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -338,7 +339,7 @@ func testStrictOctetCounting(t *testing.T, protocol string, address string, want for _, tc := range getTestCasesForOctetCounting() { t.Run(tc.name, func(t *testing.T) { // Creation of a strict mode receiver - receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false, OctetCounting) + receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false, framing.OctetCounting) require.NotNil(t, receiver) if wantTLS { receiver.ServerConfig = *pki.TLSServerConfig() @@ -400,7 +401,7 @@ func testBestEffortOctetCounting(t *testing.T, protocol string, address string, for _, tc := range getTestCasesForOctetCounting() { t.Run(tc.name, func(t *testing.T) { // Creation of a best effort mode receiver - receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true, OctetCounting) + receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true, framing.OctetCounting) require.NotNil(t, receiver) if wantTLS { receiver.ServerConfig = *pki.TLSServerConfig() diff --git a/plugins/inputs/syslog/syslog.go b/plugins/inputs/syslog/syslog.go index 51d2ee455..e1e918759 100644 --- a/plugins/inputs/syslog/syslog.go +++ b/plugins/inputs/syslog/syslog.go @@ -18,6 +18,7 @@ import ( "github.com/influxdata/go-syslog/rfc5424" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" + framing "github.com/influxdata/telegraf/internal/syslog" tlsConfig "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -32,7 +33,7 @@ type Syslog struct { KeepAlivePeriod *internal.Duration MaxConnections int ReadTimeout *internal.Duration - Framing Framing + Framing framing.Framing Trailer nontransparent.TrailerType BestEffort bool Separator string `toml:"sdparam_separator"` @@ -313,7 +314,7 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) { } // Select the parser to use depeding on transport framing - if s.Framing == OctetCounting { + if s.Framing == framing.OctetCounting { // Octet counting transparent framing p = octetcounting.NewParser(opts...) } else { @@ -445,7 +446,7 @@ func init() { ReadTimeout: &internal.Duration{ Duration: defaultReadTimeout, }, - Framing: OctetCounting, + Framing: framing.OctetCounting, Trailer: nontransparent.LF, Separator: "_", } diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index a5d2a44da..c29d05efb 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -30,5 +30,6 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy" _ "github.com/influxdata/telegraf/plugins/outputs/socket_writer" _ "github.com/influxdata/telegraf/plugins/outputs/stackdriver" + _ "github.com/influxdata/telegraf/plugins/outputs/syslog" _ "github.com/influxdata/telegraf/plugins/outputs/wavefront" ) diff --git a/plugins/outputs/syslog/README.md b/plugins/outputs/syslog/README.md new file mode 100644 index 000000000..8655cbd6a --- /dev/null +++ b/plugins/outputs/syslog/README.md @@ -0,0 +1,101 @@ +# Syslog Output Plugin + +The syslog output plugin sends syslog messages transmitted over +[UDP](https://tools.ietf.org/html/rfc5426) or +[TCP](https://tools.ietf.org/html/rfc6587) or +[TLS](https://tools.ietf.org/html/rfc5425), with or without the octet counting framing. + +Syslog messages are formatted according to +[RFC 5424](https://tools.ietf.org/html/rfc5424). + +### Configuration + +```toml +[[outputs.syslog]] + ## URL to connect to + ## ex: address = "tcp://127.0.0.1:8094" + ## ex: address = "tcp4://127.0.0.1:8094" + ## ex: address = "tcp6://127.0.0.1:8094" + ## ex: address = "tcp6://[2001:db8::1]:8094" + ## ex: address = "udp://127.0.0.1:8094" + ## ex: address = "udp4://127.0.0.1:8094" + ## ex: address = "udp6://127.0.0.1:8094" + address = "tcp://127.0.0.1:8094" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Period between keep alive probes. + ## Only applies to TCP sockets. + ## 0 disables keep alive probes. + ## Defaults to the OS configuration. + # keep_alive_period = "5m" + + ## The framing technique with which it is expected that messages are transported (default = "octet-counting"). + ## Whether the messages come using the octect-counting (RFC5425#section-4.3.1, RFC6587#section-3.4.1), + ## or the non-transparent framing technique (RFC6587#section-3.4.2). + ## Must be one of "octect-counting", "non-transparent". + # framing = "octet-counting" + + ## The trailer to be expected in case of non-trasparent framing (default = "LF"). + ## Must be one of "LF", or "NUL". + # trailer = "LF" + + ### SD-PARAMs settings + ### A syslog message can contain multiple parameters and multiple identifiers within structured data section + ### A syslog message can contain multiple structured data sections. + ### For each unrecognised metric tag/field a SD-PARAMS can be created. + ### Example + ### Configuration => + ### sdparam_separator = "_" + ### default_sdid = "default@32473" + ### sdids = ["foo@123", "bar@456"] + ### input => xyzzy,x=y foo@123_value=42,bar@456_value2=84,something_else=1 + ### output (structured data only) => [foo@123 value=42][bar@456 value2=84][default@32473 something_else=1 x=y] + + ## SD-PARAMs separator between the sdid and tag/field key (default = "_") + # sdparam_separator = "_" + + ## Default sdid used for tags/fields that don't contain a prefix defined in the explict sdids setting below + ## If no default is specified, no SD-PARAMs will be used for unrecognised field. + # default_sdid = "default@32473" + + ##List of explicit prefixes to extract from tag/field keys and use as the SDID, if they match (see above example for more details): + # sdids = ["foo@123", "bar@456"] + ### + + ## Default severity value. Severity and Facility are used to calculate the message PRI value (RFC5424#section-6.2.1) + ## Used when no metric field with key "severity_code" is defined. + ## If unset, 5 (notice) is the default + # default_severity_code = 5 + + ## Default facility value. Facility and Severity are used to calculate the message PRI value (RFC5424#section-6.2.1) + ## Used when no metric field with key "facility_code" is defined. + ## If unset, 1 (user-level) is the default + # default_facility_code = 1 + + ## Default APP-NAME value (RFC5424#section-6.2.5) + ## Used when no metric tag with key "appname" is defined. + ## If unset, "Telegraf" is the default + # default_appname = "Telegraf" +``` + +### Metric mapping +The output plugin expects syslog metrics tags and fields to match up with the ones created in the [syslog input plugin](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/syslog#metrics). + +The following table shows the metric tags, field and defaults used to format syslog messages. + +| Syslog field | Metric Tag | Metric Field | Default value | +| --- | --- | --- | --- | +| APP-NAME | appname | - | default_appname = "Telegraf" | +| TIMESTAMP | - | timestamp | Metric's own timestamp | +| VERSION | - | version | 1 | +| PRI | - | serverity_code + (8 * facility_code)| default_severity_code=5 (notice), default_facility_code=1 (user-level)| +| HOSTNAME | hostname OR source OR host | - | os.Hostname() | +| MSGID | - | msgid | Metric name | +| PROCID | - | procid | - | +| MSG | - | msg | - | \ No newline at end of file diff --git a/plugins/outputs/syslog/syslog.go b/plugins/outputs/syslog/syslog.go new file mode 100644 index 000000000..684806b85 --- /dev/null +++ b/plugins/outputs/syslog/syslog.go @@ -0,0 +1,245 @@ +package syslog + +import ( + "crypto/tls" + "fmt" + "log" + "net" + "strconv" + "strings" + + "github.com/influxdata/go-syslog/nontransparent" + "github.com/influxdata/go-syslog/rfc5424" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + framing "github.com/influxdata/telegraf/internal/syslog" + tlsint "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/outputs" +) + +type Syslog struct { + Address string + KeepAlivePeriod *internal.Duration + DefaultSdid string + DefaultSeverityCode uint8 + DefaultFacilityCode uint8 + DefaultAppname string + Sdids []string + Separator string `toml:"sdparam_separator"` + Framing framing.Framing + Trailer nontransparent.TrailerType + net.Conn + tlsint.ClientConfig + mapper *SyslogMapper +} + +var sampleConfig = ` + ## URL to connect to + ## ex: address = "tcp://127.0.0.1:8094" + ## ex: address = "tcp4://127.0.0.1:8094" + ## ex: address = "tcp6://127.0.0.1:8094" + ## ex: address = "tcp6://[2001:db8::1]:8094" + ## ex: address = "udp://127.0.0.1:8094" + ## ex: address = "udp4://127.0.0.1:8094" + ## ex: address = "udp6://127.0.0.1:8094" + address = "tcp://127.0.0.1:8094" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Period between keep alive probes. + ## Only applies to TCP sockets. + ## 0 disables keep alive probes. + ## Defaults to the OS configuration. + # keep_alive_period = "5m" + + ## The framing technique with which it is expected that messages are transported (default = "octet-counting"). + ## Whether the messages come using the octect-counting (RFC5425#section-4.3.1, RFC6587#section-3.4.1), + ## or the non-transparent framing technique (RFC6587#section-3.4.2). + ## Must be one of "octect-counting", "non-transparent". + # framing = "octet-counting" + + ## The trailer to be expected in case of non-trasparent framing (default = "LF"). + ## Must be one of "LF", or "NUL". + # trailer = "LF" + + ### SD-PARAMs settings + ### A syslog message can contain multiple parameters and multiple identifiers within structured data section + ### A syslog message can contain multiple structured data sections. + ### For each unrecognised metric tag/field a SD-PARAMS can be created. + ### Example + ### Configuration => + ### sdparam_separator = "_" + ### default_sdid = "default@32473" + ### sdids = ["foo@123", "bar@456"] + ### input => xyzzy,x=y foo@123_value=42,bar@456_value2=84,something_else=1 + ### output (structured data only) => [foo@123 value=42][bar@456 value2=84][default@32473 something_else=1 x=y] + + ## SD-PARAMs separator between the sdid and tag/field key (default = "_") + # sdparam_separator = "_" + + ## Default sdid used for tags/fields that don't contain a prefix defined in the explict sdids setting below + ## If no default is specified, no SD-PARAMs will be used for unrecognised field. + # default_sdid = "default@32473" + + ##List of explicit prefixes to extract from tag/field keys and use as the SDID, if they match (see above example for more details): + # sdids = ["foo@123", "bar@456"] + ### + + ## Default severity value. Severity and Facility are used to calculate the message PRI value (RFC5424#section-6.2.1) + ## Used when no metric field with key "severity_code" is defined. + ## If unset, 5 (notice) is the default + # default_severity_code = 5 + + ## Default facility value. Facility and Severity are used to calculate the message PRI value (RFC5424#section-6.2.1) + ## Used when no metric field with key "facility_code" is defined. + ## If unset, 1 (user-level) is the default + # default_facility_code = 1 + + ## Default APP-NAME value (RFC5424#section-6.2.5) + ## Used when no metric tag with key "appname" is defined. + ## If unset, "Telegraf" is the default + # default_appname = "Telegraf" +` + +func (s *Syslog) Connect() error { + s.initializeSyslogMapper() + + spl := strings.SplitN(s.Address, "://", 2) + if len(spl) != 2 { + return fmt.Errorf("invalid address: %s", s.Address) + } + + tlsCfg, err := s.ClientConfig.TLSConfig() + if err != nil { + return err + } + + var c net.Conn + if tlsCfg == nil { + c, err = net.Dial(spl[0], spl[1]) + } else { + c, err = tls.Dial(spl[0], spl[1], tlsCfg) + } + if err != nil { + return err + } + + if err := s.setKeepAlive(c); err != nil { + log.Printf("unable to configure keep alive (%s): %s", s.Address, err) + } + + s.Conn = c + return nil +} + +func (s *Syslog) setKeepAlive(c net.Conn) error { + if s.KeepAlivePeriod == nil { + return nil + } + tcpc, ok := c.(*net.TCPConn) + if !ok { + return fmt.Errorf("cannot set keep alive on a %s socket", strings.SplitN(s.Address, "://", 2)[0]) + } + if s.KeepAlivePeriod.Duration == 0 { + return tcpc.SetKeepAlive(false) + } + if err := tcpc.SetKeepAlive(true); err != nil { + return err + } + return tcpc.SetKeepAlivePeriod(s.KeepAlivePeriod.Duration) +} + +func (s *Syslog) Close() error { + if s.Conn == nil { + return nil + } + err := s.Conn.Close() + s.Conn = nil + return err +} + +func (s *Syslog) SampleConfig() string { + return sampleConfig +} + +func (s *Syslog) Description() string { + return "Configuration for Syslog server to send metrics to" +} + +func (s *Syslog) Write(metrics []telegraf.Metric) (err error) { + if s.Conn == nil { + // previous write failed with permanent error and socket was closed. + if err = s.Connect(); err != nil { + return err + } + } + for _, metric := range metrics { + var msg *rfc5424.SyslogMessage + if msg, err = s.mapper.MapMetricToSyslogMessage(metric); err != nil { + log.Printf("E! [outputs.syslog] Failed to create syslog message: %v", err) + continue + } + var msgBytesWithFraming []byte + if msgBytesWithFraming, err = s.getSyslogMessageBytesWithFraming(msg); err != nil { + log.Printf("E! [outputs.syslog] Failed to convert syslog message with framing: %v", err) + continue + } + if _, err = s.Conn.Write(msgBytesWithFraming); err != nil { + if netErr, ok := err.(net.Error); !ok || !netErr.Temporary() { + s.Close() + s.Conn = nil + return fmt.Errorf("closing connection: %v", netErr) + } + return err + } + } + return nil +} + +func (s *Syslog) getSyslogMessageBytesWithFraming(msg *rfc5424.SyslogMessage) ([]byte, error) { + var msgString string + var err error + if msgString, err = msg.String(); err != nil { + return nil, err + } + msgBytes := []byte(msgString) + + if s.Framing == framing.OctetCounting { + return append([]byte(strconv.Itoa(len(msgBytes))+" "), msgBytes...), nil + } + // Non-transparent framing + return append(msgBytes, byte(s.Trailer)), nil +} + +func (s *Syslog) initializeSyslogMapper() { + if s.mapper != nil { + return + } + s.mapper = newSyslogMapper() + s.mapper.DefaultFacilityCode = s.DefaultFacilityCode + s.mapper.DefaultSeverityCode = s.DefaultSeverityCode + s.mapper.DefaultAppname = s.DefaultAppname + s.mapper.Separator = s.Separator + s.mapper.DefaultSdid = s.DefaultSdid + s.mapper.Sdids = s.Sdids +} + +func newSyslog() *Syslog { + return &Syslog{ + Framing: framing.OctetCounting, + Trailer: nontransparent.LF, + Separator: "_", + DefaultSeverityCode: uint8(5), // notice + DefaultFacilityCode: uint8(1), // user-level + DefaultAppname: "Telegraf", + } +} + +func init() { + outputs.Add("syslog", func() telegraf.Output { return newSyslog() }) +} diff --git a/plugins/outputs/syslog/syslog_mapper.go b/plugins/outputs/syslog/syslog_mapper.go new file mode 100644 index 000000000..ba6b0d660 --- /dev/null +++ b/plugins/outputs/syslog/syslog_mapper.go @@ -0,0 +1,199 @@ +package syslog + +import ( + "errors" + "math" + "os" + "strconv" + "strings" + "time" + + "github.com/influxdata/go-syslog/rfc5424" + "github.com/influxdata/telegraf" +) + +type SyslogMapper struct { + DefaultSdid string + DefaultSeverityCode uint8 + DefaultFacilityCode uint8 + DefaultAppname string + Sdids []string + Separator string + reservedKeys map[string]bool +} + +// MapMetricToSyslogMessage maps metrics tags/fields to syslog messages +func (sm *SyslogMapper) MapMetricToSyslogMessage(metric telegraf.Metric) (*rfc5424.SyslogMessage, error) { + msg := &rfc5424.SyslogMessage{} + + sm.mapPriority(metric, msg) + sm.mapStructuredData(metric, msg) + sm.mapAppname(metric, msg) + mapHostname(metric, msg) + mapTimestamp(metric, msg) + mapMsgID(metric, msg) + mapVersion(metric, msg) + mapProcID(metric, msg) + mapMsg(metric, msg) + + if !msg.Valid() { + return nil, errors.New("metric could not produce valid syslog message") + } + return msg, nil +} + +func (sm *SyslogMapper) mapStructuredData(metric telegraf.Metric, msg *rfc5424.SyslogMessage) { + for _, tag := range metric.TagList() { + sm.mapStructuredDataItem(tag.Key, tag.Value, msg) + } + for _, field := range metric.FieldList() { + sm.mapStructuredDataItem(field.Key, formatValue(field.Value), msg) + } +} + +func (sm *SyslogMapper) mapStructuredDataItem(key string, value string, msg *rfc5424.SyslogMessage) { + if sm.reservedKeys[key] { + return + } + isExplicitSdid := false + for _, sdid := range sm.Sdids { + k := strings.TrimLeft(key, sdid+sm.Separator) + if len(key) > len(k) { + isExplicitSdid = true + msg.SetParameter(sdid, k, value) + break + } + } + if !isExplicitSdid && len(sm.DefaultSdid) > 0 { + k := strings.TrimPrefix(key, sm.DefaultSdid+sm.Separator) + msg.SetParameter(sm.DefaultSdid, k, value) + } +} + +func (sm *SyslogMapper) mapAppname(metric telegraf.Metric, msg *rfc5424.SyslogMessage) { + if value, ok := metric.GetTag("appname"); ok { + msg.SetAppname(formatValue(value)) + } else { + //Use default appname + msg.SetAppname(sm.DefaultAppname) + } +} + +func mapMsgID(metric telegraf.Metric, msg *rfc5424.SyslogMessage) { + if value, ok := metric.GetField("msgid"); ok { + msg.SetMsgID(formatValue(value)) + } else { + // We default to metric name + msg.SetMsgID(metric.Name()) + } +} + +func mapVersion(metric telegraf.Metric, msg *rfc5424.SyslogMessage) { + if value, ok := metric.GetField("version"); ok { + switch v := value.(type) { + case uint64: + msg.SetVersion(uint16(v)) + return + } + } + msg.SetVersion(1) +} + +func mapMsg(metric telegraf.Metric, msg *rfc5424.SyslogMessage) { + if value, ok := metric.GetField("msg"); ok { + msg.SetMessage(formatValue(value)) + } +} + +func mapProcID(metric telegraf.Metric, msg *rfc5424.SyslogMessage) { + if value, ok := metric.GetField("procid"); ok { + msg.SetProcID(formatValue(value)) + } +} + +func (sm *SyslogMapper) mapPriority(metric telegraf.Metric, msg *rfc5424.SyslogMessage) { + severityCode := sm.DefaultSeverityCode + facilityCode := sm.DefaultFacilityCode + + if value, ok := getFieldCode(metric, "severity_code"); ok { + severityCode = *value + } + + if value, ok := getFieldCode(metric, "facility_code"); ok { + facilityCode = *value + } + + priority := (8 * facilityCode) + severityCode + msg.SetPriority(priority) +} + +func mapHostname(metric telegraf.Metric, msg *rfc5424.SyslogMessage) { + // Try with hostname, then with source, then with host tags, then take OS Hostname + if value, ok := metric.GetTag("hostname"); ok { + msg.SetHostname(formatValue(value)) + } else if value, ok := metric.GetTag("source"); ok { + msg.SetHostname(formatValue(value)) + } else if value, ok := metric.GetTag("host"); ok { + msg.SetHostname(formatValue(value)) + } else if value, err := os.Hostname(); err == nil { + msg.SetHostname(value) + } +} + +func mapTimestamp(metric telegraf.Metric, msg *rfc5424.SyslogMessage) { + timestamp := metric.Time() + if value, ok := metric.GetField("timestamp"); ok { + switch v := value.(type) { + case int64: + timestamp = time.Unix(0, v).UTC() + } + } + msg.SetTimestamp(timestamp.Format(time.RFC3339)) +} + +func formatValue(value interface{}) string { + switch v := value.(type) { + case string: + return v + case bool: + if v { + return "1" + } + return "0" + case uint64: + return strconv.FormatUint(v, 10) + case int64: + return strconv.FormatInt(v, 10) + case float64: + if math.IsNaN(v) { + return "" + } + + if math.IsInf(v, 0) { + return "" + } + return strconv.FormatFloat(v, 'f', -1, 64) + } + + return "" +} + +func getFieldCode(metric telegraf.Metric, fieldKey string) (*uint8, bool) { + if value, ok := metric.GetField(fieldKey); ok { + if v, err := strconv.ParseUint(formatValue(value), 10, 8); err == nil { + r := uint8(v) + return &r, true + } + } + return nil, false +} + +func newSyslogMapper() *SyslogMapper { + return &SyslogMapper{ + reservedKeys: map[string]bool{ + "version": true, "severity_code": true, "facility_code": true, + "procid": true, "msgid": true, "msg": true, "timestamp": true, "sdid": true, + "hostname": true, "source": true, "host": true, "severity": true, + "facility": true, "appname": true}, + } +} diff --git a/plugins/outputs/syslog/syslog_mapper_test.go b/plugins/outputs/syslog/syslog_mapper_test.go new file mode 100644 index 000000000..300d5fcab --- /dev/null +++ b/plugins/outputs/syslog/syslog_mapper_test.go @@ -0,0 +1,200 @@ +package syslog + +import ( + "os" + "testing" + "time" + + "github.com/influxdata/telegraf/metric" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSyslogMapperWithDefaults(t *testing.T) { + s := newSyslog() + s.initializeSyslogMapper() + + // Init metrics + m1, _ := metric.New( + "testmetric", + map[string]string{}, + map[string]interface{}{}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + hostname, err := os.Hostname() + assert.NoError(t, err) + syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) + require.NoError(t, err) + str, _ := syslogMessage.String() + assert.Equal(t, "<13>1 2010-11-10T23:00:00Z "+hostname+" Telegraf - testmetric -", str, "Wrong syslog message") +} + +func TestSyslogMapperWithHostname(t *testing.T) { + s := newSyslog() + s.initializeSyslogMapper() + + // Init metrics + m1, _ := metric.New( + "testmetric", + map[string]string{ + "hostname": "testhost", + "source": "sourcevalue", + "host": "hostvalue", + }, + map[string]interface{}{}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) + require.NoError(t, err) + str, _ := syslogMessage.String() + assert.Equal(t, "<13>1 2010-11-10T23:00:00Z testhost Telegraf - testmetric -", str, "Wrong syslog message") +} +func TestSyslogMapperWithHostnameSourceFallback(t *testing.T) { + s := newSyslog() + s.initializeSyslogMapper() + + // Init metrics + m1, _ := metric.New( + "testmetric", + map[string]string{ + "source": "sourcevalue", + "host": "hostvalue", + }, + map[string]interface{}{}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) + require.NoError(t, err) + str, _ := syslogMessage.String() + assert.Equal(t, "<13>1 2010-11-10T23:00:00Z sourcevalue Telegraf - testmetric -", str, "Wrong syslog message") +} + +func TestSyslogMapperWithHostnameHostFallback(t *testing.T) { + s := newSyslog() + s.initializeSyslogMapper() + + // Init metrics + m1, _ := metric.New( + "testmetric", + map[string]string{ + "host": "hostvalue", + }, + map[string]interface{}{}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) + require.NoError(t, err) + str, _ := syslogMessage.String() + assert.Equal(t, "<13>1 2010-11-10T23:00:00Z hostvalue Telegraf - testmetric -", str, "Wrong syslog message") +} + +func TestSyslogMapperWithDefaultSdid(t *testing.T) { + s := newSyslog() + s.DefaultSdid = "default@32473" + s.initializeSyslogMapper() + + // Init metrics + m1, _ := metric.New( + "testmetric", + map[string]string{ + "appname": "testapp", + "hostname": "testhost", + "tag1": "bar", + "default@32473_tag2": "foobar", + }, + map[string]interface{}{ + "severity_code": uint64(3), + "facility_code": uint64(3), + "msg": "Test message", + "procid": uint64(25), + "version": uint16(2), + "msgid": int64(555), + "timestamp": time.Date(2010, time.November, 10, 23, 30, 0, 0, time.UTC).UnixNano(), + "value1": int64(2), + "default@32473_value2": "foo", + "value3": float64(1.2), + }, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) + require.NoError(t, err) + str, _ := syslogMessage.String() + assert.Equal(t, "<27>2 2010-11-10T23:30:00Z testhost testapp 25 555 [default@32473 tag1=\"bar\" tag2=\"foobar\" value1=\"2\" value2=\"foo\" value3=\"1.2\"] Test message", str, "Wrong syslog message") +} + +func TestSyslogMapperWithDefaultSdidAndOtherSdids(t *testing.T) { + s := newSyslog() + s.DefaultSdid = "default@32473" + s.Sdids = []string{"bar@123", "foo@456"} + s.initializeSyslogMapper() + + // Init metrics + m1, _ := metric.New( + "testmetric", + map[string]string{ + "appname": "testapp", + "hostname": "testhost", + "tag1": "bar", + "default@32473_tag2": "foobar", + "bar@123_tag3": "barfoobar", + }, + map[string]interface{}{ + "severity_code": uint64(1), + "facility_code": uint64(3), + "msg": "Test message", + "procid": uint64(25), + "version": uint16(2), + "msgid": int64(555), + "timestamp": time.Date(2010, time.November, 10, 23, 30, 0, 0, time.UTC).UnixNano(), + "value1": int64(2), + "default@32473_value2": "default", + "bar@123_value3": int64(2), + "foo@456_value4": "foo", + }, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) + require.NoError(t, err) + str, _ := syslogMessage.String() + assert.Equal(t, "<25>2 2010-11-10T23:30:00Z testhost testapp 25 555 [bar@123 tag3=\"barfoobar\" value3=\"2\"][default@32473 tag1=\"bar\" tag2=\"foobar\" value1=\"2\" value2=\"default\"][foo@456 value4=\"foo\"] Test message", str, "Wrong syslog message") +} + +func TestSyslogMapperWithNoSdids(t *testing.T) { + // Init mapper + s := newSyslog() + s.initializeSyslogMapper() + + // Init metrics + m1, _ := metric.New( + "testmetric", + map[string]string{ + "appname": "testapp", + "hostname": "testhost", + "tag1": "bar", + "default@32473_tag2": "foobar", + "bar@123_tag3": "barfoobar", + "foo@456_tag4": "foobarfoo", + }, + map[string]interface{}{ + "severity_code": uint64(2), + "facility_code": uint64(3), + "msg": "Test message", + "procid": uint64(25), + "version": uint16(2), + "msgid": int64(555), + "timestamp": time.Date(2010, time.November, 10, 23, 30, 0, 0, time.UTC).UnixNano(), + "value1": int64(2), + "default@32473_value2": "default", + "bar@123_value3": int64(2), + "foo@456_value4": "foo", + }, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) + require.NoError(t, err) + str, _ := syslogMessage.String() + assert.Equal(t, "<26>2 2010-11-10T23:30:00Z testhost testapp 25 555 - Test message", str, "Wrong syslog message") +} diff --git a/plugins/outputs/syslog/syslog_test.go b/plugins/outputs/syslog/syslog_test.go new file mode 100644 index 000000000..7581a7b53 --- /dev/null +++ b/plugins/outputs/syslog/syslog_test.go @@ -0,0 +1,205 @@ +package syslog + +import ( + "net" + "sync" + "testing" + "time" + + "github.com/influxdata/telegraf" + framing "github.com/influxdata/telegraf/internal/syslog" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGetSyslogMessageWithFramingOctectCounting(t *testing.T) { + // Init plugin + s := newSyslog() + s.initializeSyslogMapper() + + // Init metrics + m1, _ := metric.New( + "testmetric", + map[string]string{ + "hostname": "testhost", + }, + map[string]interface{}{}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) + require.NoError(t, err) + messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage) + require.NoError(t, err) + + assert.Equal(t, "59 <13>1 2010-11-10T23:00:00Z testhost Telegraf - testmetric -", string(messageBytesWithFraming), "Incorrect Octect counting framing") +} + +func TestGetSyslogMessageWithFramingNonTransparent(t *testing.T) { + // Init plugin + s := newSyslog() + s.initializeSyslogMapper() + s.Framing = framing.NonTransparent + + // Init metrics + m1, _ := metric.New( + "testmetric", + map[string]string{ + "hostname": "testhost", + }, + map[string]interface{}{}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) + require.NoError(t, err) + messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage) + require.NoError(t, err) + + assert.Equal(t, "<13>1 2010-11-10T23:00:00Z testhost Telegraf - testmetric -\x00", string(messageBytesWithFraming), "Incorrect Octect counting framing") +} + +func TestSyslogWriteWithTcp(t *testing.T) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + s := newSyslog() + s.Address = "tcp://" + listener.Addr().String() + + err = s.Connect() + require.NoError(t, err) + + lconn, err := listener.Accept() + require.NoError(t, err) + + testSyslogWriteWithStream(t, s, lconn) +} + +func TestSyslogWriteWithUdp(t *testing.T) { + listener, err := net.ListenPacket("udp", "127.0.0.1:0") + require.NoError(t, err) + + s := newSyslog() + s.Address = "udp://" + listener.LocalAddr().String() + + err = s.Connect() + require.NoError(t, err) + + testSyslogWriteWithPacket(t, s, listener) +} + +func testSyslogWriteWithStream(t *testing.T, s *Syslog, lconn net.Conn) { + metrics := []telegraf.Metric{} + m1, _ := metric.New( + "testmetric", + map[string]string{}, + map[string]interface{}{}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)) + + metrics = append(metrics, m1) + syslogMessage, err := s.mapper.MapMetricToSyslogMessage(metrics[0]) + require.NoError(t, err) + messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage) + require.NoError(t, err) + + err = s.Write(metrics) + require.NoError(t, err) + + buf := make([]byte, 256) + n, err := lconn.Read(buf) + require.NoError(t, err) + assert.Equal(t, string(messageBytesWithFraming), string(buf[:n])) +} + +func testSyslogWriteWithPacket(t *testing.T, s *Syslog, lconn net.PacketConn) { + s.Framing = framing.NonTransparent + metrics := []telegraf.Metric{} + m1, _ := metric.New( + "testmetric", + map[string]string{}, + map[string]interface{}{}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)) + + metrics = append(metrics, m1) + syslogMessage, err := s.mapper.MapMetricToSyslogMessage(metrics[0]) + require.NoError(t, err) + messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage) + require.NoError(t, err) + + err = s.Write(metrics) + require.NoError(t, err) + + buf := make([]byte, 256) + n, _, err := lconn.ReadFrom(buf) + require.NoError(t, err) + assert.Equal(t, string(messageBytesWithFraming), string(buf[:n])) +} + +func TestSyslogWriteErr(t *testing.T) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + s := newSyslog() + s.Address = "tcp://" + listener.Addr().String() + + err = s.Connect() + require.NoError(t, err) + s.Conn.(*net.TCPConn).SetReadBuffer(256) + + lconn, err := listener.Accept() + require.NoError(t, err) + lconn.(*net.TCPConn).SetWriteBuffer(256) + + metrics := []telegraf.Metric{testutil.TestMetric(1, "testerr")} + + // close the socket to generate an error + lconn.Close() + s.Conn.Close() + err = s.Write(metrics) + require.Error(t, err) + assert.Nil(t, s.Conn) +} + +func TestSyslogWriteReconnect(t *testing.T) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + s := newSyslog() + s.Address = "tcp://" + listener.Addr().String() + + err = s.Connect() + require.NoError(t, err) + s.Conn.(*net.TCPConn).SetReadBuffer(256) + + lconn, err := listener.Accept() + require.NoError(t, err) + lconn.(*net.TCPConn).SetWriteBuffer(256) + lconn.Close() + s.Conn = nil + + wg := sync.WaitGroup{} + wg.Add(1) + var lerr error + go func() { + lconn, lerr = listener.Accept() + wg.Done() + }() + + metrics := []telegraf.Metric{testutil.TestMetric(1, "testerr")} + err = s.Write(metrics) + require.NoError(t, err) + + wg.Wait() + assert.NoError(t, lerr) + + syslogMessage, err := s.mapper.MapMetricToSyslogMessage(metrics[0]) + require.NoError(t, err) + messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage) + require.NoError(t, err) + buf := make([]byte, 256) + n, err := lconn.Read(buf) + require.NoError(t, err) + assert.Equal(t, string(messageBytesWithFraming), string(buf[:n])) +}