From b371ec5cf642df5c20fadf18673b4d45fe215b21 Mon Sep 17 00:00:00 2001 From: Thomas Menard Date: Mon, 14 Mar 2016 10:27:07 +0100 Subject: [PATCH] Add the postgresql_extensible plugin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This plugin is intended to add an extended support of Postgresql compared to the legacy postgres plugin. Basically, the plugin don’t have any metrics defined and it’s up to the user to define it in the telegraph.conf (as a toml structure). Each query can have it’s specific tags, and can be written specifically using a where clause in order to eventually filter per database name. To be more generic, a minimum postgresql version has been defined per query in case you have 2 different version of Postgresql running on the same host. --- plugins/inputs/all/all.go | 1 + .../inputs/postgresql_extensible/README.md | 59 ++++ .../postgresql_extensible.go | 271 ++++++++++++++++++ .../postgresql_extensible_test.go | 98 +++++++ 4 files changed, 429 insertions(+) create mode 100644 plugins/inputs/postgresql_extensible/README.md create mode 100644 plugins/inputs/postgresql_extensible/postgresql_extensible.go create mode 100644 plugins/inputs/postgresql_extensible/postgresql_extensible_test.go diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index a3300df66..db36cfbec 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -34,6 +34,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/phpfpm" _ "github.com/influxdata/telegraf/plugins/inputs/ping" _ "github.com/influxdata/telegraf/plugins/inputs/postgresql" + _ "github.com/influxdata/telegraf/plugins/inputs/postgresql_extensible" _ "github.com/influxdata/telegraf/plugins/inputs/powerdns" _ "github.com/influxdata/telegraf/plugins/inputs/procstat" _ "github.com/influxdata/telegraf/plugins/inputs/prometheus" diff --git a/plugins/inputs/postgresql_extensible/README.md b/plugins/inputs/postgresql_extensible/README.md new file mode 100644 index 000000000..f44c66596 --- /dev/null +++ b/plugins/inputs/postgresql_extensible/README.md @@ -0,0 +1,59 @@ +# PostgreSQL plugin + +This postgresql plugin provides metrics for your postgres database. It has been designed to parse ithe sql queries in the plugin section of your telegraf.conf. + +For now only two queries are specified and it's up to you to add more; some per query parameters have been added : + +* The SQl query itself +* The minimum version supported (here in numeric display visible in pg_settings) +* A boolean to define if the query have to be run against some specific variables (defined in the databaes variable of the plugin section) +* The list of the column that have to be defined has tags + +``` + # specify address via a url matching: + # postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full] + # or a simple string: + # host=localhost user=pqotest password=... sslmode=... dbname=app_production + # + # All connection parameters are optional. # + # Without the dbname parameter, the driver will default to a database + # with the same name as the user. This dbname is just for instantiating a + # connection with the server and doesn't restrict the databases we are trying + # to grab metrics for. + # + address = "host=localhost user=postgres sslmode=disable" + # A list of databases to pull metrics about. If not specified, metrics for all + # databases are gathered. + # databases = ["app_production", "testing"] + # + # Define the toml config where the sql queries are stored + # New queries can be added, if the withdbname is set to true and there is no databases defined + # in the 'databases field', the sql query is ended by a 'is not null' in order to make the query + # succeed. + # Be careful that the sqlquery must contain the where clause with a part of the filtering, the plugin will + # add a 'IN (dbname list)' clause if the withdbname is set to true + # Example : + # The sqlquery : "SELECT * FROM pg_stat_database where datname" become "SELECT * FROM pg_stat_database where datname IN ('postgres', 'pgbench')" + # because the databases variable was set to ['postgres', 'pgbench' ] and the withdbname was true. + # Be careful that if the withdbname is set to false you d'ont have to define the where clause (aka with the dbname) + # the tagvalue field is used to define custom tags (separated by comas) + # + # Structure : + # [[inputs.postgresql_extensible.query]] + # sqlquery string + # version string + # withdbname boolean + # tagvalue string (coma separated) + [[inputs.postgresql_extensible.query]] + sqlquery="SELECT * FROM pg_stat_database where datname" + version=901 + withdbname=false + tagvalue="" + [[inputs.postgresql_extensible.query]] + sqlquery="SELECT * FROM pg_stat_bgwriter" + version=901 + withdbname=false + tagvalue="" +``` + +The system can be easily extended using homemade metrics collection tools or using postgreql extensions ([pg_stat_statements](http://www.postgresql.org/docs/current/static/pgstatstatements.html), [pg_proctab](https://github.com/markwkm/pg_proctab), [powa](http://dalibo.github.io/powa/)...) diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible.go b/plugins/inputs/postgresql_extensible/postgresql_extensible.go new file mode 100644 index 000000000..44c452e0b --- /dev/null +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible.go @@ -0,0 +1,271 @@ +package postgresql_extensible + +import ( + "bytes" + "database/sql" + "fmt" + "regexp" + "strings" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + + "github.com/lib/pq" +) + +type Postgresql struct { + Address string + Databases []string + OrderedColumns []string + AllColumns []string + AdditionalTags []string + sanitizedAddress string + Query []struct { + Sqlquery string + Version int + Withdbname bool + Tagvalue string + } +} + +type query []struct { + Sqlquery string + Version int + Withdbname bool + Tagvalue string +} + +var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true} + +var sampleConfig = ` + # specify address via a url matching: + # postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full] + # or a simple string: + # host=localhost user=pqotest password=... sslmode=... dbname=app_production + # + # All connection parameters are optional. # + # Without the dbname parameter, the driver will default to a database + # with the same name as the user. This dbname is just for instantiating a + # connection with the server and doesn't restrict the databases we are trying + # to grab metrics for. + # + address = "host=localhost user=postgres sslmode=disable" + # A list of databases to pull metrics about. If not specified, metrics for all + # databases are gathered. + # databases = ["app_production", "testing"] + # + # Define the toml config where the sql queries are stored + # New queries can be added, if the withdbname is set to true and there is no databases defined + # in the 'databases field', the sql query is ended by a 'is not null' in order to make the query + # succeed. + # Example : + # The sqlquery : "SELECT * FROM pg_stat_database where datname" become "SELECT * FROM pg_stat_database where datname IN ('postgres', 'pgbench')" + # because the databases variable was set to ['postgres', 'pgbench' ] and the withdbname was true. + # Be careful that if the withdbname is set to false you d'ont have to define the where clause (aka with the dbname) + # the tagvalue field is used to define custom tags (separated by comas) + # + # Structure : + # [[inputs.postgresql_extensible.query]] + # sqlquery string + # version string + # withdbname boolean + # tagvalue string (coma separated) + [[inputs.postgresql_extensible.query]] + sqlquery="SELECT * FROM pg_stat_database" + version=901 + withdbname=false + tagvalue="" + [[inputs.postgresql_extensible.query]] + sqlquery="SELECT * FROM pg_stat_bgwriter" + version=901 + withdbname=false + tagvalue="" +` + +func (p *Postgresql) SampleConfig() string { + return sampleConfig +} + +func (p *Postgresql) Description() string { + return "Read metrics from one or many postgresql servers" +} + +func (p *Postgresql) IgnoredColumns() map[string]bool { + return ignoredColumns +} + +var localhost = "host=localhost sslmode=disable" + +func (p *Postgresql) Gather(acc telegraf.Accumulator) error { + + var sql_query string + var query_addon string + var db_version int + var query string + var tag_value string + + if p.Address == "" || p.Address == "localhost" { + p.Address = localhost + } + + db, err := sql.Open("postgres", p.Address) + if err != nil { + return err + } + + defer db.Close() + + // Retreiving the database version + + query = `select substring(setting from 1 for 3) as version from pg_settings where name='server_version_num'` + err = db.QueryRow(query).Scan(&db_version) + if err != nil { + return err + } + // We loop in order to process each query + // Query is not run if Database version does not match the query version. + + for i := range p.Query { + sql_query = p.Query[i].Sqlquery + tag_value = p.Query[i].Tagvalue + + if p.Query[i].Withdbname { + if len(p.Databases) != 0 { + query_addon = fmt.Sprintf(` IN ('%s')`, + strings.Join(p.Databases, "','")) + } else { + query_addon = " is not null" + } + } else { + query_addon = "" + } + sql_query += query_addon + + if p.Query[i].Version <= db_version { + rows, err := db.Query(sql_query) + if err != nil { + return err + } + + defer rows.Close() + + // grab the column information from the result + p.OrderedColumns, err = rows.Columns() + if err != nil { + return err + } else { + for _, v := range p.OrderedColumns { + p.AllColumns = append(p.AllColumns, v) + } + } + p.AdditionalTags = nil + if tag_value != "" { + tag_list := strings.Split(tag_value, ",") + for t := range tag_list { + p.AdditionalTags = append(p.AdditionalTags, tag_list[t]) + } + } + + for rows.Next() { + err = p.accRow(rows, acc) + if err != nil { + return err + } + } + } + } + return nil +} + +type scanner interface { + Scan(dest ...interface{}) error +} + +var passwordKVMatcher, _ = regexp.Compile("password=\\S+ ?") + +func (p *Postgresql) SanitizedAddress() (_ string, err error) { + var canonicalizedAddress string + if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") { + canonicalizedAddress, err = pq.ParseURL(p.Address) + if err != nil { + return p.sanitizedAddress, err + } + } else { + canonicalizedAddress = p.Address + } + p.sanitizedAddress = passwordKVMatcher.ReplaceAllString(canonicalizedAddress, "") + + return p.sanitizedAddress, err +} + +func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator) error { + var columnVars []interface{} + var dbname bytes.Buffer + + // this is where we'll store the column name with its *interface{} + columnMap := make(map[string]*interface{}) + + for _, column := range p.OrderedColumns { + columnMap[column] = new(interface{}) + } + + // populate the array of interface{} with the pointers in the right order + for i := 0; i < len(columnMap); i++ { + columnVars = append(columnVars, columnMap[p.OrderedColumns[i]]) + } + + // deconstruct array of variables and send to Scan + err := row.Scan(columnVars...) + + if err != nil { + return err + } + if columnMap["datname"] != nil { + // extract the database name from the column map + dbnameChars := (*columnMap["datname"]).([]uint8) + for i := 0; i < len(dbnameChars); i++ { + dbname.WriteString(string(dbnameChars[i])) + } + } else { + dbname.WriteString("postgres") + } + + var tagAddress string + tagAddress, err = p.SanitizedAddress() + if err != nil { + return err + } + + // Process the additional tags + + tags := map[string]string{} + tags["server"] = tagAddress + tags["db"] = dbname.String() + + fields := make(map[string]interface{}) + for col, val := range columnMap { + _, ignore := ignoredColumns[col] + //if !ignore && *val != "" { + if !ignore { + for tag := range p.AdditionalTags { + if col == p.AdditionalTags[tag] { + value_type_p := fmt.Sprintf(`%T`, *val) + if value_type_p == "[]uint8" { + tags[col] = fmt.Sprintf(`%s`, *val) + } else if value_type_p == "int64" { + tags[col] = fmt.Sprintf(`%v`, *val) + } + } + } + fields[col] = *val + } + } + acc.AddFields("postgresql", fields, tags) + return nil +} + +func init() { + inputs.Add("postgresql_extensible", func() telegraf.Input { + return &Postgresql{} + }) +} diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go new file mode 100644 index 000000000..7fd907102 --- /dev/null +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go @@ -0,0 +1,98 @@ +package postgresql_extensible + +import ( + "fmt" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPostgresqlGeneratesMetrics(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + p := &Postgresql{ + Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", + testutil.GetLocalHost()), + Databases: []string{"postgres"}, + Query: query{ + {Sqlquery: "select * from pg_stat_database", + Version: 901, + Withdbname: false, + Tagvalue: ""}, + }, + } + var acc testutil.Accumulator + err := p.Gather(&acc) + require.NoError(t, err) + + availableColumns := make(map[string]bool) + for _, col := range p.AllColumns { + availableColumns[col] = true + } + intMetrics := []string{ + "xact_commit", + "xact_rollback", + "blks_read", + "blks_hit", + "tup_returned", + "tup_fetched", + "tup_inserted", + "tup_updated", + "tup_deleted", + "conflicts", + "temp_files", + "temp_bytes", + "deadlocks", + "numbackends", + } + + floatMetrics := []string{ + "blk_read_time", + "blk_write_time", + } + + metricsCounted := 0 + + for _, metric := range intMetrics { + _, ok := availableColumns[metric] + if ok { + assert.True(t, acc.HasIntField("postgresql", metric)) + metricsCounted++ + } + } + + for _, metric := range floatMetrics { + _, ok := availableColumns[metric] + if ok { + assert.True(t, acc.HasFloatField("postgresql", metric)) + metricsCounted++ + } + } + + assert.True(t, metricsCounted > 0) + assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted) +} + +func TestPostgresqlIgnoresUnwantedColumns(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + p := &Postgresql{ + Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", + testutil.GetLocalHost()), + } + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + for col := range p.IgnoredColumns() { + assert.False(t, acc.HasMeasurement(col)) + } +}