From 4d0784a64d95b2ca69b50efba7554f3b89a3947e Mon Sep 17 00:00:00 2001 From: Evan Phoenix Date: Mon, 18 May 2015 10:46:44 -0700 Subject: [PATCH] Add postgresql plugin --- plugins/postgresql/postgresql.go | 119 ++++++++++++++++++++++++++ plugins/postgresql/postgresql_test.go | 103 ++++++++++++++++++++++ testutil/accumulator.go | 32 +++++++ 3 files changed, 254 insertions(+) create mode 100644 plugins/postgresql/postgresql.go create mode 100644 plugins/postgresql/postgresql_test.go diff --git a/plugins/postgresql/postgresql.go b/plugins/postgresql/postgresql.go new file mode 100644 index 000000000..9416dbd61 --- /dev/null +++ b/plugins/postgresql/postgresql.go @@ -0,0 +1,119 @@ +package postgresql + +import ( + "database/sql" + + "github.com/influxdb/tivan/plugins" + + _ "github.com/lib/pq" +) + +type Server struct { + Address string + Databases []string +} + +type Postgresql struct { + Servers []*Server +} + +func (p *Postgresql) Gather(acc plugins.Accumulator) error { + for _, serv := range p.Servers { + err := p.gatherServer(serv, acc) + if err != nil { + return err + } + } + + return nil +} + +func (p *Postgresql) gatherServer(serv *Server, acc plugins.Accumulator) error { + db, err := sql.Open("postgres", serv.Address) + if err != nil { + return err + } + + defer db.Close() + + if len(serv.Databases) == 0 { + rows, err := db.Query(`SELECT * FROM pg_stat_database`) + if err != nil { + return err + } + + defer rows.Close() + + for rows.Next() { + err := p.accRow(rows, acc) + if err != nil { + return err + } + } + + return rows.Err() + } else { + for _, name := range serv.Databases { + row := db.QueryRow(`SELECT * FROM pg_stat_database WHERE datname=$1`, name) + + err := p.accRow(row, acc) + if err != nil { + return err + } + } + } + + return nil +} + +type scanner interface { + Scan(dest ...interface{}) error +} + +func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator) error { + var ignore interface{} + var name string + var commit, rollback, read, hit int64 + var returned, fetched, inserted, updated, deleted int64 + var conflicts, temp_files, temp_bytes, deadlocks int64 + var read_time, write_time float64 + + err := row.Scan(&ignore, &name, &ignore, + &commit, &rollback, + &read, &hit, + &returned, &fetched, &inserted, &updated, &deleted, + &conflicts, &temp_files, &temp_bytes, + &deadlocks, &read_time, &write_time, + &ignore, + ) + + if err != nil { + return err + } + + tags := map[string]string{"db": name} + + acc.Add("postgresql_xact_commit", commit, tags) + acc.Add("postgresql_xact_rollback", rollback, tags) + acc.Add("postgresql_blks_read", read, tags) + acc.Add("postgresql_blks_hit", hit, tags) + acc.Add("postgresql_tup_returned", returned, tags) + acc.Add("postgresql_tup_fetched", fetched, tags) + acc.Add("postgresql_tup_inserted", inserted, tags) + acc.Add("postgresql_tup_updated", updated, tags) + acc.Add("postgresql_tup_deleted", deleted, tags) + acc.Add("postgresql_conflicts", conflicts, tags) + acc.Add("postgresql_temp_files", temp_files, tags) + acc.Add("postgresql_temp_bytes", temp_bytes, tags) + acc.Add("postgresql_deadlocks", deadlocks, tags) + acc.Add("postgresql_blk_read_time", read_time, tags) + acc.Add("postgresql_blk_write_time", read_time, tags) + + return nil +} + +func init() { + plugins.Add("postgresql", func() plugins.Plugin { + return &Postgresql{} + }) +} diff --git a/plugins/postgresql/postgresql_test.go b/plugins/postgresql/postgresql_test.go new file mode 100644 index 000000000..e527dbb3a --- /dev/null +++ b/plugins/postgresql/postgresql_test.go @@ -0,0 +1,103 @@ +package postgresql + +import ( + "testing" + + "github.com/influxdb/tivan/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPostgresqlGeneratesMetrics(t *testing.T) { + p := &Postgresql{ + Servers: []*Server{ + { + Address: "sslmode=disable", + Databases: []string{"postgres"}, + }, + }, + } + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + intMetrics := []string{ + "postgresql_xact_commit", + "postgresql_xact_rollback", + "postgresql_blks_read", + "postgresql_blks_hit", + "postgresql_tup_returned", + "postgresql_tup_fetched", + "postgresql_tup_inserted", + "postgresql_tup_updated", + "postgresql_tup_deleted", + "postgresql_conflicts", + "postgresql_temp_files", + "postgresql_temp_bytes", + "postgresql_deadlocks", + } + + floatMetrics := []string{ + "postgresql_blk_read_time", + "postgresql_blk_write_time", + } + + for _, metric := range intMetrics { + assert.True(t, acc.HasIntValue(metric)) + } + + for _, metric := range floatMetrics { + assert.True(t, acc.HasFloatValue(metric)) + } +} + +func TestPostgresqlTagsMetricsWithDatabaseName(t *testing.T) { + p := &Postgresql{ + Servers: []*Server{ + { + Address: "sslmode=disable", + Databases: []string{"postgres"}, + }, + }, + } + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + point, ok := acc.Get("postgresql_xact_commit") + require.True(t, ok) + + assert.Equal(t, "postgres", point.Tags["db"]) +} + +func TestPostgresqlDefaultsToAllDatabases(t *testing.T) { + p := &Postgresql{ + Servers: []*Server{ + { + Address: "sslmode=disable", + }, + }, + } + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + var found bool + + for _, pnt := range acc.Points { + if pnt.Name == "postgresql_xact_commit" { + if pnt.Tags["db"] == "postgres" { + found = true + break + } + } + } + + assert.True(t, found) +} diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 98fcaca44..2ac4c4849 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -16,6 +16,16 @@ func (a *Accumulator) Add(name string, value interface{}, tags map[string]string a.Points = append(a.Points, &Point{name, value, tags}) } +func (a *Accumulator) Get(name string) (*Point, bool) { + for _, p := range a.Points { + if p.Name == name { + return p, true + } + } + + return nil, false +} + func (a *Accumulator) CheckValue(name string, val interface{}) bool { for _, p := range a.Points { if p.Name == name { @@ -60,3 +70,25 @@ func (a *Accumulator) ValidateTaggedValue(name string, val interface{}, tags map func (a *Accumulator) ValidateValue(name string, val interface{}) error { return a.ValidateTaggedValue(name, val, nil) } + +func (a *Accumulator) HasIntValue(name string) bool { + for _, p := range a.Points { + if p.Name == name { + _, ok := p.Value.(int64) + return ok + } + } + + return false +} + +func (a *Accumulator) HasFloatValue(name string) bool { + for _, p := range a.Points { + if p.Name == name { + _, ok := p.Value.(float64) + return ok + } + } + + return false +}