From 300d9adbd027ff87f5120e0e917d9787f83081d5 Mon Sep 17 00:00:00 2001 From: tuier Date: Sat, 16 Jul 2016 19:19:21 +0100 Subject: [PATCH 01/13] Considere zookeeper's state as a tags (#1417) This change will send the state of zookeeper (leader|follower) as a tag and not a metrics That way it will be easier to search for filter per state --- plugins/inputs/zookeeper/README.md | 10 +++++++--- plugins/inputs/zookeeper/zookeeper.go | 23 ++++++++++++++++------- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/plugins/inputs/zookeeper/README.md b/plugins/inputs/zookeeper/README.md index fe7a8a4ad..bc7c17a4b 100644 --- a/plugins/inputs/zookeeper/README.md +++ b/plugins/inputs/zookeeper/README.md @@ -32,7 +32,7 @@ echo mntr | nc localhost 2181 Meta: - units: int64 -- tags: `server= port=` +- tags: `server= port= state=` Measurement names: - zookeeper_avg_latency @@ -55,8 +55,12 @@ Measurement names: Meta: - units: string -- tags: `server= port=` +- tags: `server= port= state=` Measurement names: - zookeeper_version -- zookeeper_server_state \ No newline at end of file + +### Tags: + +- All measurements have the following tags: + - diff --git a/plugins/inputs/zookeeper/zookeeper.go b/plugins/inputs/zookeeper/zookeeper.go index 54defc56f..c11b55f68 100644 --- a/plugins/inputs/zookeeper/zookeeper.go +++ b/plugins/inputs/zookeeper/zookeeper.go @@ -55,6 +55,7 @@ func (z *Zookeeper) Gather(acc telegraf.Accumulator) error { } func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error { + var zookeeper_state string _, _, err := net.SplitHostPort(address) if err != nil { address = address + ":2181" @@ -78,7 +79,6 @@ func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error if len(service) != 2 { return fmt.Errorf("Invalid service address: %s", address) } - tags := map[string]string{"server": service[0], "port": service[1]} fields := make(map[string]interface{}) for scanner.Scan() { @@ -92,15 +92,24 @@ func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error } measurement := strings.TrimPrefix(parts[1], "zk_") - sValue := string(parts[2]) - - iVal, err := strconv.ParseInt(sValue, 10, 64) - if err == nil { - fields[measurement] = iVal + if measurement == "server_state" { + zookeeper_state = parts[2] } else { - fields[measurement] = sValue + sValue := string(parts[2]) + + iVal, err := strconv.ParseInt(sValue, 10, 64) + if err == nil { + fields[measurement] = iVal + } else { + fields[measurement] = sValue + } } } + tags := map[string]string{ + "server": service[0], + "port": service[1], + "state": zookeeper_state, + } acc.AddFields("zookeeper", fields, tags) return nil From 704d9ad76c898c9f14c8ed7e33de416c8e4f1259 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 14 Jul 2016 23:12:32 -0600 Subject: [PATCH 02/13] Refactor aerospike plugin to use client lib --- CHANGELOG.md | 7 + Godeps | 2 + plugins/inputs/aerospike/aerospike.go | 357 +++++---------------- plugins/inputs/aerospike/aerospike_test.go | 97 ++---- 4 files changed, 104 insertions(+), 359 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eda9f2f63..d01567eba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ ### Release Notes +**Breaking Change**: Aerospike main server node measurements have been renamed +aerospike_node. Aerospike namespace measurements have been renamed to +aerospike_namespace. They will also now be tagged with the node_name +that they correspond to. This has been done to differentiate measurements +that pertain to node vs. namespace statistics. + **Breaking Change**: users of github_webhooks must change to the new `[[inputs.webhooks]]` plugin. @@ -35,6 +41,7 @@ should now look like: - [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD. - [#1387](https://github.com/influxdata/telegraf/pull/1387): **Breaking Change** - Redis `role` tag renamed to `replication_role` to avoid global_tags override - [#1437](https://github.com/influxdata/telegraf/pull/1437): Fetching Galera status metrics in MySQL +- [#1500](https://github.com/influxdata/telegraf/pull/1500): Aerospike plugin refactored to use official client lib. ### Bugfixes diff --git a/Godeps b/Godeps index f47a57806..1546bb627 100644 --- a/Godeps +++ b/Godeps @@ -1,5 +1,6 @@ github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9 github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc +github.com/aerospike/aerospike-client-go 45863b7fd8640dc12f7fdd397104d97e1986f25a github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687 github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857 github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4 @@ -50,6 +51,7 @@ github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2 github.com/wvanbergen/kafka 46f9a1cf3f670edec492029fadded9c2d9e18866 github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8 +github.com/yuin/gopher-lua bf3808abd44b1e55143a2d7f08571aaa80db1808 github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363 golang.org/x/crypto 5dc8cb4b8a8eb076cbb5a06bc3b8682c15bdbbd3 golang.org/x/net 6acef71eb69611914f7a30939ea9f6e194c78172 diff --git a/plugins/inputs/aerospike/aerospike.go b/plugins/inputs/aerospike/aerospike.go index cd2ebe25c..4bb652c0a 100644 --- a/plugins/inputs/aerospike/aerospike.go +++ b/plugins/inputs/aerospike/aerospike.go @@ -1,104 +1,19 @@ package aerospike import ( - "bytes" - "encoding/binary" - "fmt" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/inputs" "net" "strconv" "strings" "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" + "github.com/influxdata/telegraf/plugins/inputs" + + as "github.com/aerospike/aerospike-client-go" ) -const ( - MSG_HEADER_SIZE = 8 - MSG_TYPE = 1 // Info is 1 - MSG_VERSION = 2 -) - -var ( - STATISTICS_COMMAND = []byte("statistics\n") - NAMESPACES_COMMAND = []byte("namespaces\n") -) - -type aerospikeMessageHeader struct { - Version uint8 - Type uint8 - DataLen [6]byte -} - -type aerospikeMessage struct { - aerospikeMessageHeader - Data []byte -} - -// Taken from aerospike-client-go/types/message.go -func (msg *aerospikeMessage) Serialize() []byte { - msg.DataLen = msgLenToBytes(int64(len(msg.Data))) - buf := bytes.NewBuffer([]byte{}) - binary.Write(buf, binary.BigEndian, msg.aerospikeMessageHeader) - binary.Write(buf, binary.BigEndian, msg.Data[:]) - return buf.Bytes() -} - -type aerospikeInfoCommand struct { - msg *aerospikeMessage -} - -// Taken from aerospike-client-go/info.go -func (nfo *aerospikeInfoCommand) parseMultiResponse() (map[string]string, error) { - responses := make(map[string]string) - offset := int64(0) - begin := int64(0) - - dataLen := int64(len(nfo.msg.Data)) - - // Create reusable StringBuilder for performance. - for offset < dataLen { - b := nfo.msg.Data[offset] - - if b == '\t' { - name := nfo.msg.Data[begin:offset] - offset++ - begin = offset - - // Parse field value. - for offset < dataLen { - if nfo.msg.Data[offset] == '\n' { - break - } - offset++ - } - - if offset > begin { - value := nfo.msg.Data[begin:offset] - responses[string(name)] = string(value) - } else { - responses[string(name)] = "" - } - offset++ - begin = offset - } else if b == '\n' { - if offset > begin { - name := nfo.msg.Data[begin:offset] - responses[string(name)] = "" - } - offset++ - begin = offset - } else { - offset++ - } - } - - if offset > begin { - name := nfo.msg.Data[begin:offset] - responses[string(name)] = "" - } - return responses, nil -} - type Aerospike struct { Servers []string } @@ -115,7 +30,7 @@ func (a *Aerospike) SampleConfig() string { } func (a *Aerospike) Description() string { - return "Read stats from an aerospike server" + return "Read stats from aerospike server(s)" } func (a *Aerospike) Gather(acc telegraf.Accumulator) error { @@ -124,214 +39,90 @@ func (a *Aerospike) Gather(acc telegraf.Accumulator) error { } var wg sync.WaitGroup - - var outerr error - + errChan := errchan.New(len(a.Servers)) + wg.Add(len(a.Servers)) for _, server := range a.Servers { - wg.Add(1) - go func(server string) { + go func(serv string) { defer wg.Done() - outerr = a.gatherServer(server, acc) + errChan.C <- a.gatherServer(serv, acc) }(server) } wg.Wait() - return outerr + return errChan.Error() } -func (a *Aerospike) gatherServer(host string, acc telegraf.Accumulator) error { - aerospikeInfo, err := getMap(STATISTICS_COMMAND, host) +func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) error { + host, port, err := net.SplitHostPort(hostport) if err != nil { - return fmt.Errorf("Aerospike info failed: %s", err) + return err } - readAerospikeStats(aerospikeInfo, acc, host, "") - namespaces, err := getList(NAMESPACES_COMMAND, host) + + iport, err := strconv.Atoi(port) if err != nil { - return fmt.Errorf("Aerospike namespace list failed: %s", err) + iport = 3000 } - for ix := range namespaces { - nsInfo, err := getMap([]byte("namespace/"+namespaces[ix]+"\n"), host) - if err != nil { - return fmt.Errorf("Aerospike namespace '%s' query failed: %s", namespaces[ix], err) + + c, err := as.NewClient(host, iport) + if err != nil { + return err + } + defer c.Close() + + nodes := c.GetNodes() + for _, n := range nodes { + tags := map[string]string{ + "node_name": n.GetName(), + "aerospike_host": hostport, + } + fields := make(map[string]interface{}) + stats, err := as.RequestNodeStats(n) + if err != nil { + return err + } + for k, v := range stats { + if iv, err := strconv.ParseInt(v, 10, 64); err == nil { + fields[strings.Replace(k, "-", "_", -1)] = iv + } + } + acc.AddFields("aerospike_node", fields, tags, time.Now()) + + info, err := as.RequestNodeInfo(n, "namespaces") + if err != nil { + return err + } + namespaces := strings.Split(info["namespaces"], ";") + + for _, namespace := range namespaces { + nTags := copyTags(tags) + nTags["namespace"] = namespace + nFields := make(map[string]interface{}) + info, err := as.RequestNodeInfo(n, "namespace/"+namespace) + if err != nil { + continue + } + stats := strings.Split(info["namespace/"+namespace], ";") + for _, stat := range stats { + parts := strings.Split(stat, "=") + if len(parts) < 2 { + continue + } + if iv, err := strconv.ParseInt(parts[1], 10, 64); err == nil { + nFields[strings.Replace(parts[0], "-", "_", -1)] = iv + } + } + acc.AddFields("aerospike_namespace", nFields, nTags, time.Now()) } - readAerospikeStats(nsInfo, acc, host, namespaces[ix]) } return nil } -func getMap(key []byte, host string) (map[string]string, error) { - data, err := get(key, host) - if err != nil { - return nil, fmt.Errorf("Failed to get data: %s", err) +func copyTags(m map[string]string) map[string]string { + out := make(map[string]string) + for k, v := range m { + out[k] = v } - parsed, err := unmarshalMapInfo(data, string(key)) - if err != nil { - return nil, fmt.Errorf("Failed to unmarshal data: %s", err) - } - - return parsed, nil -} - -func getList(key []byte, host string) ([]string, error) { - data, err := get(key, host) - if err != nil { - return nil, fmt.Errorf("Failed to get data: %s", err) - } - parsed, err := unmarshalListInfo(data, string(key)) - if err != nil { - return nil, fmt.Errorf("Failed to unmarshal data: %s", err) - } - - return parsed, nil -} - -func get(key []byte, host string) (map[string]string, error) { - var err error - var data map[string]string - - asInfo := &aerospikeInfoCommand{ - msg: &aerospikeMessage{ - aerospikeMessageHeader: aerospikeMessageHeader{ - Version: uint8(MSG_VERSION), - Type: uint8(MSG_TYPE), - DataLen: msgLenToBytes(int64(len(key))), - }, - Data: key, - }, - } - - cmd := asInfo.msg.Serialize() - addr, err := net.ResolveTCPAddr("tcp", host) - if err != nil { - return data, fmt.Errorf("Lookup failed for '%s': %s", host, err) - } - - conn, err := net.DialTCP("tcp", nil, addr) - if err != nil { - return data, fmt.Errorf("Connection failed for '%s': %s", host, err) - } - defer conn.Close() - - _, err = conn.Write(cmd) - if err != nil { - return data, fmt.Errorf("Failed to send to '%s': %s", host, err) - } - - msgHeader := bytes.NewBuffer(make([]byte, MSG_HEADER_SIZE)) - _, err = readLenFromConn(conn, msgHeader.Bytes(), MSG_HEADER_SIZE) - if err != nil { - return data, fmt.Errorf("Failed to read header: %s", err) - } - err = binary.Read(msgHeader, binary.BigEndian, &asInfo.msg.aerospikeMessageHeader) - if err != nil { - return data, fmt.Errorf("Failed to unmarshal header: %s", err) - } - - msgLen := msgLenFromBytes(asInfo.msg.aerospikeMessageHeader.DataLen) - - if int64(len(asInfo.msg.Data)) != msgLen { - asInfo.msg.Data = make([]byte, msgLen) - } - - _, err = readLenFromConn(conn, asInfo.msg.Data, len(asInfo.msg.Data)) - if err != nil { - return data, fmt.Errorf("Failed to read from connection to '%s': %s", host, err) - } - - data, err = asInfo.parseMultiResponse() - if err != nil { - return data, fmt.Errorf("Failed to parse response from '%s': %s", host, err) - } - - return data, err -} - -func readAerospikeStats( - stats map[string]string, - acc telegraf.Accumulator, - host string, - namespace string, -) { - fields := make(map[string]interface{}) - tags := map[string]string{ - "aerospike_host": host, - "namespace": "_service", - } - - if namespace != "" { - tags["namespace"] = namespace - } - for key, value := range stats { - // We are going to ignore all string based keys - val, err := strconv.ParseInt(value, 10, 64) - if err == nil { - if strings.Contains(key, "-") { - key = strings.Replace(key, "-", "_", -1) - } - fields[key] = val - } - } - acc.AddFields("aerospike", fields, tags) -} - -func unmarshalMapInfo(infoMap map[string]string, key string) (map[string]string, error) { - key = strings.TrimSuffix(key, "\n") - res := map[string]string{} - - v, exists := infoMap[key] - if !exists { - return res, fmt.Errorf("Key '%s' missing from info", key) - } - - values := strings.Split(v, ";") - for i := range values { - kv := strings.Split(values[i], "=") - if len(kv) > 1 { - res[kv[0]] = kv[1] - } - } - - return res, nil -} - -func unmarshalListInfo(infoMap map[string]string, key string) ([]string, error) { - key = strings.TrimSuffix(key, "\n") - - v, exists := infoMap[key] - if !exists { - return []string{}, fmt.Errorf("Key '%s' missing from info", key) - } - - values := strings.Split(v, ";") - return values, nil -} - -func readLenFromConn(c net.Conn, buffer []byte, length int) (total int, err error) { - var r int - for total < length { - r, err = c.Read(buffer[total:length]) - total += r - if err != nil { - break - } - } - return -} - -// Taken from aerospike-client-go/types/message.go -func msgLenToBytes(DataLen int64) [6]byte { - b := make([]byte, 8) - binary.BigEndian.PutUint64(b, uint64(DataLen)) - res := [6]byte{} - copy(res[:], b[2:]) - return res -} - -// Taken from aerospike-client-go/types/message.go -func msgLenFromBytes(buf [6]byte) int64 { - nbytes := append([]byte{0, 0}, buf[:]...) - DataLen := binary.BigEndian.Uint64(nbytes) - return int64(DataLen) + return out } func init() { diff --git a/plugins/inputs/aerospike/aerospike_test.go b/plugins/inputs/aerospike/aerospike_test.go index 2717a15b9..8463432f5 100644 --- a/plugins/inputs/aerospike/aerospike_test.go +++ b/plugins/inputs/aerospike/aerospike_test.go @@ -1,7 +1,6 @@ package aerospike import ( - "reflect" "testing" "github.com/influxdata/telegraf/testutil" @@ -22,84 +21,30 @@ func TestAerospikeStatistics(t *testing.T) { err := a.Gather(&acc) require.NoError(t, err) + + assert.True(t, acc.HasMeasurement("aerospike_node")) + assert.True(t, acc.HasMeasurement("aerospike_namespace")) + assert.True(t, acc.HasIntField("aerospike_node", "batch_error")) } -func TestAerospikeMsgLenFromToBytes(t *testing.T) { - var i int64 = 8 - assert.True(t, i == msgLenFromBytes(msgLenToBytes(i))) -} +func TestAerospikeStatisticsPartialErr(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + a := &Aerospike{ + Servers: []string{ + testutil.GetLocalHost() + ":3000", + testutil.GetLocalHost() + ":9999", + }, + } -func TestReadAerospikeStatsNoNamespace(t *testing.T) { - // Also test for re-writing var acc testutil.Accumulator - stats := map[string]string{ - "stat-write-errs": "12345", - "stat_read_reqs": "12345", - } - readAerospikeStats(stats, &acc, "host1", "") - fields := map[string]interface{}{ - "stat_write_errs": int64(12345), - "stat_read_reqs": int64(12345), - } - tags := map[string]string{ - "aerospike_host": "host1", - "namespace": "_service", - } - acc.AssertContainsTaggedFields(t, "aerospike", fields, tags) -} - -func TestReadAerospikeStatsNamespace(t *testing.T) { - var acc testutil.Accumulator - stats := map[string]string{ - "stat_write_errs": "12345", - "stat_read_reqs": "12345", - } - readAerospikeStats(stats, &acc, "host1", "test") - - fields := map[string]interface{}{ - "stat_write_errs": int64(12345), - "stat_read_reqs": int64(12345), - } - tags := map[string]string{ - "aerospike_host": "host1", - "namespace": "test", - } - acc.AssertContainsTaggedFields(t, "aerospike", fields, tags) -} - -func TestAerospikeUnmarshalList(t *testing.T) { - i := map[string]string{ - "test": "one;two;three", - } - - expected := []string{"one", "two", "three"} - - list, err := unmarshalListInfo(i, "test2") - assert.True(t, err != nil) - - list, err = unmarshalListInfo(i, "test") - assert.True(t, err == nil) - equal := true - for ix := range expected { - if list[ix] != expected[ix] { - equal = false - break - } - } - assert.True(t, equal) -} - -func TestAerospikeUnmarshalMap(t *testing.T) { - i := map[string]string{ - "test": "key1=value1;key2=value2", - } - - expected := map[string]string{ - "key1": "value1", - "key2": "value2", - } - m, err := unmarshalMapInfo(i, "test") - assert.True(t, err == nil) - assert.True(t, reflect.DeepEqual(m, expected)) + err := a.Gather(&acc) + require.Error(t, err) + + assert.True(t, acc.HasMeasurement("aerospike_node")) + assert.True(t, acc.HasMeasurement("aerospike_namespace")) + assert.True(t, acc.HasIntField("aerospike_node", "batch_error")) } From 6afe9ceef1222c1d9dae0262865662bcf57d3f79 Mon Sep 17 00:00:00 2001 From: ashish Date: Mon, 18 Jul 2016 12:06:41 +0530 Subject: [PATCH 03/13] cassandra plugin lower version support added closes #1427 closes #1508 --- CHANGELOG.md | 1 + plugins/inputs/cassandra/cassandra.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d01567eba..e5388cb84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ should now look like: - [#1432](https://github.com/influxdata/telegraf/issues/1432): Panic fix for multiple graphite outputs under very high load. - [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior - [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues. +- [#1427](https://github.com/influxdata/telegraf/issues/1427): Cassandra input: version 2.x "column family" fix. ## v1.0 beta 2 [2016-06-21] diff --git a/plugins/inputs/cassandra/cassandra.go b/plugins/inputs/cassandra/cassandra.go index 351232aca..e7edf7153 100644 --- a/plugins/inputs/cassandra/cassandra.go +++ b/plugins/inputs/cassandra/cassandra.go @@ -148,7 +148,7 @@ func (c cassandraMetric) addTagsFields(out map[string]interface{}) { tokens := parseJmxMetricRequest(r.(map[string]interface{})["mbean"].(string)) // Requests with wildcards for keyspace or table names will return nested // maps in the json response - if tokens["type"] == "Table" && (tokens["keyspace"] == "*" || + if (tokens["type"] == "Table" || tokens["type"] == "ColumnFamily") && (tokens["keyspace"] == "*" || tokens["scope"] == "*") { if valuesMap, ok := out["value"]; ok { for k, v := range valuesMap.(map[string]interface{}) { From b4a6d9c6475e8bca374f072d9e7f8dd9cc25f702 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 18 Jul 2016 11:45:25 +0100 Subject: [PATCH 04/13] Change prometheus replacer to reverse regex replacer closes #1474 --- plugins/outputs/prometheus_client/prometheus_client.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/plugins/outputs/prometheus_client/prometheus_client.go b/plugins/outputs/prometheus_client/prometheus_client.go index 790784a2b..4f7ce8053 100644 --- a/plugins/outputs/prometheus_client/prometheus_client.go +++ b/plugins/outputs/prometheus_client/prometheus_client.go @@ -5,7 +5,6 @@ import ( "log" "net/http" "regexp" - "strings" "sync" "github.com/influxdata/telegraf" @@ -14,7 +13,7 @@ import ( ) var ( - sanitizedChars = strings.NewReplacer("/", "_", "@", "_", " ", "_", "-", "_", ".", "_") + invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) // Prometheus metric names must match this regex // see https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels @@ -111,12 +110,12 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { for _, point := range metrics { key := point.Name() - key = sanitizedChars.Replace(key) + key = invalidNameCharRE.ReplaceAllString(key, "_") var labels []string l := prometheus.Labels{} for k, v := range point.Tags() { - k = sanitizedChars.Replace(k) + k = invalidNameCharRE.ReplaceAllString(k, "_") if len(k) == 0 { continue } @@ -137,7 +136,7 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { } // sanitize the measurement name - n = sanitizedChars.Replace(n) + n = invalidNameCharRE.ReplaceAllString(n, "_") var mname string if n == "value" { mname = key From 2d6c8767f775cc612facc1fe82d53719a66b4b22 Mon Sep 17 00:00:00 2001 From: Mark McKinstry Date: Mon, 18 Jul 2016 07:03:39 -0400 Subject: [PATCH 05/13] add ability to read redis from a socket (#1480) * add ability to read redis from a socket * update CHANGELOG --- CHANGELOG.md | 1 + plugins/inputs/redis/redis.go | 48 +++++++++++++++++++++++++---------- 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e5388cb84..6128a698b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ should now look like: - [#1402](https://github.com/influxdata/telegraf/pull/1402): docker-machine/boot2docker no longer required for unit tests. - [#1350](https://github.com/influxdata/telegraf/pull/1350): cgroup input plugin. - [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD. +- [#1369](https://github.com/influxdata/telegraf/pull/1480): add ability to read redis from a socket. - [#1387](https://github.com/influxdata/telegraf/pull/1387): **Breaking Change** - Redis `role` tag renamed to `replication_role` to avoid global_tags override - [#1437](https://github.com/influxdata/telegraf/pull/1437): Fetching Galera status metrics in MySQL - [#1500](https://github.com/influxdata/telegraf/pull/1500): Aerospike plugin refactored to use official client lib. diff --git a/plugins/inputs/redis/redis.go b/plugins/inputs/redis/redis.go index 76cbc89cb..fc50387df 100644 --- a/plugins/inputs/redis/redis.go +++ b/plugins/inputs/redis/redis.go @@ -25,6 +25,7 @@ var sampleConfig = ` ## e.g. ## tcp://localhost:6379 ## tcp://:password@192.168.99.100 + ## unix:///var/run/redis.sock ## ## If no servers are specified, then localhost is used as the host. ## If no port is specified, 6379 is used @@ -80,12 +81,15 @@ var Tracking = map[string]string{ var ErrProtocolError = errors.New("redis protocol error") +const defaultPort = "6379" + // Reads stats from all configured servers accumulates stats. // Returns one of the errors encountered while gather stats (if any). func (r *Redis) Gather(acc telegraf.Accumulator) error { if len(r.Servers) == 0 { url := &url.URL{ - Host: ":6379", + Scheme: "tcp", + Host: ":6379", } r.gatherServer(url, acc) return nil @@ -96,6 +100,10 @@ func (r *Redis) Gather(acc telegraf.Accumulator) error { var outerr error for _, serv := range r.Servers { + if !strings.HasPrefix(serv, "tcp://") || !strings.HasPrefix(serv, "unix://") { + serv = "tcp://" + serv + } + u, err := url.Parse(serv) if err != nil { return fmt.Errorf("Unable to parse to address '%s': %s", serv, err) @@ -105,6 +113,13 @@ func (r *Redis) Gather(acc telegraf.Accumulator) error { u.Host = serv u.Path = "" } + if u.Scheme == "tcp" { + _, _, err := net.SplitHostPort(u.Host) + if err != nil { + u.Host = u.Host + ":" + defaultPort + } + } + wg.Add(1) go func(serv string) { defer wg.Done() @@ -117,17 +132,17 @@ func (r *Redis) Gather(acc telegraf.Accumulator) error { return outerr } -const defaultPort = "6379" - func (r *Redis) gatherServer(addr *url.URL, acc telegraf.Accumulator) error { - _, _, err := net.SplitHostPort(addr.Host) - if err != nil { - addr.Host = addr.Host + ":" + defaultPort - } + var address string - c, err := net.DialTimeout("tcp", addr.Host, defaultTimeout) + if addr.Scheme == "unix" { + address = addr.Path + } else { + address = addr.Host + } + c, err := net.DialTimeout(addr.Scheme, address, defaultTimeout) if err != nil { - return fmt.Errorf("Unable to connect to redis server '%s': %s", addr.Host, err) + return fmt.Errorf("Unable to connect to redis server '%s': %s", address, err) } defer c.Close() @@ -155,12 +170,17 @@ func (r *Redis) gatherServer(addr *url.URL, acc telegraf.Accumulator) error { c.Write([]byte("EOF\r\n")) rdr := bufio.NewReader(c) - // Setup tags for all redis metrics - host, port := "unknown", "unknown" - // If there's an error, ignore and use 'unknown' tags - host, port, _ = net.SplitHostPort(addr.Host) - tags := map[string]string{"server": host, "port": port} + var tags map[string]string + if addr.Scheme == "unix" { + tags = map[string]string{"socket": addr.Path} + } else { + // Setup tags for all redis metrics + host, port := "unknown", "unknown" + // If there's an error, ignore and use 'unknown' tags + host, port, _ = net.SplitHostPort(addr.Host) + tags = map[string]string{"server": host, "port": port} + } return gatherInfoOutput(rdr, acc, tags) } From 1d9745ee98806fda6c20910d572ae15b35a7f036 Mon Sep 17 00:00:00 2001 From: Tim Allen Date: Mon, 11 Jul 2016 08:58:00 -0500 Subject: [PATCH 06/13] Move exec WaitGroup from Exec instance level to Gather. If Gather is run concurently the shared WaitGroup variable never finishes. closes #1463 closes #1464 --- CHANGELOG.md | 1 + plugins/inputs/exec/exec.go | 13 ++++++------- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6128a698b..0e8dd69cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,7 @@ should now look like: - [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior - [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues. - [#1427](https://github.com/influxdata/telegraf/issues/1427): Cassandra input: version 2.x "column family" fix. +- [#1463](https://github.com/influxdata/telegraf/issues/1463): Shared WaitGroup in Exec plugin ## v1.0 beta 2 [2016-06-21] diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index c8d4cee50..060a4f308 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -48,8 +48,6 @@ type Exec struct { parser parsers.Parser - wg sync.WaitGroup - runner Runner errChan chan error } @@ -119,8 +117,8 @@ func (c CommandRunner) Run( return out.Bytes(), nil } -func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) { - defer e.wg.Done() +func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync.WaitGroup) { + defer wg.Done() out, err := e.runner.Run(e, command, acc) if err != nil { @@ -151,6 +149,7 @@ func (e *Exec) SetParser(parser parsers.Parser) { } func (e *Exec) Gather(acc telegraf.Accumulator) error { + var wg sync.WaitGroup // Legacy single command support if e.Command != "" { e.Commands = append(e.Commands, e.Command) @@ -190,11 +189,11 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error { errChan := errchan.New(len(commands)) e.errChan = errChan.C - e.wg.Add(len(commands)) + wg.Add(len(commands)) for _, command := range commands { - go e.ProcessCommand(command, acc) + go e.ProcessCommand(command, acc, &wg) } - e.wg.Wait() + wg.Wait() return errChan.Error() } From 8c7edeb53bfdf07f51d7d809399c22aee9905679 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Fri, 1 Jul 2016 08:49:48 -0600 Subject: [PATCH 07/13] allow measurement to be defined for logparser_grok plugin --- plugins/inputs/logparser/grok/grok.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/logparser/grok/grok.go b/plugins/inputs/logparser/grok/grok.go index a463c0f6a..54ecb464b 100644 --- a/plugins/inputs/logparser/grok/grok.go +++ b/plugins/inputs/logparser/grok/grok.go @@ -56,6 +56,7 @@ type Parser struct { Patterns []string CustomPatterns string CustomPatternFiles []string + Measurement string // typeMap is a map of patterns -> capture name -> modifier, // ie, { @@ -114,6 +115,10 @@ func (p *Parser) Compile() error { p.addCustomPatterns(scanner) } + if p.Measurement == "" { + p.Measurement = "logparser_grok" + } + return p.compileCustomPatterns() } @@ -215,7 +220,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { } } - return telegraf.NewMetric("logparser_grok", tags, fields, p.tsModder.tsMod(timestamp)) + return telegraf.NewMetric(p.Measurement, tags, fields, p.tsModder.tsMod(timestamp)) } func (p *Parser) addCustomPatterns(scanner *bufio.Scanner) { From 5dc4cce15712d7000e30506e3100d8771a631e82 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 18 Jul 2016 12:27:46 +0100 Subject: [PATCH 08/13] Fixup adding 'measurement' to logparser grok closes #1434 --- CHANGELOG.md | 1 + plugins/inputs/logparser/grok/grok.go | 2 +- plugins/inputs/logparser/grok/grok_test.go | 26 ++++++++++++++++++++++ plugins/inputs/logparser/logparser.go | 2 ++ 4 files changed, 30 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e8dd69cf..2be040bf3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ should now look like: - [#1387](https://github.com/influxdata/telegraf/pull/1387): **Breaking Change** - Redis `role` tag renamed to `replication_role` to avoid global_tags override - [#1437](https://github.com/influxdata/telegraf/pull/1437): Fetching Galera status metrics in MySQL - [#1500](https://github.com/influxdata/telegraf/pull/1500): Aerospike plugin refactored to use official client lib. +- [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin. ### Bugfixes diff --git a/plugins/inputs/logparser/grok/grok.go b/plugins/inputs/logparser/grok/grok.go index 54ecb464b..16e62b223 100644 --- a/plugins/inputs/logparser/grok/grok.go +++ b/plugins/inputs/logparser/grok/grok.go @@ -56,7 +56,7 @@ type Parser struct { Patterns []string CustomPatterns string CustomPatternFiles []string - Measurement string + Measurement string // typeMap is a map of patterns -> capture name -> modifier, // ie, { diff --git a/plugins/inputs/logparser/grok/grok_test.go b/plugins/inputs/logparser/grok/grok_test.go index 02f69f67a..979553f88 100644 --- a/plugins/inputs/logparser/grok/grok_test.go +++ b/plugins/inputs/logparser/grok/grok_test.go @@ -83,6 +83,32 @@ func Benchmark_ParseLine_CustomPattern(b *testing.B) { benchM = m } +func TestMeasurementName(t *testing.T) { + p := &Parser{ + Measurement: "my_web_log", + Patterns: []string{"%{COMMON_LOG_FORMAT}"}, + } + assert.NoError(t, p.Compile()) + + // Parse an influxdb POST request + m, err := p.ParseLine(`127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`) + require.NotNil(t, m) + assert.NoError(t, err) + assert.Equal(t, + map[string]interface{}{ + "resp_bytes": int64(2326), + "auth": "frank", + "client_ip": "127.0.0.1", + "resp_code": int64(200), + "http_version": float64(1.0), + "ident": "user-identifier", + "request": "/apache_pb.gif", + }, + m.Fields()) + assert.Equal(t, map[string]string{"verb": "GET"}, m.Tags()) + assert.Equal(t, "my_web_log", m.Name()) +} + func TestBuiltinInfluxdbHttpd(t *testing.T) { p := &Parser{ Patterns: []string{"%{INFLUXDB_HTTPD_LOG}"}, diff --git a/plugins/inputs/logparser/logparser.go b/plugins/inputs/logparser/logparser.go index 82003582f..4737ace65 100644 --- a/plugins/inputs/logparser/logparser.go +++ b/plugins/inputs/logparser/logparser.go @@ -58,6 +58,8 @@ const sampleConfig = ` ## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs) ## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent) patterns = ["%{INFLUXDB_HTTPD_LOG}"] + ## Name of the outputted measurement name. + measurement = "influxdb_log" ## Full path(s) to custom pattern files. custom_pattern_files = [] ## Custom patterns can also be defined here. Put one pattern per line. From 1c2965703dbc2f989ce4a0974d4769009b966048 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20de=20Metz?= Date: Mon, 18 Jul 2016 13:41:13 +0200 Subject: [PATCH 09/13] Webhooks plugin: add mandrill (#1408) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add mandrill webhook. * Store the id of the msg as part of event. Signed-off-by: Cyril Duez Signed-off-by: François de Metz * Decode body to get the mandrill_events. Signed-off-by: Cyril Duez Signed-off-by: François de Metz * Handle HEAD request. Signed-off-by: Cyril Duez Signed-off-by: François de Metz * Add the README. Signed-off-by: Cyril Duez Signed-off-by: François de Metz * Add mandrill_webhooks to the README. Signed-off-by: Cyril Duez Signed-off-by: François de Metz * Update changelog. Signed-off-by: Cyril Duez Signed-off-by: François de Metz * Run gofmt. Signed-off-by: Cyril Duez Signed-off-by: François de Metz --- CHANGELOG.md | 1 + README.md | 1 + plugins/inputs/webhooks/README.md | 1 + plugins/inputs/webhooks/mandrill/README.md | 15 ++++ .../webhooks/mandrill/mandrill_webhooks.go | 56 ++++++++++++ .../mandrill/mandrill_webhooks_events.go | 24 ++++++ .../mandrill_webhooks_events_json_test.go | 58 +++++++++++++ .../mandrill/mandrill_webhooks_test.go | 85 +++++++++++++++++++ plugins/inputs/webhooks/webhooks.go | 9 +- 9 files changed, 248 insertions(+), 2 deletions(-) create mode 100644 plugins/inputs/webhooks/mandrill/README.md create mode 100644 plugins/inputs/webhooks/mandrill/mandrill_webhooks.go create mode 100644 plugins/inputs/webhooks/mandrill/mandrill_webhooks_events.go create mode 100644 plugins/inputs/webhooks/mandrill/mandrill_webhooks_events_json_test.go create mode 100644 plugins/inputs/webhooks/mandrill/mandrill_webhooks_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 2be040bf3..46239894f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ should now look like: - [#1289](https://github.com/influxdata/telegraf/pull/1289): webhooks input plugin. Thanks @francois2metz and @cduez! - [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar webhook plugin. +- [#1408](https://github.com/influxdata/telegraf/pull/1408): mandrill webhook plugin. - [#1402](https://github.com/influxdata/telegraf/pull/1402): docker-machine/boot2docker no longer required for unit tests. - [#1350](https://github.com/influxdata/telegraf/pull/1350): cgroup input plugin. - [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD. diff --git a/README.md b/README.md index 8264be7f6..738f9eaea 100644 --- a/README.md +++ b/README.md @@ -219,6 +219,7 @@ Telegraf can also collect metrics via the following service plugins: * [nats_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nats_consumer) * [webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks) * [github](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/github) + * [mandrill](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/mandrill) * [rollbar](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/rollbar) * [nsq_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nsq_consumer) diff --git a/plugins/inputs/webhooks/README.md b/plugins/inputs/webhooks/README.md index 5a42f6ea7..86e6685b8 100644 --- a/plugins/inputs/webhooks/README.md +++ b/plugins/inputs/webhooks/README.md @@ -16,6 +16,7 @@ $ sudo service telegraf start ## Available webhooks - [Github](github/) +- [Mandrill](mandrill/) - [Rollbar](rollbar/) ## Adding new webhooks plugin diff --git a/plugins/inputs/webhooks/mandrill/README.md b/plugins/inputs/webhooks/mandrill/README.md new file mode 100644 index 000000000..2fb4914e1 --- /dev/null +++ b/plugins/inputs/webhooks/mandrill/README.md @@ -0,0 +1,15 @@ +# mandrill webhook + +You should configure your Mandrill's Webhooks to point at the `webhooks` service. To do this go to `mandrillapp.com/` and click `Settings > Webhooks`. In the resulting page, click on `Add a Webhook`, select all events, and set the `URL` to `http://:1619/mandrill`, and click on `Create Webhook`. + +## Events + +See the [webhook doc](https://mandrill.zendesk.com/hc/en-us/articles/205583307-Message-Event-Webhook-format). + +All events for logs the original timestamp, the event name and the unique identifier of the message that generated the event. + +**Tags:** +* 'event' = `event.event` string + +**Fields:** +* 'id' = `event._id` string diff --git a/plugins/inputs/webhooks/mandrill/mandrill_webhooks.go b/plugins/inputs/webhooks/mandrill/mandrill_webhooks.go new file mode 100644 index 000000000..e9d4a6de4 --- /dev/null +++ b/plugins/inputs/webhooks/mandrill/mandrill_webhooks.go @@ -0,0 +1,56 @@ +package mandrill + +import ( + "encoding/json" + "io/ioutil" + "log" + "net/http" + "net/url" + "time" + + "github.com/gorilla/mux" + "github.com/influxdata/telegraf" +) + +type MandrillWebhook struct { + Path string + acc telegraf.Accumulator +} + +func (md *MandrillWebhook) Register(router *mux.Router, acc telegraf.Accumulator) { + router.HandleFunc(md.Path, md.returnOK).Methods("HEAD") + router.HandleFunc(md.Path, md.eventHandler).Methods("POST") + + log.Printf("Started the webhooks_mandrill on %s\n", md.Path) + md.acc = acc +} + +func (md *MandrillWebhook) returnOK(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) +} + +func (md *MandrillWebhook) eventHandler(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + body, err := ioutil.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + data, err := url.ParseQuery(string(body)) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + var events []MandrillEvent + err = json.Unmarshal([]byte(data.Get("mandrill_events")), &events) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + for _, event := range events { + md.acc.AddFields("mandrill_webhooks", event.Fields(), event.Tags(), time.Unix(event.TimeStamp, 0)) + } + + w.WriteHeader(http.StatusOK) +} diff --git a/plugins/inputs/webhooks/mandrill/mandrill_webhooks_events.go b/plugins/inputs/webhooks/mandrill/mandrill_webhooks_events.go new file mode 100644 index 000000000..b36b13e54 --- /dev/null +++ b/plugins/inputs/webhooks/mandrill/mandrill_webhooks_events.go @@ -0,0 +1,24 @@ +package mandrill + +type Event interface { + Tags() map[string]string + Fields() map[string]interface{} +} + +type MandrillEvent struct { + EventName string `json:"event"` + TimeStamp int64 `json:"ts"` + Id string `json:"_id"` +} + +func (me *MandrillEvent) Tags() map[string]string { + return map[string]string{ + "event": me.EventName, + } +} + +func (me *MandrillEvent) Fields() map[string]interface{} { + return map[string]interface{}{ + "id": me.Id, + } +} diff --git a/plugins/inputs/webhooks/mandrill/mandrill_webhooks_events_json_test.go b/plugins/inputs/webhooks/mandrill/mandrill_webhooks_events_json_test.go new file mode 100644 index 000000000..4ab385e18 --- /dev/null +++ b/plugins/inputs/webhooks/mandrill/mandrill_webhooks_events_json_test.go @@ -0,0 +1,58 @@ +package mandrill + +func SendEventJSON() string { + return ` + { + "event": "send", + "msg": { + "ts": 1365109999, + "subject": "This an example webhook message", + "email": "example.webhook@mandrillapp.com", + "sender": "example.sender@mandrillapp.com", + "tags": [ + "webhook-example" + ], + "opens": [ + + ], + "clicks": [ + + ], + "state": "sent", + "metadata": { + "user_id": 111 + }, + "_id": "exampleaaaaaaaaaaaaaaaaaaaaaaaaa", + "_version": "exampleaaaaaaaaaaaaaaa" + }, + "_id": "id1", + "ts": 1384954004 + }` +} + +func HardBounceEventJSON() string { + return ` + { + "event": "hard_bounce", + "msg": { + "ts": 1365109999, + "subject": "This an example webhook message", + "email": "example.webhook@mandrillapp.com", + "sender": "example.sender@mandrillapp.com", + "tags": [ + "webhook-example" + ], + "state": "bounced", + "metadata": { + "user_id": 111 + }, + "_id": "exampleaaaaaaaaaaaaaaaaaaaaaaaaa2", + "_version": "exampleaaaaaaaaaaaaaaa", + "bounce_description": "bad_mailbox", + "bgtools_code": 10, + "diag": "smtp;550 5.1.1 The email account that you tried to reach does not exist. Please try double-checking the recipient's email address for typos or unnecessary spaces." + }, + "_id": "id2", + "ts": 1384954004 + }` +} diff --git a/plugins/inputs/webhooks/mandrill/mandrill_webhooks_test.go b/plugins/inputs/webhooks/mandrill/mandrill_webhooks_test.go new file mode 100644 index 000000000..94ac68684 --- /dev/null +++ b/plugins/inputs/webhooks/mandrill/mandrill_webhooks_test.go @@ -0,0 +1,85 @@ +package mandrill + +import ( + "github.com/influxdata/telegraf/testutil" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" +) + +func postWebhooks(md *MandrillWebhook, eventBody string) *httptest.ResponseRecorder { + body := url.Values{} + body.Set("mandrill_events", eventBody) + req, _ := http.NewRequest("POST", "/mandrill", strings.NewReader(body.Encode())) + w := httptest.NewRecorder() + + md.eventHandler(w, req) + + return w +} + +func headRequest(md *MandrillWebhook) *httptest.ResponseRecorder { + req, _ := http.NewRequest("HEAD", "/mandrill", strings.NewReader("")) + w := httptest.NewRecorder() + + md.returnOK(w, req) + + return w +} + +func TestHead(t *testing.T) { + md := &MandrillWebhook{Path: "/mandrill"} + resp := headRequest(md) + if resp.Code != http.StatusOK { + t.Errorf("HEAD returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) + } +} + +func TestSendEvent(t *testing.T) { + var acc testutil.Accumulator + md := &MandrillWebhook{Path: "/mandrill", acc: &acc} + resp := postWebhooks(md, "["+SendEventJSON()+"]") + if resp.Code != http.StatusOK { + t.Errorf("POST send returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) + } + + fields := map[string]interface{}{ + "id": "id1", + } + + tags := map[string]string{ + "event": "send", + } + + acc.AssertContainsTaggedFields(t, "mandrill_webhooks", fields, tags) +} + +func TestMultipleEvents(t *testing.T) { + var acc testutil.Accumulator + md := &MandrillWebhook{Path: "/mandrill", acc: &acc} + resp := postWebhooks(md, "["+SendEventJSON()+","+HardBounceEventJSON()+"]") + if resp.Code != http.StatusOK { + t.Errorf("POST send returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) + } + + fields := map[string]interface{}{ + "id": "id1", + } + + tags := map[string]string{ + "event": "send", + } + + acc.AssertContainsTaggedFields(t, "mandrill_webhooks", fields, tags) + + fields = map[string]interface{}{ + "id": "id2", + } + + tags = map[string]string{ + "event": "hard_bounce", + } + acc.AssertContainsTaggedFields(t, "mandrill_webhooks", fields, tags) +} diff --git a/plugins/inputs/webhooks/webhooks.go b/plugins/inputs/webhooks/webhooks.go index d8c74850a..884435c36 100644 --- a/plugins/inputs/webhooks/webhooks.go +++ b/plugins/inputs/webhooks/webhooks.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs/webhooks/github" + "github.com/influxdata/telegraf/plugins/inputs/webhooks/mandrill" "github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar" ) @@ -25,8 +26,9 @@ func init() { type Webhooks struct { ServiceAddress string - Github *github.GithubWebhook - Rollbar *rollbar.RollbarWebhook + Github *github.GithubWebhook + Mandrill *mandrill.MandrillWebhook + Rollbar *rollbar.RollbarWebhook } func NewWebhooks() *Webhooks { @@ -41,6 +43,9 @@ func (wb *Webhooks) SampleConfig() string { [inputs.webhooks.github] path = "/github" + [inputs.webhooks.mandrill] + path = "/mandrill" + [inputs.webhooks.rollbar] path = "/rollbar" ` From 281a4d550021f88ea36eb05b3c0536b0ad6c68f6 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 18 Jul 2016 12:54:33 +0100 Subject: [PATCH 10/13] Change resp_code from field to tag in logparser closes #1479 --- CHANGELOG.md | 1 + plugins/inputs/logparser/grok/grok_test.go | 15 +++++---------- plugins/inputs/logparser/grok/influx_patterns.go | 2 +- .../logparser/grok/patterns/influx-patterns | 2 +- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 46239894f..a0f0cca16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ should now look like: - [#1437](https://github.com/influxdata/telegraf/pull/1437): Fetching Galera status metrics in MySQL - [#1500](https://github.com/influxdata/telegraf/pull/1500): Aerospike plugin refactored to use official client lib. - [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin. +- [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag. ### Bugfixes diff --git a/plugins/inputs/logparser/grok/grok_test.go b/plugins/inputs/logparser/grok/grok_test.go index 979553f88..1181e85ae 100644 --- a/plugins/inputs/logparser/grok/grok_test.go +++ b/plugins/inputs/logparser/grok/grok_test.go @@ -99,13 +99,12 @@ func TestMeasurementName(t *testing.T) { "resp_bytes": int64(2326), "auth": "frank", "client_ip": "127.0.0.1", - "resp_code": int64(200), "http_version": float64(1.0), "ident": "user-identifier", "request": "/apache_pb.gif", }, m.Fields()) - assert.Equal(t, map[string]string{"verb": "GET"}, m.Tags()) + assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags()) assert.Equal(t, "my_web_log", m.Name()) } @@ -124,7 +123,6 @@ func TestBuiltinInfluxdbHttpd(t *testing.T) { "resp_bytes": int64(0), "auth": "-", "client_ip": "::1", - "resp_code": int64(204), "http_version": float64(1.1), "ident": "-", "referrer": "-", @@ -133,7 +131,7 @@ func TestBuiltinInfluxdbHttpd(t *testing.T) { "agent": "InfluxDBClient", }, m.Fields()) - assert.Equal(t, map[string]string{"verb": "POST"}, m.Tags()) + assert.Equal(t, map[string]string{"verb": "POST", "resp_code": "204"}, m.Tags()) // Parse an influxdb GET request m, err = p.ParseLine(`[httpd] ::1 - - [14/Jun/2016:12:10:02 +0100] "GET /query?db=telegraf&q=SELECT+bytes%2Cresponse_time_us+FROM+logparser_grok+WHERE+http_method+%3D+%27GET%27+AND+response_time_us+%3E+0+AND+time+%3E+now%28%29+-+1h HTTP/1.1" 200 578 "http://localhost:8083/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.84 Safari/537.36" 8a3806f1-3220-11e6-8006-000000000000 988`) @@ -144,7 +142,6 @@ func TestBuiltinInfluxdbHttpd(t *testing.T) { "resp_bytes": int64(578), "auth": "-", "client_ip": "::1", - "resp_code": int64(200), "http_version": float64(1.1), "ident": "-", "referrer": "http://localhost:8083/", @@ -153,7 +150,7 @@ func TestBuiltinInfluxdbHttpd(t *testing.T) { "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.84 Safari/537.36", }, m.Fields()) - assert.Equal(t, map[string]string{"verb": "GET"}, m.Tags()) + assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags()) } // common log format @@ -173,13 +170,12 @@ func TestBuiltinCommonLogFormat(t *testing.T) { "resp_bytes": int64(2326), "auth": "frank", "client_ip": "127.0.0.1", - "resp_code": int64(200), "http_version": float64(1.0), "ident": "user-identifier", "request": "/apache_pb.gif", }, m.Fields()) - assert.Equal(t, map[string]string{"verb": "GET"}, m.Tags()) + assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags()) } // combined log format @@ -199,7 +195,6 @@ func TestBuiltinCombinedLogFormat(t *testing.T) { "resp_bytes": int64(2326), "auth": "frank", "client_ip": "127.0.0.1", - "resp_code": int64(200), "http_version": float64(1.0), "ident": "user-identifier", "request": "/apache_pb.gif", @@ -207,7 +202,7 @@ func TestBuiltinCombinedLogFormat(t *testing.T) { "agent": "Mozilla", }, m.Fields()) - assert.Equal(t, map[string]string{"verb": "GET"}, m.Tags()) + assert.Equal(t, map[string]string{"verb": "GET", "resp_code": "200"}, m.Tags()) } func TestCompileStringAndParse(t *testing.T) { diff --git a/plugins/inputs/logparser/grok/influx_patterns.go b/plugins/inputs/logparser/grok/influx_patterns.go index 0622c61ef..53be0e20d 100644 --- a/plugins/inputs/logparser/grok/influx_patterns.go +++ b/plugins/inputs/logparser/grok/influx_patterns.go @@ -66,7 +66,7 @@ INFLUXDB_HTTPD_LOG \[httpd\] %{COMBINED_LOG_FORMAT} %{UUID:uuid:drop} %{NUMBER:r # apache & nginx logs, this is also known as the "common log format" # see https://en.wikipedia.org/wiki/Common_Log_Format -COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:int} (?:%{NUMBER:resp_bytes:int}|-) +COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-) # Combined log format is the same as the common log format but with the addition # of two quoted strings at the end for "referrer" and "agent" diff --git a/plugins/inputs/logparser/grok/patterns/influx-patterns b/plugins/inputs/logparser/grok/patterns/influx-patterns index f4d375f4d..1db74a17a 100644 --- a/plugins/inputs/logparser/grok/patterns/influx-patterns +++ b/plugins/inputs/logparser/grok/patterns/influx-patterns @@ -62,7 +62,7 @@ INFLUXDB_HTTPD_LOG \[httpd\] %{COMBINED_LOG_FORMAT} %{UUID:uuid:drop} %{NUMBER:r # apache & nginx logs, this is also known as the "common log format" # see https://en.wikipedia.org/wiki/Common_Log_Format -COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:int} (?:%{NUMBER:resp_bytes:int}|-) +COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-) # Combined log format is the same as the common log format but with the addition # of two quoted strings at the end for "referrer" and "agent" From dabb6f54663ca16f8c62d0f725fc3e302b98e87d Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 18 Jul 2016 14:44:25 +0100 Subject: [PATCH 11/13] Internally name all patterns for log parsing flexibility closes #1436 This also fixes the bad behavior of waiting until runtime to return log parsing pattern compile errors when a pattern was simply unfound. closes #1418 Also protect against user error when the telegraf user does not have permission to open the provided file. We will now error and exit in this case, rather than silently waiting to get permission to open it. --- CHANGELOG.md | 2 ++ plugins/inputs/logparser/grok/grok.go | 22 ++++++++++-- plugins/inputs/logparser/grok/grok_test.go | 39 ++++++++++++++++++++-- plugins/inputs/logparser/logparser.go | 33 +++++++++--------- plugins/inputs/logparser/logparser_test.go | 7 ++-- plugins/inputs/tail/tail.go | 7 ++-- 6 files changed, 84 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a0f0cca16..99e8ffe56 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -63,6 +63,8 @@ should now look like: - [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues. - [#1427](https://github.com/influxdata/telegraf/issues/1427): Cassandra input: version 2.x "column family" fix. - [#1463](https://github.com/influxdata/telegraf/issues/1463): Shared WaitGroup in Exec plugin +- [#1436](https://github.com/influxdata/telegraf/issues/1436): logparser: honor modifiers in "pattern" config. +- [#1418](https://github.com/influxdata/telegraf/issues/1418): logparser: error and exit on file permissions/missing errors. ## v1.0 beta 2 [2016-06-21] diff --git a/plugins/inputs/logparser/grok/grok.go b/plugins/inputs/logparser/grok/grok.go index 16e62b223..d8691d7b9 100644 --- a/plugins/inputs/logparser/grok/grok.go +++ b/plugins/inputs/logparser/grok/grok.go @@ -53,7 +53,12 @@ var ( ) type Parser struct { - Patterns []string + Patterns []string + // namedPatterns is a list of internally-assigned names to the patterns + // specified by the user in Patterns. + // They will look like: + // GROK_INTERNAL_PATTERN_0, GROK_INTERNAL_PATTERN_1, etc. + namedPatterns []string CustomPatterns string CustomPatternFiles []string Measurement string @@ -98,13 +103,24 @@ func (p *Parser) Compile() error { return err } - p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns + // Give Patterns fake names so that they can be treated as named + // "custom patterns" + p.namedPatterns = make([]string, len(p.Patterns)) + for i, pattern := range p.Patterns { + name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i) + p.CustomPatterns += "\n" + name + " " + pattern + "\n" + p.namedPatterns[i] = "%{" + name + "}" + } + // Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse + // them together as the same type of pattern. + p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns if len(p.CustomPatterns) != 0 { scanner := bufio.NewScanner(strings.NewReader(p.CustomPatterns)) p.addCustomPatterns(scanner) } + // Parse any custom pattern files supplied. for _, filename := range p.CustomPatternFiles { file, err := os.Open(filename) if err != nil { @@ -127,7 +143,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { var values map[string]string // the matching pattern string var patternName string - for _, pattern := range p.Patterns { + for _, pattern := range p.namedPatterns { if values, err = p.g.Parse(pattern, line); err != nil { return nil, err } diff --git a/plugins/inputs/logparser/grok/grok_test.go b/plugins/inputs/logparser/grok/grok_test.go index 1181e85ae..295f32609 100644 --- a/plugins/inputs/logparser/grok/grok_test.go +++ b/plugins/inputs/logparser/grok/grok_test.go @@ -207,7 +207,7 @@ func TestBuiltinCombinedLogFormat(t *testing.T) { func TestCompileStringAndParse(t *testing.T) { p := &Parser{ - Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, + Patterns: []string{"%{TEST_LOG_A}"}, CustomPatterns: ` DURATION %{NUMBER}[nuµm]?s RESPONSE_CODE %{NUMBER:response_code:tag} @@ -230,6 +230,41 @@ func TestCompileStringAndParse(t *testing.T) { assert.Equal(t, map[string]string{"response_code": "200"}, metricA.Tags()) } +func TestCompileErrorsOnInvalidPattern(t *testing.T) { + p := &Parser{ + Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, + CustomPatterns: ` + DURATION %{NUMBER}[nuµm]?s + RESPONSE_CODE %{NUMBER:response_code:tag} + RESPONSE_TIME %{DURATION:response_time:duration} + TEST_LOG_A %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME} + `, + } + assert.Error(t, p.Compile()) + + metricA, _ := p.ParseLine(`1.25 200 192.168.1.1 5.432µs`) + require.Nil(t, metricA) +} + +func TestParsePatternsWithoutCustom(t *testing.T) { + p := &Parser{ + Patterns: []string{"%{POSINT:ts:ts-epochnano} response_time=%{POSINT:response_time:int} mymetric=%{NUMBER:metric:float}"}, + } + assert.NoError(t, p.Compile()) + + metricA, err := p.ParseLine(`1466004605359052000 response_time=20821 mymetric=10890.645`) + require.NotNil(t, metricA) + assert.NoError(t, err) + assert.Equal(t, + map[string]interface{}{ + "response_time": int64(20821), + "metric": float64(10890.645), + }, + metricA.Fields()) + assert.Equal(t, map[string]string{}, metricA.Tags()) + assert.Equal(t, time.Unix(0, 1466004605359052000), metricA.Time()) +} + func TestParseEpochNano(t *testing.T) { p := &Parser{ Patterns: []string{"%{MYAPP}"}, @@ -413,7 +448,7 @@ func TestParseErrors(t *testing.T) { TEST_LOG_A %{HTTPDATE:ts:ts-httpd} %{WORD:myword:int} %{} `, } - assert.NoError(t, p.Compile()) + assert.Error(t, p.Compile()) _, err := p.ParseLine(`[04/Jun/2016:12:41:45 +0100] notnumber 200 192.168.1.1 5.432µs 101`) assert.Error(t, err) diff --git a/plugins/inputs/logparser/logparser.go b/plugins/inputs/logparser/logparser.go index 4737ace65..6b29ea031 100644 --- a/plugins/inputs/logparser/logparser.go +++ b/plugins/inputs/logparser/logparser.go @@ -9,6 +9,7 @@ import ( "github.com/hpcloud/tail" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/plugins/inputs" @@ -110,11 +111,15 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { } // compile log parser patterns: + errChan := errchan.New(len(l.parsers)) for _, parser := range l.parsers { if err := parser.Compile(); err != nil { - return err + errChan.C <- err } } + if err := errChan.Error(); err != nil { + return err + } var seek tail.SeekInfo if !l.FromBeginning { @@ -125,24 +130,25 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { l.wg.Add(1) go l.parser() - var errS string // Create a "tailer" for each file for _, filepath := range l.Files { g, err := globpath.Compile(filepath) if err != nil { log.Printf("ERROR Glob %s failed to compile, %s", filepath, err) + continue } - for file, _ := range g.Match() { + files := g.Match() + errChan = errchan.New(len(files)) + for file, _ := range files { tailer, err := tail.TailFile(file, tail.Config{ - ReOpen: true, - Follow: true, - Location: &seek, + ReOpen: true, + Follow: true, + Location: &seek, + MustExist: true, }) - if err != nil { - errS += err.Error() + " " - continue - } + errChan.C <- err + // create a goroutine for each "tailer" l.wg.Add(1) go l.receiver(tailer) @@ -150,10 +156,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { } } - if errS != "" { - return fmt.Errorf(errS) - } - return nil + return errChan.Error() } // receiver is launched as a goroutine to continuously watch a tailed logfile @@ -201,8 +204,6 @@ func (l *LogParserPlugin) parser() { if m != nil { l.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) } - } else { - log.Printf("Malformed log line in [%s], Error: %s\n", line, err) } } } diff --git a/plugins/inputs/logparser/logparser_test.go b/plugins/inputs/logparser/logparser_test.go index 095b627ef..97f33067e 100644 --- a/plugins/inputs/logparser/logparser_test.go +++ b/plugins/inputs/logparser/logparser_test.go @@ -37,7 +37,7 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) { } acc := testutil.Accumulator{} - assert.NoError(t, logparser.Start(&acc)) + assert.Error(t, logparser.Start(&acc)) time.Sleep(time.Millisecond * 500) logparser.Stop() @@ -80,6 +80,8 @@ func TestGrokParseLogFiles(t *testing.T) { map[string]string{}) } +// Test that test_a.log line gets parsed even though we don't have the correct +// pattern available for test_b.log func TestGrokParseLogFilesOneBad(t *testing.T) { thisdir := getCurrentDir() p := &grok.Parser{ @@ -90,11 +92,12 @@ func TestGrokParseLogFilesOneBad(t *testing.T) { logparser := &LogParserPlugin{ FromBeginning: true, - Files: []string{thisdir + "grok/testdata/*.log"}, + Files: []string{thisdir + "grok/testdata/test_a.log"}, GrokParser: p, } acc := testutil.Accumulator{} + acc.SetDebug(true) assert.NoError(t, logparser.Start(&acc)) time.Sleep(time.Millisecond * 500) diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index 7386e053d..942fd6bae 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -86,9 +86,10 @@ func (t *Tail) Start(acc telegraf.Accumulator) error { for file, _ := range g.Match() { tailer, err := tail.TailFile(file, tail.Config{ - ReOpen: true, - Follow: true, - Location: &seek, + ReOpen: true, + Follow: true, + Location: &seek, + MustExist: true, }) if err != nil { errS += err.Error() + " " From b58cd78c79f3326bd6be9b76a286f4a5ac8a5fcd Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 18 Jul 2016 17:26:44 +0100 Subject: [PATCH 12/13] Use errchan in redis input plugin this may address, or at least log issue #1462 --- plugins/inputs/redis/redis.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/plugins/inputs/redis/redis.go b/plugins/inputs/redis/redis.go index fc50387df..649786c2c 100644 --- a/plugins/inputs/redis/redis.go +++ b/plugins/inputs/redis/redis.go @@ -12,6 +12,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -96,9 +97,7 @@ func (r *Redis) Gather(acc telegraf.Accumulator) error { } var wg sync.WaitGroup - - var outerr error - + errChan := errchan.New(len(r.Servers)) for _, serv := range r.Servers { if !strings.HasPrefix(serv, "tcp://") || !strings.HasPrefix(serv, "unix://") { serv = "tcp://" + serv @@ -123,13 +122,12 @@ func (r *Redis) Gather(acc telegraf.Accumulator) error { wg.Add(1) go func(serv string) { defer wg.Done() - outerr = r.gatherServer(u, acc) + errChan.C <- r.gatherServer(u, acc) }(serv) } wg.Wait() - - return outerr + return errChan.Error() } func (r *Redis) gatherServer(addr *url.URL, acc telegraf.Accumulator) error { From 03d02fa67a06b73614cae657f36adb8dd7e147ba Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 18 Jul 2016 17:37:21 +0100 Subject: [PATCH 13/13] Telegraf v1.0 beta 3 --- CHANGELOG.md | 2 ++ Godeps | 1 + README.md | 18 +++++++++--------- plugins/inputs/aerospike/aerospike.go | 2 +- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 99e8ffe56..5aa149a89 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ ## v1.0 [unreleased] +## v1.0 beta 3 [2016-07-18] + ### Release Notes **Breaking Change**: Aerospike main server node measurements have been renamed diff --git a/Godeps b/Godeps index 1546bb627..5caa6a9e2 100644 --- a/Godeps +++ b/Godeps @@ -46,6 +46,7 @@ github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8 github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f github.com/shirou/gopsutil 586bb697f3ec9f8ec08ffefe18f521a64534037c github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d +github.com/sparrc/aerospike-client-go d4bb42d2c2d39dae68e054116f4538af189e05d5 github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744 github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2 diff --git a/README.md b/README.md index 738f9eaea..aa8d9e039 100644 --- a/README.md +++ b/README.md @@ -20,12 +20,12 @@ new plugins. ### Linux deb and rpm Packages: Latest: -* https://dl.influxdata.com/telegraf/releases/telegraf_1.0.0-beta2_amd64.deb -* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0_beta2.x86_64.rpm +* https://dl.influxdata.com/telegraf/releases/telegraf_1.0.0-beta3_amd64.deb +* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0_beta3.x86_64.rpm Latest (arm): -* https://dl.influxdata.com/telegraf/releases/telegraf_1.0.0-beta2_armhf.deb -* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0_beta2.armhf.rpm +* https://dl.influxdata.com/telegraf/releases/telegraf_1.0.0-beta3_armhf.deb +* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0_beta3.armhf.rpm ##### Package Instructions: @@ -46,14 +46,14 @@ to use this repo to install & update telegraf. ### Linux tarballs: Latest: -* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta2_linux_amd64.tar.gz -* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta2_linux_i386.tar.gz -* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta2_linux_armhf.tar.gz +* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta3_linux_amd64.tar.gz +* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta3_linux_i386.tar.gz +* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta3_linux_armhf.tar.gz ### FreeBSD tarball: Latest: -* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta2_freebsd_amd64.tar.gz +* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta3_freebsd_amd64.tar.gz ### Ansible Role: @@ -69,7 +69,7 @@ brew install telegraf ### Windows Binaries (EXPERIMENTAL) Latest: -* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta2_windows_amd64.zip +* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta3_windows_amd64.zip ### From Source: diff --git a/plugins/inputs/aerospike/aerospike.go b/plugins/inputs/aerospike/aerospike.go index 4bb652c0a..29e51cb82 100644 --- a/plugins/inputs/aerospike/aerospike.go +++ b/plugins/inputs/aerospike/aerospike.go @@ -11,7 +11,7 @@ import ( "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" - as "github.com/aerospike/aerospike-client-go" + as "github.com/sparrc/aerospike-client-go" ) type Aerospike struct {