Use persistent connection to postgresql database (#2701)

This commit is contained in:
James
2018-01-05 19:03:09 -05:00
committed by Daniel Nelson
parent b9286bbe23
commit ab4f3176bb
7 changed files with 298 additions and 281 deletions

View File

@@ -2,26 +2,21 @@ package postgresql
import (
"bytes"
"database/sql"
"fmt"
"regexp"
"sort"
"strings"
// register in driver.
_ "github.com/jackc/pgx/stdlib"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)
type Postgresql struct {
Address string
Service
Databases []string
IgnoredDatabases []string
OrderedColumns []string
AllColumns []string
sanitizedAddress string
}
var ignoredColumns = map[string]bool{"stats_reset": true}
@@ -41,6 +36,15 @@ var sampleConfig = `
## to grab metrics for.
##
address = "host=localhost user=postgres sslmode=disable"
## A custom name for the database that will be used as the "server" tag in the
## measurement output. If not specified, a default one generated from
## the connection address is used.
# outputaddress = "db01"
## connection configuration.
## maxlifetime - specify the maximum lifetime of a connection.
## default is forever (0s)
max_lifetime = "0s"
## A list of databases to explicitly ignore. If not specified, metrics for all
## databases are gathered. Do NOT use with the 'databases' option.
@@ -63,24 +67,13 @@ func (p *Postgresql) IgnoredColumns() map[string]bool {
return ignoredColumns
}
var localhost = "host=localhost sslmode=disable"
func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
var (
err error
db *sql.DB
query string
err error
query string
columns []string
)
if p.Address == "" || p.Address == "localhost" {
p.Address = localhost
}
if db, err = sql.Open("pgx", p.Address); err != nil {
return err
}
defer db.Close()
if len(p.Databases) == 0 && len(p.IgnoredDatabases) == 0 {
query = `SELECT * FROM pg_stat_database`
} else if len(p.IgnoredDatabases) != 0 {
@@ -91,7 +84,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
strings.Join(p.Databases, "','"))
}
rows, err := db.Query(query)
rows, err := p.DB.Query(query)
if err != nil {
return err
}
@@ -99,16 +92,12 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
defer rows.Close()
// grab the column information from the result
p.OrderedColumns, err = rows.Columns()
if err != nil {
if columns, err = rows.Columns(); err != nil {
return err
} else {
p.AllColumns = make([]string, len(p.OrderedColumns))
copy(p.AllColumns, p.OrderedColumns)
}
for rows.Next() {
err = p.accRow(rows, acc)
err = p.accRow(rows, acc, columns)
if err != nil {
return err
}
@@ -116,7 +105,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
query = `SELECT * FROM pg_stat_bgwriter`
bg_writer_row, err := db.Query(query)
bg_writer_row, err := p.DB.Query(query)
if err != nil {
return err
}
@@ -124,22 +113,17 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
defer bg_writer_row.Close()
// grab the column information from the result
p.OrderedColumns, err = bg_writer_row.Columns()
if err != nil {
if columns, err = bg_writer_row.Columns(); err != nil {
return err
} else {
for _, v := range p.OrderedColumns {
p.AllColumns = append(p.AllColumns, v)
}
}
for bg_writer_row.Next() {
err = p.accRow(bg_writer_row, acc)
err = p.accRow(bg_writer_row, acc, columns)
if err != nil {
return err
}
}
sort.Strings(p.AllColumns)
return bg_writer_row.Err()
}
@@ -147,37 +131,20 @@ type scanner interface {
Scan(dest ...interface{}) error
}
var passwordKVMatcher, _ = regexp.Compile("password=\\S+ ?")
func (p *Postgresql) SanitizedAddress() (_ string, err error) {
var canonicalizedAddress string
if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") {
canonicalizedAddress, err = ParseURL(p.Address)
if err != nil {
return p.sanitizedAddress, err
}
} else {
canonicalizedAddress = p.Address
}
p.sanitizedAddress = passwordKVMatcher.ReplaceAllString(canonicalizedAddress, "")
return p.sanitizedAddress, err
}
func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator) error {
func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator, columns []string) error {
var columnVars []interface{}
var dbname bytes.Buffer
// this is where we'll store the column name with its *interface{}
columnMap := make(map[string]*interface{})
for _, column := range p.OrderedColumns {
for _, column := range columns {
columnMap[column] = new(interface{})
}
// populate the array of interface{} with the pointers in the right order
for i := 0; i < len(columnMap); i++ {
columnVars = append(columnVars, columnMap[p.OrderedColumns[i]])
columnVars = append(columnVars, columnMap[columns[i]])
}
// deconstruct array of variables and send to Scan
@@ -215,6 +182,14 @@ func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator) error {
func init() {
inputs.Add("postgresql", func() telegraf.Input {
return &Postgresql{}
return &Postgresql{
Service: Service{
MaxIdle: 1,
MaxOpen: 1,
MaxLifetime: internal.Duration{
Duration: 0,
},
},
}
})
}