Add UDP IPv6 support to statsd input (#3344)

This commit is contained in:
Craig Wickesser 2017-10-16 17:18:36 -04:00 committed by Daniel Nelson
parent 3ea41e885c
commit 246ffab3e0
2 changed files with 13 additions and 12 deletions

View File

@ -5,7 +5,7 @@
```toml ```toml
# Statsd Server # Statsd Server
[[inputs.statsd]] [[inputs.statsd]]
## Protocol, must be "tcp" or "udp" (default=udp) ## Protocol, must be "tcp", "udp4", "udp6" or "udp" (default=udp)
protocol = "udp" protocol = "udp"
## MaxTCPConnection - applicable when protocol is set to tcp (default=250) ## MaxTCPConnection - applicable when protocol is set to tcp (default=250)

View File

@ -171,7 +171,7 @@ func (_ *Statsd) Description() string {
} }
const sampleConfig = ` const sampleConfig = `
## Protocol, must be "tcp" or "udp" (default=udp) ## Protocol, must be "tcp", "udp", "udp4" or "udp6" (default=udp)
protocol = "udp" protocol = "udp"
## MaxTCPConnection - applicable when protocol is set to tcp (default=250) ## MaxTCPConnection - applicable when protocol is set to tcp (default=250)
@ -327,10 +327,9 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
s.wg.Add(2) s.wg.Add(2)
// Start the UDP listener // Start the UDP listener
switch s.Protocol { if s.isUDP() {
case "udp":
go s.udpListen() go s.udpListen()
case "tcp": } else {
go s.tcpListen() go s.tcpListen()
} }
// Start the line parser // Start the line parser
@ -382,8 +381,8 @@ func (s *Statsd) tcpListen() error {
func (s *Statsd) udpListen() error { func (s *Statsd) udpListen() error {
defer s.wg.Done() defer s.wg.Done()
var err error var err error
address, _ := net.ResolveUDPAddr("udp", s.ServiceAddress) address, _ := net.ResolveUDPAddr(s.Protocol, s.ServiceAddress)
s.UDPlistener, err = net.ListenUDP("udp", address) s.UDPlistener, err = net.ListenUDP(s.Protocol, address)
if err != nil { if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err) log.Fatalf("ERROR: ListenUDP - %s", err)
} }
@ -825,10 +824,9 @@ func (s *Statsd) Stop() {
s.Lock() s.Lock()
log.Println("I! Stopping the statsd service") log.Println("I! Stopping the statsd service")
close(s.done) close(s.done)
switch s.Protocol { if s.isUDP() {
case "udp":
s.UDPlistener.Close() s.UDPlistener.Close()
case "tcp": } else {
s.TCPlistener.Close() s.TCPlistener.Close()
// Close all open TCP connections // Close all open TCP connections
// - get all conns from the s.conns map and put into slice // - get all conns from the s.conns map and put into slice
@ -843,8 +841,6 @@ func (s *Statsd) Stop() {
for _, conn := range conns { for _, conn := range conns {
conn.Close() conn.Close()
} }
default:
s.UDPlistener.Close()
} }
s.Unlock() s.Unlock()
@ -856,6 +852,11 @@ func (s *Statsd) Stop() {
s.Unlock() s.Unlock()
} }
// IsUDP returns true if the protocol is UDP, false otherwise.
func (s *Statsd) isUDP() bool {
return strings.HasPrefix(s.Protocol, "udp")
}
func init() { func init() {
inputs.Add("statsd", func() telegraf.Input { inputs.Add("statsd", func() telegraf.Input {
return &Statsd{ return &Statsd{