Use graphite parser for templating, godep update to head
This commit is contained in:
@@ -6,7 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
"github.com/influxdb/influxdb/models"
|
||||
"github.com/influxdb/telegraf/plugins"
|
||||
"github.com/wvanbergen/kafka/consumergroup"
|
||||
)
|
||||
@@ -86,9 +86,9 @@ func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte
|
||||
for {
|
||||
select {
|
||||
case batch := <-metricConsumer:
|
||||
var points []tsdb.Point
|
||||
var points []models.Point
|
||||
var err error
|
||||
if points, err = tsdb.ParsePoints(batch); err != nil {
|
||||
if points, err = models.ParsePoints(batch); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -2,12 +2,16 @@ package statsd
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/influxdb/influxdb/services/graphite"
|
||||
|
||||
"github.com/influxdb/telegraf/plugins"
|
||||
)
|
||||
|
||||
@@ -38,11 +42,8 @@ type Statsd struct {
|
||||
counters map[string]cachedcounter
|
||||
sets map[string]cachedset
|
||||
|
||||
Mappings []struct {
|
||||
Match string
|
||||
Name string
|
||||
Tagmap map[string]int
|
||||
}
|
||||
// bucket -> influx templates
|
||||
Templates []string
|
||||
}
|
||||
|
||||
func NewStatsd() *Statsd {
|
||||
@@ -63,6 +64,7 @@ func NewStatsd() *Statsd {
|
||||
type metric struct {
|
||||
name string
|
||||
bucket string
|
||||
hash string
|
||||
intvalue int64
|
||||
floatvalue float64
|
||||
mtype string
|
||||
@@ -72,21 +74,25 @@ type metric struct {
|
||||
}
|
||||
|
||||
type cachedset struct {
|
||||
name string
|
||||
set map[int64]bool
|
||||
tags map[string]string
|
||||
}
|
||||
|
||||
type cachedgauge struct {
|
||||
name string
|
||||
value float64
|
||||
tags map[string]string
|
||||
}
|
||||
|
||||
type cachedcounter struct {
|
||||
name string
|
||||
value int64
|
||||
tags map[string]string
|
||||
}
|
||||
|
||||
type cachedtiming struct {
|
||||
name string
|
||||
timings []float64
|
||||
tags map[string]string
|
||||
}
|
||||
@@ -131,22 +137,22 @@ func (s *Statsd) Gather(acc plugins.Accumulator) error {
|
||||
}
|
||||
}
|
||||
|
||||
for name, cmetric := range s.gauges {
|
||||
acc.Add(name, cmetric.value, cmetric.tags)
|
||||
for _, cmetric := range s.gauges {
|
||||
acc.Add(cmetric.name, cmetric.value, cmetric.tags)
|
||||
}
|
||||
if s.DeleteGauges {
|
||||
s.gauges = make(map[string]cachedgauge)
|
||||
}
|
||||
|
||||
for name, cmetric := range s.counters {
|
||||
acc.Add(name, cmetric.value, cmetric.tags)
|
||||
for _, cmetric := range s.counters {
|
||||
acc.Add(cmetric.name, cmetric.value, cmetric.tags)
|
||||
}
|
||||
if s.DeleteCounters {
|
||||
s.counters = make(map[string]cachedcounter)
|
||||
}
|
||||
|
||||
for name, cmetric := range s.sets {
|
||||
acc.Add(name, int64(len(cmetric.set)), cmetric.tags)
|
||||
for _, cmetric := range s.sets {
|
||||
acc.Add(cmetric.name, int64(len(cmetric.set)), cmetric.tags)
|
||||
}
|
||||
if s.DeleteSets {
|
||||
s.sets = make(map[string]cachedset)
|
||||
@@ -301,6 +307,14 @@ func (s *Statsd) parseStatsdLine(line string) error {
|
||||
// Parse the name
|
||||
m.name, m.tags = s.parseName(m)
|
||||
|
||||
// Make a unique key for the measurement name/tags
|
||||
var tg []string
|
||||
for k, v := range m.tags {
|
||||
tg = append(tg, fmt.Sprintf("%s=%s", k, v))
|
||||
}
|
||||
sort.Strings(tg)
|
||||
m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name)
|
||||
|
||||
switch m.mtype {
|
||||
// Aggregate gauges, counters and sets as we go
|
||||
case "g", "c", "s":
|
||||
@@ -321,28 +335,21 @@ func (s *Statsd) parseStatsdLine(line string) error {
|
||||
// map of tags.
|
||||
// Return values are (<name>, <tags>)
|
||||
func (s *Statsd) parseName(m metric) (string, map[string]string) {
|
||||
name := m.bucket
|
||||
tags := make(map[string]string)
|
||||
name := strings.Replace(m.bucket, ".", "_", -1)
|
||||
name = strings.Replace(name, "-", "__", -1)
|
||||
|
||||
for _, bm := range s.Mappings {
|
||||
if bucketglob(bm.Match, m.bucket) {
|
||||
tags = make(map[string]string)
|
||||
bparts := strings.Split(m.bucket, ".")
|
||||
for name, index := range bm.Tagmap {
|
||||
if index >= len(bparts) {
|
||||
log.Printf("ERROR: Index %d out of range for bucket %s\n",
|
||||
index, m.bucket)
|
||||
continue
|
||||
}
|
||||
tags[name] = bparts[index]
|
||||
}
|
||||
if bm.Name != "" {
|
||||
name = bm.Name
|
||||
}
|
||||
}
|
||||
o := graphite.Options{
|
||||
Separator: "_",
|
||||
Templates: s.Templates,
|
||||
}
|
||||
|
||||
p, err := graphite.NewParserWithOptions(o)
|
||||
if err == nil {
|
||||
name, tags = p.ApplyTemplate(m.bucket)
|
||||
}
|
||||
name = strings.Replace(name, ".", "_", -1)
|
||||
name = strings.Replace(name, "-", "__", -1)
|
||||
|
||||
switch m.mtype {
|
||||
case "c":
|
||||
tags["metric_type"] = "counter"
|
||||
@@ -357,23 +364,6 @@ func (s *Statsd) parseName(m metric) (string, map[string]string) {
|
||||
return name, tags
|
||||
}
|
||||
|
||||
func bucketglob(pattern, bucket string) bool {
|
||||
pparts := strings.Split(pattern, ".")
|
||||
bparts := strings.Split(bucket, ".")
|
||||
if len(pparts) != len(bparts) {
|
||||
return false
|
||||
}
|
||||
|
||||
for i, _ := range pparts {
|
||||
if pparts[i] == "*" || pparts[i] == bparts[i] {
|
||||
continue
|
||||
} else {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// aggregate takes in a metric of type "counter", "gauge", or "set". It then
|
||||
// aggregates and caches the current value. It does not deal with the
|
||||
// DeleteCounters, DeleteGauges or DeleteSets options, because those are dealt
|
||||
@@ -381,21 +371,23 @@ func bucketglob(pattern, bucket string) bool {
|
||||
func (s *Statsd) aggregate(m metric) {
|
||||
switch m.mtype {
|
||||
case "c":
|
||||
cached, ok := s.counters[m.name]
|
||||
cached, ok := s.counters[m.hash]
|
||||
if !ok {
|
||||
s.counters[m.name] = cachedcounter{
|
||||
s.counters[m.hash] = cachedcounter{
|
||||
name: m.name,
|
||||
value: m.intvalue,
|
||||
tags: m.tags,
|
||||
}
|
||||
} else {
|
||||
cached.value += m.intvalue
|
||||
cached.tags = m.tags
|
||||
s.counters[m.name] = cached
|
||||
s.counters[m.hash] = cached
|
||||
}
|
||||
case "g":
|
||||
cached, ok := s.gauges[m.name]
|
||||
cached, ok := s.gauges[m.hash]
|
||||
if !ok {
|
||||
s.gauges[m.name] = cachedgauge{
|
||||
s.gauges[m.hash] = cachedgauge{
|
||||
name: m.name,
|
||||
value: m.floatvalue,
|
||||
tags: m.tags,
|
||||
}
|
||||
@@ -406,19 +398,20 @@ func (s *Statsd) aggregate(m metric) {
|
||||
cached.value = m.floatvalue
|
||||
}
|
||||
cached.tags = m.tags
|
||||
s.gauges[m.name] = cached
|
||||
s.gauges[m.hash] = cached
|
||||
}
|
||||
case "s":
|
||||
cached, ok := s.sets[m.name]
|
||||
cached, ok := s.sets[m.hash]
|
||||
if !ok {
|
||||
// Completely new metric (initialize with count of 1)
|
||||
s.sets[m.name] = cachedset{
|
||||
s.sets[m.hash] = cachedset{
|
||||
name: m.name,
|
||||
tags: m.tags,
|
||||
set: map[int64]bool{m.intvalue: true},
|
||||
}
|
||||
} else {
|
||||
cached.set[m.intvalue] = true
|
||||
s.sets[m.name] = cached
|
||||
s.sets[m.hash] = cached
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,6 +135,14 @@ func TestParse_NameMapTags(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Test that measurements with the same name, but different tags, are treated
|
||||
// as different values in the statsd cache
|
||||
func TestParse_MeasurementsWithSameName(t *testing.T) {
|
||||
if false {
|
||||
t.Errorf("TODO")
|
||||
}
|
||||
}
|
||||
|
||||
// Valid lines should be parsed and their values should be cached
|
||||
func TestParse_ValidLines(t *testing.T) {
|
||||
s := NewStatsd()
|
||||
@@ -429,8 +437,16 @@ func test_validate_set(
|
||||
value int64,
|
||||
cache map[string]cachedset,
|
||||
) error {
|
||||
metric, ok := cache[name]
|
||||
if !ok {
|
||||
var metric cachedset
|
||||
var found bool
|
||||
for _, v := range cache {
|
||||
if v.name == name {
|
||||
metric = v
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name))
|
||||
}
|
||||
|
||||
@@ -446,8 +462,16 @@ func test_validate_counter(
|
||||
value int64,
|
||||
cache map[string]cachedcounter,
|
||||
) error {
|
||||
metric, ok := cache[name]
|
||||
if !ok {
|
||||
var metric cachedcounter
|
||||
var found bool
|
||||
for _, v := range cache {
|
||||
if v.name == name {
|
||||
metric = v
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name))
|
||||
}
|
||||
|
||||
@@ -463,8 +487,16 @@ func test_validate_gauge(
|
||||
value float64,
|
||||
cache map[string]cachedgauge,
|
||||
) error {
|
||||
metric, ok := cache[name]
|
||||
if !ok {
|
||||
var metric cachedgauge
|
||||
var found bool
|
||||
for _, v := range cache {
|
||||
if v.name == name {
|
||||
metric = v
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name))
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user