diff --git a/plugins/inputs/influxdb_listener/README.md b/plugins/inputs/influxdb_listener/README.md index b93573bf4..aae77fb96 100644 --- a/plugins/inputs/influxdb_listener/README.md +++ b/plugins/inputs/influxdb_listener/README.md @@ -54,6 +54,10 @@ submits data to InfluxDB determines the destination database. ## the tag will be overwritten with the database supplied. # database_tag = "" + ## If set the retention policy specified in the write query will be added as + ## the value of this tag name. + # retention_policy_tag = "" + ## Optional username and password to accept for HTTP basic authentication. ## You probably want to make sure you have TLS configured above for this. # basic_username = "foobar" diff --git a/plugins/inputs/influxdb_listener/influxdb_listener.go b/plugins/inputs/influxdb_listener/influxdb_listener.go index 1eac928af..4ba5a8c7c 100644 --- a/plugins/inputs/influxdb_listener/influxdb_listener.go +++ b/plugins/inputs/influxdb_listener/influxdb_listener.go @@ -29,13 +29,14 @@ type InfluxDBListener struct { port int tlsint.ServerConfig - ReadTimeout internal.Duration `toml:"read_timeout"` - WriteTimeout internal.Duration `toml:"write_timeout"` - MaxBodySize internal.Size `toml:"max_body_size"` - MaxLineSize internal.Size `toml:"max_line_size"` // deprecated in 1.14; ignored - BasicUsername string `toml:"basic_username"` - BasicPassword string `toml:"basic_password"` - DatabaseTag string `toml:"database_tag"` + ReadTimeout internal.Duration `toml:"read_timeout"` + WriteTimeout internal.Duration `toml:"write_timeout"` + MaxBodySize internal.Size `toml:"max_body_size"` + MaxLineSize internal.Size `toml:"max_line_size"` // deprecated in 1.14; ignored + BasicUsername string `toml:"basic_username"` + BasicPassword string `toml:"basic_password"` + DatabaseTag string `toml:"database_tag"` + RetentionPolicyTag string `toml:"retention_policy_tag"` timeFunc influx.TimeFunc @@ -72,12 +73,16 @@ const sampleConfig = ` ## 0 means to use the default of 32MiB. max_body_size = "32MiB" - ## Optional tag name used to store the database. + ## Optional tag name used to store the database. ## If the write has a database in the query string then it will be kept in this tag name. ## This tag can be used in downstream outputs. ## The default value of nothing means it will be off and the database will not be recorded. # database_tag = "" + ## If set the retention policy specified in the write query will be added as + ## the value of this tag name. + # retention_policy_tag = "" + ## Set one or more allowed client CA certificate file names to ## enable mutually authenticated TLS connections tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] @@ -255,6 +260,7 @@ func (h *InfluxDBListener) handleWrite() http.HandlerFunc { } db := req.URL.Query().Get("db") + rp := req.URL.Query().Get("rp") body := req.Body body = http.MaxBytesReader(res, body, h.MaxBodySize.Size) @@ -316,6 +322,10 @@ func (h *InfluxDBListener) handleWrite() http.HandlerFunc { m.AddTag(h.DatabaseTag, db) } + if h.RetentionPolicyTag != "" && rp != "" { + m.AddTag(h.RetentionPolicyTag, rp) + } + h.acc.AddMetric(m) } diff --git a/plugins/inputs/influxdb_listener/influxdb_listener_test.go b/plugins/inputs/influxdb_listener/influxdb_listener_test.go index 6990f6fc6..d0b2913cd 100644 --- a/plugins/inputs/influxdb_listener/influxdb_listener_test.go +++ b/plugins/inputs/influxdb_listener/influxdb_listener_test.go @@ -13,9 +13,9 @@ import ( "testing" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/require" ) @@ -207,6 +207,37 @@ func TestWriteKeepDatabase(t *testing.T) { } } +func TestWriteRetentionPolicyTag(t *testing.T) { + listener := newTestListener() + listener.RetentionPolicyTag = "rp" + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + resp, err := http.Post(createURL(listener, "http", "/write", "rp=myrp"), "", bytes.NewBuffer([]byte("cpu time_idle=42"))) + require.NoError(t, err) + resp.Body.Close() + require.Equal(t, 204, resp.StatusCode) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "rp": "myrp", + }, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + } + + acc.Wait(1) + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) +} + // http listener should add a newline at the end of the buffer if it's not there func TestWriteNoNewline(t *testing.T) { listener := newTestListener()