Support Processor & Aggregator Plugins

closes #1726
This commit is contained in:
Cameron Sparr 2016-09-08 15:22:10 +01:00
parent 974221f0cf
commit 64a71263a1
28 changed files with 1832 additions and 654 deletions

View File

@ -2,9 +2,8 @@ package telegraf
import "time"
// Accumulator is an interface for "accumulating" metrics from input plugin(s).
// The metrics are sent down a channel shared between all input plugins and then
// flushed on the configured flush_interval.
// Accumulator is an interface for "accumulating" metrics from plugin(s).
// The metrics are sent down a channel shared between all plugins.
type Accumulator interface {
// AddFields adds a metric to the accumulator with the given measurement
// name, fields, and tags (and timestamp). If a timestamp is not provided,
@ -29,12 +28,7 @@ type Accumulator interface {
tags map[string]string,
t ...time.Time)
AddError(err error)
Debug() bool
SetDebug(enabled bool)
SetPrecision(precision, interval time.Duration)
DisablePrecision()
AddError(err error)
}

View File

@ -1,37 +1,40 @@
package agent
import (
"fmt"
"log"
"math"
"sync/atomic"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/models"
)
type MetricMaker interface {
Name() string
MakeMetric(
measurement string,
fields map[string]interface{},
tags map[string]string,
mType telegraf.ValueType,
t time.Time,
) telegraf.Metric
}
func NewAccumulator(
inputConfig *models.InputConfig,
maker MetricMaker,
metrics chan telegraf.Metric,
) *accumulator {
acc := accumulator{}
acc.metrics = metrics
acc.inputConfig = inputConfig
acc.precision = time.Nanosecond
acc := accumulator{
maker: maker,
metrics: metrics,
precision: time.Nanosecond,
}
return &acc
}
type accumulator struct {
metrics chan telegraf.Metric
defaultTags map[string]string
debug bool
// print every point added to the accumulator
trace bool
inputConfig *models.InputConfig
maker MetricMaker
precision time.Duration
@ -44,7 +47,7 @@ func (ac *accumulator) AddFields(
tags map[string]string,
t ...time.Time,
) {
if m := ac.makeMetric(measurement, fields, tags, telegraf.Untyped, t...); m != nil {
if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Untyped, ac.getTime(t)); m != nil {
ac.metrics <- m
}
}
@ -55,7 +58,7 @@ func (ac *accumulator) AddGauge(
tags map[string]string,
t ...time.Time,
) {
if m := ac.makeMetric(measurement, fields, tags, telegraf.Gauge, t...); m != nil {
if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Gauge, ac.getTime(t)); m != nil {
ac.metrics <- m
}
}
@ -66,114 +69,11 @@ func (ac *accumulator) AddCounter(
tags map[string]string,
t ...time.Time,
) {
if m := ac.makeMetric(measurement, fields, tags, telegraf.Counter, t...); m != nil {
if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Counter, ac.getTime(t)); m != nil {
ac.metrics <- m
}
}
// makeMetric either returns a metric, or returns nil if the metric doesn't
// need to be created (because of filtering, an error, etc.)
func (ac *accumulator) makeMetric(
measurement string,
fields map[string]interface{},
tags map[string]string,
mType telegraf.ValueType,
t ...time.Time,
) telegraf.Metric {
if len(fields) == 0 || len(measurement) == 0 {
return nil
}
if tags == nil {
tags = make(map[string]string)
}
// Override measurement name if set
if len(ac.inputConfig.NameOverride) != 0 {
measurement = ac.inputConfig.NameOverride
}
// Apply measurement prefix and suffix if set
if len(ac.inputConfig.MeasurementPrefix) != 0 {
measurement = ac.inputConfig.MeasurementPrefix + measurement
}
if len(ac.inputConfig.MeasurementSuffix) != 0 {
measurement = measurement + ac.inputConfig.MeasurementSuffix
}
// Apply plugin-wide tags if set
for k, v := range ac.inputConfig.Tags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
// Apply daemon-wide tags if set
for k, v := range ac.defaultTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
// Apply the metric filter(s)
if ok := ac.inputConfig.Filter.Apply(measurement, fields, tags); !ok {
return nil
}
for k, v := range fields {
// Validate uint64 and float64 fields
switch val := v.(type) {
case uint64:
// InfluxDB does not support writing uint64
if val < uint64(9223372036854775808) {
fields[k] = int64(val)
} else {
fields[k] = int64(9223372036854775807)
}
continue
case float64:
// NaNs are invalid values in influxdb, skip measurement
if math.IsNaN(val) || math.IsInf(val, 0) {
if ac.debug {
log.Printf("I! Measurement [%s] field [%s] has a NaN or Inf "+
"field, skipping",
measurement, k)
}
delete(fields, k)
continue
}
}
fields[k] = v
}
var timestamp time.Time
if len(t) > 0 {
timestamp = t[0]
} else {
timestamp = time.Now()
}
timestamp = timestamp.Round(ac.precision)
var m telegraf.Metric
var err error
switch mType {
case telegraf.Counter:
m, err = telegraf.NewCounterMetric(measurement, tags, fields, timestamp)
case telegraf.Gauge:
m, err = telegraf.NewGaugeMetric(measurement, tags, fields, timestamp)
default:
m, err = telegraf.NewMetric(measurement, tags, fields, timestamp)
}
if err != nil {
log.Printf("E! Error adding point [%s]: %s\n", measurement, err.Error())
return nil
}
if ac.trace {
fmt.Println("> " + m.String())
}
return m
}
// AddError passes a runtime error to the accumulator.
// The error will be tagged with the plugin name and written to the log.
func (ac *accumulator) AddError(err error) {
@ -182,23 +82,7 @@ func (ac *accumulator) AddError(err error) {
}
atomic.AddUint64(&ac.errCount, 1)
//TODO suppress/throttle consecutive duplicate errors?
log.Printf("E! Error in input [%s]: %s", ac.inputConfig.Name, err)
}
func (ac *accumulator) Debug() bool {
return ac.debug
}
func (ac *accumulator) SetDebug(debug bool) {
ac.debug = debug
}
func (ac *accumulator) Trace() bool {
return ac.trace
}
func (ac *accumulator) SetTrace(trace bool) {
ac.trace = trace
log.Printf("E! Error in plugin [%s]: %s", ac.maker.Name(), err)
}
// SetPrecision takes two time.Duration objects. If the first is non-zero,
@ -222,17 +106,12 @@ func (ac *accumulator) SetPrecision(precision, interval time.Duration) {
}
}
func (ac *accumulator) DisablePrecision() {
ac.precision = time.Nanosecond
func (ac accumulator) getTime(t []time.Time) time.Time {
var timestamp time.Time
if len(t) > 0 {
timestamp = t[0]
} else {
timestamp = time.Now()
return timestamp.Round(ac.precision)
}
func (ac *accumulator) setDefaultTags(tags map[string]string) {
ac.defaultTags = tags
}
func (ac *accumulator) addDefaultTag(key, value string) {
if ac.defaultTags == nil {
ac.defaultTags = make(map[string]string)
}
ac.defaultTags[key] = value
}

View File

@ -4,24 +4,21 @@ import (
"bytes"
"fmt"
"log"
"math"
"os"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestAdd(t *testing.T) {
a := accumulator{}
now := time.Now()
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}
metrics := make(chan telegraf.Metric, 10)
defer close(metrics)
a := NewAccumulator(&TestMetricMaker{}, metrics)
a.AddFields("acctest",
map[string]interface{}{"value": float64(101)},
@ -33,97 +30,142 @@ func TestAdd(t *testing.T) {
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"}, now)
testm := <-a.metrics
testm := <-metrics
actual := testm.String()
assert.Contains(t, actual, "acctest value=101")
testm = <-a.metrics
testm = <-metrics
actual = testm.String()
assert.Contains(t, actual, "acctest,acc=test value=101")
testm = <-a.metrics
testm = <-metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()),
actual)
}
func TestAddGauge(t *testing.T) {
a := accumulator{}
func TestAddFields(t *testing.T) {
now := time.Now()
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}
metrics := make(chan telegraf.Metric, 10)
defer close(metrics)
a := NewAccumulator(&TestMetricMaker{}, metrics)
a.AddGauge("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{})
a.AddGauge("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"})
a.AddGauge("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"}, now)
fields := map[string]interface{}{
"usage": float64(99),
}
a.AddFields("acctest", fields, map[string]string{})
a.AddGauge("acctest", fields, map[string]string{"acc": "test"})
a.AddCounter("acctest", fields, map[string]string{"acc": "test"}, now)
testm := <-a.metrics
testm := <-metrics
actual := testm.String()
assert.Contains(t, actual, "acctest value=101")
assert.Equal(t, testm.Type(), telegraf.Gauge)
assert.Contains(t, actual, "acctest usage=99")
testm = <-a.metrics
testm = <-metrics
actual = testm.String()
assert.Contains(t, actual, "acctest,acc=test value=101")
assert.Equal(t, testm.Type(), telegraf.Gauge)
assert.Contains(t, actual, "acctest,acc=test usage=99")
testm = <-a.metrics
testm = <-metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()),
fmt.Sprintf("acctest,acc=test usage=99 %d", now.UnixNano()),
actual)
assert.Equal(t, testm.Type(), telegraf.Gauge)
}
func TestAddCounter(t *testing.T) {
a := accumulator{}
now := time.Now()
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}
func TestAccAddError(t *testing.T) {
errBuf := bytes.NewBuffer(nil)
log.SetOutput(errBuf)
defer log.SetOutput(os.Stderr)
a.AddCounter("acctest",
metrics := make(chan telegraf.Metric, 10)
defer close(metrics)
a := NewAccumulator(&TestMetricMaker{}, metrics)
a.AddError(fmt.Errorf("foo"))
a.AddError(fmt.Errorf("bar"))
a.AddError(fmt.Errorf("baz"))
errs := bytes.Split(errBuf.Bytes(), []byte{'\n'})
assert.EqualValues(t, 3, a.errCount)
require.Len(t, errs, 4) // 4 because of trailing newline
assert.Contains(t, string(errs[0]), "TestPlugin")
assert.Contains(t, string(errs[0]), "foo")
assert.Contains(t, string(errs[1]), "TestPlugin")
assert.Contains(t, string(errs[1]), "bar")
assert.Contains(t, string(errs[2]), "TestPlugin")
assert.Contains(t, string(errs[2]), "baz")
}
func TestAddNoIntervalWithPrecision(t *testing.T) {
now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
metrics := make(chan telegraf.Metric, 10)
defer close(metrics)
a := NewAccumulator(&TestMetricMaker{}, metrics)
a.SetPrecision(0, time.Second)
a.AddFields("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{})
a.AddCounter("acctest",
a.AddFields("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"})
a.AddCounter("acctest",
a.AddFields("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"}, now)
testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest value=101")
assert.Equal(t, testm.Type(), telegraf.Counter)
testm = <-a.metrics
actual = testm.String()
assert.Contains(t, actual, "acctest,acc=test value=101")
assert.Equal(t, testm.Type(), telegraf.Counter)
testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()),
fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800000000000)),
actual)
}
func TestAddDisablePrecision(t *testing.T) {
now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
metrics := make(chan telegraf.Metric, 10)
defer close(metrics)
a := NewAccumulator(&TestMetricMaker{}, metrics)
a.SetPrecision(time.Nanosecond, 0)
a.AddFields("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{})
a.AddFields("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"})
a.AddFields("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"}, now)
testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest value=101")
testm = <-a.metrics
actual = testm.String()
assert.Contains(t, actual, "acctest,acc=test value=101")
testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800082912748)),
actual)
assert.Equal(t, testm.Type(), telegraf.Counter)
}
func TestAddNoPrecisionWithInterval(t *testing.T) {
a := accumulator{}
now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}
metrics := make(chan telegraf.Metric, 10)
defer close(metrics)
a := NewAccumulator(&TestMetricMaker{}, metrics)
a.SetPrecision(0, time.Second)
a.AddFields("acctest",
@ -151,79 +193,11 @@ func TestAddNoPrecisionWithInterval(t *testing.T) {
actual)
}
func TestAddNoIntervalWithPrecision(t *testing.T) {
a := accumulator{}
now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}
a.SetPrecision(time.Second, time.Millisecond)
a.AddFields("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{})
a.AddFields("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"})
a.AddFields("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"}, now)
testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest value=101")
testm = <-a.metrics
actual = testm.String()
assert.Contains(t, actual, "acctest,acc=test value=101")
testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800000000000)),
actual)
}
func TestAddDisablePrecision(t *testing.T) {
a := accumulator{}
now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}
a.SetPrecision(time.Second, time.Millisecond)
a.DisablePrecision()
a.AddFields("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{})
a.AddFields("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"})
a.AddFields("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"}, now)
testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest value=101")
testm = <-a.metrics
actual = testm.String()
assert.Contains(t, actual, "acctest,acc=test value=101")
testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800082912748)),
actual)
}
func TestDifferentPrecisions(t *testing.T) {
a := accumulator{}
now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}
metrics := make(chan telegraf.Metric, 10)
defer close(metrics)
a := NewAccumulator(&TestMetricMaker{}, metrics)
a.SetPrecision(0, time.Second)
a.AddFields("acctest",
@ -266,349 +240,100 @@ func TestDifferentPrecisions(t *testing.T) {
actual)
}
func TestAddDefaultTags(t *testing.T) {
a := accumulator{}
a.addDefaultTag("default", "tag")
func TestAddGauge(t *testing.T) {
now := time.Now()
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}
metrics := make(chan telegraf.Metric, 10)
defer close(metrics)
a := NewAccumulator(&TestMetricMaker{}, metrics)
a.AddFields("acctest",
a.AddGauge("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{})
a.AddFields("acctest",
a.AddGauge("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"})
a.AddFields("acctest",
a.AddGauge("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"}, now)
testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest,default=tag value=101")
testm = <-a.metrics
actual = testm.String()
assert.Contains(t, actual, "acctest,acc=test,default=tag value=101")
testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test,default=tag value=101 %d", now.UnixNano()),
actual)
}
func TestAddFields(t *testing.T) {
a := accumulator{}
now := time.Now()
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}
fields := map[string]interface{}{
"usage": float64(99),
}
a.AddFields("acctest", fields, map[string]string{})
a.AddFields("acctest", fields, map[string]string{"acc": "test"})
a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now)
testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest usage=99")
testm = <-a.metrics
actual = testm.String()
assert.Contains(t, actual, "acctest,acc=test usage=99")
testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test usage=99 %d", now.UnixNano()),
actual)
}
// Test that all Inf fields get dropped, and not added to metrics channel
func TestAddInfFields(t *testing.T) {
inf := math.Inf(1)
ninf := math.Inf(-1)
a := accumulator{}
now := time.Now()
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}
fields := map[string]interface{}{
"usage": inf,
"nusage": ninf,
}
a.AddFields("acctest", fields, map[string]string{})
a.AddFields("acctest", fields, map[string]string{"acc": "test"})
a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now)
assert.Len(t, a.metrics, 0)
// test that non-inf fields are kept and not dropped
fields["notinf"] = float64(100)
a.AddFields("acctest", fields, map[string]string{})
testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest notinf=100")
}
// Test that nan fields are dropped and not added
func TestAddNaNFields(t *testing.T) {
nan := math.NaN()
a := accumulator{}
now := time.Now()
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}
fields := map[string]interface{}{
"usage": nan,
}
a.AddFields("acctest", fields, map[string]string{})
a.AddFields("acctest", fields, map[string]string{"acc": "test"})
a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now)
assert.Len(t, a.metrics, 0)
// test that non-nan fields are kept and not dropped
fields["notnan"] = float64(100)
a.AddFields("acctest", fields, map[string]string{})
testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest notnan=100")
}
func TestAddUint64Fields(t *testing.T) {
a := accumulator{}
now := time.Now()
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}
fields := map[string]interface{}{
"usage": uint64(99),
}
a.AddFields("acctest", fields, map[string]string{})
a.AddFields("acctest", fields, map[string]string{"acc": "test"})
a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now)
testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest usage=99i")
testm = <-a.metrics
actual = testm.String()
assert.Contains(t, actual, "acctest,acc=test usage=99i")
testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test usage=99i %d", now.UnixNano()),
actual)
}
func TestAddUint64Overflow(t *testing.T) {
a := accumulator{}
now := time.Now()
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}
fields := map[string]interface{}{
"usage": uint64(9223372036854775808),
}
a.AddFields("acctest", fields, map[string]string{})
a.AddFields("acctest", fields, map[string]string{"acc": "test"})
a.AddFields("acctest", fields, map[string]string{"acc": "test"}, now)
testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest usage=9223372036854775807i")
testm = <-a.metrics
actual = testm.String()
assert.Contains(t, actual, "acctest,acc=test usage=9223372036854775807i")
testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test usage=9223372036854775807i %d", now.UnixNano()),
actual)
}
func TestAddInts(t *testing.T) {
a := accumulator{}
a.addDefaultTag("default", "tag")
now := time.Now()
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}
a.AddFields("acctest",
map[string]interface{}{"value": int(101)},
map[string]string{})
a.AddFields("acctest",
map[string]interface{}{"value": int32(101)},
map[string]string{"acc": "test"})
a.AddFields("acctest",
map[string]interface{}{"value": int64(101)},
map[string]string{"acc": "test"}, now)
testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest,default=tag value=101i")
testm = <-a.metrics
actual = testm.String()
assert.Contains(t, actual, "acctest,acc=test,default=tag value=101i")
testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test,default=tag value=101i %d", now.UnixNano()),
actual)
}
func TestAddFloats(t *testing.T) {
a := accumulator{}
a.addDefaultTag("default", "tag")
now := time.Now()
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}
a.AddFields("acctest",
map[string]interface{}{"value": float32(101)},
map[string]string{"acc": "test"})
a.AddFields("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"}, now)
testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest,acc=test,default=tag value=101")
testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test,default=tag value=101 %d", now.UnixNano()),
actual)
}
func TestAddStrings(t *testing.T) {
a := accumulator{}
a.addDefaultTag("default", "tag")
now := time.Now()
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}
a.AddFields("acctest",
map[string]interface{}{"value": "test"},
map[string]string{"acc": "test"})
a.AddFields("acctest",
map[string]interface{}{"value": "foo"},
map[string]string{"acc": "test"}, now)
testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest,acc=test,default=tag value=\"test\"")
testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test,default=tag value=\"foo\" %d", now.UnixNano()),
actual)
}
func TestAddBools(t *testing.T) {
a := accumulator{}
a.addDefaultTag("default", "tag")
now := time.Now()
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}
a.AddFields("acctest",
map[string]interface{}{"value": true}, map[string]string{"acc": "test"})
a.AddFields("acctest",
map[string]interface{}{"value": false}, map[string]string{"acc": "test"}, now)
testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest,acc=test,default=tag value=true")
testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test,default=tag value=false %d", now.UnixNano()),
actual)
}
// Test that tag filters get applied to metrics.
func TestAccFilterTags(t *testing.T) {
a := accumulator{}
now := time.Now()
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
filter := models.Filter{
TagExclude: []string{"acc"},
}
assert.NoError(t, filter.Compile())
a.inputConfig = &models.InputConfig{}
a.inputConfig.Filter = filter
a.AddFields("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{})
a.AddFields("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"})
a.AddFields("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"}, now)
testm := <-a.metrics
testm := <-metrics
actual := testm.String()
assert.Contains(t, actual, "acctest value=101")
assert.Equal(t, testm.Type(), telegraf.Gauge)
testm = <-a.metrics
testm = <-metrics
actual = testm.String()
assert.Contains(t, actual, "acctest value=101")
assert.Contains(t, actual, "acctest,acc=test value=101")
assert.Equal(t, testm.Type(), telegraf.Gauge)
testm = <-a.metrics
testm = <-metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest value=101 %d", now.UnixNano()),
fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()),
actual)
assert.Equal(t, testm.Type(), telegraf.Gauge)
}
func TestAccAddError(t *testing.T) {
errBuf := bytes.NewBuffer(nil)
log.SetOutput(errBuf)
defer log.SetOutput(os.Stderr)
func TestAddCounter(t *testing.T) {
now := time.Now()
metrics := make(chan telegraf.Metric, 10)
defer close(metrics)
a := NewAccumulator(&TestMetricMaker{}, metrics)
a := accumulator{}
a.inputConfig = &models.InputConfig{}
a.inputConfig.Name = "mock_plugin"
a.AddCounter("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{})
a.AddCounter("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"})
a.AddCounter("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"}, now)
a.AddError(fmt.Errorf("foo"))
a.AddError(fmt.Errorf("bar"))
a.AddError(fmt.Errorf("baz"))
testm := <-metrics
actual := testm.String()
assert.Contains(t, actual, "acctest value=101")
assert.Equal(t, testm.Type(), telegraf.Counter)
errs := bytes.Split(errBuf.Bytes(), []byte{'\n'})
assert.EqualValues(t, 3, a.errCount)
require.Len(t, errs, 4) // 4 because of trailing newline
assert.Contains(t, string(errs[0]), "mock_plugin")
assert.Contains(t, string(errs[0]), "foo")
assert.Contains(t, string(errs[1]), "mock_plugin")
assert.Contains(t, string(errs[1]), "bar")
assert.Contains(t, string(errs[2]), "mock_plugin")
assert.Contains(t, string(errs[2]), "baz")
testm = <-metrics
actual = testm.String()
assert.Contains(t, actual, "acctest,acc=test value=101")
assert.Equal(t, testm.Type(), telegraf.Counter)
testm = <-metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()),
actual)
assert.Equal(t, testm.Type(), telegraf.Counter)
}
type TestMetricMaker struct {
}
func (tm *TestMetricMaker) Name() string {
return "TestPlugin"
}
func (tm *TestMetricMaker) MakeMetric(
measurement string,
fields map[string]interface{},
tags map[string]string,
mType telegraf.ValueType,
t time.Time,
) telegraf.Metric {
switch mType {
case telegraf.Untyped:
if m, err := telegraf.NewMetric(measurement, tags, fields, t); err == nil {
return m
}
case telegraf.Counter:
if m, err := telegraf.NewCounterMetric(measurement, tags, fields, t); err == nil {
return m
}
case telegraf.Gauge:
if m, err := telegraf.NewGaugeMetric(measurement, tags, fields, t); err == nil {
return m
}
}
return nil
}

View File

@ -89,7 +89,7 @@ func panicRecover(input *models.RunningInput) {
trace := make([]byte, 2048)
runtime.Stack(trace, true)
log.Printf("E! FATAL: Input [%s] panicked: %s, Stack:\n%s\n",
input.Name, err, trace)
input.Name(), err, trace)
log.Println("E! PLEASE REPORT THIS PANIC ON GITHUB with " +
"stack trace, configuration, and OS information: " +
"https://github.com/influxdata/telegraf/issues/new")
@ -103,19 +103,18 @@ func (a *Agent) gatherer(
input *models.RunningInput,
interval time.Duration,
metricC chan telegraf.Metric,
) error {
) {
defer panicRecover(input)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
var outerr error
acc := NewAccumulator(input.Config, metricC)
acc := NewAccumulator(input, metricC)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
acc.setDefaultTags(a.Config.Tags)
input.SetDebug(a.Config.Agent.Debug)
input.SetDefaultTags(a.Config.Tags)
internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown)
@ -123,15 +122,13 @@ func (a *Agent) gatherer(
gatherWithTimeout(shutdown, input, acc, interval)
elapsed := time.Since(start)
if outerr != nil {
return outerr
}
log.Printf("D! Input [%s] gathered metrics, (%s interval) in %s\n",
input.Name, interval, elapsed)
input.Name(), interval, elapsed)
select {
case <-shutdown:
return nil
return
case <-ticker.C:
continue
}
@ -160,13 +157,13 @@ func gatherWithTimeout(
select {
case err := <-done:
if err != nil {
log.Printf("E! ERROR in input [%s]: %s", input.Name, err)
log.Printf("E! ERROR in input [%s]: %s", input.Name(), err)
}
return
case <-ticker.C:
log.Printf("E! ERROR: input [%s] took longer to collect than "+
"collection interval (%s)",
input.Name, timeout)
input.Name(), timeout)
continue
case <-shutdown:
return
@ -194,13 +191,13 @@ func (a *Agent) Test() error {
}()
for _, input := range a.Config.Inputs {
acc := NewAccumulator(input.Config, metricC)
acc.SetTrace(true)
acc := NewAccumulator(input, metricC)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
acc.setDefaultTags(a.Config.Tags)
input.SetTrace(true)
input.SetDefaultTags(a.Config.Tags)
fmt.Printf("* Plugin: %s, Collection 1\n", input.Name)
fmt.Printf("* Plugin: %s, Collection 1\n", input.Name())
if input.Config.Interval != 0 {
fmt.Printf("* Internal: %s\n", input.Config.Interval)
}
@ -214,10 +211,10 @@ func (a *Agent) Test() error {
// Special instructions for some inputs. cpu, for example, needs to be
// run twice in order to return cpu usage percentages.
switch input.Name {
switch input.Name() {
case "cpu", "mongodb", "procstat":
time.Sleep(500 * time.Millisecond)
fmt.Printf("* Plugin: %s, Collection 2\n", input.Name)
fmt.Printf("* Plugin: %s, Collection 2\n", input.Name())
if err := input.Input.Gather(acc); err != nil {
return err
}
@ -250,20 +247,38 @@ func (a *Agent) flush() {
func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error {
// Inelegant, but this sleep is to allow the Gather threads to run, so that
// the flusher will flush after metrics are collected.
time.Sleep(time.Millisecond * 200)
ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
time.Sleep(time.Millisecond * 300)
// create an output metric channel and a gorouting that continously passes
// each metric onto the output plugins & aggregators.
outMetricC := make(chan telegraf.Metric, 100)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-shutdown:
log.Println("I! Hang on, flushing any cached metrics before shutdown")
a.flush()
return nil
case <-ticker.C:
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
a.flush()
case m := <-metricC:
for _, agg := range a.Config.Aggregators {
agg.Aggregator.Stop()
}
if len(outMetricC) > 0 {
// keep going until outMetricC is flushed
continue
}
return
case m := <-outMetricC:
// if dropOriginal is set to true, then we will only send this
// metric to the aggregators, not the outputs.
var dropOriginal bool
if !m.IsAggregate() {
for _, agg := range a.Config.Aggregators {
if ok := agg.Apply(copyMetric(m)); ok {
dropOriginal = true
}
}
}
if !dropOriginal {
for i, o := range a.Config.Outputs {
if i == len(a.Config.Outputs)-1 {
o.AddMetric(m)
@ -274,6 +289,31 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
}
}
}
}()
ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
for {
select {
case <-shutdown:
log.Println("I! Hang on, flushing any cached metrics before shutdown")
// wait for outMetricC to get flushed before flushing outputs
wg.Wait()
a.flush()
return nil
case <-ticker.C:
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
a.flush()
case metric := <-metricC:
mS := []telegraf.Metric{metric}
for _, processor := range a.Config.Processors {
mS = processor.Apply(mS...)
}
for _, m := range mS {
outMetricC <- m
}
}
}
}
func copyMetric(m telegraf.Metric) telegraf.Metric {
t := time.Time(m.Time())
@ -303,18 +343,18 @@ func (a *Agent) Run(shutdown chan struct{}) error {
// channel shared between all input threads for accumulating metrics
metricC := make(chan telegraf.Metric, 10000)
// Start all ServicePlugins
for _, input := range a.Config.Inputs {
// Start service of any ServicePlugins
switch p := input.Input.(type) {
case telegraf.ServiceInput:
acc := NewAccumulator(input.Config, metricC)
acc := NewAccumulator(input, metricC)
// Service input plugins should set their own precision of their
// metrics.
acc.DisablePrecision()
acc.setDefaultTags(a.Config.Tags)
acc.SetPrecision(time.Nanosecond, 0)
input.SetDefaultTags(a.Config.Tags)
if err := p.Start(acc); err != nil {
log.Printf("E! Service for input %s failed to start, exiting\n%s\n",
input.Name, err.Error())
input.Name(), err.Error())
return err
}
defer p.Stop()
@ -327,6 +367,18 @@ func (a *Agent) Run(shutdown chan struct{}) error {
time.Sleep(time.Duration(i - (time.Now().UnixNano() % i)))
}
// Start all Aggregators
for _, aggregator := range a.Config.Aggregators {
acc := NewAccumulator(aggregator, metricC)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
if err := aggregator.Aggregator.Start(acc); err != nil {
log.Printf("[%s] failed to start, exiting\n%s\n",
aggregator.Name(), err.Error())
return err
}
}
wg.Add(1)
go func() {
defer wg.Done()

16
aggregator.go Normal file
View File

@ -0,0 +1,16 @@
package telegraf
type Aggregator interface {
// SampleConfig returns the default configuration of the Input
SampleConfig() string
// Description returns a one-sentence description on the Input
Description() string
// Apply the metric to the aggregator
Apply(in Metric)
// Start starts the service filter with the given accumulator
Start(acc Accumulator) error
Stop()
}

View File

@ -12,12 +12,13 @@ import (
"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/internal/config"
_ "github.com/influxdata/telegraf/plugins/aggregators/all"
"github.com/influxdata/telegraf/logger"
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
"github.com/influxdata/telegraf/plugins/outputs"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
_ "github.com/influxdata/telegraf/plugins/processors/all"
"github.com/kardianos/service"
)
@ -111,6 +112,8 @@ Examples:
telegraf -config telegraf.conf -input-filter cpu:mem -output-filter influxdb
`
var logger service.Logger
var stop chan struct{}
var srvc service.Service
@ -306,6 +309,10 @@ func main() {
if err != nil {
log.Fatal(err)
}
logger, err = s.Logger(nil)
if err != nil {
log.Fatal(err)
}
// Handle the -service flag here to prevent any issues with tooling that
// may not have an interactive session, e.g. installing from Ansible.
if *fService != "" {

View File

@ -11,15 +11,18 @@ import (
"regexp"
"runtime"
"sort"
"strconv"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/plugins/aggregators"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/config"
@ -50,6 +53,8 @@ type Config struct {
Agent *AgentConfig
Inputs []*models.RunningInput
Outputs []*models.RunningOutput
Processors []*models.RunningProcessor
Aggregators []*models.RunningAggregator
}
func NewConfig() *Config {
@ -64,6 +69,7 @@ func NewConfig() *Config {
Tags: make(map[string]string),
Inputs: make([]*models.RunningInput, 0),
Outputs: make([]*models.RunningOutput, 0),
Processors: make([]*models.RunningProcessor, 0),
InputFilters: make([]string, 0),
OutputFilters: make([]string, 0),
}
@ -138,7 +144,7 @@ type AgentConfig struct {
func (c *Config) InputNames() []string {
var name []string
for _, input := range c.Inputs {
name = append(name, input.Name)
name = append(name, input.Name())
}
return name
}
@ -248,6 +254,20 @@ var header = `# Telegraf Configuration
###############################################################################
`
var processorHeader = `
###############################################################################
# PROCESSOR PLUGINS #
###############################################################################
`
var aggregatorHeader = `
###############################################################################
# AGGREGATOR PLUGINS #
###############################################################################
`
var inputHeader = `
###############################################################################
@ -266,6 +286,7 @@ var serviceInputHeader = `
func PrintSampleConfig(inputFilters []string, outputFilters []string) {
fmt.Printf(header)
// print output plugins
if len(outputFilters) != 0 {
printFilteredOutputs(outputFilters, false)
} else {
@ -281,6 +302,25 @@ func PrintSampleConfig(inputFilters []string, outputFilters []string) {
printFilteredOutputs(pnames, true)
}
// print processor plugins
fmt.Printf(processorHeader)
pnames := []string{}
for pname := range processors.Processors {
pnames = append(pnames, pname)
}
sort.Strings(pnames)
printFilteredProcessors(pnames, true)
// pring aggregator plugins
fmt.Printf(aggregatorHeader)
pnames = []string{}
for pname := range aggregators.Aggregators {
pnames = append(pnames, pname)
}
sort.Strings(pnames)
printFilteredAggregators(pnames, true)
// print input plugins
fmt.Printf(inputHeader)
if len(inputFilters) != 0 {
printFilteredInputs(inputFilters, false)
@ -298,6 +338,42 @@ func PrintSampleConfig(inputFilters []string, outputFilters []string) {
}
}
func printFilteredProcessors(processorFilters []string, commented bool) {
// Filter processors
var pnames []string
for pname := range processors.Processors {
if sliceContains(pname, processorFilters) {
pnames = append(pnames, pname)
}
}
sort.Strings(pnames)
// Print Outputs
for _, pname := range pnames {
creator := processors.Processors[pname]
output := creator()
printConfig(pname, output, "processors", commented)
}
}
func printFilteredAggregators(aggregatorFilters []string, commented bool) {
// Filter outputs
var anames []string
for aname := range aggregators.Aggregators {
if sliceContains(aname, aggregatorFilters) {
anames = append(anames, aname)
}
}
sort.Strings(anames)
// Print Outputs
for _, aname := range anames {
creator := aggregators.Aggregators[aname]
output := creator()
printConfig(aname, output, "aggregators", commented)
}
}
func printFilteredInputs(inputFilters []string, commented bool) {
// Filter inputs
var pnames []string
@ -507,6 +583,7 @@ func (c *Config) LoadConfig(path string) error {
case "outputs":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
// legacy [outputs.influxdb] support
case *ast.Table:
if err = c.addOutput(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
@ -525,6 +602,7 @@ func (c *Config) LoadConfig(path string) error {
case "inputs", "plugins":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
// legacy [inputs.cpu] support
case *ast.Table:
if err = c.addInput(pluginName, pluginSubTable); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
@ -540,6 +618,34 @@ func (c *Config) LoadConfig(path string) error {
pluginName, path)
}
}
case "processors":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addProcessor(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
}
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
}
}
case "aggregators":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
case []*ast.Table:
for _, t := range pluginSubTable {
if err = c.addAggregator(pluginName, t); err != nil {
return fmt.Errorf("Error parsing %s, %s", path, err)
}
}
default:
return fmt.Errorf("Unsupported config format: %s, file %s",
pluginName, path)
}
}
// Assume it's an input input for legacy config file support if no other
// identifiers are present
default:
@ -580,6 +686,57 @@ func parseFile(fpath string) (*ast.Table, error) {
return toml.Parse(contents)
}
func (c *Config) addAggregator(name string, table *ast.Table) error {
creator, ok := aggregators.Aggregators[name]
if !ok {
return fmt.Errorf("Undefined but requested aggregator: %s", name)
}
aggregator := creator()
aggregatorConfig, err := buildAggregator(name, table)
if err != nil {
return err
}
if err := config.UnmarshalTable(table, aggregator); err != nil {
return err
}
rf := &models.RunningAggregator{
Aggregator: aggregator,
Config: aggregatorConfig,
}
c.Aggregators = append(c.Aggregators, rf)
return nil
}
func (c *Config) addProcessor(name string, table *ast.Table) error {
creator, ok := processors.Processors[name]
if !ok {
return fmt.Errorf("Undefined but requested processor: %s", name)
}
processor := creator()
processorConfig, err := buildProcessor(name, table)
if err != nil {
return err
}
if err := config.UnmarshalTable(table, processor); err != nil {
return err
}
rf := &models.RunningProcessor{
Name: name,
Processor: processor,
Config: processorConfig,
}
c.Processors = append(c.Processors, rf)
return nil
}
func (c *Config) addOutput(name string, table *ast.Table) error {
if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) {
return nil
@ -652,7 +809,6 @@ func (c *Config) addInput(name string, table *ast.Table) error {
}
rp := &models.RunningInput{
Name: name,
Input: input,
Config: pluginConfig,
}
@ -660,6 +816,93 @@ func (c *Config) addInput(name string, table *ast.Table) error {
return nil
}
// buildAggregator TODO doc
func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, error) {
conf := &models.AggregatorConfig{Name: name}
unsupportedFields := []string{"tagexclude", "taginclude"}
for _, field := range unsupportedFields {
if _, ok := tbl.Fields[field]; ok {
// TODO raise error because field is not supported
}
}
if node, ok := tbl.Fields["drop_original"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if b, ok := kv.Value.(*ast.Boolean); ok {
var err error
conf.DropOriginal, err = strconv.ParseBool(b.Value)
if err != nil {
log.Printf("Error parsing boolean value for %s: %s\n", name, err)
}
}
}
}
if node, ok := tbl.Fields["name_prefix"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
conf.MeasurementPrefix = str.Value
}
}
}
if node, ok := tbl.Fields["name_suffix"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
conf.MeasurementSuffix = str.Value
}
}
}
if node, ok := tbl.Fields["name_override"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
conf.NameOverride = str.Value
}
}
}
conf.Tags = make(map[string]string)
if node, ok := tbl.Fields["tags"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
if err := config.UnmarshalTable(subtbl, conf.Tags); err != nil {
log.Printf("Could not parse tags for input %s\n", name)
}
}
}
delete(tbl.Fields, "drop_original")
delete(tbl.Fields, "name_prefix")
delete(tbl.Fields, "name_suffix")
delete(tbl.Fields, "name_override")
delete(tbl.Fields, "tags")
var err error
conf.Filter, err = buildFilter(tbl)
if err != nil {
return conf, err
}
return conf, nil
}
// buildProcessor TODO doc
func buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error) {
conf := &models.ProcessorConfig{Name: name}
unsupportedFields := []string{"pass", "fieldpass", "drop", "fielddrop",
"tagexclude", "taginclude"}
for _, field := range unsupportedFields {
if _, ok := tbl.Fields[field]; ok {
// TODO raise error because field is not supported
}
}
var err error
conf.Filter, err = buildFilter(tbl)
if err != nil {
return conf, err
}
return conf, nil
}
// buildFilter builds a Filter
// (tagpass/tagdrop/namepass/namedrop/fieldpass/fielddrop) to
// be inserted into the models.OutputConfig/models.InputConfig

View File

@ -96,7 +96,7 @@ func (f *Filter) Compile() error {
// Apply applies the filter to the given measurement name, fields map, and
// tags map. It will return false if the metric should be "filtered out", and
// true if the metric should "pass".
// It will modify tags in-place if they need to be deleted.
// It will modify tags & fields in-place if they need to be deleted.
func (f *Filter) Apply(
measurement string,
fields map[string]interface{},

View File

@ -0,0 +1,150 @@
package models
import (
"log"
"math"
"time"
"github.com/influxdata/telegraf"
)
// makemetric is used by both RunningAggregator & RunningInput
// to make metrics.
// nameOverride: override the name of the measurement being made.
// namePrefix: add this prefix to each measurement name.
// nameSuffix: add this suffix to each measurement name.
// pluginTags: these are tags that are specific to this plugin.
// daemonTags: these are daemon-wide global tags, and get applied after pluginTags.
// filter: this is a filter to apply to each metric being made.
// applyFilter: if false, the above filter is not applied to each metric.
// This is used by Aggregators, because aggregators use filters
// on incoming metrics instead of on created metrics.
func makemetric(
measurement string,
fields map[string]interface{},
tags map[string]string,
nameOverride string,
namePrefix string,
nameSuffix string,
pluginTags map[string]string,
daemonTags map[string]string,
filter Filter,
applyFilter bool,
debug bool,
mType telegraf.ValueType,
t time.Time,
) telegraf.Metric {
if len(fields) == 0 || len(measurement) == 0 {
return nil
}
if tags == nil {
tags = make(map[string]string)
}
// Override measurement name if set
if len(nameOverride) != 0 {
measurement = nameOverride
}
// Apply measurement prefix and suffix if set
if len(namePrefix) != 0 {
measurement = namePrefix + measurement
}
if len(nameSuffix) != 0 {
measurement = measurement + nameSuffix
}
// Apply plugin-wide tags if set
for k, v := range pluginTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
// Apply daemon-wide tags if set
for k, v := range daemonTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
// Apply the metric filter(s)
// for aggregators, the filter does not get applied when the metric is made.
// instead, the filter is applied to metric incoming into the plugin.
// ie, it gets applied in the RunningAggregator.Apply function.
if applyFilter {
if ok := filter.Apply(measurement, fields, tags); !ok {
return nil
}
}
for k, v := range fields {
// Validate uint64 and float64 fields
// convert all int & uint types to int64
switch val := v.(type) {
case uint:
fields[k] = int64(val)
continue
case uint8:
fields[k] = int64(val)
continue
case uint16:
fields[k] = int64(val)
continue
case uint32:
fields[k] = int64(val)
continue
case int:
fields[k] = int64(val)
continue
case int8:
fields[k] = int64(val)
continue
case int16:
fields[k] = int64(val)
continue
case int32:
fields[k] = int64(val)
continue
case uint64:
// InfluxDB does not support writing uint64
if val < uint64(9223372036854775808) {
fields[k] = int64(val)
} else {
fields[k] = int64(9223372036854775807)
}
continue
case float32:
fields[k] = float64(val)
continue
case float64:
// NaNs are invalid values in influxdb, skip measurement
if math.IsNaN(val) || math.IsInf(val, 0) {
if debug {
log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+
"field, skipping",
measurement, k)
}
delete(fields, k)
continue
}
}
fields[k] = v
}
var m telegraf.Metric
var err error
switch mType {
case telegraf.Counter:
m, err = telegraf.NewCounterMetric(measurement, tags, fields, t)
case telegraf.Gauge:
m, err = telegraf.NewGaugeMetric(measurement, tags, fields, t)
default:
m, err = telegraf.NewMetric(measurement, tags, fields, t)
}
if err != nil {
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
return nil
}
return m
}

View File

@ -0,0 +1,79 @@
package models
import (
"time"
"github.com/influxdata/telegraf"
)
type RunningAggregator struct {
Aggregator telegraf.Aggregator
Config *AggregatorConfig
}
// AggregatorConfig containing configuration parameters for the running
// aggregator plugin.
type AggregatorConfig struct {
Name string
DropOriginal bool
NameOverride string
MeasurementPrefix string
MeasurementSuffix string
Tags map[string]string
Filter Filter
}
func (r *RunningAggregator) Name() string {
return "aggregators." + r.Config.Name
}
func (r *RunningAggregator) MakeMetric(
measurement string,
fields map[string]interface{},
tags map[string]string,
mType telegraf.ValueType,
t time.Time,
) telegraf.Metric {
m := makemetric(
measurement,
fields,
tags,
r.Config.NameOverride,
r.Config.MeasurementPrefix,
r.Config.MeasurementSuffix,
r.Config.Tags,
nil,
r.Config.Filter,
false,
false,
mType,
t,
)
m.SetAggregate(true)
return m
}
// Apply applies the given metric to the aggregator.
// Before applying to the plugin, it will run any defined filters on the metric.
// Apply returns true if the original metric should be dropped.
func (r *RunningAggregator) Apply(in telegraf.Metric) bool {
if r.Config.Filter.IsActive() {
// check if the aggregator should apply this metric
name := in.Name()
fields := in.Fields()
tags := in.Tags()
t := in.Time()
if ok := r.Config.Filter.Apply(name, fields, tags); !ok {
// aggregator should not apply this metric
return false
}
in, _ = telegraf.NewMetric(name, tags, fields, t)
}
r.Aggregator.Apply(in)
return r.Config.DropOriginal
}

View File

@ -0,0 +1,150 @@
package models
import (
"fmt"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/stretchr/testify/assert"
)
func TestApply(t *testing.T) {
a := &TestAggregator{}
ra := RunningAggregator{
Config: &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{
NamePass: []string{"*"},
},
},
Aggregator: a,
}
assert.NoError(t, ra.Config.Filter.Compile())
m := ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now(),
)
assert.False(t, ra.Apply(m))
assert.Equal(t, int64(101), a.sum)
}
func TestApplyDropOriginal(t *testing.T) {
ra := RunningAggregator{
Config: &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{
NamePass: []string{"RI*"},
},
DropOriginal: true,
},
Aggregator: &TestAggregator{},
}
assert.NoError(t, ra.Config.Filter.Compile())
m := ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now(),
)
assert.True(t, ra.Apply(m))
// this metric name doesn't match the filter, so Apply will return false
m2 := ra.MakeMetric(
"foobar",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now(),
)
assert.False(t, ra.Apply(m2))
}
// make an untyped, counter, & gauge metric
func TestMakeMetricA(t *testing.T) {
now := time.Now()
ra := RunningAggregator{
Config: &AggregatorConfig{
Name: "TestRunningAggregator",
},
}
assert.Equal(t, "aggregators.TestRunningAggregator", ra.Name())
m := ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
now,
)
assert.Equal(
t,
m.String(),
fmt.Sprintf("RITest value=101i %d", now.UnixNano()),
)
assert.Equal(
t,
m.Type(),
telegraf.Untyped,
)
m = ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Counter,
now,
)
assert.Equal(
t,
m.String(),
fmt.Sprintf("RITest value=101i %d", now.UnixNano()),
)
assert.Equal(
t,
m.Type(),
telegraf.Counter,
)
m = ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Gauge,
now,
)
assert.Equal(
t,
m.String(),
fmt.Sprintf("RITest value=101i %d", now.UnixNano()),
)
assert.Equal(
t,
m.Type(),
telegraf.Gauge,
)
}
type TestAggregator struct {
sum int64
}
func (t *TestAggregator) Description() string { return "" }
func (t *TestAggregator) SampleConfig() string { return "" }
func (t *TestAggregator) Start(acc telegraf.Accumulator) error { return nil }
func (t *TestAggregator) Stop() {}
func (t *TestAggregator) Apply(in telegraf.Metric) {
for _, v := range in.Fields() {
if vi, ok := v.(int64); ok {
t.sum += vi
}
}
}

View File

@ -1,15 +1,19 @@
package models
import (
"fmt"
"time"
"github.com/influxdata/telegraf"
)
type RunningInput struct {
Name string
Input telegraf.Input
Config *InputConfig
trace bool
debug bool
defaultTags map[string]string
}
// InputConfig containing a name, interval, and filter
@ -22,3 +26,59 @@ type InputConfig struct {
Filter Filter
Interval time.Duration
}
func (r *RunningInput) Name() string {
return "inputs." + r.Config.Name
}
// MakeMetric either returns a metric, or returns nil if the metric doesn't
// need to be created (because of filtering, an error, etc.)
func (r *RunningInput) MakeMetric(
measurement string,
fields map[string]interface{},
tags map[string]string,
mType telegraf.ValueType,
t time.Time,
) telegraf.Metric {
m := makemetric(
measurement,
fields,
tags,
r.Config.NameOverride,
r.Config.MeasurementPrefix,
r.Config.MeasurementSuffix,
r.Config.Tags,
r.defaultTags,
r.Config.Filter,
true,
r.debug,
mType,
t,
)
if r.trace && m != nil {
fmt.Println("> " + m.String())
}
return m
}
func (r *RunningInput) Debug() bool {
return r.debug
}
func (r *RunningInput) SetDebug(debug bool) {
r.debug = debug
}
func (r *RunningInput) Trace() bool {
return r.trace
}
func (r *RunningInput) SetTrace(trace bool) {
r.trace = trace
}
func (r *RunningInput) SetDefaultTags(tags map[string]string) {
r.defaultTags = tags
}

View File

@ -0,0 +1,326 @@
package models
import (
"fmt"
"math"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/stretchr/testify/assert"
)
func TestMakeMetricNoFields(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
},
}
m := ri.MakeMetric(
"RITest",
map[string]interface{}{},
map[string]string{},
telegraf.Untyped,
now,
)
assert.Nil(t, m)
}
// make an untyped, counter, & gauge metric
func TestMakeMetric(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
},
}
ri.SetDebug(true)
assert.Equal(t, true, ri.Debug())
ri.SetTrace(true)
assert.Equal(t, true, ri.Trace())
assert.Equal(t, "inputs.TestRunningInput", ri.Name())
m := ri.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
now,
)
assert.Equal(
t,
m.String(),
fmt.Sprintf("RITest value=101i %d", now.UnixNano()),
)
assert.Equal(
t,
m.Type(),
telegraf.Untyped,
)
m = ri.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Counter,
now,
)
assert.Equal(
t,
m.String(),
fmt.Sprintf("RITest value=101i %d", now.UnixNano()),
)
assert.Equal(
t,
m.Type(),
telegraf.Counter,
)
m = ri.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Gauge,
now,
)
assert.Equal(
t,
m.String(),
fmt.Sprintf("RITest value=101i %d", now.UnixNano()),
)
assert.Equal(
t,
m.Type(),
telegraf.Gauge,
)
}
func TestMakeMetricWithPluginTags(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
Tags: map[string]string{
"foo": "bar",
},
},
}
ri.SetDebug(true)
assert.Equal(t, true, ri.Debug())
ri.SetTrace(true)
assert.Equal(t, true, ri.Trace())
m := ri.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
nil,
telegraf.Untyped,
now,
)
assert.Equal(
t,
m.String(),
fmt.Sprintf("RITest,foo=bar value=101i %d", now.UnixNano()),
)
}
func TestMakeMetricFilteredOut(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
Tags: map[string]string{
"foo": "bar",
},
Filter: Filter{NamePass: []string{"foobar"}},
},
}
ri.SetDebug(true)
assert.Equal(t, true, ri.Debug())
ri.SetTrace(true)
assert.Equal(t, true, ri.Trace())
assert.NoError(t, ri.Config.Filter.Compile())
m := ri.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
nil,
telegraf.Untyped,
now,
)
assert.Nil(t, m)
}
func TestMakeMetricWithDaemonTags(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
},
}
ri.SetDefaultTags(map[string]string{
"foo": "bar",
})
ri.SetDebug(true)
assert.Equal(t, true, ri.Debug())
ri.SetTrace(true)
assert.Equal(t, true, ri.Trace())
m := ri.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
now,
)
assert.Equal(
t,
m.String(),
fmt.Sprintf("RITest,foo=bar value=101i %d", now.UnixNano()),
)
}
// make an untyped, counter, & gauge metric
func TestMakeMetricInfFields(t *testing.T) {
inf := math.Inf(1)
ninf := math.Inf(-1)
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
},
}
ri.SetDebug(true)
assert.Equal(t, true, ri.Debug())
ri.SetTrace(true)
assert.Equal(t, true, ri.Trace())
m := ri.MakeMetric(
"RITest",
map[string]interface{}{
"value": int(101),
"inf": inf,
"ninf": ninf,
},
map[string]string{},
telegraf.Untyped,
now,
)
assert.Equal(
t,
m.String(),
fmt.Sprintf("RITest value=101i %d", now.UnixNano()),
)
}
func TestMakeMetricAllFieldTypes(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
},
}
ri.SetDebug(true)
assert.Equal(t, true, ri.Debug())
ri.SetTrace(true)
assert.Equal(t, true, ri.Trace())
m := ri.MakeMetric(
"RITest",
map[string]interface{}{
"a": int(10),
"b": int8(10),
"c": int16(10),
"d": int32(10),
"e": uint(10),
"f": uint8(10),
"g": uint16(10),
"h": uint32(10),
"i": uint64(10),
"j": float32(10),
"k": uint64(9223372036854775810),
"l": "foobar",
"m": true,
},
map[string]string{},
telegraf.Untyped,
now,
)
assert.Equal(
t,
fmt.Sprintf("RITest a=10i,b=10i,c=10i,d=10i,e=10i,f=10i,g=10i,h=10i,i=10i,j=10,k=9223372036854775807i,l=\"foobar\",m=true %d", now.UnixNano()),
m.String(),
)
}
func TestMakeMetricNameOverride(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
NameOverride: "foobar",
},
}
m := ri.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
now,
)
assert.Equal(
t,
m.String(),
fmt.Sprintf("foobar value=101i %d", now.UnixNano()),
)
}
func TestMakeMetricNamePrefix(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
MeasurementPrefix: "foobar_",
},
}
m := ri.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
now,
)
assert.Equal(
t,
m.String(),
fmt.Sprintf("foobar_RITest value=101i %d", now.UnixNano()),
)
}
func TestMakeMetricNameSuffix(t *testing.T) {
now := time.Now()
ri := RunningInput{
Config: &InputConfig{
Name: "TestRunningInput",
MeasurementSuffix: "_foobar",
},
}
m := ri.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
now,
)
assert.Equal(
t,
m.String(),
fmt.Sprintf("RITest_foobar value=101i %d", now.UnixNano()),
)
}

View File

@ -132,7 +132,6 @@ func TestRunningOutput_PassFilter(t *testing.T) {
func TestRunningOutput_TagIncludeNoMatch(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
TagInclude: []string{"nothing*"},
},
}
@ -154,7 +153,6 @@ func TestRunningOutput_TagIncludeNoMatch(t *testing.T) {
func TestRunningOutput_TagExcludeMatch(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
TagExclude: []string{"tag*"},
},
}
@ -176,7 +174,6 @@ func TestRunningOutput_TagExcludeMatch(t *testing.T) {
func TestRunningOutput_TagExcludeNoMatch(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
TagExclude: []string{"nothing*"},
},
}
@ -198,7 +195,6 @@ func TestRunningOutput_TagExcludeNoMatch(t *testing.T) {
func TestRunningOutput_TagIncludeMatch(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
TagInclude: []string{"tag*"},
},
}

View File

@ -0,0 +1,37 @@
package models
import (
"github.com/influxdata/telegraf"
)
type RunningProcessor struct {
Name string
Processor telegraf.Processor
Config *ProcessorConfig
}
// FilterConfig containing a name and filter
type ProcessorConfig struct {
Name string
Filter Filter
}
func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
ret := []telegraf.Metric{}
for _, metric := range in {
if rp.Config.Filter.IsActive() {
// check if the filter should be applied to this metric
if ok := rp.Config.Filter.Apply(metric.Name(), metric.Fields(), metric.Tags()); !ok {
// this means filter should not be applied
ret = append(ret, metric)
continue
}
}
// This metric should pass through the filter, so call the filter Apply
// function and append results to the output slice.
ret = append(ret, rp.Processor.Apply(metric)...)
}
return ret
}

View File

@ -0,0 +1,117 @@
package models
import (
"testing"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
type TestProcessor struct {
}
func (f *TestProcessor) SampleConfig() string { return "" }
func (f *TestProcessor) Description() string { return "" }
// Apply renames:
// "foo" to "fuz"
// "bar" to "baz"
// And it also drops measurements named "dropme"
func (f *TestProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
out := make([]telegraf.Metric, 0)
for _, m := range in {
switch m.Name() {
case "foo":
out = append(out, testutil.TestMetric(1, "fuz"))
case "bar":
out = append(out, testutil.TestMetric(1, "baz"))
case "dropme":
// drop the metric!
default:
out = append(out, m)
}
}
return out
}
func NewTestRunningProcessor() *RunningProcessor {
out := &RunningProcessor{
Name: "test",
Processor: &TestProcessor{},
Config: &ProcessorConfig{Filter: Filter{}},
}
return out
}
func TestRunningProcessor(t *testing.T) {
inmetrics := []telegraf.Metric{
testutil.TestMetric(1, "foo"),
testutil.TestMetric(1, "bar"),
testutil.TestMetric(1, "baz"),
}
expectedNames := []string{
"fuz",
"baz",
"baz",
}
rfp := NewTestRunningProcessor()
filteredMetrics := rfp.Apply(inmetrics...)
actualNames := []string{
filteredMetrics[0].Name(),
filteredMetrics[1].Name(),
filteredMetrics[2].Name(),
}
assert.Equal(t, expectedNames, actualNames)
}
func TestRunningProcessor_WithNameDrop(t *testing.T) {
inmetrics := []telegraf.Metric{
testutil.TestMetric(1, "foo"),
testutil.TestMetric(1, "bar"),
testutil.TestMetric(1, "baz"),
}
expectedNames := []string{
"foo",
"baz",
"baz",
}
rfp := NewTestRunningProcessor()
rfp.Config.Filter.NameDrop = []string{"foo"}
assert.NoError(t, rfp.Config.Filter.Compile())
filteredMetrics := rfp.Apply(inmetrics...)
actualNames := []string{
filteredMetrics[0].Name(),
filteredMetrics[1].Name(),
filteredMetrics[2].Name(),
}
assert.Equal(t, expectedNames, actualNames)
}
func TestRunningProcessor_DroppedMetric(t *testing.T) {
inmetrics := []telegraf.Metric{
testutil.TestMetric(1, "dropme"),
testutil.TestMetric(1, "foo"),
testutil.TestMetric(1, "bar"),
}
expectedNames := []string{
"fuz",
"baz",
}
rfp := NewTestRunningProcessor()
filteredMetrics := rfp.Apply(inmetrics...)
actualNames := []string{
filteredMetrics[0].Name(),
filteredMetrics[1].Name(),
}
assert.Equal(t, expectedNames, actualNames)
}

View File

@ -4,6 +4,7 @@ import (
"time"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/models"
)
// ValueType is an enumeration of metric types that represent a simple value.
@ -33,6 +34,9 @@ type Metric interface {
// UnixNano returns the unix nano time of the metric
UnixNano() int64
// HashID returns a non-cryptographic hash of the metric (name + tags)
HashID() uint64
// Fields returns the fields for the metric
Fields() map[string]interface{}
@ -44,13 +48,21 @@ type Metric interface {
// Point returns a influxdb client.Point object
Point() *client.Point
// SetAggregate sets the metric's aggregate status
// This is so that aggregate metrics don't get re-sent to aggregator plugins
SetAggregate(bool)
// IsAggregate returns true if the metric is an aggregate
IsAggregate() bool
}
// metric is a wrapper of the influxdb client.Point struct
type metric struct {
pt *client.Point
pt models.Point
mType ValueType
isaggregate bool
}
// NewMetric returns an untyped metric.
@ -60,7 +72,7 @@ func NewMetric(
fields map[string]interface{},
t time.Time,
) (Metric, error) {
pt, err := client.NewPoint(name, tags, fields, t)
pt, err := models.NewPoint(name, tags, fields, t)
if err != nil {
return nil, err
}
@ -79,7 +91,7 @@ func NewGaugeMetric(
fields map[string]interface{},
t time.Time,
) (Metric, error) {
pt, err := client.NewPoint(name, tags, fields, t)
pt, err := models.NewPoint(name, tags, fields, t)
if err != nil {
return nil, err
}
@ -98,7 +110,7 @@ func NewCounterMetric(
fields map[string]interface{},
t time.Time,
) (Metric, error) {
pt, err := client.NewPoint(name, tags, fields, t)
pt, err := models.NewPoint(name, tags, fields, t)
if err != nil {
return nil, err
}
@ -124,6 +136,10 @@ func (m *metric) Type() ValueType {
return m.mType
}
func (m *metric) HashID() uint64 {
return m.pt.HashID()
}
func (m *metric) UnixNano() int64 {
return m.pt.UnixNano()
}
@ -141,5 +157,13 @@ func (m *metric) PrecisionString(precison string) string {
}
func (m *metric) Point() *client.Point {
return m.pt
return client.NewPointFrom(m.pt)
}
func (m *metric) IsAggregate() bool {
return m.isaggregate
}
func (m *metric) SetAggregate(b bool) {
m.isaggregate = b
}

View File

@ -0,0 +1,5 @@
package all
import (
_ "github.com/influxdata/telegraf/plugins/aggregators/minmax"
)

View File

@ -0,0 +1,192 @@
package minmax
import (
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/aggregators"
)
type MinMax struct {
Period internal.Duration
// metrics waiting to be processed
metrics chan telegraf.Metric
shutdown chan struct{}
wg sync.WaitGroup
// caches for metric fields, names, and tags
fieldCache map[uint64]map[string]minmax
nameCache map[uint64]string
tagCache map[uint64]map[string]string
acc telegraf.Accumulator
}
type minmax struct {
min interface{}
max interface{}
}
var sampleConfig = `
## TODO doc
period = "30s"
`
func (m *MinMax) SampleConfig() string {
return sampleConfig
}
func (m *MinMax) Description() string {
return "Keep the aggregate min/max of each metric passing through."
}
func (m *MinMax) Apply(in telegraf.Metric) {
m.metrics <- in
}
func (m *MinMax) apply(in telegraf.Metric) {
id := in.HashID()
if _, ok := m.nameCache[id]; !ok {
// hit an uncached metric, create caches for first time:
m.nameCache[id] = in.Name()
m.tagCache[id] = in.Tags()
m.fieldCache[id] = make(map[string]minmax)
for k, v := range in.Fields() {
m.fieldCache[id][k] = minmax{
min: v,
max: v,
}
}
} else {
for k, v := range in.Fields() {
cmpmin := compare(m.fieldCache[id][k].min, v)
cmpmax := compare(m.fieldCache[id][k].max, v)
if cmpmin == 1 {
tmp := m.fieldCache[id][k]
tmp.min = v
m.fieldCache[id][k] = tmp
}
if cmpmax == -1 {
tmp := m.fieldCache[id][k]
tmp.max = v
m.fieldCache[id][k] = tmp
}
}
}
}
func (m *MinMax) Start(acc telegraf.Accumulator) error {
m.metrics = make(chan telegraf.Metric, 10)
m.shutdown = make(chan struct{})
m.clearCache()
m.acc = acc
m.wg.Add(1)
if m.Period.Duration > 0 {
go m.periodHandler()
} else {
go m.continuousHandler()
}
return nil
}
func (m *MinMax) Stop() {
close(m.shutdown)
m.wg.Wait()
}
func (m *MinMax) addfields(id uint64) {
fields := map[string]interface{}{}
for k, v := range m.fieldCache[id] {
fields[k+"_min"] = v.min
fields[k+"_max"] = v.max
}
m.acc.AddFields(m.nameCache[id], fields, m.tagCache[id])
}
func (m *MinMax) clearCache() {
m.fieldCache = make(map[uint64]map[string]minmax)
m.nameCache = make(map[uint64]string)
m.tagCache = make(map[uint64]map[string]string)
}
// periodHandler only adds the aggregate metrics on the configured Period.
// thus if telegraf's collection interval is 10s, and period is 30s, there
// will only be one aggregate sent every 3 metrics.
func (m *MinMax) periodHandler() {
// TODO make this sleep less of a hack!
time.Sleep(time.Millisecond * 200)
defer m.wg.Done()
ticker := time.NewTicker(m.Period.Duration)
defer ticker.Stop()
for {
select {
case in := <-m.metrics:
m.apply(in)
case <-m.shutdown:
if len(m.metrics) > 0 {
continue
}
return
case <-ticker.C:
for id, _ := range m.nameCache {
m.addfields(id)
}
m.clearCache()
}
}
}
// continuousHandler sends one metric for every metric that passes through it.
func (m *MinMax) continuousHandler() {
defer m.wg.Done()
for {
select {
case in := <-m.metrics:
m.apply(in)
m.addfields(in.HashID())
case <-m.shutdown:
if len(m.metrics) > 0 {
continue
}
return
}
}
}
func compare(a, b interface{}) int {
switch at := a.(type) {
case int64:
if bt, ok := b.(int64); ok {
if at < bt {
return -1
} else if at > bt {
return 1
}
return 0
} else {
return 0
}
case float64:
if bt, ok := b.(float64); ok {
if at < bt {
return -1
} else if at > bt {
return 1
}
return 0
} else {
return 0
}
default:
return 0
}
}
func init() {
aggregators.Add("minmax", func() telegraf.Aggregator {
return &MinMax{}
})
}

View File

@ -0,0 +1,51 @@
package minmax
import (
"testing"
"time"
"github.com/influxdata/telegraf"
)
func BenchmarkApply(b *testing.B) {
minmax := MinMax{}
minmax.clearCache()
m1, _ := telegraf.NewMetric("m1",
map[string]string{"foo": "bar"},
map[string]interface{}{
"a": int64(1),
"b": int64(1),
"c": int64(1),
"d": int64(1),
"e": int64(1),
"f": float64(2),
"g": float64(2),
"h": float64(2),
"i": float64(2),
"j": float64(3),
},
time.Now(),
)
m2, _ := telegraf.NewMetric("m1",
map[string]string{"foo": "bar"},
map[string]interface{}{
"a": int64(3),
"b": int64(3),
"c": int64(3),
"d": int64(3),
"e": int64(3),
"f": float64(1),
"g": float64(1),
"h": float64(1),
"i": float64(1),
"j": float64(1),
},
time.Now(),
)
for n := 0; n < b.N; n++ {
minmax.apply(m1)
minmax.apply(m2)
}
}

View File

@ -0,0 +1,11 @@
package aggregators
import "github.com/influxdata/telegraf"
type Creator func() telegraf.Aggregator
var Aggregators = map[string]Creator{}
func Add(name string, creator Creator) {
Aggregators[name] = creator
}

View File

@ -99,14 +99,14 @@ func TestWriteHTTPHighTraffic(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
go func(innerwg *sync.WaitGroup) {
defer innerwg.Done()
for i := 0; i < 500; i++ {
resp, err := http.Post("http://localhost:8286/write?db=mydb", "", bytes.NewBuffer([]byte(testMsgs)))
require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode)
}
wg.Done()
}()
}(&wg)
}
wg.Wait()

View File

@ -29,6 +29,7 @@ type Postgresql struct {
Tagvalue string
Measurement string
}
Debug bool
}
type query []struct {

View File

@ -0,0 +1,5 @@
package all
import (
_ "github.com/influxdata/telegraf/plugins/processors/printer"
)

View File

@ -0,0 +1,35 @@
package printer
import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
)
type Printer struct {
}
var sampleConfig = `
`
func (p *Printer) SampleConfig() string {
return sampleConfig
}
func (p *Printer) Description() string {
return "Print all metrics that pass through this filter."
}
func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric {
for _, metric := range in {
fmt.Println(metric.String())
}
return in
}
func init() {
processors.Add("printer", func() telegraf.Processor {
return &Printer{}
})
}

View File

@ -0,0 +1,11 @@
package processors
import "github.com/influxdata/telegraf"
type Creator func() telegraf.Processor
var Processors = map[string]Creator{}
func Add(name string, creator Creator) {
Processors[name] = creator
}

12
processor.go Normal file
View File

@ -0,0 +1,12 @@
package telegraf
type Processor interface {
// SampleConfig returns the default configuration of the Input
SampleConfig() string
// Description returns a one-sentence description on the Input
Description() string
// Apply the filter to the given metric
Apply(in ...Metric) []Metric
}