diff --git a/CHANGELOG.md b/CHANGELOG.md index 762c7ceff..4ab7df1b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,7 @@ consistent with the behavior of `collection_jitter`. - [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin. - [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag. - [#1411](https://github.com/influxdata/telegraf/pull/1411): Implement support for fetching hddtemp data +- [#1400](https://github.com/influxdata/telegraf/pull/1400): Add support for Pgbouncer - [#1340](https://github.com/influxdata/telegraf/issues/1340): statsd: do not log every dropped metric. - [#1368](https://github.com/influxdata/telegraf/pull/1368): Add precision rounding to all metrics on collection. - [#1390](https://github.com/influxdata/telegraf/pull/1390): Add support for Tengine diff --git a/Makefile b/Makefile index 19eccbb70..1859eba6d 100644 --- a/Makefile +++ b/Makefile @@ -57,6 +57,13 @@ docker-run: docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt docker run --name riemann -p "5555:5555" -d blalor/riemann docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim + docker run --name pgbouncer \ -p "6432:6432" \ -e PGB_USERLISTS="postgres:postgres" \ -e PGB_ADMIN_USERS="postgres" \ -e PGB_STATS_USERS="postgres" \ --link postgres:pg \ -d jsvisa/pgbouncer # Run docker containers necessary for CircleCI unit tests docker-run-circle: @@ -70,11 +77,17 @@ docker-run-circle: docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt docker run --name riemann -p "5555:5555" -d blalor/riemann docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim + docker run --name pgbouncer \ -p "6432:6432" \ -e PGB_USERLISTS="postgres:postgres" \ -e PGB_ADMIN_USERS="postgres" \ -e PGB_STATS_USERS="postgres" \ -d jsvisa/pgbouncer # Kill all docker containers, ignore errors docker-kill: - -docker kill nsq aerospike redis rabbitmq
postgres memcached mysql kafka mqtt riemann snmp - -docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann snmp + -docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann snmp pgbouncer + -docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann snmp pgbouncer # Run full unit tests using docker containers (includes setup and teardown) test: vet docker-kill docker-run diff --git a/README.md b/README.md index 74bbf2a4f..c9fe12351 100644 --- a/README.md +++ b/README.md @@ -174,6 +174,7 @@ Currently implemented sources: * [nsq](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nsq) * [nstat](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nstat) * [ntpq](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/ntpq) +* [pgbouncer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/pgbouncer) * [phpfpm](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/phpfpm) * [phusion passenger](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/passenger) * [ping](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/ping) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index ddb7d4039..dacbff644 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -46,6 +46,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/nstat" _ "github.com/influxdata/telegraf/plugins/inputs/ntpq" _ "github.com/influxdata/telegraf/plugins/inputs/passenger" + _ "github.com/influxdata/telegraf/plugins/inputs/pgbouncer" _ "github.com/influxdata/telegraf/plugins/inputs/phpfpm" _ "github.com/influxdata/telegraf/plugins/inputs/ping" _ "github.com/influxdata/telegraf/plugins/inputs/postgresql" diff --git a/plugins/inputs/pgbouncer/README.md b/plugins/inputs/pgbouncer/README.md new file mode 100644 index 000000000..31e883f11 --- /dev/null +++ b/plugins/inputs/pgbouncer/README.md @@ -0,0 
+1,61 @@ +# Pgbouncer plugin + +This pgbouncer plugin provides metrics for your pgbouncer connection information. + +### Configuration: + +```toml +# Description +[[inputs.pgbouncer]] + ## specify address via a url matching: + ## postgres://[pqgotest[:password]]@localhost:port[/dbname]\ + ## ?sslmode=[disable|verify-ca|verify-full] + ## or a simple string: + ## host=localhost user=pqotest port=... password=... sslmode=... dbname=... + ## + ## All connection parameters are optional, except for dbname, + ## you need to set it always as pgbouncer. + address = "host=localhost user=postgres port=6432 sslmode=disable dbname=pgbouncer" + + ## A list of databases to pull metrics about. If not specified, metrics for all + ## databases are gathered. + # databases = ["app_production", "testing"] +``` + +### Measurements & Fields: + +Pgbouncer provides two measurements named "pgbouncer_pools" and "pgbouncer_stats", each has the fields as below: + +#### pgbouncer_pools + +- cl_active +- cl_waiting +- maxwait +- pool_mode +- sv_active +- sv_idle +- sv_login +- sv_tested +- sv_used + +#### pgbouncer_stats + +- avg_query +- avg_recv +- avg_req +- avg_sent +- total_query_time +- total_received +- total_requests +- total_sent + +More information about the meaning of these metrics can be found in the [PgBouncer usage](https://pgbouncer.github.io/usage.html) + +### Example Output: + +``` +$ ./telegraf -config telegraf.conf -input-filter pgbouncer -test +> pgbouncer_pools,db=pgbouncer,host=localhost,pool_mode=transaction,server=host\=localhost\ user\=elena\ port\=6432\ dbname\=pgbouncer\ sslmode\=disable,user=elena cl_active=1500i,cl_waiting=0i,maxwait=0i,sv_active=0i,sv_idle=5i,sv_login=0i,sv_tested=0i,sv_used=5i 1466594520564518897 +> pgbouncer_stats,db=pgbouncer,host=localhost,server=host\=localhost\ user\=elena\ port\=6432\ dbname\=pgbouncer\ sslmode\=disable
avg_query=1157i,avg_recv=36727i,avg_req=131i,avg_sent=23359i,total_query_time=252173878876i,total_received=55956189078i,total_requests=193601888i,total_sent=36703848280i 1466594520564825345 +``` + diff --git a/plugins/inputs/pgbouncer/pgbouncer.go b/plugins/inputs/pgbouncer/pgbouncer.go new file mode 100644 index 000000000..df4179cd6 --- /dev/null +++ b/plugins/inputs/pgbouncer/pgbouncer.go @@ -0,0 +1,206 @@ +package pgbouncer + +import ( + "bytes" + "database/sql" + "regexp" + "strings" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + + "github.com/lib/pq" +) + +type Pgbouncer struct { + Address string + Databases []string + OrderedColumns []string + AllColumns []string + sanitizedAddress string +} + +var ignoredColumns = map[string]bool{"pool_mode": true, "database": true, "user": true} + +var sampleConfig = ` + ## specify address via a url matching: + ## postgres://[pqgotest[:password]]@localhost:port[/dbname]\ + ## ?sslmode=[disable|verify-ca|verify-full] + ## or a simple string: + ## host=localhost user=pqotest port=6432 password=... sslmode=... dbname=pgbouncer + ## + ## All connection parameters are optional, except for dbname, + ## you need to set it always as pgbouncer. + address = "host=localhost user=postgres port=6432 sslmode=disable dbname=pgbouncer" + + ## A list of databases to pull metrics about. If not specified, metrics for all + ## databases are gathered. 
+ # databases = ["app_production", "testing"] +` + +func (p *Pgbouncer) SampleConfig() string { + return sampleConfig +} + +func (p *Pgbouncer) Description() string { + return "Read metrics from one or many pgbouncer servers" +} + +func (p *Pgbouncer) IgnoredColumns() map[string]bool { + return ignoredColumns +} + +var localhost = "host=localhost port=6432 sslmode=disable dbname=pgbouncer" + +func (p *Pgbouncer) Gather(acc telegraf.Accumulator) error { + if p.Address == "" || p.Address == "localhost" { + p.Address = localhost + } + + db, err := sql.Open("postgres", p.Address) + if err != nil { + return err + } + + defer db.Close() + + queries := map[string]string{"pools": "SHOW POOLS", "stats": "SHOW STATS"} + + for metric, query := range queries { + rows, err := db.Query(query) + if err != nil { + return err + } + + defer rows.Close() + + // grab the column information from the result + p.OrderedColumns, err = rows.Columns() + if err != nil { + return err + } else { + p.AllColumns = make([]string, len(p.OrderedColumns)) + copy(p.AllColumns, p.OrderedColumns) + } + + for rows.Next() { + err = p.accRow(rows, metric, acc) + if err != nil { + return err + } + } + } + return nil +} + +type scanner interface { + Scan(dest ...interface{}) error +} + +var passwordKVMatcher, _ = regexp.Compile("password=\\S+ ?") + +func (p *Pgbouncer) SanitizedAddress() (_ string, err error) { + var canonicalizedAddress string + if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") { + canonicalizedAddress, err = pq.ParseURL(p.Address) + if err != nil { + return p.sanitizedAddress, err + } + } else { + canonicalizedAddress = p.Address + } + p.sanitizedAddress = passwordKVMatcher.ReplaceAllString(canonicalizedAddress, "") + + return p.sanitizedAddress, err +} + +func (p *Pgbouncer) accRow(row scanner, metric string, acc telegraf.Accumulator) error { + var columnVars []interface{} + var tags = make(map[string]string) + var dbname, user, poolMode 
bytes.Buffer + + // this is where we'll store the column name with its *interface{} + columnMap := make(map[string]*interface{}) + + for _, column := range p.OrderedColumns { + columnMap[column] = new(interface{}) + } + + // populate the array of interface{} with the pointers in the right order + for i := 0; i < len(columnMap); i++ { + columnVars = append(columnVars, columnMap[p.OrderedColumns[i]]) + } + + // deconstruct array of variables and send to Scan + err := row.Scan(columnVars...) + + if err != nil { + return err + } + + // extract the database name from the column map + dbnameChars := (*columnMap["database"]).([]uint8) + for i := 0; i < len(dbnameChars); i++ { + dbname.WriteString(string(dbnameChars[i])) + } + + if p.ignoreDatabase(dbname.String()) { + return nil + } + + tags["db"] = dbname.String() + + if columnMap["user"] != nil { + userChars := (*columnMap["user"]).([]uint8) + for i := 0; i < len(userChars); i++ { + user.WriteString(string(userChars[i])) + } + tags["user"] = user.String() + } + + if columnMap["pool_mode"] != nil { + poolChars := (*columnMap["pool_mode"]).([]uint8) + for i := 0; i < len(poolChars); i++ { + poolMode.WriteString(string(poolChars[i])) + } + tags["pool_mode"] = poolMode.String() + } + + var tagAddress string + tagAddress, err = p.SanitizedAddress() + if err != nil { + return err + } else { + tags["server"] = tagAddress + } + + fields := make(map[string]interface{}) + for col, val := range columnMap { + _, ignore := ignoredColumns[col] + if !ignore { + fields[col] = *val + } + } + acc.AddFields("pgbouncer_"+metric, fields, tags) + + return nil +} + +func (p *Pgbouncer) ignoreDatabase(db string) bool { + if len(p.Databases) == 0 { + return false + } + + for _, dbName := range p.Databases { + if db == dbName { + return false + } + } + return true +} + +func init() { + inputs.Add("pgbouncer", func() telegraf.Input { + return &Pgbouncer{} + }) +} diff --git a/plugins/inputs/pgbouncer/pgbouncer_test.go 
b/plugins/inputs/pgbouncer/pgbouncer_test.go new file mode 100644 index 000000000..d7d244633 --- /dev/null +++ b/plugins/inputs/pgbouncer/pgbouncer_test.go @@ -0,0 +1,180 @@ +package pgbouncer + +import ( + "fmt" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPgbouncerGeneratesMetrics(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + p := &Pgbouncer{ + Address: fmt.Sprintf("host=%s port=6432 user=postgres dbname=pgbouncer sslmode=disable", + testutil.GetLocalHost()), + Databases: []string{"pgbouncer"}, + } + + var acc testutil.Accumulator + err := p.Gather(&acc) + require.NoError(t, err) + + availableColumns := make(map[string]bool) + for _, col := range p.AllColumns { + availableColumns[col] = true + } + poolMetrics := []string{ + "cl_active", + "cl_waiting", + "maxwait", + "pool_mode", + "sv_active", + "sv_idle", + "sv_login", + "sv_tested", + "sv_used", + } + + statMetrics := []string{ + "avg_query", + "avg_recv", + "avg_req", + "avg_sent", + "total_query_time", + "total_received", + "total_requests", + "total_sent", + } + + metricsCounted := 0 + + for _, metric := range poolMetrics { + _, ok := availableColumns[metric] + if ok { + assert.True(t, acc.HasIntField("pgbouncer_pools", metric)) + metricsCounted++ + } + } + + for _, metric := range statMetrics { + _, ok := availableColumns[metric] + if ok { + assert.True(t, acc.HasIntField("pgbouncer_stats", metric)) + metricsCounted++ + } + } + + assert.True(t, metricsCounted > 0) + // assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted) +} + +func TestPgbouncerTagsMetricsWithDatabaseName(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + p := &Pgbouncer{ + Address: fmt.Sprintf("host=%s port=6432 user=postgres dbname=pgbouncer sslmode=disable", + testutil.GetLocalHost()), + Databases: 
[]string{"pgbouncer"}, + } + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + point, ok := acc.Get("pgbouncer_pools") + require.True(t, ok) + + assert.Equal(t, "pgbouncer", point.Tags["db"]) + + point, ok = acc.Get("pgbouncer_stats") + require.True(t, ok) + + assert.Equal(t, "pgbouncer", point.Tags["db"]) +} + +func TestPgbouncerTagsMetricsWithSpecifiedDatabaseName(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + p := &Pgbouncer{ + Address: fmt.Sprintf("host=%s port=6432 user=postgres dbname=pgbouncer sslmode=disable", + testutil.GetLocalHost()), + Databases: []string{"foo"}, + } + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + _, ok := acc.Get("pgbouncer_pools") + require.False(t, ok) + + _, ok = acc.Get("pgbouncer_stats") + require.False(t, ok) +} + +func TestPgbouncerDefaultsToAllDatabases(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + p := &Pgbouncer{ + Address: fmt.Sprintf("host=%s port=6432 user=postgres dbname=pgbouncer sslmode=disable", + testutil.GetLocalHost()), + } + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + var found bool + + for _, pnt := range acc.Metrics { + if pnt.Measurement == "pgbouncer_pools" { + if pnt.Tags["db"] == "pgbouncer" { + found = true + break + } + } + + if pnt.Measurement == "pgbouncer_stats" { + if pnt.Tags["db"] == "pgbouncer" { + found = true + break + } + } + } + + assert.True(t, found) +} + +func TestPgbouncerIgnoresUnwantedColumns(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + p := &Pgbouncer{ + Address: fmt.Sprintf("host=%s port=6432 user=postgres dbname=pgbouncer sslmode=disable", + testutil.GetLocalHost()), + } + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + for col := range p.IgnoredColumns() { + assert.False(t, 
acc.HasMeasurement(col)) + } +}