94 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			94 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Go
		
	
	
	
package nsq
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
 | 
						|
	"github.com/nsqio/go-nsq"
 | 
						|
 | 
						|
	"github.com/influxdata/telegraf"
 | 
						|
	"github.com/influxdata/telegraf/plugins/outputs"
 | 
						|
	"github.com/influxdata/telegraf/plugins/serializers"
 | 
						|
)
 | 
						|
 | 
						|
type NSQ struct {
 | 
						|
	Server   string
 | 
						|
	Topic    string
 | 
						|
	producer *nsq.Producer
 | 
						|
 | 
						|
	serializer serializers.Serializer
 | 
						|
}
 | 
						|
 | 
						|
// sampleConfig is the example TOML configuration returned by SampleConfig.
var sampleConfig = `
  ## Location of nsqd instance listening on TCP
  server = "localhost:4150"
  ## NSQ topic for producer messages
  topic = "telegraf"

  ## Data format to output.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
  data_format = "influx"
`
 | 
						|
 | 
						|
func (n *NSQ) SetSerializer(serializer serializers.Serializer) {
 | 
						|
	n.serializer = serializer
 | 
						|
}
 | 
						|
 | 
						|
func (n *NSQ) Connect() error {
 | 
						|
	config := nsq.NewConfig()
 | 
						|
	producer, err := nsq.NewProducer(n.Server, config)
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	n.producer = producer
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (n *NSQ) Close() error {
 | 
						|
	n.producer.Stop()
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (n *NSQ) SampleConfig() string {
 | 
						|
	return sampleConfig
 | 
						|
}
 | 
						|
 | 
						|
func (n *NSQ) Description() string {
 | 
						|
	return "Send telegraf measurements to NSQD"
 | 
						|
}
 | 
						|
 | 
						|
func (n *NSQ) Write(metrics []telegraf.Metric) error {
 | 
						|
	if len(metrics) == 0 {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	for _, metric := range metrics {
 | 
						|
		values, err := n.serializer.Serialize(metric)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		var pubErr error
 | 
						|
		for _, value := range values {
 | 
						|
			err = n.producer.Publish(n.Topic, []byte(value))
 | 
						|
			if err != nil {
 | 
						|
				pubErr = err
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		if pubErr != nil {
 | 
						|
			return fmt.Errorf("FAILED to send NSQD message: %s", err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func init() {
 | 
						|
	outputs.Add("nsq", func() telegraf.Output {
 | 
						|
		return &NSQ{}
 | 
						|
	})
 | 
						|
}
 |