diff --git a/Godeps b/Godeps index 2ac95a904..0550ddcc8 100644 --- a/Godeps +++ b/Godeps @@ -1,5 +1,6 @@ github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9 github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc +github.com/VividCortex/gohistogram da38b6e56f2f7dc1999a037141441e50d6213f5d github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687 github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857 github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4 diff --git a/agent/agent.go b/agent/agent.go index 1423ef773..a213bb5ec 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -242,28 +242,43 @@ func (a *Agent) flush() { } }(o) } - wg.Wait() } // flusher monitors the metrics input channel and flushes on the minimum interval func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error { + var metricStream chan telegraf.Metric + stopFilter := make(chan struct{}, len(a.Config.Filters)) // Inelegant, but this sleep is to allow the Gather threads to run, so that // the flusher will flush after metrics are collected. time.Sleep(time.Millisecond * 200) - ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) + metricStream = metricC + if len(a.Config.Filters) != 0 { + for _, name := range a.Config.FiltersOrder { + filter := a.Config.Filters[name] + log.Printf("Filter %s is enabled", name) + metricStream = filter.Pipe(metricStream) + go func(f telegraf.Filter) { + f.Start(stopFilter) + }(filter) + } + } for { select { case <-shutdown: + //sending shutdown signal for all filters + for range a.Config.Filters { + stopFilter <- struct{}{} + } log.Println("Hang on, flushing any cached metrics before shutdown") a.flush() return nil case <-ticker.C: internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) a.flush() - case m := <-metricC: + case m := <-metricStream: for _, o := range a.Config.Outputs { o.AddMetric(m) } diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 6681ad073..42a7c287b 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal/config" + _ "github.com/influxdata/telegraf/plugins/filters/all" "github.com/influxdata/telegraf/plugins/inputs" _ "github.com/influxdata/telegraf/plugins/inputs/all" "github.com/influxdata/telegraf/plugins/outputs" diff --git a/filter.go b/filter.go new file mode 100644 index 000000000..410114d68 --- /dev/null +++ b/filter.go @@ -0,0 +1,15 @@ +package telegraf + +type Filter interface { + // SampleConfig returns the default configuration of the Input + SampleConfig() string + + // Description returns a one-sentence description on the Input + Description() string + + //create pipe for filter + Pipe(in chan Metric) chan Metric + + // start the filter + Start(shutdown chan struct{}) +} diff --git a/internal/config/config.go b/internal/config/config.go index fdc9a8753..a27b4e9d4 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -16,6 +16,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/plugins/filters" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/parsers" @@ -46,9 +47,11 @@ type Config struct { InputFilters []string OutputFilters []string - Agent *AgentConfig - Inputs []*internal_models.RunningInput - Outputs []*internal_models.RunningOutput + Agent *AgentConfig + Inputs []*internal_models.RunningInput + Outputs []*internal_models.RunningOutput + Filters map[string]telegraf.Filter + FiltersOrder []string } func NewConfig() *Config { @@ -63,6 +66,8 @@ func NewConfig() *Config { Tags: make(map[string]string), Inputs: make([]*internal_models.RunningInput, 0), Outputs: make([]*internal_models.RunningOutput, 0), + Filters: make(map[string]telegraf.Filter), + FiltersOrder: make([]string, 0), InputFilters: make([]string, 0), OutputFilters: make([]string, 0), } @@ -516,6 +521,25 @@ func (c *Config) LoadConfig(path string) error { pluginName, path) } } + case "filter": + for pluginName, pluginVal := range subTable.Fields { + switch pluginSubTable := pluginVal.(type) { + case *ast.Table: + if err = c.addFilter(pluginName, pluginSubTable); err != nil { + return fmt.Errorf("Error parsing %s, %s", path, err) + } + case []*ast.Table: + for _, t := range pluginSubTable { + if err = c.addFilter(pluginName, t); err != nil { + return fmt.Errorf("Error parsing %s, %s", path, err) + } + } + default: + return fmt.Errorf("Unsupported config format: %s, file %s", + pluginName, path) + } + } + // Assume it's an input input for legacy config file support if no other // identifiers are present default: @@ -583,6 +607,25 @@ func (c *Config) addOutput(name string, table *ast.Table) error { return nil } +func (c *Config) addFilter(name string, table *ast.Table) error { + creator, ok := filters.Filters[name] + if !ok { + return fmt.Errorf("Undefined but requested filter: %s", name) + } + filter := creator() + + if err := config.UnmarshalTable(table, filter); err != nil { + return err + } + + if _, ok = c.Filters[name]; ok { + return fmt.Errorf("Filter already defined %s", name) + } + c.Filters[name] = filter + c.FiltersOrder = append(c.FiltersOrder, name) + return nil +} + func (c *Config) addInput(name string, table *ast.Table) error { if len(c.InputFilters) > 0 && !sliceContains(name, c.InputFilters) { return nil diff --git a/plugins/filters/all/all.go b/plugins/filters/all/all.go new file mode 100644 index 000000000..7d6a9242b --- /dev/null +++ b/plugins/filters/all/all.go @@ -0,0 +1,5 @@ +package all + +import ( + _ "github.com/influxdata/telegraf/plugins/filters/histogram" +) diff --git a/plugins/filters/histogram/aggregate.go b/plugins/filters/histogram/aggregate.go new file mode 100644 index 000000000..2b233a72b --- /dev/null +++ b/plugins/filters/histogram/aggregate.go @@ -0,0 +1,65 @@ +package histogram + +import ( + "github.com/VividCortex/gohistogram" + "math" +) + +type Aggregate struct { + hist *gohistogram.NumericHistogram + sum float64 + max float64 + min float64 +} + +func (a *Aggregate) Add(n float64) { + a.sum += n + if a.max < n { + a.max = n + } + if a.min > n { + a.min = n + } + a.hist.Add(n) +} + +func (a *Aggregate) Quantile(q float64) float64 { + return a.hist.Quantile(q) +} + +func (a *Aggregate) Sum() float64 { + return a.sum +} + +func (a *Aggregate) CDF(x float64) float64 { + return a.hist.CDF(x) +} + +func (a *Aggregate) Mean() float64 { + return a.hist.Mean() +} + +func (a *Aggregate) Variance() float64 { + return a.hist.Variance() +} + +func (a *Aggregate) Count() float64 { + return a.hist.Count() +} + +func (a *Aggregate) Max() float64 { + return a.max +} + +func (a *Aggregate) Min() float64 { + return a.min +} + +func NewAggregate(n int) *Aggregate { + return &Aggregate{ + hist: gohistogram.NewHistogram(n), + max: math.SmallestNonzeroFloat64, + min: math.MaxFloat64, + sum: 0, + } +} diff --git a/plugins/filters/histogram/histogram.go b/plugins/filters/histogram/histogram.go new file mode 100644 index 000000000..9cf47a884 --- /dev/null +++ b/plugins/filters/histogram/histogram.go @@ -0,0 +1,168 @@ +package histogram + +import ( + "crypto/sha1" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/filters" + "io" + "log" + "sort" + "strconv" + "time" +) + +const field_sep = "." + +type metricID struct { + Name string + TagHash [sha1.Size]byte +} +type Histogram struct { + inch chan telegraf.Metric + outch chan telegraf.Metric + FlushInterval string + interval time.Duration + Bucketsize int + Metrics map[string][]float64 + fieldMap map[metricID]map[string]*Aggregate + metricTags map[metricID]map[string]string +} + +func (h *Histogram) Description() string { + return "Histogram: read metrics from inputs and create histogram for output" +} + +func (h *Histogram) SampleConfig() string { + return ` + ## Histogram Filter + ## This filter can be used to generate + ## (mean varince percentile count) + ## values generated are approxmation please refer to + ## Ben-Haim & Yom-Tov's A Streaming Parallel Decision Tree Algorithm + ## http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf + ##[[filter.histogram]] + ## bucket size if increase it will increase accuracy + ## but it will increase memory usage + ## bucketsize = 20 + ## [filter.histogram.metrics] + ## if array is empty only count mean + ## and variance will be cacluated. _ALL_METRIC special constanct + ## can be used instead of metric name this will aggregate all the + ## the merrtics for this filer + ## metric name = [percentiles] + ## tail = [0.90] +` +} + +func (h *Histogram) Pipe(in chan telegraf.Metric) chan telegraf.Metric { + h.inch = in + h.outch = make(chan telegraf.Metric, 10000) + return h.outch +} + +func (h *Histogram) Start(shutdown chan struct{}) { + interval, _ := time.ParseDuration(h.FlushInterval) + ticker := time.NewTicker(interval) + for { + select { + case m := <-h.inch: + if h.IsEnabled(m.Name()) { + h.AddMetric(m) + } else { + h.outch <- m + } + case <-shutdown: + log.Printf("Shuting down filters, All metric in the queue will be lost.") + return + case <-ticker.C: + h.OutputMetric() + } + } +} + +func (h *Histogram) hashTags(m map[string]string) (result [sha1.Size]byte) { + hash := sha1.New() + keys := []string{} + for key := range m { + keys = append(keys, key) + } + sort.Strings(keys) + for _, item := range keys { + io.WriteString(hash, item+m[item]) + } + copy(result[:], hash.Sum(nil)) + return result +} + +func (h *Histogram) AddMetric(metric telegraf.Metric) { + mID := metricID{ + Name: metric.Name(), + TagHash: h.hashTags(metric.Tags()), + } + if h.fieldMap[mID] == nil { + h.fieldMap[mID] = make(map[string]*Aggregate) + } + if h.metricTags[mID] == nil { + h.metricTags[mID] = make(map[string]string) + } + h.metricTags[mID] = metric.Tags() + for key, val := range metric.Fields() { + switch v := val.(type) { + case float64: + if h.fieldMap[mID][key] == nil { + h.fieldMap[mID][key] = NewAggregate(h.Bucketsize) + } + hist := h.fieldMap[mID][key] + hist.Add(v) + default: + log.Printf("When stats enabled all the fields should be of type float64 [field name %s]", key) + } + } +} + +func (h *Histogram) IsEnabled(name string) bool { + _, isAllEnabled := h.Metrics["_ALL_METRIC"] + _, ok := h.Metrics[name] + return ok || isAllEnabled +} + +func (h *Histogram) OutputMetric() { + all_percentile := h.Metrics["_ALL_METRIC"] + for mID, fields := range h.fieldMap { + mFields := make(map[string]interface{}) + for key, val := range fields { + percentile, ok := h.Metrics[mID.Name] + if !ok { + percentile = all_percentile + } + for _, perc := range percentile { + p := strconv.FormatFloat(perc*100, 'f', 0, 64) + mFields[key+field_sep+"p"+p] = val.Quantile(perc) + } + mFields[key+field_sep+"variance"] = val.Variance() + mFields[key+field_sep+"mean"] = val.Mean() + mFields[key+field_sep+"count"] = val.Count() + mFields[key+field_sep+"sum"] = val.Sum() + mFields[key+field_sep+"max"] = val.Max() + mFields[key+field_sep+"min"] = val.Min() + } + metric, _ := telegraf.NewMetric(mID.Name, h.metricTags[mID], mFields, time.Now().UTC()) + h.outch <- metric + delete(h.fieldMap, mID) + delete(h.metricTags, mID) + } +} + +func (h *Histogram) Reset() { + h.fieldMap = make(map[metricID]map[string]*Aggregate) + h.metricTags = make(map[metricID]map[string]string) +} + +func init() { + filters.Add("histogram", func() telegraf.Filter { + return &Histogram{ + fieldMap: make(map[metricID]map[string]*Aggregate), + metricTags: make(map[metricID]map[string]string), + } + }) +} diff --git a/plugins/filters/registry.go b/plugins/filters/registry.go new file mode 100644 index 000000000..e39176284 --- /dev/null +++ b/plugins/filters/registry.go @@ -0,0 +1,11 @@ +package filters + +import "github.com/influxdata/telegraf" + +type Creator func() telegraf.Filter + +var Filters = map[string]Creator{} + +func Add(name string, creator Creator) { + Filters[name] = creator +}