Compare commits

...

47 Commits

Author SHA1 Message Date
Daniel Nelson 46db92aad3
Set release date for 1.3.3
(cherry picked from commit 75dbf2b0f8)
2017-06-28 13:06:16 -07:00
Daniel Nelson 5fcc5d1bba
Update changelog
(cherry picked from commit f2bb4acd4a)
2017-06-26 15:25:38 -07:00
Bob Shannon 2e6f7055cb
Fix panic in elasticsearch input if cannot determine master (#2954)
(cherry picked from commit a7595c918a)
2017-06-26 15:25:26 -07:00
Daniel Nelson da640a8af7
Update changelog
(cherry picked from commit e028f10586)
2017-06-23 11:06:17 -07:00
Daniel Nelson 1e7750c502
Fix bug parsing default timestamps with modified precision (#2949)
(cherry picked from commit 9276318faf)
2017-06-23 11:06:01 -07:00
Daniel Nelson 0231b3ea0a
Update changelog
(cherry picked from commit 9211985c63)
2017-06-21 12:40:01 -07:00
Daniel Nelson b0ba853395
Remove label value sanitization in prometheus output (#2939)
(cherry picked from commit 929ba0a637)
2017-06-21 12:39:53 -07:00
Daniel Nelson 9722d675bb
Update changelog
(cherry picked from commit a729a44284)
2017-06-16 13:20:43 -07:00
Daniel Nelson a3acfa8163
Allow dos line endings in tail and logparser (#2920)
Parsing dos line ending delimited line protocol is still illegal in most
cases.
(cherry picked from commit 3ecfd32df5)
2017-06-16 13:20:10 -07:00
Daniel Nelson 0f419e9a0d
Update 1.3.2 release date
(cherry picked from commit ca72df5868)
2017-06-14 12:17:44 -07:00
Daniel Nelson 958d689274
Update changelog
(cherry picked from commit ea787b83bf)
2017-06-13 18:07:59 -07:00
Daniel Nelson 2370c04dd7
Ensure prometheus metrics have same set of labels (#2857)
(cherry picked from commit 949072e8dc)
2017-06-13 18:07:34 -07:00
Daniel Nelson 4978c8ccb9
Update changelog
(cherry picked from commit 0c53de6700)
2017-06-08 16:56:10 -07:00
Daniel Nelson abe5e28575
Fix support for mongodb/leofs urls without scheme (#2900)
This was broken by changes in go 1.8 to url.Parse.  This change allows
the string but prompts the user to move to the correct url string.

(cherry picked from commit b277e6e2d7)
2017-06-08 16:55:46 -07:00
Daniel Nelson 34da5a6e45
Update changelog
(cherry picked from commit 84dbf8bb25)
2017-06-07 13:48:58 -07:00
Daniel Nelson 4de1bf29cc
Fix metric splitting edge cases (#2896)
Metrics needing one extra byte to fit the output buffer would not be split, so we would emit lines without a line ending. Metrics which overflowed by exactly one field length would be split one field too late, causing truncated fields.
(cherry picked from commit a275e6792a)
2017-06-07 13:47:58 -07:00
Daniel Nelson f23845afef
Update changelog
(cherry picked from commit a47e6e6efe)
2017-06-05 12:51:58 -07:00
Daniel Nelson 721ed8b3f2
Fix udp metric splitting (#2880)
(cherry picked from commit 5bab4616ff)
2017-06-05 12:50:59 -07:00
Daniel Nelson 385c114d00
Update release date 2017-05-31 14:53:01 -07:00
Daniel Nelson f93615672b
Generate sha256 hashes when packaging
(cherry picked from commit 0b6db905ff)
2017-05-31 12:30:51 -07:00
Daniel Nelson 444f1ba09c
Update changelog
(cherry picked from commit 9529199a44)
2017-05-30 17:41:06 -07:00
Daniel Nelson 5417a12ee3
Fix length calculation of split metric buffer (#2869)
(cherry picked from commit be03abd464)
2017-05-30 17:41:00 -07:00
Daniel Nelson f6c986b5ca
Update changelog
(cherry picked from commit 04aa732e94)
2017-05-30 11:05:56 -07:00
Steve Nardone 7f67bf593f
Fix panic in mongo input (#2848)
(cherry picked from commit e7f9db297e)
2017-05-30 11:05:28 -07:00
Daniel Nelson 7ca902f987
Update changelog
(cherry picked from commit 7d7206b3e2)
2017-05-25 16:24:46 -07:00
Daniel Nelson dc69cacaa1
Update gopsutil version
fixes #2856

(cherry picked from commit 03ca3975b5)
2017-05-25 16:24:33 -07:00
Daniel Nelson 29671154bc
Update changelog
(cherry picked from commit e1088b9eee)
2017-05-25 13:39:55 -07:00
Daniel Nelson 4f8341670e
Fix influxdb output database quoting (#2851)
(cherry picked from commit f47924ffc5)
2017-05-25 13:27:16 -07:00
Daniel Nelson 99edca80ef
Handle process termination during read from /proc (#2816)
Fixes #2815.
(cherry picked from commit c53d9fa9b7)
2017-05-18 18:03:54 -07:00
Daniel Nelson 44654e011c
Reuse transports in input plugins (#2782)
(cherry picked from commit a47aa0dcc2)
2017-05-18 18:02:09 -07:00
Daniel Nelson 389439111b
Fixed sqlserver input to work with case sensitive server collation. (#2749)
Fixed a problem with sqlserver input where database properties are not returned by Telegraf when SQL Server has been set up with a case sensitive server-level collation.

* Added bugfix entry to CHANGELOG.md for sqlserver collation input fix.

(cherry picked from commit e2983383e4)
2017-05-18 17:59:14 -07:00
Daniel Nelson 2bc5594b44
Add release date for 1.3.0 2017-05-15 20:05:22 -07:00
Daniel Nelson 99b53c8745
Add back the changelog entry for 2141 2017-05-15 12:56:11 -07:00
Daniel Nelson 27b89dff48
Only split metrics if there is an udp output (#2799) 2017-05-12 15:34:31 -07:00
Sebastian Borza b16eb6eae6 split metrics based on UDPPayload size (#2795) 2017-05-12 14:42:18 -07:00
Daniel Nelson feaf76913b
Add missing plugins to README 2017-05-09 13:51:26 -07:00
Daniel Nelson ff704fbe0d
Add SLES11 support to rpm package (#2768) 2017-05-05 14:30:31 -07:00
Sébastien ebef47f56a
fix systemd path in order to add compatibility with SuSe (#2499) 2017-05-05 14:30:24 -07:00
Daniel Nelson 18fd2d987d
Return an error if no valid patterns. (#2753) 2017-05-02 14:55:16 -07:00
Alexander Blagoev 5e70cb3e44
Improve redis input documentation (#2708) 2017-05-02 14:12:09 -07:00
Patrick Hemmer ce203dc687
fix close on closed socket_writer (#2748) 2017-05-02 11:07:58 -07:00
Daniel Nelson b0a2e8e1bd
Add initial documentation for rabbitmq input. (#2745) 2017-05-01 18:57:19 -07:00
Daniel Nelson 499495f844
Don't log error creating database on connect (#2740)
closes #2739
2017-04-28 15:59:28 -07:00
Daniel Nelson 20ab8fb2c3
Update telegraf.conf 2017-04-28 13:49:09 -07:00
Daniel Nelson bc474d3a53
Clarify retention policy option for influxdb output
closes #2696
2017-04-28 13:48:24 -07:00
Daniel Nelson 547be87d79
Clarify retention policy option for influxdb output
closes #2696
2017-04-28 13:43:00 -07:00
Daniel Nelson 619d4d5d29
Use go 1.8.1 for CI and Release builds (#2732) 2017-04-27 16:22:41 -07:00
45 changed files with 1495 additions and 480 deletions

View File

@ -1,4 +1,33 @@
## v1.3 [unreleased] ## v1.3.3 [2017-06-28]
### Bugfixes
- [#2915](https://github.com/influxdata/telegraf/issues/2915): Allow dos line endings in tail and logparser.
- [#2937](https://github.com/influxdata/telegraf/issues/2937): Remove label value sanitization in prometheus output.
- [#2948](https://github.com/influxdata/telegraf/issues/2948): Fix bug parsing default timestamps with modified precision.
- [#2954](https://github.com/influxdata/telegraf/issues/2954): Fix panic in elasticsearch input if cannot determine master.
## v1.3.2 [2017-06-14]
### Bugfixes
- [#2862](https://github.com/influxdata/telegraf/issues/2862): Fix InfluxDB UDP metric splitting.
- [#2888](https://github.com/influxdata/telegraf/issues/2888): Fix mongodb/leofs urls without scheme.
- [#2822](https://github.com/influxdata/telegraf/issues/2822): Fix inconsistent label dimensions in prometheus output.
## v1.3.1 [2017-05-31]
### Bugfixes
- [#2749](https://github.com/influxdata/telegraf/pull/2749): Fixed sqlserver input to work with case sensitive server collation.
- [#2782](https://github.com/influxdata/telegraf/pull/2782): Reuse transports in input plugins
- [#2815](https://github.com/influxdata/telegraf/issues/2815): Inputs processes fails with "no such process".
- [#2851](https://github.com/influxdata/telegraf/pull/2851): Fix InfluxDB output database quoting.
- [#2856](https://github.com/influxdata/telegraf/issues/2856): Fix net input on older Linux kernels.
- [#2848](https://github.com/influxdata/telegraf/pull/2848): Fix panic in mongo input.
- [#2869](https://github.com/influxdata/telegraf/pull/2869): Fix length calculation of split metric buffer.
## v1.3 [2017-05-15]
### Release Notes ### Release Notes
@ -79,6 +108,9 @@ be deprecated eventually.
- [#2705](https://github.com/influxdata/telegraf/pull/2705): Kinesis output: add use_random_partitionkey option - [#2705](https://github.com/influxdata/telegraf/pull/2705): Kinesis output: add use_random_partitionkey option
- [#2635](https://github.com/influxdata/telegraf/issues/2635): add tcp keep-alive to socket_listener & socket_writer - [#2635](https://github.com/influxdata/telegraf/issues/2635): add tcp keep-alive to socket_listener & socket_writer
- [#2031](https://github.com/influxdata/telegraf/pull/2031): Add Kapacitor input plugin - [#2031](https://github.com/influxdata/telegraf/pull/2031): Add Kapacitor input plugin
- [#2732](https://github.com/influxdata/telegraf/pull/2732): Use go 1.8.1
- [#2712](https://github.com/influxdata/telegraf/issues/2712): Documentation for rabbitmq input plugin
- [#2141](https://github.com/influxdata/telegraf/pull/2141): Logparser handles newly-created files.
### Bugfixes ### Bugfixes
@ -118,6 +150,7 @@ be deprecated eventually.
- [#1911](https://github.com/influxdata/telegraf/issues/1911): Sysstat plugin needs LANG=C or similar locale - [#1911](https://github.com/influxdata/telegraf/issues/1911): Sysstat plugin needs LANG=C or similar locale
- [#2528](https://github.com/influxdata/telegraf/issues/2528): File output closes standard streams on reload. - [#2528](https://github.com/influxdata/telegraf/issues/2528): File output closes standard streams on reload.
- [#2603](https://github.com/influxdata/telegraf/issues/2603): AMQP output disconnect blocks all outputs - [#2603](https://github.com/influxdata/telegraf/issues/2603): AMQP output disconnect blocks all outputs
- [#2706](https://github.com/influxdata/telegraf/issues/2706): Improve documentation for redis input plugin
## v1.2.1 [2017-02-01] ## v1.2.1 [2017-02-01]

2
Godeps
View File

@ -46,7 +46,7 @@ github.com/prometheus/procfs 1878d9fbb537119d24b21ca07effd591627cd160
github.com/rcrowley/go-metrics 1f30fe9094a513ce4c700b9a54458bbb0c96996c github.com/rcrowley/go-metrics 1f30fe9094a513ce4c700b9a54458bbb0c96996c
github.com/samuel/go-zookeeper 1d7be4effb13d2d908342d349d71a284a7542693 github.com/samuel/go-zookeeper 1d7be4effb13d2d908342d349d71a284a7542693
github.com/satori/go.uuid 5bf94b69c6b68ee1b541973bb8e1144db23a194b github.com/satori/go.uuid 5bf94b69c6b68ee1b541973bb8e1144db23a194b
github.com/shirou/gopsutil 70693b6a3da51a8a686d31f1b346077bbc066062 github.com/shirou/gopsutil 9a4a9167ad3b4355dbf1c2c7a0f5f0d3fb1e9ab9
github.com/soniah/gosnmp 5ad50dc75ab389f8a1c9f8a67d3a1cd85f67ed15 github.com/soniah/gosnmp 5ad50dc75ab389f8a1c9f8a67d3a1cd85f67ed15
github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6 github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6
github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987 github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987

View File

@ -111,6 +111,7 @@ configuration options.
* [couchbase](./plugins/inputs/couchbase) * [couchbase](./plugins/inputs/couchbase)
* [couchdb](./plugins/inputs/couchdb) * [couchdb](./plugins/inputs/couchdb)
* [disque](./plugins/inputs/disque) * [disque](./plugins/inputs/disque)
* [dmcache](./plugins/inputs/dmcache)
* [dns query time](./plugins/inputs/dns_query) * [dns query time](./plugins/inputs/dns_query)
* [docker](./plugins/inputs/docker) * [docker](./plugins/inputs/docker)
* [dovecot](./plugins/inputs/dovecot) * [dovecot](./plugins/inputs/dovecot)
@ -127,6 +128,7 @@ configuration options.
* [ipmi_sensor](./plugins/inputs/ipmi_sensor) * [ipmi_sensor](./plugins/inputs/ipmi_sensor)
* [iptables](./plugins/inputs/iptables) * [iptables](./plugins/inputs/iptables)
* [jolokia](./plugins/inputs/jolokia) * [jolokia](./plugins/inputs/jolokia)
* [kapacitor](./plugins/inputs/kapacitor)
* [kubernetes](./plugins/inputs/kubernetes) * [kubernetes](./plugins/inputs/kubernetes)
* [leofs](./plugins/inputs/leofs) * [leofs](./plugins/inputs/leofs)
* [lustre2](./plugins/inputs/lustre2) * [lustre2](./plugins/inputs/lustre2)
@ -195,6 +197,7 @@ Telegraf can also collect metrics via the following service plugins:
* [github](./plugins/inputs/webhooks/github) * [github](./plugins/inputs/webhooks/github)
* [mandrill](./plugins/inputs/webhooks/mandrill) * [mandrill](./plugins/inputs/webhooks/mandrill)
* [rollbar](./plugins/inputs/webhooks/rollbar) * [rollbar](./plugins/inputs/webhooks/rollbar)
* [papertrail](./plugins/inputs/webhooks/papertrail)
Telegraf is able to parse the following input data formats into metrics, these Telegraf is able to parse the following input data formats into metrics, these
formats may be used with input plugins supporting the `data_format` option: formats may be used with input plugins supporting the `data_format` option:

View File

@ -1,13 +1,11 @@
machine: machine:
go:
version: 1.8.1
services: services:
- docker - docker
post: - memcached
- sudo service zookeeper stop - redis
- go version - rabbitmq-server
- sudo rm -rf /usr/local/go
- wget https://storage.googleapis.com/golang/go1.8.linux-amd64.tar.gz
- sudo tar -C /usr/local -xzf go1.8.linux-amd64.tar.gz
- go version
dependencies: dependencies:
override: override:

View File

@ -95,7 +95,8 @@
## The target database for metrics (telegraf will create it if not exists). ## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required database = "telegraf" # required
## Retention policy to write to. Empty string writes to the default rp. ## Name of existing retention policy to write to. Empty string writes to
## the default retention policy.
retention_policy = "" retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all" ## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any" write_consistency = "any"
@ -141,7 +142,7 @@
# ## described here: https://www.rabbitmq.com/plugins.html # ## described here: https://www.rabbitmq.com/plugins.html
# # auth_method = "PLAIN" # # auth_method = "PLAIN"
# ## Telegraf tag to use as a routing key # ## Telegraf tag to use as a routing key
# ## ie, if this tag exists, it's value will be used as the routing key # ## ie, if this tag exists, its value will be used as the routing key
# routing_tag = "host" # routing_tag = "host"
# #
# ## InfluxDB retention policy # ## InfluxDB retention policy
@ -335,6 +336,10 @@
# ## Use SSL but skip chain & host verification # ## Use SSL but skip chain & host verification
# # insecure_skip_verify = false # # insecure_skip_verify = false
# #
# ## Optional SASL Config
# # sasl_username = "kafka"
# # sasl_password = "secret"
#
# ## Data format to output. # ## Data format to output.
# ## Each data format has its own unique set of configuration options, read # ## Each data format has its own unique set of configuration options, read
# ## more about them here: # ## more about them here:
@ -1325,6 +1330,18 @@
# attribute = "LoadedClassCount,UnloadedClassCount,TotalLoadedClassCount" # attribute = "LoadedClassCount,UnloadedClassCount,TotalLoadedClassCount"
# # Read Kapacitor-formatted JSON metrics from one or more HTTP endpoints
# [[inputs.kapacitor]]
# ## Multiple URLs from which to read Kapacitor-formatted JSON
# ## Default is "http://localhost:9092/kapacitor/v1/debug/vars".
# urls = [
# "http://localhost:9092/kapacitor/v1/debug/vars"
# ]
#
# ## Time limit for http requests
# timeout = "5s"
# # Get kernel statistics from /proc/vmstat # # Get kernel statistics from /proc/vmstat
# [[inputs.kernel_vmstat]] # [[inputs.kernel_vmstat]]
# # no configuration # # no configuration

View File

@ -218,7 +218,7 @@ func (m *metric) SerializeTo(dst []byte) int {
} }
func (m *metric) Split(maxSize int) []telegraf.Metric { func (m *metric) Split(maxSize int) []telegraf.Metric {
if m.Len() < maxSize { if m.Len() <= maxSize {
return []telegraf.Metric{m} return []telegraf.Metric{m}
} }
var out []telegraf.Metric var out []telegraf.Metric
@ -248,7 +248,7 @@ func (m *metric) Split(maxSize int) []telegraf.Metric {
// if true, then we need to create a metric _not_ including the currently // if true, then we need to create a metric _not_ including the currently
// selected field // selected field
if len(m.fields[i:j])+len(fields)+constant > maxSize { if len(m.fields[i:j])+len(fields)+constant >= maxSize {
// if false, then we'll create a metric including the currently // if false, then we'll create a metric including the currently
// selected field anyways. This means that the given maxSize is too // selected field anyways. This means that the given maxSize is too
// small for a single field to fit. // small for a single field to fit.

View File

@ -10,6 +10,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestNewMetric(t *testing.T) { func TestNewMetric(t *testing.T) {
@ -458,7 +459,7 @@ func TestSplitMetric(t *testing.T) {
assert.Len(t, split70, 3) assert.Len(t, split70, 3)
split60 := m.Split(60) split60 := m.Split(60)
assert.Len(t, split60, 4) assert.Len(t, split60, 5)
} }
// test splitting metric into various max lengths // test splitting metric into various max lengths
@ -578,6 +579,42 @@ func TestSplitMetric_OneField(t *testing.T) {
assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String()) assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())
} }
func TestSplitMetric_ExactSize(t *testing.T) {
now := time.Unix(0, 1480940990034083306)
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"float": float64(100001),
"int": int64(100001),
"bool": true,
"false": false,
"string": "test",
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)
actual := m.Split(m.Len())
// check that no copy was made
require.Equal(t, &m, &actual[0])
}
func TestSplitMetric_NoRoomForNewline(t *testing.T) {
now := time.Unix(0, 1480940990034083306)
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"float": float64(100001),
"int": int64(100001),
"bool": true,
"false": false,
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)
actual := m.Split(m.Len() - 1)
require.Equal(t, 2, len(actual))
}
func TestNewMetricAggregate(t *testing.T) { func TestNewMetricAggregate(t *testing.T) {
now := time.Now() now := time.Now()

View File

@ -129,7 +129,7 @@ func parseMetric(buf []byte,
// apply precision multiplier // apply precision multiplier
var nsec int64 var nsec int64
multiplier := getPrecisionMultiplier(precision) multiplier := getPrecisionMultiplier(precision)
if multiplier > 1 { if len(ts) > 0 && multiplier > 1 {
tsint, err := parseIntBytes(ts, 10, 64) tsint, err := parseIntBytes(ts, 10, 64)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -380,11 +380,25 @@ func TestParsePrecision(t *testing.T) {
} { } {
metrics, err := ParseWithDefaultTimePrecision( metrics, err := ParseWithDefaultTimePrecision(
[]byte(tt.line+"\n"), time.Now(), tt.precision) []byte(tt.line+"\n"), time.Now(), tt.precision)
assert.NoError(t, err, tt) assert.NoError(t, err)
assert.Equal(t, tt.expected, metrics[0].UnixNano()) assert.Equal(t, tt.expected, metrics[0].UnixNano())
} }
} }
func TestParsePrecisionUnsetTime(t *testing.T) {
for _, tt := range []struct {
line string
precision string
}{
{"test v=42", "s"},
{"test v=42", "ns"},
} {
_, err := ParseWithDefaultTimePrecision(
[]byte(tt.line+"\n"), time.Now(), tt.precision)
assert.NoError(t, err)
}
}
func TestParseMaxKeyLength(t *testing.T) { func TestParseMaxKeyLength(t *testing.T) {
key := "" key := ""
for { for {

View File

@ -57,7 +57,7 @@ func (r *reader) Read(p []byte) (n int, err error) {
// this for-loop is the sunny-day scenario, where we are given a // this for-loop is the sunny-day scenario, where we are given a
// buffer that is large enough to hold at least a single metric. // buffer that is large enough to hold at least a single metric.
// all of the cases below it are edge-cases. // all of the cases below it are edge-cases.
if r.metrics[r.iM].Len() < len(p[i:]) { if r.metrics[r.iM].Len() <= len(p[i:]) {
i += r.metrics[r.iM].SerializeTo(p[i:]) i += r.metrics[r.iM].SerializeTo(p[i:])
} else { } else {
break break
@ -76,7 +76,7 @@ func (r *reader) Read(p []byte) (n int, err error) {
if len(tmp) > 1 { if len(tmp) > 1 {
r.splitMetrics = tmp r.splitMetrics = tmp
r.state = split r.state = split
if r.splitMetrics[0].Len() < len(p) { if r.splitMetrics[0].Len() <= len(p) {
i += r.splitMetrics[0].SerializeTo(p) i += r.splitMetrics[0].SerializeTo(p)
r.iSM = 1 r.iSM = 1
} else { } else {
@ -99,7 +99,7 @@ func (r *reader) Read(p []byte) (n int, err error) {
} }
case split: case split:
if r.splitMetrics[r.iSM].Len() < len(p) { if r.splitMetrics[r.iSM].Len() <= len(p) {
// write the current split metric // write the current split metric
i += r.splitMetrics[r.iSM].SerializeTo(p) i += r.splitMetrics[r.iSM].SerializeTo(p)
r.iSM++ r.iSM++
@ -131,6 +131,10 @@ func (r *reader) Read(p []byte) (n int, err error) {
r.iSM++ r.iSM++
if r.iSM == len(r.splitMetrics) { if r.iSM == len(r.splitMetrics) {
r.iM++ r.iM++
if r.iM == len(r.metrics) {
r.state = done
return i, io.EOF
}
r.state = normal r.state = normal
} else { } else {
r.state = split r.state = split

View File

@ -8,8 +8,8 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func BenchmarkMetricReader(b *testing.B) { func BenchmarkMetricReader(b *testing.B) {
@ -116,6 +116,140 @@ func TestMetricReader_OverflowMetric(t *testing.T) {
} }
} }
// Regression test for when a metric is the same size as the buffer.
//
// Previously EOF would not be set until the next call to Read.
func TestMetricReader_MetricSizeEqualsBufferSize(t *testing.T) {
ts := time.Unix(1481032190, 0)
m1, _ := New("foo", map[string]string{},
map[string]interface{}{"a": int64(1)}, ts)
metrics := []telegraf.Metric{m1}
r := NewReader(metrics)
buf := make([]byte, m1.Len())
for {
n, err := r.Read(buf)
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
if n == 0 {
require.Equal(t, io.EOF, err)
break
}
// Lines should be terminated with a LF
if err == io.EOF {
require.Equal(t, uint8('\n'), buf[n-1])
break
}
require.NoError(t, err)
}
}
// Regression test for when a metric requires to be split and one of the
// split metrics is exactly the size of the buffer.
//
// Previously an empty string would be returned on the next Read without error,
// and then next Read call would panic.
func TestMetricReader_SplitWithExactLengthSplit(t *testing.T) {
ts := time.Unix(1481032190, 0)
m1, _ := New("foo", map[string]string{},
map[string]interface{}{"a": int64(1), "bb": int64(2)}, ts)
metrics := []telegraf.Metric{m1}
r := NewReader(metrics)
buf := make([]byte, 30)
// foo a=1i,bb=2i 1481032190000000000\n // len 35
//
// Requires this specific split order:
// foo a=1i 1481032190000000000\n // len 29
// foo bb=2i 1481032190000000000\n // len 30
for {
n, err := r.Read(buf)
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
if n == 0 {
require.Equal(t, io.EOF, err)
break
}
// Lines should be terminated with a LF
if err == io.EOF {
require.Equal(t, uint8('\n'), buf[n-1])
break
}
require.NoError(t, err)
}
}
// Regresssion test for when a metric requires to be split and one of the
// split metrics is larger than the buffer.
//
// Previously the metric index would be set incorrectly causing a panic.
func TestMetricReader_SplitOverflowOversized(t *testing.T) {
ts := time.Unix(1481032190, 0)
m1, _ := New("foo", map[string]string{},
map[string]interface{}{
"a": int64(1),
"bbb": int64(2),
}, ts)
metrics := []telegraf.Metric{m1}
r := NewReader(metrics)
buf := make([]byte, 30)
// foo a=1i,bbb=2i 1481032190000000000\n // len 36
//
// foo a=1i 1481032190000000000\n // len 29
// foo bbb=2i 1481032190000000000\n // len 31
for {
n, err := r.Read(buf)
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
if n == 0 {
require.Equal(t, io.EOF, err)
break
}
// Lines should be terminated with a LF
if err == io.EOF {
require.Equal(t, uint8('\n'), buf[n-1])
break
}
require.NoError(t, err)
}
}
// Regresssion test for when a split metric exactly fits in the buffer.
//
// Previously the metric would be overflow split when not required.
func TestMetricReader_SplitOverflowUneeded(t *testing.T) {
ts := time.Unix(1481032190, 0)
m1, _ := New("foo", map[string]string{},
map[string]interface{}{"a": int64(1), "b": int64(2)}, ts)
metrics := []telegraf.Metric{m1}
r := NewReader(metrics)
buf := make([]byte, 29)
// foo a=1i,b=2i 1481032190000000000\n // len 34
//
// foo a=1i 1481032190000000000\n // len 29
// foo b=2i 1481032190000000000\n // len 29
for {
n, err := r.Read(buf)
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
if n == 0 {
require.Equal(t, io.EOF, err)
break
}
// Lines should be terminated with a LF
if err == io.EOF {
require.Equal(t, uint8('\n'), buf[n-1])
break
}
require.NoError(t, err)
}
}
func TestMetricReader_OverflowMultipleMetrics(t *testing.T) { func TestMetricReader_OverflowMultipleMetrics(t *testing.T) {
ts := time.Unix(1481032190, 0) ts := time.Unix(1481032190, 0)
m, _ := New("foo", map[string]string{}, m, _ := New("foo", map[string]string{},
@ -485,3 +619,17 @@ func TestMetricReader_SplitMetricChangingBuffer2(t *testing.T) {
assert.Equal(t, test.err, err, test.expRegex) assert.Equal(t, test.err, err, test.expRegex)
} }
} }
func TestMetricRoundtrip(t *testing.T) {
const lp = `nstat,bu=linux,cls=server,dc=cer,env=production,host=hostname,name=netstat,sr=database IpExtInBcastOctets=12570626154i,IpExtInBcastPkts=95541226i,IpExtInCEPkts=0i,IpExtInCsumErrors=0i,IpExtInECT0Pkts=55674i,IpExtInECT1Pkts=0i,IpExtInMcastOctets=5928296i,IpExtInMcastPkts=174365i,IpExtInNoECTPkts=17965863529i,IpExtInNoRoutes=20i,IpExtInOctets=3334866321815i,IpExtInTruncatedPkts=0i,IpExtOutBcastOctets=0i,IpExtOutBcastPkts=0i,IpExtOutMcastOctets=0i,IpExtOutMcastPkts=0i,IpExtOutOctets=31397892391399i,TcpExtArpFilter=0i,TcpExtBusyPollRxPackets=0i,TcpExtDelayedACKLocked=14094i,TcpExtDelayedACKLost=302083i,TcpExtDelayedACKs=55486507i,TcpExtEmbryonicRsts=11879i,TcpExtIPReversePathFilter=0i,TcpExtListenDrops=1736i,TcpExtListenOverflows=0i,TcpExtLockDroppedIcmps=0i,TcpExtOfoPruned=0i,TcpExtOutOfWindowIcmps=8i,TcpExtPAWSActive=0i,TcpExtPAWSEstab=974i,TcpExtPAWSPassive=0i,TcpExtPruneCalled=0i,TcpExtRcvPruned=0i,TcpExtSyncookiesFailed=12593i,TcpExtSyncookiesRecv=0i,TcpExtSyncookiesSent=0i,TcpExtTCPACKSkippedChallenge=0i,TcpExtTCPACKSkippedFinWait2=0i,TcpExtTCPACKSkippedPAWS=806i,TcpExtTCPACKSkippedSeq=519i,TcpExtTCPACKSkippedSynRecv=0i,TcpExtTCPACKSkippedTimeWait=0i,TcpExtTCPAbortFailed=0i,TcpExtTCPAbortOnClose=22i,TcpExtTCPAbortOnData=36593i,TcpExtTCPAbortOnLinger=0i,TcpExtTCPAbortOnMemory=0i,TcpExtTCPAbortOnTimeout=674i,TcpExtTCPAutoCorking=494253233i,TcpExtTCPBacklogDrop=0i,TcpExtTCPChallengeACK=281i,TcpExtTCPDSACKIgnoredNoUndo=93354i,TcpExtTCPDSACKIgnoredOld=336i,TcpExtTCPDSACKOfoRecv=0i,TcpExtTCPDSACKOfoSent=7i,TcpExtTCPDSACKOldSent=302073i,TcpExtTCPDSACKRecv=215884i,TcpExtTCPDSACKUndo=7633i,TcpExtTCPDeferAcceptDrop=0i,TcpExtTCPDirectCopyFromBacklog=0i,TcpExtTCPDirectCopyFromPrequeue=0i,TcpExtTCPFACKReorder=1320i,TcpExtTCPFastOpenActive=0i,TcpExtTCPFastOpenActiveFail=0i,TcpExtTCPFastOpenCookieReqd=0i,TcpExtTCPFastOpenListenOverflow=0i,TcpExtTCPFastOpenPassive=0i,TcpExtTCPFastOpenPassiveFail=0i,TcpExtTCPFastRetrans=350681i,TcpExtTCPForwardRetrans=142168i,TcpExtTCPFromZeroWindowAdv=4317i,TcpExtTCPFullUndo=29502i,TcpExtTCPHPAcks=10267073000i,TcpExtTCPHPHits=5629837098i,TcpExtTCPHPHitsToUser=0i,TcpExtTCPHystartDelayCwnd=285127i,TcpExtTCPHystartDelayDetect=12318i,TcpExtTCPHystartTrainCwnd=69160570i,TcpExtTCPHystartTrainDetect=3315799i,TcpExtTCPLossFailures=109i,TcpExtTCPLossProbeRecovery=110819i,TcpExtTCPLossProbes=233995i,TcpExtTCPLossUndo=5276i,TcpExtTCPLostRetransmit=397i,TcpExtTCPMD5NotFound=0i,TcpExtTCPMD5Unexpected=0i,TcpExtTCPMemoryPressures=0i,TcpExtTCPMinTTLDrop=0i,TcpExtTCPOFODrop=0i,TcpExtTCPOFOMerge=7i,TcpExtTCPOFOQueue=15196i,TcpExtTCPOrigDataSent=29055119435i,TcpExtTCPPartialUndo=21320i,TcpExtTCPPrequeueDropped=0i,TcpExtTCPPrequeued=0i,TcpExtTCPPureAcks=1236441827i,TcpExtTCPRcvCoalesce=225590473i,TcpExtTCPRcvCollapsed=0i,TcpExtTCPRenoFailures=0i,TcpExtTCPRenoRecovery=0i,TcpExtTCPRenoRecoveryFail=0i,TcpExtTCPRenoReorder=0i,TcpExtTCPReqQFullDoCookies=0i,TcpExtTCPReqQFullDrop=0i,TcpExtTCPRetransFail=41i,TcpExtTCPSACKDiscard=0i,TcpExtTCPSACKReneging=0i,TcpExtTCPSACKReorder=4307i,TcpExtTCPSYNChallenge=244i,TcpExtTCPSackFailures=1698i,TcpExtTCPSackMerged=184668i,TcpExtTCPSackRecovery=97369i,TcpExtTCPSackRecoveryFail=381i,TcpExtTCPSackShiftFallback=2697079i,TcpExtTCPSackShifted=760299i,TcpExtTCPSchedulerFailed=0i,TcpExtTCPSlowStartRetrans=9276i,TcpExtTCPSpuriousRTOs=959i,TcpExtTCPSpuriousRtxHostQueues=2973i,TcpExtTCPSynRetrans=200970i,TcpExtTCPTSReorder=15221i,TcpExtTCPTimeWaitOverflow=0i,TcpExtTCPTimeouts=70127i,TcpExtTCPToZeroWindowAdv=4317i,TcpExtTCPWantZeroWindowAdv=2133i,TcpExtTW=24809813i,TcpExtTWKilled=0i,TcpExtTWRecycled=0i 1496460785000000000
nstat,bu=linux,cls=server,dc=cer,env=production,host=hostname,name=snmp,sr=database IcmpInAddrMaskReps=0i,IcmpInAddrMasks=90i,IcmpInCsumErrors=0i,IcmpInDestUnreachs=284401i,IcmpInEchoReps=9i,IcmpInEchos=1761912i,IcmpInErrors=407i,IcmpInMsgs=2047767i,IcmpInParmProbs=0i,IcmpInRedirects=0i,IcmpInSrcQuenchs=0i,IcmpInTimeExcds=46i,IcmpInTimestampReps=0i,IcmpInTimestamps=1309i,IcmpMsgInType0=9i,IcmpMsgInType11=46i,IcmpMsgInType13=1309i,IcmpMsgInType17=90i,IcmpMsgInType3=284401i,IcmpMsgInType8=1761912i,IcmpMsgOutType0=1761912i,IcmpMsgOutType14=1248i,IcmpMsgOutType3=108709i,IcmpMsgOutType8=9i,IcmpOutAddrMaskReps=0i,IcmpOutAddrMasks=0i,IcmpOutDestUnreachs=108709i,IcmpOutEchoReps=1761912i,IcmpOutEchos=9i,IcmpOutErrors=0i,IcmpOutMsgs=1871878i,IcmpOutParmProbs=0i,IcmpOutRedirects=0i,IcmpOutSrcQuenchs=0i,IcmpOutTimeExcds=0i,IcmpOutTimestampReps=1248i,IcmpOutTimestamps=0i,IpDefaultTTL=64i,IpForwDatagrams=0i,IpForwarding=2i,IpFragCreates=0i,IpFragFails=0i,IpFragOKs=0i,IpInAddrErrors=0i,IpInDelivers=17658795773i,IpInDiscards=0i,IpInHdrErrors=0i,IpInReceives=17659269339i,IpInUnknownProtos=0i,IpOutDiscards=236976i,IpOutNoRoutes=1009i,IpOutRequests=23466783734i,IpReasmFails=0i,IpReasmOKs=0i,IpReasmReqds=0i,IpReasmTimeout=0i,TcpActiveOpens=23308977i,TcpAttemptFails=3757543i,TcpCurrEstab=280i,TcpEstabResets=184792i,TcpInCsumErrors=0i,TcpInErrs=232i,TcpInSegs=17536573089i,TcpMaxConn=-1i,TcpOutRsts=4051451i,TcpOutSegs=29836254873i,TcpPassiveOpens=176546974i,TcpRetransSegs=878085i,TcpRtoAlgorithm=1i,TcpRtoMax=120000i,TcpRtoMin=200i,UdpInCsumErrors=0i,UdpInDatagrams=24441661i,UdpInErrors=0i,UdpLiteInCsumErrors=0i,UdpLiteInDatagrams=0i,UdpLiteInErrors=0i,UdpLiteNoPorts=0i,UdpLiteOutDatagrams=0i,UdpLiteRcvbufErrors=0i,UdpLiteSndbufErrors=0i,UdpNoPorts=17660i,UdpOutDatagrams=51807896i,UdpRcvbufErrors=0i,UdpSndbufErrors=236922i 1496460785000000000
`
metrics, err := Parse([]byte(lp))
require.NoError(t, err)
r := NewReader(metrics)
buf := make([]byte, 128)
_, err = r.Read(buf)
require.NoError(t, err)
metrics, err = Parse(buf)
require.NoError(t, err)
}

View File

@ -29,6 +29,8 @@ type Apache struct {
SSLKey string `toml:"ssl_key"` SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification // Use SSL but skip chain & host verification
InsecureSkipVerify bool InsecureSkipVerify bool
client *http.Client
} }
var sampleConfig = ` var sampleConfig = `
@ -66,6 +68,14 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error {
n.ResponseTimeout.Duration = time.Second * 5 n.ResponseTimeout.Duration = time.Second * 5
} }
if n.client == nil {
client, err := n.createHttpClient()
if err != nil {
return err
}
n.client = client
}
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(n.Urls)) wg.Add(len(n.Urls))
for _, u := range n.Urls { for _, u := range n.Urls {
@ -85,31 +95,24 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error { func (n *Apache) createHttpClient() (*http.Client, error) {
tlsCfg, err := internal.GetTLSConfig(
var tr *http.Transport n.SSLCert, n.SSLKey, n.SSLCA, n.InsecureSkipVerify)
if err != nil {
if addr.Scheme == "https" { return nil, err
tlsCfg, err := internal.GetTLSConfig(
n.SSLCert, n.SSLKey, n.SSLCA, n.InsecureSkipVerify)
if err != nil {
return err
}
tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
TLSClientConfig: tlsCfg,
}
} else {
tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
} }
client := &http.Client{ client := &http.Client{
Transport: tr, Transport: &http.Transport{
Timeout: n.ResponseTimeout.Duration, TLSClientConfig: tlsCfg,
},
Timeout: n.ResponseTimeout.Duration,
} }
return client, nil
}
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
req, err := http.NewRequest("GET", addr.String(), nil) req, err := http.NewRequest("GET", addr.String(), nil)
if err != nil { if err != nil {
return fmt.Errorf("error on new request to %s : %s\n", addr.String(), err) return fmt.Errorf("error on new request to %s : %s\n", addr.String(), err)
@ -119,7 +122,7 @@ func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
req.SetBasicAuth(n.Username, n.Password) req.SetBasicAuth(n.Username, n.Password)
} }
resp, err := client.Do(req) resp, err := n.client.Do(req)
if err != nil { if err != nil {
return fmt.Errorf("error on request to %s : %s\n", addr.String(), err) return fmt.Errorf("error on request to %s : %s\n", addr.String(), err)
} }

View File

@ -169,7 +169,10 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
if e.ClusterStats { if e.ClusterStats {
// get cat/master information here so NodeStats can determine // get cat/master information here so NodeStats can determine
// whether this node is the Master // whether this node is the Master
e.setCatMaster(s + "/_cat/master") if err := e.setCatMaster(s + "/_cat/master"); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
}
} }
// Always gather node states // Always gather node states
@ -353,7 +356,7 @@ func (e *Elasticsearch) setCatMaster(url string) error {
// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer // NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
// to let the underlying transport close the connection and re-establish a new one for // to let the underlying transport close the connection and re-establish a new one for
// future calls. // future calls.
return fmt.Errorf("status-code %d, expected %d", r.StatusCode, http.StatusOK) return fmt.Errorf("elasticsearch: Unable to retrieve master node information. API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK)
} }
response, err := ioutil.ReadAll(r.Body) response, err := ioutil.ReadAll(r.Body)

View File

@ -25,7 +25,6 @@ type HTTPResponse struct {
Headers map[string]string Headers map[string]string
FollowRedirects bool FollowRedirects bool
ResponseStringMatch string ResponseStringMatch string
compiledStringMatch *regexp.Regexp
// Path to CA file // Path to CA file
SSLCA string `toml:"ssl_ca"` SSLCA string `toml:"ssl_ca"`
@ -35,6 +34,9 @@ type HTTPResponse struct {
SSLKey string `toml:"ssl_key"` SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification // Use SSL but skip chain & host verification
InsecureSkipVerify bool InsecureSkipVerify bool
compiledStringMatch *regexp.Regexp
client *http.Client
} }
// Description returns the plugin Description // Description returns the plugin Description
@ -88,13 +90,12 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
tr := &http.Transport{
ResponseHeaderTimeout: h.ResponseTimeout.Duration,
TLSClientConfig: tlsCfg,
}
client := &http.Client{ client := &http.Client{
Transport: tr, Transport: &http.Transport{
Timeout: h.ResponseTimeout.Duration, DisableKeepAlives: true,
TLSClientConfig: tlsCfg,
},
Timeout: h.ResponseTimeout.Duration,
} }
if h.FollowRedirects == false { if h.FollowRedirects == false {
@ -106,15 +107,10 @@ func (h *HTTPResponse) createHttpClient() (*http.Client, error) {
} }
// HTTPGather gathers all fields and returns any errors it encounters // HTTPGather gathers all fields and returns any errors it encounters
func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) { func (h *HTTPResponse) httpGather() (map[string]interface{}, error) {
// Prepare fields // Prepare fields
fields := make(map[string]interface{}) fields := make(map[string]interface{})
client, err := h.createHttpClient()
if err != nil {
return nil, err
}
var body io.Reader var body io.Reader
if h.Body != "" { if h.Body != "" {
body = strings.NewReader(h.Body) body = strings.NewReader(h.Body)
@ -133,7 +129,7 @@ func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
// Start Timer // Start Timer
start := time.Now() start := time.Now()
resp, err := client.Do(request) resp, err := h.client.Do(request)
if err != nil { if err != nil {
if h.FollowRedirects { if h.FollowRedirects {
return nil, err return nil, err
@ -145,6 +141,11 @@ func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
return nil, err return nil, err
} }
} }
defer func() {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()
fields["response_time"] = time.Since(start).Seconds() fields["response_time"] = time.Since(start).Seconds()
fields["http_response_code"] = resp.StatusCode fields["http_response_code"] = resp.StatusCode
@ -202,8 +203,17 @@ func (h *HTTPResponse) Gather(acc telegraf.Accumulator) error {
// Prepare data // Prepare data
tags := map[string]string{"server": h.Address, "method": h.Method} tags := map[string]string{"server": h.Address, "method": h.Method}
var fields map[string]interface{} var fields map[string]interface{}
if h.client == nil {
client, err := h.createHttpClient()
if err != nil {
return err
}
h.client = client
}
// Gather data // Gather data
fields, err = h.HTTPGather() fields, err = h.httpGather()
if err != nil { if err != nil {
return err return err
} }

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -73,13 +74,13 @@ func TestHeaders(t *testing.T) {
"Host": "Hello", "Host": "Hello",
}, },
} }
fields, err := h.HTTPGather() var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok := acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusOK, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusOK, value)
assert.NotNil(t, fields["response_time"])
} }
func TestFields(t *testing.T) { func TestFields(t *testing.T) {
@ -97,13 +98,14 @@ func TestFields(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather()
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok := acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusOK, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusOK, value)
assert.NotNil(t, fields["response_time"])
} }
func TestRedirects(t *testing.T) { func TestRedirects(t *testing.T) {
@ -121,12 +123,13 @@ func TestRedirects(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather() var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok := acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusOK, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusOK, value)
h = &HTTPResponse{ h = &HTTPResponse{
Address: ts.URL + "/badredirect", Address: ts.URL + "/badredirect",
@ -138,8 +141,12 @@ func TestRedirects(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err = h.HTTPGather() acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.Error(t, err) require.Error(t, err)
value, ok = acc.IntField("http_response", "http_response_code")
require.False(t, ok)
} }
func TestMethod(t *testing.T) { func TestMethod(t *testing.T) {
@ -157,12 +164,13 @@ func TestMethod(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather() var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok := acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusOK, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusOK, value)
h = &HTTPResponse{ h = &HTTPResponse{
Address: ts.URL + "/mustbepostmethod", Address: ts.URL + "/mustbepostmethod",
@ -174,12 +182,13 @@ func TestMethod(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err = h.HTTPGather() acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok = acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusMethodNotAllowed, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusMethodNotAllowed, value)
//check that lowercase methods work correctly //check that lowercase methods work correctly
h = &HTTPResponse{ h = &HTTPResponse{
@ -192,12 +201,13 @@ func TestMethod(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err = h.HTTPGather() acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok = acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusMethodNotAllowed, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusMethodNotAllowed, value)
} }
func TestBody(t *testing.T) { func TestBody(t *testing.T) {
@ -215,12 +225,13 @@ func TestBody(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather() var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok := acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusOK, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusOK, value)
h = &HTTPResponse{ h = &HTTPResponse{
Address: ts.URL + "/musthaveabody", Address: ts.URL + "/musthaveabody",
@ -231,12 +242,13 @@ func TestBody(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err = h.HTTPGather() acc = testutil.Accumulator{}
err = h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) { value, ok = acc.IntField("http_response", "http_response_code")
assert.Equal(t, http.StatusBadRequest, fields["http_response_code"]) require.True(t, ok)
} require.Equal(t, http.StatusBadRequest, value)
} }
func TestStringMatch(t *testing.T) { func TestStringMatch(t *testing.T) {
@ -255,15 +267,18 @@ func TestStringMatch(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather() var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.Equal(t, 1, fields["response_string_match"])
assert.NotNil(t, fields["response_time"])
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
value, ok = acc.IntField("http_response", "response_string_match")
require.True(t, ok)
require.Equal(t, 1, value)
_, ok = acc.FloatField("http_response", "response_time")
require.True(t, ok)
} }
func TestStringMatchJson(t *testing.T) { func TestStringMatchJson(t *testing.T) {
@ -282,15 +297,18 @@ func TestStringMatchJson(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather() var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.Equal(t, 1, fields["response_string_match"])
assert.NotNil(t, fields["response_time"])
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
value, ok = acc.IntField("http_response", "response_string_match")
require.True(t, ok)
require.Equal(t, 1, value)
_, ok = acc.FloatField("http_response", "response_time")
require.True(t, ok)
} }
func TestStringMatchFail(t *testing.T) { func TestStringMatchFail(t *testing.T) {
@ -309,18 +327,26 @@ func TestStringMatchFail(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
fields, err := h.HTTPGather()
require.NoError(t, err)
assert.NotEmpty(t, fields)
if assert.NotNil(t, fields["http_response_code"]) {
assert.Equal(t, http.StatusOK, fields["http_response_code"])
}
assert.Equal(t, 0, fields["response_string_match"])
assert.NotNil(t, fields["response_time"])
var acc testutil.Accumulator
err := h.Gather(&acc)
require.NoError(t, err)
value, ok := acc.IntField("http_response", "http_response_code")
require.True(t, ok)
require.Equal(t, http.StatusOK, value)
value, ok = acc.IntField("http_response", "response_string_match")
require.True(t, ok)
require.Equal(t, 0, value)
_, ok = acc.FloatField("http_response", "response_time")
require.True(t, ok)
} }
func TestTimeout(t *testing.T) { func TestTimeout(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test with sleep in short mode.")
}
mux := setUpTestMux() mux := setUpTestMux()
ts := httptest.NewServer(mux) ts := httptest.NewServer(mux)
defer ts.Close() defer ts.Close()
@ -335,6 +361,10 @@ func TestTimeout(t *testing.T) {
}, },
FollowRedirects: true, FollowRedirects: true,
} }
_, err := h.HTTPGather() var acc testutil.Accumulator
require.Error(t, err) err := h.Gather(&acc)
require.NoError(t, err)
ok := acc.HasIntField("http_response", "http_response_code")
require.False(t, ok)
} }

View File

@ -3,6 +3,7 @@ package leofs
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"log"
"net/url" "net/url"
"os/exec" "os/exec"
"strconv" "strconv"
@ -18,7 +19,7 @@ import (
const oid = ".1.3.6.1.4.1.35450" const oid = ".1.3.6.1.4.1.35450"
// For Manager Master // For Manager Master
const defaultEndpoint = "127.0.0.1:4020" const defaultEndpoint = "udp://127.0.0.1:4020"
type ServerType int type ServerType int
@ -135,9 +136,9 @@ var serverTypeMapping = map[string]ServerType{
} }
var sampleConfig = ` var sampleConfig = `
## An array of URI to gather stats about LeoFS. ## An array of URLs of the form:
## Specify an ip or hostname with port. ie 127.0.0.1:4020 ## "udp://" host [ ":" port]
servers = ["127.0.0.1:4021"] servers = ["udp://127.0.0.1:4020"]
` `
func (l *LeoFS) SampleConfig() string { func (l *LeoFS) SampleConfig() string {
@ -154,17 +155,28 @@ func (l *LeoFS) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
var wg sync.WaitGroup var wg sync.WaitGroup
for _, endpoint := range l.Servers { for i, endpoint := range l.Servers {
_, err := url.Parse(endpoint) if !strings.HasPrefix(endpoint, "udp://") {
// Preserve backwards compatibility for hostnames without a
// scheme, broken in go 1.8. Remove in Telegraf 2.0
endpoint = "udp://" + endpoint
log.Printf("W! [inputs.mongodb] Using %q as connection URL; please update your configuration to use an URL", endpoint)
l.Servers[i] = endpoint
}
u, err := url.Parse(endpoint)
if err != nil { if err != nil {
acc.AddError(fmt.Errorf("Unable to parse the address:%s, err:%s", endpoint, err)) acc.AddError(fmt.Errorf("Unable to parse address %q: %s", endpoint, err))
continue continue
} }
port, err := retrieveTokenAfterColon(endpoint) if u.Host == "" {
if err != nil { acc.AddError(fmt.Errorf("Unable to parse address %q", endpoint))
acc.AddError(err)
continue continue
} }
port := u.Port()
if port == "" {
port = "4020"
}
st, ok := serverTypeMapping[port] st, ok := serverTypeMapping[port]
if !ok { if !ok {
st = ServerTypeStorage st = ServerTypeStorage

View File

@ -118,11 +118,18 @@ func (p *Parser) Compile() error {
// Give Patterns fake names so that they can be treated as named // Give Patterns fake names so that they can be treated as named
// "custom patterns" // "custom patterns"
p.namedPatterns = make([]string, len(p.Patterns)) p.namedPatterns = make([]string, 0, len(p.Patterns))
for i, pattern := range p.Patterns { for i, pattern := range p.Patterns {
if pattern == "" {
continue
}
name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i) name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i)
p.CustomPatterns += "\n" + name + " " + pattern + "\n" p.CustomPatterns += "\n" + name + " " + pattern + "\n"
p.namedPatterns[i] = "%{" + name + "}" p.namedPatterns = append(p.namedPatterns, "%{"+name+"}")
}
if len(p.namedPatterns) == 0 {
return fmt.Errorf("pattern required")
} }
// Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse // Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"log" "log"
"reflect" "reflect"
"strings"
"sync" "sync"
"github.com/influxdata/tail" "github.com/influxdata/tail"
@ -45,6 +46,7 @@ const sampleConfig = `
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file ## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/apache/access.log"] files = ["/var/log/apache/access.log"]
## Read files that currently exist from the beginning. Files that are created ## Read files that currently exist from the beginning. Files that are created
## while telegraf is running (and that match the "files" globs) will always ## while telegraf is running (and that match the "files" globs) will always
## be read from the beginning. ## be read from the beginning.
@ -117,16 +119,11 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
} }
// compile log parser patterns: // compile log parser patterns:
var haveError bool
for _, parser := range l.parsers { for _, parser := range l.parsers {
if err := parser.Compile(); err != nil { if err := parser.Compile(); err != nil {
acc.AddError(err) return err
haveError = true
} }
} }
if haveError {
return nil
}
l.wg.Add(1) l.wg.Add(1)
go l.parser() go l.parser()
@ -191,9 +188,12 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) {
continue continue
} }
// Fix up files with Windows line endings.
text := strings.TrimRight(line.Text, "\r")
select { select {
case <-l.done: case <-l.done:
case l.lines <- line.Text: case l.lines <- text:
} }
} }
} }

View File

@ -38,12 +38,8 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
} }
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
logparser.Start(&acc) err := logparser.Start(&acc)
if assert.NotEmpty(t, acc.Errors) { assert.Error(t, err)
assert.Error(t, acc.Errors[0])
}
logparser.Stop()
} }
func TestGrokParseLogFiles(t *testing.T) { func TestGrokParseLogFiles(t *testing.T) {

View File

@ -4,12 +4,12 @@
```toml ```toml
[[inputs.mongodb]] [[inputs.mongodb]]
## An array of URI to gather stats about. Specify an ip or hostname ## An array of URLs of the form:
## with optional port add password. ie, ## "mongodb://" [user ":" pass "@"] host [ ":" port]
## For example:
## mongodb://user:auth_key@10.10.3.30:27017, ## mongodb://user:auth_key@10.10.3.30:27017,
## mongodb://10.10.3.33:18832, ## mongodb://10.10.3.33:18832,
## 10.0.0.1:10000, etc. servers = ["mongodb://127.0.0.1:27017"]
servers = ["127.0.0.1:27017"]
gather_perdb_stats = false gather_perdb_stats = false
## Optional SSL Config ## Optional SSL Config
@ -19,14 +19,7 @@
## Use SSL but skip chain & host verification ## Use SSL but skip chain & host verification
# insecure_skip_verify = false # insecure_skip_verify = false
``` ```
This connection uri may be different based on your environment and mongodb
For authenticated mongodb instances use `mongodb://` connection URI
```toml
[[inputs.mongodb]]
servers = ["mongodb://username:password@10.XX.XX.XX:27101/mydatabase?authSource=admin"]
```
This connection uri may be different based on your environement and mongodb
setup. If the user doesn't have the required privilege to execute serverStatus setup. If the user doesn't have the required privilege to execute serverStatus
command the you will get this error on telegraf command the you will get this error on telegraf

View File

@ -4,8 +4,10 @@ import (
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"fmt" "fmt"
"log"
"net" "net"
"net/url" "net/url"
"strings"
"sync" "sync"
"time" "time"
@ -37,12 +39,12 @@ type Ssl struct {
} }
var sampleConfig = ` var sampleConfig = `
## An array of URI to gather stats about. Specify an ip or hostname ## An array of URLs of the form:
## with optional port add password. ie, ## "mongodb://" [user ":" pass "@"] host [ ":" port]
## For example:
## mongodb://user:auth_key@10.10.3.30:27017, ## mongodb://user:auth_key@10.10.3.30:27017,
## mongodb://10.10.3.33:18832, ## mongodb://10.10.3.33:18832,
## 10.0.0.1:10000, etc. servers = ["mongodb://127.0.0.1:27017"]
servers = ["127.0.0.1:27017"]
gather_perdb_stats = false gather_perdb_stats = false
## Optional SSL Config ## Optional SSL Config
@ -61,7 +63,7 @@ func (*MongoDB) Description() string {
return "Read metrics from one or many MongoDB servers" return "Read metrics from one or many MongoDB servers"
} }
var localhost = &url.URL{Host: "127.0.0.1:27017"} var localhost = &url.URL{Host: "mongodb://127.0.0.1:27017"}
// Reads stats from all configured servers accumulates stats. // Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any). // Returns one of the errors encountered while gather stats (if any).
@ -72,19 +74,25 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
} }
var wg sync.WaitGroup var wg sync.WaitGroup
for _, serv := range m.Servers { for i, serv := range m.Servers {
if !strings.HasPrefix(serv, "mongodb://") {
// Preserve backwards compatibility for hostnames without a
// scheme, broken in go 1.8. Remove in Telegraf 2.0
serv = "mongodb://" + serv
log.Printf("W! [inputs.mongodb] Using %q as connection URL; please update your configuration to use an URL", serv)
m.Servers[i] = serv
}
u, err := url.Parse(serv) u, err := url.Parse(serv)
if err != nil { if err != nil {
acc.AddError(fmt.Errorf("Unable to parse to address '%s': %s", serv, err)) acc.AddError(fmt.Errorf("Unable to parse address %q: %s", serv, err))
continue continue
} else if u.Scheme == "" {
u.Scheme = "mongodb"
// fallback to simple string based address (i.e. "10.0.0.1:10000")
u.Host = serv
if u.Path == u.Host {
u.Path = ""
}
} }
if u.Host == "" {
acc.AddError(fmt.Errorf("Unable to parse address %q", serv))
continue
}
wg.Add(1) wg.Add(1)
go func(srv *Server) { go func(srv *Server) {
defer wg.Done() defer wg.Done()

View File

@ -564,7 +564,7 @@ func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSec
// BEGIN code modification // BEGIN code modification
if newStat.Repl.IsMaster.(bool) { if newStat.Repl.IsMaster.(bool) {
returnVal.NodeType = "PRI" returnVal.NodeType = "PRI"
} else if newStat.Repl.Secondary.(bool) { } else if newStat.Repl.Secondary != nil && newStat.Repl.Secondary.(bool) {
returnVal.NodeType = "SEC" returnVal.NodeType = "SEC"
} else if newStat.Repl.ArbiterOnly != nil && newStat.Repl.ArbiterOnly.(bool) { } else if newStat.Repl.ArbiterOnly != nil && newStat.Repl.ArbiterOnly.(bool) {
returnVal.NodeType = "ARB" returnVal.NodeType = "ARB"

View File

@ -4,7 +4,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"sync" "sync"
"time" "time"
@ -32,6 +31,8 @@ type Prometheus struct {
SSLKey string `toml:"ssl_key"` SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification // Use SSL but skip chain & host verification
InsecureSkipVerify bool InsecureSkipVerify bool
client *http.Client
} }
var sampleConfig = ` var sampleConfig = `
@ -65,6 +66,14 @@ var ErrProtocolError = errors.New("prometheus protocol error")
// Reads stats from all configured servers accumulates stats. // Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any). // Returns one of the errors encountered while gather stats (if any).
func (p *Prometheus) Gather(acc telegraf.Accumulator) error { func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
if p.client == nil {
client, err := p.createHttpClient()
if err != nil {
return err
}
p.client = client
}
var wg sync.WaitGroup var wg sync.WaitGroup
for _, serv := range p.Urls { for _, serv := range p.Urls {
@ -89,29 +98,30 @@ var client = &http.Client{
Timeout: time.Duration(4 * time.Second), Timeout: time.Duration(4 * time.Second),
} }
func (p *Prometheus) createHttpClient() (*http.Client, error) {
tlsCfg, err := internal.GetTLSConfig(
p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
DisableKeepAlives: true,
},
Timeout: p.ResponseTimeout.Duration,
}
return client, nil
}
func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
var req, err = http.NewRequest("GET", url, nil) var req, err = http.NewRequest("GET", url, nil)
req.Header.Add("Accept", acceptHeader) req.Header.Add("Accept", acceptHeader)
var token []byte var token []byte
var resp *http.Response var resp *http.Response
tlsCfg, err := internal.GetTLSConfig(
p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify)
if err != nil {
return err
}
var rt http.RoundTripper = &http.Transport{
Dial: (&net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 5 * time.Second,
TLSClientConfig: tlsCfg,
ResponseHeaderTimeout: p.ResponseTimeout.Duration,
DisableKeepAlives: true,
}
if p.BearerToken != "" { if p.BearerToken != "" {
token, err = ioutil.ReadFile(p.BearerToken) token, err = ioutil.ReadFile(p.BearerToken)
if err != nil { if err != nil {
@ -120,7 +130,7 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
req.Header.Set("Authorization", "Bearer "+string(token)) req.Header.Set("Authorization", "Bearer "+string(token))
} }
resp, err = rt.RoundTrip(req) resp, err = p.client.Do(req)
if err != nil { if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err) return fmt.Errorf("error making HTTP request to %s: %s", url, err)
} }

View File

@ -0,0 +1,121 @@
# RabbitMQ Input Plugin
Reads metrics from RabbitMQ servers via the [Management Plugin](https://www.rabbitmq.com/management.html).
For additional details reference the [RabbitMQ Management HTTP Stats](https://cdn.rawgit.com/rabbitmq/rabbitmq-management/master/priv/www/doc/stats.html).
### Configuration:
```toml
[[inputs.rabbitmq]]
## Management Plugin url. (default: http://localhost:15672)
# url = "http://localhost:15672"
## Tag added to rabbitmq_overview series; deprecated: use tags
# name = "rmq-server-1"
## Credentials
# username = "guest"
# password = "guest"
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## Optional request timeouts
##
## ResponseHeaderTimeout, if non-zero, specifies the amount of time to wait
## for a server's response headers after fully writing the request.
# header_timeout = "3s"
##
## client_timeout specifies a time limit for requests made by this client.
## Includes connection time, any redirects, and reading the response body.
# client_timeout = "4s"
## A list of nodes to pull metrics about. If not specified, metrics for
## all nodes are gathered.
# nodes = ["rabbit@node1", "rabbit@node2"]
```
### Measurements & Fields:
- rabbitmq_overview
- channels (int, channels)
- connections (int, connections)
- consumers (int, consumers)
- exchanges (int, exchanges)
- messages (int, messages)
- messages_acked (int, messages)
- messages_delivered (int, messages)
- messages_published (int, messages)
- messages_ready (int, messages)
- messages_unacked (int, messages)
- queues (int, queues)
- rabbitmq_node
- disk_free (int, bytes)
- disk_free_limit (int, bytes)
- fd_total (int, file descriptors)
- fd_used (int, file descriptors)
- mem_limit (int, bytes)
- mem_used (int, bytes)
- proc_total (int, erlang processes)
- proc_used (int, erlang processes)
- run_queue (int, erlang processes)
- sockets_total (int, sockets)
- sockets_used (int, sockets)
- rabbitmq_queue
- consumer_utilisation (float, percent)
- consumers (int, int)
- idle_since (string, time - e.g., "2006-01-02 15:04:05")
- memory (int, bytes)
- message_bytes (int, bytes)
- message_bytes_persist (int, bytes)
- message_bytes_ram (int, bytes)
- message_bytes_ready (int, bytes)
- message_bytes_unacked (int, bytes)
- messages (int, count)
- messages_ack (int, count)
- messages_ack_rate (float, messages per second)
- messages_deliver (int, count)
- messages_deliver_rate (float, messages per second)
- messages_deliver_get (int, count)
- messages_deliver_get_rate (float, messages per second)
- messages_publish (int, count)
- messages_publish_rate (float, messages per second)
- messages_ready (int, count)
- messages_redeliver (int, count)
- messages_redeliver_rate (float, messages per second)
- messages_unack (integer, count)
### Tags:
- All measurements have the following tags:
- url
- rabbitmq_overview
- name
- rabbitmq_node
- node
- rabbitmq_queue
- url
- queue
- vhost
- node
- durable
- auto_delete
### Sample Queries:
### Example Output:
```
rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000
```

View File

@ -140,8 +140,11 @@ type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues} var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
var sampleConfig = ` var sampleConfig = `
## Management Plugin url. (default: http://localhost:15672)
# url = "http://localhost:15672" # url = "http://localhost:15672"
# name = "rmq-server-1" # optional tag ## Tag added to rabbitmq_overview series; deprecated: use tags
# name = "rmq-server-1"
## Credentials
# username = "guest" # username = "guest"
# password = "guest" # password = "guest"
@ -174,7 +177,7 @@ func (r *RabbitMQ) SampleConfig() string {
// Description ... // Description ...
func (r *RabbitMQ) Description() string { func (r *RabbitMQ) Description() string {
return "Read metrics from one or many RabbitMQ servers via the management API" return "Reads metrics from RabbitMQ servers via the Management Plugin"
} }
// Gather ... // Gather ...

View File

@ -18,47 +18,103 @@
### Measurements & Fields: ### Measurements & Fields:
- Measurement The plugin gathers the results of the [INFO](https://redis.io/commands/info) redis command.
- uptime_in_seconds There are two separate measurements: _redis_ and _redis\_keyspace_, the latter is used for gathering database related statistics.
- connected_clients
- used_memory Additionally the plugin also calculates the hit/miss ratio (keyspace\_hitrate) and the elapsed time since the last rdb save (rdb\_last\_save\_time\_elapsed).
- used_memory_rss
- used_memory_peak - redis
- used_memory_lua - keyspace_hitrate(float, number)
- rdb_changes_since_last_save - rdb_last_save_time_elapsed(int, seconds)
- total_connections_received
- total_commands_processed **Server**
- instantaneous_ops_per_sec - uptime(int, seconds)
- instantaneous_input_kbps - lru_clock(int, number)
- instantaneous_output_kbps
- sync_full **Clients**
- sync_partial_ok - clients(int, number)
- sync_partial_err - client_longest_output_list(int, number)
- expired_keys - client_biggest_input_buf(int, number)
- evicted_keys - blocked_clients(int, number)
- keyspace_hits
- keyspace_misses **Memory**
- pubsub_channels - used_memory(int, bytes)
- pubsub_patterns - used_memory_rss(int, bytes)
- latest_fork_usec - used_memory_peak(int, bytes)
- connected_slaves - total_system_memory(int, bytes)
- master_repl_offset - used_memory_lua(int, bytes)
- master_last_io_seconds_ago - maxmemory(int, bytes)
- repl_backlog_active - maxmemory_policy(string)
- repl_backlog_size - mem_fragmentation_ratio(float, number)
- repl_backlog_histlen
- mem_fragmentation_ratio **Persistance**
- used_cpu_sys - loading(int,flag)
- used_cpu_user - rdb_changes_since_last_save(int, number)
- used_cpu_sys_children - rdb_bgsave_in_progress(int, flag)
- used_cpu_user_children - rdb_last_save_time(int, seconds)
- rdb_last_bgsave_status(string)
- rdb_last_bgsave_time_sec(int, seconds)
- rdb_current_bgsave_time_sec(int, seconds)
- aof_enabled(int, flag)
- aof_rewrite_in_progress(int, flag)
- aof_rewrite_scheduled(int, flag)
- aof_last_rewrite_time_sec(int, seconds)
- aof_current_rewrite_time_sec(int, seconds)
- aof_last_bgrewrite_status(string)
- aof_last_write_status(string)
**Stats**
- total_connections_received(int, number)
- total_commands_processed(int, number)
- instantaneous_ops_per_sec(int, number)
- total_net_input_bytes(int, bytes)
- total_net_output_bytes(int, bytes)
- instantaneous_input_kbps(float, bytes)
- instantaneous_output_kbps(float, bytes)
- rejected_connections(int, number)
- sync_full(int, number)
- sync_partial_ok(int, number)
- sync_partial_err(int, number)
- expired_keys(int, number)
- evicted_keys(int, number)
- keyspace_hits(int, number)
- keyspace_misses(int, number)
- pubsub_channels(int, number)
- pubsub_patterns(int, number)
- latest_fork_usec(int, microseconds)
- migrate_cached_sockets(int, number)
**Replication**
- connected_slaves(int, number)
- master_repl_offset(int, number)
- repl_backlog_active(int, number)
- repl_backlog_size(int, bytes)
- repl_backlog_first_byte_offset(int, number)
- repl_backlog_histlen(int, bytes)
**CPU**
- used_cpu_sys(float, number)
- used_cpu_user(float, number)
- used_cpu_sys_children(float, number)
- used_cpu_user_children(float, number)
**Cluster**
- cluster_enabled(int, flag)
- redis_keyspace
- keys(int, number)
- expires(int, number)
- avg_ttl(int, number)
### Tags: ### Tags:
- All measurements have the following tags: - All measurements have the following tags:
- port - port
- server - server
- replication role - replication_role
- The redis_keyspace measurement has an additional database tag:
- database
### Example Output: ### Example Output:
@ -84,5 +140,10 @@ When run with:
It produces: It produces:
``` ```
* Plugin: redis, Collection 1 * Plugin: redis, Collection 1
> redis,port=6379,server=localhost clients=1i,connected_slaves=0i,evicted_keys=0i,expired_keys=0i,instantaneous_ops_per_sec=0i,keyspace_hitrate=0,keyspace_hits=0i,keyspace_misses=2i,latest_fork_usec=0i,master_repl_offset=0i,mem_fragmentation_ratio=3.58,pubsub_channels=0i,pubsub_patterns=0i,rdb_changes_since_last_save=0i,repl_backlog_active=0i,repl_backlog_histlen=0i,repl_backlog_size=1048576i,sync_full=0i,sync_partial_err=0i,sync_partial_ok=0i,total_commands_processed=4i,total_connections_received=2i,uptime=869i,used_cpu_sys=0.07,used_cpu_sys_children=0,used_cpu_user=0.1,used_cpu_user_children=0,used_memory=502048i,used_memory_lua=33792i,used_memory_peak=501128i,used_memory_rss=1798144i 1457052084987848383 > redis,server=localhost,port=6379,replication_role=master,host=host keyspace_hitrate=1,clients=2i,blocked_clients=0i,instantaneous_input_kbps=0,sync_full=0i,pubsub_channels=0i,pubsub_patterns=0i,total_net_output_bytes=6659253i,used_memory=842448i,total_system_memory=8351916032i,aof_current_rewrite_time_sec=-1i,rdb_changes_since_last_save=0i,sync_partial_err=0i,latest_fork_usec=508i,instantaneous_output_kbps=0,expired_keys=0i,used_memory_peak=843416i,aof_rewrite_in_progress=0i,aof_last_bgrewrite_status="ok",migrate_cached_sockets=0i,connected_slaves=0i,maxmemory_policy="noeviction",aof_rewrite_scheduled=0i,total_net_input_bytes=3125i,used_memory_rss=9564160i,repl_backlog_histlen=0i,rdb_last_bgsave_status="ok",aof_last_rewrite_time_sec=-1i,keyspace_misses=0i,client_biggest_input_buf=5i,used_cpu_user=1.33,maxmemory=0i,rdb_current_bgsave_time_sec=-1i,total_commands_processed=271i,repl_backlog_size=1048576i,used_cpu_sys=3,uptime=2822i,lru_clock=16706281i,used_memory_lua=37888i,rejected_connections=0i,sync_partial_ok=0i,evicted_keys=0i,rdb_last_save_time_elapsed=1922i,rdb_last_save_time=1493099368i,instantaneous_ops_per_sec=0i,used_cpu_user_children=0,client_longest_output_list=0i,master_repl_offset=0i,repl_backlog_active=0i,keyspace_hits=2i,used_cpu_sys_children=0,cluster_enabled=0i,rdb_last_bgsave_time_sec=0i,aof_last_write_status="ok",total_connections_received=263i,aof_enabled=0i,repl_backlog_first_byte_offset=0i,mem_fragmentation_ratio=11.35,loading=0i,rdb_bgsave_in_progress=0i 1493101290000000000
```
redis_keyspace:
```
> redis_keyspace,database=db1,host=host,server=localhost,port=6379,replication_role=master keys=1i,expires=0i,avg_ttl=0i 1493101350000000000
``` ```

View File

@ -843,7 +843,7 @@ FROM (SELECT DISTINCT DatabaseName FROM #Databases) AS bl
SET @DynamicPivotQuery = N' SET @DynamicPivotQuery = N'
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@ -856,7 +856,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@ -869,7 +869,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@ -883,7 +883,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@ -896,7 +896,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@ -909,7 +909,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@ -922,7 +922,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@ -935,7 +935,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@ -948,7 +948,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)
@ -961,7 +961,7 @@ UNION ALL
SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'') SELECT measurement = Measurement, servername = REPLACE(@@SERVERNAME, ''\'', '':'')
, type = ''Database properties'' , type = ''Database properties''
, ' + @ColumnName + ', total FROM , ' + @ColumnName + ', Total FROM
( (
SELECT Measurement, DatabaseName, Value SELECT Measurement, DatabaseName, Value
, Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement) , Total = (SELECT SUM(Value) FROM #Databases WHERE Measurement = d.Measurement)

View File

@ -12,6 +12,7 @@ import (
"path/filepath" "path/filepath"
"runtime" "runtime"
"strconv" "strconv"
"syscall"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
@ -195,6 +196,13 @@ func readProcFile(filename string) ([]byte, error) {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil, nil return nil, nil
} }
// Reading from /proc/<PID> fails with ESRCH if the process has
// been terminated between open() and read().
if perr, ok := err.(*os.PathError); ok && perr.Err == syscall.ESRCH {
return nil, nil
}
return nil, err return nil, err
} }

View File

@ -2,6 +2,7 @@ package tail
import ( import (
"fmt" "fmt"
"strings"
"sync" "sync"
"github.com/influxdata/tail" "github.com/influxdata/tail"
@ -123,7 +124,10 @@ func (t *Tail) receiver(tailer *tail.Tail) {
tailer.Filename, err)) tailer.Filename, err))
continue continue
} }
m, err = t.parser.ParseLine(line.Text) // Fix up files with Windows line endings.
text := strings.TrimRight(line.Text, "\r")
m, err = t.parser.ParseLine(text)
if err == nil { if err == nil {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
} else { } else {

View File

@ -103,3 +103,33 @@ func TestTailBadLine(t *testing.T) {
acc.WaitError(1) acc.WaitError(1)
assert.Contains(t, acc.Errors[0].Error(), "E! Malformed log line") assert.Contains(t, acc.Errors[0].Error(), "E! Malformed log line")
} }
func TestTailDosLineendings(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
_, err = tmpfile.WriteString("cpu usage_idle=100\r\ncpu2 usage_idle=200\r\n")
require.NoError(t, err)
tt := NewTail()
tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()}
p, _ := parsers.NewInfluxParser()
tt.SetParser(p)
defer tt.Stop()
defer tmpfile.Close()
acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc))
require.NoError(t, acc.GatherError(tt.Gather))
acc.Wait(2)
acc.AssertContainsFields(t, "cpu",
map[string]interface{}{
"usage_idle": float64(100),
})
acc.AssertContainsFields(t, "cpu2",
map[string]interface{}{
"usage_idle": float64(200),
})
}

View File

@ -18,7 +18,8 @@ This plugin writes to [InfluxDB](https://www.influxdb.com) via HTTP or UDP.
## The target database for metrics (telegraf will create it if not exists). ## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required database = "telegraf" # required
## Retention policy to write to. Empty string writes to the default rp. ## Name of existing retention policy to write to. Empty string writes to
## the default retention policy.
retention_policy = "" retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all" ## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any" write_consistency = "any"
@ -52,7 +53,7 @@ to write to. Each URL should start with either `http://` or `udp://`
### Optional parameters: ### Optional parameters:
* `write_consistency`: Write consistency (clusters only), can be: "any", "one", "quorum", "all". * `write_consistency`: Write consistency (clusters only), can be: "any", "one", "quorum", "all".
* `retention_policy`: Retention policy to write to. * `retention_policy`: Name of existing retention policy to write to. Empty string writes to the default retention policy.
* `timeout`: Write timeout (for the InfluxDB client), formatted as a string. If not provided, will default to 5s. 0s means no timeout (not recommended). * `timeout`: Write timeout (for the InfluxDB client), formatted as a string. If not provided, will default to 5s. 0s means no timeout (not recommended).
* `username`: Username for influxdb * `username`: Username for influxdb
* `password`: Password for influxdb * `password`: Password for influxdb

View File

@ -16,7 +16,6 @@ var (
defaultRequestTimeout = time.Second * 5 defaultRequestTimeout = time.Second * 5
) )
//
func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) { func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) {
// validate required parameters: // validate required parameters:
if len(config.URL) == 0 { if len(config.URL) == 0 {

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
"log"
"net" "net"
"net/url" "net/url"
) )
@ -25,6 +26,7 @@ type UDPConfig struct {
PayloadSize int PayloadSize int
} }
// NewUDP will return an instance of the telegraf UDP output plugin for influxdb
func NewUDP(config UDPConfig) (Client, error) { func NewUDP(config UDPConfig) (Client, error) {
p, err := url.Parse(config.URL) p, err := url.Parse(config.URL)
if err != nil { if err != nil {
@ -55,20 +57,22 @@ type udpClient struct {
buffer []byte buffer []byte
} }
// Query will send the provided query command to the client, returning an error if any issues arise
func (c *udpClient) Query(command string) error { func (c *udpClient) Query(command string) error {
return nil return nil
} }
// Write will send the byte stream to the given UDP client endpoint
func (c *udpClient) Write(b []byte) (int, error) { func (c *udpClient) Write(b []byte) (int, error) {
return c.WriteStream(bytes.NewReader(b), -1) return c.WriteStream(bytes.NewReader(b), -1)
} }
// write params are ignored by the UDP client // WriteWithParams are ignored by the UDP client, will forward to WriteStream
func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) { func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
return c.WriteStream(bytes.NewReader(b), -1) return c.WriteStream(bytes.NewReader(b), -1)
} }
// contentLength is ignored by the UDP client. // WriteStream will send the provided data through to the client, contentLength is ignored by the UDP client
func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) { func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
var totaln int var totaln int
for { for {
@ -79,21 +83,40 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
if err != io.EOF && err != nil { if err != io.EOF && err != nil {
return totaln, err return totaln, err
} }
nW, err := c.conn.Write(c.buffer[0:nR])
totaln += nW if c.buffer[nR-1] == uint8('\n') {
if err != nil { nW, err := c.conn.Write(c.buffer[0:nR])
return totaln, err totaln += nW
if err != nil {
return totaln, err
}
} else {
log.Printf("E! Could not fit point into UDP payload; dropping")
// Scan forward until next line break to realign.
for {
nR, err := r.Read(c.buffer)
if nR == 0 {
break
}
if err != io.EOF && err != nil {
return totaln, err
}
if c.buffer[nR-1] == uint8('\n') {
break
}
}
} }
} }
return totaln, nil return totaln, nil
} }
// contentLength is ignored by the UDP client. // WriteStreamWithParams will forward the stream to the client backend, contentLength is ignored by the UDP client
// write params are ignored by the UDP client // write params are ignored by the UDP client
func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) { func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) {
return c.WriteStream(r, -1) return c.WriteStream(r, -1)
} }
// Close will terminate the provided client connection
func (c *udpClient) Close() error { func (c *udpClient) Close() error {
return c.conn.Close() return c.conn.Close()
} }

View File

@ -1,7 +1,6 @@
package client package client
import ( import (
"bytes"
"net" "net"
"testing" "testing"
"time" "time"
@ -10,6 +9,7 @@ import (
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestUDPClient(t *testing.T) { func TestUDPClient(t *testing.T) {
@ -72,63 +72,7 @@ func TestUDPClient_Write(t *testing.T) {
pkt := <-packets pkt := <-packets
assert.Equal(t, "cpu value=99\n", pkt) assert.Equal(t, "cpu value=99\n", pkt)
metrics := `cpu value=99
cpu value=55
cpu value=44
cpu value=101
cpu value=91
cpu value=92
`
// test sending packet with 6 metrics in a stream.
reader := bytes.NewReader([]byte(metrics))
// contentLength is ignored:
n, err = client.WriteStream(reader, 10)
assert.Equal(t, n, len(metrics))
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "cpu value=99\ncpu value=55\ncpu value=44\ncpu value=101\ncpu value=91\ncpu value=92\n", pkt)
//
// Test that UDP packets get broken up properly:
config2 := UDPConfig{
URL: "udp://localhost:8199",
PayloadSize: 25,
}
client2, err := NewUDP(config2)
assert.NoError(t, err)
wp := WriteParams{} wp := WriteParams{}
//
// Using Write():
buf := []byte(metrics)
n, err = client2.WriteWithParams(buf, wp)
assert.Equal(t, n, len(metrics))
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "cpu value=99\ncpu value=55", pkt)
pkt = <-packets
assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt)
pkt = <-packets
assert.Equal(t, "01\ncpu value=91\ncpu value", pkt)
pkt = <-packets
assert.Equal(t, "=92\n", pkt)
//
// Using WriteStream():
reader = bytes.NewReader([]byte(metrics))
n, err = client2.WriteStreamWithParams(reader, 10, wp)
assert.Equal(t, n, len(metrics))
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "cpu value=99\ncpu value=55", pkt)
pkt = <-packets
assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt)
pkt = <-packets
assert.Equal(t, "01\ncpu value=91\ncpu value", pkt)
pkt = <-packets
assert.Equal(t, "=92\n", pkt)
// //
// Using WriteStream() & a metric.Reader: // Using WriteStream() & a metric.Reader:
config3 := UDPConfig{ config3 := UDPConfig{
@ -159,4 +103,27 @@ cpu value=92
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt) assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
assert.NoError(t, client.Close()) assert.NoError(t, client.Close())
config = UDPConfig{
URL: "udp://localhost:8199",
PayloadSize: 40,
}
client4, err := NewUDP(config)
assert.NoError(t, err)
ts := time.Unix(1484142943, 0)
m1, _ = metric.New("test", map[string]string{},
map[string]interface{}{"this_is_a_very_long_field_name": 1.1}, ts)
m2, _ = metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, ts)
ms = []telegraf.Metric{m1, m2}
reader := metric.NewReader(ms)
n, err = client4.WriteStream(reader, 0)
assert.NoError(t, err)
require.Equal(t, 35, n)
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "test value=1.1 1484142943000000000\n", pkt)
assert.NoError(t, client4.Close())
} }

View File

@ -15,6 +15,12 @@ import (
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client" "github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
) )
var (
// Quote Ident replacer.
qiReplacer = strings.NewReplacer("\n", `\n`, `\`, `\\`, `"`, `\"`)
)
// InfluxDB struct is the primary data structure for the plugin
type InfluxDB struct { type InfluxDB struct {
// URL is only for backwards compatability // URL is only for backwards compatability
URL string URL string
@ -55,7 +61,8 @@ var sampleConfig = `
## The target database for metrics (telegraf will create it if not exists). ## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required database = "telegraf" # required
## Retention policy to write to. Empty string writes to the default rp. ## Name of existing retention policy to write to. Empty string writes to
## the default retention policy.
retention_policy = "" retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all" ## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any" write_consistency = "any"
@ -78,11 +85,10 @@ var sampleConfig = `
# insecure_skip_verify = false # insecure_skip_verify = false
` `
// Connect initiates the primary connection to the range of provided URLs
func (i *InfluxDB) Connect() error { func (i *InfluxDB) Connect() error {
var urls []string var urls []string
for _, u := range i.URLs { urls = append(urls, i.URLs...)
urls = append(urls, u)
}
// Backward-compatability with single Influx URL config files // Backward-compatability with single Influx URL config files
// This could eventually be removed in favor of specifying the urls as a list // This could eventually be removed in favor of specifying the urls as a list
@ -129,9 +135,11 @@ func (i *InfluxDB) Connect() error {
} }
i.clients = append(i.clients, c) i.clients = append(i.clients, c)
err = c.Query("CREATE DATABASE " + i.Database) err = c.Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))
if err != nil { if err != nil {
log.Println("E! Database creation failed: " + err.Error()) if !strings.Contains(err.Error(), "Status Code [403]") {
log.Println("I! Database creation failed: " + err.Error())
}
continue continue
} }
} }
@ -141,25 +149,30 @@ func (i *InfluxDB) Connect() error {
return nil return nil
} }
// Close will terminate the session to the backend, returning error if an issue arises
func (i *InfluxDB) Close() error { func (i *InfluxDB) Close() error {
return nil return nil
} }
// SampleConfig returns the formatted sample configuration for the plugin
func (i *InfluxDB) SampleConfig() string { func (i *InfluxDB) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Description returns the human-readable function definition of the plugin
func (i *InfluxDB) Description() string { func (i *InfluxDB) Description() string {
return "Configuration for influxdb server to send metrics to" return "Configuration for influxdb server to send metrics to"
} }
// Choose a random server in the cluster to write to until a successful write // Write will choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error. // occurs, logging each unsuccessful. If all servers fail, return error.
func (i *InfluxDB) Write(metrics []telegraf.Metric) error { func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
bufsize := 0 bufsize := 0
for _, m := range metrics { for _, m := range metrics {
bufsize += m.Len() bufsize += m.Len()
} }
r := metric.NewReader(metrics) r := metric.NewReader(metrics)
// This will get set to nil if a successful write occurs // This will get set to nil if a successful write occurs
@ -170,7 +183,8 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
if _, e := i.clients[n].WriteStream(r, bufsize); e != nil { if _, e := i.clients[n].WriteStream(r, bufsize); e != nil {
// If the database was not found, try to recreate it: // If the database was not found, try to recreate it:
if strings.Contains(e.Error(), "database not found") { if strings.Contains(e.Error(), "database not found") {
if errc := i.clients[n].Query("CREATE DATABASE " + i.Database); errc != nil { errc := i.clients[n].Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))
if errc != nil {
log.Printf("E! Error: Database %s not found and failed to recreate\n", log.Printf("E! Error: Database %s not found and failed to recreate\n",
i.Database) i.Database)
} }

View File

@ -2,15 +2,52 @@ package influxdb
import ( import (
"fmt" "fmt"
"io"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestIdentQuoting(t *testing.T) {
var testCases = []struct {
database string
expected string
}{
{"x-y", `CREATE DATABASE "x-y"`},
{`x"y`, `CREATE DATABASE "x\"y"`},
{"x\ny", `CREATE DATABASE "x\ny"`},
{`x\y`, `CREATE DATABASE "x\\y"`},
}
for _, tc := range testCases {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
q := r.Form.Get("q")
assert.Equal(t, tc.expected, q)
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
}))
defer ts.Close()
i := InfluxDB{
URLs: []string{ts.URL},
Database: tc.database,
}
err := i.Connect()
require.NoError(t, err)
require.NoError(t, i.Close())
}
}
func TestUDPInflux(t *testing.T) { func TestUDPInflux(t *testing.T) {
i := InfluxDB{ i := InfluxDB{
URLs: []string{"udp://localhost:8089"}, URLs: []string{"udp://localhost:8089"},
@ -164,3 +201,34 @@ func TestHTTPError_FieldTypeConflict(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, i.Close()) require.NoError(t, i.Close())
} }
type MockClient struct {
writeStreamCalled int
contentLength int
}
func (m *MockClient) Query(command string) error {
panic("not implemented")
}
func (m *MockClient) Write(b []byte) (int, error) {
panic("not implemented")
}
func (m *MockClient) WriteWithParams(b []byte, params client.WriteParams) (int, error) {
panic("not implemented")
}
func (m *MockClient) WriteStream(b io.Reader, contentLength int) (int, error) {
m.writeStreamCalled++
m.contentLength = contentLength
return 0, nil
}
func (m *MockClient) WriteStreamWithParams(b io.Reader, contentLength int, params client.WriteParams) (int, error) {
panic("not implemented")
}
func (m *MockClient) Close() error {
panic("not implemented")
}

View File

@ -6,6 +6,8 @@ import (
"log" "log"
"net/http" "net/http"
"regexp" "regexp"
"sort"
"strings"
"sync" "sync"
"time" "time"
@ -17,19 +19,40 @@ import (
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
type MetricWithExpiration struct { // SampleID uniquely identifies a Sample
Metric prometheus.Metric type SampleID string
// Sample represents the current value of a series.
type Sample struct {
// Labels are the Prometheus labels.
Labels map[string]string
// Value is the value in the Prometheus output.
Value float64
// Expiration is the deadline that this Sample is valid until.
Expiration time.Time Expiration time.Time
} }
// MetricFamily contains the data required to build valid prometheus Metrics.
type MetricFamily struct {
// Samples are the Sample belonging to this MetricFamily.
Samples map[SampleID]*Sample
// Type of the Value.
ValueType prometheus.ValueType
// LabelSet is the label counts for all Samples.
LabelSet map[string]int
}
type PrometheusClient struct { type PrometheusClient struct {
Listen string Listen string
ExpirationInterval internal.Duration `toml:"expiration_interval"` ExpirationInterval internal.Duration `toml:"expiration_interval"`
server *http.Server
metrics map[string]*MetricWithExpiration server *http.Server
sync.Mutex sync.Mutex
// fam is the non-expired MetricFamily by Prometheus metric name.
fam map[string]*MetricFamily
// now returns the current time.
now func() time.Time
} }
var sampleConfig = ` var sampleConfig = `
@ -41,7 +64,6 @@ var sampleConfig = `
` `
func (p *PrometheusClient) Start() error { func (p *PrometheusClient) Start() error {
p.metrics = make(map[string]*MetricWithExpiration)
prometheus.Register(p) prometheus.Register(p)
if p.Listen == "" { if p.Listen == "" {
@ -88,96 +110,153 @@ func (p *PrometheusClient) Describe(ch chan<- *prometheus.Desc) {
prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch) prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch)
} }
// Implements prometheus.Collector // Expire removes Samples that have expired.
func (p *PrometheusClient) Expire() {
now := p.now()
for name, family := range p.fam {
for key, sample := range family.Samples {
if p.ExpirationInterval.Duration != 0 && now.After(sample.Expiration) {
for k, _ := range sample.Labels {
family.LabelSet[k]--
}
delete(family.Samples, key)
if len(family.Samples) == 0 {
delete(p.fam, name)
}
}
}
}
}
// Collect implements prometheus.Collector
func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) { func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
for key, m := range p.metrics { p.Expire()
if p.ExpirationInterval.Duration != 0 && time.Now().After(m.Expiration) {
delete(p.metrics, key) for name, family := range p.fam {
} else { // Get list of all labels on MetricFamily
ch <- m.Metric var labelNames []string
for k, v := range family.LabelSet {
if v > 0 {
labelNames = append(labelNames, k)
}
}
desc := prometheus.NewDesc(name, "Telegraf collected metric", labelNames, nil)
for _, sample := range family.Samples {
// Get labels for this sample; unset labels will be set to the
// empty string
var labels []string
for _, label := range labelNames {
v := sample.Labels[label]
labels = append(labels, v)
}
metric, err := prometheus.NewConstMetric(desc, family.ValueType, sample.Value, labels...)
if err != nil {
log.Printf("E! Error creating prometheus metric, "+
"key: %s, labels: %v,\nerr: %s\n",
name, labels, err.Error())
}
ch <- metric
} }
} }
} }
func sanitize(value string) string {
return invalidNameCharRE.ReplaceAllString(value, "_")
}
func valueType(tt telegraf.ValueType) prometheus.ValueType {
switch tt {
case telegraf.Counter:
return prometheus.CounterValue
case telegraf.Gauge:
return prometheus.GaugeValue
default:
return prometheus.UntypedValue
}
}
// CreateSampleID creates a SampleID based on the tags of a telegraf.Metric.
func CreateSampleID(tags map[string]string) SampleID {
pairs := make([]string, 0, len(tags))
for k, v := range tags {
pairs = append(pairs, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(pairs)
return SampleID(strings.Join(pairs, ","))
}
func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
if len(metrics) == 0 { now := p.now()
return nil
}
for _, point := range metrics { for _, point := range metrics {
key := point.Name() tags := point.Tags()
key = invalidNameCharRE.ReplaceAllString(key, "_") vt := valueType(point.Type())
sampleID := CreateSampleID(tags)
// convert tags into prometheus labels labels := make(map[string]string)
var labels []string for k, v := range tags {
l := prometheus.Labels{} labels[sanitize(k)] = v
for k, v := range point.Tags() {
k = invalidNameCharRE.ReplaceAllString(k, "_")
if len(k) == 0 {
continue
}
labels = append(labels, k)
l[k] = v
} }
// Get a type if it's available, defaulting to Untyped for fn, fv := range point.Fields() {
var mType prometheus.ValueType
switch point.Type() {
case telegraf.Counter:
mType = prometheus.CounterValue
case telegraf.Gauge:
mType = prometheus.GaugeValue
default:
mType = prometheus.UntypedValue
}
for n, val := range point.Fields() {
// Ignore string and bool fields. // Ignore string and bool fields.
switch val.(type) { var value float64
case string: switch fv := fv.(type) {
continue
case bool:
continue
}
// sanitize the measurement name
n = invalidNameCharRE.ReplaceAllString(n, "_")
var mname string
if n == "value" {
mname = key
} else {
mname = fmt.Sprintf("%s_%s", key, n)
}
desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l)
var metric prometheus.Metric
var err error
// switch for field type
switch val := val.(type) {
case int64: case int64:
metric, err = prometheus.NewConstMetric(desc, mType, float64(val)) value = float64(fv)
case float64: case float64:
metric, err = prometheus.NewConstMetric(desc, mType, val) value = fv
default: default:
continue continue
} }
if err != nil {
log.Printf("E! Error creating prometheus metric, "+ sample := &Sample{
"key: %s, labels: %v,\nerr: %s\n", Labels: labels,
mname, l, err.Error()) Value: value,
Expiration: now.Add(p.ExpirationInterval.Duration),
} }
p.metrics[desc.String()] = &MetricWithExpiration{ // Special handling of value field; supports passthrough from
Metric: metric, // the prometheus input.
Expiration: time.Now().Add(p.ExpirationInterval.Duration), var mname string
if fn == "value" {
mname = sanitize(point.Name())
} else {
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
} }
var fam *MetricFamily
var ok bool
if fam, ok = p.fam[mname]; !ok {
fam = &MetricFamily{
Samples: make(map[SampleID]*Sample),
ValueType: vt,
LabelSet: make(map[string]int),
}
p.fam[mname] = fam
} else {
if fam.ValueType != vt {
// Don't return an error since this would be a permanent error
log.Printf("Mixed ValueType for measurement %q; dropping point", point.Name())
break
}
}
for k, _ := range sample.Labels {
fam.LabelSet[k]++
}
fam.Samples[sampleID] = sample
} }
} }
return nil return nil
@ -187,6 +266,8 @@ func init() {
outputs.Add("prometheus_client", func() telegraf.Output { outputs.Add("prometheus_client", func() telegraf.Output {
return &PrometheusClient{ return &PrometheusClient{
ExpirationInterval: internal.Duration{Duration: time.Second * 60}, ExpirationInterval: internal.Duration{Duration: time.Second * 60},
fam: make(map[string]*MetricFamily),
now: time.Now,
} }
}) })
} }

View File

@ -4,16 +4,314 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/inputs/prometheus" prometheus_input "github.com/influxdata/telegraf/plugins/inputs/prometheus"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
) )
func setUnixTime(client *PrometheusClient, sec int64) {
client.now = func() time.Time {
return time.Unix(sec, 0)
}
}
// NewClient initializes a PrometheusClient.
func NewClient() *PrometheusClient {
return &PrometheusClient{
ExpirationInterval: internal.Duration{Duration: time.Second * 60},
fam: make(map[string]*MetricFamily),
now: time.Now,
}
}
func TestWrite_Basic(t *testing.T) {
now := time.Now()
pt1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 0.0},
now)
var metrics = []telegraf.Metric{
pt1,
}
client := NewClient()
err = client.Write(metrics)
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, prometheus.UntypedValue, fam.ValueType)
require.Equal(t, map[string]int{}, fam.LabelSet)
sample, ok := fam.Samples[CreateSampleID(pt1.Tags())]
require.True(t, ok)
require.Equal(t, 0.0, sample.Value)
require.True(t, now.Before(sample.Expiration))
}
func TestWrite_IntField(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 42},
time.Now())
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
for _, v := range fam.Samples {
require.Equal(t, 42.0, v.Value)
}
}
func TestWrite_FieldNotValue(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"howdy": 0.0},
time.Now())
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo_howdy"]
require.True(t, ok)
for _, v := range fam.Samples {
require.Equal(t, 0.0, v.Value)
}
}
func TestWrite_SkipNonNumberField(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": "howdy"},
time.Now())
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
_, ok := client.fam["foo"]
require.False(t, ok)
}
func TestWrite_Counter(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 42},
time.Now(),
telegraf.Counter)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, prometheus.CounterValue, fam.ValueType)
}
func TestWrite_Sanitize(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo.bar",
map[string]string{"tag-with-dash": "localhost.local"},
map[string]interface{}{"field-with-dash": 42},
time.Now(),
telegraf.Counter)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo_bar_field_with_dash"]
require.True(t, ok)
require.Equal(t, map[string]int{"tag_with_dash": 1}, fam.LabelSet)
sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
require.True(t, ok)
require.Equal(t, map[string]string{
"tag_with_dash": "localhost.local"}, sample1.Labels)
}
func TestWrite_Gauge(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 42},
time.Now(),
telegraf.Gauge)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, prometheus.GaugeValue, fam.ValueType)
}
func TestWrite_MixedValueType(t *testing.T) {
now := time.Now()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 1.0},
now,
telegraf.Counter)
p2, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 2.0},
now,
telegraf.Gauge)
var metrics = []telegraf.Metric{p1, p2}
client := NewClient()
err = client.Write(metrics)
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, 1, len(fam.Samples))
}
func TestWrite_Tags(t *testing.T) {
now := time.Now()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 1.0},
now)
p2, err := metric.New(
"foo",
map[string]string{"host": "localhost"},
map[string]interface{}{"value": 2.0},
now)
var metrics = []telegraf.Metric{p1, p2}
client := NewClient()
err = client.Write(metrics)
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, prometheus.UntypedValue, fam.ValueType)
require.Equal(t, map[string]int{"host": 1}, fam.LabelSet)
sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
require.True(t, ok)
require.Equal(t, 1.0, sample1.Value)
require.True(t, now.Before(sample1.Expiration))
sample2, ok := fam.Samples[CreateSampleID(p2.Tags())]
require.True(t, ok)
require.Equal(t, 2.0, sample2.Value)
require.True(t, now.Before(sample2.Expiration))
}
func TestExpire(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 1.0},
time.Now())
setUnixTime(client, 0)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
p2, err := metric.New(
"bar",
make(map[string]string),
map[string]interface{}{"value": 2.0},
time.Now())
setUnixTime(client, 1)
err = client.Write([]telegraf.Metric{p2})
setUnixTime(client, 61)
require.Equal(t, 2, len(client.fam))
client.Expire()
require.Equal(t, 1, len(client.fam))
}
func TestExpire_TagsNoDecrement(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 1.0},
time.Now())
setUnixTime(client, 0)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
p2, err := metric.New(
"foo",
map[string]string{"host": "localhost"},
map[string]interface{}{"value": 2.0},
time.Now())
setUnixTime(client, 1)
err = client.Write([]telegraf.Metric{p2})
setUnixTime(client, 61)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, 2, len(fam.Samples))
client.Expire()
require.Equal(t, 1, len(fam.Samples))
require.Equal(t, map[string]int{"host": 1}, fam.LabelSet)
}
func TestExpire_TagsWithDecrement(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
map[string]string{"host": "localhost"},
map[string]interface{}{"value": 1.0},
time.Now())
setUnixTime(client, 0)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
p2, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 2.0},
time.Now())
setUnixTime(client, 1)
err = client.Write([]telegraf.Metric{p2})
setUnixTime(client, 61)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, 2, len(fam.Samples))
client.Expire()
require.Equal(t, 1, len(fam.Samples))
require.Equal(t, map[string]int{"host": 0}, fam.LabelSet)
}
var pTesting *PrometheusClient var pTesting *PrometheusClient
func TestPrometheusWritePointEmptyTag(t *testing.T) { func TestPrometheusWritePointEmptyTag(t *testing.T) {
@ -93,74 +391,21 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
} }
} }
func TestPrometheusExpireOldMetrics(t *testing.T) { func setupPrometheus() (*PrometheusClient, *prometheus_input.Prometheus, error) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
pClient, p, err := setupPrometheus()
pClient.ExpirationInterval = internal.Duration{Duration: time.Second * 10}
require.NoError(t, err)
defer pClient.Stop()
now := time.Now()
tags := make(map[string]string)
pt1, _ := metric.New(
"test_point_1",
tags,
map[string]interface{}{"value": 0.0},
now)
var metrics = []telegraf.Metric{pt1}
require.NoError(t, pClient.Write(metrics))
for _, m := range pClient.metrics {
m.Expiration = now.Add(time.Duration(-15) * time.Second)
}
pt2, _ := metric.New(
"test_point_2",
tags,
map[string]interface{}{"value": 1.0},
now)
var metrics2 = []telegraf.Metric{pt2}
require.NoError(t, pClient.Write(metrics2))
expected := []struct {
name string
value float64
tags map[string]string
}{
{"test_point_2", 1.0, tags},
}
var acc testutil.Accumulator
require.NoError(t, p.Gather(&acc))
for _, e := range expected {
acc.AssertContainsFields(t, e.name,
map[string]interface{}{"value": e.value})
}
acc.AssertDoesNotContainMeasurement(t, "test_point_1")
// Confirm that it's not in the PrometheusClient map anymore
assert.Equal(t, 1, len(pClient.metrics))
}
func setupPrometheus() (*PrometheusClient, *prometheus.Prometheus, error) {
if pTesting == nil { if pTesting == nil {
pTesting = &PrometheusClient{Listen: "localhost:9127"} pTesting = NewClient()
pTesting.Listen = "localhost:9127"
err := pTesting.Start() err := pTesting.Start()
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
} else { } else {
pTesting.metrics = make(map[string]*MetricWithExpiration) pTesting.fam = make(map[string]*MetricFamily)
} }
time.Sleep(time.Millisecond * 200) time.Sleep(time.Millisecond * 200)
p := &prometheus.Prometheus{ p := &prometheus_input.Prometheus{
Urls: []string{"http://localhost:9127/metrics"}, Urls: []string{"http://localhost:9127/metrics"},
} }

View File

@ -124,6 +124,16 @@ func (sw *SocketWriter) Write(metrics []telegraf.Metric) error {
return nil return nil
} }
// Close closes the connection. Noop if already closed.
func (sw *SocketWriter) Close() error {
if sw.Conn == nil {
return nil
}
err := sw.Conn.Close()
sw.Conn = nil
return err
}
func newSocketWriter() *SocketWriter { func newSocketWriter() *SocketWriter {
s, _ := serializers.NewInfluxSerializer() s, _ := serializers.NewInfluxSerializer()
return &SocketWriter{ return &SocketWriter{

View File

@ -143,7 +143,7 @@ func TestSocketWriter_Write_err(t *testing.T) {
// close the socket to generate an error // close the socket to generate an error
lconn.Close() lconn.Close()
sw.Close() sw.Conn.Close()
err = sw.Write(metrics) err = sw.Write(metrics)
require.Error(t, err) require.Error(t, err)
assert.Nil(t, sw.Conn) assert.Nil(t, sw.Conn)

View File

@ -499,13 +499,12 @@ def build(version=None,
logging.info("Time taken: {}s".format((end_time - start_time).total_seconds())) logging.info("Time taken: {}s".format((end_time - start_time).total_seconds()))
return True return True
def generate_md5_from_file(path): def generate_sha256_from_file(path):
"""Generate MD5 signature based on the contents of the file at path. """Generate SHA256 hash signature based on the contents of the file at path.
""" """
m = hashlib.md5() m = hashlib.sha256()
with open(path, 'rb') as f: with open(path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""): m.update(f.read())
m.update(chunk)
return m.hexdigest() return m.hexdigest()
def generate_sig_from_file(path): def generate_sig_from_file(path):
@ -636,6 +635,10 @@ def package(build_output, pkg_name, version, nightly=False, iteration=1, static=
elif package_type not in ['zip', 'tar'] and static or "static_" in arch: elif package_type not in ['zip', 'tar'] and static or "static_" in arch:
logging.info("Skipping package type '{}' for static builds.".format(package_type)) logging.info("Skipping package type '{}' for static builds.".format(package_type))
else: else:
if package_type == 'rpm' and release and '~' in package_version:
package_version, suffix = package_version.split('~', 1)
# The ~ indicatees that this is a prerelease so we give it a leading 0.
package_iteration = "0.%s" % suffix
fpm_command = "fpm {} --name {} -a {} -t {} --version {} --iteration {} -C {} -p {} ".format( fpm_command = "fpm {} --name {} -a {} -t {} --version {} --iteration {} -C {} -p {} ".format(
fpm_common_args, fpm_common_args,
name, name,
@ -664,9 +667,6 @@ def package(build_output, pkg_name, version, nightly=False, iteration=1, static=
if package_type == 'rpm': if package_type == 'rpm':
# rpm's convert any dashes to underscores # rpm's convert any dashes to underscores
package_version = package_version.replace("-", "_") package_version = package_version.replace("-", "_")
new_outfile = outfile.replace("{}-{}".format(package_version, package_iteration), package_version)
os.rename(outfile, new_outfile)
outfile = new_outfile
outfiles.append(os.path.join(os.getcwd(), outfile)) outfiles.append(os.path.join(os.getcwd(), outfile))
logging.debug("Produced package files: {}".format(outfiles)) logging.debug("Produced package files: {}".format(outfiles))
return outfiles return outfiles
@ -789,9 +789,10 @@ def main(args):
if not upload_packages(packages, bucket_name=args.bucket, overwrite=args.upload_overwrite): if not upload_packages(packages, bucket_name=args.bucket, overwrite=args.upload_overwrite):
return 1 return 1
logging.info("Packages created:") logging.info("Packages created:")
for p in packages: for filename in packages:
logging.info("{} (MD5={})".format(p.split('/')[-1:][0], logging.info("%s (SHA256=%s)",
generate_md5_from_file(p))) os.path.basename(filename),
generate_sha256_from_file(filename))
if orig_branch != get_current_branch(): if orig_branch != get_current_branch():
logging.info("Moving back to original git branch: {}".format(args.branch)) logging.info("Moving back to original git branch: {}".format(args.branch))
run("git checkout {}".format(orig_branch)) run("git checkout {}".format(orig_branch))

View File

@ -135,7 +135,9 @@ case $1 in
fi fi
log_success_msg "Starting the process" "$name" log_success_msg "Starting the process" "$name"
if which start-stop-daemon > /dev/null 2>&1; then if command -v startproc >/dev/null; then
startproc -u "$USER" -g "$GROUP" -p "$pidfile" -q -- "$daemon" -pidfile "$pidfile" -config "$config" -config-directory "$confdir" $TELEGRAF_OPTS
elif which start-stop-daemon > /dev/null 2>&1; then
start-stop-daemon --chuid $USER:$GROUP --start --quiet --pidfile $pidfile --exec $daemon -- -pidfile $pidfile -config $config -config-directory $confdir $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR & start-stop-daemon --chuid $USER:$GROUP --start --quiet --pidfile $pidfile --exec $daemon -- -pidfile $pidfile -config $config -config-directory $confdir $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR &
else else
su -s /bin/sh -c "nohup $daemon -pidfile $pidfile -config $config -config-directory $confdir $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR &" $USER su -s /bin/sh -c "nohup $daemon -pidfile $pidfile -config $config -config-directory $confdir $TELEGRAF_OPTS >>$STDOUT 2>>$STDERR &" $USER

View File

@ -11,7 +11,7 @@ function install_init {
} }
function install_systemd { function install_systemd {
cp -f $SCRIPT_DIR/telegraf.service /lib/systemd/system/telegraf.service cp -f $SCRIPT_DIR/telegraf.service $1
systemctl enable telegraf || true systemctl enable telegraf || true
systemctl daemon-reload || true systemctl daemon-reload || true
} }
@ -24,12 +24,12 @@ function install_chkconfig {
chkconfig --add telegraf chkconfig --add telegraf
} }
if ! grep "^telegraf:" /etc/group &>/dev/null; then
groupadd -r telegraf
fi
if ! id telegraf &>/dev/null; then if ! id telegraf &>/dev/null; then
if ! grep "^telegraf:" /etc/group &>/dev/null; then useradd -r -M telegraf -s /bin/false -d /etc/telegraf -g telegraf
useradd -r -K USERGROUPS_ENAB=yes -M telegraf -s /bin/false -d /etc/telegraf
else
useradd -r -K USERGROUPS_ENAB=yes -M telegraf -s /bin/false -d /etc/telegraf -g telegraf
fi
fi fi
test -d $LOG_DIR || mkdir -p $LOG_DIR test -d $LOG_DIR || mkdir -p $LOG_DIR
@ -56,10 +56,10 @@ if [[ ! -d /etc/telegraf/telegraf.d ]]; then
fi fi
# Distribution-specific logic # Distribution-specific logic
if [[ -f /etc/redhat-release ]]; then if [[ -f /etc/redhat-release ]] || [[ -f /etc/SuSE-release ]]; then
# RHEL-variant logic # RHEL-variant logic
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
install_systemd install_systemd /usr/lib/systemd/system/telegraf.service
else else
# Assuming SysVinit # Assuming SysVinit
install_init install_init
@ -73,10 +73,10 @@ if [[ -f /etc/redhat-release ]]; then
elif [[ -f /etc/debian_version ]]; then elif [[ -f /etc/debian_version ]]; then
# Debian/Ubuntu logic # Debian/Ubuntu logic
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
install_systemd install_systemd /lib/systemd/system/telegraf.service
systemctl restart telegraf || echo "WARNING: systemd not running." systemctl restart telegraf || echo "WARNING: systemd not running."
else else
# Assuming SysVinit # Assuming SysVinit
install_init install_init
# Run update-rc.d or fallback to chkconfig if not available # Run update-rc.d or fallback to chkconfig if not available
if which update-rc.d &>/dev/null; then if which update-rc.d &>/dev/null; then
@ -89,7 +89,7 @@ elif [[ -f /etc/debian_version ]]; then
elif [[ -f /etc/os-release ]]; then elif [[ -f /etc/os-release ]]; then
source /etc/os-release source /etc/os-release
if [[ $ID = "amzn" ]]; then if [[ $ID = "amzn" ]]; then
# Amazon Linux logic # Amazon Linux logic
install_init install_init
# Run update-rc.d or fallback to chkconfig if not available # Run update-rc.d or fallback to chkconfig if not available
if which update-rc.d &>/dev/null; then if which update-rc.d &>/dev/null; then
@ -97,5 +97,6 @@ elif [[ -f /etc/os-release ]]; then
else else
install_chkconfig install_chkconfig
fi fi
/etc/init.d/telegraf restart
fi fi
fi fi

View File

@ -2,7 +2,7 @@
function disable_systemd { function disable_systemd {
systemctl disable telegraf systemctl disable telegraf
rm -f /lib/systemd/system/telegraf.service rm -f $1
} }
function disable_update_rcd { function disable_update_rcd {
@ -15,14 +15,14 @@ function disable_chkconfig {
rm -f /etc/init.d/telegraf rm -f /etc/init.d/telegraf
} }
if [[ -f /etc/redhat-release ]]; then if [[ -f /etc/redhat-release ]] || [[ -f /etc/SuSE-release ]]; then
# RHEL-variant logic # RHEL-variant logic
if [[ "$1" = "0" ]]; then if [[ "$1" = "0" ]]; then
# InfluxDB is no longer installed, remove from init system # InfluxDB is no longer installed, remove from init system
rm -f /etc/default/telegraf rm -f /etc/default/telegraf
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
disable_systemd disable_systemd /usr/lib/systemd/system/telegraf.service
else else
# Assuming sysv # Assuming sysv
disable_chkconfig disable_chkconfig
@ -35,7 +35,7 @@ elif [[ -f /etc/debian_version ]]; then
rm -f /etc/default/telegraf rm -f /etc/default/telegraf
if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then if [[ "$(readlink /proc/1/exe)" == */systemd ]]; then
disable_systemd disable_systemd /lib/systemd/system/telegraf.service
else else
# Assuming sysv # Assuming sysv
# Run update-rc.d or fallback to chkconfig if not available # Run update-rc.d or fallback to chkconfig if not available

View File

@ -417,3 +417,53 @@ func (a *Accumulator) HasMeasurement(measurement string) bool {
} }
return false return false
} }
func (a *Accumulator) IntField(measurement string, field string) (int, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(int)
return v, ok
}
}
}
}
return 0, false
}
func (a *Accumulator) FloatField(measurement string, field string) (float64, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(float64)
return v, ok
}
}
}
}
return 0.0, false
}
func (a *Accumulator) StringField(measurement string, field string) (string, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(string)
return v, ok
}
}
}
}
return "", false
}