Don't read/check msg length
This commit is contained in:
parent
4ea618ea26
commit
6f4bd9ad82
|
@ -9,7 +9,6 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"strconv"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -281,46 +280,7 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) {
|
||||||
conn.Close()
|
conn.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
zero := time.Time{}
|
|
||||||
for {
|
for {
|
||||||
tmpLen := make([]byte, 1)
|
|
||||||
length := make([]byte, 0)
|
|
||||||
|
|
||||||
// gather length from beginning of stream
|
|
||||||
for {
|
|
||||||
if s.ReadTimeout != nil && s.ReadTimeout.Duration > 0 {
|
|
||||||
conn.SetReadDeadline(time.Now().Add(s.ReadTimeout.Duration))
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := conn.Read(tmpLen)
|
|
||||||
if err != nil {
|
|
||||||
// read timeout reached, retry
|
|
||||||
if er, ok := err.(net.Error); ok && er.Timeout() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// client has closed connection, return
|
|
||||||
if err == io.EOF {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// other error, log and return
|
|
||||||
s.store(rfc5425.Result{Error: fmt.Errorf("failed reading from syslog client - %s", err.Error())}, acc)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// if space found start parsing
|
|
||||||
if tmpLen[0] == byte(32) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// if byte read is not a number, error
|
|
||||||
if !(tmpLen[0] >= 48 && tmpLen[0] <= 57) {
|
|
||||||
s.store(rfc5425.Result{Error: fmt.Errorf("bad format received")}, acc)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
length = append(length, tmpLen[0])
|
|
||||||
}
|
|
||||||
|
|
||||||
data := &bytes.Buffer{}
|
data := &bytes.Buffer{}
|
||||||
|
|
||||||
// read the data
|
// read the data
|
||||||
|
@ -332,6 +292,9 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// read timeout reached, parse what we have
|
// read timeout reached, parse what we have
|
||||||
if er, ok := err.(net.Error); ok && er.Timeout() {
|
if er, ok := err.(net.Error); ok && er.Timeout() {
|
||||||
|
if n == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
goto parseMsg
|
goto parseMsg
|
||||||
}
|
}
|
||||||
// client has closed connection, return
|
// client has closed connection, return
|
||||||
|
@ -345,30 +308,17 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) {
|
||||||
s.store(rfc5425.Result{Error: fmt.Errorf("failed reading from syslog client - %s", err.Error())}, acc)
|
s.store(rfc5425.Result{Error: fmt.Errorf("failed reading from syslog client - %s", err.Error())}, acc)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// handle client disconnect
|
||||||
parseMsg:
|
if n == 0 {
|
||||||
x, err := strconv.Atoi(string(length))
|
|
||||||
if err != nil {
|
|
||||||
s.store(rfc5425.Result{Error: fmt.Errorf("bad length received - %s", err.Error())}, acc)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if n < int64(x) {
|
parseMsg:
|
||||||
s.store(rfc5425.Result{Error: fmt.Errorf("failed reading all of msg; got %d, expected %d", n, x)}, acc)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
conn.SetReadDeadline(zero)
|
|
||||||
|
|
||||||
// reconstruct syslog payload for parser
|
|
||||||
length = append(length, byte(32))
|
|
||||||
tdata := append(length, data.Bytes()...)
|
|
||||||
|
|
||||||
var p *rfc5425.Parser
|
var p *rfc5425.Parser
|
||||||
if s.BestEffort {
|
if s.BestEffort {
|
||||||
p = rfc5425.NewParser(bytes.NewReader(tdata), rfc5425.WithBestEffort())
|
p = rfc5425.NewParser(data, rfc5425.WithBestEffort())
|
||||||
} else {
|
} else {
|
||||||
p = rfc5425.NewParser(bytes.NewReader(tdata))
|
p = rfc5425.NewParser(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.ParseExecuting(func(r *rfc5425.Result) {
|
p.ParseExecuting(func(r *rfc5425.Result) {
|
||||||
|
|
Loading…
Reference in New Issue