telegraf/plugins/inputs/beanstalkd/beanstalkd_test.go

333 lines
7.5 KiB
Go

package beanstalkd_test
import (
"io"
"net"
"net/textproto"
"testing"
"github.com/influxdata/telegraf/plugins/inputs/beanstalkd"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestBeanstalkd(t *testing.T) {
type tubeStats struct {
name string
fields map[string]interface{}
}
tests := []struct {
name string
tubesConfig []string
expectedTubes []tubeStats
notExpectedTubes []tubeStats
}{
{
name: "All tubes stats",
tubesConfig: []string{},
expectedTubes: []tubeStats{
{name: "default", fields: defaultTubeFields},
{name: "test", fields: testTubeFields},
},
notExpectedTubes: []tubeStats{},
},
{
name: "Specified tubes stats",
tubesConfig: []string{"test"},
expectedTubes: []tubeStats{
{name: "test", fields: testTubeFields},
},
notExpectedTubes: []tubeStats{
{name: "default", fields: defaultTubeFields},
},
},
{
name: "Unknown tube stats",
tubesConfig: []string{"unknown"},
expectedTubes: []tubeStats{},
notExpectedTubes: []tubeStats{
{name: "default", fields: defaultTubeFields},
{name: "test", fields: testTubeFields},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
server, err := startTestServer(t)
if err != nil {
t.Fatalf("Unable to create test server")
}
defer server.Close()
serverAddress := server.Addr().String()
plugin := beanstalkd.Beanstalkd{
Server: serverAddress,
Tubes: test.tubesConfig,
}
var acc testutil.Accumulator
require.NoError(t, acc.GatherError(plugin.Gather))
acc.AssertContainsTaggedFields(t, "beanstalkd_overview",
overviewFields,
getOverviewTags(serverAddress),
)
for _, expectedTube := range test.expectedTubes {
acc.AssertContainsTaggedFields(t, "beanstalkd_tube",
expectedTube.fields,
getTubeTags(serverAddress, expectedTube.name),
)
}
for _, notExpectedTube := range test.notExpectedTubes {
acc.AssertDoesNotContainsTaggedFields(t, "beanstalkd_tube",
notExpectedTube.fields,
getTubeTags(serverAddress, notExpectedTube.name),
)
}
})
}
}
func startTestServer(t *testing.T) (net.Listener, error) {
server, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, err
}
go func() {
defer server.Close()
connection, err := server.Accept()
if err != nil {
t.Log("Test server: failed to accept connection. Error: ", err)
return
}
tp := textproto.NewConn(connection)
defer tp.Close()
sendSuccessResponse := func(body string) {
tp.PrintfLine("OK %d\r\n%s", len(body), body)
}
for {
cmd, err := tp.ReadLine()
if err == io.EOF {
return
} else if err != nil {
t.Log("Test server: failed read command. Error: ", err)
return
}
switch cmd {
case "list-tubes":
sendSuccessResponse(listTubesResponse)
case "stats":
sendSuccessResponse(statsResponse)
case "stats-tube default":
sendSuccessResponse(statsTubeDefaultResponse)
case "stats-tube test":
sendSuccessResponse(statsTubeTestResponse)
case "stats-tube unknown":
tp.PrintfLine("NOT_FOUND")
default:
t.Log("Test server: unknown command")
}
}
}()
return server, nil
}
const (
listTubesResponse = `---
- default
- test
`
statsResponse = `---
current-jobs-urgent: 5
current-jobs-ready: 5
current-jobs-reserved: 0
current-jobs-delayed: 1
current-jobs-buried: 0
cmd-put: 6
cmd-peek: 0
cmd-peek-ready: 1
cmd-peek-delayed: 0
cmd-peek-buried: 0
cmd-reserve: 0
cmd-reserve-with-timeout: 1
cmd-delete: 1
cmd-release: 0
cmd-use: 2
cmd-watch: 0
cmd-ignore: 0
cmd-bury: 1
cmd-kick: 1
cmd-touch: 0
cmd-stats: 1
cmd-stats-job: 0
cmd-stats-tube: 2
cmd-list-tubes: 1
cmd-list-tube-used: 0
cmd-list-tubes-watched: 0
cmd-pause-tube: 0
job-timeouts: 0
total-jobs: 6
max-job-size: 65535
current-tubes: 2
current-connections: 2
current-producers: 1
current-workers: 1
current-waiting: 0
total-connections: 2
pid: 6
version: 1.10
rusage-utime: 0.000000
rusage-stime: 0.000000
uptime: 20
binlog-oldest-index: 0
binlog-current-index: 0
binlog-records-migrated: 0
binlog-records-written: 0
binlog-max-size: 10485760
id: bba7546657efdd4c
hostname: 2873efd3e88c
`
statsTubeDefaultResponse = `---
name: default
current-jobs-urgent: 0
current-jobs-ready: 0
current-jobs-reserved: 0
current-jobs-delayed: 0
current-jobs-buried: 0
total-jobs: 0
current-using: 2
current-watching: 2
current-waiting: 0
cmd-delete: 0
cmd-pause-tube: 0
pause: 0
pause-time-left: 0
`
statsTubeTestResponse = `---
name: test
current-jobs-urgent: 5
current-jobs-ready: 5
current-jobs-reserved: 0
current-jobs-delayed: 1
current-jobs-buried: 0
total-jobs: 6
current-using: 0
current-watching: 0
current-waiting: 0
cmd-delete: 0
cmd-pause-tube: 0
pause: 0
pause-time-left: 0
`
)
var (
// Default tube without stats
defaultTubeFields = map[string]interface{}{
"cmd_delete": 0,
"cmd_pause_tube": 0,
"current_jobs_buried": 0,
"current_jobs_delayed": 0,
"current_jobs_ready": 0,
"current_jobs_reserved": 0,
"current_jobs_urgent": 0,
"current_using": 2,
"current_waiting": 0,
"current_watching": 2,
"pause": 0,
"pause_time_left": 0,
"total_jobs": 0,
}
// Test tube with stats
testTubeFields = map[string]interface{}{
"cmd_delete": 0,
"cmd_pause_tube": 0,
"current_jobs_buried": 0,
"current_jobs_delayed": 1,
"current_jobs_ready": 5,
"current_jobs_reserved": 0,
"current_jobs_urgent": 5,
"current_using": 0,
"current_waiting": 0,
"current_watching": 0,
"pause": 0,
"pause_time_left": 0,
"total_jobs": 6,
}
// Server stats
overviewFields = map[string]interface{}{
"binlog_current_index": 0,
"binlog_max_size": 10485760,
"binlog_oldest_index": 0,
"binlog_records_migrated": 0,
"binlog_records_written": 0,
"cmd_bury": 1,
"cmd_delete": 1,
"cmd_ignore": 0,
"cmd_kick": 1,
"cmd_list_tube_used": 0,
"cmd_list_tubes": 1,
"cmd_list_tubes_watched": 0,
"cmd_pause_tube": 0,
"cmd_peek": 0,
"cmd_peek_buried": 0,
"cmd_peek_delayed": 0,
"cmd_peek_ready": 1,
"cmd_put": 6,
"cmd_release": 0,
"cmd_reserve": 0,
"cmd_reserve_with_timeout": 1,
"cmd_stats": 1,
"cmd_stats_job": 0,
"cmd_stats_tube": 2,
"cmd_touch": 0,
"cmd_use": 2,
"cmd_watch": 0,
"current_connections": 2,
"current_jobs_buried": 0,
"current_jobs_delayed": 1,
"current_jobs_ready": 5,
"current_jobs_reserved": 0,
"current_jobs_urgent": 5,
"current_producers": 1,
"current_tubes": 2,
"current_waiting": 0,
"current_workers": 1,
"job_timeouts": 0,
"max_job_size": 65535,
"pid": 6,
"rusage_stime": 0.0,
"rusage_utime": 0.0,
"total_connections": 2,
"total_jobs": 6,
"uptime": 20,
}
)
func getOverviewTags(server string) map[string]string {
return map[string]string{
"hostname": "2873efd3e88c",
"id": "bba7546657efdd4c",
"server": server,
"version": "1.10",
}
}
func getTubeTags(server string, tube string) map[string]string {
return map[string]string{
"name": tube,
"server": server,
}
}