2016-01-27 21:21:36 +00:00
|
|
|
package internal_models
|
2016-01-22 18:54:12 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"log"
|
|
|
|
"time"
|
|
|
|
|
2016-01-27 21:21:36 +00:00
|
|
|
"github.com/influxdata/telegraf"
|
2016-01-22 18:54:12 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// DEFAULT_POINT_BUFFER_LIMIT is the PointBufferLimit that NewRunningOutput
// assigns to every RunningOutput it creates.
const DEFAULT_POINT_BUFFER_LIMIT = 10000
|
|
|
|
|
|
|
|
type RunningOutput struct {
|
|
|
|
Name string
|
2016-01-27 21:21:36 +00:00
|
|
|
Output telegraf.Output
|
2016-01-22 18:54:12 +00:00
|
|
|
Config *OutputConfig
|
|
|
|
Quiet bool
|
|
|
|
PointBufferLimit int
|
|
|
|
|
2016-01-27 23:15:14 +00:00
|
|
|
metrics []telegraf.Metric
|
2016-01-22 18:54:12 +00:00
|
|
|
overwriteCounter int
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewRunningOutput(
|
|
|
|
name string,
|
2016-01-27 21:21:36 +00:00
|
|
|
output telegraf.Output,
|
2016-01-22 18:54:12 +00:00
|
|
|
conf *OutputConfig,
|
|
|
|
) *RunningOutput {
|
|
|
|
ro := &RunningOutput{
|
|
|
|
Name: name,
|
2016-01-27 23:15:14 +00:00
|
|
|
metrics: make([]telegraf.Metric, 0),
|
2016-01-22 18:54:12 +00:00
|
|
|
Output: output,
|
|
|
|
Config: conf,
|
|
|
|
PointBufferLimit: DEFAULT_POINT_BUFFER_LIMIT,
|
|
|
|
}
|
|
|
|
return ro
|
|
|
|
}
|
|
|
|
|
2016-01-27 23:15:14 +00:00
|
|
|
func (ro *RunningOutput) AddPoint(point telegraf.Metric) {
|
2016-01-22 18:54:12 +00:00
|
|
|
if ro.Config.Filter.IsActive {
|
2016-01-27 23:15:14 +00:00
|
|
|
if !ro.Config.Filter.ShouldMetricPass(point) {
|
2016-01-22 18:54:12 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-01-27 23:15:14 +00:00
|
|
|
if len(ro.metrics) < ro.PointBufferLimit {
|
|
|
|
ro.metrics = append(ro.metrics, point)
|
2016-01-22 18:54:12 +00:00
|
|
|
} else {
|
2016-01-28 20:23:19 +00:00
|
|
|
log.Printf("WARNING: overwriting cached metrics, you may want to " +
|
|
|
|
"increase the metric_buffer_limit setting in your [agent] config " +
|
|
|
|
"if you do not wish to overwrite metrics.\n")
|
2016-01-27 23:15:14 +00:00
|
|
|
if ro.overwriteCounter == len(ro.metrics) {
|
2016-01-22 18:54:12 +00:00
|
|
|
ro.overwriteCounter = 0
|
|
|
|
}
|
2016-01-27 23:15:14 +00:00
|
|
|
ro.metrics[ro.overwriteCounter] = point
|
2016-01-22 18:54:12 +00:00
|
|
|
ro.overwriteCounter++
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ro *RunningOutput) Write() error {
|
|
|
|
start := time.Now()
|
2016-01-27 23:15:14 +00:00
|
|
|
err := ro.Output.Write(ro.metrics)
|
2016-01-22 18:54:12 +00:00
|
|
|
elapsed := time.Since(start)
|
|
|
|
if err == nil {
|
|
|
|
if !ro.Quiet {
|
|
|
|
log.Printf("Wrote %d metrics to output %s in %s\n",
|
2016-01-27 23:15:14 +00:00
|
|
|
len(ro.metrics), ro.Name, elapsed)
|
2016-01-22 18:54:12 +00:00
|
|
|
}
|
2016-01-27 23:15:14 +00:00
|
|
|
ro.metrics = make([]telegraf.Metric, 0)
|
2016-01-22 18:54:12 +00:00
|
|
|
ro.overwriteCounter = 0
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// OutputConfig containing name and filter
|
|
|
|
type OutputConfig struct {
|
|
|
|
Name string
|
|
|
|
Filter Filter
|
|
|
|
}
|