Implement telegraf's own full metric type

main reasons behind this:
- make adding/removing tags cheap
- make adding/removing fields cheap
- make parsing cheaper
- make parse -> decorate -> write out bytes metric flow much faster

Refactor serializer to use byte buffer
This commit is contained in:
Cameron Sparr
2016-11-22 12:51:57 +00:00
parent 332f678afb
commit db7a4b24b6
40 changed files with 1376 additions and 398 deletions

View File

@@ -1,7 +1,6 @@
package amqp
import (
"bytes"
"fmt"
"log"
"strings"
@@ -178,7 +177,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}
var outbuf = make(map[string][][]byte)
outbuf := make(map[string][]byte)
for _, metric := range metrics {
var key string
@@ -188,14 +187,12 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
}
}
values, err := q.serializer.Serialize(metric)
buf, err := q.serializer.Serialize(metric)
if err != nil {
return err
}
for _, value := range values {
outbuf[key] = append(outbuf[key], []byte(value))
}
outbuf[key] = append(outbuf[key], buf...)
}
for key, buf := range outbuf {
@@ -207,7 +204,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
amqp.Publishing{
Headers: q.headers,
ContentType: "text/plain",
Body: bytes.Join(buf, []byte("\n")),
Body: buf,
})
if err != nil {
return fmt.Errorf("FAILED to send amqp message: %s", err)

View File

@@ -92,16 +92,9 @@ func (f *File) Write(metrics []telegraf.Metric) error {
}
for _, metric := range metrics {
values, err := f.serializer.Serialize(metric)
_, err := f.writer.Write(metric.Serialize())
if err != nil {
return err
}
for _, value := range values {
_, err = f.writer.Write([]byte(value + "\n"))
if err != nil {
return fmt.Errorf("FAILED to write message: %s, %s", value, err)
}
return fmt.Errorf("FAILED to write message: %s, %s", metric.Serialize(), err)
}
}
return nil

View File

@@ -5,7 +5,6 @@ import (
"log"
"math/rand"
"net"
"strings"
"time"
"github.com/influxdata/telegraf"
@@ -76,20 +75,19 @@ func (g *Graphite) Description() string {
// occurs, logging each unsuccessful. If all servers fail, return error.
func (g *Graphite) Write(metrics []telegraf.Metric) error {
// Prepare data
var bp []string
var batch []byte
s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template)
if err != nil {
return err
}
for _, metric := range metrics {
gMetrics, err := s.Serialize(metric)
buf, err := s.Serialize(metric)
if err != nil {
log.Printf("E! Error serializing some metrics to graphite: %s", err.Error())
}
bp = append(bp, gMetrics...)
batch = append(batch, buf...)
}
graphitePoints := strings.Join(bp, "\n") + "\n"
// This will get set to nil if a successful write occurs
err = errors.New("Could not write to any Graphite server in cluster\n")
@@ -100,7 +98,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
if g.Timeout > 0 {
g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second))
}
if _, e := g.conns[n].Write([]byte(graphitePoints)); e != nil {
if _, e := g.conns[n].Write(batch); e != nil {
// Error
log.Println("E! Graphite Error: " + e.Error())
// Let's try the next one

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -21,7 +22,7 @@ func TestGraphiteError(t *testing.T) {
Prefix: "my.prefix",
}
// Init metrics
m1, _ := telegraf.NewMetric(
m1, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"mymeasurement": float64(3.14)},
@@ -51,19 +52,19 @@ func TestGraphiteOK(t *testing.T) {
Prefix: "my.prefix",
}
// Init metrics
m1, _ := telegraf.NewMetric(
m1, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"myfield": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m2, _ := telegraf.NewMetric(
m2, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m3, _ := telegraf.NewMetric(
m3, _ := metric.New(
"my_measurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},

View File

@@ -1,6 +1,7 @@
package instrumental
import (
"bytes"
"fmt"
"io"
"log"
@@ -10,11 +11,17 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
)
var (
ValueIncludesBadChar = regexp.MustCompile("[^[:digit:].]")
MetricNameReplacer = regexp.MustCompile("[^-[:alnum:]_.]+")
)
type Instrumental struct {
Host string
ApiToken string
@@ -34,11 +41,6 @@ const (
HandshakeFormat = HelloMessage + AuthFormat
)
var (
ValueIncludesBadChar = regexp.MustCompile("[^[:digit:].]")
MetricNameReplacer = regexp.MustCompile("[^-[:alnum:]_.]+")
)
var sampleConfig = `
## Project API Token (required)
api_token = "API Token" # required
@@ -94,7 +96,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
var toSerialize telegraf.Metric
var newTags map[string]string
for _, metric := range metrics {
for _, m := range metrics {
// Pull the metric_type out of the metric's tags. We don't want the type
// to show up with the other tags pulled from the system, as they go in the
// beginning of the line instead.
@@ -106,18 +108,18 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
//
// increment some_prefix.host.tag1.tag2.tag3.counter.field value timestamp
//
newTags = metric.Tags()
newTags = m.Tags()
metricType = newTags["metric_type"]
delete(newTags, "metric_type")
toSerialize, _ = telegraf.NewMetric(
metric.Name(),
toSerialize, _ = metric.New(
m.Name(),
newTags,
metric.Fields(),
metric.Time(),
m.Fields(),
m.Time(),
)
stats, err := s.Serialize(toSerialize)
buf, err := s.Serialize(toSerialize)
if err != nil {
log.Printf("E! Error serializing a metric to Instrumental: %s", err)
}
@@ -131,20 +133,25 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
metricType = "gauge"
}
for _, stat := range stats {
buffer := bytes.NewBuffer(buf)
for {
line, err := buffer.ReadBytes('\n')
if err != nil {
break
}
stat := string(line)
// decompose "metric.name value time"
splitStat := strings.SplitN(stat, " ", 3)
metric := splitStat[0]
name := splitStat[0]
value := splitStat[1]
time := splitStat[2]
// replace invalid components of metric name with underscore
clean_metric := MetricNameReplacer.ReplaceAllString(metric, "_")
clean_metric := MetricNameReplacer.ReplaceAllString(name, "_")
if !ValueIncludesBadChar.MatchString(value) {
points = append(points, fmt.Sprintf("%s %s %s %s", metricType, clean_metric, value, time))
} else if i.Debug {
log.Printf("E! Instrumental unable to send bad stat: %s", stat)
}
}
}
@@ -152,8 +159,6 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
allPoints := strings.Join(points, "\n") + "\n"
_, err = fmt.Fprintf(i.conn, allPoints)
log.Println("D! Instrumental: " + allPoints)
if err != nil {
if err == io.EOF {
i.Close()

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
)
@@ -26,13 +27,13 @@ func TestWrite(t *testing.T) {
}
// Default to gauge
m1, _ := telegraf.NewMetric(
m1, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"myfield": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m2, _ := telegraf.NewMetric(
m2, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1", "metric_type": "set"},
map[string]interface{}{"value": float64(3.14)},
@@ -43,27 +44,27 @@ func TestWrite(t *testing.T) {
i.Write(metrics)
// Counter and Histogram are increments
m3, _ := telegraf.NewMetric(
m3, _ := metric.New(
"my_histogram",
map[string]string{"host": "192.168.0.1", "metric_type": "histogram"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
// We will modify metric names that won't be accepted by Instrumental
m4, _ := telegraf.NewMetric(
m4, _ := metric.New(
"bad_metric_name",
map[string]string{"host": "192.168.0.1:8888::123", "metric_type": "counter"},
map[string]interface{}{"value": 1},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
// We will drop metric values that won't be accepted by Instrumental
m5, _ := telegraf.NewMetric(
m5, _ := metric.New(
"bad_values",
map[string]string{"host": "192.168.0.1", "metric_type": "counter"},
map[string]interface{}{"value": "\" 3:30\""},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m6, _ := telegraf.NewMetric(
m6, _ := metric.New(
"my_counter",
map[string]string{"host": "192.168.0.1", "metric_type": "counter"},
map[string]interface{}{"value": float64(3.14)},

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/require"
)
@@ -163,7 +164,7 @@ func TestBuildGauge(t *testing.T) {
}
func newHostMetric(value interface{}, name, host string) (metric telegraf.Metric) {
metric, _ = telegraf.NewMetric(
metric, _ = metric.New(
name,
map[string]string{"host": host},
map[string]interface{}{"value": value},
@@ -174,19 +175,19 @@ func newHostMetric(value interface{}, name, host string) (metric telegraf.Metric
func TestBuildGaugeWithSource(t *testing.T) {
mtime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
pt1, _ := telegraf.NewMetric(
pt1, _ := metric.New(
"test1",
map[string]string{"hostname": "192.168.0.1", "tag1": "value1"},
map[string]interface{}{"value": 0.0},
mtime,
)
pt2, _ := telegraf.NewMetric(
pt2, _ := metric.New(
"test2",
map[string]string{"hostnam": "192.168.0.1", "tag1": "value1"},
map[string]interface{}{"value": 1.0},
mtime,
)
pt3, _ := telegraf.NewMetric(
pt3, _ := metric.New(
"test3",
map[string]string{
"hostname": "192.168.0.1",
@@ -195,7 +196,7 @@ func TestBuildGaugeWithSource(t *testing.T) {
map[string]interface{}{"value": 1.0},
mtime,
)
pt4, _ := telegraf.NewMetric(
pt4, _ := metric.New(
"test4",
map[string]string{
"hostname": "192.168.0.1",

View File

@@ -128,24 +128,22 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
t = append(t, metric.Name())
topic := strings.Join(t, "/")
values, err := m.serializer.Serialize(metric)
buf, err := m.serializer.Serialize(metric)
if err != nil {
return fmt.Errorf("MQTT Could not serialize metric: %s",
metric.String())
}
for _, value := range values {
err = m.publish(topic, value)
if err != nil {
return fmt.Errorf("Could not write to MQTT server, %s", err)
}
err = m.publish(topic, buf)
if err != nil {
return fmt.Errorf("Could not write to MQTT server, %s", err)
}
}
return nil
}
func (m *MQTT) publish(topic, body string) error {
func (m *MQTT) publish(topic string, body []byte) error {
token := m.client.Publish(topic, byte(m.QoS), false, body)
token.Wait()
if token.Error() != nil {

View File

@@ -115,20 +115,13 @@ func (n *NATS) Write(metrics []telegraf.Metric) error {
}
for _, metric := range metrics {
values, err := n.serializer.Serialize(metric)
buf, err := n.serializer.Serialize(metric)
if err != nil {
return err
}
var pubErr error
for _, value := range values {
err = n.conn.Publish(n.Subject, []byte(value))
if err != nil {
pubErr = err
}
}
if pubErr != nil {
err = n.conn.Publish(n.Subject, buf)
if err != nil {
return fmt.Errorf("FAILED to send NATS message: %s", err)
}
}

View File

@@ -66,20 +66,13 @@ func (n *NSQ) Write(metrics []telegraf.Metric) error {
}
for _, metric := range metrics {
values, err := n.serializer.Serialize(metric)
buf, err := n.serializer.Serialize(metric)
if err != nil {
return err
}
var pubErr error
for _, value := range values {
err = n.producer.Publish(n.Topic, []byte(value))
if err != nil {
pubErr = err
}
}
if pubErr != nil {
err = n.producer.Publish(n.Topic, buf)
if err != nil {
return fmt.Errorf("FAILED to send NSQD message: %s", err)
}
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/inputs/prometheus"
"github.com/influxdata/telegraf/testutil"
)
@@ -26,12 +27,12 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
now := time.Now()
tags := make(map[string]string)
pt1, _ := telegraf.NewMetric(
pt1, _ := metric.New(
"test_point_1",
tags,
map[string]interface{}{"value": 0.0},
now)
pt2, _ := telegraf.NewMetric(
pt2, _ := metric.New(
"test_point_2",
tags,
map[string]interface{}{"value": 1.0},
@@ -61,12 +62,12 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
tags = make(map[string]string)
tags["testtag"] = "testvalue"
pt3, _ := telegraf.NewMetric(
pt3, _ := metric.New(
"test_point_3",
tags,
map[string]interface{}{"value": 0.0},
now)
pt4, _ := telegraf.NewMetric(
pt4, _ := metric.New(
"test_point_4",
tags,
map[string]interface{}{"value": 1.0},
@@ -104,7 +105,7 @@ func TestPrometheusExpireOldMetrics(t *testing.T) {
now := time.Now()
tags := make(map[string]string)
pt1, _ := telegraf.NewMetric(
pt1, _ := metric.New(
"test_point_1",
tags,
map[string]interface{}{"value": 0.0},
@@ -116,7 +117,7 @@ func TestPrometheusExpireOldMetrics(t *testing.T) {
m.Expiration = now.Add(time.Duration(-15) * time.Second)
}
pt2, _ := telegraf.NewMetric(
pt2, _ := metric.New(
"test_point_2",
tags,
map[string]interface{}{"value": 1.0},