Throughout telegraf, use telegraf.Metric rather than client.Point

closes #599
Cameron Sparr
2016-01-27 16:15:14 -07:00
committed by Ryan Merrick
parent 5364a20825
commit 1edfa9bbd0
52 changed files with 391 additions and 437 deletions
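The hunks below migrate the Kafka consumer input from models.Point to telegraf.Metric. For orientation, the accessors the new code exercises (Name, Tags, Fields, Time, plus telegraf.ParseMetrics for decoding raw message bytes) imply an interface roughly like this sketch, inferred from the diff itself rather than copied from the telegraf source:

	package telegraf // sketch only: surface inferred from this commit's diffs

	import "time"

	// Metric is the shape the Kafka consumer relies on below: Gather calls
	// Name(), Fields(), Tags() and Time() on each buffered metric.
	type Metric interface {
		Name() string
		Tags() map[string]string
		Fields() map[string]interface{}
		Time() time.Time
	}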

plugins/inputs/kafka_consumer/kafka_consumer.go

@@ -5,7 +5,6 @@ import (
 	"strings"
 	"sync"
-	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/plugins/inputs"
@@ -28,8 +27,8 @@ type Kafka struct {
 	// channel for all kafka consumer errors
 	errs <-chan *sarama.ConsumerError
 	// channel for all incoming parsed kafka points
-	pointChan chan models.Point
-	done      chan struct{}
+	metricC chan telegraf.Metric
+	done    chan struct{}
 	// doNotCommitMsgs tells the parser not to call CommitUpTo on the consumer
 	// this is mostly for test purposes, but there may be a use-case for it later.
@@ -94,7 +93,7 @@ func (k *Kafka) Start() error {
 	if k.PointBuffer == 0 {
 		k.PointBuffer = 100000
 	}
-	k.pointChan = make(chan models.Point, k.PointBuffer)
+	k.metricC = make(chan telegraf.Metric, k.PointBuffer)
 	// Start the kafka message reader
 	go k.parser()
@@ -113,18 +112,18 @@ func (k *Kafka) parser() {
 		case err := <-k.errs:
 			log.Printf("Kafka Consumer Error: %s\n", err.Error())
 		case msg := <-k.in:
-			points, err := models.ParsePoints(msg.Value)
+			metrics, err := telegraf.ParseMetrics(msg.Value)
 			if err != nil {
 				log.Printf("Could not parse kafka message: %s, error: %s",
 					string(msg.Value), err.Error())
 			}
-			for _, point := range points {
+			for _, metric := range metrics {
 				select {
-				case k.pointChan <- point:
+				case k.metricC <- metric:
 					continue
 				default:
-					log.Printf("Kafka Consumer buffer is full, dropping a point." +
+					log.Printf("Kafka Consumer buffer is full, dropping a metric." +
 						" You may want to increase the point_buffer setting")
 				}
 			}
@@ -152,9 +151,9 @@ func (k *Kafka) Stop() {
 func (k *Kafka) Gather(acc telegraf.Accumulator) error {
 	k.Lock()
 	defer k.Unlock()
-	npoints := len(k.pointChan)
+	npoints := len(k.metricC)
 	for i := 0; i < npoints; i++ {
-		point := <-k.pointChan
+		point := <-k.metricC
 		acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time())
 	}
 	return nil
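Two patterns in this file are worth calling out. The parser sends into metricC through a select with a default branch, so a full buffer drops the metric rather than blocking the consumer loop; Gather snapshots len(k.metricC) before draining, so it reads only what was buffered when it was called. A minimal standalone sketch of both, with hypothetical names rather than telegraf code:

	package main

	import (
		"fmt"
		"log"
	)

	func main() {
		metricC := make(chan string, 2) // tiny buffer so the third send hits the drop path

		// Non-blocking send, as in parser(): drop when the buffer is full.
		for _, m := range []string{"m1", "m2", "m3"} {
			select {
			case metricC <- m:
			default:
				log.Printf("buffer is full, dropping %s", m)
			}
		}

		// Bounded drain, as in Gather(): snapshot the length first so metrics
		// arriving mid-loop are left for the next call.
		n := len(metricC)
		for i := 0; i < n; i++ {
			fmt.Println(<-metricC) // prints m1, m2
		}
	}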

plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go

@@ -51,13 +51,13 @@ func TestReadsMetricsFromKafka(t *testing.T) {
 	// Verify that we can now gather the sent message
 	var acc testutil.Accumulator
 	// Sanity check
-	assert.Equal(t, 0, len(acc.Points), "There should not be any points")
+	assert.Equal(t, 0, len(acc.Metrics), "There should not be any points")
 	// Gather points
 	err = k.Gather(&acc)
 	require.NoError(t, err)
-	if len(acc.Points) == 1 {
-		point := acc.Points[0]
+	if len(acc.Metrics) == 1 {
+		point := acc.Metrics[0]
 		assert.Equal(t, "cpu_load_short", point.Measurement)
 		assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields)
 		assert.Equal(t, map[string]string{
@@ -83,7 +83,7 @@ func waitForPoint(k *Kafka, t *testing.T) {
 			counter++
 			if counter > 1000 {
 				t.Fatal("Waited for 5s, point never arrived to consumer")
-			} else if len(k.pointChan) == 1 {
+			} else if len(k.metricC) == 1 {
 				return
 			}
 		}
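waitForPoint above polls the consumer's channel length and gives up after 1000 iterations; the "Waited for 5s" message implies a tick of roughly 5ms. A generic version of that wait loop might look like the following sketch, where waitFor and cond are hypothetical names, not part of the test file:

	package kafka_consumer

	import (
		"testing"
		"time"
	)

	// waitFor polls cond every 5ms and fails the test if it has not become
	// true within about 5s, mirroring the counter loop in waitForPoint.
	func waitFor(t *testing.T, cond func() bool) {
		ticker := time.NewTicker(5 * time.Millisecond)
		defer ticker.Stop()
		for counter := 0; ; counter++ {
			<-ticker.C
			if counter > 1000 {
				t.Fatal("Waited for 5s, condition never became true")
			} else if cond() {
				return
			}
		}
	}

Usage would be waitFor(t, func() bool { return len(k.metricC) == 1 }).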

plugins/inputs/kafka_consumer/kafka_consumer_test.go

@@ -4,7 +4,7 @@ import (
 	"testing"
 	"time"
-	"github.com/influxdata/influxdb/models"
+	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/testutil"
 	"github.com/Shopify/sarama"
@@ -29,7 +29,7 @@ func NewTestKafka() (*Kafka, chan *sarama.ConsumerMessage) {
 		doNotCommitMsgs: true,
 		errs:            make(chan *sarama.ConsumerError, pointBuffer),
 		done:            make(chan struct{}),
-		pointChan:       make(chan models.Point, pointBuffer),
+		metricC:         make(chan telegraf.Metric, pointBuffer),
 	}
 	return &k, in
 }
@@ -43,7 +43,7 @@ func TestRunParser(t *testing.T) {
 	in <- saramaMsg(testMsg)
 	time.Sleep(time.Millisecond)
-	assert.Equal(t, len(k.pointChan), 1)
+	assert.Equal(t, len(k.metricC), 1)
 }
 // Test that the parser ignores invalid messages
@@ -55,7 +55,7 @@ func TestRunParserInvalidMsg(t *testing.T) {
 	in <- saramaMsg(invalidMsg)
 	time.Sleep(time.Millisecond)
-	assert.Equal(t, len(k.pointChan), 0)
+	assert.Equal(t, len(k.metricC), 0)
 }
 // Test that points are dropped when we hit the buffer limit
@@ -69,7 +69,7 @@ func TestRunParserRespectsBuffer(t *testing.T) {
 	}
 	time.Sleep(time.Millisecond)
-	assert.Equal(t, len(k.pointChan), 5)
+	assert.Equal(t, len(k.metricC), 5)
 }
 // Test that the parser parses kafka messages into points
@@ -84,7 +84,7 @@ func TestRunParserAndGather(t *testing.T) {
 	acc := testutil.Accumulator{}
 	k.Gather(&acc)
-	assert.Equal(t, len(acc.Points), 1)
+	assert.Equal(t, len(acc.Metrics), 1)
 	acc.AssertContainsFields(t, "cpu_load_short",
 		map[string]interface{}{"value": float64(23422)})
 }
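The assertions in this file read acc.Metrics entries through their Measurement, Tags and Fields members, which implies the testutil accumulator records shapes roughly like this sketch, inferred from the assertions rather than taken from the actual testutil source:

	package testutil // sketch: shape inferred from the assertions above

	import "time"

	// Metric is what the tests index out of acc.Metrics.
	type Metric struct {
		Measurement string
		Tags        map[string]string
		Fields      map[string]interface{}
		Time        time.Time
	}

	// Accumulator collects everything an input adds via AddFields, so tests
	// can assert on len(acc.Metrics) and on individual entries.
	type Accumulator struct {
		Metrics []*Metric
	}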