Merge branch 'master' into master

This commit is contained in:
silverboots 2016-07-17 22:48:24 +01:00 committed by GitHub
commit ddafea638f
37 changed files with 394 additions and 632 deletions

View File

@ -1,7 +1,13 @@
## v1.0
## v1.0 [unreleased]
### Release Notes
**Breaking Change**: Aerospike main server node measurements have been renamed
aerospike_node. Aerospike namespace measurements have been renamed to
aerospike_namespace. They will also now be tagged with the node_name
that they correspond to. This has been done to differentiate measurements
that pertain to node vs. namespace statistics.
**Breaking Change**: users of github_webhooks must change to the new
`[[inputs.webhooks]]` plugin.
@ -33,15 +39,24 @@ should now look like:
- [#1402](https://github.com/influxdata/telegraf/pull/1402): docker-machine/boot2docker no longer required for unit tests.
- [#1350](https://github.com/influxdata/telegraf/pull/1350): cgroup input plugin.
- [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD.
- [#1387](https://github.com/influxdata/telegraf/pull/1387): **Breaking Change** - Redis `role` tag renamed to `replication_role` to avoid global_tags override
- [#1437](https://github.com/influxdata/telegraf/pull/1437): Fetching Galera status metrics in MySQL
- [#1500](https://github.com/influxdata/telegraf/pull/1500): Aerospike plugin refactored to use official client lib.
### Bugfixes
- [#1472](https://github.com/influxdata/telegraf/pull/1472): diskio input plugin: set 'skip_serial_number = true' by default to avoid high cardinality.
- [#1426](https://github.com/influxdata/telegraf/pull/1426): nil metrics panic fix.
- [#1384](https://github.com/influxdata/telegraf/pull/1384): Fix datarace in apache input plugin.
- [#1399](https://github.com/influxdata/telegraf/issues/1399): Add `read_repairs` statistics to riak plugin.
- [#1405](https://github.com/influxdata/telegraf/issues/1405): Fix memory/connection leak in prometheus input plugin.
- [#1378](https://github.com/influxdata/telegraf/issues/1378): Trim BOM from config file for Windows support.
- [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload.
- [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix.
- [#1334](https://github.com/influxdata/telegraf/issues/1334): Prometheus output, metric refresh and caching fixes.
- [#1432](https://github.com/influxdata/telegraf/issues/1432): Panic fix for multiple graphite outputs under very high load.
- [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior
- [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues.
## v1.0 beta 2 [2016-06-21]

2
Godeps
View File

@ -1,5 +1,6 @@
github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9
github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc
github.com/aerospike/aerospike-client-go 45863b7fd8640dc12f7fdd397104d97e1986f25a
github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687
github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857
github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4
@ -50,6 +51,7 @@ github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c
github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2
github.com/wvanbergen/kafka 46f9a1cf3f670edec492029fadded9c2d9e18866
github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8
github.com/yuin/gopher-lua bf3808abd44b1e55143a2d7f08571aaa80db1808
github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363
golang.org/x/crypto 5dc8cb4b8a8eb076cbb5a06bc3b8682c15bdbbd3
golang.org/x/net 6acef71eb69611914f7a30939ea9f6e194c78172

View File

@ -25,10 +25,6 @@ build-for-docker:
"-s -X main.version=$(VERSION)" \
./cmd/telegraf/telegraf.go
# Build with race detector
dev: prepare
go build -race -ldflags "-X main.version=$(VERSION)" ./...
# run package script
package:
./scripts/build.py --package --version="$(VERSION)" --platform=linux --arch=all --upload
@ -55,7 +51,7 @@ docker-run:
docker run --name postgres -p "5432:5432" -d postgres
docker run --name rabbitmq -p "15672:15672" -p "5672:5672" -d rabbitmq:3-management
docker run --name redis -p "6379:6379" -d redis
docker run --name aerospike -p "3000:3000" -d aerospike
docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann
@ -68,7 +64,7 @@ docker-run-circle:
-e ADVERTISED_PORT=9092 \
-p "2181:2181" -p "9092:9092" \
-d spotify/kafka
docker run --name aerospike -p "3000:3000" -d aerospike
docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann

View File

@ -221,8 +221,6 @@ Telegraf can also collect metrics via the following service plugins:
* [github](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/github)
* [rollbar](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/rollbar)
* [nsq_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nsq_consumer)
* [github_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/github_webhooks)
* [rollbar_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/rollbar_webhooks)
We'll be adding support for many more over the coming months. Read on if you
want to add support for another service or third-party API.

View File

@ -268,11 +268,31 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
a.flush()
case m := <-metricC:
for _, o := range a.Config.Outputs {
for i, o := range a.Config.Outputs {
if i == len(a.Config.Outputs)-1 {
o.AddMetric(m)
} else {
o.AddMetric(copyMetric(m))
}
}
}
}
}
func copyMetric(m telegraf.Metric) telegraf.Metric {
t := time.Time(m.Time())
tags := make(map[string]string)
fields := make(map[string]interface{})
for k, v := range m.Tags() {
tags[k] = v
}
for k, v := range m.Fields() {
fields[k] = v
}
out, _ := telegraf.NewMetric(m.Name(), tags, fields, t)
return out
}
// Run runs the agent daemon, gathering every Interval

View File

@ -197,6 +197,8 @@
# # Configuration for Graphite server to send metrics to
# [[outputs.graphite]]
# ## TCP endpoint for your graphite instance.
# ## If multiple endpoints are configured, the output will be load balanced.
# ## Only one of the endpoints will be written to with each iteration.
# servers = ["localhost:2003"]
# ## Prefix metrics name
# prefix = ""

View File

@ -1,104 +1,19 @@
package aerospike
import (
"bytes"
"encoding/binary"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
as "github.com/aerospike/aerospike-client-go"
)
const (
MSG_HEADER_SIZE = 8
MSG_TYPE = 1 // Info is 1
MSG_VERSION = 2
)
var (
STATISTICS_COMMAND = []byte("statistics\n")
NAMESPACES_COMMAND = []byte("namespaces\n")
)
type aerospikeMessageHeader struct {
Version uint8
Type uint8
DataLen [6]byte
}
type aerospikeMessage struct {
aerospikeMessageHeader
Data []byte
}
// Taken from aerospike-client-go/types/message.go
func (msg *aerospikeMessage) Serialize() []byte {
msg.DataLen = msgLenToBytes(int64(len(msg.Data)))
buf := bytes.NewBuffer([]byte{})
binary.Write(buf, binary.BigEndian, msg.aerospikeMessageHeader)
binary.Write(buf, binary.BigEndian, msg.Data[:])
return buf.Bytes()
}
type aerospikeInfoCommand struct {
msg *aerospikeMessage
}
// Taken from aerospike-client-go/info.go
func (nfo *aerospikeInfoCommand) parseMultiResponse() (map[string]string, error) {
responses := make(map[string]string)
offset := int64(0)
begin := int64(0)
dataLen := int64(len(nfo.msg.Data))
// Create reusable StringBuilder for performance.
for offset < dataLen {
b := nfo.msg.Data[offset]
if b == '\t' {
name := nfo.msg.Data[begin:offset]
offset++
begin = offset
// Parse field value.
for offset < dataLen {
if nfo.msg.Data[offset] == '\n' {
break
}
offset++
}
if offset > begin {
value := nfo.msg.Data[begin:offset]
responses[string(name)] = string(value)
} else {
responses[string(name)] = ""
}
offset++
begin = offset
} else if b == '\n' {
if offset > begin {
name := nfo.msg.Data[begin:offset]
responses[string(name)] = ""
}
offset++
begin = offset
} else {
offset++
}
}
if offset > begin {
name := nfo.msg.Data[begin:offset]
responses[string(name)] = ""
}
return responses, nil
}
type Aerospike struct {
Servers []string
}
@ -115,7 +30,7 @@ func (a *Aerospike) SampleConfig() string {
}
func (a *Aerospike) Description() string {
return "Read stats from an aerospike server"
return "Read stats from aerospike server(s)"
}
func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
@ -124,214 +39,90 @@ func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
}
var wg sync.WaitGroup
var outerr error
errChan := errchan.New(len(a.Servers))
wg.Add(len(a.Servers))
for _, server := range a.Servers {
wg.Add(1)
go func(server string) {
go func(serv string) {
defer wg.Done()
outerr = a.gatherServer(server, acc)
errChan.C <- a.gatherServer(serv, acc)
}(server)
}
wg.Wait()
return outerr
return errChan.Error()
}
func (a *Aerospike) gatherServer(host string, acc telegraf.Accumulator) error {
aerospikeInfo, err := getMap(STATISTICS_COMMAND, host)
func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) error {
host, port, err := net.SplitHostPort(hostport)
if err != nil {
return fmt.Errorf("Aerospike info failed: %s", err)
return err
}
readAerospikeStats(aerospikeInfo, acc, host, "")
namespaces, err := getList(NAMESPACES_COMMAND, host)
iport, err := strconv.Atoi(port)
if err != nil {
return fmt.Errorf("Aerospike namespace list failed: %s", err)
iport = 3000
}
for ix := range namespaces {
nsInfo, err := getMap([]byte("namespace/"+namespaces[ix]+"\n"), host)
c, err := as.NewClient(host, iport)
if err != nil {
return fmt.Errorf("Aerospike namespace '%s' query failed: %s", namespaces[ix], err)
return err
}
defer c.Close()
nodes := c.GetNodes()
for _, n := range nodes {
tags := map[string]string{
"node_name": n.GetName(),
"aerospike_host": hostport,
}
fields := make(map[string]interface{})
stats, err := as.RequestNodeStats(n)
if err != nil {
return err
}
for k, v := range stats {
if iv, err := strconv.ParseInt(v, 10, 64); err == nil {
fields[strings.Replace(k, "-", "_", -1)] = iv
}
}
acc.AddFields("aerospike_node", fields, tags, time.Now())
info, err := as.RequestNodeInfo(n, "namespaces")
if err != nil {
return err
}
namespaces := strings.Split(info["namespaces"], ";")
for _, namespace := range namespaces {
nTags := copyTags(tags)
nTags["namespace"] = namespace
nFields := make(map[string]interface{})
info, err := as.RequestNodeInfo(n, "namespace/"+namespace)
if err != nil {
continue
}
stats := strings.Split(info["namespace/"+namespace], ";")
for _, stat := range stats {
parts := strings.Split(stat, "=")
if len(parts) < 2 {
continue
}
if iv, err := strconv.ParseInt(parts[1], 10, 64); err == nil {
nFields[strings.Replace(parts[0], "-", "_", -1)] = iv
}
}
acc.AddFields("aerospike_namespace", nFields, nTags, time.Now())
}
readAerospikeStats(nsInfo, acc, host, namespaces[ix])
}
return nil
}
func getMap(key []byte, host string) (map[string]string, error) {
data, err := get(key, host)
if err != nil {
return nil, fmt.Errorf("Failed to get data: %s", err)
func copyTags(m map[string]string) map[string]string {
out := make(map[string]string)
for k, v := range m {
out[k] = v
}
parsed, err := unmarshalMapInfo(data, string(key))
if err != nil {
return nil, fmt.Errorf("Failed to unmarshal data: %s", err)
}
return parsed, nil
}
func getList(key []byte, host string) ([]string, error) {
data, err := get(key, host)
if err != nil {
return nil, fmt.Errorf("Failed to get data: %s", err)
}
parsed, err := unmarshalListInfo(data, string(key))
if err != nil {
return nil, fmt.Errorf("Failed to unmarshal data: %s", err)
}
return parsed, nil
}
func get(key []byte, host string) (map[string]string, error) {
var err error
var data map[string]string
asInfo := &aerospikeInfoCommand{
msg: &aerospikeMessage{
aerospikeMessageHeader: aerospikeMessageHeader{
Version: uint8(MSG_VERSION),
Type: uint8(MSG_TYPE),
DataLen: msgLenToBytes(int64(len(key))),
},
Data: key,
},
}
cmd := asInfo.msg.Serialize()
addr, err := net.ResolveTCPAddr("tcp", host)
if err != nil {
return data, fmt.Errorf("Lookup failed for '%s': %s", host, err)
}
conn, err := net.DialTCP("tcp", nil, addr)
if err != nil {
return data, fmt.Errorf("Connection failed for '%s': %s", host, err)
}
defer conn.Close()
_, err = conn.Write(cmd)
if err != nil {
return data, fmt.Errorf("Failed to send to '%s': %s", host, err)
}
msgHeader := bytes.NewBuffer(make([]byte, MSG_HEADER_SIZE))
_, err = readLenFromConn(conn, msgHeader.Bytes(), MSG_HEADER_SIZE)
if err != nil {
return data, fmt.Errorf("Failed to read header: %s", err)
}
err = binary.Read(msgHeader, binary.BigEndian, &asInfo.msg.aerospikeMessageHeader)
if err != nil {
return data, fmt.Errorf("Failed to unmarshal header: %s", err)
}
msgLen := msgLenFromBytes(asInfo.msg.aerospikeMessageHeader.DataLen)
if int64(len(asInfo.msg.Data)) != msgLen {
asInfo.msg.Data = make([]byte, msgLen)
}
_, err = readLenFromConn(conn, asInfo.msg.Data, len(asInfo.msg.Data))
if err != nil {
return data, fmt.Errorf("Failed to read from connection to '%s': %s", host, err)
}
data, err = asInfo.parseMultiResponse()
if err != nil {
return data, fmt.Errorf("Failed to parse response from '%s': %s", host, err)
}
return data, err
}
func readAerospikeStats(
stats map[string]string,
acc telegraf.Accumulator,
host string,
namespace string,
) {
fields := make(map[string]interface{})
tags := map[string]string{
"aerospike_host": host,
"namespace": "_service",
}
if namespace != "" {
tags["namespace"] = namespace
}
for key, value := range stats {
// We are going to ignore all string based keys
val, err := strconv.ParseInt(value, 10, 64)
if err == nil {
if strings.Contains(key, "-") {
key = strings.Replace(key, "-", "_", -1)
}
fields[key] = val
}
}
acc.AddFields("aerospike", fields, tags)
}
func unmarshalMapInfo(infoMap map[string]string, key string) (map[string]string, error) {
key = strings.TrimSuffix(key, "\n")
res := map[string]string{}
v, exists := infoMap[key]
if !exists {
return res, fmt.Errorf("Key '%s' missing from info", key)
}
values := strings.Split(v, ";")
for i := range values {
kv := strings.Split(values[i], "=")
if len(kv) > 1 {
res[kv[0]] = kv[1]
}
}
return res, nil
}
func unmarshalListInfo(infoMap map[string]string, key string) ([]string, error) {
key = strings.TrimSuffix(key, "\n")
v, exists := infoMap[key]
if !exists {
return []string{}, fmt.Errorf("Key '%s' missing from info", key)
}
values := strings.Split(v, ";")
return values, nil
}
func readLenFromConn(c net.Conn, buffer []byte, length int) (total int, err error) {
var r int
for total < length {
r, err = c.Read(buffer[total:length])
total += r
if err != nil {
break
}
}
return
}
// Taken from aerospike-client-go/types/message.go
func msgLenToBytes(DataLen int64) [6]byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(DataLen))
res := [6]byte{}
copy(res[:], b[2:])
return res
}
// Taken from aerospike-client-go/types/message.go
func msgLenFromBytes(buf [6]byte) int64 {
nbytes := append([]byte{0, 0}, buf[:]...)
DataLen := binary.BigEndian.Uint64(nbytes)
return int64(DataLen)
return out
}
func init() {

View File

@ -1,7 +1,6 @@
package aerospike
import (
"reflect"
"testing"
"github.com/influxdata/telegraf/testutil"
@ -23,96 +22,29 @@ func TestAerospikeStatistics(t *testing.T) {
err := a.Gather(&acc)
require.NoError(t, err)
// Only use a few of the metrics
asMetrics := []string{
"transactions",
"stat_write_errs",
"stat_read_reqs",
"stat_write_reqs",
}
for _, metric := range asMetrics {
assert.True(t, acc.HasIntField("aerospike", metric), metric)
}
assert.True(t, acc.HasMeasurement("aerospike_node"))
assert.True(t, acc.HasMeasurement("aerospike_namespace"))
assert.True(t, acc.HasIntField("aerospike_node", "batch_error"))
}
func TestAerospikeMsgLenFromToBytes(t *testing.T) {
var i int64 = 8
assert.True(t, i == msgLenFromBytes(msgLenToBytes(i)))
}
func TestAerospikeStatisticsPartialErr(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
a := &Aerospike{
Servers: []string{
testutil.GetLocalHost() + ":3000",
testutil.GetLocalHost() + ":9999",
},
}
func TestReadAerospikeStatsNoNamespace(t *testing.T) {
// Also test for re-writing
var acc testutil.Accumulator
stats := map[string]string{
"stat-write-errs": "12345",
"stat_read_reqs": "12345",
}
readAerospikeStats(stats, &acc, "host1", "")
fields := map[string]interface{}{
"stat_write_errs": int64(12345),
"stat_read_reqs": int64(12345),
}
tags := map[string]string{
"aerospike_host": "host1",
"namespace": "_service",
}
acc.AssertContainsTaggedFields(t, "aerospike", fields, tags)
}
func TestReadAerospikeStatsNamespace(t *testing.T) {
var acc testutil.Accumulator
stats := map[string]string{
"stat_write_errs": "12345",
"stat_read_reqs": "12345",
}
readAerospikeStats(stats, &acc, "host1", "test")
fields := map[string]interface{}{
"stat_write_errs": int64(12345),
"stat_read_reqs": int64(12345),
}
tags := map[string]string{
"aerospike_host": "host1",
"namespace": "test",
}
acc.AssertContainsTaggedFields(t, "aerospike", fields, tags)
}
func TestAerospikeUnmarshalList(t *testing.T) {
i := map[string]string{
"test": "one;two;three",
}
expected := []string{"one", "two", "three"}
list, err := unmarshalListInfo(i, "test2")
assert.True(t, err != nil)
list, err = unmarshalListInfo(i, "test")
assert.True(t, err == nil)
equal := true
for ix := range expected {
if list[ix] != expected[ix] {
equal = false
break
}
}
assert.True(t, equal)
}
func TestAerospikeUnmarshalMap(t *testing.T) {
i := map[string]string{
"test": "key1=value1;key2=value2",
}
expected := map[string]string{
"key1": "value1",
"key2": "value2",
}
m, err := unmarshalMapInfo(i, "test")
assert.True(t, err == nil)
assert.True(t, reflect.DeepEqual(m, expected))
err := a.Gather(&acc)
require.Error(t, err)
assert.True(t, acc.HasMeasurement("aerospike_node"))
assert.True(t, acc.HasMeasurement("aerospike_namespace"))
assert.True(t, acc.HasIntField("aerospike_node", "batch_error"))
}

View File

@ -33,8 +33,9 @@ KEY1 VAL1\n
### Tags:
All measurements have the following tags:
- path
Measurements don't have any specific tags unless you define them at the telegraf level (defaults). We
used to have the path listed as a tag, but to keep cardinality in check it's easier to move this
value to a field. Thanks @sebito91!
### Configuration:

View File

@ -56,10 +56,9 @@ func (g *CGroup) gatherDir(dir string, acc telegraf.Accumulator) error {
return err
}
}
fields["path"] = dir
tags := map[string]string{"path": dir}
acc.AddFields(metricName, fields, tags)
acc.AddFields(metricName, fields, nil)
return nil
}

View File

@ -3,10 +3,13 @@
package cgroup
import (
"fmt"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"reflect"
)
var cg1 = &CGroup{
@ -21,15 +24,32 @@ var cg1 = &CGroup{
},
}
func assertContainsFields(a *testutil.Accumulator, t *testing.T, measurement string, fieldSet []map[string]interface{}) {
a.Lock()
defer a.Unlock()
numEquals := 0
for _, p := range a.Metrics {
if p.Measurement == measurement {
for _, fields := range fieldSet {
if reflect.DeepEqual(fields, p.Fields) {
numEquals++
}
}
}
}
if numEquals != len(fieldSet) {
assert.Fail(t, fmt.Sprintf("only %d of %d are equal", numEquals, len(fieldSet)))
}
}
func TestCgroupStatistics_1(t *testing.T) {
var acc testutil.Accumulator
err := cg1.Gather(&acc)
require.NoError(t, err)
tags := map[string]string{
"path": "testdata/memory",
}
fields := map[string]interface{}{
"memory.stat.cache": 1739362304123123123,
"memory.stat.rss": 1775325184,
@ -42,8 +62,9 @@ func TestCgroupStatistics_1(t *testing.T) {
"memory.limit_in_bytes": 223372036854771712,
"memory.use_hierarchy": "12-781",
"notify_on_release": 0,
"path": "testdata/memory",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields})
}
// ======================================================================
@ -59,16 +80,14 @@ func TestCgroupStatistics_2(t *testing.T) {
err := cg2.Gather(&acc)
require.NoError(t, err)
tags := map[string]string{
"path": "testdata/cpu",
}
fields := map[string]interface{}{
"cpuacct.usage_percpu.0": -1452543795404,
"cpuacct.usage_percpu.1": 1376681271659,
"cpuacct.usage_percpu.2": 1450950799997,
"cpuacct.usage_percpu.3": -1473113374257,
"path": "testdata/cpu",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields})
}
// ======================================================================
@ -84,18 +103,16 @@ func TestCgroupStatistics_3(t *testing.T) {
err := cg3.Gather(&acc)
require.NoError(t, err)
tags := map[string]string{
"path": "testdata/memory/group_1",
}
fields := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"path": "testdata/memory/group_1",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
tags = map[string]string{
fieldsTwo := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"path": "testdata/memory/group_2",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields, fieldsTwo})
}
// ======================================================================
@ -111,23 +128,22 @@ func TestCgroupStatistics_4(t *testing.T) {
err := cg4.Gather(&acc)
require.NoError(t, err)
tags := map[string]string{
"path": "testdata/memory/group_1/group_1_1",
}
fields := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"path": "testdata/memory/group_1/group_1_1",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
tags = map[string]string{
fieldsTwo := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"path": "testdata/memory/group_1/group_1_2",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
tags = map[string]string{
fieldsThree := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"path": "testdata/memory/group_2",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields, fieldsTwo, fieldsThree})
}
// ======================================================================
@ -143,18 +159,16 @@ func TestCgroupStatistics_5(t *testing.T) {
err := cg5.Gather(&acc)
require.NoError(t, err)
tags := map[string]string{
"path": "testdata/memory/group_1/group_1_1",
}
fields := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"path": "testdata/memory/group_1/group_1_1",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
tags = map[string]string{
fieldsTwo := map[string]interface{}{
"memory.limit_in_bytes": 223372036854771712,
"path": "testdata/memory/group_2/group_1_1",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields, fieldsTwo})
}
// ======================================================================
@ -170,13 +184,11 @@ func TestCgroupStatistics_6(t *testing.T) {
err := cg6.Gather(&acc)
require.NoError(t, err)
tags := map[string]string{
"path": "testdata/memory",
}
fields := map[string]interface{}{
"memory.usage_in_bytes": 3513667584,
"memory.use_hierarchy": "12-781",
"memory.kmem.limit_in_bytes": 9223372036854771712,
"path": "testdata/memory",
}
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields})
}

View File

@ -32,6 +32,8 @@ regex patterns.
'''
```
> **Note:** The InfluxDB log pattern in the default configuration only works for Influx versions 1.0.0-beta1 or higher.
## Grok Parser
The grok parser uses a slightly modified version of logstash "grok" patterns,

View File

@ -306,6 +306,10 @@ var mappings = []*mapping{
onServer: "Threadpool_",
inExport: "threadpool_",
},
{
onServer: "wsrep_",
inExport: "wsrep_",
},
}
var (

View File

@ -70,7 +70,7 @@ func (p *Procstat) Gather(acc telegraf.Accumulator) error {
p.Exe, p.PidFile, p.Pattern, p.User, err.Error())
} else {
for pid, proc := range p.pidmap {
p := NewSpecProcessor(p.ProcessName, p.Prefix, acc, proc, p.tagmap[pid])
p := NewSpecProcessor(p.ProcessName, p.Prefix, pid, acc, proc, p.tagmap[pid])
p.pushMetrics()
}
}
@ -140,7 +140,6 @@ func (p *Procstat) pidsFromFile() ([]int32, error) {
out = append(out, int32(pid))
p.tagmap[int32(pid)] = map[string]string{
"pidfile": p.PidFile,
"pid": strings.TrimSpace(string(pidString)),
}
}
}
@ -165,7 +164,6 @@ func (p *Procstat) pidsFromExe() ([]int32, error) {
out = append(out, int32(ipid))
p.tagmap[int32(ipid)] = map[string]string{
"exe": p.Exe,
"pid": pid,
}
} else {
outerr = err
@ -193,7 +191,6 @@ func (p *Procstat) pidsFromPattern() ([]int32, error) {
out = append(out, int32(ipid))
p.tagmap[int32(ipid)] = map[string]string{
"pattern": p.Pattern,
"pid": pid,
}
} else {
outerr = err
@ -221,7 +218,6 @@ func (p *Procstat) pidsFromUser() ([]int32, error) {
out = append(out, int32(ipid))
p.tagmap[int32(ipid)] = map[string]string{
"user": p.User,
"pid": pid,
}
} else {
outerr = err

View File

@ -10,6 +10,7 @@ import (
type SpecProcessor struct {
Prefix string
pid int32
tags map[string]string
fields map[string]interface{}
acc telegraf.Accumulator
@ -19,6 +20,7 @@ type SpecProcessor struct {
func NewSpecProcessor(
processName string,
prefix string,
pid int32,
acc telegraf.Accumulator,
p *process.Process,
tags map[string]string,
@ -33,6 +35,7 @@ func NewSpecProcessor(
}
return &SpecProcessor{
Prefix: prefix,
pid: pid,
tags: tags,
fields: make(map[string]interface{}),
acc: acc,
@ -45,7 +48,7 @@ func (p *SpecProcessor) pushMetrics() {
if p.Prefix != "" {
prefix = p.Prefix + "_"
}
fields := map[string]interface{}{}
fields := map[string]interface{}{"pid": p.pid}
numThreads, err := p.proc.NumThreads()
if err == nil {

View File

@ -10,6 +10,7 @@ import (
"io"
"math"
"mime"
"net/http"
"time"
"github.com/influxdata/telegraf"
@ -19,17 +20,9 @@ import (
"github.com/prometheus/common/expfmt"
)
// PrometheusParser is an object for Parsing incoming metrics.
type PrometheusParser struct {
// PromFormat
PromFormat map[string]string
// DefaultTags will be added to every parsed metric
// DefaultTags map[string]string
}
// Parse returns a slice of Metrics from a text representation of a
// metrics
func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) {
func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
var metrics []telegraf.Metric
var parser expfmt.TextParser
// parse even if the buffer begins with a newline
@ -38,38 +31,35 @@ func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) {
buffer := bytes.NewBuffer(buf)
reader := bufio.NewReader(buffer)
// Get format
mediatype, params, err := mime.ParseMediaType(p.PromFormat["Content-Type"])
mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
// Prepare output
metricFamilies := make(map[string]*dto.MetricFamily)
if err == nil && mediatype == "application/vnd.google.protobuf" &&
params["encoding"] == "delimited" &&
params["proto"] == "io.prometheus.client.MetricFamily" {
for {
metricFamily := &dto.MetricFamily{}
if _, err = pbutil.ReadDelimited(reader, metricFamily); err != nil {
if err == io.EOF {
mf := &dto.MetricFamily{}
if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil {
if ierr == io.EOF {
break
}
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", err)
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr)
}
metricFamilies[metricFamily.GetName()] = metricFamily
metricFamilies[mf.GetName()] = mf
}
} else {
metricFamilies, err = parser.TextToMetricFamilies(reader)
if err != nil {
return nil, fmt.Errorf("reading text format failed: %s", err)
}
}
// read metrics
for metricName, mf := range metricFamilies {
for _, m := range mf.Metric {
// reading tags
tags := makeLabels(m)
/*
for key, value := range p.DefaultTags {
tags[key] = value
}
*/
// reading fields
fields := make(map[string]interface{})
if mf.GetType() == dto.MetricType_SUMMARY {
@ -102,33 +92,10 @@ func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) {
}
}
}
}
return metrics, err
}
// Parse one line
func (p *PrometheusParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line + "\n"))
if err != nil {
return nil, err
}
if len(metrics) < 1 {
return nil, fmt.Errorf(
"Can not parse the line: %s, for data format: prometheus", line)
}
return metrics[0], nil
}
/*
// Set default tags
func (p *PrometheusParser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
*/
// Get Quantiles from summary metric
func makeQuantiles(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})

View File

@ -1,6 +1,7 @@
package prometheus
import (
"net/http"
"testing"
"time"
@ -101,10 +102,8 @@ cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
`
func TestParseValidPrometheus(t *testing.T) {
parser := PrometheusParser{}
// Gauge value
metrics, err := parser.Parse([]byte(validUniqueGauge))
metrics, err := Parse([]byte(validUniqueGauge), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "cadvisor_version_info", metrics[0].Name())
@ -118,8 +117,7 @@ func TestParseValidPrometheus(t *testing.T) {
}, metrics[0].Tags())
// Counter value
//parser.SetDefaultTags(map[string]string{"mytag": "mytagvalue"})
metrics, err = parser.Parse([]byte(validUniqueCounter))
metrics, err = Parse([]byte(validUniqueCounter), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "get_token_fail_count", metrics[0].Name())
@ -129,8 +127,8 @@ func TestParseValidPrometheus(t *testing.T) {
assert.Equal(t, map[string]string{}, metrics[0].Tags())
// Summary data
//parser.SetDefaultTags(map[string]string{})
metrics, err = parser.Parse([]byte(validUniqueSummary))
//SetDefaultTags(map[string]string{})
metrics, err = Parse([]byte(validUniqueSummary), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name())
@ -144,7 +142,7 @@ func TestParseValidPrometheus(t *testing.T) {
assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags())
// histogram data
metrics, err = parser.Parse([]byte(validUniqueHistogram))
metrics, err = Parse([]byte(validUniqueHistogram), http.Header{})
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "apiserver_request_latencies", metrics[0].Name())
@ -165,11 +163,3 @@ func TestParseValidPrometheus(t *testing.T) {
metrics[0].Tags())
}
func TestParseLineInvalidPrometheus(t *testing.T) {
parser := PrometheusParser{}
metric, err := parser.ParseLine(validUniqueLine)
assert.NotNil(t, err)
assert.Nil(t, metric)
}

View File

@ -13,6 +13,8 @@ import (
"time"
)
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3`
type Prometheus struct {
Urls []string
@ -86,7 +88,7 @@ var client = &http.Client{
func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
collectDate := time.Now()
var req, err = http.NewRequest("GET", url, nil)
req.Header = make(http.Header)
req.Header.Add("Accept", acceptHeader)
var token []byte
var resp *http.Response
@ -129,20 +131,9 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
return fmt.Errorf("error reading body: %s", err)
}
// Headers
headers := make(map[string]string)
for key, value := range headers {
headers[key] = value
}
// Prepare Prometheus parser config
promparser := PrometheusParser{
PromFormat: headers,
}
metrics, err := promparser.Parse(body)
metrics, err := Parse(body, resp.Header)
if err != nil {
return fmt.Errorf("error getting processing samples for %s: %s",
return fmt.Errorf("error reading metrics for %s: %s",
url, err)
}
// Add (or not) collected metrics

View File

@ -43,6 +43,7 @@
- latest_fork_usec
- connected_slaves
- master_repl_offset
- master_last_io_seconds_ago
- repl_backlog_active
- repl_backlog_size
- repl_backlog_histlen
@ -57,6 +58,7 @@
- All measurements have the following tags:
- port
- server
- replication role
### Example Output:

View File

@ -66,6 +66,7 @@ var Tracking = map[string]string{
"latest_fork_usec": "latest_fork_usec",
"connected_slaves": "connected_slaves",
"master_repl_offset": "master_repl_offset",
"master_last_io_seconds_ago": "master_last_io_seconds_ago",
"repl_backlog_active": "repl_backlog_active",
"repl_backlog_size": "repl_backlog_size",
"repl_backlog_histlen": "repl_backlog_histlen",
@ -74,7 +75,7 @@ var Tracking = map[string]string{
"used_cpu_user": "used_cpu_user",
"used_cpu_sys_children": "used_cpu_sys_children",
"used_cpu_user_children": "used_cpu_user_children",
"role": "role",
"role": "replication_role",
}
var ErrProtocolError = errors.New("redis protocol error")
@ -208,7 +209,7 @@ func gatherInfoOutput(
}
if name == "role" {
tags["role"] = val
tags["replication_role"] = val
continue
}

View File

@ -35,7 +35,7 @@ func TestRedis_ParseMetrics(t *testing.T) {
err := gatherInfoOutput(rdr, &acc, tags)
require.NoError(t, err)
tags = map[string]string{"host": "redis.net", "role": "master"}
tags = map[string]string{"host": "redis.net", "replication_role": "master"}
fields := map[string]interface{}{
"uptime": uint64(238),
"clients": uint64(1),
@ -71,7 +71,7 @@ func TestRedis_ParseMetrics(t *testing.T) {
"used_cpu_user_children": float64(0.00),
"keyspace_hitrate": float64(0.50),
}
keyspaceTags := map[string]string{"host": "redis.net", "role": "master", "database": "db0"}
keyspaceTags := map[string]string{"host": "redis.net", "replication_role": "master", "database": "db0"}
keyspaceFields := map[string]interface{}{
"avg_ttl": uint64(0),
"expires": uint64(0),

View File

@ -92,8 +92,8 @@ var diskIoSampleConfig = `
## disk partitions.
## Setting devices will restrict the stats to the specified devices.
# devices = ["sda", "sdb"]
## Uncomment the following line if you do not need disk serial numbers.
# skip_serial_number = true
## Uncomment the following line if you need disk serial numbers.
# skip_serial_number = false
`
func (_ *DiskIOStats) SampleConfig() string {
@ -151,6 +151,6 @@ func init() {
})
inputs.Add("diskio", func() telegraf.Input {
return &DiskIOStats{ps: &systemPS{}}
return &DiskIOStats{ps: &systemPS{}, SkipSerialNumber: true}
})
}

View File

@ -31,6 +31,8 @@ type TcpListener struct {
accept chan bool
// drops tracks the number of dropped metrics.
drops int
// malformed tracks the number of malformed packets
malformed int
// track the listener here so we can close it in Stop()
listener *net.TCPListener
@ -45,6 +47,9 @@ var dropwarn = "ERROR: tcp_listener message queue full. " +
"We have dropped %d messages so far. " +
"You may want to increase allowed_pending_messages in the config\n"
var malformedwarn = "WARNING: tcp_listener has received %d malformed packets" +
" thus far."
const sampleConfig = `
## Address and port to host TCP listener on
service_address = ":8094"
@ -243,8 +248,10 @@ func (t *TcpListener) tcpParser() error {
if err == nil {
t.storeMetrics(metrics)
} else {
log.Printf("Malformed packet: [%s], Error: %s\n",
string(packet), err)
t.malformed++
if t.malformed == 1 || t.malformed%1000 == 0 {
log.Printf(malformedwarn, t.malformed)
}
}
}
}

View File

@ -27,6 +27,8 @@ type UdpListener struct {
done chan struct{}
// drops tracks the number of dropped metrics.
drops int
// malformed tracks the number of malformed packets
malformed int
parser parsers.Parser
@ -44,6 +46,9 @@ var dropwarn = "ERROR: udp_listener message queue full. " +
"We have dropped %d messages so far. " +
"You may want to increase allowed_pending_messages in the config\n"
var malformedwarn = "WARNING: udp_listener has received %d malformed packets" +
" thus far."
const sampleConfig = `
## Address and port to host UDP listener on
service_address = ":8092"
@ -152,7 +157,10 @@ func (u *UdpListener) udpParser() error {
if err == nil {
u.storeMetrics(metrics)
} else {
log.Printf("Malformed packet: [%s], Error: %s\n", packet, err)
u.malformed++
if u.malformed == 1 || u.malformed%1000 == 0 {
log.Printf(malformedwarn, u.malformed)
}
}
}
}

View File

@ -32,7 +32,7 @@ echo mntr | nc localhost 2181
Meta:
- units: int64
- tags: `server=<hostname> port=<port>`
- tags: `server=<hostname> port=<port> state=<leader|follower>`
Measurement names:
- zookeeper_avg_latency
@ -55,8 +55,12 @@ Measurement names:
Meta:
- units: string
- tags: `server=<hostname> port=<port>`
- tags: `server=<hostname> port=<port> state=<leader|follower>`
Measurement names:
- zookeeper_version
- zookeeper_server_state
### Tags:
- All measurements have the following tags:
-

View File

@ -55,6 +55,7 @@ func (z *Zookeeper) Gather(acc telegraf.Accumulator) error {
}
func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error {
var zookeeper_state string
_, _, err := net.SplitHostPort(address)
if err != nil {
address = address + ":2181"
@ -78,7 +79,6 @@ func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error
if len(service) != 2 {
return fmt.Errorf("Invalid service address: %s", address)
}
tags := map[string]string{"server": service[0], "port": service[1]}
fields := make(map[string]interface{})
for scanner.Scan() {
@ -92,6 +92,9 @@ func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error
}
measurement := strings.TrimPrefix(parts[1], "zk_")
if measurement == "server_state" {
zookeeper_state = parts[2]
} else {
sValue := string(parts[2])
iVal, err := strconv.ParseInt(sValue, 10, 64)
@ -101,6 +104,12 @@ func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error
fields[measurement] = sValue
}
}
}
tags := map[string]string{
"server": service[0],
"port": service[1],
"state": zookeeper_state,
}
acc.AddFields("zookeeper", fields, tags)
return nil

View File

@ -9,6 +9,8 @@ via raw TCP.
# Configuration for Graphite server to send metrics to
[[outputs.graphite]]
## TCP endpoint for your graphite instance.
## If multiple endpoints are configured, the output will be load balanced.
## Only one of the endpoints will be written to with each iteration.
servers = ["localhost:2003"]
## Prefix metrics name
prefix = ""

View File

@ -2,7 +2,6 @@ package graphite
import (
"errors"
"fmt"
"log"
"math/rand"
"net"
@ -25,6 +24,8 @@ type Graphite struct {
var sampleConfig = `
## TCP endpoint for your graphite instance.
## If multiple endpoints are configured, output will be load balanced.
## Only one of the endpoints will be written to with each iteration.
servers = ["localhost:2003"]
## Prefix metrics name
prefix = ""
@ -96,9 +97,12 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
// Send data to a random server
p := rand.Perm(len(g.conns))
for _, n := range p {
if _, e := fmt.Fprint(g.conns[n], graphitePoints); e != nil {
if g.Timeout > 0 {
g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second))
}
if _, e := g.conns[n].Write([]byte(graphitePoints)); e != nil {
// Error
log.Println("ERROR: " + err.Error())
log.Println("ERROR: " + e.Error())
// Let's try the next one
} else {
// Success

View File

@ -29,7 +29,9 @@ type Instrumental struct {
const (
DefaultHost = "collector.instrumentalapp.com"
AuthFormat = "hello version go/telegraf/1.0\nauthenticate %s\n"
HelloMessage = "hello version go/telegraf/1.1\n"
AuthFormat = "authenticate %s\n"
HandshakeFormat = HelloMessage + AuthFormat
)
var (
@ -52,6 +54,7 @@ var sampleConfig = `
func (i *Instrumental) Connect() error {
connection, err := net.DialTimeout("tcp", i.Host+":8000", i.Timeout.Duration)
if err != nil {
i.conn = nil
return err
@ -151,6 +154,11 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
return err
}
// force the connection closed after sending data
// to deal with various disconnection scenarios and eschew holding
// open idle connections en masse
i.Close()
return nil
}
@ -163,7 +171,7 @@ func (i *Instrumental) SampleConfig() string {
}
func (i *Instrumental) authenticate(conn net.Conn) error {
_, err := fmt.Fprintf(conn, AuthFormat, i.ApiToken)
_, err := fmt.Fprintf(conn, HandshakeFormat, i.ApiToken)
if err != nil {
return err
}

View File

@ -24,7 +24,6 @@ func TestWrite(t *testing.T) {
ApiToken: "abc123token",
Prefix: "my.prefix",
}
i.Connect()
// Default to gauge
m1, _ := telegraf.NewMetric(
@ -40,10 +39,8 @@ func TestWrite(t *testing.T) {
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
// Simulate a connection close and reconnect.
metrics := []telegraf.Metric{m1, m2}
i.Write(metrics)
i.Close()
// Counter and Histogram are increments
m3, _ := telegraf.NewMetric(
@ -70,7 +67,6 @@ func TestWrite(t *testing.T) {
i.Write(metrics)
wg.Wait()
i.Close()
}
func TCPServer(t *testing.T, wg *sync.WaitGroup) {
@ -82,10 +78,9 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
tp := textproto.NewReader(reader)
hello, _ := tp.ReadLine()
assert.Equal(t, "hello version go/telegraf/1.0", hello)
assert.Equal(t, "hello version go/telegraf/1.1", hello)
auth, _ := tp.ReadLine()
assert.Equal(t, "authenticate abc123token", auth)
conn.Write([]byte("ok\nok\n"))
data1, _ := tp.ReadLine()
@ -99,10 +94,9 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
tp = textproto.NewReader(reader)
hello, _ = tp.ReadLine()
assert.Equal(t, "hello version go/telegraf/1.0", hello)
assert.Equal(t, "hello version go/telegraf/1.1", hello)
auth, _ = tp.ReadLine()
assert.Equal(t, "authenticate abc123token", auth)
conn.Write([]byte("ok\nok\n"))
data3, _ := tp.ReadLine()

View File

@ -153,8 +153,7 @@ func (l *Librato) Description() string {
func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
gauges := []*Gauge{}
serializer := graphite.GraphiteSerializer{Template: l.Template}
bucket := serializer.SerializeBucketName(m.Name(), m.Tags())
bucket := graphite.SerializeBucketName(m.Name(), m.Tags(), l.Template, "")
for fieldName, value := range m.Fields() {
gauge := &Gauge{
Name: graphite.InsertField(bucket, fieldName),

View File

@ -6,6 +6,7 @@ import (
"net/http"
"regexp"
"strings"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
@ -26,6 +27,10 @@ var (
type PrometheusClient struct {
Listen string
metrics map[string]prometheus.Metric
sync.Mutex
}
var sampleConfig = `
@ -34,6 +39,7 @@ var sampleConfig = `
`
func (p *PrometheusClient) Start() error {
prometheus.MustRegister(p)
defer func() {
if r := recover(); r != nil {
// recovering from panic here because there is no way to stop a
@ -78,7 +84,27 @@ func (p *PrometheusClient) Description() string {
return "Configuration for the Prometheus client to spawn"
}
// Implements prometheus.Collector
func (p *PrometheusClient) Describe(ch chan<- *prometheus.Desc) {
prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch)
}
// Implements prometheus.Collector
func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
p.Lock()
defer p.Unlock()
for _, m := range p.metrics {
ch <- m
}
}
func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
p.Lock()
defer p.Unlock()
p.metrics = make(map[string]prometheus.Metric)
if len(metrics) == 0 {
return nil
}
@ -124,45 +150,23 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
continue
}
mVec := prometheus.NewUntypedVec(
prometheus.UntypedOpts{
Name: mname,
Help: "Telegraf collected metric",
},
labels,
)
collector, err := prometheus.RegisterOrGet(mVec)
if err != nil {
log.Printf("prometheus_client: Metric failed to register with prometheus, %s", err)
continue
}
mVec, ok := collector.(*prometheus.UntypedVec)
if !ok {
continue
}
desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l)
var metric prometheus.Metric
var err error
switch val := val.(type) {
case int64:
m, err := mVec.GetMetricWith(l)
if err != nil {
log.Printf("ERROR Getting metric in Prometheus output, "+
"key: %s, labels: %v,\nerr: %s\n",
mname, l, err.Error())
continue
}
m.Set(float64(val))
metric, err = prometheus.NewConstMetric(desc, prometheus.UntypedValue, float64(val))
case float64:
m, err := mVec.GetMetricWith(l)
if err != nil {
log.Printf("ERROR Getting metric in Prometheus output, "+
"key: %s, labels: %v,\nerr: %s\n",
mname, l, err.Error())
continue
}
m.Set(val)
metric, err = prometheus.NewConstMetric(desc, prometheus.UntypedValue, val)
default:
continue
}
if err != nil {
log.Printf("ERROR creating prometheus metric, "+
"key: %s, labels: %v,\nerr: %s\n",
mname, l, err.Error())
}
p.metrics[desc.String()] = metric
}
}
return nil

View File

@ -10,22 +10,23 @@ import (
const DEFAULT_TEMPLATE = "host.tags.measurement.field"
var fieldDeleter = strings.NewReplacer(".FIELDNAME", "", "FIELDNAME.", "")
var (
fieldDeleter = strings.NewReplacer(".FIELDNAME", "", "FIELDNAME.", "")
sanitizedChars = strings.NewReplacer("/", "-", "@", "-", "*", "-", " ", "_", "..", ".")
)
type GraphiteSerializer struct {
Prefix string
Template string
}
var sanitizedChars = strings.NewReplacer("/", "-", "@", "-", "*", "-", " ", "_", "..", ".")
func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) {
out := []string{}
// Convert UnixNano to Unix timestamps
timestamp := metric.UnixNano() / 1000000000
bucket := s.SerializeBucketName(metric.Name(), metric.Tags())
bucket := SerializeBucketName(metric.Name(), metric.Tags(), s.Template, s.Prefix)
if bucket == "" {
return out, nil
}
@ -51,12 +52,14 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error)
// FIELDNAME. It is up to the user to replace this. This is so that
// SerializeBucketName can be called just once per measurement, rather than
// once per field. See GraphiteSerializer.InsertField() function.
func (s *GraphiteSerializer) SerializeBucketName(
func SerializeBucketName(
measurement string,
tags map[string]string,
template string,
prefix string,
) string {
if s.Template == "" {
s.Template = DEFAULT_TEMPLATE
if template == "" {
template = DEFAULT_TEMPLATE
}
tagsCopy := make(map[string]string)
for k, v := range tags {
@ -64,7 +67,7 @@ func (s *GraphiteSerializer) SerializeBucketName(
}
var out []string
templateParts := strings.Split(s.Template, ".")
templateParts := strings.Split(template, ".")
for _, templatePart := range templateParts {
switch templatePart {
case "measurement":
@ -96,10 +99,10 @@ func (s *GraphiteSerializer) SerializeBucketName(
return ""
}
if s.Prefix == "" {
if prefix == "" {
return sanitizedChars.Replace(strings.Join(out, "."))
}
return sanitizedChars.Replace(s.Prefix + "." + strings.Join(out, "."))
return sanitizedChars.Replace(prefix + "." + strings.Join(out, "."))
}
// InsertField takes the bucket string from SerializeBucketName and replaces the

View File

@ -225,8 +225,7 @@ func TestSerializeBucketNameNoHost(t *testing.T) {
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{}
mS := s.SerializeBucketName(m.Name(), m.Tags())
mS := SerializeBucketName(m.Name(), m.Tags(), "", "")
expS := "cpu0.us-west-2.cpu.FIELDNAME"
assert.Equal(t, expS, mS)
@ -240,8 +239,7 @@ func TestSerializeBucketNameHost(t *testing.T) {
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{}
mS := s.SerializeBucketName(m.Name(), m.Tags())
mS := SerializeBucketName(m.Name(), m.Tags(), "", "")
expS := "localhost.cpu0.us-west-2.cpu.FIELDNAME"
assert.Equal(t, expS, mS)
@ -255,8 +253,7 @@ func TestSerializeBucketNamePrefix(t *testing.T) {
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{Prefix: "prefix"}
mS := s.SerializeBucketName(m.Name(), m.Tags())
mS := SerializeBucketName(m.Name(), m.Tags(), "", "prefix")
expS := "prefix.localhost.cpu0.us-west-2.cpu.FIELDNAME"
assert.Equal(t, expS, mS)
@ -270,8 +267,7 @@ func TestTemplate1(t *testing.T) {
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{Template: template1}
mS := s.SerializeBucketName(m.Name(), m.Tags())
mS := SerializeBucketName(m.Name(), m.Tags(), template1, "")
expS := "cpu0.us-west-2.localhost.cpu.FIELDNAME"
assert.Equal(t, expS, mS)
@ -285,8 +281,7 @@ func TestTemplate2(t *testing.T) {
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{Template: template2}
mS := s.SerializeBucketName(m.Name(), m.Tags())
mS := SerializeBucketName(m.Name(), m.Tags(), template2, "")
expS := "localhost.cpu.FIELDNAME"
assert.Equal(t, expS, mS)
@ -300,8 +295,7 @@ func TestTemplate3(t *testing.T) {
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{Template: template3}
mS := s.SerializeBucketName(m.Name(), m.Tags())
mS := SerializeBucketName(m.Name(), m.Tags(), template3, "")
expS := "localhost.cpu0.us-west-2.FIELDNAME"
assert.Equal(t, expS, mS)
@ -315,8 +309,7 @@ func TestTemplate4(t *testing.T) {
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{Template: template4}
mS := s.SerializeBucketName(m.Name(), m.Tags())
mS := SerializeBucketName(m.Name(), m.Tags(), template4, "")
expS := "localhost.cpu0.us-west-2.cpu"
assert.Equal(t, expS, mS)
@ -330,8 +323,7 @@ func TestTemplate5(t *testing.T) {
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{Template: template5}
mS := s.SerializeBucketName(m.Name(), m.Tags())
mS := SerializeBucketName(m.Name(), m.Tags(), template5, "")
expS := "localhost.us-west-2.cpu0.cpu.FIELDNAME"
assert.Equal(t, expS, mS)
@ -345,8 +337,7 @@ func TestTemplate6(t *testing.T) {
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{Template: template6}
mS := s.SerializeBucketName(m.Name(), m.Tags())
mS := SerializeBucketName(m.Name(), m.Tags(), template6, "")
expS := "localhost.cpu0.us-west-2.cpu.FIELDNAME"
assert.Equal(t, expS, mS)

View File

@ -69,6 +69,8 @@ exit_if_fail telegraf -config $tmpdir/config.toml \
-test -input-filter cpu:mem
cat $GOPATH/bin/telegraf | gzip > $CIRCLE_ARTIFACTS/telegraf.gz
go build -o telegraf-race -race -ldflags "-X main.version=${VERSION}-RACE" cmd/telegraf/telegraf.go
cat telegraf-race | gzip > $CIRCLE_ARTIFACTS/telegraf-race.gz
eval "git describe --exact-match HEAD"
if [ $? -eq 0 ]; then

View File

@ -37,6 +37,10 @@ chmod 755 $LOG_DIR
if [[ -L /etc/init.d/telegraf ]]; then
rm -f /etc/init.d/telegraf
fi
# Remove legacy symlink, if it exists
if [[ -L /etc/systemd/system/telegraf.service ]]; then
rm -f /etc/systemd/system/telegraf.service
fi
# Add defaults file, if it doesn't exist
if [[ ! -f /etc/default/telegraf ]]; then

View File

@ -15,4 +15,3 @@ KillMode=control-group
[Install]
WantedBy=multi-user.target
Alias=telegraf.service