From fb71143b79495292ac43e7908883110b20a4b2a6 Mon Sep 17 00:00:00 2001 From: souvik Date: Tue, 16 Feb 2016 11:52:00 +0530 Subject: [PATCH] Changing StatsD in telegraf so that it able to fetch timestamp from metric tags instead of using current timestamp --- plugins/inputs/statsd/statsd.go | 34 ++++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index fb8de402e..6af027a18 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -61,6 +61,7 @@ type Statsd struct { // bucket -> influx templates Templates []string + timestamp time.Time } func NewStatsd() *Statsd { @@ -92,30 +93,35 @@ type metric struct { additive bool samplerate float64 tags map[string]string + timestamp time.Time } type cachedset struct { name string fields map[string]map[int64]bool tags map[string]string + timestamp time.Time } type cachedgauge struct { name string fields map[string]interface{} tags map[string]string + timestamp time.Time } type cachedcounter struct { name string fields map[string]interface{} tags map[string]string + timestamp time.Time } type cachedtimings struct { name string stats RunningStats tags map[string]string + timestamp time.Time } func (_ *Statsd) Description() string { @@ -166,8 +172,6 @@ func (_ *Statsd) SampleConfig() string { func (s *Statsd) Gather(acc telegraf.Accumulator) error { s.Lock() defer s.Unlock() - now := time.Now() - for _, metric := range s.timings { fields := make(map[string]interface{}) fields["mean"] = metric.stats.Mean() @@ -179,21 +183,21 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { name := fmt.Sprintf("%v_percentile", percentile) fields[name] = metric.stats.Percentile(percentile) } - acc.AddFields(metric.name, fields, metric.tags, now) + acc.AddFields(metric.name, fields, metric.tags, metric.timestamp) } if s.DeleteTimings { s.timings = make(map[string]cachedtimings) } for _, metric := range s.gauges { - acc.AddFields(metric.name, metric.fields, metric.tags, now) + acc.AddFields(metric.name, metric.fields, metric.tags, metric.timestamp) } if s.DeleteGauges { s.gauges = make(map[string]cachedgauge) } for _, metric := range s.counters { - acc.AddFields(metric.name, metric.fields, metric.tags, now) + acc.AddFields(metric.name, metric.fields, metric.tags, metric.timestamp) } if s.DeleteCounters { s.counters = make(map[string]cachedcounter) @@ -204,7 +208,7 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { for field, set := range metric.fields { fields[field] = int64(len(set)) } - acc.AddFields(metric.name, fields, metric.tags, now) + acc.AddFields(metric.name, fields, metric.tags, metric.timestamp) } if s.DeleteSets { s.sets = make(map[string]cachedset) @@ -309,7 +313,7 @@ func (s *Statsd) parseStatsdLine(line string) error { return errors.New("Error Parsing statsd line") } else if len(pipesplit) > 2 { sr := pipesplit[2] - errmsg := "Error: parsing sample rate, %s, it must be in format like: " + + errmsg := "Error: parsing sample rate , %s, it must be in format like: " + "@0.1, @0.5, etc. Ignoring sample rate for line: %s\n" if strings.Contains(sr, "@") && len(sr) > 1 { samplerate, err := strconv.ParseFloat(sr[1:], 64) @@ -390,9 +394,21 @@ func (s *Statsd) parseStatsdLine(line string) error { // Make a unique key for the measurement name/tags var tg []string + timestamp_available := false for k, v := range m.tags { tg = append(tg, fmt.Sprintf("%s=%s", k, v)) + if (k == "timestamp") { + i, err := strconv.ParseInt(v, 10, 64) + if err != nil { + panic(err) + } + m.timestamp = time.Unix(i, 0) + timestamp_available = true + } } + if (!timestamp_available) { + m.timestamp = time.Now() + } sort.Strings(tg) m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name) @@ -466,6 +482,7 @@ func (s *Statsd) aggregate(m metric) { cached = cachedtimings{ name: m.name, tags: m.tags, + timestamp: m.timestamp, stats: RunningStats{ PercLimit: s.PercentileLimit, }, @@ -489,6 +506,7 @@ func (s *Statsd) aggregate(m metric) { name: m.name, fields: make(map[string]interface{}), tags: m.tags, + timestamp: m.timestamp, } } // check if the field exists @@ -506,6 +524,7 @@ func (s *Statsd) aggregate(m metric) { name: m.name, fields: make(map[string]interface{}), tags: m.tags, + timestamp: m.timestamp, } } // check if the field exists @@ -527,6 +546,7 @@ func (s *Statsd) aggregate(m metric) { name: m.name, fields: make(map[string]map[int64]bool), tags: m.tags, + timestamp: m.timestamp, } } // check if the field exists