Add max_message_len in kafka_consumer input (#2636)

Nick Irvine 2017-04-11 12:05:39 -07:00 committed by Daniel Nelson
parent 8b4c3201a2
commit 198ef8de3a
4 changed files with 43 additions and 11 deletions


@@ -66,6 +66,7 @@ be deprecated eventually.
- [#2425](https://github.com/influxdata/telegraf/pull/2425): Support to include/exclude docker container labels as tags
- [#1667](https://github.com/influxdata/telegraf/pull/1667): dmcache input plugin
- [#2637](https://github.com/influxdata/telegraf/issues/2637): Add support for precision in http_listener
- [#2636](https://github.com/influxdata/telegraf/pull/2636): Add `max_message_len` option to `kafka_consumer` input
### Bugfixes


@@ -28,6 +28,10 @@ from the same topic in parallel.
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 65536
```
## Testing
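The config comments above describe the drop rule compactly; it reduces to a single length check, where a limit of 0 disables the check so every message is consumed. A minimal Go sketch of that semantics (the helper name is illustrative, not part of the plugin):

```go
// exceedsMaxMessageLen mirrors the documented max_message_len semantics:
// a limit of 0 means unlimited, so the comparison is skipped entirely.
func exceedsMaxMessageLen(msg []byte, maxLen int) bool {
	return maxLen != 0 && len(msg) > maxLen
}
```

With `max_message_len = 65536`, a 65537-byte message is dropped and an error is recorded; with the default of 0 the same message would be parsed normally.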


@@ -17,6 +17,7 @@ import (
type Kafka struct {
ConsumerGroup string
Topics []string
MaxMessageLen int
ZookeeperPeers []string
ZookeeperChroot string
Consumer *consumergroup.ConsumerGroup
@@ -58,10 +59,14 @@ var sampleConfig = `
offset = "oldest"
## Data format to consume.
- ## Each data format has it's own unique set of configuration options, read
+ ## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 65536
`
func (k *Kafka) SampleConfig() string {
@@ -130,18 +135,22 @@ func (k *Kafka) receiver() {
return
case err := <-k.errs:
if err != nil {
k.acc.AddError(fmt.Errorf("Kafka Consumer Error: %s\n", err))
k.acc.AddError(fmt.Errorf("Consumer Error: %s\n", err))
}
case msg := <-k.in:
if k.MaxMessageLen != 0 && len(msg.Value) > k.MaxMessageLen {
k.acc.AddError(fmt.Errorf("Message longer than max_message_len (%d > %d)",
len(msg.Value), k.MaxMessageLen))
} else {
metrics, err := k.parser.Parse(msg.Value)
if err != nil {
k.acc.AddError(fmt.Errorf("E! Kafka Message Parse Error\nmessage: %s\nerror: %s",
k.acc.AddError(fmt.Errorf("Message Parse Error\nmessage: %s\nerror: %s",
string(msg.Value), err.Error()))
}
for _, metric := range metrics {
k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
}
if !k.doNotCommitMsgs {
// TODO(cam) this locking can be removed if this PR gets merged:
@@ -159,7 +168,7 @@ func (k *Kafka) Stop() {
defer k.Unlock()
close(k.done)
if err := k.Consumer.Close(); err != nil {
k.acc.AddError(fmt.Errorf("E! Error closing kafka consumer: %s\n", err.Error()))
k.acc.AddError(fmt.Errorf("Error closing consumer: %s\n", err.Error()))
}
}
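Note that receiver() and Stop() lean on several unexported fields that the struct hunk above omits. A plausible sketch of those fields, with types inferred from how this diff uses them (the file's actual declarations may differ):

```go
import (
	"sync"

	"github.com/Shopify/sarama"
	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/parsers"
)

// Assumed shape of the unexported Kafka fields, inferred from this diff.
type Kafka struct {
	// ... exported fields shown in the struct hunk above ...

	parser parsers.Parser       // k.parser.Parse(msg.Value)
	acc    telegraf.Accumulator // k.acc.AddError / k.acc.AddFields

	in   <-chan *sarama.ConsumerMessage // case msg := <-k.in
	errs <-chan error                   // case err := <-k.errs
	done chan struct{}                  // closed by Stop() to end receiver()

	doNotCommitMsgs bool // tests set this to skip offset commits

	sync.Mutex // Stop() holds the lock around Consumer.Close()
}
```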


@@ -1,6 +1,7 @@
package kafka_consumer
import (
"strings"
"testing"
"github.com/influxdata/telegraf/plugins/parsers"
@@ -62,6 +63,23 @@ func TestRunParserInvalidMsg(t *testing.T) {
assert.Equal(t, acc.NFields(), 0)
}
// Test that overlong messages are dropped
func TestDropOverlongMsg(t *testing.T) {
const maxMessageLen = 64 * 1024
k, in := newTestKafka()
k.MaxMessageLen = maxMessageLen
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
overlongMsg := strings.Repeat("v", maxMessageLen+1)
go k.receiver()
in <- saramaMsg(overlongMsg)
acc.WaitError(1)
assert.Equal(t, acc.NFields(), 0)
}
// Test that the parser parses kafka messages into points
func TestRunParserAndGather(t *testing.T) {
k, in := newTestKafka()
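The tests above call two helpers that this excerpt does not show: newTestKafka(), which returns a Kafka instance wired to an in-memory message channel, and saramaMsg(), which wraps a string in a consumer message. A plausible sketch of saramaMsg, assuming it only populates Value (the Key, Offset, and Partition values here are guesses):

```go
import "github.com/Shopify/sarama"

// saramaMsg is sketched from its call sites above; the real helper lives
// elsewhere in this test file and may set additional fields.
func saramaMsg(val string) *sarama.ConsumerMessage {
	return &sarama.ConsumerMessage{
		Key:       nil,
		Value:     []byte(val),
		Offset:    0,
		Partition: 0,
	}
}
```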