Compare commits

...

57 Commits
1.4.1 ... 1.4.4

Author SHA1 Message Date
Daniel Nelson
ddcb93188f Set 1.4.4 release date
(cherry picked from commit 07297e80a8)
2017-11-08 15:22:31 -08:00
Daniel Nelson
cb193d0e8a Update changelog
(cherry picked from commit 2c2dc97702)
2017-11-07 11:43:33 -08:00
Daniel Nelson
600f9fa067 Use current time if container read time is zero value (#3437)
(cherry picked from commit cbbdf1043b)
2017-11-07 11:43:29 -08:00
Daniel Nelson
4cedae9d2c Update changelog
(cherry picked from commit c55f285de0)
2017-11-07 11:37:45 -08:00
Daniel Nelson
4c8e8fc2f1 Update gopsutil to v2.17.10 (#3441)
(cherry picked from commit e1295c41c8)
2017-11-07 11:37:24 -08:00
Daniel Nelson
7c5bcfe84e Update changelog
(cherry picked from commit e0df62c27b)
2017-11-06 17:43:02 -08:00
Bob Shannon
efa20d05fa Redact datadog API key in log output (#3420)
(cherry picked from commit fdf12ce6b4)
2017-11-06 17:42:57 -08:00
Daniel Nelson
187c7e12a8 Update changelog
(cherry picked from commit c116af35c7)
2017-10-30 15:36:17 -07:00
Daniel Nelson
f29a994743 Use explicit schemas in mqtt_consumer input (#3401)
(cherry picked from commit fcfcc803b1)
2017-10-30 15:35:48 -07:00
Daniel Nelson
f416f429d7 Fix circle-ci Go version 2017-10-30 15:06:34 -07:00
Daniel Nelson
ec6b1aae94 Fix unittest for golang 1.9
(cherry picked from commit cafb22d145)
2017-10-30 15:04:30 -07:00
Daniel Nelson
b473b6a659 Set release date for 1.4.3 2017-10-25 14:16:05 -07:00
Daniel Nelson
e5d08a4d86 Update changelog
(cherry picked from commit 13c1f1524a)
2017-10-24 16:26:06 -07:00
Daniel Nelson
3c894bb056 Use golang.org/x/sys/unix instead of syscall in diskio (#3384)
(cherry picked from commit 9a062498e7)
2017-10-24 16:26:06 -07:00
Daniel Nelson
d2d173b792 Update changelog
(cherry picked from commit f64cf89db1)
2017-10-24 15:47:19 -07:00
Daniel Nelson
145f7da42e If the connector name cannot be unquoted, use the raw value (#3371)
(cherry picked from commit 6d1777276c)
2017-10-24 15:47:15 -07:00
Daniel Nelson
f9f8d9ed7e Update changelog
(cherry picked from commit 65580759fc)
2017-10-23 12:37:30 -07:00
Sergei Smolianinov
0dd3b0507b Fix ACL token usage in consul input plugin (#3376)
(cherry picked from commit d2f9fc7d8c)
2017-10-23 12:37:30 -07:00
Daniel Nelson
c44b4fcc89 Update changelog
(cherry picked from commit 7088d98304)
2017-10-19 16:35:11 -07:00
Daniel Nelson
cb9c1653d3 Remove warning when JSON contains null value (#3359)
(cherry picked from commit 4243403432)
2017-10-19 16:35:10 -07:00
Daniel Nelson
cf7590b88e Update changelog
(cherry picked from commit 9b59cdd10e)
2017-10-18 13:58:25 -07:00
clheikes
5a7d889908 Fix TELEGRAF_OPTS expansion in systemd service unit (#3354)
(cherry picked from commit 02baa696c3)
2017-10-18 13:58:25 -07:00
Daniel Nelson
ef652678dd Update changelog
(cherry picked from commit a4fa19252f)
2017-10-18 12:58:41 -07:00
Dimitris Rozakis
c4cc57956b Respect path prefix in influx output uri (#3224)
(cherry picked from commit 9c8f4afa37)
2017-10-18 12:58:41 -07:00
Daniel Nelson
7b8a761c63 Update changelog
(cherry picked from commit 7ba376964c)
2017-10-18 12:26:07 -07:00
Ayrdrie
7d66319f59 Fix mongodb input panic when restarting mongodb (#3355)
(cherry picked from commit a75ab3e190)
2017-10-18 12:26:03 -07:00
Pierre Fersing
22f64f8417 Fix CPU system plugin gets stuck after system suspend (#3342)
(cherry picked from commit f5a9d1bc75)
2017-10-16 14:27:58 -07:00
Daniel Nelson
6b4deb01bb Update changelog
(cherry picked from commit 3ea41e885c)
2017-10-16 11:27:28 -07:00
Daniel Nelson
e4835cdc30 Fix case sensitivity issue in sqlserver query (#3336)
(cherry picked from commit 1f348037b7)
2017-10-16 11:27:28 -07:00
Daniel Nelson
e32ffdde06 Update changelog
(cherry picked from commit 0f9f757da7)
2017-10-12 17:27:24 -07:00
Windkit Li
0f905eaee7 Fix snmpwalk address format in leofs input (#3328)
(cherry picked from commit 2f8d0f4d47)
2017-10-12 17:27:24 -07:00
Daniel Nelson
4d48dcb84f Update changelog
(cherry picked from commit 024dea2ff9)
2017-10-12 15:56:09 -07:00
Daniel Nelson
17377b4942 Fix container name filters in docker input (#3331)
(cherry picked from commit fa25e123d8)
2017-10-12 15:55:50 -07:00
Daniel Nelson
0cc5fc0ce4 Set 1.4.2 release date
(cherry picked from commit 4e0c8e6026)
2017-10-10 13:31:06 -07:00
Daniel Nelson
8011109466 Remove InfluxDB path prefix test
This tests a feature that is not yet on this branch and the test was
mistakenly backported.
2017-10-05 16:37:58 -07:00
Daniel Nelson
588f0c77f8 Update changelog
(cherry picked from commit 13c7802b84)
2017-10-05 16:17:06 -07:00
Daniel Nelson
4301b8e32a Use chunked transfer encoding in InfluxDB output (#3307)
(cherry picked from commit cce40c515a)
2017-10-05 16:17:05 -07:00
Daniel Nelson
3c9d7db0a0 Update changelog
(cherry picked from commit 6e1fa559a3)
2017-10-05 16:06:11 -07:00
Daniel Nelson
f7b3eb1ebd Fix panic in cpu input if number of cpus changes (#3306)
(cherry picked from commit f56dda0ac8)
2017-10-05 16:06:11 -07:00
Daniel Nelson
b8ab827629 Update changelog
(cherry picked from commit 002ccf3295)
2017-10-03 15:27:49 -07:00
Daniel Nelson
d03e2fca32 Add support for proxy environment variables to http_response (#3302)
(cherry picked from commit a163effa6d)
2017-10-03 15:26:55 -07:00
Daniel Nelson
eca00c10e0 Add support for standard proxy env vars in outputs. (#3212)
(cherry picked from commit 7b08f9d099)
2017-10-03 15:26:44 -07:00
Daniel Nelson
9cf19df04e Update changelog
(cherry picked from commit f67350107d)
2017-10-02 17:17:10 -07:00
Daniel Nelson
e77c2b76e7 Fix case sensitivity error in sqlserver input (#3287)
(cherry picked from commit 8e3ed96d6f)
2017-10-02 17:17:10 -07:00
Daniel Nelson
c749c43dab Fix mqtt_consumer connection_timeout test
(cherry picked from commit cdca81c999)
2017-10-02 12:32:05 -07:00
Daniel Nelson
1be17ea5af Update example config 2017-09-29 16:04:02 -07:00
Daniel Nelson
e1155bec20 Update changelog
(cherry picked from commit 29b6f4168c)
2017-09-29 16:01:11 -07:00
Daniel Nelson
cfac750469 Fix format of connection_timeout in mqtt_consumer (#3286)
(cherry picked from commit 3d62e045af)
2017-09-29 16:01:11 -07:00
Daniel Nelson
f10d5b43c4 Update changelog
(cherry picked from commit cadafa6405)
2017-09-26 16:03:30 -07:00
Daniel Nelson
47b2d04d5b Allow JSON data format to contain zero metrics (#3268)
(cherry picked from commit 22a9ffbb9d)
2017-09-26 16:03:30 -07:00
Daniel Nelson
0e0da57b9a Update changelog
(cherry picked from commit 2e1457a496)
2017-09-26 15:38:41 -07:00
Daniel Nelson
8e7cf0109e Fix parsing of JSON with a UTF8 BOM in httpjson (#3267)
(cherry picked from commit 8614445235)
2017-09-26 15:38:41 -07:00
Daniel Nelson
5b791fd2e5 Update changelog
(cherry picked from commit f23d1eb078)
2017-09-26 15:29:19 -07:00
Daniel Nelson
293b1a0093 Fix dmcache tests with 32bit int
(cherry picked from commit ef5c12bd86)
2017-09-26 15:29:01 -07:00
Daniel Nelson
761ea06d6a Fix cgroup tests with 32bit int
(cherry picked from commit c013cc1497)
2017-09-26 15:29:01 -07:00
Daniel Nelson
8fafe9878b Fix ceph tests with 32bit int
(cherry picked from commit bb665cf013)
2017-09-26 15:29:01 -07:00
Daniel Nelson
5da3eef38b Allow 64bit integers in kernel_vmstat
(cherry picked from commit f823fc73f6)
2017-09-26 15:29:00 -07:00
45 changed files with 995 additions and 689 deletions

View File

@@ -1,3 +1,41 @@
## v1.4.4 [2017-11-08]
- [#3401](https://github.com/influxdata/telegraf/pull/3401): Use schema specified in mqtt_consumer input.
- [#3419](https://github.com/influxdata/telegraf/issues/3419): Redact datadog API key in log output.
- [#3311](https://github.com/influxdata/telegraf/issues/3311): Fix error getting pids in netstat input.
- [#3339](https://github.com/influxdata/telegraf/issues/3339): Support HOST_VAR envvar to locate /var in system input.
- [#3383](https://github.com/influxdata/telegraf/issues/3383): Use current time if docker container read time is zero value.
## v1.4.3 [2017-10-25]
### Bugfixes
- [#3327](https://github.com/influxdata/telegraf/issues/3327): Fix container name filters in docker input.
- [#3321](https://github.com/influxdata/telegraf/issues/3321): Fix snmpwalk address format in leofs input.
- [#3329](https://github.com/influxdata/telegraf/issues/3329): Fix case sensitivity issue in sqlserver query.
- [#3342](https://github.com/influxdata/telegraf/pull/3342): Fix CPU input plugin stuck after suspend on Linux.
- [#3013](https://github.com/influxdata/telegraf/issues/3013): Fix mongodb input panic when restarting mongodb.
- [#3224](https://github.com/influxdata/telegraf/pull/3224): Preserve url path prefix in influx output.
- [#3354](https://github.com/influxdata/telegraf/pull/3354): Fix TELEGRAF_OPTS expansion in systemd service unit.
- [#3357](https://github.com/influxdata/telegraf/issues/3357): Remove warning when JSON contains null value.
- [#3375](https://github.com/influxdata/telegraf/issues/3375): Fix ACL token usage in consul input plugin.
- [#3369](https://github.com/influxdata/telegraf/issues/3369): Fix unquoting error with Tomcat 6.
- [#3373](https://github.com/influxdata/telegraf/issues/3373): Fix syscall panic in diskio on some Linux systems.
## v1.4.2 [2017-10-10]
### Bugfixes
- [#3259](https://github.com/influxdata/telegraf/issues/3259): Fix error if int larger than 32-bit in /proc/vmstat.
- [#3265](https://github.com/influxdata/telegraf/issues/3265): Fix parsing of JSON with a UTF8 BOM in httpjson.
- [#2887](https://github.com/influxdata/telegraf/issues/2887): Allow JSON data format to contain zero metrics.
- [#3284](https://github.com/influxdata/telegraf/issues/3284): Fix format of connection_timeout in mqtt_consumer.
- [#3081](https://github.com/influxdata/telegraf/issues/3081): Fix case sensitivity error in sqlserver input.
- [#3297](https://github.com/influxdata/telegraf/issues/3297): Add support for proxy environment variables to http_response.
- [#1588](https://github.com/influxdata/telegraf/issues/1588): Add support for standard proxy env vars in outputs.
- [#3282](https://github.com/influxdata/telegraf/issues/3282): Fix panic in cpu input if number of cpus changes.
- [#2854](https://github.com/influxdata/telegraf/issues/2854): Use chunked transfer encoding in InfluxDB output.
## v1.4.1 [2017-09-26]
### Bugfixes

2
Godeps
View File

@@ -60,7 +60,7 @@ github.com/prometheus/procfs 1878d9fbb537119d24b21ca07effd591627cd160
github.com/rcrowley/go-metrics 1f30fe9094a513ce4c700b9a54458bbb0c96996c
github.com/samuel/go-zookeeper 1d7be4effb13d2d908342d349d71a284a7542693
github.com/satori/go.uuid 5bf94b69c6b68ee1b541973bb8e1144db23a194b
github.com/shirou/gopsutil 9a4a9167ad3b4355dbf1c2c7a0f5f0d3fb1e9ab9
github.com/shirou/gopsutil 48fc5612898a1213aa5d6a0fb2d4f7b968e898fb
github.com/shirou/w32 3c9377fc6748f222729a8270fe2775d149a249ad
github.com/Shopify/sarama c01858abb625b73a3af51d0798e4ad42c8147093
github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d

View File

@@ -1,11 +1,14 @@
machine:
go:
version: 1.8.1
services:
- docker
- memcached
- redis
- rabbitmq-server
post:
- sudo rm -rf /usr/local/go
- wget https://storage.googleapis.com/golang/go1.8.4.linux-amd64.tar.gz
- sudo tar -C /usr/local -xzf go1.8.4.linux-amd64.tar.gz
- go version
dependencies:
override:

View File

@@ -84,9 +84,7 @@
# Configuration for influxdb server to send metrics to
[[outputs.influxdb]]
## The HTTP or UDP URL for your InfluxDB instance. Each item should be
## of the form:
## scheme "://" host [ ":" port]
## The full HTTP or UDP URL for your InfluxDB instance.
##
## Multiple urls can be specified as part of the same cluster,
## this means that only ONE of the urls will be written to each interval.
@@ -121,6 +119,9 @@
## HTTP Proxy Config
# http_proxy = "http://corporate.proxy:3128"
## Optional HTTP headers
# http_headers = {"X-Special-Header" = "Special-Value"}
## Compress each HTTP request payload using GZIP.
# content_encoding = "gzip"
@@ -1095,10 +1096,7 @@
# # Read metrics from fail2ban.
# [[inputs.fail2ban]]
# ## fail2ban-client require root access.
# ## Setting 'use_sudo' to true will make use of sudo to run fail2ban-client.
# ## Users must configure sudo to allow telegraf user to run fail2ban-client with no password.
# ## This plugin run only "fail2ban-client status".
# ## Use sudo to run fail2ban-client
# use_sudo = false
@@ -1472,8 +1470,8 @@
# # Read metrics from a LeoFS Server via SNMP
# [[inputs.leofs]]
# ## An array of URLs of the form:
# ## "udp://" host [ ":" port]
# servers = ["udp://127.0.0.1:4020"]
# ## host [ ":" port]
# servers = ["127.0.0.1:4020"]
# # Provides Linux sysctl fs metrics
@@ -1960,13 +1958,9 @@
# ## Includes connection time, any redirects, and reading the response body.
# # client_timeout = "4s"
#
# ## A list of nodes to gather as the rabbitmq_node measurement. If not
# ## specified, metrics for all nodes are gathered.
# ## A list of nodes to pull metrics about. If not specified, metrics for
# ## all nodes are gathered.
# # nodes = ["rabbit@node1", "rabbit@node2"]
#
# ## A list of queues to gather as the rabbitmq_queue measurement. If not
# ## specified, metrics for all queues are gathered.
# # queues = ["telegraf"]
# # Read raindrops stats (raindrops - real-time stats for preforking Rack servers)
@@ -2382,10 +2376,10 @@
# ## Use SSL but skip chain & host verification
# # insecure_skip_verify = false
#
# ## Data format to output.
# ## Data format to consume.
# ## Each data format has its own unique set of configuration options, read
# ## more about them here:
# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
# data_format = "influx"
@@ -2519,6 +2513,8 @@
# servers = ["localhost:1883"]
# ## MQTT QoS, must be 0, 1, or 2
# qos = 0
# ## Connection timeout for initial connection in seconds
# connection_timeout = "30s"
#
# ## Topics to subscribe to
# topics = [

View File

@@ -77,3 +77,40 @@ func compileFilterNoGlob(filters []string) Filter {
}
return &out
}
type IncludeExcludeFilter struct {
include Filter
exclude Filter
}
func NewIncludeExcludeFilter(
include []string,
exclude []string,
) (Filter, error) {
in, err := Compile(include)
if err != nil {
return nil, err
}
ex, err := Compile(exclude)
if err != nil {
return nil, err
}
return &IncludeExcludeFilter{in, ex}, nil
}
func (f *IncludeExcludeFilter) Match(s string) bool {
if f.include != nil {
if !f.include.Match(s) {
return false
}
}
if f.exclude != nil {
if f.exclude.Match(s) {
return false
}
}
return true
}

View File

@@ -31,7 +31,7 @@ func TestNewMetric(t *testing.T) {
assert.Equal(t, tags, m.Tags())
assert.Equal(t, fields, m.Fields())
assert.Equal(t, "cpu", m.Name())
assert.Equal(t, now, m.Time())
assert.Equal(t, now.UnixNano(), m.Time().UnixNano())
assert.Equal(t, now.UnixNano(), m.UnixNano())
}
@@ -414,7 +414,7 @@ func TestNewGaugeMetric(t *testing.T) {
assert.Equal(t, tags, m.Tags())
assert.Equal(t, fields, m.Fields())
assert.Equal(t, "cpu", m.Name())
assert.Equal(t, now, m.Time())
assert.Equal(t, now.UnixNano(), m.Time().UnixNano())
assert.Equal(t, now.UnixNano(), m.UnixNano())
}
@@ -436,7 +436,7 @@ func TestNewCounterMetric(t *testing.T) {
assert.Equal(t, tags, m.Tags())
assert.Equal(t, fields, m.Fields())
assert.Equal(t, "cpu", m.Name())
assert.Equal(t, now, m.Time())
assert.Equal(t, now.UnixNano(), m.Time().UnixNano())
assert.Equal(t, now.UnixNano(), m.UnixNano())
}

View File

@@ -26,7 +26,7 @@ func TestParseSockId(t *testing.T) {
func TestParseMonDump(t *testing.T) {
dump, err := parseDump(monPerfDump)
assert.NoError(t, err)
assert.InEpsilon(t, 5678670180, dump["cluster"]["osd_kb_used"], epsilon)
assert.InEpsilon(t, int64(5678670180), dump["cluster"]["osd_kb_used"], epsilon)
assert.InEpsilon(t, 6866.540527000, dump["paxos"]["store_state_latency.sum"], epsilon)
}

View File

@@ -225,7 +225,7 @@ var fileFormats = [...]fileFormat{
}
func numberOrString(s string) interface{} {
i, err := strconv.Atoi(s)
i, err := strconv.ParseInt(s, 10, 64)
if err == nil {
return i
}

View File

@@ -31,17 +31,17 @@ func TestCgroupStatistics_1(t *testing.T) {
"path": "testdata/memory",
}
fields := map[string]interface{}{
"memory.stat.cache": 1739362304123123123,
"memory.stat.rss": 1775325184,
"memory.stat.rss_huge": 778043392,
"memory.stat.mapped_file": 421036032,
"memory.stat.dirty": -307200,
"memory.max_usage_in_bytes.0": 0,
"memory.max_usage_in_bytes.1": -1,
"memory.max_usage_in_bytes.2": 2,
"memory.limit_in_bytes": 223372036854771712,
"memory.stat.cache": int64(1739362304123123123),
"memory.stat.rss": int64(1775325184),
"memory.stat.rss_huge": int64(778043392),
"memory.stat.mapped_file": int64(421036032),
"memory.stat.dirty": int64(-307200),
"memory.max_usage_in_bytes.0": int64(0),
"memory.max_usage_in_bytes.1": int64(-1),
"memory.max_usage_in_bytes.2": int64(2),
"memory.limit_in_bytes": int64(223372036854771712),
"memory.use_hierarchy": "12-781",
"notify_on_release": 0,
"notify_on_release": int64(0),
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
}
@@ -63,10 +63,10 @@ func TestCgroupStatistics_2(t *testing.T) {
"path": "testdata/cpu",
}
fields := map[string]interface{}{
"cpuacct.usage_percpu.0": -1452543795404,
"cpuacct.usage_percpu.1": 1376681271659,
"cpuacct.usage_percpu.2": 1450950799997,
"cpuacct.usage_percpu.3": -1473113374257,
"cpuacct.usage_percpu.0": int64(-1452543795404),
"cpuacct.usage_percpu.1": int64(1376681271659),
"cpuacct.usage_percpu.2": int64(1450950799997),
"cpuacct.usage_percpu.3": int64(-1473113374257),
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
}
@@ -88,7 +88,7 @@ func TestCgroupStatistics_3(t *testing.T) {
"path": "testdata/memory/group_1",
}
fields := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"memory.limit_in_bytes": int64(223372036854771712),
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
@@ -115,7 +115,7 @@ func TestCgroupStatistics_4(t *testing.T) {
"path": "testdata/memory/group_1/group_1_1",
}
fields := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"memory.limit_in_bytes": int64(223372036854771712),
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
@@ -147,7 +147,7 @@ func TestCgroupStatistics_5(t *testing.T) {
"path": "testdata/memory/group_1/group_1_1",
}
fields := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"memory.limit_in_bytes": int64(223372036854771712),
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
@@ -174,9 +174,9 @@ func TestCgroupStatistics_6(t *testing.T) {
"path": "testdata/memory",
}
fields := map[string]interface{}{
"memory.usage_in_bytes": 3513667584,
"memory.usage_in_bytes": int64(3513667584),
"memory.use_hierarchy": "12-781",
"memory.kmem.limit_in_bytes": 9223372036854771712,
"memory.kmem.limit_in_bytes": int64(9223372036854771712),
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
}

View File

@@ -69,6 +69,10 @@ func (c *Consul) createAPIClient() (*api.Client, error) {
config.Datacenter = c.Datacentre
}
if c.Token != "" {
config.Token = c.Token
}
if c.Username != "" {
config.HttpAuth = &api.HttpBasicAuth{
Username: c.Username,

View File

@@ -20,7 +20,7 @@ var sampleChecks = []*api.HealthCheck{
},
}
func TestGatherHealtCheck(t *testing.T) {
func TestGatherHealthCheck(t *testing.T) {
expectedFields := map[string]interface{}{
"check_name": "foo.health",
"status": "passing",

View File

@@ -16,21 +16,21 @@ const metricName = "dmcache"
type cacheStatus struct {
device string
length int
length int64
target string
metadataBlocksize int
metadataUsed int
metadataTotal int
cacheBlocksize int
cacheUsed int
cacheTotal int
readHits int
readMisses int
writeHits int
writeMisses int
demotions int
promotions int
dirty int
metadataBlocksize int64
metadataUsed int64
metadataTotal int64
cacheBlocksize int64
cacheUsed int64
cacheTotal int64
readHits int64
readMisses int64
writeHits int64
writeMisses int64
demotions int64
promotions int64
dirty int64
}
func (c *DMCache) Gather(acc telegraf.Accumulator) error {
@@ -69,12 +69,12 @@ func parseDMSetupStatus(line string) (cacheStatus, error) {
}
status.device = strings.TrimRight(values[0], ":")
status.length, err = strconv.Atoi(values[2])
status.length, err = strconv.ParseInt(values[2], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.target = values[3]
status.metadataBlocksize, err = strconv.Atoi(values[4])
status.metadataBlocksize, err = strconv.ParseInt(values[4], 10, 64)
if err != nil {
return cacheStatus{}, err
}
@@ -82,15 +82,15 @@ func parseDMSetupStatus(line string) (cacheStatus, error) {
if len(metadata) != 2 {
return cacheStatus{}, parseError
}
status.metadataUsed, err = strconv.Atoi(metadata[0])
status.metadataUsed, err = strconv.ParseInt(metadata[0], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.metadataTotal, err = strconv.Atoi(metadata[1])
status.metadataTotal, err = strconv.ParseInt(metadata[1], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.cacheBlocksize, err = strconv.Atoi(values[6])
status.cacheBlocksize, err = strconv.ParseInt(values[6], 10, 64)
if err != nil {
return cacheStatus{}, err
}
@@ -98,39 +98,39 @@ func parseDMSetupStatus(line string) (cacheStatus, error) {
if len(cache) != 2 {
return cacheStatus{}, parseError
}
status.cacheUsed, err = strconv.Atoi(cache[0])
status.cacheUsed, err = strconv.ParseInt(cache[0], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.cacheTotal, err = strconv.Atoi(cache[1])
status.cacheTotal, err = strconv.ParseInt(cache[1], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.readHits, err = strconv.Atoi(values[8])
status.readHits, err = strconv.ParseInt(values[8], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.readMisses, err = strconv.Atoi(values[9])
status.readMisses, err = strconv.ParseInt(values[9], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.writeHits, err = strconv.Atoi(values[10])
status.writeHits, err = strconv.ParseInt(values[10], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.writeMisses, err = strconv.Atoi(values[11])
status.writeMisses, err = strconv.ParseInt(values[11], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.demotions, err = strconv.Atoi(values[12])
status.demotions, err = strconv.ParseInt(values[12], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.promotions, err = strconv.Atoi(values[13])
status.promotions, err = strconv.ParseInt(values[13], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.dirty, err = strconv.Atoi(values[14])
status.dirty, err = strconv.ParseInt(values[14], 10, 64)
if err != nil {
return cacheStatus{}, err
}

View File

@@ -35,20 +35,20 @@ func TestPerDeviceGoodOutput(t *testing.T) {
"device": "cs-1",
}
fields1 := map[string]interface{}{
"length": 4883791872,
"metadata_blocksize": 8,
"metadata_used": 1018,
"metadata_total": 1501122,
"cache_blocksize": 512,
"cache_used": 7,
"cache_total": 464962,
"read_hits": 139,
"read_misses": 352643,
"write_hits": 15,
"write_misses": 46,
"demotions": 0,
"promotions": 7,
"dirty": 0,
"length": int64(4883791872),
"metadata_blocksize": int64(8),
"metadata_used": int64(1018),
"metadata_total": int64(1501122),
"cache_blocksize": int64(512),
"cache_used": int64(7),
"cache_total": int64(464962),
"read_hits": int64(139),
"read_misses": int64(352643),
"write_hits": int64(15),
"write_misses": int64(46),
"demotions": int64(0),
"promotions": int64(7),
"dirty": int64(0),
}
acc.AssertContainsTaggedFields(t, measurement, fields1, tags1)
@@ -56,20 +56,20 @@ func TestPerDeviceGoodOutput(t *testing.T) {
"device": "cs-2",
}
fields2 := map[string]interface{}{
"length": 4294967296,
"metadata_blocksize": 8,
"metadata_used": 72352,
"metadata_total": 1310720,
"cache_blocksize": 128,
"cache_used": 26,
"cache_total": 24327168,
"read_hits": 2409,
"read_misses": 286,
"write_hits": 265,
"write_misses": 524682,
"demotions": 0,
"promotions": 0,
"dirty": 0,
"length": int64(4294967296),
"metadata_blocksize": int64(8),
"metadata_used": int64(72352),
"metadata_total": int64(1310720),
"cache_blocksize": int64(128),
"cache_used": int64(26),
"cache_total": int64(24327168),
"read_hits": int64(2409),
"read_misses": int64(286),
"write_hits": int64(265),
"write_misses": int64(524682),
"demotions": int64(0),
"promotions": int64(0),
"dirty": int64(0),
}
acc.AssertContainsTaggedFields(t, measurement, fields2, tags2)
@@ -78,20 +78,20 @@ func TestPerDeviceGoodOutput(t *testing.T) {
}
fields3 := map[string]interface{}{
"length": 9178759168,
"metadata_blocksize": 16,
"metadata_used": 73370,
"metadata_total": 2811842,
"cache_blocksize": 640,
"cache_used": 33,
"cache_total": 24792130,
"read_hits": 2548,
"read_misses": 352929,
"write_hits": 280,
"write_misses": 524728,
"demotions": 0,
"promotions": 7,
"dirty": 0,
"length": int64(9178759168),
"metadata_blocksize": int64(16),
"metadata_used": int64(73370),
"metadata_total": int64(2811842),
"cache_blocksize": int64(640),
"cache_used": int64(33),
"cache_total": int64(24792130),
"read_hits": int64(2548),
"read_misses": int64(352929),
"write_hits": int64(280),
"write_misses": int64(524728),
"demotions": int64(0),
"promotions": int64(7),
"dirty": int64(0),
}
acc.AssertContainsTaggedFields(t, measurement, fields3, tags3)
}
@@ -113,20 +113,20 @@ func TestNotPerDeviceGoodOutput(t *testing.T) {
}
fields := map[string]interface{}{
"length": 9178759168,
"metadata_blocksize": 16,
"metadata_used": 73370,
"metadata_total": 2811842,
"cache_blocksize": 640,
"cache_used": 33,
"cache_total": 24792130,
"read_hits": 2548,
"read_misses": 352929,
"write_hits": 280,
"write_misses": 524728,
"demotions": 0,
"promotions": 7,
"dirty": 0,
"length": int64(9178759168),
"metadata_blocksize": int64(16),
"metadata_used": int64(73370),
"metadata_total": int64(2811842),
"cache_blocksize": int64(640),
"cache_used": int64(33),
"cache_total": int64(24792130),
"read_hits": int64(2548),
"read_misses": int64(352929),
"write_hits": int64(280),
"write_misses": int64(524728),
"demotions": int64(0),
"promotions": int64(7),
"dirty": int64(0),
}
acc.AssertContainsTaggedFields(t, measurement, fields, tags)
}

View File

@@ -20,16 +20,6 @@ import (
"github.com/influxdata/telegraf/plugins/inputs"
)
type DockerLabelFilter struct {
labelInclude filter.Filter
labelExclude filter.Filter
}
type DockerContainerFilter struct {
containerInclude filter.Filter
containerExclude filter.Filter
}
// Docker object
type Docker struct {
Endpoint string
@@ -41,11 +31,9 @@ type Docker struct {
TagEnvironment []string `toml:"tag_env"`
LabelInclude []string `toml:"docker_label_include"`
LabelExclude []string `toml:"docker_label_exclude"`
LabelFilter DockerLabelFilter
ContainerInclude []string `toml:"container_name_include"`
ContainerExclude []string `toml:"container_name_exclude"`
ContainerFilter DockerContainerFilter
SSLCA string `toml:"ssl_ca"`
SSLCert string `toml:"ssl_cert"`
@@ -55,10 +43,12 @@ type Docker struct {
newEnvClient func() (Client, error)
newClient func(string, *tls.Config) (Client, error)
client Client
httpClient *http.Client
engine_host string
filtersCreated bool
client Client
httpClient *http.Client
engine_host string
filtersCreated bool
labelFilter filter.Filter
containerFilter filter.Filter
}
// KB, MB, GB, TB, PB...human friendly
@@ -291,12 +281,8 @@ func (d *Docker) gatherContainer(
"container_version": imageVersion,
}
if len(d.ContainerInclude) > 0 || len(d.ContainerExclude) > 0 {
if len(d.ContainerInclude) == 0 || !d.ContainerFilter.containerInclude.Match(cname) {
if len(d.ContainerExclude) == 0 || d.ContainerFilter.containerExclude.Match(cname) {
return nil
}
}
if !d.containerFilter.Match(cname) {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), d.Timeout.Duration)
@@ -317,10 +303,8 @@ func (d *Docker) gatherContainer(
// Add labels to tags
for k, label := range container.Labels {
if len(d.LabelInclude) == 0 || d.LabelFilter.labelInclude.Match(k) {
if len(d.LabelExclude) == 0 || !d.LabelFilter.labelExclude.Match(k) {
tags[k] = label
}
if d.labelFilter.Match(k) {
tags[k] = label
}
}
@@ -355,7 +339,11 @@ func gatherContainerStats(
total bool,
daemonOSType string,
) {
now := stat.Read
tm := stat.Read
if tm.Before(time.Unix(0, 0)) {
tm = time.Now()
}
memfields := map[string]interface{}{
"container_id": id,
@@ -415,7 +403,7 @@ func gatherContainerStats(
memfields["private_working_set"] = stat.MemoryStats.PrivateWorkingSet
}
acc.AddFields("docker_container_mem", memfields, tags, now)
acc.AddFields("docker_container_mem", memfields, tags, tm)
cpufields := map[string]interface{}{
"usage_total": stat.CPUStats.CPUUsage.TotalUsage,
@@ -440,7 +428,7 @@ func gatherContainerStats(
cputags := copyTags(tags)
cputags["cpu"] = "cpu-total"
acc.AddFields("docker_container_cpu", cpufields, cputags, now)
acc.AddFields("docker_container_cpu", cpufields, cputags, tm)
// If we have OnlineCPUs field, then use it to restrict stats gathering to only Online CPUs
// (https://github.com/moby/moby/commit/115f91d7575d6de6c7781a96a082f144fd17e400)
@@ -458,7 +446,7 @@ func gatherContainerStats(
"usage_total": percpu,
"container_id": id,
}
acc.AddFields("docker_container_cpu", fields, percputags, now)
acc.AddFields("docker_container_cpu", fields, percputags, tm)
}
totalNetworkStatMap := make(map[string]interface{})
@@ -478,7 +466,7 @@ func gatherContainerStats(
if perDevice {
nettags := copyTags(tags)
nettags["network"] = network
acc.AddFields("docker_container_net", netfields, nettags, now)
acc.AddFields("docker_container_net", netfields, nettags, tm)
}
if total {
for field, value := range netfields {
@@ -511,17 +499,17 @@ func gatherContainerStats(
nettags := copyTags(tags)
nettags["network"] = "total"
totalNetworkStatMap["container_id"] = id
acc.AddFields("docker_container_net", totalNetworkStatMap, nettags, now)
acc.AddFields("docker_container_net", totalNetworkStatMap, nettags, tm)
}
gatherBlockIOMetrics(stat, acc, tags, now, id, perDevice, total)
gatherBlockIOMetrics(stat, acc, tags, tm, id, perDevice, total)
}
func gatherBlockIOMetrics(
stat *types.StatsJSON,
acc telegraf.Accumulator,
tags map[string]string,
now time.Time,
tm time.Time,
id string,
perDevice bool,
total bool,
@@ -592,7 +580,7 @@ func gatherBlockIOMetrics(
if perDevice {
iotags := copyTags(tags)
iotags["device"] = device
acc.AddFields("docker_container_blkio", fields, iotags, now)
acc.AddFields("docker_container_blkio", fields, iotags, tm)
}
if total {
for field, value := range fields {
@@ -623,7 +611,7 @@ func gatherBlockIOMetrics(
totalStatMap["container_id"] = id
iotags := copyTags(tags)
iotags["device"] = "total"
acc.AddFields("docker_container_blkio", totalStatMap, iotags, now)
acc.AddFields("docker_container_blkio", totalStatMap, iotags, tm)
}
}
@@ -666,46 +654,25 @@ func parseSize(sizeStr string) (int64, error) {
}
func (d *Docker) createContainerFilters() error {
// Backwards compatibility for deprecated `container_names` parameter.
if len(d.ContainerNames) > 0 {
d.ContainerInclude = append(d.ContainerInclude, d.ContainerNames...)
}
if len(d.ContainerInclude) != 0 {
var err error
d.ContainerFilter.containerInclude, err = filter.Compile(d.ContainerInclude)
if err != nil {
return err
}
filter, err := filter.NewIncludeExcludeFilter(d.ContainerInclude, d.ContainerExclude)
if err != nil {
return err
}
if len(d.ContainerExclude) != 0 {
var err error
d.ContainerFilter.containerExclude, err = filter.Compile(d.ContainerExclude)
if err != nil {
return err
}
}
d.containerFilter = filter
return nil
}
func (d *Docker) createLabelFilters() error {
if len(d.LabelInclude) != 0 {
var err error
d.LabelFilter.labelInclude, err = filter.Compile(d.LabelInclude)
if err != nil {
return err
}
filter, err := filter.NewIncludeExcludeFilter(d.LabelInclude, d.LabelExclude)
if err != nil {
return err
}
if len(d.LabelExclude) != 0 {
var err error
d.LabelFilter.labelExclude, err = filter.Compile(d.LabelExclude)
if err != nil {
return err
}
}
d.labelFilter = filter
return nil
}

View File

@@ -44,21 +44,23 @@ func (c *MockClient) ContainerInspect(
return c.ContainerInspectF(ctx, containerID)
}
var baseClient = MockClient{
InfoF: func(context.Context) (types.Info, error) {
return info, nil
},
ContainerListF: func(context.Context, types.ContainerListOptions) ([]types.Container, error) {
return containerList, nil
},
ContainerStatsF: func(context.Context, string, bool) (types.ContainerStats, error) {
return containerStats(), nil
},
ContainerInspectF: func(context.Context, string) (types.ContainerJSON, error) {
return containerInspect, nil
},
}
func newClient(host string, tlsConfig *tls.Config) (Client, error) {
return &MockClient{
InfoF: func(context.Context) (types.Info, error) {
return info, nil
},
ContainerListF: func(context.Context, types.ContainerListOptions) ([]types.Container, error) {
return containerList, nil
},
ContainerStatsF: func(context.Context, string, bool) (types.ContainerStats, error) {
return containerStats(), nil
},
ContainerInspectF: func(context.Context, string) (types.ContainerJSON, error) {
return containerInspect, nil
},
}, nil
return &baseClient, nil
}
func TestDockerGatherContainerStats(t *testing.T) {
@@ -234,82 +236,291 @@ func TestDocker_WindowsMemoryContainerStats(t *testing.T) {
require.NoError(t, err)
}
func TestDockerGatherLabels(t *testing.T) {
var gatherLabelsTests = []struct {
include []string
exclude []string
expected []string
notexpected []string
func TestContainerLabels(t *testing.T) {
var tests = []struct {
name string
container types.Container
include []string
exclude []string
expected map[string]string
}{
{[]string{}, []string{}, []string{"label1", "label2"}, []string{}},
{[]string{"*"}, []string{}, []string{"label1", "label2"}, []string{}},
{[]string{"lab*"}, []string{}, []string{"label1", "label2"}, []string{}},
{[]string{"label1"}, []string{}, []string{"label1"}, []string{"label2"}},
{[]string{"label1*"}, []string{}, []string{"label1"}, []string{"label2"}},
{[]string{}, []string{"*"}, []string{}, []string{"label1", "label2"}},
{[]string{}, []string{"lab*"}, []string{}, []string{"label1", "label2"}},
{[]string{}, []string{"label1"}, []string{"label2"}, []string{"label1"}},
{[]string{"*"}, []string{"*"}, []string{}, []string{"label1", "label2"}},
{
name: "Nil filters matches all",
container: types.Container{
Labels: map[string]string{
"a": "x",
},
},
include: nil,
exclude: nil,
expected: map[string]string{
"a": "x",
},
},
{
name: "Empty filters matches all",
container: types.Container{
Labels: map[string]string{
"a": "x",
},
},
include: []string{},
exclude: []string{},
expected: map[string]string{
"a": "x",
},
},
{
name: "Must match include",
container: types.Container{
Labels: map[string]string{
"a": "x",
"b": "y",
},
},
include: []string{"a"},
exclude: []string{},
expected: map[string]string{
"a": "x",
},
},
{
name: "Must not match exclude",
container: types.Container{
Labels: map[string]string{
"a": "x",
"b": "y",
},
},
include: []string{},
exclude: []string{"b"},
expected: map[string]string{
"a": "x",
},
},
{
name: "Include Glob",
container: types.Container{
Labels: map[string]string{
"aa": "x",
"ab": "y",
"bb": "z",
},
},
include: []string{"a*"},
exclude: []string{},
expected: map[string]string{
"aa": "x",
"ab": "y",
},
},
{
name: "Exclude Glob",
container: types.Container{
Labels: map[string]string{
"aa": "x",
"ab": "y",
"bb": "z",
},
},
include: []string{},
exclude: []string{"a*"},
expected: map[string]string{
"bb": "z",
},
},
{
name: "Excluded Includes",
container: types.Container{
Labels: map[string]string{
"aa": "x",
"ab": "y",
"bb": "z",
},
},
include: []string{"a*"},
exclude: []string{"*b"},
expected: map[string]string{
"aa": "x",
},
},
}
for _, tt := range gatherLabelsTests {
t.Run("", func(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var acc testutil.Accumulator
d := Docker{
newClient: newClient,
newClientFunc := func(host string, tlsConfig *tls.Config) (Client, error) {
client := baseClient
client.ContainerListF = func(context.Context, types.ContainerListOptions) ([]types.Container, error) {
return []types.Container{tt.container}, nil
}
return &client, nil
}
for _, label := range tt.include {
d.LabelInclude = append(d.LabelInclude, label)
}
for _, label := range tt.exclude {
d.LabelExclude = append(d.LabelExclude, label)
d := Docker{
newClient: newClientFunc,
LabelInclude: tt.include,
LabelExclude: tt.exclude,
}
err := d.Gather(&acc)
require.NoError(t, err)
for _, label := range tt.expected {
if !acc.HasTag("docker_container_cpu", label) {
t.Errorf("Didn't get expected label of %s. Test was: Include: %s Exclude %s",
label, tt.include, tt.exclude)
// Grab tags from a container metric
var actual map[string]string
for _, metric := range acc.Metrics {
if metric.Measurement == "docker_container_cpu" {
actual = metric.Tags
}
}
for _, label := range tt.notexpected {
if acc.HasTag("docker_container_cpu", label) {
t.Errorf("Got unexpected label of %s. Test was: Include: %s Exclude %s",
label, tt.include, tt.exclude)
}
for k, v := range tt.expected {
require.Equal(t, v, actual[k])
}
})
}
}
func TestContainerNames(t *testing.T) {
var gatherContainerNames = []struct {
include []string
exclude []string
expected []string
notexpected []string
var tests = []struct {
name string
containers [][]string
include []string
exclude []string
expected []string
}{
{[]string{}, []string{}, []string{"etcd", "etcd2"}, []string{}},
{[]string{"*"}, []string{}, []string{"etcd", "etcd2"}, []string{}},
{[]string{"etc*"}, []string{}, []string{"etcd", "etcd2"}, []string{}},
{[]string{"etcd"}, []string{}, []string{"etcd"}, []string{"etcd2"}},
{[]string{"etcd2*"}, []string{}, []string{"etcd2"}, []string{"etcd"}},
{[]string{}, []string{"etc*"}, []string{}, []string{"etcd", "etcd2"}},
{[]string{}, []string{"etcd"}, []string{"etcd2"}, []string{"etcd"}},
{[]string{"*"}, []string{"*"}, []string{"etcd", "etcd2"}, []string{}},
{[]string{}, []string{"*"}, []string{""}, []string{"etcd", "etcd2"}},
{
name: "Nil filters matches all",
containers: [][]string{
{"/etcd"},
{"/etcd2"},
},
include: nil,
exclude: nil,
expected: []string{"etcd", "etcd2"},
},
{
name: "Empty filters matches all",
containers: [][]string{
{"/etcd"},
{"/etcd2"},
},
include: []string{},
exclude: []string{},
expected: []string{"etcd", "etcd2"},
},
{
name: "Match all containers",
containers: [][]string{
{"/etcd"},
{"/etcd2"},
},
include: []string{"*"},
exclude: []string{},
expected: []string{"etcd", "etcd2"},
},
{
name: "Include prefix match",
containers: [][]string{
{"/etcd"},
{"/etcd2"},
},
include: []string{"etc*"},
exclude: []string{},
expected: []string{"etcd", "etcd2"},
},
{
name: "Exact match",
containers: [][]string{
{"/etcd"},
{"/etcd2"},
},
include: []string{"etcd"},
exclude: []string{},
expected: []string{"etcd"},
},
{
name: "Star matches zero length",
containers: [][]string{
{"/etcd"},
{"/etcd2"},
},
include: []string{"etcd2*"},
exclude: []string{},
expected: []string{"etcd2"},
},
{
name: "Exclude matches all",
containers: [][]string{
{"/etcd"},
{"/etcd2"},
},
include: []string{},
exclude: []string{"etc*"},
expected: []string{},
},
{
name: "Exclude single",
containers: [][]string{
{"/etcd"},
{"/etcd2"},
},
include: []string{},
exclude: []string{"etcd"},
expected: []string{"etcd2"},
},
{
name: "Exclude all",
containers: [][]string{
{"/etcd"},
{"/etcd2"},
},
include: []string{"*"},
exclude: []string{"*"},
expected: []string{},
},
{
name: "Exclude item matching include",
containers: [][]string{
{"acme"},
{"foo"},
{"acme-test"},
},
include: []string{"acme*"},
exclude: []string{"*test*"},
expected: []string{"acme"},
},
{
name: "Exclude item no wildcards",
containers: [][]string{
{"acme"},
{"acme-test"},
},
include: []string{"acme*"},
exclude: []string{"test"},
expected: []string{"acme", "acme-test"},
},
}
for _, tt := range gatherContainerNames {
t.Run("", func(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var acc testutil.Accumulator
newClientFunc := func(host string, tlsConfig *tls.Config) (Client, error) {
client := baseClient
client.ContainerListF = func(context.Context, types.ContainerListOptions) ([]types.Container, error) {
var containers []types.Container
for _, names := range tt.containers {
containers = append(containers, types.Container{
Names: names,
})
}
return containers, nil
}
return &client, nil
}
d := Docker{
newClient: newClient,
newClient: newClientFunc,
ContainerInclude: tt.include,
ContainerExclude: tt.exclude,
}
@@ -317,39 +528,21 @@ func TestContainerNames(t *testing.T) {
err := d.Gather(&acc)
require.NoError(t, err)
// Set of expected names
var expected = make(map[string]bool)
for _, v := range tt.expected {
expected[v] = true
}
// Set of actual names
var actual = make(map[string]bool)
for _, metric := range acc.Metrics {
if metric.Measurement == "docker_container_cpu" {
if val, ok := metric.Tags["container_name"]; ok {
var found bool = false
for _, cname := range tt.expected {
if val == cname {
found = true
break
}
}
if !found {
t.Errorf("Got unexpected container of %s. Test was -> Include: %s, Exclude: %s", val, tt.include, tt.exclude)
}
}
if name, ok := metric.Tags["container_name"]; ok {
actual[name] = true
}
}
for _, metric := range acc.Metrics {
if metric.Measurement == "docker_container_cpu" {
if val, ok := metric.Tags["container_name"]; ok {
var found bool = false
for _, cname := range tt.notexpected {
if val == cname {
found = true
break
}
}
if found {
t.Errorf("Got unexpected container of %s. Test was -> Include: %s, Exclude: %s", val, tt.include, tt.exclude)
}
}
}
}
require.Equal(t, expected, actual)
})
}
}

View File

@@ -98,6 +98,7 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) {
}
client := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DisableKeepAlives: true,
TLSClientConfig: tlsCfg,
},

View File

@@ -1,6 +1,7 @@
package httpjson
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
@@ -15,6 +16,10 @@ import (
"github.com/influxdata/telegraf/plugins/parsers"
)
var (
utf8BOM = []byte("\xef\xbb\xbf")
)
// HttpJson struct
type HttpJson struct {
Name string
@@ -170,7 +175,6 @@ func (h *HttpJson) gatherServer(
serverURL string,
) error {
resp, responseTime, err := h.sendRequest(serverURL)
if err != nil {
return err
}
@@ -266,6 +270,7 @@ func (h *HttpJson) sendRequest(serverURL string) (string, float64, error) {
if err != nil {
return string(body), responseTime, err
}
body = bytes.TrimPrefix(body, utf8BOM)
// Process response
if resp.StatusCode != http.StatusOK {

View File

@@ -477,15 +477,13 @@ func TestHttpJsonBadJson(t *testing.T) {
assert.Equal(t, 0, acc.NFields())
}
// Test response to empty string as response objectgT
// Test response to empty string as response object
func TestHttpJsonEmptyResponse(t *testing.T) {
httpjson := genMockHttpJson(empty, 200)
var acc testutil.Accumulator
err := acc.GatherError(httpjson[0].Gather)
assert.Error(t, err)
assert.Equal(t, 0, acc.NFields())
assert.NoError(t, err)
}
// Test that the proper values are ignored or collected
@@ -560,3 +558,18 @@ func TestHttpJsonArray200Tags(t *testing.T) {
}
}
}
var jsonBOM = []byte("\xef\xbb\xbf[{\"value\":17}]")
// TestHttpJsonBOM tests that UTF-8 JSON with a BOM can be parsed
func TestHttpJsonBOM(t *testing.T) {
httpjson := genMockHttpJson(string(jsonBOM), 200)
for _, service := range httpjson {
if service.Name == "other_webapp" {
var acc testutil.Accumulator
err := acc.GatherError(service.Gather)
require.NoError(t, err)
}
}
}

View File

@@ -3,8 +3,6 @@ package leofs
import (
"bufio"
"fmt"
"log"
"net/url"
"os/exec"
"strconv"
"strings"
@@ -19,7 +17,7 @@ import (
const oid = ".1.3.6.1.4.1.35450"
// For Manager Master
const defaultEndpoint = "udp://127.0.0.1:4020"
const defaultEndpoint = "127.0.0.1:4020"
type ServerType int
@@ -137,8 +135,8 @@ var serverTypeMapping = map[string]ServerType{
var sampleConfig = `
## An array of URLs of the form:
## "udp://" host [ ":" port]
servers = ["udp://127.0.0.1:4020"]
## host [ ":" port]
servers = ["127.0.0.1:4020"]
`
func (l *LeoFS) SampleConfig() string {
@@ -155,28 +153,22 @@ func (l *LeoFS) Gather(acc telegraf.Accumulator) error {
return nil
}
var wg sync.WaitGroup
for i, endpoint := range l.Servers {
if !strings.HasPrefix(endpoint, "udp://") {
// Preserve backwards compatibility for hostnames without a
// scheme, broken in go 1.8. Remove in Telegraf 2.0
endpoint = "udp://" + endpoint
log.Printf("W! [inputs.mongodb] Using %q as connection URL; please update your configuration to use an URL", endpoint)
l.Servers[i] = endpoint
}
u, err := url.Parse(endpoint)
if err != nil {
acc.AddError(fmt.Errorf("Unable to parse address %q: %s", endpoint, err))
continue
}
if u.Host == "" {
for _, endpoint := range l.Servers {
results := strings.Split(endpoint, ":")
port := "4020"
if len(results) > 2 {
acc.AddError(fmt.Errorf("Unable to parse address %q", endpoint))
continue
} else if len(results) == 2 {
if _, err := strconv.Atoi(results[1]); err == nil {
port = results[1]
} else {
acc.AddError(fmt.Errorf("Unable to parse port from %q", endpoint))
continue
}
}
port := u.Port()
if port == "" {
port = "4020"
}
st, ok := serverTypeMapping[port]
if !ok {
st = ServerTypeStorage
@@ -196,7 +188,7 @@ func (l *LeoFS) gatherServer(
serverType ServerType,
acc telegraf.Accumulator,
) error {
cmd := exec.Command("snmpwalk", "-v2c", "-cpublic", endpoint, oid)
cmd := exec.Command("snmpwalk", "-v2c", "-cpublic", "-On", endpoint, oid)
stdout, err := cmd.StdoutPipe()
if err != nil {
return err

View File

@@ -16,21 +16,21 @@ package main
import "fmt"
const output = ` + "`" + `iso.3.6.1.4.1.35450.15.1.0 = STRING: "manager_888@127.0.0.1"
iso.3.6.1.4.1.35450.15.2.0 = Gauge32: 186
iso.3.6.1.4.1.35450.15.3.0 = Gauge32: 46235519
iso.3.6.1.4.1.35450.15.4.0 = Gauge32: 32168525
iso.3.6.1.4.1.35450.15.5.0 = Gauge32: 14066068
iso.3.6.1.4.1.35450.15.6.0 = Gauge32: 5512968
iso.3.6.1.4.1.35450.15.7.0 = Gauge32: 186
iso.3.6.1.4.1.35450.15.8.0 = Gauge32: 46269006
iso.3.6.1.4.1.35450.15.9.0 = Gauge32: 32202867
iso.3.6.1.4.1.35450.15.10.0 = Gauge32: 14064995
iso.3.6.1.4.1.35450.15.11.0 = Gauge32: 5492634
iso.3.6.1.4.1.35450.15.12.0 = Gauge32: 60
iso.3.6.1.4.1.35450.15.13.0 = Gauge32: 43515904
iso.3.6.1.4.1.35450.15.14.0 = Gauge32: 60
iso.3.6.1.4.1.35450.15.15.0 = Gauge32: 43533983` + "`" +
const output = ` + "`" + `.1.3.6.1.4.1.35450.15.1.0 = STRING: "manager_888@127.0.0.1"
.1.3.6.1.4.1.35450.15.2.0 = Gauge32: 186
.1.3.6.1.4.1.35450.15.3.0 = Gauge32: 46235519
.1.3.6.1.4.1.35450.15.4.0 = Gauge32: 32168525
.1.3.6.1.4.1.35450.15.5.0 = Gauge32: 14066068
.1.3.6.1.4.1.35450.15.6.0 = Gauge32: 5512968
.1.3.6.1.4.1.35450.15.7.0 = Gauge32: 186
.1.3.6.1.4.1.35450.15.8.0 = Gauge32: 46269006
.1.3.6.1.4.1.35450.15.9.0 = Gauge32: 32202867
.1.3.6.1.4.1.35450.15.10.0 = Gauge32: 14064995
.1.3.6.1.4.1.35450.15.11.0 = Gauge32: 5492634
.1.3.6.1.4.1.35450.15.12.0 = Gauge32: 60
.1.3.6.1.4.1.35450.15.13.0 = Gauge32: 43515904
.1.3.6.1.4.1.35450.15.14.0 = Gauge32: 60
.1.3.6.1.4.1.35450.15.15.0 = Gauge32: 43533983` + "`" +
`
func main() {
fmt.Println(output)
@@ -42,34 +42,34 @@ package main
import "fmt"
const output = ` + "`" + `iso.3.6.1.4.1.35450.34.1.0 = STRING: "storage_0@127.0.0.1"
iso.3.6.1.4.1.35450.34.2.0 = Gauge32: 512
iso.3.6.1.4.1.35450.34.3.0 = Gauge32: 38126307
iso.3.6.1.4.1.35450.34.4.0 = Gauge32: 22308716
iso.3.6.1.4.1.35450.34.5.0 = Gauge32: 15816448
iso.3.6.1.4.1.35450.34.6.0 = Gauge32: 5232008
iso.3.6.1.4.1.35450.34.7.0 = Gauge32: 512
iso.3.6.1.4.1.35450.34.8.0 = Gauge32: 38113176
iso.3.6.1.4.1.35450.34.9.0 = Gauge32: 22313398
iso.3.6.1.4.1.35450.34.10.0 = Gauge32: 15798779
iso.3.6.1.4.1.35450.34.11.0 = Gauge32: 5237315
iso.3.6.1.4.1.35450.34.12.0 = Gauge32: 191
iso.3.6.1.4.1.35450.34.13.0 = Gauge32: 824
iso.3.6.1.4.1.35450.34.14.0 = Gauge32: 0
iso.3.6.1.4.1.35450.34.15.0 = Gauge32: 50105
iso.3.6.1.4.1.35450.34.16.0 = Gauge32: 196654
iso.3.6.1.4.1.35450.34.17.0 = Gauge32: 0
iso.3.6.1.4.1.35450.34.18.0 = Gauge32: 2052
iso.3.6.1.4.1.35450.34.19.0 = Gauge32: 50296
iso.3.6.1.4.1.35450.34.20.0 = Gauge32: 35
iso.3.6.1.4.1.35450.34.21.0 = Gauge32: 898
iso.3.6.1.4.1.35450.34.22.0 = Gauge32: 0
iso.3.6.1.4.1.35450.34.23.0 = Gauge32: 0
iso.3.6.1.4.1.35450.34.24.0 = Gauge32: 0
iso.3.6.1.4.1.35450.34.31.0 = Gauge32: 51
iso.3.6.1.4.1.35450.34.32.0 = Gauge32: 53219328
iso.3.6.1.4.1.35450.34.33.0 = Gauge32: 51
iso.3.6.1.4.1.35450.34.34.0 = Gauge32: 53351083` + "`" +
const output = ` + "`" + `.1.3.6.1.4.1.35450.34.1.0 = STRING: "storage_0@127.0.0.1"
.1.3.6.1.4.1.35450.34.2.0 = Gauge32: 512
.1.3.6.1.4.1.35450.34.3.0 = Gauge32: 38126307
.1.3.6.1.4.1.35450.34.4.0 = Gauge32: 22308716
.1.3.6.1.4.1.35450.34.5.0 = Gauge32: 15816448
.1.3.6.1.4.1.35450.34.6.0 = Gauge32: 5232008
.1.3.6.1.4.1.35450.34.7.0 = Gauge32: 512
.1.3.6.1.4.1.35450.34.8.0 = Gauge32: 38113176
.1.3.6.1.4.1.35450.34.9.0 = Gauge32: 22313398
.1.3.6.1.4.1.35450.34.10.0 = Gauge32: 15798779
.1.3.6.1.4.1.35450.34.11.0 = Gauge32: 5237315
.1.3.6.1.4.1.35450.34.12.0 = Gauge32: 191
.1.3.6.1.4.1.35450.34.13.0 = Gauge32: 824
.1.3.6.1.4.1.35450.34.14.0 = Gauge32: 0
.1.3.6.1.4.1.35450.34.15.0 = Gauge32: 50105
.1.3.6.1.4.1.35450.34.16.0 = Gauge32: 196654
.1.3.6.1.4.1.35450.34.17.0 = Gauge32: 0
.1.3.6.1.4.1.35450.34.18.0 = Gauge32: 2052
.1.3.6.1.4.1.35450.34.19.0 = Gauge32: 50296
.1.3.6.1.4.1.35450.34.20.0 = Gauge32: 35
.1.3.6.1.4.1.35450.34.21.0 = Gauge32: 898
.1.3.6.1.4.1.35450.34.22.0 = Gauge32: 0
.1.3.6.1.4.1.35450.34.23.0 = Gauge32: 0
.1.3.6.1.4.1.35450.34.24.0 = Gauge32: 0
.1.3.6.1.4.1.35450.34.31.0 = Gauge32: 51
.1.3.6.1.4.1.35450.34.32.0 = Gauge32: 53219328
.1.3.6.1.4.1.35450.34.33.0 = Gauge32: 51
.1.3.6.1.4.1.35450.34.34.0 = Gauge32: 53351083` + "`" +
`
func main() {
fmt.Println(output)
@@ -81,31 +81,31 @@ package main
import "fmt"
const output = ` + "`" + `iso.3.6.1.4.1.35450.34.1.0 = STRING: "gateway_0@127.0.0.1"
iso.3.6.1.4.1.35450.34.2.0 = Gauge32: 465
iso.3.6.1.4.1.35450.34.3.0 = Gauge32: 61676335
iso.3.6.1.4.1.35450.34.4.0 = Gauge32: 46890415
iso.3.6.1.4.1.35450.34.5.0 = Gauge32: 14785011
iso.3.6.1.4.1.35450.34.6.0 = Gauge32: 5578855
iso.3.6.1.4.1.35450.34.7.0 = Gauge32: 465
iso.3.6.1.4.1.35450.34.8.0 = Gauge32: 61644426
iso.3.6.1.4.1.35450.34.9.0 = Gauge32: 46880358
iso.3.6.1.4.1.35450.34.10.0 = Gauge32: 14763002
iso.3.6.1.4.1.35450.34.11.0 = Gauge32: 5582125
iso.3.6.1.4.1.35450.34.12.0 = Gauge32: 191
iso.3.6.1.4.1.35450.34.13.0 = Gauge32: 827
iso.3.6.1.4.1.35450.34.14.0 = Gauge32: 0
iso.3.6.1.4.1.35450.34.15.0 = Gauge32: 50105
iso.3.6.1.4.1.35450.34.16.0 = Gauge32: 196650
iso.3.6.1.4.1.35450.34.17.0 = Gauge32: 0
iso.3.6.1.4.1.35450.34.18.0 = Gauge32: 30256
iso.3.6.1.4.1.35450.34.19.0 = Gauge32: 532158
iso.3.6.1.4.1.35450.34.20.0 = Gauge32: 34
iso.3.6.1.4.1.35450.34.21.0 = Gauge32: 1
iso.3.6.1.4.1.35450.34.31.0 = Gauge32: 53
iso.3.6.1.4.1.35450.34.32.0 = Gauge32: 55050240
iso.3.6.1.4.1.35450.34.33.0 = Gauge32: 53
iso.3.6.1.4.1.35450.34.34.0 = Gauge32: 55186538` + "`" +
const output = ` + "`" + `.1.3.6.1.4.1.35450.34.1.0 = STRING: "gateway_0@127.0.0.1"
.1.3.6.1.4.1.35450.34.2.0 = Gauge32: 465
.1.3.6.1.4.1.35450.34.3.0 = Gauge32: 61676335
.1.3.6.1.4.1.35450.34.4.0 = Gauge32: 46890415
.1.3.6.1.4.1.35450.34.5.0 = Gauge32: 14785011
.1.3.6.1.4.1.35450.34.6.0 = Gauge32: 5578855
.1.3.6.1.4.1.35450.34.7.0 = Gauge32: 465
.1.3.6.1.4.1.35450.34.8.0 = Gauge32: 61644426
.1.3.6.1.4.1.35450.34.9.0 = Gauge32: 46880358
.1.3.6.1.4.1.35450.34.10.0 = Gauge32: 14763002
.1.3.6.1.4.1.35450.34.11.0 = Gauge32: 5582125
.1.3.6.1.4.1.35450.34.12.0 = Gauge32: 191
.1.3.6.1.4.1.35450.34.13.0 = Gauge32: 827
.1.3.6.1.4.1.35450.34.14.0 = Gauge32: 0
.1.3.6.1.4.1.35450.34.15.0 = Gauge32: 50105
.1.3.6.1.4.1.35450.34.16.0 = Gauge32: 196650
.1.3.6.1.4.1.35450.34.17.0 = Gauge32: 0
.1.3.6.1.4.1.35450.34.18.0 = Gauge32: 30256
.1.3.6.1.4.1.35450.34.19.0 = Gauge32: 532158
.1.3.6.1.4.1.35450.34.20.0 = Gauge32: 34
.1.3.6.1.4.1.35450.34.21.0 = Gauge32: 1
.1.3.6.1.4.1.35450.34.31.0 = Gauge32: 53
.1.3.6.1.4.1.35450.34.32.0 = Gauge32: 55050240
.1.3.6.1.4.1.35450.34.33.0 = Gauge32: 53
.1.3.6.1.4.1.35450.34.34.0 = Gauge32: 55186538` + "`" +
`
func main() {
fmt.Println(output)

View File

@@ -514,7 +514,7 @@ func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSec
returnVal.Command = diff(newStat.Opcounters.Command, oldStat.Opcounters.Command, sampleSecs)
}
if newStat.Metrics != nil && newStat.Metrics.TTL != nil && oldStat.Metrics.TTL != nil {
if newStat.Metrics != nil && newStat.Metrics.TTL != nil && oldStat.Metrics != nil && oldStat.Metrics.TTL != nil {
returnVal.Passes = diff(newStat.Metrics.TTL.Passes, oldStat.Metrics.TTL.Passes, sampleSecs)
returnVal.DeletedDocuments = diff(newStat.Metrics.TTL.DeletedDocuments, oldStat.Metrics.TTL.DeletedDocuments, sampleSecs)
}

View File

@@ -10,11 +10,13 @@ The plugin expects messages in the
```toml
# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
servers = ["localhost:1883"]
## MQTT broker URLs to be used. The format should be scheme://host:port,
## schema can be tcp, ssl, or ws.
servers = ["tcp://localhost:1883"]
## MQTT QoS, must be 0, 1, or 2
qos = 0
## Connection timeout for initial connection in seconds
connection_timeout = 30
connection_timeout = "30s"
## Topics to subscribe to
topics = [

View File

@@ -15,6 +15,9 @@ import (
"github.com/eclipse/paho.mqtt.golang"
)
// 30 Seconds is the default used by paho.mqtt.golang
var defaultConnectionTimeout = internal.Duration{Duration: 30 * time.Second}
type MQTTConsumer struct {
Servers []string
Topics []string
@@ -53,11 +56,14 @@ type MQTTConsumer struct {
}
var sampleConfig = `
servers = ["localhost:1883"]
## MQTT broker URLs to be used. The format should be scheme://host:port,
## schema can be tcp, ssl, or ws.
servers = ["tcp://localhost:1883"]
## MQTT QoS, must be 0, 1, or 2
qos = 0
## Connection timeout for initial connection in seconds
connection_timeout = 30
connection_timeout = "30s"
## Topics to subscribe to
topics = [
@@ -118,8 +124,8 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS)
}
if int(m.ConnectionTimeout.Duration) <= 0 {
return fmt.Errorf("MQTT Consumer, invalid connection_timeout value: %d", m.ConnectionTimeout)
if m.ConnectionTimeout.Duration < 1*time.Second {
return fmt.Errorf("MQTT Consumer, invalid connection_timeout value: %s", m.ConnectionTimeout.Duration)
}
opts, err := m.createOpts()
@@ -236,9 +242,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
return nil, err
}
scheme := "tcp"
if tlsCfg != nil {
scheme = "ssl"
opts.SetTLSConfig(tlsCfg)
}
@@ -254,8 +258,17 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
if len(m.Servers) == 0 {
return opts, fmt.Errorf("could not get host infomations")
}
for _, host := range m.Servers {
server := fmt.Sprintf("%s://%s", scheme, host)
for _, server := range m.Servers {
// Preserve support for host:port style servers; deprecated in Telegraf 1.4.4
if !strings.Contains(server, "://") {
log.Printf("W! mqtt_consumer server %q should be updated to use `scheme://host:port` format", server)
if tlsCfg == nil {
server = "tcp://" + server
} else {
server = "ssl://" + server
}
}
opts.AddBroker(server)
}
@@ -270,6 +283,8 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
func init() {
inputs.Add("mqtt_consumer", func() telegraf.Input {
return &MQTTConsumer{}
return &MQTTConsumer{
ConnectionTimeout: defaultConnectionTimeout,
}
})
}

View File

@@ -2,11 +2,12 @@ package sqlserver
import (
"database/sql"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
// go-mssqldb initialization
_ "github.com/zensqlmonitor/go-mssqldb"
)
@@ -244,10 +245,10 @@ UNION ALL
SELECT 'Average pending disk IO', AveragePendingDiskIOCount = (SELECT AVG(pending_disk_io_count) FROM sys.dm_os_schedulers WITH (NOLOCK) WHERE scheduler_id < 255 )
UNION ALL
SELECT 'Buffer pool rate (bytes/sec)', BufferPoolRate = (1.0*cntr_value * 8 * 1024) /
(SELECT 1.0*cntr_value FROM sys.dm_os_performance_counters WHERE object_name like '%Buffer Manager%' AND lower(counter_name) = 'Page life expectancy')
(SELECT 1.0*cntr_value FROM sys.dm_os_performance_counters WHERE object_name like '%Buffer Manager%' AND counter_name = 'Page life expectancy')
FROM sys.dm_os_performance_counters
WHERE object_name like '%Buffer Manager%'
AND counter_name = 'database pages'
AND counter_name = 'Database pages'
UNION ALL
SELECT 'Memory grant pending', MemoryGrantPending = cntr_value
FROM sys.dm_os_performance_counters
@@ -1436,16 +1437,16 @@ SELECT
, type = 'Wait stats'
---- values
, [I/O] = SUM([I/O])
, [Latch] = SUM([Latch])
, [Lock] = SUM([Lock])
, [Network] = SUM([Network])
, [Service broker] = SUM([Service broker])
, [Memory] = SUM([Memory])
, [Buffer] = SUM([Buffer])
, [Latch] = SUM([LATCH])
, [Lock] = SUM([LOCK])
, [Network] = SUM([NETWORK])
, [Service broker] = SUM([SERVICE BROKER])
, [Memory] = SUM([MEMORY])
, [Buffer] = SUM([BUFFER])
, [CLR] = SUM([CLR])
, [SQLOS] = SUM([SQLOS])
, [XEvent] = SUM([XEvent])
, [Other] = SUM([Other])
, [XEvent] = SUM([XEVENT])
, [Other] = SUM([OTHER])
, [Total] = SUM([I/O]+[LATCH]+[LOCK]+[NETWORK]+[SERVICE BROKER]+[MEMORY]+[BUFFER]+[CLR]+[XEVENT]+[SQLOS]+[OTHER])
FROM
(
@@ -1479,16 +1480,16 @@ SELECT
, type = 'Wait stats'
---- values
, [I/O] = SUM([I/O])
, [Latch] = SUM([Latch])
, [Lock] = SUM([Lock])
, [Network] = SUM([Network])
, [Service broker] = SUM([Service broker])
, [Memory] = SUM([Memory])
, [Buffer] = SUM([Buffer])
, [Latch] = SUM([LATCH])
, [Lock] = SUM([LOCK])
, [Network] = SUM([NETWORK])
, [Service broker] = SUM([SERVICE BROKER])
, [Memory] = SUM([MEMORY])
, [Buffer] = SUM([BUFFER])
, [CLR] = SUM([CLR])
, [SQLOS] = SUM([SQLOS])
, [XEvent] = SUM([XEvent])
, [Other] = SUM([Other])
, [XEvent] = SUM([XEVENT])
, [Other] = SUM([OTHER])
, [Total] = SUM([I/O]+[LATCH]+[LOCK]+[NETWORK]+[SERVICE BROKER]+[MEMORY]+[BUFFER]+[CLR]+[XEVENT]+[SQLOS]+[OTHER])
FROM
(

View File

@@ -11,7 +11,7 @@ import (
type CPUStats struct {
ps PS
lastStats []cpu.TimesStat
lastStats map[string]cpu.TimesStat
PerCPU bool `toml:"percpu"`
TotalCPU bool `toml:"totalcpu"`
@@ -53,7 +53,7 @@ func (s *CPUStats) Gather(acc telegraf.Accumulator) error {
}
now := time.Now()
for i, cts := range times {
for _, cts := range times {
tags := map[string]string{
"cpu": cts.CPU,
}
@@ -86,14 +86,18 @@ func (s *CPUStats) Gather(acc telegraf.Accumulator) error {
// If it's the 1st gather, can't get CPU Usage stats yet
continue
}
lastCts := s.lastStats[i]
lastCts, ok := s.lastStats[cts.CPU]
if !ok {
continue
}
lastTotal := totalCpuTime(lastCts)
lastActive := activeCpuTime(lastCts)
totalDelta := total - lastTotal
if totalDelta < 0 {
s.lastStats = times
return fmt.Errorf("Error: current total CPU time is less than previous total CPU time")
err = fmt.Errorf("Error: current total CPU time is less than previous total CPU time")
break
}
if totalDelta == 0 {
@@ -118,9 +122,12 @@ func (s *CPUStats) Gather(acc telegraf.Accumulator) error {
acc.AddGauge("cpu", fieldsG, tags, now)
}
s.lastStats = times
s.lastStats = make(map[string]cpu.TimesStat)
for _, cts := range times {
s.lastStats[cts.CPU] = cts
}
return nil
return err
}
func totalCpuTime(t cpu.TimesStat) float64 {

View File

@@ -149,3 +149,107 @@ func assertContainsTaggedFloat(
measurement, delta, expectedValue, actualValue)
assert.Fail(t, msg)
}
// TestCPUCountChange tests that no errors are encountered if the number of
// CPUs increases as reported with LXC.
func TestCPUCountIncrease(t *testing.T) {
var mps MockPS
var mps2 MockPS
var acc testutil.Accumulator
var err error
cs := NewCPUStats(&mps)
mps.On("CPUTimes").Return(
[]cpu.TimesStat{
cpu.TimesStat{
CPU: "cpu0",
},
}, nil)
err = cs.Gather(&acc)
require.NoError(t, err)
mps2.On("CPUTimes").Return(
[]cpu.TimesStat{
cpu.TimesStat{
CPU: "cpu0",
},
cpu.TimesStat{
CPU: "cpu1",
},
}, nil)
cs.ps = &mps2
err = cs.Gather(&acc)
require.NoError(t, err)
}
// TestCPUTimesDecrease tests that telegraf continue to works after
// CPU times decrease, which seems to occur when Linux system is suspended.
func TestCPUTimesDecrease(t *testing.T) {
var mps MockPS
defer mps.AssertExpectations(t)
var acc testutil.Accumulator
cts := cpu.TimesStat{
CPU: "cpu0",
User: 18,
Idle: 80,
Iowait: 2,
}
cts2 := cpu.TimesStat{
CPU: "cpu0",
User: 38, // increased by 20
Idle: 40, // decreased by 40
Iowait: 1, // decreased by 1
}
cts3 := cpu.TimesStat{
CPU: "cpu0",
User: 56, // increased by 18
Idle: 120, // increased by 80
Iowait: 3, // increased by 2
}
mps.On("CPUTimes").Return([]cpu.TimesStat{cts}, nil)
cs := NewCPUStats(&mps)
cputags := map[string]string{
"cpu": "cpu0",
}
err := cs.Gather(&acc)
require.NoError(t, err)
// Computed values are checked with delta > 0 becasue of floating point arithmatic
// imprecision
assertContainsTaggedFloat(t, &acc, "cpu", "time_user", 18, 0, cputags)
assertContainsTaggedFloat(t, &acc, "cpu", "time_idle", 80, 0, cputags)
assertContainsTaggedFloat(t, &acc, "cpu", "time_iowait", 2, 0, cputags)
mps2 := MockPS{}
mps2.On("CPUTimes").Return([]cpu.TimesStat{cts2}, nil)
cs.ps = &mps2
// CPU times decreased. An error should be raised
err = cs.Gather(&acc)
require.Error(t, err)
mps3 := MockPS{}
mps3.On("CPUTimes").Return([]cpu.TimesStat{cts3}, nil)
cs.ps = &mps3
err = cs.Gather(&acc)
require.NoError(t, err)
assertContainsTaggedFloat(t, &acc, "cpu", "time_user", 56, 0, cputags)
assertContainsTaggedFloat(t, &acc, "cpu", "time_idle", 120, 0, cputags)
assertContainsTaggedFloat(t, &acc, "cpu", "time_iowait", 3, 0, cputags)
assertContainsTaggedFloat(t, &acc, "cpu", "usage_user", 18, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "cpu", "usage_idle", 80, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "cpu", "usage_iowait", 2, 0.0005, cputags)
}

View File

@@ -2,6 +2,7 @@ package system
import (
"fmt"
"log"
"regexp"
"strings"
@@ -164,14 +165,13 @@ func (s *DiskIOStats) Gather(acc telegraf.Accumulator) error {
var varRegex = regexp.MustCompile(`\$(?:\w+|\{\w+\})`)
func (s *DiskIOStats) diskName(devName string) string {
di, err := s.diskInfo(devName)
if err != nil {
// discard error :-(
// We can't return error because it's non-fatal to the Gather().
// And we have no logger, so we can't log it.
if len(s.NameTemplates) == 0 {
return devName
}
if di == nil {
di, err := s.diskInfo(devName)
if err != nil {
log.Printf("W! Error gathering disk info: %s", err)
return devName
}
@@ -198,14 +198,13 @@ func (s *DiskIOStats) diskName(devName string) string {
}
func (s *DiskIOStats) diskTags(devName string) map[string]string {
di, err := s.diskInfo(devName)
if err != nil {
// discard error :-(
// We can't return error because it's non-fatal to the Gather().
// And we have no logger, so we can't log it.
if len(s.DeviceTags) == 0 {
return nil
}
if di == nil {
di, err := s.diskInfo(devName)
if err != nil {
log.Printf("W! Error gathering disk info: %s", err)
return nil
}

View File

@@ -5,25 +5,26 @@ import (
"fmt"
"os"
"strings"
"syscall"
"golang.org/x/sys/unix"
)
type diskInfoCache struct {
stat syscall.Stat_t
values map[string]string
udevDataPath string
values map[string]string
}
var udevPath = "/run/udev/data"
func (s *DiskIOStats) diskInfo(devName string) (map[string]string, error) {
fi, err := os.Stat("/dev/" + devName)
var err error
var stat unix.Stat_t
path := "/dev/" + devName
err = unix.Stat(path, &stat)
if err != nil {
return nil, err
}
stat, ok := fi.Sys().(*syscall.Stat_t)
if !ok {
return nil, nil
}
if s.infoCache == nil {
s.infoCache = map[string]diskInfoCache{}
@@ -31,25 +32,26 @@ func (s *DiskIOStats) diskInfo(devName string) (map[string]string, error) {
ic, ok := s.infoCache[devName]
if ok {
return ic.values, nil
} else {
ic = diskInfoCache{
stat: *stat,
values: map[string]string{},
}
s.infoCache[devName] = ic
}
di := ic.values
major := stat.Rdev >> 8 & 0xff
minor := stat.Rdev & 0xff
udevDataPath := fmt.Sprintf("%s/b%d:%d", udevPath, major, minor)
f, err := os.Open(fmt.Sprintf("%s/b%d:%d", udevPath, major, minor))
di := map[string]string{}
s.infoCache[devName] = diskInfoCache{
udevDataPath: udevDataPath,
values: di,
}
f, err := os.Open(udevDataPath)
if err != nil {
return nil, err
}
defer f.Close()
scnr := bufio.NewScanner(f)
scnr := bufio.NewScanner(f)
for scnr.Scan() {
l := scnr.Text()
if len(l) < 4 || l[:2] != "E:" {

View File

@@ -41,7 +41,7 @@ func (k *KernelVmstat) Gather(acc telegraf.Accumulator) error {
// We only want the even number index as that contain the stat name.
if i%2 == 0 {
// Convert the stat value into an integer.
m, err := strconv.Atoi(string(dataFields[i+1]))
m, err := strconv.ParseInt(string(dataFields[i+1]), 10, 64)
if err != nil {
return err
}

View File

@@ -48,7 +48,7 @@ func TestFullVmStatProcFile(t *testing.T) {
"nr_isolated_anon": int64(0),
"nr_isolated_file": int64(0),
"nr_shmem": int64(541689),
"numa_hit": int64(5113399878),
"numa_hit": int64(6690743595),
"numa_miss": int64(0),
"numa_foreign": int64(0),
"numa_interleave": int64(35793),
@@ -200,7 +200,7 @@ nr_writeback_temp 0
nr_isolated_anon 0
nr_isolated_file 0
nr_shmem 541689
numa_hit 5113399878
numa_hit 6690743595
numa_miss 0
numa_foreign 0
numa_interleave 35793

View File

@@ -165,7 +165,7 @@ func (s *Tomcat) Gather(acc telegraf.Accumulator) error {
for _, c := range status.TomcatConnectors {
name, err := strconv.Unquote(c.Name)
if err != nil {
return fmt.Errorf("Unable to unquote name '%s': %s", c.Name, err)
name = c.Name
}
tccTags := map[string]string{

View File

@@ -11,7 +11,7 @@ import (
"github.com/stretchr/testify/require"
)
var tomcatStatus = `<?xml version="1.0" encoding="UTF-8"?>
var tomcatStatus8 = `<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="/manager/xform.xsl" ?>
<status>
<jvm>
@@ -37,10 +37,10 @@ var tomcatStatus = `<?xml version="1.0" encoding="UTF-8"?>
</connector>
</status>`
func TestHTTPTomcat(t *testing.T) {
func TestHTTPTomcat8(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, tomcatStatus)
fmt.Fprintln(w, tomcatStatus8)
}))
defer ts.Close()
@@ -91,5 +91,63 @@ func TestHTTPTomcat(t *testing.T) {
"name": "http-apr-8080",
}
acc.AssertContainsTaggedFields(t, "tomcat_connector", connectorFields, connectorTags)
}
var tomcatStatus6 = `<?xml version="1.0" encoding="utf-8"?>
<?xml-stylesheet type="text/xsl" href="xform.xsl" ?>
<status>
<jvm>
<memory free="1942681600" total="2040070144" max="2040070144"/>
</jvm>
<connector name="http-8080">
<threadInfo maxThreads="150" currentThreadCount="2" currentThreadsBusy="2"/>
<requestInfo maxTime="1005" processingTime="2465" requestCount="436" errorCount="16" bytesReceived="0" bytesSent="550196"/>
<workers>
<worker stage="K" requestProcessingTime="526" requestBytesSent="0" requestBytesReceived="0" remoteAddr="127.0.0.1" virtualHost="?" method="?" currentUri="?" currentQueryString="?" protocol="?"/>
<worker stage="S" requestProcessingTime="1" requestBytesSent="0" requestBytesReceived="0" remoteAddr="127.0.0.1" virtualHost="127.0.0.1" method="GET" currentUri="/manager/status/all" currentQueryString="XML=true" protocol="HTTP/1.1"/>
</workers>
</connector>
</status>`
func TestHTTPTomcat6(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, tomcatStatus6)
}))
defer ts.Close()
tc := Tomcat{
URL: ts.URL,
Username: "tomcat",
Password: "s3cret",
}
var acc testutil.Accumulator
err := tc.Gather(&acc)
require.NoError(t, err)
// tomcat_jvm_memory
jvmMemoryFields := map[string]interface{}{
"free": int64(1942681600),
"total": int64(2040070144),
"max": int64(2040070144),
}
acc.AssertContainsFields(t, "tomcat_jvm_memory", jvmMemoryFields)
// tomcat_connector
connectorFields := map[string]interface{}{
"bytes_received": int64(0),
"bytes_sent": int64(550196),
"current_thread_count": int64(2),
"current_threads_busy": int64(2),
"error_count": int(16),
"max_threads": int64(150),
"max_time": int(1005),
"processing_time": int(2465),
"request_count": int(436),
}
connectorTags := map[string]string{
"name": "http-8080",
}
acc.AssertContainsTaggedFields(t, "tomcat_connector", connectorFields, connectorTags)
}

View File

@@ -48,6 +48,9 @@ func (a *Amon) Connect() error {
return fmt.Errorf("serverkey and amon_instance are required fields for amon output")
}
a.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: a.Timeout.Duration,
}
return nil

View File

@@ -8,6 +8,7 @@ import (
"net/http"
"net/url"
"sort"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
@@ -55,7 +56,11 @@ func (d *Datadog) Connect() error {
if d.Apikey == "" {
return fmt.Errorf("apikey is a required field for datadog output")
}
d.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: d.Timeout.Duration,
}
return nil
@@ -96,6 +101,7 @@ func (d *Datadog) Write(metrics []telegraf.Metric) error {
}
}
redactedApiKey := "****************"
ts.Series = make([]*Metric, metricCounter)
copy(ts.Series, tempSeries[0:])
tsBytes, err := json.Marshal(ts)
@@ -104,13 +110,13 @@ func (d *Datadog) Write(metrics []telegraf.Metric) error {
}
req, err := http.NewRequest("POST", d.authenticatedUrl(), bytes.NewBuffer(tsBytes))
if err != nil {
return fmt.Errorf("unable to create http.Request, %s\n", err.Error())
return fmt.Errorf("unable to create http.Request, %s\n", strings.Replace(err.Error(), d.Apikey, redactedApiKey, -1))
}
req.Header.Add("Content-Type", "application/json")
resp, err := d.client.Do(req)
if err != nil {
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
return fmt.Errorf("error POSTing metrics, %s\n", strings.Replace(err.Error(), d.Apikey, redactedApiKey, -1))
}
defer resp.Body.Close()

View File

@@ -7,9 +7,7 @@ This plugin writes to [InfluxDB](https://www.influxdb.com) via HTTP or UDP.
```toml
# Configuration for influxdb server to send metrics to
[[outputs.influxdb]]
## The HTTP or UDP URL for your InfluxDB instance. Each item should be
## of the form:
## scheme "://" host [ ":" port]
## The full HTTP or UDP URL for your InfluxDB instance.
##
## Multiple urls can be specified as part of the same cluster,
## this means that only ONE of the urls will be written to each interval.

View File

@@ -4,13 +4,7 @@ import "io"
type Client interface {
Query(command string) error
Write(b []byte) (int, error)
WriteWithParams(b []byte, params WriteParams) (int, error)
WriteStream(b io.Reader, contentLength int) (int, error)
WriteStreamWithParams(b io.Reader, contentLength int, params WriteParams) (int, error)
WriteStream(b io.Reader) error
Close() error
}

View File

@@ -10,6 +10,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"path"
"time"
)
@@ -53,6 +54,7 @@ func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) {
}
} else {
transport = http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: config.TLSConfig,
}
}
@@ -135,60 +137,13 @@ func (c *httpClient) Query(command string) error {
return c.doRequest(req, http.StatusOK)
}
func (c *httpClient) Write(b []byte) (int, error) {
req, err := c.makeWriteRequest(bytes.NewReader(b), len(b), c.writeURL)
func (c *httpClient) WriteStream(r io.Reader) error {
req, err := c.makeWriteRequest(r, c.writeURL)
if err != nil {
return 0, nil
return err
}
err = c.doRequest(req, http.StatusNoContent)
if err == nil {
return len(b), nil
}
return 0, err
}
func (c *httpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
req, err := c.makeWriteRequest(bytes.NewReader(b), len(b), writeURL(c.url, wp))
if err != nil {
return 0, nil
}
err = c.doRequest(req, http.StatusNoContent)
if err == nil {
return len(b), nil
}
return 0, err
}
func (c *httpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
req, err := c.makeWriteRequest(r, contentLength, c.writeURL)
if err != nil {
return 0, nil
}
err = c.doRequest(req, http.StatusNoContent)
if err == nil {
return contentLength, nil
}
return 0, err
}
func (c *httpClient) WriteStreamWithParams(
r io.Reader,
contentLength int,
wp WriteParams,
) (int, error) {
req, err := c.makeWriteRequest(r, contentLength, writeURL(c.url, wp))
if err != nil {
return 0, nil
}
err = c.doRequest(req, http.StatusNoContent)
if err == nil {
return contentLength, nil
}
return 0, err
return c.doRequest(req, http.StatusNoContent)
}
func (c *httpClient) doRequest(
@@ -230,7 +185,6 @@ func (c *httpClient) doRequest(
func (c *httpClient) makeWriteRequest(
body io.Reader,
contentLength int,
writeURL string,
) (*http.Request, error) {
req, err := c.makeRequest(writeURL, body)
@@ -239,8 +193,6 @@ func (c *httpClient) makeWriteRequest(
}
if c.config.ContentEncoding == "gzip" {
req.Header.Set("Content-Encoding", "gzip")
} else {
req.Header.Set("Content-Length", fmt.Sprint(contentLength))
}
return req, nil
}
@@ -304,8 +256,11 @@ func writeURL(u *url.URL, wp WriteParams) string {
}
u.RawQuery = params.Encode()
u.Path = "write"
return u.String()
p := u.Path
u.Path = path.Join(p, "write")
s := u.String()
u.Path = p
return s
}
func queryURL(u *url.URL, command string) string {
@@ -313,6 +268,9 @@ func queryURL(u *url.URL, command string) string {
params.Set("q", command)
u.RawQuery = params.Encode()
u.Path = "query"
return u.String()
p := u.Path
u.Path = path.Join(p, "query")
s := u.String()
u.Path = p
return s
}

View File

@@ -110,66 +110,8 @@ func TestHTTPClient_Write(t *testing.T) {
client, err := NewHTTP(config, wp)
defer client.Close()
assert.NoError(t, err)
n, err := client.Write([]byte("cpu value=99\n"))
assert.Equal(t, 13, n)
assert.NoError(t, err)
_, err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")), 13)
assert.NoError(t, err)
}
func TestHTTPClient_WriteParamsOverride(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
// test that database is set properly
if r.FormValue("db") != "override" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"wrong db name"}`)
}
// Validate the request body:
buf := make([]byte, 100)
n, _ := r.Body.Read(buf)
expected := "cpu value=99"
got := string(buf[0 : n-1])
if expected != got {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
msg := fmt.Sprintf(`{"results":[{}],"error":"expected [%s], got [%s]"}`, expected, got)
fmt.Fprintln(w, msg)
}
w.WriteHeader(http.StatusNoContent)
w.Header().Set("Content-Type", "application/json")
case "/query":
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
}
}))
defer ts.Close()
config := HTTPConfig{
URL: ts.URL,
}
defaultWP := WriteParams{
Database: "test",
}
client, err := NewHTTP(config, defaultWP)
defer client.Close()
assert.NoError(t, err)
// test that WriteWithParams overrides the default write params
wp := WriteParams{
Database: "override",
}
n, err := client.WriteWithParams([]byte("cpu value=99\n"), wp)
assert.Equal(t, 13, n)
assert.NoError(t, err)
_, err = client.WriteStreamWithParams(bytes.NewReader([]byte("cpu value=99\n")), 13, wp)
err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")))
assert.NoError(t, err)
}
@@ -197,23 +139,7 @@ func TestHTTPClient_Write_Errors(t *testing.T) {
assert.NoError(t, err)
lp := []byte("cpu value=99\n")
n, err := client.Write(lp)
assert.Equal(t, 0, n)
assert.Error(t, err)
n, err = client.WriteStream(bytes.NewReader(lp), 13)
assert.Equal(t, 0, n)
assert.Error(t, err)
wp := WriteParams{
Database: "override",
}
n, err = client.WriteWithParams(lp, wp)
assert.Equal(t, 0, n)
assert.Error(t, err)
n, err = client.WriteStreamWithParams(bytes.NewReader(lp), 13, wp)
assert.Equal(t, 0, n)
err = client.WriteStream(bytes.NewReader(lp))
assert.Error(t, err)
}
@@ -373,3 +299,37 @@ func TestGzipCompression(t *testing.T) {
assert.Equal(t, []byte(influxLine), uncompressed.Bytes())
}
func TestHTTPClient_PathPrefix(t *testing.T) {
prefix := "/some/random/prefix"
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case prefix + "/write":
w.WriteHeader(http.StatusNoContent)
w.Header().Set("Content-Type", "application/json")
case prefix + "/query":
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
default:
w.WriteHeader(http.StatusNotFound)
msg := fmt.Sprintf("Path not found: %s", r.URL.Path)
fmt.Fprintln(w, msg)
}
}))
defer ts.Close()
config := HTTPConfig{
URL: ts.URL + prefix,
}
wp := WriteParams{
Database: "test",
}
client, err := NewHTTP(config, wp)
defer client.Close()
assert.NoError(t, err)
err = client.Query("CREATE DATABASE test")
assert.NoError(t, err)
err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")))
assert.NoError(t, err)
}

View File

@@ -1,7 +1,6 @@
package client
import (
"bytes"
"fmt"
"io"
"log"
@@ -62,18 +61,8 @@ func (c *udpClient) Query(command string) error {
return nil
}
// Write will send the byte stream to the given UDP client endpoint
func (c *udpClient) Write(b []byte) (int, error) {
return c.WriteStream(bytes.NewReader(b), -1)
}
// WriteWithParams are ignored by the UDP client, will forward to WriteStream
func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
return c.WriteStream(bytes.NewReader(b), -1)
}
// WriteStream will send the provided data through to the client, contentLength is ignored by the UDP client
func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
func (c *udpClient) WriteStream(r io.Reader) error {
var totaln int
for {
nR, err := r.Read(c.buffer)
@@ -81,14 +70,14 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
break
}
if err != io.EOF && err != nil {
return totaln, err
return err
}
if c.buffer[nR-1] == uint8('\n') {
nW, err := c.conn.Write(c.buffer[0:nR])
totaln += nW
if err != nil {
return totaln, err
return err
}
} else {
log.Printf("E! Could not fit point into UDP payload; dropping")
@@ -99,7 +88,7 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
break
}
if err != io.EOF && err != nil {
return totaln, err
return err
}
if c.buffer[nR-1] == uint8('\n') {
break
@@ -107,13 +96,7 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
}
}
}
return totaln, nil
}
// WriteStreamWithParams will forward the stream to the client backend, contentLength is ignored by the UDP client
// write params are ignored by the UDP client
func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) {
return c.WriteStream(r, -1)
return nil
}
// Close will terminate the provided client connection

View File

@@ -9,7 +9,6 @@ import (
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestUDPClient(t *testing.T) {
@@ -65,43 +64,6 @@ func TestUDPClient_Write(t *testing.T) {
}
}()
// test sending simple metric
n, err := client.Write([]byte("cpu value=99\n"))
assert.Equal(t, n, 13)
assert.NoError(t, err)
pkt := <-packets
assert.Equal(t, "cpu value=99\n", pkt)
wp := WriteParams{}
//
// Using WriteStream() & a metric.Reader:
config3 := UDPConfig{
URL: "udp://localhost:8199",
PayloadSize: 40,
}
client3, err := NewUDP(config3)
assert.NoError(t, err)
now := time.Unix(1484142942, 0)
m1, _ := metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, now)
m2, _ := metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, now)
m3, _ := metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, now)
ms := []telegraf.Metric{m1, m2, m3}
mReader := metric.NewReader(ms)
n, err = client3.WriteStreamWithParams(mReader, 10, wp)
// 3 metrics at 35 bytes each (including the newline)
assert.Equal(t, 105, n)
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
pkt = <-packets
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
pkt = <-packets
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
assert.NoError(t, client.Close())
config = UDPConfig{
@@ -112,17 +74,15 @@ func TestUDPClient_Write(t *testing.T) {
assert.NoError(t, err)
ts := time.Unix(1484142943, 0)
m1, _ = metric.New("test", map[string]string{},
m1, _ := metric.New("test", map[string]string{},
map[string]interface{}{"this_is_a_very_long_field_name": 1.1}, ts)
m2, _ = metric.New("test", map[string]string{},
m2, _ := metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, ts)
ms = []telegraf.Metric{m1, m2}
ms := []telegraf.Metric{m1, m2}
reader := metric.NewReader(ms)
n, err = client4.WriteStream(reader, 0)
err = client4.WriteStream(reader)
assert.NoError(t, err)
require.Equal(t, 35, n)
assert.NoError(t, err)
pkt = <-packets
pkt := <-packets
assert.Equal(t, "test value=1.1 1484142943000000000\n", pkt)
assert.NoError(t, client4.Close())

View File

@@ -53,9 +53,7 @@ type InfluxDB struct {
}
var sampleConfig = `
## The HTTP or UDP URL for your InfluxDB instance. Each item should be
## of the form:
## scheme "://" host [ ":" port]
## The full HTTP or UDP URL for your InfluxDB instance.
##
## Multiple urls can be specified as part of the same cluster,
## this means that only ONE of the urls will be written to each interval.
@@ -185,12 +183,6 @@ func (i *InfluxDB) Description() string {
// Write will choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error.
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
bufsize := 0
for _, m := range metrics {
bufsize += m.Len()
}
r := metric.NewReader(metrics)
// This will get set to nil if a successful write occurs
@@ -198,7 +190,7 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
p := rand.Perm(len(i.clients))
for _, n := range p {
if _, e := i.clients[n].WriteStream(r, bufsize); e != nil {
if e := i.clients[n].WriteStream(r); e != nil {
// If the database was not found, try to recreate it:
if strings.Contains(e.Error(), "database not found") {
errc := i.clients[n].Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))

View File

@@ -80,6 +80,9 @@ func (l *Librato) Connect() error {
"api_user and api_token are required fields for librato output")
}
l.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: l.Timeout.Duration,
}
return nil

View File

@@ -67,6 +67,10 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i
}
func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
buf = bytes.TrimSpace(buf)
if len(buf) == 0 {
return make([]telegraf.Metric, 0), nil
}
if !isarray(buf) {
metrics := make([]telegraf.Metric, 0)
@@ -155,8 +159,6 @@ func (f *JSONFlattener) FullFlattenJSON(
return nil
}
case nil:
// ignored types
fmt.Println("json parser ignoring " + fieldname)
return nil
default:
return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)",

View File

@@ -84,6 +84,16 @@ func TestParseValidJSON(t *testing.T) {
"b_c": float64(6),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{}, metrics[0].Tags())
// Test that whitespace only will parse as an empty list of metrics
metrics, err = parser.Parse([]byte("\n\t"))
assert.NoError(t, err)
assert.Len(t, metrics, 0)
// Test that an empty string will parse as an empty list of metrics
metrics, err = parser.Parse([]byte(""))
assert.NoError(t, err)
assert.Len(t, metrics, 0)
}
func TestParseLineValidJSON(t *testing.T) {

View File

@@ -6,7 +6,7 @@ After=network.target
[Service]
EnvironmentFile=-/etc/default/telegraf
User=telegraf
ExecStart=/usr/bin/telegraf -config /etc/telegraf/telegraf.conf -config-directory /etc/telegraf/telegraf.d ${TELEGRAF_OPTS}
ExecStart=/usr/bin/telegraf -config /etc/telegraf/telegraf.conf -config-directory /etc/telegraf/telegraf.d $TELEGRAF_OPTS
ExecReload=/bin/kill -HUP $MAINPID
Restart=on-failure
RestartForceExitStatus=SIGPIPE