From cfce43caacbb29590477eb238c61175532b11705 Mon Sep 17 00:00:00 2001 From: Kevin Bouwkamp Date: Sun, 13 Sep 2015 04:30:38 -0400 Subject: [PATCH 1/8] Generating metric information dynamically. Makes compatible with postgresql versions < 9.2 --- plugins/postgresql/postgresql.go | 109 +++++++++++++------------- plugins/postgresql/postgresql_test.go | 31 ++++++++ 2 files changed, 85 insertions(+), 55 deletions(-) diff --git a/plugins/postgresql/postgresql.go b/plugins/postgresql/postgresql.go index 1a467fee9..a7a8d1acd 100644 --- a/plugins/postgresql/postgresql.go +++ b/plugins/postgresql/postgresql.go @@ -1,7 +1,10 @@ package postgresql import ( + "bytes" "database/sql" + "fmt" + "strings" "github.com/influxdb/telegraf/plugins" @@ -9,8 +12,9 @@ import ( ) type Server struct { - Address string - Databases []string + Address string + Databases []string + OrderedColumns []string } type Postgresql struct { @@ -51,6 +55,7 @@ func (p *Postgresql) Description() string { } var localhost = &Server{Address: "sslmode=disable"} +var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true} func (p *Postgresql) Gather(acc plugins.Accumulator) error { if len(p.Servers) == 0 { @@ -69,6 +74,8 @@ func (p *Postgresql) Gather(acc plugins.Accumulator) error { } func (p *Postgresql) gatherServer(serv *Server, acc plugins.Accumulator) error { + var query string + if serv.Address == "" || serv.Address == "localhost" { serv = localhost } @@ -81,77 +88,69 @@ func (p *Postgresql) gatherServer(serv *Server, acc plugins.Accumulator) error { defer db.Close() if len(serv.Databases) == 0 { - rows, err := db.Query(`SELECT * FROM pg_stat_database`) + query = `SELECT * FROM pg_stat_database` + } else { + query = fmt.Sprintf(`SELECT * FROM pg_stat_database WHERE datname IN ('%s')`, strings.Join(serv.Databases, "','")) + } + + rows, err := db.Query(query) + if err != nil { + return err + } + + defer rows.Close() + + serv.OrderedColumns, err = rows.Columns() + if err != nil { + return err + } + + for rows.Next() { + err := p.accRow(rows, acc, serv) if err != nil { return err } - - defer rows.Close() - - for rows.Next() { - err := p.accRow(rows, acc, serv.Address) - if err != nil { - return err - } - } - - return rows.Err() - } else { - for _, name := range serv.Databases { - row := db.QueryRow(`SELECT * FROM pg_stat_database WHERE datname=$1`, name) - - err := p.accRow(row, acc, serv.Address) - if err != nil { - return err - } - } } - return nil + return rows.Err() } type scanner interface { Scan(dest ...interface{}) error } -func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator, server string) error { - var ignore interface{} - var name string - var commit, rollback, read, hit int64 - var returned, fetched, inserted, updated, deleted int64 - var conflicts, temp_files, temp_bytes, deadlocks int64 - var read_time, write_time float64 +func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator, serv *Server) error { + var columnVars []interface{} + var dbname bytes.Buffer - err := row.Scan(&ignore, &name, &ignore, - &commit, &rollback, - &read, &hit, - &returned, &fetched, &inserted, &updated, &deleted, - &conflicts, &temp_files, &temp_bytes, - &deadlocks, &read_time, &write_time, - &ignore, - ) + columnMap := make(map[string]*interface{}) + + for _, column := range serv.OrderedColumns { + columnMap[column] = new(interface{}) + } + + for i := 0; i < len(columnMap); i++ { + columnVars = append(columnVars, columnMap[serv.OrderedColumns[i]]) + } + + err := row.Scan(columnVars...) if err != nil { return err } - tags := map[string]string{"server": server, "db": name} + dbnameChars := (*columnMap["datname"]).([]uint8) + for i := 0; i < len(dbnameChars); i++ { + dbname.WriteString(string(dbnameChars[i])) + } - acc.Add("xact_commit", commit, tags) - acc.Add("xact_rollback", rollback, tags) - acc.Add("blks_read", read, tags) - acc.Add("blks_hit", hit, tags) - acc.Add("tup_returned", returned, tags) - acc.Add("tup_fetched", fetched, tags) - acc.Add("tup_inserted", inserted, tags) - acc.Add("tup_updated", updated, tags) - acc.Add("tup_deleted", deleted, tags) - acc.Add("conflicts", conflicts, tags) - acc.Add("temp_files", temp_files, tags) - acc.Add("temp_bytes", temp_bytes, tags) - acc.Add("deadlocks", deadlocks, tags) - acc.Add("blk_read_time", read_time, tags) - acc.Add("blk_write_time", read_time, tags) + tags := map[string]string{"server": serv.Address, "db": dbname.String()} + + for col, val := range columnMap { + if !ignoredColumns[col] { + acc.Add(col, *val, tags) + } + } return nil } diff --git a/plugins/postgresql/postgresql_test.go b/plugins/postgresql/postgresql_test.go index 363d289f9..7910425f5 100644 --- a/plugins/postgresql/postgresql_test.go +++ b/plugins/postgresql/postgresql_test.go @@ -117,3 +117,34 @@ func TestPostgresqlDefaultsToAllDatabases(t *testing.T) { assert.True(t, found) } + +func TestPostgresqlIgnoresUnwantedColumns(t *testing.T) { + // if testing.Short() { + // t.Skip("Skipping integration test in short mode") + // } + + p := &Postgresql{ + Servers: []*Server{ + { + Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", + testutil.GetLocalHost()), + }, + }, + } + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + var found bool + + for _, pnt := range acc.Points { + if pnt.Measurement == "datname" || pnt.Measurement == "datid" || pnt.Measurement == "stats_reset" { + found = true + break + } + } + + assert.False(t, found) +} From 708694139a2de5c7545ca4a0f688475f4213c134 Mon Sep 17 00:00:00 2001 From: Kevin Bouwkamp Date: Sun, 13 Sep 2015 04:34:54 -0400 Subject: [PATCH 2/8] uncomment to skip test in short mode --- plugins/postgresql/postgresql_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/postgresql/postgresql_test.go b/plugins/postgresql/postgresql_test.go index 7910425f5..f05fddb68 100644 --- a/plugins/postgresql/postgresql_test.go +++ b/plugins/postgresql/postgresql_test.go @@ -119,9 +119,9 @@ func TestPostgresqlDefaultsToAllDatabases(t *testing.T) { } func TestPostgresqlIgnoresUnwantedColumns(t *testing.T) { - // if testing.Short() { - // t.Skip("Skipping integration test in short mode") - // } + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } p := &Postgresql{ Servers: []*Server{ From dabf8b74eb8f865f4a01d3c0c0fb573828d267b4 Mon Sep 17 00:00:00 2001 From: Kevin Bouwkamp Date: Sun, 13 Sep 2015 04:40:32 -0400 Subject: [PATCH 3/8] Add a few notes about the connection strings --- plugins/postgresql/postgresql.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/plugins/postgresql/postgresql.go b/plugins/postgresql/postgresql.go index a7a8d1acd..007ac20bf 100644 --- a/plugins/postgresql/postgresql.go +++ b/plugins/postgresql/postgresql.go @@ -26,13 +26,17 @@ var sampleConfig = ` [[postgresql.servers]] # specify address via a url matching: - # postgres://[pqgotest[:password]]@localhost?sslmode=[disable|verify-ca|verify-full] + # postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full] # or a simple string: - # host=localhost user=pqotest password=... sslmode=... + # host=localhost user=pqotest password=... sslmode=... dbname=app_production # # All connection parameters are optional. By default, the host is localhost # and the user is the currently running user. For localhost, we default # to sslmode=disable as well. + # Without the dbname parameter, the driver will default to a database + # with the same name as the user. This dbname is just for instantiating a + # connection with the server and doesn't restrict the databases we are trying + # to grab metrics for. # address = "sslmode=disable" From eec815d3474859f9e92f389bee339abb0c903437 Mon Sep 17 00:00:00 2001 From: Kevin Bouwkamp Date: Sun, 13 Sep 2015 04:43:08 -0400 Subject: [PATCH 4/8] fix some more indentation... --- plugins/postgresql/postgresql.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/plugins/postgresql/postgresql.go b/plugins/postgresql/postgresql.go index 007ac20bf..a4ffc6988 100644 --- a/plugins/postgresql/postgresql.go +++ b/plugins/postgresql/postgresql.go @@ -33,10 +33,11 @@ var sampleConfig = ` # All connection parameters are optional. By default, the host is localhost # and the user is the currently running user. For localhost, we default # to sslmode=disable as well. - # Without the dbname parameter, the driver will default to a database - # with the same name as the user. This dbname is just for instantiating a - # connection with the server and doesn't restrict the databases we are trying - # to grab metrics for. + # + # Without the dbname parameter, the driver will default to a database + # with the same name as the user. This dbname is just for instantiating a + # connection with the server and doesn't restrict the databases we are trying + # to grab metrics for. # address = "sslmode=disable" From a5bbb1f9a5733b9a40eaaa93d67e2f98c0e2f72a Mon Sep 17 00:00:00 2001 From: Kevin Bouwkamp Date: Sun, 13 Sep 2015 12:51:50 -0400 Subject: [PATCH 5/8] add some comments --- plugins/postgresql/postgresql.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/plugins/postgresql/postgresql.go b/plugins/postgresql/postgresql.go index a4ffc6988..5310f7bd6 100644 --- a/plugins/postgresql/postgresql.go +++ b/plugins/postgresql/postgresql.go @@ -105,13 +105,14 @@ func (p *Postgresql) gatherServer(serv *Server, acc plugins.Accumulator) error { defer rows.Close() + // grab the column information from the result serv.OrderedColumns, err = rows.Columns() if err != nil { return err } for rows.Next() { - err := p.accRow(rows, acc, serv) + err = p.accRow(rows, acc, serv) if err != nil { return err } @@ -128,22 +129,26 @@ func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator, serv *Server) var columnVars []interface{} var dbname bytes.Buffer + // this is where we'll store the column name with its *interface{} columnMap := make(map[string]*interface{}) for _, column := range serv.OrderedColumns { columnMap[column] = new(interface{}) } + // populate the array of interface{} with the pointers in the right order for i := 0; i < len(columnMap); i++ { columnVars = append(columnVars, columnMap[serv.OrderedColumns[i]]) } + // deconstruct array of variables and send to Scan err := row.Scan(columnVars...) if err != nil { return err } + // extract the database name from the column map dbnameChars := (*columnMap["datname"]).([]uint8) for i := 0; i < len(dbnameChars); i++ { dbname.WriteString(string(dbnameChars[i])) From 7390542da27d18fb411dfa41a2e98f53d567f330 Mon Sep 17 00:00:00 2001 From: Kevin Bouwkamp Date: Sun, 13 Sep 2015 20:11:49 -0400 Subject: [PATCH 6/8] Makes the test also work across pg versions --- plugins/postgresql/postgresql.go | 10 ++++++++-- plugins/postgresql/postgresql_test.go | 23 +++++++++++++++++++++-- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/plugins/postgresql/postgresql.go b/plugins/postgresql/postgresql.go index 5310f7bd6..e72ada85f 100644 --- a/plugins/postgresql/postgresql.go +++ b/plugins/postgresql/postgresql.go @@ -21,6 +21,8 @@ type Postgresql struct { Servers []*Server } +var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true} + var sampleConfig = ` # specify servers via an array of tables [[postgresql.servers]] @@ -59,8 +61,11 @@ func (p *Postgresql) Description() string { return "Read metrics from one or many postgresql servers" } +func (p *Postgresql) IgnoredColumns() map[string]bool { + return ignoredColumns +} + var localhost = &Server{Address: "sslmode=disable"} -var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true} func (p *Postgresql) Gather(acc plugins.Accumulator) error { if len(p.Servers) == 0 { @@ -157,7 +162,8 @@ func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator, serv *Server) tags := map[string]string{"server": serv.Address, "db": dbname.String()} for col, val := range columnMap { - if !ignoredColumns[col] { + _, ignore := ignoredColumns[col] + if !ignore { acc.Add(col, *val, tags) } } diff --git a/plugins/postgresql/postgresql_test.go b/plugins/postgresql/postgresql_test.go index f05fddb68..4c44addce 100644 --- a/plugins/postgresql/postgresql_test.go +++ b/plugins/postgresql/postgresql_test.go @@ -29,6 +29,11 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { err := p.Gather(&acc) require.NoError(t, err) + availableColumns := make(map[string]bool) + for _, col := range p.Servers[0].OrderedColumns { + availableColumns[col] = true + } + intMetrics := []string{ "xact_commit", "xact_rollback", @@ -43,6 +48,7 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { "temp_files", "temp_bytes", "deadlocks", + "numbackends", } floatMetrics := []string{ @@ -50,13 +56,26 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { "blk_write_time", } + metricsCounted := 0 + for _, metric := range intMetrics { - assert.True(t, acc.HasIntValue(metric)) + _, ok := availableColumns[metric] + if ok { + assert.True(t, acc.HasIntValue(metric)) + metricsCounted++ + } } for _, metric := range floatMetrics { - assert.True(t, acc.HasFloatValue(metric)) + _, ok := availableColumns[metric] + if ok { + assert.True(t, acc.HasFloatValue(metric)) + metricsCounted++ + } } + + assert.True(t, metricsCounted > 0) + assert.Equal(t, len(availableColumns) - len(p.IgnoredColumns()), metricsCounted) } func TestPostgresqlTagsMetricsWithDatabaseName(t *testing.T) { From db0a8e3794b763fdb8f108b52bb8e773337bb2f3 Mon Sep 17 00:00:00 2001 From: Kevin Bouwkamp Date: Sun, 13 Sep 2015 20:22:19 -0400 Subject: [PATCH 7/8] no longer duplicate ignored columns here --- plugins/postgresql/postgresql_test.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/plugins/postgresql/postgresql_test.go b/plugins/postgresql/postgresql_test.go index 4c44addce..e9ff99e4e 100644 --- a/plugins/postgresql/postgresql_test.go +++ b/plugins/postgresql/postgresql_test.go @@ -156,14 +156,7 @@ func TestPostgresqlIgnoresUnwantedColumns(t *testing.T) { err := p.Gather(&acc) require.NoError(t, err) - var found bool - - for _, pnt := range acc.Points { - if pnt.Measurement == "datname" || pnt.Measurement == "datid" || pnt.Measurement == "stats_reset" { - found = true - break - } + for col := range p.IgnoredColumns() { + assert.False(t, acc.HasMeasurement(col)) } - - assert.False(t, found) } From ecc237e13a01ece12cba1e318d7a6b249fb7faff Mon Sep 17 00:00:00 2001 From: Kevin Bouwkamp Date: Mon, 14 Sep 2015 20:50:07 -0400 Subject: [PATCH 8/8] add bugfix in CHANGELOG and some notes in pg README --- CHANGELOG.md | 1 + plugins/postgresql/README.md | 30 ++++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+) create mode 100644 plugins/postgresql/README.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 2cfaefda5..e8c264cc4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ will still be backwards compatible if only `url` is specified. - [#178](https://github.com/influxdb/telegraf/issues/178): redis plugin, multiple server thread hang bug - Fix net plugin on darwin - [#84](https://github.com/influxdb/telegraf/issues/84): Fix docker plugin on CentOS. Thanks @neezgee! +- [#192](https://github.com/influxdb/telegraf/issues/192): Increase compatibility of postgresql plugin. Now supports versions 8.1+ ## v0.1.8 [2015-09-04] diff --git a/plugins/postgresql/README.md b/plugins/postgresql/README.md new file mode 100644 index 000000000..ce0ae18d6 --- /dev/null +++ b/plugins/postgresql/README.md @@ -0,0 +1,30 @@ +# PostgreSQL plugin + +This postgresql plugin provides metrics for your postgres database. It currently works with postgres versions 8.1+. It uses data from the built in _pg_stat_database_ view. The metrics recorded depend on your version of postgres. See table: +``` +pg version 9.2+ 9.1 8.3-9.0 8.1-8.2 7.4-8.0(unsupported) +--- --- --- ------- ------- ------- +datid* x x x x +datname* x x x x +numbackends x x x x x +xact_commit x x x x x +xact_rollback x x x x x +blks_read x x x x x +blks_hit x x x x x +tup_returned x x x +tup_fetched x x x +tup_inserted x x x +tup_updated x x x +tup_deleted x x x +conflicts x x +temp_files x +temp_bytes x +deadlocks x +blk_read_time x +blk_write_time x +stats_reset* x x +``` + +_* value ignored and therefore not recorded._ + +More information about the meaning of these metrics can be found in the [PostgreSQL Documentation](http://www.postgresql.org/docs/9.2/static/monitoring-stats.html#PG-STAT-DATABASE-VIEW)