diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 2808ce2b5..243d815d1 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -33,6 +33,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/phpfpm" _ "github.com/influxdata/telegraf/plugins/inputs/ping" _ "github.com/influxdata/telegraf/plugins/inputs/postgresql" + _ "github.com/influxdata/telegraf/plugins/inputs/postgresql_extensible" _ "github.com/influxdata/telegraf/plugins/inputs/powerdns" _ "github.com/influxdata/telegraf/plugins/inputs/procstat" _ "github.com/influxdata/telegraf/plugins/inputs/prometheus" diff --git a/plugins/inputs/postgresql_extensible/README.md b/plugins/inputs/postgresql_extensible/README.md new file mode 100644 index 000000000..f44c66596 --- /dev/null +++ b/plugins/inputs/postgresql_extensible/README.md @@ -0,0 +1,59 @@ +# PostgreSQL plugin + +This postgresql plugin provides metrics for your postgres database. It has been designed to parse ithe sql queries in the plugin section of your telegraf.conf. + +For now only two queries are specified and it's up to you to add more; some per query parameters have been added : + +* The SQl query itself +* The minimum version supported (here in numeric display visible in pg_settings) +* A boolean to define if the query have to be run against some specific variables (defined in the databaes variable of the plugin section) +* The list of the column that have to be defined has tags + +``` + # specify address via a url matching: + # postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full] + # or a simple string: + # host=localhost user=pqotest password=... sslmode=... dbname=app_production + # + # All connection parameters are optional. # + # Without the dbname parameter, the driver will default to a database + # with the same name as the user. This dbname is just for instantiating a + # connection with the server and doesn't restrict the databases we are trying + # to grab metrics for. + # + address = "host=localhost user=postgres sslmode=disable" + # A list of databases to pull metrics about. If not specified, metrics for all + # databases are gathered. + # databases = ["app_production", "testing"] + # + # Define the toml config where the sql queries are stored + # New queries can be added, if the withdbname is set to true and there is no databases defined + # in the 'databases field', the sql query is ended by a 'is not null' in order to make the query + # succeed. + # Be careful that the sqlquery must contain the where clause with a part of the filtering, the plugin will + # add a 'IN (dbname list)' clause if the withdbname is set to true + # Example : + # The sqlquery : "SELECT * FROM pg_stat_database where datname" become "SELECT * FROM pg_stat_database where datname IN ('postgres', 'pgbench')" + # because the databases variable was set to ['postgres', 'pgbench' ] and the withdbname was true. + # Be careful that if the withdbname is set to false you d'ont have to define the where clause (aka with the dbname) + # the tagvalue field is used to define custom tags (separated by comas) + # + # Structure : + # [[inputs.postgresql_extensible.query]] + # sqlquery string + # version string + # withdbname boolean + # tagvalue string (coma separated) + [[inputs.postgresql_extensible.query]] + sqlquery="SELECT * FROM pg_stat_database where datname" + version=901 + withdbname=false + tagvalue="" + [[inputs.postgresql_extensible.query]] + sqlquery="SELECT * FROM pg_stat_bgwriter" + version=901 + withdbname=false + tagvalue="" +``` + +The system can be easily extended using homemade metrics collection tools or using postgreql extensions ([pg_stat_statements](http://www.postgresql.org/docs/current/static/pgstatstatements.html), [pg_proctab](https://github.com/markwkm/pg_proctab), [powa](http://dalibo.github.io/powa/)...) diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible.go b/plugins/inputs/postgresql_extensible/postgresql_extensible.go new file mode 100644 index 000000000..44c452e0b --- /dev/null +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible.go @@ -0,0 +1,271 @@ +package postgresql_extensible + +import ( + "bytes" + "database/sql" + "fmt" + "regexp" + "strings" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + + "github.com/lib/pq" +) + +type Postgresql struct { + Address string + Databases []string + OrderedColumns []string + AllColumns []string + AdditionalTags []string + sanitizedAddress string + Query []struct { + Sqlquery string + Version int + Withdbname bool + Tagvalue string + } +} + +type query []struct { + Sqlquery string + Version int + Withdbname bool + Tagvalue string +} + +var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true} + +var sampleConfig = ` + # specify address via a url matching: + # postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full] + # or a simple string: + # host=localhost user=pqotest password=... sslmode=... dbname=app_production + # + # All connection parameters are optional. # + # Without the dbname parameter, the driver will default to a database + # with the same name as the user. This dbname is just for instantiating a + # connection with the server and doesn't restrict the databases we are trying + # to grab metrics for. + # + address = "host=localhost user=postgres sslmode=disable" + # A list of databases to pull metrics about. If not specified, metrics for all + # databases are gathered. + # databases = ["app_production", "testing"] + # + # Define the toml config where the sql queries are stored + # New queries can be added, if the withdbname is set to true and there is no databases defined + # in the 'databases field', the sql query is ended by a 'is not null' in order to make the query + # succeed. + # Example : + # The sqlquery : "SELECT * FROM pg_stat_database where datname" become "SELECT * FROM pg_stat_database where datname IN ('postgres', 'pgbench')" + # because the databases variable was set to ['postgres', 'pgbench' ] and the withdbname was true. + # Be careful that if the withdbname is set to false you d'ont have to define the where clause (aka with the dbname) + # the tagvalue field is used to define custom tags (separated by comas) + # + # Structure : + # [[inputs.postgresql_extensible.query]] + # sqlquery string + # version string + # withdbname boolean + # tagvalue string (coma separated) + [[inputs.postgresql_extensible.query]] + sqlquery="SELECT * FROM pg_stat_database" + version=901 + withdbname=false + tagvalue="" + [[inputs.postgresql_extensible.query]] + sqlquery="SELECT * FROM pg_stat_bgwriter" + version=901 + withdbname=false + tagvalue="" +` + +func (p *Postgresql) SampleConfig() string { + return sampleConfig +} + +func (p *Postgresql) Description() string { + return "Read metrics from one or many postgresql servers" +} + +func (p *Postgresql) IgnoredColumns() map[string]bool { + return ignoredColumns +} + +var localhost = "host=localhost sslmode=disable" + +func (p *Postgresql) Gather(acc telegraf.Accumulator) error { + + var sql_query string + var query_addon string + var db_version int + var query string + var tag_value string + + if p.Address == "" || p.Address == "localhost" { + p.Address = localhost + } + + db, err := sql.Open("postgres", p.Address) + if err != nil { + return err + } + + defer db.Close() + + // Retreiving the database version + + query = `select substring(setting from 1 for 3) as version from pg_settings where name='server_version_num'` + err = db.QueryRow(query).Scan(&db_version) + if err != nil { + return err + } + // We loop in order to process each query + // Query is not run if Database version does not match the query version. + + for i := range p.Query { + sql_query = p.Query[i].Sqlquery + tag_value = p.Query[i].Tagvalue + + if p.Query[i].Withdbname { + if len(p.Databases) != 0 { + query_addon = fmt.Sprintf(` IN ('%s')`, + strings.Join(p.Databases, "','")) + } else { + query_addon = " is not null" + } + } else { + query_addon = "" + } + sql_query += query_addon + + if p.Query[i].Version <= db_version { + rows, err := db.Query(sql_query) + if err != nil { + return err + } + + defer rows.Close() + + // grab the column information from the result + p.OrderedColumns, err = rows.Columns() + if err != nil { + return err + } else { + for _, v := range p.OrderedColumns { + p.AllColumns = append(p.AllColumns, v) + } + } + p.AdditionalTags = nil + if tag_value != "" { + tag_list := strings.Split(tag_value, ",") + for t := range tag_list { + p.AdditionalTags = append(p.AdditionalTags, tag_list[t]) + } + } + + for rows.Next() { + err = p.accRow(rows, acc) + if err != nil { + return err + } + } + } + } + return nil +} + +type scanner interface { + Scan(dest ...interface{}) error +} + +var passwordKVMatcher, _ = regexp.Compile("password=\\S+ ?") + +func (p *Postgresql) SanitizedAddress() (_ string, err error) { + var canonicalizedAddress string + if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") { + canonicalizedAddress, err = pq.ParseURL(p.Address) + if err != nil { + return p.sanitizedAddress, err + } + } else { + canonicalizedAddress = p.Address + } + p.sanitizedAddress = passwordKVMatcher.ReplaceAllString(canonicalizedAddress, "") + + return p.sanitizedAddress, err +} + +func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator) error { + var columnVars []interface{} + var dbname bytes.Buffer + + // this is where we'll store the column name with its *interface{} + columnMap := make(map[string]*interface{}) + + for _, column := range p.OrderedColumns { + columnMap[column] = new(interface{}) + } + + // populate the array of interface{} with the pointers in the right order + for i := 0; i < len(columnMap); i++ { + columnVars = append(columnVars, columnMap[p.OrderedColumns[i]]) + } + + // deconstruct array of variables and send to Scan + err := row.Scan(columnVars...) + + if err != nil { + return err + } + if columnMap["datname"] != nil { + // extract the database name from the column map + dbnameChars := (*columnMap["datname"]).([]uint8) + for i := 0; i < len(dbnameChars); i++ { + dbname.WriteString(string(dbnameChars[i])) + } + } else { + dbname.WriteString("postgres") + } + + var tagAddress string + tagAddress, err = p.SanitizedAddress() + if err != nil { + return err + } + + // Process the additional tags + + tags := map[string]string{} + tags["server"] = tagAddress + tags["db"] = dbname.String() + + fields := make(map[string]interface{}) + for col, val := range columnMap { + _, ignore := ignoredColumns[col] + //if !ignore && *val != "" { + if !ignore { + for tag := range p.AdditionalTags { + if col == p.AdditionalTags[tag] { + value_type_p := fmt.Sprintf(`%T`, *val) + if value_type_p == "[]uint8" { + tags[col] = fmt.Sprintf(`%s`, *val) + } else if value_type_p == "int64" { + tags[col] = fmt.Sprintf(`%v`, *val) + } + } + } + fields[col] = *val + } + } + acc.AddFields("postgresql", fields, tags) + return nil +} + +func init() { + inputs.Add("postgresql_extensible", func() telegraf.Input { + return &Postgresql{} + }) +} diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go new file mode 100644 index 000000000..7fd907102 --- /dev/null +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go @@ -0,0 +1,98 @@ +package postgresql_extensible + +import ( + "fmt" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPostgresqlGeneratesMetrics(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + p := &Postgresql{ + Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", + testutil.GetLocalHost()), + Databases: []string{"postgres"}, + Query: query{ + {Sqlquery: "select * from pg_stat_database", + Version: 901, + Withdbname: false, + Tagvalue: ""}, + }, + } + var acc testutil.Accumulator + err := p.Gather(&acc) + require.NoError(t, err) + + availableColumns := make(map[string]bool) + for _, col := range p.AllColumns { + availableColumns[col] = true + } + intMetrics := []string{ + "xact_commit", + "xact_rollback", + "blks_read", + "blks_hit", + "tup_returned", + "tup_fetched", + "tup_inserted", + "tup_updated", + "tup_deleted", + "conflicts", + "temp_files", + "temp_bytes", + "deadlocks", + "numbackends", + } + + floatMetrics := []string{ + "blk_read_time", + "blk_write_time", + } + + metricsCounted := 0 + + for _, metric := range intMetrics { + _, ok := availableColumns[metric] + if ok { + assert.True(t, acc.HasIntField("postgresql", metric)) + metricsCounted++ + } + } + + for _, metric := range floatMetrics { + _, ok := availableColumns[metric] + if ok { + assert.True(t, acc.HasFloatField("postgresql", metric)) + metricsCounted++ + } + } + + assert.True(t, metricsCounted > 0) + assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted) +} + +func TestPostgresqlIgnoresUnwantedColumns(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + p := &Postgresql{ + Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", + testutil.GetLocalHost()), + } + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + for col := range p.IgnoredColumns() { + assert.False(t, acc.HasMeasurement(col)) + } +}