NSQ Output plugin

NSQ output plugin, following the NSQ methodology output is a producer
to one instance of NSQD. The go library does not accept array values be
default for a Producer. Additionally service discovery is generally
done as a consumer.

Follows same methodology as Kafka Output without the tag reference.
This commit is contained in:
Jonathan Cross
2015-10-27 12:59:05 -04:00
committed by Cameron Sparr
parent c73c28de7e
commit 2a76942a74
37 changed files with 5469 additions and 0 deletions

4
outputs/nsq/README.md Normal file
View File

@@ -0,0 +1,4 @@
# NSQ Output Plugin
This plugin writes to a specified NSQD instance, usually local to the producer. It requires
a `server` name and a `topic` name.

92
outputs/nsq/nsq.go Normal file
View File

@@ -0,0 +1,92 @@
package nsq
import (
"fmt"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/telegraf/outputs"
"github.com/nsqio/go-nsq"
"time"
)
type NSQ struct {
Server string
Topic string
producer *nsq.Producer
}
var sampleConfig = `
# Location of nsqd instance listening on TCP
server = "localhost:4150"
# NSQ topic for producer messages
topic = "telegraf"
`
func (n *NSQ) Connect() error {
config := nsq.NewConfig()
producer, err := nsq.NewProducer(n.Server, config)
if err != nil {
return err
}
n.producer = producer
return nil
}
func (n *NSQ) Close() error {
n.producer.Stop()
return nil
}
func (n *NSQ) SampleConfig() string {
return sampleConfig
}
func (n *NSQ) Description() string {
return "Send telegraf measurements to NSQD"
}
func (n *NSQ) Write(bp client.BatchPoints) error {
if len(bp.Points) == 0 {
return nil
}
var zeroTime time.Time
for _, p := range bp.Points {
// Combine tags from Point and BatchPoints and grab the resulting
// line-protocol output string to write to NSQ
var value string
if p.Raw != "" {
value = p.Raw
} else {
for k, v := range bp.Tags {
if p.Tags == nil {
p.Tags = make(map[string]string, len(bp.Tags))
}
p.Tags[k] = v
}
if p.Time == zeroTime {
if bp.Time == zeroTime {
p.Time = time.Now()
} else {
p.Time = bp.Time
}
}
value = p.MarshalString()
}
err := n.producer.Publish(n.Topic, []byte(value))
if err != nil {
return fmt.Errorf("FAILED to send NSQD message: %s", err)
}
}
return nil
}
func init() {
outputs.Add("nsq", func() outputs.Output {
return &NSQ{}
})
}

28
outputs/nsq/nsq_test.go Normal file
View File

@@ -0,0 +1,28 @@
package nsq
import (
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestConnectAndWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
server := []string{testutil.GetLocalHost() + ":4150"}
n := &NSQ{
Server: server,
Topic: "telegraf",
}
// Verify that we can connect to the NSQ daemon
err := n.Connect()
require.NoError(t, err)
// Verify that we can successfully write data to the NSQ daemon
err = n.Write(testutil.MockBatchPoints())
require.NoError(t, err)
}