Add native Go ping method to ping input plugin (#6050)

This commit is contained in:
Greg
2019-07-11 16:07:58 -06:00
committed by Daniel Nelson
parent c9107015b0
commit ea6b398fa3
6 changed files with 492 additions and 321 deletions

View File

@@ -1,20 +1,17 @@
// +build !windows
package ping
import (
"context"
"errors"
"fmt"
"math"
"net"
"os/exec"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/glinton/ping"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
@@ -34,7 +31,7 @@ type Ping struct {
// Number of pings to send (ping -c <COUNT>)
Count int
// Ping timeout, in seconds. 0 means no timeout (ping -W <TIMEOUT>)
// Per-ping timeout, in seconds. 0 means no timeout (ping -W <TIMEOUT>)
Timeout float64
// Ping deadline, in seconds. 0 means no deadline. (ping -w <DEADLINE>)
@@ -46,18 +43,27 @@ type Ping struct {
// URLs to ping
Urls []string
// Method defines how to ping (native or exec)
Method string
// Ping executable binary
Binary string
// Arguments for ping command.
// when `Arguments` is not empty, other options (ping_interval, timeout, etc) will be ignored
// Arguments for ping command. When arguments is not empty, system binary will be used and
// other options (ping_interval, timeout, etc) will be ignored
Arguments []string
// Whether to resolve addresses using ipv6 or not.
IPv6 bool
// host ping function
pingHost HostPinger
// listenAddr is the address associated with the interface defined.
listenAddr string
}
func (_ *Ping) Description() string {
func (*Ping) Description() string {
return "Ping given url(s) and return statistics"
}
@@ -69,7 +75,6 @@ const sampleConfig = `
# count = 1
## Interval, in s, at which to ping. 0 == default (ping -i <PING_INTERVAL>)
## Not available in Windows.
# ping_interval = 1.0
## Per-ping timeout, in s. 0 == no timeout (ping -W <TIMEOUT>)
@@ -78,27 +83,53 @@ const sampleConfig = `
## Total-ping deadline, in s. 0 == no deadline (ping -w <DEADLINE>)
# deadline = 10
## Interface or source address to send ping from (ping -I <INTERFACE/SRC_ADDR>)
## on Darwin and Freebsd only source address possible: (ping -S <SRC_ADDR>)
## Interface or source address to send ping from (ping -I[-S] <INTERFACE/SRC_ADDR>)
# interface = ""
## Specify the ping executable binary, default is "ping"
# binary = "ping"
## How to ping. "native" doesn't have external dependencies, while "exec" depends on 'ping'.
# method = "exec"
## Arguments for ping command
## when arguments is not empty, other options (ping_interval, timeout, etc) will be ignored
## Specify the ping executable binary, default is "ping"
# binary = "ping"
## Arguments for ping command. When arguments is not empty, system binary will be used and
## other options (ping_interval, timeout, etc) will be ignored.
# arguments = ["-c", "3"]
## Use only ipv6 addresses when resolving hostnames.
# ipv6 = false
`
func (_ *Ping) SampleConfig() string {
func (*Ping) SampleConfig() string {
return sampleConfig
}
func (p *Ping) Gather(acc telegraf.Accumulator) error {
// Spin off a go routine for each url to ping
for _, url := range p.Urls {
p.wg.Add(1)
go p.pingToURL(url, acc)
if p.Interface != "" && p.listenAddr != "" {
p.listenAddr = getAddr(p.Interface)
}
for _, ip := range p.Urls {
_, err := net.LookupHost(ip)
if err != nil {
acc.AddFields("ping", map[string]interface{}{"result_code": 1}, map[string]string{"ip": ip})
acc.AddError(err)
return nil
}
if p.Method == "native" {
p.wg.Add(1)
go func(ip string) {
defer p.wg.Done()
p.pingToURLNative(ip, acc)
}(ip)
} else {
p.wg.Add(1)
go func(ip string) {
defer p.wg.Done()
p.pingToURL(ip, acc)
}(ip)
}
}
p.wg.Wait()
@@ -106,81 +137,39 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error {
return nil
}
func (p *Ping) pingToURL(u string, acc telegraf.Accumulator) {
defer p.wg.Done()
tags := map[string]string{"url": u}
fields := map[string]interface{}{"result_code": 0}
_, err := net.LookupHost(u)
if err != nil {
acc.AddError(err)
fields["result_code"] = 1
acc.AddFields("ping", fields, tags)
return
func getAddr(iface string) string {
if addr := net.ParseIP(iface); addr != nil {
return addr.String()
}
args := p.args(u, runtime.GOOS)
totalTimeout := 60.0
if len(p.Arguments) == 0 {
totalTimeout = float64(p.Count)*p.Timeout + float64(p.Count-1)*p.PingInterval
ifaces, err := net.Interfaces()
if err != nil {
return ""
}
out, err := p.pingHost(p.Binary, totalTimeout, args...)
if err != nil {
// Some implementations of ping return a 1 exit code on
// timeout, if this occurs we will not exit and try to parse
// the output.
status := -1
if exitError, ok := err.(*exec.ExitError); ok {
if ws, ok := exitError.Sys().(syscall.WaitStatus); ok {
status = ws.ExitStatus()
fields["result_code"] = status
var ip net.IP
for i := range ifaces {
if ifaces[i].Name == iface {
addrs, err := ifaces[i].Addrs()
if err != nil {
return ""
}
if len(addrs) > 0 {
switch v := addrs[0].(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
if len(ip) == 0 {
return ""
}
return ip.String()
}
}
if status != 1 {
// Combine go err + stderr output
out = strings.TrimSpace(out)
if len(out) > 0 {
acc.AddError(fmt.Errorf("host %s: %s, %s", u, out, err))
} else {
acc.AddError(fmt.Errorf("host %s: %s", u, err))
}
fields["result_code"] = 2
acc.AddFields("ping", fields, tags)
return
}
}
trans, rec, ttl, min, avg, max, stddev, err := processPingOutput(out)
if err != nil {
// fatal error
acc.AddError(fmt.Errorf("%s: %s", err, u))
fields["result_code"] = 2
acc.AddFields("ping", fields, tags)
return
}
// Calculate packet loss percentage
loss := float64(trans-rec) / float64(trans) * 100.0
fields["packets_transmitted"] = trans
fields["packets_received"] = rec
fields["percent_packet_loss"] = loss
if ttl >= 0 {
fields["ttl"] = ttl
}
if min >= 0 {
fields["minimum_response_ms"] = min
}
if avg >= 0 {
fields["average_response_ms"] = avg
}
if max >= 0 {
fields["maximum_response_ms"] = max
}
if stddev >= 0 {
fields["standard_deviation_ms"] = stddev
}
acc.AddFields("ping", fields, tags)
return ""
}
func hostPinger(binary string, timeout float64, args ...string) (string, error) {
@@ -194,137 +183,156 @@ func hostPinger(binary string, timeout float64, args ...string) (string, error)
return string(out), err
}
// args returns the arguments for the 'ping' executable
func (p *Ping) args(url string, system string) []string {
if len(p.Arguments) > 0 {
return append(p.Arguments, url)
func (p *Ping) pingToURLNative(destination string, acc telegraf.Accumulator) {
ctx := context.Background()
network := "ip4"
if p.IPv6 {
network = "ip6"
}
// build the ping command args based on toml config
args := []string{"-c", strconv.Itoa(p.Count), "-n", "-s", "16"}
if p.PingInterval > 0 {
args = append(args, "-i", strconv.FormatFloat(p.PingInterval, 'f', -1, 64))
host, err := net.ResolveIPAddr(network, destination)
if err != nil {
acc.AddFields("ping", map[string]interface{}{"result_code": 1}, map[string]string{"url": destination})
acc.AddError(err)
return
}
if p.Timeout > 0 {
switch system {
case "darwin":
args = append(args, "-W", strconv.FormatFloat(p.Timeout*1000, 'f', -1, 64))
case "freebsd", "netbsd", "openbsd":
args = append(args, "-W", strconv.FormatFloat(p.Timeout*1000, 'f', -1, 64))
case "linux":
args = append(args, "-W", strconv.FormatFloat(p.Timeout, 'f', -1, 64))
default:
// Not sure the best option here, just assume GNU ping?
args = append(args, "-W", strconv.FormatFloat(p.Timeout, 'f', -1, 64))
}
interval := p.PingInterval
if interval < 0.2 {
interval = 0.2
}
timeout := p.Timeout
if timeout == 0 {
timeout = 5
}
tick := time.NewTicker(time.Duration(interval * float64(time.Second)))
defer tick.Stop()
if p.Deadline > 0 {
switch system {
case "darwin", "freebsd", "netbsd", "openbsd":
args = append(args, "-t", strconv.Itoa(p.Deadline))
case "linux":
args = append(args, "-w", strconv.Itoa(p.Deadline))
default:
// not sure the best option here, just assume gnu ping?
args = append(args, "-w", strconv.Itoa(p.Deadline))
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(p.Deadline)*time.Second)
defer cancel()
}
resps := make(chan *ping.Response)
rsps := []*ping.Response{}
r := &sync.WaitGroup{}
r.Add(1)
go func() {
for res := range resps {
rsps = append(rsps, res)
}
r.Done()
}()
wg := &sync.WaitGroup{}
c := ping.Client{}
var i int
for i = 0; i < p.Count; i++ {
select {
case <-ctx.Done():
goto finish
case <-tick.C:
ctx, cancel := context.WithTimeout(ctx, time.Duration(timeout*float64(time.Second)))
defer cancel()
wg.Add(1)
go func(seq int) {
defer wg.Done()
resp, err := c.Do(ctx, &ping.Request{
Dst: net.ParseIP(host.String()),
Src: net.ParseIP(p.listenAddr),
Seq: seq,
})
if err != nil {
acc.AddFields("ping", map[string]interface{}{"result_code": 2}, map[string]string{"url": destination})
acc.AddError(err)
return
}
resps <- resp
}(i + 1)
}
}
if p.Interface != "" {
switch system {
case "darwin":
args = append(args, "-I", p.Interface)
case "freebsd", "netbsd", "openbsd":
args = append(args, "-S", p.Interface)
case "linux":
args = append(args, "-I", p.Interface)
default:
// not sure the best option here, just assume gnu ping?
args = append(args, "-i", p.Interface)
finish:
wg.Wait()
close(resps)
r.Wait()
tags, fields := onFin(i, rsps, destination)
acc.AddFields("ping", fields, tags)
}
func onFin(packetsSent int, resps []*ping.Response, destination string) (map[string]string, map[string]interface{}) {
packetsRcvd := len(resps)
tags := map[string]string{"url": destination}
fields := map[string]interface{}{
"result_code": 0,
"packets_transmitted": packetsSent,
"packets_received": packetsRcvd,
}
if packetsSent == 0 {
return tags, fields
}
if packetsRcvd == 0 {
fields["percent_packet_loss"] = float64(100)
return tags, fields
}
fields["percent_packet_loss"] = float64(packetsSent-packetsRcvd) / float64(packetsSent) * 100
ttl := resps[0].TTL
var min, max, avg, total time.Duration
min = resps[0].RTT
max = resps[0].RTT
for _, res := range resps {
if res.RTT < min {
min = res.RTT
}
}
args = append(args, url)
return args
}
// processPingOutput takes in a string output from the ping command, like:
//
// ping www.google.com (173.194.115.84): 56 data bytes
// 64 bytes from 173.194.115.84: icmp_seq=0 ttl=54 time=52.172 ms
// 64 bytes from 173.194.115.84: icmp_seq=1 ttl=54 time=34.843 ms
//
// --- www.google.com ping statistics ---
// 2 packets transmitted, 2 packets received, 0.0% packet loss
// round-trip min/avg/max/stddev = 34.843/43.508/52.172/8.664 ms
//
// It returns (<transmitted packets>, <received packets>, <average response>)
func processPingOutput(out string) (int, int, int, float64, float64, float64, float64, error) {
var trans, recv, ttl int = 0, 0, -1
var min, avg, max, stddev float64 = -1.0, -1.0, -1.0, -1.0
// Set this error to nil if we find a 'transmitted' line
err := errors.New("Fatal error processing ping output")
lines := strings.Split(out, "\n")
for _, line := range lines {
// Reading only first TTL, ignoring other TTL messages
if ttl == -1 && strings.Contains(line, "ttl=") {
ttl, err = getTTL(line)
} else if strings.Contains(line, "transmitted") &&
strings.Contains(line, "received") {
trans, recv, err = getPacketStats(line, trans, recv)
if err != nil {
return trans, recv, ttl, min, avg, max, stddev, err
}
} else if strings.Contains(line, "min/avg/max") {
min, avg, max, stddev, err = checkRoundTripTimeStats(line, min, avg, max, stddev)
if err != nil {
return trans, recv, ttl, min, avg, max, stddev, err
}
if res.RTT > max {
max = res.RTT
}
total += res.RTT
}
return trans, recv, ttl, min, avg, max, stddev, err
avg = total / time.Duration(packetsRcvd)
var sumsquares time.Duration
for _, res := range resps {
sumsquares += (res.RTT - avg) * (res.RTT - avg)
}
stdDev := time.Duration(math.Sqrt(float64(sumsquares / time.Duration(packetsRcvd))))
// Set TTL only on supported platform. See golang.org/x/net/ipv4/payload_cmsg.go
switch runtime.GOOS {
case "aix", "darwin", "dragonfly", "freebsd", "linux", "netbsd", "openbsd", "solaris":
fields["ttl"] = ttl
}
fields["minimum_response_ms"] = float64(min.Nanoseconds()) / float64(time.Millisecond)
fields["average_response_ms"] = float64(avg.Nanoseconds()) / float64(time.Millisecond)
fields["maximum_response_ms"] = float64(max.Nanoseconds()) / float64(time.Millisecond)
fields["standard_deviation_ms"] = float64(stdDev.Nanoseconds()) / float64(time.Millisecond)
return tags, fields
}
func getPacketStats(line string, trans, recv int) (int, int, error) {
stats := strings.Split(line, ", ")
// Transmitted packets
trans, err := strconv.Atoi(strings.Split(stats[0], " ")[0])
if err != nil {
return trans, recv, err
// Init ensures the plugin is configured correctly.
func (p *Ping) Init() error {
if p.Count < 1 {
return errors.New("bad number of packets to transmit")
}
// Received packets
recv, err = strconv.Atoi(strings.Split(stats[1], " ")[0])
return trans, recv, err
}
func getTTL(line string) (int, error) {
ttlLine := regexp.MustCompile(`ttl=(\d+)`)
ttlMatch := ttlLine.FindStringSubmatch(line)
return strconv.Atoi(ttlMatch[1])
}
func checkRoundTripTimeStats(line string, min, avg, max,
stddev float64) (float64, float64, float64, float64, error) {
stats := strings.Split(line, " ")[3]
data := strings.Split(stats, "/")
min, err := strconv.ParseFloat(data[0], 64)
if err != nil {
return min, avg, max, stddev, err
}
avg, err = strconv.ParseFloat(data[1], 64)
if err != nil {
return min, avg, max, stddev, err
}
max, err = strconv.ParseFloat(data[2], 64)
if err != nil {
return min, avg, max, stddev, err
}
if len(data) == 4 {
stddev, err = strconv.ParseFloat(data[3], 64)
if err != nil {
return min, avg, max, stddev, err
}
}
return min, avg, max, stddev, err
return nil
}
func init() {
@@ -335,6 +343,7 @@ func init() {
Count: 1,
Timeout: 1.0,
Deadline: 10,
Method: "exec",
Binary: "ping",
Arguments: []string{},
}