diff --git a/plugins/inputs/cloudwatch/cloudwatch_test.go b/plugins/inputs/cloudwatch/cloudwatch_test.go index f2d58a00c..3aaab7d45 100644 --- a/plugins/inputs/cloudwatch/cloudwatch_test.go +++ b/plugins/inputs/cloudwatch/cloudwatch_test.go @@ -207,14 +207,13 @@ func TestGenerateStatisticsInputParams(t *testing.T) { } func TestMetricsCacheTimeout(t *testing.T) { - ttl, _ := time.ParseDuration("5ms") cache := &MetricCache{ Metrics: []*cloudwatch.Metric{}, Fetched: time.Now(), - TTL: ttl, + TTL: time.Minute, } assert.True(t, cache.IsValid()) - time.Sleep(ttl) + cache.Fetched = time.Now().Add(-time.Minute) assert.False(t, cache.IsValid()) } diff --git a/plugins/inputs/http_listener/http_listener_test.go b/plugins/inputs/http_listener/http_listener_test.go index b5f858fde..7e6fbc8ab 100644 --- a/plugins/inputs/http_listener/http_listener_test.go +++ b/plugins/inputs/http_listener/http_listener_test.go @@ -6,7 +6,6 @@ import ( "net/http" "sync" "testing" - "time" "github.com/influxdata/telegraf/testutil" @@ -43,14 +42,12 @@ func TestWriteHTTP(t *testing.T) { require.NoError(t, listener.Start(acc)) defer listener.Stop() - time.Sleep(time.Millisecond * 25) - // post single message to listener resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsg))) require.NoError(t, err) require.EqualValues(t, 204, resp.StatusCode) - time.Sleep(time.Millisecond * 15) + acc.Wait(1) acc.AssertContainsTaggedFields(t, "cpu_load_short", map[string]interface{}{"value": float64(12)}, map[string]string{"host": "server01"}, @@ -61,7 +58,7 @@ func TestWriteHTTP(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 204, resp.StatusCode) - time.Sleep(time.Millisecond * 15) + acc.Wait(2) hostTags := []string{"server02", "server03", "server04", "server05", "server06"} for _, hostTag := range hostTags { @@ -76,7 +73,7 @@ func TestWriteHTTP(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 400, resp.StatusCode) - time.Sleep(time.Millisecond * 15) + acc.Wait(3) acc.AssertContainsTaggedFields(t, "cpu_load_short", map[string]interface{}{"value": float64(12)}, map[string]string{"host": "server01"}, @@ -91,14 +88,12 @@ func TestWriteHTTPNoNewline(t *testing.T) { require.NoError(t, listener.Start(acc)) defer listener.Stop() - time.Sleep(time.Millisecond * 25) - // post single message to listener resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsgNoNewline))) require.NoError(t, err) require.EqualValues(t, 204, resp.StatusCode) - time.Sleep(time.Millisecond * 15) + acc.Wait(1) acc.AssertContainsTaggedFields(t, "cpu_load_short", map[string]interface{}{"value": float64(12)}, map[string]string{"host": "server01"}, @@ -115,8 +110,6 @@ func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) { require.NoError(t, listener.Start(acc)) defer listener.Stop() - time.Sleep(time.Millisecond * 25) - // Post a gigantic metric to the listener and verify that it writes OK this time: resp, err := http.Post("http://localhost:8296/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric))) require.NoError(t, err) @@ -133,8 +126,6 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) { require.NoError(t, listener.Start(acc)) defer listener.Stop() - time.Sleep(time.Millisecond * 25) - resp, err := http.Post("http://localhost:8297/write", "", bytes.NewBuffer([]byte(hugeMetric))) require.NoError(t, err) require.EqualValues(t, 413, resp.StatusCode) @@ -150,15 +141,13 @@ func TestWriteHTTPVerySmallMaxLineSize(t *testing.T) { require.NoError(t, listener.Start(acc)) defer listener.Stop() - time.Sleep(time.Millisecond * 25) - resp, err := http.Post("http://localhost:8298/write", "", bytes.NewBuffer([]byte(testMsgs))) require.NoError(t, err) require.EqualValues(t, 204, resp.StatusCode) - time.Sleep(time.Millisecond * 15) hostTags := []string{"server02", "server03", "server04", "server05", "server06"} + acc.Wait(len(hostTags)) for _, hostTag := range hostTags { acc.AssertContainsTaggedFields(t, "cpu_load_short", map[string]interface{}{"value": float64(12)}, @@ -177,15 +166,13 @@ func TestWriteHTTPLargeLinesSkipped(t *testing.T) { require.NoError(t, listener.Start(acc)) defer listener.Stop() - time.Sleep(time.Millisecond * 25) - resp, err := http.Post("http://localhost:8300/write", "", bytes.NewBuffer([]byte(hugeMetric+testMsgs))) require.NoError(t, err) require.EqualValues(t, 400, resp.StatusCode) - time.Sleep(time.Millisecond * 15) hostTags := []string{"server02", "server03", "server04", "server05", "server06"} + acc.Wait(len(hostTags)) for _, hostTag := range hostTags { acc.AssertContainsTaggedFields(t, "cpu_load_short", map[string]interface{}{"value": float64(12)}, @@ -204,8 +191,6 @@ func TestWriteHTTPGzippedData(t *testing.T) { require.NoError(t, listener.Start(acc)) defer listener.Stop() - time.Sleep(time.Millisecond * 25) - data, err := ioutil.ReadFile("./testdata/testmsgs.gz") require.NoError(t, err) @@ -218,9 +203,9 @@ func TestWriteHTTPGzippedData(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 204, resp.StatusCode) - time.Sleep(time.Millisecond * 50) hostTags := []string{"server02", "server03", "server04", "server05", "server06"} + acc.Wait(len(hostTags)) for _, hostTag := range hostTags { acc.AssertContainsTaggedFields(t, "cpu_load_short", map[string]interface{}{"value": float64(12)}, @@ -237,8 +222,6 @@ func TestWriteHTTPHighTraffic(t *testing.T) { require.NoError(t, listener.Start(acc)) defer listener.Stop() - time.Sleep(time.Millisecond * 25) - // post many messages to listener var wg sync.WaitGroup for i := 0; i < 10; i++ { @@ -254,9 +237,9 @@ func TestWriteHTTPHighTraffic(t *testing.T) { } wg.Wait() - time.Sleep(time.Millisecond * 250) listener.Gather(acc) + acc.Wait(25000) require.Equal(t, int64(25000), int64(acc.NMetrics())) } @@ -267,8 +250,6 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) { require.NoError(t, listener.Start(acc)) defer listener.Stop() - time.Sleep(time.Millisecond * 25) - // post single message to listener resp, err := http.Post("http://localhost:8186/foobar", "", bytes.NewBuffer([]byte(testMsg))) require.NoError(t, err) @@ -276,16 +257,12 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) { } func TestWriteHTTPInvalid(t *testing.T) { - time.Sleep(time.Millisecond * 250) - listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) defer listener.Stop() - time.Sleep(time.Millisecond * 25) - // post single message to listener resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(badMsg))) require.NoError(t, err) @@ -293,16 +270,12 @@ func TestWriteHTTPInvalid(t *testing.T) { } func TestWriteHTTPEmpty(t *testing.T) { - time.Sleep(time.Millisecond * 250) - listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) defer listener.Stop() - time.Sleep(time.Millisecond * 25) - // post single message to listener resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(emptyMsg))) require.NoError(t, err) @@ -310,16 +283,12 @@ func TestWriteHTTPEmpty(t *testing.T) { } func TestQueryAndPingHTTP(t *testing.T) { - time.Sleep(time.Millisecond * 250) - listener := newTestHTTPListener() acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) defer listener.Stop() - time.Sleep(time.Millisecond * 25) - // post query to listener resp, err := http.Post("http://localhost:8186/query?db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22", "", nil) require.NoError(t, err) diff --git a/plugins/inputs/http_response/http_response_test.go b/plugins/inputs/http_response/http_response_test.go index 236e5d88b..b65b1f954 100644 --- a/plugins/inputs/http_response/http_response_test.go +++ b/plugins/inputs/http_response/http_response_test.go @@ -329,7 +329,7 @@ func TestTimeout(t *testing.T) { Address: ts.URL + "/twosecondnap", Body: "{ 'test': 'data'}", Method: "GET", - ResponseTimeout: internal.Duration{Duration: time.Second * 1}, + ResponseTimeout: internal.Duration{Duration: time.Millisecond}, Headers: map[string]string{ "Content-Type": "application/json", }, diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index f4176edd3..6f1f4020b 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -1,6 +1,7 @@ package kafka_consumer import ( + "fmt" "log" "strings" "sync" @@ -129,13 +130,13 @@ func (k *Kafka) receiver() { return case err := <-k.errs: if err != nil { - log.Printf("E! Kafka Consumer Error: %s\n", err) + k.acc.AddError(fmt.Errorf("Kafka Consumer Error: %s\n", err)) } case msg := <-k.in: metrics, err := k.parser.Parse(msg.Value) if err != nil { - log.Printf("E! Kafka Message Parse Error\nmessage: %s\nerror: %s", - string(msg.Value), err.Error()) + k.acc.AddError(fmt.Errorf("E! Kafka Message Parse Error\nmessage: %s\nerror: %s", + string(msg.Value), err.Error())) } for _, metric := range metrics { @@ -158,7 +159,7 @@ func (k *Kafka) Stop() { defer k.Unlock() close(k.done) if err := k.Consumer.Close(); err != nil { - log.Printf("E! Error closing kafka consumer: %s\n", err.Error()) + k.acc.AddError(fmt.Errorf("E! Error closing kafka consumer: %s\n", err.Error())) } } diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index c4936974f..e1c24adbe 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -2,7 +2,6 @@ package kafka_consumer import ( "testing" - "time" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" @@ -43,7 +42,7 @@ func TestRunParser(t *testing.T) { k.parser, _ = parsers.NewInfluxParser() go k.receiver() in <- saramaMsg(testMsg) - time.Sleep(time.Millisecond * 5) + acc.Wait(1) assert.Equal(t, acc.NFields(), 1) } @@ -58,7 +57,7 @@ func TestRunParserInvalidMsg(t *testing.T) { k.parser, _ = parsers.NewInfluxParser() go k.receiver() in <- saramaMsg(invalidMsg) - time.Sleep(time.Millisecond * 5) + acc.WaitError(1) assert.Equal(t, acc.NFields(), 0) } @@ -73,7 +72,7 @@ func TestRunParserAndGather(t *testing.T) { k.parser, _ = parsers.NewInfluxParser() go k.receiver() in <- saramaMsg(testMsg) - time.Sleep(time.Millisecond * 5) + acc.Wait(1) k.Gather(&acc) @@ -92,7 +91,7 @@ func TestRunParserAndGatherGraphite(t *testing.T) { k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) go k.receiver() in <- saramaMsg(testMsgGraphite) - time.Sleep(time.Millisecond * 5) + acc.Wait(1) k.Gather(&acc) @@ -111,7 +110,7 @@ func TestRunParserAndGatherJSON(t *testing.T) { k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil) go k.receiver() in <- saramaMsg(testMsgJSON) - time.Sleep(time.Millisecond * 5) + acc.Wait(1) k.Gather(&acc) diff --git a/plugins/inputs/logparser/logparser_test.go b/plugins/inputs/logparser/logparser_test.go index 059bfd266..db9795f28 100644 --- a/plugins/inputs/logparser/logparser_test.go +++ b/plugins/inputs/logparser/logparser_test.go @@ -6,7 +6,6 @@ import ( "runtime" "strings" "testing" - "time" "github.com/influxdata/telegraf/testutil" @@ -41,7 +40,6 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) { acc := testutil.Accumulator{} assert.Error(t, logparser.Start(&acc)) - time.Sleep(time.Millisecond * 500) logparser.Stop() } @@ -61,7 +59,8 @@ func TestGrokParseLogFiles(t *testing.T) { acc := testutil.Accumulator{} assert.NoError(t, logparser.Start(&acc)) - time.Sleep(time.Millisecond * 500) + acc.Wait(2) + logparser.Stop() acc.AssertContainsTaggedFields(t, "logparser_grok", @@ -102,14 +101,13 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) { acc := testutil.Accumulator{} assert.NoError(t, logparser.Start(&acc)) - time.Sleep(time.Millisecond * 500) assert.Equal(t, acc.NFields(), 0) os.Symlink( thisdir+"grok/testdata/test_a.log", emptydir+"/test_a.log") assert.NoError(t, logparser.Gather(&acc)) - time.Sleep(time.Millisecond * 500) + acc.Wait(1) logparser.Stop() @@ -143,7 +141,7 @@ func TestGrokParseLogFilesOneBad(t *testing.T) { acc.SetDebug(true) assert.NoError(t, logparser.Start(&acc)) - time.Sleep(time.Millisecond * 500) + acc.Wait(1) logparser.Stop() acc.AssertContainsTaggedFields(t, "logparser_grok", diff --git a/plugins/inputs/mongodb/mongodb_server_test.go b/plugins/inputs/mongodb/mongodb_server_test.go index 7ad0f38a2..e9d1bae9e 100644 --- a/plugins/inputs/mongodb/mongodb_server_test.go +++ b/plugins/inputs/mongodb/mongodb_server_test.go @@ -4,7 +4,6 @@ package mongodb import ( "testing" - "time" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" @@ -32,12 +31,11 @@ func TestAddDefaultStats(t *testing.T) { err := server.gatherData(&acc, false) require.NoError(t, err) - time.Sleep(time.Duration(1) * time.Second) // need to call this twice so it can perform the diff err = server.gatherData(&acc, false) require.NoError(t, err) for key, _ := range DefaultStats { - assert.True(t, acc.HasIntValue(key)) + assert.True(t, acc.HasIntField("mongodb", key)) } } diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index cfade2944..3ea0480b8 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -142,8 +142,8 @@ func (m *MQTTConsumer) onConnect(c mqtt.Client) { subscribeToken := c.SubscribeMultiple(topics, m.recvMessage) subscribeToken.Wait() if subscribeToken.Error() != nil { - log.Printf("E! MQTT Subscribe Error\ntopics: %s\nerror: %s", - strings.Join(m.Topics[:], ","), subscribeToken.Error()) + m.acc.AddError(fmt.Errorf("E! MQTT Subscribe Error\ntopics: %s\nerror: %s", + strings.Join(m.Topics[:], ","), subscribeToken.Error())) } m.started = true } @@ -151,7 +151,7 @@ func (m *MQTTConsumer) onConnect(c mqtt.Client) { } func (m *MQTTConsumer) onConnectionLost(c mqtt.Client, err error) { - log.Printf("E! MQTT Connection lost\nerror: %s\nMQTT Client will try to reconnect", err.Error()) + m.acc.AddError(fmt.Errorf("E! MQTT Connection lost\nerror: %s\nMQTT Client will try to reconnect", err.Error())) return } @@ -166,8 +166,8 @@ func (m *MQTTConsumer) receiver() { topic := msg.Topic() metrics, err := m.parser.Parse(msg.Payload()) if err != nil { - log.Printf("E! MQTT Parse Error\nmessage: %s\nerror: %s", - string(msg.Payload()), err.Error()) + m.acc.AddError(fmt.Errorf("E! MQTT Parse Error\nmessage: %s\nerror: %s", + string(msg.Payload()), err.Error())) } for _, metric := range metrics { diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index 2f5276191..027e4818b 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -2,7 +2,6 @@ package mqtt_consumer import ( "testing" - "time" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" @@ -86,7 +85,7 @@ func TestRunParser(t *testing.T) { n.parser, _ = parsers.NewInfluxParser() go n.receiver() in <- mqttMsg(testMsgNeg) - time.Sleep(time.Millisecond * 250) + acc.Wait(1) if a := acc.NFields(); a != 1 { t.Errorf("got %v, expected %v", a, 1) @@ -102,7 +101,7 @@ func TestRunParserNegativeNumber(t *testing.T) { n.parser, _ = parsers.NewInfluxParser() go n.receiver() in <- mqttMsg(testMsg) - time.Sleep(time.Millisecond * 25) + acc.Wait(1) if a := acc.NFields(); a != 1 { t.Errorf("got %v, expected %v", a, 1) @@ -119,11 +118,12 @@ func TestRunParserInvalidMsg(t *testing.T) { n.parser, _ = parsers.NewInfluxParser() go n.receiver() in <- mqttMsg(invalidMsg) - time.Sleep(time.Millisecond * 25) + acc.WaitError(1) if a := acc.NFields(); a != 0 { t.Errorf("got %v, expected %v", a, 0) } + assert.Contains(t, acc.Errors[0].Error(), "MQTT Parse Error") } // Test that the parser parses line format messages into metrics @@ -136,7 +136,7 @@ func TestRunParserAndGather(t *testing.T) { n.parser, _ = parsers.NewInfluxParser() go n.receiver() in <- mqttMsg(testMsg) - time.Sleep(time.Millisecond * 25) + acc.Wait(1) n.Gather(&acc) @@ -154,9 +154,9 @@ func TestRunParserAndGatherGraphite(t *testing.T) { n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) go n.receiver() in <- mqttMsg(testMsgGraphite) - time.Sleep(time.Millisecond * 25) n.Gather(&acc) + acc.Wait(1) acc.AssertContainsFields(t, "cpu_load_short_graphite", map[string]interface{}{"value": float64(23422)}) @@ -172,10 +172,11 @@ func TestRunParserAndGatherJSON(t *testing.T) { n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil) go n.receiver() in <- mqttMsg(testMsgJSON) - time.Sleep(time.Millisecond * 25) n.Gather(&acc) + acc.Wait(1) + acc.AssertContainsFields(t, "nats_json_test", map[string]interface{}{ "a": float64(5), diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index cbb85e016..7c9f53941 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -162,11 +162,11 @@ func (n *natsConsumer) receiver() { case <-n.done: return case err := <-n.errs: - log.Printf("E! error reading from %s\n", err.Error()) + n.acc.AddError(fmt.Errorf("E! error reading from %s\n", err.Error())) case msg := <-n.in: metrics, err := n.parser.Parse(msg.Data) if err != nil { - log.Printf("E! subject: %s, error: %s", msg.Subject, err.Error()) + n.acc.AddError(fmt.Errorf("E! subject: %s, error: %s", msg.Subject, err.Error())) } for _, metric := range metrics { @@ -179,8 +179,8 @@ func (n *natsConsumer) receiver() { func (n *natsConsumer) clean() { for _, sub := range n.Subs { if err := sub.Unsubscribe(); err != nil { - log.Printf("E! Error unsubscribing from subject %s in queue %s: %s\n", - sub.Subject, sub.Queue, err.Error()) + n.acc.AddError(fmt.Errorf("E! Error unsubscribing from subject %s in queue %s: %s\n", + sub.Subject, sub.Queue, err.Error())) } } diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go index 2f4d14d73..30ba0d2af 100644 --- a/plugins/inputs/nats_consumer/nats_consumer_test.go +++ b/plugins/inputs/nats_consumer/nats_consumer_test.go @@ -2,11 +2,11 @@ package natsconsumer import ( "testing" - "time" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" "github.com/nats-io/nats" + "github.com/stretchr/testify/assert" ) const ( @@ -42,11 +42,8 @@ func TestRunParser(t *testing.T) { n.wg.Add(1) go n.receiver() in <- natsMsg(testMsg) - time.Sleep(time.Millisecond * 25) - if acc.NFields() != 1 { - t.Errorf("got %v, expected %v", acc.NFields(), 1) - } + acc.Wait(1) } // Test that the parser ignores invalid messages @@ -60,11 +57,10 @@ func TestRunParserInvalidMsg(t *testing.T) { n.wg.Add(1) go n.receiver() in <- natsMsg(invalidMsg) - time.Sleep(time.Millisecond * 25) - if acc.NFields() != 0 { - t.Errorf("got %v, expected %v", acc.NFields(), 0) - } + acc.WaitError(1) + assert.Contains(t, acc.Errors[0].Error(), "E! subject: telegraf, error: metric parsing error") + assert.EqualValues(t, 0, acc.NMetrics()) } // Test that the parser parses line format messages into metrics @@ -78,10 +74,10 @@ func TestRunParserAndGather(t *testing.T) { n.wg.Add(1) go n.receiver() in <- natsMsg(testMsg) - time.Sleep(time.Millisecond * 25) n.Gather(&acc) + acc.Wait(1) acc.AssertContainsFields(t, "cpu_load_short", map[string]interface{}{"value": float64(23422)}) } @@ -97,10 +93,10 @@ func TestRunParserAndGatherGraphite(t *testing.T) { n.wg.Add(1) go n.receiver() in <- natsMsg(testMsgGraphite) - time.Sleep(time.Millisecond * 25) n.Gather(&acc) + acc.Wait(1) acc.AssertContainsFields(t, "cpu_load_short_graphite", map[string]interface{}{"value": float64(23422)}) } @@ -116,10 +112,10 @@ func TestRunParserAndGatherJSON(t *testing.T) { n.wg.Add(1) go n.receiver() in <- natsMsg(testMsgJSON) - time.Sleep(time.Millisecond * 25) n.Gather(&acc) + acc.Wait(1) acc.AssertContainsFields(t, "nats_json_test", map[string]interface{}{ "a": float64(5), diff --git a/plugins/inputs/socket_listener/socket_listener_test.go b/plugins/inputs/socket_listener/socket_listener_test.go index 6764b6d2d..9fa472809 100644 --- a/plugins/inputs/socket_listener/socket_listener_test.go +++ b/plugins/inputs/socket_listener/socket_listener_test.go @@ -81,42 +81,25 @@ func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) { acc := sl.Accumulator.(*testutil.Accumulator) + acc.Wait(3) acc.Lock() - if len(acc.Metrics) < 1 { - acc.Wait() - } - require.True(t, len(acc.Metrics) >= 1) - m := acc.Metrics[0] + m1 := acc.Metrics[0] + m2 := acc.Metrics[1] + m3 := acc.Metrics[2] acc.Unlock() - assert.Equal(t, "test", m.Measurement) - assert.Equal(t, map[string]string{"foo": "bar"}, m.Tags) - assert.Equal(t, map[string]interface{}{"v": int64(1)}, m.Fields) - assert.True(t, time.Unix(0, 123456789).Equal(m.Time)) + assert.Equal(t, "test", m1.Measurement) + assert.Equal(t, map[string]string{"foo": "bar"}, m1.Tags) + assert.Equal(t, map[string]interface{}{"v": int64(1)}, m1.Fields) + assert.True(t, time.Unix(0, 123456789).Equal(m1.Time)) - acc.Lock() - if len(acc.Metrics) < 2 { - acc.Wait() - } - require.True(t, len(acc.Metrics) >= 2) - m = acc.Metrics[1] - acc.Unlock() + assert.Equal(t, "test", m2.Measurement) + assert.Equal(t, map[string]string{"foo": "baz"}, m2.Tags) + assert.Equal(t, map[string]interface{}{"v": int64(2)}, m2.Fields) + assert.True(t, time.Unix(0, 123456790).Equal(m2.Time)) - assert.Equal(t, "test", m.Measurement) - assert.Equal(t, map[string]string{"foo": "baz"}, m.Tags) - assert.Equal(t, map[string]interface{}{"v": int64(2)}, m.Fields) - assert.True(t, time.Unix(0, 123456790).Equal(m.Time)) - - acc.Lock() - if len(acc.Metrics) < 3 { - acc.Wait() - } - require.True(t, len(acc.Metrics) >= 3) - m = acc.Metrics[2] - acc.Unlock() - - assert.Equal(t, "test", m.Measurement) - assert.Equal(t, map[string]string{"foo": "zab"}, m.Tags) - assert.Equal(t, map[string]interface{}{"v": int64(3)}, m.Fields) - assert.True(t, time.Unix(0, 123456791).Equal(m.Time)) + assert.Equal(t, "test", m3.Measurement) + assert.Equal(t, map[string]string{"foo": "zab"}, m3.Tags) + assert.Equal(t, map[string]interface{}{"v": int64(3)}, m3.Fields) + assert.True(t, time.Unix(0, 123456791).Equal(m3.Time)) } diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index 508c1e320..0c19f9116 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -2,7 +2,6 @@ package tail import ( "fmt" - "log" "sync" "github.com/hpcloud/tail" @@ -86,7 +85,7 @@ func (t *Tail) Start(acc telegraf.Accumulator) error { for _, filepath := range t.Files { g, err := globpath.Compile(filepath) if err != nil { - log.Printf("E! Error Glob %s failed to compile, %s", filepath, err) + t.acc.AddError(fmt.Errorf("E! Error Glob %s failed to compile, %s", filepath, err)) } for file, _ := range g.Match() { tailer, err := tail.TailFile(file, @@ -124,21 +123,21 @@ func (t *Tail) receiver(tailer *tail.Tail) { var line *tail.Line for line = range tailer.Lines { if line.Err != nil { - log.Printf("E! Error tailing file %s, Error: %s\n", - tailer.Filename, err) + t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n", + tailer.Filename, err)) continue } m, err = t.parser.ParseLine(line.Text) if err == nil { t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) } else { - log.Printf("E! Malformed log line in %s: [%s], Error: %s\n", - tailer.Filename, line.Text, err) + t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n", + tailer.Filename, line.Text, err)) } } if err := tailer.Err(); err != nil { - log.Printf("E! Error tailing file %s, Error: %s\n", - tailer.Filename, err) + t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n", + tailer.Filename, err)) } } @@ -146,12 +145,12 @@ func (t *Tail) Stop() { t.Lock() defer t.Unlock() - for _, t := range t.tailers { - err := t.Stop() + for _, tailer := range t.tailers { + err := tailer.Stop() if err != nil { - log.Printf("E! Error stopping tail on file %s\n", t.Filename) + t.acc.AddError(fmt.Errorf("E! Error stopping tail on file %s\n", tailer.Filename)) } - t.Cleanup() + tailer.Cleanup() } t.wg.Wait() } diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index 31ecfbf30..b927d160c 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -3,6 +3,7 @@ package tail import ( "io/ioutil" "os" + "runtime" "testing" "time" @@ -30,11 +31,9 @@ func TestTailFromBeginning(t *testing.T) { acc := testutil.Accumulator{} require.NoError(t, tt.Start(&acc)) - time.Sleep(time.Millisecond * 100) require.NoError(t, tt.Gather(&acc)) - // arbitrary sleep to wait for message to show up - time.Sleep(time.Millisecond * 150) + acc.Wait(1) acc.AssertContainsTaggedFields(t, "cpu", map[string]interface{}{ "usage_idle": float64(100), @@ -60,13 +59,19 @@ func TestTailFromEnd(t *testing.T) { acc := testutil.Accumulator{} require.NoError(t, tt.Start(&acc)) - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 200) //TODO remove once https://github.com/hpcloud/tail/pull/114 is merged & added to Godeps + for _, tailer := range tt.tailers { + for n, err := tailer.Tell(); err == nil && n == 0; n, err = tailer.Tell() { + // wait for tailer to jump to end + runtime.Gosched() + } + } _, err = tmpfile.WriteString("cpu,othertag=foo usage_idle=100\n") require.NoError(t, err) require.NoError(t, tt.Gather(&acc)) - time.Sleep(time.Millisecond * 50) + acc.Wait(1) acc.AssertContainsTaggedFields(t, "cpu", map[string]interface{}{ "usage_idle": float64(100), @@ -96,7 +101,7 @@ func TestTailBadLine(t *testing.T) { _, err = tmpfile.WriteString("cpu mytag= foo usage_idle= 100\n") require.NoError(t, err) require.NoError(t, tt.Gather(&acc)) - time.Sleep(time.Millisecond * 50) - assert.Len(t, acc.Metrics, 0) + acc.WaitError(1) + assert.Contains(t, acc.Errors[0].Error(), "E! Malformed log line") } diff --git a/plugins/inputs/tcp_listener/tcp_listener_test.go b/plugins/inputs/tcp_listener/tcp_listener_test.go index f7e5784d3..27ced791c 100644 --- a/plugins/inputs/tcp_listener/tcp_listener_test.go +++ b/plugins/inputs/tcp_listener/tcp_listener_test.go @@ -1,10 +1,15 @@ package tcp_listener import ( + "bufio" + "bytes" "fmt" + "io" + "log" "net" + "os" + "strings" "testing" - "time" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" @@ -54,7 +59,6 @@ func BenchmarkTCP(b *testing.B) { panic(err) } - time.Sleep(time.Millisecond * 25) conn, err := net.Dial("tcp", "127.0.0.1:8198") if err != nil { panic(err) @@ -62,8 +66,10 @@ func BenchmarkTCP(b *testing.B) { for i := 0; i < 100000; i++ { fmt.Fprintf(conn, testMsg) } - // wait for 100,000 metrics to get added to accumulator - time.Sleep(time.Millisecond) + conn.(*net.TCPConn).CloseWrite() + // wait for all 100,000 metrics to be processed + buf := []byte{0} + conn.Read(buf) // will EOF when completed listener.Stop() } } @@ -81,16 +87,18 @@ func TestHighTrafficTCP(t *testing.T) { err := listener.Start(acc) require.NoError(t, err) - time.Sleep(time.Millisecond * 25) conn, err := net.Dial("tcp", "127.0.0.1:8199") require.NoError(t, err) for i := 0; i < 100000; i++ { fmt.Fprintf(conn, testMsg) } - time.Sleep(time.Millisecond) + conn.(*net.TCPConn).CloseWrite() + buf := []byte{0} + _, err = conn.Read(buf) + assert.Equal(t, err, io.EOF) listener.Stop() - assert.Equal(t, 100000, len(acc.Metrics)) + assert.Equal(t, 100000, int(acc.NMetrics())) } func TestConnectTCP(t *testing.T) { @@ -105,13 +113,12 @@ func TestConnectTCP(t *testing.T) { require.NoError(t, listener.Start(acc)) defer listener.Stop() - time.Sleep(time.Millisecond * 25) conn, err := net.Dial("tcp", "127.0.0.1:8194") require.NoError(t, err) // send single message to socket fmt.Fprintf(conn, testMsg) - time.Sleep(time.Millisecond * 15) + acc.Wait(1) acc.AssertContainsTaggedFields(t, "cpu_load_short", map[string]interface{}{"value": float64(12)}, map[string]string{"host": "server01"}, @@ -119,7 +126,7 @@ func TestConnectTCP(t *testing.T) { // send multiple messages to socket fmt.Fprintf(conn, testMsgs) - time.Sleep(time.Millisecond * 15) + acc.Wait(6) hostTags := []string{"server02", "server03", "server04", "server05", "server06"} for _, hostTag := range hostTags { @@ -143,7 +150,6 @@ func TestConcurrentConns(t *testing.T) { require.NoError(t, listener.Start(acc)) defer listener.Stop() - time.Sleep(time.Millisecond * 25) _, err := net.Dial("tcp", "127.0.0.1:8195") assert.NoError(t, err) _, err = net.Dial("tcp", "127.0.0.1:8195") @@ -162,10 +168,8 @@ func TestConcurrentConns(t *testing.T) { " the Telegraf tcp listener configuration.\n", string(buf[:n])) - _, err = conn.Write([]byte(testMsg)) - assert.NoError(t, err) - time.Sleep(time.Millisecond * 10) - assert.Zero(t, acc.NFields()) + _, err = conn.Read(buf) + assert.Equal(t, io.EOF, err) } // Test that MaxTCPConections is respected when max==1 @@ -181,7 +185,6 @@ func TestConcurrentConns1(t *testing.T) { require.NoError(t, listener.Start(acc)) defer listener.Stop() - time.Sleep(time.Millisecond * 25) _, err := net.Dial("tcp", "127.0.0.1:8196") assert.NoError(t, err) @@ -198,10 +201,8 @@ func TestConcurrentConns1(t *testing.T) { " the Telegraf tcp listener configuration.\n", string(buf[:n])) - _, err = conn.Write([]byte(testMsg)) - assert.NoError(t, err) - time.Sleep(time.Millisecond * 10) - assert.Zero(t, acc.NFields()) + _, err = conn.Read(buf) + assert.Equal(t, io.EOF, err) } // Test that MaxTCPConections is respected @@ -216,7 +217,6 @@ func TestCloseConcurrentConns(t *testing.T) { acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) - time.Sleep(time.Millisecond * 25) _, err := net.Dial("tcp", "127.0.0.1:8195") assert.NoError(t, err) _, err = net.Dial("tcp", "127.0.0.1:8195") @@ -238,13 +238,9 @@ func TestRunParser(t *testing.T) { go listener.tcpParser() in <- testmsg - time.Sleep(time.Millisecond * 25) listener.Gather(&acc) - if a := acc.NFields(); a != 1 { - t.Errorf("got %v, expected %v", a, 1) - } - + acc.Wait(1) acc.AssertContainsTaggedFields(t, "cpu_load_short", map[string]interface{}{"value": float64(12)}, map[string]string{"host": "server01"}, @@ -263,11 +259,16 @@ func TestRunParserInvalidMsg(t *testing.T) { listener.wg.Add(1) go listener.tcpParser() + buf := bytes.NewBuffer(nil) + log.SetOutput(buf) + defer log.SetOutput(os.Stderr) in <- testmsg - time.Sleep(time.Millisecond * 25) - if a := acc.NFields(); a != 0 { - t.Errorf("got %v, expected %v", a, 0) + scnr := bufio.NewScanner(buf) + for scnr.Scan() { + if strings.Contains(scnr.Text(), fmt.Sprintf(malformedwarn, 1)) { + break + } } } @@ -284,9 +285,9 @@ func TestRunParserGraphiteMsg(t *testing.T) { go listener.tcpParser() in <- testmsg - time.Sleep(time.Millisecond * 25) listener.Gather(&acc) + acc.Wait(1) acc.AssertContainsFields(t, "cpu_load_graphite", map[string]interface{}{"value": float64(12)}) } @@ -304,9 +305,9 @@ func TestRunParserJSONMsg(t *testing.T) { go listener.tcpParser() in <- testmsg - time.Sleep(time.Millisecond * 25) listener.Gather(&acc) + acc.Wait(1) acc.AssertContainsFields(t, "udp_json_test", map[string]interface{}{ "a": float64(5), diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go index 53c6a72f5..d0a728b3c 100644 --- a/plugins/inputs/udp_listener/udp_listener.go +++ b/plugins/inputs/udp_listener/udp_listener.go @@ -1,6 +1,7 @@ package udp_listener import ( + "fmt" "log" "net" "sync" @@ -107,8 +108,9 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error { u.in = make(chan []byte, u.AllowedPendingMessages) u.done = make(chan struct{}) - u.wg.Add(2) - go u.udpListen() + u.udpListen() + + u.wg.Add(1) go u.udpParser() log.Printf("I! Started UDP listener service on %s (ReadBuffer: %d)\n", u.ServiceAddress, u.UDPBufferSize) @@ -126,32 +128,37 @@ func (u *UdpListener) Stop() { } func (u *UdpListener) udpListen() error { - defer u.wg.Done() var err error address, _ := net.ResolveUDPAddr("udp", u.ServiceAddress) u.listener, err = net.ListenUDP("udp", address) if err != nil { - log.Fatalf("E! Error: ListenUDP - %s", err) + return fmt.Errorf("E! Error: ListenUDP - %s", err) } log.Println("I! UDP server listening on: ", u.listener.LocalAddr().String()) - buf := make([]byte, UDP_MAX_PACKET_SIZE) - if u.UDPBufferSize > 0 { err = u.listener.SetReadBuffer(u.UDPBufferSize) // if we want to move away from OS default if err != nil { - log.Printf("E! Failed to set UDP read buffer to %d: %s", u.UDPBufferSize, err) - return err + return fmt.Errorf("E! Failed to set UDP read buffer to %d: %s", u.UDPBufferSize, err) } } + u.wg.Add(1) + go u.udpListenLoop() + return nil +} + +func (u *UdpListener) udpListenLoop() { + defer u.wg.Done() + + buf := make([]byte, UDP_MAX_PACKET_SIZE) for { select { case <-u.done: - return nil + return default: u.listener.SetReadDeadline(time.Now().Add(time.Second)) diff --git a/plugins/inputs/udp_listener/udp_listener_test.go b/plugins/inputs/udp_listener/udp_listener_test.go index eefdd593e..4d78a1a42 100644 --- a/plugins/inputs/udp_listener/udp_listener_test.go +++ b/plugins/inputs/udp_listener/udp_listener_test.go @@ -1,12 +1,16 @@ package udp_listener import ( + "bufio" + "bytes" "fmt" "io/ioutil" "log" "net" + "os" + "runtime" + "strings" "testing" - "time" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" @@ -50,22 +54,27 @@ func TestHighTrafficUDP(t *testing.T) { err := listener.Start(acc) require.NoError(t, err) - time.Sleep(time.Millisecond * 25) conn, err := net.Dial("udp", "127.0.0.1:8126") require.NoError(t, err) + mlen := int64(len(testMsgs)) + var sent int64 for i := 0; i < 20000; i++ { - // arbitrary, just to give the OS buffer some slack handling the - // packet storm. - time.Sleep(time.Microsecond) - fmt.Fprintf(conn, testMsgs) + for sent > listener.BytesRecv.Get()+32000 { + // more than 32kb sitting in OS buffer, let it drain + runtime.Gosched() + } + conn.Write([]byte(testMsgs)) + sent += mlen + } + for sent > listener.BytesRecv.Get() { + runtime.Gosched() + } + for len(listener.in) > 0 { + runtime.Gosched() } - time.Sleep(time.Millisecond) listener.Stop() - // this is not an exact science, since UDP packets can easily get lost or - // dropped, but assume that the OS will be able to - // handle at least 90% of the sent UDP packets. - assert.InDelta(t, 100000, len(acc.Metrics), 10000) + assert.Equal(t, uint64(100000), acc.NMetrics()) } func TestConnectUDP(t *testing.T) { @@ -79,13 +88,12 @@ func TestConnectUDP(t *testing.T) { require.NoError(t, listener.Start(acc)) defer listener.Stop() - time.Sleep(time.Millisecond * 25) conn, err := net.Dial("udp", "127.0.0.1:8127") require.NoError(t, err) // send single message to socket fmt.Fprintf(conn, testMsg) - time.Sleep(time.Millisecond * 15) + acc.Wait(1) acc.AssertContainsTaggedFields(t, "cpu_load_short", map[string]interface{}{"value": float64(12)}, map[string]string{"host": "server01"}, @@ -93,7 +101,7 @@ func TestConnectUDP(t *testing.T) { // send multiple messages to socket fmt.Fprintf(conn, testMsgs) - time.Sleep(time.Millisecond * 15) + acc.Wait(6) hostTags := []string{"server02", "server03", "server04", "server05", "server06"} for _, hostTag := range hostTags { @@ -118,13 +126,9 @@ func TestRunParser(t *testing.T) { go listener.udpParser() in <- testmsg - time.Sleep(time.Millisecond * 25) listener.Gather(&acc) - if a := acc.NFields(); a != 1 { - t.Errorf("got %v, expected %v", a, 1) - } - + acc.Wait(1) acc.AssertContainsTaggedFields(t, "cpu_load_short", map[string]interface{}{"value": float64(12)}, map[string]string{"host": "server01"}, @@ -144,11 +148,16 @@ func TestRunParserInvalidMsg(t *testing.T) { listener.wg.Add(1) go listener.udpParser() + buf := bytes.NewBuffer(nil) + log.SetOutput(buf) + defer log.SetOutput(os.Stderr) in <- testmsg - time.Sleep(time.Millisecond * 25) - if a := acc.NFields(); a != 0 { - t.Errorf("got %v, expected %v", a, 0) + scnr := bufio.NewScanner(buf) + for scnr.Scan() { + if strings.Contains(scnr.Text(), fmt.Sprintf(malformedwarn, 1)) { + break + } } } @@ -166,9 +175,9 @@ func TestRunParserGraphiteMsg(t *testing.T) { go listener.udpParser() in <- testmsg - time.Sleep(time.Millisecond * 25) listener.Gather(&acc) + acc.Wait(1) acc.AssertContainsFields(t, "cpu_load_graphite", map[string]interface{}{"value": float64(12)}) } @@ -187,9 +196,9 @@ func TestRunParserJSONMsg(t *testing.T) { go listener.udpParser() in <- testmsg - time.Sleep(time.Millisecond * 25) listener.Gather(&acc) + acc.Wait(1) acc.AssertContainsFields(t, "udp_json_test", map[string]interface{}{ "a": float64(5), diff --git a/plugins/outputs/graphite/graphite_test.go b/plugins/outputs/graphite/graphite_test.go index 4f1f2fef6..3984728af 100644 --- a/plugins/outputs/graphite/graphite_test.go +++ b/plugins/outputs/graphite/graphite_test.go @@ -44,9 +44,7 @@ func TestGraphiteOK(t *testing.T) { // Start TCP server wg.Add(1) t.Log("Starting server") - go TCPServer1(t, &wg) - // Give the fake graphite TCP server some time to start: - time.Sleep(time.Millisecond * 100) + TCPServer1(t, &wg) // Init plugin g := Graphite{ @@ -88,10 +86,8 @@ func TestGraphiteOK(t *testing.T) { t.Log("Finished Waiting for first data") var wg2 sync.WaitGroup // Start TCP server - time.Sleep(time.Millisecond * 100) wg2.Add(1) - go TCPServer2(t, &wg2) - time.Sleep(time.Millisecond * 100) + TCPServer2(t, &wg2) //Write but expect an error, but reconnect g.Write(metrics2) err3 := g.Write(metrics2) @@ -105,27 +101,31 @@ func TestGraphiteOK(t *testing.T) { } func TCPServer1(t *testing.T, wg *sync.WaitGroup) { - defer wg.Done() tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") - conn, _ := (tcpServer).Accept() - reader := bufio.NewReader(conn) - tp := textproto.NewReader(reader) - data1, _ := tp.ReadLine() - assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1) - conn.Close() - tcpServer.Close() + go func() { + defer wg.Done() + conn, _ := (tcpServer).Accept() + reader := bufio.NewReader(conn) + tp := textproto.NewReader(reader) + data1, _ := tp.ReadLine() + assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1) + conn.Close() + tcpServer.Close() + }() } func TCPServer2(t *testing.T, wg *sync.WaitGroup) { - defer wg.Done() tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") - conn2, _ := (tcpServer).Accept() - reader := bufio.NewReader(conn2) - tp := textproto.NewReader(reader) - data2, _ := tp.ReadLine() - assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2) - data3, _ := tp.ReadLine() - assert.Equal(t, "my.prefix.192_168_0_1.my_measurement 3.14 1289430000", data3) - conn2.Close() - tcpServer.Close() + go func() { + defer wg.Done() + conn2, _ := (tcpServer).Accept() + reader := bufio.NewReader(conn2) + tp := textproto.NewReader(reader) + data2, _ := tp.ReadLine() + assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2) + data3, _ := tp.ReadLine() + assert.Equal(t, "my.prefix.192_168_0_1.my_measurement 3.14 1289430000", data3) + conn2.Close() + tcpServer.Close() + }() } diff --git a/plugins/outputs/influxdb/client/udp_test.go b/plugins/outputs/influxdb/client/udp_test.go index 31196ddca..84efe0b22 100644 --- a/plugins/outputs/influxdb/client/udp_test.go +++ b/plugins/outputs/influxdb/client/udp_test.go @@ -66,7 +66,6 @@ func TestUDPClient_Write(t *testing.T) { }() // test sending simple metric - time.Sleep(time.Second) n, err := client.Write([]byte("cpu value=99\n")) assert.Equal(t, n, 13) assert.NoError(t, err) diff --git a/plugins/outputs/instrumental/instrumental_test.go b/plugins/outputs/instrumental/instrumental_test.go index d77d8eb05..0d3ce9040 100644 --- a/plugins/outputs/instrumental/instrumental_test.go +++ b/plugins/outputs/instrumental/instrumental_test.go @@ -16,9 +16,7 @@ import ( func TestWrite(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - go TCPServer(t, &wg) - // Give the fake TCP server some time to start: - time.Sleep(time.Millisecond * 100) + TCPServer(t, &wg) i := Instrumental{ Host: "127.0.0.1", @@ -79,45 +77,47 @@ func TestWrite(t *testing.T) { func TCPServer(t *testing.T, wg *sync.WaitGroup) { tcpServer, _ := net.Listen("tcp", "127.0.0.1:8000") - defer wg.Done() - conn, _ := tcpServer.Accept() - conn.SetDeadline(time.Now().Add(1 * time.Second)) - reader := bufio.NewReader(conn) - tp := textproto.NewReader(reader) + go func() { + defer wg.Done() + conn, _ := tcpServer.Accept() + conn.SetDeadline(time.Now().Add(1 * time.Second)) + reader := bufio.NewReader(conn) + tp := textproto.NewReader(reader) - hello, _ := tp.ReadLine() - assert.Equal(t, "hello version go/telegraf/1.1", hello) - auth, _ := tp.ReadLine() - assert.Equal(t, "authenticate abc123token", auth) - conn.Write([]byte("ok\nok\n")) + hello, _ := tp.ReadLine() + assert.Equal(t, "hello version go/telegraf/1.1", hello) + auth, _ := tp.ReadLine() + assert.Equal(t, "authenticate abc123token", auth) + conn.Write([]byte("ok\nok\n")) - data1, _ := tp.ReadLine() - assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1) - data2, _ := tp.ReadLine() - assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2) + data1, _ := tp.ReadLine() + assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1) + data2, _ := tp.ReadLine() + assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2) - conn, _ = tcpServer.Accept() - conn.SetDeadline(time.Now().Add(1 * time.Second)) - reader = bufio.NewReader(conn) - tp = textproto.NewReader(reader) + conn, _ = tcpServer.Accept() + conn.SetDeadline(time.Now().Add(1 * time.Second)) + reader = bufio.NewReader(conn) + tp = textproto.NewReader(reader) - hello, _ = tp.ReadLine() - assert.Equal(t, "hello version go/telegraf/1.1", hello) - auth, _ = tp.ReadLine() - assert.Equal(t, "authenticate abc123token", auth) - conn.Write([]byte("ok\nok\n")) + hello, _ = tp.ReadLine() + assert.Equal(t, "hello version go/telegraf/1.1", hello) + auth, _ = tp.ReadLine() + assert.Equal(t, "authenticate abc123token", auth) + conn.Write([]byte("ok\nok\n")) - data3, _ := tp.ReadLine() - assert.Equal(t, "increment my.prefix.192_168_0_1.my_histogram 3.14 1289430000", data3) + data3, _ := tp.ReadLine() + assert.Equal(t, "increment my.prefix.192_168_0_1.my_histogram 3.14 1289430000", data3) - data4, _ := tp.ReadLine() - assert.Equal(t, "increment my.prefix.192_168_0_1_8888_123.bad_metric_name 1 1289430000", data4) + data4, _ := tp.ReadLine() + assert.Equal(t, "increment my.prefix.192_168_0_1_8888_123.bad_metric_name 1 1289430000", data4) - data5, _ := tp.ReadLine() - assert.Equal(t, "increment my.prefix.192_168_0_1.my_counter 3.14 1289430000", data5) + data5, _ := tp.ReadLine() + assert.Equal(t, "increment my.prefix.192_168_0_1.my_counter 3.14 1289430000", data5) - data6, _ := tp.ReadLine() - assert.Equal(t, "", data6) + data6, _ := tp.ReadLine() + assert.Equal(t, "", data6) - conn.Close() + conn.Close() + }() } diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 63dfddd7a..02bebf9c8 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -129,6 +129,9 @@ func (a *Accumulator) AddError(err error) { } a.Lock() a.Errors = append(a.Errors, err) + if a.Cond != nil { + a.Cond.Broadcast() + } a.Unlock() } @@ -198,13 +201,28 @@ func (a *Accumulator) NFields() int { return counter } -// Wait waits for a metric to be added to the accumulator. -// Accumulator must already be locked. -func (a *Accumulator) Wait() { +// Wait waits for the given number of metrics to be added to the accumulator. +func (a *Accumulator) Wait(n int) { + a.Lock() if a.Cond == nil { a.Cond = sync.NewCond(&a.Mutex) } - a.Cond.Wait() + for int(a.NMetrics()) < n { + a.Cond.Wait() + } + a.Unlock() +} + +// WaitError waits for the given number of errors to be added to the accumulator. +func (a *Accumulator) WaitError(n int) { + a.Lock() + if a.Cond == nil { + a.Cond = sync.NewCond(&a.Mutex) + } + for len(a.Errors) < n { + a.Cond.Wait() + } + a.Unlock() } func (a *Accumulator) AssertContainsTaggedFields(