Add valuecounter aggregator plugin (#3523)
This commit is contained in:
parent
3626a522e5
commit
a059689378
|
@ -282,6 +282,7 @@ formats may be used with input plugins supporting the `data_format` option:
|
||||||
* [basicstats](./plugins/aggregators/basicstats)
|
* [basicstats](./plugins/aggregators/basicstats)
|
||||||
* [minmax](./plugins/aggregators/minmax)
|
* [minmax](./plugins/aggregators/minmax)
|
||||||
* [histogram](./plugins/aggregators/histogram)
|
* [histogram](./plugins/aggregators/histogram)
|
||||||
|
* [valuecounter](./plugins/aggregators/valuecounter)
|
||||||
|
|
||||||
## Output Plugins
|
## Output Plugins
|
||||||
|
|
||||||
|
|
|
@ -4,4 +4,5 @@ import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/aggregators/basicstats"
|
_ "github.com/influxdata/telegraf/plugins/aggregators/basicstats"
|
||||||
_ "github.com/influxdata/telegraf/plugins/aggregators/histogram"
|
_ "github.com/influxdata/telegraf/plugins/aggregators/histogram"
|
||||||
_ "github.com/influxdata/telegraf/plugins/aggregators/minmax"
|
_ "github.com/influxdata/telegraf/plugins/aggregators/minmax"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/aggregators/valuecounter"
|
||||||
)
|
)
|
||||||
|
|
|
@ -0,0 +1,73 @@
|
||||||
|
# ValueCounter Aggregator Plugin
|
||||||
|
|
||||||
|
The valuecounter plugin counts the occurrence of values in fields and emits the
|
||||||
|
counter once every 'period' seconds.
|
||||||
|
|
||||||
|
A use case for the valuecounter plugin is when you are processing a HTTP access
|
||||||
|
log (with the logparser input) and want to count the HTTP status codes.
|
||||||
|
|
||||||
|
The fields which will be counted must be configured with the `fields`
|
||||||
|
configuration directive. When no `fields` is provided the plugin will not count
|
||||||
|
any fields. The results are emitted in fields in the format:
|
||||||
|
`originalfieldname_fieldvalue = count`.
|
||||||
|
|
||||||
|
Valuecounter only works on fields of the type int, bool or string. Float fields
|
||||||
|
are being dropped to prevent the creating of too many fields.
|
||||||
|
|
||||||
|
### Configuration:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[aggregators.valuecounter]]
|
||||||
|
## General Aggregator Arguments:
|
||||||
|
## The period on which to flush & clear the aggregator.
|
||||||
|
period = "30s"
|
||||||
|
## If true, the original metric will be dropped by the
|
||||||
|
## aggregator and will not get sent to the output plugins.
|
||||||
|
drop_original = false
|
||||||
|
## The fields for which the values will be counted
|
||||||
|
fields = ["status"]
|
||||||
|
```
|
||||||
|
|
||||||
|
### Measurements & Fields:
|
||||||
|
|
||||||
|
- measurement1
|
||||||
|
- field_value1
|
||||||
|
- field_value2
|
||||||
|
|
||||||
|
### Tags:
|
||||||
|
|
||||||
|
No tags are applied by this aggregator.
|
||||||
|
|
||||||
|
### Example Output:
|
||||||
|
|
||||||
|
Example for parsing a HTTP access log.
|
||||||
|
|
||||||
|
telegraf.conf:
|
||||||
|
```
|
||||||
|
[[inputs.logparser]]
|
||||||
|
files = ["/tmp/tst.log"]
|
||||||
|
[inputs.logparser.grok]
|
||||||
|
patterns = ['%{DATA:url:tag} %{NUMBER:response:string}']
|
||||||
|
measurement = "access"
|
||||||
|
|
||||||
|
[[aggregators.valuecounter]]
|
||||||
|
namepass = ["access"]
|
||||||
|
fields = ["response"]
|
||||||
|
```
|
||||||
|
|
||||||
|
/tmp/tst.log
|
||||||
|
```
|
||||||
|
/some/path 200
|
||||||
|
/some/path 401
|
||||||
|
/some/path 200
|
||||||
|
```
|
||||||
|
|
||||||
|
```
|
||||||
|
$ telegraf --config telegraf.conf --quiet
|
||||||
|
|
||||||
|
access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="200" 1511948755991487011
|
||||||
|
access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="401" 1511948755991522282
|
||||||
|
access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="200" 1511948755991531697
|
||||||
|
|
||||||
|
access,path=/tmp/tst.log,host=localhost.localdomain,url=/some/path response_200=2i,response_401=1i 1511948761000000000
|
||||||
|
```
|
|
@ -0,0 +1,108 @@
|
||||||
|
package valuecounter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/plugins/aggregators"
|
||||||
|
)
|
||||||
|
|
||||||
|
type aggregate struct {
|
||||||
|
name string
|
||||||
|
tags map[string]string
|
||||||
|
fieldCount map[string]int
|
||||||
|
}
|
||||||
|
|
||||||
|
// ValueCounter an aggregation plugin
|
||||||
|
type ValueCounter struct {
|
||||||
|
cache map[uint64]aggregate
|
||||||
|
Fields []string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewValueCounter create a new aggregation plugin which counts the occurances
|
||||||
|
// of fields and emits the count.
|
||||||
|
func NewValueCounter() telegraf.Aggregator {
|
||||||
|
vc := &ValueCounter{}
|
||||||
|
vc.Reset()
|
||||||
|
return vc
|
||||||
|
}
|
||||||
|
|
||||||
|
var sampleConfig = `
|
||||||
|
## General Aggregator Arguments:
|
||||||
|
## The period on which to flush & clear the aggregator.
|
||||||
|
period = "30s"
|
||||||
|
## If true, the original metric will be dropped by the
|
||||||
|
## aggregator and will not get sent to the output plugins.
|
||||||
|
drop_original = false
|
||||||
|
## The fields for which the values will be counted
|
||||||
|
fields = []
|
||||||
|
`
|
||||||
|
|
||||||
|
// SampleConfig generates a sample config for the ValueCounter plugin
|
||||||
|
func (vc *ValueCounter) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
// Description returns the description of the ValueCounter plugin
|
||||||
|
func (vc *ValueCounter) Description() string {
|
||||||
|
return "Count the occurance of values in fields."
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add is run on every metric which passes the plugin
|
||||||
|
func (vc *ValueCounter) Add(in telegraf.Metric) {
|
||||||
|
id := in.HashID()
|
||||||
|
|
||||||
|
// Check if the cache already has an entry for this metric, if not create it
|
||||||
|
if _, ok := vc.cache[id]; !ok {
|
||||||
|
a := aggregate{
|
||||||
|
name: in.Name(),
|
||||||
|
tags: in.Tags(),
|
||||||
|
fieldCount: make(map[string]int),
|
||||||
|
}
|
||||||
|
vc.cache[id] = a
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if this metric has fields which we need to count, if so increment
|
||||||
|
// the count.
|
||||||
|
for fk, fv := range in.Fields() {
|
||||||
|
for _, cf := range vc.Fields {
|
||||||
|
if fk == cf {
|
||||||
|
// Do not process float types to prevent memory from blowing up
|
||||||
|
switch fv.(type) {
|
||||||
|
default:
|
||||||
|
log.Printf("I! Valuecounter: Unsupported field type. " +
|
||||||
|
"Must be an int, string or bool. Ignoring.")
|
||||||
|
continue
|
||||||
|
case uint64, int64, string, bool:
|
||||||
|
}
|
||||||
|
fn := fmt.Sprintf("%v_%v", fk, fv)
|
||||||
|
vc.cache[id].fieldCount[fn]++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push emits the counters
|
||||||
|
func (vc *ValueCounter) Push(acc telegraf.Accumulator) {
|
||||||
|
for _, agg := range vc.cache {
|
||||||
|
fields := map[string]interface{}{}
|
||||||
|
|
||||||
|
for field, count := range agg.fieldCount {
|
||||||
|
fields[field] = count
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.AddFields(agg.name, fields, agg.tags)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset the cache, executed after each push
|
||||||
|
func (vc *ValueCounter) Reset() {
|
||||||
|
vc.cache = make(map[uint64]aggregate)
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
aggregators.Add("valuecounter", func() telegraf.Aggregator {
|
||||||
|
return NewValueCounter()
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,126 @@
|
||||||
|
package valuecounter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Create a valuecounter with config
|
||||||
|
func NewTestValueCounter(fields []string) telegraf.Aggregator {
|
||||||
|
vc := &ValueCounter{
|
||||||
|
Fields: fields,
|
||||||
|
}
|
||||||
|
vc.Reset()
|
||||||
|
|
||||||
|
return vc
|
||||||
|
}
|
||||||
|
|
||||||
|
var m1, _ = metric.New("m1",
|
||||||
|
map[string]string{"foo": "bar"},
|
||||||
|
map[string]interface{}{
|
||||||
|
"status": 200,
|
||||||
|
"somefield": 20.1,
|
||||||
|
"foobar": "bar",
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
var m2, _ = metric.New("m1",
|
||||||
|
map[string]string{"foo": "bar"},
|
||||||
|
map[string]interface{}{
|
||||||
|
"status": "OK",
|
||||||
|
"ignoreme": "string",
|
||||||
|
"andme": true,
|
||||||
|
"boolfield": false,
|
||||||
|
},
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
func BenchmarkApply(b *testing.B) {
|
||||||
|
vc := NewTestValueCounter([]string{"status"})
|
||||||
|
|
||||||
|
for n := 0; n < b.N; n++ {
|
||||||
|
vc.Add(m1)
|
||||||
|
vc.Add(m2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test basic functionality
|
||||||
|
func TestBasic(t *testing.T) {
|
||||||
|
vc := NewTestValueCounter([]string{"status"})
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
|
||||||
|
vc.Add(m1)
|
||||||
|
vc.Add(m2)
|
||||||
|
vc.Add(m1)
|
||||||
|
vc.Push(&acc)
|
||||||
|
|
||||||
|
expectedFields := map[string]interface{}{
|
||||||
|
"status_200": 2,
|
||||||
|
"status_OK": 1,
|
||||||
|
}
|
||||||
|
expectedTags := map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
}
|
||||||
|
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test with multiple fields to count
|
||||||
|
func TestMultipleFields(t *testing.T) {
|
||||||
|
vc := NewTestValueCounter([]string{"status", "somefield", "boolfield"})
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
|
||||||
|
vc.Add(m1)
|
||||||
|
vc.Add(m2)
|
||||||
|
vc.Add(m2)
|
||||||
|
vc.Add(m1)
|
||||||
|
vc.Push(&acc)
|
||||||
|
|
||||||
|
expectedFields := map[string]interface{}{
|
||||||
|
"status_200": 2,
|
||||||
|
"status_OK": 2,
|
||||||
|
"boolfield_false": 2,
|
||||||
|
}
|
||||||
|
expectedTags := map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
}
|
||||||
|
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test with a reset between two runs
|
||||||
|
func TestWithReset(t *testing.T) {
|
||||||
|
vc := NewTestValueCounter([]string{"status"})
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
|
||||||
|
vc.Add(m1)
|
||||||
|
vc.Add(m1)
|
||||||
|
vc.Add(m2)
|
||||||
|
vc.Push(&acc)
|
||||||
|
|
||||||
|
expectedFields := map[string]interface{}{
|
||||||
|
"status_200": 2,
|
||||||
|
"status_OK": 1,
|
||||||
|
}
|
||||||
|
expectedTags := map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
}
|
||||||
|
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
|
||||||
|
|
||||||
|
acc.ClearMetrics()
|
||||||
|
vc.Reset()
|
||||||
|
|
||||||
|
vc.Add(m2)
|
||||||
|
vc.Add(m2)
|
||||||
|
vc.Add(m1)
|
||||||
|
vc.Push(&acc)
|
||||||
|
|
||||||
|
expectedFields = map[string]interface{}{
|
||||||
|
"status_200": 1,
|
||||||
|
"status_OK": 2,
|
||||||
|
}
|
||||||
|
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
|
||||||
|
}
|
Loading…
Reference in New Issue