Initial redis consumer input plugin

This commit is contained in:
Justin McCarty 2016-09-23 14:25:22 -04:00
parent 9ea3dbeee8
commit c0bb5e8cf2
6 changed files with 405 additions and 0 deletions

2
Godeps
View File

@ -58,7 +58,9 @@ github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363
golang.org/x/crypto 5dc8cb4b8a8eb076cbb5a06bc3b8682c15bdbbd3
golang.org/x/net 6acef71eb69611914f7a30939ea9f6e194c78172
golang.org/x/text a71fd10341b064c10f4a81ceac72bcf70f26ea34
gopkg.in/bsm/ratelimit.v1 db14e161995a5177acef654cb0dd785e8ee8bc22
gopkg.in/dancannon/gorethink.v1 7d1af5be49cb5ecc7b177bf387d232050299d6ef
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
gopkg.in/mgo.v2 d90005c5262a3463800497ea5a89aed5fe22c886
gopkg.in/redis.v4 938235994ea88a05678f8060741d5f34ed6a5ff3
gopkg.in/yaml.v2 a83829b6f1293c91addabc89d0571c246397bbf4

View File

@ -219,6 +219,7 @@ Telegraf can also collect metrics via the following service plugins:
* [nats_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nats_consumer)
* [nsq_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nsq_consumer)
* [logparser](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/logparser)
* [redis_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/redis_consumer)
* [statsd](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/statsd)
* [tail](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/tail)
* [tcp_listener](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/tcp_listener)

View File

@ -59,6 +59,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/rabbitmq"
_ "github.com/influxdata/telegraf/plugins/inputs/raindrops"
_ "github.com/influxdata/telegraf/plugins/inputs/redis"
_ "github.com/influxdata/telegraf/plugins/inputs/redis_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb"
_ "github.com/influxdata/telegraf/plugins/inputs/riak"
_ "github.com/influxdata/telegraf/plugins/inputs/sensors"

View File

@ -0,0 +1,33 @@
# Redis Consumer Input Plugin
The [Redis](http://http://redis.io//) consumer plugin subscribes to one or more
Redis channels and adds messages to InfluxDB. Multiple Redis servers may be specified
at a time. The Redis consumer may be configured to use both standard channel names or
patterned channel names.
## Configuration
```toml
# Read metrics from Redis channel(s)
[[inputs.redis_consumer]]
servers = ["tcp://localhost:6379"]
## List of channels to listen to. Selecting channels using Redis'
## pattern-matching is allowed, e.g.:
## channels = ["telegraf:*", "app_[1-3]"]
##
## See http://redis.io/topics/pubsub#pattern-matching-subscriptions for
## more info.
channels = ["telegraf"]
## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```
## Testing
Running integration tests requires running Redis. See Makefile
for redis container command.

View File

@ -0,0 +1,216 @@
package redis_consumer
import (
"fmt"
"log"
"regexp"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"gopkg.in/redis.v4"
)
// RedisConsumer represents a redis consumer for Telegraf
type RedisConsumer struct {
Servers []string
Channels []string
clients []*redis.Client
acc telegraf.Accumulator
accumLock sync.Mutex
parser parsers.Parser
pubsubs []*redis.PubSub
finish chan struct{}
}
var sampleConfig = `
## Specify servers via a url matching:
## [protocol://][:password]@address[:port]
## e.g.
## tcp://localhost:6379
## tcp://:password@192.168.99.100
##
## If no servers are specified, then localhost is used as the host.
## If no port is specified, 6379 is used
servers = ["tcp://localhost:6379"]
## List of channels to listen to. Selecting channels using Redis'
## pattern-matching is allowed, e.g.:
## channels = ["telegraf:*", "app_[1-3]"]
##
## See http://redis.io/topics/pubsub#pattern-matching-subscriptions for
## more info.
channels = ["telegraf"]
## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`
func parseChannels(channels []string) (subs, psubs []string, err error) {
err = nil
subs = make([]string, 0)
psubs = make([]string, 0)
for _, channel := range channels {
if matched, fail := regexp.MatchString(`[^\\][\[|\(|\*]`, channel); fail != nil {
err = fmt.Errorf("Could not parse %s : %v", channel, fail)
return
} else if matched {
psubs = append(psubs, channel)
} else {
subs = append(subs, channel)
}
}
return
}
func createClient(server string) (*redis.Client, error) {
client := redis.NewClient(&redis.Options{
Addr: server,
})
if _, err := client.Ping().Result(); err != nil {
return client, fmt.Errorf("Unable to ping redis server %s : %v", server, err)
}
return client, nil
}
// SetParser allows the consumer to accept multiple data formats
func (r *RedisConsumer) SetParser(parser parsers.Parser) {
r.parser = parser
}
// SampleConfig provides a sample configuration for the redis consumer
func (r *RedisConsumer) SampleConfig() string {
return sampleConfig
}
// Description provides a description of the consumer
func (r *RedisConsumer) Description() string {
return "Reads metrics from Redis channels"
}
// Gather noop for the redis consumer
func (r *RedisConsumer) Gather(acc telegraf.Accumulator) error {
return nil
}
// Start starts fetching data from the redis server
func (r *RedisConsumer) Start(acc telegraf.Accumulator) error {
r.accumLock.Lock()
defer r.accumLock.Unlock()
r.acc = acc
if len(r.Servers) == 0 {
r.Servers = append(r.Servers, "tcp://localhost:6379")
}
// Verify every server can be connected
for _, server := range r.Servers {
var client *redis.Client
var err error
if client, err = createClient(server); err != nil {
return fmt.Errorf("Unable to crate redis server %s : %v", server, err)
}
r.clients = append(r.clients, client)
}
// Verify all subscriptions can be made
var err error
if r.pubsubs, err = r.createSubscriptions(); err != nil {
return err
}
r.finish = make(chan struct{})
// Start listening
for _, pubsub := range r.pubsubs {
go r.listen(pubsub)
}
return nil
}
func (r *RedisConsumer) createSubscriptions() ([]*redis.PubSub, error) {
subs, psubs, err := parseChannels(r.Channels)
if err != nil {
return nil, err
}
pubsubs := []*redis.PubSub{}
for _, c := range r.clients {
var s, ps *redis.PubSub
if len(subs) > 0 {
s, err = c.Subscribe(subs...)
if err != nil {
return nil, fmt.Errorf("Error during subscription creation: %v", err)
}
pubsubs = append(pubsubs, s)
}
if len(psubs) > 0 {
ps, err = c.PSubscribe(psubs...)
if err != nil {
return nil, fmt.Errorf("Error during psubscription creation: %v", err)
}
pubsubs = append(pubsubs, ps)
}
}
return pubsubs, nil
}
func (r *RedisConsumer) listen(pubsub *redis.PubSub) {
for {
msg, err := pubsub.ReceiveMessage()
// Check if the consumer is finishing
if err != nil {
select {
case <-r.finish:
pubsub.Close()
return
default:
// Nothing todo
}
}
metrics, merr := r.parser.Parse([]byte(msg.Payload))
if merr != nil {
log.Printf("Redis Parse Error.\n\tMessage: %s\n\tError: %v", msg.Payload, merr)
continue
}
for _, metric := range metrics {
r.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
}
}
// Stop stops fetching data from the redis server
func (r *RedisConsumer) Stop() error {
r.accumLock.Lock()
defer r.accumLock.Unlock()
close(r.finish)
for _, client := range r.clients {
if err := client.Close(); err != nil {
return fmt.Errorf("Error closing redis server: %v", err)
}
}
return nil
}
func init() {
inputs.Add("redis_consumer", func() telegraf.Input {
return &RedisConsumer{}
})
}

View File

@ -0,0 +1,152 @@
package redis_consumer
import (
"fmt"
"testing"
"time"
"gopkg.in/redis.v4"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestParseChannels(t *testing.T) {
psubChannel := `channel[1_3]`
plainSubChannel := `channel`
escapeSubChannel := `channel\*`
tests := []struct {
testName string
channels []string
psubCount int
subCount int
}{
{
testName: "normal sub",
channels: []string{plainSubChannel},
psubCount: 0,
subCount: 1,
},
{
testName: "psub",
channels: []string{psubChannel},
psubCount: 1,
subCount: 0,
},
{
testName: "escaped sub",
channels: []string{escapeSubChannel},
psubCount: 0,
subCount: 1,
},
{
testName: "all",
channels: []string{escapeSubChannel, psubChannel, plainSubChannel},
psubCount: 1,
subCount: 2,
},
}
for _, parseTest := range tests {
s, p, e := parseChannels(parseTest.channels)
if e != nil {
t.Errorf("Test %s had unexpected error %v", parseTest.testName, e)
}
if parseTest.subCount != len(s) {
t.Errorf("Test %s subchanel count. Expected %d Actual %d", parseTest.testName, parseTest.subCount, len(s))
}
if parseTest.psubCount != len(p) {
t.Errorf("Test %s psubchanel count. Expected %d Actual %d", parseTest.testName, parseTest.psubCount, len(p))
}
}
}
func TestRedisConnect(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
addr := fmt.Sprintf(testutil.GetLocalHost() + ":6379")
c, err := createClient(addr)
require.NoError(t, err)
c.Close()
}
func TestCreateSubscriptions(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
addr := fmt.Sprintf(testutil.GetLocalHost() + ":6379")
c, _ := createClient(addr)
defer c.Close()
r := &RedisConsumer{
Channels: []string{"test_channel_1", "test_channel_2, test_channel_[1_3]"},
clients: []*redis.Client{c},
}
pubsubs, err := r.createSubscriptions()
require.NoError(t, err)
assert.Equal(t, 2, len(pubsubs))
}
func TestRedisReceive(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
testChannel := "test_channel"
testMsg := "cpu_load,host=server01,region=us-west value=1.0 1444444444"
addr := fmt.Sprintf(testutil.GetLocalHost() + ":6379")
testClient, err := createClient(addr)
defer testClient.Close()
parser, _ := parsers.NewInfluxParser()
var acc testutil.Accumulator
require.NoError(t, err)
r := &RedisConsumer{
Servers: []string{addr},
Channels: []string{testChannel},
}
r.SetParser(parser)
if err = r.Start(&acc); err != nil {
t.Fatal(err.Error())
}
defer r.Stop()
testClient.Publish(testChannel, testMsg)
waitForPoint(&acc, 2, t)
if len(acc.Metrics) != 1 {
t.Error("Metric no receieved")
}
}
// Waits for the metric to arrive in the accumulator
func waitForPoint(acc *testutil.Accumulator, waitSeconds int, t *testing.T) {
intervalMS := 5
threshold := (waitSeconds * 1000) / intervalMS
ticker := time.NewTicker(time.Duration(intervalMS) * time.Millisecond)
counter := 0
for {
select {
case <-ticker.C:
counter++
if counter > threshold {
t.Fatalf("Waited for %ds, point never arrived to accumulator", waitSeconds)
} else if acc.NFields() == 1 {
return
}
}
}
}