Update with new code from master

This commit is contained in:
Dennis Bellinger 2016-07-18 16:49:10 -04:00
commit 35f5e62a09
25 changed files with 564 additions and 450 deletions

View File

@ -1,7 +1,15 @@
## v1.0 [unreleased] ## v1.0 [unreleased]
## v1.0 beta 3 [2016-07-18]
### Release Notes ### Release Notes
**Breaking Change**: Aerospike main server node measurements have been renamed
aerospike_node. Aerospike namespace measurements have been renamed to
aerospike_namespace. They will also now be tagged with the node_name
that they correspond to. This has been done to differentiate measurements
that pertain to node vs. namespace statistics.
**Breaking Change**: users of github_webhooks must change to the new **Breaking Change**: users of github_webhooks must change to the new
`[[inputs.webhooks]]` plugin. `[[inputs.webhooks]]` plugin.
@ -30,11 +38,16 @@ should now look like:
- [#1289](https://github.com/influxdata/telegraf/pull/1289): webhooks input plugin. Thanks @francois2metz and @cduez! - [#1289](https://github.com/influxdata/telegraf/pull/1289): webhooks input plugin. Thanks @francois2metz and @cduez!
- [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar webhook plugin. - [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar webhook plugin.
- [#1408](https://github.com/influxdata/telegraf/pull/1408): mandrill webhook plugin.
- [#1402](https://github.com/influxdata/telegraf/pull/1402): docker-machine/boot2docker no longer required for unit tests. - [#1402](https://github.com/influxdata/telegraf/pull/1402): docker-machine/boot2docker no longer required for unit tests.
- [#1350](https://github.com/influxdata/telegraf/pull/1350): cgroup input plugin. - [#1350](https://github.com/influxdata/telegraf/pull/1350): cgroup input plugin.
- [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD. - [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD.
- [#1369](https://github.com/influxdata/telegraf/pull/1480): add ability to read redis from a socket.
- [#1387](https://github.com/influxdata/telegraf/pull/1387): **Breaking Change** - Redis `role` tag renamed to `replication_role` to avoid global_tags override - [#1387](https://github.com/influxdata/telegraf/pull/1387): **Breaking Change** - Redis `role` tag renamed to `replication_role` to avoid global_tags override
- [#1437](https://github.com/influxdata/telegraf/pull/1437): Fetching Galera status metrics in MySQL - [#1437](https://github.com/influxdata/telegraf/pull/1437): Fetching Galera status metrics in MySQL
- [#1500](https://github.com/influxdata/telegraf/pull/1500): Aerospike plugin refactored to use official client lib.
- [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin.
- [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag.
### Bugfixes ### Bugfixes
@ -50,6 +63,10 @@ should now look like:
- [#1432](https://github.com/influxdata/telegraf/issues/1432): Panic fix for multiple graphite outputs under very high load. - [#1432](https://github.com/influxdata/telegraf/issues/1432): Panic fix for multiple graphite outputs under very high load.
- [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior - [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior
- [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues. - [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues.
- [#1427](https://github.com/influxdata/telegraf/issues/1427): Cassandra input: version 2.x "column family" fix.
- [#1463](https://github.com/influxdata/telegraf/issues/1463): Shared WaitGroup in Exec plugin
- [#1436](https://github.com/influxdata/telegraf/issues/1436): logparser: honor modifiers in "pattern" config.
- [#1418](https://github.com/influxdata/telegraf/issues/1418): logparser: error and exit on file permissions/missing errors.
## v1.0 beta 2 [2016-06-21] ## v1.0 beta 2 [2016-06-21]

3
Godeps
View File

@ -1,5 +1,6 @@
github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9 github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9
github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc
github.com/aerospike/aerospike-client-go 45863b7fd8640dc12f7fdd397104d97e1986f25a
github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687 github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687
github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857 github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857
github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4 github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4
@ -45,11 +46,13 @@ github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8
github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f
github.com/shirou/gopsutil 586bb697f3ec9f8ec08ffefe18f521a64534037c github.com/shirou/gopsutil 586bb697f3ec9f8ec08ffefe18f521a64534037c
github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d
github.com/sparrc/aerospike-client-go d4bb42d2c2d39dae68e054116f4538af189e05d5
github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744 github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744
github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c
github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2 github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2
github.com/wvanbergen/kafka 46f9a1cf3f670edec492029fadded9c2d9e18866 github.com/wvanbergen/kafka 46f9a1cf3f670edec492029fadded9c2d9e18866
github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8 github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8
github.com/yuin/gopher-lua bf3808abd44b1e55143a2d7f08571aaa80db1808
github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363 github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363
golang.org/x/crypto 5dc8cb4b8a8eb076cbb5a06bc3b8682c15bdbbd3 golang.org/x/crypto 5dc8cb4b8a8eb076cbb5a06bc3b8682c15bdbbd3
golang.org/x/net 6acef71eb69611914f7a30939ea9f6e194c78172 golang.org/x/net 6acef71eb69611914f7a30939ea9f6e194c78172

View File

@ -20,12 +20,12 @@ new plugins.
### Linux deb and rpm Packages: ### Linux deb and rpm Packages:
Latest: Latest:
* https://dl.influxdata.com/telegraf/releases/telegraf_1.0.0-beta2_amd64.deb * https://dl.influxdata.com/telegraf/releases/telegraf_1.0.0-beta3_amd64.deb
* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0_beta2.x86_64.rpm * https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0_beta3.x86_64.rpm
Latest (arm): Latest (arm):
* https://dl.influxdata.com/telegraf/releases/telegraf_1.0.0-beta2_armhf.deb * https://dl.influxdata.com/telegraf/releases/telegraf_1.0.0-beta3_armhf.deb
* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0_beta2.armhf.rpm * https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0_beta3.armhf.rpm
##### Package Instructions: ##### Package Instructions:
@ -46,14 +46,14 @@ to use this repo to install & update telegraf.
### Linux tarballs: ### Linux tarballs:
Latest: Latest:
* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta2_linux_amd64.tar.gz * https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta3_linux_amd64.tar.gz
* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta2_linux_i386.tar.gz * https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta3_linux_i386.tar.gz
* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta2_linux_armhf.tar.gz * https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta3_linux_armhf.tar.gz
### FreeBSD tarball: ### FreeBSD tarball:
Latest: Latest:
* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta2_freebsd_amd64.tar.gz * https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta3_freebsd_amd64.tar.gz
### Ansible Role: ### Ansible Role:
@ -69,7 +69,7 @@ brew install telegraf
### Windows Binaries (EXPERIMENTAL) ### Windows Binaries (EXPERIMENTAL)
Latest: Latest:
* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta2_windows_amd64.zip * https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta3_windows_amd64.zip
### From Source: ### From Source:
@ -219,6 +219,7 @@ Telegraf can also collect metrics via the following service plugins:
* [nats_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nats_consumer) * [nats_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nats_consumer)
* [webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks) * [webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks)
* [github](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/github) * [github](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/github)
* [mandrill](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/mandrill)
* [rollbar](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/rollbar) * [rollbar](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/rollbar)
* [nsq_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nsq_consumer) * [nsq_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nsq_consumer)

View File

@ -1,104 +1,19 @@
package aerospike package aerospike
import ( import (
"bytes"
"encoding/binary"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"net" "net"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
as "github.com/sparrc/aerospike-client-go"
) )
const (
MSG_HEADER_SIZE = 8
MSG_TYPE = 1 // Info is 1
MSG_VERSION = 2
)
var (
STATISTICS_COMMAND = []byte("statistics\n")
NAMESPACES_COMMAND = []byte("namespaces\n")
)
type aerospikeMessageHeader struct {
Version uint8
Type uint8
DataLen [6]byte
}
type aerospikeMessage struct {
aerospikeMessageHeader
Data []byte
}
// Taken from aerospike-client-go/types/message.go
func (msg *aerospikeMessage) Serialize() []byte {
msg.DataLen = msgLenToBytes(int64(len(msg.Data)))
buf := bytes.NewBuffer([]byte{})
binary.Write(buf, binary.BigEndian, msg.aerospikeMessageHeader)
binary.Write(buf, binary.BigEndian, msg.Data[:])
return buf.Bytes()
}
type aerospikeInfoCommand struct {
msg *aerospikeMessage
}
// Taken from aerospike-client-go/info.go
func (nfo *aerospikeInfoCommand) parseMultiResponse() (map[string]string, error) {
responses := make(map[string]string)
offset := int64(0)
begin := int64(0)
dataLen := int64(len(nfo.msg.Data))
// Create reusable StringBuilder for performance.
for offset < dataLen {
b := nfo.msg.Data[offset]
if b == '\t' {
name := nfo.msg.Data[begin:offset]
offset++
begin = offset
// Parse field value.
for offset < dataLen {
if nfo.msg.Data[offset] == '\n' {
break
}
offset++
}
if offset > begin {
value := nfo.msg.Data[begin:offset]
responses[string(name)] = string(value)
} else {
responses[string(name)] = ""
}
offset++
begin = offset
} else if b == '\n' {
if offset > begin {
name := nfo.msg.Data[begin:offset]
responses[string(name)] = ""
}
offset++
begin = offset
} else {
offset++
}
}
if offset > begin {
name := nfo.msg.Data[begin:offset]
responses[string(name)] = ""
}
return responses, nil
}
type Aerospike struct { type Aerospike struct {
Servers []string Servers []string
} }
@ -115,7 +30,7 @@ func (a *Aerospike) SampleConfig() string {
} }
func (a *Aerospike) Description() string { func (a *Aerospike) Description() string {
return "Read stats from an aerospike server" return "Read stats from aerospike server(s)"
} }
func (a *Aerospike) Gather(acc telegraf.Accumulator) error { func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
@ -124,214 +39,90 @@ func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
} }
var wg sync.WaitGroup var wg sync.WaitGroup
errChan := errchan.New(len(a.Servers))
var outerr error wg.Add(len(a.Servers))
for _, server := range a.Servers { for _, server := range a.Servers {
wg.Add(1) go func(serv string) {
go func(server string) {
defer wg.Done() defer wg.Done()
outerr = a.gatherServer(server, acc) errChan.C <- a.gatherServer(serv, acc)
}(server) }(server)
} }
wg.Wait() wg.Wait()
return outerr return errChan.Error()
} }
func (a *Aerospike) gatherServer(host string, acc telegraf.Accumulator) error { func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) error {
aerospikeInfo, err := getMap(STATISTICS_COMMAND, host) host, port, err := net.SplitHostPort(hostport)
if err != nil { if err != nil {
return fmt.Errorf("Aerospike info failed: %s", err) return err
} }
readAerospikeStats(aerospikeInfo, acc, host, "")
namespaces, err := getList(NAMESPACES_COMMAND, host) iport, err := strconv.Atoi(port)
if err != nil { if err != nil {
return fmt.Errorf("Aerospike namespace list failed: %s", err) iport = 3000
} }
for ix := range namespaces {
nsInfo, err := getMap([]byte("namespace/"+namespaces[ix]+"\n"), host) c, err := as.NewClient(host, iport)
if err != nil { if err != nil {
return fmt.Errorf("Aerospike namespace '%s' query failed: %s", namespaces[ix], err) return err
}
defer c.Close()
nodes := c.GetNodes()
for _, n := range nodes {
tags := map[string]string{
"node_name": n.GetName(),
"aerospike_host": hostport,
}
fields := make(map[string]interface{})
stats, err := as.RequestNodeStats(n)
if err != nil {
return err
}
for k, v := range stats {
if iv, err := strconv.ParseInt(v, 10, 64); err == nil {
fields[strings.Replace(k, "-", "_", -1)] = iv
}
}
acc.AddFields("aerospike_node", fields, tags, time.Now())
info, err := as.RequestNodeInfo(n, "namespaces")
if err != nil {
return err
}
namespaces := strings.Split(info["namespaces"], ";")
for _, namespace := range namespaces {
nTags := copyTags(tags)
nTags["namespace"] = namespace
nFields := make(map[string]interface{})
info, err := as.RequestNodeInfo(n, "namespace/"+namespace)
if err != nil {
continue
}
stats := strings.Split(info["namespace/"+namespace], ";")
for _, stat := range stats {
parts := strings.Split(stat, "=")
if len(parts) < 2 {
continue
}
if iv, err := strconv.ParseInt(parts[1], 10, 64); err == nil {
nFields[strings.Replace(parts[0], "-", "_", -1)] = iv
}
}
acc.AddFields("aerospike_namespace", nFields, nTags, time.Now())
} }
readAerospikeStats(nsInfo, acc, host, namespaces[ix])
} }
return nil return nil
} }
func getMap(key []byte, host string) (map[string]string, error) { func copyTags(m map[string]string) map[string]string {
data, err := get(key, host) out := make(map[string]string)
if err != nil { for k, v := range m {
return nil, fmt.Errorf("Failed to get data: %s", err) out[k] = v
} }
parsed, err := unmarshalMapInfo(data, string(key)) return out
if err != nil {
return nil, fmt.Errorf("Failed to unmarshal data: %s", err)
}
return parsed, nil
}
func getList(key []byte, host string) ([]string, error) {
data, err := get(key, host)
if err != nil {
return nil, fmt.Errorf("Failed to get data: %s", err)
}
parsed, err := unmarshalListInfo(data, string(key))
if err != nil {
return nil, fmt.Errorf("Failed to unmarshal data: %s", err)
}
return parsed, nil
}
func get(key []byte, host string) (map[string]string, error) {
var err error
var data map[string]string
asInfo := &aerospikeInfoCommand{
msg: &aerospikeMessage{
aerospikeMessageHeader: aerospikeMessageHeader{
Version: uint8(MSG_VERSION),
Type: uint8(MSG_TYPE),
DataLen: msgLenToBytes(int64(len(key))),
},
Data: key,
},
}
cmd := asInfo.msg.Serialize()
addr, err := net.ResolveTCPAddr("tcp", host)
if err != nil {
return data, fmt.Errorf("Lookup failed for '%s': %s", host, err)
}
conn, err := net.DialTCP("tcp", nil, addr)
if err != nil {
return data, fmt.Errorf("Connection failed for '%s': %s", host, err)
}
defer conn.Close()
_, err = conn.Write(cmd)
if err != nil {
return data, fmt.Errorf("Failed to send to '%s': %s", host, err)
}
msgHeader := bytes.NewBuffer(make([]byte, MSG_HEADER_SIZE))
_, err = readLenFromConn(conn, msgHeader.Bytes(), MSG_HEADER_SIZE)
if err != nil {
return data, fmt.Errorf("Failed to read header: %s", err)
}
err = binary.Read(msgHeader, binary.BigEndian, &asInfo.msg.aerospikeMessageHeader)
if err != nil {
return data, fmt.Errorf("Failed to unmarshal header: %s", err)
}
msgLen := msgLenFromBytes(asInfo.msg.aerospikeMessageHeader.DataLen)
if int64(len(asInfo.msg.Data)) != msgLen {
asInfo.msg.Data = make([]byte, msgLen)
}
_, err = readLenFromConn(conn, asInfo.msg.Data, len(asInfo.msg.Data))
if err != nil {
return data, fmt.Errorf("Failed to read from connection to '%s': %s", host, err)
}
data, err = asInfo.parseMultiResponse()
if err != nil {
return data, fmt.Errorf("Failed to parse response from '%s': %s", host, err)
}
return data, err
}
func readAerospikeStats(
stats map[string]string,
acc telegraf.Accumulator,
host string,
namespace string,
) {
fields := make(map[string]interface{})
tags := map[string]string{
"aerospike_host": host,
"namespace": "_service",
}
if namespace != "" {
tags["namespace"] = namespace
}
for key, value := range stats {
// We are going to ignore all string based keys
val, err := strconv.ParseInt(value, 10, 64)
if err == nil {
if strings.Contains(key, "-") {
key = strings.Replace(key, "-", "_", -1)
}
fields[key] = val
}
}
acc.AddFields("aerospike", fields, tags)
}
func unmarshalMapInfo(infoMap map[string]string, key string) (map[string]string, error) {
key = strings.TrimSuffix(key, "\n")
res := map[string]string{}
v, exists := infoMap[key]
if !exists {
return res, fmt.Errorf("Key '%s' missing from info", key)
}
values := strings.Split(v, ";")
for i := range values {
kv := strings.Split(values[i], "=")
if len(kv) > 1 {
res[kv[0]] = kv[1]
}
}
return res, nil
}
func unmarshalListInfo(infoMap map[string]string, key string) ([]string, error) {
key = strings.TrimSuffix(key, "\n")
v, exists := infoMap[key]
if !exists {
return []string{}, fmt.Errorf("Key '%s' missing from info", key)
}
values := strings.Split(v, ";")
return values, nil
}
func readLenFromConn(c net.Conn, buffer []byte, length int) (total int, err error) {
var r int
for total < length {
r, err = c.Read(buffer[total:length])
total += r
if err != nil {
break
}
}
return
}
// Taken from aerospike-client-go/types/message.go
func msgLenToBytes(DataLen int64) [6]byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(DataLen))
res := [6]byte{}
copy(res[:], b[2:])
return res
}
// Taken from aerospike-client-go/types/message.go
func msgLenFromBytes(buf [6]byte) int64 {
nbytes := append([]byte{0, 0}, buf[:]...)
DataLen := binary.BigEndian.Uint64(nbytes)
return int64(DataLen)
} }
func init() { func init() {

View File

@ -1,7 +1,6 @@
package aerospike package aerospike
import ( import (
"reflect"
"testing" "testing"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@ -22,84 +21,30 @@ func TestAerospikeStatistics(t *testing.T) {
err := a.Gather(&acc) err := a.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
assert.True(t, acc.HasMeasurement("aerospike_node"))
assert.True(t, acc.HasMeasurement("aerospike_namespace"))
assert.True(t, acc.HasIntField("aerospike_node", "batch_error"))
} }
func TestAerospikeMsgLenFromToBytes(t *testing.T) { func TestAerospikeStatisticsPartialErr(t *testing.T) {
var i int64 = 8 if testing.Short() {
assert.True(t, i == msgLenFromBytes(msgLenToBytes(i))) t.Skip("Skipping integration test in short mode")
} }
a := &Aerospike{
Servers: []string{
testutil.GetLocalHost() + ":3000",
testutil.GetLocalHost() + ":9999",
},
}
func TestReadAerospikeStatsNoNamespace(t *testing.T) {
// Also test for re-writing
var acc testutil.Accumulator var acc testutil.Accumulator
stats := map[string]string{
"stat-write-errs": "12345",
"stat_read_reqs": "12345",
}
readAerospikeStats(stats, &acc, "host1", "")
fields := map[string]interface{}{ err := a.Gather(&acc)
"stat_write_errs": int64(12345), require.Error(t, err)
"stat_read_reqs": int64(12345),
} assert.True(t, acc.HasMeasurement("aerospike_node"))
tags := map[string]string{ assert.True(t, acc.HasMeasurement("aerospike_namespace"))
"aerospike_host": "host1", assert.True(t, acc.HasIntField("aerospike_node", "batch_error"))
"namespace": "_service",
}
acc.AssertContainsTaggedFields(t, "aerospike", fields, tags)
}
func TestReadAerospikeStatsNamespace(t *testing.T) {
var acc testutil.Accumulator
stats := map[string]string{
"stat_write_errs": "12345",
"stat_read_reqs": "12345",
}
readAerospikeStats(stats, &acc, "host1", "test")
fields := map[string]interface{}{
"stat_write_errs": int64(12345),
"stat_read_reqs": int64(12345),
}
tags := map[string]string{
"aerospike_host": "host1",
"namespace": "test",
}
acc.AssertContainsTaggedFields(t, "aerospike", fields, tags)
}
func TestAerospikeUnmarshalList(t *testing.T) {
i := map[string]string{
"test": "one;two;three",
}
expected := []string{"one", "two", "three"}
list, err := unmarshalListInfo(i, "test2")
assert.True(t, err != nil)
list, err = unmarshalListInfo(i, "test")
assert.True(t, err == nil)
equal := true
for ix := range expected {
if list[ix] != expected[ix] {
equal = false
break
}
}
assert.True(t, equal)
}
func TestAerospikeUnmarshalMap(t *testing.T) {
i := map[string]string{
"test": "key1=value1;key2=value2",
}
expected := map[string]string{
"key1": "value1",
"key2": "value2",
}
m, err := unmarshalMapInfo(i, "test")
assert.True(t, err == nil)
assert.True(t, reflect.DeepEqual(m, expected))
} }

View File

@ -148,7 +148,7 @@ func (c cassandraMetric) addTagsFields(out map[string]interface{}) {
tokens := parseJmxMetricRequest(r.(map[string]interface{})["mbean"].(string)) tokens := parseJmxMetricRequest(r.(map[string]interface{})["mbean"].(string))
// Requests with wildcards for keyspace or table names will return nested // Requests with wildcards for keyspace or table names will return nested
// maps in the json response // maps in the json response
if tokens["type"] == "Table" && (tokens["keyspace"] == "*" || if (tokens["type"] == "Table" || tokens["type"] == "ColumnFamily") && (tokens["keyspace"] == "*" ||
tokens["scope"] == "*") { tokens["scope"] == "*") {
if valuesMap, ok := out["value"]; ok { if valuesMap, ok := out["value"]; ok {
for k, v := range valuesMap.(map[string]interface{}) { for k, v := range valuesMap.(map[string]interface{}) {

View File

@ -48,8 +48,6 @@ type Exec struct {
parser parsers.Parser parser parsers.Parser
wg sync.WaitGroup
runner Runner runner Runner
errChan chan error errChan chan error
} }
@ -119,8 +117,8 @@ func (c CommandRunner) Run(
return out.Bytes(), nil return out.Bytes(), nil
} }
func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) { func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync.WaitGroup) {
defer e.wg.Done() defer wg.Done()
out, err := e.runner.Run(e, command, acc) out, err := e.runner.Run(e, command, acc)
if err != nil { if err != nil {
@ -151,6 +149,7 @@ func (e *Exec) SetParser(parser parsers.Parser) {
} }
func (e *Exec) Gather(acc telegraf.Accumulator) error { func (e *Exec) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
// Legacy single command support // Legacy single command support
if e.Command != "" { if e.Command != "" {
e.Commands = append(e.Commands, e.Command) e.Commands = append(e.Commands, e.Command)
@ -190,11 +189,11 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
errChan := errchan.New(len(commands)) errChan := errchan.New(len(commands))
e.errChan = errChan.C e.errChan = errChan.C
e.wg.Add(len(commands)) wg.Add(len(commands))
for _, command := range commands { for _, command := range commands {
go e.ProcessCommand(command, acc) go e.ProcessCommand(command, acc, &wg)
} }
e.wg.Wait() wg.Wait()
return errChan.Error() return errChan.Error()
} }

View File

@ -53,9 +53,15 @@ var (
) )
type Parser struct { type Parser struct {
Patterns []string Patterns []string
// namedPatterns is a list of internally-assigned names to the patterns
// specified by the user in Patterns.
// They will look like:
// GROK_INTERNAL_PATTERN_0, GROK_INTERNAL_PATTERN_1, etc.
namedPatterns []string
CustomPatterns string CustomPatterns string
CustomPatternFiles []string CustomPatternFiles []string
Measurement string
// typeMap is a map of patterns -> capture name -> modifier, // typeMap is a map of patterns -> capture name -> modifier,
// ie, { // ie, {
@ -97,13 +103,24 @@ func (p *Parser) Compile() error {
return err return err
} }
p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns // Give Patterns fake names so that they can be treated as named
// "custom patterns"
p.namedPatterns = make([]string, len(p.Patterns))
for i, pattern := range p.Patterns {
name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i)
p.CustomPatterns += "\n" + name + " " + pattern + "\n"
p.namedPatterns[i] = "%{" + name + "}"
}
// Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse
// them together as the same type of pattern.
p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns
if len(p.CustomPatterns) != 0 { if len(p.CustomPatterns) != 0 {
scanner := bufio.NewScanner(strings.NewReader(p.CustomPatterns)) scanner := bufio.NewScanner(strings.NewReader(p.CustomPatterns))
p.addCustomPatterns(scanner) p.addCustomPatterns(scanner)
} }
// Parse any custom pattern files supplied.
for _, filename := range p.CustomPatternFiles { for _, filename := range p.CustomPatternFiles {
file, err := os.Open(filename) file, err := os.Open(filename)
if err != nil { if err != nil {
@ -114,6 +131,10 @@ func (p *Parser) Compile() error {
p.addCustomPatterns(scanner) p.addCustomPatterns(scanner)
} }
if p.Measurement == "" {
p.Measurement = "logparser_grok"
}
return p.compileCustomPatterns() return p.compileCustomPatterns()
} }
@ -122,7 +143,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
var values map[string]string var values map[string]string
// the matching pattern string // the matching pattern string
var patternName string var patternName string
for _, pattern := range p.Patterns { for _, pattern := range p.namedPatterns {
if values, err = p.g.Parse(pattern, line); err != nil { if values, err = p.g.Parse(pattern, line); err != nil {
return nil, err return nil, err
} }
@ -215,7 +236,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
} }
} }
return telegraf.NewMetric("logparser_grok", tags, fields, p.tsModder.tsMod(timestamp)) return telegraf.NewMetric(p.Measurement, tags, fields, p.tsModder.tsMod(timestamp))
} }
func (p *Parser) addCustomPatterns(scanner *bufio.Scanner) { func (p *Parser) addCustomPatterns(scanner *bufio.Scanner) {

View File

@ -83,6 +83,31 @@ func Benchmark_ParseLine_CustomPattern(b *testing.B) {
benchM = m benchM = m
} }
func TestMeasurementName(t *testing.T) {
p := &Parser{
Measurement: "my_web_log",
Patterns: []string{"%{COMMON_LOG_FORMAT}"},
}
assert.NoError(t, p.Compile())
// Parse an influxdb POST request
m, err := p.ParseLine(`127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`)
require.NotNil(t, m)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"resp_bytes": int64(2326),
"auth": "frank",
"client_ip": "127.0.0.1",
"http_version": float64(1.0),
"ident": "user-identifier",
"request": "/apache_pb.gif",
},
m.Fields())
assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags())
assert.Equal(t, "my_web_log", m.Name())
}
func TestBuiltinInfluxdbHttpd(t *testing.T) { func TestBuiltinInfluxdbHttpd(t *testing.T) {
p := &Parser{ p := &Parser{
Patterns: []string{"%{INFLUXDB_HTTPD_LOG}"}, Patterns: []string{"%{INFLUXDB_HTTPD_LOG}"},
@ -98,7 +123,6 @@ func TestBuiltinInfluxdbHttpd(t *testing.T) {
"resp_bytes": int64(0), "resp_bytes": int64(0),
"auth": "-", "auth": "-",
"client_ip": "::1", "client_ip": "::1",
"resp_code": int64(204),
"http_version": float64(1.1), "http_version": float64(1.1),
"ident": "-", "ident": "-",
"referrer": "-", "referrer": "-",
@ -107,7 +131,7 @@ func TestBuiltinInfluxdbHttpd(t *testing.T) {
"agent": "InfluxDBClient", "agent": "InfluxDBClient",
}, },
m.Fields()) m.Fields())
assert.Equal(t, map[string]string{"verb": "POST"}, m.Tags()) assert.Equal(t, map[string]string{"verb": "POST", "resp_code": "204"}, m.Tags())
// Parse an influxdb GET request // Parse an influxdb GET request
m, err = p.ParseLine(`[httpd] ::1 - - [14/Jun/2016:12:10:02 +0100] "GET /query?db=telegraf&q=SELECT+bytes%2Cresponse_time_us+FROM+logparser_grok+WHERE+http_method+%3D+%27GET%27+AND+response_time_us+%3E+0+AND+time+%3E+now%28%29+-+1h HTTP/1.1" 200 578 "http://localhost:8083/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.84 Safari/537.36" 8a3806f1-3220-11e6-8006-000000000000 988`) m, err = p.ParseLine(`[httpd] ::1 - - [14/Jun/2016:12:10:02 +0100] "GET /query?db=telegraf&q=SELECT+bytes%2Cresponse_time_us+FROM+logparser_grok+WHERE+http_method+%3D+%27GET%27+AND+response_time_us+%3E+0+AND+time+%3E+now%28%29+-+1h HTTP/1.1" 200 578 "http://localhost:8083/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.84 Safari/537.36" 8a3806f1-3220-11e6-8006-000000000000 988`)
@ -118,7 +142,6 @@ func TestBuiltinInfluxdbHttpd(t *testing.T) {
"resp_bytes": int64(578), "resp_bytes": int64(578),
"auth": "-", "auth": "-",
"client_ip": "::1", "client_ip": "::1",
"resp_code": int64(200),
"http_version": float64(1.1), "http_version": float64(1.1),
"ident": "-", "ident": "-",
"referrer": "http://localhost:8083/", "referrer": "http://localhost:8083/",
@ -127,7 +150,7 @@ func TestBuiltinInfluxdbHttpd(t *testing.T) {
"agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.84 Safari/537.36", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.84 Safari/537.36",
}, },
m.Fields()) m.Fields())
assert.Equal(t, map[string]string{"verb": "GET"}, m.Tags()) assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags())
} }
// common log format // common log format
@ -147,13 +170,12 @@ func TestBuiltinCommonLogFormat(t *testing.T) {
"resp_bytes": int64(2326), "resp_bytes": int64(2326),
"auth": "frank", "auth": "frank",
"client_ip": "127.0.0.1", "client_ip": "127.0.0.1",
"resp_code": int64(200),
"http_version": float64(1.0), "http_version": float64(1.0),
"ident": "user-identifier", "ident": "user-identifier",
"request": "/apache_pb.gif", "request": "/apache_pb.gif",
}, },
m.Fields()) m.Fields())
assert.Equal(t, map[string]string{"verb": "GET"}, m.Tags()) assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags())
} }
// combined log format // combined log format
@ -173,7 +195,6 @@ func TestBuiltinCombinedLogFormat(t *testing.T) {
"resp_bytes": int64(2326), "resp_bytes": int64(2326),
"auth": "frank", "auth": "frank",
"client_ip": "127.0.0.1", "client_ip": "127.0.0.1",
"resp_code": int64(200),
"http_version": float64(1.0), "http_version": float64(1.0),
"ident": "user-identifier", "ident": "user-identifier",
"request": "/apache_pb.gif", "request": "/apache_pb.gif",
@ -181,12 +202,12 @@ func TestBuiltinCombinedLogFormat(t *testing.T) {
"agent": "Mozilla", "agent": "Mozilla",
}, },
m.Fields()) m.Fields())
assert.Equal(t, map[string]string{"verb": "GET"}, m.Tags()) assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags())
} }
func TestCompileStringAndParse(t *testing.T) { func TestCompileStringAndParse(t *testing.T) {
p := &Parser{ p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, Patterns: []string{"%{TEST_LOG_A}"},
CustomPatterns: ` CustomPatterns: `
DURATION %{NUMBER}[nuµm]?s DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag} RESPONSE_CODE %{NUMBER:response_code:tag}
@ -209,6 +230,41 @@ func TestCompileStringAndParse(t *testing.T) {
assert.Equal(t, map[string]string{"response_code": "200"}, metricA.Tags()) assert.Equal(t, map[string]string{"response_code": "200"}, metricA.Tags())
} }
func TestCompileErrorsOnInvalidPattern(t *testing.T) {
p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatterns: `
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
RESPONSE_TIME %{DURATION:response_time:duration}
TEST_LOG_A %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME}
`,
}
assert.Error(t, p.Compile())
metricA, _ := p.ParseLine(`1.25 200 192.168.1.1 5.432µs`)
require.Nil(t, metricA)
}
func TestParsePatternsWithoutCustom(t *testing.T) {
p := &Parser{
Patterns: []string{"%{POSINT:ts:ts-epochnano} response_time=%{POSINT:response_time:int} mymetric=%{NUMBER:metric:float}"},
}
assert.NoError(t, p.Compile())
metricA, err := p.ParseLine(`1466004605359052000 response_time=20821 mymetric=10890.645`)
require.NotNil(t, metricA)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"response_time": int64(20821),
"metric": float64(10890.645),
},
metricA.Fields())
assert.Equal(t, map[string]string{}, metricA.Tags())
assert.Equal(t, time.Unix(0, 1466004605359052000), metricA.Time())
}
func TestParseEpochNano(t *testing.T) { func TestParseEpochNano(t *testing.T) {
p := &Parser{ p := &Parser{
Patterns: []string{"%{MYAPP}"}, Patterns: []string{"%{MYAPP}"},
@ -392,7 +448,7 @@ func TestParseErrors(t *testing.T) {
TEST_LOG_A %{HTTPDATE:ts:ts-httpd} %{WORD:myword:int} %{} TEST_LOG_A %{HTTPDATE:ts:ts-httpd} %{WORD:myword:int} %{}
`, `,
} }
assert.NoError(t, p.Compile()) assert.Error(t, p.Compile())
_, err := p.ParseLine(`[04/Jun/2016:12:41:45 +0100] notnumber 200 192.168.1.1 5.432µs 101`) _, err := p.ParseLine(`[04/Jun/2016:12:41:45 +0100] notnumber 200 192.168.1.1 5.432µs 101`)
assert.Error(t, err) assert.Error(t, err)

View File

@ -66,7 +66,7 @@ INFLUXDB_HTTPD_LOG \[httpd\] %{COMBINED_LOG_FORMAT} %{UUID:uuid:drop} %{NUMBER:r
# apache & nginx logs, this is also known as the "common log format" # apache & nginx logs, this is also known as the "common log format"
# see https://en.wikipedia.org/wiki/Common_Log_Format # see https://en.wikipedia.org/wiki/Common_Log_Format
COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:int} (?:%{NUMBER:resp_bytes:int}|-) COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-)
# Combined log format is the same as the common log format but with the addition # Combined log format is the same as the common log format but with the addition
# of two quoted strings at the end for "referrer" and "agent" # of two quoted strings at the end for "referrer" and "agent"

View File

@ -62,7 +62,7 @@ INFLUXDB_HTTPD_LOG \[httpd\] %{COMBINED_LOG_FORMAT} %{UUID:uuid:drop} %{NUMBER:r
# apache & nginx logs, this is also known as the "common log format" # apache & nginx logs, this is also known as the "common log format"
# see https://en.wikipedia.org/wiki/Common_Log_Format # see https://en.wikipedia.org/wiki/Common_Log_Format
COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:int} (?:%{NUMBER:resp_bytes:int}|-) COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-)
# Combined log format is the same as the common log format but with the addition # Combined log format is the same as the common log format but with the addition
# of two quoted strings at the end for "referrer" and "agent" # of two quoted strings at the end for "referrer" and "agent"

View File

@ -9,6 +9,7 @@ import (
"github.com/hpcloud/tail" "github.com/hpcloud/tail"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
@ -58,6 +59,8 @@ const sampleConfig = `
## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs) ## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs)
## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent) ## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
patterns = ["%{INFLUXDB_HTTPD_LOG}"] patterns = ["%{INFLUXDB_HTTPD_LOG}"]
## Name of the outputted measurement name.
measurement = "influxdb_log"
## Full path(s) to custom pattern files. ## Full path(s) to custom pattern files.
custom_pattern_files = [] custom_pattern_files = []
## Custom patterns can also be defined here. Put one pattern per line. ## Custom patterns can also be defined here. Put one pattern per line.
@ -108,11 +111,15 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
} }
// compile log parser patterns: // compile log parser patterns:
errChan := errchan.New(len(l.parsers))
for _, parser := range l.parsers { for _, parser := range l.parsers {
if err := parser.Compile(); err != nil { if err := parser.Compile(); err != nil {
return err errChan.C <- err
} }
} }
if err := errChan.Error(); err != nil {
return err
}
var seek tail.SeekInfo var seek tail.SeekInfo
if !l.FromBeginning { if !l.FromBeginning {
@ -123,24 +130,25 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
l.wg.Add(1) l.wg.Add(1)
go l.parser() go l.parser()
var errS string
// Create a "tailer" for each file // Create a "tailer" for each file
for _, filepath := range l.Files { for _, filepath := range l.Files {
g, err := globpath.Compile(filepath) g, err := globpath.Compile(filepath)
if err != nil { if err != nil {
log.Printf("ERROR Glob %s failed to compile, %s", filepath, err) log.Printf("ERROR Glob %s failed to compile, %s", filepath, err)
continue
} }
for file, _ := range g.Match() { files := g.Match()
errChan = errchan.New(len(files))
for file, _ := range files {
tailer, err := tail.TailFile(file, tailer, err := tail.TailFile(file,
tail.Config{ tail.Config{
ReOpen: true, ReOpen: true,
Follow: true, Follow: true,
Location: &seek, Location: &seek,
MustExist: true,
}) })
if err != nil { errChan.C <- err
errS += err.Error() + " "
continue
}
// create a goroutine for each "tailer" // create a goroutine for each "tailer"
l.wg.Add(1) l.wg.Add(1)
go l.receiver(tailer) go l.receiver(tailer)
@ -148,10 +156,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
} }
} }
if errS != "" { return errChan.Error()
return fmt.Errorf(errS)
}
return nil
} }
// receiver is launched as a goroutine to continuously watch a tailed logfile // receiver is launched as a goroutine to continuously watch a tailed logfile
@ -199,8 +204,6 @@ func (l *LogParserPlugin) parser() {
if m != nil { if m != nil {
l.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) l.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
} }
} else {
log.Printf("Malformed log line in [%s], Error: %s\n", line, err)
} }
} }
} }

View File

@ -37,7 +37,7 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
} }
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
assert.NoError(t, logparser.Start(&acc)) assert.Error(t, logparser.Start(&acc))
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
logparser.Stop() logparser.Stop()
@ -80,6 +80,8 @@ func TestGrokParseLogFiles(t *testing.T) {
map[string]string{}) map[string]string{})
} }
// Test that test_a.log line gets parsed even though we don't have the correct
// pattern available for test_b.log
func TestGrokParseLogFilesOneBad(t *testing.T) { func TestGrokParseLogFilesOneBad(t *testing.T) {
thisdir := getCurrentDir() thisdir := getCurrentDir()
p := &grok.Parser{ p := &grok.Parser{
@ -90,11 +92,12 @@ func TestGrokParseLogFilesOneBad(t *testing.T) {
logparser := &LogParserPlugin{ logparser := &LogParserPlugin{
FromBeginning: true, FromBeginning: true,
Files: []string{thisdir + "grok/testdata/*.log"}, Files: []string{thisdir + "grok/testdata/test_a.log"},
GrokParser: p, GrokParser: p,
} }
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
acc.SetDebug(true)
assert.NoError(t, logparser.Start(&acc)) assert.NoError(t, logparser.Start(&acc))
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)

View File

@ -12,6 +12,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -25,6 +26,7 @@ var sampleConfig = `
## e.g. ## e.g.
## tcp://localhost:6379 ## tcp://localhost:6379
## tcp://:password@192.168.99.100 ## tcp://:password@192.168.99.100
## unix:///var/run/redis.sock
## ##
## If no servers are specified, then localhost is used as the host. ## If no servers are specified, then localhost is used as the host.
## If no port is specified, 6379 is used ## If no port is specified, 6379 is used
@ -80,22 +82,27 @@ var Tracking = map[string]string{
var ErrProtocolError = errors.New("redis protocol error") var ErrProtocolError = errors.New("redis protocol error")
const defaultPort = "6379"
// Reads stats from all configured servers accumulates stats. // Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any). // Returns one of the errors encountered while gather stats (if any).
func (r *Redis) Gather(acc telegraf.Accumulator) error { func (r *Redis) Gather(acc telegraf.Accumulator) error {
if len(r.Servers) == 0 { if len(r.Servers) == 0 {
url := &url.URL{ url := &url.URL{
Host: ":6379", Scheme: "tcp",
Host: ":6379",
} }
r.gatherServer(url, acc) r.gatherServer(url, acc)
return nil return nil
} }
var wg sync.WaitGroup var wg sync.WaitGroup
errChan := errchan.New(len(r.Servers))
var outerr error
for _, serv := range r.Servers { for _, serv := range r.Servers {
if !strings.HasPrefix(serv, "tcp://") || !strings.HasPrefix(serv, "unix://") {
serv = "tcp://" + serv
}
u, err := url.Parse(serv) u, err := url.Parse(serv)
if err != nil { if err != nil {
return fmt.Errorf("Unable to parse to address '%s': %s", serv, err) return fmt.Errorf("Unable to parse to address '%s': %s", serv, err)
@ -105,29 +112,35 @@ func (r *Redis) Gather(acc telegraf.Accumulator) error {
u.Host = serv u.Host = serv
u.Path = "" u.Path = ""
} }
if u.Scheme == "tcp" {
_, _, err := net.SplitHostPort(u.Host)
if err != nil {
u.Host = u.Host + ":" + defaultPort
}
}
wg.Add(1) wg.Add(1)
go func(serv string) { go func(serv string) {
defer wg.Done() defer wg.Done()
outerr = r.gatherServer(u, acc) errChan.C <- r.gatherServer(u, acc)
}(serv) }(serv)
} }
wg.Wait() wg.Wait()
return errChan.Error()
return outerr
} }
const defaultPort = "6379"
func (r *Redis) gatherServer(addr *url.URL, acc telegraf.Accumulator) error { func (r *Redis) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
_, _, err := net.SplitHostPort(addr.Host) var address string
if err != nil {
addr.Host = addr.Host + ":" + defaultPort
}
c, err := net.DialTimeout("tcp", addr.Host, defaultTimeout) if addr.Scheme == "unix" {
address = addr.Path
} else {
address = addr.Host
}
c, err := net.DialTimeout(addr.Scheme, address, defaultTimeout)
if err != nil { if err != nil {
return fmt.Errorf("Unable to connect to redis server '%s': %s", addr.Host, err) return fmt.Errorf("Unable to connect to redis server '%s': %s", address, err)
} }
defer c.Close() defer c.Close()
@ -155,12 +168,17 @@ func (r *Redis) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
c.Write([]byte("EOF\r\n")) c.Write([]byte("EOF\r\n"))
rdr := bufio.NewReader(c) rdr := bufio.NewReader(c)
// Setup tags for all redis metrics var tags map[string]string
host, port := "unknown", "unknown"
// If there's an error, ignore and use 'unknown' tags
host, port, _ = net.SplitHostPort(addr.Host)
tags := map[string]string{"server": host, "port": port}
if addr.Scheme == "unix" {
tags = map[string]string{"socket": addr.Path}
} else {
// Setup tags for all redis metrics
host, port := "unknown", "unknown"
// If there's an error, ignore and use 'unknown' tags
host, port, _ = net.SplitHostPort(addr.Host)
tags = map[string]string{"server": host, "port": port}
}
return gatherInfoOutput(rdr, acc, tags) return gatherInfoOutput(rdr, acc, tags)
} }

View File

@ -86,9 +86,10 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
for file, _ := range g.Match() { for file, _ := range g.Match() {
tailer, err := tail.TailFile(file, tailer, err := tail.TailFile(file,
tail.Config{ tail.Config{
ReOpen: true, ReOpen: true,
Follow: true, Follow: true,
Location: &seek, Location: &seek,
MustExist: true,
}) })
if err != nil { if err != nil {
errS += err.Error() + " " errS += err.Error() + " "

View File

@ -16,6 +16,7 @@ $ sudo service telegraf start
## Available webhooks ## Available webhooks
- [Github](github/) - [Github](github/)
- [Mandrill](mandrill/)
- [Rollbar](rollbar/) - [Rollbar](rollbar/)
## Adding new webhooks plugin ## Adding new webhooks plugin

View File

@ -0,0 +1,15 @@
# mandrill webhook
You should configure your Mandrill's Webhooks to point at the `webhooks` service. To do this go to `mandrillapp.com/` and click `Settings > Webhooks`. In the resulting page, click on `Add a Webhook`, select all events, and set the `URL` to `http://<my_ip>:1619/mandrill`, and click on `Create Webhook`.
## Events
See the [webhook doc](https://mandrill.zendesk.com/hc/en-us/articles/205583307-Message-Event-Webhook-format).
All events for logs the original timestamp, the event name and the unique identifier of the message that generated the event.
**Tags:**
* 'event' = `event.event` string
**Fields:**
* 'id' = `event._id` string

View File

@ -0,0 +1,56 @@
package mandrill
import (
"encoding/json"
"io/ioutil"
"log"
"net/http"
"net/url"
"time"
"github.com/gorilla/mux"
"github.com/influxdata/telegraf"
)
type MandrillWebhook struct {
Path string
acc telegraf.Accumulator
}
func (md *MandrillWebhook) Register(router *mux.Router, acc telegraf.Accumulator) {
router.HandleFunc(md.Path, md.returnOK).Methods("HEAD")
router.HandleFunc(md.Path, md.eventHandler).Methods("POST")
log.Printf("Started the webhooks_mandrill on %s\n", md.Path)
md.acc = acc
}
func (md *MandrillWebhook) returnOK(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
func (md *MandrillWebhook) eventHandler(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
body, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
data, err := url.ParseQuery(string(body))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
var events []MandrillEvent
err = json.Unmarshal([]byte(data.Get("mandrill_events")), &events)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
for _, event := range events {
md.acc.AddFields("mandrill_webhooks", event.Fields(), event.Tags(), time.Unix(event.TimeStamp, 0))
}
w.WriteHeader(http.StatusOK)
}

View File

@ -0,0 +1,24 @@
package mandrill
type Event interface {
Tags() map[string]string
Fields() map[string]interface{}
}
type MandrillEvent struct {
EventName string `json:"event"`
TimeStamp int64 `json:"ts"`
Id string `json:"_id"`
}
func (me *MandrillEvent) Tags() map[string]string {
return map[string]string{
"event": me.EventName,
}
}
func (me *MandrillEvent) Fields() map[string]interface{} {
return map[string]interface{}{
"id": me.Id,
}
}

View File

@ -0,0 +1,58 @@
package mandrill
func SendEventJSON() string {
return `
{
"event": "send",
"msg": {
"ts": 1365109999,
"subject": "This an example webhook message",
"email": "example.webhook@mandrillapp.com",
"sender": "example.sender@mandrillapp.com",
"tags": [
"webhook-example"
],
"opens": [
],
"clicks": [
],
"state": "sent",
"metadata": {
"user_id": 111
},
"_id": "exampleaaaaaaaaaaaaaaaaaaaaaaaaa",
"_version": "exampleaaaaaaaaaaaaaaa"
},
"_id": "id1",
"ts": 1384954004
}`
}
func HardBounceEventJSON() string {
return `
{
"event": "hard_bounce",
"msg": {
"ts": 1365109999,
"subject": "This an example webhook message",
"email": "example.webhook@mandrillapp.com",
"sender": "example.sender@mandrillapp.com",
"tags": [
"webhook-example"
],
"state": "bounced",
"metadata": {
"user_id": 111
},
"_id": "exampleaaaaaaaaaaaaaaaaaaaaaaaaa2",
"_version": "exampleaaaaaaaaaaaaaaa",
"bounce_description": "bad_mailbox",
"bgtools_code": 10,
"diag": "smtp;550 5.1.1 The email account that you tried to reach does not exist. Please try double-checking the recipient's email address for typos or unnecessary spaces."
},
"_id": "id2",
"ts": 1384954004
}`
}

View File

@ -0,0 +1,85 @@
package mandrill
import (
"github.com/influxdata/telegraf/testutil"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
)
func postWebhooks(md *MandrillWebhook, eventBody string) *httptest.ResponseRecorder {
body := url.Values{}
body.Set("mandrill_events", eventBody)
req, _ := http.NewRequest("POST", "/mandrill", strings.NewReader(body.Encode()))
w := httptest.NewRecorder()
md.eventHandler(w, req)
return w
}
func headRequest(md *MandrillWebhook) *httptest.ResponseRecorder {
req, _ := http.NewRequest("HEAD", "/mandrill", strings.NewReader(""))
w := httptest.NewRecorder()
md.returnOK(w, req)
return w
}
func TestHead(t *testing.T) {
md := &MandrillWebhook{Path: "/mandrill"}
resp := headRequest(md)
if resp.Code != http.StatusOK {
t.Errorf("HEAD returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK)
}
}
func TestSendEvent(t *testing.T) {
var acc testutil.Accumulator
md := &MandrillWebhook{Path: "/mandrill", acc: &acc}
resp := postWebhooks(md, "["+SendEventJSON()+"]")
if resp.Code != http.StatusOK {
t.Errorf("POST send returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK)
}
fields := map[string]interface{}{
"id": "id1",
}
tags := map[string]string{
"event": "send",
}
acc.AssertContainsTaggedFields(t, "mandrill_webhooks", fields, tags)
}
func TestMultipleEvents(t *testing.T) {
var acc testutil.Accumulator
md := &MandrillWebhook{Path: "/mandrill", acc: &acc}
resp := postWebhooks(md, "["+SendEventJSON()+","+HardBounceEventJSON()+"]")
if resp.Code != http.StatusOK {
t.Errorf("POST send returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK)
}
fields := map[string]interface{}{
"id": "id1",
}
tags := map[string]string{
"event": "send",
}
acc.AssertContainsTaggedFields(t, "mandrill_webhooks", fields, tags)
fields = map[string]interface{}{
"id": "id2",
}
tags = map[string]string{
"event": "hard_bounce",
}
acc.AssertContainsTaggedFields(t, "mandrill_webhooks", fields, tags)
}

View File

@ -11,6 +11,7 @@ import (
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/webhooks/github" "github.com/influxdata/telegraf/plugins/inputs/webhooks/github"
"github.com/influxdata/telegraf/plugins/inputs/webhooks/mandrill"
"github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar" "github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar"
) )
@ -25,8 +26,9 @@ func init() {
type Webhooks struct { type Webhooks struct {
ServiceAddress string ServiceAddress string
Github *github.GithubWebhook Github *github.GithubWebhook
Rollbar *rollbar.RollbarWebhook Mandrill *mandrill.MandrillWebhook
Rollbar *rollbar.RollbarWebhook
} }
func NewWebhooks() *Webhooks { func NewWebhooks() *Webhooks {
@ -41,6 +43,9 @@ func (wb *Webhooks) SampleConfig() string {
[inputs.webhooks.github] [inputs.webhooks.github]
path = "/github" path = "/github"
[inputs.webhooks.mandrill]
path = "/mandrill"
[inputs.webhooks.rollbar] [inputs.webhooks.rollbar]
path = "/rollbar" path = "/rollbar"
` `

View File

@ -32,7 +32,7 @@ echo mntr | nc localhost 2181
Meta: Meta:
- units: int64 - units: int64
- tags: `server=<hostname> port=<port>` - tags: `server=<hostname> port=<port> state=<leader|follower>`
Measurement names: Measurement names:
- zookeeper_avg_latency - zookeeper_avg_latency
@ -55,8 +55,12 @@ Measurement names:
Meta: Meta:
- units: string - units: string
- tags: `server=<hostname> port=<port>` - tags: `server=<hostname> port=<port> state=<leader|follower>`
Measurement names: Measurement names:
- zookeeper_version - zookeeper_version
- zookeeper_server_state
### Tags:
- All measurements have the following tags:
-

View File

@ -55,6 +55,7 @@ func (z *Zookeeper) Gather(acc telegraf.Accumulator) error {
} }
func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error { func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error {
var zookeeper_state string
_, _, err := net.SplitHostPort(address) _, _, err := net.SplitHostPort(address)
if err != nil { if err != nil {
address = address + ":2181" address = address + ":2181"
@ -78,7 +79,6 @@ func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error
if len(service) != 2 { if len(service) != 2 {
return fmt.Errorf("Invalid service address: %s", address) return fmt.Errorf("Invalid service address: %s", address)
} }
tags := map[string]string{"server": service[0], "port": service[1]}
fields := make(map[string]interface{}) fields := make(map[string]interface{})
for scanner.Scan() { for scanner.Scan() {
@ -92,15 +92,24 @@ func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error
} }
measurement := strings.TrimPrefix(parts[1], "zk_") measurement := strings.TrimPrefix(parts[1], "zk_")
sValue := string(parts[2]) if measurement == "server_state" {
zookeeper_state = parts[2]
iVal, err := strconv.ParseInt(sValue, 10, 64)
if err == nil {
fields[measurement] = iVal
} else { } else {
fields[measurement] = sValue sValue := string(parts[2])
iVal, err := strconv.ParseInt(sValue, 10, 64)
if err == nil {
fields[measurement] = iVal
} else {
fields[measurement] = sValue
}
} }
} }
tags := map[string]string{
"server": service[0],
"port": service[1],
"state": zookeeper_state,
}
acc.AddFields("zookeeper", fields, tags) acc.AddFields("zookeeper", fields, tags)
return nil return nil

View File

@ -5,7 +5,6 @@ import (
"log" "log"
"net/http" "net/http"
"regexp" "regexp"
"strings"
"sync" "sync"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -14,7 +13,7 @@ import (
) )
var ( var (
sanitizedChars = strings.NewReplacer("/", "_", "@", "_", " ", "_", "-", "_", ".", "_") invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
// Prometheus metric names must match this regex // Prometheus metric names must match this regex
// see https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels // see https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
@ -111,12 +110,12 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
for _, point := range metrics { for _, point := range metrics {
key := point.Name() key := point.Name()
key = sanitizedChars.Replace(key) key = invalidNameCharRE.ReplaceAllString(key, "_")
var labels []string var labels []string
l := prometheus.Labels{} l := prometheus.Labels{}
for k, v := range point.Tags() { for k, v := range point.Tags() {
k = sanitizedChars.Replace(k) k = invalidNameCharRE.ReplaceAllString(k, "_")
if len(k) == 0 { if len(k) == 0 {
continue continue
} }
@ -137,7 +136,7 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
} }
// sanitize the measurement name // sanitize the measurement name
n = sanitizedChars.Replace(n) n = invalidNameCharRE.ReplaceAllString(n, "_")
var mname string var mname string
if n == "value" { if n == "value" {
mname = key mname = key