telegraf/plugins/outputs/instrumental/instrumental.go

214 lines
4.6 KiB
Go

package instrumental
import (
"bytes"
"fmt"
"io"
"log"
"net"
"regexp"
"strings"
"github.com/influxdata/telegraf/plugins"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
)
// ValueIncludesBadChar matches any character that is neither an ASCII digit
// nor a dot. Write drops every serialized value this pattern matches, which
// includes values carrying a sign or an exponent.
var ValueIncludesBadChar = regexp.MustCompile("[^[:digit:].]")

// MetricNameReplacer matches each run of characters that are not a hyphen,
// an ASCII letter or digit, an underscore, or a dot. Write replaces every
// such run in a metric name with a single underscore.
var MetricNameReplacer = regexp.MustCompile("[^-[:alnum:]_.]+")
// Instrumental is an output plugin that sends metrics to an Instrumental
// collector over a TCP connection.
type Instrumental struct {
	// Host is the collector host name; Connect dials it on port 8000.
	Host string
	// ApiToken is the project token sent in the authenticate handshake.
	ApiToken string
	// Prefix is handed to the Graphite serializer used by Write.
	Prefix string
	// DataFormat is not referenced by the code visible in this file.
	DataFormat string
	// Template is the Graphite serializer template used by Write.
	Template string
	// Timeout bounds the TCP dial performed by Connect.
	Timeout internal.Duration
	// Debug is described by the sample config as displaying communication
	// to Instrumental; it is not read by the code visible in this file.
	Debug bool

	// conn is the authenticated collector connection, or nil when the plugin
	// is not connected.
	conn net.Conn
}
// Default collector address and the strings that make up the handshake.
const (
	// DefaultHost is the collector host assigned to new plugin instances.
	DefaultHost = "collector.instrumentalapp.com"

	// HelloMessage is the first line of the handshake.
	HelloMessage = "hello version go/telegraf/1.1\n"

	// AuthFormat is the second line of the handshake; its single %s verb is
	// filled with the API token.
	AuthFormat = "authenticate %s\n"

	// HandshakeFormat is the complete handshake: the hello line followed by
	// the authenticate line.
	HandshakeFormat = HelloMessage + AuthFormat
)
// sampleConfig is the example configuration returned by SampleConfig.
var sampleConfig = `
## Project API Token (required)
api_token = "API Token" # required
## Prefix the metrics with a given name
prefix = ""
## Stats output template (Graphite formatting)
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
template = "host.tags.measurement.field"
## Timeout in seconds to connect
timeout = "2s"
## Display Communication to Instrumental
debug = false
`
// Connect dials the collector at Host on TCP port 8000, bounded by Timeout,
// and runs the authentication handshake on the new connection.
//
// On success the authenticated connection is stored on the plugin. On any
// failure the stored connection is reset to nil and the error is returned.
func (i *Instrumental) Connect() error {
	connection, err := net.DialTimeout("tcp", i.Host+":8000", i.Timeout.Duration)
	if err != nil {
		i.conn = nil
		return err
	}

	if err = i.authenticate(connection); err != nil {
		// authenticate only keeps the connection when it succeeds, so the
		// socket dialed above must be released here or it would leak.
		connection.Close()
		i.conn = nil
		return err
	}

	return nil
}
// Close closes the collector connection, if there is one, and forgets it so
// that the next Write reconnects.
//
// Calling Close while disconnected is a no-op that returns nil. Otherwise
// the error reported by the underlying connection's Close is returned.
func (i *Instrumental) Close() error {
	if i.conn == nil {
		return nil
	}

	err := i.conn.Close()
	i.conn = nil
	return err
}
// Write converts the metrics into Instrumental collector commands and sends
// them over the collector connection, connecting first when necessary.
//
// Each metric is serialized with a Graphite serializer built from Prefix and
// Template. Every serialized "name value timestamp" line is rewritten as
// "<command> <name> <value> <timestamp>", where the command is "increment"
// for metrics tagged metric_type=counter or metric_type=histogram and
// "gauge" otherwise. Lines whose value contains anything other than digits
// and dots are dropped.
//
// The connection is closed after every call, whether or not the send
// succeeded, so the next call always starts from a fresh connection.
//
// Write returns an error when connecting, building the serializer, or
// writing to the connection fails. Metrics that cannot be rebuilt or
// serialized, and serialized lines that are malformed, are logged and
// skipped.
func (i *Instrumental) Write(metrics []plugins.Metric) error {
	if i.conn == nil {
		if err := i.Connect(); err != nil {
			return fmt.Errorf("FAILED to (re)connect to Instrumental. Error: %s\n", err)
		}
	}

	s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template)
	if err != nil {
		return err
	}

	var points []string
	for _, m := range metrics {
		// Pull the metric_type out of the metric's tags. The type must not be
		// serialized with the other tags, because it goes at the beginning of
		// the line instead. We want:
		//
		//   increment some_prefix.host.tag1.tag2.tag3.field value timestamp
		//
		// rather than:
		//
		//   increment some_prefix.host.tag1.tag2.tag3.counter.field value timestamp
		tags := m.Tags()
		command := "gauge"
		switch tags["metric_type"] {
		case "counter", "histogram":
			command = "increment"
		}
		delete(tags, "metric_type")

		toSerialize, err := metric.New(m.Name(), tags, m.Fields(), m.Time())
		if err != nil {
			log.Printf("E! Error rebuilding a metric for Instrumental: %s", err)
			continue
		}

		buf, err := s.Serialize(toSerialize)
		if err != nil {
			log.Printf("E! Error serializing a metric to Instrumental: %s", err)
			continue
		}

		buffer := bytes.NewBuffer(buf)
		for {
			// ReadBytes keeps the trailing newline, so the timestamp field
			// below already terminates the outgoing line.
			line, err := buffer.ReadBytes('\n')
			if err != nil {
				break
			}

			// Decompose "metric.name value time".
			splitStat := strings.SplitN(string(line), " ", 3)
			if len(splitStat) != 3 {
				log.Printf("E! Skipping malformed serialized line for Instrumental: %q", line)
				continue
			}
			name, value, timestamp := splitStat[0], splitStat[1], splitStat[2]

			// Replace invalid components of the metric name with an underscore.
			cleanMetric := MetricNameReplacer.ReplaceAllString(name, "_")

			if !ValueIncludesBadChar.MatchString(value) {
				points = append(points, fmt.Sprintf("%s %s %s %s", command, cleanMetric, value, timestamp))
			}
		}
	}

	// Send the payload verbatim; it is data, not a format string.
	allPoints := strings.Join(points, "")
	_, err = io.WriteString(i.conn, allPoints)

	// Force the connection closed after sending data, to deal with various
	// disconnection scenarios and avoid holding idle connections open. This
	// also runs when the write failed, so a broken connection is never kept
	// for the next call.
	i.Close()

	return err
}
// Description returns a one-line summary of what this output plugin is for.
func (i *Instrumental) Description() string {
	const summary = "Configuration for sending metrics to an Instrumental project"
	return summary
}
// SampleConfig returns the example configuration held in sampleConfig.
func (i *Instrumental) SampleConfig() string {
	example := sampleConfig
	return example
}
// authenticate performs the hello/authenticate handshake on conn and, when
// the collector accepts it, stores conn on the plugin for later writes.
//
// A successful handshake is answered with "ok\n" twice; anything else is
// treated as an error message from the collector. The two replies may arrive
// in separate reads, so reading continues for as long as the bytes received
// so far are still a prefix of the expected answer.
//
// It returns the write or read error if the exchange itself fails, or an
// "Authentication failed" error carrying the bytes actually received. conn
// is not closed here on failure.
func (i *Instrumental) authenticate(conn net.Conn) error {
	if _, err := fmt.Fprintf(conn, HandshakeFormat, i.ApiToken); err != nil {
		return err
	}

	const expected = "ok\nok\n"

	responses := make([]byte, 512)
	received := 0
	for received < len(expected) && string(responses[:received]) == expected[:received] {
		n, err := conn.Read(responses[received:])
		received += n
		if err != nil {
			return err
		}
	}

	// Report only the bytes that were read, not the unused rest of the buffer.
	reply := string(responses[:received])
	if received < len(expected) || reply[:len(expected)] != expected {
		return fmt.Errorf("Authentication failed: %s", reply)
	}

	i.conn = conn
	return nil
}
func init() {
outputs.Add("instrumental", func() plugins.Output {
return &Instrumental{
Host: DefaultHost,
Template: graphite.DEFAULT_TEMPLATE,
}
})
}