From 1edfa9bbd0665cb75060bd2cdced60fbc8566656 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 27 Jan 2016 16:15:14 -0700 Subject: [PATCH] Throughout telegraf, use telegraf.Metric rather than client.Point closes #599 --- CHANGELOG.md | 2 + CONTRIBUTING.md | 22 +-- agent/accumulator.go | 15 +- agent/agent.go | 30 ++-- internal/models/filter.go | 6 +- internal/models/running_output.go | 24 ++-- output.go | 36 +---- .../inputs/github_webhooks/github_webhooks.go | 2 +- .../github_webhooks/github_webhooks_models.go | 130 +++++++++--------- plugins/inputs/httpjson/httpjson_test.go | 4 +- plugins/inputs/influxdb/influxdb_test.go | 2 +- plugins/inputs/jolokia/jolokia_test.go | 4 +- .../inputs/kafka_consumer/kafka_consumer.go | 19 ++- .../kafka_consumer_integration_test.go | 8 +- .../kafka_consumer/kafka_consumer_test.go | 12 +- plugins/inputs/postgresql/postgresql_test.go | 2 +- plugins/inputs/system/cpu_test.go | 2 +- plugins/inputs/system/disk_test.go | 22 +-- plugins/inputs/system/memory_test.go | 2 +- plugins/inputs/system/net_test.go | 2 +- plugins/inputs/zfs/zfs_test.go | 6 +- plugins/outputs/amon/amon.go | 31 ++--- plugins/outputs/amon/amon_test.go | 20 +-- plugins/outputs/amqp/amqp.go | 11 +- plugins/outputs/amqp/amqp_test.go | 2 +- plugins/outputs/cloudwatch/README.md | 2 +- plugins/outputs/cloudwatch/cloudwatch.go | 25 ++-- plugins/outputs/cloudwatch/cloudwatch_test.go | 22 +-- plugins/outputs/datadog/datadog.go | 45 +++--- plugins/outputs/datadog/datadog_test.go | 24 ++-- plugins/outputs/graphite/graphite.go | 7 +- plugins/outputs/graphite/graphite_test.go | 30 ++-- plugins/outputs/influxdb/influxdb.go | 11 +- plugins/outputs/influxdb/influxdb_test.go | 4 +- plugins/outputs/kafka/kafka.go | 11 +- plugins/outputs/kafka/kafka_test.go | 2 +- plugins/outputs/kinesis/kinesis.go | 11 +- plugins/outputs/kinesis/kinesis_test.go | 2 +- plugins/outputs/librato/librato.go | 31 ++--- plugins/outputs/librato/librato_test.go | 28 ++-- plugins/outputs/mqtt/mqtt.go | 11 +- plugins/outputs/mqtt/mqtt_test.go | 2 +- plugins/outputs/nsq/nsq.go | 11 +- plugins/outputs/nsq/nsq_test.go | 2 +- plugins/outputs/opentsdb/opentsdb.go | 25 ++-- plugins/outputs/opentsdb/opentsdb_test.go | 16 +-- .../prometheus_client/prometheus_client.go | 9 +- .../prometheus_client_test.go | 18 +-- plugins/outputs/riemann/riemann.go | 11 +- plugins/outputs/riemann/riemann_test.go | 2 +- testutil/accumulator.go | 32 ++--- testutil/testutil.go | 18 +-- 52 files changed, 391 insertions(+), 437 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 736952fc1..6d6fda00d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,8 +3,10 @@ ### Release Notes ### Features +- [#564](https://github.com/influxdata/telegraf/issues/564): features for plugin writing simplification. Internal metric data type. ### Bugfixes +- [#599](https://github.com/influxdata/telegraf/issues/599): datadog plugin tags not working. ## v0.10.1 [2016-01-27] diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 1dad3ab17..fa1645725 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -37,7 +37,7 @@ and submit new inputs. ### Input Plugin Guidelines -* A plugin must conform to the `inputs.Input` interface. +* A plugin must conform to the `telegraf.Input` interface. * Input Plugins should call `inputs.Add` in their `init` function to register themselves. See below for a quick example. * Input Plugins must be added to the @@ -97,7 +97,10 @@ package simple // simple.go -import "github.com/influxdata/telegraf/plugins/inputs" +import ( + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) type Simple struct { Ok bool @@ -122,7 +125,7 @@ func (s *Simple) Gather(acc inputs.Accumulator) error { } func init() { - inputs.Add("simple", func() inputs.Input { return &Simple{} }) + inputs.Add("simple", func() telegraf.Input { return &Simple{} }) } ``` @@ -182,7 +185,7 @@ type Output interface { Close() error Description() string SampleConfig() string - Write(points []*client.Point) error + Write(metrics []telegraf.Metric) error } ``` @@ -193,7 +196,10 @@ package simpleoutput // simpleoutput.go -import "github.com/influxdata/telegraf/plugins/outputs" +import ( + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" +) type Simple struct { Ok bool @@ -217,7 +223,7 @@ func (s *Simple) Close() error { return nil } -func (s *Simple) Write(points []*client.Point) error { +func (s *Simple) Write(metrics []telegraf.Metric) error { for _, pt := range points { // write `pt` to the output sink here } @@ -225,7 +231,7 @@ func (s *Simple) Write(points []*client.Point) error { } func init() { - outputs.Add("simpleoutput", func() outputs.Output { return &Simple{} }) + outputs.Add("simpleoutput", func() telegraf.Output { return &Simple{} }) } ``` @@ -253,7 +259,7 @@ type ServiceOutput interface { Close() error Description() string SampleConfig() string - Write(points []*client.Point) error + Write(metrics []telegraf.Metric) error Start() error Stop() } diff --git a/agent/accumulator.go b/agent/accumulator.go index 30d59e919..9361ad82e 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -7,17 +7,16 @@ import ( "sync" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/models" - - "github.com/influxdata/influxdb/client/v2" ) func NewAccumulator( inputConfig *internal_models.InputConfig, - points chan *client.Point, + metrics chan telegraf.Metric, ) *accumulator { acc := accumulator{} - acc.points = points + acc.metrics = metrics acc.inputConfig = inputConfig return &acc } @@ -25,7 +24,7 @@ func NewAccumulator( type accumulator struct { sync.Mutex - points chan *client.Point + metrics chan telegraf.Metric defaultTags map[string]string @@ -136,15 +135,15 @@ func (ac *accumulator) AddFields( measurement = ac.prefix + measurement } - pt, err := client.NewPoint(measurement, tags, result, timestamp) + m, err := telegraf.NewMetric(measurement, tags, result, timestamp) if err != nil { log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) return } if ac.debug { - fmt.Println("> " + pt.String()) + fmt.Println("> " + m.String()) } - ac.points <- pt + ac.metrics <- m } func (ac *accumulator) Debug() bool { diff --git a/agent/agent.go b/agent/agent.go index 8825a2ec8..bd52e7875 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -14,8 +14,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/config" "github.com/influxdata/telegraf/internal/models" - - "github.com/influxdata/influxdb/client/v2" ) // Agent runs telegraf and collects data based on the given config @@ -101,7 +99,7 @@ func panicRecover(input *internal_models.RunningInput) { // gatherParallel runs the inputs that are using the same reporting interval // as the telegraf agent. -func (a *Agent) gatherParallel(pointChan chan *client.Point) error { +func (a *Agent) gatherParallel(metricC chan telegraf.Metric) error { var wg sync.WaitGroup start := time.Now() @@ -118,7 +116,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error { defer panicRecover(input) defer wg.Done() - acc := NewAccumulator(input.Config, pointChan) + acc := NewAccumulator(input.Config, metricC) acc.SetDebug(a.Config.Agent.Debug) acc.setDefaultTags(a.Config.Tags) @@ -159,7 +157,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error { func (a *Agent) gatherSeparate( shutdown chan struct{}, input *internal_models.RunningInput, - pointChan chan *client.Point, + metricC chan telegraf.Metric, ) error { defer panicRecover(input) @@ -169,7 +167,7 @@ func (a *Agent) gatherSeparate( var outerr error start := time.Now() - acc := NewAccumulator(input.Config, pointChan) + acc := NewAccumulator(input.Config, metricC) acc.SetDebug(a.Config.Agent.Debug) acc.setDefaultTags(a.Config.Tags) @@ -201,13 +199,13 @@ func (a *Agent) gatherSeparate( func (a *Agent) Test() error { shutdown := make(chan struct{}) defer close(shutdown) - pointChan := make(chan *client.Point) + metricC := make(chan telegraf.Metric) // dummy receiver for the point channel go func() { for { select { - case <-pointChan: + case <-metricC: // do nothing case <-shutdown: return @@ -216,7 +214,7 @@ func (a *Agent) Test() error { }() for _, input := range a.Config.Inputs { - acc := NewAccumulator(input.Config, pointChan) + acc := NewAccumulator(input.Config, metricC) acc.SetDebug(true) fmt.Printf("* Plugin: %s, Collection 1\n", input.Name) @@ -263,7 +261,7 @@ func (a *Agent) flush() { } // flusher monitors the points input channel and flushes on the minimum interval -func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) error { +func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error { // Inelegant, but this sleep is to allow the Gather threads to run, so that // the flusher will flush after metrics are collected. time.Sleep(time.Millisecond * 200) @@ -278,9 +276,9 @@ func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) er return nil case <-ticker.C: a.flush() - case pt := <-pointChan: + case m := <-metricC: for _, o := range a.Config.Outputs { - o.AddPoint(pt) + o.AddPoint(m) } } } @@ -321,7 +319,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration) // channel shared between all input threads for accumulating points - pointChan := make(chan *client.Point, 1000) + metricC := make(chan telegraf.Metric, 1000) // Round collection to nearest interval by sleeping if a.Config.Agent.RoundInterval { @@ -333,7 +331,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { wg.Add(1) go func() { defer wg.Done() - if err := a.flusher(shutdown, pointChan); err != nil { + if err := a.flusher(shutdown, metricC); err != nil { log.Printf("Flusher routine failed, exiting: %s\n", err.Error()) close(shutdown) } @@ -358,7 +356,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { wg.Add(1) go func(input *internal_models.RunningInput) { defer wg.Done() - if err := a.gatherSeparate(shutdown, input, pointChan); err != nil { + if err := a.gatherSeparate(shutdown, input, metricC); err != nil { log.Printf(err.Error()) } }(input) @@ -368,7 +366,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { defer wg.Wait() for { - if err := a.gatherParallel(pointChan); err != nil { + if err := a.gatherParallel(metricC); err != nil { log.Printf(err.Error()) } diff --git a/internal/models/filter.go b/internal/models/filter.go index 06fe636cb..9b4f2ba90 100644 --- a/internal/models/filter.go +++ b/internal/models/filter.go @@ -3,7 +3,7 @@ package internal_models import ( "strings" - "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" ) @@ -24,8 +24,8 @@ type Filter struct { IsActive bool } -func (f Filter) ShouldPointPass(point *client.Point) bool { - if f.ShouldPass(point.Name()) && f.ShouldTagsPass(point.Tags()) { +func (f Filter) ShouldMetricPass(metric telegraf.Metric) bool { + if f.ShouldPass(metric.Name()) && f.ShouldTagsPass(metric.Tags()) { return true } return false diff --git a/internal/models/running_output.go b/internal/models/running_output.go index f0dae6db1..6d985fb4f 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -5,8 +5,6 @@ import ( "time" "github.com/influxdata/telegraf" - - "github.com/influxdata/influxdb/client/v2" ) const DEFAULT_POINT_BUFFER_LIMIT = 10000 @@ -18,7 +16,7 @@ type RunningOutput struct { Quiet bool PointBufferLimit int - points []*client.Point + metrics []telegraf.Metric overwriteCounter int } @@ -29,7 +27,7 @@ func NewRunningOutput( ) *RunningOutput { ro := &RunningOutput{ Name: name, - points: make([]*client.Point, 0), + metrics: make([]telegraf.Metric, 0), Output: output, Config: conf, PointBufferLimit: DEFAULT_POINT_BUFFER_LIMIT, @@ -37,34 +35,34 @@ func NewRunningOutput( return ro } -func (ro *RunningOutput) AddPoint(point *client.Point) { +func (ro *RunningOutput) AddPoint(point telegraf.Metric) { if ro.Config.Filter.IsActive { - if !ro.Config.Filter.ShouldPointPass(point) { + if !ro.Config.Filter.ShouldMetricPass(point) { return } } - if len(ro.points) < ro.PointBufferLimit { - ro.points = append(ro.points, point) + if len(ro.metrics) < ro.PointBufferLimit { + ro.metrics = append(ro.metrics, point) } else { - if ro.overwriteCounter == len(ro.points) { + if ro.overwriteCounter == len(ro.metrics) { ro.overwriteCounter = 0 } - ro.points[ro.overwriteCounter] = point + ro.metrics[ro.overwriteCounter] = point ro.overwriteCounter++ } } func (ro *RunningOutput) Write() error { start := time.Now() - err := ro.Output.Write(ro.points) + err := ro.Output.Write(ro.metrics) elapsed := time.Since(start) if err == nil { if !ro.Quiet { log.Printf("Wrote %d metrics to output %s in %s\n", - len(ro.points), ro.Name, elapsed) + len(ro.metrics), ro.Name, elapsed) } - ro.points = make([]*client.Point, 0) + ro.metrics = make([]telegraf.Metric, 0) ro.overwriteCounter = 0 } return err diff --git a/output.go b/output.go index 39b1778b9..d66ea4556 100644 --- a/output.go +++ b/output.go @@ -1,37 +1,5 @@ package telegraf -import "github.com/influxdata/influxdb/client/v2" - -// type Output interface { -// // Connect to the Output -// Connect() error -// // Close any connections to the Output -// Close() error -// // Description returns a one-sentence description on the Output -// Description() string -// // SampleConfig returns the default configuration of the Output -// SampleConfig() string -// // Write takes in group of points to be written to the Output -// Write(metrics []Metric) error -// } - -// type ServiceOutput interface { -// // Connect to the Output -// Connect() error -// // Close any connections to the Output -// Close() error -// // Description returns a one-sentence description on the Output -// Description() string -// // SampleConfig returns the default configuration of the Output -// SampleConfig() string -// // Write takes in group of points to be written to the Output -// Write(metrics []Metric) error -// // Start the "service" that will provide an Output -// Start() error -// // Stop the "service" that will provide an Output -// Stop() -// } - type Output interface { // Connect to the Output Connect() error @@ -42,7 +10,7 @@ type Output interface { // SampleConfig returns the default configuration of the Output SampleConfig() string // Write takes in group of points to be written to the Output - Write(points []*client.Point) error + Write(metrics []Metric) error } type ServiceOutput interface { @@ -55,7 +23,7 @@ type ServiceOutput interface { // SampleConfig returns the default configuration of the Output SampleConfig() string // Write takes in group of points to be written to the Output - Write(points []*client.Point) error + Write(metrics []Metric) error // Start the "service" that will provide an Output Start() error // Stop the "service" that will provide an Output diff --git a/plugins/inputs/github_webhooks/github_webhooks.go b/plugins/inputs/github_webhooks/github_webhooks.go index b4c0c5659..3eeb44c22 100644 --- a/plugins/inputs/github_webhooks/github_webhooks.go +++ b/plugins/inputs/github_webhooks/github_webhooks.go @@ -45,7 +45,7 @@ func (gh *GithubWebhooks) Gather(acc telegraf.Accumulator) error { gh.Lock() defer gh.Unlock() for _, event := range gh.events { - p := event.NewPoint() + p := event.NewMetric() acc.AddFields("github_webhooks", p.Fields(), p.Tags(), p.Time()) } gh.events = make([]Event, 0) diff --git a/plugins/inputs/github_webhooks/github_webhooks_models.go b/plugins/inputs/github_webhooks/github_webhooks_models.go index 16acad6b5..2902708c2 100644 --- a/plugins/inputs/github_webhooks/github_webhooks_models.go +++ b/plugins/inputs/github_webhooks/github_webhooks_models.go @@ -5,13 +5,13 @@ import ( "log" "time" - "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf" ) const meas = "github_webhooks" type Event interface { - NewPoint() *client.Point + NewMetric() telegraf.Metric } type Repository struct { @@ -90,7 +90,7 @@ type CommitCommentEvent struct { Sender Sender `json:"sender"` } -func (s CommitCommentEvent) NewPoint() *client.Point { +func (s CommitCommentEvent) NewMetric() telegraf.Metric { event := "commit_comment" t := map[string]string{ "event": event, @@ -106,11 +106,11 @@ func (s CommitCommentEvent) NewPoint() *client.Point { "commit": s.Comment.Commit, "comment": s.Comment.Body, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type CreateEvent struct { @@ -120,7 +120,7 @@ type CreateEvent struct { Sender Sender `json:"sender"` } -func (s CreateEvent) NewPoint() *client.Point { +func (s CreateEvent) NewMetric() telegraf.Metric { event := "create" t := map[string]string{ "event": event, @@ -136,11 +136,11 @@ func (s CreateEvent) NewPoint() *client.Point { "ref": s.Ref, "refType": s.RefType, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type DeleteEvent struct { @@ -150,7 +150,7 @@ type DeleteEvent struct { Sender Sender `json:"sender"` } -func (s DeleteEvent) NewPoint() *client.Point { +func (s DeleteEvent) NewMetric() telegraf.Metric { event := "delete" t := map[string]string{ "event": event, @@ -166,11 +166,11 @@ func (s DeleteEvent) NewPoint() *client.Point { "ref": s.Ref, "refType": s.RefType, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type DeploymentEvent struct { @@ -179,7 +179,7 @@ type DeploymentEvent struct { Sender Sender `json:"sender"` } -func (s DeploymentEvent) NewPoint() *client.Point { +func (s DeploymentEvent) NewMetric() telegraf.Metric { event := "deployment" t := map[string]string{ "event": event, @@ -197,11 +197,11 @@ func (s DeploymentEvent) NewPoint() *client.Point { "environment": s.Deployment.Environment, "description": s.Deployment.Description, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type DeploymentStatusEvent struct { @@ -211,7 +211,7 @@ type DeploymentStatusEvent struct { Sender Sender `json:"sender"` } -func (s DeploymentStatusEvent) NewPoint() *client.Point { +func (s DeploymentStatusEvent) NewMetric() telegraf.Metric { event := "delete" t := map[string]string{ "event": event, @@ -231,11 +231,11 @@ func (s DeploymentStatusEvent) NewPoint() *client.Point { "depState": s.DeploymentStatus.State, "depDescription": s.DeploymentStatus.Description, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type ForkEvent struct { @@ -244,7 +244,7 @@ type ForkEvent struct { Sender Sender `json:"sender"` } -func (s ForkEvent) NewPoint() *client.Point { +func (s ForkEvent) NewMetric() telegraf.Metric { event := "fork" t := map[string]string{ "event": event, @@ -259,11 +259,11 @@ func (s ForkEvent) NewPoint() *client.Point { "issues": s.Repository.Issues, "fork": s.Forkee.Repository, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type GollumEvent struct { @@ -273,7 +273,7 @@ type GollumEvent struct { } // REVIEW: Going to be lazy and not deal with the pages. -func (s GollumEvent) NewPoint() *client.Point { +func (s GollumEvent) NewMetric() telegraf.Metric { event := "gollum" t := map[string]string{ "event": event, @@ -287,11 +287,11 @@ func (s GollumEvent) NewPoint() *client.Point { "forks": s.Repository.Forks, "issues": s.Repository.Issues, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type IssueCommentEvent struct { @@ -301,7 +301,7 @@ type IssueCommentEvent struct { Sender Sender `json:"sender"` } -func (s IssueCommentEvent) NewPoint() *client.Point { +func (s IssueCommentEvent) NewMetric() telegraf.Metric { event := "issue_comment" t := map[string]string{ "event": event, @@ -319,11 +319,11 @@ func (s IssueCommentEvent) NewPoint() *client.Point { "comments": s.Issue.Comments, "body": s.Comment.Body, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type IssuesEvent struct { @@ -333,7 +333,7 @@ type IssuesEvent struct { Sender Sender `json:"sender"` } -func (s IssuesEvent) NewPoint() *client.Point { +func (s IssuesEvent) NewMetric() telegraf.Metric { event := "issue" t := map[string]string{ "event": event, @@ -351,11 +351,11 @@ func (s IssuesEvent) NewPoint() *client.Point { "title": s.Issue.Title, "comments": s.Issue.Comments, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type MemberEvent struct { @@ -364,7 +364,7 @@ type MemberEvent struct { Sender Sender `json:"sender"` } -func (s MemberEvent) NewPoint() *client.Point { +func (s MemberEvent) NewMetric() telegraf.Metric { event := "member" t := map[string]string{ "event": event, @@ -380,11 +380,11 @@ func (s MemberEvent) NewPoint() *client.Point { "newMember": s.Member.User, "newMemberStatus": s.Member.Admin, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type MembershipEvent struct { @@ -394,7 +394,7 @@ type MembershipEvent struct { Team Team `json:"team"` } -func (s MembershipEvent) NewPoint() *client.Point { +func (s MembershipEvent) NewMetric() telegraf.Metric { event := "membership" t := map[string]string{ "event": event, @@ -406,11 +406,11 @@ func (s MembershipEvent) NewPoint() *client.Point { "newMember": s.Member.User, "newMemberStatus": s.Member.Admin, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type PageBuildEvent struct { @@ -418,7 +418,7 @@ type PageBuildEvent struct { Sender Sender `json:"sender"` } -func (s PageBuildEvent) NewPoint() *client.Point { +func (s PageBuildEvent) NewMetric() telegraf.Metric { event := "page_build" t := map[string]string{ "event": event, @@ -432,11 +432,11 @@ func (s PageBuildEvent) NewPoint() *client.Point { "forks": s.Repository.Forks, "issues": s.Repository.Issues, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type PublicEvent struct { @@ -444,7 +444,7 @@ type PublicEvent struct { Sender Sender `json:"sender"` } -func (s PublicEvent) NewPoint() *client.Point { +func (s PublicEvent) NewMetric() telegraf.Metric { event := "public" t := map[string]string{ "event": event, @@ -458,11 +458,11 @@ func (s PublicEvent) NewPoint() *client.Point { "forks": s.Repository.Forks, "issues": s.Repository.Issues, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type PullRequestEvent struct { @@ -472,7 +472,7 @@ type PullRequestEvent struct { Sender Sender `json:"sender"` } -func (s PullRequestEvent) NewPoint() *client.Point { +func (s PullRequestEvent) NewMetric() telegraf.Metric { event := "pull_request" t := map[string]string{ "event": event, @@ -495,11 +495,11 @@ func (s PullRequestEvent) NewPoint() *client.Point { "deletions": s.PullRequest.Deletions, "changedFiles": s.PullRequest.ChangedFiles, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type PullRequestReviewCommentEvent struct { @@ -509,7 +509,7 @@ type PullRequestReviewCommentEvent struct { Sender Sender `json:"sender"` } -func (s PullRequestReviewCommentEvent) NewPoint() *client.Point { +func (s PullRequestReviewCommentEvent) NewMetric() telegraf.Metric { event := "pull_request_review_comment" t := map[string]string{ "event": event, @@ -533,11 +533,11 @@ func (s PullRequestReviewCommentEvent) NewPoint() *client.Point { "commentFile": s.Comment.File, "comment": s.Comment.Comment, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type PushEvent struct { @@ -548,7 +548,7 @@ type PushEvent struct { Sender Sender `json:"sender"` } -func (s PushEvent) NewPoint() *client.Point { +func (s PushEvent) NewMetric() telegraf.Metric { event := "push" t := map[string]string{ "event": event, @@ -565,11 +565,11 @@ func (s PushEvent) NewPoint() *client.Point { "before": s.Before, "after": s.After, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type ReleaseEvent struct { @@ -578,7 +578,7 @@ type ReleaseEvent struct { Sender Sender `json:"sender"` } -func (s ReleaseEvent) NewPoint() *client.Point { +func (s ReleaseEvent) NewMetric() telegraf.Metric { event := "release" t := map[string]string{ "event": event, @@ -593,11 +593,11 @@ func (s ReleaseEvent) NewPoint() *client.Point { "issues": s.Repository.Issues, "tagName": s.Release.TagName, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type RepositoryEvent struct { @@ -605,7 +605,7 @@ type RepositoryEvent struct { Sender Sender `json:"sender"` } -func (s RepositoryEvent) NewPoint() *client.Point { +func (s RepositoryEvent) NewMetric() telegraf.Metric { event := "repository" t := map[string]string{ "event": event, @@ -619,11 +619,11 @@ func (s RepositoryEvent) NewPoint() *client.Point { "forks": s.Repository.Forks, "issues": s.Repository.Issues, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type StatusEvent struct { @@ -633,7 +633,7 @@ type StatusEvent struct { Sender Sender `json:"sender"` } -func (s StatusEvent) NewPoint() *client.Point { +func (s StatusEvent) NewMetric() telegraf.Metric { event := "status" t := map[string]string{ "event": event, @@ -649,11 +649,11 @@ func (s StatusEvent) NewPoint() *client.Point { "commit": s.Commit, "state": s.State, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type TeamAddEvent struct { @@ -662,7 +662,7 @@ type TeamAddEvent struct { Sender Sender `json:"sender"` } -func (s TeamAddEvent) NewPoint() *client.Point { +func (s TeamAddEvent) NewMetric() telegraf.Metric { event := "team_add" t := map[string]string{ "event": event, @@ -677,11 +677,11 @@ func (s TeamAddEvent) NewPoint() *client.Point { "issues": s.Repository.Issues, "teamName": s.Team.Name, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } type WatchEvent struct { @@ -689,7 +689,7 @@ type WatchEvent struct { Sender Sender `json:"sender"` } -func (s WatchEvent) NewPoint() *client.Point { +func (s WatchEvent) NewMetric() telegraf.Metric { event := "delete" t := map[string]string{ "event": event, @@ -703,9 +703,9 @@ func (s WatchEvent) NewPoint() *client.Point { "forks": s.Repository.Forks, "issues": s.Repository.Issues, } - p, err := client.NewPoint(meas, t, f, time.Now()) + m, err := telegraf.NewMetric(meas, t, f, time.Now()) if err != nil { log.Fatalf("Failed to create %v event", event) } - return p + return m } diff --git a/plugins/inputs/httpjson/httpjson_test.go b/plugins/inputs/httpjson/httpjson_test.go index 9eff03887..f5f81c7c3 100644 --- a/plugins/inputs/httpjson/httpjson_test.go +++ b/plugins/inputs/httpjson/httpjson_test.go @@ -136,7 +136,7 @@ func TestHttpJson200(t *testing.T) { require.NoError(t, err) assert.Equal(t, 12, acc.NFields()) // Set responsetime - for _, p := range acc.Points { + for _, p := range acc.Metrics { p.Fields["response_time"] = 1.0 } @@ -203,7 +203,7 @@ func TestHttpJson200Tags(t *testing.T) { var acc testutil.Accumulator err := service.Gather(&acc) // Set responsetime - for _, p := range acc.Points { + for _, p := range acc.Metrics { p.Fields["response_time"] = 1.0 } require.NoError(t, err) diff --git a/plugins/inputs/influxdb/influxdb_test.go b/plugins/inputs/influxdb/influxdb_test.go index ef6c1a97a..3d9e2a35b 100644 --- a/plugins/inputs/influxdb/influxdb_test.go +++ b/plugins/inputs/influxdb/influxdb_test.go @@ -71,7 +71,7 @@ func TestBasic(t *testing.T) { var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) - require.Len(t, acc.Points, 2) + require.Len(t, acc.Metrics, 2) fields := map[string]interface{}{ // JSON will truncate floats to integer representations. // Since there's no distinction in JSON, we can't assume it's an int. diff --git a/plugins/inputs/jolokia/jolokia_test.go b/plugins/inputs/jolokia/jolokia_test.go index 63b47ebff..961ba7055 100644 --- a/plugins/inputs/jolokia/jolokia_test.go +++ b/plugins/inputs/jolokia/jolokia_test.go @@ -85,7 +85,7 @@ func TestHttpJsonMultiValue(t *testing.T) { err := jolokia.Gather(&acc) assert.Nil(t, err) - assert.Equal(t, 1, len(acc.Points)) + assert.Equal(t, 1, len(acc.Metrics)) fields := map[string]interface{}{ "heap_memory_usage_init": 67108864.0, @@ -112,5 +112,5 @@ func TestHttpJsonOn404(t *testing.T) { err := jolokia.Gather(&acc) assert.Nil(t, err) - assert.Equal(t, 0, len(acc.Points)) + assert.Equal(t, 0, len(acc.Metrics)) } diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index b25c32a4d..499b2e50b 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -5,7 +5,6 @@ import ( "strings" "sync" - "github.com/influxdata/influxdb/models" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" @@ -28,8 +27,8 @@ type Kafka struct { // channel for all kafka consumer errors errs <-chan *sarama.ConsumerError // channel for all incoming parsed kafka points - pointChan chan models.Point - done chan struct{} + metricC chan telegraf.Metric + done chan struct{} // doNotCommitMsgs tells the parser not to call CommitUpTo on the consumer // this is mostly for test purposes, but there may be a use-case for it later. @@ -94,7 +93,7 @@ func (k *Kafka) Start() error { if k.PointBuffer == 0 { k.PointBuffer = 100000 } - k.pointChan = make(chan models.Point, k.PointBuffer) + k.metricC = make(chan telegraf.Metric, k.PointBuffer) // Start the kafka message reader go k.parser() @@ -113,18 +112,18 @@ func (k *Kafka) parser() { case err := <-k.errs: log.Printf("Kafka Consumer Error: %s\n", err.Error()) case msg := <-k.in: - points, err := models.ParsePoints(msg.Value) + metrics, err := telegraf.ParseMetrics(msg.Value) if err != nil { log.Printf("Could not parse kafka message: %s, error: %s", string(msg.Value), err.Error()) } - for _, point := range points { + for _, metric := range metrics { select { - case k.pointChan <- point: + case k.metricC <- metric: continue default: - log.Printf("Kafka Consumer buffer is full, dropping a point." + + log.Printf("Kafka Consumer buffer is full, dropping a metric." + " You may want to increase the point_buffer setting") } } @@ -152,9 +151,9 @@ func (k *Kafka) Stop() { func (k *Kafka) Gather(acc telegraf.Accumulator) error { k.Lock() defer k.Unlock() - npoints := len(k.pointChan) + npoints := len(k.metricC) for i := 0; i < npoints; i++ { - point := <-k.pointChan + point := <-k.metricC acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time()) } return nil diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go index 0611467ff..a3a4a6e35 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go @@ -51,13 +51,13 @@ func TestReadsMetricsFromKafka(t *testing.T) { // Verify that we can now gather the sent message var acc testutil.Accumulator // Sanity check - assert.Equal(t, 0, len(acc.Points), "There should not be any points") + assert.Equal(t, 0, len(acc.Metrics), "There should not be any points") // Gather points err = k.Gather(&acc) require.NoError(t, err) - if len(acc.Points) == 1 { - point := acc.Points[0] + if len(acc.Metrics) == 1 { + point := acc.Metrics[0] assert.Equal(t, "cpu_load_short", point.Measurement) assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields) assert.Equal(t, map[string]string{ @@ -83,7 +83,7 @@ func waitForPoint(k *Kafka, t *testing.T) { counter++ if counter > 1000 { t.Fatal("Waited for 5s, point never arrived to consumer") - } else if len(k.pointChan) == 1 { + } else if len(k.metricC) == 1 { return } } diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index 560e130c0..be8984300 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "github.com/influxdata/influxdb/models" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" "github.com/Shopify/sarama" @@ -29,7 +29,7 @@ func NewTestKafka() (*Kafka, chan *sarama.ConsumerMessage) { doNotCommitMsgs: true, errs: make(chan *sarama.ConsumerError, pointBuffer), done: make(chan struct{}), - pointChan: make(chan models.Point, pointBuffer), + metricC: make(chan telegraf.Metric, pointBuffer), } return &k, in } @@ -43,7 +43,7 @@ func TestRunParser(t *testing.T) { in <- saramaMsg(testMsg) time.Sleep(time.Millisecond) - assert.Equal(t, len(k.pointChan), 1) + assert.Equal(t, len(k.metricC), 1) } // Test that the parser ignores invalid messages @@ -55,7 +55,7 @@ func TestRunParserInvalidMsg(t *testing.T) { in <- saramaMsg(invalidMsg) time.Sleep(time.Millisecond) - assert.Equal(t, len(k.pointChan), 0) + assert.Equal(t, len(k.metricC), 0) } // Test that points are dropped when we hit the buffer limit @@ -69,7 +69,7 @@ func TestRunParserRespectsBuffer(t *testing.T) { } time.Sleep(time.Millisecond) - assert.Equal(t, len(k.pointChan), 5) + assert.Equal(t, len(k.metricC), 5) } // Test that the parser parses kafka messages into points @@ -84,7 +84,7 @@ func TestRunParserAndGather(t *testing.T) { acc := testutil.Accumulator{} k.Gather(&acc) - assert.Equal(t, len(acc.Points), 1) + assert.Equal(t, len(acc.Metrics), 1) acc.AssertContainsFields(t, "cpu_load_short", map[string]interface{}{"value": float64(23422)}) } diff --git a/plugins/inputs/postgresql/postgresql_test.go b/plugins/inputs/postgresql/postgresql_test.go index 8baae39a6..3a2ccb1b0 100644 --- a/plugins/inputs/postgresql/postgresql_test.go +++ b/plugins/inputs/postgresql/postgresql_test.go @@ -113,7 +113,7 @@ func TestPostgresqlDefaultsToAllDatabases(t *testing.T) { var found bool - for _, pnt := range acc.Points { + for _, pnt := range acc.Metrics { if pnt.Measurement == "postgresql" { if pnt.Tags["db"] == "postgres" { found = true diff --git a/plugins/inputs/system/cpu_test.go b/plugins/inputs/system/cpu_test.go index 77d90e2a5..0db92d39b 100644 --- a/plugins/inputs/system/cpu_test.go +++ b/plugins/inputs/system/cpu_test.go @@ -123,7 +123,7 @@ func assertContainsTaggedFloat( tags map[string]string, ) { var actualValue float64 - for _, pt := range acc.Points { + for _, pt := range acc.Metrics { if pt.Measurement == measurement { for fieldname, value := range pt.Fields { if fieldname == field { diff --git a/plugins/inputs/system/disk_test.go b/plugins/inputs/system/disk_test.go index 8a31957c0..86537be23 100644 --- a/plugins/inputs/system/disk_test.go +++ b/plugins/inputs/system/disk_test.go @@ -57,9 +57,9 @@ func TestDiskStats(t *testing.T) { err = (&DiskStats{ps: &mps}).Gather(&acc) require.NoError(t, err) - numDiskPoints := acc.NFields() - expectedAllDiskPoints := 14 - assert.Equal(t, expectedAllDiskPoints, numDiskPoints) + numDiskMetrics := acc.NFields() + expectedAllDiskMetrics := 14 + assert.Equal(t, expectedAllDiskMetrics, numDiskMetrics) tags1 := map[string]string{ "path": "/", @@ -91,15 +91,15 @@ func TestDiskStats(t *testing.T) { acc.AssertContainsTaggedFields(t, "disk", fields1, tags1) acc.AssertContainsTaggedFields(t, "disk", fields2, tags2) - // We expect 6 more DiskPoints to show up with an explicit match on "/" + // We expect 6 more DiskMetrics to show up with an explicit match on "/" // and /home not matching the /dev in MountPoints err = (&DiskStats{ps: &mps, MountPoints: []string{"/", "/dev"}}).Gather(&acc) - assert.Equal(t, expectedAllDiskPoints+7, acc.NFields()) + assert.Equal(t, expectedAllDiskMetrics+7, acc.NFields()) // We should see all the diskpoints as MountPoints includes both // / and /home err = (&DiskStats{ps: &mps, MountPoints: []string{"/", "/home"}}).Gather(&acc) - assert.Equal(t, 2*expectedAllDiskPoints+7, acc.NFields()) + assert.Equal(t, 2*expectedAllDiskMetrics+7, acc.NFields()) } // func TestDiskIOStats(t *testing.T) { @@ -138,9 +138,9 @@ func TestDiskStats(t *testing.T) { // err = (&DiskIOStats{ps: &mps}).Gather(&acc) // require.NoError(t, err) -// numDiskIOPoints := acc.NFields() -// expectedAllDiskIOPoints := 14 -// assert.Equal(t, expectedAllDiskIOPoints, numDiskIOPoints) +// numDiskIOMetrics := acc.NFields() +// expectedAllDiskIOMetrics := 14 +// assert.Equal(t, expectedAllDiskIOMetrics, numDiskIOMetrics) // dtags1 := map[string]string{ // "name": "sda1", @@ -166,10 +166,10 @@ func TestDiskStats(t *testing.T) { // assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags2)) // assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags2)) -// // We expect 7 more DiskIOPoints to show up with an explicit match on "sdb1" +// // We expect 7 more DiskIOMetrics to show up with an explicit match on "sdb1" // // and serial should be missing from the tags with SkipSerialNumber set // err = (&DiskIOStats{ps: &mps, Devices: []string{"sdb1"}, SkipSerialNumber: true}).Gather(&acc) -// assert.Equal(t, expectedAllDiskIOPoints+7, acc.NFields()) +// assert.Equal(t, expectedAllDiskIOMetrics+7, acc.NFields()) // dtags3 := map[string]string{ // "name": "sdb1", diff --git a/plugins/inputs/system/memory_test.go b/plugins/inputs/system/memory_test.go index 0a85bc869..a7f7905f9 100644 --- a/plugins/inputs/system/memory_test.go +++ b/plugins/inputs/system/memory_test.go @@ -55,7 +55,7 @@ func TestMemStats(t *testing.T) { } acc.AssertContainsTaggedFields(t, "mem", memfields, make(map[string]string)) - acc.Points = nil + acc.Metrics = nil err = (&SwapStats{&mps}).Gather(&acc) require.NoError(t, err) diff --git a/plugins/inputs/system/net_test.go b/plugins/inputs/system/net_test.go index 3297acf07..0391a005a 100644 --- a/plugins/inputs/system/net_test.go +++ b/plugins/inputs/system/net_test.go @@ -85,7 +85,7 @@ func TestNetStats(t *testing.T) { } acc.AssertContainsTaggedFields(t, "net", fields2, ntags) - acc.Points = nil + acc.Metrics = nil err = (&NetStats{&mps}).Gather(&acc) require.NoError(t, err) diff --git a/plugins/inputs/zfs/zfs_test.go b/plugins/inputs/zfs/zfs_test.go index e40d91c02..514bad3d4 100644 --- a/plugins/inputs/zfs/zfs_test.go +++ b/plugins/inputs/zfs/zfs_test.go @@ -148,7 +148,7 @@ func TestZfsPoolMetrics(t *testing.T) { require.NoError(t, err) require.False(t, acc.HasMeasurement("zfs_pool")) - acc.Points = nil + acc.Metrics = nil z = &Zfs{KstatPath: testKstatPath, KstatMetrics: []string{"arcstats"}, PoolMetrics: true} err = z.Gather(&acc) @@ -198,7 +198,7 @@ func TestZfsGeneratesMetrics(t *testing.T) { require.NoError(t, err) acc.AssertContainsTaggedFields(t, "zfs", intMetrics, tags) - acc.Points = nil + acc.Metrics = nil //two pools, all metrics err = os.MkdirAll(testKstatPath+"/STORAGE", 0755) @@ -217,7 +217,7 @@ func TestZfsGeneratesMetrics(t *testing.T) { require.NoError(t, err) acc.AssertContainsTaggedFields(t, "zfs", intMetrics, tags) - acc.Points = nil + acc.Metrics = nil intMetrics = getKstatMetricsArcOnly() diff --git a/plugins/outputs/amon/amon.go b/plugins/outputs/amon/amon.go index af3d37146..40505f62f 100644 --- a/plugins/outputs/amon/amon.go +++ b/plugins/outputs/amon/amon.go @@ -8,10 +8,9 @@ import ( "net/http" "strings" - "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" - "github.com/influxdata/telegraf" ) type Amon struct { @@ -39,7 +38,7 @@ type TimeSeries struct { type Metric struct { Metric string `json:"metric"` - Points [1]Point `json:"points"` + Points [1]Point `json:"metrics"` } type Point [2]float64 @@ -54,17 +53,17 @@ func (a *Amon) Connect() error { return nil } -func (a *Amon) Write(points []*client.Point) error { - if len(points) == 0 { +func (a *Amon) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { return nil } ts := TimeSeries{} tempSeries := []*Metric{} metricCounter := 0 - for _, pt := range points { - mname := strings.Replace(pt.Name(), "_", ".", -1) - if amonPts, err := buildPoints(pt); err == nil { + for _, m := range metrics { + mname := strings.Replace(m.Name(), "_", ".", -1) + if amonPts, err := buildMetrics(m); err == nil { for fieldName, amonPt := range amonPts { metric := &Metric{ Metric: mname + "_" + strings.Replace(fieldName, "_", ".", -1), @@ -74,7 +73,7 @@ func (a *Amon) Write(points []*client.Point) error { metricCounter++ } } else { - log.Printf("unable to build Metric for %s, skipping\n", pt.Name()) + log.Printf("unable to build Metric for %s, skipping\n", m.Name()) } } @@ -116,17 +115,17 @@ func (a *Amon) authenticatedUrl() string { return fmt.Sprintf("%s/api/system/%s", a.AmonInstance, a.ServerKey) } -func buildPoints(pt *client.Point) (map[string]Point, error) { - pts := make(map[string]Point) - for k, v := range pt.Fields() { +func buildMetrics(m telegraf.Metric) (map[string]Point, error) { + ms := make(map[string]Point) + for k, v := range m.Fields() { var p Point if err := p.setValue(v); err != nil { - return pts, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) + return ms, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) } - p[0] = float64(pt.Time().Unix()) - pts[k] = p + p[0] = float64(m.Time().Unix()) + ms[k] = p } - return pts, nil + return ms, nil } func (p *Point) setValue(v interface{}) error { diff --git a/plugins/outputs/amon/amon_test.go b/plugins/outputs/amon/amon_test.go index b725bab9e..408f0607b 100644 --- a/plugins/outputs/amon/amon_test.go +++ b/plugins/outputs/amon/amon_test.go @@ -8,17 +8,17 @@ import ( "github.com/influxdata/telegraf/testutil" - "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf" ) func TestBuildPoint(t *testing.T) { var tagtests = []struct { - ptIn *client.Point + ptIn telegraf.Metric outPt Point err error }{ { - testutil.TestPoint(float64(0.0), "testpt"), + testutil.TestMetric(float64(0.0), "testpt"), Point{ float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 0.0, @@ -26,7 +26,7 @@ func TestBuildPoint(t *testing.T) { nil, }, { - testutil.TestPoint(float64(1.0), "testpt"), + testutil.TestMetric(float64(1.0), "testpt"), Point{ float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 1.0, @@ -34,7 +34,7 @@ func TestBuildPoint(t *testing.T) { nil, }, { - testutil.TestPoint(int(10), "testpt"), + testutil.TestMetric(int(10), "testpt"), Point{ float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 10.0, @@ -42,7 +42,7 @@ func TestBuildPoint(t *testing.T) { nil, }, { - testutil.TestPoint(int32(112345), "testpt"), + testutil.TestMetric(int32(112345), "testpt"), Point{ float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 112345.0, @@ -50,7 +50,7 @@ func TestBuildPoint(t *testing.T) { nil, }, { - testutil.TestPoint(int64(112345), "testpt"), + testutil.TestMetric(int64(112345), "testpt"), Point{ float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 112345.0, @@ -58,7 +58,7 @@ func TestBuildPoint(t *testing.T) { nil, }, { - testutil.TestPoint(float32(11234.5), "testpt"), + testutil.TestMetric(float32(11234.5), "testpt"), Point{ float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 11234.5, @@ -66,7 +66,7 @@ func TestBuildPoint(t *testing.T) { nil, }, { - testutil.TestPoint("11234.5", "testpt"), + testutil.TestMetric("11234.5", "testpt"), Point{ float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 11234.5, @@ -75,7 +75,7 @@ func TestBuildPoint(t *testing.T) { }, } for _, tt := range tagtests { - pt, err := buildPoints(tt.ptIn) + pt, err := buildMetrics(tt.ptIn) if err != nil && tt.err == nil { t.Errorf("%s: unexpected error, %+v\n", tt.ptIn.Name(), err) } diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index 688ab6a47..14364519e 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -10,9 +10,8 @@ import ( "sync" "time" - "github.com/influxdata/influxdb/client/v2" - "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" "github.com/streadway/amqp" ) @@ -150,17 +149,15 @@ func (q *AMQP) Description() string { return "Configuration for the AMQP server to send metrics to" } -func (q *AMQP) Write(points []*client.Point) error { +func (q *AMQP) Write(metrics []telegraf.Metric) error { q.Lock() defer q.Unlock() - if len(points) == 0 { + if len(metrics) == 0 { return nil } var outbuf = make(map[string][][]byte) - for _, p := range points { - // Combine tags from Point and BatchPoints and grab the resulting - // line-protocol output string to write to AMQP + for _, p := range metrics { var value, key string value = p.String() diff --git a/plugins/outputs/amqp/amqp_test.go b/plugins/outputs/amqp/amqp_test.go index a65634cab..4cecff02e 100644 --- a/plugins/outputs/amqp/amqp_test.go +++ b/plugins/outputs/amqp/amqp_test.go @@ -23,6 +23,6 @@ func TestConnectAndWrite(t *testing.T) { require.NoError(t, err) // Verify that we can successfully write data to the amqp broker - err = q.Write(testutil.MockBatchPoints().Points()) + err = q.Write(testutil.MockMetrics()) require.NoError(t, err) } diff --git a/plugins/outputs/cloudwatch/README.md b/plugins/outputs/cloudwatch/README.md index 853d038c3..0d9a3e9cf 100644 --- a/plugins/outputs/cloudwatch/README.md +++ b/plugins/outputs/cloudwatch/README.md @@ -1,6 +1,6 @@ ## Amazon CloudWatch Output for Telegraf -This plugin will send points to Amazon CloudWatch. +This plugin will send metrics to Amazon CloudWatch. ## Amazon Authentication diff --git a/plugins/outputs/cloudwatch/cloudwatch.go b/plugins/outputs/cloudwatch/cloudwatch.go index f581a5219..a24f741a4 100644 --- a/plugins/outputs/cloudwatch/cloudwatch.go +++ b/plugins/outputs/cloudwatch/cloudwatch.go @@ -14,9 +14,8 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/cloudwatch" - "github.com/influxdata/influxdb/client/v2" - "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" ) type CloudWatch struct { @@ -73,9 +72,9 @@ func (c *CloudWatch) Close() error { return nil } -func (c *CloudWatch) Write(points []*client.Point) error { - for _, pt := range points { - err := c.WriteSinglePoint(pt) +func (c *CloudWatch) Write(metrics []telegraf.Metric) error { + for _, m := range metrics { + err := c.WriteSinglePoint(m) if err != nil { return err } @@ -87,10 +86,10 @@ func (c *CloudWatch) Write(points []*client.Point) error { // Write data for a single point. A point can have many fields and one field // is equal to one MetricDatum. There is a limit on how many MetricDatums a // request can have so we process one Point at a time. -func (c *CloudWatch) WriteSinglePoint(point *client.Point) error { +func (c *CloudWatch) WriteSinglePoint(point telegraf.Metric) error { datums := BuildMetricDatum(point) - const maxDatumsPerCall = 20 // PutMetricData only supports up to 20 data points per call + const maxDatumsPerCall = 20 // PutMetricData only supports up to 20 data metrics per call for _, partition := range PartitionDatums(maxDatumsPerCall, datums) { err := c.WriteToCloudWatch(partition) @@ -144,7 +143,7 @@ func PartitionDatums(size int, datums []*cloudwatch.MetricDatum) [][]*cloudwatch // Make a MetricDatum for each field in a Point. Only fields with values that can be // converted to float64 are supported. Non-supported fields are skipped. -func BuildMetricDatum(point *client.Point) []*cloudwatch.MetricDatum { +func BuildMetricDatum(point telegraf.Metric) []*cloudwatch.MetricDatum { datums := make([]*cloudwatch.MetricDatum, len(point.Fields())) i := 0 @@ -190,15 +189,15 @@ func BuildMetricDatum(point *client.Point) []*cloudwatch.MetricDatum { // Make a list of Dimensions by using a Point's tags. CloudWatch supports up to // 10 dimensions per metric so we only keep up to the first 10 alphabetically. // This always includes the "host" tag if it exists. -func BuildDimensions(ptTags map[string]string) []*cloudwatch.Dimension { +func BuildDimensions(mTags map[string]string) []*cloudwatch.Dimension { const MaxDimensions = 10 - dimensions := make([]*cloudwatch.Dimension, int(math.Min(float64(len(ptTags)), MaxDimensions))) + dimensions := make([]*cloudwatch.Dimension, int(math.Min(float64(len(mTags)), MaxDimensions))) i := 0 // This is pretty ugly but we always want to include the "host" tag if it exists. - if host, ok := ptTags["host"]; ok { + if host, ok := mTags["host"]; ok { dimensions[i] = &cloudwatch.Dimension{ Name: aws.String("host"), Value: aws.String(host), @@ -207,7 +206,7 @@ func BuildDimensions(ptTags map[string]string) []*cloudwatch.Dimension { } var keys []string - for k := range ptTags { + for k := range mTags { if k != "host" { keys = append(keys, k) } @@ -221,7 +220,7 @@ func BuildDimensions(ptTags map[string]string) []*cloudwatch.Dimension { dimensions[i] = &cloudwatch.Dimension{ Name: aws.String(k), - Value: aws.String(ptTags[k]), + Value: aws.String(mTags[k]), } i += 1 diff --git a/plugins/outputs/cloudwatch/cloudwatch_test.go b/plugins/outputs/cloudwatch/cloudwatch_test.go index 2041e14fd..06b41fa21 100644 --- a/plugins/outputs/cloudwatch/cloudwatch_test.go +++ b/plugins/outputs/cloudwatch/cloudwatch_test.go @@ -7,7 +7,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/cloudwatch" - "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" @@ -19,7 +19,7 @@ func TestBuildDimensions(t *testing.T) { assert := assert.New(t) - testPoint := testutil.TestPoint(1) + testPoint := testutil.TestMetric(1) dimensions := BuildDimensions(testPoint.Tags()) tagKeys := make([]string, len(testPoint.Tags())) @@ -46,25 +46,25 @@ func TestBuildDimensions(t *testing.T) { } } -// Test that points with valid values have a MetricDatum created where as non valid do not. +// Test that metrics with valid values have a MetricDatum created where as non valid do not. // Skips "time.Time" type as something is converting the value to string. func TestBuildMetricDatums(t *testing.T) { assert := assert.New(t) - validPoints := []*client.Point{ - testutil.TestPoint(1), - testutil.TestPoint(int32(1)), - testutil.TestPoint(int64(1)), - testutil.TestPoint(float64(1)), - testutil.TestPoint(true), + validMetrics := []telegraf.Metric{ + testutil.TestMetric(1), + testutil.TestMetric(int32(1)), + testutil.TestMetric(int64(1)), + testutil.TestMetric(float64(1)), + testutil.TestMetric(true), } - for _, point := range validPoints { + for _, point := range validMetrics { datums := BuildMetricDatum(point) assert.Equal(1, len(datums), "Valid type should create a Datum") } - nonValidPoint := testutil.TestPoint("Foo") + nonValidPoint := testutil.TestMetric("Foo") assert.Equal(0, len(BuildMetricDatum(nonValidPoint)), "Invalid type should not create a Datum") } diff --git a/plugins/outputs/datadog/datadog.go b/plugins/outputs/datadog/datadog.go index 78555fc1f..aa2cb4ca0 100644 --- a/plugins/outputs/datadog/datadog.go +++ b/plugins/outputs/datadog/datadog.go @@ -10,10 +10,9 @@ import ( "sort" "strings" - "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" - "github.com/influxdata/telegraf" ) type Datadog struct { @@ -38,9 +37,9 @@ type TimeSeries struct { type Metric struct { Metric string `json:"metric"` - Points [1]Point `json:"points"` + Points [1]Point `json:"metrics"` Host string `json:"host"` - Tags []string `json:"tags,omitempty"` + Tags []string `json:"tags,omitemmy"` } type Point [2]float64 @@ -63,27 +62,29 @@ func (d *Datadog) Connect() error { return nil } -func (d *Datadog) Write(points []*client.Point) error { - if len(points) == 0 { +func (d *Datadog) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { return nil } ts := TimeSeries{} tempSeries := []*Metric{} metricCounter := 0 - for _, pt := range points { - mname := strings.Replace(pt.Name(), "_", ".", -1) - if amonPts, err := buildPoints(pt); err == nil { - for fieldName, amonPt := range amonPts { + for _, m := range metrics { + mname := strings.Replace(m.Name(), "_", ".", -1) + if dogMs, err := buildMetrics(m); err == nil { + for fieldName, dogM := range dogMs { metric := &Metric{ Metric: mname + strings.Replace(fieldName, "_", ".", -1), + Tags: buildTags(m.Tags()), + Host: m.Tags()["host"], } - metric.Points[0] = amonPt + metric.Points[0] = dogM tempSeries = append(tempSeries, metric) metricCounter++ } } else { - log.Printf("unable to build Metric for %s, skipping\n", pt.Name()) + log.Printf("unable to build Metric for %s, skipping\n", m.Name()) } } @@ -127,23 +128,23 @@ func (d *Datadog) authenticatedUrl() string { return fmt.Sprintf("%s?%s", d.apiUrl, q.Encode()) } -func buildPoints(pt *client.Point) (map[string]Point, error) { - pts := make(map[string]Point) - for k, v := range pt.Fields() { +func buildMetrics(m telegraf.Metric) (map[string]Point, error) { + ms := make(map[string]Point) + for k, v := range m.Fields() { var p Point if err := p.setValue(v); err != nil { - return pts, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) + return ms, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) } - p[0] = float64(pt.Time().Unix()) - pts[k] = p + p[0] = float64(m.Time().Unix()) + ms[k] = p } - return pts, nil + return ms, nil } -func buildTags(ptTags map[string]string) []string { - tags := make([]string, len(ptTags)) +func buildTags(mTags map[string]string) []string { + tags := make([]string, len(mTags)) index := 0 - for k, v := range ptTags { + for k, v := range mTags { tags[index] = fmt.Sprintf("%s:%s", k, v) index += 1 } diff --git a/plugins/outputs/datadog/datadog_test.go b/plugins/outputs/datadog/datadog_test.go index 968a8e9c8..30495a044 100644 --- a/plugins/outputs/datadog/datadog_test.go +++ b/plugins/outputs/datadog/datadog_test.go @@ -11,7 +11,7 @@ import ( "github.com/influxdata/telegraf/testutil" - "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -38,7 +38,7 @@ func TestUriOverride(t *testing.T) { d.Apikey = "123456" err := d.Connect() require.NoError(t, err) - err = d.Write(testutil.MockBatchPoints().Points()) + err = d.Write(testutil.MockMetrics()) require.NoError(t, err) } @@ -57,7 +57,7 @@ func TestBadStatusCode(t *testing.T) { d.Apikey = "123456" err := d.Connect() require.NoError(t, err) - err = d.Write(testutil.MockBatchPoints().Points()) + err = d.Write(testutil.MockMetrics()) if err == nil { t.Errorf("error expected but none returned") } else { @@ -100,12 +100,12 @@ func TestBuildTags(t *testing.T) { func TestBuildPoint(t *testing.T) { var tagtests = []struct { - ptIn *client.Point + ptIn telegraf.Metric outPt Point err error }{ { - testutil.TestPoint(0.0, "test1"), + testutil.TestMetric(0.0, "test1"), Point{ float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 0.0, @@ -113,7 +113,7 @@ func TestBuildPoint(t *testing.T) { nil, }, { - testutil.TestPoint(1.0, "test2"), + testutil.TestMetric(1.0, "test2"), Point{ float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 1.0, @@ -121,7 +121,7 @@ func TestBuildPoint(t *testing.T) { nil, }, { - testutil.TestPoint(10, "test3"), + testutil.TestMetric(10, "test3"), Point{ float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 10.0, @@ -129,7 +129,7 @@ func TestBuildPoint(t *testing.T) { nil, }, { - testutil.TestPoint(int32(112345), "test4"), + testutil.TestMetric(int32(112345), "test4"), Point{ float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 112345.0, @@ -137,7 +137,7 @@ func TestBuildPoint(t *testing.T) { nil, }, { - testutil.TestPoint(int64(112345), "test5"), + testutil.TestMetric(int64(112345), "test5"), Point{ float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 112345.0, @@ -145,7 +145,7 @@ func TestBuildPoint(t *testing.T) { nil, }, { - testutil.TestPoint(float32(11234.5), "test6"), + testutil.TestMetric(float32(11234.5), "test6"), Point{ float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 11234.5, @@ -153,7 +153,7 @@ func TestBuildPoint(t *testing.T) { nil, }, { - testutil.TestPoint("11234.5", "test7"), + testutil.TestMetric("11234.5", "test7"), Point{ float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 11234.5, @@ -162,7 +162,7 @@ func TestBuildPoint(t *testing.T) { }, } for _, tt := range tagtests { - pt, err := buildPoints(tt.ptIn) + pt, err := buildMetrics(tt.ptIn) if err != nil && tt.err == nil { t.Errorf("%s: unexpected error, %+v\n", tt.ptIn.Name(), err) } diff --git a/plugins/outputs/graphite/graphite.go b/plugins/outputs/graphite/graphite.go index f7e58672a..17a831ad6 100644 --- a/plugins/outputs/graphite/graphite.go +++ b/plugins/outputs/graphite/graphite.go @@ -3,9 +3,8 @@ package graphite import ( "errors" "fmt" - "github.com/influxdata/influxdb/client/v2" - "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" "log" "math/rand" "net" @@ -68,10 +67,10 @@ func (g *Graphite) Description() string { // Choose a random server in the cluster to write to until a successful write // occurs, logging each unsuccessful. If all servers fail, return error. -func (g *Graphite) Write(points []*client.Point) error { +func (g *Graphite) Write(metrics []telegraf.Metric) error { // Prepare data var bp []string - for _, point := range points { + for _, point := range metrics { // Get name name := point.Name() // Convert UnixNano to Unix timestamps diff --git a/plugins/outputs/graphite/graphite_test.go b/plugins/outputs/graphite/graphite_test.go index be4cc2472..f6f048fc3 100644 --- a/plugins/outputs/graphite/graphite_test.go +++ b/plugins/outputs/graphite/graphite_test.go @@ -8,7 +8,7 @@ import ( "testing" "time" - "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -20,20 +20,20 @@ func TestGraphiteError(t *testing.T) { Servers: []string{"127.0.0.1:2003", "127.0.0.1:12003"}, Prefix: "my.prefix", } - // Init points - pt1, _ := client.NewPoint( + // Init metrics + pt1, _ := telegraf.NewMetric( "mymeasurement", map[string]string{"host": "192.168.0.1"}, map[string]interface{}{"mymeasurement": float64(3.14)}, time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), ) // Prepare point list - var points []*client.Point - points = append(points, pt1) + var metrics []telegraf.Metric + metrics = append(metrics, pt1) // Error err1 := g.Connect() require.NoError(t, err1) - err2 := g.Write(points) + err2 := g.Write(metrics) require.Error(t, err2) assert.Equal(t, "Could not write to any Graphite server in cluster\n", err2.Error()) } @@ -44,30 +44,30 @@ func TestGraphiteOK(t *testing.T) { g := Graphite{ Prefix: "my.prefix", } - // Init points - pt1, _ := client.NewPoint( + // Init metrics + pt1, _ := telegraf.NewMetric( "mymeasurement", map[string]string{"host": "192.168.0.1"}, map[string]interface{}{"mymeasurement": float64(3.14)}, time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), ) - pt2, _ := client.NewPoint( + pt2, _ := telegraf.NewMetric( "mymeasurement", map[string]string{"host": "192.168.0.1"}, map[string]interface{}{"value": float64(3.14)}, time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), ) - pt3, _ := client.NewPoint( + pt3, _ := telegraf.NewMetric( "my_measurement", map[string]string{"host": "192.168.0.1"}, map[string]interface{}{"value": float64(3.14)}, time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), ) // Prepare point list - var points []*client.Point - points = append(points, pt1) - points = append(points, pt2) - points = append(points, pt3) + var metrics []telegraf.Metric + metrics = append(metrics, pt1) + metrics = append(metrics, pt2) + metrics = append(metrics, pt3) // Start TCP server wg.Add(1) go TCPServer(t, &wg) @@ -78,7 +78,7 @@ func TestGraphiteOK(t *testing.T) { wg.Wait() require.NoError(t, err1) // Send Data - err2 := g.Write(points) + err2 := g.Write(metrics) require.NoError(t, err2) wg.Add(1) // Waiting TCPserver diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 41bfba9ba..2b0582283 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -9,10 +9,11 @@ import ( "strings" "time" - "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" - "github.com/influxdata/telegraf" + + "github.com/influxdata/influxdb/client/v2" ) type InfluxDB struct { @@ -131,14 +132,14 @@ func (i *InfluxDB) Description() string { // Choose a random server in the cluster to write to until a successful write // occurs, logging each unsuccessful. If all servers fail, return error. -func (i *InfluxDB) Write(points []*client.Point) error { +func (i *InfluxDB) Write(metrics []telegraf.Metric) error { bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ Database: i.Database, Precision: i.Precision, }) - for _, point := range points { - bp.AddPoint(point) + for _, metric := range metrics { + bp.AddPoint(metric.Point()) } // This will get set to nil if a successful write occurs diff --git a/plugins/outputs/influxdb/influxdb_test.go b/plugins/outputs/influxdb/influxdb_test.go index 5da0c056f..1414fa839 100644 --- a/plugins/outputs/influxdb/influxdb_test.go +++ b/plugins/outputs/influxdb/influxdb_test.go @@ -18,7 +18,7 @@ func TestUDPInflux(t *testing.T) { err := i.Connect() require.NoError(t, err) - err = i.Write(testutil.MockBatchPoints().Points()) + err = i.Write(testutil.MockMetrics()) require.NoError(t, err) } @@ -36,6 +36,6 @@ func TestHTTPInflux(t *testing.T) { err := i.Connect() require.NoError(t, err) - err = i.Write(testutil.MockBatchPoints().Points()) + err = i.Write(testutil.MockMetrics()) require.NoError(t, err) } diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 37773b7d1..9aa8eb259 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -6,9 +6,8 @@ import ( "errors" "fmt" "github.com/Shopify/sarama" - "github.com/influxdata/influxdb/client/v2" - "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" "io/ioutil" ) @@ -113,14 +112,12 @@ func (k *Kafka) Description() string { return "Configuration for the Kafka server to send metrics to" } -func (k *Kafka) Write(points []*client.Point) error { - if len(points) == 0 { +func (k *Kafka) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { return nil } - for _, p := range points { - // Combine tags from Point and BatchPoints and grab the resulting - // line-protocol output string to write to Kafka + for _, p := range metrics { value := p.String() m := &sarama.ProducerMessage{ diff --git a/plugins/outputs/kafka/kafka_test.go b/plugins/outputs/kafka/kafka_test.go index 2af343778..103f268cb 100644 --- a/plugins/outputs/kafka/kafka_test.go +++ b/plugins/outputs/kafka/kafka_test.go @@ -23,6 +23,6 @@ func TestConnectAndWrite(t *testing.T) { require.NoError(t, err) // Verify that we can successfully write data to the kafka broker - err = k.Write(testutil.MockBatchPoints().Points()) + err = k.Write(testutil.MockMetrics()) require.NoError(t, err) } diff --git a/plugins/outputs/kinesis/kinesis.go b/plugins/outputs/kinesis/kinesis.go index 4824a8cea..a25eb477c 100644 --- a/plugins/outputs/kinesis/kinesis.go +++ b/plugins/outputs/kinesis/kinesis.go @@ -14,9 +14,8 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" - "github.com/influxdata/influxdb/client/v2" - "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" ) type KinesisOutput struct { @@ -104,7 +103,7 @@ func (k *KinesisOutput) Close() error { return nil } -func FormatMetric(k *KinesisOutput, point *client.Point) (string, error) { +func FormatMetric(k *KinesisOutput, point telegraf.Metric) (string, error) { if k.Format == "string" { return point.String(), nil } else { @@ -139,16 +138,16 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Du return time.Since(start) } -func (k *KinesisOutput) Write(points []*client.Point) error { +func (k *KinesisOutput) Write(metrics []telegraf.Metric) error { var sz uint32 = 0 - if len(points) == 0 { + if len(metrics) == 0 { return nil } r := []*kinesis.PutRecordsRequestEntry{} - for _, p := range points { + for _, p := range metrics { atomic.AddUint32(&sz, 1) metric, _ := FormatMetric(k, p) diff --git a/plugins/outputs/kinesis/kinesis_test.go b/plugins/outputs/kinesis/kinesis_test.go index 76eb6ebca..b398d7e8a 100644 --- a/plugins/outputs/kinesis/kinesis_test.go +++ b/plugins/outputs/kinesis/kinesis_test.go @@ -15,7 +15,7 @@ func TestFormatMetric(t *testing.T) { Format: "string", } - p := testutil.MockBatchPoints().Points()[0] + p := testutil.MockMetrics()[0] valid_string := "test1,tag1=value1 value=1 1257894000000000000" func_string, err := FormatMetric(k, p) diff --git a/plugins/outputs/librato/librato.go b/plugins/outputs/librato/librato.go index 9903b17b3..3d3abbf03 100644 --- a/plugins/outputs/librato/librato.go +++ b/plugins/outputs/librato/librato.go @@ -7,10 +7,9 @@ import ( "log" "net/http" - "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" - "github.com/influxdata/telegraf" ) type Librato struct { @@ -41,7 +40,7 @@ var sampleConfig = ` # timeout = "5s" ` -type Metrics struct { +type LMetrics struct { Gauges []*Gauge `json:"gauges"` } @@ -70,27 +69,27 @@ func (l *Librato) Connect() error { return nil } -func (l *Librato) Write(points []*client.Point) error { - if len(points) == 0 { +func (l *Librato) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { return nil } - metrics := Metrics{} + lmetrics := LMetrics{} tempGauges := []*Gauge{} metricCounter := 0 - for _, pt := range points { - if gauges, err := l.buildGauges(pt); err == nil { + for _, m := range metrics { + if gauges, err := l.buildGauges(m); err == nil { for _, gauge := range gauges { tempGauges = append(tempGauges, gauge) metricCounter++ } } else { - log.Printf("unable to build Gauge for %s, skipping\n", pt.Name()) + log.Printf("unable to build Gauge for %s, skipping\n", m.Name()) } } - metrics.Gauges = make([]*Gauge, metricCounter) - copy(metrics.Gauges, tempGauges[0:]) + lmetrics.Gauges = make([]*Gauge, metricCounter) + copy(lmetrics.Gauges, tempGauges[0:]) metricsBytes, err := json.Marshal(metrics) if err != nil { return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error()) @@ -123,19 +122,19 @@ func (l *Librato) Description() string { return "Configuration for Librato API to send metrics to." } -func (l *Librato) buildGauges(pt *client.Point) ([]*Gauge, error) { +func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) { gauges := []*Gauge{} - for fieldName, value := range pt.Fields() { + for fieldName, value := range m.Fields() { gauge := &Gauge{ - Name: pt.Name() + "_" + fieldName, - MeasureTime: pt.Time().Unix(), + Name: m.Name() + "_" + fieldName, + MeasureTime: m.Time().Unix(), } if err := gauge.setValue(value); err != nil { return gauges, fmt.Errorf("unable to extract value from Fields, %s\n", err.Error()) } if l.SourceTag != "" { - if source, ok := pt.Tags()[l.SourceTag]; ok { + if source, ok := m.Tags()[l.SourceTag]; ok { gauge.Source = source } else { return gauges, diff --git a/plugins/outputs/librato/librato_test.go b/plugins/outputs/librato/librato_test.go index 25418baa5..c0b6ba021 100644 --- a/plugins/outputs/librato/librato_test.go +++ b/plugins/outputs/librato/librato_test.go @@ -11,7 +11,7 @@ import ( "github.com/influxdata/telegraf/testutil" - "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf" "github.com/stretchr/testify/require" ) @@ -39,7 +39,7 @@ func TestUriOverride(t *testing.T) { l.ApiToken = "123456" err := l.Connect() require.NoError(t, err) - err = l.Write(testutil.MockBatchPoints().Points()) + err = l.Write(testutil.MockMetrics()) require.NoError(t, err) } @@ -61,7 +61,7 @@ func TestBadStatusCode(t *testing.T) { l.ApiToken = "123456" err := l.Connect() require.NoError(t, err) - err = l.Write(testutil.MockBatchPoints().Points()) + err = l.Write(testutil.MockMetrics()) if err == nil { t.Errorf("error expected but none returned") } else { @@ -71,12 +71,12 @@ func TestBadStatusCode(t *testing.T) { func TestBuildGauge(t *testing.T) { var gaugeTests = []struct { - ptIn *client.Point + ptIn telegraf.Metric outGauge *Gauge err error }{ { - testutil.TestPoint(0.0, "test1"), + testutil.TestMetric(0.0, "test1"), &Gauge{ Name: "test1", MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), @@ -85,7 +85,7 @@ func TestBuildGauge(t *testing.T) { nil, }, { - testutil.TestPoint(1.0, "test2"), + testutil.TestMetric(1.0, "test2"), &Gauge{ Name: "test2", MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), @@ -94,7 +94,7 @@ func TestBuildGauge(t *testing.T) { nil, }, { - testutil.TestPoint(10, "test3"), + testutil.TestMetric(10, "test3"), &Gauge{ Name: "test3", MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), @@ -103,7 +103,7 @@ func TestBuildGauge(t *testing.T) { nil, }, { - testutil.TestPoint(int32(112345), "test4"), + testutil.TestMetric(int32(112345), "test4"), &Gauge{ Name: "test4", MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), @@ -112,7 +112,7 @@ func TestBuildGauge(t *testing.T) { nil, }, { - testutil.TestPoint(int64(112345), "test5"), + testutil.TestMetric(int64(112345), "test5"), &Gauge{ Name: "test5", MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), @@ -121,7 +121,7 @@ func TestBuildGauge(t *testing.T) { nil, }, { - testutil.TestPoint(float32(11234.5), "test6"), + testutil.TestMetric(float32(11234.5), "test6"), &Gauge{ Name: "test6", MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), @@ -130,7 +130,7 @@ func TestBuildGauge(t *testing.T) { nil, }, { - testutil.TestPoint("11234.5", "test7"), + testutil.TestMetric("11234.5", "test7"), &Gauge{ Name: "test7", MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), @@ -161,20 +161,20 @@ func TestBuildGauge(t *testing.T) { } func TestBuildGaugeWithSource(t *testing.T) { - pt1, _ := client.NewPoint( + pt1, _ := telegraf.NewMetric( "test1", map[string]string{"hostname": "192.168.0.1"}, map[string]interface{}{"value": 0.0}, time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), ) - pt2, _ := client.NewPoint( + pt2, _ := telegraf.NewMetric( "test2", map[string]string{"hostnam": "192.168.0.1"}, map[string]interface{}{"value": 1.0}, time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC), ) var gaugeTests = []struct { - ptIn *client.Point + ptIn telegraf.Metric outGauge *Gauge err error }{ diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index 75a992a01..db38ca957 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -10,10 +10,9 @@ import ( "sync" paho "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" - "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" - "github.com/influxdata/telegraf" ) const MaxClientIdLen = 8 @@ -79,18 +78,18 @@ func (m *MQTT) Description() string { return "Configuration for MQTT server to send metrics to" } -func (m *MQTT) Write(points []*client.Point) error { +func (m *MQTT) Write(metrics []telegraf.Metric) error { m.Lock() defer m.Unlock() - if len(points) == 0 { + if len(metrics) == 0 { return nil } - hostname, ok := points[0].Tags()["host"] + hostname, ok := metrics[0].Tags()["host"] if !ok { hostname = "" } - for _, p := range points { + for _, p := range metrics { var t []string if m.TopicPrefix != "" { t = append(t, m.TopicPrefix) diff --git a/plugins/outputs/mqtt/mqtt_test.go b/plugins/outputs/mqtt/mqtt_test.go index f25f4497f..25d0ab9e3 100644 --- a/plugins/outputs/mqtt/mqtt_test.go +++ b/plugins/outputs/mqtt/mqtt_test.go @@ -22,6 +22,6 @@ func TestConnectAndWrite(t *testing.T) { require.NoError(t, err) // Verify that we can successfully write data to the mqtt broker - err = m.Write(testutil.MockBatchPoints().Points()) + err = m.Write(testutil.MockMetrics()) require.NoError(t, err) } diff --git a/plugins/outputs/nsq/nsq.go b/plugins/outputs/nsq/nsq.go index 94c636b44..1b354394c 100644 --- a/plugins/outputs/nsq/nsq.go +++ b/plugins/outputs/nsq/nsq.go @@ -2,9 +2,8 @@ package nsq import ( "fmt" - "github.com/influxdata/influxdb/client/v2" - "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" "github.com/nsqio/go-nsq" ) @@ -46,14 +45,12 @@ func (n *NSQ) Description() string { return "Send telegraf measurements to NSQD" } -func (n *NSQ) Write(points []*client.Point) error { - if len(points) == 0 { +func (n *NSQ) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { return nil } - for _, p := range points { - // Combine tags from Point and BatchPoints and grab the resulting - // line-protocol output string to write to NSQ + for _, p := range metrics { value := p.String() err := n.producer.Publish(n.Topic, []byte(value)) diff --git a/plugins/outputs/nsq/nsq_test.go b/plugins/outputs/nsq/nsq_test.go index b2d703a70..0880d0252 100644 --- a/plugins/outputs/nsq/nsq_test.go +++ b/plugins/outputs/nsq/nsq_test.go @@ -23,6 +23,6 @@ func TestConnectAndWrite(t *testing.T) { require.NoError(t, err) // Verify that we can successfully write data to the NSQ daemon - err = n.Write(testutil.MockBatchPoints().Points()) + err = n.Write(testutil.MockMetrics()) require.NoError(t, err) } diff --git a/plugins/outputs/opentsdb/opentsdb.go b/plugins/outputs/opentsdb/opentsdb.go index 480738c26..cbab1ca4e 100644 --- a/plugins/outputs/opentsdb/opentsdb.go +++ b/plugins/outputs/opentsdb/opentsdb.go @@ -8,9 +8,8 @@ import ( "strings" "time" - "github.com/influxdata/influxdb/client/v2" - "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" ) type OpenTSDB struct { @@ -59,8 +58,8 @@ func (o *OpenTSDB) Connect() error { return nil } -func (o *OpenTSDB) Write(points []*client.Point) error { - if len(points) == 0 { +func (o *OpenTSDB) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { return nil } now := time.Now() @@ -74,8 +73,8 @@ func (o *OpenTSDB) Write(points []*client.Point) error { } defer connection.Close() - for _, pt := range points { - for _, metric := range buildMetrics(pt, now, o.Prefix) { + for _, m := range metrics { + for _, metric := range buildMetrics(m, now, o.Prefix) { messageLine := fmt.Sprintf("put %s %v %s %s\n", metric.Metric, metric.Timestamp, metric.Value, metric.Tags) if o.Debug { @@ -91,10 +90,10 @@ func (o *OpenTSDB) Write(points []*client.Point) error { return nil } -func buildTags(ptTags map[string]string) []string { - tags := make([]string, len(ptTags)) +func buildTags(mTags map[string]string) []string { + tags := make([]string, len(mTags)) index := 0 - for k, v := range ptTags { + for k, v := range mTags { tags[index] = fmt.Sprintf("%s=%s", k, v) index += 1 } @@ -102,11 +101,11 @@ func buildTags(ptTags map[string]string) []string { return tags } -func buildMetrics(pt *client.Point, now time.Time, prefix string) []*MetricLine { +func buildMetrics(m telegraf.Metric, now time.Time, prefix string) []*MetricLine { ret := []*MetricLine{} - for fieldName, value := range pt.Fields() { + for fieldName, value := range m.Fields() { metric := &MetricLine{ - Metric: fmt.Sprintf("%s%s_%s", prefix, pt.Name(), fieldName), + Metric: fmt.Sprintf("%s%s_%s", prefix, m.Name(), fieldName), Timestamp: now.Unix(), } @@ -116,7 +115,7 @@ func buildMetrics(pt *client.Point, now time.Time, prefix string) []*MetricLine continue } metric.Value = metricValue - tagsSlice := buildTags(pt.Tags()) + tagsSlice := buildTags(m.Tags()) metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " ")) ret = append(ret, metric) } diff --git a/plugins/outputs/opentsdb/opentsdb_test.go b/plugins/outputs/opentsdb/opentsdb_test.go index 92df3fb52..900c9f123 100644 --- a/plugins/outputs/opentsdb/opentsdb_test.go +++ b/plugins/outputs/opentsdb/opentsdb_test.go @@ -54,18 +54,18 @@ func TestWrite(t *testing.T) { require.NoError(t, err) // Verify that we can successfully write data to OpenTSDB - err = o.Write(testutil.MockBatchPoints().Points()) + err = o.Write(testutil.MockMetrics()) require.NoError(t, err) // Verify postive and negative test cases of writing data - bp := testutil.MockBatchPoints() - bp.AddPoint(testutil.TestPoint(float64(1.0), "justametric.float")) - bp.AddPoint(testutil.TestPoint(int64(123456789), "justametric.int")) - bp.AddPoint(testutil.TestPoint(uint64(123456789012345), "justametric.uint")) - bp.AddPoint(testutil.TestPoint("Lorem Ipsum", "justametric.string")) - bp.AddPoint(testutil.TestPoint(float64(42.0), "justametric.anotherfloat")) + metrics := testutil.MockMetrics() + metrics = append(metrics, testutil.TestMetric(float64(1.0), "justametric.float")) + metrics = append(metrics, testutil.TestMetric(int64(123456789), "justametric.int")) + metrics = append(metrics, testutil.TestMetric(uint64(123456789012345), "justametric.uint")) + metrics = append(metrics, testutil.TestMetric("Lorem Ipsum", "justametric.string")) + metrics = append(metrics, testutil.TestMetric(float64(42.0), "justametric.anotherfloat")) - err = o.Write(bp.Points()) + err = o.Write(metrics) require.NoError(t, err) } diff --git a/plugins/outputs/prometheus_client/prometheus_client.go b/plugins/outputs/prometheus_client/prometheus_client.go index 34f85bb23..696b3ccb7 100644 --- a/plugins/outputs/prometheus_client/prometheus_client.go +++ b/plugins/outputs/prometheus_client/prometheus_client.go @@ -5,9 +5,8 @@ import ( "log" "net/http" - "github.com/influxdata/influxdb/client/v2" - "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" "github.com/prometheus/client_golang/prometheus" ) @@ -59,12 +58,12 @@ func (p *PrometheusClient) Description() string { return "Configuration for the Prometheus client to spawn" } -func (p *PrometheusClient) Write(points []*client.Point) error { - if len(points) == 0 { +func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { return nil } - for _, point := range points { + for _, point := range metrics { var labels []string key := point.Name() diff --git a/plugins/outputs/prometheus_client/prometheus_client_test.go b/plugins/outputs/prometheus_client/prometheus_client_test.go index 73163ee1d..34ef77e4c 100644 --- a/plugins/outputs/prometheus_client/prometheus_client_test.go +++ b/plugins/outputs/prometheus_client/prometheus_client_test.go @@ -5,7 +5,7 @@ import ( "github.com/stretchr/testify/require" - "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs/prometheus" "github.com/influxdata/telegraf/testutil" ) @@ -21,19 +21,19 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) { Urls: []string{"http://localhost:9126/metrics"}, } tags := make(map[string]string) - pt1, _ := client.NewPoint( + pt1, _ := telegraf.NewMetric( "test_point_1", tags, map[string]interface{}{"value": 0.0}) - pt2, _ := client.NewPoint( + pt2, _ := telegraf.NewMetric( "test_point_2", tags, map[string]interface{}{"value": 1.0}) - var points = []*client.Point{ + var metrics = []telegraf.Metric{ pt1, pt2, } - require.NoError(t, pTesting.Write(points)) + require.NoError(t, pTesting.Write(metrics)) expected := []struct { name string @@ -63,19 +63,19 @@ func TestPrometheusWritePointTag(t *testing.T) { } tags := make(map[string]string) tags["testtag"] = "testvalue" - pt1, _ := client.NewPoint( + pt1, _ := telegraf.NewMetric( "test_point_3", tags, map[string]interface{}{"value": 0.0}) - pt2, _ := client.NewPoint( + pt2, _ := telegraf.NewMetric( "test_point_4", tags, map[string]interface{}{"value": 1.0}) - var points = []*client.Point{ + var metrics = []telegraf.Metric{ pt1, pt2, } - require.NoError(t, pTesting.Write(points)) + require.NoError(t, pTesting.Write(metrics)) expected := []struct { name string diff --git a/plugins/outputs/riemann/riemann.go b/plugins/outputs/riemann/riemann.go index 50c1555f7..6636af852 100644 --- a/plugins/outputs/riemann/riemann.go +++ b/plugins/outputs/riemann/riemann.go @@ -6,9 +6,8 @@ import ( "os" "github.com/amir/raidman" - "github.com/influxdata/influxdb/client/v2" - "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" ) type Riemann struct { @@ -49,13 +48,13 @@ func (r *Riemann) Description() string { return "Configuration for the Riemann server to send metrics to" } -func (r *Riemann) Write(points []*client.Point) error { - if len(points) == 0 { +func (r *Riemann) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { return nil } var events []*raidman.Event - for _, p := range points { + for _, p := range metrics { evs := buildEvents(p) for _, ev := range evs { events = append(events, ev) @@ -71,7 +70,7 @@ func (r *Riemann) Write(points []*client.Point) error { return nil } -func buildEvents(p *client.Point) []*raidman.Event { +func buildEvents(p telegraf.Metric) []*raidman.Event { events := []*raidman.Event{} for fieldName, value := range p.Fields() { host, ok := p.Tags()["host"] diff --git a/plugins/outputs/riemann/riemann_test.go b/plugins/outputs/riemann/riemann_test.go index 8b3f27ac0..b599cdf60 100644 --- a/plugins/outputs/riemann/riemann_test.go +++ b/plugins/outputs/riemann/riemann_test.go @@ -22,6 +22,6 @@ func TestConnectAndWrite(t *testing.T) { err := r.Connect() require.NoError(t, err) - err = r.Write(testutil.MockBatchPoints().Points()) + err = r.Write(testutil.MockMetrics()) require.NoError(t, err) } diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 05363e28c..7101db091 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -11,15 +11,15 @@ import ( "github.com/stretchr/testify/assert" ) -// Point defines a single point measurement -type Point struct { +// Metric defines a single point measurement +type Metric struct { Measurement string Tags map[string]string Fields map[string]interface{} Time time.Time } -func (p *Point) String() string { +func (p *Metric) String() string { return fmt.Sprintf("%s %v", p.Measurement, p.Fields) } @@ -27,8 +27,8 @@ func (p *Point) String() string { type Accumulator struct { sync.Mutex - Points []*Point - debug bool + Metrics []*Metric + debug bool } // Add adds a measurement point to the accumulator @@ -74,14 +74,14 @@ func (a *Accumulator) AddFields( fmt.Print(msg) } - p := &Point{ + p := &Metric{ Measurement: measurement, Fields: fields, Tags: tags, Time: t, } - a.Points = append(a.Points, p) + a.Metrics = append(a.Metrics, p) } func (a *Accumulator) Debug() bool { @@ -95,8 +95,8 @@ func (a *Accumulator) SetDebug(debug bool) { } // Get gets the specified measurement point from the accumulator -func (a *Accumulator) Get(measurement string) (*Point, bool) { - for _, p := range a.Points { +func (a *Accumulator) Get(measurement string) (*Metric, bool) { + for _, p := range a.Metrics { if p.Measurement == measurement { return p, true } @@ -109,7 +109,7 @@ func (a *Accumulator) Get(measurement string) (*Point, bool) { // measurements func (a *Accumulator) NFields() int { counter := 0 - for _, pt := range a.Points { + for _, pt := range a.Metrics { for _, _ = range pt.Fields { counter++ } @@ -123,7 +123,7 @@ func (a *Accumulator) AssertContainsTaggedFields( fields map[string]interface{}, tags map[string]string, ) { - for _, p := range a.Points { + for _, p := range a.Metrics { if !reflect.DeepEqual(tags, p.Tags) { continue } @@ -148,7 +148,7 @@ func (a *Accumulator) AssertContainsFields( measurement string, fields map[string]interface{}, ) { - for _, p := range a.Points { + for _, p := range a.Metrics { if p.Measurement == measurement { if !reflect.DeepEqual(fields, p.Fields) { pActual, _ := json.MarshalIndent(p.Fields, "", " ") @@ -166,7 +166,7 @@ func (a *Accumulator) AssertContainsFields( // HasIntValue returns true if the measurement has an Int value func (a *Accumulator) HasIntField(measurement string, field string) bool { - for _, p := range a.Points { + for _, p := range a.Metrics { if p.Measurement == measurement { for fieldname, value := range p.Fields { if fieldname == field { @@ -182,7 +182,7 @@ func (a *Accumulator) HasIntField(measurement string, field string) bool { // HasUIntValue returns true if the measurement has a UInt value func (a *Accumulator) HasUIntField(measurement string, field string) bool { - for _, p := range a.Points { + for _, p := range a.Metrics { if p.Measurement == measurement { for fieldname, value := range p.Fields { if fieldname == field { @@ -198,7 +198,7 @@ func (a *Accumulator) HasUIntField(measurement string, field string) bool { // HasFloatValue returns true if the given measurement has a float value func (a *Accumulator) HasFloatField(measurement string, field string) bool { - for _, p := range a.Points { + for _, p := range a.Metrics { if p.Measurement == measurement { for fieldname, value := range p.Fields { if fieldname == field { @@ -215,7 +215,7 @@ func (a *Accumulator) HasFloatField(measurement string, field string) bool { // HasMeasurement returns true if the accumulator has a measurement with the // given name func (a *Accumulator) HasMeasurement(measurement string) bool { - for _, p := range a.Points { + for _, p := range a.Metrics { if p.Measurement == measurement { return true } diff --git a/testutil/testutil.go b/testutil/testutil.go index 436b57361..c09e3d9e8 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -6,7 +6,7 @@ import ( "os" "time" - "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf" ) var localhost = "localhost" @@ -31,21 +31,21 @@ func GetLocalHost() string { return localhost } -// MockBatchPoints returns a mock BatchPoints object for using in unit tests +// MockMetrics returns a mock []telegraf.Metric object for using in unit tests // of telegraf output sinks. -func MockBatchPoints() client.BatchPoints { +func MockMetrics() []telegraf.Metric { + metrics := make([]telegraf.Metric, 0) // Create a new point batch - bp, _ := client.NewBatchPoints(client.BatchPointsConfig{}) - bp.AddPoint(TestPoint(1.0)) - return bp + metrics = append(metrics, TestMetric(1.0)) + return metrics } -// TestPoint Returns a simple test point: +// TestMetric Returns a simple test point: // measurement -> "test1" or name // tags -> "tag1":"value1" // value -> value // time -> time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) -func TestPoint(value interface{}, name ...string) *client.Point { +func TestMetric(value interface{}, name ...string) telegraf.Metric { if value == nil { panic("Cannot use a nil value") } @@ -54,7 +54,7 @@ func TestPoint(value interface{}, name ...string) *client.Point { measurement = name[0] } tags := map[string]string{"tag1": "value1"} - pt, _ := client.NewPoint( + pt, _ := telegraf.NewMetric( measurement, tags, map[string]interface{}{"value": value},