From 2df8dd6dbd7fb07b9e981a06327137674031a143 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 22 Jan 2016 17:21:57 -0700 Subject: [PATCH] Use an internal 'Metric' data type for telegraf metrics As of now, this is pretty much just a wrapper around client.Point, but this gives latitude to expand functionality more easily. closes #564 --- models/accumulator.go | 18 ++++++++ models/input.go | 31 +++++++++++++ models/metric.go | 105 ++++++++++++++++++++++++++++++++++++++++++ models/output.go | 31 +++++++++++++ 4 files changed, 185 insertions(+) create mode 100644 models/accumulator.go create mode 100644 models/input.go create mode 100644 models/metric.go create mode 100644 models/output.go diff --git a/models/accumulator.go b/models/accumulator.go new file mode 100644 index 000000000..8add896f3 --- /dev/null +++ b/models/accumulator.go @@ -0,0 +1,18 @@ +package models + +import "time" + +type Accumulator interface { + // Create a point with a value, decorating it with tags + // NOTE: tags is expected to be owned by the caller, don't mutate + // it after passing to Add. + Add(measurement string, + value interface{}, + tags map[string]string, + t ...time.Time) + + AddFields(measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time) +} diff --git a/models/input.go b/models/input.go new file mode 100644 index 000000000..e9fa48aef --- /dev/null +++ b/models/input.go @@ -0,0 +1,31 @@ +package models + +type Input interface { + // SampleConfig returns the default configuration of the Input + SampleConfig() string + + // Description returns a one-sentence description on the Input + Description() string + + // Gather takes in an accumulator and adds the metrics that the Input + // gathers. This is called every "interval" + Gather(Accumulator) error +} + +type ServiceInput interface { + // SampleConfig returns the default configuration of the Input + SampleConfig() string + + // Description returns a one-sentence description on the Input + Description() string + + // Gather takes in an accumulator and adds the metrics that the Input + // gathers. This is called every "interval" + Gather(Accumulator) error + + // Start starts the ServiceInput's service, whatever that may be + Start() error + + // Stop stops the services and closes any necessary channels and connections + Stop() +} diff --git a/models/metric.go b/models/metric.go new file mode 100644 index 000000000..c242f6866 --- /dev/null +++ b/models/metric.go @@ -0,0 +1,105 @@ +package models + +import ( + "time" + + "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/influxdb/models" +) + +type Metric interface { + // Name returns the measurement name of the metric + Name() string + + // Name returns the tags associated with the metric + Tags() map[string]string + + // Time return the timestamp for the metric + Time() time.Time + + // UnixNano returns the unix nano time of the metric + UnixNano() int64 + + // Fields returns the fields for the metric + Fields() map[string]interface{} + + // String returns a line-protocol string of the metric + String() string + + // PrecisionString returns a line-protocol string of the metric, at precision + PrecisionString(precison string) string +} + +// metric is a wrapper of the influxdb client.Point struct +type metric struct { + pt *client.Point +} + +// NewMetric returns a metric with the given timestamp. If a timestamp is not +// given, then data is sent to the database without a timestamp, in which case +// the server will assign local time upon reception. NOTE: it is recommended to +// send data with a timestamp. +func NewMetric( + name string, + tags map[string]string, + fields map[string]interface{}, + t ...time.Time, +) (Metric, error) { + var T time.Time + if len(t) > 0 { + T = t[0] + } + + pt, err := client.NewPoint(name, tags, fields, T) + if err != nil { + return nil, err + } + return &metric{ + pt: pt, + }, nil +} + +// ParseMetrics returns a slice of Metrics from a text representation of a +// metric (in line-protocol format) +// with each metric separated by newlines. If any metrics fail to parse, +// a non-nil error will be returned in addition to the metrics that parsed +// successfully. +func ParseMetrics(buf []byte) ([]Metric, error) { + points, err := models.ParsePoints(buf) + metrics := make([]Metric, len(points)) + for i, point := range points { + // Ignore error here because it's impossible that a model.Point + // wouldn't parse into client.Point properly + metrics[i], _ = NewMetric(point.Name(), point.Tags(), + point.Fields(), point.Time()) + } + return metrics, err +} + +func (m *metric) Name() string { + return m.pt.Name() +} + +func (m *metric) Tags() map[string]string { + return m.pt.Tags() +} + +func (m *metric) Time() time.Time { + return m.pt.Time() +} + +func (m *metric) UnixNano() int64 { + return m.pt.UnixNano() +} + +func (m *metric) Fields() map[string]interface{} { + return m.pt.Fields() +} + +func (m *metric) String() string { + return m.pt.String() +} + +func (m *metric) PrecisionString(precison string) string { + return m.pt.PrecisionString(precison) +} diff --git a/models/output.go b/models/output.go new file mode 100644 index 000000000..4b886acc2 --- /dev/null +++ b/models/output.go @@ -0,0 +1,31 @@ +package models + +type Output interface { + // Connect to the Output + Connect() error + // Close any connections to the Output + Close() error + // Description returns a one-sentence description on the Output + Description() string + // SampleConfig returns the default configuration of the Output + SampleConfig() string + // Write takes in group of points to be written to the Output + Write(metrics []*Metric) error +} + +type ServiceOutput interface { + // Connect to the Output + Connect() error + // Close any connections to the Output + Close() error + // Description returns a one-sentence description on the Output + Description() string + // SampleConfig returns the default configuration of the Output + SampleConfig() string + // Write takes in group of points to be written to the Output + Write(metrics []*Metric) error + // Start the "service" that will provide an Output + Start() error + // Stop the "service" that will provide an Output + Stop() +}