package nsq_consumer import ( "fmt" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" nsq "github.com/nsqio/go-nsq" ) //NSQConsumer represents the configuration of the plugin type NSQConsumer struct { Server string Nsqd []string Nsqlookupd []string Topic string Channel string MaxInFlight int parser parsers.Parser consumer *nsq.Consumer acc telegraf.Accumulator } var sampleConfig = ` ## Server option still works but is deprecated, we just prepend it to the nsqd array. # server = "localhost:4150" ## An array representing the NSQD TCP HTTP Endpoints nsqd = ["localhost:4150"] ## An array representing the NSQLookupd HTTP Endpoints nsqlookupd = ["localhost:4161"] topic = "telegraf" channel = "consumer" max_in_flight = 100 ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" ` func init() { inputs.Add("nsq_consumer", func() telegraf.Input { return &NSQConsumer{} }) } // SetParser takes the data_format from the config and finds the right parser for that format func (n *NSQConsumer) SetParser(parser parsers.Parser) { n.parser = parser } // SampleConfig returns config values for generating a sample configuration file func (n *NSQConsumer) SampleConfig() string { return sampleConfig } // Description prints description string func (n *NSQConsumer) Description() string { return "Read NSQ topic for metrics." } // Start pulls data from nsq func (n *NSQConsumer) Start(acc telegraf.Accumulator) error { n.acc = acc n.connect() n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error { metrics, err := n.parser.Parse(message.Body) if err != nil { acc.AddError(fmt.Errorf("E! NSQConsumer Parse Error\nmessage:%s\nerror:%s", string(message.Body), err.Error())) return nil } for _, metric := range metrics { n.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) } message.Finish() return nil }), n.MaxInFlight) if len(n.Nsqlookupd) > 0 { n.consumer.ConnectToNSQLookupds(n.Nsqlookupd) } n.consumer.ConnectToNSQDs(append(n.Nsqd, n.Server)) return nil } // Stop processing messages func (n *NSQConsumer) Stop() { n.consumer.Stop() } // Gather is a noop func (n *NSQConsumer) Gather(acc telegraf.Accumulator) error { return nil } func (n *NSQConsumer) connect() error { if n.consumer == nil { config := nsq.NewConfig() config.MaxInFlight = n.MaxInFlight consumer, err := nsq.NewConsumer(n.Topic, n.Channel, config) if err != nil { return err } n.consumer = consumer } return nil }