Add the postgresql_extensible plugin

This plugin is intended to add extended support for PostgreSQL
compared to the legacy postgres plugin.

The plugin does not ship with any metrics defined; it is up to the
user to define them in telegraf.conf (as a TOML structure).

Each query can have its own tags, and can be written with a where
clause in order to filter per database name.

To be more generic, a minimum PostgreSQL version can be defined per
query, in case you have two different versions of PostgreSQL running
on the same host.
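For illustration, here is a minimal sketch of what such a telegraf.conf
section could look like, using the query keys introduced by this commit
(sqlquery, version, withdbname, tagvalue); the values shown are examples only:

```
[[inputs.postgresql_extensible]]
  address = "host=localhost user=postgres sslmode=disable"
  databases = ["postgres", "pgbench"]

  [[inputs.postgresql_extensible.query]]
    # The trailing "where datname" is completed by the plugin with
    # IN ('postgres','pgbench') because withdbname is true and databases is set.
    sqlquery="SELECT * FROM pg_stat_database where datname"
    # Only run against PostgreSQL 9.1 or later (server_version_num prefix 901).
    version=901
    withdbname=true
    tagvalue=""
```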
Thomas Menard 2016-03-14 10:27:07 +01:00 committed by Cameron Sparr
parent 18f4afb388
commit b371ec5cf6
4 changed files with 429 additions and 0 deletions


@@ -34,6 +34,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/inputs/phpfpm"
 	_ "github.com/influxdata/telegraf/plugins/inputs/ping"
 	_ "github.com/influxdata/telegraf/plugins/inputs/postgresql"
+	_ "github.com/influxdata/telegraf/plugins/inputs/postgresql_extensible"
 	_ "github.com/influxdata/telegraf/plugins/inputs/powerdns"
 	_ "github.com/influxdata/telegraf/plugins/inputs/procstat"
 	_ "github.com/influxdata/telegraf/plugins/inputs/prometheus"


@@ -0,0 +1,59 @@
# PostgreSQL plugin
This postgresql plugin provides metrics for your postgres database. It has been designed to parse the SQL queries in the plugin section of your telegraf.conf.
For now only two queries are specified, and it's up to you to add more; several per-query parameters are available:
* The SQL query itself
* The minimum supported version (in the numeric form visible in pg_settings)
* A boolean defining whether the query has to be run against specific databases (listed in the databases variable of the plugin section)
* The list of the columns that have to be defined as tags
```
# specify address via a url matching:
# postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]
# or a simple string:
# host=localhost user=pqotest password=... sslmode=... dbname=app_production
#
# All connection parameters are optional. #
# Without the dbname parameter, the driver will default to a database
# with the same name as the user. This dbname is just for instantiating a
# connection with the server and doesn't restrict the databases we are trying
# to grab metrics for.
#
address = "host=localhost user=postgres sslmode=disable"
# A list of databases to pull metrics about. If not specified, metrics for all
# databases are gathered.
# databases = ["app_production", "testing"]
#
# Define the toml config where the sql queries are stored
# New queries can be added. If withdbname is set to true and no databases are defined
# in the 'databases' field, the sql query is ended by an 'is not null' clause in order
# to make the query succeed.
# Be careful that the sqlquery must already contain the where clause with a part of the
# filtering; the plugin will add an 'IN (dbname list)' clause if withdbname is set to true.
# Example :
# The sqlquery : "SELECT * FROM pg_stat_database where datname" becomes "SELECT * FROM pg_stat_database where datname IN ('postgres', 'pgbench')"
# because the databases variable was set to ['postgres', 'pgbench' ] and withdbname was true.
# Be careful that if withdbname is set to false you don't have to define the where clause (aka with the dbname);
# the tagvalue field is used to define custom tags (separated by commas)
#
# Structure :
# [[inputs.postgresql_extensible.query]]
# sqlquery string
# version string
# withdbname boolean
# tagvalue string (comma separated)
[[inputs.postgresql_extensible.query]]
sqlquery="SELECT * FROM pg_stat_database where datname"
version=901
withdbname=false
tagvalue=""
[[inputs.postgresql_extensible.query]]
sqlquery="SELECT * FROM pg_stat_bgwriter"
version=901
withdbname=false
tagvalue=""
```
The system can be easily extended using homemade metrics collection tools or using postgresql extensions ([pg_stat_statements](http://www.postgresql.org/docs/current/static/pgstatstatements.html), [pg_proctab](https://github.com/markwkm/pg_proctab), [powa](http://dalibo.github.io/powa/)...)
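As a hypothetical illustration of such an extension (not part of this commit), the following query block would export per-database lock counts and use tagvalue to turn the lock mode column into a tag; the datname column is picked up automatically as the db tag:

```
[[inputs.postgresql_extensible.query]]
  sqlquery="SELECT d.datname, l.mode, count(*) AS locks FROM pg_locks l JOIN pg_database d ON d.oid = l.database GROUP BY d.datname, l.mode"
  version=901
  withdbname=false
  tagvalue="mode"
```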


@@ -0,0 +1,271 @@
package postgresql_extensible

import (
	"bytes"
	"database/sql"
	"fmt"
	"regexp"
	"strings"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/inputs"

	"github.com/lib/pq"
)

type Postgresql struct {
	Address          string
	Databases        []string
	OrderedColumns   []string
	AllColumns       []string
	AdditionalTags   []string
	sanitizedAddress string

	Query []struct {
		Sqlquery   string
		Version    int
		Withdbname bool
		Tagvalue   string
	}
}

type query []struct {
	Sqlquery   string
	Version    int
	Withdbname bool
	Tagvalue   string
}

var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true}
var sampleConfig = `
# specify address via a url matching:
# postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]
# or a simple string:
# host=localhost user=pqotest password=... sslmode=... dbname=app_production
#
# All connection parameters are optional. #
# Without the dbname parameter, the driver will default to a database
# with the same name as the user. This dbname is just for instantiating a
# connection with the server and doesn't restrict the databases we are trying
# to grab metrics for.
#
address = "host=localhost user=postgres sslmode=disable"
# A list of databases to pull metrics about. If not specified, metrics for all
# databases are gathered.
# databases = ["app_production", "testing"]
#
# Define the toml config where the sql queries are stored
# New queries can be added. If withdbname is set to true and no databases are defined
# in the 'databases' field, the sql query is ended by an 'is not null' clause in order
# to make the query succeed.
# Example :
# The sqlquery : "SELECT * FROM pg_stat_database where datname" becomes "SELECT * FROM pg_stat_database where datname IN ('postgres', 'pgbench')"
# because the databases variable was set to ['postgres', 'pgbench' ] and withdbname was true.
# Be careful that if withdbname is set to false you don't have to define the where clause (aka with the dbname);
# the tagvalue field is used to define custom tags (separated by commas)
#
# Structure :
# [[inputs.postgresql_extensible.query]]
#   sqlquery string
#   version string
#   withdbname boolean
#   tagvalue string (comma separated)
[[inputs.postgresql_extensible.query]]
sqlquery="SELECT * FROM pg_stat_database"
version=901
withdbname=false
tagvalue=""
[[inputs.postgresql_extensible.query]]
sqlquery="SELECT * FROM pg_stat_bgwriter"
version=901
withdbname=false
tagvalue=""
`
func (p *Postgresql) SampleConfig() string {
	return sampleConfig
}

func (p *Postgresql) Description() string {
	return "Read metrics from one or many postgresql servers"
}

func (p *Postgresql) IgnoredColumns() map[string]bool {
	return ignoredColumns
}
var localhost = "host=localhost sslmode=disable"

func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
	var sql_query string
	var query_addon string
	var db_version int
	var query string
	var tag_value string

	if p.Address == "" || p.Address == "localhost" {
		p.Address = localhost
	}

	db, err := sql.Open("postgres", p.Address)
	if err != nil {
		return err
	}
	defer db.Close()

	// Retrieving the database version (e.g. server_version_num 90105 yields 901)
	query = `select substring(setting from 1 for 3) as version from pg_settings where name='server_version_num'`
	err = db.QueryRow(query).Scan(&db_version)
	if err != nil {
		return err
	}

	// We loop in order to process each query.
	// A query is not run if the database version is lower than the query's minimum version.
	for i := range p.Query {
		sql_query = p.Query[i].Sqlquery
		tag_value = p.Query[i].Tagvalue

		if p.Query[i].Withdbname {
			if len(p.Databases) != 0 {
				query_addon = fmt.Sprintf(` IN ('%s')`,
					strings.Join(p.Databases, "','"))
			} else {
				query_addon = " is not null"
			}
		} else {
			query_addon = ""
		}
		sql_query += query_addon

		if p.Query[i].Version <= db_version {
			rows, err := db.Query(sql_query)
			if err != nil {
				return err
			}
			defer rows.Close()

			// grab the column information from the result
			p.OrderedColumns, err = rows.Columns()
			if err != nil {
				return err
			} else {
				for _, v := range p.OrderedColumns {
					p.AllColumns = append(p.AllColumns, v)
				}
			}

			p.AdditionalTags = nil
			if tag_value != "" {
				tag_list := strings.Split(tag_value, ",")
				for t := range tag_list {
					p.AdditionalTags = append(p.AdditionalTags, tag_list[t])
				}
			}

			for rows.Next() {
				err = p.accRow(rows, acc)
				if err != nil {
					return err
				}
			}
		}
	}
	return nil
}
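To make the withdbname handling above concrete, here is a small stand-alone sketch (not part of the plugin; the helper name withDBNameSuffix is made up) that reproduces the query suffixing performed in Gather:

```
package main

import (
	"fmt"
	"strings"
)

// withDBNameSuffix mimics the query_addon logic above: append an IN (...) list
// when databases are configured, otherwise an "is not null" clause.
func withDBNameSuffix(base string, databases []string) string {
	if len(databases) != 0 {
		return base + fmt.Sprintf(` IN ('%s')`, strings.Join(databases, "','"))
	}
	return base + " is not null"
}

func main() {
	fmt.Println(withDBNameSuffix("SELECT * FROM pg_stat_database where datname",
		[]string{"postgres", "pgbench"}))
	// Output: SELECT * FROM pg_stat_database where datname IN ('postgres','pgbench')
}
```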
type scanner interface {
	Scan(dest ...interface{}) error
}

var passwordKVMatcher, _ = regexp.Compile("password=\\S+ ?")

func (p *Postgresql) SanitizedAddress() (_ string, err error) {
	var canonicalizedAddress string
	if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") {
		canonicalizedAddress, err = pq.ParseURL(p.Address)
		if err != nil {
			return p.sanitizedAddress, err
		}
	} else {
		canonicalizedAddress = p.Address
	}
	p.sanitizedAddress = passwordKVMatcher.ReplaceAllString(canonicalizedAddress, "")

	return p.sanitizedAddress, err
}
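As an illustration of what the sanitization above produces, this stand-alone snippet applies the same regular expression to a made-up DSN:

```
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Same pattern as passwordKVMatcher: strip any "password=..." key/value pair.
	re := regexp.MustCompile(`password=\S+ ?`)
	addr := "host=localhost user=postgres password=s3cret sslmode=disable" // hypothetical DSN
	fmt.Println(re.ReplaceAllString(addr, ""))
	// Output: host=localhost user=postgres sslmode=disable
}
```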
func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator) error {
	var columnVars []interface{}
	var dbname bytes.Buffer

	// this is where we'll store the column name with its *interface{}
	columnMap := make(map[string]*interface{})

	for _, column := range p.OrderedColumns {
		columnMap[column] = new(interface{})
	}

	// populate the array of interface{} with the pointers in the right order
	for i := 0; i < len(columnMap); i++ {
		columnVars = append(columnVars, columnMap[p.OrderedColumns[i]])
	}

	// deconstruct array of variables and send to Scan
	err := row.Scan(columnVars...)
	if err != nil {
		return err
	}

	if columnMap["datname"] != nil {
		// extract the database name from the column map
		dbnameChars := (*columnMap["datname"]).([]uint8)
		for i := 0; i < len(dbnameChars); i++ {
			dbname.WriteString(string(dbnameChars[i]))
		}
	} else {
		dbname.WriteString("postgres")
	}

	var tagAddress string
	tagAddress, err = p.SanitizedAddress()
	if err != nil {
		return err
	}

	// Process the additional tags
	tags := map[string]string{}
	tags["server"] = tagAddress
	tags["db"] = dbname.String()

	fields := make(map[string]interface{})
	for col, val := range columnMap {
		_, ignore := ignoredColumns[col]
		//if !ignore && *val != "" {
		if !ignore {
			for tag := range p.AdditionalTags {
				if col == p.AdditionalTags[tag] {
					value_type_p := fmt.Sprintf(`%T`, *val)
					if value_type_p == "[]uint8" {
						tags[col] = fmt.Sprintf(`%s`, *val)
					} else if value_type_p == "int64" {
						tags[col] = fmt.Sprintf(`%v`, *val)
					}
				}
			}
			fields[col] = *val
		}
	}
	acc.AddFields("postgresql", fields, tags)
	return nil
}
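The tag handling in accRow relies on database/sql (with lib/pq) typically returning text columns as []uint8 and integer columns as int64; the stand-alone sketch below, using made-up values, shows the same %T-based formatting used to turn such values into tag strings:

```
package main

import "fmt"

func main() {
	// Hypothetical column values as they might come back from a row scan.
	vals := []interface{}{[]uint8("postgres"), int64(42)}
	for _, v := range vals {
		switch fmt.Sprintf("%T", v) {
		case "[]uint8":
			fmt.Println(fmt.Sprintf("%s", v)) // text column -> "postgres"
		case "int64":
			fmt.Println(fmt.Sprintf("%v", v)) // integer column -> "42"
		}
	}
}
```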
func init() {
	inputs.Add("postgresql_extensible", func() telegraf.Input {
		return &Postgresql{}
	})
}


@@ -0,0 +1,98 @@
package postgresql_extensible

import (
	"fmt"
	"testing"

	"github.com/influxdata/telegraf/testutil"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)
func TestPostgresqlGeneratesMetrics(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	p := &Postgresql{
		Address: fmt.Sprintf("host=%s user=postgres sslmode=disable",
			testutil.GetLocalHost()),
		Databases: []string{"postgres"},
		Query: query{
			{Sqlquery: "select * from pg_stat_database",
				Version:    901,
				Withdbname: false,
				Tagvalue:   ""},
		},
	}

	var acc testutil.Accumulator

	err := p.Gather(&acc)
	require.NoError(t, err)

	availableColumns := make(map[string]bool)
	for _, col := range p.AllColumns {
		availableColumns[col] = true
	}

	intMetrics := []string{
		"xact_commit",
		"xact_rollback",
		"blks_read",
		"blks_hit",
		"tup_returned",
		"tup_fetched",
		"tup_inserted",
		"tup_updated",
		"tup_deleted",
		"conflicts",
		"temp_files",
		"temp_bytes",
		"deadlocks",
		"numbackends",
	}

	floatMetrics := []string{
		"blk_read_time",
		"blk_write_time",
	}

	metricsCounted := 0

	for _, metric := range intMetrics {
		_, ok := availableColumns[metric]
		if ok {
			assert.True(t, acc.HasIntField("postgresql", metric))
			metricsCounted++
		}
	}

	for _, metric := range floatMetrics {
		_, ok := availableColumns[metric]
		if ok {
			assert.True(t, acc.HasFloatField("postgresql", metric))
			metricsCounted++
		}
	}

	assert.True(t, metricsCounted > 0)
	assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted)
}
func TestPostgresqlIgnoresUnwantedColumns(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	p := &Postgresql{
		Address: fmt.Sprintf("host=%s user=postgres sslmode=disable",
			testutil.GetLocalHost()),
	}

	var acc testutil.Accumulator

	err := p.Gather(&acc)
	require.NoError(t, err)

	for col := range p.IgnoredColumns() {
		assert.False(t, acc.HasMeasurement(col))
	}
}