Statsd listener plugin

implement gauges, sets, counters
This commit is contained in:
Cameron Sparr 2015-09-24 11:06:11 -07:00
parent 316fa1cc01
commit eb2a4dc724
9 changed files with 583 additions and 29 deletions

View File

@ -16,8 +16,6 @@ and submit new plugins.
### Plugin Guidelines ### Plugin Guidelines
* A plugin must conform to the `plugins.Plugin` interface. * A plugin must conform to the `plugins.Plugin` interface.
* Telegraf promises to run each plugin's Gather function serially. This means
developers don't have to worry about thread safety within these functions.
* Each generated metric automatically has the name of the plugin that generated * Each generated metric automatically has the name of the plugin that generated
it prepended. This is to keep plugins honest. it prepended. This is to keep plugins honest.
* Plugins should call `plugins.Add` in their `init` function to register themselves. * Plugins should call `plugins.Add` in their `init` function to register themselves.

View File

@ -361,6 +361,20 @@ func (a *Agent) Run(shutdown chan struct{}) error {
var wg sync.WaitGroup var wg sync.WaitGroup
for _, plugin := range a.plugins { for _, plugin := range a.plugins {
// Start service of any ServicePlugins
switch p := plugin.plugin.(type) {
case plugins.ServicePlugin:
if err := p.Start(); err != nil {
log.Printf("Service for plugin %s failed to start, exiting\n%s\n",
plugin.name, err.Error())
return err
}
defer p.Stop()
}
// Special handling for plugins that have their own collection interval
// configured. Default intervals are handled below with crankParallel
if plugin.config.Interval != 0 { if plugin.config.Interval != 0 {
wg.Add(1) wg.Add(1)
go func(plugin *runningPlugin) { go func(plugin *runningPlugin) {

View File

@ -377,18 +377,25 @@ var header = `# Telegraf configuration
[outputs] [outputs]
` `
var header2 = ` var pluginHeader = `
############################################################################### ###############################################################################
# PLUGINS # # PLUGINS #
############################################################################### ###############################################################################
` `
var servicePluginHeader = `
###############################################################################
# SERVICE PLUGINS #
###############################################################################
`
// PrintSampleConfig prints the sample config // PrintSampleConfig prints the sample config
func PrintSampleConfig(pluginFilters []string, outputFilters []string) { func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
fmt.Printf(header) fmt.Printf(header)
// Print Outputs // Filter outputs
var onames []string var onames []string
for oname := range outputs.Outputs { for oname := range outputs.Outputs {
if len(outputFilters) == 0 || sliceContains(oname, outputFilters) { if len(outputFilters) == 0 || sliceContains(oname, outputFilters) {
@ -397,6 +404,7 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
} }
sort.Strings(onames) sort.Strings(onames)
// Print Outputs
for _, oname := range onames { for _, oname := range onames {
creator := outputs.Outputs[oname] creator := outputs.Outputs[oname]
output := creator() output := creator()
@ -411,9 +419,7 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
} }
} }
fmt.Printf(header2) // Filter plugins
// Print Plugins
var pnames []string var pnames []string
for pname := range plugins.Plugins { for pname := range plugins.Plugins {
if len(pluginFilters) == 0 || sliceContains(pname, pluginFilters) { if len(pluginFilters) == 0 || sliceContains(pname, pluginFilters) {
@ -422,12 +428,31 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
} }
sort.Strings(pnames) sort.Strings(pnames)
// Print Plugins
fmt.Printf(pluginHeader)
servPlugins := make(map[string]plugins.ServicePlugin)
for _, pname := range pnames { for _, pname := range pnames {
creator := plugins.Plugins[pname] creator := plugins.Plugins[pname]
plugin := creator() plugin := creator()
fmt.Printf("\n# %s\n[%s]", plugin.Description(), pname) switch p := plugin.(type) {
case plugins.ServicePlugin:
servPlugins[pname] = p
continue
}
printConfig(pname, plugin)
}
// Print Service Plugins
fmt.Printf(servicePluginHeader)
for name, plugin := range servPlugins {
printConfig(name, plugin)
}
}
func printConfig(name string, plugin plugins.Plugin) {
fmt.Printf("\n# %s\n[%s]", plugin.Description(), name)
config := plugin.SampleConfig() config := plugin.SampleConfig()
if config == "" { if config == "" {
fmt.Printf("\n # no configuration\n") fmt.Printf("\n # no configuration\n")
@ -435,7 +460,6 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
fmt.Printf(config) fmt.Printf(config)
} }
} }
}
func sliceContains(name string, list []string) bool { func sliceContains(name string, list []string) bool {
for _, b := range list { for _, b := range list {
@ -449,9 +473,7 @@ func sliceContains(name string, list []string) bool {
// PrintPluginConfig prints the config usage of a single plugin. // PrintPluginConfig prints the config usage of a single plugin.
func PrintPluginConfig(name string) error { func PrintPluginConfig(name string) error {
if creator, ok := plugins.Plugins[name]; ok { if creator, ok := plugins.Plugins[name]; ok {
plugin := creator() printConfig(name, creator())
fmt.Printf("# %s\n[%s]", plugin.Description(), name)
fmt.Printf(plugin.SampleConfig())
} else { } else {
return errors.New(fmt.Sprintf("Plugin %s not found", name)) return errors.New(fmt.Sprintf("Plugin %s not found", name))
} }

View File

@ -28,22 +28,18 @@ type InfluxDB struct {
var sampleConfig = ` var sampleConfig = `
# The full HTTP endpoint URL for your InfluxDB instance # The full HTTP endpoint URL for your InfluxDB instance
# Multiple urls can be specified for InfluxDB cluster support. Server to # Multiple urls can be specified for InfluxDB cluster support.
# write to will be randomly chosen each interval. urls = ["http://localhost:8086"] # required
urls = ["http://localhost:8086"] # required. # The target database for metrics (telegraf will create it if not exists)
database = "telegraf" # required
# The target database for metrics. This database must already exist # # Connection timeout (for the connection with InfluxDB), formatted as a string.
database = "telegraf" # required. # # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
# # If not provided, will default to 0 (no timeout)
# Connection timeout (for the connection with InfluxDB), formatted as a string.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
# If not provided, will default to 0 (no timeout)
# timeout = "5s" # timeout = "5s"
# username = "telegraf" # username = "telegraf"
# password = "metricsmetricsmetricsmetrics" # password = "metricsmetricsmetricsmetrics"
# # Set the user agent for the POSTs (can be useful for log differentiation)
# Set the user agent for the POSTs (can be useful for log differentiation)
# user_agent = "telegraf" # user_agent = "telegraf"
` `

View File

@ -22,6 +22,7 @@ import (
_ "github.com/influxdb/telegraf/plugins/rabbitmq" _ "github.com/influxdb/telegraf/plugins/rabbitmq"
_ "github.com/influxdb/telegraf/plugins/redis" _ "github.com/influxdb/telegraf/plugins/redis"
_ "github.com/influxdb/telegraf/plugins/rethinkdb" _ "github.com/influxdb/telegraf/plugins/rethinkdb"
_ "github.com/influxdb/telegraf/plugins/statsd"
_ "github.com/influxdb/telegraf/plugins/system" _ "github.com/influxdb/telegraf/plugins/system"
_ "github.com/influxdb/telegraf/plugins/zookeeper" _ "github.com/influxdb/telegraf/plugins/zookeeper"
) )

View File

@ -20,11 +20,35 @@ type Accumulator interface {
} }
type Plugin interface { type Plugin interface {
// SampleConfig returns the default configuration of the Plugin
SampleConfig() string SampleConfig() string
// Description returns a one-sentence description on the Plugin
Description() string Description() string
// Gather takes in an accumulator and adds the metrics that the Plugin
// gathers. This is called every "interval"
Gather(Accumulator) error Gather(Accumulator) error
} }
type ServicePlugin interface {
// SampleConfig returns the default configuration of the Plugin
SampleConfig() string
// Description returns a one-sentence description on the Plugin
Description() string
// Gather takes in an accumulator and adds the metrics that the Plugin
// gathers. This is called every "interval"
Gather(Accumulator) error
// Start starts the ServicePlugin's service, whatever that may be
Start() error
// Stop stops the services and closes any necessary channels and connections
Stop()
}
type Creator func() Plugin type Creator func() Plugin
var Plugins = map[string]Creator{} var Plugins = map[string]Creator{}

79
plugins/statsd/README.md Normal file
View File

@ -0,0 +1,79 @@
# Telegraf Service Plugin: statsd
#### Plugin arguments:
- **service_address** string: Address to listen for statsd UDP packets on
- **delete_gauges** boolean: Delete gauges on every collection interval
- **delete_counters** boolean: Delete counters on every collection interval
- **delete_sets** boolean: Delete set counters on every collection interval
- **allowed_pending_messages** integer: Number of messages allowed to queue up
on the UDP listener before the next flush. NOTE: gauge, counter, and set
measurements are aggregated as they arrive, so this is not a straight counter of
the number of total messages that the listener can handle between flushes.
#### Statsd bucket -> InfluxDB Mapping
By default, statsd buckets are converted to measurement names with the rules:
- "." -> "_"
- "-" -> "__"
This plugin also accepts a list of config tables to describe a mapping of a statsd
bucket to an InfluxDB measurement name and tags.
Each mapping must specify a match glob pattern. It can optionally take a name
for the measurement and a map of bucket indices to tag names.
For example, the following configuration:
```
[[statsd.mappings]]
match = "users.current.*.*"
name = "current_users"
[statsd.mappings.tagmap]
unit = 0
server = 2
service = 3
[[statsd.mappings]]
match = "deploys.*.*"
name = "service_deploys"
[statsd.mappings.tagmap]
service_type = 1
service_name = 2
```
Will map statsd -> influx like so:
```
users.current.den001.myapp:32|g
=> [server="den001" service="myapp" unit="users"] statsd_current_users_gauge value=32
deploys.test.myservice:1|c
=> [service_name="myservice" service_type="test"] statsd_service_deploys_counter value=1
random.jumping-sheep:10|c
=> [] statsd_random_jumping__sheep_counter value=10
```
#### Description
The statsd plugin is a special type of plugin which runs a backgrounded statsd
listener service while telegraf is running.
The format of the statsd messages was based on the format described in the
original [etsy statsd](https://github.com/etsy/statsd/blob/master/docs/metric_types.md)
implementation. In short, the telegraf statsd listener will accept:
- Gauges
- `users.current.den001.myapp:32|g` <- standard
- `users.current.den001.myapp:+10|g` <- additive
- `users.current.den001.myapp:-10|g`
- Counters
- `deploys.test.myservice:1|c` <- increments by 1
- `deploys.test.myservice:101|c` <- increments by 101
- `deploys.test.myservice:1|c|@0.1` <- sample rate, increments by 10
- Sets
- `users.unique:101|s`
- `users.unique:101|s`
- `users.unique:102|s` <- would result in a count of 2 for `users.unique`
- Timers
- TODO

409
plugins/statsd/statsd.go Normal file
View File

@ -0,0 +1,409 @@
package statsd
import (
"log"
"net"
"strconv"
"strings"
"sync"
"github.com/influxdb/telegraf/plugins"
)
var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
"You may want to increase allowed_pending_messages in the config\n"
type Statsd struct {
// Address & Port to serve from
ServiceAddress string
// Number of messages allowed to queue up in between calls to Gather. If this
// fills up, packets will get dropped until the next Gather interval is ran.
AllowedPendingMessages int
DeleteGauges bool
DeleteCounters bool
DeleteSets bool
sync.Mutex
// Channel for all incoming statsd messages
in chan string
inmetrics chan metric
done chan struct{}
// Cache gauges, counters & sets so they can be aggregated as they arrive
gauges map[string]cachedmetric
counters map[string]cachedmetric
sets map[string]cachedmetric
Mappings []struct {
Match string
Name string
Tagmap map[string]int
}
}
// One statsd metric, form is <bucket>:<value>|<mtype>|@<samplerate>
type metric struct {
name string
bucket string
value int64
mtype string
additive bool
samplerate float64
tags map[string]string
}
// cachedmetric is a subset of metric used specifically for storing cached
// gauges and counters, ready for sending to InfluxDB.
type cachedmetric struct {
value int64
tags map[string]string
set map[int64]bool
}
func (_ *Statsd) Description() string {
return "Statsd listener"
}
const sampleConfig = `
# Address and port to host UDP listener on
service_address = ":8125"
# Delete gauges every interval
delete_gauges = false
# Delete counters every interval
delete_counters = false
# Delete sets every interval
delete_sets = false
# Number of messages allowed to queue up, once filled,
# the statsd server will start dropping packets
allowed_pending_messages = 10000
`
func (_ *Statsd) SampleConfig() string {
return sampleConfig
}
func (s *Statsd) Gather(acc plugins.Accumulator) error {
s.Lock()
defer s.Unlock()
values := make(map[string]int64)
items := len(s.inmetrics)
for i := 0; i < items; i++ {
m := <-s.inmetrics
switch m.mtype {
case "c", "g", "s":
log.Println("ERROR: Uh oh, this should not have happened")
case "ms", "h":
// TODO
}
}
for name, cmetric := range s.gauges {
acc.Add(name, cmetric.value, cmetric.tags)
}
if s.DeleteGauges {
s.gauges = make(map[string]cachedmetric)
}
for name, cmetric := range s.counters {
acc.Add(name, cmetric.value, cmetric.tags)
}
if s.DeleteCounters {
s.counters = make(map[string]cachedmetric)
}
for name, cmetric := range s.sets {
acc.Add(name, cmetric.value, cmetric.tags)
}
if s.DeleteSets {
s.sets = make(map[string]cachedmetric)
}
for name, value := range values {
acc.Add(name, value, nil)
}
return nil
}
func (s *Statsd) Start() error {
log.Println("Starting up the statsd service")
// Make data structures
s.done = make(chan struct{})
s.in = make(chan string, s.AllowedPendingMessages)
s.inmetrics = make(chan metric, s.AllowedPendingMessages)
s.gauges = make(map[string]cachedmetric)
s.counters = make(map[string]cachedmetric)
s.sets = make(map[string]cachedmetric)
// Start the UDP listener
go s.udpListen()
// Start the line parser
go s.parser()
return nil
}
// udpListen starts listening for udp packets on the configured port.
func (s *Statsd) udpListen() error {
address, _ := net.ResolveUDPAddr("udp", s.ServiceAddress)
listener, err := net.ListenUDP("udp", address)
if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err)
}
defer listener.Close()
log.Println("Statsd listener listening on: ", listener.LocalAddr().String())
for {
select {
case <-s.done:
return nil
default:
buf := make([]byte, 1024)
n, _, err := listener.ReadFromUDP(buf)
if err != nil {
log.Printf("ERROR: %s\n", err.Error())
}
lines := strings.Split(string(buf[:n]), "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if line != "" {
select {
case s.in <- line:
default:
log.Printf(dropwarn, line)
}
}
}
}
}
}
// parser monitors the s.in channel, if there is a line ready, it parses the
// statsd string into a usable metric struct and either aggregates the value
// or pushes it into the s.inmetrics channel.
func (s *Statsd) parser() error {
for {
select {
case <-s.done:
return nil
case line := <-s.in:
s.parseStatsdLine(line)
}
}
}
// parseStatsdLine will parse the given statsd line, validating it as it goes.
// If the line is valid, it will be cached for the next call to Gather()
func (s *Statsd) parseStatsdLine(line string) {
s.Lock()
defer s.Unlock()
// Validate splitting the line on "|"
m := metric{}
parts1 := strings.Split(line, "|")
if len(parts1) < 2 {
log.Printf("Error splitting '|', Unable to parse metric: %s\n", line)
return
} else if len(parts1) > 2 {
sr := parts1[2]
if strings.Contains(sr, "@") && len(sr) > 1 {
samplerate, err := strconv.ParseFloat(sr[1:], 64)
if err != nil {
log.Printf("Error parsing sample rate: %s\n", err.Error())
} else {
m.samplerate = samplerate
}
} else {
msg := "Error parsing sample rate, it must be in format like: " +
"@0.1, @0.5, etc. Ignoring sample rate for line: %s\n"
log.Printf(msg, line)
}
}
// Validate metric type
switch parts1[1] {
case "g", "c", "s", "ms", "h":
m.mtype = parts1[1]
default:
log.Printf("Statsd Metric type %s unsupported", parts1[1])
return
}
// Validate splitting the rest of the line on ":"
parts2 := strings.Split(parts1[0], ":")
if len(parts2) != 2 {
log.Printf("Error splitting ':', Unable to parse metric: %s\n", line)
return
}
m.bucket = parts2[0]
// Parse the value
if strings.ContainsAny(parts2[1], "-+") {
if m.mtype != "g" {
log.Printf("Error: +- values are only supported for gauges: %s\n", line)
return
}
m.additive = true
}
v, err := strconv.ParseInt(parts2[1], 10, 64)
if err != nil {
log.Printf("Error: parsing value to int64: %s\n", line)
return
}
// If a sample rate is given with a counter, divide value by the rate
if m.samplerate != 0 && m.mtype == "c" {
v = int64(float64(v) / m.samplerate)
}
m.value = v
// Parse the name
m.name, m.tags = s.parseName(m)
switch m.mtype {
// Aggregate gauges, counters and sets as we go
case "g", "c", "s":
s.aggregate(m)
// Timers get processed at flush time
default:
select {
case s.inmetrics <- m:
default:
log.Printf(dropwarn, line)
}
}
}
// parseName parses the given bucket name with the list of bucket maps in the
// config file. If there is a match, it will parse the name of the metric and
// map of tags.
// Return values are (<name>, <tags>)
func (s *Statsd) parseName(m metric) (string, map[string]string) {
var tags map[string]string
name := strings.Replace(m.bucket, ".", "_", -1)
name = strings.Replace(name, "-", "__", -1)
for _, bm := range s.Mappings {
if bucketglob(bm.Match, m.bucket) {
tags = make(map[string]string)
bparts := strings.Split(m.bucket, ".")
for name, index := range bm.Tagmap {
if index >= len(bparts) {
log.Printf("ERROR: Index %d out of range for bucket %s\n",
index, m.bucket)
continue
}
tags[name] = bparts[index]
}
if bm.Name != "" {
name = bm.Name
}
}
}
switch m.mtype {
case "c":
name = name + "_counter"
case "g":
name = name + "_gauge"
case "s":
name = name + "_set"
case "ms", "h":
name = name + "_timer"
}
return name, tags
}
func bucketglob(pattern, bucket string) bool {
pparts := strings.Split(pattern, ".")
bparts := strings.Split(bucket, ".")
if len(pparts) != len(bparts) {
return false
}
for i, _ := range pparts {
if pparts[i] == "*" || pparts[i] == bparts[i] {
continue
} else {
return false
}
}
return true
}
// aggregate takes in a metric of type "counter", "gauge", or "set". It then
// aggregates and caches the current value. It does not deal with the
// DeleteCounters, DeleteGauges or DeleteSets options, because those are dealt
// with in the Gather function.
func (s *Statsd) aggregate(m metric) {
switch m.mtype {
case "c":
cached, ok := s.counters[m.name]
if !ok {
s.counters[m.name] = cachedmetric{
value: m.value,
tags: m.tags,
}
} else {
cached.value += m.value
cached.tags = m.tags
s.counters[m.name] = cached
}
case "g":
cached, ok := s.gauges[m.name]
if !ok {
s.gauges[m.name] = cachedmetric{
value: m.value,
tags: m.tags,
}
} else {
if m.additive {
cached.value = cached.value + m.value
} else {
cached.value = m.value
}
cached.tags = m.tags
s.gauges[m.name] = cached
}
case "s":
cached, ok := s.sets[m.name]
if !ok {
// Completely new metric (initialize with count of 1)
s.sets[m.name] = cachedmetric{
value: 1,
tags: m.tags,
set: map[int64]bool{m.value: true},
}
} else {
_, ok := s.sets[m.name].set[m.value]
if !ok {
// Metric exists, but value has not been counted
cached.value += 1
cached.set[m.value] = true
s.sets[m.name] = cached
}
}
}
}
func (s *Statsd) Stop() {
s.Lock()
defer s.Unlock()
log.Println("Stopping the statsd service")
close(s.done)
close(s.in)
close(s.inmetrics)
}
func init() {
plugins.Add("statsd", func() plugins.Plugin {
return &Statsd{}
})
}

View File

@ -0,0 +1,11 @@
package statsd
import (
"testing"
)
func TestListen(t *testing.T) {
if false {
t.Errorf("Test failed!")
}
}