From bccef14fbf05c33e0da15bbb9936518e4dec9505 Mon Sep 17 00:00:00 2001 From: James Lawrence Date: Wed, 10 Aug 2016 09:16:24 -0400 Subject: [PATCH] fix postgresql 'name', and 'oid' data types by switching to a driver that handles them properly --- CHANGELOG.md | 4 + Godeps | 2 +- plugins/inputs/postgresql/postgresql.go | 102 ++++++++++++++-- plugins/inputs/postgresql/postgresql_test.go | 31 ++++- .../postgresql_extensible.go | 111 ++++++++++++++++-- .../postgresql_extensible_test.go | 25 ++++ testutil/accumulator.go | 38 +++++- 7 files changed, 291 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 762c7ceff..deed71f01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,8 +40,12 @@ should now look like: evaluated at every flush interval, rather than once at startup. This makes it consistent with the behavior of `collection_jitter`. +- postgresql plugins switched drivers to better handle oid and name data types. +allowed removal of special casing data types by column name. + ### Features +- [#1617](https://github.com/influxdata/telegraf/pull/1617): postgresql_extensible now handles name and oid types correctly. - [#1413](https://github.com/influxdata/telegraf/issues/1413): Separate container_version from container_image tag. - [#1525](https://github.com/influxdata/telegraf/pull/1525): Support setting per-device and total metrics for Docker network and blockio. 
- [#1466](https://github.com/influxdata/telegraf/pull/1466): MongoDB input plugin: adding per DB stats from db.stats() diff --git a/Godeps b/Godeps index 3c70bcaf8..c14abf0e0 100644 --- a/Godeps +++ b/Godeps @@ -32,7 +32,6 @@ github.com/influxdata/toml af4df43894b16e3fd2b788d01bd27ad0776ef2d0 github.com/kardianos/osext 29ae4ffbc9a6fe9fb2bc5029050ce6996ea1d3bc github.com/kardianos/service 5e335590050d6d00f3aa270217d288dda1c94d0a github.com/klauspost/crc32 19b0b332c9e4516a6370a0456e6182c3b5036720 -github.com/lib/pq e182dc4027e2ded4b19396d638610f2653295f36 github.com/matttproud/golang_protobuf_extensions d0c3fe89de86839aecf2e0579c40ba3bb336a453 github.com/miekg/dns cce6c130cdb92c752850880fd285bea1d64439dd github.com/mreiferson/go-snappystream 028eae7ab5c4c9e2d1cb4c4ca1e53259bbe7e504 @@ -63,3 +62,4 @@ gopkg.in/dancannon/gorethink.v1 7d1af5be49cb5ecc7b177bf387d232050299d6ef gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715 gopkg.in/mgo.v2 d90005c5262a3463800497ea5a89aed5fe22c886 gopkg.in/yaml.v2 a83829b6f1293c91addabc89d0571c246397bbf4 +github.com/jackc/pgx bb73d8427902891bbad7b949b9c60b32949d935f diff --git a/plugins/inputs/postgresql/postgresql.go b/plugins/inputs/postgresql/postgresql.go index da8ee8001..ed584b1f1 100644 --- a/plugins/inputs/postgresql/postgresql.go +++ b/plugins/inputs/postgresql/postgresql.go @@ -4,6 +4,8 @@ import ( "bytes" "database/sql" "fmt" + "net" + "net/url" "regexp" "sort" "strings" @@ -11,7 +13,8 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" - "github.com/lib/pq" + "github.com/jackc/pgx" + "github.com/jackc/pgx/stdlib" ) type Postgresql struct { @@ -22,7 +25,7 @@ type Postgresql struct { sanitizedAddress string } -var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true} +var ignoredColumns = map[string]bool{"stats_reset": true} var sampleConfig = ` ## specify address via a url matching: @@ -66,7 +69,7 @@ func (p *Postgresql) Gather(acc 
telegraf.Accumulator) error { p.Address = localhost } - db, err := sql.Open("postgres", p.Address) + db, err := connect(p.Address) if err != nil { return err } @@ -141,7 +144,7 @@ var passwordKVMatcher, _ = regexp.Compile("password=\\S+ ?") func (p *Postgresql) SanitizedAddress() (_ string, err error) { var canonicalizedAddress string if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") { - canonicalizedAddress, err = pq.ParseURL(p.Address) + canonicalizedAddress, err = parseURL(p.Address) if err != nil { return p.sanitizedAddress, err } @@ -177,10 +180,7 @@ func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator) error { } if columnMap["datname"] != nil { // extract the database name from the column map - dbnameChars := (*columnMap["datname"]).([]uint8) - for i := 0; i < len(dbnameChars); i++ { - dbname.WriteString(string(dbnameChars[i])) - } + dbname.WriteString((*columnMap["datname"]).(string)) } else { dbname.WriteString("postgres") } @@ -210,3 +210,89 @@ func init() { return &Postgresql{} }) } + +// pulled from lib/pq +// ParseURL no longer needs to be used by clients of this library since supplying a URL as a +// connection string to sql.Open() is now supported: +// +// sql.Open("postgres", "postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full") +// +// It remains exported here for backwards-compatibility. +// +// ParseURL converts a url to a connection string for driver.Open. 
+// Example: +// +// "postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full" +// +// converts to: +// +// "user=bob password=secret host=1.2.3.4 port=5432 dbname=mydb sslmode=verify-full" +// +// A minimal example: +// +// "postgres://" +// +// This will be blank, causing driver.Open to use all of the defaults +func parseURL(uri string) (string, error) { + u, err := url.Parse(uri) + if err != nil { + return "", err + } + + if u.Scheme != "postgres" && u.Scheme != "postgresql" { + return "", fmt.Errorf("invalid connection protocol: %s", u.Scheme) + } + + var kvs []string + escaper := strings.NewReplacer(` `, `\ `, `'`, `\'`, `\`, `\\`) + accrue := func(k, v string) { + if v != "" { + kvs = append(kvs, k+"="+escaper.Replace(v)) + } + } + + if u.User != nil { + v := u.User.Username() + accrue("user", v) + + v, _ = u.User.Password() + accrue("password", v) + } + + if host, port, err := net.SplitHostPort(u.Host); err != nil { + accrue("host", u.Host) + } else { + accrue("host", host) + accrue("port", port) + } + + if u.Path != "" { + accrue("dbname", u.Path[1:]) + } + + q := u.Query() + for k := range q { + accrue(k, q.Get(k)) + } + + sort.Strings(kvs) // Makes testing easier (not a performance concern) + return strings.Join(kvs, " "), nil +} + +func connect(address string) (*sql.DB, error) { + if strings.HasPrefix(address, "postgres://") || strings.HasPrefix(address, "postgresql://") { + return sql.Open("pgx", address) + } + + config, err := pgx.ParseDSN(address) + if err != nil { + return nil, err + } + + pool, err := pgx.NewConnPool(pgx.ConnPoolConfig{ConnConfig: config}) + if err != nil { + return nil, err + } + + return stdlib.OpenFromConnPool(pool) +} diff --git a/plugins/inputs/postgresql/postgresql_test.go b/plugins/inputs/postgresql/postgresql_test.go index 552b18cdb..889ac0440 100644 --- a/plugins/inputs/postgresql/postgresql_test.go +++ b/plugins/inputs/postgresql/postgresql_test.go @@ -28,6 +28,7 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { 
for _, col := range p.AllColumns { availableColumns[col] = true } + intMetrics := []string{ "xact_commit", "xact_rollback", @@ -42,7 +43,6 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { "temp_files", "temp_bytes", "deadlocks", - "numbackends", "buffers_alloc", "buffers_backend", "buffers_backend_fsync", @@ -53,9 +53,20 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { "maxwritten_clean", } + int32Metrics := []string{ + "numbackends", + } + floatMetrics := []string{ "blk_read_time", "blk_write_time", + "checkpoint_write_time", + "checkpoint_sync_time", + } + + stringMetrics := []string{ + "datname", + "datid", } metricsCounted := 0 @@ -68,6 +79,14 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { } } + for _, metric := range int32Metrics { + _, ok := availableColumns[metric] + if ok { + assert.True(t, acc.HasInt32Field("postgresql", metric)) + metricsCounted++ + } + } + for _, metric := range floatMetrics { _, ok := availableColumns[metric] if ok { @@ -76,8 +95,16 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { } } + for _, metric := range stringMetrics { + _, ok := availableColumns[metric] + if ok { + assert.True(t, acc.HasStringField("postgresql", metric)) + metricsCounted++ + } + } + assert.True(t, metricsCounted > 0) - //assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted) + assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted) } func TestPostgresqlTagsMetricsWithDatabaseName(t *testing.T) { diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible.go b/plugins/inputs/postgresql_extensible/postgresql_extensible.go index ec281fca2..62d4ee143 100644 --- a/plugins/inputs/postgresql_extensible/postgresql_extensible.go +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible.go @@ -5,13 +5,17 @@ import ( "database/sql" "fmt" "log" + "net" + "net/url" "regexp" + "sort" "strings" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" - 
"github.com/lib/pq" + "github.com/jackc/pgx" + "github.com/jackc/pgx/stdlib" ) type Postgresql struct { @@ -39,7 +43,7 @@ type query []struct { Measurement string } -var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true} +var ignoredColumns = map[string]bool{"stats_reset": true} var sampleConfig = ` ## specify address via a url matching: @@ -125,7 +129,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error { p.Address = localhost } - db, err := sql.Open("postgres", p.Address) + db, err := connect(p.Address) if err != nil { return err } @@ -211,7 +215,7 @@ func (p *Postgresql) SanitizedAddress() (_ string, err error) { } var canonicalizedAddress string if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") { - canonicalizedAddress, err = pq.ParseURL(p.Address) + canonicalizedAddress, err = parseURL(p.Address) if err != nil { return p.sanitizedAddress, err } @@ -247,10 +251,7 @@ func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumula } if columnMap["datname"] != nil { // extract the database name from the column map - dbnameChars := (*columnMap["datname"]).([]uint8) - for i := 0; i < len(dbnameChars); i++ { - dbname.WriteString(string(dbnameChars[i])) - } + dbname.WriteString((*columnMap["datname"]).(string)) } else { dbname.WriteString("postgres") } @@ -276,19 +277,23 @@ COLUMN: if ignore || *val == nil { continue } + for _, tag := range p.AdditionalTags { if col != tag { continue } switch v := (*val).(type) { + case string: + tags[col] = v case []byte: tags[col] = string(v) - case int64: + case int64, int32, int: tags[col] = fmt.Sprintf("%d", v) + default: + log.Println("failed to add additional tag", col) } continue COLUMN } - if v, ok := (*val).([]byte); ok { fields[col] = string(v) } else { @@ -304,3 +309,89 @@ func init() { return &Postgresql{} }) } + +// pulled from lib/pq +// ParseURL no longer needs to be used by clients of this library since 
supplying a URL as a +// connection string to sql.Open() is now supported: +// +// sql.Open("postgres", "postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full") +// +// It remains exported here for backwards-compatibility. +// +// ParseURL converts a url to a connection string for driver.Open. +// Example: +// +// "postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full" +// +// converts to: +// +// "user=bob password=secret host=1.2.3.4 port=5432 dbname=mydb sslmode=verify-full" +// +// A minimal example: +// +// "postgres://" +// +// This will be blank, causing driver.Open to use all of the defaults +func parseURL(uri string) (string, error) { + u, err := url.Parse(uri) + if err != nil { + return "", err + } + + if u.Scheme != "postgres" && u.Scheme != "postgresql" { + return "", fmt.Errorf("invalid connection protocol: %s", u.Scheme) + } + + var kvs []string + escaper := strings.NewReplacer(` `, `\ `, `'`, `\'`, `\`, `\\`) + accrue := func(k, v string) { + if v != "" { + kvs = append(kvs, k+"="+escaper.Replace(v)) + } + } + + if u.User != nil { + v := u.User.Username() + accrue("user", v) + + v, _ = u.User.Password() + accrue("password", v) + } + + if host, port, err := net.SplitHostPort(u.Host); err != nil { + accrue("host", u.Host) + } else { + accrue("host", host) + accrue("port", port) + } + + if u.Path != "" { + accrue("dbname", u.Path[1:]) + } + + q := u.Query() + for k := range q { + accrue(k, q.Get(k)) + } + + sort.Strings(kvs) // Makes testing easier (not a performance concern) + return strings.Join(kvs, " "), nil +} + +func connect(address string) (*sql.DB, error) { + if strings.HasPrefix(address, "postgres://") || strings.HasPrefix(address, "postgresql://") { + return sql.Open("pgx", address) + } + + config, err := pgx.ParseDSN(address) + if err != nil { + return nil, err + } + + pool, err := pgx.NewConnPool(pgx.ConnPoolConfig{ConnConfig: config}) + if err != nil { + return nil, err + } + + return stdlib.OpenFromConnPool(pool) +} diff --git 
a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go index 7fd907102..f92284ee4 100644 --- a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go @@ -33,6 +33,7 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { for _, col := range p.AllColumns { availableColumns[col] = true } + intMetrics := []string{ "xact_commit", "xact_rollback", @@ -47,6 +48,9 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { "temp_files", "temp_bytes", "deadlocks", + } + + int32Metrics := []string{ "numbackends", } @@ -55,6 +59,11 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { "blk_write_time", } + stringMetrics := []string{ + "datname", + "datid", + } + metricsCounted := 0 for _, metric := range intMetrics { @@ -65,6 +74,14 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { } } + for _, metric := range int32Metrics { + _, ok := availableColumns[metric] + if ok { + assert.True(t, acc.HasInt32Field("postgresql", metric)) + metricsCounted++ + } + } + for _, metric := range floatMetrics { _, ok := availableColumns[metric] if ok { @@ -73,6 +90,14 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { } } + for _, metric := range stringMetrics { + _, ok := availableColumns[metric] + if ok { + assert.True(t, acc.HasStringField("postgresql", metric)) + metricsCounted++ + } + } + assert.True(t, metricsCounted > 0) assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted) } diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 62b765a3c..5c2e29102 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -188,7 +188,7 @@ func (a *Accumulator) AssertContainsFields( assert.Fail(t, msg) } -// HasIntValue returns true if the measurement has an Int value +// HasIntField returns true if the measurement has an Int value func (a *Accumulator) HasIntField(measurement 
string, field string) bool { a.Lock() defer a.Unlock() @@ -206,6 +206,42 @@ func (a *Accumulator) HasIntField(measurement string, field string) bool { return false } +// HasInt32Field returns true if the measurement has an Int32 value +func (a *Accumulator) HasInt32Field(measurement string, field string) bool { + a.Lock() + defer a.Unlock() + for _, p := range a.Metrics { + if p.Measurement == measurement { + for fieldname, value := range p.Fields { + if fieldname == field { + _, ok := value.(int32) + return ok + } + } + } + } + + return false +} + +// HasStringField returns true if the measurement has a String value +func (a *Accumulator) HasStringField(measurement string, field string) bool { + a.Lock() + defer a.Unlock() + for _, p := range a.Metrics { + if p.Measurement == measurement { + for fieldname, value := range p.Fields { + if fieldname == field { + _, ok := value.(string) + return ok + } + } + } + } + + return false +} + // HasUIntValue returns true if the measurement has a UInt value func (a *Accumulator) HasUIntField(measurement string, field string) bool { a.Lock()