From 49c1bf3ef6fa0d20c20b0a521cef816e6cc0b673 Mon Sep 17 00:00:00 2001 From: Codeb Fan Date: Fri, 13 Nov 2015 00:16:49 +0800 Subject: [PATCH 1/5] Add plugin for Twemproxy This plugin collects data from Twemproxy's stats interface --- plugins/all/all.go | 1 + plugins/twemproxy/twemproxy.go | 182 ++++++++++++++++++++++++++++ plugins/twemproxy/twemproxy_test.go | 134 ++++++++++++++++++++ 3 files changed, 317 insertions(+) create mode 100644 plugins/twemproxy/twemproxy.go create mode 100644 plugins/twemproxy/twemproxy_test.go diff --git a/plugins/all/all.go b/plugins/all/all.go index 977b980b8..37e7d27ea 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -28,6 +28,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/rethinkdb" _ "github.com/influxdb/telegraf/plugins/statsd" _ "github.com/influxdb/telegraf/plugins/system" + _ "github.com/influxdb/telegraf/plugins/twemproxy" _ "github.com/influxdb/telegraf/plugins/zfs" _ "github.com/influxdb/telegraf/plugins/zookeeper" ) diff --git a/plugins/twemproxy/twemproxy.go b/plugins/twemproxy/twemproxy.go new file mode 100644 index 000000000..ee794de73 --- /dev/null +++ b/plugins/twemproxy/twemproxy.go @@ -0,0 +1,182 @@ +package twemproxy + +import ( + "encoding/json" + "errors" + "io/ioutil" + "net" + "strings" + "sync" + "time" + + "github.com/influxdb/telegraf/plugins" +) + +type Twemproxy struct { + Instances []TwemproxyInstance +} + +type TwemproxyInstance struct { + StatsAddr string + Pools []string +} + +var sampleConfig = ` + # Twemproxy plugin config + [twemproxy] + [[twemproxy.instances]] + # Twemproxy stats address and port(NO scheme!) + statsAddr = "10.16.29.1:22222" + # Monitor pool name + pools = ["redis_pool", "mc_pool"] +` + +func (t *Twemproxy) SampleConfig() string { + return sampleConfig +} + +func (t *Twemproxy) Description() string { + return "Read Twemproxy stats data" +} + +// Gather data from all Twemproxy instances +func (t *Twemproxy) Gather(acc plugins.Accumulator) error { + var wg sync.WaitGroup + errorChan := make(chan error, len(t.Instances)) + for _, inst := range t.Instances { + wg.Add(1) + go func(inst TwemproxyInstance) { + defer wg.Done() + if err := inst.Gather(acc); err != nil { + errorChan <- err + } + }(inst) + } + wg.Wait() + + close(errorChan) + errs := []string{} + for err := range errorChan { + errs = append(errs, err.Error()) + } + if len(errs) == 0 { + return nil + } + return errors.New(strings.Join(errs, "\n")) +} + +// Gather data from one Twemproxy +func (ti *TwemproxyInstance) Gather( + acc plugins.Accumulator, +) error { + conn, err := net.DialTimeout("tcp", ti.StatsAddr, 1*time.Second) + if err != nil { + return err + } + body, err := ioutil.ReadAll(conn) + if err != nil { + return err + } + + var stats map[string]interface{} + if err = json.Unmarshal(body, &stats); err != nil { + return errors.New("Error decoding JSON response") + } + + tags := make(map[string]string) + tags["twemproxy"] = ti.StatsAddr + ti.processStat(acc, tags, stats) + + return nil +} + +// Process Twemproxy server stats +func (ti *TwemproxyInstance) processStat( + acc plugins.Accumulator, + tags map[string]string, + data map[string]interface{}, +) { + if source, ok := data["source"]; ok { + if val, ok := source.(string); ok { + tags["source"] = val + } + } + + metrics := []string{"total_connections", "curr_connections", "timestamp"} + for _, m := range metrics { + if value, ok := data[m]; ok { + if val, ok := value.(float64); ok { + acc.Add(m, val, tags) + } + } + } + + for _, pool := range ti.Pools { + if poolStat, ok := data[pool]; ok { + if data, ok := poolStat.(map[string]interface{}); ok { + poolTags := copyTags(tags) + poolTags["pool"] = pool + ti.processPool(acc, poolTags, pool+"_", data) + } + } + } +} + +// Process pool data in Twemproxy stats +func (ti *TwemproxyInstance) processPool( + acc plugins.Accumulator, + tags map[string]string, + prefix string, + data map[string]interface{}, +) { + serverTags := make(map[string]map[string]string) + + for key, value := range data { + switch key { + case "client_connections", "forward_error", "client_err", "server_ejects", "fragments", "client_eof": + if val, ok := value.(float64); ok { + acc.Add(prefix+key, val, tags) + } + default: + if data, ok := value.(map[string]interface{}); ok { + if _, ok := serverTags[key]; !ok { + serverTags[key] = copyTags(tags) + serverTags[key]["server"] = key + } + ti.processServer(acc, serverTags[key], prefix, data) + } + } + } +} + +// Process backend server(redis/memcached) stats +func (ti *TwemproxyInstance) processServer( + acc plugins.Accumulator, + tags map[string]string, + prefix string, + data map[string]interface{}, +) { + for key, value := range data { + switch key { + default: + if val, ok := value.(float64); ok { + acc.Add(prefix+key, val, tags) + } + } + } +} + +// Tags is not expected to be mutated after passing to Add. +func copyTags(tags map[string]string) map[string]string { + newTags := make(map[string]string) + for k, v := range tags { + newTags[k] = v + } + return newTags +} + +func init() { + plugins.Add("twemproxy", func() plugins.Plugin { + return &Twemproxy{} + }) +} diff --git a/plugins/twemproxy/twemproxy_test.go b/plugins/twemproxy/twemproxy_test.go new file mode 100644 index 000000000..887a770e4 --- /dev/null +++ b/plugins/twemproxy/twemproxy_test.go @@ -0,0 +1,134 @@ +package twemproxy + +import ( + "net" + "testing" + "encoding/json" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const sampleStatsAddr = "127.0.0.1:22222" + +const sampleStats = `{ + "total_connections": 276448, + "uptime": 160657, + "version": "0.4.1", + "service": "nutcracker", + "curr_connections": 1322, + "source": "server1.website.com", + "demo": { + "client_connections": 1305, + "forward_error": 11684, + "client_err": 147942, + "server_ejects": 0, + "fragments": 0, + "client_eof": 126813, + "10.16.29.1:6379": { + "requests": 43604566, + "server_eof": 0, + "out_queue": 0, + "server_err": 0, + "out_queue_bytes": 0, + "in_queue": 0, + "server_timedout": 24, + "request_bytes": 2775840400, + "server_connections": 1, + "response_bytes": 7663182096, + "in_queue_bytes": 0, + "server_ejected_at": 0, + "responses": 43603900 + }, + "10.16.29.2:6379": { + "requests": 37870211, + "server_eof": 0, + "out_queue": 0, + "server_err": 0, + "out_queue_bytes": 0, + "in_queue": 0, + "server_timedout": 25, + "request_bytes": 2412114759, + "server_connections": 1, + "response_bytes": 5228980582, + "in_queue_bytes": 0, + "server_ejected_at": 0, + "responses": 37869551 + } + }, + "timestamp": 1447312436 +}` + +func mockTwemproxyServer() (net.Listener, error) { + listener, err := net.Listen("tcp", sampleStatsAddr) + if err != nil { + return nil, err + } + go func(l net.Listener) { + for { + conn, _ := l.Accept() + conn.Write([]byte(sampleStats)) + conn.Close() + break + } + }(listener) + + return listener, nil +} + +func TestGather(t *testing.T) { + mockServer, err := mockTwemproxyServer() + if err != nil { + panic(err) + } + defer mockServer.Close() + + twemproxy := &Twemproxy{ + Instances: []TwemproxyInstance{ + TwemproxyInstance{ + StatsAddr: sampleStatsAddr, + Pools: []string{"demo"}, + }, + }, + } + + var acc testutil.Accumulator + err = twemproxy.Instances[0].Gather(&acc) + require.NoError(t, err) + + var sourceData map[string]interface{} + if err := json.Unmarshal([]byte(sampleStats), &sourceData); err != nil { + panic(err) + } + + metrics := []string{"total_connections", "curr_connections", "timestamp"} + tags := map[string]string{ + "twemproxy": sampleStatsAddr, + "source": sourceData["source"].(string), + } + for _, m := range metrics { + assert.NoError(t, acc.ValidateTaggedValue(m, sourceData[m].(float64), tags)) + } + + poolName := "demo" + poolMetrics := []string{ + "client_connections", "forward_error", "client_err", "server_ejects", + "fragments", "client_eof", + } + tags["pool"] = poolName + poolData := sourceData[poolName].(map[string]interface{}) + for _, m := range poolMetrics { + measurement := poolName + "_" + m + assert.NoError(t, acc.ValidateTaggedValue(measurement, poolData[m].(float64), tags)) + } + poolServers := []string{"10.16.29.1:6379", "10.16.29.2:6379"} + for _, s := range poolServers { + tags["server"] = s + serverData := poolData[s].(map[string]interface{}) + for k, v := range serverData { + measurement := poolName + "_" + k + assert.NoError(t, acc.ValidateTaggedValue(measurement, v, tags)) + } + } +} From 2c01aada2da4afd9b65a6f12589e7c2b956b1c62 Mon Sep 17 00:00:00 2001 From: Codeb Fan Date: Fri, 13 Nov 2015 00:25:42 +0800 Subject: [PATCH 2/5] gofmt twemproxy_test.go --- plugins/twemproxy/twemproxy_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/twemproxy/twemproxy_test.go b/plugins/twemproxy/twemproxy_test.go index 887a770e4..d89ccab9e 100644 --- a/plugins/twemproxy/twemproxy_test.go +++ b/plugins/twemproxy/twemproxy_test.go @@ -1,9 +1,9 @@ package twemproxy import ( + "encoding/json" "net" "testing" - "encoding/json" "github.com/influxdb/telegraf/testutil" "github.com/stretchr/testify/assert" From 7e4c69f923268f485487658fe3492eb5fa89177b Mon Sep 17 00:00:00 2001 From: Codeb Fan Date: Fri, 13 Nov 2015 00:38:45 +0800 Subject: [PATCH 3/5] Rename `TwemproxyInstance.StatsAddr` to `TwemproxyInstance.Addr` --- plugins/twemproxy/twemproxy.go | 10 +++++----- plugins/twemproxy/twemproxy_test.go | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/plugins/twemproxy/twemproxy.go b/plugins/twemproxy/twemproxy.go index ee794de73..743daa190 100644 --- a/plugins/twemproxy/twemproxy.go +++ b/plugins/twemproxy/twemproxy.go @@ -17,8 +17,8 @@ type Twemproxy struct { } type TwemproxyInstance struct { - StatsAddr string - Pools []string + Addr string + Pools []string } var sampleConfig = ` @@ -26,7 +26,7 @@ var sampleConfig = ` [twemproxy] [[twemproxy.instances]] # Twemproxy stats address and port(NO scheme!) - statsAddr = "10.16.29.1:22222" + addr = "10.16.29.1:22222" # Monitor pool name pools = ["redis_pool", "mc_pool"] ` @@ -69,7 +69,7 @@ func (t *Twemproxy) Gather(acc plugins.Accumulator) error { func (ti *TwemproxyInstance) Gather( acc plugins.Accumulator, ) error { - conn, err := net.DialTimeout("tcp", ti.StatsAddr, 1*time.Second) + conn, err := net.DialTimeout("tcp", ti.Addr, 1*time.Second) if err != nil { return err } @@ -84,7 +84,7 @@ func (ti *TwemproxyInstance) Gather( } tags := make(map[string]string) - tags["twemproxy"] = ti.StatsAddr + tags["twemproxy"] = ti.Addr ti.processStat(acc, tags, stats) return nil diff --git a/plugins/twemproxy/twemproxy_test.go b/plugins/twemproxy/twemproxy_test.go index d89ccab9e..c941cc197 100644 --- a/plugins/twemproxy/twemproxy_test.go +++ b/plugins/twemproxy/twemproxy_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/require" ) -const sampleStatsAddr = "127.0.0.1:22222" +const sampleAddr = "127.0.0.1:22222" const sampleStats = `{ "total_connections": 276448, @@ -61,7 +61,7 @@ const sampleStats = `{ }` func mockTwemproxyServer() (net.Listener, error) { - listener, err := net.Listen("tcp", sampleStatsAddr) + listener, err := net.Listen("tcp", sampleAddr) if err != nil { return nil, err } @@ -87,8 +87,8 @@ func TestGather(t *testing.T) { twemproxy := &Twemproxy{ Instances: []TwemproxyInstance{ TwemproxyInstance{ - StatsAddr: sampleStatsAddr, - Pools: []string{"demo"}, + Addr: sampleAddr, + Pools: []string{"demo"}, }, }, } @@ -104,7 +104,7 @@ func TestGather(t *testing.T) { metrics := []string{"total_connections", "curr_connections", "timestamp"} tags := map[string]string{ - "twemproxy": sampleStatsAddr, + "twemproxy": sampleAddr, "source": sourceData["source"].(string), } for _, m := range metrics { From 5be3dd39b03af16f68dcf197fffe96ed14014b47 Mon Sep 17 00:00:00 2001 From: Codeb Fan Date: Fri, 13 Nov 2015 00:57:43 +0800 Subject: [PATCH 4/5] Fix dumplicated table in sample config --- plugins/twemproxy/twemproxy.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/plugins/twemproxy/twemproxy.go b/plugins/twemproxy/twemproxy.go index 743daa190..718da1185 100644 --- a/plugins/twemproxy/twemproxy.go +++ b/plugins/twemproxy/twemproxy.go @@ -22,13 +22,11 @@ type TwemproxyInstance struct { } var sampleConfig = ` - # Twemproxy plugin config - [twemproxy] - [[twemproxy.instances]] - # Twemproxy stats address and port(NO scheme!) - addr = "10.16.29.1:22222" - # Monitor pool name - pools = ["redis_pool", "mc_pool"] + [[twemproxy.instances]] + # Twemproxy stats address and port (no scheme) + addr = "10.16.29.1:22222" + # Monitor pool name + pools = ["redis_pool", "mc_pool"] ` func (t *Twemproxy) SampleConfig() string { From f7cea58fb83b211487998147ae63178bde6accb3 Mon Sep 17 00:00:00 2001 From: Codeb Fan Date: Fri, 13 Nov 2015 11:14:44 +0800 Subject: [PATCH 5/5] Update README and CHANGELOG --- CHANGELOG.md | 1 + README.md | 1 + 2 files changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c2142857..400dd69e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ changed to just run docker commands in the Makefile. See `make docker-run` and - [#338](https://github.com/influxdb/telegraf/pull/338): Restart Telegraf on package upgrade. Thanks @linsomniac! - [#337](https://github.com/influxdb/telegraf/pull/337): Jolokia plugin, thanks @saiello! - [#350](https://github.com/influxdb/telegraf/pull/350): Amon output. +- [#365](https://github.com/influxdb/telegraf/pull/365): Twemproxy plugin by @codeb2cc - [#317](https://github.com/influxdb/telegraf/issues/317): ZFS plugin, thanks @cornerot! ### Bugfixes diff --git a/README.md b/README.md index 69378adbb..e25542de8 100644 --- a/README.md +++ b/README.md @@ -190,6 +190,7 @@ Telegraf currently has support for collecting metrics from: * rabbitmq * redis * rethinkdb +* twemproxy * zfs * zookeeper * system