diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index ec7a35363..5aefbfdcb 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -2,7 +2,7 @@ 1. [Sign the CLA](http://influxdb.com/community/cla.html) 1. Make changes or write plugin (see below for details) -1. Add your plugin to `plugins/inputs/all/all.go` or `plugins/outputs/all/all.go` +1. Add your plugin to one of: `plugins/{inputs,outputs,aggregators,processors}/all/all.go` 1. If your plugin requires a new Go package, [add it](https://github.com/influxdata/telegraf/blob/master/CONTRIBUTING.md#adding-a-dependency) 1. Write a README for your plugin, if it's an input plugin, it should be structured @@ -16,8 +16,8 @@ for a good example. ## GoDoc -Public interfaces for inputs, outputs, metrics, and the accumulator can be found -on the GoDoc +Public interfaces for inputs, outputs, processors, aggregators, metrics, +and the accumulator can be found on the GoDoc [![GoDoc](https://godoc.org/github.com/influxdata/telegraf?status.svg)](https://godoc.org/github.com/influxdata/telegraf) @@ -46,7 +46,7 @@ and submit new inputs. ### Input Plugin Guidelines -* A plugin must conform to the `telegraf.Input` interface. +* A plugin must conform to the [`telegraf.Input`](https://godoc.org/github.com/influxdata/telegraf#Input) interface. * Input Plugins should call `inputs.Add` in their `init` function to register themselves. See below for a quick example. * Input Plugins must be added to the @@ -177,7 +177,7 @@ similar constructs. ### Output Plugin Guidelines -* An output must conform to the `outputs.Output` interface. +* An output must conform to the [`telegraf.Output`](https://godoc.org/github.com/influxdata/telegraf#Output) interface. * Outputs should call `outputs.Add` in their `init` function to register themselves. See below for a quick example. * To be available within Telegraf itself, plugins must add themselves to the @@ -275,6 +275,186 @@ and `Stop()` methods. * Same as the `Output` guidelines, except that they must conform to the `output.ServiceOutput` interface. +## Processor Plugins + +This section is for developers who want to create a new processor plugin. + +### Processor Plugin Guidelines + +* A processor must conform to the [`telegraf.Processor`](https://godoc.org/github.com/influxdata/telegraf#Processor) interface. +* Processors should call `processors.Add` in their `init` function to register themselves. +See below for a quick example. +* To be available within Telegraf itself, plugins must add themselves to the +`github.com/influxdata/telegraf/plugins/processors/all/all.go` file. +* The `SampleConfig` function should return valid toml that describes how the +processor can be configured. This is include in `telegraf -sample-config`. +* The `Description` function should say in one line what this processor does. + +### Processor Example + +```go +package printer + +// printer.go + +import ( + "fmt" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/processors" +) + +type Printer struct { +} + +var sampleConfig = ` +` + +func (p *Printer) SampleConfig() string { + return sampleConfig +} + +func (p *Printer) Description() string { + return "Print all metrics that pass through this filter." +} + +func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric { + for _, metric := range in { + fmt.Println(metric.String()) + } + return in +} + +func init() { + processors.Add("printer", func() telegraf.Processor { + return &Printer{} + }) +} +``` + +## Aggregator Plugins + +This section is for developers who want to create a new aggregator plugin. + +### Aggregator Plugin Guidelines + +* A aggregator must conform to the [`telegraf.Aggregator`](https://godoc.org/github.com/influxdata/telegraf#Aggregator) interface. +* Aggregators should call `aggregators.Add` in their `init` function to register themselves. +See below for a quick example. +* To be available within Telegraf itself, plugins must add themselves to the +`github.com/influxdata/telegraf/plugins/aggregators/all/all.go` file. +* The `SampleConfig` function should return valid toml that describes how the +aggregator can be configured. This is include in `telegraf -sample-config`. +* The `Description` function should say in one line what this aggregator does. +* The Aggregator plugin will need to keep caches of metrics that have passed +through it. This should be done using the builtin `HashID()` function of each +metric. +* When the `Reset()` function is called, all caches should be cleared. + +### Aggregator Example + +```go +package min + +// min.go + +import ( + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/aggregators" +) + +type Min struct { + // caches for metric fields, names, and tags + fieldCache map[uint64]map[string]float64 + nameCache map[uint64]string + tagCache map[uint64]map[string]string +} + +func NewMin() telegraf.Aggregator { + m := &Min{} + m.Reset() + return m +} + +var sampleConfig = ` + ## period is the flush & clear interval of the aggregator. + period = "30s" + ## If true drop_original will drop the original metrics and + ## only send aggregates. + drop_original = false +` + +func (m *Min) SampleConfig() string { + return sampleConfig +} + +func (m *Min) Description() string { + return "Keep the aggregate min of each metric passing through." +} + +func (m *Min) Add(in telegraf.Metric) { + id := in.HashID() + if _, ok := m.nameCache[id]; !ok { + // hit an uncached metric, create caches for first time: + m.nameCache[id] = in.Name() + m.tagCache[id] = in.Tags() + m.fieldCache[id] = make(map[string]float64) + for k, v := range in.Fields() { + if fv, ok := convert(v); ok { + m.fieldCache[id][k] = fv + } + } + } else { + for k, v := range in.Fields() { + if fv, ok := convert(v); ok { + if _, ok := m.fieldCache[id][k]; !ok { + // hit an uncached field of a cached metric + m.fieldCache[id][k] = fv + continue + } + if fv < m.fieldCache[id][k] { + // set new minimum + m.fieldCache[id][k] = fv + } + } + } + } +} + +func (m *Min) Push(acc telegraf.Accumulator) { + for id, _ := range m.nameCache { + fields := map[string]interface{}{} + for k, v := range m.fieldCache[id] { + fields[k+"_min"] = v + } + acc.AddFields(m.nameCache[id], fields, m.tagCache[id]) + } +} + +func (m *Min) Reset() { + m.fieldCache = make(map[uint64]map[string]float64) + m.nameCache = make(map[uint64]string) + m.tagCache = make(map[uint64]map[string]string) +} + +func convert(in interface{}) (float64, bool) { + switch v := in.(type) { + case float64: + return v, true + case int64: + return float64(v), true + default: + return 0, false + } +} + +func init() { + aggregators.Add("min", func() telegraf.Aggregator { + return NewMin() + }) +} +``` + ## Unit Tests ### Execute short tests