Merge branch 'master' into cgroup-plugin
commit 8b2c8f75ca
CHANGELOG.md

@@ -1,4 +1,4 @@
-## v1.0 [unreleased]
+## v1.0 beta 1 [2016-06-07]
 
 ### Release Notes
 
@@ -22,6 +22,7 @@ in conjunction with wildcard dimension values as it will control the amount of
 time before a new metric is included by the plugin.
 
 ### Features
+- [#1261](https://github.com/influxdata/telegraf/pull/1261): Add graylog input plugin.
 - [#1294](https://github.com/influxdata/telegraf/pull/1294): consul input plugin. Thanks @harnash
 - [#1164](https://github.com/influxdata/telegraf/pull/1164): conntrack input plugin. Thanks @robinpercy!
 - [#1165](https://github.com/influxdata/telegraf/pull/1165): vmstat input plugin. Thanks @jshim-xm!
@@ -46,6 +47,9 @@ time before a new metric is included by the plugin.
 - [#1268](https://github.com/influxdata/telegraf/pull/1268): Fix potential influxdb input type assertion panic.
 - [#1283](https://github.com/influxdata/telegraf/pull/1283): Still send processes metrics if a process exited during metric collection.
 - [#1297](https://github.com/influxdata/telegraf/issues/1297): disk plugin panic when usage grab fails.
+- [#1316](https://github.com/influxdata/telegraf/pull/1316): Removed leaked "database" tag on redis metrics. Thanks @PierreF!
+- [#1323](https://github.com/influxdata/telegraf/issues/1323): Processes plugin: fix potential error with /proc/net/stat directory.
+- [#1322](https://github.com/influxdata/telegraf/issues/1322): Fix rare RHEL 5.2 panic in gopsutil diskio gathering function.
 
 ## v0.13.1 [2016-05-24]
 
Godeps (2 changes)

@@ -43,7 +43,7 @@ github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6
 github.com/prometheus/common e8eabff8812b05acf522b45fdcd725a785188e37
 github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8
 github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f
-github.com/shirou/gopsutil 83c6e72cbdef6e8ada934549abf700ff0ba96776
+github.com/shirou/gopsutil 586bb697f3ec9f8ec08ffefe18f521a64534037c
 github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d
 github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744
 github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c
Makefile (6 changes)

@@ -64,7 +64,6 @@ endif
 	docker run --name memcached -p "11211:11211" -d memcached
 	docker run --name postgres -p "5432:5432" -d postgres
 	docker run --name rabbitmq -p "15672:15672" -p "5672:5672" -d rabbitmq:3-management
-	docker run --name opentsdb -p "4242:4242" -d petergrace/opentsdb-docker
 	docker run --name redis -p "6379:6379" -d redis
 	docker run --name aerospike -p "3000:3000" -d aerospike
 	docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
@@ -79,7 +78,6 @@ docker-run-circle:
 		-e ADVERTISED_PORT=9092 \
 		-p "2181:2181" -p "9092:9092" \
 		-d spotify/kafka
-	docker run --name opentsdb -p "4242:4242" -d petergrace/opentsdb-docker
 	docker run --name aerospike -p "3000:3000" -d aerospike
 	docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
 	docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
@@ -88,8 +86,8 @@ docker-run-circle:
 
 # Kill all docker containers, ignore errors
 docker-kill:
-	-docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp
-	-docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp
+	-docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann snmp
+	-docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann snmp
 
 # Run full unit tests using docker containers (includes setup and teardown)
 test: vet docker-kill docker-run
README.md (19 changes)

@@ -20,12 +20,12 @@ new plugins.
 ### Linux deb and rpm Packages:
 
 Latest:
-* https://dl.influxdata.com/telegraf/releases/telegraf_0.13.1_amd64.deb
-* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.1.x86_64.rpm
+* https://dl.influxdata.com/telegraf/releases/telegraf_1.0.0-beta1_amd64.deb
+* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0_beta1.x86_64.rpm
 
 Latest (arm):
-* https://dl.influxdata.com/telegraf/releases/telegraf_0.13.1_armhf.deb
-* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.1.armhf.rpm
+* https://dl.influxdata.com/telegraf/releases/telegraf_1.0.0-beta1_armhf.deb
+* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0_beta1.armhf.rpm
 
 ##### Package Instructions:
 
@@ -46,14 +46,14 @@ to use this repo to install & update telegraf.
 ### Linux tarballs:
 
 Latest:
-* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.1_linux_amd64.tar.gz
-* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.1_linux_i386.tar.gz
-* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.1_linux_armhf.tar.gz
+* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta1_linux_amd64.tar.gz
+* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta1_linux_i386.tar.gz
+* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta1_linux_armhf.tar.gz
 
 ### FreeBSD tarball:
 
 Latest:
-* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.1_freebsd_amd64.tar.gz
+* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta1_freebsd_amd64.tar.gz
 
 ### Ansible Role:
 
@@ -69,8 +69,7 @@ brew install telegraf
 ### Windows Binaries (EXPERIMENTAL)
 
 Latest:
-* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.1_windows_amd64.zip
-* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.1_windows_i386.zip
+* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta1_windows_amd64.zip
 
 ### From Source:
 
internal/errchan/errchan.go (new file)

@@ -0,0 +1,37 @@
+package errchan
+
+import (
+	"fmt"
+	"strings"
+)
+
+type ErrChan struct {
+	C chan error
+}
+
+// New returns an error channel of max length 'n'
+// errors can be sent to the ErrChan.C channel, and will be returned when
+// ErrChan.Error() is called.
+func New(n int) *ErrChan {
+	return &ErrChan{
+		C: make(chan error, n),
+	}
+}
+
+// Error closes the ErrChan.C channel and returns an error if there are any
+// non-nil errors, otherwise returns nil.
+func (e *ErrChan) Error() error {
+	close(e.C)
+
+	var out string
+	for err := range e.C {
+		if err != nil {
+			out += "[" + err.Error() + "], "
+		}
+	}
+
+	if out != "" {
+		return fmt.Errorf("Errors encountered: " + strings.TrimRight(out, ", "))
+	}
+	return nil
+}
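Note: the errchan helper above (its file path is inferred from the `github.com/influxdata/telegraf/internal/errchan` import added in the plugins below) centralizes the error fan-in that each input plugin previously hand-rolled. A minimal usage sketch follows; the code is hypothetical, and since this is an `internal` package it is only importable from files inside the telegraf source tree:

```go
package example // assumes this file lives inside the telegraf repo

import (
	"fmt"

	"github.com/influxdata/telegraf/internal/errchan"
)

// gatherOne is a hypothetical stand-in for a plugin's per-target work.
func gatherOne(i int) error {
	if i == 2 {
		return fmt.Errorf("target %d unreachable", i)
	}
	return nil
}

func Example() error {
	n := 4
	ec := errchan.New(n) // buffer one slot per worker so sends never block

	for i := 0; i < n; i++ {
		ec.C <- gatherOne(i) // nil errors are fine; Error() filters them out
	}

	// Error() closes ec.C, drains it, and joins the non-nil errors into a
	// single error (or returns nil). All sends must happen before this
	// call: sending on the closed channel would panic.
	return ec.Error()
}
```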
@@ -3,6 +3,7 @@ package cloudwatch
 import (
 	"fmt"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/aws/aws-sdk-go/aws"
@@ -12,6 +13,7 @@ import (
 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/internal"
 	internalaws "github.com/influxdata/telegraf/internal/config/aws"
+	"github.com/influxdata/telegraf/internal/errchan"
 	"github.com/influxdata/telegraf/internal/limiter"
 	"github.com/influxdata/telegraf/plugins/inputs"
 )
@@ -166,7 +168,7 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
 	}
 
 	metricCount := len(metrics)
-	var errChan = make(chan error, metricCount)
+	errChan := errchan.New(metricCount)
 
 	now := time.Now()
 
@@ -175,18 +177,18 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
 	// http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
 	lmtr := limiter.NewRateLimiter(10, time.Second)
 	defer lmtr.Stop()
+	var wg sync.WaitGroup
+	wg.Add(len(metrics))
 	for _, m := range metrics {
 		<-lmtr.C
-		go c.gatherMetric(acc, m, now, errChan)
+		go func(inm *cloudwatch.Metric) {
+			defer wg.Done()
+			c.gatherMetric(acc, inm, now, errChan.C)
+		}(m)
 	}
+	wg.Wait()
 
-	for i := 1; i <= metricCount; i++ {
-		err := <-errChan
-		if err != nil {
-			return err
-		}
-	}
-	return nil
+	return errChan.Error()
 }
 
 func init() {
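Note the ordering the new code establishes: `errchan.Error()` closes the underlying channel, and a send on a closed channel panics, so the added `sync.WaitGroup` guarantees every `gatherMetric` goroutine has delivered its result before `Error()` runs. The behavior also changes slightly for the better: the old loop returned on the first error received, while `errChan.Error()` waits for all metrics and reports every failure.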
@@ -2,14 +2,13 @@ package elasticsearch
 
 import (
 	"encoding/json"
-	"errors"
 	"fmt"
 	"net/http"
-	"strings"
 	"sync"
 	"time"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal/errchan"
 	"github.com/influxdata/telegraf/plugins/inputs"
 	jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
 )
@@ -102,7 +101,7 @@ func (e *Elasticsearch) Description() string {
 // Gather reads the stats from Elasticsearch and writes it to the
 // Accumulator.
 func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
-	errChan := make(chan error, len(e.Servers))
+	errChan := errchan.New(len(e.Servers))
 	var wg sync.WaitGroup
 	wg.Add(len(e.Servers))
 
@@ -116,7 +115,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
 			url = s + statsPath
 		}
 		if err := e.gatherNodeStats(url, acc); err != nil {
-			errChan <- err
+			errChan.C <- err
 			return
 		}
 		if e.ClusterHealth {
@@ -126,17 +125,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
 	}
 
 	wg.Wait()
-	close(errChan)
-	// Get all errors and return them as one giant error
-	errStrings := []string{}
-	for err := range errChan {
-		errStrings = append(errStrings, err.Error())
-	}
-
-	if len(errStrings) == 0 {
-		return nil
-	}
-	return errors.New(strings.Join(errStrings, "\n"))
+	return errChan.Error()
 }
 
 func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {
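Behavior is preserved here: the deleted block already aggregated every error into one (joined with newlines), and `errchan.Error()` does the same, just formatted as `Errors encountered: [err1], [err2]` per the helper above. The now-unused `errors` and `strings` imports are dropped accordingly.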
@@ -11,39 +11,22 @@ all scripts matching glob pattern ```/tmp/collect_*.sh``` are configured for ```
 in JSON format. Glob patterns are matched on every run, so adding new scripts that match the pattern
 will cause them to be picked up immediately.
 
-```
+```toml
 # Read flattened metrics from one or more commands that output JSON to stdout
 [[inputs.exec]]
   # Shell/commands array
   # Full command line to executable with parameters, or a glob pattern to run all matching files.
   commands = ["/tmp/test.sh", "/tmp/test2.sh", "/tmp/collect_*.sh"]
 
+  ## Timeout for each command to complete.
+  timeout = "5s"
+
   # Data format to consume.
   # NOTE json only reads numerical measurements, strings and booleans are ignored.
   data_format = "json"
 
   # measurement name suffix (for separating different commands)
   name_suffix = "_mycollector"
-
-  ## Below configuration will be used for data_format = "graphite", can be ignored for other data_format
-  ## If matching multiple measurement files, this string will be used to join the matched values.
-  #separator = "."
-
-  ## Each template line requires a template pattern. It can have an optional
-  ## filter before the template and separated by spaces. It can also have optional extra
-  ## tags following the template. Multiple tags should be separated by commas and no spaces
-  ## similar to the line protocol format. The can be only one default template.
-  ## Templates support below format:
-  ## 1. filter + template
-  ## 2. filter + template + extra tag
-  ## 3. filter + template with field key
-  ## 4. default template
-  #templates = [
-  #  "*.app env.service.resource.measurement",
-  #  "stats.* .host.measurement* region=us-west,agent=sensu",
-  #  "stats2.* .host.measurement.field",
-  #  "measurement*"
-  #]
 ```
 
 Other options for modifying the measurement names are:
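An illustrative example (not taken from the diff): with the configuration above, a script such as `/tmp/test.sh` that prints `{"status": "ok", "load": 0.5, "mem_free": 1024}` yields the numeric fields `load=0.5` and `mem_free=1024`, while the string `status` is ignored, as the NOTE in the config says. Assuming the plugin's default measurement name `exec`, the configured `name_suffix` makes the resulting measurement `exec_mycollector`.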
@@ -82,7 +65,7 @@ in influx line-protocol format.
 
 #### Configuration
 
-```
+```toml
 [[inputs.exec]]
   # Shell/commands array
   # compatible with old version
@@ -90,6 +73,9 @@ in influx line-protocol format.
   # command = "/usr/bin/line_protocol_collector"
   commands = ["/usr/bin/line_protocol_collector","/tmp/test2.sh"]
 
+  ## Timeout for each command to complete.
+  timeout = "5s"
+
   # Data format to consume.
   # NOTE json only reads numerical measurements, strings and booleans are ignored.
   data_format = "influx"
@@ -123,12 +109,16 @@ We can also change the data_format to "graphite" to use the metrics collecting s
 In this example a script called /tmp/test.sh and a script called /tmp/test2.sh are configured for [[inputs.exec]] in graphite format.
 
 #### Configuration
-```
+
+```toml
 # Read flattened metrics from one or more commands that output JSON to stdout
 [[inputs.exec]]
   # Shell/commands array
   commands = ["/tmp/test.sh","/tmp/test2.sh"]
 
+  ## Timeout for each command to complete.
+  timeout = "5s"
+
   # Data format to consume.
   # NOTE json only reads numerical measurements, strings and booleans are ignored.
   data_format = "graphite"
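For reference, and as an assumption rather than part of this README: a graphite-format script emits plaintext lines of the form `servers.localhost.cpu.loadavg 0.42 1466000000` (a dot-delimited metric path, a value, and an optional Unix timestamp), which the template mechanism described earlier then splits into measurement name, tags, and fields.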
@@ -14,6 +14,7 @@ import (
 
 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/internal"
+	"github.com/influxdata/telegraf/internal/errchan"
 	"github.com/influxdata/telegraf/plugins/inputs"
 	"github.com/influxdata/telegraf/plugins/parsers"
 	"github.com/influxdata/telegraf/plugins/parsers/nagios"
@@ -182,23 +183,15 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
 		}
 	}
 
-	e.errChan = make(chan error, len(commands))
+	errChan := errchan.New(len(commands))
+	e.errChan = errChan.C
 
 	e.wg.Add(len(commands))
 	for _, command := range commands {
 		go e.ProcessCommand(command, acc)
 	}
 	e.wg.Wait()
-
-	select {
-	default:
-		close(e.errChan)
-		return nil
-	case err := <-e.errChan:
-		close(e.errChan)
-		return err
-	}
+	return errChan.Error()
 }
 
 func init() {
@@ -3,8 +3,6 @@ package haproxy
 import (
 	"encoding/csv"
 	"fmt"
-	"github.com/influxdata/telegraf"
-	"github.com/influxdata/telegraf/plugins/inputs"
 	"io"
 	"net"
 	"net/http"
@@ -13,6 +11,10 @@ import (
 	"strings"
 	"sync"
 	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal/errchan"
+	"github.com/influxdata/telegraf/plugins/inputs"
 )
 
 //CSV format: https://cbonte.github.io/haproxy-dconv/configuration-1.5.html#9.1
@@ -113,20 +115,17 @@ func (g *haproxy) Gather(acc telegraf.Accumulator) error {
 	}
 
 	var wg sync.WaitGroup
-	var outerr error
-
-	for _, serv := range g.Servers {
-		wg.Add(1)
+	errChan := errchan.New(len(g.Servers))
+	wg.Add(len(g.Servers))
+	for _, server := range g.Servers {
 		go func(serv string) {
 			defer wg.Done()
-			outerr = g.gatherServer(serv, acc)
-		}(serv)
+			errChan.C <- g.gatherServer(serv, acc)
+		}(server)
 	}
 
 	wg.Wait()
-
-	return outerr
+	return errChan.Error()
 }
 
 func (g *haproxy) gatherServerSocket(addr string, acc telegraf.Accumulator) error {
@@ -5,9 +5,11 @@ import (
 	"fmt"
 	"net/http"
 	"strconv"
+	"sync"
 	"time"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal/errchan"
 	"github.com/influxdata/telegraf/plugins/inputs"
 )
 
@@ -129,20 +131,18 @@ func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
 		}
 	}
 
-	var errChan = make(chan error, len(gatherFunctions))
+	var wg sync.WaitGroup
+	wg.Add(len(gatherFunctions))
+	errChan := errchan.New(len(gatherFunctions))
 	for _, f := range gatherFunctions {
-		go f(r, acc, errChan)
+		go func(gf gatherFunc) {
+			defer wg.Done()
+			gf(r, acc, errChan.C)
+		}(f)
 	}
+	wg.Wait()
 
-	for i := 1; i <= len(gatherFunctions); i++ {
-		err := <-errChan
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
+	return errChan.Error()
 }
 
 func (r *RabbitMQ) requestJSON(u string, target interface{}) error {
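The `gatherFunc` type used in the rewritten loop is not shown in this diff. Judging from the call site `gf(r, acc, errChan.C)`, its declaration elsewhere in the file is presumably along these lines (a hypothetical sketch, not taken from the diff):

```go
// Hypothetical declaration inferred from usage above: each gather helper
// receives the plugin, the accumulator, and a channel on which to report
// its error (or nil).
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error)
```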
@@ -241,10 +241,14 @@ func gatherKeyspaceLine(
 	name string,
 	line string,
 	acc telegraf.Accumulator,
-	tags map[string]string,
+	global_tags map[string]string,
 ) {
 	if strings.Contains(line, "keys=") {
 		fields := make(map[string]interface{})
+		tags := make(map[string]string)
+		for k, v := range global_tags {
+			tags[k] = v
+		}
 		tags["database"] = name
 		dbparts := strings.Split(line, ",")
 		for _, dbp := range dbparts {
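This is the fix for the leaked "database" tag called out in the changelog (#1316): previously the function wrote `tags["database"] = name` into the caller's shared tag map, so the tag stuck to every metric gathered afterwards. Copying `global_tags` into a fresh map keeps the mutation local, which is also why the test below now asserts against a separate `keyspaceTags` map.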
@@ -35,6 +35,7 @@ func TestRedis_ParseMetrics(t *testing.T) {
 	err := gatherInfoOutput(rdr, &acc, tags)
 	require.NoError(t, err)
 
+	tags = map[string]string{"host": "redis.net", "role": "master"}
 	fields := map[string]interface{}{
 		"uptime":  uint64(238),
 		"clients": uint64(1),
@@ -70,13 +71,14 @@ func TestRedis_ParseMetrics(t *testing.T) {
 		"used_cpu_user_children": float64(0.00),
 		"keyspace_hitrate":       float64(0.50),
 	}
+	keyspaceTags := map[string]string{"host": "redis.net", "role": "master", "database": "db0"}
 	keyspaceFields := map[string]interface{}{
 		"avg_ttl": uint64(0),
 		"expires": uint64(0),
 		"keys":    uint64(2),
 	}
 	acc.AssertContainsTaggedFields(t, "redis", fields, tags)
-	acc.AssertContainsTaggedFields(t, "redis_keyspace", keyspaceFields, tags)
+	acc.AssertContainsTaggedFields(t, "redis_keyspace", keyspaceFields, keyspaceTags)
 }
 
 const testOutput = `# Server
@@ -18,7 +18,7 @@ It is supposed to be used to monitor actual memory usage in a cross platform fas
 designed for informational purposes only.
 - **free**: memory not being used at all (zeroed) that is readily available; note
   that this doesn't reflect the actual memory available (use 'available' instead).
-- **used_percent**: the percentage usage calculated as `(total - used) / total * 100`
+- **used_percent**: the percentage usage calculated as `used / total * 100`
 
 ## Measurements:
 #### Raw Memory measurements:
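A quick sanity check of the corrected formula, with illustrative numbers: for total = 8 GiB and used = 2 GiB, `used / total * 100` gives 25% used, whereas the old `(total - used) / total * 100` would have reported 75%, i.e. the free fraction rather than the used one.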
@@ -9,7 +9,7 @@ import (
 	"log"
 	"os"
 	"os/exec"
-	"path"
+	"path/filepath"
 	"runtime"
 	"strconv"
 
@@ -19,7 +19,7 @@ import (
 
 type Processes struct {
 	execPS       func() ([]byte, error)
-	readProcFile func(statFile string) ([]byte, error)
+	readProcFile func(filename string) ([]byte, error)
 
 	forcePS   bool
 	forceProc bool
@@ -128,22 +128,16 @@ func (p *Processes) gatherFromPS(fields map[string]interface{}) error {
 
 // get process states from /proc/(pid)/stat files
 func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
-	files, err := ioutil.ReadDir("/proc")
+	filenames, err := filepath.Glob("/proc/[0-9]*/stat")
 	if err != nil {
 		return err
 	}
 
-	for _, file := range files {
-		if !file.IsDir() {
-			continue
-		}
-
-		statFile := path.Join("/proc", file.Name(), "stat")
-		data, err := p.readProcFile(statFile)
+	for _, filename := range filenames {
+		_, err := os.Stat(filename)
+
+		data, err := p.readProcFile(filename)
 		if err != nil {
-			if !file.IsDir() {
-				continue
-			}
 			return err
 		}
 		if data == nil {
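Replacing `ioutil.ReadDir("/proc")` with `filepath.Glob("/proc/[0-9]*/stat")` is the fix for #1323 noted in the changelog: the old loop visited every directory under /proc, including `/proc/net`, and then tried to read `/proc/net/stat`, which is a directory rather than a per-process stat file. The glob only matches entries whose names start with a digit. A self-contained sketch of the pattern:

```go
package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	// Matches /proc/1/stat, /proc/4242/stat, ... but not /proc/net/stat,
	// because the pattern requires the directory name to begin with a digit.
	stats, err := filepath.Glob("/proc/[0-9]*/stat")
	if err != nil {
		// Glob only errors on a malformed pattern, not on missing files.
		panic(err)
	}
	fmt.Printf("found %d candidate processes\n", len(stats))
}
```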
@@ -159,7 +153,7 @@ func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
 
 		stats := bytes.Fields(data)
 		if len(stats) < 3 {
-			return fmt.Errorf("Something is terribly wrong with %s", statFile)
+			return fmt.Errorf("Something is terribly wrong with %s", filename)
 		}
 		switch stats[0][0] {
 		case 'R':
@@ -176,7 +170,7 @@ func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
 			fields["paging"] = fields["paging"].(int64) + int64(1)
 		default:
 			log.Printf("processes: Unknown state [ %s ] in file %s",
-				string(stats[0][0]), statFile)
+				string(stats[0][0]), filename)
 		}
 		fields["total"] = fields["total"].(int64) + int64(1)
 
@@ -190,15 +184,12 @@ func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
 	return nil
 }
 
-func readProcFile(statFile string) ([]byte, error) {
-	if _, err := os.Stat(statFile); os.IsNotExist(err) {
-		return nil, nil
-	} else if err != nil {
-		return nil, err
-	}
-
-	data, err := ioutil.ReadFile(statFile)
+func readProcFile(filename string) ([]byte, error) {
+	data, err := ioutil.ReadFile(filename)
 	if err != nil {
+		if os.IsNotExist(err) {
+			return nil, nil
+		}
 		return nil, err
 	}
 
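Folding the old `os.Stat` pre-check into the read path also narrows a race: a process can exit between the glob and the read, in which case `ioutil.ReadFile` fails with a not-exist error; returning `(nil, nil)` lets `gatherFromProc` skip that process (via its `data == nil` check) instead of failing the whole gather, in line with the #1283 changelog entry about still sending metrics when a process exits during collection.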
@@ -3,9 +3,8 @@ package opentsdb
 import (
 	"reflect"
 	"testing"
-
-	"github.com/influxdata/telegraf/testutil"
-	"github.com/stretchr/testify/require"
+	// "github.com/influxdata/telegraf/testutil"
+	// "github.com/stretchr/testify/require"
 )
 
 func TestBuildTagsTelnet(t *testing.T) {
@@ -42,40 +41,40 @@ func TestBuildTagsTelnet(t *testing.T) {
 		}
 	}
 }
 
-func TestWrite(t *testing.T) {
-	if testing.Short() {
-		t.Skip("Skipping integration test in short mode")
-	}
+// func TestWrite(t *testing.T) {
+// 	if testing.Short() {
+// 		t.Skip("Skipping integration test in short mode")
+// 	}
 
-	o := &OpenTSDB{
-		Host:   testutil.GetLocalHost(),
-		Port:   4242,
-		Prefix: "prefix.test.",
-	}
+// 	o := &OpenTSDB{
+// 		Host:   testutil.GetLocalHost(),
+// 		Port:   4242,
+// 		Prefix: "prefix.test.",
+// 	}
 
-	// Verify that we can connect to the OpenTSDB instance
-	err := o.Connect()
-	require.NoError(t, err)
+// 	// Verify that we can connect to the OpenTSDB instance
+// 	err := o.Connect()
+// 	require.NoError(t, err)
 
-	// Verify that we can successfully write data to OpenTSDB
-	err = o.Write(testutil.MockMetrics())
-	require.NoError(t, err)
+// 	// Verify that we can successfully write data to OpenTSDB
+// 	err = o.Write(testutil.MockMetrics())
+// 	require.NoError(t, err)
 
-	// Verify postive and negative test cases of writing data
-	metrics := testutil.MockMetrics()
-	metrics = append(metrics, testutil.TestMetric(float64(1.0),
-		"justametric.float"))
-	metrics = append(metrics, testutil.TestMetric(int64(123456789),
-		"justametric.int"))
-	metrics = append(metrics, testutil.TestMetric(uint64(123456789012345),
-		"justametric.uint"))
-	metrics = append(metrics, testutil.TestMetric("Lorem Ipsum",
-		"justametric.string"))
-	metrics = append(metrics, testutil.TestMetric(float64(42.0),
-		"justametric.anotherfloat"))
-	metrics = append(metrics, testutil.TestMetric(float64(42.0),
-		"metric w/ specialchars"))
+// 	// Verify postive and negative test cases of writing data
+// 	metrics := testutil.MockMetrics()
+// 	metrics = append(metrics, testutil.TestMetric(float64(1.0),
+// 		"justametric.float"))
+// 	metrics = append(metrics, testutil.TestMetric(int64(123456789),
+// 		"justametric.int"))
+// 	metrics = append(metrics, testutil.TestMetric(uint64(123456789012345),
+// 		"justametric.uint"))
+// 	metrics = append(metrics, testutil.TestMetric("Lorem Ipsum",
+// 		"justametric.string"))
+// 	metrics = append(metrics, testutil.TestMetric(float64(42.0),
+// 		"justametric.anotherfloat"))
+// 	metrics = append(metrics, testutil.TestMetric(float64(42.0),
+// 		"metric w/ specialchars"))
 
-	err = o.Write(metrics)
-	require.NoError(t, err)
-}
+// 	err = o.Write(metrics)
+// 	require.NoError(t, err)
+// }
 
@@ -85,7 +85,7 @@ targets = {
 supported_builds = {
     "darwin": [ "amd64" ],
     "windows": [ "amd64" ],
-    "linux": [ "amd64", "i386", "armhf", "armel", "arm64" ],
+    "linux": [ "amd64", "i386", "armhf", "armel", "arm64", "static_amd64" ],
     "freebsd": [ "amd64" ]
 }
 
@@ -553,7 +553,7 @@ def package(build_output, pkg_name, version, nightly=False, iteration=1, static=
     build_root = os.path.join(tmp_build_dir,
                               platform,
                               arch,
-                              '{}-{}-{}'.format(PACKAGE_NAME, version, iteration))
+                              PACKAGE_NAME)
     os.makedirs(build_root)
 
     # Copy packaging scripts to build directory