Add additional numeric type handling tests for postgresql_extensible (#3066)
This commit is contained in:
parent
063f3f68df
commit
837e6b1a32
|
@ -231,8 +231,12 @@ func (p *Postgresql) SanitizedAddress() (_ string, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumulator) error {
|
func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumulator) error {
|
||||||
var columnVars []interface{}
|
var (
|
||||||
var dbname bytes.Buffer
|
err error
|
||||||
|
columnVars []interface{}
|
||||||
|
dbname bytes.Buffer
|
||||||
|
tagAddress string
|
||||||
|
)
|
||||||
|
|
||||||
// this is where we'll store the column name with its *interface{}
|
// this is where we'll store the column name with its *interface{}
|
||||||
columnMap := make(map[string]*interface{})
|
columnMap := make(map[string]*interface{})
|
||||||
|
@ -247,11 +251,10 @@ func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumula
|
||||||
}
|
}
|
||||||
|
|
||||||
// deconstruct array of variables and send to Scan
|
// deconstruct array of variables and send to Scan
|
||||||
err := row.Scan(columnVars...)
|
if err = row.Scan(columnVars...); err != nil {
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if columnMap["datname"] != nil {
|
if columnMap["datname"] != nil {
|
||||||
// extract the database name from the column map
|
// extract the database name from the column map
|
||||||
dbname.WriteString((*columnMap["datname"]).(string))
|
dbname.WriteString((*columnMap["datname"]).(string))
|
||||||
|
@ -259,17 +262,16 @@ func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumula
|
||||||
dbname.WriteString("postgres")
|
dbname.WriteString("postgres")
|
||||||
}
|
}
|
||||||
|
|
||||||
var tagAddress string
|
if tagAddress, err = p.SanitizedAddress(); err != nil {
|
||||||
tagAddress, err = p.SanitizedAddress()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process the additional tags
|
// Process the additional tags
|
||||||
|
tags := map[string]string{
|
||||||
|
"server": tagAddress,
|
||||||
|
"db": dbname.String(),
|
||||||
|
}
|
||||||
|
|
||||||
tags := map[string]string{}
|
|
||||||
tags["server"] = tagAddress
|
|
||||||
tags["db"] = dbname.String()
|
|
||||||
fields := make(map[string]interface{})
|
fields := make(map[string]interface{})
|
||||||
COLUMN:
|
COLUMN:
|
||||||
for col, val := range columnMap {
|
for col, val := range columnMap {
|
||||||
|
@ -295,6 +297,7 @@ COLUMN:
|
||||||
}
|
}
|
||||||
continue COLUMN
|
continue COLUMN
|
||||||
}
|
}
|
||||||
|
|
||||||
if v, ok := (*val).([]byte); ok {
|
if v, ok := (*val).([]byte); ok {
|
||||||
fields[col] = string(v)
|
fields[col] = string(v)
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -9,25 +9,30 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func queryRunner(t *testing.T, q query) (*Postgresql, *testutil.Accumulator) {
|
||||||
|
p := &Postgresql{
|
||||||
|
Address: fmt.Sprintf("host=%s user=postgres sslmode=disable",
|
||||||
|
testutil.GetLocalHost()),
|
||||||
|
Databases: []string{"postgres"},
|
||||||
|
Query: q,
|
||||||
|
}
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
require.NoError(t, acc.GatherError(p.Gather))
|
||||||
|
return p, &acc
|
||||||
|
}
|
||||||
|
|
||||||
func TestPostgresqlGeneratesMetrics(t *testing.T) {
|
func TestPostgresqlGeneratesMetrics(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("Skipping integration test in short mode")
|
t.Skip("Skipping integration test in short mode")
|
||||||
}
|
}
|
||||||
|
|
||||||
p := &Postgresql{
|
p, acc := queryRunner(t, query{{
|
||||||
Address: fmt.Sprintf("host=%s user=postgres sslmode=disable",
|
Sqlquery: "select * from pg_stat_database",
|
||||||
testutil.GetLocalHost()),
|
|
||||||
Databases: []string{"postgres"},
|
|
||||||
Query: query{
|
|
||||||
{Sqlquery: "select * from pg_stat_database",
|
|
||||||
Version: 901,
|
Version: 901,
|
||||||
Withdbname: false,
|
Withdbname: false,
|
||||||
Tagvalue: ""},
|
Tagvalue: "",
|
||||||
},
|
}})
|
||||||
}
|
|
||||||
var acc testutil.Accumulator
|
|
||||||
err := acc.GatherError(p.Gather)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
availableColumns := make(map[string]bool)
|
availableColumns := make(map[string]bool)
|
||||||
for _, col := range p.AllColumns {
|
for _, col := range p.AllColumns {
|
||||||
|
@ -102,6 +107,111 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
|
||||||
assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted)
|
assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPostgresqlQueryOutputTests(t *testing.T) {
|
||||||
|
const measurement = "postgresql"
|
||||||
|
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
examples := map[string]func(*testutil.Accumulator){
|
||||||
|
"SELECT 10.0::float AS myvalue": func(acc *testutil.Accumulator) {
|
||||||
|
v, found := acc.FloatField(measurement, "myvalue")
|
||||||
|
assert.True(t, found)
|
||||||
|
assert.Equal(t, 10.0, v)
|
||||||
|
},
|
||||||
|
"SELECT 10.0 AS myvalue": func(acc *testutil.Accumulator) {
|
||||||
|
v, found := acc.StringField(measurement, "myvalue")
|
||||||
|
assert.True(t, found)
|
||||||
|
assert.Equal(t, "10.0", v)
|
||||||
|
},
|
||||||
|
"SELECT 'hello world' AS myvalue": func(acc *testutil.Accumulator) {
|
||||||
|
v, found := acc.StringField(measurement, "myvalue")
|
||||||
|
assert.True(t, found)
|
||||||
|
assert.Equal(t, "hello world", v)
|
||||||
|
},
|
||||||
|
"SELECT true AS myvalue": func(acc *testutil.Accumulator) {
|
||||||
|
v, found := acc.BoolField(measurement, "myvalue")
|
||||||
|
assert.True(t, found)
|
||||||
|
assert.Equal(t, true, v)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for q, assertions := range examples {
|
||||||
|
_, acc := queryRunner(t, query{{
|
||||||
|
Sqlquery: q,
|
||||||
|
Version: 901,
|
||||||
|
Withdbname: false,
|
||||||
|
Tagvalue: "",
|
||||||
|
}})
|
||||||
|
assertions(acc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPostgresqlFieldOutput(t *testing.T) {
|
||||||
|
const measurement = "postgresql"
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, acc := queryRunner(t, query{{
|
||||||
|
Sqlquery: "select * from pg_stat_database",
|
||||||
|
Version: 901,
|
||||||
|
Withdbname: false,
|
||||||
|
Tagvalue: "",
|
||||||
|
}})
|
||||||
|
|
||||||
|
intMetrics := []string{
|
||||||
|
"xact_commit",
|
||||||
|
"xact_rollback",
|
||||||
|
"blks_read",
|
||||||
|
"blks_hit",
|
||||||
|
"tup_returned",
|
||||||
|
"tup_fetched",
|
||||||
|
"tup_inserted",
|
||||||
|
"tup_updated",
|
||||||
|
"tup_deleted",
|
||||||
|
"conflicts",
|
||||||
|
"temp_files",
|
||||||
|
"temp_bytes",
|
||||||
|
"deadlocks",
|
||||||
|
}
|
||||||
|
|
||||||
|
int32Metrics := []string{
|
||||||
|
"numbackends",
|
||||||
|
}
|
||||||
|
|
||||||
|
floatMetrics := []string{
|
||||||
|
"blk_read_time",
|
||||||
|
"blk_write_time",
|
||||||
|
}
|
||||||
|
|
||||||
|
stringMetrics := []string{
|
||||||
|
"datname",
|
||||||
|
"datid",
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, field := range intMetrics {
|
||||||
|
_, found := acc.Int64Field(measurement, field)
|
||||||
|
assert.True(t, found, fmt.Sprintf("expected %s to be an integer", field))
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, field := range int32Metrics {
|
||||||
|
_, found := acc.Int32Field(measurement, field)
|
||||||
|
assert.True(t, found, fmt.Sprintf("expected %s to be an int32", field))
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, field := range floatMetrics {
|
||||||
|
_, found := acc.FloatField(measurement, field)
|
||||||
|
assert.True(t, found, fmt.Sprintf("expected %s to be a float64", field))
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, field := range stringMetrics {
|
||||||
|
_, found := acc.StringField(measurement, field)
|
||||||
|
assert.True(t, found, fmt.Sprintf("expected %s to be a str", field))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestPostgresqlIgnoresUnwantedColumns(t *testing.T) {
|
func TestPostgresqlIgnoresUnwantedColumns(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("Skipping integration test in short mode")
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
@ -113,10 +223,9 @@ func TestPostgresqlIgnoresUnwantedColumns(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
|
require.NoError(t, acc.GatherError(p.Gather))
|
||||||
|
|
||||||
err := acc.GatherError(p.Gather)
|
assert.NotEmpty(t, p.IgnoredColumns())
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
for col := range p.IgnoredColumns() {
|
for col := range p.IgnoredColumns() {
|
||||||
assert.False(t, acc.HasMeasurement(col))
|
assert.False(t, acc.HasMeasurement(col))
|
||||||
}
|
}
|
||||||
|
|
|
@ -409,7 +409,7 @@ func (a *Accumulator) HasStringField(measurement string, field string) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasUIntValue returns true if the measurement has a UInt value
|
// HasUIntField returns true if the measurement has a UInt value
|
||||||
func (a *Accumulator) HasUIntField(measurement string, field string) bool {
|
func (a *Accumulator) HasUIntField(measurement string, field string) bool {
|
||||||
a.Lock()
|
a.Lock()
|
||||||
defer a.Unlock()
|
defer a.Unlock()
|
||||||
|
@ -427,7 +427,7 @@ func (a *Accumulator) HasUIntField(measurement string, field string) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasFloatValue returns true if the given measurement has a float value
|
// HasFloatField returns true if the given measurement has a float value
|
||||||
func (a *Accumulator) HasFloatField(measurement string, field string) bool {
|
func (a *Accumulator) HasFloatField(measurement string, field string) bool {
|
||||||
a.Lock()
|
a.Lock()
|
||||||
defer a.Unlock()
|
defer a.Unlock()
|
||||||
|
@ -458,6 +458,7 @@ func (a *Accumulator) HasMeasurement(measurement string) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IntField returns the int value of the given measurement and field or false.
|
||||||
func (a *Accumulator) IntField(measurement string, field string) (int, bool) {
|
func (a *Accumulator) IntField(measurement string, field string) (int, bool) {
|
||||||
a.Lock()
|
a.Lock()
|
||||||
defer a.Unlock()
|
defer a.Unlock()
|
||||||
|
@ -475,6 +476,43 @@ func (a *Accumulator) IntField(measurement string, field string) (int, bool) {
|
||||||
return 0, false
|
return 0, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Int64Field returns the int64 value of the given measurement and field or false.
|
||||||
|
func (a *Accumulator) Int64Field(measurement string, field string) (int64, bool) {
|
||||||
|
a.Lock()
|
||||||
|
defer a.Unlock()
|
||||||
|
for _, p := range a.Metrics {
|
||||||
|
if p.Measurement == measurement {
|
||||||
|
for fieldname, value := range p.Fields {
|
||||||
|
if fieldname == field {
|
||||||
|
v, ok := value.(int64)
|
||||||
|
return v, ok
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Int32Field returns the int32 value of the given measurement and field or false.
|
||||||
|
func (a *Accumulator) Int32Field(measurement string, field string) (int32, bool) {
|
||||||
|
a.Lock()
|
||||||
|
defer a.Unlock()
|
||||||
|
for _, p := range a.Metrics {
|
||||||
|
if p.Measurement == measurement {
|
||||||
|
for fieldname, value := range p.Fields {
|
||||||
|
if fieldname == field {
|
||||||
|
v, ok := value.(int32)
|
||||||
|
return v, ok
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// FloatField returns the float64 value of the given measurement and field or false.
|
||||||
func (a *Accumulator) FloatField(measurement string, field string) (float64, bool) {
|
func (a *Accumulator) FloatField(measurement string, field string) (float64, bool) {
|
||||||
a.Lock()
|
a.Lock()
|
||||||
defer a.Unlock()
|
defer a.Unlock()
|
||||||
|
@ -492,6 +530,7 @@ func (a *Accumulator) FloatField(measurement string, field string) (float64, boo
|
||||||
return 0.0, false
|
return 0.0, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StringField returns the string value of the given measurement and field or false.
|
||||||
func (a *Accumulator) StringField(measurement string, field string) (string, bool) {
|
func (a *Accumulator) StringField(measurement string, field string) (string, bool) {
|
||||||
a.Lock()
|
a.Lock()
|
||||||
defer a.Unlock()
|
defer a.Unlock()
|
||||||
|
@ -507,3 +546,21 @@ func (a *Accumulator) StringField(measurement string, field string) (string, boo
|
||||||
}
|
}
|
||||||
return "", false
|
return "", false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BoolField returns the bool value of the given measurement and field or false.
|
||||||
|
func (a *Accumulator) BoolField(measurement string, field string) (bool, bool) {
|
||||||
|
a.Lock()
|
||||||
|
defer a.Unlock()
|
||||||
|
for _, p := range a.Metrics {
|
||||||
|
if p.Measurement == measurement {
|
||||||
|
for fieldname, value := range p.Fields {
|
||||||
|
if fieldname == field {
|
||||||
|
v, ok := value.(bool)
|
||||||
|
return v, ok
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false, false
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue