From 0193cbee513bbd66b61f36a533241b9b212758fb Mon Sep 17 00:00:00 2001 From: Nick Irvine Date: Tue, 11 Apr 2017 12:05:39 -0700 Subject: [PATCH] Add max_message_len in kafka_consumer input (#2636) --- CHANGELOG.md | 1 + plugins/inputs/kafka_consumer/README.md | 4 +++ .../inputs/kafka_consumer/kafka_consumer.go | 31 ++++++++++++------- .../kafka_consumer/kafka_consumer_test.go | 18 +++++++++++ 4 files changed, 43 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 12381152c..10934f7fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -66,6 +66,7 @@ be deprecated eventually. - [#2425](https://github.com/influxdata/telegraf/pull/2425): Support to include/exclude docker container labels as tags - [#1667](https://github.com/influxdata/telegraf/pull/1667): dmcache input plugin - [#2637](https://github.com/influxdata/telegraf/issues/2637): Add support for precision in http_listener +- [#2636](https://github.com/influxdata/telegraf/pull/2636): Add `message_len_max` option to `kafka_consumer` input ### Bugfixes diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index afdb51e32..6a95a7c54 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -28,6 +28,10 @@ from the same topic in parallel. ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" + + ## Maximum length of a message to consume, in bytes (default 0/unlimited); + ## larger messages are dropped + max_message_len = 65536 ``` ## Testing diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 6f1f4020b..2f6933db0 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -17,6 +17,7 @@ import ( type Kafka struct { ConsumerGroup string Topics []string + MaxMessageLen int ZookeeperPeers []string ZookeeperChroot string Consumer *consumergroup.ConsumerGroup @@ -58,10 +59,14 @@ var sampleConfig = ` offset = "oldest" ## Data format to consume. - ## Each data format has it's own unique set of configuration options, read + ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" + + ## Maximum length of a message to consume, in bytes (default 0/unlimited); + ## larger messages are dropped + max_message_len = 65536 ` func (k *Kafka) SampleConfig() string { @@ -130,17 +135,21 @@ func (k *Kafka) receiver() { return case err := <-k.errs: if err != nil { - k.acc.AddError(fmt.Errorf("Kafka Consumer Error: %s\n", err)) + k.acc.AddError(fmt.Errorf("Consumer Error: %s\n", err)) } case msg := <-k.in: - metrics, err := k.parser.Parse(msg.Value) - if err != nil { - k.acc.AddError(fmt.Errorf("E! Kafka Message Parse Error\nmessage: %s\nerror: %s", - string(msg.Value), err.Error())) - } - - for _, metric := range metrics { - k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) + if k.MaxMessageLen != 0 && len(msg.Value) > k.MaxMessageLen { + k.acc.AddError(fmt.Errorf("Message longer than max_message_len (%d > %d)", + len(msg.Value), k.MaxMessageLen)) + } else { + metrics, err := k.parser.Parse(msg.Value) + if err != nil { + k.acc.AddError(fmt.Errorf("Message Parse Error\nmessage: %s\nerror: %s", + string(msg.Value), err.Error())) + } + for _, metric := range metrics { + k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) + } } if !k.doNotCommitMsgs { @@ -159,7 +168,7 @@ func (k *Kafka) Stop() { defer k.Unlock() close(k.done) if err := k.Consumer.Close(); err != nil { - k.acc.AddError(fmt.Errorf("E! Error closing kafka consumer: %s\n", err.Error())) + k.acc.AddError(fmt.Errorf("Error closing consumer: %s\n", err.Error())) } } diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index e1c24adbe..04498261c 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -1,6 +1,7 @@ package kafka_consumer import ( + "strings" "testing" "github.com/influxdata/telegraf/plugins/parsers" @@ -62,6 +63,23 @@ func TestRunParserInvalidMsg(t *testing.T) { assert.Equal(t, acc.NFields(), 0) } +// Test that overlong messages are dropped +func TestDropOverlongMsg(t *testing.T) { + const maxMessageLen = 64 * 1024 + k, in := newTestKafka() + k.MaxMessageLen = maxMessageLen + acc := testutil.Accumulator{} + k.acc = &acc + defer close(k.done) + overlongMsg := strings.Repeat("v", maxMessageLen+1) + + go k.receiver() + in <- saramaMsg(overlongMsg) + acc.WaitError(1) + + assert.Equal(t, acc.NFields(), 0) +} + // Test that the parser parses kafka messages into points func TestRunParserAndGather(t *testing.T) { k, in := newTestKafka()