fix postgresql 'name', and 'oid' data types by switching to a driver (#1750)
that handles them properly
This commit is contained in:
@@ -4,8 +4,8 @@ This postgresql plugin provides metrics for your postgres database. It currently
|
||||
```
|
||||
pg version 9.2+ 9.1 8.3-9.0 8.1-8.2 7.4-8.0(unsupported)
|
||||
--- --- --- ------- ------- -------
|
||||
datid* x x x x
|
||||
datname* x x x x
|
||||
datid x x x x
|
||||
datname x x x x
|
||||
numbackends x x x x x
|
||||
xact_commit x x x x x
|
||||
xact_rollback x x x x x
|
||||
|
||||
99
plugins/inputs/postgresql/connect.go
Normal file
99
plugins/inputs/postgresql/connect.go
Normal file
@@ -0,0 +1,99 @@
|
||||
package postgresql
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/jackc/pgx"
|
||||
"github.com/jackc/pgx/stdlib"
|
||||
)
|
||||
|
||||
// pulled from lib/pq
|
||||
// ParseURL no longer needs to be used by clients of this library since supplying a URL as a
|
||||
// connection string to sql.Open() is now supported:
|
||||
//
|
||||
// sql.Open("postgres", "postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full")
|
||||
//
|
||||
// It remains exported here for backwards-compatibility.
|
||||
//
|
||||
// ParseURL converts a url to a connection string for driver.Open.
|
||||
// Example:
|
||||
//
|
||||
// "postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full"
|
||||
//
|
||||
// converts to:
|
||||
//
|
||||
// "user=bob password=secret host=1.2.3.4 port=5432 dbname=mydb sslmode=verify-full"
|
||||
//
|
||||
// A minimal example:
|
||||
//
|
||||
// "postgres://"
|
||||
//
|
||||
// This will be blank, causing driver.Open to use all of the defaults
|
||||
func ParseURL(uri string) (string, error) {
|
||||
u, err := url.Parse(uri)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if u.Scheme != "postgres" && u.Scheme != "postgresql" {
|
||||
return "", fmt.Errorf("invalid connection protocol: %s", u.Scheme)
|
||||
}
|
||||
|
||||
var kvs []string
|
||||
escaper := strings.NewReplacer(` `, `\ `, `'`, `\'`, `\`, `\\`)
|
||||
accrue := func(k, v string) {
|
||||
if v != "" {
|
||||
kvs = append(kvs, k+"="+escaper.Replace(v))
|
||||
}
|
||||
}
|
||||
|
||||
if u.User != nil {
|
||||
v := u.User.Username()
|
||||
accrue("user", v)
|
||||
|
||||
v, _ = u.User.Password()
|
||||
accrue("password", v)
|
||||
}
|
||||
|
||||
if host, port, err := net.SplitHostPort(u.Host); err != nil {
|
||||
accrue("host", u.Host)
|
||||
} else {
|
||||
accrue("host", host)
|
||||
accrue("port", port)
|
||||
}
|
||||
|
||||
if u.Path != "" {
|
||||
accrue("dbname", u.Path[1:])
|
||||
}
|
||||
|
||||
q := u.Query()
|
||||
for k := range q {
|
||||
accrue(k, q.Get(k))
|
||||
}
|
||||
|
||||
sort.Strings(kvs) // Makes testing easier (not a performance concern)
|
||||
return strings.Join(kvs, " "), nil
|
||||
}
|
||||
|
||||
func Connect(address string) (*sql.DB, error) {
|
||||
if strings.HasPrefix(address, "postgres://") || strings.HasPrefix(address, "postgresql://") {
|
||||
return sql.Open("pgx", address)
|
||||
}
|
||||
|
||||
config, err := pgx.ParseDSN(address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pool, err := pgx.NewConnPool(pgx.ConnPoolConfig{ConnConfig: config})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return stdlib.OpenFromConnPool(pool)
|
||||
}
|
||||
@@ -2,7 +2,6 @@ package postgresql
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"sort"
|
||||
@@ -10,8 +9,6 @@ import (
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
|
||||
"github.com/lib/pq"
|
||||
)
|
||||
|
||||
type Postgresql struct {
|
||||
@@ -23,7 +20,7 @@ type Postgresql struct {
|
||||
sanitizedAddress string
|
||||
}
|
||||
|
||||
var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true}
|
||||
var ignoredColumns = map[string]bool{"stats_reset": true}
|
||||
|
||||
var sampleConfig = `
|
||||
## specify address via a url matching:
|
||||
@@ -71,7 +68,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
|
||||
p.Address = localhost
|
||||
}
|
||||
|
||||
db, err := sql.Open("postgres", p.Address)
|
||||
db, err := Connect(p.Address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -149,7 +146,7 @@ var passwordKVMatcher, _ = regexp.Compile("password=\\S+ ?")
|
||||
func (p *Postgresql) SanitizedAddress() (_ string, err error) {
|
||||
var canonicalizedAddress string
|
||||
if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") {
|
||||
canonicalizedAddress, err = pq.ParseURL(p.Address)
|
||||
canonicalizedAddress, err = ParseURL(p.Address)
|
||||
if err != nil {
|
||||
return p.sanitizedAddress, err
|
||||
}
|
||||
@@ -185,10 +182,7 @@ func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator) error {
|
||||
}
|
||||
if columnMap["datname"] != nil {
|
||||
// extract the database name from the column map
|
||||
dbnameChars := (*columnMap["datname"]).([]uint8)
|
||||
for i := 0; i < len(dbnameChars); i++ {
|
||||
dbname.WriteString(string(dbnameChars[i]))
|
||||
}
|
||||
dbname.WriteString((*columnMap["datname"]).(string))
|
||||
} else {
|
||||
dbname.WriteString("postgres")
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
|
||||
for _, col := range p.AllColumns {
|
||||
availableColumns[col] = true
|
||||
}
|
||||
|
||||
intMetrics := []string{
|
||||
"xact_commit",
|
||||
"xact_rollback",
|
||||
@@ -42,7 +43,6 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
|
||||
"temp_files",
|
||||
"temp_bytes",
|
||||
"deadlocks",
|
||||
"numbackends",
|
||||
"buffers_alloc",
|
||||
"buffers_backend",
|
||||
"buffers_backend_fsync",
|
||||
@@ -53,9 +53,20 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
|
||||
"maxwritten_clean",
|
||||
}
|
||||
|
||||
int32Metrics := []string{
|
||||
"numbackends",
|
||||
}
|
||||
|
||||
floatMetrics := []string{
|
||||
"blk_read_time",
|
||||
"blk_write_time",
|
||||
"checkpoint_write_time",
|
||||
"checkpoint_sync_time",
|
||||
}
|
||||
|
||||
stringMetrics := []string{
|
||||
"datname",
|
||||
"datid",
|
||||
}
|
||||
|
||||
metricsCounted := 0
|
||||
@@ -68,6 +79,14 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
for _, metric := range int32Metrics {
|
||||
_, ok := availableColumns[metric]
|
||||
if ok {
|
||||
assert.True(t, acc.HasInt32Field("postgresql", metric))
|
||||
metricsCounted++
|
||||
}
|
||||
}
|
||||
|
||||
for _, metric := range floatMetrics {
|
||||
_, ok := availableColumns[metric]
|
||||
if ok {
|
||||
@@ -76,8 +95,16 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
for _, metric := range stringMetrics {
|
||||
_, ok := availableColumns[metric]
|
||||
if ok {
|
||||
assert.True(t, acc.HasStringField("postgresql", metric))
|
||||
metricsCounted++
|
||||
}
|
||||
}
|
||||
|
||||
assert.True(t, metricsCounted > 0)
|
||||
//assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted)
|
||||
assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted)
|
||||
}
|
||||
|
||||
func TestPostgresqlTagsMetricsWithDatabaseName(t *testing.T) {
|
||||
|
||||
@@ -2,7 +2,6 @@ package postgresql_extensible
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"regexp"
|
||||
@@ -10,8 +9,7 @@ import (
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/influxdata/telegraf/plugins/inputs/postgresql"
|
||||
)
|
||||
|
||||
type Postgresql struct {
|
||||
@@ -40,7 +38,7 @@ type query []struct {
|
||||
Measurement string
|
||||
}
|
||||
|
||||
var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true}
|
||||
var ignoredColumns = map[string]bool{"stats_reset": true}
|
||||
|
||||
var sampleConfig = `
|
||||
## specify address via a url matching:
|
||||
@@ -126,7 +124,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
|
||||
p.Address = localhost
|
||||
}
|
||||
|
||||
db, err := sql.Open("postgres", p.Address)
|
||||
db, err := postgresql.Connect(p.Address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -212,7 +210,7 @@ func (p *Postgresql) SanitizedAddress() (_ string, err error) {
|
||||
}
|
||||
var canonicalizedAddress string
|
||||
if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") {
|
||||
canonicalizedAddress, err = pq.ParseURL(p.Address)
|
||||
canonicalizedAddress, err = postgresql.ParseURL(p.Address)
|
||||
if err != nil {
|
||||
return p.sanitizedAddress, err
|
||||
}
|
||||
@@ -248,10 +246,7 @@ func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumula
|
||||
}
|
||||
if columnMap["datname"] != nil {
|
||||
// extract the database name from the column map
|
||||
dbnameChars := (*columnMap["datname"]).([]uint8)
|
||||
for i := 0; i < len(dbnameChars); i++ {
|
||||
dbname.WriteString(string(dbnameChars[i]))
|
||||
}
|
||||
dbname.WriteString((*columnMap["datname"]).(string))
|
||||
} else {
|
||||
dbname.WriteString("postgres")
|
||||
}
|
||||
@@ -275,19 +270,23 @@ COLUMN:
|
||||
if ignore || *val == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, tag := range p.AdditionalTags {
|
||||
if col != tag {
|
||||
continue
|
||||
}
|
||||
switch v := (*val).(type) {
|
||||
case string:
|
||||
tags[col] = v
|
||||
case []byte:
|
||||
tags[col] = string(v)
|
||||
case int64:
|
||||
case int64, int32, int:
|
||||
tags[col] = fmt.Sprintf("%d", v)
|
||||
default:
|
||||
log.Println("failed to add additional tag", col)
|
||||
}
|
||||
continue COLUMN
|
||||
}
|
||||
|
||||
if v, ok := (*val).([]byte); ok {
|
||||
fields[col] = string(v)
|
||||
} else {
|
||||
|
||||
@@ -33,6 +33,7 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
|
||||
for _, col := range p.AllColumns {
|
||||
availableColumns[col] = true
|
||||
}
|
||||
|
||||
intMetrics := []string{
|
||||
"xact_commit",
|
||||
"xact_rollback",
|
||||
@@ -47,6 +48,9 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
|
||||
"temp_files",
|
||||
"temp_bytes",
|
||||
"deadlocks",
|
||||
}
|
||||
|
||||
int32Metrics := []string{
|
||||
"numbackends",
|
||||
}
|
||||
|
||||
@@ -55,6 +59,11 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
|
||||
"blk_write_time",
|
||||
}
|
||||
|
||||
stringMetrics := []string{
|
||||
"datname",
|
||||
"datid",
|
||||
}
|
||||
|
||||
metricsCounted := 0
|
||||
|
||||
for _, metric := range intMetrics {
|
||||
@@ -65,6 +74,14 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
for _, metric := range int32Metrics {
|
||||
_, ok := availableColumns[metric]
|
||||
if ok {
|
||||
assert.True(t, acc.HasInt32Field("postgresql", metric))
|
||||
metricsCounted++
|
||||
}
|
||||
}
|
||||
|
||||
for _, metric := range floatMetrics {
|
||||
_, ok := availableColumns[metric]
|
||||
if ok {
|
||||
@@ -73,6 +90,14 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
for _, metric := range stringMetrics {
|
||||
_, ok := availableColumns[metric]
|
||||
if ok {
|
||||
assert.True(t, acc.HasStringField("postgresql", metric))
|
||||
metricsCounted++
|
||||
}
|
||||
}
|
||||
|
||||
assert.True(t, metricsCounted > 0)
|
||||
assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user