Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Miki 2016-02-19 12:10:19 +01:00
commit f662a88223
96 changed files with 1757 additions and 890 deletions

View File

@ -1,4 +1,4 @@
## v0.10.3 [unreleased]
## v0.10.3 [2016-02-18]
### Release Notes
- Users of the `exec` and `kafka_consumer` (and the new `nats_consumer`
@ -8,9 +8,13 @@ format that they would like to parse. Currently supports: "json", "influx", and
- Users of message broker and file output plugins can now choose what data format
they would like to output. Currently supports: "influx" and "graphite"
- More info on parsing _incoming_ data formats can be found
[here](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md)
[here](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md)
- More info on serializing _outgoing_ data formats can be found
[here](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md)
[here](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md)
- Telegraf now has an option `flush_buffer_when_full` that will flush the
metric buffer whenever it fills up for each output, rather than dropping
points and only flushing on a set time interval. This will default to `true`
and is in the `[agent]` config section.
### Features
- [#652](https://github.com/influxdata/telegraf/pull/652): CouchDB Input Plugin. Thanks @codehate!
@ -23,11 +27,15 @@ they would like to output. Currently supports: "influx" and "graphite"
- [#679](https://github.com/influxdata/telegraf/pull/679): Support for arbitrary output data formats.
- [#695](https://github.com/influxdata/telegraf/pull/695): raindrops input plugin. Thanks @burdandrei!
- [#650](https://github.com/influxdata/telegraf/pull/650): net_response input plugin. Thanks @titilambert!
- [#699](https://github.com/influxdata/telegraf/pull/699): Flush based on buffer size rather than time.
- [#682](https://github.com/influxdata/telegraf/pull/682): Mesos input plugin. Thanks @tripledes!
### Bugfixes
- [#443](https://github.com/influxdata/telegraf/issues/443): Fix Ping command timeout parameter on Linux.
- [#662](https://github.com/influxdata/telegraf/pull/667): Change `[tags]` to `[global_tags]` to fix multiple-plugin tags bug.
- [#642](https://github.com/influxdata/telegraf/issues/642): Riemann output plugin issues.
- [#394](https://github.com/influxdata/telegraf/issues/394): Support HTTP POST. Thanks @gabelev!
- [#715](https://github.com/influxdata/telegraf/pull/715): Fix influxdb precision config panic. Thanks @netixen!
## v0.10.2 [2016-02-04]

View File

@ -101,7 +101,7 @@ Some input plugins (such as
[exec](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/exec))
accept arbitrary input data formats. An overview of these data formats can
be found
[here](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md).
[here](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).
In order to enable this, you must specify a `SetParser(parser parsers.Parser)`
function on the plugin object (see the exec plugin for an example), as well as
@ -114,10 +114,10 @@ creating the `Parser` object.
You should also add the following to your SampleConfig() return:
```toml
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```
@ -230,7 +230,7 @@ Some output plugins (such as
[file](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/file))
can write arbitrary output data formats. An overview of these data formats can
be found
[here](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md).
[here](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md).
In order to enable this, you must specify a
`SetSerializer(serializer serializers.Serializer)`
@ -244,10 +244,10 @@ instantiating and creating the `Serializer` object.
You should also add the following to your SampleConfig() return:
```toml
### Data format to output. This can be "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md
## Data format to output. This can be "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
```

View File

@ -14,9 +14,7 @@ windows: prepare-windows build-windows
# Only run the build (no dependency grabbing)
build:
go build -o telegraf -ldflags \
"-X main.Version=$(VERSION)" \
./cmd/telegraf/telegraf.go
go install -ldflags "-X main.Version=$(VERSION)" ./...
build-windows:
go build -o telegraf.exe -ldflags \
@ -24,27 +22,17 @@ build-windows:
./cmd/telegraf/telegraf.go
build-for-docker:
CGO_ENABLED=0 GOOS=linux go build -o telegraf -ldflags \
CGO_ENABLED=0 GOOS=linux go -o telegraf -ldflags \
"-X main.Version=$(VERSION)" \
./cmd/telegraf/telegraf.go
# Build with race detector
dev: prepare
go build -race -o telegraf -ldflags \
"-X main.Version=$(VERSION)" \
./cmd/telegraf/telegraf.go
go build -race -ldflags "-X main.Version=$(VERSION)" ./...
# Build linux 64-bit, 32-bit and arm architectures
build-linux-bins: prepare
GOARCH=amd64 GOOS=linux go build -o telegraf_linux_amd64 \
-ldflags "-X main.Version=$(VERSION)" \
./cmd/telegraf/telegraf.go
GOARCH=386 GOOS=linux go build -o telegraf_linux_386 \
-ldflags "-X main.Version=$(VERSION)" \
./cmd/telegraf/telegraf.go
GOARCH=arm GOOS=linux go build -o telegraf_linux_arm \
-ldflags "-X main.Version=$(VERSION)" \
./cmd/telegraf/telegraf.go
# run package script
package:
./scripts/build.py --package --version="$(VERSION)" --platform=linux --arch=all --upload
# Get dependencies and use gdm to checkout changesets
prepare:

View File

@ -27,8 +27,8 @@ the [release blog post](https://influxdata.com/blog/announcing-telegraf-0-10-0/)
### Linux deb and rpm Packages:
Latest:
* http://get.influxdb.org/telegraf/telegraf_0.10.2-1_amd64.deb
* http://get.influxdb.org/telegraf/telegraf-0.10.2-1.x86_64.rpm
* http://get.influxdb.org/telegraf/telegraf_0.10.3-1_amd64.deb
* http://get.influxdb.org/telegraf/telegraf-0.10.3-1.x86_64.rpm
0.2.x:
* http://get.influxdb.org/telegraf/telegraf_0.2.4_amd64.deb
@ -52,9 +52,9 @@ for instructions, replacing the `influxdb` package name with `telegraf`.
### Linux tarballs:
Latest:
* http://get.influxdb.org/telegraf/telegraf-0.10.2-1_linux_amd64.tar.gz
* http://get.influxdb.org/telegraf/telegraf-0.10.2-1_linux_i386.tar.gz
* http://get.influxdb.org/telegraf/telegraf-0.10.2-1_linux_arm.tar.gz
* http://get.influxdb.org/telegraf/telegraf-0.10.3-1_linux_amd64.tar.gz
* http://get.influxdb.org/telegraf/telegraf-0.10.3-1_linux_i386.tar.gz
* http://get.influxdb.org/telegraf/telegraf-0.10.3-1_linux_arm.tar.gz
0.2.x:
* http://get.influxdb.org/telegraf/telegraf_linux_amd64_0.2.4.tar.gz
@ -66,13 +66,13 @@ Latest:
To install the full directory structure with config file, run:
```
sudo tar -C / -zxvf ./telegraf-0.10.2-1_linux_amd64.tar.gz
sudo tar -C / -zxvf ./telegraf-0.10.3-1_linux_amd64.tar.gz
```
To extract only the binary, run:
```
tar -zxvf telegraf-0.10.2-1_linux_amd64.tar.gz --strip-components=3 ./usr/bin/telegraf
tar -zxvf telegraf-0.10.3-1_linux_amd64.tar.gz --strip-components=3 ./usr/bin/telegraf
```
### Ansible Role:
@ -141,7 +141,7 @@ Examples:
## Configuration
See the [configuration guide](CONFIGURATION.md) for a rundown of the more advanced
See the [configuration guide](docs/CONFIGURATION.md) for a rundown of the more advanced
configuration options.
## Supported Input Plugins
@ -169,6 +169,7 @@ Currently implemented sources:
* lustre2
* mailchimp
* memcached
* mesos
* mongodb
* mysql
* net_response

View File

@ -58,7 +58,8 @@ func (a *Agent) Connect() error {
}
err := o.Output.Connect()
if err != nil {
log.Printf("Failed to connect to output %s, retrying in 15s, error was '%s' \n", o.Name, err)
log.Printf("Failed to connect to output %s, retrying in 15s, "+
"error was '%s' \n", o.Name, err)
time.Sleep(15 * time.Second)
err = o.Output.Connect()
if err != nil {
@ -241,7 +242,7 @@ func (a *Agent) Test() error {
return nil
}
// flush writes a list of points to all configured outputs
// flush writes a list of metrics to all configured outputs
func (a *Agent) flush() {
var wg sync.WaitGroup
@ -260,7 +261,7 @@ func (a *Agent) flush() {
wg.Wait()
}
// flusher monitors the points input channel and flushes on the minimum interval
// flusher monitors the metrics input channel and flushes on the minimum interval
func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error {
// Inelegant, but this sleep is to allow the Gather threads to run, so that
// the flusher will flush after metrics are collected.
@ -271,14 +272,14 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
for {
select {
case <-shutdown:
log.Println("Hang on, flushing any cached points before shutdown")
log.Println("Hang on, flushing any cached metrics before shutdown")
a.flush()
return nil
case <-ticker.C:
a.flush()
case m := <-metricC:
for _, o := range a.Config.Outputs {
o.AddPoint(m)
o.AddMetric(m)
}
}
}
@ -318,8 +319,24 @@ func (a *Agent) Run(shutdown chan struct{}) error {
a.Config.Agent.Interval.Duration, a.Config.Agent.Debug, a.Config.Agent.Quiet,
a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration)
// channel shared between all input threads for accumulating points
metricC := make(chan telegraf.Metric, 1000)
// channel shared between all input threads for accumulating metrics
metricC := make(chan telegraf.Metric, 10000)
for _, input := range a.Config.Inputs {
// Start service of any ServicePlugins
switch p := input.Input.(type) {
case telegraf.ServiceInput:
acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
acc.setDefaultTags(a.Config.Tags)
if err := p.Start(acc); err != nil {
log.Printf("Service for input %s failed to start, exiting\n%s\n",
input.Name, err.Error())
return err
}
defer p.Stop()
}
}
// Round collection to nearest interval by sleeping
if a.Config.Agent.RoundInterval {
@ -338,18 +355,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
}()
for _, input := range a.Config.Inputs {
// Start service of any ServicePlugins
switch p := input.Input.(type) {
case telegraf.ServiceInput:
if err := p.Start(); err != nil {
log.Printf("Service for input %s failed to start, exiting\n%s\n",
input.Name, err.Error())
return err
}
defer p.Stop()
}
// Special handling for inputs that have their own collection interval
// configured. Default intervals are handled below with gatherParallel
if input.Config.Interval != 0 {

View File

@ -25,19 +25,19 @@ example, in the exec plugin:
```toml
[[inputs.exec]]
### Commands array
## Commands array
commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
### measurement name suffix (for separating different commands)
## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"
### Additional configuration options go here
## Additional configuration options go here
```
Each data_format has an additional set of configuration options available, which
@ -52,16 +52,16 @@ metrics are parsed directly into Telegraf metrics.
```toml
[[inputs.exec]]
### Commands array
## Commands array
commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
### measurement name suffix (for separating different commands)
## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```
@ -97,19 +97,19 @@ For example, if you had this configuration:
```toml
[[inputs.exec]]
### Commands array
## Commands array
commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
### measurement name suffix (for separating different commands)
## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"
### List of tag names to extract from top-level of JSON server response
## List of tag names to extract from top-level of JSON server response
tag_keys = [
"my_tag_1",
"my_tag_2"
@ -241,30 +241,30 @@ There are many more options available,
```toml
[[inputs.exec]]
### Commands array
## Commands array
commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
### measurement name suffix (for separating different commands)
## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
### Data format to consume. This can be "json", "influx" or "graphite" (line-protocol)
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
## Data format to consume. This can be "json", "influx" or "graphite" (line-protocol)
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "graphite"
### This string will be used to join the matched values.
## This string will be used to join the matched values.
separator = "_"
### Each template line requires a template pattern. It can have an optional
### filter before the template and separated by spaces. It can also have optional extra
### tags following the template. Multiple tags should be separated by commas and no spaces
### similar to the line protocol format. There can be only one default template.
### Templates support below format:
### 1. filter + template
### 2. filter + template + extra tag
### 3. filter + template with field key
### 4. default template
## Each template line requires a template pattern. It can have an optional
## filter before the template and separated by spaces. It can also have optional extra
## tags following the template. Multiple tags should be separated by commas and no spaces
## similar to the line protocol format. There can be only one default template.
## Templates support below format:
## 1. filter + template
## 2. filter + template + extra tag
## 3. filter + template with field key
## 4. default template
templates = [
"*.app env.service.resource.measurement",
"stats.* .host.measurement* region=us-west,agent=sensu",

View File

@ -26,16 +26,16 @@ config option, for example, in the `file` output plugin:
```toml
[[outputs.file]]
### Files to write to, "stdout" is a specially handled file.
## Files to write to, "stdout" is a specially handled file.
files = ["stdout"]
### Data format to output. This can be "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md
## Data format to output. This can be "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
### Additional configuration options go here
## Additional configuration options go here
```
Each data_format has an additional set of configuration options available, which
@ -50,13 +50,13 @@ metrics are serialized directly into InfluxDB line-protocol.
```toml
[[outputs.file]]
### Files to write to, "stdout" is a specially handled file.
## Files to write to, "stdout" is a specially handled file.
files = ["stdout", "/tmp/metrics.out"]
### Data format to output. This can be "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md
## Data format to output. This can be "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
```
@ -84,13 +84,13 @@ tars.cpu-total.us-east-1.cpu.usage_idle 98.09 1455320690
```toml
[[outputs.file]]
### Files to write to, "stdout" is a specially handled file.
## Files to write to, "stdout" is a specially handled file.
files = ["stdout", "/tmp/metrics.out"]
### Data format to output. This can be "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md
## Data format to output. This can be "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
prefix = "telegraf"

View File

@ -16,23 +16,37 @@
# Configuration for telegraf agent
[agent]
# Default data collection interval for all plugins
## Default data collection interval for all inputs
interval = "10s"
# Rounds collection interval to 'interval'
# ie, if interval="10s" then always collect on :00, :10, :20, etc.
## Rounds collection interval to 'interval'
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
# Default data flushing interval for all outputs. You should not set this below
# interval. Maximum flush_interval will be flush_interval + flush_jitter
## Telegraf will cache metric_buffer_limit metrics for each output, and will
## flush this buffer on a successful write.
metric_buffer_limit = 10000
## Flush the buffer whenever full, regardless of flush_interval.
flush_buffer_when_full = true
## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
## This can be used to avoid many plugins querying things like sysfs at the
## same time, which can have a measurable effect on the system.
collection_jitter = "0s"
## Default flushing interval for all outputs. You shouldn't set this below
## interval. Maximum flush_interval will be flush_interval + flush_jitter
flush_interval = "10s"
# Jitter the flush interval by a random amount. This is primarily to avoid
# large write spikes for users running a large number of telegraf instances.
# ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"
# Run telegraf in debug mode
## Run telegraf in debug mode
debug = false
# Override default hostname, if empty use os.Hostname()
## Run telegraf in quiet mode
quiet = false
## Override default hostname, if empty use os.Hostname()
hostname = ""
@ -49,7 +63,7 @@
urls = ["http://localhost:8086"] # required
# The target database for metrics (telegraf will create it if not exists)
database = "telegraf" # required
# Precision of writes, valid values are n, u, ms, s, m, and h
# Precision of writes, valid values are "ns", "us" (or "µs"), "ms", "s", "m", "h".
# note: using second precision greatly helps InfluxDB compression
precision = "s"

View File

@ -24,7 +24,7 @@ type ServiceInput interface {
Gather(Accumulator) error
// Start starts the ServiceInput's service, whatever that may be
Start() error
Start(Accumulator) error
// Stop stops the services and closes any necessary channels and connections
Stop()

View File

@ -68,7 +68,7 @@ type AgentConfig struct {
// same time, which can have a measurable effect on the system.
CollectionJitter internal.Duration
// Interval at which to flush data
// FlushInterval is the Interval at which to flush data
FlushInterval internal.Duration
// FlushJitter Jitters the flush interval by a random amount.
@ -82,6 +82,11 @@ type AgentConfig struct {
// full, the oldest metrics will be overwritten.
MetricBufferLimit int
// FlushBufferWhenFull tells Telegraf to flush the metric buffer whenever
// it fills up, regardless of FlushInterval. Setting this option to true
// does _not_ deactivate FlushInterval.
FlushBufferWhenFull bool
// TODO(cam): Remove UTC and Precision parameters, they are no longer
// valid for the agent config. Leaving them here for now for backwards-
// compatability
@ -128,9 +133,7 @@ func (c *Config) ListTags() string {
return strings.Join(tags, " ")
}
var header = `###############################################################################
# Telegraf Configuration #
###############################################################################
var header = `# Telegraf Configuration
# Telegraf is entirely plugin driven. All metrics are gathered from the
# declared inputs, and sent to the declared outputs.
@ -148,35 +151,37 @@ var header = `##################################################################
# Configuration for telegraf agent
[agent]
### Default data collection interval for all inputs
## Default data collection interval for all inputs
interval = "10s"
### Rounds collection interval to 'interval'
### ie, if interval="10s" then always collect on :00, :10, :20, etc.
## Rounds collection interval to 'interval'
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
### Telegraf will cache metric_buffer_limit metrics for each output, and will
### flush this buffer on a successful write.
## Telegraf will cache metric_buffer_limit metrics for each output, and will
## flush this buffer on a successful write.
metric_buffer_limit = 10000
## Flush the buffer whenever full, regardless of flush_interval.
flush_buffer_when_full = true
### Collection jitter is used to jitter the collection by a random amount.
### Each plugin will sleep for a random time within jitter before collecting.
### This can be used to avoid many plugins querying things like sysfs at the
### same time, which can have a measurable effect on the system.
## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
## This can be used to avoid many plugins querying things like sysfs at the
## same time, which can have a measurable effect on the system.
collection_jitter = "0s"
### Default flushing interval for all outputs. You shouldn't set this below
### interval. Maximum flush_interval will be flush_interval + flush_jitter
## Default flushing interval for all outputs. You shouldn't set this below
## interval. Maximum flush_interval will be flush_interval + flush_jitter
flush_interval = "10s"
### Jitter the flush interval by a random amount. This is primarily to avoid
### large write spikes for users running a large number of telegraf instances.
### ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"
### Run telegraf in debug mode
## Run telegraf in debug mode
debug = false
### Run telegraf in quiet mode
## Run telegraf in quiet mode
quiet = false
### Override default hostname, if empty use os.Hostname()
## Override default hostname, if empty use os.Hostname()
hostname = ""
@ -421,8 +426,9 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
ro := internal_models.NewRunningOutput(name, output, outputConfig)
if c.Agent.MetricBufferLimit > 0 {
ro.PointBufferLimit = c.Agent.MetricBufferLimit
ro.MetricBufferLimit = c.Agent.MetricBufferLimit
}
ro.FlushBufferWhenFull = c.Agent.FlushBufferWhenFull
ro.Quiet = c.Agent.Quiet
c.Outputs = append(c.Outputs, ro)
return nil

View File

@ -184,6 +184,15 @@
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]
# Telegraf plugin for gathering metrics from N Mesos masters
[[inputs.mesos]]
# Timeout, in ms.
timeout = 100
# A list of Mesos masters, default value is localhost:5050.
masters = ["localhost:5050"]
# Metrics groups to be collected, by default, all enabled.
master_collections = ["resources","master","system","slaves","frameworks","messages","evqueue","registrar"]
# Read metrics from one or many MongoDB servers
[[inputs.mongodb]]
# An array of URI to gather stats about. Specify an ip or hostname

View File

@ -2,22 +2,34 @@ package internal_models
import (
"log"
"sync"
"time"
"github.com/influxdata/telegraf"
)
const DEFAULT_POINT_BUFFER_LIMIT = 10000
const (
// Default number of metrics kept between flushes.
DEFAULT_METRIC_BUFFER_LIMIT = 10000
// Limit how many full metric buffers are kept due to failed writes.
FULL_METRIC_BUFFERS_LIMIT = 100
)
type RunningOutput struct {
Name string
Output telegraf.Output
Config *OutputConfig
Quiet bool
PointBufferLimit int
Name string
Output telegraf.Output
Config *OutputConfig
Quiet bool
MetricBufferLimit int
FlushBufferWhenFull bool
metrics []telegraf.Metric
overwriteCounter int
metrics []telegraf.Metric
tmpmetrics map[int][]telegraf.Metric
overwriteI int
mapI int
sync.Mutex
}
func NewRunningOutput(
@ -26,47 +38,95 @@ func NewRunningOutput(
conf *OutputConfig,
) *RunningOutput {
ro := &RunningOutput{
Name: name,
metrics: make([]telegraf.Metric, 0),
Output: output,
Config: conf,
PointBufferLimit: DEFAULT_POINT_BUFFER_LIMIT,
Name: name,
metrics: make([]telegraf.Metric, 0),
tmpmetrics: make(map[int][]telegraf.Metric),
Output: output,
Config: conf,
MetricBufferLimit: DEFAULT_METRIC_BUFFER_LIMIT,
}
return ro
}
func (ro *RunningOutput) AddPoint(point telegraf.Metric) {
// AddMetric adds a metric to the output. This function can also write cached
// points if FlushBufferWhenFull is true.
func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
if ro.Config.Filter.IsActive {
if !ro.Config.Filter.ShouldMetricPass(point) {
if !ro.Config.Filter.ShouldMetricPass(metric) {
return
}
}
ro.Lock()
defer ro.Unlock()
if len(ro.metrics) < ro.PointBufferLimit {
ro.metrics = append(ro.metrics, point)
if len(ro.metrics) < ro.MetricBufferLimit {
ro.metrics = append(ro.metrics, metric)
} else {
log.Printf("WARNING: overwriting cached metrics, you may want to " +
"increase the metric_buffer_limit setting in your [agent] config " +
"if you do not wish to overwrite metrics.\n")
if ro.overwriteCounter == len(ro.metrics) {
ro.overwriteCounter = 0
if ro.FlushBufferWhenFull {
ro.metrics = append(ro.metrics, metric)
tmpmetrics := make([]telegraf.Metric, len(ro.metrics))
copy(tmpmetrics, ro.metrics)
ro.metrics = make([]telegraf.Metric, 0)
err := ro.write(tmpmetrics)
if err != nil {
log.Printf("ERROR writing full metric buffer to output %s, %s",
ro.Name, err)
if len(ro.tmpmetrics) == FULL_METRIC_BUFFERS_LIMIT {
ro.mapI = 0
// overwrite one
ro.tmpmetrics[ro.mapI] = tmpmetrics
ro.mapI++
} else {
ro.tmpmetrics[ro.mapI] = tmpmetrics
ro.mapI++
}
}
} else {
log.Printf("WARNING: overwriting cached metrics, you may want to " +
"increase the metric_buffer_limit setting in your [agent] " +
"config if you do not wish to overwrite metrics.\n")
if ro.overwriteI == len(ro.metrics) {
ro.overwriteI = 0
}
ro.metrics[ro.overwriteI] = metric
ro.overwriteI++
}
ro.metrics[ro.overwriteCounter] = point
ro.overwriteCounter++
}
}
// Write writes all cached points to this output.
func (ro *RunningOutput) Write() error {
ro.Lock()
defer ro.Unlock()
err := ro.write(ro.metrics)
if err != nil {
return err
} else {
ro.metrics = make([]telegraf.Metric, 0)
ro.overwriteI = 0
}
// Write any cached metric buffers that failed previously
for i, tmpmetrics := range ro.tmpmetrics {
if err := ro.write(tmpmetrics); err != nil {
return err
} else {
delete(ro.tmpmetrics, i)
}
}
return nil
}
func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
start := time.Now()
err := ro.Output.Write(ro.metrics)
err := ro.Output.Write(metrics)
elapsed := time.Since(start)
if err == nil {
if !ro.Quiet {
log.Printf("Wrote %d metrics to output %s in %s\n",
len(ro.metrics), ro.Name, elapsed)
len(metrics), ro.Name, elapsed)
}
ro.metrics = make([]telegraf.Metric, 0)
ro.overwriteCounter = 0
}
return err
}

View File

@ -0,0 +1,265 @@
package internal_models
import (
"fmt"
"sort"
"sync"
"testing"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var first5 = []telegraf.Metric{
testutil.TestMetric(101, "metric1"),
testutil.TestMetric(101, "metric2"),
testutil.TestMetric(101, "metric3"),
testutil.TestMetric(101, "metric4"),
testutil.TestMetric(101, "metric5"),
}
var next5 = []telegraf.Metric{
testutil.TestMetric(101, "metric6"),
testutil.TestMetric(101, "metric7"),
testutil.TestMetric(101, "metric8"),
testutil.TestMetric(101, "metric9"),
testutil.TestMetric(101, "metric10"),
}
// Test that we can write metrics with simple default setup.
func TestRunningOutputDefault(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
IsActive: false,
},
}
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
for _, metric := range first5 {
ro.AddMetric(metric)
}
for _, metric := range next5 {
ro.AddMetric(metric)
}
assert.Len(t, m.Metrics(), 0)
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 10)
}
// Test that the first metric gets overwritten if there is a buffer overflow.
func TestRunningOutputOverwrite(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
IsActive: false,
},
}
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro.MetricBufferLimit = 4
for _, metric := range first5 {
ro.AddMetric(metric)
}
require.Len(t, m.Metrics(), 0)
err := ro.Write()
require.NoError(t, err)
require.Len(t, m.Metrics(), 4)
var expected, actual []string
for i, exp := range first5[1:] {
expected = append(expected, exp.String())
actual = append(actual, m.Metrics()[i].String())
}
sort.Strings(expected)
sort.Strings(actual)
assert.Equal(t, expected, actual)
}
// Test that multiple buffer overflows are handled properly.
func TestRunningOutputMultiOverwrite(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
IsActive: false,
},
}
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro.MetricBufferLimit = 3
for _, metric := range first5 {
ro.AddMetric(metric)
}
for _, metric := range next5 {
ro.AddMetric(metric)
}
require.Len(t, m.Metrics(), 0)
err := ro.Write()
require.NoError(t, err)
require.Len(t, m.Metrics(), 3)
var expected, actual []string
for i, exp := range next5[2:] {
expected = append(expected, exp.String())
actual = append(actual, m.Metrics()[i].String())
}
sort.Strings(expected)
sort.Strings(actual)
assert.Equal(t, expected, actual)
}
// Test that running output doesn't flush until it's full when
// FlushBufferWhenFull is set.
func TestRunningOutputFlushWhenFull(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
IsActive: false,
},
}
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro.FlushBufferWhenFull = true
ro.MetricBufferLimit = 5
// Fill buffer to limit
for _, metric := range first5 {
ro.AddMetric(metric)
}
// no flush yet
assert.Len(t, m.Metrics(), 0)
// add one more metric
ro.AddMetric(next5[0])
// now it flushed
assert.Len(t, m.Metrics(), 6)
// add one more metric and write it manually
ro.AddMetric(next5[1])
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 7)
}
// Test that running output doesn't flush until it's full when
// FlushBufferWhenFull is set, twice.
func TestRunningOutputMultiFlushWhenFull(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
IsActive: false,
},
}
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro.FlushBufferWhenFull = true
ro.MetricBufferLimit = 4
// Fill buffer past limit twive
for _, metric := range first5 {
ro.AddMetric(metric)
}
for _, metric := range next5 {
ro.AddMetric(metric)
}
// flushed twice
assert.Len(t, m.Metrics(), 10)
}
func TestRunningOutputWriteFail(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
IsActive: false,
},
}
m := &mockOutput{}
m.failWrite = true
ro := NewRunningOutput("test", m, conf)
ro.FlushBufferWhenFull = true
ro.MetricBufferLimit = 4
// Fill buffer past limit twice
for _, metric := range first5 {
ro.AddMetric(metric)
}
for _, metric := range next5 {
ro.AddMetric(metric)
}
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
// manual write fails
err := ro.Write()
require.Error(t, err)
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
m.failWrite = false
err = ro.Write()
require.NoError(t, err)
assert.Len(t, m.Metrics(), 10)
}
type mockOutput struct {
sync.Mutex
metrics []telegraf.Metric
// if true, mock a write failure
failWrite bool
}
func (m *mockOutput) Connect() error {
return nil
}
func (m *mockOutput) Close() error {
return nil
}
func (m *mockOutput) Description() string {
return ""
}
func (m *mockOutput) SampleConfig() string {
return ""
}
func (m *mockOutput) Write(metrics []telegraf.Metric) error {
m.Lock()
defer m.Unlock()
if m.failWrite {
return fmt.Errorf("Failed Write!")
}
if m.metrics == nil {
m.metrics = []telegraf.Metric{}
}
for _, metric := range metrics {
m.metrics = append(m.metrics, metric)
}
return nil
}
func (m *mockOutput) Metrics() []telegraf.Metric {
m.Lock()
defer m.Unlock()
return m.metrics
}

View File

@ -104,9 +104,9 @@ type Aerospike struct {
}
var sampleConfig = `
### Aerospike servers to connect to (with port)
### This plugin will query all namespaces the aerospike
### server has configured and get stats for them.
## Aerospike servers to connect to (with port)
## This plugin will query all namespaces the aerospike
## server has configured and get stats for them.
servers = ["localhost:3000"]
`

View File

@ -20,6 +20,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/lustre2"
_ "github.com/influxdata/telegraf/plugins/inputs/mailchimp"
_ "github.com/influxdata/telegraf/plugins/inputs/memcached"
_ "github.com/influxdata/telegraf/plugins/inputs/mesos"
_ "github.com/influxdata/telegraf/plugins/inputs/mongodb"
_ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/mysql"

View File

@ -20,7 +20,7 @@ type Apache struct {
}
var sampleConfig = `
### An array of Apache status URI to gather stats.
## An array of Apache status URI to gather stats.
urls = ["http://localhost/server-status?auto"]
`

View File

@ -18,13 +18,13 @@ type Bcache struct {
}
var sampleConfig = `
### Bcache sets path
### If not specified, then default is:
## Bcache sets path
## If not specified, then default is:
bcachePath = "/sys/fs/bcache"
### By default, telegraf gather stats for all bcache devices
### Setting devices will restrict the stats to the specified
### bcache devices.
## By default, telegraf gather stats for all bcache devices
## Setting devices will restrict the stats to the specified
## bcache devices.
bcacheDevs = ["bcache0"]
`

View File

@ -75,8 +75,8 @@ func (*CouchDB) Description() string {
func (*CouchDB) SampleConfig() string {
return `
### Works with CouchDB stats endpoints out of the box
### Multiple HOSTs from which to read CouchDB stats:
## Works with CouchDB stats endpoints out of the box
## Multiple HOSTs from which to read CouchDB stats:
hosts = ["http://localhost:8086/_stats"]
`
}

View File

@ -22,11 +22,11 @@ type Disque struct {
}
var sampleConfig = `
### An array of URI to gather stats about. Specify an ip or hostname
### with optional port and password. ie disque://localhost, disque://10.10.3.33:18832,
### 10.0.0.1:10000, etc.
## An array of URI to gather stats about. Specify an ip or hostname
## with optional port and password. ie disque://localhost, disque://10.10.3.33:18832,
## 10.0.0.1:10000, etc.
### If no servers are specified, then localhost is used as the host.
## If no servers are specified, then localhost is used as the host.
servers = ["localhost"]
`

View File

@ -21,11 +21,11 @@ type Docker struct {
}
var sampleConfig = `
### Docker Endpoint
### To use TCP, set endpoint = "tcp://[ip]:[port]"
### To use environment variables (ie, docker-machine), set endpoint = "ENV"
## Docker Endpoint
## To use TCP, set endpoint = "tcp://[ip]:[port]"
## To use environment variables (ie, docker-machine), set endpoint = "ENV"
endpoint = "unix:///var/run/docker.sock"
### Only collect metrics for these containers, collect all if empty
## Only collect metrics for these containers, collect all if empty
container_names = []
`

View File

@ -24,13 +24,13 @@ func (d *Dovecot) Description() string {
}
var sampleConfig = `
### specify dovecot servers via an address:port list
### e.g.
### localhost:24242
###
### If no servers are specified, then localhost is used as the host.
## specify dovecot servers via an address:port list
## e.g.
## localhost:24242
##
## If no servers are specified, then localhost is used as the host.
servers = ["localhost:24242"]
### Only collect metrics for these domains, collect all if empty
## Only collect metrics for these domains, collect all if empty
domains = []
`

View File

@ -59,14 +59,14 @@ type indexHealth struct {
}
const sampleConfig = `
### specify a list of one or more Elasticsearch servers
## specify a list of one or more Elasticsearch servers
servers = ["http://localhost:9200"]
### set local to false when you want to read the indices stats from all nodes
### within the cluster
## set local to false when you want to read the indices stats from all nodes
## within the cluster
local = true
### set cluster_health to true when you want to also obtain cluster level stats
## set cluster_health to true when you want to also obtain cluster level stats
cluster_health = false
`

View File

@ -37,19 +37,19 @@ and strings will be ignored.
# measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
### Below configuration will be used for data_format = "graphite", can be ignored for other data_format
### If matching multiple measurement files, this string will be used to join the matched values.
## Below configuration will be used for data_format = "graphite", can be ignored for other data_format
## If matching multiple measurement files, this string will be used to join the matched values.
#separator = "."
### Each template line requires a template pattern. It can have an optional
### filter before the template and separated by spaces. It can also have optional extra
### tags following the template. Multiple tags should be separated by commas and no spaces
### similar to the line protocol format. The can be only one default template.
### Templates support below format:
### 1. filter + template
### 2. filter + template + extra tag
### 3. filter + template with field key
### 4. default template
## Each template line requires a template pattern. It can have an optional
## filter before the template and separated by spaces. It can also have optional extra
## tags following the template. Multiple tags should be separated by commas and no spaces
## similar to the line protocol format. The can be only one default template.
## Templates support below format:
## 1. filter + template
## 2. filter + template + extra tag
## 3. filter + template with field key
## 4. default template
#templates = [
# "*.app env.service.resource.measurement",
# "stats.* .host.measurement* region=us-west,agent=sensu",
@ -141,19 +141,19 @@ We can also change the data_format to "graphite" to use the metrics collecting s
# measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
### Below configuration will be used for data_format = "graphite", can be ignored for other data_format
### If matching multiple measurement files, this string will be used to join the matched values.
## Below configuration will be used for data_format = "graphite", can be ignored for other data_format
## If matching multiple measurement files, this string will be used to join the matched values.
separator = "."
### Each template line requires a template pattern. It can have an optional
### filter before the template and separated by spaces. It can also have optional extra
### tags following the template. Multiple tags should be separated by commas and no spaces
### similar to the line protocol format. The can be only one default template.
### Templates support below format:
### 1. filter + template
### 2. filter + template + extra tag
### 3. filter + template with field key
### 4. default template
## Each template line requires a template pattern. It can have an optional
## filter before the template and separated by spaces. It can also have optional extra
## tags following the template. Multiple tags should be separated by commas and no spaces
## similar to the line protocol format. The can be only one default template.
## Templates support below format:
## 1. filter + template
## 2. filter + template + extra tag
## 3. filter + template with field key
## 4. default template
templates = [
"*.app env.service.resource.measurement",
"stats.* .host.measurement* region=us-west,agent=sensu",

View File

@ -14,16 +14,16 @@ import (
)
const sampleConfig = `
### Commands array
## Commands array
commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
### measurement name suffix (for separating different commands)
## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`

View File

@ -31,7 +31,7 @@ func NewGithubWebhooks() *GithubWebhooks {
func (gh *GithubWebhooks) SampleConfig() string {
return `
### Address and port to host Webhook listener on
## Address and port to host Webhook listener on
service_address = ":1618"
`
}
@ -61,7 +61,7 @@ func (gh *GithubWebhooks) Listen() {
}
}
func (gh *GithubWebhooks) Start() error {
func (gh *GithubWebhooks) Start(_ telegraf.Accumulator) error {
go gh.Listen()
log.Printf("Started the github_webhooks service on %s\n", gh.ServiceAddress)
return nil

View File

@ -86,13 +86,13 @@ type haproxy struct {
}
var sampleConfig = `
### An array of address to gather stats about. Specify an ip on hostname
### with optional port. ie localhost, 10.10.3.33:1936, etc.
## An array of address to gather stats about. Specify an ip on hostname
## with optional port. ie localhost, 10.10.3.33:1936, etc.
### If no servers are specified, then default to 127.0.0.1:1936
## If no servers are specified, then default to 127.0.0.1:1936
servers = ["http://myhaproxy.com:1936", "http://anotherhaproxy.com:1936"]
### Or you can also use local socket(not work yet)
### servers = ["socket://run/haproxy/admin.sock"]
## Or you can also use local socket(not work yet)
## servers = ["socket://run/haproxy/admin.sock"]
`
func (r *haproxy) SampleConfig() string {

View File

@ -1,6 +1,7 @@
package httpjson
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
@ -46,33 +47,33 @@ func (c RealHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) {
}
var sampleConfig = `
### NOTE This plugin only reads numerical measurements, strings and booleans
### will be ignored.
## NOTE This plugin only reads numerical measurements, strings and booleans
## will be ignored.
### a name for the service being polled
## a name for the service being polled
name = "webserver_stats"
### URL of each server in the service's cluster
## URL of each server in the service's cluster
servers = [
"http://localhost:9999/stats/",
"http://localhost:9998/stats/",
]
### HTTP method to use (case-sensitive)
## HTTP method to use: GET or POST (case-sensitive)
method = "GET"
### List of tag names to extract from top-level of JSON server response
## List of tag names to extract from top-level of JSON server response
# tag_keys = [
# "my_tag_1",
# "my_tag_2"
# ]
### HTTP parameters (all values must be strings)
## HTTP parameters (all values must be strings)
[inputs.httpjson.parameters]
event_type = "cpu_spike"
threshold = "0.75"
### HTTP Header parameters (all values must be strings)
## HTTP Header parameters (all values must be strings)
# [inputs.httpjson.headers]
# X-Auth-Token = "my-xauth-token"
# apiVersion = "v1"
@ -166,7 +167,8 @@ func (h *HttpJson) gatherServer(
return nil
}
// Sends an HTTP request to the server using the HttpJson object's HTTPClient
// Sends an HTTP request to the server using the HttpJson object's HTTPClient.
// This request can be either a GET or a POST.
// Parameters:
// serverURL: endpoint to send request to
//
@ -181,13 +183,24 @@ func (h *HttpJson) sendRequest(serverURL string) (string, float64, error) {
}
params := url.Values{}
for k, v := range h.Parameters {
params.Add(k, v)
data := url.Values{}
switch {
case h.Method == "GET":
requestURL.RawQuery = params.Encode()
for k, v := range h.Parameters {
params.Add(k, v)
}
case h.Method == "POST":
requestURL.RawQuery = ""
for k, v := range h.Parameters {
data.Add(k, v)
}
}
requestURL.RawQuery = params.Encode()
// Create + send request
req, err := http.NewRequest(h.Method, requestURL.String(), nil)
req, err := http.NewRequest(h.Method, requestURL.String(), bytes.NewBufferString(data.Encode()))
if err != nil {
return "", -1, err
}

View File

@ -22,11 +22,11 @@ func (*InfluxDB) Description() string {
func (*InfluxDB) SampleConfig() string {
return `
### Works with InfluxDB debug endpoints out of the box,
### but other services can use this format too.
### See the influxdb plugin's README for more details.
## Works with InfluxDB debug endpoints out of the box,
## but other services can use this format too.
## See the influxdb plugin's README for more details.
### Multiple URLs from which to read InfluxDB-formatted JSON
## Multiple URLs from which to read InfluxDB-formatted JSON
urls = [
"http://localhost:8086/debug/vars"
]

View File

@ -46,10 +46,10 @@ type Jolokia struct {
func (j *Jolokia) SampleConfig() string {
return `
### This is the context root used to compose the jolokia url
## This is the context root used to compose the jolokia url
context = "/jolokia/read"
### List of servers exposing jolokia read service
## List of servers exposing jolokia read service
[[inputs.jolokia.servers]]
name = "stable"
host = "192.168.103.2"
@ -57,10 +57,10 @@ func (j *Jolokia) SampleConfig() string {
# username = "myuser"
# password = "mypassword"
### List of metrics collected on above servers
### Each metric consists in a name, a jmx path and either
### a pass or drop slice attribute.
### This collect all heap memory usage metrics.
## List of metrics collected on above servers
## Each metric consists in a name, a jmx path and either
## a pass or drop slice attribute.
## This collect all heap memory usage metrics.
[[inputs.jolokia.metrics]]
name = "heap_memory_usage"
jmx = "/java.lang:type=Memory/HeapMemoryUsage"

View File

@ -11,21 +11,21 @@ from the same topic in parallel.
```toml
# Read metrics from Kafka topic(s)
[[inputs.kafka_consumer]]
### topic(s) to consume
## topic(s) to consume
topics = ["telegraf"]
### an array of Zookeeper connection strings
## an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"]
### the name of the consumer group
## the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
### Maximum number of metrics to buffer between collection intervals
## Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### Offset (must be either "oldest" or "newest")
## Offset (must be either "oldest" or "newest")
offset = "oldest"
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```

View File

@ -1,7 +1,6 @@
package kafka_consumer
import (
"fmt"
"log"
"strings"
"sync"
@ -19,11 +18,13 @@ type Kafka struct {
Topics []string
ZookeeperPeers []string
Consumer *consumergroup.ConsumerGroup
MetricBuffer int
// Legacy metric buffer support
MetricBuffer int
// TODO remove PointBuffer, legacy support
PointBuffer int
Offset string
Offset string
parser parsers.Parser
sync.Mutex
@ -32,9 +33,10 @@ type Kafka struct {
in <-chan *sarama.ConsumerMessage
// channel for all kafka consumer errors
errs <-chan *sarama.ConsumerError
// channel for all incoming parsed kafka metrics
metricC chan telegraf.Metric
done chan struct{}
done chan struct{}
// keep the accumulator internally:
acc telegraf.Accumulator
// doNotCommitMsgs tells the parser not to call CommitUpTo on the consumer
// this is mostly for test purposes, but there may be a use-case for it later.
@ -42,21 +44,19 @@ type Kafka struct {
}
var sampleConfig = `
### topic(s) to consume
## topic(s) to consume
topics = ["telegraf"]
### an array of Zookeeper connection strings
## an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"]
### the name of the consumer group
## the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
### Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### Offset (must be either "oldest" or "newest")
## Offset (must be either "oldest" or "newest")
offset = "oldest"
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`
@ -72,11 +72,13 @@ func (k *Kafka) SetParser(parser parsers.Parser) {
k.parser = parser
}
func (k *Kafka) Start() error {
func (k *Kafka) Start(acc telegraf.Accumulator) error {
k.Lock()
defer k.Unlock()
var consumerErr error
k.acc = acc
config := consumergroup.NewConfig()
switch strings.ToLower(k.Offset) {
case "oldest", "":
@ -106,13 +108,6 @@ func (k *Kafka) Start() error {
}
k.done = make(chan struct{})
if k.PointBuffer == 0 && k.MetricBuffer == 0 {
k.MetricBuffer = 100000
} else if k.PointBuffer > 0 {
// Legacy support of PointBuffer field TODO remove
k.MetricBuffer = k.PointBuffer
}
k.metricC = make(chan telegraf.Metric, k.MetricBuffer)
// Start the kafka message reader
go k.receiver()
@ -138,14 +133,7 @@ func (k *Kafka) receiver() {
}
for _, metric := range metrics {
fmt.Println(string(metric.Name()))
select {
case k.metricC <- metric:
continue
default:
log.Printf("Kafka Consumer buffer is full, dropping a metric." +
" You may want to increase the metric_buffer setting")
}
k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
if !k.doNotCommitMsgs {
@ -169,13 +157,6 @@ func (k *Kafka) Stop() {
}
func (k *Kafka) Gather(acc telegraf.Accumulator) error {
k.Lock()
defer k.Unlock()
nmetrics := len(k.metricC)
for i := 0; i < nmetrics; i++ {
metric := <-k.metricC
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
return nil
}

View File

@ -44,18 +44,19 @@ func TestReadsMetricsFromKafka(t *testing.T) {
}
p, _ := parsers.NewInfluxParser()
k.SetParser(p)
if err := k.Start(); err != nil {
// Verify that we can now gather the sent message
var acc testutil.Accumulator
// Sanity check
assert.Equal(t, 0, len(acc.Metrics), "There should not be any points")
if err := k.Start(&acc); err != nil {
t.Fatal(err.Error())
} else {
defer k.Stop()
}
waitForPoint(k, t)
// Verify that we can now gather the sent message
var acc testutil.Accumulator
// Sanity check
assert.Equal(t, 0, len(acc.Metrics), "There should not be any points")
waitForPoint(&acc, t)
// Gather points
err = k.Gather(&acc)
@ -77,7 +78,7 @@ func TestReadsMetricsFromKafka(t *testing.T) {
// Waits for the metric that was sent to the kafka broker to arrive at the kafka
// consumer
func waitForPoint(k *Kafka, t *testing.T) {
func waitForPoint(acc *testutil.Accumulator, t *testing.T) {
// Give the kafka container up to 2 seconds to get the point to the consumer
ticker := time.NewTicker(5 * time.Millisecond)
counter := 0
@ -87,7 +88,7 @@ func waitForPoint(k *Kafka, t *testing.T) {
counter++
if counter > 1000 {
t.Fatal("Waited for 5s, point never arrived to consumer")
} else if len(k.metricC) == 1 {
} else if acc.NFields() == 1 {
return
}
}

View File

@ -4,7 +4,6 @@ import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
@ -17,29 +16,28 @@ const (
testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
invalidMsg = "cpu_load_short,host=server01 1422568543702900257"
pointBuffer = 5
)
func NewTestKafka() (*Kafka, chan *sarama.ConsumerMessage) {
in := make(chan *sarama.ConsumerMessage, pointBuffer)
func newTestKafka() (*Kafka, chan *sarama.ConsumerMessage) {
in := make(chan *sarama.ConsumerMessage, 1000)
k := Kafka{
ConsumerGroup: "test",
Topics: []string{"telegraf"},
ZookeeperPeers: []string{"localhost:2181"},
PointBuffer: pointBuffer,
Offset: "oldest",
in: in,
doNotCommitMsgs: true,
errs: make(chan *sarama.ConsumerError, pointBuffer),
errs: make(chan *sarama.ConsumerError, 1000),
done: make(chan struct{}),
metricC: make(chan telegraf.Metric, pointBuffer),
}
return &k, in
}
// Test that the parser parses kafka messages into points
func TestRunParser(t *testing.T) {
k, in := NewTestKafka()
k, in := newTestKafka()
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewInfluxParser()
@ -47,12 +45,14 @@ func TestRunParser(t *testing.T) {
in <- saramaMsg(testMsg)
time.Sleep(time.Millisecond)
assert.Equal(t, len(k.metricC), 1)
assert.Equal(t, acc.NFields(), 1)
}
// Test that the parser ignores invalid messages
func TestRunParserInvalidMsg(t *testing.T) {
k, in := NewTestKafka()
k, in := newTestKafka()
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewInfluxParser()
@ -60,27 +60,14 @@ func TestRunParserInvalidMsg(t *testing.T) {
in <- saramaMsg(invalidMsg)
time.Sleep(time.Millisecond)
assert.Equal(t, len(k.metricC), 0)
}
// Test that points are dropped when we hit the buffer limit
func TestRunParserRespectsBuffer(t *testing.T) {
k, in := NewTestKafka()
defer close(k.done)
k.parser, _ = parsers.NewInfluxParser()
go k.receiver()
for i := 0; i < pointBuffer+1; i++ {
in <- saramaMsg(testMsg)
}
time.Sleep(time.Millisecond)
assert.Equal(t, len(k.metricC), 5)
assert.Equal(t, acc.NFields(), 0)
}
// Test that the parser parses kafka messages into points
func TestRunParserAndGather(t *testing.T) {
k, in := NewTestKafka()
k, in := newTestKafka()
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewInfluxParser()
@ -88,17 +75,18 @@ func TestRunParserAndGather(t *testing.T) {
in <- saramaMsg(testMsg)
time.Sleep(time.Millisecond)
acc := testutil.Accumulator{}
k.Gather(&acc)
assert.Equal(t, len(acc.Metrics), 1)
assert.Equal(t, acc.NFields(), 1)
acc.AssertContainsFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(23422)})
}
// Test that the parser parses kafka messages into points
func TestRunParserAndGatherGraphite(t *testing.T) {
k, in := NewTestKafka()
k, in := newTestKafka()
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
@ -106,17 +94,18 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
in <- saramaMsg(testMsgGraphite)
time.Sleep(time.Millisecond)
acc := testutil.Accumulator{}
k.Gather(&acc)
assert.Equal(t, len(acc.Metrics), 1)
assert.Equal(t, acc.NFields(), 1)
acc.AssertContainsFields(t, "cpu_load_short_graphite",
map[string]interface{}{"value": float64(23422)})
}
// Test that the parser parses kafka messages into points
func TestRunParserAndGatherJSON(t *testing.T) {
k, in := NewTestKafka()
k, in := newTestKafka()
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
@ -124,10 +113,9 @@ func TestRunParserAndGatherJSON(t *testing.T) {
in <- saramaMsg(testMsgJSON)
time.Sleep(time.Millisecond)
acc := testutil.Accumulator{}
k.Gather(&acc)
assert.Equal(t, len(acc.Metrics), 1)
assert.Equal(t, acc.NFields(), 2)
acc.AssertContainsFields(t, "kafka_json_test",
map[string]interface{}{
"a": float64(5),

View File

@ -132,8 +132,8 @@ var serverTypeMapping = map[string]ServerType{
}
var sampleConfig = `
### An array of URI to gather stats about LeoFS.
### Specify an ip or hostname with port. ie 127.0.0.1:4020
## An array of URI to gather stats about LeoFS.
## Specify an ip or hostname with port. ie 127.0.0.1:4020
servers = ["127.0.0.1:4021"]
`

View File

@ -29,9 +29,9 @@ type Lustre2 struct {
}
var sampleConfig = `
### An array of /proc globs to search for Lustre stats
### If not specified, the default will work on Lustre 2.5.x
###
## An array of /proc globs to search for Lustre stats
## If not specified, the default will work on Lustre 2.5.x
##
# ost_procfiles = [
# "/proc/fs/lustre/obdfilter/*/stats",
# "/proc/fs/lustre/osd-ldiskfs/*/stats"

View File

@ -17,13 +17,13 @@ type MailChimp struct {
}
var sampleConfig = `
### MailChimp API key
### get from https://admin.mailchimp.com/account/api/
## MailChimp API key
## get from https://admin.mailchimp.com/account/api/
api_key = "" # required
### Reports for campaigns sent more than days_old ago will not be collected.
### 0 means collect all.
## Reports for campaigns sent more than days_old ago will not be collected.
## 0 means collect all.
days_old = 0
### Campaign ID to get, if empty gets all campaigns, this option overrides days_old
## Campaign ID to get, if empty gets all campaigns, this option overrides days_old
# campaign_id = ""
`

View File

@ -19,8 +19,8 @@ type Memcached struct {
}
var sampleConfig = `
### An array of address to gather stats about. Specify an ip on hostname
### with optional port. ie localhost, 10.0.0.1:11211, etc.
## An array of address to gather stats about. Specify an ip on hostname
## with optional port. ie localhost, 10.0.0.1:11211, etc.
servers = ["localhost:11211"]
# unix_sockets = ["/var/run/memcached.sock"]
`

View File

@ -0,0 +1,165 @@
# Mesos Input Plugin
This input plugin gathers metrics from Mesos (*currently only Mesos masters*).
For more information, please check the [Mesos Observability Metrics](http://mesos.apache.org/documentation/latest/monitoring/) page.
### Configuration:
```toml
# Telegraf plugin for gathering metrics from N Mesos masters
[[inputs.mesos]]
# Timeout, in ms.
timeout = 100
# A list of Mesos masters, default value is localhost:5050.
masters = ["localhost:5050"]
# Metrics groups to be collected, by default, all enabled.
master_collections = ["resources","master","system","slaves","frameworks","messages","evqueue","registrar"]
```
### Measurements & Fields:
Mesos master metric groups
- resources
- master/cpus_percent
- master/cpus_used
- master/cpus_total
- master/cpus_revocable_percent
- master/cpus_revocable_total
- master/cpus_revocable_used
- master/disk_percent
- master/disk_used
- master/disk_total
- master/disk_revocable_percent
- master/disk_revocable_total
- master/disk_revocable_used
- master/mem_percent
- master/mem_used
- master/mem_total
- master/mem_revocable_percent
- master/mem_revocable_total
- master/mem_revocable_used
- master
- master/elected
- master/uptime_secs
- system
- system/cpus_total
- system/load_15min
- system/load_5min
- system/load_1min
- system/mem_free_bytes
- system/mem_total_bytes
- slaves
- master/slave_registrations
- master/slave_removals
- master/slave_reregistrations
- master/slave_shutdowns_scheduled
- master/slave_shutdowns_canceled
- master/slave_shutdowns_completed
- master/slaves_active
- master/slaves_connected
- master/slaves_disconnected
- master/slaves_inactive
- frameworks
- master/frameworks_active
- master/frameworks_connected
- master/frameworks_disconnected
- master/frameworks_inactive
- master/outstanding_offers
- tasks
- master/tasks_error
- master/tasks_failed
- master/tasks_finished
- master/tasks_killed
- master/tasks_lost
- master/tasks_running
- master/tasks_staging
- master/tasks_starting
- messages
- master/invalid_executor_to_framework_messages
- master/invalid_framework_to_executor_messages
- master/invalid_status_update_acknowledgements
- master/invalid_status_updates
- master/dropped_messages
- master/messages_authenticate
- master/messages_deactivate_framework
- master/messages_decline_offers
- master/messages_executor_to_framework
- master/messages_exited_executor
- master/messages_framework_to_executor
- master/messages_kill_task
- master/messages_launch_tasks
- master/messages_reconcile_tasks
- master/messages_register_framework
- master/messages_register_slave
- master/messages_reregister_framework
- master/messages_reregister_slave
- master/messages_resource_request
- master/messages_revive_offers
- master/messages_status_update
- master/messages_status_update_acknowledgement
- master/messages_unregister_framework
- master/messages_unregister_slave
- master/messages_update_slave
- master/recovery_slave_removals
- master/slave_removals/reason_registered
- master/slave_removals/reason_unhealthy
- master/slave_removals/reason_unregistered
- master/valid_framework_to_executor_messages
- master/valid_status_update_acknowledgements
- master/valid_status_updates
- master/task_lost/source_master/reason_invalid_offers
- master/task_lost/source_master/reason_slave_removed
- master/task_lost/source_slave/reason_executor_terminated
- master/valid_executor_to_framework_messages
- evqueue
- master/event_queue_dispatches
- master/event_queue_http_requests
- master/event_queue_messages
- registrar
- registrar/state_fetch_ms
- registrar/state_store_ms
- registrar/state_store_ms/max
- registrar/state_store_ms/min
- registrar/state_store_ms/p50
- registrar/state_store_ms/p90
- registrar/state_store_ms/p95
- registrar/state_store_ms/p99
- registrar/state_store_ms/p999
- registrar/state_store_ms/p9999
### Tags:
- All measurements have the following tags:
- server
### Example Output:
```
$ telegraf -config ~/mesos.conf -input-filter mesos -test
* Plugin: mesos, Collection 1
mesos,server=172.17.8.101 allocator/event_queue_dispatches=0,master/cpus_percent=0,
master/cpus_revocable_percent=0,master/cpus_revocable_total=0,
master/cpus_revocable_used=0,master/cpus_total=2,
master/cpus_used=0,master/disk_percent=0,master/disk_revocable_percent=0,
master/disk_revocable_total=0,master/disk_revocable_used=0,master/disk_total=10823,
master/disk_used=0,master/dropped_messages=2,master/elected=1,
master/event_queue_dispatches=10,master/event_queue_http_requests=0,
master/event_queue_messages=0,master/frameworks_active=2,master/frameworks_connected=2,
master/frameworks_disconnected=0,master/frameworks_inactive=0,
master/invalid_executor_to_framework_messages=0,
master/invalid_framework_to_executor_messages=0,
master/invalid_status_update_acknowledgements=0,master/invalid_status_updates=0,master/mem_percent=0,
master/mem_revocable_percent=0,master/mem_revocable_total=0,
master/mem_revocable_used=0,master/mem_total=1002,
master/mem_used=0,master/messages_authenticate=0,
master/messages_deactivate_framework=0 ...
```

View File

@ -0,0 +1,320 @@
package mesos
import (
"encoding/json"
"errors"
"io/ioutil"
"log"
"net"
"net/http"
"strconv"
"strings"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)
type Mesos struct {
Timeout int
Masters []string
MasterCols []string `toml:"master_collections"`
}
var defaultMetrics = []string{
"resources", "master", "system", "slaves", "frameworks",
"tasks", "messages", "evqueue", "messages", "registrar",
}
var sampleConfig = `
# Timeout, in ms.
timeout = 100
# A list of Mesos masters, default value is localhost:5050.
masters = ["localhost:5050"]
# Metrics groups to be collected, by default, all enabled.
master_collections = ["resources","master","system","slaves","frameworks","messages","evqueue","registrar"]
`
// SampleConfig returns a sample configuration block
func (m *Mesos) SampleConfig() string {
return sampleConfig
}
// Description just returns a short description of the Mesos plugin
func (m *Mesos) Description() string {
return "Telegraf plugin for gathering metrics from N Mesos masters"
}
// Gather() metrics from given list of Mesos Masters
func (m *Mesos) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
var errorChannel chan error
if len(m.Masters) == 0 {
m.Masters = []string{"localhost:5050"}
}
errorChannel = make(chan error, len(m.Masters)*2)
for _, v := range m.Masters {
wg.Add(1)
go func(c string) {
errorChannel <- m.gatherMetrics(c, acc)
wg.Done()
return
}(v)
}
wg.Wait()
close(errorChannel)
errorStrings := []string{}
// Gather all errors for returning them at once
for err := range errorChannel {
if err != nil {
errorStrings = append(errorStrings, err.Error())
}
}
if len(errorStrings) > 0 {
return errors.New(strings.Join(errorStrings, "\n"))
}
return nil
}
// metricsDiff() returns set names for removal
func metricsDiff(w []string) []string {
b := []string{}
s := make(map[string]bool)
if len(w) == 0 {
return b
}
for _, v := range w {
s[v] = true
}
for _, d := range defaultMetrics {
if _, ok := s[d]; !ok {
b = append(b, d)
}
}
return b
}
// masterBlocks serves as kind of metrics registry groupping them in sets
func masterBlocks(g string) []string {
var m map[string][]string
m = make(map[string][]string)
m["resources"] = []string{
"master/cpus_percent",
"master/cpus_used",
"master/cpus_total",
"master/cpus_revocable_percent",
"master/cpus_revocable_total",
"master/cpus_revocable_used",
"master/disk_percent",
"master/disk_used",
"master/disk_total",
"master/disk_revocable_percent",
"master/disk_revocable_total",
"master/disk_revocable_used",
"master/mem_percent",
"master/mem_used",
"master/mem_total",
"master/mem_revocable_percent",
"master/mem_revocable_total",
"master/mem_revocable_used",
}
m["master"] = []string{
"master/elected",
"master/uptime_secs",
}
m["system"] = []string{
"system/cpus_total",
"system/load_15min",
"system/load_5min",
"system/load_1min",
"system/mem_free_bytes",
"system/mem_total_bytes",
}
m["slaves"] = []string{
"master/slave_registrations",
"master/slave_removals",
"master/slave_reregistrations",
"master/slave_shutdowns_scheduled",
"master/slave_shutdowns_canceled",
"master/slave_shutdowns_completed",
"master/slaves_active",
"master/slaves_connected",
"master/slaves_disconnected",
"master/slaves_inactive",
}
m["frameworks"] = []string{
"master/frameworks_active",
"master/frameworks_connected",
"master/frameworks_disconnected",
"master/frameworks_inactive",
"master/outstanding_offers",
}
m["tasks"] = []string{
"master/tasks_error",
"master/tasks_failed",
"master/tasks_finished",
"master/tasks_killed",
"master/tasks_lost",
"master/tasks_running",
"master/tasks_staging",
"master/tasks_starting",
}
m["messages"] = []string{
"master/invalid_executor_to_framework_messages",
"master/invalid_framework_to_executor_messages",
"master/invalid_status_update_acknowledgements",
"master/invalid_status_updates",
"master/dropped_messages",
"master/messages_authenticate",
"master/messages_deactivate_framework",
"master/messages_decline_offers",
"master/messages_executor_to_framework",
"master/messages_exited_executor",
"master/messages_framework_to_executor",
"master/messages_kill_task",
"master/messages_launch_tasks",
"master/messages_reconcile_tasks",
"master/messages_register_framework",
"master/messages_register_slave",
"master/messages_reregister_framework",
"master/messages_reregister_slave",
"master/messages_resource_request",
"master/messages_revive_offers",
"master/messages_status_update",
"master/messages_status_update_acknowledgement",
"master/messages_unregister_framework",
"master/messages_unregister_slave",
"master/messages_update_slave",
"master/recovery_slave_removals",
"master/slave_removals/reason_registered",
"master/slave_removals/reason_unhealthy",
"master/slave_removals/reason_unregistered",
"master/valid_framework_to_executor_messages",
"master/valid_status_update_acknowledgements",
"master/valid_status_updates",
"master/task_lost/source_master/reason_invalid_offers",
"master/task_lost/source_master/reason_slave_removed",
"master/task_lost/source_slave/reason_executor_terminated",
"master/valid_executor_to_framework_messages",
}
m["evqueue"] = []string{
"master/event_queue_dispatches",
"master/event_queue_http_requests",
"master/event_queue_messages",
}
m["registrar"] = []string{
"registrar/state_fetch_ms",
"registrar/state_store_ms",
"registrar/state_store_ms/max",
"registrar/state_store_ms/min",
"registrar/state_store_ms/p50",
"registrar/state_store_ms/p90",
"registrar/state_store_ms/p95",
"registrar/state_store_ms/p99",
"registrar/state_store_ms/p999",
"registrar/state_store_ms/p9999",
}
ret, ok := m[g]
if !ok {
log.Println("[mesos] Unkown metrics group: ", g)
return []string{}
}
return ret
}
// removeGroup(), remove unwanted sets
func (m *Mesos) removeGroup(j *map[string]interface{}) {
var ok bool
b := metricsDiff(m.MasterCols)
for _, k := range b {
for _, v := range masterBlocks(k) {
if _, ok = (*j)[v]; ok {
delete((*j), v)
}
}
}
}
// This should not belong to the object
func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error {
var jsonOut map[string]interface{}
host, _, err := net.SplitHostPort(a)
if err != nil {
host = a
a = a + ":5050"
}
tags := map[string]string{
"server": host,
}
if m.Timeout == 0 {
log.Println("[mesos] Missing timeout value, setting default value (100ms)")
m.Timeout = 100
}
ts := strconv.Itoa(m.Timeout) + "ms"
resp, err := http.Get("http://" + a + "/metrics/snapshot?timeout=" + ts)
if err != nil {
return err
}
data, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return err
}
if err = json.Unmarshal([]byte(data), &jsonOut); err != nil {
return errors.New("Error decoding JSON response")
}
m.removeGroup(&jsonOut)
jf := jsonparser.JSONFlattener{}
err = jf.FlattenJSON("", jsonOut)
if err != nil {
return err
}
acc.AddFields("mesos", jf.Fields, tags)
return nil
}
func init() {
inputs.Add("mesos", func() telegraf.Input {
return &Mesos{}
})
}

View File

@ -0,0 +1,118 @@
package mesos
import (
"encoding/json"
"math/rand"
"net/http"
"net/http/httptest"
"os"
"testing"
"github.com/influxdata/telegraf/testutil"
)
var mesosMetrics map[string]interface{}
var ts *httptest.Server
func generateMetrics() {
mesosMetrics = make(map[string]interface{})
metricNames := []string{"master/cpus_percent", "master/cpus_used", "master/cpus_total",
"master/cpus_revocable_percent", "master/cpus_revocable_total", "master/cpus_revocable_used",
"master/disk_percent", "master/disk_used", "master/disk_total", "master/disk_revocable_percent",
"master/disk_revocable_total", "master/disk_revocable_used", "master/mem_percent",
"master/mem_used", "master/mem_total", "master/mem_revocable_percent", "master/mem_revocable_total",
"master/mem_revocable_used", "master/elected", "master/uptime_secs", "system/cpus_total",
"system/load_15min", "system/load_5min", "system/load_1min", "system/mem_free_bytes",
"system/mem_total_bytes", "master/slave_registrations", "master/slave_removals",
"master/slave_reregistrations", "master/slave_shutdowns_scheduled", "master/slave_shutdowns_canceled",
"master/slave_shutdowns_completed", "master/slaves_active", "master/slaves_connected",
"master/slaves_disconnected", "master/slaves_inactive", "master/frameworks_active",
"master/frameworks_connected", "master/frameworks_disconnected", "master/frameworks_inactive",
"master/outstanding_offers", "master/tasks_error", "master/tasks_failed", "master/tasks_finished",
"master/tasks_killed", "master/tasks_lost", "master/tasks_running", "master/tasks_staging",
"master/tasks_starting", "master/invalid_executor_to_framework_messages", "master/invalid_framework_to_executor_messages",
"master/invalid_status_update_acknowledgements", "master/invalid_status_updates",
"master/dropped_messages", "master/messages_authenticate", "master/messages_deactivate_framework",
"master/messages_decline_offers", "master/messages_executor_to_framework", "master/messages_exited_executor",
"master/messages_framework_to_executor", "master/messages_kill_task", "master/messages_launch_tasks",
"master/messages_reconcile_tasks", "master/messages_register_framework", "master/messages_register_slave",
"master/messages_reregister_framework", "master/messages_reregister_slave", "master/messages_resource_request",
"master/messages_revive_offers", "master/messages_status_update", "master/messages_status_update_acknowledgement",
"master/messages_unregister_framework", "master/messages_unregister_slave", "master/messages_update_slave",
"master/recovery_slave_removals", "master/slave_removals/reason_registered", "master/slave_removals/reason_unhealthy",
"master/slave_removals/reason_unregistered", "master/valid_framework_to_executor_messages", "master/valid_status_update_acknowledgements",
"master/valid_status_updates", "master/task_lost/source_master/reason_invalid_offers",
"master/task_lost/source_master/reason_slave_removed", "master/task_lost/source_slave/reason_executor_terminated",
"master/valid_executor_to_framework_messages", "master/event_queue_dispatches",
"master/event_queue_http_requests", "master/event_queue_messages", "registrar/state_fetch_ms",
"registrar/state_store_ms", "registrar/state_store_ms/max", "registrar/state_store_ms/min",
"registrar/state_store_ms/p50", "registrar/state_store_ms/p90", "registrar/state_store_ms/p95",
"registrar/state_store_ms/p99", "registrar/state_store_ms/p999", "registrar/state_store_ms/p9999"}
for _, k := range metricNames {
mesosMetrics[k] = rand.Float64()
}
}
func TestMain(m *testing.M) {
generateMetrics()
r := http.NewServeMux()
r.HandleFunc("/metrics/snapshot", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(mesosMetrics)
})
ts = httptest.NewServer(r)
rc := m.Run()
ts.Close()
os.Exit(rc)
}
func TestMesosMaster(t *testing.T) {
var acc testutil.Accumulator
m := Mesos{
Masters: []string{ts.Listener.Addr().String()},
Timeout: 10,
}
err := m.Gather(&acc)
if err != nil {
t.Errorf(err.Error())
}
acc.AssertContainsFields(t, "mesos", mesosMetrics)
}
func TestRemoveGroup(t *testing.T) {
generateMetrics()
m := Mesos{
MasterCols: []string{
"resources", "master", "registrar",
},
}
b := []string{
"system", "slaves", "frameworks",
"messages", "evqueue",
}
m.removeGroup(&mesosMetrics)
for _, v := range b {
for _, x := range masterBlocks(v) {
if _, ok := mesosMetrics[x]; ok {
t.Errorf("Found key %s, it should be gone.", x)
}
}
}
for _, v := range m.MasterCols {
for _, x := range masterBlocks(v) {
if _, ok := mesosMetrics[x]; !ok {
t.Errorf("Didn't find key %s, it should present.", x)
}
}
}
}

View File

@ -26,11 +26,11 @@ type Ssl struct {
}
var sampleConfig = `
### An array of URI to gather stats about. Specify an ip or hostname
### with optional port add password. ie,
### mongodb://user:auth_key@10.10.3.30:27017,
### mongodb://10.10.3.33:18832,
### 10.0.0.1:10000, etc.
## An array of URI to gather stats about. Specify an ip or hostname
## with optional port add password. ie,
## mongodb://user:auth_key@10.10.3.30:27017,
## mongodb://10.10.3.33:18832,
## 10.0.0.1:10000, etc.
servers = ["127.0.0.1:27017"]
`

View File

@ -3,7 +3,7 @@
The [MQTT](http://mqtt.org/) consumer plugin reads from
specified MQTT topics and adds messages to InfluxDB.
The plugin expects messages in the
[Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md).
[Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).
### Configuration:
@ -11,34 +11,34 @@ The plugin expects messages in the
# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
servers = ["localhost:1883"]
### MQTT QoS, must be 0, 1, or 2
## MQTT QoS, must be 0, 1, or 2
qos = 0
### Topics to subscribe to
## Topics to subscribe to
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]
### Maximum number of metrics to buffer between collection intervals
## Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### username and password to connect MQTT server.
## username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
### Optional SSL Config
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
### Use SSL but skip chain & host verification
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```

View File

@ -15,15 +15,17 @@ import (
)
type MQTTConsumer struct {
Servers []string
Topics []string
Username string
Password string
MetricBuffer int
QoS int `toml:"qos"`
Servers []string
Topics []string
Username string
Password string
QoS int `toml:"qos"`
parser parsers.Parser
// Legacy metric buffer support
MetricBuffer int
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
@ -35,45 +37,41 @@ type MQTTConsumer struct {
sync.Mutex
client *mqtt.Client
// channel for all incoming parsed mqtt metrics
metricC chan telegraf.Metric
// channel for the topics of all incoming metrics (for tagging metrics)
topicC chan string
// channel of all incoming raw mqtt messages
in chan mqtt.Message
done chan struct{}
// keep the accumulator internally:
acc telegraf.Accumulator
}
var sampleConfig = `
servers = ["localhost:1883"]
### MQTT QoS, must be 0, 1, or 2
## MQTT QoS, must be 0, 1, or 2
qos = 0
### Topics to subscribe to
## Topics to subscribe to
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]
### Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### username and password to connect MQTT server.
## username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
### Optional SSL Config
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
### Use SSL but skip chain & host verification
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`
@ -89,9 +87,11 @@ func (m *MQTTConsumer) SetParser(parser parsers.Parser) {
m.parser = parser
}
func (m *MQTTConsumer) Start() error {
func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
m.Lock()
defer m.Unlock()
m.acc = acc
if m.QoS > 2 || m.QoS < 0 {
return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS)
}
@ -106,13 +106,8 @@ func (m *MQTTConsumer) Start() error {
return token.Error()
}
m.in = make(chan mqtt.Message, m.MetricBuffer)
m.in = make(chan mqtt.Message, 1000)
m.done = make(chan struct{})
if m.MetricBuffer == 0 {
m.MetricBuffer = 100000
}
m.metricC = make(chan telegraf.Metric, m.MetricBuffer)
m.topicC = make(chan string, m.MetricBuffer)
topics := make(map[string]byte)
for _, topic := range m.Topics {
@ -145,13 +140,9 @@ func (m *MQTTConsumer) receiver() {
}
for _, metric := range metrics {
select {
case m.metricC <- metric:
m.topicC <- topic
default:
log.Printf("MQTT Consumer buffer is full, dropping a metric." +
" You may want to increase the metric_buffer setting")
}
tags := metric.Tags()
tags["topic"] = topic
m.acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time())
}
}
}
@ -169,16 +160,6 @@ func (m *MQTTConsumer) Stop() {
}
func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error {
m.Lock()
defer m.Unlock()
nmetrics := len(m.metricC)
for i := 0; i < nmetrics; i++ {
metric := <-m.metricC
topic := <-m.topicC
tags := metric.Tags()
tags["topic"] = topic
acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time())
}
return nil
}

View File

@ -4,7 +4,6 @@ import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
@ -16,19 +15,15 @@ const (
testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
invalidMsg = "cpu_load_short,host=server01 1422568543702900257"
metricBuffer = 5
)
func newTestMQTTConsumer() (*MQTTConsumer, chan mqtt.Message) {
in := make(chan mqtt.Message, metricBuffer)
in := make(chan mqtt.Message, 100)
n := &MQTTConsumer{
Topics: []string{"telegraf"},
Servers: []string{"localhost:1883"},
MetricBuffer: metricBuffer,
in: in,
done: make(chan struct{}),
metricC: make(chan telegraf.Metric, metricBuffer),
topicC: make(chan string, metricBuffer),
Topics: []string{"telegraf"},
Servers: []string{"localhost:1883"},
in: in,
done: make(chan struct{}),
}
return n, in
}
@ -36,14 +31,16 @@ func newTestMQTTConsumer() (*MQTTConsumer, chan mqtt.Message) {
// Test that the parser parses NATS messages into metrics
func TestRunParser(t *testing.T) {
n, in := newTestMQTTConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()
go n.receiver()
in <- mqttMsg(testMsg)
time.Sleep(time.Millisecond)
time.Sleep(time.Millisecond * 25)
if a := len(n.metricC); a != 1 {
if a := acc.NFields(); a != 1 {
t.Errorf("got %v, expected %v", a, 1)
}
}
@ -51,51 +48,34 @@ func TestRunParser(t *testing.T) {
// Test that the parser ignores invalid messages
func TestRunParserInvalidMsg(t *testing.T) {
n, in := newTestMQTTConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()
go n.receiver()
in <- mqttMsg(invalidMsg)
time.Sleep(time.Millisecond)
time.Sleep(time.Millisecond * 25)
if a := len(n.metricC); a != 0 {
if a := acc.NFields(); a != 0 {
t.Errorf("got %v, expected %v", a, 0)
}
}
// Test that metrics are dropped when we hit the buffer limit
func TestRunParserRespectsBuffer(t *testing.T) {
n, in := newTestMQTTConsumer()
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()
go n.receiver()
for i := 0; i < metricBuffer+1; i++ {
in <- mqttMsg(testMsg)
}
time.Sleep(time.Millisecond)
if a := len(n.metricC); a != metricBuffer {
t.Errorf("got %v, expected %v", a, metricBuffer)
}
}
// Test that the parser parses line format messages into metrics
func TestRunParserAndGather(t *testing.T) {
n, in := newTestMQTTConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()
go n.receiver()
in <- mqttMsg(testMsg)
time.Sleep(time.Millisecond)
time.Sleep(time.Millisecond * 25)
acc := testutil.Accumulator{}
n.Gather(&acc)
if a := len(acc.Metrics); a != 1 {
t.Errorf("got %v, expected %v", a, 1)
}
acc.AssertContainsFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(23422)})
}
@ -103,19 +83,17 @@ func TestRunParserAndGather(t *testing.T) {
// Test that the parser parses graphite format messages into metrics
func TestRunParserAndGatherGraphite(t *testing.T) {
n, in := newTestMQTTConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
go n.receiver()
in <- mqttMsg(testMsgGraphite)
time.Sleep(time.Millisecond)
time.Sleep(time.Millisecond * 25)
acc := testutil.Accumulator{}
n.Gather(&acc)
if a := len(acc.Metrics); a != 1 {
t.Errorf("got %v, expected %v", a, 1)
}
acc.AssertContainsFields(t, "cpu_load_short_graphite",
map[string]interface{}{"value": float64(23422)})
}
@ -123,19 +101,17 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
// Test that the parser parses json format messages into metrics
func TestRunParserAndGatherJSON(t *testing.T) {
n, in := newTestMQTTConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
go n.receiver()
in <- mqttMsg(testMsgJSON)
time.Sleep(time.Millisecond)
time.Sleep(time.Millisecond * 25)
acc := testutil.Accumulator{}
n.Gather(&acc)
if a := len(acc.Metrics); a != 1 {
t.Errorf("got %v, expected %v", a, 1)
}
acc.AssertContainsFields(t, "nats_json_test",
map[string]interface{}{
"a": float64(5),

View File

@ -15,14 +15,14 @@ type Mysql struct {
}
var sampleConfig = `
### specify servers via a url matching:
### [username[:password]@][protocol[(address)]]/[?tls=[true|false|skip-verify]]
### see https://github.com/go-sql-driver/mysql#dsn-data-source-name
### e.g.
### root:passwd@tcp(127.0.0.1:3306)/?tls=false
### root@tcp(127.0.0.1:3306)/?tls=false
###
### If no servers are specified, then localhost is used as the host.
## specify servers via a url matching:
## [username[:password]@][protocol[(address)]]/[?tls=[true|false|skip-verify]]
## see https://github.com/go-sql-driver/mysql#dsn-data-source-name
## e.g.
## root:passwd@tcp(127.0.0.1:3306)/?tls=false
## root@tcp(127.0.0.1:3306)/?tls=false
##
## If no servers are specified, then localhost is used as the host.
servers = ["tcp(127.0.0.1:3306)/"]
`

View File

@ -2,7 +2,7 @@
The [NATS](http://www.nats.io/about/) consumer plugin reads from
specified NATS subjects and adds messages to InfluxDB. The plugin expects messages
in the [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md).
in the [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).
A [Queue Group](http://www.nats.io/documentation/concepts/nats-queueing/)
is used when subscribing to subjects so multiple instances of telegraf can read
from a NATS cluster in parallel.
@ -12,20 +12,20 @@ from a NATS cluster in parallel.
```toml
# Read metrics from NATS subject(s)
[[inputs.nats_consumer]]
### urls of NATS servers
## urls of NATS servers
servers = ["nats://localhost:4222"]
### Use Transport Layer Security
## Use Transport Layer Security
secure = false
### subject(s) to consume
## subject(s) to consume
subjects = ["telegraf"]
### name a queue group
## name a queue group
queue_group = "telegraf_consumers"
### Maximum number of metrics to buffer between collection intervals
## Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```

View File

@ -28,8 +28,10 @@ type natsConsumer struct {
Servers []string
Secure bool
// Legacy metric buffer support
MetricBuffer int
parser parsers.Parser
parser parsers.Parser
sync.Mutex
Conn *nats.Conn
@ -39,27 +41,24 @@ type natsConsumer struct {
in chan *nats.Msg
// channel for all NATS read errors
errs chan error
// channel for all incoming parsed metrics
metricC chan telegraf.Metric
done chan struct{}
done chan struct{}
acc telegraf.Accumulator
}
var sampleConfig = `
### urls of NATS servers
## urls of NATS servers
servers = ["nats://localhost:4222"]
### Use Transport Layer Security
## Use Transport Layer Security
secure = false
### subject(s) to consume
## subject(s) to consume
subjects = ["telegraf"]
### name a queue group
## name a queue group
queue_group = "telegraf_consumers"
### Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`
@ -84,10 +83,12 @@ func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e erro
}
// Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up.
func (n *natsConsumer) Start() error {
func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
n.Lock()
defer n.Unlock()
n.acc = acc
var connectErr error
opts := nats.DefaultOptions
@ -115,11 +116,6 @@ func (n *natsConsumer) Start() error {
}
n.done = make(chan struct{})
if n.MetricBuffer == 0 {
n.MetricBuffer = 100000
}
n.metricC = make(chan telegraf.Metric, n.MetricBuffer)
// Start the message reader
go n.receiver()
@ -146,13 +142,7 @@ func (n *natsConsumer) receiver() {
}
for _, metric := range metrics {
select {
case n.metricC <- metric:
continue
default:
log.Printf("NATS Consumer buffer is full, dropping a metric." +
" You may want to increase the metric_buffer setting")
}
n.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
}
@ -163,7 +153,6 @@ func (n *natsConsumer) clean() {
n.Lock()
defer n.Unlock()
close(n.in)
close(n.metricC)
close(n.errs)
for _, sub := range n.Subs {
@ -185,13 +174,6 @@ func (n *natsConsumer) Stop() {
}
func (n *natsConsumer) Gather(acc telegraf.Accumulator) error {
n.Lock()
defer n.Unlock()
nmetrics := len(n.metricC)
for i := 0; i < nmetrics; i++ {
metric := <-n.metricC
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
return nil
}

View File

@ -4,7 +4,6 @@ import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/nats-io/nats"
@ -21,15 +20,13 @@ const (
func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) {
in := make(chan *nats.Msg, metricBuffer)
n := &natsConsumer{
QueueGroup: "test",
Subjects: []string{"telegraf"},
Servers: []string{"nats://localhost:4222"},
Secure: false,
MetricBuffer: metricBuffer,
in: in,
errs: make(chan error, metricBuffer),
done: make(chan struct{}),
metricC: make(chan telegraf.Metric, metricBuffer),
QueueGroup: "test",
Subjects: []string{"telegraf"},
Servers: []string{"nats://localhost:4222"},
Secure: false,
in: in,
errs: make(chan error, metricBuffer),
done: make(chan struct{}),
}
return n, in
}
@ -37,66 +34,51 @@ func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) {
// Test that the parser parses NATS messages into metrics
func TestRunParser(t *testing.T) {
n, in := newTestNatsConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()
go n.receiver()
in <- natsMsg(testMsg)
time.Sleep(time.Millisecond)
time.Sleep(time.Millisecond * 25)
if a := len(n.metricC); a != 1 {
t.Errorf("got %v, expected %v", a, 1)
if acc.NFields() != 1 {
t.Errorf("got %v, expected %v", acc.NFields(), 1)
}
}
// Test that the parser ignores invalid messages
func TestRunParserInvalidMsg(t *testing.T) {
n, in := newTestNatsConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()
go n.receiver()
in <- natsMsg(invalidMsg)
time.Sleep(time.Millisecond)
time.Sleep(time.Millisecond * 25)
if a := len(n.metricC); a != 0 {
t.Errorf("got %v, expected %v", a, 0)
}
}
// Test that metrics are dropped when we hit the buffer limit
func TestRunParserRespectsBuffer(t *testing.T) {
n, in := newTestNatsConsumer()
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()
go n.receiver()
for i := 0; i < metricBuffer+1; i++ {
in <- natsMsg(testMsg)
}
time.Sleep(time.Millisecond)
if a := len(n.metricC); a != metricBuffer {
t.Errorf("got %v, expected %v", a, metricBuffer)
if acc.NFields() != 0 {
t.Errorf("got %v, expected %v", acc.NFields(), 0)
}
}
// Test that the parser parses line format messages into metrics
func TestRunParserAndGather(t *testing.T) {
n, in := newTestNatsConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()
go n.receiver()
in <- natsMsg(testMsg)
time.Sleep(time.Millisecond)
time.Sleep(time.Millisecond * 25)
acc := testutil.Accumulator{}
n.Gather(&acc)
if a := len(acc.Metrics); a != 1 {
t.Errorf("got %v, expected %v", a, 1)
}
acc.AssertContainsFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(23422)})
}
@ -104,19 +86,17 @@ func TestRunParserAndGather(t *testing.T) {
// Test that the parser parses graphite format messages into metrics
func TestRunParserAndGatherGraphite(t *testing.T) {
n, in := newTestNatsConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
go n.receiver()
in <- natsMsg(testMsgGraphite)
time.Sleep(time.Millisecond)
time.Sleep(time.Millisecond * 25)
acc := testutil.Accumulator{}
n.Gather(&acc)
if a := len(acc.Metrics); a != 1 {
t.Errorf("got %v, expected %v", a, 1)
}
acc.AssertContainsFields(t, "cpu_load_short_graphite",
map[string]interface{}{"value": float64(23422)})
}
@ -124,19 +104,17 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
// Test that the parser parses json format messages into metrics
func TestRunParserAndGatherJSON(t *testing.T) {
n, in := newTestNatsConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
go n.receiver()
in <- natsMsg(testMsgJSON)
time.Sleep(time.Millisecond)
time.Sleep(time.Millisecond * 25)
acc := testutil.Accumulator{}
n.Gather(&acc)
if a := len(acc.Metrics); a != 1 {
t.Errorf("got %v, expected %v", a, 1)
}
acc.AssertContainsFields(t, "nats_json_test",
map[string]interface{}{
"a": float64(5),

View File

@ -27,17 +27,17 @@ func (_ *NetResponse) Description() string {
}
var sampleConfig = `
### Protocol, must be "tcp" or "udp"
## Protocol, must be "tcp" or "udp"
protocol = "tcp"
### Server address (default localhost)
## Server address (default localhost)
address = "github.com:80"
### Set timeout (default 1.0 seconds)
## Set timeout (default 1.0 seconds)
timeout = 1.0
### Set read timeout (default 1.0 seconds)
## Set read timeout (default 1.0 seconds)
read_timeout = 1.0
### Optional string sent to the server
## Optional string sent to the server
# send = "ssh"
### Optional expected string in answer
## Optional expected string in answer
# expect = "ssh"
`

View File

@ -20,7 +20,7 @@ type Nginx struct {
}
var sampleConfig = `
### An array of Nginx stub_status URI to gather stats.
## An array of Nginx stub_status URI to gather stats.
urls = ["http://localhost/status"]
`

View File

@ -41,7 +41,7 @@ type NSQ struct {
}
var sampleConfig = `
### An array of NSQD HTTP API endpoints
## An array of NSQD HTTP API endpoints
endpoints = ["http://localhost:4151"]
`

View File

@ -126,14 +126,14 @@ func (p *process) getUptime() int64 {
}
var sampleConfig = `
### Path of passenger-status.
###
### Plugin gather metric via parsing XML output of passenger-status
### More information about the tool:
### https://www.phusionpassenger.com/library/admin/apache/overall_status_report.html
###
### If no path is specified, then the plugin simply execute passenger-status
### hopefully it can be found in your PATH
## Path of passenger-status.
##
## Plugin gather metric via parsing XML output of passenger-status
## More information about the tool:
## https://www.phusionpassenger.com/library/admin/apache/overall_status_report.html
##
## If no path is specified, then the plugin simply execute passenger-status
## hopefully it can be found in your PATH
command = "passenger-status -v --show=xml"
`

View File

@ -41,25 +41,25 @@ type phpfpm struct {
}
var sampleConfig = `
### An array of addresses to gather stats about. Specify an ip or hostname
### with optional port and path
###
### Plugin can be configured in three modes (either can be used):
### - http: the URL must start with http:// or https://, ie:
### "http://localhost/status"
### "http://192.168.130.1/status?full"
###
### - unixsocket: path to fpm socket, ie:
### "/var/run/php5-fpm.sock"
### or using a custom fpm status path:
### "/var/run/php5-fpm.sock:fpm-custom-status-path"
###
### - fcgi: the URL must start with fcgi:// or cgi://, and port must be present, ie:
### "fcgi://10.0.0.12:9000/status"
### "cgi://10.0.10.12:9001/status"
###
### Example of multiple gathering from local socket and remove host
### urls = ["http://192.168.1.20/status", "/tmp/fpm.sock"]
## An array of addresses to gather stats about. Specify an ip or hostname
## with optional port and path
##
## Plugin can be configured in three modes (either can be used):
## - http: the URL must start with http:// or https://, ie:
## "http://localhost/status"
## "http://192.168.130.1/status?full"
##
## - unixsocket: path to fpm socket, ie:
## "/var/run/php5-fpm.sock"
## or using a custom fpm status path:
## "/var/run/php5-fpm.sock:fpm-custom-status-path"
##
## - fcgi: the URL must start with fcgi:// or cgi://, and port must be present, ie:
## "fcgi://10.0.0.12:9000/status"
## "cgi://10.0.10.12:9001/status"
##
## Example of multiple gathering from local socket and remove host
## urls = ["http://192.168.1.20/status", "/tmp/fpm.sock"]
urls = ["http://localhost/status"]
`

View File

@ -44,18 +44,18 @@ func (_ *Ping) Description() string {
}
var sampleConfig = `
### NOTE: this plugin forks the ping command. You may need to set capabilities
### via setcap cap_net_raw+p /bin/ping
## NOTE: this plugin forks the ping command. You may need to set capabilities
## via setcap cap_net_raw+p /bin/ping
### urls to ping
## urls to ping
urls = ["www.google.com"] # required
### number of pings to send (ping -c <COUNT>)
## number of pings to send (ping -c <COUNT>)
count = 1 # required
### interval, in s, at which to ping. 0 == default (ping -i <PING_INTERVAL>)
## interval, in s, at which to ping. 0 == default (ping -i <PING_INTERVAL>)
ping_interval = 0.0
### ping timeout, in s. 0 == no timeout (ping -t <TIMEOUT>)
## ping timeout, in s. 0 == no timeout (ping -t <TIMEOUT>)
timeout = 0.0
### interface to send ping from (ping -I <INTERFACE>)
## interface to send ping from (ping -I <INTERFACE>)
interface = ""
`

View File

@ -23,22 +23,22 @@ type Postgresql struct {
var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true}
var sampleConfig = `
### specify address via a url matching:
### postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]
### or a simple string:
### host=localhost user=pqotest password=... sslmode=... dbname=app_production
###
### All connection parameters are optional.
###
### Without the dbname parameter, the driver will default to a database
### with the same name as the user. This dbname is just for instantiating a
### connection with the server and doesn't restrict the databases we are trying
### to grab metrics for.
###
## specify address via a url matching:
## postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]
## or a simple string:
## host=localhost user=pqotest password=... sslmode=... dbname=app_production
##
## All connection parameters are optional.
##
## Without the dbname parameter, the driver will default to a database
## with the same name as the user. This dbname is just for instantiating a
## connection with the server and doesn't restrict the databases we are trying
## to grab metrics for.
##
address = "host=localhost user=postgres sslmode=disable"
### A list of databases to pull metrics about. If not specified, metrics for all
### databases are gathered.
## A list of databases to pull metrics about. If not specified, metrics for all
## databases are gathered.
# databases = ["app_production", "testing"]
`

View File

@ -18,8 +18,8 @@ type Powerdns struct {
}
var sampleConfig = `
### An array of sockets to gather stats about.
### Specify a path to unix socket.
## An array of sockets to gather stats about.
## Specify a path to unix socket.
unix_sockets = ["/var/run/pdns.controlsocket"]
`

View File

@ -30,15 +30,15 @@ func NewProcstat() *Procstat {
}
var sampleConfig = `
### Must specify one of: pid_file, exe, or pattern
### PID file to monitor process
## Must specify one of: pid_file, exe, or pattern
## PID file to monitor process
pid_file = "/var/run/nginx.pid"
### executable name (ie, pgrep <exe>)
## executable name (ie, pgrep <exe>)
# exe = "nginx"
### pattern as argument for pgrep (ie, pgrep -f <pattern>)
## pattern as argument for pgrep (ie, pgrep -f <pattern>)
# pattern = "nginx"
### Field name prefix
## Field name prefix
prefix = ""
`

View File

@ -17,7 +17,7 @@ type Prometheus struct {
}
var sampleConfig = `
### An array of urls to scrape metrics from.
## An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"]
`

View File

@ -18,7 +18,7 @@ type PuppetAgent struct {
}
var sampleConfig = `
### Location of puppet last run summary file
## Location of puppet last run summary file
location = "/var/lib/puppet/state/last_run_summary.yaml"
`

View File

@ -107,8 +107,8 @@ var sampleConfig = `
# username = "guest"
# password = "guest"
### A list of nodes to pull metrics about. If not specified, metrics for
### all nodes are gathered.
## A list of nodes to pull metrics about. If not specified, metrics for
## all nodes are gathered.
# nodes = ["rabbit@node1", "rabbit@node2"]
`

View File

@ -21,7 +21,7 @@ type Raindrops struct {
}
var sampleConfig = `
### An array of raindrops middleware URI to gather stats.
## An array of raindrops middleware URI to gather stats.
urls = ["http://localhost:8080/_raindrops"]
`

View File

@ -19,14 +19,14 @@ type Redis struct {
}
var sampleConfig = `
### specify servers via a url matching:
### [protocol://][:password]@address[:port]
### e.g.
### tcp://localhost:6379
### tcp://:password@192.168.99.100
###
### If no servers are specified, then localhost is used as the host.
### If no port is specified, 6379 is used
## specify servers via a url matching:
## [protocol://][:password]@address[:port]
## e.g.
## tcp://localhost:6379
## tcp://:password@192.168.99.100
##
## If no servers are specified, then localhost is used as the host.
## If no port is specified, 6379 is used
servers = ["tcp://localhost:6379"]
`

View File

@ -16,11 +16,11 @@ type RethinkDB struct {
}
var sampleConfig = `
### An array of URI to gather stats about. Specify an ip or hostname
### with optional port add password. ie,
### rethinkdb://user:auth_key@10.10.3.30:28105,
### rethinkdb://10.10.3.33:18832,
### 10.0.0.1:10000, etc.
## An array of URI to gather stats about. Specify an ip or hostname
## with optional port add password. ie,
## rethinkdb://user:auth_key@10.10.3.30:28105,
## rethinkdb://10.10.3.33:18832,
## 10.0.0.1:10000, etc.
servers = ["127.0.0.1:28015"]
`

View File

@ -20,15 +20,15 @@ func (_ *Sensors) Description() string {
}
var sensorsSampleConfig = `
### By default, telegraf gathers stats from all sensors detected by the
### lm-sensors module.
###
### Only collect stats from the selected sensors. Sensors are listed as
### <chip name>:<feature name>. This information can be found by running the
### sensors command, e.g. sensors -u
###
### A * as the feature name will return all features of the chip
###
## By default, telegraf gathers stats from all sensors detected by the
## lm-sensors module.
##
## Only collect stats from the selected sensors. Sensors are listed as
## <chip name>:<feature name>. This information can be found by running the
## sensors command, e.g. sensors -u
##
## A * as the feature name will return all features of the chip
##
# sensors = ["coretemp-isa-0000:Core 0", "coretemp-isa-0001:*"]
`

View File

@ -72,11 +72,11 @@ var initNode = Node{
var NameToOid = make(map[string]string)
var sampleConfig = `
### Use 'oids.txt' file to translate oids to names
### To generate 'oids.txt' you need to run:
### snmptranslate -m all -Tz -On | sed -e 's/"//g' > /tmp/oids.txt
### Or if you have an other MIB folder with custom MIBs
### snmptranslate -M /mycustommibfolder -Tz -On -m all | sed -e 's/"//g' > oids.txt
## Use 'oids.txt' file to translate oids to names
## To generate 'oids.txt' you need to run:
## snmptranslate -m all -Tz -On | sed -e 's/"//g' > /tmp/oids.txt
## Or if you have an other MIB folder with custom MIBs
## snmptranslate -M /mycustommibfolder -Tz -On -m all | sed -e 's/"//g' > oids.txt
snmptranslate_file = "/tmp/oids.txt"
[[inputs.snmp.host]]
address = "192.168.2.2:161"

View File

@ -31,12 +31,12 @@ var queries MapQuery
var defaultServer = "Server=.;app name=telegraf;log=1;"
var sampleConfig = `
### Specify instances to monitor with a list of connection strings.
### All connection parameters are optional.
### By default, the host is localhost, listening on default port, TCP 1433.
### for Windows, the user is the currently running AD user (SSO).
### See https://github.com/denisenkom/go-mssqldb for detailed connection
### parameters.
## Specify instances to monitor with a list of connection strings.
## All connection parameters are optional.
## By default, the host is localhost, listening on default port, TCP 1433.
## for Windows, the user is the currently running AD user (SSO).
## See https://github.com/denisenkom/go-mssqldb for detailed connection
## parameters.
# servers = [
# "Server=192.168.1.10;Port=1433;User Id=<user>;Password=<pw>;app name=telegraf;log=1;",
# ]

View File

@ -1,6 +1,47 @@
# Telegraf Service Plugin: statsd
#### Description
### Configuration
```toml
# Statsd Server
[[inputs.statsd]]
## Address and port to host UDP listener on
service_address = ":8125"
## Delete gauges every interval (default=false)
delete_gauges = false
## Delete counters every interval (default=false)
delete_counters = false
## Delete sets every interval (default=false)
delete_sets = false
## Delete timings & histograms every interval (default=true)
delete_timings = true
## Percentiles to calculate for timing & histogram stats
percentiles = [90]
## convert measurement names, "." to "_" and "-" to "__"
convert_names = true
## Statsd data translation templates, more info can be read here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite
# templates = [
# "cpu.* measurement*"
# ]
## Number of UDP messages allowed to queue up, once filled,
## the statsd server will start dropping packets
allowed_pending_messages = 10000
## Number of timing/histogram values to track per-measurement in the
## calculation of percentiles. Raising this limit increases the accuracy
## of percentiles but also increases the memory usage and cpu time.
percentile_limit = 1000
## UDP packet size for the server to listen for. This will depend on the size
## of the packets that the client is sending, which is usually 1500 bytes.
udp_packet_size = 1500
```
### Description
The statsd plugin is a special type of plugin which runs a backgrounded statsd
listener service while telegraf is running.
@ -42,7 +83,7 @@ The string `foo:1|c:200|ms` is internally split into two individual metrics
`foo:1|c` and `foo:200|ms` which are added to the aggregator separately.
#### Influx Statsd
### Influx Statsd
In order to take advantage of InfluxDB's tagging system, we have made a couple
additions to the standard statsd protocol. First, you can specify
@ -59,7 +100,7 @@ COMING SOON: there will be a way to specify multiple fields.
current.users,service=payroll,server=host01:west=10,east=10,central=2,south=10|g
``` -->
#### Measurements:
### Measurements:
Meta:
- tags: `metric_type=<gauge|set|counter|timing|histogram>`
@ -99,7 +140,7 @@ metric type:
period are below x. The most common value that people use for `P` is the
`90`, this is a great number to try to optimize.
#### Plugin arguments
### Plugin arguments
- **service_address** string: Address to listen for statsd UDP packets on
- **delete_gauges** boolean: Delete gauges on every collection interval
@ -115,7 +156,7 @@ the accuracy of percentiles but also increases the memory usage and cpu time.
- **templates** []string: Templates for transforming statsd buckets into influx
measurements and tags.
#### Statsd bucket -> InfluxDB line-protocol Templates
### Statsd bucket -> InfluxDB line-protocol Templates
The plugin supports specifying templates for transforming statsd buckets into
InfluxDB measurement names and tags. The templates have a _measurement_ keyword,

View File

@ -123,39 +123,39 @@ func (_ *Statsd) Description() string {
}
const sampleConfig = `
### Address and port to host UDP listener on
## Address and port to host UDP listener on
service_address = ":8125"
### Delete gauges every interval (default=false)
## Delete gauges every interval (default=false)
delete_gauges = false
### Delete counters every interval (default=false)
## Delete counters every interval (default=false)
delete_counters = false
### Delete sets every interval (default=false)
## Delete sets every interval (default=false)
delete_sets = false
### Delete timings & histograms every interval (default=true)
## Delete timings & histograms every interval (default=true)
delete_timings = true
### Percentiles to calculate for timing & histogram stats
## Percentiles to calculate for timing & histogram stats
percentiles = [90]
### convert measurement names, "." to "_" and "-" to "__"
## convert measurement names, "." to "_" and "-" to "__"
convert_names = true
### Statsd data translation templates, more info can be read here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md#graphite
## Statsd data translation templates, more info can be read here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite
# templates = [
# "cpu.* measurement*"
# ]
### Number of UDP messages allowed to queue up, once filled,
### the statsd server will start dropping packets
## Number of UDP messages allowed to queue up, once filled,
## the statsd server will start dropping packets
allowed_pending_messages = 10000
### Number of timing/histogram values to track per-measurement in the
### calculation of percentiles. Raising this limit increases the accuracy
### of percentiles but also increases the memory usage and cpu time.
## Number of timing/histogram values to track per-measurement in the
## calculation of percentiles. Raising this limit increases the accuracy
## of percentiles but also increases the memory usage and cpu time.
percentile_limit = 1000
### UDP packet size for the server to listen for. This will depend on the size
### of the packets that the client is sending, which is usually 1500 bytes.
## UDP packet size for the server to listen for. This will depend on the size
## of the packets that the client is sending, which is usually 1500 bytes.
udp_packet_size = 1500
`
@ -213,7 +213,7 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
return nil
}
func (s *Statsd) Start() error {
func (s *Statsd) Start(_ telegraf.Accumulator) error {
// Make data structures
s.done = make(chan struct{})
s.in = make(chan []byte, s.AllowedPendingMessages)

View File

@ -28,11 +28,11 @@ func (_ *CPUStats) Description() string {
}
var sampleConfig = `
### Whether to report per-cpu stats or not
## Whether to report per-cpu stats or not
percpu = true
### Whether to report total system cpu stats or not
## Whether to report total system cpu stats or not
totalcpu = true
### Comment this line if you want the raw CPU time metrics
## Comment this line if you want the raw CPU time metrics
drop = ["time_*"]
`

View File

@ -21,8 +21,8 @@ func (_ *DiskStats) Description() string {
}
var diskSampleConfig = `
### By default, telegraf gather stats for all mountpoints.
### Setting mountpoints will restrict the stats to the specified mountpoints.
## By default, telegraf gather stats for all mountpoints.
## Setting mountpoints will restrict the stats to the specified mountpoints.
# mount_points = ["/"]
`

View File

@ -21,10 +21,10 @@ func (_ *NetIOStats) Description() string {
}
var netSampleConfig = `
### By default, telegraf gathers stats from any up interface (excluding loopback)
### Setting interfaces will tell it to gather these explicit interfaces,
### regardless of status.
###
## By default, telegraf gathers stats from any up interface (excluding loopback)
## Setting interfaces will tell it to gather these explicit interfaces,
## regardless of status.
##
# interfaces = ["eth0"]
`

View File

@ -13,7 +13,7 @@ type Trig struct {
}
var TrigConfig = `
### Set the amplitude
## Set the amplitude
amplitude = 10.0
`

View File

@ -17,9 +17,9 @@ type Twemproxy struct {
}
var sampleConfig = `
### Twemproxy stats address and port (no scheme)
## Twemproxy stats address and port (no scheme)
addr = "localhost:22222"
### Monitor pool name
## Monitor pool name
pools = ["redis_pool", "mc_pool"]
`

View File

@ -14,12 +14,12 @@ import (
)
var sampleConfig string = `
### By default this plugin returns basic CPU and Disk statistics.
### See the README file for more examples.
### Uncomment examples below or write your own as you see fit. If the system
### being polled for data does not have the Object at startup of the Telegraf
### agent, it will not be gathered.
### Settings:
## By default this plugin returns basic CPU and Disk statistics.
## See the README file for more examples.
## Uncomment examples below or write your own as you see fit. If the system
## being polled for data does not have the Object at startup of the Telegraf
## agent, it will not be gathered.
## Settings:
# PrintValid = false # Print All matching performance counters
[[inputs.win_perf_counters.object]]

View File

@ -23,15 +23,15 @@ type poolInfo struct {
}
var sampleConfig = `
### ZFS kstat path
### If not specified, then default is:
## ZFS kstat path
## If not specified, then default is:
kstatPath = "/proc/spl/kstat/zfs"
### By default, telegraf gather all zfs stats
### If not specified, then default is:
## By default, telegraf gather all zfs stats
## If not specified, then default is:
kstatMetrics = ["arcstats", "zfetchstats", "vdev_cache_stats"]
### By default, don't gather zpool stats
## By default, don't gather zpool stats
poolMetrics = false
`

View File

@ -20,11 +20,11 @@ type Zookeeper struct {
}
var sampleConfig = `
### An array of address to gather stats about. Specify an ip or hostname
### with port. ie localhost:2181, 10.0.0.1:2181, etc.
## An array of address to gather stats about. Specify an ip or hostname
## with port. ie localhost:2181, 10.0.0.1:2181, etc.
### If no servers are specified, then localhost is used as the host.
### If no port is specified, 2181 is used
## If no servers are specified, then localhost is used as the host.
## If no port is specified, 2181 is used
servers = [":2181"]
`

View File

@ -22,13 +22,13 @@ type Amon struct {
}
var sampleConfig = `
### Amon Server Key
## Amon Server Key
server_key = "my-server-key" # required.
### Amon Instance URL
## Amon Instance URL
amon_instance = "https://youramoninstance" # required
### Connection timeout.
## Connection timeout.
# timeout = "5s"
`

View File

@ -52,32 +52,32 @@ const (
)
var sampleConfig = `
### AMQP url
## AMQP url
url = "amqp://localhost:5672/influxdb"
### AMQP exchange
## AMQP exchange
exchange = "telegraf"
### Telegraf tag to use as a routing key
### ie, if this tag exists, it's value will be used as the routing key
## Telegraf tag to use as a routing key
## ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host"
### InfluxDB retention policy
## InfluxDB retention policy
# retention_policy = "default"
### InfluxDB database
## InfluxDB database
# database = "telegraf"
### InfluxDB precision
## InfluxDB precision
# precision = "s"
### Optional SSL Config
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
### Use SSL but skip chain & host verification
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
### Data format to output. This can be "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md
## Data format to output. This can be "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
`

View File

@ -25,10 +25,10 @@ type CloudWatch struct {
}
var sampleConfig = `
### Amazon REGION
## Amazon REGION
region = 'us-east-1'
### Namespace for the CloudWatch MetricDatums
## Namespace for the CloudWatch MetricDatums
namespace = 'InfluxData/Telegraf'
`

View File

@ -24,10 +24,10 @@ type Datadog struct {
}
var sampleConfig = `
### Datadog API key
## Datadog API key
apikey = "my-secret-key" # required.
### Connection timeout.
## Connection timeout.
# timeout = "5s"
`

View File

@ -20,13 +20,13 @@ type File struct {
}
var sampleConfig = `
### Files to write to, "stdout" is a specially handled file.
## Files to write to, "stdout" is a specially handled file.
files = ["stdout", "/tmp/metrics.out"]
### Data format to output. This can be "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md
## Data format to output. This can be "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
`

View File

@ -23,11 +23,11 @@ type Graphite struct {
}
var sampleConfig = `
### TCP endpoint for your graphite instance.
## TCP endpoint for your graphite instance.
servers = ["localhost:2003"]
### Prefix metrics name
## Prefix metrics name
prefix = ""
### timeout in seconds for the write connection to graphite
## timeout in seconds for the write connection to graphite
timeout = 2
`

View File

@ -41,32 +41,32 @@ type InfluxDB struct {
}
var sampleConfig = `
### The full HTTP or UDP endpoint URL for your InfluxDB instance.
### Multiple urls can be specified as part of the same cluster,
### this means that only ONE of the urls will be written to each interval.
## The full HTTP or UDP endpoint URL for your InfluxDB instance.
## Multiple urls can be specified as part of the same cluster,
## this means that only ONE of the urls will be written to each interval.
# urls = ["udp://localhost:8089"] # UDP endpoint example
urls = ["http://localhost:8086"] # required
### The target database for metrics (telegraf will create it if not exists)
## The target database for metrics (telegraf will create it if not exists)
database = "telegraf" # required
### Precision of writes, valid values are n, u, ms, s, m, and h
### note: using "s" precision greatly improves InfluxDB compression
## Precision of writes, valid values are "ns", "us" (or "µs"), "ms", "s", "m", "h".
## note: using "s" precision greatly improves InfluxDB compression
precision = "s"
### Connection timeout (for the connection with InfluxDB), formatted as a string.
### If not provided, will default to 0 (no timeout)
## Connection timeout (for the connection with InfluxDB), formatted as a string.
## If not provided, will default to 0 (no timeout)
# timeout = "5s"
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
### Set the user agent for HTTP POSTs (can be useful for log differentiation)
## Set the user agent for HTTP POSTs (can be useful for log differentiation)
# user_agent = "telegraf"
### Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
## Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
# udp_payload = 512
### Optional SSL Config
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
### Use SSL but skip chain & host verification
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
`
@ -156,17 +156,20 @@ func (i *InfluxDB) Description() string {
// Choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error.
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: i.Database,
Precision: i.Precision,
})
if err != nil {
return err
}
for _, metric := range metrics {
bp.AddPoint(metric.Point())
}
// This will get set to nil if a successful write occurs
err := errors.New("Could not write to any InfluxDB server in cluster")
err = errors.New("Could not write to any InfluxDB server in cluster")
p := rand.Perm(len(i.conns))
for _, n := range p {

View File

@ -45,25 +45,25 @@ type Kafka struct {
}
var sampleConfig = `
### URLs of kafka brokers
## URLs of kafka brokers
brokers = ["localhost:9092"]
### Kafka topic for producer messages
## Kafka topic for producer messages
topic = "telegraf"
### Telegraf tag to use as a routing key
### ie, if this tag exists, it's value will be used as the routing key
## Telegraf tag to use as a routing key
## ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host"
### Optional SSL Config
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
### Use SSL but skip chain & host verification
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
### Data format to output. This can be "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md
## Data format to output. This can be "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
`

View File

@ -28,16 +28,16 @@ type KinesisOutput struct {
}
var sampleConfig = `
### Amazon REGION of kinesis endpoint.
## Amazon REGION of kinesis endpoint.
region = "ap-southeast-2"
### Kinesis StreamName must exist prior to starting telegraf.
## Kinesis StreamName must exist prior to starting telegraf.
streamname = "StreamName"
### PartitionKey as used for sharding data.
## PartitionKey as used for sharding data.
partitionkey = "PartitionKey"
### format of the Data payload in the kinesis PutRecord, supported
### String and Custom.
## format of the Data payload in the kinesis PutRecord, supported
## String and Custom.
format = "string"
### debug will show upstream aws messages.
## debug will show upstream aws messages.
debug = false
`

View File

@ -23,20 +23,20 @@ type Librato struct {
}
var sampleConfig = `
### Librator API Docs
### http://dev.librato.com/v1/metrics-authentication
## Librator API Docs
## http://dev.librato.com/v1/metrics-authentication
### Librato API user
## Librato API user
api_user = "telegraf@influxdb.com" # required.
### Librato API token
## Librato API token
api_token = "my-secret-token" # required.
### Tag Field to populate source attribute (optional)
### This is typically the _hostname_ from which the metric was obtained.
## Tag Field to populate source attribute (optional)
## This is typically the _hostname_ from which the metric was obtained.
source_tag = "hostname"
### Connection timeout.
## Connection timeout.
# timeout = "5s"
`

View File

@ -16,26 +16,26 @@ import (
var sampleConfig = `
servers = ["localhost:1883"] # required.
### MQTT outputs send metrics to this topic format
### "<topic_prefix>/<hostname>/<pluginname>/"
### ex: prefix/host/web01.example.com/mem
## MQTT outputs send metrics to this topic format
## "<topic_prefix>/<hostname>/<pluginname>/"
## ex: prefix/web01.example.com/mem
topic_prefix = "telegraf"
### username and password to connect MQTT server.
## username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
### Optional SSL Config
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
### Use SSL but skip chain & host verification
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
### Data format to output. This can be "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md
## Data format to output. This can be "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
`

View File

@ -19,15 +19,15 @@ type NSQ struct {
}
var sampleConfig = `
### Location of nsqd instance listening on TCP
## Location of nsqd instance listening on TCP
server = "localhost:4150"
### NSQ topic for producer messages
## NSQ topic for producer messages
topic = "telegraf"
### Data format to output. This can be "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md
## Data format to output. This can be "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
`

View File

@ -22,17 +22,17 @@ type OpenTSDB struct {
}
var sampleConfig = `
### prefix for metrics keys
## prefix for metrics keys
prefix = "my.specific.prefix."
## Telnet Mode ##
### DNS name of the OpenTSDB server in telnet mode
## DNS name of the OpenTSDB server in telnet mode
host = "opentsdb.example.com"
### Port of the OpenTSDB server in telnet mode
## Port of the OpenTSDB server in telnet mode
port = 4242
### Debug true - Prints OpenTSDB communication
## Debug true - Prints OpenTSDB communication
debug = false
`

View File

@ -16,7 +16,7 @@ type PrometheusClient struct {
}
var sampleConfig = `
### Address to listen on
## Address to listen on
# listen = ":9126"
`

View File

@ -21,11 +21,11 @@ type Riemann struct {
}
var sampleConfig = `
### URL of server
## URL of server
url = "localhost:5555"
### transport protocol to use either tcp or udp
## transport protocol to use either tcp or udp
transport = "tcp"
### separator to use between input name and field name in Riemann service name
## separator to use between input name and field name in Riemann service name
separator = " "
`

View File

@ -42,7 +42,7 @@ DESCRIPTION = "Plugin-driven server agent for reporting metrics into InfluxDB."
# SCRIPT START
prereqs = [ 'git', 'go' ]
optional_prereqs = [ 'gvm', 'fpm', 'rpmbuild' ]
optional_prereqs = [ 'fpm', 'rpmbuild' ]
fpm_common_args = "-f -s dir --log error \
--vendor {} \
@ -78,6 +78,14 @@ supported_packages = {
"linux": [ "deb", "rpm", "tar", "zip" ],
"windows": [ "tar", "zip" ],
}
supported_tags = {
# "linux": {
# "amd64": ["sensors"]
# }
}
prereq_cmds = {
# "linux": "sudo apt-get install lm-sensors libsensors4-dev"
}
def run(command, allow_failure=False, shell=False):
out = None
@ -233,52 +241,6 @@ def upload_packages(packages, bucket_name=None, nightly=False):
print("\t - Not uploading {}, already exists.".format(p))
print("")
def run_tests(race, parallel, timeout, no_vet):
get_command = "go get -d -t ./..."
print("Retrieving Go dependencies...")
sys.stdout.flush()
run(get_command)
print("Running tests:")
print("\tRace: ", race)
if parallel is not None:
print("\tParallel:", parallel)
if timeout is not None:
print("\tTimeout:", timeout)
sys.stdout.flush()
p = subprocess.Popen(["go", "fmt", "./..."], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
if len(out) > 0 or len(err) > 0:
print("Code not formatted. Please use 'go fmt ./...' to fix formatting errors.")
print(out)
print(err)
return False
if not no_vet:
p = subprocess.Popen(["go", "tool", "vet", "-composites=false", "./"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
if len(out) > 0 or len(err) > 0:
print("Go vet failed. Please run 'go vet ./...' and fix any errors.")
print(out)
print(err)
return False
else:
print("Skipping go vet ...")
sys.stdout.flush()
test_command = "go test -v"
if race:
test_command += " -race"
if parallel is not None:
test_command += " -parallel {}".format(parallel)
if timeout is not None:
test_command += " -timeout {}".format(timeout)
test_command += " ./..."
code = os.system(test_command)
if code != 0:
print("Tests Failed")
return False
else:
print("Tests Passed")
return True
def build(version=None,
branch=None,
commit=None,
@ -335,6 +297,11 @@ def build(version=None,
build_command += "go build -o {} ".format(os.path.join(outdir, b))
if race:
build_command += "-race "
if platform in supported_tags:
if arch in supported_tags[platform]:
build_tags = supported_tags[platform][arch]
for build_tag in build_tags:
build_command += "-tags "+build_tag+" "
go_version = get_go_version()
if "1.4" in go_version:
build_command += "-ldflags=\"-X main.buildTime '{}' ".format(datetime.datetime.utcnow().isoformat())
@ -393,14 +360,10 @@ def package_scripts(build_root):
shutil.copyfile(DEFAULT_CONFIG, os.path.join(build_root, CONFIG_DIR[1:], "telegraf.conf"))
os.chmod(os.path.join(build_root, CONFIG_DIR[1:], "telegraf.conf"), 0o644)
def go_get(update=False):
get_command = None
if update:
get_command = "go get -u -f -d ./..."
else:
get_command = "go get -d ./..."
def go_get():
print("Retrieving Go dependencies...")
run(get_command)
run("go get github.com/sparrc/gdm")
run("gdm restore")
def generate_md5_from_file(path):
m = hashlib.md5()
@ -450,7 +413,7 @@ def build_packages(build_output, version, pkg_arch, nightly=False, rc=None, iter
package_version = version
package_iteration = iteration
current_location = build_output[p][a]
if package_type in ['zip', 'tar']:
if nightly:
name = '{}-nightly_{}_{}'.format(name, p, a)
@ -519,12 +482,9 @@ def print_usage():
print("\t --race \n\t\t- Whether the produced build should have race detection enabled.")
print("\t --package \n\t\t- Whether the produced builds should be packaged for the target platform(s).")
print("\t --nightly \n\t\t- Whether the produced build is a nightly (affects version information).")
print("\t --update \n\t\t- Whether dependencies should be updated prior to building.")
print("\t --test \n\t\t- Run Go tests. Will not produce a build.")
print("\t --parallel \n\t\t- Run Go tests in parallel up to the count specified.")
print("\t --timeout \n\t\t- Timeout for Go tests. Defaults to 480s.")
print("\t --clean \n\t\t- Clean the build output directory prior to creating build.")
print("\t --no-get \n\t\t- Do not run `go get` before building.")
print("\t --bucket=<S3 bucket>\n\t\t- Full path of the bucket to upload packages to (must also specify --upload).")
print("\t --debug \n\t\t- Displays debug output.")
print("")
@ -592,15 +552,9 @@ def main():
elif '--nightly' in arg:
# Signifies that this is a nightly build.
nightly = True
elif '--update' in arg:
# Signifies that dependencies should be updated.
update = True
elif '--upload' in arg:
# Signifies that the resulting packages should be uploaded to S3
upload = True
elif '--test' in arg:
# Run tests and exit
test = True
elif '--parallel' in arg:
# Set parallel for tests.
parallel = int(arg.split("=")[1])
@ -620,8 +574,6 @@ def main():
elif '--bucket' in arg:
# The bucket to upload the packages to, relies on boto
upload_bucket = arg.split("=")[1]
elif '--no-get' in arg:
run_get = False
elif '--debug' in arg:
print "[DEBUG] Using debug output"
debug = True
@ -665,15 +617,10 @@ def main():
target_arch = 'i386'
elif target_arch == 'x86_64':
target_arch = 'amd64'
build_output = {}
if test:
if not run_tests(race, parallel, timeout, no_vet):
return 1
return 0
if run_get:
go_get(update=update)
build_output = {}
go_get()
platforms = []
single_build = True
@ -684,6 +631,8 @@ def main():
platforms = [target_platform]
for platform in platforms:
if platform in prereq_cmds:
run(prereq_cmds[platform])
build_output.update( { platform : {} } )
archs = []
if target_arch == "all":

View File

@ -61,13 +61,13 @@ exit_if_fail go test -race ./...
# Simple Integration Tests
# check that version was properly set
exit_if_fail "./telegraf -version | grep $VERSION"
exit_if_fail "telegraf -version | grep $VERSION"
# check that one test cpu & mem output work
tmpdir=$(mktemp -d)
./telegraf -sample-config > $tmpdir/config.toml
exit_if_fail ./telegraf -config $tmpdir/config.toml \
telegraf -sample-config > $tmpdir/config.toml
exit_if_fail telegraf -config $tmpdir/config.toml \
-test -input-filter cpu:mem
mv ./telegraf $CIRCLE_ARTIFACTS
mv $GOPATH/bin/telegraf $CIRCLE_ARTIFACTS
exit $rc

View File

@ -108,6 +108,8 @@ func (a *Accumulator) Get(measurement string) (*Metric, bool) {
// NFields returns the total number of fields in the accumulator, across all
// measurements
func (a *Accumulator) NFields() int {
a.Lock()
defer a.Unlock()
counter := 0
for _, pt := range a.Metrics {
for _, _ = range pt.Fields {
@ -123,6 +125,8 @@ func (a *Accumulator) AssertContainsTaggedFields(
fields map[string]interface{},
tags map[string]string,
) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if !reflect.DeepEqual(tags, p.Tags) {
continue
@ -148,6 +152,8 @@ func (a *Accumulator) AssertContainsFields(
measurement string,
fields map[string]interface{},
) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
if !reflect.DeepEqual(fields, p.Fields) {
@ -166,6 +172,8 @@ func (a *Accumulator) AssertContainsFields(
// HasIntValue returns true if the measurement has an Int value
func (a *Accumulator) HasIntField(measurement string, field string) bool {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
@ -182,6 +190,8 @@ func (a *Accumulator) HasIntField(measurement string, field string) bool {
// HasUIntValue returns true if the measurement has a UInt value
func (a *Accumulator) HasUIntField(measurement string, field string) bool {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
@ -198,6 +208,8 @@ func (a *Accumulator) HasUIntField(measurement string, field string) bool {
// HasFloatValue returns true if the given measurement has a float value
func (a *Accumulator) HasFloatField(measurement string, field string) bool {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
@ -215,6 +227,8 @@ func (a *Accumulator) HasFloatField(measurement string, field string) bool {
// HasMeasurement returns true if the accumulator has a measurement with the
// given name
func (a *Accumulator) HasMeasurement(measurement string) bool {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
return true