updated for new output Write function

removed HTTP listener port in docker compose. Not being used by plugin.
This commit is contained in:
Jonathan Cross 2015-10-27 13:41:45 -04:00
parent 6430047828
commit 6671a8d88b
2 changed files with 5 additions and 27 deletions

View File

@ -2,10 +2,9 @@ package nsq
import ( import (
"fmt" "fmt"
"github.com/influxdb/influxdb/client" "github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/telegraf/outputs" "github.com/influxdb/telegraf/outputs"
"github.com/nsqio/go-nsq" "github.com/nsqio/go-nsq"
"time"
) )
type NSQ struct { type NSQ struct {
@ -46,34 +45,15 @@ func (n *NSQ) Description() string {
return "Send telegraf measurements to NSQD" return "Send telegraf measurements to NSQD"
} }
func (n *NSQ) Write(bp client.BatchPoints) error { func (n *NSQ) Write(points []*client.Point) error {
if len(bp.Points) == 0 { if len(points) == 0 {
return nil return nil
} }
var zeroTime time.Time for _, p := range points {
for _, p := range bp.Points {
// Combine tags from Point and BatchPoints and grab the resulting // Combine tags from Point and BatchPoints and grab the resulting
// line-protocol output string to write to NSQ // line-protocol output string to write to NSQ
var value string value := p.String()
if p.Raw != "" {
value = p.Raw
} else {
for k, v := range bp.Tags {
if p.Tags == nil {
p.Tags = make(map[string]string, len(bp.Tags))
}
p.Tags[k] = v
}
if p.Time == zeroTime {
if bp.Time == zeroTime {
p.Time = time.Now()
} else {
p.Time = bp.Time
}
}
value = p.MarshalString()
}
err := n.producer.Publish(n.Topic, []byte(value)) err := n.producer.Publish(n.Topic, []byte(value))
@ -82,7 +62,6 @@ func (n *NSQ) Write(bp client.BatchPoints) error {
} }
} }
return nil return nil
} }
func init() { func init() {

View File

@ -52,4 +52,3 @@ nsq:
image: nsqio/nsq image: nsqio/nsq
ports: ports:
- "4150:4150" - "4150:4150"
- "4151:4151"