Fixed formatting

This commit is contained in:
Greg Linton 2016-05-10 14:08:11 -04:00
parent f4c8d2f683
commit 08c548d408
1 changed files with 181 additions and 183 deletions

View File

@ -1,30 +1,29 @@
package haproxy package haproxy
import ( import (
"encoding/csv" "encoding/csv"
"fmt" "fmt"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"io" "io"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
) )
type haproxy struct { type haproxy struct {
Servers []string Servers []string
client *http.Client client *http.Client
} }
var sampleConfig = ` var sampleConfig = `
## An array of address to gather stats about. Specify an ip on hostname ## An array of address to gather stats about. Specify an ip on hostname
## with optional port. ie localhost, 10.10.3.33:1936, etc. ## with optional port. ie localhost, 10.10.3.33:1936, etc.
## If no servers are specified, then default to 127.0.0.1:1936 ## If no servers are specified, then default to 127.0.0.1:1936
servers = ["http://myhaproxy.com:1936", "http://anotherhaproxy.com:1936"] servers = ["http://myhaproxy.com:1936", "http://anotherhaproxy.com:1936"]
## Or you can also use local socket ## Or you can also use local socket
@ -32,215 +31,214 @@ var sampleConfig = `
` `
func (r *haproxy) SampleConfig() string { func (r *haproxy) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (r *haproxy) Description() string { func (r *haproxy) Description() string {
return "Read metrics of haproxy, via socket or csv stats page" return "Read metrics of haproxy, via socket or csv stats page"
} }
// Reads stats from all configured servers accumulates stats. // Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any). // Returns one of the errors encountered while gather stats (if any).
func (g *haproxy) Gather(acc telegraf.Accumulator) error { func (g *haproxy) Gather(acc telegraf.Accumulator) error {
if len(g.Servers) == 0 { if len(g.Servers) == 0 {
return g.gatherServer("http://127.0.0.1:1936", acc) return g.gatherServer("http://127.0.0.1:1936", acc)
} }
var wg sync.WaitGroup var wg sync.WaitGroup
var outerr error var outerr error
for _, serv := range g.Servers { for _, serv := range g.Servers {
wg.Add(1) wg.Add(1)
go func(serv string) { go func(serv string) {
defer wg.Done() defer wg.Done()
outerr = g.gatherServer(serv, acc) outerr = g.gatherServer(serv, acc)
}(serv) }(serv)
} }
wg.Wait() wg.Wait()
return outerr return outerr
} }
func (g *haproxy) gatherServerSocket(addr string, acc telegraf.Accumulator) error { func (g *haproxy) gatherServerSocket(addr string, acc telegraf.Accumulator) error {
var socketPath string var socketPath string
socketAddr := strings.Split(addr, ":") socketAddr := strings.Split(addr, ":")
if len(socketAddr) >= 2 { if len(socketAddr) >= 2 {
socketPath = socketAddr[1] socketPath = socketAddr[1]
} else { } else {
socketPath = socketAddr[0] socketPath = socketAddr[0]
} }
c, err := net.Dial("unix", socketPath) c, err := net.Dial("unix", socketPath)
if err != nil { if err != nil {
return fmt.Errorf("Could not connect to socket '%s': %s", addr, err) return fmt.Errorf("Could not connect to socket '%s': %s", addr, err)
} }
_, errw := c.Write([]byte("show stat\n")) _, errw := c.Write([]byte("show stat\n"))
if errw != nil { if errw != nil {
return fmt.Errorf("Could not write to socket '%s': %s", addr, errw) return fmt.Errorf("Could not write to socket '%s': %s", addr, errw)
} }
return importCsvResult(c, acc, socketPath) return importCsvResult(c, acc, socketPath)
} }
func (g *haproxy) gatherServer(addr string, acc telegraf.Accumulator) error { func (g *haproxy) gatherServer(addr string, acc telegraf.Accumulator) error {
if !strings.HasPrefix(addr, "http") { if !strings.HasPrefix(addr, "http") {
return g.gatherServerSocket(addr, acc) return g.gatherServerSocket(addr, acc)
} }
if g.client == nil { if g.client == nil {
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)} tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
client := &http.Client{ client := &http.Client{
Transport: tr, Transport: tr,
Timeout: time.Duration(4 * time.Second), Timeout: time.Duration(4 * time.Second),
} }
g.client = client g.client = client
} }
u, err := url.Parse(addr) u, err := url.Parse(addr)
if err != nil { if err != nil {
return fmt.Errorf("Unable parse server address '%s': %s", addr, err) return fmt.Errorf("Unable parse server address '%s': %s", addr, err)
} }
req, err := http.NewRequest("GET", fmt.Sprintf("%s://%s%s/;csv", u.Scheme, u.Host, u.Path), nil) req, err := http.NewRequest("GET", fmt.Sprintf("%s://%s%s/;csv", u.Scheme, u.Host, u.Path), nil)
if u.User != nil { if u.User != nil {
p, _ := u.User.Password() p, _ := u.User.Password()
req.SetBasicAuth(u.User.Username(), p) req.SetBasicAuth(u.User.Username(), p)
} }
res, err := g.client.Do(req) res, err := g.client.Do(req)
if err != nil { if err != nil {
return fmt.Errorf("Unable to connect to haproxy server '%s': %s", addr, err) return fmt.Errorf("Unable to connect to haproxy server '%s': %s", addr, err)
} }
if res.StatusCode != 200 { if res.StatusCode != 200 {
return fmt.Errorf("Unable to get valid stat result from '%s': %s", addr, err) return fmt.Errorf("Unable to get valid stat result from '%s': %s", addr, err)
} }
return importCsvResult(res.Body, acc, u.Host) return importCsvResult(res.Body, acc, u.Host)
} }
func importCsvResult(r io.Reader, acc telegraf.Accumulator, host string) error { func importCsvResult(r io.Reader, acc telegraf.Accumulator, host string) error {
csv := csv.NewReader(r) csv := csv.NewReader(r)
result, err := csv.ReadAll() result, err := csv.ReadAll()
now := time.Now() now := time.Now()
var keys []string
var keys []string px_sv_status := make(map[string]map[string]int)
px_sv_status := make(map[string]map[string]int) non_stat_fields := map[string]bool{
non_stat_fields := map[string]bool { "pxname": true,
"pxname": true, "svname": true,
"svname": true, "status": true,
"status": true, "tracked": true,
"tracked": true, "check_status": true,
"check_status": true, "last_chk": true,
"last_chk": true, "pid": true,
"pid": true, "iid": true,
"iid": true, "sid": true,
"sid": true, "lastchg": true,
"lastchg": true, "type": true,
"type": true, "check_code": true,
"check_code": true, }
}
for i := range result { for i := range result {
if i == 0 { if i == 0 {
keys = result[i] keys = result[i]
keys[0] = strings.Replace(keys[0], "# ", "", -1) keys[0] = strings.Replace(keys[0], "# ", "", -1)
} else { } else {
row := make(map[string]string, len(result[i]))
for f, v := range result[i] { row := make(map[string]string, len(result[i]))
row[keys[f]] = v
}
tags := map[string]string{
"server": host,
"proxy": row["pxname"],
"sv": row["svname"],
}
if row["svname"] != "BACKEND" && row["svname"] != "FRONTEND" { for f, v := range result[i] {
if len(px_sv_status[row["pxname"]]) == 0 { row[keys[f]] = v
px_sv_status[row["pxname"]] = map[string]int{ }
"_status_act": 0,
"_status_act_up": 0,
"_status_act_down": 0,
"_status_act_maint": 0,
"_status_act_drain": 0,
"_status_act_other": 0,
"_status_bck": 0,
"_status_bck_up": 0,
"_status_bck_down": 0,
"_status_bck_maint": 0,
"_status_bck_drain": 0,
"_status_bck_other": 0,
}
}
if row["act"] == "1" {
px_sv_status[row["pxname"]]["_status_act"]++
switch row["status"] {
case "UP":
px_sv_status[row["pxname"]]["_status_act_up"]++
case "DOWN":
px_sv_status[row["pxname"]]["_status_act_down"]++
case "MAINT":
px_sv_status[row["pxname"]]["_status_act_maint"]++
case "DRAIN":
px_sv_status[row["pxname"]]["_status_act_drain"]++
default:
px_sv_status[row["pxname"]]["_status_act_other"]++
}
} else {
px_sv_status[row["pxname"]]["_status_bck"]++
switch row["status"] {
case "UP":
px_sv_status[row["pxname"]]["_status_bck_up"]++
case "DOWN":
px_sv_status[row["pxname"]]["_status_bck_down"]++
case "MAINT":
px_sv_status[row["pxname"]]["_status_bck_maint"]++
case "DRAIN":
px_sv_status[row["pxname"]]["_status_bck_drain"]++
default:
px_sv_status[row["pxname"]]["_status_bck_other"]++
}
}
}
if row["svname"] == "BACKEND" && len(px_sv_status[row["pxname"]]) > 0 { tags := map[string]string{
for s, c := range px_sv_status[row["pxname"]] { "server": host,
row[s] = strconv.Itoa(c) "proxy": row["pxname"],
} "sv": row["svname"],
} }
fields := make(map[string]interface{})
for field, v := range row {
if non_stat_fields[field] {
continue
}
ival, err := strconv.ParseUint(v, 10, 64) if row["svname"] != "BACKEND" && row["svname"] != "FRONTEND" {
if err == nil { if len(px_sv_status[row["pxname"]]) == 0 {
fields[field] = ival px_sv_status[row["pxname"]] = map[string]int{
} "_status_act": 0,
} "_status_act_up": 0,
acc.AddFields("haproxy", fields, tags, now) "_status_act_down": 0,
} "_status_act_maint": 0,
} "_status_act_drain": 0,
return err "_status_act_other": 0,
"_status_bck": 0,
"_status_bck_up": 0,
"_status_bck_down": 0,
"_status_bck_maint": 0,
"_status_bck_drain": 0,
"_status_bck_other": 0,
}
}
if row["act"] == "1" {
px_sv_status[row["pxname"]]["_status_act"]++
switch row["status"] {
case "UP":
px_sv_status[row["pxname"]]["_status_act_up"]++
case "DOWN":
px_sv_status[row["pxname"]]["_status_act_down"]++
case "MAINT":
px_sv_status[row["pxname"]]["_status_act_maint"]++
case "DRAIN":
px_sv_status[row["pxname"]]["_status_act_drain"]++
default:
px_sv_status[row["pxname"]]["_status_act_other"]++
}
} else {
px_sv_status[row["pxname"]]["_status_bck"]++
switch row["status"] {
case "UP":
px_sv_status[row["pxname"]]["_status_bck_up"]++
case "DOWN":
px_sv_status[row["pxname"]]["_status_bck_down"]++
case "MAINT":
px_sv_status[row["pxname"]]["_status_bck_maint"]++
case "DRAIN":
px_sv_status[row["pxname"]]["_status_bck_drain"]++
default:
px_sv_status[row["pxname"]]["_status_bck_other"]++
}
}
}
if row["svname"] == "BACKEND" && len(px_sv_status[row["pxname"]]) > 0 {
for s, c := range px_sv_status[row["pxname"]] {
row[s] = strconv.Itoa(c)
}
}
fields := make(map[string]interface{})
for field, v := range row {
if non_stat_fields[field] {
continue
}
ival, err := strconv.ParseUint(v, 10, 64)
if err == nil {
fields[field] = ival
}
}
acc.AddFields("haproxy", fields, tags, now)
}
}
return err
} }
func init() { func init() {
inputs.Add("haproxy", func() telegraf.Input { inputs.Add("haproxy", func() telegraf.Input {
return &haproxy{} return &haproxy{}
}) })
} }