Merge pull request #1 from influxdata/master

Update from source
This commit is contained in:
Vladimir S 2016-06-07 15:06:37 +03:00
commit a1ab8ffc56
17 changed files with 165 additions and 158 deletions

View File

@ -1,4 +1,4 @@
## v1.0 [unreleased]
## v1.0 beta 1 [2016-06-07]
### Release Notes
@ -22,6 +22,7 @@ in conjunction with wildcard dimension values as it will control the amount of
time before a new metric is included by the plugin.
### Features
- [#1262](https://github.com/influxdata/telegraf/pull/1261): Add graylog input pluging.
- [#1294](https://github.com/influxdata/telegraf/pull/1294): consul input plugin. Thanks @harnash
- [#1164](https://github.com/influxdata/telegraf/pull/1164): conntrack input plugin. Thanks @robinpercy!
- [#1165](https://github.com/influxdata/telegraf/pull/1165): vmstat input plugin. Thanks @jshim-xm!
@ -46,6 +47,9 @@ time before a new metric is included by the plugin.
- [#1268](https://github.com/influxdata/telegraf/pull/1268): Fix potential influxdb input type assertion panic.
- [#1283](https://github.com/influxdata/telegraf/pull/1283): Still send processes metrics if a process exited during metric collection.
- [#1297](https://github.com/influxdata/telegraf/issues/1297): disk plugin panic when usage grab fails.
- [#1316](https://github.com/influxdata/telegraf/pull/1316): Removed leaked "database" tag on redis metrics. Thanks @PierreF!
- [#1323](https://github.com/influxdata/telegraf/issues/1323): Processes plugin: fix potential error with /proc/net/stat directory.
- [#1322](https://github.com/influxdata/telegraf/issues/1322): Fix rare RHEL 5.2 panic in gopsutil diskio gathering function.
## v0.13.1 [2016-05-24]

2
Godeps
View File

@ -43,7 +43,7 @@ github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6
github.com/prometheus/common e8eabff8812b05acf522b45fdcd725a785188e37
github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8
github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f
github.com/shirou/gopsutil 83c6e72cbdef6e8ada934549abf700ff0ba96776
github.com/shirou/gopsutil 586bb697f3ec9f8ec08ffefe18f521a64534037c
github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d
github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744
github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c

View File

@ -64,7 +64,6 @@ endif
docker run --name memcached -p "11211:11211" -d memcached
docker run --name postgres -p "5432:5432" -d postgres
docker run --name rabbitmq -p "15672:15672" -p "5672:5672" -d rabbitmq:3-management
docker run --name opentsdb -p "4242:4242" -d petergrace/opentsdb-docker
docker run --name redis -p "6379:6379" -d redis
docker run --name aerospike -p "3000:3000" -d aerospike
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
@ -79,7 +78,6 @@ docker-run-circle:
-e ADVERTISED_PORT=9092 \
-p "2181:2181" -p "9092:9092" \
-d spotify/kafka
docker run --name opentsdb -p "4242:4242" -d petergrace/opentsdb-docker
docker run --name aerospike -p "3000:3000" -d aerospike
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
@ -88,8 +86,8 @@ docker-run-circle:
# Kill all docker containers, ignore errors
docker-kill:
-docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp
-docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp
-docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann snmp
-docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann snmp
# Run full unit tests using docker containers (includes setup and teardown)
test: vet docker-kill docker-run

View File

@ -20,12 +20,12 @@ new plugins.
### Linux deb and rpm Packages:
Latest:
* https://dl.influxdata.com/telegraf/releases/telegraf_0.13.1_amd64.deb
* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.1.x86_64.rpm
* https://dl.influxdata.com/telegraf/releases/telegraf_1.0.0-beta1_amd64.deb
* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0_beta1.x86_64.rpm
Latest (arm):
* https://dl.influxdata.com/telegraf/releases/telegraf_0.13.1_armhf.deb
* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.1.armhf.rpm
* https://dl.influxdata.com/telegraf/releases/telegraf_1.0.0-beta1_armhf.deb
* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0_beta1.armhf.rpm
##### Package Instructions:
@ -46,14 +46,14 @@ to use this repo to install & update telegraf.
### Linux tarballs:
Latest:
* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.1_linux_amd64.tar.gz
* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.1_linux_i386.tar.gz
* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.1_linux_armhf.tar.gz
* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta1_linux_amd64.tar.gz
* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta1_linux_i386.tar.gz
* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta1_linux_armhf.tar.gz
### FreeBSD tarball:
Latest:
* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.1_freebsd_amd64.tar.gz
* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta1_freebsd_amd64.tar.gz
### Ansible Role:
@ -69,8 +69,7 @@ brew install telegraf
### Windows Binaries (EXPERIMENTAL)
Latest:
* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.1_windows_amd64.zip
* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.1_windows_i386.zip
* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta1_windows_amd64.zip
### From Source:

View File

@ -0,0 +1,37 @@
package errchan
import (
"fmt"
"strings"
)
type ErrChan struct {
C chan error
}
// New returns an error channel of max length 'n'
// errors can be sent to the ErrChan.C channel, and will be returned when
// ErrChan.Error() is called.
func New(n int) *ErrChan {
return &ErrChan{
C: make(chan error, n),
}
}
// Error closes the ErrChan.C channel and returns an error if there are any
// non-nil errors, otherwise returns nil.
func (e *ErrChan) Error() error {
close(e.C)
var out string
for err := range e.C {
if err != nil {
out += "[" + err.Error() + "], "
}
}
if out != "" {
return fmt.Errorf("Errors encountered: " + strings.TrimRight(out, ", "))
}
return nil
}

View File

@ -3,6 +3,7 @@ package cloudwatch
import (
"fmt"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
@ -12,6 +13,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
internalaws "github.com/influxdata/telegraf/internal/config/aws"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/internal/limiter"
"github.com/influxdata/telegraf/plugins/inputs"
)
@ -166,7 +168,7 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
}
metricCount := len(metrics)
var errChan = make(chan error, metricCount)
errChan := errchan.New(metricCount)
now := time.Now()
@ -175,18 +177,18 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
// http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
lmtr := limiter.NewRateLimiter(10, time.Second)
defer lmtr.Stop()
var wg sync.WaitGroup
wg.Add(len(metrics))
for _, m := range metrics {
<-lmtr.C
go c.gatherMetric(acc, m, now, errChan)
go func(inm *cloudwatch.Metric) {
defer wg.Done()
c.gatherMetric(acc, inm, now, errChan.C)
}(m)
}
wg.Wait()
for i := 1; i <= metricCount; i++ {
err := <-errChan
if err != nil {
return err
}
}
return nil
return errChan.Error()
}
func init() {

View File

@ -2,14 +2,13 @@ package elasticsearch
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)
@ -102,7 +101,7 @@ func (e *Elasticsearch) Description() string {
// Gather reads the stats from Elasticsearch and writes it to the
// Accumulator.
func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
errChan := make(chan error, len(e.Servers))
errChan := errchan.New(len(e.Servers))
var wg sync.WaitGroup
wg.Add(len(e.Servers))
@ -116,7 +115,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
url = s + statsPath
}
if err := e.gatherNodeStats(url, acc); err != nil {
errChan <- err
errChan.C <- err
return
}
if e.ClusterHealth {
@ -126,17 +125,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
}
wg.Wait()
close(errChan)
// Get all errors and return them as one giant error
errStrings := []string{}
for err := range errChan {
errStrings = append(errStrings, err.Error())
}
if len(errStrings) == 0 {
return nil
}
return errors.New(strings.Join(errStrings, "\n"))
return errChan.Error()
}
func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {

View File

@ -11,39 +11,22 @@ all scripts matching glob pattern ```/tmp/collect_*.sh``` are configured for ```
in JSON format. Glob patterns are matched on every run, so adding new scripts that match the pattern
will cause them to be picked up immediately.
```
```toml
# Read flattened metrics from one or more commands that output JSON to stdout
[[inputs.exec]]
# Shell/commands array
# Full command line to executable with parameters, or a glob pattern to run all matching files.
commands = ["/tmp/test.sh", "/tmp/test2.sh", "/tmp/collect_*.sh"]
## Timeout for each command to complete.
timeout = "5s"
# Data format to consume.
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "json"
# measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
## Below configuration will be used for data_format = "graphite", can be ignored for other data_format
## If matching multiple measurement files, this string will be used to join the matched values.
#separator = "."
## Each template line requires a template pattern. It can have an optional
## filter before the template and separated by spaces. It can also have optional extra
## tags following the template. Multiple tags should be separated by commas and no spaces
## similar to the line protocol format. The can be only one default template.
## Templates support below format:
## 1. filter + template
## 2. filter + template + extra tag
## 3. filter + template with field key
## 4. default template
#templates = [
# "*.app env.service.resource.measurement",
# "stats.* .host.measurement* region=us-west,agent=sensu",
# "stats2.* .host.measurement.field",
# "measurement*"
#]
```
Other options for modifying the measurement names are:
@ -82,7 +65,7 @@ in influx line-protocol format.
#### Configuration
```
```toml
[[inputs.exec]]
# Shell/commands array
# compatible with old version
@ -90,6 +73,9 @@ in influx line-protocol format.
# command = "/usr/bin/line_protocol_collector"
commands = ["/usr/bin/line_protocol_collector","/tmp/test2.sh"]
## Timeout for each command to complete.
timeout = "5s"
# Data format to consume.
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "influx"
@ -123,12 +109,16 @@ We can also change the data_format to "graphite" to use the metrics collecting s
In this example a script called /tmp/test.sh and a script called /tmp/test2.sh are configured for [[inputs.exec]] in graphite format.
#### Configuration
```
```toml
# Read flattened metrics from one or more commands that output JSON to stdout
[[inputs.exec]]
# Shell/commands array
commands = ["/tmp/test.sh","/tmp/test2.sh"]
## Timeout for each command to complete.
timeout = "5s"
# Data format to consume.
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "graphite"

View File

@ -14,6 +14,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
@ -182,23 +183,15 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
}
}
e.errChan = make(chan error, len(commands))
errChan := errchan.New(len(commands))
e.errChan = errChan.C
e.wg.Add(len(commands))
for _, command := range commands {
go e.ProcessCommand(command, acc)
}
e.wg.Wait()
select {
default:
close(e.errChan)
return nil
case err := <-e.errChan:
close(e.errChan)
return err
}
return errChan.Error()
}
func init() {

View File

@ -3,8 +3,6 @@ package haproxy
import (
"encoding/csv"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"io"
"net"
"net/http"
@ -13,6 +11,10 @@ import (
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
)
//CSV format: https://cbonte.github.io/haproxy-dconv/configuration-1.5.html#9.1
@ -113,20 +115,17 @@ func (g *haproxy) Gather(acc telegraf.Accumulator) error {
}
var wg sync.WaitGroup
var outerr error
for _, serv := range g.Servers {
wg.Add(1)
errChan := errchan.New(len(g.Servers))
wg.Add(len(g.Servers))
for _, server := range g.Servers {
go func(serv string) {
defer wg.Done()
outerr = g.gatherServer(serv, acc)
}(serv)
errChan.C <- g.gatherServer(serv, acc)
}(server)
}
wg.Wait()
return outerr
return errChan.Error()
}
func (g *haproxy) gatherServerSocket(addr string, acc telegraf.Accumulator) error {

View File

@ -5,9 +5,11 @@ import (
"fmt"
"net/http"
"strconv"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
)
@ -129,20 +131,18 @@ func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
}
}
var errChan = make(chan error, len(gatherFunctions))
var wg sync.WaitGroup
wg.Add(len(gatherFunctions))
errChan := errchan.New(len(gatherFunctions))
for _, f := range gatherFunctions {
go f(r, acc, errChan)
go func(gf gatherFunc) {
defer wg.Done()
gf(r, acc, errChan.C)
}(f)
}
wg.Wait()
for i := 1; i <= len(gatherFunctions); i++ {
err := <-errChan
if err != nil {
return err
}
}
return nil
return errChan.Error()
}
func (r *RabbitMQ) requestJSON(u string, target interface{}) error {

View File

@ -241,10 +241,14 @@ func gatherKeyspaceLine(
name string,
line string,
acc telegraf.Accumulator,
tags map[string]string,
global_tags map[string]string,
) {
if strings.Contains(line, "keys=") {
fields := make(map[string]interface{})
tags := make(map[string]string)
for k, v := range global_tags {
tags[k] = v
}
tags["database"] = name
dbparts := strings.Split(line, ",")
for _, dbp := range dbparts {

View File

@ -35,6 +35,7 @@ func TestRedis_ParseMetrics(t *testing.T) {
err := gatherInfoOutput(rdr, &acc, tags)
require.NoError(t, err)
tags = map[string]string{"host": "redis.net", "role": "master"}
fields := map[string]interface{}{
"uptime": uint64(238),
"clients": uint64(1),
@ -70,13 +71,14 @@ func TestRedis_ParseMetrics(t *testing.T) {
"used_cpu_user_children": float64(0.00),
"keyspace_hitrate": float64(0.50),
}
keyspaceTags := map[string]string{"host": "redis.net", "role": "master", "database": "db0"}
keyspaceFields := map[string]interface{}{
"avg_ttl": uint64(0),
"expires": uint64(0),
"keys": uint64(2),
}
acc.AssertContainsTaggedFields(t, "redis", fields, tags)
acc.AssertContainsTaggedFields(t, "redis_keyspace", keyspaceFields, tags)
acc.AssertContainsTaggedFields(t, "redis_keyspace", keyspaceFields, keyspaceTags)
}
const testOutput = `# Server

View File

@ -18,7 +18,7 @@ It is supposed to be used to monitor actual memory usage in a cross platform fas
designed for informational purposes only.
- **free**: memory not being used at all (zeroed) that is readily available; note
that this doesn't reflect the actual memory available (use 'available' instead).
- **used_percent**: the percentage usage calculated as `(total - used) / total * 100`
- **used_percent**: the percentage usage calculated as `used / total * 100`
## Measurements:
#### Raw Memory measurements:

View File

@ -9,7 +9,7 @@ import (
"log"
"os"
"os/exec"
"path"
"path/filepath"
"runtime"
"strconv"
@ -19,7 +19,7 @@ import (
type Processes struct {
execPS func() ([]byte, error)
readProcFile func(statFile string) ([]byte, error)
readProcFile func(filename string) ([]byte, error)
forcePS bool
forceProc bool
@ -128,22 +128,16 @@ func (p *Processes) gatherFromPS(fields map[string]interface{}) error {
// get process states from /proc/(pid)/stat files
func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
files, err := ioutil.ReadDir("/proc")
filenames, err := filepath.Glob("/proc/[0-9]*/stat")
if err != nil {
return err
}
for _, file := range files {
if !file.IsDir() {
continue
}
for _, filename := range filenames {
_, err := os.Stat(filename)
statFile := path.Join("/proc", file.Name(), "stat")
data, err := p.readProcFile(statFile)
data, err := p.readProcFile(filename)
if err != nil {
if !file.IsDir() {
continue
}
return err
}
if data == nil {
@ -159,7 +153,7 @@ func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
stats := bytes.Fields(data)
if len(stats) < 3 {
return fmt.Errorf("Something is terribly wrong with %s", statFile)
return fmt.Errorf("Something is terribly wrong with %s", filename)
}
switch stats[0][0] {
case 'R':
@ -176,7 +170,7 @@ func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
fields["paging"] = fields["paging"].(int64) + int64(1)
default:
log.Printf("processes: Unknown state [ %s ] in file %s",
string(stats[0][0]), statFile)
string(stats[0][0]), filename)
}
fields["total"] = fields["total"].(int64) + int64(1)
@ -190,15 +184,12 @@ func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
return nil
}
func readProcFile(statFile string) ([]byte, error) {
if _, err := os.Stat(statFile); os.IsNotExist(err) {
return nil, nil
} else if err != nil {
return nil, err
}
data, err := ioutil.ReadFile(statFile)
func readProcFile(filename string) ([]byte, error) {
data, err := ioutil.ReadFile(filename)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}

View File

@ -3,9 +3,8 @@ package opentsdb
import (
"reflect"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
// "github.com/influxdata/telegraf/testutil"
// "github.com/stretchr/testify/require"
)
func TestBuildTagsTelnet(t *testing.T) {
@ -42,40 +41,40 @@ func TestBuildTagsTelnet(t *testing.T) {
}
}
func TestWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
// func TestWrite(t *testing.T) {
// if testing.Short() {
// t.Skip("Skipping integration test in short mode")
// }
o := &OpenTSDB{
Host: testutil.GetLocalHost(),
Port: 4242,
Prefix: "prefix.test.",
}
// o := &OpenTSDB{
// Host: testutil.GetLocalHost(),
// Port: 4242,
// Prefix: "prefix.test.",
// }
// Verify that we can connect to the OpenTSDB instance
err := o.Connect()
require.NoError(t, err)
// // Verify that we can connect to the OpenTSDB instance
// err := o.Connect()
// require.NoError(t, err)
// Verify that we can successfully write data to OpenTSDB
err = o.Write(testutil.MockMetrics())
require.NoError(t, err)
// // Verify that we can successfully write data to OpenTSDB
// err = o.Write(testutil.MockMetrics())
// require.NoError(t, err)
// Verify postive and negative test cases of writing data
metrics := testutil.MockMetrics()
metrics = append(metrics, testutil.TestMetric(float64(1.0),
"justametric.float"))
metrics = append(metrics, testutil.TestMetric(int64(123456789),
"justametric.int"))
metrics = append(metrics, testutil.TestMetric(uint64(123456789012345),
"justametric.uint"))
metrics = append(metrics, testutil.TestMetric("Lorem Ipsum",
"justametric.string"))
metrics = append(metrics, testutil.TestMetric(float64(42.0),
"justametric.anotherfloat"))
metrics = append(metrics, testutil.TestMetric(float64(42.0),
"metric w/ specialchars"))
// // Verify postive and negative test cases of writing data
// metrics := testutil.MockMetrics()
// metrics = append(metrics, testutil.TestMetric(float64(1.0),
// "justametric.float"))
// metrics = append(metrics, testutil.TestMetric(int64(123456789),
// "justametric.int"))
// metrics = append(metrics, testutil.TestMetric(uint64(123456789012345),
// "justametric.uint"))
// metrics = append(metrics, testutil.TestMetric("Lorem Ipsum",
// "justametric.string"))
// metrics = append(metrics, testutil.TestMetric(float64(42.0),
// "justametric.anotherfloat"))
// metrics = append(metrics, testutil.TestMetric(float64(42.0),
// "metric w/ specialchars"))
err = o.Write(metrics)
require.NoError(t, err)
}
// err = o.Write(metrics)
// require.NoError(t, err)
// }

View File

@ -85,7 +85,7 @@ targets = {
supported_builds = {
"darwin": [ "amd64" ],
"windows": [ "amd64" ],
"linux": [ "amd64", "i386", "armhf", "armel", "arm64" ],
"linux": [ "amd64", "i386", "armhf", "armel", "arm64", "static_amd64" ],
"freebsd": [ "amd64" ]
}
@ -553,7 +553,7 @@ def package(build_output, pkg_name, version, nightly=False, iteration=1, static=
build_root = os.path.join(tmp_build_dir,
platform,
arch,
'{}-{}-{}'.format(PACKAGE_NAME, version, iteration))
PACKAGE_NAME)
os.makedirs(build_root)
# Copy packaging scripts to build directory