Add support for datadog events to statsd input (#5791)

This commit is contained in:
Jorge Landivar
2019-05-14 18:20:35 -05:00
committed by Daniel Nelson
parent 9318d47a38
commit 8f3ed45797
7 changed files with 852 additions and 177 deletions

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"log"
"net"
"net/url"
"sort"
"strconv"
"strings"
@@ -21,7 +22,7 @@ import (
)
const (
// UDP packet limit, see
// UDP_MAX_PACKET_SIZE is the UDP packet limit, see
// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
UDP_MAX_PACKET_SIZE int = 64 * 1024
@@ -34,13 +35,14 @@ const (
MaxTCPConnections = 250
)
var dropwarn = "E! Error: statsd message queue full. " +
var dropwarn = "E! [inputs.statsd] Error: statsd message queue full. " +
"We have dropped %d messages so far. " +
"You may want to increase allowed_pending_messages in the config\n"
var malformedwarn = "E! Statsd over TCP has received %d malformed packets" +
var malformedwarn = "E! [inputs.statsd] Statsd over TCP has received %d malformed packets" +
" thus far."
// Statsd allows the importing of statsd and dogstatsd data.
type Statsd struct {
// Protocol used on listener - udp or tcp
Protocol string `toml:"protocol"`
@@ -67,7 +69,12 @@ type Statsd struct {
MetricSeparator string
// This flag enables parsing of tags in the dogstatsd extension to the
// statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/)
ParseDataDogTags bool
ParseDataDogTags bool // depreciated in 1.10; use datadog_extensions
// Parses extensions to statsd in the datadog statsd format
// currently supports metrics and datadog tags.
// http://docs.datadoghq.com/guides/dogstatsd/
DataDogExtensions bool `toml:"datadog_extensions"`
// UDPPacketSize is deprecated, it's only here for legacy support
// we now always create 1 max size buffer and then copy only what we need
@@ -91,7 +98,7 @@ type Statsd struct {
malformed int
// Channel for all incoming statsd packets
in chan *bytes.Buffer
in chan input
done chan struct{}
// Cache gauges, counters & sets so they can be aggregated as they arrive
@@ -131,6 +138,12 @@ type Statsd struct {
bufPool sync.Pool
}
type input struct {
*bytes.Buffer
time.Time
Addr string
}
// One statsd metric, form is <bucket>:<value>|<mtype>|@<samplerate>
type metric struct {
name string
@@ -214,6 +227,9 @@ const sampleConfig = `
## http://docs.datadoghq.com/guides/dogstatsd/
parse_data_dog_tags = false
## Parses datadog extensions to the statsd format
datadog_extensions = false
## Statsd data translation templates, more info can be read here:
## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md
# templates = [
@@ -239,12 +255,12 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
defer s.Unlock()
now := time.Now()
for _, metric := range s.timings {
for _, m := range s.timings {
// Defining a template to parse field names for timers allows us to split
// out multiple fields per timer. In this case we prefix each stat with the
// field name and store these all in a single measurement.
fields := make(map[string]interface{})
for fieldName, stats := range metric.fields {
for fieldName, stats := range m.fields {
var prefix string
if fieldName != defaultFieldName {
prefix = fieldName + "_"
@@ -261,41 +277,44 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
}
}
acc.AddFields(metric.name, fields, metric.tags, now)
acc.AddFields(m.name, fields, m.tags, now)
}
if s.DeleteTimings {
s.timings = make(map[string]cachedtimings)
}
for _, metric := range s.gauges {
acc.AddGauge(metric.name, metric.fields, metric.tags, now)
for _, m := range s.gauges {
acc.AddGauge(m.name, m.fields, m.tags, now)
}
if s.DeleteGauges {
s.gauges = make(map[string]cachedgauge)
}
for _, metric := range s.counters {
acc.AddCounter(metric.name, metric.fields, metric.tags, now)
for _, m := range s.counters {
acc.AddCounter(m.name, m.fields, m.tags, now)
}
if s.DeleteCounters {
s.counters = make(map[string]cachedcounter)
}
for _, metric := range s.sets {
for _, m := range s.sets {
fields := make(map[string]interface{})
for field, set := range metric.fields {
for field, set := range m.fields {
fields[field] = int64(len(set))
}
acc.AddFields(metric.name, fields, metric.tags, now)
acc.AddFields(m.name, fields, m.tags, now)
}
if s.DeleteSets {
s.sets = make(map[string]cachedset)
}
return nil
}
func (s *Statsd) Start(_ telegraf.Accumulator) error {
if s.ParseDataDogTags {
s.DataDogExtensions = true
log.Printf("W! [inputs.statsd] The parse_data_dog_tags option is deprecated, use datadog_extensions instead.")
}
// Make data structures
s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedcounter)
@@ -315,7 +334,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
s.PacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags)
s.BytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags)
s.in = make(chan *bytes.Buffer, s.AllowedPendingMessages)
s.in = make(chan input, s.AllowedPendingMessages)
s.done = make(chan struct{})
s.accept = make(chan bool, s.MaxTCPConnections)
s.conns = make(map[string]*net.TCPConn)
@@ -329,7 +348,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
}
if s.ConvertNames {
log.Printf("I! WARNING statsd: convert_names config option is deprecated," +
log.Printf("W! [inputs.statsd] statsd: convert_names config option is deprecated," +
" please use metric_separator instead")
}
@@ -348,7 +367,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
return err
}
log.Println("I! Statsd UDP listener listening on: ", conn.LocalAddr().String())
log.Println("I! [inputs.statsd] Statsd UDP listener listening on: ", conn.LocalAddr().String())
s.UDPlistener = conn
s.wg.Add(1)
@@ -366,7 +385,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
return err
}
log.Println("I! TCP Statsd listening on: ", listener.Addr().String())
log.Println("I! [inputs.statsd] TCP Statsd listening on: ", listener.Addr().String())
s.TCPlistener = listener
s.wg.Add(1)
@@ -382,7 +401,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
defer s.wg.Done()
s.parser()
}()
log.Printf("I! Started the statsd service on %s\n", s.ServiceAddress)
log.Printf("I! [inputs.statsd] Started the statsd service on %s\n", s.ServiceAddress)
return nil
}
@@ -439,17 +458,22 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error {
case <-s.done:
return nil
default:
n, _, err := conn.ReadFromUDP(buf)
if err != nil && !strings.Contains(err.Error(), "closed network") {
log.Printf("E! Error READ: %s\n", err.Error())
continue
n, addr, err := conn.ReadFromUDP(buf)
if err != nil {
if !strings.Contains(err.Error(), "closed network") {
log.Printf("E! [inputs.statsd] Error READ: %s\n", err.Error())
continue
}
return err
}
b := s.bufPool.Get().(*bytes.Buffer)
b.Reset()
b.Write(buf[:n])
select {
case s.in <- b:
case s.in <- input{
Buffer: b,
Time: time.Now(),
Addr: addr.IP.String()}:
default:
s.drops++
if s.drops == 1 || s.AllowedPendingMessages == 0 || s.drops%s.AllowedPendingMessages == 0 {
@@ -468,12 +492,16 @@ func (s *Statsd) parser() error {
select {
case <-s.done:
return nil
case buf := <-s.in:
lines := strings.Split(buf.String(), "\n")
s.bufPool.Put(buf)
case in := <-s.in:
lines := strings.Split(in.Buffer.String(), "\n")
s.bufPool.Put(in.Buffer)
for _, line := range lines {
line = strings.TrimSpace(line)
if line != "" {
switch {
case line == "":
case s.DataDogExtensions && strings.HasPrefix(line, "_e"):
s.parseEventMessage(in.Time, line, in.Addr)
default:
s.parseStatsdLine(line)
}
}
@@ -488,7 +516,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
defer s.Unlock()
lineTags := make(map[string]string)
if s.ParseDataDogTags {
if s.DataDogExtensions {
recombinedSegments := make([]string, 0)
// datadog tags look like this:
// users.online:1|c|@0.5|#country:china,environment:production
@@ -499,24 +527,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
for _, segment := range pipesplit {
if len(segment) > 0 && segment[0] == '#' {
// we have ourselves a tag; they are comma separated
tagstr := segment[1:]
tags := strings.Split(tagstr, ",")
for _, tag := range tags {
ts := strings.SplitN(tag, ":", 2)
var k, v string
switch len(ts) {
case 1:
// just a tag
k = ts[0]
v = ""
case 2:
k = ts[0]
v = ts[1]
}
if k != "" {
lineTags[k] = v
}
}
parseDataDogTags(lineTags, segment[1:])
} else {
recombinedSegments = append(recombinedSegments, segment)
}
@@ -527,7 +538,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
// Validate splitting the line on ":"
bits := strings.Split(line, ":")
if len(bits) < 2 {
log.Printf("E! Error: splitting ':', Unable to parse metric: %s\n", line)
log.Printf("E! [inputs.statsd] Error: splitting ':', Unable to parse metric: %s\n", line)
return errors.New("Error Parsing statsd line")
}
@@ -543,11 +554,11 @@ func (s *Statsd) parseStatsdLine(line string) error {
// Validate splitting the bit on "|"
pipesplit := strings.Split(bit, "|")
if len(pipesplit) < 2 {
log.Printf("E! Error: splitting '|', Unable to parse metric: %s\n", line)
log.Printf("E! [inputs.statsd] Error: splitting '|', Unable to parse metric: %s\n", line)
return errors.New("Error Parsing statsd line")
} else if len(pipesplit) > 2 {
sr := pipesplit[2]
errmsg := "E! Error: parsing sample rate, %s, it must be in format like: " +
errmsg := "E! [inputs.statsd] parsing sample rate, %s, it must be in format like: " +
"@0.1, @0.5, etc. Ignoring sample rate for line: %s\n"
if strings.Contains(sr, "@") && len(sr) > 1 {
samplerate, err := strconv.ParseFloat(sr[1:], 64)
@@ -567,14 +578,14 @@ func (s *Statsd) parseStatsdLine(line string) error {
case "g", "c", "s", "ms", "h":
m.mtype = pipesplit[1]
default:
log.Printf("E! Error: Statsd Metric type %s unsupported", pipesplit[1])
log.Printf("E! [inputs.statsd] Error: Statsd Metric type %s unsupported", pipesplit[1])
return errors.New("Error Parsing statsd line")
}
// Parse the value
if strings.HasPrefix(pipesplit[0], "-") || strings.HasPrefix(pipesplit[0], "+") {
if m.mtype != "g" && m.mtype != "c" {
log.Printf("E! Error: +- values are only supported for gauges & counters: %s\n", line)
log.Printf("E! [inputs.statsd] Error: +- values are only supported for gauges & counters: %s\n", line)
return errors.New("Error Parsing statsd line")
}
m.additive = true
@@ -584,7 +595,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
case "g", "ms", "h":
v, err := strconv.ParseFloat(pipesplit[0], 64)
if err != nil {
log.Printf("E! Error: parsing value to float64: %s\n", line)
log.Printf("E! [inputs.statsd] Error: parsing value to float64: %s\n", line)
return errors.New("Error Parsing statsd line")
}
m.floatvalue = v
@@ -594,7 +605,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
if err != nil {
v2, err2 := strconv.ParseFloat(pipesplit[0], 64)
if err2 != nil {
log.Printf("E! Error: parsing value to int64: %s\n", line)
log.Printf("E! [inputs.statsd] Error: parsing value to int64: %s\n", line)
return errors.New("Error Parsing statsd line")
}
v = int64(v2)
@@ -622,7 +633,6 @@ func (s *Statsd) parseStatsdLine(line string) error {
case "h":
m.tags["metric_type"] = "histogram"
}
if len(lineTags) > 0 {
for k, v := range lineTags {
m.tags[k] = v
@@ -807,7 +817,14 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
s.forget(id)
s.CurrentConnections.Incr(-1)
}()
addr := conn.RemoteAddr()
parsedURL, err := url.Parse(addr.String())
if err != nil {
// this should never happen because the conn handler should give us parsable addresses,
// but if it does we will know
log.Printf("E! [inputs.statsd] failed to parse %s\n", addr)
return // close the connetion and return
}
var n int
scanner := bufio.NewScanner(conn)
for {
@@ -831,7 +848,7 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
b.WriteByte('\n')
select {
case s.in <- b:
case s.in <- input{Buffer: b, Time: time.Now(), Addr: parsedURL.Host}:
default:
s.drops++
if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 {
@@ -845,8 +862,8 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
// refuser refuses a TCP connection
func (s *Statsd) refuser(conn *net.TCPConn) {
conn.Close()
log.Printf("I! Refused TCP Connection from %s", conn.RemoteAddr())
log.Printf("I! WARNING: Maximum TCP Connections reached, you may want to" +
log.Printf("I! [inputs.statsd] Refused TCP Connection from %s", conn.RemoteAddr())
log.Printf("I! [inputs.statsd] WARNING: Maximum TCP Connections reached, you may want to" +
" adjust max_tcp_connections")
}
@@ -866,7 +883,7 @@ func (s *Statsd) remember(id string, conn *net.TCPConn) {
func (s *Statsd) Stop() {
s.Lock()
log.Println("I! Stopping the statsd service")
log.Println("I! [inputs.statsd] Stopping the statsd service")
close(s.done)
if s.isUDP() {
s.UDPlistener.Close()