updated for new output Write function
removed HTTP listener port in docker compose. Not being used by plugin.
This commit is contained in:
parent
2a76942a74
commit
e13500fc4f
|
@ -2,10 +2,9 @@ package nsq
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/influxdb/influxdb/client"
|
"github.com/influxdb/influxdb/client/v2"
|
||||||
"github.com/influxdb/telegraf/outputs"
|
"github.com/influxdb/telegraf/outputs"
|
||||||
"github.com/nsqio/go-nsq"
|
"github.com/nsqio/go-nsq"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type NSQ struct {
|
type NSQ struct {
|
||||||
|
@ -46,34 +45,15 @@ func (n *NSQ) Description() string {
|
||||||
return "Send telegraf measurements to NSQD"
|
return "Send telegraf measurements to NSQD"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *NSQ) Write(bp client.BatchPoints) error {
|
func (n *NSQ) Write(points []*client.Point) error {
|
||||||
if len(bp.Points) == 0 {
|
if len(points) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var zeroTime time.Time
|
for _, p := range points {
|
||||||
for _, p := range bp.Points {
|
|
||||||
// Combine tags from Point and BatchPoints and grab the resulting
|
// Combine tags from Point and BatchPoints and grab the resulting
|
||||||
// line-protocol output string to write to NSQ
|
// line-protocol output string to write to NSQ
|
||||||
var value string
|
value := p.String()
|
||||||
if p.Raw != "" {
|
|
||||||
value = p.Raw
|
|
||||||
} else {
|
|
||||||
for k, v := range bp.Tags {
|
|
||||||
if p.Tags == nil {
|
|
||||||
p.Tags = make(map[string]string, len(bp.Tags))
|
|
||||||
}
|
|
||||||
p.Tags[k] = v
|
|
||||||
}
|
|
||||||
if p.Time == zeroTime {
|
|
||||||
if bp.Time == zeroTime {
|
|
||||||
p.Time = time.Now()
|
|
||||||
} else {
|
|
||||||
p.Time = bp.Time
|
|
||||||
}
|
|
||||||
}
|
|
||||||
value = p.MarshalString()
|
|
||||||
}
|
|
||||||
|
|
||||||
err := n.producer.Publish(n.Topic, []byte(value))
|
err := n.producer.Publish(n.Topic, []byte(value))
|
||||||
|
|
||||||
|
@ -82,7 +62,6 @@ func (n *NSQ) Write(bp client.BatchPoints) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|
|
@ -52,4 +52,3 @@ nsq:
|
||||||
image: nsqio/nsq
|
image: nsqio/nsq
|
||||||
ports:
|
ports:
|
||||||
- "4150:4150"
|
- "4150:4150"
|
||||||
- "4151:4151"
|
|
||||||
|
|
Loading…
Reference in New Issue