replace outputs.Output with models.Output

This commit is contained in:
Cameron Sparr
2016-01-25 16:14:06 -07:00
parent 1e98823c61
commit a86d98fec0
37 changed files with 208 additions and 202 deletions

View File

@@ -8,8 +8,7 @@ import (
"time" "time"
imodels "github.com/influxdata/telegraf/internal/models" imodels "github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/influxdb/client/v2"
) )
type Accumulator interface { type Accumulator interface {
@@ -30,7 +29,7 @@ type Accumulator interface {
func NewAccumulator( func NewAccumulator(
inputConfig *imodels.InputConfig, inputConfig *imodels.InputConfig,
points chan *client.Point, points chan models.Metric,
) Accumulator { ) Accumulator {
acc := accumulator{} acc := accumulator{}
acc.points = points acc.points = points
@@ -41,7 +40,7 @@ func NewAccumulator(
type accumulator struct { type accumulator struct {
sync.Mutex sync.Mutex
points chan *client.Point points chan models.Metric
defaultTags map[string]string defaultTags map[string]string
@@ -152,15 +151,15 @@ func (ac *accumulator) AddFields(
measurement = ac.prefix + measurement measurement = ac.prefix + measurement
} }
pt, err := client.NewPoint(measurement, tags, result, timestamp) metric, err := models.NewMetric(measurement, tags, result, timestamp)
if err != nil { if err != nil {
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
return return
} }
if ac.debug { if ac.debug {
fmt.Println("> " + pt.String()) fmt.Println("> " + metric.String())
} }
ac.points <- pt ac.points <- metric
} }
func (ac *accumulator) SetDefaultTags(tags map[string]string) { func (ac *accumulator) SetDefaultTags(tags map[string]string) {

View File

@@ -12,10 +12,8 @@ import (
"github.com/influxdata/telegraf/internal/config" "github.com/influxdata/telegraf/internal/config"
imodels "github.com/influxdata/telegraf/internal/models" imodels "github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/influxdb/client/v2"
) )
// Agent runs telegraf and collects data based on the given config // Agent runs telegraf and collects data based on the given config
@@ -47,7 +45,7 @@ func NewAgent(config *config.Config) (*Agent, error) {
func (a *Agent) Connect() error { func (a *Agent) Connect() error {
for _, o := range a.Config.Outputs { for _, o := range a.Config.Outputs {
switch ot := o.Output.(type) { switch ot := o.Output.(type) {
case outputs.ServiceOutput: case models.ServiceOutput:
if err := ot.Start(); err != nil { if err := ot.Start(); err != nil {
log.Printf("Service for output %s failed to start, exiting\n%s\n", log.Printf("Service for output %s failed to start, exiting\n%s\n",
o.Name, err.Error()) o.Name, err.Error())
@@ -80,7 +78,7 @@ func (a *Agent) Close() error {
for _, o := range a.Config.Outputs { for _, o := range a.Config.Outputs {
err = o.Output.Close() err = o.Output.Close()
switch ot := o.Output.(type) { switch ot := o.Output.(type) {
case outputs.ServiceOutput: case models.ServiceOutput:
ot.Stop() ot.Stop()
} }
} }
@@ -89,7 +87,7 @@ func (a *Agent) Close() error {
// gatherParallel runs the inputs that are using the same reporting interval // gatherParallel runs the inputs that are using the same reporting interval
// as the telegraf agent. // as the telegraf agent.
func (a *Agent) gatherParallel(pointChan chan *client.Point) error { func (a *Agent) gatherParallel(pointChan chan models.Metric) error {
var wg sync.WaitGroup var wg sync.WaitGroup
start := time.Now() start := time.Now()
@@ -146,7 +144,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
func (a *Agent) gatherSeparate( func (a *Agent) gatherSeparate(
shutdown chan struct{}, shutdown chan struct{},
input *imodels.RunningInput, input *imodels.RunningInput,
pointChan chan *client.Point, pointChan chan models.Metric,
) error { ) error {
ticker := time.NewTicker(input.Config.Interval) ticker := time.NewTicker(input.Config.Interval)
@@ -186,7 +184,7 @@ func (a *Agent) gatherSeparate(
func (a *Agent) Test() error { func (a *Agent) Test() error {
shutdown := make(chan struct{}) shutdown := make(chan struct{})
defer close(shutdown) defer close(shutdown)
pointChan := make(chan *client.Point) pointChan := make(chan models.Metric)
// dummy receiver for the point channel // dummy receiver for the point channel
go func() { go func() {
@@ -248,7 +246,7 @@ func (a *Agent) flush() {
} }
// flusher monitors the points input channel and flushes on the minimum interval // flusher monitors the points input channel and flushes on the minimum interval
func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) error { func (a *Agent) flusher(shutdown chan struct{}, pointChan chan models.Metric) error {
// Inelegant, but this sleep is to allow the Gather threads to run, so that // Inelegant, but this sleep is to allow the Gather threads to run, so that
// the flusher will flush after metrics are collected. // the flusher will flush after metrics are collected.
time.Sleep(time.Millisecond * 200) time.Sleep(time.Millisecond * 200)
@@ -306,7 +304,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration) a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration)
// channel shared between all input threads for accumulating points // channel shared between all input threads for accumulating points
pointChan := make(chan *client.Point, 1000) pointChan := make(chan models.Metric, 1000)
// Round collection to nearest interval by sleeping // Round collection to nearest interval by sleeping
if a.Config.Agent.RoundInterval { if a.Config.Agent.RoundInterval {

View File

@@ -3,8 +3,8 @@ package models
import ( import (
"strings" "strings"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
emodels "github.com/influxdata/telegraf/models"
) )
// TagFilter is the name of a tag, and the values on which to filter // TagFilter is the name of a tag, and the values on which to filter
@@ -24,8 +24,8 @@ type Filter struct {
IsActive bool IsActive bool
} }
func (f Filter) ShouldPointPass(point *client.Point) bool { func (f Filter) ShouldPointPass(metric emodels.Metric) bool {
if f.ShouldPass(point.Name()) && f.ShouldTagsPass(point.Tags()) { if f.ShouldPass(metric.Name()) && f.ShouldTagsPass(metric.Tags()) {
return true return true
} }
return false return false

View File

@@ -4,32 +4,30 @@ import (
"log" "log"
"time" "time"
"github.com/influxdata/telegraf/plugins/outputs" emodels "github.com/influxdata/telegraf/models"
"github.com/influxdata/influxdb/client/v2"
) )
const DEFAULT_POINT_BUFFER_LIMIT = 10000 const DEFAULT_POINT_BUFFER_LIMIT = 10000
type RunningOutput struct { type RunningOutput struct {
Name string Name string
Output outputs.Output Output emodels.Output
Config *OutputConfig Config *OutputConfig
Quiet bool Quiet bool
PointBufferLimit int PointBufferLimit int
points []*client.Point metrics []emodels.Metric
overwriteCounter int overwriteCounter int
} }
func NewRunningOutput( func NewRunningOutput(
name string, name string,
output outputs.Output, output emodels.Output,
conf *OutputConfig, conf *OutputConfig,
) *RunningOutput { ) *RunningOutput {
ro := &RunningOutput{ ro := &RunningOutput{
Name: name, Name: name,
points: make([]*client.Point, 0), metrics: make([]emodels.Metric, 0),
Output: output, Output: output,
Config: conf, Config: conf,
PointBufferLimit: DEFAULT_POINT_BUFFER_LIMIT, PointBufferLimit: DEFAULT_POINT_BUFFER_LIMIT,
@@ -37,34 +35,34 @@ func NewRunningOutput(
return ro return ro
} }
func (ro *RunningOutput) AddPoint(point *client.Point) { func (ro *RunningOutput) AddPoint(metric emodels.Metric) {
if ro.Config.Filter.IsActive { if ro.Config.Filter.IsActive {
if !ro.Config.Filter.ShouldPointPass(point) { if !ro.Config.Filter.ShouldPointPass(metric) {
return return
} }
} }
if len(ro.points) < ro.PointBufferLimit { if len(ro.metrics) < ro.PointBufferLimit {
ro.points = append(ro.points, point) ro.metrics = append(ro.metrics, metric)
} else { } else {
if ro.overwriteCounter == len(ro.points) { if ro.overwriteCounter == len(ro.metrics) {
ro.overwriteCounter = 0 ro.overwriteCounter = 0
} }
ro.points[ro.overwriteCounter] = point ro.metrics[ro.overwriteCounter] = metric
ro.overwriteCounter++ ro.overwriteCounter++
} }
} }
func (ro *RunningOutput) Write() error { func (ro *RunningOutput) Write() error {
start := time.Now() start := time.Now()
err := ro.Output.Write(ro.points) err := ro.Output.Write(ro.metrics)
elapsed := time.Since(start) elapsed := time.Since(start)
if err == nil { if err == nil {
if !ro.Quiet { if !ro.Quiet {
log.Printf("Wrote %d metrics to output %s in %s\n", log.Printf("Wrote %d metrics to output %s in %s\n",
len(ro.points), ro.Name, elapsed) len(ro.metrics), ro.Name, elapsed)
} }
ro.points = make([]*client.Point, 0) ro.metrics = make([]emodels.Metric, 0)
ro.overwriteCounter = 0 ro.overwriteCounter = 0
} }
return err return err

View File

@@ -28,6 +28,9 @@ type Metric interface {
// PrecisionString returns a line-protocol string of the metric, at precision // PrecisionString returns a line-protocol string of the metric, at precision
PrecisionString(precison string) string PrecisionString(precison string) string
// Point returns a influxdb client.Point object
Point() *client.Point
} }
// metric is a wrapper of the influxdb client.Point struct // metric is a wrapper of the influxdb client.Point struct
@@ -103,3 +106,7 @@ func (m *metric) String() string {
func (m *metric) PrecisionString(precison string) string { func (m *metric) PrecisionString(precison string) string {
return m.pt.PrecisionString(precison) return m.pt.PrecisionString(precison)
} }
func (m *metric) Point() *client.Point {
return m.pt
}

View File

@@ -10,7 +10,7 @@ type Output interface {
// SampleConfig returns the default configuration of the Output // SampleConfig returns the default configuration of the Output
SampleConfig() string SampleConfig() string
// Write takes in group of points to be written to the Output // Write takes in group of points to be written to the Output
Write(metrics []*Metric) error Write(metrics []Metric) error
} }
type ServiceOutput interface { type ServiceOutput interface {
@@ -23,7 +23,7 @@ type ServiceOutput interface {
// SampleConfig returns the default configuration of the Output // SampleConfig returns the default configuration of the Output
SampleConfig() string SampleConfig() string
// Write takes in group of points to be written to the Output // Write takes in group of points to be written to the Output
Write(metrics []*Metric) error Write(metrics []Metric) error
// Start the "service" that will provide an Output // Start the "service" that will provide an Output
Start() error Start() error
// Stop the "service" that will provide an Output // Stop the "service" that will provide an Output

View File

@@ -5,11 +5,13 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
// "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestSNMPErrorGet1(t *testing.T) { func TestSNMPErrorGet1(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
get1 := Data{ get1 := Data{
Name: "oid1", Name: "oid1",
Unit: "octets", Unit: "octets",
@@ -30,6 +32,9 @@ func TestSNMPErrorGet1(t *testing.T) {
} }
func TestSNMPErrorGet2(t *testing.T) { func TestSNMPErrorGet2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
get1 := Data{ get1 := Data{
Name: "oid1", Name: "oid1",
Unit: "octets", Unit: "octets",
@@ -49,6 +54,9 @@ func TestSNMPErrorGet2(t *testing.T) {
} }
func TestSNMPErrorBulk(t *testing.T) { func TestSNMPErrorBulk(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
bulk1 := Data{ bulk1 := Data{
Name: "oid1", Name: "oid1",
Unit: "octets", Unit: "octets",
@@ -69,13 +77,16 @@ func TestSNMPErrorBulk(t *testing.T) {
} }
func TestSNMPGet1(t *testing.T) { func TestSNMPGet1(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
get1 := Data{ get1 := Data{
Name: "oid1", Name: "oid1",
Unit: "octets", Unit: "octets",
Oid: ".1.3.6.1.2.1.2.2.1.16.1", Oid: ".1.3.6.1.2.1.2.2.1.16.1",
} }
h := Host{ h := Host{
Address: "127.0.0.1:31161", Address: testutil.GetLocalHost() + ":31161",
Community: "telegraf", Community: "telegraf",
Version: 2, Version: 2,
Timeout: 2.0, Timeout: 2.0,
@@ -104,12 +115,15 @@ func TestSNMPGet1(t *testing.T) {
} }
func TestSNMPGet2(t *testing.T) { func TestSNMPGet2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
get1 := Data{ get1 := Data{
Name: "oid1", Name: "oid1",
Oid: "ifNumber", Oid: "ifNumber",
} }
h := Host{ h := Host{
Address: "127.0.0.1:31161", Address: testutil.GetLocalHost() + ":31161",
Community: "telegraf", Community: "telegraf",
Version: 2, Version: 2,
Timeout: 2.0, Timeout: 2.0,
@@ -139,6 +153,9 @@ func TestSNMPGet2(t *testing.T) {
} }
func TestSNMPGet3(t *testing.T) { func TestSNMPGet3(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
get1 := Data{ get1 := Data{
Name: "oid1", Name: "oid1",
Unit: "octets", Unit: "octets",
@@ -146,7 +163,7 @@ func TestSNMPGet3(t *testing.T) {
Instance: "1", Instance: "1",
} }
h := Host{ h := Host{
Address: "127.0.0.1:31161", Address: testutil.GetLocalHost() + ":31161",
Community: "telegraf", Community: "telegraf",
Version: 2, Version: 2,
Timeout: 2.0, Timeout: 2.0,
@@ -177,6 +194,9 @@ func TestSNMPGet3(t *testing.T) {
} }
func TestSNMPEasyGet4(t *testing.T) { func TestSNMPEasyGet4(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
get1 := Data{ get1 := Data{
Name: "oid1", Name: "oid1",
Unit: "octets", Unit: "octets",
@@ -184,7 +204,7 @@ func TestSNMPEasyGet4(t *testing.T) {
Instance: "1", Instance: "1",
} }
h := Host{ h := Host{
Address: "127.0.0.1:31161", Address: testutil.GetLocalHost() + ":31161",
Community: "telegraf", Community: "telegraf",
Version: 2, Version: 2,
Timeout: 2.0, Timeout: 2.0,
@@ -227,6 +247,9 @@ func TestSNMPEasyGet4(t *testing.T) {
} }
func TestSNMPEasyGet5(t *testing.T) { func TestSNMPEasyGet5(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
get1 := Data{ get1 := Data{
Name: "oid1", Name: "oid1",
Unit: "octets", Unit: "octets",
@@ -234,7 +257,7 @@ func TestSNMPEasyGet5(t *testing.T) {
Instance: "1", Instance: "1",
} }
h := Host{ h := Host{
Address: "127.0.0.1:31161", Address: testutil.GetLocalHost() + ":31161",
Community: "telegraf", Community: "telegraf",
Version: 2, Version: 2,
Timeout: 2.0, Timeout: 2.0,
@@ -277,8 +300,11 @@ func TestSNMPEasyGet5(t *testing.T) {
} }
func TestSNMPEasyGet6(t *testing.T) { func TestSNMPEasyGet6(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
h := Host{ h := Host{
Address: "127.0.0.1:31161", Address: testutil.GetLocalHost() + ":31161",
Community: "telegraf", Community: "telegraf",
Version: 2, Version: 2,
Timeout: 2.0, Timeout: 2.0,
@@ -307,6 +333,9 @@ func TestSNMPEasyGet6(t *testing.T) {
} }
func TestSNMPBulk1(t *testing.T) { func TestSNMPBulk1(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
bulk1 := Data{ bulk1 := Data{
Name: "oid1", Name: "oid1",
Unit: "octets", Unit: "octets",
@@ -314,7 +343,7 @@ func TestSNMPBulk1(t *testing.T) {
MaxRepetition: 2, MaxRepetition: 2,
} }
h := Host{ h := Host{
Address: "127.0.0.1:31161", Address: testutil.GetLocalHost() + ":31161",
Community: "telegraf", Community: "telegraf",
Version: 2, Version: 2,
Timeout: 2.0, Timeout: 2.0,
@@ -385,6 +414,9 @@ func TestSNMPBulk1(t *testing.T) {
// bash scripts/circle-test.sh died unexpectedly // bash scripts/circle-test.sh died unexpectedly
// Maybe the test is too long ?? // Maybe the test is too long ??
func dTestSNMPBulk2(t *testing.T) { func dTestSNMPBulk2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
bulk1 := Data{ bulk1 := Data{
Name: "oid1", Name: "oid1",
Unit: "octets", Unit: "octets",
@@ -392,7 +424,7 @@ func dTestSNMPBulk2(t *testing.T) {
MaxRepetition: 2, MaxRepetition: 2,
} }
h := Host{ h := Host{
Address: "127.0.0.1:31161", Address: testutil.GetLocalHost() + ":31161",
Community: "telegraf", Community: "telegraf",
Version: 2, Version: 2,
Timeout: 2.0, Timeout: 2.0,

View File

@@ -8,8 +8,8 @@ import (
"net/http" "net/http"
"strings" "strings"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
) )
@@ -33,12 +33,12 @@ var sampleConfig = `
` `
type TimeSeries struct { type TimeSeries struct {
Series []*Metric `json:"series"` Series []*AmonMetric `json:"series"`
} }
type Metric struct { type AmonMetric struct {
Metric string `json:"metric"` AmonMetric string `json:"metric"`
Points [1]Point `json:"points"` Points [1]Point `json:"points"`
} }
type Point [2]float64 type Point [2]float64
@@ -53,31 +53,31 @@ func (a *Amon) Connect() error {
return nil return nil
} }
func (a *Amon) Write(points []*client.Point) error { func (a *Amon) Write(points []models.Metric) error {
if len(points) == 0 { if len(points) == 0 {
return nil return nil
} }
ts := TimeSeries{} ts := TimeSeries{}
tempSeries := []*Metric{} tempSeries := []*AmonMetric{}
metricCounter := 0 metricCounter := 0
for _, pt := range points { for _, pt := range points {
mname := strings.Replace(pt.Name(), "_", ".", -1) mname := strings.Replace(pt.Name(), "_", ".", -1)
if amonPts, err := buildPoints(pt); err == nil { if amonPts, err := buildPoints(pt); err == nil {
for fieldName, amonPt := range amonPts { for fieldName, amonPt := range amonPts {
metric := &Metric{ metric := &AmonMetric{
Metric: mname + "_" + strings.Replace(fieldName, "_", ".", -1), AmonMetric: mname + "_" + strings.Replace(fieldName, "_", ".", -1),
} }
metric.Points[0] = amonPt metric.Points[0] = amonPt
tempSeries = append(tempSeries, metric) tempSeries = append(tempSeries, metric)
metricCounter++ metricCounter++
} }
} else { } else {
log.Printf("unable to build Metric for %s, skipping\n", pt.Name()) log.Printf("unable to build AmonMetric for %s, skipping\n", pt.Name())
} }
} }
ts.Series = make([]*Metric, metricCounter) ts.Series = make([]*AmonMetric, metricCounter)
copy(ts.Series, tempSeries[0:]) copy(ts.Series, tempSeries[0:])
tsBytes, err := json.Marshal(ts) tsBytes, err := json.Marshal(ts)
if err != nil { if err != nil {
@@ -115,7 +115,7 @@ func (a *Amon) authenticatedUrl() string {
return fmt.Sprintf("%s/api/system/%s", a.AmonInstance, a.ServerKey) return fmt.Sprintf("%s/api/system/%s", a.AmonInstance, a.ServerKey)
} }
func buildPoints(pt *client.Point) (map[string]Point, error) { func buildPoints(pt models.Metric) (map[string]Point, error) {
pts := make(map[string]Point) pts := make(map[string]Point)
for k, v := range pt.Fields() { for k, v := range pt.Fields() {
var p Point var p Point
@@ -151,7 +151,7 @@ func (a *Amon) Close() error {
} }
func init() { func init() {
outputs.Add("amon", func() outputs.Output { outputs.Add("amon", func() models.Output {
return &Amon{} return &Amon{}
}) })
} }

View File

@@ -8,12 +8,12 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/models"
) )
func TestBuildPoint(t *testing.T) { func TestBuildPoint(t *testing.T) {
var tagtests = []struct { var tagtests = []struct {
ptIn *client.Point ptIn models.Metric
outPt Point outPt Point
err error err error
}{ }{

View File

@@ -10,7 +10,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/streadway/amqp" "github.com/streadway/amqp"
) )
@@ -149,7 +149,7 @@ func (q *AMQP) Description() string {
return "Configuration for the AMQP server to send metrics to" return "Configuration for the AMQP server to send metrics to"
} }
func (q *AMQP) Write(points []*client.Point) error { func (q *AMQP) Write(points []models.Metric) error {
q.Lock() q.Lock()
defer q.Unlock() defer q.Unlock()
if len(points) == 0 { if len(points) == 0 {
@@ -190,7 +190,7 @@ func (q *AMQP) Write(points []*client.Point) error {
} }
func init() { func init() {
outputs.Add("amqp", func() outputs.Output { outputs.Add("amqp", func() models.Output {
return &AMQP{ return &AMQP{
Database: DefaultDatabase, Database: DefaultDatabase,
Precision: DefaultPrecision, Precision: DefaultPrecision,

View File

@@ -23,6 +23,6 @@ func TestConnectAndWrite(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// Verify that we can successfully write data to the amqp broker // Verify that we can successfully write data to the amqp broker
err = q.Write(testutil.MockBatchPoints().Points()) err = q.Write(testutil.MockBatchPoints())
require.NoError(t, err) require.NoError(t, err)
} }

View File

@@ -14,7 +14,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
) )
@@ -72,7 +72,7 @@ func (c *CloudWatch) Close() error {
return nil return nil
} }
func (c *CloudWatch) Write(points []*client.Point) error { func (c *CloudWatch) Write(points []models.Metric) error {
for _, pt := range points { for _, pt := range points {
err := c.WriteSinglePoint(pt) err := c.WriteSinglePoint(pt)
if err != nil { if err != nil {
@@ -86,7 +86,7 @@ func (c *CloudWatch) Write(points []*client.Point) error {
// Write data for a single point. A point can have many fields and one field // Write data for a single point. A point can have many fields and one field
// is equal to one MetricDatum. There is a limit on how many MetricDatums a // is equal to one MetricDatum. There is a limit on how many MetricDatums a
// request can have so we process one Point at a time. // request can have so we process one Point at a time.
func (c *CloudWatch) WriteSinglePoint(point *client.Point) error { func (c *CloudWatch) WriteSinglePoint(point models.Metric) error {
datums := BuildMetricDatum(point) datums := BuildMetricDatum(point)
const maxDatumsPerCall = 20 // PutMetricData only supports up to 20 data points per call const maxDatumsPerCall = 20 // PutMetricData only supports up to 20 data points per call
@@ -143,7 +143,7 @@ func PartitionDatums(size int, datums []*cloudwatch.MetricDatum) [][]*cloudwatch
// Make a MetricDatum for each field in a Point. Only fields with values that can be // Make a MetricDatum for each field in a Point. Only fields with values that can be
// converted to float64 are supported. Non-supported fields are skipped. // converted to float64 are supported. Non-supported fields are skipped.
func BuildMetricDatum(point *client.Point) []*cloudwatch.MetricDatum { func BuildMetricDatum(point models.Metric) []*cloudwatch.MetricDatum {
datums := make([]*cloudwatch.MetricDatum, len(point.Fields())) datums := make([]*cloudwatch.MetricDatum, len(point.Fields()))
i := 0 i := 0
@@ -230,7 +230,7 @@ func BuildDimensions(ptTags map[string]string) []*cloudwatch.Dimension {
} }
func init() { func init() {
outputs.Add("cloudwatch", func() outputs.Output { outputs.Add("cloudwatch", func() models.Output {
return &CloudWatch{} return &CloudWatch{}
}) })
} }

View File

@@ -7,7 +7,7 @@ import (
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@@ -51,7 +51,7 @@ func TestBuildDimensions(t *testing.T) {
func TestBuildMetricDatums(t *testing.T) { func TestBuildMetricDatums(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
validPoints := []*client.Point{ validPoints := []models.Metric{
testutil.TestPoint(1), testutil.TestPoint(1),
testutil.TestPoint(int32(1)), testutil.TestPoint(int32(1)),
testutil.TestPoint(int64(1)), testutil.TestPoint(int64(1)),

View File

@@ -10,8 +10,8 @@ import (
"sort" "sort"
"strings" "strings"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
) )
@@ -32,14 +32,14 @@ var sampleConfig = `
` `
type TimeSeries struct { type TimeSeries struct {
Series []*Metric `json:"series"` Series []*DogMetric `json:"series"`
} }
type Metric struct { type DogMetric struct {
Metric string `json:"metric"` DogMetric string `json:"metric"`
Points [1]Point `json:"points"` Points [1]Point `json:"points"`
Host string `json:"host"` Host string `json:"host"`
Tags []string `json:"tags,omitempty"` Tags []string `json:"tags,omitempty"`
} }
type Point [2]float64 type Point [2]float64
@@ -62,31 +62,31 @@ func (d *Datadog) Connect() error {
return nil return nil
} }
func (d *Datadog) Write(points []*client.Point) error { func (d *Datadog) Write(points []models.Metric) error {
if len(points) == 0 { if len(points) == 0 {
return nil return nil
} }
ts := TimeSeries{} ts := TimeSeries{}
tempSeries := []*Metric{} tempSeries := []*DogMetric{}
metricCounter := 0 metricCounter := 0
for _, pt := range points { for _, pt := range points {
mname := strings.Replace(pt.Name(), "_", ".", -1) mname := strings.Replace(pt.Name(), "_", ".", -1)
if amonPts, err := buildPoints(pt); err == nil { if amonPts, err := buildPoints(pt); err == nil {
for fieldName, amonPt := range amonPts { for fieldName, amonPt := range amonPts {
metric := &Metric{ metric := &DogMetric{
Metric: mname + strings.Replace(fieldName, "_", ".", -1), DogMetric: mname + strings.Replace(fieldName, "_", ".", -1),
} }
metric.Points[0] = amonPt metric.Points[0] = amonPt
tempSeries = append(tempSeries, metric) tempSeries = append(tempSeries, metric)
metricCounter++ metricCounter++
} }
} else { } else {
log.Printf("unable to build Metric for %s, skipping\n", pt.Name()) log.Printf("unable to build DogMetric for %s, skipping\n", pt.Name())
} }
} }
ts.Series = make([]*Metric, metricCounter) ts.Series = make([]*DogMetric, metricCounter)
copy(ts.Series, tempSeries[0:]) copy(ts.Series, tempSeries[0:])
tsBytes, err := json.Marshal(ts) tsBytes, err := json.Marshal(ts)
if err != nil { if err != nil {
@@ -126,7 +126,7 @@ func (d *Datadog) authenticatedUrl() string {
return fmt.Sprintf("%s?%s", d.apiUrl, q.Encode()) return fmt.Sprintf("%s?%s", d.apiUrl, q.Encode())
} }
func buildPoints(pt *client.Point) (map[string]Point, error) { func buildPoints(pt models.Metric) (map[string]Point, error) {
pts := make(map[string]Point) pts := make(map[string]Point)
for k, v := range pt.Fields() { for k, v := range pt.Fields() {
var p Point var p Point
@@ -173,7 +173,7 @@ func (d *Datadog) Close() error {
} }
func init() { func init() {
outputs.Add("datadog", func() outputs.Output { outputs.Add("datadog", func() models.Output {
return NewDatadog(datadog_api) return NewDatadog(datadog_api)
}) })
} }

View File

@@ -11,7 +11,7 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/models"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@@ -38,7 +38,7 @@ func TestUriOverride(t *testing.T) {
d.Apikey = "123456" d.Apikey = "123456"
err := d.Connect() err := d.Connect()
require.NoError(t, err) require.NoError(t, err)
err = d.Write(testutil.MockBatchPoints().Points()) err = d.Write(testutil.MockBatchPoints())
require.NoError(t, err) require.NoError(t, err)
} }
@@ -57,7 +57,7 @@ func TestBadStatusCode(t *testing.T) {
d.Apikey = "123456" d.Apikey = "123456"
err := d.Connect() err := d.Connect()
require.NoError(t, err) require.NoError(t, err)
err = d.Write(testutil.MockBatchPoints().Points()) err = d.Write(testutil.MockBatchPoints())
if err == nil { if err == nil {
t.Errorf("error expected but none returned") t.Errorf("error expected but none returned")
} else { } else {
@@ -100,7 +100,7 @@ func TestBuildTags(t *testing.T) {
func TestBuildPoint(t *testing.T) { func TestBuildPoint(t *testing.T) {
var tagtests = []struct { var tagtests = []struct {
ptIn *client.Point ptIn models.Metric
outPt Point outPt Point
err error err error
}{ }{

View File

@@ -3,7 +3,7 @@ package graphite
import ( import (
"errors" "errors"
"fmt" "fmt"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"log" "log"
"math/rand" "math/rand"
@@ -67,7 +67,7 @@ func (g *Graphite) Description() string {
// Choose a random server in the cluster to write to until a successful write // Choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error. // occurs, logging each unsuccessful. If all servers fail, return error.
func (g *Graphite) Write(points []*client.Point) error { func (g *Graphite) Write(points []models.Metric) error {
// Prepare data // Prepare data
var bp []string var bp []string
for _, point := range points { for _, point := range points {
@@ -128,7 +128,7 @@ func (g *Graphite) Write(points []*client.Point) error {
} }
func init() { func init() {
outputs.Add("graphite", func() outputs.Output { outputs.Add("graphite", func() models.Output {
return &Graphite{} return &Graphite{}
}) })
} }

View File

@@ -8,7 +8,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/models"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@@ -21,14 +21,14 @@ func TestGraphiteError(t *testing.T) {
Prefix: "my.prefix", Prefix: "my.prefix",
} }
// Init points // Init points
pt1, _ := client.NewPoint( pt1, _ := models.NewMetric(
"mymeasurement", "mymeasurement",
map[string]string{"host": "192.168.0.1"}, map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"mymeasurement": float64(3.14)}, map[string]interface{}{"mymeasurement": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
) )
// Prepare point list // Prepare point list
var points []*client.Point var points []models.Metric
points = append(points, pt1) points = append(points, pt1)
// Error // Error
err1 := g.Connect() err1 := g.Connect()
@@ -45,26 +45,26 @@ func TestGraphiteOK(t *testing.T) {
Prefix: "my.prefix", Prefix: "my.prefix",
} }
// Init points // Init points
pt1, _ := client.NewPoint( pt1, _ := models.NewMetric(
"mymeasurement", "mymeasurement",
map[string]string{"host": "192.168.0.1"}, map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"mymeasurement": float64(3.14)}, map[string]interface{}{"mymeasurement": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
) )
pt2, _ := client.NewPoint( pt2, _ := models.NewMetric(
"mymeasurement", "mymeasurement",
map[string]string{"host": "192.168.0.1"}, map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)}, map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
) )
pt3, _ := client.NewPoint( pt3, _ := models.NewMetric(
"my_measurement", "my_measurement",
map[string]string{"host": "192.168.0.1"}, map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)}, map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
) )
// Prepare point list // Prepare point list
var points []*client.Point var points []models.Metric
points = append(points, pt1) points = append(points, pt1)
points = append(points, pt2) points = append(points, pt2)
points = append(points, pt3) points = append(points, pt3)

View File

@@ -9,9 +9,11 @@ import (
"strings" "strings"
"time" "time"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/influxdb/client/v2"
) )
type InfluxDB struct { type InfluxDB struct {
@@ -130,14 +132,14 @@ func (i *InfluxDB) Description() string {
// Choose a random server in the cluster to write to until a successful write // Choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error. // occurs, logging each unsuccessful. If all servers fail, return error.
func (i *InfluxDB) Write(points []*client.Point) error { func (i *InfluxDB) Write(metrics []models.Metric) error {
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
Database: i.Database, Database: i.Database,
Precision: i.Precision, Precision: i.Precision,
}) })
for _, point := range points { for _, metric := range metrics {
bp.AddPoint(point) bp.AddPoint(metric.Point())
} }
// This will get set to nil if a successful write occurs // This will get set to nil if a successful write occurs
@@ -156,7 +158,7 @@ func (i *InfluxDB) Write(points []*client.Point) error {
} }
func init() { func init() {
outputs.Add("influxdb", func() outputs.Output { outputs.Add("influxdb", func() models.Output {
return &InfluxDB{} return &InfluxDB{}
}) })
} }

View File

@@ -18,7 +18,7 @@ func TestUDPInflux(t *testing.T) {
err := i.Connect() err := i.Connect()
require.NoError(t, err) require.NoError(t, err)
err = i.Write(testutil.MockBatchPoints().Points()) err = i.Write(testutil.MockBatchPoints())
require.NoError(t, err) require.NoError(t, err)
} }
@@ -36,6 +36,6 @@ func TestHTTPInflux(t *testing.T) {
err := i.Connect() err := i.Connect()
require.NoError(t, err) require.NoError(t, err)
err = i.Write(testutil.MockBatchPoints().Points()) err = i.Write(testutil.MockBatchPoints())
require.NoError(t, err) require.NoError(t, err)
} }

View File

@@ -6,7 +6,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"io/ioutil" "io/ioutil"
) )
@@ -112,7 +112,7 @@ func (k *Kafka) Description() string {
return "Configuration for the Kafka server to send metrics to" return "Configuration for the Kafka server to send metrics to"
} }
func (k *Kafka) Write(points []*client.Point) error { func (k *Kafka) Write(points []models.Metric) error {
if len(points) == 0 { if len(points) == 0 {
return nil return nil
} }
@@ -140,7 +140,7 @@ func (k *Kafka) Write(points []*client.Point) error {
} }
func init() { func init() {
outputs.Add("kafka", func() outputs.Output { outputs.Add("kafka", func() models.Output {
return &Kafka{} return &Kafka{}
}) })
} }

View File

@@ -23,6 +23,6 @@ func TestConnectAndWrite(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// Verify that we can successfully write data to the kafka broker // Verify that we can successfully write data to the kafka broker
err = k.Write(testutil.MockBatchPoints().Points()) err = k.Write(testutil.MockBatchPoints())
require.NoError(t, err) require.NoError(t, err)
} }

View File

@@ -14,7 +14,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
) )
@@ -103,7 +103,7 @@ func (k *KinesisOutput) Close() error {
return nil return nil
} }
func FormatMetric(k *KinesisOutput, point *client.Point) (string, error) { func FormatMetric(k *KinesisOutput, point models.Metric) (string, error) {
if k.Format == "string" { if k.Format == "string" {
return point.String(), nil return point.String(), nil
} else { } else {
@@ -138,7 +138,7 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Du
return time.Since(start) return time.Since(start)
} }
func (k *KinesisOutput) Write(points []*client.Point) error { func (k *KinesisOutput) Write(points []models.Metric) error {
var sz uint32 = 0 var sz uint32 = 0
if len(points) == 0 { if len(points) == 0 {
@@ -172,7 +172,7 @@ func (k *KinesisOutput) Write(points []*client.Point) error {
} }
func init() { func init() {
outputs.Add("kinesis", func() outputs.Output { outputs.Add("kinesis", func() models.Output {
return &KinesisOutput{} return &KinesisOutput{}
}) })
} }

View File

@@ -15,7 +15,7 @@ func TestFormatMetric(t *testing.T) {
Format: "string", Format: "string",
} }
p := testutil.MockBatchPoints().Points()[0] p := testutil.MockBatchPoints()[0]
valid_string := "test1,tag1=value1 value=1 1257894000000000000" valid_string := "test1,tag1=value1 value=1 1257894000000000000"
func_string, err := FormatMetric(k, p) func_string, err := FormatMetric(k, p)

View File

@@ -7,8 +7,8 @@ import (
"log" "log"
"net/http" "net/http"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
) )
@@ -69,7 +69,7 @@ func (l *Librato) Connect() error {
return nil return nil
} }
func (l *Librato) Write(points []*client.Point) error { func (l *Librato) Write(points []models.Metric) error {
if len(points) == 0 { if len(points) == 0 {
return nil return nil
} }
@@ -122,7 +122,7 @@ func (l *Librato) Description() string {
return "Configuration for Librato API to send metrics to." return "Configuration for Librato API to send metrics to."
} }
func (l *Librato) buildGauges(pt *client.Point) ([]*Gauge, error) { func (l *Librato) buildGauges(pt models.Metric) ([]*Gauge, error) {
gauges := []*Gauge{} gauges := []*Gauge{}
for fieldName, value := range pt.Fields() { for fieldName, value := range pt.Fields() {
gauge := &Gauge{ gauge := &Gauge{
@@ -169,7 +169,7 @@ func (l *Librato) Close() error {
} }
func init() { func init() {
outputs.Add("librato", func() outputs.Output { outputs.Add("librato", func() models.Output {
return NewLibrato(librato_api) return NewLibrato(librato_api)
}) })
} }

View File

@@ -11,7 +11,7 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/models"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@@ -39,7 +39,7 @@ func TestUriOverride(t *testing.T) {
l.ApiToken = "123456" l.ApiToken = "123456"
err := l.Connect() err := l.Connect()
require.NoError(t, err) require.NoError(t, err)
err = l.Write(testutil.MockBatchPoints().Points()) err = l.Write(testutil.MockBatchPoints())
require.NoError(t, err) require.NoError(t, err)
} }
@@ -61,7 +61,7 @@ func TestBadStatusCode(t *testing.T) {
l.ApiToken = "123456" l.ApiToken = "123456"
err := l.Connect() err := l.Connect()
require.NoError(t, err) require.NoError(t, err)
err = l.Write(testutil.MockBatchPoints().Points()) err = l.Write(testutil.MockBatchPoints())
if err == nil { if err == nil {
t.Errorf("error expected but none returned") t.Errorf("error expected but none returned")
} else { } else {
@@ -71,7 +71,7 @@ func TestBadStatusCode(t *testing.T) {
func TestBuildGauge(t *testing.T) { func TestBuildGauge(t *testing.T) {
var gaugeTests = []struct { var gaugeTests = []struct {
ptIn *client.Point ptIn models.Metric
outGauge *Gauge outGauge *Gauge
err error err error
}{ }{
@@ -161,20 +161,20 @@ func TestBuildGauge(t *testing.T) {
} }
func TestBuildGaugeWithSource(t *testing.T) { func TestBuildGaugeWithSource(t *testing.T) {
pt1, _ := client.NewPoint( pt1, _ := models.NewMetric(
"test1", "test1",
map[string]string{"hostname": "192.168.0.1"}, map[string]string{"hostname": "192.168.0.1"},
map[string]interface{}{"value": 0.0}, map[string]interface{}{"value": 0.0},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
) )
pt2, _ := client.NewPoint( pt2, _ := models.NewMetric(
"test2", "test2",
map[string]string{"hostnam": "192.168.0.1"}, map[string]string{"hostnam": "192.168.0.1"},
map[string]interface{}{"value": 1.0}, map[string]interface{}{"value": 1.0},
time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC), time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC),
) )
var gaugeTests = []struct { var gaugeTests = []struct {
ptIn *client.Point ptIn models.Metric
outGauge *Gauge outGauge *Gauge
err error err error
}{ }{

View File

@@ -10,8 +10,8 @@ import (
"sync" "sync"
paho "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" paho "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
) )
@@ -78,7 +78,7 @@ func (m *MQTT) Description() string {
return "Configuration for MQTT server to send metrics to" return "Configuration for MQTT server to send metrics to"
} }
func (m *MQTT) Write(points []*client.Point) error { func (m *MQTT) Write(points []models.Metric) error {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
if len(points) == 0 { if len(points) == 0 {
@@ -184,7 +184,7 @@ func getCertPool(pemPath string) (*x509.CertPool, error) {
} }
func init() { func init() {
outputs.Add("mqtt", func() outputs.Output { outputs.Add("mqtt", func() models.Output {
return &MQTT{} return &MQTT{}
}) })
} }

View File

@@ -22,6 +22,6 @@ func TestConnectAndWrite(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// Verify that we can successfully write data to the mqtt broker // Verify that we can successfully write data to the mqtt broker
err = m.Write(testutil.MockBatchPoints().Points()) err = m.Write(testutil.MockBatchPoints())
require.NoError(t, err) require.NoError(t, err)
} }

View File

@@ -2,7 +2,7 @@ package nsq
import ( import (
"fmt" "fmt"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/nsqio/go-nsq" "github.com/nsqio/go-nsq"
) )
@@ -45,7 +45,7 @@ func (n *NSQ) Description() string {
return "Send telegraf measurements to NSQD" return "Send telegraf measurements to NSQD"
} }
func (n *NSQ) Write(points []*client.Point) error { func (n *NSQ) Write(points []models.Metric) error {
if len(points) == 0 { if len(points) == 0 {
return nil return nil
} }
@@ -65,7 +65,7 @@ func (n *NSQ) Write(points []*client.Point) error {
} }
func init() { func init() {
outputs.Add("nsq", func() outputs.Output { outputs.Add("nsq", func() models.Output {
return &NSQ{} return &NSQ{}
}) })
} }

View File

@@ -23,6 +23,6 @@ func TestConnectAndWrite(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// Verify that we can successfully write data to the NSQ daemon // Verify that we can successfully write data to the NSQ daemon
err = n.Write(testutil.MockBatchPoints().Points()) err = n.Write(testutil.MockBatchPoints())
require.NoError(t, err) require.NoError(t, err)
} }

View File

@@ -8,7 +8,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
) )
@@ -58,7 +58,7 @@ func (o *OpenTSDB) Connect() error {
return nil return nil
} }
func (o *OpenTSDB) Write(points []*client.Point) error { func (o *OpenTSDB) Write(points []models.Metric) error {
if len(points) == 0 { if len(points) == 0 {
return nil return nil
} }
@@ -101,7 +101,7 @@ func buildTags(ptTags map[string]string) []string {
return tags return tags
} }
func buildMetrics(pt *client.Point, now time.Time, prefix string) []*MetricLine { func buildMetrics(pt models.Metric, now time.Time, prefix string) []*MetricLine {
ret := []*MetricLine{} ret := []*MetricLine{}
for fieldName, value := range pt.Fields() { for fieldName, value := range pt.Fields() {
metric := &MetricLine{ metric := &MetricLine{
@@ -162,7 +162,7 @@ func (o *OpenTSDB) Close() error {
} }
func init() { func init() {
outputs.Add("opentsdb", func() outputs.Output { outputs.Add("opentsdb", func() models.Output {
return &OpenTSDB{} return &OpenTSDB{}
}) })
} }

View File

@@ -54,18 +54,18 @@ func TestWrite(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// Verify that we can successfully write data to OpenTSDB // Verify that we can successfully write data to OpenTSDB
err = o.Write(testutil.MockBatchPoints().Points()) err = o.Write(testutil.MockBatchPoints())
require.NoError(t, err) require.NoError(t, err)
// Verify postive and negative test cases of writing data // Verify postive and negative test cases of writing data
bp := testutil.MockBatchPoints() bp := testutil.MockBatchPoints()
bp.AddPoint(testutil.TestPoint(float64(1.0), "justametric.float")) bp = append(bp, testutil.TestPoint(float64(1.0), "justametric.float"))
bp.AddPoint(testutil.TestPoint(int64(123456789), "justametric.int")) bp = append(bp, testutil.TestPoint(int64(123456789), "justametric.int"))
bp.AddPoint(testutil.TestPoint(uint64(123456789012345), "justametric.uint")) bp = append(bp, testutil.TestPoint(uint64(123456789012345), "justametric.uint"))
bp.AddPoint(testutil.TestPoint("Lorem Ipsum", "justametric.string")) bp = append(bp, testutil.TestPoint("Lorem Ipsum", "justametric.string"))
bp.AddPoint(testutil.TestPoint(float64(42.0), "justametric.anotherfloat")) bp = append(bp, testutil.TestPoint(float64(42.0), "justametric.anotherfloat"))
err = o.Write(bp.Points()) err = o.Write(bp)
require.NoError(t, err) require.NoError(t, err)
} }

View File

@@ -5,7 +5,7 @@ import (
"log" "log"
"net/http" "net/http"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@@ -58,7 +58,7 @@ func (p *PrometheusClient) Description() string {
return "Configuration for the Prometheus client to spawn" return "Configuration for the Prometheus client to spawn"
} }
func (p *PrometheusClient) Write(points []*client.Point) error { func (p *PrometheusClient) Write(points []models.Metric) error {
if len(points) == 0 { if len(points) == 0 {
return nil return nil
} }
@@ -119,7 +119,7 @@ func (p *PrometheusClient) Write(points []*client.Point) error {
} }
func init() { func init() {
outputs.Add("prometheus_client", func() outputs.Output { outputs.Add("prometheus_client", func() models.Output {
return &PrometheusClient{} return &PrometheusClient{}
}) })
} }

View File

@@ -5,7 +5,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/inputs/prometheus" "github.com/influxdata/telegraf/plugins/inputs/prometheus"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
@@ -21,15 +21,15 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
Urls: []string{"http://localhost:9126/metrics"}, Urls: []string{"http://localhost:9126/metrics"},
} }
tags := make(map[string]string) tags := make(map[string]string)
pt1, _ := client.NewPoint( pt1, _ := models.NewMetric(
"test_point_1", "test_point_1",
tags, tags,
map[string]interface{}{"value": 0.0}) map[string]interface{}{"value": 0.0})
pt2, _ := client.NewPoint( pt2, _ := models.NewMetric(
"test_point_2", "test_point_2",
tags, tags,
map[string]interface{}{"value": 1.0}) map[string]interface{}{"value": 1.0})
var points = []*client.Point{ var points = []models.Metric{
pt1, pt1,
pt2, pt2,
} }
@@ -63,15 +63,15 @@ func TestPrometheusWritePointTag(t *testing.T) {
} }
tags := make(map[string]string) tags := make(map[string]string)
tags["testtag"] = "testvalue" tags["testtag"] = "testvalue"
pt1, _ := client.NewPoint( pt1, _ := models.NewMetric(
"test_point_3", "test_point_3",
tags, tags,
map[string]interface{}{"value": 0.0}) map[string]interface{}{"value": 0.0})
pt2, _ := client.NewPoint( pt2, _ := models.NewMetric(
"test_point_4", "test_point_4",
tags, tags,
map[string]interface{}{"value": 1.0}) map[string]interface{}{"value": 1.0})
var points = []*client.Point{ var points = []models.Metric{
pt1, pt1,
pt2, pt2,
} }

View File

@@ -1,40 +1,10 @@
package outputs package outputs
import ( import (
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/models"
) )
type Output interface { type Creator func() models.Output
// Connect to the Output
Connect() error
// Close any connections to the Output
Close() error
// Description returns a one-sentence description on the Output
Description() string
// SampleConfig returns the default configuration of the Output
SampleConfig() string
// Write takes in group of points to be written to the Output
Write(points []*client.Point) error
}
type ServiceOutput interface {
// Connect to the Output
Connect() error
// Close any connections to the Output
Close() error
// Description returns a one-sentence description on the Output
Description() string
// SampleConfig returns the default configuration of the Output
SampleConfig() string
// Write takes in group of points to be written to the Output
Write(points []*client.Point) error
// Start the "service" that will provide an Output
Start() error
// Stop the "service" that will provide an Output
Stop()
}
type Creator func() Output
var Outputs = map[string]Creator{} var Outputs = map[string]Creator{}

View File

@@ -6,7 +6,7 @@ import (
"os" "os"
"github.com/amir/raidman" "github.com/amir/raidman"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
) )
@@ -48,7 +48,7 @@ func (r *Riemann) Description() string {
return "Configuration for the Riemann server to send metrics to" return "Configuration for the Riemann server to send metrics to"
} }
func (r *Riemann) Write(points []*client.Point) error { func (r *Riemann) Write(points []models.Metric) error {
if len(points) == 0 { if len(points) == 0 {
return nil return nil
} }
@@ -70,7 +70,7 @@ func (r *Riemann) Write(points []*client.Point) error {
return nil return nil
} }
func buildEvents(p *client.Point) []*raidman.Event { func buildEvents(p models.Metric) []*raidman.Event {
events := []*raidman.Event{} events := []*raidman.Event{}
for fieldName, value := range p.Fields() { for fieldName, value := range p.Fields() {
host, ok := p.Tags()["host"] host, ok := p.Tags()["host"]
@@ -95,7 +95,7 @@ func buildEvents(p *client.Point) []*raidman.Event {
} }
func init() { func init() {
outputs.Add("riemann", func() outputs.Output { outputs.Add("riemann", func() models.Output {
return &Riemann{} return &Riemann{}
}) })
} }

View File

@@ -22,6 +22,6 @@ func TestConnectAndWrite(t *testing.T) {
err := r.Connect() err := r.Connect()
require.NoError(t, err) require.NoError(t, err)
err = r.Write(testutil.MockBatchPoints().Points()) err = r.Write(testutil.MockBatchPoints())
require.NoError(t, err) require.NoError(t, err)
} }

View File

@@ -6,7 +6,7 @@ import (
"os" "os"
"time" "time"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/telegraf/models"
) )
var localhost = "localhost" var localhost = "localhost"
@@ -33,11 +33,11 @@ func GetLocalHost() string {
// MockBatchPoints returns a mock BatchPoints object for using in unit tests // MockBatchPoints returns a mock BatchPoints object for using in unit tests
// of telegraf output sinks. // of telegraf output sinks.
func MockBatchPoints() client.BatchPoints { func MockBatchPoints() []models.Metric {
// Create a new point batch // Create a new point batch
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{}) ret := []models.Metric{}
bp.AddPoint(TestPoint(1.0)) ret = append(ret, TestPoint(1.0))
return bp return ret
} }
// TestPoint Returns a simple test point: // TestPoint Returns a simple test point:
@@ -45,7 +45,7 @@ func MockBatchPoints() client.BatchPoints {
// tags -> "tag1":"value1" // tags -> "tag1":"value1"
// value -> value // value -> value
// time -> time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) // time -> time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
func TestPoint(value interface{}, name ...string) *client.Point { func TestPoint(value interface{}, name ...string) models.Metric {
if value == nil { if value == nil {
panic("Cannot use a nil value") panic("Cannot use a nil value")
} }
@@ -54,7 +54,7 @@ func TestPoint(value interface{}, name ...string) *client.Point {
measurement = name[0] measurement = name[0]
} }
tags := map[string]string{"tag1": "value1"} tags := map[string]string{"tag1": "value1"}
pt, _ := client.NewPoint( pt, _ := models.NewMetric(
measurement, measurement,
tags, tags,
map[string]interface{}{"value": value}, map[string]interface{}{"value": value},