Remove outputs blocking inputs when output is slow (#4938)

Daniel Nelson
2018-11-05 13:34:28 -08:00
committed by GitHub
parent 74667cd681
commit 6e5c2f8bb6
59 changed files with 3615 additions and 2189 deletions


@@ -133,7 +133,6 @@ func (m *BasicStats) Add(in telegraf.Metric) {
}
func (m *BasicStats) Push(acc telegraf.Accumulator) {
config := getConfiguredStats(m)
for _, aggregate := range m.cache {


@@ -13,7 +13,6 @@ For an introduction to AMQP see:
The following defaults are known to work with RabbitMQ:
```toml
# AMQP consumer plugin
[[inputs.amqp_consumer]]
## Broker to consume from.
## deprecated in 1.7; use the brokers option
@@ -46,16 +45,26 @@ The following defaults are known to work with RabbitMQ:
## AMQP queue name
queue = "telegraf"
## AMQP queue durability can be "transient" or "durable".
queue_durability = "durable"
## Binding Key
binding_key = "#"
## Maximum number of messages server should give to the worker.
# prefetch_count = 50
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html


@@ -1,6 +1,7 @@
package amqp_consumer
import (
"context"
"errors"
"fmt"
"log"
@@ -9,25 +10,32 @@ import (
"sync"
"time"
"github.com/streadway/amqp"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/streadway/amqp"
)
const (
defaultMaxUndeliveredMessages = 1000
)
type empty struct{}
type semaphore chan empty
// AMQPConsumer is the top level struct for this plugin
type AMQPConsumer struct {
URL string `toml:"url"` // deprecated in 1.7; use brokers
Brokers []string `toml:"brokers"`
Username string `toml:"username"`
Password string `toml:"password"`
Exchange string `toml:"exchange"`
ExchangeType string `toml:"exchange_type"`
ExchangeDurability string `toml:"exchange_durability"`
ExchangePassive bool `toml:"exchange_passive"`
ExchangeArguments map[string]string `toml:"exchange_arguments"`
URL string `toml:"url"` // deprecated in 1.7; use brokers
Brokers []string `toml:"brokers"`
Username string `toml:"username"`
Password string `toml:"password"`
Exchange string `toml:"exchange"`
ExchangeType string `toml:"exchange_type"`
ExchangeDurability string `toml:"exchange_durability"`
ExchangePassive bool `toml:"exchange_passive"`
ExchangeArguments map[string]string `toml:"exchange_arguments"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
// Queue Name
Queue string `toml:"queue"`
@@ -44,9 +52,12 @@ type AMQPConsumer struct {
AuthMethod string
tls.ClientConfig
deliveries map[telegraf.TrackingID]amqp.Delivery
parser parsers.Parser
conn *amqp.Connection
wg *sync.WaitGroup
cancel context.CancelFunc
}
type externalAuth struct{}
@@ -114,6 +125,16 @@ func (a *AMQPConsumer) SampleConfig() string {
## Maximum number of messages server should give to the worker.
# prefetch_count = 50
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
@@ -185,9 +206,15 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
return err
}
ctx, cancel := context.WithCancel(context.Background())
a.cancel = cancel
a.wg = &sync.WaitGroup{}
a.wg.Add(1)
go a.process(msgs, acc)
go func() {
defer a.wg.Done()
a.process(ctx, msgs, acc)
}()
go func() {
for {
@@ -196,7 +223,7 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
break
}
log.Printf("I! AMQP consumer connection closed: %s; trying to reconnect", err)
log.Printf("I! [inputs.amqp_consumer] connection closed: %s; trying to reconnect", err)
for {
msgs, err := a.connect(amqpConf)
if err != nil {
@@ -206,7 +233,10 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
}
a.wg.Add(1)
go a.process(msgs, acc)
go func() {
defer a.wg.Done()
a.process(ctx, msgs, acc)
}()
break
}
}
@@ -224,14 +254,14 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
p := rand.Perm(len(brokers))
for _, n := range p {
broker := brokers[n]
log.Printf("D! [amqp_consumer] connecting to %q", broker)
log.Printf("D! [inputs.amqp_consumer] connecting to %q", broker)
conn, err := amqp.DialConfig(broker, *amqpConf)
if err == nil {
a.conn = conn
log.Printf("D! [amqp_consumer] connected to %q", broker)
log.Printf("D! [inputs.amqp_consumer] connected to %q", broker)
break
}
log.Printf("D! [amqp_consumer] error connecting to %q", broker)
log.Printf("D! [inputs.amqp_consumer] error connecting to %q", broker)
}
if a.conn == nil {
@@ -320,7 +350,6 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
return nil, fmt.Errorf("Failed establishing connection to queue: %s", err)
}
log.Println("I! Started AMQP consumer")
return msgs, err
}
@@ -361,42 +390,101 @@ func declareExchange(
}
// Read messages from queue and add them to the Accumulator
func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) {
defer a.wg.Done()
for d := range msgs {
metrics, err := a.parser.Parse(d.Body)
if err != nil {
log.Printf("E! %v: error parsing metric - %v", err, string(d.Body))
} else {
for _, m := range metrics {
acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
func (a *AMQPConsumer) process(ctx context.Context, msgs <-chan amqp.Delivery, ac telegraf.Accumulator) {
a.deliveries = make(map[telegraf.TrackingID]amqp.Delivery)
acc := ac.WithTracking(a.MaxUndeliveredMessages)
sem := make(semaphore, a.MaxUndeliveredMessages)
for {
select {
case <-ctx.Done():
return
case track := <-acc.Delivered():
if a.onDelivery(track) {
<-sem
}
case sem <- empty{}:
select {
case <-ctx.Done():
return
case track := <-acc.Delivered():
if a.onDelivery(track) {
<-sem
<-sem
}
case d, ok := <-msgs:
if !ok {
return
}
err := a.onMessage(acc, d)
if err != nil {
acc.AddError(err)
<-sem
}
}
}
d.Ack(false)
}
log.Printf("I! AMQP consumer queue closed")
}
func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delivery) error {
metrics, err := a.parser.Parse(d.Body)
if err != nil {
return err
}
id := acc.AddTrackingMetricGroup(metrics)
a.deliveries[id] = d
return nil
}
func (a *AMQPConsumer) onDelivery(track telegraf.DeliveryInfo) bool {
delivery, ok := a.deliveries[track.ID()]
if !ok {
// Added by a previous connection
return false
}
if track.Delivered() {
err := delivery.Ack(false)
if err != nil {
log.Printf("E! [inputs.amqp_consumer] Unable to ack written delivery: %d: %v",
delivery.DeliveryTag, err)
a.conn.Close()
}
} else {
err := delivery.Reject(false)
if err != nil {
log.Printf("E! [inputs.amqp_consumer] Unable to reject failed delivery: %d: %v",
delivery.DeliveryTag, err)
a.conn.Close()
}
}
delete(a.deliveries, track.ID())
return true
}
func (a *AMQPConsumer) Stop() {
a.cancel()
a.wg.Wait()
err := a.conn.Close()
if err != nil && err != amqp.ErrClosed {
log.Printf("E! Error closing AMQP connection: %s", err)
log.Printf("E! [inputs.amqp_consumer] Error closing AMQP connection: %s", err)
return
}
a.wg.Wait()
log.Println("I! Stopped AMQP service")
}
func init() {
inputs.Add("amqp_consumer", func() telegraf.Input {
return &AMQPConsumer{
URL: DefaultBroker,
AuthMethod: DefaultAuthMethod,
ExchangeType: DefaultExchangeType,
ExchangeDurability: DefaultExchangeDurability,
QueueDurability: DefaultQueueDurability,
PrefetchCount: DefaultPrefetchCount,
URL: DefaultBroker,
AuthMethod: DefaultAuthMethod,
ExchangeType: DefaultExchangeType,
ExchangeDurability: DefaultExchangeDurability,
QueueDurability: DefaultQueueDurability,
PrefetchCount: DefaultPrefetchCount,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
}
})
}
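
The same delivery-tracking pattern recurs in each consumer touched by this change. The sketch below is illustrative only: it relies on the `telegraf.TrackingAccumulator` methods used in the diff (`WithTracking`, `AddTrackingMetricGroup`, `Delivered`) but stands in a hypothetical `consume` function with a generic channel of metric groups instead of any particular broker client.

```go
package consumerexample

import (
	"context"

	"github.com/influxdata/telegraf"
)

type empty struct{}
type semaphore chan empty

// consume bounds the number of metric groups that have been handed to the
// accumulator but not yet written by an output. Each undelivered group holds
// one semaphore slot; the slot is released when the outputs report delivery.
func consume(ctx context.Context, ac telegraf.Accumulator, msgs <-chan []telegraf.Metric, maxUndelivered int) {
	acc := ac.WithTracking(maxUndelivered)
	sem := make(semaphore, maxUndelivered)
	pending := make(map[telegraf.TrackingID][]telegraf.Metric)

	for {
		select {
		case <-ctx.Done():
			return
		case track := <-acc.Delivered():
			// An earlier group was written (or dropped) by the outputs;
			// this is where a real plugin would ack or commit the source
			// message before freeing its slot.
			delete(pending, track.ID())
			<-sem
		case sem <- empty{}:
			// A slot is reserved; wait for either a new message or a delivery.
			select {
			case <-ctx.Done():
				return
			case track := <-acc.Delivered():
				delete(pending, track.ID())
				<-sem // the delivered group's slot
				<-sem // the slot reserved above
			case metrics, ok := <-msgs:
				if !ok {
					return
				}
				id := acc.AddTrackingMetricGroup(metrics)
				pending[id] = metrics
			}
		}
	}
}
```

The key point is the nested select: a slot is reserved before a message is read, and when a delivery arrives while that slot is held, the semaphore is drained twice, once for the delivered group and once for the reservation.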


@@ -18,52 +18,54 @@ plugin.
memstats are taken from the Go runtime: https://golang.org/pkg/runtime/#MemStats
- internal\_memstats
- alloc\_bytes
- internal_memstats
- alloc_bytes
- frees
- heap\_alloc\_bytes
- heap\_idle\_bytes
- heap\_in\_use\_bytes
- heap\_objects\_bytes
- heap\_released\_bytes
- heap\_sys\_bytes
- heap_alloc_bytes
- heap_idle_bytes
- heap_in_use_bytes
- heap_objects_bytes
- heap_released_bytes
- heap_sys_bytes
- mallocs
- num\_gc
- pointer\_lookups
- sys\_bytes
- total\_alloc\_bytes
- num_gc
- pointer_lookups
- sys_bytes
- total_alloc_bytes
agent stats collect aggregate stats on all telegraf plugins.
- internal\_agent
- gather\_errors
- metrics\_dropped
- metrics\_gathered
- metrics\_written
- internal_agent
- gather_errors
- metrics_dropped
- metrics_gathered
- metrics_written
internal\_gather stats collect aggregate stats on all input plugins
internal_gather stats collect aggregate stats on all input plugins
that are of the same input type. They are tagged with `input=<plugin_name>`.
- internal\_gather
- gather\_time\_ns
- metrics\_gathered
- internal_gather
- gather_time_ns
- metrics_gathered
internal\_write stats collect aggregate stats on all output plugins
internal_write stats collect aggregate stats on all output plugins
that are of the same input type. They are tagged with `output=<plugin_name>`.
- internal\_write
- buffer\_limit
- buffer\_size
- metrics\_written
- metrics\_filtered
- write\_time\_ns
- internal_write
- buffer_limit
- buffer_size
- metrics_added
- metrics_written
- metrics_dropped
- metrics_filtered
- write_time_ns
internal\_\<plugin\_name\> are metrics which are defined on a per-plugin basis, and
internal_<plugin_name> are metrics which are defined on a per-plugin basis, and
usually contain tags which differentiate each instance of a particular type of
plugin.
- internal\_\<plugin\_name\>
- internal_<plugin_name>
- individual plugin-specific fields, such as requests counts.
### Tags:
@@ -76,7 +78,7 @@ to each particular plugin.
```
internal_memstats,host=tyrion alloc_bytes=4457408i,sys_bytes=10590456i,pointer_lookups=7i,mallocs=17642i,frees=7473i,heap_sys_bytes=6848512i,heap_idle_bytes=1368064i,heap_in_use_bytes=5480448i,heap_released_bytes=0i,total_alloc_bytes=6875560i,heap_alloc_bytes=4457408i,heap_objects_bytes=10169i,num_gc=2i 1480682800000000000
internal_agent,host=tyrion metrics_written=18i,metrics_dropped=0i,metrics_gathered=19i,gather_errors=0i 1480682800000000000
internal_write,output=file,host=tyrion buffer_limit=10000i,write_time_ns=636609i,metrics_written=18i,buffer_size=0i 1480682800000000000
internal_write,output=file,host=tyrion buffer_limit=10000i,write_time_ns=636609i,metrics_added=18i,metrics_written=18i,buffer_size=0i 1480682800000000000
internal_gather,input=internal,host=tyrion metrics_gathered=19i,gather_time_ns=442114i 1480682800000000000
internal_gather,input=http_listener,host=tyrion metrics_gathered=0i,gather_time_ns=167285i 1480682800000000000
internal_http_listener,address=:8186,host=tyrion queries_received=0i,writes_received=0i,requests_received=0i,buffers_created=0i,requests_served=0i,pings_received=0i,bytes_received=0i,not_founds_served=0i,pings_served=0i,queries_served=0i,writes_served=0i 1480682800000000000


@@ -1,18 +1,14 @@
# Kafka Consumer Input Plugin
The [Kafka](http://kafka.apache.org/) consumer plugin polls a specified Kafka
topic and adds messages to InfluxDB. The plugin assumes messages follow the
line protocol. [Consumer Group](http://godoc.org/github.com/wvanbergen/kafka/consumergroup)
is used to talk to the Kafka cluster so multiple instances of telegraf can read
from the same topic in parallel.
The [Kafka][kafka] consumer plugin reads from Kafka
and creates metrics using one of the supported [input data formats][].
For old kafka version (< 0.8), please use the kafka_consumer_legacy input plugin
For old kafka version (< 0.8), please use the [kafka_consumer_legacy][] input plugin
and use the old zookeeper connection method.
## Configuration
### Configuration
```toml
# Read metrics from Kafka topic(s)
[[inputs.kafka_consumer]]
## kafka servers
brokers = ["localhost:9092"]
@@ -44,18 +40,27 @@ and use the old zookeeper connection method.
## Offset (must be either "oldest" or "newest")
offset = "oldest"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 1000000
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 1000000
```
## Testing
Running integration tests requires running Zookeeper & Kafka. See Makefile
for kafka container command.
[kafka]: https://kafka.apache.org
[kafka_consumer_legacy]: /plugins/inputs/kafka_consumer_legacy/README.md
[input data formats]: /docs/DATA_FORMATS_INPUT.md


@@ -1,55 +1,54 @@
package kafka_consumer
import (
"context"
"fmt"
"log"
"strings"
"sync"
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
)
const (
defaultMaxUndeliveredMessages = 1000
)
type empty struct{}
type semaphore chan empty
type Consumer interface {
Errors() <-chan error
Messages() <-chan *sarama.ConsumerMessage
MarkOffset(msg *sarama.ConsumerMessage, metadata string)
Close() error
}
type Kafka struct {
ConsumerGroup string
ClientID string `toml:"client_id"`
Topics []string
Brokers []string
MaxMessageLen int
Version string `toml:"version"`
Cluster *cluster.Consumer
ConsumerGroup string `toml:"consumer_group"`
ClientID string `toml:"client_id"`
Topics []string `toml:"topics"`
Brokers []string `toml:"brokers"`
MaxMessageLen int `toml:"max_message_len"`
Version string `toml:"version"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
Offset string `toml:"offset"`
SASLUsername string `toml:"sasl_username"`
SASLPassword string `toml:"sasl_password"`
tls.ClientConfig
// SASL Username
SASLUsername string `toml:"sasl_username"`
// SASL Password
SASLPassword string `toml:"sasl_password"`
cluster Consumer
parser parsers.Parser
wg *sync.WaitGroup
cancel context.CancelFunc
// Legacy metric buffer support
MetricBuffer int
// TODO remove PointBuffer, legacy support
PointBuffer int
Offset string
parser parsers.Parser
sync.Mutex
// channel for all incoming kafka messages
in <-chan *sarama.ConsumerMessage
// channel for all kafka consumer errors
errs <-chan error
done chan struct{}
// keep the accumulator internally:
acc telegraf.Accumulator
// Unconfirmed messages
messages map[telegraf.TrackingID]*sarama.ConsumerMessage
// doNotCommitMsgs tells the parser not to call CommitUpTo on the consumer
// this is mostly for test purposes, but there may be a use-case for it later.
@@ -86,16 +85,25 @@ var sampleConfig = `
consumer_group = "telegraf_metrics_consumers"
## Offset (must be either "oldest" or "newest")
offset = "oldest"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 1000000
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 1000000
`
func (k *Kafka) SampleConfig() string {
@@ -111,12 +119,8 @@ func (k *Kafka) SetParser(parser parsers.Parser) {
}
func (k *Kafka) Start(acc telegraf.Accumulator) error {
k.Lock()
defer k.Unlock()
var clusterErr error
k.acc = acc
config := cluster.NewConfig()
if k.Version != "" {
@@ -159,13 +163,13 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error {
case "newest":
config.Consumer.Offsets.Initial = sarama.OffsetNewest
default:
log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n",
log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'",
k.Offset)
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}
if k.Cluster == nil {
k.Cluster, clusterErr = cluster.NewConsumer(
if k.cluster == nil {
k.cluster, clusterErr = cluster.NewConsumer(
k.Brokers,
k.ConsumerGroup,
k.Topics,
@@ -173,67 +177,110 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error {
)
if clusterErr != nil {
log.Printf("E! Error when creating Kafka Consumer, brokers: %v, topics: %v\n",
log.Printf("E! Error when creating Kafka Consumer, brokers: %v, topics: %v",
k.Brokers, k.Topics)
return clusterErr
}
// Setup message and error channels
k.in = k.Cluster.Messages()
k.errs = k.Cluster.Errors()
}
k.done = make(chan struct{})
// Start the kafka message reader
go k.receiver()
log.Printf("I! Started the kafka consumer service, brokers: %v, topics: %v\n",
ctx, cancel := context.WithCancel(context.Background())
k.cancel = cancel
// Start consumer goroutine
k.wg = &sync.WaitGroup{}
k.wg.Add(1)
go func() {
defer k.wg.Done()
k.receiver(ctx, acc)
}()
log.Printf("I! Started the kafka consumer service, brokers: %v, topics: %v",
k.Brokers, k.Topics)
return nil
}
// receiver() reads all incoming messages from the consumer, and parses them into
// influxdb metric points.
func (k *Kafka) receiver() {
func (k *Kafka) receiver(ctx context.Context, ac telegraf.Accumulator) {
k.messages = make(map[telegraf.TrackingID]*sarama.ConsumerMessage)
acc := ac.WithTracking(k.MaxUndeliveredMessages)
sem := make(semaphore, k.MaxUndeliveredMessages)
for {
select {
case <-k.done:
case <-ctx.Done():
return
case err := <-k.errs:
if err != nil {
k.acc.AddError(fmt.Errorf("Consumer Error: %s\n", err))
}
case msg := <-k.in:
if k.MaxMessageLen != 0 && len(msg.Value) > k.MaxMessageLen {
k.acc.AddError(fmt.Errorf("Message longer than max_message_len (%d > %d)",
len(msg.Value), k.MaxMessageLen))
} else {
metrics, err := k.parser.Parse(msg.Value)
case track := <-acc.Delivered():
<-sem
k.onDelivery(track)
case err := <-k.cluster.Errors():
acc.AddError(err)
case sem <- empty{}:
select {
case <-ctx.Done():
return
case track := <-acc.Delivered():
// Once for the delivered message, once to leave the case
<-sem
<-sem
k.onDelivery(track)
case err := <-k.cluster.Errors():
<-sem
acc.AddError(err)
case msg := <-k.cluster.Messages():
err := k.onMessage(acc, msg)
if err != nil {
k.acc.AddError(fmt.Errorf("Message Parse Error\nmessage: %s\nerror: %s",
string(msg.Value), err.Error()))
acc.AddError(err)
<-sem
}
for _, metric := range metrics {
k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
}
if !k.doNotCommitMsgs {
// TODO(cam) this locking can be removed if this PR gets merged:
// https://github.com/wvanbergen/kafka/pull/84
k.Lock()
k.Cluster.MarkOffset(msg, "")
k.Unlock()
}
}
}
}
func (k *Kafka) markOffset(msg *sarama.ConsumerMessage) {
if !k.doNotCommitMsgs {
k.cluster.MarkOffset(msg, "")
}
}
func (k *Kafka) onMessage(acc telegraf.TrackingAccumulator, msg *sarama.ConsumerMessage) error {
if k.MaxMessageLen != 0 && len(msg.Value) > k.MaxMessageLen {
k.markOffset(msg)
return fmt.Errorf("Message longer than max_message_len (%d > %d)",
len(msg.Value), k.MaxMessageLen)
}
metrics, err := k.parser.Parse(msg.Value)
if err != nil {
return err
}
id := acc.AddTrackingMetricGroup(metrics)
k.messages[id] = msg
return nil
}
func (k *Kafka) onDelivery(track telegraf.DeliveryInfo) {
msg, ok := k.messages[track.ID()]
if !ok {
log.Printf("E! [inputs.kafka_consumer] Could not mark message delivered: %d", track.ID())
}
if track.Delivered() {
k.markOffset(msg)
}
delete(k.messages, track.ID())
}
func (k *Kafka) Stop() {
k.Lock()
defer k.Unlock()
close(k.done)
if err := k.Cluster.Close(); err != nil {
k.acc.AddError(fmt.Errorf("Error closing consumer: %s\n", err.Error()))
k.cancel()
k.wg.Wait()
if err := k.cluster.Close(); err != nil {
log.Printf("E! [inputs.kafka_consumer] Error closing consumer: %v", err)
}
}
@@ -243,6 +290,8 @@ func (k *Kafka) Gather(acc telegraf.Accumulator) error {
func init() {
inputs.Add("kafka_consumer", func() telegraf.Input {
return &Kafka{}
return &Kafka{
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
}
})
}


@@ -38,7 +38,6 @@ func TestReadsMetricsFromKafka(t *testing.T) {
ConsumerGroup: "telegraf_test_consumers",
Topics: []string{testTopic},
Brokers: brokerPeers,
PointBuffer: 100000,
Offset: "oldest",
}
p, _ := parsers.NewInfluxParser()


@@ -1,13 +1,14 @@
package kafka_consumer
import (
"context"
"strings"
"testing"
"github.com/Shopify/sarama"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
)
@@ -18,31 +19,57 @@ const (
invalidMsg = "cpu_load_short,host=server01 1422568543702900257\n"
)
func newTestKafka() (*Kafka, chan *sarama.ConsumerMessage) {
in := make(chan *sarama.ConsumerMessage, 1000)
k := Kafka{
ConsumerGroup: "test",
Topics: []string{"telegraf"},
Brokers: []string{"localhost:9092"},
Offset: "oldest",
in: in,
doNotCommitMsgs: true,
errs: make(chan error, 1000),
done: make(chan struct{}),
type TestConsumer struct {
errors chan error
messages chan *sarama.ConsumerMessage
}
func (c *TestConsumer) Errors() <-chan error {
return c.errors
}
func (c *TestConsumer) Messages() <-chan *sarama.ConsumerMessage {
return c.messages
}
func (c *TestConsumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
}
func (c *TestConsumer) Close() error {
return nil
}
func (c *TestConsumer) Inject(msg *sarama.ConsumerMessage) {
c.messages <- msg
}
func newTestKafka() (*Kafka, *TestConsumer) {
consumer := &TestConsumer{
errors: make(chan error),
messages: make(chan *sarama.ConsumerMessage, 1000),
}
return &k, in
k := Kafka{
cluster: consumer,
ConsumerGroup: "test",
Topics: []string{"telegraf"},
Brokers: []string{"localhost:9092"},
Offset: "oldest",
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
doNotCommitMsgs: true,
messages: make(map[telegraf.TrackingID]*sarama.ConsumerMessage),
}
return &k, consumer
}
// Test that the parser parses kafka messages into points
func TestRunParser(t *testing.T) {
k, in := newTestKafka()
k, consumer := newTestKafka()
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
ctx := context.Background()
k.parser, _ = parsers.NewInfluxParser()
go k.receiver()
in <- saramaMsg(testMsg)
go k.receiver(ctx, &acc)
consumer.Inject(saramaMsg(testMsg))
acc.Wait(1)
assert.Equal(t, acc.NFields(), 1)
@@ -50,14 +77,13 @@ func TestRunParser(t *testing.T) {
// Test that the parser ignores invalid messages
func TestRunParserInvalidMsg(t *testing.T) {
k, in := newTestKafka()
k, consumer := newTestKafka()
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
ctx := context.Background()
k.parser, _ = parsers.NewInfluxParser()
go k.receiver()
in <- saramaMsg(invalidMsg)
go k.receiver(ctx, &acc)
consumer.Inject(saramaMsg(invalidMsg))
acc.WaitError(1)
assert.Equal(t, acc.NFields(), 0)
@@ -66,15 +92,14 @@ func TestRunParserInvalidMsg(t *testing.T) {
// Test that overlong messages are dropped
func TestDropOverlongMsg(t *testing.T) {
const maxMessageLen = 64 * 1024
k, in := newTestKafka()
k, consumer := newTestKafka()
k.MaxMessageLen = maxMessageLen
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
ctx := context.Background()
overlongMsg := strings.Repeat("v", maxMessageLen+1)
go k.receiver()
in <- saramaMsg(overlongMsg)
go k.receiver(ctx, &acc)
consumer.Inject(saramaMsg(overlongMsg))
acc.WaitError(1)
assert.Equal(t, acc.NFields(), 0)
@@ -82,14 +107,13 @@ func TestDropOverlongMsg(t *testing.T) {
// Test that the parser parses kafka messages into points
func TestRunParserAndGather(t *testing.T) {
k, in := newTestKafka()
k, consumer := newTestKafka()
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
ctx := context.Background()
k.parser, _ = parsers.NewInfluxParser()
go k.receiver()
in <- saramaMsg(testMsg)
go k.receiver(ctx, &acc)
consumer.Inject(saramaMsg(testMsg))
acc.Wait(1)
acc.GatherError(k.Gather)
@@ -101,14 +125,13 @@ func TestRunParserAndGather(t *testing.T) {
// Test that the parser parses kafka messages into points
func TestRunParserAndGatherGraphite(t *testing.T) {
k, in := newTestKafka()
k, consumer := newTestKafka()
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
ctx := context.Background()
k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
go k.receiver()
in <- saramaMsg(testMsgGraphite)
go k.receiver(ctx, &acc)
consumer.Inject(saramaMsg(testMsgGraphite))
acc.Wait(1)
acc.GatherError(k.Gather)
@@ -120,17 +143,16 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
// Test that the parser parses kafka messages into points
func TestRunParserAndGatherJSON(t *testing.T) {
k, in := newTestKafka()
k, consumer := newTestKafka()
acc := testutil.Accumulator{}
k.acc = &acc
defer close(k.done)
ctx := context.Background()
k.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "kafka_json_test",
})
go k.receiver()
in <- saramaMsg(testMsgJSON)
go k.receiver(ctx, &acc)
consumer.Inject(saramaMsg(testMsgJSON))
acc.Wait(1)
acc.GatherError(k.Gather)


@@ -1,14 +1,11 @@
# MQTT Consumer Input Plugin
The [MQTT](http://mqtt.org/) consumer plugin reads from
specified MQTT topics and adds messages to InfluxDB.
The plugin expects messages in the
[Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).
The [MQTT][mqtt] consumer plugin reads from the specified MQTT topics
and creates metrics using one of the supported [input data formats][].
### Configuration:
```toml
# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
## MQTT broker URLs to be used. The format should be scheme://host:port,
## schema can be tcp, ssl, or ws.
@@ -26,6 +23,16 @@ The plugin expects messages in the
## Connection timeout for initial connection in seconds
connection_timeout = "30s"
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Topics to subscribe to
topics = [
"telegraf/host01/cpu",
@@ -62,3 +69,6 @@ The plugin expects messages in the
- All measurements are tagged with the incoming topic, ie
`topic=telegraf/host01/cpu`
[mqtt]: https://mqtt.org
[input data formats]: /docs/DATA_FORMATS_INPUT.md


@@ -1,25 +1,31 @@
package mqtt_consumer
import (
"context"
"errors"
"fmt"
"log"
"strings"
"time"
"github.com/eclipse/paho.mqtt.golang"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/eclipse/paho.mqtt.golang"
)
// 30 Seconds is the default used by paho.mqtt.golang
var defaultConnectionTimeout = internal.Duration{Duration: 30 * time.Second}
var (
// 30 Seconds is the default used by paho.mqtt.golang
defaultConnectionTimeout = internal.Duration{Duration: 30 * time.Second}
defaultMaxUndeliveredMessages = 1000
)
type ConnectionState int
type empty struct{}
type semaphore chan empty
const (
Disconnected ConnectionState = iota
@@ -28,12 +34,13 @@ const (
)
type MQTTConsumer struct {
Servers []string
Topics []string
Username string
Password string
QoS int `toml:"qos"`
ConnectionTimeout internal.Duration `toml:"connection_timeout"`
Servers []string
Topics []string
Username string
Password string
QoS int `toml:"qos"`
ConnectionTimeout internal.Duration `toml:"connection_timeout"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
parser parsers.Parser
@@ -45,9 +52,14 @@ type MQTTConsumer struct {
tls.ClientConfig
client mqtt.Client
acc telegraf.Accumulator
acc telegraf.TrackingAccumulator
state ConnectionState
subscribed bool
sem semaphore
messages map[telegraf.TrackingID]bool
ctx context.Context
cancel context.CancelFunc
}
var sampleConfig = `
@@ -67,6 +79,16 @@ var sampleConfig = `
## Connection timeout for initial connection in seconds
connection_timeout = "30s"
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Topics to subscribe to
topics = [
"telegraf/host01/cpu",
@@ -118,7 +140,6 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
return errors.New("persistent_session requires client_id")
}
m.acc = acc
if m.QoS > 2 || m.QoS < 0 {
return fmt.Errorf("qos value must be 0, 1, or 2: %d", m.QoS)
}
@@ -127,6 +148,9 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
return fmt.Errorf("connection_timeout must be greater than 1s: %s", m.ConnectionTimeout.Duration)
}
m.acc = acc.WithTracking(m.MaxUndeliveredMessages)
m.ctx, m.cancel = context.WithCancel(context.Background())
opts, err := m.createOpts()
if err != nil {
return err
@@ -146,8 +170,10 @@ func (m *MQTTConsumer) connect() error {
return err
}
log.Printf("I! [inputs.mqtt_consumer]: connected %v", m.Servers)
log.Printf("I! [inputs.mqtt_consumer] Connected %v", m.Servers)
m.state = Connected
m.sem = make(semaphore, m.MaxUndeliveredMessages)
m.messages = make(map[telegraf.TrackingID]bool)
// Only subscribe on first connection when using persistent sessions. On
// subsequent connections the subscriptions should be stored in the
@@ -172,38 +198,64 @@ func (m *MQTTConsumer) connect() error {
func (m *MQTTConsumer) onConnectionLost(c mqtt.Client, err error) {
m.acc.AddError(fmt.Errorf("connection lost: %v", err))
log.Printf("D! [inputs.mqtt_consumer]: disconnected %v", m.Servers)
log.Printf("D! [inputs.mqtt_consumer] Disconnected %v", m.Servers)
m.state = Disconnected
return
}
func (m *MQTTConsumer) recvMessage(c mqtt.Client, msg mqtt.Message) {
topic := msg.Topic()
for {
select {
case track := <-m.acc.Delivered():
_, ok := m.messages[track.ID()]
if !ok {
// Added by a previous connection
continue
}
<-m.sem
// No ack, MQTT does not support durable handling
delete(m.messages, track.ID())
case m.sem <- empty{}:
err := m.onMessage(m.acc, msg)
if err != nil {
m.acc.AddError(err)
<-m.sem
}
return
}
}
}
func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Message) error {
metrics, err := m.parser.Parse(msg.Payload())
if err != nil {
m.acc.AddError(err)
return err
}
topic := msg.Topic()
for _, metric := range metrics {
tags := metric.Tags()
tags["topic"] = topic
m.acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time())
metric.AddTag("topic", topic)
}
id := acc.AddTrackingMetricGroup(metrics)
m.messages[id] = true
return nil
}
func (m *MQTTConsumer) Stop() {
if m.state == Connected {
log.Printf("D! [inputs.mqtt_consumer]: disconnecting %v", m.Servers)
log.Printf("D! [inputs.mqtt_consumer] Disconnecting %v", m.Servers)
m.client.Disconnect(200)
log.Printf("D! [inputs.mqtt_consumer]: disconnected %v", m.Servers)
log.Printf("D! [inputs.mqtt_consumer] Disconnected %v", m.Servers)
m.state = Disconnected
}
m.cancel()
}
func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error {
if m.state == Disconnected {
m.state = Connecting
log.Printf("D! [inputs.mqtt_consumer]: connecting %v", m.Servers)
log.Printf("D! [inputs.mqtt_consumer] Connecting %v", m.Servers)
m.connect()
}
@@ -246,7 +298,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
for _, server := range m.Servers {
// Preserve support for host:port style servers; deprecated in Telegraf 1.4.4
if !strings.Contains(server, "://") {
log.Printf("W! [inputs.mqtt_consumer] server %q should be updated to use `scheme://host:port` format", server)
log.Printf("W! [inputs.mqtt_consumer] Server %q should be updated to use `scheme://host:port` format", server)
if tlsCfg == nil {
server = "tcp://" + server
} else {
@@ -267,8 +319,9 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
func init() {
inputs.Add("mqtt_consumer", func() telegraf.Input {
return &MQTTConsumer{
ConnectionTimeout: defaultConnectionTimeout,
state: Disconnected,
ConnectionTimeout: defaultConnectionTimeout,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
state: Disconnected,
}
})
}


@@ -3,12 +3,9 @@ package mqtt_consumer
import (
"testing"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/eclipse/paho.mqtt.golang"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
const (
@@ -71,47 +68,6 @@ func TestPersistentClientIDFail(t *testing.T) {
assert.Error(t, err)
}
func TestRunParser(t *testing.T) {
n := newTestMQTTConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
n.parser, _ = parsers.NewInfluxParser()
n.recvMessage(nil, mqttMsg(testMsg))
if a := acc.NFields(); a != 1 {
t.Errorf("got %v, expected %v", a, 1)
}
}
// Test that the parser ignores invalid messages
func TestRunParserInvalidMsg(t *testing.T) {
n := newTestMQTTConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
n.parser, _ = parsers.NewInfluxParser()
n.recvMessage(nil, mqttMsg(invalidMsg))
if a := acc.NFields(); a != 0 {
t.Errorf("got %v, expected %v", a, 0)
}
assert.Len(t, acc.Errors, 1)
}
// Test that the parser parses line format messages into metrics
func TestRunParserAndGather(t *testing.T) {
n := newTestMQTTConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
n.parser, _ = parsers.NewInfluxParser()
n.recvMessage(nil, mqttMsg(testMsg))
acc.AssertContainsFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(23422)})
}
func mqttMsg(val string) mqtt.Message {
return &message{
topic: "telegraf/unit_test",


@@ -1,16 +1,14 @@
# NATS Consumer Input Plugin
The [NATS](http://www.nats.io/about/) consumer plugin reads from
specified NATS subjects and adds messages to InfluxDB. The plugin expects messages
in the [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).
A [Queue Group](http://www.nats.io/documentation/concepts/nats-queueing/)
is used when subscribing to subjects so multiple instances of telegraf can read
from a NATS cluster in parallel.
The [NATS][nats] consumer plugin reads from the specified NATS subjects and
creates metrics using one of the supported [input data formats][].
## Configuration
A [Queue Group][queue group] is used when subscribing to subjects so multiple
instances of telegraf can read from a NATS cluster in parallel.
### Configuration:
```toml
# Read metrics from NATS subject(s)
[[inputs.nats_consumer]]
## urls of NATS servers
servers = ["nats://localhost:4222"]
@@ -20,13 +18,29 @@ from a NATS cluster in parallel.
subjects = ["telegraf"]
## name a queue group
queue_group = "telegraf_consumers"
## Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
## Data format to consume.
## Sets the limits for pending msgs and bytes for each subscription
## These shouldn't need to be adjusted except in very high throughput scenarios
# pending_message_limit = 65536
# pending_bytes_limit = 67108864
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```
[nats]: https://www.nats.io/about/
[input data formats]: /docs/DATA_FORMATS_INPUT.md
[queue group]: https://www.nats.io/documentation/concepts/nats-queueing/


@@ -1,6 +1,7 @@
package natsconsumer
import (
"context"
"fmt"
"log"
"sync"
@@ -11,6 +12,13 @@ import (
nats "github.com/nats-io/go-nats"
)
var (
defaultMaxUndeliveredMessages = 1000
)
type empty struct{}
type semaphore chan empty
type natsError struct {
conn *nats.Conn
sub *nats.Subscription
@@ -23,48 +31,58 @@ func (e natsError) Error() string {
}
type natsConsumer struct {
QueueGroup string
Subjects []string
Servers []string
Secure bool
QueueGroup string `toml:"queue_group"`
Subjects []string `toml:"subjects"`
Servers []string `toml:"servers"`
Secure bool `toml:"secure"`
// Client pending limits:
PendingMessageLimit int
PendingBytesLimit int
PendingMessageLimit int `toml:"pending_message_limit"`
PendingBytesLimit int `toml:"pending_bytes_limit"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
// Legacy metric buffer support; deprecated in v0.10.3
MetricBuffer int
conn *nats.Conn
subs []*nats.Subscription
parser parsers.Parser
sync.Mutex
wg sync.WaitGroup
Conn *nats.Conn
Subs []*nats.Subscription
// channel for all incoming NATS messages
in chan *nats.Msg
// channel for all NATS read errors
errs chan error
done chan struct{}
acc telegraf.Accumulator
errs chan error
acc telegraf.TrackingAccumulator
wg sync.WaitGroup
cancel context.CancelFunc
}
var sampleConfig = `
## urls of NATS servers
# servers = ["nats://localhost:4222"]
servers = ["nats://localhost:4222"]
## Use Transport Layer Security
# secure = false
secure = false
## subject(s) to consume
# subjects = ["telegraf"]
subjects = ["telegraf"]
## name a queue group
# queue_group = "telegraf_consumers"
queue_group = "telegraf_consumers"
## Sets the limits for pending msgs and bytes for each subscription
## These shouldn't need to be adjusted except in very high throughput scenarios
# pending_message_limit = 65536
# pending_bytes_limit = 67108864
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
@@ -94,10 +112,7 @@ func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e erro
// Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up.
func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
n.Lock()
defer n.Unlock()
n.acc = acc
n.acc = acc.WithTracking(n.MaxUndeliveredMessages)
var connectErr error
@@ -112,89 +127,106 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
opts.Secure = n.Secure
if n.Conn == nil || n.Conn.IsClosed() {
n.Conn, connectErr = opts.Connect()
if n.conn == nil || n.conn.IsClosed() {
n.conn, connectErr = opts.Connect()
if connectErr != nil {
return connectErr
}
// Setup message and error channels
n.errs = make(chan error)
n.Conn.SetErrorHandler(n.natsErrHandler)
n.conn.SetErrorHandler(n.natsErrHandler)
n.in = make(chan *nats.Msg, 1000)
for _, subj := range n.Subjects {
sub, err := n.Conn.QueueSubscribe(subj, n.QueueGroup, func(m *nats.Msg) {
sub, err := n.conn.QueueSubscribe(subj, n.QueueGroup, func(m *nats.Msg) {
n.in <- m
})
if err != nil {
return err
}
// ensure that the subscription has been processed by the server
if err = n.Conn.Flush(); err != nil {
if err = n.conn.Flush(); err != nil {
return err
}
// set the subscription pending limits
if err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit); err != nil {
return err
}
n.Subs = append(n.Subs, sub)
n.subs = append(n.subs, sub)
}
}
n.done = make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
n.cancel = cancel
// Start the message reader
n.wg.Add(1)
go n.receiver()
go func() {
defer n.wg.Done()
go n.receiver(ctx)
}()
log.Printf("I! Started the NATS consumer service, nats: %v, subjects: %v, queue: %v\n",
n.Conn.ConnectedUrl(), n.Subjects, n.QueueGroup)
n.conn.ConnectedUrl(), n.Subjects, n.QueueGroup)
return nil
}
// receiver() reads all incoming messages from NATS, and parses them into
// telegraf metrics.
func (n *natsConsumer) receiver() {
defer n.wg.Done()
func (n *natsConsumer) receiver(ctx context.Context) {
sem := make(semaphore, n.MaxUndeliveredMessages)
for {
select {
case <-n.done:
case <-ctx.Done():
return
case <-n.acc.Delivered():
<-sem
case err := <-n.errs:
n.acc.AddError(fmt.Errorf("E! error reading from %s\n", err.Error()))
case msg := <-n.in:
metrics, err := n.parser.Parse(msg.Data)
if err != nil {
n.acc.AddError(fmt.Errorf("E! subject: %s, error: %s", msg.Subject, err.Error()))
}
n.acc.AddError(err)
case sem <- empty{}:
select {
case <-ctx.Done():
return
case err := <-n.errs:
<-sem
n.acc.AddError(err)
case <-n.acc.Delivered():
<-sem
<-sem
case msg := <-n.in:
metrics, err := n.parser.Parse(msg.Data)
if err != nil {
n.acc.AddError(fmt.Errorf("subject: %s, error: %s", msg.Subject, err.Error()))
<-sem
continue
}
for _, metric := range metrics {
n.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
n.acc.AddTrackingMetricGroup(metrics)
}
}
}
}
func (n *natsConsumer) clean() {
for _, sub := range n.Subs {
for _, sub := range n.subs {
if err := sub.Unsubscribe(); err != nil {
n.acc.AddError(fmt.Errorf("E! Error unsubscribing from subject %s in queue %s: %s\n",
n.acc.AddError(fmt.Errorf("Error unsubscribing from subject %s in queue %s: %s\n",
sub.Subject, sub.Queue, err.Error()))
}
}
if n.Conn != nil && !n.Conn.IsClosed() {
n.Conn.Close()
if n.conn != nil && !n.conn.IsClosed() {
n.conn.Close()
}
}
func (n *natsConsumer) Stop() {
n.Lock()
close(n.done)
n.cancel()
n.wg.Wait()
n.clean()
n.Unlock()
}
func (n *natsConsumer) Gather(acc telegraf.Accumulator) error {
@@ -204,12 +236,13 @@ func (n *natsConsumer) Gather(acc telegraf.Accumulator) error {
func init() {
inputs.Add("nats_consumer", func() telegraf.Input {
return &natsConsumer{
Servers: []string{"nats://localhost:4222"},
Secure: false,
Subjects: []string{"telegraf"},
QueueGroup: "telegraf_consumers",
PendingBytesLimit: nats.DefaultSubPendingBytesLimit,
PendingMessageLimit: nats.DefaultSubPendingMsgsLimit,
Servers: []string{"nats://localhost:4222"},
Secure: false,
Subjects: []string{"telegraf"},
QueueGroup: "telegraf_consumers",
PendingBytesLimit: nats.DefaultSubPendingBytesLimit,
PendingMessageLimit: nats.DefaultSubPendingMsgsLimit,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
}
})
}


@@ -1,134 +0,0 @@
package natsconsumer
import (
"testing"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
nats "github.com/nats-io/go-nats"
"github.com/stretchr/testify/assert"
)
const (
testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n"
testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
invalidMsg = "cpu_load_short,host=server01 1422568543702900257\n"
metricBuffer = 5
)
func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) {
in := make(chan *nats.Msg, metricBuffer)
n := &natsConsumer{
QueueGroup: "test",
Subjects: []string{"telegraf"},
Servers: []string{"nats://localhost:4222"},
Secure: false,
in: in,
errs: make(chan error, metricBuffer),
done: make(chan struct{}),
}
return n, in
}
// Test that the parser parses NATS messages into metrics
func TestRunParser(t *testing.T) {
n, in := newTestNatsConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()
n.wg.Add(1)
go n.receiver()
in <- natsMsg(testMsg)
acc.Wait(1)
}
// Test that the parser ignores invalid messages
func TestRunParserInvalidMsg(t *testing.T) {
n, in := newTestNatsConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()
n.wg.Add(1)
go n.receiver()
in <- natsMsg(invalidMsg)
acc.WaitError(1)
assert.Contains(t, acc.Errors[0].Error(), "E! subject: telegraf, error: metric parse error")
assert.EqualValues(t, 0, acc.NMetrics())
}
// Test that the parser parses line format messages into metrics
func TestRunParserAndGather(t *testing.T) {
n, in := newTestNatsConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()
n.wg.Add(1)
go n.receiver()
in <- natsMsg(testMsg)
n.Gather(&acc)
acc.Wait(1)
acc.AssertContainsFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(23422)})
}
// Test that the parser parses graphite format messages into metrics
func TestRunParserAndGatherGraphite(t *testing.T) {
n, in := newTestNatsConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
n.wg.Add(1)
go n.receiver()
in <- natsMsg(testMsgGraphite)
n.Gather(&acc)
acc.Wait(1)
acc.AssertContainsFields(t, "cpu_load_short_graphite",
map[string]interface{}{"value": float64(23422)})
}
// Test that the parser parses json format messages into metrics
func TestRunParserAndGatherJSON(t *testing.T) {
n, in := newTestNatsConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "nats_json_test",
})
n.wg.Add(1)
go n.receiver()
in <- natsMsg(testMsgJSON)
n.Gather(&acc)
acc.Wait(1)
acc.AssertContainsFields(t, "nats_json_test",
map[string]interface{}{
"a": float64(5),
"b_c": float64(6),
})
}
func natsMsg(val string) *nats.Msg {
return &nats.Msg{
Subject: "telegraf",
Data: []byte(val),
}
}


@@ -1,9 +1,9 @@
# NSQ Consumer Input Plugin
The [NSQ](http://nsq.io/) consumer plugin polls a specified NSQD
topic and adds messages to InfluxDB. This plugin allows a message to be in any of the supported `data_format` types.
The [NSQ][nsq] consumer plugin reads from NSQD and creates metrics using one
of the supported [input data formats][].
## Configuration
### Configuration:
```toml
# Read metrics from NSQD topic(s)
@@ -18,6 +18,16 @@ topic and adds messages to InfluxDB. This plugin allows a message to be in any o
channel = "consumer"
max_in_flight = 100
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
@@ -25,5 +35,5 @@ topic and adds messages to InfluxDB. This plugin allows a message to be in any o
data_format = "influx"
```
## Testing
The `nsq_consumer_test` mocks out the interaction with `NSQD`. It requires no outside dependencies.
[nsq]: https://nsq.io
[input data formats]: /docs/DATA_FORMATS_INPUT.md


@@ -1,7 +1,9 @@
package nsq_consumer
import (
"fmt"
"context"
"log"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
@@ -9,17 +11,38 @@ import (
nsq "github.com/nsqio/go-nsq"
)
const (
defaultMaxUndeliveredMessages = 1000
)
type empty struct{}
type semaphore chan empty
type logger struct{}
func (l *logger) Output(calldepth int, s string) error {
log.Println("D! [inputs.nsq_consumer] " + s)
return nil
}
//NSQConsumer represents the configuration of the plugin
type NSQConsumer struct {
Server string
Nsqd []string
Nsqlookupd []string
Topic string
Channel string
MaxInFlight int
parser parsers.Parser
consumer *nsq.Consumer
acc telegraf.Accumulator
Server string `toml:"server"`
Nsqd []string `toml:"nsqd"`
Nsqlookupd []string `toml:"nsqlookupd"`
Topic string `toml:"topic"`
Channel string `toml:"channel"`
MaxInFlight int `toml:"max_in_flight"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
parser parsers.Parser
consumer *nsq.Consumer
mu sync.Mutex
messages map[telegraf.TrackingID]*nsq.Message
wg sync.WaitGroup
cancel context.CancelFunc
}
var sampleConfig = `
@@ -33,6 +56,16 @@ var sampleConfig = `
channel = "consumer"
max_in_flight = 100
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
@@ -40,12 +73,6 @@ var sampleConfig = `
data_format = "influx"
`
func init() {
inputs.Add("nsq_consumer", func() telegraf.Input {
return &NSQConsumer{}
})
}
// SetParser takes the data_format from the config and finds the right parser for that format
func (n *NSQConsumer) SetParser(parser parsers.Parser) {
n.parser = parser
@@ -62,32 +89,88 @@ func (n *NSQConsumer) Description() string {
}
// Start pulls data from nsq
func (n *NSQConsumer) Start(acc telegraf.Accumulator) error {
n.acc = acc
func (n *NSQConsumer) Start(ac telegraf.Accumulator) error {
acc := ac.WithTracking(n.MaxUndeliveredMessages)
sem := make(semaphore, n.MaxUndeliveredMessages)
n.messages = make(map[telegraf.TrackingID]*nsq.Message, n.MaxUndeliveredMessages)
ctx, cancel := context.WithCancel(context.Background())
n.cancel = cancel
n.connect()
n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
n.consumer.SetLogger(&logger{}, nsq.LogLevelInfo)
n.consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
metrics, err := n.parser.Parse(message.Body)
if err != nil {
acc.AddError(fmt.Errorf("E! NSQConsumer Parse Error\nmessage:%s\nerror:%s", string(message.Body), err.Error()))
acc.AddError(err)
// Remove the message from the queue
message.Finish()
return nil
}
for _, metric := range metrics {
n.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
if len(metrics) == 0 {
message.Finish()
return nil
}
message.Finish()
select {
case <-ctx.Done():
return ctx.Err()
case sem <- empty{}:
break
}
n.mu.Lock()
id := acc.AddTrackingMetricGroup(metrics)
n.messages[id] = message
n.mu.Unlock()
message.DisableAutoResponse()
return nil
}), n.MaxInFlight)
}))
if len(n.Nsqlookupd) > 0 {
n.consumer.ConnectToNSQLookupds(n.Nsqlookupd)
}
n.consumer.ConnectToNSQDs(append(n.Nsqd, n.Server))
n.wg.Add(1)
go func() {
defer n.wg.Done()
n.onDelivery(ctx, acc, sem)
}()
return nil
}
func (n *NSQConsumer) onDelivery(ctx context.Context, acc telegraf.TrackingAccumulator, sem semaphore) {
for {
select {
case <-ctx.Done():
return
case info := <-acc.Delivered():
n.mu.Lock()
msg, ok := n.messages[info.ID()]
if !ok {
n.mu.Unlock()
continue
}
<-sem
delete(n.messages, info.ID())
n.mu.Unlock()
if info.Delivered() {
msg.Finish()
} else {
msg.Requeue(-1)
}
}
}
}
// Stop processing messages
func (n *NSQConsumer) Stop() {
n.cancel()
n.wg.Wait()
n.consumer.Stop()
<-n.consumer.StopChan
}
// Gather is a noop
@@ -107,3 +190,11 @@ func (n *NSQConsumer) connect() error {
}
return nil
}
func init() {
inputs.Add("nsq_consumer", func() telegraf.Input {
return &NSQConsumer{
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
}
})
}


@@ -36,11 +36,12 @@ func TestReadsMetricsFromNSQ(t *testing.T) {
newMockNSQD(script, addr.String())
consumer := &NSQConsumer{
Server: "127.0.0.1:4155",
Topic: "telegraf",
Channel: "consume",
MaxInFlight: 1,
Nsqd: []string{"127.0.0.1:4155"},
Server: "127.0.0.1:4155",
Topic: "telegraf",
Channel: "consume",
MaxInFlight: 1,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
Nsqd: []string{"127.0.0.1:4155"},
}
p, _ := parsers.NewInfluxParser()


@@ -2,6 +2,7 @@ package socket_listener
import (
"bufio"
"crypto/tls"
"fmt"
"io"
"log"
@@ -9,11 +10,8 @@ import (
"os"
"strings"
"sync"
"time"
"crypto/tls"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
tlsint "github.com/influxdata/telegraf/internal/tls"
@@ -120,7 +118,7 @@ func (ssl *streamSocketListener) read(c net.Conn) {
continue
}
for _, m := range metrics {
ssl.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
ssl.AddMetric(m)
}
}
@@ -156,7 +154,7 @@ func (psl *packetSocketListener) listen() {
continue
}
for _, m := range metrics {
psl.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
psl.AddMetric(m)
}
}
}


@@ -7,11 +7,13 @@ import (
type Discard struct{}
func (d *Discard) Connect() error { return nil }
func (d *Discard) Close() error { return nil }
func (d *Discard) SampleConfig() string { return "" }
func (d *Discard) Description() string { return "Send metrics to nowhere at all" }
func (d *Discard) Write(metrics []telegraf.Metric) error { return nil }
func (d *Discard) Connect() error { return nil }
func (d *Discard) Close() error { return nil }
func (d *Discard) SampleConfig() string { return "" }
func (d *Discard) Description() string { return "Send metrics to nowhere at all" }
func (d *Discard) Write(metrics []telegraf.Metric) error {
return nil
}
func init() {
outputs.Add("discard", func() telegraf.Output { return &Discard{} })


@@ -144,7 +144,7 @@ func (p *PrometheusClient) auth(h http.Handler) http.Handler {
})
}
func (p *PrometheusClient) Start() error {
func (p *PrometheusClient) Connect() error {
defaultCollectors := map[string]bool{
"gocollector": true,
"process": true,
@@ -200,15 +200,6 @@ func (p *PrometheusClient) Start() error {
return nil
}
func (p *PrometheusClient) Stop() {
// plugin gets cleaned up in Close() already.
}
func (p *PrometheusClient) Connect() error {
// This service output does not need to make any further connections
return nil
}
func (p *PrometheusClient) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()


@@ -600,7 +600,7 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
pClient, p, err := setupPrometheus()
require.NoError(t, err)
defer pClient.Stop()
defer pClient.Close()
now := time.Now()
tags := make(map[string]string)
@@ -675,7 +675,7 @@ func setupPrometheus() (*PrometheusClient, *prometheus_input.Prometheus, error)
pTesting = NewClient()
pTesting.Listen = "localhost:9127"
pTesting.Path = "/metrics"
err := pTesting.Start()
err := pTesting.Connect()
if err != nil {
return nil, nil, err
}


@@ -10,6 +10,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/processors"
)
@@ -76,12 +77,12 @@ var sampleConfig = `
## tags. If this setting is different than "" the plugin will add a
## tag (which name will be the value of this setting) to each metric with
## the value of the calculated GroupBy tag. Useful for debugging
# add_groupby_tag = ""
# add_groupby_tag = ""
## These settings provide a way to know the position of each metric in
## the top k. The 'add_rank_field' setting allows to specify for which
## fields the position is required. If the list is non empty, then a field
## will be added to each and every metric for each string present in this
## will be added to each and every metric for each string present in this
## setting. This field will contain the ranking of the group that
## the metric belonged to when aggregated over that field.
## The name of the field will be set to the name of the aggregation field,
@@ -208,6 +209,11 @@ func (t *TopK) Apply(in ...telegraf.Metric) []telegraf.Metric {
// Add the metrics received to our internal cache
for _, m := range in {
// When tracking metrics this plugin could deadlock the input by
// holding undelivered metrics while the input waits for metrics to be
// delivered. Instead, treat all handled metrics as delivered and
// produced metrics as untracked in a similar way to aggregators.
m.Drop()
// Check if the metric has any of the fields over which we are aggregating
hasField := false
@@ -281,7 +287,6 @@ func (t *TopK) push() []telegraf.Metric {
// Create a one dimensional list with the top K metrics of each key
for i, ag := range aggregations[0:min(t.K, len(aggregations))] {
// Check whether of not we need to add fields of tags to the selected metrics
if len(t.aggFieldSet) != 0 || len(t.rankFieldSet) != 0 || groupTag != "" {
for _, m := range t.cache[ag.groupbykey] {
@@ -311,7 +316,16 @@ func (t *TopK) push() []telegraf.Metric {
t.Reset()
return ret
result := make([]telegraf.Metric, 0, len(ret))
for _, m := range ret {
copy, err := metric.New(m.Name(), m.Tags(), m.Fields(), m.Time(), m.Type())
if err != nil {
continue
}
result = append(result, copy)
}
return result
}
// Function that generates the aggregation functions
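
The comment added to Apply above explains the risk: a processor that holds tracked metrics between intervals can leave the input blocked waiting on delivery. A minimal sketch of the remedy, combined into one hypothetical helper for brevity: drop the originals on receipt and emit fresh, untracked copies, using only the `Drop` method and `metric.New` constructor that appear in this diff.

```go
package processorexample

import (
	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/metric"
)

// settleAndCopy marks each tracked metric as delivered so no input is left
// waiting on metrics buffered inside the processor, and returns untracked
// copies that are safe to cache and emit on a later interval.
func settleAndCopy(in []telegraf.Metric) []telegraf.Metric {
	out := make([]telegraf.Metric, 0, len(in))
	for _, m := range in {
		m.Drop() // settle delivery tracking for the original metric
		c, err := metric.New(m.Name(), m.Tags(), m.Fields(), m.Time(), m.Type())
		if err != nil {
			continue
		}
		out = append(out, c)
	}
	return out
}
```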


@@ -1,12 +1,12 @@
package topk
import (
"reflect"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"
)
// Key, value pair that represents a telegraf.Metric Field
@@ -95,7 +95,7 @@ func deepCopy(a []telegraf.Metric) []telegraf.Metric {
func belongs(m telegraf.Metric, ms []telegraf.Metric) bool {
for _, i := range ms {
if reflect.DeepEqual(i, m) {
if testutil.MetricEqual(i, m) {
return true
}
}