Add grace period for metrics late for aggregation (#6049)

This commit is contained in:
pberlowski 2019-07-31 20:52:12 +01:00 committed by Daniel Nelson
parent 282c8ce096
commit dc292b73a9
4 changed files with 84 additions and 3 deletions

View File

@ -325,6 +325,10 @@ Parameters that can be used with any aggregator plugin:
how long for aggregators to wait before receiving metrics from input
plugins, in the case that aggregators are flushing and inputs are gathering
on the same interval.
- **grace**: The duration when the metrics will still be aggregated
by the plugin, even though they're outside of the aggregation period. This
is needed in a situation when the agent is expected to receive late metrics
and it's acceptable to roll them up into next aggregation period.
- **drop_original**: If true, the original metric will be dropped by the
aggregator and will not get sent to the output plugins.
- **name_override**: Override the base name of the measurement. (Default is

View File

@ -1025,6 +1025,7 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
Name: name,
Delay: time.Millisecond * 100,
Period: time.Second * 30,
Grace: time.Second * 0,
}
if node, ok := tbl.Fields["period"]; ok {
@ -1053,6 +1054,18 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
}
}
if node, ok := tbl.Fields["grace"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}
conf.Grace = dur
}
}
}
if node, ok := tbl.Fields["drop_original"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if b, ok := kv.Value.(*ast.Boolean); ok {
@ -1100,6 +1113,7 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
delete(tbl.Fields, "period")
delete(tbl.Fields, "delay")
delete(tbl.Fields, "grace")
delete(tbl.Fields, "drop_original")
delete(tbl.Fields, "name_prefix")
delete(tbl.Fields, "name_suffix")

View File

@ -59,6 +59,7 @@ type AggregatorConfig struct {
DropOriginal bool
Period time.Duration
Delay time.Duration
Grace time.Duration
NameOverride string
MeasurementPrefix string
@ -135,9 +136,9 @@ func (r *RunningAggregator) Add(m telegraf.Metric) bool {
r.Lock()
defer r.Unlock()
if m.Time().Before(r.periodStart) || m.Time().After(r.periodEnd.Add(r.Config.Delay)) {
log.Printf("D! [%s] metric is outside aggregation window; discarding. %s: m: %s e: %s",
r.Name(), m.Time(), r.periodStart, r.periodEnd)
if m.Time().Before(r.periodStart.Add(-r.Config.Grace)) || m.Time().After(r.periodEnd.Add(r.Config.Delay)) {
log.Printf("D! [%s] metric is outside aggregation window; discarding. %s: m: %s e: %s g: %s",
r.Name(), m.Time(), r.periodStart, r.periodEnd, r.Config.Grace)
r.MetricsDropped.Incr(1)
return r.Config.DropOriginal
}

View File

@ -89,6 +89,68 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) {
require.Equal(t, int64(101), acc.Metrics[0].Fields["sum"])
}
func TestAddMetricsOutsideCurrentPeriodWithGrace(t *testing.T) {
a := &TestAggregator{}
ra := NewRunningAggregator(a, &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{
NamePass: []string{"*"},
},
Period: time.Millisecond * 1500,
Grace: time.Millisecond * 500,
})
require.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}
now := time.Now()
ra.UpdateWindow(now, now.Add(ra.Config.Period))
m := testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
now.Add(-time.Hour),
telegraf.Untyped,
)
require.False(t, ra.Add(m))
// metric before current period (late)
m = testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(100),
},
now.Add(-time.Millisecond*1000),
telegraf.Untyped,
)
require.False(t, ra.Add(m))
// metric before current period, but within grace period (late)
m = testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(102),
},
now.Add(-time.Millisecond*200),
telegraf.Untyped,
)
require.False(t, ra.Add(m))
// "now" metric
m = testutil.MustMetric("RITest",
map[string]string{},
map[string]interface{}{
"value": int64(101),
},
time.Now().Add(time.Millisecond*50),
telegraf.Untyped)
require.False(t, ra.Add(m))
ra.Push(&acc)
require.Equal(t, 1, len(acc.Metrics))
require.Equal(t, int64(203), acc.Metrics[0].Fields["sum"])
}
func TestAddAndPushOnePeriod(t *testing.T) {
a := &TestAggregator{}
ra := NewRunningAggregator(a, &AggregatorConfig{