From d1f965ae30577bc6abc41c5544984dc5e91c08f5 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 26 Aug 2015 11:02:10 -0600 Subject: [PATCH] Kafka output producer, send telegraf metrics to Kafka brokers Closes #38 --- CHANGELOG.md | 1 + README.md | 31 +++++------ agent.go | 8 +++ cmd/telegraf/telegraf.go | 3 -- config.go | 2 +- outputs/all/all.go | 1 + outputs/datadog/datadog_test.go | 18 ++----- outputs/kafka/kafka.go | 91 +++++++++++++++++++++++++++++++++ outputs/kafka/kafka_test.go | 28 ++++++++++ testutil/testutil.go | 17 ++++++ 10 files changed, 167 insertions(+), 33 deletions(-) create mode 100644 outputs/kafka/kafka.go create mode 100644 outputs/kafka/kafka_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 2dfa674eb..71852ce96 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## v0.1.7 [unreleased] ### Features +- [#38](https://github.com/influxdb/telegraf/pull/38): Kafka output sink. - [#133](https://github.com/influxdb/telegraf/pull/133): Add plugin.Gather error logging. Thanks @nickscript0! - [#136](https://github.com/influxdb/telegraf/issues/136): Add a -usage flag for printing usage of a single plugin. - [#137](https://github.com/influxdb/telegraf/issues/137): Memcached: fix when a value contains a space diff --git a/README.md b/README.md index bcf06d319..de84f2a8c 100644 --- a/README.md +++ b/README.md @@ -103,21 +103,22 @@ at 192.168.59.103:8086, tagging measurements with dc="denver-1". It will output measurements at a 10s interval and will collect totalcpu & percpu data. ``` -[outputs] -[outputs.influxdb] -url = "http://192.168.59.103:8086" # required. -database = "telegraf" # required. - [tags] -dc = "denver-1" + dc = "denver-1" [agent] -interval = "10s" + interval = "10s" + +# OUTPUTS +[outputs] +[outputs.influxdb] + url = "http://192.168.59.103:8086" # required. + database = "telegraf" # required. # PLUGINS [cpu] -percpu = true -totalcpu = true + percpu = true + totalcpu = true ``` Below is how to configure `tagpass` parameters (added in 0.1.4) @@ -125,15 +126,15 @@ Below is how to configure `tagpass` parameters (added in 0.1.4) ``` # Don't collect CPU data for cpu6 & cpu7 [cpu.tagdrop] -cpu = [ "cpu6", "cpu7" ] + cpu = [ "cpu6", "cpu7" ] [disk] [disk.tagpass] -# tagpass conditions are OR, not AND. -# If the (filesystem is ext4 or xfs) OR (the path is /opt or /home) -# then the metric passes -fstype = [ "ext4", "xfs" ] -path = [ "/opt", "/home" ] + # tagpass conditions are OR, not AND. + # If the (filesystem is ext4 or xfs) OR (the path is /opt or /home) + # then the metric passes + fstype = [ "ext4", "xfs" ] + path = [ "/opt", "/home" ] ``` ## Supported Plugins diff --git a/agent.go b/agent.go index b35fae390..e5871b5b7 100644 --- a/agent.go +++ b/agent.go @@ -74,6 +74,9 @@ func (a *Agent) Connect() error { if err != nil { return err } + if a.Debug { + log.Printf("Successfully connected to output: %s\n", o.name) + } } return nil } @@ -160,6 +163,8 @@ func (a *Agent) LoadPlugins(pluginsFilter string) ([]string, error) { return names, nil } +// crankParallel runs the plugins that are using the same reporting interval +// as the telegraf agent. func (a *Agent) crankParallel() error { points := make(chan *BatchPoints, len(a.plugins)) @@ -203,6 +208,7 @@ func (a *Agent) crankParallel() error { return a.flush(&bp) } +// crank is mostly for test purposes. func (a *Agent) crank() error { var bp BatchPoints @@ -223,6 +229,8 @@ func (a *Agent) crank() error { return a.flush(&bp) } +// crankSeparate runs the plugins that have been configured with their own +// reporting interval. func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error { ticker := time.NewTicker(plugin.config.Interval) diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 38f323215..c7f863778 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -114,11 +114,8 @@ func main() { } shutdown := make(chan struct{}) - signals := make(chan os.Signal) - signal.Notify(signals, os.Interrupt) - go func() { <-signals close(shutdown) diff --git a/config.go b/config.go index 516fee1e5..19ebc00bf 100644 --- a/config.go +++ b/config.go @@ -356,6 +356,7 @@ var header = `# Telegraf configuration # debug = false # hostname = "prod3241" + ############################################################################### # OUTPUTS # ############################################################################### @@ -368,7 +369,6 @@ var header2 = ` ############################################################################### # PLUGINS # ############################################################################### - ` // PrintSampleConfig prints the sample config! diff --git a/outputs/all/all.go b/outputs/all/all.go index 0fb5f3723..36d11ea61 100644 --- a/outputs/all/all.go +++ b/outputs/all/all.go @@ -3,4 +3,5 @@ package all import ( _ "github.com/influxdb/telegraf/outputs/datadog" _ "github.com/influxdb/telegraf/outputs/influxdb" + _ "github.com/influxdb/telegraf/outputs/kafka" ) diff --git a/outputs/datadog/datadog_test.go b/outputs/datadog/datadog_test.go index 744afc99b..b5a7d3565 100644 --- a/outputs/datadog/datadog_test.go +++ b/outputs/datadog/datadog_test.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "github.com/influxdb/telegraf/testutil" + "github.com/influxdb/influxdb/client" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -25,18 +27,6 @@ func fakeDatadog() *Datadog { return d } -func testData() client.BatchPoints { - var bp client.BatchPoints - bp.Time = time.Now() - bp.Tags = map[string]string{"tag1": "value1"} - bp.Points = []client.Point{ - { - Fields: map[string]interface{}{"value": 1.0}, - }, - } - return bp -} - func TestUriOverride(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) @@ -48,7 +38,7 @@ func TestUriOverride(t *testing.T) { d.Apikey = "123456" err := d.Connect() require.NoError(t, err) - err = d.Write(testData()) + err = d.Write(testutil.MockBatchPoints()) require.NoError(t, err) } @@ -67,7 +57,7 @@ func TestBadStatusCode(t *testing.T) { d.Apikey = "123456" err := d.Connect() require.NoError(t, err) - err = d.Write(testData()) + err = d.Write(testutil.MockBatchPoints()) if err == nil { t.Errorf("error expected but none returned") } else { diff --git a/outputs/kafka/kafka.go b/outputs/kafka/kafka.go new file mode 100644 index 000000000..ac4d61164 --- /dev/null +++ b/outputs/kafka/kafka.go @@ -0,0 +1,91 @@ +package kafka + +import ( + "errors" + "fmt" + + "github.com/Shopify/sarama" + "github.com/influxdb/influxdb/client" + "github.com/influxdb/telegraf/outputs" +) + +type Kafka struct { + // Kafka brokers to send metrics to + Brokers []string + // Kafka topic + Topic string + + producer sarama.SyncProducer +} + +var sampleConfig = ` + # URLs of kafka brokers + brokers = ["localhost:9092"] + # Kafka topic for producer messages + topic = "telegraf" +` + +func (k *Kafka) Connect() error { + producer, err := sarama.NewSyncProducer(k.Brokers, nil) + if err != nil { + return err + } + k.producer = producer + return nil +} + +func (k *Kafka) Close() error { + return k.producer.Close() +} + +func (k *Kafka) SampleConfig() string { + return sampleConfig +} + +func (k *Kafka) Description() string { + return "Configuration for the Kafka server to send metrics to" +} + +func (k *Kafka) Write(bp client.BatchPoints) error { + if len(bp.Points) == 0 { + return nil + } + + for _, p := range bp.Points { + // Combine tags from Point and BatchPoints and grab the resulting + // line-protocol output string to write to Kafka + var value string + if p.Raw != "" { + value = p.Raw + } else { + for k, v := range bp.Tags { + if p.Tags == nil { + p.Tags = make(map[string]string, len(bp.Tags)) + } + p.Tags[k] = v + } + value = p.MarshalString() + } + + m := &sarama.ProducerMessage{ + Topic: k.Topic, + Value: sarama.StringEncoder(value), + } + if h, ok := p.Tags["host"]; ok { + m.Key = sarama.StringEncoder(h) + } + + _, _, err := k.producer.SendMessage(m) + if err != nil { + return errors.New(fmt.Sprintf("FAILED to send kafka message: %s\n", + err)) + } + } + return nil +} + +func init() { + outputs.Add("kafka", func() outputs.Output { + return &Kafka{} + }) +} diff --git a/outputs/kafka/kafka_test.go b/outputs/kafka/kafka_test.go new file mode 100644 index 000000000..e97bf1bb5 --- /dev/null +++ b/outputs/kafka/kafka_test.go @@ -0,0 +1,28 @@ +package kafka + +import ( + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestConnectAndWrite(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + brokers := []string{testutil.GetLocalHost() + ":9092"} + k := &Kafka{ + Brokers: brokers, + Topic: "Test", + } + + // Verify that we can connect to the Kafka broker + err := k.Connect() + require.NoError(t, err) + + // Verify that we can successfully write data to the kafka broker + err = k.Write(testutil.MockBatchPoints()) + require.NoError(t, err) +} diff --git a/testutil/testutil.go b/testutil/testutil.go index 91eb4b6b9..79a7dd544 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -4,6 +4,9 @@ import ( "net" "net/url" "os" + "time" + + "github.com/influxdb/influxdb/client" ) var localhost = "localhost" @@ -27,3 +30,17 @@ func GetLocalHost() string { } return localhost } + +// MockBatchPoints returns a mock BatchPoints object for using in unit tests +// of telegraf output sinks. +func MockBatchPoints() client.BatchPoints { + var bp client.BatchPoints + bp.Time = time.Now() + bp.Tags = map[string]string{"tag1": "value1"} + bp.Points = []client.Point{ + { + Fields: map[string]interface{}{"value": 1.0}, + }, + } + return bp +}