From 8cc72368ca4de53610e9d891f6634b8d70385e7d Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 18 Apr 2016 22:23:48 -0600 Subject: [PATCH] influxdb output: close connections & dont always overwrite closes #1058 closes #1059 also see https://github.com/influxdata/influxdb/pull/6425 --- CHANGELOG.md | 6 +++- Godeps | 2 +- plugins/outputs/influxdb/influxdb.go | 43 +++++++++++++++++++--------- 3 files changed, 35 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4945a3bbd..fdd8cf98f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## v0.13 [unreleased] ### Release Notes + - **Breaking Change**: docker plugin tags. The cont_id tag no longer exists, it will now be a field, and be called container_id. Additionally, cont_image and cont_name are being renamed to container_image and container_name. @@ -15,7 +16,7 @@ So adding "container" to each metric will: (1) make it more clear that these metrics are per-container, and (2) allow users to easily drop per-container metrics if cardinality is an issue (`namedrop = ["docker_container_*"]`) -- `tagexclude` and `tagexclude` are now available, which can be used to remove +- `tagexclude` and `taginclude` are now available, which can be used to remove tags from measurements on inputs and outputs. See [the configuration doc](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md) for more details. @@ -25,14 +26,17 @@ based on _prefix_ in addition to globs. This means that a filter like `fielddrop = ["time_"]` will need to be changed to `fielddrop = ["time_*"]` ### Features + - [#1017](https://github.com/influxdata/telegraf/pull/1017): taginclude and tagexclude arguments. - [#1015](https://github.com/influxdata/telegraf/pull/1015): Docker plugin schema refactor. - [#889](https://github.com/influxdata/telegraf/pull/889): Improved MySQL plugin. Thanks @maksadbek! ### Bugfixes + - [#921](https://github.com/influxdata/telegraf/pull/921): mqtt_consumer stops gathering metrics. Thanks @chaton78! - [#1013](https://github.com/influxdata/telegraf/pull/1013): Close dead riemann output connections. Thanks @echupriyanov! - [#1012](https://github.com/influxdata/telegraf/pull/1012): Set default tags in test accumulator. +- [#1058](https://github.com/influxdata/telegraf/issues/1058): Fix possible leaky TCP connections in influxdb output. ## v0.12.1 [2016-04-14] diff --git a/Godeps b/Godeps index 71057f497..926adcb74 100644 --- a/Godeps +++ b/Godeps @@ -24,7 +24,7 @@ github.com/gorilla/context 1ea25387ff6f684839d82767c1733ff4d4d15d0a github.com/gorilla/mux c9e326e2bdec29039a3761c07bece13133863e1e github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 github.com/influxdata/config b79f6829346b8d6e78ba73544b1e1038f1f1c9da -github.com/influxdata/influxdb e3fef5593c21644f2b43af55d6e17e70910b0e48 +github.com/influxdata/influxdb 21db76b3374c733f37ed16ad93f3484020034351 github.com/influxdata/toml af4df43894b16e3fd2b788d01bd27ad0776ef2d0 github.com/klauspost/crc32 19b0b332c9e4516a6370a0456e6182c3b5036720 github.com/lib/pq e182dc4027e2ded4b19396d638610f2653295f36 diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 626635a3b..891c752bd 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -125,13 +125,9 @@ func (i *InfluxDB) Connect() error { return err } - // Create Database if it doesn't exist - _, e := c.Query(client.Query{ - Command: fmt.Sprintf("CREATE DATABASE IF NOT EXISTS \"%s\"", i.Database), - }) - - if e != nil { - log.Println("Database creation failed: " + e.Error()) + err = createDatabase(c, i.Database) + if err != nil { + log.Println("Database creation failed: " + err.Error()) continue } @@ -144,8 +140,24 @@ func (i *InfluxDB) Connect() error { return nil } +func createDatabase(c client.Client, database string) error { + // Create Database if it doesn't exist + _, err := c.Query(client.Query{ + Command: fmt.Sprintf("CREATE DATABASE IF NOT EXISTS \"%s\"", database), + }) + return err +} + func (i *InfluxDB) Close() error { - // InfluxDB client does not provide a Close() function + var errS string + for j, _ := range i.conns { + if err := i.conns[j].Close(); err != nil { + errS += err.Error() + } + } + if errS != "" { + return fmt.Errorf("output influxdb close failed: %s", errS) + } return nil } @@ -185,18 +197,21 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { p := rand.Perm(len(i.conns)) for _, n := range p { if e := i.conns[n].Write(bp); e != nil { - log.Println("ERROR: " + e.Error()) + // Log write failure + log.Printf("ERROR: %s", e) + // If the database was not found, try to recreate it + if strings.Contains(e.Error(), "database not found") { + if errc := createDatabase(i.conns[n], i.Database); errc != nil { + log.Printf("ERROR: Database %s not found and failed to recreate\n", + i.Database) + } + } } else { err = nil break } } - // If all of the writes failed, create a new connection array so that - // i.Connect() will be called on the next gather. - if err != nil { - i.conns = make([]client.Client, 0) - } return err }