Add tcp/udp check connection input plugin
This commit is contained in:
parent
85594cc92e
commit
c7c689d261
|
@ -24,6 +24,7 @@ import (
|
|||
_ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/mysql"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/net_response"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/nginx"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/nsq"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/passenger"
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
# Example Input Plugin
|
||||
|
||||
The input plugin test UDP/TCP connections response time.
|
||||
It can also check response text.
|
||||
|
||||
### Configuration:
|
||||
|
||||
```
|
||||
# List of UDP/TCP connections you want to check
|
||||
[[inputs.net_response]]
|
||||
protocol = "tcp"
|
||||
# Server address (default IP localhost)
|
||||
address = "github.com:80"
|
||||
# Set timeout (default 1.0)
|
||||
timeout = 1.0
|
||||
# Set read timeout (default 1.0)
|
||||
read_timeout = 1.0
|
||||
# String sent to the server
|
||||
send = "ssh"
|
||||
# Expected string in answer
|
||||
expect = "ssh"
|
||||
|
||||
[[inputs.net_response]]
|
||||
protocol = "tcp"
|
||||
address = ":80"
|
||||
|
||||
[[inputs.net_response]]
|
||||
protocol = "udp"
|
||||
# Server address (default IP localhost)
|
||||
address = "github.com:80"
|
||||
# Set timeout (default 1.0)
|
||||
timeout = 1.0
|
||||
# Set read timeout (default 1.0)
|
||||
read_timeout = 1.0
|
||||
# String sent to the server
|
||||
send = "ssh"
|
||||
# Expected string in answer
|
||||
expect = "ssh"
|
||||
|
||||
[[inputs.net_response]]
|
||||
protocol = "udp"
|
||||
address = "localhost:161"
|
||||
timeout = 2.0
|
||||
```
|
||||
|
||||
### Measurements & Fields:
|
||||
|
||||
- net_response
|
||||
- response_time (float, seconds)
|
||||
- string_found (bool) # Only if "expected: option is set
|
||||
|
||||
### Tags:
|
||||
|
||||
- All measurements have the following tags:
|
||||
- host
|
||||
- port
|
||||
- protocol
|
||||
|
||||
### Example Output:
|
||||
|
||||
```
|
||||
$ ./telegraf -config telegraf.conf -input-filter net_response -test
|
||||
net_response,host=127.0.0.1,port=22,protocol=tcp response_time=0.18070360500000002,string_found=true 1454785464182527094
|
||||
net_response,host=127.0.0.1,port=2222,protocol=tcp response_time=1.090124776,string_found=false 1454784433658942325
|
||||
|
||||
```
|
|
@ -0,0 +1,198 @@
|
|||
package net_response
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"errors"
|
||||
"net"
|
||||
"net/textproto"
|
||||
"regexp"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
// NetResponses struct
|
||||
type NetResponse struct {
|
||||
Address string
|
||||
Timeout float64
|
||||
ReadTimeout float64
|
||||
Send string
|
||||
Expect string
|
||||
Protocol string
|
||||
}
|
||||
|
||||
func (_ *NetResponse) Description() string {
|
||||
return "Ping given url(s) and return statistics"
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
# Protocol
|
||||
protocol = "tcp"
|
||||
#protocol = "udp"
|
||||
# Server address (default IP localhost)
|
||||
address = "github.com:80"
|
||||
#address = ":80"
|
||||
# Set timeout (default 1.0)
|
||||
timeout = 1.0
|
||||
# Set read timeout (default 1.0)
|
||||
read_timeout = 1.0
|
||||
# String sent to the server
|
||||
send = "ssh"
|
||||
# Expected string in answer
|
||||
expect = "ssh"
|
||||
`
|
||||
|
||||
func (_ *NetResponse) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (t *NetResponse) TcpGather() (map[string]interface{}, error) {
|
||||
// Prepare fields
|
||||
fields := make(map[string]interface{})
|
||||
// Start Timer
|
||||
start := time.Now()
|
||||
// Resolving
|
||||
tcpAddr, err := net.ResolveTCPAddr("tcp", t.Address)
|
||||
// Connecting
|
||||
conn, err := net.DialTCP("tcp", nil, tcpAddr)
|
||||
// Stop timer
|
||||
responseTime := time.Since(start).Seconds()
|
||||
// Handle error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer conn.Close()
|
||||
// Send string if needed
|
||||
if t.Send != "" {
|
||||
msg := []byte(t.Send)
|
||||
conn.Write(msg)
|
||||
conn.CloseWrite()
|
||||
// Stop timer
|
||||
responseTime = time.Since(start).Seconds()
|
||||
}
|
||||
// Read string if needed
|
||||
if t.Expect != "" {
|
||||
// Set read timeout
|
||||
conn.SetReadDeadline(time.Now().Add(time.Duration(t.ReadTimeout) * time.Second))
|
||||
// Prepare reader
|
||||
reader := bufio.NewReader(conn)
|
||||
tp := textproto.NewReader(reader)
|
||||
// Read
|
||||
data, err := tp.ReadLine()
|
||||
// Stop timer
|
||||
responseTime = time.Since(start).Seconds()
|
||||
// Handle error
|
||||
if err != nil {
|
||||
fields["string_found"] = false
|
||||
} else {
|
||||
// Looking for string in answer
|
||||
RegEx := regexp.MustCompile(`.*` + t.Expect + `.*`)
|
||||
find := RegEx.FindString(string(data))
|
||||
if find != "" {
|
||||
fields["string_found"] = true
|
||||
} else {
|
||||
fields["string_found"] = false
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
fields["response_time"] = responseTime
|
||||
return fields, nil
|
||||
}
|
||||
|
||||
func (u *NetResponse) UdpGather() (map[string]interface{}, error) {
|
||||
// Prepare fields
|
||||
fields := make(map[string]interface{})
|
||||
// Start Timer
|
||||
start := time.Now()
|
||||
// Resolving
|
||||
udpAddr, err := net.ResolveUDPAddr("udp", u.Address)
|
||||
LocalAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
|
||||
// Connecting
|
||||
conn, err := net.DialUDP("udp", LocalAddr, udpAddr)
|
||||
defer conn.Close()
|
||||
// Handle error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Send string
|
||||
msg := []byte(u.Send)
|
||||
conn.Write(msg)
|
||||
// Read string
|
||||
// Set read timeout
|
||||
conn.SetReadDeadline(time.Now().Add(time.Duration(u.ReadTimeout) * time.Second))
|
||||
// Read
|
||||
buf := make([]byte, 1024)
|
||||
_, _, err = conn.ReadFromUDP(buf)
|
||||
// Stop timer
|
||||
responseTime := time.Since(start).Seconds()
|
||||
// Handle error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
// Looking for string in answer
|
||||
RegEx := regexp.MustCompile(`.*` + u.Expect + `.*`)
|
||||
find := RegEx.FindString(string(buf))
|
||||
if find != "" {
|
||||
fields["string_found"] = true
|
||||
} else {
|
||||
fields["string_found"] = false
|
||||
}
|
||||
}
|
||||
fields["response_time"] = responseTime
|
||||
return fields, nil
|
||||
}
|
||||
|
||||
func (c *NetResponse) Gather(acc telegraf.Accumulator) error {
|
||||
// Set default values
|
||||
if c.Timeout == 0 {
|
||||
c.Timeout = 1.0
|
||||
}
|
||||
if c.ReadTimeout == 0 {
|
||||
c.ReadTimeout = 1.0
|
||||
}
|
||||
// Check send and expected string
|
||||
if c.Protocol == "udp" && c.Send == "" {
|
||||
return errors.New("Send string cannot be empty")
|
||||
}
|
||||
if c.Protocol == "udp" && c.Expect == "" {
|
||||
return errors.New("Expected string cannot be empty")
|
||||
}
|
||||
// Prepare host and port
|
||||
host, port, err := net.SplitHostPort(c.Address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if host == "" {
|
||||
c.Address = "localhost:" + port
|
||||
}
|
||||
if port == "" {
|
||||
return errors.New("Bad port")
|
||||
}
|
||||
// Prepare data
|
||||
tags := map[string]string{"host": host, "port": port}
|
||||
var fields map[string]interface{}
|
||||
// Gather data
|
||||
if c.Protocol == "tcp" {
|
||||
fields, err = c.TcpGather()
|
||||
tags["protocol"] = "tcp"
|
||||
} else if c.Protocol == "udp" {
|
||||
fields, err = c.UdpGather()
|
||||
tags["protocol"] = "udp"
|
||||
} else {
|
||||
return errors.New("Bad protocol")
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Add metrics
|
||||
acc.AddFields("net_response", fields, tags)
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("net_response", func() telegraf.Input {
|
||||
return &NetResponse{}
|
||||
})
|
||||
}
|
|
@ -0,0 +1,198 @@
|
|||
package net_response
|
||||
|
||||
import (
|
||||
"net"
|
||||
"regexp"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBadProtocol(t *testing.T) {
|
||||
var acc testutil.Accumulator
|
||||
// Init plugin
|
||||
c := NetResponse{
|
||||
Protocol: "unknownprotocol",
|
||||
Address: ":9999",
|
||||
}
|
||||
// Error
|
||||
err1 := c.Gather(&acc)
|
||||
require.Error(t, err1)
|
||||
assert.Equal(t, "Bad protocol", err1.Error())
|
||||
}
|
||||
|
||||
func TestTCPError(t *testing.T) {
|
||||
var acc testutil.Accumulator
|
||||
// Init plugin
|
||||
c := NetResponse{
|
||||
Protocol: "tcp",
|
||||
Address: ":9999",
|
||||
}
|
||||
// Error
|
||||
err1 := c.Gather(&acc)
|
||||
require.Error(t, err1)
|
||||
assert.Equal(t, "dial tcp 127.0.0.1:9999: getsockopt: connection refused", err1.Error())
|
||||
}
|
||||
|
||||
func TestTCPOK1(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
var acc testutil.Accumulator
|
||||
// Init plugin
|
||||
c := NetResponse{
|
||||
Address: "127.0.0.1:2004",
|
||||
Send: "test",
|
||||
Expect: "test",
|
||||
ReadTimeout: 3.0,
|
||||
Timeout: 1.0,
|
||||
Protocol: "tcp",
|
||||
}
|
||||
// Start TCP server
|
||||
wg.Add(1)
|
||||
go TCPServer(t, &wg)
|
||||
wg.Wait()
|
||||
// Connect
|
||||
wg.Add(1)
|
||||
err1 := c.Gather(&acc)
|
||||
wg.Wait()
|
||||
// Override response time
|
||||
for _, p := range acc.Metrics {
|
||||
p.Fields["response_time"] = 1.0
|
||||
}
|
||||
require.NoError(t, err1)
|
||||
acc.AssertContainsTaggedFields(t,
|
||||
"net_response",
|
||||
map[string]interface{}{
|
||||
"string_found": true,
|
||||
"response_time": 1.0,
|
||||
},
|
||||
map[string]string{"host": "127.0.0.1",
|
||||
"port": "2004",
|
||||
"protocol": "tcp",
|
||||
},
|
||||
)
|
||||
// Waiting TCPserver
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestTCPOK2(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
var acc testutil.Accumulator
|
||||
// Init plugin
|
||||
c := NetResponse{
|
||||
Address: "127.0.0.1:2004",
|
||||
Send: "test",
|
||||
Expect: "test2",
|
||||
ReadTimeout: 3.0,
|
||||
Timeout: 1.0,
|
||||
Protocol: "tcp",
|
||||
}
|
||||
// Start TCP server
|
||||
wg.Add(1)
|
||||
go TCPServer(t, &wg)
|
||||
wg.Wait()
|
||||
// Connect
|
||||
wg.Add(1)
|
||||
err1 := c.Gather(&acc)
|
||||
wg.Wait()
|
||||
// Override response time
|
||||
for _, p := range acc.Metrics {
|
||||
p.Fields["response_time"] = 1.0
|
||||
}
|
||||
require.NoError(t, err1)
|
||||
acc.AssertContainsTaggedFields(t,
|
||||
"net_response",
|
||||
map[string]interface{}{
|
||||
"string_found": false,
|
||||
"response_time": 1.0,
|
||||
},
|
||||
map[string]string{"host": "127.0.0.1",
|
||||
"port": "2004",
|
||||
"protocol": "tcp",
|
||||
},
|
||||
)
|
||||
// Waiting TCPserver
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestUDPrror(t *testing.T) {
|
||||
var acc testutil.Accumulator
|
||||
// Init plugin
|
||||
c := NetResponse{
|
||||
Address: ":9999",
|
||||
Send: "test",
|
||||
Expect: "test",
|
||||
Protocol: "udp",
|
||||
}
|
||||
// Error
|
||||
err1 := c.Gather(&acc)
|
||||
require.Error(t, err1)
|
||||
assert.Regexp(t, regexp.MustCompile(`read udp 127.0.0.1:[0-9]*->127.0.0.1:9999: recvfrom: connection refused`), err1.Error())
|
||||
}
|
||||
|
||||
func TestUDPOK1(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
var acc testutil.Accumulator
|
||||
// Init plugin
|
||||
c := NetResponse{
|
||||
Address: "127.0.0.1:2004",
|
||||
Send: "test",
|
||||
Expect: "test",
|
||||
ReadTimeout: 3.0,
|
||||
Timeout: 1.0,
|
||||
Protocol: "udp",
|
||||
}
|
||||
// Start UDP server
|
||||
wg.Add(1)
|
||||
go UDPServer(t, &wg)
|
||||
wg.Wait()
|
||||
// Connect
|
||||
wg.Add(1)
|
||||
err1 := c.Gather(&acc)
|
||||
wg.Wait()
|
||||
// Override response time
|
||||
for _, p := range acc.Metrics {
|
||||
p.Fields["response_time"] = 1.0
|
||||
}
|
||||
require.NoError(t, err1)
|
||||
acc.AssertContainsTaggedFields(t,
|
||||
"net_response",
|
||||
map[string]interface{}{
|
||||
"string_found": true,
|
||||
"response_time": 1.0,
|
||||
},
|
||||
map[string]string{"host": "127.0.0.1",
|
||||
"port": "2004",
|
||||
"protocol": "udp",
|
||||
},
|
||||
)
|
||||
// Waiting TCPserver
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func UDPServer(t *testing.T, wg *sync.WaitGroup) {
|
||||
udpAddr, _ := net.ResolveUDPAddr("udp", "127.0.0.1:2004")
|
||||
conn, _ := net.ListenUDP("udp", udpAddr)
|
||||
wg.Done()
|
||||
buf := make([]byte, 1024)
|
||||
_, remoteaddr, _ := conn.ReadFromUDP(buf)
|
||||
conn.WriteToUDP(buf, remoteaddr)
|
||||
conn.Close()
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
func TCPServer(t *testing.T, wg *sync.WaitGroup) {
|
||||
tcpAddr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:2004")
|
||||
tcpServer, _ := net.ListenTCP("tcp", tcpAddr)
|
||||
wg.Done()
|
||||
conn, _ := tcpServer.AcceptTCP()
|
||||
buf := make([]byte, 1024)
|
||||
conn.Read(buf)
|
||||
conn.Write(buf)
|
||||
conn.CloseWrite()
|
||||
tcpServer.Close()
|
||||
wg.Done()
|
||||
}
|
Loading…
Reference in New Issue