From 25d3f06756bcc8c3b986de7b03833f8951a3c700 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 30 May 2017 17:38:32 -0700 Subject: [PATCH] Fix length calculation of split metric buffer (#2869) --- plugins/outputs/influxdb/influxdb.go | 16 +++--- plugins/outputs/influxdb/influxdb_test.go | 65 +++++++++++++++++++++++ 2 files changed, 74 insertions(+), 7 deletions(-) diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 4a0becbbc..df3b34226 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -2,7 +2,6 @@ package influxdb import ( "fmt" - "io" "log" "math/rand" "strings" @@ -167,26 +166,29 @@ func (i *InfluxDB) Description() string { return "Configuration for influxdb server to send metrics to" } -func (i *InfluxDB) getReader(metrics []telegraf.Metric) io.Reader { +func (i *InfluxDB) split(metrics []telegraf.Metric) []telegraf.Metric { if !i.splitPayload { - return metric.NewReader(metrics) + return metrics } - splitData := make([]telegraf.Metric, 0) + split := make([]telegraf.Metric, 0) for _, m := range metrics { - splitData = append(splitData, m.Split(i.UDPPayload)...) + split = append(split, m.Split(i.UDPPayload)...) } - return metric.NewReader(splitData) + return split } // Write will choose a random server in the cluster to write to until a successful write // occurs, logging each unsuccessful. If all servers fail, return error. func (i *InfluxDB) Write(metrics []telegraf.Metric) error { + metrics = i.split(metrics) + bufsize := 0 for _, m := range metrics { bufsize += m.Len() } - r := i.getReader(metrics) + + r := metric.NewReader(metrics) // This will get set to nil if a successful write occurs err := fmt.Errorf("Could not write to any InfluxDB server in cluster") diff --git a/plugins/outputs/influxdb/influxdb_test.go b/plugins/outputs/influxdb/influxdb_test.go index fa0abd460..04019794f 100644 --- a/plugins/outputs/influxdb/influxdb_test.go +++ b/plugins/outputs/influxdb/influxdb_test.go @@ -2,10 +2,15 @@ package influxdb import ( "fmt" + "io" "net/http" "net/http/httptest" "testing" + "time" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/outputs/influxdb/client" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" @@ -58,6 +63,35 @@ func TestUDPInflux(t *testing.T) { require.NoError(t, i.Close()) } +func TestBasicSplit(t *testing.T) { + c := &MockClient{} + i := InfluxDB{ + clients: []client.Client{c}, + UDPPayload: 50, + splitPayload: true, + } + + // Input metrics: + // test1,tag1=value1 value1=1 value2=2 1257894000000000000\n + // + // Split metrics: + // test1,tag1=value1 value1=1 1257894000000000000\n + // test1,tag1=value1 value2=2 1257894000000000000\n + m, err := metric.New("test1", + map[string]string{"tag1": "value1"}, + map[string]interface{}{"value1": 1.0, "value2": 2.0}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + require.NoError(t, err) + + metrics := []telegraf.Metric{m} + err = i.Write(metrics) + require.Equal(t, 1, c.writeStreamCalled) + require.Equal(t, 94, c.contentLength) + + require.NoError(t, err) +} + func TestHTTPInflux(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { @@ -199,3 +233,34 @@ func TestHTTPError_FieldTypeConflict(t *testing.T) { require.NoError(t, err) require.NoError(t, i.Close()) } + +type MockClient struct { + writeStreamCalled int + contentLength int +} + +func (m *MockClient) Query(command string) error { + panic("not implemented") +} + +func (m *MockClient) Write(b []byte) (int, error) { + panic("not implemented") +} + +func (m *MockClient) WriteWithParams(b []byte, params client.WriteParams) (int, error) { + panic("not implemented") +} + +func (m *MockClient) WriteStream(b io.Reader, contentLength int) (int, error) { + m.writeStreamCalled++ + m.contentLength = contentLength + return 0, nil +} + +func (m *MockClient) WriteStreamWithParams(b io.Reader, contentLength int, params client.WriteParams) (int, error) { + panic("not implemented") +} + +func (m *MockClient) Close() error { + panic("not implemented") +}