diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5c30f32fa..2b917c5cf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -32,11 +32,16 @@ should now look like:
 - [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar webhook plugin.
 - [#1402](https://github.com/influxdata/telegraf/pull/1402): docker-machine/boot2docker no longer required for unit tests.
 - [#1407](https://github.com/influxdata/telegraf/pull/1407): HTTP service listener input plugin.
+- [#1350](https://github.com/influxdata/telegraf/pull/1350): cgroup input plugin.
+- [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD.

 ### Bugfixes

 - [#1384](https://github.com/influxdata/telegraf/pull/1384): Fix datarace in apache input plugin.
 - [#1399](https://github.com/influxdata/telegraf/issues/1399): Add `read_repairs` statistics to riak plugin.
+- [#1405](https://github.com/influxdata/telegraf/issues/1405): Fix memory/connection leak in prometheus input plugin.
+- [#1378](https://github.com/influxdata/telegraf/issues/1378): Trim BOM from config file for Windows support.
+- [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload.

 ## v1.0 beta 2 [2016-06-21]

@@ -56,6 +61,7 @@ should now look like:
 - [#1335](https://github.com/influxdata/telegraf/issues/1335): Fix overall ping timeout to be calculated based on per-ping timeout.
 - [#1374](https://github.com/influxdata/telegraf/pull/1374): Change "default" retention policy to "".
 - [#1377](https://github.com/influxdata/telegraf/issues/1377): Graphite output mangling '%' character.
+- [#1396](https://github.com/influxdata/telegraf/pull/1396): Prometheus input plugin now supports x509 cert authentication.

 ## v1.0 beta 1 [2016-06-07]

diff --git a/README.md b/README.md
index 91d051078..eaa05428c 100644
--- a/README.md
+++ b/README.md
@@ -221,6 +221,9 @@ Telegraf can also collect metrics via the following service plugins:
 * [webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks)
   * [github](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/github)
   * [rollbar](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/rollbar)
+* [nsq_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nsq_consumer)
+* [github_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/github_webhooks)
+* [rollbar_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/rollbar_webhooks)

 We'll be adding support for many more over the coming months. Read on if you
 want to add support for another service or third-party API.
diff --git a/etc/telegraf.conf b/etc/telegraf.conf
index 054bcf62b..c9011536a 100644
--- a/etc/telegraf.conf
+++ b/etc/telegraf.conf
@@ -526,6 +526,19 @@
 #   socket_suffix = "asok"


+# # Read specific statistics per cgroup
+# [[inputs.cgroup]]
+#   ## Directories in which to look for files, globs are supported.
+#   # paths = [
+#   #   "/cgroup/memory",
+#   #   "/cgroup/memory/child1",
+#   #   "/cgroup/memory/child2/*",
+#   # ]
+#   ## cgroup stat fields, as file names, globs are supported.
+#   ## these file names are appended to each path from above.
+#   # files = ["memory.*usage*", "memory.limit_in_bytes"]
+
+
 # # Pull Metric Statistics from Amazon CloudWatch
 # [[inputs.cloudwatch]]
 #   ## Amazon Region
@@ -679,6 +692,13 @@
 #
 #   ## set cluster_health to true when you want to also obtain cluster level stats
 #   cluster_health = false
+#
+#   ## Optional SSL Config
+#   # ssl_ca = "/etc/telegraf/ca.pem"
+#   # ssl_cert = "/etc/telegraf/cert.pem"
+#   # ssl_key = "/etc/telegraf/key.pem"
+#   ## Use SSL but skip chain & host verification
+#   # insecure_skip_verify = false


 # # Read metrics from one or more commands that can output to stdout
@@ -1259,10 +1279,15 @@
 #   ## An array of urls to scrape metrics from.
 #   urls = ["http://localhost:9100/metrics"]
 #
-#   ## Use SSL but skip chain & host verification
-#   # insecure_skip_verify = false
 #   ## Use bearer token for authorization
 #   # bearer_token = /path/to/bearer/token
+#
+#   ## Optional SSL Config
+#   # ssl_ca = /path/to/cafile
+#   # ssl_cert = /path/to/certfile
+#   # ssl_key = /path/to/keyfile
+#   ## Use SSL but skip chain & host verification
+#   # insecure_skip_verify = false


 # # Reads last_run_summary.yaml file and converts to measurements
diff --git a/internal/config/config.go b/internal/config/config.go
index 99db2e30d..8f7821624 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -539,6 +539,13 @@ func (c *Config) LoadConfig(path string) error {
 	return nil
 }

+// trimBOM trims the Byte-Order-Mark from the beginning of the file.
+// This is for Windows compatibility only.
+// see https://github.com/influxdata/telegraf/issues/1378
+func trimBOM(f []byte) []byte {
+	return bytes.TrimPrefix(f, []byte("\xef\xbb\xbf"))
+}
+
 // parseFile loads a TOML configuration from a provided path and
 // returns the AST produced from the TOML parser. When loading the file, it
 // will find environment variables and replace them.
@@ -547,6 +554,8 @@ func parseFile(fpath string) (*ast.Table, error) {
 	if err != nil {
 		return nil, err
 	}
+	// ugh windows why
+	contents = trimBOM(contents)

 	env_vars := envVarRe.FindAll(contents, -1)
 	for _, env_var := range env_vars {
diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go
index 69ec6e442..f2132c56b 100644
--- a/plugins/inputs/all/all.go
+++ b/plugins/inputs/all/all.go
@@ -6,6 +6,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
 	_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
 	_ "github.com/influxdata/telegraf/plugins/inputs/ceph"
+	_ "github.com/influxdata/telegraf/plugins/inputs/cgroup"
 	_ "github.com/influxdata/telegraf/plugins/inputs/chrony"
 	_ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch"
 	_ "github.com/influxdata/telegraf/plugins/inputs/conntrack"
@@ -41,6 +42,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/inputs/net_response"
 	_ "github.com/influxdata/telegraf/plugins/inputs/nginx"
 	_ "github.com/influxdata/telegraf/plugins/inputs/nsq"
+	_ "github.com/influxdata/telegraf/plugins/inputs/nsq_consumer"
 	_ "github.com/influxdata/telegraf/plugins/inputs/nstat"
 	_ "github.com/influxdata/telegraf/plugins/inputs/ntpq"
 	_ "github.com/influxdata/telegraf/plugins/inputs/passenger"
diff --git a/plugins/inputs/cgroup/README.md b/plugins/inputs/cgroup/README.md
new file mode 100644
index 000000000..ab06342bf
--- /dev/null
+++ b/plugins/inputs/cgroup/README.md
@@ -0,0 +1,58 @@
+# CGroup Input Plugin For Telegraf Agent
+
+This input plugin will capture specific statistics per cgroup.
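+
+As a sketch of what this produces (the values below are taken from this
+plugin's testdata), a `memory.stat` file in the key-space-value format
+described below, containing:
+
+```
+cache 1739362304123123123
+rss 1775325184
+```
+
+is reported as the fields `memory.stat.cache` and `memory.stat.rss`, each
+field name prefixed with the name of the file it came from.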
+
+The following file formats are supported:
+
+* Single value
+
+```
+VAL\n
+```
+
+* New line separated values
+
+```
+VAL0\n
+VAL1\n
+```
+
+* Space separated values
+
+```
+VAL0 VAL1 ...\n
+```
+
+* New line separated key-space-value pairs
+
+```
+KEY0 VAL0\n
+KEY1 VAL1\n
+```
+
+
+### Tags:
+
+All measurements have the following tags:
+ - path
+
+
+### Configuration:
+
+```
+# [[inputs.cgroup]]
+  # paths = [
+  #   "/cgroup/memory",          # root cgroup
+  #   "/cgroup/memory/child1",   # container cgroup
+  #   "/cgroup/memory/child2/*", # all children cgroups under child2, but not child2 itself
+  # ]
+  # files = ["memory.*usage*", "memory.limit_in_bytes"]
+
+# [[inputs.cgroup]]
+  # paths = [
+  #   "/cgroup/cpu",     # root cgroup
+  #   "/cgroup/cpu/*",   # all container cgroups
+  #   "/cgroup/cpu/*/*", # all children cgroups under each container cgroup
+  # ]
+  # files = ["cpuacct.usage", "cpu.cfs_period_us", "cpu.cfs_quota_us"]
+```
diff --git a/plugins/inputs/cgroup/cgroup.go b/plugins/inputs/cgroup/cgroup.go
new file mode 100644
index 000000000..e38b6a4c1
--- /dev/null
+++ b/plugins/inputs/cgroup/cgroup.go
@@ -0,0 +1,35 @@
+package cgroup
+
+import (
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/plugins/inputs"
+)
+
+type CGroup struct {
+	Paths []string `toml:"paths"`
+	Files []string `toml:"files"`
+}
+
+var sampleConfig = `
+  ## Directories in which to look for files, globs are supported.
+  # paths = [
+  #   "/cgroup/memory",
+  #   "/cgroup/memory/child1",
+  #   "/cgroup/memory/child2/*",
+  # ]
+  ## cgroup stat fields, as file names, globs are supported.
+  ## these file names are appended to each path from above.
+  # files = ["memory.*usage*", "memory.limit_in_bytes"]
+`
+
+func (g *CGroup) SampleConfig() string {
+	return sampleConfig
+}
+
+func (g *CGroup) Description() string {
+	return "Read specific statistics per cgroup"
+}
+
+func init() {
+	inputs.Add("cgroup", func() telegraf.Input { return &CGroup{} })
+}
diff --git a/plugins/inputs/cgroup/cgroup_linux.go b/plugins/inputs/cgroup/cgroup_linux.go
new file mode 100644
index 000000000..e8ba6f881
--- /dev/null
+++ b/plugins/inputs/cgroup/cgroup_linux.go
@@ -0,0 +1,244 @@
+// +build linux
+
+package cgroup
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path"
+	"path/filepath"
+	"regexp"
+	"strconv"
+
+	"github.com/influxdata/telegraf"
+)
+
+const metricName = "cgroup"
+
+func (g *CGroup) Gather(acc telegraf.Accumulator) error {
+	list := make(chan pathInfo)
+	go g.generateDirs(list)
+
+	for dir := range list {
+		if dir.err != nil {
+			return dir.err
+		}
+		if err := g.gatherDir(dir.path, acc); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (g *CGroup) gatherDir(dir string, acc telegraf.Accumulator) error {
+	fields := make(map[string]interface{})
+
+	list := make(chan pathInfo)
+	go g.generateFiles(dir, list)
+
+	for file := range list {
+		if file.err != nil {
+			return file.err
+		}
+
+		raw, err := ioutil.ReadFile(file.path)
+		if err != nil {
+			return err
+		}
+		if len(raw) == 0 {
+			continue
+		}
+
+		fd := fileData{data: raw, path: file.path}
+		if err := fd.parse(fields); err != nil {
+			return err
+		}
+	}
+
+	tags := map[string]string{"path": dir}
+
+	acc.AddFields(metricName, fields, tags)
+
+	return nil
+}
+
+// ======================================================================
+
+type pathInfo struct {
+	path string
+	err  error
+}
+
+func isDir(path string) (bool, error) {
+	result, err := os.Stat(path)
+	if err != nil {
+		return false, err
+	}
+	return result.IsDir(), nil
+}
+
+func (g *CGroup) generateDirs(list chan<- pathInfo) {
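+	// This method is the producer half of a small pipeline: it expands each
+	// configured glob, sends every matching directory down the channel, and
+	// closes the channel so the range loop in Gather terminates. Errors
+	// travel on the same channel inside pathInfo.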
+	for _, dir := range g.Paths {
+		// getting all dirs that match the pattern 'dir'
+		items, err := filepath.Glob(dir)
+		if err != nil {
+			list <- pathInfo{err: err}
+			return
+		}
+
+		for _, item := range items {
+			ok, err := isDir(item)
+			if err != nil {
+				list <- pathInfo{err: err}
+				return
+			}
+			// supply only dirs
+			if ok {
+				list <- pathInfo{path: item}
+			}
+		}
+	}
+	close(list)
+}
+
+func (g *CGroup) generateFiles(dir string, list chan<- pathInfo) {
+	for _, file := range g.Files {
+		// getting all file paths that match the pattern 'dir + file'
+		// path.Base makes sure that the file variable does not contain any path components
+		items, err := filepath.Glob(path.Join(dir, path.Base(file)))
+		if err != nil {
+			list <- pathInfo{err: err}
+			return
+		}
+
+		for _, item := range items {
+			ok, err := isDir(item)
+			if err != nil {
+				list <- pathInfo{err: err}
+				return
+			}
+			// supply only files, not dirs
+			if !ok {
+				list <- pathInfo{path: item}
+			}
+		}
+	}
+	close(list)
+}
+
+// ======================================================================
+
+type fileData struct {
+	data []byte
+	path string
+}
+
+func (fd *fileData) format() (*fileFormat, error) {
+	for _, ff := range fileFormats {
+		ok, err := ff.match(fd.data)
+		if err != nil {
+			return nil, err
+		}
+		if ok {
+			return &ff, nil
+		}
+	}
+
+	return nil, fmt.Errorf("%v: unknown file format", fd.path)
+}
+
+func (fd *fileData) parse(fields map[string]interface{}) error {
+	format, err := fd.format()
+	if err != nil {
+		return err
+	}
+
+	format.parser(filepath.Base(fd.path), fields, fd.data)
+	return nil
+}
+
+// ======================================================================
+
+type fileFormat struct {
+	name    string
+	pattern string
+	parser  func(measurement string, fields map[string]interface{}, b []byte)
+}
+
+const keyPattern = "[[:alpha:]_]+"
+const valuePattern = "[\\d-]+"
+
+var fileFormats = [...]fileFormat{
+	// VAL\n
+	fileFormat{
+		name:    "Single value",
+		pattern: "^" + valuePattern + "\n$",
+		parser: func(measurement string, fields map[string]interface{}, b []byte) {
+			re := regexp.MustCompile("^(" + valuePattern + ")\n$")
+			matches := re.FindAllStringSubmatch(string(b), -1)
+			fields[measurement] = numberOrString(matches[0][1])
+		},
+	},
+	// VAL0\n
+	// VAL1\n
+	// ...
+	fileFormat{
+		name:    "New line separated values",
+		pattern: "^(" + valuePattern + "\n){2,}$",
+		parser: func(measurement string, fields map[string]interface{}, b []byte) {
+			re := regexp.MustCompile("(" + valuePattern + ")\n")
+			matches := re.FindAllStringSubmatch(string(b), -1)
+			for i, v := range matches {
+				fields[measurement+"."+strconv.Itoa(i)] = numberOrString(v[1])
+			}
+		},
+	},
+	// VAL0 VAL1 ...\n
+	fileFormat{
+		name:    "Space separated values",
+		pattern: "^(" + valuePattern + " )+\n$",
+		parser: func(measurement string, fields map[string]interface{}, b []byte) {
+			re := regexp.MustCompile("(" + valuePattern + ") ")
+			matches := re.FindAllStringSubmatch(string(b), -1)
+			for i, v := range matches {
+				fields[measurement+"."+strconv.Itoa(i)] = numberOrString(v[1])
+			}
+		},
+	},
+	// KEY0 VAL0\n
+	// KEY1 VAL1\n
+	// ...
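+	// e.g. a memory.stat file containing "cache 1739362304\nrss 1775325184\n"
+	// produces the fields memory.stat.cache and memory.stat.rss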
+	fileFormat{
+		name:    "New line separated key-space-value pairs",
+		pattern: "^(" + keyPattern + " " + valuePattern + "\n)+$",
+		parser: func(measurement string, fields map[string]interface{}, b []byte) {
+			re := regexp.MustCompile("(" + keyPattern + ") (" + valuePattern + ")\n")
+			matches := re.FindAllStringSubmatch(string(b), -1)
+			for _, v := range matches {
+				fields[measurement+"."+v[1]] = numberOrString(v[2])
+			}
+		},
+	},
+}
+
+func numberOrString(s string) interface{} {
+	i, err := strconv.Atoi(s)
+	if err == nil {
+		return i
+	}
+
+	return s
+}
+
+func (f fileFormat) match(b []byte) (bool, error) {
+	ok, err := regexp.Match(f.pattern, b)
+	if err != nil {
+		return false, err
+	}
+	return ok, nil
+}
diff --git a/plugins/inputs/cgroup/cgroup_notlinux.go b/plugins/inputs/cgroup/cgroup_notlinux.go
new file mode 100644
index 000000000..2bc227410
--- /dev/null
+++ b/plugins/inputs/cgroup/cgroup_notlinux.go
@@ -0,0 +1,11 @@
+// +build !linux
+
+package cgroup
+
+import (
+	"github.com/influxdata/telegraf"
+)
+
+func (g *CGroup) Gather(acc telegraf.Accumulator) error {
+	return nil
+}
diff --git a/plugins/inputs/cgroup/cgroup_test.go b/plugins/inputs/cgroup/cgroup_test.go
new file mode 100644
index 000000000..206b51f6d
--- /dev/null
+++ b/plugins/inputs/cgroup/cgroup_test.go
@@ -0,0 +1,182 @@
+// +build linux
+
+package cgroup
+
+import (
+	"testing"
+
+	"github.com/influxdata/telegraf/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+var cg1 = &CGroup{
+	Paths: []string{"testdata/memory"},
+	Files: []string{
+		"memory.empty",
+		"memory.max_usage_in_bytes",
+		"memory.limit_in_bytes",
+		"memory.stat",
+		"memory.use_hierarchy",
+		"notify_on_release",
+	},
+}
+
+func TestCgroupStatistics_1(t *testing.T) {
+	var acc testutil.Accumulator
+
+	err := cg1.Gather(&acc)
+	require.NoError(t, err)
+
+	tags := map[string]string{
+		"path": "testdata/memory",
+	}
+	fields := map[string]interface{}{
+		"memory.stat.cache":           1739362304123123123,
+		"memory.stat.rss":             1775325184,
+		"memory.stat.rss_huge":        778043392,
+		"memory.stat.mapped_file":     421036032,
+		"memory.stat.dirty":           -307200,
+		"memory.max_usage_in_bytes.0": 0,
+		"memory.max_usage_in_bytes.1": -1,
+		"memory.max_usage_in_bytes.2": 2,
+		"memory.limit_in_bytes":       223372036854771712,
+		"memory.use_hierarchy":        "12-781",
+		"notify_on_release":           0,
+	}
+	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
+}
+
+// ======================================================================
+
+var cg2 = &CGroup{
+	Paths: []string{"testdata/cpu"},
+	Files: []string{"cpuacct.usage_percpu"},
+}
+
+func TestCgroupStatistics_2(t *testing.T) {
+	var acc testutil.Accumulator
+
+	err := cg2.Gather(&acc)
+	require.NoError(t, err)
+
+	tags := map[string]string{
+		"path": "testdata/cpu",
+	}
+	fields := map[string]interface{}{
+		"cpuacct.usage_percpu.0": -1452543795404,
+		"cpuacct.usage_percpu.1": 1376681271659,
+		"cpuacct.usage_percpu.2": 1450950799997,
+		"cpuacct.usage_percpu.3": -1473113374257,
+	}
+	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
+}
+
+// ======================================================================
+
+var cg3 = &CGroup{
+	Paths: []string{"testdata/memory/*"},
+	Files: []string{"memory.limit_in_bytes"},
+}
+
+func TestCgroupStatistics_3(t *testing.T) {
+	var acc testutil.Accumulator
+
+	err := cg3.Gather(&acc)
+	require.NoError(t, err)
+
+	tags := map[string]string{
+		"path": "testdata/memory/group_1",
+	}
+	fields := map[string]interface{}{
+		"memory.limit_in_bytes": 223372036854771712,
+	}
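+	// the glob testdata/memory/* matches both group_1 and group_2, and each
+	// holds a memory.limit_in_bytes with the same value, so identical fields
+	// are expected under both path tags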
+	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
+
+	tags = map[string]string{
+		"path": "testdata/memory/group_2",
+	}
+	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
+}
+
+// ======================================================================
+
+var cg4 = &CGroup{
+	Paths: []string{"testdata/memory/*/*", "testdata/memory/group_2"},
+	Files: []string{"memory.limit_in_bytes"},
+}
+
+func TestCgroupStatistics_4(t *testing.T) {
+	var acc testutil.Accumulator
+
+	err := cg4.Gather(&acc)
+	require.NoError(t, err)
+
+	tags := map[string]string{
+		"path": "testdata/memory/group_1/group_1_1",
+	}
+	fields := map[string]interface{}{
+		"memory.limit_in_bytes": 223372036854771712,
+	}
+	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
+
+	tags = map[string]string{
+		"path": "testdata/memory/group_1/group_1_2",
+	}
+	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
+
+	tags = map[string]string{
+		"path": "testdata/memory/group_2",
+	}
+	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
+}
+
+// ======================================================================
+
+var cg5 = &CGroup{
+	Paths: []string{"testdata/memory/*/group_1_1"},
+	Files: []string{"memory.limit_in_bytes"},
+}
+
+func TestCgroupStatistics_5(t *testing.T) {
+	var acc testutil.Accumulator
+
+	err := cg5.Gather(&acc)
+	require.NoError(t, err)
+
+	tags := map[string]string{
+		"path": "testdata/memory/group_1/group_1_1",
+	}
+	fields := map[string]interface{}{
+		"memory.limit_in_bytes": 223372036854771712,
+	}
+	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
+
+	tags = map[string]string{
+		"path": "testdata/memory/group_2/group_1_1",
+	}
+	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
+}
+
+// ======================================================================
+
+var cg6 = &CGroup{
+	Paths: []string{"testdata/memory"},
+	Files: []string{"memory.us*", "*/memory.kmem.*"},
+}
+
+func TestCgroupStatistics_6(t *testing.T) {
+	var acc testutil.Accumulator
+
+	err := cg6.Gather(&acc)
+	require.NoError(t, err)
+
+	tags := map[string]string{
+		"path": "testdata/memory",
+	}
+	fields := map[string]interface{}{
+		"memory.usage_in_bytes":      3513667584,
+		"memory.use_hierarchy":       "12-781",
+		"memory.kmem.limit_in_bytes": 9223372036854771712,
+	}
+	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
+}
diff --git a/plugins/inputs/cgroup/testdata/blkio/blkio.io_serviced b/plugins/inputs/cgroup/testdata/blkio/blkio.io_serviced
new file mode 100644
index 000000000..4b28cf721
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/blkio/blkio.io_serviced
@@ -0,0 +1 @@
+Total 0
diff --git a/plugins/inputs/cgroup/testdata/blkio/blkio.throttle.io_serviced b/plugins/inputs/cgroup/testdata/blkio/blkio.throttle.io_serviced
new file mode 100644
index 000000000..519480715
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/blkio/blkio.throttle.io_serviced
@@ -0,0 +1,131 @@
+11:0 Read 0
+11:0 Write 0
+11:0 Sync 0
+11:0 Async 0
+11:0 Total 0
+8:0 Read 49134
+8:0 Write 216703
+8:0 Sync 177906
+8:0 Async 87931
+8:0 Total 265837
+7:7 Read 0
+7:7 Write 0
+7:7 Sync 0
+7:7 Async 0
+7:7 Total 0
+7:6 Read 0
+7:6 Write 0
+7:6 Sync 0
+7:6 Async 0
+7:6 Total 0
+7:5 Read 0
+7:5 Write 0
+7:5 Sync 0
+7:5 Async 0
+7:5 Total 0
+7:4 Read 0
+7:4 Write 0
+7:4 Sync 0
+7:4 Async 0
+7:4 Total 0
+7:3 Read 0
+7:3 Write 0
+7:3 Sync 0
+7:3 Async 0
+7:3 Total 0
+7:2 Read 0
+7:2 Write 0
+7:2 Sync 0
+7:2 Async 0
+7:2 Total 0
+7:1 Read 0
+7:1 Write 0
+7:1 Sync 0
+7:1 Async 0
+7:1 Total 0
+7:0 Read 0
+7:0 Write 0
+7:0 Sync 0
+7:0 Async 0
+7:0 Total 0
+1:15 Read 3
+1:15 Write 0
+1:15 Sync 0
+1:15 Async 3
+1:15 Total 3
+1:14 Read 3
+1:14 Write 0
+1:14 Sync 0
+1:14 Async 3
+1:14 Total 3
+1:13 Read 3
+1:13 Write 0
+1:13 Sync 0
+1:13 Async 3
+1:13 Total 3
+1:12 Read 3
+1:12 Write 0
+1:12 Sync 0
+1:12 Async 3
+1:12 Total 3
+1:11 Read 3
+1:11 Write 0
+1:11 Sync 0
+1:11 Async 3
+1:11 Total 3
+1:10 Read 3
+1:10 Write 0
+1:10 Sync 0
+1:10 Async 3
+1:10 Total 3
+1:9 Read 3
+1:9 Write 0
+1:9 Sync 0
+1:9 Async 3
+1:9 Total 3
+1:8 Read 3
+1:8 Write 0
+1:8 Sync 0
+1:8 Async 3
+1:8 Total 3
+1:7 Read 3
+1:7 Write 0
+1:7 Sync 0
+1:7 Async 3
+1:7 Total 3
+1:6 Read 3
+1:6 Write 0
+1:6 Sync 0
+1:6 Async 3
+1:6 Total 3
+1:5 Read 3
+1:5 Write 0
+1:5 Sync 0
+1:5 Async 3
+1:5 Total 3
+1:4 Read 3
+1:4 Write 0
+1:4 Sync 0
+1:4 Async 3
+1:4 Total 3
+1:3 Read 3
+1:3 Write 0
+1:3 Sync 0
+1:3 Async 3
+1:3 Total 3
+1:2 Read 3
+1:2 Write 0
+1:2 Sync 0
+1:2 Async 3
+1:2 Total 3
+1:1 Read 3
+1:1 Write 0
+1:1 Sync 0
+1:1 Async 3
+1:1 Total 3
+1:0 Read 3
+1:0 Write 0
+1:0 Sync 0
+1:0 Async 3
+1:0 Total 3
+Total 265885
diff --git a/plugins/inputs/cgroup/testdata/cpu/cpu.cfs_quota_us b/plugins/inputs/cgroup/testdata/cpu/cpu.cfs_quota_us
new file mode 100644
index 000000000..3a2e3f498
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/cpu/cpu.cfs_quota_us
@@ -0,0 +1 @@
+-1
diff --git a/plugins/inputs/cgroup/testdata/cpu/cpuacct.usage_percpu b/plugins/inputs/cgroup/testdata/cpu/cpuacct.usage_percpu
new file mode 100644
index 000000000..36737768a
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/cpu/cpuacct.usage_percpu
@@ -0,0 +1 @@
+-1452543795404 1376681271659 1450950799997 -1473113374257
diff --git a/plugins/inputs/cgroup/testdata/memory/group_1/group_1_1/memory.limit_in_bytes b/plugins/inputs/cgroup/testdata/memory/group_1/group_1_1/memory.limit_in_bytes
new file mode 100644
index 000000000..78169435f
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/group_1/group_1_1/memory.limit_in_bytes
@@ -0,0 +1 @@
+223372036854771712
diff --git a/plugins/inputs/cgroup/testdata/memory/group_1/group_1_1/memory.stat b/plugins/inputs/cgroup/testdata/memory/group_1/group_1_1/memory.stat
new file mode 100644
index 000000000..a5493b9b2
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/group_1/group_1_1/memory.stat
@@ -0,0 +1,5 @@
+cache 1739362304123123123
+rss 1775325184
+rss_huge 778043392
+mapped_file 421036032
+dirty -307200
diff --git a/plugins/inputs/cgroup/testdata/memory/group_1/group_1_2/memory.limit_in_bytes b/plugins/inputs/cgroup/testdata/memory/group_1/group_1_2/memory.limit_in_bytes
new file mode 100644
index 000000000..78169435f
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/group_1/group_1_2/memory.limit_in_bytes
@@ -0,0 +1 @@
+223372036854771712
diff --git a/plugins/inputs/cgroup/testdata/memory/group_1/group_1_2/memory.stat b/plugins/inputs/cgroup/testdata/memory/group_1/group_1_2/memory.stat
new file mode 100644
index 000000000..a5493b9b2
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/group_1/group_1_2/memory.stat
@@ -0,0 +1,5 @@
+cache 1739362304123123123
+rss 1775325184
+rss_huge 778043392
+mapped_file 421036032
+dirty -307200
diff --git a/plugins/inputs/cgroup/testdata/memory/group_1/memory.kmem.limit_in_bytes b/plugins/inputs/cgroup/testdata/memory/group_1/memory.kmem.limit_in_bytes
new file mode 100644
index 000000000..564113cfa
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/group_1/memory.kmem.limit_in_bytes
@@ -0,0 +1 @@
+9223372036854771712
diff --git a/plugins/inputs/cgroup/testdata/memory/group_1/memory.kmem.max_usage_in_bytes b/plugins/inputs/cgroup/testdata/memory/group_1/memory.kmem.max_usage_in_bytes
new file mode 100644
index 000000000..573541ac9
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/group_1/memory.kmem.max_usage_in_bytes
@@ -0,0 +1 @@
+0
diff --git a/plugins/inputs/cgroup/testdata/memory/group_1/memory.limit_in_bytes b/plugins/inputs/cgroup/testdata/memory/group_1/memory.limit_in_bytes
new file mode 100644
index 000000000..78169435f
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/group_1/memory.limit_in_bytes
@@ -0,0 +1 @@
+223372036854771712
diff --git a/plugins/inputs/cgroup/testdata/memory/group_1/memory.stat b/plugins/inputs/cgroup/testdata/memory/group_1/memory.stat
new file mode 100644
index 000000000..a5493b9b2
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/group_1/memory.stat
@@ -0,0 +1,5 @@
+cache 1739362304123123123
+rss 1775325184
+rss_huge 778043392
+mapped_file 421036032
+dirty -307200
diff --git a/plugins/inputs/cgroup/testdata/memory/group_2/group_1_1/memory.limit_in_bytes b/plugins/inputs/cgroup/testdata/memory/group_2/group_1_1/memory.limit_in_bytes
new file mode 100644
index 000000000..78169435f
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/group_2/group_1_1/memory.limit_in_bytes
@@ -0,0 +1 @@
+223372036854771712
diff --git a/plugins/inputs/cgroup/testdata/memory/group_2/group_1_1/memory.stat b/plugins/inputs/cgroup/testdata/memory/group_2/group_1_1/memory.stat
new file mode 100644
index 000000000..a5493b9b2
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/group_2/group_1_1/memory.stat
@@ -0,0 +1,5 @@
+cache 1739362304123123123
+rss 1775325184
+rss_huge 778043392
+mapped_file 421036032
+dirty -307200
diff --git a/plugins/inputs/cgroup/testdata/memory/group_2/memory.limit_in_bytes b/plugins/inputs/cgroup/testdata/memory/group_2/memory.limit_in_bytes
new file mode 100644
index 000000000..78169435f
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/group_2/memory.limit_in_bytes
@@ -0,0 +1 @@
+223372036854771712
diff --git a/plugins/inputs/cgroup/testdata/memory/group_2/memory.stat b/plugins/inputs/cgroup/testdata/memory/group_2/memory.stat
new file mode 100644
index 000000000..a5493b9b2
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/group_2/memory.stat
@@ -0,0 +1,5 @@
+cache 1739362304123123123
+rss 1775325184
+rss_huge 778043392
+mapped_file 421036032
+dirty -307200
diff --git a/plugins/inputs/cgroup/testdata/memory/memory.empty b/plugins/inputs/cgroup/testdata/memory/memory.empty
new file mode 100644
index 000000000..e69de29bb
diff --git a/plugins/inputs/cgroup/testdata/memory/memory.kmem.limit_in_bytes b/plugins/inputs/cgroup/testdata/memory/memory.kmem.limit_in_bytes
new file mode 100644
index 000000000..564113cfa
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/memory.kmem.limit_in_bytes
@@ -0,0 +1 @@
+9223372036854771712
diff --git a/plugins/inputs/cgroup/testdata/memory/memory.limit_in_bytes b/plugins/inputs/cgroup/testdata/memory/memory.limit_in_bytes
new file mode 100644
index 000000000..78169435f
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/memory.limit_in_bytes
@@ -0,0 +1 @@
+223372036854771712
diff --git a/plugins/inputs/cgroup/testdata/memory/memory.max_usage_in_bytes b/plugins/inputs/cgroup/testdata/memory/memory.max_usage_in_bytes
new file mode 100644
index 000000000..712313d3d
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/memory.max_usage_in_bytes
@@ -0,0 +1,3 @@
+0
+-1
+2
diff --git a/plugins/inputs/cgroup/testdata/memory/memory.numa_stat b/plugins/inputs/cgroup/testdata/memory/memory.numa_stat
new file mode 100644
index 000000000..e7c54ebb5
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/memory.numa_stat
@@ -0,0 +1,8 @@
+total=858067 N0=858067
+file=406254 N0=406254
+anon=451792 N0=451792
+unevictable=21 N0=21
+hierarchical_total=858067 N0=858067
+hierarchical_file=406254 N0=406254
+hierarchical_anon=451792 N0=451792
+hierarchical_unevictable=21 N0=21
diff --git a/plugins/inputs/cgroup/testdata/memory/memory.stat b/plugins/inputs/cgroup/testdata/memory/memory.stat
new file mode 100644
index 000000000..a5493b9b2
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/memory.stat
@@ -0,0 +1,5 @@
+cache 1739362304123123123
+rss 1775325184
+rss_huge 778043392
+mapped_file 421036032
+dirty -307200
diff --git a/plugins/inputs/cgroup/testdata/memory/memory.usage_in_bytes b/plugins/inputs/cgroup/testdata/memory/memory.usage_in_bytes
new file mode 100644
index 000000000..661151f51
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/memory.usage_in_bytes
@@ -0,0 +1 @@
+3513667584
diff --git a/plugins/inputs/cgroup/testdata/memory/memory.use_hierarchy b/plugins/inputs/cgroup/testdata/memory/memory.use_hierarchy
new file mode 100644
index 000000000..07cbc8fc6
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/memory.use_hierarchy
@@ -0,0 +1 @@
+12-781
diff --git a/plugins/inputs/cgroup/testdata/memory/notify_on_release b/plugins/inputs/cgroup/testdata/memory/notify_on_release
new file mode 100644
index 000000000..573541ac9
--- /dev/null
+++ b/plugins/inputs/cgroup/testdata/memory/notify_on_release
@@ -0,0 +1 @@
+0
diff --git a/plugins/inputs/nsq_consumer/README.md b/plugins/inputs/nsq_consumer/README.md
new file mode 100644
index 000000000..eac494ccb
--- /dev/null
+++ b/plugins/inputs/nsq_consumer/README.md
@@ -0,0 +1,25 @@
+# NSQ Consumer Input Plugin
+
+The [NSQ](http://nsq.io/) consumer plugin consumes messages from a specified
+NSQD topic and turns them into Telegraf metrics. Messages may be in any of
+the supported `data_format` types.
+
+## Configuration
+
+```toml
+# Read metrics from NSQD topic(s)
+[[inputs.nsq_consumer]]
+  ## The NSQD TCP endpoint to connect to
+  server = "localhost:4150"
+  topic = "telegraf"
+  channel = "consumer"
+  max_in_flight = 100
+
+  ## Data format to consume.
+  ## Each data format has its own unique set of configuration options, read
+  ## more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+  data_format = "influx"
+```
+
+## Testing
+The `nsq_consumer_test` mocks out the interaction with `NSQD`. It requires no outside dependencies.
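+
+To publish a test point to the topic by hand, something like the following
+can be used (a minimal sketch built on the same `go-nsq` library this
+plugin uses; the endpoint, topic, and payload here are illustrative):
+
+```go
+package main
+
+import (
+	"log"
+
+	"github.com/nsqio/go-nsq"
+)
+
+func main() {
+	// connect a producer to a locally running nsqd (default TCP port 4150)
+	producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer producer.Stop()
+
+	// publish one metric in InfluxDB line protocol to the telegraf topic
+	if err := producer.Publish("telegraf", []byte("cpu value=42")); err != nil {
+		log.Fatal(err)
+	}
+}
+```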
diff --git a/plugins/inputs/nsq_consumer/nsq_consumer.go b/plugins/inputs/nsq_consumer/nsq_consumer.go
new file mode 100644
index 000000000..b227b7e50
--- /dev/null
+++ b/plugins/inputs/nsq_consumer/nsq_consumer.go
@@ -0,0 +1,99 @@
+package nsq_consumer
+
+import (
+	"log"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/plugins/inputs"
+	"github.com/influxdata/telegraf/plugins/parsers"
+	"github.com/nsqio/go-nsq"
+)
+
+// NSQConsumer represents the configuration of the plugin
+type NSQConsumer struct {
+	Server      string
+	Topic       string
+	Channel     string
+	MaxInFlight int
+	parser      parsers.Parser
+	consumer    *nsq.Consumer
+	acc         telegraf.Accumulator
+}
+
+var sampleConfig = `
+  ## A string representing the NSQD TCP endpoint
+  server = "localhost:4150"
+  topic = "telegraf"
+  channel = "consumer"
+  max_in_flight = 100
+
+  ## Data format to consume.
+  ## Each data format has its own unique set of configuration options, read
+  ## more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+  data_format = "influx"
+`
+
+func init() {
+	inputs.Add("nsq_consumer", func() telegraf.Input {
+		return &NSQConsumer{}
+	})
+}
+
+// SetParser takes the data_format from the config and finds the right parser for that format
+func (n *NSQConsumer) SetParser(parser parsers.Parser) {
+	n.parser = parser
+}
+
+// SampleConfig returns config values for generating a sample configuration file
+func (n *NSQConsumer) SampleConfig() string {
+	return sampleConfig
+}
+
+// Description returns the plugin description string
+func (n *NSQConsumer) Description() string {
+	return "Read NSQ topic for metrics."
+}
+
+// Start pulls data from nsq
+func (n *NSQConsumer) Start(acc telegraf.Accumulator) error {
+	n.acc = acc
+	if err := n.connect(); err != nil {
+		return err
+	}
+	n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
+		metrics, err := n.parser.Parse(message.Body)
+		if err != nil {
+			log.Printf("NSQConsumer Parse Error\nmessage:%s\nerror:%s", string(message.Body), err.Error())
+			return nil
+		}
+		for _, metric := range metrics {
+			n.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
+		}
+		message.Finish()
+		return nil
+	}), n.MaxInFlight)
+	return n.consumer.ConnectToNSQD(n.Server)
+}
+
+// Stop processing messages
+func (n *NSQConsumer) Stop() {
+	n.consumer.Stop()
+}
+
+// Gather is a noop
+func (n *NSQConsumer) Gather(acc telegraf.Accumulator) error {
+	return nil
+}
+
+func (n *NSQConsumer) connect() error {
+	if n.consumer == nil {
+		config := nsq.NewConfig()
+		config.MaxInFlight = n.MaxInFlight
+		consumer, err := nsq.NewConsumer(n.Topic, n.Channel, config)
+		if err != nil {
+			return err
+		}
+		n.consumer = consumer
+	}
+	return nil
+}
diff --git a/plugins/inputs/nsq_consumer/nsq_consumer_test.go b/plugins/inputs/nsq_consumer/nsq_consumer_test.go
new file mode 100644
index 000000000..59db675a5
--- /dev/null
+++ b/plugins/inputs/nsq_consumer/nsq_consumer_test.go
@@ -0,0 +1,245 @@
+package nsq_consumer
+
+import (
+	"bufio"
+	"bytes"
+	"encoding/binary"
+	"io"
+	"log"
+	"net"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/influxdata/telegraf/plugins/parsers"
+	"github.com/influxdata/telegraf/testutil"
+	"github.com/nsqio/go-nsq"
+	"github.com/stretchr/testify/assert"
+)
+
+// This test is modeled after the kafka consumer integration test
+func TestReadsMetricsFromNSQ(t *testing.T) {
+	msgID := nsq.MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
+	msg := nsq.NewMessage(msgID,
[]byte("cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257")) + + script := []instruction{ + // SUB + instruction{0, nsq.FrameTypeResponse, []byte("OK")}, + // IDENTIFY + instruction{0, nsq.FrameTypeResponse, []byte("OK")}, + instruction{20 * time.Millisecond, nsq.FrameTypeMessage, frameMessage(msg)}, + // needed to exit test + instruction{100 * time.Millisecond, -1, []byte("exit")}, + } + + addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:4155") + newMockNSQD(script, addr.String()) + + consumer := &NSQConsumer{ + Server: "127.0.0.1:4155", + Topic: "telegraf", + Channel: "consume", + MaxInFlight: 1, + } + + p, _ := parsers.NewInfluxParser() + consumer.SetParser(p) + var acc testutil.Accumulator + assert.Equal(t, 0, len(acc.Metrics), "There should not be any points") + if err := consumer.Start(&acc); err != nil { + t.Fatal(err.Error()) + } else { + defer consumer.Stop() + } + + waitForPoint(&acc, t) + + if len(acc.Metrics) == 1 { + point := acc.Metrics[0] + assert.Equal(t, "cpu_load_short", point.Measurement) + assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields) + assert.Equal(t, map[string]string{ + "host": "server01", + "direction": "in", + "region": "us-west", + }, point.Tags) + assert.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix()) + } else { + t.Errorf("No points found in accumulator, expected 1") + } + +} + +// Waits for the metric that was sent to the kafka broker to arrive at the kafka +// consumer +func waitForPoint(acc *testutil.Accumulator, t *testing.T) { + // Give the kafka container up to 2 seconds to get the point to the consumer + ticker := time.NewTicker(5 * time.Millisecond) + defer ticker.Stop() + counter := 0 + for { + select { + case <-ticker.C: + counter++ + if counter > 1000 { + t.Fatal("Waited for 5s, point never arrived to consumer") + } else if acc.NFields() == 1 { + return + } + } + } +} + +func newMockNSQD(script []instruction, addr string) *mockNSQD { + n := &mockNSQD{ + script: script, + exitChan: make(chan int), + } + + tcpListener, err := net.Listen("tcp", addr) + if err != nil { + log.Fatalf("FATAL: listen (%s) failed - %s", n.tcpAddr.String(), err) + } + n.tcpListener = tcpListener + n.tcpAddr = tcpListener.Addr().(*net.TCPAddr) + + go n.listen() + + return n +} + +// The code below allows us to mock the interactions with nsqd. 
+// https://github.com/nsqio/go-nsq/blob/master/mock_test.go
+type instruction struct {
+	delay     time.Duration
+	frameType int32
+	body      []byte
+}
+
+type mockNSQD struct {
+	script      []instruction
+	got         [][]byte
+	tcpAddr     *net.TCPAddr
+	tcpListener net.Listener
+	exitChan    chan int
+}
+
+func (n *mockNSQD) listen() {
+	for {
+		conn, err := n.tcpListener.Accept()
+		if err != nil {
+			break
+		}
+		go n.handle(conn)
+	}
+	close(n.exitChan)
+}
+
+func (n *mockNSQD) handle(conn net.Conn) {
+	var idx int
+	buf := make([]byte, 4)
+	_, err := io.ReadFull(conn, buf)
+	if err != nil {
+		log.Fatalf("ERROR: failed to read protocol version - %s", err)
+	}
+
+	readChan := make(chan []byte)
+	readDoneChan := make(chan int)
+	scriptTime := time.After(n.script[0].delay)
+	rdr := bufio.NewReader(conn)
+
+	go func() {
+		for {
+			line, err := rdr.ReadBytes('\n')
+			if err != nil {
+				return
+			}
+			// trim the '\n'
+			line = line[:len(line)-1]
+			readChan <- line
+			<-readDoneChan
+		}
+	}()
+
+	var rdyCount int
+	for idx < len(n.script) {
+		select {
+		case line := <-readChan:
+			n.got = append(n.got, line)
+			params := bytes.Split(line, []byte(" "))
+			switch {
+			case bytes.Equal(params[0], []byte("IDENTIFY")):
+				l := make([]byte, 4)
+				_, err := io.ReadFull(rdr, l)
+				if err != nil {
+					log.Print(err.Error())
+					goto exit
+				}
+				size := int32(binary.BigEndian.Uint32(l))
+				b := make([]byte, size)
+				_, err = io.ReadFull(rdr, b)
+				if err != nil {
+					log.Print(err.Error())
+					goto exit
+				}
+			case bytes.Equal(params[0], []byte("RDY")):
+				rdy, _ := strconv.Atoi(string(params[1]))
+				rdyCount = rdy
+			case bytes.Equal(params[0], []byte("FIN")):
+			case bytes.Equal(params[0], []byte("REQ")):
+			}
+			readDoneChan <- 1
+		case <-scriptTime:
+			inst := n.script[idx]
+			if bytes.Equal(inst.body, []byte("exit")) {
+				goto exit
+			}
+			if inst.frameType == nsq.FrameTypeMessage {
+				if rdyCount == 0 {
+					scriptTime = time.After(n.script[idx+1].delay)
+					continue
+				}
+				rdyCount--
+			}
+			_, err := conn.Write(framedResponse(inst.frameType, inst.body))
+			if err != nil {
+				log.Print(err.Error())
+				goto exit
+			}
+			scriptTime = time.After(n.script[idx+1].delay)
+			idx++
+		}
+	}
+
+exit:
+	n.tcpListener.Close()
+	conn.Close()
+}
+
+func framedResponse(frameType int32, data []byte) []byte {
+	var w bytes.Buffer
+
+	beBuf := make([]byte, 4)
+	size := uint32(len(data)) + 4
+
+	binary.BigEndian.PutUint32(beBuf, size)
+	_, err := w.Write(beBuf)
+	if err != nil {
+		return nil
+	}
+
+	binary.BigEndian.PutUint32(beBuf, uint32(frameType))
+	_, err = w.Write(beBuf)
+	if err != nil {
+		return nil
+	}
+
+	w.Write(data)
+	return w.Bytes()
+}
+
+func frameMessage(m *nsq.Message) []byte {
+	var b bytes.Buffer
+	m.WriteTo(&b)
+	return b.Bytes()
+}
diff --git a/plugins/inputs/prometheus/README.md b/plugins/inputs/prometheus/README.md
index 3aa8c8afd..8298b9d27 100644
--- a/plugins/inputs/prometheus/README.md
+++ b/plugins/inputs/prometheus/README.md
@@ -30,6 +30,26 @@ to filter and some tags
   kubeservice = "kube-apiserver"
 ```

+```toml
+# Authorize with a bearer token, skipping cert verification
+[[inputs.prometheus]]
+  # An array of urls to scrape metrics from.
+  urls = ["http://my-kube-apiserver:8080/metrics"]
+  bearer_token = '/path/to/bearer/token'
+  insecure_skip_verify = true
+```
+
+```toml
+# Authorize using x509 certs
+[[inputs.prometheus]]
+  # An array of urls to scrape metrics from.
+ urls = ["https://my-kube-apiserver:8080/metrics"] + + ssl_ca = '/path/to/cafile' + ssl_cert = '/path/to/certfile' + ssl_key = '/path/to/keyfile' +``` + ### Measurements & Fields & Tags: Measurements and fields could be any thing. diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 1c60a363e..2eabcf92c 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -1,10 +1,10 @@ package prometheus import ( - "crypto/tls" "errors" "fmt" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "io/ioutil" "net" @@ -16,20 +16,32 @@ import ( type Prometheus struct { Urls []string - // Use SSL but skip chain & host verification - InsecureSkipVerify bool // Bearer Token authorization file path BearerToken string `toml:"bearer_token"` + + // Path to CA file + SSLCA string `toml:"ssl_ca"` + // Path to host cert file + SSLCert string `toml:"ssl_cert"` + // Path to cert key file + SSLKey string `toml:"ssl_key"` + // Use SSL but skip chain & host verification + InsecureSkipVerify bool } var sampleConfig = ` ## An array of urls to scrape metrics from. urls = ["http://localhost:9100/metrics"] - ## Use SSL but skip chain & host verification - # insecure_skip_verify = false ## Use bearer token for authorization # bearer_token = /path/to/bearer/token + + ## Optional SSL Config + # ssl_ca = /path/to/cafile + # ssl_cert = /path/to/certfile + # ssl_key = /path/to/keyfile + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false ` func (p *Prometheus) SampleConfig() string { @@ -78,16 +90,21 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { var token []byte var resp *http.Response + tlsCfg, err := internal.GetTLSConfig( + p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify) + if err != nil { + return err + } + var rt http.RoundTripper = &http.Transport{ Dial: (&net.Dialer{ Timeout: 5 * time.Second, KeepAlive: 30 * time.Second, }).Dial, - TLSHandshakeTimeout: 5 * time.Second, - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: p.InsecureSkipVerify, - }, + TLSHandshakeTimeout: 5 * time.Second, + TLSClientConfig: tlsCfg, ResponseHeaderTimeout: time.Duration(3 * time.Second), + DisableKeepAlives: true, } if p.BearerToken != "" { diff --git a/plugins/outputs/prometheus_client/prometheus_client.go b/plugins/outputs/prometheus_client/prometheus_client.go index d5e3f1ced..804ae1fad 100644 --- a/plugins/outputs/prometheus_client/prometheus_client.go +++ b/plugins/outputs/prometheus_client/prometheus_client.go @@ -25,8 +25,7 @@ var ( ) type PrometheusClient struct { - Listen string - metrics map[string]*prometheus.UntypedVec + Listen string } var sampleConfig = ` @@ -35,6 +34,14 @@ var sampleConfig = ` ` func (p *PrometheusClient) Start() error { + defer func() { + if r := recover(); r != nil { + // recovering from panic here because there is no way to stop a + // running http go server except by a kill signal. Since the server + // does not stop on SIGHUP, Start() will panic when the process + // is reloaded. + } + }() if p.Listen == "" { p.Listen = "localhost:9126" } @@ -44,7 +51,6 @@ func (p *PrometheusClient) Start() error { Addr: p.Listen, } - p.metrics = make(map[string]*prometheus.UntypedVec) go server.ListenAndServe() return nil } @@ -118,24 +124,26 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { continue } - // Create a new metric if it hasn't been created yet. 
-		if _, ok := p.metrics[mname]; !ok {
-			p.metrics[mname] = prometheus.NewUntypedVec(
-				prometheus.UntypedOpts{
-					Name: mname,
-					Help: "Telegraf collected metric",
-				},
-				labels,
-			)
-			if err := prometheus.Register(p.metrics[mname]); err != nil {
-				log.Printf("prometheus_client: Metric failed to register with prometheus, %s", err)
-				continue
-			}
+		mVec := prometheus.NewUntypedVec(
+			prometheus.UntypedOpts{
+				Name: mname,
+				Help: "Telegraf collected metric",
+			},
+			labels,
+		)
+		collector, err := prometheus.RegisterOrGet(mVec)
+		if err != nil {
+			log.Printf("prometheus_client: Metric failed to register with prometheus, %s", err)
+			continue
+		}
+		mVec, ok := collector.(*prometheus.UntypedVec)
+		if !ok {
+			continue
 		}

 		switch val := val.(type) {
 		case int64:
-			m, err := p.metrics[mname].GetMetricWith(l)
+			m, err := mVec.GetMetricWith(l)
 			if err != nil {
 				log.Printf("ERROR Getting metric in Prometheus output, "+
 					"key: %s, labels: %v,\nerr: %s\n",
@@ -144,7 +152,7 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
 			}
 			m.Set(float64(val))
 		case float64:
-			m, err := p.metrics[mname].GetMetricWith(l)
+			m, err := mVec.GetMetricWith(l)
 			if err != nil {
 				log.Printf("ERROR Getting metric in Prometheus output, "+
 					"key: %s, labels: %v,\nerr: %s\n",