Add multicast support to socket_listener input
This commit is contained in:
parent
50c1103657
commit
1872356103
|
@ -114,7 +114,7 @@ func (ssl *streamSocketListener) read(c net.Conn) {
|
||||||
metrics, err := ssl.Parse(scnr.Bytes())
|
metrics, err := ssl.Parse(scnr.Bytes())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ssl.AddError(fmt.Errorf("unable to parse incoming line: %s", err))
|
ssl.AddError(fmt.Errorf("unable to parse incoming line: %s", err))
|
||||||
//TODO rate limit
|
// TODO rate limit
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
|
@ -150,7 +150,7 @@ func (psl *packetSocketListener) listen() {
|
||||||
metrics, err := psl.Parse(buf[:n])
|
metrics, err := psl.Parse(buf[:n])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
psl.AddError(fmt.Errorf("unable to parse incoming packet: %s", err))
|
psl.AddError(fmt.Errorf("unable to parse incoming packet: %s", err))
|
||||||
//TODO rate limit
|
// TODO rate limit
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
|
@ -284,7 +284,7 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
|
||||||
sl.Closer = ssl
|
sl.Closer = ssl
|
||||||
go ssl.listen()
|
go ssl.listen()
|
||||||
case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram":
|
case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram":
|
||||||
pc, err := net.ListenPacket(protocol, addr)
|
pc, err := udpListen(protocol, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -317,6 +317,31 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func udpListen(network string, address string) (net.PacketConn, error) {
|
||||||
|
switch network {
|
||||||
|
case "udp", "udp4", "udp6":
|
||||||
|
var addr *net.UDPAddr
|
||||||
|
var err error
|
||||||
|
var ifi *net.Interface
|
||||||
|
if spl := strings.SplitN(address, "%", 2); len(spl) == 2 {
|
||||||
|
address = spl[0]
|
||||||
|
ifi, err = net.InterfaceByName(spl[1])
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
addr, err = net.ResolveUDPAddr(network, address)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if addr.IP.IsMulticast() {
|
||||||
|
return net.ListenMulticastUDP(network, ifi, addr)
|
||||||
|
}
|
||||||
|
return net.ListenUDP(network, addr)
|
||||||
|
}
|
||||||
|
return net.ListenPacket(network, address)
|
||||||
|
}
|
||||||
|
|
||||||
func (sl *SocketListener) Stop() {
|
func (sl *SocketListener) Stop() {
|
||||||
if sl.Closer != nil {
|
if sl.Closer != nil {
|
||||||
sl.Close()
|
sl.Close()
|
||||||
|
|
Loading…
Reference in New Issue