package disque import ( "bufio" "fmt" "net" "testing" "github.com/influxdb/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestDisqueGeneratesMetrics(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } l, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) defer l.Close() go func() { c, err := l.Accept() if err != nil { return } buf := bufio.NewReader(c) for { line, err := buf.ReadString('\n') if err != nil { return } if line != "info\r\n" { return } fmt.Fprintf(c, "$%d\n", len(testOutput)) c.Write([]byte(testOutput)) } }() addr := fmt.Sprintf("disque://%s", l.Addr().String()) r := &Disque{ Servers: []string{addr}, } var acc testutil.Accumulator err = r.Gather(&acc) require.NoError(t, err) checkInt := []struct { name string value uint64 }{ {"uptime", 1452705}, {"clients", 31}, {"blocked_clients", 13}, {"used_memory", 1840104}, {"used_memory_rss", 3227648}, {"used_memory_peak", 89603656}, {"total_connections_received", 5062777}, {"total_commands_processed", 12308396}, {"instantaneous_ops_per_sec", 18}, {"latest_fork_usec", 1644}, {"registered_jobs", 360}, {"registered_queues", 12}, } for _, c := range checkInt { assert.True(t, acc.CheckValue(c.name, c.value)) } checkFloat := []struct { name string value float64 }{ {"mem_fragmentation_ratio", 1.75}, {"used_cpu_sys", 19585.73}, {"used_cpu_user", 11255.96}, {"used_cpu_sys_children", 1.75}, {"used_cpu_user_children", 1.91}, } for _, c := range checkFloat { assert.True(t, acc.CheckValue(c.name, c.value)) } } func TestDisqueCanPullStatsFromMultipleServers(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } l, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) defer l.Close() go func() { c, err := l.Accept() if err != nil { return } buf := bufio.NewReader(c) for { line, err := buf.ReadString('\n') if err != nil { return } if line != "info\r\n" { return } fmt.Fprintf(c, "$%d\n", len(testOutput)) c.Write([]byte(testOutput)) } }() addr := fmt.Sprintf("disque://%s", l.Addr().String()) r := &Disque{ Servers: []string{addr}, } var acc testutil.Accumulator err = r.Gather(&acc) require.NoError(t, err) checkInt := []struct { name string value uint64 }{ {"uptime", 1452705}, {"clients", 31}, {"blocked_clients", 13}, {"used_memory", 1840104}, {"used_memory_rss", 3227648}, {"used_memory_peak", 89603656}, {"total_connections_received", 5062777}, {"total_commands_processed", 12308396}, {"instantaneous_ops_per_sec", 18}, {"latest_fork_usec", 1644}, {"registered_jobs", 360}, {"registered_queues", 12}, } for _, c := range checkInt { assert.True(t, acc.CheckValue(c.name, c.value)) } checkFloat := []struct { name string value float64 }{ {"mem_fragmentation_ratio", 1.75}, {"used_cpu_sys", 19585.73}, {"used_cpu_user", 11255.96}, {"used_cpu_sys_children", 1.75}, {"used_cpu_user_children", 1.91}, } for _, c := range checkFloat { assert.True(t, acc.CheckValue(c.name, c.value)) } } const testOutput = `# Server disque_version:0.0.1 disque_git_sha1:b5247598 disque_git_dirty:0 disque_build_id:379fda78983a60c6 os:Linux 3.13.0-44-generic x86_64 arch_bits:64 multiplexing_api:epoll gcc_version:4.8.2 process_id:32420 run_id:1cfdfa4c6bc3f285182db5427522a8a4c16e42e4 tcp_port:7711 uptime_in_seconds:1452705 uptime_in_days:16 hz:10 config_file:/usr/local/etc/disque/disque.conf # Clients connected_clients:31 client_longest_output_list:0 client_biggest_input_buf:0 blocked_clients:13 # Memory used_memory:1840104 used_memory_human:1.75M used_memory_rss:3227648 used_memory_peak:89603656 used_memory_peak_human:85.45M mem_fragmentation_ratio:1.75 mem_allocator:jemalloc-3.6.0 # Jobs registered_jobs:360 # Queues registered_queues:12 # Persistence loading:0 aof_enabled:1 aof_state:on aof_rewrite_in_progress:0 aof_rewrite_scheduled:0 aof_last_rewrite_time_sec:0 aof_current_rewrite_time_sec:-1 aof_last_bgrewrite_status:ok aof_last_write_status:ok aof_current_size:41952430 aof_base_size:9808 aof_pending_rewrite:0 aof_buffer_length:0 aof_rewrite_buffer_length:0 aof_pending_bio_fsync:0 aof_delayed_fsync:1 # Stats total_connections_received:5062777 total_commands_processed:12308396 instantaneous_ops_per_sec:18 total_net_input_bytes:1346996528 total_net_output_bytes:1967551763 instantaneous_input_kbps:1.38 instantaneous_output_kbps:1.78 rejected_connections:0 latest_fork_usec:1644 # CPU used_cpu_sys:19585.73 used_cpu_user:11255.96 used_cpu_sys_children:1.75 used_cpu_user_children:1.91 `