diff --git a/CHANGELOG.md b/CHANGELOG.md index 18a68ebf7..080ce5870 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,10 +6,17 @@ - [#475](https://github.com/influxdata/telegraf/pull/475): Add response time to httpjson plugin. Thanks @titilambert! - [#519](https://github.com/influxdata/telegraf/pull/519): Added a sensors input based on lm-sensors. Thanks @md14454! - [#467](https://github.com/influxdata/telegraf/issues/467): Add option to disable statsd measurement name conversion. +- [#534](https://github.com/influxdata/telegraf/pull/534): NSQ input plugin. Thanks @allingeek! +- [#494](https://github.com/influxdata/telegraf/pull/494): Graphite output plugin. Thanks @titilambert! +- AMQP SSL support. Thanks @ekini! +- [#539](https://github.com/influxdata/telegraf/pull/539): Reload config on SIGHUP. Thanks @titilambert! +- [#522](https://github.com/influxdata/telegraf/pull/522): Phusion passenger input plugin. Thanks @kureikain! +- [#541](https://github.com/influxdata/telegraf/pull/541): Kafka output TLS cert support. Thanks @Ormod! ### Bugfixes - [#506](https://github.com/influxdb/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert! - [#508](https://github.com/influxdb/telegraf/pull/508): Fix prometheus cardinality issue with the `net` plugin +- [#499](https://github.com/influxdata/telegraf/issues/499) & [#502](https://github.com/influxdata/telegraf/issues/502): php fpm unix socket and other fixes, thanks @kureikain! ## v0.10.0 [2016-01-12] diff --git a/Godeps b/Godeps index 1b427674a..0b8b9ceb1 100644 --- a/Godeps +++ b/Godeps @@ -46,6 +46,7 @@ github.com/wvanbergen/kafka 1a8639a45164fcc245d5c7b4bd3ccfbd1a0ffbf3 github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8 golang.org/x/crypto 3760e016850398b85094c4c99e955b8c3dea5711 golang.org/x/net 99ca920b6037ef77af8a11297150f7f0d8f4ef80 +golang.org/x/text cf4986612c83df6c55578ba198316d1684a9a287 gopkg.in/dancannon/gorethink.v1 e2cef022d0495329dfb0635991de76efcab5cf50 gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715 gopkg.in/mgo.v2 e30de8ac9ae3b30df7065f766c71f88bba7d4e49 diff --git a/README.md b/README.md index 6b723787e..840fb5e72 100644 --- a/README.md +++ b/README.md @@ -152,7 +152,9 @@ Currently implemented sources: * mongodb * mysql * nginx +* nsq * phpfpm +* phusion passenger * ping * postgresql * procstat @@ -188,6 +190,7 @@ want to add support for another service or third-party API. * amon * amqp * datadog +* graphite * kafka * amazon kinesis * librato diff --git a/agent.go b/agent.go index 0c5d58db5..25fd46462 100644 --- a/agent.go +++ b/agent.go @@ -58,7 +58,7 @@ func (a *Agent) Connect() error { } err := o.Output.Connect() if err != nil { - log.Printf("Failed to connect to output %s, retrying in 15s\n", o.Name) + log.Printf("Failed to connect to output %s, retrying in 15s, error was '%s' \n", o.Name, err) time.Sleep(15 * time.Second) err = o.Output.Connect() if err != nil { diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index a2b5161be..47213e0e0 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -7,6 +7,7 @@ import ( "os" "os/signal" "strings" + "syscall" "github.com/influxdb/telegraf" "github.com/influxdb/telegraf/internal/config" @@ -82,143 +83,156 @@ Examples: ` func main() { - flag.Usage = usageExit - flag.Parse() + reload := make(chan bool, 1) + reload <- true + for <-reload { + reload <- false + flag.Usage = usageExit + flag.Parse() - if flag.NFlag() == 0 { - usageExit() - } + if flag.NFlag() == 0 { + usageExit() + } - var inputFilters []string - if *fInputFiltersLegacy != "" { - inputFilter := strings.TrimSpace(*fInputFiltersLegacy) - inputFilters = strings.Split(":"+inputFilter+":", ":") - } - if *fInputFilters != "" { - inputFilter := strings.TrimSpace(*fInputFilters) - inputFilters = strings.Split(":"+inputFilter+":", ":") - } + var inputFilters []string + if *fInputFiltersLegacy != "" { + inputFilter := strings.TrimSpace(*fInputFiltersLegacy) + inputFilters = strings.Split(":"+inputFilter+":", ":") + } + if *fInputFilters != "" { + inputFilter := strings.TrimSpace(*fInputFilters) + inputFilters = strings.Split(":"+inputFilter+":", ":") + } - var outputFilters []string - if *fOutputFiltersLegacy != "" { - outputFilter := strings.TrimSpace(*fOutputFiltersLegacy) - outputFilters = strings.Split(":"+outputFilter+":", ":") - } - if *fOutputFilters != "" { - outputFilter := strings.TrimSpace(*fOutputFilters) - outputFilters = strings.Split(":"+outputFilter+":", ":") - } + var outputFilters []string + if *fOutputFiltersLegacy != "" { + outputFilter := strings.TrimSpace(*fOutputFiltersLegacy) + outputFilters = strings.Split(":"+outputFilter+":", ":") + } + if *fOutputFilters != "" { + outputFilter := strings.TrimSpace(*fOutputFilters) + outputFilters = strings.Split(":"+outputFilter+":", ":") + } - if *fVersion { - v := fmt.Sprintf("Telegraf - Version %s", Version) - fmt.Println(v) - return - } + if *fVersion { + v := fmt.Sprintf("Telegraf - Version %s", Version) + fmt.Println(v) + return + } - if *fSampleConfig { - config.PrintSampleConfig(inputFilters, outputFilters) - return - } + if *fSampleConfig { + config.PrintSampleConfig(inputFilters, outputFilters) + return + } - if *fUsage != "" { - if err := config.PrintInputConfig(*fUsage); err != nil { - if err2 := config.PrintOutputConfig(*fUsage); err2 != nil { - log.Fatalf("%s and %s", err, err2) + if *fUsage != "" { + if err := config.PrintInputConfig(*fUsage); err != nil { + if err2 := config.PrintOutputConfig(*fUsage); err2 != nil { + log.Fatalf("%s and %s", err, err2) + } + } + return + } + + var ( + c *config.Config + err error + ) + + if *fConfig != "" { + c = config.NewConfig() + c.OutputFilters = outputFilters + c.InputFilters = inputFilters + err = c.LoadConfig(*fConfig) + if err != nil { + log.Fatal(err) + } + } else { + fmt.Println("Usage: Telegraf") + flag.PrintDefaults() + return + } + + if *fConfigDirectoryLegacy != "" { + err = c.LoadDirectory(*fConfigDirectoryLegacy) + if err != nil { + log.Fatal(err) } } - return - } - var ( - c *config.Config - err error - ) + if *fConfigDirectory != "" { + err = c.LoadDirectory(*fConfigDirectory) + if err != nil { + log.Fatal(err) + } + } + if len(c.Outputs) == 0 { + log.Fatalf("Error: no outputs found, did you provide a valid config file?") + } + if len(c.Inputs) == 0 { + log.Fatalf("Error: no plugins found, did you provide a valid config file?") + } - if *fConfig != "" { - c = config.NewConfig() - c.OutputFilters = outputFilters - c.InputFilters = inputFilters - err = c.LoadConfig(*fConfig) + ag, err := telegraf.NewAgent(c) if err != nil { log.Fatal(err) } - } else { - fmt.Println("Usage: Telegraf") - flag.PrintDefaults() - return - } - if *fConfigDirectoryLegacy != "" { - err = c.LoadDirectory(*fConfigDirectoryLegacy) + if *fDebug { + ag.Config.Agent.Debug = true + } + + if *fQuiet { + ag.Config.Agent.Quiet = true + } + + if *fTest { + err = ag.Test() + if err != nil { + log.Fatal(err) + } + return + } + + err = ag.Connect() if err != nil { log.Fatal(err) } - } - if *fConfigDirectory != "" { - err = c.LoadDirectory(*fConfigDirectory) - if err != nil { - log.Fatal(err) - } - } - if len(c.Outputs) == 0 { - log.Fatalf("Error: no outputs found, did you provide a valid config file?") - } - if len(c.Inputs) == 0 { - log.Fatalf("Error: no plugins found, did you provide a valid config file?") - } + shutdown := make(chan struct{}) + signals := make(chan os.Signal) + signal.Notify(signals, os.Interrupt, syscall.SIGHUP) + go func() { + sig := <-signals + if sig == os.Interrupt { + close(shutdown) + } + if sig == syscall.SIGHUP { + log.Printf("Reloading Telegraf config\n") + <-reload + reload <- true + close(shutdown) + } + }() - ag, err := telegraf.NewAgent(c) - if err != nil { - log.Fatal(err) - } + log.Printf("Starting Telegraf (version %s)\n", Version) + log.Printf("Loaded outputs: %s", strings.Join(c.OutputNames(), " ")) + log.Printf("Loaded plugins: %s", strings.Join(c.InputNames(), " ")) + log.Printf("Tags enabled: %s", c.ListTags()) - if *fDebug { - ag.Config.Agent.Debug = true - } + if *fPidfile != "" { + f, err := os.Create(*fPidfile) + if err != nil { + log.Fatalf("Unable to create pidfile: %s", err) + } - if *fQuiet { - ag.Config.Agent.Quiet = true - } + fmt.Fprintf(f, "%d\n", os.Getpid()) - if *fTest { - err = ag.Test() - if err != nil { - log.Fatal(err) - } - return - } - - err = ag.Connect() - if err != nil { - log.Fatal(err) - } - - shutdown := make(chan struct{}) - signals := make(chan os.Signal) - signal.Notify(signals, os.Interrupt) - go func() { - <-signals - close(shutdown) - }() - - log.Printf("Starting Telegraf (version %s)\n", Version) - log.Printf("Loaded outputs: %s", strings.Join(c.OutputNames(), " ")) - log.Printf("Loaded plugins: %s", strings.Join(c.InputNames(), " ")) - log.Printf("Tags enabled: %s", c.ListTags()) - - if *fPidfile != "" { - f, err := os.Create(*fPidfile) - if err != nil { - log.Fatalf("Unable to create pidfile: %s", err) + f.Close() } - fmt.Fprintf(f, "%d\n", os.Getpid()) - - f.Close() + ag.Run(shutdown) } - - ag.Run(shutdown) } func usageExit() { diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index b6b1e74da..c9e8ea4c8 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -19,6 +19,8 @@ import ( _ "github.com/influxdb/telegraf/plugins/inputs/mongodb" _ "github.com/influxdb/telegraf/plugins/inputs/mysql" _ "github.com/influxdb/telegraf/plugins/inputs/nginx" + _ "github.com/influxdb/telegraf/plugins/inputs/nsq" + _ "github.com/influxdb/telegraf/plugins/inputs/passenger" _ "github.com/influxdb/telegraf/plugins/inputs/phpfpm" _ "github.com/influxdb/telegraf/plugins/inputs/ping" _ "github.com/influxdb/telegraf/plugins/inputs/postgresql" diff --git a/plugins/inputs/nsq/nsq.go b/plugins/inputs/nsq/nsq.go new file mode 100644 index 000000000..48a709a37 --- /dev/null +++ b/plugins/inputs/nsq/nsq.go @@ -0,0 +1,271 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Jeff Nickoloff (jeff@allingeek.com) +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package nsq + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "strconv" + "sync" + "time" + + "github.com/influxdb/telegraf/plugins/inputs" +) + +// Might add Lookupd endpoints for cluster discovery +type NSQ struct { + Endpoints []string +} + +var sampleConfig = ` + # An array of NSQD HTTP API endpoints + endpoints = ["http://localhost:4151"] +` + +const ( + requestPattern = `%s/stats?format=json` +) + +func init() { + inputs.Add("nsq", func() inputs.Input { + return &NSQ{} + }) +} + +func (n *NSQ) SampleConfig() string { + return sampleConfig +} + +func (n *NSQ) Description() string { + return "Read NSQ topic and channel statistics." +} + +func (n *NSQ) Gather(acc inputs.Accumulator) error { + var wg sync.WaitGroup + var outerr error + + for _, e := range n.Endpoints { + wg.Add(1) + go func(e string) { + defer wg.Done() + outerr = n.gatherEndpoint(e, acc) + }(e) + } + + wg.Wait() + + return outerr +} + +var tr = &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), +} + +var client = &http.Client{Transport: tr} + +func (n *NSQ) gatherEndpoint(e string, acc inputs.Accumulator) error { + u, err := buildURL(e) + if err != nil { + return err + } + r, err := client.Get(u.String()) + if err != nil { + return fmt.Errorf("Error while polling %s: %s", u.String(), err) + } + defer r.Body.Close() + + if r.StatusCode != http.StatusOK { + return fmt.Errorf("%s returned HTTP status %s", u.String(), r.Status) + } + + s := &NSQStats{} + err = json.NewDecoder(r.Body).Decode(s) + if err != nil { + return fmt.Errorf(`Error parsing response: %s`, err) + } + + tags := map[string]string{ + `server_host`: u.Host, + `server_version`: s.Data.Version, + } + + fields := make(map[string]interface{}) + if s.Data.Health == `OK` { + fields["server_count"] = int64(1) + } else { + fields["server_count"] = int64(0) + } + fields["topic_count"] = int64(len(s.Data.Topics)) + + acc.AddFields("nsq_server", fields, tags) + for _, t := range s.Data.Topics { + topicStats(t, acc, u.Host, s.Data.Version) + } + + return nil +} + +func buildURL(e string) (*url.URL, error) { + u := fmt.Sprintf(requestPattern, e) + addr, err := url.Parse(u) + if err != nil { + return nil, fmt.Errorf("Unable to parse address '%s': %s", u, err) + } + return addr, nil +} + +func topicStats(t TopicStats, acc inputs.Accumulator, host, version string) { + // per topic overall (tag: name, paused, channel count) + tags := map[string]string{ + "server_host": host, + "server_version": version, + "topic": t.Name, + } + + fields := map[string]interface{}{ + "depth": t.Depth, + "backend_depth": t.BackendDepth, + "message_count": t.MessageCount, + "channel_count": int64(len(t.Channels)), + } + acc.AddFields("nsq_topic", fields, tags) + + for _, c := range t.Channels { + channelStats(c, acc, host, version, t.Name) + } +} + +func channelStats(c ChannelStats, acc inputs.Accumulator, host, version, topic string) { + tags := map[string]string{ + "server_host": host, + "server_version": version, + "topic": topic, + "channel": c.Name, + } + + fields := map[string]interface{}{ + "depth": c.Depth, + "backend_depth": c.BackendDepth, + "inflight_count": c.InFlightCount, + "deferred_count": c.DeferredCount, + "message_count": c.MessageCount, + "requeue_count": c.RequeueCount, + "timeout_count": c.TimeoutCount, + "client_count": int64(len(c.Clients)), + } + + acc.AddFields("nsq_channel", fields, tags) + for _, cl := range c.Clients { + clientStats(cl, acc, host, version, topic, c.Name) + } +} + +func clientStats(c ClientStats, acc inputs.Accumulator, host, version, topic, channel string) { + tags := map[string]string{ + "server_host": host, + "server_version": version, + "topic": topic, + "channel": channel, + "client_name": c.Name, + "client_id": c.ID, + "client_hostname": c.Hostname, + "client_version": c.Version, + "client_address": c.RemoteAddress, + "client_user_agent": c.UserAgent, + "client_tls": strconv.FormatBool(c.TLS), + "client_snappy": strconv.FormatBool(c.Snappy), + "client_deflate": strconv.FormatBool(c.Deflate), + } + + fields := map[string]interface{}{ + "ready_count": c.ReadyCount, + "inflight_count": c.InFlightCount, + "message_count": c.MessageCount, + "finish_count": c.FinishCount, + "requeue_count": c.RequeueCount, + } + acc.AddFields("nsq_client", fields, tags) +} + +type NSQStats struct { + Code int64 `json:"status_code"` + Txt string `json:"status_txt"` + Data NSQStatsData `json:"data"` +} + +type NSQStatsData struct { + Version string `json:"version"` + Health string `json:"health"` + StartTime int64 `json:"start_time"` + Topics []TopicStats `json:"topics"` +} + +// e2e_processing_latency is not modeled +type TopicStats struct { + Name string `json:"topic_name"` + Depth int64 `json:"depth"` + BackendDepth int64 `json:"backend_depth"` + MessageCount int64 `json:"message_count"` + Paused bool `json:"paused"` + Channels []ChannelStats `json:"channels"` +} + +// e2e_processing_latency is not modeled +type ChannelStats struct { + Name string `json:"channel_name"` + Depth int64 `json:"depth"` + BackendDepth int64 `json:"backend_depth"` + InFlightCount int64 `json:"in_flight_count"` + DeferredCount int64 `json:"deferred_count"` + MessageCount int64 `json:"message_count"` + RequeueCount int64 `json:"requeue_count"` + TimeoutCount int64 `json:"timeout_count"` + Paused bool `json:"paused"` + Clients []ClientStats `json:"clients"` +} + +type ClientStats struct { + Name string `json:"name"` + ID string `json:"client_id"` + Hostname string `json:"hostname"` + Version string `json:"version"` + RemoteAddress string `json:"remote_address"` + State int64 `json:"state"` + ReadyCount int64 `json:"ready_count"` + InFlightCount int64 `json:"in_flight_count"` + MessageCount int64 `json:"message_count"` + FinishCount int64 `json:"finish_count"` + RequeueCount int64 `json:"requeue_count"` + ConnectTime int64 `json:"connect_ts"` + SampleRate int64 `json:"sample_rate"` + Deflate bool `json:"deflate"` + Snappy bool `json:"snappy"` + UserAgent string `json:"user_agent"` + TLS bool `json:"tls"` + TLSCipherSuite string `json:"tls_cipher_suite"` + TLSVersion string `json:"tls_version"` + TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` + TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"` +} diff --git a/plugins/inputs/nsq/nsq_test.go b/plugins/inputs/nsq/nsq_test.go new file mode 100644 index 000000000..fc34a710b --- /dev/null +++ b/plugins/inputs/nsq/nsq_test.go @@ -0,0 +1,273 @@ +package nsq + +import ( + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/influxdb/telegraf/testutil" + + "github.com/stretchr/testify/require" +) + +func TestNSQStats(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w, response) + })) + defer ts.Close() + + n := &NSQ{ + Endpoints: []string{ts.URL}, + } + + var acc testutil.Accumulator + err := n.Gather(&acc) + require.NoError(t, err) + + u, err := url.Parse(ts.URL) + require.NoError(t, err) + host := u.Host + + // actually validate the tests + tests := []struct { + m string + f map[string]interface{} + g map[string]string + }{ + { + "nsq_server", + map[string]interface{}{ + "server_count": int64(1), + "topic_count": int64(2), + }, + map[string]string{ + "server_host": host, + "server_version": "0.3.6", + }, + }, + { + "nsq_topic", + map[string]interface{}{ + "depth": int64(12), + "backend_depth": int64(13), + "message_count": int64(14), + "channel_count": int64(1), + }, + map[string]string{ + "server_host": host, + "server_version": "0.3.6", + "topic": "t1"}, + }, + { + "nsq_channel", + map[string]interface{}{ + "depth": int64(0), + "backend_depth": int64(1), + "inflight_count": int64(2), + "deferred_count": int64(3), + "message_count": int64(4), + "requeue_count": int64(5), + "timeout_count": int64(6), + "client_count": int64(1), + }, + map[string]string{ + "server_host": host, + "server_version": "0.3.6", + "topic": "t1", + "channel": "c1", + }, + }, + { + "nsq_client", + map[string]interface{}{ + "ready_count": int64(200), + "inflight_count": int64(7), + "message_count": int64(8), + "finish_count": int64(9), + "requeue_count": int64(10), + }, + map[string]string{"server_host": host, "server_version": "0.3.6", + "topic": "t1", "channel": "c1", "client_name": "373a715cd990", + "client_id": "373a715cd990", "client_hostname": "373a715cd990", + "client_version": "V2", "client_address": "172.17.0.11:35560", + "client_tls": "false", "client_snappy": "false", + "client_deflate": "false", + "client_user_agent": "nsq_to_nsq/0.3.6 go-nsq/1.0.5"}, + }, + { + "nsq_topic", + map[string]interface{}{ + "depth": int64(28), + "backend_depth": int64(29), + "message_count": int64(30), + "channel_count": int64(1), + }, + map[string]string{ + "server_host": host, + "server_version": "0.3.6", + "topic": "t2"}, + }, + { + "nsq_channel", + map[string]interface{}{ + "depth": int64(15), + "backend_depth": int64(16), + "inflight_count": int64(17), + "deferred_count": int64(18), + "message_count": int64(19), + "requeue_count": int64(20), + "timeout_count": int64(21), + "client_count": int64(1), + }, + map[string]string{ + "server_host": host, + "server_version": "0.3.6", + "topic": "t2", + "channel": "c2", + }, + }, + { + "nsq_client", + map[string]interface{}{ + "ready_count": int64(22), + "inflight_count": int64(23), + "message_count": int64(24), + "finish_count": int64(25), + "requeue_count": int64(26), + }, + map[string]string{"server_host": host, "server_version": "0.3.6", + "topic": "t2", "channel": "c2", "client_name": "377569bd462b", + "client_id": "377569bd462b", "client_hostname": "377569bd462b", + "client_version": "V2", "client_address": "172.17.0.8:48145", + "client_user_agent": "go-nsq/1.0.5", "client_tls": "true", + "client_snappy": "true", "client_deflate": "true"}, + }, + } + + for _, test := range tests { + acc.AssertContainsTaggedFields(t, test.m, test.f, test.g) + } +} + +var response = ` +{ + "status_code": 200, + "status_txt": "OK", + "data": { + "version": "0.3.6", + "health": "OK", + "start_time": 1452021674, + "topics": [ + { + "topic_name": "t1", + "channels": [ + { + "channel_name": "c1", + "depth": 0, + "backend_depth": 1, + "in_flight_count": 2, + "deferred_count": 3, + "message_count": 4, + "requeue_count": 5, + "timeout_count": 6, + "clients": [ + { + "name": "373a715cd990", + "client_id": "373a715cd990", + "hostname": "373a715cd990", + "version": "V2", + "remote_address": "172.17.0.11:35560", + "state": 3, + "ready_count": 200, + "in_flight_count": 7, + "message_count": 8, + "finish_count": 9, + "requeue_count": 10, + "connect_ts": 1452021675, + "sample_rate": 11, + "deflate": false, + "snappy": false, + "user_agent": "nsq_to_nsq\/0.3.6 go-nsq\/1.0.5", + "tls": false, + "tls_cipher_suite": "", + "tls_version": "", + "tls_negotiated_protocol": "", + "tls_negotiated_protocol_is_mutual": false + } + ], + "paused": false, + "e2e_processing_latency": { + "count": 0, + "percentiles": null + } + } + ], + "depth": 12, + "backend_depth": 13, + "message_count": 14, + "paused": false, + "e2e_processing_latency": { + "count": 0, + "percentiles": null + } + }, + { + "topic_name": "t2", + "channels": [ + { + "channel_name": "c2", + "depth": 15, + "backend_depth": 16, + "in_flight_count": 17, + "deferred_count": 18, + "message_count": 19, + "requeue_count": 20, + "timeout_count": 21, + "clients": [ + { + "name": "377569bd462b", + "client_id": "377569bd462b", + "hostname": "377569bd462b", + "version": "V2", + "remote_address": "172.17.0.8:48145", + "state": 3, + "ready_count": 22, + "in_flight_count": 23, + "message_count": 24, + "finish_count": 25, + "requeue_count": 26, + "connect_ts": 1452021678, + "sample_rate": 27, + "deflate": true, + "snappy": true, + "user_agent": "go-nsq\/1.0.5", + "tls": true, + "tls_cipher_suite": "", + "tls_version": "", + "tls_negotiated_protocol": "", + "tls_negotiated_protocol_is_mutual": false + } + ], + "paused": false, + "e2e_processing_latency": { + "count": 0, + "percentiles": null + } + } + ], + "depth": 28, + "backend_depth": 29, + "message_count": 30, + "paused": false, + "e2e_processing_latency": { + "count": 0, + "percentiles": null + } + } + ] + } +} +` diff --git a/plugins/inputs/passenger/README.md b/plugins/inputs/passenger/README.md new file mode 100644 index 000000000..64e39729b --- /dev/null +++ b/plugins/inputs/passenger/README.md @@ -0,0 +1,138 @@ +# Telegraf plugin: passenger + +Get phusion passenger stat using their command line utility +`passenger-status` + +# Measurements + +Meta: + +- tags: + + * name + * passenger_version + * pid + * code_revision + +Measurement names: + +- passenger: + + * Tags: `passenger_version` + * Fields: + + - process_count + - max + - capacity_used + - get_wait_list_size + +- passenger_supergroup: + + * Tags: `name` + * Fields: + + - get_wait_list_size + - capacity_used + +- passenger_group: + + * Tags: + + - name + - app_root + - app_type + + * Fields: + + - get_wait_list_size + - capacity_used + - processes_being_spawned + +- passenger_process: + + * Tags: + + - group_name + - app_root + - supergroup_name + - pid + - code_revision + - life_status + - process_group_id + + * Field: + + - concurrency + - sessions + - busyness + - processed + - spawner_creation_time + - spawn_start_time + - spawn_end_time + - last_used + - uptime + - cpu + - rss + - pss + - private_dirty + - swap + - real_memory + - vmsize + +# Example output + +Using this configuration: + +``` +[[inputs.passenger]] + # Path of passenger-status. + # + # Plugin gather metric via parsing XML output of passenger-status + # More information about the tool: + # https://www.phusionpassenger.com/library/admin/apache/overall_status_report.html + # + # + # If no path is specified, then the plugin simply execute passenger-status + # hopefully it can be found in your PATH + command = "passenger-status -v --show=xml" +``` + +When run with: + +``` +./telegraf -config telegraf.conf -test -input-filter passenger +``` + +It produces: + +``` +> passenger,passenger_version=5.0.17 capacity_used=23i,get_wait_list_size=0i,max=23i,process_count=23i 1452984112799414257 +> passenger_supergroup,name=/var/app/current/public capacity_used=23i,get_wait_list_size=0i 1452984112799496977 +> passenger_group,app_root=/var/app/current,app_type=rack,name=/var/app/current/public capacity_used=23i,get_wait_list_size=0i,processes_being_spawned=0i 1452984112799527021 +> passenger_process,app_root=/var/app/current,code_revision=899ac7f,group_name=/var/app/current/public,life_status=ALIVE,pid=11553,process_group_id=13608,supergroup_name=/var/app/current/public busyness=0i,concurrency=1i,cpu=58i,last_used=1452747071764940i,private_dirty=314900i,processed=951i,pss=319391i,real_memory=314900i,rss=418548i,sessions=0i,spawn_end_time=1452746845013365i,spawn_start_time=1452746844946982i,spawner_creation_time=1452746835922747i,swap=0i,uptime=226i,vmsize=1563580i 1452984112799571490 +> passenger_process,app_root=/var/app/current,code_revision=899ac7f,group_name=/var/app/current/public,life_status=ALIVE,pid=11563,process_group_id=13608,supergroup_name=/var/app/current/public busyness=2147483647i,concurrency=1i,cpu=47i,last_used=1452747071709179i,private_dirty=309240i,processed=756i,pss=314036i,real_memory=309240i,rss=418296i,sessions=1i,spawn_end_time=1452746845172460i,spawn_start_time=1452746845136882i,spawner_creation_time=1452746835922747i,swap=0i,uptime=226i,vmsize=1563608i 1452984112799638581 +``` + +# Note + +You have to ensure that you can run the `passenger-status` command under +telegraf user. Depend on how you install and configure passenger, this +maybe an issue for you. If you are using passenger standlone, or compile +yourself, it is straight forward. However, if you are using gem and +`rvm`, it maybe harder to get this right. + +Such as with `rvm`, you can use this command: + +``` +~/.rvm/bin/rvm default do passenger-status -v --show=xml +``` + +You can use `&` and `;` in the shell command to run comlicated shell command +in order to get the passenger-status such as load the rvm shell, source the +path +``` +command = "source .rvm/scripts/rvm && passenger-status -v --show=xml" +``` + +Anyway, just ensure that you can run the command under `telegraf` user, and it +has to produce XML output. diff --git a/plugins/inputs/passenger/passenger.go b/plugins/inputs/passenger/passenger.go new file mode 100644 index 000000000..2d98f8c58 --- /dev/null +++ b/plugins/inputs/passenger/passenger.go @@ -0,0 +1,250 @@ +package passenger + +import ( + "bytes" + "encoding/xml" + "fmt" + "os/exec" + "strconv" + "strings" + + "github.com/influxdb/telegraf/plugins/inputs" + "golang.org/x/net/html/charset" +) + +type passenger struct { + Command string +} + +func (p *passenger) parseCommand() (string, []string) { + var arguments []string + if !strings.Contains(p.Command, " ") { + return p.Command, arguments + } + + arguments = strings.Split(p.Command, " ") + if len(arguments) == 1 { + return arguments[0], arguments[1:] + } + + return arguments[0], arguments[1:] +} + +type info struct { + Passenger_version string `xml:"passenger_version"` + Process_count int `xml:"process_count"` + Capacity_used int `xml:"capacity_used"` + Get_wait_list_size int `xml:"get_wait_list_size"` + Max int `xml:"max"` + Supergroups struct { + Supergroup []struct { + Name string `xml:"name"` + Get_wait_list_size int `xml:"get_wait_list_size"` + Capacity_used int `xml:"capacity_used"` + Group []struct { + Name string `xml:"name"` + AppRoot string `xml:"app_root"` + AppType string `xml:"app_type"` + Enabled_process_count int `xml:"enabled_process_count"` + Disabling_process_count int `xml:"disabling_process_count"` + Disabled_process_count int `xml:"disabled_process_count"` + Capacity_used int `xml:"capacity_used"` + Get_wait_list_size int `xml:"get_wait_list_size"` + Processes_being_spawned int `xml:"processes_being_spawned"` + Processes struct { + Process []*process `xml:"process"` + } `xml:"processes"` + } `xml:"group"` + } `xml:"supergroup"` + } `xml:"supergroups"` +} + +type process struct { + Pid int `xml:"pid"` + Concurrency int `xml:"concurrency"` + Sessions int `xml:"sessions"` + Busyness int `xml:"busyness"` + Processed int `xml:"processed"` + Spawner_creation_time int64 `xml:"spawner_creation_time"` + Spawn_start_time int64 `xml:"spawn_start_time"` + Spawn_end_time int64 `xml:"spawn_end_time"` + Last_used int64 `xml:"last_used"` + Uptime string `xml:"uptime"` + Code_revision string `xml:"code_revision"` + Life_status string `xml:"life_status"` + Enabled string `xml:"enabled"` + Has_metrics bool `xml:"has_metrics"` + Cpu int64 `xml:"cpu"` + Rss int64 `xml:"rss"` + Pss int64 `xml:"pss"` + Private_dirty int64 `xml:"private_dirty"` + Swap int64 `xml:"swap"` + Real_memory int64 `xml:"real_memory"` + Vmsize int64 `xml:"vmsize"` + Process_group_id string `xml:"process_group_id"` +} + +func (p *process) getUptime() int64 { + if p.Uptime == "" { + return 0 + } + + timeSlice := strings.Split(p.Uptime, " ") + var uptime int64 + uptime = 0 + for _, v := range timeSlice { + switch { + case strings.HasSuffix(v, "d"): + iValue := strings.TrimSuffix(v, "d") + value, err := strconv.ParseInt(iValue, 10, 64) + if err == nil { + uptime += value * (24 * 60 * 60) + } + case strings.HasSuffix(v, "h"): + iValue := strings.TrimSuffix(v, "y") + value, err := strconv.ParseInt(iValue, 10, 64) + if err == nil { + uptime += value * (60 * 60) + } + case strings.HasSuffix(v, "m"): + iValue := strings.TrimSuffix(v, "m") + value, err := strconv.ParseInt(iValue, 10, 64) + if err == nil { + uptime += value * 60 + } + case strings.HasSuffix(v, "s"): + iValue := strings.TrimSuffix(v, "s") + value, err := strconv.ParseInt(iValue, 10, 64) + if err == nil { + uptime += value + } + } + } + + return uptime +} + +var sampleConfig = ` + # Path of passenger-status. + # + # Plugin gather metric via parsing XML output of passenger-status + # More information about the tool: + # https://www.phusionpassenger.com/library/admin/apache/overall_status_report.html + # + # + # If no path is specified, then the plugin simply execute passenger-status + # hopefully it can be found in your PATH + command = "passenger-status -v --show=xml" +` + +func (r *passenger) SampleConfig() string { + return sampleConfig +} + +func (r *passenger) Description() string { + return "Read metrics of passenger using passenger-status" +} + +func (g *passenger) Gather(acc inputs.Accumulator) error { + if g.Command == "" { + g.Command = "passenger-status -v --show=xml" + } + + cmd, args := g.parseCommand() + out, err := exec.Command(cmd, args...).Output() + + if err != nil { + return err + } + + if err = importMetric(out, acc); err != nil { + return err + } + + return nil +} + +func importMetric(stat []byte, acc inputs.Accumulator) error { + var p info + + decoder := xml.NewDecoder(bytes.NewReader(stat)) + decoder.CharsetReader = charset.NewReaderLabel + if err := decoder.Decode(&p); err != nil { + return fmt.Errorf("Cannot parse input with error: %v\n", err) + } + + tags := map[string]string{ + "passenger_version": p.Passenger_version, + } + fields := map[string]interface{}{ + "process_count": p.Process_count, + "max": p.Max, + "capacity_used": p.Capacity_used, + "get_wait_list_size": p.Get_wait_list_size, + } + acc.AddFields("passenger", fields, tags) + + for _, sg := range p.Supergroups.Supergroup { + tags := map[string]string{ + "name": sg.Name, + } + fields := map[string]interface{}{ + "get_wait_list_size": sg.Get_wait_list_size, + "capacity_used": sg.Capacity_used, + } + acc.AddFields("passenger_supergroup", fields, tags) + + for _, group := range sg.Group { + tags := map[string]string{ + "name": group.Name, + "app_root": group.AppRoot, + "app_type": group.AppType, + } + fields := map[string]interface{}{ + "get_wait_list_size": group.Get_wait_list_size, + "capacity_used": group.Capacity_used, + "processes_being_spawned": group.Processes_being_spawned, + } + acc.AddFields("passenger_group", fields, tags) + + for _, process := range group.Processes.Process { + tags := map[string]string{ + "group_name": group.Name, + "app_root": group.AppRoot, + "supergroup_name": sg.Name, + "pid": fmt.Sprintf("%d", process.Pid), + "code_revision": process.Code_revision, + "life_status": process.Life_status, + "process_group_id": process.Process_group_id, + } + fields := map[string]interface{}{ + "concurrency": process.Concurrency, + "sessions": process.Sessions, + "busyness": process.Busyness, + "processed": process.Processed, + "spawner_creation_time": process.Spawner_creation_time, + "spawn_start_time": process.Spawn_start_time, + "spawn_end_time": process.Spawn_end_time, + "last_used": process.Last_used, + "uptime": process.getUptime(), + "cpu": process.Cpu, + "rss": process.Rss, + "pss": process.Pss, + "private_dirty": process.Private_dirty, + "swap": process.Swap, + "real_memory": process.Real_memory, + "vmsize": process.Vmsize, + } + acc.AddFields("passenger_process", fields, tags) + } + } + } + + return nil +} + +func init() { + inputs.Add("passenger", func() inputs.Input { + return &passenger{} + }) +} diff --git a/plugins/inputs/passenger/passenger_test.go b/plugins/inputs/passenger/passenger_test.go new file mode 100644 index 000000000..3440c5337 --- /dev/null +++ b/plugins/inputs/passenger/passenger_test.go @@ -0,0 +1,301 @@ +package passenger + +import ( + "fmt" + "io/ioutil" + "os" + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func fakePassengerStatus(stat string) { + content := fmt.Sprintf("#!/bin/sh\ncat << EOF\n%s\nEOF", stat) + ioutil.WriteFile("/tmp/passenger-status", []byte(content), 0700) +} + +func teardown() { + os.Remove("/tmp/passenger-status") +} + +func Test_Invalid_Passenger_Status_Cli(t *testing.T) { + r := &passenger{ + Command: "an-invalid-command passenger-status", + } + + var acc testutil.Accumulator + + err := r.Gather(&acc) + require.Error(t, err) + assert.Equal(t, err.Error(), `exec: "an-invalid-command": executable file not found in $PATH`) +} + +func Test_Invalid_Xml(t *testing.T) { + fakePassengerStatus("invalid xml") + defer teardown() + + r := &passenger{ + Command: "/tmp/passenger-status", + } + + var acc testutil.Accumulator + + err := r.Gather(&acc) + require.Error(t, err) + assert.Equal(t, err.Error(), "Cannot parse input with error: EOF\n") +} + +// We test this by ensure that the error message match the path of default cli +func Test_Default_Config_Load_Default_Command(t *testing.T) { + fakePassengerStatus("invalid xml") + defer teardown() + + r := &passenger{} + + var acc testutil.Accumulator + + err := r.Gather(&acc) + require.Error(t, err) + assert.Equal(t, err.Error(), "exec: \"passenger-status\": executable file not found in $PATH") +} + +func TestPassengerGenerateMetric(t *testing.T) { + fakePassengerStatus(sampleStat) + defer teardown() + + //Now we tested again above server, with our authentication data + r := &passenger{ + Command: "/tmp/passenger-status", + } + + var acc testutil.Accumulator + + err := r.Gather(&acc) + require.NoError(t, err) + + tags := map[string]string{ + "passenger_version": "5.0.17", + } + fields := map[string]interface{}{ + "process_count": 23, + "max": 23, + "capacity_used": 23, + "get_wait_list_size": 3, + } + acc.AssertContainsTaggedFields(t, "passenger", fields, tags) + + tags = map[string]string{ + "name": "/var/app/current/public", + "app_root": "/var/app/current", + "app_type": "rack", + } + fields = map[string]interface{}{ + "processes_being_spawned": 2, + "capacity_used": 23, + "get_wait_list_size": 3, + } + acc.AssertContainsTaggedFields(t, "passenger_group", fields, tags) + + tags = map[string]string{ + "name": "/var/app/current/public", + } + + fields = map[string]interface{}{ + "capacity_used": 23, + "get_wait_list_size": 3, + } + acc.AssertContainsTaggedFields(t, "passenger_supergroup", fields, tags) + + tags = map[string]string{ + "app_root": "/var/app/current", + "group_name": "/var/app/current/public", + "supergroup_name": "/var/app/current/public", + "pid": "11553", + "code_revision": "899ac7f", + "life_status": "ALIVE", + "process_group_id": "13608", + } + fields = map[string]interface{}{ + "concurrency": 1, + "sessions": 0, + "busyness": 0, + "processed": 951, + "spawner_creation_time": int64(1452746835922747), + "spawn_start_time": int64(1452746844946982), + "spawn_end_time": int64(1452746845013365), + "last_used": int64(1452747071764940), + "uptime": int64(226), // in seconds of 3m 46s + "cpu": int64(58), + "rss": int64(418548), + "pss": int64(319391), + "private_dirty": int64(314900), + "swap": int64(0), + "real_memory": int64(314900), + "vmsize": int64(1563580), + } + acc.AssertContainsTaggedFields(t, "passenger_process", fields, tags) +} + +var sampleStat = ` + + + + 5.0.17 + 1 + 23 + 23 + 23 + 3 + + + + /var/app/current/public + READY + 3 + 23 + foo + + /var/app/current/public + /var/app/current/public + /var/app/current + rack + production + QQUrbCVYxbJYpfgyDOwJ + 23 + 0 + 0 + 23 + 3 + 0 + 2 + foo + foo + ALIVE + axcoto + 1001 + axcoto + 1001 + + /var/app/current + /var/app/current/public + rack + /var/app/.rvm/gems/ruby-2.2.0-p645/gems/passenger-5.0.17/helper-scripts/rack-loader.rb + config.ru + Passenger RubyApp + 3 + 90000 + production + / + smart + nobody + nogroup + /var/app/.rvm/gems/ruby-2.2.0-p645/wrappers/ruby + python + node + unix:/tmp/passenger.eKFdvdC/agents.s/ust_router + logging + foo + false + false + foo + 22 + 0 + 300 + 1 + + + + 11553 + 378579907 + 17173df-PoNT3J9HCf + 1 + 0 + 0 + 951 + 1452746835922747 + 1452746844946982 + 1452746845013365 + 1452747071764940 + 0s ago + 3m 46s + 899ac7f + ALIVE + ENABLED + true + 58 + 418548 + 319391 + 314900 + 0 + 314900 + 1563580 + 13608 + Passenger RubyApp: /var/app/current/public + + + main +
unix:/tmp/passenger.eKFdvdC/apps.s/ruby.UWF6zkRJ71aoMXPxpknpWVfC1POFqgWZzbEsdz5v0G46cSSMxJ3GHLFhJaUrK2I
+ session + 1 + 0 +
+ + http +
tcp://127.0.0.1:49888
+ http + 1 + 0 +
+
+
+ + 11563 + 1549681201 + 17173df-pX5iJOipd8 + 1 + 1 + 2147483647 + 756 + 1452746835922747 + 1452746845136882 + 1452746845172460 + 1452747071709179 + 0s ago + 3m 46s + 899ac7f + ALIVE + ENABLED + true + 47 + 418296 + 314036 + 309240 + 0 + 309240 + 1563608 + 13608 + Passenger RubyApp: /var/app/current/public + + + main +
unix:/tmp/passenger.eKFdvdC/apps.s/ruby.PVCh7TmvCi9knqhba2vG5qXrlHGEIwhGrxnUvRbIAD6SPz9m0G7YlJ8HEsREHY3
+ session + 1 + 1 +
+ + http +
tcp://127.0.0.1:52783
+ http + 1 + 0 +
+
+
+
+
+
+
+
` diff --git a/plugins/inputs/phpfpm/README.md b/plugins/inputs/phpfpm/README.md index c2a42523a..b853b7fd7 100644 --- a/plugins/inputs/phpfpm/README.md +++ b/plugins/inputs/phpfpm/README.md @@ -6,10 +6,14 @@ Get phpfpm stat using either HTTP status page or fpm socket. Meta: -- tags: `url= pool=poolname` +- tags: `pool=poolname` Measurement names: +- phpfpm + +Measurement field: + - accepted_conn - listen_queue - max_listen_queue @@ -50,36 +54,12 @@ It produces: ``` * Plugin: phpfpm, Collection 1 -> [url="10.0.0.12" pool="www"] phpfpm_idle_processes value=1 -> [url="10.0.0.12" pool="www"] phpfpm_total_processes value=2 -> [url="10.0.0.12" pool="www"] phpfpm_max_children_reached value=0 -> [url="10.0.0.12" pool="www"] phpfpm_max_listen_queue value=0 -> [url="10.0.0.12" pool="www"] phpfpm_listen_queue value=0 -> [url="10.0.0.12" pool="www"] phpfpm_listen_queue_len value=0 -> [url="10.0.0.12" pool="www"] phpfpm_active_processes value=1 -> [url="10.0.0.12" pool="www"] phpfpm_max_active_processes value=2 -> [url="10.0.0.12" pool="www"] phpfpm_slow_requests value=0 -> [url="10.0.0.12" pool="www"] phpfpm_accepted_conn value=305 - -> [url="localhost" pool="www2"] phpfpm_max_children_reached value=0 -> [url="localhost" pool="www2"] phpfpm_slow_requests value=0 -> [url="localhost" pool="www2"] phpfpm_max_listen_queue value=0 -> [url="localhost" pool="www2"] phpfpm_active_processes value=1 -> [url="localhost" pool="www2"] phpfpm_listen_queue_len value=0 -> [url="localhost" pool="www2"] phpfpm_idle_processes value=1 -> [url="localhost" pool="www2"] phpfpm_total_processes value=2 -> [url="localhost" pool="www2"] phpfpm_max_active_processes value=2 -> [url="localhost" pool="www2"] phpfpm_accepted_conn value=306 -> [url="localhost" pool="www2"] phpfpm_listen_queue value=0 - -> [url="10.0.0.12:9000" pool="www3"] phpfpm_max_children_reached value=0 -> [url="10.0.0.12:9000" pool="www3"] phpfpm_slow_requests value=1 -> [url="10.0.0.12:9000" pool="www3"] phpfpm_max_listen_queue value=0 -> [url="10.0.0.12:9000" pool="www3"] phpfpm_active_processes value=1 -> [url="10.0.0.12:9000" pool="www3"] phpfpm_listen_queue_len value=0 -> [url="10.0.0.12:9000" pool="www3"] phpfpm_idle_processes value=2 -> [url="10.0.0.12:9000" pool="www3"] phpfpm_total_processes value=2 -> [url="10.0.0.12:9000" pool="www3"] phpfpm_max_active_processes value=2 -> [url="10.0.0.12:9000" pool="www3"] phpfpm_accepted_conn value=307 -> [url="10.0.0.12:9000" pool="www3"] phpfpm_listen_queue value=0 +> phpfpm,pool=www accepted_conn=13i,active_processes=2i,idle_processes=1i,listen_queue=0i,listen_queue_len=0i,max_active_processes=2i,max_children_reached=0i,max_listen_queue=0i,slow_requests=0i,total_processes=3i 1453011293083331187 +> phpfpm,pool=www2 accepted_conn=12i,active_processes=1i,idle_processes=2i,listen_queue=0i,listen_queue_len=0i,max_active_processes=2i,max_children_reached=0i,max_listen_queue=0i,slow_requests=0i,total_processes=3i 1453011293083691422 +> phpfpm,pool=www3 accepted_conn=11i,active_processes=1i,idle_processes=2i,listen_queue=0i,listen_queue_len=0i,max_active_processes=2i,max_children_reached=0i,max_listen_queue=0i,slow_requests=0i,total_processes=3i 1453011293083691658 ``` + +## Note + +When using `unixsocket`, you have to ensure that telegraf runs on same +host, and socket path is accessible to telegraf user. diff --git a/plugins/inputs/phpfpm/phpfpm.go b/plugins/inputs/phpfpm/phpfpm.go index ceffc673e..5600334b2 100644 --- a/plugins/inputs/phpfpm/phpfpm.go +++ b/plugins/inputs/phpfpm/phpfpm.go @@ -7,6 +7,7 @@ import ( "io" "net/http" "net/url" + "os" "strconv" "strings" "sync" @@ -40,20 +41,25 @@ type phpfpm struct { var sampleConfig = ` # An array of addresses to gather stats about. Specify an ip or hostname - # with optional port and path. + # with optional port and path # - # Plugin can be configured in three modes (both can be used): - # - http: the URL must start with http:// or https://, ex: + # Plugin can be configured in three modes (either can be used): + # - http: the URL must start with http:// or https://, ie: # "http://localhost/status" # "http://192.168.130.1/status?full" - # - unixsocket: path to fpm socket, ex: + # + # - unixsocket: path to fpm socket, ie: # "/var/run/php5-fpm.sock" - # "192.168.10.10:/var/run/php5-fpm-www2.sock" - # - fcgi: the URL mush start with fcgi:// or cgi://, and port must present, ex: + # or using a custom fpm status path: + # "/var/run/php5-fpm.sock:fpm-custom-status-path" + # + # - fcgi: the URL must start with fcgi:// or cgi://, and port must be present, ie: # "fcgi://10.0.0.12:9000/status" # "cgi://10.0.10.12:9001/status" # - # If no servers are specified, then default to 127.0.0.1/server-status + # Example of multiple gathering from local socket and remove host + # urls = ["http://192.168.1.20/status", "/tmp/fpm.sock"] + # If no servers are specified, then default to http://127.0.0.1/status urls = ["http://localhost/status"] ` @@ -62,7 +68,7 @@ func (r *phpfpm) SampleConfig() string { } func (r *phpfpm) Description() string { - return "Read metrics of phpfpm, via HTTP status page or socket(pending)" + return "Read metrics of phpfpm, via HTTP status page or socket" } // Reads stats from all configured servers accumulates stats. @@ -89,71 +95,96 @@ func (g *phpfpm) Gather(acc inputs.Accumulator) error { return outerr } -// Request status page to get stat raw data +// Request status page to get stat raw data and import it func (g *phpfpm) gatherServer(addr string, acc inputs.Accumulator) error { if g.client == nil { - client := &http.Client{} g.client = client } if strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") { + return g.gatherHttp(addr, acc) + } + + var ( + fcgi *conn + socketPath string + statusPath string + ) + + if strings.HasPrefix(addr, "fcgi://") || strings.HasPrefix(addr, "cgi://") { u, err := url.Parse(addr) if err != nil { return fmt.Errorf("Unable parse server address '%s': %s", addr, err) } - - req, err := http.NewRequest("GET", fmt.Sprintf("%s://%s%s", u.Scheme, - u.Host, u.Path), nil) - res, err := g.client.Do(req) - if err != nil { - return fmt.Errorf("Unable to connect to phpfpm status page '%s': %v", - addr, err) - } - - if res.StatusCode != 200 { - return fmt.Errorf("Unable to get valid stat result from '%s': %v", - addr, err) - } - - importMetric(res.Body, acc, u.Host) + socketAddr := strings.Split(u.Host, ":") + fcgiIp := socketAddr[0] + fcgiPort, _ := strconv.Atoi(socketAddr[1]) + fcgi, _ = NewClient(fcgiIp, fcgiPort) } else { - var ( - fcgi *FCGIClient - fcgiAddr string - ) - if strings.HasPrefix(addr, "fcgi://") || strings.HasPrefix(addr, "cgi://") { - u, err := url.Parse(addr) - if err != nil { - return fmt.Errorf("Unable parse server address '%s': %s", addr, err) - } - socketAddr := strings.Split(u.Host, ":") - fcgiIp := socketAddr[0] - fcgiPort, _ := strconv.Atoi(socketAddr[1]) - fcgiAddr = u.Host - fcgi, _ = NewClient(fcgiIp, fcgiPort) + socketAddr := strings.Split(addr, ":") + if len(socketAddr) >= 2 { + socketPath = socketAddr[0] + statusPath = socketAddr[1] } else { - socketAddr := strings.Split(addr, ":") - fcgiAddr = socketAddr[0] - fcgi, _ = NewClient("unix", socketAddr[1]) - } - resOut, resErr, err := fcgi.Request(map[string]string{ - "SCRIPT_NAME": "/status", - "SCRIPT_FILENAME": "status", - "REQUEST_METHOD": "GET", - }, "") - - if len(resErr) == 0 && err == nil { - importMetric(bytes.NewReader(resOut), acc, fcgiAddr) + socketPath = socketAddr[0] + statusPath = "status" } + if _, err := os.Stat(socketPath); os.IsNotExist(err) { + return fmt.Errorf("Socket doesn't exist '%s': %s", socketPath, err) + } + fcgi, _ = NewClient("unix", socketPath) + } + return g.gatherFcgi(fcgi, statusPath, acc) +} + +// Gather stat using fcgi protocol +func (g *phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc inputs.Accumulator) error { + fpmOutput, fpmErr, err := fcgi.Request(map[string]string{ + "SCRIPT_NAME": "/" + statusPath, + "SCRIPT_FILENAME": statusPath, + "REQUEST_METHOD": "GET", + "CONTENT_LENGTH": "0", + "SERVER_PROTOCOL": "HTTP/1.0", + "SERVER_SOFTWARE": "go / fcgiclient ", + "REMOTE_ADDR": "127.0.0.1", + }, "/"+statusPath) + + if len(fpmErr) == 0 && err == nil { + importMetric(bytes.NewReader(fpmOutput), acc) + return nil + } else { + return fmt.Errorf("Unable parse phpfpm status. Error: %v %v", string(fpmErr), err) + } +} + +// Gather stat using http protocol +func (g *phpfpm) gatherHttp(addr string, acc inputs.Accumulator) error { + u, err := url.Parse(addr) + if err != nil { + return fmt.Errorf("Unable parse server address '%s': %s", addr, err) } + req, err := http.NewRequest("GET", fmt.Sprintf("%s://%s%s", u.Scheme, + u.Host, u.Path), nil) + res, err := g.client.Do(req) + if err != nil { + return fmt.Errorf("Unable to connect to phpfpm status page '%s': %v", + addr, err) + } + + if res.StatusCode != 200 { + return fmt.Errorf("Unable to get valid stat result from '%s': %v", + addr, err) + } + + importMetric(res.Body, acc) return nil } -// Import HTTP stat data into Telegraf system -func importMetric(r io.Reader, acc inputs.Accumulator, host string) (poolStat, error) { +// Import stat data into Telegraf system +func importMetric(r io.Reader, acc inputs.Accumulator) (poolStat, error) { stats := make(poolStat) var currentPool string @@ -195,7 +226,6 @@ func importMetric(r io.Reader, acc inputs.Accumulator, host string) (poolStat, e // Finally, we push the pool metric for pool := range stats { tags := map[string]string{ - "url": host, "pool": pool, } fields := make(map[string]interface{}) diff --git a/plugins/inputs/phpfpm/phpfpm_fcgi.go b/plugins/inputs/phpfpm/phpfpm_fcgi.go index 65f4c789b..03aac7634 100644 --- a/plugins/inputs/phpfpm/phpfpm_fcgi.go +++ b/plugins/inputs/phpfpm/phpfpm_fcgi.go @@ -1,13 +1,14 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package fcgi implements the FastCGI protocol. +// Currently only the responder role is supported. +// The protocol is defined at http://www.fastcgi.com/drupal/node/6?q=node/22 package phpfpm -// FastCGI client to request via socket - -// Copyright 2012 Junqing Tan and The Go Authors -// Use of this source code is governed by a BSD-style -// Part of source code is from Go fcgi package - -// Fix bug: Can't recive more than 1 record untill FCGI_END_REQUEST 2012-09-15 -// By: wofeiwo +// This file defines the raw protocol and some utilities used by the child and +// the host. import ( "bufio" @@ -15,70 +16,84 @@ import ( "encoding/binary" "errors" "io" + "sync" + "net" "strconv" - "sync" + + "strings" ) -const FCGI_LISTENSOCK_FILENO uint8 = 0 -const FCGI_HEADER_LEN uint8 = 8 -const VERSION_1 uint8 = 1 -const FCGI_NULL_REQUEST_ID uint8 = 0 -const FCGI_KEEP_CONN uint8 = 1 +// recType is a record type, as defined by +// http://www.fastcgi.com/devkit/doc/fcgi-spec.html#S8 +type recType uint8 const ( - FCGI_BEGIN_REQUEST uint8 = iota + 1 - FCGI_ABORT_REQUEST - FCGI_END_REQUEST - FCGI_PARAMS - FCGI_STDIN - FCGI_STDOUT - FCGI_STDERR - FCGI_DATA - FCGI_GET_VALUES - FCGI_GET_VALUES_RESULT - FCGI_UNKNOWN_TYPE - FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE + typeBeginRequest recType = 1 + typeAbortRequest recType = 2 + typeEndRequest recType = 3 + typeParams recType = 4 + typeStdin recType = 5 + typeStdout recType = 6 + typeStderr recType = 7 + typeData recType = 8 + typeGetValues recType = 9 + typeGetValuesResult recType = 10 + typeUnknownType recType = 11 ) -const ( - FCGI_RESPONDER uint8 = iota + 1 - FCGI_AUTHORIZER - FCGI_FILTER -) +// keep the connection between web-server and responder open after request +const flagKeepConn = 1 const ( - FCGI_REQUEST_COMPLETE uint8 = iota - FCGI_CANT_MPX_CONN - FCGI_OVERLOADED - FCGI_UNKNOWN_ROLE -) - -const ( - FCGI_MAX_CONNS string = "MAX_CONNS" - FCGI_MAX_REQS string = "MAX_REQS" - FCGI_MPXS_CONNS string = "MPXS_CONNS" -) - -const ( - maxWrite = 6553500 // maximum record body + maxWrite = 65535 // maximum record body maxPad = 255 ) +const ( + roleResponder = iota + 1 // only Responders are implemented. + roleAuthorizer + roleFilter +) + +const ( + statusRequestComplete = iota + statusCantMultiplex + statusOverloaded + statusUnknownRole +) + +const headerLen = 8 + type header struct { Version uint8 - Type uint8 + Type recType Id uint16 ContentLength uint16 PaddingLength uint8 Reserved uint8 } +type beginRequest struct { + role uint16 + flags uint8 + reserved [5]uint8 +} + +func (br *beginRequest) read(content []byte) error { + if len(content) != 8 { + return errors.New("fcgi: invalid begin request record") + } + br.role = binary.BigEndian.Uint16(content) + br.flags = content[2] + return nil +} + // for padding so we don't have to allocate all the time // not synchronized because we don't care what the contents are var pad [maxPad]byte -func (h *header) init(recType uint8, reqId uint16, contentLength int) { +func (h *header) init(recType recType, reqId uint16, contentLength int) { h.Version = 1 h.Type = recType h.Id = reqId @@ -86,6 +101,26 @@ func (h *header) init(recType uint8, reqId uint16, contentLength int) { h.PaddingLength = uint8(-contentLength & 7) } +// conn sends records over rwc +type conn struct { + mutex sync.Mutex + rwc io.ReadWriteCloser + + // to avoid allocations + buf bytes.Buffer + h header +} + +func newConn(rwc io.ReadWriteCloser) *conn { + return &conn{rwc: rwc} +} + +func (c *conn) Close() error { + c.mutex.Lock() + defer c.mutex.Unlock() + return c.rwc.Close() +} + type record struct { h header buf [maxWrite + maxPad]byte @@ -109,69 +144,39 @@ func (r *record) content() []byte { return r.buf[:r.h.ContentLength] } -type FCGIClient struct { - mutex sync.Mutex - rwc io.ReadWriteCloser - h header - buf bytes.Buffer - keepAlive bool -} - -func NewClient(h string, args ...interface{}) (fcgi *FCGIClient, err error) { - var conn net.Conn - if len(args) != 1 { - err = errors.New("fcgi: not enough params") - return - } - switch args[0].(type) { - case int: - addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10) - conn, err = net.Dial("tcp", addr) - case string: - laddr := net.UnixAddr{Name: args[0].(string), Net: h} - conn, err = net.DialUnix(h, nil, &laddr) - default: - err = errors.New("fcgi: we only accept int (port) or string (socket) params.") - } - fcgi = &FCGIClient{ - rwc: conn, - keepAlive: false, - } - return -} - -func (client *FCGIClient) writeRecord(recType uint8, reqId uint16, content []byte) (err error) { - client.mutex.Lock() - defer client.mutex.Unlock() - client.buf.Reset() - client.h.init(recType, reqId, len(content)) - if err := binary.Write(&client.buf, binary.BigEndian, client.h); err != nil { +// writeRecord writes and sends a single record. +func (c *conn) writeRecord(recType recType, reqId uint16, b []byte) error { + c.mutex.Lock() + defer c.mutex.Unlock() + c.buf.Reset() + c.h.init(recType, reqId, len(b)) + if err := binary.Write(&c.buf, binary.BigEndian, c.h); err != nil { return err } - if _, err := client.buf.Write(content); err != nil { + if _, err := c.buf.Write(b); err != nil { return err } - if _, err := client.buf.Write(pad[:client.h.PaddingLength]); err != nil { + if _, err := c.buf.Write(pad[:c.h.PaddingLength]); err != nil { return err } - _, err = client.rwc.Write(client.buf.Bytes()) + _, err := c.rwc.Write(c.buf.Bytes()) return err } -func (client *FCGIClient) writeBeginRequest(reqId uint16, role uint16, flags uint8) error { +func (c *conn) writeBeginRequest(reqId uint16, role uint16, flags uint8) error { b := [8]byte{byte(role >> 8), byte(role), flags} - return client.writeRecord(FCGI_BEGIN_REQUEST, reqId, b[:]) + return c.writeRecord(typeBeginRequest, reqId, b[:]) } -func (client *FCGIClient) writeEndRequest(reqId uint16, appStatus int, protocolStatus uint8) error { +func (c *conn) writeEndRequest(reqId uint16, appStatus int, protocolStatus uint8) error { b := make([]byte, 8) binary.BigEndian.PutUint32(b, uint32(appStatus)) b[4] = protocolStatus - return client.writeRecord(FCGI_END_REQUEST, reqId, b) + return c.writeRecord(typeEndRequest, reqId, b) } -func (client *FCGIClient) writePairs(recType uint8, reqId uint16, pairs map[string]string) error { - w := newWriter(client, recType, reqId) +func (c *conn) writePairs(recType recType, reqId uint16, pairs map[string]string) error { + w := newWriter(c, recType, reqId) b := make([]byte, 8) for k, v := range pairs { n := encodeSize(b, uint32(len(k))) @@ -238,7 +243,7 @@ func (w *bufWriter) Close() error { return w.closer.Close() } -func newWriter(c *FCGIClient, recType uint8, reqId uint16) *bufWriter { +func newWriter(c *conn, recType recType, reqId uint16) *bufWriter { s := &streamWriter{c: c, recType: recType, reqId: reqId} w := bufio.NewWriterSize(s, maxWrite) return &bufWriter{s, w} @@ -247,8 +252,8 @@ func newWriter(c *FCGIClient, recType uint8, reqId uint16) *bufWriter { // streamWriter abstracts out the separation of a stream into discrete records. // It only writes maxWrite bytes at a time. type streamWriter struct { - c *FCGIClient - recType uint8 + c *conn + recType recType reqId uint16 } @@ -273,22 +278,44 @@ func (w *streamWriter) Close() error { return w.c.writeRecord(w.recType, w.reqId, nil) } -func (client *FCGIClient) Request(env map[string]string, reqStr string) (retout []byte, reterr []byte, err error) { +func NewClient(h string, args ...interface{}) (fcgi *conn, err error) { + var con net.Conn + if len(args) != 1 { + err = errors.New("fcgi: not enough params") + return + } + switch args[0].(type) { + case int: + addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10) + con, err = net.Dial("tcp", addr) + case string: + laddr := net.UnixAddr{Name: args[0].(string), Net: h} + con, err = net.DialUnix(h, nil, &laddr) + default: + err = errors.New("fcgi: we only accept int (port) or string (socket) params.") + } + fcgi = &conn{ + rwc: con, + } + return +} - var reqId uint16 = 1 +func (client *conn) Request(env map[string]string, requestData string) (retout []byte, reterr []byte, err error) { defer client.rwc.Close() + var reqId uint16 = 1 - err = client.writeBeginRequest(reqId, uint16(FCGI_RESPONDER), 0) + err = client.writeBeginRequest(reqId, uint16(roleResponder), 0) if err != nil { return } - err = client.writePairs(FCGI_PARAMS, reqId, env) + + err = client.writePairs(typeParams, reqId, env) if err != nil { return } - if len(reqStr) > 0 { - err = client.writeRecord(FCGI_STDIN, reqId, []byte(reqStr)) - if err != nil { + + if len(requestData) > 0 { + if err = client.writeRecord(typeStdin, reqId, []byte(requestData)); err != nil { return } } @@ -297,23 +324,25 @@ func (client *FCGIClient) Request(env map[string]string, reqStr string) (retout var err1 error // recive untill EOF or FCGI_END_REQUEST +READ_LOOP: for { err1 = rec.read(client.rwc) - if err1 != nil { + if err1 != nil && strings.Contains(err1.Error(), "use of closed network connection") { if err1 != io.EOF { err = err1 } break } + switch { - case rec.h.Type == FCGI_STDOUT: + case rec.h.Type == typeStdout: retout = append(retout, rec.content()...) - case rec.h.Type == FCGI_STDERR: + case rec.h.Type == typeStderr: reterr = append(reterr, rec.content()...) - case rec.h.Type == FCGI_END_REQUEST: + case rec.h.Type == typeEndRequest: fallthrough default: - break + break READ_LOOP } } diff --git a/plugins/inputs/phpfpm/phpfpm_test.go b/plugins/inputs/phpfpm/phpfpm_test.go index 2f34372bf..58db0cf8b 100644 --- a/plugins/inputs/phpfpm/phpfpm_test.go +++ b/plugins/inputs/phpfpm/phpfpm_test.go @@ -1,24 +1,34 @@ package phpfpm import ( + "crypto/rand" + "encoding/binary" "fmt" + "net" + "net/http" + "net/http/fcgi" + "net/http/httptest" "testing" "github.com/influxdb/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "net/http" - "net/http/httptest" ) -func TestPhpFpmGeneratesMetrics(t *testing.T) { - //We create a fake server to return test data - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprint(w, outputSample) - })) +type statServer struct{} + +// We create a fake server to return test data +func (s statServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain") + w.Header().Set("Content-Length", fmt.Sprint(len(outputSample))) + fmt.Fprint(w, outputSample) +} + +func TestPhpFpmGeneratesMetrics_From_Http(t *testing.T) { + sv := statServer{} + ts := httptest.NewServer(sv) defer ts.Close() - //Now we tested again above server, with our authentication data r := &phpfpm{ Urls: []string{ts.URL}, } @@ -29,7 +39,134 @@ func TestPhpFpmGeneratesMetrics(t *testing.T) { require.NoError(t, err) tags := map[string]string{ - "url": ts.Listener.Addr().String(), + "pool": "www", + } + + fields := map[string]interface{}{ + "accepted_conn": int64(3), + "listen_queue": int64(1), + "max_listen_queue": int64(0), + "listen_queue_len": int64(0), + "idle_processes": int64(1), + "active_processes": int64(1), + "total_processes": int64(2), + "max_active_processes": int64(1), + "max_children_reached": int64(2), + "slow_requests": int64(1), + } + + acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags) +} + +func TestPhpFpmGeneratesMetrics_From_Fcgi(t *testing.T) { + // Let OS find an available port + tcp, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("Cannot initalize test server") + } + defer tcp.Close() + + s := statServer{} + go fcgi.Serve(tcp, s) + + //Now we tested again above server + r := &phpfpm{ + Urls: []string{"fcgi://" + tcp.Addr().String() + "/status"}, + } + + var acc testutil.Accumulator + err = r.Gather(&acc) + require.NoError(t, err) + + tags := map[string]string{ + "pool": "www", + } + + fields := map[string]interface{}{ + "accepted_conn": int64(3), + "listen_queue": int64(1), + "max_listen_queue": int64(0), + "listen_queue_len": int64(0), + "idle_processes": int64(1), + "active_processes": int64(1), + "total_processes": int64(2), + "max_active_processes": int64(1), + "max_children_reached": int64(2), + "slow_requests": int64(1), + } + + acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags) +} + +func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) { + // Create a socket in /tmp because we always have write permission and if the + // removing of socket fail when system restart /tmp is clear so + // we don't have junk files around + var randomNumber int64 + binary.Read(rand.Reader, binary.LittleEndian, &randomNumber) + tcp, err := net.Listen("unix", fmt.Sprintf("/tmp/test-fpm%d.sock", randomNumber)) + if err != nil { + t.Fatal("Cannot initalize server on port ") + } + + defer tcp.Close() + s := statServer{} + go fcgi.Serve(tcp, s) + + r := &phpfpm{ + Urls: []string{tcp.Addr().String()}, + } + + var acc testutil.Accumulator + + err = r.Gather(&acc) + require.NoError(t, err) + + tags := map[string]string{ + "pool": "www", + } + + fields := map[string]interface{}{ + "accepted_conn": int64(3), + "listen_queue": int64(1), + "max_listen_queue": int64(0), + "listen_queue_len": int64(0), + "idle_processes": int64(1), + "active_processes": int64(1), + "total_processes": int64(2), + "max_active_processes": int64(1), + "max_children_reached": int64(2), + "slow_requests": int64(1), + } + + acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags) +} + +func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) { + // Create a socket in /tmp because we always have write permission. If the + // removing of socket fail we won't have junk files around. Cuz when system + // restart, it clears out /tmp + var randomNumber int64 + binary.Read(rand.Reader, binary.LittleEndian, &randomNumber) + tcp, err := net.Listen("unix", fmt.Sprintf("/tmp/test-fpm%d.sock", randomNumber)) + if err != nil { + t.Fatal("Cannot initalize server on port ") + } + + defer tcp.Close() + s := statServer{} + go fcgi.Serve(tcp, s) + + r := &phpfpm{ + Urls: []string{tcp.Addr().String() + ":custom-status-path"}, + } + + var acc testutil.Accumulator + + err = r.Gather(&acc) + require.NoError(t, err) + + tags := map[string]string{ "pool": "www", } @@ -51,7 +188,7 @@ func TestPhpFpmGeneratesMetrics(t *testing.T) { //When not passing server config, we default to localhost //We just want to make sure we did request stat from localhost -func TestHaproxyDefaultGetFromLocalhost(t *testing.T) { +func TestPhpFpmDefaultGetFromLocalhost(t *testing.T) { r := &phpfpm{} var acc testutil.Accumulator @@ -61,6 +198,31 @@ func TestHaproxyDefaultGetFromLocalhost(t *testing.T) { assert.Contains(t, err.Error(), "127.0.0.1/status") } +func TestPhpFpmGeneratesMetrics_Throw_Error_When_Fpm_Status_Is_Not_Responding(t *testing.T) { + r := &phpfpm{ + Urls: []string{"http://aninvalidone"}, + } + + var acc testutil.Accumulator + + err := r.Gather(&acc) + require.Error(t, err) + assert.Contains(t, err.Error(), `Unable to connect to phpfpm status page 'http://aninvalidone': Get http://aninvalidone: dial tcp: lookup aninvalidone`) +} + +func TestPhpFpmGeneratesMetrics_Throw_Error_When_Socket_Path_Is_Invalid(t *testing.T) { + r := &phpfpm{ + Urls: []string{"/tmp/invalid.sock"}, + } + + var acc testutil.Accumulator + + err := r.Gather(&acc) + require.Error(t, err) + assert.Equal(t, `Socket doesn't exist '/tmp/invalid.sock': stat /tmp/invalid.sock: no such file or directory`, err.Error()) + +} + const outputSample = ` pool: www process manager: dynamic diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 8a0d24f94..7eedb592a 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -4,6 +4,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/outputs/amon" _ "github.com/influxdb/telegraf/plugins/outputs/amqp" _ "github.com/influxdb/telegraf/plugins/outputs/datadog" + _ "github.com/influxdb/telegraf/plugins/outputs/graphite" _ "github.com/influxdb/telegraf/plugins/outputs/influxdb" _ "github.com/influxdb/telegraf/plugins/outputs/kafka" _ "github.com/influxdb/telegraf/plugins/outputs/kinesis" diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index 6f0e0fde3..e1d6302a1 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -2,7 +2,10 @@ package amqp import ( "bytes" + "crypto/tls" + "crypto/x509" "fmt" + "io/ioutil" "log" "sync" "time" @@ -17,6 +20,12 @@ type AMQP struct { URL string // AMQP exchange Exchange string + // path to CA file + SslCa string + // path to host cert file + SslCert string + // path to cert key file + SslKey string // Routing Key Tag RoutingTag string `toml:"routing_tag"` // InfluxDB database @@ -46,6 +55,11 @@ var sampleConfig = ` # ie, if this tag exists, it's value will be used as the routing key routing_tag = "host" + # Use ssl + #ssl_ca = "/etc/telegraf/ca.pem" + #ssl_cert = "/etc/telegraf/cert.pem" + #ssl_key = "/etc/telegraf/key.pem" + # InfluxDB retention policy #retention_policy = "default" # InfluxDB database @@ -64,7 +78,32 @@ func (q *AMQP) Connect() error { "retention_policy": q.RetentionPolicy, } - connection, err := amqp.Dial(q.URL) + var connection *amqp.Connection + var err error + if q.SslCert != "" && q.SslKey != "" { + // make new tls config + cfg := new(tls.Config) + if q.SslCa != "" { + // create ca pool + cfg.RootCAs = x509.NewCertPool() + + // add self-signed cert + if ca, err := ioutil.ReadFile(q.SslCa); err == nil { + cfg.RootCAs.AppendCertsFromPEM(ca) + } else { + log.Println(err) + } + } + if cert, err := tls.LoadX509KeyPair(q.SslCert, q.SslKey); err == nil { + cfg.Certificates = append(cfg.Certificates, cert) + } else { + log.Println(err) + } + connection, err = amqp.DialTLS(q.URL, cfg) + + } else { + connection, err = amqp.Dial(q.URL) + } if err != nil { return err } diff --git a/plugins/outputs/graphite/README.md b/plugins/outputs/graphite/README.md new file mode 100644 index 000000000..48313a886 --- /dev/null +++ b/plugins/outputs/graphite/README.md @@ -0,0 +1,13 @@ +# Graphite Output Plugin + +This plugin writes to [Graphite](http://graphite.readthedocs.org/en/latest/index.html) via raw TCP. + +Parameters: + + Servers []string + Prefix string + Timeout int + +* `servers`: List of strings, ["mygraphiteserver:2003"]. +* `prefix`: String use to prefix all sent metrics. +* `timeout`: Connection timeout in second. diff --git a/plugins/outputs/graphite/graphite.go b/plugins/outputs/graphite/graphite.go new file mode 100644 index 000000000..dd2af8eb1 --- /dev/null +++ b/plugins/outputs/graphite/graphite.go @@ -0,0 +1,134 @@ +package graphite + +import ( + "errors" + "fmt" + "github.com/influxdb/influxdb/client/v2" + "github.com/influxdb/telegraf/plugins/outputs" + "log" + "math/rand" + "net" + "strings" + "time" +) + +type Graphite struct { + // URL is only for backwards compatability + Servers []string + Prefix string + Timeout int + conns []net.Conn +} + +var sampleConfig = ` + # TCP endpoint for your graphite instance. + servers = ["localhost:2003"] + # Prefix metrics name + prefix = "" + # timeout in seconds for the write connection to graphite + timeout = 2 +` + +func (g *Graphite) Connect() error { + // Set default values + if g.Timeout <= 0 { + g.Timeout = 2 + } + if len(g.Servers) == 0 { + g.Servers = append(g.Servers, "localhost:2003") + } + // Get Connections + var conns []net.Conn + for _, server := range g.Servers { + conn, err := net.DialTimeout("tcp", server, time.Duration(g.Timeout)*time.Second) + if err == nil { + conns = append(conns, conn) + } + } + g.conns = conns + return nil +} + +func (g *Graphite) Close() error { + // Closing all connections + for _, conn := range g.conns { + conn.Close() + } + return nil +} + +func (g *Graphite) SampleConfig() string { + return sampleConfig +} + +func (g *Graphite) Description() string { + return "Configuration for Graphite server to send metrics to" +} + +// Choose a random server in the cluster to write to until a successful write +// occurs, logging each unsuccessful. If all servers fail, return error. +func (g *Graphite) Write(points []*client.Point) error { + // Prepare data + var bp []string + for _, point := range points { + // Get name + name := point.Name() + // Convert UnixNano to Unix timestamps + timestamp := point.UnixNano() / 1000000000 + + for field_name, value := range point.Fields() { + // Convert value + value_str := fmt.Sprintf("%#v", value) + // Write graphite point + var graphitePoint string + if name == field_name { + graphitePoint = fmt.Sprintf("%s.%s %s %d\n", + strings.Replace(point.Tags()["host"], ".", "_", -1), + strings.Replace(name, ".", "_", -1), + value_str, + timestamp) + } else { + graphitePoint = fmt.Sprintf("%s.%s.%s %s %d\n", + strings.Replace(point.Tags()["host"], ".", "_", -1), + strings.Replace(name, ".", "_", -1), + strings.Replace(field_name, ".", "_", -1), + value_str, + timestamp) + } + if g.Prefix != "" { + graphitePoint = fmt.Sprintf("%s.%s", g.Prefix, graphitePoint) + } + bp = append(bp, graphitePoint) + //fmt.Printf(graphitePoint) + } + } + graphitePoints := strings.Join(bp, "") + + // This will get set to nil if a successful write occurs + err := errors.New("Could not write to any Graphite server in cluster\n") + + // Send data to a random server + p := rand.Perm(len(g.conns)) + for _, n := range p { + if _, e := fmt.Fprintf(g.conns[n], graphitePoints); e != nil { + // Error + log.Println("ERROR: " + err.Error()) + // Let's try the next one + } else { + // Success + err = nil + break + } + } + // try to reconnect + if err != nil { + g.Connect() + } + return err +} + +func init() { + outputs.Add("graphite", func() outputs.Output { + return &Graphite{} + }) +} diff --git a/plugins/outputs/graphite/graphite_test.go b/plugins/outputs/graphite/graphite_test.go new file mode 100644 index 000000000..e9000c3c7 --- /dev/null +++ b/plugins/outputs/graphite/graphite_test.go @@ -0,0 +1,104 @@ +package graphite + +import ( + "bufio" + "net" + "net/textproto" + "sync" + "testing" + "time" + + "github.com/influxdb/influxdb/client/v2" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGraphiteError(t *testing.T) { + // Init plugin + g := Graphite{ + Servers: []string{"127.0.0.1:2003", "127.0.0.1:12003"}, + Prefix: "my.prefix", + } + // Init points + pt1, _ := client.NewPoint( + "mymeasurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"mymeasurement": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + // Prepare point list + var points []*client.Point + points = append(points, pt1) + // Error + err1 := g.Connect() + require.NoError(t, err1) + err2 := g.Write(points) + require.Error(t, err2) + assert.Equal(t, "Could not write to any Graphite server in cluster\n", err2.Error()) +} + +func TestGraphiteOK(t *testing.T) { + var wg sync.WaitGroup + // Init plugin + g := Graphite{ + Prefix: "my.prefix", + } + // Init points + pt1, _ := client.NewPoint( + "mymeasurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"mymeasurement": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + pt2, _ := client.NewPoint( + "mymeasurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + pt3, _ := client.NewPoint( + "my_measurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + // Prepare point list + var points []*client.Point + points = append(points, pt1) + points = append(points, pt2) + points = append(points, pt3) + // Start TCP server + wg.Add(1) + go TCPServer(t, &wg) + wg.Wait() + // Connect + wg.Add(1) + err1 := g.Connect() + wg.Wait() + require.NoError(t, err1) + // Send Data + err2 := g.Write(points) + require.NoError(t, err2) + wg.Add(1) + // Waiting TCPserver + wg.Wait() + g.Close() +} + +func TCPServer(t *testing.T, wg *sync.WaitGroup) { + tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") + wg.Done() + conn, _ := tcpServer.Accept() + wg.Done() + reader := bufio.NewReader(conn) + tp := textproto.NewReader(reader) + data1, _ := tp.ReadLine() + assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data1) + data2, _ := tp.ReadLine() + assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.value 3.14 1289430000", data2) + data3, _ := tp.ReadLine() + assert.Equal(t, "my.prefix.192_168_0_1.my_measurement.value 3.14 1289430000", data3) + conn.Close() + wg.Done() +} diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 8e53cc511..55ef35fb4 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -1,12 +1,14 @@ package kafka import ( + "crypto/tls" + "crypto/x509" "errors" "fmt" - "github.com/Shopify/sarama" "github.com/influxdb/influxdb/client/v2" "github.com/influxdb/telegraf/plugins/outputs" + "io/ioutil" ) type Kafka struct { @@ -16,8 +18,17 @@ type Kafka struct { Topic string // Routing Key Tag RoutingTag string `toml:"routing_tag"` + // TLS client certificate + Certificate string + // TLS client key + Key string + // TLS certificate authority + CA string + // Verfiy SSL certificate chain + VerifySsl bool - producer sarama.SyncProducer + tlsConfig tls.Config + producer sarama.SyncProducer } var sampleConfig = ` @@ -28,10 +39,60 @@ var sampleConfig = ` # Telegraf tag to use as a routing key # ie, if this tag exists, it's value will be used as the routing key routing_tag = "host" + + # Optional TLS configuration: + # Client certificate + certificate = "" + # Client key + key = "" + # Certificate authority file + ca = "" + # Verify SSL certificate chain + verify_ssl = false ` +func createTlsConfiguration(k *Kafka) (t *tls.Config, err error) { + if k.Certificate != "" && k.Key != "" && k.CA != "" { + cert, err := tls.LoadX509KeyPair(k.Certificate, k.Key) + if err != nil { + return nil, errors.New(fmt.Sprintf("Cout not load Kafka TLS client key/certificate: %s", + err)) + } + + caCert, err := ioutil.ReadFile(k.CA) + if err != nil { + return nil, errors.New(fmt.Sprintf("Cout not load Kafka TLS CA: %s", + err)) + } + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + t = &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + InsecureSkipVerify: k.VerifySsl, + } + } + // will be nil by default if nothing is provided + return t, nil +} + func (k *Kafka) Connect() error { - producer, err := sarama.NewSyncProducer(k.Brokers, nil) + config := sarama.NewConfig() + config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message + config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message + tlsConfig, err := createTlsConfiguration(k) + if err != nil { + return err + } + + if tlsConfig != nil { + config.Net.TLS.Config = tlsConfig + config.Net.TLS.Enable = true + } + + producer, err := sarama.NewSyncProducer(k.Brokers, config) if err != nil { return err }