diff --git a/plugins/outputs/influxdb/http.go b/plugins/outputs/influxdb/http.go index 7d26ddeb5..b30a8206d 100644 --- a/plugins/outputs/influxdb/http.go +++ b/plugins/outputs/influxdb/http.go @@ -255,6 +255,9 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error } if c.config.ExcludeDatabaseTag { + // Avoid modifying the metric in case we need to retry the request. + metric = metric.Copy() + metric.Accept() metric.RemoveTag(c.config.DatabaseTag) } diff --git a/plugins/outputs/influxdb/http_test.go b/plugins/outputs/influxdb/http_test.go index e4acb1641..a09b02d43 100644 --- a/plugins/outputs/influxdb/http_test.go +++ b/plugins/outputs/influxdb/http_test.go @@ -675,3 +675,61 @@ func TestHTTP_UnixSocket(t *testing.T) { }) } } + +func TestHTTP_WriteDatabaseTagWorksOnRetry(t *testing.T) { + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/write": + r.ParseForm() + require.Equal(t, r.Form["db"], []string{"foo"}) + + body, err := ioutil.ReadAll(r.Body) + require.NoError(t, err) + require.Contains(t, string(body), "cpu value=42") + + w.WriteHeader(http.StatusNoContent) + return + default: + w.WriteHeader(http.StatusNotFound) + return + } + }), + ) + defer ts.Close() + + addr := &url.URL{ + Scheme: "http", + Host: ts.Listener.Addr().String(), + } + + config := influxdb.HTTPConfig{ + URL: addr, + Database: "telegraf", + DatabaseTag: "database", + ExcludeDatabaseTag: true, + Log: testutil.Logger{}, + } + + client, err := influxdb.NewHTTPClient(config) + require.NoError(t, err) + + metrics := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "database": "foo", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + } + + ctx := context.Background() + err = client.Write(ctx, metrics) + require.NoError(t, err) + err = client.Write(ctx, metrics) + require.NoError(t, err) +} diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go index fbfdf6958..b8706c9a5 100644 --- a/plugins/outputs/influxdb_v2/http.go +++ b/plugins/outputs/influxdb_v2/http.go @@ -189,6 +189,9 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error } if c.ExcludeBucketTag { + // Avoid modifying the metric in case we need to retry the request. + metric = metric.Copy() + metric.Accept() metric.RemoveTag(c.BucketTag) } diff --git a/plugins/outputs/influxdb_v2/http_test.go b/plugins/outputs/influxdb_v2/http_test.go index 33ff9e24b..23c3ff05e 100644 --- a/plugins/outputs/influxdb_v2/http_test.go +++ b/plugins/outputs/influxdb_v2/http_test.go @@ -1,10 +1,17 @@ package influxdb_v2_test import ( + "context" + "io/ioutil" + "net/http" + "net/http/httptest" "net/url" "testing" + "time" + "github.com/influxdata/telegraf" influxdb "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -47,3 +54,60 @@ func TestNewHTTPClient(t *testing.T) { } } } + +func TestWriteBucketTagWorksOnRetry(t *testing.T) { + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/v2/write": + r.ParseForm() + require.Equal(t, r.Form["bucket"], []string{"foo"}) + + body, err := ioutil.ReadAll(r.Body) + require.NoError(t, err) + require.Contains(t, string(body), "cpu value=42") + + w.WriteHeader(http.StatusNoContent) + return + default: + w.WriteHeader(http.StatusNotFound) + return + } + }), + ) + defer ts.Close() + + addr := &url.URL{ + Scheme: "http", + Host: ts.Listener.Addr().String(), + } + + config := &influxdb.HTTPConfig{ + URL: addr, + Bucket: "telegraf", + BucketTag: "bucket", + ExcludeBucketTag: true, + } + + client, err := influxdb.NewHTTPClient(config) + require.NoError(t, err) + + metrics := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "bucket": "foo", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + } + + ctx := context.Background() + err = client.Write(ctx, metrics) + require.NoError(t, err) + err = client.Write(ctx, metrics) + require.NoError(t, err) +}