From e1c3800cd98cfe92a4781cd604808bc7fb0f5b53 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 7 Jul 2016 12:15:47 +0200 Subject: [PATCH 01/19] Prometheus parser fix, parse headers properly closes #1458 --- CHANGELOG.md | 3 +- plugins/inputs/prometheus/parser.go | 117 ++++++++--------------- plugins/inputs/prometheus/parser_test.go | 22 ++--- plugins/inputs/prometheus/prometheus.go | 19 +--- 4 files changed, 55 insertions(+), 106 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f130fe60..0cb8d3349 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## v1.0 +## v1.0 [unreleased] ### Release Notes @@ -42,6 +42,7 @@ should now look like: - [#1405](https://github.com/influxdata/telegraf/issues/1405): Fix memory/connection leak in prometheus input plugin. - [#1378](https://github.com/influxdata/telegraf/issues/1378): Trim BOM from config file for Windows support. - [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload. +- [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix. ## v1.0 beta 2 [2016-06-21] diff --git a/plugins/inputs/prometheus/parser.go b/plugins/inputs/prometheus/parser.go index e8a7c0892..3c9ddc503 100644 --- a/plugins/inputs/prometheus/parser.go +++ b/plugins/inputs/prometheus/parser.go @@ -10,6 +10,7 @@ import ( "io" "math" "mime" + "net/http" "time" "github.com/influxdata/telegraf" @@ -19,17 +20,9 @@ import ( "github.com/prometheus/common/expfmt" ) -// PrometheusParser is an object for Parsing incoming metrics. -type PrometheusParser struct { - // PromFormat - PromFormat map[string]string - // DefaultTags will be added to every parsed metric - // DefaultTags map[string]string -} - // Parse returns a slice of Metrics from a text representation of a // metrics -func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) { +func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) { var metrics []telegraf.Metric var parser expfmt.TextParser // parse even if the buffer begins with a newline @@ -38,97 +31,71 @@ func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) { buffer := bytes.NewBuffer(buf) reader := bufio.NewReader(buffer) - // Get format - mediatype, params, err := mime.ParseMediaType(p.PromFormat["Content-Type"]) + mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type")) // Prepare output metricFamilies := make(map[string]*dto.MetricFamily) + if err == nil && mediatype == "application/vnd.google.protobuf" && params["encoding"] == "delimited" && params["proto"] == "io.prometheus.client.MetricFamily" { for { - metricFamily := &dto.MetricFamily{} - if _, err = pbutil.ReadDelimited(reader, metricFamily); err != nil { - if err == io.EOF { + mf := &dto.MetricFamily{} + if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil { + if ierr == io.EOF { break } - return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", err) + return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr) } - metricFamilies[metricFamily.GetName()] = metricFamily + metricFamilies[mf.GetName()] = mf } } else { metricFamilies, err = parser.TextToMetricFamilies(reader) if err != nil { return nil, fmt.Errorf("reading text format failed: %s", err) } - // read metrics - for metricName, mf := range metricFamilies { - for _, m := range mf.Metric { - // reading tags - tags := makeLabels(m) - /* - for key, value := range p.DefaultTags { - tags[key] = value - } - */ - // reading 
fields - fields := make(map[string]interface{}) - if mf.GetType() == dto.MetricType_SUMMARY { - // summary metric - fields = makeQuantiles(m) - fields["count"] = float64(m.GetSummary().GetSampleCount()) - fields["sum"] = float64(m.GetSummary().GetSampleSum()) - } else if mf.GetType() == dto.MetricType_HISTOGRAM { - // historgram metric - fields = makeBuckets(m) - fields["count"] = float64(m.GetHistogram().GetSampleCount()) - fields["sum"] = float64(m.GetHistogram().GetSampleSum()) + } + // read metrics + for metricName, mf := range metricFamilies { + for _, m := range mf.Metric { + // reading tags + tags := makeLabels(m) + // reading fields + fields := make(map[string]interface{}) + if mf.GetType() == dto.MetricType_SUMMARY { + // summary metric + fields = makeQuantiles(m) + fields["count"] = float64(m.GetSummary().GetSampleCount()) + fields["sum"] = float64(m.GetSummary().GetSampleSum()) + } else if mf.GetType() == dto.MetricType_HISTOGRAM { + // historgram metric + fields = makeBuckets(m) + fields["count"] = float64(m.GetHistogram().GetSampleCount()) + fields["sum"] = float64(m.GetHistogram().GetSampleSum()) + + } else { + // standard metric + fields = getNameAndValue(m) + } + // converting to telegraf metric + if len(fields) > 0 { + var t time.Time + if m.TimestampMs != nil && *m.TimestampMs > 0 { + t = time.Unix(0, *m.TimestampMs*1000000) } else { - // standard metric - fields = getNameAndValue(m) + t = time.Now() } - // converting to telegraf metric - if len(fields) > 0 { - var t time.Time - if m.TimestampMs != nil && *m.TimestampMs > 0 { - t = time.Unix(0, *m.TimestampMs*1000000) - } else { - t = time.Now() - } - metric, err := telegraf.NewMetric(metricName, tags, fields, t) - if err == nil { - metrics = append(metrics, metric) - } + metric, err := telegraf.NewMetric(metricName, tags, fields, t) + if err == nil { + metrics = append(metrics, metric) } } } } + return metrics, err } -// Parse one line -func (p *PrometheusParser) ParseLine(line string) (telegraf.Metric, error) { - metrics, err := p.Parse([]byte(line + "\n")) - - if err != nil { - return nil, err - } - - if len(metrics) < 1 { - return nil, fmt.Errorf( - "Can not parse the line: %s, for data format: prometheus", line) - } - - return metrics[0], nil -} - -/* -// Set default tags -func (p *PrometheusParser) SetDefaultTags(tags map[string]string) { - p.DefaultTags = tags -} -*/ - // Get Quantiles from summary metric func makeQuantiles(m *dto.Metric) map[string]interface{} { fields := make(map[string]interface{}) diff --git a/plugins/inputs/prometheus/parser_test.go b/plugins/inputs/prometheus/parser_test.go index 6259a4ef6..4f2a8516f 100644 --- a/plugins/inputs/prometheus/parser_test.go +++ b/plugins/inputs/prometheus/parser_test.go @@ -1,6 +1,7 @@ package prometheus import ( + "net/http" "testing" "time" @@ -101,10 +102,8 @@ cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 ` func TestParseValidPrometheus(t *testing.T) { - parser := PrometheusParser{} - // Gauge value - metrics, err := parser.Parse([]byte(validUniqueGauge)) + metrics, err := Parse([]byte(validUniqueGauge), http.Header{}) assert.NoError(t, err) assert.Len(t, metrics, 1) assert.Equal(t, "cadvisor_version_info", metrics[0].Name()) @@ -118,8 +117,7 @@ func TestParseValidPrometheus(t *testing.T) { }, metrics[0].Tags()) // Counter value - //parser.SetDefaultTags(map[string]string{"mytag": "mytagvalue"}) - metrics, err = parser.Parse([]byte(validUniqueCounter)) + metrics, err = Parse([]byte(validUniqueCounter), http.Header{}) assert.NoError(t, err) 
assert.Len(t, metrics, 1) assert.Equal(t, "get_token_fail_count", metrics[0].Name()) @@ -129,8 +127,8 @@ func TestParseValidPrometheus(t *testing.T) { assert.Equal(t, map[string]string{}, metrics[0].Tags()) // Summary data - //parser.SetDefaultTags(map[string]string{}) - metrics, err = parser.Parse([]byte(validUniqueSummary)) + //SetDefaultTags(map[string]string{}) + metrics, err = Parse([]byte(validUniqueSummary), http.Header{}) assert.NoError(t, err) assert.Len(t, metrics, 1) assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name()) @@ -144,7 +142,7 @@ func TestParseValidPrometheus(t *testing.T) { assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags()) // histogram data - metrics, err = parser.Parse([]byte(validUniqueHistogram)) + metrics, err = Parse([]byte(validUniqueHistogram), http.Header{}) assert.NoError(t, err) assert.Len(t, metrics, 1) assert.Equal(t, "apiserver_request_latencies", metrics[0].Name()) @@ -165,11 +163,3 @@ func TestParseValidPrometheus(t *testing.T) { metrics[0].Tags()) } - -func TestParseLineInvalidPrometheus(t *testing.T) { - parser := PrometheusParser{} - metric, err := parser.ParseLine(validUniqueLine) - assert.NotNil(t, err) - assert.Nil(t, metric) - -} diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 2eabcf92c..12f7fd38e 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -13,6 +13,8 @@ import ( "time" ) +const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3` + type Prometheus struct { Urls []string @@ -86,7 +88,7 @@ var client = &http.Client{ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { collectDate := time.Now() var req, err = http.NewRequest("GET", url, nil) - req.Header = make(http.Header) + req.Header.Add("Accept", acceptHeader) var token []byte var resp *http.Response @@ -129,20 +131,9 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { return fmt.Errorf("error reading body: %s", err) } - // Headers - headers := make(map[string]string) - for key, value := range headers { - headers[key] = value - } - - // Prepare Prometheus parser config - promparser := PrometheusParser{ - PromFormat: headers, - } - - metrics, err := promparser.Parse(body) + metrics, err := Parse(body, resp.Header) if err != nil { - return fmt.Errorf("error getting processing samples for %s: %s", + return fmt.Errorf("error reading metrics for %s: %s", url, err) } // Add (or not) collected metrics From c873937356cec0dda64007d05e420daab9004ccc Mon Sep 17 00:00:00 2001 From: Jack Zampolin Date: Sun, 10 Jul 2016 03:11:43 -0700 Subject: [PATCH 02/19] Add note about influxdb compatability (#1465) --- plugins/inputs/logparser/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugins/inputs/logparser/README.md b/plugins/inputs/logparser/README.md index 1ff50bddd..64e8909f5 100644 --- a/plugins/inputs/logparser/README.md +++ b/plugins/inputs/logparser/README.md @@ -32,6 +32,8 @@ regex patterns. ''' ``` +> **Note:** The InfluxDB log pattern in the default configuration only works for Influx versions 1.0.0-beta1 or higher. 
+ ## Grok Parser The grok parser uses a slightly modified version of logstash "grok" patterns, From d14e7536ab34e05e4bb003921a812dc2accb188f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20de=20Metz?= Date: Sun, 10 Jul 2016 12:12:33 +0200 Subject: [PATCH 03/19] Cleanup the list of plugins. (#1423) Github and Rollbar are now part of the webhooks plugin. --- README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/README.md b/README.md index 53e672534..8264be7f6 100644 --- a/README.md +++ b/README.md @@ -221,8 +221,6 @@ Telegraf can also collect metrics via the following service plugins: * [github](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/github) * [rollbar](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/rollbar) * [nsq_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nsq_consumer) -* [github_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/github_webhooks) -* [rollbar_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/rollbar_webhooks) We'll be adding support for many more over the coming months. Read on if you want to add support for another service or third-party API. From 5f0a63f554861e1ea8f39a6293f09d63b1da85d8 Mon Sep 17 00:00:00 2001 From: Vladimir S Date: Sun, 10 Jul 2016 15:17:53 +0300 Subject: [PATCH 04/19] fixes #1450 (#1472) --- CHANGELOG.md | 1 + plugins/inputs/system/disk.go | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cb8d3349..b988508ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ should now look like: ### Bugfixes +- [#1472](https://github.com/influxdata/telegraf/pull/1472): diskio input plugin: set 'skip_serial_number = true' by default to avoid high cardinality. - [#1426](https://github.com/influxdata/telegraf/pull/1426): nil metrics panic fix. - [#1384](https://github.com/influxdata/telegraf/pull/1384): Fix datarace in apache input plugin. - [#1399](https://github.com/influxdata/telegraf/issues/1399): Add `read_repairs` statistics to riak plugin. diff --git a/plugins/inputs/system/disk.go b/plugins/inputs/system/disk.go index 5784a7322..f79295294 100644 --- a/plugins/inputs/system/disk.go +++ b/plugins/inputs/system/disk.go @@ -92,8 +92,8 @@ var diskIoSampleConfig = ` ## disk partitions. ## Setting devices will restrict the stats to the specified devices. # devices = ["sda", "sdb"] - ## Uncomment the following line if you do not need disk serial numbers. - # skip_serial_number = true + ## Uncomment the following line if you need disk serial numbers. + # skip_serial_number = false ` func (_ *DiskIOStats) SampleConfig() string { @@ -151,6 +151,6 @@ func init() { }) inputs.Add("diskio", func() telegraf.Input { - return &DiskIOStats{ps: &systemPS{}} + return &DiskIOStats{ps: &systemPS{}, SkipSerialNumber: true} }) } From 6efe91ea9cece66e864b9e472f48811eaf61365a Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Sun, 10 Jul 2016 14:47:47 +0100 Subject: [PATCH 05/19] prometheus_client, implement Collector interface closes #1334 --- CHANGELOG.md | 1 + .../prometheus_client/prometheus_client.go | 70 ++++++++++--------- 2 files changed, 38 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b988508ae..9e4c9a968 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ should now look like: - [#1378](https://github.com/influxdata/telegraf/issues/1378): Trim BOM from config file for Windows support. 
- [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload. - [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix. +- [#1334](https://github.com/influxdata/telegraf/issues/1334): Prometheus output, metric refresh and caching fixes. ## v1.0 beta 2 [2016-06-21] diff --git a/plugins/outputs/prometheus_client/prometheus_client.go b/plugins/outputs/prometheus_client/prometheus_client.go index 804ae1fad..790784a2b 100644 --- a/plugins/outputs/prometheus_client/prometheus_client.go +++ b/plugins/outputs/prometheus_client/prometheus_client.go @@ -6,6 +6,7 @@ import ( "net/http" "regexp" "strings" + "sync" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs" @@ -26,6 +27,10 @@ var ( type PrometheusClient struct { Listen string + + metrics map[string]prometheus.Metric + + sync.Mutex } var sampleConfig = ` @@ -34,6 +39,7 @@ var sampleConfig = ` ` func (p *PrometheusClient) Start() error { + prometheus.MustRegister(p) defer func() { if r := recover(); r != nil { // recovering from panic here because there is no way to stop a @@ -78,7 +84,27 @@ func (p *PrometheusClient) Description() string { return "Configuration for the Prometheus client to spawn" } +// Implements prometheus.Collector +func (p *PrometheusClient) Describe(ch chan<- *prometheus.Desc) { + prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch) +} + +// Implements prometheus.Collector +func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) { + p.Lock() + defer p.Unlock() + + for _, m := range p.metrics { + ch <- m + } +} + func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { + p.Lock() + defer p.Unlock() + + p.metrics = make(map[string]prometheus.Metric) + if len(metrics) == 0 { return nil } @@ -124,45 +150,23 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { continue } - mVec := prometheus.NewUntypedVec( - prometheus.UntypedOpts{ - Name: mname, - Help: "Telegraf collected metric", - }, - labels, - ) - collector, err := prometheus.RegisterOrGet(mVec) - if err != nil { - log.Printf("prometheus_client: Metric failed to register with prometheus, %s", err) - continue - } - mVec, ok := collector.(*prometheus.UntypedVec) - if !ok { - continue - } - + desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l) + var metric prometheus.Metric + var err error switch val := val.(type) { case int64: - m, err := mVec.GetMetricWith(l) - if err != nil { - log.Printf("ERROR Getting metric in Prometheus output, "+ - "key: %s, labels: %v,\nerr: %s\n", - mname, l, err.Error()) - continue - } - m.Set(float64(val)) + metric, err = prometheus.NewConstMetric(desc, prometheus.UntypedValue, float64(val)) case float64: - m, err := mVec.GetMetricWith(l) - if err != nil { - log.Printf("ERROR Getting metric in Prometheus output, "+ - "key: %s, labels: %v,\nerr: %s\n", - mname, l, err.Error()) - continue - } - m.Set(val) + metric, err = prometheus.NewConstMetric(desc, prometheus.UntypedValue, val) default: continue } + if err != nil { + log.Printf("ERROR creating prometheus metric, "+ + "key: %s, labels: %v,\nerr: %s\n", + mname, l, err.Error()) + } + p.metrics[desc.String()] = metric } } return nil From bb4f18ca887bd4ed66c11a6c01f1768be41a5b22 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 14 Jul 2016 08:52:37 -0600 Subject: [PATCH 06/19] temp ci fix, aerospike changed their metrics see 
http://www.aerospike.com/docs/operations/upgrade/stats_to_3_9 TODO change aerospike input plugin to use official go client library. --- Makefile | 4 ++-- plugins/inputs/aerospike/aerospike_test.go | 13 ------------- 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/Makefile b/Makefile index 816c93cf1..6d4f8c35e 100644 --- a/Makefile +++ b/Makefile @@ -55,7 +55,7 @@ docker-run: docker run --name postgres -p "5432:5432" -d postgres docker run --name rabbitmq -p "15672:15672" -p "5672:5672" -d rabbitmq:3-management docker run --name redis -p "6379:6379" -d redis - docker run --name aerospike -p "3000:3000" -d aerospike + docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt docker run --name riemann -p "5555:5555" -d blalor/riemann @@ -68,7 +68,7 @@ docker-run-circle: -e ADVERTISED_PORT=9092 \ -p "2181:2181" -p "9092:9092" \ -d spotify/kafka - docker run --name aerospike -p "3000:3000" -d aerospike + docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt docker run --name riemann -p "5555:5555" -d blalor/riemann diff --git a/plugins/inputs/aerospike/aerospike_test.go b/plugins/inputs/aerospike/aerospike_test.go index 74b70eb1d..2717a15b9 100644 --- a/plugins/inputs/aerospike/aerospike_test.go +++ b/plugins/inputs/aerospike/aerospike_test.go @@ -22,19 +22,6 @@ func TestAerospikeStatistics(t *testing.T) { err := a.Gather(&acc) require.NoError(t, err) - - // Only use a few of the metrics - asMetrics := []string{ - "transactions", - "stat_write_errs", - "stat_read_reqs", - "stat_write_reqs", - } - - for _, metric := range asMetrics { - assert.True(t, acc.HasIntField("aerospike", metric), metric) - } - } func TestAerospikeMsgLenFromToBytes(t *testing.T) { From 7b550c11cb2aee6ec91bce50c32ebff41e25a737 Mon Sep 17 00:00:00 2001 From: Kostas Botsas Date: Thu, 14 Jul 2016 18:06:00 +0300 Subject: [PATCH 07/19] Documentation for load balancing on graphite output servers (#1469) * Added documentation for load balancing on graphite output servers * clarifications * updates1 * updates2 * updates3 --- etc/telegraf.conf | 2 ++ plugins/outputs/graphite/README.md | 2 ++ plugins/outputs/graphite/graphite.go | 2 ++ 3 files changed, 6 insertions(+) diff --git a/etc/telegraf.conf b/etc/telegraf.conf index c9011536a..10e949302 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -197,6 +197,8 @@ # # Configuration for Graphite server to send metrics to # [[outputs.graphite]] # ## TCP endpoint for your graphite instance. +# ## If multiple endpoints are configured, the output will be load balanced. +# ## Only one of the endpoints will be written to with each iteration. # servers = ["localhost:2003"] # ## Prefix metrics name # prefix = "" diff --git a/plugins/outputs/graphite/README.md b/plugins/outputs/graphite/README.md index 2de699dea..3e2369e21 100644 --- a/plugins/outputs/graphite/README.md +++ b/plugins/outputs/graphite/README.md @@ -9,6 +9,8 @@ via raw TCP. # Configuration for Graphite server to send metrics to [[outputs.graphite]] ## TCP endpoint for your graphite instance. + ## If multiple endpoints are configured, the output will be load balanced. + ## Only one of the endpoints will be written to with each iteration. 
servers = ["localhost:2003"] ## Prefix metrics name prefix = "" diff --git a/plugins/outputs/graphite/graphite.go b/plugins/outputs/graphite/graphite.go index 30aee0eb6..4e127ed7c 100644 --- a/plugins/outputs/graphite/graphite.go +++ b/plugins/outputs/graphite/graphite.go @@ -25,6 +25,8 @@ type Graphite struct { var sampleConfig = ` ## TCP endpoint for your graphite instance. + ## If multiple endpoints are configured, output will be load balanced. + ## Only one of the endpoints will be written to with each iteration. servers = ["localhost:2003"] ## Prefix metrics name prefix = "" From 69ab8a645c5aceddc48d3882c3db769071fe8ce0 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 12 Jul 2016 14:44:11 -0600 Subject: [PATCH 08/19] graphite output: set write deadline on TCP connection --- plugins/outputs/graphite/graphite.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/plugins/outputs/graphite/graphite.go b/plugins/outputs/graphite/graphite.go index 4e127ed7c..fb95aff83 100644 --- a/plugins/outputs/graphite/graphite.go +++ b/plugins/outputs/graphite/graphite.go @@ -2,7 +2,6 @@ package graphite import ( "errors" - "fmt" "log" "math/rand" "net" @@ -98,9 +97,12 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error { // Send data to a random server p := rand.Perm(len(g.conns)) for _, n := range p { - if _, e := fmt.Fprint(g.conns[n], graphitePoints); e != nil { + if g.Timeout > 0 { + g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second)) + } + if _, e := g.conns[n].Write([]byte(graphitePoints)); e != nil { // Error - log.Println("ERROR: " + err.Error()) + log.Println("ERROR: " + e.Error()) // Let's try the next one } else { // Success From 7c9b312cee6228c7e7af4e9fa4b86b179f99d444 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 12 Jul 2016 15:31:08 -0600 Subject: [PATCH 09/19] Make race detector build in CI --- plugins/serializers/graphite/graphite.go | 7 ++++--- scripts/circle-test.sh | 2 ++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/plugins/serializers/graphite/graphite.go b/plugins/serializers/graphite/graphite.go index 43e32c244..db114ce9d 100644 --- a/plugins/serializers/graphite/graphite.go +++ b/plugins/serializers/graphite/graphite.go @@ -55,8 +55,9 @@ func (s *GraphiteSerializer) SerializeBucketName( measurement string, tags map[string]string, ) string { - if s.Template == "" { - s.Template = DEFAULT_TEMPLATE + template := s.Template + if template == "" { + template = DEFAULT_TEMPLATE } tagsCopy := make(map[string]string) for k, v := range tags { @@ -64,7 +65,7 @@ func (s *GraphiteSerializer) SerializeBucketName( } var out []string - templateParts := strings.Split(s.Template, ".") + templateParts := strings.Split(template, ".") for _, templatePart := range templateParts { switch templatePart { case "measurement": diff --git a/scripts/circle-test.sh b/scripts/circle-test.sh index 2333b5b73..93bafe320 100755 --- a/scripts/circle-test.sh +++ b/scripts/circle-test.sh @@ -69,6 +69,8 @@ exit_if_fail telegraf -config $tmpdir/config.toml \ -test -input-filter cpu:mem cat $GOPATH/bin/telegraf | gzip > $CIRCLE_ARTIFACTS/telegraf.gz +go build -o telegraf-race -race -ldflags "-X main.version=${VERSION}-RACE" cmd/telegraf/telegraf.go +cat telegraf-race | gzip > $CIRCLE_ARTIFACTS/telegraf-race.gz eval "git describe --exact-match HEAD" if [ $? 
-eq 0 ]; then From 821d3fafa6562acce148b1e08c3c0b310b6f0639 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 12 Jul 2016 17:08:03 -0600 Subject: [PATCH 10/19] Refactor SerializeBucketName to be read-only for struct fields --- plugins/outputs/librato/librato.go | 3 +-- plugins/serializers/graphite/graphite.go | 20 +++++++------- plugins/serializers/graphite/graphite_test.go | 27 +++++++------------ 3 files changed, 21 insertions(+), 29 deletions(-) diff --git a/plugins/outputs/librato/librato.go b/plugins/outputs/librato/librato.go index 15d6adbb2..ccb2acd9a 100644 --- a/plugins/outputs/librato/librato.go +++ b/plugins/outputs/librato/librato.go @@ -153,8 +153,7 @@ func (l *Librato) Description() string { func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) { gauges := []*Gauge{} - serializer := graphite.GraphiteSerializer{Template: l.Template} - bucket := serializer.SerializeBucketName(m.Name(), m.Tags()) + bucket := graphite.SerializeBucketName(m.Name(), m.Tags(), l.Template, "") for fieldName, value := range m.Fields() { gauge := &Gauge{ Name: graphite.InsertField(bucket, fieldName), diff --git a/plugins/serializers/graphite/graphite.go b/plugins/serializers/graphite/graphite.go index db114ce9d..6e5c4e879 100644 --- a/plugins/serializers/graphite/graphite.go +++ b/plugins/serializers/graphite/graphite.go @@ -10,22 +10,23 @@ import ( const DEFAULT_TEMPLATE = "host.tags.measurement.field" -var fieldDeleter = strings.NewReplacer(".FIELDNAME", "", "FIELDNAME.", "") +var ( + fieldDeleter = strings.NewReplacer(".FIELDNAME", "", "FIELDNAME.", "") + sanitizedChars = strings.NewReplacer("/", "-", "@", "-", "*", "-", " ", "_", "..", ".") +) type GraphiteSerializer struct { Prefix string Template string } -var sanitizedChars = strings.NewReplacer("/", "-", "@", "-", "*", "-", " ", "_", "..", ".") - -func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) { +func (s GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) { out := []string{} // Convert UnixNano to Unix timestamps timestamp := metric.UnixNano() / 1000000000 - bucket := s.SerializeBucketName(metric.Name(), metric.Tags()) + bucket := SerializeBucketName(metric.Name(), metric.Tags(), s.Template, s.Prefix) if bucket == "" { return out, nil } @@ -51,11 +52,12 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) // FIELDNAME. It is up to the user to replace this. This is so that // SerializeBucketName can be called just once per measurement, rather than // once per field. See GraphiteSerializer.InsertField() function. -func (s *GraphiteSerializer) SerializeBucketName( +func SerializeBucketName( measurement string, tags map[string]string, + template string, + prefix string, ) string { - template := s.Template if template == "" { template = DEFAULT_TEMPLATE } @@ -97,10 +99,10 @@ func (s *GraphiteSerializer) SerializeBucketName( return "" } - if s.Prefix == "" { + if prefix == "" { return sanitizedChars.Replace(strings.Join(out, ".")) } - return sanitizedChars.Replace(s.Prefix + "." + strings.Join(out, ".")) + return sanitizedChars.Replace(prefix + "." 
+ strings.Join(out, ".")) } // InsertField takes the bucket string from SerializeBucketName and replaces the diff --git a/plugins/serializers/graphite/graphite_test.go b/plugins/serializers/graphite/graphite_test.go index 64c65d16b..50ba0e2e0 100644 --- a/plugins/serializers/graphite/graphite_test.go +++ b/plugins/serializers/graphite/graphite_test.go @@ -225,8 +225,7 @@ func TestSerializeBucketNameNoHost(t *testing.T) { m, err := telegraf.NewMetric("cpu", tags, fields, now) assert.NoError(t, err) - s := GraphiteSerializer{} - mS := s.SerializeBucketName(m.Name(), m.Tags()) + mS := SerializeBucketName(m.Name(), m.Tags(), "", "") expS := "cpu0.us-west-2.cpu.FIELDNAME" assert.Equal(t, expS, mS) @@ -240,8 +239,7 @@ func TestSerializeBucketNameHost(t *testing.T) { m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) assert.NoError(t, err) - s := GraphiteSerializer{} - mS := s.SerializeBucketName(m.Name(), m.Tags()) + mS := SerializeBucketName(m.Name(), m.Tags(), "", "") expS := "localhost.cpu0.us-west-2.cpu.FIELDNAME" assert.Equal(t, expS, mS) @@ -255,8 +253,7 @@ func TestSerializeBucketNamePrefix(t *testing.T) { m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) assert.NoError(t, err) - s := GraphiteSerializer{Prefix: "prefix"} - mS := s.SerializeBucketName(m.Name(), m.Tags()) + mS := SerializeBucketName(m.Name(), m.Tags(), "", "prefix") expS := "prefix.localhost.cpu0.us-west-2.cpu.FIELDNAME" assert.Equal(t, expS, mS) @@ -270,8 +267,7 @@ func TestTemplate1(t *testing.T) { m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) assert.NoError(t, err) - s := GraphiteSerializer{Template: template1} - mS := s.SerializeBucketName(m.Name(), m.Tags()) + mS := SerializeBucketName(m.Name(), m.Tags(), template1, "") expS := "cpu0.us-west-2.localhost.cpu.FIELDNAME" assert.Equal(t, expS, mS) @@ -285,8 +281,7 @@ func TestTemplate2(t *testing.T) { m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) assert.NoError(t, err) - s := GraphiteSerializer{Template: template2} - mS := s.SerializeBucketName(m.Name(), m.Tags()) + mS := SerializeBucketName(m.Name(), m.Tags(), template2, "") expS := "localhost.cpu.FIELDNAME" assert.Equal(t, expS, mS) @@ -300,8 +295,7 @@ func TestTemplate3(t *testing.T) { m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) assert.NoError(t, err) - s := GraphiteSerializer{Template: template3} - mS := s.SerializeBucketName(m.Name(), m.Tags()) + mS := SerializeBucketName(m.Name(), m.Tags(), template3, "") expS := "localhost.cpu0.us-west-2.FIELDNAME" assert.Equal(t, expS, mS) @@ -315,8 +309,7 @@ func TestTemplate4(t *testing.T) { m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) assert.NoError(t, err) - s := GraphiteSerializer{Template: template4} - mS := s.SerializeBucketName(m.Name(), m.Tags()) + mS := SerializeBucketName(m.Name(), m.Tags(), template4, "") expS := "localhost.cpu0.us-west-2.cpu" assert.Equal(t, expS, mS) @@ -330,8 +323,7 @@ func TestTemplate5(t *testing.T) { m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) assert.NoError(t, err) - s := GraphiteSerializer{Template: template5} - mS := s.SerializeBucketName(m.Name(), m.Tags()) + mS := SerializeBucketName(m.Name(), m.Tags(), template5, "") expS := "localhost.us-west-2.cpu0.cpu.FIELDNAME" assert.Equal(t, expS, mS) @@ -345,8 +337,7 @@ func TestTemplate6(t *testing.T) { m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) assert.NoError(t, err) - s := GraphiteSerializer{Template: template6} - mS := s.SerializeBucketName(m.Name(), m.Tags()) + mS := 
SerializeBucketName(m.Name(), m.Tags(), template6, "") expS := "localhost.cpu0.us-west-2.cpu.FIELDNAME" assert.Equal(t, expS, mS) From bfdd665435a1e7f987a0b2d00bfbf972012e7a92 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 13 Jul 2016 08:14:48 -0600 Subject: [PATCH 11/19] Copy metrics for each configured output This is for better thread-safety when running with multiple outputs, which can cause very odd panics at very high loads primarily this is to address #1432 closes #1432 --- CHANGELOG.md | 1 + Makefile | 4 ---- agent/agent.go | 24 +++++++++++++++++++-- plugins/inputs/tcp_listener/tcp_listener.go | 11 ++++++++-- plugins/inputs/udp_listener/udp_listener.go | 10 ++++++++- plugins/serializers/graphite/graphite.go | 2 +- 6 files changed, 42 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e4c9a968..d206a7d54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ should now look like: - [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload. - [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix. - [#1334](https://github.com/influxdata/telegraf/issues/1334): Prometheus output, metric refresh and caching fixes. +- [#1432](https://github.com/influxdata/telegraf/issues/1432): Panic fix for multiple graphite outputs under very high load. ## v1.0 beta 2 [2016-06-21] diff --git a/Makefile b/Makefile index 6d4f8c35e..ee96e10bd 100644 --- a/Makefile +++ b/Makefile @@ -25,10 +25,6 @@ build-for-docker: "-s -X main.version=$(VERSION)" \ ./cmd/telegraf/telegraf.go -# Build with race detector -dev: prepare - go build -race -ldflags "-X main.version=$(VERSION)" ./... - # run package script package: ./scripts/build.py --package --version="$(VERSION)" --platform=linux --arch=all --upload diff --git a/agent/agent.go b/agent/agent.go index d1d36186e..ae520b89e 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -268,13 +268,33 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) a.flush() case m := <-metricC: - for _, o := range a.Config.Outputs { - o.AddMetric(m) + for i, o := range a.Config.Outputs { + if i == len(a.Config.Outputs)-1 { + o.AddMetric(m) + } else { + o.AddMetric(copyMetric(m)) + } } } } } +func copyMetric(m telegraf.Metric) telegraf.Metric { + t := time.Time(m.Time()) + + tags := make(map[string]string) + fields := make(map[string]interface{}) + for k, v := range m.Tags() { + tags[k] = v + } + for k, v := range m.Fields() { + fields[k] = v + } + + out, _ := telegraf.NewMetric(m.Name(), tags, fields, t) + return out +} + // Run runs the agent daemon, gathering every Interval func (a *Agent) Run(shutdown chan struct{}) error { var wg sync.WaitGroup diff --git a/plugins/inputs/tcp_listener/tcp_listener.go b/plugins/inputs/tcp_listener/tcp_listener.go index 053fc927e..4688e008b 100644 --- a/plugins/inputs/tcp_listener/tcp_listener.go +++ b/plugins/inputs/tcp_listener/tcp_listener.go @@ -31,6 +31,8 @@ type TcpListener struct { accept chan bool // drops tracks the number of dropped metrics. drops int + // malformed tracks the number of malformed packets + malformed int // track the listener here so we can close it in Stop() listener *net.TCPListener @@ -45,6 +47,9 @@ var dropwarn = "ERROR: tcp_listener message queue full. " + "We have dropped %d messages so far. 
" + "You may want to increase allowed_pending_messages in the config\n" +var malformedwarn = "WARNING: tcp_listener has received %d malformed packets" + + " thus far." + const sampleConfig = ` ## Address and port to host TCP listener on service_address = ":8094" @@ -243,8 +248,10 @@ func (t *TcpListener) tcpParser() error { if err == nil { t.storeMetrics(metrics) } else { - log.Printf("Malformed packet: [%s], Error: %s\n", - string(packet), err) + t.malformed++ + if t.malformed == 1 || t.malformed%1000 == 0 { + log.Printf(malformedwarn, t.malformed) + } } } } diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go index a20a5583f..120ee50e5 100644 --- a/plugins/inputs/udp_listener/udp_listener.go +++ b/plugins/inputs/udp_listener/udp_listener.go @@ -27,6 +27,8 @@ type UdpListener struct { done chan struct{} // drops tracks the number of dropped metrics. drops int + // malformed tracks the number of malformed packets + malformed int parser parsers.Parser @@ -44,6 +46,9 @@ var dropwarn = "ERROR: udp_listener message queue full. " + "We have dropped %d messages so far. " + "You may want to increase allowed_pending_messages in the config\n" +var malformedwarn = "WARNING: udp_listener has received %d malformed packets" + + " thus far." + const sampleConfig = ` ## Address and port to host UDP listener on service_address = ":8092" @@ -152,7 +157,10 @@ func (u *UdpListener) udpParser() error { if err == nil { u.storeMetrics(metrics) } else { - log.Printf("Malformed packet: [%s], Error: %s\n", packet, err) + u.malformed++ + if u.malformed == 1 || u.malformed%1000 == 0 { + log.Printf(malformedwarn, u.malformed) + } } } } diff --git a/plugins/serializers/graphite/graphite.go b/plugins/serializers/graphite/graphite.go index 6e5c4e879..2cc4add56 100644 --- a/plugins/serializers/graphite/graphite.go +++ b/plugins/serializers/graphite/graphite.go @@ -20,7 +20,7 @@ type GraphiteSerializer struct { Template string } -func (s GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) { +func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) { out := []string{} // Convert UnixNano to Unix timestamps From 97d92bba67301c0e0758894cb7ce41b9774170f5 Mon Sep 17 00:00:00 2001 From: Andrei Burd Date: Thu, 14 Jul 2016 20:28:36 +0100 Subject: [PATCH 12/19] Redis input enhancement (#1387) master_last_io_seconds_ago added role tag renamed to replication_role --- CHANGELOG.md | 1 + plugins/inputs/redis/README.md | 2 ++ plugins/inputs/redis/redis.go | 5 +++-- plugins/inputs/redis/redis_test.go | 4 ++-- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d206a7d54..d62675803 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ should now look like: - [#1402](https://github.com/influxdata/telegraf/pull/1402): docker-machine/boot2docker no longer required for unit tests. - [#1350](https://github.com/influxdata/telegraf/pull/1350): cgroup input plugin. - [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD. 
+- [#1387](https://github.com/influxdata/telegraf/pull/1387): **Breaking Change** - Redis `role` tag renamed to `replication_role` to avoid global_tags override ### Bugfixes diff --git a/plugins/inputs/redis/README.md b/plugins/inputs/redis/README.md index 1cbaea0ca..51b596aa0 100644 --- a/plugins/inputs/redis/README.md +++ b/plugins/inputs/redis/README.md @@ -43,6 +43,7 @@ - latest_fork_usec - connected_slaves - master_repl_offset + - master_last_io_seconds_ago - repl_backlog_active - repl_backlog_size - repl_backlog_histlen @@ -57,6 +58,7 @@ - All measurements have the following tags: - port - server + - replication role ### Example Output: diff --git a/plugins/inputs/redis/redis.go b/plugins/inputs/redis/redis.go index 94f562471..76cbc89cb 100644 --- a/plugins/inputs/redis/redis.go +++ b/plugins/inputs/redis/redis.go @@ -66,6 +66,7 @@ var Tracking = map[string]string{ "latest_fork_usec": "latest_fork_usec", "connected_slaves": "connected_slaves", "master_repl_offset": "master_repl_offset", + "master_last_io_seconds_ago": "master_last_io_seconds_ago", "repl_backlog_active": "repl_backlog_active", "repl_backlog_size": "repl_backlog_size", "repl_backlog_histlen": "repl_backlog_histlen", @@ -74,7 +75,7 @@ var Tracking = map[string]string{ "used_cpu_user": "used_cpu_user", "used_cpu_sys_children": "used_cpu_sys_children", "used_cpu_user_children": "used_cpu_user_children", - "role": "role", + "role": "replication_role", } var ErrProtocolError = errors.New("redis protocol error") @@ -208,7 +209,7 @@ func gatherInfoOutput( } if name == "role" { - tags["role"] = val + tags["replication_role"] = val continue } diff --git a/plugins/inputs/redis/redis_test.go b/plugins/inputs/redis/redis_test.go index b12950ee4..2e2fc1e37 100644 --- a/plugins/inputs/redis/redis_test.go +++ b/plugins/inputs/redis/redis_test.go @@ -35,7 +35,7 @@ func TestRedis_ParseMetrics(t *testing.T) { err := gatherInfoOutput(rdr, &acc, tags) require.NoError(t, err) - tags = map[string]string{"host": "redis.net", "role": "master"} + tags = map[string]string{"host": "redis.net", "replication_role": "master"} fields := map[string]interface{}{ "uptime": uint64(238), "clients": uint64(1), @@ -71,7 +71,7 @@ func TestRedis_ParseMetrics(t *testing.T) { "used_cpu_user_children": float64(0.00), "keyspace_hitrate": float64(0.50), } - keyspaceTags := map[string]string{"host": "redis.net", "role": "master", "database": "db0"} + keyspaceTags := map[string]string{"host": "redis.net", "replication_role": "master", "database": "db0"} keyspaceFields := map[string]interface{}{ "avg_ttl": uint64(0), "expires": uint64(0), From 53f40063b31fd9ef3d92e7fc22e821d0f71ac46d Mon Sep 17 00:00:00 2001 From: Sebastian Borza Date: Thu, 14 Jul 2016 15:18:55 -0500 Subject: [PATCH 13/19] Moving cgroup path name to field from tag to reduce cardinality (#1457) adding assertContainsFields function to cgroup_test for custom validation --- plugins/inputs/cgroup/README.md | 5 +- plugins/inputs/cgroup/cgroup_linux.go | 5 +- plugins/inputs/cgroup/cgroup_test.go | 84 +++++++++++++++------------ 3 files changed, 53 insertions(+), 41 deletions(-) diff --git a/plugins/inputs/cgroup/README.md b/plugins/inputs/cgroup/README.md index ab06342bf..feb332dd9 100644 --- a/plugins/inputs/cgroup/README.md +++ b/plugins/inputs/cgroup/README.md @@ -33,8 +33,9 @@ KEY1 VAL1\n ### Tags: -All measurements have the following tags: - - path +Measurements don't have any specific tags unless you define them at the telegraf level (defaults). 
We +used to have the path listed as a tag, but to keep cardinality in check it's easier to move this +value to a field. Thanks @sebito91! ### Configuration: diff --git a/plugins/inputs/cgroup/cgroup_linux.go b/plugins/inputs/cgroup/cgroup_linux.go index e8ba6f881..ecaf8126d 100644 --- a/plugins/inputs/cgroup/cgroup_linux.go +++ b/plugins/inputs/cgroup/cgroup_linux.go @@ -56,10 +56,9 @@ func (g *CGroup) gatherDir(dir string, acc telegraf.Accumulator) error { return err } } + fields["path"] = dir - tags := map[string]string{"path": dir} - - acc.AddFields(metricName, fields, tags) + acc.AddFields(metricName, fields, nil) return nil } diff --git a/plugins/inputs/cgroup/cgroup_test.go b/plugins/inputs/cgroup/cgroup_test.go index 206b51f6d..ff9b8d7a8 100644 --- a/plugins/inputs/cgroup/cgroup_test.go +++ b/plugins/inputs/cgroup/cgroup_test.go @@ -3,10 +3,13 @@ package cgroup import ( + "fmt" "testing" "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "reflect" ) var cg1 = &CGroup{ @@ -21,15 +24,32 @@ var cg1 = &CGroup{ }, } +func assertContainsFields(a *testutil.Accumulator, t *testing.T, measurement string, fieldSet []map[string]interface{}) { + a.Lock() + defer a.Unlock() + + numEquals := 0 + for _, p := range a.Metrics { + if p.Measurement == measurement { + for _, fields := range fieldSet { + if reflect.DeepEqual(fields, p.Fields) { + numEquals++ + } + } + } + } + + if numEquals != len(fieldSet) { + assert.Fail(t, fmt.Sprintf("only %d of %d are equal", numEquals, len(fieldSet))) + } +} + func TestCgroupStatistics_1(t *testing.T) { var acc testutil.Accumulator err := cg1.Gather(&acc) require.NoError(t, err) - tags := map[string]string{ - "path": "testdata/memory", - } fields := map[string]interface{}{ "memory.stat.cache": 1739362304123123123, "memory.stat.rss": 1775325184, @@ -42,8 +62,9 @@ func TestCgroupStatistics_1(t *testing.T) { "memory.limit_in_bytes": 223372036854771712, "memory.use_hierarchy": "12-781", "notify_on_release": 0, + "path": "testdata/memory", } - acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) + assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields}) } // ====================================================================== @@ -59,16 +80,14 @@ func TestCgroupStatistics_2(t *testing.T) { err := cg2.Gather(&acc) require.NoError(t, err) - tags := map[string]string{ - "path": "testdata/cpu", - } fields := map[string]interface{}{ "cpuacct.usage_percpu.0": -1452543795404, "cpuacct.usage_percpu.1": 1376681271659, "cpuacct.usage_percpu.2": 1450950799997, "cpuacct.usage_percpu.3": -1473113374257, + "path": "testdata/cpu", } - acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) + assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields}) } // ====================================================================== @@ -84,18 +103,16 @@ func TestCgroupStatistics_3(t *testing.T) { err := cg3.Gather(&acc) require.NoError(t, err) - tags := map[string]string{ - "path": "testdata/memory/group_1", - } fields := map[string]interface{}{ "memory.limit_in_bytes": 223372036854771712, + "path": "testdata/memory/group_1", } - acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) - tags = map[string]string{ - "path": "testdata/memory/group_2", + fieldsTwo := map[string]interface{}{ + "memory.limit_in_bytes": 223372036854771712, + "path": "testdata/memory/group_2", } - acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) + assertContainsFields(&acc, t, "cgroup", 
[]map[string]interface{}{fields, fieldsTwo}) } // ====================================================================== @@ -111,23 +128,22 @@ func TestCgroupStatistics_4(t *testing.T) { err := cg4.Gather(&acc) require.NoError(t, err) - tags := map[string]string{ - "path": "testdata/memory/group_1/group_1_1", - } fields := map[string]interface{}{ "memory.limit_in_bytes": 223372036854771712, + "path": "testdata/memory/group_1/group_1_1", } - acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) - tags = map[string]string{ - "path": "testdata/memory/group_1/group_1_2", + fieldsTwo := map[string]interface{}{ + "memory.limit_in_bytes": 223372036854771712, + "path": "testdata/memory/group_1/group_1_2", } - acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) - tags = map[string]string{ - "path": "testdata/memory/group_2", + fieldsThree := map[string]interface{}{ + "memory.limit_in_bytes": 223372036854771712, + "path": "testdata/memory/group_2", } - acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) + + assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields, fieldsTwo, fieldsThree}) } // ====================================================================== @@ -143,18 +159,16 @@ func TestCgroupStatistics_5(t *testing.T) { err := cg5.Gather(&acc) require.NoError(t, err) - tags := map[string]string{ - "path": "testdata/memory/group_1/group_1_1", - } fields := map[string]interface{}{ "memory.limit_in_bytes": 223372036854771712, + "path": "testdata/memory/group_1/group_1_1", } - acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) - tags = map[string]string{ - "path": "testdata/memory/group_2/group_1_1", + fieldsTwo := map[string]interface{}{ + "memory.limit_in_bytes": 223372036854771712, + "path": "testdata/memory/group_2/group_1_1", } - acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) + assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields, fieldsTwo}) } // ====================================================================== @@ -170,13 +184,11 @@ func TestCgroupStatistics_6(t *testing.T) { err := cg6.Gather(&acc) require.NoError(t, err) - tags := map[string]string{ - "path": "testdata/memory", - } fields := map[string]interface{}{ "memory.usage_in_bytes": 3513667584, "memory.use_hierarchy": "12-781", "memory.kmem.limit_in_bytes": 9223372036854771712, + "path": "testdata/memory", } - acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) + assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields}) } From 4651ab88ad45b55162b51091f9bfe073ce369e37 Mon Sep 17 00:00:00 2001 From: Shashank Sahni Date: Fri, 1 Jul 2016 13:31:14 -0700 Subject: [PATCH 14/19] Fetching galera status metrics in MySQL These are useful for Percona Xtradb cluster. closes #1437 --- CHANGELOG.md | 1 + plugins/inputs/mysql/mysql.go | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d62675803..b1daa60ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ should now look like: - [#1350](https://github.com/influxdata/telegraf/pull/1350): cgroup input plugin. - [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD. 
- [#1387](https://github.com/influxdata/telegraf/pull/1387): **Breaking Change** - Redis `role` tag renamed to `replication_role` to avoid global_tags override +- [#1437](https://github.com/influxdata/telegraf/pull/1437): Fetching Galera status metrics in MySQL ### Bugfixes diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go index b8ff3945a..5011e82b9 100644 --- a/plugins/inputs/mysql/mysql.go +++ b/plugins/inputs/mysql/mysql.go @@ -306,6 +306,10 @@ var mappings = []*mapping{ onServer: "Threadpool_", inExport: "threadpool_", }, + { + onServer: "wsrep_", + inExport: "wsrep_", + }, } var ( From 21add2c79995eb2297f020abb2d59872c7c3047e Mon Sep 17 00:00:00 2001 From: Joel Meador Date: Tue, 21 Jun 2016 16:28:31 -0400 Subject: [PATCH 15/19] instrumental plugin, rewrite connection retries closes #1412 separate hello and authenticate functions, force connection close at end of write cycle so we don't hold open idle connections, which has the benefit of mostly removing the chance of getting hopelessly connection lost bump instrumental agent version fix test to deal with better better connect/reconnect logic and changed ident & auth handshake Update CHANGELOG.md correct URL from instrumental fork to origin and put the change in the correct part of the file go fmt undo split hello and auth commands, to reduce roundtrips --- CHANGELOG.md | 1 + plugins/outputs/instrumental/instrumental.go | 14 +++++++++++--- plugins/outputs/instrumental/instrumental_test.go | 10 ++-------- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b1daa60ac..da4cbf5cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ should now look like: - [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix. - [#1334](https://github.com/influxdata/telegraf/issues/1334): Prometheus output, metric refresh and caching fixes. - [#1432](https://github.com/influxdata/telegraf/issues/1432): Panic fix for multiple graphite outputs under very high load. 
+- [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior ## v1.0 beta 2 [2016-06-21] diff --git a/plugins/outputs/instrumental/instrumental.go b/plugins/outputs/instrumental/instrumental.go index 461ba9d9e..2fcc28cc0 100644 --- a/plugins/outputs/instrumental/instrumental.go +++ b/plugins/outputs/instrumental/instrumental.go @@ -28,8 +28,10 @@ type Instrumental struct { } const ( - DefaultHost = "collector.instrumentalapp.com" - AuthFormat = "hello version go/telegraf/1.0\nauthenticate %s\n" + DefaultHost = "collector.instrumentalapp.com" + HelloMessage = "hello version go/telegraf/1.1\n" + AuthFormat = "authenticate %s\n" + HandshakeFormat = HelloMessage + AuthFormat ) var ( @@ -52,6 +54,7 @@ var sampleConfig = ` func (i *Instrumental) Connect() error { connection, err := net.DialTimeout("tcp", i.Host+":8000", i.Timeout.Duration) + if err != nil { i.conn = nil return err @@ -151,6 +154,11 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error { return err } + // force the connection closed after sending data + // to deal with various disconnection scenarios and eschew holding + // open idle connections en masse + i.Close() + return nil } @@ -163,7 +171,7 @@ func (i *Instrumental) SampleConfig() string { } func (i *Instrumental) authenticate(conn net.Conn) error { - _, err := fmt.Fprintf(conn, AuthFormat, i.ApiToken) + _, err := fmt.Fprintf(conn, HandshakeFormat, i.ApiToken) if err != nil { return err } diff --git a/plugins/outputs/instrumental/instrumental_test.go b/plugins/outputs/instrumental/instrumental_test.go index ceb53bac6..9708a2590 100644 --- a/plugins/outputs/instrumental/instrumental_test.go +++ b/plugins/outputs/instrumental/instrumental_test.go @@ -24,7 +24,6 @@ func TestWrite(t *testing.T) { ApiToken: "abc123token", Prefix: "my.prefix", } - i.Connect() // Default to gauge m1, _ := telegraf.NewMetric( @@ -40,10 +39,8 @@ func TestWrite(t *testing.T) { time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), ) - // Simulate a connection close and reconnect. 
metrics := []telegraf.Metric{m1, m2} i.Write(metrics) - i.Close() // Counter and Histogram are increments m3, _ := telegraf.NewMetric( @@ -70,7 +67,6 @@ func TestWrite(t *testing.T) { i.Write(metrics) wg.Wait() - i.Close() } func TCPServer(t *testing.T, wg *sync.WaitGroup) { @@ -82,10 +78,9 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) { tp := textproto.NewReader(reader) hello, _ := tp.ReadLine() - assert.Equal(t, "hello version go/telegraf/1.0", hello) + assert.Equal(t, "hello version go/telegraf/1.1", hello) auth, _ := tp.ReadLine() assert.Equal(t, "authenticate abc123token", auth) - conn.Write([]byte("ok\nok\n")) data1, _ := tp.ReadLine() @@ -99,10 +94,9 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) { tp = textproto.NewReader(reader) hello, _ = tp.ReadLine() - assert.Equal(t, "hello version go/telegraf/1.0", hello) + assert.Equal(t, "hello version go/telegraf/1.1", hello) auth, _ = tp.ReadLine() assert.Equal(t, "authenticate abc123token", auth) - conn.Write([]byte("ok\nok\n")) data3, _ := tp.ReadLine() From d5e743934380aa3e95e96b0e2a1b7f0c2b4fda4e Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 13 Jul 2016 18:49:17 -0600 Subject: [PATCH 16/19] procstat plugin: store PID as a field closes #1460 --- CHANGELOG.md | 1 + plugins/inputs/procstat/procstat.go | 6 +----- plugins/inputs/procstat/spec_processor.go | 5 ++++- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index da4cbf5cc..eda9f2f63 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ should now look like: - [#1334](https://github.com/influxdata/telegraf/issues/1334): Prometheus output, metric refresh and caching fixes. - [#1432](https://github.com/influxdata/telegraf/issues/1432): Panic fix for multiple graphite outputs under very high load. - [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior +- [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues. 
## v1.0 beta 2 [2016-06-21] diff --git a/plugins/inputs/procstat/procstat.go b/plugins/inputs/procstat/procstat.go index 3b9f0f76c..358dc4c0f 100644 --- a/plugins/inputs/procstat/procstat.go +++ b/plugins/inputs/procstat/procstat.go @@ -70,7 +70,7 @@ func (p *Procstat) Gather(acc telegraf.Accumulator) error { p.Exe, p.PidFile, p.Pattern, p.User, err.Error()) } else { for pid, proc := range p.pidmap { - p := NewSpecProcessor(p.ProcessName, p.Prefix, acc, proc, p.tagmap[pid]) + p := NewSpecProcessor(p.ProcessName, p.Prefix, pid, acc, proc, p.tagmap[pid]) p.pushMetrics() } } @@ -140,7 +140,6 @@ func (p *Procstat) pidsFromFile() ([]int32, error) { out = append(out, int32(pid)) p.tagmap[int32(pid)] = map[string]string{ "pidfile": p.PidFile, - "pid": strings.TrimSpace(string(pidString)), } } } @@ -165,7 +164,6 @@ func (p *Procstat) pidsFromExe() ([]int32, error) { out = append(out, int32(ipid)) p.tagmap[int32(ipid)] = map[string]string{ "exe": p.Exe, - "pid": pid, } } else { outerr = err @@ -193,7 +191,6 @@ func (p *Procstat) pidsFromPattern() ([]int32, error) { out = append(out, int32(ipid)) p.tagmap[int32(ipid)] = map[string]string{ "pattern": p.Pattern, - "pid": pid, } } else { outerr = err @@ -221,7 +218,6 @@ func (p *Procstat) pidsFromUser() ([]int32, error) { out = append(out, int32(ipid)) p.tagmap[int32(ipid)] = map[string]string{ "user": p.User, - "pid": pid, } } else { outerr = err diff --git a/plugins/inputs/procstat/spec_processor.go b/plugins/inputs/procstat/spec_processor.go index 0e73b60e9..3789e99d0 100644 --- a/plugins/inputs/procstat/spec_processor.go +++ b/plugins/inputs/procstat/spec_processor.go @@ -10,6 +10,7 @@ import ( type SpecProcessor struct { Prefix string + pid int32 tags map[string]string fields map[string]interface{} acc telegraf.Accumulator @@ -19,6 +20,7 @@ type SpecProcessor struct { func NewSpecProcessor( processName string, prefix string, + pid int32, acc telegraf.Accumulator, p *process.Process, tags map[string]string, @@ -33,6 +35,7 @@ func NewSpecProcessor( } return &SpecProcessor{ Prefix: prefix, + pid: pid, tags: tags, fields: make(map[string]interface{}), acc: acc, @@ -45,7 +48,7 @@ func (p *SpecProcessor) pushMetrics() { if p.Prefix != "" { prefix = p.Prefix + "_" } - fields := map[string]interface{}{} + fields := map[string]interface{}{"pid": p.pid} numThreads, err := p.proc.NumThreads() if err == nil { From 207c5498e718af25768f5de0655cb786c45e9fc0 Mon Sep 17 00:00:00 2001 From: Pierre Fersing Date: Thu, 14 Jul 2016 23:53:05 +0200 Subject: [PATCH 17/19] Remove systemd Install alias (#1470) Alias is a list of additional names. Adding it's cannonical name cause systemctl enable telegraf to show a warning "Too many levels of symbolic links" --- scripts/post-install.sh | 4 ++++ scripts/telegraf.service | 1 - 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/scripts/post-install.sh b/scripts/post-install.sh index fb0b441e8..95045be1f 100644 --- a/scripts/post-install.sh +++ b/scripts/post-install.sh @@ -37,6 +37,10 @@ chmod 755 $LOG_DIR if [[ -L /etc/init.d/telegraf ]]; then rm -f /etc/init.d/telegraf fi +# Remove legacy symlink, if it exists +if [[ -L /etc/systemd/system/telegraf.service ]]; then + rm -f /etc/systemd/system/telegraf.service +fi # Add defaults file, if it doesn't exist if [[ ! 
-f /etc/default/telegraf ]]; then diff --git a/scripts/telegraf.service b/scripts/telegraf.service index a7824c9a7..81c9b5408 100644 --- a/scripts/telegraf.service +++ b/scripts/telegraf.service @@ -15,4 +15,3 @@ KillMode=control-group [Install] WantedBy=multi-user.target -Alias=telegraf.service From 300d9adbd027ff87f5120e0e917d9787f83081d5 Mon Sep 17 00:00:00 2001 From: tuier Date: Sat, 16 Jul 2016 19:19:21 +0100 Subject: [PATCH 18/19] Considere zookeeper's state as a tags (#1417) This change will send the state of zookeeper (leader|follower) as a tag and not a metrics That way it will be easier to search for filter per state --- plugins/inputs/zookeeper/README.md | 10 +++++++--- plugins/inputs/zookeeper/zookeeper.go | 23 ++++++++++++++++------- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/plugins/inputs/zookeeper/README.md b/plugins/inputs/zookeeper/README.md index fe7a8a4ad..bc7c17a4b 100644 --- a/plugins/inputs/zookeeper/README.md +++ b/plugins/inputs/zookeeper/README.md @@ -32,7 +32,7 @@ echo mntr | nc localhost 2181 Meta: - units: int64 -- tags: `server= port=` +- tags: `server= port= state=` Measurement names: - zookeeper_avg_latency @@ -55,8 +55,12 @@ Measurement names: Meta: - units: string -- tags: `server= port=` +- tags: `server= port= state=` Measurement names: - zookeeper_version -- zookeeper_server_state \ No newline at end of file + +### Tags: + +- All measurements have the following tags: + - diff --git a/plugins/inputs/zookeeper/zookeeper.go b/plugins/inputs/zookeeper/zookeeper.go index 54defc56f..c11b55f68 100644 --- a/plugins/inputs/zookeeper/zookeeper.go +++ b/plugins/inputs/zookeeper/zookeeper.go @@ -55,6 +55,7 @@ func (z *Zookeeper) Gather(acc telegraf.Accumulator) error { } func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error { + var zookeeper_state string _, _, err := net.SplitHostPort(address) if err != nil { address = address + ":2181" @@ -78,7 +79,6 @@ func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error if len(service) != 2 { return fmt.Errorf("Invalid service address: %s", address) } - tags := map[string]string{"server": service[0], "port": service[1]} fields := make(map[string]interface{}) for scanner.Scan() { @@ -92,15 +92,24 @@ func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error } measurement := strings.TrimPrefix(parts[1], "zk_") - sValue := string(parts[2]) - - iVal, err := strconv.ParseInt(sValue, 10, 64) - if err == nil { - fields[measurement] = iVal + if measurement == "server_state" { + zookeeper_state = parts[2] } else { - fields[measurement] = sValue + sValue := string(parts[2]) + + iVal, err := strconv.ParseInt(sValue, 10, 64) + if err == nil { + fields[measurement] = iVal + } else { + fields[measurement] = sValue + } } } + tags := map[string]string{ + "server": service[0], + "port": service[1], + "state": zookeeper_state, + } acc.AddFields("zookeeper", fields, tags) return nil From 704d9ad76c898c9f14c8ed7e33de416c8e4f1259 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 14 Jul 2016 23:12:32 -0600 Subject: [PATCH 19/19] Refactor aerospike plugin to use client lib --- CHANGELOG.md | 7 + Godeps | 2 + plugins/inputs/aerospike/aerospike.go | 357 +++++---------------- plugins/inputs/aerospike/aerospike_test.go | 97 ++---- 4 files changed, 104 insertions(+), 359 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eda9f2f63..d01567eba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ ### Release Notes +**Breaking Change**: 
Aerospike main server node measurements have been renamed +aerospike_node. Aerospike namespace measurements have been renamed to +aerospike_namespace. They will also now be tagged with the node_name +that they correspond to. This has been done to differentiate measurements +that pertain to node vs. namespace statistics. + **Breaking Change**: users of github_webhooks must change to the new `[[inputs.webhooks]]` plugin. @@ -35,6 +41,7 @@ should now look like: - [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD. - [#1387](https://github.com/influxdata/telegraf/pull/1387): **Breaking Change** - Redis `role` tag renamed to `replication_role` to avoid global_tags override - [#1437](https://github.com/influxdata/telegraf/pull/1437): Fetching Galera status metrics in MySQL +- [#1500](https://github.com/influxdata/telegraf/pull/1500): Aerospike plugin refactored to use official client lib. ### Bugfixes diff --git a/Godeps b/Godeps index f47a57806..1546bb627 100644 --- a/Godeps +++ b/Godeps @@ -1,5 +1,6 @@ github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9 github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc +github.com/aerospike/aerospike-client-go 45863b7fd8640dc12f7fdd397104d97e1986f25a github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687 github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857 github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4 @@ -50,6 +51,7 @@ github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2 github.com/wvanbergen/kafka 46f9a1cf3f670edec492029fadded9c2d9e18866 github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8 +github.com/yuin/gopher-lua bf3808abd44b1e55143a2d7f08571aaa80db1808 github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363 golang.org/x/crypto 5dc8cb4b8a8eb076cbb5a06bc3b8682c15bdbbd3 golang.org/x/net 6acef71eb69611914f7a30939ea9f6e194c78172 diff --git a/plugins/inputs/aerospike/aerospike.go b/plugins/inputs/aerospike/aerospike.go index cd2ebe25c..4bb652c0a 100644 --- a/plugins/inputs/aerospike/aerospike.go +++ b/plugins/inputs/aerospike/aerospike.go @@ -1,104 +1,19 @@ package aerospike import ( - "bytes" - "encoding/binary" - "fmt" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/inputs" "net" "strconv" "strings" "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" + "github.com/influxdata/telegraf/plugins/inputs" + + as "github.com/aerospike/aerospike-client-go" ) -const ( - MSG_HEADER_SIZE = 8 - MSG_TYPE = 1 // Info is 1 - MSG_VERSION = 2 -) - -var ( - STATISTICS_COMMAND = []byte("statistics\n") - NAMESPACES_COMMAND = []byte("namespaces\n") -) - -type aerospikeMessageHeader struct { - Version uint8 - Type uint8 - DataLen [6]byte -} - -type aerospikeMessage struct { - aerospikeMessageHeader - Data []byte -} - -// Taken from aerospike-client-go/types/message.go -func (msg *aerospikeMessage) Serialize() []byte { - msg.DataLen = msgLenToBytes(int64(len(msg.Data))) - buf := bytes.NewBuffer([]byte{}) - binary.Write(buf, binary.BigEndian, msg.aerospikeMessageHeader) - binary.Write(buf, binary.BigEndian, msg.Data[:]) - return buf.Bytes() -} - -type aerospikeInfoCommand struct { - msg *aerospikeMessage -} - -// Taken from aerospike-client-go/info.go -func (nfo *aerospikeInfoCommand) parseMultiResponse() (map[string]string, error) { - responses := 
make(map[string]string) - offset := int64(0) - begin := int64(0) - - dataLen := int64(len(nfo.msg.Data)) - - // Create reusable StringBuilder for performance. - for offset < dataLen { - b := nfo.msg.Data[offset] - - if b == '\t' { - name := nfo.msg.Data[begin:offset] - offset++ - begin = offset - - // Parse field value. - for offset < dataLen { - if nfo.msg.Data[offset] == '\n' { - break - } - offset++ - } - - if offset > begin { - value := nfo.msg.Data[begin:offset] - responses[string(name)] = string(value) - } else { - responses[string(name)] = "" - } - offset++ - begin = offset - } else if b == '\n' { - if offset > begin { - name := nfo.msg.Data[begin:offset] - responses[string(name)] = "" - } - offset++ - begin = offset - } else { - offset++ - } - } - - if offset > begin { - name := nfo.msg.Data[begin:offset] - responses[string(name)] = "" - } - return responses, nil -} - type Aerospike struct { Servers []string } @@ -115,7 +30,7 @@ func (a *Aerospike) SampleConfig() string { } func (a *Aerospike) Description() string { - return "Read stats from an aerospike server" + return "Read stats from aerospike server(s)" } func (a *Aerospike) Gather(acc telegraf.Accumulator) error { @@ -124,214 +39,90 @@ func (a *Aerospike) Gather(acc telegraf.Accumulator) error { } var wg sync.WaitGroup - - var outerr error - + errChan := errchan.New(len(a.Servers)) + wg.Add(len(a.Servers)) for _, server := range a.Servers { - wg.Add(1) - go func(server string) { + go func(serv string) { defer wg.Done() - outerr = a.gatherServer(server, acc) + errChan.C <- a.gatherServer(serv, acc) }(server) } wg.Wait() - return outerr + return errChan.Error() } -func (a *Aerospike) gatherServer(host string, acc telegraf.Accumulator) error { - aerospikeInfo, err := getMap(STATISTICS_COMMAND, host) +func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) error { + host, port, err := net.SplitHostPort(hostport) if err != nil { - return fmt.Errorf("Aerospike info failed: %s", err) + return err } - readAerospikeStats(aerospikeInfo, acc, host, "") - namespaces, err := getList(NAMESPACES_COMMAND, host) + + iport, err := strconv.Atoi(port) if err != nil { - return fmt.Errorf("Aerospike namespace list failed: %s", err) + iport = 3000 } - for ix := range namespaces { - nsInfo, err := getMap([]byte("namespace/"+namespaces[ix]+"\n"), host) - if err != nil { - return fmt.Errorf("Aerospike namespace '%s' query failed: %s", namespaces[ix], err) + + c, err := as.NewClient(host, iport) + if err != nil { + return err + } + defer c.Close() + + nodes := c.GetNodes() + for _, n := range nodes { + tags := map[string]string{ + "node_name": n.GetName(), + "aerospike_host": hostport, + } + fields := make(map[string]interface{}) + stats, err := as.RequestNodeStats(n) + if err != nil { + return err + } + for k, v := range stats { + if iv, err := strconv.ParseInt(v, 10, 64); err == nil { + fields[strings.Replace(k, "-", "_", -1)] = iv + } + } + acc.AddFields("aerospike_node", fields, tags, time.Now()) + + info, err := as.RequestNodeInfo(n, "namespaces") + if err != nil { + return err + } + namespaces := strings.Split(info["namespaces"], ";") + + for _, namespace := range namespaces { + nTags := copyTags(tags) + nTags["namespace"] = namespace + nFields := make(map[string]interface{}) + info, err := as.RequestNodeInfo(n, "namespace/"+namespace) + if err != nil { + continue + } + stats := strings.Split(info["namespace/"+namespace], ";") + for _, stat := range stats { + parts := strings.Split(stat, "=") + if len(parts) < 2 { + continue + 
} + if iv, err := strconv.ParseInt(parts[1], 10, 64); err == nil { + nFields[strings.Replace(parts[0], "-", "_", -1)] = iv + } + } + acc.AddFields("aerospike_namespace", nFields, nTags, time.Now()) } - readAerospikeStats(nsInfo, acc, host, namespaces[ix]) } return nil } -func getMap(key []byte, host string) (map[string]string, error) { - data, err := get(key, host) - if err != nil { - return nil, fmt.Errorf("Failed to get data: %s", err) +func copyTags(m map[string]string) map[string]string { + out := make(map[string]string) + for k, v := range m { + out[k] = v } - parsed, err := unmarshalMapInfo(data, string(key)) - if err != nil { - return nil, fmt.Errorf("Failed to unmarshal data: %s", err) - } - - return parsed, nil -} - -func getList(key []byte, host string) ([]string, error) { - data, err := get(key, host) - if err != nil { - return nil, fmt.Errorf("Failed to get data: %s", err) - } - parsed, err := unmarshalListInfo(data, string(key)) - if err != nil { - return nil, fmt.Errorf("Failed to unmarshal data: %s", err) - } - - return parsed, nil -} - -func get(key []byte, host string) (map[string]string, error) { - var err error - var data map[string]string - - asInfo := &aerospikeInfoCommand{ - msg: &aerospikeMessage{ - aerospikeMessageHeader: aerospikeMessageHeader{ - Version: uint8(MSG_VERSION), - Type: uint8(MSG_TYPE), - DataLen: msgLenToBytes(int64(len(key))), - }, - Data: key, - }, - } - - cmd := asInfo.msg.Serialize() - addr, err := net.ResolveTCPAddr("tcp", host) - if err != nil { - return data, fmt.Errorf("Lookup failed for '%s': %s", host, err) - } - - conn, err := net.DialTCP("tcp", nil, addr) - if err != nil { - return data, fmt.Errorf("Connection failed for '%s': %s", host, err) - } - defer conn.Close() - - _, err = conn.Write(cmd) - if err != nil { - return data, fmt.Errorf("Failed to send to '%s': %s", host, err) - } - - msgHeader := bytes.NewBuffer(make([]byte, MSG_HEADER_SIZE)) - _, err = readLenFromConn(conn, msgHeader.Bytes(), MSG_HEADER_SIZE) - if err != nil { - return data, fmt.Errorf("Failed to read header: %s", err) - } - err = binary.Read(msgHeader, binary.BigEndian, &asInfo.msg.aerospikeMessageHeader) - if err != nil { - return data, fmt.Errorf("Failed to unmarshal header: %s", err) - } - - msgLen := msgLenFromBytes(asInfo.msg.aerospikeMessageHeader.DataLen) - - if int64(len(asInfo.msg.Data)) != msgLen { - asInfo.msg.Data = make([]byte, msgLen) - } - - _, err = readLenFromConn(conn, asInfo.msg.Data, len(asInfo.msg.Data)) - if err != nil { - return data, fmt.Errorf("Failed to read from connection to '%s': %s", host, err) - } - - data, err = asInfo.parseMultiResponse() - if err != nil { - return data, fmt.Errorf("Failed to parse response from '%s': %s", host, err) - } - - return data, err -} - -func readAerospikeStats( - stats map[string]string, - acc telegraf.Accumulator, - host string, - namespace string, -) { - fields := make(map[string]interface{}) - tags := map[string]string{ - "aerospike_host": host, - "namespace": "_service", - } - - if namespace != "" { - tags["namespace"] = namespace - } - for key, value := range stats { - // We are going to ignore all string based keys - val, err := strconv.ParseInt(value, 10, 64) - if err == nil { - if strings.Contains(key, "-") { - key = strings.Replace(key, "-", "_", -1) - } - fields[key] = val - } - } - acc.AddFields("aerospike", fields, tags) -} - -func unmarshalMapInfo(infoMap map[string]string, key string) (map[string]string, error) { - key = strings.TrimSuffix(key, "\n") - res := map[string]string{} - - v, exists 
:= infoMap[key] - if !exists { - return res, fmt.Errorf("Key '%s' missing from info", key) - } - - values := strings.Split(v, ";") - for i := range values { - kv := strings.Split(values[i], "=") - if len(kv) > 1 { - res[kv[0]] = kv[1] - } - } - - return res, nil -} - -func unmarshalListInfo(infoMap map[string]string, key string) ([]string, error) { - key = strings.TrimSuffix(key, "\n") - - v, exists := infoMap[key] - if !exists { - return []string{}, fmt.Errorf("Key '%s' missing from info", key) - } - - values := strings.Split(v, ";") - return values, nil -} - -func readLenFromConn(c net.Conn, buffer []byte, length int) (total int, err error) { - var r int - for total < length { - r, err = c.Read(buffer[total:length]) - total += r - if err != nil { - break - } - } - return -} - -// Taken from aerospike-client-go/types/message.go -func msgLenToBytes(DataLen int64) [6]byte { - b := make([]byte, 8) - binary.BigEndian.PutUint64(b, uint64(DataLen)) - res := [6]byte{} - copy(res[:], b[2:]) - return res -} - -// Taken from aerospike-client-go/types/message.go -func msgLenFromBytes(buf [6]byte) int64 { - nbytes := append([]byte{0, 0}, buf[:]...) - DataLen := binary.BigEndian.Uint64(nbytes) - return int64(DataLen) + return out } func init() { diff --git a/plugins/inputs/aerospike/aerospike_test.go b/plugins/inputs/aerospike/aerospike_test.go index 2717a15b9..8463432f5 100644 --- a/plugins/inputs/aerospike/aerospike_test.go +++ b/plugins/inputs/aerospike/aerospike_test.go @@ -1,7 +1,6 @@ package aerospike import ( - "reflect" "testing" "github.com/influxdata/telegraf/testutil" @@ -22,84 +21,30 @@ func TestAerospikeStatistics(t *testing.T) { err := a.Gather(&acc) require.NoError(t, err) + + assert.True(t, acc.HasMeasurement("aerospike_node")) + assert.True(t, acc.HasMeasurement("aerospike_namespace")) + assert.True(t, acc.HasIntField("aerospike_node", "batch_error")) } -func TestAerospikeMsgLenFromToBytes(t *testing.T) { - var i int64 = 8 - assert.True(t, i == msgLenFromBytes(msgLenToBytes(i))) -} +func TestAerospikeStatisticsPartialErr(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + a := &Aerospike{ + Servers: []string{ + testutil.GetLocalHost() + ":3000", + testutil.GetLocalHost() + ":9999", + }, + } -func TestReadAerospikeStatsNoNamespace(t *testing.T) { - // Also test for re-writing var acc testutil.Accumulator - stats := map[string]string{ - "stat-write-errs": "12345", - "stat_read_reqs": "12345", - } - readAerospikeStats(stats, &acc, "host1", "") - fields := map[string]interface{}{ - "stat_write_errs": int64(12345), - "stat_read_reqs": int64(12345), - } - tags := map[string]string{ - "aerospike_host": "host1", - "namespace": "_service", - } - acc.AssertContainsTaggedFields(t, "aerospike", fields, tags) -} - -func TestReadAerospikeStatsNamespace(t *testing.T) { - var acc testutil.Accumulator - stats := map[string]string{ - "stat_write_errs": "12345", - "stat_read_reqs": "12345", - } - readAerospikeStats(stats, &acc, "host1", "test") - - fields := map[string]interface{}{ - "stat_write_errs": int64(12345), - "stat_read_reqs": int64(12345), - } - tags := map[string]string{ - "aerospike_host": "host1", - "namespace": "test", - } - acc.AssertContainsTaggedFields(t, "aerospike", fields, tags) -} - -func TestAerospikeUnmarshalList(t *testing.T) { - i := map[string]string{ - "test": "one;two;three", - } - - expected := []string{"one", "two", "three"} - - list, err := unmarshalListInfo(i, "test2") - assert.True(t, err != nil) - - list, err = 
unmarshalListInfo(i, "test") - assert.True(t, err == nil) - equal := true - for ix := range expected { - if list[ix] != expected[ix] { - equal = false - break - } - } - assert.True(t, equal) -} - -func TestAerospikeUnmarshalMap(t *testing.T) { - i := map[string]string{ - "test": "key1=value1;key2=value2", - } - - expected := map[string]string{ - "key1": "value1", - "key2": "value2", - } - m, err := unmarshalMapInfo(i, "test") - assert.True(t, err == nil) - assert.True(t, reflect.DeepEqual(m, expected)) + err := a.Gather(&acc) + require.Error(t, err) + + assert.True(t, acc.HasMeasurement("aerospike_node")) + assert.True(t, acc.HasMeasurement("aerospike_namespace")) + assert.True(t, acc.HasIntField("aerospike_node", "batch_error")) }
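
Editor's note (not part of the patch): a minimal standalone sketch of the gather flow introduced by the aerospike refactor above. It reuses only calls that appear in the patch itself (as.NewClient, GetNodes, RequestNodeStats, RequestNodeInfo, GetName); error handling is trimmed and values are printed rather than added to a Telegraf accumulator.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"

	as "github.com/aerospike/aerospike-client-go"
)

func main() {
	c, err := as.NewClient("localhost", 3000)
	if err != nil {
		fmt.Println("connect failed:", err)
		return
	}
	defer c.Close()

	for _, n := range c.GetNodes() {
		// Per-node statistics become the aerospike_node measurement,
		// tagged with the node name.
		stats, err := as.RequestNodeStats(n)
		if err != nil {
			continue
		}
		fmt.Println("node", n.GetName(), "reported", len(stats), "stats")

		// The "namespaces" info command lists namespaces; each one becomes
		// an aerospike_namespace measurement tagged with node_name and namespace.
		info, err := as.RequestNodeInfo(n, "namespaces")
		if err != nil {
			continue
		}
		for _, ns := range strings.Split(info["namespaces"], ";") {
			nsInfo, err := as.RequestNodeInfo(n, "namespace/"+ns)
			if err != nil {
				continue
			}
			for _, stat := range strings.Split(nsInfo["namespace/"+ns], ";") {
				kv := strings.Split(stat, "=")
				if len(kv) < 2 {
					continue
				}
				// Only integer-valued stats are kept, matching the plugin.
				if iv, err := strconv.ParseInt(kv[1], 10, 64); err == nil {
					fmt.Println(ns, strings.Replace(kv[0], "-", "_", -1), iv)
				}
			}
		}
	}
}
```
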