MQTT Consumer Input plugin

This commit is contained in:
Cameron Sparr
2016-02-09 15:03:46 -07:00
parent 6c353e8b8f
commit 8d0f50a6fd
16 changed files with 554 additions and 44 deletions

View File

@@ -1,4 +1,4 @@
# Kafka Consumer
# Kafka Consumer Input Plugin
The [Kafka](http://kafka.apache.org/) consumer plugin polls a specified Kafka
topic and adds messages to InfluxDB. The plugin assumes messages follow the
@@ -6,6 +6,29 @@ line protocol. [Consumer Group](http://godoc.org/github.com/wvanbergen/kafka/con
is used to talk to the Kafka cluster so multiple instances of telegraf can read
from the same topic in parallel.
## Configuration
```toml
# Read metrics from Kafka topic(s)
[[inputs.kafka_consumer]]
### topic(s) to consume
topics = ["telegraf"]
### an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"]
### the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
### Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### Offset (must be either "oldest" or "newest")
offset = "oldest"
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
data_format = "influx"
```
## Testing
Running integration tests requires running Zookeeper & Kafka. The following
@@ -16,9 +39,3 @@ To start Kafka & Zookeeper:
```
docker run -d -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`boot2docker ip || docker-machine ip <your_machine_name>` --env ADVERTISED_PORT=9092 spotify/kafka
```
To run tests:
```
go test
```

View File

@@ -19,8 +19,10 @@ type Kafka struct {
Topics []string
ZookeeperPeers []string
Consumer *consumergroup.ConsumerGroup
PointBuffer int
Offset string
MetricBuffer int
// TODO remove PointBuffer, legacy support
PointBuffer int
Offset string
parser parsers.Parser
@@ -30,7 +32,7 @@ type Kafka struct {
in <-chan *sarama.ConsumerMessage
// channel for all kafka consumer errors
errs <-chan *sarama.ConsumerError
// channel for all incoming parsed kafka points
// channel for all incoming parsed kafka metrics
metricC chan telegraf.Metric
done chan struct{}
@@ -46,8 +48,8 @@ var sampleConfig = `
zookeeper_peers = ["localhost:2181"]
### the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
### Maximum number of points to buffer between collection intervals
point_buffer = 100000
### Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### Offset (must be either "oldest" or "newest")
offset = "oldest"
@@ -104,10 +106,13 @@ func (k *Kafka) Start() error {
}
k.done = make(chan struct{})
if k.PointBuffer == 0 {
k.PointBuffer = 100000
if k.PointBuffer == 0 && k.MetricBuffer == 0 {
k.MetricBuffer = 100000
} else if k.PointBuffer > 0 {
// Legacy support of PointBuffer field TODO remove
k.MetricBuffer = k.PointBuffer
}
k.metricC = make(chan telegraf.Metric, k.PointBuffer)
k.metricC = make(chan telegraf.Metric, k.MetricBuffer)
// Start the kafka message reader
go k.receiver()
@@ -128,7 +133,7 @@ func (k *Kafka) receiver() {
case msg := <-k.in:
metrics, err := k.parser.Parse(msg.Value)
if err != nil {
log.Printf("Could not parse kafka message: %s, error: %s",
log.Printf("KAFKA PARSE ERROR\nmessage: %s\nerror: %s",
string(msg.Value), err.Error())
}
@@ -139,7 +144,7 @@ func (k *Kafka) receiver() {
continue
default:
log.Printf("Kafka Consumer buffer is full, dropping a metric." +
" You may want to increase the point_buffer setting")
" You may want to increase the metric_buffer setting")
}
}
@@ -166,10 +171,10 @@ func (k *Kafka) Stop() {
func (k *Kafka) Gather(acc telegraf.Accumulator) error {
k.Lock()
defer k.Unlock()
npoints := len(k.metricC)
for i := 0; i < npoints; i++ {
point := <-k.metricC
acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time())
nmetrics := len(k.metricC)
for i := 0; i < nmetrics; i++ {
metric := <-k.metricC
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
return nil
}