diff --git a/plugins/inputs/syslog/syslog.go b/plugins/inputs/syslog/syslog.go index a88c06216..daa9b3a46 100644 --- a/plugins/inputs/syslog/syslog.go +++ b/plugins/inputs/syslog/syslog.go @@ -9,6 +9,7 @@ import ( "net/url" "os" "strings" + "strconv" "sync" "time" @@ -282,19 +283,16 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) { zero := time.Time{} for { - // make a temporary bytes var to read from the connection - tmp := make([]byte, 128) - // make 0 length data bytes (since we'll be appending) - data := make([]byte, 0) + tmpLen := make([]byte, 1) + length := make([]byte, 0) - // loop through the connection stream, appending tmp to data + // gather length from beginning of stream for { if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 { conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration)) } - // read to the tmp var - n, err := conn.Read(tmp) + _, err := conn.Read(tmpLen) if err != nil { // read timeout reached, retry if er, ok := err.(net.Error); ok && er.Timeout() { @@ -309,22 +307,68 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) { return } - // append read data to full data - data = append(data, tmp[:n]...) - - // break if ends with '\n' (todo: need to ensure writing w/o "\n" works) - if tmp[n-1] == '\n' { //|| tmp[n-1] == 'EOF' { + // if space found start parsing + if tmpLen[0] == byte(32) { break } + + // if byte read is not a number, error + if !(tmpLen[0] >= 48 && tmpLen[0] <= 57) { + s.store(rfc5425.Result{Error: fmt.Errorf("bad format received")}, acc) + return + } + + length = append(length, tmpLen[0]) + } + + data := &bytes.Buffer{} + + // read the data + if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 { + conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration)) + } + + n, err := io.Copy(data, conn) + if err != nil { + // read timeout reached, parse what we have + if er, ok := err.(net.Error); ok && er.Timeout() { + goto parseMsg + } + // client has closed connection, return + if err == io.EOF { + if n > 0 { + goto parseMsg + } + return + } + // other error, log and return + s.store(rfc5425.Result{Error: fmt.Errorf("failed reading from syslog client - %s", err.Error())}, acc) + return + } + +parseMsg: + x, err := strconv.Atoi(string(length)) + if err != nil { + s.store(rfc5425.Result{Error: fmt.Errorf("bad length received - %s", err.Error())}, acc) + return + } + + if n < int64(x) { + s.store(rfc5425.Result{Error: fmt.Errorf("failed reading all of msg; got %d, expected %d", n, x)}, acc) + return } conn.SetReadDeadline(zero) + // reconstruct syslog payload for parser + length = append(length, byte(32)) + tdata := append(length, data.Bytes()...) + var p *rfc5425.Parser if s.BestEffort { - p = rfc5425.NewParser(bytes.NewReader(data), rfc5425.WithBestEffort()) + p = rfc5425.NewParser(bytes.NewReader(tdata), rfc5425.WithBestEffort()) } else { - p = rfc5425.NewParser(bytes.NewReader(data)) + p = rfc5425.NewParser(bytes.NewReader(tdata)) } p.ParseExecuting(func(r *rfc5425.Result) {