Enhance syslog reading and ensure proper length is read
This commit is contained in:
parent
a7545e6cac
commit
4ea618ea26
|
@ -9,6 +9,7 @@ import (
|
|||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -282,19 +283,16 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) {
|
|||
|
||||
zero := time.Time{}
|
||||
for {
|
||||
// make a temporary bytes var to read from the connection
|
||||
tmp := make([]byte, 128)
|
||||
// make 0 length data bytes (since we'll be appending)
|
||||
data := make([]byte, 0)
|
||||
tmpLen := make([]byte, 1)
|
||||
length := make([]byte, 0)
|
||||
|
||||
// loop through the connection stream, appending tmp to data
|
||||
// gather length from beginning of stream
|
||||
for {
|
||||
if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 {
|
||||
conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration))
|
||||
}
|
||||
|
||||
// read to the tmp var
|
||||
n, err := conn.Read(tmp)
|
||||
_, err := conn.Read(tmpLen)
|
||||
if err != nil {
|
||||
// read timeout reached, retry
|
||||
if er, ok := err.(net.Error); ok && er.Timeout() {
|
||||
|
@ -309,22 +307,68 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) {
|
|||
return
|
||||
}
|
||||
|
||||
// append read data to full data
|
||||
data = append(data, tmp[:n]...)
|
||||
|
||||
// break if ends with '\n' (todo: need to ensure writing w/o "\n" works)
|
||||
if tmp[n-1] == '\n' { //|| tmp[n-1] == 'EOF' {
|
||||
// if space found start parsing
|
||||
if tmpLen[0] == byte(32) {
|
||||
break
|
||||
}
|
||||
|
||||
// if byte read is not a number, error
|
||||
if !(tmpLen[0] >= 48 && tmpLen[0] <= 57) {
|
||||
s.store(rfc5425.Result{Error: fmt.Errorf("bad format received")}, acc)
|
||||
return
|
||||
}
|
||||
|
||||
length = append(length, tmpLen[0])
|
||||
}
|
||||
|
||||
data := &bytes.Buffer{}
|
||||
|
||||
// read the data
|
||||
if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 {
|
||||
conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration))
|
||||
}
|
||||
|
||||
n, err := io.Copy(data, conn)
|
||||
if err != nil {
|
||||
// read timeout reached, parse what we have
|
||||
if er, ok := err.(net.Error); ok && er.Timeout() {
|
||||
goto parseMsg
|
||||
}
|
||||
// client has closed connection, return
|
||||
if err == io.EOF {
|
||||
if n > 0 {
|
||||
goto parseMsg
|
||||
}
|
||||
return
|
||||
}
|
||||
// other error, log and return
|
||||
s.store(rfc5425.Result{Error: fmt.Errorf("failed reading from syslog client - %s", err.Error())}, acc)
|
||||
return
|
||||
}
|
||||
|
||||
parseMsg:
|
||||
x, err := strconv.Atoi(string(length))
|
||||
if err != nil {
|
||||
s.store(rfc5425.Result{Error: fmt.Errorf("bad length received - %s", err.Error())}, acc)
|
||||
return
|
||||
}
|
||||
|
||||
if n < int64(x) {
|
||||
s.store(rfc5425.Result{Error: fmt.Errorf("failed reading all of msg; got %d, expected %d", n, x)}, acc)
|
||||
return
|
||||
}
|
||||
|
||||
conn.SetReadDeadline(zero)
|
||||
|
||||
// reconstruct syslog payload for parser
|
||||
length = append(length, byte(32))
|
||||
tdata := append(length, data.Bytes()...)
|
||||
|
||||
var p *rfc5425.Parser
|
||||
if s.BestEffort {
|
||||
p = rfc5425.NewParser(bytes.NewReader(data), rfc5425.WithBestEffort())
|
||||
p = rfc5425.NewParser(bytes.NewReader(tdata), rfc5425.WithBestEffort())
|
||||
} else {
|
||||
p = rfc5425.NewParser(bytes.NewReader(data))
|
||||
p = rfc5425.NewParser(bytes.NewReader(tdata))
|
||||
}
|
||||
|
||||
p.ParseExecuting(func(r *rfc5425.Result) {
|
||||
|
|
Loading…
Reference in New Issue