Add database_tag option to influxdb_listener to add database from query string (#6257)

This commit is contained in:
Randy Coburn 2019-08-17 00:05:08 +02:00 committed by Daniel Nelson
parent ed23466a53
commit beb788bfc6
3 changed files with 54 additions and 27 deletions

View File

@ -46,6 +46,14 @@ submits data to InfluxDB determines the destination database.
tls_cert = "/etc/telegraf/cert.pem" tls_cert = "/etc/telegraf/cert.pem"
tls_key = "/etc/telegraf/key.pem" tls_key = "/etc/telegraf/key.pem"
## Optional tag name used to store the database name.
## If the write has a database in the query string then it will be kept in this tag name.
## This tag can be used in downstream outputs.
## The default value of nothing means it will be off and the database will not be recorded.
## If you have a tag that is the same as the one specified below, and supply a database,
## the tag will be overwritten with the database supplied.
# database_tag = ""
## Optional username and password to accept for HTTP basic authentication. ## Optional username and password to accept for HTTP basic authentication.
## You probably want to make sure you have TLS configured above for this. ## You probably want to make sure you have TLS configured above for this.
# basic_username = "foobar" # basic_username = "foobar"

View File

@ -37,17 +37,18 @@ const (
type TimeFunc func() time.Time type TimeFunc func() time.Time
type HTTPListener struct { type HTTPListener struct {
ServiceAddress string ServiceAddress string `toml:"service_address"`
ReadTimeout internal.Duration // Port gets pulled out of ServiceAddress
WriteTimeout internal.Duration Port int
MaxBodySize internal.Size
MaxLineSize internal.Size
Port int
tlsint.ServerConfig tlsint.ServerConfig
BasicUsername string ReadTimeout internal.Duration `toml:"read_timeout"`
BasicPassword string WriteTimeout internal.Duration `toml:"write_timeout"`
MaxBodySize internal.Size `toml:"max_body_size"`
MaxLineSize internal.Size `toml:"max_line_size"`
BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"`
DatabaseTag string `toml:"database_tag"`
TimeFunc TimeFunc
@ -93,6 +94,13 @@ const sampleConfig = `
## Maximum line size allowed to be sent in bytes. ## Maximum line size allowed to be sent in bytes.
## 0 means to use the default of 65536 bytes (64 kibibytes) ## 0 means to use the default of 65536 bytes (64 kibibytes)
max_line_size = "64KiB" max_line_size = "64KiB"
## Optional tag name used to store the database.
## If the write has a database in the query string then it will be kept in this tag name.
## This tag can be used in downstream outputs.
## The default value of nothing means it will be off and the database will not be recorded.
# database_tag = ""
## Set one or more allowed client CA certificate file names to ## Set one or more allowed client CA certificate file names to
## enable mutually authenticated TLS connections ## enable mutually authenticated TLS connections
@ -258,6 +266,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
now := h.TimeFunc() now := h.TimeFunc()
precision := req.URL.Query().Get("precision") precision := req.URL.Query().Get("precision")
db := req.URL.Query().Get("db")
// Handle gzip request bodies // Handle gzip request bodies
body := req.Body body := req.Body
@ -315,7 +324,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
if err == io.ErrUnexpectedEOF { if err == io.ErrUnexpectedEOF {
// finished reading the request body // finished reading the request body
err = h.parse(buf[:n+bufStart], now, precision) err = h.parse(buf[:n+bufStart], now, precision, db)
if err != nil { if err != nil {
log.Println("D! "+err.Error(), bufStart+n) log.Println("D! "+err.Error(), bufStart+n)
return400 = true return400 = true
@ -346,7 +355,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
bufStart = 0 bufStart = 0
continue continue
} }
if err := h.parse(buf[:i+1], now, precision); err != nil { if err := h.parse(buf[:i+1], now, precision, db); err != nil {
log.Println("D! " + err.Error()) log.Println("D! " + err.Error())
return400 = true return400 = true
} }
@ -359,7 +368,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
} }
} }
func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error { func (h *HTTPListener) parse(b []byte, t time.Time, precision, db string) error {
h.mu.Lock() h.mu.Lock()
defer h.mu.Unlock() defer h.mu.Unlock()
@ -371,6 +380,13 @@ func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
} }
for _, m := range metrics { for _, m := range metrics {
// Do we need to keep the database name in the query string.
// If a tag has been supplied to put the db in and we actually got a db query,
// then we write it in. This overwrites the database tag if one was sent.
// This makes it behave like the influx endpoint.
if h.DatabaseTag != "" && db != "" {
m.AddTag(h.DatabaseTag, db)
}
h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
} }

View File

@ -146,8 +146,11 @@ func TestWriteHTTPBasicAuth(t *testing.T) {
require.EqualValues(t, http.StatusNoContent, resp.StatusCode) require.EqualValues(t, http.StatusNoContent, resp.StatusCode)
} }
func TestWriteHTTP(t *testing.T) { func TestWriteHTTPKeepDatabase(t *testing.T) {
testMsgWithDB := "cpu_load_short,host=server01,database=wrongdb value=12.0 1422568543702900257\n"
listener := newTestHTTPListener() listener := newTestHTTPListener()
listener.DatabaseTag = "database"
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
@ -162,7 +165,19 @@ func TestWriteHTTP(t *testing.T) {
acc.Wait(1) acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu_load_short", acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)}, map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"}, map[string]string{"host": "server01", "database": "mydb"},
)
// post single message to listener with a database tag in it already. It should be clobbered.
resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgWithDB)))
require.NoError(t, err)
resp.Body.Close()
require.EqualValues(t, 204, resp.StatusCode)
acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01", "database": "mydb"},
) )
// post multiple message to listener // post multiple message to listener
@ -177,21 +192,9 @@ func TestWriteHTTP(t *testing.T) {
for _, hostTag := range hostTags { for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short", acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)}, map[string]interface{}{"value": float64(12)},
map[string]string{"host": hostTag}, map[string]string{"host": hostTag, "database": "mydb"},
) )
} }
// Post a gigantic metric to the listener and verify that an error is returned:
resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(hugeMetric)))
require.NoError(t, err)
resp.Body.Close()
require.EqualValues(t, 400, resp.StatusCode)
acc.Wait(3)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"},
)
} }
// http listener should add a newline at the end of the buffer if it's not there // http listener should add a newline at the end of the buffer if it's not there