From 22340ad98462d2c9092ea93a510db920963d422b Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 23 Jan 2017 13:50:52 -0800 Subject: [PATCH] Add newline to influx line-protocol if not present closes #2297 --- CHANGELOG.md | 3 ++- plugins/inputs/http_listener/http_listener.go | 3 --- plugins/parsers/influx/parser.go | 3 +++ plugins/parsers/influx/parser_test.go | 21 +++++++++++++++---- 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a3c26132a..c9c85953f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -80,8 +80,9 @@ plugins, not just statsd. - [#1973](https://github.com/influxdata/telegraf/issues/1973): Partial fix: logparser CLF pattern with IPv6 addresses. - [#1975](https://github.com/influxdata/telegraf/issues/1975) & [#2102](https://github.com/influxdata/telegraf/issues/2102): Fix thread-safety when using multiple instances of the statsd input plugin. - [#2027](https://github.com/influxdata/telegraf/issues/2027): docker input: interface conversion panic fix. -- [#1814](https://github.com/influxdata/telegraf/issues/1814): snmp: ensure proper context is present on error messages +- [#1814](https://github.com/influxdata/telegraf/issues/1814): snmp: ensure proper context is present on error messages. - [#2299](https://github.com/influxdata/telegraf/issues/2299): opentsdb: add tcp:// prefix if no scheme provided. +- [#2297](https://github.com/influxdata/telegraf/issues/2297): influx parser: parse line-protocol without newlines. ## v1.1.2 [2016-12-12] diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go index 05551a966..0f426f809 100644 --- a/plugins/inputs/http_listener/http_listener.go +++ b/plugins/inputs/http_listener/http_listener.go @@ -300,9 +300,6 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { } func (h *HTTPListener) parse(b []byte, t time.Time) error { - if !bytes.HasSuffix(b, []byte("\n")) { - b = append(b, '\n') - } metrics, err := h.parser.ParseWithDefaultTime(b, t) for _, m := range metrics { diff --git a/plugins/parsers/influx/parser.go b/plugins/parsers/influx/parser.go index f04058552..c15c503f7 100644 --- a/plugins/parsers/influx/parser.go +++ b/plugins/parsers/influx/parser.go @@ -16,6 +16,9 @@ type InfluxParser struct { } func (p *InfluxParser) ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) { + if !bytes.HasSuffix(buf, []byte("\n")) { + buf = append(buf, '\n') + } // parse even if the buffer begins with a newline buf = bytes.TrimPrefix(buf, []byte("\n")) metrics, err := metric.ParseWithDefaultTime(buf, t) diff --git a/plugins/parsers/influx/parser_test.go b/plugins/parsers/influx/parser_test.go index 477cea36e..58531ff90 100644 --- a/plugins/parsers/influx/parser_test.go +++ b/plugins/parsers/influx/parser_test.go @@ -18,10 +18,11 @@ var ( ) const ( - validInflux = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000\n" - validInfluxNewline = "\ncpu_load_short,cpu=cpu0 value=10 1257894000000000000\n" - invalidInflux = "I don't think this is line protocol\n" - invalidInflux2 = "{\"a\": 5, \"b\": {\"c\": 6}}\n" + validInflux = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000\n" + validInfluxNewline = "\ncpu_load_short,cpu=cpu0 value=10 1257894000000000000\n" + validInfluxNoNewline = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000" + invalidInflux = "I don't think this is line protocol\n" + invalidInflux2 = "{\"a\": 5, \"b\": {\"c\": 6}}\n" ) const influxMulti = ` @@ -69,6 +70,18 @@ func TestParseValidInflux(t *testing.T) { "cpu": "cpu0", }, metrics[0].Tags()) assert.Equal(t, exptime, metrics[0].Time().UnixNano()) + + metrics, err = parser.Parse([]byte(validInfluxNoNewline)) + assert.NoError(t, err) + assert.Len(t, metrics, 1) + assert.Equal(t, "cpu_load_short", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "value": float64(10), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{ + "cpu": "cpu0", + }, metrics[0].Tags()) + assert.Equal(t, exptime, metrics[0].Time().UnixNano()) } func TestParseLineValidInflux(t *testing.T) {