remove sleep from tests (#2555)
This commit is contained in:
parent
616b66f5cb
commit
1402c158b7
|
@ -207,14 +207,13 @@ func TestGenerateStatisticsInputParams(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMetricsCacheTimeout(t *testing.T) {
|
func TestMetricsCacheTimeout(t *testing.T) {
|
||||||
ttl, _ := time.ParseDuration("5ms")
|
|
||||||
cache := &MetricCache{
|
cache := &MetricCache{
|
||||||
Metrics: []*cloudwatch.Metric{},
|
Metrics: []*cloudwatch.Metric{},
|
||||||
Fetched: time.Now(),
|
Fetched: time.Now(),
|
||||||
TTL: ttl,
|
TTL: time.Minute,
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.True(t, cache.IsValid())
|
assert.True(t, cache.IsValid())
|
||||||
time.Sleep(ttl)
|
cache.Fetched = time.Now().Add(-time.Minute)
|
||||||
assert.False(t, cache.IsValid())
|
assert.False(t, cache.IsValid())
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
|
||||||
|
@ -43,14 +42,12 @@ func TestWriteHTTP(t *testing.T) {
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
defer listener.Stop()
|
defer listener.Stop()
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
// post single message to listener
|
// post single message to listener
|
||||||
resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsg)))
|
resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsg)))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.EqualValues(t, 204, resp.StatusCode)
|
require.EqualValues(t, 204, resp.StatusCode)
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 15)
|
acc.Wait(1)
|
||||||
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
||||||
map[string]interface{}{"value": float64(12)},
|
map[string]interface{}{"value": float64(12)},
|
||||||
map[string]string{"host": "server01"},
|
map[string]string{"host": "server01"},
|
||||||
|
@ -61,7 +58,7 @@ func TestWriteHTTP(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.EqualValues(t, 204, resp.StatusCode)
|
require.EqualValues(t, 204, resp.StatusCode)
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 15)
|
acc.Wait(2)
|
||||||
hostTags := []string{"server02", "server03",
|
hostTags := []string{"server02", "server03",
|
||||||
"server04", "server05", "server06"}
|
"server04", "server05", "server06"}
|
||||||
for _, hostTag := range hostTags {
|
for _, hostTag := range hostTags {
|
||||||
|
@ -76,7 +73,7 @@ func TestWriteHTTP(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.EqualValues(t, 400, resp.StatusCode)
|
require.EqualValues(t, 400, resp.StatusCode)
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 15)
|
acc.Wait(3)
|
||||||
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
||||||
map[string]interface{}{"value": float64(12)},
|
map[string]interface{}{"value": float64(12)},
|
||||||
map[string]string{"host": "server01"},
|
map[string]string{"host": "server01"},
|
||||||
|
@ -91,14 +88,12 @@ func TestWriteHTTPNoNewline(t *testing.T) {
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
defer listener.Stop()
|
defer listener.Stop()
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
// post single message to listener
|
// post single message to listener
|
||||||
resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsgNoNewline)))
|
resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsgNoNewline)))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.EqualValues(t, 204, resp.StatusCode)
|
require.EqualValues(t, 204, resp.StatusCode)
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 15)
|
acc.Wait(1)
|
||||||
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
||||||
map[string]interface{}{"value": float64(12)},
|
map[string]interface{}{"value": float64(12)},
|
||||||
map[string]string{"host": "server01"},
|
map[string]string{"host": "server01"},
|
||||||
|
@ -115,8 +110,6 @@ func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) {
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
defer listener.Stop()
|
defer listener.Stop()
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
// Post a gigantic metric to the listener and verify that it writes OK this time:
|
// Post a gigantic metric to the listener and verify that it writes OK this time:
|
||||||
resp, err := http.Post("http://localhost:8296/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric)))
|
resp, err := http.Post("http://localhost:8296/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric)))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -133,8 +126,6 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) {
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
defer listener.Stop()
|
defer listener.Stop()
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
resp, err := http.Post("http://localhost:8297/write", "", bytes.NewBuffer([]byte(hugeMetric)))
|
resp, err := http.Post("http://localhost:8297/write", "", bytes.NewBuffer([]byte(hugeMetric)))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.EqualValues(t, 413, resp.StatusCode)
|
require.EqualValues(t, 413, resp.StatusCode)
|
||||||
|
@ -150,15 +141,13 @@ func TestWriteHTTPVerySmallMaxLineSize(t *testing.T) {
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
defer listener.Stop()
|
defer listener.Stop()
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
resp, err := http.Post("http://localhost:8298/write", "", bytes.NewBuffer([]byte(testMsgs)))
|
resp, err := http.Post("http://localhost:8298/write", "", bytes.NewBuffer([]byte(testMsgs)))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.EqualValues(t, 204, resp.StatusCode)
|
require.EqualValues(t, 204, resp.StatusCode)
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 15)
|
|
||||||
hostTags := []string{"server02", "server03",
|
hostTags := []string{"server02", "server03",
|
||||||
"server04", "server05", "server06"}
|
"server04", "server05", "server06"}
|
||||||
|
acc.Wait(len(hostTags))
|
||||||
for _, hostTag := range hostTags {
|
for _, hostTag := range hostTags {
|
||||||
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
||||||
map[string]interface{}{"value": float64(12)},
|
map[string]interface{}{"value": float64(12)},
|
||||||
|
@ -177,15 +166,13 @@ func TestWriteHTTPLargeLinesSkipped(t *testing.T) {
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
defer listener.Stop()
|
defer listener.Stop()
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
resp, err := http.Post("http://localhost:8300/write", "", bytes.NewBuffer([]byte(hugeMetric+testMsgs)))
|
resp, err := http.Post("http://localhost:8300/write", "", bytes.NewBuffer([]byte(hugeMetric+testMsgs)))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.EqualValues(t, 400, resp.StatusCode)
|
require.EqualValues(t, 400, resp.StatusCode)
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 15)
|
|
||||||
hostTags := []string{"server02", "server03",
|
hostTags := []string{"server02", "server03",
|
||||||
"server04", "server05", "server06"}
|
"server04", "server05", "server06"}
|
||||||
|
acc.Wait(len(hostTags))
|
||||||
for _, hostTag := range hostTags {
|
for _, hostTag := range hostTags {
|
||||||
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
||||||
map[string]interface{}{"value": float64(12)},
|
map[string]interface{}{"value": float64(12)},
|
||||||
|
@ -204,8 +191,6 @@ func TestWriteHTTPGzippedData(t *testing.T) {
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
defer listener.Stop()
|
defer listener.Stop()
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
data, err := ioutil.ReadFile("./testdata/testmsgs.gz")
|
data, err := ioutil.ReadFile("./testdata/testmsgs.gz")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -218,9 +203,9 @@ func TestWriteHTTPGzippedData(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.EqualValues(t, 204, resp.StatusCode)
|
require.EqualValues(t, 204, resp.StatusCode)
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 50)
|
|
||||||
hostTags := []string{"server02", "server03",
|
hostTags := []string{"server02", "server03",
|
||||||
"server04", "server05", "server06"}
|
"server04", "server05", "server06"}
|
||||||
|
acc.Wait(len(hostTags))
|
||||||
for _, hostTag := range hostTags {
|
for _, hostTag := range hostTags {
|
||||||
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
||||||
map[string]interface{}{"value": float64(12)},
|
map[string]interface{}{"value": float64(12)},
|
||||||
|
@ -237,8 +222,6 @@ func TestWriteHTTPHighTraffic(t *testing.T) {
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
defer listener.Stop()
|
defer listener.Stop()
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
// post many messages to listener
|
// post many messages to listener
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
|
@ -254,9 +237,9 @@ func TestWriteHTTPHighTraffic(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
time.Sleep(time.Millisecond * 250)
|
|
||||||
listener.Gather(acc)
|
listener.Gather(acc)
|
||||||
|
|
||||||
|
acc.Wait(25000)
|
||||||
require.Equal(t, int64(25000), int64(acc.NMetrics()))
|
require.Equal(t, int64(25000), int64(acc.NMetrics()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -267,8 +250,6 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) {
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
defer listener.Stop()
|
defer listener.Stop()
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
// post single message to listener
|
// post single message to listener
|
||||||
resp, err := http.Post("http://localhost:8186/foobar", "", bytes.NewBuffer([]byte(testMsg)))
|
resp, err := http.Post("http://localhost:8186/foobar", "", bytes.NewBuffer([]byte(testMsg)))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -276,16 +257,12 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWriteHTTPInvalid(t *testing.T) {
|
func TestWriteHTTPInvalid(t *testing.T) {
|
||||||
time.Sleep(time.Millisecond * 250)
|
|
||||||
|
|
||||||
listener := newTestHTTPListener()
|
listener := newTestHTTPListener()
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
defer listener.Stop()
|
defer listener.Stop()
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
// post single message to listener
|
// post single message to listener
|
||||||
resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(badMsg)))
|
resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(badMsg)))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -293,16 +270,12 @@ func TestWriteHTTPInvalid(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWriteHTTPEmpty(t *testing.T) {
|
func TestWriteHTTPEmpty(t *testing.T) {
|
||||||
time.Sleep(time.Millisecond * 250)
|
|
||||||
|
|
||||||
listener := newTestHTTPListener()
|
listener := newTestHTTPListener()
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
defer listener.Stop()
|
defer listener.Stop()
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
// post single message to listener
|
// post single message to listener
|
||||||
resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(emptyMsg)))
|
resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(emptyMsg)))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -310,16 +283,12 @@ func TestWriteHTTPEmpty(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestQueryAndPingHTTP(t *testing.T) {
|
func TestQueryAndPingHTTP(t *testing.T) {
|
||||||
time.Sleep(time.Millisecond * 250)
|
|
||||||
|
|
||||||
listener := newTestHTTPListener()
|
listener := newTestHTTPListener()
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
defer listener.Stop()
|
defer listener.Stop()
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
// post query to listener
|
// post query to listener
|
||||||
resp, err := http.Post("http://localhost:8186/query?db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22", "", nil)
|
resp, err := http.Post("http://localhost:8186/query?db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22", "", nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -329,7 +329,7 @@ func TestTimeout(t *testing.T) {
|
||||||
Address: ts.URL + "/twosecondnap",
|
Address: ts.URL + "/twosecondnap",
|
||||||
Body: "{ 'test': 'data'}",
|
Body: "{ 'test': 'data'}",
|
||||||
Method: "GET",
|
Method: "GET",
|
||||||
ResponseTimeout: internal.Duration{Duration: time.Second * 1},
|
ResponseTimeout: internal.Duration{Duration: time.Millisecond},
|
||||||
Headers: map[string]string{
|
Headers: map[string]string{
|
||||||
"Content-Type": "application/json",
|
"Content-Type": "application/json",
|
||||||
},
|
},
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package kafka_consumer
|
package kafka_consumer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -129,13 +130,13 @@ func (k *Kafka) receiver() {
|
||||||
return
|
return
|
||||||
case err := <-k.errs:
|
case err := <-k.errs:
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("E! Kafka Consumer Error: %s\n", err)
|
k.acc.AddError(fmt.Errorf("Kafka Consumer Error: %s\n", err))
|
||||||
}
|
}
|
||||||
case msg := <-k.in:
|
case msg := <-k.in:
|
||||||
metrics, err := k.parser.Parse(msg.Value)
|
metrics, err := k.parser.Parse(msg.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("E! Kafka Message Parse Error\nmessage: %s\nerror: %s",
|
k.acc.AddError(fmt.Errorf("E! Kafka Message Parse Error\nmessage: %s\nerror: %s",
|
||||||
string(msg.Value), err.Error())
|
string(msg.Value), err.Error()))
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
|
@ -158,7 +159,7 @@ func (k *Kafka) Stop() {
|
||||||
defer k.Unlock()
|
defer k.Unlock()
|
||||||
close(k.done)
|
close(k.done)
|
||||||
if err := k.Consumer.Close(); err != nil {
|
if err := k.Consumer.Close(); err != nil {
|
||||||
log.Printf("E! Error closing kafka consumer: %s\n", err.Error())
|
k.acc.AddError(fmt.Errorf("E! Error closing kafka consumer: %s\n", err.Error()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,6 @@ package kafka_consumer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
@ -43,7 +42,7 @@ func TestRunParser(t *testing.T) {
|
||||||
k.parser, _ = parsers.NewInfluxParser()
|
k.parser, _ = parsers.NewInfluxParser()
|
||||||
go k.receiver()
|
go k.receiver()
|
||||||
in <- saramaMsg(testMsg)
|
in <- saramaMsg(testMsg)
|
||||||
time.Sleep(time.Millisecond * 5)
|
acc.Wait(1)
|
||||||
|
|
||||||
assert.Equal(t, acc.NFields(), 1)
|
assert.Equal(t, acc.NFields(), 1)
|
||||||
}
|
}
|
||||||
|
@ -58,7 +57,7 @@ func TestRunParserInvalidMsg(t *testing.T) {
|
||||||
k.parser, _ = parsers.NewInfluxParser()
|
k.parser, _ = parsers.NewInfluxParser()
|
||||||
go k.receiver()
|
go k.receiver()
|
||||||
in <- saramaMsg(invalidMsg)
|
in <- saramaMsg(invalidMsg)
|
||||||
time.Sleep(time.Millisecond * 5)
|
acc.WaitError(1)
|
||||||
|
|
||||||
assert.Equal(t, acc.NFields(), 0)
|
assert.Equal(t, acc.NFields(), 0)
|
||||||
}
|
}
|
||||||
|
@ -73,7 +72,7 @@ func TestRunParserAndGather(t *testing.T) {
|
||||||
k.parser, _ = parsers.NewInfluxParser()
|
k.parser, _ = parsers.NewInfluxParser()
|
||||||
go k.receiver()
|
go k.receiver()
|
||||||
in <- saramaMsg(testMsg)
|
in <- saramaMsg(testMsg)
|
||||||
time.Sleep(time.Millisecond * 5)
|
acc.Wait(1)
|
||||||
|
|
||||||
k.Gather(&acc)
|
k.Gather(&acc)
|
||||||
|
|
||||||
|
@ -92,7 +91,7 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
|
||||||
k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
|
k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
|
||||||
go k.receiver()
|
go k.receiver()
|
||||||
in <- saramaMsg(testMsgGraphite)
|
in <- saramaMsg(testMsgGraphite)
|
||||||
time.Sleep(time.Millisecond * 5)
|
acc.Wait(1)
|
||||||
|
|
||||||
k.Gather(&acc)
|
k.Gather(&acc)
|
||||||
|
|
||||||
|
@ -111,7 +110,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
|
||||||
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
|
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
|
||||||
go k.receiver()
|
go k.receiver()
|
||||||
in <- saramaMsg(testMsgJSON)
|
in <- saramaMsg(testMsgJSON)
|
||||||
time.Sleep(time.Millisecond * 5)
|
acc.Wait(1)
|
||||||
|
|
||||||
k.Gather(&acc)
|
k.Gather(&acc)
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"runtime"
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
|
||||||
|
@ -41,7 +40,6 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
|
||||||
acc := testutil.Accumulator{}
|
acc := testutil.Accumulator{}
|
||||||
assert.Error(t, logparser.Start(&acc))
|
assert.Error(t, logparser.Start(&acc))
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 500)
|
|
||||||
logparser.Stop()
|
logparser.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -61,7 +59,8 @@ func TestGrokParseLogFiles(t *testing.T) {
|
||||||
acc := testutil.Accumulator{}
|
acc := testutil.Accumulator{}
|
||||||
assert.NoError(t, logparser.Start(&acc))
|
assert.NoError(t, logparser.Start(&acc))
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 500)
|
acc.Wait(2)
|
||||||
|
|
||||||
logparser.Stop()
|
logparser.Stop()
|
||||||
|
|
||||||
acc.AssertContainsTaggedFields(t, "logparser_grok",
|
acc.AssertContainsTaggedFields(t, "logparser_grok",
|
||||||
|
@ -102,14 +101,13 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) {
|
||||||
acc := testutil.Accumulator{}
|
acc := testutil.Accumulator{}
|
||||||
assert.NoError(t, logparser.Start(&acc))
|
assert.NoError(t, logparser.Start(&acc))
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 500)
|
|
||||||
assert.Equal(t, acc.NFields(), 0)
|
assert.Equal(t, acc.NFields(), 0)
|
||||||
|
|
||||||
os.Symlink(
|
os.Symlink(
|
||||||
thisdir+"grok/testdata/test_a.log",
|
thisdir+"grok/testdata/test_a.log",
|
||||||
emptydir+"/test_a.log")
|
emptydir+"/test_a.log")
|
||||||
assert.NoError(t, logparser.Gather(&acc))
|
assert.NoError(t, logparser.Gather(&acc))
|
||||||
time.Sleep(time.Millisecond * 500)
|
acc.Wait(1)
|
||||||
|
|
||||||
logparser.Stop()
|
logparser.Stop()
|
||||||
|
|
||||||
|
@ -143,7 +141,7 @@ func TestGrokParseLogFilesOneBad(t *testing.T) {
|
||||||
acc.SetDebug(true)
|
acc.SetDebug(true)
|
||||||
assert.NoError(t, logparser.Start(&acc))
|
assert.NoError(t, logparser.Start(&acc))
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 500)
|
acc.Wait(1)
|
||||||
logparser.Stop()
|
logparser.Stop()
|
||||||
|
|
||||||
acc.AssertContainsTaggedFields(t, "logparser_grok",
|
acc.AssertContainsTaggedFields(t, "logparser_grok",
|
||||||
|
|
|
@ -4,7 +4,6 @@ package mongodb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
@ -32,12 +31,11 @@ func TestAddDefaultStats(t *testing.T) {
|
||||||
err := server.gatherData(&acc, false)
|
err := server.gatherData(&acc, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
time.Sleep(time.Duration(1) * time.Second)
|
|
||||||
// need to call this twice so it can perform the diff
|
// need to call this twice so it can perform the diff
|
||||||
err = server.gatherData(&acc, false)
|
err = server.gatherData(&acc, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
for key, _ := range DefaultStats {
|
for key, _ := range DefaultStats {
|
||||||
assert.True(t, acc.HasIntValue(key))
|
assert.True(t, acc.HasIntField("mongodb", key))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,8 +142,8 @@ func (m *MQTTConsumer) onConnect(c mqtt.Client) {
|
||||||
subscribeToken := c.SubscribeMultiple(topics, m.recvMessage)
|
subscribeToken := c.SubscribeMultiple(topics, m.recvMessage)
|
||||||
subscribeToken.Wait()
|
subscribeToken.Wait()
|
||||||
if subscribeToken.Error() != nil {
|
if subscribeToken.Error() != nil {
|
||||||
log.Printf("E! MQTT Subscribe Error\ntopics: %s\nerror: %s",
|
m.acc.AddError(fmt.Errorf("E! MQTT Subscribe Error\ntopics: %s\nerror: %s",
|
||||||
strings.Join(m.Topics[:], ","), subscribeToken.Error())
|
strings.Join(m.Topics[:], ","), subscribeToken.Error()))
|
||||||
}
|
}
|
||||||
m.started = true
|
m.started = true
|
||||||
}
|
}
|
||||||
|
@ -151,7 +151,7 @@ func (m *MQTTConsumer) onConnect(c mqtt.Client) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MQTTConsumer) onConnectionLost(c mqtt.Client, err error) {
|
func (m *MQTTConsumer) onConnectionLost(c mqtt.Client, err error) {
|
||||||
log.Printf("E! MQTT Connection lost\nerror: %s\nMQTT Client will try to reconnect", err.Error())
|
m.acc.AddError(fmt.Errorf("E! MQTT Connection lost\nerror: %s\nMQTT Client will try to reconnect", err.Error()))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -166,8 +166,8 @@ func (m *MQTTConsumer) receiver() {
|
||||||
topic := msg.Topic()
|
topic := msg.Topic()
|
||||||
metrics, err := m.parser.Parse(msg.Payload())
|
metrics, err := m.parser.Parse(msg.Payload())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("E! MQTT Parse Error\nmessage: %s\nerror: %s",
|
m.acc.AddError(fmt.Errorf("E! MQTT Parse Error\nmessage: %s\nerror: %s",
|
||||||
string(msg.Payload()), err.Error())
|
string(msg.Payload()), err.Error()))
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
|
|
|
@ -2,7 +2,6 @@ package mqtt_consumer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
@ -86,7 +85,7 @@ func TestRunParser(t *testing.T) {
|
||||||
n.parser, _ = parsers.NewInfluxParser()
|
n.parser, _ = parsers.NewInfluxParser()
|
||||||
go n.receiver()
|
go n.receiver()
|
||||||
in <- mqttMsg(testMsgNeg)
|
in <- mqttMsg(testMsgNeg)
|
||||||
time.Sleep(time.Millisecond * 250)
|
acc.Wait(1)
|
||||||
|
|
||||||
if a := acc.NFields(); a != 1 {
|
if a := acc.NFields(); a != 1 {
|
||||||
t.Errorf("got %v, expected %v", a, 1)
|
t.Errorf("got %v, expected %v", a, 1)
|
||||||
|
@ -102,7 +101,7 @@ func TestRunParserNegativeNumber(t *testing.T) {
|
||||||
n.parser, _ = parsers.NewInfluxParser()
|
n.parser, _ = parsers.NewInfluxParser()
|
||||||
go n.receiver()
|
go n.receiver()
|
||||||
in <- mqttMsg(testMsg)
|
in <- mqttMsg(testMsg)
|
||||||
time.Sleep(time.Millisecond * 25)
|
acc.Wait(1)
|
||||||
|
|
||||||
if a := acc.NFields(); a != 1 {
|
if a := acc.NFields(); a != 1 {
|
||||||
t.Errorf("got %v, expected %v", a, 1)
|
t.Errorf("got %v, expected %v", a, 1)
|
||||||
|
@ -119,11 +118,12 @@ func TestRunParserInvalidMsg(t *testing.T) {
|
||||||
n.parser, _ = parsers.NewInfluxParser()
|
n.parser, _ = parsers.NewInfluxParser()
|
||||||
go n.receiver()
|
go n.receiver()
|
||||||
in <- mqttMsg(invalidMsg)
|
in <- mqttMsg(invalidMsg)
|
||||||
time.Sleep(time.Millisecond * 25)
|
acc.WaitError(1)
|
||||||
|
|
||||||
if a := acc.NFields(); a != 0 {
|
if a := acc.NFields(); a != 0 {
|
||||||
t.Errorf("got %v, expected %v", a, 0)
|
t.Errorf("got %v, expected %v", a, 0)
|
||||||
}
|
}
|
||||||
|
assert.Contains(t, acc.Errors[0].Error(), "MQTT Parse Error")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that the parser parses line format messages into metrics
|
// Test that the parser parses line format messages into metrics
|
||||||
|
@ -136,7 +136,7 @@ func TestRunParserAndGather(t *testing.T) {
|
||||||
n.parser, _ = parsers.NewInfluxParser()
|
n.parser, _ = parsers.NewInfluxParser()
|
||||||
go n.receiver()
|
go n.receiver()
|
||||||
in <- mqttMsg(testMsg)
|
in <- mqttMsg(testMsg)
|
||||||
time.Sleep(time.Millisecond * 25)
|
acc.Wait(1)
|
||||||
|
|
||||||
n.Gather(&acc)
|
n.Gather(&acc)
|
||||||
|
|
||||||
|
@ -154,9 +154,9 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
|
||||||
n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
|
n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
|
||||||
go n.receiver()
|
go n.receiver()
|
||||||
in <- mqttMsg(testMsgGraphite)
|
in <- mqttMsg(testMsgGraphite)
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
n.Gather(&acc)
|
n.Gather(&acc)
|
||||||
|
acc.Wait(1)
|
||||||
|
|
||||||
acc.AssertContainsFields(t, "cpu_load_short_graphite",
|
acc.AssertContainsFields(t, "cpu_load_short_graphite",
|
||||||
map[string]interface{}{"value": float64(23422)})
|
map[string]interface{}{"value": float64(23422)})
|
||||||
|
@ -172,10 +172,11 @@ func TestRunParserAndGatherJSON(t *testing.T) {
|
||||||
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
|
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
|
||||||
go n.receiver()
|
go n.receiver()
|
||||||
in <- mqttMsg(testMsgJSON)
|
in <- mqttMsg(testMsgJSON)
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
n.Gather(&acc)
|
n.Gather(&acc)
|
||||||
|
|
||||||
|
acc.Wait(1)
|
||||||
|
|
||||||
acc.AssertContainsFields(t, "nats_json_test",
|
acc.AssertContainsFields(t, "nats_json_test",
|
||||||
map[string]interface{}{
|
map[string]interface{}{
|
||||||
"a": float64(5),
|
"a": float64(5),
|
||||||
|
|
|
@ -162,11 +162,11 @@ func (n *natsConsumer) receiver() {
|
||||||
case <-n.done:
|
case <-n.done:
|
||||||
return
|
return
|
||||||
case err := <-n.errs:
|
case err := <-n.errs:
|
||||||
log.Printf("E! error reading from %s\n", err.Error())
|
n.acc.AddError(fmt.Errorf("E! error reading from %s\n", err.Error()))
|
||||||
case msg := <-n.in:
|
case msg := <-n.in:
|
||||||
metrics, err := n.parser.Parse(msg.Data)
|
metrics, err := n.parser.Parse(msg.Data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("E! subject: %s, error: %s", msg.Subject, err.Error())
|
n.acc.AddError(fmt.Errorf("E! subject: %s, error: %s", msg.Subject, err.Error()))
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
|
@ -179,8 +179,8 @@ func (n *natsConsumer) receiver() {
|
||||||
func (n *natsConsumer) clean() {
|
func (n *natsConsumer) clean() {
|
||||||
for _, sub := range n.Subs {
|
for _, sub := range n.Subs {
|
||||||
if err := sub.Unsubscribe(); err != nil {
|
if err := sub.Unsubscribe(); err != nil {
|
||||||
log.Printf("E! Error unsubscribing from subject %s in queue %s: %s\n",
|
n.acc.AddError(fmt.Errorf("E! Error unsubscribing from subject %s in queue %s: %s\n",
|
||||||
sub.Subject, sub.Queue, err.Error())
|
sub.Subject, sub.Queue, err.Error()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,11 +2,11 @@ package natsconsumer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/nats-io/nats"
|
"github.com/nats-io/nats"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -42,11 +42,8 @@ func TestRunParser(t *testing.T) {
|
||||||
n.wg.Add(1)
|
n.wg.Add(1)
|
||||||
go n.receiver()
|
go n.receiver()
|
||||||
in <- natsMsg(testMsg)
|
in <- natsMsg(testMsg)
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
if acc.NFields() != 1 {
|
acc.Wait(1)
|
||||||
t.Errorf("got %v, expected %v", acc.NFields(), 1)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that the parser ignores invalid messages
|
// Test that the parser ignores invalid messages
|
||||||
|
@ -60,11 +57,10 @@ func TestRunParserInvalidMsg(t *testing.T) {
|
||||||
n.wg.Add(1)
|
n.wg.Add(1)
|
||||||
go n.receiver()
|
go n.receiver()
|
||||||
in <- natsMsg(invalidMsg)
|
in <- natsMsg(invalidMsg)
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
if acc.NFields() != 0 {
|
acc.WaitError(1)
|
||||||
t.Errorf("got %v, expected %v", acc.NFields(), 0)
|
assert.Contains(t, acc.Errors[0].Error(), "E! subject: telegraf, error: metric parsing error")
|
||||||
}
|
assert.EqualValues(t, 0, acc.NMetrics())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that the parser parses line format messages into metrics
|
// Test that the parser parses line format messages into metrics
|
||||||
|
@ -78,10 +74,10 @@ func TestRunParserAndGather(t *testing.T) {
|
||||||
n.wg.Add(1)
|
n.wg.Add(1)
|
||||||
go n.receiver()
|
go n.receiver()
|
||||||
in <- natsMsg(testMsg)
|
in <- natsMsg(testMsg)
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
n.Gather(&acc)
|
n.Gather(&acc)
|
||||||
|
|
||||||
|
acc.Wait(1)
|
||||||
acc.AssertContainsFields(t, "cpu_load_short",
|
acc.AssertContainsFields(t, "cpu_load_short",
|
||||||
map[string]interface{}{"value": float64(23422)})
|
map[string]interface{}{"value": float64(23422)})
|
||||||
}
|
}
|
||||||
|
@ -97,10 +93,10 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
|
||||||
n.wg.Add(1)
|
n.wg.Add(1)
|
||||||
go n.receiver()
|
go n.receiver()
|
||||||
in <- natsMsg(testMsgGraphite)
|
in <- natsMsg(testMsgGraphite)
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
n.Gather(&acc)
|
n.Gather(&acc)
|
||||||
|
|
||||||
|
acc.Wait(1)
|
||||||
acc.AssertContainsFields(t, "cpu_load_short_graphite",
|
acc.AssertContainsFields(t, "cpu_load_short_graphite",
|
||||||
map[string]interface{}{"value": float64(23422)})
|
map[string]interface{}{"value": float64(23422)})
|
||||||
}
|
}
|
||||||
|
@ -116,10 +112,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
|
||||||
n.wg.Add(1)
|
n.wg.Add(1)
|
||||||
go n.receiver()
|
go n.receiver()
|
||||||
in <- natsMsg(testMsgJSON)
|
in <- natsMsg(testMsgJSON)
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
n.Gather(&acc)
|
n.Gather(&acc)
|
||||||
|
|
||||||
|
acc.Wait(1)
|
||||||
acc.AssertContainsFields(t, "nats_json_test",
|
acc.AssertContainsFields(t, "nats_json_test",
|
||||||
map[string]interface{}{
|
map[string]interface{}{
|
||||||
"a": float64(5),
|
"a": float64(5),
|
||||||
|
|
|
@ -81,42 +81,25 @@ func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) {
|
||||||
|
|
||||||
acc := sl.Accumulator.(*testutil.Accumulator)
|
acc := sl.Accumulator.(*testutil.Accumulator)
|
||||||
|
|
||||||
|
acc.Wait(3)
|
||||||
acc.Lock()
|
acc.Lock()
|
||||||
if len(acc.Metrics) < 1 {
|
m1 := acc.Metrics[0]
|
||||||
acc.Wait()
|
m2 := acc.Metrics[1]
|
||||||
}
|
m3 := acc.Metrics[2]
|
||||||
require.True(t, len(acc.Metrics) >= 1)
|
|
||||||
m := acc.Metrics[0]
|
|
||||||
acc.Unlock()
|
acc.Unlock()
|
||||||
|
|
||||||
assert.Equal(t, "test", m.Measurement)
|
assert.Equal(t, "test", m1.Measurement)
|
||||||
assert.Equal(t, map[string]string{"foo": "bar"}, m.Tags)
|
assert.Equal(t, map[string]string{"foo": "bar"}, m1.Tags)
|
||||||
assert.Equal(t, map[string]interface{}{"v": int64(1)}, m.Fields)
|
assert.Equal(t, map[string]interface{}{"v": int64(1)}, m1.Fields)
|
||||||
assert.True(t, time.Unix(0, 123456789).Equal(m.Time))
|
assert.True(t, time.Unix(0, 123456789).Equal(m1.Time))
|
||||||
|
|
||||||
acc.Lock()
|
assert.Equal(t, "test", m2.Measurement)
|
||||||
if len(acc.Metrics) < 2 {
|
assert.Equal(t, map[string]string{"foo": "baz"}, m2.Tags)
|
||||||
acc.Wait()
|
assert.Equal(t, map[string]interface{}{"v": int64(2)}, m2.Fields)
|
||||||
}
|
assert.True(t, time.Unix(0, 123456790).Equal(m2.Time))
|
||||||
require.True(t, len(acc.Metrics) >= 2)
|
|
||||||
m = acc.Metrics[1]
|
|
||||||
acc.Unlock()
|
|
||||||
|
|
||||||
assert.Equal(t, "test", m.Measurement)
|
assert.Equal(t, "test", m3.Measurement)
|
||||||
assert.Equal(t, map[string]string{"foo": "baz"}, m.Tags)
|
assert.Equal(t, map[string]string{"foo": "zab"}, m3.Tags)
|
||||||
assert.Equal(t, map[string]interface{}{"v": int64(2)}, m.Fields)
|
assert.Equal(t, map[string]interface{}{"v": int64(3)}, m3.Fields)
|
||||||
assert.True(t, time.Unix(0, 123456790).Equal(m.Time))
|
assert.True(t, time.Unix(0, 123456791).Equal(m3.Time))
|
||||||
|
|
||||||
acc.Lock()
|
|
||||||
if len(acc.Metrics) < 3 {
|
|
||||||
acc.Wait()
|
|
||||||
}
|
|
||||||
require.True(t, len(acc.Metrics) >= 3)
|
|
||||||
m = acc.Metrics[2]
|
|
||||||
acc.Unlock()
|
|
||||||
|
|
||||||
assert.Equal(t, "test", m.Measurement)
|
|
||||||
assert.Equal(t, map[string]string{"foo": "zab"}, m.Tags)
|
|
||||||
assert.Equal(t, map[string]interface{}{"v": int64(3)}, m.Fields)
|
|
||||||
assert.True(t, time.Unix(0, 123456791).Equal(m.Time))
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package tail
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/hpcloud/tail"
|
"github.com/hpcloud/tail"
|
||||||
|
@ -86,7 +85,7 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
|
||||||
for _, filepath := range t.Files {
|
for _, filepath := range t.Files {
|
||||||
g, err := globpath.Compile(filepath)
|
g, err := globpath.Compile(filepath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("E! Error Glob %s failed to compile, %s", filepath, err)
|
t.acc.AddError(fmt.Errorf("E! Error Glob %s failed to compile, %s", filepath, err))
|
||||||
}
|
}
|
||||||
for file, _ := range g.Match() {
|
for file, _ := range g.Match() {
|
||||||
tailer, err := tail.TailFile(file,
|
tailer, err := tail.TailFile(file,
|
||||||
|
@ -124,21 +123,21 @@ func (t *Tail) receiver(tailer *tail.Tail) {
|
||||||
var line *tail.Line
|
var line *tail.Line
|
||||||
for line = range tailer.Lines {
|
for line = range tailer.Lines {
|
||||||
if line.Err != nil {
|
if line.Err != nil {
|
||||||
log.Printf("E! Error tailing file %s, Error: %s\n",
|
t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n",
|
||||||
tailer.Filename, err)
|
tailer.Filename, err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
m, err = t.parser.ParseLine(line.Text)
|
m, err = t.parser.ParseLine(line.Text)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
|
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
|
||||||
} else {
|
} else {
|
||||||
log.Printf("E! Malformed log line in %s: [%s], Error: %s\n",
|
t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n",
|
||||||
tailer.Filename, line.Text, err)
|
tailer.Filename, line.Text, err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := tailer.Err(); err != nil {
|
if err := tailer.Err(); err != nil {
|
||||||
log.Printf("E! Error tailing file %s, Error: %s\n",
|
t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n",
|
||||||
tailer.Filename, err)
|
tailer.Filename, err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,12 +145,12 @@ func (t *Tail) Stop() {
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
defer t.Unlock()
|
||||||
|
|
||||||
for _, t := range t.tailers {
|
for _, tailer := range t.tailers {
|
||||||
err := t.Stop()
|
err := tailer.Stop()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("E! Error stopping tail on file %s\n", t.Filename)
|
t.acc.AddError(fmt.Errorf("E! Error stopping tail on file %s\n", tailer.Filename))
|
||||||
}
|
}
|
||||||
t.Cleanup()
|
tailer.Cleanup()
|
||||||
}
|
}
|
||||||
t.wg.Wait()
|
t.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package tail
|
||||||
import (
|
import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
|
"runtime"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -30,11 +31,9 @@ func TestTailFromBeginning(t *testing.T) {
|
||||||
|
|
||||||
acc := testutil.Accumulator{}
|
acc := testutil.Accumulator{}
|
||||||
require.NoError(t, tt.Start(&acc))
|
require.NoError(t, tt.Start(&acc))
|
||||||
time.Sleep(time.Millisecond * 100)
|
|
||||||
require.NoError(t, tt.Gather(&acc))
|
require.NoError(t, tt.Gather(&acc))
|
||||||
// arbitrary sleep to wait for message to show up
|
|
||||||
time.Sleep(time.Millisecond * 150)
|
|
||||||
|
|
||||||
|
acc.Wait(1)
|
||||||
acc.AssertContainsTaggedFields(t, "cpu",
|
acc.AssertContainsTaggedFields(t, "cpu",
|
||||||
map[string]interface{}{
|
map[string]interface{}{
|
||||||
"usage_idle": float64(100),
|
"usage_idle": float64(100),
|
||||||
|
@ -60,13 +59,19 @@ func TestTailFromEnd(t *testing.T) {
|
||||||
|
|
||||||
acc := testutil.Accumulator{}
|
acc := testutil.Accumulator{}
|
||||||
require.NoError(t, tt.Start(&acc))
|
require.NoError(t, tt.Start(&acc))
|
||||||
time.Sleep(time.Millisecond * 100)
|
time.Sleep(time.Millisecond * 200) //TODO remove once https://github.com/hpcloud/tail/pull/114 is merged & added to Godeps
|
||||||
|
for _, tailer := range tt.tailers {
|
||||||
|
for n, err := tailer.Tell(); err == nil && n == 0; n, err = tailer.Tell() {
|
||||||
|
// wait for tailer to jump to end
|
||||||
|
runtime.Gosched()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
_, err = tmpfile.WriteString("cpu,othertag=foo usage_idle=100\n")
|
_, err = tmpfile.WriteString("cpu,othertag=foo usage_idle=100\n")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NoError(t, tt.Gather(&acc))
|
require.NoError(t, tt.Gather(&acc))
|
||||||
time.Sleep(time.Millisecond * 50)
|
|
||||||
|
|
||||||
|
acc.Wait(1)
|
||||||
acc.AssertContainsTaggedFields(t, "cpu",
|
acc.AssertContainsTaggedFields(t, "cpu",
|
||||||
map[string]interface{}{
|
map[string]interface{}{
|
||||||
"usage_idle": float64(100),
|
"usage_idle": float64(100),
|
||||||
|
@ -96,7 +101,7 @@ func TestTailBadLine(t *testing.T) {
|
||||||
_, err = tmpfile.WriteString("cpu mytag= foo usage_idle= 100\n")
|
_, err = tmpfile.WriteString("cpu mytag= foo usage_idle= 100\n")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NoError(t, tt.Gather(&acc))
|
require.NoError(t, tt.Gather(&acc))
|
||||||
time.Sleep(time.Millisecond * 50)
|
|
||||||
|
|
||||||
assert.Len(t, acc.Metrics, 0)
|
acc.WaitError(1)
|
||||||
|
assert.Contains(t, acc.Errors[0].Error(), "E! Malformed log line")
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,15 @@
|
||||||
package tcp_listener
|
package tcp_listener
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
"net"
|
"net"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
@ -54,7 +59,6 @@ func BenchmarkTCP(b *testing.B) {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
conn, err := net.Dial("tcp", "127.0.0.1:8198")
|
conn, err := net.Dial("tcp", "127.0.0.1:8198")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
@ -62,8 +66,10 @@ func BenchmarkTCP(b *testing.B) {
|
||||||
for i := 0; i < 100000; i++ {
|
for i := 0; i < 100000; i++ {
|
||||||
fmt.Fprintf(conn, testMsg)
|
fmt.Fprintf(conn, testMsg)
|
||||||
}
|
}
|
||||||
// wait for 100,000 metrics to get added to accumulator
|
conn.(*net.TCPConn).CloseWrite()
|
||||||
time.Sleep(time.Millisecond)
|
// wait for all 100,000 metrics to be processed
|
||||||
|
buf := []byte{0}
|
||||||
|
conn.Read(buf) // will EOF when completed
|
||||||
listener.Stop()
|
listener.Stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -81,16 +87,18 @@ func TestHighTrafficTCP(t *testing.T) {
|
||||||
err := listener.Start(acc)
|
err := listener.Start(acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
conn, err := net.Dial("tcp", "127.0.0.1:8199")
|
conn, err := net.Dial("tcp", "127.0.0.1:8199")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
for i := 0; i < 100000; i++ {
|
for i := 0; i < 100000; i++ {
|
||||||
fmt.Fprintf(conn, testMsg)
|
fmt.Fprintf(conn, testMsg)
|
||||||
}
|
}
|
||||||
time.Sleep(time.Millisecond)
|
conn.(*net.TCPConn).CloseWrite()
|
||||||
|
buf := []byte{0}
|
||||||
|
_, err = conn.Read(buf)
|
||||||
|
assert.Equal(t, err, io.EOF)
|
||||||
listener.Stop()
|
listener.Stop()
|
||||||
|
|
||||||
assert.Equal(t, 100000, len(acc.Metrics))
|
assert.Equal(t, 100000, int(acc.NMetrics()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConnectTCP(t *testing.T) {
|
func TestConnectTCP(t *testing.T) {
|
||||||
|
@ -105,13 +113,12 @@ func TestConnectTCP(t *testing.T) {
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
defer listener.Stop()
|
defer listener.Stop()
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
conn, err := net.Dial("tcp", "127.0.0.1:8194")
|
conn, err := net.Dial("tcp", "127.0.0.1:8194")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// send single message to socket
|
// send single message to socket
|
||||||
fmt.Fprintf(conn, testMsg)
|
fmt.Fprintf(conn, testMsg)
|
||||||
time.Sleep(time.Millisecond * 15)
|
acc.Wait(1)
|
||||||
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
||||||
map[string]interface{}{"value": float64(12)},
|
map[string]interface{}{"value": float64(12)},
|
||||||
map[string]string{"host": "server01"},
|
map[string]string{"host": "server01"},
|
||||||
|
@ -119,7 +126,7 @@ func TestConnectTCP(t *testing.T) {
|
||||||
|
|
||||||
// send multiple messages to socket
|
// send multiple messages to socket
|
||||||
fmt.Fprintf(conn, testMsgs)
|
fmt.Fprintf(conn, testMsgs)
|
||||||
time.Sleep(time.Millisecond * 15)
|
acc.Wait(6)
|
||||||
hostTags := []string{"server02", "server03",
|
hostTags := []string{"server02", "server03",
|
||||||
"server04", "server05", "server06"}
|
"server04", "server05", "server06"}
|
||||||
for _, hostTag := range hostTags {
|
for _, hostTag := range hostTags {
|
||||||
|
@ -143,7 +150,6 @@ func TestConcurrentConns(t *testing.T) {
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
defer listener.Stop()
|
defer listener.Stop()
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
_, err := net.Dial("tcp", "127.0.0.1:8195")
|
_, err := net.Dial("tcp", "127.0.0.1:8195")
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
_, err = net.Dial("tcp", "127.0.0.1:8195")
|
_, err = net.Dial("tcp", "127.0.0.1:8195")
|
||||||
|
@ -162,10 +168,8 @@ func TestConcurrentConns(t *testing.T) {
|
||||||
" the Telegraf tcp listener configuration.\n",
|
" the Telegraf tcp listener configuration.\n",
|
||||||
string(buf[:n]))
|
string(buf[:n]))
|
||||||
|
|
||||||
_, err = conn.Write([]byte(testMsg))
|
_, err = conn.Read(buf)
|
||||||
assert.NoError(t, err)
|
assert.Equal(t, io.EOF, err)
|
||||||
time.Sleep(time.Millisecond * 10)
|
|
||||||
assert.Zero(t, acc.NFields())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that MaxTCPConections is respected when max==1
|
// Test that MaxTCPConections is respected when max==1
|
||||||
|
@ -181,7 +185,6 @@ func TestConcurrentConns1(t *testing.T) {
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
defer listener.Stop()
|
defer listener.Stop()
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
_, err := net.Dial("tcp", "127.0.0.1:8196")
|
_, err := net.Dial("tcp", "127.0.0.1:8196")
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
@ -198,10 +201,8 @@ func TestConcurrentConns1(t *testing.T) {
|
||||||
" the Telegraf tcp listener configuration.\n",
|
" the Telegraf tcp listener configuration.\n",
|
||||||
string(buf[:n]))
|
string(buf[:n]))
|
||||||
|
|
||||||
_, err = conn.Write([]byte(testMsg))
|
_, err = conn.Read(buf)
|
||||||
assert.NoError(t, err)
|
assert.Equal(t, io.EOF, err)
|
||||||
time.Sleep(time.Millisecond * 10)
|
|
||||||
assert.Zero(t, acc.NFields())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that MaxTCPConections is respected
|
// Test that MaxTCPConections is respected
|
||||||
|
@ -216,7 +217,6 @@ func TestCloseConcurrentConns(t *testing.T) {
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
_, err := net.Dial("tcp", "127.0.0.1:8195")
|
_, err := net.Dial("tcp", "127.0.0.1:8195")
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
_, err = net.Dial("tcp", "127.0.0.1:8195")
|
_, err = net.Dial("tcp", "127.0.0.1:8195")
|
||||||
|
@ -238,13 +238,9 @@ func TestRunParser(t *testing.T) {
|
||||||
go listener.tcpParser()
|
go listener.tcpParser()
|
||||||
|
|
||||||
in <- testmsg
|
in <- testmsg
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
listener.Gather(&acc)
|
listener.Gather(&acc)
|
||||||
|
|
||||||
if a := acc.NFields(); a != 1 {
|
acc.Wait(1)
|
||||||
t.Errorf("got %v, expected %v", a, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
||||||
map[string]interface{}{"value": float64(12)},
|
map[string]interface{}{"value": float64(12)},
|
||||||
map[string]string{"host": "server01"},
|
map[string]string{"host": "server01"},
|
||||||
|
@ -263,11 +259,16 @@ func TestRunParserInvalidMsg(t *testing.T) {
|
||||||
listener.wg.Add(1)
|
listener.wg.Add(1)
|
||||||
go listener.tcpParser()
|
go listener.tcpParser()
|
||||||
|
|
||||||
|
buf := bytes.NewBuffer(nil)
|
||||||
|
log.SetOutput(buf)
|
||||||
|
defer log.SetOutput(os.Stderr)
|
||||||
in <- testmsg
|
in <- testmsg
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
if a := acc.NFields(); a != 0 {
|
scnr := bufio.NewScanner(buf)
|
||||||
t.Errorf("got %v, expected %v", a, 0)
|
for scnr.Scan() {
|
||||||
|
if strings.Contains(scnr.Text(), fmt.Sprintf(malformedwarn, 1)) {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -284,9 +285,9 @@ func TestRunParserGraphiteMsg(t *testing.T) {
|
||||||
go listener.tcpParser()
|
go listener.tcpParser()
|
||||||
|
|
||||||
in <- testmsg
|
in <- testmsg
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
listener.Gather(&acc)
|
listener.Gather(&acc)
|
||||||
|
|
||||||
|
acc.Wait(1)
|
||||||
acc.AssertContainsFields(t, "cpu_load_graphite",
|
acc.AssertContainsFields(t, "cpu_load_graphite",
|
||||||
map[string]interface{}{"value": float64(12)})
|
map[string]interface{}{"value": float64(12)})
|
||||||
}
|
}
|
||||||
|
@ -304,9 +305,9 @@ func TestRunParserJSONMsg(t *testing.T) {
|
||||||
go listener.tcpParser()
|
go listener.tcpParser()
|
||||||
|
|
||||||
in <- testmsg
|
in <- testmsg
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
listener.Gather(&acc)
|
listener.Gather(&acc)
|
||||||
|
|
||||||
|
acc.Wait(1)
|
||||||
acc.AssertContainsFields(t, "udp_json_test",
|
acc.AssertContainsFields(t, "udp_json_test",
|
||||||
map[string]interface{}{
|
map[string]interface{}{
|
||||||
"a": float64(5),
|
"a": float64(5),
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package udp_listener
|
package udp_listener
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -107,8 +108,9 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error {
|
||||||
u.in = make(chan []byte, u.AllowedPendingMessages)
|
u.in = make(chan []byte, u.AllowedPendingMessages)
|
||||||
u.done = make(chan struct{})
|
u.done = make(chan struct{})
|
||||||
|
|
||||||
u.wg.Add(2)
|
u.udpListen()
|
||||||
go u.udpListen()
|
|
||||||
|
u.wg.Add(1)
|
||||||
go u.udpParser()
|
go u.udpParser()
|
||||||
|
|
||||||
log.Printf("I! Started UDP listener service on %s (ReadBuffer: %d)\n", u.ServiceAddress, u.UDPBufferSize)
|
log.Printf("I! Started UDP listener service on %s (ReadBuffer: %d)\n", u.ServiceAddress, u.UDPBufferSize)
|
||||||
|
@ -126,32 +128,37 @@ func (u *UdpListener) Stop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *UdpListener) udpListen() error {
|
func (u *UdpListener) udpListen() error {
|
||||||
defer u.wg.Done()
|
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
address, _ := net.ResolveUDPAddr("udp", u.ServiceAddress)
|
address, _ := net.ResolveUDPAddr("udp", u.ServiceAddress)
|
||||||
u.listener, err = net.ListenUDP("udp", address)
|
u.listener, err = net.ListenUDP("udp", address)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("E! Error: ListenUDP - %s", err)
|
return fmt.Errorf("E! Error: ListenUDP - %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println("I! UDP server listening on: ", u.listener.LocalAddr().String())
|
log.Println("I! UDP server listening on: ", u.listener.LocalAddr().String())
|
||||||
|
|
||||||
buf := make([]byte, UDP_MAX_PACKET_SIZE)
|
|
||||||
|
|
||||||
if u.UDPBufferSize > 0 {
|
if u.UDPBufferSize > 0 {
|
||||||
err = u.listener.SetReadBuffer(u.UDPBufferSize) // if we want to move away from OS default
|
err = u.listener.SetReadBuffer(u.UDPBufferSize) // if we want to move away from OS default
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("E! Failed to set UDP read buffer to %d: %s", u.UDPBufferSize, err)
|
return fmt.Errorf("E! Failed to set UDP read buffer to %d: %s", u.UDPBufferSize, err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
u.wg.Add(1)
|
||||||
|
go u.udpListenLoop()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *UdpListener) udpListenLoop() {
|
||||||
|
defer u.wg.Done()
|
||||||
|
|
||||||
|
buf := make([]byte, UDP_MAX_PACKET_SIZE)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-u.done:
|
case <-u.done:
|
||||||
return nil
|
return
|
||||||
default:
|
default:
|
||||||
u.listener.SetReadDeadline(time.Now().Add(time.Second))
|
u.listener.SetReadDeadline(time.Now().Add(time.Second))
|
||||||
|
|
||||||
|
|
|
@ -1,12 +1,16 @@
|
||||||
package udp_listener
|
package udp_listener
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
|
"os"
|
||||||
|
"runtime"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
@ -50,22 +54,27 @@ func TestHighTrafficUDP(t *testing.T) {
|
||||||
err := listener.Start(acc)
|
err := listener.Start(acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
conn, err := net.Dial("udp", "127.0.0.1:8126")
|
conn, err := net.Dial("udp", "127.0.0.1:8126")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
mlen := int64(len(testMsgs))
|
||||||
|
var sent int64
|
||||||
for i := 0; i < 20000; i++ {
|
for i := 0; i < 20000; i++ {
|
||||||
// arbitrary, just to give the OS buffer some slack handling the
|
for sent > listener.BytesRecv.Get()+32000 {
|
||||||
// packet storm.
|
// more than 32kb sitting in OS buffer, let it drain
|
||||||
time.Sleep(time.Microsecond)
|
runtime.Gosched()
|
||||||
fmt.Fprintf(conn, testMsgs)
|
}
|
||||||
|
conn.Write([]byte(testMsgs))
|
||||||
|
sent += mlen
|
||||||
|
}
|
||||||
|
for sent > listener.BytesRecv.Get() {
|
||||||
|
runtime.Gosched()
|
||||||
|
}
|
||||||
|
for len(listener.in) > 0 {
|
||||||
|
runtime.Gosched()
|
||||||
}
|
}
|
||||||
time.Sleep(time.Millisecond)
|
|
||||||
listener.Stop()
|
listener.Stop()
|
||||||
|
|
||||||
// this is not an exact science, since UDP packets can easily get lost or
|
assert.Equal(t, uint64(100000), acc.NMetrics())
|
||||||
// dropped, but assume that the OS will be able to
|
|
||||||
// handle at least 90% of the sent UDP packets.
|
|
||||||
assert.InDelta(t, 100000, len(acc.Metrics), 10000)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConnectUDP(t *testing.T) {
|
func TestConnectUDP(t *testing.T) {
|
||||||
|
@ -79,13 +88,12 @@ func TestConnectUDP(t *testing.T) {
|
||||||
require.NoError(t, listener.Start(acc))
|
require.NoError(t, listener.Start(acc))
|
||||||
defer listener.Stop()
|
defer listener.Stop()
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
conn, err := net.Dial("udp", "127.0.0.1:8127")
|
conn, err := net.Dial("udp", "127.0.0.1:8127")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// send single message to socket
|
// send single message to socket
|
||||||
fmt.Fprintf(conn, testMsg)
|
fmt.Fprintf(conn, testMsg)
|
||||||
time.Sleep(time.Millisecond * 15)
|
acc.Wait(1)
|
||||||
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
||||||
map[string]interface{}{"value": float64(12)},
|
map[string]interface{}{"value": float64(12)},
|
||||||
map[string]string{"host": "server01"},
|
map[string]string{"host": "server01"},
|
||||||
|
@ -93,7 +101,7 @@ func TestConnectUDP(t *testing.T) {
|
||||||
|
|
||||||
// send multiple messages to socket
|
// send multiple messages to socket
|
||||||
fmt.Fprintf(conn, testMsgs)
|
fmt.Fprintf(conn, testMsgs)
|
||||||
time.Sleep(time.Millisecond * 15)
|
acc.Wait(6)
|
||||||
hostTags := []string{"server02", "server03",
|
hostTags := []string{"server02", "server03",
|
||||||
"server04", "server05", "server06"}
|
"server04", "server05", "server06"}
|
||||||
for _, hostTag := range hostTags {
|
for _, hostTag := range hostTags {
|
||||||
|
@ -118,13 +126,9 @@ func TestRunParser(t *testing.T) {
|
||||||
go listener.udpParser()
|
go listener.udpParser()
|
||||||
|
|
||||||
in <- testmsg
|
in <- testmsg
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
listener.Gather(&acc)
|
listener.Gather(&acc)
|
||||||
|
|
||||||
if a := acc.NFields(); a != 1 {
|
acc.Wait(1)
|
||||||
t.Errorf("got %v, expected %v", a, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
||||||
map[string]interface{}{"value": float64(12)},
|
map[string]interface{}{"value": float64(12)},
|
||||||
map[string]string{"host": "server01"},
|
map[string]string{"host": "server01"},
|
||||||
|
@ -144,11 +148,16 @@ func TestRunParserInvalidMsg(t *testing.T) {
|
||||||
listener.wg.Add(1)
|
listener.wg.Add(1)
|
||||||
go listener.udpParser()
|
go listener.udpParser()
|
||||||
|
|
||||||
|
buf := bytes.NewBuffer(nil)
|
||||||
|
log.SetOutput(buf)
|
||||||
|
defer log.SetOutput(os.Stderr)
|
||||||
in <- testmsg
|
in <- testmsg
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
|
|
||||||
if a := acc.NFields(); a != 0 {
|
scnr := bufio.NewScanner(buf)
|
||||||
t.Errorf("got %v, expected %v", a, 0)
|
for scnr.Scan() {
|
||||||
|
if strings.Contains(scnr.Text(), fmt.Sprintf(malformedwarn, 1)) {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -166,9 +175,9 @@ func TestRunParserGraphiteMsg(t *testing.T) {
|
||||||
go listener.udpParser()
|
go listener.udpParser()
|
||||||
|
|
||||||
in <- testmsg
|
in <- testmsg
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
listener.Gather(&acc)
|
listener.Gather(&acc)
|
||||||
|
|
||||||
|
acc.Wait(1)
|
||||||
acc.AssertContainsFields(t, "cpu_load_graphite",
|
acc.AssertContainsFields(t, "cpu_load_graphite",
|
||||||
map[string]interface{}{"value": float64(12)})
|
map[string]interface{}{"value": float64(12)})
|
||||||
}
|
}
|
||||||
|
@ -187,9 +196,9 @@ func TestRunParserJSONMsg(t *testing.T) {
|
||||||
go listener.udpParser()
|
go listener.udpParser()
|
||||||
|
|
||||||
in <- testmsg
|
in <- testmsg
|
||||||
time.Sleep(time.Millisecond * 25)
|
|
||||||
listener.Gather(&acc)
|
listener.Gather(&acc)
|
||||||
|
|
||||||
|
acc.Wait(1)
|
||||||
acc.AssertContainsFields(t, "udp_json_test",
|
acc.AssertContainsFields(t, "udp_json_test",
|
||||||
map[string]interface{}{
|
map[string]interface{}{
|
||||||
"a": float64(5),
|
"a": float64(5),
|
||||||
|
|
|
@ -44,9 +44,7 @@ func TestGraphiteOK(t *testing.T) {
|
||||||
// Start TCP server
|
// Start TCP server
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
t.Log("Starting server")
|
t.Log("Starting server")
|
||||||
go TCPServer1(t, &wg)
|
TCPServer1(t, &wg)
|
||||||
// Give the fake graphite TCP server some time to start:
|
|
||||||
time.Sleep(time.Millisecond * 100)
|
|
||||||
|
|
||||||
// Init plugin
|
// Init plugin
|
||||||
g := Graphite{
|
g := Graphite{
|
||||||
|
@ -88,10 +86,8 @@ func TestGraphiteOK(t *testing.T) {
|
||||||
t.Log("Finished Waiting for first data")
|
t.Log("Finished Waiting for first data")
|
||||||
var wg2 sync.WaitGroup
|
var wg2 sync.WaitGroup
|
||||||
// Start TCP server
|
// Start TCP server
|
||||||
time.Sleep(time.Millisecond * 100)
|
|
||||||
wg2.Add(1)
|
wg2.Add(1)
|
||||||
go TCPServer2(t, &wg2)
|
TCPServer2(t, &wg2)
|
||||||
time.Sleep(time.Millisecond * 100)
|
|
||||||
//Write but expect an error, but reconnect
|
//Write but expect an error, but reconnect
|
||||||
g.Write(metrics2)
|
g.Write(metrics2)
|
||||||
err3 := g.Write(metrics2)
|
err3 := g.Write(metrics2)
|
||||||
|
@ -105,8 +101,9 @@ func TestGraphiteOK(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TCPServer1(t *testing.T, wg *sync.WaitGroup) {
|
func TCPServer1(t *testing.T, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
|
||||||
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
|
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
conn, _ := (tcpServer).Accept()
|
conn, _ := (tcpServer).Accept()
|
||||||
reader := bufio.NewReader(conn)
|
reader := bufio.NewReader(conn)
|
||||||
tp := textproto.NewReader(reader)
|
tp := textproto.NewReader(reader)
|
||||||
|
@ -114,11 +111,13 @@ func TCPServer1(t *testing.T, wg *sync.WaitGroup) {
|
||||||
assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1)
|
assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1)
|
||||||
conn.Close()
|
conn.Close()
|
||||||
tcpServer.Close()
|
tcpServer.Close()
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func TCPServer2(t *testing.T, wg *sync.WaitGroup) {
|
func TCPServer2(t *testing.T, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
|
||||||
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
|
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
conn2, _ := (tcpServer).Accept()
|
conn2, _ := (tcpServer).Accept()
|
||||||
reader := bufio.NewReader(conn2)
|
reader := bufio.NewReader(conn2)
|
||||||
tp := textproto.NewReader(reader)
|
tp := textproto.NewReader(reader)
|
||||||
|
@ -128,4 +127,5 @@ func TCPServer2(t *testing.T, wg *sync.WaitGroup) {
|
||||||
assert.Equal(t, "my.prefix.192_168_0_1.my_measurement 3.14 1289430000", data3)
|
assert.Equal(t, "my.prefix.192_168_0_1.my_measurement 3.14 1289430000", data3)
|
||||||
conn2.Close()
|
conn2.Close()
|
||||||
tcpServer.Close()
|
tcpServer.Close()
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,7 +66,6 @@ func TestUDPClient_Write(t *testing.T) {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// test sending simple metric
|
// test sending simple metric
|
||||||
time.Sleep(time.Second)
|
|
||||||
n, err := client.Write([]byte("cpu value=99\n"))
|
n, err := client.Write([]byte("cpu value=99\n"))
|
||||||
assert.Equal(t, n, 13)
|
assert.Equal(t, n, 13)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
|
@ -16,9 +16,7 @@ import (
|
||||||
func TestWrite(t *testing.T) {
|
func TestWrite(t *testing.T) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go TCPServer(t, &wg)
|
TCPServer(t, &wg)
|
||||||
// Give the fake TCP server some time to start:
|
|
||||||
time.Sleep(time.Millisecond * 100)
|
|
||||||
|
|
||||||
i := Instrumental{
|
i := Instrumental{
|
||||||
Host: "127.0.0.1",
|
Host: "127.0.0.1",
|
||||||
|
@ -79,6 +77,7 @@ func TestWrite(t *testing.T) {
|
||||||
|
|
||||||
func TCPServer(t *testing.T, wg *sync.WaitGroup) {
|
func TCPServer(t *testing.T, wg *sync.WaitGroup) {
|
||||||
tcpServer, _ := net.Listen("tcp", "127.0.0.1:8000")
|
tcpServer, _ := net.Listen("tcp", "127.0.0.1:8000")
|
||||||
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
conn, _ := tcpServer.Accept()
|
conn, _ := tcpServer.Accept()
|
||||||
conn.SetDeadline(time.Now().Add(1 * time.Second))
|
conn.SetDeadline(time.Now().Add(1 * time.Second))
|
||||||
|
@ -120,4 +119,5 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
|
||||||
assert.Equal(t, "", data6)
|
assert.Equal(t, "", data6)
|
||||||
|
|
||||||
conn.Close()
|
conn.Close()
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
|
@ -129,6 +129,9 @@ func (a *Accumulator) AddError(err error) {
|
||||||
}
|
}
|
||||||
a.Lock()
|
a.Lock()
|
||||||
a.Errors = append(a.Errors, err)
|
a.Errors = append(a.Errors, err)
|
||||||
|
if a.Cond != nil {
|
||||||
|
a.Cond.Broadcast()
|
||||||
|
}
|
||||||
a.Unlock()
|
a.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -198,13 +201,28 @@ func (a *Accumulator) NFields() int {
|
||||||
return counter
|
return counter
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait waits for a metric to be added to the accumulator.
|
// Wait waits for the given number of metrics to be added to the accumulator.
|
||||||
// Accumulator must already be locked.
|
func (a *Accumulator) Wait(n int) {
|
||||||
func (a *Accumulator) Wait() {
|
a.Lock()
|
||||||
if a.Cond == nil {
|
if a.Cond == nil {
|
||||||
a.Cond = sync.NewCond(&a.Mutex)
|
a.Cond = sync.NewCond(&a.Mutex)
|
||||||
}
|
}
|
||||||
|
for int(a.NMetrics()) < n {
|
||||||
a.Cond.Wait()
|
a.Cond.Wait()
|
||||||
|
}
|
||||||
|
a.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// WaitError waits for the given number of errors to be added to the accumulator.
|
||||||
|
func (a *Accumulator) WaitError(n int) {
|
||||||
|
a.Lock()
|
||||||
|
if a.Cond == nil {
|
||||||
|
a.Cond = sync.NewCond(&a.Mutex)
|
||||||
|
}
|
||||||
|
for len(a.Errors) < n {
|
||||||
|
a.Cond.Wait()
|
||||||
|
}
|
||||||
|
a.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Accumulator) AssertContainsTaggedFields(
|
func (a *Accumulator) AssertContainsTaggedFields(
|
||||||
|
|
Loading…
Reference in New Issue