From beb788bfc65e3d5426a25ef27d04bb1ee7509672 Mon Sep 17 00:00:00 2001 From: Randy Coburn Date: Sat, 17 Aug 2019 00:05:08 +0200 Subject: [PATCH] Add database_tag option to influxdb_listener to add database from query string (#6257) --- plugins/inputs/influxdb_listener/README.md | 8 ++++ .../inputs/influxdb_listener/http_listener.go | 40 +++++++++++++------ .../influxdb_listener/http_listener_test.go | 33 ++++++++------- 3 files changed, 54 insertions(+), 27 deletions(-) diff --git a/plugins/inputs/influxdb_listener/README.md b/plugins/inputs/influxdb_listener/README.md index 8b6d2ad51..5efa6baf1 100644 --- a/plugins/inputs/influxdb_listener/README.md +++ b/plugins/inputs/influxdb_listener/README.md @@ -46,6 +46,14 @@ submits data to InfluxDB determines the destination database. tls_cert = "/etc/telegraf/cert.pem" tls_key = "/etc/telegraf/key.pem" + ## Optional tag name used to store the database name. + ## If the write has a database in the query string then it will be kept in this tag name. + ## This tag can be used in downstream outputs. + ## The default value of nothing means it will be off and the database will not be recorded. + ## If you have a tag that is the same as the one specified below, and supply a database, + ## the tag will be overwritten with the database supplied. + # database_tag = "" + ## Optional username and password to accept for HTTP basic authentication. ## You probably want to make sure you have TLS configured above for this. # basic_username = "foobar" diff --git a/plugins/inputs/influxdb_listener/http_listener.go b/plugins/inputs/influxdb_listener/http_listener.go index 7e5544786..5383fd2aa 100644 --- a/plugins/inputs/influxdb_listener/http_listener.go +++ b/plugins/inputs/influxdb_listener/http_listener.go @@ -37,17 +37,18 @@ const ( type TimeFunc func() time.Time type HTTPListener struct { - ServiceAddress string - ReadTimeout internal.Duration - WriteTimeout internal.Duration - MaxBodySize internal.Size - MaxLineSize internal.Size - Port int - + ServiceAddress string `toml:"service_address"` + // Port gets pulled out of ServiceAddress + Port int tlsint.ServerConfig - BasicUsername string - BasicPassword string + ReadTimeout internal.Duration `toml:"read_timeout"` + WriteTimeout internal.Duration `toml:"write_timeout"` + MaxBodySize internal.Size `toml:"max_body_size"` + MaxLineSize internal.Size `toml:"max_line_size"` + BasicUsername string `toml:"basic_username"` + BasicPassword string `toml:"basic_password"` + DatabaseTag string `toml:"database_tag"` TimeFunc @@ -93,6 +94,13 @@ const sampleConfig = ` ## Maximum line size allowed to be sent in bytes. ## 0 means to use the default of 65536 bytes (64 kibibytes) max_line_size = "64KiB" + + + ## Optional tag name used to store the database. + ## If the write has a database in the query string then it will be kept in this tag name. + ## This tag can be used in downstream outputs. + ## The default value of nothing means it will be off and the database will not be recorded. + # database_tag = "" ## Set one or more allowed client CA certificate file names to ## enable mutually authenticated TLS connections @@ -258,6 +266,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { now := h.TimeFunc() precision := req.URL.Query().Get("precision") + db := req.URL.Query().Get("db") // Handle gzip request bodies body := req.Body @@ -315,7 +324,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { if err == io.ErrUnexpectedEOF { // finished reading the request body - err = h.parse(buf[:n+bufStart], now, precision) + err = h.parse(buf[:n+bufStart], now, precision, db) if err != nil { log.Println("D! "+err.Error(), bufStart+n) return400 = true @@ -346,7 +355,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { bufStart = 0 continue } - if err := h.parse(buf[:i+1], now, precision); err != nil { + if err := h.parse(buf[:i+1], now, precision, db); err != nil { log.Println("D! " + err.Error()) return400 = true } @@ -359,7 +368,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { } } -func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error { +func (h *HTTPListener) parse(b []byte, t time.Time, precision, db string) error { h.mu.Lock() defer h.mu.Unlock() @@ -371,6 +380,13 @@ func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error { } for _, m := range metrics { + // Do we need to keep the database name in the query string. + // If a tag has been supplied to put the db in and we actually got a db query, + // then we write it in. This overwrites the database tag if one was sent. + // This makes it behave like the influx endpoint. + if h.DatabaseTag != "" && db != "" { + m.AddTag(h.DatabaseTag, db) + } h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) } diff --git a/plugins/inputs/influxdb_listener/http_listener_test.go b/plugins/inputs/influxdb_listener/http_listener_test.go index 964295061..6d14e6539 100644 --- a/plugins/inputs/influxdb_listener/http_listener_test.go +++ b/plugins/inputs/influxdb_listener/http_listener_test.go @@ -146,8 +146,11 @@ func TestWriteHTTPBasicAuth(t *testing.T) { require.EqualValues(t, http.StatusNoContent, resp.StatusCode) } -func TestWriteHTTP(t *testing.T) { +func TestWriteHTTPKeepDatabase(t *testing.T) { + testMsgWithDB := "cpu_load_short,host=server01,database=wrongdb value=12.0 1422568543702900257\n" + listener := newTestHTTPListener() + listener.DatabaseTag = "database" acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -162,7 +165,19 @@ func TestWriteHTTP(t *testing.T) { acc.Wait(1) acc.AssertContainsTaggedFields(t, "cpu_load_short", map[string]interface{}{"value": float64(12)}, - map[string]string{"host": "server01"}, + map[string]string{"host": "server01", "database": "mydb"}, + ) + + // post single message to listener with a database tag in it already. It should be clobbered. + resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgWithDB))) + require.NoError(t, err) + resp.Body.Close() + require.EqualValues(t, 204, resp.StatusCode) + + acc.Wait(1) + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": "server01", "database": "mydb"}, ) // post multiple message to listener @@ -177,21 +192,9 @@ func TestWriteHTTP(t *testing.T) { for _, hostTag := range hostTags { acc.AssertContainsTaggedFields(t, "cpu_load_short", map[string]interface{}{"value": float64(12)}, - map[string]string{"host": hostTag}, + map[string]string{"host": hostTag, "database": "mydb"}, ) } - - // Post a gigantic metric to the listener and verify that an error is returned: - resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(hugeMetric))) - require.NoError(t, err) - resp.Body.Close() - require.EqualValues(t, 400, resp.StatusCode) - - acc.Wait(3) - acc.AssertContainsTaggedFields(t, "cpu_load_short", - map[string]interface{}{"value": float64(12)}, - map[string]string{"host": "server01"}, - ) } // http listener should add a newline at the end of the buffer if it's not there