Added request_aggregates input plugin
This commit is contained in:
@ -0,0 +1,90 @@
# Request aggregates plugin
The request aggregates plugin generates a set of aggregate values for a response time column in a CSV file within a
given interval. This is especially useful when calculating throughput of systems with high request frequency
for which storing every single request might require an unnecessary infrastructure. Aggregating values on the client
side minimises the number of writes to the InfluxDB server.
The plugin generates data points at the end of the given window. If no lines were added to the file during a specific
window, no data points are generated.
### Configuration:
# Aggregates values for requests written to a log file
# File to monitor.
file = "/var/server/access.csv"
# Position of the timestamp of the request in every line
timestamp_position = 0
# Format of the timestamp (any layout accepted by Go Time.Parse or s/ms/us/ns for epoch time)
timestamp_format = "ms"
# Position of the time value to calculate in the log file (starting from 0)
time_position = 1
# Window to consider for time percentiles
time_window_size = "60s"
# Windows to keep in memory before flushing in order to avoid requests coming in after a window is shut.
# If the CSV file is sorted by timestamp, this can be set to 1
time_windows = 5
# List of percentiles to calculate
time_percentiles = [90.0, 95.0, 99.0, 99.99]
# Position of the result column (success or failure)
result_position = 3
# Regular expression used to determine if the result is successful or not (if empty only request_aggregates_all
# time series) will be generated
result_success_regex = ".*true.*"
# Time window to calculate throughput counters
throughput_window_size = "1s"
# Number of windows to keep in memory for throughput calculation
throughput_windows = 300
# List of tags and their values to add to every data point
name = "myserver"
### Measurements & Fields:
Note: There are as many `perc[_percentile]` as percentiles defined in the configuration.
- request_aggregates
- requests (integer)
- time_min (float)
- time_max (float)
- time_mean (float)
- time_perc_90 (float)
- time_perc_95 (float)
- [...]
- time_perc_99_99 (float)
- request_aggregates_success
- requests (integer)
- time_min (float)
- time_max (float)
- time_mean (float)
- time_perc_90 (float)
- time_perc_95 (float)
- [...]
- time_perc_99_99 (float)
- request_aggregates_failure
- requests (integer)
- time_min (float)
- time_max (float)
- time_mean (float)
- time_perc_90 (float)
- time_perc_95 (float)
- [...]
- time_perc_99_99 (float)
- request_aggregates_throughput
- requests_total (integer)
- requests_failed (integer)
### Tags:
Tags are user defined in `[inputs.aggregates.tags]`
### Example output:
$ ./telegraf -config telegraf.conf -input-filter request_aggregates -test
request_aggregates,name=myserver requests=186,time_max=380,time_min=86,time_mean=258.54,time_perc_90=200,time_perc_95=220,time_perc_99=225,time_perc_99_99=229 1462270026000000000
request_aggregates_success,name=myserver requests=123,time_max=230,time_min=86,time_mean=120.23,time_perc_90=200,time_perc_95=220,time_perc_99=225,time_perc_99_99=229 1462270026000000000
request_aggregates_failure,name=myserver requests=63,time_max=380,time_min=132,time_mean=298.54,time_perc_90=250,time_perc_95=270,time_perc_99=285,time_perc_99_99=290 1462270026000000000
request_aggregates_throughput,name=myserver requests_total=186,requests_failed=63 1462270026000000000
@ -0,0 +1,67 @@
package request_aggregates
import (
type Request struct {
Timestamp time.Time
Time float64
Failure bool
type RequestParser struct {
TimestampPosition int
TimestampFormat string
IsTimeEpoch bool
TimePosition int
ResultPosition int
SuccessRegexp *regexp.Regexp
// Parses a CSV line and generates a Request
func (rp *RequestParser) ParseLine(line string) (*Request, error) {
var request Request
// Split fields and assign values
reader := strings.NewReader(line)
fields, err := csv.NewReader(reader).Read()
if err != nil {
return nil, fmt.Errorf("ERROR: could not pass CSV line, Error: %s", err)
if rp.ResultPosition < 0 || len(fields) <= rp.ResultPosition ||
rp.TimePosition < 0 || len(fields) <= rp.TimePosition ||
rp.TimestampPosition < 0 || len(fields) <= rp.TimestampPosition {
return nil, fmt.Errorf("ERROR: column position out of range")
if rp.IsTimeEpoch {
var dur time.Duration
dur, err = time.ParseDuration(fields[rp.TimestampPosition] + rp.TimestampFormat)
if err != nil {
return nil, fmt.Errorf("ERROR: could not parse epoch date, Error: %s", err)
request.Timestamp = time.Unix(0, dur.Nanoseconds())
} else {
request.Timestamp, err = time.Parse(rp.TimestampFormat, fields[rp.TimestampPosition])
if err != nil {
return nil, fmt.Errorf("ERROR: could not parse date, Error: %s", err)
request.Time, err = strconv.ParseFloat(fields[rp.TimePosition], 64)
if err != nil {
return nil, fmt.Errorf("ERROR: could not parse time value, Error: %s", err)
if rp.SuccessRegexp != nil {
request.Failure = !rp.SuccessRegexp.MatchString(fields[rp.ResultPosition])
return &request, nil
@ -0,0 +1,356 @@
package request_aggregates
import (
type RequestAggregates struct {
File string
TimestampPosition int
TimestampFormat string
TimePosition int
TimePercentiles []float32
TimeWindowSize internal.Duration
TimeWindows int
ResultPosition int
ResultSuccessRegex string
ThroughputWindowSize internal.Duration
ThroughputWindows int
isTimestampEpoch bool
successRegexp *regexp.Regexp
tailer *tail.Tail
timeWindowSlice []Window
throughputWindowSlice []Window
timeTimer *time.Timer
throughputTimer *time.Timer
stopTimeChan chan bool
stopThroughputChan chan bool
timeMutex sync.Mutex
throughputMutex sync.Mutex
wg sync.WaitGroup
func NewRequestAggregates() *RequestAggregates {
return &RequestAggregates{
TimeWindows: 2,
ThroughputWindows: 10}
const sampleConfig = `
# File to monitor.
file = "/var/server/access.csv"
# Position of the timestamp of the request in every line
timestamp_position = 0
# Format of the timestamp (any layout accepted by Go Time.Parse or s/ms/us/ns for epoch time)
timestamp_format = "ms"
# Position of the time value to calculate in the log file (starting from 0)
time_position = 1
# Window to consider for time percentiles
time_window_size = "60s"
# Windows to keep in memory before flushing in order to avoid requests coming in after a window is shut.
# If the CSV file is sorted by timestamp, this can be set to 1
time_windows = 5
# List of percentiles to calculate
time_percentiles = [90.0, 95.0, 99.0, 99.99]
# Position of the result column (success or failure)
result_position = 3
# Regular expression used to determine if the result is successful or not (if empty only request_aggregates_all
# time series) will be generated
result_success_regex = ".*true.*"
# Time window to calculate throughput counters
throughput_window_size = "1s"
# Number of windows to keep in memory for throughput calculation
throughput_windows = 300
# List of tags and their values to add to every data point
name = "myserver"
func (ra *RequestAggregates) SampleConfig() string {
return sampleConfig
func (ra *RequestAggregates) Description() string {
return "Generates a set of aggregate values for a requests and their response times."
func (ra *RequestAggregates) Gather(acc telegraf.Accumulator) error {
return nil
func (ra *RequestAggregates) Start(acc telegraf.Accumulator) error {
defer ra.Unlock()
err := ra.validateConfig()
if err != nil {
return err
// Create tailer
ra.tailer, err = tail.TailFile(ra.File, tail.Config{
Follow: true,
ReOpen: true,
Location: &tail.SeekInfo{Whence: 2, Offset: 0}})
if err != nil {
return fmt.Errorf("ERROR tailing file %s, Error: %s", ra.File, err)
// Create first time window and start go routine to manage them
now := time.Now()
ra.timeWindowSlice = append(ra.timeWindowSlice, &TimeWindow{
StartTime: now, EndTime: now.Add(ra.TimeWindowSize.Duration),
OnlyTotal: ra.successRegexp == nil, Percentiles: ra.TimePercentiles})
ra.timeTimer = time.NewTimer(ra.TimeWindowSize.Duration)
ra.stopTimeChan = make(chan bool, 1)
go ra.manageTimeWindows(acc)
// Create first throughput window and start go routine to manage them
ra.throughputWindowSlice = append(ra.throughputWindowSlice, &ThroughputWindow{
StartTime: now, EndTime: now.Add(ra.ThroughputWindowSize.Duration)})
ra.throughputTimer = time.NewTimer(ra.ThroughputWindowSize.Duration)
ra.stopThroughputChan = make(chan bool, 1)
go ra.manageThroughputWindows(acc)
// Start go routine to tail the file and put requests in windows
go ra.gatherFromFile(ra.tailer, acc)
return nil
func (ra *RequestAggregates) Stop() {
defer ra.Unlock()
err := ra.tailer.Stop()
if err != nil {
log.Printf("ERROR: could not stop tail on file %s\n", ra.File)
ra.stopTimeChan <- true
ra.stopThroughputChan <- true
// Validates the configuration in the struct
func (ra *RequestAggregates) validateConfig() error {
var err error
// Compile regex to identify success
if ra.ResultSuccessRegex != "" {
ra.successRegexp, err = regexp.Compile(ra.ResultSuccessRegex)
if err != nil {
return fmt.Errorf("ERROR: success regexp is not valid, Error: %s", err)
// Check if timestamp format is valid
switch ra.TimestampFormat {
case "s", "ms", "us", "ns":
ra.isTimestampEpoch = true
if time.Now().Format(ra.TimestampFormat) == ra.TimestampFormat {
return fmt.Errorf("ERROR: incorrect timestamp format")
// Check percentiles are valid
for _, percentile := range ra.TimePercentiles {
if percentile <= 0 || percentile >= 100 {
return fmt.Errorf("ERROR: percentiles must be numbers between 0 and 100 (not inclusive)")
//Check duration of windows
if ra.TimeWindowSize.Duration <= time.Duration(0) || ra.ThroughputWindowSize.Duration <= time.Duration(0) {
return fmt.Errorf("ERROR: windows need to be a positive duration")
// Check number of windows
if ra.TimeWindows <= 0 || ra.ThroughputWindows <= 0 {
return fmt.Errorf("ERROR: at least one window is required")
return nil
// Executed as a go routine, tails a given file and puts the parsed requests into their respective windows.
func (ra *RequestAggregates) gatherFromFile(tailer *tail.Tail, acc telegraf.Accumulator) {
defer ra.wg.Done()
requestParser := &RequestParser{
TimestampPosition: ra.TimestampPosition,
TimestampFormat: ra.TimestampFormat,
IsTimeEpoch: ra.isTimestampEpoch,
TimePosition: ra.TimePosition,
ResultPosition: ra.ResultPosition,
SuccessRegexp: ra.successRegexp}
var err error
var line *tail.Line
var request *Request
for line = range tailer.Lines {
// Parse and validate line
if line.Err != nil {
log.Printf("ERROR: could not tail file %s, Error: %s\n", tailer.Filename, err)
request, err = requestParser.ParseLine(line.Text)
if err != nil {
log.Printf("ERROR: malformed line in %s: [%s], Error: %s\n", tailer.Filename, line.Text, err)
// Wait until the window is created (it is possible that the line is read before the time ticks)
for ra.timeWindowSlice[len(ra.timeWindowSlice)-1].End().Before(request.Timestamp) {
time.Sleep(time.Millisecond * 10)
// Add request to time window
err = addToWindow(ra.timeWindowSlice, request)
if err != nil {
log.Printf("ERROR: could not find a time window, Request: %v, Error %s\n", request, err)
// Wait until the window is created (it is possible that the line is read before the time ticks)
for ra.throughputWindowSlice[len(ra.throughputWindowSlice)-1].End().Before(request.Timestamp) {
time.Sleep(time.Millisecond * 10)
// Add request to throughput window
err = addToWindow(ra.throughputWindowSlice, request)
if err != nil {
log.Printf("ERROR: could not find a throughput window, Request: %v, Error %s\n", request, err)
// Executed as a go routine, manages the windows related to time measures, creating new ones and flushing old ones
func (ra *RequestAggregates) manageTimeWindows(acc telegraf.Accumulator) {
defer ra.wg.Done()
onlyTotal := ra.successRegexp == nil
for {
select {
// If the timer is triggered
case <-ra.timeTimer.C:
// Create new window with the start time of the last one's end time
startTime := ra.timeWindowSlice[len(ra.timeWindowSlice)-1].End()
endTime := startTime.Add(ra.TimeWindowSize.Duration)
ra.timeWindowSlice = append(ra.timeWindowSlice, &TimeWindow{
StartTime: startTime, EndTime: endTime,
OnlyTotal: onlyTotal, Percentiles: ra.TimePercentiles})
// Flush oldest one if necessary
if len(ra.timeWindowSlice) > ra.TimeWindows {
ra.timeWindowSlice = flushWindow(ra.timeWindowSlice, acc)
// Reset time till the end of the window
// If the stop signal is received
case <-ra.stopTimeChan:
ra.timeWindowSlice = flushAllWindows(ra.timeWindowSlice, acc)
// Executed as a go routine, manages the windows related to throughput measures, creating new ones and flushing old ones
func (ra *RequestAggregates) manageThroughputWindows(acc telegraf.Accumulator) {
defer ra.wg.Done()
for {
select {
// If the timer is triggered
case <-ra.throughputTimer.C:
// Create new window with the start time of the last one's end time
startTime := ra.throughputWindowSlice[len(ra.throughputWindowSlice)-1].End()
endTime := startTime.Add(ra.ThroughputWindowSize.Duration)
ra.throughputWindowSlice = append(ra.throughputWindowSlice, &ThroughputWindow{
StartTime: startTime, EndTime: endTime})
// Flush oldest one if necessary
if len(ra.throughputWindowSlice) > ra.ThroughputWindows {
ra.throughputWindowSlice = flushWindow(ra.throughputWindowSlice, acc)
// If the stop signal is received
case <-ra.stopThroughputChan:
ra.throughputWindowSlice = flushAllWindows(ra.throughputWindowSlice, acc)
// Removes the window at the front of the slice of windows and flushes its aggregated metrics to the accumulator
func flushWindow(windows []Window, acc telegraf.Accumulator) []Window {
if len(windows) > 0 {
var window Window
window, windows = windows[0], windows[1:]
metrics, err := window.Aggregate()
if err != nil {
log.Printf("ERROR: could not flush window, Error: %s\n", err)
for _, metric := range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
return windows
// Flushes all windows ot the accumulator
func flushAllWindows(windows []Window, acc telegraf.Accumulator) []Window {
for len(windows) > 0 {
windows = flushWindow(windows, acc)
return windows
// Adds a request to a window, returns and error if it could not be added
func addToWindow(windows []Window, request *Request) error {
if len(windows) == 0 {
return fmt.Errorf("ERROR: no windows found")
first := windows[len(windows)-1]
if first.End().Before(request.Timestamp) {
return fmt.Errorf("ERROR: request is newer than any window")
last := windows[0]
if last.Start().After(request.Timestamp) {
return fmt.Errorf("ERROR: request is older than any window, try adding more windows")
for i := range windows {
window := windows[i]
if (window.Start().Before(request.Timestamp) || window.Start().Equal(request.Timestamp)) &&
window.End().After(request.Timestamp) {
return window.Add(request)
return fmt.Errorf("ERROR: no window could be found")
func init() {
inputs.Add("request_aggregates", func() telegraf.Input {
return NewRequestAggregates()
@ -0,0 +1,385 @@
package request_aggregates
import (
// This tests the start/stop and gather functionality
func TestNewRequestAggregates(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
tmpfile.WriteString(fmt.Sprintf("%v", time.Now().UnixNano()) + ",123\n")
defer tmpfile.Close()
defer os.Remove(tmpfile.Name())
windowSize := internal.Duration{Duration: time.Millisecond * 100}
acc := &testutil.Accumulator{}
ra := &RequestAggregates{
File: tmpfile.Name(),
TimestampFormat: "ns",
TimeWindowSize: windowSize,
TimeWindows: 1,
ThroughputWindowSize: windowSize,
ThroughputWindows: 10,
TimestampPosition: 0,
TimePosition: 1}
// When we start
// A tailer is created
require.NotNil(t, ra.tailer)
require.Equal(t, tmpfile.Name(), ra.tailer.Filename)
// The windows are initialised
require.Equal(t, 1, len(ra.timeWindowSlice))
require.Equal(t, 1, len(ra.throughputWindowSlice))
require.Equal(t, 0, len(acc.Metrics))
// When we put a request in the file
time.Sleep(time.Millisecond * 30)
tmpfile.WriteString(fmt.Sprintf("%v", time.Now().UnixNano()) + ",456\n")
time.Sleep(time.Millisecond * 30)
// One metric is stored in the windows
require.Equal(t, int64(1), ra.throughputWindowSlice[0].(*ThroughputWindow).RequestsTotal)
require.Equal(t, 1, len(ra.timeWindowSlice[0].(*TimeWindow).TimesTotal))
require.Equal(t, float64(456), ra.timeWindowSlice[0].(*TimeWindow).TimesTotal[0])
// After the first window is expired
// One of the windows has flushed one metric
require.Equal(t, 1, len(ra.timeWindowSlice))
require.Equal(t, 2, len(ra.throughputWindowSlice))
require.Equal(t, 1, len(acc.Metrics))
// When we stop
// All the metrics should have been flushed
require.Equal(t, 0, len(ra.timeWindowSlice))
require.Equal(t, 0, len(ra.throughputWindowSlice))
require.Equal(t, 4, len(acc.Metrics))
func TestRequestAggregates_validateConfig(t *testing.T) {
// Empty config
ra := &RequestAggregates{}
require.Error(t, ra.validateConfig())
// Minimum config
ra = &RequestAggregates{
TimestampFormat: "ms",
TimeWindowSize: internal.Duration{Duration: time.Millisecond * 10},
TimeWindows: 2,
ThroughputWindowSize: internal.Duration{Duration: time.Millisecond * 10},
ThroughputWindows: 10}
require.NoError(t, ra.validateConfig())
// Regexp for success
ra.ResultSuccessRegex = "*success.*"
require.Error(t, ra.validateConfig())
ra.ResultSuccessRegex = ".*success.*"
require.NoError(t, ra.validateConfig())
// Time format
ra.TimestampFormat = "thisisnotavalidformat"
require.Error(t, ra.validateConfig())
ra.TimestampFormat = ""
require.Error(t, ra.validateConfig())
ra.TimestampFormat = "Mon Jan _2 15:04:05 2006"
require.NoError(t, ra.validateConfig())
// Percentiles
ra.TimePercentiles = []float32{80, 90, 100}
require.Error(t, ra.validateConfig())
ra.TimePercentiles = []float32{0, 90, 99}
require.Error(t, ra.validateConfig())
ra.TimePercentiles = []float32{80, 90, 99}
require.NoError(t, ra.validateConfig())
// Window size
ra.TimeWindowSize = internal.Duration{Duration: time.Duration(0)}
require.Error(t, ra.validateConfig())
ra.TimeWindowSize = internal.Duration{Duration: time.Duration(-1)}
require.Error(t, ra.validateConfig())
ra.TimeWindowSize = internal.Duration{Duration: time.Duration(1)}
require.NoError(t, ra.validateConfig())
ra.ThroughputWindowSize = internal.Duration{Duration: time.Duration(0)}
require.Error(t, ra.validateConfig())
ra.ThroughputWindowSize = internal.Duration{Duration: time.Duration(-1)}
require.Error(t, ra.validateConfig())
ra.ThroughputWindowSize = internal.Duration{Duration: time.Duration(1)}
require.NoError(t, ra.validateConfig())
// Number of windows
ra.TimeWindows = 0
require.Error(t, ra.validateConfig())
ra.TimeWindows = -1
require.Error(t, ra.validateConfig())
ra.TimeWindows = 1
require.NoError(t, ra.validateConfig())
ra.ThroughputWindows = 0
require.Error(t, ra.validateConfig())
ra.ThroughputWindows = -1
require.Error(t, ra.validateConfig())
ra.ThroughputWindows = 1
require.NoError(t, ra.validateConfig())
func TestRequestAggregates_manageTimeWindows_OnlyTotal(t *testing.T) {
windowSize := internal.Duration{Duration: time.Millisecond * 100}
acc := &testutil.Accumulator{}
now := time.Now()
ra := &RequestAggregates{
TimeWindows: 2,
TimeWindowSize: windowSize,
TimePercentiles: []float32{70, 80, 90},
timeTimer: time.NewTimer(windowSize.Duration),
stopTimeChan: make(chan bool, 1)}
// Add first window and start routine
ra.timeWindowSlice = append(ra.timeWindowSlice, &TimeWindow{
StartTime: now, EndTime: now.Add(windowSize.Duration), OnlyTotal: true, Percentiles: ra.TimePercentiles})
go ra.manageTimeWindows(acc)
// Check values at different points
time.Sleep(time.Millisecond * 30)
require.Equal(t, 1, len(ra.timeWindowSlice))
require.Equal(t, 0, len(acc.Metrics))
require.Equal(t, 2, len(ra.timeWindowSlice))
require.Equal(t, 0, len(acc.Metrics))
require.Equal(t, 2, len(ra.timeWindowSlice))
require.Equal(t, 1, len(acc.Metrics))
require.Equal(t, now.Add(windowSize.Duration), acc.Metrics[0].Time)
// Stop and wait for the process to finish
ra.stopTimeChan <- true
// Check that all metrics were flushed
require.Equal(t, 0, len(ra.timeWindowSlice))
require.Equal(t, 3, len(acc.Metrics))
require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[1].Time)
require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[2].Time)
func TestRequestAggregates_manageTimeWindows_All(t *testing.T) {
windowSize := internal.Duration{Duration: time.Millisecond * 100}
acc := &testutil.Accumulator{}
now := time.Now()
ra := &RequestAggregates{
TimeWindows: 2,
TimeWindowSize: windowSize,
TimePercentiles: []float32{70, 80, 90},
successRegexp: regexp.MustCompile(".*success.*"),
timeTimer: time.NewTimer(windowSize.Duration),
stopTimeChan: make(chan bool, 1)}
// Add first window and start routine
ra.timeWindowSlice = append(ra.timeWindowSlice, &TimeWindow{
StartTime: now, EndTime: now.Add(windowSize.Duration), OnlyTotal: false, Percentiles: ra.TimePercentiles})
go ra.manageTimeWindows(acc)
// Check values at different points
time.Sleep(time.Millisecond * 30)
require.Equal(t, 1, len(ra.timeWindowSlice))
require.Equal(t, 0, len(acc.Metrics))
require.Equal(t, 2, len(ra.timeWindowSlice))
require.Equal(t, 0, len(acc.Metrics))
require.Equal(t, 2, len(ra.timeWindowSlice))
require.Equal(t, 3, len(acc.Metrics))
require.Equal(t, now.Add(windowSize.Duration), acc.Metrics[0].Time)
require.Equal(t, now.Add(windowSize.Duration), acc.Metrics[1].Time)
require.Equal(t, now.Add(windowSize.Duration), acc.Metrics[2].Time)
// Stop and wait for the process to finish
ra.stopTimeChan <- true
// Check that all metrics were flushed
require.Equal(t, 0, len(ra.timeWindowSlice))
require.Equal(t, 9, len(acc.Metrics))
require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[3].Time)
require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[4].Time)
require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[5].Time)
require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[6].Time)
require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[7].Time)
require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[8].Time)
func TestRequestAggregates_manageThroughputWindows(t *testing.T) {
windowSize := internal.Duration{Duration: time.Millisecond * 100}
acc := &testutil.Accumulator{}
now := time.Now()
ra := &RequestAggregates{
ThroughputWindows: 2,
ThroughputWindowSize: windowSize,
throughputTimer: time.NewTimer(windowSize.Duration),
stopThroughputChan: make(chan bool, 1)}
// Add first window and start routine
ra.throughputWindowSlice = append(ra.throughputWindowSlice, &ThroughputWindow{
StartTime: now, EndTime: now.Add(windowSize.Duration)})
go ra.manageThroughputWindows(acc)
// Check values at different points
time.Sleep(time.Millisecond * 30)
require.Equal(t, 1, len(ra.throughputWindowSlice))
require.Equal(t, 0, len(acc.Metrics))
require.Equal(t, 2, len(ra.throughputWindowSlice))
require.Equal(t, 0, len(acc.Metrics))
require.Equal(t, 2, len(ra.throughputWindowSlice))
require.Equal(t, 1, len(acc.Metrics))
require.Equal(t, now.Add(windowSize.Duration), acc.Metrics[0].Time)
// Stop and wait for the process to finish
ra.stopThroughputChan <- true
// Check that all metrics were flushed
require.Equal(t, 0, len(ra.throughputWindowSlice))
require.Equal(t, 3, len(acc.Metrics))
require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[1].Time)
require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[2].Time)
func TestRequestAggregates_flushWindow(t *testing.T) {
acc := &testutil.Accumulator{}
now := time.Now()
windows := []Window{&ThroughputWindow{StartTime: now, EndTime: now.Add(time.Duration(60))}}
windows = flushWindow(windows, acc)
require.Equal(t, 0, len(windows))
require.Equal(t, 1, len(acc.Metrics))
require.Equal(t, MeasurementThroughput, acc.Metrics[0].Measurement)
func TestRequestAggregates_flushAllWindows(t *testing.T) {
acc := &testutil.Accumulator{}
now := time.Now()
windows := []Window{&ThroughputWindow{StartTime: now, EndTime: now.Add(time.Duration(60))},
&ThroughputWindow{StartTime: now.Add(time.Duration(60)), EndTime: now.Add(time.Duration(120))},
&ThroughputWindow{StartTime: now.Add(time.Duration(120)), EndTime: now.Add(time.Duration(180))}}
windows = flushAllWindows(windows, acc)
require.Equal(t, 0, len(windows))
require.Equal(t, 3, len(acc.Metrics))
func TestRequestAggregates_addToWindow(t *testing.T) {
now := time.Now()
var windows []Window
// Error if there are no windows (not added)
err := addToWindow(windows, &Request{Timestamp: now.Add(time.Duration(30))})
require.Error(t, err)
// Okay when one window
firstWindow := &ThroughputWindow{StartTime: now, EndTime: now.Add(time.Duration(60))}
windows = append(windows, firstWindow)
err = addToWindow(windows, &Request{Timestamp: now.Add(time.Duration(30))})
require.NoError(t, err)
require.Equal(t, int64(1), firstWindow.RequestsTotal)
// Okay when timestamp equal to start of window
err = addToWindow(windows, &Request{Timestamp: now})
require.NoError(t, err)
require.Equal(t, int64(2), firstWindow.RequestsTotal)
// Error when timestamp equal to end of window
err = addToWindow(windows, &Request{Timestamp: now.Add(time.Duration(60))})
require.Error(t, err)
// Okay with more windows
middleWindow := &ThroughputWindow{StartTime: now.Add(time.Duration(60)), EndTime: now.Add(time.Duration(120))}
lastWindow := &ThroughputWindow{StartTime: now.Add(time.Duration(120)), EndTime: now.Add(time.Duration(180))}
windows = append(windows, middleWindow)
windows = append(windows, lastWindow)
err = addToWindow(windows, &Request{Timestamp: now.Add(time.Duration(90))})
require.NoError(t, err)
require.Equal(t, int64(1), middleWindow.RequestsTotal)
err = addToWindow(windows, &Request{Timestamp: now.Add(time.Duration(150))})
require.NoError(t, err)
require.Equal(t, int64(1), lastWindow.RequestsTotal)
// Error when later than last window
err = addToWindow(windows, &Request{Timestamp: now.Add(time.Duration(220))})
require.Error(t, err)
// Error when before first window
err = addToWindow(windows, &Request{Timestamp: now.Add(time.Duration(-20))})
require.Error(t, err)
@ -0,0 +1,144 @@
package request_aggregates
import (
func TestRequestParser_ParseLine_Nanos(t *testing.T) {
rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "ns", IsTimeEpoch: true, TimePosition: 1}
// Test format nanoseconds
r, err := rp.ParseLine("1462380541003228260,123,\"thisissuccessful\"")
require.NoError(t, err)
require.Equal(t, time.Unix(0, 1462380541003228260), r.Timestamp)
func TestRequestParser_ParseLine_Micros(t *testing.T) {
rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "us", IsTimeEpoch: true, TimePosition: 1}
// Test format nanoseconds
r, err := rp.ParseLine("1462380541003228,123,\"thisissuccessful\"")
require.NoError(t, err)
require.Equal(t, time.Unix(0, 1462380541003228000), r.Timestamp)
func TestRequestParser_ParseLine_Milis(t *testing.T) {
rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "ms", IsTimeEpoch: true, TimePosition: 1}
// Test format nanoseconds
r, err := rp.ParseLine("1462380541003,123,\"thisissuccessful\"")
require.NoError(t, err)
require.Equal(t, time.Unix(0, 1462380541003000000), r.Timestamp)
func TestRequestParser_ParseLine_Seconds(t *testing.T) {
rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 1}
// Test format nanoseconds
r, err := rp.ParseLine("1462380541,123,\"thisissuccessful\"")
require.NoError(t, err)
require.Equal(t, time.Unix(1462380541, 0), r.Timestamp)
func TestRequestParser_ParseLine_WrongUnit(t *testing.T) {
rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 1}
// Test format nanoseconds
_, err := rp.ParseLine("1462380541003228260,123,\"thisissuccessful\"")
require.Error(t, err)
func TestRequestParser_ParseLine_Layout(t *testing.T) {
rp := &RequestParser{TimestampPosition: 0, TimestampFormat: time.RFC3339Nano,
IsTimeEpoch: false, TimePosition: 1}
// Test format nanoseconds
r, err := rp.ParseLine("2006-01-02T15:04:05.999999999Z,123,\"thisissuccessful\"")
require.NoError(t, err)
parsed, _ := time.Parse(time.RFC3339Nano, "2006-01-02T15:04:05.999999999Z")
require.Equal(t, parsed, r.Timestamp)
func TestRequestParser_ParseLine_WrongLayout(t *testing.T) {
rp := &RequestParser{TimestampPosition: 0, TimestampFormat: time.RFC3339Nano,
IsTimeEpoch: false, TimePosition: 1}
// Test format nanoseconds
_, err := rp.ParseLine("2006-01-02T15:04:05,123,\"thisissuccessful\"")
require.Error(t, err)
func TestRequestParser_ParseLine_Int(t *testing.T) {
rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 1}
// Test format nanoseconds
r, err := rp.ParseLine("1462380541,123,\"thisissuccessful\"")
require.NoError(t, err)
require.Equal(t, float64(123), r.Time)
func TestRequestParser_ParseLine_Float(t *testing.T) {
rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 1}
// Test format nanoseconds
r, err := rp.ParseLine("1462380541,123.45,\"thisissuccessful\"")
require.NoError(t, err)
require.Equal(t, float64(123.45), r.Time)
func TestRequestParser_ParseLine_NoRegexp(t *testing.T) {
rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 1}
// Test format nanoseconds
r, err := rp.ParseLine("1462380541,123.45,\"thisissuccessful\"")
require.NoError(t, err)
require.Equal(t, false, r.Failure)
func TestRequestParser_ParseLine_Success(t *testing.T) {
rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 1,
ResultPosition: 2, SuccessRegexp: regexp.MustCompile(".*success.*")}
// Test format nanoseconds
r, err := rp.ParseLine("1462380541,123.45,\"thisissuccessful\"")
require.NoError(t, err)
require.Equal(t, false, r.Failure)
func TestRequestParser_ParseLine_Failure(t *testing.T) {
rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 1,
ResultPosition: 2, SuccessRegexp: regexp.MustCompile(".*success.*")}
// Test format nanoseconds
r, err := rp.ParseLine("1462380541,123.45,\"thisonefailed\"")
require.NoError(t, err)
require.Equal(t, true, r.Failure)
func TestRequestParser_ParseLine_TimestampOutOfBounds(t *testing.T) {
rp := &RequestParser{TimestampPosition: 6, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 1}
// Test format nanoseconds
_, err := rp.ParseLine("1462380541,123.45,\"thisissuccessful\"")
require.Error(t, err)
func TestRequestParser_ParseLine_TimeOutOfBounds(t *testing.T) {
rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 6}
// Test format nanoseconds
_, err := rp.ParseLine("1462380541,123.45,\"thisissuccessful\"")
require.Error(t, err)
func TestRequestParser_ParseLine_SuccessOutOfBounds(t *testing.T) {
rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 1,
ResultPosition: 8, SuccessRegexp: regexp.MustCompile(".*success.*")}
// Test format nanoseconds
_, err := rp.ParseLine("1462380541,123.45,\"thisissuccessful\"")
require.Error(t, err)
@ -0,0 +1,142 @@
package request_aggregates
import (
const (
MeasurementTime = "request_aggregates_total"
MeasurementTimeFail = "request_aggregates_fail"
MeasurementTimeSuccess = "request_aggregates_success"
FieldTimeRequests = "requests"
FieldTimeMin = "time_min"
FieldTimeMax = "time_max"
FieldTimeMean = "time_mean"
FieldTimePerc = "time_perc_"
MeasurementThroughput = "request_aggregates_throughput"
FieldThroughputTotal = "requests_total"
FieldThroughputFailed = "requests_failed"
type Window interface {
Aggregate() ([]telegraf.Metric, error)
Add(request *Request) error
Start() time.Time
End() time.Time
type TimeWindow struct {
StartTime time.Time
EndTime time.Time
TimesTotal []float64
TimesSuccess []float64
TimesFail []float64
Percentiles []float32
OnlyTotal bool
type ThroughputWindow struct {
StartTime time.Time
EndTime time.Time
RequestsTotal int64
RequestsFail int64
func (tw *TimeWindow) Aggregate() ([]telegraf.Metric, error) {
metrics := make([]telegraf.Metric, 3)
var err error
metrics[0], err = aggregateTimes(MeasurementTime, tw.TimesTotal, tw.Percentiles, tw.EndTime)
if err != nil {
return metrics, err
if !tw.OnlyTotal {
metrics[1], err = aggregateTimes(MeasurementTimeFail, tw.TimesFail, tw.Percentiles, tw.EndTime)
if err != nil {
return metrics, err
metrics[2], err = aggregateTimes(MeasurementTimeSuccess, tw.TimesSuccess, tw.Percentiles, tw.EndTime)
} else {
metrics = metrics[:1]
return metrics, err
func (tw *TimeWindow) Add(request *Request) error {
tw.TimesTotal = append(tw.TimesTotal, request.Time)
if !tw.OnlyTotal {
if request.Failure {
tw.TimesFail = append(tw.TimesFail, request.Time)
} else {
tw.TimesSuccess = append(tw.TimesSuccess, request.Time)
return nil
func (tw *TimeWindow) Start() time.Time {
return tw.StartTime
func (tw *TimeWindow) End() time.Time {
return tw.EndTime
func (tw *ThroughputWindow) Aggregate() ([]telegraf.Metric, error) {
metrics := make([]telegraf.Metric, 1)
metric, err := telegraf.NewMetric(MeasurementThroughput, nil, map[string]interface{}{
FieldThroughputTotal: tw.RequestsTotal,
FieldThroughputFailed: tw.RequestsFail}, tw.EndTime)
metrics[0] = metric
return metrics, err
func (tw *ThroughputWindow) Add(request *Request) error {
if request.Failure {
return nil
func (tw *ThroughputWindow) Start() time.Time {
return tw.StartTime
func (tw *ThroughputWindow) End() time.Time {
return tw.EndTime
// Produces a metric with the aggregates for the given times and percentiles
func aggregateTimes(name string, times []float64, percentiles []float32, endTime time.Time) (telegraf.Metric, error) {
fields := map[string]interface{}{FieldTimeRequests: len(times)}
if len(times) > 0 {
fields[FieldTimeMin] = times[0]
fields[FieldTimeMax] = times[len(times)-1]
totalSum := float64(0)
for _, time := range times {
totalSum += time
fields[FieldTimeMean] = totalSum / float64(len(times))
for _, perc := range percentiles {
i := int(float64(len(times)) * float64(perc) / float64(100))
if i < 0 {
i = 0
fields[FieldTimePerc+strings.Replace(fmt.Sprintf("%v", perc), ".", "_", -1)] = times[i]
return telegraf.NewMetric(name, nil, fields, endTime)
@ -0,0 +1,126 @@
package request_aggregates
import (
func TestTimeWindow_Add(t *testing.T) {
tw := &TimeWindow{}
tw.Add(&Request{Time: 123.45})
require.Equal(t, 1, len(tw.TimesTotal))
require.Equal(t, 1, len(tw.TimesSuccess))
require.Equal(t, 0, len(tw.TimesFail))
require.Equal(t, float64(123.45), tw.TimesTotal[0])
require.Equal(t, float64(123.45), tw.TimesSuccess[0])
tw.Add(&Request{Time: 100, Failure: false})
require.Equal(t, 2, len(tw.TimesTotal))
require.Equal(t, 2, len(tw.TimesSuccess))
require.Equal(t, 0, len(tw.TimesFail))
require.Equal(t, float64(100), tw.TimesTotal[1])
require.Equal(t, float64(100), tw.TimesSuccess[1])
tw.Add(&Request{Time: 200, Failure: true})
require.Equal(t, 3, len(tw.TimesTotal))
require.Equal(t, 2, len(tw.TimesSuccess))
require.Equal(t, 1, len(tw.TimesFail))
require.Equal(t, float64(200), tw.TimesTotal[2])
require.Equal(t, float64(200), tw.TimesFail[0])
func TestTimeWindow_Start(t *testing.T) {
now := time.Now()
tw := &TimeWindow{StartTime: now}
require.Equal(t, now, tw.Start())
func TestTimeWindow_End(t *testing.T) {
now := time.Now()
tw := &TimeWindow{EndTime: now}
require.Equal(t, now, tw.End())
func TestTimeWindow_Aggregate_All(t *testing.T) {
start := time.Now()
end := start.Add(time.Duration(60))
tw := &TimeWindow{StartTime: start, EndTime: end, OnlyTotal: false}
metrics, err := tw.Aggregate()
require.NoError(t, err)
require.Equal(t, 3, len(metrics))
require.Equal(t, end, metrics[0].Time())
require.Equal(t, MeasurementTime, metrics[0].Name())
require.Equal(t, end, metrics[1].Time())
require.Equal(t, MeasurementTimeFail, metrics[1].Name())
require.Equal(t, end, metrics[0].Time())
require.Equal(t, MeasurementTimeSuccess, metrics[2].Name())
func TestTimeWindow_Aggregate_OnlyTotal(t *testing.T) {
start := time.Now()
end := start.Add(time.Duration(60))
tw := &TimeWindow{StartTime: start, EndTime: end, OnlyTotal: true}
metrics, err := tw.Aggregate()
require.NoError(t, err)
require.Equal(t, 1, len(metrics))
require.Equal(t, end, metrics[0].Time())
require.Equal(t, MeasurementTime, metrics[0].Name())
func TestTimeWindow_aggregateTimes(t *testing.T) {
end := time.Now()
metric, err := aggregateTimes(MeasurementTime, []float64{500, 900, 300, 1000, 100, 600, 700, 800, 200, 400},
[]float32{60, 80, 99.9}, end)
require.NoError(t, err)
require.Equal(t, MeasurementTime, metric.Name())
require.Equal(t, int64(10), metric.Fields()[FieldTimeRequests])
require.Equal(t, float64(1000), metric.Fields()[FieldTimeMax])
require.Equal(t, float64(100), metric.Fields()[FieldTimeMin])
require.Equal(t, float64(550), metric.Fields()[FieldTimeMean])
require.Equal(t, float64(700), metric.Fields()[FieldTimePerc+"60"])
require.Equal(t, float64(900), metric.Fields()[FieldTimePerc+"80"])
require.Equal(t, float64(1000), metric.Fields()[FieldTimePerc+"99_9"])
func TestThroughputWindow_Add(t *testing.T) {
tw := &ThroughputWindow{}
require.Equal(t, int64(1), tw.RequestsTotal)
require.Equal(t, int64(0), tw.RequestsFail)
tw.Add(&Request{Failure: false})
require.Equal(t, int64(2), tw.RequestsTotal)
require.Equal(t, int64(0), tw.RequestsFail)
tw.Add(&Request{Failure: true})
require.Equal(t, int64(3), tw.RequestsTotal)
require.Equal(t, int64(1), tw.RequestsFail)
func TestThroughputWindow_Start(t *testing.T) {
now := time.Now()
tw := &ThroughputWindow{StartTime: now}
require.Equal(t, now, tw.Start())
func TestThroughputWindow_End(t *testing.T) {
now := time.Now()
tw := &ThroughputWindow{EndTime: now}
require.Equal(t, now, tw.End())
func TestThroughputWindow_Aggregate(t *testing.T) {
start := time.Now()
end := start.Add(time.Duration(60))
tw := &ThroughputWindow{StartTime: start, EndTime: end, RequestsTotal: 33, RequestsFail: 11}
metrics, err := tw.Aggregate()
require.NoError(t, err)
require.Equal(t, 1, len(metrics))
require.Equal(t, end, metrics[0].Time())
require.Equal(t, MeasurementThroughput, metrics[0].Name())
require.Equal(t, int64(33), metrics[0].Fields()[FieldThroughputTotal])
require.Equal(t, int64(11), metrics[0].Fields()[FieldThroughputFailed])
Reference in New Issue