Add Beanstalkd input plugin (#4272)
This commit is contained in:
parent
710c101fe0
commit
69100f60b8
|
@ -7,6 +7,7 @@ import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
|
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/aurora"
|
_ "github.com/influxdata/telegraf/plugins/inputs/aurora"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
|
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/inputs/beanstalkd"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/bond"
|
_ "github.com/influxdata/telegraf/plugins/inputs/bond"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/burrow"
|
_ "github.com/influxdata/telegraf/plugins/inputs/burrow"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
|
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
|
||||||
|
|
|
@ -0,0 +1,98 @@
|
||||||
|
# Beanstalkd Input Plugin
|
||||||
|
|
||||||
|
The `beanstalkd` plugin collects server stats as well as tube stats (reported by `stats` and `stats-tube` commands respectively).
|
||||||
|
|
||||||
|
### Configuration:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[inputs.beanstalkd]]
|
||||||
|
## Server to collect data from
|
||||||
|
server = "localhost:11300"
|
||||||
|
|
||||||
|
## List of tubes to gather stats about.
|
||||||
|
## If no tubes specified then data gathered for each tube on server reported by list-tubes command
|
||||||
|
tubes = ["notifications"]
|
||||||
|
```
|
||||||
|
|
||||||
|
### Metrics:
|
||||||
|
|
||||||
|
Please see the [Beanstalk Protocol doc](https://raw.githubusercontent.com/kr/beanstalkd/master/doc/protocol.txt) for detailed explanation of `stats` and `stats-tube` commands output.
|
||||||
|
|
||||||
|
`beanstalkd_overview` – statistical information about the system as a whole
|
||||||
|
- fields
|
||||||
|
- cmd_delete
|
||||||
|
- cmd_pause_tube
|
||||||
|
- current_jobs_buried
|
||||||
|
- current_jobs_delayed
|
||||||
|
- current_jobs_ready
|
||||||
|
- current_jobs_reserved
|
||||||
|
- current_jobs_urgent
|
||||||
|
- current_using
|
||||||
|
- current_waiting
|
||||||
|
- current_watching
|
||||||
|
- pause
|
||||||
|
- pause_time_left
|
||||||
|
- total_jobs
|
||||||
|
- tags
|
||||||
|
- name
|
||||||
|
- server (address taken from config)
|
||||||
|
|
||||||
|
`beanstalkd_tube` – statistical information about the specified tube
|
||||||
|
- fields
|
||||||
|
- binlog_current_index
|
||||||
|
- binlog_max_size
|
||||||
|
- binlog_oldest_index
|
||||||
|
- binlog_records_migrated
|
||||||
|
- binlog_records_written
|
||||||
|
- cmd_bury
|
||||||
|
- cmd_delete
|
||||||
|
- cmd_ignore
|
||||||
|
- cmd_kick
|
||||||
|
- cmd_list_tube_used
|
||||||
|
- cmd_list_tubes
|
||||||
|
- cmd_list_tubes_watched
|
||||||
|
- cmd_pause_tube
|
||||||
|
- cmd_peek
|
||||||
|
- cmd_peek_buried
|
||||||
|
- cmd_peek_delayed
|
||||||
|
- cmd_peek_ready
|
||||||
|
- cmd_put
|
||||||
|
- cmd_release
|
||||||
|
- cmd_reserve
|
||||||
|
- cmd_reserve_with_timeout
|
||||||
|
- cmd_stats
|
||||||
|
- cmd_stats_job
|
||||||
|
- cmd_stats_tube
|
||||||
|
- cmd_touch
|
||||||
|
- cmd_use
|
||||||
|
- cmd_watch
|
||||||
|
- current_connections
|
||||||
|
- current_jobs_buried
|
||||||
|
- current_jobs_delayed
|
||||||
|
- current_jobs_ready
|
||||||
|
- current_jobs_reserved
|
||||||
|
- current_jobs_urgent
|
||||||
|
- current_producers
|
||||||
|
- current_tubes
|
||||||
|
- current_waiting
|
||||||
|
- current_workers
|
||||||
|
- job_timeouts
|
||||||
|
- max_job_size
|
||||||
|
- pid
|
||||||
|
- rusage_stime
|
||||||
|
- rusage_utime
|
||||||
|
- total_connections
|
||||||
|
- total_jobs
|
||||||
|
- uptime
|
||||||
|
- tags
|
||||||
|
- hostname
|
||||||
|
- id
|
||||||
|
- server (address taken from config)
|
||||||
|
- version
|
||||||
|
|
||||||
|
### Example Output:
|
||||||
|
```
|
||||||
|
beanstalkd_overview,host=server.local,hostname=a2ab22ed12e0,id=232485800aa11b24,server=localhost:11300,version=1.10 cmd_stats_tube=29482i,current_jobs_delayed=0i,current_jobs_urgent=6i,cmd_kick=0i,cmd_stats=7378i,cmd_stats_job=0i,current_waiting=0i,max_job_size=65535i,pid=6i,cmd_bury=0i,cmd_reserve_with_timeout=0i,cmd_touch=0i,current_connections=1i,current_jobs_ready=6i,current_producers=0i,cmd_delete=0i,cmd_list_tubes=7369i,cmd_peek_ready=0i,cmd_put=6i,cmd_use=3i,cmd_watch=0i,current_jobs_reserved=0i,rusage_stime=6.07,cmd_list_tubes_watched=0i,cmd_pause_tube=0i,total_jobs=6i,binlog_records_migrated=0i,cmd_list_tube_used=0i,cmd_peek_delayed=0i,cmd_release=0i,current_jobs_buried=0i,job_timeouts=0i,binlog_current_index=0i,binlog_max_size=10485760i,total_connections=7378i,cmd_peek_buried=0i,cmd_reserve=0i,current_tubes=4i,binlog_records_written=0i,cmd_peek=0i,rusage_utime=1.13,uptime=7099i,binlog_oldest_index=0i,current_workers=0i,cmd_ignore=0i 1528801650000000000
|
||||||
|
|
||||||
|
beanstalkd_tube,host=server.local,name=notifications,server=localhost:11300 pause_time_left=0i,current_jobs_buried=0i,current_jobs_delayed=0i,current_jobs_reserved=0i,current_using=0i,current_waiting=0i,pause=0i,total_jobs=3i,cmd_delete=0i,cmd_pause_tube=0i,current_jobs_ready=3i,current_jobs_urgent=3i,current_watching=0i 1528801650000000000
|
||||||
|
```
|
|
@ -0,0 +1,270 @@
|
||||||
|
package beanstalkd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/textproto"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
"gopkg.in/yaml.v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
const sampleConfig = `
|
||||||
|
## Server to collect data from
|
||||||
|
server = "localhost:11300"
|
||||||
|
|
||||||
|
## List of tubes to gather stats about.
|
||||||
|
## If no tubes specified then data gathered for each tube on server reported by list-tubes command
|
||||||
|
tubes = ["notifications"]
|
||||||
|
`
|
||||||
|
|
||||||
|
type Beanstalkd struct {
|
||||||
|
Server string `toml:"server"`
|
||||||
|
Tubes []string `toml:"tubes"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Beanstalkd) Description() string {
|
||||||
|
return "Collects Beanstalkd server and tubes stats"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Beanstalkd) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Beanstalkd) Gather(acc telegraf.Accumulator) error {
|
||||||
|
connection, err := textproto.Dial("tcp", b.Server)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer connection.Close()
|
||||||
|
|
||||||
|
tubes := b.Tubes
|
||||||
|
if len(tubes) == 0 {
|
||||||
|
err = runQuery(connection, "list-tubes", &tubes)
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
err := b.gatherServerStats(connection, acc)
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(err)
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
for _, tube := range tubes {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(tube string) {
|
||||||
|
b.gatherTubeStats(connection, tube, acc)
|
||||||
|
wg.Done()
|
||||||
|
}(tube)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Beanstalkd) gatherServerStats(connection *textproto.Conn, acc telegraf.Accumulator) error {
|
||||||
|
stats := new(statsResponse)
|
||||||
|
if err := runQuery(connection, "stats", stats); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.AddFields("beanstalkd_overview",
|
||||||
|
map[string]interface{}{
|
||||||
|
"binlog_current_index": stats.BinlogCurrentIndex,
|
||||||
|
"binlog_max_size": stats.BinlogMaxSize,
|
||||||
|
"binlog_oldest_index": stats.BinlogOldestIndex,
|
||||||
|
"binlog_records_migrated": stats.BinlogRecordsMigrated,
|
||||||
|
"binlog_records_written": stats.BinlogRecordsWritten,
|
||||||
|
"cmd_bury": stats.CmdBury,
|
||||||
|
"cmd_delete": stats.CmdDelete,
|
||||||
|
"cmd_ignore": stats.CmdIgnore,
|
||||||
|
"cmd_kick": stats.CmdKick,
|
||||||
|
"cmd_list_tube_used": stats.CmdListTubeUsed,
|
||||||
|
"cmd_list_tubes": stats.CmdListTubes,
|
||||||
|
"cmd_list_tubes_watched": stats.CmdListTubesWatched,
|
||||||
|
"cmd_pause_tube": stats.CmdPauseTube,
|
||||||
|
"cmd_peek": stats.CmdPeek,
|
||||||
|
"cmd_peek_buried": stats.CmdPeekBuried,
|
||||||
|
"cmd_peek_delayed": stats.CmdPeekDelayed,
|
||||||
|
"cmd_peek_ready": stats.CmdPeekReady,
|
||||||
|
"cmd_put": stats.CmdPut,
|
||||||
|
"cmd_release": stats.CmdRelease,
|
||||||
|
"cmd_reserve": stats.CmdReserve,
|
||||||
|
"cmd_reserve_with_timeout": stats.CmdReserveWithTimeout,
|
||||||
|
"cmd_stats": stats.CmdStats,
|
||||||
|
"cmd_stats_job": stats.CmdStatsJob,
|
||||||
|
"cmd_stats_tube": stats.CmdStatsTube,
|
||||||
|
"cmd_touch": stats.CmdTouch,
|
||||||
|
"cmd_use": stats.CmdUse,
|
||||||
|
"cmd_watch": stats.CmdWatch,
|
||||||
|
"current_connections": stats.CurrentConnections,
|
||||||
|
"current_jobs_buried": stats.CurrentJobsBuried,
|
||||||
|
"current_jobs_delayed": stats.CurrentJobsDelayed,
|
||||||
|
"current_jobs_ready": stats.CurrentJobsReady,
|
||||||
|
"current_jobs_reserved": stats.CurrentJobsReserved,
|
||||||
|
"current_jobs_urgent": stats.CurrentJobsUrgent,
|
||||||
|
"current_producers": stats.CurrentProducers,
|
||||||
|
"current_tubes": stats.CurrentTubes,
|
||||||
|
"current_waiting": stats.CurrentWaiting,
|
||||||
|
"current_workers": stats.CurrentWorkers,
|
||||||
|
"job_timeouts": stats.JobTimeouts,
|
||||||
|
"max_job_size": stats.MaxJobSize,
|
||||||
|
"pid": stats.Pid,
|
||||||
|
"rusage_stime": stats.RusageStime,
|
||||||
|
"rusage_utime": stats.RusageUtime,
|
||||||
|
"total_connections": stats.TotalConnections,
|
||||||
|
"total_jobs": stats.TotalJobs,
|
||||||
|
"uptime": stats.Uptime,
|
||||||
|
},
|
||||||
|
map[string]string{
|
||||||
|
"hostname": stats.Hostname,
|
||||||
|
"id": stats.Id,
|
||||||
|
"server": b.Server,
|
||||||
|
"version": stats.Version,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Beanstalkd) gatherTubeStats(connection *textproto.Conn, tube string, acc telegraf.Accumulator) error {
|
||||||
|
stats := new(statsTubeResponse)
|
||||||
|
if err := runQuery(connection, "stats-tube "+tube, stats); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.AddFields("beanstalkd_tube",
|
||||||
|
map[string]interface{}{
|
||||||
|
"cmd_delete": stats.CmdDelete,
|
||||||
|
"cmd_pause_tube": stats.CmdPauseTube,
|
||||||
|
"current_jobs_buried": stats.CurrentJobsBuried,
|
||||||
|
"current_jobs_delayed": stats.CurrentJobsDelayed,
|
||||||
|
"current_jobs_ready": stats.CurrentJobsReady,
|
||||||
|
"current_jobs_reserved": stats.CurrentJobsReserved,
|
||||||
|
"current_jobs_urgent": stats.CurrentJobsUrgent,
|
||||||
|
"current_using": stats.CurrentUsing,
|
||||||
|
"current_waiting": stats.CurrentWaiting,
|
||||||
|
"current_watching": stats.CurrentWatching,
|
||||||
|
"pause": stats.Pause,
|
||||||
|
"pause_time_left": stats.PauseTimeLeft,
|
||||||
|
"total_jobs": stats.TotalJobs,
|
||||||
|
},
|
||||||
|
map[string]string{
|
||||||
|
"name": stats.Name,
|
||||||
|
"server": b.Server,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func runQuery(connection *textproto.Conn, cmd string, result interface{}) error {
|
||||||
|
requestId, err := connection.Cmd(cmd)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
connection.StartResponse(requestId)
|
||||||
|
defer connection.EndResponse(requestId)
|
||||||
|
|
||||||
|
status, err := connection.ReadLine()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
size := 0
|
||||||
|
if _, err = fmt.Sscanf(status, "OK %d", &size); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
body := make([]byte, size+2)
|
||||||
|
if _, err = io.ReadFull(connection.R, body); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return yaml.Unmarshal(body, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
inputs.Add("beanstalkd", func() telegraf.Input {
|
||||||
|
return &Beanstalkd{}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
type statsResponse struct {
|
||||||
|
BinlogCurrentIndex int `yaml:"binlog-current-index"`
|
||||||
|
BinlogMaxSize int `yaml:"binlog-max-size"`
|
||||||
|
BinlogOldestIndex int `yaml:"binlog-oldest-index"`
|
||||||
|
BinlogRecordsMigrated int `yaml:"binlog-records-migrated"`
|
||||||
|
BinlogRecordsWritten int `yaml:"binlog-records-written"`
|
||||||
|
CmdBury int `yaml:"cmd-bury"`
|
||||||
|
CmdDelete int `yaml:"cmd-delete"`
|
||||||
|
CmdIgnore int `yaml:"cmd-ignore"`
|
||||||
|
CmdKick int `yaml:"cmd-kick"`
|
||||||
|
CmdListTubeUsed int `yaml:"cmd-list-tube-used"`
|
||||||
|
CmdListTubes int `yaml:"cmd-list-tubes"`
|
||||||
|
CmdListTubesWatched int `yaml:"cmd-list-tubes-watched"`
|
||||||
|
CmdPauseTube int `yaml:"cmd-pause-tube"`
|
||||||
|
CmdPeek int `yaml:"cmd-peek"`
|
||||||
|
CmdPeekBuried int `yaml:"cmd-peek-buried"`
|
||||||
|
CmdPeekDelayed int `yaml:"cmd-peek-delayed"`
|
||||||
|
CmdPeekReady int `yaml:"cmd-peek-ready"`
|
||||||
|
CmdPut int `yaml:"cmd-put"`
|
||||||
|
CmdRelease int `yaml:"cmd-release"`
|
||||||
|
CmdReserve int `yaml:"cmd-reserve"`
|
||||||
|
CmdReserveWithTimeout int `yaml:"cmd-reserve-with-timeout"`
|
||||||
|
CmdStats int `yaml:"cmd-stats"`
|
||||||
|
CmdStatsJob int `yaml:"cmd-stats-job"`
|
||||||
|
CmdStatsTube int `yaml:"cmd-stats-tube"`
|
||||||
|
CmdTouch int `yaml:"cmd-touch"`
|
||||||
|
CmdUse int `yaml:"cmd-use"`
|
||||||
|
CmdWatch int `yaml:"cmd-watch"`
|
||||||
|
CurrentConnections int `yaml:"current-connections"`
|
||||||
|
CurrentJobsBuried int `yaml:"current-jobs-buried"`
|
||||||
|
CurrentJobsDelayed int `yaml:"current-jobs-delayed"`
|
||||||
|
CurrentJobsReady int `yaml:"current-jobs-ready"`
|
||||||
|
CurrentJobsReserved int `yaml:"current-jobs-reserved"`
|
||||||
|
CurrentJobsUrgent int `yaml:"current-jobs-urgent"`
|
||||||
|
CurrentProducers int `yaml:"current-producers"`
|
||||||
|
CurrentTubes int `yaml:"current-tubes"`
|
||||||
|
CurrentWaiting int `yaml:"current-waiting"`
|
||||||
|
CurrentWorkers int `yaml:"current-workers"`
|
||||||
|
Hostname string `yaml:"hostname"`
|
||||||
|
Id string `yaml:"id"`
|
||||||
|
JobTimeouts int `yaml:"job-timeouts"`
|
||||||
|
MaxJobSize int `yaml:"max-job-size"`
|
||||||
|
Pid int `yaml:"pid"`
|
||||||
|
RusageStime float64 `yaml:"rusage-stime"`
|
||||||
|
RusageUtime float64 `yaml:"rusage-utime"`
|
||||||
|
TotalConnections int `yaml:"total-connections"`
|
||||||
|
TotalJobs int `yaml:"total-jobs"`
|
||||||
|
Uptime int `yaml:"uptime"`
|
||||||
|
Version string `yaml:"version"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type statsTubeResponse struct {
|
||||||
|
CmdDelete int `yaml:"cmd-delete"`
|
||||||
|
CmdPauseTube int `yaml:"cmd-pause-tube"`
|
||||||
|
CurrentJobsBuried int `yaml:"current-jobs-buried"`
|
||||||
|
CurrentJobsDelayed int `yaml:"current-jobs-delayed"`
|
||||||
|
CurrentJobsReady int `yaml:"current-jobs-ready"`
|
||||||
|
CurrentJobsReserved int `yaml:"current-jobs-reserved"`
|
||||||
|
CurrentJobsUrgent int `yaml:"current-jobs-urgent"`
|
||||||
|
CurrentUsing int `yaml:"current-using"`
|
||||||
|
CurrentWaiting int `yaml:"current-waiting"`
|
||||||
|
CurrentWatching int `yaml:"current-watching"`
|
||||||
|
Name string `yaml:"name"`
|
||||||
|
Pause int `yaml:"pause"`
|
||||||
|
PauseTimeLeft int `yaml:"pause-time-left"`
|
||||||
|
TotalJobs int `yaml:"total-jobs"`
|
||||||
|
}
|
|
@ -0,0 +1,332 @@
|
||||||
|
package beanstalkd_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"net/textproto"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs/beanstalkd"
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBeanstalkd(t *testing.T) {
|
||||||
|
type tubeStats struct {
|
||||||
|
name string
|
||||||
|
fields map[string]interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
tubesConfig []string
|
||||||
|
expectedTubes []tubeStats
|
||||||
|
notExpectedTubes []tubeStats
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "All tubes stats",
|
||||||
|
tubesConfig: []string{},
|
||||||
|
expectedTubes: []tubeStats{
|
||||||
|
{name: "default", fields: defaultTubeFields},
|
||||||
|
{name: "test", fields: testTubeFields},
|
||||||
|
},
|
||||||
|
notExpectedTubes: []tubeStats{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Specified tubes stats",
|
||||||
|
tubesConfig: []string{"test"},
|
||||||
|
expectedTubes: []tubeStats{
|
||||||
|
{name: "test", fields: testTubeFields},
|
||||||
|
},
|
||||||
|
notExpectedTubes: []tubeStats{
|
||||||
|
{name: "default", fields: defaultTubeFields},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Unknown tube stats",
|
||||||
|
tubesConfig: []string{"unknown"},
|
||||||
|
expectedTubes: []tubeStats{},
|
||||||
|
notExpectedTubes: []tubeStats{
|
||||||
|
{name: "default", fields: defaultTubeFields},
|
||||||
|
{name: "test", fields: testTubeFields},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
server, err := startTestServer(t)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unable to create test server")
|
||||||
|
}
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
serverAddress := server.Addr().String()
|
||||||
|
plugin := beanstalkd.Beanstalkd{
|
||||||
|
Server: serverAddress,
|
||||||
|
Tubes: test.tubesConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
require.NoError(t, acc.GatherError(plugin.Gather))
|
||||||
|
|
||||||
|
acc.AssertContainsTaggedFields(t, "beanstalkd_overview",
|
||||||
|
overviewFields,
|
||||||
|
getOverviewTags(serverAddress),
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, expectedTube := range test.expectedTubes {
|
||||||
|
acc.AssertContainsTaggedFields(t, "beanstalkd_tube",
|
||||||
|
expectedTube.fields,
|
||||||
|
getTubeTags(serverAddress, expectedTube.name),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, notExpectedTube := range test.notExpectedTubes {
|
||||||
|
acc.AssertDoesNotContainsTaggedFields(t, "beanstalkd_tube",
|
||||||
|
notExpectedTube.fields,
|
||||||
|
getTubeTags(serverAddress, notExpectedTube.name),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func startTestServer(t *testing.T) (net.Listener, error) {
|
||||||
|
server, err := net.Listen("tcp", "localhost:0")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
connection, err := server.Accept()
|
||||||
|
if err != nil {
|
||||||
|
t.Log("Test server: failed to accept connection. Error: ", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
tp := textproto.NewConn(connection)
|
||||||
|
defer tp.Close()
|
||||||
|
|
||||||
|
sendSuccessResponse := func(body string) {
|
||||||
|
tp.PrintfLine("OK %d\r\n%s", len(body), body)
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
cmd, err := tp.ReadLine()
|
||||||
|
if err == io.EOF {
|
||||||
|
return
|
||||||
|
} else if err != nil {
|
||||||
|
t.Log("Test server: failed read command. Error: ", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch cmd {
|
||||||
|
case "list-tubes":
|
||||||
|
sendSuccessResponse(listTubesResponse)
|
||||||
|
case "stats":
|
||||||
|
sendSuccessResponse(statsResponse)
|
||||||
|
case "stats-tube default":
|
||||||
|
sendSuccessResponse(statsTubeDefaultResponse)
|
||||||
|
case "stats-tube test":
|
||||||
|
sendSuccessResponse(statsTubeTestResponse)
|
||||||
|
case "stats-tube unknown":
|
||||||
|
tp.PrintfLine("NOT_FOUND")
|
||||||
|
default:
|
||||||
|
t.Log("Test server: unknown command")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return server, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
listTubesResponse = `---
|
||||||
|
- default
|
||||||
|
- test
|
||||||
|
`
|
||||||
|
statsResponse = `---
|
||||||
|
current-jobs-urgent: 5
|
||||||
|
current-jobs-ready: 5
|
||||||
|
current-jobs-reserved: 0
|
||||||
|
current-jobs-delayed: 1
|
||||||
|
current-jobs-buried: 0
|
||||||
|
cmd-put: 6
|
||||||
|
cmd-peek: 0
|
||||||
|
cmd-peek-ready: 1
|
||||||
|
cmd-peek-delayed: 0
|
||||||
|
cmd-peek-buried: 0
|
||||||
|
cmd-reserve: 0
|
||||||
|
cmd-reserve-with-timeout: 1
|
||||||
|
cmd-delete: 1
|
||||||
|
cmd-release: 0
|
||||||
|
cmd-use: 2
|
||||||
|
cmd-watch: 0
|
||||||
|
cmd-ignore: 0
|
||||||
|
cmd-bury: 1
|
||||||
|
cmd-kick: 1
|
||||||
|
cmd-touch: 0
|
||||||
|
cmd-stats: 1
|
||||||
|
cmd-stats-job: 0
|
||||||
|
cmd-stats-tube: 2
|
||||||
|
cmd-list-tubes: 1
|
||||||
|
cmd-list-tube-used: 0
|
||||||
|
cmd-list-tubes-watched: 0
|
||||||
|
cmd-pause-tube: 0
|
||||||
|
job-timeouts: 0
|
||||||
|
total-jobs: 6
|
||||||
|
max-job-size: 65535
|
||||||
|
current-tubes: 2
|
||||||
|
current-connections: 2
|
||||||
|
current-producers: 1
|
||||||
|
current-workers: 1
|
||||||
|
current-waiting: 0
|
||||||
|
total-connections: 2
|
||||||
|
pid: 6
|
||||||
|
version: 1.10
|
||||||
|
rusage-utime: 0.000000
|
||||||
|
rusage-stime: 0.000000
|
||||||
|
uptime: 20
|
||||||
|
binlog-oldest-index: 0
|
||||||
|
binlog-current-index: 0
|
||||||
|
binlog-records-migrated: 0
|
||||||
|
binlog-records-written: 0
|
||||||
|
binlog-max-size: 10485760
|
||||||
|
id: bba7546657efdd4c
|
||||||
|
hostname: 2873efd3e88c
|
||||||
|
`
|
||||||
|
statsTubeDefaultResponse = `---
|
||||||
|
name: default
|
||||||
|
current-jobs-urgent: 0
|
||||||
|
current-jobs-ready: 0
|
||||||
|
current-jobs-reserved: 0
|
||||||
|
current-jobs-delayed: 0
|
||||||
|
current-jobs-buried: 0
|
||||||
|
total-jobs: 0
|
||||||
|
current-using: 2
|
||||||
|
current-watching: 2
|
||||||
|
current-waiting: 0
|
||||||
|
cmd-delete: 0
|
||||||
|
cmd-pause-tube: 0
|
||||||
|
pause: 0
|
||||||
|
pause-time-left: 0
|
||||||
|
`
|
||||||
|
statsTubeTestResponse = `---
|
||||||
|
name: test
|
||||||
|
current-jobs-urgent: 5
|
||||||
|
current-jobs-ready: 5
|
||||||
|
current-jobs-reserved: 0
|
||||||
|
current-jobs-delayed: 1
|
||||||
|
current-jobs-buried: 0
|
||||||
|
total-jobs: 6
|
||||||
|
current-using: 0
|
||||||
|
current-watching: 0
|
||||||
|
current-waiting: 0
|
||||||
|
cmd-delete: 0
|
||||||
|
cmd-pause-tube: 0
|
||||||
|
pause: 0
|
||||||
|
pause-time-left: 0
|
||||||
|
`
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// Default tube without stats
|
||||||
|
defaultTubeFields = map[string]interface{}{
|
||||||
|
"cmd_delete": 0,
|
||||||
|
"cmd_pause_tube": 0,
|
||||||
|
"current_jobs_buried": 0,
|
||||||
|
"current_jobs_delayed": 0,
|
||||||
|
"current_jobs_ready": 0,
|
||||||
|
"current_jobs_reserved": 0,
|
||||||
|
"current_jobs_urgent": 0,
|
||||||
|
"current_using": 2,
|
||||||
|
"current_waiting": 0,
|
||||||
|
"current_watching": 2,
|
||||||
|
"pause": 0,
|
||||||
|
"pause_time_left": 0,
|
||||||
|
"total_jobs": 0,
|
||||||
|
}
|
||||||
|
// Test tube with stats
|
||||||
|
testTubeFields = map[string]interface{}{
|
||||||
|
"cmd_delete": 0,
|
||||||
|
"cmd_pause_tube": 0,
|
||||||
|
"current_jobs_buried": 0,
|
||||||
|
"current_jobs_delayed": 1,
|
||||||
|
"current_jobs_ready": 5,
|
||||||
|
"current_jobs_reserved": 0,
|
||||||
|
"current_jobs_urgent": 5,
|
||||||
|
"current_using": 0,
|
||||||
|
"current_waiting": 0,
|
||||||
|
"current_watching": 0,
|
||||||
|
"pause": 0,
|
||||||
|
"pause_time_left": 0,
|
||||||
|
"total_jobs": 6,
|
||||||
|
}
|
||||||
|
// Server stats
|
||||||
|
overviewFields = map[string]interface{}{
|
||||||
|
"binlog_current_index": 0,
|
||||||
|
"binlog_max_size": 10485760,
|
||||||
|
"binlog_oldest_index": 0,
|
||||||
|
"binlog_records_migrated": 0,
|
||||||
|
"binlog_records_written": 0,
|
||||||
|
"cmd_bury": 1,
|
||||||
|
"cmd_delete": 1,
|
||||||
|
"cmd_ignore": 0,
|
||||||
|
"cmd_kick": 1,
|
||||||
|
"cmd_list_tube_used": 0,
|
||||||
|
"cmd_list_tubes": 1,
|
||||||
|
"cmd_list_tubes_watched": 0,
|
||||||
|
"cmd_pause_tube": 0,
|
||||||
|
"cmd_peek": 0,
|
||||||
|
"cmd_peek_buried": 0,
|
||||||
|
"cmd_peek_delayed": 0,
|
||||||
|
"cmd_peek_ready": 1,
|
||||||
|
"cmd_put": 6,
|
||||||
|
"cmd_release": 0,
|
||||||
|
"cmd_reserve": 0,
|
||||||
|
"cmd_reserve_with_timeout": 1,
|
||||||
|
"cmd_stats": 1,
|
||||||
|
"cmd_stats_job": 0,
|
||||||
|
"cmd_stats_tube": 2,
|
||||||
|
"cmd_touch": 0,
|
||||||
|
"cmd_use": 2,
|
||||||
|
"cmd_watch": 0,
|
||||||
|
"current_connections": 2,
|
||||||
|
"current_jobs_buried": 0,
|
||||||
|
"current_jobs_delayed": 1,
|
||||||
|
"current_jobs_ready": 5,
|
||||||
|
"current_jobs_reserved": 0,
|
||||||
|
"current_jobs_urgent": 5,
|
||||||
|
"current_producers": 1,
|
||||||
|
"current_tubes": 2,
|
||||||
|
"current_waiting": 0,
|
||||||
|
"current_workers": 1,
|
||||||
|
"job_timeouts": 0,
|
||||||
|
"max_job_size": 65535,
|
||||||
|
"pid": 6,
|
||||||
|
"rusage_stime": 0.0,
|
||||||
|
"rusage_utime": 0.0,
|
||||||
|
"total_connections": 2,
|
||||||
|
"total_jobs": 6,
|
||||||
|
"uptime": 20,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func getOverviewTags(server string) map[string]string {
|
||||||
|
return map[string]string{
|
||||||
|
"hostname": "2873efd3e88c",
|
||||||
|
"id": "bba7546657efdd4c",
|
||||||
|
"server": server,
|
||||||
|
"version": "1.10",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getTubeTags(server string, tube string) map[string]string {
|
||||||
|
return map[string]string{
|
||||||
|
"name": tube,
|
||||||
|
"server": server,
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue