diff --git a/plugins/all/all.go b/plugins/all/all.go index 977b980b8..37e7d27ea 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -28,6 +28,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/rethinkdb" _ "github.com/influxdb/telegraf/plugins/statsd" _ "github.com/influxdb/telegraf/plugins/system" + _ "github.com/influxdb/telegraf/plugins/twemproxy" _ "github.com/influxdb/telegraf/plugins/zfs" _ "github.com/influxdb/telegraf/plugins/zookeeper" ) diff --git a/plugins/twemproxy/twemproxy.go b/plugins/twemproxy/twemproxy.go new file mode 100644 index 000000000..ee794de73 --- /dev/null +++ b/plugins/twemproxy/twemproxy.go @@ -0,0 +1,182 @@ +package twemproxy + +import ( + "encoding/json" + "errors" + "io/ioutil" + "net" + "strings" + "sync" + "time" + + "github.com/influxdb/telegraf/plugins" +) + +type Twemproxy struct { + Instances []TwemproxyInstance +} + +type TwemproxyInstance struct { + StatsAddr string + Pools []string +} + +var sampleConfig = ` + # Twemproxy plugin config + [twemproxy] + [[twemproxy.instances]] + # Twemproxy stats address and port(NO scheme!) + statsAddr = "10.16.29.1:22222" + # Monitor pool name + pools = ["redis_pool", "mc_pool"] +` + +func (t *Twemproxy) SampleConfig() string { + return sampleConfig +} + +func (t *Twemproxy) Description() string { + return "Read Twemproxy stats data" +} + +// Gather data from all Twemproxy instances +func (t *Twemproxy) Gather(acc plugins.Accumulator) error { + var wg sync.WaitGroup + errorChan := make(chan error, len(t.Instances)) + for _, inst := range t.Instances { + wg.Add(1) + go func(inst TwemproxyInstance) { + defer wg.Done() + if err := inst.Gather(acc); err != nil { + errorChan <- err + } + }(inst) + } + wg.Wait() + + close(errorChan) + errs := []string{} + for err := range errorChan { + errs = append(errs, err.Error()) + } + if len(errs) == 0 { + return nil + } + return errors.New(strings.Join(errs, "\n")) +} + +// Gather data from one Twemproxy +func (ti *TwemproxyInstance) Gather( + acc plugins.Accumulator, +) error { + conn, err := net.DialTimeout("tcp", ti.StatsAddr, 1*time.Second) + if err != nil { + return err + } + body, err := ioutil.ReadAll(conn) + if err != nil { + return err + } + + var stats map[string]interface{} + if err = json.Unmarshal(body, &stats); err != nil { + return errors.New("Error decoding JSON response") + } + + tags := make(map[string]string) + tags["twemproxy"] = ti.StatsAddr + ti.processStat(acc, tags, stats) + + return nil +} + +// Process Twemproxy server stats +func (ti *TwemproxyInstance) processStat( + acc plugins.Accumulator, + tags map[string]string, + data map[string]interface{}, +) { + if source, ok := data["source"]; ok { + if val, ok := source.(string); ok { + tags["source"] = val + } + } + + metrics := []string{"total_connections", "curr_connections", "timestamp"} + for _, m := range metrics { + if value, ok := data[m]; ok { + if val, ok := value.(float64); ok { + acc.Add(m, val, tags) + } + } + } + + for _, pool := range ti.Pools { + if poolStat, ok := data[pool]; ok { + if data, ok := poolStat.(map[string]interface{}); ok { + poolTags := copyTags(tags) + poolTags["pool"] = pool + ti.processPool(acc, poolTags, pool+"_", data) + } + } + } +} + +// Process pool data in Twemproxy stats +func (ti *TwemproxyInstance) processPool( + acc plugins.Accumulator, + tags map[string]string, + prefix string, + data map[string]interface{}, +) { + serverTags := make(map[string]map[string]string) + + for key, value := range data { + switch key { + case "client_connections", "forward_error", "client_err", "server_ejects", "fragments", "client_eof": + if val, ok := value.(float64); ok { + acc.Add(prefix+key, val, tags) + } + default: + if data, ok := value.(map[string]interface{}); ok { + if _, ok := serverTags[key]; !ok { + serverTags[key] = copyTags(tags) + serverTags[key]["server"] = key + } + ti.processServer(acc, serverTags[key], prefix, data) + } + } + } +} + +// Process backend server(redis/memcached) stats +func (ti *TwemproxyInstance) processServer( + acc plugins.Accumulator, + tags map[string]string, + prefix string, + data map[string]interface{}, +) { + for key, value := range data { + switch key { + default: + if val, ok := value.(float64); ok { + acc.Add(prefix+key, val, tags) + } + } + } +} + +// Tags is not expected to be mutated after passing to Add. +func copyTags(tags map[string]string) map[string]string { + newTags := make(map[string]string) + for k, v := range tags { + newTags[k] = v + } + return newTags +} + +func init() { + plugins.Add("twemproxy", func() plugins.Plugin { + return &Twemproxy{} + }) +} diff --git a/plugins/twemproxy/twemproxy_test.go b/plugins/twemproxy/twemproxy_test.go new file mode 100644 index 000000000..887a770e4 --- /dev/null +++ b/plugins/twemproxy/twemproxy_test.go @@ -0,0 +1,134 @@ +package twemproxy + +import ( + "net" + "testing" + "encoding/json" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const sampleStatsAddr = "127.0.0.1:22222" + +const sampleStats = `{ + "total_connections": 276448, + "uptime": 160657, + "version": "0.4.1", + "service": "nutcracker", + "curr_connections": 1322, + "source": "server1.website.com", + "demo": { + "client_connections": 1305, + "forward_error": 11684, + "client_err": 147942, + "server_ejects": 0, + "fragments": 0, + "client_eof": 126813, + "10.16.29.1:6379": { + "requests": 43604566, + "server_eof": 0, + "out_queue": 0, + "server_err": 0, + "out_queue_bytes": 0, + "in_queue": 0, + "server_timedout": 24, + "request_bytes": 2775840400, + "server_connections": 1, + "response_bytes": 7663182096, + "in_queue_bytes": 0, + "server_ejected_at": 0, + "responses": 43603900 + }, + "10.16.29.2:6379": { + "requests": 37870211, + "server_eof": 0, + "out_queue": 0, + "server_err": 0, + "out_queue_bytes": 0, + "in_queue": 0, + "server_timedout": 25, + "request_bytes": 2412114759, + "server_connections": 1, + "response_bytes": 5228980582, + "in_queue_bytes": 0, + "server_ejected_at": 0, + "responses": 37869551 + } + }, + "timestamp": 1447312436 +}` + +func mockTwemproxyServer() (net.Listener, error) { + listener, err := net.Listen("tcp", sampleStatsAddr) + if err != nil { + return nil, err + } + go func(l net.Listener) { + for { + conn, _ := l.Accept() + conn.Write([]byte(sampleStats)) + conn.Close() + break + } + }(listener) + + return listener, nil +} + +func TestGather(t *testing.T) { + mockServer, err := mockTwemproxyServer() + if err != nil { + panic(err) + } + defer mockServer.Close() + + twemproxy := &Twemproxy{ + Instances: []TwemproxyInstance{ + TwemproxyInstance{ + StatsAddr: sampleStatsAddr, + Pools: []string{"demo"}, + }, + }, + } + + var acc testutil.Accumulator + err = twemproxy.Instances[0].Gather(&acc) + require.NoError(t, err) + + var sourceData map[string]interface{} + if err := json.Unmarshal([]byte(sampleStats), &sourceData); err != nil { + panic(err) + } + + metrics := []string{"total_connections", "curr_connections", "timestamp"} + tags := map[string]string{ + "twemproxy": sampleStatsAddr, + "source": sourceData["source"].(string), + } + for _, m := range metrics { + assert.NoError(t, acc.ValidateTaggedValue(m, sourceData[m].(float64), tags)) + } + + poolName := "demo" + poolMetrics := []string{ + "client_connections", "forward_error", "client_err", "server_ejects", + "fragments", "client_eof", + } + tags["pool"] = poolName + poolData := sourceData[poolName].(map[string]interface{}) + for _, m := range poolMetrics { + measurement := poolName + "_" + m + assert.NoError(t, acc.ValidateTaggedValue(measurement, poolData[m].(float64), tags)) + } + poolServers := []string{"10.16.29.1:6379", "10.16.29.2:6379"} + for _, s := range poolServers { + tags["server"] = s + serverData := poolData[s].(map[string]interface{}) + for k, v := range serverData { + measurement := poolName + "_" + k + assert.NoError(t, acc.ValidateTaggedValue(measurement, v, tags)) + } + } +}