From b18134a4e35a71047d4fc4c9477366b2de04058a Mon Sep 17 00:00:00 2001 From: Thibault Cohen Date: Thu, 23 Jun 2016 03:59:14 -0400 Subject: [PATCH 01/11] Fix #1405 (#1406) --- CHANGELOG.md | 1 + plugins/inputs/prometheus/prometheus.go | 1 + 2 files changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f2439332..2edc48a3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ should now look like: - [#1384](https://github.com/influxdata/telegraf/pull/1384): Fix datarace in apache input plugin. - [#1399](https://github.com/influxdata/telegraf/issues/1399): Add `read_repairs` statistics to riak plugin. +- [#1405](https://github.com/influxdata/telegraf/issues/1405): Fix memory/connection leak in prometheus input plugin. ## v1.0 beta 2 [2016-06-21] diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 1c60a363e..d546b0eab 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -88,6 +88,7 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { InsecureSkipVerify: p.InsecureSkipVerify, }, ResponseHeaderTimeout: time.Duration(3 * time.Second), + DisableKeepAlives: true, } if p.BearerToken != "" { From 50ea7f4a9da9a4874b9a7ebd1feebf87b5791ace Mon Sep 17 00:00:00 2001 From: Victor Garcia Date: Thu, 23 Jun 2016 09:59:44 +0200 Subject: [PATCH 02/11] x509 certs authentication now supported for Prometheus input plugin (#1396) --- CHANGELOG.md | 1 + plugins/inputs/prometheus/README.md | 20 +++++++++++++++ plugins/inputs/prometheus/prometheus.go | 34 ++++++++++++++++++------- 3 files changed, 46 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2edc48a3d..63ce3d35c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,7 @@ should now look like: - [#1335](https://github.com/influxdata/telegraf/issues/1335): Fix overall ping timeout to be calculated based on per-ping timeout. - [#1374](https://github.com/influxdata/telegraf/pull/1374): Change "default" retention policy to "". - [#1377](https://github.com/influxdata/telegraf/issues/1377): Graphite output mangling '%' character. +- [#1396](https://github.com/influxdata/telegraf/pull/1396): Prometheus input plugin now supports x509 certs authentication ## v1.0 beta 1 [2016-06-07] diff --git a/plugins/inputs/prometheus/README.md b/plugins/inputs/prometheus/README.md index 3aa8c8afd..8298b9d27 100644 --- a/plugins/inputs/prometheus/README.md +++ b/plugins/inputs/prometheus/README.md @@ -30,6 +30,26 @@ to filter and some tags kubeservice = "kube-apiserver" ``` +```toml +# Authorize with a bearer token skipping cert verification +[[inputs.prometheus]] + # An array of urls to scrape metrics from. + urls = ["http://my-kube-apiserver:8080/metrics"] + bearer_token = '/path/to/bearer/token' + insecure_skip_verify = true +``` + +```toml +# Authorize using x509 certs +[[inputs.prometheus]] + # An array of urls to scrape metrics from. + urls = ["https://my-kube-apiserver:8080/metrics"] + + ssl_ca = '/path/to/cafile' + ssl_cert = '/path/to/certfile' + ssl_key = '/path/to/keyfile' +``` + ### Measurements & Fields & Tags: Measurements and fields could be any thing. diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index d546b0eab..2eabcf92c 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -1,10 +1,10 @@ package prometheus import ( - "crypto/tls" "errors" "fmt" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "io/ioutil" "net" @@ -16,20 +16,32 @@ import ( type Prometheus struct { Urls []string - // Use SSL but skip chain & host verification - InsecureSkipVerify bool // Bearer Token authorization file path BearerToken string `toml:"bearer_token"` + + // Path to CA file + SSLCA string `toml:"ssl_ca"` + // Path to host cert file + SSLCert string `toml:"ssl_cert"` + // Path to cert key file + SSLKey string `toml:"ssl_key"` + // Use SSL but skip chain & host verification + InsecureSkipVerify bool } var sampleConfig = ` ## An array of urls to scrape metrics from. urls = ["http://localhost:9100/metrics"] - ## Use SSL but skip chain & host verification - # insecure_skip_verify = false ## Use bearer token for authorization # bearer_token = /path/to/bearer/token + + ## Optional SSL Config + # ssl_ca = /path/to/cafile + # ssl_cert = /path/to/certfile + # ssl_key = /path/to/keyfile + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false ` func (p *Prometheus) SampleConfig() string { @@ -78,15 +90,19 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { var token []byte var resp *http.Response + tlsCfg, err := internal.GetTLSConfig( + p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify) + if err != nil { + return err + } + var rt http.RoundTripper = &http.Transport{ Dial: (&net.Dialer{ Timeout: 5 * time.Second, KeepAlive: 30 * time.Second, }).Dial, - TLSHandshakeTimeout: 5 * time.Second, - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: p.InsecureSkipVerify, - }, + TLSHandshakeTimeout: 5 * time.Second, + TLSClientConfig: tlsCfg, ResponseHeaderTimeout: time.Duration(3 * time.Second), DisableKeepAlives: true, } From 5ddd61d2e21f31a2ce75459b7ebeff1e801be562 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 22 Jun 2016 18:54:29 +0100 Subject: [PATCH 03/11] Trim BOM from config file for windows support closes #1378 --- CHANGELOG.md | 1 + internal/config/config.go | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 63ce3d35c..ee96aaa62 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ should now look like: - [#1384](https://github.com/influxdata/telegraf/pull/1384): Fix datarace in apache input plugin. - [#1399](https://github.com/influxdata/telegraf/issues/1399): Add `read_repairs` statistics to riak plugin. - [#1405](https://github.com/influxdata/telegraf/issues/1405): Fix memory/connection leak in prometheus input plugin. +- [#1378](https://github.com/influxdata/telegraf/issues/1378): Trim BOM from config file for Windows support. ## v1.0 beta 2 [2016-06-21] diff --git a/internal/config/config.go b/internal/config/config.go index 99db2e30d..b1be77d29 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -539,6 +539,13 @@ func (c *Config) LoadConfig(path string) error { return nil } +// trimBOM trims the Byte-Order-Marks from the beginning of the file. +// this is for Windows compatability only. +// see https://github.com/influxdata/telegraf/issues/1378 +func trimBOM(fileBytes []byte) []byte { + return bytes.Trim(fileBytes, "\xef\xbb\xbf") +} + // parseFile loads a TOML configuration from a provided path and // returns the AST produced from the TOML parser. When loading the file, it // will find environment variables and replace them. @@ -547,6 +554,8 @@ func parseFile(fpath string) (*ast.Table, error) { if err != nil { return nil, err } + // ugh windows why + contents = trimBOM(contents) env_vars := envVarRe.FindAll(contents, -1) for _, env_var := range env_vars { From b0484d8a0c2171ef6fc5990d4b77cff88f43b957 Mon Sep 17 00:00:00 2001 From: Vladimir Sagan Date: Wed, 8 Jun 2016 11:13:22 +0300 Subject: [PATCH 04/11] add cgroup plugin --- plugins/inputs/all/all.go | 1 + plugins/inputs/cgroup/README.md | 59 ++++ plugins/inputs/cgroup/cgroup.go | 292 ++++++++++++++++++ plugins/inputs/cgroup/cgroup_notlinux.go | 3 + plugins/inputs/cgroup/cgroup_test.go | 182 +++++++++++ .../cgroup/testdata/blkio/blkio.io_serviced | 1 + .../testdata/blkio/blkio.throttle.io_serviced | 131 ++++++++ .../cgroup/testdata/cpu/cpu.cfs_quota_us | 1 + .../cgroup/testdata/cpu/cpuacct.usage_percpu | 1 + .../group_1/group_1_1/memory.limit_in_bytes | 1 + .../memory/group_1/group_1_1/memory.stat | 5 + .../group_1/group_1_2/memory.limit_in_bytes | 1 + .../memory/group_1/group_1_2/memory.stat | 5 + .../memory/group_1/memory.kmem.limit_in_bytes | 1 + .../group_1/memory.kmem.max_usage_in_bytes | 1 + .../memory/group_1/memory.limit_in_bytes | 1 + .../testdata/memory/group_1/memory.stat | 5 + .../group_2/group_1_1/memory.limit_in_bytes | 1 + .../memory/group_2/group_1_1/memory.stat | 5 + .../memory/group_2/memory.limit_in_bytes | 1 + .../testdata/memory/group_2/memory.stat | 5 + .../cgroup/testdata/memory/memory.empty | 0 .../memory/memory.kmem.limit_in_bytes | 1 + .../testdata/memory/memory.limit_in_bytes | 1 + .../testdata/memory/memory.max_usage_in_bytes | 3 + .../cgroup/testdata/memory/memory.numa_stat | 8 + .../inputs/cgroup/testdata/memory/memory.stat | 5 + .../testdata/memory/memory.usage_in_bytes | 1 + .../testdata/memory/memory.use_hierarchy | 1 + .../cgroup/testdata/memory/notify_on_release | 1 + 30 files changed, 724 insertions(+) create mode 100644 plugins/inputs/cgroup/README.md create mode 100644 plugins/inputs/cgroup/cgroup.go create mode 100644 plugins/inputs/cgroup/cgroup_notlinux.go create mode 100644 plugins/inputs/cgroup/cgroup_test.go create mode 100644 plugins/inputs/cgroup/testdata/blkio/blkio.io_serviced create mode 100644 plugins/inputs/cgroup/testdata/blkio/blkio.throttle.io_serviced create mode 100644 plugins/inputs/cgroup/testdata/cpu/cpu.cfs_quota_us create mode 100644 plugins/inputs/cgroup/testdata/cpu/cpuacct.usage_percpu create mode 100644 plugins/inputs/cgroup/testdata/memory/group_1/group_1_1/memory.limit_in_bytes create mode 100644 plugins/inputs/cgroup/testdata/memory/group_1/group_1_1/memory.stat create mode 100644 plugins/inputs/cgroup/testdata/memory/group_1/group_1_2/memory.limit_in_bytes create mode 100644 plugins/inputs/cgroup/testdata/memory/group_1/group_1_2/memory.stat create mode 100644 plugins/inputs/cgroup/testdata/memory/group_1/memory.kmem.limit_in_bytes create mode 100644 plugins/inputs/cgroup/testdata/memory/group_1/memory.kmem.max_usage_in_bytes create mode 100644 plugins/inputs/cgroup/testdata/memory/group_1/memory.limit_in_bytes create mode 100644 plugins/inputs/cgroup/testdata/memory/group_1/memory.stat create mode 100644 plugins/inputs/cgroup/testdata/memory/group_2/group_1_1/memory.limit_in_bytes create mode 100644 plugins/inputs/cgroup/testdata/memory/group_2/group_1_1/memory.stat create mode 100644 plugins/inputs/cgroup/testdata/memory/group_2/memory.limit_in_bytes create mode 100644 plugins/inputs/cgroup/testdata/memory/group_2/memory.stat create mode 100644 plugins/inputs/cgroup/testdata/memory/memory.empty create mode 100644 plugins/inputs/cgroup/testdata/memory/memory.kmem.limit_in_bytes create mode 100644 plugins/inputs/cgroup/testdata/memory/memory.limit_in_bytes create mode 100644 plugins/inputs/cgroup/testdata/memory/memory.max_usage_in_bytes create mode 100644 plugins/inputs/cgroup/testdata/memory/memory.numa_stat create mode 100644 plugins/inputs/cgroup/testdata/memory/memory.stat create mode 100644 plugins/inputs/cgroup/testdata/memory/memory.usage_in_bytes create mode 100644 plugins/inputs/cgroup/testdata/memory/memory.use_hierarchy create mode 100644 plugins/inputs/cgroup/testdata/memory/notify_on_release diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index e73b71eb3..512753b7a 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -6,6 +6,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/bcache" _ "github.com/influxdata/telegraf/plugins/inputs/cassandra" _ "github.com/influxdata/telegraf/plugins/inputs/ceph" + _ "github.com/influxdata/telegraf/plugins/inputs/cgroup" _ "github.com/influxdata/telegraf/plugins/inputs/chrony" _ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch" _ "github.com/influxdata/telegraf/plugins/inputs/conntrack" diff --git a/plugins/inputs/cgroup/README.md b/plugins/inputs/cgroup/README.md new file mode 100644 index 000000000..a8fd1243e --- /dev/null +++ b/plugins/inputs/cgroup/README.md @@ -0,0 +1,59 @@ +# CGroup Input Plugin For Telegraf Agent + +This input plugin will capture specific statistics per cgroup. + +Following file formats are supported: + +* Single value + +``` +VAL\n +``` + +* New line separated values + +``` +VAL0\n +VAL1\n +``` + +* Space separated values + +``` +VAL0 VAL1 ...\n +``` + +* New line separated key-space-value's + +``` +KEY0 VAL0\n +KEY1 VAL1\n +``` + + +### Tags: + +All measurements have the following tags: + - path + + +### Configuration: + +``` +# [[inputs.cgroup]] + # flush_scope = 10 # optional (the fields will be divided into parts of 10 items) + # paths = [ + # "/cgroup/memory", # root cgroup + # "/cgroup/memory/child1", # container cgroup + # "/cgroup/memory/child2/*", # all children cgroups under child2, but not child2 itself + # ] + # fields = ["memory.*usage*", "memory.limit_in_bytes"] + +# [[inputs.cgroup]] + # paths = [ + # "/cgroup/cpu", # root cgroup + # "/cgroup/cpu/*", # all container cgroups + # "/cgroup/cpu/*/*", # all children cgroups under each container cgroup + # ] + # fields = ["cpuacct.usage", "cpu.cfs_period_us", "cpu.cfs_quota_us"] +``` diff --git a/plugins/inputs/cgroup/cgroup.go b/plugins/inputs/cgroup/cgroup.go new file mode 100644 index 000000000..df8f9d915 --- /dev/null +++ b/plugins/inputs/cgroup/cgroup.go @@ -0,0 +1,292 @@ +// +build linux + +package cgroup + +import ( + "fmt" + "io/ioutil" + "os" + "path" + "path/filepath" + "regexp" + "strconv" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +const metricName = "cgroup" + +type CGroup struct { + Paths []string `toml:"paths"` + Files []string `toml:"fields"` + FlushScope int `toml:"flush_scope"` +} + +var sampleConfig = ` + # paths = [ + # "/cgroup/memory", + # "/cgroup/memory/child1", + # "/cgroup/memory/child2/*", + # ] + # fields = ["memory.*usage*", "memory.limit_in_bytes"] +` + +func (g *CGroup) SampleConfig() string { + return sampleConfig +} + +func (g *CGroup) Description() string { + return "Read specific statistics per cgroup" +} + +func (g *CGroup) Gather(acc telegraf.Accumulator) error { + list := make(chan pathInfo) + go g.generateDirs(list) + + for dir := range list { + if dir.err != nil { + return dir.err + } + if err := g.gatherDir(dir.path, acc); err != nil { + return err + } + } + + return nil +} + +func (g *CGroup) gatherDir(dir string, acc telegraf.Accumulator) error { + fields := make(map[string]interface{}) + + list := make(chan pathInfo) + go g.generateFiles(dir, list) + + for file := range list { + if file.err != nil { + return file.err + } + + raw, err := ioutil.ReadFile(file.path) + if err != nil { + return err + } + if len(raw) == 0 { + continue + } + + fd := fileData{data: raw, path: file.path} + if err := fd.parse(fields); err != nil { + return err + } + } + + tags := map[string]string{"path": dir} + + if g.FlushScope <= 0 { + acc.AddFields(metricName, fields, tags) + return nil + } + writeWithBatches(acc, fields, tags, g.FlushScope) + + return nil +} + +func writeWithBatches(acc telegraf.Accumulator, fields map[string]interface{}, tags map[string]string, scope int) { + for len(fields) > 0 { + batch := make(map[string]interface{}) + + for k, v := range fields { + batch[k] = v + delete(fields, k) + if len(batch) == scope || len(fields) == 0 { + break + } + } + + acc.AddFields(metricName, batch, tags) + } +} + +// ====================================================================== + +type pathInfo struct { + path string + err error +} + +func isDir(path string) (bool, error) { + result, err := os.Stat(path) + if err != nil { + return false, err + } + return result.IsDir(), nil +} + +func (g *CGroup) generateDirs(list chan<- pathInfo) { + for _, dir := range g.Paths { + // getting all dirs that match the pattern 'dir' + items, err := filepath.Glob(dir) + if err != nil { + list <- pathInfo{err: err} + return + } + + for _, item := range items { + ok, err := isDir(item) + if err != nil { + list <- pathInfo{err: err} + return + } + // supply only dirs + if ok { + list <- pathInfo{path: item} + } + } + } + close(list) +} + +func (g *CGroup) generateFiles(dir string, list chan<- pathInfo) { + for _, file := range g.Files { + // getting all file paths that match the pattern 'dir + file' + // path.Base make sure that file variable does not contains part of path + items, err := filepath.Glob(path.Join(dir, path.Base(file))) + if err != nil { + list <- pathInfo{err: err} + return + } + + for _, item := range items { + ok, err := isDir(item) + if err != nil { + list <- pathInfo{err: err} + return + } + // supply only files not dirs + if !ok { + list <- pathInfo{path: item} + } + } + } + close(list) +} + +// ====================================================================== + +type fileData struct { + data []byte + path string +} + +func (fd *fileData) format() (*fileFormat, error) { + for _, ff := range fileFormats { + ok, err := ff.match(fd.data) + if err != nil { + return nil, err + } + if ok { + return &ff, nil + } + } + + return nil, fmt.Errorf("%v: unknown file format", fd.path) +} + +func (fd *fileData) parse(fields map[string]interface{}) error { + format, err := fd.format() + if err != nil { + return err + } + + format.parser(filepath.Base(fd.path), fields, fd.data) + return nil +} + +// ====================================================================== + +type fileFormat struct { + name string + pattern string + parser func(measurement string, fields map[string]interface{}, b []byte) +} + +const keyPattern = "[[:alpha:]_]+" +const valuePattern = "[\\d-]+" + +var fileFormats = [...]fileFormat{ + // VAL\n + fileFormat{ + name: "Single value", + pattern: "^" + valuePattern + "\n$", + parser: func(measurement string, fields map[string]interface{}, b []byte) { + re := regexp.MustCompile("^(" + valuePattern + ")\n$") + matches := re.FindAllStringSubmatch(string(b), -1) + fields[measurement] = numberOrString(matches[0][1]) + }, + }, + // VAL0\n + // VAL1\n + // ... + fileFormat{ + name: "New line separated values", + pattern: "^(" + valuePattern + "\n){2,}$", + parser: func(measurement string, fields map[string]interface{}, b []byte) { + re := regexp.MustCompile("(" + valuePattern + ")\n") + matches := re.FindAllStringSubmatch(string(b), -1) + for i, v := range matches { + fields[measurement+"."+strconv.Itoa(i)] = numberOrString(v[1]) + } + }, + }, + // VAL0 VAL1 ...\n + fileFormat{ + name: "Space separated values", + pattern: "^(" + valuePattern + " )+\n$", + parser: func(measurement string, fields map[string]interface{}, b []byte) { + re := regexp.MustCompile("(" + valuePattern + ") ") + matches := re.FindAllStringSubmatch(string(b), -1) + for i, v := range matches { + fields[measurement+"."+strconv.Itoa(i)] = numberOrString(v[1]) + } + }, + }, + // KEY0 VAL0\n + // KEY1 VAL1\n + // ... + fileFormat{ + name: "New line separated key-space-value's", + pattern: "^(" + keyPattern + " " + valuePattern + "\n)+$", + parser: func(measurement string, fields map[string]interface{}, b []byte) { + re := regexp.MustCompile("(" + keyPattern + ") (" + valuePattern + ")\n") + matches := re.FindAllStringSubmatch(string(b), -1) + for _, v := range matches { + fields[measurement+"."+v[1]] = numberOrString(v[2]) + } + }, + }, +} + +func numberOrString(s string) interface{} { + i, err := strconv.Atoi(s) + if err == nil { + return i + } + + return s +} + +func (f fileFormat) match(b []byte) (bool, error) { + ok, err := regexp.Match(f.pattern, b) + if err != nil { + return false, err + } + if ok { + return true, nil + } + return false, nil +} + +func init() { + inputs.Add("cgroup", func() telegraf.Input { return &CGroup{} }) +} diff --git a/plugins/inputs/cgroup/cgroup_notlinux.go b/plugins/inputs/cgroup/cgroup_notlinux.go new file mode 100644 index 000000000..661f99f5c --- /dev/null +++ b/plugins/inputs/cgroup/cgroup_notlinux.go @@ -0,0 +1,3 @@ +// +build !linux + +package cgroup diff --git a/plugins/inputs/cgroup/cgroup_test.go b/plugins/inputs/cgroup/cgroup_test.go new file mode 100644 index 000000000..206b51f6d --- /dev/null +++ b/plugins/inputs/cgroup/cgroup_test.go @@ -0,0 +1,182 @@ +// +build linux + +package cgroup + +import ( + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +var cg1 = &CGroup{ + Paths: []string{"testdata/memory"}, + Files: []string{ + "memory.empty", + "memory.max_usage_in_bytes", + "memory.limit_in_bytes", + "memory.stat", + "memory.use_hierarchy", + "notify_on_release", + }, +} + +func TestCgroupStatistics_1(t *testing.T) { + var acc testutil.Accumulator + + err := cg1.Gather(&acc) + require.NoError(t, err) + + tags := map[string]string{ + "path": "testdata/memory", + } + fields := map[string]interface{}{ + "memory.stat.cache": 1739362304123123123, + "memory.stat.rss": 1775325184, + "memory.stat.rss_huge": 778043392, + "memory.stat.mapped_file": 421036032, + "memory.stat.dirty": -307200, + "memory.max_usage_in_bytes.0": 0, + "memory.max_usage_in_bytes.1": -1, + "memory.max_usage_in_bytes.2": 2, + "memory.limit_in_bytes": 223372036854771712, + "memory.use_hierarchy": "12-781", + "notify_on_release": 0, + } + acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) +} + +// ====================================================================== + +var cg2 = &CGroup{ + Paths: []string{"testdata/cpu"}, + Files: []string{"cpuacct.usage_percpu"}, +} + +func TestCgroupStatistics_2(t *testing.T) { + var acc testutil.Accumulator + + err := cg2.Gather(&acc) + require.NoError(t, err) + + tags := map[string]string{ + "path": "testdata/cpu", + } + fields := map[string]interface{}{ + "cpuacct.usage_percpu.0": -1452543795404, + "cpuacct.usage_percpu.1": 1376681271659, + "cpuacct.usage_percpu.2": 1450950799997, + "cpuacct.usage_percpu.3": -1473113374257, + } + acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) +} + +// ====================================================================== + +var cg3 = &CGroup{ + Paths: []string{"testdata/memory/*"}, + Files: []string{"memory.limit_in_bytes"}, +} + +func TestCgroupStatistics_3(t *testing.T) { + var acc testutil.Accumulator + + err := cg3.Gather(&acc) + require.NoError(t, err) + + tags := map[string]string{ + "path": "testdata/memory/group_1", + } + fields := map[string]interface{}{ + "memory.limit_in_bytes": 223372036854771712, + } + acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) + + tags = map[string]string{ + "path": "testdata/memory/group_2", + } + acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) +} + +// ====================================================================== + +var cg4 = &CGroup{ + Paths: []string{"testdata/memory/*/*", "testdata/memory/group_2"}, + Files: []string{"memory.limit_in_bytes"}, +} + +func TestCgroupStatistics_4(t *testing.T) { + var acc testutil.Accumulator + + err := cg4.Gather(&acc) + require.NoError(t, err) + + tags := map[string]string{ + "path": "testdata/memory/group_1/group_1_1", + } + fields := map[string]interface{}{ + "memory.limit_in_bytes": 223372036854771712, + } + acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) + + tags = map[string]string{ + "path": "testdata/memory/group_1/group_1_2", + } + acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) + + tags = map[string]string{ + "path": "testdata/memory/group_2", + } + acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) +} + +// ====================================================================== + +var cg5 = &CGroup{ + Paths: []string{"testdata/memory/*/group_1_1"}, + Files: []string{"memory.limit_in_bytes"}, +} + +func TestCgroupStatistics_5(t *testing.T) { + var acc testutil.Accumulator + + err := cg5.Gather(&acc) + require.NoError(t, err) + + tags := map[string]string{ + "path": "testdata/memory/group_1/group_1_1", + } + fields := map[string]interface{}{ + "memory.limit_in_bytes": 223372036854771712, + } + acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) + + tags = map[string]string{ + "path": "testdata/memory/group_2/group_1_1", + } + acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) +} + +// ====================================================================== + +var cg6 = &CGroup{ + Paths: []string{"testdata/memory"}, + Files: []string{"memory.us*", "*/memory.kmem.*"}, +} + +func TestCgroupStatistics_6(t *testing.T) { + var acc testutil.Accumulator + + err := cg6.Gather(&acc) + require.NoError(t, err) + + tags := map[string]string{ + "path": "testdata/memory", + } + fields := map[string]interface{}{ + "memory.usage_in_bytes": 3513667584, + "memory.use_hierarchy": "12-781", + "memory.kmem.limit_in_bytes": 9223372036854771712, + } + acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) +} diff --git a/plugins/inputs/cgroup/testdata/blkio/blkio.io_serviced b/plugins/inputs/cgroup/testdata/blkio/blkio.io_serviced new file mode 100644 index 000000000..4b28cf721 --- /dev/null +++ b/plugins/inputs/cgroup/testdata/blkio/blkio.io_serviced @@ -0,0 +1 @@ +Total 0 diff --git a/plugins/inputs/cgroup/testdata/blkio/blkio.throttle.io_serviced b/plugins/inputs/cgroup/testdata/blkio/blkio.throttle.io_serviced new file mode 100644 index 000000000..519480715 --- /dev/null +++ b/plugins/inputs/cgroup/testdata/blkio/blkio.throttle.io_serviced @@ -0,0 +1,131 @@ +11:0 Read 0 +11:0 Write 0 +11:0 Sync 0 +11:0 Async 0 +11:0 Total 0 +8:0 Read 49134 +8:0 Write 216703 +8:0 Sync 177906 +8:0 Async 87931 +8:0 Total 265837 +7:7 Read 0 +7:7 Write 0 +7:7 Sync 0 +7:7 Async 0 +7:7 Total 0 +7:6 Read 0 +7:6 Write 0 +7:6 Sync 0 +7:6 Async 0 +7:6 Total 0 +7:5 Read 0 +7:5 Write 0 +7:5 Sync 0 +7:5 Async 0 +7:5 Total 0 +7:4 Read 0 +7:4 Write 0 +7:4 Sync 0 +7:4 Async 0 +7:4 Total 0 +7:3 Read 0 +7:3 Write 0 +7:3 Sync 0 +7:3 Async 0 +7:3 Total 0 +7:2 Read 0 +7:2 Write 0 +7:2 Sync 0 +7:2 Async 0 +7:2 Total 0 +7:1 Read 0 +7:1 Write 0 +7:1 Sync 0 +7:1 Async 0 +7:1 Total 0 +7:0 Read 0 +7:0 Write 0 +7:0 Sync 0 +7:0 Async 0 +7:0 Total 0 +1:15 Read 3 +1:15 Write 0 +1:15 Sync 0 +1:15 Async 3 +1:15 Total 3 +1:14 Read 3 +1:14 Write 0 +1:14 Sync 0 +1:14 Async 3 +1:14 Total 3 +1:13 Read 3 +1:13 Write 0 +1:13 Sync 0 +1:13 Async 3 +1:13 Total 3 +1:12 Read 3 +1:12 Write 0 +1:12 Sync 0 +1:12 Async 3 +1:12 Total 3 +1:11 Read 3 +1:11 Write 0 +1:11 Sync 0 +1:11 Async 3 +1:11 Total 3 +1:10 Read 3 +1:10 Write 0 +1:10 Sync 0 +1:10 Async 3 +1:10 Total 3 +1:9 Read 3 +1:9 Write 0 +1:9 Sync 0 +1:9 Async 3 +1:9 Total 3 +1:8 Read 3 +1:8 Write 0 +1:8 Sync 0 +1:8 Async 3 +1:8 Total 3 +1:7 Read 3 +1:7 Write 0 +1:7 Sync 0 +1:7 Async 3 +1:7 Total 3 +1:6 Read 3 +1:6 Write 0 +1:6 Sync 0 +1:6 Async 3 +1:6 Total 3 +1:5 Read 3 +1:5 Write 0 +1:5 Sync 0 +1:5 Async 3 +1:5 Total 3 +1:4 Read 3 +1:4 Write 0 +1:4 Sync 0 +1:4 Async 3 +1:4 Total 3 +1:3 Read 3 +1:3 Write 0 +1:3 Sync 0 +1:3 Async 3 +1:3 Total 3 +1:2 Read 3 +1:2 Write 0 +1:2 Sync 0 +1:2 Async 3 +1:2 Total 3 +1:1 Read 3 +1:1 Write 0 +1:1 Sync 0 +1:1 Async 3 +1:1 Total 3 +1:0 Read 3 +1:0 Write 0 +1:0 Sync 0 +1:0 Async 3 +1:0 Total 3 +Total 265885 diff --git a/plugins/inputs/cgroup/testdata/cpu/cpu.cfs_quota_us b/plugins/inputs/cgroup/testdata/cpu/cpu.cfs_quota_us new file mode 100644 index 000000000..3a2e3f498 --- /dev/null +++ b/plugins/inputs/cgroup/testdata/cpu/cpu.cfs_quota_us @@ -0,0 +1 @@ +-1 diff --git a/plugins/inputs/cgroup/testdata/cpu/cpuacct.usage_percpu b/plugins/inputs/cgroup/testdata/cpu/cpuacct.usage_percpu new file mode 100644 index 000000000..36737768a --- /dev/null +++ b/plugins/inputs/cgroup/testdata/cpu/cpuacct.usage_percpu @@ -0,0 +1 @@ +-1452543795404 1376681271659 1450950799997 -1473113374257 diff --git a/plugins/inputs/cgroup/testdata/memory/group_1/group_1_1/memory.limit_in_bytes b/plugins/inputs/cgroup/testdata/memory/group_1/group_1_1/memory.limit_in_bytes new file mode 100644 index 000000000..78169435f --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/group_1/group_1_1/memory.limit_in_bytes @@ -0,0 +1 @@ +223372036854771712 diff --git a/plugins/inputs/cgroup/testdata/memory/group_1/group_1_1/memory.stat b/plugins/inputs/cgroup/testdata/memory/group_1/group_1_1/memory.stat new file mode 100644 index 000000000..a5493b9b2 --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/group_1/group_1_1/memory.stat @@ -0,0 +1,5 @@ +cache 1739362304123123123 +rss 1775325184 +rss_huge 778043392 +mapped_file 421036032 +dirty -307200 diff --git a/plugins/inputs/cgroup/testdata/memory/group_1/group_1_2/memory.limit_in_bytes b/plugins/inputs/cgroup/testdata/memory/group_1/group_1_2/memory.limit_in_bytes new file mode 100644 index 000000000..78169435f --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/group_1/group_1_2/memory.limit_in_bytes @@ -0,0 +1 @@ +223372036854771712 diff --git a/plugins/inputs/cgroup/testdata/memory/group_1/group_1_2/memory.stat b/plugins/inputs/cgroup/testdata/memory/group_1/group_1_2/memory.stat new file mode 100644 index 000000000..a5493b9b2 --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/group_1/group_1_2/memory.stat @@ -0,0 +1,5 @@ +cache 1739362304123123123 +rss 1775325184 +rss_huge 778043392 +mapped_file 421036032 +dirty -307200 diff --git a/plugins/inputs/cgroup/testdata/memory/group_1/memory.kmem.limit_in_bytes b/plugins/inputs/cgroup/testdata/memory/group_1/memory.kmem.limit_in_bytes new file mode 100644 index 000000000..564113cfa --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/group_1/memory.kmem.limit_in_bytes @@ -0,0 +1 @@ +9223372036854771712 diff --git a/plugins/inputs/cgroup/testdata/memory/group_1/memory.kmem.max_usage_in_bytes b/plugins/inputs/cgroup/testdata/memory/group_1/memory.kmem.max_usage_in_bytes new file mode 100644 index 000000000..573541ac9 --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/group_1/memory.kmem.max_usage_in_bytes @@ -0,0 +1 @@ +0 diff --git a/plugins/inputs/cgroup/testdata/memory/group_1/memory.limit_in_bytes b/plugins/inputs/cgroup/testdata/memory/group_1/memory.limit_in_bytes new file mode 100644 index 000000000..78169435f --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/group_1/memory.limit_in_bytes @@ -0,0 +1 @@ +223372036854771712 diff --git a/plugins/inputs/cgroup/testdata/memory/group_1/memory.stat b/plugins/inputs/cgroup/testdata/memory/group_1/memory.stat new file mode 100644 index 000000000..a5493b9b2 --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/group_1/memory.stat @@ -0,0 +1,5 @@ +cache 1739362304123123123 +rss 1775325184 +rss_huge 778043392 +mapped_file 421036032 +dirty -307200 diff --git a/plugins/inputs/cgroup/testdata/memory/group_2/group_1_1/memory.limit_in_bytes b/plugins/inputs/cgroup/testdata/memory/group_2/group_1_1/memory.limit_in_bytes new file mode 100644 index 000000000..78169435f --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/group_2/group_1_1/memory.limit_in_bytes @@ -0,0 +1 @@ +223372036854771712 diff --git a/plugins/inputs/cgroup/testdata/memory/group_2/group_1_1/memory.stat b/plugins/inputs/cgroup/testdata/memory/group_2/group_1_1/memory.stat new file mode 100644 index 000000000..a5493b9b2 --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/group_2/group_1_1/memory.stat @@ -0,0 +1,5 @@ +cache 1739362304123123123 +rss 1775325184 +rss_huge 778043392 +mapped_file 421036032 +dirty -307200 diff --git a/plugins/inputs/cgroup/testdata/memory/group_2/memory.limit_in_bytes b/plugins/inputs/cgroup/testdata/memory/group_2/memory.limit_in_bytes new file mode 100644 index 000000000..78169435f --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/group_2/memory.limit_in_bytes @@ -0,0 +1 @@ +223372036854771712 diff --git a/plugins/inputs/cgroup/testdata/memory/group_2/memory.stat b/plugins/inputs/cgroup/testdata/memory/group_2/memory.stat new file mode 100644 index 000000000..a5493b9b2 --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/group_2/memory.stat @@ -0,0 +1,5 @@ +cache 1739362304123123123 +rss 1775325184 +rss_huge 778043392 +mapped_file 421036032 +dirty -307200 diff --git a/plugins/inputs/cgroup/testdata/memory/memory.empty b/plugins/inputs/cgroup/testdata/memory/memory.empty new file mode 100644 index 000000000..e69de29bb diff --git a/plugins/inputs/cgroup/testdata/memory/memory.kmem.limit_in_bytes b/plugins/inputs/cgroup/testdata/memory/memory.kmem.limit_in_bytes new file mode 100644 index 000000000..564113cfa --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/memory.kmem.limit_in_bytes @@ -0,0 +1 @@ +9223372036854771712 diff --git a/plugins/inputs/cgroup/testdata/memory/memory.limit_in_bytes b/plugins/inputs/cgroup/testdata/memory/memory.limit_in_bytes new file mode 100644 index 000000000..78169435f --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/memory.limit_in_bytes @@ -0,0 +1 @@ +223372036854771712 diff --git a/plugins/inputs/cgroup/testdata/memory/memory.max_usage_in_bytes b/plugins/inputs/cgroup/testdata/memory/memory.max_usage_in_bytes new file mode 100644 index 000000000..712313d3d --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/memory.max_usage_in_bytes @@ -0,0 +1,3 @@ +0 +-1 +2 diff --git a/plugins/inputs/cgroup/testdata/memory/memory.numa_stat b/plugins/inputs/cgroup/testdata/memory/memory.numa_stat new file mode 100644 index 000000000..e7c54ebb5 --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/memory.numa_stat @@ -0,0 +1,8 @@ +total=858067 N0=858067 +file=406254 N0=406254 +anon=451792 N0=451792 +unevictable=21 N0=21 +hierarchical_total=858067 N0=858067 +hierarchical_file=406254 N0=406254 +hierarchical_anon=451792 N0=451792 +hierarchical_unevictable=21 N0=21 diff --git a/plugins/inputs/cgroup/testdata/memory/memory.stat b/plugins/inputs/cgroup/testdata/memory/memory.stat new file mode 100644 index 000000000..a5493b9b2 --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/memory.stat @@ -0,0 +1,5 @@ +cache 1739362304123123123 +rss 1775325184 +rss_huge 778043392 +mapped_file 421036032 +dirty -307200 diff --git a/plugins/inputs/cgroup/testdata/memory/memory.usage_in_bytes b/plugins/inputs/cgroup/testdata/memory/memory.usage_in_bytes new file mode 100644 index 000000000..661151f51 --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/memory.usage_in_bytes @@ -0,0 +1 @@ +3513667584 diff --git a/plugins/inputs/cgroup/testdata/memory/memory.use_hierarchy b/plugins/inputs/cgroup/testdata/memory/memory.use_hierarchy new file mode 100644 index 000000000..07cbc8fc6 --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/memory.use_hierarchy @@ -0,0 +1 @@ +12-781 diff --git a/plugins/inputs/cgroup/testdata/memory/notify_on_release b/plugins/inputs/cgroup/testdata/memory/notify_on_release new file mode 100644 index 000000000..573541ac9 --- /dev/null +++ b/plugins/inputs/cgroup/testdata/memory/notify_on_release @@ -0,0 +1 @@ +0 From 9c2ca805da85c9e9792477e46dc218c379fa6b85 Mon Sep 17 00:00:00 2001 From: Vladimir Sagan Date: Thu, 23 Jun 2016 11:11:44 +0300 Subject: [PATCH 05/11] Remove flush_scope logic --- plugins/inputs/cgroup/README.md | 1 - plugins/inputs/cgroup/cgroup.go | 27 +++------------------------ 2 files changed, 3 insertions(+), 25 deletions(-) diff --git a/plugins/inputs/cgroup/README.md b/plugins/inputs/cgroup/README.md index a8fd1243e..283b17634 100644 --- a/plugins/inputs/cgroup/README.md +++ b/plugins/inputs/cgroup/README.md @@ -41,7 +41,6 @@ All measurements have the following tags: ``` # [[inputs.cgroup]] - # flush_scope = 10 # optional (the fields will be divided into parts of 10 items) # paths = [ # "/cgroup/memory", # root cgroup # "/cgroup/memory/child1", # container cgroup diff --git a/plugins/inputs/cgroup/cgroup.go b/plugins/inputs/cgroup/cgroup.go index df8f9d915..57ea67a06 100644 --- a/plugins/inputs/cgroup/cgroup.go +++ b/plugins/inputs/cgroup/cgroup.go @@ -18,9 +18,8 @@ import ( const metricName = "cgroup" type CGroup struct { - Paths []string `toml:"paths"` - Files []string `toml:"fields"` - FlushScope int `toml:"flush_scope"` + Paths []string `toml:"paths"` + Files []string `toml:"fields"` } var sampleConfig = ` @@ -83,31 +82,11 @@ func (g *CGroup) gatherDir(dir string, acc telegraf.Accumulator) error { tags := map[string]string{"path": dir} - if g.FlushScope <= 0 { - acc.AddFields(metricName, fields, tags) - return nil - } - writeWithBatches(acc, fields, tags, g.FlushScope) + acc.AddFields(metricName, fields, tags) return nil } -func writeWithBatches(acc telegraf.Accumulator, fields map[string]interface{}, tags map[string]string, scope int) { - for len(fields) > 0 { - batch := make(map[string]interface{}) - - for k, v := range fields { - batch[k] = v - delete(fields, k) - if len(batch) == scope || len(fields) == 0 { - break - } - } - - acc.AddFields(metricName, batch, tags) - } -} - // ====================================================================== type pathInfo struct { From d641c42029df4e06dde535928dc5ab735c86c861 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 23 Jun 2016 10:23:31 +0100 Subject: [PATCH 06/11] cgroup: change fields -> files closes #1103 closes #1350 --- plugins/inputs/cgroup/cgroup.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/cgroup/cgroup.go b/plugins/inputs/cgroup/cgroup.go index 57ea67a06..341ada5a1 100644 --- a/plugins/inputs/cgroup/cgroup.go +++ b/plugins/inputs/cgroup/cgroup.go @@ -19,16 +19,19 @@ const metricName = "cgroup" type CGroup struct { Paths []string `toml:"paths"` - Files []string `toml:"fields"` + Files []string `toml:"files"` } var sampleConfig = ` + ## Directories in which to look for files, globs are supported. # paths = [ # "/cgroup/memory", # "/cgroup/memory/child1", # "/cgroup/memory/child2/*", # ] - # fields = ["memory.*usage*", "memory.limit_in_bytes"] + ## cgroup stat fields, as file names, globs are supported. + ## these file names are appended to each path from above. + # files = ["memory.*usage*", "memory.limit_in_bytes"] ` func (g *CGroup) SampleConfig() string { From 30cc00d11b9d873d95ae568cf43b9a5a8e8d307d Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 23 Jun 2016 10:28:38 +0100 Subject: [PATCH 07/11] Update changelog, etc/telegraf.conf --- CHANGELOG.md | 1 + etc/telegraf.conf | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ee96aaa62..bed972e5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ should now look like: - [#1289](https://github.com/influxdata/telegraf/pull/1289): webhooks input plugin. Thanks @francois2metz and @cduez! - [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar webhook plugin. - [#1402](https://github.com/influxdata/telegraf/pull/1402): docker-machine/boot2docker no longer required for unit tests. +- [#1350](https://github.com/influxdata/telegraf/pull/1350): cgroup input plugin. ### Bugfixes diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 054bcf62b..98138eef4 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -679,6 +679,13 @@ # # ## set cluster_health to true when you want to also obtain cluster level stats # cluster_health = false +# +# ## Optional SSL Config +# # ssl_ca = "/etc/telegraf/ca.pem" +# # ssl_cert = "/etc/telegraf/cert.pem" +# # ssl_key = "/etc/telegraf/key.pem" +# ## Use SSL but skip chain & host verification +# # insecure_skip_verify = false # # Read metrics from one or more commands that can output to stdout @@ -1259,10 +1266,15 @@ # ## An array of urls to scrape metrics from. # urls = ["http://localhost:9100/metrics"] # -# ## Use SSL but skip chain & host verification -# # insecure_skip_verify = false # ## Use bearer token for authorization # # bearer_token = /path/to/bearer/token +# +# ## Optional SSL Config +# # ssl_ca = /path/to/cafile +# # ssl_cert = /path/to/certfile +# # ssl_key = /path/to/keyfile +# ## Use SSL but skip chain & host verification +# # insecure_skip_verify = false # # Reads last_run_summary.yaml file and converts to measurments From f7e057ec552d6f18aa3093a38c53c64f2846bce3 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 23 Jun 2016 11:41:37 +0100 Subject: [PATCH 08/11] refactor cgroup build so non-linux systems see plugin also updated the README for the fields->files change. --- etc/telegraf.conf | 13 ++ plugins/inputs/cgroup/README.md | 4 +- plugins/inputs/cgroup/cgroup.go | 239 ---------------------- plugins/inputs/cgroup/cgroup_linux.go | 244 +++++++++++++++++++++++ plugins/inputs/cgroup/cgroup_notlinux.go | 8 + 5 files changed, 267 insertions(+), 241 deletions(-) create mode 100644 plugins/inputs/cgroup/cgroup_linux.go diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 98138eef4..c9011536a 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -526,6 +526,19 @@ # socket_suffix = "asok" +# # Read specific statistics per cgroup +# [[inputs.cgroup]] +# ## Directories in which to look for files, globs are supported. +# # paths = [ +# # "/cgroup/memory", +# # "/cgroup/memory/child1", +# # "/cgroup/memory/child2/*", +# # ] +# ## cgroup stat fields, as file names, globs are supported. +# ## these file names are appended to each path from above. +# # files = ["memory.*usage*", "memory.limit_in_bytes"] + + # # Pull Metric Statistics from Amazon CloudWatch # [[inputs.cloudwatch]] # ## Amazon Region diff --git a/plugins/inputs/cgroup/README.md b/plugins/inputs/cgroup/README.md index 283b17634..ab06342bf 100644 --- a/plugins/inputs/cgroup/README.md +++ b/plugins/inputs/cgroup/README.md @@ -46,7 +46,7 @@ All measurements have the following tags: # "/cgroup/memory/child1", # container cgroup # "/cgroup/memory/child2/*", # all children cgroups under child2, but not child2 itself # ] - # fields = ["memory.*usage*", "memory.limit_in_bytes"] + # files = ["memory.*usage*", "memory.limit_in_bytes"] # [[inputs.cgroup]] # paths = [ @@ -54,5 +54,5 @@ All measurements have the following tags: # "/cgroup/cpu/*", # all container cgroups # "/cgroup/cpu/*/*", # all children cgroups under each container cgroup # ] - # fields = ["cpuacct.usage", "cpu.cfs_period_us", "cpu.cfs_quota_us"] + # files = ["cpuacct.usage", "cpu.cfs_period_us", "cpu.cfs_quota_us"] ``` diff --git a/plugins/inputs/cgroup/cgroup.go b/plugins/inputs/cgroup/cgroup.go index 341ada5a1..e38b6a4c1 100644 --- a/plugins/inputs/cgroup/cgroup.go +++ b/plugins/inputs/cgroup/cgroup.go @@ -1,22 +1,10 @@ -// +build linux - package cgroup import ( - "fmt" - "io/ioutil" - "os" - "path" - "path/filepath" - "regexp" - "strconv" - "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) -const metricName = "cgroup" - type CGroup struct { Paths []string `toml:"paths"` Files []string `toml:"files"` @@ -42,233 +30,6 @@ func (g *CGroup) Description() string { return "Read specific statistics per cgroup" } -func (g *CGroup) Gather(acc telegraf.Accumulator) error { - list := make(chan pathInfo) - go g.generateDirs(list) - - for dir := range list { - if dir.err != nil { - return dir.err - } - if err := g.gatherDir(dir.path, acc); err != nil { - return err - } - } - - return nil -} - -func (g *CGroup) gatherDir(dir string, acc telegraf.Accumulator) error { - fields := make(map[string]interface{}) - - list := make(chan pathInfo) - go g.generateFiles(dir, list) - - for file := range list { - if file.err != nil { - return file.err - } - - raw, err := ioutil.ReadFile(file.path) - if err != nil { - return err - } - if len(raw) == 0 { - continue - } - - fd := fileData{data: raw, path: file.path} - if err := fd.parse(fields); err != nil { - return err - } - } - - tags := map[string]string{"path": dir} - - acc.AddFields(metricName, fields, tags) - - return nil -} - -// ====================================================================== - -type pathInfo struct { - path string - err error -} - -func isDir(path string) (bool, error) { - result, err := os.Stat(path) - if err != nil { - return false, err - } - return result.IsDir(), nil -} - -func (g *CGroup) generateDirs(list chan<- pathInfo) { - for _, dir := range g.Paths { - // getting all dirs that match the pattern 'dir' - items, err := filepath.Glob(dir) - if err != nil { - list <- pathInfo{err: err} - return - } - - for _, item := range items { - ok, err := isDir(item) - if err != nil { - list <- pathInfo{err: err} - return - } - // supply only dirs - if ok { - list <- pathInfo{path: item} - } - } - } - close(list) -} - -func (g *CGroup) generateFiles(dir string, list chan<- pathInfo) { - for _, file := range g.Files { - // getting all file paths that match the pattern 'dir + file' - // path.Base make sure that file variable does not contains part of path - items, err := filepath.Glob(path.Join(dir, path.Base(file))) - if err != nil { - list <- pathInfo{err: err} - return - } - - for _, item := range items { - ok, err := isDir(item) - if err != nil { - list <- pathInfo{err: err} - return - } - // supply only files not dirs - if !ok { - list <- pathInfo{path: item} - } - } - } - close(list) -} - -// ====================================================================== - -type fileData struct { - data []byte - path string -} - -func (fd *fileData) format() (*fileFormat, error) { - for _, ff := range fileFormats { - ok, err := ff.match(fd.data) - if err != nil { - return nil, err - } - if ok { - return &ff, nil - } - } - - return nil, fmt.Errorf("%v: unknown file format", fd.path) -} - -func (fd *fileData) parse(fields map[string]interface{}) error { - format, err := fd.format() - if err != nil { - return err - } - - format.parser(filepath.Base(fd.path), fields, fd.data) - return nil -} - -// ====================================================================== - -type fileFormat struct { - name string - pattern string - parser func(measurement string, fields map[string]interface{}, b []byte) -} - -const keyPattern = "[[:alpha:]_]+" -const valuePattern = "[\\d-]+" - -var fileFormats = [...]fileFormat{ - // VAL\n - fileFormat{ - name: "Single value", - pattern: "^" + valuePattern + "\n$", - parser: func(measurement string, fields map[string]interface{}, b []byte) { - re := regexp.MustCompile("^(" + valuePattern + ")\n$") - matches := re.FindAllStringSubmatch(string(b), -1) - fields[measurement] = numberOrString(matches[0][1]) - }, - }, - // VAL0\n - // VAL1\n - // ... - fileFormat{ - name: "New line separated values", - pattern: "^(" + valuePattern + "\n){2,}$", - parser: func(measurement string, fields map[string]interface{}, b []byte) { - re := regexp.MustCompile("(" + valuePattern + ")\n") - matches := re.FindAllStringSubmatch(string(b), -1) - for i, v := range matches { - fields[measurement+"."+strconv.Itoa(i)] = numberOrString(v[1]) - } - }, - }, - // VAL0 VAL1 ...\n - fileFormat{ - name: "Space separated values", - pattern: "^(" + valuePattern + " )+\n$", - parser: func(measurement string, fields map[string]interface{}, b []byte) { - re := regexp.MustCompile("(" + valuePattern + ") ") - matches := re.FindAllStringSubmatch(string(b), -1) - for i, v := range matches { - fields[measurement+"."+strconv.Itoa(i)] = numberOrString(v[1]) - } - }, - }, - // KEY0 VAL0\n - // KEY1 VAL1\n - // ... - fileFormat{ - name: "New line separated key-space-value's", - pattern: "^(" + keyPattern + " " + valuePattern + "\n)+$", - parser: func(measurement string, fields map[string]interface{}, b []byte) { - re := regexp.MustCompile("(" + keyPattern + ") (" + valuePattern + ")\n") - matches := re.FindAllStringSubmatch(string(b), -1) - for _, v := range matches { - fields[measurement+"."+v[1]] = numberOrString(v[2]) - } - }, - }, -} - -func numberOrString(s string) interface{} { - i, err := strconv.Atoi(s) - if err == nil { - return i - } - - return s -} - -func (f fileFormat) match(b []byte) (bool, error) { - ok, err := regexp.Match(f.pattern, b) - if err != nil { - return false, err - } - if ok { - return true, nil - } - return false, nil -} - func init() { inputs.Add("cgroup", func() telegraf.Input { return &CGroup{} }) } diff --git a/plugins/inputs/cgroup/cgroup_linux.go b/plugins/inputs/cgroup/cgroup_linux.go new file mode 100644 index 000000000..e8ba6f881 --- /dev/null +++ b/plugins/inputs/cgroup/cgroup_linux.go @@ -0,0 +1,244 @@ +// +build linux + +package cgroup + +import ( + "fmt" + "io/ioutil" + "os" + "path" + "path/filepath" + "regexp" + "strconv" + + "github.com/influxdata/telegraf" +) + +const metricName = "cgroup" + +func (g *CGroup) Gather(acc telegraf.Accumulator) error { + list := make(chan pathInfo) + go g.generateDirs(list) + + for dir := range list { + if dir.err != nil { + return dir.err + } + if err := g.gatherDir(dir.path, acc); err != nil { + return err + } + } + + return nil +} + +func (g *CGroup) gatherDir(dir string, acc telegraf.Accumulator) error { + fields := make(map[string]interface{}) + + list := make(chan pathInfo) + go g.generateFiles(dir, list) + + for file := range list { + if file.err != nil { + return file.err + } + + raw, err := ioutil.ReadFile(file.path) + if err != nil { + return err + } + if len(raw) == 0 { + continue + } + + fd := fileData{data: raw, path: file.path} + if err := fd.parse(fields); err != nil { + return err + } + } + + tags := map[string]string{"path": dir} + + acc.AddFields(metricName, fields, tags) + + return nil +} + +// ====================================================================== + +type pathInfo struct { + path string + err error +} + +func isDir(path string) (bool, error) { + result, err := os.Stat(path) + if err != nil { + return false, err + } + return result.IsDir(), nil +} + +func (g *CGroup) generateDirs(list chan<- pathInfo) { + for _, dir := range g.Paths { + // getting all dirs that match the pattern 'dir' + items, err := filepath.Glob(dir) + if err != nil { + list <- pathInfo{err: err} + return + } + + for _, item := range items { + ok, err := isDir(item) + if err != nil { + list <- pathInfo{err: err} + return + } + // supply only dirs + if ok { + list <- pathInfo{path: item} + } + } + } + close(list) +} + +func (g *CGroup) generateFiles(dir string, list chan<- pathInfo) { + for _, file := range g.Files { + // getting all file paths that match the pattern 'dir + file' + // path.Base make sure that file variable does not contains part of path + items, err := filepath.Glob(path.Join(dir, path.Base(file))) + if err != nil { + list <- pathInfo{err: err} + return + } + + for _, item := range items { + ok, err := isDir(item) + if err != nil { + list <- pathInfo{err: err} + return + } + // supply only files not dirs + if !ok { + list <- pathInfo{path: item} + } + } + } + close(list) +} + +// ====================================================================== + +type fileData struct { + data []byte + path string +} + +func (fd *fileData) format() (*fileFormat, error) { + for _, ff := range fileFormats { + ok, err := ff.match(fd.data) + if err != nil { + return nil, err + } + if ok { + return &ff, nil + } + } + + return nil, fmt.Errorf("%v: unknown file format", fd.path) +} + +func (fd *fileData) parse(fields map[string]interface{}) error { + format, err := fd.format() + if err != nil { + return err + } + + format.parser(filepath.Base(fd.path), fields, fd.data) + return nil +} + +// ====================================================================== + +type fileFormat struct { + name string + pattern string + parser func(measurement string, fields map[string]interface{}, b []byte) +} + +const keyPattern = "[[:alpha:]_]+" +const valuePattern = "[\\d-]+" + +var fileFormats = [...]fileFormat{ + // VAL\n + fileFormat{ + name: "Single value", + pattern: "^" + valuePattern + "\n$", + parser: func(measurement string, fields map[string]interface{}, b []byte) { + re := regexp.MustCompile("^(" + valuePattern + ")\n$") + matches := re.FindAllStringSubmatch(string(b), -1) + fields[measurement] = numberOrString(matches[0][1]) + }, + }, + // VAL0\n + // VAL1\n + // ... + fileFormat{ + name: "New line separated values", + pattern: "^(" + valuePattern + "\n){2,}$", + parser: func(measurement string, fields map[string]interface{}, b []byte) { + re := regexp.MustCompile("(" + valuePattern + ")\n") + matches := re.FindAllStringSubmatch(string(b), -1) + for i, v := range matches { + fields[measurement+"."+strconv.Itoa(i)] = numberOrString(v[1]) + } + }, + }, + // VAL0 VAL1 ...\n + fileFormat{ + name: "Space separated values", + pattern: "^(" + valuePattern + " )+\n$", + parser: func(measurement string, fields map[string]interface{}, b []byte) { + re := regexp.MustCompile("(" + valuePattern + ") ") + matches := re.FindAllStringSubmatch(string(b), -1) + for i, v := range matches { + fields[measurement+"."+strconv.Itoa(i)] = numberOrString(v[1]) + } + }, + }, + // KEY0 VAL0\n + // KEY1 VAL1\n + // ... + fileFormat{ + name: "New line separated key-space-value's", + pattern: "^(" + keyPattern + " " + valuePattern + "\n)+$", + parser: func(measurement string, fields map[string]interface{}, b []byte) { + re := regexp.MustCompile("(" + keyPattern + ") (" + valuePattern + ")\n") + matches := re.FindAllStringSubmatch(string(b), -1) + for _, v := range matches { + fields[measurement+"."+v[1]] = numberOrString(v[2]) + } + }, + }, +} + +func numberOrString(s string) interface{} { + i, err := strconv.Atoi(s) + if err == nil { + return i + } + + return s +} + +func (f fileFormat) match(b []byte) (bool, error) { + ok, err := regexp.Match(f.pattern, b) + if err != nil { + return false, err + } + if ok { + return true, nil + } + return false, nil +} diff --git a/plugins/inputs/cgroup/cgroup_notlinux.go b/plugins/inputs/cgroup/cgroup_notlinux.go index 661f99f5c..2bc227410 100644 --- a/plugins/inputs/cgroup/cgroup_notlinux.go +++ b/plugins/inputs/cgroup/cgroup_notlinux.go @@ -1,3 +1,11 @@ // +build !linux package cgroup + +import ( + "github.com/influxdata/telegraf" +) + +func (g *CGroup) Gather(acc telegraf.Accumulator) error { + return nil +} From a6365a608686d833e164bb557b6527c6728e3639 Mon Sep 17 00:00:00 2001 From: Jonathan Chauncey Date: Thu, 9 Jun 2016 13:31:05 -0600 Subject: [PATCH 09/11] feat(nsq_consumer): Add input plugin to consume metrics from an nsqd topic closes #1347 closes #1369 --- CHANGELOG.md | 1 + README.md | 3 + plugins/inputs/all/all.go | 1 + plugins/inputs/nsq_consumer/README.md | 25 ++ plugins/inputs/nsq_consumer/nsq_consumer.go | 99 +++++++ .../inputs/nsq_consumer/nsq_consumer_test.go | 245 ++++++++++++++++++ 6 files changed, 374 insertions(+) create mode 100644 plugins/inputs/nsq_consumer/README.md create mode 100644 plugins/inputs/nsq_consumer/nsq_consumer.go create mode 100644 plugins/inputs/nsq_consumer/nsq_consumer_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index bed972e5c..ce2a883e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ should now look like: - [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar webhook plugin. - [#1402](https://github.com/influxdata/telegraf/pull/1402): docker-machine/boot2docker no longer required for unit tests. - [#1350](https://github.com/influxdata/telegraf/pull/1350): cgroup input plugin. +- [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD. ### Bugfixes diff --git a/README.md b/README.md index 425e7d701..53e672534 100644 --- a/README.md +++ b/README.md @@ -220,6 +220,9 @@ Telegraf can also collect metrics via the following service plugins: * [webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks) * [github](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/github) * [rollbar](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/rollbar) +* [nsq_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nsq_consumer) +* [github_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/github_webhooks) +* [rollbar_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/rollbar_webhooks) We'll be adding support for many more over the coming months. Read on if you want to add support for another service or third-party API. diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 512753b7a..529a13bae 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -41,6 +41,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/net_response" _ "github.com/influxdata/telegraf/plugins/inputs/nginx" _ "github.com/influxdata/telegraf/plugins/inputs/nsq" + _ "github.com/influxdata/telegraf/plugins/inputs/nsq_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/nstat" _ "github.com/influxdata/telegraf/plugins/inputs/ntpq" _ "github.com/influxdata/telegraf/plugins/inputs/passenger" diff --git a/plugins/inputs/nsq_consumer/README.md b/plugins/inputs/nsq_consumer/README.md new file mode 100644 index 000000000..eac494ccb --- /dev/null +++ b/plugins/inputs/nsq_consumer/README.md @@ -0,0 +1,25 @@ +# NSQ Consumer Input Plugin + +The [NSQ](http://nsq.io/) consumer plugin polls a specified NSQD +topic and adds messages to InfluxDB. This plugin allows a message to be in any of the supported `data_format` types. + +## Configuration + +```toml +# Read metrics from NSQD topic(s) +[[inputs.nsq_consumer]] + ## An array of NSQD HTTP API endpoints + server = "localhost:4150" + topic = "telegraf" + channel = "consumer" + max_in_flight = 100 + + ## Data format to consume. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +``` + +## Testing +The `nsq_consumer_test` mocks out the interaction with `NSQD`. It requires no outside dependencies. diff --git a/plugins/inputs/nsq_consumer/nsq_consumer.go b/plugins/inputs/nsq_consumer/nsq_consumer.go new file mode 100644 index 000000000..b227b7e50 --- /dev/null +++ b/plugins/inputs/nsq_consumer/nsq_consumer.go @@ -0,0 +1,99 @@ +package nsq_consumer + +import ( + "log" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/nsqio/go-nsq" +) + +//NSQConsumer represents the configuration of the plugin +type NSQConsumer struct { + Server string + Topic string + Channel string + MaxInFlight int + parser parsers.Parser + consumer *nsq.Consumer + acc telegraf.Accumulator +} + +var sampleConfig = ` + ## An string representing the NSQD TCP Endpoint + server = "localhost:4150" + topic = "telegraf" + channel = "consumer" + max_in_flight = 100 + + ## Data format to consume. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +` + +func init() { + inputs.Add("nsq_consumer", func() telegraf.Input { + return &NSQConsumer{} + }) +} + +// SetParser takes the data_format from the config and finds the right parser for that format +func (n *NSQConsumer) SetParser(parser parsers.Parser) { + n.parser = parser +} + +// SampleConfig returns config values for generating a sample configuration file +func (n *NSQConsumer) SampleConfig() string { + return sampleConfig +} + +// Description prints description string +func (n *NSQConsumer) Description() string { + return "Read NSQ topic for metrics." +} + +// Start pulls data from nsq +func (n *NSQConsumer) Start(acc telegraf.Accumulator) error { + n.acc = acc + n.connect() + n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error { + metrics, err := n.parser.Parse(message.Body) + if err != nil { + log.Printf("NSQConsumer Parse Error\nmessage:%s\nerror:%s", string(message.Body), err.Error()) + return nil + } + for _, metric := range metrics { + n.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) + } + message.Finish() + return nil + }), n.MaxInFlight) + n.consumer.ConnectToNSQD(n.Server) + return nil +} + +// Stop processing messages +func (n *NSQConsumer) Stop() { + n.consumer.Stop() +} + +// Gather is a noop +func (n *NSQConsumer) Gather(acc telegraf.Accumulator) error { + return nil +} + +func (n *NSQConsumer) connect() error { + if n.consumer == nil { + config := nsq.NewConfig() + config.MaxInFlight = n.MaxInFlight + consumer, err := nsq.NewConsumer(n.Topic, n.Channel, config) + if err != nil { + return err + } + n.consumer = consumer + } + return nil +} diff --git a/plugins/inputs/nsq_consumer/nsq_consumer_test.go b/plugins/inputs/nsq_consumer/nsq_consumer_test.go new file mode 100644 index 000000000..59db675a5 --- /dev/null +++ b/plugins/inputs/nsq_consumer/nsq_consumer_test.go @@ -0,0 +1,245 @@ +package nsq_consumer + +import ( + "bufio" + "bytes" + "encoding/binary" + "io" + "log" + "net" + "strconv" + "testing" + "time" + + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/testutil" + "github.com/nsqio/go-nsq" + "github.com/stretchr/testify/assert" +) + +// This test is modeled after the kafka consumer integration test +func TestReadsMetricsFromNSQ(t *testing.T) { + msgID := nsq.MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'} + msg := nsq.NewMessage(msgID, []byte("cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257")) + + script := []instruction{ + // SUB + instruction{0, nsq.FrameTypeResponse, []byte("OK")}, + // IDENTIFY + instruction{0, nsq.FrameTypeResponse, []byte("OK")}, + instruction{20 * time.Millisecond, nsq.FrameTypeMessage, frameMessage(msg)}, + // needed to exit test + instruction{100 * time.Millisecond, -1, []byte("exit")}, + } + + addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:4155") + newMockNSQD(script, addr.String()) + + consumer := &NSQConsumer{ + Server: "127.0.0.1:4155", + Topic: "telegraf", + Channel: "consume", + MaxInFlight: 1, + } + + p, _ := parsers.NewInfluxParser() + consumer.SetParser(p) + var acc testutil.Accumulator + assert.Equal(t, 0, len(acc.Metrics), "There should not be any points") + if err := consumer.Start(&acc); err != nil { + t.Fatal(err.Error()) + } else { + defer consumer.Stop() + } + + waitForPoint(&acc, t) + + if len(acc.Metrics) == 1 { + point := acc.Metrics[0] + assert.Equal(t, "cpu_load_short", point.Measurement) + assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields) + assert.Equal(t, map[string]string{ + "host": "server01", + "direction": "in", + "region": "us-west", + }, point.Tags) + assert.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix()) + } else { + t.Errorf("No points found in accumulator, expected 1") + } + +} + +// Waits for the metric that was sent to the kafka broker to arrive at the kafka +// consumer +func waitForPoint(acc *testutil.Accumulator, t *testing.T) { + // Give the kafka container up to 2 seconds to get the point to the consumer + ticker := time.NewTicker(5 * time.Millisecond) + defer ticker.Stop() + counter := 0 + for { + select { + case <-ticker.C: + counter++ + if counter > 1000 { + t.Fatal("Waited for 5s, point never arrived to consumer") + } else if acc.NFields() == 1 { + return + } + } + } +} + +func newMockNSQD(script []instruction, addr string) *mockNSQD { + n := &mockNSQD{ + script: script, + exitChan: make(chan int), + } + + tcpListener, err := net.Listen("tcp", addr) + if err != nil { + log.Fatalf("FATAL: listen (%s) failed - %s", n.tcpAddr.String(), err) + } + n.tcpListener = tcpListener + n.tcpAddr = tcpListener.Addr().(*net.TCPAddr) + + go n.listen() + + return n +} + +// The code below allows us to mock the interactions with nsqd. This is taken from: +// https://github.com/nsqio/go-nsq/blob/master/mock_test.go +type instruction struct { + delay time.Duration + frameType int32 + body []byte +} + +type mockNSQD struct { + script []instruction + got [][]byte + tcpAddr *net.TCPAddr + tcpListener net.Listener + exitChan chan int +} + +func (n *mockNSQD) listen() { + for { + conn, err := n.tcpListener.Accept() + if err != nil { + break + } + go n.handle(conn) + } + close(n.exitChan) +} + +func (n *mockNSQD) handle(conn net.Conn) { + var idx int + buf := make([]byte, 4) + _, err := io.ReadFull(conn, buf) + if err != nil { + log.Fatalf("ERROR: failed to read protocol version - %s", err) + } + + readChan := make(chan []byte) + readDoneChan := make(chan int) + scriptTime := time.After(n.script[0].delay) + rdr := bufio.NewReader(conn) + + go func() { + for { + line, err := rdr.ReadBytes('\n') + if err != nil { + return + } + // trim the '\n' + line = line[:len(line)-1] + readChan <- line + <-readDoneChan + } + }() + + var rdyCount int + for idx < len(n.script) { + select { + case line := <-readChan: + n.got = append(n.got, line) + params := bytes.Split(line, []byte(" ")) + switch { + case bytes.Equal(params[0], []byte("IDENTIFY")): + l := make([]byte, 4) + _, err := io.ReadFull(rdr, l) + if err != nil { + log.Printf(err.Error()) + goto exit + } + size := int32(binary.BigEndian.Uint32(l)) + b := make([]byte, size) + _, err = io.ReadFull(rdr, b) + if err != nil { + log.Printf(err.Error()) + goto exit + } + case bytes.Equal(params[0], []byte("RDY")): + rdy, _ := strconv.Atoi(string(params[1])) + rdyCount = rdy + case bytes.Equal(params[0], []byte("FIN")): + case bytes.Equal(params[0], []byte("REQ")): + } + readDoneChan <- 1 + case <-scriptTime: + inst := n.script[idx] + if bytes.Equal(inst.body, []byte("exit")) { + goto exit + } + if inst.frameType == nsq.FrameTypeMessage { + if rdyCount == 0 { + scriptTime = time.After(n.script[idx+1].delay) + continue + } + rdyCount-- + } + _, err := conn.Write(framedResponse(inst.frameType, inst.body)) + if err != nil { + log.Printf(err.Error()) + goto exit + } + scriptTime = time.After(n.script[idx+1].delay) + idx++ + } + } + +exit: + n.tcpListener.Close() + conn.Close() +} + +func framedResponse(frameType int32, data []byte) []byte { + var w bytes.Buffer + + beBuf := make([]byte, 4) + size := uint32(len(data)) + 4 + + binary.BigEndian.PutUint32(beBuf, size) + _, err := w.Write(beBuf) + if err != nil { + return nil + } + + binary.BigEndian.PutUint32(beBuf, uint32(frameType)) + _, err = w.Write(beBuf) + if err != nil { + return nil + } + + w.Write(data) + return w.Bytes() +} + +func frameMessage(m *nsq.Message) []byte { + var b bytes.Buffer + m.WriteTo(&b) + return b.Bytes() +} From f62c493c7717428e832194c3a077253c3a496d70 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 10 Jun 2016 17:18:38 +0100 Subject: [PATCH 10/11] Recover from prometheus multiple handler panic closes #1339 --- CHANGELOG.md | 1 + .../prometheus_client/prometheus_client.go | 44 +++++++++++-------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ce2a883e1..f614f4422 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ should now look like: - [#1399](https://github.com/influxdata/telegraf/issues/1399): Add `read_repairs` statistics to riak plugin. - [#1405](https://github.com/influxdata/telegraf/issues/1405): Fix memory/connection leak in prometheus input plugin. - [#1378](https://github.com/influxdata/telegraf/issues/1378): Trim BOM from config file for Windows support. +- [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload. ## v1.0 beta 2 [2016-06-21] diff --git a/plugins/outputs/prometheus_client/prometheus_client.go b/plugins/outputs/prometheus_client/prometheus_client.go index d5e3f1ced..804ae1fad 100644 --- a/plugins/outputs/prometheus_client/prometheus_client.go +++ b/plugins/outputs/prometheus_client/prometheus_client.go @@ -25,8 +25,7 @@ var ( ) type PrometheusClient struct { - Listen string - metrics map[string]*prometheus.UntypedVec + Listen string } var sampleConfig = ` @@ -35,6 +34,14 @@ var sampleConfig = ` ` func (p *PrometheusClient) Start() error { + defer func() { + if r := recover(); r != nil { + // recovering from panic here because there is no way to stop a + // running http go server except by a kill signal. Since the server + // does not stop on SIGHUP, Start() will panic when the process + // is reloaded. + } + }() if p.Listen == "" { p.Listen = "localhost:9126" } @@ -44,7 +51,6 @@ func (p *PrometheusClient) Start() error { Addr: p.Listen, } - p.metrics = make(map[string]*prometheus.UntypedVec) go server.ListenAndServe() return nil } @@ -118,24 +124,26 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { continue } - // Create a new metric if it hasn't been created yet. - if _, ok := p.metrics[mname]; !ok { - p.metrics[mname] = prometheus.NewUntypedVec( - prometheus.UntypedOpts{ - Name: mname, - Help: "Telegraf collected metric", - }, - labels, - ) - if err := prometheus.Register(p.metrics[mname]); err != nil { - log.Printf("prometheus_client: Metric failed to register with prometheus, %s", err) - continue - } + mVec := prometheus.NewUntypedVec( + prometheus.UntypedOpts{ + Name: mname, + Help: "Telegraf collected metric", + }, + labels, + ) + collector, err := prometheus.RegisterOrGet(mVec) + if err != nil { + log.Printf("prometheus_client: Metric failed to register with prometheus, %s", err) + continue + } + mVec, ok := collector.(*prometheus.UntypedVec) + if !ok { + continue } switch val := val.(type) { case int64: - m, err := p.metrics[mname].GetMetricWith(l) + m, err := mVec.GetMetricWith(l) if err != nil { log.Printf("ERROR Getting metric in Prometheus output, "+ "key: %s, labels: %v,\nerr: %s\n", @@ -144,7 +152,7 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { } m.Set(float64(val)) case float64: - m, err := p.metrics[mname].GetMetricWith(l) + m, err := mVec.GetMetricWith(l) if err != nil { log.Printf("ERROR Getting metric in Prometheus output, "+ "key: %s, labels: %v,\nerr: %s\n", From 755b2ec9535a619f67247697649d59176185183f Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 24 Jun 2016 08:47:31 +0100 Subject: [PATCH 11/11] fixup: BOM Trim -> TrimPrefix --- internal/config/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index b1be77d29..8f7821624 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -542,8 +542,8 @@ func (c *Config) LoadConfig(path string) error { // trimBOM trims the Byte-Order-Marks from the beginning of the file. // this is for Windows compatability only. // see https://github.com/influxdata/telegraf/issues/1378 -func trimBOM(fileBytes []byte) []byte { - return bytes.Trim(fileBytes, "\xef\xbb\xbf") +func trimBOM(f []byte) []byte { + return bytes.TrimPrefix(f, []byte("\xef\xbb\xbf")) } // parseFile loads a TOML configuration from a provided path and