Fix length calculation of split metric buffer (#2869)
This commit is contained in:
		
							parent
							
								
									04aa732e94
								
							
						
					
					
						commit
						be03abd464
					
				|  | @ -2,7 +2,6 @@ package influxdb | |||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"log" | ||||
| 	"math/rand" | ||||
| 	"strings" | ||||
|  | @ -167,26 +166,29 @@ func (i *InfluxDB) Description() string { | |||
| 	return "Configuration for influxdb server to send metrics to" | ||||
| } | ||||
| 
 | ||||
| func (i *InfluxDB) getReader(metrics []telegraf.Metric) io.Reader { | ||||
| func (i *InfluxDB) split(metrics []telegraf.Metric) []telegraf.Metric { | ||||
| 	if !i.splitPayload { | ||||
| 		return metric.NewReader(metrics) | ||||
| 		return metrics | ||||
| 	} | ||||
| 
 | ||||
| 	splitData := make([]telegraf.Metric, 0) | ||||
| 	split := make([]telegraf.Metric, 0) | ||||
| 	for _, m := range metrics { | ||||
| 		splitData = append(splitData, m.Split(i.UDPPayload)...) | ||||
| 		split = append(split, m.Split(i.UDPPayload)...) | ||||
| 	} | ||||
| 	return metric.NewReader(splitData) | ||||
| 	return split | ||||
| } | ||||
| 
 | ||||
| // Write will choose a random server in the cluster to write to until a successful write
 | ||||
| // occurs, logging each unsuccessful. If all servers fail, return error.
 | ||||
| func (i *InfluxDB) Write(metrics []telegraf.Metric) error { | ||||
| 	metrics = i.split(metrics) | ||||
| 
 | ||||
| 	bufsize := 0 | ||||
| 	for _, m := range metrics { | ||||
| 		bufsize += m.Len() | ||||
| 	} | ||||
| 	r := i.getReader(metrics) | ||||
| 
 | ||||
| 	r := metric.NewReader(metrics) | ||||
| 
 | ||||
| 	// This will get set to nil if a successful write occurs
 | ||||
| 	err := fmt.Errorf("Could not write to any InfluxDB server in cluster") | ||||
|  |  | |||
|  | @ -2,10 +2,15 @@ package influxdb | |||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"net/http" | ||||
| 	"net/http/httptest" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/influxdata/telegraf" | ||||
| 	"github.com/influxdata/telegraf/metric" | ||||
| 	"github.com/influxdata/telegraf/plugins/outputs/influxdb/client" | ||||
| 	"github.com/influxdata/telegraf/testutil" | ||||
| 
 | ||||
| 	"github.com/stretchr/testify/assert" | ||||
|  | @ -58,6 +63,35 @@ func TestUDPInflux(t *testing.T) { | |||
| 	require.NoError(t, i.Close()) | ||||
| } | ||||
| 
 | ||||
| func TestBasicSplit(t *testing.T) { | ||||
| 	c := &MockClient{} | ||||
| 	i := InfluxDB{ | ||||
| 		clients:      []client.Client{c}, | ||||
| 		UDPPayload:   50, | ||||
| 		splitPayload: true, | ||||
| 	} | ||||
| 
 | ||||
| 	// Input metrics:
 | ||||
| 	// test1,tag1=value1 value1=1 value2=2 1257894000000000000\n
 | ||||
| 	//
 | ||||
| 	// Split metrics:
 | ||||
| 	// test1,tag1=value1 value1=1 1257894000000000000\n
 | ||||
| 	// test1,tag1=value1 value2=2 1257894000000000000\n
 | ||||
| 	m, err := metric.New("test1", | ||||
| 		map[string]string{"tag1": "value1"}, | ||||
| 		map[string]interface{}{"value1": 1.0, "value2": 2.0}, | ||||
| 		time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), | ||||
| 	) | ||||
| 	require.NoError(t, err) | ||||
| 
 | ||||
| 	metrics := []telegraf.Metric{m} | ||||
| 	err = i.Write(metrics) | ||||
| 	require.Equal(t, 1, c.writeStreamCalled) | ||||
| 	require.Equal(t, 94, c.contentLength) | ||||
| 
 | ||||
| 	require.NoError(t, err) | ||||
| } | ||||
| 
 | ||||
| func TestHTTPInflux(t *testing.T) { | ||||
| 	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 		switch r.URL.Path { | ||||
|  | @ -199,3 +233,34 @@ func TestHTTPError_FieldTypeConflict(t *testing.T) { | |||
| 	require.NoError(t, err) | ||||
| 	require.NoError(t, i.Close()) | ||||
| } | ||||
| 
 | ||||
| type MockClient struct { | ||||
| 	writeStreamCalled int | ||||
| 	contentLength     int | ||||
| } | ||||
| 
 | ||||
| func (m *MockClient) Query(command string) error { | ||||
| 	panic("not implemented") | ||||
| } | ||||
| 
 | ||||
| func (m *MockClient) Write(b []byte) (int, error) { | ||||
| 	panic("not implemented") | ||||
| } | ||||
| 
 | ||||
| func (m *MockClient) WriteWithParams(b []byte, params client.WriteParams) (int, error) { | ||||
| 	panic("not implemented") | ||||
| } | ||||
| 
 | ||||
| func (m *MockClient) WriteStream(b io.Reader, contentLength int) (int, error) { | ||||
| 	m.writeStreamCalled++ | ||||
| 	m.contentLength = contentLength | ||||
| 	return 0, nil | ||||
| } | ||||
| 
 | ||||
| func (m *MockClient) WriteStreamWithParams(b io.Reader, contentLength int, params client.WriteParams) (int, error) { | ||||
| 	panic("not implemented") | ||||
| } | ||||
| 
 | ||||
| func (m *MockClient) Close() error { | ||||
| 	panic("not implemented") | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue