From 6994b25656066e4058d8a8781da77eb1ab8d7545 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 24 Jan 2017 15:36:36 -0500 Subject: [PATCH] fix postgresql 'name', and 'oid' data types by switching to a driver (#1750) that handles them properly --- CHANGELOG.md | 3 + Godeps | 2 +- plugins/inputs/postgresql/README.md | 4 +- plugins/inputs/postgresql/connect.go | 99 +++++++++++++++++++ plugins/inputs/postgresql/postgresql.go | 14 +-- plugins/inputs/postgresql/postgresql_test.go | 31 +++++- .../postgresql_extensible.go | 23 +++-- .../postgresql_extensible_test.go | 25 +++++ testutil/accumulator.go | 38 ++++++- 9 files changed, 211 insertions(+), 28 deletions(-) create mode 100644 plugins/inputs/postgresql/connect.go diff --git a/CHANGELOG.md b/CHANGELOG.md index d321e89b9..68d43f2f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -235,8 +235,11 @@ which can be installed via evaluated at every flush interval, rather than once at startup. This makes it consistent with the behavior of `collection_jitter`. +- postgresql plugins now handle oid and name typed columns seamlessly, previously they were ignored/skipped. + ### Features +- [#1617](https://github.com/influxdata/telegraf/pull/1617): postgresql_extensible now handles name and oid types correctly. - [#1413](https://github.com/influxdata/telegraf/issues/1413): Separate container_version from container_image tag. - [#1525](https://github.com/influxdata/telegraf/pull/1525): Support setting per-device and total metrics for Docker network and blockio. - [#1466](https://github.com/influxdata/telegraf/pull/1466): MongoDB input plugin: adding per DB stats from db.stats() diff --git a/Godeps b/Godeps index 885213c96..99606414e 100644 --- a/Godeps +++ b/Godeps @@ -33,7 +33,6 @@ github.com/kardianos/osext 29ae4ffbc9a6fe9fb2bc5029050ce6996ea1d3bc github.com/kardianos/service 5e335590050d6d00f3aa270217d288dda1c94d0a github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142 github.com/klauspost/crc32 19b0b332c9e4516a6370a0456e6182c3b5036720 -github.com/lib/pq e182dc4027e2ded4b19396d638610f2653295f36 github.com/matttproud/golang_protobuf_extensions d0c3fe89de86839aecf2e0579c40ba3bb336a453 github.com/miekg/dns cce6c130cdb92c752850880fd285bea1d64439dd github.com/mreiferson/go-snappystream 028eae7ab5c4c9e2d1cb4c4ca1e53259bbe7e504 @@ -63,3 +62,4 @@ gopkg.in/dancannon/gorethink.v1 7d1af5be49cb5ecc7b177bf387d232050299d6ef gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715 gopkg.in/mgo.v2 d90005c5262a3463800497ea5a89aed5fe22c886 gopkg.in/yaml.v2 a83829b6f1293c91addabc89d0571c246397bbf4 +github.com/jackc/pgx bb73d8427902891bbad7b949b9c60b32949d935f diff --git a/plugins/inputs/postgresql/README.md b/plugins/inputs/postgresql/README.md index e5e9a8961..e309aa80f 100644 --- a/plugins/inputs/postgresql/README.md +++ b/plugins/inputs/postgresql/README.md @@ -4,8 +4,8 @@ This postgresql plugin provides metrics for your postgres database. It currently ``` pg version 9.2+ 9.1 8.3-9.0 8.1-8.2 7.4-8.0(unsupported) --- --- --- ------- ------- ------- -datid* x x x x -datname* x x x x +datid x x x x +datname x x x x numbackends x x x x x xact_commit x x x x x xact_rollback x x x x x diff --git a/plugins/inputs/postgresql/connect.go b/plugins/inputs/postgresql/connect.go new file mode 100644 index 000000000..77858cda2 --- /dev/null +++ b/plugins/inputs/postgresql/connect.go @@ -0,0 +1,99 @@ +package postgresql + +import ( + "database/sql" + "fmt" + "net" + "net/url" + "sort" + "strings" + + "github.com/jackc/pgx" + "github.com/jackc/pgx/stdlib" +) + +// pulled from lib/pq +// ParseURL no longer needs to be used by clients of this library since supplying a URL as a +// connection string to sql.Open() is now supported: +// +// sql.Open("postgres", "postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full") +// +// It remains exported here for backwards-compatibility. +// +// ParseURL converts a url to a connection string for driver.Open. +// Example: +// +// "postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full" +// +// converts to: +// +// "user=bob password=secret host=1.2.3.4 port=5432 dbname=mydb sslmode=verify-full" +// +// A minimal example: +// +// "postgres://" +// +// This will be blank, causing driver.Open to use all of the defaults +func ParseURL(uri string) (string, error) { + u, err := url.Parse(uri) + if err != nil { + return "", err + } + + if u.Scheme != "postgres" && u.Scheme != "postgresql" { + return "", fmt.Errorf("invalid connection protocol: %s", u.Scheme) + } + + var kvs []string + escaper := strings.NewReplacer(` `, `\ `, `'`, `\'`, `\`, `\\`) + accrue := func(k, v string) { + if v != "" { + kvs = append(kvs, k+"="+escaper.Replace(v)) + } + } + + if u.User != nil { + v := u.User.Username() + accrue("user", v) + + v, _ = u.User.Password() + accrue("password", v) + } + + if host, port, err := net.SplitHostPort(u.Host); err != nil { + accrue("host", u.Host) + } else { + accrue("host", host) + accrue("port", port) + } + + if u.Path != "" { + accrue("dbname", u.Path[1:]) + } + + q := u.Query() + for k := range q { + accrue(k, q.Get(k)) + } + + sort.Strings(kvs) // Makes testing easier (not a performance concern) + return strings.Join(kvs, " "), nil +} + +func Connect(address string) (*sql.DB, error) { + if strings.HasPrefix(address, "postgres://") || strings.HasPrefix(address, "postgresql://") { + return sql.Open("pgx", address) + } + + config, err := pgx.ParseDSN(address) + if err != nil { + return nil, err + } + + pool, err := pgx.NewConnPool(pgx.ConnPoolConfig{ConnConfig: config}) + if err != nil { + return nil, err + } + + return stdlib.OpenFromConnPool(pool) +} diff --git a/plugins/inputs/postgresql/postgresql.go b/plugins/inputs/postgresql/postgresql.go index 0e7cdb509..7019762ed 100644 --- a/plugins/inputs/postgresql/postgresql.go +++ b/plugins/inputs/postgresql/postgresql.go @@ -2,7 +2,6 @@ package postgresql import ( "bytes" - "database/sql" "fmt" "regexp" "sort" @@ -10,8 +9,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" - - "github.com/lib/pq" ) type Postgresql struct { @@ -23,7 +20,7 @@ type Postgresql struct { sanitizedAddress string } -var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true} +var ignoredColumns = map[string]bool{"stats_reset": true} var sampleConfig = ` ## specify address via a url matching: @@ -71,7 +68,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error { p.Address = localhost } - db, err := sql.Open("postgres", p.Address) + db, err := Connect(p.Address) if err != nil { return err } @@ -149,7 +146,7 @@ var passwordKVMatcher, _ = regexp.Compile("password=\\S+ ?") func (p *Postgresql) SanitizedAddress() (_ string, err error) { var canonicalizedAddress string if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") { - canonicalizedAddress, err = pq.ParseURL(p.Address) + canonicalizedAddress, err = ParseURL(p.Address) if err != nil { return p.sanitizedAddress, err } @@ -185,10 +182,7 @@ func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator) error { } if columnMap["datname"] != nil { // extract the database name from the column map - dbnameChars := (*columnMap["datname"]).([]uint8) - for i := 0; i < len(dbnameChars); i++ { - dbname.WriteString(string(dbnameChars[i])) - } + dbname.WriteString((*columnMap["datname"]).(string)) } else { dbname.WriteString("postgres") } diff --git a/plugins/inputs/postgresql/postgresql_test.go b/plugins/inputs/postgresql/postgresql_test.go index 64926f61e..a0690961d 100644 --- a/plugins/inputs/postgresql/postgresql_test.go +++ b/plugins/inputs/postgresql/postgresql_test.go @@ -28,6 +28,7 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { for _, col := range p.AllColumns { availableColumns[col] = true } + intMetrics := []string{ "xact_commit", "xact_rollback", @@ -42,7 +43,6 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { "temp_files", "temp_bytes", "deadlocks", - "numbackends", "buffers_alloc", "buffers_backend", "buffers_backend_fsync", @@ -53,9 +53,20 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { "maxwritten_clean", } + int32Metrics := []string{ + "numbackends", + } + floatMetrics := []string{ "blk_read_time", "blk_write_time", + "checkpoint_write_time", + "checkpoint_sync_time", + } + + stringMetrics := []string{ + "datname", + "datid", } metricsCounted := 0 @@ -68,6 +79,14 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { } } + for _, metric := range int32Metrics { + _, ok := availableColumns[metric] + if ok { + assert.True(t, acc.HasInt32Field("postgresql", metric)) + metricsCounted++ + } + } + for _, metric := range floatMetrics { _, ok := availableColumns[metric] if ok { @@ -76,8 +95,16 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { } } + for _, metric := range stringMetrics { + _, ok := availableColumns[metric] + if ok { + assert.True(t, acc.HasStringField("postgresql", metric)) + metricsCounted++ + } + } + assert.True(t, metricsCounted > 0) - //assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted) + assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted) } func TestPostgresqlTagsMetricsWithDatabaseName(t *testing.T) { diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible.go b/plugins/inputs/postgresql_extensible/postgresql_extensible.go index beb010fce..00729bf75 100644 --- a/plugins/inputs/postgresql_extensible/postgresql_extensible.go +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible.go @@ -2,7 +2,6 @@ package postgresql_extensible import ( "bytes" - "database/sql" "fmt" "log" "regexp" @@ -10,8 +9,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" - - "github.com/lib/pq" + "github.com/influxdata/telegraf/plugins/inputs/postgresql" ) type Postgresql struct { @@ -40,7 +38,7 @@ type query []struct { Measurement string } -var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true} +var ignoredColumns = map[string]bool{"stats_reset": true} var sampleConfig = ` ## specify address via a url matching: @@ -126,7 +124,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error { p.Address = localhost } - db, err := sql.Open("postgres", p.Address) + db, err := postgresql.Connect(p.Address) if err != nil { return err } @@ -212,7 +210,7 @@ func (p *Postgresql) SanitizedAddress() (_ string, err error) { } var canonicalizedAddress string if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") { - canonicalizedAddress, err = pq.ParseURL(p.Address) + canonicalizedAddress, err = postgresql.ParseURL(p.Address) if err != nil { return p.sanitizedAddress, err } @@ -248,10 +246,7 @@ func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumula } if columnMap["datname"] != nil { // extract the database name from the column map - dbnameChars := (*columnMap["datname"]).([]uint8) - for i := 0; i < len(dbnameChars); i++ { - dbname.WriteString(string(dbnameChars[i])) - } + dbname.WriteString((*columnMap["datname"]).(string)) } else { dbname.WriteString("postgres") } @@ -275,19 +270,23 @@ COLUMN: if ignore || *val == nil { continue } + for _, tag := range p.AdditionalTags { if col != tag { continue } switch v := (*val).(type) { + case string: + tags[col] = v case []byte: tags[col] = string(v) - case int64: + case int64, int32, int: tags[col] = fmt.Sprintf("%d", v) + default: + log.Println("failed to add additional tag", col) } continue COLUMN } - if v, ok := (*val).([]byte); ok { fields[col] = string(v) } else { diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go index 7fd907102..f92284ee4 100644 --- a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go @@ -33,6 +33,7 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { for _, col := range p.AllColumns { availableColumns[col] = true } + intMetrics := []string{ "xact_commit", "xact_rollback", @@ -47,6 +48,9 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { "temp_files", "temp_bytes", "deadlocks", + } + + int32Metrics := []string{ "numbackends", } @@ -55,6 +59,11 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { "blk_write_time", } + stringMetrics := []string{ + "datname", + "datid", + } + metricsCounted := 0 for _, metric := range intMetrics { @@ -65,6 +74,14 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { } } + for _, metric := range int32Metrics { + _, ok := availableColumns[metric] + if ok { + assert.True(t, acc.HasInt32Field("postgresql", metric)) + metricsCounted++ + } + } + for _, metric := range floatMetrics { _, ok := availableColumns[metric] if ok { @@ -73,6 +90,14 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { } } + for _, metric := range stringMetrics { + _, ok := availableColumns[metric] + if ok { + assert.True(t, acc.HasStringField("postgresql", metric)) + metricsCounted++ + } + } + assert.True(t, metricsCounted > 0) assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted) } diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 2efee5572..4f131ec8f 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -221,7 +221,7 @@ func (a *Accumulator) AssertDoesNotContainMeasurement(t *testing.T, measurement } } -// HasIntValue returns true if the measurement has an Int value +// HasIntField returns true if the measurement has an Int value func (a *Accumulator) HasIntField(measurement string, field string) bool { a.Lock() defer a.Unlock() @@ -239,6 +239,42 @@ func (a *Accumulator) HasIntField(measurement string, field string) bool { return false } +// HasInt32Field returns true if the measurement has an Int value +func (a *Accumulator) HasInt32Field(measurement string, field string) bool { + a.Lock() + defer a.Unlock() + for _, p := range a.Metrics { + if p.Measurement == measurement { + for fieldname, value := range p.Fields { + if fieldname == field { + _, ok := value.(int32) + return ok + } + } + } + } + + return false +} + +// HasStringField returns true if the measurement has an String value +func (a *Accumulator) HasStringField(measurement string, field string) bool { + a.Lock() + defer a.Unlock() + for _, p := range a.Metrics { + if p.Measurement == measurement { + for fieldname, value := range p.Fields { + if fieldname == field { + _, ok := value.(string) + return ok + } + } + } + } + + return false +} + // HasUIntValue returns true if the measurement has a UInt value func (a *Accumulator) HasUIntField(measurement string, field string) bool { a.Lock()