Add postgresql_extensible plugin

This commit is contained in:
Thomas Menard 2016-03-04 15:22:26 +01:00
parent b4b1866286
commit 3ef77715b5
4 changed files with 458 additions and 0 deletions

View File

@ -33,6 +33,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/phpfpm" _ "github.com/influxdata/telegraf/plugins/inputs/phpfpm"
_ "github.com/influxdata/telegraf/plugins/inputs/ping" _ "github.com/influxdata/telegraf/plugins/inputs/ping"
_ "github.com/influxdata/telegraf/plugins/inputs/postgresql" _ "github.com/influxdata/telegraf/plugins/inputs/postgresql"
_ "github.com/influxdata/telegraf/plugins/inputs/postgresql_extensible"
_ "github.com/influxdata/telegraf/plugins/inputs/powerdns" _ "github.com/influxdata/telegraf/plugins/inputs/powerdns"
_ "github.com/influxdata/telegraf/plugins/inputs/procstat" _ "github.com/influxdata/telegraf/plugins/inputs/procstat"
_ "github.com/influxdata/telegraf/plugins/inputs/prometheus" _ "github.com/influxdata/telegraf/plugins/inputs/prometheus"

View File

@ -0,0 +1,71 @@
# PostgreSQL plugin
This postgresql plugin provides metrics for your postgres database. It has been designed to parse a sql query json file with some parameters.
For now the plugin only support one postgresql instance, the plan is to be able to extend easily your postgres monitoring.
View to create :
-- View: public.sessions
DROP VIEW public.sessions;
CREATE OR REPLACE VIEW public.sessions AS
WITH proctab AS (
SELECT pg_proctab.pid,
CASE
WHEN pg_proctab.state::text = 'R'::bpchar THEN 'running'::text
WHEN pg_proctab.state::text = 'D'::bpchar THEN 'sleep-io'::text
WHEN pg_proctab.state::text = 'S'::bpchar THEN 'sleep-waiting'::text
WHEN pg_proctab.state::text = 'Z'::bpchar THEN 'zombie'::text
WHEN pg_proctab.state::text = 'T'::bpchar THEN 'stopped'::text
ELSE NULL::text
END AS proc_state,
pg_proctab.ppid,
pg_proctab.utime,
pg_proctab.stime,
pg_proctab.vsize,
pg_proctab.rss,
pg_proctab.processor,
pg_proctab.rchar,
pg_proctab.wchar,
pg_proctab.syscr,
pg_proctab.syscw,
pg_proctab.reads,
pg_proctab.writes,
pg_proctab.cwrites
FROM pg_proctab() pg_proctab
), stat_activity AS (
SELECT pg_stat_activity.datname,
pg_stat_activity.pid,
pg_stat_activity.usename,
CASE
WHEN pg_stat_activity.query IS NULL THEN 'no query'::text
ELSE regexp_replace(pg_stat_activity.query, '[\n\r]+'::text, ' '::text, 'g'::text)
END AS query
FROM pg_stat_activity
)
SELECT ('"'::text || stat.datname::text) || '"'::text AS db,
('"'::text || stat.usename::text) || '"'::text as username,
stat.pid AS pid,
('"'::text || proc.proc_state ::text) || '"'::text AS state,
('"'::text || stat.query::text) || '"'::text AS query,
proc.utime AS session_usertime,
proc.stime AS session_systemtime,
proc.vsize AS session_virtual_memory_size,
proc.rss AS session_resident_memory_size,
proc.processor AS session_processor_number,
proc.rchar AS session_bytes_read,
proc.wchar AS session_bytes_written,
proc.syscr AS session_read_io,
proc.syscw AS session_write_io,
proc.reads AS session_physical_reads,
proc.writes AS session_physical_writes,
proc.cwrites AS session_cancel_writes
FROM proctab proc,
stat_activity stat
WHERE proc.pid = stat.pid;
ALTER TABLE public.sessions
OWNER TO postgres;

View File

@ -0,0 +1,234 @@
package postgresql_extensible
import (
"bytes"
"database/sql"
"fmt"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/lib/pq"
)
type Postgresql struct {
Address string
Databases []string
OrderedColumns []string
AllColumns []string
AdditionalTags []string
Query []struct {
Sqlquery string
Version int
Withdbname string
Tagvalue string
}
}
var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true}
var sampleConfig = `
# specify address via a url matching:
# postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]
# or a simple string:
# host=localhost user=pqotest password=... sslmode=... dbname=app_production
#
# All connection parameters are optional. #
# Without the dbname parameter, the driver will default to a database
# with the same name as the user. This dbname is just for instantiating a
# connection with the server and doesn't restrict the databases we are trying
# to grab metrics for.
#
address = "host=localhost user=postgres sslmode=disable"
# A list of databases to pull metrics about. If not specified, metrics for all
# databases are gathered.
# databases = ["app_production", "testing"]
#
# Define the toml config where the sql queries are stored
# New queries can be added, if the withdbname is set to true and there is no databases defined
# in the 'databases field', the sql query is ended by a 'is not null' in order to make the query
# succeed.
# the tagvalue field is used to define custom tags (separated by comas)
#
[[inputs.postgresql_extensible.query]]
sqlquery="SELECT * FROM pg_stat_database where datname"
version=901
withdbname="true"
tagvalue=""
[[inputs.postgresql_extensible.query]]
sqlquery="SELECT * FROM pg_stat_bgwriter"
version=901
withdbname="false"
tagvalue=""
`
func (p *Postgresql) SampleConfig() string {
return sampleConfig
}
func (p *Postgresql) Description() string {
return "Read metrics from one or many postgresql servers"
}
func (p *Postgresql) IgnoredColumns() map[string]bool {
return ignoredColumns
}
var localhost = "host=localhost sslmode=disable"
func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
var sql_query string
var query_version int
var query_with_dbname string
var query_addon string
var db_version int
var query string
var tag_value string
if p.Address == "" || p.Address == "localhost" {
p.Address = localhost
}
db, err := sql.Open("postgres", p.Address)
if err != nil {
return err
}
defer db.Close()
// Retreiving the database version
query = `select substring(setting from 1 for 3) as version from pg_settings where name='server_version_num'`
err = db.QueryRow(query).Scan(&db_version)
if err != nil {
return err
}
// We loop in order to process each query
// Query is not run if Database version does not match the query version.
for i := range p.Query {
sql_query = p.Query[i].Sqlquery
query_version = p.Query[i].Version
query_with_dbname = p.Query[i].Withdbname
tag_value = p.Query[i].Tagvalue
if query_with_dbname == "true" {
if len(p.Databases) != 0 {
query_addon = fmt.Sprintf(` IN ('%s')`,
strings.Join(p.Databases, "','"))
} else {
query_addon = " is not null"
}
} else {
query_addon = ""
}
sql_query += query_addon
if query_version <= db_version {
rows, err := db.Query(sql_query)
if err != nil {
return err
}
defer rows.Close()
// grab the column information from the result
p.OrderedColumns, err = rows.Columns()
if err != nil {
return err
} else {
for _, v := range p.OrderedColumns {
p.AllColumns = append(p.AllColumns, v)
}
}
p.AdditionalTags = nil
if tag_value != "" {
tag_list := strings.Split(tag_value, ",")
for t := range tag_list {
p.AdditionalTags = append(p.AdditionalTags, tag_list[t])
}
}
for rows.Next() {
err = p.accRow(rows, acc)
if err != nil {
return err
}
}
}
}
return nil
}
type scanner interface {
Scan(dest ...interface{}) error
}
func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator) error {
var columnVars []interface{}
var dbname bytes.Buffer
// this is where we'll store the column name with its *interface{}
columnMap := make(map[string]*interface{})
for _, column := range p.OrderedColumns {
columnMap[column] = new(interface{})
}
// populate the array of interface{} with the pointers in the right order
for i := 0; i < len(columnMap); i++ {
columnVars = append(columnVars, columnMap[p.OrderedColumns[i]])
}
// deconstruct array of variables and send to Scan
err := row.Scan(columnVars...)
if err != nil {
return err
}
if columnMap["datname"] != nil {
// extract the database name from the column map
dbnameChars := (*columnMap["datname"]).([]uint8)
for i := 0; i < len(dbnameChars); i++ {
dbname.WriteString(string(dbnameChars[i]))
}
} else {
dbname.WriteString("postgres")
}
// Process the additional tags
tags := map[string]string{}
tags["server"] = p.Address
tags["db"] = dbname.String()
fields := make(map[string]interface{})
for col, val := range columnMap {
_, ignore := ignoredColumns[col]
//if !ignore && *val != "" {
if !ignore {
for tag := range p.AdditionalTags {
if col == p.AdditionalTags[tag] {
value_type_p := fmt.Sprintf(`%T`, *val)
if value_type_p == "[]uint8" {
tags[col] = fmt.Sprintf(`%s`, *val)
} else if value_type_p == "int64" {
tags[col] = fmt.Sprintf(`%v`, *val)
}
}
}
fields[col] = *val
}
}
acc.AddFields("postgresql", fields, tags)
return nil
}
func init() {
inputs.Add("postgresql_extensible", func() telegraf.Input {
return &Postgresql{}
})
}

View File

@ -0,0 +1,152 @@
package postgresql
import (
"fmt"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPostgresqlGeneratesMetrics(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p := &Postgresql{
Address: fmt.Sprintf("host=%s user=postgres sslmode=disable",
testutil.GetLocalHost()),
Databases: []string{"postgres"},
}
var acc testutil.Accumulator
err := p.Gather(&acc)
require.NoError(t, err)
fmt.Printf(p.Databases)
availableColumns := make(map[string]bool)
for _, col := range p.AllColumns {
availableColumns[col] = true
}
intMetrics := []string{
"xact_commit",
"xact_rollback",
"blks_read",
"blks_hit",
"tup_returned",
"tup_fetched",
"tup_inserted",
"tup_updated",
"tup_deleted",
"conflicts",
"temp_files",
"temp_bytes",
"deadlocks",
"numbackends",
"buffers_alloc",
"buffers_backend",
"buffers_backend_fsync",
"buffers_checkpoint",
"buffers_clean",
"checkpoints_req",
"checkpoints_timed",
"maxwritten_clean",
}
floatMetrics := []string{
"blk_read_time",
"blk_write_time",
}
metricsCounted := 0
for _, metric := range intMetrics {
_, ok := availableColumns[metric]
if ok {
assert.True(t, acc.HasIntField("postgresql", metric))
metricsCounted++
}
}
for _, metric := range floatMetrics {
_, ok := availableColumns[metric]
if ok {
assert.True(t, acc.HasFloatField("postgresql", metric))
metricsCounted++
}
}
assert.True(t, metricsCounted > 0)
//assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted)
}
func TestPostgresqlTagsMetricsWithDatabaseName(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p := &Postgresql{
Address: fmt.Sprintf("host=%s user=postgres sslmode=disable",
testutil.GetLocalHost()),
Databases: []string{"postgres"},
}
var acc testutil.Accumulator
err := p.Gather(&acc)
require.NoError(t, err)
point, ok := acc.Get("postgresql")
require.True(t, ok)
assert.Equal(t, "postgres", point.Tags["db"])
}
func TestPostgresqlDefaultsToAllDatabases(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p := &Postgresql{
Address: fmt.Sprintf("host=%s user=postgres sslmode=disable",
testutil.GetLocalHost()),
}
var acc testutil.Accumulator
err := p.Gather(&acc)
require.NoError(t, err)
var found bool
for _, pnt := range acc.Metrics {
if pnt.Measurement == "postgresql" {
if pnt.Tags["db"] == "postgres" {
found = true
break
}
}
}
assert.True(t, found)
}
func TestPostgresqlIgnoresUnwantedColumns(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p := &Postgresql{
Address: fmt.Sprintf("host=%s user=postgres sslmode=disable",
testutil.GetLocalHost()),
}
var acc testutil.Accumulator
err := p.Gather(&acc)
require.NoError(t, err)
for col := range p.IgnoredColumns() {
assert.False(t, acc.HasMeasurement(col))
}
}