parent
73e2e6afc5
commit
839ca60b0e
|
@ -1,6 +1,7 @@
|
||||||
package syslog
|
package syslog
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
@ -279,20 +280,53 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) {
|
||||||
conn.Close()
|
conn.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 {
|
zero := time.Time{}
|
||||||
conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration))
|
for {
|
||||||
}
|
// make a temporary bytes var to read from the connection
|
||||||
|
tmp := make([]byte, 128)
|
||||||
|
// make 0 length data bytes (since we'll be appending)
|
||||||
|
data := make([]byte, 0)
|
||||||
|
|
||||||
var p *rfc5425.Parser
|
// loop through the connection stream, appending tmp to data
|
||||||
if s.BestEffort {
|
for {
|
||||||
p = rfc5425.NewParser(conn, rfc5425.WithBestEffort())
|
if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 {
|
||||||
} else {
|
conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration))
|
||||||
p = rfc5425.NewParser(conn)
|
}
|
||||||
}
|
|
||||||
|
|
||||||
p.ParseExecuting(func(r *rfc5425.Result) {
|
// read to the tmp var
|
||||||
s.store(*r, acc)
|
n, err := conn.Read(tmp)
|
||||||
})
|
if err != nil {
|
||||||
|
// Ignore known/recoverable errors. In contrived tests:
|
||||||
|
// * i/o timeout error - no data to Read() before s.ReadTimeout.Duration expired
|
||||||
|
// * EOF error - connection open/close immediately
|
||||||
|
if er, ok := err.(net.Error); err != io.EOF && (ok && !er.Timeout()) {
|
||||||
|
s.store(rfc5425.Result{Error: fmt.Errorf("Failed reading from syslog client - %s", err.Error())}, acc)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// append read data to full data
|
||||||
|
data = append(data, tmp[:n]...)
|
||||||
|
|
||||||
|
// break if ends with '\n' (todo: need to ensure writing w/o "\n" works)
|
||||||
|
if tmp[n-1] == '\n' { //|| tmp[n-1] == 'EOF' {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
conn.SetReadDeadline(zero)
|
||||||
|
|
||||||
|
var p *rfc5425.Parser
|
||||||
|
if s.BestEffort {
|
||||||
|
p = rfc5425.NewParser(bytes.NewReader(data), rfc5425.WithBestEffort())
|
||||||
|
} else {
|
||||||
|
p = rfc5425.NewParser(bytes.NewReader(data))
|
||||||
|
}
|
||||||
|
|
||||||
|
p.ParseExecuting(func(r *rfc5425.Result) {
|
||||||
|
s.store(*r, acc)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Syslog) setKeepAlive(c *net.TCPConn) error {
|
func (s *Syslog) setKeepAlive(c *net.TCPConn) error {
|
||||||
|
|
Loading…
Reference in New Issue