Updated with new code from master

This commit is contained in:
Dennis Bellinger 2016-08-08 10:45:05 -04:00
commit a22b24af06
31 changed files with 459 additions and 317 deletions

View File

@ -49,8 +49,15 @@ should now look like:
path = "/" path = "/"
``` ```
- `flush_jitter` behavior has been changed. The random jitter will now be
evaluated at every flush interval, rather than once at startup. This makes it
consistent with the behavior of `collection_jitter`.
### Features ### Features
- [#1413](https://github.com/influxdata/telegraf/issues/1413): Separate container_version from container_image tag.
- [#1525](https://github.com/influxdata/telegraf/pull/1525): Support setting per-device and total metrics for Docker network and blockio.
- [#1466](https://github.com/influxdata/telegraf/pull/1466): MongoDB input plugin: adding per DB stats from db.stats()
- [#1503](https://github.com/influxdata/telegraf/pull/1503): Add tls support for certs to RabbitMQ input plugin - [#1503](https://github.com/influxdata/telegraf/pull/1503): Add tls support for certs to RabbitMQ input plugin
- [#1289](https://github.com/influxdata/telegraf/pull/1289): webhooks input plugin. Thanks @francois2metz and @cduez! - [#1289](https://github.com/influxdata/telegraf/pull/1289): webhooks input plugin. Thanks @francois2metz and @cduez!
- [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar webhook plugin. - [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar webhook plugin.
@ -65,73 +72,11 @@ should now look like:
- [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin. - [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin.
- [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag. - [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag.
- [#1411](https://github.com/influxdata/telegraf/pull/1411): Implement support for fetching hddtemp data - [#1411](https://github.com/influxdata/telegraf/pull/1411): Implement support for fetching hddtemp data
### Bugfixes
- [#1472](https://github.com/influxdata/telegraf/pull/1472): diskio input plugin: set 'skip_serial_number = true' by default to avoid high cardinality.
- [#1426](https://github.com/influxdata/telegraf/pull/1426): nil metrics panic fix.
- [#1384](https://github.com/influxdata/telegraf/pull/1384): Fix datarace in apache input plugin.
- [#1399](https://github.com/influxdata/telegraf/issues/1399): Add `read_repairs` statistics to riak plugin.
- [#1405](https://github.com/influxdata/telegraf/issues/1405): Fix memory/connection leak in prometheus input plugin.
- [#1378](https://github.com/influxdata/telegraf/issues/1378): Trim BOM from config file for Windows support.
- [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload.
- [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix.
- [#1334](https://github.com/influxdata/telegraf/issues/1334): Prometheus output, metric refresh and caching fixes.
- [#1432](https://github.com/influxdata/telegraf/issues/1432): Panic fix for multiple graphite outputs under very high load.
- [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior
- [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues.
- [#1427](https://github.com/influxdata/telegraf/issues/1427): Cassandra input: version 2.x "column family" fix.
- [#1463](https://github.com/influxdata/telegraf/issues/1463): Shared WaitGroup in Exec plugin
- [#1436](https://github.com/influxdata/telegraf/issues/1436): logparser: honor modifiers in "pattern" config.
- [#1418](https://github.com/influxdata/telegraf/issues/1418): logparser: error and exit on file permissions/missing errors.
- [#1499](https://github.com/influxdata/telegraf/pull/1499): Make the user able to specify full path for HAproxy stats
- [#1521](https://github.com/influxdata/telegraf/pull/1521): Fix Redis url, an extra "tcp://" was added.
## v1.0 beta 2 [2016-06-21]
### Features
- [#1340](https://github.com/influxdata/telegraf/issues/1340): statsd: do not log every dropped metric. - [#1340](https://github.com/influxdata/telegraf/issues/1340): statsd: do not log every dropped metric.
- [#1368](https://github.com/influxdata/telegraf/pull/1368): Add precision rounding to all metrics on collection. - [#1368](https://github.com/influxdata/telegraf/pull/1368): Add precision rounding to all metrics on collection.
- [#1390](https://github.com/influxdata/telegraf/pull/1390): Add support for Tengine - [#1390](https://github.com/influxdata/telegraf/pull/1390): Add support for Tengine
- [#1320](https://github.com/influxdata/telegraf/pull/1320): Logparser input plugin for parsing grok-style log patterns. - [#1320](https://github.com/influxdata/telegraf/pull/1320): Logparser input plugin for parsing grok-style log patterns.
- [#1397](https://github.com/influxdata/telegraf/issues/1397): ElasticSearch: now supports connecting to ElasticSearch via SSL - [#1397](https://github.com/influxdata/telegraf/issues/1397): ElasticSearch: now supports connecting to ElasticSearch via SSL
### Bugfixes
- [#1330](https://github.com/influxdata/telegraf/issues/1330): Fix exec plugin panic when using single binary.
- [#1336](https://github.com/influxdata/telegraf/issues/1336): Fixed incorrect prometheus metrics source selection.
- [#1112](https://github.com/influxdata/telegraf/issues/1112): Set default Zookeeper chroot to empty string.
- [#1335](https://github.com/influxdata/telegraf/issues/1335): Fix overall ping timeout to be calculated based on per-ping timeout.
- [#1374](https://github.com/influxdata/telegraf/pull/1374): Change "default" retention policy to "".
- [#1377](https://github.com/influxdata/telegraf/issues/1377): Graphite output mangling '%' character.
- [#1396](https://github.com/influxdata/telegraf/pull/1396): Prometheus input plugin now supports x509 certs authentication
## v1.0 beta 1 [2016-06-07]
### Release Notes
- `flush_jitter` behavior has been changed. The random jitter will now be
evaluated at every flush interval, rather than once at startup. This makes it
consistent with the behavior of `collection_jitter`.
- All AWS plugins now utilize a standard mechanism for evaluating credentials.
This allows all AWS plugins to support environment variables, shared credential
files & profiles, and role assumptions. See the specific plugin README for
details.
- The AWS CloudWatch input plugin can now declare a wildcard value for a metric
dimension. This causes the plugin to read all metrics that contain the specified
dimension key regardless of value. This is used to export collections of metrics
without having to know the dimension values ahead of time.
- The AWS CloudWatch input plugin can now be configured with the `cache_ttl`
attribute. This configures the TTL of the internal metric cache. This is useful
in conjunction with wildcard dimension values as it will control the amount of
time before a new metric is included by the plugin.
### Features
- [#1262](https://github.com/influxdata/telegraf/pull/1261): Add graylog input pluging. - [#1262](https://github.com/influxdata/telegraf/pull/1261): Add graylog input pluging.
- [#1294](https://github.com/influxdata/telegraf/pull/1294): consul input plugin. Thanks @harnash - [#1294](https://github.com/influxdata/telegraf/pull/1294): consul input plugin. Thanks @harnash
- [#1164](https://github.com/influxdata/telegraf/pull/1164): conntrack input plugin. Thanks @robinpercy! - [#1164](https://github.com/influxdata/telegraf/pull/1164): conntrack input plugin. Thanks @robinpercy!
@ -150,6 +95,38 @@ time before a new metric is included by the plugin.
### Bugfixes ### Bugfixes
- [#1519](https://github.com/influxdata/telegraf/pull/1519): Fix error race conditions and partial failures.
- [#1477](https://github.com/influxdata/telegraf/issues/1477): nstat: fix inaccurate config panic.
- [#1481](https://github.com/influxdata/telegraf/issues/1481): jolokia: fix handling multiple multi-dimensional attributes.
- [#1430](https://github.com/influxdata/telegraf/issues/1430): Fix prometheus character sanitizing. Sanitize more win_perf_counters characters.
- [#1534](https://github.com/influxdata/telegraf/pull/1534): Add diskio io_time to FreeBSD & report timing metrics as ms (as linux does).
- [#1379](https://github.com/influxdata/telegraf/issues/1379): Fix covering Amazon Linux for post remove flow.
- [#1584](https://github.com/influxdata/telegraf/issues/1584): procstat missing fields: read/write bytes & count
- [#1472](https://github.com/influxdata/telegraf/pull/1472): diskio input plugin: set 'skip_serial_number = true' by default to avoid high cardinality.
- [#1426](https://github.com/influxdata/telegraf/pull/1426): nil metrics panic fix.
- [#1384](https://github.com/influxdata/telegraf/pull/1384): Fix datarace in apache input plugin.
- [#1399](https://github.com/influxdata/telegraf/issues/1399): Add `read_repairs` statistics to riak plugin.
- [#1405](https://github.com/influxdata/telegraf/issues/1405): Fix memory/connection leak in prometheus input plugin.
- [#1378](https://github.com/influxdata/telegraf/issues/1378): Trim BOM from config file for Windows support.
- [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload.
- [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix.
- [#1334](https://github.com/influxdata/telegraf/issues/1334): Prometheus output, metric refresh and caching fixes.
- [#1432](https://github.com/influxdata/telegraf/issues/1432): Panic fix for multiple graphite outputs under very high load.
- [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior
- [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues.
- [#1427](https://github.com/influxdata/telegraf/issues/1427): Cassandra input: version 2.x "column family" fix.
- [#1463](https://github.com/influxdata/telegraf/issues/1463): Shared WaitGroup in Exec plugin
- [#1436](https://github.com/influxdata/telegraf/issues/1436): logparser: honor modifiers in "pattern" config.
- [#1418](https://github.com/influxdata/telegraf/issues/1418): logparser: error and exit on file permissions/missing errors.
- [#1499](https://github.com/influxdata/telegraf/pull/1499): Make the user able to specify full path for HAproxy stats
- [#1521](https://github.com/influxdata/telegraf/pull/1521): Fix Redis url, an extra "tcp://" was added.
- [#1330](https://github.com/influxdata/telegraf/issues/1330): Fix exec plugin panic when using single binary.
- [#1336](https://github.com/influxdata/telegraf/issues/1336): Fixed incorrect prometheus metrics source selection.
- [#1112](https://github.com/influxdata/telegraf/issues/1112): Set default Zookeeper chroot to empty string.
- [#1335](https://github.com/influxdata/telegraf/issues/1335): Fix overall ping timeout to be calculated based on per-ping timeout.
- [#1374](https://github.com/influxdata/telegraf/pull/1374): Change "default" retention policy to "".
- [#1377](https://github.com/influxdata/telegraf/issues/1377): Graphite output mangling '%' character.
- [#1396](https://github.com/influxdata/telegraf/pull/1396): Prometheus input plugin now supports x509 certs authentication
- [#1252](https://github.com/influxdata/telegraf/pull/1252) & [#1279](https://github.com/influxdata/telegraf/pull/1279): Fix systemd service. Thanks @zbindenren & @PierreF! - [#1252](https://github.com/influxdata/telegraf/pull/1252) & [#1279](https://github.com/influxdata/telegraf/pull/1279): Fix systemd service. Thanks @zbindenren & @PierreF!
- [#1221](https://github.com/influxdata/telegraf/pull/1221): Fix influxdb n_shards counter. - [#1221](https://github.com/influxdata/telegraf/pull/1221): Fix influxdb n_shards counter.
- [#1258](https://github.com/influxdata/telegraf/pull/1258): Fix potential kernel plugin integer parse error. - [#1258](https://github.com/influxdata/telegraf/pull/1258): Fix potential kernel plugin integer parse error.
@ -159,6 +136,7 @@ time before a new metric is included by the plugin.
- [#1316](https://github.com/influxdata/telegraf/pull/1316): Removed leaked "database" tag on redis metrics. Thanks @PierreF! - [#1316](https://github.com/influxdata/telegraf/pull/1316): Removed leaked "database" tag on redis metrics. Thanks @PierreF!
- [#1323](https://github.com/influxdata/telegraf/issues/1323): Processes plugin: fix potential error with /proc/net/stat directory. - [#1323](https://github.com/influxdata/telegraf/issues/1323): Processes plugin: fix potential error with /proc/net/stat directory.
- [#1322](https://github.com/influxdata/telegraf/issues/1322): Fix rare RHEL 5.2 panic in gopsutil diskio gathering function. - [#1322](https://github.com/influxdata/telegraf/issues/1322): Fix rare RHEL 5.2 panic in gopsutil diskio gathering function.
- [#1586](https://github.com/influxdata/telegraf/pull/1586): Remove IF NOT EXISTS from influxdb output database creation.
## v0.13.1 [2016-05-24] ## v0.13.1 [2016-05-24]

View File

@ -11,6 +11,7 @@ Output plugins READMEs are less structured,
but any information you can provide on how the data will look is appreciated. but any information you can provide on how the data will look is appreciated.
See the [OpenTSDB output](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/opentsdb) See the [OpenTSDB output](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/opentsdb)
for a good example. for a good example.
1. **Optional:** Write a [tickscript](https://docs.influxdata.com/kapacitor/v1.0/tick/syntax/) for your plugin and add it to [Kapacitor](https://github.com/influxdata/kapacitor/tree/master/examples/telegraf). Or mention @jackzampolin in a PR comment with some common queries that you would want to alert on and he will write one for you.
## GoDoc ## GoDoc

2
Godeps
View File

@ -46,7 +46,7 @@ github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6
github.com/prometheus/common e8eabff8812b05acf522b45fdcd725a785188e37 github.com/prometheus/common e8eabff8812b05acf522b45fdcd725a785188e37
github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8 github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8
github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f
github.com/shirou/gopsutil ee66bc560c366dd33b9a4046ba0b644caba46bed github.com/shirou/gopsutil 4d0c402af66c78735c5ccf820dc2ca7de5e4ff08
github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d
github.com/sparrc/aerospike-client-go d4bb42d2c2d39dae68e054116f4538af189e05d5 github.com/sparrc/aerospike-client-go d4bb42d2c2d39dae68e054116f4538af189e05d5
github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744 github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744

View File

@ -12,7 +12,7 @@ import (
) )
func NewAccumulator( func NewAccumulator(
inputConfig *internal_models.InputConfig, inputConfig *models.InputConfig,
metrics chan telegraf.Metric, metrics chan telegraf.Metric,
) *accumulator { ) *accumulator {
acc := accumulator{} acc := accumulator{}
@ -31,7 +31,7 @@ type accumulator struct {
// print every point added to the accumulator // print every point added to the accumulator
trace bool trace bool
inputConfig *internal_models.InputConfig inputConfig *models.InputConfig
precision time.Duration precision time.Duration

View File

@ -21,7 +21,7 @@ func TestAdd(t *testing.T) {
now := time.Now() now := time.Now()
a.metrics = make(chan telegraf.Metric, 10) a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics) defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{} a.inputConfig = &models.InputConfig{}
a.Add("acctest", float64(101), map[string]string{}) a.Add("acctest", float64(101), map[string]string{})
a.Add("acctest", float64(101), map[string]string{"acc": "test"}) a.Add("acctest", float64(101), map[string]string{"acc": "test"})
@ -47,7 +47,7 @@ func TestAddNoPrecisionWithInterval(t *testing.T) {
now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
a.metrics = make(chan telegraf.Metric, 10) a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics) defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{} a.inputConfig = &models.InputConfig{}
a.SetPrecision(0, time.Second) a.SetPrecision(0, time.Second)
a.Add("acctest", float64(101), map[string]string{}) a.Add("acctest", float64(101), map[string]string{})
@ -74,7 +74,7 @@ func TestAddNoIntervalWithPrecision(t *testing.T) {
now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
a.metrics = make(chan telegraf.Metric, 10) a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics) defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{} a.inputConfig = &models.InputConfig{}
a.SetPrecision(time.Second, time.Millisecond) a.SetPrecision(time.Second, time.Millisecond)
a.Add("acctest", float64(101), map[string]string{}) a.Add("acctest", float64(101), map[string]string{})
@ -101,7 +101,7 @@ func TestAddDisablePrecision(t *testing.T) {
now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
a.metrics = make(chan telegraf.Metric, 10) a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics) defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{} a.inputConfig = &models.InputConfig{}
a.SetPrecision(time.Second, time.Millisecond) a.SetPrecision(time.Second, time.Millisecond)
a.DisablePrecision() a.DisablePrecision()
@ -129,7 +129,7 @@ func TestDifferentPrecisions(t *testing.T) {
now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC) now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
a.metrics = make(chan telegraf.Metric, 10) a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics) defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{} a.inputConfig = &models.InputConfig{}
a.SetPrecision(0, time.Second) a.SetPrecision(0, time.Second)
a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now)
@ -170,7 +170,7 @@ func TestAddDefaultTags(t *testing.T) {
now := time.Now() now := time.Now()
a.metrics = make(chan telegraf.Metric, 10) a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics) defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{} a.inputConfig = &models.InputConfig{}
a.Add("acctest", float64(101), map[string]string{}) a.Add("acctest", float64(101), map[string]string{})
a.Add("acctest", float64(101), map[string]string{"acc": "test"}) a.Add("acctest", float64(101), map[string]string{"acc": "test"})
@ -196,7 +196,7 @@ func TestAddFields(t *testing.T) {
now := time.Now() now := time.Now()
a.metrics = make(chan telegraf.Metric, 10) a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics) defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{} a.inputConfig = &models.InputConfig{}
fields := map[string]interface{}{ fields := map[string]interface{}{
"usage": float64(99), "usage": float64(99),
@ -229,7 +229,7 @@ func TestAddInfFields(t *testing.T) {
now := time.Now() now := time.Now()
a.metrics = make(chan telegraf.Metric, 10) a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics) defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{} a.inputConfig = &models.InputConfig{}
fields := map[string]interface{}{ fields := map[string]interface{}{
"usage": inf, "usage": inf,
@ -257,7 +257,7 @@ func TestAddNaNFields(t *testing.T) {
now := time.Now() now := time.Now()
a.metrics = make(chan telegraf.Metric, 10) a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics) defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{} a.inputConfig = &models.InputConfig{}
fields := map[string]interface{}{ fields := map[string]interface{}{
"usage": nan, "usage": nan,
@ -281,7 +281,7 @@ func TestAddUint64Fields(t *testing.T) {
now := time.Now() now := time.Now()
a.metrics = make(chan telegraf.Metric, 10) a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics) defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{} a.inputConfig = &models.InputConfig{}
fields := map[string]interface{}{ fields := map[string]interface{}{
"usage": uint64(99), "usage": uint64(99),
@ -310,7 +310,7 @@ func TestAddUint64Overflow(t *testing.T) {
now := time.Now() now := time.Now()
a.metrics = make(chan telegraf.Metric, 10) a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics) defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{} a.inputConfig = &models.InputConfig{}
fields := map[string]interface{}{ fields := map[string]interface{}{
"usage": uint64(9223372036854775808), "usage": uint64(9223372036854775808),
@ -340,7 +340,7 @@ func TestAddInts(t *testing.T) {
now := time.Now() now := time.Now()
a.metrics = make(chan telegraf.Metric, 10) a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics) defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{} a.inputConfig = &models.InputConfig{}
a.Add("acctest", int(101), map[string]string{}) a.Add("acctest", int(101), map[string]string{})
a.Add("acctest", int32(101), map[string]string{"acc": "test"}) a.Add("acctest", int32(101), map[string]string{"acc": "test"})
@ -367,7 +367,7 @@ func TestAddFloats(t *testing.T) {
now := time.Now() now := time.Now()
a.metrics = make(chan telegraf.Metric, 10) a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics) defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{} a.inputConfig = &models.InputConfig{}
a.Add("acctest", float32(101), map[string]string{"acc": "test"}) a.Add("acctest", float32(101), map[string]string{"acc": "test"})
a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now) a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now)
@ -389,7 +389,7 @@ func TestAddStrings(t *testing.T) {
now := time.Now() now := time.Now()
a.metrics = make(chan telegraf.Metric, 10) a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics) defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{} a.inputConfig = &models.InputConfig{}
a.Add("acctest", "test", map[string]string{"acc": "test"}) a.Add("acctest", "test", map[string]string{"acc": "test"})
a.Add("acctest", "foo", map[string]string{"acc": "test"}, now) a.Add("acctest", "foo", map[string]string{"acc": "test"}, now)
@ -411,7 +411,7 @@ func TestAddBools(t *testing.T) {
now := time.Now() now := time.Now()
a.metrics = make(chan telegraf.Metric, 10) a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics) defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{} a.inputConfig = &models.InputConfig{}
a.Add("acctest", true, map[string]string{"acc": "test"}) a.Add("acctest", true, map[string]string{"acc": "test"})
a.Add("acctest", false, map[string]string{"acc": "test"}, now) a.Add("acctest", false, map[string]string{"acc": "test"}, now)
@ -433,11 +433,11 @@ func TestAccFilterTags(t *testing.T) {
now := time.Now() now := time.Now()
a.metrics = make(chan telegraf.Metric, 10) a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics) defer close(a.metrics)
filter := internal_models.Filter{ filter := models.Filter{
TagExclude: []string{"acc"}, TagExclude: []string{"acc"},
} }
assert.NoError(t, filter.CompileFilter()) assert.NoError(t, filter.CompileFilter())
a.inputConfig = &internal_models.InputConfig{} a.inputConfig = &models.InputConfig{}
a.inputConfig.Filter = filter a.inputConfig.Filter = filter
a.Add("acctest", float64(101), map[string]string{}) a.Add("acctest", float64(101), map[string]string{})
@ -465,7 +465,7 @@ func TestAccAddError(t *testing.T) {
defer log.SetOutput(os.Stderr) defer log.SetOutput(os.Stderr)
a := accumulator{} a := accumulator{}
a.inputConfig = &internal_models.InputConfig{} a.inputConfig = &models.InputConfig{}
a.inputConfig.Name = "mock_plugin" a.inputConfig.Name = "mock_plugin"
a.AddError(fmt.Errorf("foo")) a.AddError(fmt.Errorf("foo"))

View File

@ -88,7 +88,7 @@ func (a *Agent) Close() error {
return err return err
} }
func panicRecover(input *internal_models.RunningInput) { func panicRecover(input *models.RunningInput) {
if err := recover(); err != nil { if err := recover(); err != nil {
trace := make([]byte, 2048) trace := make([]byte, 2048)
runtime.Stack(trace, true) runtime.Stack(trace, true)
@ -104,7 +104,7 @@ func panicRecover(input *internal_models.RunningInput) {
// reporting interval. // reporting interval.
func (a *Agent) gatherer( func (a *Agent) gatherer(
shutdown chan struct{}, shutdown chan struct{},
input *internal_models.RunningInput, input *models.RunningInput,
interval time.Duration, interval time.Duration,
metricC chan telegraf.Metric, metricC chan telegraf.Metric,
) error { ) error {
@ -152,7 +152,7 @@ func (a *Agent) gatherer(
// over. // over.
func gatherWithTimeout( func gatherWithTimeout(
shutdown chan struct{}, shutdown chan struct{},
input *internal_models.RunningInput, input *models.RunningInput,
acc *accumulator, acc *accumulator,
timeout time.Duration, timeout time.Duration,
) { ) {
@ -240,7 +240,7 @@ func (a *Agent) flush() {
wg.Add(len(a.Config.Outputs)) wg.Add(len(a.Config.Outputs))
for _, o := range a.Config.Outputs { for _, o := range a.Config.Outputs {
go func(output *internal_models.RunningOutput) { go func(output *models.RunningOutput) {
defer wg.Done() defer wg.Done()
err := output.Write() err := output.Write()
if err != nil { if err != nil {
@ -351,7 +351,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
if input.Config.Interval != 0 { if input.Config.Interval != 0 {
interval = input.Config.Interval interval = input.Config.Interval
} }
go func(in *internal_models.RunningInput, interv time.Duration) { go func(in *models.RunningInput, interv time.Duration) {
defer wg.Done() defer wg.Done()
if err := a.gatherer(shutdown, in, interv, metricC); err != nil { if err := a.gatherer(shutdown, in, interv, metricC); err != nil {
log.Printf(err.Error()) log.Printf(err.Error())

View File

@ -41,12 +41,6 @@ var fOutputList = flag.Bool("output-list", false,
"print available output plugins.") "print available output plugins.")
var fUsage = flag.String("usage", "", var fUsage = flag.String("usage", "",
"print usage for a plugin, ie, 'telegraf -usage mysql'") "print usage for a plugin, ie, 'telegraf -usage mysql'")
var fInputFiltersLegacy = flag.String("filter", "",
"filter the inputs to enable, separator is :")
var fOutputFiltersLegacy = flag.String("outputfilter", "",
"filter the outputs to enable, separator is :")
var fConfigDirectoryLegacy = flag.String("configdirectory", "",
"directory containing additional *.conf files")
var fService = flag.String("service", "", var fService = flag.String("service", "",
"operate on the service") "operate on the service")
@ -130,24 +124,11 @@ func reloadLoop(stop chan struct{}, s service.Service) {
args := flag.Args() args := flag.Args()
var inputFilters []string var inputFilters []string
if *fInputFiltersLegacy != "" {
fmt.Printf("WARNING '--filter' flag is deprecated, please use" +
" '--input-filter'")
inputFilter := strings.TrimSpace(*fInputFiltersLegacy)
inputFilters = strings.Split(":"+inputFilter+":", ":")
}
if *fInputFilters != "" { if *fInputFilters != "" {
inputFilter := strings.TrimSpace(*fInputFilters) inputFilter := strings.TrimSpace(*fInputFilters)
inputFilters = strings.Split(":"+inputFilter+":", ":") inputFilters = strings.Split(":"+inputFilter+":", ":")
} }
var outputFilters []string var outputFilters []string
if *fOutputFiltersLegacy != "" {
fmt.Printf("WARNING '--outputfilter' flag is deprecated, please use" +
" '--output-filter'")
outputFilter := strings.TrimSpace(*fOutputFiltersLegacy)
outputFilters = strings.Split(":"+outputFilter+":", ":")
}
if *fOutputFilters != "" { if *fOutputFilters != "" {
outputFilter := strings.TrimSpace(*fOutputFilters) outputFilter := strings.TrimSpace(*fOutputFilters)
outputFilters = strings.Split(":"+outputFilter+":", ":") outputFilters = strings.Split(":"+outputFilter+":", ":")
@ -165,34 +146,28 @@ func reloadLoop(stop chan struct{}, s service.Service) {
} }
} }
if *fOutputList { // switch for flags which just do something and exit immediately
switch {
case *fOutputList:
fmt.Println("Available Output Plugins:") fmt.Println("Available Output Plugins:")
for k, _ := range outputs.Outputs { for k, _ := range outputs.Outputs {
fmt.Printf(" %s\n", k) fmt.Printf(" %s\n", k)
} }
return return
} case *fInputList:
if *fInputList {
fmt.Println("Available Input Plugins:") fmt.Println("Available Input Plugins:")
for k, _ := range inputs.Inputs { for k, _ := range inputs.Inputs {
fmt.Printf(" %s\n", k) fmt.Printf(" %s\n", k)
} }
return return
} case *fVersion:
if *fVersion {
v := fmt.Sprintf("Telegraf - version %s", version) v := fmt.Sprintf("Telegraf - version %s", version)
fmt.Println(v) fmt.Println(v)
return return
} case *fSampleConfig:
if *fSampleConfig {
config.PrintSampleConfig(inputFilters, outputFilters) config.PrintSampleConfig(inputFilters, outputFilters)
return return
} case *fUsage != "":
if *fUsage != "" {
if err := config.PrintInputConfig(*fUsage); err != nil { if err := config.PrintInputConfig(*fUsage); err != nil {
if err2 := config.PrintOutputConfig(*fUsage); err2 != nil { if err2 := config.PrintOutputConfig(*fUsage); err2 != nil {
log.Fatalf("%s and %s", err, err2) log.Fatalf("%s and %s", err, err2)
@ -222,15 +197,6 @@ func reloadLoop(stop chan struct{}, s service.Service) {
os.Exit(1) os.Exit(1)
} }
if *fConfigDirectoryLegacy != "" {
fmt.Printf("WARNING '--configdirectory' flag is deprecated, please use" +
" '--config-directory'")
err = c.LoadDirectory(*fConfigDirectoryLegacy)
if err != nil {
log.Fatal(err)
}
}
if *fConfigDirectory != "" { if *fConfigDirectory != "" {
err = c.LoadDirectory(*fConfigDirectory) err = c.LoadDirectory(*fConfigDirectory)
if err != nil { if err != nil {

View File

@ -55,7 +55,7 @@
## By default, precision will be set to the same timestamp order as the ## By default, precision will be set to the same timestamp order as the
## collection interval, with the maximum being 1s. ## collection interval, with the maximum being 1s.
## Precision will NOT be used for service inputs, such as logparser and statsd. ## Precision will NOT be used for service inputs, such as logparser and statsd.
## Valid values are "Nns", "Nus" (or "Nµs"), "Nms", "Ns". ## Valid values are "ns", "us" (or "µs"), "ms", "s".
precision = "" precision = ""
## Run telegraf in debug mode ## Run telegraf in debug mode
debug = false debug = false
@ -1577,7 +1577,7 @@
# ## /var/log/**.log -> recursively find all .log files in /var/log # ## /var/log/**.log -> recursively find all .log files in /var/log
# ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log # ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
# ## /var/log/apache.log -> only tail the apache log file # ## /var/log/apache.log -> only tail the apache log file
# files = ["/var/log/influxdb/influxdb.log"] # files = ["/var/log/apache/access.log"]
# ## Read file from beginning. # ## Read file from beginning.
# from_beginning = false # from_beginning = false
# #
@ -1590,9 +1590,9 @@
# ## Other common built-in patterns are: # ## Other common built-in patterns are:
# ## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs) # ## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs)
# ## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent) # ## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
# patterns = ["%{INFLUXDB_HTTPD_LOG}"] # patterns = ["%{COMBINED_LOG_FORMAT}"]
# ## Name of the outputted measurement name. # ## Name of the outputted measurement name.
# measurement = "influxdb_log" # measurement = "apache_access_log"
# ## Full path(s) to custom pattern files. # ## Full path(s) to custom pattern files.
# custom_pattern_files = [] # custom_pattern_files = []
# ## Custom patterns can also be defined here. Put one pattern per line. # ## Custom patterns can also be defined here. Put one pattern per line.

View File

@ -47,8 +47,8 @@ type Config struct {
OutputFilters []string OutputFilters []string
Agent *AgentConfig Agent *AgentConfig
Inputs []*internal_models.RunningInput Inputs []*models.RunningInput
Outputs []*internal_models.RunningOutput Outputs []*models.RunningOutput
} }
func NewConfig() *Config { func NewConfig() *Config {
@ -61,8 +61,8 @@ func NewConfig() *Config {
}, },
Tags: make(map[string]string), Tags: make(map[string]string),
Inputs: make([]*internal_models.RunningInput, 0), Inputs: make([]*models.RunningInput, 0),
Outputs: make([]*internal_models.RunningOutput, 0), Outputs: make([]*models.RunningOutput, 0),
InputFilters: make([]string, 0), InputFilters: make([]string, 0),
OutputFilters: make([]string, 0), OutputFilters: make([]string, 0),
} }
@ -219,7 +219,7 @@ var header = `# Telegraf Configuration
## By default, precision will be set to the same timestamp order as the ## By default, precision will be set to the same timestamp order as the
## collection interval, with the maximum being 1s. ## collection interval, with the maximum being 1s.
## Precision will NOT be used for service inputs, such as logparser and statsd. ## Precision will NOT be used for service inputs, such as logparser and statsd.
## Valid values are "Nns", "Nus" (or "Nµs"), "Nms", "Ns". ## Valid values are "ns", "us" (or "µs"), "ms", "s".
precision = "" precision = ""
## Run telegraf in debug mode ## Run telegraf in debug mode
debug = false debug = false
@ -598,7 +598,7 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
return err return err
} }
ro := internal_models.NewRunningOutput(name, output, outputConfig, ro := models.NewRunningOutput(name, output, outputConfig,
c.Agent.MetricBatchSize, c.Agent.MetricBufferLimit) c.Agent.MetricBatchSize, c.Agent.MetricBufferLimit)
c.Outputs = append(c.Outputs, ro) c.Outputs = append(c.Outputs, ro)
return nil return nil
@ -639,7 +639,7 @@ func (c *Config) addInput(name string, table *ast.Table) error {
return err return err
} }
rp := &internal_models.RunningInput{ rp := &models.RunningInput{
Name: name, Name: name,
Input: input, Input: input,
Config: pluginConfig, Config: pluginConfig,
@ -650,10 +650,10 @@ func (c *Config) addInput(name string, table *ast.Table) error {
// buildFilter builds a Filter // buildFilter builds a Filter
// (tagpass/tagdrop/namepass/namedrop/fieldpass/fielddrop) to // (tagpass/tagdrop/namepass/namedrop/fieldpass/fielddrop) to
// be inserted into the internal_models.OutputConfig/internal_models.InputConfig // be inserted into the models.OutputConfig/models.InputConfig
// to be used for glob filtering on tags and measurements // to be used for glob filtering on tags and measurements
func buildFilter(tbl *ast.Table) (internal_models.Filter, error) { func buildFilter(tbl *ast.Table) (models.Filter, error) {
f := internal_models.Filter{} f := models.Filter{}
if node, ok := tbl.Fields["namepass"]; ok { if node, ok := tbl.Fields["namepass"]; ok {
if kv, ok := node.(*ast.KeyValue); ok { if kv, ok := node.(*ast.KeyValue); ok {
@ -717,7 +717,7 @@ func buildFilter(tbl *ast.Table) (internal_models.Filter, error) {
if subtbl, ok := node.(*ast.Table); ok { if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields { for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok { if kv, ok := val.(*ast.KeyValue); ok {
tagfilter := &internal_models.TagFilter{Name: name} tagfilter := &models.TagFilter{Name: name}
if ary, ok := kv.Value.(*ast.Array); ok { if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value { for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok { if str, ok := elem.(*ast.String); ok {
@ -736,7 +736,7 @@ func buildFilter(tbl *ast.Table) (internal_models.Filter, error) {
if subtbl, ok := node.(*ast.Table); ok { if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields { for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok { if kv, ok := val.(*ast.KeyValue); ok {
tagfilter := &internal_models.TagFilter{Name: name} tagfilter := &models.TagFilter{Name: name}
if ary, ok := kv.Value.(*ast.Array); ok { if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value { for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok { if str, ok := elem.(*ast.String); ok {
@ -793,9 +793,9 @@ func buildFilter(tbl *ast.Table) (internal_models.Filter, error) {
// buildInput parses input specific items from the ast.Table, // buildInput parses input specific items from the ast.Table,
// builds the filter and returns a // builds the filter and returns a
// internal_models.InputConfig to be inserted into internal_models.RunningInput // models.InputConfig to be inserted into models.RunningInput
func buildInput(name string, tbl *ast.Table) (*internal_models.InputConfig, error) { func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) {
cp := &internal_models.InputConfig{Name: name} cp := &models.InputConfig{Name: name}
if node, ok := tbl.Fields["interval"]; ok { if node, ok := tbl.Fields["interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok { if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok { if str, ok := kv.Value.(*ast.String); ok {
@ -969,14 +969,14 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
// buildOutput parses output specific items from the ast.Table, // buildOutput parses output specific items from the ast.Table,
// builds the filter and returns an // builds the filter and returns an
// internal_models.OutputConfig to be inserted into internal_models.RunningInput // models.OutputConfig to be inserted into models.RunningInput
// Note: error exists in the return for future calls that might require error // Note: error exists in the return for future calls that might require error
func buildOutput(name string, tbl *ast.Table) (*internal_models.OutputConfig, error) { func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
filter, err := buildFilter(tbl) filter, err := buildFilter(tbl)
if err != nil { if err != nil {
return nil, err return nil, err
} }
oc := &internal_models.OutputConfig{ oc := &models.OutputConfig{
Name: name, Name: name,
Filter: filter, Filter: filter,
} }

View File

@ -26,19 +26,19 @@ func TestConfig_LoadSingleInputWithEnvVars(t *testing.T) {
memcached := inputs.Inputs["memcached"]().(*memcached.Memcached) memcached := inputs.Inputs["memcached"]().(*memcached.Memcached)
memcached.Servers = []string{"192.168.1.1"} memcached.Servers = []string{"192.168.1.1"}
filter := internal_models.Filter{ filter := models.Filter{
NameDrop: []string{"metricname2"}, NameDrop: []string{"metricname2"},
NamePass: []string{"metricname1"}, NamePass: []string{"metricname1"},
FieldDrop: []string{"other", "stuff"}, FieldDrop: []string{"other", "stuff"},
FieldPass: []string{"some", "strings"}, FieldPass: []string{"some", "strings"},
TagDrop: []internal_models.TagFilter{ TagDrop: []models.TagFilter{
internal_models.TagFilter{ models.TagFilter{
Name: "badtag", Name: "badtag",
Filter: []string{"othertag"}, Filter: []string{"othertag"},
}, },
}, },
TagPass: []internal_models.TagFilter{ TagPass: []models.TagFilter{
internal_models.TagFilter{ models.TagFilter{
Name: "goodtag", Name: "goodtag",
Filter: []string{"mytag"}, Filter: []string{"mytag"},
}, },
@ -46,7 +46,7 @@ func TestConfig_LoadSingleInputWithEnvVars(t *testing.T) {
IsActive: true, IsActive: true,
} }
assert.NoError(t, filter.CompileFilter()) assert.NoError(t, filter.CompileFilter())
mConfig := &internal_models.InputConfig{ mConfig := &models.InputConfig{
Name: "memcached", Name: "memcached",
Filter: filter, Filter: filter,
Interval: 10 * time.Second, Interval: 10 * time.Second,
@ -66,19 +66,19 @@ func TestConfig_LoadSingleInput(t *testing.T) {
memcached := inputs.Inputs["memcached"]().(*memcached.Memcached) memcached := inputs.Inputs["memcached"]().(*memcached.Memcached)
memcached.Servers = []string{"localhost"} memcached.Servers = []string{"localhost"}
filter := internal_models.Filter{ filter := models.Filter{
NameDrop: []string{"metricname2"}, NameDrop: []string{"metricname2"},
NamePass: []string{"metricname1"}, NamePass: []string{"metricname1"},
FieldDrop: []string{"other", "stuff"}, FieldDrop: []string{"other", "stuff"},
FieldPass: []string{"some", "strings"}, FieldPass: []string{"some", "strings"},
TagDrop: []internal_models.TagFilter{ TagDrop: []models.TagFilter{
internal_models.TagFilter{ models.TagFilter{
Name: "badtag", Name: "badtag",
Filter: []string{"othertag"}, Filter: []string{"othertag"},
}, },
}, },
TagPass: []internal_models.TagFilter{ TagPass: []models.TagFilter{
internal_models.TagFilter{ models.TagFilter{
Name: "goodtag", Name: "goodtag",
Filter: []string{"mytag"}, Filter: []string{"mytag"},
}, },
@ -86,7 +86,7 @@ func TestConfig_LoadSingleInput(t *testing.T) {
IsActive: true, IsActive: true,
} }
assert.NoError(t, filter.CompileFilter()) assert.NoError(t, filter.CompileFilter())
mConfig := &internal_models.InputConfig{ mConfig := &models.InputConfig{
Name: "memcached", Name: "memcached",
Filter: filter, Filter: filter,
Interval: 5 * time.Second, Interval: 5 * time.Second,
@ -113,19 +113,19 @@ func TestConfig_LoadDirectory(t *testing.T) {
memcached := inputs.Inputs["memcached"]().(*memcached.Memcached) memcached := inputs.Inputs["memcached"]().(*memcached.Memcached)
memcached.Servers = []string{"localhost"} memcached.Servers = []string{"localhost"}
filter := internal_models.Filter{ filter := models.Filter{
NameDrop: []string{"metricname2"}, NameDrop: []string{"metricname2"},
NamePass: []string{"metricname1"}, NamePass: []string{"metricname1"},
FieldDrop: []string{"other", "stuff"}, FieldDrop: []string{"other", "stuff"},
FieldPass: []string{"some", "strings"}, FieldPass: []string{"some", "strings"},
TagDrop: []internal_models.TagFilter{ TagDrop: []models.TagFilter{
internal_models.TagFilter{ models.TagFilter{
Name: "badtag", Name: "badtag",
Filter: []string{"othertag"}, Filter: []string{"othertag"},
}, },
}, },
TagPass: []internal_models.TagFilter{ TagPass: []models.TagFilter{
internal_models.TagFilter{ models.TagFilter{
Name: "goodtag", Name: "goodtag",
Filter: []string{"mytag"}, Filter: []string{"mytag"},
}, },
@ -133,7 +133,7 @@ func TestConfig_LoadDirectory(t *testing.T) {
IsActive: true, IsActive: true,
} }
assert.NoError(t, filter.CompileFilter()) assert.NoError(t, filter.CompileFilter())
mConfig := &internal_models.InputConfig{ mConfig := &models.InputConfig{
Name: "memcached", Name: "memcached",
Filter: filter, Filter: filter,
Interval: 5 * time.Second, Interval: 5 * time.Second,
@ -150,7 +150,7 @@ func TestConfig_LoadDirectory(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
ex.SetParser(p) ex.SetParser(p)
ex.Command = "/usr/bin/myothercollector --foo=bar" ex.Command = "/usr/bin/myothercollector --foo=bar"
eConfig := &internal_models.InputConfig{ eConfig := &models.InputConfig{
Name: "exec", Name: "exec",
MeasurementSuffix: "_myothercollector", MeasurementSuffix: "_myothercollector",
} }
@ -169,7 +169,7 @@ func TestConfig_LoadDirectory(t *testing.T) {
pstat := inputs.Inputs["procstat"]().(*procstat.Procstat) pstat := inputs.Inputs["procstat"]().(*procstat.Procstat)
pstat.PidFile = "/var/run/grafana-server.pid" pstat.PidFile = "/var/run/grafana-server.pid"
pConfig := &internal_models.InputConfig{Name: "procstat"} pConfig := &models.InputConfig{Name: "procstat"}
pConfig.Tags = make(map[string]string) pConfig.Tags = make(map[string]string)
assert.Equal(t, pstat, c.Inputs[3].Input, assert.Equal(t, pstat, c.Inputs[3].Input,

View File

@ -1,4 +1,4 @@
package internal_models package models
import ( import (
"fmt" "fmt"

View File

@ -1,4 +1,4 @@
package internal_models package models
import ( import (
"testing" "testing"

View File

@ -1,4 +1,4 @@
package internal_models package models
import ( import (
"time" "time"

View File

@ -1,4 +1,4 @@
package internal_models package models
import ( import (
"log" "log"

View File

@ -1,4 +1,4 @@
package internal_models package models
import ( import (
"fmt" "fmt"

View File

@ -14,17 +14,22 @@ regex patterns.
## /var/log/**.log -> recursively find all .log files in /var/log ## /var/log/**.log -> recursively find all .log files in /var/log
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file ## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/influxdb/influxdb.log"] files = ["/var/log/apache/access.log"]
## Read file from beginning. ## Read file from beginning.
from_beginning = false from_beginning = false
## Parse logstash-style "grok" patterns: ## Parse logstash-style "grok" patterns:
## Telegraf builtin parsing patterns: https://goo.gl/dkay10 ## Telegraf built-in parsing patterns: https://goo.gl/dkay10
[inputs.logparser.grok] [inputs.logparser.grok]
## This is a list of patterns to check the given log file(s) for. ## This is a list of patterns to check the given log file(s) for.
## Note that adding patterns here increases processing time. The most ## Note that adding patterns here increases processing time. The most
## efficient configuration is to have one file & pattern per logparser. ## efficient configuration is to have one pattern per logparser.
patterns = ["%{INFLUXDB_HTTPD_LOG}"] ## Other common built-in patterns are:
## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs)
## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
patterns = ["%{COMBINED_LOG_FORMAT}"]
## Name of the outputted measurement name.
measurement = "apache_access_log"
## Full path(s) to custom pattern files. ## Full path(s) to custom pattern files.
custom_pattern_files = [] custom_pattern_files = []
## Custom patterns can also be defined here. Put one pattern per line. ## Custom patterns can also be defined here. Put one pattern per line.
@ -32,8 +37,6 @@ regex patterns.
''' '''
``` ```
> **Note:** The InfluxDB log pattern in the default configuration only works for Influx versions 1.0.0-beta1 or higher.
## Grok Parser ## Grok Parser
The grok parser uses a slightly modified version of logstash "grok" patterns, The grok parser uses a slightly modified version of logstash "grok" patterns,
@ -69,6 +72,7 @@ Timestamp modifiers can be used to convert captures to the timestamp of the
- tag (converts the field into a tag) - tag (converts the field into a tag)
- drop (drops the field completely) - drop (drops the field completely)
- Timestamp modifiers: - Timestamp modifiers:
- ts (This will auto-learn the timestamp format)
- ts-ansic ("Mon Jan _2 15:04:05 2006") - ts-ansic ("Mon Jan _2 15:04:05 2006")
- ts-unix ("Mon Jan _2 15:04:05 MST 2006") - ts-unix ("Mon Jan _2 15:04:05 MST 2006")
- ts-ruby ("Mon Jan 02 15:04:05 -0700 2006") - ts-ruby ("Mon Jan 02 15:04:05 -0700 2006")

View File

@ -15,7 +15,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
) )
var timeFormats = map[string]string{ var timeLayouts = map[string]string{
"ts-ansic": "Mon Jan _2 15:04:05 2006", "ts-ansic": "Mon Jan _2 15:04:05 2006",
"ts-unix": "Mon Jan _2 15:04:05 MST 2006", "ts-unix": "Mon Jan _2 15:04:05 MST 2006",
"ts-ruby": "Mon Jan 02 15:04:05 -0700 2006", "ts-ruby": "Mon Jan 02 15:04:05 -0700 2006",
@ -27,27 +27,33 @@ var timeFormats = map[string]string{
"ts-rfc3339": "2006-01-02T15:04:05Z07:00", "ts-rfc3339": "2006-01-02T15:04:05Z07:00",
"ts-rfc3339nano": "2006-01-02T15:04:05.999999999Z07:00", "ts-rfc3339nano": "2006-01-02T15:04:05.999999999Z07:00",
"ts-httpd": "02/Jan/2006:15:04:05 -0700", "ts-httpd": "02/Jan/2006:15:04:05 -0700",
"ts-epoch": "EPOCH", // These three are not exactly "layouts", but they are special cases that
"ts-epochnano": "EPOCH_NANO", // will get handled in the ParseLine function.
"ts-epoch": "EPOCH",
"ts-epochnano": "EPOCH_NANO",
"ts": "GENERIC_TIMESTAMP", // try parsing all known timestamp layouts.
} }
const ( const (
INT = "int" INT = "int"
TAG = "tag" TAG = "tag"
FLOAT = "float" FLOAT = "float"
STRING = "string" STRING = "string"
DURATION = "duration" DURATION = "duration"
DROP = "drop" DROP = "drop"
EPOCH = "EPOCH"
EPOCH_NANO = "EPOCH_NANO"
GENERIC_TIMESTAMP = "GENERIC_TIMESTAMP"
) )
var ( var (
// matches named captures that contain a type. // matches named captures that contain a modifier.
// ie, // ie,
// %{NUMBER:bytes:int} // %{NUMBER:bytes:int}
// %{IPORHOST:clientip:tag} // %{IPORHOST:clientip:tag}
// %{HTTPDATE:ts1:ts-http} // %{HTTPDATE:ts1:ts-http}
// %{HTTPDATE:ts2:ts-"02 Jan 06 15:04"} // %{HTTPDATE:ts2:ts-"02 Jan 06 15:04"}
typedRe = regexp.MustCompile(`%{\w+:(\w+):(ts-".+"|t?s?-?\w+)}`) modifierRe = regexp.MustCompile(`%{\w+:(\w+):(ts-".+"|t?s?-?\w+)}`)
// matches a plain pattern name. ie, %{NUMBER} // matches a plain pattern name. ie, %{NUMBER}
patternOnlyRe = regexp.MustCompile(`%{(\w+)}`) patternOnlyRe = regexp.MustCompile(`%{(\w+)}`)
) )
@ -87,6 +93,12 @@ type Parser struct {
// "RESPONSE_CODE": "%{NUMBER:rc:tag}" // "RESPONSE_CODE": "%{NUMBER:rc:tag}"
// } // }
patterns map[string]string patterns map[string]string
// foundTsLayouts is a slice of timestamp patterns that have been found
// in the log lines. This slice gets updated if the user uses the generic
// 'ts' modifier for timestamps. This slice is checked first for matches,
// so that previously-matched layouts get priority over all other timestamp
// layouts.
foundTsLayouts []string
g *grok.Grok g *grok.Grok
tsModder *tsModder tsModder *tsModder
@ -140,6 +152,7 @@ func (p *Parser) Compile() error {
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
var err error var err error
// values are the parsed fields from the log line
var values map[string]string var values map[string]string
// the matching pattern string // the matching pattern string
var patternName string var patternName string
@ -165,6 +178,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
continue continue
} }
// t is the modifier of the field
var t string var t string
// check if pattern has some modifiers // check if pattern has some modifiers
if types, ok := p.typeMap[patternName]; ok { if types, ok := p.typeMap[patternName]; ok {
@ -210,20 +224,50 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
tags[k] = v tags[k] = v
case STRING: case STRING:
fields[k] = strings.Trim(v, `"`) fields[k] = strings.Trim(v, `"`)
case "EPOCH": case EPOCH:
iv, err := strconv.ParseInt(v, 10, 64) iv, err := strconv.ParseInt(v, 10, 64)
if err != nil { if err != nil {
log.Printf("ERROR parsing %s to int: %s", v, err) log.Printf("ERROR parsing %s to int: %s", v, err)
} else { } else {
timestamp = time.Unix(iv, 0) timestamp = time.Unix(iv, 0)
} }
case "EPOCH_NANO": case EPOCH_NANO:
iv, err := strconv.ParseInt(v, 10, 64) iv, err := strconv.ParseInt(v, 10, 64)
if err != nil { if err != nil {
log.Printf("ERROR parsing %s to int: %s", v, err) log.Printf("ERROR parsing %s to int: %s", v, err)
} else { } else {
timestamp = time.Unix(0, iv) timestamp = time.Unix(0, iv)
} }
case GENERIC_TIMESTAMP:
var foundTs bool
// first try timestamp layouts that we've already found
for _, layout := range p.foundTsLayouts {
ts, err := time.Parse(layout, v)
if err == nil {
timestamp = ts
foundTs = true
break
}
}
// if we haven't found a timestamp layout yet, try all timestamp
// layouts.
if !foundTs {
for _, layout := range timeLayouts {
ts, err := time.Parse(layout, v)
if err == nil {
timestamp = ts
foundTs = true
p.foundTsLayouts = append(p.foundTsLayouts, layout)
break
}
}
}
// if we still haven't found a timestamp layout, log it and we will
// just use time.Now()
if !foundTs {
log.Printf("ERROR parsing timestamp [%s], could not find any "+
"suitable time layouts.", v)
}
case DROP: case DROP:
// goodbye! // goodbye!
default: default:
@ -267,7 +311,7 @@ func (p *Parser) compileCustomPatterns() error {
// check if pattern contains modifiers. Parse them out if it does. // check if pattern contains modifiers. Parse them out if it does.
for name, pattern := range p.patterns { for name, pattern := range p.patterns {
if typedRe.MatchString(pattern) { if modifierRe.MatchString(pattern) {
// this pattern has modifiers, so parse out the modifiers // this pattern has modifiers, so parse out the modifiers
pattern, err = p.parseTypedCaptures(name, pattern) pattern, err = p.parseTypedCaptures(name, pattern)
if err != nil { if err != nil {
@ -280,13 +324,13 @@ func (p *Parser) compileCustomPatterns() error {
return p.g.AddPatternsFromMap(p.patterns) return p.g.AddPatternsFromMap(p.patterns)
} }
// parseTypedCaptures parses the capture types, and then deletes the type from // parseTypedCaptures parses the capture modifiers, and then deletes the
// the line so that it is a valid "grok" pattern again. // modifier from the line so that it is a valid "grok" pattern again.
// ie, // ie,
// %{NUMBER:bytes:int} => %{NUMBER:bytes} (stores %{NUMBER}->bytes->int) // %{NUMBER:bytes:int} => %{NUMBER:bytes} (stores %{NUMBER}->bytes->int)
// %{IPORHOST:clientip:tag} => %{IPORHOST:clientip} (stores %{IPORHOST}->clientip->tag) // %{IPORHOST:clientip:tag} => %{IPORHOST:clientip} (stores %{IPORHOST}->clientip->tag)
func (p *Parser) parseTypedCaptures(name, pattern string) (string, error) { func (p *Parser) parseTypedCaptures(name, pattern string) (string, error) {
matches := typedRe.FindAllStringSubmatch(pattern, -1) matches := modifierRe.FindAllStringSubmatch(pattern, -1)
// grab the name of the capture pattern // grab the name of the capture pattern
patternName := "%{" + name + "}" patternName := "%{" + name + "}"
@ -298,16 +342,18 @@ func (p *Parser) parseTypedCaptures(name, pattern string) (string, error) {
hasTimestamp := false hasTimestamp := false
for _, match := range matches { for _, match := range matches {
// regex capture 1 is the name of the capture // regex capture 1 is the name of the capture
// regex capture 2 is the type of the capture // regex capture 2 is the modifier of the capture
if strings.HasPrefix(match[2], "ts-") { if strings.HasPrefix(match[2], "ts") {
if hasTimestamp { if hasTimestamp {
return pattern, fmt.Errorf("logparser pattern compile error: "+ return pattern, fmt.Errorf("logparser pattern compile error: "+
"Each pattern is allowed only one named "+ "Each pattern is allowed only one named "+
"timestamp data type. pattern: %s", pattern) "timestamp data type. pattern: %s", pattern)
} }
if f, ok := timeFormats[match[2]]; ok { if layout, ok := timeLayouts[match[2]]; ok {
p.tsMap[patternName][match[1]] = f // built-in time format
p.tsMap[patternName][match[1]] = layout
} else { } else {
// custom time format
p.tsMap[patternName][match[1]] = strings.TrimSuffix(strings.TrimPrefix(match[2], `ts-"`), `"`) p.tsMap[patternName][match[1]] = strings.TrimSuffix(strings.TrimPrefix(match[2], `ts-"`), `"`)
} }
hasTimestamp = true hasTimestamp = true

View File

@ -38,32 +38,6 @@ func Benchmark_ParseLine_CombinedLogFormat(b *testing.B) {
benchM = m benchM = m
} }
func Benchmark_ParseLine_InfluxLog(b *testing.B) {
p := &Parser{
Patterns: []string{"%{INFLUXDB_HTTPD_LOG}"},
}
p.Compile()
var m telegraf.Metric
for n := 0; n < b.N; n++ {
m, _ = p.ParseLine(`[httpd] 192.168.1.1 - - [14/Jun/2016:11:33:29 +0100] "POST /write?consistency=any&db=telegraf&precision=ns&rp= HTTP/1.1" 204 0 "-" "InfluxDBClient" 6f61bc44-321b-11e6-8050-000000000000 2513`)
}
benchM = m
}
func Benchmark_ParseLine_InfluxLog_NoMatch(b *testing.B) {
p := &Parser{
Patterns: []string{"%{INFLUXDB_HTTPD_LOG}"},
}
p.Compile()
var m telegraf.Metric
for n := 0; n < b.N; n++ {
m, _ = p.ParseLine(`[retention] 2016/06/14 14:38:24 retention policy shard deletion check commencing`)
}
benchM = m
}
func Benchmark_ParseLine_CustomPattern(b *testing.B) { func Benchmark_ParseLine_CustomPattern(b *testing.B) {
p := &Parser{ p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
@ -108,9 +82,9 @@ func TestMeasurementName(t *testing.T) {
assert.Equal(t, "my_web_log", m.Name()) assert.Equal(t, "my_web_log", m.Name())
} }
func TestBuiltinInfluxdbHttpd(t *testing.T) { func TestCustomInfluxdbHttpd(t *testing.T) {
p := &Parser{ p := &Parser{
Patterns: []string{"%{INFLUXDB_HTTPD_LOG}"}, Patterns: []string{`\[httpd\] %{COMBINED_LOG_FORMAT} %{UUID:uuid:drop} %{NUMBER:response_time_us:int}`},
} }
assert.NoError(t, p.Compile()) assert.NoError(t, p.Compile())
@ -333,6 +307,55 @@ func TestParseEpochErrors(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
} }
func TestParseGenericTimestamp(t *testing.T) {
p := &Parser{
Patterns: []string{`\[%{HTTPDATE:ts:ts}\] response_time=%{POSINT:response_time:int} mymetric=%{NUMBER:metric:float}`},
}
assert.NoError(t, p.Compile())
metricA, err := p.ParseLine(`[09/Jun/2016:03:37:03 +0000] response_time=20821 mymetric=10890.645`)
require.NotNil(t, metricA)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"response_time": int64(20821),
"metric": float64(10890.645),
},
metricA.Fields())
assert.Equal(t, map[string]string{}, metricA.Tags())
assert.Equal(t, time.Unix(1465443423, 0).UTC(), metricA.Time().UTC())
metricB, err := p.ParseLine(`[09/Jun/2016:03:37:04 +0000] response_time=20821 mymetric=10890.645`)
require.NotNil(t, metricB)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"response_time": int64(20821),
"metric": float64(10890.645),
},
metricB.Fields())
assert.Equal(t, map[string]string{}, metricB.Tags())
assert.Equal(t, time.Unix(1465443424, 0).UTC(), metricB.Time().UTC())
}
func TestParseGenericTimestampNotFound(t *testing.T) {
p := &Parser{
Patterns: []string{`\[%{NOTSPACE:ts:ts}\] response_time=%{POSINT:response_time:int} mymetric=%{NUMBER:metric:float}`},
}
assert.NoError(t, p.Compile())
metricA, err := p.ParseLine(`[foobar] response_time=20821 mymetric=10890.645`)
require.NotNil(t, metricA)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"response_time": int64(20821),
"metric": float64(10890.645),
},
metricA.Fields())
assert.Equal(t, map[string]string{}, metricA.Tags())
}
func TestCompileFileAndParse(t *testing.T) { func TestCompileFileAndParse(t *testing.T) {
p := &Parser{ p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},

View File

@ -55,15 +55,13 @@ EXAMPLE_LOG \[%{HTTPDATE:ts:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE}
# Wider-ranging username matching vs. logstash built-in %{USER} # Wider-ranging username matching vs. logstash built-in %{USER}
NGUSERNAME [a-zA-Z\.\@\-\+_%]+ NGUSERNAME [a-zA-Z\.\@\-\+_%]+
NGUSER %{NGUSERNAME} NGUSER %{NGUSERNAME}
# Wider-ranging client IP matching
CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1)
## ##
## COMMON LOG PATTERNS ## COMMON LOG PATTERNS
## ##
# InfluxDB log patterns
CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1)
INFLUXDB_HTTPD_LOG \[httpd\] %{COMBINED_LOG_FORMAT} %{UUID:uuid:drop} %{NUMBER:response_time_us:int}
# apache & nginx logs, this is also known as the "common log format" # apache & nginx logs, this is also known as the "common log format"
# see https://en.wikipedia.org/wiki/Common_Log_Format # see https://en.wikipedia.org/wiki/Common_Log_Format
COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-) COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-)

View File

@ -51,15 +51,13 @@ EXAMPLE_LOG \[%{HTTPDATE:ts:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE}
# Wider-ranging username matching vs. logstash built-in %{USER} # Wider-ranging username matching vs. logstash built-in %{USER}
NGUSERNAME [a-zA-Z\.\@\-\+_%]+ NGUSERNAME [a-zA-Z\.\@\-\+_%]+
NGUSER %{NGUSERNAME} NGUSER %{NGUSERNAME}
# Wider-ranging client IP matching
CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1)
## ##
## COMMON LOG PATTERNS ## COMMON LOG PATTERNS
## ##
# InfluxDB log patterns
CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1)
INFLUXDB_HTTPD_LOG \[httpd\] %{COMBINED_LOG_FORMAT} %{UUID:uuid:drop} %{NUMBER:response_time_us:int}
# apache & nginx logs, this is also known as the "common log format" # apache & nginx logs, this is also known as the "common log format"
# see https://en.wikipedia.org/wiki/Common_Log_Format # see https://en.wikipedia.org/wiki/Common_Log_Format
COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-) COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-)

View File

@ -45,7 +45,7 @@ const sampleConfig = `
## /var/log/**.log -> recursively find all .log files in /var/log ## /var/log/**.log -> recursively find all .log files in /var/log
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file ## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/influxdb/influxdb.log"] files = ["/var/log/apache/access.log"]
## Read file from beginning. ## Read file from beginning.
from_beginning = false from_beginning = false
@ -58,9 +58,9 @@ const sampleConfig = `
## Other common built-in patterns are: ## Other common built-in patterns are:
## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs) ## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs)
## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent) ## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
patterns = ["%{INFLUXDB_HTTPD_LOG}"] patterns = ["%{COMBINED_LOG_FORMAT}"]
## Name of the outputted measurement name. ## Name of the outputted measurement name.
measurement = "influxdb_log" measurement = "apache_access_log"
## Full path(s) to custom pattern files. ## Full path(s) to custom pattern files.
custom_pattern_files = [] custom_pattern_files = []
## Custom patterns can also be defined here. Put one pattern per line. ## Custom patterns can also be defined here. Put one pattern per line.

View File

@ -71,7 +71,7 @@ func (p *SpecProcessor) pushMetrics() {
fields[prefix+"read_count"] = io.ReadCount fields[prefix+"read_count"] = io.ReadCount
fields[prefix+"write_count"] = io.WriteCount fields[prefix+"write_count"] = io.WriteCount
fields[prefix+"read_bytes"] = io.ReadBytes fields[prefix+"read_bytes"] = io.ReadBytes
fields[prefix+"write_bytes"] = io.WriteCount fields[prefix+"write_bytes"] = io.WriteBytes
} }
cpu_time, err := p.proc.Times() cpu_time, err := p.proc.Times()

View File

@ -17,6 +17,8 @@ func TestTailFromBeginning(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "") tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(tmpfile.Name()) defer os.Remove(tmpfile.Name())
_, err = tmpfile.WriteString("cpu,mytag=foo usage_idle=100\n")
require.NoError(t, err)
tt := NewTail() tt := NewTail()
tt.FromBeginning = true tt.FromBeginning = true
@ -28,12 +30,10 @@ func TestTailFromBeginning(t *testing.T) {
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc)) require.NoError(t, tt.Start(&acc))
time.Sleep(time.Millisecond * 100)
_, err = tmpfile.WriteString("cpu,mytag=foo usage_idle=100\n")
require.NoError(t, err)
require.NoError(t, tt.Gather(&acc)) require.NoError(t, tt.Gather(&acc))
// arbitrary sleep to wait for message to show up // arbitrary sleep to wait for message to show up
time.Sleep(time.Millisecond * 250) time.Sleep(time.Millisecond * 150)
acc.AssertContainsTaggedFields(t, "cpu", acc.AssertContainsTaggedFields(t, "cpu",
map[string]interface{}{ map[string]interface{}{

View File

@ -158,7 +158,6 @@ func (t *TcpListener) tcpListen() error {
if err != nil { if err != nil {
return err return err
} }
// log.Printf("Received TCP Connection from %s", conn.RemoteAddr())
select { select {
case <-t.accept: case <-t.accept:
@ -194,7 +193,6 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) {
defer func() { defer func() {
t.wg.Done() t.wg.Done()
conn.Close() conn.Close()
// log.Printf("Closed TCP Connection from %s", conn.RemoteAddr())
// Add one connection potential back to channel when this one closes // Add one connection potential back to channel when this one closes
t.accept <- true t.accept <- true
t.forget(id) t.forget(id)
@ -239,14 +237,19 @@ func (t *TcpListener) tcpParser() error {
for { for {
select { select {
case <-t.done: case <-t.done:
return nil // drain input packets before finishing:
if len(t.in) == 0 {
return nil
}
case packet = <-t.in: case packet = <-t.in:
if len(packet) == 0 { if len(packet) == 0 {
continue continue
} }
metrics, err = t.parser.Parse(packet) metrics, err = t.parser.Parse(packet)
if err == nil { if err == nil {
t.storeMetrics(metrics) for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
} else { } else {
t.malformed++ t.malformed++
if t.malformed == 1 || t.malformed%1000 == 0 { if t.malformed == 1 || t.malformed%1000 == 0 {
@ -257,15 +260,6 @@ func (t *TcpListener) tcpParser() error {
} }
} }
func (t *TcpListener) storeMetrics(metrics []telegraf.Metric) error {
t.Lock()
defer t.Unlock()
for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
return nil
}
// forget a TCP connection // forget a TCP connection
func (t *TcpListener) forget(id string) { func (t *TcpListener) forget(id string) {
t.cleanup.Lock() t.cleanup.Lock()

View File

@ -37,6 +37,62 @@ func newTestTcpListener() (*TcpListener, chan []byte) {
return listener, in return listener, in
} }
// benchmark how long it takes to accept & process 100,000 metrics:
func BenchmarkTCP(b *testing.B) {
listener := TcpListener{
ServiceAddress: ":8198",
AllowedPendingMessages: 100000,
MaxTCPConnections: 250,
}
listener.parser, _ = parsers.NewInfluxParser()
acc := &testutil.Accumulator{Discard: true}
// send multiple messages to socket
for n := 0; n < b.N; n++ {
err := listener.Start(acc)
if err != nil {
panic(err)
}
time.Sleep(time.Millisecond * 25)
conn, err := net.Dial("tcp", "127.0.0.1:8198")
if err != nil {
panic(err)
}
for i := 0; i < 100000; i++ {
fmt.Fprintf(conn, testMsg)
}
// wait for 100,000 metrics to get added to accumulator
time.Sleep(time.Millisecond)
listener.Stop()
}
}
func TestHighTrafficTCP(t *testing.T) {
listener := TcpListener{
ServiceAddress: ":8199",
AllowedPendingMessages: 100000,
MaxTCPConnections: 250,
}
listener.parser, _ = parsers.NewInfluxParser()
acc := &testutil.Accumulator{}
// send multiple messages to socket
err := listener.Start(acc)
require.NoError(t, err)
time.Sleep(time.Millisecond * 25)
conn, err := net.Dial("tcp", "127.0.0.1:8199")
require.NoError(t, err)
for i := 0; i < 100000; i++ {
fmt.Fprintf(conn, testMsg)
}
time.Sleep(time.Millisecond)
listener.Stop()
assert.Equal(t, 100000, len(acc.Metrics))
}
func TestConnectTCP(t *testing.T) { func TestConnectTCP(t *testing.T) {
listener := TcpListener{ listener := TcpListener{
ServiceAddress: ":8194", ServiceAddress: ":8194",

View File

@ -3,8 +3,8 @@ package udp_listener
import ( import (
"log" "log"
"net" "net"
"strings"
"sync" "sync"
"time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
@ -99,9 +99,11 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error {
} }
func (u *UdpListener) Stop() { func (u *UdpListener) Stop() {
u.Lock()
defer u.Unlock()
close(u.done) close(u.done)
u.listener.Close()
u.wg.Wait() u.wg.Wait()
u.listener.Close()
close(u.in) close(u.in)
log.Println("Stopped UDP listener service on ", u.ServiceAddress) log.Println("Stopped UDP listener service on ", u.ServiceAddress)
} }
@ -122,9 +124,13 @@ func (u *UdpListener) udpListen() error {
case <-u.done: case <-u.done:
return nil return nil
default: default:
u.listener.SetReadDeadline(time.Now().Add(time.Second))
n, _, err := u.listener.ReadFromUDP(buf) n, _, err := u.listener.ReadFromUDP(buf)
if err != nil && !strings.Contains(err.Error(), "closed network") { if err != nil {
log.Printf("ERROR: %s\n", err.Error()) if err, ok := err.(net.Error); ok && err.Timeout() {
} else {
log.Printf("ERROR: %s\n", err.Error())
}
continue continue
} }
bufCopy := make([]byte, n) bufCopy := make([]byte, n)
@ -151,11 +157,15 @@ func (u *UdpListener) udpParser() error {
for { for {
select { select {
case <-u.done: case <-u.done:
return nil if len(u.in) == 0 {
return nil
}
case packet = <-u.in: case packet = <-u.in:
metrics, err = u.parser.Parse(packet) metrics, err = u.parser.Parse(packet)
if err == nil { if err == nil {
u.storeMetrics(metrics) for _, m := range metrics {
u.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
} else { } else {
u.malformed++ u.malformed++
if u.malformed == 1 || u.malformed%1000 == 0 { if u.malformed == 1 || u.malformed%1000 == 0 {
@ -166,15 +176,6 @@ func (u *UdpListener) udpParser() error {
} }
} }
func (u *UdpListener) storeMetrics(metrics []telegraf.Metric) error {
u.Lock()
defer u.Unlock()
for _, m := range metrics {
u.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
return nil
}
func init() { func init() {
inputs.Add("udp_listener", func() telegraf.Input { inputs.Add("udp_listener", func() telegraf.Input {
return &UdpListener{} return &UdpListener{}

View File

@ -1,20 +1,36 @@
package udp_listener package udp_listener
import ( import (
"fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"net"
"testing" "testing"
"time" "time"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
testMsg = "cpu_load_short,host=server01 value=12.0 1422568543702900257\n"
testMsgs = `
cpu_load_short,host=server02 value=12.0 1422568543702900257
cpu_load_short,host=server03 value=12.0 1422568543702900257
cpu_load_short,host=server04 value=12.0 1422568543702900257
cpu_load_short,host=server05 value=12.0 1422568543702900257
cpu_load_short,host=server06 value=12.0 1422568543702900257
`
) )
func newTestUdpListener() (*UdpListener, chan []byte) { func newTestUdpListener() (*UdpListener, chan []byte) {
in := make(chan []byte, 1500) in := make(chan []byte, 1500)
listener := &UdpListener{ listener := &UdpListener{
ServiceAddress: ":8125", ServiceAddress: ":8125",
UDPPacketSize: 1500,
AllowedPendingMessages: 10000, AllowedPendingMessages: 10000,
in: in, in: in,
done: make(chan struct{}), done: make(chan struct{}),
@ -22,6 +38,72 @@ func newTestUdpListener() (*UdpListener, chan []byte) {
return listener, in return listener, in
} }
func TestHighTrafficUDP(t *testing.T) {
listener := UdpListener{
ServiceAddress: ":8126",
AllowedPendingMessages: 100000,
}
listener.parser, _ = parsers.NewInfluxParser()
acc := &testutil.Accumulator{}
// send multiple messages to socket
err := listener.Start(acc)
require.NoError(t, err)
time.Sleep(time.Millisecond * 25)
conn, err := net.Dial("udp", "127.0.0.1:8126")
require.NoError(t, err)
for i := 0; i < 20000; i++ {
// arbitrary, just to give the OS buffer some slack handling the
// packet storm.
time.Sleep(time.Microsecond)
fmt.Fprintf(conn, testMsgs)
}
time.Sleep(time.Millisecond)
listener.Stop()
// this is not an exact science, since UDP packets can easily get lost or
// dropped, but assume that the OS will be able to
// handle at least 90% of the sent UDP packets.
assert.InDelta(t, 100000, len(acc.Metrics), 10000)
}
func TestConnectUDP(t *testing.T) {
listener := UdpListener{
ServiceAddress: ":8127",
AllowedPendingMessages: 10000,
}
listener.parser, _ = parsers.NewInfluxParser()
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
time.Sleep(time.Millisecond * 25)
conn, err := net.Dial("udp", "127.0.0.1:8127")
require.NoError(t, err)
// send single message to socket
fmt.Fprintf(conn, testMsg)
time.Sleep(time.Millisecond * 15)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"},
)
// send multiple messages to socket
fmt.Fprintf(conn, testMsgs)
time.Sleep(time.Millisecond * 15)
hostTags := []string{"server02", "server03",
"server04", "server05", "server06"}
for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": hostTag},
)
}
}
func TestRunParser(t *testing.T) { func TestRunParser(t *testing.T) {
log.SetOutput(ioutil.Discard) log.SetOutput(ioutil.Discard)
var testmsg = []byte("cpu_load_short,host=server01 value=12.0 1422568543702900257") var testmsg = []byte("cpu_load_short,host=server01 value=12.0 1422568543702900257")

View File

@ -146,7 +146,7 @@ func (i *InfluxDB) Connect() error {
func createDatabase(c client.Client, database string) error { func createDatabase(c client.Client, database string) error {
// Create Database if it doesn't exist // Create Database if it doesn't exist
_, err := c.Query(client.Query{ _, err := c.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE IF NOT EXISTS \"%s\"", database), Command: fmt.Sprintf("CREATE DATABASE \"%s\"", database),
}) })
return err return err
} }

View File

@ -83,29 +83,17 @@ targets = {
} }
supported_builds = { supported_builds = {
"darwin": [ "amd64" ],
"windows": [ "amd64" ], "windows": [ "amd64" ],
"linux": [ "amd64", "i386", "armhf", "armel", "arm64", "static_amd64" ], "linux": [ "amd64", "i386", "armhf", "armel", "arm64", "static_amd64" ],
"freebsd": [ "amd64" ] "freebsd": [ "amd64" ]
} }
supported_packages = { supported_packages = {
"darwin": [ "tar" ],
"linux": [ "deb", "rpm", "tar" ], "linux": [ "deb", "rpm", "tar" ],
"windows": [ "zip" ], "windows": [ "zip" ],
"freebsd": [ "tar" ] "freebsd": [ "tar" ]
} }
supported_tags = {
# "linux": {
# "amd64": ["sensors"]
# }
}
prereq_cmds = {
# "linux": "sudo apt-get install lm-sensors libsensors4-dev"
}
################ ################
#### Telegraf Functions #### Telegraf Functions
################ ################

View File

@ -15,32 +15,28 @@ function disable_chkconfig {
rm -f /etc/init.d/telegraf rm -f /etc/init.d/telegraf
} }
if [[ -f /etc/redhat-release ]]; then if [[ "$1" == "0" ]]; then
# RHEL-variant logic # RHEL and any distribution that follow RHEL, Amazon Linux covered
if [[ "$1" = "0" ]]; then # telegraf is no longer installed, remove from init system
# InfluxDB is no longer installed, remove from init system rm -f /etc/default/telegraf
rm -f /etc/default/telegraf
which systemctl &>/dev/null which systemctl &>/dev/null
if [[ $? -eq 0 ]]; then if [[ $? -eq 0 ]]; then
disable_systemd disable_systemd
else else
# Assuming sysv # Assuming sysv
disable_chkconfig disable_chkconfig
fi
fi fi
elif [[ -f /etc/debian_version ]]; then elif [ "$1" == "remove" -o "$1" == "purge" ]; then
# Debian/Ubuntu logic # Debian/Ubuntu logic
if [[ "$1" != "upgrade" ]]; then # Remove/purge
# Remove/purge rm -f /etc/default/telegraf
rm -f /etc/default/telegraf
which systemctl &>/dev/null which systemctl &>/dev/null
if [[ $? -eq 0 ]]; then if [[ $? -eq 0 ]]; then
disable_systemd disable_systemd
else else
# Assuming sysv # Assuming sysv
disable_update_rcd disable_update_rcd
fi
fi fi
fi fi

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"sync" "sync"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -27,9 +28,11 @@ func (p *Metric) String() string {
type Accumulator struct { type Accumulator struct {
sync.Mutex sync.Mutex
Metrics []*Metric Metrics []*Metric
Errors []error nMetrics uint64
debug bool Discard bool
Errors []error
debug bool
} }
// Add adds a measurement point to the accumulator // Add adds a measurement point to the accumulator
@ -43,6 +46,10 @@ func (a *Accumulator) Add(
a.AddFields(measurement, fields, tags, t...) a.AddFields(measurement, fields, tags, t...)
} }
func (a *Accumulator) NMetrics() uint64 {
return atomic.LoadUint64(&a.nMetrics)
}
// AddFields adds a measurement point with a specified timestamp. // AddFields adds a measurement point with a specified timestamp.
func (a *Accumulator) AddFields( func (a *Accumulator) AddFields(
measurement string, measurement string,
@ -50,6 +57,10 @@ func (a *Accumulator) AddFields(
tags map[string]string, tags map[string]string,
timestamp ...time.Time, timestamp ...time.Time,
) { ) {
atomic.AddUint64(&a.nMetrics, 1)
if a.Discard {
return
}
a.Lock() a.Lock()
defer a.Unlock() defer a.Unlock()
if tags == nil { if tags == nil {