Fix database routing on retry with exclude_database_tag (#6486)
This commit is contained in:
parent
d71c8ed3b9
commit
47fd285b4a
|
@ -255,6 +255,9 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
|
|||
}
|
||||
|
||||
if c.config.ExcludeDatabaseTag {
|
||||
// Avoid modifying the metric in case we need to retry the request.
|
||||
metric = metric.Copy()
|
||||
metric.Accept()
|
||||
metric.RemoveTag(c.config.DatabaseTag)
|
||||
}
|
||||
|
||||
|
|
|
@ -675,3 +675,61 @@ func TestHTTP_UnixSocket(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHTTP_WriteDatabaseTagWorksOnRetry(t *testing.T) {
|
||||
ts := httptest.NewServer(
|
||||
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.URL.Path {
|
||||
case "/write":
|
||||
r.ParseForm()
|
||||
require.Equal(t, r.Form["db"], []string{"foo"})
|
||||
|
||||
body, err := ioutil.ReadAll(r.Body)
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, string(body), "cpu value=42")
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
default:
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
}),
|
||||
)
|
||||
defer ts.Close()
|
||||
|
||||
addr := &url.URL{
|
||||
Scheme: "http",
|
||||
Host: ts.Listener.Addr().String(),
|
||||
}
|
||||
|
||||
config := influxdb.HTTPConfig{
|
||||
URL: addr,
|
||||
Database: "telegraf",
|
||||
DatabaseTag: "database",
|
||||
ExcludeDatabaseTag: true,
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
client, err := influxdb.NewHTTPClient(config)
|
||||
require.NoError(t, err)
|
||||
|
||||
metrics := []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{
|
||||
"database": "foo",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": 42.0,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
err = client.Write(ctx, metrics)
|
||||
require.NoError(t, err)
|
||||
err = client.Write(ctx, metrics)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
|
|
@ -189,6 +189,9 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
|
|||
}
|
||||
|
||||
if c.ExcludeBucketTag {
|
||||
// Avoid modifying the metric in case we need to retry the request.
|
||||
metric = metric.Copy()
|
||||
metric.Accept()
|
||||
metric.RemoveTag(c.BucketTag)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,10 +1,17 @@
|
|||
package influxdb_v2_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
influxdb "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -47,3 +54,60 @@ func TestNewHTTPClient(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteBucketTagWorksOnRetry(t *testing.T) {
|
||||
ts := httptest.NewServer(
|
||||
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.URL.Path {
|
||||
case "/api/v2/write":
|
||||
r.ParseForm()
|
||||
require.Equal(t, r.Form["bucket"], []string{"foo"})
|
||||
|
||||
body, err := ioutil.ReadAll(r.Body)
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, string(body), "cpu value=42")
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
default:
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
}),
|
||||
)
|
||||
defer ts.Close()
|
||||
|
||||
addr := &url.URL{
|
||||
Scheme: "http",
|
||||
Host: ts.Listener.Addr().String(),
|
||||
}
|
||||
|
||||
config := &influxdb.HTTPConfig{
|
||||
URL: addr,
|
||||
Bucket: "telegraf",
|
||||
BucketTag: "bucket",
|
||||
ExcludeBucketTag: true,
|
||||
}
|
||||
|
||||
client, err := influxdb.NewHTTPClient(config)
|
||||
require.NoError(t, err)
|
||||
|
||||
metrics := []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{
|
||||
"bucket": "foo",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": 42.0,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
err = client.Write(ctx, metrics)
|
||||
require.NoError(t, err)
|
||||
err = client.Write(ctx, metrics)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue