commit
5284480240
|
@ -6,10 +6,17 @@
|
|||
- [#475](https://github.com/influxdata/telegraf/pull/475): Add response time to httpjson plugin. Thanks @titilambert!
|
||||
- [#519](https://github.com/influxdata/telegraf/pull/519): Added a sensors input based on lm-sensors. Thanks @md14454!
|
||||
- [#467](https://github.com/influxdata/telegraf/issues/467): Add option to disable statsd measurement name conversion.
|
||||
- [#534](https://github.com/influxdata/telegraf/pull/534): NSQ input plugin. Thanks @allingeek!
|
||||
- [#494](https://github.com/influxdata/telegraf/pull/494): Graphite output plugin. Thanks @titilambert!
|
||||
- AMQP SSL support. Thanks @ekini!
|
||||
- [#539](https://github.com/influxdata/telegraf/pull/539): Reload config on SIGHUP. Thanks @titilambert!
|
||||
- [#522](https://github.com/influxdata/telegraf/pull/522): Phusion passenger input plugin. Thanks @kureikain!
|
||||
- [#541](https://github.com/influxdata/telegraf/pull/541): Kafka output TLS cert support. Thanks @Ormod!
|
||||
|
||||
### Bugfixes
|
||||
- [#506](https://github.com/influxdb/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert!
|
||||
- [#508](https://github.com/influxdb/telegraf/pull/508): Fix prometheus cardinality issue with the `net` plugin
|
||||
- [#499](https://github.com/influxdata/telegraf/issues/499) & [#502](https://github.com/influxdata/telegraf/issues/502): php fpm unix socket and other fixes, thanks @kureikain!
|
||||
|
||||
## v0.10.0 [2016-01-12]
|
||||
|
||||
|
|
1
Godeps
1
Godeps
|
@ -46,6 +46,7 @@ github.com/wvanbergen/kafka 1a8639a45164fcc245d5c7b4bd3ccfbd1a0ffbf3
|
|||
github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8
|
||||
golang.org/x/crypto 3760e016850398b85094c4c99e955b8c3dea5711
|
||||
golang.org/x/net 99ca920b6037ef77af8a11297150f7f0d8f4ef80
|
||||
golang.org/x/text cf4986612c83df6c55578ba198316d1684a9a287
|
||||
gopkg.in/dancannon/gorethink.v1 e2cef022d0495329dfb0635991de76efcab5cf50
|
||||
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
|
||||
gopkg.in/mgo.v2 e30de8ac9ae3b30df7065f766c71f88bba7d4e49
|
||||
|
|
|
@ -152,7 +152,9 @@ Currently implemented sources:
|
|||
* mongodb
|
||||
* mysql
|
||||
* nginx
|
||||
* nsq
|
||||
* phpfpm
|
||||
* phusion passenger
|
||||
* ping
|
||||
* postgresql
|
||||
* procstat
|
||||
|
@ -188,6 +190,7 @@ want to add support for another service or third-party API.
|
|||
* amon
|
||||
* amqp
|
||||
* datadog
|
||||
* graphite
|
||||
* kafka
|
||||
* amazon kinesis
|
||||
* librato
|
||||
|
|
2
agent.go
2
agent.go
|
@ -58,7 +58,7 @@ func (a *Agent) Connect() error {
|
|||
}
|
||||
err := o.Output.Connect()
|
||||
if err != nil {
|
||||
log.Printf("Failed to connect to output %s, retrying in 15s\n", o.Name)
|
||||
log.Printf("Failed to connect to output %s, retrying in 15s, error was '%s' \n", o.Name, err)
|
||||
time.Sleep(15 * time.Second)
|
||||
err = o.Output.Connect()
|
||||
if err != nil {
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/influxdb/telegraf"
|
||||
"github.com/influxdb/telegraf/internal/config"
|
||||
|
@ -82,6 +83,10 @@ Examples:
|
|||
`
|
||||
|
||||
func main() {
|
||||
reload := make(chan bool, 1)
|
||||
reload <- true
|
||||
for <-reload {
|
||||
reload <- false
|
||||
flag.Usage = usageExit
|
||||
flag.Parse()
|
||||
|
||||
|
@ -196,10 +201,18 @@ func main() {
|
|||
|
||||
shutdown := make(chan struct{})
|
||||
signals := make(chan os.Signal)
|
||||
signal.Notify(signals, os.Interrupt)
|
||||
signal.Notify(signals, os.Interrupt, syscall.SIGHUP)
|
||||
go func() {
|
||||
<-signals
|
||||
sig := <-signals
|
||||
if sig == os.Interrupt {
|
||||
close(shutdown)
|
||||
}
|
||||
if sig == syscall.SIGHUP {
|
||||
log.Printf("Reloading Telegraf config\n")
|
||||
<-reload
|
||||
reload <- true
|
||||
close(shutdown)
|
||||
}
|
||||
}()
|
||||
|
||||
log.Printf("Starting Telegraf (version %s)\n", Version)
|
||||
|
@ -219,6 +232,7 @@ func main() {
|
|||
}
|
||||
|
||||
ag.Run(shutdown)
|
||||
}
|
||||
}
|
||||
|
||||
func usageExit() {
|
||||
|
|
|
@ -19,6 +19,8 @@ import (
|
|||
_ "github.com/influxdb/telegraf/plugins/inputs/mongodb"
|
||||
_ "github.com/influxdb/telegraf/plugins/inputs/mysql"
|
||||
_ "github.com/influxdb/telegraf/plugins/inputs/nginx"
|
||||
_ "github.com/influxdb/telegraf/plugins/inputs/nsq"
|
||||
_ "github.com/influxdb/telegraf/plugins/inputs/passenger"
|
||||
_ "github.com/influxdb/telegraf/plugins/inputs/phpfpm"
|
||||
_ "github.com/influxdb/telegraf/plugins/inputs/ping"
|
||||
_ "github.com/influxdb/telegraf/plugins/inputs/postgresql"
|
||||
|
|
|
@ -0,0 +1,271 @@
|
|||
// The MIT License (MIT)
|
||||
//
|
||||
// Copyright (c) 2015 Jeff Nickoloff (jeff@allingeek.com)
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
// of this software and associated documentation files (the "Software"), to deal
|
||||
// in the Software without restriction, including without limitation the rights
|
||||
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
// copies of the Software, and to permit persons to whom the Software is
|
||||
// furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in all
|
||||
// copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
// SOFTWARE.
|
||||
|
||||
package nsq
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
// Might add Lookupd endpoints for cluster discovery
|
||||
type NSQ struct {
|
||||
Endpoints []string
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
# An array of NSQD HTTP API endpoints
|
||||
endpoints = ["http://localhost:4151"]
|
||||
`
|
||||
|
||||
const (
|
||||
requestPattern = `%s/stats?format=json`
|
||||
)
|
||||
|
||||
func init() {
|
||||
inputs.Add("nsq", func() inputs.Input {
|
||||
return &NSQ{}
|
||||
})
|
||||
}
|
||||
|
||||
func (n *NSQ) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (n *NSQ) Description() string {
|
||||
return "Read NSQ topic and channel statistics."
|
||||
}
|
||||
|
||||
func (n *NSQ) Gather(acc inputs.Accumulator) error {
|
||||
var wg sync.WaitGroup
|
||||
var outerr error
|
||||
|
||||
for _, e := range n.Endpoints {
|
||||
wg.Add(1)
|
||||
go func(e string) {
|
||||
defer wg.Done()
|
||||
outerr = n.gatherEndpoint(e, acc)
|
||||
}(e)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
return outerr
|
||||
}
|
||||
|
||||
var tr = &http.Transport{
|
||||
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
||||
}
|
||||
|
||||
var client = &http.Client{Transport: tr}
|
||||
|
||||
func (n *NSQ) gatherEndpoint(e string, acc inputs.Accumulator) error {
|
||||
u, err := buildURL(e)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r, err := client.Get(u.String())
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error while polling %s: %s", u.String(), err)
|
||||
}
|
||||
defer r.Body.Close()
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("%s returned HTTP status %s", u.String(), r.Status)
|
||||
}
|
||||
|
||||
s := &NSQStats{}
|
||||
err = json.NewDecoder(r.Body).Decode(s)
|
||||
if err != nil {
|
||||
return fmt.Errorf(`Error parsing response: %s`, err)
|
||||
}
|
||||
|
||||
tags := map[string]string{
|
||||
`server_host`: u.Host,
|
||||
`server_version`: s.Data.Version,
|
||||
}
|
||||
|
||||
fields := make(map[string]interface{})
|
||||
if s.Data.Health == `OK` {
|
||||
fields["server_count"] = int64(1)
|
||||
} else {
|
||||
fields["server_count"] = int64(0)
|
||||
}
|
||||
fields["topic_count"] = int64(len(s.Data.Topics))
|
||||
|
||||
acc.AddFields("nsq_server", fields, tags)
|
||||
for _, t := range s.Data.Topics {
|
||||
topicStats(t, acc, u.Host, s.Data.Version)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func buildURL(e string) (*url.URL, error) {
|
||||
u := fmt.Sprintf(requestPattern, e)
|
||||
addr, err := url.Parse(u)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Unable to parse address '%s': %s", u, err)
|
||||
}
|
||||
return addr, nil
|
||||
}
|
||||
|
||||
func topicStats(t TopicStats, acc inputs.Accumulator, host, version string) {
|
||||
// per topic overall (tag: name, paused, channel count)
|
||||
tags := map[string]string{
|
||||
"server_host": host,
|
||||
"server_version": version,
|
||||
"topic": t.Name,
|
||||
}
|
||||
|
||||
fields := map[string]interface{}{
|
||||
"depth": t.Depth,
|
||||
"backend_depth": t.BackendDepth,
|
||||
"message_count": t.MessageCount,
|
||||
"channel_count": int64(len(t.Channels)),
|
||||
}
|
||||
acc.AddFields("nsq_topic", fields, tags)
|
||||
|
||||
for _, c := range t.Channels {
|
||||
channelStats(c, acc, host, version, t.Name)
|
||||
}
|
||||
}
|
||||
|
||||
func channelStats(c ChannelStats, acc inputs.Accumulator, host, version, topic string) {
|
||||
tags := map[string]string{
|
||||
"server_host": host,
|
||||
"server_version": version,
|
||||
"topic": topic,
|
||||
"channel": c.Name,
|
||||
}
|
||||
|
||||
fields := map[string]interface{}{
|
||||
"depth": c.Depth,
|
||||
"backend_depth": c.BackendDepth,
|
||||
"inflight_count": c.InFlightCount,
|
||||
"deferred_count": c.DeferredCount,
|
||||
"message_count": c.MessageCount,
|
||||
"requeue_count": c.RequeueCount,
|
||||
"timeout_count": c.TimeoutCount,
|
||||
"client_count": int64(len(c.Clients)),
|
||||
}
|
||||
|
||||
acc.AddFields("nsq_channel", fields, tags)
|
||||
for _, cl := range c.Clients {
|
||||
clientStats(cl, acc, host, version, topic, c.Name)
|
||||
}
|
||||
}
|
||||
|
||||
func clientStats(c ClientStats, acc inputs.Accumulator, host, version, topic, channel string) {
|
||||
tags := map[string]string{
|
||||
"server_host": host,
|
||||
"server_version": version,
|
||||
"topic": topic,
|
||||
"channel": channel,
|
||||
"client_name": c.Name,
|
||||
"client_id": c.ID,
|
||||
"client_hostname": c.Hostname,
|
||||
"client_version": c.Version,
|
||||
"client_address": c.RemoteAddress,
|
||||
"client_user_agent": c.UserAgent,
|
||||
"client_tls": strconv.FormatBool(c.TLS),
|
||||
"client_snappy": strconv.FormatBool(c.Snappy),
|
||||
"client_deflate": strconv.FormatBool(c.Deflate),
|
||||
}
|
||||
|
||||
fields := map[string]interface{}{
|
||||
"ready_count": c.ReadyCount,
|
||||
"inflight_count": c.InFlightCount,
|
||||
"message_count": c.MessageCount,
|
||||
"finish_count": c.FinishCount,
|
||||
"requeue_count": c.RequeueCount,
|
||||
}
|
||||
acc.AddFields("nsq_client", fields, tags)
|
||||
}
|
||||
|
||||
type NSQStats struct {
|
||||
Code int64 `json:"status_code"`
|
||||
Txt string `json:"status_txt"`
|
||||
Data NSQStatsData `json:"data"`
|
||||
}
|
||||
|
||||
type NSQStatsData struct {
|
||||
Version string `json:"version"`
|
||||
Health string `json:"health"`
|
||||
StartTime int64 `json:"start_time"`
|
||||
Topics []TopicStats `json:"topics"`
|
||||
}
|
||||
|
||||
// e2e_processing_latency is not modeled
|
||||
type TopicStats struct {
|
||||
Name string `json:"topic_name"`
|
||||
Depth int64 `json:"depth"`
|
||||
BackendDepth int64 `json:"backend_depth"`
|
||||
MessageCount int64 `json:"message_count"`
|
||||
Paused bool `json:"paused"`
|
||||
Channels []ChannelStats `json:"channels"`
|
||||
}
|
||||
|
||||
// e2e_processing_latency is not modeled
|
||||
type ChannelStats struct {
|
||||
Name string `json:"channel_name"`
|
||||
Depth int64 `json:"depth"`
|
||||
BackendDepth int64 `json:"backend_depth"`
|
||||
InFlightCount int64 `json:"in_flight_count"`
|
||||
DeferredCount int64 `json:"deferred_count"`
|
||||
MessageCount int64 `json:"message_count"`
|
||||
RequeueCount int64 `json:"requeue_count"`
|
||||
TimeoutCount int64 `json:"timeout_count"`
|
||||
Paused bool `json:"paused"`
|
||||
Clients []ClientStats `json:"clients"`
|
||||
}
|
||||
|
||||
type ClientStats struct {
|
||||
Name string `json:"name"`
|
||||
ID string `json:"client_id"`
|
||||
Hostname string `json:"hostname"`
|
||||
Version string `json:"version"`
|
||||
RemoteAddress string `json:"remote_address"`
|
||||
State int64 `json:"state"`
|
||||
ReadyCount int64 `json:"ready_count"`
|
||||
InFlightCount int64 `json:"in_flight_count"`
|
||||
MessageCount int64 `json:"message_count"`
|
||||
FinishCount int64 `json:"finish_count"`
|
||||
RequeueCount int64 `json:"requeue_count"`
|
||||
ConnectTime int64 `json:"connect_ts"`
|
||||
SampleRate int64 `json:"sample_rate"`
|
||||
Deflate bool `json:"deflate"`
|
||||
Snappy bool `json:"snappy"`
|
||||
UserAgent string `json:"user_agent"`
|
||||
TLS bool `json:"tls"`
|
||||
TLSCipherSuite string `json:"tls_cipher_suite"`
|
||||
TLSVersion string `json:"tls_version"`
|
||||
TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"`
|
||||
TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"`
|
||||
}
|
|
@ -0,0 +1,273 @@
|
|||
package nsq
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdb/telegraf/testutil"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNSQStats(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprintln(w, response)
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
n := &NSQ{
|
||||
Endpoints: []string{ts.URL},
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
err := n.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
u, err := url.Parse(ts.URL)
|
||||
require.NoError(t, err)
|
||||
host := u.Host
|
||||
|
||||
// actually validate the tests
|
||||
tests := []struct {
|
||||
m string
|
||||
f map[string]interface{}
|
||||
g map[string]string
|
||||
}{
|
||||
{
|
||||
"nsq_server",
|
||||
map[string]interface{}{
|
||||
"server_count": int64(1),
|
||||
"topic_count": int64(2),
|
||||
},
|
||||
map[string]string{
|
||||
"server_host": host,
|
||||
"server_version": "0.3.6",
|
||||
},
|
||||
},
|
||||
{
|
||||
"nsq_topic",
|
||||
map[string]interface{}{
|
||||
"depth": int64(12),
|
||||
"backend_depth": int64(13),
|
||||
"message_count": int64(14),
|
||||
"channel_count": int64(1),
|
||||
},
|
||||
map[string]string{
|
||||
"server_host": host,
|
||||
"server_version": "0.3.6",
|
||||
"topic": "t1"},
|
||||
},
|
||||
{
|
||||
"nsq_channel",
|
||||
map[string]interface{}{
|
||||
"depth": int64(0),
|
||||
"backend_depth": int64(1),
|
||||
"inflight_count": int64(2),
|
||||
"deferred_count": int64(3),
|
||||
"message_count": int64(4),
|
||||
"requeue_count": int64(5),
|
||||
"timeout_count": int64(6),
|
||||
"client_count": int64(1),
|
||||
},
|
||||
map[string]string{
|
||||
"server_host": host,
|
||||
"server_version": "0.3.6",
|
||||
"topic": "t1",
|
||||
"channel": "c1",
|
||||
},
|
||||
},
|
||||
{
|
||||
"nsq_client",
|
||||
map[string]interface{}{
|
||||
"ready_count": int64(200),
|
||||
"inflight_count": int64(7),
|
||||
"message_count": int64(8),
|
||||
"finish_count": int64(9),
|
||||
"requeue_count": int64(10),
|
||||
},
|
||||
map[string]string{"server_host": host, "server_version": "0.3.6",
|
||||
"topic": "t1", "channel": "c1", "client_name": "373a715cd990",
|
||||
"client_id": "373a715cd990", "client_hostname": "373a715cd990",
|
||||
"client_version": "V2", "client_address": "172.17.0.11:35560",
|
||||
"client_tls": "false", "client_snappy": "false",
|
||||
"client_deflate": "false",
|
||||
"client_user_agent": "nsq_to_nsq/0.3.6 go-nsq/1.0.5"},
|
||||
},
|
||||
{
|
||||
"nsq_topic",
|
||||
map[string]interface{}{
|
||||
"depth": int64(28),
|
||||
"backend_depth": int64(29),
|
||||
"message_count": int64(30),
|
||||
"channel_count": int64(1),
|
||||
},
|
||||
map[string]string{
|
||||
"server_host": host,
|
||||
"server_version": "0.3.6",
|
||||
"topic": "t2"},
|
||||
},
|
||||
{
|
||||
"nsq_channel",
|
||||
map[string]interface{}{
|
||||
"depth": int64(15),
|
||||
"backend_depth": int64(16),
|
||||
"inflight_count": int64(17),
|
||||
"deferred_count": int64(18),
|
||||
"message_count": int64(19),
|
||||
"requeue_count": int64(20),
|
||||
"timeout_count": int64(21),
|
||||
"client_count": int64(1),
|
||||
},
|
||||
map[string]string{
|
||||
"server_host": host,
|
||||
"server_version": "0.3.6",
|
||||
"topic": "t2",
|
||||
"channel": "c2",
|
||||
},
|
||||
},
|
||||
{
|
||||
"nsq_client",
|
||||
map[string]interface{}{
|
||||
"ready_count": int64(22),
|
||||
"inflight_count": int64(23),
|
||||
"message_count": int64(24),
|
||||
"finish_count": int64(25),
|
||||
"requeue_count": int64(26),
|
||||
},
|
||||
map[string]string{"server_host": host, "server_version": "0.3.6",
|
||||
"topic": "t2", "channel": "c2", "client_name": "377569bd462b",
|
||||
"client_id": "377569bd462b", "client_hostname": "377569bd462b",
|
||||
"client_version": "V2", "client_address": "172.17.0.8:48145",
|
||||
"client_user_agent": "go-nsq/1.0.5", "client_tls": "true",
|
||||
"client_snappy": "true", "client_deflate": "true"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
acc.AssertContainsTaggedFields(t, test.m, test.f, test.g)
|
||||
}
|
||||
}
|
||||
|
||||
var response = `
|
||||
{
|
||||
"status_code": 200,
|
||||
"status_txt": "OK",
|
||||
"data": {
|
||||
"version": "0.3.6",
|
||||
"health": "OK",
|
||||
"start_time": 1452021674,
|
||||
"topics": [
|
||||
{
|
||||
"topic_name": "t1",
|
||||
"channels": [
|
||||
{
|
||||
"channel_name": "c1",
|
||||
"depth": 0,
|
||||
"backend_depth": 1,
|
||||
"in_flight_count": 2,
|
||||
"deferred_count": 3,
|
||||
"message_count": 4,
|
||||
"requeue_count": 5,
|
||||
"timeout_count": 6,
|
||||
"clients": [
|
||||
{
|
||||
"name": "373a715cd990",
|
||||
"client_id": "373a715cd990",
|
||||
"hostname": "373a715cd990",
|
||||
"version": "V2",
|
||||
"remote_address": "172.17.0.11:35560",
|
||||
"state": 3,
|
||||
"ready_count": 200,
|
||||
"in_flight_count": 7,
|
||||
"message_count": 8,
|
||||
"finish_count": 9,
|
||||
"requeue_count": 10,
|
||||
"connect_ts": 1452021675,
|
||||
"sample_rate": 11,
|
||||
"deflate": false,
|
||||
"snappy": false,
|
||||
"user_agent": "nsq_to_nsq\/0.3.6 go-nsq\/1.0.5",
|
||||
"tls": false,
|
||||
"tls_cipher_suite": "",
|
||||
"tls_version": "",
|
||||
"tls_negotiated_protocol": "",
|
||||
"tls_negotiated_protocol_is_mutual": false
|
||||
}
|
||||
],
|
||||
"paused": false,
|
||||
"e2e_processing_latency": {
|
||||
"count": 0,
|
||||
"percentiles": null
|
||||
}
|
||||
}
|
||||
],
|
||||
"depth": 12,
|
||||
"backend_depth": 13,
|
||||
"message_count": 14,
|
||||
"paused": false,
|
||||
"e2e_processing_latency": {
|
||||
"count": 0,
|
||||
"percentiles": null
|
||||
}
|
||||
},
|
||||
{
|
||||
"topic_name": "t2",
|
||||
"channels": [
|
||||
{
|
||||
"channel_name": "c2",
|
||||
"depth": 15,
|
||||
"backend_depth": 16,
|
||||
"in_flight_count": 17,
|
||||
"deferred_count": 18,
|
||||
"message_count": 19,
|
||||
"requeue_count": 20,
|
||||
"timeout_count": 21,
|
||||
"clients": [
|
||||
{
|
||||
"name": "377569bd462b",
|
||||
"client_id": "377569bd462b",
|
||||
"hostname": "377569bd462b",
|
||||
"version": "V2",
|
||||
"remote_address": "172.17.0.8:48145",
|
||||
"state": 3,
|
||||
"ready_count": 22,
|
||||
"in_flight_count": 23,
|
||||
"message_count": 24,
|
||||
"finish_count": 25,
|
||||
"requeue_count": 26,
|
||||
"connect_ts": 1452021678,
|
||||
"sample_rate": 27,
|
||||
"deflate": true,
|
||||
"snappy": true,
|
||||
"user_agent": "go-nsq\/1.0.5",
|
||||
"tls": true,
|
||||
"tls_cipher_suite": "",
|
||||
"tls_version": "",
|
||||
"tls_negotiated_protocol": "",
|
||||
"tls_negotiated_protocol_is_mutual": false
|
||||
}
|
||||
],
|
||||
"paused": false,
|
||||
"e2e_processing_latency": {
|
||||
"count": 0,
|
||||
"percentiles": null
|
||||
}
|
||||
}
|
||||
],
|
||||
"depth": 28,
|
||||
"backend_depth": 29,
|
||||
"message_count": 30,
|
||||
"paused": false,
|
||||
"e2e_processing_latency": {
|
||||
"count": 0,
|
||||
"percentiles": null
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
`
|
|
@ -0,0 +1,138 @@
|
|||
# Telegraf plugin: passenger
|
||||
|
||||
Get phusion passenger stat using their command line utility
|
||||
`passenger-status`
|
||||
|
||||
# Measurements
|
||||
|
||||
Meta:
|
||||
|
||||
- tags:
|
||||
|
||||
* name
|
||||
* passenger_version
|
||||
* pid
|
||||
* code_revision
|
||||
|
||||
Measurement names:
|
||||
|
||||
- passenger:
|
||||
|
||||
* Tags: `passenger_version`
|
||||
* Fields:
|
||||
|
||||
- process_count
|
||||
- max
|
||||
- capacity_used
|
||||
- get_wait_list_size
|
||||
|
||||
- passenger_supergroup:
|
||||
|
||||
* Tags: `name`
|
||||
* Fields:
|
||||
|
||||
- get_wait_list_size
|
||||
- capacity_used
|
||||
|
||||
- passenger_group:
|
||||
|
||||
* Tags:
|
||||
|
||||
- name
|
||||
- app_root
|
||||
- app_type
|
||||
|
||||
* Fields:
|
||||
|
||||
- get_wait_list_size
|
||||
- capacity_used
|
||||
- processes_being_spawned
|
||||
|
||||
- passenger_process:
|
||||
|
||||
* Tags:
|
||||
|
||||
- group_name
|
||||
- app_root
|
||||
- supergroup_name
|
||||
- pid
|
||||
- code_revision
|
||||
- life_status
|
||||
- process_group_id
|
||||
|
||||
* Field:
|
||||
|
||||
- concurrency
|
||||
- sessions
|
||||
- busyness
|
||||
- processed
|
||||
- spawner_creation_time
|
||||
- spawn_start_time
|
||||
- spawn_end_time
|
||||
- last_used
|
||||
- uptime
|
||||
- cpu
|
||||
- rss
|
||||
- pss
|
||||
- private_dirty
|
||||
- swap
|
||||
- real_memory
|
||||
- vmsize
|
||||
|
||||
# Example output
|
||||
|
||||
Using this configuration:
|
||||
|
||||
```
|
||||
[[inputs.passenger]]
|
||||
# Path of passenger-status.
|
||||
#
|
||||
# Plugin gather metric via parsing XML output of passenger-status
|
||||
# More information about the tool:
|
||||
# https://www.phusionpassenger.com/library/admin/apache/overall_status_report.html
|
||||
#
|
||||
#
|
||||
# If no path is specified, then the plugin simply execute passenger-status
|
||||
# hopefully it can be found in your PATH
|
||||
command = "passenger-status -v --show=xml"
|
||||
```
|
||||
|
||||
When run with:
|
||||
|
||||
```
|
||||
./telegraf -config telegraf.conf -test -input-filter passenger
|
||||
```
|
||||
|
||||
It produces:
|
||||
|
||||
```
|
||||
> passenger,passenger_version=5.0.17 capacity_used=23i,get_wait_list_size=0i,max=23i,process_count=23i 1452984112799414257
|
||||
> passenger_supergroup,name=/var/app/current/public capacity_used=23i,get_wait_list_size=0i 1452984112799496977
|
||||
> passenger_group,app_root=/var/app/current,app_type=rack,name=/var/app/current/public capacity_used=23i,get_wait_list_size=0i,processes_being_spawned=0i 1452984112799527021
|
||||
> passenger_process,app_root=/var/app/current,code_revision=899ac7f,group_name=/var/app/current/public,life_status=ALIVE,pid=11553,process_group_id=13608,supergroup_name=/var/app/current/public busyness=0i,concurrency=1i,cpu=58i,last_used=1452747071764940i,private_dirty=314900i,processed=951i,pss=319391i,real_memory=314900i,rss=418548i,sessions=0i,spawn_end_time=1452746845013365i,spawn_start_time=1452746844946982i,spawner_creation_time=1452746835922747i,swap=0i,uptime=226i,vmsize=1563580i 1452984112799571490
|
||||
> passenger_process,app_root=/var/app/current,code_revision=899ac7f,group_name=/var/app/current/public,life_status=ALIVE,pid=11563,process_group_id=13608,supergroup_name=/var/app/current/public busyness=2147483647i,concurrency=1i,cpu=47i,last_used=1452747071709179i,private_dirty=309240i,processed=756i,pss=314036i,real_memory=309240i,rss=418296i,sessions=1i,spawn_end_time=1452746845172460i,spawn_start_time=1452746845136882i,spawner_creation_time=1452746835922747i,swap=0i,uptime=226i,vmsize=1563608i 1452984112799638581
|
||||
```
|
||||
|
||||
# Note
|
||||
|
||||
You have to ensure that you can run the `passenger-status` command under
|
||||
telegraf user. Depend on how you install and configure passenger, this
|
||||
maybe an issue for you. If you are using passenger standlone, or compile
|
||||
yourself, it is straight forward. However, if you are using gem and
|
||||
`rvm`, it maybe harder to get this right.
|
||||
|
||||
Such as with `rvm`, you can use this command:
|
||||
|
||||
```
|
||||
~/.rvm/bin/rvm default do passenger-status -v --show=xml
|
||||
```
|
||||
|
||||
You can use `&` and `;` in the shell command to run comlicated shell command
|
||||
in order to get the passenger-status such as load the rvm shell, source the
|
||||
path
|
||||
```
|
||||
command = "source .rvm/scripts/rvm && passenger-status -v --show=xml"
|
||||
```
|
||||
|
||||
Anyway, just ensure that you can run the command under `telegraf` user, and it
|
||||
has to produce XML output.
|
|
@ -0,0 +1,250 @@
|
|||
package passenger
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdb/telegraf/plugins/inputs"
|
||||
"golang.org/x/net/html/charset"
|
||||
)
|
||||
|
||||
type passenger struct {
|
||||
Command string
|
||||
}
|
||||
|
||||
func (p *passenger) parseCommand() (string, []string) {
|
||||
var arguments []string
|
||||
if !strings.Contains(p.Command, " ") {
|
||||
return p.Command, arguments
|
||||
}
|
||||
|
||||
arguments = strings.Split(p.Command, " ")
|
||||
if len(arguments) == 1 {
|
||||
return arguments[0], arguments[1:]
|
||||
}
|
||||
|
||||
return arguments[0], arguments[1:]
|
||||
}
|
||||
|
||||
type info struct {
|
||||
Passenger_version string `xml:"passenger_version"`
|
||||
Process_count int `xml:"process_count"`
|
||||
Capacity_used int `xml:"capacity_used"`
|
||||
Get_wait_list_size int `xml:"get_wait_list_size"`
|
||||
Max int `xml:"max"`
|
||||
Supergroups struct {
|
||||
Supergroup []struct {
|
||||
Name string `xml:"name"`
|
||||
Get_wait_list_size int `xml:"get_wait_list_size"`
|
||||
Capacity_used int `xml:"capacity_used"`
|
||||
Group []struct {
|
||||
Name string `xml:"name"`
|
||||
AppRoot string `xml:"app_root"`
|
||||
AppType string `xml:"app_type"`
|
||||
Enabled_process_count int `xml:"enabled_process_count"`
|
||||
Disabling_process_count int `xml:"disabling_process_count"`
|
||||
Disabled_process_count int `xml:"disabled_process_count"`
|
||||
Capacity_used int `xml:"capacity_used"`
|
||||
Get_wait_list_size int `xml:"get_wait_list_size"`
|
||||
Processes_being_spawned int `xml:"processes_being_spawned"`
|
||||
Processes struct {
|
||||
Process []*process `xml:"process"`
|
||||
} `xml:"processes"`
|
||||
} `xml:"group"`
|
||||
} `xml:"supergroup"`
|
||||
} `xml:"supergroups"`
|
||||
}
|
||||
|
||||
type process struct {
|
||||
Pid int `xml:"pid"`
|
||||
Concurrency int `xml:"concurrency"`
|
||||
Sessions int `xml:"sessions"`
|
||||
Busyness int `xml:"busyness"`
|
||||
Processed int `xml:"processed"`
|
||||
Spawner_creation_time int64 `xml:"spawner_creation_time"`
|
||||
Spawn_start_time int64 `xml:"spawn_start_time"`
|
||||
Spawn_end_time int64 `xml:"spawn_end_time"`
|
||||
Last_used int64 `xml:"last_used"`
|
||||
Uptime string `xml:"uptime"`
|
||||
Code_revision string `xml:"code_revision"`
|
||||
Life_status string `xml:"life_status"`
|
||||
Enabled string `xml:"enabled"`
|
||||
Has_metrics bool `xml:"has_metrics"`
|
||||
Cpu int64 `xml:"cpu"`
|
||||
Rss int64 `xml:"rss"`
|
||||
Pss int64 `xml:"pss"`
|
||||
Private_dirty int64 `xml:"private_dirty"`
|
||||
Swap int64 `xml:"swap"`
|
||||
Real_memory int64 `xml:"real_memory"`
|
||||
Vmsize int64 `xml:"vmsize"`
|
||||
Process_group_id string `xml:"process_group_id"`
|
||||
}
|
||||
|
||||
func (p *process) getUptime() int64 {
|
||||
if p.Uptime == "" {
|
||||
return 0
|
||||
}
|
||||
|
||||
timeSlice := strings.Split(p.Uptime, " ")
|
||||
var uptime int64
|
||||
uptime = 0
|
||||
for _, v := range timeSlice {
|
||||
switch {
|
||||
case strings.HasSuffix(v, "d"):
|
||||
iValue := strings.TrimSuffix(v, "d")
|
||||
value, err := strconv.ParseInt(iValue, 10, 64)
|
||||
if err == nil {
|
||||
uptime += value * (24 * 60 * 60)
|
||||
}
|
||||
case strings.HasSuffix(v, "h"):
|
||||
iValue := strings.TrimSuffix(v, "y")
|
||||
value, err := strconv.ParseInt(iValue, 10, 64)
|
||||
if err == nil {
|
||||
uptime += value * (60 * 60)
|
||||
}
|
||||
case strings.HasSuffix(v, "m"):
|
||||
iValue := strings.TrimSuffix(v, "m")
|
||||
value, err := strconv.ParseInt(iValue, 10, 64)
|
||||
if err == nil {
|
||||
uptime += value * 60
|
||||
}
|
||||
case strings.HasSuffix(v, "s"):
|
||||
iValue := strings.TrimSuffix(v, "s")
|
||||
value, err := strconv.ParseInt(iValue, 10, 64)
|
||||
if err == nil {
|
||||
uptime += value
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return uptime
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
# Path of passenger-status.
|
||||
#
|
||||
# Plugin gather metric via parsing XML output of passenger-status
|
||||
# More information about the tool:
|
||||
# https://www.phusionpassenger.com/library/admin/apache/overall_status_report.html
|
||||
#
|
||||
#
|
||||
# If no path is specified, then the plugin simply execute passenger-status
|
||||
# hopefully it can be found in your PATH
|
||||
command = "passenger-status -v --show=xml"
|
||||
`
|
||||
|
||||
func (r *passenger) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (r *passenger) Description() string {
|
||||
return "Read metrics of passenger using passenger-status"
|
||||
}
|
||||
|
||||
func (g *passenger) Gather(acc inputs.Accumulator) error {
|
||||
if g.Command == "" {
|
||||
g.Command = "passenger-status -v --show=xml"
|
||||
}
|
||||
|
||||
cmd, args := g.parseCommand()
|
||||
out, err := exec.Command(cmd, args...).Output()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = importMetric(out, acc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func importMetric(stat []byte, acc inputs.Accumulator) error {
|
||||
var p info
|
||||
|
||||
decoder := xml.NewDecoder(bytes.NewReader(stat))
|
||||
decoder.CharsetReader = charset.NewReaderLabel
|
||||
if err := decoder.Decode(&p); err != nil {
|
||||
return fmt.Errorf("Cannot parse input with error: %v\n", err)
|
||||
}
|
||||
|
||||
tags := map[string]string{
|
||||
"passenger_version": p.Passenger_version,
|
||||
}
|
||||
fields := map[string]interface{}{
|
||||
"process_count": p.Process_count,
|
||||
"max": p.Max,
|
||||
"capacity_used": p.Capacity_used,
|
||||
"get_wait_list_size": p.Get_wait_list_size,
|
||||
}
|
||||
acc.AddFields("passenger", fields, tags)
|
||||
|
||||
for _, sg := range p.Supergroups.Supergroup {
|
||||
tags := map[string]string{
|
||||
"name": sg.Name,
|
||||
}
|
||||
fields := map[string]interface{}{
|
||||
"get_wait_list_size": sg.Get_wait_list_size,
|
||||
"capacity_used": sg.Capacity_used,
|
||||
}
|
||||
acc.AddFields("passenger_supergroup", fields, tags)
|
||||
|
||||
for _, group := range sg.Group {
|
||||
tags := map[string]string{
|
||||
"name": group.Name,
|
||||
"app_root": group.AppRoot,
|
||||
"app_type": group.AppType,
|
||||
}
|
||||
fields := map[string]interface{}{
|
||||
"get_wait_list_size": group.Get_wait_list_size,
|
||||
"capacity_used": group.Capacity_used,
|
||||
"processes_being_spawned": group.Processes_being_spawned,
|
||||
}
|
||||
acc.AddFields("passenger_group", fields, tags)
|
||||
|
||||
for _, process := range group.Processes.Process {
|
||||
tags := map[string]string{
|
||||
"group_name": group.Name,
|
||||
"app_root": group.AppRoot,
|
||||
"supergroup_name": sg.Name,
|
||||
"pid": fmt.Sprintf("%d", process.Pid),
|
||||
"code_revision": process.Code_revision,
|
||||
"life_status": process.Life_status,
|
||||
"process_group_id": process.Process_group_id,
|
||||
}
|
||||
fields := map[string]interface{}{
|
||||
"concurrency": process.Concurrency,
|
||||
"sessions": process.Sessions,
|
||||
"busyness": process.Busyness,
|
||||
"processed": process.Processed,
|
||||
"spawner_creation_time": process.Spawner_creation_time,
|
||||
"spawn_start_time": process.Spawn_start_time,
|
||||
"spawn_end_time": process.Spawn_end_time,
|
||||
"last_used": process.Last_used,
|
||||
"uptime": process.getUptime(),
|
||||
"cpu": process.Cpu,
|
||||
"rss": process.Rss,
|
||||
"pss": process.Pss,
|
||||
"private_dirty": process.Private_dirty,
|
||||
"swap": process.Swap,
|
||||
"real_memory": process.Real_memory,
|
||||
"vmsize": process.Vmsize,
|
||||
}
|
||||
acc.AddFields("passenger_process", fields, tags)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("passenger", func() inputs.Input {
|
||||
return &passenger{}
|
||||
})
|
||||
}
|
|
@ -0,0 +1,301 @@
|
|||
package passenger
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdb/telegraf/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func fakePassengerStatus(stat string) {
|
||||
content := fmt.Sprintf("#!/bin/sh\ncat << EOF\n%s\nEOF", stat)
|
||||
ioutil.WriteFile("/tmp/passenger-status", []byte(content), 0700)
|
||||
}
|
||||
|
||||
func teardown() {
|
||||
os.Remove("/tmp/passenger-status")
|
||||
}
|
||||
|
||||
func Test_Invalid_Passenger_Status_Cli(t *testing.T) {
|
||||
r := &passenger{
|
||||
Command: "an-invalid-command passenger-status",
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err := r.Gather(&acc)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, err.Error(), `exec: "an-invalid-command": executable file not found in $PATH`)
|
||||
}
|
||||
|
||||
func Test_Invalid_Xml(t *testing.T) {
|
||||
fakePassengerStatus("invalid xml")
|
||||
defer teardown()
|
||||
|
||||
r := &passenger{
|
||||
Command: "/tmp/passenger-status",
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err := r.Gather(&acc)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, err.Error(), "Cannot parse input with error: EOF\n")
|
||||
}
|
||||
|
||||
// We test this by ensure that the error message match the path of default cli
|
||||
func Test_Default_Config_Load_Default_Command(t *testing.T) {
|
||||
fakePassengerStatus("invalid xml")
|
||||
defer teardown()
|
||||
|
||||
r := &passenger{}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err := r.Gather(&acc)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, err.Error(), "exec: \"passenger-status\": executable file not found in $PATH")
|
||||
}
|
||||
|
||||
func TestPassengerGenerateMetric(t *testing.T) {
|
||||
fakePassengerStatus(sampleStat)
|
||||
defer teardown()
|
||||
|
||||
//Now we tested again above server, with our authentication data
|
||||
r := &passenger{
|
||||
Command: "/tmp/passenger-status",
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err := r.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
tags := map[string]string{
|
||||
"passenger_version": "5.0.17",
|
||||
}
|
||||
fields := map[string]interface{}{
|
||||
"process_count": 23,
|
||||
"max": 23,
|
||||
"capacity_used": 23,
|
||||
"get_wait_list_size": 3,
|
||||
}
|
||||
acc.AssertContainsTaggedFields(t, "passenger", fields, tags)
|
||||
|
||||
tags = map[string]string{
|
||||
"name": "/var/app/current/public",
|
||||
"app_root": "/var/app/current",
|
||||
"app_type": "rack",
|
||||
}
|
||||
fields = map[string]interface{}{
|
||||
"processes_being_spawned": 2,
|
||||
"capacity_used": 23,
|
||||
"get_wait_list_size": 3,
|
||||
}
|
||||
acc.AssertContainsTaggedFields(t, "passenger_group", fields, tags)
|
||||
|
||||
tags = map[string]string{
|
||||
"name": "/var/app/current/public",
|
||||
}
|
||||
|
||||
fields = map[string]interface{}{
|
||||
"capacity_used": 23,
|
||||
"get_wait_list_size": 3,
|
||||
}
|
||||
acc.AssertContainsTaggedFields(t, "passenger_supergroup", fields, tags)
|
||||
|
||||
tags = map[string]string{
|
||||
"app_root": "/var/app/current",
|
||||
"group_name": "/var/app/current/public",
|
||||
"supergroup_name": "/var/app/current/public",
|
||||
"pid": "11553",
|
||||
"code_revision": "899ac7f",
|
||||
"life_status": "ALIVE",
|
||||
"process_group_id": "13608",
|
||||
}
|
||||
fields = map[string]interface{}{
|
||||
"concurrency": 1,
|
||||
"sessions": 0,
|
||||
"busyness": 0,
|
||||
"processed": 951,
|
||||
"spawner_creation_time": int64(1452746835922747),
|
||||
"spawn_start_time": int64(1452746844946982),
|
||||
"spawn_end_time": int64(1452746845013365),
|
||||
"last_used": int64(1452747071764940),
|
||||
"uptime": int64(226), // in seconds of 3m 46s
|
||||
"cpu": int64(58),
|
||||
"rss": int64(418548),
|
||||
"pss": int64(319391),
|
||||
"private_dirty": int64(314900),
|
||||
"swap": int64(0),
|
||||
"real_memory": int64(314900),
|
||||
"vmsize": int64(1563580),
|
||||
}
|
||||
acc.AssertContainsTaggedFields(t, "passenger_process", fields, tags)
|
||||
}
|
||||
|
||||
var sampleStat = `
|
||||
<?xml version="1.0" encoding="iso8859-1" ?>
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<info version="3">
|
||||
<passenger_version>5.0.17</passenger_version>
|
||||
<group_count>1</group_count>
|
||||
<process_count>23</process_count>
|
||||
<max>23</max>
|
||||
<capacity_used>23</capacity_used>
|
||||
<get_wait_list_size>3</get_wait_list_size>
|
||||
<get_wait_list />
|
||||
<supergroups>
|
||||
<supergroup>
|
||||
<name>/var/app/current/public</name>
|
||||
<state>READY</state>
|
||||
<get_wait_list_size>3</get_wait_list_size>
|
||||
<capacity_used>23</capacity_used>
|
||||
<secret>foo</secret>
|
||||
<group default="true">
|
||||
<name>/var/app/current/public</name>
|
||||
<component_name>/var/app/current/public</component_name>
|
||||
<app_root>/var/app/current</app_root>
|
||||
<app_type>rack</app_type>
|
||||
<environment>production</environment>
|
||||
<uuid>QQUrbCVYxbJYpfgyDOwJ</uuid>
|
||||
<enabled_process_count>23</enabled_process_count>
|
||||
<disabling_process_count>0</disabling_process_count>
|
||||
<disabled_process_count>0</disabled_process_count>
|
||||
<capacity_used>23</capacity_used>
|
||||
<get_wait_list_size>3</get_wait_list_size>
|
||||
<disable_wait_list_size>0</disable_wait_list_size>
|
||||
<processes_being_spawned>2</processes_being_spawned>
|
||||
<secret>foo</secret>
|
||||
<api_key>foo</api_key>
|
||||
<life_status>ALIVE</life_status>
|
||||
<user>axcoto</user>
|
||||
<uid>1001</uid>
|
||||
<group>axcoto</group>
|
||||
<gid>1001</gid>
|
||||
<options>
|
||||
<app_root>/var/app/current</app_root>
|
||||
<app_group_name>/var/app/current/public</app_group_name>
|
||||
<app_type>rack</app_type>
|
||||
<start_command>/var/app/.rvm/gems/ruby-2.2.0-p645/gems/passenger-5.0.17/helper-scripts/rack-loader.rb</start_command>
|
||||
<startup_file>config.ru</startup_file>
|
||||
<process_title>Passenger RubyApp</process_title>
|
||||
<log_level>3</log_level>
|
||||
<start_timeout>90000</start_timeout>
|
||||
<environment>production</environment>
|
||||
<base_uri>/</base_uri>
|
||||
<spawn_method>smart</spawn_method>
|
||||
<default_user>nobody</default_user>
|
||||
<default_group>nogroup</default_group>
|
||||
<ruby>/var/app/.rvm/gems/ruby-2.2.0-p645/wrappers/ruby</ruby>
|
||||
<python>python</python>
|
||||
<nodejs>node</nodejs>
|
||||
<ust_router_address>unix:/tmp/passenger.eKFdvdC/agents.s/ust_router</ust_router_address>
|
||||
<ust_router_username>logging</ust_router_username>
|
||||
<ust_router_password>foo</ust_router_password>
|
||||
<debugger>false</debugger>
|
||||
<analytics>false</analytics>
|
||||
<api_key>foo</api_key>
|
||||
<min_processes>22</min_processes>
|
||||
<max_processes>0</max_processes>
|
||||
<max_preloader_idle_time>300</max_preloader_idle_time>
|
||||
<max_out_of_band_work_instances>1</max_out_of_band_work_instances>
|
||||
</options>
|
||||
<processes>
|
||||
<process>
|
||||
<pid>11553</pid>
|
||||
<sticky_session_id>378579907</sticky_session_id>
|
||||
<gupid>17173df-PoNT3J9HCf</gupid>
|
||||
<concurrency>1</concurrency>
|
||||
<sessions>0</sessions>
|
||||
<busyness>0</busyness>
|
||||
<processed>951</processed>
|
||||
<spawner_creation_time>1452746835922747</spawner_creation_time>
|
||||
<spawn_start_time>1452746844946982</spawn_start_time>
|
||||
<spawn_end_time>1452746845013365</spawn_end_time>
|
||||
<last_used>1452747071764940</last_used>
|
||||
<last_used_desc>0s ago</last_used_desc>
|
||||
<uptime>3m 46s</uptime>
|
||||
<code_revision>899ac7f</code_revision>
|
||||
<life_status>ALIVE</life_status>
|
||||
<enabled>ENABLED</enabled>
|
||||
<has_metrics>true</has_metrics>
|
||||
<cpu>58</cpu>
|
||||
<rss>418548</rss>
|
||||
<pss>319391</pss>
|
||||
<private_dirty>314900</private_dirty>
|
||||
<swap>0</swap>
|
||||
<real_memory>314900</real_memory>
|
||||
<vmsize>1563580</vmsize>
|
||||
<process_group_id>13608</process_group_id>
|
||||
<command>Passenger RubyApp: /var/app/current/public</command>
|
||||
<sockets>
|
||||
<socket>
|
||||
<name>main</name>
|
||||
<address>unix:/tmp/passenger.eKFdvdC/apps.s/ruby.UWF6zkRJ71aoMXPxpknpWVfC1POFqgWZzbEsdz5v0G46cSSMxJ3GHLFhJaUrK2I</address>
|
||||
<protocol>session</protocol>
|
||||
<concurrency>1</concurrency>
|
||||
<sessions>0</sessions>
|
||||
</socket>
|
||||
<socket>
|
||||
<name>http</name>
|
||||
<address>tcp://127.0.0.1:49888</address>
|
||||
<protocol>http</protocol>
|
||||
<concurrency>1</concurrency>
|
||||
<sessions>0</sessions>
|
||||
</socket>
|
||||
</sockets>
|
||||
</process>
|
||||
<process>
|
||||
<pid>11563</pid>
|
||||
<sticky_session_id>1549681201</sticky_session_id>
|
||||
<gupid>17173df-pX5iJOipd8</gupid>
|
||||
<concurrency>1</concurrency>
|
||||
<sessions>1</sessions>
|
||||
<busyness>2147483647</busyness>
|
||||
<processed>756</processed>
|
||||
<spawner_creation_time>1452746835922747</spawner_creation_time>
|
||||
<spawn_start_time>1452746845136882</spawn_start_time>
|
||||
<spawn_end_time>1452746845172460</spawn_end_time>
|
||||
<last_used>1452747071709179</last_used>
|
||||
<last_used_desc>0s ago</last_used_desc>
|
||||
<uptime>3m 46s</uptime>
|
||||
<code_revision>899ac7f</code_revision>
|
||||
<life_status>ALIVE</life_status>
|
||||
<enabled>ENABLED</enabled>
|
||||
<has_metrics>true</has_metrics>
|
||||
<cpu>47</cpu>
|
||||
<rss>418296</rss>
|
||||
<pss>314036</pss>
|
||||
<private_dirty>309240</private_dirty>
|
||||
<swap>0</swap>
|
||||
<real_memory>309240</real_memory>
|
||||
<vmsize>1563608</vmsize>
|
||||
<process_group_id>13608</process_group_id>
|
||||
<command>Passenger RubyApp: /var/app/current/public</command>
|
||||
<sockets>
|
||||
<socket>
|
||||
<name>main</name>
|
||||
<address>unix:/tmp/passenger.eKFdvdC/apps.s/ruby.PVCh7TmvCi9knqhba2vG5qXrlHGEIwhGrxnUvRbIAD6SPz9m0G7YlJ8HEsREHY3</address>
|
||||
<protocol>session</protocol>
|
||||
<concurrency>1</concurrency>
|
||||
<sessions>1</sessions>
|
||||
</socket>
|
||||
<socket>
|
||||
<name>http</name>
|
||||
<address>tcp://127.0.0.1:52783</address>
|
||||
<protocol>http</protocol>
|
||||
<concurrency>1</concurrency>
|
||||
<sessions>0</sessions>
|
||||
</socket>
|
||||
</sockets>
|
||||
</process>
|
||||
</processes>
|
||||
</group>
|
||||
</supergroup>
|
||||
</supergroups>
|
||||
</info>`
|
|
@ -6,10 +6,14 @@ Get phpfpm stat using either HTTP status page or fpm socket.
|
|||
|
||||
Meta:
|
||||
|
||||
- tags: `url=<ip> pool=poolname`
|
||||
- tags: `pool=poolname`
|
||||
|
||||
Measurement names:
|
||||
|
||||
- phpfpm
|
||||
|
||||
Measurement field:
|
||||
|
||||
- accepted_conn
|
||||
- listen_queue
|
||||
- max_listen_queue
|
||||
|
@ -50,36 +54,12 @@ It produces:
|
|||
|
||||
```
|
||||
* Plugin: phpfpm, Collection 1
|
||||
> [url="10.0.0.12" pool="www"] phpfpm_idle_processes value=1
|
||||
> [url="10.0.0.12" pool="www"] phpfpm_total_processes value=2
|
||||
> [url="10.0.0.12" pool="www"] phpfpm_max_children_reached value=0
|
||||
> [url="10.0.0.12" pool="www"] phpfpm_max_listen_queue value=0
|
||||
> [url="10.0.0.12" pool="www"] phpfpm_listen_queue value=0
|
||||
> [url="10.0.0.12" pool="www"] phpfpm_listen_queue_len value=0
|
||||
> [url="10.0.0.12" pool="www"] phpfpm_active_processes value=1
|
||||
> [url="10.0.0.12" pool="www"] phpfpm_max_active_processes value=2
|
||||
> [url="10.0.0.12" pool="www"] phpfpm_slow_requests value=0
|
||||
> [url="10.0.0.12" pool="www"] phpfpm_accepted_conn value=305
|
||||
|
||||
> [url="localhost" pool="www2"] phpfpm_max_children_reached value=0
|
||||
> [url="localhost" pool="www2"] phpfpm_slow_requests value=0
|
||||
> [url="localhost" pool="www2"] phpfpm_max_listen_queue value=0
|
||||
> [url="localhost" pool="www2"] phpfpm_active_processes value=1
|
||||
> [url="localhost" pool="www2"] phpfpm_listen_queue_len value=0
|
||||
> [url="localhost" pool="www2"] phpfpm_idle_processes value=1
|
||||
> [url="localhost" pool="www2"] phpfpm_total_processes value=2
|
||||
> [url="localhost" pool="www2"] phpfpm_max_active_processes value=2
|
||||
> [url="localhost" pool="www2"] phpfpm_accepted_conn value=306
|
||||
> [url="localhost" pool="www2"] phpfpm_listen_queue value=0
|
||||
|
||||
> [url="10.0.0.12:9000" pool="www3"] phpfpm_max_children_reached value=0
|
||||
> [url="10.0.0.12:9000" pool="www3"] phpfpm_slow_requests value=1
|
||||
> [url="10.0.0.12:9000" pool="www3"] phpfpm_max_listen_queue value=0
|
||||
> [url="10.0.0.12:9000" pool="www3"] phpfpm_active_processes value=1
|
||||
> [url="10.0.0.12:9000" pool="www3"] phpfpm_listen_queue_len value=0
|
||||
> [url="10.0.0.12:9000" pool="www3"] phpfpm_idle_processes value=2
|
||||
> [url="10.0.0.12:9000" pool="www3"] phpfpm_total_processes value=2
|
||||
> [url="10.0.0.12:9000" pool="www3"] phpfpm_max_active_processes value=2
|
||||
> [url="10.0.0.12:9000" pool="www3"] phpfpm_accepted_conn value=307
|
||||
> [url="10.0.0.12:9000" pool="www3"] phpfpm_listen_queue value=0
|
||||
> phpfpm,pool=www accepted_conn=13i,active_processes=2i,idle_processes=1i,listen_queue=0i,listen_queue_len=0i,max_active_processes=2i,max_children_reached=0i,max_listen_queue=0i,slow_requests=0i,total_processes=3i 1453011293083331187
|
||||
> phpfpm,pool=www2 accepted_conn=12i,active_processes=1i,idle_processes=2i,listen_queue=0i,listen_queue_len=0i,max_active_processes=2i,max_children_reached=0i,max_listen_queue=0i,slow_requests=0i,total_processes=3i 1453011293083691422
|
||||
> phpfpm,pool=www3 accepted_conn=11i,active_processes=1i,idle_processes=2i,listen_queue=0i,listen_queue_len=0i,max_active_processes=2i,max_children_reached=0i,max_listen_queue=0i,slow_requests=0i,total_processes=3i 1453011293083691658
|
||||
```
|
||||
|
||||
## Note
|
||||
|
||||
When using `unixsocket`, you have to ensure that telegraf runs on same
|
||||
host, and socket path is accessible to telegraf user.
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -40,20 +41,25 @@ type phpfpm struct {
|
|||
|
||||
var sampleConfig = `
|
||||
# An array of addresses to gather stats about. Specify an ip or hostname
|
||||
# with optional port and path.
|
||||
# with optional port and path
|
||||
#
|
||||
# Plugin can be configured in three modes (both can be used):
|
||||
# - http: the URL must start with http:// or https://, ex:
|
||||
# Plugin can be configured in three modes (either can be used):
|
||||
# - http: the URL must start with http:// or https://, ie:
|
||||
# "http://localhost/status"
|
||||
# "http://192.168.130.1/status?full"
|
||||
# - unixsocket: path to fpm socket, ex:
|
||||
#
|
||||
# - unixsocket: path to fpm socket, ie:
|
||||
# "/var/run/php5-fpm.sock"
|
||||
# "192.168.10.10:/var/run/php5-fpm-www2.sock"
|
||||
# - fcgi: the URL mush start with fcgi:// or cgi://, and port must present, ex:
|
||||
# or using a custom fpm status path:
|
||||
# "/var/run/php5-fpm.sock:fpm-custom-status-path"
|
||||
#
|
||||
# - fcgi: the URL must start with fcgi:// or cgi://, and port must be present, ie:
|
||||
# "fcgi://10.0.0.12:9000/status"
|
||||
# "cgi://10.0.10.12:9001/status"
|
||||
#
|
||||
# If no servers are specified, then default to 127.0.0.1/server-status
|
||||
# Example of multiple gathering from local socket and remove host
|
||||
# urls = ["http://192.168.1.20/status", "/tmp/fpm.sock"]
|
||||
# If no servers are specified, then default to http://127.0.0.1/status
|
||||
urls = ["http://localhost/status"]
|
||||
`
|
||||
|
||||
|
@ -62,7 +68,7 @@ func (r *phpfpm) SampleConfig() string {
|
|||
}
|
||||
|
||||
func (r *phpfpm) Description() string {
|
||||
return "Read metrics of phpfpm, via HTTP status page or socket(pending)"
|
||||
return "Read metrics of phpfpm, via HTTP status page or socket"
|
||||
}
|
||||
|
||||
// Reads stats from all configured servers accumulates stats.
|
||||
|
@ -89,15 +95,72 @@ func (g *phpfpm) Gather(acc inputs.Accumulator) error {
|
|||
return outerr
|
||||
}
|
||||
|
||||
// Request status page to get stat raw data
|
||||
// Request status page to get stat raw data and import it
|
||||
func (g *phpfpm) gatherServer(addr string, acc inputs.Accumulator) error {
|
||||
if g.client == nil {
|
||||
|
||||
client := &http.Client{}
|
||||
g.client = client
|
||||
}
|
||||
|
||||
if strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") {
|
||||
return g.gatherHttp(addr, acc)
|
||||
}
|
||||
|
||||
var (
|
||||
fcgi *conn
|
||||
socketPath string
|
||||
statusPath string
|
||||
)
|
||||
|
||||
if strings.HasPrefix(addr, "fcgi://") || strings.HasPrefix(addr, "cgi://") {
|
||||
u, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable parse server address '%s': %s", addr, err)
|
||||
}
|
||||
socketAddr := strings.Split(u.Host, ":")
|
||||
fcgiIp := socketAddr[0]
|
||||
fcgiPort, _ := strconv.Atoi(socketAddr[1])
|
||||
fcgi, _ = NewClient(fcgiIp, fcgiPort)
|
||||
} else {
|
||||
socketAddr := strings.Split(addr, ":")
|
||||
if len(socketAddr) >= 2 {
|
||||
socketPath = socketAddr[0]
|
||||
statusPath = socketAddr[1]
|
||||
} else {
|
||||
socketPath = socketAddr[0]
|
||||
statusPath = "status"
|
||||
}
|
||||
|
||||
if _, err := os.Stat(socketPath); os.IsNotExist(err) {
|
||||
return fmt.Errorf("Socket doesn't exist '%s': %s", socketPath, err)
|
||||
}
|
||||
fcgi, _ = NewClient("unix", socketPath)
|
||||
}
|
||||
return g.gatherFcgi(fcgi, statusPath, acc)
|
||||
}
|
||||
|
||||
// Gather stat using fcgi protocol
|
||||
func (g *phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc inputs.Accumulator) error {
|
||||
fpmOutput, fpmErr, err := fcgi.Request(map[string]string{
|
||||
"SCRIPT_NAME": "/" + statusPath,
|
||||
"SCRIPT_FILENAME": statusPath,
|
||||
"REQUEST_METHOD": "GET",
|
||||
"CONTENT_LENGTH": "0",
|
||||
"SERVER_PROTOCOL": "HTTP/1.0",
|
||||
"SERVER_SOFTWARE": "go / fcgiclient ",
|
||||
"REMOTE_ADDR": "127.0.0.1",
|
||||
}, "/"+statusPath)
|
||||
|
||||
if len(fpmErr) == 0 && err == nil {
|
||||
importMetric(bytes.NewReader(fpmOutput), acc)
|
||||
return nil
|
||||
} else {
|
||||
return fmt.Errorf("Unable parse phpfpm status. Error: %v %v", string(fpmErr), err)
|
||||
}
|
||||
}
|
||||
|
||||
// Gather stat using http protocol
|
||||
func (g *phpfpm) gatherHttp(addr string, acc inputs.Accumulator) error {
|
||||
u, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable parse server address '%s': %s", addr, err)
|
||||
|
@ -116,44 +179,12 @@ func (g *phpfpm) gatherServer(addr string, acc inputs.Accumulator) error {
|
|||
addr, err)
|
||||
}
|
||||
|
||||
importMetric(res.Body, acc, u.Host)
|
||||
} else {
|
||||
var (
|
||||
fcgi *FCGIClient
|
||||
fcgiAddr string
|
||||
)
|
||||
if strings.HasPrefix(addr, "fcgi://") || strings.HasPrefix(addr, "cgi://") {
|
||||
u, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable parse server address '%s': %s", addr, err)
|
||||
}
|
||||
socketAddr := strings.Split(u.Host, ":")
|
||||
fcgiIp := socketAddr[0]
|
||||
fcgiPort, _ := strconv.Atoi(socketAddr[1])
|
||||
fcgiAddr = u.Host
|
||||
fcgi, _ = NewClient(fcgiIp, fcgiPort)
|
||||
} else {
|
||||
socketAddr := strings.Split(addr, ":")
|
||||
fcgiAddr = socketAddr[0]
|
||||
fcgi, _ = NewClient("unix", socketAddr[1])
|
||||
}
|
||||
resOut, resErr, err := fcgi.Request(map[string]string{
|
||||
"SCRIPT_NAME": "/status",
|
||||
"SCRIPT_FILENAME": "status",
|
||||
"REQUEST_METHOD": "GET",
|
||||
}, "")
|
||||
|
||||
if len(resErr) == 0 && err == nil {
|
||||
importMetric(bytes.NewReader(resOut), acc, fcgiAddr)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
importMetric(res.Body, acc)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Import HTTP stat data into Telegraf system
|
||||
func importMetric(r io.Reader, acc inputs.Accumulator, host string) (poolStat, error) {
|
||||
// Import stat data into Telegraf system
|
||||
func importMetric(r io.Reader, acc inputs.Accumulator) (poolStat, error) {
|
||||
stats := make(poolStat)
|
||||
var currentPool string
|
||||
|
||||
|
@ -195,7 +226,6 @@ func importMetric(r io.Reader, acc inputs.Accumulator, host string) (poolStat, e
|
|||
// Finally, we push the pool metric
|
||||
for pool := range stats {
|
||||
tags := map[string]string{
|
||||
"url": host,
|
||||
"pool": pool,
|
||||
}
|
||||
fields := make(map[string]interface{})
|
||||
|
|
|
@ -1,13 +1,14 @@
|
|||
// Copyright 2011 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package fcgi implements the FastCGI protocol.
|
||||
// Currently only the responder role is supported.
|
||||
// The protocol is defined at http://www.fastcgi.com/drupal/node/6?q=node/22
|
||||
package phpfpm
|
||||
|
||||
// FastCGI client to request via socket
|
||||
|
||||
// Copyright 2012 Junqing Tan <ivan@mysqlab.net> and The Go Authors
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// Part of source code is from Go fcgi package
|
||||
|
||||
// Fix bug: Can't recive more than 1 record untill FCGI_END_REQUEST 2012-09-15
|
||||
// By: wofeiwo
|
||||
// This file defines the raw protocol and some utilities used by the child and
|
||||
// the host.
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
|
@ -15,70 +16,84 @@ import (
|
|||
"encoding/binary"
|
||||
"errors"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"strings"
|
||||
)
|
||||
|
||||
const FCGI_LISTENSOCK_FILENO uint8 = 0
|
||||
const FCGI_HEADER_LEN uint8 = 8
|
||||
const VERSION_1 uint8 = 1
|
||||
const FCGI_NULL_REQUEST_ID uint8 = 0
|
||||
const FCGI_KEEP_CONN uint8 = 1
|
||||
// recType is a record type, as defined by
|
||||
// http://www.fastcgi.com/devkit/doc/fcgi-spec.html#S8
|
||||
type recType uint8
|
||||
|
||||
const (
|
||||
FCGI_BEGIN_REQUEST uint8 = iota + 1
|
||||
FCGI_ABORT_REQUEST
|
||||
FCGI_END_REQUEST
|
||||
FCGI_PARAMS
|
||||
FCGI_STDIN
|
||||
FCGI_STDOUT
|
||||
FCGI_STDERR
|
||||
FCGI_DATA
|
||||
FCGI_GET_VALUES
|
||||
FCGI_GET_VALUES_RESULT
|
||||
FCGI_UNKNOWN_TYPE
|
||||
FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE
|
||||
typeBeginRequest recType = 1
|
||||
typeAbortRequest recType = 2
|
||||
typeEndRequest recType = 3
|
||||
typeParams recType = 4
|
||||
typeStdin recType = 5
|
||||
typeStdout recType = 6
|
||||
typeStderr recType = 7
|
||||
typeData recType = 8
|
||||
typeGetValues recType = 9
|
||||
typeGetValuesResult recType = 10
|
||||
typeUnknownType recType = 11
|
||||
)
|
||||
|
||||
const (
|
||||
FCGI_RESPONDER uint8 = iota + 1
|
||||
FCGI_AUTHORIZER
|
||||
FCGI_FILTER
|
||||
)
|
||||
// keep the connection between web-server and responder open after request
|
||||
const flagKeepConn = 1
|
||||
|
||||
const (
|
||||
FCGI_REQUEST_COMPLETE uint8 = iota
|
||||
FCGI_CANT_MPX_CONN
|
||||
FCGI_OVERLOADED
|
||||
FCGI_UNKNOWN_ROLE
|
||||
)
|
||||
|
||||
const (
|
||||
FCGI_MAX_CONNS string = "MAX_CONNS"
|
||||
FCGI_MAX_REQS string = "MAX_REQS"
|
||||
FCGI_MPXS_CONNS string = "MPXS_CONNS"
|
||||
)
|
||||
|
||||
const (
|
||||
maxWrite = 6553500 // maximum record body
|
||||
maxWrite = 65535 // maximum record body
|
||||
maxPad = 255
|
||||
)
|
||||
|
||||
const (
|
||||
roleResponder = iota + 1 // only Responders are implemented.
|
||||
roleAuthorizer
|
||||
roleFilter
|
||||
)
|
||||
|
||||
const (
|
||||
statusRequestComplete = iota
|
||||
statusCantMultiplex
|
||||
statusOverloaded
|
||||
statusUnknownRole
|
||||
)
|
||||
|
||||
const headerLen = 8
|
||||
|
||||
type header struct {
|
||||
Version uint8
|
||||
Type uint8
|
||||
Type recType
|
||||
Id uint16
|
||||
ContentLength uint16
|
||||
PaddingLength uint8
|
||||
Reserved uint8
|
||||
}
|
||||
|
||||
type beginRequest struct {
|
||||
role uint16
|
||||
flags uint8
|
||||
reserved [5]uint8
|
||||
}
|
||||
|
||||
func (br *beginRequest) read(content []byte) error {
|
||||
if len(content) != 8 {
|
||||
return errors.New("fcgi: invalid begin request record")
|
||||
}
|
||||
br.role = binary.BigEndian.Uint16(content)
|
||||
br.flags = content[2]
|
||||
return nil
|
||||
}
|
||||
|
||||
// for padding so we don't have to allocate all the time
|
||||
// not synchronized because we don't care what the contents are
|
||||
var pad [maxPad]byte
|
||||
|
||||
func (h *header) init(recType uint8, reqId uint16, contentLength int) {
|
||||
func (h *header) init(recType recType, reqId uint16, contentLength int) {
|
||||
h.Version = 1
|
||||
h.Type = recType
|
||||
h.Id = reqId
|
||||
|
@ -86,6 +101,26 @@ func (h *header) init(recType uint8, reqId uint16, contentLength int) {
|
|||
h.PaddingLength = uint8(-contentLength & 7)
|
||||
}
|
||||
|
||||
// conn sends records over rwc
|
||||
type conn struct {
|
||||
mutex sync.Mutex
|
||||
rwc io.ReadWriteCloser
|
||||
|
||||
// to avoid allocations
|
||||
buf bytes.Buffer
|
||||
h header
|
||||
}
|
||||
|
||||
func newConn(rwc io.ReadWriteCloser) *conn {
|
||||
return &conn{rwc: rwc}
|
||||
}
|
||||
|
||||
func (c *conn) Close() error {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
return c.rwc.Close()
|
||||
}
|
||||
|
||||
type record struct {
|
||||
h header
|
||||
buf [maxWrite + maxPad]byte
|
||||
|
@ -109,69 +144,39 @@ func (r *record) content() []byte {
|
|||
return r.buf[:r.h.ContentLength]
|
||||
}
|
||||
|
||||
type FCGIClient struct {
|
||||
mutex sync.Mutex
|
||||
rwc io.ReadWriteCloser
|
||||
h header
|
||||
buf bytes.Buffer
|
||||
keepAlive bool
|
||||
}
|
||||
|
||||
func NewClient(h string, args ...interface{}) (fcgi *FCGIClient, err error) {
|
||||
var conn net.Conn
|
||||
if len(args) != 1 {
|
||||
err = errors.New("fcgi: not enough params")
|
||||
return
|
||||
}
|
||||
switch args[0].(type) {
|
||||
case int:
|
||||
addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10)
|
||||
conn, err = net.Dial("tcp", addr)
|
||||
case string:
|
||||
laddr := net.UnixAddr{Name: args[0].(string), Net: h}
|
||||
conn, err = net.DialUnix(h, nil, &laddr)
|
||||
default:
|
||||
err = errors.New("fcgi: we only accept int (port) or string (socket) params.")
|
||||
}
|
||||
fcgi = &FCGIClient{
|
||||
rwc: conn,
|
||||
keepAlive: false,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (client *FCGIClient) writeRecord(recType uint8, reqId uint16, content []byte) (err error) {
|
||||
client.mutex.Lock()
|
||||
defer client.mutex.Unlock()
|
||||
client.buf.Reset()
|
||||
client.h.init(recType, reqId, len(content))
|
||||
if err := binary.Write(&client.buf, binary.BigEndian, client.h); err != nil {
|
||||
// writeRecord writes and sends a single record.
|
||||
func (c *conn) writeRecord(recType recType, reqId uint16, b []byte) error {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
c.buf.Reset()
|
||||
c.h.init(recType, reqId, len(b))
|
||||
if err := binary.Write(&c.buf, binary.BigEndian, c.h); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := client.buf.Write(content); err != nil {
|
||||
if _, err := c.buf.Write(b); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := client.buf.Write(pad[:client.h.PaddingLength]); err != nil {
|
||||
if _, err := c.buf.Write(pad[:c.h.PaddingLength]); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = client.rwc.Write(client.buf.Bytes())
|
||||
_, err := c.rwc.Write(c.buf.Bytes())
|
||||
return err
|
||||
}
|
||||
|
||||
func (client *FCGIClient) writeBeginRequest(reqId uint16, role uint16, flags uint8) error {
|
||||
func (c *conn) writeBeginRequest(reqId uint16, role uint16, flags uint8) error {
|
||||
b := [8]byte{byte(role >> 8), byte(role), flags}
|
||||
return client.writeRecord(FCGI_BEGIN_REQUEST, reqId, b[:])
|
||||
return c.writeRecord(typeBeginRequest, reqId, b[:])
|
||||
}
|
||||
|
||||
func (client *FCGIClient) writeEndRequest(reqId uint16, appStatus int, protocolStatus uint8) error {
|
||||
func (c *conn) writeEndRequest(reqId uint16, appStatus int, protocolStatus uint8) error {
|
||||
b := make([]byte, 8)
|
||||
binary.BigEndian.PutUint32(b, uint32(appStatus))
|
||||
b[4] = protocolStatus
|
||||
return client.writeRecord(FCGI_END_REQUEST, reqId, b)
|
||||
return c.writeRecord(typeEndRequest, reqId, b)
|
||||
}
|
||||
|
||||
func (client *FCGIClient) writePairs(recType uint8, reqId uint16, pairs map[string]string) error {
|
||||
w := newWriter(client, recType, reqId)
|
||||
func (c *conn) writePairs(recType recType, reqId uint16, pairs map[string]string) error {
|
||||
w := newWriter(c, recType, reqId)
|
||||
b := make([]byte, 8)
|
||||
for k, v := range pairs {
|
||||
n := encodeSize(b, uint32(len(k)))
|
||||
|
@ -238,7 +243,7 @@ func (w *bufWriter) Close() error {
|
|||
return w.closer.Close()
|
||||
}
|
||||
|
||||
func newWriter(c *FCGIClient, recType uint8, reqId uint16) *bufWriter {
|
||||
func newWriter(c *conn, recType recType, reqId uint16) *bufWriter {
|
||||
s := &streamWriter{c: c, recType: recType, reqId: reqId}
|
||||
w := bufio.NewWriterSize(s, maxWrite)
|
||||
return &bufWriter{s, w}
|
||||
|
@ -247,8 +252,8 @@ func newWriter(c *FCGIClient, recType uint8, reqId uint16) *bufWriter {
|
|||
// streamWriter abstracts out the separation of a stream into discrete records.
|
||||
// It only writes maxWrite bytes at a time.
|
||||
type streamWriter struct {
|
||||
c *FCGIClient
|
||||
recType uint8
|
||||
c *conn
|
||||
recType recType
|
||||
reqId uint16
|
||||
}
|
||||
|
||||
|
@ -273,22 +278,44 @@ func (w *streamWriter) Close() error {
|
|||
return w.c.writeRecord(w.recType, w.reqId, nil)
|
||||
}
|
||||
|
||||
func (client *FCGIClient) Request(env map[string]string, reqStr string) (retout []byte, reterr []byte, err error) {
|
||||
func NewClient(h string, args ...interface{}) (fcgi *conn, err error) {
|
||||
var con net.Conn
|
||||
if len(args) != 1 {
|
||||
err = errors.New("fcgi: not enough params")
|
||||
return
|
||||
}
|
||||
switch args[0].(type) {
|
||||
case int:
|
||||
addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10)
|
||||
con, err = net.Dial("tcp", addr)
|
||||
case string:
|
||||
laddr := net.UnixAddr{Name: args[0].(string), Net: h}
|
||||
con, err = net.DialUnix(h, nil, &laddr)
|
||||
default:
|
||||
err = errors.New("fcgi: we only accept int (port) or string (socket) params.")
|
||||
}
|
||||
fcgi = &conn{
|
||||
rwc: con,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
var reqId uint16 = 1
|
||||
func (client *conn) Request(env map[string]string, requestData string) (retout []byte, reterr []byte, err error) {
|
||||
defer client.rwc.Close()
|
||||
var reqId uint16 = 1
|
||||
|
||||
err = client.writeBeginRequest(reqId, uint16(FCGI_RESPONDER), 0)
|
||||
err = client.writeBeginRequest(reqId, uint16(roleResponder), 0)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = client.writePairs(FCGI_PARAMS, reqId, env)
|
||||
|
||||
err = client.writePairs(typeParams, reqId, env)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if len(reqStr) > 0 {
|
||||
err = client.writeRecord(FCGI_STDIN, reqId, []byte(reqStr))
|
||||
if err != nil {
|
||||
|
||||
if len(requestData) > 0 {
|
||||
if err = client.writeRecord(typeStdin, reqId, []byte(requestData)); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -297,23 +324,25 @@ func (client *FCGIClient) Request(env map[string]string, reqStr string) (retout
|
|||
var err1 error
|
||||
|
||||
// recive untill EOF or FCGI_END_REQUEST
|
||||
READ_LOOP:
|
||||
for {
|
||||
err1 = rec.read(client.rwc)
|
||||
if err1 != nil {
|
||||
if err1 != nil && strings.Contains(err1.Error(), "use of closed network connection") {
|
||||
if err1 != io.EOF {
|
||||
err = err1
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
switch {
|
||||
case rec.h.Type == FCGI_STDOUT:
|
||||
case rec.h.Type == typeStdout:
|
||||
retout = append(retout, rec.content()...)
|
||||
case rec.h.Type == FCGI_STDERR:
|
||||
case rec.h.Type == typeStderr:
|
||||
reterr = append(reterr, rec.content()...)
|
||||
case rec.h.Type == FCGI_END_REQUEST:
|
||||
case rec.h.Type == typeEndRequest:
|
||||
fallthrough
|
||||
default:
|
||||
break
|
||||
break READ_LOOP
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,24 +1,34 @@
|
|||
package phpfpm
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/fcgi"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdb/telegraf/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
)
|
||||
|
||||
func TestPhpFpmGeneratesMetrics(t *testing.T) {
|
||||
//We create a fake server to return test data
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
type statServer struct{}
|
||||
|
||||
// We create a fake server to return test data
|
||||
func (s statServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/plain")
|
||||
w.Header().Set("Content-Length", fmt.Sprint(len(outputSample)))
|
||||
fmt.Fprint(w, outputSample)
|
||||
}))
|
||||
}
|
||||
|
||||
func TestPhpFpmGeneratesMetrics_From_Http(t *testing.T) {
|
||||
sv := statServer{}
|
||||
ts := httptest.NewServer(sv)
|
||||
defer ts.Close()
|
||||
|
||||
//Now we tested again above server, with our authentication data
|
||||
r := &phpfpm{
|
||||
Urls: []string{ts.URL},
|
||||
}
|
||||
|
@ -29,7 +39,134 @@ func TestPhpFpmGeneratesMetrics(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
tags := map[string]string{
|
||||
"url": ts.Listener.Addr().String(),
|
||||
"pool": "www",
|
||||
}
|
||||
|
||||
fields := map[string]interface{}{
|
||||
"accepted_conn": int64(3),
|
||||
"listen_queue": int64(1),
|
||||
"max_listen_queue": int64(0),
|
||||
"listen_queue_len": int64(0),
|
||||
"idle_processes": int64(1),
|
||||
"active_processes": int64(1),
|
||||
"total_processes": int64(2),
|
||||
"max_active_processes": int64(1),
|
||||
"max_children_reached": int64(2),
|
||||
"slow_requests": int64(1),
|
||||
}
|
||||
|
||||
acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags)
|
||||
}
|
||||
|
||||
func TestPhpFpmGeneratesMetrics_From_Fcgi(t *testing.T) {
|
||||
// Let OS find an available port
|
||||
tcp, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal("Cannot initalize test server")
|
||||
}
|
||||
defer tcp.Close()
|
||||
|
||||
s := statServer{}
|
||||
go fcgi.Serve(tcp, s)
|
||||
|
||||
//Now we tested again above server
|
||||
r := &phpfpm{
|
||||
Urls: []string{"fcgi://" + tcp.Addr().String() + "/status"},
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
err = r.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
tags := map[string]string{
|
||||
"pool": "www",
|
||||
}
|
||||
|
||||
fields := map[string]interface{}{
|
||||
"accepted_conn": int64(3),
|
||||
"listen_queue": int64(1),
|
||||
"max_listen_queue": int64(0),
|
||||
"listen_queue_len": int64(0),
|
||||
"idle_processes": int64(1),
|
||||
"active_processes": int64(1),
|
||||
"total_processes": int64(2),
|
||||
"max_active_processes": int64(1),
|
||||
"max_children_reached": int64(2),
|
||||
"slow_requests": int64(1),
|
||||
}
|
||||
|
||||
acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags)
|
||||
}
|
||||
|
||||
func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) {
|
||||
// Create a socket in /tmp because we always have write permission and if the
|
||||
// removing of socket fail when system restart /tmp is clear so
|
||||
// we don't have junk files around
|
||||
var randomNumber int64
|
||||
binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
|
||||
tcp, err := net.Listen("unix", fmt.Sprintf("/tmp/test-fpm%d.sock", randomNumber))
|
||||
if err != nil {
|
||||
t.Fatal("Cannot initalize server on port ")
|
||||
}
|
||||
|
||||
defer tcp.Close()
|
||||
s := statServer{}
|
||||
go fcgi.Serve(tcp, s)
|
||||
|
||||
r := &phpfpm{
|
||||
Urls: []string{tcp.Addr().String()},
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err = r.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
tags := map[string]string{
|
||||
"pool": "www",
|
||||
}
|
||||
|
||||
fields := map[string]interface{}{
|
||||
"accepted_conn": int64(3),
|
||||
"listen_queue": int64(1),
|
||||
"max_listen_queue": int64(0),
|
||||
"listen_queue_len": int64(0),
|
||||
"idle_processes": int64(1),
|
||||
"active_processes": int64(1),
|
||||
"total_processes": int64(2),
|
||||
"max_active_processes": int64(1),
|
||||
"max_children_reached": int64(2),
|
||||
"slow_requests": int64(1),
|
||||
}
|
||||
|
||||
acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags)
|
||||
}
|
||||
|
||||
func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) {
|
||||
// Create a socket in /tmp because we always have write permission. If the
|
||||
// removing of socket fail we won't have junk files around. Cuz when system
|
||||
// restart, it clears out /tmp
|
||||
var randomNumber int64
|
||||
binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
|
||||
tcp, err := net.Listen("unix", fmt.Sprintf("/tmp/test-fpm%d.sock", randomNumber))
|
||||
if err != nil {
|
||||
t.Fatal("Cannot initalize server on port ")
|
||||
}
|
||||
|
||||
defer tcp.Close()
|
||||
s := statServer{}
|
||||
go fcgi.Serve(tcp, s)
|
||||
|
||||
r := &phpfpm{
|
||||
Urls: []string{tcp.Addr().String() + ":custom-status-path"},
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err = r.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
tags := map[string]string{
|
||||
"pool": "www",
|
||||
}
|
||||
|
||||
|
@ -51,7 +188,7 @@ func TestPhpFpmGeneratesMetrics(t *testing.T) {
|
|||
|
||||
//When not passing server config, we default to localhost
|
||||
//We just want to make sure we did request stat from localhost
|
||||
func TestHaproxyDefaultGetFromLocalhost(t *testing.T) {
|
||||
func TestPhpFpmDefaultGetFromLocalhost(t *testing.T) {
|
||||
r := &phpfpm{}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
@ -61,6 +198,31 @@ func TestHaproxyDefaultGetFromLocalhost(t *testing.T) {
|
|||
assert.Contains(t, err.Error(), "127.0.0.1/status")
|
||||
}
|
||||
|
||||
func TestPhpFpmGeneratesMetrics_Throw_Error_When_Fpm_Status_Is_Not_Responding(t *testing.T) {
|
||||
r := &phpfpm{
|
||||
Urls: []string{"http://aninvalidone"},
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err := r.Gather(&acc)
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), `Unable to connect to phpfpm status page 'http://aninvalidone': Get http://aninvalidone: dial tcp: lookup aninvalidone`)
|
||||
}
|
||||
|
||||
func TestPhpFpmGeneratesMetrics_Throw_Error_When_Socket_Path_Is_Invalid(t *testing.T) {
|
||||
r := &phpfpm{
|
||||
Urls: []string{"/tmp/invalid.sock"},
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err := r.Gather(&acc)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, `Socket doesn't exist '/tmp/invalid.sock': stat /tmp/invalid.sock: no such file or directory`, err.Error())
|
||||
|
||||
}
|
||||
|
||||
const outputSample = `
|
||||
pool: www
|
||||
process manager: dynamic
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
_ "github.com/influxdb/telegraf/plugins/outputs/amon"
|
||||
_ "github.com/influxdb/telegraf/plugins/outputs/amqp"
|
||||
_ "github.com/influxdb/telegraf/plugins/outputs/datadog"
|
||||
_ "github.com/influxdb/telegraf/plugins/outputs/graphite"
|
||||
_ "github.com/influxdb/telegraf/plugins/outputs/influxdb"
|
||||
_ "github.com/influxdb/telegraf/plugins/outputs/kafka"
|
||||
_ "github.com/influxdb/telegraf/plugins/outputs/kinesis"
|
||||
|
|
|
@ -2,7 +2,10 @@ package amqp
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -17,6 +20,12 @@ type AMQP struct {
|
|||
URL string
|
||||
// AMQP exchange
|
||||
Exchange string
|
||||
// path to CA file
|
||||
SslCa string
|
||||
// path to host cert file
|
||||
SslCert string
|
||||
// path to cert key file
|
||||
SslKey string
|
||||
// Routing Key Tag
|
||||
RoutingTag string `toml:"routing_tag"`
|
||||
// InfluxDB database
|
||||
|
@ -46,6 +55,11 @@ var sampleConfig = `
|
|||
# ie, if this tag exists, it's value will be used as the routing key
|
||||
routing_tag = "host"
|
||||
|
||||
# Use ssl
|
||||
#ssl_ca = "/etc/telegraf/ca.pem"
|
||||
#ssl_cert = "/etc/telegraf/cert.pem"
|
||||
#ssl_key = "/etc/telegraf/key.pem"
|
||||
|
||||
# InfluxDB retention policy
|
||||
#retention_policy = "default"
|
||||
# InfluxDB database
|
||||
|
@ -64,7 +78,32 @@ func (q *AMQP) Connect() error {
|
|||
"retention_policy": q.RetentionPolicy,
|
||||
}
|
||||
|
||||
connection, err := amqp.Dial(q.URL)
|
||||
var connection *amqp.Connection
|
||||
var err error
|
||||
if q.SslCert != "" && q.SslKey != "" {
|
||||
// make new tls config
|
||||
cfg := new(tls.Config)
|
||||
if q.SslCa != "" {
|
||||
// create ca pool
|
||||
cfg.RootCAs = x509.NewCertPool()
|
||||
|
||||
// add self-signed cert
|
||||
if ca, err := ioutil.ReadFile(q.SslCa); err == nil {
|
||||
cfg.RootCAs.AppendCertsFromPEM(ca)
|
||||
} else {
|
||||
log.Println(err)
|
||||
}
|
||||
}
|
||||
if cert, err := tls.LoadX509KeyPair(q.SslCert, q.SslKey); err == nil {
|
||||
cfg.Certificates = append(cfg.Certificates, cert)
|
||||
} else {
|
||||
log.Println(err)
|
||||
}
|
||||
connection, err = amqp.DialTLS(q.URL, cfg)
|
||||
|
||||
} else {
|
||||
connection, err = amqp.Dial(q.URL)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
# Graphite Output Plugin
|
||||
|
||||
This plugin writes to [Graphite](http://graphite.readthedocs.org/en/latest/index.html) via raw TCP.
|
||||
|
||||
Parameters:
|
||||
|
||||
Servers []string
|
||||
Prefix string
|
||||
Timeout int
|
||||
|
||||
* `servers`: List of strings, ["mygraphiteserver:2003"].
|
||||
* `prefix`: String use to prefix all sent metrics.
|
||||
* `timeout`: Connection timeout in second.
|
|
@ -0,0 +1,134 @@
|
|||
package graphite
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/influxdb/influxdb/client/v2"
|
||||
"github.com/influxdb/telegraf/plugins/outputs"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Graphite struct {
|
||||
// URL is only for backwards compatability
|
||||
Servers []string
|
||||
Prefix string
|
||||
Timeout int
|
||||
conns []net.Conn
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
# TCP endpoint for your graphite instance.
|
||||
servers = ["localhost:2003"]
|
||||
# Prefix metrics name
|
||||
prefix = ""
|
||||
# timeout in seconds for the write connection to graphite
|
||||
timeout = 2
|
||||
`
|
||||
|
||||
func (g *Graphite) Connect() error {
|
||||
// Set default values
|
||||
if g.Timeout <= 0 {
|
||||
g.Timeout = 2
|
||||
}
|
||||
if len(g.Servers) == 0 {
|
||||
g.Servers = append(g.Servers, "localhost:2003")
|
||||
}
|
||||
// Get Connections
|
||||
var conns []net.Conn
|
||||
for _, server := range g.Servers {
|
||||
conn, err := net.DialTimeout("tcp", server, time.Duration(g.Timeout)*time.Second)
|
||||
if err == nil {
|
||||
conns = append(conns, conn)
|
||||
}
|
||||
}
|
||||
g.conns = conns
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *Graphite) Close() error {
|
||||
// Closing all connections
|
||||
for _, conn := range g.conns {
|
||||
conn.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *Graphite) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (g *Graphite) Description() string {
|
||||
return "Configuration for Graphite server to send metrics to"
|
||||
}
|
||||
|
||||
// Choose a random server in the cluster to write to until a successful write
|
||||
// occurs, logging each unsuccessful. If all servers fail, return error.
|
||||
func (g *Graphite) Write(points []*client.Point) error {
|
||||
// Prepare data
|
||||
var bp []string
|
||||
for _, point := range points {
|
||||
// Get name
|
||||
name := point.Name()
|
||||
// Convert UnixNano to Unix timestamps
|
||||
timestamp := point.UnixNano() / 1000000000
|
||||
|
||||
for field_name, value := range point.Fields() {
|
||||
// Convert value
|
||||
value_str := fmt.Sprintf("%#v", value)
|
||||
// Write graphite point
|
||||
var graphitePoint string
|
||||
if name == field_name {
|
||||
graphitePoint = fmt.Sprintf("%s.%s %s %d\n",
|
||||
strings.Replace(point.Tags()["host"], ".", "_", -1),
|
||||
strings.Replace(name, ".", "_", -1),
|
||||
value_str,
|
||||
timestamp)
|
||||
} else {
|
||||
graphitePoint = fmt.Sprintf("%s.%s.%s %s %d\n",
|
||||
strings.Replace(point.Tags()["host"], ".", "_", -1),
|
||||
strings.Replace(name, ".", "_", -1),
|
||||
strings.Replace(field_name, ".", "_", -1),
|
||||
value_str,
|
||||
timestamp)
|
||||
}
|
||||
if g.Prefix != "" {
|
||||
graphitePoint = fmt.Sprintf("%s.%s", g.Prefix, graphitePoint)
|
||||
}
|
||||
bp = append(bp, graphitePoint)
|
||||
//fmt.Printf(graphitePoint)
|
||||
}
|
||||
}
|
||||
graphitePoints := strings.Join(bp, "")
|
||||
|
||||
// This will get set to nil if a successful write occurs
|
||||
err := errors.New("Could not write to any Graphite server in cluster\n")
|
||||
|
||||
// Send data to a random server
|
||||
p := rand.Perm(len(g.conns))
|
||||
for _, n := range p {
|
||||
if _, e := fmt.Fprintf(g.conns[n], graphitePoints); e != nil {
|
||||
// Error
|
||||
log.Println("ERROR: " + err.Error())
|
||||
// Let's try the next one
|
||||
} else {
|
||||
// Success
|
||||
err = nil
|
||||
break
|
||||
}
|
||||
}
|
||||
// try to reconnect
|
||||
if err != nil {
|
||||
g.Connect()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("graphite", func() outputs.Output {
|
||||
return &Graphite{}
|
||||
})
|
||||
}
|
|
@ -0,0 +1,104 @@
|
|||
package graphite
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"net"
|
||||
"net/textproto"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/client/v2"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestGraphiteError(t *testing.T) {
|
||||
// Init plugin
|
||||
g := Graphite{
|
||||
Servers: []string{"127.0.0.1:2003", "127.0.0.1:12003"},
|
||||
Prefix: "my.prefix",
|
||||
}
|
||||
// Init points
|
||||
pt1, _ := client.NewPoint(
|
||||
"mymeasurement",
|
||||
map[string]string{"host": "192.168.0.1"},
|
||||
map[string]interface{}{"mymeasurement": float64(3.14)},
|
||||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||
)
|
||||
// Prepare point list
|
||||
var points []*client.Point
|
||||
points = append(points, pt1)
|
||||
// Error
|
||||
err1 := g.Connect()
|
||||
require.NoError(t, err1)
|
||||
err2 := g.Write(points)
|
||||
require.Error(t, err2)
|
||||
assert.Equal(t, "Could not write to any Graphite server in cluster\n", err2.Error())
|
||||
}
|
||||
|
||||
func TestGraphiteOK(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
// Init plugin
|
||||
g := Graphite{
|
||||
Prefix: "my.prefix",
|
||||
}
|
||||
// Init points
|
||||
pt1, _ := client.NewPoint(
|
||||
"mymeasurement",
|
||||
map[string]string{"host": "192.168.0.1"},
|
||||
map[string]interface{}{"mymeasurement": float64(3.14)},
|
||||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||
)
|
||||
pt2, _ := client.NewPoint(
|
||||
"mymeasurement",
|
||||
map[string]string{"host": "192.168.0.1"},
|
||||
map[string]interface{}{"value": float64(3.14)},
|
||||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||
)
|
||||
pt3, _ := client.NewPoint(
|
||||
"my_measurement",
|
||||
map[string]string{"host": "192.168.0.1"},
|
||||
map[string]interface{}{"value": float64(3.14)},
|
||||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||
)
|
||||
// Prepare point list
|
||||
var points []*client.Point
|
||||
points = append(points, pt1)
|
||||
points = append(points, pt2)
|
||||
points = append(points, pt3)
|
||||
// Start TCP server
|
||||
wg.Add(1)
|
||||
go TCPServer(t, &wg)
|
||||
wg.Wait()
|
||||
// Connect
|
||||
wg.Add(1)
|
||||
err1 := g.Connect()
|
||||
wg.Wait()
|
||||
require.NoError(t, err1)
|
||||
// Send Data
|
||||
err2 := g.Write(points)
|
||||
require.NoError(t, err2)
|
||||
wg.Add(1)
|
||||
// Waiting TCPserver
|
||||
wg.Wait()
|
||||
g.Close()
|
||||
}
|
||||
|
||||
func TCPServer(t *testing.T, wg *sync.WaitGroup) {
|
||||
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
|
||||
wg.Done()
|
||||
conn, _ := tcpServer.Accept()
|
||||
wg.Done()
|
||||
reader := bufio.NewReader(conn)
|
||||
tp := textproto.NewReader(reader)
|
||||
data1, _ := tp.ReadLine()
|
||||
assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data1)
|
||||
data2, _ := tp.ReadLine()
|
||||
assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.value 3.14 1289430000", data2)
|
||||
data3, _ := tp.ReadLine()
|
||||
assert.Equal(t, "my.prefix.192_168_0_1.my_measurement.value 3.14 1289430000", data3)
|
||||
conn.Close()
|
||||
wg.Done()
|
||||
}
|
|
@ -1,12 +1,14 @@
|
|||
package kafka
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/influxdb/influxdb/client/v2"
|
||||
"github.com/influxdb/telegraf/plugins/outputs"
|
||||
"io/ioutil"
|
||||
)
|
||||
|
||||
type Kafka struct {
|
||||
|
@ -16,7 +18,16 @@ type Kafka struct {
|
|||
Topic string
|
||||
// Routing Key Tag
|
||||
RoutingTag string `toml:"routing_tag"`
|
||||
// TLS client certificate
|
||||
Certificate string
|
||||
// TLS client key
|
||||
Key string
|
||||
// TLS certificate authority
|
||||
CA string
|
||||
// Verfiy SSL certificate chain
|
||||
VerifySsl bool
|
||||
|
||||
tlsConfig tls.Config
|
||||
producer sarama.SyncProducer
|
||||
}
|
||||
|
||||
|
@ -28,10 +39,60 @@ var sampleConfig = `
|
|||
# Telegraf tag to use as a routing key
|
||||
# ie, if this tag exists, it's value will be used as the routing key
|
||||
routing_tag = "host"
|
||||
|
||||
# Optional TLS configuration:
|
||||
# Client certificate
|
||||
certificate = ""
|
||||
# Client key
|
||||
key = ""
|
||||
# Certificate authority file
|
||||
ca = ""
|
||||
# Verify SSL certificate chain
|
||||
verify_ssl = false
|
||||
`
|
||||
|
||||
func createTlsConfiguration(k *Kafka) (t *tls.Config, err error) {
|
||||
if k.Certificate != "" && k.Key != "" && k.CA != "" {
|
||||
cert, err := tls.LoadX509KeyPair(k.Certificate, k.Key)
|
||||
if err != nil {
|
||||
return nil, errors.New(fmt.Sprintf("Cout not load Kafka TLS client key/certificate: %s",
|
||||
err))
|
||||
}
|
||||
|
||||
caCert, err := ioutil.ReadFile(k.CA)
|
||||
if err != nil {
|
||||
return nil, errors.New(fmt.Sprintf("Cout not load Kafka TLS CA: %s",
|
||||
err))
|
||||
}
|
||||
|
||||
caCertPool := x509.NewCertPool()
|
||||
caCertPool.AppendCertsFromPEM(caCert)
|
||||
|
||||
t = &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
RootCAs: caCertPool,
|
||||
InsecureSkipVerify: k.VerifySsl,
|
||||
}
|
||||
}
|
||||
// will be nil by default if nothing is provided
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func (k *Kafka) Connect() error {
|
||||
producer, err := sarama.NewSyncProducer(k.Brokers, nil)
|
||||
config := sarama.NewConfig()
|
||||
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
|
||||
config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
|
||||
tlsConfig, err := createTlsConfiguration(k)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if tlsConfig != nil {
|
||||
config.Net.TLS.Config = tlsConfig
|
||||
config.Net.TLS.Enable = true
|
||||
}
|
||||
|
||||
producer, err := sarama.NewSyncProducer(k.Brokers, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue