Fix parsing of remote tcp address in statsd input (#6031)

This commit is contained in:
Daniel Nelson 2019-06-25 12:04:39 -07:00 committed by GitHub
parent e8a596858c
commit a231b3e79d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 58 additions and 25 deletions

View File

@ -76,7 +76,9 @@ func (s *Statsd) parseEventMessage(now time.Time, message string, defaultHostnam
fields := make(map[string]interface{}, 9) fields := make(map[string]interface{}, 9)
fields["alert_type"] = eventInfo // default event type fields["alert_type"] = eventInfo // default event type
fields["text"] = uncommenter.Replace(string(rawText)) fields["text"] = uncommenter.Replace(string(rawText))
tags["source"] = defaultHostname // Use source tag because host is reserved tag key in Telegraf. if defaultHostname != "" {
tags["source"] = defaultHostname
}
fields["priority"] = priorityNormal fields["priority"] = priorityNormal
ts := now ts := now
if len(message) < 2 { if len(message) < 2 {

View File

@ -7,7 +7,6 @@ import (
"fmt" "fmt"
"log" "log"
"net" "net"
"net/url"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@ -817,14 +816,12 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
s.forget(id) s.forget(id)
s.CurrentConnections.Incr(-1) s.CurrentConnections.Incr(-1)
}() }()
addr := conn.RemoteAddr()
parsedURL, err := url.Parse(addr.String()) var remoteIP string
if err != nil { if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
// this should never happen because the conn handler should give us parsable addresses, remoteIP = addr.IP.String()
// but if it does we will know
log.Printf("E! [inputs.statsd] failed to parse %s\n", addr)
return // close the connetion and return
} }
var n int var n int
scanner := bufio.NewScanner(conn) scanner := bufio.NewScanner(conn)
for { for {
@ -848,7 +845,7 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
b.WriteByte('\n') b.WriteByte('\n')
select { select {
case s.in <- input{Buffer: b, Time: time.Now(), Addr: parsedURL.Host}: case s.in <- input{Buffer: b, Time: time.Now(), Addr: remoteIP}:
default: default:
s.drops++ s.drops++
if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 { if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 {

View File

@ -6,6 +6,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -15,19 +16,6 @@ const (
testMsg = "test.tcp.msg:100|c" testMsg = "test.tcp.msg:100|c"
) )
func newTestTCPListener() (*Statsd, chan input) {
in := make(chan input, 1500)
listener := &Statsd{
Protocol: "tcp",
ServiceAddress: "localhost:8125",
AllowedPendingMessages: 10000,
MaxTCPConnections: 250,
in: in,
done: make(chan struct{}),
}
return listener, in
}
func NewTestStatsd() *Statsd { func NewTestStatsd() *Statsd {
s := Statsd{} s := Statsd{}
@ -1596,3 +1584,49 @@ func testValidateGauge(
} }
return nil return nil
} }
func TestTCP(t *testing.T) {
statsd := Statsd{
Protocol: "tcp",
ServiceAddress: "localhost:0",
AllowedPendingMessages: 10000,
MaxTCPConnections: 2,
}
var acc testutil.Accumulator
require.NoError(t, statsd.Start(&acc))
defer statsd.Stop()
addr := statsd.TCPlistener.Addr().String()
conn, err := net.Dial("tcp", addr)
_, err = conn.Write([]byte("cpu.time_idle:42|c\n"))
require.NoError(t, err)
err = conn.Close()
require.NoError(t, err)
for {
err = statsd.Gather(&acc)
require.NoError(t, err)
if len(acc.Metrics) > 0 {
break
}
}
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
testutil.MustMetric(
"cpu_time_idle",
map[string]string{
"metric_type": "counter",
},
map[string]interface{}{
"value": 42,
},
time.Now(),
),
},
acc.GetTelegrafMetrics(),
testutil.IgnoreTime(),
)
}

View File

@ -143,7 +143,7 @@ func MetricEqual(expected, actual telegraf.Metric) bool {
// RequireMetricEqual halts the test with an error if the metrics are not // RequireMetricEqual halts the test with an error if the metrics are not
// equal. // equal.
func RequireMetricEqual(t *testing.T, expected, actual telegraf.Metric) { func RequireMetricEqual(t *testing.T, expected, actual telegraf.Metric, opts ...cmp.Option) {
t.Helper() t.Helper()
var lhs, rhs *metricDiff var lhs, rhs *metricDiff
@ -154,7 +154,7 @@ func RequireMetricEqual(t *testing.T, expected, actual telegraf.Metric) {
rhs = newMetricDiff(actual) rhs = newMetricDiff(actual)
} }
if diff := cmp.Diff(lhs, rhs); diff != "" { if diff := cmp.Diff(lhs, rhs, opts...); diff != "" {
t.Fatalf("telegraf.Metric\n--- expected\n+++ actual\n%s", diff) t.Fatalf("telegraf.Metric\n--- expected\n+++ actual\n%s", diff)
} }
} }