From 3ad10283efaa0601daf32c09a7e4c689337a1347 Mon Sep 17 00:00:00 2001 From: Piotr Popieluch Date: Tue, 19 Jun 2018 03:06:11 +0200 Subject: [PATCH] Add valuecounter aggregator plugin (#3523) --- README.md | 1 + plugins/aggregators/all/all.go | 1 + plugins/aggregators/valuecounter/README.md | 73 ++++++++++ .../aggregators/valuecounter/valuecounter.go | 108 +++++++++++++++ .../valuecounter/valuecounter_test.go | 126 ++++++++++++++++++ 5 files changed, 309 insertions(+) create mode 100644 plugins/aggregators/valuecounter/README.md create mode 100644 plugins/aggregators/valuecounter/valuecounter.go create mode 100644 plugins/aggregators/valuecounter/valuecounter_test.go diff --git a/README.md b/README.md index e5a6634d5..06dd74d94 100644 --- a/README.md +++ b/README.md @@ -282,6 +282,7 @@ formats may be used with input plugins supporting the `data_format` option: * [basicstats](./plugins/aggregators/basicstats) * [minmax](./plugins/aggregators/minmax) * [histogram](./plugins/aggregators/histogram) +* [valuecounter](./plugins/aggregators/valuecounter) ## Output Plugins diff --git a/plugins/aggregators/all/all.go b/plugins/aggregators/all/all.go index 98aecb83f..ff1bbfc70 100644 --- a/plugins/aggregators/all/all.go +++ b/plugins/aggregators/all/all.go @@ -4,4 +4,5 @@ import ( _ "github.com/influxdata/telegraf/plugins/aggregators/basicstats" _ "github.com/influxdata/telegraf/plugins/aggregators/histogram" _ "github.com/influxdata/telegraf/plugins/aggregators/minmax" + _ "github.com/influxdata/telegraf/plugins/aggregators/valuecounter" ) diff --git a/plugins/aggregators/valuecounter/README.md b/plugins/aggregators/valuecounter/README.md new file mode 100644 index 000000000..3d132c3bb --- /dev/null +++ b/plugins/aggregators/valuecounter/README.md @@ -0,0 +1,73 @@ +# ValueCounter Aggregator Plugin + +The valuecounter plugin counts the occurrence of values in fields and emits the +counter once every 'period' seconds. 
+ +A use case for the valuecounter plugin is when you are processing an HTTP access +log (with the logparser input) and want to count the HTTP status codes. + +The fields which will be counted must be configured with the `fields` +configuration directive. When `fields` is not provided, the plugin will not count +any fields. The results are emitted in fields in the format: +`originalfieldname_fieldvalue = count`. + +Valuecounter only works on fields of the type int, bool or string. Float fields +are dropped to prevent the creation of too many fields. + +### Configuration: + +```toml +[[aggregators.valuecounter]] + ## General Aggregator Arguments: + ## The period on which to flush & clear the aggregator. + period = "30s" + ## If true, the original metric will be dropped by the + ## aggregator and will not get sent to the output plugins. + drop_original = false + ## The fields for which the values will be counted + fields = ["status"] +``` + +### Measurements & Fields: + +- measurement1 + - field_value1 + - field_value2 + +### Tags: + +No tags are applied by this aggregator. + +### Example Output: + +Example for parsing an HTTP access log. 
+ +telegraf.conf: +``` +[[inputs.logparser]] + files = ["/tmp/tst.log"] + [inputs.logparser.grok] + patterns = ['%{DATA:url:tag} %{NUMBER:response:string}'] + measurement = "access" + +[[aggregators.valuecounter]] + namepass = ["access"] + fields = ["response"] +``` + +/tmp/tst.log +``` +/some/path 200 +/some/path 401 +/some/path 200 +``` + +``` +$ telegraf --config telegraf.conf --quiet + +access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="200" 1511948755991487011 +access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="401" 1511948755991522282 +access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="200" 1511948755991531697 + +access,path=/tmp/tst.log,host=localhost.localdomain,url=/some/path response_200=2i,response_401=1i 1511948761000000000 +``` diff --git a/plugins/aggregators/valuecounter/valuecounter.go b/plugins/aggregators/valuecounter/valuecounter.go new file mode 100644 index 000000000..c43b7723b --- /dev/null +++ b/plugins/aggregators/valuecounter/valuecounter.go @@ -0,0 +1,108 @@ +package valuecounter + +import ( + "fmt" + "log" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/aggregators" +) + +type aggregate struct { + name string + tags map[string]string + fieldCount map[string]int +} + +// ValueCounter an aggregation plugin +type ValueCounter struct { + cache map[uint64]aggregate + Fields []string +} + +// NewValueCounter create a new aggregation plugin which counts the occurances +// of fields and emits the count. +func NewValueCounter() telegraf.Aggregator { + vc := &ValueCounter{} + vc.Reset() + return vc +} + +var sampleConfig = ` + ## General Aggregator Arguments: + ## The period on which to flush & clear the aggregator. + period = "30s" + ## If true, the original metric will be dropped by the + ## aggregator and will not get sent to the output plugins. 
+ drop_original = false + ## The fields for which the values will be counted + fields = [] +` + +// SampleConfig generates a sample config for the ValueCounter plugin +func (vc *ValueCounter) SampleConfig() string { + return sampleConfig +} + +// Description returns the description of the ValueCounter plugin +func (vc *ValueCounter) Description() string { + return "Count the occurance of values in fields." +} + +// Add is run on every metric which passes the plugin +func (vc *ValueCounter) Add(in telegraf.Metric) { + id := in.HashID() + + // Check if the cache already has an entry for this metric, if not create it + if _, ok := vc.cache[id]; !ok { + a := aggregate{ + name: in.Name(), + tags: in.Tags(), + fieldCount: make(map[string]int), + } + vc.cache[id] = a + } + + // Check if this metric has fields which we need to count, if so increment + // the count. + for fk, fv := range in.Fields() { + for _, cf := range vc.Fields { + if fk == cf { + // Do not process float types to prevent memory from blowing up + switch fv.(type) { + default: + log.Printf("I! Valuecounter: Unsupported field type. " + + "Must be an int, string or bool. 
Ignoring.") + continue + case uint64, int64, string, bool: + } + fn := fmt.Sprintf("%v_%v", fk, fv) + vc.cache[id].fieldCount[fn]++ + } + } + } +} + +// Push emits the counters +func (vc *ValueCounter) Push(acc telegraf.Accumulator) { + for _, agg := range vc.cache { + fields := map[string]interface{}{} + + for field, count := range agg.fieldCount { + fields[field] = count + } + + acc.AddFields(agg.name, fields, agg.tags) + } +} + +// Reset the cache, executed after each push +func (vc *ValueCounter) Reset() { + vc.cache = make(map[uint64]aggregate) +} + +func init() { + aggregators.Add("valuecounter", func() telegraf.Aggregator { + return NewValueCounter() + }) +} diff --git a/plugins/aggregators/valuecounter/valuecounter_test.go b/plugins/aggregators/valuecounter/valuecounter_test.go new file mode 100644 index 000000000..01c68c496 --- /dev/null +++ b/plugins/aggregators/valuecounter/valuecounter_test.go @@ -0,0 +1,126 @@ +package valuecounter + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" +) + +// Create a valuecounter with config +func NewTestValueCounter(fields []string) telegraf.Aggregator { + vc := &ValueCounter{ + Fields: fields, + } + vc.Reset() + + return vc +} + +var m1, _ = metric.New("m1", + map[string]string{"foo": "bar"}, + map[string]interface{}{ + "status": 200, + "somefield": 20.1, + "foobar": "bar", + }, + time.Now(), +) + +var m2, _ = metric.New("m1", + map[string]string{"foo": "bar"}, + map[string]interface{}{ + "status": "OK", + "ignoreme": "string", + "andme": true, + "boolfield": false, + }, + time.Now(), +) + +func BenchmarkApply(b *testing.B) { + vc := NewTestValueCounter([]string{"status"}) + + for n := 0; n < b.N; n++ { + vc.Add(m1) + vc.Add(m2) + } +} + +// Test basic functionality +func TestBasic(t *testing.T) { + vc := NewTestValueCounter([]string{"status"}) + acc := testutil.Accumulator{} + + vc.Add(m1) + vc.Add(m2) + 
vc.Add(m1) + vc.Push(&acc) + + expectedFields := map[string]interface{}{ + "status_200": 2, + "status_OK": 1, + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +} + +// Test with multiple fields to count +func TestMultipleFields(t *testing.T) { + vc := NewTestValueCounter([]string{"status", "somefield", "boolfield"}) + acc := testutil.Accumulator{} + + vc.Add(m1) + vc.Add(m2) + vc.Add(m2) + vc.Add(m1) + vc.Push(&acc) + + expectedFields := map[string]interface{}{ + "status_200": 2, + "status_OK": 2, + "boolfield_false": 2, + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +} + +// Test with a reset between two runs +func TestWithReset(t *testing.T) { + vc := NewTestValueCounter([]string{"status"}) + acc := testutil.Accumulator{} + + vc.Add(m1) + vc.Add(m1) + vc.Add(m2) + vc.Push(&acc) + + expectedFields := map[string]interface{}{ + "status_200": 2, + "status_OK": 1, + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) + + acc.ClearMetrics() + vc.Reset() + + vc.Add(m2) + vc.Add(m2) + vc.Add(m1) + vc.Push(&acc) + + expectedFields = map[string]interface{}{ + "status_200": 1, + "status_OK": 2, + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +}