Compare commits

..

6 Commits

Author SHA1 Message Date
Daniel Nelson
1989287606 Use %q format when printing influxdb errors 2018-06-05 17:25:09 -07:00
Daniel Nelson
5f0cbd1255 Update changelog 2018-06-05 17:14:29 -07:00
Leszek Charkiewicz
3ef4dff4ec Add SSL/TLS support to Redis input (#4236) 2018-06-05 17:12:30 -07:00
Piotr Popieluch
dfe7b5eec2 Don't skip metrics during startup in aggregate phase (#4230) 2018-06-05 16:30:53 -07:00
Daniel Nelson
92a8f795f5 Set 1.6.4 release date 2018-06-05 12:11:15 -07:00
Daniel Nelson
b1d77ade55 Update master version to 1.8 2018-06-05 11:46:55 -07:00
45 changed files with 491 additions and 1113 deletions

View File

@@ -2,12 +2,15 @@
defaults: defaults:
defaults: &defaults defaults: &defaults
working_directory: '/go/src/github.com/influxdata/telegraf' working_directory: '/go/src/github.com/influxdata/telegraf'
go-1_8: &go-1_8
docker:
- image: 'circleci/golang:1.8.7'
go-1_9: &go-1_9 go-1_9: &go-1_9
docker: docker:
- image: 'circleci/golang:1.9.7' - image: 'circleci/golang:1.9.5'
go-1_10: &go-1_10 go-1_10: &go-1_10
docker: docker:
- image: 'circleci/golang:1.10.3' - image: 'circleci/golang:1.10.1'
version: 2 version: 2
jobs: jobs:
@@ -20,6 +23,12 @@ jobs:
root: '/go/src' root: '/go/src'
paths: paths:
- '*' - '*'
test-go-1.8:
<<: [ *defaults, *go-1_8 ]
steps:
- attach_workspace:
at: '/go/src'
- run: 'make test-ci'
test-go-1.9: test-go-1.9:
<<: [ *defaults, *go-1_9 ] <<: [ *defaults, *go-1_9 ]
steps: steps:
@@ -57,6 +66,9 @@ workflows:
build_and_release: build_and_release:
jobs: jobs:
- 'deps' - 'deps'
- 'test-go-1.8':
requires:
- 'deps'
- 'test-go-1.9': - 'test-go-1.9':
requires: requires:
- 'deps' - 'deps'
@@ -65,11 +77,15 @@ workflows:
- 'deps' - 'deps'
- 'release': - 'release':
requires: requires:
- 'test-go-1.8'
- 'test-go-1.9' - 'test-go-1.9'
- 'test-go-1.10' - 'test-go-1.10'
nightly: nightly:
jobs: jobs:
- 'deps' - 'deps'
- 'test-go-1.8':
requires:
- 'deps'
- 'test-go-1.9': - 'test-go-1.9':
requires: requires:
- 'deps' - 'deps'
@@ -78,6 +94,7 @@ workflows:
- 'deps' - 'deps'
- 'nightly': - 'nightly':
requires: requires:
- 'test-go-1.8'
- 'test-go-1.9' - 'test-go-1.9'
- 'test-go-1.10' - 'test-go-1.10'
triggers: triggers:

View File

@@ -1,17 +1,10 @@
## v1.7.1 [2018-07-03] ## v1.8 [unreleased]
### Bugfixes ### Features
- [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal. - [#4236](https://github.com/influxdata/telegraf/pull/4236): Add SSL/TLS support to redis input.
- [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser.
- [#4135](https://github.com/influxdata/telegraf/issues/4135): Fix postfix input handling multi-level queues.
- [#4334](https://github.com/influxdata/telegraf/pull/4334): Fix syslog timestamp parsing with single digit day of month.
- [#2910](https://github.com/influxdata/telegraf/issues/2910): Handle mysql input variations in the user_statistics collecting.
- [#4293](https://github.com/influxdata/telegraf/issues/4293): Fix minmax and basicstats aggregators to use uint64.
- [#4290](https://github.com/influxdata/telegraf/issues/4290): Document swap input plugin.
- [#4316](https://github.com/influxdata/telegraf/issues/4316): Fix incorrect precision being applied to metric in http_listener.
## v1.7 [2018-06-12] ## v1.7 [unreleased]
### Release Notes ### Release Notes
@@ -86,8 +79,6 @@
- [#2879](https://github.com/influxdata/telegraf/issues/2879): Fix wildcards and multi instance processes in win_perf_counters. - [#2879](https://github.com/influxdata/telegraf/issues/2879): Fix wildcards and multi instance processes in win_perf_counters.
- [#2468](https://github.com/influxdata/telegraf/issues/2468): Fix crash on 32-bit Windows in win_perf_counters. - [#2468](https://github.com/influxdata/telegraf/issues/2468): Fix crash on 32-bit Windows in win_perf_counters.
- [#4198](https://github.com/influxdata/telegraf/issues/4198): Fix win_perf_counters not collecting at every interval. - [#4198](https://github.com/influxdata/telegraf/issues/4198): Fix win_perf_counters not collecting at every interval.
- [#4227](https://github.com/influxdata/telegraf/issues/4227): Use same flags for all BSD family ping variants.
- [#4266](https://github.com/influxdata/telegraf/issues/4266): Remove tags with empty values from Wavefront output.
## v1.6.4 [2018-06-05] ## v1.6.4 [2018-06-05]

7
Godeps
View File

@@ -28,13 +28,13 @@ github.com/golang/snappy 7db9049039a047d955fe8c19b83c8ff5abd765c7
github.com/go-ole/go-ole be49f7c07711fcb603cff39e1de7c67926dc0ba7 github.com/go-ole/go-ole be49f7c07711fcb603cff39e1de7c67926dc0ba7
github.com/google/go-cmp f94e52cad91c65a63acc1e75d4be223ea22e99bc github.com/google/go-cmp f94e52cad91c65a63acc1e75d4be223ea22e99bc
github.com/gorilla/mux 53c1911da2b537f792e7cafcb446b05ffe33b996 github.com/gorilla/mux 53c1911da2b537f792e7cafcb446b05ffe33b996
github.com/go-redis/redis 73b70592cdaa9e6abdfcfbf97b4a90d80728c836 github.com/go-redis/redis 83fb42932f6145ce52df09860384a4653d2d332a
github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034
github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
github.com/hashicorp/consul 5174058f0d2bda63fa5198ab96c33d9a909c58ed github.com/hashicorp/consul 5174058f0d2bda63fa5198ab96c33d9a909c58ed
github.com/influxdata/go-syslog eecd51df3ad85464a2bab9b7d3a45bc1e299059e github.com/influxdata/go-syslog 84f3b60009444d298f97454feb1f20cf91d1fa6e
github.com/influxdata/tail c43482518d410361b6c383d7aebce33d0471d7bc github.com/influxdata/tail c43482518d410361b6c383d7aebce33d0471d7bc
github.com/influxdata/toml 2a2e3012f7cfbef64091cc79776311e65dfa211b github.com/influxdata/toml 5d1d907f22ead1cd47adde17ceec5bda9cacaf8f
github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec
github.com/fsnotify/fsnotify c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9 github.com/fsnotify/fsnotify c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9
github.com/jackc/pgx 63f58fd32edb5684b9e9f4cfaac847c6b42b3917 github.com/jackc/pgx 63f58fd32edb5684b9e9f4cfaac847c6b42b3917
@@ -51,6 +51,7 @@ github.com/multiplay/go-ts3 07477f49b8dfa3ada231afc7b7b17617d42afe8e
github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
github.com/nats-io/gnatsd 393bbb7c031433e68707c8810fda0bfcfbe6ab9b github.com/nats-io/gnatsd 393bbb7c031433e68707c8810fda0bfcfbe6ab9b
github.com/nats-io/go-nats ea9585611a4ab58a205b9b125ebd74c389a6b898 github.com/nats-io/go-nats ea9585611a4ab58a205b9b125ebd74c389a6b898
github.com/nats-io/nats ea9585611a4ab58a205b9b125ebd74c389a6b898
github.com/nats-io/nuid 289cccf02c178dc782430d534e3c1f5b72af807f github.com/nats-io/nuid 289cccf02c178dc782430d534e3c1f5b72af807f
github.com/nsqio/go-nsq eee57a3ac4174c55924125bb15eeeda8cffb6e6f github.com/nsqio/go-nsq eee57a3ac4174c55924125bb15eeeda8cffb6e6f
github.com/opencontainers/runc 89ab7f2ccc1e45ddf6485eaa802c35dcf321dfc8 github.com/opencontainers/runc 89ab7f2ccc1e45ddf6485eaa802c35dcf321dfc8

View File

@@ -40,7 +40,7 @@ Ansible role: https://github.com/rossmcdonald/telegraf
### From Source: ### From Source:
Telegraf requires golang version 1.9 or newer, the Makefile requires GNU make. Telegraf requires golang version 1.8+, the Makefile requires GNU make.
Dependencies are managed with [gdm](https://github.com/sparrc/gdm), Dependencies are managed with [gdm](https://github.com/sparrc/gdm),
which is installed by the Makefile if you don't have it already. which is installed by the Makefile if you don't have it already.

View File

@@ -362,24 +362,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
metricC := make(chan telegraf.Metric, 100) metricC := make(chan telegraf.Metric, 100)
aggC := make(chan telegraf.Metric, 100) aggC := make(chan telegraf.Metric, 100)
// Start all ServicePlugins
for _, input := range a.Config.Inputs {
input.SetDefaultTags(a.Config.Tags)
switch p := input.Input.(type) {
case telegraf.ServiceInput:
acc := NewAccumulator(input, metricC)
// Service input plugins should set their own precision of their
// metrics.
acc.SetPrecision(time.Nanosecond, 0)
if err := p.Start(acc); err != nil {
log.Printf("E! Service for input %s failed to start, exiting\n%s\n",
input.Name(), err.Error())
return err
}
defer p.Stop()
}
}
// Round collection to nearest interval by sleeping // Round collection to nearest interval by sleeping
if a.Config.Agent.RoundInterval { if a.Config.Agent.RoundInterval {
i := int64(a.Config.Agent.Interval.Duration) i := int64(a.Config.Agent.Interval.Duration)
@@ -419,6 +401,25 @@ func (a *Agent) Run(shutdown chan struct{}) error {
}(input, interval) }(input, interval)
} }
// Start all ServicePlugins inputs after all other
// plugins are loaded so that no metrics get dropped
for _, input := range a.Config.Inputs {
input.SetDefaultTags(a.Config.Tags)
switch p := input.Input.(type) {
case telegraf.ServiceInput:
acc := NewAccumulator(input, metricC)
// Service input plugins should set their own precision of their
// metrics.
acc.SetPrecision(time.Nanosecond, 0)
if err := p.Start(acc); err != nil {
log.Printf("E! Service for input %s failed to start, exiting\n%s\n",
input.Name(), err.Error())
return err
}
defer p.Stop()
}
}
wg.Wait() wg.Wait()
a.Close() a.Close()
return nil return nil

View File

@@ -58,7 +58,7 @@ var fService = flag.String("service", "",
var fRunAsConsole = flag.Bool("console", false, "run as console application (windows only)") var fRunAsConsole = flag.Bool("console", false, "run as console application (windows only)")
var ( var (
nextVersion = "1.7.0" nextVersion = "1.8.0"
version string version string
commit string commit string
branch string branch string
@@ -147,11 +147,11 @@ func reloadLoop(
shutdown := make(chan struct{}) shutdown := make(chan struct{})
signals := make(chan os.Signal) signals := make(chan os.Signal)
signal.Notify(signals, os.Interrupt, syscall.SIGHUP, syscall.SIGTERM) signal.Notify(signals, os.Interrupt, syscall.SIGHUP)
go func() { go func() {
select { select {
case sig := <-signals: case sig := <-signals:
if sig == os.Interrupt || sig == syscall.SIGTERM { if sig == os.Interrupt {
close(shutdown) close(shutdown)
} }
if sig == syscall.SIGHUP { if sig == syscall.SIGHUP {

View File

@@ -1,6 +1,7 @@
package models package models
import ( import (
"log"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@@ -153,6 +154,7 @@ func (r *RunningAggregator) Run(
m.Time().After(r.periodEnd.Add(truncation).Add(r.Config.Delay)) { m.Time().After(r.periodEnd.Add(truncation).Add(r.Config.Delay)) {
// the metric is outside the current aggregation period, so // the metric is outside the current aggregation period, so
// skip it. // skip it.
log.Printf("D! aggregator: metric \"%s\" is not in the current timewindow, skipping", m.Name())
continue continue
} }
r.add(m) r.add(m)

View File

@@ -17,7 +17,7 @@ type ClientConfig struct {
// Deprecated in 1.7; use TLS variables above // Deprecated in 1.7; use TLS variables above
SSLCA string `toml:"ssl_ca"` SSLCA string `toml:"ssl_ca"`
SSLCert string `toml:"ssl_cert"` SSLCert string `toml:"ssl_cert"`
SSLKey string `toml:"ssl_key"` SSLKey string `toml:"ssl_ca"`
} }
// ServerConfig represents the standard server TLS config. // ServerConfig represents the standard server TLS config.

View File

@@ -246,8 +246,6 @@ func convert(in interface{}) (float64, bool) {
return v, true return v, true
case int64: case int64:
return float64(v), true return float64(v), true
case uint64:
return float64(v), true
default: default:
return 0, false return 0, false
} }

View File

@@ -28,7 +28,6 @@ var m2, _ = metric.New("m1",
"c": float64(4), "c": float64(4),
"d": float64(6), "d": float64(6),
"e": float64(200), "e": float64(200),
"f": uint64(200),
"ignoreme": "string", "ignoreme": "string",
"andme": true, "andme": true,
}, },
@@ -82,10 +81,6 @@ func TestBasicStatsWithPeriod(t *testing.T) {
"e_max": float64(200), "e_max": float64(200),
"e_min": float64(200), "e_min": float64(200),
"e_mean": float64(200), "e_mean": float64(200),
"f_count": float64(1), //f
"f_max": float64(200),
"f_min": float64(200),
"f_mean": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -149,10 +144,6 @@ func TestBasicStatsDifferentPeriods(t *testing.T) {
"e_max": float64(200), "e_max": float64(200),
"e_min": float64(200), "e_min": float64(200),
"e_mean": float64(200), "e_mean": float64(200),
"f_count": float64(1), //f
"f_max": float64(200),
"f_min": float64(200),
"f_mean": float64(200),
} }
expectedTags = map[string]string{ expectedTags = map[string]string{
"foo": "bar", "foo": "bar",
@@ -178,7 +169,6 @@ func TestBasicStatsWithOnlyCount(t *testing.T) {
"c_count": float64(2), "c_count": float64(2),
"d_count": float64(2), "d_count": float64(2),
"e_count": float64(1), "e_count": float64(1),
"f_count": float64(1),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -204,7 +194,6 @@ func TestBasicStatsWithOnlyMin(t *testing.T) {
"c_min": float64(2), "c_min": float64(2),
"d_min": float64(2), "d_min": float64(2),
"e_min": float64(200), "e_min": float64(200),
"f_min": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -230,7 +219,6 @@ func TestBasicStatsWithOnlyMax(t *testing.T) {
"c_max": float64(4), "c_max": float64(4),
"d_max": float64(6), "d_max": float64(6),
"e_max": float64(200), "e_max": float64(200),
"f_max": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -256,7 +244,6 @@ func TestBasicStatsWithOnlyMean(t *testing.T) {
"c_mean": float64(3), "c_mean": float64(3),
"d_mean": float64(4), "d_mean": float64(4),
"e_mean": float64(200), "e_mean": float64(200),
"f_mean": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -282,7 +269,6 @@ func TestBasicStatsWithOnlySum(t *testing.T) {
"c_sum": float64(6), "c_sum": float64(6),
"d_sum": float64(8), "d_sum": float64(8),
"e_sum": float64(200), "e_sum": float64(200),
"f_sum": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -413,8 +399,6 @@ func TestBasicStatsWithMinAndMax(t *testing.T) {
"d_min": float64(2), "d_min": float64(2),
"e_max": float64(200), //e "e_max": float64(200), //e
"e_min": float64(200), "e_min": float64(200),
"f_max": float64(200), //f
"f_min": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -466,11 +450,6 @@ func TestBasicStatsWithAllStats(t *testing.T) {
"e_min": float64(200), "e_min": float64(200),
"e_mean": float64(200), "e_mean": float64(200),
"e_sum": float64(200), "e_sum": float64(200),
"f_count": float64(1), //f
"f_max": float64(200),
"f_min": float64(200),
"f_mean": float64(200),
"f_sum": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",

View File

@@ -107,8 +107,6 @@ func convert(in interface{}) (float64, bool) {
return v, true return v, true
case int64: case int64:
return float64(v), true return float64(v), true
case uint64:
return float64(v), true
default: default:
return 0, false return 0, false
} }

View File

@@ -38,7 +38,6 @@ var m2, _ = metric.New("m1",
"i": float64(1), "i": float64(1),
"j": float64(1), "j": float64(1),
"k": float64(200), "k": float64(200),
"l": uint64(200),
"ignoreme": "string", "ignoreme": "string",
"andme": true, "andme": true,
}, },
@@ -86,8 +85,6 @@ func TestMinMaxWithPeriod(t *testing.T) {
"j_min": float64(1), "j_min": float64(1),
"k_max": float64(200), "k_max": float64(200),
"k_min": float64(200), "k_min": float64(200),
"l_max": float64(200),
"l_min": float64(200),
} }
expectedTags := map[string]string{ expectedTags := map[string]string{
"foo": "bar", "foo": "bar",
@@ -157,8 +154,6 @@ func TestMinMaxDifferentPeriods(t *testing.T) {
"j_min": float64(1), "j_min": float64(1),
"k_max": float64(200), "k_max": float64(200),
"k_min": float64(200), "k_min": float64(200),
"l_max": float64(200),
"l_min": float64(200),
} }
expectedTags = map[string]string{ expectedTags = map[string]string{
"foo": "bar", "foo": "bar",

View File

@@ -343,9 +343,6 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
} }
func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error { func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
h.mu.Lock()
defer h.mu.Unlock()
h.handler.SetTimePrecision(getPrecisionMultiplier(precision)) h.handler.SetTimePrecision(getPrecisionMultiplier(precision))
h.handler.SetTimeFunc(func() time.Time { return t }) h.handler.SetTimeFunc(func() time.Time { return t })
metrics, err := h.parser.Parse(b) metrics, err := h.parser.Parse(b)

View File

@@ -293,7 +293,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
timestamp = time.Unix(0, iv) timestamp = time.Unix(0, iv)
} }
case SYSLOG_TIMESTAMP: case SYSLOG_TIMESTAMP:
ts, err := time.ParseInLocation(time.Stamp, v, p.loc) ts, err := time.ParseInLocation("Jan 02 15:04:05", v, p.loc)
if err == nil { if err == nil {
if ts.Year() == 0 { if ts.Year() == 0 {
ts = ts.AddDate(timestamp.Year(), 0, 0) ts = ts.AddDate(timestamp.Year(), 0, 0)

View File

@@ -971,39 +971,14 @@ func TestNewlineInPatterns(t *testing.T) {
require.NotNil(t, m) require.NotNil(t, m)
} }
func TestSyslogTimestamp(t *testing.T) { func TestSyslogTimestampParser(t *testing.T) {
tests := []struct { p := &Parser{
name string Patterns: []string{`%{SYSLOGTIMESTAMP:timestamp:ts-syslog} value=%{NUMBER:value:int}`},
line string timeFunc: func() time.Time { return time.Date(2018, time.April, 1, 0, 0, 0, 0, nil) },
expected time.Time
}{
{
name: "two digit day of month",
line: "Sep 25 09:01:55 value=42",
expected: time.Date(2018, time.September, 25, 9, 1, 55, 0, time.UTC),
},
{
name: "one digit day of month single space",
line: "Sep 2 09:01:55 value=42",
expected: time.Date(2018, time.September, 2, 9, 1, 55, 0, time.UTC),
},
{
name: "one digit day of month double space",
line: "Sep 2 09:01:55 value=42",
expected: time.Date(2018, time.September, 2, 9, 1, 55, 0, time.UTC),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &Parser{
Patterns: []string{`%{SYSLOGTIMESTAMP:timestamp:ts-syslog} value=%{NUMBER:value:int}`},
timeFunc: func() time.Time { return time.Date(2017, time.April, 1, 0, 0, 0, 0, time.UTC) },
}
require.NoError(t, p.Compile())
m, err := p.ParseLine(tt.line)
require.NoError(t, err)
require.NotNil(t, m)
require.Equal(t, tt.expected, m.Time())
})
} }
require.NoError(t, p.Compile())
m, err := p.ParseLine("Sep 25 09:01:55 value=42")
require.NoError(t, err)
require.NotNil(t, m)
require.Equal(t, 2018, m.Time().Year())
} }

View File

@@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"database/sql" "database/sql"
"fmt" "fmt"
"log"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@@ -79,7 +80,7 @@ var sampleConfig = `
## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST ## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST
gather_process_list = true gather_process_list = true
# #
## gather user statistics from INFORMATION_SCHEMA.USER_STATISTICS ## gather thread state counts from INFORMATION_SCHEMA.USER_STATISTICS
gather_user_statistics = true gather_user_statistics = true
# #
## gather auto_increment columns and max values from information schema ## gather auto_increment columns and max values from information schema
@@ -281,8 +282,9 @@ const (
GROUP BY command,state GROUP BY command,state
ORDER BY null` ORDER BY null`
infoSchemaUserStatisticsQuery = ` infoSchemaUserStatisticsQuery = `
SELECT * SELECT *,count(*)
FROM information_schema.user_statistics` FROM information_schema.user_statistics
GROUP BY user`
infoSchemaAutoIncQuery = ` infoSchemaAutoIncQuery = `
SELECT table_schema, table_name, column_name, auto_increment, SELECT table_schema, table_name, column_name, auto_increment,
CAST(pow(2, case data_type CAST(pow(2, case data_type
@@ -759,6 +761,103 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
if len(fields) > 0 { if len(fields) > 0 {
acc.AddFields("mysql", fields, tags) acc.AddFields("mysql", fields, tags)
} }
// gather connection metrics from processlist for each user
if m.GatherProcessList {
conn_rows, err := db.Query("SELECT user, sum(1) FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user")
if err != nil {
log.Printf("E! MySQL Error gathering process list: %s", err)
} else {
for conn_rows.Next() {
var user string
var connections int64
err = conn_rows.Scan(&user, &connections)
if err != nil {
return err
}
tags := map[string]string{"server": servtag, "user": user}
fields := make(map[string]interface{})
if err != nil {
return err
}
fields["connections"] = connections
acc.AddFields("mysql_users", fields, tags)
}
}
}
// gather connection metrics from user_statistics for each user
if m.GatherUserStatistics {
conn_rows, err := db.Query("select user, total_connections, concurrent_connections, connected_time, busy_time, cpu_time, bytes_received, bytes_sent, binlog_bytes_written, rows_fetched, rows_updated, table_rows_read, select_commands, update_commands, other_commands, commit_transactions, rollback_transactions, denied_connections, lost_connections, access_denied, empty_queries, total_ssl_connections FROM INFORMATION_SCHEMA.USER_STATISTICS GROUP BY user")
if err != nil {
log.Printf("E! MySQL Error gathering user stats: %s", err)
} else {
for conn_rows.Next() {
var user string
var total_connections int64
var concurrent_connections int64
var connected_time int64
var busy_time int64
var cpu_time int64
var bytes_received int64
var bytes_sent int64
var binlog_bytes_written int64
var rows_fetched int64
var rows_updated int64
var table_rows_read int64
var select_commands int64
var update_commands int64
var other_commands int64
var commit_transactions int64
var rollback_transactions int64
var denied_connections int64
var lost_connections int64
var access_denied int64
var empty_queries int64
var total_ssl_connections int64
err = conn_rows.Scan(&user, &total_connections, &concurrent_connections,
&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
&empty_queries, &total_ssl_connections,
)
if err != nil {
return err
}
tags := map[string]string{"server": servtag, "user": user}
fields := map[string]interface{}{
"total_connections": total_connections,
"concurrent_connections": concurrent_connections,
"connected_time": connected_time,
"busy_time": busy_time,
"cpu_time": cpu_time,
"bytes_received": bytes_received,
"bytes_sent": bytes_sent,
"binlog_bytes_written": binlog_bytes_written,
"rows_fetched": rows_fetched,
"rows_updated": rows_updated,
"table_rows_read": table_rows_read,
"select_commands": select_commands,
"update_commands": update_commands,
"other_commands": other_commands,
"commit_transactions": commit_transactions,
"rollback_transactions": rollback_transactions,
"denied_connections": denied_connections,
"lost_connections": lost_connections,
"access_denied": access_denied,
"empty_queries": empty_queries,
"total_ssl_connections": total_ssl_connections,
}
acc.AddFields("mysql_user_stats", fields, tags)
}
}
}
return nil return nil
} }
@@ -809,29 +908,6 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
} else { } else {
acc.AddFields("mysql_process_list", fields, tags) acc.AddFields("mysql_process_list", fields, tags)
} }
// get count of connections from each user
conn_rows, err := db.Query("SELECT user, sum(1) AS connections FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user")
if err != nil {
return err
}
for conn_rows.Next() {
var user string
var connections int64
err = conn_rows.Scan(&user, &connections)
if err != nil {
return err
}
tags := map[string]string{"server": servtag, "user": user}
fields := make(map[string]interface{})
fields["connections"] = connections
acc.AddFields("mysql_users", fields, tags)
}
return nil return nil
} }
@@ -841,190 +917,77 @@ func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegr
// run query // run query
rows, err := db.Query(infoSchemaUserStatisticsQuery) rows, err := db.Query(infoSchemaUserStatisticsQuery)
if err != nil { if err != nil {
// disable collecting if table is not found (mysql specific error)
// (suppresses repeat errors)
if strings.Contains(err.Error(), "nknown table 'user_statistics'") {
m.GatherUserStatistics = false
}
return err return err
} }
defer rows.Close() defer rows.Close()
var (
cols, err := columnsToLower(rows.Columns()) user string
if err != nil { total_connections int64
return err concurrent_connections int64
} connected_time int64
busy_time int64
read, err := getColSlice(len(cols)) cpu_time int64
if err != nil { bytes_received int64
return err bytes_sent int64
} binlog_bytes_written int64
rows_fetched int64
rows_updated int64
table_rows_read int64
select_commands int64
update_commands int64
other_commands int64
commit_transactions int64
rollback_transactions int64
denied_connections int64
lost_connections int64
access_denied int64
empty_queries int64
total_ssl_connections int64
count uint32
)
servtag := getDSNTag(serv) servtag := getDSNTag(serv)
for rows.Next() { for rows.Next() {
err = rows.Scan(read...) err = rows.Scan(&user, &total_connections, &concurrent_connections,
&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
&empty_queries, &total_ssl_connections, &count,
)
if err != nil { if err != nil {
return err return err
} }
tags := map[string]string{"server": servtag, "user": *read[0].(*string)} tags := map[string]string{"server": servtag, "user": user}
fields := map[string]interface{}{} fields := map[string]interface{}{
for i := range cols { "total_connections": total_connections,
if i == 0 { "concurrent_connections": concurrent_connections,
continue // skip "user" "connected_time": connected_time,
} "busy_time": busy_time,
switch v := read[i].(type) { "cpu_time": cpu_time,
case *int64: "bytes_received": bytes_received,
fields[cols[i]] = *v "bytes_sent": bytes_sent,
case *float64: "binlog_bytes_written": binlog_bytes_written,
fields[cols[i]] = *v "rows_fetched": rows_fetched,
case *string: "rows_updated": rows_updated,
fields[cols[i]] = *v "table_rows_read": table_rows_read,
default: "select_commands": select_commands,
return fmt.Errorf("Unknown column type - %T", v) "update_commands": update_commands,
} "other_commands": other_commands,
"commit_transactions": commit_transactions,
"rollback_transactions": rollback_transactions,
"denied_connections": denied_connections,
"lost_connections": lost_connections,
"access_denied": access_denied,
"empty_queries": empty_queries,
"total_ssl_connections": total_ssl_connections,
} }
acc.AddFields("mysql_user_stats", fields, tags) acc.AddFields("mysql_user_stats", fields, tags)
} }
return nil return nil
} }
// columnsToLower converts selected column names to lowercase.
func columnsToLower(s []string, e error) ([]string, error) {
if e != nil {
return nil, e
}
d := make([]string, len(s))
for i := range s {
d[i] = strings.ToLower(s[i])
}
return d, nil
}
// getColSlice returns an in interface slice that can be used in the row.Scan().
func getColSlice(l int) ([]interface{}, error) {
// list of all possible column names
var (
user string
total_connections int64
concurrent_connections int64
connected_time int64
busy_time int64
cpu_time int64
bytes_received int64
bytes_sent int64
binlog_bytes_written int64
rows_read int64
rows_sent int64
rows_deleted int64
rows_inserted int64
rows_updated int64
select_commands int64
update_commands int64
other_commands int64
commit_transactions int64
rollback_transactions int64
denied_connections int64
lost_connections int64
access_denied int64
empty_queries int64
total_ssl_connections int64
max_statement_time_exceeded int64
// maria specific
fbusy_time float64
fcpu_time float64
// percona specific
rows_fetched int64
table_rows_read int64
)
switch l {
case 23: // maria5
return []interface{}{
&user,
&total_connections,
&concurrent_connections,
&connected_time,
&fbusy_time,
&fcpu_time,
&bytes_received,
&bytes_sent,
&binlog_bytes_written,
&rows_read,
&rows_sent,
&rows_deleted,
&rows_inserted,
&rows_updated,
&select_commands,
&update_commands,
&other_commands,
&commit_transactions,
&rollback_transactions,
&denied_connections,
&lost_connections,
&access_denied,
&empty_queries,
}, nil
case 25: // maria10
return []interface{}{
&user,
&total_connections,
&concurrent_connections,
&connected_time,
&fbusy_time,
&fcpu_time,
&bytes_received,
&bytes_sent,
&binlog_bytes_written,
&rows_read,
&rows_sent,
&rows_deleted,
&rows_inserted,
&rows_updated,
&select_commands,
&update_commands,
&other_commands,
&commit_transactions,
&rollback_transactions,
&denied_connections,
&lost_connections,
&access_denied,
&empty_queries,
&total_ssl_connections,
&max_statement_time_exceeded,
}, nil
case 22: // percona
return []interface{}{
&user,
&total_connections,
&concurrent_connections,
&connected_time,
&busy_time,
&cpu_time,
&bytes_received,
&bytes_sent,
&binlog_bytes_written,
&rows_fetched,
&rows_updated,
&table_rows_read,
&select_commands,
&update_commands,
&other_commands,
&commit_transactions,
&rollback_transactions,
&denied_connections,
&lost_connections,
&access_denied,
&empty_queries,
&total_ssl_connections,
}, nil
}
return nil, fmt.Errorf("Not Supported - %d columns", l)
}
// gatherPerfTableIOWaits can be used to get total count and time // gatherPerfTableIOWaits can be used to get total count and time
// of I/O wait event for each table and process // of I/O wait event for each table and process
func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error { func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {

View File

@@ -8,7 +8,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
nats "github.com/nats-io/go-nats" "github.com/nats-io/nats"
) )
type natsError struct { type natsError struct {

View File

@@ -5,7 +5,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
nats "github.com/nats-io/go-nats" "github.com/nats-io/nats"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )

View File

@@ -14,7 +14,7 @@ To use this plugin you must enable the [monitoring](https://www.openldap.org/dev
# ldaps, starttls, or no encryption. default is an empty string, disabling all encryption. # ldaps, starttls, or no encryption. default is an empty string, disabling all encryption.
# note that port will likely need to be changed to 636 for ldaps # note that port will likely need to be changed to 636 for ldaps
# valid options: "" | "starttls" | "ldaps" # valid options: "" | "starttls" | "ldaps"
tls = "" ssl = ""
# skip peer certificate verification. Default is false. # skip peer certificate verification. Default is false.
insecure_skip_verify = false insecure_skip_verify = false

View File

@@ -15,11 +15,9 @@ import (
type Openldap struct { type Openldap struct {
Host string Host string
Port int Port int
SSL string `toml:"ssl"` // Deprecated in 1.7; use TLS Ssl string
TLS string `toml:"tls"`
InsecureSkipVerify bool InsecureSkipVerify bool
SSLCA string `toml:"ssl_ca"` // Deprecated in 1.7; use TLSCA SslCa string
TLSCA string `toml:"tls_ca"`
BindDn string BindDn string
BindPassword string BindPassword string
ReverseMetricNames bool ReverseMetricNames bool
@@ -32,7 +30,7 @@ const sampleConfig string = `
# ldaps, starttls, or no encryption. default is an empty string, disabling all encryption. # ldaps, starttls, or no encryption. default is an empty string, disabling all encryption.
# note that port will likely need to be changed to 636 for ldaps # note that port will likely need to be changed to 636 for ldaps
# valid options: "" | "starttls" | "ldaps" # valid options: "" | "starttls" | "ldaps"
tls = "" ssl = ""
# skip peer certificate verification. Default is false. # skip peer certificate verification. Default is false.
insecure_skip_verify = false insecure_skip_verify = false
@@ -72,11 +70,9 @@ func NewOpenldap() *Openldap {
return &Openldap{ return &Openldap{
Host: "localhost", Host: "localhost",
Port: 389, Port: 389,
SSL: "", Ssl: "",
TLS: "",
InsecureSkipVerify: false, InsecureSkipVerify: false,
SSLCA: "", SslCa: "",
TLSCA: "",
BindDn: "", BindDn: "",
BindPassword: "", BindPassword: "",
ReverseMetricNames: false, ReverseMetricNames: false,
@@ -85,19 +81,12 @@ func NewOpenldap() *Openldap {
// gather metrics // gather metrics
func (o *Openldap) Gather(acc telegraf.Accumulator) error { func (o *Openldap) Gather(acc telegraf.Accumulator) error {
if o.TLS == "" {
o.TLS = o.SSL
}
if o.TLSCA == "" {
o.TLSCA = o.SSLCA
}
var err error var err error
var l *ldap.Conn var l *ldap.Conn
if o.TLS != "" { if o.Ssl != "" {
// build tls config // build tls config
clientTLSConfig := tls.ClientConfig{ clientTLSConfig := tls.ClientConfig{
TLSCA: o.TLSCA, SSLCA: o.SslCa,
InsecureSkipVerify: o.InsecureSkipVerify, InsecureSkipVerify: o.InsecureSkipVerify,
} }
tlsConfig, err := clientTLSConfig.TLSConfig() tlsConfig, err := clientTLSConfig.TLSConfig()
@@ -105,13 +94,13 @@ func (o *Openldap) Gather(acc telegraf.Accumulator) error {
acc.AddError(err) acc.AddError(err)
return nil return nil
} }
if o.TLS == "ldaps" { if o.Ssl == "ldaps" {
l, err = ldap.DialTLS("tcp", fmt.Sprintf("%s:%d", o.Host, o.Port), tlsConfig) l, err = ldap.DialTLS("tcp", fmt.Sprintf("%s:%d", o.Host, o.Port), tlsConfig)
if err != nil { if err != nil {
acc.AddError(err) acc.AddError(err)
return nil return nil
} }
} else if o.TLS == "starttls" { } else if o.Ssl == "starttls" {
l, err = ldap.Dial("tcp", fmt.Sprintf("%s:%d", o.Host, o.Port)) l, err = ldap.Dial("tcp", fmt.Sprintf("%s:%d", o.Host, o.Port))
if err != nil { if err != nil {
acc.AddError(err) acc.AddError(err)
@@ -119,7 +108,7 @@ func (o *Openldap) Gather(acc telegraf.Accumulator) error {
} }
err = l.StartTLS(tlsConfig) err = l.StartTLS(tlsConfig)
} else { } else {
acc.AddError(fmt.Errorf("Invalid setting for ssl: %s", o.TLS)) acc.AddError(fmt.Errorf("Invalid setting for ssl: %s", o.Ssl))
return nil return nil
} }
} else { } else {

View File

@@ -1,11 +1,10 @@
package openldap package openldap
import ( import (
"gopkg.in/ldap.v2"
"strconv" "strconv"
"testing" "testing"
"gopkg.in/ldap.v2"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@@ -75,7 +74,7 @@ func TestOpenldapStartTLS(t *testing.T) {
o := &Openldap{ o := &Openldap{
Host: testutil.GetLocalHost(), Host: testutil.GetLocalHost(),
Port: 389, Port: 389,
SSL: "starttls", Ssl: "starttls",
InsecureSkipVerify: true, InsecureSkipVerify: true,
} }
@@ -93,7 +92,7 @@ func TestOpenldapLDAPS(t *testing.T) {
o := &Openldap{ o := &Openldap{
Host: testutil.GetLocalHost(), Host: testutil.GetLocalHost(),
Port: 636, Port: 636,
SSL: "ldaps", Ssl: "ldaps",
InsecureSkipVerify: true, InsecureSkipVerify: true,
} }
@@ -111,7 +110,7 @@ func TestOpenldapInvalidSSL(t *testing.T) {
o := &Openldap{ o := &Openldap{
Host: testutil.GetLocalHost(), Host: testutil.GetLocalHost(),
Port: 636, Port: 636,
SSL: "invalid", Ssl: "invalid",
InsecureSkipVerify: true, InsecureSkipVerify: true,
} }
@@ -130,7 +129,7 @@ func TestOpenldapBind(t *testing.T) {
o := &Openldap{ o := &Openldap{
Host: testutil.GetLocalHost(), Host: testutil.GetLocalHost(),
Port: 389, Port: 389,
SSL: "", Ssl: "",
InsecureSkipVerify: true, InsecureSkipVerify: true,
BindDn: "cn=manager,cn=config", BindDn: "cn=manager,cn=config",
BindPassword: "secret", BindPassword: "secret",
@@ -158,7 +157,7 @@ func TestOpenldapReverseMetrics(t *testing.T) {
o := &Openldap{ o := &Openldap{
Host: testutil.GetLocalHost(), Host: testutil.GetLocalHost(),
Port: 389, Port: 389,
SSL: "", Ssl: "",
InsecureSkipVerify: true, InsecureSkipVerify: true,
BindDn: "cn=manager,cn=config", BindDn: "cn=manager,cn=config",
BindPassword: "secret", BindPassword: "secret",

View File

@@ -175,7 +175,7 @@ func (p *Ping) args(url string) []string {
} }
if p.Timeout > 0 { if p.Timeout > 0 {
switch runtime.GOOS { switch runtime.GOOS {
case "darwin", "freebsd", "netbsd", "openbsd": case "darwin":
args = append(args, "-W", strconv.FormatFloat(p.Timeout*1000, 'f', -1, 64)) args = append(args, "-W", strconv.FormatFloat(p.Timeout*1000, 'f', -1, 64))
case "linux": case "linux":
args = append(args, "-W", strconv.FormatFloat(p.Timeout, 'f', -1, 64)) args = append(args, "-W", strconv.FormatFloat(p.Timeout, 'f', -1, 64))
@@ -186,7 +186,7 @@ func (p *Ping) args(url string) []string {
} }
if p.Deadline > 0 { if p.Deadline > 0 {
switch runtime.GOOS { switch runtime.GOOS {
case "darwin", "freebsd", "netbsd", "openbsd": case "darwin":
args = append(args, "-t", strconv.Itoa(p.Deadline)) args = append(args, "-t", strconv.Itoa(p.Deadline))
case "linux": case "linux":
args = append(args, "-w", strconv.Itoa(p.Deadline)) args = append(args, "-w", strconv.Itoa(p.Deadline))
@@ -197,10 +197,10 @@ func (p *Ping) args(url string) []string {
} }
if p.Interface != "" { if p.Interface != "" {
switch runtime.GOOS { switch runtime.GOOS {
case "darwin", "freebsd", "netbsd", "openbsd":
args = append(args, "-S", p.Interface)
case "linux": case "linux":
args = append(args, "-I", p.Interface) args = append(args, "-I", p.Interface)
case "freebsd", "darwin":
args = append(args, "-S", p.Interface)
default: default:
// Not sure the best option here, just assume GNU ping? // Not sure the best option here, just assume GNU ping?
args = append(args, "-I", p.Interface) args = append(args, "-I", p.Interface)

View File

@@ -4,7 +4,7 @@ import (
"fmt" "fmt"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path"
"strings" "strings"
"time" "time"
@@ -28,37 +28,36 @@ func getQueueDirectory() (string, error) {
return strings.TrimSpace(string(qd)), nil return strings.TrimSpace(string(qd)), nil
} }
func qScan(path string, acc telegraf.Accumulator) (int64, int64, int64, error) { func qScan(path string) (int64, int64, int64, error) {
f, err := os.Open(path)
if err != nil {
return 0, 0, 0, err
}
finfos, err := f.Readdir(-1)
f.Close()
if err != nil {
return 0, 0, 0, err
}
var length, size int64 var length, size int64
var oldest time.Time var oldest time.Time
err := filepath.Walk(path, func(_ string, finfo os.FileInfo, err error) error { for _, finfo := range finfos {
if err != nil {
acc.AddError(fmt.Errorf("error scanning %s: %s", path, err))
return nil
}
if finfo.IsDir() {
return nil
}
length++ length++
size += finfo.Size() size += finfo.Size()
ctime := statCTime(finfo.Sys()) ctime := statCTime(finfo.Sys())
if ctime.IsZero() { if ctime.IsZero() {
return nil continue
} }
if oldest.IsZero() || ctime.Before(oldest) { if oldest.IsZero() || ctime.Before(oldest) {
oldest = ctime oldest = ctime
} }
return nil
})
if err != nil {
return 0, 0, 0, err
} }
var age int64 var age int64
if !oldest.IsZero() { if !oldest.IsZero() {
age = int64(time.Now().Sub(oldest) / time.Second) age = int64(time.Now().Sub(oldest) / time.Second)
} else if length != 0 { } else if len(finfos) != 0 {
// system doesn't support ctime // system doesn't support ctime
age = -1 age = -1
} }
@@ -78,8 +77,8 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
} }
} }
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} { for _, q := range []string{"active", "hold", "incoming", "maildrop"} {
length, size, age, err := qScan(filepath.Join(p.QueueDirectory, q), acc) length, size, age, err := qScan(path.Join(p.QueueDirectory, q))
if err != nil { if err != nil {
acc.AddError(fmt.Errorf("error scanning queue %s: %s", q, err)) acc.AddError(fmt.Errorf("error scanning queue %s: %s", q, err))
continue continue
@@ -91,6 +90,30 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
acc.AddFields("postfix_queue", fields, map[string]string{"queue": q}) acc.AddFields("postfix_queue", fields, map[string]string{"queue": q})
} }
var dLength, dSize int64
dAge := int64(-1)
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F"} {
length, size, age, err := qScan(path.Join(p.QueueDirectory, "deferred", q))
if err != nil {
if os.IsNotExist(err) {
// the directories are created on first use
continue
}
acc.AddError(fmt.Errorf("error scanning queue deferred/%s: %s", q, err))
return nil
}
dLength += length
dSize += size
if age > dAge {
dAge = age
}
}
fields := map[string]interface{}{"length": dLength, "size": dSize}
if dAge != -1 {
fields["age"] = dAge
}
acc.AddFields("postfix_queue", fields, map[string]string{"queue": "deferred"})
return nil return nil
} }

View File

@@ -3,7 +3,7 @@ package postfix
import ( import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path"
"testing" "testing"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@@ -16,16 +16,19 @@ func TestGather(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(td) defer os.RemoveAll(td)
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred/0/0", "deferred/F/F"} { for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} {
require.NoError(t, os.MkdirAll(filepath.FromSlash(td+"/"+q), 0755)) require.NoError(t, os.Mkdir(path.Join(td, q), 0755))
}
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "F"} { // "E" deliberately left off
require.NoError(t, os.Mkdir(path.Join(td, "deferred", q), 0755))
} }
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/01"), []byte("abc"), 0644)) require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/02"), []byte("defg"), 0644)) require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "02"), []byte("defg"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/hold/01"), []byte("abc"), 0644)) require.NoError(t, ioutil.WriteFile(path.Join(td, "hold", "01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/incoming/01"), []byte("abcd"), 0644)) require.NoError(t, ioutil.WriteFile(path.Join(td, "incoming", "01"), []byte("abcd"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/0/0/01"), []byte("abc"), 0644)) require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "0", "01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/F/F/F1"), []byte("abc"), 0644)) require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "F", "F1"), []byte("abc"), 0644))
p := Postfix{ p := Postfix{
QueueDirectory: td, QueueDirectory: td,

View File

@@ -14,6 +14,13 @@
## If no servers are specified, then localhost is used as the host. ## If no servers are specified, then localhost is used as the host.
## If no port is specified, 6379 is used ## If no port is specified, 6379 is used
servers = ["tcp://localhost:6379"] servers = ["tcp://localhost:6379"]
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = true
``` ```
### Measurements & Fields: ### Measurements & Fields:

View File

@@ -13,11 +13,13 @@ import (
"github.com/go-redis/redis" "github.com/go-redis/redis"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
type Redis struct { type Redis struct {
Servers []string Servers []string
tls.ClientConfig
clients []Client clients []Client
initialized bool initialized bool
@@ -56,6 +58,13 @@ var sampleConfig = `
## If no servers are specified, then localhost is used as the host. ## If no servers are specified, then localhost is used as the host.
## If no port is specified, 6379 is used ## If no port is specified, 6379 is used
servers = ["tcp://localhost:6379"] servers = ["tcp://localhost:6379"]
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = true
` `
func (r *Redis) SampleConfig() string { func (r *Redis) SampleConfig() string {
@@ -109,12 +118,18 @@ func (r *Redis) init(acc telegraf.Accumulator) error {
address = u.Host address = u.Host
} }
tlsConfig, err := r.ClientConfig.TLSConfig()
if err != nil {
return err
}
client := redis.NewClient( client := redis.NewClient(
&redis.Options{ &redis.Options{
Addr: address, Addr: address,
Password: password, Password: password,
Network: u.Scheme, Network: u.Scheme,
PoolSize: 1, PoolSize: 1,
TLSConfig: tlsConfig,
}, },
) )

View File

@@ -1,30 +0,0 @@
# Swap Input Plugin
The swap plugin collects system swap metrics.
For a more information on what swap memory is, read [All about Linux swap space](https://www.linux.com/news/all-about-linux-swap-space).
### Configuration:
```toml
# Read metrics about swap memory usage
[[inputs.swap]]
# no configuration
```
### Metrics:
- swap
- fields:
- free (int)
- total (int)
- used (int)
- used_percent (float)
- in (int)
- out (int)
### Example Output:
```
swap total=20855394304i,used_percent=45.43883523785713,used=9476448256i,free=1715331072i 1511894782000000000
```

View File

@@ -42,9 +42,45 @@ func (s *MemStats) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
type SwapStats struct {
ps PS
}
func (_ *SwapStats) Description() string {
return "Read metrics about swap memory usage"
}
func (_ *SwapStats) SampleConfig() string { return "" }
func (s *SwapStats) Gather(acc telegraf.Accumulator) error {
swap, err := s.ps.SwapStat()
if err != nil {
return fmt.Errorf("error getting swap memory info: %s", err)
}
fieldsG := map[string]interface{}{
"total": swap.Total,
"used": swap.Used,
"free": swap.Free,
"used_percent": swap.UsedPercent,
}
fieldsC := map[string]interface{}{
"in": swap.Sin,
"out": swap.Sout,
}
acc.AddGauge("swap", fieldsG, nil)
acc.AddCounter("swap", fieldsC, nil)
return nil
}
func init() { func init() {
ps := newSystemPS() ps := newSystemPS()
inputs.Add("mem", func() telegraf.Input { inputs.Add("mem", func() telegraf.Input {
return &MemStats{ps: ps} return &MemStats{ps: ps}
}) })
inputs.Add("swap", func() telegraf.Input {
return &SwapStats{ps: ps}
})
} }

View File

@@ -30,6 +30,17 @@ func TestMemStats(t *testing.T) {
mps.On("VMStat").Return(vms, nil) mps.On("VMStat").Return(vms, nil)
sms := &mem.SwapMemoryStat{
Total: 8123,
Used: 1232,
Free: 6412,
UsedPercent: 12.2,
Sin: 7,
Sout: 830,
}
mps.On("SwapStat").Return(sms, nil)
err = (&MemStats{&mps}).Gather(&acc) err = (&MemStats{&mps}).Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
@@ -50,4 +61,15 @@ func TestMemStats(t *testing.T) {
acc.AssertContainsTaggedFields(t, "mem", memfields, make(map[string]string)) acc.AssertContainsTaggedFields(t, "mem", memfields, make(map[string]string))
acc.Metrics = nil acc.Metrics = nil
err = (&SwapStats{&mps}).Gather(&acc)
require.NoError(t, err)
swapfields := map[string]interface{}{
"total": uint64(8123),
"used": uint64(1232),
"used_percent": float64(12.2),
"free": uint64(6412),
}
acc.AssertContainsTaggedFields(t, "swap", swapfields, make(map[string]string))
} }

View File

@@ -1,47 +0,0 @@
package system
import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
type SwapStats struct {
ps PS
}
func (_ *SwapStats) Description() string {
return "Read metrics about swap memory usage"
}
func (_ *SwapStats) SampleConfig() string { return "" }
func (s *SwapStats) Gather(acc telegraf.Accumulator) error {
swap, err := s.ps.SwapStat()
if err != nil {
return fmt.Errorf("error getting swap memory info: %s", err)
}
fieldsG := map[string]interface{}{
"total": swap.Total,
"used": swap.Used,
"free": swap.Free,
"used_percent": swap.UsedPercent,
}
fieldsC := map[string]interface{}{
"in": swap.Sin,
"out": swap.Sout,
}
acc.AddGauge("swap", fieldsG, nil)
acc.AddCounter("swap", fieldsC, nil)
return nil
}
func init() {
ps := newSystemPS()
inputs.Add("swap", func() telegraf.Input {
return &SwapStats{ps: ps}
})
}

View File

@@ -1,38 +0,0 @@
package system
import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/shirou/gopsutil/mem"
"github.com/stretchr/testify/require"
)
func TestSwapStats(t *testing.T) {
var mps MockPS
var err error
defer mps.AssertExpectations(t)
var acc testutil.Accumulator
sms := &mem.SwapMemoryStat{
Total: 8123,
Used: 1232,
Free: 6412,
UsedPercent: 12.2,
Sin: 7,
Sout: 830,
}
mps.On("SwapStat").Return(sms, nil)
err = (&SwapStats{&mps}).Gather(&acc)
require.NoError(t, err)
swapfields := map[string]interface{}{
"total": uint64(8123),
"used": uint64(1232),
"used_percent": float64(12.2),
"free": uint64(6412),
}
acc.AssertContainsTaggedFields(t, "swap", swapfields, make(map[string]string))
}

View File

@@ -1,13 +1,16 @@
# win_perf_counters readme # win_perf_counters readme
This document presents the input plugin to read Performance Counters on Windows operating systems. Input plugin to read Performance Counters on Windows operating systems.
The configuration is parsed and then tested for validity, such as Configuration is parsed and then tested for validity such as
whether the Object, Instance and Counter exist on Telegraf startup. whether the Object, Instance and Counter exist on Telegraf startup.
Counter paths are refreshed periodically, see the [CountersRefreshInterval](#countersrefreshinterval) Counter paths are refreshed periodically, see [CountersRefreshInterval](#countersrefreshinterval)
configuration parameter for more info. configuration parameter for more info.
Wildcards can be used in instance and counter names. Partial wildcards are supported only
in instance names on Windows Vista and newer.
In case of query for all instances `["*"]`, the plugin does not return the instance `_Total` In case of query for all instances `["*"]`, the plugin does not return the instance `_Total`
by default. See [IncludeTotal](#includetotal) for more info. by default. See [IncludeTotal](#includetotal) for more info.
@@ -16,7 +19,7 @@ by default. See [IncludeTotal](#includetotal) for more info.
The examples contained in this file have been found on the internet The examples contained in this file have been found on the internet
as counters used when performance monitoring as counters used when performance monitoring
Active Directory and IIS in particular. Active Directory and IIS in particular.
There are a lot of other good objects to monitor, if you know what to look for. There are a lot other good objects to monitor, if you know what to look for.
This file is likely to be updated in the future with more examples for This file is likely to be updated in the future with more examples for
useful configurations for separate scenarios. useful configurations for separate scenarios.
@@ -31,41 +34,23 @@ Bool, if set to `true` will print out all matching performance objects.
Example: Example:
`PrintValid=true` `PrintValid=true`
#### UseWildcardsExpansion
If `UseWildcardsExpansion` is set to true, wildcards can be used in the
instance name and the counter name. When using localized Windows, counters
will be also be localized. Instance indexes will also be returned in the
instance name.
Partial wildcards (e.g. `chrome*`) are supported only in the instance name on Windows Vista and newer.
If disabled, wildcards (not partial) in instance names can still be used, but
instance indexes will not be returned in the instance names.
Example:
`UseWildcardsExpansion=true`
#### CountersRefreshInterval #### CountersRefreshInterval
Configured counters are matched against available counters at the interval Configured counters are matched against available counters at the interval
specified by the `CountersRefreshInterval` parameter. The default value is `1m` (1 minute). specified by the `CountersRefreshInterval` parameter. Default value is `1m` (1 minute).
If wildcards are used in instance or counter names, they are expanded at this point, if the `UseWildcardsExpansion` param is set to `true`. If wildcards are used in instance or counter names, they are expanded at this point.
Setting the `CountersRefreshInterval` too low (order of seconds) can cause Telegraf to create Setting `CountersRefreshInterval` too low (order of seconds) can cause Telegraf to create
a high CPU load. a high CPU load.
Set it to `0s` to disable periodic refreshing. Set to `0s` to disable periodic refreshing.
Example:
`CountersRefreshInterval=1m`
#### PreVistaSupport #### PreVistaSupport
_Deprecated. Necessary features on Windows Vista and newer are checked dynamically_ _Deprecated. Necessary features on Windows Vista and newer are checked dynamically_
Bool, if set to `true`, the plugin will use the localized PerfCounter interface that has been present since before Vista for backwards compatability. Bool, if set to `true` will use the localized PerfCounter interface that has been present since before Vista for backwards compatability.
It is recommended NOT to use this on OSes starting with Vista and newer because it requires more configuration to use this than the newer interface present since Vista. It is recommended NOT to use this on OSes starting with Vista and newer because it requires more configuration to use this than the newer interface present since Vista.
@@ -77,12 +62,12 @@ Example for Windows Server 2003, this would be set to true:
See Entry below. See Entry below.
### Entry ### Entry
A new configuration entry consists of the TOML header starting with, A new configuration entry consists of the TOML header to start with,
`[[inputs.win_perf_counters.object]]`. `[[inputs.win_perf_counters.object]]`.
This must follow before other plugin configurations, This must follow before other plugin configurations,
beneath the main win_perf_counters entry, `[[inputs.win_perf_counters]]`. beneath the main win_perf_counters entry, `[[inputs.win_perf_counters]]`.
Following this are 3 required key/value pairs and three optional parameters and their usage. Following this are 3 required key/value pairs and the three optional parameters and their usage.
#### ObjectName #### ObjectName
**Required** **Required**
@@ -94,18 +79,16 @@ Example: `ObjectName = "LogicalDisk"`
#### Instances #### Instances
**Required** **Required**
The instances key (this is an array) declares the instances of a counter you would like returned, Instances key (this is an array) is the instances of a counter you would like returned,
it can be one or more values. it can be one or more values.
Example: `Instances = ["C:","D:","E:"]` Example, `Instances = ["C:","D:","E:"]` will return only for the instances
This will return only for the instances
C:, D: and E: where relevant. To get all instances of a Counter, use `["*"]` only. C:, D: and E: where relevant. To get all instances of a Counter, use `["*"]` only.
By default any results containing `_Total` are stripped, By default any results containing `_Total` are stripped,
unless this is specified as the wanted instance. unless this is specified as the wanted instance.
Alternatively see the option `IncludeTotal` below. Alternatively see the option `IncludeTotal` below.
It is also possible to set partial wildcards, eg. `["chrome*"]`, if the `UseWildcardsExpansion` param is set to `true` It is also possible to set partial wildcards, eg. `["chrome*"]`
Some Objects do not have instances to select from at all. Some Objects do not have instances to select from at all.
Here only one option is valid if you want data back, Here only one option is valid if you want data back,
@@ -114,43 +97,41 @@ and that is to specify `Instances = ["------"]`.
#### Counters #### Counters
**Required** **Required**
The Counters key (this is an array) declares the counters of the ObjectName Counters key (this is an array) is the counters of the ObjectName
you would like returned, it can also be one or more values. you would like returned, it can also be one or more values.
Example: `Counters = ["% Idle Time", "% Disk Read Time", "% Disk Write Time"]` Example: `Counters = ["% Idle Time", "% Disk Read Time", "% Disk Write Time"]`
This must be specified for every counter you want the results of,
This must be specified for every counter you want the results of, or use or use `["*"]` for all the counters for object.
`["*"]` for all the counters of the object, if the `UseWildcardsExpansion` param
is set to `true`.
#### Measurement #### Measurement
*Optional* *Optional*
This key is optional. If it is not set it will be `win_perf_counters`. This key is optional, if it is not set it will be `win_perf_counters`.
In InfluxDB this is the key underneath which the returned data is stored. In InfluxDB this is the key by which the returned data is stored underneath,
So for ordering your data in a good manner, so for ordering your data in a good manner,
this is a good key to set with a value when you want your IIS and Disk results stored this is a good key to set with a value when you want your IIS and Disk results stored
separately from Processor results. separately from Processor results.
Example: `Measurement = "win_disk"`` Example: `Measurement = "win_disk"
#### IncludeTotal #### IncludeTotal
*Optional* *Optional*
This key is optional. It is a simple bool. This key is optional, it is a simple bool.
If it is not set to true or included it is treated as false. If it is not set to true or included it is treated as false.
This key only has effect if the Instances key is set to `["*"]` This key only has an effect if the Instances key is set to `["*"]`
and you would also like all instances containing `_Total` to be returned, and you would also like all instances containing `_Total` returned,
like `_Total`, `0,_Total` and so on where applicable like `_Total`, `0,_Total` and so on where applicable
(Processor Information is one example). (Processor Information is one example).
#### WarnOnMissing #### WarnOnMissing
*Optional* *Optional*
This key is optional. It is a simple bool. This key is optional, it is a simple bool.
If it is not set to true or included it is treated as false. If it is not set to true or included it is treated as false.
This only has effect on the first execution of the plugin. This only has an effect on the first execution of the plugin,
It will print out any ObjectName/Instance/Counter combinations it will print out any ObjectName/Instance/Counter combinations
asked for that do not match. Useful when debugging new configurations. asked for that do not match. Useful when debugging new configurations.
#### FailOnMissing #### FailOnMissing

View File

@@ -352,7 +352,7 @@ func PdhGetFormattedCounterValueDouble(hCounter PDH_HCOUNTER, lpdwType *uint32,
// time.Sleep(2000 * time.Millisecond) // time.Sleep(2000 * time.Millisecond)
// } // }
// } // }
func PdhGetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER, lpdwBufferSize *uint32, lpdwBufferCount *uint32, itemBuffer *byte) uint32 { func PdhGetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER, lpdwBufferSize *uint32, lpdwBufferCount *uint32, itemBuffer *PDH_FMT_COUNTERVALUE_ITEM_DOUBLE) uint32 {
ret, _, _ := pdh_GetFormattedCounterArrayW.Call( ret, _, _ := pdh_GetFormattedCounterArrayW.Call(
uintptr(hCounter), uintptr(hCounter),
uintptr(PDH_FMT_DOUBLE|PDH_FMT_NOCAP100), uintptr(PDH_FMT_DOUBLE|PDH_FMT_NOCAP100),

View File

@@ -9,12 +9,6 @@ import (
"unsafe" "unsafe"
) )
//PerformanceQuery is abstraction for PDH_FMT_COUNTERVALUE_ITEM_DOUBLE
type CounterValue struct {
InstanceName string
Value float64
}
//PerformanceQuery provides wrappers around Windows performance counters API for easy usage in GO //PerformanceQuery provides wrappers around Windows performance counters API for easy usage in GO
type PerformanceQuery interface { type PerformanceQuery interface {
Open() error Open() error
@@ -24,7 +18,6 @@ type PerformanceQuery interface {
GetCounterPath(counterHandle PDH_HCOUNTER) (string, error) GetCounterPath(counterHandle PDH_HCOUNTER) (string, error)
ExpandWildCardPath(counterPath string) ([]string, error) ExpandWildCardPath(counterPath string) ([]string, error)
GetFormattedCounterValueDouble(hCounter PDH_HCOUNTER) (float64, error) GetFormattedCounterValueDouble(hCounter PDH_HCOUNTER) (float64, error)
GetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER) ([]CounterValue, error)
CollectData() error CollectData() error
AddEnglishCounterSupported() bool AddEnglishCounterSupported() bool
} }
@@ -158,28 +151,6 @@ func (m *PerformanceQueryImpl) GetFormattedCounterValueDouble(hCounter PDH_HCOUN
} }
} }
func (m *PerformanceQueryImpl) GetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER) ([]CounterValue, error) {
var buffSize uint32
var itemCount uint32
ret := PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, nil)
if ret == PDH_MORE_DATA {
buff := make([]byte, buffSize)
ret = PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, &buff[0])
if ret == ERROR_SUCCESS {
items := (*[1 << 20]PDH_FMT_COUNTERVALUE_ITEM_DOUBLE)(unsafe.Pointer(&buff[0]))[:itemCount]
values := make([]CounterValue, 0, itemCount)
for _, item := range items {
if item.FmtValue.CStatus == PDH_CSTATUS_VALID_DATA || item.FmtValue.CStatus == PDH_CSTATUS_NEW_DATA {
val := CounterValue{UTF16PtrToString(item.SzName), item.FmtValue.DoubleValue}
values = append(values, val)
}
}
return values, nil
}
}
return nil, NewPdhError(ret)
}
func (m *PerformanceQueryImpl) CollectData() error { func (m *PerformanceQueryImpl) CollectData() error {
if m.query == 0 { if m.query == 0 {
return errors.New("uninitialised query") return errors.New("uninitialised query")
@@ -210,7 +181,7 @@ func UTF16ToStringArray(buf []uint16) []string {
stringLine := UTF16PtrToString(&buf[0]) stringLine := UTF16PtrToString(&buf[0])
for stringLine != "" { for stringLine != "" {
strings = append(strings, stringLine) strings = append(strings, stringLine)
nextLineStart += len([]rune(stringLine)) + 1 nextLineStart += len(stringLine) + 1
remainingBuf := buf[nextLineStart:] remainingBuf := buf[nextLineStart:]
stringLine = UTF16PtrToString(&remainingBuf[0]) stringLine = UTF16PtrToString(&remainingBuf[0])
} }

View File

@@ -5,14 +5,13 @@ package win_perf_counters
import ( import (
"errors" "errors"
"fmt" "fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"log" "log"
"regexp" "regexp"
"strings" "strings"
"time" "time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
) )
var sampleConfig = ` var sampleConfig = `
@@ -23,10 +22,6 @@ var sampleConfig = `
## agent, it will not be gathered. ## agent, it will not be gathered.
## Settings: ## Settings:
# PrintValid = false # Print All matching performance counters # PrintValid = false # Print All matching performance counters
# If UseWildcardsExpansion params is set to true, wildcards (partial wildcards in instance names and wildcards in counters names) in configured counter paths will be expanded
# and in case of localized Windows, counter paths will be also localized. It also returns instance indexes in instance names.
# If false, wildcards (not partial) in instance names will still be expanded, but instance indexes will not be returned in instance names.
#UseWildcardsExpansion = false
# Period after which counters will be reread from configuration and wildcards in counter paths expanded # Period after which counters will be reread from configuration and wildcards in counter paths expanded
CountersRefreshInterval="1m" CountersRefreshInterval="1m"
@@ -80,7 +75,6 @@ type Win_PerfCounters struct {
PreVistaSupport bool PreVistaSupport bool
Object []perfobject Object []perfobject
CountersRefreshInterval internal.Duration CountersRefreshInterval internal.Duration
UseWildcardsExpansion bool
lastRefreshed time.Time lastRefreshed time.Time
counters []*counter counters []*counter
@@ -143,59 +137,45 @@ func (m *Win_PerfCounters) SampleConfig() string {
return sampleConfig return sampleConfig
} }
//objectName string, counter string, instance string, measurement string, include_total bool func (m *Win_PerfCounters) AddItem(counterPath string, instance string, measurement string, includeTotal bool) error {
func (m *Win_PerfCounters) AddItem(counterPath string, objectName string, instance string, counterName string, measurement string, includeTotal bool) error {
var err error
var counterHandle PDH_HCOUNTER
if !m.query.AddEnglishCounterSupported() { if !m.query.AddEnglishCounterSupported() {
counterHandle, err = m.query.AddCounterToQuery(counterPath) _, err := m.query.AddCounterToQuery(counterPath)
if err != nil { if err != nil {
return err return err
} }
} else { } else {
counterHandle, err = m.query.AddEnglishCounterToQuery(counterPath) counterHandle, err := m.query.AddEnglishCounterToQuery(counterPath)
if err != nil { if err != nil {
return err return err
} }
}
if m.UseWildcardsExpansion {
origInstance := instance
counterPath, err = m.query.GetCounterPath(counterHandle) counterPath, err = m.query.GetCounterPath(counterHandle)
if err != nil { if err != nil {
return err return err
} }
counters, err := m.query.ExpandWildCardPath(counterPath) }
counters, err := m.query.ExpandWildCardPath(counterPath)
if err != nil {
return err
}
for _, counterPath := range counters {
var err error
counterHandle, err := m.query.AddCounterToQuery(counterPath)
parsedObjectName, parsedInstance, parsedCounter, err := extractObjectInstanceCounterFromQuery(counterPath)
if err != nil { if err != nil {
return err return err
} }
for _, counterPath := range counters { if parsedInstance == "_Total" && instance == "*" && !includeTotal {
var err error continue
counterHandle, err := m.query.AddCounterToQuery(counterPath)
objectName, instance, counterName, err = extractObjectInstanceCounterFromQuery(counterPath)
if err != nil {
return err
}
if instance == "_Total" && origInstance == "*" && !includeTotal {
continue
}
newItem := &counter{counterPath, objectName, counterName, instance, measurement,
includeTotal, counterHandle}
m.counters = append(m.counters, newItem)
if m.PrintValid {
log.Printf("Valid: %s\n", counterPath)
}
} }
} else {
newItem := &counter{counterPath, objectName, counterName, instance, measurement, newItem := &counter{counterPath, parsedObjectName, parsedCounter, parsedInstance, measurement,
includeTotal, counterHandle} includeTotal, counterHandle}
m.counters = append(m.counters, newItem) m.counters = append(m.counters, newItem)
if m.PrintValid { if m.PrintValid {
log.Printf("Valid: %s\n", counterPath) log.Printf("Valid: %s\n", counterPath)
} }
@@ -219,7 +199,7 @@ func (m *Win_PerfCounters) ParseConfig() error {
counterPath = "\\" + objectname + "(" + instance + ")\\" + counter counterPath = "\\" + objectname + "(" + instance + ")\\" + counter
} }
err := m.AddItem(counterPath, objectname, instance, counter, PerfObject.Measurement, PerfObject.IncludeTotal) err := m.AddItem(counterPath, instance, PerfObject.Measurement, PerfObject.IncludeTotal)
if err != nil { if err != nil {
if PerfObject.FailOnMissing || PerfObject.WarnOnMissing { if PerfObject.FailOnMissing || PerfObject.WarnOnMissing {
@@ -245,9 +225,7 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
var err error var err error
if m.lastRefreshed.IsZero() || (m.CountersRefreshInterval.Duration.Nanoseconds() > 0 && m.lastRefreshed.Add(m.CountersRefreshInterval.Duration).Before(time.Now())) { if m.lastRefreshed.IsZero() || (m.CountersRefreshInterval.Duration.Nanoseconds() > 0 && m.lastRefreshed.Add(m.CountersRefreshInterval.Duration).Before(time.Now())) {
if m.counters != nil { m.counters = m.counters[:0]
m.counters = m.counters[:0]
}
err = m.query.Open() err = m.query.Open()
if err != nil { if err != nil {
@@ -283,61 +261,22 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
// For iterate over the known metrics and get the samples. // For iterate over the known metrics and get the samples.
for _, metric := range m.counters { for _, metric := range m.counters {
// collect // collect
if m.UseWildcardsExpansion { value, err := m.query.GetFormattedCounterValueDouble(metric.counterHandle)
value, err := m.query.GetFormattedCounterValueDouble(metric.counterHandle) if err == nil {
if err == nil { measurement := sanitizedChars.Replace(metric.measurement)
measurement := sanitizedChars.Replace(metric.measurement) if measurement == "" {
if measurement == "" { measurement = "win_perf_counters"
measurement = "win_perf_counters"
}
var instance = InstanceGrouping{measurement, metric.instance, metric.objectName}
if collectFields[instance] == nil {
collectFields[instance] = make(map[string]interface{})
}
collectFields[instance][sanitizedChars.Replace(metric.counter)] = float32(value)
} else {
//ignore invalid data from as some counters from process instances returns this sometimes
if phderr, ok := err.(*PdhError); ok && phderr.ErrorCode != PDH_INVALID_DATA && phderr.ErrorCode != PDH_CALC_NEGATIVE_VALUE {
return fmt.Errorf("error while getting value for counter %s: %v", metric.counterPath, err)
}
} }
var instance = InstanceGrouping{measurement, metric.instance, metric.objectName}
if collectFields[instance] == nil {
collectFields[instance] = make(map[string]interface{})
}
collectFields[instance][sanitizedChars.Replace(metric.counter)] = float32(value)
} else { } else {
counterValues, err := m.query.GetFormattedCounterArrayDouble(metric.counterHandle) //ignore invalid data from as some counters from process instances returns this sometimes
if err == nil { if phderr, ok := err.(*PdhError); ok && phderr.ErrorCode != PDH_INVALID_DATA && phderr.ErrorCode != PDH_CALC_NEGATIVE_VALUE {
for _, cValue := range counterValues { return fmt.Errorf("error while getting value for counter %s: %v", metric.counterPath, err)
var add bool
if metric.includeTotal {
// If IncludeTotal is set, include all.
add = true
} else if metric.instance == "*" && !strings.Contains(cValue.InstanceName, "_Total") {
// Catch if set to * and that it is not a '*_Total*' instance.
add = true
} else if metric.instance == cValue.InstanceName {
// Catch if we set it to total or some form of it
add = true
} else if strings.Contains(metric.instance, "#") && strings.HasPrefix(metric.instance, cValue.InstanceName) {
// If you are using a multiple instance identifier such as "w3wp#1"
// phd.dll returns only the first 2 characters of the identifier.
add = true
cValue.InstanceName = metric.instance
} else if metric.instance == "------" {
add = true
}
if add {
measurement := sanitizedChars.Replace(metric.measurement)
if measurement == "" {
measurement = "win_perf_counters"
}
var instance = InstanceGrouping{measurement, cValue.InstanceName, metric.objectName}
if collectFields[instance] == nil {
collectFields[instance] = make(map[string]interface{})
}
collectFields[instance][sanitizedChars.Replace(metric.counter)] = float32(cValue.Value)
}
}
} }
} }
} }

View File

@@ -81,29 +81,6 @@ func TestWinPerformanceQueryImpl(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, paths) require.NotNil(t, paths)
assert.True(t, len(paths) > 1) assert.True(t, len(paths) > 1)
err = query.Open()
require.NoError(t, err)
counterPath = "\\Process(*)\\% Processor Time"
hCounter, err = query.AddEnglishCounterToQuery(counterPath)
require.NoError(t, err)
assert.NotEqual(t, 0, hCounter)
err = query.CollectData()
require.NoError(t, err)
time.Sleep(time.Second)
err = query.CollectData()
require.NoError(t, err)
arr, err := query.GetFormattedCounterArrayDouble(hCounter)
require.NoError(t, err)
assert.True(t, len(arr) > 0, "Too")
err = query.Close()
require.NoError(t, err)
} }
func TestWinPerfcountersConfigGet1(t *testing.T) { func TestWinPerfcountersConfigGet1(t *testing.T) {
@@ -596,7 +573,7 @@ func TestWinPerfcountersCollect2(t *testing.T) {
perfobjects[0] = PerfObject perfobjects[0] = PerfObject
m := Win_PerfCounters{PrintValid: false, Object: perfobjects, query: &PerformanceQueryImpl{}, UseWildcardsExpansion: true} m := Win_PerfCounters{PrintValid: false, Object: perfobjects, query: &PerformanceQueryImpl{}}
var acc testutil.Accumulator var acc testutil.Accumulator
err := m.Gather(&acc) err := m.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)

View File

@@ -25,14 +25,6 @@ type FakePerformanceQuery struct {
openCalled bool openCalled bool
} }
func (m *testCounter) ToCounterValue() *CounterValue {
_, inst, _, _ := extractObjectInstanceCounterFromQuery(m.path)
if inst == "" {
inst = "--"
}
return &CounterValue{inst, m.value}
}
func (m *FakePerformanceQuery) Open() error { func (m *FakePerformanceQuery) Open() error {
if m.openCalled { if m.openCalled {
err := m.Close() err := m.Close()
@@ -110,48 +102,6 @@ func (m *FakePerformanceQuery) GetFormattedCounterValueDouble(counterHandle PDH_
} }
return 0, fmt.Errorf("GetFormattedCounterValueDouble: invalid handle: %d", counterHandle) return 0, fmt.Errorf("GetFormattedCounterValueDouble: invalid handle: %d", counterHandle)
} }
func (m *FakePerformanceQuery) findCounterByPath(counterPath string) *testCounter {
for _, c := range m.counters {
if c.path == counterPath {
return &c
}
}
return nil
}
func (m *FakePerformanceQuery) findCounterByHandle(counterHandle PDH_HCOUNTER) *testCounter {
for _, c := range m.counters {
if c.handle == counterHandle {
return &c
}
}
return nil
}
func (m *FakePerformanceQuery) GetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER) ([]CounterValue, error) {
if !m.openCalled {
return nil, errors.New("GetFormattedCounterArrayDouble: uninitialised query")
}
for _, c := range m.counters {
if c.handle == hCounter {
if e, ok := m.expandPaths[c.path]; ok {
counters := make([]CounterValue, 0, len(e))
for _, p := range e {
counter := m.findCounterByPath(p)
if counter != nil && counter.value > 0 {
counters = append(counters, *counter.ToCounterValue())
} else {
return nil, fmt.Errorf("GetFormattedCounterArrayDouble: invalid counter : %s", p)
}
}
return counters, nil
} else {
return nil, fmt.Errorf("GetFormattedCounterArrayDouble: invalid counter : %d", hCounter)
}
}
}
return nil, fmt.Errorf("GetFormattedCounterArrayDouble: invalid counter : %d, no paths found", hCounter)
}
func (m *FakePerformanceQuery) CollectData() error { func (m *FakePerformanceQuery) CollectData() error {
if !m.openCalled { if !m.openCalled {
@@ -202,7 +152,7 @@ func TestAddItemSimple(t *testing.T) {
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
err = m.AddItem(cps1[0], "O", "I", "c", "test", false) err = m.AddItem(cps1[0], "I", "test", false)
require.NoError(t, err) require.NoError(t, err)
err = m.query.Close() err = m.query.Close()
require.NoError(t, err) require.NoError(t, err)
@@ -211,7 +161,7 @@ func TestAddItemSimple(t *testing.T) {
func TestAddItemInvalidCountPath(t *testing.T) { func TestAddItemInvalidCountPath(t *testing.T) {
var err error var err error
cps1 := []string{"\\O\\C"} cps1 := []string{"\\O\\C"}
m := Win_PerfCounters{PrintValid: false, Object: nil, UseWildcardsExpansion: true, query: &FakePerformanceQuery{ m := Win_PerfCounters{PrintValid: false, Object: nil, query: &FakePerformanceQuery{
counters: createCounterMap(cps1, []float64{1.1}), counters: createCounterMap(cps1, []float64{1.1}),
expandPaths: map[string][]string{ expandPaths: map[string][]string{
cps1[0]: {"\\O/C"}, cps1[0]: {"\\O/C"},
@@ -220,7 +170,7 @@ func TestAddItemInvalidCountPath(t *testing.T) {
}} }}
err = m.query.Open() err = m.query.Open()
require.NoError(t, err) require.NoError(t, err)
err = m.AddItem("\\O\\C", "O", "------", "C", "test", false) err = m.AddItem("\\O\\C", "*", "test", false)
require.Error(t, err) require.Error(t, err)
err = m.query.Close() err = m.query.Close()
require.NoError(t, err) require.NoError(t, err)
@@ -247,24 +197,13 @@ func TestParseConfigBasic(t *testing.T) {
assert.Len(t, m.counters, 4) assert.Len(t, m.counters, 4)
err = m.query.Close() err = m.query.Close()
require.NoError(t, err) require.NoError(t, err)
m.UseWildcardsExpansion = true
m.counters = nil
err = m.query.Open()
require.NoError(t, err)
err = m.ParseConfig()
require.NoError(t, err)
assert.Len(t, m.counters, 4)
err = m.query.Close()
require.NoError(t, err)
} }
func TestParseConfigNoInstance(t *testing.T) { func TestParseConfigNoInstance(t *testing.T) {
var err error var err error
perfObjects := createPerfObject("m", "O", []string{"------"}, []string{"C1", "C2"}, false, false) perfObjects := createPerfObject("m", "O", []string{"------"}, []string{"C1", "C2"}, false, false)
cps1 := []string{"\\O\\C1", "\\O\\C2"} cps1 := []string{"\\O\\C1", "\\O\\C2"}
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, UseWildcardsExpansion: false, query: &FakePerformanceQuery{ m := Win_PerfCounters{PrintValid: false, Object: perfObjects, query: &FakePerformanceQuery{
counters: createCounterMap(cps1, []float64{1.1, 1.2}), counters: createCounterMap(cps1, []float64{1.1, 1.2}),
expandPaths: map[string][]string{ expandPaths: map[string][]string{
cps1[0]: {cps1[0]}, cps1[0]: {cps1[0]},
@@ -279,17 +218,6 @@ func TestParseConfigNoInstance(t *testing.T) {
assert.Len(t, m.counters, 2) assert.Len(t, m.counters, 2)
err = m.query.Close() err = m.query.Close()
require.NoError(t, err) require.NoError(t, err)
m.UseWildcardsExpansion = true
m.counters = nil
err = m.query.Open()
require.NoError(t, err)
err = m.ParseConfig()
require.NoError(t, err)
assert.Len(t, m.counters, 2)
err = m.query.Close()
require.NoError(t, err)
} }
func TestParseConfigInvalidCounterError(t *testing.T) { func TestParseConfigInvalidCounterError(t *testing.T) {
@@ -311,16 +239,6 @@ func TestParseConfigInvalidCounterError(t *testing.T) {
require.Error(t, err) require.Error(t, err)
err = m.query.Close() err = m.query.Close()
require.NoError(t, err) require.NoError(t, err)
m.UseWildcardsExpansion = true
m.counters = nil
err = m.query.Open()
require.NoError(t, err)
err = m.ParseConfig()
require.Error(t, err)
err = m.query.Close()
require.NoError(t, err)
} }
func TestParseConfigInvalidCounterNoError(t *testing.T) { func TestParseConfigInvalidCounterNoError(t *testing.T) {
@@ -342,24 +260,13 @@ func TestParseConfigInvalidCounterNoError(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
err = m.query.Close() err = m.query.Close()
require.NoError(t, err) require.NoError(t, err)
m.UseWildcardsExpansion = true
m.counters = nil
err = m.query.Open()
require.NoError(t, err)
err = m.ParseConfig()
require.NoError(t, err)
err = m.query.Close()
require.NoError(t, err)
} }
func TestParseConfigTotalExpansion(t *testing.T) { func TestParseConfigTotal(t *testing.T) {
var err error var err error
perfObjects := createPerfObject("m", "O", []string{"*"}, []string{"*"}, true, true) perfObjects := createPerfObject("m", "O", []string{"*"}, []string{"*"}, true, true)
cps1 := []string{"\\O(I1)\\C1", "\\O(I1)\\C2", "\\O(_Total)\\C1", "\\O(_Total)\\C2"} cps1 := []string{"\\O(I1)\\C1", "\\O(I1)\\C2", "\\O(_Total)\\C1", "\\O(_Total)\\C2"}
m := Win_PerfCounters{PrintValid: false, UseWildcardsExpansion: true, Object: perfObjects, query: &FakePerformanceQuery{ m := Win_PerfCounters{PrintValid: false, Object: perfObjects, query: &FakePerformanceQuery{
counters: createCounterMap(append(cps1, "\\O(*)\\*"), []float64{1.1, 1.2, 1.3, 1.4, 0}), counters: createCounterMap(append(cps1, "\\O(*)\\*"), []float64{1.1, 1.2, 1.3, 1.4, 0}),
expandPaths: map[string][]string{ expandPaths: map[string][]string{
"\\O(*)\\*": cps1, "\\O(*)\\*": cps1,
@@ -376,7 +283,7 @@ func TestParseConfigTotalExpansion(t *testing.T) {
perfObjects[0].IncludeTotal = false perfObjects[0].IncludeTotal = false
m = Win_PerfCounters{PrintValid: false, UseWildcardsExpansion: true, Object: perfObjects, query: &FakePerformanceQuery{ m = Win_PerfCounters{PrintValid: false, Object: perfObjects, query: &FakePerformanceQuery{
counters: createCounterMap(append(cps1, "\\O(*)\\*"), []float64{1.1, 1.2, 1.3, 1.4, 0}), counters: createCounterMap(append(cps1, "\\O(*)\\*"), []float64{1.1, 1.2, 1.3, 1.4, 0}),
expandPaths: map[string][]string{ expandPaths: map[string][]string{
"\\O(*)\\*": cps1, "\\O(*)\\*": cps1,
@@ -396,7 +303,7 @@ func TestParseConfigExpand(t *testing.T) {
var err error var err error
perfObjects := createPerfObject("m", "O", []string{"*"}, []string{"*"}, false, false) perfObjects := createPerfObject("m", "O", []string{"*"}, []string{"*"}, false, false)
cps1 := []string{"\\O(I1)\\C1", "\\O(I1)\\C2", "\\O(I2)\\C1", "\\O(I2)\\C2"} cps1 := []string{"\\O(I1)\\C1", "\\O(I1)\\C2", "\\O(I2)\\C1", "\\O(I2)\\C2"}
m := Win_PerfCounters{PrintValid: false, UseWildcardsExpansion: true, Object: perfObjects, query: &FakePerformanceQuery{ m := Win_PerfCounters{PrintValid: false, Object: perfObjects, query: &FakePerformanceQuery{
counters: createCounterMap(append(cps1, "\\O(*)\\*"), []float64{1.1, 1.2, 1.3, 1.4, 0}), counters: createCounterMap(append(cps1, "\\O(*)\\*"), []float64{1.1, 1.2, 1.3, 1.4, 0}),
expandPaths: map[string][]string{ expandPaths: map[string][]string{
"\\O(*)\\*": cps1, "\\O(*)\\*": cps1,
@@ -439,17 +346,6 @@ func TestSimpleGather(t *testing.T) {
"objectname": "O", "objectname": "O",
} }
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1) acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
m.UseWildcardsExpansion = true
m.counters = nil
m.lastRefreshed = time.Time{}
var acc2 testutil.Accumulator
err = m.Gather(&acc2)
require.NoError(t, err)
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
} }
func TestGatherInvalidDataIgnore(t *testing.T) { func TestGatherInvalidDataIgnore(t *testing.T) {
@@ -481,25 +377,15 @@ func TestGatherInvalidDataIgnore(t *testing.T) {
"objectname": "O", "objectname": "O",
} }
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1) acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
m.UseWildcardsExpansion = true
m.counters = nil
m.lastRefreshed = time.Time{}
var acc2 testutil.Accumulator
err = m.Gather(&acc2)
require.NoError(t, err)
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
} }
//tests with expansion func TestGatherRefreshing(t *testing.T) {
func TestGatherRefreshingWithExpansion(t *testing.T) {
var err error var err error
if testing.Short() { if testing.Short() {
t.Skip("Skipping long taking test in short mode") t.Skip("Skipping long taking test in short mode")
} }
measurement := "test" measurement := "test"
perfObjects := createPerfObject(measurement, "O", []string{"*"}, []string{"*"}, true, false) perfObjects := createPerfObject(measurement, "O", []string{"*"}, []string{"*"}, false, false)
cps1 := []string{"\\O(I1)\\C1", "\\O(I1)\\C2", "\\O(I2)\\C1", "\\O(I2)\\C2"} cps1 := []string{"\\O(I1)\\C1", "\\O(I1)\\C2", "\\O(I2)\\C1", "\\O(I2)\\C2"}
fpm := &FakePerformanceQuery{ fpm := &FakePerformanceQuery{
counters: createCounterMap(append(cps1, "\\O(*)\\*"), []float64{1.1, 1.2, 1.3, 1.4, 0}), counters: createCounterMap(append(cps1, "\\O(*)\\*"), []float64{1.1, 1.2, 1.3, 1.4, 0}),
@@ -508,7 +394,7 @@ func TestGatherRefreshingWithExpansion(t *testing.T) {
}, },
addEnglishSupported: true, addEnglishSupported: true,
} }
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, UseWildcardsExpansion: true, query: fpm, CountersRefreshInterval: internal.Duration{Duration: time.Second * 10}} m := Win_PerfCounters{PrintValid: false, Object: perfObjects, query: fpm, CountersRefreshInterval: internal.Duration{Duration: time.Second * 10}}
var acc1 testutil.Accumulator var acc1 testutil.Accumulator
err = m.Gather(&acc1) err = m.Gather(&acc1)
assert.Len(t, m.counters, 4) assert.Len(t, m.counters, 4)
@@ -577,211 +463,3 @@ func TestGatherRefreshingWithExpansion(t *testing.T) {
acc3.AssertContainsTaggedFields(t, measurement, fields3, tags3) acc3.AssertContainsTaggedFields(t, measurement, fields3, tags3)
} }
func TestGatherRefreshingWithoutExpansion(t *testing.T) {
var err error
if testing.Short() {
t.Skip("Skipping long taking test in short mode")
}
measurement := "test"
perfObjects := createPerfObject(measurement, "O", []string{"*"}, []string{"C1", "C2"}, true, false)
cps1 := []string{"\\O(I1)\\C1", "\\O(I1)\\C2", "\\O(I2)\\C1", "\\O(I2)\\C2"}
fpm := &FakePerformanceQuery{
counters: createCounterMap(append([]string{"\\O(*)\\C1", "\\O(*)\\C2"}, cps1...), []float64{0, 0, 1.1, 1.2, 1.3, 1.4}),
expandPaths: map[string][]string{
"\\O(*)\\C1": {cps1[0], cps1[2]},
"\\O(*)\\C2": {cps1[1], cps1[3]},
},
addEnglishSupported: true,
}
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, UseWildcardsExpansion: false, query: fpm, CountersRefreshInterval: internal.Duration{Duration: time.Second * 10}}
var acc1 testutil.Accumulator
err = m.Gather(&acc1)
assert.Len(t, m.counters, 2)
require.NoError(t, err)
assert.Len(t, acc1.Metrics, 2)
fields1 := map[string]interface{}{
"C1": float32(1.1),
"C2": float32(1.2),
}
tags1 := map[string]string{
"instance": "I1",
"objectname": "O",
}
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
fields2 := map[string]interface{}{
"C1": float32(1.3),
"C2": float32(1.4),
}
tags2 := map[string]string{
"instance": "I2",
"objectname": "O",
}
acc1.AssertContainsTaggedFields(t, measurement, fields2, tags2)
//test finding new instance
cps2 := []string{"\\O(I1)\\C1", "\\O(I1)\\C2", "\\O(I2)\\C1", "\\O(I2)\\C2", "\\O(I3)\\C1", "\\O(I3)\\C2"}
fpm = &FakePerformanceQuery{
counters: createCounterMap(append([]string{"\\O(*)\\C1", "\\O(*)\\C2"}, cps2...), []float64{0, 0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6}),
expandPaths: map[string][]string{
"\\O(*)\\C1": {cps2[0], cps2[2], cps2[4]},
"\\O(*)\\C2": {cps2[1], cps2[3], cps2[5]},
},
addEnglishSupported: true,
}
m.query = fpm
fpm.Open()
var acc2 testutil.Accumulator
fields3 := map[string]interface{}{
"C1": float32(1.5),
"C2": float32(1.6),
}
tags3 := map[string]string{
"instance": "I3",
"objectname": "O",
}
//test before elapsing CounterRefreshRate counters are not refreshed
err = m.Gather(&acc2)
require.NoError(t, err)
assert.Len(t, m.counters, 2)
assert.Len(t, acc2.Metrics, 3)
acc2.AssertContainsTaggedFields(t, measurement, fields1, tags1)
acc2.AssertContainsTaggedFields(t, measurement, fields2, tags2)
acc2.AssertContainsTaggedFields(t, measurement, fields3, tags3)
//test changed configuration
perfObjects = createPerfObject(measurement, "O", []string{"*"}, []string{"C1", "C2", "C3"}, true, false)
cps3 := []string{"\\O(I1)\\C1", "\\O(I1)\\C2", "\\O(I1)\\C3", "\\O(I2)\\C1", "\\O(I2)\\C2", "\\O(I2)\\C3"}
fpm = &FakePerformanceQuery{
counters: createCounterMap(append([]string{"\\O(*)\\C1", "\\O(*)\\C2", "\\O(*)\\C3"}, cps3...), []float64{0, 0, 0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6}),
expandPaths: map[string][]string{
"\\O(*)\\C1": {cps3[0], cps3[3]},
"\\O(*)\\C2": {cps3[1], cps3[4]},
"\\O(*)\\C3": {cps3[2], cps3[5]},
},
addEnglishSupported: true,
}
m.query = fpm
m.Object = perfObjects
fpm.Open()
time.Sleep(m.CountersRefreshInterval.Duration)
var acc3 testutil.Accumulator
err = m.Gather(&acc3)
require.NoError(t, err)
assert.Len(t, acc3.Metrics, 2)
fields4 := map[string]interface{}{
"C1": float32(1.1),
"C2": float32(1.2),
"C3": float32(1.3),
}
tags4 := map[string]string{
"instance": "I1",
"objectname": "O",
}
fields5 := map[string]interface{}{
"C1": float32(1.4),
"C2": float32(1.5),
"C3": float32(1.6),
}
tags5 := map[string]string{
"instance": "I2",
"objectname": "O",
}
acc3.AssertContainsTaggedFields(t, measurement, fields4, tags4)
acc3.AssertContainsTaggedFields(t, measurement, fields5, tags5)
}
func TestGatherTotalNoExpansion(t *testing.T) {
var err error
measurement := "m"
perfObjects := createPerfObject(measurement, "O", []string{"*"}, []string{"C1", "C2"}, true, true)
cps1 := []string{"\\O(I1)\\C1", "\\O(I1)\\C2", "\\O(_Total)\\C1", "\\O(_Total)\\C2"}
m := Win_PerfCounters{PrintValid: false, UseWildcardsExpansion: false, Object: perfObjects, query: &FakePerformanceQuery{
counters: createCounterMap(append([]string{"\\O(*)\\C1", "\\O(*)\\C2"}, cps1...), []float64{0, 0, 1.1, 1.2, 1.3, 1.4}),
expandPaths: map[string][]string{
"\\O(*)\\C1": {cps1[0], cps1[2]},
"\\O(*)\\C2": {cps1[1], cps1[3]},
},
addEnglishSupported: true,
}}
var acc1 testutil.Accumulator
err = m.Gather(&acc1)
require.NoError(t, err)
assert.Len(t, m.counters, 2)
assert.Len(t, acc1.Metrics, 2)
fields1 := map[string]interface{}{
"C1": float32(1.1),
"C2": float32(1.2),
}
tags1 := map[string]string{
"instance": "I1",
"objectname": "O",
}
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
fields2 := map[string]interface{}{
"C1": float32(1.3),
"C2": float32(1.4),
}
tags2 := map[string]string{
"instance": "_Total",
"objectname": "O",
}
acc1.AssertContainsTaggedFields(t, measurement, fields2, tags2)
perfObjects[0].IncludeTotal = false
m.counters = nil
m.lastRefreshed = time.Time{}
var acc2 testutil.Accumulator
err = m.Gather(&acc2)
require.NoError(t, err)
assert.Len(t, m.counters, 2)
assert.Len(t, acc2.Metrics, 1)
acc2.AssertContainsTaggedFields(t, measurement, fields1, tags1)
acc2.AssertDoesNotContainsTaggedFields(t, measurement, fields2, tags2)
}
// list of nul terminated strings from WinAPI
var unicodeStringListWithEnglishChars = []uint16{0x5c, 0x5c, 0x54, 0x34, 0x38, 0x30, 0x5c, 0x50, 0x68, 0x79, 0x73, 0x69, 0x63, 0x61, 0x6c, 0x44, 0x69, 0x73, 0x6b, 0x28, 0x30, 0x20, 0x43, 0x3a, 0x29, 0x5c, 0x43, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x20, 0x44, 0x69, 0x73, 0x6b, 0x20, 0x51, 0x75, 0x65, 0x75, 0x65, 0x20, 0x4c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x0, 0x5c, 0x5c, 0x54, 0x34, 0x38, 0x30, 0x5c, 0x50, 0x68, 0x79, 0x73, 0x69, 0x63, 0x61, 0x6c, 0x44, 0x69, 0x73, 0x6b, 0x28, 0x5f, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x29, 0x5c, 0x43, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x20, 0x44, 0x69, 0x73, 0x6b, 0x20, 0x51, 0x75, 0x65, 0x75, 0x65, 0x20, 0x4c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x0, 0x0}
var unicodeStringListWithCzechChars = []uint16{0x5c, 0x5c, 0x54, 0x34, 0x38, 0x30, 0x5c, 0x46, 0x79, 0x7a, 0x69, 0x63, 0x6b, 0xfd, 0x20, 0x64, 0x69, 0x73, 0x6b, 0x28, 0x30, 0x20, 0x43, 0x3a, 0x29, 0x5c, 0x41, 0x6b, 0x74, 0x75, 0xe1, 0x6c, 0x6e, 0xed, 0x20, 0x64, 0xe9, 0x6c, 0x6b, 0x61, 0x20, 0x66, 0x72, 0x6f, 0x6e, 0x74, 0x79, 0x20, 0x64, 0x69, 0x73, 0x6b, 0x75, 0x0, 0x5c, 0x5c, 0x54, 0x34, 0x38, 0x30, 0x5c, 0x46, 0x79, 0x7a, 0x69, 0x63, 0x6b, 0xfd, 0x20, 0x64, 0x69, 0x73, 0x6b, 0x28, 0x5f, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x29, 0x5c, 0x41, 0x6b, 0x74, 0x75, 0xe1, 0x6c, 0x6e, 0xed, 0x20, 0x64, 0xe9, 0x6c, 0x6b, 0x61, 0x20, 0x66, 0x72, 0x6f, 0x6e, 0x74, 0x79, 0x20, 0x64, 0x69, 0x73, 0x6b, 0x75, 0x0, 0x0}
var unicodeStringListSingleItem = []uint16{0x5c, 0x5c, 0x54, 0x34, 0x38, 0x30, 0x5c, 0x50, 0x68, 0x79, 0x73, 0x69, 0x63, 0x61, 0x6c, 0x44, 0x69, 0x73, 0x6b, 0x28, 0x30, 0x20, 0x43, 0x3a, 0x29, 0x5c, 0x43, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x20, 0x44, 0x69, 0x73, 0x6b, 0x20, 0x51, 0x75, 0x65, 0x75, 0x65, 0x20, 0x4c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x0, 0x0}
var unicodeStringListNoItem = []uint16{0x0}
var stringArrayWithEnglishChars = []string{
"\\\\T480\\PhysicalDisk(0 C:)\\Current Disk Queue Length",
"\\\\T480\\PhysicalDisk(_Total)\\Current Disk Queue Length",
}
var stringArrayWithCzechChars = []string{
"\\\\T480\\Fyzick\u00fd disk(0 C:)\\Aktu\u00e1ln\u00ed d\u00e9lka fronty disku",
"\\\\T480\\Fyzick\u00fd disk(_Total)\\Aktu\u00e1ln\u00ed d\u00e9lka fronty disku",
}
var stringArraySingleItem = []string{
"\\\\T480\\PhysicalDisk(0 C:)\\Current Disk Queue Length",
}
func TestUTF16ToStringArray(t *testing.T) {
singleItem := UTF16ToStringArray(unicodeStringListSingleItem)
assert.True(t, assert.ObjectsAreEqual(singleItem, stringArraySingleItem), "Not equal single arrays")
noItem := UTF16ToStringArray(unicodeStringListNoItem)
assert.Nil(t, noItem)
engStrings := UTF16ToStringArray(unicodeStringListWithEnglishChars)
assert.True(t, assert.ObjectsAreEqual(engStrings, stringArrayWithEnglishChars), "Not equal eng arrays")
czechStrings := UTF16ToStringArray(unicodeStringListWithCzechChars)
assert.True(t, assert.ObjectsAreEqual(czechStrings, stringArrayWithCzechChars), "Not equal czech arrays")
}

View File

@@ -140,17 +140,17 @@ func (i *InfluxDB) Connect() error {
i.serializer.SetFieldTypeSupport(influx.UintSupport) i.serializer.SetFieldTypeSupport(influx.UintSupport)
} }
for _, u := range urls { for _, loc := range urls {
u, err := url.Parse(u) u, err := url.Parse(loc)
if err != nil { if err != nil {
return fmt.Errorf("error parsing url [%s]: %v", u, err) return fmt.Errorf("error parsing url [%q]: %v", loc, err)
} }
var proxy *url.URL var proxy *url.URL
if len(i.HTTPProxy) > 0 { if len(i.HTTPProxy) > 0 {
proxy, err = url.Parse(i.HTTPProxy) proxy, err = url.Parse(i.HTTPProxy)
if err != nil { if err != nil {
return fmt.Errorf("error parsing proxy_url [%s]: %v", proxy, err) return fmt.Errorf("error parsing proxy_url [%q]: %v", proxy, err)
} }
} }
@@ -170,7 +170,7 @@ func (i *InfluxDB) Connect() error {
i.clients = append(i.clients, c) i.clients = append(i.clients, c)
default: default:
return fmt.Errorf("unsupported scheme [%s]: %q", u, u.Scheme) return fmt.Errorf("unsupported scheme [%q]: %q", u, u.Scheme)
} }
} }
@@ -209,14 +209,14 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
if apiError.Type == DatabaseNotFound { if apiError.Type == DatabaseNotFound {
err := client.CreateDatabase(ctx) err := client.CreateDatabase(ctx)
if err != nil { if err != nil {
log.Printf("E! [outputs.influxdb] when writing to [%s]: database %q not found and failed to recreate", log.Printf("E! [outputs.influxdb] when writing to [%q]: database %q not found and failed to recreate",
client.URL(), client.Database()) client.URL(), client.Database())
} }
} }
} }
} }
log.Printf("E! [outputs.influxdb]: when writing to [%s]: %v", client.URL(), err) log.Printf("E! [outputs.influxdb]: when writing to [%q]: %v", client.URL(), err)
} }
return errors.New("could not write any address") return errors.New("could not write any address")
@@ -231,7 +231,7 @@ func (i *InfluxDB) udpClient(url *url.URL) (Client, error) {
c, err := i.CreateUDPClientF(config) c, err := i.CreateUDPClientF(config)
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating UDP client [%s]: %v", url, err) return nil, fmt.Errorf("error creating UDP client [%q]: %v", url, err)
} }
return c, nil return c, nil
@@ -261,13 +261,13 @@ func (i *InfluxDB) httpClient(ctx context.Context, url *url.URL, proxy *url.URL)
c, err := i.CreateHTTPClientF(config) c, err := i.CreateHTTPClientF(config)
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating HTTP client [%s]: %v", url, err) return nil, fmt.Errorf("error creating HTTP client [%q]: %v", url, err)
} }
if !i.SkipDatabaseCreation { if !i.SkipDatabaseCreation {
err = c.CreateDatabase(ctx) err = c.CreateDatabase(ctx)
if err != nil { if err != nil {
log.Printf("W! [outputs.influxdb] when writing to [%s]: database %q creation failed: %v", log.Printf("W! [outputs.influxdb] when writing to [%q]: database %q creation failed: %v",
c.URL(), c.Database(), err) c.URL(), c.Database(), err)
} }
} }

View File

@@ -3,7 +3,7 @@ package nats
import ( import (
"fmt" "fmt"
nats_client "github.com/nats-io/go-nats" nats_client "github.com/nats-io/nats"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/internal/tls"

View File

@@ -180,14 +180,6 @@ func buildMetrics(m telegraf.Metric, w *Wavefront) []*MetricPoint {
} }
func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string) { func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string) {
// Remove all empty tags.
for k, v := range mTags {
if v == "" {
delete(mTags, k)
}
}
var source string var source string
sourceTagFound := false sourceTagFound := false

View File

@@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -45,15 +44,25 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i
tags[k] = v tags[k] = v
} }
for _, tag := range p.TagKeys {
switch v := jsonOut[tag].(type) {
case string:
tags[tag] = v
case bool:
tags[tag] = strconv.FormatBool(v)
case float64:
tags[tag] = strconv.FormatFloat(v, 'f', -1, 64)
}
delete(jsonOut, tag)
}
f := JSONFlattener{} f := JSONFlattener{}
err := f.FullFlattenJSON("", jsonOut, true, true) err := f.FlattenJSON("", jsonOut)
if err != nil { if err != nil {
return nil, err return nil, err
} }
tags, nFields := p.switchFieldToTag(tags, f.Fields) metric, err := metric.New(p.MetricName, tags, f.Fields, time.Now().UTC())
metric, err := metric.New(p.MetricName, tags, nFields, time.Now().UTC())
if err != nil { if err != nil {
return nil, err return nil, err
@@ -61,43 +70,6 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i
return append(metrics, metric), nil return append(metrics, metric), nil
} }
//will take in field map with strings and bools,
//search for TagKeys that match fieldnames and add them to tags
//will delete any strings/bools that shouldn't be fields
//assumes that any non-numeric values in TagKeys should be displayed as tags
func (p *JSONParser) switchFieldToTag(tags map[string]string, fields map[string]interface{}) (map[string]string, map[string]interface{}) {
for _, name := range p.TagKeys {
//switch any fields in tagkeys into tags
if fields[name] == nil {
continue
}
switch value := fields[name].(type) {
case string:
tags[name] = value
delete(fields, name)
case bool:
tags[name] = strconv.FormatBool(value)
delete(fields, name)
case float64:
tags[name] = strconv.FormatFloat(value, 'f', -1, 64)
delete(fields, name)
default:
log.Printf("E! [parsers.json] Unrecognized type %T", value)
}
}
//remove any additional string/bool values from fields
for k := range fields {
switch fields[k].(type) {
case string:
delete(fields, k)
case bool:
delete(fields, k)
}
}
return tags, fields
}
func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
buf = bytes.TrimSpace(buf) buf = bytes.TrimSpace(buf)
buf = bytes.TrimPrefix(buf, utf8BOM) buf = bytes.TrimPrefix(buf, utf8BOM)
@@ -147,7 +119,6 @@ func (f *JSONFlattener) FlattenJSON(
if f.Fields == nil { if f.Fields == nil {
f.Fields = make(map[string]interface{}) f.Fields = make(map[string]interface{})
} }
return f.FullFlattenJSON(fieldname, v, false, false) return f.FullFlattenJSON(fieldname, v, false, false)
} }

View File

@@ -4,7 +4,6 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
const ( const (
@@ -441,29 +440,3 @@ func TestHttpJsonBOM(t *testing.T) {
_, err := parser.Parse(jsonBOM) _, err := parser.Parse(jsonBOM)
assert.NoError(t, err) assert.NoError(t, err)
} }
//for testing issue #4260
func TestJSONParseNestedArray(t *testing.T) {
testString := `{
"total_devices": 5,
"total_threads": 10,
"shares": {
"total": 5,
"accepted": 5,
"rejected": 0,
"avg_find_time": 4,
"tester": "work",
"tester2": "don't want this",
"tester3": 7.93
}
}`
parser := JSONParser{
MetricName: "json_test",
TagKeys: []string{"total_devices", "total_threads", "shares_tester", "shares_tester3"},
}
metrics, err := parser.Parse([]byte(testString))
require.NoError(t, err)
require.Equal(t, len(parser.TagKeys), len(metrics[0].Tags()))
}

View File

@@ -1,11 +1,11 @@
# Converter Processor # Converter Processor
The converter processor is used to change the type of tag or field values. In The converter processor is used to change the type of tag or field values. In
addition to changing field types it can convert between fields and tags. addition to changing field types it can convert fields to tags and vis versa.
Values that cannot be converted are dropped. Values that cannot be converted are dropped.
**Note:** When converting tags to fields, take care to ensure the series is still **Note:** When converting tags to fields, take care not to ensure the series is still
uniquely identifiable. Fields with the same series key (measurement + tags) uniquely identifiable. Fields with the same series key (measurement + tags)
will overwrite one another. will overwrite one another.

View File

@@ -95,7 +95,7 @@ supported_packages = {
"freebsd": [ "tar" ] "freebsd": [ "tar" ]
} }
next_version = '1.7.0' next_version = '1.8.0'
################ ################
#### Telegraf Functions #### Telegraf Functions