remove sleep from tests (#2555)

This commit is contained in:
Patrick Hemmer 2017-03-24 15:03:36 -04:00 committed by Daniel Nelson
parent 6588c4a1a7
commit c65cfb6a6e
21 changed files with 252 additions and 270 deletions

View File

@ -207,14 +207,13 @@ func TestGenerateStatisticsInputParams(t *testing.T) {
} }
func TestMetricsCacheTimeout(t *testing.T) { func TestMetricsCacheTimeout(t *testing.T) {
ttl, _ := time.ParseDuration("5ms")
cache := &MetricCache{ cache := &MetricCache{
Metrics: []*cloudwatch.Metric{}, Metrics: []*cloudwatch.Metric{},
Fetched: time.Now(), Fetched: time.Now(),
TTL: ttl, TTL: time.Minute,
} }
assert.True(t, cache.IsValid()) assert.True(t, cache.IsValid())
time.Sleep(ttl) cache.Fetched = time.Now().Add(-time.Minute)
assert.False(t, cache.IsValid()) assert.False(t, cache.IsValid())
} }

View File

@ -6,7 +6,6 @@ import (
"net/http" "net/http"
"sync" "sync"
"testing" "testing"
"time"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@ -43,14 +42,12 @@ func TestWriteHTTP(t *testing.T) {
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
time.Sleep(time.Millisecond * 25)
// post single message to listener // post single message to listener
resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsg))) resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsg)))
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode) require.EqualValues(t, 204, resp.StatusCode)
time.Sleep(time.Millisecond * 15) acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu_load_short", acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)}, map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"}, map[string]string{"host": "server01"},
@ -61,7 +58,7 @@ func TestWriteHTTP(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode) require.EqualValues(t, 204, resp.StatusCode)
time.Sleep(time.Millisecond * 15) acc.Wait(2)
hostTags := []string{"server02", "server03", hostTags := []string{"server02", "server03",
"server04", "server05", "server06"} "server04", "server05", "server06"}
for _, hostTag := range hostTags { for _, hostTag := range hostTags {
@ -76,7 +73,7 @@ func TestWriteHTTP(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, 400, resp.StatusCode) require.EqualValues(t, 400, resp.StatusCode)
time.Sleep(time.Millisecond * 15) acc.Wait(3)
acc.AssertContainsTaggedFields(t, "cpu_load_short", acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)}, map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"}, map[string]string{"host": "server01"},
@ -91,14 +88,12 @@ func TestWriteHTTPNoNewline(t *testing.T) {
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
time.Sleep(time.Millisecond * 25)
// post single message to listener // post single message to listener
resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsgNoNewline))) resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsgNoNewline)))
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode) require.EqualValues(t, 204, resp.StatusCode)
time.Sleep(time.Millisecond * 15) acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu_load_short", acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)}, map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"}, map[string]string{"host": "server01"},
@ -115,8 +110,6 @@ func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) {
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
time.Sleep(time.Millisecond * 25)
// Post a gigantic metric to the listener and verify that it writes OK this time: // Post a gigantic metric to the listener and verify that it writes OK this time:
resp, err := http.Post("http://localhost:8296/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric))) resp, err := http.Post("http://localhost:8296/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric)))
require.NoError(t, err) require.NoError(t, err)
@ -133,8 +126,6 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) {
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
time.Sleep(time.Millisecond * 25)
resp, err := http.Post("http://localhost:8297/write", "", bytes.NewBuffer([]byte(hugeMetric))) resp, err := http.Post("http://localhost:8297/write", "", bytes.NewBuffer([]byte(hugeMetric)))
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, 413, resp.StatusCode) require.EqualValues(t, 413, resp.StatusCode)
@ -150,15 +141,13 @@ func TestWriteHTTPVerySmallMaxLineSize(t *testing.T) {
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
time.Sleep(time.Millisecond * 25)
resp, err := http.Post("http://localhost:8298/write", "", bytes.NewBuffer([]byte(testMsgs))) resp, err := http.Post("http://localhost:8298/write", "", bytes.NewBuffer([]byte(testMsgs)))
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode) require.EqualValues(t, 204, resp.StatusCode)
time.Sleep(time.Millisecond * 15)
hostTags := []string{"server02", "server03", hostTags := []string{"server02", "server03",
"server04", "server05", "server06"} "server04", "server05", "server06"}
acc.Wait(len(hostTags))
for _, hostTag := range hostTags { for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short", acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)}, map[string]interface{}{"value": float64(12)},
@ -177,15 +166,13 @@ func TestWriteHTTPLargeLinesSkipped(t *testing.T) {
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
time.Sleep(time.Millisecond * 25)
resp, err := http.Post("http://localhost:8300/write", "", bytes.NewBuffer([]byte(hugeMetric+testMsgs))) resp, err := http.Post("http://localhost:8300/write", "", bytes.NewBuffer([]byte(hugeMetric+testMsgs)))
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, 400, resp.StatusCode) require.EqualValues(t, 400, resp.StatusCode)
time.Sleep(time.Millisecond * 15)
hostTags := []string{"server02", "server03", hostTags := []string{"server02", "server03",
"server04", "server05", "server06"} "server04", "server05", "server06"}
acc.Wait(len(hostTags))
for _, hostTag := range hostTags { for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short", acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)}, map[string]interface{}{"value": float64(12)},
@ -204,8 +191,6 @@ func TestWriteHTTPGzippedData(t *testing.T) {
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
time.Sleep(time.Millisecond * 25)
data, err := ioutil.ReadFile("./testdata/testmsgs.gz") data, err := ioutil.ReadFile("./testdata/testmsgs.gz")
require.NoError(t, err) require.NoError(t, err)
@ -218,9 +203,9 @@ func TestWriteHTTPGzippedData(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode) require.EqualValues(t, 204, resp.StatusCode)
time.Sleep(time.Millisecond * 50)
hostTags := []string{"server02", "server03", hostTags := []string{"server02", "server03",
"server04", "server05", "server06"} "server04", "server05", "server06"}
acc.Wait(len(hostTags))
for _, hostTag := range hostTags { for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short", acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)}, map[string]interface{}{"value": float64(12)},
@ -237,8 +222,6 @@ func TestWriteHTTPHighTraffic(t *testing.T) {
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
time.Sleep(time.Millisecond * 25)
// post many messages to listener // post many messages to listener
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
@ -254,9 +237,9 @@ func TestWriteHTTPHighTraffic(t *testing.T) {
} }
wg.Wait() wg.Wait()
time.Sleep(time.Millisecond * 250)
listener.Gather(acc) listener.Gather(acc)
acc.Wait(25000)
require.Equal(t, int64(25000), int64(acc.NMetrics())) require.Equal(t, int64(25000), int64(acc.NMetrics()))
} }
@ -267,8 +250,6 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) {
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
time.Sleep(time.Millisecond * 25)
// post single message to listener // post single message to listener
resp, err := http.Post("http://localhost:8186/foobar", "", bytes.NewBuffer([]byte(testMsg))) resp, err := http.Post("http://localhost:8186/foobar", "", bytes.NewBuffer([]byte(testMsg)))
require.NoError(t, err) require.NoError(t, err)
@ -276,16 +257,12 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) {
} }
func TestWriteHTTPInvalid(t *testing.T) { func TestWriteHTTPInvalid(t *testing.T) {
time.Sleep(time.Millisecond * 250)
listener := newTestHTTPListener() listener := newTestHTTPListener()
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
time.Sleep(time.Millisecond * 25)
// post single message to listener // post single message to listener
resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(badMsg))) resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(badMsg)))
require.NoError(t, err) require.NoError(t, err)
@ -293,16 +270,12 @@ func TestWriteHTTPInvalid(t *testing.T) {
} }
func TestWriteHTTPEmpty(t *testing.T) { func TestWriteHTTPEmpty(t *testing.T) {
time.Sleep(time.Millisecond * 250)
listener := newTestHTTPListener() listener := newTestHTTPListener()
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
time.Sleep(time.Millisecond * 25)
// post single message to listener // post single message to listener
resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(emptyMsg))) resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(emptyMsg)))
require.NoError(t, err) require.NoError(t, err)
@ -310,16 +283,12 @@ func TestWriteHTTPEmpty(t *testing.T) {
} }
func TestQueryAndPingHTTP(t *testing.T) { func TestQueryAndPingHTTP(t *testing.T) {
time.Sleep(time.Millisecond * 250)
listener := newTestHTTPListener() listener := newTestHTTPListener()
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
time.Sleep(time.Millisecond * 25)
// post query to listener // post query to listener
resp, err := http.Post("http://localhost:8186/query?db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22", "", nil) resp, err := http.Post("http://localhost:8186/query?db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22", "", nil)
require.NoError(t, err) require.NoError(t, err)

View File

@ -329,7 +329,7 @@ func TestTimeout(t *testing.T) {
Address: ts.URL + "/twosecondnap", Address: ts.URL + "/twosecondnap",
Body: "{ 'test': 'data'}", Body: "{ 'test': 'data'}",
Method: "GET", Method: "GET",
ResponseTimeout: internal.Duration{Duration: time.Second * 1}, ResponseTimeout: internal.Duration{Duration: time.Millisecond},
Headers: map[string]string{ Headers: map[string]string{
"Content-Type": "application/json", "Content-Type": "application/json",
}, },

View File

@ -1,6 +1,7 @@
package kafka_consumer package kafka_consumer
import ( import (
"fmt"
"log" "log"
"strings" "strings"
"sync" "sync"
@ -129,13 +130,13 @@ func (k *Kafka) receiver() {
return return
case err := <-k.errs: case err := <-k.errs:
if err != nil { if err != nil {
log.Printf("E! Kafka Consumer Error: %s\n", err) k.acc.AddError(fmt.Errorf("Kafka Consumer Error: %s\n", err))
} }
case msg := <-k.in: case msg := <-k.in:
metrics, err := k.parser.Parse(msg.Value) metrics, err := k.parser.Parse(msg.Value)
if err != nil { if err != nil {
log.Printf("E! Kafka Message Parse Error\nmessage: %s\nerror: %s", k.acc.AddError(fmt.Errorf("E! Kafka Message Parse Error\nmessage: %s\nerror: %s",
string(msg.Value), err.Error()) string(msg.Value), err.Error()))
} }
for _, metric := range metrics { for _, metric := range metrics {
@ -158,7 +159,7 @@ func (k *Kafka) Stop() {
defer k.Unlock() defer k.Unlock()
close(k.done) close(k.done)
if err := k.Consumer.Close(); err != nil { if err := k.Consumer.Close(); err != nil {
log.Printf("E! Error closing kafka consumer: %s\n", err.Error()) k.acc.AddError(fmt.Errorf("E! Error closing kafka consumer: %s\n", err.Error()))
} }
} }

View File

@ -2,7 +2,6 @@ package kafka_consumer
import ( import (
"testing" "testing"
"time"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@ -43,7 +42,7 @@ func TestRunParser(t *testing.T) {
k.parser, _ = parsers.NewInfluxParser() k.parser, _ = parsers.NewInfluxParser()
go k.receiver() go k.receiver()
in <- saramaMsg(testMsg) in <- saramaMsg(testMsg)
time.Sleep(time.Millisecond * 5) acc.Wait(1)
assert.Equal(t, acc.NFields(), 1) assert.Equal(t, acc.NFields(), 1)
} }
@ -58,7 +57,7 @@ func TestRunParserInvalidMsg(t *testing.T) {
k.parser, _ = parsers.NewInfluxParser() k.parser, _ = parsers.NewInfluxParser()
go k.receiver() go k.receiver()
in <- saramaMsg(invalidMsg) in <- saramaMsg(invalidMsg)
time.Sleep(time.Millisecond * 5) acc.WaitError(1)
assert.Equal(t, acc.NFields(), 0) assert.Equal(t, acc.NFields(), 0)
} }
@ -73,7 +72,7 @@ func TestRunParserAndGather(t *testing.T) {
k.parser, _ = parsers.NewInfluxParser() k.parser, _ = parsers.NewInfluxParser()
go k.receiver() go k.receiver()
in <- saramaMsg(testMsg) in <- saramaMsg(testMsg)
time.Sleep(time.Millisecond * 5) acc.Wait(1)
k.Gather(&acc) k.Gather(&acc)
@ -92,7 +91,7 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
go k.receiver() go k.receiver()
in <- saramaMsg(testMsgGraphite) in <- saramaMsg(testMsgGraphite)
time.Sleep(time.Millisecond * 5) acc.Wait(1)
k.Gather(&acc) k.Gather(&acc)
@ -111,7 +110,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil) k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
go k.receiver() go k.receiver()
in <- saramaMsg(testMsgJSON) in <- saramaMsg(testMsgJSON)
time.Sleep(time.Millisecond * 5) acc.Wait(1)
k.Gather(&acc) k.Gather(&acc)

View File

@ -6,7 +6,6 @@ import (
"runtime" "runtime"
"strings" "strings"
"testing" "testing"
"time"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@ -41,7 +40,6 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
assert.Error(t, logparser.Start(&acc)) assert.Error(t, logparser.Start(&acc))
time.Sleep(time.Millisecond * 500)
logparser.Stop() logparser.Stop()
} }
@ -61,7 +59,8 @@ func TestGrokParseLogFiles(t *testing.T) {
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
assert.NoError(t, logparser.Start(&acc)) assert.NoError(t, logparser.Start(&acc))
time.Sleep(time.Millisecond * 500) acc.Wait(2)
logparser.Stop() logparser.Stop()
acc.AssertContainsTaggedFields(t, "logparser_grok", acc.AssertContainsTaggedFields(t, "logparser_grok",
@ -102,14 +101,13 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) {
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
assert.NoError(t, logparser.Start(&acc)) assert.NoError(t, logparser.Start(&acc))
time.Sleep(time.Millisecond * 500)
assert.Equal(t, acc.NFields(), 0) assert.Equal(t, acc.NFields(), 0)
os.Symlink( os.Symlink(
thisdir+"grok/testdata/test_a.log", thisdir+"grok/testdata/test_a.log",
emptydir+"/test_a.log") emptydir+"/test_a.log")
assert.NoError(t, logparser.Gather(&acc)) assert.NoError(t, logparser.Gather(&acc))
time.Sleep(time.Millisecond * 500) acc.Wait(1)
logparser.Stop() logparser.Stop()
@ -143,7 +141,7 @@ func TestGrokParseLogFilesOneBad(t *testing.T) {
acc.SetDebug(true) acc.SetDebug(true)
assert.NoError(t, logparser.Start(&acc)) assert.NoError(t, logparser.Start(&acc))
time.Sleep(time.Millisecond * 500) acc.Wait(1)
logparser.Stop() logparser.Stop()
acc.AssertContainsTaggedFields(t, "logparser_grok", acc.AssertContainsTaggedFields(t, "logparser_grok",

View File

@ -4,7 +4,6 @@ package mongodb
import ( import (
"testing" "testing"
"time"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -32,12 +31,11 @@ func TestAddDefaultStats(t *testing.T) {
err := server.gatherData(&acc, false) err := server.gatherData(&acc, false)
require.NoError(t, err) require.NoError(t, err)
time.Sleep(time.Duration(1) * time.Second)
// need to call this twice so it can perform the diff // need to call this twice so it can perform the diff
err = server.gatherData(&acc, false) err = server.gatherData(&acc, false)
require.NoError(t, err) require.NoError(t, err)
for key, _ := range DefaultStats { for key, _ := range DefaultStats {
assert.True(t, acc.HasIntValue(key)) assert.True(t, acc.HasIntField("mongodb", key))
} }
} }

View File

@ -142,8 +142,8 @@ func (m *MQTTConsumer) onConnect(c mqtt.Client) {
subscribeToken := c.SubscribeMultiple(topics, m.recvMessage) subscribeToken := c.SubscribeMultiple(topics, m.recvMessage)
subscribeToken.Wait() subscribeToken.Wait()
if subscribeToken.Error() != nil { if subscribeToken.Error() != nil {
log.Printf("E! MQTT Subscribe Error\ntopics: %s\nerror: %s", m.acc.AddError(fmt.Errorf("E! MQTT Subscribe Error\ntopics: %s\nerror: %s",
strings.Join(m.Topics[:], ","), subscribeToken.Error()) strings.Join(m.Topics[:], ","), subscribeToken.Error()))
} }
m.started = true m.started = true
} }
@ -151,7 +151,7 @@ func (m *MQTTConsumer) onConnect(c mqtt.Client) {
} }
func (m *MQTTConsumer) onConnectionLost(c mqtt.Client, err error) { func (m *MQTTConsumer) onConnectionLost(c mqtt.Client, err error) {
log.Printf("E! MQTT Connection lost\nerror: %s\nMQTT Client will try to reconnect", err.Error()) m.acc.AddError(fmt.Errorf("E! MQTT Connection lost\nerror: %s\nMQTT Client will try to reconnect", err.Error()))
return return
} }
@ -166,8 +166,8 @@ func (m *MQTTConsumer) receiver() {
topic := msg.Topic() topic := msg.Topic()
metrics, err := m.parser.Parse(msg.Payload()) metrics, err := m.parser.Parse(msg.Payload())
if err != nil { if err != nil {
log.Printf("E! MQTT Parse Error\nmessage: %s\nerror: %s", m.acc.AddError(fmt.Errorf("E! MQTT Parse Error\nmessage: %s\nerror: %s",
string(msg.Payload()), err.Error()) string(msg.Payload()), err.Error()))
} }
for _, metric := range metrics { for _, metric := range metrics {

View File

@ -2,7 +2,6 @@ package mqtt_consumer
import ( import (
"testing" "testing"
"time"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@ -86,7 +85,7 @@ func TestRunParser(t *testing.T) {
n.parser, _ = parsers.NewInfluxParser() n.parser, _ = parsers.NewInfluxParser()
go n.receiver() go n.receiver()
in <- mqttMsg(testMsgNeg) in <- mqttMsg(testMsgNeg)
time.Sleep(time.Millisecond * 250) acc.Wait(1)
if a := acc.NFields(); a != 1 { if a := acc.NFields(); a != 1 {
t.Errorf("got %v, expected %v", a, 1) t.Errorf("got %v, expected %v", a, 1)
@ -102,7 +101,7 @@ func TestRunParserNegativeNumber(t *testing.T) {
n.parser, _ = parsers.NewInfluxParser() n.parser, _ = parsers.NewInfluxParser()
go n.receiver() go n.receiver()
in <- mqttMsg(testMsg) in <- mqttMsg(testMsg)
time.Sleep(time.Millisecond * 25) acc.Wait(1)
if a := acc.NFields(); a != 1 { if a := acc.NFields(); a != 1 {
t.Errorf("got %v, expected %v", a, 1) t.Errorf("got %v, expected %v", a, 1)
@ -119,11 +118,12 @@ func TestRunParserInvalidMsg(t *testing.T) {
n.parser, _ = parsers.NewInfluxParser() n.parser, _ = parsers.NewInfluxParser()
go n.receiver() go n.receiver()
in <- mqttMsg(invalidMsg) in <- mqttMsg(invalidMsg)
time.Sleep(time.Millisecond * 25) acc.WaitError(1)
if a := acc.NFields(); a != 0 { if a := acc.NFields(); a != 0 {
t.Errorf("got %v, expected %v", a, 0) t.Errorf("got %v, expected %v", a, 0)
} }
assert.Contains(t, acc.Errors[0].Error(), "MQTT Parse Error")
} }
// Test that the parser parses line format messages into metrics // Test that the parser parses line format messages into metrics
@ -136,7 +136,7 @@ func TestRunParserAndGather(t *testing.T) {
n.parser, _ = parsers.NewInfluxParser() n.parser, _ = parsers.NewInfluxParser()
go n.receiver() go n.receiver()
in <- mqttMsg(testMsg) in <- mqttMsg(testMsg)
time.Sleep(time.Millisecond * 25) acc.Wait(1)
n.Gather(&acc) n.Gather(&acc)
@ -154,9 +154,9 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
go n.receiver() go n.receiver()
in <- mqttMsg(testMsgGraphite) in <- mqttMsg(testMsgGraphite)
time.Sleep(time.Millisecond * 25)
n.Gather(&acc) n.Gather(&acc)
acc.Wait(1)
acc.AssertContainsFields(t, "cpu_load_short_graphite", acc.AssertContainsFields(t, "cpu_load_short_graphite",
map[string]interface{}{"value": float64(23422)}) map[string]interface{}{"value": float64(23422)})
@ -172,10 +172,11 @@ func TestRunParserAndGatherJSON(t *testing.T) {
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil) n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
go n.receiver() go n.receiver()
in <- mqttMsg(testMsgJSON) in <- mqttMsg(testMsgJSON)
time.Sleep(time.Millisecond * 25)
n.Gather(&acc) n.Gather(&acc)
acc.Wait(1)
acc.AssertContainsFields(t, "nats_json_test", acc.AssertContainsFields(t, "nats_json_test",
map[string]interface{}{ map[string]interface{}{
"a": float64(5), "a": float64(5),

View File

@ -162,11 +162,11 @@ func (n *natsConsumer) receiver() {
case <-n.done: case <-n.done:
return return
case err := <-n.errs: case err := <-n.errs:
log.Printf("E! error reading from %s\n", err.Error()) n.acc.AddError(fmt.Errorf("E! error reading from %s\n", err.Error()))
case msg := <-n.in: case msg := <-n.in:
metrics, err := n.parser.Parse(msg.Data) metrics, err := n.parser.Parse(msg.Data)
if err != nil { if err != nil {
log.Printf("E! subject: %s, error: %s", msg.Subject, err.Error()) n.acc.AddError(fmt.Errorf("E! subject: %s, error: %s", msg.Subject, err.Error()))
} }
for _, metric := range metrics { for _, metric := range metrics {
@ -179,8 +179,8 @@ func (n *natsConsumer) receiver() {
func (n *natsConsumer) clean() { func (n *natsConsumer) clean() {
for _, sub := range n.Subs { for _, sub := range n.Subs {
if err := sub.Unsubscribe(); err != nil { if err := sub.Unsubscribe(); err != nil {
log.Printf("E! Error unsubscribing from subject %s in queue %s: %s\n", n.acc.AddError(fmt.Errorf("E! Error unsubscribing from subject %s in queue %s: %s\n",
sub.Subject, sub.Queue, err.Error()) sub.Subject, sub.Queue, err.Error()))
} }
} }

View File

@ -2,11 +2,11 @@ package natsconsumer
import ( import (
"testing" "testing"
"time"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/nats-io/nats" "github.com/nats-io/nats"
"github.com/stretchr/testify/assert"
) )
const ( const (
@ -42,11 +42,8 @@ func TestRunParser(t *testing.T) {
n.wg.Add(1) n.wg.Add(1)
go n.receiver() go n.receiver()
in <- natsMsg(testMsg) in <- natsMsg(testMsg)
time.Sleep(time.Millisecond * 25)
if acc.NFields() != 1 { acc.Wait(1)
t.Errorf("got %v, expected %v", acc.NFields(), 1)
}
} }
// Test that the parser ignores invalid messages // Test that the parser ignores invalid messages
@ -60,11 +57,10 @@ func TestRunParserInvalidMsg(t *testing.T) {
n.wg.Add(1) n.wg.Add(1)
go n.receiver() go n.receiver()
in <- natsMsg(invalidMsg) in <- natsMsg(invalidMsg)
time.Sleep(time.Millisecond * 25)
if acc.NFields() != 0 { acc.WaitError(1)
t.Errorf("got %v, expected %v", acc.NFields(), 0) assert.Contains(t, acc.Errors[0].Error(), "E! subject: telegraf, error: metric parsing error")
} assert.EqualValues(t, 0, acc.NMetrics())
} }
// Test that the parser parses line format messages into metrics // Test that the parser parses line format messages into metrics
@ -78,10 +74,10 @@ func TestRunParserAndGather(t *testing.T) {
n.wg.Add(1) n.wg.Add(1)
go n.receiver() go n.receiver()
in <- natsMsg(testMsg) in <- natsMsg(testMsg)
time.Sleep(time.Millisecond * 25)
n.Gather(&acc) n.Gather(&acc)
acc.Wait(1)
acc.AssertContainsFields(t, "cpu_load_short", acc.AssertContainsFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(23422)}) map[string]interface{}{"value": float64(23422)})
} }
@ -97,10 +93,10 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
n.wg.Add(1) n.wg.Add(1)
go n.receiver() go n.receiver()
in <- natsMsg(testMsgGraphite) in <- natsMsg(testMsgGraphite)
time.Sleep(time.Millisecond * 25)
n.Gather(&acc) n.Gather(&acc)
acc.Wait(1)
acc.AssertContainsFields(t, "cpu_load_short_graphite", acc.AssertContainsFields(t, "cpu_load_short_graphite",
map[string]interface{}{"value": float64(23422)}) map[string]interface{}{"value": float64(23422)})
} }
@ -116,10 +112,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
n.wg.Add(1) n.wg.Add(1)
go n.receiver() go n.receiver()
in <- natsMsg(testMsgJSON) in <- natsMsg(testMsgJSON)
time.Sleep(time.Millisecond * 25)
n.Gather(&acc) n.Gather(&acc)
acc.Wait(1)
acc.AssertContainsFields(t, "nats_json_test", acc.AssertContainsFields(t, "nats_json_test",
map[string]interface{}{ map[string]interface{}{
"a": float64(5), "a": float64(5),

View File

@ -81,42 +81,25 @@ func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) {
acc := sl.Accumulator.(*testutil.Accumulator) acc := sl.Accumulator.(*testutil.Accumulator)
acc.Wait(3)
acc.Lock() acc.Lock()
if len(acc.Metrics) < 1 { m1 := acc.Metrics[0]
acc.Wait() m2 := acc.Metrics[1]
} m3 := acc.Metrics[2]
require.True(t, len(acc.Metrics) >= 1)
m := acc.Metrics[0]
acc.Unlock() acc.Unlock()
assert.Equal(t, "test", m.Measurement) assert.Equal(t, "test", m1.Measurement)
assert.Equal(t, map[string]string{"foo": "bar"}, m.Tags) assert.Equal(t, map[string]string{"foo": "bar"}, m1.Tags)
assert.Equal(t, map[string]interface{}{"v": int64(1)}, m.Fields) assert.Equal(t, map[string]interface{}{"v": int64(1)}, m1.Fields)
assert.True(t, time.Unix(0, 123456789).Equal(m.Time)) assert.True(t, time.Unix(0, 123456789).Equal(m1.Time))
acc.Lock() assert.Equal(t, "test", m2.Measurement)
if len(acc.Metrics) < 2 { assert.Equal(t, map[string]string{"foo": "baz"}, m2.Tags)
acc.Wait() assert.Equal(t, map[string]interface{}{"v": int64(2)}, m2.Fields)
} assert.True(t, time.Unix(0, 123456790).Equal(m2.Time))
require.True(t, len(acc.Metrics) >= 2)
m = acc.Metrics[1] assert.Equal(t, "test", m3.Measurement)
acc.Unlock() assert.Equal(t, map[string]string{"foo": "zab"}, m3.Tags)
assert.Equal(t, map[string]interface{}{"v": int64(3)}, m3.Fields)
assert.Equal(t, "test", m.Measurement) assert.True(t, time.Unix(0, 123456791).Equal(m3.Time))
assert.Equal(t, map[string]string{"foo": "baz"}, m.Tags)
assert.Equal(t, map[string]interface{}{"v": int64(2)}, m.Fields)
assert.True(t, time.Unix(0, 123456790).Equal(m.Time))
acc.Lock()
if len(acc.Metrics) < 3 {
acc.Wait()
}
require.True(t, len(acc.Metrics) >= 3)
m = acc.Metrics[2]
acc.Unlock()
assert.Equal(t, "test", m.Measurement)
assert.Equal(t, map[string]string{"foo": "zab"}, m.Tags)
assert.Equal(t, map[string]interface{}{"v": int64(3)}, m.Fields)
assert.True(t, time.Unix(0, 123456791).Equal(m.Time))
} }

View File

@ -2,7 +2,6 @@ package tail
import ( import (
"fmt" "fmt"
"log"
"sync" "sync"
"github.com/hpcloud/tail" "github.com/hpcloud/tail"
@ -86,7 +85,7 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
for _, filepath := range t.Files { for _, filepath := range t.Files {
g, err := globpath.Compile(filepath) g, err := globpath.Compile(filepath)
if err != nil { if err != nil {
log.Printf("E! Error Glob %s failed to compile, %s", filepath, err) t.acc.AddError(fmt.Errorf("E! Error Glob %s failed to compile, %s", filepath, err))
} }
for file, _ := range g.Match() { for file, _ := range g.Match() {
tailer, err := tail.TailFile(file, tailer, err := tail.TailFile(file,
@ -124,21 +123,21 @@ func (t *Tail) receiver(tailer *tail.Tail) {
var line *tail.Line var line *tail.Line
for line = range tailer.Lines { for line = range tailer.Lines {
if line.Err != nil { if line.Err != nil {
log.Printf("E! Error tailing file %s, Error: %s\n", t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n",
tailer.Filename, err) tailer.Filename, err))
continue continue
} }
m, err = t.parser.ParseLine(line.Text) m, err = t.parser.ParseLine(line.Text)
if err == nil { if err == nil {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
} else { } else {
log.Printf("E! Malformed log line in %s: [%s], Error: %s\n", t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n",
tailer.Filename, line.Text, err) tailer.Filename, line.Text, err))
} }
} }
if err := tailer.Err(); err != nil { if err := tailer.Err(); err != nil {
log.Printf("E! Error tailing file %s, Error: %s\n", t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n",
tailer.Filename, err) tailer.Filename, err))
} }
} }
@ -146,12 +145,12 @@ func (t *Tail) Stop() {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
for _, t := range t.tailers { for _, tailer := range t.tailers {
err := t.Stop() err := tailer.Stop()
if err != nil { if err != nil {
log.Printf("E! Error stopping tail on file %s\n", t.Filename) t.acc.AddError(fmt.Errorf("E! Error stopping tail on file %s\n", tailer.Filename))
} }
t.Cleanup() tailer.Cleanup()
} }
t.wg.Wait() t.wg.Wait()
} }

View File

@ -3,6 +3,7 @@ package tail
import ( import (
"io/ioutil" "io/ioutil"
"os" "os"
"runtime"
"testing" "testing"
"time" "time"
@ -30,11 +31,9 @@ func TestTailFromBeginning(t *testing.T) {
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc)) require.NoError(t, tt.Start(&acc))
time.Sleep(time.Millisecond * 100)
require.NoError(t, tt.Gather(&acc)) require.NoError(t, tt.Gather(&acc))
// arbitrary sleep to wait for message to show up
time.Sleep(time.Millisecond * 150)
acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu", acc.AssertContainsTaggedFields(t, "cpu",
map[string]interface{}{ map[string]interface{}{
"usage_idle": float64(100), "usage_idle": float64(100),
@ -60,13 +59,19 @@ func TestTailFromEnd(t *testing.T) {
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc)) require.NoError(t, tt.Start(&acc))
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 200) //TODO remove once https://github.com/hpcloud/tail/pull/114 is merged & added to Godeps
for _, tailer := range tt.tailers {
for n, err := tailer.Tell(); err == nil && n == 0; n, err = tailer.Tell() {
// wait for tailer to jump to end
runtime.Gosched()
}
}
_, err = tmpfile.WriteString("cpu,othertag=foo usage_idle=100\n") _, err = tmpfile.WriteString("cpu,othertag=foo usage_idle=100\n")
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, tt.Gather(&acc)) require.NoError(t, tt.Gather(&acc))
time.Sleep(time.Millisecond * 50)
acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu", acc.AssertContainsTaggedFields(t, "cpu",
map[string]interface{}{ map[string]interface{}{
"usage_idle": float64(100), "usage_idle": float64(100),
@ -96,7 +101,7 @@ func TestTailBadLine(t *testing.T) {
_, err = tmpfile.WriteString("cpu mytag= foo usage_idle= 100\n") _, err = tmpfile.WriteString("cpu mytag= foo usage_idle= 100\n")
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, tt.Gather(&acc)) require.NoError(t, tt.Gather(&acc))
time.Sleep(time.Millisecond * 50)
assert.Len(t, acc.Metrics, 0) acc.WaitError(1)
assert.Contains(t, acc.Errors[0].Error(), "E! Malformed log line")
} }

View File

@ -1,10 +1,15 @@
package tcp_listener package tcp_listener
import ( import (
"bufio"
"bytes"
"fmt" "fmt"
"io"
"log"
"net" "net"
"os"
"strings"
"testing" "testing"
"time"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@ -54,7 +59,6 @@ func BenchmarkTCP(b *testing.B) {
panic(err) panic(err)
} }
time.Sleep(time.Millisecond * 25)
conn, err := net.Dial("tcp", "127.0.0.1:8198") conn, err := net.Dial("tcp", "127.0.0.1:8198")
if err != nil { if err != nil {
panic(err) panic(err)
@ -62,8 +66,10 @@ func BenchmarkTCP(b *testing.B) {
for i := 0; i < 100000; i++ { for i := 0; i < 100000; i++ {
fmt.Fprintf(conn, testMsg) fmt.Fprintf(conn, testMsg)
} }
// wait for 100,000 metrics to get added to accumulator conn.(*net.TCPConn).CloseWrite()
time.Sleep(time.Millisecond) // wait for all 100,000 metrics to be processed
buf := []byte{0}
conn.Read(buf) // will EOF when completed
listener.Stop() listener.Stop()
} }
} }
@ -81,16 +87,18 @@ func TestHighTrafficTCP(t *testing.T) {
err := listener.Start(acc) err := listener.Start(acc)
require.NoError(t, err) require.NoError(t, err)
time.Sleep(time.Millisecond * 25)
conn, err := net.Dial("tcp", "127.0.0.1:8199") conn, err := net.Dial("tcp", "127.0.0.1:8199")
require.NoError(t, err) require.NoError(t, err)
for i := 0; i < 100000; i++ { for i := 0; i < 100000; i++ {
fmt.Fprintf(conn, testMsg) fmt.Fprintf(conn, testMsg)
} }
time.Sleep(time.Millisecond) conn.(*net.TCPConn).CloseWrite()
buf := []byte{0}
_, err = conn.Read(buf)
assert.Equal(t, err, io.EOF)
listener.Stop() listener.Stop()
assert.Equal(t, 100000, len(acc.Metrics)) assert.Equal(t, 100000, int(acc.NMetrics()))
} }
func TestConnectTCP(t *testing.T) { func TestConnectTCP(t *testing.T) {
@ -105,13 +113,12 @@ func TestConnectTCP(t *testing.T) {
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
time.Sleep(time.Millisecond * 25)
conn, err := net.Dial("tcp", "127.0.0.1:8194") conn, err := net.Dial("tcp", "127.0.0.1:8194")
require.NoError(t, err) require.NoError(t, err)
// send single message to socket // send single message to socket
fmt.Fprintf(conn, testMsg) fmt.Fprintf(conn, testMsg)
time.Sleep(time.Millisecond * 15) acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu_load_short", acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)}, map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"}, map[string]string{"host": "server01"},
@ -119,7 +126,7 @@ func TestConnectTCP(t *testing.T) {
// send multiple messages to socket // send multiple messages to socket
fmt.Fprintf(conn, testMsgs) fmt.Fprintf(conn, testMsgs)
time.Sleep(time.Millisecond * 15) acc.Wait(6)
hostTags := []string{"server02", "server03", hostTags := []string{"server02", "server03",
"server04", "server05", "server06"} "server04", "server05", "server06"}
for _, hostTag := range hostTags { for _, hostTag := range hostTags {
@ -143,7 +150,6 @@ func TestConcurrentConns(t *testing.T) {
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
time.Sleep(time.Millisecond * 25)
_, err := net.Dial("tcp", "127.0.0.1:8195") _, err := net.Dial("tcp", "127.0.0.1:8195")
assert.NoError(t, err) assert.NoError(t, err)
_, err = net.Dial("tcp", "127.0.0.1:8195") _, err = net.Dial("tcp", "127.0.0.1:8195")
@ -162,10 +168,8 @@ func TestConcurrentConns(t *testing.T) {
" the Telegraf tcp listener configuration.\n", " the Telegraf tcp listener configuration.\n",
string(buf[:n])) string(buf[:n]))
_, err = conn.Write([]byte(testMsg)) _, err = conn.Read(buf)
assert.NoError(t, err) assert.Equal(t, io.EOF, err)
time.Sleep(time.Millisecond * 10)
assert.Zero(t, acc.NFields())
} }
// Test that MaxTCPConections is respected when max==1 // Test that MaxTCPConections is respected when max==1
@ -181,7 +185,6 @@ func TestConcurrentConns1(t *testing.T) {
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
time.Sleep(time.Millisecond * 25)
_, err := net.Dial("tcp", "127.0.0.1:8196") _, err := net.Dial("tcp", "127.0.0.1:8196")
assert.NoError(t, err) assert.NoError(t, err)
@ -198,10 +201,8 @@ func TestConcurrentConns1(t *testing.T) {
" the Telegraf tcp listener configuration.\n", " the Telegraf tcp listener configuration.\n",
string(buf[:n])) string(buf[:n]))
_, err = conn.Write([]byte(testMsg)) _, err = conn.Read(buf)
assert.NoError(t, err) assert.Equal(t, io.EOF, err)
time.Sleep(time.Millisecond * 10)
assert.Zero(t, acc.NFields())
} }
// Test that MaxTCPConections is respected // Test that MaxTCPConections is respected
@ -216,7 +217,6 @@ func TestCloseConcurrentConns(t *testing.T) {
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
time.Sleep(time.Millisecond * 25)
_, err := net.Dial("tcp", "127.0.0.1:8195") _, err := net.Dial("tcp", "127.0.0.1:8195")
assert.NoError(t, err) assert.NoError(t, err)
_, err = net.Dial("tcp", "127.0.0.1:8195") _, err = net.Dial("tcp", "127.0.0.1:8195")
@ -238,13 +238,9 @@ func TestRunParser(t *testing.T) {
go listener.tcpParser() go listener.tcpParser()
in <- testmsg in <- testmsg
time.Sleep(time.Millisecond * 25)
listener.Gather(&acc) listener.Gather(&acc)
if a := acc.NFields(); a != 1 { acc.Wait(1)
t.Errorf("got %v, expected %v", a, 1)
}
acc.AssertContainsTaggedFields(t, "cpu_load_short", acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)}, map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"}, map[string]string{"host": "server01"},
@ -263,11 +259,16 @@ func TestRunParserInvalidMsg(t *testing.T) {
listener.wg.Add(1) listener.wg.Add(1)
go listener.tcpParser() go listener.tcpParser()
buf := bytes.NewBuffer(nil)
log.SetOutput(buf)
defer log.SetOutput(os.Stderr)
in <- testmsg in <- testmsg
time.Sleep(time.Millisecond * 25)
if a := acc.NFields(); a != 0 { scnr := bufio.NewScanner(buf)
t.Errorf("got %v, expected %v", a, 0) for scnr.Scan() {
if strings.Contains(scnr.Text(), fmt.Sprintf(malformedwarn, 1)) {
break
}
} }
} }
@ -284,9 +285,9 @@ func TestRunParserGraphiteMsg(t *testing.T) {
go listener.tcpParser() go listener.tcpParser()
in <- testmsg in <- testmsg
time.Sleep(time.Millisecond * 25)
listener.Gather(&acc) listener.Gather(&acc)
acc.Wait(1)
acc.AssertContainsFields(t, "cpu_load_graphite", acc.AssertContainsFields(t, "cpu_load_graphite",
map[string]interface{}{"value": float64(12)}) map[string]interface{}{"value": float64(12)})
} }
@ -304,9 +305,9 @@ func TestRunParserJSONMsg(t *testing.T) {
go listener.tcpParser() go listener.tcpParser()
in <- testmsg in <- testmsg
time.Sleep(time.Millisecond * 25)
listener.Gather(&acc) listener.Gather(&acc)
acc.Wait(1)
acc.AssertContainsFields(t, "udp_json_test", acc.AssertContainsFields(t, "udp_json_test",
map[string]interface{}{ map[string]interface{}{
"a": float64(5), "a": float64(5),

View File

@ -1,6 +1,7 @@
package udp_listener package udp_listener
import ( import (
"fmt"
"log" "log"
"net" "net"
"sync" "sync"
@ -107,8 +108,9 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error {
u.in = make(chan []byte, u.AllowedPendingMessages) u.in = make(chan []byte, u.AllowedPendingMessages)
u.done = make(chan struct{}) u.done = make(chan struct{})
u.wg.Add(2) u.udpListen()
go u.udpListen()
u.wg.Add(1)
go u.udpParser() go u.udpParser()
log.Printf("I! Started UDP listener service on %s (ReadBuffer: %d)\n", u.ServiceAddress, u.UDPBufferSize) log.Printf("I! Started UDP listener service on %s (ReadBuffer: %d)\n", u.ServiceAddress, u.UDPBufferSize)
@ -126,32 +128,37 @@ func (u *UdpListener) Stop() {
} }
func (u *UdpListener) udpListen() error { func (u *UdpListener) udpListen() error {
defer u.wg.Done()
var err error var err error
address, _ := net.ResolveUDPAddr("udp", u.ServiceAddress) address, _ := net.ResolveUDPAddr("udp", u.ServiceAddress)
u.listener, err = net.ListenUDP("udp", address) u.listener, err = net.ListenUDP("udp", address)
if err != nil { if err != nil {
log.Fatalf("E! Error: ListenUDP - %s", err) return fmt.Errorf("E! Error: ListenUDP - %s", err)
} }
log.Println("I! UDP server listening on: ", u.listener.LocalAddr().String()) log.Println("I! UDP server listening on: ", u.listener.LocalAddr().String())
buf := make([]byte, UDP_MAX_PACKET_SIZE)
if u.UDPBufferSize > 0 { if u.UDPBufferSize > 0 {
err = u.listener.SetReadBuffer(u.UDPBufferSize) // if we want to move away from OS default err = u.listener.SetReadBuffer(u.UDPBufferSize) // if we want to move away from OS default
if err != nil { if err != nil {
log.Printf("E! Failed to set UDP read buffer to %d: %s", u.UDPBufferSize, err) return fmt.Errorf("E! Failed to set UDP read buffer to %d: %s", u.UDPBufferSize, err)
return err
} }
} }
u.wg.Add(1)
go u.udpListenLoop()
return nil
}
func (u *UdpListener) udpListenLoop() {
defer u.wg.Done()
buf := make([]byte, UDP_MAX_PACKET_SIZE)
for { for {
select { select {
case <-u.done: case <-u.done:
return nil return
default: default:
u.listener.SetReadDeadline(time.Now().Add(time.Second)) u.listener.SetReadDeadline(time.Now().Add(time.Second))

View File

@ -1,12 +1,16 @@
package udp_listener package udp_listener
import ( import (
"bufio"
"bytes"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"net" "net"
"os"
"runtime"
"strings"
"testing" "testing"
"time"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@ -50,22 +54,27 @@ func TestHighTrafficUDP(t *testing.T) {
err := listener.Start(acc) err := listener.Start(acc)
require.NoError(t, err) require.NoError(t, err)
time.Sleep(time.Millisecond * 25)
conn, err := net.Dial("udp", "127.0.0.1:8126") conn, err := net.Dial("udp", "127.0.0.1:8126")
require.NoError(t, err) require.NoError(t, err)
mlen := int64(len(testMsgs))
var sent int64
for i := 0; i < 20000; i++ { for i := 0; i < 20000; i++ {
// arbitrary, just to give the OS buffer some slack handling the for sent > listener.BytesRecv.Get()+32000 {
// packet storm. // more than 32kb sitting in OS buffer, let it drain
time.Sleep(time.Microsecond) runtime.Gosched()
fmt.Fprintf(conn, testMsgs) }
conn.Write([]byte(testMsgs))
sent += mlen
}
for sent > listener.BytesRecv.Get() {
runtime.Gosched()
}
for len(listener.in) > 0 {
runtime.Gosched()
} }
time.Sleep(time.Millisecond)
listener.Stop() listener.Stop()
// this is not an exact science, since UDP packets can easily get lost or assert.Equal(t, uint64(100000), acc.NMetrics())
// dropped, but assume that the OS will be able to
// handle at least 90% of the sent UDP packets.
assert.InDelta(t, 100000, len(acc.Metrics), 10000)
} }
func TestConnectUDP(t *testing.T) { func TestConnectUDP(t *testing.T) {
@ -79,13 +88,12 @@ func TestConnectUDP(t *testing.T) {
require.NoError(t, listener.Start(acc)) require.NoError(t, listener.Start(acc))
defer listener.Stop() defer listener.Stop()
time.Sleep(time.Millisecond * 25)
conn, err := net.Dial("udp", "127.0.0.1:8127") conn, err := net.Dial("udp", "127.0.0.1:8127")
require.NoError(t, err) require.NoError(t, err)
// send single message to socket // send single message to socket
fmt.Fprintf(conn, testMsg) fmt.Fprintf(conn, testMsg)
time.Sleep(time.Millisecond * 15) acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu_load_short", acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)}, map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"}, map[string]string{"host": "server01"},
@ -93,7 +101,7 @@ func TestConnectUDP(t *testing.T) {
// send multiple messages to socket // send multiple messages to socket
fmt.Fprintf(conn, testMsgs) fmt.Fprintf(conn, testMsgs)
time.Sleep(time.Millisecond * 15) acc.Wait(6)
hostTags := []string{"server02", "server03", hostTags := []string{"server02", "server03",
"server04", "server05", "server06"} "server04", "server05", "server06"}
for _, hostTag := range hostTags { for _, hostTag := range hostTags {
@ -118,13 +126,9 @@ func TestRunParser(t *testing.T) {
go listener.udpParser() go listener.udpParser()
in <- testmsg in <- testmsg
time.Sleep(time.Millisecond * 25)
listener.Gather(&acc) listener.Gather(&acc)
if a := acc.NFields(); a != 1 { acc.Wait(1)
t.Errorf("got %v, expected %v", a, 1)
}
acc.AssertContainsTaggedFields(t, "cpu_load_short", acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)}, map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"}, map[string]string{"host": "server01"},
@ -144,11 +148,16 @@ func TestRunParserInvalidMsg(t *testing.T) {
listener.wg.Add(1) listener.wg.Add(1)
go listener.udpParser() go listener.udpParser()
buf := bytes.NewBuffer(nil)
log.SetOutput(buf)
defer log.SetOutput(os.Stderr)
in <- testmsg in <- testmsg
time.Sleep(time.Millisecond * 25)
if a := acc.NFields(); a != 0 { scnr := bufio.NewScanner(buf)
t.Errorf("got %v, expected %v", a, 0) for scnr.Scan() {
if strings.Contains(scnr.Text(), fmt.Sprintf(malformedwarn, 1)) {
break
}
} }
} }
@ -166,9 +175,9 @@ func TestRunParserGraphiteMsg(t *testing.T) {
go listener.udpParser() go listener.udpParser()
in <- testmsg in <- testmsg
time.Sleep(time.Millisecond * 25)
listener.Gather(&acc) listener.Gather(&acc)
acc.Wait(1)
acc.AssertContainsFields(t, "cpu_load_graphite", acc.AssertContainsFields(t, "cpu_load_graphite",
map[string]interface{}{"value": float64(12)}) map[string]interface{}{"value": float64(12)})
} }
@ -187,9 +196,9 @@ func TestRunParserJSONMsg(t *testing.T) {
go listener.udpParser() go listener.udpParser()
in <- testmsg in <- testmsg
time.Sleep(time.Millisecond * 25)
listener.Gather(&acc) listener.Gather(&acc)
acc.Wait(1)
acc.AssertContainsFields(t, "udp_json_test", acc.AssertContainsFields(t, "udp_json_test",
map[string]interface{}{ map[string]interface{}{
"a": float64(5), "a": float64(5),

View File

@ -44,9 +44,7 @@ func TestGraphiteOK(t *testing.T) {
// Start TCP server // Start TCP server
wg.Add(1) wg.Add(1)
t.Log("Starting server") t.Log("Starting server")
go TCPServer1(t, &wg) TCPServer1(t, &wg)
// Give the fake graphite TCP server some time to start:
time.Sleep(time.Millisecond * 100)
// Init plugin // Init plugin
g := Graphite{ g := Graphite{
@ -88,10 +86,8 @@ func TestGraphiteOK(t *testing.T) {
t.Log("Finished Waiting for first data") t.Log("Finished Waiting for first data")
var wg2 sync.WaitGroup var wg2 sync.WaitGroup
// Start TCP server // Start TCP server
time.Sleep(time.Millisecond * 100)
wg2.Add(1) wg2.Add(1)
go TCPServer2(t, &wg2) TCPServer2(t, &wg2)
time.Sleep(time.Millisecond * 100)
//Write but expect an error, but reconnect //Write but expect an error, but reconnect
g.Write(metrics2) g.Write(metrics2)
err3 := g.Write(metrics2) err3 := g.Write(metrics2)
@ -105,8 +101,9 @@ func TestGraphiteOK(t *testing.T) {
} }
func TCPServer1(t *testing.T, wg *sync.WaitGroup) { func TCPServer1(t *testing.T, wg *sync.WaitGroup) {
defer wg.Done()
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
defer wg.Done()
conn, _ := (tcpServer).Accept() conn, _ := (tcpServer).Accept()
reader := bufio.NewReader(conn) reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader) tp := textproto.NewReader(reader)
@ -114,11 +111,13 @@ func TCPServer1(t *testing.T, wg *sync.WaitGroup) {
assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1) assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1)
conn.Close() conn.Close()
tcpServer.Close() tcpServer.Close()
}()
} }
func TCPServer2(t *testing.T, wg *sync.WaitGroup) { func TCPServer2(t *testing.T, wg *sync.WaitGroup) {
defer wg.Done()
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
defer wg.Done()
conn2, _ := (tcpServer).Accept() conn2, _ := (tcpServer).Accept()
reader := bufio.NewReader(conn2) reader := bufio.NewReader(conn2)
tp := textproto.NewReader(reader) tp := textproto.NewReader(reader)
@ -128,4 +127,5 @@ func TCPServer2(t *testing.T, wg *sync.WaitGroup) {
assert.Equal(t, "my.prefix.192_168_0_1.my_measurement 3.14 1289430000", data3) assert.Equal(t, "my.prefix.192_168_0_1.my_measurement 3.14 1289430000", data3)
conn2.Close() conn2.Close()
tcpServer.Close() tcpServer.Close()
}()
} }

View File

@ -66,7 +66,6 @@ func TestUDPClient_Write(t *testing.T) {
}() }()
// test sending simple metric // test sending simple metric
time.Sleep(time.Second)
n, err := client.Write([]byte("cpu value=99\n")) n, err := client.Write([]byte("cpu value=99\n"))
assert.Equal(t, n, 13) assert.Equal(t, n, 13)
assert.NoError(t, err) assert.NoError(t, err)

View File

@ -16,9 +16,7 @@ import (
func TestWrite(t *testing.T) { func TestWrite(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
go TCPServer(t, &wg) TCPServer(t, &wg)
// Give the fake TCP server some time to start:
time.Sleep(time.Millisecond * 100)
i := Instrumental{ i := Instrumental{
Host: "127.0.0.1", Host: "127.0.0.1",
@ -79,6 +77,7 @@ func TestWrite(t *testing.T) {
func TCPServer(t *testing.T, wg *sync.WaitGroup) { func TCPServer(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:8000") tcpServer, _ := net.Listen("tcp", "127.0.0.1:8000")
go func() {
defer wg.Done() defer wg.Done()
conn, _ := tcpServer.Accept() conn, _ := tcpServer.Accept()
conn.SetDeadline(time.Now().Add(1 * time.Second)) conn.SetDeadline(time.Now().Add(1 * time.Second))
@ -120,4 +119,5 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
assert.Equal(t, "", data6) assert.Equal(t, "", data6)
conn.Close() conn.Close()
}()
} }

View File

@ -129,6 +129,9 @@ func (a *Accumulator) AddError(err error) {
} }
a.Lock() a.Lock()
a.Errors = append(a.Errors, err) a.Errors = append(a.Errors, err)
if a.Cond != nil {
a.Cond.Broadcast()
}
a.Unlock() a.Unlock()
} }
@ -198,14 +201,29 @@ func (a *Accumulator) NFields() int {
return counter return counter
} }
// Wait waits for a metric to be added to the accumulator. // Wait waits for the given number of metrics to be added to the accumulator.
// Accumulator must already be locked. func (a *Accumulator) Wait(n int) {
func (a *Accumulator) Wait() { a.Lock()
if a.Cond == nil { if a.Cond == nil {
a.Cond = sync.NewCond(&a.Mutex) a.Cond = sync.NewCond(&a.Mutex)
} }
for int(a.NMetrics()) < n {
a.Cond.Wait() a.Cond.Wait()
} }
a.Unlock()
}
// WaitError waits for the given number of errors to be added to the accumulator.
func (a *Accumulator) WaitError(n int) {
a.Lock()
if a.Cond == nil {
a.Cond = sync.NewCond(&a.Mutex)
}
for len(a.Errors) < n {
a.Cond.Wait()
}
a.Unlock()
}
func (a *Accumulator) AssertContainsTaggedFields( func (a *Accumulator) AssertContainsTaggedFields(
t *testing.T, t *testing.T,