From 3ef77715b595790512ed91dc07b72705c5eac98c Mon Sep 17 00:00:00 2001 From: Thomas Menard Date: Fri, 4 Mar 2016 15:22:26 +0100 Subject: [PATCH] Add postgresql_extensible plugin --- plugins/inputs/all/all.go | 1 + .../inputs/postgresql_extensible/README.md | 71 ++++++ .../postgresql_extensible.go | 234 ++++++++++++++++++ .../postgresql_extensible_test.go | 152 ++++++++++++ 4 files changed, 458 insertions(+) create mode 100644 plugins/inputs/postgresql_extensible/README.md create mode 100644 plugins/inputs/postgresql_extensible/postgresql_extensible.go create mode 100644 plugins/inputs/postgresql_extensible/postgresql_extensible_test.go diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 262de37ac..d647872e6 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -33,6 +33,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/phpfpm" _ "github.com/influxdata/telegraf/plugins/inputs/ping" _ "github.com/influxdata/telegraf/plugins/inputs/postgresql" + _ "github.com/influxdata/telegraf/plugins/inputs/postgresql_extensible" _ "github.com/influxdata/telegraf/plugins/inputs/powerdns" _ "github.com/influxdata/telegraf/plugins/inputs/procstat" _ "github.com/influxdata/telegraf/plugins/inputs/prometheus" diff --git a/plugins/inputs/postgresql_extensible/README.md b/plugins/inputs/postgresql_extensible/README.md new file mode 100644 index 000000000..e591877d0 --- /dev/null +++ b/plugins/inputs/postgresql_extensible/README.md @@ -0,0 +1,71 @@ +# PostgreSQL plugin + +This postgresql plugin provides metrics for your postgres database. It has been designed to parse a sql query json file with some parameters. + +For now the plugin only support one postgresql instance, the plan is to be able to extend easily your postgres monitoring. + + + +View to create : +-- View: public.sessions + +DROP VIEW public.sessions; + +CREATE OR REPLACE VIEW public.sessions AS + WITH proctab AS ( + SELECT pg_proctab.pid, + CASE + WHEN pg_proctab.state::text = 'R'::bpchar THEN 'running'::text + WHEN pg_proctab.state::text = 'D'::bpchar THEN 'sleep-io'::text + WHEN pg_proctab.state::text = 'S'::bpchar THEN 'sleep-waiting'::text + WHEN pg_proctab.state::text = 'Z'::bpchar THEN 'zombie'::text + WHEN pg_proctab.state::text = 'T'::bpchar THEN 'stopped'::text + ELSE NULL::text + END AS proc_state, + pg_proctab.ppid, + pg_proctab.utime, + pg_proctab.stime, + pg_proctab.vsize, + pg_proctab.rss, + pg_proctab.processor, + pg_proctab.rchar, + pg_proctab.wchar, + pg_proctab.syscr, + pg_proctab.syscw, + pg_proctab.reads, + pg_proctab.writes, + pg_proctab.cwrites + FROM pg_proctab() pg_proctab + ), stat_activity AS ( + SELECT pg_stat_activity.datname, + pg_stat_activity.pid, + pg_stat_activity.usename, + CASE + WHEN pg_stat_activity.query IS NULL THEN 'no query'::text + ELSE regexp_replace(pg_stat_activity.query, '[\n\r]+'::text, ' '::text, 'g'::text) + END AS query + FROM pg_stat_activity + ) + SELECT ('"'::text || stat.datname::text) || '"'::text AS db, + ('"'::text || stat.usename::text) || '"'::text as username, + stat.pid AS pid, + ('"'::text || proc.proc_state ::text) || '"'::text AS state, + ('"'::text || stat.query::text) || '"'::text AS query, + proc.utime AS session_usertime, + proc.stime AS session_systemtime, + proc.vsize AS session_virtual_memory_size, + proc.rss AS session_resident_memory_size, + proc.processor AS session_processor_number, + proc.rchar AS session_bytes_read, + proc.wchar AS session_bytes_written, + proc.syscr AS session_read_io, + proc.syscw AS session_write_io, + proc.reads AS session_physical_reads, + proc.writes AS session_physical_writes, + proc.cwrites AS session_cancel_writes + FROM proctab proc, + stat_activity stat + WHERE proc.pid = stat.pid; + +ALTER TABLE public.sessions + OWNER TO postgres; diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible.go b/plugins/inputs/postgresql_extensible/postgresql_extensible.go new file mode 100644 index 000000000..bc30329cd --- /dev/null +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible.go @@ -0,0 +1,234 @@ +package postgresql_extensible + +import ( + "bytes" + "database/sql" + "fmt" + "strings" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + + _ "github.com/lib/pq" +) + +type Postgresql struct { + Address string + Databases []string + OrderedColumns []string + AllColumns []string + AdditionalTags []string + Query []struct { + Sqlquery string + Version int + Withdbname string + Tagvalue string + } +} + +var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true} + +var sampleConfig = ` + # specify address via a url matching: + # postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full] + # or a simple string: + # host=localhost user=pqotest password=... sslmode=... dbname=app_production + # + # All connection parameters are optional. # + # Without the dbname parameter, the driver will default to a database + # with the same name as the user. This dbname is just for instantiating a + # connection with the server and doesn't restrict the databases we are trying + # to grab metrics for. + # + address = "host=localhost user=postgres sslmode=disable" + # A list of databases to pull metrics about. If not specified, metrics for all + # databases are gathered. + # databases = ["app_production", "testing"] + # + # Define the toml config where the sql queries are stored + # New queries can be added, if the withdbname is set to true and there is no databases defined + # in the 'databases field', the sql query is ended by a 'is not null' in order to make the query + # succeed. + # the tagvalue field is used to define custom tags (separated by comas) + # + [[inputs.postgresql_extensible.query]] + sqlquery="SELECT * FROM pg_stat_database where datname" + version=901 + withdbname="true" + tagvalue="" + [[inputs.postgresql_extensible.query]] + sqlquery="SELECT * FROM pg_stat_bgwriter" + version=901 + withdbname="false" + tagvalue="" +` + +func (p *Postgresql) SampleConfig() string { + return sampleConfig +} + +func (p *Postgresql) Description() string { + return "Read metrics from one or many postgresql servers" +} + +func (p *Postgresql) IgnoredColumns() map[string]bool { + return ignoredColumns +} + +var localhost = "host=localhost sslmode=disable" + +func (p *Postgresql) Gather(acc telegraf.Accumulator) error { + + var sql_query string + var query_version int + var query_with_dbname string + var query_addon string + var db_version int + var query string + var tag_value string + + if p.Address == "" || p.Address == "localhost" { + p.Address = localhost + } + + db, err := sql.Open("postgres", p.Address) + if err != nil { + return err + } + + defer db.Close() + + // Retreiving the database version + + query = `select substring(setting from 1 for 3) as version from pg_settings where name='server_version_num'` + err = db.QueryRow(query).Scan(&db_version) + if err != nil { + return err + } + + // We loop in order to process each query + // Query is not run if Database version does not match the query version. + + for i := range p.Query { + sql_query = p.Query[i].Sqlquery + query_version = p.Query[i].Version + query_with_dbname = p.Query[i].Withdbname + tag_value = p.Query[i].Tagvalue + + if query_with_dbname == "true" { + if len(p.Databases) != 0 { + query_addon = fmt.Sprintf(` IN ('%s')`, + strings.Join(p.Databases, "','")) + } else { + query_addon = " is not null" + } + } else { + query_addon = "" + } + sql_query += query_addon + + if query_version <= db_version { + rows, err := db.Query(sql_query) + if err != nil { + return err + } + + defer rows.Close() + + // grab the column information from the result + p.OrderedColumns, err = rows.Columns() + if err != nil { + return err + } else { + for _, v := range p.OrderedColumns { + p.AllColumns = append(p.AllColumns, v) + } + } + p.AdditionalTags = nil + if tag_value != "" { + tag_list := strings.Split(tag_value, ",") + for t := range tag_list { + p.AdditionalTags = append(p.AdditionalTags, tag_list[t]) + } + } + + for rows.Next() { + err = p.accRow(rows, acc) + if err != nil { + return err + } + } + } + } + return nil +} + +type scanner interface { + Scan(dest ...interface{}) error +} + +func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator) error { + var columnVars []interface{} + var dbname bytes.Buffer + + // this is where we'll store the column name with its *interface{} + columnMap := make(map[string]*interface{}) + + for _, column := range p.OrderedColumns { + columnMap[column] = new(interface{}) + } + + // populate the array of interface{} with the pointers in the right order + for i := 0; i < len(columnMap); i++ { + columnVars = append(columnVars, columnMap[p.OrderedColumns[i]]) + } + + // deconstruct array of variables and send to Scan + err := row.Scan(columnVars...) + + if err != nil { + return err + } + if columnMap["datname"] != nil { + // extract the database name from the column map + dbnameChars := (*columnMap["datname"]).([]uint8) + for i := 0; i < len(dbnameChars); i++ { + dbname.WriteString(string(dbnameChars[i])) + } + } else { + dbname.WriteString("postgres") + } + + // Process the additional tags + + tags := map[string]string{} + tags["server"] = p.Address + tags["db"] = dbname.String() + + fields := make(map[string]interface{}) + for col, val := range columnMap { + _, ignore := ignoredColumns[col] + //if !ignore && *val != "" { + if !ignore { + for tag := range p.AdditionalTags { + if col == p.AdditionalTags[tag] { + value_type_p := fmt.Sprintf(`%T`, *val) + if value_type_p == "[]uint8" { + tags[col] = fmt.Sprintf(`%s`, *val) + } else if value_type_p == "int64" { + tags[col] = fmt.Sprintf(`%v`, *val) + } + } + } + fields[col] = *val + } + } + acc.AddFields("postgresql", fields, tags) + return nil +} + +func init() { + inputs.Add("postgresql_extensible", func() telegraf.Input { + return &Postgresql{} + }) +} diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go new file mode 100644 index 000000000..1ec4d7bea --- /dev/null +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go @@ -0,0 +1,152 @@ +package postgresql + +import ( + "fmt" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPostgresqlGeneratesMetrics(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + p := &Postgresql{ + Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", + testutil.GetLocalHost()), + Databases: []string{"postgres"}, + } + + var acc testutil.Accumulator + err := p.Gather(&acc) + require.NoError(t, err) + fmt.Printf(p.Databases) + availableColumns := make(map[string]bool) + for _, col := range p.AllColumns { + availableColumns[col] = true + } + intMetrics := []string{ + "xact_commit", + "xact_rollback", + "blks_read", + "blks_hit", + "tup_returned", + "tup_fetched", + "tup_inserted", + "tup_updated", + "tup_deleted", + "conflicts", + "temp_files", + "temp_bytes", + "deadlocks", + "numbackends", + "buffers_alloc", + "buffers_backend", + "buffers_backend_fsync", + "buffers_checkpoint", + "buffers_clean", + "checkpoints_req", + "checkpoints_timed", + "maxwritten_clean", + } + + floatMetrics := []string{ + "blk_read_time", + "blk_write_time", + } + + metricsCounted := 0 + + for _, metric := range intMetrics { + _, ok := availableColumns[metric] + if ok { + assert.True(t, acc.HasIntField("postgresql", metric)) + metricsCounted++ + } + } + + for _, metric := range floatMetrics { + _, ok := availableColumns[metric] + if ok { + assert.True(t, acc.HasFloatField("postgresql", metric)) + metricsCounted++ + } + } + + assert.True(t, metricsCounted > 0) + //assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted) +} + +func TestPostgresqlTagsMetricsWithDatabaseName(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + p := &Postgresql{ + Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", + testutil.GetLocalHost()), + Databases: []string{"postgres"}, + } + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + point, ok := acc.Get("postgresql") + require.True(t, ok) + + assert.Equal(t, "postgres", point.Tags["db"]) +} + +func TestPostgresqlDefaultsToAllDatabases(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + p := &Postgresql{ + Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", + testutil.GetLocalHost()), + } + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + var found bool + + for _, pnt := range acc.Metrics { + if pnt.Measurement == "postgresql" { + if pnt.Tags["db"] == "postgres" { + found = true + break + } + } + } + + assert.True(t, found) +} + +func TestPostgresqlIgnoresUnwantedColumns(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + p := &Postgresql{ + Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", + testutil.GetLocalHost()), + } + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + for col := range p.IgnoredColumns() { + assert.False(t, acc.HasMeasurement(col)) + } +}