243 lines
4.5 KiB
Go
243 lines
4.5 KiB
Go
|
package disque
|
||
|
|
||
|
import (
|
||
|
"bufio"
|
||
|
"fmt"
|
||
|
"net"
|
||
|
"testing"
|
||
|
|
||
|
"github.com/influxdb/telegraf/testutil"
|
||
|
"github.com/stretchr/testify/assert"
|
||
|
"github.com/stretchr/testify/require"
|
||
|
)
|
||
|
|
||
|
func TestDisqueGeneratesMetrics(t *testing.T) {
|
||
|
l, err := net.Listen("tcp", ":0")
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
defer l.Close()
|
||
|
|
||
|
go func() {
|
||
|
c, err := l.Accept()
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
buf := bufio.NewReader(c)
|
||
|
|
||
|
for {
|
||
|
line, err := buf.ReadString('\n')
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if line != "info\r\n" {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
fmt.Fprintf(c, "$%d\n", len(testOutput))
|
||
|
c.Write([]byte(testOutput))
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
addr := fmt.Sprintf("disque://%s", l.Addr().String())
|
||
|
|
||
|
r := &Disque{
|
||
|
Servers: []string{addr},
|
||
|
}
|
||
|
|
||
|
var acc testutil.Accumulator
|
||
|
|
||
|
err = r.Gather(&acc)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
checkInt := []struct {
|
||
|
name string
|
||
|
value uint64
|
||
|
}{
|
||
|
{"uptime", 1452705},
|
||
|
{"clients", 31},
|
||
|
{"blocked_clients", 13},
|
||
|
{"used_memory", 1840104},
|
||
|
{"used_memory_rss", 3227648},
|
||
|
{"used_memory_peak", 89603656},
|
||
|
{"total_connections_received", 5062777},
|
||
|
{"total_commands_processed", 12308396},
|
||
|
{"instantaneous_ops_per_sec", 18},
|
||
|
{"latest_fork_usec", 1644},
|
||
|
{"registered_jobs", 360},
|
||
|
{"registered_queues", 12},
|
||
|
}
|
||
|
|
||
|
for _, c := range checkInt {
|
||
|
assert.True(t, acc.CheckValue(c.name, c.value))
|
||
|
}
|
||
|
|
||
|
checkFloat := []struct {
|
||
|
name string
|
||
|
value float64
|
||
|
}{
|
||
|
{"mem_fragmentation_ratio", 1.75},
|
||
|
{"used_cpu_sys", 19585.73},
|
||
|
{"used_cpu_user", 11255.96},
|
||
|
{"used_cpu_sys_children", 1.75},
|
||
|
{"used_cpu_user_children", 1.91},
|
||
|
}
|
||
|
|
||
|
for _, c := range checkFloat {
|
||
|
assert.True(t, acc.CheckValue(c.name, c.value))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestDisqueCanPullStatsFromMultipleServers(t *testing.T) {
|
||
|
l, err := net.Listen("tcp", ":0")
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
defer l.Close()
|
||
|
|
||
|
go func() {
|
||
|
c, err := l.Accept()
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
buf := bufio.NewReader(c)
|
||
|
|
||
|
for {
|
||
|
line, err := buf.ReadString('\n')
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if line != "info\r\n" {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
fmt.Fprintf(c, "$%d\n", len(testOutput))
|
||
|
c.Write([]byte(testOutput))
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
addr := fmt.Sprintf("disque://%s", l.Addr().String())
|
||
|
|
||
|
r := &Disque{
|
||
|
Servers: []string{addr},
|
||
|
}
|
||
|
|
||
|
var acc testutil.Accumulator
|
||
|
|
||
|
err = r.Gather(&acc)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
checkInt := []struct {
|
||
|
name string
|
||
|
value uint64
|
||
|
}{
|
||
|
{"uptime", 1452705},
|
||
|
{"clients", 31},
|
||
|
{"blocked_clients", 13},
|
||
|
{"used_memory", 1840104},
|
||
|
{"used_memory_rss", 3227648},
|
||
|
{"used_memory_peak", 89603656},
|
||
|
{"total_connections_received", 5062777},
|
||
|
{"total_commands_processed", 12308396},
|
||
|
{"instantaneous_ops_per_sec", 18},
|
||
|
{"latest_fork_usec", 1644},
|
||
|
{"registered_jobs", 360},
|
||
|
{"registered_queues", 12},
|
||
|
}
|
||
|
|
||
|
for _, c := range checkInt {
|
||
|
assert.True(t, acc.CheckValue(c.name, c.value))
|
||
|
}
|
||
|
|
||
|
checkFloat := []struct {
|
||
|
name string
|
||
|
value float64
|
||
|
}{
|
||
|
{"mem_fragmentation_ratio", 1.75},
|
||
|
{"used_cpu_sys", 19585.73},
|
||
|
{"used_cpu_user", 11255.96},
|
||
|
{"used_cpu_sys_children", 1.75},
|
||
|
{"used_cpu_user_children", 1.91},
|
||
|
}
|
||
|
|
||
|
for _, c := range checkFloat {
|
||
|
assert.True(t, acc.CheckValue(c.name, c.value))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// testOutput is the canned payload the fake server in these tests sends in
// reply to an "info" request. It is a series of "# Section" headers, each
// followed by "key:value" lines, with a blank line between sections.
const testOutput = `# Server
disque_version:0.0.1
disque_git_sha1:b5247598
disque_git_dirty:0
disque_build_id:379fda78983a60c6
os:Linux 3.13.0-44-generic x86_64
arch_bits:64
multiplexing_api:epoll
gcc_version:4.8.2
process_id:32420
run_id:1cfdfa4c6bc3f285182db5427522a8a4c16e42e4
tcp_port:7711
uptime_in_seconds:1452705
uptime_in_days:16
hz:10
config_file:/usr/local/etc/disque/disque.conf

# Clients
connected_clients:31
client_longest_output_list:0
client_biggest_input_buf:0
blocked_clients:13

# Memory
used_memory:1840104
used_memory_human:1.75M
used_memory_rss:3227648
used_memory_peak:89603656
used_memory_peak_human:85.45M
mem_fragmentation_ratio:1.75
mem_allocator:jemalloc-3.6.0

# Jobs
registered_jobs:360

# Queues
registered_queues:12

# Persistence
loading:0
aof_enabled:1
aof_state:on
aof_rewrite_in_progress:0
aof_rewrite_scheduled:0
aof_last_rewrite_time_sec:0
aof_current_rewrite_time_sec:-1
aof_last_bgrewrite_status:ok
aof_last_write_status:ok
aof_current_size:41952430
aof_base_size:9808
aof_pending_rewrite:0
aof_buffer_length:0
aof_rewrite_buffer_length:0
aof_pending_bio_fsync:0
aof_delayed_fsync:1

# Stats
total_connections_received:5062777
total_commands_processed:12308396
instantaneous_ops_per_sec:18
total_net_input_bytes:1346996528
total_net_output_bytes:1967551763
instantaneous_input_kbps:1.38
instantaneous_output_kbps:1.78
rejected_connections:0
latest_fork_usec:1644

# CPU
used_cpu_sys:19585.73
used_cpu_user:11255.96
used_cpu_sys_children:1.75
used_cpu_user_children:1.91
`
|