Compare commits

...

6 Commits

Author SHA1 Message Date
Greg Linton 6f4bd9ad82 Don't read/check msg length 2018-07-03 15:58:03 -06:00
Greg Linton 4ea618ea26 Enhance syslog reading and ensure proper length is read 2018-07-03 15:05:59 -06:00
Greg Linton a7545e6cac Don't close connection if readTimeout exceeded 2018-07-02 18:56:58 -06:00
Greg Linton 58e815fdd1 Merge branch 'master' into bugfix/4335 2018-07-02 18:13:44 -06:00
Greg Linton 06682c6350 Cleanup syslog messages
Remove leading and trailing spaces and/or newlines from syslog message
field.
2018-07-02 18:11:58 -06:00
Greg Linton 839ca60b0e Improve syslog connection handling
Resolves #4335
2018-07-02 17:49:13 -06:00
1 changed files with 45 additions and 13 deletions

View File

@ -1,6 +1,7 @@
package syslog package syslog
import ( import (
"bytes"
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"io" "io"
@ -279,20 +280,51 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) {
conn.Close() conn.Close()
}() }()
if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 { for {
conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration)) data := &bytes.Buffer{}
}
var p *rfc5425.Parser // read the data
if s.BestEffort { if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 {
p = rfc5425.NewParser(conn, rfc5425.WithBestEffort()) conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration))
} else { }
p = rfc5425.NewParser(conn)
}
p.ParseExecuting(func(r *rfc5425.Result) { n, err := io.Copy(data, conn)
s.store(*r, acc) if err != nil {
}) // read timeout reached, parse what we have
if er, ok := err.(net.Error); ok && er.Timeout() {
if n == 0 {
continue
}
goto parseMsg
}
// client has closed connection, return
if err == io.EOF {
if n > 0 {
goto parseMsg
}
return
}
// other error, log and return
s.store(rfc5425.Result{Error: fmt.Errorf("failed reading from syslog client - %s", err.Error())}, acc)
return
}
// handle client disconnect
if n == 0 {
return
}
parseMsg:
var p *rfc5425.Parser
if s.BestEffort {
p = rfc5425.NewParser(data, rfc5425.WithBestEffort())
} else {
p = rfc5425.NewParser(data)
}
p.ParseExecuting(func(r *rfc5425.Result) {
s.store(*r, acc)
})
}
} }
func (s *Syslog) setKeepAlive(c *net.TCPConn) error { func (s *Syslog) setKeepAlive(c *net.TCPConn) error {
@ -361,7 +393,7 @@ func fields(msg rfc5424.SyslogMessage, s *Syslog) map[string]interface{} {
} }
if msg.Message() != nil { if msg.Message() != nil {
flds["message"] = *msg.Message() flds["message"] = strings.TrimSpace(*msg.Message())
} }
if msg.StructuredData() != nil { if msg.StructuredData() != nil {