diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible.go b/plugins/inputs/postgresql_extensible/postgresql_extensible.go index 1536f115c..07a782f89 100644 --- a/plugins/inputs/postgresql_extensible/postgresql_extensible.go +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible.go @@ -231,8 +231,12 @@ func (p *Postgresql) SanitizedAddress() (_ string, err error) { } func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumulator) error { - var columnVars []interface{} - var dbname bytes.Buffer + var ( + err error + columnVars []interface{} + dbname bytes.Buffer + tagAddress string + ) // this is where we'll store the column name with its *interface{} columnMap := make(map[string]*interface{}) @@ -247,11 +251,10 @@ func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumula } // deconstruct array of variables and send to Scan - err := row.Scan(columnVars...) - - if err != nil { + if err = row.Scan(columnVars...); err != nil { return err } + if columnMap["datname"] != nil { // extract the database name from the column map dbname.WriteString((*columnMap["datname"]).(string)) @@ -259,17 +262,16 @@ func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumula dbname.WriteString("postgres") } - var tagAddress string - tagAddress, err = p.SanitizedAddress() - if err != nil { + if tagAddress, err = p.SanitizedAddress(); err != nil { return err } // Process the additional tags + tags := map[string]string{ + "server": tagAddress, + "db": dbname.String(), + } - tags := map[string]string{} - tags["server"] = tagAddress - tags["db"] = dbname.String() fields := make(map[string]interface{}) COLUMN: for col, val := range columnMap { @@ -295,6 +297,7 @@ COLUMN: } continue COLUMN } + if v, ok := (*val).([]byte); ok { fields[col] = string(v) } else { diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go index 1c59bffaa..466cdfd98 100644 --- a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go @@ -9,25 +9,30 @@ import ( "github.com/stretchr/testify/require" ) +func queryRunner(t *testing.T, q query) (*Postgresql, *testutil.Accumulator) { + p := &Postgresql{ + Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", + testutil.GetLocalHost()), + Databases: []string{"postgres"}, + Query: q, + } + var acc testutil.Accumulator + + require.NoError(t, acc.GatherError(p.Gather)) + return p, &acc +} + func TestPostgresqlGeneratesMetrics(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } - p := &Postgresql{ - Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", - testutil.GetLocalHost()), - Databases: []string{"postgres"}, - Query: query{ - {Sqlquery: "select * from pg_stat_database", - Version: 901, - Withdbname: false, - Tagvalue: ""}, - }, - } - var acc testutil.Accumulator - err := acc.GatherError(p.Gather) - require.NoError(t, err) + p, acc := queryRunner(t, query{{ + Sqlquery: "select * from pg_stat_database", + Version: 901, + Withdbname: false, + Tagvalue: "", + }}) availableColumns := make(map[string]bool) for _, col := range p.AllColumns { @@ -102,6 +107,111 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted) } +func TestPostgresqlQueryOutputTests(t *testing.T) { + const measurement = "postgresql" + + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + examples := map[string]func(*testutil.Accumulator){ + "SELECT 10.0::float AS myvalue": func(acc *testutil.Accumulator) { + v, found := acc.FloatField(measurement, "myvalue") + assert.True(t, found) + assert.Equal(t, 10.0, v) + }, + "SELECT 10.0 AS myvalue": func(acc *testutil.Accumulator) { + v, found := acc.StringField(measurement, "myvalue") + assert.True(t, found) + assert.Equal(t, "10.0", v) + }, + "SELECT 'hello world' AS myvalue": func(acc *testutil.Accumulator) { + v, found := acc.StringField(measurement, "myvalue") + assert.True(t, found) + assert.Equal(t, "hello world", v) + }, + "SELECT true AS myvalue": func(acc *testutil.Accumulator) { + v, found := acc.BoolField(measurement, "myvalue") + assert.True(t, found) + assert.Equal(t, true, v) + }, + } + + for q, assertions := range examples { + _, acc := queryRunner(t, query{{ + Sqlquery: q, + Version: 901, + Withdbname: false, + Tagvalue: "", + }}) + assertions(acc) + } +} + +func TestPostgresqlFieldOutput(t *testing.T) { + const measurement = "postgresql" + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + _, acc := queryRunner(t, query{{ + Sqlquery: "select * from pg_stat_database", + Version: 901, + Withdbname: false, + Tagvalue: "", + }}) + + intMetrics := []string{ + "xact_commit", + "xact_rollback", + "blks_read", + "blks_hit", + "tup_returned", + "tup_fetched", + "tup_inserted", + "tup_updated", + "tup_deleted", + "conflicts", + "temp_files", + "temp_bytes", + "deadlocks", + } + + int32Metrics := []string{ + "numbackends", + } + + floatMetrics := []string{ + "blk_read_time", + "blk_write_time", + } + + stringMetrics := []string{ + "datname", + "datid", + } + + for _, field := range intMetrics { + _, found := acc.Int64Field(measurement, field) + assert.True(t, found, fmt.Sprintf("expected %s to be an integer", field)) + } + + for _, field := range int32Metrics { + _, found := acc.Int32Field(measurement, field) + assert.True(t, found, fmt.Sprintf("expected %s to be an int32", field)) + } + + for _, field := range floatMetrics { + _, found := acc.FloatField(measurement, field) + assert.True(t, found, fmt.Sprintf("expected %s to be a float64", field)) + } + + for _, field := range stringMetrics { + _, found := acc.StringField(measurement, field) + assert.True(t, found, fmt.Sprintf("expected %s to be a str", field)) + } +} + func TestPostgresqlIgnoresUnwantedColumns(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") @@ -113,10 +223,9 @@ func TestPostgresqlIgnoresUnwantedColumns(t *testing.T) { } var acc testutil.Accumulator + require.NoError(t, acc.GatherError(p.Gather)) - err := acc.GatherError(p.Gather) - require.NoError(t, err) - + assert.NotEmpty(t, p.IgnoredColumns()) for col := range p.IgnoredColumns() { assert.False(t, acc.HasMeasurement(col)) } diff --git a/testutil/accumulator.go b/testutil/accumulator.go index fdca47b07..9a8eef9e3 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -409,7 +409,7 @@ func (a *Accumulator) HasStringField(measurement string, field string) bool { return false } -// HasUIntValue returns true if the measurement has a UInt value +// HasUIntField returns true if the measurement has a UInt value func (a *Accumulator) HasUIntField(measurement string, field string) bool { a.Lock() defer a.Unlock() @@ -427,7 +427,7 @@ func (a *Accumulator) HasUIntField(measurement string, field string) bool { return false } -// HasFloatValue returns true if the given measurement has a float value +// HasFloatField returns true if the given measurement has a float value func (a *Accumulator) HasFloatField(measurement string, field string) bool { a.Lock() defer a.Unlock() @@ -458,6 +458,7 @@ func (a *Accumulator) HasMeasurement(measurement string) bool { return false } +// IntField returns the int value of the given measurement and field or false. func (a *Accumulator) IntField(measurement string, field string) (int, bool) { a.Lock() defer a.Unlock() @@ -475,6 +476,43 @@ func (a *Accumulator) IntField(measurement string, field string) (int, bool) { return 0, false } +// Int64Field returns the int64 value of the given measurement and field or false. +func (a *Accumulator) Int64Field(measurement string, field string) (int64, bool) { + a.Lock() + defer a.Unlock() + for _, p := range a.Metrics { + if p.Measurement == measurement { + for fieldname, value := range p.Fields { + if fieldname == field { + v, ok := value.(int64) + return v, ok + } + } + } + } + + return 0, false +} + +// Int32Field returns the int32 value of the given measurement and field or false. +func (a *Accumulator) Int32Field(measurement string, field string) (int32, bool) { + a.Lock() + defer a.Unlock() + for _, p := range a.Metrics { + if p.Measurement == measurement { + for fieldname, value := range p.Fields { + if fieldname == field { + v, ok := value.(int32) + return v, ok + } + } + } + } + + return 0, false +} + +// FloatField returns the float64 value of the given measurement and field or false. func (a *Accumulator) FloatField(measurement string, field string) (float64, bool) { a.Lock() defer a.Unlock() @@ -492,6 +530,7 @@ func (a *Accumulator) FloatField(measurement string, field string) (float64, boo return 0.0, false } +// StringField returns the string value of the given measurement and field or false. func (a *Accumulator) StringField(measurement string, field string) (string, bool) { a.Lock() defer a.Unlock() @@ -507,3 +546,21 @@ func (a *Accumulator) StringField(measurement string, field string) (string, boo } return "", false } + +// BoolField returns the bool value of the given measurement and field or false. +func (a *Accumulator) BoolField(measurement string, field string) (bool, bool) { + a.Lock() + defer a.Unlock() + for _, p := range a.Metrics { + if p.Measurement == measurement { + for fieldname, value := range p.Fields { + if fieldname == field { + v, ok := value.(bool) + return v, ok + } + } + } + } + + return false, false +}