@@ -9,6 +9,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/services/graphite"
|
||||
|
||||
@@ -51,6 +52,8 @@ type Statsd struct {
|
||||
done chan struct{}
|
||||
|
||||
// Cache gauges, counters & sets so they can be aggregated as they arrive
|
||||
// gauges and counters map measurement/tags hash -> field name -> metrics
|
||||
// sets and timings map measurement/tags hash -> metrics
|
||||
gauges map[string]cachedgauge
|
||||
counters map[string]cachedcounter
|
||||
sets map[string]cachedset
|
||||
@@ -80,6 +83,7 @@ func NewStatsd() *Statsd {
|
||||
// One statsd metric, form is <bucket>:<value>|<mtype>|@<samplerate>
|
||||
type metric struct {
|
||||
name string
|
||||
field string
|
||||
bucket string
|
||||
hash string
|
||||
intvalue int64
|
||||
@@ -91,21 +95,21 @@ type metric struct {
|
||||
}
|
||||
|
||||
type cachedset struct {
|
||||
name string
|
||||
set map[int64]bool
|
||||
tags map[string]string
|
||||
name string
|
||||
fields map[string]map[int64]bool
|
||||
tags map[string]string
|
||||
}
|
||||
|
||||
type cachedgauge struct {
|
||||
name string
|
||||
value float64
|
||||
tags map[string]string
|
||||
name string
|
||||
fields map[string]interface{}
|
||||
tags map[string]string
|
||||
}
|
||||
|
||||
type cachedcounter struct {
|
||||
name string
|
||||
value int64
|
||||
tags map[string]string
|
||||
name string
|
||||
fields map[string]interface{}
|
||||
tags map[string]string
|
||||
}
|
||||
|
||||
type cachedtimings struct {
|
||||
@@ -160,6 +164,7 @@ func (_ *Statsd) SampleConfig() string {
|
||||
func (s *Statsd) Gather(acc telegraf.Accumulator) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
now := time.Now()
|
||||
|
||||
for _, metric := range s.timings {
|
||||
fields := make(map[string]interface{})
|
||||
@@ -172,28 +177,32 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
|
||||
name := fmt.Sprintf("%v_percentile", percentile)
|
||||
fields[name] = metric.stats.Percentile(percentile)
|
||||
}
|
||||
acc.AddFields(metric.name, fields, metric.tags)
|
||||
acc.AddFields(metric.name, fields, metric.tags, now)
|
||||
}
|
||||
if s.DeleteTimings {
|
||||
s.timings = make(map[string]cachedtimings)
|
||||
}
|
||||
|
||||
for _, metric := range s.gauges {
|
||||
acc.Add(metric.name, metric.value, metric.tags)
|
||||
acc.AddFields(metric.name, metric.fields, metric.tags, now)
|
||||
}
|
||||
if s.DeleteGauges {
|
||||
s.gauges = make(map[string]cachedgauge)
|
||||
}
|
||||
|
||||
for _, metric := range s.counters {
|
||||
acc.Add(metric.name, metric.value, metric.tags)
|
||||
acc.AddFields(metric.name, metric.fields, metric.tags, now)
|
||||
}
|
||||
if s.DeleteCounters {
|
||||
s.counters = make(map[string]cachedcounter)
|
||||
}
|
||||
|
||||
for _, metric := range s.sets {
|
||||
acc.Add(metric.name, int64(len(metric.set)), metric.tags)
|
||||
fields := make(map[string]interface{})
|
||||
for field, set := range metric.fields {
|
||||
fields[field] = int64(len(set))
|
||||
}
|
||||
acc.AddFields(metric.name, fields, metric.tags, now)
|
||||
}
|
||||
if s.DeleteSets {
|
||||
s.sets = make(map[string]cachedset)
|
||||
@@ -358,7 +367,12 @@ func (s *Statsd) parseStatsdLine(line string) error {
|
||||
}
|
||||
|
||||
// Parse the name & tags from bucket
|
||||
m.name, m.tags = s.parseName(m.bucket)
|
||||
m.name, m.field, m.tags = s.parseName(m.bucket)
|
||||
// fields are not supported for timings, so if specified combine into
|
||||
// the name
|
||||
if (m.mtype == "ms" || m.mtype == "h") && m.field != "value" {
|
||||
m.name += "_" + m.field
|
||||
}
|
||||
switch m.mtype {
|
||||
case "c":
|
||||
m.tags["metric_type"] = "counter"
|
||||
@@ -389,8 +403,8 @@ func (s *Statsd) parseStatsdLine(line string) error {
|
||||
// parseName parses the given bucket name with the list of bucket maps in the
|
||||
// config file. If there is a match, it will parse the name of the metric and
|
||||
// map of tags.
|
||||
// Return values are (<name>, <tags>)
|
||||
func (s *Statsd) parseName(bucket string) (string, map[string]string) {
|
||||
// Return values are (<name>, <field>, <tags>)
|
||||
func (s *Statsd) parseName(bucket string) (string, string, map[string]string) {
|
||||
tags := make(map[string]string)
|
||||
|
||||
bucketparts := strings.Split(bucket, ",")
|
||||
@@ -410,17 +424,21 @@ func (s *Statsd) parseName(bucket string) (string, map[string]string) {
|
||||
DefaultTags: tags,
|
||||
}
|
||||
|
||||
var field string
|
||||
name := bucketparts[0]
|
||||
p, err := graphite.NewParserWithOptions(o)
|
||||
if err == nil {
|
||||
name, tags, _, _ = p.ApplyTemplate(name)
|
||||
name, tags, field, _ = p.ApplyTemplate(name)
|
||||
}
|
||||
if s.ConvertNames {
|
||||
name = strings.Replace(name, ".", "_", -1)
|
||||
name = strings.Replace(name, "-", "__", -1)
|
||||
}
|
||||
if field == "" {
|
||||
field = "value"
|
||||
}
|
||||
|
||||
return name, tags
|
||||
return name, field, tags
|
||||
}
|
||||
|
||||
// Parse the key,value out of a string that looks like "key=value"
|
||||
@@ -466,46 +484,59 @@ func (s *Statsd) aggregate(m metric) {
|
||||
s.timings[m.hash] = cached
|
||||
}
|
||||
case "c":
|
||||
cached, ok := s.counters[m.hash]
|
||||
// check if the measurement exists
|
||||
_, ok := s.counters[m.hash]
|
||||
if !ok {
|
||||
s.counters[m.hash] = cachedcounter{
|
||||
name: m.name,
|
||||
value: m.intvalue,
|
||||
tags: m.tags,
|
||||
name: m.name,
|
||||
fields: make(map[string]interface{}),
|
||||
tags: m.tags,
|
||||
}
|
||||
} else {
|
||||
cached.value += m.intvalue
|
||||
s.counters[m.hash] = cached
|
||||
}
|
||||
// check if the field exists
|
||||
_, ok = s.counters[m.hash].fields[m.field]
|
||||
if !ok {
|
||||
s.counters[m.hash].fields[m.field] = int64(0)
|
||||
}
|
||||
s.counters[m.hash].fields[m.field] =
|
||||
s.counters[m.hash].fields[m.field].(int64) + m.intvalue
|
||||
case "g":
|
||||
cached, ok := s.gauges[m.hash]
|
||||
// check if the measurement exists
|
||||
_, ok := s.gauges[m.hash]
|
||||
if !ok {
|
||||
s.gauges[m.hash] = cachedgauge{
|
||||
name: m.name,
|
||||
value: m.floatvalue,
|
||||
tags: m.tags,
|
||||
name: m.name,
|
||||
fields: make(map[string]interface{}),
|
||||
tags: m.tags,
|
||||
}
|
||||
}
|
||||
// check if the field exists
|
||||
_, ok = s.gauges[m.hash].fields[m.field]
|
||||
if !ok {
|
||||
s.gauges[m.hash].fields[m.field] = float64(0)
|
||||
}
|
||||
if m.additive {
|
||||
s.gauges[m.hash].fields[m.field] =
|
||||
s.gauges[m.hash].fields[m.field].(float64) + m.floatvalue
|
||||
} else {
|
||||
if m.additive {
|
||||
cached.value = cached.value + m.floatvalue
|
||||
} else {
|
||||
cached.value = m.floatvalue
|
||||
}
|
||||
s.gauges[m.hash] = cached
|
||||
s.gauges[m.hash].fields[m.field] = m.floatvalue
|
||||
}
|
||||
case "s":
|
||||
cached, ok := s.sets[m.hash]
|
||||
// check if the measurement exists
|
||||
_, ok := s.sets[m.hash]
|
||||
if !ok {
|
||||
// Completely new metric (initialize with count of 1)
|
||||
s.sets[m.hash] = cachedset{
|
||||
name: m.name,
|
||||
tags: m.tags,
|
||||
set: map[int64]bool{m.intvalue: true},
|
||||
name: m.name,
|
||||
fields: make(map[string]map[int64]bool),
|
||||
tags: m.tags,
|
||||
}
|
||||
} else {
|
||||
cached.set[m.intvalue] = true
|
||||
s.sets[m.hash] = cached
|
||||
}
|
||||
// check if the field exists
|
||||
_, ok = s.sets[m.hash].fields[m.field]
|
||||
if !ok {
|
||||
s.sets[m.hash].fields[m.field] = make(map[int64]bool)
|
||||
}
|
||||
s.sets[m.hash].fields[m.field][m.intvalue] = true
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user