Merge branch 'gfloyd-disque-plugin'
This commit is contained in:
commit
8b491a46f3
|
@ -1,6 +1,7 @@
|
||||||
package all
|
package all
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
_ "github.com/influxdb/telegraf/plugins/disque"
|
||||||
_ "github.com/influxdb/telegraf/plugins/elasticsearch"
|
_ "github.com/influxdb/telegraf/plugins/elasticsearch"
|
||||||
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
|
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
|
||||||
_ "github.com/influxdb/telegraf/plugins/lustre2"
|
_ "github.com/influxdb/telegraf/plugins/lustre2"
|
||||||
|
|
|
@ -0,0 +1,202 @@
|
||||||
|
package disque
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"net/url"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/influxdb/telegraf/plugins"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Disque struct {
|
||||||
|
Servers []string
|
||||||
|
|
||||||
|
c net.Conn
|
||||||
|
buf []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
var sampleConfig = `
|
||||||
|
# An array of URI to gather stats about. Specify an ip or hostname
|
||||||
|
# with optional port and password. ie disque://localhost, disque://10.10.3.33:18832,
|
||||||
|
# 10.0.0.1:10000, etc.
|
||||||
|
#
|
||||||
|
# If no servers are specified, then localhost is used as the host.
|
||||||
|
servers = ["localhost"]`
|
||||||
|
|
||||||
|
func (r *Disque) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Disque) Description() string {
|
||||||
|
return "Read metrics from one or many disque servers"
|
||||||
|
}
|
||||||
|
|
||||||
|
var Tracking = map[string]string{
|
||||||
|
"uptime_in_seconds": "uptime",
|
||||||
|
"connected_clients": "clients",
|
||||||
|
"blocked_clients": "blocked_clients",
|
||||||
|
"used_memory": "used_memory",
|
||||||
|
"used_memory_rss": "used_memory_rss",
|
||||||
|
"used_memory_peak": "used_memory_peak",
|
||||||
|
"total_connections_received": "total_connections_received",
|
||||||
|
"total_commands_processed": "total_commands_processed",
|
||||||
|
"instantaneous_ops_per_sec": "instantaneous_ops_per_sec",
|
||||||
|
"latest_fork_usec": "latest_fork_usec",
|
||||||
|
"mem_fragmentation_ratio": "mem_fragmentation_ratio",
|
||||||
|
"used_cpu_sys": "used_cpu_sys",
|
||||||
|
"used_cpu_user": "used_cpu_user",
|
||||||
|
"used_cpu_sys_children": "used_cpu_sys_children",
|
||||||
|
"used_cpu_user_children": "used_cpu_user_children",
|
||||||
|
"registered_jobs": "registered_jobs",
|
||||||
|
"registered_queues": "registered_queues",
|
||||||
|
}
|
||||||
|
|
||||||
|
var ErrProtocolError = errors.New("disque protocol error")
|
||||||
|
|
||||||
|
// Reads stats from all configured servers accumulates stats.
|
||||||
|
// Returns one of the errors encountered while gather stats (if any).
|
||||||
|
func (g *Disque) Gather(acc plugins.Accumulator) error {
|
||||||
|
if len(g.Servers) == 0 {
|
||||||
|
url := &url.URL{
|
||||||
|
Host: ":7711",
|
||||||
|
}
|
||||||
|
g.gatherServer(url, acc)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
var outerr error
|
||||||
|
|
||||||
|
for _, serv := range g.Servers {
|
||||||
|
u, err := url.Parse(serv)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Unable to parse to address '%s': %s", serv, err)
|
||||||
|
} else if u.Scheme == "" {
|
||||||
|
// fallback to simple string based address (i.e. "10.0.0.1:10000")
|
||||||
|
u.Scheme = "tcp"
|
||||||
|
u.Host = serv
|
||||||
|
u.Path = ""
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
go func(serv string) {
|
||||||
|
defer wg.Done()
|
||||||
|
outerr = g.gatherServer(u, acc)
|
||||||
|
}(serv)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
return outerr
|
||||||
|
}
|
||||||
|
|
||||||
|
const defaultPort = "7711"
|
||||||
|
|
||||||
|
func (g *Disque) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
|
||||||
|
if g.c == nil {
|
||||||
|
|
||||||
|
_, _, err := net.SplitHostPort(addr.Host)
|
||||||
|
if err != nil {
|
||||||
|
addr.Host = addr.Host + ":" + defaultPort
|
||||||
|
}
|
||||||
|
|
||||||
|
c, err := net.Dial("tcp", addr.Host)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Unable to connect to disque server '%s': %s", addr.Host, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if addr.User != nil {
|
||||||
|
pwd, set := addr.User.Password()
|
||||||
|
if set && pwd != "" {
|
||||||
|
c.Write([]byte(fmt.Sprintf("AUTH %s\r\n", pwd)))
|
||||||
|
|
||||||
|
r := bufio.NewReader(c)
|
||||||
|
|
||||||
|
line, err := r.ReadString('\n')
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if line[0] != '+' {
|
||||||
|
return fmt.Errorf("%s", strings.TrimSpace(line)[1:])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
g.c = c
|
||||||
|
}
|
||||||
|
|
||||||
|
g.c.Write([]byte("info\r\n"))
|
||||||
|
|
||||||
|
r := bufio.NewReader(g.c)
|
||||||
|
|
||||||
|
line, err := r.ReadString('\n')
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if line[0] != '$' {
|
||||||
|
return fmt.Errorf("bad line start: %s", ErrProtocolError)
|
||||||
|
}
|
||||||
|
|
||||||
|
line = strings.TrimSpace(line)
|
||||||
|
|
||||||
|
szStr := line[1:]
|
||||||
|
|
||||||
|
sz, err := strconv.Atoi(szStr)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("bad size string <<%s>>: %s", szStr, ErrProtocolError)
|
||||||
|
}
|
||||||
|
|
||||||
|
var read int
|
||||||
|
|
||||||
|
for read < sz {
|
||||||
|
line, err := r.ReadString('\n')
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
read += len(line)
|
||||||
|
|
||||||
|
if len(line) == 1 || line[0] == '#' {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
parts := strings.SplitN(line, ":", 2)
|
||||||
|
|
||||||
|
name := string(parts[0])
|
||||||
|
|
||||||
|
metric, ok := Tracking[name]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
tags := map[string]string{"host": addr.String()}
|
||||||
|
val := strings.TrimSpace(parts[1])
|
||||||
|
|
||||||
|
ival, err := strconv.ParseUint(val, 10, 64)
|
||||||
|
if err == nil {
|
||||||
|
acc.Add(metric, ival, tags)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
fval, err := strconv.ParseFloat(val, 64)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.Add(metric, fval, tags)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
plugins.Add("disque", func() plugins.Plugin {
|
||||||
|
return &Disque{}
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,242 @@
|
||||||
|
package disque
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdb/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDisqueGeneratesMetrics(t *testing.T) {
|
||||||
|
l, err := net.Listen("tcp", ":0")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
defer l.Close()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
c, err := l.Accept()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := bufio.NewReader(c)
|
||||||
|
|
||||||
|
for {
|
||||||
|
line, err := buf.ReadString('\n')
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if line != "info\r\n" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintf(c, "$%d\n", len(testOutput))
|
||||||
|
c.Write([]byte(testOutput))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
addr := fmt.Sprintf("disque://%s", l.Addr().String())
|
||||||
|
|
||||||
|
r := &Disque{
|
||||||
|
Servers: []string{addr},
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
err = r.Gather(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
checkInt := []struct {
|
||||||
|
name string
|
||||||
|
value uint64
|
||||||
|
}{
|
||||||
|
{"uptime", 1452705},
|
||||||
|
{"clients", 31},
|
||||||
|
{"blocked_clients", 13},
|
||||||
|
{"used_memory", 1840104},
|
||||||
|
{"used_memory_rss", 3227648},
|
||||||
|
{"used_memory_peak", 89603656},
|
||||||
|
{"total_connections_received", 5062777},
|
||||||
|
{"total_commands_processed", 12308396},
|
||||||
|
{"instantaneous_ops_per_sec", 18},
|
||||||
|
{"latest_fork_usec", 1644},
|
||||||
|
{"registered_jobs", 360},
|
||||||
|
{"registered_queues", 12},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range checkInt {
|
||||||
|
assert.True(t, acc.CheckValue(c.name, c.value))
|
||||||
|
}
|
||||||
|
|
||||||
|
checkFloat := []struct {
|
||||||
|
name string
|
||||||
|
value float64
|
||||||
|
}{
|
||||||
|
{"mem_fragmentation_ratio", 1.75},
|
||||||
|
{"used_cpu_sys", 19585.73},
|
||||||
|
{"used_cpu_user", 11255.96},
|
||||||
|
{"used_cpu_sys_children", 1.75},
|
||||||
|
{"used_cpu_user_children", 1.91},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range checkFloat {
|
||||||
|
assert.True(t, acc.CheckValue(c.name, c.value))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDisqueCanPullStatsFromMultipleServers(t *testing.T) {
|
||||||
|
l, err := net.Listen("tcp", ":0")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
defer l.Close()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
c, err := l.Accept()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := bufio.NewReader(c)
|
||||||
|
|
||||||
|
for {
|
||||||
|
line, err := buf.ReadString('\n')
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if line != "info\r\n" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintf(c, "$%d\n", len(testOutput))
|
||||||
|
c.Write([]byte(testOutput))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
addr := fmt.Sprintf("disque://%s", l.Addr().String())
|
||||||
|
|
||||||
|
r := &Disque{
|
||||||
|
Servers: []string{addr},
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
err = r.Gather(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
checkInt := []struct {
|
||||||
|
name string
|
||||||
|
value uint64
|
||||||
|
}{
|
||||||
|
{"uptime", 1452705},
|
||||||
|
{"clients", 31},
|
||||||
|
{"blocked_clients", 13},
|
||||||
|
{"used_memory", 1840104},
|
||||||
|
{"used_memory_rss", 3227648},
|
||||||
|
{"used_memory_peak", 89603656},
|
||||||
|
{"total_connections_received", 5062777},
|
||||||
|
{"total_commands_processed", 12308396},
|
||||||
|
{"instantaneous_ops_per_sec", 18},
|
||||||
|
{"latest_fork_usec", 1644},
|
||||||
|
{"registered_jobs", 360},
|
||||||
|
{"registered_queues", 12},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range checkInt {
|
||||||
|
assert.True(t, acc.CheckValue(c.name, c.value))
|
||||||
|
}
|
||||||
|
|
||||||
|
checkFloat := []struct {
|
||||||
|
name string
|
||||||
|
value float64
|
||||||
|
}{
|
||||||
|
{"mem_fragmentation_ratio", 1.75},
|
||||||
|
{"used_cpu_sys", 19585.73},
|
||||||
|
{"used_cpu_user", 11255.96},
|
||||||
|
{"used_cpu_sys_children", 1.75},
|
||||||
|
{"used_cpu_user_children", 1.91},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range checkFloat {
|
||||||
|
assert.True(t, acc.CheckValue(c.name, c.value))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const testOutput = `# Server
|
||||||
|
disque_version:0.0.1
|
||||||
|
disque_git_sha1:b5247598
|
||||||
|
disque_git_dirty:0
|
||||||
|
disque_build_id:379fda78983a60c6
|
||||||
|
os:Linux 3.13.0-44-generic x86_64
|
||||||
|
arch_bits:64
|
||||||
|
multiplexing_api:epoll
|
||||||
|
gcc_version:4.8.2
|
||||||
|
process_id:32420
|
||||||
|
run_id:1cfdfa4c6bc3f285182db5427522a8a4c16e42e4
|
||||||
|
tcp_port:7711
|
||||||
|
uptime_in_seconds:1452705
|
||||||
|
uptime_in_days:16
|
||||||
|
hz:10
|
||||||
|
config_file:/usr/local/etc/disque/disque.conf
|
||||||
|
|
||||||
|
# Clients
|
||||||
|
connected_clients:31
|
||||||
|
client_longest_output_list:0
|
||||||
|
client_biggest_input_buf:0
|
||||||
|
blocked_clients:13
|
||||||
|
|
||||||
|
# Memory
|
||||||
|
used_memory:1840104
|
||||||
|
used_memory_human:1.75M
|
||||||
|
used_memory_rss:3227648
|
||||||
|
used_memory_peak:89603656
|
||||||
|
used_memory_peak_human:85.45M
|
||||||
|
mem_fragmentation_ratio:1.75
|
||||||
|
mem_allocator:jemalloc-3.6.0
|
||||||
|
|
||||||
|
# Jobs
|
||||||
|
registered_jobs:360
|
||||||
|
|
||||||
|
# Queues
|
||||||
|
registered_queues:12
|
||||||
|
|
||||||
|
# Persistence
|
||||||
|
loading:0
|
||||||
|
aof_enabled:1
|
||||||
|
aof_state:on
|
||||||
|
aof_rewrite_in_progress:0
|
||||||
|
aof_rewrite_scheduled:0
|
||||||
|
aof_last_rewrite_time_sec:0
|
||||||
|
aof_current_rewrite_time_sec:-1
|
||||||
|
aof_last_bgrewrite_status:ok
|
||||||
|
aof_last_write_status:ok
|
||||||
|
aof_current_size:41952430
|
||||||
|
aof_base_size:9808
|
||||||
|
aof_pending_rewrite:0
|
||||||
|
aof_buffer_length:0
|
||||||
|
aof_rewrite_buffer_length:0
|
||||||
|
aof_pending_bio_fsync:0
|
||||||
|
aof_delayed_fsync:1
|
||||||
|
|
||||||
|
# Stats
|
||||||
|
total_connections_received:5062777
|
||||||
|
total_commands_processed:12308396
|
||||||
|
instantaneous_ops_per_sec:18
|
||||||
|
total_net_input_bytes:1346996528
|
||||||
|
total_net_output_bytes:1967551763
|
||||||
|
instantaneous_input_kbps:1.38
|
||||||
|
instantaneous_output_kbps:1.78
|
||||||
|
rejected_connections:0
|
||||||
|
latest_fork_usec:1644
|
||||||
|
|
||||||
|
# CPU
|
||||||
|
used_cpu_sys:19585.73
|
||||||
|
used_cpu_user:11255.96
|
||||||
|
used_cpu_sys_children:1.75
|
||||||
|
used_cpu_user_children:1.91
|
||||||
|
`
|
Loading…
Reference in New Issue