Allow globs in FPM unix socket paths (#7089)
This commit is contained in:
parent
a34180459a
commit
88216eb4d2
|
@ -19,6 +19,8 @@ Get phpfpm stats using either HTTP status page or fpm socket.
|
||||||
## "/var/run/php5-fpm.sock"
|
## "/var/run/php5-fpm.sock"
|
||||||
## or using a custom fpm status path:
|
## or using a custom fpm status path:
|
||||||
## "/var/run/php5-fpm.sock:fpm-custom-status-path"
|
## "/var/run/php5-fpm.sock:fpm-custom-status-path"
|
||||||
|
## glob patterns are also supported:
|
||||||
|
## "/var/run/php*.sock"
|
||||||
##
|
##
|
||||||
## - fcgi: the URL must start with fcgi:// or cgi://, and port must be present, ie:
|
## - fcgi: the URL must start with fcgi:// or cgi://, and port must be present, ie:
|
||||||
## "fcgi://10.0.0.12:9000/status"
|
## "fcgi://10.0.0.12:9000/status"
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/internal/globpath"
|
||||||
"github.com/influxdata/telegraf/internal/tls"
|
"github.com/influxdata/telegraf/internal/tls"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
)
|
)
|
||||||
|
@ -95,7 +96,12 @@ func (g *phpfpm) Gather(acc telegraf.Accumulator) error {
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
for _, serv := range g.Urls {
|
urls, err := expandUrls(g.Urls)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, serv := range urls {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(serv string) {
|
go func(serv string) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
@ -153,18 +159,10 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
|
||||||
statusPath = "status"
|
statusPath = "status"
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
socketAddr := strings.Split(addr, ":")
|
socketPath, statusPath = unixSocketPaths(addr)
|
||||||
if len(socketAddr) >= 2 {
|
if statusPath == "" {
|
||||||
socketPath = socketAddr[0]
|
|
||||||
statusPath = socketAddr[1]
|
|
||||||
} else {
|
|
||||||
socketPath = socketAddr[0]
|
|
||||||
statusPath = "status"
|
statusPath = "status"
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := os.Stat(socketPath); os.IsNotExist(err) {
|
|
||||||
return fmt.Errorf("Socket doesn't exist '%s': %s", socketPath, err)
|
|
||||||
}
|
|
||||||
fcgi, err = newFcgiClient("unix", socketPath)
|
fcgi, err = newFcgiClient("unix", socketPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -277,6 +275,70 @@ func importMetric(r io.Reader, acc telegraf.Accumulator, addr string) (poolStat,
|
||||||
return stats, nil
|
return stats, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func expandUrls(urls []string) ([]string, error) {
|
||||||
|
addrs := make([]string, 0, len(urls))
|
||||||
|
for _, url := range urls {
|
||||||
|
if isNetworkURL(url) {
|
||||||
|
addrs = append(addrs, url)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
paths, err := globUnixSocket(url)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
addrs = append(addrs, paths...)
|
||||||
|
}
|
||||||
|
return addrs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func globUnixSocket(url string) ([]string, error) {
|
||||||
|
pattern, status := unixSocketPaths(url)
|
||||||
|
glob, err := globpath.Compile(pattern)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not compile glob %q: %v", pattern, err)
|
||||||
|
}
|
||||||
|
paths := glob.Match()
|
||||||
|
if len(paths) == 0 {
|
||||||
|
if _, err := os.Stat(paths[0]); err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return nil, fmt.Errorf("Socket doesn't exist '%s': %s", pattern, err)
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
addrs := make([]string, 0, len(paths))
|
||||||
|
|
||||||
|
for _, path := range paths {
|
||||||
|
if status != "" {
|
||||||
|
status = fmt.Sprintf(":%s", status)
|
||||||
|
}
|
||||||
|
addrs = append(addrs, fmt.Sprintf("%s%s", path, status))
|
||||||
|
}
|
||||||
|
|
||||||
|
return addrs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func unixSocketPaths(addr string) (string, string) {
|
||||||
|
var socketPath, statusPath string
|
||||||
|
|
||||||
|
socketAddr := strings.Split(addr, ":")
|
||||||
|
if len(socketAddr) >= 2 {
|
||||||
|
socketPath = socketAddr[0]
|
||||||
|
statusPath = socketAddr[1]
|
||||||
|
} else {
|
||||||
|
socketPath = socketAddr[0]
|
||||||
|
statusPath = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return socketPath, statusPath
|
||||||
|
}
|
||||||
|
|
||||||
|
func isNetworkURL(addr string) bool {
|
||||||
|
return strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") || strings.HasPrefix(addr, "fcgi://") || strings.HasPrefix(addr, "cgi://")
|
||||||
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
inputs.Add("phpfpm", func() telegraf.Input {
|
inputs.Add("phpfpm", func() telegraf.Input {
|
||||||
return &phpfpm{}
|
return &phpfpm{}
|
||||||
|
|
|
@ -148,6 +148,71 @@ func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) {
|
||||||
acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags)
|
acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPhpFpmGeneratesMetrics_From_Multiple_Sockets_With_Glob(t *testing.T) {
|
||||||
|
// Create a socket in /tmp because we always have write permission and if the
|
||||||
|
// removing of socket fail when system restart /tmp is clear so
|
||||||
|
// we don't have junk files around
|
||||||
|
var randomNumber int64
|
||||||
|
binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
|
||||||
|
socket1 := fmt.Sprintf("/tmp/test-fpm%d.sock", randomNumber)
|
||||||
|
tcp1, err := net.Listen("unix", socket1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Cannot initialize server on port ")
|
||||||
|
}
|
||||||
|
defer tcp1.Close()
|
||||||
|
|
||||||
|
binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
|
||||||
|
socket2 := fmt.Sprintf("/tmp/test-fpm%d.sock", randomNumber)
|
||||||
|
tcp2, err := net.Listen("unix", socket2)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Cannot initialize server on port ")
|
||||||
|
}
|
||||||
|
defer tcp2.Close()
|
||||||
|
|
||||||
|
s := statServer{}
|
||||||
|
go fcgi.Serve(tcp1, s)
|
||||||
|
go fcgi.Serve(tcp2, s)
|
||||||
|
|
||||||
|
r := &phpfpm{
|
||||||
|
Urls: []string{"/tmp/test-fpm[\\-0-9]*.sock"},
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc1, acc2 testutil.Accumulator
|
||||||
|
|
||||||
|
err = acc1.GatherError(r.Gather)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = acc2.GatherError(r.Gather)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tags1 := map[string]string{
|
||||||
|
"pool": "www",
|
||||||
|
"url": socket1,
|
||||||
|
}
|
||||||
|
|
||||||
|
tags2 := map[string]string{
|
||||||
|
"pool": "www",
|
||||||
|
"url": socket2,
|
||||||
|
}
|
||||||
|
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"start_since": int64(1991),
|
||||||
|
"accepted_conn": int64(3),
|
||||||
|
"listen_queue": int64(1),
|
||||||
|
"max_listen_queue": int64(0),
|
||||||
|
"listen_queue_len": int64(0),
|
||||||
|
"idle_processes": int64(1),
|
||||||
|
"active_processes": int64(1),
|
||||||
|
"total_processes": int64(2),
|
||||||
|
"max_active_processes": int64(1),
|
||||||
|
"max_children_reached": int64(2),
|
||||||
|
"slow_requests": int64(1),
|
||||||
|
}
|
||||||
|
|
||||||
|
acc1.AssertContainsTaggedFields(t, "phpfpm", fields, tags1)
|
||||||
|
acc2.AssertContainsTaggedFields(t, "phpfpm", fields, tags2)
|
||||||
|
}
|
||||||
|
|
||||||
func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) {
|
func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) {
|
||||||
// Create a socket in /tmp because we always have write permission. If the
|
// Create a socket in /tmp because we always have write permission. If the
|
||||||
// removing of socket fail we won't have junk files around. Cuz when system
|
// removing of socket fail we won't have junk files around. Cuz when system
|
||||||
|
@ -227,7 +292,7 @@ func TestPhpFpmGeneratesMetrics_Throw_Error_When_Socket_Path_Is_Invalid(t *testi
|
||||||
|
|
||||||
err := acc.GatherError(r.Gather)
|
err := acc.GatherError(r.Gather)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
assert.Equal(t, `Socket doesn't exist '/tmp/invalid.sock': stat /tmp/invalid.sock: no such file or directory`, err.Error())
|
assert.Equal(t, `dial unix /tmp/invalid.sock: connect: no such file or directory`, err.Error())
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue