Output stats to the Instrumental TCP Collector
This commit is contained in:
parent
8371546a66
commit
1f417bcd67
|
@ -8,6 +8,7 @@ import (
|
|||
_ "github.com/influxdata/telegraf/plugins/outputs/file"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/graphite"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/influxdb"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/instrumental"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/kafka"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/kinesis"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/librato"
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
# Instrumental Output Plugin
|
||||
|
||||
This plugin writes to the [Instrumental Collector API](https://instrumentalapp.com/docs/tcp-collector)
|
||||
and requires a Project-specific API token.
|
||||
|
||||
Instrumental accepts stats in a format very close to Graphite, with the only difference being that
|
||||
the type of stat (gauge, increment) is the first token, separated from the metric itself
|
||||
by whitespace. The `increment` type is only used if the metric comes in as a counter through `[[input.statsd]]`.
|
||||
|
||||
## Configuration:
|
||||
|
||||
```toml
|
||||
[[outputs.instrumental]]
|
||||
## Project API Token (required)
|
||||
api_token = "API Token" # required
|
||||
## Prefix the metrics with a given name
|
||||
prefix = ""
|
||||
## Stats output template (Graphite formatting)
|
||||
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
|
||||
template = "host.tags.measurement.field"
|
||||
## Timeout in seconds to connect
|
||||
timeout = "2s"
|
||||
## Debug true - Print communcation to Instrumental
|
||||
debug = false
|
||||
```
|
|
@ -0,0 +1,192 @@
|
|||
package instrumental
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf/plugins/serializers"
|
||||
"github.com/influxdata/telegraf/plugins/serializers/graphite"
|
||||
)
|
||||
|
||||
type Instrumental struct {
|
||||
Host string
|
||||
ApiToken string
|
||||
Prefix string
|
||||
DataFormat string
|
||||
Template string
|
||||
Timeout internal.Duration
|
||||
Debug bool
|
||||
|
||||
conn net.Conn
|
||||
}
|
||||
|
||||
const (
|
||||
DefaultHost = "collector.instrumentalapp.com"
|
||||
AuthFormat = "hello version go/telegraf/1.0\nauthenticate %s\n"
|
||||
)
|
||||
|
||||
var (
|
||||
StatIncludesBadChar = regexp.MustCompile("[^[:alnum:][:blank:]-_.]")
|
||||
)
|
||||
|
||||
var sampleConfig = `
|
||||
## Project API Token (required)
|
||||
api_token = "API Token" # required
|
||||
## Prefix the metrics with a given name
|
||||
prefix = ""
|
||||
## Stats output template (Graphite formatting)
|
||||
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
|
||||
template = "host.tags.measurement.field"
|
||||
## Timeout in seconds to connect
|
||||
timeout = "2s"
|
||||
## Display Communcation to Instrumental
|
||||
debug = false
|
||||
`
|
||||
|
||||
func (i *Instrumental) Connect() error {
|
||||
connection, err := net.DialTimeout("tcp", i.Host+":8000", i.Timeout.Duration)
|
||||
if err != nil {
|
||||
i.conn = nil
|
||||
return err
|
||||
}
|
||||
|
||||
err = i.authenticate(connection)
|
||||
if err != nil {
|
||||
i.conn = nil
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Instrumental) Close() error {
|
||||
i.conn.Close()
|
||||
i.conn = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Instrumental) Write(metrics []telegraf.Metric) error {
|
||||
if i.conn == nil {
|
||||
err := i.Connect()
|
||||
if err != nil {
|
||||
return fmt.Errorf("FAILED to (re)connect to Instrumental. Error: %s\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var points []string
|
||||
var metricType string
|
||||
var toSerialize telegraf.Metric
|
||||
var newTags map[string]string
|
||||
|
||||
for _, metric := range metrics {
|
||||
// Pull the metric_type out of the metric's tags. We don't want the type
|
||||
// to show up with the other tags pulled from the system, as they go in the
|
||||
// beginning of the line instead.
|
||||
// e.g we want:
|
||||
//
|
||||
// increment some_prefix.host.tag1.tag2.tag3.field value timestamp
|
||||
//
|
||||
// vs
|
||||
//
|
||||
// increment some_prefix.host.tag1.tag2.tag3.counter.field value timestamp
|
||||
//
|
||||
newTags = metric.Tags()
|
||||
metricType = newTags["metric_type"]
|
||||
delete(newTags, "metric_type")
|
||||
|
||||
toSerialize, _ = telegraf.NewMetric(
|
||||
metric.Name(),
|
||||
newTags,
|
||||
metric.Fields(),
|
||||
metric.Time(),
|
||||
)
|
||||
|
||||
stats, err := s.Serialize(toSerialize)
|
||||
if err != nil {
|
||||
log.Printf("Error serializing a metric to Instrumental: %s", err)
|
||||
}
|
||||
|
||||
switch metricType {
|
||||
case "counter":
|
||||
fallthrough
|
||||
case "histogram":
|
||||
metricType = "increment"
|
||||
default:
|
||||
metricType = "gauge"
|
||||
}
|
||||
|
||||
for _, stat := range stats {
|
||||
if !StatIncludesBadChar.MatchString(stat) {
|
||||
points = append(points, fmt.Sprintf("%s %s", metricType, stat))
|
||||
} else if i.Debug {
|
||||
log.Printf("Unable to send bad stat: %s", stat)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
allPoints := strings.Join(points, "\n") + "\n"
|
||||
_, err = fmt.Fprintf(i.conn, allPoints)
|
||||
|
||||
if i.Debug {
|
||||
log.Println(allPoints)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
i.Close()
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Instrumental) Description() string {
|
||||
return "Configuration for sending metrics to an Instrumental project"
|
||||
}
|
||||
|
||||
func (i *Instrumental) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (i *Instrumental) authenticate(conn net.Conn) error {
|
||||
_, err := fmt.Fprintf(conn, AuthFormat, i.ApiToken)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// The response here will either be two "ok"s or an error message.
|
||||
responses := make([]byte, 512)
|
||||
if _, err = conn.Read(responses); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if string(responses)[:6] != "ok\nok\n" {
|
||||
return fmt.Errorf("Authentication failed: %s", responses)
|
||||
}
|
||||
|
||||
i.conn = conn
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("instrumental", func() telegraf.Output {
|
||||
return &Instrumental{
|
||||
Host: DefaultHost,
|
||||
Template: graphite.DEFAULT_TEMPLATE,
|
||||
}
|
||||
})
|
||||
}
|
|
@ -0,0 +1,114 @@
|
|||
package instrumental
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"net"
|
||||
"net/textproto"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestWrite(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go TCPServer(t, &wg)
|
||||
// Give the fake TCP server some time to start:
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
|
||||
i := Instrumental{
|
||||
Host: "127.0.0.1",
|
||||
ApiToken: "abc123token",
|
||||
Prefix: "my.prefix",
|
||||
}
|
||||
i.Connect()
|
||||
|
||||
// Default to gauge
|
||||
m1, _ := telegraf.NewMetric(
|
||||
"mymeasurement",
|
||||
map[string]string{"host": "192.168.0.1"},
|
||||
map[string]interface{}{"myfield": float64(3.14)},
|
||||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||
)
|
||||
m2, _ := telegraf.NewMetric(
|
||||
"mymeasurement",
|
||||
map[string]string{"host": "192.168.0.1", "metric_type": "set"},
|
||||
map[string]interface{}{"value": float64(3.14)},
|
||||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||
)
|
||||
|
||||
// Simulate a connection close and reconnect.
|
||||
metrics := []telegraf.Metric{m1, m2}
|
||||
i.Write(metrics)
|
||||
i.Close()
|
||||
|
||||
// Counter and Histogram are increments
|
||||
m3, _ := telegraf.NewMetric(
|
||||
"my_histogram",
|
||||
map[string]string{"host": "192.168.0.1", "metric_type": "histogram"},
|
||||
map[string]interface{}{"value": float64(3.14)},
|
||||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||
)
|
||||
// We will drop metrics that simply won't be accepted by Instrumental
|
||||
m4, _ := telegraf.NewMetric(
|
||||
"bad_values",
|
||||
map[string]string{"host": "192.168.0.1", "metric_type": "counter"},
|
||||
map[string]interface{}{"value": "\" 3:30\""},
|
||||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||
)
|
||||
m5, _ := telegraf.NewMetric(
|
||||
"my_counter",
|
||||
map[string]string{"host": "192.168.0.1", "metric_type": "counter"},
|
||||
map[string]interface{}{"value": float64(3.14)},
|
||||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||
)
|
||||
|
||||
metrics = []telegraf.Metric{m3, m4, m5}
|
||||
i.Write(metrics)
|
||||
|
||||
wg.Wait()
|
||||
i.Close()
|
||||
}
|
||||
|
||||
func TCPServer(t *testing.T, wg *sync.WaitGroup) {
|
||||
tcpServer, _ := net.Listen("tcp", "127.0.0.1:8000")
|
||||
defer wg.Done()
|
||||
conn, _ := tcpServer.Accept()
|
||||
conn.SetDeadline(time.Now().Add(1 * time.Second))
|
||||
reader := bufio.NewReader(conn)
|
||||
tp := textproto.NewReader(reader)
|
||||
|
||||
hello, _ := tp.ReadLine()
|
||||
assert.Equal(t, "hello version go/telegraf/1.0", hello)
|
||||
auth, _ := tp.ReadLine()
|
||||
assert.Equal(t, "authenticate abc123token", auth)
|
||||
|
||||
conn.Write([]byte("ok\nok\n"))
|
||||
|
||||
data1, _ := tp.ReadLine()
|
||||
assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1)
|
||||
data2, _ := tp.ReadLine()
|
||||
assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2)
|
||||
|
||||
conn, _ = tcpServer.Accept()
|
||||
conn.SetDeadline(time.Now().Add(1 * time.Second))
|
||||
reader = bufio.NewReader(conn)
|
||||
tp = textproto.NewReader(reader)
|
||||
|
||||
hello, _ = tp.ReadLine()
|
||||
assert.Equal(t, "hello version go/telegraf/1.0", hello)
|
||||
auth, _ = tp.ReadLine()
|
||||
assert.Equal(t, "authenticate abc123token", auth)
|
||||
|
||||
conn.Write([]byte("ok\nok\n"))
|
||||
|
||||
data3, _ := tp.ReadLine()
|
||||
assert.Equal(t, "increment my.prefix.192_168_0_1.my_histogram 3.14 1289430000", data3)
|
||||
data4, _ := tp.ReadLine()
|
||||
assert.Equal(t, "increment my.prefix.192_168_0_1.my_counter 3.14 1289430000", data4)
|
||||
|
||||
conn.Close()
|
||||
}
|
Loading…
Reference in New Issue