diff --git a/Godeps b/Godeps index 2b4fce555..5cb31c9f6 100644 --- a/Godeps +++ b/Godeps @@ -4,6 +4,7 @@ github.com/aerospike/aerospike-client-go 45863b7fd8640dc12f7fdd397104d97e1986f25 github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687 github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857 github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4 +github.com/bsm/sarama-cluster dc1a390cf63c40d0a20ee9f79c4be120f79110e5 github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99 github.com/couchbase/go-couchbase cb664315a324d87d19c879d9cc67fda6be8c2ac1 github.com/couchbase/gomemcached a5ea6356f648fec6ab89add00edd09151455b4b2 @@ -38,7 +39,6 @@ github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b github.com/nats-io/nats b13fc9d12b0b123ebc374e6b808c6228ae4234a3 github.com/nats-io/nuid 4f84f5f3b2786224e336af2e13dba0a0a80b76fa github.com/nsqio/go-nsq 0b80d6f05e15ca1930e0c5e1d540ed627e299980 -github.com/opencontainers/runc 89ab7f2ccc1e45ddf6485eaa802c35dcf321dfc8 github.com/prometheus/client_golang 18acf9993a863f4c4b40612e19cdd243e7c86831 github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6 github.com/prometheus/common e8eabff8812b05acf522b45fdcd725a785188e37 diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 5600d82a4..c6ef26560 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -1,39 +1,65 @@ package kafka_consumer import ( + "crypto/tls" "log" "strings" "sync" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/Shopify/sarama" + "github.com/bsm/sarama-cluster" "github.com/wvanbergen/kafka/consumergroup" ) type Kafka struct { - ConsumerGroup string - Topics []string + // new kafka consumer + NewConsumer bool + // common for both versions + ConsumerGroup string + Topics []string + Offset string + + // for 0.8 ZookeeperPeers []string ZookeeperChroot string Consumer *consumergroup.ConsumerGroup + // for 0.9+ + BrokerList []string + Consumer9 *cluster.Consumer + // Path to CA file + SSLCA string `toml:"ssl_ca"` + // Path to host cert file + SSLCert string `toml:"ssl_cert"` + // Path to cert key file + SSLKey string `toml:"ssl_key"` + + // Skip SSL verification + InsecureSkipVerify bool + + tlsConfig tls.Config + // Legacy metric buffer support MetricBuffer int // TODO remove PointBuffer, legacy support PointBuffer int - Offset string parser parsers.Parser sync.Mutex // channel for all incoming kafka messages in <-chan *sarama.ConsumerMessage + // channel for all kafka consumer errors - errs <-chan *sarama.ConsumerError + errs <-chan *sarama.ConsumerError + errs9 <-chan error + done chan struct{} // keep the accumulator internally: @@ -45,6 +71,8 @@ type Kafka struct { } var sampleConfig = ` + ## is new consumer? + new_consumer = true ## topic(s) to consume topics = ["telegraf"] ## an array of Zookeeper connection strings @@ -82,41 +110,88 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error { k.acc = acc - config := consumergroup.NewConfig() - config.Zookeeper.Chroot = k.ZookeeperChroot - switch strings.ToLower(k.Offset) { - case "oldest", "": - config.Offsets.Initial = sarama.OffsetOldest - case "newest": - config.Offsets.Initial = sarama.OffsetNewest - default: - log.Printf("WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n", - k.Offset) - config.Offsets.Initial = sarama.OffsetOldest - } + log.Println(k.NewConsumer) - if k.Consumer == nil || k.Consumer.Closed() { - k.Consumer, consumerErr = consumergroup.JoinConsumerGroup( - k.ConsumerGroup, - k.Topics, - k.ZookeeperPeers, - config, - ) - if consumerErr != nil { - return consumerErr + if !k.NewConsumer { + config := consumergroup.NewConfig() + + config.Zookeeper.Chroot = k.ZookeeperChroot + switch strings.ToLower(k.Offset) { + case "oldest", "": + config.Offsets.Initial = sarama.OffsetOldest + case "newest": + config.Offsets.Initial = sarama.OffsetNewest + default: + log.Printf("WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n", + k.Offset) + config.Offsets.Initial = sarama.OffsetOldest } + if k.Consumer == nil || k.Consumer.Closed() { + k.Consumer, consumerErr = consumergroup.JoinConsumerGroup( + k.ConsumerGroup, + k.Topics, + k.ZookeeperPeers, + config, + ) + if consumerErr != nil { + return consumerErr + } + + // Setup message and error channels + k.in = k.Consumer.Messages() + k.errs = k.Consumer.Errors() + } + k.done = make(chan struct{}) + // Start the kafka message reader + go k.receiver() + log.Printf("Started the kafka consumer service, peers: %v, topics: %v\n", + k.ZookeeperPeers, k.Topics) + } else { + config := cluster.NewConfig() + + tlsConfig, err := internal.GetTLSConfig(k.SSLCert, k.SSLKey, k.SSLCA, k.InsecureSkipVerify) + if err != nil { + return err + } + + if tlsConfig != nil { + config.Net.TLS.Config = tlsConfig + config.Net.TLS.Enable = true + } + + switch strings.ToLower(k.Offset) { + case "oldest", "": + config.Consumer.Offsets.Initial = sarama.OffsetOldest + case "newest": + config.Consumer.Offsets.Initial = sarama.OffsetNewest + default: + log.Printf("WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n", + k.Offset) + config.Consumer.Offsets.Initial = sarama.OffsetOldest + } + + // TODO: make this configurable + config.Consumer.Return.Errors = true + + if err := config.Validate(); err != nil { + return err + } + + k.Consumer9, err = cluster.NewConsumer(k.BrokerList, k.ConsumerGroup, k.Topics, config) + if err != nil { + return err + } // Setup message and error channels - k.in = k.Consumer.Messages() - k.errs = k.Consumer.Errors() + k.in = k.Consumer9.Messages() + k.errs9 = k.Consumer9.Errors() + k.done = make(chan struct{}) + // Start the kafka message reader for 0.9 + go k.collector() + log.Printf("Started the kafka consumer service with new consumer, brokers: %v, topics: %v\n", + k.BrokerList, k.Topics) } - k.done = make(chan struct{}) - - // Start the kafka message reader - go k.receiver() - log.Printf("Started the kafka consumer service, peers: %v, topics: %v\n", - k.ZookeeperPeers, k.Topics) return nil } @@ -151,12 +226,45 @@ func (k *Kafka) receiver() { } } +// this is for kafka new consumer +func (k *Kafka) collector() { + for { + select { + case <-k.done: + return + case err := <-k.errs9: + log.Printf("Kafka Consumer Error: %s\n", err.Error()) + case msg := <-k.in: + metrics, err := k.parser.Parse(msg.Value) + + if err != nil { + log.Printf("KAFKA PARSE ERROR\nmessage: %s\nerror: %s", + string(msg.Value), err.Error()) + } + + for _, metric := range metrics { + k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) + } + + if !k.doNotCommitMsgs { + k.Consumer9.MarkOffset(msg, "") + } + } + } +} + func (k *Kafka) Stop() { k.Lock() defer k.Unlock() close(k.done) - if err := k.Consumer.Close(); err != nil { - log.Printf("Error closing kafka consumer: %s\n", err.Error()) + if !k.NewConsumer { + if err := k.Consumer.Close(); err != nil { + log.Printf("Error closing kafka consumer: %s\n", err.Error()) + } + } else { + if err := k.Consumer9.Close(); err != nil { + log.Printf("Error closing kafka consumer: %s\n", err.Error()) + } } } diff --git a/plugins/parsers/json/parser.go b/plugins/parsers/json/parser.go index e5172ac97..a7c447dfa 100644 --- a/plugins/parsers/json/parser.go +++ b/plugins/parsers/json/parser.go @@ -11,9 +11,11 @@ import ( ) type JSONParser struct { - MetricName string - TagKeys []string - DefaultTags map[string]string + MetricName string + TagKeys []string + DefaultTags map[string]string + TimestampSelector string + TimestampFormatter string } func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { @@ -31,6 +33,25 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { tags[k] = v } + timestampStr := "" + timestampData := jsonOut[p.TimestampSelector] + if timestampData != nil { + timestampStr = timestampData.(string) + } + + if p.TimestampFormatter == "" { + p.TimestampFormatter = time.RFC3339Nano + } + + timestamp := time.Now().UTC() + if timestampStr != "" { + timestampTmp, err := time.Parse(p.TimestampFormatter, timestampStr) + if err != nil { + return nil, err + } + timestamp = timestampTmp + } + for _, tag := range p.TagKeys { switch v := jsonOut[tag].(type) { case string: @@ -45,7 +66,7 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { return nil, err } - metric, err := telegraf.NewMetric(p.MetricName, tags, f.Fields, time.Now().UTC()) + metric, err := telegraf.NewMetric(p.MetricName, tags, f.Fields, timestamp) if err != nil { return nil, err diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 360d795bc..b528eeb94 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -58,6 +58,12 @@ type Config struct { // DefaultTags are the default tags that will be added to all parsed metrics. DefaultTags map[string]string + + // TimestampSelector only applies to JSON + TimestampSelector string + + // TimestampFormatter only applies to JSON + TimestampFormatter string } // NewParser returns a Parser interface based on the given config. @@ -67,7 +73,8 @@ func NewParser(config *Config) (Parser, error) { switch config.DataFormat { case "json": parser, err = NewJSONParser(config.MetricName, - config.TagKeys, config.DefaultTags) + config.TagKeys, config.DefaultTags, + config.TimestampSelector, config.TimestampFormatter) case "value": parser, err = NewValueParser(config.MetricName, config.DataType, config.DefaultTags) @@ -88,11 +95,23 @@ func NewJSONParser( metricName string, tagKeys []string, defaultTags map[string]string, + timestampParameters ...string, ) (Parser, error) { + timestampSelector, timestampFormatter := "", "" + switch len(timestampParameters) { + case 2: + timestampFormatter = timestampParameters[1] + fallthrough + case 1: + timestampSelector = timestampParameters[0] + } + parser := &json.JSONParser{ - MetricName: metricName, - TagKeys: tagKeys, - DefaultTags: defaultTags, + MetricName: metricName, + TagKeys: tagKeys, + DefaultTags: defaultTags, + TimestampSelector: timestampSelector, + TimestampFormatter: timestampFormatter, } return parser, nil }