From b2a05a3b587f37786870227f464a929d3a42b275 Mon Sep 17 00:00:00 2001 From: Daniel Blanco Date: Mon, 9 May 2016 11:37:17 +0100 Subject: [PATCH] Added request_aggregates input plugin --- plugins/inputs/request_aggregates/README.md | 90 ++++ plugins/inputs/request_aggregates/request.go | 67 +++ .../request_aggregates/request_aggregates.go | 356 ++++++++++++++++ .../request_aggregates_test.go | 385 ++++++++++++++++++ .../inputs/request_aggregates/request_test.go | 144 +++++++ plugins/inputs/request_aggregates/window.go | 142 +++++++ .../inputs/request_aggregates/window_test.go | 126 ++++++ 7 files changed, 1310 insertions(+) create mode 100644 plugins/inputs/request_aggregates/README.md create mode 100644 plugins/inputs/request_aggregates/request.go create mode 100644 plugins/inputs/request_aggregates/request_aggregates.go create mode 100644 plugins/inputs/request_aggregates/request_aggregates_test.go create mode 100644 plugins/inputs/request_aggregates/request_test.go create mode 100644 plugins/inputs/request_aggregates/window.go create mode 100644 plugins/inputs/request_aggregates/window_test.go diff --git a/plugins/inputs/request_aggregates/README.md b/plugins/inputs/request_aggregates/README.md new file mode 100644 index 000000000..81788a874 --- /dev/null +++ b/plugins/inputs/request_aggregates/README.md @@ -0,0 +1,90 @@ +# Request aggregates plugin + +The request aggregates plugin generates a set of aggregate values for a response time column in a CSV file within a +given interval. This is especially useful when calculating throughput of systems with high request frequency +for which storing every single request might require an unnecessary infrastructure. Aggregating values on the client +side minimises the number of writes to the InfluxDB server. + +The plugin generates data points at the end of the given window. If no lines were added to the file during a specific +window, no data points are generated. + +### Configuration: + +```toml +# Aggregates values for requests written to a log file +[[inputs.request_aggregates]] + # File to monitor. + file = "/var/server/access.csv" + # Position of the timestamp of the request in every line + timestamp_position = 0 + # Format of the timestamp (any layout accepted by Go Time.Parse or s/ms/us/ns for epoch time) + timestamp_format = "ms" + # Position of the time value to calculate in the log file (starting from 0) + time_position = 1 + # Window to consider for time percentiles + time_window_size = "60s" + # Windows to keep in memory before flushing in order to avoid requests coming in after a window is shut. + # If the CSV file is sorted by timestamp, this can be set to 1 + time_windows = 5 + # List of percentiles to calculate + time_percentiles = [90.0, 95.0, 99.0, 99.99] + # Position of the result column (success or failure) + result_position = 3 + # Regular expression used to determine if the result is successful or not (if empty only request_aggregates_all + # time series) will be generated + result_success_regex = ".*true.*" + # Time window to calculate throughput counters + throughput_window_size = "1s" + # Number of windows to keep in memory for throughput calculation + throughput_windows = 300 + # List of tags and their values to add to every data point + [inputs.aggregates.tags] + name = "myserver" +``` + +### Measurements & Fields: +Note: There are as many `perc[_percentile]` as percentiles defined in the configuration. + +- request_aggregates + - requests (integer) + - time_min (float) + - time_max (float) + - time_mean (float) + - time_perc_90 (float) + - time_perc_95 (float) + - [...] + - time_perc_99_99 (float) +- request_aggregates_success + - requests (integer) + - time_min (float) + - time_max (float) + - time_mean (float) + - time_perc_90 (float) + - time_perc_95 (float) + - [...] + - time_perc_99_99 (float) +- request_aggregates_failure + - requests (integer) + - time_min (float) + - time_max (float) + - time_mean (float) + - time_perc_90 (float) + - time_perc_95 (float) + - [...] + - time_perc_99_99 (float) +- request_aggregates_throughput + - requests_total (integer) + - requests_failed (integer) + +### Tags: +Tags are user defined in `[inputs.aggregates.tags]` + +### Example output: + +``` +$ ./telegraf -config telegraf.conf -input-filter request_aggregates -test +request_aggregates,name=myserver requests=186,time_max=380,time_min=86,time_mean=258.54,time_perc_90=200,time_perc_95=220,time_perc_99=225,time_perc_99_99=229 1462270026000000000 +request_aggregates_success,name=myserver requests=123,time_max=230,time_min=86,time_mean=120.23,time_perc_90=200,time_perc_95=220,time_perc_99=225,time_perc_99_99=229 1462270026000000000 +request_aggregates_failure,name=myserver requests=63,time_max=380,time_min=132,time_mean=298.54,time_perc_90=250,time_perc_95=270,time_perc_99=285,time_perc_99_99=290 1462270026000000000 +request_aggregates_throughput,name=myserver requests_total=186,requests_failed=63 1462270026000000000 +``` \ No newline at end of file diff --git a/plugins/inputs/request_aggregates/request.go b/plugins/inputs/request_aggregates/request.go new file mode 100644 index 000000000..a5842d029 --- /dev/null +++ b/plugins/inputs/request_aggregates/request.go @@ -0,0 +1,67 @@ +package request_aggregates + +import ( + "encoding/csv" + "fmt" + "regexp" + "strconv" + "strings" + "time" +) + +type Request struct { + Timestamp time.Time + Time float64 + Failure bool +} + +type RequestParser struct { + TimestampPosition int + TimestampFormat string + IsTimeEpoch bool + TimePosition int + ResultPosition int + SuccessRegexp *regexp.Regexp +} + +// Parses a CSV line and generates a Request +func (rp *RequestParser) ParseLine(line string) (*Request, error) { + var request Request + + // Split fields and assign values + reader := strings.NewReader(line) + fields, err := csv.NewReader(reader).Read() + if err != nil { + return nil, fmt.Errorf("ERROR: could not pass CSV line, Error: %s", err) + } + if rp.ResultPosition < 0 || len(fields) <= rp.ResultPosition || + rp.TimePosition < 0 || len(fields) <= rp.TimePosition || + rp.TimestampPosition < 0 || len(fields) <= rp.TimestampPosition { + return nil, fmt.Errorf("ERROR: column position out of range") + } + + if rp.IsTimeEpoch { + var dur time.Duration + dur, err = time.ParseDuration(fields[rp.TimestampPosition] + rp.TimestampFormat) + if err != nil { + return nil, fmt.Errorf("ERROR: could not parse epoch date, Error: %s", err) + } + request.Timestamp = time.Unix(0, dur.Nanoseconds()) + } else { + request.Timestamp, err = time.Parse(rp.TimestampFormat, fields[rp.TimestampPosition]) + if err != nil { + return nil, fmt.Errorf("ERROR: could not parse date, Error: %s", err) + } + } + + request.Time, err = strconv.ParseFloat(fields[rp.TimePosition], 64) + if err != nil { + return nil, fmt.Errorf("ERROR: could not parse time value, Error: %s", err) + } + + if rp.SuccessRegexp != nil { + request.Failure = !rp.SuccessRegexp.MatchString(fields[rp.ResultPosition]) + } + + return &request, nil +} diff --git a/plugins/inputs/request_aggregates/request_aggregates.go b/plugins/inputs/request_aggregates/request_aggregates.go new file mode 100644 index 000000000..3fa7835df --- /dev/null +++ b/plugins/inputs/request_aggregates/request_aggregates.go @@ -0,0 +1,356 @@ +package request_aggregates + +import ( + "fmt" + "github.com/hpcloud/tail" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" + "log" + "regexp" + "sync" + "time" +) + +type RequestAggregates struct { + File string + TimestampPosition int + TimestampFormat string + TimePosition int + TimePercentiles []float32 + TimeWindowSize internal.Duration + TimeWindows int + ResultPosition int + ResultSuccessRegex string + ThroughputWindowSize internal.Duration + ThroughputWindows int + + isTimestampEpoch bool + successRegexp *regexp.Regexp + tailer *tail.Tail + timeWindowSlice []Window + throughputWindowSlice []Window + timeTimer *time.Timer + throughputTimer *time.Timer + stopTimeChan chan bool + stopThroughputChan chan bool + timeMutex sync.Mutex + throughputMutex sync.Mutex + wg sync.WaitGroup + sync.Mutex +} + +func NewRequestAggregates() *RequestAggregates { + return &RequestAggregates{ + TimeWindows: 2, + ThroughputWindows: 10} +} + +const sampleConfig = ` + # File to monitor. + file = "/var/server/access.csv" + # Position of the timestamp of the request in every line + timestamp_position = 0 + # Format of the timestamp (any layout accepted by Go Time.Parse or s/ms/us/ns for epoch time) + timestamp_format = "ms" + # Position of the time value to calculate in the log file (starting from 0) + time_position = 1 + # Window to consider for time percentiles + time_window_size = "60s" + # Windows to keep in memory before flushing in order to avoid requests coming in after a window is shut. + # If the CSV file is sorted by timestamp, this can be set to 1 + time_windows = 5 + # List of percentiles to calculate + time_percentiles = [90.0, 95.0, 99.0, 99.99] + # Position of the result column (success or failure) + result_position = 3 + # Regular expression used to determine if the result is successful or not (if empty only request_aggregates_all + # time series) will be generated + result_success_regex = ".*true.*" + # Time window to calculate throughput counters + throughput_window_size = "1s" + # Number of windows to keep in memory for throughput calculation + throughput_windows = 300 + # List of tags and their values to add to every data point + [inputs.aggregates.tags] + name = "myserver" +` + +func (ra *RequestAggregates) SampleConfig() string { + return sampleConfig +} + +func (ra *RequestAggregates) Description() string { + return "Generates a set of aggregate values for a requests and their response times." +} + +func (ra *RequestAggregates) Gather(acc telegraf.Accumulator) error { + return nil +} + +func (ra *RequestAggregates) Start(acc telegraf.Accumulator) error { + ra.Lock() + defer ra.Unlock() + + err := ra.validateConfig() + if err != nil { + return err + } + + // Create tailer + ra.tailer, err = tail.TailFile(ra.File, tail.Config{ + Follow: true, + ReOpen: true, + Location: &tail.SeekInfo{Whence: 2, Offset: 0}}) + if err != nil { + return fmt.Errorf("ERROR tailing file %s, Error: %s", ra.File, err) + } + + // Create first time window and start go routine to manage them + now := time.Now() + ra.timeWindowSlice = append(ra.timeWindowSlice, &TimeWindow{ + StartTime: now, EndTime: now.Add(ra.TimeWindowSize.Duration), + OnlyTotal: ra.successRegexp == nil, Percentiles: ra.TimePercentiles}) + ra.timeTimer = time.NewTimer(ra.TimeWindowSize.Duration) + ra.stopTimeChan = make(chan bool, 1) + ra.wg.Add(1) + go ra.manageTimeWindows(acc) + + // Create first throughput window and start go routine to manage them + ra.throughputWindowSlice = append(ra.throughputWindowSlice, &ThroughputWindow{ + StartTime: now, EndTime: now.Add(ra.ThroughputWindowSize.Duration)}) + ra.throughputTimer = time.NewTimer(ra.ThroughputWindowSize.Duration) + ra.stopThroughputChan = make(chan bool, 1) + ra.wg.Add(1) + go ra.manageThroughputWindows(acc) + + // Start go routine to tail the file and put requests in windows + ra.wg.Add(1) + go ra.gatherFromFile(ra.tailer, acc) + + return nil +} + +func (ra *RequestAggregates) Stop() { + ra.Lock() + defer ra.Unlock() + + err := ra.tailer.Stop() + if err != nil { + log.Printf("ERROR: could not stop tail on file %s\n", ra.File) + } + ra.tailer.Cleanup() + + ra.timeTimer.Stop() + ra.stopTimeChan <- true + ra.throughputTimer.Stop() + ra.stopThroughputChan <- true + + ra.wg.Wait() +} + +// Validates the configuration in the struct +func (ra *RequestAggregates) validateConfig() error { + var err error + + // Compile regex to identify success + if ra.ResultSuccessRegex != "" { + ra.successRegexp, err = regexp.Compile(ra.ResultSuccessRegex) + if err != nil { + return fmt.Errorf("ERROR: success regexp is not valid, Error: %s", err) + } + } + // Check if timestamp format is valid + switch ra.TimestampFormat { + case "s", "ms", "us", "ns": + ra.isTimestampEpoch = true + break + default: + if time.Now().Format(ra.TimestampFormat) == ra.TimestampFormat { + return fmt.Errorf("ERROR: incorrect timestamp format") + } + } + // Check percentiles are valid + for _, percentile := range ra.TimePercentiles { + if percentile <= 0 || percentile >= 100 { + return fmt.Errorf("ERROR: percentiles must be numbers between 0 and 100 (not inclusive)") + } + } + //Check duration of windows + if ra.TimeWindowSize.Duration <= time.Duration(0) || ra.ThroughputWindowSize.Duration <= time.Duration(0) { + return fmt.Errorf("ERROR: windows need to be a positive duration") + } + // Check number of windows + if ra.TimeWindows <= 0 || ra.ThroughputWindows <= 0 { + return fmt.Errorf("ERROR: at least one window is required") + } + + return nil +} + +// Executed as a go routine, tails a given file and puts the parsed requests into their respective windows. +func (ra *RequestAggregates) gatherFromFile(tailer *tail.Tail, acc telegraf.Accumulator) { + defer ra.wg.Done() + + requestParser := &RequestParser{ + TimestampPosition: ra.TimestampPosition, + TimestampFormat: ra.TimestampFormat, + IsTimeEpoch: ra.isTimestampEpoch, + TimePosition: ra.TimePosition, + ResultPosition: ra.ResultPosition, + SuccessRegexp: ra.successRegexp} + + var err error + var line *tail.Line + var request *Request + for line = range tailer.Lines { + // Parse and validate line + if line.Err != nil { + log.Printf("ERROR: could not tail file %s, Error: %s\n", tailer.Filename, err) + continue + } + request, err = requestParser.ParseLine(line.Text) + if err != nil { + log.Printf("ERROR: malformed line in %s: [%s], Error: %s\n", tailer.Filename, line.Text, err) + continue + } + + // Wait until the window is created (it is possible that the line is read before the time ticks) + for ra.timeWindowSlice[len(ra.timeWindowSlice)-1].End().Before(request.Timestamp) { + time.Sleep(time.Millisecond * 10) + } + // Add request to time window + ra.timeMutex.Lock() + err = addToWindow(ra.timeWindowSlice, request) + if err != nil { + log.Printf("ERROR: could not find a time window, Request: %v, Error %s\n", request, err) + } + ra.timeMutex.Unlock() + + // Wait until the window is created (it is possible that the line is read before the time ticks) + for ra.throughputWindowSlice[len(ra.throughputWindowSlice)-1].End().Before(request.Timestamp) { + time.Sleep(time.Millisecond * 10) + } + // Add request to throughput window + ra.throughputMutex.Lock() + err = addToWindow(ra.throughputWindowSlice, request) + if err != nil { + log.Printf("ERROR: could not find a throughput window, Request: %v, Error %s\n", request, err) + } + ra.throughputMutex.Unlock() + } +} + +// Executed as a go routine, manages the windows related to time measures, creating new ones and flushing old ones +func (ra *RequestAggregates) manageTimeWindows(acc telegraf.Accumulator) { + defer ra.wg.Done() + onlyTotal := ra.successRegexp == nil + for { + select { + // If the timer is triggered + case <-ra.timeTimer.C: + ra.timeMutex.Lock() + // Create new window with the start time of the last one's end time + startTime := ra.timeWindowSlice[len(ra.timeWindowSlice)-1].End() + endTime := startTime.Add(ra.TimeWindowSize.Duration) + ra.timeWindowSlice = append(ra.timeWindowSlice, &TimeWindow{ + StartTime: startTime, EndTime: endTime, + OnlyTotal: onlyTotal, Percentiles: ra.TimePercentiles}) + // Flush oldest one if necessary + if len(ra.timeWindowSlice) > ra.TimeWindows { + ra.timeWindowSlice = flushWindow(ra.timeWindowSlice, acc) + } + ra.timeMutex.Unlock() + // Reset time till the end of the window + ra.timeTimer.Reset(endTime.Sub(time.Now())) + // If the stop signal is received + case <-ra.stopTimeChan: + ra.timeMutex.Lock() + ra.timeWindowSlice = flushAllWindows(ra.timeWindowSlice, acc) + ra.timeMutex.Unlock() + return + } + } +} + +// Executed as a go routine, manages the windows related to throughput measures, creating new ones and flushing old ones +func (ra *RequestAggregates) manageThroughputWindows(acc telegraf.Accumulator) { + defer ra.wg.Done() + for { + select { + // If the timer is triggered + case <-ra.throughputTimer.C: + ra.throughputMutex.Lock() + // Create new window with the start time of the last one's end time + startTime := ra.throughputWindowSlice[len(ra.throughputWindowSlice)-1].End() + endTime := startTime.Add(ra.ThroughputWindowSize.Duration) + ra.throughputWindowSlice = append(ra.throughputWindowSlice, &ThroughputWindow{ + StartTime: startTime, EndTime: endTime}) + // Flush oldest one if necessary + if len(ra.throughputWindowSlice) > ra.ThroughputWindows { + ra.throughputWindowSlice = flushWindow(ra.throughputWindowSlice, acc) + } + ra.throughputMutex.Unlock() + ra.throughputTimer.Reset(endTime.Sub(time.Now())) + // If the stop signal is received + case <-ra.stopThroughputChan: + ra.throughputMutex.Lock() + ra.throughputWindowSlice = flushAllWindows(ra.throughputWindowSlice, acc) + ra.throughputMutex.Unlock() + return + } + } +} + +// Removes the window at the front of the slice of windows and flushes its aggregated metrics to the accumulator +func flushWindow(windows []Window, acc telegraf.Accumulator) []Window { + if len(windows) > 0 { + var window Window + window, windows = windows[0], windows[1:] + metrics, err := window.Aggregate() + if err != nil { + log.Printf("ERROR: could not flush window, Error: %s\n", err) + } + for _, metric := range metrics { + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) + } + } + return windows +} + +// Flushes all windows ot the accumulator +func flushAllWindows(windows []Window, acc telegraf.Accumulator) []Window { + for len(windows) > 0 { + windows = flushWindow(windows, acc) + } + return windows +} + +// Adds a request to a window, returns and error if it could not be added +func addToWindow(windows []Window, request *Request) error { + if len(windows) == 0 { + return fmt.Errorf("ERROR: no windows found") + } + first := windows[len(windows)-1] + if first.End().Before(request.Timestamp) { + return fmt.Errorf("ERROR: request is newer than any window") + } + last := windows[0] + if last.Start().After(request.Timestamp) { + return fmt.Errorf("ERROR: request is older than any window, try adding more windows") + } + for i := range windows { + window := windows[i] + if (window.Start().Before(request.Timestamp) || window.Start().Equal(request.Timestamp)) && + window.End().After(request.Timestamp) { + return window.Add(request) + } + } + return fmt.Errorf("ERROR: no window could be found") +} + +func init() { + inputs.Add("request_aggregates", func() telegraf.Input { + return NewRequestAggregates() + }) +} diff --git a/plugins/inputs/request_aggregates/request_aggregates_test.go b/plugins/inputs/request_aggregates/request_aggregates_test.go new file mode 100644 index 000000000..c12e55fde --- /dev/null +++ b/plugins/inputs/request_aggregates/request_aggregates_test.go @@ -0,0 +1,385 @@ +package request_aggregates + +import ( + "fmt" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" + "io/ioutil" + "os" + "regexp" + "testing" + "time" +) + +// This tests the start/stop and gather functionality +func TestNewRequestAggregates(t *testing.T) { + tmpfile, err := ioutil.TempFile("", "") + require.NoError(t, err) + tmpfile.WriteString(fmt.Sprintf("%v", time.Now().UnixNano()) + ",123\n") + defer tmpfile.Close() + defer os.Remove(tmpfile.Name()) + + windowSize := internal.Duration{Duration: time.Millisecond * 100} + acc := &testutil.Accumulator{} + ra := &RequestAggregates{ + File: tmpfile.Name(), + TimestampFormat: "ns", + TimeWindowSize: windowSize, + TimeWindows: 1, + ThroughputWindowSize: windowSize, + ThroughputWindows: 10, + TimestampPosition: 0, + TimePosition: 1} + // When we start + ra.Start(acc) + // A tailer is created + ra.Lock() + require.NotNil(t, ra.tailer) + require.Equal(t, tmpfile.Name(), ra.tailer.Filename) + ra.Unlock() + // The windows are initialised + ra.timeMutex.Lock() + require.Equal(t, 1, len(ra.timeWindowSlice)) + ra.timeMutex.Unlock() + ra.throughputMutex.Lock() + require.Equal(t, 1, len(ra.throughputWindowSlice)) + ra.throughputMutex.Unlock() + acc.Lock() + require.Equal(t, 0, len(acc.Metrics)) + acc.Unlock() + + // When we put a request in the file + time.Sleep(time.Millisecond * 30) + tmpfile.WriteString(fmt.Sprintf("%v", time.Now().UnixNano()) + ",456\n") + time.Sleep(time.Millisecond * 30) + // One metric is stored in the windows + ra.throughputMutex.Lock() + require.Equal(t, int64(1), ra.throughputWindowSlice[0].(*ThroughputWindow).RequestsTotal) + ra.throughputMutex.Unlock() + ra.timeMutex.Lock() + require.Equal(t, 1, len(ra.timeWindowSlice[0].(*TimeWindow).TimesTotal)) + require.Equal(t, float64(456), ra.timeWindowSlice[0].(*TimeWindow).TimesTotal[0]) + ra.timeMutex.Unlock() + + // After the first window is expired + time.Sleep(windowSize.Duration) + // One of the windows has flushed one metric + ra.timeMutex.Lock() + require.Equal(t, 1, len(ra.timeWindowSlice)) + ra.timeMutex.Unlock() + ra.throughputMutex.Lock() + require.Equal(t, 2, len(ra.throughputWindowSlice)) + ra.throughputMutex.Unlock() + acc.Lock() + require.Equal(t, 1, len(acc.Metrics)) + acc.Unlock() + + // When we stop + ra.Stop() + // All the metrics should have been flushed + ra.timeMutex.Lock() + require.Equal(t, 0, len(ra.timeWindowSlice)) + ra.timeMutex.Unlock() + ra.throughputMutex.Lock() + require.Equal(t, 0, len(ra.throughputWindowSlice)) + ra.throughputMutex.Unlock() + acc.Lock() + require.Equal(t, 4, len(acc.Metrics)) + acc.Unlock() +} + +func TestRequestAggregates_validateConfig(t *testing.T) { + // Empty config + ra := &RequestAggregates{} + require.Error(t, ra.validateConfig()) + // Minimum config + ra = &RequestAggregates{ + TimestampFormat: "ms", + TimeWindowSize: internal.Duration{Duration: time.Millisecond * 10}, + TimeWindows: 2, + ThroughputWindowSize: internal.Duration{Duration: time.Millisecond * 10}, + ThroughputWindows: 10} + require.NoError(t, ra.validateConfig()) + // Regexp for success + ra.ResultSuccessRegex = "*success.*" + require.Error(t, ra.validateConfig()) + ra.ResultSuccessRegex = ".*success.*" + require.NoError(t, ra.validateConfig()) + // Time format + ra.TimestampFormat = "thisisnotavalidformat" + require.Error(t, ra.validateConfig()) + ra.TimestampFormat = "" + require.Error(t, ra.validateConfig()) + ra.TimestampFormat = "Mon Jan _2 15:04:05 2006" + require.NoError(t, ra.validateConfig()) + // Percentiles + ra.TimePercentiles = []float32{80, 90, 100} + require.Error(t, ra.validateConfig()) + ra.TimePercentiles = []float32{0, 90, 99} + require.Error(t, ra.validateConfig()) + ra.TimePercentiles = []float32{80, 90, 99} + require.NoError(t, ra.validateConfig()) + // Window size + ra.TimeWindowSize = internal.Duration{Duration: time.Duration(0)} + require.Error(t, ra.validateConfig()) + ra.TimeWindowSize = internal.Duration{Duration: time.Duration(-1)} + require.Error(t, ra.validateConfig()) + ra.TimeWindowSize = internal.Duration{Duration: time.Duration(1)} + require.NoError(t, ra.validateConfig()) + ra.ThroughputWindowSize = internal.Duration{Duration: time.Duration(0)} + require.Error(t, ra.validateConfig()) + ra.ThroughputWindowSize = internal.Duration{Duration: time.Duration(-1)} + require.Error(t, ra.validateConfig()) + ra.ThroughputWindowSize = internal.Duration{Duration: time.Duration(1)} + require.NoError(t, ra.validateConfig()) + // Number of windows + ra.TimeWindows = 0 + require.Error(t, ra.validateConfig()) + ra.TimeWindows = -1 + require.Error(t, ra.validateConfig()) + ra.TimeWindows = 1 + require.NoError(t, ra.validateConfig()) + ra.ThroughputWindows = 0 + require.Error(t, ra.validateConfig()) + ra.ThroughputWindows = -1 + require.Error(t, ra.validateConfig()) + ra.ThroughputWindows = 1 + require.NoError(t, ra.validateConfig()) +} + +func TestRequestAggregates_manageTimeWindows_OnlyTotal(t *testing.T) { + windowSize := internal.Duration{Duration: time.Millisecond * 100} + acc := &testutil.Accumulator{} + now := time.Now() + ra := &RequestAggregates{ + TimeWindows: 2, + TimeWindowSize: windowSize, + TimePercentiles: []float32{70, 80, 90}, + timeTimer: time.NewTimer(windowSize.Duration), + stopTimeChan: make(chan bool, 1)} + + // Add first window and start routine + ra.timeWindowSlice = append(ra.timeWindowSlice, &TimeWindow{ + StartTime: now, EndTime: now.Add(windowSize.Duration), OnlyTotal: true, Percentiles: ra.TimePercentiles}) + ra.wg.Add(1) + go ra.manageTimeWindows(acc) + + // Check values at different points + time.Sleep(time.Millisecond * 30) + ra.timeMutex.Lock() + require.Equal(t, 1, len(ra.timeWindowSlice)) + ra.timeMutex.Unlock() + acc.Lock() + require.Equal(t, 0, len(acc.Metrics)) + acc.Unlock() + time.Sleep(windowSize.Duration) + ra.timeMutex.Lock() + require.Equal(t, 2, len(ra.timeWindowSlice)) + ra.timeMutex.Unlock() + acc.Lock() + require.Equal(t, 0, len(acc.Metrics)) + acc.Unlock() + time.Sleep(windowSize.Duration) + ra.timeMutex.Lock() + require.Equal(t, 2, len(ra.timeWindowSlice)) + ra.timeMutex.Unlock() + acc.Lock() + require.Equal(t, 1, len(acc.Metrics)) + require.Equal(t, now.Add(windowSize.Duration), acc.Metrics[0].Time) + acc.Unlock() + + // Stop and wait for the process to finish + ra.timeTimer.Stop() + ra.stopTimeChan <- true + ra.wg.Wait() + + // Check that all metrics were flushed + ra.timeMutex.Lock() + require.Equal(t, 0, len(ra.timeWindowSlice)) + ra.timeMutex.Unlock() + acc.Lock() + require.Equal(t, 3, len(acc.Metrics)) + require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[1].Time) + require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[2].Time) + acc.Unlock() +} + +func TestRequestAggregates_manageTimeWindows_All(t *testing.T) { + windowSize := internal.Duration{Duration: time.Millisecond * 100} + acc := &testutil.Accumulator{} + now := time.Now() + ra := &RequestAggregates{ + TimeWindows: 2, + TimeWindowSize: windowSize, + TimePercentiles: []float32{70, 80, 90}, + successRegexp: regexp.MustCompile(".*success.*"), + timeTimer: time.NewTimer(windowSize.Duration), + stopTimeChan: make(chan bool, 1)} + + // Add first window and start routine + ra.timeWindowSlice = append(ra.timeWindowSlice, &TimeWindow{ + StartTime: now, EndTime: now.Add(windowSize.Duration), OnlyTotal: false, Percentiles: ra.TimePercentiles}) + ra.wg.Add(1) + go ra.manageTimeWindows(acc) + + // Check values at different points + time.Sleep(time.Millisecond * 30) + ra.timeMutex.Lock() + require.Equal(t, 1, len(ra.timeWindowSlice)) + ra.timeMutex.Unlock() + acc.Lock() + require.Equal(t, 0, len(acc.Metrics)) + acc.Unlock() + time.Sleep(windowSize.Duration) + ra.timeMutex.Lock() + require.Equal(t, 2, len(ra.timeWindowSlice)) + ra.timeMutex.Unlock() + acc.Lock() + require.Equal(t, 0, len(acc.Metrics)) + acc.Unlock() + time.Sleep(windowSize.Duration) + ra.timeMutex.Lock() + require.Equal(t, 2, len(ra.timeWindowSlice)) + ra.timeMutex.Unlock() + acc.Lock() + require.Equal(t, 3, len(acc.Metrics)) + require.Equal(t, now.Add(windowSize.Duration), acc.Metrics[0].Time) + require.Equal(t, now.Add(windowSize.Duration), acc.Metrics[1].Time) + require.Equal(t, now.Add(windowSize.Duration), acc.Metrics[2].Time) + acc.Unlock() + + // Stop and wait for the process to finish + ra.timeTimer.Stop() + ra.stopTimeChan <- true + ra.wg.Wait() + + // Check that all metrics were flushed + ra.timeMutex.Lock() + require.Equal(t, 0, len(ra.timeWindowSlice)) + ra.timeMutex.Unlock() + acc.Lock() + require.Equal(t, 9, len(acc.Metrics)) + require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[3].Time) + require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[4].Time) + require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[5].Time) + require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[6].Time) + require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[7].Time) + require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[8].Time) + acc.Unlock() +} + +func TestRequestAggregates_manageThroughputWindows(t *testing.T) { + windowSize := internal.Duration{Duration: time.Millisecond * 100} + acc := &testutil.Accumulator{} + now := time.Now() + ra := &RequestAggregates{ + ThroughputWindows: 2, + ThroughputWindowSize: windowSize, + throughputTimer: time.NewTimer(windowSize.Duration), + stopThroughputChan: make(chan bool, 1)} + + // Add first window and start routine + ra.throughputWindowSlice = append(ra.throughputWindowSlice, &ThroughputWindow{ + StartTime: now, EndTime: now.Add(windowSize.Duration)}) + ra.wg.Add(1) + go ra.manageThroughputWindows(acc) + + // Check values at different points + time.Sleep(time.Millisecond * 30) + ra.throughputMutex.Lock() + require.Equal(t, 1, len(ra.throughputWindowSlice)) + ra.throughputMutex.Unlock() + acc.Lock() + require.Equal(t, 0, len(acc.Metrics)) + acc.Unlock() + time.Sleep(windowSize.Duration) + ra.throughputMutex.Lock() + require.Equal(t, 2, len(ra.throughputWindowSlice)) + ra.throughputMutex.Unlock() + acc.Lock() + require.Equal(t, 0, len(acc.Metrics)) + acc.Unlock() + time.Sleep(windowSize.Duration) + ra.throughputMutex.Lock() + require.Equal(t, 2, len(ra.throughputWindowSlice)) + ra.throughputMutex.Unlock() + acc.Lock() + require.Equal(t, 1, len(acc.Metrics)) + require.Equal(t, now.Add(windowSize.Duration), acc.Metrics[0].Time) + acc.Unlock() + + // Stop and wait for the process to finish + ra.throughputTimer.Stop() + ra.stopThroughputChan <- true + ra.wg.Wait() + + // Check that all metrics were flushed + ra.throughputMutex.Lock() + require.Equal(t, 0, len(ra.throughputWindowSlice)) + ra.throughputMutex.Unlock() + acc.Lock() + require.Equal(t, 3, len(acc.Metrics)) + require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[1].Time) + require.Equal(t, now.Add(windowSize.Duration).Add(windowSize.Duration).Add(windowSize.Duration), acc.Metrics[2].Time) + acc.Unlock() +} + +func TestRequestAggregates_flushWindow(t *testing.T) { + acc := &testutil.Accumulator{} + now := time.Now() + windows := []Window{&ThroughputWindow{StartTime: now, EndTime: now.Add(time.Duration(60))}} + windows = flushWindow(windows, acc) + require.Equal(t, 0, len(windows)) + require.Equal(t, 1, len(acc.Metrics)) + require.Equal(t, MeasurementThroughput, acc.Metrics[0].Measurement) +} + +func TestRequestAggregates_flushAllWindows(t *testing.T) { + acc := &testutil.Accumulator{} + now := time.Now() + windows := []Window{&ThroughputWindow{StartTime: now, EndTime: now.Add(time.Duration(60))}, + &ThroughputWindow{StartTime: now.Add(time.Duration(60)), EndTime: now.Add(time.Duration(120))}, + &ThroughputWindow{StartTime: now.Add(time.Duration(120)), EndTime: now.Add(time.Duration(180))}} + windows = flushAllWindows(windows, acc) + require.Equal(t, 0, len(windows)) + require.Equal(t, 3, len(acc.Metrics)) +} + +func TestRequestAggregates_addToWindow(t *testing.T) { + now := time.Now() + var windows []Window + // Error if there are no windows (not added) + err := addToWindow(windows, &Request{Timestamp: now.Add(time.Duration(30))}) + require.Error(t, err) + // Okay when one window + firstWindow := &ThroughputWindow{StartTime: now, EndTime: now.Add(time.Duration(60))} + windows = append(windows, firstWindow) + err = addToWindow(windows, &Request{Timestamp: now.Add(time.Duration(30))}) + require.NoError(t, err) + require.Equal(t, int64(1), firstWindow.RequestsTotal) + // Okay when timestamp equal to start of window + err = addToWindow(windows, &Request{Timestamp: now}) + require.NoError(t, err) + require.Equal(t, int64(2), firstWindow.RequestsTotal) + // Error when timestamp equal to end of window + err = addToWindow(windows, &Request{Timestamp: now.Add(time.Duration(60))}) + require.Error(t, err) + // Okay with more windows + middleWindow := &ThroughputWindow{StartTime: now.Add(time.Duration(60)), EndTime: now.Add(time.Duration(120))} + lastWindow := &ThroughputWindow{StartTime: now.Add(time.Duration(120)), EndTime: now.Add(time.Duration(180))} + windows = append(windows, middleWindow) + windows = append(windows, lastWindow) + err = addToWindow(windows, &Request{Timestamp: now.Add(time.Duration(90))}) + require.NoError(t, err) + require.Equal(t, int64(1), middleWindow.RequestsTotal) + err = addToWindow(windows, &Request{Timestamp: now.Add(time.Duration(150))}) + require.NoError(t, err) + require.Equal(t, int64(1), lastWindow.RequestsTotal) + // Error when later than last window + err = addToWindow(windows, &Request{Timestamp: now.Add(time.Duration(220))}) + require.Error(t, err) + // Error when before first window + err = addToWindow(windows, &Request{Timestamp: now.Add(time.Duration(-20))}) + require.Error(t, err) +} diff --git a/plugins/inputs/request_aggregates/request_test.go b/plugins/inputs/request_aggregates/request_test.go new file mode 100644 index 000000000..bfc6893ab --- /dev/null +++ b/plugins/inputs/request_aggregates/request_test.go @@ -0,0 +1,144 @@ +package request_aggregates + +import ( + "github.com/stretchr/testify/require" + "regexp" + "testing" + "time" +) + +func TestRequestParser_ParseLine_Nanos(t *testing.T) { + rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "ns", IsTimeEpoch: true, TimePosition: 1} + + // Test format nanoseconds + r, err := rp.ParseLine("1462380541003228260,123,\"thisissuccessful\"") + require.NoError(t, err) + require.Equal(t, time.Unix(0, 1462380541003228260), r.Timestamp) +} + +func TestRequestParser_ParseLine_Micros(t *testing.T) { + rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "us", IsTimeEpoch: true, TimePosition: 1} + + // Test format nanoseconds + r, err := rp.ParseLine("1462380541003228,123,\"thisissuccessful\"") + require.NoError(t, err) + require.Equal(t, time.Unix(0, 1462380541003228000), r.Timestamp) +} + +func TestRequestParser_ParseLine_Milis(t *testing.T) { + rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "ms", IsTimeEpoch: true, TimePosition: 1} + + // Test format nanoseconds + r, err := rp.ParseLine("1462380541003,123,\"thisissuccessful\"") + require.NoError(t, err) + require.Equal(t, time.Unix(0, 1462380541003000000), r.Timestamp) +} + +func TestRequestParser_ParseLine_Seconds(t *testing.T) { + rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 1} + + // Test format nanoseconds + r, err := rp.ParseLine("1462380541,123,\"thisissuccessful\"") + require.NoError(t, err) + require.Equal(t, time.Unix(1462380541, 0), r.Timestamp) +} + +func TestRequestParser_ParseLine_WrongUnit(t *testing.T) { + rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 1} + + // Test format nanoseconds + _, err := rp.ParseLine("1462380541003228260,123,\"thisissuccessful\"") + require.Error(t, err) +} + +func TestRequestParser_ParseLine_Layout(t *testing.T) { + rp := &RequestParser{TimestampPosition: 0, TimestampFormat: time.RFC3339Nano, + IsTimeEpoch: false, TimePosition: 1} + + // Test format nanoseconds + r, err := rp.ParseLine("2006-01-02T15:04:05.999999999Z,123,\"thisissuccessful\"") + require.NoError(t, err) + parsed, _ := time.Parse(time.RFC3339Nano, "2006-01-02T15:04:05.999999999Z") + require.Equal(t, parsed, r.Timestamp) +} + +func TestRequestParser_ParseLine_WrongLayout(t *testing.T) { + rp := &RequestParser{TimestampPosition: 0, TimestampFormat: time.RFC3339Nano, + IsTimeEpoch: false, TimePosition: 1} + + // Test format nanoseconds + _, err := rp.ParseLine("2006-01-02T15:04:05,123,\"thisissuccessful\"") + require.Error(t, err) +} + +func TestRequestParser_ParseLine_Int(t *testing.T) { + rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 1} + + // Test format nanoseconds + r, err := rp.ParseLine("1462380541,123,\"thisissuccessful\"") + require.NoError(t, err) + require.Equal(t, float64(123), r.Time) +} + +func TestRequestParser_ParseLine_Float(t *testing.T) { + rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 1} + + // Test format nanoseconds + r, err := rp.ParseLine("1462380541,123.45,\"thisissuccessful\"") + require.NoError(t, err) + require.Equal(t, float64(123.45), r.Time) +} + +func TestRequestParser_ParseLine_NoRegexp(t *testing.T) { + rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 1} + + // Test format nanoseconds + r, err := rp.ParseLine("1462380541,123.45,\"thisissuccessful\"") + require.NoError(t, err) + require.Equal(t, false, r.Failure) +} + +func TestRequestParser_ParseLine_Success(t *testing.T) { + rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 1, + ResultPosition: 2, SuccessRegexp: regexp.MustCompile(".*success.*")} + + // Test format nanoseconds + r, err := rp.ParseLine("1462380541,123.45,\"thisissuccessful\"") + require.NoError(t, err) + require.Equal(t, false, r.Failure) +} + +func TestRequestParser_ParseLine_Failure(t *testing.T) { + rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 1, + ResultPosition: 2, SuccessRegexp: regexp.MustCompile(".*success.*")} + + // Test format nanoseconds + r, err := rp.ParseLine("1462380541,123.45,\"thisonefailed\"") + require.NoError(t, err) + require.Equal(t, true, r.Failure) +} + +func TestRequestParser_ParseLine_TimestampOutOfBounds(t *testing.T) { + rp := &RequestParser{TimestampPosition: 6, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 1} + + // Test format nanoseconds + _, err := rp.ParseLine("1462380541,123.45,\"thisissuccessful\"") + require.Error(t, err) +} + +func TestRequestParser_ParseLine_TimeOutOfBounds(t *testing.T) { + rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 6} + + // Test format nanoseconds + _, err := rp.ParseLine("1462380541,123.45,\"thisissuccessful\"") + require.Error(t, err) +} + +func TestRequestParser_ParseLine_SuccessOutOfBounds(t *testing.T) { + rp := &RequestParser{TimestampPosition: 0, TimestampFormat: "s", IsTimeEpoch: true, TimePosition: 1, + ResultPosition: 8, SuccessRegexp: regexp.MustCompile(".*success.*")} + + // Test format nanoseconds + _, err := rp.ParseLine("1462380541,123.45,\"thisissuccessful\"") + require.Error(t, err) +} diff --git a/plugins/inputs/request_aggregates/window.go b/plugins/inputs/request_aggregates/window.go new file mode 100644 index 000000000..edbdfe6e4 --- /dev/null +++ b/plugins/inputs/request_aggregates/window.go @@ -0,0 +1,142 @@ +package request_aggregates + +import ( + "fmt" + "github.com/influxdata/telegraf" + "sort" + "strings" + "time" +) + +const ( + MeasurementTime = "request_aggregates_total" + MeasurementTimeFail = "request_aggregates_fail" + MeasurementTimeSuccess = "request_aggregates_success" + FieldTimeRequests = "requests" + FieldTimeMin = "time_min" + FieldTimeMax = "time_max" + FieldTimeMean = "time_mean" + FieldTimePerc = "time_perc_" + + MeasurementThroughput = "request_aggregates_throughput" + FieldThroughputTotal = "requests_total" + FieldThroughputFailed = "requests_failed" +) + +type Window interface { + Aggregate() ([]telegraf.Metric, error) + Add(request *Request) error + Start() time.Time + End() time.Time +} + +type TimeWindow struct { + StartTime time.Time + EndTime time.Time + TimesTotal []float64 + TimesSuccess []float64 + TimesFail []float64 + Percentiles []float32 + OnlyTotal bool +} + +type ThroughputWindow struct { + StartTime time.Time + EndTime time.Time + RequestsTotal int64 + RequestsFail int64 +} + +func (tw *TimeWindow) Aggregate() ([]telegraf.Metric, error) { + metrics := make([]telegraf.Metric, 3) + + var err error + metrics[0], err = aggregateTimes(MeasurementTime, tw.TimesTotal, tw.Percentiles, tw.EndTime) + if err != nil { + return metrics, err + } + if !tw.OnlyTotal { + metrics[1], err = aggregateTimes(MeasurementTimeFail, tw.TimesFail, tw.Percentiles, tw.EndTime) + if err != nil { + return metrics, err + } + metrics[2], err = aggregateTimes(MeasurementTimeSuccess, tw.TimesSuccess, tw.Percentiles, tw.EndTime) + } else { + metrics = metrics[:1] + } + + return metrics, err +} + +func (tw *TimeWindow) Add(request *Request) error { + tw.TimesTotal = append(tw.TimesTotal, request.Time) + if !tw.OnlyTotal { + if request.Failure { + tw.TimesFail = append(tw.TimesFail, request.Time) + } else { + tw.TimesSuccess = append(tw.TimesSuccess, request.Time) + } + } + return nil +} + +func (tw *TimeWindow) Start() time.Time { + return tw.StartTime +} + +func (tw *TimeWindow) End() time.Time { + return tw.EndTime +} + +func (tw *ThroughputWindow) Aggregate() ([]telegraf.Metric, error) { + metrics := make([]telegraf.Metric, 1) + + metric, err := telegraf.NewMetric(MeasurementThroughput, nil, map[string]interface{}{ + FieldThroughputTotal: tw.RequestsTotal, + FieldThroughputFailed: tw.RequestsFail}, tw.EndTime) + metrics[0] = metric + + return metrics, err +} + +func (tw *ThroughputWindow) Add(request *Request) error { + tw.RequestsTotal++ + if request.Failure { + tw.RequestsFail++ + } + return nil +} + +func (tw *ThroughputWindow) Start() time.Time { + return tw.StartTime +} + +func (tw *ThroughputWindow) End() time.Time { + return tw.EndTime +} + +// Produces a metric with the aggregates for the given times and percentiles +func aggregateTimes(name string, times []float64, percentiles []float32, endTime time.Time) (telegraf.Metric, error) { + sort.Float64s(times) + + fields := map[string]interface{}{FieldTimeRequests: len(times)} + if len(times) > 0 { + fields[FieldTimeMin] = times[0] + fields[FieldTimeMax] = times[len(times)-1] + totalSum := float64(0) + for _, time := range times { + totalSum += time + } + fields[FieldTimeMean] = totalSum / float64(len(times)) + + for _, perc := range percentiles { + i := int(float64(len(times)) * float64(perc) / float64(100)) + if i < 0 { + i = 0 + } + fields[FieldTimePerc+strings.Replace(fmt.Sprintf("%v", perc), ".", "_", -1)] = times[i] + } + } + + return telegraf.NewMetric(name, nil, fields, endTime) +} diff --git a/plugins/inputs/request_aggregates/window_test.go b/plugins/inputs/request_aggregates/window_test.go new file mode 100644 index 000000000..f6a6ddd18 --- /dev/null +++ b/plugins/inputs/request_aggregates/window_test.go @@ -0,0 +1,126 @@ +package request_aggregates + +import ( + "github.com/stretchr/testify/require" + "testing" + "time" +) + +func TestTimeWindow_Add(t *testing.T) { + tw := &TimeWindow{} + + tw.Add(&Request{Time: 123.45}) + require.Equal(t, 1, len(tw.TimesTotal)) + require.Equal(t, 1, len(tw.TimesSuccess)) + require.Equal(t, 0, len(tw.TimesFail)) + require.Equal(t, float64(123.45), tw.TimesTotal[0]) + require.Equal(t, float64(123.45), tw.TimesSuccess[0]) + + tw.Add(&Request{Time: 100, Failure: false}) + require.Equal(t, 2, len(tw.TimesTotal)) + require.Equal(t, 2, len(tw.TimesSuccess)) + require.Equal(t, 0, len(tw.TimesFail)) + require.Equal(t, float64(100), tw.TimesTotal[1]) + require.Equal(t, float64(100), tw.TimesSuccess[1]) + + tw.Add(&Request{Time: 200, Failure: true}) + require.Equal(t, 3, len(tw.TimesTotal)) + require.Equal(t, 2, len(tw.TimesSuccess)) + require.Equal(t, 1, len(tw.TimesFail)) + require.Equal(t, float64(200), tw.TimesTotal[2]) + require.Equal(t, float64(200), tw.TimesFail[0]) +} + +func TestTimeWindow_Start(t *testing.T) { + now := time.Now() + tw := &TimeWindow{StartTime: now} + require.Equal(t, now, tw.Start()) +} + +func TestTimeWindow_End(t *testing.T) { + now := time.Now() + tw := &TimeWindow{EndTime: now} + require.Equal(t, now, tw.End()) +} + +func TestTimeWindow_Aggregate_All(t *testing.T) { + start := time.Now() + end := start.Add(time.Duration(60)) + tw := &TimeWindow{StartTime: start, EndTime: end, OnlyTotal: false} + metrics, err := tw.Aggregate() + require.NoError(t, err) + require.Equal(t, 3, len(metrics)) + require.Equal(t, end, metrics[0].Time()) + require.Equal(t, MeasurementTime, metrics[0].Name()) + require.Equal(t, end, metrics[1].Time()) + require.Equal(t, MeasurementTimeFail, metrics[1].Name()) + require.Equal(t, end, metrics[0].Time()) + require.Equal(t, MeasurementTimeSuccess, metrics[2].Name()) +} + +func TestTimeWindow_Aggregate_OnlyTotal(t *testing.T) { + start := time.Now() + end := start.Add(time.Duration(60)) + tw := &TimeWindow{StartTime: start, EndTime: end, OnlyTotal: true} + metrics, err := tw.Aggregate() + require.NoError(t, err) + require.Equal(t, 1, len(metrics)) + require.Equal(t, end, metrics[0].Time()) + require.Equal(t, MeasurementTime, metrics[0].Name()) +} + +func TestTimeWindow_aggregateTimes(t *testing.T) { + end := time.Now() + metric, err := aggregateTimes(MeasurementTime, []float64{500, 900, 300, 1000, 100, 600, 700, 800, 200, 400}, + []float32{60, 80, 99.9}, end) + require.NoError(t, err) + require.Equal(t, MeasurementTime, metric.Name()) + require.Equal(t, int64(10), metric.Fields()[FieldTimeRequests]) + require.Equal(t, float64(1000), metric.Fields()[FieldTimeMax]) + require.Equal(t, float64(100), metric.Fields()[FieldTimeMin]) + require.Equal(t, float64(550), metric.Fields()[FieldTimeMean]) + require.Equal(t, float64(700), metric.Fields()[FieldTimePerc+"60"]) + require.Equal(t, float64(900), metric.Fields()[FieldTimePerc+"80"]) + require.Equal(t, float64(1000), metric.Fields()[FieldTimePerc+"99_9"]) +} + +func TestThroughputWindow_Add(t *testing.T) { + tw := &ThroughputWindow{} + + tw.Add(&Request{}) + require.Equal(t, int64(1), tw.RequestsTotal) + require.Equal(t, int64(0), tw.RequestsFail) + + tw.Add(&Request{Failure: false}) + require.Equal(t, int64(2), tw.RequestsTotal) + require.Equal(t, int64(0), tw.RequestsFail) + + tw.Add(&Request{Failure: true}) + require.Equal(t, int64(3), tw.RequestsTotal) + require.Equal(t, int64(1), tw.RequestsFail) +} + +func TestThroughputWindow_Start(t *testing.T) { + now := time.Now() + tw := &ThroughputWindow{StartTime: now} + require.Equal(t, now, tw.Start()) +} + +func TestThroughputWindow_End(t *testing.T) { + now := time.Now() + tw := &ThroughputWindow{EndTime: now} + require.Equal(t, now, tw.End()) +} + +func TestThroughputWindow_Aggregate(t *testing.T) { + start := time.Now() + end := start.Add(time.Duration(60)) + tw := &ThroughputWindow{StartTime: start, EndTime: end, RequestsTotal: 33, RequestsFail: 11} + metrics, err := tw.Aggregate() + require.NoError(t, err) + require.Equal(t, 1, len(metrics)) + require.Equal(t, end, metrics[0].Time()) + require.Equal(t, MeasurementThroughput, metrics[0].Name()) + require.Equal(t, int64(33), metrics[0].Fields()[FieldThroughputTotal]) + require.Equal(t, int64(11), metrics[0].Fields()[FieldThroughputFailed]) +}