Compare commits

..

6 Commits

Author SHA1 Message Date
Daniel Nelson
1989287606 Use %q format when printing influxdb errors 2018-06-05 17:25:09 -07:00
Daniel Nelson
5f0cbd1255 Update changelog 2018-06-05 17:14:29 -07:00
Leszek Charkiewicz
3ef4dff4ec Add SSL/TLS support to Redis input (#4236) 2018-06-05 17:12:30 -07:00
Piotr Popieluch
dfe7b5eec2 Don't skip metrics during startup in aggregate phase (#4230) 2018-06-05 16:30:53 -07:00
Daniel Nelson
92a8f795f5 Set 1.6.4 release date 2018-06-05 12:11:15 -07:00
Daniel Nelson
b1d77ade55 Update master version to 1.8 2018-06-05 11:46:55 -07:00
45 changed files with 491 additions and 1113 deletions

View File

@@ -2,12 +2,15 @@
defaults:
defaults: &defaults
working_directory: '/go/src/github.com/influxdata/telegraf'
go-1_8: &go-1_8
docker:
- image: 'circleci/golang:1.8.7'
go-1_9: &go-1_9
docker:
- image: 'circleci/golang:1.9.7'
- image: 'circleci/golang:1.9.5'
go-1_10: &go-1_10
docker:
- image: 'circleci/golang:1.10.3'
- image: 'circleci/golang:1.10.1'
version: 2
jobs:
@@ -20,6 +23,12 @@ jobs:
root: '/go/src'
paths:
- '*'
test-go-1.8:
<<: [ *defaults, *go-1_8 ]
steps:
- attach_workspace:
at: '/go/src'
- run: 'make test-ci'
test-go-1.9:
<<: [ *defaults, *go-1_9 ]
steps:
@@ -57,6 +66,9 @@ workflows:
build_and_release:
jobs:
- 'deps'
- 'test-go-1.8':
requires:
- 'deps'
- 'test-go-1.9':
requires:
- 'deps'
@@ -65,11 +77,15 @@ workflows:
- 'deps'
- 'release':
requires:
- 'test-go-1.8'
- 'test-go-1.9'
- 'test-go-1.10'
nightly:
jobs:
- 'deps'
- 'test-go-1.8':
requires:
- 'deps'
- 'test-go-1.9':
requires:
- 'deps'
@@ -78,6 +94,7 @@ workflows:
- 'deps'
- 'nightly':
requires:
- 'test-go-1.8'
- 'test-go-1.9'
- 'test-go-1.10'
triggers:

View File

@@ -1,17 +1,10 @@
## v1.7.1 [2018-07-03]
## v1.8 [unreleased]
### Bugfixes
### Features
- [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal.
- [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser.
- [#4135](https://github.com/influxdata/telegraf/issues/4135): Fix postfix input handling multi-level queues.
- [#4334](https://github.com/influxdata/telegraf/pull/4334): Fix syslog timestamp parsing with single digit day of month.
- [#2910](https://github.com/influxdata/telegraf/issues/2910): Handle mysql input variations in the user_statistics collecting.
- [#4293](https://github.com/influxdata/telegraf/issues/4293): Fix minmax and basicstats aggregators to use uint64.
- [#4290](https://github.com/influxdata/telegraf/issues/4290): Document swap input plugin.
- [#4316](https://github.com/influxdata/telegraf/issues/4316): Fix incorrect precision being applied to metric in http_listener.
- [#4236](https://github.com/influxdata/telegraf/pull/4236): Add SSL/TLS support to redis input.
## v1.7 [2018-06-12]
## v1.7 [unreleased]
### Release Notes
@@ -86,8 +79,6 @@
- [#2879](https://github.com/influxdata/telegraf/issues/2879): Fix wildcards and multi instance processes in win_perf_counters.
- [#2468](https://github.com/influxdata/telegraf/issues/2468): Fix crash on 32-bit Windows in win_perf_counters.
- [#4198](https://github.com/influxdata/telegraf/issues/4198): Fix win_perf_counters not collecting at every interval.
- [#4227](https://github.com/influxdata/telegraf/issues/4227): Use same flags for all BSD family ping variants.
- [#4266](https://github.com/influxdata/telegraf/issues/4266): Remove tags with empty values from Wavefront output.
## v1.6.4 [2018-06-05]

7
Godeps
View File

@@ -28,13 +28,13 @@ github.com/golang/snappy 7db9049039a047d955fe8c19b83c8ff5abd765c7
github.com/go-ole/go-ole be49f7c07711fcb603cff39e1de7c67926dc0ba7
github.com/google/go-cmp f94e52cad91c65a63acc1e75d4be223ea22e99bc
github.com/gorilla/mux 53c1911da2b537f792e7cafcb446b05ffe33b996
github.com/go-redis/redis 73b70592cdaa9e6abdfcfbf97b4a90d80728c836
github.com/go-redis/redis 83fb42932f6145ce52df09860384a4653d2d332a
github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034
github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
github.com/hashicorp/consul 5174058f0d2bda63fa5198ab96c33d9a909c58ed
github.com/influxdata/go-syslog eecd51df3ad85464a2bab9b7d3a45bc1e299059e
github.com/influxdata/go-syslog 84f3b60009444d298f97454feb1f20cf91d1fa6e
github.com/influxdata/tail c43482518d410361b6c383d7aebce33d0471d7bc
github.com/influxdata/toml 2a2e3012f7cfbef64091cc79776311e65dfa211b
github.com/influxdata/toml 5d1d907f22ead1cd47adde17ceec5bda9cacaf8f
github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec
github.com/fsnotify/fsnotify c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9
github.com/jackc/pgx 63f58fd32edb5684b9e9f4cfaac847c6b42b3917
@@ -51,6 +51,7 @@ github.com/multiplay/go-ts3 07477f49b8dfa3ada231afc7b7b17617d42afe8e
github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
github.com/nats-io/gnatsd 393bbb7c031433e68707c8810fda0bfcfbe6ab9b
github.com/nats-io/go-nats ea9585611a4ab58a205b9b125ebd74c389a6b898
github.com/nats-io/nats ea9585611a4ab58a205b9b125ebd74c389a6b898
github.com/nats-io/nuid 289cccf02c178dc782430d534e3c1f5b72af807f
github.com/nsqio/go-nsq eee57a3ac4174c55924125bb15eeeda8cffb6e6f
github.com/opencontainers/runc 89ab7f2ccc1e45ddf6485eaa802c35dcf321dfc8

View File

@@ -40,7 +40,7 @@ Ansible role: https://github.com/rossmcdonald/telegraf
### From Source:
Telegraf requires golang version 1.9 or newer, the Makefile requires GNU make.
Telegraf requires golang version 1.8+, the Makefile requires GNU make.
Dependencies are managed with [gdm](https://github.com/sparrc/gdm),
which is installed by the Makefile if you don't have it already.

View File

@@ -362,24 +362,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
metricC := make(chan telegraf.Metric, 100)
aggC := make(chan telegraf.Metric, 100)
// Start all ServicePlugins
for _, input := range a.Config.Inputs {
input.SetDefaultTags(a.Config.Tags)
switch p := input.Input.(type) {
case telegraf.ServiceInput:
acc := NewAccumulator(input, metricC)
// Service input plugins should set their own precision of their
// metrics.
acc.SetPrecision(time.Nanosecond, 0)
if err := p.Start(acc); err != nil {
log.Printf("E! Service for input %s failed to start, exiting\n%s\n",
input.Name(), err.Error())
return err
}
defer p.Stop()
}
}
// Round collection to nearest interval by sleeping
if a.Config.Agent.RoundInterval {
i := int64(a.Config.Agent.Interval.Duration)
@@ -419,6 +401,25 @@ func (a *Agent) Run(shutdown chan struct{}) error {
}(input, interval)
}
// Start all ServicePlugins inputs after all other
// plugins are loaded so that no metrics get dropped
for _, input := range a.Config.Inputs {
input.SetDefaultTags(a.Config.Tags)
switch p := input.Input.(type) {
case telegraf.ServiceInput:
acc := NewAccumulator(input, metricC)
// Service input plugins should set their own precision of their
// metrics.
acc.SetPrecision(time.Nanosecond, 0)
if err := p.Start(acc); err != nil {
log.Printf("E! Service for input %s failed to start, exiting\n%s\n",
input.Name(), err.Error())
return err
}
defer p.Stop()
}
}
wg.Wait()
a.Close()
return nil

View File

@@ -58,7 +58,7 @@ var fService = flag.String("service", "",
var fRunAsConsole = flag.Bool("console", false, "run as console application (windows only)")
var (
nextVersion = "1.7.0"
nextVersion = "1.8.0"
version string
commit string
branch string
@@ -147,11 +147,11 @@ func reloadLoop(
shutdown := make(chan struct{})
signals := make(chan os.Signal)
signal.Notify(signals, os.Interrupt, syscall.SIGHUP, syscall.SIGTERM)
signal.Notify(signals, os.Interrupt, syscall.SIGHUP)
go func() {
select {
case sig := <-signals:
if sig == os.Interrupt || sig == syscall.SIGTERM {
if sig == os.Interrupt {
close(shutdown)
}
if sig == syscall.SIGHUP {

View File

@@ -1,6 +1,7 @@
package models
import (
"log"
"time"
"github.com/influxdata/telegraf"
@@ -153,6 +154,7 @@ func (r *RunningAggregator) Run(
m.Time().After(r.periodEnd.Add(truncation).Add(r.Config.Delay)) {
// the metric is outside the current aggregation period, so
// skip it.
log.Printf("D! aggregator: metric \"%s\" is not in the current timewindow, skipping", m.Name())
continue
}
r.add(m)

View File

@@ -17,7 +17,7 @@ type ClientConfig struct {
// Deprecated in 1.7; use TLS variables above
SSLCA string `toml:"ssl_ca"`
SSLCert string `toml:"ssl_cert"`
SSLKey string `toml:"ssl_key"`
SSLKey string `toml:"ssl_ca"`
}
// ServerConfig represents the standard server TLS config.

View File

@@ -246,8 +246,6 @@ func convert(in interface{}) (float64, bool) {
return v, true
case int64:
return float64(v), true
case uint64:
return float64(v), true
default:
return 0, false
}

View File

@@ -28,7 +28,6 @@ var m2, _ = metric.New("m1",
"c": float64(4),
"d": float64(6),
"e": float64(200),
"f": uint64(200),
"ignoreme": "string",
"andme": true,
},
@@ -82,10 +81,6 @@ func TestBasicStatsWithPeriod(t *testing.T) {
"e_max": float64(200),
"e_min": float64(200),
"e_mean": float64(200),
"f_count": float64(1), //f
"f_max": float64(200),
"f_min": float64(200),
"f_mean": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
@@ -149,10 +144,6 @@ func TestBasicStatsDifferentPeriods(t *testing.T) {
"e_max": float64(200),
"e_min": float64(200),
"e_mean": float64(200),
"f_count": float64(1), //f
"f_max": float64(200),
"f_min": float64(200),
"f_mean": float64(200),
}
expectedTags = map[string]string{
"foo": "bar",
@@ -178,7 +169,6 @@ func TestBasicStatsWithOnlyCount(t *testing.T) {
"c_count": float64(2),
"d_count": float64(2),
"e_count": float64(1),
"f_count": float64(1),
}
expectedTags := map[string]string{
"foo": "bar",
@@ -204,7 +194,6 @@ func TestBasicStatsWithOnlyMin(t *testing.T) {
"c_min": float64(2),
"d_min": float64(2),
"e_min": float64(200),
"f_min": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
@@ -230,7 +219,6 @@ func TestBasicStatsWithOnlyMax(t *testing.T) {
"c_max": float64(4),
"d_max": float64(6),
"e_max": float64(200),
"f_max": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
@@ -256,7 +244,6 @@ func TestBasicStatsWithOnlyMean(t *testing.T) {
"c_mean": float64(3),
"d_mean": float64(4),
"e_mean": float64(200),
"f_mean": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
@@ -282,7 +269,6 @@ func TestBasicStatsWithOnlySum(t *testing.T) {
"c_sum": float64(6),
"d_sum": float64(8),
"e_sum": float64(200),
"f_sum": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
@@ -413,8 +399,6 @@ func TestBasicStatsWithMinAndMax(t *testing.T) {
"d_min": float64(2),
"e_max": float64(200), //e
"e_min": float64(200),
"f_max": float64(200), //f
"f_min": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
@@ -466,11 +450,6 @@ func TestBasicStatsWithAllStats(t *testing.T) {
"e_min": float64(200),
"e_mean": float64(200),
"e_sum": float64(200),
"f_count": float64(1), //f
"f_max": float64(200),
"f_min": float64(200),
"f_mean": float64(200),
"f_sum": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",

View File

@@ -107,8 +107,6 @@ func convert(in interface{}) (float64, bool) {
return v, true
case int64:
return float64(v), true
case uint64:
return float64(v), true
default:
return 0, false
}

View File

@@ -38,7 +38,6 @@ var m2, _ = metric.New("m1",
"i": float64(1),
"j": float64(1),
"k": float64(200),
"l": uint64(200),
"ignoreme": "string",
"andme": true,
},
@@ -86,8 +85,6 @@ func TestMinMaxWithPeriod(t *testing.T) {
"j_min": float64(1),
"k_max": float64(200),
"k_min": float64(200),
"l_max": float64(200),
"l_min": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
@@ -157,8 +154,6 @@ func TestMinMaxDifferentPeriods(t *testing.T) {
"j_min": float64(1),
"k_max": float64(200),
"k_min": float64(200),
"l_max": float64(200),
"l_min": float64(200),
}
expectedTags = map[string]string{
"foo": "bar",

View File

@@ -343,9 +343,6 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
}
func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
h.mu.Lock()
defer h.mu.Unlock()
h.handler.SetTimePrecision(getPrecisionMultiplier(precision))
h.handler.SetTimeFunc(func() time.Time { return t })
metrics, err := h.parser.Parse(b)

View File

@@ -293,7 +293,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
timestamp = time.Unix(0, iv)
}
case SYSLOG_TIMESTAMP:
ts, err := time.ParseInLocation(time.Stamp, v, p.loc)
ts, err := time.ParseInLocation("Jan 02 15:04:05", v, p.loc)
if err == nil {
if ts.Year() == 0 {
ts = ts.AddDate(timestamp.Year(), 0, 0)

View File

@@ -971,39 +971,14 @@ func TestNewlineInPatterns(t *testing.T) {
require.NotNil(t, m)
}
func TestSyslogTimestamp(t *testing.T) {
tests := []struct {
name string
line string
expected time.Time
}{
{
name: "two digit day of month",
line: "Sep 25 09:01:55 value=42",
expected: time.Date(2018, time.September, 25, 9, 1, 55, 0, time.UTC),
},
{
name: "one digit day of month single space",
line: "Sep 2 09:01:55 value=42",
expected: time.Date(2018, time.September, 2, 9, 1, 55, 0, time.UTC),
},
{
name: "one digit day of month double space",
line: "Sep 2 09:01:55 value=42",
expected: time.Date(2018, time.September, 2, 9, 1, 55, 0, time.UTC),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &Parser{
Patterns: []string{`%{SYSLOGTIMESTAMP:timestamp:ts-syslog} value=%{NUMBER:value:int}`},
timeFunc: func() time.Time { return time.Date(2017, time.April, 1, 0, 0, 0, 0, time.UTC) },
}
require.NoError(t, p.Compile())
m, err := p.ParseLine(tt.line)
require.NoError(t, err)
require.NotNil(t, m)
require.Equal(t, tt.expected, m.Time())
})
func TestSyslogTimestampParser(t *testing.T) {
p := &Parser{
Patterns: []string{`%{SYSLOGTIMESTAMP:timestamp:ts-syslog} value=%{NUMBER:value:int}`},
timeFunc: func() time.Time { return time.Date(2018, time.April, 1, 0, 0, 0, 0, nil) },
}
require.NoError(t, p.Compile())
m, err := p.ParseLine("Sep 25 09:01:55 value=42")
require.NoError(t, err)
require.NotNil(t, m)
require.Equal(t, 2018, m.Time().Year())
}

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"database/sql"
"fmt"
"log"
"strconv"
"strings"
"sync"
@@ -79,7 +80,7 @@ var sampleConfig = `
## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST
gather_process_list = true
#
## gather user statistics from INFORMATION_SCHEMA.USER_STATISTICS
## gather thread state counts from INFORMATION_SCHEMA.USER_STATISTICS
gather_user_statistics = true
#
## gather auto_increment columns and max values from information schema
@@ -281,8 +282,9 @@ const (
GROUP BY command,state
ORDER BY null`
infoSchemaUserStatisticsQuery = `
SELECT *
FROM information_schema.user_statistics`
SELECT *,count(*)
FROM information_schema.user_statistics
GROUP BY user`
infoSchemaAutoIncQuery = `
SELECT table_schema, table_name, column_name, auto_increment,
CAST(pow(2, case data_type
@@ -759,6 +761,103 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
if len(fields) > 0 {
acc.AddFields("mysql", fields, tags)
}
// gather connection metrics from processlist for each user
if m.GatherProcessList {
conn_rows, err := db.Query("SELECT user, sum(1) FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user")
if err != nil {
log.Printf("E! MySQL Error gathering process list: %s", err)
} else {
for conn_rows.Next() {
var user string
var connections int64
err = conn_rows.Scan(&user, &connections)
if err != nil {
return err
}
tags := map[string]string{"server": servtag, "user": user}
fields := make(map[string]interface{})
if err != nil {
return err
}
fields["connections"] = connections
acc.AddFields("mysql_users", fields, tags)
}
}
}
// gather connection metrics from user_statistics for each user
if m.GatherUserStatistics {
conn_rows, err := db.Query("select user, total_connections, concurrent_connections, connected_time, busy_time, cpu_time, bytes_received, bytes_sent, binlog_bytes_written, rows_fetched, rows_updated, table_rows_read, select_commands, update_commands, other_commands, commit_transactions, rollback_transactions, denied_connections, lost_connections, access_denied, empty_queries, total_ssl_connections FROM INFORMATION_SCHEMA.USER_STATISTICS GROUP BY user")
if err != nil {
log.Printf("E! MySQL Error gathering user stats: %s", err)
} else {
for conn_rows.Next() {
var user string
var total_connections int64
var concurrent_connections int64
var connected_time int64
var busy_time int64
var cpu_time int64
var bytes_received int64
var bytes_sent int64
var binlog_bytes_written int64
var rows_fetched int64
var rows_updated int64
var table_rows_read int64
var select_commands int64
var update_commands int64
var other_commands int64
var commit_transactions int64
var rollback_transactions int64
var denied_connections int64
var lost_connections int64
var access_denied int64
var empty_queries int64
var total_ssl_connections int64
err = conn_rows.Scan(&user, &total_connections, &concurrent_connections,
&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
&empty_queries, &total_ssl_connections,
)
if err != nil {
return err
}
tags := map[string]string{"server": servtag, "user": user}
fields := map[string]interface{}{
"total_connections": total_connections,
"concurrent_connections": concurrent_connections,
"connected_time": connected_time,
"busy_time": busy_time,
"cpu_time": cpu_time,
"bytes_received": bytes_received,
"bytes_sent": bytes_sent,
"binlog_bytes_written": binlog_bytes_written,
"rows_fetched": rows_fetched,
"rows_updated": rows_updated,
"table_rows_read": table_rows_read,
"select_commands": select_commands,
"update_commands": update_commands,
"other_commands": other_commands,
"commit_transactions": commit_transactions,
"rollback_transactions": rollback_transactions,
"denied_connections": denied_connections,
"lost_connections": lost_connections,
"access_denied": access_denied,
"empty_queries": empty_queries,
"total_ssl_connections": total_ssl_connections,
}
acc.AddFields("mysql_user_stats", fields, tags)
}
}
}
return nil
}
@@ -809,29 +908,6 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
} else {
acc.AddFields("mysql_process_list", fields, tags)
}
// get count of connections from each user
conn_rows, err := db.Query("SELECT user, sum(1) AS connections FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user")
if err != nil {
return err
}
for conn_rows.Next() {
var user string
var connections int64
err = conn_rows.Scan(&user, &connections)
if err != nil {
return err
}
tags := map[string]string{"server": servtag, "user": user}
fields := make(map[string]interface{})
fields["connections"] = connections
acc.AddFields("mysql_users", fields, tags)
}
return nil
}
@@ -841,190 +917,77 @@ func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegr
// run query
rows, err := db.Query(infoSchemaUserStatisticsQuery)
if err != nil {
// disable collecting if table is not found (mysql specific error)
// (suppresses repeat errors)
if strings.Contains(err.Error(), "nknown table 'user_statistics'") {
m.GatherUserStatistics = false
}
return err
}
defer rows.Close()
cols, err := columnsToLower(rows.Columns())
if err != nil {
return err
}
read, err := getColSlice(len(cols))
if err != nil {
return err
}
var (
user string
total_connections int64
concurrent_connections int64
connected_time int64
busy_time int64
cpu_time int64
bytes_received int64
bytes_sent int64
binlog_bytes_written int64
rows_fetched int64
rows_updated int64
table_rows_read int64
select_commands int64
update_commands int64
other_commands int64
commit_transactions int64
rollback_transactions int64
denied_connections int64
lost_connections int64
access_denied int64
empty_queries int64
total_ssl_connections int64
count uint32
)
servtag := getDSNTag(serv)
for rows.Next() {
err = rows.Scan(read...)
err = rows.Scan(&user, &total_connections, &concurrent_connections,
&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
&empty_queries, &total_ssl_connections, &count,
)
if err != nil {
return err
}
tags := map[string]string{"server": servtag, "user": *read[0].(*string)}
fields := map[string]interface{}{}
tags := map[string]string{"server": servtag, "user": user}
fields := map[string]interface{}{
for i := range cols {
if i == 0 {
continue // skip "user"
}
switch v := read[i].(type) {
case *int64:
fields[cols[i]] = *v
case *float64:
fields[cols[i]] = *v
case *string:
fields[cols[i]] = *v
default:
return fmt.Errorf("Unknown column type - %T", v)
}
"total_connections": total_connections,
"concurrent_connections": concurrent_connections,
"connected_time": connected_time,
"busy_time": busy_time,
"cpu_time": cpu_time,
"bytes_received": bytes_received,
"bytes_sent": bytes_sent,
"binlog_bytes_written": binlog_bytes_written,
"rows_fetched": rows_fetched,
"rows_updated": rows_updated,
"table_rows_read": table_rows_read,
"select_commands": select_commands,
"update_commands": update_commands,
"other_commands": other_commands,
"commit_transactions": commit_transactions,
"rollback_transactions": rollback_transactions,
"denied_connections": denied_connections,
"lost_connections": lost_connections,
"access_denied": access_denied,
"empty_queries": empty_queries,
"total_ssl_connections": total_ssl_connections,
}
acc.AddFields("mysql_user_stats", fields, tags)
}
return nil
}
// columnsToLower converts selected column names to lowercase.
func columnsToLower(s []string, e error) ([]string, error) {
if e != nil {
return nil, e
}
d := make([]string, len(s))
for i := range s {
d[i] = strings.ToLower(s[i])
}
return d, nil
}
// getColSlice returns an in interface slice that can be used in the row.Scan().
func getColSlice(l int) ([]interface{}, error) {
// list of all possible column names
var (
user string
total_connections int64
concurrent_connections int64
connected_time int64
busy_time int64
cpu_time int64
bytes_received int64
bytes_sent int64
binlog_bytes_written int64
rows_read int64
rows_sent int64
rows_deleted int64
rows_inserted int64
rows_updated int64
select_commands int64
update_commands int64
other_commands int64
commit_transactions int64
rollback_transactions int64
denied_connections int64
lost_connections int64
access_denied int64
empty_queries int64
total_ssl_connections int64
max_statement_time_exceeded int64
// maria specific
fbusy_time float64
fcpu_time float64
// percona specific
rows_fetched int64
table_rows_read int64
)
switch l {
case 23: // maria5
return []interface{}{
&user,
&total_connections,
&concurrent_connections,
&connected_time,
&fbusy_time,
&fcpu_time,
&bytes_received,
&bytes_sent,
&binlog_bytes_written,
&rows_read,
&rows_sent,
&rows_deleted,
&rows_inserted,
&rows_updated,
&select_commands,
&update_commands,
&other_commands,
&commit_transactions,
&rollback_transactions,
&denied_connections,
&lost_connections,
&access_denied,
&empty_queries,
}, nil
case 25: // maria10
return []interface{}{
&user,
&total_connections,
&concurrent_connections,
&connected_time,
&fbusy_time,
&fcpu_time,
&bytes_received,
&bytes_sent,
&binlog_bytes_written,
&rows_read,
&rows_sent,
&rows_deleted,
&rows_inserted,
&rows_updated,
&select_commands,
&update_commands,
&other_commands,
&commit_transactions,
&rollback_transactions,
&denied_connections,
&lost_connections,
&access_denied,
&empty_queries,
&total_ssl_connections,
&max_statement_time_exceeded,
}, nil
case 22: // percona
return []interface{}{
&user,
&total_connections,
&concurrent_connections,
&connected_time,
&busy_time,
&cpu_time,
&bytes_received,
&bytes_sent,
&binlog_bytes_written,
&rows_fetched,
&rows_updated,
&table_rows_read,
&select_commands,
&update_commands,
&other_commands,
&commit_transactions,
&rollback_transactions,
&denied_connections,
&lost_connections,
&access_denied,
&empty_queries,
&total_ssl_connections,
}, nil
}
return nil, fmt.Errorf("Not Supported - %d columns", l)
}
// gatherPerfTableIOWaits can be used to get total count and time
// of I/O wait event for each table and process
func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {

View File

@@ -8,7 +8,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
nats "github.com/nats-io/go-nats"
"github.com/nats-io/nats"
)
type natsError struct {

View File

@@ -5,7 +5,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
nats "github.com/nats-io/go-nats"
"github.com/nats-io/nats"
"github.com/stretchr/testify/assert"
)

View File

@@ -14,7 +14,7 @@ To use this plugin you must enable the [monitoring](https://www.openldap.org/dev
# ldaps, starttls, or no encryption. default is an empty string, disabling all encryption.
# note that port will likely need to be changed to 636 for ldaps
# valid options: "" | "starttls" | "ldaps"
tls = ""
ssl = ""
# skip peer certificate verification. Default is false.
insecure_skip_verify = false

View File

@@ -15,11 +15,9 @@ import (
type Openldap struct {
Host string
Port int
SSL string `toml:"ssl"` // Deprecated in 1.7; use TLS
TLS string `toml:"tls"`
Ssl string
InsecureSkipVerify bool
SSLCA string `toml:"ssl_ca"` // Deprecated in 1.7; use TLSCA
TLSCA string `toml:"tls_ca"`
SslCa string
BindDn string
BindPassword string
ReverseMetricNames bool
@@ -32,7 +30,7 @@ const sampleConfig string = `
# ldaps, starttls, or no encryption. default is an empty string, disabling all encryption.
# note that port will likely need to be changed to 636 for ldaps
# valid options: "" | "starttls" | "ldaps"
tls = ""
ssl = ""
# skip peer certificate verification. Default is false.
insecure_skip_verify = false
@@ -72,11 +70,9 @@ func NewOpenldap() *Openldap {
return &Openldap{
Host: "localhost",
Port: 389,
SSL: "",
TLS: "",
Ssl: "",
InsecureSkipVerify: false,
SSLCA: "",
TLSCA: "",
SslCa: "",
BindDn: "",
BindPassword: "",
ReverseMetricNames: false,
@@ -85,19 +81,12 @@ func NewOpenldap() *Openldap {
// gather metrics
func (o *Openldap) Gather(acc telegraf.Accumulator) error {
if o.TLS == "" {
o.TLS = o.SSL
}
if o.TLSCA == "" {
o.TLSCA = o.SSLCA
}
var err error
var l *ldap.Conn
if o.TLS != "" {
if o.Ssl != "" {
// build tls config
clientTLSConfig := tls.ClientConfig{
TLSCA: o.TLSCA,
SSLCA: o.SslCa,
InsecureSkipVerify: o.InsecureSkipVerify,
}
tlsConfig, err := clientTLSConfig.TLSConfig()
@@ -105,13 +94,13 @@ func (o *Openldap) Gather(acc telegraf.Accumulator) error {
acc.AddError(err)
return nil
}
if o.TLS == "ldaps" {
if o.Ssl == "ldaps" {
l, err = ldap.DialTLS("tcp", fmt.Sprintf("%s:%d", o.Host, o.Port), tlsConfig)
if err != nil {
acc.AddError(err)
return nil
}
} else if o.TLS == "starttls" {
} else if o.Ssl == "starttls" {
l, err = ldap.Dial("tcp", fmt.Sprintf("%s:%d", o.Host, o.Port))
if err != nil {
acc.AddError(err)
@@ -119,7 +108,7 @@ func (o *Openldap) Gather(acc telegraf.Accumulator) error {
}
err = l.StartTLS(tlsConfig)
} else {
acc.AddError(fmt.Errorf("Invalid setting for ssl: %s", o.TLS))
acc.AddError(fmt.Errorf("Invalid setting for ssl: %s", o.Ssl))
return nil
}
} else {

View File

@@ -1,11 +1,10 @@
package openldap
import (
"gopkg.in/ldap.v2"
"strconv"
"testing"
"gopkg.in/ldap.v2"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -75,7 +74,7 @@ func TestOpenldapStartTLS(t *testing.T) {
o := &Openldap{
Host: testutil.GetLocalHost(),
Port: 389,
SSL: "starttls",
Ssl: "starttls",
InsecureSkipVerify: true,
}
@@ -93,7 +92,7 @@ func TestOpenldapLDAPS(t *testing.T) {
o := &Openldap{
Host: testutil.GetLocalHost(),
Port: 636,
SSL: "ldaps",
Ssl: "ldaps",
InsecureSkipVerify: true,
}
@@ -111,7 +110,7 @@ func TestOpenldapInvalidSSL(t *testing.T) {
o := &Openldap{
Host: testutil.GetLocalHost(),
Port: 636,
SSL: "invalid",
Ssl: "invalid",
InsecureSkipVerify: true,
}
@@ -130,7 +129,7 @@ func TestOpenldapBind(t *testing.T) {
o := &Openldap{
Host: testutil.GetLocalHost(),
Port: 389,
SSL: "",
Ssl: "",
InsecureSkipVerify: true,
BindDn: "cn=manager,cn=config",
BindPassword: "secret",
@@ -158,7 +157,7 @@ func TestOpenldapReverseMetrics(t *testing.T) {
o := &Openldap{
Host: testutil.GetLocalHost(),
Port: 389,
SSL: "",
Ssl: "",
InsecureSkipVerify: true,
BindDn: "cn=manager,cn=config",
BindPassword: "secret",

View File

@@ -175,7 +175,7 @@ func (p *Ping) args(url string) []string {
}
if p.Timeout > 0 {
switch runtime.GOOS {
case "darwin", "freebsd", "netbsd", "openbsd":
case "darwin":
args = append(args, "-W", strconv.FormatFloat(p.Timeout*1000, 'f', -1, 64))
case "linux":
args = append(args, "-W", strconv.FormatFloat(p.Timeout, 'f', -1, 64))
@@ -186,7 +186,7 @@ func (p *Ping) args(url string) []string {
}
if p.Deadline > 0 {
switch runtime.GOOS {
case "darwin", "freebsd", "netbsd", "openbsd":
case "darwin":
args = append(args, "-t", strconv.Itoa(p.Deadline))
case "linux":
args = append(args, "-w", strconv.Itoa(p.Deadline))
@@ -197,10 +197,10 @@ func (p *Ping) args(url string) []string {
}
if p.Interface != "" {
switch runtime.GOOS {
case "darwin", "freebsd", "netbsd", "openbsd":
args = append(args, "-S", p.Interface)
case "linux":
args = append(args, "-I", p.Interface)
case "freebsd", "darwin":
args = append(args, "-S", p.Interface)
default:
// Not sure the best option here, just assume GNU ping?
args = append(args, "-I", p.Interface)

View File

@@ -4,7 +4,7 @@ import (
"fmt"
"os"
"os/exec"
"path/filepath"
"path"
"strings"
"time"
@@ -28,37 +28,36 @@ func getQueueDirectory() (string, error) {
return strings.TrimSpace(string(qd)), nil
}
func qScan(path string, acc telegraf.Accumulator) (int64, int64, int64, error) {
func qScan(path string) (int64, int64, int64, error) {
f, err := os.Open(path)
if err != nil {
return 0, 0, 0, err
}
finfos, err := f.Readdir(-1)
f.Close()
if err != nil {
return 0, 0, 0, err
}
var length, size int64
var oldest time.Time
err := filepath.Walk(path, func(_ string, finfo os.FileInfo, err error) error {
if err != nil {
acc.AddError(fmt.Errorf("error scanning %s: %s", path, err))
return nil
}
if finfo.IsDir() {
return nil
}
for _, finfo := range finfos {
length++
size += finfo.Size()
ctime := statCTime(finfo.Sys())
if ctime.IsZero() {
return nil
continue
}
if oldest.IsZero() || ctime.Before(oldest) {
oldest = ctime
}
return nil
})
if err != nil {
return 0, 0, 0, err
}
var age int64
if !oldest.IsZero() {
age = int64(time.Now().Sub(oldest) / time.Second)
} else if length != 0 {
} else if len(finfos) != 0 {
// system doesn't support ctime
age = -1
}
@@ -78,8 +77,8 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
}
}
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} {
length, size, age, err := qScan(filepath.Join(p.QueueDirectory, q), acc)
for _, q := range []string{"active", "hold", "incoming", "maildrop"} {
length, size, age, err := qScan(path.Join(p.QueueDirectory, q))
if err != nil {
acc.AddError(fmt.Errorf("error scanning queue %s: %s", q, err))
continue
@@ -91,6 +90,30 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
acc.AddFields("postfix_queue", fields, map[string]string{"queue": q})
}
var dLength, dSize int64
dAge := int64(-1)
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F"} {
length, size, age, err := qScan(path.Join(p.QueueDirectory, "deferred", q))
if err != nil {
if os.IsNotExist(err) {
// the directories are created on first use
continue
}
acc.AddError(fmt.Errorf("error scanning queue deferred/%s: %s", q, err))
return nil
}
dLength += length
dSize += size
if age > dAge {
dAge = age
}
}
fields := map[string]interface{}{"length": dLength, "size": dSize}
if dAge != -1 {
fields["age"] = dAge
}
acc.AddFields("postfix_queue", fields, map[string]string{"queue": "deferred"})
return nil
}

View File

@@ -3,7 +3,7 @@ package postfix
import (
"io/ioutil"
"os"
"path/filepath"
"path"
"testing"
"github.com/influxdata/telegraf/testutil"
@@ -16,16 +16,19 @@ func TestGather(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(td)
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred/0/0", "deferred/F/F"} {
require.NoError(t, os.MkdirAll(filepath.FromSlash(td+"/"+q), 0755))
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} {
require.NoError(t, os.Mkdir(path.Join(td, q), 0755))
}
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "F"} { // "E" deliberately left off
require.NoError(t, os.Mkdir(path.Join(td, "deferred", q), 0755))
}
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/02"), []byte("defg"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/hold/01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/incoming/01"), []byte("abcd"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/0/0/01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/F/F/F1"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "02"), []byte("defg"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "hold", "01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "incoming", "01"), []byte("abcd"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "0", "01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "F", "F1"), []byte("abc"), 0644))
p := Postfix{
QueueDirectory: td,

View File

@@ -14,6 +14,13 @@
## If no servers are specified, then localhost is used as the host.
## If no port is specified, 6379 is used
servers = ["tcp://localhost:6379"]
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = true
```
### Measurements & Fields:

View File

@@ -13,11 +13,13 @@ import (
"github.com/go-redis/redis"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
type Redis struct {
Servers []string
tls.ClientConfig
clients []Client
initialized bool
@@ -56,6 +58,13 @@ var sampleConfig = `
## If no servers are specified, then localhost is used as the host.
## If no port is specified, 6379 is used
servers = ["tcp://localhost:6379"]
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = true
`
func (r *Redis) SampleConfig() string {
@@ -109,12 +118,18 @@ func (r *Redis) init(acc telegraf.Accumulator) error {
address = u.Host
}
tlsConfig, err := r.ClientConfig.TLSConfig()
if err != nil {
return err
}
client := redis.NewClient(
&redis.Options{
Addr: address,
Password: password,
Network: u.Scheme,
PoolSize: 1,
Addr: address,
Password: password,
Network: u.Scheme,
PoolSize: 1,
TLSConfig: tlsConfig,
},
)

View File

@@ -1,30 +0,0 @@
# Swap Input Plugin
The swap plugin collects system swap metrics.
For a more information on what swap memory is, read [All about Linux swap space](https://www.linux.com/news/all-about-linux-swap-space).
### Configuration:
```toml
# Read metrics about swap memory usage
[[inputs.swap]]
# no configuration
```
### Metrics:
- swap
- fields:
- free (int)
- total (int)
- used (int)
- used_percent (float)
- in (int)
- out (int)
### Example Output:
```
swap total=20855394304i,used_percent=45.43883523785713,used=9476448256i,free=1715331072i 1511894782000000000
```

View File

@@ -42,9 +42,45 @@ func (s *MemStats) Gather(acc telegraf.Accumulator) error {
return nil
}
type SwapStats struct {
ps PS
}
func (_ *SwapStats) Description() string {
return "Read metrics about swap memory usage"
}
func (_ *SwapStats) SampleConfig() string { return "" }
func (s *SwapStats) Gather(acc telegraf.Accumulator) error {
swap, err := s.ps.SwapStat()
if err != nil {
return fmt.Errorf("error getting swap memory info: %s", err)
}
fieldsG := map[string]interface{}{
"total": swap.Total,
"used": swap.Used,
"free": swap.Free,
"used_percent": swap.UsedPercent,
}
fieldsC := map[string]interface{}{
"in": swap.Sin,
"out": swap.Sout,
}
acc.AddGauge("swap", fieldsG, nil)
acc.AddCounter("swap", fieldsC, nil)
return nil
}
func init() {
ps := newSystemPS()
inputs.Add("mem", func() telegraf.Input {
return &MemStats{ps: ps}
})
inputs.Add("swap", func() telegraf.Input {
return &SwapStats{ps: ps}
})
}

View File

@@ -30,6 +30,17 @@ func TestMemStats(t *testing.T) {
mps.On("VMStat").Return(vms, nil)
sms := &mem.SwapMemoryStat{
Total: 8123,
Used: 1232,
Free: 6412,
UsedPercent: 12.2,
Sin: 7,
Sout: 830,
}
mps.On("SwapStat").Return(sms, nil)
err = (&MemStats{&mps}).Gather(&acc)
require.NoError(t, err)
@@ -50,4 +61,15 @@ func TestMemStats(t *testing.T) {
acc.AssertContainsTaggedFields(t, "mem", memfields, make(map[string]string))
acc.Metrics = nil
err = (&SwapStats{&mps}).Gather(&acc)
require.NoError(t, err)
swapfields := map[string]interface{}{
"total": uint64(8123),
"used": uint64(1232),
"used_percent": float64(12.2),
"free": uint64(6412),
}
acc.AssertContainsTaggedFields(t, "swap", swapfields, make(map[string]string))
}

View File

@@ -1,47 +0,0 @@
package system
import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
type SwapStats struct {
ps PS
}
func (_ *SwapStats) Description() string {
return "Read metrics about swap memory usage"
}
func (_ *SwapStats) SampleConfig() string { return "" }
func (s *SwapStats) Gather(acc telegraf.Accumulator) error {
swap, err := s.ps.SwapStat()
if err != nil {
return fmt.Errorf("error getting swap memory info: %s", err)
}
fieldsG := map[string]interface{}{
"total": swap.Total,
"used": swap.Used,
"free": swap.Free,
"used_percent": swap.UsedPercent,
}
fieldsC := map[string]interface{}{
"in": swap.Sin,
"out": swap.Sout,
}
acc.AddGauge("swap", fieldsG, nil)
acc.AddCounter("swap", fieldsC, nil)
return nil
}
func init() {
ps := newSystemPS()
inputs.Add("swap", func() telegraf.Input {
return &SwapStats{ps: ps}
})
}

View File

@@ -1,38 +0,0 @@
package system
import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/shirou/gopsutil/mem"
"github.com/stretchr/testify/require"
)
func TestSwapStats(t *testing.T) {
var mps MockPS
var err error
defer mps.AssertExpectations(t)
var acc testutil.Accumulator
sms := &mem.SwapMemoryStat{
Total: 8123,
Used: 1232,
Free: 6412,
UsedPercent: 12.2,
Sin: 7,
Sout: 830,
}
mps.On("SwapStat").Return(sms, nil)
err = (&SwapStats{&mps}).Gather(&acc)
require.NoError(t, err)
swapfields := map[string]interface{}{
"total": uint64(8123),
"used": uint64(1232),
"used_percent": float64(12.2),
"free": uint64(6412),
}
acc.AssertContainsTaggedFields(t, "swap", swapfields, make(map[string]string))
}

View File

@@ -1,13 +1,16 @@
# win_perf_counters readme
This document presents the input plugin to read Performance Counters on Windows operating systems.
Input plugin to read Performance Counters on Windows operating systems.
The configuration is parsed and then tested for validity, such as
Configuration is parsed and then tested for validity such as
whether the Object, Instance and Counter exist on Telegraf startup.
Counter paths are refreshed periodically, see the [CountersRefreshInterval](#countersrefreshinterval)
Counter paths are refreshed periodically, see [CountersRefreshInterval](#countersrefreshinterval)
configuration parameter for more info.
Wildcards can be used in instance and counter names. Partial wildcards are supported only
in instance names on Windows Vista and newer.
In case of query for all instances `["*"]`, the plugin does not return the instance `_Total`
by default. See [IncludeTotal](#includetotal) for more info.
@@ -16,7 +19,7 @@ by default. See [IncludeTotal](#includetotal) for more info.
The examples contained in this file have been found on the internet
as counters used when performance monitoring
Active Directory and IIS in particular.
There are a lot of other good objects to monitor, if you know what to look for.
There are a lot other good objects to monitor, if you know what to look for.
This file is likely to be updated in the future with more examples for
useful configurations for separate scenarios.
@@ -31,41 +34,23 @@ Bool, if set to `true` will print out all matching performance objects.
Example:
`PrintValid=true`
#### UseWildcardsExpansion
If `UseWildcardsExpansion` is set to true, wildcards can be used in the
instance name and the counter name. When using localized Windows, counters
will be also be localized. Instance indexes will also be returned in the
instance name.
Partial wildcards (e.g. `chrome*`) are supported only in the instance name on Windows Vista and newer.
If disabled, wildcards (not partial) in instance names can still be used, but
instance indexes will not be returned in the instance names.
Example:
`UseWildcardsExpansion=true`
#### CountersRefreshInterval
Configured counters are matched against available counters at the interval
specified by the `CountersRefreshInterval` parameter. The default value is `1m` (1 minute).
specified by the `CountersRefreshInterval` parameter. Default value is `1m` (1 minute).
If wildcards are used in instance or counter names, they are expanded at this point, if the `UseWildcardsExpansion` param is set to `true`.
If wildcards are used in instance or counter names, they are expanded at this point.
Setting the `CountersRefreshInterval` too low (order of seconds) can cause Telegraf to create
Setting `CountersRefreshInterval` too low (order of seconds) can cause Telegraf to create
a high CPU load.
Set it to `0s` to disable periodic refreshing.
Example:
`CountersRefreshInterval=1m`
Set to `0s` to disable periodic refreshing.
#### PreVistaSupport
_Deprecated. Necessary features on Windows Vista and newer are checked dynamically_
Bool, if set to `true`, the plugin will use the localized PerfCounter interface that has been present since before Vista for backwards compatability.
Bool, if set to `true` will use the localized PerfCounter interface that has been present since before Vista for backwards compatability.
It is recommended NOT to use this on OSes starting with Vista and newer because it requires more configuration to use this than the newer interface present since Vista.
@@ -77,12 +62,12 @@ Example for Windows Server 2003, this would be set to true:
See Entry below.
### Entry
A new configuration entry consists of the TOML header starting with,
A new configuration entry consists of the TOML header to start with,
`[[inputs.win_perf_counters.object]]`.
This must follow before other plugin configurations,
beneath the main win_perf_counters entry, `[[inputs.win_perf_counters]]`.
Following this are 3 required key/value pairs and three optional parameters and their usage.
Following this are 3 required key/value pairs and the three optional parameters and their usage.
#### ObjectName
**Required**
@@ -94,18 +79,16 @@ Example: `ObjectName = "LogicalDisk"`
#### Instances
**Required**
The instances key (this is an array) declares the instances of a counter you would like returned,
Instances key (this is an array) is the instances of a counter you would like returned,
it can be one or more values.
Example: `Instances = ["C:","D:","E:"]`
This will return only for the instances
Example, `Instances = ["C:","D:","E:"]` will return only for the instances
C:, D: and E: where relevant. To get all instances of a Counter, use `["*"]` only.
By default any results containing `_Total` are stripped,
unless this is specified as the wanted instance.
Alternatively see the option `IncludeTotal` below.
It is also possible to set partial wildcards, eg. `["chrome*"]`, if the `UseWildcardsExpansion` param is set to `true`
It is also possible to set partial wildcards, eg. `["chrome*"]`
Some Objects do not have instances to select from at all.
Here only one option is valid if you want data back,
@@ -114,43 +97,41 @@ and that is to specify `Instances = ["------"]`.
#### Counters
**Required**
The Counters key (this is an array) declares the counters of the ObjectName
Counters key (this is an array) is the counters of the ObjectName
you would like returned, it can also be one or more values.
Example: `Counters = ["% Idle Time", "% Disk Read Time", "% Disk Write Time"]`
This must be specified for every counter you want the results of, or use
`["*"]` for all the counters of the object, if the `UseWildcardsExpansion` param
is set to `true`.
This must be specified for every counter you want the results of,
or use `["*"]` for all the counters for object.
#### Measurement
*Optional*
This key is optional. If it is not set it will be `win_perf_counters`.
In InfluxDB this is the key underneath which the returned data is stored.
So for ordering your data in a good manner,
This key is optional, if it is not set it will be `win_perf_counters`.
In InfluxDB this is the key by which the returned data is stored underneath,
so for ordering your data in a good manner,
this is a good key to set with a value when you want your IIS and Disk results stored
separately from Processor results.
Example: `Measurement = "win_disk"``
Example: `Measurement = "win_disk"
#### IncludeTotal
*Optional*
This key is optional. It is a simple bool.
This key is optional, it is a simple bool.
If it is not set to true or included it is treated as false.
This key only has effect if the Instances key is set to `["*"]`
and you would also like all instances containing `_Total` to be returned,
This key only has an effect if the Instances key is set to `["*"]`
and you would also like all instances containing `_Total` returned,
like `_Total`, `0,_Total` and so on where applicable
(Processor Information is one example).
#### WarnOnMissing
*Optional*
This key is optional. It is a simple bool.
This key is optional, it is a simple bool.
If it is not set to true or included it is treated as false.
This only has effect on the first execution of the plugin.
It will print out any ObjectName/Instance/Counter combinations
This only has an effect on the first execution of the plugin,
it will print out any ObjectName/Instance/Counter combinations
asked for that do not match. Useful when debugging new configurations.
#### FailOnMissing

View File

@@ -352,7 +352,7 @@ func PdhGetFormattedCounterValueDouble(hCounter PDH_HCOUNTER, lpdwType *uint32,
// time.Sleep(2000 * time.Millisecond)
// }
// }
func PdhGetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER, lpdwBufferSize *uint32, lpdwBufferCount *uint32, itemBuffer *byte) uint32 {
func PdhGetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER, lpdwBufferSize *uint32, lpdwBufferCount *uint32, itemBuffer *PDH_FMT_COUNTERVALUE_ITEM_DOUBLE) uint32 {
ret, _, _ := pdh_GetFormattedCounterArrayW.Call(
uintptr(hCounter),
uintptr(PDH_FMT_DOUBLE|PDH_FMT_NOCAP100),

View File

@@ -9,12 +9,6 @@ import (
"unsafe"
)
//PerformanceQuery is abstraction for PDH_FMT_COUNTERVALUE_ITEM_DOUBLE
type CounterValue struct {
InstanceName string
Value float64
}
//PerformanceQuery provides wrappers around Windows performance counters API for easy usage in GO
type PerformanceQuery interface {
Open() error
@@ -24,7 +18,6 @@ type PerformanceQuery interface {
GetCounterPath(counterHandle PDH_HCOUNTER) (string, error)
ExpandWildCardPath(counterPath string) ([]string, error)
GetFormattedCounterValueDouble(hCounter PDH_HCOUNTER) (float64, error)
GetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER) ([]CounterValue, error)
CollectData() error
AddEnglishCounterSupported() bool
}
@@ -158,28 +151,6 @@ func (m *PerformanceQueryImpl) GetFormattedCounterValueDouble(hCounter PDH_HCOUN
}
}
func (m *PerformanceQueryImpl) GetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER) ([]CounterValue, error) {
var buffSize uint32
var itemCount uint32
ret := PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, nil)
if ret == PDH_MORE_DATA {
buff := make([]byte, buffSize)
ret = PdhGetFormattedCounterArrayDouble(hCounter, &buffSize, &itemCount, &buff[0])
if ret == ERROR_SUCCESS {
items := (*[1 << 20]PDH_FMT_COUNTERVALUE_ITEM_DOUBLE)(unsafe.Pointer(&buff[0]))[:itemCount]
values := make([]CounterValue, 0, itemCount)
for _, item := range items {
if item.FmtValue.CStatus == PDH_CSTATUS_VALID_DATA || item.FmtValue.CStatus == PDH_CSTATUS_NEW_DATA {
val := CounterValue{UTF16PtrToString(item.SzName), item.FmtValue.DoubleValue}
values = append(values, val)
}
}
return values, nil
}
}
return nil, NewPdhError(ret)
}
func (m *PerformanceQueryImpl) CollectData() error {
if m.query == 0 {
return errors.New("uninitialised query")
@@ -210,7 +181,7 @@ func UTF16ToStringArray(buf []uint16) []string {
stringLine := UTF16PtrToString(&buf[0])
for stringLine != "" {
strings = append(strings, stringLine)
nextLineStart += len([]rune(stringLine)) + 1
nextLineStart += len(stringLine) + 1
remainingBuf := buf[nextLineStart:]
stringLine = UTF16PtrToString(&remainingBuf[0])
}

View File

@@ -5,14 +5,13 @@ package win_perf_counters
import (
"errors"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"log"
"regexp"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)
var sampleConfig = `
@@ -23,10 +22,6 @@ var sampleConfig = `
## agent, it will not be gathered.
## Settings:
# PrintValid = false # Print All matching performance counters
# If UseWildcardsExpansion params is set to true, wildcards (partial wildcards in instance names and wildcards in counters names) in configured counter paths will be expanded
# and in case of localized Windows, counter paths will be also localized. It also returns instance indexes in instance names.
# If false, wildcards (not partial) in instance names will still be expanded, but instance indexes will not be returned in instance names.
#UseWildcardsExpansion = false
# Period after which counters will be reread from configuration and wildcards in counter paths expanded
CountersRefreshInterval="1m"
@@ -80,7 +75,6 @@ type Win_PerfCounters struct {
PreVistaSupport bool
Object []perfobject
CountersRefreshInterval internal.Duration
UseWildcardsExpansion bool
lastRefreshed time.Time
counters []*counter
@@ -143,59 +137,45 @@ func (m *Win_PerfCounters) SampleConfig() string {
return sampleConfig
}
//objectName string, counter string, instance string, measurement string, include_total bool
func (m *Win_PerfCounters) AddItem(counterPath string, objectName string, instance string, counterName string, measurement string, includeTotal bool) error {
var err error
var counterHandle PDH_HCOUNTER
func (m *Win_PerfCounters) AddItem(counterPath string, instance string, measurement string, includeTotal bool) error {
if !m.query.AddEnglishCounterSupported() {
counterHandle, err = m.query.AddCounterToQuery(counterPath)
_, err := m.query.AddCounterToQuery(counterPath)
if err != nil {
return err
}
} else {
counterHandle, err = m.query.AddEnglishCounterToQuery(counterPath)
counterHandle, err := m.query.AddEnglishCounterToQuery(counterPath)
if err != nil {
return err
}
}
if m.UseWildcardsExpansion {
origInstance := instance
counterPath, err = m.query.GetCounterPath(counterHandle)
if err != nil {
return err
}
counters, err := m.query.ExpandWildCardPath(counterPath)
}
counters, err := m.query.ExpandWildCardPath(counterPath)
if err != nil {
return err
}
for _, counterPath := range counters {
var err error
counterHandle, err := m.query.AddCounterToQuery(counterPath)
parsedObjectName, parsedInstance, parsedCounter, err := extractObjectInstanceCounterFromQuery(counterPath)
if err != nil {
return err
}
for _, counterPath := range counters {
var err error
counterHandle, err := m.query.AddCounterToQuery(counterPath)
objectName, instance, counterName, err = extractObjectInstanceCounterFromQuery(counterPath)
if err != nil {
return err
}
if instance == "_Total" && origInstance == "*" && !includeTotal {
continue
}
newItem := &counter{counterPath, objectName, counterName, instance, measurement,
includeTotal, counterHandle}
m.counters = append(m.counters, newItem)
if m.PrintValid {
log.Printf("Valid: %s\n", counterPath)
}
if parsedInstance == "_Total" && instance == "*" && !includeTotal {
continue
}
} else {
newItem := &counter{counterPath, objectName, counterName, instance, measurement,
newItem := &counter{counterPath, parsedObjectName, parsedCounter, parsedInstance, measurement,
includeTotal, counterHandle}
m.counters = append(m.counters, newItem)
if m.PrintValid {
log.Printf("Valid: %s\n", counterPath)
}
@@ -219,7 +199,7 @@ func (m *Win_PerfCounters) ParseConfig() error {
counterPath = "\\" + objectname + "(" + instance + ")\\" + counter
}
err := m.AddItem(counterPath, objectname, instance, counter, PerfObject.Measurement, PerfObject.IncludeTotal)
err := m.AddItem(counterPath, instance, PerfObject.Measurement, PerfObject.IncludeTotal)
if err != nil {
if PerfObject.FailOnMissing || PerfObject.WarnOnMissing {
@@ -245,9 +225,7 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
var err error
if m.lastRefreshed.IsZero() || (m.CountersRefreshInterval.Duration.Nanoseconds() > 0 && m.lastRefreshed.Add(m.CountersRefreshInterval.Duration).Before(time.Now())) {
if m.counters != nil {
m.counters = m.counters[:0]
}
m.counters = m.counters[:0]
err = m.query.Open()
if err != nil {
@@ -283,61 +261,22 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
// For iterate over the known metrics and get the samples.
for _, metric := range m.counters {
// collect
if m.UseWildcardsExpansion {
value, err := m.query.GetFormattedCounterValueDouble(metric.counterHandle)
if err == nil {
measurement := sanitizedChars.Replace(metric.measurement)
if measurement == "" {
measurement = "win_perf_counters"
}
var instance = InstanceGrouping{measurement, metric.instance, metric.objectName}
if collectFields[instance] == nil {
collectFields[instance] = make(map[string]interface{})
}
collectFields[instance][sanitizedChars.Replace(metric.counter)] = float32(value)
} else {
//ignore invalid data from as some counters from process instances returns this sometimes
if phderr, ok := err.(*PdhError); ok && phderr.ErrorCode != PDH_INVALID_DATA && phderr.ErrorCode != PDH_CALC_NEGATIVE_VALUE {
return fmt.Errorf("error while getting value for counter %s: %v", metric.counterPath, err)
}
value, err := m.query.GetFormattedCounterValueDouble(metric.counterHandle)
if err == nil {
measurement := sanitizedChars.Replace(metric.measurement)
if measurement == "" {
measurement = "win_perf_counters"
}
var instance = InstanceGrouping{measurement, metric.instance, metric.objectName}
if collectFields[instance] == nil {
collectFields[instance] = make(map[string]interface{})
}
collectFields[instance][sanitizedChars.Replace(metric.counter)] = float32(value)
} else {
counterValues, err := m.query.GetFormattedCounterArrayDouble(metric.counterHandle)
if err == nil {
for _, cValue := range counterValues {
var add bool
if metric.includeTotal {
// If IncludeTotal is set, include all.
add = true
} else if metric.instance == "*" && !strings.Contains(cValue.InstanceName, "_Total") {
// Catch if set to * and that it is not a '*_Total*' instance.
add = true
} else if metric.instance == cValue.InstanceName {
// Catch if we set it to total or some form of it
add = true
} else if strings.Contains(metric.instance, "#") && strings.HasPrefix(metric.instance, cValue.InstanceName) {
// If you are using a multiple instance identifier such as "w3wp#1"
// phd.dll returns only the first 2 characters of the identifier.
add = true
cValue.InstanceName = metric.instance
} else if metric.instance == "------" {
add = true
}
if add {
measurement := sanitizedChars.Replace(metric.measurement)
if measurement == "" {
measurement = "win_perf_counters"
}
var instance = InstanceGrouping{measurement, cValue.InstanceName, metric.objectName}
if collectFields[instance] == nil {
collectFields[instance] = make(map[string]interface{})
}
collectFields[instance][sanitizedChars.Replace(metric.counter)] = float32(cValue.Value)
}
}
//ignore invalid data from as some counters from process instances returns this sometimes
if phderr, ok := err.(*PdhError); ok && phderr.ErrorCode != PDH_INVALID_DATA && phderr.ErrorCode != PDH_CALC_NEGATIVE_VALUE {
return fmt.Errorf("error while getting value for counter %s: %v", metric.counterPath, err)
}
}
}

View File

@@ -81,29 +81,6 @@ func TestWinPerformanceQueryImpl(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, paths)
assert.True(t, len(paths) > 1)
err = query.Open()
require.NoError(t, err)
counterPath = "\\Process(*)\\% Processor Time"
hCounter, err = query.AddEnglishCounterToQuery(counterPath)
require.NoError(t, err)
assert.NotEqual(t, 0, hCounter)
err = query.CollectData()
require.NoError(t, err)
time.Sleep(time.Second)
err = query.CollectData()
require.NoError(t, err)
arr, err := query.GetFormattedCounterArrayDouble(hCounter)
require.NoError(t, err)
assert.True(t, len(arr) > 0, "Too")
err = query.Close()
require.NoError(t, err)
}
func TestWinPerfcountersConfigGet1(t *testing.T) {
@@ -596,7 +573,7 @@ func TestWinPerfcountersCollect2(t *testing.T) {
perfobjects[0] = PerfObject
m := Win_PerfCounters{PrintValid: false, Object: perfobjects, query: &PerformanceQueryImpl{}, UseWildcardsExpansion: true}
m := Win_PerfCounters{PrintValid: false, Object: perfobjects, query: &PerformanceQueryImpl{}}
var acc testutil.Accumulator
err := m.Gather(&acc)
require.NoError(t, err)

View File

@@ -25,14 +25,6 @@ type FakePerformanceQuery struct {
openCalled bool
}
func (m *testCounter) ToCounterValue() *CounterValue {
_, inst, _, _ := extractObjectInstanceCounterFromQuery(m.path)
if inst == "" {
inst = "--"
}
return &CounterValue{inst, m.value}
}
func (m *FakePerformanceQuery) Open() error {
if m.openCalled {
err := m.Close()
@@ -110,48 +102,6 @@ func (m *FakePerformanceQuery) GetFormattedCounterValueDouble(counterHandle PDH_
}
return 0, fmt.Errorf("GetFormattedCounterValueDouble: invalid handle: %d", counterHandle)
}
func (m *FakePerformanceQuery) findCounterByPath(counterPath string) *testCounter {
for _, c := range m.counters {
if c.path == counterPath {
return &c
}
}
return nil
}
func (m *FakePerformanceQuery) findCounterByHandle(counterHandle PDH_HCOUNTER) *testCounter {
for _, c := range m.counters {
if c.handle == counterHandle {
return &c
}
}
return nil
}
func (m *FakePerformanceQuery) GetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER) ([]CounterValue, error) {
if !m.openCalled {
return nil, errors.New("GetFormattedCounterArrayDouble: uninitialised query")
}
for _, c := range m.counters {
if c.handle == hCounter {
if e, ok := m.expandPaths[c.path]; ok {
counters := make([]CounterValue, 0, len(e))
for _, p := range e {
counter := m.findCounterByPath(p)
if counter != nil && counter.value > 0 {
counters = append(counters, *counter.ToCounterValue())
} else {
return nil, fmt.Errorf("GetFormattedCounterArrayDouble: invalid counter : %s", p)
}
}
return counters, nil
} else {
return nil, fmt.Errorf("GetFormattedCounterArrayDouble: invalid counter : %d", hCounter)
}
}
}
return nil, fmt.Errorf("GetFormattedCounterArrayDouble: invalid counter : %d, no paths found", hCounter)
}
func (m *FakePerformanceQuery) CollectData() error {
if !m.openCalled {
@@ -202,7 +152,7 @@ func TestAddItemSimple(t *testing.T) {
}}
err = m.query.Open()
require.NoError(t, err)
err = m.AddItem(cps1[0], "O", "I", "c", "test", false)
err = m.AddItem(cps1[0], "I", "test", false)
require.NoError(t, err)
err = m.query.Close()
require.NoError(t, err)
@@ -211,7 +161,7 @@ func TestAddItemSimple(t *testing.T) {
func TestAddItemInvalidCountPath(t *testing.T) {
var err error
cps1 := []string{"\\O\\C"}
m := Win_PerfCounters{PrintValid: false, Object: nil, UseWildcardsExpansion: true, query: &FakePerformanceQuery{
m := Win_PerfCounters{PrintValid: false, Object: nil, query: &FakePerformanceQuery{
counters: createCounterMap(cps1, []float64{1.1}),
expandPaths: map[string][]string{
cps1[0]: {"\\O/C"},
@@ -220,7 +170,7 @@ func TestAddItemInvalidCountPath(t *testing.T) {
}}
err = m.query.Open()
require.NoError(t, err)
err = m.AddItem("\\O\\C", "O", "------", "C", "test", false)
err = m.AddItem("\\O\\C", "*", "test", false)
require.Error(t, err)
err = m.query.Close()
require.NoError(t, err)
@@ -247,24 +197,13 @@ func TestParseConfigBasic(t *testing.T) {
assert.Len(t, m.counters, 4)
err = m.query.Close()
require.NoError(t, err)
m.UseWildcardsExpansion = true
m.counters = nil
err = m.query.Open()
require.NoError(t, err)
err = m.ParseConfig()
require.NoError(t, err)
assert.Len(t, m.counters, 4)
err = m.query.Close()
require.NoError(t, err)
}
func TestParseConfigNoInstance(t *testing.T) {
var err error
perfObjects := createPerfObject("m", "O", []string{"------"}, []string{"C1", "C2"}, false, false)
cps1 := []string{"\\O\\C1", "\\O\\C2"}
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, UseWildcardsExpansion: false, query: &FakePerformanceQuery{
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, query: &FakePerformanceQuery{
counters: createCounterMap(cps1, []float64{1.1, 1.2}),
expandPaths: map[string][]string{
cps1[0]: {cps1[0]},
@@ -279,17 +218,6 @@ func TestParseConfigNoInstance(t *testing.T) {
assert.Len(t, m.counters, 2)
err = m.query.Close()
require.NoError(t, err)
m.UseWildcardsExpansion = true
m.counters = nil
err = m.query.Open()
require.NoError(t, err)
err = m.ParseConfig()
require.NoError(t, err)
assert.Len(t, m.counters, 2)
err = m.query.Close()
require.NoError(t, err)
}
func TestParseConfigInvalidCounterError(t *testing.T) {
@@ -311,16 +239,6 @@ func TestParseConfigInvalidCounterError(t *testing.T) {
require.Error(t, err)
err = m.query.Close()
require.NoError(t, err)
m.UseWildcardsExpansion = true
m.counters = nil
err = m.query.Open()
require.NoError(t, err)
err = m.ParseConfig()
require.Error(t, err)
err = m.query.Close()
require.NoError(t, err)
}
func TestParseConfigInvalidCounterNoError(t *testing.T) {
@@ -342,24 +260,13 @@ func TestParseConfigInvalidCounterNoError(t *testing.T) {
require.NoError(t, err)
err = m.query.Close()
require.NoError(t, err)
m.UseWildcardsExpansion = true
m.counters = nil
err = m.query.Open()
require.NoError(t, err)
err = m.ParseConfig()
require.NoError(t, err)
err = m.query.Close()
require.NoError(t, err)
}
func TestParseConfigTotalExpansion(t *testing.T) {
func TestParseConfigTotal(t *testing.T) {
var err error
perfObjects := createPerfObject("m", "O", []string{"*"}, []string{"*"}, true, true)
cps1 := []string{"\\O(I1)\\C1", "\\O(I1)\\C2", "\\O(_Total)\\C1", "\\O(_Total)\\C2"}
m := Win_PerfCounters{PrintValid: false, UseWildcardsExpansion: true, Object: perfObjects, query: &FakePerformanceQuery{
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, query: &FakePerformanceQuery{
counters: createCounterMap(append(cps1, "\\O(*)\\*"), []float64{1.1, 1.2, 1.3, 1.4, 0}),
expandPaths: map[string][]string{
"\\O(*)\\*": cps1,
@@ -376,7 +283,7 @@ func TestParseConfigTotalExpansion(t *testing.T) {
perfObjects[0].IncludeTotal = false
m = Win_PerfCounters{PrintValid: false, UseWildcardsExpansion: true, Object: perfObjects, query: &FakePerformanceQuery{
m = Win_PerfCounters{PrintValid: false, Object: perfObjects, query: &FakePerformanceQuery{
counters: createCounterMap(append(cps1, "\\O(*)\\*"), []float64{1.1, 1.2, 1.3, 1.4, 0}),
expandPaths: map[string][]string{
"\\O(*)\\*": cps1,
@@ -396,7 +303,7 @@ func TestParseConfigExpand(t *testing.T) {
var err error
perfObjects := createPerfObject("m", "O", []string{"*"}, []string{"*"}, false, false)
cps1 := []string{"\\O(I1)\\C1", "\\O(I1)\\C2", "\\O(I2)\\C1", "\\O(I2)\\C2"}
m := Win_PerfCounters{PrintValid: false, UseWildcardsExpansion: true, Object: perfObjects, query: &FakePerformanceQuery{
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, query: &FakePerformanceQuery{
counters: createCounterMap(append(cps1, "\\O(*)\\*"), []float64{1.1, 1.2, 1.3, 1.4, 0}),
expandPaths: map[string][]string{
"\\O(*)\\*": cps1,
@@ -439,17 +346,6 @@ func TestSimpleGather(t *testing.T) {
"objectname": "O",
}
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
m.UseWildcardsExpansion = true
m.counters = nil
m.lastRefreshed = time.Time{}
var acc2 testutil.Accumulator
err = m.Gather(&acc2)
require.NoError(t, err)
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
}
func TestGatherInvalidDataIgnore(t *testing.T) {
@@ -481,25 +377,15 @@ func TestGatherInvalidDataIgnore(t *testing.T) {
"objectname": "O",
}
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
m.UseWildcardsExpansion = true
m.counters = nil
m.lastRefreshed = time.Time{}
var acc2 testutil.Accumulator
err = m.Gather(&acc2)
require.NoError(t, err)
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
}
//tests with expansion
func TestGatherRefreshingWithExpansion(t *testing.T) {
func TestGatherRefreshing(t *testing.T) {
var err error
if testing.Short() {
t.Skip("Skipping long taking test in short mode")
}
measurement := "test"
perfObjects := createPerfObject(measurement, "O", []string{"*"}, []string{"*"}, true, false)
perfObjects := createPerfObject(measurement, "O", []string{"*"}, []string{"*"}, false, false)
cps1 := []string{"\\O(I1)\\C1", "\\O(I1)\\C2", "\\O(I2)\\C1", "\\O(I2)\\C2"}
fpm := &FakePerformanceQuery{
counters: createCounterMap(append(cps1, "\\O(*)\\*"), []float64{1.1, 1.2, 1.3, 1.4, 0}),
@@ -508,7 +394,7 @@ func TestGatherRefreshingWithExpansion(t *testing.T) {
},
addEnglishSupported: true,
}
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, UseWildcardsExpansion: true, query: fpm, CountersRefreshInterval: internal.Duration{Duration: time.Second * 10}}
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, query: fpm, CountersRefreshInterval: internal.Duration{Duration: time.Second * 10}}
var acc1 testutil.Accumulator
err = m.Gather(&acc1)
assert.Len(t, m.counters, 4)
@@ -577,211 +463,3 @@ func TestGatherRefreshingWithExpansion(t *testing.T) {
acc3.AssertContainsTaggedFields(t, measurement, fields3, tags3)
}
func TestGatherRefreshingWithoutExpansion(t *testing.T) {
var err error
if testing.Short() {
t.Skip("Skipping long taking test in short mode")
}
measurement := "test"
perfObjects := createPerfObject(measurement, "O", []string{"*"}, []string{"C1", "C2"}, true, false)
cps1 := []string{"\\O(I1)\\C1", "\\O(I1)\\C2", "\\O(I2)\\C1", "\\O(I2)\\C2"}
fpm := &FakePerformanceQuery{
counters: createCounterMap(append([]string{"\\O(*)\\C1", "\\O(*)\\C2"}, cps1...), []float64{0, 0, 1.1, 1.2, 1.3, 1.4}),
expandPaths: map[string][]string{
"\\O(*)\\C1": {cps1[0], cps1[2]},
"\\O(*)\\C2": {cps1[1], cps1[3]},
},
addEnglishSupported: true,
}
m := Win_PerfCounters{PrintValid: false, Object: perfObjects, UseWildcardsExpansion: false, query: fpm, CountersRefreshInterval: internal.Duration{Duration: time.Second * 10}}
var acc1 testutil.Accumulator
err = m.Gather(&acc1)
assert.Len(t, m.counters, 2)
require.NoError(t, err)
assert.Len(t, acc1.Metrics, 2)
fields1 := map[string]interface{}{
"C1": float32(1.1),
"C2": float32(1.2),
}
tags1 := map[string]string{
"instance": "I1",
"objectname": "O",
}
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
fields2 := map[string]interface{}{
"C1": float32(1.3),
"C2": float32(1.4),
}
tags2 := map[string]string{
"instance": "I2",
"objectname": "O",
}
acc1.AssertContainsTaggedFields(t, measurement, fields2, tags2)
//test finding new instance
cps2 := []string{"\\O(I1)\\C1", "\\O(I1)\\C2", "\\O(I2)\\C1", "\\O(I2)\\C2", "\\O(I3)\\C1", "\\O(I3)\\C2"}
fpm = &FakePerformanceQuery{
counters: createCounterMap(append([]string{"\\O(*)\\C1", "\\O(*)\\C2"}, cps2...), []float64{0, 0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6}),
expandPaths: map[string][]string{
"\\O(*)\\C1": {cps2[0], cps2[2], cps2[4]},
"\\O(*)\\C2": {cps2[1], cps2[3], cps2[5]},
},
addEnglishSupported: true,
}
m.query = fpm
fpm.Open()
var acc2 testutil.Accumulator
fields3 := map[string]interface{}{
"C1": float32(1.5),
"C2": float32(1.6),
}
tags3 := map[string]string{
"instance": "I3",
"objectname": "O",
}
//test before elapsing CounterRefreshRate counters are not refreshed
err = m.Gather(&acc2)
require.NoError(t, err)
assert.Len(t, m.counters, 2)
assert.Len(t, acc2.Metrics, 3)
acc2.AssertContainsTaggedFields(t, measurement, fields1, tags1)
acc2.AssertContainsTaggedFields(t, measurement, fields2, tags2)
acc2.AssertContainsTaggedFields(t, measurement, fields3, tags3)
//test changed configuration
perfObjects = createPerfObject(measurement, "O", []string{"*"}, []string{"C1", "C2", "C3"}, true, false)
cps3 := []string{"\\O(I1)\\C1", "\\O(I1)\\C2", "\\O(I1)\\C3", "\\O(I2)\\C1", "\\O(I2)\\C2", "\\O(I2)\\C3"}
fpm = &FakePerformanceQuery{
counters: createCounterMap(append([]string{"\\O(*)\\C1", "\\O(*)\\C2", "\\O(*)\\C3"}, cps3...), []float64{0, 0, 0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6}),
expandPaths: map[string][]string{
"\\O(*)\\C1": {cps3[0], cps3[3]},
"\\O(*)\\C2": {cps3[1], cps3[4]},
"\\O(*)\\C3": {cps3[2], cps3[5]},
},
addEnglishSupported: true,
}
m.query = fpm
m.Object = perfObjects
fpm.Open()
time.Sleep(m.CountersRefreshInterval.Duration)
var acc3 testutil.Accumulator
err = m.Gather(&acc3)
require.NoError(t, err)
assert.Len(t, acc3.Metrics, 2)
fields4 := map[string]interface{}{
"C1": float32(1.1),
"C2": float32(1.2),
"C3": float32(1.3),
}
tags4 := map[string]string{
"instance": "I1",
"objectname": "O",
}
fields5 := map[string]interface{}{
"C1": float32(1.4),
"C2": float32(1.5),
"C3": float32(1.6),
}
tags5 := map[string]string{
"instance": "I2",
"objectname": "O",
}
acc3.AssertContainsTaggedFields(t, measurement, fields4, tags4)
acc3.AssertContainsTaggedFields(t, measurement, fields5, tags5)
}
func TestGatherTotalNoExpansion(t *testing.T) {
var err error
measurement := "m"
perfObjects := createPerfObject(measurement, "O", []string{"*"}, []string{"C1", "C2"}, true, true)
cps1 := []string{"\\O(I1)\\C1", "\\O(I1)\\C2", "\\O(_Total)\\C1", "\\O(_Total)\\C2"}
m := Win_PerfCounters{PrintValid: false, UseWildcardsExpansion: false, Object: perfObjects, query: &FakePerformanceQuery{
counters: createCounterMap(append([]string{"\\O(*)\\C1", "\\O(*)\\C2"}, cps1...), []float64{0, 0, 1.1, 1.2, 1.3, 1.4}),
expandPaths: map[string][]string{
"\\O(*)\\C1": {cps1[0], cps1[2]},
"\\O(*)\\C2": {cps1[1], cps1[3]},
},
addEnglishSupported: true,
}}
var acc1 testutil.Accumulator
err = m.Gather(&acc1)
require.NoError(t, err)
assert.Len(t, m.counters, 2)
assert.Len(t, acc1.Metrics, 2)
fields1 := map[string]interface{}{
"C1": float32(1.1),
"C2": float32(1.2),
}
tags1 := map[string]string{
"instance": "I1",
"objectname": "O",
}
acc1.AssertContainsTaggedFields(t, measurement, fields1, tags1)
fields2 := map[string]interface{}{
"C1": float32(1.3),
"C2": float32(1.4),
}
tags2 := map[string]string{
"instance": "_Total",
"objectname": "O",
}
acc1.AssertContainsTaggedFields(t, measurement, fields2, tags2)
perfObjects[0].IncludeTotal = false
m.counters = nil
m.lastRefreshed = time.Time{}
var acc2 testutil.Accumulator
err = m.Gather(&acc2)
require.NoError(t, err)
assert.Len(t, m.counters, 2)
assert.Len(t, acc2.Metrics, 1)
acc2.AssertContainsTaggedFields(t, measurement, fields1, tags1)
acc2.AssertDoesNotContainsTaggedFields(t, measurement, fields2, tags2)
}
// list of nul terminated strings from WinAPI
var unicodeStringListWithEnglishChars = []uint16{0x5c, 0x5c, 0x54, 0x34, 0x38, 0x30, 0x5c, 0x50, 0x68, 0x79, 0x73, 0x69, 0x63, 0x61, 0x6c, 0x44, 0x69, 0x73, 0x6b, 0x28, 0x30, 0x20, 0x43, 0x3a, 0x29, 0x5c, 0x43, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x20, 0x44, 0x69, 0x73, 0x6b, 0x20, 0x51, 0x75, 0x65, 0x75, 0x65, 0x20, 0x4c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x0, 0x5c, 0x5c, 0x54, 0x34, 0x38, 0x30, 0x5c, 0x50, 0x68, 0x79, 0x73, 0x69, 0x63, 0x61, 0x6c, 0x44, 0x69, 0x73, 0x6b, 0x28, 0x5f, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x29, 0x5c, 0x43, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x20, 0x44, 0x69, 0x73, 0x6b, 0x20, 0x51, 0x75, 0x65, 0x75, 0x65, 0x20, 0x4c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x0, 0x0}
var unicodeStringListWithCzechChars = []uint16{0x5c, 0x5c, 0x54, 0x34, 0x38, 0x30, 0x5c, 0x46, 0x79, 0x7a, 0x69, 0x63, 0x6b, 0xfd, 0x20, 0x64, 0x69, 0x73, 0x6b, 0x28, 0x30, 0x20, 0x43, 0x3a, 0x29, 0x5c, 0x41, 0x6b, 0x74, 0x75, 0xe1, 0x6c, 0x6e, 0xed, 0x20, 0x64, 0xe9, 0x6c, 0x6b, 0x61, 0x20, 0x66, 0x72, 0x6f, 0x6e, 0x74, 0x79, 0x20, 0x64, 0x69, 0x73, 0x6b, 0x75, 0x0, 0x5c, 0x5c, 0x54, 0x34, 0x38, 0x30, 0x5c, 0x46, 0x79, 0x7a, 0x69, 0x63, 0x6b, 0xfd, 0x20, 0x64, 0x69, 0x73, 0x6b, 0x28, 0x5f, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x29, 0x5c, 0x41, 0x6b, 0x74, 0x75, 0xe1, 0x6c, 0x6e, 0xed, 0x20, 0x64, 0xe9, 0x6c, 0x6b, 0x61, 0x20, 0x66, 0x72, 0x6f, 0x6e, 0x74, 0x79, 0x20, 0x64, 0x69, 0x73, 0x6b, 0x75, 0x0, 0x0}
var unicodeStringListSingleItem = []uint16{0x5c, 0x5c, 0x54, 0x34, 0x38, 0x30, 0x5c, 0x50, 0x68, 0x79, 0x73, 0x69, 0x63, 0x61, 0x6c, 0x44, 0x69, 0x73, 0x6b, 0x28, 0x30, 0x20, 0x43, 0x3a, 0x29, 0x5c, 0x43, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x20, 0x44, 0x69, 0x73, 0x6b, 0x20, 0x51, 0x75, 0x65, 0x75, 0x65, 0x20, 0x4c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x0, 0x0}
var unicodeStringListNoItem = []uint16{0x0}
var stringArrayWithEnglishChars = []string{
"\\\\T480\\PhysicalDisk(0 C:)\\Current Disk Queue Length",
"\\\\T480\\PhysicalDisk(_Total)\\Current Disk Queue Length",
}
var stringArrayWithCzechChars = []string{
"\\\\T480\\Fyzick\u00fd disk(0 C:)\\Aktu\u00e1ln\u00ed d\u00e9lka fronty disku",
"\\\\T480\\Fyzick\u00fd disk(_Total)\\Aktu\u00e1ln\u00ed d\u00e9lka fronty disku",
}
var stringArraySingleItem = []string{
"\\\\T480\\PhysicalDisk(0 C:)\\Current Disk Queue Length",
}
func TestUTF16ToStringArray(t *testing.T) {
singleItem := UTF16ToStringArray(unicodeStringListSingleItem)
assert.True(t, assert.ObjectsAreEqual(singleItem, stringArraySingleItem), "Not equal single arrays")
noItem := UTF16ToStringArray(unicodeStringListNoItem)
assert.Nil(t, noItem)
engStrings := UTF16ToStringArray(unicodeStringListWithEnglishChars)
assert.True(t, assert.ObjectsAreEqual(engStrings, stringArrayWithEnglishChars), "Not equal eng arrays")
czechStrings := UTF16ToStringArray(unicodeStringListWithCzechChars)
assert.True(t, assert.ObjectsAreEqual(czechStrings, stringArrayWithCzechChars), "Not equal czech arrays")
}

View File

@@ -140,17 +140,17 @@ func (i *InfluxDB) Connect() error {
i.serializer.SetFieldTypeSupport(influx.UintSupport)
}
for _, u := range urls {
u, err := url.Parse(u)
for _, loc := range urls {
u, err := url.Parse(loc)
if err != nil {
return fmt.Errorf("error parsing url [%s]: %v", u, err)
return fmt.Errorf("error parsing url [%q]: %v", loc, err)
}
var proxy *url.URL
if len(i.HTTPProxy) > 0 {
proxy, err = url.Parse(i.HTTPProxy)
if err != nil {
return fmt.Errorf("error parsing proxy_url [%s]: %v", proxy, err)
return fmt.Errorf("error parsing proxy_url [%q]: %v", proxy, err)
}
}
@@ -170,7 +170,7 @@ func (i *InfluxDB) Connect() error {
i.clients = append(i.clients, c)
default:
return fmt.Errorf("unsupported scheme [%s]: %q", u, u.Scheme)
return fmt.Errorf("unsupported scheme [%q]: %q", u, u.Scheme)
}
}
@@ -209,14 +209,14 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
if apiError.Type == DatabaseNotFound {
err := client.CreateDatabase(ctx)
if err != nil {
log.Printf("E! [outputs.influxdb] when writing to [%s]: database %q not found and failed to recreate",
log.Printf("E! [outputs.influxdb] when writing to [%q]: database %q not found and failed to recreate",
client.URL(), client.Database())
}
}
}
}
log.Printf("E! [outputs.influxdb]: when writing to [%s]: %v", client.URL(), err)
log.Printf("E! [outputs.influxdb]: when writing to [%q]: %v", client.URL(), err)
}
return errors.New("could not write any address")
@@ -231,7 +231,7 @@ func (i *InfluxDB) udpClient(url *url.URL) (Client, error) {
c, err := i.CreateUDPClientF(config)
if err != nil {
return nil, fmt.Errorf("error creating UDP client [%s]: %v", url, err)
return nil, fmt.Errorf("error creating UDP client [%q]: %v", url, err)
}
return c, nil
@@ -261,13 +261,13 @@ func (i *InfluxDB) httpClient(ctx context.Context, url *url.URL, proxy *url.URL)
c, err := i.CreateHTTPClientF(config)
if err != nil {
return nil, fmt.Errorf("error creating HTTP client [%s]: %v", url, err)
return nil, fmt.Errorf("error creating HTTP client [%q]: %v", url, err)
}
if !i.SkipDatabaseCreation {
err = c.CreateDatabase(ctx)
if err != nil {
log.Printf("W! [outputs.influxdb] when writing to [%s]: database %q creation failed: %v",
log.Printf("W! [outputs.influxdb] when writing to [%q]: database %q creation failed: %v",
c.URL(), c.Database(), err)
}
}

View File

@@ -3,7 +3,7 @@ package nats
import (
"fmt"
nats_client "github.com/nats-io/go-nats"
nats_client "github.com/nats-io/nats"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/tls"

View File

@@ -180,14 +180,6 @@ func buildMetrics(m telegraf.Metric, w *Wavefront) []*MetricPoint {
}
func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string) {
// Remove all empty tags.
for k, v := range mTags {
if v == "" {
delete(mTags, k)
}
}
var source string
sourceTagFound := false

View File

@@ -4,7 +4,6 @@ import (
"bytes"
"encoding/json"
"fmt"
"log"
"strconv"
"strings"
"time"
@@ -45,15 +44,25 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i
tags[k] = v
}
for _, tag := range p.TagKeys {
switch v := jsonOut[tag].(type) {
case string:
tags[tag] = v
case bool:
tags[tag] = strconv.FormatBool(v)
case float64:
tags[tag] = strconv.FormatFloat(v, 'f', -1, 64)
}
delete(jsonOut, tag)
}
f := JSONFlattener{}
err := f.FullFlattenJSON("", jsonOut, true, true)
err := f.FlattenJSON("", jsonOut)
if err != nil {
return nil, err
}
tags, nFields := p.switchFieldToTag(tags, f.Fields)
metric, err := metric.New(p.MetricName, tags, nFields, time.Now().UTC())
metric, err := metric.New(p.MetricName, tags, f.Fields, time.Now().UTC())
if err != nil {
return nil, err
@@ -61,43 +70,6 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i
return append(metrics, metric), nil
}
//will take in field map with strings and bools,
//search for TagKeys that match fieldnames and add them to tags
//will delete any strings/bools that shouldn't be fields
//assumes that any non-numeric values in TagKeys should be displayed as tags
func (p *JSONParser) switchFieldToTag(tags map[string]string, fields map[string]interface{}) (map[string]string, map[string]interface{}) {
for _, name := range p.TagKeys {
//switch any fields in tagkeys into tags
if fields[name] == nil {
continue
}
switch value := fields[name].(type) {
case string:
tags[name] = value
delete(fields, name)
case bool:
tags[name] = strconv.FormatBool(value)
delete(fields, name)
case float64:
tags[name] = strconv.FormatFloat(value, 'f', -1, 64)
delete(fields, name)
default:
log.Printf("E! [parsers.json] Unrecognized type %T", value)
}
}
//remove any additional string/bool values from fields
for k := range fields {
switch fields[k].(type) {
case string:
delete(fields, k)
case bool:
delete(fields, k)
}
}
return tags, fields
}
func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
buf = bytes.TrimSpace(buf)
buf = bytes.TrimPrefix(buf, utf8BOM)
@@ -147,7 +119,6 @@ func (f *JSONFlattener) FlattenJSON(
if f.Fields == nil {
f.Fields = make(map[string]interface{})
}
return f.FullFlattenJSON(fieldname, v, false, false)
}

View File

@@ -4,7 +4,6 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
@@ -441,29 +440,3 @@ func TestHttpJsonBOM(t *testing.T) {
_, err := parser.Parse(jsonBOM)
assert.NoError(t, err)
}
//for testing issue #4260
func TestJSONParseNestedArray(t *testing.T) {
testString := `{
"total_devices": 5,
"total_threads": 10,
"shares": {
"total": 5,
"accepted": 5,
"rejected": 0,
"avg_find_time": 4,
"tester": "work",
"tester2": "don't want this",
"tester3": 7.93
}
}`
parser := JSONParser{
MetricName: "json_test",
TagKeys: []string{"total_devices", "total_threads", "shares_tester", "shares_tester3"},
}
metrics, err := parser.Parse([]byte(testString))
require.NoError(t, err)
require.Equal(t, len(parser.TagKeys), len(metrics[0].Tags()))
}

View File

@@ -1,11 +1,11 @@
# Converter Processor
The converter processor is used to change the type of tag or field values. In
addition to changing field types it can convert between fields and tags.
addition to changing field types it can convert fields to tags and vis versa.
Values that cannot be converted are dropped.
**Note:** When converting tags to fields, take care to ensure the series is still
**Note:** When converting tags to fields, take care not to ensure the series is still
uniquely identifiable. Fields with the same series key (measurement + tags)
will overwrite one another.

View File

@@ -95,7 +95,7 @@ supported_packages = {
"freebsd": [ "tar" ]
}
next_version = '1.7.0'
next_version = '1.8.0'
################
#### Telegraf Functions