Add postgresql plugin
This commit is contained in:
parent
5b9f7e7bf3
commit
4d0784a64d
|
@ -0,0 +1,119 @@
|
|||
package postgresql
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/influxdb/tivan/plugins"
|
||||
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
Address string
|
||||
Databases []string
|
||||
}
|
||||
|
||||
type Postgresql struct {
|
||||
Servers []*Server
|
||||
}
|
||||
|
||||
func (p *Postgresql) Gather(acc plugins.Accumulator) error {
|
||||
for _, serv := range p.Servers {
|
||||
err := p.gatherServer(serv, acc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Postgresql) gatherServer(serv *Server, acc plugins.Accumulator) error {
|
||||
db, err := sql.Open("postgres", serv.Address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer db.Close()
|
||||
|
||||
if len(serv.Databases) == 0 {
|
||||
rows, err := db.Query(`SELECT * FROM pg_stat_database`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
err := p.accRow(rows, acc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return rows.Err()
|
||||
} else {
|
||||
for _, name := range serv.Databases {
|
||||
row := db.QueryRow(`SELECT * FROM pg_stat_database WHERE datname=$1`, name)
|
||||
|
||||
err := p.accRow(row, acc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type scanner interface {
|
||||
Scan(dest ...interface{}) error
|
||||
}
|
||||
|
||||
func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator) error {
|
||||
var ignore interface{}
|
||||
var name string
|
||||
var commit, rollback, read, hit int64
|
||||
var returned, fetched, inserted, updated, deleted int64
|
||||
var conflicts, temp_files, temp_bytes, deadlocks int64
|
||||
var read_time, write_time float64
|
||||
|
||||
err := row.Scan(&ignore, &name, &ignore,
|
||||
&commit, &rollback,
|
||||
&read, &hit,
|
||||
&returned, &fetched, &inserted, &updated, &deleted,
|
||||
&conflicts, &temp_files, &temp_bytes,
|
||||
&deadlocks, &read_time, &write_time,
|
||||
&ignore,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tags := map[string]string{"db": name}
|
||||
|
||||
acc.Add("postgresql_xact_commit", commit, tags)
|
||||
acc.Add("postgresql_xact_rollback", rollback, tags)
|
||||
acc.Add("postgresql_blks_read", read, tags)
|
||||
acc.Add("postgresql_blks_hit", hit, tags)
|
||||
acc.Add("postgresql_tup_returned", returned, tags)
|
||||
acc.Add("postgresql_tup_fetched", fetched, tags)
|
||||
acc.Add("postgresql_tup_inserted", inserted, tags)
|
||||
acc.Add("postgresql_tup_updated", updated, tags)
|
||||
acc.Add("postgresql_tup_deleted", deleted, tags)
|
||||
acc.Add("postgresql_conflicts", conflicts, tags)
|
||||
acc.Add("postgresql_temp_files", temp_files, tags)
|
||||
acc.Add("postgresql_temp_bytes", temp_bytes, tags)
|
||||
acc.Add("postgresql_deadlocks", deadlocks, tags)
|
||||
acc.Add("postgresql_blk_read_time", read_time, tags)
|
||||
acc.Add("postgresql_blk_write_time", read_time, tags)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
plugins.Add("postgresql", func() plugins.Plugin {
|
||||
return &Postgresql{}
|
||||
})
|
||||
}
|
|
@ -0,0 +1,103 @@
|
|||
package postgresql
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/influxdb/tivan/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestPostgresqlGeneratesMetrics(t *testing.T) {
|
||||
p := &Postgresql{
|
||||
Servers: []*Server{
|
||||
{
|
||||
Address: "sslmode=disable",
|
||||
Databases: []string{"postgres"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err := p.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
intMetrics := []string{
|
||||
"postgresql_xact_commit",
|
||||
"postgresql_xact_rollback",
|
||||
"postgresql_blks_read",
|
||||
"postgresql_blks_hit",
|
||||
"postgresql_tup_returned",
|
||||
"postgresql_tup_fetched",
|
||||
"postgresql_tup_inserted",
|
||||
"postgresql_tup_updated",
|
||||
"postgresql_tup_deleted",
|
||||
"postgresql_conflicts",
|
||||
"postgresql_temp_files",
|
||||
"postgresql_temp_bytes",
|
||||
"postgresql_deadlocks",
|
||||
}
|
||||
|
||||
floatMetrics := []string{
|
||||
"postgresql_blk_read_time",
|
||||
"postgresql_blk_write_time",
|
||||
}
|
||||
|
||||
for _, metric := range intMetrics {
|
||||
assert.True(t, acc.HasIntValue(metric))
|
||||
}
|
||||
|
||||
for _, metric := range floatMetrics {
|
||||
assert.True(t, acc.HasFloatValue(metric))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPostgresqlTagsMetricsWithDatabaseName(t *testing.T) {
|
||||
p := &Postgresql{
|
||||
Servers: []*Server{
|
||||
{
|
||||
Address: "sslmode=disable",
|
||||
Databases: []string{"postgres"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err := p.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
point, ok := acc.Get("postgresql_xact_commit")
|
||||
require.True(t, ok)
|
||||
|
||||
assert.Equal(t, "postgres", point.Tags["db"])
|
||||
}
|
||||
|
||||
func TestPostgresqlDefaultsToAllDatabases(t *testing.T) {
|
||||
p := &Postgresql{
|
||||
Servers: []*Server{
|
||||
{
|
||||
Address: "sslmode=disable",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
err := p.Gather(&acc)
|
||||
require.NoError(t, err)
|
||||
|
||||
var found bool
|
||||
|
||||
for _, pnt := range acc.Points {
|
||||
if pnt.Name == "postgresql_xact_commit" {
|
||||
if pnt.Tags["db"] == "postgres" {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert.True(t, found)
|
||||
}
|
|
@ -16,6 +16,16 @@ func (a *Accumulator) Add(name string, value interface{}, tags map[string]string
|
|||
a.Points = append(a.Points, &Point{name, value, tags})
|
||||
}
|
||||
|
||||
func (a *Accumulator) Get(name string) (*Point, bool) {
|
||||
for _, p := range a.Points {
|
||||
if p.Name == name {
|
||||
return p, true
|
||||
}
|
||||
}
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (a *Accumulator) CheckValue(name string, val interface{}) bool {
|
||||
for _, p := range a.Points {
|
||||
if p.Name == name {
|
||||
|
@ -60,3 +70,25 @@ func (a *Accumulator) ValidateTaggedValue(name string, val interface{}, tags map
|
|||
func (a *Accumulator) ValidateValue(name string, val interface{}) error {
|
||||
return a.ValidateTaggedValue(name, val, nil)
|
||||
}
|
||||
|
||||
func (a *Accumulator) HasIntValue(name string) bool {
|
||||
for _, p := range a.Points {
|
||||
if p.Name == name {
|
||||
_, ok := p.Value.(int64)
|
||||
return ok
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (a *Accumulator) HasFloatValue(name string) bool {
|
||||
for _, p := range a.Points {
|
||||
if p.Name == name {
|
||||
_, ok := p.Value.(float64)
|
||||
return ok
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue