Compare commits


46 Commits

Author SHA1 Message Date
Daniel Nelson
0cc5fc0ce4 Set 1.4.2 release date
(cherry picked from commit 4e0c8e6026)
2017-10-10 13:31:06 -07:00
Daniel Nelson
8011109466 Remove InfluxDB path prefix test
This tests a feature that is not yet on this branch and the test was
mistakenly backported.
2017-10-05 16:37:58 -07:00
Daniel Nelson
588f0c77f8 Update changelog
(cherry picked from commit 13c7802b84)
2017-10-05 16:17:06 -07:00
Daniel Nelson
4301b8e32a Use chunked transfer encoding in InfluxDB output (#3307)
(cherry picked from commit cce40c515a)
2017-10-05 16:17:05 -07:00
Daniel Nelson
3c9d7db0a0 Update changelog
(cherry picked from commit 6e1fa559a3)
2017-10-05 16:06:11 -07:00
Daniel Nelson
f7b3eb1ebd Fix panic in cpu input if number of cpus changes (#3306)
(cherry picked from commit f56dda0ac8)
2017-10-05 16:06:11 -07:00
Daniel Nelson
b8ab827629 Update changelog
(cherry picked from commit 002ccf3295)
2017-10-03 15:27:49 -07:00
Daniel Nelson
d03e2fca32 Add support for proxy environment variables to http_response (#3302)
(cherry picked from commit a163effa6d)
2017-10-03 15:26:55 -07:00
Daniel Nelson
eca00c10e0 Add support for standard proxy env vars in outputs. (#3212)
(cherry picked from commit 7b08f9d099)
2017-10-03 15:26:44 -07:00
Daniel Nelson
9cf19df04e Update changelog
(cherry picked from commit f67350107d)
2017-10-02 17:17:10 -07:00
Daniel Nelson
e77c2b76e7 Fix case sensitivity error in sqlserver input (#3287)
(cherry picked from commit 8e3ed96d6f)
2017-10-02 17:17:10 -07:00
Daniel Nelson
c749c43dab Fix mqtt_consumer connection_timeout test
(cherry picked from commit cdca81c999)
2017-10-02 12:32:05 -07:00
Daniel Nelson
1be17ea5af Update example config
2017-09-29 16:04:02 -07:00
Daniel Nelson
e1155bec20 Update changelog
(cherry picked from commit 29b6f4168c)
2017-09-29 16:01:11 -07:00
Daniel Nelson
cfac750469 Fix format of connection_timeout in mqtt_consumer (#3286)
(cherry picked from commit 3d62e045af)
2017-09-29 16:01:11 -07:00
Daniel Nelson
f10d5b43c4 Update changelog
(cherry picked from commit cadafa6405)
2017-09-26 16:03:30 -07:00
Daniel Nelson
47b2d04d5b Allow JSON data format to contain zero metrics (#3268)
(cherry picked from commit 22a9ffbb9d)
2017-09-26 16:03:30 -07:00
Daniel Nelson
0e0da57b9a Update changelog
(cherry picked from commit 2e1457a496)
2017-09-26 15:38:41 -07:00
Daniel Nelson
8e7cf0109e Fix parsing of JSON with a UTF8 BOM in httpjson (#3267)
(cherry picked from commit 8614445235)
2017-09-26 15:38:41 -07:00
Daniel Nelson
5b791fd2e5 Update changelog
(cherry picked from commit f23d1eb078)
2017-09-26 15:29:19 -07:00
Daniel Nelson
293b1a0093 Fix dmcache tests with 32bit int
(cherry picked from commit ef5c12bd86)
2017-09-26 15:29:01 -07:00
Daniel Nelson
761ea06d6a Fix cgroup tests with 32bit int
(cherry picked from commit c013cc1497)
2017-09-26 15:29:01 -07:00
Daniel Nelson
8fafe9878b Fix ceph tests with 32bit int
(cherry picked from commit bb665cf013)
2017-09-26 15:29:01 -07:00
Daniel Nelson
5da3eef38b Allow 64bit integers in kernel_vmstat
(cherry picked from commit f823fc73f6)
2017-09-26 15:29:00 -07:00
Daniel Nelson
2de7aa23d7 Set 1.4.1 release date in changelog
(cherry picked from commit fd702e6bb8)
2017-09-26 14:19:51 -07:00
Daniel Nelson
52cd38150c Update changelog
(cherry picked from commit 0048bf2120)
2017-09-18 14:25:57 -07:00
Daniel Nelson
c08f492f78 Fix arm64 packages contain 32-bit executable (#3246)
(cherry picked from commit b8e134cd37)
2017-09-18 14:25:57 -07:00
Daniel Nelson
66cfe80e37 Update changelog
(cherry picked from commit b94cda6b46)
2017-09-14 15:30:51 -07:00
Trevor Pounds
ba5e5ec283 Fix panic in statsd p100 calculation (#3230)
(cherry picked from commit 73372872c2)
2017-09-14 15:30:51 -07:00
Daniel Nelson
259f8e4002 Update changelog
(cherry picked from commit 875ab3c4b7)
2017-09-14 15:05:38 -07:00
Mark Wilkinson - m82labs
558ab0c730 Fix duplicate keys in perf counters sqlserver query (#3175)
(cherry picked from commit 1c5ebd4be3)
2017-09-14 15:05:38 -07:00
Daniel Nelson
8d4fbe29e7 Update changelog
(cherry picked from commit 103d24bfba)
2017-09-14 15:01:28 -07:00
Daniel Nelson
72337a1c97 Fix skipped line with empty target in iptables (#3235)
(cherry picked from commit d5f48e3e96)
2017-09-14 15:01:21 -07:00
Daniel Nelson
86537899b2 Update changelog
(cherry picked from commit 7a41d2c586)
2017-09-14 13:07:30 -07:00
Trevor Pounds
a727d5d1f0 Fix counter and gauge metric types. (#3232)
(cherry picked from commit fa1982323a)
2017-09-14 13:07:30 -07:00
Daniel Nelson
7ec194a482 Update changelog
(cherry picked from commit cdf63c5776)
2017-09-13 17:32:03 -07:00
Daniel Nelson
5a77d28837 Whitelist allowed char classes for opentsdb output. (#3227)
(cherry picked from commit 0a8c2e0b3b)
2017-09-13 17:32:03 -07:00
Daniel Nelson
47927c353d Fix fluentd test
(cherry picked from commit eebee9759f)
2017-09-12 17:58:29 -07:00
Daniel Nelson
b9e7fa27aa Update changelog
(cherry picked from commit c5cfde667a)
2017-09-12 17:18:29 -07:00
Daniel Nelson
0d437140bd Fix optional field types in fluentd input
(cherry picked from commit 8a68e7424c)
2017-09-12 17:18:29 -07:00
Daniel Nelson
36969a63c2 Update changelog
(cherry picked from commit cc63b3b667)
2017-09-11 12:28:37 -07:00
DanKans
e9a12bb694 Fix MQTT input exits if Broker is not available on startup (#3202)
(cherry picked from commit 5488f4b3ac)
2017-09-11 12:28:12 -07:00
Daniel Nelson
34b7a4c361 Add 1.4.0 release date
(cherry picked from commit ab1c11b06d)
2017-09-05 17:15:06 -07:00
Daniel Nelson
f46370d982 Sort metrics before comparing in graphite test
(cherry picked from commit 98e784faf3)
2017-09-05 12:50:55 -07:00
Daniel Nelson
07b7e09749 Update changelog
(cherry picked from commit f43af72785)
2017-08-31 13:44:05 -07:00
Daniel Nelson
e54795795d Fix panic when handling string fields with escapes (#3188)
(cherry picked from commit 28d16188b3)
2017-08-30 21:17:10 -07:00
44 changed files with 590 additions and 498 deletions

View File

@@ -1,4 +1,31 @@
## v1.4 [unreleased]
## v1.4.2 [2017-10-10]
### Bugfixes
- [#3259](https://github.com/influxdata/telegraf/issues/3259): Fix error if int larger than 32-bit in /proc/vmstat.
- [#3265](https://github.com/influxdata/telegraf/issues/3265): Fix parsing of JSON with a UTF8 BOM in httpjson.
- [#2887](https://github.com/influxdata/telegraf/issues/2887): Allow JSON data format to contain zero metrics.
- [#3284](https://github.com/influxdata/telegraf/issues/3284): Fix format of connection_timeout in mqtt_consumer.
- [#3081](https://github.com/influxdata/telegraf/issues/3081): Fix case sensitivity error in sqlserver input.
- [#3297](https://github.com/influxdata/telegraf/issues/3297): Add support for proxy environment variables to http_response.
- [#1588](https://github.com/influxdata/telegraf/issues/1588): Add support for standard proxy env vars in outputs.
- [#3282](https://github.com/influxdata/telegraf/issues/3282): Fix panic in cpu input if number of cpus changes.
- [#2854](https://github.com/influxdata/telegraf/issues/2854): Use chunked transfer encoding in InfluxDB output.
## v1.4.1 [2017-09-26]
### Bugfixes
- [#3167](https://github.com/influxdata/telegraf/issues/3167): Fix MQTT input exits if Broker is not available on startup.
- [#3217](https://github.com/influxdata/telegraf/issues/3217): Fix optional field value conversions in fluentd input.
- [#3227](https://github.com/influxdata/telegraf/issues/3227): Whitelist allowed char classes for opentsdb output.
- [#3232](https://github.com/influxdata/telegraf/issues/3232): Fix counter and gauge metric types.
- [#3235](https://github.com/influxdata/telegraf/issues/3235): Fix skipped line with empty target in iptables.
- [#3175](https://github.com/influxdata/telegraf/issues/3175): Fix duplicate keys in perf counters sqlserver query.
- [#3230](https://github.com/influxdata/telegraf/issues/3230): Fix panic in statsd p100 calculation.
- [#3242](https://github.com/influxdata/telegraf/issues/3242): Fix arm64 packages contain 32-bit executable.
## v1.4 [2017-09-05]
### Release Notes
@@ -103,6 +130,7 @@
- [#2672](https://github.com/influxdata/telegraf/issues/2672): Fix NSQ input plugin when used with version 1.0.0-compat.
- [#2523](https://github.com/influxdata/telegraf/issues/2523): Added CloudWatch metric constraint validation.
- [#3179](https://github.com/influxdata/telegraf/issues/3179): Skip non-numerical values in graphite format.
- [#3187](https://github.com/influxdata/telegraf/issues/3187): Fix panic when handling string fields with escapes.
## v1.3.5 [2017-07-26]

View File

@@ -121,6 +121,9 @@
## HTTP Proxy Config
# http_proxy = "http://corporate.proxy:3128"
## Optional HTTP headers
# http_headers = {"X-Special-Header" = "Special-Value"}
## Compress each HTTP request payload using GZIP.
# content_encoding = "gzip"
@@ -1095,10 +1098,7 @@
# # Read metrics from fail2ban.
# [[inputs.fail2ban]]
# ## fail2ban-client require root access.
# ## Setting 'use_sudo' to true will make use of sudo to run fail2ban-client.
# ## Users must configure sudo to allow telegraf user to run fail2ban-client with no password.
# ## This plugin run only "fail2ban-client status".
# ## Use sudo to run fail2ban-client
# use_sudo = false
@@ -1960,13 +1960,9 @@
# ## Includes connection time, any redirects, and reading the response body.
# # client_timeout = "4s"
#
# ## A list of nodes to gather as the rabbitmq_node measurement. If not
# ## specified, metrics for all nodes are gathered.
# ## A list of nodes to pull metrics about. If not specified, metrics for
# ## all nodes are gathered.
# # nodes = ["rabbit@node1", "rabbit@node2"]
#
# ## A list of queues to gather as the rabbitmq_queue measurement. If not
# ## specified, metrics for all queues are gathered.
# # queues = ["telegraf"]
# # Read raindrops stats (raindrops - real-time stats for preforking Rack servers)
@@ -2382,10 +2378,10 @@
# ## Use SSL but skip chain & host verification
# # insecure_skip_verify = false
#
# ## Data format to output.
# ## Data format to consume.
# ## Each data format has its own unique set of configuration options, read
# ## more about them here:
# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
# data_format = "influx"
@@ -2519,6 +2515,8 @@
# servers = ["localhost:1883"]
# ## MQTT QoS, must be 0, 1, or 2
# qos = 0
# ## Connection timeout for initial connection in seconds
# connection_timeout = "30s"
#
# ## Topics to subscribe to
# topics = [

View File

@@ -150,12 +150,6 @@ func makemetric(
continue
}
case string:
if strings.HasSuffix(val, `\`) {
log.Printf("D! Measurement [%s] field [%s] has a value "+
"ending with a backslash, skipping", measurement, k)
delete(fields, k)
continue
}
fields[k] = v
default:
fields[k] = v

View File

@@ -370,16 +370,17 @@ func TestMakeMetric_TrailingSlash(t *testing.T) {
expectedTags: map[string]string{},
},
{
name: "Field value with trailing slash dropped",
name: "Field value with trailing slash okay",
measurement: `cpu`,
fields: map[string]interface{}{
"value": int64(42),
"bad": `xyzzy\`,
"ok": `xyzzy\`,
},
tags: map[string]string{},
expectedMeasurement: `cpu`,
expectedFields: map[string]interface{}{
"value": int64(42),
"ok": `xyzzy\`,
},
expectedTags: map[string]string{},
},
@@ -387,7 +388,7 @@ func TestMakeMetric_TrailingSlash(t *testing.T) {
name: "Must have one field after dropped",
measurement: `cpu`,
fields: map[string]interface{}{
"bad": `xyzzy\`,
"bad": math.NaN(),
},
tags: map[string]string{},
expectedNil: true,

View File

@@ -21,14 +21,14 @@ func New(
t time.Time,
mType ...telegraf.ValueType,
) (telegraf.Metric, error) {
if len(fields) == 0 {
return nil, fmt.Errorf("Metric cannot be made without any fields")
}
if len(name) == 0 {
return nil, fmt.Errorf("Metric cannot be made with an empty name")
return nil, fmt.Errorf("missing measurement name")
}
if len(fields) == 0 {
return nil, fmt.Errorf("%s: must have one or more fields", name)
}
if strings.HasSuffix(name, `\`) {
return nil, fmt.Errorf("Metric cannot have measurement name ending with a backslash")
return nil, fmt.Errorf("%s: measurement name cannot end with a backslash", name)
}
var thisType telegraf.ValueType
@@ -49,10 +49,10 @@ func New(
taglen := 0
for k, v := range tags {
if strings.HasSuffix(k, `\`) {
return nil, fmt.Errorf("Metric cannot have tag key ending with a backslash")
return nil, fmt.Errorf("%s: tag key cannot end with a backslash: %s", name, k)
}
if strings.HasSuffix(v, `\`) {
return nil, fmt.Errorf("Metric cannot have tag value ending with a backslash")
return nil, fmt.Errorf("%s: tag value cannot end with a backslash: %s", name, v)
}
if len(k) == 0 || len(v) == 0 {
@@ -79,7 +79,7 @@ func New(
fieldlen := 0
for k, _ := range fields {
if strings.HasSuffix(k, `\`) {
return nil, fmt.Errorf("Metric cannot have field key ending with a backslash")
return nil, fmt.Errorf("%s: field key cannot end with a backslash: %s", name, k)
}
// 10 bytes is completely arbitrary, but will at least prevent some
@@ -102,7 +102,8 @@ func New(
}
// indexUnescapedByte finds the index of the first byte equal to b in buf that
// is not escaped. Returns -1 if not found.
// is not escaped. Does not allow the escape char to be escaped. Returns -1 if
// not found.
func indexUnescapedByte(buf []byte, b byte) int {
var keyi int
for {
@@ -122,6 +123,46 @@ func indexUnescapedByte(buf []byte, b byte) int {
return keyi
}
// indexUnescapedByteBackslashEscaping finds the index of the first byte equal
// to b in buf that is not escaped. Allows for the escape char `\` to be
// escaped. Returns -1 if not found.
func indexUnescapedByteBackslashEscaping(buf []byte, b byte) int {
var keyi int
for {
i := bytes.IndexByte(buf[keyi:], b)
if i == -1 {
return -1
} else if i == 0 {
break
}
keyi += i
if countBackslashes(buf, keyi-1)%2 == 0 {
break
} else {
keyi++
}
}
return keyi
}
// countBackslashes counts the number of preceding backslashes starting at
// the 'start' index.
func countBackslashes(buf []byte, index int) int {
var count int
for {
if index < 0 {
return count
}
if buf[index] == '\\' {
count++
index--
} else {
break
}
}
return count
}
type metric struct {
name []byte
tags []byte
@@ -283,7 +324,7 @@ func (m *metric) Fields() map[string]interface{} {
// end index of field value
var i3 int
if m.fields[i:][i2] == '"' {
i3 = indexUnescapedByte(m.fields[i:][i2+1:], '"')
i3 = indexUnescapedByteBackslashEscaping(m.fields[i:][i2+1:], '"')
if i3 == -1 {
i3 = len(m.fields[i:])
}
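
The new quote-scanning helper decides whether a `"` is escaped by counting the backslashes immediately before it: an odd run means the quote is escaped, an even run means the escape character was itself escaped. A minimal standalone sketch of that parity rule (not part of the diff):

```go
package main

import "fmt"

// countBackslashes walks left from index over consecutive backslashes, as in
// the helper above; an odd count means the following byte is escaped.
func countBackslashes(buf []byte, index int) int {
	count := 0
	for index >= 0 && buf[index] == '\\' {
		count++
		index--
	}
	return count
}

func main() {
	buf := []byte(`x\" x\\" x\\\"`)
	for i, b := range buf {
		if b == '"' {
			fmt.Printf("quote at %d escaped=%v\n", i, countBackslashes(buf, i-1)%2 == 1)
		}
	}
	// quote at 2 escaped=true, at 7 escaped=false, at 13 escaped=true
}
```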

View File

@@ -258,6 +258,7 @@ func TestNewMetric_Fields(t *testing.T) {
"quote_string": `x"y`,
"backslash_quote_string": `x\"y`,
"backslash": `x\y`,
"ends_with_backslash": `x\`,
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)

View File

@@ -26,7 +26,7 @@ func TestParseSockId(t *testing.T) {
func TestParseMonDump(t *testing.T) {
dump, err := parseDump(monPerfDump)
assert.NoError(t, err)
assert.InEpsilon(t, 5678670180, dump["cluster"]["osd_kb_used"], epsilon)
assert.InEpsilon(t, int64(5678670180), dump["cluster"]["osd_kb_used"], epsilon)
assert.InEpsilon(t, 6866.540527000, dump["paxos"]["store_state_latency.sum"], epsilon)
}

View File

@@ -225,7 +225,7 @@ var fileFormats = [...]fileFormat{
}
func numberOrString(s string) interface{} {
i, err := strconv.Atoi(s)
i, err := strconv.ParseInt(s, 10, 64)
if err == nil {
return i
}
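
`strconv.Atoi` returns the platform-sized `int`, so on 32-bit builds any value above 2^31-1 fails with a range error; `ParseInt` with `bitSize` 64 always yields an `int64`. A quick illustration, using the value from the ceph test above:

```go
package main

import (
	"fmt"
	"strconv"
)

func main() {
	// On a 32-bit build, strconv.Atoi("5678670180") returns
	// "value out of range". ParseInt with bitSize 64 works on any GOARCH.
	v, err := strconv.ParseInt("5678670180", 10, 64)
	fmt.Println(v, err) // 5678670180 <nil>
}
```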

View File

@@ -31,17 +31,17 @@ func TestCgroupStatistics_1(t *testing.T) {
"path": "testdata/memory",
}
fields := map[string]interface{}{
"memory.stat.cache": 1739362304123123123,
"memory.stat.rss": 1775325184,
"memory.stat.rss_huge": 778043392,
"memory.stat.mapped_file": 421036032,
"memory.stat.dirty": -307200,
"memory.max_usage_in_bytes.0": 0,
"memory.max_usage_in_bytes.1": -1,
"memory.max_usage_in_bytes.2": 2,
"memory.limit_in_bytes": 223372036854771712,
"memory.stat.cache": int64(1739362304123123123),
"memory.stat.rss": int64(1775325184),
"memory.stat.rss_huge": int64(778043392),
"memory.stat.mapped_file": int64(421036032),
"memory.stat.dirty": int64(-307200),
"memory.max_usage_in_bytes.0": int64(0),
"memory.max_usage_in_bytes.1": int64(-1),
"memory.max_usage_in_bytes.2": int64(2),
"memory.limit_in_bytes": int64(223372036854771712),
"memory.use_hierarchy": "12-781",
"notify_on_release": 0,
"notify_on_release": int64(0),
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
}
@@ -63,10 +63,10 @@ func TestCgroupStatistics_2(t *testing.T) {
"path": "testdata/cpu",
}
fields := map[string]interface{}{
"cpuacct.usage_percpu.0": -1452543795404,
"cpuacct.usage_percpu.1": 1376681271659,
"cpuacct.usage_percpu.2": 1450950799997,
"cpuacct.usage_percpu.3": -1473113374257,
"cpuacct.usage_percpu.0": int64(-1452543795404),
"cpuacct.usage_percpu.1": int64(1376681271659),
"cpuacct.usage_percpu.2": int64(1450950799997),
"cpuacct.usage_percpu.3": int64(-1473113374257),
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
}
@@ -88,7 +88,7 @@ func TestCgroupStatistics_3(t *testing.T) {
"path": "testdata/memory/group_1",
}
fields := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"memory.limit_in_bytes": int64(223372036854771712),
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
@@ -115,7 +115,7 @@ func TestCgroupStatistics_4(t *testing.T) {
"path": "testdata/memory/group_1/group_1_1",
}
fields := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"memory.limit_in_bytes": int64(223372036854771712),
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
@@ -147,7 +147,7 @@ func TestCgroupStatistics_5(t *testing.T) {
"path": "testdata/memory/group_1/group_1_1",
}
fields := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"memory.limit_in_bytes": int64(223372036854771712),
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
@@ -174,9 +174,9 @@ func TestCgroupStatistics_6(t *testing.T) {
"path": "testdata/memory",
}
fields := map[string]interface{}{
"memory.usage_in_bytes": 3513667584,
"memory.usage_in_bytes": int64(3513667584),
"memory.use_hierarchy": "12-781",
"memory.kmem.limit_in_bytes": 9223372036854771712,
"memory.kmem.limit_in_bytes": int64(9223372036854771712),
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
}

View File

@@ -16,21 +16,21 @@ const metricName = "dmcache"
type cacheStatus struct {
device string
length int
length int64
target string
metadataBlocksize int
metadataUsed int
metadataTotal int
cacheBlocksize int
cacheUsed int
cacheTotal int
readHits int
readMisses int
writeHits int
writeMisses int
demotions int
promotions int
dirty int
metadataBlocksize int64
metadataUsed int64
metadataTotal int64
cacheBlocksize int64
cacheUsed int64
cacheTotal int64
readHits int64
readMisses int64
writeHits int64
writeMisses int64
demotions int64
promotions int64
dirty int64
}
func (c *DMCache) Gather(acc telegraf.Accumulator) error {
@@ -69,12 +69,12 @@ func parseDMSetupStatus(line string) (cacheStatus, error) {
}
status.device = strings.TrimRight(values[0], ":")
status.length, err = strconv.Atoi(values[2])
status.length, err = strconv.ParseInt(values[2], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.target = values[3]
status.metadataBlocksize, err = strconv.Atoi(values[4])
status.metadataBlocksize, err = strconv.ParseInt(values[4], 10, 64)
if err != nil {
return cacheStatus{}, err
}
@@ -82,15 +82,15 @@ func parseDMSetupStatus(line string) (cacheStatus, error) {
if len(metadata) != 2 {
return cacheStatus{}, parseError
}
status.metadataUsed, err = strconv.Atoi(metadata[0])
status.metadataUsed, err = strconv.ParseInt(metadata[0], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.metadataTotal, err = strconv.Atoi(metadata[1])
status.metadataTotal, err = strconv.ParseInt(metadata[1], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.cacheBlocksize, err = strconv.Atoi(values[6])
status.cacheBlocksize, err = strconv.ParseInt(values[6], 10, 64)
if err != nil {
return cacheStatus{}, err
}
@@ -98,39 +98,39 @@ func parseDMSetupStatus(line string) (cacheStatus, error) {
if len(cache) != 2 {
return cacheStatus{}, parseError
}
status.cacheUsed, err = strconv.Atoi(cache[0])
status.cacheUsed, err = strconv.ParseInt(cache[0], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.cacheTotal, err = strconv.Atoi(cache[1])
status.cacheTotal, err = strconv.ParseInt(cache[1], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.readHits, err = strconv.Atoi(values[8])
status.readHits, err = strconv.ParseInt(values[8], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.readMisses, err = strconv.Atoi(values[9])
status.readMisses, err = strconv.ParseInt(values[9], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.writeHits, err = strconv.Atoi(values[10])
status.writeHits, err = strconv.ParseInt(values[10], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.writeMisses, err = strconv.Atoi(values[11])
status.writeMisses, err = strconv.ParseInt(values[11], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.demotions, err = strconv.Atoi(values[12])
status.demotions, err = strconv.ParseInt(values[12], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.promotions, err = strconv.Atoi(values[13])
status.promotions, err = strconv.ParseInt(values[13], 10, 64)
if err != nil {
return cacheStatus{}, err
}
status.dirty, err = strconv.Atoi(values[14])
status.dirty, err = strconv.ParseInt(values[14], 10, 64)
if err != nil {
return cacheStatus{}, err
}

View File

@@ -35,20 +35,20 @@ func TestPerDeviceGoodOutput(t *testing.T) {
"device": "cs-1",
}
fields1 := map[string]interface{}{
"length": 4883791872,
"metadata_blocksize": 8,
"metadata_used": 1018,
"metadata_total": 1501122,
"cache_blocksize": 512,
"cache_used": 7,
"cache_total": 464962,
"read_hits": 139,
"read_misses": 352643,
"write_hits": 15,
"write_misses": 46,
"demotions": 0,
"promotions": 7,
"dirty": 0,
"length": int64(4883791872),
"metadata_blocksize": int64(8),
"metadata_used": int64(1018),
"metadata_total": int64(1501122),
"cache_blocksize": int64(512),
"cache_used": int64(7),
"cache_total": int64(464962),
"read_hits": int64(139),
"read_misses": int64(352643),
"write_hits": int64(15),
"write_misses": int64(46),
"demotions": int64(0),
"promotions": int64(7),
"dirty": int64(0),
}
acc.AssertContainsTaggedFields(t, measurement, fields1, tags1)
@@ -56,20 +56,20 @@ func TestPerDeviceGoodOutput(t *testing.T) {
"device": "cs-2",
}
fields2 := map[string]interface{}{
"length": 4294967296,
"metadata_blocksize": 8,
"metadata_used": 72352,
"metadata_total": 1310720,
"cache_blocksize": 128,
"cache_used": 26,
"cache_total": 24327168,
"read_hits": 2409,
"read_misses": 286,
"write_hits": 265,
"write_misses": 524682,
"demotions": 0,
"promotions": 0,
"dirty": 0,
"length": int64(4294967296),
"metadata_blocksize": int64(8),
"metadata_used": int64(72352),
"metadata_total": int64(1310720),
"cache_blocksize": int64(128),
"cache_used": int64(26),
"cache_total": int64(24327168),
"read_hits": int64(2409),
"read_misses": int64(286),
"write_hits": int64(265),
"write_misses": int64(524682),
"demotions": int64(0),
"promotions": int64(0),
"dirty": int64(0),
}
acc.AssertContainsTaggedFields(t, measurement, fields2, tags2)
@@ -78,20 +78,20 @@ func TestPerDeviceGoodOutput(t *testing.T) {
}
fields3 := map[string]interface{}{
"length": 9178759168,
"metadata_blocksize": 16,
"metadata_used": 73370,
"metadata_total": 2811842,
"cache_blocksize": 640,
"cache_used": 33,
"cache_total": 24792130,
"read_hits": 2548,
"read_misses": 352929,
"write_hits": 280,
"write_misses": 524728,
"demotions": 0,
"promotions": 7,
"dirty": 0,
"length": int64(9178759168),
"metadata_blocksize": int64(16),
"metadata_used": int64(73370),
"metadata_total": int64(2811842),
"cache_blocksize": int64(640),
"cache_used": int64(33),
"cache_total": int64(24792130),
"read_hits": int64(2548),
"read_misses": int64(352929),
"write_hits": int64(280),
"write_misses": int64(524728),
"demotions": int64(0),
"promotions": int64(7),
"dirty": int64(0),
}
acc.AssertContainsTaggedFields(t, measurement, fields3, tags3)
}
@@ -113,20 +113,20 @@ func TestNotPerDeviceGoodOutput(t *testing.T) {
}
fields := map[string]interface{}{
"length": 9178759168,
"metadata_blocksize": 16,
"metadata_used": 73370,
"metadata_total": 2811842,
"cache_blocksize": 640,
"cache_used": 33,
"cache_total": 24792130,
"read_hits": 2548,
"read_misses": 352929,
"write_hits": 280,
"write_misses": 524728,
"demotions": 0,
"promotions": 7,
"dirty": 0,
"length": int64(9178759168),
"metadata_blocksize": int64(16),
"metadata_used": int64(73370),
"metadata_total": int64(2811842),
"cache_blocksize": int64(640),
"cache_used": int64(33),
"cache_total": int64(24792130),
"read_hits": int64(2548),
"read_misses": int64(352929),
"write_hits": int64(280),
"write_misses": int64(524728),
"demotions": int64(0),
"promotions": int64(7),
"dirty": int64(0),
}
acc.AssertContainsTaggedFields(t, measurement, fields, tags)
}

View File

@@ -148,15 +148,15 @@ func (h *Fluentd) Gather(acc telegraf.Accumulator) error {
}
if p.BufferQueueLength != nil {
tmpFields["buffer_queue_length"] = p.BufferQueueLength
tmpFields["buffer_queue_length"] = *p.BufferQueueLength
}
if p.RetryCount != nil {
tmpFields["retry_count"] = p.RetryCount
tmpFields["retry_count"] = *p.RetryCount
}
if p.BufferTotalQueuedSize != nil {
tmpFields["buffer_total_queued_size"] = p.BufferTotalQueuedSize
tmpFields["buffer_total_queued_size"] = *p.BufferTotalQueuedSize
}
if !((p.BufferQueueLength == nil) && (p.RetryCount == nil) && (p.BufferTotalQueuedSize == nil)) {

View File

@@ -122,12 +122,6 @@ func Test_parse(t *testing.T) {
}
func Test_Gather(t *testing.T) {
if testing.Short() {
t.Skip("Skipping Gather function test")
}
t.Log("Testing Gather function")
t.Logf("Start HTTP mock (%s) with sampleJSON", fluentdTest.Endpoint)
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -157,13 +151,13 @@ func Test_Gather(t *testing.T) {
assert.Equal(t, expectedOutput[0].PluginID, acc.Metrics[0].Tags["plugin_id"])
assert.Equal(t, expectedOutput[0].PluginType, acc.Metrics[0].Tags["plugin_type"])
assert.Equal(t, expectedOutput[0].PluginCategory, acc.Metrics[0].Tags["plugin_category"])
assert.Equal(t, expectedOutput[0].RetryCount, acc.Metrics[0].Fields["retry_count"])
assert.Equal(t, *expectedOutput[0].RetryCount, acc.Metrics[0].Fields["retry_count"])
assert.Equal(t, expectedOutput[1].PluginID, acc.Metrics[1].Tags["plugin_id"])
assert.Equal(t, expectedOutput[1].PluginType, acc.Metrics[1].Tags["plugin_type"])
assert.Equal(t, expectedOutput[1].PluginCategory, acc.Metrics[1].Tags["plugin_category"])
assert.Equal(t, expectedOutput[1].RetryCount, acc.Metrics[1].Fields["retry_count"])
assert.Equal(t, expectedOutput[1].BufferQueueLength, acc.Metrics[1].Fields["buffer_queue_length"])
assert.Equal(t, expectedOutput[1].BufferTotalQueuedSize, acc.Metrics[1].Fields["buffer_total_queued_size"])
assert.Equal(t, *expectedOutput[1].RetryCount, acc.Metrics[1].Fields["retry_count"])
assert.Equal(t, *expectedOutput[1].BufferQueueLength, acc.Metrics[1].Fields["buffer_queue_length"])
assert.Equal(t, *expectedOutput[1].BufferTotalQueuedSize, acc.Metrics[1].Fields["buffer_total_queued_size"])
}

View File

@@ -98,6 +98,7 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) {
}
client := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DisableKeepAlives: true,
TLSClientConfig: tlsCfg,
},
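
`http.ProxyFromEnvironment` makes the client honor the standard `HTTP_PROXY`, `HTTPS_PROXY`, and `NO_PROXY` variables (and their lowercase forms). A small sketch of the behavior; the proxy address is illustrative only:

```go
package main

import (
	"fmt"
	"net/http"
	"os"
)

func main() {
	// The environment is read once per process, on first use.
	os.Setenv("HTTP_PROXY", "http://corporate.proxy:3128") // illustrative value

	req, _ := http.NewRequest("GET", "http://example.com/", nil)
	proxyURL, err := http.ProxyFromEnvironment(req)
	fmt.Println(proxyURL, err) // http://corporate.proxy:3128 <nil>
}
```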

View File

@@ -1,6 +1,7 @@
package httpjson
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
@@ -15,6 +16,10 @@ import (
"github.com/influxdata/telegraf/plugins/parsers"
)
var (
utf8BOM = []byte("\xef\xbb\xbf")
)
// HttpJson struct
type HttpJson struct {
Name string
@@ -170,7 +175,6 @@ func (h *HttpJson) gatherServer(
serverURL string,
) error {
resp, responseTime, err := h.sendRequest(serverURL)
if err != nil {
return err
}
@@ -266,6 +270,7 @@ func (h *HttpJson) sendRequest(serverURL string) (string, float64, error) {
if err != nil {
return string(body), responseTime, err
}
body = bytes.TrimPrefix(body, utf8BOM)
// Process response
if resp.StatusCode != http.StatusOK {
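
Some endpoints prepend a UTF-8 byte order mark, which `encoding/json` rejects as an invalid character; trimming the three BOM bytes before decoding restores parsing. A self-contained example of the fix:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

var utf8BOM = []byte("\xef\xbb\xbf")

func main() {
	body := []byte("\xef\xbb\xbf[{\"value\":17}]") // payload with a BOM
	body = bytes.TrimPrefix(body, utf8BOM)

	var out []map[string]interface{}
	err := json.Unmarshal(body, &out)
	fmt.Println(err, out) // <nil> [map[value:17]]
}
```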

View File

@@ -477,15 +477,13 @@ func TestHttpJsonBadJson(t *testing.T) {
assert.Equal(t, 0, acc.NFields())
}
// Test response to empty string as response objectgT
// Test response to empty string as response object
func TestHttpJsonEmptyResponse(t *testing.T) {
httpjson := genMockHttpJson(empty, 200)
var acc testutil.Accumulator
err := acc.GatherError(httpjson[0].Gather)
assert.Error(t, err)
assert.Equal(t, 0, acc.NFields())
assert.NoError(t, err)
}
// Test that the proper values are ignored or collected
@@ -560,3 +558,18 @@ func TestHttpJsonArray200Tags(t *testing.T) {
}
}
}
var jsonBOM = []byte("\xef\xbb\xbf[{\"value\":17}]")
// TestHttpJsonBOM tests that UTF-8 JSON with a BOM can be parsed
func TestHttpJsonBOM(t *testing.T) {
httpjson := genMockHttpJson(string(jsonBOM), 200)
for _, service := range httpjson {
if service.Name == "other_webapp" {
var acc testutil.Accumulator
err := acc.GatherError(service.Gather)
require.NoError(t, err)
}
}
}

View File

@@ -95,7 +95,7 @@ const measurement = "iptables"
var errParse = errors.New("Cannot parse iptables list information")
var chainNameRe = regexp.MustCompile(`^Chain\s+(\S+)`)
var fieldsHeaderRe = regexp.MustCompile(`^\s*pkts\s+bytes\s+`)
var commentRe = regexp.MustCompile(`\s*/\*\s*(.+?)\s*\*/\s*`)
var valuesRe = regexp.MustCompile(`^\s*(\d+)\s+(\d+)\s+.*?/\*\s*(.+?)\s*\*/\s*`)
func (ipt *Iptables) parseAndGather(data string, acc telegraf.Accumulator) error {
lines := strings.Split(data, "\n")
@@ -110,21 +110,14 @@ func (ipt *Iptables) parseAndGather(data string, acc telegraf.Accumulator) error
return errParse
}
for _, line := range lines[2:] {
tokens := strings.Fields(line)
if len(tokens) < 10 {
matches := valuesRe.FindStringSubmatch(line)
if len(matches) != 4 {
continue
}
pkts := tokens[0]
bytes := tokens[1]
end := strings.Join(tokens[9:], " ")
matches := commentRe.FindStringSubmatch(end)
if matches == nil {
continue
}
comment := matches[1]
pkts := matches[1]
bytes := matches[2]
comment := matches[3]
tags := map[string]string{"table": ipt.Table, "chain": mchain[1], "ruleid": comment}
fields := make(map[string]interface{})
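
The rewritten parser extracts pkts, bytes, and the rule comment with one anchored regex instead of counting whitespace-separated columns, so a rule with an empty target column no longer shifts the fields and gets skipped. A sketch against the sample `iptables -nvL` line from the test below:

```go
package main

import (
	"fmt"
	"regexp"
)

var valuesRe = regexp.MustCompile(`^\s*(\d+)\s+(\d+)\s+.*?/\*\s*(.+?)\s*\*/\s*`)

func main() {
	line := `  123   456 all  --  eth0   *       0.0.0.0/0            0.0.0.0/0            /* all_recv */`
	if m := valuesRe.FindStringSubmatch(line); m != nil {
		fmt.Println(m[1], m[2], m[3]) // 123 456 all_recv
	}
}
```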

View File

@@ -154,68 +154,85 @@ func TestIptables_Gather(t *testing.T) {
tags: []map[string]string{},
fields: [][]map[string]interface{}{},
},
{ // 11 - all target and ports
table: "all_recv",
chains: []string{"accountfwd"},
values: []string{
`Chain accountfwd (1 references)
pkts bytes target prot opt in out source destination
123 456 all -- eth0 * 0.0.0.0/0 0.0.0.0/0 /* all_recv */
`},
tags: []map[string]string{
map[string]string{"table": "all_recv", "chain": "accountfwd", "ruleid": "all_recv"},
},
fields: [][]map[string]interface{}{
{map[string]interface{}{"pkts": uint64(123), "bytes": uint64(456)}},
},
},
}
for i, tt := range tests {
i++
ipt := &Iptables{
Table: tt.table,
Chains: tt.chains,
lister: func(table, chain string) (string, error) {
if len(tt.values) > 0 {
v := tt.values[0]
tt.values = tt.values[1:]
return v, nil
}
return "", nil
},
}
acc := new(testutil.Accumulator)
err := acc.GatherError(ipt.Gather)
if !reflect.DeepEqual(tt.err, err) {
t.Errorf("%d: expected error '%#v' got '%#v'", i, tt.err, err)
}
if tt.table == "" {
n := acc.NFields()
if n != 0 {
t.Errorf("%d: expected 0 fields if empty table got %d", i, n)
t.Run(tt.table, func(t *testing.T) {
i++
ipt := &Iptables{
Table: tt.table,
Chains: tt.chains,
lister: func(table, chain string) (string, error) {
if len(tt.values) > 0 {
v := tt.values[0]
tt.values = tt.values[1:]
return v, nil
}
return "", nil
},
}
continue
}
if len(tt.chains) == 0 {
n := acc.NFields()
if n != 0 {
t.Errorf("%d: expected 0 fields if empty chains got %d", i, n)
acc := new(testutil.Accumulator)
err := acc.GatherError(ipt.Gather)
if !reflect.DeepEqual(tt.err, err) {
t.Errorf("%d: expected error '%#v' got '%#v'", i, tt.err, err)
}
continue
}
if len(tt.tags) == 0 {
n := acc.NFields()
if n != 0 {
t.Errorf("%d: expected 0 values got %d", i, n)
if tt.table == "" {
n := acc.NFields()
if n != 0 {
t.Errorf("%d: expected 0 fields if empty table got %d", i, n)
}
return
}
continue
}
n := 0
for j, tags := range tt.tags {
for k, fields := range tt.fields[j] {
if len(acc.Metrics) < n+1 {
t.Errorf("%d: expected at least %d values got %d", i, n+1, len(acc.Metrics))
break
if len(tt.chains) == 0 {
n := acc.NFields()
if n != 0 {
t.Errorf("%d: expected 0 fields if empty chains got %d", i, n)
}
m := acc.Metrics[n]
if !reflect.DeepEqual(m.Measurement, measurement) {
t.Errorf("%d %d %d: expected measurement '%#v' got '%#v'\n", i, j, k, measurement, m.Measurement)
}
if !reflect.DeepEqual(m.Tags, tags) {
t.Errorf("%d %d %d: expected tags\n%#v got\n%#v\n", i, j, k, tags, m.Tags)
}
if !reflect.DeepEqual(m.Fields, fields) {
t.Errorf("%d %d %d: expected fields\n%#v got\n%#v\n", i, j, k, fields, m.Fields)
}
n++
return
}
}
if len(tt.tags) == 0 {
n := acc.NFields()
if n != 0 {
t.Errorf("%d: expected 0 values got %d", i, n)
}
return
}
n := 0
for j, tags := range tt.tags {
for k, fields := range tt.fields[j] {
if len(acc.Metrics) < n+1 {
t.Errorf("%d: expected at least %d values got %d", i, n+1, len(acc.Metrics))
break
}
m := acc.Metrics[n]
if !reflect.DeepEqual(m.Measurement, measurement) {
t.Errorf("%d %d %d: expected measurement '%#v' got '%#v'\n", i, j, k, measurement, m.Measurement)
}
if !reflect.DeepEqual(m.Tags, tags) {
t.Errorf("%d %d %d: expected tags\n%#v got\n%#v\n", i, j, k, tags, m.Tags)
}
if !reflect.DeepEqual(m.Fields, fields) {
t.Errorf("%d %d %d: expected fields\n%#v got\n%#v\n", i, j, k, fields, m.Fields)
}
n++
}
}
})
}
}

View File

@@ -13,6 +13,8 @@ The plugin expects messages in the
servers = ["localhost:1883"]
## MQTT QoS, must be 0, 1, or 2
qos = 0
## Connection timeout for initial connection in seconds
connection_timeout = "30s"
## Topics to subscribe to
topics = [

View File

@@ -15,12 +15,16 @@ import (
"github.com/eclipse/paho.mqtt.golang"
)
// 30 Seconds is the default used by paho.mqtt.golang
var defaultConnectionTimeout = internal.Duration{Duration: 30 * time.Second}
type MQTTConsumer struct {
Servers []string
Topics []string
Username string
Password string
QoS int `toml:"qos"`
Servers []string
Topics []string
Username string
Password string
QoS int `toml:"qos"`
ConnectionTimeout internal.Duration `toml:"connection_timeout"`
parser parsers.Parser
@@ -48,13 +52,15 @@ type MQTTConsumer struct {
// keep the accumulator internally:
acc telegraf.Accumulator
started bool
connected bool
}
var sampleConfig = `
servers = ["localhost:1883"]
## MQTT QoS, must be 0, 1, or 2
qos = 0
## Connection timeout for initial connection in seconds
connection_timeout = "30s"
## Topics to subscribe to
topics = [
@@ -103,7 +109,7 @@ func (m *MQTTConsumer) SetParser(parser parsers.Parser) {
func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
m.Lock()
defer m.Unlock()
m.started = false
m.connected = false
if m.PersistentSession && m.ClientID == "" {
return fmt.Errorf("ERROR MQTT Consumer: When using persistent_session" +
@@ -115,26 +121,40 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS)
}
if m.ConnectionTimeout.Duration < 1*time.Second {
return fmt.Errorf("MQTT Consumer, invalid connection_timeout value: %s", m.ConnectionTimeout.Duration)
}
opts, err := m.createOpts()
if err != nil {
return err
}
m.client = mqtt.NewClient(opts)
if token := m.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
m.in = make(chan mqtt.Message, 1000)
m.done = make(chan struct{})
m.connect()
return nil
}
func (m *MQTTConsumer) connect() error {
if token := m.client.Connect(); token.Wait() && token.Error() != nil {
err := token.Error()
log.Printf("D! MQTT Consumer, connection error - %v", err)
return err
}
go m.receiver()
return nil
}
func (m *MQTTConsumer) onConnect(c mqtt.Client) {
log.Printf("I! MQTT Client Connected")
if !m.PersistentSession || !m.started {
if !m.PersistentSession || !m.connected {
topics := make(map[string]byte)
for _, topic := range m.Topics {
topics[topic] = byte(m.QoS)
@@ -145,7 +165,7 @@ func (m *MQTTConsumer) onConnect(c mqtt.Client) {
m.acc.AddError(fmt.Errorf("E! MQTT Subscribe Error\ntopics: %s\nerror: %s",
strings.Join(m.Topics[:], ","), subscribeToken.Error()))
}
m.started = true
m.connected = true
}
return
}
@@ -186,18 +206,27 @@ func (m *MQTTConsumer) recvMessage(_ mqtt.Client, msg mqtt.Message) {
func (m *MQTTConsumer) Stop() {
m.Lock()
defer m.Unlock()
close(m.done)
m.client.Disconnect(200)
m.started = false
if m.connected {
close(m.done)
m.client.Disconnect(200)
m.connected = false
}
}
func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error {
if !m.connected {
m.connect()
}
return nil
}
func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
opts := mqtt.NewClientOptions()
opts.ConnectTimeout = m.ConnectionTimeout.Duration
if m.ClientID == "" {
opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5))
} else {
@@ -238,11 +267,14 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
opts.SetCleanSession(!m.PersistentSession)
opts.SetOnConnectHandler(m.onConnect)
opts.SetConnectionLostHandler(m.onConnectionLost)
return opts, nil
}
func init() {
inputs.Add("mqtt_consumer", func() telegraf.Input {
return &MQTTConsumer{}
return &MQTTConsumer{
ConnectionTimeout: defaultConnectionTimeout,
}
})
}

View File

@@ -22,11 +22,13 @@ const (
func newTestMQTTConsumer() (*MQTTConsumer, chan mqtt.Message) {
in := make(chan mqtt.Message, 100)
n := &MQTTConsumer{
Topics: []string{"telegraf"},
Servers: []string{"localhost:1883"},
in: in,
done: make(chan struct{}),
Topics: []string{"telegraf"},
Servers: []string{"localhost:1883"},
in: in,
done: make(chan struct{}),
connected: true,
}
return n, in
}
@@ -131,6 +133,7 @@ func TestRunParserAndGather(t *testing.T) {
n, in := newTestMQTTConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()

View File

@@ -2,11 +2,12 @@ package sqlserver
import (
"database/sql"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
// go-mssqldb initialization
_ "github.com/zensqlmonitor/go-mssqldb"
)
@@ -1022,7 +1023,7 @@ CREATE TABLE #PCounters
Primary Key(object_name, counter_name, instance_name)
);
INSERT #PCounters
SELECT RTrim(spi.object_name) object_name
SELECT DISTINCT RTrim(spi.object_name) object_name
, RTrim(spi.counter_name) counter_name
, RTrim(spi.instance_name) instance_name
, spi.cntr_value
@@ -1044,7 +1045,7 @@ CREATE TABLE #CCounters
Primary Key(object_name, counter_name, instance_name)
);
INSERT #CCounters
SELECT RTrim(spi.object_name) object_name
SELECT DISTINCT RTrim(spi.object_name) object_name
, RTrim(spi.counter_name) counter_name
, RTrim(spi.instance_name) instance_name
, spi.cntr_value
@@ -1436,16 +1437,16 @@ SELECT
, type = 'Wait stats'
---- values
, [I/O] = SUM([I/O])
, [Latch] = SUM([Latch])
, [Lock] = SUM([Lock])
, [Network] = SUM([Network])
, [Service broker] = SUM([Service broker])
, [Memory] = SUM([Memory])
, [Buffer] = SUM([Buffer])
, [Latch] = SUM([LATCH])
, [Lock] = SUM([LOCK])
, [Network] = SUM([NETWORK])
, [Service broker] = SUM([SERVICE BROKER])
, [Memory] = SUM([MEMORY])
, [Buffer] = SUM([BUFFER])
, [CLR] = SUM([CLR])
, [SQLOS] = SUM([SQLOS])
, [XEvent] = SUM([XEvent])
, [Other] = SUM([Other])
, [XEvent] = SUM([XEVENT])
, [Other] = SUM([OTHER])
, [Total] = SUM([I/O]+[LATCH]+[LOCK]+[NETWORK]+[SERVICE BROKER]+[MEMORY]+[BUFFER]+[CLR]+[XEVENT]+[SQLOS]+[OTHER])
FROM
(
@@ -1479,16 +1480,16 @@ SELECT
, type = 'Wait stats'
---- values
, [I/O] = SUM([I/O])
, [Latch] = SUM([Latch])
, [Lock] = SUM([Lock])
, [Network] = SUM([Network])
, [Service broker] = SUM([Service broker])
, [Memory] = SUM([Memory])
, [Buffer] = SUM([Buffer])
, [Latch] = SUM([LATCH])
, [Lock] = SUM([LOCK])
, [Network] = SUM([NETWORK])
, [Service broker] = SUM([SERVICE BROKER])
, [Memory] = SUM([MEMORY])
, [Buffer] = SUM([BUFFER])
, [CLR] = SUM([CLR])
, [SQLOS] = SUM([SQLOS])
, [XEvent] = SUM([XEvent])
, [Other] = SUM([Other])
, [XEvent] = SUM([XEVENT])
, [Other] = SUM([OTHER])
, [Total] = SUM([I/O]+[LATCH]+[LOCK]+[NETWORK]+[SERVICE BROKER]+[MEMORY]+[BUFFER]+[CLR]+[XEVENT]+[SQLOS]+[OTHER])
FROM
(

View File

@@ -101,8 +101,15 @@ func (rs *RunningStats) Percentile(n int) float64 {
}
i := int(float64(len(rs.perc)) * float64(n) / float64(100))
if i < 0 {
i = 0
}
return rs.perc[i]
return rs.perc[clamp(i, 0, len(rs.perc)-1)]
}
func clamp(i int, min int, max int) int {
if i < min {
return min
}
if i > max {
return max
}
return i
}
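
With `len(rs.perc)` samples, `Percentile(100)` computed `i == len(rs.perc)`, one past the last element, which panicked; clamping the index into `[0, len-1]` makes p0 and p100 return the lowest and highest sample. A minimal reproduction of the index math:

```go
package main

import "fmt"

func clamp(i, min, max int) int {
	if i < min {
		return min
	}
	if i > max {
		return max
	}
	return i
}

func main() {
	perc := []float64{5, 11, 32, 45} // sorted samples
	n := 100
	i := int(float64(len(perc)) * float64(n) / float64(100)) // i == 4: out of range
	fmt.Println(perc[clamp(i, 0, len(perc)-1)])              // 45, no panic
}
```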

View File

@@ -23,12 +23,18 @@ func TestRunningStats_Single(t *testing.T) {
if rs.Lower() != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Lower())
}
if rs.Percentile(100) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(100))
}
if rs.Percentile(90) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(90))
}
if rs.Percentile(50) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(50))
}
if rs.Percentile(0) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(0))
}
if rs.Count() != 1 {
t.Errorf("Expected %v, got %v", 1, rs.Count())
}
@@ -58,12 +64,18 @@ func TestRunningStats_Duplicate(t *testing.T) {
if rs.Lower() != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Lower())
}
if rs.Percentile(100) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(100))
}
if rs.Percentile(90) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(90))
}
if rs.Percentile(50) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(50))
}
if rs.Percentile(0) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(0))
}
if rs.Count() != 4 {
t.Errorf("Expected %v, got %v", 4, rs.Count())
}
@@ -93,12 +105,18 @@ func TestRunningStats(t *testing.T) {
if rs.Lower() != 5 {
t.Errorf("Expected %v, got %v", 5, rs.Lower())
}
if rs.Percentile(100) != 45 {
t.Errorf("Expected %v, got %v", 45, rs.Percentile(100))
}
if rs.Percentile(90) != 32 {
t.Errorf("Expected %v, got %v", 32, rs.Percentile(90))
}
if rs.Percentile(50) != 11 {
t.Errorf("Expected %v, got %v", 11, rs.Percentile(50))
}
if rs.Percentile(0) != 5 {
t.Errorf("Expected %v, got %v", 5, rs.Percentile(0))
}
if rs.Count() != 16 {
t.Errorf("Expected %v, got %v", 4, rs.Count())
}

View File

@@ -251,14 +251,14 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
}
for _, metric := range s.gauges {
acc.AddFields(metric.name, metric.fields, metric.tags, now)
acc.AddGauge(metric.name, metric.fields, metric.tags, now)
}
if s.DeleteGauges {
s.gauges = make(map[string]cachedgauge)
}
for _, metric := range s.counters {
acc.AddFields(metric.name, metric.fields, metric.tags, now)
acc.AddCounter(metric.name, metric.fields, metric.tags, now)
}
if s.DeleteCounters {
s.counters = make(map[string]cachedcounter)

View File

@@ -11,7 +11,7 @@ import (
type CPUStats struct {
ps PS
lastStats []cpu.TimesStat
lastStats map[string]cpu.TimesStat
PerCPU bool `toml:"percpu"`
TotalCPU bool `toml:"totalcpu"`
@@ -53,7 +53,7 @@ func (s *CPUStats) Gather(acc telegraf.Accumulator) error {
}
now := time.Now()
for i, cts := range times {
for _, cts := range times {
tags := map[string]string{
"cpu": cts.CPU,
}
@@ -86,13 +86,16 @@ func (s *CPUStats) Gather(acc telegraf.Accumulator) error {
// If it's the 1st gather, can't get CPU Usage stats yet
continue
}
lastCts := s.lastStats[i]
lastCts, ok := s.lastStats[cts.CPU]
if !ok {
continue
}
lastTotal := totalCpuTime(lastCts)
lastActive := activeCpuTime(lastCts)
totalDelta := total - lastTotal
if totalDelta < 0 {
s.lastStats = times
return fmt.Errorf("Error: current total CPU time is less than previous total CPU time")
}
@@ -118,7 +121,10 @@ func (s *CPUStats) Gather(acc telegraf.Accumulator) error {
acc.AddGauge("cpu", fieldsG, tags, now)
}
s.lastStats = times
s.lastStats = make(map[string]cpu.TimesStat)
for _, cts := range times {
s.lastStats[cts.CPU] = cts
}
return nil
}
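
Keying the previous sample by CPU name instead of slice index means a core that appears or disappears between gathers (as LXC can report) is simply skipped until it has a prior sample, rather than causing an out-of-range lookup. A sketch of the lookup; `timesStat` is a hypothetical stand-in for gopsutil's `cpu.TimesStat`:

```go
package main

import "fmt"

// timesStat is a hypothetical stand-in for gopsutil's cpu.TimesStat.
type timesStat struct {
	CPU  string
	User float64
}

func main() {
	last := map[string]timesStat{"cpu0": {CPU: "cpu0", User: 1.0}}

	// cpu1 appeared since the previous gather.
	current := []timesStat{{CPU: "cpu0", User: 2.0}, {CPU: "cpu1", User: 0.5}}
	for _, cts := range current {
		prev, ok := last[cts.CPU]
		if !ok {
			continue // first sample for this CPU; no delta yet
		}
		fmt.Println(cts.CPU, "user delta:", cts.User-prev.User) // cpu0 user delta: 1
	}
}
```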

View File

@@ -149,3 +149,38 @@ func assertContainsTaggedFloat(
measurement, delta, expectedValue, actualValue)
assert.Fail(t, msg)
}
// TestCPUCountChange tests that no errors are encountered if the number of
// CPUs increases as reported with LXC.
func TestCPUCountIncrease(t *testing.T) {
var mps MockPS
var mps2 MockPS
var acc testutil.Accumulator
var err error
cs := NewCPUStats(&mps)
mps.On("CPUTimes").Return(
[]cpu.TimesStat{
cpu.TimesStat{
CPU: "cpu0",
},
}, nil)
err = cs.Gather(&acc)
require.NoError(t, err)
mps2.On("CPUTimes").Return(
[]cpu.TimesStat{
cpu.TimesStat{
CPU: "cpu0",
},
cpu.TimesStat{
CPU: "cpu1",
},
}, nil)
cs.ps = &mps2
err = cs.Gather(&acc)
require.NoError(t, err)
}

View File

@@ -41,7 +41,7 @@ func (k *KernelVmstat) Gather(acc telegraf.Accumulator) error {
// We only want the even number index as that contain the stat name.
if i%2 == 0 {
// Convert the stat value into an integer.
m, err := strconv.Atoi(string(dataFields[i+1]))
m, err := strconv.ParseInt(string(dataFields[i+1]), 10, 64)
if err != nil {
return err
}

View File

@@ -48,7 +48,7 @@ func TestFullVmStatProcFile(t *testing.T) {
"nr_isolated_anon": int64(0),
"nr_isolated_file": int64(0),
"nr_shmem": int64(541689),
"numa_hit": int64(5113399878),
"numa_hit": int64(6690743595),
"numa_miss": int64(0),
"numa_foreign": int64(0),
"numa_interleave": int64(35793),
@@ -200,7 +200,7 @@ nr_writeback_temp 0
nr_isolated_anon 0
nr_isolated_file 0
nr_shmem 541689
numa_hit 5113399878
numa_hit 6690743595
numa_miss 0
numa_foreign 0
numa_interleave 35793

View File

@@ -48,6 +48,9 @@ func (a *Amon) Connect() error {
return fmt.Errorf("serverkey and amon_instance are required fields for amon output")
}
a.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: a.Timeout.Duration,
}
return nil

View File

@@ -55,7 +55,11 @@ func (d *Datadog) Connect() error {
if d.Apikey == "" {
return fmt.Errorf("apikey is a required field for datadog output")
}
d.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: d.Timeout.Duration,
}
return nil

View File

@@ -4,13 +4,7 @@ import "io"
type Client interface {
Query(command string) error
Write(b []byte) (int, error)
WriteWithParams(b []byte, params WriteParams) (int, error)
WriteStream(b io.Reader, contentLength int) (int, error)
WriteStreamWithParams(b io.Reader, contentLength int, params WriteParams) (int, error)
WriteStream(b io.Reader) error
Close() error
}

View File

@@ -53,6 +53,7 @@ func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) {
}
} else {
transport = http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: config.TLSConfig,
}
}
@@ -135,60 +136,13 @@ func (c *httpClient) Query(command string) error {
return c.doRequest(req, http.StatusOK)
}
func (c *httpClient) Write(b []byte) (int, error) {
req, err := c.makeWriteRequest(bytes.NewReader(b), len(b), c.writeURL)
func (c *httpClient) WriteStream(r io.Reader) error {
req, err := c.makeWriteRequest(r, c.writeURL)
if err != nil {
return 0, nil
return err
}
err = c.doRequest(req, http.StatusNoContent)
if err == nil {
return len(b), nil
}
return 0, err
}
func (c *httpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
req, err := c.makeWriteRequest(bytes.NewReader(b), len(b), writeURL(c.url, wp))
if err != nil {
return 0, nil
}
err = c.doRequest(req, http.StatusNoContent)
if err == nil {
return len(b), nil
}
return 0, err
}
func (c *httpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
req, err := c.makeWriteRequest(r, contentLength, c.writeURL)
if err != nil {
return 0, nil
}
err = c.doRequest(req, http.StatusNoContent)
if err == nil {
return contentLength, nil
}
return 0, err
}
func (c *httpClient) WriteStreamWithParams(
r io.Reader,
contentLength int,
wp WriteParams,
) (int, error) {
req, err := c.makeWriteRequest(r, contentLength, writeURL(c.url, wp))
if err != nil {
return 0, nil
}
err = c.doRequest(req, http.StatusNoContent)
if err == nil {
return contentLength, nil
}
return 0, err
return c.doRequest(req, http.StatusNoContent)
}
func (c *httpClient) doRequest(
@@ -230,7 +184,6 @@ func (c *httpClient) doRequest(
func (c *httpClient) makeWriteRequest(
body io.Reader,
contentLength int,
writeURL string,
) (*http.Request, error) {
req, err := c.makeRequest(writeURL, body)
@@ -239,8 +192,6 @@ func (c *httpClient) makeWriteRequest(
}
if c.config.ContentEncoding == "gzip" {
req.Header.Set("Content-Encoding", "gzip")
} else {
req.Header.Set("Content-Length", fmt.Sprint(contentLength))
}
return req, nil
}

View File

@@ -110,66 +110,8 @@ func TestHTTPClient_Write(t *testing.T) {
client, err := NewHTTP(config, wp)
defer client.Close()
assert.NoError(t, err)
n, err := client.Write([]byte("cpu value=99\n"))
assert.Equal(t, 13, n)
assert.NoError(t, err)
_, err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")), 13)
assert.NoError(t, err)
}
func TestHTTPClient_WriteParamsOverride(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
// test that database is set properly
if r.FormValue("db") != "override" {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}],"error":"wrong db name"}`)
}
// Validate the request body:
buf := make([]byte, 100)
n, _ := r.Body.Read(buf)
expected := "cpu value=99"
got := string(buf[0 : n-1])
if expected != got {
w.WriteHeader(http.StatusTeapot)
w.Header().Set("Content-Type", "application/json")
msg := fmt.Sprintf(`{"results":[{}],"error":"expected [%s], got [%s]"}`, expected, got)
fmt.Fprintln(w, msg)
}
w.WriteHeader(http.StatusNoContent)
w.Header().Set("Content-Type", "application/json")
case "/query":
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
}
}))
defer ts.Close()
config := HTTPConfig{
URL: ts.URL,
}
defaultWP := WriteParams{
Database: "test",
}
client, err := NewHTTP(config, defaultWP)
defer client.Close()
assert.NoError(t, err)
// test that WriteWithParams overrides the default write params
wp := WriteParams{
Database: "override",
}
n, err := client.WriteWithParams([]byte("cpu value=99\n"), wp)
assert.Equal(t, 13, n)
assert.NoError(t, err)
_, err = client.WriteStreamWithParams(bytes.NewReader([]byte("cpu value=99\n")), 13, wp)
err = client.WriteStream(bytes.NewReader([]byte("cpu value=99\n")))
assert.NoError(t, err)
}
@@ -197,23 +139,7 @@ func TestHTTPClient_Write_Errors(t *testing.T) {
assert.NoError(t, err)
lp := []byte("cpu value=99\n")
n, err := client.Write(lp)
assert.Equal(t, 0, n)
assert.Error(t, err)
n, err = client.WriteStream(bytes.NewReader(lp), 13)
assert.Equal(t, 0, n)
assert.Error(t, err)
wp := WriteParams{
Database: "override",
}
n, err = client.WriteWithParams(lp, wp)
assert.Equal(t, 0, n)
assert.Error(t, err)
n, err = client.WriteStreamWithParams(bytes.NewReader(lp), 13, wp)
assert.Equal(t, 0, n)
err = client.WriteStream(bytes.NewReader(lp))
assert.Error(t, err)
}

View File

@@ -1,7 +1,6 @@
package client
import (
"bytes"
"fmt"
"io"
"log"
@@ -62,18 +61,8 @@ func (c *udpClient) Query(command string) error {
return nil
}
// Write will send the byte stream to the given UDP client endpoint
func (c *udpClient) Write(b []byte) (int, error) {
return c.WriteStream(bytes.NewReader(b), -1)
}
// WriteWithParams are ignored by the UDP client, will forward to WriteStream
func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
return c.WriteStream(bytes.NewReader(b), -1)
}
// WriteStream will send the provided data through to the client, contentLength is ignored by the UDP client
func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
func (c *udpClient) WriteStream(r io.Reader) error {
var totaln int
for {
nR, err := r.Read(c.buffer)
@@ -81,14 +70,14 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
break
}
if err != io.EOF && err != nil {
return totaln, err
return err
}
if c.buffer[nR-1] == uint8('\n') {
nW, err := c.conn.Write(c.buffer[0:nR])
totaln += nW
if err != nil {
return totaln, err
return err
}
} else {
log.Printf("E! Could not fit point into UDP payload; dropping")
@@ -99,7 +88,7 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
break
}
if err != io.EOF && err != nil {
return totaln, err
return err
}
if c.buffer[nR-1] == uint8('\n') {
break
@@ -107,13 +96,7 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
}
}
}
return totaln, nil
}
// WriteStreamWithParams will forward the stream to the client backend, contentLength is ignored by the UDP client
// write params are ignored by the UDP client
func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) {
return c.WriteStream(r, -1)
return nil
}
// Close will terminate the provided client connection

View File

@@ -9,7 +9,6 @@ import (
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestUDPClient(t *testing.T) {
@@ -65,43 +64,6 @@ func TestUDPClient_Write(t *testing.T) {
}
}()
// test sending simple metric
n, err := client.Write([]byte("cpu value=99\n"))
assert.Equal(t, n, 13)
assert.NoError(t, err)
pkt := <-packets
assert.Equal(t, "cpu value=99\n", pkt)
wp := WriteParams{}
//
// Using WriteStream() & a metric.Reader:
config3 := UDPConfig{
URL: "udp://localhost:8199",
PayloadSize: 40,
}
client3, err := NewUDP(config3)
assert.NoError(t, err)
now := time.Unix(1484142942, 0)
m1, _ := metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, now)
m2, _ := metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, now)
m3, _ := metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, now)
ms := []telegraf.Metric{m1, m2, m3}
mReader := metric.NewReader(ms)
n, err = client3.WriteStreamWithParams(mReader, 10, wp)
// 3 metrics at 35 bytes each (including the newline)
assert.Equal(t, 105, n)
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
pkt = <-packets
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
pkt = <-packets
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
assert.NoError(t, client.Close())
config = UDPConfig{
@@ -112,17 +74,15 @@ func TestUDPClient_Write(t *testing.T) {
assert.NoError(t, err)
ts := time.Unix(1484142943, 0)
m1, _ = metric.New("test", map[string]string{},
m1, _ := metric.New("test", map[string]string{},
map[string]interface{}{"this_is_a_very_long_field_name": 1.1}, ts)
m2, _ = metric.New("test", map[string]string{},
m2, _ := metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, ts)
ms = []telegraf.Metric{m1, m2}
ms := []telegraf.Metric{m1, m2}
reader := metric.NewReader(ms)
n, err = client4.WriteStream(reader, 0)
err = client4.WriteStream(reader)
assert.NoError(t, err)
require.Equal(t, 35, n)
assert.NoError(t, err)
pkt = <-packets
pkt := <-packets
assert.Equal(t, "test value=1.1 1484142943000000000\n", pkt)
assert.NoError(t, client4.Close())

View File

@@ -185,12 +185,6 @@ func (i *InfluxDB) Description() string {
// Write will choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error.
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
bufsize := 0
for _, m := range metrics {
bufsize += m.Len()
}
r := metric.NewReader(metrics)
// This will get set to nil if a successful write occurs
@@ -198,7 +192,7 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
p := rand.Perm(len(i.clients))
for _, n := range p {
if _, e := i.clients[n].WriteStream(r, bufsize); e != nil {
if e := i.clients[n].WriteStream(r); e != nil {
// If the database was not found, try to recreate it:
if strings.Contains(e.Error(), "database not found") {
errc := i.clients[n].Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))

View File

@@ -80,6 +80,9 @@ func (l *Librato) Connect() error {
"api_user and api_token are required fields for librato output")
}
l.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: l.Timeout.Duration,
}
return nil

View File

@@ -5,6 +5,7 @@ import (
"log"
"net"
"net/url"
"regexp"
"sort"
"strconv"
"strings"
@@ -13,6 +14,16 @@ import (
"github.com/influxdata/telegraf/plugins/outputs"
)
var (
allowedChars = regexp.MustCompile(`[^a-zA-Z0-9-_./\p{L}]`)
hypenChars = strings.NewReplacer(
"@", "-",
"*", "-",
`%`, "-",
"#", "-",
"$", "-")
)
type OpenTSDB struct {
Prefix string
@@ -24,9 +35,6 @@ type OpenTSDB struct {
Debug bool
}
var sanitizedChars = strings.NewReplacer("@", "-", "*", "-", " ", "_",
`%`, "-", "#", "-", "$", "-", ":", "_")
var sampleConfig = `
## prefix for metrics keys
prefix = "my.specific.prefix."
@@ -125,8 +133,7 @@ func (o *OpenTSDB) WriteHttp(metrics []telegraf.Metric, u *url.URL) error {
}
metric := &HttpMetric{
Metric: sanitizedChars.Replace(fmt.Sprintf("%s%s_%s",
o.Prefix, m.Name(), fieldName)),
Metric: sanitize(fmt.Sprintf("%s%s_%s", o.Prefix, m.Name(), fieldName)),
Tags: tags,
Timestamp: now,
Value: value,
@@ -176,7 +183,7 @@ func (o *OpenTSDB) WriteTelnet(metrics []telegraf.Metric, u *url.URL) error {
}
messageLine := fmt.Sprintf("put %s %v %s %s\n",
sanitizedChars.Replace(fmt.Sprintf("%s%s_%s", o.Prefix, m.Name(), fieldName)),
sanitize(fmt.Sprintf("%s%s_%s", o.Prefix, m.Name(), fieldName)),
now, metricValue, tags)
_, err := connection.Write([]byte(messageLine))
@@ -192,7 +199,7 @@ func (o *OpenTSDB) WriteTelnet(metrics []telegraf.Metric, u *url.URL) error {
func cleanTags(tags map[string]string) map[string]string {
tagSet := make(map[string]string, len(tags))
for k, v := range tags {
tagSet[sanitizedChars.Replace(k)] = sanitizedChars.Replace(v)
tagSet[sanitize(k)] = sanitize(v)
}
return tagSet
}
@@ -236,6 +243,13 @@ func (o *OpenTSDB) Close() error {
return nil
}
func sanitize(value string) string {
// Apply special hyphenation rules to preserve backwards compatibility
value = hypenChars.Replace(value)
// Replace any remaining illegal chars
return allowedChars.ReplaceAllLiteralString(value, "_")
}
func init() {
outputs.Add("opentsdb", func() telegraf.Output {
return &OpenTSDB{}

View File

@@ -10,9 +10,10 @@ import (
"strconv"
"testing"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
//"github.com/stretchr/testify/require"
)
func TestCleanTags(t *testing.T) {
@@ -29,8 +30,16 @@ func TestCleanTags(t *testing.T) {
map[string]string{"aaa": "bbb"},
},
{
map[string]string{"Sp%ci@l Chars": "g$t repl#ced"},
map[string]string{"Sp-ci-l_Chars": "g-t_repl-ced"},
map[string]string{"Sp%ci@l Chars[": "g$t repl#ce)d"},
map[string]string{"Sp-ci-l_Chars_": "g-t_repl-ce_d"},
},
{
map[string]string{"μnicodε_letters": "okαy"},
map[string]string{"μnicodε_letters": "okαy"},
},
{
map[string]string{"n☺": "emojies☠"},
map[string]string{"n_": "emojies_"},
},
{
map[string]string{},
@@ -75,6 +84,47 @@ func TestBuildTagsTelnet(t *testing.T) {
}
}
func TestSanitize(t *testing.T) {
tests := []struct {
name string
value string
expected string
}{
{
name: "Ascii letters and numbers allowed",
value: "ascii 123",
expected: "ascii_123",
},
{
name: "Allowed punct",
value: "-_./",
expected: "-_./",
},
{
name: "Special conversions to hyphen",
value: "@*%#$!",
expected: "-----_",
},
{
name: "Unicode Letters allowed",
value: "μnicodε_letters",
expected: "μnicodε_letters",
},
{
name: "Other Unicode not allowed",
value: "“☢”",
expected: "___",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual := sanitize(tt.value)
require.Equal(t, tt.expected, actual)
})
}
}
func BenchmarkHttpSend(b *testing.B) {
const BatchSize = 50
const MetricsCount = 4 * BatchSize

View File

@@ -67,6 +67,10 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i
}
func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
buf = bytes.TrimSpace(buf)
if len(buf) == 0 {
return make([]telegraf.Metric, 0), nil
}
if !isarray(buf) {
metrics := make([]telegraf.Metric, 0)

View File

@@ -84,6 +84,16 @@ func TestParseValidJSON(t *testing.T) {
"b_c": float64(6),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{}, metrics[0].Tags())
// Test that whitespace only will parse as an empty list of metrics
metrics, err = parser.Parse([]byte("\n\t"))
assert.NoError(t, err)
assert.Len(t, metrics, 0)
// Test that an empty string will parse as an empty list of metrics
metrics, err = parser.Parse([]byte(""))
assert.NoError(t, err)
assert.Len(t, metrics, 0)
}
func TestParseLineValidJSON(t *testing.T) {

View File

@@ -212,6 +212,8 @@ func TestSerializeValueBoolean(t *testing.T) {
fmt.Sprintf("localhost.enabled.cpu0.us-west-2.cpu 1 %d", now.Unix()),
fmt.Sprintf("localhost.disabled.cpu0.us-west-2.cpu 0 %d", now.Unix()),
}
sort.Strings(mS)
sort.Strings(expS)
assert.Equal(t, expS, mS)
}

View File

@@ -274,6 +274,8 @@ def get_system_arch():
arch = "amd64"
elif arch == "386":
arch = "i386"
elif "arm64" in arch:
arch = "arm64"
elif 'arm' in arch:
# Prevent uname from reporting full ARM arch (eg 'armv7l')
arch = "arm"
@@ -446,6 +448,8 @@ def build(version=None,
# Handle variations in architecture output
if arch == "i386" or arch == "i686":
arch = "386"
elif "arm64" in arch:
arch = "arm64"
elif "arm" in arch:
arch = "arm"
build_command += "GOOS={} GOARCH={} ".format(platform, arch)