0.3.0 unit tests: jolokia, kafka_consumer, leofs, lustre2

This commit is contained in:
Cameron Sparr 2016-01-06 16:55:28 -07:00
parent 524fddedb4
commit 9ada89d51a
8 changed files with 75 additions and 97 deletions

View File

@ -69,6 +69,10 @@ func (ac *accumulator) AddFields(
tags map[string]string, tags map[string]string,
t ...time.Time, t ...time.Time,
) { ) {
if len(fields) == 0 || len(measurement) == 0 {
return
}
if !ac.pluginConfig.Filter.ShouldTagsPass(tags) { if !ac.pluginConfig.Filter.ShouldTagsPass(tags) {
return return
} }

View File

@ -75,7 +75,6 @@ func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer req.Body.Close()
resp, err := j.jClient.MakeRequest(req) resp, err := j.jClient.MakeRequest(req)
if err != nil { if err != nil {

View File

@ -48,7 +48,7 @@ const empty = ""
var Servers = []Server{Server{Name: "as1", Host: "127.0.0.1", Port: "8080"}} var Servers = []Server{Server{Name: "as1", Host: "127.0.0.1", Port: "8080"}}
var HeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage"} var HeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage"}
var UsedHeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage", Pass: []string{"used"}} var UsedHeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage"}
type jolokiaClientStub struct { type jolokiaClientStub struct {
responseBody string responseBody string
@ -79,7 +79,6 @@ func genJolokiaClientStub(response string, statusCode int, servers []Server, met
// Test that the proper values are ignored or collected // Test that the proper values are ignored or collected
func TestHttpJsonMultiValue(t *testing.T) { func TestHttpJsonMultiValue(t *testing.T) {
jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{HeapMetric}) jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{HeapMetric})
var acc testutil.Accumulator var acc testutil.Accumulator
@ -88,58 +87,28 @@ func TestHttpJsonMultiValue(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Points)) assert.Equal(t, 1, len(acc.Points))
assert.True(t, acc.CheckFieldsValue("heap_memory_usage", map[string]interface{}{"init": 67108864.0, fields := map[string]interface{}{
"committed": 456130560.0, "heap_memory_usage_init": 67108864.0,
"max": 477626368.0, "heap_memory_usage_committed": 456130560.0,
"used": 203288528.0})) "heap_memory_usage_max": 477626368.0,
} "heap_memory_usage_used": 203288528.0,
}
// Test that the proper values are ignored or collected tags := map[string]string{
func TestHttpJsonMultiValueWithPass(t *testing.T) { "host": "127.0.0.1",
"port": "8080",
jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{UsedHeapMetric}) "server": "as1",
}
var acc testutil.Accumulator acc.AssertContainsTaggedFields(t, "jolokia", fields, tags)
err := jolokia.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Points))
assert.True(t, acc.CheckFieldsValue("heap_memory_usage", map[string]interface{}{"used": 203288528.0}))
}
// Test that the proper values are ignored or collected
func TestHttpJsonMultiValueTags(t *testing.T) {
jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{UsedHeapMetric})
var acc testutil.Accumulator
err := jolokia.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Points))
assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage", map[string]interface{}{"used": 203288528.0}, map[string]string{"host": "127.0.0.1", "port": "8080", "server": "as1"}))
}
// Test that the proper values are ignored or collected
func TestHttpJsonSingleValueTags(t *testing.T) {
jolokia := genJolokiaClientStub(validSingleValueJSON, 200, Servers, []Metric{UsedHeapMetric})
var acc testutil.Accumulator
err := jolokia.Gather(&acc)
assert.Nil(t, err)
assert.Equal(t, 1, len(acc.Points))
assert.NoError(t, acc.ValidateTaggedFieldsValue("heap_memory_usage", map[string]interface{}{"value": 209274376.0}, map[string]string{"host": "127.0.0.1", "port": "8080", "server": "as1"}))
} }
// Test that the proper values are ignored or collected // Test that the proper values are ignored or collected
func TestHttpJsonOn404(t *testing.T) { func TestHttpJsonOn404(t *testing.T) {
jolokia := genJolokiaClientStub(validMultiValueJSON, 404, Servers, []Metric{UsedHeapMetric}) jolokia := genJolokiaClientStub(validMultiValueJSON, 404, Servers,
[]Metric{UsedHeapMetric})
var acc testutil.Accumulator var acc testutil.Accumulator
acc.SetDebug(true)
err := jolokia.Gather(&acc) err := jolokia.Gather(&acc)
assert.Nil(t, err) assert.Nil(t, err)

View File

@ -85,7 +85,8 @@ func TestRunParserAndGather(t *testing.T) {
k.Gather(&acc) k.Gather(&acc)
assert.Equal(t, len(acc.Points), 1) assert.Equal(t, len(acc.Points), 1)
assert.True(t, acc.CheckValue("cpu_load_short", 23422.0)) acc.AssertContainsFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(23422)})
} }
func saramaMsg(val string) *sarama.ConsumerMessage { func saramaMsg(val string) *sarama.ConsumerMessage {

View File

@ -129,7 +129,6 @@ func buildFakeSNMPCmd(src string) {
} }
func testMain(t *testing.T, code string, endpoint string, serverType ServerType) { func testMain(t *testing.T, code string, endpoint string, serverType ServerType) {
// Build the fake snmpwalk for test // Build the fake snmpwalk for test
src := makeFakeSNMPSrc(code) src := makeFakeSNMPSrc(code)
defer os.Remove(src) defer os.Remove(src)
@ -145,6 +144,7 @@ func testMain(t *testing.T, code string, endpoint string, serverType ServerType)
} }
var acc testutil.Accumulator var acc testutil.Accumulator
acc.SetDebug(true)
err := l.Gather(&acc) err := l.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
@ -152,7 +152,7 @@ func testMain(t *testing.T, code string, endpoint string, serverType ServerType)
floatMetrics := KeyMapping[serverType] floatMetrics := KeyMapping[serverType]
for _, metric := range floatMetrics { for _, metric := range floatMetrics {
assert.True(t, acc.HasFloatValue(metric), metric) assert.True(t, acc.HasFloatField("leofs", metric), metric)
} }
} }

View File

@ -22,6 +22,9 @@ import (
type Lustre2 struct { type Lustre2 struct {
Ost_procfiles []string Ost_procfiles []string
Mds_procfiles []string Mds_procfiles []string
// allFields maps and OST name to the metric fields associated with that OST
allFields map[string]map[string]interface{}
} }
var sampleConfig = ` var sampleConfig = `
@ -140,8 +143,11 @@ func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping,
*/ */
path := strings.Split(file, "/") path := strings.Split(file, "/")
name := path[len(path)-2] name := path[len(path)-2]
tags := map[string]string{ var fields map[string]interface{}
"name": name, fields, ok := l.allFields[name]
if !ok {
fields = make(map[string]interface{})
l.allFields[name] = fields
} }
lines, err := internal.ReadLines(file) lines, err := internal.ReadLines(file)
@ -149,7 +155,6 @@ func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping,
return err return err
} }
fields := make(map[string]interface{})
for _, line := range lines { for _, line := range lines {
parts := strings.Fields(line) parts := strings.Fields(line)
for _, wanted := range wanted_fields { for _, wanted := range wanted_fields {
@ -173,7 +178,6 @@ func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping,
} }
} }
} }
acc.AddFields("lustre2", fields, tags)
} }
return nil return nil
} }
@ -190,15 +194,18 @@ func (l *Lustre2) Description() string {
// Gather reads stats from all lustre targets // Gather reads stats from all lustre targets
func (l *Lustre2) Gather(acc plugins.Accumulator) error { func (l *Lustre2) Gather(acc plugins.Accumulator) error {
l.allFields = make(map[string]map[string]interface{})
if len(l.Ost_procfiles) == 0 { if len(l.Ost_procfiles) == 0 {
// read/write bytes are in obdfilter/<ost_name>/stats // read/write bytes are in obdfilter/<ost_name>/stats
err := l.GetLustreProcStats("/proc/fs/lustre/obdfilter/*/stats", wanted_ost_fields, acc) err := l.GetLustreProcStats("/proc/fs/lustre/obdfilter/*/stats",
wanted_ost_fields, acc)
if err != nil { if err != nil {
return err return err
} }
// cache counters are in osd-ldiskfs/<ost_name>/stats // cache counters are in osd-ldiskfs/<ost_name>/stats
err = l.GetLustreProcStats("/proc/fs/lustre/osd-ldiskfs/*/stats", wanted_ost_fields, acc) err = l.GetLustreProcStats("/proc/fs/lustre/osd-ldiskfs/*/stats",
wanted_ost_fields, acc)
if err != nil { if err != nil {
return err return err
} }
@ -206,7 +213,8 @@ func (l *Lustre2) Gather(acc plugins.Accumulator) error {
if len(l.Mds_procfiles) == 0 { if len(l.Mds_procfiles) == 0 {
// Metadata server stats // Metadata server stats
err := l.GetLustreProcStats("/proc/fs/lustre/mdt/*/md_stats", wanted_mds_fields, acc) err := l.GetLustreProcStats("/proc/fs/lustre/mdt/*/md_stats",
wanted_mds_fields, acc)
if err != nil { if err != nil {
return err return err
} }
@ -225,6 +233,13 @@ func (l *Lustre2) Gather(acc plugins.Accumulator) error {
} }
} }
for name, fields := range l.allFields {
tags := map[string]string{
"name": name,
}
acc.AddFields("lustre2", fields, tags)
}
return nil return nil
} }

View File

@ -6,7 +6,6 @@ import (
"testing" "testing"
"github.com/influxdb/telegraf/testutil" "github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -58,11 +57,6 @@ samedir_rename 259625 samples [reqs]
crossdir_rename 369571 samples [reqs] crossdir_rename 369571 samples [reqs]
` `
type metrics struct {
name string
value uint64
}
func TestLustre2GeneratesMetrics(t *testing.T) { func TestLustre2GeneratesMetrics(t *testing.T) {
tempdir := os.TempDir() + "/telegraf/proc/fs/lustre/" tempdir := os.TempDir() + "/telegraf/proc/fs/lustre/"
@ -103,41 +97,33 @@ func TestLustre2GeneratesMetrics(t *testing.T) {
"name": ost_name, "name": ost_name,
} }
intMetrics := []*metrics{ fields := map[string]interface{}{
{ "cache_access": uint64(19047063027),
name: "write_bytes", "cache_hit": uint64(7393729777),
value: 15201500833981, "cache_miss": uint64(11653333250),
}, "close": uint64(873243496),
{ "crossdir_rename": uint64(369571),
name: "read_bytes", "getattr": uint64(1503663097),
value: 78026117632000, "getxattr": uint64(6145349681),
}, "link": uint64(445),
{ "mkdir": uint64(705499),
name: "write_calls", "mknod": uint64(349042),
value: 71893382, "open": uint64(1024577037),
}, "read_bytes": uint64(78026117632000),
{ "read_calls": uint64(203238095),
name: "read_calls", "rename": uint64(629196),
value: 203238095, "rmdir": uint64(227434),
}, "samedir_rename": uint64(259625),
{ "setattr": uint64(1898364),
name: "cache_hit", "setxattr": uint64(83969),
value: 7393729777, "statfs": uint64(2916320),
}, "sync": uint64(434081),
{ "unlink": uint64(3549417),
name: "cache_access", "write_bytes": uint64(15201500833981),
value: 19047063027, "write_calls": uint64(71893382),
},
{
name: "cache_miss",
value: 11653333250,
},
} }
for _, metric := range intMetrics { acc.AssertContainsTaggedFields(t, "lustre2", fields, tags)
assert.True(t, acc.HasUIntValue(metric.name), metric.name)
assert.True(t, acc.CheckTaggedValue(metric.name, metric.value, tags))
}
err = os.RemoveAll(os.TempDir() + "/telegraf") err = os.RemoveAll(os.TempDir() + "/telegraf")
require.NoError(t, err) require.NoError(t, err)

View File

@ -55,6 +55,10 @@ func (a *Accumulator) AddFields(
tags = map[string]string{} tags = map[string]string{}
} }
if len(fields) == 0 {
return
}
var t time.Time var t time.Time
if len(timestamp) > 0 { if len(timestamp) > 0 {
t = timestamp[0] t = timestamp[0]