telegraf/testutil/accumulator.go

737 lines
16 KiB
Go

package testutil
import (
"encoding/json"
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/stretchr/testify/assert"
)
var (
lastID uint64
)
func newTrackingID() telegraf.TrackingID {
id := atomic.AddUint64(&lastID, 1)
return telegraf.TrackingID(id)
}
// Metric defines a single point measurement
type Metric struct {
Measurement string
Tags map[string]string
Fields map[string]interface{}
Time time.Time
Type telegraf.ValueType
}
func (p *Metric) String() string {
return fmt.Sprintf("%s %v %v", p.Measurement, p.Tags, p.Fields)
}
// Accumulator defines a mocked out accumulator
type Accumulator struct {
sync.Mutex
*sync.Cond
Metrics []*Metric
nMetrics uint64
Discard bool
Errors []error
debug bool
delivered chan telegraf.DeliveryInfo
TimeFunc func() time.Time
}
func (a *Accumulator) NMetrics() uint64 {
return atomic.LoadUint64(&a.nMetrics)
}
func (a *Accumulator) GetTelegrafMetrics() []telegraf.Metric {
metrics := []telegraf.Metric{}
for _, m := range a.Metrics {
metrics = append(metrics, FromTestMetric(m))
}
return metrics
}
func (a *Accumulator) FirstError() error {
if len(a.Errors) == 0 {
return nil
}
return a.Errors[0]
}
func (a *Accumulator) ClearMetrics() {
a.Lock()
defer a.Unlock()
atomic.StoreUint64(&a.nMetrics, 0)
a.Metrics = make([]*Metric, 0)
}
func (a *Accumulator) addFields(
measurement string,
tags map[string]string,
fields map[string]interface{},
tp telegraf.ValueType,
timestamp ...time.Time,
) {
a.Lock()
defer a.Unlock()
atomic.AddUint64(&a.nMetrics, 1)
if a.Cond != nil {
a.Cond.Broadcast()
}
if a.Discard {
return
}
if len(fields) == 0 {
return
}
tagsCopy := map[string]string{}
for k, v := range tags {
tagsCopy[k] = v
}
fieldsCopy := map[string]interface{}{}
for k, v := range fields {
fieldsCopy[k] = v
}
var t time.Time
if len(timestamp) > 0 {
t = timestamp[0]
} else {
t = time.Now()
if a.TimeFunc == nil {
t = time.Now()
} else {
t = a.TimeFunc()
}
}
if a.debug {
pretty, _ := json.MarshalIndent(fields, "", " ")
prettyTags, _ := json.MarshalIndent(tags, "", " ")
msg := fmt.Sprintf("Adding Measurement [%s]\nFields:%s\nTags:%s\n",
measurement, string(pretty), string(prettyTags))
fmt.Print(msg)
}
p := &Metric{
Measurement: measurement,
Fields: fieldsCopy,
Tags: tagsCopy,
Time: t,
Type: tp,
}
a.Metrics = append(a.Metrics, p)
}
// AddFields adds a measurement point with a specified timestamp.
func (a *Accumulator) AddFields(
measurement string,
fields map[string]interface{},
tags map[string]string,
timestamp ...time.Time,
) {
a.addFields(measurement, tags, fields, telegraf.Untyped, timestamp...)
}
func (a *Accumulator) AddCounter(
measurement string,
fields map[string]interface{},
tags map[string]string,
timestamp ...time.Time,
) {
a.addFields(measurement, tags, fields, telegraf.Counter, timestamp...)
}
func (a *Accumulator) AddGauge(
measurement string,
fields map[string]interface{},
tags map[string]string,
timestamp ...time.Time,
) {
a.addFields(measurement, tags, fields, telegraf.Gauge, timestamp...)
}
func (a *Accumulator) AddMetrics(metrics []telegraf.Metric) {
for _, m := range metrics {
a.addFields(m.Name(), m.Tags(), m.Fields(), m.Type(), m.Time())
}
}
func (a *Accumulator) AddSummary(
measurement string,
fields map[string]interface{},
tags map[string]string,
timestamp ...time.Time,
) {
a.addFields(measurement, tags, fields, telegraf.Summary, timestamp...)
}
func (a *Accumulator) AddHistogram(
measurement string,
fields map[string]interface{},
tags map[string]string,
timestamp ...time.Time,
) {
a.addFields(measurement, tags, fields, telegraf.Histogram, timestamp...)
}
func (a *Accumulator) AddMetric(m telegraf.Metric) {
a.addFields(m.Name(), m.Tags(), m.Fields(), m.Type(), m.Time())
}
func (a *Accumulator) WithTracking(maxTracked int) telegraf.TrackingAccumulator {
return a
}
func (a *Accumulator) AddTrackingMetric(m telegraf.Metric) telegraf.TrackingID {
a.AddMetric(m)
return newTrackingID()
}
func (a *Accumulator) AddTrackingMetricGroup(group []telegraf.Metric) telegraf.TrackingID {
for _, m := range group {
a.AddMetric(m)
}
return newTrackingID()
}
func (a *Accumulator) Delivered() <-chan telegraf.DeliveryInfo {
if a.delivered == nil {
a.delivered = make(chan telegraf.DeliveryInfo)
}
return a.delivered
}
// AddError appends the given error to Accumulator.Errors.
func (a *Accumulator) AddError(err error) {
if err == nil {
return
}
a.Lock()
a.Errors = append(a.Errors, err)
if a.Cond != nil {
a.Cond.Broadcast()
}
a.Unlock()
}
func (a *Accumulator) SetPrecision(precision time.Duration) {
return
}
func (a *Accumulator) DisablePrecision() {
return
}
func (a *Accumulator) Debug() bool {
// stub for implementing Accumulator interface.
return a.debug
}
func (a *Accumulator) SetDebug(debug bool) {
// stub for implementing Accumulator interface.
a.debug = debug
}
// Get gets the specified measurement point from the accumulator
func (a *Accumulator) Get(measurement string) (*Metric, bool) {
for _, p := range a.Metrics {
if p.Measurement == measurement {
return p, true
}
}
return nil, false
}
func (a *Accumulator) HasTag(measurement string, key string) bool {
for _, p := range a.Metrics {
if p.Measurement == measurement {
_, ok := p.Tags[key]
return ok
}
}
return false
}
func (a *Accumulator) TagSetValue(measurement string, key string) string {
for _, p := range a.Metrics {
if p.Measurement == measurement {
v, ok := p.Tags[key]
if ok {
return v
}
}
}
return ""
}
func (a *Accumulator) TagValue(measurement string, key string) string {
for _, p := range a.Metrics {
if p.Measurement == measurement {
v, ok := p.Tags[key]
if !ok {
return ""
}
return v
}
}
return ""
}
// Calls the given Gather function and returns the first error found.
func (a *Accumulator) GatherError(gf func(telegraf.Accumulator) error) error {
if err := gf(a); err != nil {
return err
}
if len(a.Errors) > 0 {
return a.Errors[0]
}
return nil
}
// NFields returns the total number of fields in the accumulator, across all
// measurements
func (a *Accumulator) NFields() int {
a.Lock()
defer a.Unlock()
counter := 0
for _, pt := range a.Metrics {
for range pt.Fields {
counter++
}
}
return counter
}
// Wait waits for the given number of metrics to be added to the accumulator.
func (a *Accumulator) Wait(n int) {
a.Lock()
if a.Cond == nil {
a.Cond = sync.NewCond(&a.Mutex)
}
for int(a.NMetrics()) < n {
a.Cond.Wait()
}
a.Unlock()
}
// WaitError waits for the given number of errors to be added to the accumulator.
func (a *Accumulator) WaitError(n int) {
a.Lock()
if a.Cond == nil {
a.Cond = sync.NewCond(&a.Mutex)
}
for len(a.Errors) < n {
a.Cond.Wait()
}
a.Unlock()
}
func (a *Accumulator) AssertContainsTaggedFields(
t *testing.T,
measurement string,
fields map[string]interface{},
tags map[string]string,
) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if !reflect.DeepEqual(tags, p.Tags) {
continue
}
if p.Measurement == measurement && reflect.DeepEqual(fields, p.Fields) {
return
}
}
msg := fmt.Sprintf("unknown measurement %s with tags %v", measurement, tags)
assert.Fail(t, msg)
}
func (a *Accumulator) AssertDoesNotContainsTaggedFields(
t *testing.T,
measurement string,
fields map[string]interface{},
tags map[string]string,
) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if !reflect.DeepEqual(tags, p.Tags) {
continue
}
if p.Measurement == measurement && reflect.DeepEqual(fields, p.Fields) {
msg := fmt.Sprintf(
"found measurement %s with tagged fields (tags %v) which should not be there",
measurement, tags)
assert.Fail(t, msg)
}
}
return
}
func (a *Accumulator) AssertContainsFields(
t *testing.T,
measurement string,
fields map[string]interface{},
) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
assert.Equal(t, fields, p.Fields)
return
}
}
msg := fmt.Sprintf("unknown measurement %s", measurement)
assert.Fail(t, msg)
}
func (a *Accumulator) HasPoint(
measurement string,
tags map[string]string,
fieldKey string,
fieldValue interface{},
) bool {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement != measurement {
continue
}
if !reflect.DeepEqual(tags, p.Tags) {
continue
}
v, ok := p.Fields[fieldKey]
if ok && reflect.DeepEqual(v, fieldValue) {
return true
}
}
return false
}
func (a *Accumulator) AssertDoesNotContainMeasurement(t *testing.T, measurement string) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
msg := fmt.Sprintf("found unexpected measurement %s", measurement)
assert.Fail(t, msg)
}
}
}
// HasTimestamp returns true if the measurement has a matching Time value
func (a *Accumulator) HasTimestamp(measurement string, timestamp time.Time) bool {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
return timestamp.Equal(p.Time)
}
}
return false
}
// HasField returns true if the given measurement has a field with the given
// name
func (a *Accumulator) HasField(measurement string, field string) bool {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
if _, ok := p.Fields[field]; ok {
return true
}
}
}
return false
}
// HasIntField returns true if the measurement has an Int value
func (a *Accumulator) HasIntField(measurement string, field string) bool {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
_, ok := value.(int)
return ok
}
}
}
}
return false
}
// HasInt64Field returns true if the measurement has an Int64 value
func (a *Accumulator) HasInt64Field(measurement string, field string) bool {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
_, ok := value.(int64)
return ok
}
}
}
}
return false
}
// HasInt32Field returns true if the measurement has an Int value
func (a *Accumulator) HasInt32Field(measurement string, field string) bool {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
_, ok := value.(int32)
return ok
}
}
}
}
return false
}
// HasStringField returns true if the measurement has an String value
func (a *Accumulator) HasStringField(measurement string, field string) bool {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
_, ok := value.(string)
return ok
}
}
}
}
return false
}
// HasUIntField returns true if the measurement has a UInt value
func (a *Accumulator) HasUIntField(measurement string, field string) bool {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
_, ok := value.(uint64)
return ok
}
}
}
}
return false
}
// HasFloatField returns true if the given measurement has a float value
func (a *Accumulator) HasFloatField(measurement string, field string) bool {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
_, ok := value.(float64)
return ok
}
}
}
}
return false
}
// HasMeasurement returns true if the accumulator has a measurement with the
// given name
func (a *Accumulator) HasMeasurement(measurement string) bool {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
return true
}
}
return false
}
// IntField returns the int value of the given measurement and field or false.
func (a *Accumulator) IntField(measurement string, field string) (int, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(int)
return v, ok
}
}
}
}
return 0, false
}
// Int64Field returns the int64 value of the given measurement and field or false.
func (a *Accumulator) Int64Field(measurement string, field string) (int64, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(int64)
return v, ok
}
}
}
}
return 0, false
}
// Uint64Field returns the int64 value of the given measurement and field or false.
func (a *Accumulator) Uint64Field(measurement string, field string) (uint64, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(uint64)
return v, ok
}
}
}
}
return 0, false
}
// Int32Field returns the int32 value of the given measurement and field or false.
func (a *Accumulator) Int32Field(measurement string, field string) (int32, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(int32)
return v, ok
}
}
}
}
return 0, false
}
// FloatField returns the float64 value of the given measurement and field or false.
func (a *Accumulator) FloatField(measurement string, field string) (float64, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(float64)
return v, ok
}
}
}
}
return 0.0, false
}
// StringField returns the string value of the given measurement and field or false.
func (a *Accumulator) StringField(measurement string, field string) (string, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(string)
return v, ok
}
}
}
}
return "", false
}
// BoolField returns the bool value of the given measurement and field or false.
func (a *Accumulator) BoolField(measurement string, field string) (bool, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
for fieldname, value := range p.Fields {
if fieldname == field {
v, ok := value.(bool)
return v, ok
}
}
}
}
return false, false
}
// NopAccumulator is used for benchmarking to isolate the plugin from the internal
// telegraf accumulator machinery.
type NopAccumulator struct{}
func (n *NopAccumulator) AddFields(measurement string, fields map[string]interface{}, tags map[string]string, t ...time.Time) {
}
func (n *NopAccumulator) AddGauge(measurement string, fields map[string]interface{}, tags map[string]string, t ...time.Time) {
}
func (n *NopAccumulator) AddCounter(measurement string, fields map[string]interface{}, tags map[string]string, t ...time.Time) {
}
func (n *NopAccumulator) AddSummary(measurement string, fields map[string]interface{}, tags map[string]string, t ...time.Time) {
}
func (n *NopAccumulator) AddHistogram(measurement string, fields map[string]interface{}, tags map[string]string, t ...time.Time) {
}
func (n *NopAccumulator) AddMetric(telegraf.Metric) {}
func (n *NopAccumulator) SetPrecision(precision time.Duration) {}
func (n *NopAccumulator) AddError(err error) {}
func (n *NopAccumulator) WithTracking(maxTracked int) telegraf.TrackingAccumulator { return nil }