From 815e9534b8afbf6ed7764c256b7a045c976b31a9 Mon Sep 17 00:00:00 2001
From: Tyler Nisonoff
Date: Sat, 27 Jun 2015 12:56:27 -0700
Subject: [PATCH 1/5] fixed spelling mistake -- memoory -> memory

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index d72701aaf..0d7223bc2 100644
--- a/README.md
+++ b/README.md
@@ -133,7 +133,7 @@ func Gather(acc plugins.Accumulator) error {
 		}
 
 		acc.Add("cpu", process.CPUTime, tags)
-		acc.Add("memoory", process.MemoryBytes, tags)
+		acc.Add("memory", process.MemoryBytes, tags)
 	}
 }
 ```

From 5e1ba3fbb76be4499ff25d54feae4d21d5188c5a Mon Sep 17 00:00:00 2001
From: Todd Persen
Date: Wed, 1 Jul 2015 12:36:52 -0700
Subject: [PATCH 2/5] Update CHANGELOG.md

---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c478732ec..735175f41 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,4 @@
-## v0.1.2 [unreleased]
+## v0.1.2 [2015-07-01]
 
 ### Features
 - [#12](https://github.com/influxdb/influxdb/pull/12): Add Linux/ARM to the list of built binaries. Thanks @voxxit!

From c523ae2c52edb3f657e17559273c9102ec58fefd Mon Sep 17 00:00:00 2001
From: Todd Persen
Date: Wed, 1 Jul 2015 12:37:44 -0700
Subject: [PATCH 3/5] Update README.md

---
 README.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 0d7223bc2..711903f51 100644
--- a/README.md
+++ b/README.md
@@ -13,8 +13,8 @@ We'll eagerly accept pull requests for new plugins and will manage the set of pl
 ### Linux packages for Debian/Ubuntu and RHEL/CentOS:
 
 ```
-http://get.influxdb.org/telegraf/telegraf_0.1.1_amd64.deb
-http://get.influxdb.org/telegraf/telegraf-0.1.1-1.x86_64.rpm
+http://get.influxdb.org/telegraf/telegraf_0.1.2_amd64.deb
+http://get.influxdb.org/telegraf/telegraf-0.1.2-1.x86_64.rpm
 ```
 
 ### OSX via Homebrew:

From 6550d4f634f7b176a35096d1a06fe71d2b62fe70 Mon Sep 17 00:00:00 2001
From: Todd Persen
Date: Thu, 2 Jul 2015 11:25:06 -0700
Subject: [PATCH 4/5] Update CHANGELOG.md

---
 CHANGELOG.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 735175f41..85730cafd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,9 +3,15 @@
 ### Features
 - [#12](https://github.com/influxdb/influxdb/pull/12): Add Linux/ARM to the list of built binaries. Thanks @voxxit!
 - [#14](https://github.com/influxdb/influxdb/pull/14): Clarify the S3 buckets that Telegraf is pushed to.
+- [#16](https://github.com/influxdb/influxdb/pull/16): Convert Redis to use URI, support Redis AUTH. Thanks @jipperinbham!
+- [#21](https://github.com/influxdb/influxdb/pull/21): Add memcached plugin. Thanks @Yukki!
 
 ### Bugfixes
 - [#13](https://github.com/influxdb/influxdb/pull/13): Fix the packaging script.
+- [#19](https://github.com/influxdb/influxdb/pull/19): Add host name to metric tags. Thanks @sherifzain!
+- [#20](https://github.com/influxdb/influxdb/pull/20): Fix race condition with accumulator mutex. Thanks @nkatsaros!
+- [#23](https://github.com/influxdb/influxdb/pull/23): Change name of folder for packages. Thanks @colinrymer!
+- [#32](https://github.com/influxdb/influxdb/pull/32): Fix spelling of memoory -> memory. Thanks @tylernisonoff!
 
 ## v0.1.1 [2015-06-19]
 

From 0692b4be618024def1cbee0f01c24f64c11719d0 Mon Sep 17 00:00:00 2001
From: Emil Stolarsky
Date: Fri, 26 Jun 2015 23:56:14 -0400
Subject: [PATCH 5/5] Add Kafka Consumer Plugin

The Kafka consumer plugin polls a specified Kafka topic and adds messages
to InfluxDB.
The plugin assumes messages follow the line protocol. A Kafka consumer
group is used to talk to the cluster, so multiple instances of Telegraf
can read from the same topic in parallel.
---
 plugins/all/all.go                            |   1 +
 plugins/kafka_consumer/kafka_consumer.go      | 153 ++++++++++++++++++
 .../kafka_consumer_integration_test.go        |  62 +++++++
 plugins/kafka_consumer/kafka_consumer_test.go |  95 +++++++++++
 4 files changed, 311 insertions(+)
 create mode 100644 plugins/kafka_consumer/kafka_consumer.go
 create mode 100644 plugins/kafka_consumer/kafka_consumer_integration_test.go
 create mode 100644 plugins/kafka_consumer/kafka_consumer_test.go

diff --git a/plugins/all/all.go b/plugins/all/all.go
index 71381aa87..466a7166d 100644
--- a/plugins/all/all.go
+++ b/plugins/all/all.go
@@ -1,6 +1,7 @@
 package all
 
 import (
+	_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
 	_ "github.com/influxdb/telegraf/plugins/memcached"
 	_ "github.com/influxdb/telegraf/plugins/mysql"
 	_ "github.com/influxdb/telegraf/plugins/postgresql"
diff --git a/plugins/kafka_consumer/kafka_consumer.go b/plugins/kafka_consumer/kafka_consumer.go
new file mode 100644
index 000000000..5e1b7fc5b
--- /dev/null
+++ b/plugins/kafka_consumer/kafka_consumer.go
@@ -0,0 +1,153 @@
+package kafka_consumer
+
+import (
+	"os"
+	"os/signal"
+	"time"
+
+	"github.com/influxdb/influxdb/tsdb"
+	"github.com/influxdb/telegraf/plugins"
+	"github.com/wvanbergen/kafka/consumergroup"
+	"gopkg.in/Shopify/sarama.v1"
+)
+
+type Kafka struct {
+	ConsumerGroupName string
+	Topic             string
+	ZookeeperPeers    []string
+	Consumer          *consumergroup.ConsumerGroup
+	BatchSize         int
+}
+
+var sampleConfig = `
+# topic to consume
+topic = "topic_with_metrics"
+
+# the name of the consumer group
+consumerGroupName = "telegraf_metrics_consumers"
+
+# an array of Zookeeper connection strings
+zookeeperPeers = ["localhost:2181"]
+
+# Batch size of points sent to InfluxDB
+batchSize = 1000`
+
+func (k *Kafka) SampleConfig() string {
+	return sampleConfig
+}
+
+func (k *Kafka) Description() string {
+	return "read metrics from a Kafka topic"
+}
+
+type Metric struct {
+	Measurement string                 `json:"measurement"`
+	Values      map[string]interface{} `json:"values"`
+	Tags        map[string]string      `json:"tags"`
+	Time        time.Time              `json:"time"`
+}
+
+func (k *Kafka) Gather(acc plugins.Accumulator) error {
+	var consumerErr error
+	metricQueue := make(chan []byte, 200)
+
+	if k.Consumer == nil {
+		k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
+			k.ConsumerGroupName,
+			[]string{k.Topic},
+			k.ZookeeperPeers,
+			nil,
+		)
+
+		if consumerErr != nil {
+			return consumerErr
+		}
+
+		c := make(chan os.Signal, 1)
+		halt := make(chan bool, 1)
+		signal.Notify(c, os.Interrupt)
+		go func() {
+			<-c
+			halt <- true
+			emitMetrics(k, acc, metricQueue)
+			k.Consumer.Close()
+		}()
+
+		go readFromKafka(k.Consumer.Messages(), metricQueue, k.BatchSize, k.Consumer.CommitUpto, halt)
+	}
+
+	return emitMetrics(k, acc, metricQueue)
+}
+
+func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte) error {
+	timeout := time.After(1 * time.Second)
+
+	for {
+		select {
+		case batch := <-metricConsumer:
+			var points []tsdb.Point
+			var err error
+			if points, err = tsdb.ParsePoints(batch); err != nil {
+				return err
+			}
+
+			for _, point := range points {
+				acc.AddValuesWithTime(point.Name(), point.Fields(), point.Tags(), point.Time())
+			}
+		case <-timeout:
+			return nil
+		}
+	}
+}
+
+const millisecond = 1000000 * time.Nanosecond
+
+type ack func(*sarama.ConsumerMessage) error
+
+func readFromKafka(kafkaMsgs <-chan *sarama.ConsumerMessage, metricProducer chan<- []byte, maxBatchSize int, ackMsg ack, halt <-chan bool) {
+	batch := make([]byte, 0)
+	currentBatchSize := 0
+	timeout := time.After(500 * millisecond)
+	var msg *sarama.ConsumerMessage
+
+	for {
+		select {
+		case msg = <-kafkaMsgs:
+			if currentBatchSize != 0 {
+				batch = append(batch, '\n')
+			}
+
+			batch = append(batch, msg.Value...)
+			currentBatchSize++
+
+			if currentBatchSize == maxBatchSize {
+				metricProducer <- batch
+				currentBatchSize = 0
+				batch = make([]byte, 0)
+				ackMsg(msg)
+			}
+		case <-timeout:
+			if currentBatchSize != 0 {
+				metricProducer <- batch
+				currentBatchSize = 0
+				batch = make([]byte, 0)
+				ackMsg(msg)
+			}
+
+			timeout = time.After(500 * millisecond)
+		case <-halt:
+			if currentBatchSize != 0 {
+				metricProducer <- batch
+				ackMsg(msg)
+			}
+
+			return
+		}
+	}
+}
+
+func init() {
+	plugins.Add("kafka", func() plugins.Plugin {
+		return &Kafka{}
+	})
+}
diff --git a/plugins/kafka_consumer/kafka_consumer_integration_test.go b/plugins/kafka_consumer/kafka_consumer_integration_test.go
new file mode 100644
index 000000000..1541cb127
--- /dev/null
+++ b/plugins/kafka_consumer/kafka_consumer_integration_test.go
@@ -0,0 +1,62 @@
+package kafka_consumer
+
+import (
+	"fmt"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/Shopify/sarama"
+	"github.com/influxdb/telegraf/testutil"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestReadsMetricsFromKafka(t *testing.T) {
+	var zkPeers, brokerPeers []string
+
+	if len(os.Getenv("ZOOKEEPER_PEERS")) == 0 {
+		zkPeers = []string{"localhost:2181"}
+	} else {
+		zkPeers = strings.Split(os.Getenv("ZOOKEEPER_PEERS"), ",")
+	}
+
+	if len(os.Getenv("KAFKA_PEERS")) == 0 {
+		brokerPeers = []string{"localhost:9092"}
+	} else {
+		brokerPeers = strings.Split(os.Getenv("KAFKA_PEERS"), ",")
+	}
+
+	k := &Kafka{
+		ConsumerGroupName: "telegraf_test_consumers",
+		Topic:             fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix()),
+		ZookeeperPeers:    zkPeers,
+	}
+
+	msg := "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"
+	producer, err := sarama.NewSyncProducer(brokerPeers, nil)
+	require.NoError(t, err)
+	_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: k.Topic, Value: sarama.StringEncoder(msg)})
+	producer.Close()
+
+	var acc testutil.Accumulator
+
+	// Sanity check
+	assert.Equal(t, 0, len(acc.Points), "there should not be any points")
+
+	err = k.Gather(&acc)
+	require.NoError(t, err)
+
+	assert.Equal(t, 1, len(acc.Points), "there should be a single point")
+
+	point := acc.Points[0]
+	assert.Equal(t, "cpu_load_short", point.Measurement)
+	assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Values)
+	assert.Equal(t, map[string]string{
+		"host":      "server01",
+		"direction": "in",
+		"region":    "us-west",
+	}, point.Tags)
+	assert.Equal(t, time.Unix(0, 1422568543702900257), point.Time)
+}
diff --git a/plugins/kafka_consumer/kafka_consumer_test.go b/plugins/kafka_consumer/kafka_consumer_test.go
new file mode 100644
index 000000000..fa6ad4a97
--- /dev/null
+++ b/plugins/kafka_consumer/kafka_consumer_test.go
@@ -0,0 +1,95 @@
+package kafka_consumer
+
+import (
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/influxdb/telegraf/testutil"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"gopkg.in/Shopify/sarama.v1"
+)
+
+const testMsg = "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"
+
+func TestReadFromKafkaBatchesMsgsOnBatchSize(t *testing.T) {
+	halt := make(chan bool, 1)
+	metricChan := make(chan []byte, 1)
+	kafkaChan := make(chan *sarama.ConsumerMessage, 10)
+	for i := 0; i < 10; i++ {
+		kafkaChan <- saramaMsg(testMsg)
+	}
+
+	expectedBatch := strings.Repeat(testMsg+"\n", 9) + testMsg
+	readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error {
+		batch := <-metricChan
+		assert.Equal(t, expectedBatch, string(batch))
+
+		halt <- true
+
+		return nil
+	}, halt)
+}
+
+func TestReadFromKafkaBatchesMsgsOnTimeout(t *testing.T) {
+	halt := make(chan bool, 1)
+	metricChan := make(chan []byte, 1)
+	kafkaChan := make(chan *sarama.ConsumerMessage, 10)
+	for i := 0; i < 3; i++ {
+		kafkaChan <- saramaMsg(testMsg)
+	}
+
+	expectedBatch := strings.Repeat(testMsg+"\n", 2) + testMsg
+	readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error {
+		batch := <-metricChan
+		assert.Equal(t, expectedBatch, string(batch))
+
+		halt <- true
+
+		return nil
+	}, halt)
+}
+
+func TestEmitMetricsSendMetricsToAcc(t *testing.T) {
+	k := &Kafka{}
+	var acc testutil.Accumulator
+	testChan := make(chan []byte, 1)
+	testChan <- []byte(testMsg)
+
+	err := emitMetrics(k, &acc, testChan)
+	require.NoError(t, err)
+
+	assert.Equal(t, 1, len(acc.Points), "there should be a single point")
+
+	point := acc.Points[0]
+	assert.Equal(t, "cpu_load_short", point.Measurement)
+	assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Values)
+	assert.Equal(t, map[string]string{
+		"host":      "server01",
+		"direction": "in",
+		"region":    "us-west",
+	}, point.Tags)
+
+	assert.Equal(t, time.Unix(0, 1422568543702900257), point.Time)
+}
+
+func TestEmitMetricsTimesOut(t *testing.T) {
+	k := &Kafka{}
+	var acc testutil.Accumulator
+	testChan := make(chan []byte)
+
+	err := emitMetrics(k, &acc, testChan)
+	require.NoError(t, err)
+
+	assert.Equal(t, 0, len(acc.Points), "there should not be any points")
+}
+
+func saramaMsg(val string) *sarama.ConsumerMessage {
+	return &sarama.ConsumerMessage{
+		Key:       nil,
+		Value:     []byte(val),
+		Offset:    0,
+		Partition: 0,
+	}
+}
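
For readers who want to see the line-protocol assumption from PATCH 5/5 in isolation, here is a minimal, self-contained sketch. It is not part of the patch series: the `main` wrapper and the sample message are illustrative only, but the `tsdb.ParsePoints` call and the `Name()`/`Tags()`/`Fields()`/`Time()` accessors are the same ones `emitMetrics` in kafka_consumer.go relies on.

```go
package main

import (
	"fmt"
	"log"

	// Same parser package the plugin imports in kafka_consumer.go.
	"github.com/influxdb/influxdb/tsdb"
)

func main() {
	// One Kafka message in InfluxDB line protocol:
	// measurement,tag=value,... field=value timestamp
	msg := []byte("cpu_load_short,host=server01,region=us-west value=23422.0 1422568543702900257")

	// readFromKafka joins messages with '\n' into a batch; emitMetrics
	// then parses the whole batch with a single ParsePoints call.
	points, err := tsdb.ParsePoints(msg)
	if err != nil {
		log.Fatalf("message is not valid line protocol: %v", err)
	}

	for _, p := range points {
		fmt.Println(p.Name(), p.Tags(), p.Fields(), p.Time())
	}
}
```

Note the failure mode this implies: a single malformed message makes `emitMetrics` return the parse error, which `Gather` propagates, so producers writing to the topic need to emit well-formed line protocol.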