Compare commits

...

28 Commits

Author SHA1 Message Date
Cameron Sparr
ff2de0c715 Only build the docker plugin on linux 2015-08-27 17:09:18 -06:00
Cameron Sparr
5b78b1e548 Clean up agent error handling and logging of outputs/plugins
Closes #145
2015-08-27 13:41:19 -06:00
Cameron Sparr
d1f965ae30 Kafka output producer, send telegraf metrics to Kafka brokers
Closes #38
2015-08-26 17:03:58 -06:00
Cameron Sparr
434267898b Indent the toml config for readability 2015-08-26 09:22:03 -06:00
Cameron Sparr
a00510a73c Outputs enhancement to require Description and SampleConfig functions
Closes #142
2015-08-26 07:34:26 -06:00
Cameron Sparr
846fd31121 Improve build from source instructions
Closes #141
2015-08-25 18:18:56 -06:00
Cameron Sparr
ab4344a781 Merge problem, re-enable non-standard DB names 2015-08-25 16:52:16 -06:00
Cameron Sparr
ac97fefb91 makefile: ADVERTISED_HOST needs only be set during docker-compose target 2015-08-25 16:34:30 -06:00
subhachandrachandra
8d034f544c Fixed memory reporting for Linux systems
/proc/meminfo reports memory in binary kilobytes (KiB), so the values need a multiplier of 1024 instead of 1000.
The kernel reports in terms of pages, and the proc filesystem left-shifts by 2 (for 4KB pages) to get KB. Since this is a binary shift, converting to bytes needs a further shift of 10, i.e. a multiplication by 1024.

From the kernel code (PAGE_SHIFT = 12 for 4KB pages):
"MemTotal:       %8lu kB\n", K(i.totalram)

Closes #131
2015-08-25 14:18:14 -06:00
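For context on the fix above, here is a minimal sketch (not the actual telegraf code) of the conversion this commit corrects: /proc/meminfo values are labelled "kB" but are binary kilobytes, so they must be scaled by 1024, not 1000, to get bytes.

```
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// memInfoBytes converts a /proc/meminfo line such as
// "MemTotal:       16308856 kB" into a byte count. The "kB" unit is a
// binary kilobyte (the kernel shifts its 4KB pages down by 2), so the
// value is multiplied by 1024, not 1000.
func memInfoBytes(line string) (string, uint64, error) {
	fields := strings.Fields(line)
	if len(fields) < 2 {
		return "", 0, fmt.Errorf("unexpected meminfo line: %q", line)
	}
	key := strings.TrimSuffix(fields[0], ":")
	kb, err := strconv.ParseUint(fields[1], 10, 64)
	if err != nil {
		return "", 0, err
	}
	return key, kb * 1024, nil // binary kilobytes -> bytes: x1024, not x1000
}

func main() {
	key, b, _ := memInfoBytes("MemTotal:       16308856 kB")
	fmt.Println(key, b) // MemTotal 16700268544
}
```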
subhachandrachandra
ca1d2c7000 Fixed total memory reporting for Darwin systems. hw.memsize is reported as bytes instead of pages. 2015-08-25 14:16:18 -06:00
Bruno Bigras
0acf15c025 Typo: prec -> perc
Closes #140
2015-08-25 14:15:12 -06:00
Cameron Sparr
94eed9b43c Add MySQL server address tag to all measurements
Closes #138
2015-08-25 13:58:55 -06:00
Bruno Bigras
8a6665c03f memcached: fix when a value contains a space
Fixes #137
Closes #139
2015-08-25 13:14:40 -06:00
Cameron Sparr
85ae6fffbb Vagrantfile: do a one-way rsync so that binaries don't get shared between VMs and host 2015-08-25 11:54:12 -06:00
Cameron Sparr
bd85a36cb1 Fixes #130, document mysql plugin better, README 2015-08-24 15:08:16 -06:00
Cameron Sparr
a449e4b47c Add #136 to CHANGELOG 2015-08-24 14:56:50 -06:00
Cameron Sparr
42602a3f35 Provide a -usage flag for printing the usage of a single plugin
Closes #136
2015-08-24 14:52:46 -06:00
Cameron Sparr
50f902cb02 Fixes #128, add system load and swap back to default Telegraf config 2015-08-24 13:26:21 -06:00
nickscript0
b014ac12ee Update CHANGELOG.md 2015-08-24 13:09:23 -06:00
nickscript0
610f24e0cd Update CHANGELOG.md 2015-08-24 13:09:23 -06:00
nsvarich
f45f7e56fd add plugin.name to error message 2015-08-24 13:09:23 -06:00
nickscript0
afe366d6b7 go fmt remove whitespace 2015-08-24 13:09:23 -06:00
nickscript0
1daa059ef9 Log plugin errors in crankParallel and crankSeparate cases. Previously errors weren't logged in these cases. 2015-08-24 13:09:23 -06:00
Cameron Sparr
9777aa6165 Update README to point to url without 'v' prepended to version 2015-08-24 10:48:21 -06:00
Cameron Sparr
143ec1a019 Filter out the 'v' from the version tag, issue #134 2015-08-24 10:39:15 -06:00
Cameron Sparr
3d05575e9d Fix for #129 README typo in the 0.1.6 package name url 2015-08-21 10:43:19 -06:00
Cameron Sparr
9d00b5e165 Version= doesn't work on go1.4.2
fixing makefile & vagrantfile & build script to reflect that
2015-08-20 16:43:25 -06:00
Cameron Sparr
a29b39e17a README typo fix 2015-08-20 15:18:45 -06:00
45 changed files with 666 additions and 378 deletions


@@ -1,3 +1,20 @@
## v0.1.7 [unreleased]
### Features
- [#38](https://github.com/influxdb/telegraf/pull/38): Kafka output producer.
- [#133](https://github.com/influxdb/telegraf/pull/133): Add plugin.Gather error logging. Thanks @nickscript0!
- [#136](https://github.com/influxdb/telegraf/issues/136): Add a -usage flag for printing usage of a single plugin.
- [#137](https://github.com/influxdb/telegraf/issues/137): Memcached: fix when a value contains a space
- [#138](https://github.com/influxdb/telegraf/issues/138): MySQL server address tag.
- [#142](https://github.com/influxdb/telegraf/pull/142): Add Description and SampleConfig funcs to output interface
- Indent the toml config file for readability
### Bugfixes
- [#128](https://github.com/influxdb/telegraf/issues/128): system_load measurement missing.
- [#129](https://github.com/influxdb/telegraf/issues/129): Latest pkg url fix.
- [#131](https://github.com/influxdb/telegraf/issues/131): Fix memory reporting on linux & darwin. Thanks @subhachandrachandra!
- [#140](https://github.com/influxdb/telegraf/issues/140): Memory plugin prec->perc typo fix. Thanks @brunoqc!
## v0.1.6 [2015-08-20]
### Features


@@ -3,24 +3,21 @@ VERSION := $(shell sh -c 'git describe --always --tags')
build: prepare
$(GOPATH)/bin/godep go build -o telegraf -ldflags \
"-X main.Version=$(VERSION)" \
"-X main.Version $(VERSION)" \
./cmd/telegraf/telegraf.go
prepare:
go get github.com/tools/godep
docker-compose:
docker-compose up -d
test:
ifeq ($(UNAME), Darwin)
ADVERTISED_HOST=$(shell sh -c 'boot2docker ip') $(MAKE) test-full
ADVERTISED_HOST=$(shell sh -c 'boot2docker ip') docker-compose up -d
endif
ifeq ($(UNAME), Linux)
ADVERTISED_HOST=localhost $(MAKE) test-full
ADVERTISED_HOST=localhost docker-compose up -d
endif
test-full: prepare docker-compose
test: prepare docker-compose
$(GOPATH)/bin/godep go test -v ./...
test-short: prepare
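The ldflags edit above swaps the `-X name=value` form for the older `-X name value` form that go1.4.2 understands (see commit 9d00b5e165). As a minimal sketch, assuming a package-level `Version` string in `package main` as the telegraf flag comments suggest, this is the variable such a flag populates:

```
package main

import "fmt"

// Version is set at link time, e.g.
//   go build -ldflags "-X main.Version 0.1.7"    (go1.4 syntax, as in the Makefile above)
//   go build -ldflags "-X main.Version=0.1.7"    (go1.5+ syntax)
// Without the flag it keeps its zero value.
var Version string

func main() {
	if Version == "" {
		Version = "dev"
	}
	fmt.Println("telegraf version", Version)
}
```

Built without the flag the program prints "dev"; built with the flag it prints whatever version string the linker injected.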


@@ -29,11 +29,11 @@ are some InfluxDB compatibility requirements:
* InfluxDB 0.9.3+ (including nightly builds) requires Telegraf 0.1.5+
* InfluxDB 0.9.2 and prior requires Telegraf 0.1.4
Telegraf 0.1.5
Latest:
* http://get.influxdb.org/telegraf/telegraf_0.1.6_amd64.deb
* http://get.influxdb.org/telegraf/telegraf-0.1.6-1.x86_64.rpm
Telegraf 0.1.4
0.1.4:
* http://get.influxdb.org/telegraf/telegraf_0.1.4_amd64.deb
* http://get.influxdb.org/telegraf/telegraf-0.1.4-1.x86_64.rpm
@@ -46,9 +46,14 @@ brew install telegraf
### From Source:
Telegraf manages dependencies via `godep`, which gets installed via the Makefile.
Assuming you have your GOPATH setup, `make build` should be enough to gather dependencies
and build telegraf.
Telegraf manages dependencies via `godep`, which gets installed via the Makefile
if you don't have it already. You also must build with golang version 1.4+
1. [Install Go](https://golang.org/doc/install)
2. [Setup your GOPATH](https://golang.org/doc/code.html#GOPATH)
3. run `go get github.com/influxdb/telegraf`
4. `cd $GOPATH/src/github.com/influxdb/telegraf`
5. run `make`
### How to use it:
@@ -98,21 +103,22 @@ at 192.168.59.103:8086, tagging measurements with dc="denver-1". It will output
measurements at a 10s interval and will collect totalcpu & percpu data.
```
[outputs]
[outputs.influxdb]
url = "http://192.168.59.103:8086" # required.
database = "telegraf" # required.
[tags]
dc = "denver-1"
dc = "denver-1"
[agent]
interval = "10s"
interval = "10s"
# OUTPUTS
[outputs]
[outputs.influxdb]
url = "http://192.168.59.103:8086" # required.
database = "telegraf" # required.
# PLUGINS
[cpu]
percpu = true
totalcpu = true
percpu = true
totalcpu = true
```
Below is how to configure `tagpass` parameters (added in 0.1.4)
@@ -120,24 +126,27 @@ Below is how to configure `tagpass` parameters (added in 0.1.4)
```
# Don't collect CPU data for cpu6 & cpu7
[cpu.tagdrop]
cpu = [ "cpu6", "cpu7" ]
cpu = [ "cpu6", "cpu7" ]
[disk]
[disk.tagpass]
# tagpass conditions are OR, not AND.
# If the (filesystem is ext4 or xfs) OR (the path is /opt or /home)
# then the metric passes
fstype = [ "ext4", "xfs" ]
path = [ "/opt", "/home" ]
# tagpass conditions are OR, not AND.
# If the (filesystem is ext4 or xfs) OR (the path is /opt or /home)
# then the metric passes
fstype = [ "ext4", "xfs" ]
path = [ "/opt", "/home" ]
```
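The tagpass comments above describe OR semantics: a metric passes if any listed tag matches. As an illustration only (this is not telegraf's filtering code), an OR-style check could look like:

```
package main

import "fmt"

// tagPass reports whether a metric's tags satisfy an OR-style tagpass
// filter: the metric passes if any filtered tag has a value in its
// allowed list. Illustrative sketch only.
func tagPass(tags map[string]string, pass map[string][]string) bool {
	for key, allowed := range pass {
		val, ok := tags[key]
		if !ok {
			continue
		}
		for _, want := range allowed {
			if val == want {
				return true
			}
		}
	}
	return false
}

func main() {
	pass := map[string][]string{
		"fstype": {"ext4", "xfs"},
		"path":   {"/opt", "/home"},
	}
	// Passes: path matches even though fstype does not.
	fmt.Println(tagPass(map[string]string{"fstype": "tmpfs", "path": "/opt"}, pass))
	// Dropped: neither condition matches.
	fmt.Println(tagPass(map[string]string{"fstype": "tmpfs", "path": "/var"}, pass))
}
```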
## Supported Plugins
Telegraf currently has support for collecting metrics from:
**You can view usage instructions for each plugin by running**
`telegraf -usage <pluginname>`
Telegraf currently has support for collecting metrics from
* disque
* elasticsearch
* exec (generic executable JSON-gathering plugin)
* exec (generic JSON-emitting executable plugin)
* haproxy
* httpjson (generic JSON-emitting http service plugin)
* kafka_consumer

Vagrantfile

@@ -7,7 +7,10 @@ VAGRANTFILE_API_VERSION = "2"
Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
config.vm.box = "ubuntu/trusty64"
config.vm.synced_folder ".", "/home/vagrant/go/src/github.com/influxdb/telegraf"
config.vm.synced_folder ".", "/home/vagrant/go/src/github.com/influxdb/telegraf",
type: "rsync",
rsync__args: ["--verbose", "--archive", "--delete", "-z", "--safe-links"],
rsync__exclude: ["./telegraf", ".vagrant/"]
config.vm.provision "shell", name: "sudo", inline: <<-SHELL
chown -R vagrant:vagrant /home/vagrant/go
@@ -22,7 +25,10 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
gvm install go1.4.2 --prefer-binary
gvm use go1.4.2 --default
echo "export PATH=$PATH:$GOPATH/bin" >> "$HOME/.bashrc"
cd "$HOME/go/src/github.com/influxdb/telegraf" && make
echo 'export GOPATH=/home/vagrant/go' >> "$HOME/.bashrc"
cd "$HOME/go/src/github.com/influxdb/telegraf" &&\
rm -rf Godeps/_workspace/pkg &&\
GOPATH="$HOME/go" make
SHELL
config.vm.provider "virtualbox" do |vb|


@@ -1,6 +1,7 @@
package telegraf
import (
"errors"
"fmt"
"log"
"os"
@@ -74,6 +75,9 @@ func (a *Agent) Connect() error {
if err != nil {
return err
}
if a.Debug {
log.Printf("Successfully connected to output: %s\n", o.name)
}
}
return nil
}
@@ -160,6 +164,8 @@ func (a *Agent) LoadPlugins(pluginsFilter string) ([]string, error) {
return names, nil
}
// crankParallel runs the plugins that are using the same reporting interval
// as the telegraf agent.
func (a *Agent) crankParallel() error {
points := make(chan *BatchPoints, len(a.plugins))
@@ -179,7 +185,9 @@ func (a *Agent) crankParallel() error {
acc.Prefix = plugin.name + "_"
acc.Config = plugin.config
plugin.plugin.Gather(&acc)
if err := plugin.plugin.Gather(&acc); err != nil {
log.Printf("Error in plugin [%s]: %s", plugin.name, err)
}
points <- &acc
}(plugin)
@@ -200,6 +208,7 @@ func (a *Agent) crankParallel() error {
return a.flush(&bp)
}
// crank is mostly for test purposes.
func (a *Agent) crank() error {
var bp BatchPoints
@@ -220,27 +229,34 @@ func (a *Agent) crank() error {
return a.flush(&bp)
}
// crankSeparate runs the plugins that have been configured with their own
// reporting interval.
func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error {
ticker := time.NewTicker(plugin.config.Interval)
for {
var bp BatchPoints
var outerr error
bp.Debug = a.Debug
bp.Prefix = plugin.name + "_"
bp.Config = plugin.config
err := plugin.plugin.Gather(&bp)
if err != nil {
return err
if err := plugin.plugin.Gather(&bp); err != nil {
log.Printf("Error in plugin [%s]: %s", plugin.name, err)
outerr = errors.New("Error encountered processing plugins & outputs")
}
bp.Tags = a.Config.Tags
bp.Time = time.Now()
err = a.flush(&bp)
if err != nil {
return err
if err := a.flush(&bp); err != nil {
outerr = errors.New("Error encountered processing plugins & outputs")
}
if outerr != nil {
return outerr
}
select {
@@ -255,16 +271,20 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
func (a *Agent) flush(bp *BatchPoints) error {
var wg sync.WaitGroup
var outerr error
for _, o := range a.outputs {
wg.Add(1)
go func(ro *runningOutput) {
defer wg.Done()
outerr = ro.output.Write(bp.BatchPoints)
// Log all output errors:
if err := ro.output.Write(bp.BatchPoints); err != nil {
log.Printf("Error in output [%s]: %s", ro.name, err)
outerr = errors.New("Error encountered flushing outputs")
}
}(o)
}
wg.Wait()
return outerr
}
@@ -290,8 +310,7 @@ func (a *Agent) TestAllPlugins() error {
fmt.Printf("* Plugin: %s\n", name)
acc.Prefix = name + "_"
err := plugin.Gather(&acc)
if err != nil {
if err := plugin.Gather(&acc); err != nil {
return err
}
}
@@ -315,8 +334,7 @@ func (a *Agent) Test() error {
fmt.Printf("* Internal: %s\n", plugin.config.Interval)
}
err := plugin.plugin.Gather(&acc)
if err != nil {
if err := plugin.plugin.Gather(&acc); err != nil {
return err
}
}
@@ -333,7 +351,9 @@ func (a *Agent) Run(shutdown chan struct{}) error {
wg.Add(1)
go func(plugin *runningPlugin) {
defer wg.Done()
a.crankSeparate(shutdown, plugin)
if err := a.crankSeparate(shutdown, plugin); err != nil {
log.Printf(err.Error())
}
}(plugin)
}
}
@@ -343,9 +363,8 @@ func (a *Agent) Run(shutdown chan struct{}) error {
ticker := time.NewTicker(a.Interval.Duration)
for {
err := a.crankParallel()
if err != nil {
log.Printf("Error in plugins: %s", err)
if err := a.crankParallel(); err != nil {
log.Printf(err.Error())
}
select {


@@ -30,16 +30,16 @@ func TestAgent_LoadPlugin(t *testing.T) {
assert.Equal(t, 2, len(pluginsEnabled))
pluginsEnabled, _ = a.LoadPlugins("")
assert.Equal(t, 24, len(pluginsEnabled))
assert.Equal(t, 23, len(pluginsEnabled))
pluginsEnabled, _ = a.LoadPlugins(" ")
assert.Equal(t, 24, len(pluginsEnabled))
assert.Equal(t, 23, len(pluginsEnabled))
pluginsEnabled, _ = a.LoadPlugins(" ")
assert.Equal(t, 24, len(pluginsEnabled))
assert.Equal(t, 23, len(pluginsEnabled))
pluginsEnabled, _ = a.LoadPlugins("\n\t")
assert.Equal(t, 24, len(pluginsEnabled))
assert.Equal(t, 23, len(pluginsEnabled))
}
/*


@@ -5,6 +5,7 @@
# build process for InfluxDB.
BUILD_DIR=$HOME/telegraf-build
VERSION=`git describe --always --tags`
# GO_VERSION=go1.4.2
# source $HOME/.gvm/scripts/gvm
# exit_if_fail gvm use $GO_VERSION
@@ -13,7 +14,7 @@ BUILD_DIR=$HOME/telegraf-build
function exit_if_fail {
command=$@
echo "Executing '$command'"
$command
eval $command
rc=$?
if [ $rc -ne 0 ]; then
echo "'$command' returned $rc."
@@ -25,7 +26,7 @@ function exit_if_fail {
function build {
echo -n "=> $1-$2: "
GOOS=$1 GOARCH=$2 godep go build -o telegraf-$1-$2 \
-ldflags "-X main.Version=$3" \
-ldflags "-X main.Version $3" \
./cmd/telegraf/telegraf.go
du -h telegraf-$1-$2
}
@@ -61,11 +62,14 @@ exit_if_fail godep go vet ./...
exit_if_fail godep go test -v -short ./...
# Build binaries
build "linux" "amd64" `git describe --always --tags`
build "linux" "386" `git describe --always --tags`
build "linux" "arm" `git describe --always --tags`
build "linux" "amd64" $VERSION
build "linux" "386" $VERSION
build "linux" "arm" $VERSION
# simple integration test
# Simple Integration Tests
# check that version was properly set
exit_if_fail "./telegraf-linux-amd64 -version | grep $VERSION"
# check that one test cpu & mem output work
tmpdir=$(mktemp -d)
./telegraf-linux-amd64 -sample-config > $tmpdir/config.toml
exit_if_fail ./telegraf-linux-amd64 -config $tmpdir/config.toml \


@@ -13,13 +13,18 @@ import (
_ "github.com/influxdb/telegraf/plugins/all"
)
var fDebug = flag.Bool("debug", false, "show metrics as they're generated to stdout")
var fDebug = flag.Bool("debug", false,
"show metrics as they're generated to stdout")
var fTest = flag.Bool("test", false, "gather metrics, print them out, and exit")
var fConfig = flag.String("config", "", "configuration file to load")
var fVersion = flag.Bool("version", false, "display the version")
var fSampleConfig = flag.Bool("sample-config", false, "print out full sample configuration")
var fSampleConfig = flag.Bool("sample-config", false,
"print out full sample configuration")
var fPidfile = flag.String("pidfile", "", "file to write our pid to")
var fPLuginsFilter = flag.String("filter", "", "filter the plugins to enable, separator is :")
var fPLuginsFilter = flag.String("filter", "",
"filter the plugins to enable, separator is :")
var fUsage = flag.String("usage", "",
"print usage for a plugin, ie, 'telegraf -usage mysql'")
// Telegraf version
// -ldflags "-X main.Version=`git describe --always --tags`"
@@ -39,6 +44,13 @@ func main() {
return
}
if *fUsage != "" {
if err := telegraf.PrintPluginConfig(*fUsage); err != nil {
log.Fatal(err)
}
return
}
var (
config *telegraf.Config
err error
@@ -102,11 +114,8 @@ func main() {
}
shutdown := make(chan struct{})
signals := make(chan os.Signal)
signal.Notify(signals, os.Interrupt)
go func() {
<-signals
close(shutdown)

config.go

@@ -8,6 +8,7 @@ import (
"strings"
"time"
"github.com/influxdb/telegraf/outputs"
"github.com/influxdb/telegraf/plugins"
"github.com/naoina/toml"
"github.com/naoina/toml/ast"
@@ -326,9 +327,6 @@ type hasDescr interface {
var header = `# Telegraf configuration
# If this file is missing an [agent] section, you must first generate a
# valid config with 'telegraf -sample-config > telegraf.toml'
# Telegraf is entirely plugin driven. All metrics are gathered from the
# declared plugins.
@@ -348,76 +346,90 @@ var header = `# Telegraf configuration
# NOTE: The configuration has a few required parameters. They are marked
# with 'required'. Be sure to edit those to make this configuration work.
# OUTPUTS
[outputs]
# Configuration for influxdb server to send metrics to
[outputs.influxdb]
# The full HTTP endpoint URL for your InfluxDB instance
url = "http://localhost:8086" # required.
# The target database for metrics. This database must already exist
database = "telegraf" # required.
# Connection timeout (for the connection with InfluxDB), formatted as a string.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
# If not provided, will default to 0 (no timeout)
# timeout = "5s"
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
# Set the user agent for the POSTs (can be useful for log differentiation)
# user_agent = "telegraf"
# Tags can also be specified via a normal map, but only one form at a time:
# [tags]
# dc = "us-east-1" }
[tags]
# dc = "us-east-1"
# Configuration for telegraf itself
# [agent]
# interval = "10s"
# debug = false
# hostname = "prod3241"
[agent]
# interval = "10s"
# debug = false
# hostname = "prod3241"
# PLUGINS
###############################################################################
# OUTPUTS #
###############################################################################
[outputs]
`
var header2 = `
###############################################################################
# PLUGINS #
###############################################################################
`
// PrintSampleConfig prints the sample config!
func PrintSampleConfig() {
fmt.Printf(header)
var names []string
// Print Outputs
var onames []string
for name := range plugins.Plugins {
names = append(names, name)
for oname := range outputs.Outputs {
onames = append(onames, oname)
}
sort.Strings(onames)
for _, oname := range onames {
creator := outputs.Outputs[oname]
output := creator()
fmt.Printf("\n# %s\n[outputs.%s]", output.Description(), oname)
config := output.SampleConfig()
if config == "" {
fmt.Printf("\n # no configuration\n")
} else {
fmt.Printf(config)
}
}
sort.Strings(names)
fmt.Printf(header2)
for _, name := range names {
creator := plugins.Plugins[name]
// Print Plugins
var pnames []string
for pname := range plugins.Plugins {
pnames = append(pnames, pname)
}
sort.Strings(pnames)
for _, pname := range pnames {
creator := plugins.Plugins[pname]
plugin := creator()
fmt.Printf("# %s\n[%s]\n", plugin.Description(), name)
var config string
config = strings.TrimSpace(plugin.SampleConfig())
fmt.Printf("\n# %s\n[%s]", plugin.Description(), pname)
config := plugin.SampleConfig()
if config == "" {
fmt.Printf(" # no configuration\n\n")
fmt.Printf("\n # no configuration\n")
} else {
fmt.Printf("\n")
lines := strings.Split(config, "\n")
for _, line := range lines {
fmt.Printf("%s\n", line)
}
fmt.Printf("\n")
fmt.Printf(config)
}
}
}
// PrintPluginConfig prints the config usage of a single plugin.
func PrintPluginConfig(name string) error {
if creator, ok := plugins.Plugins[name]; ok {
plugin := creator()
fmt.Printf("# %s\n[%s]\n", plugin.Description(), name)
fmt.Printf(strings.TrimSpace(plugin.SampleConfig()))
} else {
return errors.New(fmt.Sprintf("Plugin %s not found", name))
}
return nil
}


@@ -68,3 +68,9 @@ totalcpu = true
# Read metrics about memory usage
[mem]
# no configuration
[system]
# no configuration
[swap]
# no configuration


@@ -3,4 +3,5 @@ package all
import (
_ "github.com/influxdb/telegraf/outputs/datadog"
_ "github.com/influxdb/telegraf/outputs/influxdb"
_ "github.com/influxdb/telegraf/outputs/kafka"
)


@@ -21,6 +21,14 @@ type Datadog struct {
client *http.Client
}
var sampleConfig = `
# Datadog API key
apikey = "my-secret-key" # required.
# Connection timeout.
# timeout = "5s"
`
type TimeSeries struct {
Series []*Metric `json:"series"`
}
@@ -91,6 +99,14 @@ func (d *Datadog) Write(bp client.BatchPoints) error {
return nil
}
func (d *Datadog) SampleConfig() string {
return sampleConfig
}
func (d *Datadog) Description() string {
return "Configuration for DataDog API to send metrics to."
}
func (d *Datadog) authenticatedUrl() string {
q := url.Values{
"api_key": []string{d.Apikey},


@@ -9,6 +9,8 @@ import (
"testing"
"time"
"github.com/influxdb/telegraf/testutil"
"github.com/influxdb/influxdb/client"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -25,18 +27,6 @@ func fakeDatadog() *Datadog {
return d
}
func testData() client.BatchPoints {
var bp client.BatchPoints
bp.Time = time.Now()
bp.Tags = map[string]string{"tag1": "value1"}
bp.Points = []client.Point{
{
Fields: map[string]interface{}{"value": 1.0},
},
}
return bp
}
func TestUriOverride(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
@@ -48,7 +38,7 @@ func TestUriOverride(t *testing.T) {
d.Apikey = "123456"
err := d.Connect()
require.NoError(t, err)
err = d.Write(testData())
err = d.Write(testutil.MockBatchPoints())
require.NoError(t, err)
}
@@ -67,7 +57,7 @@ func TestBadStatusCode(t *testing.T) {
d.Apikey = "123456"
err := d.Connect()
require.NoError(t, err)
err = d.Write(testData())
err = d.Write(testutil.MockBatchPoints())
if err == nil {
t.Errorf("error expected but none returned")
} else {


@@ -22,6 +22,25 @@ type InfluxDB struct {
conn *client.Client
}
var sampleConfig = `
# The full HTTP endpoint URL for your InfluxDB instance
url = "http://localhost:8086" # required.
# The target database for metrics. This database must already exist
database = "telegraf" # required.
# Connection timeout (for the connection with InfluxDB), formatted as a string.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
# If not provided, will default to 0 (no timeout)
# timeout = "5s"
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
# Set the user agent for the POSTs (can be useful for log differentiation)
# user_agent = "telegraf"
`
func (i *InfluxDB) Connect() error {
u, err := url.Parse(i.URL)
if err != nil {
@@ -41,7 +60,7 @@ func (i *InfluxDB) Connect() error {
}
_, err = c.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE telegraf"),
Command: fmt.Sprintf("CREATE DATABASE %s", i.Database),
})
if err != nil && !strings.Contains(err.Error(), "database already exists") {
@@ -57,6 +76,14 @@ func (i *InfluxDB) Close() error {
return nil
}
func (i *InfluxDB) SampleConfig() string {
return sampleConfig
}
func (i *InfluxDB) Description() string {
return "Configuration for influxdb server to send metrics to"
}
func (i *InfluxDB) Write(bp client.BatchPoints) error {
bp.Database = i.Database
if _, err := i.conn.Write(bp); err != nil {

outputs/kafka/kafka.go

@@ -0,0 +1,91 @@
package kafka
import (
"errors"
"fmt"
"github.com/Shopify/sarama"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/telegraf/outputs"
)
type Kafka struct {
// Kafka brokers to send metrics to
Brokers []string
// Kafka topic
Topic string
producer sarama.SyncProducer
}
var sampleConfig = `
# URLs of kafka brokers
brokers = ["localhost:9092"]
# Kafka topic for producer messages
topic = "telegraf"
`
func (k *Kafka) Connect() error {
producer, err := sarama.NewSyncProducer(k.Brokers, nil)
if err != nil {
return err
}
k.producer = producer
return nil
}
func (k *Kafka) Close() error {
return k.producer.Close()
}
func (k *Kafka) SampleConfig() string {
return sampleConfig
}
func (k *Kafka) Description() string {
return "Configuration for the Kafka server to send metrics to"
}
func (k *Kafka) Write(bp client.BatchPoints) error {
if len(bp.Points) == 0 {
return nil
}
for _, p := range bp.Points {
// Combine tags from Point and BatchPoints and grab the resulting
// line-protocol output string to write to Kafka
var value string
if p.Raw != "" {
value = p.Raw
} else {
for k, v := range bp.Tags {
if p.Tags == nil {
p.Tags = make(map[string]string, len(bp.Tags))
}
p.Tags[k] = v
}
value = p.MarshalString()
}
m := &sarama.ProducerMessage{
Topic: k.Topic,
Value: sarama.StringEncoder(value),
}
if h, ok := p.Tags["host"]; ok {
m.Key = sarama.StringEncoder(h)
}
_, _, err := k.producer.SendMessage(m)
if err != nil {
return errors.New(fmt.Sprintf("FAILED to send kafka message: %s\n",
err))
}
}
return nil
}
func init() {
outputs.Add("kafka", func() outputs.Output {
return &Kafka{}
})
}


@@ -0,0 +1,28 @@
package kafka
import (
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestConnectAndWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
brokers := []string{testutil.GetLocalHost() + ":9092"}
k := &Kafka{
Brokers: brokers,
Topic: "Test",
}
// Verify that we can connect to the Kafka broker
err := k.Connect()
require.NoError(t, err)
// Verify that we can successfully write data to the kafka broker
err = k.Write(testutil.MockBatchPoints())
require.NoError(t, err)
}


@@ -7,6 +7,8 @@ import (
type Output interface {
Connect() error
Close() error
Description() string
SampleConfig() string
Write(client.BatchPoints) error
}
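With Description and SampleConfig added to the interface above (#142), every output now documents itself. A hypothetical bare-bones output satisfying the expanded interface, registered the same way as the Kafka output shown earlier, might look like this (names such as NullOutput are illustrative, not from the telegraf tree):

```
package nulloutput

import (
	"github.com/influxdb/influxdb/client"
	"github.com/influxdb/telegraf/outputs"
)

// NullOutput is a hypothetical do-nothing output illustrating the minimum
// an output must now implement: Connect, Close, Description, SampleConfig
// and Write.
type NullOutput struct{}

func (n *NullOutput) Connect() error { return nil }
func (n *NullOutput) Close() error   { return nil }

func (n *NullOutput) Description() string {
	return "Discards all points (example only)"
}

func (n *NullOutput) SampleConfig() string {
	return "\n  # no configuration\n"
}

func (n *NullOutput) Write(bp client.BatchPoints) error {
	return nil
}

func init() {
	outputs.Add("null", func() outputs.Output {
		return &NullOutput{}
	})
}
```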


@@ -141,7 +141,7 @@ do_build() {
rm -f $GOPATH_INSTALL/bin/$b
done
godep go install -a -ldflags="-X main.Version=$version" ./...
godep go install -a -ldflags="-X main.Version $version" ./...
if [ $? -ne 0 ]; then
echo "Build failed, unable to create package -- aborting"
cleanup_exit 1
@@ -188,7 +188,7 @@ if [ "$1" == "-h" ]; then
usage 0
fi
VERSION=`git describe --always --tags`
VERSION=`git describe --always --tags | tr -d v`
echo -e "\nStarting package process, version: $VERSION\n"


@@ -21,12 +21,13 @@ type Disque struct {
}
var sampleConfig = `
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port and password. ie disque://localhost, disque://10.10.3.33:18832,
# 10.0.0.1:10000, etc.
#
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]`
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port and password. ie disque://localhost, disque://10.10.3.33:18832,
# 10.0.0.1:10000, etc.
#
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]
`
func (r *Disque) SampleConfig() string {
return sampleConfig


@@ -28,12 +28,12 @@ type node struct {
}
const sampleConfig = `
# specify a list of one or more Elasticsearch servers
servers = ["http://localhost:9200"]
# specify a list of one or more Elasticsearch servers
servers = ["http://localhost:9200"]
# set local to false when you want to read the indices stats from all nodes
# within the cluster
local = true
# set local to false when you want to read the indices stats from all nodes
# within the cluster
local = true
`
// Elasticsearch is a plugin to read stats from one or many Elasticsearch


@@ -11,13 +11,13 @@ import (
)
const sampleConfig = `
# specify commands via an array of tables
[[exec.commands]]
# the command to run
command = "/usr/bin/mycollector --foo=bar"
# specify commands via an array of tables
[[exec.commands]]
# the command to run
command = "/usr/bin/mycollector --foo=bar"
# name of the command (used as a prefix for measurements)
name = "mycollector"
# name of the command (used as a prefix for measurements)
name = "mycollector"
`
type Command struct {


@@ -84,13 +84,13 @@ type haproxy struct {
}
var sampleConfig = `
# An array of address to gather stats about. Specify an ip on hostname
# with optional port. ie localhost, 10.10.3.33:1936, etc.
#
# If no servers are specified, then default to 127.0.0.1:1936
servers = ["http://myhaproxy.com:1936", "http://anotherhaproxy.com:1936"]
# Or you can also use local socket(not work yet)
# servers = ["socket:/run/haproxy/admin.sock"]
# An array of address to gather stats about. Specify an ip on hostname
# with optional port. ie localhost, 10.10.3.33:1936, etc.
#
# If no servers are specified, then default to 127.0.0.1:1936
servers = ["http://myhaproxy.com:1936", "http://anotherhaproxy.com:1936"]
# Or you can also use local socket(not work yet)
# servers = ["socket:/run/haproxy/admin.sock"]
`
func (r *haproxy) SampleConfig() string {


@@ -46,25 +46,25 @@ func (c RealHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) {
}
var sampleConfig = `
# Specify services via an array of tables
[[httpjson.services]]
# Specify services via an array of tables
[[httpjson.services]]
# a name for the service being polled
name = "webserver_stats"
# a name for the service being polled
name = "webserver_stats"
# URL of each server in the service's cluster
servers = [
"http://localhost:9999/stats/",
"http://localhost:9998/stats/",
]
# URL of each server in the service's cluster
servers = [
"http://localhost:9999/stats/",
"http://localhost:9998/stats/",
]
# HTTP method to use (case-sensitive)
method = "GET"
# HTTP method to use (case-sensitive)
method = "GET"
# HTTP parameters (all values must be strings)
[httpjson.services.parameters]
event_type = "cpu_spike"
threshold = "0.75"
# HTTP parameters (all values must be strings)
[httpjson.services.parameters]
event_type = "cpu_spike"
threshold = "0.75"
`
func (h *HttpJson) SampleConfig() string {


@@ -20,17 +20,18 @@ type Kafka struct {
}
var sampleConfig = `
# topic to consume
topic = "topic_with_metrics"
# topic to consume
topic = "topic_with_metrics"
# the name of the consumer group
consumerGroupName = "telegraf_metrics_consumers"
# the name of the consumer group
consumerGroupName = "telegraf_metrics_consumers"
# an array of Zookeeper connection strings
zookeeperPeers = ["localhost:2181"]
# an array of Zookeeper connection strings
zookeeperPeers = ["localhost:2181"]
# Batch size of points sent to InfluxDB
batchSize = 1000`
# Batch size of points sent to InfluxDB
batchSize = 1000
`
func (k *Kafka) SampleConfig() string {
return sampleConfig


@@ -131,11 +131,11 @@ var serverTypeMapping = map[string]ServerType{
}
var sampleConfig = `
# An array of URI to gather stats about LeoFS.
# Specify an ip or hostname with port. ie 127.0.0.1:4020
#
# If no servers are specified, then 127.0.0.1 is used as the host and 4020 as the port.
servers = ["127.0.0.1:4021"]
# An array of URI to gather stats about LeoFS.
# Specify an ip or hostname with port. ie 127.0.0.1:4020
#
# If no servers are specified, then 127.0.0.1 is used as the host and 4020 as the port.
servers = ["127.0.0.1:4021"]
`
func (l *LeoFS) SampleConfig() string {


@@ -25,11 +25,12 @@ type Lustre2 struct {
}
var sampleConfig = `
# An array of /proc globs to search for Lustre stats
# If not specified, the default will work on Lustre 2.5.x
#
# ost_procfiles = ["/proc/fs/lustre/obdfilter/*/stats", "/proc/fs/lustre/osd-ldiskfs/*/stats"]
# mds_procfiles = ["/proc/fs/lustre/mdt/*/md_stats"]`
# An array of /proc globs to search for Lustre stats
# If not specified, the default will work on Lustre 2.5.x
#
# ost_procfiles = ["/proc/fs/lustre/obdfilter/*/stats", "/proc/fs/lustre/osd-ldiskfs/*/stats"]
# mds_procfiles = ["/proc/fs/lustre/mdt/*/md_stats"]
`
/* The wanted fields would be a []string if not for the
lines that start with read_bytes/write_bytes and contain


@@ -17,11 +17,12 @@ type Memcached struct {
}
var sampleConfig = `
# An array of address to gather stats about. Specify an ip on hostname
# with optional port. ie localhost, 10.0.0.1:11211, etc.
#
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]`
# An array of address to gather stats about. Specify an ip on hostname
# with optional port. ie localhost, 10.0.0.1:11211, etc.
#
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]
`
var defaultTimeout = 5 * time.Second
@@ -100,17 +101,16 @@ func (m *Memcached) gatherServer(address string, acc plugins.Accumulator) error
break
}
// Read values
var name, value string
n, errScan := fmt.Sscanf(string(line), "STAT %s %s\r\n", &name, &value)
if errScan != nil || n != 2 {
s := bytes.SplitN(line, []byte(" "), 3)
if len(s) != 3 || !bytes.Equal(s[0], []byte("STAT")) {
return fmt.Errorf("unexpected line in stats response: %q", line)
}
// Save values
values[name] = value
values[string(s[1])] = string(s[2])
}
//
// Add server address as a tag
tags := map[string]string{"server": address}
// Process values
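The switch above from fmt.Sscanf to bytes.SplitN is what lets stat values containing spaces through: %s stops at the first space, while splitting into at most three fields keeps everything after the key together as one value. A simplified standalone illustration (not the plugin code itself):

```
package main

import (
	"bytes"
	"fmt"
)

func main() {
	line := []byte("STAT version 1.4.25 ubuntu")

	// Old approach (simplified format string): %s stops at the first
	// space, so the value is truncated to "1.4.25".
	var name, value string
	fmt.Sscanf(string(line), "STAT %s %s", &name, &value)
	fmt.Printf("Sscanf: %q -> %q\n", name, value)

	// New approach: split into at most three fields, so everything after
	// the key stays together as one value, spaces included.
	s := bytes.SplitN(line, []byte(" "), 3)
	fmt.Printf("SplitN: %q -> %q\n", s[1], s[2])
}
```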


@@ -25,12 +25,13 @@ type Ssl struct {
}
var sampleConfig = `
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port add password. ie mongodb://user:auth_key@10.10.3.30:27017,
# mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc.
#
# If no servers are specified, then 127.0.0.1 is used as the host and 27107 as the port.
servers = ["127.0.0.1:27017"]`
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port add password. ie mongodb://user:auth_key@10.10.3.30:27017,
# mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc.
#
# If no servers are specified, then 127.0.0.1 is used as the host and 27107 as the port.
servers = ["127.0.0.1:27017"]
`
func (m *MongoDB) SampleConfig() string {
return sampleConfig


@@ -14,12 +14,15 @@ type Mysql struct {
}
var sampleConfig = `
# specify servers via a url matching:
# [username[:password]@][protocol[(address)]]/[?tls=[true|false|skip-verify]]
# e.g. root:root@http://10.0.0.18/?tls=false
#
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]`
# specify servers via a url matching:
# [username[:password]@][protocol[(address)]]/[?tls=[true|false|skip-verify]]
# e.g.
# root:root@http://10.0.0.18/?tls=false
# root:passwd@tcp(127.0.0.1:3036)/
#
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]
`
func (m *Mysql) SampleConfig() string {
return sampleConfig
@@ -109,10 +112,19 @@ func (m *Mysql) gatherServer(serv string, acc plugins.Accumulator) error {
var found bool
// Parse out user/password from server address tag if given
var servtag string
if strings.Contains(serv, "@") {
servtag = strings.Split(serv, "@")[1]
} else {
servtag = serv
}
tags := map[string]string{"server": servtag}
for _, mapped := range mappings {
if strings.HasPrefix(name, mapped.onServer) {
i, _ := strconv.Atoi(string(val.([]byte)))
acc.Add(mapped.inExport+name[len(mapped.onServer):], i, nil)
acc.Add(mapped.inExport+name[len(mapped.onServer):], i, tags)
found = true
}
}
@@ -128,14 +140,14 @@ func (m *Mysql) gatherServer(serv string, acc plugins.Accumulator) error {
return err
}
acc.Add("queries", i, nil)
acc.Add("queries", i, tags)
case "Slow_queries":
i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
if err != nil {
return err
}
acc.Add("slow_queries", i, nil)
acc.Add("slow_queries", i, tags)
}
}


@@ -19,8 +19,9 @@ type Nginx struct {
}
var sampleConfig = `
# An array of Nginx stub_status URI to gather stats.
urls = ["http://localhost/status"]`
# An array of Nginx stub_status URI to gather stats.
urls = ["http://localhost/status"]
`
func (n *Nginx) SampleConfig() string {
return sampleConfig


@@ -18,28 +18,28 @@ type Postgresql struct {
}
var sampleConfig = `
# specify servers via an array of tables
[[postgresql.servers]]
# specify servers via an array of tables
[[postgresql.servers]]
# specify address via a url matching:
# postgres://[pqgotest[:password]]@localhost?sslmode=[disable|verify-ca|verify-full]
# or a simple string:
# host=localhost user=pqotest password=... sslmode=...
#
# All connection parameters are optional. By default, the host is localhost
# and the user is the currently running user. For localhost, we default
# to sslmode=disable as well.
#
# specify address via a url matching:
# postgres://[pqgotest[:password]]@localhost?sslmode=[disable|verify-ca|verify-full]
# or a simple string:
# host=localhost user=pqotest password=... sslmode=...
#
# All connection parameters are optional. By default, the host is localhost
# and the user is the currently running user. For localhost, we default
# to sslmode=disable as well.
#
address = "sslmode=disable"
address = "sslmode=disable"
# A list of databases to pull metrics about. If not specified, metrics for all
# databases are gathered.
# A list of databases to pull metrics about. If not specified, metrics for all
# databases are gathered.
# databases = ["app_production", "blah_testing"]
# databases = ["app_production", "blah_testing"]
# [[postgresql.servers]]
# address = "influx@remoteserver"
# [[postgresql.servers]]
# address = "influx@remoteserver"
`
func (p *Postgresql) SampleConfig() string {


@@ -17,8 +17,9 @@ type Prometheus struct {
}
var sampleConfig = `
# An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"]`
# An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"]
`
func (r *Prometheus) SampleConfig() string {
return sampleConfig


@@ -68,15 +68,15 @@ type Node struct {
}
var sampleConfig = `
# Specify servers via an array of tables
[[rabbitmq.servers]]
# url = "http://localhost:15672"
# username = "guest"
# password = "guest"
# Specify servers via an array of tables
[[rabbitmq.servers]]
# url = "http://localhost:15672"
# username = "guest"
# password = "guest"
# A list of nodes to pull metrics about. If not specified, metrics for
# all nodes are gathered.
# nodes = ["rabbit@node1", "rabbit@node2"]
# A list of nodes to pull metrics about. If not specified, metrics for
# all nodes are gathered.
# nodes = ["rabbit@node1", "rabbit@node2"]
`
func (r *RabbitMQ) SampleConfig() string {


@@ -21,12 +21,13 @@ type Redis struct {
}
var sampleConfig = `
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port add password. ie redis://localhost, redis://10.10.3.33:18832,
# 10.0.0.1:10000, etc.
#
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]`
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port add password. ie redis://localhost, redis://10.10.3.33:18832,
# 10.0.0.1:10000, etc.
#
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]
`
func (r *Redis) SampleConfig() string {
return sampleConfig


@@ -15,12 +15,13 @@ type RethinkDB struct {
}
var sampleConfig = `
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port add password. ie rethinkdb://user:auth_key@10.10.3.30:28105,
# rethinkdb://10.10.3.33:18832, 10.0.0.1:10000, etc.
#
# If no servers are specified, then 127.0.0.1 is used as the host and 28015 as the port.
servers = ["127.0.0.1:28015"]`
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port add password. ie rethinkdb://user:auth_key@10.10.3.30:28105,
# rethinkdb://10.10.3.33:18832, 10.0.0.1:10000, etc.
#
# If no servers are specified, then 127.0.0.1 is used as the host and 28015 as the port.
servers = ["127.0.0.1:28015"]
`
func (r *RethinkDB) SampleConfig() string {
return sampleConfig


@@ -26,10 +26,11 @@ func (_ *CPUStats) Description() string {
}
var sampleConfig = `
# Whether to report per-cpu stats or not
percpu = true
# Whether to report total system cpu stats or not
totalcpu = true`
# Whether to report per-cpu stats or not
percpu = true
# Whether to report total system cpu stats or not
totalcpu = true
`
func (_ *CPUStats) SampleConfig() string {
return sampleConfig


@@ -1,3 +1,5 @@
// +build linux
package system
import (


@@ -0,0 +1,118 @@
// +build linux
package system
import (
"testing"
"github.com/influxdb/telegraf/plugins/system/ps/cpu"
"github.com/influxdb/telegraf/plugins/system/ps/docker"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestDockerStats_GenerateStats(t *testing.T) {
var mps MockPS
var acc testutil.Accumulator
ds := &DockerContainerStat{
Name: "blah",
CPU: &cpu.CPUTimesStat{
CPU: "all",
User: 3.1,
System: 8.2,
Idle: 80.1,
Nice: 1.3,
Iowait: 0.2,
Irq: 0.1,
Softirq: 0.11,
Steal: 0.0001,
Guest: 8.1,
GuestNice: 0.324,
Stolen: 0.051,
},
Mem: &docker.CgroupMemStat{
ContainerID: "blah",
Cache: 1,
RSS: 2,
RSSHuge: 3,
MappedFile: 4,
Pgpgin: 5,
Pgpgout: 6,
Pgfault: 7,
Pgmajfault: 8,
InactiveAnon: 9,
ActiveAnon: 10,
InactiveFile: 11,
ActiveFile: 12,
Unevictable: 13,
HierarchicalMemoryLimit: 14,
TotalCache: 15,
TotalRSS: 16,
TotalRSSHuge: 17,
TotalMappedFile: 18,
TotalPgpgIn: 19,
TotalPgpgOut: 20,
TotalPgFault: 21,
TotalPgMajFault: 22,
TotalInactiveAnon: 23,
TotalActiveAnon: 24,
TotalInactiveFile: 25,
TotalActiveFile: 26,
TotalUnevictable: 27,
},
}
mps.On("DockerStat").Return([]*DockerContainerStat{ds}, nil)
err := (&DockerStats{&mps}).Gather(&acc)
require.NoError(t, err)
dockertags := map[string]string{
"name": "blah",
"id": "",
"command": "",
}
assert.True(t, acc.CheckTaggedValue("user", 3.1, dockertags))
assert.True(t, acc.CheckTaggedValue("system", 8.2, dockertags))
assert.True(t, acc.CheckTaggedValue("idle", 80.1, dockertags))
assert.True(t, acc.CheckTaggedValue("nice", 1.3, dockertags))
assert.True(t, acc.CheckTaggedValue("iowait", 0.2, dockertags))
assert.True(t, acc.CheckTaggedValue("irq", 0.1, dockertags))
assert.True(t, acc.CheckTaggedValue("softirq", 0.11, dockertags))
assert.True(t, acc.CheckTaggedValue("steal", 0.0001, dockertags))
assert.True(t, acc.CheckTaggedValue("guest", 8.1, dockertags))
assert.True(t, acc.CheckTaggedValue("guestNice", 0.324, dockertags))
assert.True(t, acc.CheckTaggedValue("stolen", 0.051, dockertags))
assert.True(t, acc.CheckTaggedValue("cache", uint64(1), dockertags))
assert.True(t, acc.CheckTaggedValue("rss", uint64(2), dockertags))
assert.True(t, acc.CheckTaggedValue("rss_huge", uint64(3), dockertags))
assert.True(t, acc.CheckTaggedValue("mapped_file", uint64(4), dockertags))
assert.True(t, acc.CheckTaggedValue("swap_in", uint64(5), dockertags))
assert.True(t, acc.CheckTaggedValue("swap_out", uint64(6), dockertags))
assert.True(t, acc.CheckTaggedValue("page_fault", uint64(7), dockertags))
assert.True(t, acc.CheckTaggedValue("page_major_fault", uint64(8), dockertags))
assert.True(t, acc.CheckTaggedValue("inactive_anon", uint64(9), dockertags))
assert.True(t, acc.CheckTaggedValue("active_anon", uint64(10), dockertags))
assert.True(t, acc.CheckTaggedValue("inactive_file", uint64(11), dockertags))
assert.True(t, acc.CheckTaggedValue("active_file", uint64(12), dockertags))
assert.True(t, acc.CheckTaggedValue("unevictable", uint64(13), dockertags))
assert.True(t, acc.CheckTaggedValue("memory_limit", uint64(14), dockertags))
assert.True(t, acc.CheckTaggedValue("total_cache", uint64(15), dockertags))
assert.True(t, acc.CheckTaggedValue("total_rss", uint64(16), dockertags))
assert.True(t, acc.CheckTaggedValue("total_rss_huge", uint64(17), dockertags))
assert.True(t, acc.CheckTaggedValue("total_mapped_file", uint64(18), dockertags))
assert.True(t, acc.CheckTaggedValue("total_swap_in", uint64(19), dockertags))
assert.True(t, acc.CheckTaggedValue("total_swap_out", uint64(20), dockertags))
assert.True(t, acc.CheckTaggedValue("total_page_fault", uint64(21), dockertags))
assert.True(t, acc.CheckTaggedValue("total_page_major_fault", uint64(22), dockertags))
assert.True(t, acc.CheckTaggedValue("total_inactive_anon", uint64(23), dockertags))
assert.True(t, acc.CheckTaggedValue("total_active_anon", uint64(24), dockertags))
assert.True(t, acc.CheckTaggedValue("total_inactive_file", uint64(25), dockertags))
assert.True(t, acc.CheckTaggedValue("total_active_file", uint64(26), dockertags))
assert.True(t, acc.CheckTaggedValue("total_unevictable", uint64(27), dockertags))
}


@@ -27,7 +27,7 @@ func (s *MemStats) Gather(acc plugins.Accumulator) error {
acc.Add("total", vm.Total, vmtags)
acc.Add("available", vm.Available, vmtags)
acc.Add("used", vm.Used, vmtags)
acc.Add("used_prec", vm.UsedPercent, vmtags)
acc.Add("used_perc", vm.UsedPercent, vmtags)
acc.Add("free", vm.Free, vmtags)
acc.Add("active", vm.Active, vmtags)
acc.Add("inactive", vm.Inactive, vmtags)


@@ -19,11 +19,11 @@ func (_ *NetIOStats) Description() string {
}
var netSampleConfig = `
# By default, telegraf gathers stats from any up interface (excluding loopback)
# Setting interfaces will tell it to gather these explicit interfaces,
# regardless of status.
#
# interfaces = ["eth0", ... ]
# By default, telegraf gathers stats from any up interface (excluding loopback)
# Setting interfaces will tell it to gather these explicit interfaces,
# regardless of status.
#
# interfaces = ["eth0", ... ]
`
func (_ *NetIOStats) SampleConfig() string {


@@ -53,7 +53,7 @@ func VirtualMemory() (*VirtualMemoryStat, error) {
}
ret := &VirtualMemoryStat{
Total: parsed[0] * p,
Total: parsed[0],
Free: parsed[1] * p,
}


@@ -30,17 +30,17 @@ func VirtualMemory() (*VirtualMemoryStat, error) {
}
switch key {
case "MemTotal":
ret.Total = t * 1000
ret.Total = t * 1024
case "MemFree":
ret.Free = t * 1000
ret.Free = t * 1024
case "Buffers":
ret.Buffers = t * 1000
ret.Buffers = t * 1024
case "Cached":
ret.Cached = t * 1000
ret.Cached = t * 1024
case "Active":
ret.Active = t * 1000
ret.Active = t * 1024
case "Inactive":
ret.Inactive = t * 1000
ret.Inactive = t * 1024
}
}
ret.Available = ret.Free + ret.Buffers + ret.Cached


@@ -7,7 +7,6 @@ import (
"github.com/influxdb/telegraf/plugins/system/ps/cpu"
"github.com/influxdb/telegraf/plugins/system/ps/disk"
"github.com/influxdb/telegraf/plugins/system/ps/docker"
"github.com/influxdb/telegraf/plugins/system/ps/load"
"github.com/influxdb/telegraf/plugins/system/ps/mem"
"github.com/influxdb/telegraf/plugins/system/ps/net"
@@ -129,56 +128,6 @@ func TestSystemStats_GenerateStats(t *testing.T) {
mps.On("SwapStat").Return(sms, nil)
ds := &DockerContainerStat{
Name: "blah",
CPU: &cpu.CPUTimesStat{
CPU: "all",
User: 3.1,
System: 8.2,
Idle: 80.1,
Nice: 1.3,
Iowait: 0.2,
Irq: 0.1,
Softirq: 0.11,
Steal: 0.0001,
Guest: 8.1,
GuestNice: 0.324,
Stolen: 0.051,
},
Mem: &docker.CgroupMemStat{
ContainerID: "blah",
Cache: 1,
RSS: 2,
RSSHuge: 3,
MappedFile: 4,
Pgpgin: 5,
Pgpgout: 6,
Pgfault: 7,
Pgmajfault: 8,
InactiveAnon: 9,
ActiveAnon: 10,
InactiveFile: 11,
ActiveFile: 12,
Unevictable: 13,
HierarchicalMemoryLimit: 14,
TotalCache: 15,
TotalRSS: 16,
TotalRSSHuge: 17,
TotalMappedFile: 18,
TotalPgpgIn: 19,
TotalPgpgOut: 20,
TotalPgFault: 21,
TotalPgMajFault: 22,
TotalInactiveAnon: 23,
TotalActiveAnon: 24,
TotalInactiveFile: 25,
TotalActiveFile: 26,
TotalUnevictable: 27,
},
}
mps.On("DockerStat").Return([]*DockerContainerStat{ds}, nil)
ss := &SystemStats{ps: &mps}
err := ss.Gather(&acc)
@@ -310,7 +259,7 @@ func TestSystemStats_GenerateStats(t *testing.T) {
assert.True(t, acc.CheckTaggedValue("total", uint64(12400), vmtags))
assert.True(t, acc.CheckTaggedValue("available", uint64(7600), vmtags))
assert.True(t, acc.CheckTaggedValue("used", uint64(5000), vmtags))
assert.True(t, acc.CheckTaggedValue("used_prec", float64(47.1), vmtags))
assert.True(t, acc.CheckTaggedValue("used_perc", float64(47.1), vmtags))
assert.True(t, acc.CheckTaggedValue("free", uint64(1235), vmtags))
assert.True(t, acc.CheckTaggedValue("active", uint64(8134), vmtags))
assert.True(t, acc.CheckTaggedValue("inactive", uint64(1124), vmtags))
@@ -332,55 +281,6 @@ func TestSystemStats_GenerateStats(t *testing.T) {
assert.NoError(t, acc.ValidateTaggedValue("free", uint64(6412), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("in", uint64(7), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("out", uint64(830), swaptags))
err = (&DockerStats{&mps}).Gather(&acc)
require.NoError(t, err)
dockertags := map[string]string{
"name": "blah",
"id": "",
"command": "",
}
assert.True(t, acc.CheckTaggedValue("user", 3.1, dockertags))
assert.True(t, acc.CheckTaggedValue("system", 8.2, dockertags))
assert.True(t, acc.CheckTaggedValue("idle", 80.1, dockertags))
assert.True(t, acc.CheckTaggedValue("nice", 1.3, dockertags))
assert.True(t, acc.CheckTaggedValue("iowait", 0.2, dockertags))
assert.True(t, acc.CheckTaggedValue("irq", 0.1, dockertags))
assert.True(t, acc.CheckTaggedValue("softirq", 0.11, dockertags))
assert.True(t, acc.CheckTaggedValue("steal", 0.0001, dockertags))
assert.True(t, acc.CheckTaggedValue("guest", 8.1, dockertags))
assert.True(t, acc.CheckTaggedValue("guestNice", 0.324, dockertags))
assert.True(t, acc.CheckTaggedValue("stolen", 0.051, dockertags))
assert.True(t, acc.CheckTaggedValue("cache", uint64(1), dockertags))
assert.True(t, acc.CheckTaggedValue("rss", uint64(2), dockertags))
assert.True(t, acc.CheckTaggedValue("rss_huge", uint64(3), dockertags))
assert.True(t, acc.CheckTaggedValue("mapped_file", uint64(4), dockertags))
assert.True(t, acc.CheckTaggedValue("swap_in", uint64(5), dockertags))
assert.True(t, acc.CheckTaggedValue("swap_out", uint64(6), dockertags))
assert.True(t, acc.CheckTaggedValue("page_fault", uint64(7), dockertags))
assert.True(t, acc.CheckTaggedValue("page_major_fault", uint64(8), dockertags))
assert.True(t, acc.CheckTaggedValue("inactive_anon", uint64(9), dockertags))
assert.True(t, acc.CheckTaggedValue("active_anon", uint64(10), dockertags))
assert.True(t, acc.CheckTaggedValue("inactive_file", uint64(11), dockertags))
assert.True(t, acc.CheckTaggedValue("active_file", uint64(12), dockertags))
assert.True(t, acc.CheckTaggedValue("unevictable", uint64(13), dockertags))
assert.True(t, acc.CheckTaggedValue("memory_limit", uint64(14), dockertags))
assert.True(t, acc.CheckTaggedValue("total_cache", uint64(15), dockertags))
assert.True(t, acc.CheckTaggedValue("total_rss", uint64(16), dockertags))
assert.True(t, acc.CheckTaggedValue("total_rss_huge", uint64(17), dockertags))
assert.True(t, acc.CheckTaggedValue("total_mapped_file", uint64(18), dockertags))
assert.True(t, acc.CheckTaggedValue("total_swap_in", uint64(19), dockertags))
assert.True(t, acc.CheckTaggedValue("total_swap_out", uint64(20), dockertags))
assert.True(t, acc.CheckTaggedValue("total_page_fault", uint64(21), dockertags))
assert.True(t, acc.CheckTaggedValue("total_page_major_fault", uint64(22), dockertags))
assert.True(t, acc.CheckTaggedValue("total_inactive_anon", uint64(23), dockertags))
assert.True(t, acc.CheckTaggedValue("total_active_anon", uint64(24), dockertags))
assert.True(t, acc.CheckTaggedValue("total_inactive_file", uint64(25), dockertags))
assert.True(t, acc.CheckTaggedValue("total_active_file", uint64(26), dockertags))
assert.True(t, acc.CheckTaggedValue("total_unevictable", uint64(27), dockertags))
}
// Asserts that a given accumulator contains a measurment of type float64 with


@@ -74,10 +74,6 @@ percpu = false
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]
# Read metrics about docker containers
[docker]
# no configuration
# Read stats from one or more Elasticsearch servers or clusters
[elasticsearch]


@@ -4,6 +4,9 @@ import (
"net"
"net/url"
"os"
"time"
"github.com/influxdb/influxdb/client"
)
var localhost = "localhost"
@@ -27,3 +30,17 @@ func GetLocalHost() string {
}
return localhost
}
// MockBatchPoints returns a mock BatchPoints object for using in unit tests
// of telegraf output sinks.
func MockBatchPoints() client.BatchPoints {
var bp client.BatchPoints
bp.Time = time.Now()
bp.Tags = map[string]string{"tag1": "value1"}
bp.Points = []client.Point{
{
Fields: map[string]interface{}{"value": 1.0},
},
}
return bp
}