Support Go execd plugins with shim (#7283)

This commit is contained in:
Steven Soroka
2020-05-04 14:09:10 -04:00
committed by GitHub
parent 7a5690cd36
commit b73a232a6a
58 changed files with 915 additions and 65 deletions

286
models/buffer.go Normal file
View File

@@ -0,0 +1,286 @@
package models
import (
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)
var (
AgentMetricsWritten = selfstat.Register("agent", "metrics_written", map[string]string{})
AgentMetricsDropped = selfstat.Register("agent", "metrics_dropped", map[string]string{})
)
// Buffer stores metrics in a circular buffer.
type Buffer struct {
sync.Mutex
buf []telegraf.Metric
first int // index of the first/oldest metric
last int // one after the index of the last/newest metric
size int // number of metrics currently in the buffer
cap int // the capacity of the buffer
batchFirst int // index of the first metric in the batch
batchSize int // number of metrics currently in the batch
MetricsAdded selfstat.Stat
MetricsWritten selfstat.Stat
MetricsDropped selfstat.Stat
BufferSize selfstat.Stat
BufferLimit selfstat.Stat
}
// NewBuffer returns a new empty Buffer with the given capacity.
func NewBuffer(name string, alias string, capacity int) *Buffer {
tags := map[string]string{"output": name}
if alias != "" {
tags["alias"] = alias
}
b := &Buffer{
buf: make([]telegraf.Metric, capacity),
first: 0,
last: 0,
size: 0,
cap: capacity,
MetricsAdded: selfstat.Register(
"write",
"metrics_added",
tags,
),
MetricsWritten: selfstat.Register(
"write",
"metrics_written",
tags,
),
MetricsDropped: selfstat.Register(
"write",
"metrics_dropped",
tags,
),
BufferSize: selfstat.Register(
"write",
"buffer_size",
tags,
),
BufferLimit: selfstat.Register(
"write",
"buffer_limit",
tags,
),
}
b.BufferSize.Set(int64(0))
b.BufferLimit.Set(int64(capacity))
return b
}
// Len returns the number of metrics currently in the buffer.
func (b *Buffer) Len() int {
b.Lock()
defer b.Unlock()
return b.length()
}
func (b *Buffer) length() int {
return min(b.size+b.batchSize, b.cap)
}
func (b *Buffer) metricAdded() {
b.MetricsAdded.Incr(1)
}
func (b *Buffer) metricWritten(metric telegraf.Metric) {
AgentMetricsWritten.Incr(1)
b.MetricsWritten.Incr(1)
metric.Accept()
}
func (b *Buffer) metricDropped(metric telegraf.Metric) {
AgentMetricsDropped.Incr(1)
b.MetricsDropped.Incr(1)
metric.Reject()
}
func (b *Buffer) add(m telegraf.Metric) int {
dropped := 0
// Check if Buffer is full
if b.size == b.cap {
b.metricDropped(b.buf[b.last])
dropped++
if b.last == b.batchFirst && b.batchSize > 0 {
b.batchSize--
b.batchFirst = b.next(b.batchFirst)
}
}
b.metricAdded()
b.buf[b.last] = m
b.last = b.next(b.last)
if b.size == b.cap {
b.first = b.next(b.first)
}
b.size = min(b.size+1, b.cap)
return dropped
}
// Add adds metrics to the buffer and returns number of dropped metrics.
func (b *Buffer) Add(metrics ...telegraf.Metric) int {
b.Lock()
defer b.Unlock()
dropped := 0
for i := range metrics {
if n := b.add(metrics[i]); n != 0 {
dropped += n
}
}
b.BufferSize.Set(int64(b.length()))
return dropped
}
// Batch returns a slice containing up to batchSize of the most recently added
// metrics. Metrics are ordered from newest to oldest in the batch. The
// batch must not be modified by the client.
func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
b.Lock()
defer b.Unlock()
outLen := min(b.size, batchSize)
out := make([]telegraf.Metric, outLen)
if outLen == 0 {
return out
}
b.batchFirst = b.cap + b.last - outLen
b.batchFirst %= b.cap
b.batchSize = outLen
batchIndex := b.batchFirst
for i := range out {
out[len(out)-1-i] = b.buf[batchIndex]
b.buf[batchIndex] = nil
batchIndex = b.next(batchIndex)
}
b.last = b.batchFirst
b.size -= outLen
return out
}
// Accept marks the batch, acquired from Batch(), as successfully written.
func (b *Buffer) Accept(batch []telegraf.Metric) {
b.Lock()
defer b.Unlock()
for _, m := range batch {
b.metricWritten(m)
}
b.resetBatch()
b.BufferSize.Set(int64(b.length()))
}
// Reject returns the batch, acquired from Batch(), to the buffer and marks it
// as unsent.
func (b *Buffer) Reject(batch []telegraf.Metric) {
b.Lock()
defer b.Unlock()
if len(batch) == 0 {
return
}
older := b.dist(b.first, b.batchFirst)
free := b.cap - b.size
restore := min(len(batch), free+older)
// Rotate newer metrics forward the number of metrics that we can restore.
rb := b.batchFirst
rp := b.last
re := b.nextby(rp, restore)
b.last = re
for rb != rp && rp != re {
rp = b.prev(rp)
re = b.prev(re)
if b.buf[re] != nil {
b.metricDropped(b.buf[re])
b.first = b.next(b.first)
}
b.buf[re] = b.buf[rp]
b.buf[rp] = nil
}
// Copy metrics from the batch back into the buffer; recall that the
// batch is in reverse order compared to b.buf
for i := range batch {
if i < restore {
re = b.prev(re)
b.buf[re] = batch[i]
b.size = min(b.size+1, b.cap)
} else {
b.metricDropped(batch[i])
}
}
b.resetBatch()
b.BufferSize.Set(int64(b.length()))
}
// dist returns the distance between two indexes. Because this data structure
// uses a half open range the arguments must both either left side or right
// side pairs.
func (b *Buffer) dist(begin, end int) int {
if begin <= end {
return end - begin
} else {
return b.cap - begin + end
}
}
// next returns the next index with wrapping.
func (b *Buffer) next(index int) int {
index++
if index == b.cap {
return 0
}
return index
}
// next returns the index that is count newer with wrapping.
func (b *Buffer) nextby(index, count int) int {
index += count
index %= b.cap
return index
}
// next returns the prev index with wrapping.
func (b *Buffer) prev(index int) int {
index--
if index < 0 {
return b.cap - 1
}
return index
}
func (b *Buffer) resetBatch() {
b.batchFirst = 0
b.batchSize = 0
}
func min(a, b int) int {
if b < a {
return b
}
return a
}

728
models/buffer_test.go Normal file
View File

@@ -0,0 +1,728 @@
package models
import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
type MockMetric struct {
telegraf.Metric
AcceptF func()
RejectF func()
DropF func()
}
func (m *MockMetric) Accept() {
m.AcceptF()
}
func (m *MockMetric) Reject() {
m.RejectF()
}
func (m *MockMetric) Drop() {
m.DropF()
}
func Metric() telegraf.Metric {
return MetricTime(0)
}
func MetricTime(sec int64) telegraf.Metric {
m, err := metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(sec, 0),
)
if err != nil {
panic(err)
}
return m
}
func BenchmarkAddMetrics(b *testing.B) {
buf := NewBuffer("test", "", 10000)
m := Metric()
for n := 0; n < b.N; n++ {
buf.Add(m)
}
}
func setup(b *Buffer) *Buffer {
b.MetricsAdded.Set(0)
b.MetricsWritten.Set(0)
b.MetricsDropped.Set(0)
return b
}
func TestBuffer_LenEmpty(t *testing.T) {
b := setup(NewBuffer("test", "", 5))
require.Equal(t, 0, b.Len())
}
func TestBuffer_LenOne(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
b.Add(m)
require.Equal(t, 1, b.Len())
}
func TestBuffer_LenFull(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
b.Add(m, m, m, m, m)
require.Equal(t, 5, b.Len())
}
func TestBuffer_LenOverfill(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
setup(b)
b.Add(m, m, m, m, m, m)
require.Equal(t, 5, b.Len())
}
func TestBuffer_BatchLenZero(t *testing.T) {
b := setup(NewBuffer("test", "", 5))
batch := b.Batch(0)
require.Len(t, batch, 0)
}
func TestBuffer_BatchLenBufferEmpty(t *testing.T) {
b := setup(NewBuffer("test", "", 5))
batch := b.Batch(2)
require.Len(t, batch, 0)
}
func TestBuffer_BatchLenUnderfill(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
b.Add(m)
batch := b.Batch(2)
require.Len(t, batch, 1)
}
func TestBuffer_BatchLenFill(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
b.Add(m, m, m)
batch := b.Batch(2)
require.Len(t, batch, 2)
}
func TestBuffer_BatchLenExact(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
b.Add(m, m)
batch := b.Batch(2)
require.Len(t, batch, 2)
}
func TestBuffer_BatchLenLargerThanBuffer(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
b.Add(m, m, m, m, m)
batch := b.Batch(6)
require.Len(t, batch, 5)
}
func TestBuffer_BatchWrap(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
b.Add(m, m, m, m, m)
batch := b.Batch(2)
b.Accept(batch)
b.Add(m, m)
batch = b.Batch(5)
require.Len(t, batch, 5)
}
func TestBuffer_BatchLatest(t *testing.T) {
b := setup(NewBuffer("test", "", 4))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
batch := b.Batch(2)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(3),
MetricTime(2),
}, batch)
}
func TestBuffer_BatchLatestWrap(t *testing.T) {
b := setup(NewBuffer("test", "", 4))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
b.Add(MetricTime(4))
b.Add(MetricTime(5))
batch := b.Batch(2)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(5),
MetricTime(4),
}, batch)
}
func TestBuffer_MultipleBatch(t *testing.T) {
b := setup(NewBuffer("test", "", 10))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
b.Add(MetricTime(4))
b.Add(MetricTime(5))
b.Add(MetricTime(6))
batch := b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(6),
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
}, batch)
b.Accept(batch)
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(1),
}, batch)
b.Accept(batch)
}
func TestBuffer_RejectWithRoom(t *testing.T) {
b := setup(NewBuffer("test", "", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
batch := b.Batch(2)
b.Add(MetricTime(4))
b.Add(MetricTime(5))
b.Reject(batch)
require.Equal(t, int64(0), b.MetricsDropped.Get())
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
MetricTime(1),
}, batch)
}
func TestBuffer_RejectNothingNewFull(t *testing.T) {
b := setup(NewBuffer("test", "", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
b.Add(MetricTime(4))
b.Add(MetricTime(5))
batch := b.Batch(2)
b.Reject(batch)
require.Equal(t, int64(0), b.MetricsDropped.Get())
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
MetricTime(1),
}, batch)
}
func TestBuffer_RejectNoRoom(t *testing.T) {
b := setup(NewBuffer("test", "", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
batch := b.Batch(2)
b.Add(MetricTime(4))
b.Add(MetricTime(5))
b.Add(MetricTime(6))
b.Add(MetricTime(7))
b.Add(MetricTime(8))
b.Reject(batch)
require.Equal(t, int64(3), b.MetricsDropped.Get())
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(8),
MetricTime(7),
MetricTime(6),
MetricTime(5),
MetricTime(4),
}, batch)
}
func TestBuffer_RejectRoomExact(t *testing.T) {
b := setup(NewBuffer("test", "", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
batch := b.Batch(2)
b.Add(MetricTime(3))
b.Add(MetricTime(4))
b.Add(MetricTime(5))
b.Reject(batch)
require.Equal(t, int64(0), b.MetricsDropped.Get())
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
MetricTime(1),
}, batch)
}
func TestBuffer_RejectRoomOverwriteOld(t *testing.T) {
b := setup(NewBuffer("test", "", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
batch := b.Batch(1)
b.Add(MetricTime(4))
b.Add(MetricTime(5))
b.Add(MetricTime(6))
b.Reject(batch)
require.Equal(t, int64(1), b.MetricsDropped.Get())
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(6),
MetricTime(5),
MetricTime(4),
MetricTime(3),
MetricTime(2),
}, batch)
}
func TestBuffer_RejectPartialRoom(t *testing.T) {
b := setup(NewBuffer("test", "", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
batch := b.Batch(2)
b.Add(MetricTime(4))
b.Add(MetricTime(5))
b.Add(MetricTime(6))
b.Add(MetricTime(7))
b.Reject(batch)
require.Equal(t, int64(2), b.MetricsDropped.Get())
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(7),
MetricTime(6),
MetricTime(5),
MetricTime(4),
MetricTime(3),
}, batch)
}
func TestBuffer_RejectNewMetricsWrapped(t *testing.T) {
b := setup(NewBuffer("test", "", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
batch := b.Batch(2)
b.Add(MetricTime(4))
b.Add(MetricTime(5))
// buffer: 1, 4, 5; batch: 2, 3
require.Equal(t, int64(0), b.MetricsDropped.Get())
b.Add(MetricTime(6))
b.Add(MetricTime(7))
b.Add(MetricTime(8))
b.Add(MetricTime(9))
b.Add(MetricTime(10))
// buffer: 8, 9, 10, 6, 7; batch: 2, 3
require.Equal(t, int64(3), b.MetricsDropped.Get())
b.Add(MetricTime(11))
b.Add(MetricTime(12))
b.Add(MetricTime(13))
b.Add(MetricTime(14))
b.Add(MetricTime(15))
// buffer: 13, 14, 15, 11, 12; batch: 2, 3
require.Equal(t, int64(8), b.MetricsDropped.Get())
b.Reject(batch)
require.Equal(t, int64(10), b.MetricsDropped.Get())
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(15),
MetricTime(14),
MetricTime(13),
MetricTime(12),
MetricTime(11),
}, batch)
}
func TestBuffer_RejectWrapped(t *testing.T) {
b := setup(NewBuffer("test", "", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
b.Add(MetricTime(4))
b.Add(MetricTime(5))
b.Add(MetricTime(6))
b.Add(MetricTime(7))
b.Add(MetricTime(8))
batch := b.Batch(3)
b.Add(MetricTime(9))
b.Add(MetricTime(10))
b.Add(MetricTime(11))
b.Add(MetricTime(12))
b.Reject(batch)
batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(12),
MetricTime(11),
MetricTime(10),
MetricTime(9),
MetricTime(8),
}, batch)
}
func TestBuffer_RejectAdjustFirst(t *testing.T) {
b := setup(NewBuffer("test", "", 10))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
batch := b.Batch(3)
b.Add(MetricTime(4))
b.Add(MetricTime(5))
b.Add(MetricTime(6))
b.Reject(batch)
b.Add(MetricTime(7))
b.Add(MetricTime(8))
b.Add(MetricTime(9))
batch = b.Batch(3)
b.Add(MetricTime(10))
b.Add(MetricTime(11))
b.Add(MetricTime(12))
b.Reject(batch)
b.Add(MetricTime(13))
b.Add(MetricTime(14))
b.Add(MetricTime(15))
batch = b.Batch(3)
b.Add(MetricTime(16))
b.Add(MetricTime(17))
b.Add(MetricTime(18))
b.Reject(batch)
b.Add(MetricTime(19))
batch = b.Batch(10)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(19),
MetricTime(18),
MetricTime(17),
MetricTime(16),
MetricTime(15),
MetricTime(14),
MetricTime(13),
MetricTime(12),
MetricTime(11),
MetricTime(10),
}, batch)
}
func TestBuffer_AddDropsOverwrittenMetrics(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
b.Add(m, m, m, m, m)
b.Add(m, m, m, m, m)
require.Equal(t, int64(5), b.MetricsDropped.Get())
require.Equal(t, int64(0), b.MetricsWritten.Get())
}
func TestBuffer_AcceptRemovesBatch(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
b.Add(m, m, m)
batch := b.Batch(2)
b.Accept(batch)
require.Equal(t, 1, b.Len())
}
func TestBuffer_RejectLeavesBatch(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
b.Add(m, m, m)
batch := b.Batch(2)
b.Reject(batch)
require.Equal(t, 3, b.Len())
}
func TestBuffer_AcceptWritesOverwrittenBatch(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
b.Add(m, m, m, m, m)
batch := b.Batch(5)
b.Add(m, m, m, m, m)
b.Accept(batch)
require.Equal(t, int64(0), b.MetricsDropped.Get())
require.Equal(t, int64(5), b.MetricsWritten.Get())
}
func TestBuffer_BatchRejectDropsOverwrittenBatch(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
b.Add(m, m, m, m, m)
batch := b.Batch(5)
b.Add(m, m, m, m, m)
b.Reject(batch)
require.Equal(t, int64(5), b.MetricsDropped.Get())
require.Equal(t, int64(0), b.MetricsWritten.Get())
}
func TestBuffer_MetricsOverwriteBatchAccept(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
b.Add(m, m, m, m, m)
batch := b.Batch(3)
b.Add(m, m, m)
b.Accept(batch)
require.Equal(t, int64(0), b.MetricsDropped.Get(), "dropped")
require.Equal(t, int64(3), b.MetricsWritten.Get(), "written")
}
func TestBuffer_MetricsOverwriteBatchReject(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
b.Add(m, m, m, m, m)
batch := b.Batch(3)
b.Add(m, m, m)
b.Reject(batch)
require.Equal(t, int64(3), b.MetricsDropped.Get())
require.Equal(t, int64(0), b.MetricsWritten.Get())
}
func TestBuffer_MetricsBatchAcceptRemoved(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
b.Add(m, m, m, m, m)
batch := b.Batch(3)
b.Add(m, m, m, m, m)
b.Accept(batch)
require.Equal(t, int64(2), b.MetricsDropped.Get())
require.Equal(t, int64(3), b.MetricsWritten.Get())
}
func TestBuffer_WrapWithBatch(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
b.Add(m, m, m)
b.Batch(3)
b.Add(m, m, m, m, m, m)
require.Equal(t, int64(1), b.MetricsDropped.Get())
}
func TestBuffer_BatchNotRemoved(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
b.Add(m, m, m, m, m)
b.Batch(2)
require.Equal(t, 5, b.Len())
}
func TestBuffer_BatchRejectAcceptNoop(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", "", 5))
b.Add(m, m, m, m, m)
batch := b.Batch(2)
b.Reject(batch)
b.Accept(batch)
require.Equal(t, 5, b.Len())
}
func TestBuffer_AcceptCallsMetricAccept(t *testing.T) {
var accept int
mm := &MockMetric{
Metric: Metric(),
AcceptF: func() {
accept++
},
}
b := setup(NewBuffer("test", "", 5))
b.Add(mm, mm, mm)
batch := b.Batch(2)
b.Accept(batch)
require.Equal(t, 2, accept)
}
func TestBuffer_AddCallsMetricRejectWhenNoBatch(t *testing.T) {
var reject int
mm := &MockMetric{
Metric: Metric(),
RejectF: func() {
reject++
},
}
b := setup(NewBuffer("test", "", 5))
setup(b)
b.Add(mm, mm, mm, mm, mm)
b.Add(mm, mm)
require.Equal(t, 2, reject)
}
func TestBuffer_AddCallsMetricRejectWhenNotInBatch(t *testing.T) {
var reject int
mm := &MockMetric{
Metric: Metric(),
RejectF: func() {
reject++
},
}
b := setup(NewBuffer("test", "", 5))
setup(b)
b.Add(mm, mm, mm, mm, mm)
batch := b.Batch(2)
b.Add(mm, mm, mm, mm)
require.Equal(t, 2, reject)
b.Reject(batch)
require.Equal(t, 4, reject)
}
func TestBuffer_RejectCallsMetricRejectWithOverwritten(t *testing.T) {
var reject int
mm := &MockMetric{
Metric: Metric(),
RejectF: func() {
reject++
},
}
b := setup(NewBuffer("test", "", 5))
b.Add(mm, mm, mm, mm, mm)
batch := b.Batch(5)
b.Add(mm, mm)
require.Equal(t, 0, reject)
b.Reject(batch)
require.Equal(t, 2, reject)
}
func TestBuffer_AddOverwriteAndReject(t *testing.T) {
var reject int
mm := &MockMetric{
Metric: Metric(),
RejectF: func() {
reject++
},
}
b := setup(NewBuffer("test", "", 5))
b.Add(mm, mm, mm, mm, mm)
batch := b.Batch(5)
b.Add(mm, mm, mm, mm, mm)
b.Add(mm, mm, mm, mm, mm)
b.Add(mm, mm, mm, mm, mm)
b.Add(mm, mm, mm, mm, mm)
require.Equal(t, 15, reject)
b.Reject(batch)
require.Equal(t, 20, reject)
}
func TestBuffer_AddOverwriteAndRejectOffset(t *testing.T) {
var reject int
var accept int
mm := &MockMetric{
Metric: Metric(),
RejectF: func() {
reject++
},
AcceptF: func() {
accept++
},
}
b := setup(NewBuffer("test", "", 5))
b.Add(mm, mm, mm)
b.Add(mm, mm, mm, mm)
require.Equal(t, 2, reject)
batch := b.Batch(5)
b.Add(mm, mm, mm, mm)
require.Equal(t, 2, reject)
b.Add(mm, mm, mm, mm)
require.Equal(t, 5, reject)
b.Add(mm, mm, mm, mm)
require.Equal(t, 9, reject)
b.Add(mm, mm, mm, mm)
require.Equal(t, 13, reject)
b.Accept(batch)
require.Equal(t, 13, reject)
require.Equal(t, 5, accept)
}
func TestBuffer_RejectEmptyBatch(t *testing.T) {
b := setup(NewBuffer("test", "", 5))
batch := b.Batch(2)
b.Add(MetricTime(1))
b.Reject(batch)
b.Add(MetricTime(2))
batch = b.Batch(2)
for _, m := range batch {
require.NotNil(t, m)
}
}

260
models/filter.go Normal file
View File

@@ -0,0 +1,260 @@
package models
import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
)
// TagFilter is the name of a tag, and the values on which to filter
type TagFilter struct {
Name string
Filter []string
filter filter.Filter
}
// Filter containing drop/pass and tagdrop/tagpass rules
type Filter struct {
NameDrop []string
nameDrop filter.Filter
NamePass []string
namePass filter.Filter
FieldDrop []string
fieldDrop filter.Filter
FieldPass []string
fieldPass filter.Filter
TagDrop []TagFilter
TagPass []TagFilter
TagExclude []string
tagExclude filter.Filter
TagInclude []string
tagInclude filter.Filter
isActive bool
}
// Compile all Filter lists into filter.Filter objects.
func (f *Filter) Compile() error {
if len(f.NameDrop) == 0 &&
len(f.NamePass) == 0 &&
len(f.FieldDrop) == 0 &&
len(f.FieldPass) == 0 &&
len(f.TagInclude) == 0 &&
len(f.TagExclude) == 0 &&
len(f.TagPass) == 0 &&
len(f.TagDrop) == 0 {
return nil
}
f.isActive = true
var err error
f.nameDrop, err = filter.Compile(f.NameDrop)
if err != nil {
return fmt.Errorf("Error compiling 'namedrop', %s", err)
}
f.namePass, err = filter.Compile(f.NamePass)
if err != nil {
return fmt.Errorf("Error compiling 'namepass', %s", err)
}
f.fieldDrop, err = filter.Compile(f.FieldDrop)
if err != nil {
return fmt.Errorf("Error compiling 'fielddrop', %s", err)
}
f.fieldPass, err = filter.Compile(f.FieldPass)
if err != nil {
return fmt.Errorf("Error compiling 'fieldpass', %s", err)
}
f.tagExclude, err = filter.Compile(f.TagExclude)
if err != nil {
return fmt.Errorf("Error compiling 'tagexclude', %s", err)
}
f.tagInclude, err = filter.Compile(f.TagInclude)
if err != nil {
return fmt.Errorf("Error compiling 'taginclude', %s", err)
}
for i := range f.TagDrop {
f.TagDrop[i].filter, err = filter.Compile(f.TagDrop[i].Filter)
if err != nil {
return fmt.Errorf("Error compiling 'tagdrop', %s", err)
}
}
for i := range f.TagPass {
f.TagPass[i].filter, err = filter.Compile(f.TagPass[i].Filter)
if err != nil {
return fmt.Errorf("Error compiling 'tagpass', %s", err)
}
}
return nil
}
// Select returns true if the metric matches according to the
// namepass/namedrop and tagpass/tagdrop filters. The metric is not modified.
func (f *Filter) Select(metric telegraf.Metric) bool {
if !f.isActive {
return true
}
if !f.shouldNamePass(metric.Name()) {
return false
}
if !f.shouldTagsPass(metric.TagList()) {
return false
}
return true
}
// Modify removes any tags and fields from the metric according to the
// fieldpass/fielddrop and taginclude/tagexclude filters.
func (f *Filter) Modify(metric telegraf.Metric) {
if !f.isActive {
return
}
f.filterFields(metric)
f.filterTags(metric)
}
// IsActive checking if filter is active
func (f *Filter) IsActive() bool {
return f.isActive
}
// shouldNamePass returns true if the metric should pass, false if should drop
// based on the drop/pass filter parameters
func (f *Filter) shouldNamePass(key string) bool {
pass := func(f *Filter) bool {
if f.namePass.Match(key) {
return true
}
return false
}
drop := func(f *Filter) bool {
if f.nameDrop.Match(key) {
return false
}
return true
}
if f.namePass != nil && f.nameDrop != nil {
return pass(f) && drop(f)
} else if f.namePass != nil {
return pass(f)
} else if f.nameDrop != nil {
return drop(f)
}
return true
}
// shouldFieldPass returns true if the metric should pass, false if should drop
// based on the drop/pass filter parameters
func (f *Filter) shouldFieldPass(key string) bool {
if f.fieldPass != nil && f.fieldDrop != nil {
return f.fieldPass.Match(key) && !f.fieldDrop.Match(key)
} else if f.fieldPass != nil {
return f.fieldPass.Match(key)
} else if f.fieldDrop != nil {
return !f.fieldDrop.Match(key)
}
return true
}
// shouldTagsPass returns true if the metric should pass, false if should drop
// based on the tagdrop/tagpass filter parameters
func (f *Filter) shouldTagsPass(tags []*telegraf.Tag) bool {
pass := func(f *Filter) bool {
for _, pat := range f.TagPass {
if pat.filter == nil {
continue
}
for _, tag := range tags {
if tag.Key == pat.Name {
if pat.filter.Match(tag.Value) {
return true
}
}
}
}
return false
}
drop := func(f *Filter) bool {
for _, pat := range f.TagDrop {
if pat.filter == nil {
continue
}
for _, tag := range tags {
if tag.Key == pat.Name {
if pat.filter.Match(tag.Value) {
return false
}
}
}
}
return true
}
// Add additional logic in case where both parameters are set.
// see: https://github.com/influxdata/telegraf/issues/2860
if f.TagPass != nil && f.TagDrop != nil {
// return true only in case when tag pass and won't be dropped (true, true).
// in case when the same tag should be passed and dropped it will be dropped (true, false).
return pass(f) && drop(f)
} else if f.TagPass != nil {
return pass(f)
} else if f.TagDrop != nil {
return drop(f)
}
return true
}
// filterFields removes fields according to fieldpass/fielddrop.
func (f *Filter) filterFields(metric telegraf.Metric) {
filterKeys := []string{}
for _, field := range metric.FieldList() {
if !f.shouldFieldPass(field.Key) {
filterKeys = append(filterKeys, field.Key)
}
}
for _, key := range filterKeys {
metric.RemoveField(key)
}
}
// filterTags removes tags according to taginclude/tagexclude.
func (f *Filter) filterTags(metric telegraf.Metric) {
filterKeys := []string{}
if f.tagInclude != nil {
for _, tag := range metric.TagList() {
if !f.tagInclude.Match(tag.Key) {
filterKeys = append(filterKeys, tag.Key)
}
}
}
for _, key := range filterKeys {
metric.RemoveTag(key)
}
if f.tagExclude != nil {
for _, tag := range metric.TagList() {
if f.tagExclude.Match(tag.Key) {
filterKeys = append(filterKeys, tag.Key)
}
}
}
for _, key := range filterKeys {
metric.RemoveTag(key)
}
}

525
models/filter_test.go Normal file
View File

@@ -0,0 +1,525 @@
package models
import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestFilter_ApplyEmpty(t *testing.T) {
f := Filter{}
require.NoError(t, f.Compile())
require.False(t, f.IsActive())
m, err := metric.New("m",
map[string]string{},
map[string]interface{}{"value": int64(1)},
time.Now())
require.NoError(t, err)
require.True(t, f.Select(m))
}
func TestFilter_ApplyTagsDontPass(t *testing.T) {
filters := []TagFilter{
{
Name: "cpu",
Filter: []string{"cpu-*"},
},
}
f := Filter{
TagDrop: filters,
}
require.NoError(t, f.Compile())
require.NoError(t, f.Compile())
require.True(t, f.IsActive())
m, err := metric.New("m",
map[string]string{"cpu": "cpu-total"},
map[string]interface{}{"value": int64(1)},
time.Now())
require.NoError(t, err)
require.False(t, f.Select(m))
}
func TestFilter_ApplyDeleteFields(t *testing.T) {
f := Filter{
FieldDrop: []string{"value"},
}
require.NoError(t, f.Compile())
require.NoError(t, f.Compile())
require.True(t, f.IsActive())
m, err := metric.New("m",
map[string]string{},
map[string]interface{}{
"value": int64(1),
"value2": int64(2),
},
time.Now())
require.NoError(t, err)
require.True(t, f.Select(m))
f.Modify(m)
require.Equal(t, map[string]interface{}{"value2": int64(2)}, m.Fields())
}
func TestFilter_ApplyDeleteAllFields(t *testing.T) {
f := Filter{
FieldDrop: []string{"value*"},
}
require.NoError(t, f.Compile())
require.NoError(t, f.Compile())
require.True(t, f.IsActive())
m, err := metric.New("m",
map[string]string{},
map[string]interface{}{
"value": int64(1),
"value2": int64(2),
},
time.Now())
require.NoError(t, err)
require.True(t, f.Select(m))
f.Modify(m)
require.Len(t, m.FieldList(), 0)
}
func TestFilter_Empty(t *testing.T) {
f := Filter{}
measurements := []string{
"foo",
"bar",
"barfoo",
"foo_bar",
"foo.bar",
"foo-bar",
"supercalifradjulisticexpialidocious",
}
for _, measurement := range measurements {
if !f.shouldFieldPass(measurement) {
t.Errorf("Expected measurement %s to pass", measurement)
}
}
}
func TestFilter_NamePass(t *testing.T) {
f := Filter{
NamePass: []string{"foo*", "cpu_usage_idle"},
}
require.NoError(t, f.Compile())
passes := []string{
"foo",
"foo_bar",
"foo.bar",
"foo-bar",
"cpu_usage_idle",
}
drops := []string{
"bar",
"barfoo",
"bar_foo",
"cpu_usage_busy",
}
for _, measurement := range passes {
if !f.shouldNamePass(measurement) {
t.Errorf("Expected measurement %s to pass", measurement)
}
}
for _, measurement := range drops {
if f.shouldNamePass(measurement) {
t.Errorf("Expected measurement %s to drop", measurement)
}
}
}
func TestFilter_NameDrop(t *testing.T) {
f := Filter{
NameDrop: []string{"foo*", "cpu_usage_idle"},
}
require.NoError(t, f.Compile())
drops := []string{
"foo",
"foo_bar",
"foo.bar",
"foo-bar",
"cpu_usage_idle",
}
passes := []string{
"bar",
"barfoo",
"bar_foo",
"cpu_usage_busy",
}
for _, measurement := range passes {
if !f.shouldNamePass(measurement) {
t.Errorf("Expected measurement %s to pass", measurement)
}
}
for _, measurement := range drops {
if f.shouldNamePass(measurement) {
t.Errorf("Expected measurement %s to drop", measurement)
}
}
}
func TestFilter_FieldPass(t *testing.T) {
f := Filter{
FieldPass: []string{"foo*", "cpu_usage_idle"},
}
require.NoError(t, f.Compile())
passes := []string{
"foo",
"foo_bar",
"foo.bar",
"foo-bar",
"cpu_usage_idle",
}
drops := []string{
"bar",
"barfoo",
"bar_foo",
"cpu_usage_busy",
}
for _, measurement := range passes {
if !f.shouldFieldPass(measurement) {
t.Errorf("Expected measurement %s to pass", measurement)
}
}
for _, measurement := range drops {
if f.shouldFieldPass(measurement) {
t.Errorf("Expected measurement %s to drop", measurement)
}
}
}
func TestFilter_FieldDrop(t *testing.T) {
f := Filter{
FieldDrop: []string{"foo*", "cpu_usage_idle"},
}
require.NoError(t, f.Compile())
drops := []string{
"foo",
"foo_bar",
"foo.bar",
"foo-bar",
"cpu_usage_idle",
}
passes := []string{
"bar",
"barfoo",
"bar_foo",
"cpu_usage_busy",
}
for _, measurement := range passes {
if !f.shouldFieldPass(measurement) {
t.Errorf("Expected measurement %s to pass", measurement)
}
}
for _, measurement := range drops {
if f.shouldFieldPass(measurement) {
t.Errorf("Expected measurement %s to drop", measurement)
}
}
}
func TestFilter_TagPass(t *testing.T) {
filters := []TagFilter{
{
Name: "cpu",
Filter: []string{"cpu-*"},
},
{
Name: "mem",
Filter: []string{"mem_free"},
}}
f := Filter{
TagPass: filters,
}
require.NoError(t, f.Compile())
passes := [][]*telegraf.Tag{
{{Key: "cpu", Value: "cpu-total"}},
{{Key: "cpu", Value: "cpu-0"}},
{{Key: "cpu", Value: "cpu-1"}},
{{Key: "cpu", Value: "cpu-2"}},
{{Key: "mem", Value: "mem_free"}},
}
drops := [][]*telegraf.Tag{
{{Key: "cpu", Value: "cputotal"}},
{{Key: "cpu", Value: "cpu0"}},
{{Key: "cpu", Value: "cpu1"}},
{{Key: "cpu", Value: "cpu2"}},
{{Key: "mem", Value: "mem_used"}},
}
for _, tags := range passes {
if !f.shouldTagsPass(tags) {
t.Errorf("Expected tags %v to pass", tags)
}
}
for _, tags := range drops {
if f.shouldTagsPass(tags) {
t.Errorf("Expected tags %v to drop", tags)
}
}
}
func TestFilter_TagDrop(t *testing.T) {
filters := []TagFilter{
{
Name: "cpu",
Filter: []string{"cpu-*"},
},
{
Name: "mem",
Filter: []string{"mem_free"},
}}
f := Filter{
TagDrop: filters,
}
require.NoError(t, f.Compile())
drops := [][]*telegraf.Tag{
{{Key: "cpu", Value: "cpu-total"}},
{{Key: "cpu", Value: "cpu-0"}},
{{Key: "cpu", Value: "cpu-1"}},
{{Key: "cpu", Value: "cpu-2"}},
{{Key: "mem", Value: "mem_free"}},
}
passes := [][]*telegraf.Tag{
{{Key: "cpu", Value: "cputotal"}},
{{Key: "cpu", Value: "cpu0"}},
{{Key: "cpu", Value: "cpu1"}},
{{Key: "cpu", Value: "cpu2"}},
{{Key: "mem", Value: "mem_used"}},
}
for _, tags := range passes {
if !f.shouldTagsPass(tags) {
t.Errorf("Expected tags %v to pass", tags)
}
}
for _, tags := range drops {
if f.shouldTagsPass(tags) {
t.Errorf("Expected tags %v to drop", tags)
}
}
}
func TestFilter_FilterTagsNoMatches(t *testing.T) {
m, err := metric.New("m",
map[string]string{
"host": "localhost",
"mytag": "foobar",
},
map[string]interface{}{"value": int64(1)},
time.Now())
require.NoError(t, err)
f := Filter{
TagExclude: []string{"nomatch"},
}
require.NoError(t, f.Compile())
f.filterTags(m)
require.Equal(t, map[string]string{
"host": "localhost",
"mytag": "foobar",
}, m.Tags())
f = Filter{
TagInclude: []string{"nomatch"},
}
require.NoError(t, f.Compile())
f.filterTags(m)
require.Equal(t, map[string]string{}, m.Tags())
}
func TestFilter_FilterTagsMatches(t *testing.T) {
m, err := metric.New("m",
map[string]string{
"host": "localhost",
"mytag": "foobar",
},
map[string]interface{}{"value": int64(1)},
time.Now())
require.NoError(t, err)
f := Filter{
TagExclude: []string{"ho*"},
}
require.NoError(t, f.Compile())
f.filterTags(m)
require.Equal(t, map[string]string{
"mytag": "foobar",
}, m.Tags())
m, err = metric.New("m",
map[string]string{
"host": "localhost",
"mytag": "foobar",
},
map[string]interface{}{"value": int64(1)},
time.Now())
require.NoError(t, err)
f = Filter{
TagInclude: []string{"my*"},
}
require.NoError(t, f.Compile())
f.filterTags(m)
require.Equal(t, map[string]string{
"mytag": "foobar",
}, m.Tags())
}
// TestFilter_FilterNamePassAndDrop used for check case when
// both parameters were defined
// see: https://github.com/influxdata/telegraf/issues/2860
func TestFilter_FilterNamePassAndDrop(t *testing.T) {
inputData := []string{"name1", "name2", "name3", "name4"}
expectedResult := []bool{false, true, false, false}
f := Filter{
NamePass: []string{"name1", "name2"},
NameDrop: []string{"name1", "name3"},
}
require.NoError(t, f.Compile())
for i, name := range inputData {
require.Equal(t, f.shouldNamePass(name), expectedResult[i])
}
}
// TestFilter_FilterFieldPassAndDrop used for check case when
// both parameters were defined
// see: https://github.com/influxdata/telegraf/issues/2860
func TestFilter_FilterFieldPassAndDrop(t *testing.T) {
inputData := []string{"field1", "field2", "field3", "field4"}
expectedResult := []bool{false, true, false, false}
f := Filter{
FieldPass: []string{"field1", "field2"},
FieldDrop: []string{"field1", "field3"},
}
require.NoError(t, f.Compile())
for i, field := range inputData {
require.Equal(t, f.shouldFieldPass(field), expectedResult[i])
}
}
// TestFilter_FilterTagsPassAndDrop used for check case when
// both parameters were defined
// see: https://github.com/influxdata/telegraf/issues/2860
func TestFilter_FilterTagsPassAndDrop(t *testing.T) {
inputData := [][]*telegraf.Tag{
{{Key: "tag1", Value: "1"}, {Key: "tag2", Value: "3"}},
{{Key: "tag1", Value: "1"}, {Key: "tag2", Value: "2"}},
{{Key: "tag1", Value: "2"}, {Key: "tag2", Value: "1"}},
{{Key: "tag1", Value: "4"}, {Key: "tag2", Value: "1"}},
}
expectedResult := []bool{false, true, false, false}
filterPass := []TagFilter{
{
Name: "tag1",
Filter: []string{"1", "4"},
},
}
filterDrop := []TagFilter{
{
Name: "tag1",
Filter: []string{"4"},
},
{
Name: "tag2",
Filter: []string{"3"},
},
}
f := Filter{
TagDrop: filterDrop,
TagPass: filterPass,
}
require.NoError(t, f.Compile())
for i, tag := range inputData {
require.Equal(t, f.shouldTagsPass(tag), expectedResult[i])
}
}
func BenchmarkFilter(b *testing.B) {
tests := []struct {
name string
filter Filter
metric telegraf.Metric
}{
{
name: "empty filter",
filter: Filter{},
metric: testutil.MustMetric("cpu",
map[string]string{},
map[string]interface{}{
"value": 42,
},
time.Unix(0, 0),
),
},
{
name: "namepass",
filter: Filter{
NamePass: []string{"cpu"},
},
metric: testutil.MustMetric("cpu",
map[string]string{},
map[string]interface{}{
"value": 42,
},
time.Unix(0, 0),
),
},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
require.NoError(b, tt.filter.Compile())
for n := 0; n < b.N; n++ {
tt.filter.Select(tt.metric)
}
})
}
}

102
models/log.go Normal file
View File

@@ -0,0 +1,102 @@
package models
import (
"log"
"reflect"
"github.com/influxdata/telegraf"
)
// Logger defines a logging structure for plugins.
type Logger struct {
OnErrs []func()
Name string // Name is the plugin name, will be printed in the `[]`.
}
// NewLogger creates a new logger instance
func NewLogger(pluginType, name, alias string) *Logger {
return &Logger{
Name: logName(pluginType, name, alias),
}
}
// OnErr defines a callback that triggers only when errors are about to be written to the log
func (l *Logger) OnErr(f func()) {
l.OnErrs = append(l.OnErrs, f)
}
// Errorf logs an error message, patterned after log.Printf.
func (l *Logger) Errorf(format string, args ...interface{}) {
for _, f := range l.OnErrs {
f()
}
log.Printf("E! ["+l.Name+"] "+format, args...)
}
// Error logs an error message, patterned after log.Print.
func (l *Logger) Error(args ...interface{}) {
for _, f := range l.OnErrs {
f()
}
log.Print(append([]interface{}{"E! [" + l.Name + "] "}, args...)...)
}
// Debugf logs a debug message, patterned after log.Printf.
func (l *Logger) Debugf(format string, args ...interface{}) {
log.Printf("D! ["+l.Name+"] "+format, args...)
}
// Debug logs a debug message, patterned after log.Print.
func (l *Logger) Debug(args ...interface{}) {
log.Print(append([]interface{}{"D! [" + l.Name + "] "}, args...)...)
}
// Warnf logs a warning message, patterned after log.Printf.
func (l *Logger) Warnf(format string, args ...interface{}) {
log.Printf("W! ["+l.Name+"] "+format, args...)
}
// Warn logs a warning message, patterned after log.Print.
func (l *Logger) Warn(args ...interface{}) {
log.Print(append([]interface{}{"W! [" + l.Name + "] "}, args...)...)
}
// Infof logs an information message, patterned after log.Printf.
func (l *Logger) Infof(format string, args ...interface{}) {
log.Printf("I! ["+l.Name+"] "+format, args...)
}
// Info logs an information message, patterned after log.Print.
func (l *Logger) Info(args ...interface{}) {
log.Print(append([]interface{}{"I! [" + l.Name + "] "}, args...)...)
}
// logName returns the log-friendly name/type.
func logName(pluginType, name, alias string) string {
if alias == "" {
return pluginType + "." + name
}
return pluginType + "." + name + "::" + alias
}
func setLogIfExist(i interface{}, log telegraf.Logger) {
valI := reflect.ValueOf(i)
if valI.Type().Kind() != reflect.Ptr {
valI = reflect.New(reflect.TypeOf(i))
}
field := valI.Elem().FieldByName("Log")
if !field.IsValid() {
return
}
switch field.Type().String() {
case "telegraf.Logger":
if field.CanSet() {
field.Set(reflect.ValueOf(log))
}
}
return
}

24
models/log_test.go Normal file
View File

@@ -0,0 +1,24 @@
package models
import (
"testing"
"github.com/influxdata/telegraf/selfstat"
"github.com/stretchr/testify/require"
)
func TestErrorCounting(t *testing.T) {
reg := selfstat.Register(
"gather",
"errors",
map[string]string{"input": "test"},
)
iLog := Logger{Name: "inputs.test"}
iLog.OnErr(func() {
reg.Incr(1)
})
iLog.Error("something went wrong")
iLog.Errorf("something went wrong")
require.Equal(t, int64(2), reg.Get())
}

42
models/makemetric.go Normal file
View File

@@ -0,0 +1,42 @@
package models
import (
"github.com/influxdata/telegraf"
)
// Makemetric applies new metric plugin and agent measurement and tag
// settings.
func makemetric(
metric telegraf.Metric,
nameOverride string,
namePrefix string,
nameSuffix string,
tags map[string]string,
globalTags map[string]string,
) telegraf.Metric {
if len(nameOverride) != 0 {
metric.SetName(nameOverride)
}
if len(namePrefix) != 0 {
metric.AddPrefix(namePrefix)
}
if len(nameSuffix) != 0 {
metric.AddSuffix(nameSuffix)
}
// Apply plugin-wide tags
for k, v := range tags {
if _, ok := metric.GetTag(k); !ok {
metric.AddTag(k, v)
}
}
// Apply global tags
for k, v := range globalTags {
if _, ok := metric.GetTag(k); !ok {
metric.AddTag(k, v)
}
}
return metric
}

View File

@@ -0,0 +1,183 @@
package models
import (
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/selfstat"
)
type RunningAggregator struct {
sync.Mutex
Aggregator telegraf.Aggregator
Config *AggregatorConfig
periodStart time.Time
periodEnd time.Time
log telegraf.Logger
MetricsPushed selfstat.Stat
MetricsFiltered selfstat.Stat
MetricsDropped selfstat.Stat
PushTime selfstat.Stat
}
func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConfig) *RunningAggregator {
tags := map[string]string{"aggregator": config.Name}
if config.Alias != "" {
tags["alias"] = config.Alias
}
aggErrorsRegister := selfstat.Register("aggregate", "errors", tags)
logger := NewLogger("aggregators", config.Name, config.Alias)
logger.OnErr(func() {
aggErrorsRegister.Incr(1)
})
setLogIfExist(aggregator, logger)
return &RunningAggregator{
Aggregator: aggregator,
Config: config,
MetricsPushed: selfstat.Register(
"aggregate",
"metrics_pushed",
tags,
),
MetricsFiltered: selfstat.Register(
"aggregate",
"metrics_filtered",
tags,
),
MetricsDropped: selfstat.Register(
"aggregate",
"metrics_dropped",
tags,
),
PushTime: selfstat.Register(
"aggregate",
"push_time_ns",
tags,
),
log: logger,
}
}
// AggregatorConfig is the common config for all aggregators.
type AggregatorConfig struct {
Name string
Alias string
DropOriginal bool
Period time.Duration
Delay time.Duration
Grace time.Duration
NameOverride string
MeasurementPrefix string
MeasurementSuffix string
Tags map[string]string
Filter Filter
}
func (r *RunningAggregator) LogName() string {
return logName("aggregators", r.Config.Name, r.Config.Alias)
}
func (r *RunningAggregator) Init() error {
if p, ok := r.Aggregator.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
}
return nil
}
func (r *RunningAggregator) Period() time.Duration {
return r.Config.Period
}
func (r *RunningAggregator) EndPeriod() time.Time {
return r.periodEnd
}
func (r *RunningAggregator) UpdateWindow(start, until time.Time) {
r.periodStart = start
r.periodEnd = until
r.log.Debugf("Updated aggregation range [%s, %s]", start, until)
}
func (r *RunningAggregator) MakeMetric(metric telegraf.Metric) telegraf.Metric {
m := makemetric(
metric,
r.Config.NameOverride,
r.Config.MeasurementPrefix,
r.Config.MeasurementSuffix,
r.Config.Tags,
nil)
if m != nil {
m.SetAggregate(true)
}
r.MetricsPushed.Incr(1)
return m
}
// Add a metric to the aggregator and return true if the original metric
// should be dropped.
func (r *RunningAggregator) Add(m telegraf.Metric) bool {
if ok := r.Config.Filter.Select(m); !ok {
return false
}
// Make a copy of the metric but don't retain tracking. We do not fail a
// delivery due to the aggregation not being sent because we can't create
// aggregations of historical data. Additionally, waiting for the
// aggregation to be pushed would introduce a hefty latency to delivery.
m = metric.FromMetric(m)
r.Config.Filter.Modify(m)
if len(m.FieldList()) == 0 {
r.MetricsFiltered.Incr(1)
return r.Config.DropOriginal
}
r.Lock()
defer r.Unlock()
if m.Time().Before(r.periodStart.Add(-r.Config.Grace)) || m.Time().After(r.periodEnd.Add(r.Config.Delay)) {
r.log.Debugf("Metric is outside aggregation window; discarding. %s: m: %s e: %s g: %s",
m.Time(), r.periodStart, r.periodEnd, r.Config.Grace)
r.MetricsDropped.Incr(1)
return r.Config.DropOriginal
}
r.Aggregator.Add(m)
return r.Config.DropOriginal
}
func (r *RunningAggregator) Push(acc telegraf.Accumulator) {
r.Lock()
defer r.Unlock()
since := r.periodEnd
until := r.periodEnd.Add(r.Config.Period)
r.UpdateWindow(since, until)
r.push(acc)
r.Aggregator.Reset()
}
func (r *RunningAggregator) push(acc telegraf.Accumulator) {
start := time.Now()
r.Aggregator.Push(acc)
elapsed := time.Since(start)
r.PushTime.Incr(elapsed.Nanoseconds())
}
func (r *RunningAggregator) Log() telegraf.Logger {
return r.log
}

View File

@@ -0,0 +1,264 @@
package models
import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestAdd(t *testing.T) {
a := &TestAggregator{}
ra := NewRunningAggregator(a, &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{
NamePass: []string{"*"},
},
Period: time.Millisecond * 500,
})
require.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}
now := time.Now()
ra.UpdateWindow(now, now.Add(ra.Config.Period))
m := testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
time.Now().Add(time.Millisecond*150),
telegraf.Untyped)
require.False(t, ra.Add(m))
ra.Push(&acc)
require.Equal(t, 1, len(acc.Metrics))
require.Equal(t, int64(101), acc.Metrics[0].Fields["sum"])
}
func TestAddMetricsOutsideCurrentPeriod(t *testing.T) {
a := &TestAggregator{}
ra := NewRunningAggregator(a, &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{
NamePass: []string{"*"},
},
Period: time.Millisecond * 500,
})
require.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}
now := time.Now()
ra.UpdateWindow(now, now.Add(ra.Config.Period))
m := testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
now.Add(-time.Hour),
telegraf.Untyped,
)
require.False(t, ra.Add(m))
// metric after current period
m = testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
now.Add(time.Hour),
telegraf.Untyped,
)
require.False(t, ra.Add(m))
// "now" metric
m = testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
time.Now().Add(time.Millisecond*50),
telegraf.Untyped)
require.False(t, ra.Add(m))
ra.Push(&acc)
require.Equal(t, 1, len(acc.Metrics))
require.Equal(t, int64(101), acc.Metrics[0].Fields["sum"])
}
func TestAddMetricsOutsideCurrentPeriodWithGrace(t *testing.T) {
a := &TestAggregator{}
ra := NewRunningAggregator(a, &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{
NamePass: []string{"*"},
},
Period: time.Millisecond * 1500,
Grace: time.Millisecond * 500,
})
require.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}
now := time.Now()
ra.UpdateWindow(now, now.Add(ra.Config.Period))
m := testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
now.Add(-time.Hour),
telegraf.Untyped,
)
require.False(t, ra.Add(m))
// metric before current period (late)
m = testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(100),
},
now.Add(-time.Millisecond*1000),
telegraf.Untyped,
)
require.False(t, ra.Add(m))
// metric before current period, but within grace period (late)
m = testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(102),
},
now.Add(-time.Millisecond*200),
telegraf.Untyped,
)
require.False(t, ra.Add(m))
// "now" metric
m = testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
time.Now().Add(time.Millisecond*50),
telegraf.Untyped)
require.False(t, ra.Add(m))
ra.Push(&acc)
require.Equal(t, 1, len(acc.Metrics))
require.Equal(t, int64(203), acc.Metrics[0].Fields["sum"])
}
func TestAddAndPushOnePeriod(t *testing.T) {
a := &TestAggregator{}
ra := NewRunningAggregator(a, &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{
NamePass: []string{"*"},
},
Period: time.Millisecond * 500,
})
require.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}
now := time.Now()
ra.UpdateWindow(now, now.Add(ra.Config.Period))
m := testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
time.Now().Add(time.Millisecond*100),
telegraf.Untyped)
require.False(t, ra.Add(m))
ra.Push(&acc)
acc.AssertContainsFields(t, "TestMetric", map[string]interface{}{"sum": int64(101)})
}
func TestAddDropOriginal(t *testing.T) {
ra := NewRunningAggregator(&TestAggregator{}, &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{
NamePass: []string{"RI*"},
},
DropOriginal: true,
})
require.NoError(t, ra.Config.Filter.Compile())
now := time.Now()
ra.UpdateWindow(now, now.Add(ra.Config.Period))
m := testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
now,
telegraf.Untyped)
require.True(t, ra.Add(m))
// this metric name doesn't match the filter, so Add will return false
m2 := testutil.MustMetric("foobar",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
now,
telegraf.Untyped)
require.False(t, ra.Add(m2))
}
func TestAddDoesNotModifyMetric(t *testing.T) {
ra := NewRunningAggregator(&TestAggregator{}, &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{
FieldPass: []string{"a"},
},
DropOriginal: true,
})
require.NoError(t, ra.Config.Filter.Compile())
now := time.Now()
m := testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"a": int64(42),
"b": int64(42),
},
now)
expected := m.Copy()
ra.Add(m)
testutil.RequireMetricEqual(t, expected, m)
}
type TestAggregator struct {
sum int64
}
func (t *TestAggregator) Description() string { return "" }
func (t *TestAggregator) SampleConfig() string { return "" }
func (t *TestAggregator) Reset() {
t.sum = 0
}
func (t *TestAggregator) Push(acc telegraf.Accumulator) {
acc.AddFields("TestMetric",
map[string]interface{}{"sum": t.sum},
map[string]string{},
)
}
func (t *TestAggregator) Add(in telegraf.Metric) {
for _, v := range in.Fields() {
if vi, ok := v.(int64); ok {
t.sum += vi
}
}
}

127
models/running_input.go Normal file
View File

@@ -0,0 +1,127 @@
package models
import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)
var (
GlobalMetricsGathered = selfstat.Register("agent", "metrics_gathered", map[string]string{})
GlobalGatherErrors = selfstat.Register("agent", "gather_errors", map[string]string{})
)
type RunningInput struct {
Input telegraf.Input
Config *InputConfig
log telegraf.Logger
defaultTags map[string]string
MetricsGathered selfstat.Stat
GatherTime selfstat.Stat
}
func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
tags := map[string]string{"input": config.Name}
if config.Alias != "" {
tags["alias"] = config.Alias
}
inputErrorsRegister := selfstat.Register("gather", "errors", tags)
logger := NewLogger("inputs", config.Name, config.Alias)
logger.OnErr(func() {
inputErrorsRegister.Incr(1)
GlobalGatherErrors.Incr(1)
})
setLogIfExist(input, logger)
return &RunningInput{
Input: input,
Config: config,
MetricsGathered: selfstat.Register(
"gather",
"metrics_gathered",
tags,
),
GatherTime: selfstat.RegisterTiming(
"gather",
"gather_time_ns",
tags,
),
log: logger,
}
}
// InputConfig is the common config for all inputs.
type InputConfig struct {
Name string
Alias string
Interval time.Duration
NameOverride string
MeasurementPrefix string
MeasurementSuffix string
Tags map[string]string
Filter Filter
}
func (r *RunningInput) metricFiltered(metric telegraf.Metric) {
metric.Drop()
}
func (r *RunningInput) LogName() string {
return logName("inputs", r.Config.Name, r.Config.Alias)
}
func (r *RunningInput) Init() error {
if p, ok := r.Input.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
}
return nil
}
func (r *RunningInput) MakeMetric(metric telegraf.Metric) telegraf.Metric {
if ok := r.Config.Filter.Select(metric); !ok {
r.metricFiltered(metric)
return nil
}
m := makemetric(
metric,
r.Config.NameOverride,
r.Config.MeasurementPrefix,
r.Config.MeasurementSuffix,
r.Config.Tags,
r.defaultTags)
r.Config.Filter.Modify(metric)
if len(metric.FieldList()) == 0 {
r.metricFiltered(metric)
return nil
}
r.MetricsGathered.Incr(1)
GlobalMetricsGathered.Incr(1)
return m
}
func (r *RunningInput) Gather(acc telegraf.Accumulator) error {
start := time.Now()
err := r.Input.Gather(acc)
elapsed := time.Since(start)
r.GatherTime.Incr(elapsed.Nanoseconds())
return err
}
func (r *RunningInput) SetDefaultTags(tags map[string]string) {
r.defaultTags = tags
}
func (r *RunningInput) Log() telegraf.Logger {
return r.log
}

View File

@@ -0,0 +1,294 @@
package models
import (
"testing"
"time"
"github.com/influxdata/telegraf/selfstat"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMakeMetricFilterAfterApplyingGlobalTags(t *testing.T) {
now := time.Now()
ri := NewRunningInput(&testInput{}, &InputConfig{
Filter: Filter{
TagInclude: []string{"b"},
},
})
require.NoError(t, ri.Config.Filter.Compile())
ri.SetDefaultTags(map[string]string{"a": "x", "b": "y"})
m, err := metric.New("cpu",
map[string]string{},
map[string]interface{}{
"value": 42,
},
now)
require.NoError(t, err)
actual := ri.MakeMetric(m)
expected, err := metric.New("cpu",
map[string]string{
"b": "y",
},
map[string]interface{}{
"value": 42,
},
now)
require.NoError(t, err)
testutil.RequireMetricEqual(t, expected, actual)
}
func TestMakeMetricNoFields(t *testing.T) {
now := time.Now()
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
})
m, err := metric.New("RITest",
map[string]string{},
map[string]interface{}{},
now,
telegraf.Untyped)
m = ri.MakeMetric(m)
require.NoError(t, err)
assert.Nil(t, m)
}
// nil fields should get dropped
func TestMakeMetricNilFields(t *testing.T) {
now := time.Now()
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
})
m, err := metric.New("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
"nil": nil,
},
now,
telegraf.Untyped)
require.NoError(t, err)
m = ri.MakeMetric(m)
expected, err := metric.New("RITest",
map[string]string{},
map[string]interface{}{
"value": int(101),
},
now,
)
require.NoError(t, err)
require.Equal(t, expected, m)
}
func TestMakeMetricWithPluginTags(t *testing.T) {
now := time.Now()
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
Tags: map[string]string{
"foo": "bar",
},
})
m := testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
now,
telegraf.Untyped)
m = ri.MakeMetric(m)
expected, err := metric.New("RITest",
map[string]string{
"foo": "bar",
},
map[string]interface{}{
"value": 101,
},
now,
)
require.NoError(t, err)
require.Equal(t, expected, m)
}
func TestMakeMetricFilteredOut(t *testing.T) {
now := time.Now()
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
Tags: map[string]string{
"foo": "bar",
},
Filter: Filter{NamePass: []string{"foobar"}},
})
assert.NoError(t, ri.Config.Filter.Compile())
m, err := metric.New("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
now,
telegraf.Untyped)
m = ri.MakeMetric(m)
require.NoError(t, err)
assert.Nil(t, m)
}
func TestMakeMetricWithDaemonTags(t *testing.T) {
now := time.Now()
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
})
ri.SetDefaultTags(map[string]string{
"foo": "bar",
})
m := testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
now,
telegraf.Untyped)
m = ri.MakeMetric(m)
expected, err := metric.New("RITest",
map[string]string{
"foo": "bar",
},
map[string]interface{}{
"value": 101,
},
now,
)
require.NoError(t, err)
require.Equal(t, expected, m)
}
func TestMakeMetricNameOverride(t *testing.T) {
now := time.Now()
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
NameOverride: "foobar",
})
m, err := metric.New("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
now,
telegraf.Untyped)
require.NoError(t, err)
m = ri.MakeMetric(m)
expected, err := metric.New("foobar",
nil,
map[string]interface{}{
"value": 101,
},
now,
)
require.NoError(t, err)
require.Equal(t, expected, m)
}
func TestMakeMetricNamePrefix(t *testing.T) {
now := time.Now()
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
MeasurementPrefix: "foobar_",
})
m, err := metric.New("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
now,
telegraf.Untyped)
require.NoError(t, err)
m = ri.MakeMetric(m)
expected, err := metric.New("foobar_RITest",
nil,
map[string]interface{}{
"value": 101,
},
now,
)
require.NoError(t, err)
require.Equal(t, expected, m)
}
func TestMakeMetricNameSuffix(t *testing.T) {
now := time.Now()
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
MeasurementSuffix: "_foobar",
})
m, err := metric.New("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
now,
telegraf.Untyped)
require.NoError(t, err)
m = ri.MakeMetric(m)
expected, err := metric.New("RITest_foobar",
nil,
map[string]interface{}{
"value": 101,
},
now,
)
require.NoError(t, err)
require.Equal(t, expected, m)
}
func TestMetricErrorCounters(t *testing.T) {
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestMetricErrorCounters",
})
getGatherErrors := func() int64 {
for _, r := range selfstat.Metrics() {
tag, hasTag := r.GetTag("input")
if r.Name() == "internal_gather" && hasTag && tag == "TestMetricErrorCounters" {
errCount, ok := r.GetField("errors")
if !ok {
t.Fatal("Expected error field")
}
return errCount.(int64)
}
}
return 0
}
before := getGatherErrors()
ri.Log().Error("Oh no")
after := getGatherErrors()
require.Greater(t, after, before)
require.GreaterOrEqual(t, int64(1), GlobalGatherErrors.Get())
}
type testInput struct{}
func (t *testInput) Description() string { return "" }
func (t *testInput) SampleConfig() string { return "" }
func (t *testInput) Gather(acc telegraf.Accumulator) error { return nil }

263
models/running_output.go Normal file
View File

@@ -0,0 +1,263 @@
package models
import (
"sync"
"sync/atomic"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)
const (
// Default size of metrics batch size.
DEFAULT_METRIC_BATCH_SIZE = 1000
// Default number of metrics kept. It should be a multiple of batch size.
DEFAULT_METRIC_BUFFER_LIMIT = 10000
)
// OutputConfig containing name and filter
type OutputConfig struct {
Name string
Alias string
Filter Filter
FlushInterval time.Duration
FlushJitter *time.Duration
MetricBufferLimit int
MetricBatchSize int
NameOverride string
NamePrefix string
NameSuffix string
}
// RunningOutput contains the output configuration
type RunningOutput struct {
// Must be 64-bit aligned
newMetricsCount int64
droppedMetrics int64
Output telegraf.Output
Config *OutputConfig
MetricBufferLimit int
MetricBatchSize int
MetricsFiltered selfstat.Stat
WriteTime selfstat.Stat
BatchReady chan time.Time
buffer *Buffer
log telegraf.Logger
aggMutex sync.Mutex
}
func NewRunningOutput(
name string,
output telegraf.Output,
config *OutputConfig,
batchSize int,
bufferLimit int,
) *RunningOutput {
tags := map[string]string{"output": config.Name}
if config.Alias != "" {
tags["alias"] = config.Alias
}
writeErrorsRegister := selfstat.Register("write", "errors", tags)
logger := NewLogger("outputs", config.Name, config.Alias)
logger.OnErr(func() {
writeErrorsRegister.Incr(1)
})
setLogIfExist(output, logger)
if config.MetricBufferLimit > 0 {
bufferLimit = config.MetricBufferLimit
}
if bufferLimit == 0 {
bufferLimit = DEFAULT_METRIC_BUFFER_LIMIT
}
if config.MetricBatchSize > 0 {
batchSize = config.MetricBatchSize
}
if batchSize == 0 {
batchSize = DEFAULT_METRIC_BATCH_SIZE
}
ro := &RunningOutput{
buffer: NewBuffer(config.Name, config.Alias, bufferLimit),
BatchReady: make(chan time.Time, 1),
Output: output,
Config: config,
MetricBufferLimit: bufferLimit,
MetricBatchSize: batchSize,
MetricsFiltered: selfstat.Register(
"write",
"metrics_filtered",
tags,
),
WriteTime: selfstat.RegisterTiming(
"write",
"write_time_ns",
tags,
),
log: logger,
}
return ro
}
func (r *RunningOutput) LogName() string {
return logName("outputs", r.Config.Name, r.Config.Alias)
}
func (ro *RunningOutput) metricFiltered(metric telegraf.Metric) {
ro.MetricsFiltered.Incr(1)
metric.Drop()
}
func (r *RunningOutput) Init() error {
if p, ok := r.Output.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
}
return nil
}
// AddMetric adds a metric to the output.
//
// Takes ownership of metric
func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
if ok := ro.Config.Filter.Select(metric); !ok {
ro.metricFiltered(metric)
return
}
ro.Config.Filter.Modify(metric)
if len(metric.FieldList()) == 0 {
ro.metricFiltered(metric)
return
}
if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
ro.aggMutex.Lock()
output.Add(metric)
ro.aggMutex.Unlock()
return
}
if len(ro.Config.NameOverride) > 0 {
metric.SetName(ro.Config.NameOverride)
}
if len(ro.Config.NamePrefix) > 0 {
metric.AddPrefix(ro.Config.NamePrefix)
}
if len(ro.Config.NameSuffix) > 0 {
metric.AddSuffix(ro.Config.NameSuffix)
}
dropped := ro.buffer.Add(metric)
atomic.AddInt64(&ro.droppedMetrics, int64(dropped))
count := atomic.AddInt64(&ro.newMetricsCount, 1)
if count == int64(ro.MetricBatchSize) {
atomic.StoreInt64(&ro.newMetricsCount, 0)
select {
case ro.BatchReady <- time.Now():
default:
}
}
}
// Write writes all metrics to the output, stopping when all have been sent on
// or error.
func (ro *RunningOutput) Write() error {
if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
ro.aggMutex.Lock()
metrics := output.Push()
ro.buffer.Add(metrics...)
output.Reset()
ro.aggMutex.Unlock()
}
atomic.StoreInt64(&ro.newMetricsCount, 0)
// Only process the metrics in the buffer now. Metrics added while we are
// writing will be sent on the next call.
nBuffer := ro.buffer.Len()
nBatches := nBuffer/ro.MetricBatchSize + 1
for i := 0; i < nBatches; i++ {
batch := ro.buffer.Batch(ro.MetricBatchSize)
if len(batch) == 0 {
break
}
err := ro.write(batch)
if err != nil {
ro.buffer.Reject(batch)
return err
}
ro.buffer.Accept(batch)
}
return nil
}
// WriteBatch writes a single batch of metrics to the output.
func (ro *RunningOutput) WriteBatch() error {
batch := ro.buffer.Batch(ro.MetricBatchSize)
if len(batch) == 0 {
return nil
}
err := ro.write(batch)
if err != nil {
ro.buffer.Reject(batch)
return err
}
ro.buffer.Accept(batch)
return nil
}
// Close closes the output
func (r *RunningOutput) Close() {
err := r.Output.Close()
if err != nil {
r.log.Errorf("Error closing output: %v", err)
}
}
func (r *RunningOutput) write(metrics []telegraf.Metric) error {
dropped := atomic.LoadInt64(&r.droppedMetrics)
if dropped > 0 {
r.log.Warnf("Metric buffer overflow; %d metrics have been dropped", dropped)
atomic.StoreInt64(&r.droppedMetrics, 0)
}
start := time.Now()
err := r.Output.Write(metrics)
elapsed := time.Since(start)
r.WriteTime.Incr(elapsed.Nanoseconds())
if err == nil {
r.log.Debugf("Wrote batch of %d metrics in %s", len(metrics), elapsed)
}
return err
}
func (r *RunningOutput) LogBufferStatus() {
nBuffer := r.buffer.Len()
r.log.Debugf("Buffer fullness: %d / %d metrics", nBuffer, r.MetricBufferLimit)
}
func (r *RunningOutput) Log() telegraf.Logger {
return r.log
}

View File

@@ -0,0 +1,589 @@
package models
import (
"fmt"
"sync"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var first5 = []telegraf.Metric{
testutil.TestMetric(101, "metric1"),
testutil.TestMetric(101, "metric2"),
testutil.TestMetric(101, "metric3"),
testutil.TestMetric(101, "metric4"),
testutil.TestMetric(101, "metric5"),
}
var next5 = []telegraf.Metric{
testutil.TestMetric(101, "metric6"),
testutil.TestMetric(101, "metric7"),
testutil.TestMetric(101, "metric8"),
testutil.TestMetric(101, "metric9"),
testutil.TestMetric(101, "metric10"),
}
func reverse(metrics []telegraf.Metric) []telegraf.Metric {
result := make([]telegraf.Metric, 0, len(metrics))
for i := len(metrics) - 1; i >= 0; i-- {
result = append(result, metrics[i])
}
return result
}
// Benchmark adding metrics.
func BenchmarkRunningOutputAddWrite(b *testing.B) {
conf := &OutputConfig{
Filter: Filter{},
}
m := &perfOutput{}
ro := NewRunningOutput("test", m, conf, 1000, 10000)
for n := 0; n < b.N; n++ {
ro.AddMetric(testutil.TestMetric(101, "metric1"))
ro.Write()
}
}
// Benchmark adding metrics.
func BenchmarkRunningOutputAddWriteEvery100(b *testing.B) {
conf := &OutputConfig{
Filter: Filter{},
}
m := &perfOutput{}
ro := NewRunningOutput("test", m, conf, 1000, 10000)
for n := 0; n < b.N; n++ {
ro.AddMetric(testutil.TestMetric(101, "metric1"))
if n%100 == 0 {
ro.Write()
}
}
}
// Benchmark adding metrics.
func BenchmarkRunningOutputAddFailWrites(b *testing.B) {
conf := &OutputConfig{
Filter: Filter{},
}
m := &perfOutput{}
m.failWrite = true
ro := NewRunningOutput("test", m, conf, 1000, 10000)
for n := 0; n < b.N; n++ {
ro.AddMetric(testutil.TestMetric(101, "metric1"))
}
}
// Test that NameDrop filters ger properly applied.
func TestRunningOutput_DropFilter(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
NameDrop: []string{"metric1", "metric2"},
},
}
assert.NoError(t, conf.Filter.Compile())
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf, 1000, 10000)
for _, metric := range first5 {
ro.AddMetric(metric)
}
for _, metric := range next5 {
ro.AddMetric(metric)
}
assert.Len(t, m.Metrics(), 0)
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 8)
}
// Test that NameDrop filters without a match do nothing.
func TestRunningOutput_PassFilter(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
NameDrop: []string{"metric1000", "foo*"},
},
}
assert.NoError(t, conf.Filter.Compile())
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf, 1000, 10000)
for _, metric := range first5 {
ro.AddMetric(metric)
}
for _, metric := range next5 {
ro.AddMetric(metric)
}
assert.Len(t, m.Metrics(), 0)
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 10)
}
// Test that tags are properly included
func TestRunningOutput_TagIncludeNoMatch(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
TagInclude: []string{"nothing*"},
},
}
assert.NoError(t, conf.Filter.Compile())
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf, 1000, 10000)
ro.AddMetric(testutil.TestMetric(101, "metric1"))
assert.Len(t, m.Metrics(), 0)
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 1)
assert.Empty(t, m.Metrics()[0].Tags())
}
// Test that tags are properly excluded
func TestRunningOutput_TagExcludeMatch(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
TagExclude: []string{"tag*"},
},
}
assert.NoError(t, conf.Filter.Compile())
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf, 1000, 10000)
ro.AddMetric(testutil.TestMetric(101, "metric1"))
assert.Len(t, m.Metrics(), 0)
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 1)
assert.Len(t, m.Metrics()[0].Tags(), 0)
}
// Test that tags are properly Excluded
func TestRunningOutput_TagExcludeNoMatch(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
TagExclude: []string{"nothing*"},
},
}
assert.NoError(t, conf.Filter.Compile())
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf, 1000, 10000)
ro.AddMetric(testutil.TestMetric(101, "metric1"))
assert.Len(t, m.Metrics(), 0)
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 1)
assert.Len(t, m.Metrics()[0].Tags(), 1)
}
// Test that tags are properly included
func TestRunningOutput_TagIncludeMatch(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
TagInclude: []string{"tag*"},
},
}
assert.NoError(t, conf.Filter.Compile())
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf, 1000, 10000)
ro.AddMetric(testutil.TestMetric(101, "metric1"))
assert.Len(t, m.Metrics(), 0)
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 1)
assert.Len(t, m.Metrics()[0].Tags(), 1)
}
// Test that measurement name overriding correctly
func TestRunningOutput_NameOverride(t *testing.T) {
conf := &OutputConfig{
NameOverride: "new_metric_name",
}
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf, 1000, 10000)
ro.AddMetric(testutil.TestMetric(101, "metric1"))
assert.Len(t, m.Metrics(), 0)
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 1)
assert.Equal(t, "new_metric_name", m.Metrics()[0].Name())
}
// Test that measurement name prefix is added correctly
func TestRunningOutput_NamePrefix(t *testing.T) {
conf := &OutputConfig{
NamePrefix: "prefix_",
}
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf, 1000, 10000)
ro.AddMetric(testutil.TestMetric(101, "metric1"))
assert.Len(t, m.Metrics(), 0)
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 1)
assert.Equal(t, "prefix_metric1", m.Metrics()[0].Name())
}
// Test that measurement name suffix is added correctly
func TestRunningOutput_NameSuffix(t *testing.T) {
conf := &OutputConfig{
NameSuffix: "_suffix",
}
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf, 1000, 10000)
ro.AddMetric(testutil.TestMetric(101, "metric1"))
assert.Len(t, m.Metrics(), 0)
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 1)
assert.Equal(t, "metric1_suffix", m.Metrics()[0].Name())
}
// Test that we can write metrics with simple default setup.
func TestRunningOutputDefault(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{},
}
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf, 1000, 10000)
for _, metric := range first5 {
ro.AddMetric(metric)
}
for _, metric := range next5 {
ro.AddMetric(metric)
}
assert.Len(t, m.Metrics(), 0)
err := ro.Write()
assert.NoError(t, err)
assert.Len(t, m.Metrics(), 10)
}
func TestRunningOutputWriteFail(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{},
}
m := &mockOutput{}
m.failWrite = true
ro := NewRunningOutput("test", m, conf, 4, 12)
// Fill buffer to limit twice
for _, metric := range first5 {
ro.AddMetric(metric)
}
for _, metric := range next5 {
ro.AddMetric(metric)
}
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
// manual write fails
err := ro.Write()
require.Error(t, err)
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
m.failWrite = false
err = ro.Write()
require.NoError(t, err)
assert.Len(t, m.Metrics(), 10)
}
// Verify that the order of points is preserved during a write failure.
func TestRunningOutputWriteFailOrder(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{},
}
m := &mockOutput{}
m.failWrite = true
ro := NewRunningOutput("test", m, conf, 100, 1000)
// add 5 metrics
for _, metric := range first5 {
ro.AddMetric(metric)
}
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
// Write fails
err := ro.Write()
require.Error(t, err)
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
m.failWrite = false
// add 5 more metrics
for _, metric := range next5 {
ro.AddMetric(metric)
}
err = ro.Write()
require.NoError(t, err)
// Verify that 10 metrics were written
assert.Len(t, m.Metrics(), 10)
// Verify that they are in order
expected := append(reverse(next5), reverse(first5)...)
assert.Equal(t, expected, m.Metrics())
}
// Verify that the order of points is preserved during many write failures.
func TestRunningOutputWriteFailOrder2(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{},
}
m := &mockOutput{}
m.failWrite = true
ro := NewRunningOutput("test", m, conf, 5, 100)
// add 5 metrics
for _, metric := range first5 {
ro.AddMetric(metric)
}
// Write fails
err := ro.Write()
require.Error(t, err)
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
// add 5 metrics
for _, metric := range next5 {
ro.AddMetric(metric)
}
// Write fails
err = ro.Write()
require.Error(t, err)
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
// add 5 metrics
for _, metric := range first5 {
ro.AddMetric(metric)
}
// Write fails
err = ro.Write()
require.Error(t, err)
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
// add 5 metrics
for _, metric := range next5 {
ro.AddMetric(metric)
}
// Write fails
err = ro.Write()
require.Error(t, err)
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
m.failWrite = false
err = ro.Write()
require.NoError(t, err)
// Verify that 20 metrics were written
assert.Len(t, m.Metrics(), 20)
// Verify that they are in order
expected := append(reverse(next5), reverse(first5)...)
expected = append(expected, reverse(next5)...)
expected = append(expected, reverse(first5)...)
assert.Equal(t, expected, m.Metrics())
}
// Verify that the order of points is preserved when there is a remainder
// of points for the batch.
func TestRunningOutputWriteFailOrder3(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{},
}
m := &mockOutput{}
m.failWrite = true
ro := NewRunningOutput("test", m, conf, 5, 1000)
// add 5 metrics
for _, metric := range first5 {
ro.AddMetric(metric)
}
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
// Write fails
err := ro.Write()
require.Error(t, err)
// no successful flush yet
assert.Len(t, m.Metrics(), 0)
// add and attempt to write a single metric:
ro.AddMetric(next5[0])
err = ro.Write()
require.Error(t, err)
// unset fail and write metrics
m.failWrite = false
err = ro.Write()
require.NoError(t, err)
// Verify that 6 metrics were written
assert.Len(t, m.Metrics(), 6)
// Verify that they are in order
expected := []telegraf.Metric{next5[0], first5[4], first5[3], first5[2], first5[1], first5[0]}
assert.Equal(t, expected, m.Metrics())
}
func TestInternalMetrics(t *testing.T) {
_ = NewRunningOutput(
"test_internal",
&mockOutput{},
&OutputConfig{
Filter: Filter{},
Name: "test_name",
Alias: "test_alias",
},
5,
10)
expected := []telegraf.Metric{
testutil.MustMetric(
"internal_write",
map[string]string{
"output": "test_name",
"alias": "test_alias",
},
map[string]interface{}{
"buffer_limit": 10,
"buffer_size": 0,
"errors": 0,
"metrics_added": 0,
"metrics_dropped": 0,
"metrics_filtered": 0,
"metrics_written": 0,
"write_time_ns": 0,
},
time.Unix(0, 0),
),
}
var actual []telegraf.Metric
for _, m := range selfstat.Metrics() {
output, _ := m.GetTag("output")
if m.Name() == "internal_write" && output == "test_name" {
actual = append(actual, m)
}
}
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
}
type mockOutput struct {
sync.Mutex
metrics []telegraf.Metric
// if true, mock a write failure
failWrite bool
}
func (m *mockOutput) Connect() error {
return nil
}
func (m *mockOutput) Close() error {
return nil
}
func (m *mockOutput) Description() string {
return ""
}
func (m *mockOutput) SampleConfig() string {
return ""
}
func (m *mockOutput) Write(metrics []telegraf.Metric) error {
m.Lock()
defer m.Unlock()
if m.failWrite {
return fmt.Errorf("Failed Write!")
}
if m.metrics == nil {
m.metrics = []telegraf.Metric{}
}
for _, metric := range metrics {
m.metrics = append(m.metrics, metric)
}
return nil
}
func (m *mockOutput) Metrics() []telegraf.Metric {
m.Lock()
defer m.Unlock()
return m.metrics
}
type perfOutput struct {
// if true, mock a write failure
failWrite bool
}
func (m *perfOutput) Connect() error {
return nil
}
func (m *perfOutput) Close() error {
return nil
}
func (m *perfOutput) Description() string {
return ""
}
func (m *perfOutput) SampleConfig() string {
return ""
}
func (m *perfOutput) Write(metrics []telegraf.Metric) error {
if m.failWrite {
return fmt.Errorf("Failed Write!")
}
return nil
}

104
models/running_processor.go Normal file
View File

@@ -0,0 +1,104 @@
package models
import (
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)
type RunningProcessor struct {
sync.Mutex
log telegraf.Logger
Processor telegraf.Processor
Config *ProcessorConfig
}
type RunningProcessors []*RunningProcessor
func (rp RunningProcessors) Len() int { return len(rp) }
func (rp RunningProcessors) Swap(i, j int) { rp[i], rp[j] = rp[j], rp[i] }
func (rp RunningProcessors) Less(i, j int) bool { return rp[i].Config.Order < rp[j].Config.Order }
// FilterConfig containing a name and filter
type ProcessorConfig struct {
Name string
Alias string
Order int64
Filter Filter
}
func NewRunningProcessor(processor telegraf.Processor, config *ProcessorConfig) *RunningProcessor {
tags := map[string]string{"processor": config.Name}
if config.Alias != "" {
tags["alias"] = config.Alias
}
processErrorsRegister := selfstat.Register("process", "errors", tags)
logger := NewLogger("processors", config.Name, config.Alias)
logger.OnErr(func() {
processErrorsRegister.Incr(1)
})
setLogIfExist(processor, logger)
return &RunningProcessor{
Processor: processor,
Config: config,
log: logger,
}
}
func (rp *RunningProcessor) metricFiltered(metric telegraf.Metric) {
metric.Drop()
}
func containsMetric(item telegraf.Metric, metrics []telegraf.Metric) bool {
for _, m := range metrics {
if item == m {
return true
}
}
return false
}
func (r *RunningProcessor) Init() error {
if p, ok := r.Processor.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
}
return nil
}
func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
rp.Lock()
defer rp.Unlock()
ret := []telegraf.Metric{}
for _, metric := range in {
// In processors when a filter selects a metric it is sent through the
// processor. Otherwise the metric continues downstream unmodified.
if ok := rp.Config.Filter.Select(metric); !ok {
ret = append(ret, metric)
continue
}
rp.Config.Filter.Modify(metric)
if len(metric.FieldList()) == 0 {
rp.metricFiltered(metric)
continue
}
// This metric should pass through the filter, so call the filter Apply
// function and append results to the output slice.
ret = append(ret, rp.Processor.Apply(metric)...)
}
return ret
}
func (r *RunningProcessor) Log() telegraf.Logger {
return r.log
}

View File

@@ -0,0 +1,189 @@
package models
import (
"sort"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
// MockProcessor is a Processor with an overrideable Apply implementation.
type MockProcessor struct {
ApplyF func(in ...telegraf.Metric) []telegraf.Metric
}
func (p *MockProcessor) SampleConfig() string {
return ""
}
func (p *MockProcessor) Description() string {
return ""
}
func (p *MockProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
return p.ApplyF(in...)
}
// TagProcessor returns a Processor whose Apply function adds the tag and
// value.
func TagProcessor(key, value string) *MockProcessor {
return &MockProcessor{
ApplyF: func(in ...telegraf.Metric) []telegraf.Metric {
for _, m := range in {
m.AddTag(key, value)
}
return in
},
}
}
func TestRunningProcessor_Apply(t *testing.T) {
type args struct {
Processor telegraf.Processor
Config *ProcessorConfig
}
tests := []struct {
name string
args args
input []telegraf.Metric
expected []telegraf.Metric
}{
{
name: "inactive filter applies metrics",
args: args{
Processor: TagProcessor("apply", "true"),
Config: &ProcessorConfig{
Filter: Filter{},
},
},
input: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
expected: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"apply": "true",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
},
{
name: "filter applies",
args: args{
Processor: TagProcessor("apply", "true"),
Config: &ProcessorConfig{
Filter: Filter{
NamePass: []string{"cpu"},
},
},
},
input: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
expected: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"apply": "true",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
},
{
name: "filter doesn't apply",
args: args{
Processor: TagProcessor("apply", "true"),
Config: &ProcessorConfig{
Filter: Filter{
NameDrop: []string{"cpu"},
},
},
},
input: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
expected: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rp := &RunningProcessor{
Processor: tt.args.Processor,
Config: tt.args.Config,
}
rp.Config.Filter.Compile()
actual := rp.Apply(tt.input...)
require.Equal(t, tt.expected, actual)
})
}
}
func TestRunningProcessor_Order(t *testing.T) {
rp1 := &RunningProcessor{
Config: &ProcessorConfig{
Order: 1,
},
}
rp2 := &RunningProcessor{
Config: &ProcessorConfig{
Order: 2,
},
}
rp3 := &RunningProcessor{
Config: &ProcessorConfig{
Order: 3,
},
}
procs := RunningProcessors{rp2, rp3, rp1}
sort.Sort(procs)
require.Equal(t,
RunningProcessors{rp1, rp2, rp3},
procs)
}