From a231b3e79d7014a04879db0bb9538c5985cd00ca Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 25 Jun 2019 12:04:39 -0700 Subject: [PATCH] Fix parsing of remote tcp address in statsd input (#6031) --- plugins/inputs/statsd/datadog.go | 4 +- plugins/inputs/statsd/statsd.go | 15 +++---- plugins/inputs/statsd/statsd_test.go | 60 ++++++++++++++++++++++------ testutil/metric.go | 4 +- 4 files changed, 58 insertions(+), 25 deletions(-) diff --git a/plugins/inputs/statsd/datadog.go b/plugins/inputs/statsd/datadog.go index f2785ff38..6cce2316e 100644 --- a/plugins/inputs/statsd/datadog.go +++ b/plugins/inputs/statsd/datadog.go @@ -76,7 +76,9 @@ func (s *Statsd) parseEventMessage(now time.Time, message string, defaultHostnam fields := make(map[string]interface{}, 9) fields["alert_type"] = eventInfo // default event type fields["text"] = uncommenter.Replace(string(rawText)) - tags["source"] = defaultHostname // Use source tag because host is reserved tag key in Telegraf. + if defaultHostname != "" { + tags["source"] = defaultHostname + } fields["priority"] = priorityNormal ts := now if len(message) < 2 { diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 7408482b6..89d67b1ee 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -7,7 +7,6 @@ import ( "fmt" "log" "net" - "net/url" "sort" "strconv" "strings" @@ -817,14 +816,12 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { s.forget(id) s.CurrentConnections.Incr(-1) }() - addr := conn.RemoteAddr() - parsedURL, err := url.Parse(addr.String()) - if err != nil { - // this should never happen because the conn handler should give us parsable addresses, - // but if it does we will know - log.Printf("E! [inputs.statsd] failed to parse %s\n", addr) - return // close the connetion and return + + var remoteIP string + if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok { + remoteIP = addr.IP.String() } + var n int scanner := bufio.NewScanner(conn) for { @@ -848,7 +845,7 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) { b.WriteByte('\n') select { - case s.in <- input{Buffer: b, Time: time.Now(), Addr: parsedURL.Host}: + case s.in <- input{Buffer: b, Time: time.Now(), Addr: remoteIP}: default: s.drops++ if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 { diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index 4a856902d..9f760b9f9 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -15,19 +16,6 @@ const ( testMsg = "test.tcp.msg:100|c" ) -func newTestTCPListener() (*Statsd, chan input) { - in := make(chan input, 1500) - listener := &Statsd{ - Protocol: "tcp", - ServiceAddress: "localhost:8125", - AllowedPendingMessages: 10000, - MaxTCPConnections: 250, - in: in, - done: make(chan struct{}), - } - return listener, in -} - func NewTestStatsd() *Statsd { s := Statsd{} @@ -1596,3 +1584,49 @@ func testValidateGauge( } return nil } + +func TestTCP(t *testing.T) { + statsd := Statsd{ + Protocol: "tcp", + ServiceAddress: "localhost:0", + AllowedPendingMessages: 10000, + MaxTCPConnections: 2, + } + var acc testutil.Accumulator + require.NoError(t, statsd.Start(&acc)) + defer statsd.Stop() + + addr := statsd.TCPlistener.Addr().String() + + conn, err := net.Dial("tcp", addr) + _, err = conn.Write([]byte("cpu.time_idle:42|c\n")) + require.NoError(t, err) + err = conn.Close() + require.NoError(t, err) + + for { + err = statsd.Gather(&acc) + require.NoError(t, err) + + if len(acc.Metrics) > 0 { + break + } + } + + testutil.RequireMetricsEqual(t, + []telegraf.Metric{ + testutil.MustMetric( + "cpu_time_idle", + map[string]string{ + "metric_type": "counter", + }, + map[string]interface{}{ + "value": 42, + }, + time.Now(), + ), + }, + acc.GetTelegrafMetrics(), + testutil.IgnoreTime(), + ) +} diff --git a/testutil/metric.go b/testutil/metric.go index 0dca9c641..25e23fa20 100644 --- a/testutil/metric.go +++ b/testutil/metric.go @@ -143,7 +143,7 @@ func MetricEqual(expected, actual telegraf.Metric) bool { // RequireMetricEqual halts the test with an error if the metrics are not // equal. -func RequireMetricEqual(t *testing.T, expected, actual telegraf.Metric) { +func RequireMetricEqual(t *testing.T, expected, actual telegraf.Metric, opts ...cmp.Option) { t.Helper() var lhs, rhs *metricDiff @@ -154,7 +154,7 @@ func RequireMetricEqual(t *testing.T, expected, actual telegraf.Metric) { rhs = newMetricDiff(actual) } - if diff := cmp.Diff(lhs, rhs); diff != "" { + if diff := cmp.Diff(lhs, rhs, opts...); diff != "" { t.Fatalf("telegraf.Metric\n--- expected\n+++ actual\n%s", diff) } }