Add newline to influx line-protocol if not present

closes #2297
This commit is contained in:
Cameron Sparr 2017-01-23 13:50:52 -08:00
parent a852e8106e
commit 4d72cd7c9f
4 changed files with 22 additions and 8 deletions

View File

@ -80,8 +80,9 @@ plugins, not just statsd.
- [#1973](https://github.com/influxdata/telegraf/issues/1973): Partial fix: logparser CLF pattern with IPv6 addresses. - [#1973](https://github.com/influxdata/telegraf/issues/1973): Partial fix: logparser CLF pattern with IPv6 addresses.
- [#1975](https://github.com/influxdata/telegraf/issues/1975) & [#2102](https://github.com/influxdata/telegraf/issues/2102): Fix thread-safety when using multiple instances of the statsd input plugin. - [#1975](https://github.com/influxdata/telegraf/issues/1975) & [#2102](https://github.com/influxdata/telegraf/issues/2102): Fix thread-safety when using multiple instances of the statsd input plugin.
- [#2027](https://github.com/influxdata/telegraf/issues/2027): docker input: interface conversion panic fix. - [#2027](https://github.com/influxdata/telegraf/issues/2027): docker input: interface conversion panic fix.
- [#1814](https://github.com/influxdata/telegraf/issues/1814): snmp: ensure proper context is present on error messages - [#1814](https://github.com/influxdata/telegraf/issues/1814): snmp: ensure proper context is present on error messages.
- [#2299](https://github.com/influxdata/telegraf/issues/2299): opentsdb: add tcp:// prefix if no scheme provided. - [#2299](https://github.com/influxdata/telegraf/issues/2299): opentsdb: add tcp:// prefix if no scheme provided.
- [#2297](https://github.com/influxdata/telegraf/issues/2297): influx parser: parse line-protocol without newlines.
## v1.1.2 [2016-12-12] ## v1.1.2 [2016-12-12]

View File

@ -300,9 +300,6 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
} }
func (h *HTTPListener) parse(b []byte, t time.Time) error { func (h *HTTPListener) parse(b []byte, t time.Time) error {
if !bytes.HasSuffix(b, []byte("\n")) {
b = append(b, '\n')
}
metrics, err := h.parser.ParseWithDefaultTime(b, t) metrics, err := h.parser.ParseWithDefaultTime(b, t)
for _, m := range metrics { for _, m := range metrics {

View File

@ -16,6 +16,9 @@ type InfluxParser struct {
} }
func (p *InfluxParser) ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) { func (p *InfluxParser) ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) {
if !bytes.HasSuffix(buf, []byte("\n")) {
buf = append(buf, '\n')
}
// parse even if the buffer begins with a newline // parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n")) buf = bytes.TrimPrefix(buf, []byte("\n"))
metrics, err := metric.ParseWithDefaultTime(buf, t) metrics, err := metric.ParseWithDefaultTime(buf, t)

View File

@ -18,10 +18,11 @@ var (
) )
const ( const (
validInflux = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000\n" validInflux = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000\n"
validInfluxNewline = "\ncpu_load_short,cpu=cpu0 value=10 1257894000000000000\n" validInfluxNewline = "\ncpu_load_short,cpu=cpu0 value=10 1257894000000000000\n"
invalidInflux = "I don't think this is line protocol\n" validInfluxNoNewline = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000"
invalidInflux2 = "{\"a\": 5, \"b\": {\"c\": 6}}\n" invalidInflux = "I don't think this is line protocol\n"
invalidInflux2 = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
) )
const influxMulti = ` const influxMulti = `
@ -69,6 +70,18 @@ func TestParseValidInflux(t *testing.T) {
"cpu": "cpu0", "cpu": "cpu0",
}, metrics[0].Tags()) }, metrics[0].Tags())
assert.Equal(t, exptime, metrics[0].Time().UnixNano()) assert.Equal(t, exptime, metrics[0].Time().UnixNano())
metrics, err = parser.Parse([]byte(validInfluxNoNewline))
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "cpu_load_short", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": float64(10),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{
"cpu": "cpu0",
}, metrics[0].Tags())
assert.Equal(t, exptime, metrics[0].Time().UnixNano())
} }
func TestParseLineValidInflux(t *testing.T) { func TestParseLineValidInflux(t *testing.T) {