151 lines
3.2 KiB
Go
151 lines
3.2 KiB
Go
package twemproxy
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"io/ioutil"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/influxdata/telegraf/plugins"
|
|
"github.com/influxdata/telegraf/plugins/inputs"
|
|
)
|
|
|
|
// Twemproxy is the input plugin configuration for a single Twemproxy
// stats endpoint.
type Twemproxy struct {
	// Addr is the "host:port" address dialed over TCP to fetch the stats;
	// it is also reported as the "twemproxy" tag on every metric.
	Addr string
	// Pools lists the top-level keys of the stats document that are
	// treated as pools and reported individually.
	Pools []string
}
|
|
|
|
// sampleConfig is the example configuration snippet returned by
// SampleConfig.
var sampleConfig = `
## Twemproxy stats address and port (no scheme)
addr = "localhost:22222"
## Monitor pool name
pools = ["redis_pool", "mc_pool"]
`
|
|
|
|
func (t *Twemproxy) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
func (t *Twemproxy) Description() string {
|
|
return "Read Twemproxy stats data"
|
|
}
|
|
|
|
// Gather data from all Twemproxy instances
|
|
func (t *Twemproxy) Gather(acc plugins.Accumulator) error {
|
|
conn, err := net.DialTimeout("tcp", t.Addr, 1*time.Second)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
body, err := ioutil.ReadAll(conn)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var stats map[string]interface{}
|
|
if err = json.Unmarshal(body, &stats); err != nil {
|
|
return errors.New("Error decoding JSON response")
|
|
}
|
|
|
|
tags := make(map[string]string)
|
|
tags["twemproxy"] = t.Addr
|
|
t.processStat(acc, tags, stats)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Process Twemproxy server stats
|
|
func (t *Twemproxy) processStat(
|
|
acc plugins.Accumulator,
|
|
tags map[string]string,
|
|
data map[string]interface{},
|
|
) {
|
|
if source, ok := data["source"]; ok {
|
|
if val, ok := source.(string); ok {
|
|
tags["source"] = val
|
|
}
|
|
}
|
|
|
|
fields := make(map[string]interface{})
|
|
metrics := []string{"total_connections", "curr_connections", "timestamp"}
|
|
for _, m := range metrics {
|
|
if value, ok := data[m]; ok {
|
|
if val, ok := value.(float64); ok {
|
|
fields[m] = val
|
|
}
|
|
}
|
|
}
|
|
acc.AddFields("twemproxy", fields, tags)
|
|
|
|
for _, pool := range t.Pools {
|
|
if poolStat, ok := data[pool]; ok {
|
|
if data, ok := poolStat.(map[string]interface{}); ok {
|
|
poolTags := copyTags(tags)
|
|
poolTags["pool"] = pool
|
|
t.processPool(acc, poolTags, data)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Process pool data in Twemproxy stats
|
|
func (t *Twemproxy) processPool(
|
|
acc plugins.Accumulator,
|
|
tags map[string]string,
|
|
data map[string]interface{},
|
|
) {
|
|
serverTags := make(map[string]map[string]string)
|
|
|
|
fields := make(map[string]interface{})
|
|
for key, value := range data {
|
|
switch key {
|
|
case "client_connections", "forward_error", "client_err", "server_ejects", "fragments", "client_eof":
|
|
if val, ok := value.(float64); ok {
|
|
fields[key] = val
|
|
}
|
|
default:
|
|
if data, ok := value.(map[string]interface{}); ok {
|
|
if _, ok := serverTags[key]; !ok {
|
|
serverTags[key] = copyTags(tags)
|
|
serverTags[key]["server"] = key
|
|
}
|
|
t.processServer(acc, serverTags[key], data)
|
|
}
|
|
}
|
|
}
|
|
acc.AddFields("twemproxy_pool", fields, tags)
|
|
}
|
|
|
|
// Process backend server(redis/memcached) stats
|
|
func (t *Twemproxy) processServer(
|
|
acc plugins.Accumulator,
|
|
tags map[string]string,
|
|
data map[string]interface{},
|
|
) {
|
|
fields := make(map[string]interface{})
|
|
for key, value := range data {
|
|
switch key {
|
|
default:
|
|
if val, ok := value.(float64); ok {
|
|
fields[key] = val
|
|
}
|
|
}
|
|
}
|
|
acc.AddFields("twemproxy_pool_server", fields, tags)
|
|
}
|
|
|
|
// copyTags returns a new, non-nil map containing every key/value pair of
// tags. A tag map is not expected to be mutated once it has been passed to
// the accumulator, so callers copy it before adding further tags.
func copyTags(tags map[string]string) map[string]string {
	dup := make(map[string]string, len(tags))
	for name, value := range tags {
		dup[name] = value
	}
	return dup
}
|
|
|
|
func init() {
|
|
inputs.Add("twemproxy", func() plugins.Input {
|
|
return &Twemproxy{}
|
|
})
|
|
}
|