From c0fa6af51b97cd7c3e79535175077c2b86c3ec83 Mon Sep 17 00:00:00 2001 From: Graham Floyd Date: Fri, 31 Jul 2015 14:46:46 -0500 Subject: [PATCH] Add disque plugin --- plugins/all/all.go | 1 + plugins/disque/disque.go | 202 ++++++++++++++++++++++++++++ plugins/disque/disque_test.go | 242 ++++++++++++++++++++++++++++++++++ 3 files changed, 445 insertions(+) create mode 100644 plugins/disque/disque.go create mode 100644 plugins/disque/disque_test.go diff --git a/plugins/all/all.go b/plugins/all/all.go index 595fdcaf4..dcedf2bf8 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -1,6 +1,7 @@ package all import ( + _ "github.com/influxdb/telegraf/plugins/disque" _ "github.com/influxdb/telegraf/plugins/elasticsearch" _ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/memcached" diff --git a/plugins/disque/disque.go b/plugins/disque/disque.go new file mode 100644 index 000000000..292e1b363 --- /dev/null +++ b/plugins/disque/disque.go @@ -0,0 +1,202 @@ +package disque + +import ( + "bufio" + "errors" + "fmt" + "net" + "net/url" + "strconv" + "strings" + "sync" + + "github.com/influxdb/telegraf/plugins" +) + +type Disque struct { + Servers []string + + c net.Conn + buf []byte +} + +var sampleConfig = ` +# An array of URI to gather stats about. Specify an ip or hostname +# with optional port and password. ie disque://localhost, disque://10.10.3.33:18832, +# 10.0.0.1:10000, etc. +# +# If no servers are specified, then localhost is used as the host. +servers = ["localhost"]` + +func (r *Disque) SampleConfig() string { + return sampleConfig +} + +func (r *Disque) Description() string { + return "Read metrics from one or many disque servers" +} + +var Tracking = map[string]string{ + "uptime_in_seconds": "uptime", + "connected_clients": "clients", + "blocked_clients": "blocked_clients", + "used_memory": "used_memory", + "used_memory_rss": "used_memory_rss", + "used_memory_peak": "used_memory_peak", + "total_connections_received": "total_connections_received", + "total_commands_processed": "total_commands_processed", + "instantaneous_ops_per_sec": "instantaneous_ops_per_sec", + "latest_fork_usec": "latest_fork_usec", + "mem_fragmentation_ratio": "mem_fragmentation_ratio", + "used_cpu_sys": "used_cpu_sys", + "used_cpu_user": "used_cpu_user", + "used_cpu_sys_children": "used_cpu_sys_children", + "used_cpu_user_children": "used_cpu_user_children", + "registered_jobs": "registered_jobs", + "registered_queues": "registered_queues", +} + +var ErrProtocolError = errors.New("disque protocol error") + +// Reads stats from all configured servers accumulates stats. +// Returns one of the errors encountered while gather stats (if any). +func (g *Disque) Gather(acc plugins.Accumulator) error { + if len(g.Servers) == 0 { + url := &url.URL{ + Host: ":7711", + } + g.gatherServer(url, acc) + return nil + } + + var wg sync.WaitGroup + + var outerr error + + for _, serv := range g.Servers { + u, err := url.Parse(serv) + if err != nil { + return fmt.Errorf("Unable to parse to address '%s': %s", serv, err) + } else if u.Scheme == "" { + // fallback to simple string based address (i.e. "10.0.0.1:10000") + u.Scheme = "tcp" + u.Host = serv + u.Path = "" + } + wg.Add(1) + go func(serv string) { + defer wg.Done() + outerr = g.gatherServer(u, acc) + }(serv) + } + + wg.Wait() + + return outerr +} + +const defaultPort = "7711" + +func (g *Disque) gatherServer(addr *url.URL, acc plugins.Accumulator) error { + if g.c == nil { + + _, _, err := net.SplitHostPort(addr.Host) + if err != nil { + addr.Host = addr.Host + ":" + defaultPort + } + + c, err := net.Dial("tcp", addr.Host) + if err != nil { + return fmt.Errorf("Unable to connect to disque server '%s': %s", addr.Host, err) + } + + if addr.User != nil { + pwd, set := addr.User.Password() + if set && pwd != "" { + c.Write([]byte(fmt.Sprintf("AUTH %s\r\n", pwd))) + + r := bufio.NewReader(c) + + line, err := r.ReadString('\n') + if err != nil { + return err + } + if line[0] != '+' { + return fmt.Errorf("%s", strings.TrimSpace(line)[1:]) + } + } + } + + g.c = c + } + + g.c.Write([]byte("info\r\n")) + + r := bufio.NewReader(g.c) + + line, err := r.ReadString('\n') + if err != nil { + return err + } + + if line[0] != '$' { + return fmt.Errorf("bad line start: %s", ErrProtocolError) + } + + line = strings.TrimSpace(line) + + szStr := line[1:] + + sz, err := strconv.Atoi(szStr) + if err != nil { + return fmt.Errorf("bad size string <<%s>>: %s", szStr, ErrProtocolError) + } + + var read int + + for read < sz { + line, err := r.ReadString('\n') + if err != nil { + return err + } + + read += len(line) + + if len(line) == 1 || line[0] == '#' { + continue + } + + parts := strings.SplitN(line, ":", 2) + + name := string(parts[0]) + + metric, ok := Tracking[name] + if !ok { + continue + } + + tags := map[string]string{"host": addr.String()} + val := strings.TrimSpace(parts[1]) + + ival, err := strconv.ParseUint(val, 10, 64) + if err == nil { + acc.Add(metric, ival, tags) + continue + } + + fval, err := strconv.ParseFloat(val, 64) + if err != nil { + return err + } + + acc.Add(metric, fval, tags) + } + + return nil +} + +func init() { + plugins.Add("disque", func() plugins.Plugin { + return &Disque{} + }) +} diff --git a/plugins/disque/disque_test.go b/plugins/disque/disque_test.go new file mode 100644 index 000000000..68228e538 --- /dev/null +++ b/plugins/disque/disque_test.go @@ -0,0 +1,242 @@ +package disque + +import ( + "bufio" + "fmt" + "net" + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDisqueGeneratesMetrics(t *testing.T) { + l, err := net.Listen("tcp", ":0") + require.NoError(t, err) + + defer l.Close() + + go func() { + c, err := l.Accept() + if err != nil { + return + } + + buf := bufio.NewReader(c) + + for { + line, err := buf.ReadString('\n') + if err != nil { + return + } + + if line != "info\r\n" { + return + } + + fmt.Fprintf(c, "$%d\n", len(testOutput)) + c.Write([]byte(testOutput)) + } + }() + + addr := fmt.Sprintf("disque://%s", l.Addr().String()) + + r := &Disque{ + Servers: []string{addr}, + } + + var acc testutil.Accumulator + + err = r.Gather(&acc) + require.NoError(t, err) + + checkInt := []struct { + name string + value uint64 + }{ + {"uptime", 1452705}, + {"clients", 31}, + {"blocked_clients", 13}, + {"used_memory", 1840104}, + {"used_memory_rss", 3227648}, + {"used_memory_peak", 89603656}, + {"total_connections_received", 5062777}, + {"total_commands_processed", 12308396}, + {"instantaneous_ops_per_sec", 18}, + {"latest_fork_usec", 1644}, + {"registered_jobs", 360}, + {"registered_queues", 12}, + } + + for _, c := range checkInt { + assert.True(t, acc.CheckValue(c.name, c.value)) + } + + checkFloat := []struct { + name string + value float64 + }{ + {"mem_fragmentation_ratio", 1.75}, + {"used_cpu_sys", 19585.73}, + {"used_cpu_user", 11255.96}, + {"used_cpu_sys_children", 1.75}, + {"used_cpu_user_children", 1.91}, + } + + for _, c := range checkFloat { + assert.True(t, acc.CheckValue(c.name, c.value)) + } +} + +func TestDisqueCanPullStatsFromMultipleServers(t *testing.T) { + l, err := net.Listen("tcp", ":0") + require.NoError(t, err) + + defer l.Close() + + go func() { + c, err := l.Accept() + if err != nil { + return + } + + buf := bufio.NewReader(c) + + for { + line, err := buf.ReadString('\n') + if err != nil { + return + } + + if line != "info\r\n" { + return + } + + fmt.Fprintf(c, "$%d\n", len(testOutput)) + c.Write([]byte(testOutput)) + } + }() + + addr := fmt.Sprintf("disque://%s", l.Addr().String()) + + r := &Disque{ + Servers: []string{addr}, + } + + var acc testutil.Accumulator + + err = r.Gather(&acc) + require.NoError(t, err) + + checkInt := []struct { + name string + value uint64 + }{ + {"uptime", 1452705}, + {"clients", 31}, + {"blocked_clients", 13}, + {"used_memory", 1840104}, + {"used_memory_rss", 3227648}, + {"used_memory_peak", 89603656}, + {"total_connections_received", 5062777}, + {"total_commands_processed", 12308396}, + {"instantaneous_ops_per_sec", 18}, + {"latest_fork_usec", 1644}, + {"registered_jobs", 360}, + {"registered_queues", 12}, + } + + for _, c := range checkInt { + assert.True(t, acc.CheckValue(c.name, c.value)) + } + + checkFloat := []struct { + name string + value float64 + }{ + {"mem_fragmentation_ratio", 1.75}, + {"used_cpu_sys", 19585.73}, + {"used_cpu_user", 11255.96}, + {"used_cpu_sys_children", 1.75}, + {"used_cpu_user_children", 1.91}, + } + + for _, c := range checkFloat { + assert.True(t, acc.CheckValue(c.name, c.value)) + } +} + +const testOutput = `# Server +disque_version:0.0.1 +disque_git_sha1:b5247598 +disque_git_dirty:0 +disque_build_id:379fda78983a60c6 +os:Linux 3.13.0-44-generic x86_64 +arch_bits:64 +multiplexing_api:epoll +gcc_version:4.8.2 +process_id:32420 +run_id:1cfdfa4c6bc3f285182db5427522a8a4c16e42e4 +tcp_port:7711 +uptime_in_seconds:1452705 +uptime_in_days:16 +hz:10 +config_file:/usr/local/etc/disque/disque.conf + +# Clients +connected_clients:31 +client_longest_output_list:0 +client_biggest_input_buf:0 +blocked_clients:13 + +# Memory +used_memory:1840104 +used_memory_human:1.75M +used_memory_rss:3227648 +used_memory_peak:89603656 +used_memory_peak_human:85.45M +mem_fragmentation_ratio:1.75 +mem_allocator:jemalloc-3.6.0 + +# Jobs +registered_jobs:360 + +# Queues +registered_queues:12 + +# Persistence +loading:0 +aof_enabled:1 +aof_state:on +aof_rewrite_in_progress:0 +aof_rewrite_scheduled:0 +aof_last_rewrite_time_sec:0 +aof_current_rewrite_time_sec:-1 +aof_last_bgrewrite_status:ok +aof_last_write_status:ok +aof_current_size:41952430 +aof_base_size:9808 +aof_pending_rewrite:0 +aof_buffer_length:0 +aof_rewrite_buffer_length:0 +aof_pending_bio_fsync:0 +aof_delayed_fsync:1 + +# Stats +total_connections_received:5062777 +total_commands_processed:12308396 +instantaneous_ops_per_sec:18 +total_net_input_bytes:1346996528 +total_net_output_bytes:1967551763 +instantaneous_input_kbps:1.38 +instantaneous_output_kbps:1.78 +rejected_connections:0 +latest_fork_usec:1644 + +# CPU +used_cpu_sys:19585.73 +used_cpu_user:11255.96 +used_cpu_sys_children:1.75 +used_cpu_user_children:1.91 +`