diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 5b6faf4c9..e0531210e 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -135,3 +135,6 @@ following works: - gopkg.in/olivere/elastic.v5 [MIT License](https://github.com/olivere/elastic/blob/v5.0.76/LICENSE) - gopkg.in/tomb.v1 [BSD 3-Clause Clear License](https://github.com/go-tomb/tomb/blob/v1/LICENSE) - gopkg.in/yaml.v2 [Apache License 2.0](https://github.com/go-yaml/yaml/blob/v2.2.2/LICENSE) + +## telegraf used and modified code from these projects +- github.com/DataDog/datadog-agent [Apache License 2.0](https://github.com/DataDog/datadog-agent/LICENSE) \ No newline at end of file diff --git a/plugins/inputs/statsd/README.md b/plugins/inputs/statsd/README.md index c1093bf39..a33480f61 100644 --- a/plugins/inputs/statsd/README.md +++ b/plugins/inputs/statsd/README.md @@ -42,8 +42,14 @@ ## Parses tags in the datadog statsd format ## http://docs.datadoghq.com/guides/dogstatsd/ + ## deprecated in 1.10; use datadog_extensions option instead parse_data_dog_tags = false + ## Parses extensions to statsd in the datadog statsd format + ## currently supports metrics and datadog tags. + ## http://docs.datadoghq.com/guides/dogstatsd/ + datadog_extensions = false + ## Statsd data translation templates, more info can be read here: ## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md # templates = [ @@ -185,6 +191,7 @@ the accuracy of percentiles but also increases the memory usage and cpu time. - **templates** []string: Templates for transforming statsd buckets into influx measurements and tags. - **parse_data_dog_tags** boolean: Enable parsing of tags in DataDog's dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/) +- **datadog_extensions** boolean: Enable parsing of DataDog's extensions to dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/) ### Statsd bucket -> InfluxDB line-protocol Templates diff --git a/plugins/inputs/statsd/datadog.go b/plugins/inputs/statsd/datadog.go new file mode 100644 index 000000000..f2785ff38 --- /dev/null +++ b/plugins/inputs/statsd/datadog.go @@ -0,0 +1,174 @@ +package statsd + +// this is adapted from datadog's apache licensed version at +// https://github.com/DataDog/datadog-agent/blob/fcfc74f106ab1bd6991dfc6a7061c558d934158a/pkg/dogstatsd/parser.go#L173 + +import ( + "errors" + "fmt" + "strconv" + "strings" + "time" +) + +const ( + priorityNormal = "normal" + priorityLow = "low" + + eventInfo = "info" + eventWarning = "warning" + eventError = "error" + eventSuccess = "success" +) + +var uncommenter = strings.NewReplacer("\\n", "\n") + +func (s *Statsd) parseEventMessage(now time.Time, message string, defaultHostname string) error { + // _e{title.length,text.length}:title|text + // [ + // |d:date_happened + // |p:priority + // |h:hostname + // |t:alert_type + // |s:source_type_nam + // |#tag1,tag2 + // ] + // + // + // tag is key:value + messageRaw := strings.SplitN(message, ":", 2) + if len(messageRaw) < 2 || len(messageRaw[0]) < 7 || len(messageRaw[1]) < 3 { + return fmt.Errorf("Invalid message format") + } + header := messageRaw[0] + message = messageRaw[1] + + rawLen := strings.SplitN(header[3:], ",", 2) + if len(rawLen) != 2 { + return fmt.Errorf("Invalid message format") + } + + titleLen, err := strconv.ParseInt(rawLen[0], 10, 64) + if err != nil { + return fmt.Errorf("Invalid message format, could not parse title.length: '%s'", rawLen[0]) + } + if len(rawLen[1]) < 1 { + return fmt.Errorf("Invalid message format, could not parse text.length: '%s'", rawLen[0]) + } + textLen, err := strconv.ParseInt(rawLen[1][:len(rawLen[1])-1], 10, 64) + if err != nil { + return fmt.Errorf("Invalid message format, could not parse text.length: '%s'", rawLen[0]) + } + if titleLen+textLen+1 > int64(len(message)) { + return fmt.Errorf("Invalid message format, title.length and text.length exceed total message length") + } + + rawTitle := message[:titleLen] + rawText := message[titleLen+1 : titleLen+1+textLen] + message = message[titleLen+1+textLen:] + + if len(rawTitle) == 0 || len(rawText) == 0 { + return fmt.Errorf("Invalid event message format: empty 'title' or 'text' field") + } + + name := rawTitle + tags := make(map[string]string, strings.Count(message, ",")+2) // allocate for the approximate number of tags + fields := make(map[string]interface{}, 9) + fields["alert_type"] = eventInfo // default event type + fields["text"] = uncommenter.Replace(string(rawText)) + tags["source"] = defaultHostname // Use source tag because host is reserved tag key in Telegraf. + fields["priority"] = priorityNormal + ts := now + if len(message) < 2 { + s.acc.AddFields(name, fields, tags, ts) + return nil + } + + rawMetadataFields := strings.Split(message[1:], "|") + for i := range rawMetadataFields { + if len(rawMetadataFields[i]) < 2 { + return errors.New("too short metadata field") + } + switch rawMetadataFields[i][:2] { + case "d:": + ts, err := strconv.ParseInt(rawMetadataFields[i][2:], 10, 64) + if err != nil { + continue + } + fields["ts"] = ts + case "p:": + switch rawMetadataFields[i][2:] { + case priorityLow: + fields["priority"] = priorityLow + case priorityNormal: // we already used this as a default + default: + continue + } + case "h:": + tags["source"] = rawMetadataFields[i][2:] + case "t:": + switch rawMetadataFields[i][2:] { + case eventError, eventWarning, eventSuccess, eventInfo: + fields["alert_type"] = rawMetadataFields[i][2:] // already set for info + default: + continue + } + case "k:": + tags["aggregation_key"] = rawMetadataFields[i][2:] + case "s:": + fields["source_type_name"] = rawMetadataFields[i][2:] + default: + if rawMetadataFields[i][0] == '#' { + parseDataDogTags(tags, rawMetadataFields[i][1:]) + } else { + return fmt.Errorf("unknown metadata type: '%s'", rawMetadataFields[i]) + } + } + } + // Use source tag because host is reserved tag key in Telegraf. + // In datadog the host tag and `h:` are interchangable, so we have to chech for the host tag. + if host, ok := tags["host"]; ok { + delete(tags, "host") + tags["source"] = host + } + s.acc.AddFields(name, fields, tags, ts) + return nil +} + +func parseDataDogTags(tags map[string]string, message string) { + start, i := 0, 0 + var k string + var inVal bool // check if we are parsing the value part of the tag + for i = range message { + if message[i] == ',' { + if k == "" { + k = message[start:i] + tags[k] = "true" // this is because influx doesn't support empty tags + start = i + 1 + continue + } + v := message[start:i] + if v == "" { + v = "true" + } + tags[k] = v + start = i + 1 + k, inVal = "", false // reset state vars + } else if message[i] == ':' && !inVal { + k = message[start:i] + start = i + 1 + inVal = true + } + } + if k == "" && start < i+1 { + tags[message[start:i+1]] = "true" + } + // grab the last value + if k != "" { + if start < i+1 { + tags[k] = message[start : i+1] + return + } + tags[k] = "true" + } +} diff --git a/plugins/inputs/statsd/datadog_test.go b/plugins/inputs/statsd/datadog_test.go new file mode 100644 index 000000000..61762a2c4 --- /dev/null +++ b/plugins/inputs/statsd/datadog_test.go @@ -0,0 +1,478 @@ +package statsd + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestEventGather(t *testing.T) { + now := time.Now() + type expected struct { + title string + tags map[string]string + fields map[string]interface{} + } + tests := []struct { + name string + message string + hostname string + now time.Time + err bool + expected expected + }{{ + name: "basic", + message: "_e{10,9}:test title|test text", + hostname: "default-hostname", + now: now, + err: false, + expected: expected{ + title: "test title", + tags: map[string]string{"source": "default-hostname"}, + fields: map[string]interface{}{ + "priority": priorityNormal, + "alert_type": "info", + "text": "test text", + }, + }, + }, + { + name: "escape some stuff", + message: "_e{10,24}:test title|test\\line1\\nline2\\nline3", + hostname: "default-hostname", + now: now.Add(1), + err: false, + expected: expected{ + title: "test title", + tags: map[string]string{"source": "default-hostname"}, + fields: map[string]interface{}{ + "priority": priorityNormal, + "alert_type": "info", + "text": "test\\line1\nline2\nline3", + }, + }, + }, + { + name: "custom time", + message: "_e{10,9}:test title|test text|d:21", + hostname: "default-hostname", + now: now.Add(2), + err: false, + expected: expected{ + title: "test title", + tags: map[string]string{"source": "default-hostname"}, + fields: map[string]interface{}{ + "priority": priorityNormal, + "alert_type": "info", + "text": "test text", + "ts": int64(21), + }, + }, + }, + } + acc := &testutil.Accumulator{} + s := NewTestStatsd() + s.acc = acc + + for i := range tests { + t.Run(tests[i].name, func(t *testing.T) { + err := s.parseEventMessage(tests[i].now, tests[i].message, tests[i].hostname) + if tests[i].err { + require.NotNil(t, err) + } else { + require.Nil(t, err) + } + require.Equal(t, uint64(i+1), acc.NMetrics()) + + require.Nil(t, err) + require.Equal(t, tests[i].expected.title, acc.Metrics[i].Measurement) + require.Equal(t, tests[i].expected.tags, acc.Metrics[i].Tags) + require.Equal(t, tests[i].expected.fields, acc.Metrics[i].Fields) + }) + } +} + +// These tests adapted from tests in +// https://github.com/DataDog/datadog-agent/blob/master/pkg/dogstatsd/parser_test.go +// to ensure compatibility with the datadog-agent parser + +func TestEvents(t *testing.T) { + now := time.Now() + type args struct { + now time.Time + message string + hostname string + } + type expected struct { + title string + text interface{} + now time.Time + ts interface{} + priority string + source string + alertType interface{} + aggregationKey string + sourceTypeName interface{} + checkTags map[string]string + } + + tests := []struct { + name string + args args + expected expected + }{ + { + name: "event minimal", + args: args{ + now: now, + message: "_e{10,9}:test title|test text", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now, + priority: priorityNormal, + source: "default-hostname", + alertType: eventInfo, + aggregationKey: "", + }, + }, + { + name: "event multilines text", + args: args{ + now: now.Add(1), + message: "_e{10,24}:test title|test\\line1\\nline2\\nline3", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test\\line1\nline2\nline3", + now: now.Add(1), + priority: priorityNormal, + source: "default-hostname", + alertType: eventInfo, + aggregationKey: "", + }, + }, + { + name: "event pipe in title", + args: args{ + now: now.Add(2), + message: "_e{10,24}:test|title|test\\line1\\nline2\\nline3", + hostname: "default-hostname", + }, + expected: expected{ + title: "test|title", + text: "test\\line1\nline2\nline3", + now: now.Add(2), + priority: priorityNormal, + source: "default-hostname", + alertType: eventInfo, + aggregationKey: "", + }, + }, + { + name: "event metadata timestamp", + args: args{ + now: now.Add(3), + message: "_e{10,9}:test title|test text|d:21", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(3), + priority: priorityNormal, + source: "default-hostname", + alertType: eventInfo, + aggregationKey: "", + ts: int64(21), + }, + }, + { + name: "event metadata priority", + args: args{ + now: now.Add(4), + message: "_e{10,9}:test title|test text|p:low", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(4), + priority: priorityLow, + source: "default-hostname", + alertType: eventInfo, + }, + }, + { + name: "event metadata hostname", + args: args{ + now: now.Add(5), + message: "_e{10,9}:test title|test text|h:localhost", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(5), + priority: priorityNormal, + source: "localhost", + alertType: eventInfo, + }, + }, + { + name: "event metadata hostname in tag", + args: args{ + now: now.Add(6), + message: "_e{10,9}:test title|test text|#host:localhost", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(6), + priority: priorityNormal, + source: "localhost", + alertType: eventInfo, + }, + }, + { + name: "event metadata empty host tag", + args: args{ + now: now.Add(7), + message: "_e{10,9}:test title|test text|#host:,other:tag", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(7), + priority: priorityNormal, + source: "true", + alertType: eventInfo, + checkTags: map[string]string{"other": "tag", "source": "true"}, + }, + }, + { + name: "event metadata alert type", + args: args{ + now: now.Add(8), + message: "_e{10,9}:test title|test text|t:warning", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(8), + priority: priorityNormal, + source: "default-hostname", + alertType: eventWarning, + }, + }, + { + name: "event metadata aggregation key", + args: args{ + now: now.Add(9), + message: "_e{10,9}:test title|test text|k:some aggregation key", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(9), + priority: priorityNormal, + source: "default-hostname", + alertType: eventInfo, + aggregationKey: "some aggregation key", + }, + }, + { + name: "event metadata aggregation key", + args: args{ + now: now.Add(10), + message: "_e{10,9}:test title|test text|k:some aggregation key", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(10), + priority: priorityNormal, + source: "default-hostname", + alertType: eventInfo, + aggregationKey: "some aggregation key", + }, + }, + { + name: "event metadata source type", + args: args{ + now: now.Add(11), + message: "_e{10,9}:test title|test text|s:this is the source", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(11), + priority: priorityNormal, + source: "default-hostname", + sourceTypeName: "this is the source", + alertType: eventInfo, + }, + }, + { + name: "event metadata source type", + args: args{ + now: now.Add(11), + message: "_e{10,9}:test title|test text|s:this is the source", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(11), + priority: priorityNormal, + source: "default-hostname", + sourceTypeName: "this is the source", + alertType: eventInfo, + }, + }, + { + name: "event metadata source tags", + args: args{ + now: now.Add(11), + message: "_e{10,9}:test title|test text|#tag1,tag2:test", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(11), + priority: priorityNormal, + source: "default-hostname", + alertType: eventInfo, + checkTags: map[string]string{"tag1": "true", "tag2": "test", "source": "default-hostname"}, + }, + }, + { + name: "event metadata multiple", + args: args{ + now: now.Add(11), + message: "_e{10,9}:test title|test text|t:warning|d:12345|p:low|h:some.host|k:aggKey|s:source test|#tag1,tag2:test", + hostname: "default-hostname", + }, + expected: expected{ + title: "test title", + text: "test text", + now: now.Add(11), + priority: priorityLow, + source: "some.host", + ts: int64(12345), + alertType: eventWarning, + aggregationKey: "aggKey", + sourceTypeName: "source test", + checkTags: map[string]string{"aggregation_key": "aggKey", "tag1": "true", "tag2": "test", "source": "some.host"}, + }, + }, + } + for i := range tests { + t.Run(tests[i].name, func(t *testing.T) { + s := NewTestStatsd() + acc := &testutil.Accumulator{} + s.acc = acc + err := s.parseEventMessage(tests[i].args.now, tests[i].args.message, tests[i].args.hostname) + require.Nil(t, err) + m := acc.Metrics[0] + require.Equal(t, tests[i].expected.title, m.Measurement) + require.Equal(t, tests[i].expected.text, m.Fields["text"]) + require.Equal(t, tests[i].expected.now, m.Time) + require.Equal(t, tests[i].expected.ts, m.Fields["ts"]) + require.Equal(t, tests[i].expected.priority, m.Fields["priority"]) + require.Equal(t, tests[i].expected.source, m.Tags["source"]) + require.Equal(t, tests[i].expected.alertType, m.Fields["alert_type"]) + require.Equal(t, tests[i].expected.aggregationKey, m.Tags["aggregation_key"]) + require.Equal(t, tests[i].expected.sourceTypeName, m.Fields["source_type_name"]) + if tests[i].expected.checkTags != nil { + require.Equal(t, tests[i].expected.checkTags, m.Tags) + } + }) + } +} + +func TestEventError(t *testing.T) { + now := time.Now() + s := NewTestStatsd() + s.acc = &testutil.Accumulator{} + // missing length header + err := s.parseEventMessage(now, "_e:title|text", "default-hostname") + require.Error(t, err) + + // greater length than packet + err = s.parseEventMessage(now, "_e{10,10}:title|text", "default-hostname") + require.Error(t, err) + + // zero length + err = s.parseEventMessage(now, "_e{0,0}:a|a", "default-hostname") + require.Error(t, err) + + // missing title or text length + err = s.parseEventMessage(now, "_e{5555:title|text", "default-hostname") + require.Error(t, err) + + // missing wrong len format + err = s.parseEventMessage(now, "_e{a,1}:title|text", "default-hostname") + require.Error(t, err) + + err = s.parseEventMessage(now, "_e{1,a}:title|text", "default-hostname") + require.Error(t, err) + + // missing title or text length + err = s.parseEventMessage(now, "_e{5,}:title|text", "default-hostname") + require.Error(t, err) + + err = s.parseEventMessage(now, "_e{100,:title|text", "default-hostname") + require.Error(t, err) + + err = s.parseEventMessage(now, "_e,100:title|text", "default-hostname") + require.Error(t, err) + + err = s.parseEventMessage(now, "_e{,4}:title|text", "default-hostname") + require.Error(t, err) + + err = s.parseEventMessage(now, "_e{}:title|text", "default-hostname") + require.Error(t, err) + + err = s.parseEventMessage(now, "_e{,}:title|text", "default-hostname") + require.Error(t, err) + + // not enough information + err = s.parseEventMessage(now, "_e|text", "default-hostname") + require.Error(t, err) + + err = s.parseEventMessage(now, "_e:|text", "default-hostname") + require.Error(t, err) + + // invalid timestamp + err = s.parseEventMessage(now, "_e{5,4}:title|text|d:abc", "default-hostname") + require.NoError(t, err) + + // invalid priority + err = s.parseEventMessage(now, "_e{5,4}:title|text|p:urgent", "default-hostname") + require.NoError(t, err) + + // invalid priority + err = s.parseEventMessage(now, "_e{5,4}:title|text|p:urgent", "default-hostname") + require.NoError(t, err) + + // invalid alert type + err = s.parseEventMessage(now, "_e{5,4}:title|text|t:test", "default-hostname") + require.NoError(t, err) + + // unknown metadata + err = s.parseEventMessage(now, "_e{5,4}:title|text|x:1234", "default-hostname") + require.Error(t, err) +} diff --git a/plugins/inputs/statsd/running_stats.go b/plugins/inputs/statsd/running_stats.go index 2395ab143..6f8045b42 100644 --- a/plugins/inputs/statsd/running_stats.go +++ b/plugins/inputs/statsd/running_stats.go @@ -49,7 +49,7 @@ func (rs *RunningStats) AddValue(v float64) { } // These are used for the running mean and variance - rs.n += 1 + rs.n++ rs.ex += v - rs.k rs.ex2 += (v - rs.k) * (v - rs.k) diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 8b5e15502..7408482b6 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -7,6 +7,7 @@ import ( "fmt" "log" "net" + "net/url" "sort" "strconv" "strings" @@ -21,7 +22,7 @@ import ( ) const ( - // UDP packet limit, see + // UDP_MAX_PACKET_SIZE is the UDP packet limit, see // https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure UDP_MAX_PACKET_SIZE int = 64 * 1024 @@ -34,13 +35,14 @@ const ( MaxTCPConnections = 250 ) -var dropwarn = "E! Error: statsd message queue full. " + +var dropwarn = "E! [inputs.statsd] Error: statsd message queue full. " + "We have dropped %d messages so far. " + "You may want to increase allowed_pending_messages in the config\n" -var malformedwarn = "E! Statsd over TCP has received %d malformed packets" + +var malformedwarn = "E! [inputs.statsd] Statsd over TCP has received %d malformed packets" + " thus far." +// Statsd allows the importing of statsd and dogstatsd data. type Statsd struct { // Protocol used on listener - udp or tcp Protocol string `toml:"protocol"` @@ -67,7 +69,12 @@ type Statsd struct { MetricSeparator string // This flag enables parsing of tags in the dogstatsd extension to the // statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/) - ParseDataDogTags bool + ParseDataDogTags bool // depreciated in 1.10; use datadog_extensions + + // Parses extensions to statsd in the datadog statsd format + // currently supports metrics and datadog tags. + // http://docs.datadoghq.com/guides/dogstatsd/ + DataDogExtensions bool `toml:"datadog_extensions"` // UDPPacketSize is deprecated, it's only here for legacy support // we now always create 1 max size buffer and then copy only what we need @@ -91,7 +98,7 @@ type Statsd struct { malformed int // Channel for all incoming statsd packets - in chan *bytes.Buffer + in chan input done chan struct{} // Cache gauges, counters & sets so they can be aggregated as they arrive @@ -131,6 +138,12 @@ type Statsd struct { bufPool sync.Pool } +type input struct { + *bytes.Buffer + time.Time + Addr string +} + // One statsd metric, form is :||@ type metric struct { name string @@ -214,6 +227,9 @@ const sampleConfig = ` ## http://docs.datadoghq.com/guides/dogstatsd/ parse_data_dog_tags = false + ## Parses datadog extensions to the statsd format + datadog_extensions = false + ## Statsd data translation templates, more info can be read here: ## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md # templates = [ @@ -239,12 +255,12 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { defer s.Unlock() now := time.Now() - for _, metric := range s.timings { + for _, m := range s.timings { // Defining a template to parse field names for timers allows us to split // out multiple fields per timer. In this case we prefix each stat with the // field name and store these all in a single measurement. fields := make(map[string]interface{}) - for fieldName, stats := range metric.fields { + for fieldName, stats := range m.fields { var prefix string if fieldName != defaultFieldName { prefix = fieldName + "_" @@ -261,41 +277,44 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { } } - acc.AddFields(metric.name, fields, metric.tags, now) + acc.AddFields(m.name, fields, m.tags, now) } if s.DeleteTimings { s.timings = make(map[string]cachedtimings) } - for _, metric := range s.gauges { - acc.AddGauge(metric.name, metric.fields, metric.tags, now) + for _, m := range s.gauges { + acc.AddGauge(m.name, m.fields, m.tags, now) } if s.DeleteGauges { s.gauges = make(map[string]cachedgauge) } - for _, metric := range s.counters { - acc.AddCounter(metric.name, metric.fields, metric.tags, now) + for _, m := range s.counters { + acc.AddCounter(m.name, m.fields, m.tags, now) } if s.DeleteCounters { s.counters = make(map[string]cachedcounter) } - for _, metric := range s.sets { + for _, m := range s.sets { fields := make(map[string]interface{}) - for field, set := range metric.fields { + for field, set := range m.fields { fields[field] = int64(len(set)) } - acc.AddFields(metric.name, fields, metric.tags, now) + acc.AddFields(m.name, fields, m.tags, now) } if s.DeleteSets { s.sets = make(map[string]cachedset) } - return nil } func (s *Statsd) Start(_ telegraf.Accumulator) error { + if s.ParseDataDogTags { + s.DataDogExtensions = true + log.Printf("W! [inputs.statsd] The parse_data_dog_tags option is deprecated, use datadog_extensions instead.") + } // Make data structures s.gauges = make(map[string]cachedgauge) s.counters = make(map[string]cachedcounter) @@ -315,7 +334,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { s.PacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags) s.BytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags) - s.in = make(chan *bytes.Buffer, s.AllowedPendingMessages) + s.in = make(chan input, s.AllowedPendingMessages) s.done = make(chan struct{}) s.accept = make(chan bool, s.MaxTCPConnections) s.conns = make(map[string]*net.TCPConn) @@ -329,7 +348,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { } if s.ConvertNames { - log.Printf("I! WARNING statsd: convert_names config option is deprecated," + + log.Printf("W! [inputs.statsd] statsd: convert_names config option is deprecated," + " please use metric_separator instead") } @@ -348,7 +367,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { return err } - log.Println("I! Statsd UDP listener listening on: ", conn.LocalAddr().String()) + log.Println("I! [inputs.statsd] Statsd UDP listener listening on: ", conn.LocalAddr().String()) s.UDPlistener = conn s.wg.Add(1) @@ -366,7 +385,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { return err } - log.Println("I! TCP Statsd listening on: ", listener.Addr().String()) + log.Println("I! [inputs.statsd] TCP Statsd listening on: ", listener.Addr().String()) s.TCPlistener = listener s.wg.Add(1) @@ -382,7 +401,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { defer s.wg.Done() s.parser() }() - log.Printf("I! Started the statsd service on %s\n", s.ServiceAddress) + log.Printf("I! [inputs.statsd] Started the statsd service on %s\n", s.ServiceAddress) return nil } @@ -439,17 +458,22 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error { case <-s.done: return nil default: - n, _, err := conn.ReadFromUDP(buf) - if err != nil && !strings.Contains(err.Error(), "closed network") { - log.Printf("E! Error READ: %s\n", err.Error()) - continue + n, addr, err := conn.ReadFromUDP(buf) + if err != nil { + if !strings.Contains(err.Error(), "closed network") { + log.Printf("E! [inputs.statsd] Error READ: %s\n", err.Error()) + continue + } + return err } b := s.bufPool.Get().(*bytes.Buffer) b.Reset() b.Write(buf[:n]) - select { - case s.in <- b: + case s.in <- input{ + Buffer: b, + Time: time.Now(), + Addr: addr.IP.String()}: default: s.drops++ if s.drops == 1 || s.AllowedPendingMessages == 0 || s.drops%s.AllowedPendingMessages == 0 { @@ -468,12 +492,16 @@ func (s *Statsd) parser() error { select { case <-s.done: return nil - case buf := <-s.in: - lines := strings.Split(buf.String(), "\n") - s.bufPool.Put(buf) + case in := <-s.in: + lines := strings.Split(in.Buffer.String(), "\n") + s.bufPool.Put(in.Buffer) for _, line := range lines { line = strings.TrimSpace(line) - if line != "" { + switch { + case line == "": + case s.DataDogExtensions && strings.HasPrefix(line, "_e"): + s.parseEventMessage(in.Time, line, in.Addr) + default: s.parseStatsdLine(line) } } @@ -488,7 +516,7 @@ func (s *Statsd) parseStatsdLine(line string) error { defer s.Unlock() lineTags := make(map[string]string) - if s.ParseDataDogTags { + if s.DataDogExtensions { recombinedSegments := make([]string, 0) // datadog tags look like this: // users.online:1|c|@0.5|#country:china,environment:production @@ -499,24 +527,7 @@ func (s *Statsd) parseStatsdLine(line string) error { for _, segment := range pipesplit { if len(segment) > 0 && segment[0] == '#' { // we have ourselves a tag; they are comma separated - tagstr := segment[1:] - tags := strings.Split(tagstr, ",") - for _, tag := range tags { - ts := strings.SplitN(tag, ":", 2) - var k, v string - switch len(ts) { - case 1: - // just a tag - k = ts[0] - v = "" - case 2: - k = ts[0] - v = ts[1] - } - if k != "" { - lineTags[k] = v - } - } + parseDataDogTags(lineTags, segment[1:]) } else { recombinedSegments = append(recombinedSegments, segment) } @@ -527,7 +538,7 @@ func (s *Statsd) parseStatsdLine(line string) error { // Validate splitting the line on ":" bits := strings.Split(line, ":") if len(bits) < 2 { - log.Printf("E! Error: splitting ':', Unable to parse metric: %s\n", line) + log.Printf("E! [inputs.statsd] Error: splitting ':', Unable to parse metric: %s\n", line) return errors.New("Error Parsing statsd line") } @@ -543,11 +554,11 @@ func (s *Statsd) parseStatsdLine(line string) error { // Validate splitting the bit on "|" pipesplit := strings.Split(bit, "|") if len(pipesplit) < 2 { - log.Printf("E! Error: splitting '|', Unable to parse metric: %s\n", line) + log.Printf("E! [inputs.statsd] Error: splitting '|', Unable to parse metric: %s\n", line) return errors.New("Error Parsing statsd line") } else if len(pipesplit) > 2 { sr := pipesplit[2] - errmsg := "E! Error: parsing sample rate, %s, it must be in format like: " + + errmsg := "E! [inputs.statsd] parsing sample rate, %s, it must be in format like: " + "@0.1, @0.5, etc. Ignoring sample rate for line: %s\n" if strings.Contains(sr, "@") && len(sr) > 1 { samplerate, err := strconv.ParseFloat(sr[1:], 64) @@ -567,14 +578,14 @@ func (s *Statsd) parseStatsdLine(line string) error { case "g", "c", "s", "ms", "h": m.mtype = pipesplit[1] default: - log.Printf("E! Error: Statsd Metric type %s unsupported", pipesplit[1]) + log.Printf("E! [inputs.statsd] Error: Statsd Metric type %s unsupported", pipesplit[1]) return errors.New("Error Parsing statsd line") } // Parse the value if strings.HasPrefix(pipesplit[0], "-") || strings.HasPrefix(pipesplit[0], "+") { if m.mtype != "g" && m.mtype != "c" { - log.Printf("E! Error: +- values are only supported for gauges & counters: %s\n", line) + log.Printf("E! [inputs.statsd] Error: +- values are only supported for gauges & counters: %s\n", line) return errors.New("Error Parsing statsd line") } m.additive = true @@ -584,7 +595,7 @@ func (s *Statsd) parseStatsdLine(line string) error { case "g", "ms", "h": v, err := strconv.ParseFloat(pipesplit[0], 64) if err != nil { - log.Printf("E! Error: parsing value to float64: %s\n", line) + log.Printf("E! [inputs.statsd] Error: parsing value to float64: %s\n", line) return errors.New("Error Parsing statsd line") } m.floatvalue = v @@ -594,7 +605,7 @@ func (s *Statsd) parseStatsdLine(line string) error { if err != nil { v2, err2 := strconv.ParseFloat(pipesplit[0], 64) if err2 != nil { - log.Printf("E! Error: parsing value to int64: %s\n", line) + log.Printf("E! [inputs.statsd] Error: parsing value to int64: %s\n", line) return errors.New("Error Parsing statsd line") } v = int64(v2) @@ -622,7 +633,6 @@ func (s *Statsd) parseStatsdLine(line string) error { case "h": m.tags["metric_type"] = "histogram" } - if len(lineTags) > 0 { for k, v := range lineTags { m.tags[k] = v @@ -807,7 +817,14 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { s.forget(id) s.CurrentConnections.Incr(-1) }() - + addr := conn.RemoteAddr() + parsedURL, err := url.Parse(addr.String()) + if err != nil { + // this should never happen because the conn handler should give us parsable addresses, + // but if it does we will know + log.Printf("E! [inputs.statsd] failed to parse %s\n", addr) + return // close the connetion and return + } var n int scanner := bufio.NewScanner(conn) for { @@ -831,7 +848,7 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { b.WriteByte('\n') select { - case s.in <- b: + case s.in <- input{Buffer: b, Time: time.Now(), Addr: parsedURL.Host}: default: s.drops++ if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 { @@ -845,8 +862,8 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { // refuser refuses a TCP connection func (s *Statsd) refuser(conn *net.TCPConn) { conn.Close() - log.Printf("I! Refused TCP Connection from %s", conn.RemoteAddr()) - log.Printf("I! WARNING: Maximum TCP Connections reached, you may want to" + + log.Printf("I! [inputs.statsd] Refused TCP Connection from %s", conn.RemoteAddr()) + log.Printf("I! [inputs.statsd] WARNING: Maximum TCP Connections reached, you may want to" + " adjust max_tcp_connections") } @@ -866,7 +883,7 @@ func (s *Statsd) remember(id string, conn *net.TCPConn) { func (s *Statsd) Stop() { s.Lock() - log.Println("I! Stopping the statsd service") + log.Println("I! [inputs.statsd] Stopping the statsd service") close(s.done) if s.isUDP() { s.UDPlistener.Close() diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index 1e50c8341..4a856902d 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -1,8 +1,6 @@ package statsd import ( - "bytes" - "errors" "fmt" "net" "testing" @@ -17,8 +15,8 @@ const ( testMsg = "test.tcp.msg:100|c" ) -func newTestTcpListener() (*Statsd, chan *bytes.Buffer) { - in := make(chan *bytes.Buffer, 1500) +func newTestTCPListener() (*Statsd, chan input) { + in := make(chan input, 1500) listener := &Statsd{ Protocol: "tcp", ServiceAddress: "localhost:8125", @@ -35,7 +33,7 @@ func NewTestStatsd() *Statsd { // Make data structures s.done = make(chan struct{}) - s.in = make(chan *bytes.Buffer, s.AllowedPendingMessages) + s.in = make(chan input, s.AllowedPendingMessages) s.gauges = make(map[string]cachedgauge) s.counters = make(map[string]cachedcounter) s.sets = make(map[string]cachedset) @@ -189,7 +187,7 @@ func BenchmarkTCP(b *testing.B) { // Valid lines should be parsed and their values should be cached func TestParse_ValidLines(t *testing.T) { s := NewTestStatsd() - valid_lines := []string{ + validLines := []string{ "valid:45|c", "valid:45|s", "valid:45|g", @@ -197,7 +195,7 @@ func TestParse_ValidLines(t *testing.T) { "valid.timer:45|h", } - for _, line := range valid_lines { + for _, line := range validLines { err := s.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) @@ -210,7 +208,7 @@ func TestParse_Gauges(t *testing.T) { s := NewTestStatsd() // Test that gauge +- values work - valid_lines := []string{ + validLines := []string{ "plus.minus:100|g", "plus.minus:-10|g", "plus.minus:+30|g", @@ -228,7 +226,7 @@ func TestParse_Gauges(t *testing.T) { "scientific.notation.minus:4.7E-5|g", } - for _, line := range valid_lines { + for _, line := range validLines { err := s.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) @@ -274,7 +272,7 @@ func TestParse_Gauges(t *testing.T) { } for _, test := range validations { - err := test_validate_gauge(test.name, test.value, s.gauges) + err := testValidateGauge(test.name, test.value, s.gauges) if err != nil { t.Error(err.Error()) } @@ -286,7 +284,7 @@ func TestParse_Sets(t *testing.T) { s := NewTestStatsd() // Test that sets work - valid_lines := []string{ + validLines := []string{ "unique.user.ids:100|s", "unique.user.ids:100|s", "unique.user.ids:100|s", @@ -306,7 +304,7 @@ func TestParse_Sets(t *testing.T) { "string.sets:bar|s", } - for _, line := range valid_lines { + for _, line := range validLines { err := s.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) @@ -336,7 +334,7 @@ func TestParse_Sets(t *testing.T) { } for _, test := range validations { - err := test_validate_set(test.name, test.value, s.sets) + err := testValidateSet(test.name, test.value, s.sets) if err != nil { t.Error(err.Error()) } @@ -348,7 +346,7 @@ func TestParse_Counters(t *testing.T) { s := NewTestStatsd() // Test that counters work - valid_lines := []string{ + validLines := []string{ "small.inc:1|c", "big.inc:100|c", "big.inc:1|c", @@ -363,7 +361,7 @@ func TestParse_Counters(t *testing.T) { "negative.test:-5|c", } - for _, line := range valid_lines { + for _, line := range validLines { err := s.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) @@ -401,7 +399,7 @@ func TestParse_Counters(t *testing.T) { } for _, test := range validations { - err := test_validate_counter(test.name, test.value, s.counters) + err := testValidateCounter(test.name, test.value, s.counters) if err != nil { t.Error(err.Error()) } @@ -415,7 +413,7 @@ func TestParse_Timings(t *testing.T) { acc := &testutil.Accumulator{} // Test that counters work - valid_lines := []string{ + validLines := []string{ "test.timing:1|ms", "test.timing:11|ms", "test.timing:1|ms", @@ -423,7 +421,7 @@ func TestParse_Timings(t *testing.T) { "test.timing:1|ms", } - for _, line := range valid_lines { + for _, line := range validLines { err := s.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) @@ -464,7 +462,7 @@ func TestParseScientificNotation(t *testing.T) { // Invalid lines should return an error func TestParse_InvalidLines(t *testing.T) { s := NewTestStatsd() - invalid_lines := []string{ + invalidLines := []string{ "i.dont.have.a.pipe:45g", "i.dont.have.a.colon45|c", "invalid.metric.type:45|e", @@ -475,7 +473,7 @@ func TestParse_InvalidLines(t *testing.T) { "invalid.value:d11|c", "invalid.value:1d1|c", } - for _, line := range invalid_lines { + for _, line := range invalidLines { err := s.parseStatsdLine(line) if err == nil { t.Errorf("Parsing line %s should have resulted in an error\n", line) @@ -486,21 +484,21 @@ func TestParse_InvalidLines(t *testing.T) { // Invalid sample rates should be ignored and not applied func TestParse_InvalidSampleRate(t *testing.T) { s := NewTestStatsd() - invalid_lines := []string{ + invalidLines := []string{ "invalid.sample.rate:45|c|0.1", "invalid.sample.rate.2:45|c|@foo", "invalid.sample.rate:45|g|@0.1", "invalid.sample.rate:45|s|@0.1", } - for _, line := range invalid_lines { + for _, line := range invalidLines { err := s.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } } - counter_validations := []struct { + counterValidations := []struct { name string value int64 cache map[string]cachedcounter @@ -517,19 +515,19 @@ func TestParse_InvalidSampleRate(t *testing.T) { }, } - for _, test := range counter_validations { - err := test_validate_counter(test.name, test.value, test.cache) + for _, test := range counterValidations { + err := testValidateCounter(test.name, test.value, test.cache) if err != nil { t.Error(err.Error()) } } - err := test_validate_gauge("invalid_sample_rate", 45, s.gauges) + err := testValidateGauge("invalid_sample_rate", 45, s.gauges) if err != nil { t.Error(err.Error()) } - err = test_validate_set("invalid_sample_rate", 1, s.sets) + err = testValidateSet("invalid_sample_rate", 1, s.sets) if err != nil { t.Error(err.Error()) } @@ -538,12 +536,12 @@ func TestParse_InvalidSampleRate(t *testing.T) { // Names should be parsed like . -> _ func TestParse_DefaultNameParsing(t *testing.T) { s := NewTestStatsd() - valid_lines := []string{ + validLines := []string{ "valid:1|c", "valid.foo-bar:11|c", } - for _, line := range valid_lines { + for _, line := range validLines { err := s.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) @@ -565,7 +563,7 @@ func TestParse_DefaultNameParsing(t *testing.T) { } for _, test := range validations { - err := test_validate_counter(test.name, test.value, s.counters) + err := testValidateCounter(test.name, test.value, s.counters) if err != nil { t.Error(err.Error()) } @@ -607,7 +605,7 @@ func TestParse_Template(t *testing.T) { // Validate counters for _, test := range validations { - err := test_validate_counter(test.name, test.value, s.counters) + err := testValidateCounter(test.name, test.value, s.counters) if err != nil { t.Error(err.Error()) } @@ -649,7 +647,7 @@ func TestParse_TemplateFilter(t *testing.T) { // Validate counters for _, test := range validations { - err := test_validate_counter(test.name, test.value, s.counters) + err := testValidateCounter(test.name, test.value, s.counters) if err != nil { t.Error(err.Error()) } @@ -687,7 +685,7 @@ func TestParse_TemplateSpecificity(t *testing.T) { // Validate counters for _, test := range validations { - err := test_validate_counter(test.name, test.value, s.counters) + err := testValidateCounter(test.name, test.value, s.counters) if err != nil { t.Error(err.Error()) } @@ -723,7 +721,7 @@ func TestParse_TemplateFields(t *testing.T) { } } - counter_tests := []struct { + counterTests := []struct { name string value int64 field string @@ -745,14 +743,14 @@ func TestParse_TemplateFields(t *testing.T) { }, } // Validate counters - for _, test := range counter_tests { - err := test_validate_counter(test.name, test.value, s.counters, test.field) + for _, test := range counterTests { + err := testValidateCounter(test.name, test.value, s.counters, test.field) if err != nil { t.Error(err.Error()) } } - gauge_tests := []struct { + gaugeTests := []struct { name string value float64 field string @@ -769,14 +767,14 @@ func TestParse_TemplateFields(t *testing.T) { }, } // Validate gauges - for _, test := range gauge_tests { - err := test_validate_gauge(test.name, test.value, s.gauges, test.field) + for _, test := range gaugeTests { + err := testValidateGauge(test.name, test.value, s.gauges, test.field) if err != nil { t.Error(err.Error()) } } - set_tests := []struct { + setTests := []struct { name string value int64 field string @@ -793,8 +791,8 @@ func TestParse_TemplateFields(t *testing.T) { }, } // Validate sets - for _, test := range set_tests { - err := test_validate_set(test.name, test.value, s.sets, test.field) + for _, test := range setTests { + err := testValidateSet(test.name, test.value, s.sets, test.field) if err != nil { t.Error(err.Error()) } @@ -864,7 +862,7 @@ func TestParse_Tags(t *testing.T) { // Test that DataDog tags are parsed func TestParse_DataDogTags(t *testing.T) { s := NewTestStatsd() - s.ParseDataDogTags = true + s.DataDogExtensions = true lines := []string{ "my_counter:1|c|#host:localhost,environment:prod,endpoint:/:tenant?/oauth/ro", @@ -873,24 +871,28 @@ func TestParse_DataDogTags(t *testing.T) { "my_timer:3|ms|@0.1|#live,host:localhost", } - testTags := map[string]map[string]string{ + expectedTags := map[string]map[string]string{ "my_counter": { "host": "localhost", "environment": "prod", "endpoint": "/:tenant?/oauth/ro", + "metric_type": "counter", }, "my_gauge": { - "live": "", + "live": "true", + "metric_type": "gauge", }, "my_set": { - "host": "localhost", + "host": "localhost", + "metric_type": "set", }, "my_timer": { - "live": "", - "host": "localhost", + "live": "true", + "host": "localhost", + "metric_type": "timing", }, } @@ -901,18 +903,16 @@ func TestParse_DataDogTags(t *testing.T) { } } - sourceTags := map[string]map[string]string{ + actualTags := map[string]map[string]string{ "my_gauge": tagsForItem(s.gauges), "my_counter": tagsForItem(s.counters), "my_set": tagsForItem(s.sets), "my_timer": tagsForItem(s.timings), } - - for statName, tags := range testTags { - for k, v := range tags { - otherValue := sourceTags[statName][k] - if sourceTags[statName][k] != v { - t.Errorf("Error with %s, tag %s: %s != %s", statName, k, v, otherValue) + for name, tags := range expectedTags { + for expectedK, expectedV := range tags { + if expectedV != actualTags[name][expectedK] { + t.Errorf("failed: expected: %#v != %#v", tags, actualTags[name]) } } } @@ -945,8 +945,8 @@ func TestParseName(t *testing.T) { s := NewTestStatsd() tests := []struct { - in_name string - out_name string + inName string + outName string }{ { "foobar", @@ -963,9 +963,9 @@ func TestParseName(t *testing.T) { } for _, test := range tests { - name, _, _ := s.parseName(test.in_name) - if name != test.out_name { - t.Errorf("Expected: %s, got %s", test.out_name, name) + name, _, _ := s.parseName(test.inName) + if name != test.outName { + t.Errorf("Expected: %s, got %s", test.outName, name) } } @@ -973,8 +973,8 @@ func TestParseName(t *testing.T) { s.MetricSeparator = "." tests = []struct { - in_name string - out_name string + inName string + outName string }{ { "foobar", @@ -991,9 +991,9 @@ func TestParseName(t *testing.T) { } for _, test := range tests { - name, _, _ := s.parseName(test.in_name) - if name != test.out_name { - t.Errorf("Expected: %s, got %s", test.out_name, name) + name, _, _ := s.parseName(test.inName) + if name != test.outName { + t.Errorf("Expected: %s, got %s", test.outName, name) } } } @@ -1004,12 +1004,12 @@ func TestParse_MeasurementsWithSameName(t *testing.T) { s := NewTestStatsd() // Test that counters work - valid_lines := []string{ + validLines := []string{ "test.counter,host=localhost:1|c", "test.counter,host=localhost,region=west:1|c", } - for _, line := range valid_lines { + for _, line := range validLines { err := s.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) @@ -1024,7 +1024,7 @@ func TestParse_MeasurementsWithSameName(t *testing.T) { // Test that measurements with multiple bits, are treated as different outputs // but are equal to their single-measurement representation func TestParse_MeasurementsWithMultipleValues(t *testing.T) { - single_lines := []string{ + singleLines := []string{ "valid.multiple:0|ms|@0.1", "valid.multiple:0|ms|", "valid.multiple:1|ms", @@ -1050,7 +1050,7 @@ func TestParse_MeasurementsWithMultipleValues(t *testing.T) { "valid.multiple.mixed:1|g", } - multiple_lines := []string{ + multipleLines := []string{ "valid.multiple:0|ms|@0.1:0|ms|:1|ms", "valid.multiple.duplicate:1|c:1|c:2|c:1|c", "valid.multiple.duplicate:1|h:1|h:2|h:1|h", @@ -1059,28 +1059,28 @@ func TestParse_MeasurementsWithMultipleValues(t *testing.T) { "valid.multiple.mixed:1|c:1|ms:2|s:1|g", } - s_single := NewTestStatsd() - s_multiple := NewTestStatsd() + sSingle := NewTestStatsd() + sMultiple := NewTestStatsd() - for _, line := range single_lines { - err := s_single.parseStatsdLine(line) + for _, line := range singleLines { + err := sSingle.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } } - for _, line := range multiple_lines { - err := s_multiple.parseStatsdLine(line) + for _, line := range multipleLines { + err := sMultiple.parseStatsdLine(line) if err != nil { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } } - if len(s_single.timings) != 3 { - t.Errorf("Expected 3 measurement, found %d", len(s_single.timings)) + if len(sSingle.timings) != 3 { + t.Errorf("Expected 3 measurement, found %d", len(sSingle.timings)) } - if cachedtiming, ok := s_single.timings["metric_type=timingvalid_multiple"]; !ok { + if cachedtiming, ok := sSingle.timings["metric_type=timingvalid_multiple"]; !ok { t.Errorf("Expected cached measurement with hash 'metric_type=timingvalid_multiple' not found") } else { if cachedtiming.name != "valid_multiple" { @@ -1100,60 +1100,60 @@ func TestParse_MeasurementsWithMultipleValues(t *testing.T) { } } - // test if s_single and s_multiple did compute the same stats for valid.multiple.duplicate - if err := test_validate_set("valid_multiple_duplicate", 2, s_single.sets); err != nil { + // test if sSingle and sMultiple did compute the same stats for valid.multiple.duplicate + if err := testValidateSet("valid_multiple_duplicate", 2, sSingle.sets); err != nil { t.Error(err.Error()) } - if err := test_validate_set("valid_multiple_duplicate", 2, s_multiple.sets); err != nil { + if err := testValidateSet("valid_multiple_duplicate", 2, sMultiple.sets); err != nil { t.Error(err.Error()) } - if err := test_validate_counter("valid_multiple_duplicate", 5, s_single.counters); err != nil { + if err := testValidateCounter("valid_multiple_duplicate", 5, sSingle.counters); err != nil { t.Error(err.Error()) } - if err := test_validate_counter("valid_multiple_duplicate", 5, s_multiple.counters); err != nil { + if err := testValidateCounter("valid_multiple_duplicate", 5, sMultiple.counters); err != nil { t.Error(err.Error()) } - if err := test_validate_gauge("valid_multiple_duplicate", 1, s_single.gauges); err != nil { + if err := testValidateGauge("valid_multiple_duplicate", 1, sSingle.gauges); err != nil { t.Error(err.Error()) } - if err := test_validate_gauge("valid_multiple_duplicate", 1, s_multiple.gauges); err != nil { + if err := testValidateGauge("valid_multiple_duplicate", 1, sMultiple.gauges); err != nil { t.Error(err.Error()) } - // test if s_single and s_multiple did compute the same stats for valid.multiple.mixed - if err := test_validate_set("valid_multiple_mixed", 1, s_single.sets); err != nil { + // test if sSingle and sMultiple did compute the same stats for valid.multiple.mixed + if err := testValidateSet("valid_multiple_mixed", 1, sSingle.sets); err != nil { t.Error(err.Error()) } - if err := test_validate_set("valid_multiple_mixed", 1, s_multiple.sets); err != nil { + if err := testValidateSet("valid_multiple_mixed", 1, sMultiple.sets); err != nil { t.Error(err.Error()) } - if err := test_validate_counter("valid_multiple_mixed", 1, s_single.counters); err != nil { + if err := testValidateCounter("valid_multiple_mixed", 1, sSingle.counters); err != nil { t.Error(err.Error()) } - if err := test_validate_counter("valid_multiple_mixed", 1, s_multiple.counters); err != nil { + if err := testValidateCounter("valid_multiple_mixed", 1, sMultiple.counters); err != nil { t.Error(err.Error()) } - if err := test_validate_gauge("valid_multiple_mixed", 1, s_single.gauges); err != nil { + if err := testValidateGauge("valid_multiple_mixed", 1, sSingle.gauges); err != nil { t.Error(err.Error()) } - if err := test_validate_gauge("valid_multiple_mixed", 1, s_multiple.gauges); err != nil { + if err := testValidateGauge("valid_multiple_mixed", 1, sMultiple.gauges); err != nil { t.Error(err.Error()) } } // Tests low-level functionality of timings when multiple fields is enabled // and a measurement template has been defined which can parse field names -func TestParse_Timings_MultipleFieldsWithTemplate(t *testing.T) { +func TestParse_TimingsMultipleFieldsWithTemplate(t *testing.T) { s := NewTestStatsd() s.Templates = []string{"measurement.field"} s.Percentiles = []int{90} @@ -1204,7 +1204,7 @@ func TestParse_Timings_MultipleFieldsWithTemplate(t *testing.T) { // Tests low-level functionality of timings when multiple fields is enabled // but a measurement template hasn't been defined so we can't parse field names // In this case the behaviour should be the same as normal behaviour -func TestParse_Timings_MultipleFieldsWithoutTemplate(t *testing.T) { +func TestParse_TimingsMultipleFieldsWithoutTemplate(t *testing.T) { s := NewTestStatsd() s.Templates = []string{} s.Percentiles = []int{90} @@ -1420,14 +1420,14 @@ func TestParse_Gauges_Delete(t *testing.T) { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } - err = test_validate_gauge("current_users", 100, s.gauges) + err = testValidateGauge("current_users", 100, s.gauges) if err != nil { t.Error(err.Error()) } s.Gather(fakeacc) - err = test_validate_gauge("current_users", 100, s.gauges) + err = testValidateGauge("current_users", 100, s.gauges) if err == nil { t.Error("current_users_gauge metric should have been deleted") } @@ -1446,14 +1446,14 @@ func TestParse_Sets_Delete(t *testing.T) { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } - err = test_validate_set("unique_user_ids", 1, s.sets) + err = testValidateSet("unique_user_ids", 1, s.sets) if err != nil { t.Error(err.Error()) } s.Gather(fakeacc) - err = test_validate_set("unique_user_ids", 1, s.sets) + err = testValidateSet("unique_user_ids", 1, s.sets) if err == nil { t.Error("unique_user_ids_set metric should have been deleted") } @@ -1472,14 +1472,14 @@ func TestParse_Counters_Delete(t *testing.T) { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } - err = test_validate_counter("total_users", 100, s.counters) + err = testValidateCounter("total_users", 100, s.counters) if err != nil { t.Error(err.Error()) } s.Gather(fakeacc) - err = test_validate_counter("total_users", 100, s.counters) + err = testValidateCounter("total_users", 100, s.counters) if err == nil { t.Error("total_users_counter metric should have been deleted") } @@ -1504,8 +1504,7 @@ func TestParseKeyValue(t *testing.T) { } // Test utility functions - -func test_validate_set( +func testValidateSet( name string, value int64, cache map[string]cachedset, @@ -1527,17 +1526,16 @@ func test_validate_set( } } if !found { - return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name)) + return fmt.Errorf("test Error: Metric name %s not found", name) } if value != int64(len(metric.fields[f])) { - return errors.New(fmt.Sprintf("Measurement: %s, expected %d, actual %d\n", - name, value, len(metric.fields[f]))) + return fmt.Errorf("measurement: %s, expected %d, actual %d", name, value, len(metric.fields[f])) } return nil } -func test_validate_counter( +func testValidateCounter( name string, valueExpected int64, cache map[string]cachedcounter, @@ -1559,17 +1557,16 @@ func test_validate_counter( } } if !found { - return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name)) + return fmt.Errorf("test Error: Metric name %s not found", name) } if valueExpected != valueActual { - return errors.New(fmt.Sprintf("Measurement: %s, expected %d, actual %d\n", - name, valueExpected, valueActual)) + return fmt.Errorf("measurement: %s, expected %d, actual %d", name, valueExpected, valueActual) } return nil } -func test_validate_gauge( +func testValidateGauge( name string, valueExpected float64, cache map[string]cachedgauge, @@ -1591,12 +1588,11 @@ func test_validate_gauge( } } if !found { - return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name)) + return fmt.Errorf("test Error: Metric name %s not found", name) } if valueExpected != valueActual { - return errors.New(fmt.Sprintf("Measurement: %s, expected %f, actual %f\n", - name, valueExpected, valueActual)) + return fmt.Errorf("Measurement: %s, expected %f, actual %f", name, valueExpected, valueActual) } return nil }