From 1f7a5d7eed120c2aeaa808dd9e96355fbc236de7 Mon Sep 17 00:00:00 2001 From: Thibault Cohen Date: Tue, 26 Jan 2016 19:12:54 -0500 Subject: [PATCH] Add tcp/udp check connection plugin --- plugins/inputs/all/all.go | 1 + plugins/inputs/connection/connection.go | 285 +++++++++++++++++++ plugins/inputs/connection/connection_test.go | 186 ++++++++++++ 3 files changed, 472 insertions(+) create mode 100644 plugins/inputs/connection/connection.go create mode 100644 plugins/inputs/connection/connection_test.go diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index e9ad49f26..1688f9713 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -4,6 +4,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/aerospike" _ "github.com/influxdata/telegraf/plugins/inputs/apache" _ "github.com/influxdata/telegraf/plugins/inputs/bcache" + _ "github.com/influxdata/telegraf/plugins/inputs/connection" _ "github.com/influxdata/telegraf/plugins/inputs/disque" _ "github.com/influxdata/telegraf/plugins/inputs/docker" _ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch" diff --git a/plugins/inputs/connection/connection.go b/plugins/inputs/connection/connection.go new file mode 100644 index 000000000..92f1a5734 --- /dev/null +++ b/plugins/inputs/connection/connection.go @@ -0,0 +1,285 @@ +package connection + +import ( + "bufio" + "errors" + "net" + "net/textproto" + "regexp" + "strings" + "sync" + "time" + + "github.com/influxdata/telegraf/plugins/inputs" +) + +// Connections struct +type Connection struct { + Tcps []Tcp `toml:"tcp"` + Udps []Udp `toml:"udp"` +} + +// Tcp connection struct +type Tcp struct { + Address string + Timeout float64 + ReadTimeout float64 + Send string + Expect string +} + +// Udp connection struct +type Udp struct { + Address string + Timeout float64 + ReadTimeout float64 + Send string + Expect string +} + +func (_ *Connection) Description() string { + return "Ping given url(s) and return statistics" +} + +var sampleConfig = ` + [[inputs.connection.tcp]] + // Server address (default IP localhost) + address = "github.com:80" + // Set timeout (default 1.0) + timeout = 1.0 + // Set read timeout (default 1.0) + read_timeout = 1.0 + // String sent to the server + send = "ssh" + // Expected string in answer + expect = "ssh" + + [[inputs.connection.tcp]] + address = ":80" + + [[inputs.connection.udp]] + // Server address (default IP localhost) + address = "github.com:80" + // Set timeout (default 1.0) + timeout = 1.0 + // Set read timeout (default 1.0) + read_timeout = 1.0 + // String sent to the server + send = "ssh" + // Expected string in answer + expect = "ssh" + + [[inputs.connection.udp]] + address = "localhost:161" + timeout = 2.0 +` + +func (_ *Connection) SampleConfig() string { + return sampleConfig +} + +func (t *Tcp) Gather() (map[string]interface{}, error) { + // Prepare fields + fields := make(map[string]interface{}) + // Start Timer + start := time.Now() + // Resolving + tcpAddr, err := net.ResolveTCPAddr("tcp", t.Address) + // Connecting + conn, err := net.DialTCP("tcp", nil, tcpAddr) + // Stop timer + responseTime := time.Since(start).Seconds() + // Handle error + if err != nil { + return nil, err + } + defer conn.Close() + // Send string if needed + if t.Send != "" { + msg := []byte(t.Send) + conn.Write(msg) + conn.CloseWrite() + // Stop timer + responseTime = time.Since(start).Seconds() + } + // Read string if needed + if t.Expect != "" { + // Set read timeout + conn.SetReadDeadline(time.Now().Add(time.Duration(t.ReadTimeout) * time.Second)) + // Prepare reader + reader := bufio.NewReader(conn) + tp := textproto.NewReader(reader) + // Read + data, err := tp.ReadLine() + // Stop timer + responseTime = time.Since(start).Seconds() + // Handle error + if err != nil { + fields["string_found"] = false + } else { + // Looking for string in answer + RegEx := regexp.MustCompile(`.*` + t.Expect + `.*`) + find := RegEx.FindString(string(data)) + if find != "" { + fields["string_found"] = true + } else { + fields["string_found"] = false + } + } + + } + fields["response_time"] = responseTime + return fields, nil +} + +func (u *Udp) Gather() (map[string]interface{}, error) { + // Prepare fields + fields := make(map[string]interface{}) + // Start Timer + start := time.Now() + // Resolving + udpAddr, err := net.ResolveUDPAddr("udp", u.Address) + LocalAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") + // Connecting + conn, err := net.DialUDP("udp", LocalAddr, udpAddr) + defer conn.Close() + // Handle error + if err != nil { + return nil, err + } + // Send string + msg := []byte(u.Send) + conn.Write(msg) + // Read string + // Set read timeout + conn.SetReadDeadline(time.Now().Add(time.Duration(u.ReadTimeout) * time.Second)) + // Read + buf := make([]byte, 1024) + _, _, err = conn.ReadFromUDP(buf) + // Stop timer + responseTime := time.Since(start).Seconds() + // Handle error + if err != nil { + return nil, err + } else { + // Looking for string in answer + RegEx := regexp.MustCompile(`.*` + u.Expect + `.*`) + find := RegEx.FindString(string(buf)) + if find != "" { + fields["string_found"] = true + } else { + fields["string_found"] = false + } + } + fields["response_time"] = responseTime + return fields, nil +} + +func (c *Connection) Gather(acc inputs.Accumulator) error { + + var wg sync.WaitGroup + errorChannel := make(chan error, (len(c.Tcps)+len(c.Udps))*2) + + // Spin off a go routine for each TCP + for _, tcp := range c.Tcps { + wg.Add(1) + go func(tcp Tcp, acc inputs.Accumulator) { + defer wg.Done() + // Set default Tcp values + if tcp.Timeout == 0 { + tcp.Timeout = 1.0 + } + if tcp.ReadTimeout == 0 { + tcp.ReadTimeout = 1.0 + } + // Prepare host and port + host, port, err := net.SplitHostPort(tcp.Address) + if err != nil { + errorChannel <- err + return + } + if host == "" { + tcp.Address = "localhost:" + port + } + if port == "" { + errorChannel <- errors.New("Bad port") + return + } + // Gather data + fields, err := tcp.Gather() + if err != nil { + errorChannel <- err + return + } + tags := map[string]string{"server": tcp.Address} + // Add metrics + acc.AddFields("tcp_connection", fields, tags) + }(tcp, acc) + } + + // Spin off a go routine for each UDP + for _, udp := range c.Udps { + wg.Add(1) + go func(udp Udp, acc inputs.Accumulator) { + defer wg.Done() + // Check send and expected string + if udp.Send == "" { + errorChannel <- errors.New("Send string cannot be empty") + return + } + if udp.Expect == "" { + errorChannel <- errors.New("Expected string cannot be empty") + return + } + // Set default Tcp values + if udp.Timeout == 0 { + udp.Timeout = 1.0 + } + if udp.ReadTimeout == 0 { + udp.ReadTimeout = 1.0 + } + // Prepare host and port + host, port, err := net.SplitHostPort(udp.Address) + if err != nil { + errorChannel <- err + return + } + if host == "" { + udp.Address = "localhost:" + port + } + if port == "" { + errorChannel <- errors.New("Bad port") + return + } + // Gather data + fields, err := udp.Gather() + if err != nil { + errorChannel <- err + return + } + tags := map[string]string{"server": udp.Address} + // Add metrics + acc.AddFields("udp_connection", fields, tags) + }(udp, acc) + } + + wg.Wait() + close(errorChannel) + + // Get all errors and return them as one giant error + errorStrings := []string{} + for err := range errorChannel { + errorStrings = append(errorStrings, err.Error()) + } + + if len(errorStrings) == 0 { + return nil + } + return errors.New(strings.Join(errorStrings, "\n")) +} + +func init() { + inputs.Add("connection", func() inputs.Input { + return &Connection{} + }) +} diff --git a/plugins/inputs/connection/connection_test.go b/plugins/inputs/connection/connection_test.go new file mode 100644 index 000000000..2cb5da8cf --- /dev/null +++ b/plugins/inputs/connection/connection_test.go @@ -0,0 +1,186 @@ +package connection + +import ( + "net" + "regexp" + "sync" + "testing" + + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTCPError(t *testing.T) { + var acc testutil.Accumulator + // Init plugin + tcp1 := Tcp{ + Address: ":9999", + } + c := Connection{ + Tcps: []Tcp{tcp1}, + } + // Error + err1 := c.Gather(&acc) + require.Error(t, err1) + assert.Equal(t, "dial tcp 127.0.0.1:9999: getsockopt: connection refused", err1.Error()) +} + +func TestTCPOK1(t *testing.T) { + var wg sync.WaitGroup + var acc testutil.Accumulator + // Init plugin + tcp1 := Tcp{ + Address: "127.0.0.1:2004", + Send: "test", + Expect: "test", + ReadTimeout: 3.0, + Timeout: 1.0, + } + c := Connection{ + Tcps: []Tcp{tcp1}, + } + // Start TCP server + wg.Add(1) + go TCPServer(t, &wg) + wg.Wait() + // Connect + wg.Add(1) + err1 := c.Gather(&acc) + wg.Wait() + // Override response time + for _, p := range acc.Points { + p.Fields["response_time"] = 1.0 + } + require.NoError(t, err1) + acc.AssertContainsTaggedFields(t, + "tcp_connection", + map[string]interface{}{ + "string_found": true, + "response_time": 1.0, + }, + map[string]string{"server": tcp1.Address}, + ) + // Waiting TCPserver + wg.Wait() +} + +func TestTCPOK2(t *testing.T) { + var wg sync.WaitGroup + var acc testutil.Accumulator + // Init plugin + tcp1 := Tcp{ + Address: "127.0.0.1:2004", + Send: "test", + Expect: "test2", + ReadTimeout: 3.0, + Timeout: 1.0, + } + c := Connection{ + Tcps: []Tcp{tcp1}, + } + // Start TCP server + wg.Add(1) + go TCPServer(t, &wg) + wg.Wait() + // Connect + wg.Add(1) + err1 := c.Gather(&acc) + wg.Wait() + // Override response time + for _, p := range acc.Points { + p.Fields["response_time"] = 1.0 + } + require.NoError(t, err1) + acc.AssertContainsTaggedFields(t, + "tcp_connection", + map[string]interface{}{ + "string_found": false, + "response_time": 1.0, + }, + map[string]string{"server": tcp1.Address}, + ) + // Waiting TCPserver + wg.Wait() +} + +func TestUDPrror(t *testing.T) { + var acc testutil.Accumulator + // Init plugin + udp1 := Udp{ + Address: ":9999", + Send: "test", + Expect: "test", + } + c := Connection{ + Udps: []Udp{udp1}, + } + // Error + err1 := c.Gather(&acc) + require.Error(t, err1) + assert.Regexp(t, regexp.MustCompile(`read udp 127.0.0.1:[0-9]*->127.0.0.1:9999: recvfrom: connection refused`), err1.Error()) +} + +func TestUDPOK1(t *testing.T) { + var wg sync.WaitGroup + var acc testutil.Accumulator + // Init plugin + udp1 := Udp{ + Address: "127.0.0.1:2004", + Send: "test", + Expect: "test", + ReadTimeout: 3.0, + Timeout: 1.0, + } + c := Connection{ + Udps: []Udp{udp1}, + } + // Start UDP server + wg.Add(1) + go UDPServer(t, &wg) + wg.Wait() + // Connect + wg.Add(1) + err1 := c.Gather(&acc) + wg.Wait() + // Override response time + for _, p := range acc.Points { + p.Fields["response_time"] = 1.0 + } + require.NoError(t, err1) + acc.AssertContainsTaggedFields(t, + "udp_connection", + map[string]interface{}{ + "string_found": true, + "response_time": 1.0, + }, + map[string]string{"server": udp1.Address}, + ) + // Waiting TCPserver + wg.Wait() +} + +func UDPServer(t *testing.T, wg *sync.WaitGroup) { + udpAddr, _ := net.ResolveUDPAddr("udp", "127.0.0.1:2004") + conn, _ := net.ListenUDP("udp", udpAddr) + wg.Done() + buf := make([]byte, 1024) + _, remoteaddr, _ := conn.ReadFromUDP(buf) + conn.WriteToUDP(buf, remoteaddr) + conn.Close() + wg.Done() +} + +func TCPServer(t *testing.T, wg *sync.WaitGroup) { + tcpAddr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:2004") + tcpServer, _ := net.ListenTCP("tcp", tcpAddr) + wg.Done() + conn, _ := tcpServer.AcceptTCP() + buf := make([]byte, 1024) + conn.Read(buf) + conn.Write(buf) + conn.CloseWrite() + tcpServer.Close() + wg.Done() +}