Merge branch 'master' into master

Authored by bagelswitch on 2016-07-18 08:28:52 -07:00; committed by GitHub.
commit 51215b52e6
48 changed files with 754 additions and 587 deletions

CHANGELOG.md

@@ -2,6 +2,12 @@
 ### Release Notes
 
+**Breaking Change**: Aerospike main server node measurements have been renamed
+aerospike_node. Aerospike namespace measurements have been renamed to
+aerospike_namespace. They will also now be tagged with the node_name
+that they correspond to. This has been done to differentiate measurements
+that pertain to node vs. namespace statistics.
+
 **Breaking Change**: users of github_webhooks must change to the new
 `[[inputs.webhooks]]` plugin.
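Illustration of the new scheme in line protocol (node name and values hypothetical; batch_error is a node stat referenced in the plugin's tests):

    aerospike_node,aerospike_host=localhost:3000,node_name=BB9020011AC4202 batch_error=0i 1468857600000000000
    aerospike_namespace,aerospike_host=localhost:3000,node_name=BB9020011AC4202,namespace=test objects=0i 1468857600000000000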
@@ -30,10 +36,17 @@ should now look like:
 - [#1289](https://github.com/influxdata/telegraf/pull/1289): webhooks input plugin. Thanks @francois2metz and @cduez!
 - [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar webhook plugin.
+- [#1408](https://github.com/influxdata/telegraf/pull/1408): mandrill webhook plugin.
 - [#1402](https://github.com/influxdata/telegraf/pull/1402): docker-machine/boot2docker no longer required for unit tests.
 - [#1407](https://github.com/influxdata/telegraf/pull/1407): HTTP service listener input plugin.
 - [#1350](https://github.com/influxdata/telegraf/pull/1350): cgroup input plugin.
 - [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD.
+- [#1480](https://github.com/influxdata/telegraf/pull/1480): add ability to read redis from a socket.
+- [#1387](https://github.com/influxdata/telegraf/pull/1387): **Breaking Change** - Redis `role` tag renamed to `replication_role` to avoid global_tags override
+- [#1437](https://github.com/influxdata/telegraf/pull/1437): Fetching Galera status metrics in MySQL
+- [#1500](https://github.com/influxdata/telegraf/pull/1500): Aerospike plugin refactored to use official client lib.
+- [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin.
+- [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag.
 
 ### Bugfixes
@@ -45,6 +58,14 @@ should now look like:
 - [#1378](https://github.com/influxdata/telegraf/issues/1378): Trim BOM from config file for Windows support.
 - [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload.
 - [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix.
+- [#1334](https://github.com/influxdata/telegraf/issues/1334): Prometheus output, metric refresh and caching fixes.
+- [#1432](https://github.com/influxdata/telegraf/issues/1432): Panic fix for multiple graphite outputs under very high load.
+- [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior
+- [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues.
+- [#1427](https://github.com/influxdata/telegraf/issues/1427): Cassandra input: version 2.x "column family" fix.
+- [#1463](https://github.com/influxdata/telegraf/issues/1463): Shared WaitGroup in Exec plugin
+- [#1436](https://github.com/influxdata/telegraf/issues/1436): logparser: honor modifiers in "pattern" config.
+- [#1418](https://github.com/influxdata/telegraf/issues/1418): logparser: error and exit on file permissions/missing errors.
 
 ## v1.0 beta 2 [2016-06-21]

Godeps

@@ -1,5 +1,6 @@
 github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9
 github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc
+github.com/aerospike/aerospike-client-go 45863b7fd8640dc12f7fdd397104d97e1986f25a
 github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687
 github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857
 github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4
@@ -50,6 +51,7 @@ github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c
 github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2
 github.com/wvanbergen/kafka 46f9a1cf3f670edec492029fadded9c2d9e18866
 github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8
+github.com/yuin/gopher-lua bf3808abd44b1e55143a2d7f08571aaa80db1808
 github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363
 golang.org/x/crypto 5dc8cb4b8a8eb076cbb5a06bc3b8682c15bdbbd3
 golang.org/x/net 6acef71eb69611914f7a30939ea9f6e194c78172

Makefile

@@ -25,10 +25,6 @@ build-for-docker:
 		"-s -X main.version=$(VERSION)" \
 		./cmd/telegraf/telegraf.go
 
-# Build with race detector
-dev: prepare
-	go build -race -ldflags "-X main.version=$(VERSION)" ./...
-
 # run package script
 package:
 	./scripts/build.py --package --version="$(VERSION)" --platform=linux --arch=all --upload
@@ -55,7 +51,7 @@ docker-run:
 	docker run --name postgres -p "5432:5432" -d postgres
 	docker run --name rabbitmq -p "15672:15672" -p "5672:5672" -d rabbitmq:3-management
 	docker run --name redis -p "6379:6379" -d redis
-	docker run --name aerospike -p "3000:3000" -d aerospike
+	docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server
 	docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
 	docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
 	docker run --name riemann -p "5555:5555" -d blalor/riemann
@@ -68,7 +64,7 @@ docker-run-circle:
 		-e ADVERTISED_PORT=9092 \
 		-p "2181:2181" -p "9092:9092" \
 		-d spotify/kafka
-	docker run --name aerospike -p "3000:3000" -d aerospike
+	docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server
 	docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
 	docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
 	docker run --name riemann -p "5555:5555" -d blalor/riemann

README.md

@@ -220,6 +220,7 @@ Telegraf can also collect metrics via the following service plugins:
 * [nats_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nats_consumer)
 * [webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks)
   * [github](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/github)
+  * [mandrill](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/mandrill)
   * [rollbar](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/rollbar)
 * [nsq_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nsq_consumer)

agent/agent.go

@@ -268,11 +268,31 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
 			internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
 			a.flush()
 		case m := <-metricC:
-			for _, o := range a.Config.Outputs {
-				o.AddMetric(m)
+			for i, o := range a.Config.Outputs {
+				if i == len(a.Config.Outputs)-1 {
+					o.AddMetric(m)
+				} else {
+					o.AddMetric(copyMetric(m))
+				}
 			}
 		}
 	}
 }
 
+func copyMetric(m telegraf.Metric) telegraf.Metric {
+	t := time.Time(m.Time())
+
+	tags := make(map[string]string)
+	fields := make(map[string]interface{})
+	for k, v := range m.Tags() {
+		tags[k] = v
+	}
+	for k, v := range m.Fields() {
+		fields[k] = v
+	}
+
+	out, _ := telegraf.NewMetric(m.Name(), tags, fields, t)
+	return out
+}
+
 // Run runs the agent daemon, gathering every Interval
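The copy above matters because each output buffers the metrics handed to it until its next flush; if every output shared one Metric instance, its tag and field maps would be shared mutable state across goroutines. Only the last output can safely take the original, since nothing else touches it afterward. Note that copyMetric discards the error from telegraf.NewMetric, which is reasonable only because the source metric was already validated when it was first created.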

etc/telegraf.conf

@@ -197,6 +197,8 @@
 # # Configuration for Graphite server to send metrics to
 # [[outputs.graphite]]
 #   ## TCP endpoint for your graphite instance.
+#   ## If multiple endpoints are configured, the output will be load balanced.
+#   ## Only one of the endpoints will be written to with each iteration.
 #   servers = ["localhost:2003"]
 #   ## Prefix metrics name
 #   prefix = ""
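For example (hostnames hypothetical), a load-balanced pair would simply list both endpoints:

    #   servers = ["graphite-a.example.com:2003", "graphite-b.example.com:2003"]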

plugins/inputs/aerospike/aerospike.go

@@ -1,104 +1,19 @@
 package aerospike
 
 import (
-	"bytes"
-	"encoding/binary"
-	"fmt"
-	"github.com/influxdata/telegraf"
-	"github.com/influxdata/telegraf/plugins/inputs"
 	"net"
 	"strconv"
 	"strings"
 	"sync"
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal/errchan"
+	"github.com/influxdata/telegraf/plugins/inputs"
+
+	as "github.com/aerospike/aerospike-client-go"
 )
 
-const (
-	MSG_HEADER_SIZE = 8
-	MSG_TYPE        = 1 // Info is 1
-	MSG_VERSION     = 2
-)
-
-var (
-	STATISTICS_COMMAND = []byte("statistics\n")
-	NAMESPACES_COMMAND = []byte("namespaces\n")
-)
-
-type aerospikeMessageHeader struct {
-	Version uint8
-	Type    uint8
-	DataLen [6]byte
-}
-
-type aerospikeMessage struct {
-	aerospikeMessageHeader
-	Data []byte
-}
-
-// Taken from aerospike-client-go/types/message.go
-func (msg *aerospikeMessage) Serialize() []byte {
-	msg.DataLen = msgLenToBytes(int64(len(msg.Data)))
-	buf := bytes.NewBuffer([]byte{})
-	binary.Write(buf, binary.BigEndian, msg.aerospikeMessageHeader)
-	binary.Write(buf, binary.BigEndian, msg.Data[:])
-	return buf.Bytes()
-}
-
-type aerospikeInfoCommand struct {
-	msg *aerospikeMessage
-}
-
-// Taken from aerospike-client-go/info.go
-func (nfo *aerospikeInfoCommand) parseMultiResponse() (map[string]string, error) {
-	responses := make(map[string]string)
-	offset := int64(0)
-	begin := int64(0)
-
-	dataLen := int64(len(nfo.msg.Data))
-
-	// Create reusable StringBuilder for performance.
-	for offset < dataLen {
-		b := nfo.msg.Data[offset]
-
-		if b == '\t' {
-			name := nfo.msg.Data[begin:offset]
-			offset++
-			begin = offset
-
-			// Parse field value.
-			for offset < dataLen {
-				if nfo.msg.Data[offset] == '\n' {
-					break
-				}
-				offset++
-			}
-
-			if offset > begin {
-				value := nfo.msg.Data[begin:offset]
-				responses[string(name)] = string(value)
-			} else {
-				responses[string(name)] = ""
-			}
-			offset++
-			begin = offset
-		} else if b == '\n' {
-			if offset > begin {
-				name := nfo.msg.Data[begin:offset]
-				responses[string(name)] = ""
-			}
-			offset++
-			begin = offset
-		} else {
-			offset++
-		}
-	}
-
-	if offset > begin {
-		name := nfo.msg.Data[begin:offset]
-		responses[string(name)] = ""
-	}
-
-	return responses, nil
-}
-
 type Aerospike struct {
 	Servers []string
 }
@@ -115,7 +30,7 @@ func (a *Aerospike) SampleConfig() string {
 }
 
 func (a *Aerospike) Description() string {
-	return "Read stats from an aerospike server"
+	return "Read stats from aerospike server(s)"
 }
 
 func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
@@ -124,214 +39,90 @@ func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
 	}
 
 	var wg sync.WaitGroup
-
-	var outerr error
-	for _, server := range a.Servers {
-		wg.Add(1)
-		go func(server string) {
-			defer wg.Done()
-			outerr = a.gatherServer(server, acc)
-		}(server)
-	}
+	errChan := errchan.New(len(a.Servers))
+	wg.Add(len(a.Servers))
+	for _, server := range a.Servers {
+		go func(serv string) {
+			defer wg.Done()
+			errChan.C <- a.gatherServer(serv, acc)
+		}(server)
+	}
 
 	wg.Wait()
-	return outerr
+	return errChan.Error()
 }
 
-func (a *Aerospike) gatherServer(host string, acc telegraf.Accumulator) error {
-	aerospikeInfo, err := getMap(STATISTICS_COMMAND, host)
-	if err != nil {
-		return fmt.Errorf("Aerospike info failed: %s", err)
-	}
-	readAerospikeStats(aerospikeInfo, acc, host, "")
-
-	namespaces, err := getList(NAMESPACES_COMMAND, host)
-	if err != nil {
-		return fmt.Errorf("Aerospike namespace list failed: %s", err)
-	}
-
-	for ix := range namespaces {
-		nsInfo, err := getMap([]byte("namespace/"+namespaces[ix]+"\n"), host)
-		if err != nil {
-			return fmt.Errorf("Aerospike namespace '%s' query failed: %s", namespaces[ix], err)
-		}
-		readAerospikeStats(nsInfo, acc, host, namespaces[ix])
-	}
-	return nil
-}
+func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) error {
+	host, port, err := net.SplitHostPort(hostport)
+	if err != nil {
+		return err
+	}
+
+	iport, err := strconv.Atoi(port)
+	if err != nil {
+		iport = 3000
+	}
+
+	c, err := as.NewClient(host, iport)
+	if err != nil {
+		return err
+	}
+	defer c.Close()
+
+	nodes := c.GetNodes()
+	for _, n := range nodes {
+		tags := map[string]string{
+			"node_name":      n.GetName(),
+			"aerospike_host": hostport,
+		}
+		fields := make(map[string]interface{})
+		stats, err := as.RequestNodeStats(n)
+		if err != nil {
+			return err
+		}
+		for k, v := range stats {
+			if iv, err := strconv.ParseInt(v, 10, 64); err == nil {
+				fields[strings.Replace(k, "-", "_", -1)] = iv
+			}
+		}
+		acc.AddFields("aerospike_node", fields, tags, time.Now())
+
+		info, err := as.RequestNodeInfo(n, "namespaces")
+		if err != nil {
+			return err
+		}
+		namespaces := strings.Split(info["namespaces"], ";")
+
+		for _, namespace := range namespaces {
+			nTags := copyTags(tags)
+			nTags["namespace"] = namespace
+			nFields := make(map[string]interface{})
+			info, err := as.RequestNodeInfo(n, "namespace/"+namespace)
+			if err != nil {
+				continue
+			}
+			stats := strings.Split(info["namespace/"+namespace], ";")
+			for _, stat := range stats {
+				parts := strings.Split(stat, "=")
+				if len(parts) < 2 {
+					continue
+				}
+				if iv, err := strconv.ParseInt(parts[1], 10, 64); err == nil {
+					nFields[strings.Replace(parts[0], "-", "_", -1)] = iv
+				}
+			}
+			acc.AddFields("aerospike_namespace", nFields, nTags, time.Now())
+		}
+	}
+	return nil
+}
+
+func copyTags(m map[string]string) map[string]string {
+	out := make(map[string]string)
+	for k, v := range m {
+		out[k] = v
+	}
+	return out
+}
 
-func getMap(key []byte, host string) (map[string]string, error) {
-	data, err := get(key, host)
-	if err != nil {
-		return nil, fmt.Errorf("Failed to get data: %s", err)
-	}
-	parsed, err := unmarshalMapInfo(data, string(key))
-	if err != nil {
-		return nil, fmt.Errorf("Failed to unmarshal data: %s", err)
-	}
-
-	return parsed, nil
-}
-
-func getList(key []byte, host string) ([]string, error) {
-	data, err := get(key, host)
-	if err != nil {
-		return nil, fmt.Errorf("Failed to get data: %s", err)
-	}
-	parsed, err := unmarshalListInfo(data, string(key))
-	if err != nil {
-		return nil, fmt.Errorf("Failed to unmarshal data: %s", err)
-	}
-
-	return parsed, nil
-}
-
-func get(key []byte, host string) (map[string]string, error) {
-	var err error
-	var data map[string]string
-
-	asInfo := &aerospikeInfoCommand{
-		msg: &aerospikeMessage{
-			aerospikeMessageHeader: aerospikeMessageHeader{
-				Version: uint8(MSG_VERSION),
-				Type:    uint8(MSG_TYPE),
-				DataLen: msgLenToBytes(int64(len(key))),
-			},
-			Data: key,
-		},
-	}
-
-	cmd := asInfo.msg.Serialize()
-	addr, err := net.ResolveTCPAddr("tcp", host)
-	if err != nil {
-		return data, fmt.Errorf("Lookup failed for '%s': %s", host, err)
-	}
-
-	conn, err := net.DialTCP("tcp", nil, addr)
-	if err != nil {
-		return data, fmt.Errorf("Connection failed for '%s': %s", host, err)
-	}
-	defer conn.Close()
-
-	_, err = conn.Write(cmd)
-	if err != nil {
-		return data, fmt.Errorf("Failed to send to '%s': %s", host, err)
-	}
-
-	msgHeader := bytes.NewBuffer(make([]byte, MSG_HEADER_SIZE))
-	_, err = readLenFromConn(conn, msgHeader.Bytes(), MSG_HEADER_SIZE)
-	if err != nil {
-		return data, fmt.Errorf("Failed to read header: %s", err)
-	}
-	err = binary.Read(msgHeader, binary.BigEndian, &asInfo.msg.aerospikeMessageHeader)
-	if err != nil {
-		return data, fmt.Errorf("Failed to unmarshal header: %s", err)
-	}
-
-	msgLen := msgLenFromBytes(asInfo.msg.aerospikeMessageHeader.DataLen)
-
-	if int64(len(asInfo.msg.Data)) != msgLen {
-		asInfo.msg.Data = make([]byte, msgLen)
-	}
-
-	_, err = readLenFromConn(conn, asInfo.msg.Data, len(asInfo.msg.Data))
-	if err != nil {
-		return data, fmt.Errorf("Failed to read from connection to '%s': %s", host, err)
-	}
-
-	data, err = asInfo.parseMultiResponse()
-	if err != nil {
-		return data, fmt.Errorf("Failed to parse response from '%s': %s", host, err)
-	}
-
-	return data, err
-}
-
-func readAerospikeStats(
-	stats map[string]string,
-	acc telegraf.Accumulator,
-	host string,
-	namespace string,
-) {
-	fields := make(map[string]interface{})
-	tags := map[string]string{
-		"aerospike_host": host,
-		"namespace":      "_service",
-	}
-
-	if namespace != "" {
-		tags["namespace"] = namespace
-	}
-	for key, value := range stats {
-		// We are going to ignore all string based keys
-		val, err := strconv.ParseInt(value, 10, 64)
-		if err == nil {
-			if strings.Contains(key, "-") {
-				key = strings.Replace(key, "-", "_", -1)
-			}
-			fields[key] = val
-		}
-	}
-	acc.AddFields("aerospike", fields, tags)
-}
-
-func unmarshalMapInfo(infoMap map[string]string, key string) (map[string]string, error) {
-	key = strings.TrimSuffix(key, "\n")
-	res := map[string]string{}
-
-	v, exists := infoMap[key]
-	if !exists {
-		return res, fmt.Errorf("Key '%s' missing from info", key)
-	}
-
-	values := strings.Split(v, ";")
-	for i := range values {
-		kv := strings.Split(values[i], "=")
-		if len(kv) > 1 {
-			res[kv[0]] = kv[1]
-		}
-	}
-
-	return res, nil
-}
-
-func unmarshalListInfo(infoMap map[string]string, key string) ([]string, error) {
-	key = strings.TrimSuffix(key, "\n")
-
-	v, exists := infoMap[key]
-	if !exists {
-		return []string{}, fmt.Errorf("Key '%s' missing from info", key)
-	}
-
-	values := strings.Split(v, ";")
-	return values, nil
-}
-
-func readLenFromConn(c net.Conn, buffer []byte, length int) (total int, err error) {
-	var r int
-	for total < length {
-		r, err = c.Read(buffer[total:length])
-		total += r
-		if err != nil {
-			break
-		}
-	}
-	return
-}
-
-// Taken from aerospike-client-go/types/message.go
-func msgLenToBytes(DataLen int64) [6]byte {
-	b := make([]byte, 8)
-	binary.BigEndian.PutUint64(b, uint64(DataLen))
-	res := [6]byte{}
-	copy(res[:], b[2:])
-	return res
-}
-
-// Taken from aerospike-client-go/types/message.go
-func msgLenFromBytes(buf [6]byte) int64 {
-	nbytes := append([]byte{0, 0}, buf[:]...)
-	DataLen := binary.BigEndian.Uint64(nbytes)
-	return int64(DataLen)
-}
 
 func init() {

plugins/inputs/aerospike/aerospike_test.go

@@ -1,7 +1,6 @@
 package aerospike
 
 import (
-	"reflect"
 	"testing"
 
 	"github.com/influxdata/telegraf/testutil"
@@ -23,96 +22,29 @@ func TestAerospikeStatistics(t *testing.T) {
 	err := a.Gather(&acc)
 	require.NoError(t, err)
 
-	// Only use a few of the metrics
-	asMetrics := []string{
-		"transactions",
-		"stat_write_errs",
-		"stat_read_reqs",
-		"stat_write_reqs",
-	}
-
-	for _, metric := range asMetrics {
-		assert.True(t, acc.HasIntField("aerospike", metric), metric)
-	}
-
+	assert.True(t, acc.HasMeasurement("aerospike_node"))
+	assert.True(t, acc.HasMeasurement("aerospike_namespace"))
+	assert.True(t, acc.HasIntField("aerospike_node", "batch_error"))
 }
 
-func TestAerospikeMsgLenFromToBytes(t *testing.T) {
-	var i int64 = 8
-	assert.True(t, i == msgLenFromBytes(msgLenToBytes(i)))
-}
-
-func TestReadAerospikeStatsNoNamespace(t *testing.T) {
-	// Also test for re-writing
-	var acc testutil.Accumulator
-	stats := map[string]string{
-		"stat-write-errs": "12345",
-		"stat_read_reqs":  "12345",
-	}
-	readAerospikeStats(stats, &acc, "host1", "")
-
-	fields := map[string]interface{}{
-		"stat_write_errs": int64(12345),
-		"stat_read_reqs":  int64(12345),
-	}
-	tags := map[string]string{
-		"aerospike_host": "host1",
-		"namespace":      "_service",
-	}
-	acc.AssertContainsTaggedFields(t, "aerospike", fields, tags)
-}
-
-func TestReadAerospikeStatsNamespace(t *testing.T) {
-	var acc testutil.Accumulator
-	stats := map[string]string{
-		"stat_write_errs": "12345",
-		"stat_read_reqs":  "12345",
-	}
-	readAerospikeStats(stats, &acc, "host1", "test")
-
-	fields := map[string]interface{}{
-		"stat_write_errs": int64(12345),
-		"stat_read_reqs":  int64(12345),
-	}
-	tags := map[string]string{
-		"aerospike_host": "host1",
-		"namespace":      "test",
-	}
-	acc.AssertContainsTaggedFields(t, "aerospike", fields, tags)
-}
-
-func TestAerospikeUnmarshalList(t *testing.T) {
-	i := map[string]string{
-		"test": "one;two;three",
-	}
-
-	expected := []string{"one", "two", "three"}
-
-	list, err := unmarshalListInfo(i, "test2")
-	assert.True(t, err != nil)
-
-	list, err = unmarshalListInfo(i, "test")
-	assert.True(t, err == nil)
-
-	equal := true
-	for ix := range expected {
-		if list[ix] != expected[ix] {
-			equal = false
-			break
-		}
-	}
-	assert.True(t, equal)
-}
-
-func TestAerospikeUnmarshalMap(t *testing.T) {
-	i := map[string]string{
-		"test": "key1=value1;key2=value2",
-	}
-
-	expected := map[string]string{
-		"key1": "value1",
-		"key2": "value2",
-	}
-	m, err := unmarshalMapInfo(i, "test")
-	assert.True(t, err == nil)
-	assert.True(t, reflect.DeepEqual(m, expected))
+func TestAerospikeStatisticsPartialErr(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping integration test in short mode")
+	}
+
+	a := &Aerospike{
+		Servers: []string{
+			testutil.GetLocalHost() + ":3000",
+			testutil.GetLocalHost() + ":9999",
+		},
+	}
+
+	var acc testutil.Accumulator
+
+	err := a.Gather(&acc)
+	require.Error(t, err)
+
+	assert.True(t, acc.HasMeasurement("aerospike_node"))
+	assert.True(t, acc.HasMeasurement("aerospike_namespace"))
+	assert.True(t, acc.HasIntField("aerospike_node", "batch_error"))
 }

plugins/inputs/cassandra/cassandra.go

@@ -148,7 +148,7 @@ func (c cassandraMetric) addTagsFields(out map[string]interface{}) {
 	tokens := parseJmxMetricRequest(r.(map[string]interface{})["mbean"].(string))
 	// Requests with wildcards for keyspace or table names will return nested
 	// maps in the json response
-	if tokens["type"] == "Table" && (tokens["keyspace"] == "*" ||
+	if (tokens["type"] == "Table" || tokens["type"] == "ColumnFamily") && (tokens["keyspace"] == "*" ||
 		tokens["scope"] == "*") {
 		if valuesMap, ok := out["value"]; ok {
 			for k, v := range valuesMap.(map[string]interface{}) {
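For context: Cassandra 2.x exposes table metrics under mbeans of type ColumnFamily, which 3.x renamed to Table, so a wildcard request such as org.apache.cassandra.metrics:type=ColumnFamily,keyspace=*,scope=*,name=ReadLatency (illustrative mbean; exact names vary by version) also returns the nested per-keyspace maps that this branch unpacks.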

plugins/inputs/cgroup/README.md

@@ -33,8 +33,9 @@ KEY1 VAL1\n
 
 ### Tags:
 
-All measurements have the following tags:
-- path
+Measurements don't have any specific tags unless you define them at the telegraf level (defaults). We
+used to have the path listed as a tag, but to keep cardinality in check it's easier to move this
+value to a field. Thanks @sebito91!
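Illustration in line protocol (values borrowed from this PR's test data, timestamp hypothetical) of path now riding along as a field:

    cgroup memory.limit_in_bytes=223372036854771712i,path="testdata/memory" 1468857600000000000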
 
 ### Configuration:

plugins/inputs/cgroup/cgroup_linux.go

@@ -56,10 +56,9 @@ func (g *CGroup) gatherDir(dir string, acc telegraf.Accumulator) error {
 			return err
 		}
 	}
+	fields["path"] = dir
 
-	tags := map[string]string{"path": dir}
-
-	acc.AddFields(metricName, fields, tags)
+	acc.AddFields(metricName, fields, nil)
 
 	return nil
 }

plugins/inputs/cgroup/cgroup_test.go

@@ -3,10 +3,13 @@
 package cgroup
 
 import (
+	"fmt"
 	"testing"
 
 	"github.com/influxdata/telegraf/testutil"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"reflect"
 )
 
 var cg1 = &CGroup{
@@ -21,15 +24,32 @@ var cg1 = &CGroup{
 	},
 }
 
+func assertContainsFields(a *testutil.Accumulator, t *testing.T, measurement string, fieldSet []map[string]interface{}) {
+	a.Lock()
+	defer a.Unlock()
+
+	numEquals := 0
+	for _, p := range a.Metrics {
+		if p.Measurement == measurement {
+			for _, fields := range fieldSet {
+				if reflect.DeepEqual(fields, p.Fields) {
+					numEquals++
+				}
+			}
+		}
+	}
+
+	if numEquals != len(fieldSet) {
+		assert.Fail(t, fmt.Sprintf("only %d of %d are equal", numEquals, len(fieldSet)))
+	}
+}
+
 func TestCgroupStatistics_1(t *testing.T) {
 	var acc testutil.Accumulator
 
 	err := cg1.Gather(&acc)
 	require.NoError(t, err)
 
-	tags := map[string]string{
-		"path": "testdata/memory",
-	}
 	fields := map[string]interface{}{
 		"memory.stat.cache":       1739362304123123123,
 		"memory.stat.rss":         1775325184,
@@ -42,8 +62,9 @@ func TestCgroupStatistics_1(t *testing.T) {
 		"memory.limit_in_bytes":   223372036854771712,
 		"memory.use_hierarchy":    "12-781",
 		"notify_on_release":       0,
+		"path":                    "testdata/memory",
 	}
-	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
+	assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields})
 }
 
 // ======================================================================
@@ -59,16 +80,14 @@ func TestCgroupStatistics_2(t *testing.T) {
 	err := cg2.Gather(&acc)
 	require.NoError(t, err)
 
-	tags := map[string]string{
-		"path": "testdata/cpu",
-	}
 	fields := map[string]interface{}{
 		"cpuacct.usage_percpu.0": -1452543795404,
 		"cpuacct.usage_percpu.1": 1376681271659,
 		"cpuacct.usage_percpu.2": 1450950799997,
 		"cpuacct.usage_percpu.3": -1473113374257,
+		"path":                   "testdata/cpu",
 	}
-	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
+	assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields})
 }
 
 // ======================================================================
@@ -84,18 +103,16 @@ func TestCgroupStatistics_3(t *testing.T) {
 	err := cg3.Gather(&acc)
 	require.NoError(t, err)
 
-	tags := map[string]string{
-		"path": "testdata/memory/group_1",
-	}
 	fields := map[string]interface{}{
 		"memory.limit_in_bytes": 223372036854771712,
+		"path":                  "testdata/memory/group_1",
 	}
-	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
 
-	tags = map[string]string{
-		"path": "testdata/memory/group_2",
+	fieldsTwo := map[string]interface{}{
+		"memory.limit_in_bytes": 223372036854771712,
+		"path":                  "testdata/memory/group_2",
 	}
-	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
+	assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields, fieldsTwo})
 }
 
 // ======================================================================
@@ -111,23 +128,22 @@ func TestCgroupStatistics_4(t *testing.T) {
 	err := cg4.Gather(&acc)
 	require.NoError(t, err)
 
-	tags := map[string]string{
-		"path": "testdata/memory/group_1/group_1_1",
-	}
 	fields := map[string]interface{}{
 		"memory.limit_in_bytes": 223372036854771712,
+		"path":                  "testdata/memory/group_1/group_1_1",
 	}
-	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
 
-	tags = map[string]string{
-		"path": "testdata/memory/group_1/group_1_2",
+	fieldsTwo := map[string]interface{}{
+		"memory.limit_in_bytes": 223372036854771712,
+		"path":                  "testdata/memory/group_1/group_1_2",
 	}
-	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
 
-	tags = map[string]string{
-		"path": "testdata/memory/group_2",
+	fieldsThree := map[string]interface{}{
+		"memory.limit_in_bytes": 223372036854771712,
+		"path":                  "testdata/memory/group_2",
 	}
-	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
+
+	assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields, fieldsTwo, fieldsThree})
 }
 
 // ======================================================================
@@ -143,18 +159,16 @@ func TestCgroupStatistics_5(t *testing.T) {
 	err := cg5.Gather(&acc)
 	require.NoError(t, err)
 
-	tags := map[string]string{
-		"path": "testdata/memory/group_1/group_1_1",
-	}
 	fields := map[string]interface{}{
 		"memory.limit_in_bytes": 223372036854771712,
+		"path":                  "testdata/memory/group_1/group_1_1",
 	}
-	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
 
-	tags = map[string]string{
-		"path": "testdata/memory/group_2/group_1_1",
+	fieldsTwo := map[string]interface{}{
+		"memory.limit_in_bytes": 223372036854771712,
+		"path":                  "testdata/memory/group_2/group_1_1",
 	}
-	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
+	assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields, fieldsTwo})
 }
 
 // ======================================================================
@@ -170,13 +184,11 @@ func TestCgroupStatistics_6(t *testing.T) {
 	err := cg6.Gather(&acc)
 	require.NoError(t, err)
 
-	tags := map[string]string{
-		"path": "testdata/memory",
-	}
 	fields := map[string]interface{}{
 		"memory.usage_in_bytes":      3513667584,
 		"memory.use_hierarchy":       "12-781",
 		"memory.kmem.limit_in_bytes": 9223372036854771712,
+		"path":                       "testdata/memory",
 	}
-	acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
+	assertContainsFields(&acc, t, "cgroup", []map[string]interface{}{fields})
 }

plugins/inputs/exec/exec.go

@@ -48,8 +48,6 @@ type Exec struct {
 
 	parser parsers.Parser
 
-	wg sync.WaitGroup
-
 	runner  Runner
 	errChan chan error
 }
@@ -119,8 +117,8 @@ func (c CommandRunner) Run(
 	return out.Bytes(), nil
 }
 
-func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) {
-	defer e.wg.Done()
+func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync.WaitGroup) {
+	defer wg.Done()
 
 	out, err := e.runner.Run(e, command, acc)
 	if err != nil {
@@ -151,6 +149,7 @@ func (e *Exec) SetParser(parser parsers.Parser) {
 }
 
 func (e *Exec) Gather(acc telegraf.Accumulator) error {
+	var wg sync.WaitGroup
 	// Legacy single command support
 	if e.Command != "" {
 		e.Commands = append(e.Commands, e.Command)
@@ -190,11 +189,11 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
 	errChan := errchan.New(len(commands))
 	e.errChan = errChan.C
 
-	e.wg.Add(len(commands))
+	wg.Add(len(commands))
 	for _, command := range commands {
-		go e.ProcessCommand(command, acc)
+		go e.ProcessCommand(command, acc, &wg)
 	}
-	e.wg.Wait()
+	wg.Wait()
 	return errChan.Error()
 }
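A note on the WaitGroup change: the struct-level e.wg was shared by every Gather invocation, so a second Gather could Add to the counter while the first was still blocked in Wait, a reuse pattern that can panic with "sync: WaitGroup is reused before previous Wait has returned". A WaitGroup scoped to each Gather call ties the counter to exactly one batch of commands.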

plugins/inputs/logparser/grok/grok.go

@@ -54,8 +54,14 @@ var (
 type Parser struct {
 	Patterns []string
+	// namedPatterns is a list of internally-assigned names to the patterns
+	// specified by the user in Patterns.
+	// They will look like:
+	//   GROK_INTERNAL_PATTERN_0, GROK_INTERNAL_PATTERN_1, etc.
+	namedPatterns      []string
 	CustomPatterns     string
 	CustomPatternFiles []string
+	Measurement        string
 
 	// typeMap is a map of patterns -> capture name -> modifier,
 	//   ie, {
@@ -97,13 +103,24 @@ func (p *Parser) Compile() error {
 		return err
 	}
 
-	p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns
+	// Give Patterns fake names so that they can be treated as named
+	// "custom patterns"
+	p.namedPatterns = make([]string, len(p.Patterns))
+	for i, pattern := range p.Patterns {
+		name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i)
+		p.CustomPatterns += "\n" + name + " " + pattern + "\n"
+		p.namedPatterns[i] = "%{" + name + "}"
+	}
+
+	// Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse
+	// them together as the same type of pattern.
+	p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns
 	if len(p.CustomPatterns) != 0 {
 		scanner := bufio.NewScanner(strings.NewReader(p.CustomPatterns))
 		p.addCustomPatterns(scanner)
 	}
 
+	// Parse any custom pattern files supplied.
 	for _, filename := range p.CustomPatternFiles {
 		file, err := os.Open(filename)
 		if err != nil {
@@ -114,6 +131,10 @@ func (p *Parser) Compile() error {
 		p.addCustomPatterns(scanner)
 	}
 
+	if p.Measurement == "" {
+		p.Measurement = "logparser_grok"
+	}
+
 	return p.compileCustomPatterns()
 }
 
@@ -122,7 +143,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
 	var values map[string]string
 	// the matching pattern string
 	var patternName string
-	for _, pattern := range p.Patterns {
+	for _, pattern := range p.namedPatterns {
 		if values, err = p.g.Parse(pattern, line); err != nil {
 			return nil, err
 		}
@@ -215,7 +236,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
 		}
 	}
 
-	return telegraf.NewMetric("logparser_grok", tags, fields, p.tsModder.tsMod(timestamp))
+	return telegraf.NewMetric(p.Measurement, tags, fields, p.tsModder.tsMod(timestamp))
 }
 
 func (p *Parser) addCustomPatterns(scanner *bufio.Scanner) {
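How the internal naming plays out, with an illustrative pattern: patterns = ["%{NUMBER:value:float}"] is registered as the custom pattern GROK_INTERNAL_PATTERN_0 %{NUMBER:value:float}, and ParseLine then matches lines against %{GROK_INTERNAL_PATTERN_0}. This is what lets a config supply a raw match string with modifiers, rather than only a reference to an already-named pattern (see TestParsePatternsWithoutCustom in the test file below).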

plugins/inputs/logparser/grok/grok_test.go

@@ -83,6 +83,31 @@ func Benchmark_ParseLine_CustomPattern(b *testing.B) {
 	benchM = m
 }
 
+func TestMeasurementName(t *testing.T) {
+	p := &Parser{
+		Measurement: "my_web_log",
+		Patterns:    []string{"%{COMMON_LOG_FORMAT}"},
+	}
+	assert.NoError(t, p.Compile())
+
+	// Parse an influxdb POST request
+	m, err := p.ParseLine(`127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`)
+	require.NotNil(t, m)
+	assert.NoError(t, err)
+	assert.Equal(t,
+		map[string]interface{}{
+			"resp_bytes":   int64(2326),
+			"auth":         "frank",
+			"client_ip":    "127.0.0.1",
+			"http_version": float64(1.0),
+			"ident":        "user-identifier",
+			"request":      "/apache_pb.gif",
+		},
+		m.Fields())
+	assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags())
+	assert.Equal(t, "my_web_log", m.Name())
+}
+
 func TestBuiltinInfluxdbHttpd(t *testing.T) {
 	p := &Parser{
 		Patterns: []string{"%{INFLUXDB_HTTPD_LOG}"},
@@ -98,7 +123,6 @@ func TestBuiltinInfluxdbHttpd(t *testing.T) {
 			"resp_bytes":   int64(0),
 			"auth":         "-",
 			"client_ip":    "::1",
-			"resp_code":    int64(204),
 			"http_version": float64(1.1),
 			"ident":        "-",
 			"referrer":     "-",
@@ -107,7 +131,7 @@ func TestBuiltinInfluxdbHttpd(t *testing.T) {
 			"agent":        "InfluxDBClient",
 		},
 		m.Fields())
-	assert.Equal(t, map[string]string{"verb": "POST"}, m.Tags())
+	assert.Equal(t, map[string]string{"verb": "POST", "resp_code": "204"}, m.Tags())
 
 	// Parse an influxdb GET request
 	m, err = p.ParseLine(`[httpd] ::1 - - [14/Jun/2016:12:10:02 +0100] "GET /query?db=telegraf&q=SELECT+bytes%2Cresponse_time_us+FROM+logparser_grok+WHERE+http_method+%3D+%27GET%27+AND+response_time_us+%3E+0+AND+time+%3E+now%28%29+-+1h HTTP/1.1" 200 578 "http://localhost:8083/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.84 Safari/537.36" 8a3806f1-3220-11e6-8006-000000000000 988`)
@@ -118,7 +142,6 @@ func TestBuiltinInfluxdbHttpd(t *testing.T) {
 			"resp_bytes":   int64(578),
 			"auth":         "-",
 			"client_ip":    "::1",
-			"resp_code":    int64(200),
 			"http_version": float64(1.1),
 			"ident":        "-",
 			"referrer":     "http://localhost:8083/",
@@ -127,7 +150,7 @@ func TestBuiltinInfluxdbHttpd(t *testing.T) {
 			"agent":        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.84 Safari/537.36",
 		},
 		m.Fields())
-	assert.Equal(t, map[string]string{"verb": "GET"}, m.Tags())
+	assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags())
 }
 
 // common log format
@@ -147,13 +170,12 @@ func TestBuiltinCommonLogFormat(t *testing.T) {
 			"resp_bytes":   int64(2326),
 			"auth":         "frank",
 			"client_ip":    "127.0.0.1",
-			"resp_code":    int64(200),
 			"http_version": float64(1.0),
 			"ident":        "user-identifier",
 			"request":      "/apache_pb.gif",
 		},
 		m.Fields())
-	assert.Equal(t, map[string]string{"verb": "GET"}, m.Tags())
+	assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags())
 }
 
 // combined log format
@@ -173,7 +195,6 @@ func TestBuiltinCombinedLogFormat(t *testing.T) {
 			"resp_bytes":   int64(2326),
 			"auth":         "frank",
 			"client_ip":    "127.0.0.1",
-			"resp_code":    int64(200),
 			"http_version": float64(1.0),
 			"ident":        "user-identifier",
 			"request":      "/apache_pb.gif",
@@ -181,12 +202,12 @@ func TestBuiltinCombinedLogFormat(t *testing.T) {
 			"agent":        "Mozilla",
 		},
 		m.Fields())
-	assert.Equal(t, map[string]string{"verb": "GET"}, m.Tags())
+	assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags())
 }
 
 func TestCompileStringAndParse(t *testing.T) {
 	p := &Parser{
-		Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
+		Patterns: []string{"%{TEST_LOG_A}"},
 		CustomPatterns: `
 			DURATION %{NUMBER}[nuµm]?s
 			RESPONSE_CODE %{NUMBER:response_code:tag}
@@ -209,6 +230,41 @@ func TestCompileStringAndParse(t *testing.T) {
 	assert.Equal(t, map[string]string{"response_code": "200"}, metricA.Tags())
 }
 
+func TestCompileErrorsOnInvalidPattern(t *testing.T) {
+	p := &Parser{
+		Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
+		CustomPatterns: `
+			DURATION %{NUMBER}[nuµm]?s
+			RESPONSE_CODE %{NUMBER:response_code:tag}
+			RESPONSE_TIME %{DURATION:response_time:duration}
+			TEST_LOG_A %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME}
+		`,
+	}
+	assert.Error(t, p.Compile())
+
+	metricA, _ := p.ParseLine(`1.25 200 192.168.1.1 5.432µs`)
+	require.Nil(t, metricA)
+}
+
+func TestParsePatternsWithoutCustom(t *testing.T) {
+	p := &Parser{
+		Patterns: []string{"%{POSINT:ts:ts-epochnano} response_time=%{POSINT:response_time:int} mymetric=%{NUMBER:metric:float}"},
+	}
+	assert.NoError(t, p.Compile())
+
+	metricA, err := p.ParseLine(`1466004605359052000 response_time=20821 mymetric=10890.645`)
+	require.NotNil(t, metricA)
+	assert.NoError(t, err)
+	assert.Equal(t,
+		map[string]interface{}{
+			"response_time": int64(20821),
+			"metric":        float64(10890.645),
+		},
+		metricA.Fields())
+	assert.Equal(t, map[string]string{}, metricA.Tags())
+	assert.Equal(t, time.Unix(0, 1466004605359052000), metricA.Time())
+}
+
 func TestParseEpochNano(t *testing.T) {
 	p := &Parser{
 		Patterns: []string{"%{MYAPP}"},
@@ -392,7 +448,7 @@ func TestParseErrors(t *testing.T) {
 			TEST_LOG_A %{HTTPDATE:ts:ts-httpd} %{WORD:myword:int} %{}
 		`,
 	}
-	assert.NoError(t, p.Compile())
+	assert.Error(t, p.Compile())
 	_, err := p.ParseLine(`[04/Jun/2016:12:41:45 +0100] notnumber 200 192.168.1.1 5.432µs 101`)
 	assert.Error(t, err)

plugins/inputs/logparser/grok/influx_patterns.go

@@ -66,7 +66,7 @@ INFLUXDB_HTTPD_LOG \[httpd\] %{COMBINED_LOG_FORMAT} %{UUID:uuid:drop} %{NUMBER:r
 
 # apache & nginx logs, this is also known as the "common log format"
 #   see https://en.wikipedia.org/wiki/Common_Log_Format
-COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:int} (?:%{NUMBER:resp_bytes:int}|-)
+COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-)
 
 # Combined log format is the same as the common log format but with the addition
 # of two quoted strings at the end for "referrer" and "agent"

plugins/inputs/logparser/grok/patterns/influx-patterns

@@ -62,7 +62,7 @@ INFLUXDB_HTTPD_LOG \[httpd\] %{COMBINED_LOG_FORMAT} %{UUID:uuid:drop} %{NUMBER:r
 
 # apache & nginx logs, this is also known as the "common log format"
 #   see https://en.wikipedia.org/wiki/Common_Log_Format
-COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:int} (?:%{NUMBER:resp_bytes:int}|-)
+COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-)
 
 # Combined log format is the same as the common log format but with the addition
 # of two quoted strings at the end for "referrer" and "agent"

plugins/inputs/logparser/logparser.go

@@ -9,6 +9,7 @@ import (
 	"github.com/hpcloud/tail"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal/errchan"
 	"github.com/influxdata/telegraf/internal/globpath"
 	"github.com/influxdata/telegraf/plugins/inputs"
 
@@ -58,6 +59,8 @@ const sampleConfig = `
 	##   %{COMMON_LOG_FORMAT}   (plain apache & nginx access logs)
 	##   %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
 	patterns = ["%{INFLUXDB_HTTPD_LOG}"]
+	## Name of the outputted measurement name.
+	measurement = "influxdb_log"
 	## Full path(s) to custom pattern files.
 	custom_pattern_files = []
 	## Custom patterns can also be defined here. Put one pattern per line.
@@ -108,11 +111,15 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
 	}
 
 	// compile log parser patterns:
+	errChan := errchan.New(len(l.parsers))
 	for _, parser := range l.parsers {
 		if err := parser.Compile(); err != nil {
-			return err
+			errChan.C <- err
 		}
 	}
+	if err := errChan.Error(); err != nil {
+		return err
+	}
 
 	var seek tail.SeekInfo
 	if !l.FromBeginning {
@@ -123,24 +130,25 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
 	l.wg.Add(1)
 	go l.parser()
 
-	var errS string
 	// Create a "tailer" for each file
 	for _, filepath := range l.Files {
 		g, err := globpath.Compile(filepath)
 		if err != nil {
 			log.Printf("ERROR Glob %s failed to compile, %s", filepath, err)
+			continue
 		}
-		for file, _ := range g.Match() {
+		files := g.Match()
+		errChan = errchan.New(len(files))
+		for file, _ := range files {
 			tailer, err := tail.TailFile(file,
 				tail.Config{
 					ReOpen:    true,
 					Follow:    true,
 					Location:  &seek,
+					MustExist: true,
 				})
-			if err != nil {
-				errS += err.Error() + " "
-				continue
-			}
+			errChan.C <- err
 
 			// create a goroutine for each "tailer"
 			l.wg.Add(1)
 			go l.receiver(tailer)
@@ -148,10 +156,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
 		}
 	}
 
-	if errS != "" {
-		return fmt.Errorf(errS)
-	}
-	return nil
+	return errChan.Error()
 }
 
 // receiver is launched as a goroutine to continuously watch a tailed logfile
@@ -199,8 +204,6 @@ func (l *LogParserPlugin) parser() {
 			if m != nil {
 				l.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
 			}
-		} else {
-			log.Printf("Malformed log line in [%s], Error: %s\n", line, err)
 		}
 	}
 }

plugins/inputs/logparser/logparser_test.go

@@ -37,7 +37,7 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
 	}
 
 	acc := testutil.Accumulator{}
-	assert.NoError(t, logparser.Start(&acc))
+	assert.Error(t, logparser.Start(&acc))
 
 	time.Sleep(time.Millisecond * 500)
 	logparser.Stop()
@@ -80,6 +80,8 @@ func TestGrokParseLogFiles(t *testing.T) {
 		map[string]string{})
 }
 
+// Test that test_a.log line gets parsed even though we don't have the correct
+// pattern available for test_b.log
 func TestGrokParseLogFilesOneBad(t *testing.T) {
 	thisdir := getCurrentDir()
 	p := &grok.Parser{
@@ -90,11 +92,12 @@ func TestGrokParseLogFilesOneBad(t *testing.T) {
 
 	logparser := &LogParserPlugin{
 		FromBeginning: true,
-		Files:         []string{thisdir + "grok/testdata/*.log"},
+		Files:         []string{thisdir + "grok/testdata/test_a.log"},
 		GrokParser:    p,
 	}
 
 	acc := testutil.Accumulator{}
+	acc.SetDebug(true)
 	assert.NoError(t, logparser.Start(&acc))
 
 	time.Sleep(time.Millisecond * 500)

plugins/inputs/mysql/mysql.go

@@ -306,6 +306,10 @@ var mappings = []*mapping{
 	{
 		onServer: "Threadpool_",
 		inExport: "threadpool_",
 	},
+	{
+		onServer: "wsrep_",
+		inExport: "wsrep_",
+	},
 }
 
 var (
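Because the on-server and exported prefixes are identical, Galera status variables such as wsrep_cluster_size and wsrep_local_state pass through under their native names.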

plugins/inputs/procstat/procstat.go

@@ -70,7 +70,7 @@ func (p *Procstat) Gather(acc telegraf.Accumulator) error {
 			p.Exe, p.PidFile, p.Pattern, p.User, err.Error())
 	} else {
 		for pid, proc := range p.pidmap {
-			p := NewSpecProcessor(p.ProcessName, p.Prefix, acc, proc, p.tagmap[pid])
+			p := NewSpecProcessor(p.ProcessName, p.Prefix, pid, acc, proc, p.tagmap[pid])
 			p.pushMetrics()
 		}
 	}
@@ -140,7 +140,6 @@ func (p *Procstat) pidsFromFile() ([]int32, error) {
 			out = append(out, int32(pid))
 			p.tagmap[int32(pid)] = map[string]string{
 				"pidfile": p.PidFile,
-				"pid":     strings.TrimSpace(string(pidString)),
 			}
 		}
 	}
@@ -165,7 +164,6 @@ func (p *Procstat) pidsFromExe() ([]int32, error) {
 			out = append(out, int32(ipid))
 			p.tagmap[int32(ipid)] = map[string]string{
 				"exe": p.Exe,
-				"pid": pid,
 			}
 		} else {
 			outerr = err
@@ -193,7 +191,6 @@ func (p *Procstat) pidsFromPattern() ([]int32, error) {
 			out = append(out, int32(ipid))
 			p.tagmap[int32(ipid)] = map[string]string{
 				"pattern": p.Pattern,
-				"pid":     pid,
 			}
 		} else {
 			outerr = err
@@ -221,7 +218,6 @@ func (p *Procstat) pidsFromUser() ([]int32, error) {
 			out = append(out, int32(ipid))
 			p.tagmap[int32(ipid)] = map[string]string{
 				"user": p.User,
-				"pid":  pid,
 			}
 		} else {
 			outerr = err

plugins/inputs/procstat/spec_processor.go

@@ -10,6 +10,7 @@ import (
 
 type SpecProcessor struct {
 	Prefix string
+	pid    int32
 	tags   map[string]string
 	fields map[string]interface{}
 	acc    telegraf.Accumulator
@@ -19,6 +20,7 @@ type SpecProcessor struct {
 func NewSpecProcessor(
 	processName string,
 	prefix string,
+	pid int32,
 	acc telegraf.Accumulator,
 	p *process.Process,
 	tags map[string]string,
@@ -33,6 +35,7 @@ func NewSpecProcessor(
 	}
 	return &SpecProcessor{
 		Prefix: prefix,
+		pid:    pid,
 		tags:   tags,
 		fields: make(map[string]interface{}),
 		acc:    acc,
@@ -45,7 +48,7 @@ func (p *SpecProcessor) pushMetrics() {
 	if p.Prefix != "" {
 		prefix = p.Prefix + "_"
 	}
-	fields := map[string]interface{}{}
+	fields := map[string]interface{}{"pid": p.pid}
 
 	numThreads, err := p.proc.NumThreads()
 	if err == nil {
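With pid demoted from tag to field, a procstat point now looks roughly like procstat,exe=nginx num_threads=4i,pid=1423i (process name and field values hypothetical): one series per process name rather than one per PID, which is what keeps cardinality in check for #1460.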

plugins/inputs/redis/README.md

@@ -43,6 +43,7 @@
     - latest_fork_usec
     - connected_slaves
    - master_repl_offset
+    - master_last_io_seconds_ago
     - repl_backlog_active
     - repl_backlog_size
     - repl_backlog_histlen
@@ -57,6 +58,7 @@
 - All measurements have the following tags:
     - port
     - server
+    - replication role
 
 ### Example Output:

plugins/inputs/redis/redis.go

@@ -25,6 +25,7 @@ var sampleConfig = `
 	## e.g.
 	##  tcp://localhost:6379
 	##  tcp://:password@192.168.99.100
+	##  unix:///var/run/redis.sock
 	##
 	## If no servers are specified, then localhost is used as the host.
 	## If no port is specified, 6379 is used
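Putting both address forms together, a server list in telegraf.conf might look like:

    servers = ["tcp://localhost:6379", "unix:///var/run/redis.sock"]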
@ -66,6 +67,7 @@ var Tracking = map[string]string{
"latest_fork_usec": "latest_fork_usec", "latest_fork_usec": "latest_fork_usec",
"connected_slaves": "connected_slaves", "connected_slaves": "connected_slaves",
"master_repl_offset": "master_repl_offset", "master_repl_offset": "master_repl_offset",
"master_last_io_seconds_ago": "master_last_io_seconds_ago",
"repl_backlog_active": "repl_backlog_active", "repl_backlog_active": "repl_backlog_active",
"repl_backlog_size": "repl_backlog_size", "repl_backlog_size": "repl_backlog_size",
"repl_backlog_histlen": "repl_backlog_histlen", "repl_backlog_histlen": "repl_backlog_histlen",
@ -74,16 +76,19 @@ var Tracking = map[string]string{
"used_cpu_user": "used_cpu_user", "used_cpu_user": "used_cpu_user",
"used_cpu_sys_children": "used_cpu_sys_children", "used_cpu_sys_children": "used_cpu_sys_children",
"used_cpu_user_children": "used_cpu_user_children", "used_cpu_user_children": "used_cpu_user_children",
"role": "role", "role": "replication_role",
} }
var ErrProtocolError = errors.New("redis protocol error") var ErrProtocolError = errors.New("redis protocol error")
const defaultPort = "6379"
// Reads stats from all configured servers accumulates stats. // Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any). // Returns one of the errors encountered while gather stats (if any).
func (r *Redis) Gather(acc telegraf.Accumulator) error { func (r *Redis) Gather(acc telegraf.Accumulator) error {
if len(r.Servers) == 0 { if len(r.Servers) == 0 {
url := &url.URL{ url := &url.URL{
Scheme: "tcp",
Host: ":6379", Host: ":6379",
} }
r.gatherServer(url, acc) r.gatherServer(url, acc)
@ -95,6 +100,10 @@ func (r *Redis) Gather(acc telegraf.Accumulator) error {
var outerr error var outerr error
for _, serv := range r.Servers { for _, serv := range r.Servers {
if !strings.HasPrefix(serv, "tcp://") || !strings.HasPrefix(serv, "unix://") {
serv = "tcp://" + serv
}
u, err := url.Parse(serv) u, err := url.Parse(serv)
if err != nil { if err != nil {
return fmt.Errorf("Unable to parse to address '%s': %s", serv, err) return fmt.Errorf("Unable to parse to address '%s': %s", serv, err)
@ -104,6 +113,13 @@ func (r *Redis) Gather(acc telegraf.Accumulator) error {
u.Host = serv u.Host = serv
u.Path = "" u.Path = ""
} }
if u.Scheme == "tcp" {
_, _, err := net.SplitHostPort(u.Host)
if err != nil {
u.Host = u.Host + ":" + defaultPort
}
}
wg.Add(1) wg.Add(1)
go func(serv string) { go func(serv string) {
defer wg.Done() defer wg.Done()
@ -116,17 +132,17 @@ func (r *Redis) Gather(acc telegraf.Accumulator) error {
return outerr return outerr
} }
const defaultPort = "6379"
func (r *Redis) gatherServer(addr *url.URL, acc telegraf.Accumulator) error { func (r *Redis) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
_, _, err := net.SplitHostPort(addr.Host) var address string
if err != nil {
addr.Host = addr.Host + ":" + defaultPort
}
c, err := net.DialTimeout("tcp", addr.Host, defaultTimeout) if addr.Scheme == "unix" {
address = addr.Path
} else {
address = addr.Host
}
c, err := net.DialTimeout(addr.Scheme, address, defaultTimeout)
if err != nil { if err != nil {
return fmt.Errorf("Unable to connect to redis server '%s': %s", addr.Host, err) return fmt.Errorf("Unable to connect to redis server '%s': %s", address, err)
} }
defer c.Close() defer c.Close()
@ -154,12 +170,17 @@ func (r *Redis) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
c.Write([]byte("EOF\r\n")) c.Write([]byte("EOF\r\n"))
rdr := bufio.NewReader(c) rdr := bufio.NewReader(c)
var tags map[string]string
if addr.Scheme == "unix" {
tags = map[string]string{"socket": addr.Path}
} else {
// Setup tags for all redis metrics // Setup tags for all redis metrics
host, port := "unknown", "unknown" host, port := "unknown", "unknown"
// If there's an error, ignore and use 'unknown' tags // If there's an error, ignore and use 'unknown' tags
host, port, _ = net.SplitHostPort(addr.Host) host, port, _ = net.SplitHostPort(addr.Host)
tags := map[string]string{"server": host, "port": port} tags = map[string]string{"server": host, "port": port}
}
return gatherInfoOutput(rdr, acc, tags) return gatherInfoOutput(rdr, acc, tags)
} }
@ -208,7 +229,7 @@ func gatherInfoOutput(
} }
if name == "role" { if name == "role" {
tags["role"] = val tags["replication_role"] = val
continue continue
} }
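Taken together, the new address handling can be summarized in a standalone sketch (hypothetical helper; the scheme defaulting, unix-socket path, and default port all come from the diff above):
package main
import (
	"fmt"
	"net"
	"net/url"
	"strings"
)
// dialTarget normalizes a configured server string into the network and
// address expected by net.DialTimeout.
func dialTarget(serv string) (string, string, error) {
	if !strings.HasPrefix(serv, "tcp://") && !strings.HasPrefix(serv, "unix://") {
		serv = "tcp://" + serv // a bare host[:port] still defaults to TCP
	}
	u, err := url.Parse(serv)
	if err != nil {
		return "", "", err
	}
	if u.Scheme == "unix" {
		return "unix", u.Path, nil // e.g. /var/run/redis.sock
	}
	if _, _, err := net.SplitHostPort(u.Host); err != nil {
		u.Host = u.Host + ":6379" // apply the default port
	}
	return "tcp", u.Host, nil
}
func main() {
	fmt.Println(dialTarget("unix:///var/run/redis.sock")) // unix /var/run/redis.sock <nil>
	fmt.Println(dialTarget("192.168.99.100"))             // tcp 192.168.99.100:6379 <nil>
}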

View File

@ -35,7 +35,7 @@ func TestRedis_ParseMetrics(t *testing.T) {
err := gatherInfoOutput(rdr, &acc, tags) err := gatherInfoOutput(rdr, &acc, tags)
require.NoError(t, err) require.NoError(t, err)
tags = map[string]string{"host": "redis.net", "role": "master"} tags = map[string]string{"host": "redis.net", "replication_role": "master"}
fields := map[string]interface{}{ fields := map[string]interface{}{
"uptime": uint64(238), "uptime": uint64(238),
"clients": uint64(1), "clients": uint64(1),
@ -71,7 +71,7 @@ func TestRedis_ParseMetrics(t *testing.T) {
"used_cpu_user_children": float64(0.00), "used_cpu_user_children": float64(0.00),
"keyspace_hitrate": float64(0.50), "keyspace_hitrate": float64(0.50),
} }
keyspaceTags := map[string]string{"host": "redis.net", "role": "master", "database": "db0"} keyspaceTags := map[string]string{"host": "redis.net", "replication_role": "master", "database": "db0"}
keyspaceFields := map[string]interface{}{ keyspaceFields := map[string]interface{}{
"avg_ttl": uint64(0), "avg_ttl": uint64(0),
"expires": uint64(0), "expires": uint64(0),

View File

@ -89,6 +89,7 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
ReOpen: true, ReOpen: true,
Follow: true, Follow: true,
Location: &seek, Location: &seek,
MustExist: true,
}) })
if err != nil { if err != nil {
errS += err.Error() + " " errS += err.Error() + " "

View File

@ -31,6 +31,8 @@ type TcpListener struct {
accept chan bool accept chan bool
// drops tracks the number of dropped metrics. // drops tracks the number of dropped metrics.
drops int drops int
// malformed tracks the number of malformed packets
malformed int
// track the listener here so we can close it in Stop() // track the listener here so we can close it in Stop()
listener *net.TCPListener listener *net.TCPListener
@ -45,6 +47,9 @@ var dropwarn = "ERROR: tcp_listener message queue full. " +
"We have dropped %d messages so far. " + "We have dropped %d messages so far. " +
"You may want to increase allowed_pending_messages in the config\n" "You may want to increase allowed_pending_messages in the config\n"
var malformedwarn = "WARNING: tcp_listener has received %d malformed packets" +
" thus far."
const sampleConfig = ` const sampleConfig = `
## Address and port to host TCP listener on ## Address and port to host TCP listener on
service_address = ":8094" service_address = ":8094"
@ -243,8 +248,10 @@ func (t *TcpListener) tcpParser() error {
if err == nil { if err == nil {
t.storeMetrics(metrics) t.storeMetrics(metrics)
} else { } else {
log.Printf("Malformed packet: [%s], Error: %s\n", t.malformed++
string(packet), err) if t.malformed == 1 || t.malformed%1000 == 0 {
log.Printf(malformedwarn, t.malformed)
}
} }
} }
} }
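The new counter implements a simple log-throttling pattern: warn on the first malformed packet, then once per thousand. A standalone sketch of the pattern (names hypothetical):
package main
import "log"
var malformed int
// noteMalformed counts bad packets and logs only the 1st, 1000th, 2000th, ...
// occurrence, so a noisy sender cannot flood the log.
func noteMalformed() {
	malformed++
	if malformed == 1 || malformed%1000 == 0 {
		log.Printf("WARNING: received %d malformed packets thus far.", malformed)
	}
}
func main() {
	for i := 0; i < 2500; i++ {
		noteMalformed()
	}
}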

View File

@ -27,6 +27,8 @@ type UdpListener struct {
done chan struct{} done chan struct{}
// drops tracks the number of dropped metrics. // drops tracks the number of dropped metrics.
drops int drops int
// malformed tracks the number of malformed packets
malformed int
parser parsers.Parser parser parsers.Parser
@ -44,6 +46,9 @@ var dropwarn = "ERROR: udp_listener message queue full. " +
"We have dropped %d messages so far. " + "We have dropped %d messages so far. " +
"You may want to increase allowed_pending_messages in the config\n" "You may want to increase allowed_pending_messages in the config\n"
var malformedwarn = "WARNING: udp_listener has received %d malformed packets" +
" thus far."
const sampleConfig = ` const sampleConfig = `
## Address and port to host UDP listener on ## Address and port to host UDP listener on
service_address = ":8092" service_address = ":8092"
@ -152,7 +157,10 @@ func (u *UdpListener) udpParser() error {
if err == nil { if err == nil {
u.storeMetrics(metrics) u.storeMetrics(metrics)
} else { } else {
log.Printf("Malformed packet: [%s], Error: %s\n", packet, err) u.malformed++
if u.malformed == 1 || u.malformed%1000 == 0 {
log.Printf(malformedwarn, u.malformed)
}
} }
} }
} }

View File

@ -16,6 +16,7 @@ $ sudo service telegraf start
## Available webhooks ## Available webhooks
- [Github](github/) - [Github](github/)
- [Mandrill](mandrill/)
- [Rollbar](rollbar/) - [Rollbar](rollbar/)
## Adding new webhooks plugin ## Adding new webhooks plugin

View File

@ -0,0 +1,15 @@
# mandrill webhook
You should configure your Mandrill webhooks to point at the `webhooks` service. To do this, go to `mandrillapp.com/` and click `Settings > Webhooks`. On the resulting page, click `Add a Webhook`, select all events, set the `URL` to `http://<my_ip>:1619/mandrill`, and click `Create Webhook`.
## Events
See the [webhook doc](https://mandrill.zendesk.com/hc/en-us/articles/205583307-Message-Event-Webhook-format).
All events log the original timestamp, the event name, and the unique identifier of the message that generated the event.
**Tags:**
* `event` = `event.event` string
**Fields:**
* `id` = `event._id` string
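For example, the `send` event from the test fixtures below would be recorded as a point like this (line-protocol sketch; the timestamp is the event's `ts` in seconds converted to nanoseconds):
mandrill_webhooks,event=send id="id1" 1384954004000000000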

View File

@ -0,0 +1,56 @@
package mandrill
import (
"encoding/json"
"io/ioutil"
"log"
"net/http"
"net/url"
"time"
"github.com/gorilla/mux"
"github.com/influxdata/telegraf"
)
type MandrillWebhook struct {
Path string
acc telegraf.Accumulator
}
func (md *MandrillWebhook) Register(router *mux.Router, acc telegraf.Accumulator) {
router.HandleFunc(md.Path, md.returnOK).Methods("HEAD")
router.HandleFunc(md.Path, md.eventHandler).Methods("POST")
log.Printf("Started the webhooks_mandrill on %s\n", md.Path)
md.acc = acc
}
func (md *MandrillWebhook) returnOK(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
func (md *MandrillWebhook) eventHandler(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
body, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
data, err := url.ParseQuery(string(body))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
var events []MandrillEvent
err = json.Unmarshal([]byte(data.Get("mandrill_events")), &events)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
for _, event := range events {
md.acc.AddFields("mandrill_webhooks", event.Fields(), event.Tags(), time.Unix(event.TimeStamp, 0))
}
w.WriteHeader(http.StatusOK)
}
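A request like the one Mandrill sends can be reproduced for local testing (a sketch; the endpoint path is from the README above and the payload values are illustrative):
package main
import (
	"log"
	"net/http"
	"net/url"
)
func main() {
	// Mandrill delivers events as a form-encoded body whose
	// mandrill_events key holds a JSON array.
	resp, err := http.PostForm("http://localhost:1619/mandrill", url.Values{
		"mandrill_events": {`[{"event":"send","_id":"id1","ts":1384954004}]`},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	log.Println(resp.Status) // expect "200 OK"
}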

View File

@ -0,0 +1,24 @@
package mandrill
type Event interface {
Tags() map[string]string
Fields() map[string]interface{}
}
type MandrillEvent struct {
EventName string `json:"event"`
TimeStamp int64 `json:"ts"`
Id string `json:"_id"`
}
func (me *MandrillEvent) Tags() map[string]string {
return map[string]string{
"event": me.EventName,
}
}
func (me *MandrillEvent) Fields() map[string]interface{} {
return map[string]interface{}{
"id": me.Id,
}
}

View File

@ -0,0 +1,58 @@
package mandrill
func SendEventJSON() string {
return `
{
"event": "send",
"msg": {
"ts": 1365109999,
"subject": "This an example webhook message",
"email": "example.webhook@mandrillapp.com",
"sender": "example.sender@mandrillapp.com",
"tags": [
"webhook-example"
],
"opens": [
],
"clicks": [
],
"state": "sent",
"metadata": {
"user_id": 111
},
"_id": "exampleaaaaaaaaaaaaaaaaaaaaaaaaa",
"_version": "exampleaaaaaaaaaaaaaaa"
},
"_id": "id1",
"ts": 1384954004
}`
}
func HardBounceEventJSON() string {
return `
{
"event": "hard_bounce",
"msg": {
"ts": 1365109999,
"subject": "This an example webhook message",
"email": "example.webhook@mandrillapp.com",
"sender": "example.sender@mandrillapp.com",
"tags": [
"webhook-example"
],
"state": "bounced",
"metadata": {
"user_id": 111
},
"_id": "exampleaaaaaaaaaaaaaaaaaaaaaaaaa2",
"_version": "exampleaaaaaaaaaaaaaaa",
"bounce_description": "bad_mailbox",
"bgtools_code": 10,
"diag": "smtp;550 5.1.1 The email account that you tried to reach does not exist. Please try double-checking the recipient's email address for typos or unnecessary spaces."
},
"_id": "id2",
"ts": 1384954004
}`
}

View File

@ -0,0 +1,85 @@
package mandrill
import (
"github.com/influxdata/telegraf/testutil"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
)
func postWebhooks(md *MandrillWebhook, eventBody string) *httptest.ResponseRecorder {
body := url.Values{}
body.Set("mandrill_events", eventBody)
req, _ := http.NewRequest("POST", "/mandrill", strings.NewReader(body.Encode()))
w := httptest.NewRecorder()
md.eventHandler(w, req)
return w
}
func headRequest(md *MandrillWebhook) *httptest.ResponseRecorder {
req, _ := http.NewRequest("HEAD", "/mandrill", strings.NewReader(""))
w := httptest.NewRecorder()
md.returnOK(w, req)
return w
}
func TestHead(t *testing.T) {
md := &MandrillWebhook{Path: "/mandrill"}
resp := headRequest(md)
if resp.Code != http.StatusOK {
t.Errorf("HEAD returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK)
}
}
func TestSendEvent(t *testing.T) {
var acc testutil.Accumulator
md := &MandrillWebhook{Path: "/mandrill", acc: &acc}
resp := postWebhooks(md, "["+SendEventJSON()+"]")
if resp.Code != http.StatusOK {
t.Errorf("POST send returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK)
}
fields := map[string]interface{}{
"id": "id1",
}
tags := map[string]string{
"event": "send",
}
acc.AssertContainsTaggedFields(t, "mandrill_webhooks", fields, tags)
}
func TestMultipleEvents(t *testing.T) {
var acc testutil.Accumulator
md := &MandrillWebhook{Path: "/mandrill", acc: &acc}
resp := postWebhooks(md, "["+SendEventJSON()+","+HardBounceEventJSON()+"]")
if resp.Code != http.StatusOK {
t.Errorf("POST send returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK)
}
fields := map[string]interface{}{
"id": "id1",
}
tags := map[string]string{
"event": "send",
}
acc.AssertContainsTaggedFields(t, "mandrill_webhooks", fields, tags)
fields = map[string]interface{}{
"id": "id2",
}
tags = map[string]string{
"event": "hard_bounce",
}
acc.AssertContainsTaggedFields(t, "mandrill_webhooks", fields, tags)
}

View File

@ -11,6 +11,7 @@ import (
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/webhooks/github" "github.com/influxdata/telegraf/plugins/inputs/webhooks/github"
"github.com/influxdata/telegraf/plugins/inputs/webhooks/mandrill"
"github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar" "github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar"
) )
@ -26,6 +27,7 @@ type Webhooks struct {
ServiceAddress string ServiceAddress string
Github *github.GithubWebhook Github *github.GithubWebhook
Mandrill *mandrill.MandrillWebhook
Rollbar *rollbar.RollbarWebhook Rollbar *rollbar.RollbarWebhook
} }
@ -41,6 +43,9 @@ func (wb *Webhooks) SampleConfig() string {
[inputs.webhooks.github] [inputs.webhooks.github]
path = "/github" path = "/github"
[inputs.webhooks.mandrill]
path = "/mandrill"
[inputs.webhooks.rollbar] [inputs.webhooks.rollbar]
path = "/rollbar" path = "/rollbar"
` `

View File

@ -32,7 +32,7 @@ echo mntr | nc localhost 2181
Meta: Meta:
- units: int64 - units: int64
- tags: `server=<hostname> port=<port>` - tags: `server=<hostname> port=<port> state=<leader|follower>`
Measurement names: Measurement names:
- zookeeper_avg_latency - zookeeper_avg_latency
@ -55,8 +55,12 @@ Measurement names:
Meta: Meta:
- units: string - units: string
- tags: `server=<hostname> port=<port>` - tags: `server=<hostname> port=<port> state=<leader|follower>`
Measurement names: Measurement names:
- zookeeper_version - zookeeper_version
- zookeeper_server_state
### Tags:
- All measurements have the following tags:
- server
- port
- state
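With the new `state` tag, a point gathered from a leader would carry tags such as `server=localhost port=2181 state=leader` (values illustrative).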

View File

@ -55,6 +55,7 @@ func (z *Zookeeper) Gather(acc telegraf.Accumulator) error {
} }
func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error { func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error {
var zookeeper_state string
_, _, err := net.SplitHostPort(address) _, _, err := net.SplitHostPort(address)
if err != nil { if err != nil {
address = address + ":2181" address = address + ":2181"
@ -78,7 +79,6 @@ func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error
if len(service) != 2 { if len(service) != 2 {
return fmt.Errorf("Invalid service address: %s", address) return fmt.Errorf("Invalid service address: %s", address)
} }
tags := map[string]string{"server": service[0], "port": service[1]}
fields := make(map[string]interface{}) fields := make(map[string]interface{})
for scanner.Scan() { for scanner.Scan() {
@ -92,6 +92,9 @@ func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error
} }
measurement := strings.TrimPrefix(parts[1], "zk_") measurement := strings.TrimPrefix(parts[1], "zk_")
if measurement == "server_state" {
zookeeper_state = parts[2]
} else {
sValue := string(parts[2]) sValue := string(parts[2])
iVal, err := strconv.ParseInt(sValue, 10, 64) iVal, err := strconv.ParseInt(sValue, 10, 64)
@ -101,6 +104,12 @@ func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error
fields[measurement] = sValue fields[measurement] = sValue
} }
} }
}
tags := map[string]string{
"server": service[0],
"port": service[1],
"state": zookeeper_state,
}
acc.AddFields("zookeeper", fields, tags) acc.AddFields("zookeeper", fields, tags)
return nil return nil

View File

@ -9,6 +9,8 @@ via raw TCP.
# Configuration for Graphite server to send metrics to # Configuration for Graphite server to send metrics to
[[outputs.graphite]] [[outputs.graphite]]
## TCP endpoint for your graphite instance. ## TCP endpoint for your graphite instance.
## If multiple endpoints are configured, the output will be load balanced.
## Only one of the endpoints will be written to with each iteration.
servers = ["localhost:2003"] servers = ["localhost:2003"]
## Prefix metrics name ## Prefix metrics name
prefix = "" prefix = ""
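A minimal sketch of that load-balancing behaviour (simplified from the Write implementation in graphite.go below; names hypothetical):
package main
import (
	"log"
	"math/rand"
	"net"
)
// writeToAny tries the configured endpoints in random order and stops at
// the first successful write, so each batch lands on exactly one server.
func writeToAny(conns []net.Conn, payload []byte) bool {
	for _, n := range rand.Perm(len(conns)) {
		if _, err := conns[n].Write(payload); err != nil {
			log.Println("ERROR: " + err.Error()) // try the next endpoint
			continue
		}
		return true
	}
	return false
}
func main() {
	var conns []net.Conn // populated from the configured servers
	_ = writeToAny(conns, []byte("prefix.host.cpu.usage_idle 42 1468924800\n"))
}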

View File

@ -2,7 +2,6 @@ package graphite
import ( import (
"errors" "errors"
"fmt"
"log" "log"
"math/rand" "math/rand"
"net" "net"
@ -25,6 +24,8 @@ type Graphite struct {
var sampleConfig = ` var sampleConfig = `
## TCP endpoint for your graphite instance. ## TCP endpoint for your graphite instance.
## If multiple endpoints are configured, output will be load balanced.
## Only one of the endpoints will be written to with each iteration.
servers = ["localhost:2003"] servers = ["localhost:2003"]
## Prefix metrics name ## Prefix metrics name
prefix = "" prefix = ""
@ -96,9 +97,12 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
// Send data to a random server // Send data to a random server
p := rand.Perm(len(g.conns)) p := rand.Perm(len(g.conns))
for _, n := range p { for _, n := range p {
if _, e := fmt.Fprint(g.conns[n], graphitePoints); e != nil { if g.Timeout > 0 {
g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second))
}
if _, e := g.conns[n].Write([]byte(graphitePoints)); e != nil {
// Error // Error
log.Println("ERROR: " + err.Error()) log.Println("ERROR: " + e.Error())
// Let's try the next one // Let's try the next one
} else { } else {
// Success // Success

View File

@ -29,7 +29,9 @@ type Instrumental struct {
const ( const (
DefaultHost = "collector.instrumentalapp.com" DefaultHost = "collector.instrumentalapp.com"
AuthFormat = "hello version go/telegraf/1.0\nauthenticate %s\n" HelloMessage = "hello version go/telegraf/1.1\n"
AuthFormat = "authenticate %s\n"
HandshakeFormat = HelloMessage + AuthFormat
) )
var ( var (
@ -52,6 +54,7 @@ var sampleConfig = `
func (i *Instrumental) Connect() error { func (i *Instrumental) Connect() error {
connection, err := net.DialTimeout("tcp", i.Host+":8000", i.Timeout.Duration) connection, err := net.DialTimeout("tcp", i.Host+":8000", i.Timeout.Duration)
if err != nil { if err != nil {
i.conn = nil i.conn = nil
return err return err
@ -151,6 +154,11 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
return err return err
} }
// force the connection closed after sending data
// to deal with various disconnection scenarios and eschew holding
// open idle connections en masse
i.Close()
return nil return nil
} }
@ -163,7 +171,7 @@ func (i *Instrumental) SampleConfig() string {
} }
func (i *Instrumental) authenticate(conn net.Conn) error { func (i *Instrumental) authenticate(conn net.Conn) error {
_, err := fmt.Fprintf(conn, AuthFormat, i.ApiToken) _, err := fmt.Fprintf(conn, HandshakeFormat, i.ApiToken)
if err != nil { if err != nil {
return err return err
} }

View File

@ -24,7 +24,6 @@ func TestWrite(t *testing.T) {
ApiToken: "abc123token", ApiToken: "abc123token",
Prefix: "my.prefix", Prefix: "my.prefix",
} }
i.Connect()
// Default to gauge // Default to gauge
m1, _ := telegraf.NewMetric( m1, _ := telegraf.NewMetric(
@ -40,10 +39,8 @@ func TestWrite(t *testing.T) {
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
) )
// Simulate a connection close and reconnect.
metrics := []telegraf.Metric{m1, m2} metrics := []telegraf.Metric{m1, m2}
i.Write(metrics) i.Write(metrics)
i.Close()
// Counter and Histogram are increments // Counter and Histogram are increments
m3, _ := telegraf.NewMetric( m3, _ := telegraf.NewMetric(
@ -70,7 +67,6 @@ func TestWrite(t *testing.T) {
i.Write(metrics) i.Write(metrics)
wg.Wait() wg.Wait()
i.Close()
} }
func TCPServer(t *testing.T, wg *sync.WaitGroup) { func TCPServer(t *testing.T, wg *sync.WaitGroup) {
@ -82,10 +78,9 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
tp := textproto.NewReader(reader) tp := textproto.NewReader(reader)
hello, _ := tp.ReadLine() hello, _ := tp.ReadLine()
assert.Equal(t, "hello version go/telegraf/1.0", hello) assert.Equal(t, "hello version go/telegraf/1.1", hello)
auth, _ := tp.ReadLine() auth, _ := tp.ReadLine()
assert.Equal(t, "authenticate abc123token", auth) assert.Equal(t, "authenticate abc123token", auth)
conn.Write([]byte("ok\nok\n")) conn.Write([]byte("ok\nok\n"))
data1, _ := tp.ReadLine() data1, _ := tp.ReadLine()
@ -99,10 +94,9 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
tp = textproto.NewReader(reader) tp = textproto.NewReader(reader)
hello, _ = tp.ReadLine() hello, _ = tp.ReadLine()
assert.Equal(t, "hello version go/telegraf/1.0", hello) assert.Equal(t, "hello version go/telegraf/1.1", hello)
auth, _ = tp.ReadLine() auth, _ = tp.ReadLine()
assert.Equal(t, "authenticate abc123token", auth) assert.Equal(t, "authenticate abc123token", auth)
conn.Write([]byte("ok\nok\n")) conn.Write([]byte("ok\nok\n"))
data3, _ := tp.ReadLine() data3, _ := tp.ReadLine()

View File

@ -153,8 +153,7 @@ func (l *Librato) Description() string {
func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) { func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
gauges := []*Gauge{} gauges := []*Gauge{}
serializer := graphite.GraphiteSerializer{Template: l.Template} bucket := graphite.SerializeBucketName(m.Name(), m.Tags(), l.Template, "")
bucket := serializer.SerializeBucketName(m.Name(), m.Tags())
for fieldName, value := range m.Fields() { for fieldName, value := range m.Fields() {
gauge := &Gauge{ gauge := &Gauge{
Name: graphite.InsertField(bucket, fieldName), Name: graphite.InsertField(bucket, fieldName),

View File

@ -5,7 +5,7 @@ import (
"log" "log"
"net/http" "net/http"
"regexp" "regexp"
"strings" "sync"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
@ -13,7 +13,7 @@ import (
) )
var ( var (
sanitizedChars = strings.NewReplacer("/", "_", "@", "_", " ", "_", "-", "_", ".", "_") invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
// Prometheus metric names must match this regex // Prometheus metric names must match this regex
// see https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels // see https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
@ -26,6 +26,10 @@ var (
type PrometheusClient struct { type PrometheusClient struct {
Listen string Listen string
metrics map[string]prometheus.Metric
sync.Mutex
} }
var sampleConfig = ` var sampleConfig = `
@ -34,6 +38,7 @@ var sampleConfig = `
` `
func (p *PrometheusClient) Start() error { func (p *PrometheusClient) Start() error {
prometheus.MustRegister(p)
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
// recovering from panic here because there is no way to stop a // recovering from panic here because there is no way to stop a
@ -78,19 +83,39 @@ func (p *PrometheusClient) Description() string {
return "Configuration for the Prometheus client to spawn" return "Configuration for the Prometheus client to spawn"
} }
// Implements prometheus.Collector
func (p *PrometheusClient) Describe(ch chan<- *prometheus.Desc) {
prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch)
}
// Implements prometheus.Collector
func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
p.Lock()
defer p.Unlock()
for _, m := range p.metrics {
ch <- m
}
}
func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
p.Lock()
defer p.Unlock()
p.metrics = make(map[string]prometheus.Metric)
if len(metrics) == 0 { if len(metrics) == 0 {
return nil return nil
} }
for _, point := range metrics { for _, point := range metrics {
key := point.Name() key := point.Name()
key = sanitizedChars.Replace(key) key = invalidNameCharRE.ReplaceAllString(key, "_")
var labels []string var labels []string
l := prometheus.Labels{} l := prometheus.Labels{}
for k, v := range point.Tags() { for k, v := range point.Tags() {
k = sanitizedChars.Replace(k) k = invalidNameCharRE.ReplaceAllString(k, "_")
if len(k) == 0 { if len(k) == 0 {
continue continue
} }
@ -111,7 +136,7 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
} }
// sanitize the measurement name // sanitize the measurement name
n = sanitizedChars.Replace(n) n = invalidNameCharRE.ReplaceAllString(n, "_")
var mname string var mname string
if n == "value" { if n == "value" {
mname = key mname = key
@ -124,45 +149,23 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
continue continue
} }
mVec := prometheus.NewUntypedVec( desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l)
prometheus.UntypedOpts{ var metric prometheus.Metric
Name: mname, var err error
Help: "Telegraf collected metric",
},
labels,
)
collector, err := prometheus.RegisterOrGet(mVec)
if err != nil {
log.Printf("prometheus_client: Metric failed to register with prometheus, %s", err)
continue
}
mVec, ok := collector.(*prometheus.UntypedVec)
if !ok {
continue
}
switch val := val.(type) { switch val := val.(type) {
case int64: case int64:
m, err := mVec.GetMetricWith(l) metric, err = prometheus.NewConstMetric(desc, prometheus.UntypedValue, float64(val))
if err != nil {
log.Printf("ERROR Getting metric in Prometheus output, "+
"key: %s, labels: %v,\nerr: %s\n",
mname, l, err.Error())
continue
}
m.Set(float64(val))
case float64: case float64:
m, err := mVec.GetMetricWith(l) metric, err = prometheus.NewConstMetric(desc, prometheus.UntypedValue, val)
if err != nil {
log.Printf("ERROR Getting metric in Prometheus output, "+
"key: %s, labels: %v,\nerr: %s\n",
mname, l, err.Error())
continue
}
m.Set(val)
default: default:
continue continue
} }
if err != nil {
log.Printf("ERROR creating prometheus metric, "+
"key: %s, labels: %v,\nerr: %s\n",
mname, l, err.Error())
}
p.metrics[desc.String()] = metric
} }
} }
return nil return nil

View File

@ -10,22 +10,23 @@ import (
const DEFAULT_TEMPLATE = "host.tags.measurement.field" const DEFAULT_TEMPLATE = "host.tags.measurement.field"
var fieldDeleter = strings.NewReplacer(".FIELDNAME", "", "FIELDNAME.", "") var (
fieldDeleter = strings.NewReplacer(".FIELDNAME", "", "FIELDNAME.", "")
sanitizedChars = strings.NewReplacer("/", "-", "@", "-", "*", "-", " ", "_", "..", ".")
)
type GraphiteSerializer struct { type GraphiteSerializer struct {
Prefix string Prefix string
Template string Template string
} }
var sanitizedChars = strings.NewReplacer("/", "-", "@", "-", "*", "-", " ", "_", "..", ".")
func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) { func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) {
out := []string{} out := []string{}
// Convert UnixNano to Unix timestamps // Convert UnixNano to Unix timestamps
timestamp := metric.UnixNano() / 1000000000 timestamp := metric.UnixNano() / 1000000000
bucket := s.SerializeBucketName(metric.Name(), metric.Tags()) bucket := SerializeBucketName(metric.Name(), metric.Tags(), s.Template, s.Prefix)
if bucket == "" { if bucket == "" {
return out, nil return out, nil
} }
@ -51,12 +52,14 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error)
// FIELDNAME. It is up to the user to replace this. This is so that // FIELDNAME. It is up to the user to replace this. This is so that
// SerializeBucketName can be called just once per measurement, rather than // SerializeBucketName can be called just once per measurement, rather than
// once per field. See GraphiteSerializer.InsertField() function. // once per field. See GraphiteSerializer.InsertField() function.
func (s *GraphiteSerializer) SerializeBucketName( func SerializeBucketName(
measurement string, measurement string,
tags map[string]string, tags map[string]string,
template string,
prefix string,
) string { ) string {
if s.Template == "" { if template == "" {
s.Template = DEFAULT_TEMPLATE template = DEFAULT_TEMPLATE
} }
tagsCopy := make(map[string]string) tagsCopy := make(map[string]string)
for k, v := range tags { for k, v := range tags {
@ -64,7 +67,7 @@ func (s *GraphiteSerializer) SerializeBucketName(
} }
var out []string var out []string
templateParts := strings.Split(s.Template, ".") templateParts := strings.Split(template, ".")
for _, templatePart := range templateParts { for _, templatePart := range templateParts {
switch templatePart { switch templatePart {
case "measurement": case "measurement":
@ -96,10 +99,10 @@ func (s *GraphiteSerializer) SerializeBucketName(
return "" return ""
} }
if s.Prefix == "" { if prefix == "" {
return sanitizedChars.Replace(strings.Join(out, ".")) return sanitizedChars.Replace(strings.Join(out, "."))
} }
return sanitizedChars.Replace(s.Prefix + "." + strings.Join(out, ".")) return sanitizedChars.Replace(prefix + "." + strings.Join(out, "."))
} }
// InsertField takes the bucket string from SerializeBucketName and replaces the // InsertField takes the bucket string from SerializeBucketName and replaces the
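Callers that previously went through a GraphiteSerializer value can now call the package-level function directly, as the librato change above does. A usage sketch (tag values illustrative; expected results follow from the default template "host.tags.measurement.field"):
package main
import (
	"fmt"

	"github.com/influxdata/telegraf/plugins/serializers/graphite"
)
func main() {
	// An empty template falls back to the default; the prefix is prepended.
	bucket := graphite.SerializeBucketName("cpu", map[string]string{"host": "localhost"}, "", "prefix")
	fmt.Println(bucket)                                     // prefix.localhost.cpu.FIELDNAME
	fmt.Println(graphite.InsertField(bucket, "usage_idle")) // prefix.localhost.cpu.usage_idle
}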

View File

@ -225,8 +225,7 @@ func TestSerializeBucketNameNoHost(t *testing.T) {
m, err := telegraf.NewMetric("cpu", tags, fields, now) m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err) assert.NoError(t, err)
s := GraphiteSerializer{} mS := SerializeBucketName(m.Name(), m.Tags(), "", "")
mS := s.SerializeBucketName(m.Name(), m.Tags())
expS := "cpu0.us-west-2.cpu.FIELDNAME" expS := "cpu0.us-west-2.cpu.FIELDNAME"
assert.Equal(t, expS, mS) assert.Equal(t, expS, mS)
@ -240,8 +239,7 @@ func TestSerializeBucketNameHost(t *testing.T) {
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err) assert.NoError(t, err)
s := GraphiteSerializer{} mS := SerializeBucketName(m.Name(), m.Tags(), "", "")
mS := s.SerializeBucketName(m.Name(), m.Tags())
expS := "localhost.cpu0.us-west-2.cpu.FIELDNAME" expS := "localhost.cpu0.us-west-2.cpu.FIELDNAME"
assert.Equal(t, expS, mS) assert.Equal(t, expS, mS)
@ -255,8 +253,7 @@ func TestSerializeBucketNamePrefix(t *testing.T) {
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err) assert.NoError(t, err)
s := GraphiteSerializer{Prefix: "prefix"} mS := SerializeBucketName(m.Name(), m.Tags(), "", "prefix")
mS := s.SerializeBucketName(m.Name(), m.Tags())
expS := "prefix.localhost.cpu0.us-west-2.cpu.FIELDNAME" expS := "prefix.localhost.cpu0.us-west-2.cpu.FIELDNAME"
assert.Equal(t, expS, mS) assert.Equal(t, expS, mS)
@ -270,8 +267,7 @@ func TestTemplate1(t *testing.T) {
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err) assert.NoError(t, err)
s := GraphiteSerializer{Template: template1} mS := SerializeBucketName(m.Name(), m.Tags(), template1, "")
mS := s.SerializeBucketName(m.Name(), m.Tags())
expS := "cpu0.us-west-2.localhost.cpu.FIELDNAME" expS := "cpu0.us-west-2.localhost.cpu.FIELDNAME"
assert.Equal(t, expS, mS) assert.Equal(t, expS, mS)
@ -285,8 +281,7 @@ func TestTemplate2(t *testing.T) {
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err) assert.NoError(t, err)
s := GraphiteSerializer{Template: template2} mS := SerializeBucketName(m.Name(), m.Tags(), template2, "")
mS := s.SerializeBucketName(m.Name(), m.Tags())
expS := "localhost.cpu.FIELDNAME" expS := "localhost.cpu.FIELDNAME"
assert.Equal(t, expS, mS) assert.Equal(t, expS, mS)
@ -300,8 +295,7 @@ func TestTemplate3(t *testing.T) {
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err) assert.NoError(t, err)
s := GraphiteSerializer{Template: template3} mS := SerializeBucketName(m.Name(), m.Tags(), template3, "")
mS := s.SerializeBucketName(m.Name(), m.Tags())
expS := "localhost.cpu0.us-west-2.FIELDNAME" expS := "localhost.cpu0.us-west-2.FIELDNAME"
assert.Equal(t, expS, mS) assert.Equal(t, expS, mS)
@ -315,8 +309,7 @@ func TestTemplate4(t *testing.T) {
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err) assert.NoError(t, err)
s := GraphiteSerializer{Template: template4} mS := SerializeBucketName(m.Name(), m.Tags(), template4, "")
mS := s.SerializeBucketName(m.Name(), m.Tags())
expS := "localhost.cpu0.us-west-2.cpu" expS := "localhost.cpu0.us-west-2.cpu"
assert.Equal(t, expS, mS) assert.Equal(t, expS, mS)
@ -330,8 +323,7 @@ func TestTemplate5(t *testing.T) {
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err) assert.NoError(t, err)
s := GraphiteSerializer{Template: template5} mS := SerializeBucketName(m.Name(), m.Tags(), template5, "")
mS := s.SerializeBucketName(m.Name(), m.Tags())
expS := "localhost.us-west-2.cpu0.cpu.FIELDNAME" expS := "localhost.us-west-2.cpu0.cpu.FIELDNAME"
assert.Equal(t, expS, mS) assert.Equal(t, expS, mS)
@ -345,8 +337,7 @@ func TestTemplate6(t *testing.T) {
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now) m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err) assert.NoError(t, err)
s := GraphiteSerializer{Template: template6} mS := SerializeBucketName(m.Name(), m.Tags(), template6, "")
mS := s.SerializeBucketName(m.Name(), m.Tags())
expS := "localhost.cpu0.us-west-2.cpu.FIELDNAME" expS := "localhost.cpu0.us-west-2.cpu.FIELDNAME"
assert.Equal(t, expS, mS) assert.Equal(t, expS, mS)

View File

@ -69,6 +69,8 @@ exit_if_fail telegraf -config $tmpdir/config.toml \
-test -input-filter cpu:mem -test -input-filter cpu:mem
cat $GOPATH/bin/telegraf | gzip > $CIRCLE_ARTIFACTS/telegraf.gz cat $GOPATH/bin/telegraf | gzip > $CIRCLE_ARTIFACTS/telegraf.gz
go build -o telegraf-race -race -ldflags "-X main.version=${VERSION}-RACE" cmd/telegraf/telegraf.go
cat telegraf-race | gzip > $CIRCLE_ARTIFACTS/telegraf-race.gz
eval "git describe --exact-match HEAD" eval "git describe --exact-match HEAD"
if [ $? -eq 0 ]; then if [ $? -eq 0 ]; then

View File

@ -37,6 +37,10 @@ chmod 755 $LOG_DIR
if [[ -L /etc/init.d/telegraf ]]; then if [[ -L /etc/init.d/telegraf ]]; then
rm -f /etc/init.d/telegraf rm -f /etc/init.d/telegraf
fi fi
# Remove legacy symlink, if it exists
if [[ -L /etc/systemd/system/telegraf.service ]]; then
rm -f /etc/systemd/system/telegraf.service
fi
# Add defaults file, if it doesn't exist # Add defaults file, if it doesn't exist
if [[ ! -f /etc/default/telegraf ]]; then if [[ ! -f /etc/default/telegraf ]]; then

View File

@ -15,4 +15,3 @@ KillMode=control-group
[Install] [Install]
WantedBy=multi-user.target WantedBy=multi-user.target
Alias=telegraf.service