This commit is contained in:
Aurélien Hébert
2016-07-25 09:49:25 +02:00
87 changed files with 2484 additions and 1318 deletions

View File

@@ -9,6 +9,8 @@ via raw TCP.
# Configuration for Graphite server to send metrics to
[[outputs.graphite]]
## TCP endpoint for your graphite instance.
## If multiple endpoints are configured, the output will be load balanced.
## Only one of the endpoints will be written to with each iteration.
servers = ["localhost:2003"]
## Prefix metrics name
prefix = ""

View File

@@ -2,7 +2,6 @@ package graphite
import (
"errors"
"fmt"
"log"
"math/rand"
"net"
@@ -25,6 +24,8 @@ type Graphite struct {
var sampleConfig = `
## TCP endpoint for your graphite instance.
## If multiple endpoints are configured, output will be load balanced.
## Only one of the endpoints will be written to with each iteration.
servers = ["localhost:2003"]
## Prefix metrics name
prefix = ""
@@ -96,9 +97,12 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
// Send data to a random server
p := rand.Perm(len(g.conns))
for _, n := range p {
if _, e := fmt.Fprint(g.conns[n], graphitePoints); e != nil {
if g.Timeout > 0 {
g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second))
}
if _, e := g.conns[n].Write([]byte(graphitePoints)); e != nil {
// Error
log.Println("ERROR: " + err.Error())
log.Println("ERROR: " + e.Error())
// Let's try the next one
} else {
// Success

View File

@@ -28,8 +28,10 @@ type Instrumental struct {
}
const (
DefaultHost = "collector.instrumentalapp.com"
AuthFormat = "hello version go/telegraf/1.0\nauthenticate %s\n"
DefaultHost = "collector.instrumentalapp.com"
HelloMessage = "hello version go/telegraf/1.1\n"
AuthFormat = "authenticate %s\n"
HandshakeFormat = HelloMessage + AuthFormat
)
var (
@@ -52,6 +54,7 @@ var sampleConfig = `
func (i *Instrumental) Connect() error {
connection, err := net.DialTimeout("tcp", i.Host+":8000", i.Timeout.Duration)
if err != nil {
i.conn = nil
return err
@@ -151,6 +154,11 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
return err
}
// force the connection closed after sending data
// to deal with various disconnection scenarios and eschew holding
// open idle connections en masse
i.Close()
return nil
}
@@ -163,7 +171,7 @@ func (i *Instrumental) SampleConfig() string {
}
func (i *Instrumental) authenticate(conn net.Conn) error {
_, err := fmt.Fprintf(conn, AuthFormat, i.ApiToken)
_, err := fmt.Fprintf(conn, HandshakeFormat, i.ApiToken)
if err != nil {
return err
}

View File

@@ -24,7 +24,6 @@ func TestWrite(t *testing.T) {
ApiToken: "abc123token",
Prefix: "my.prefix",
}
i.Connect()
// Default to gauge
m1, _ := telegraf.NewMetric(
@@ -40,10 +39,8 @@ func TestWrite(t *testing.T) {
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
// Simulate a connection close and reconnect.
metrics := []telegraf.Metric{m1, m2}
i.Write(metrics)
i.Close()
// Counter and Histogram are increments
m3, _ := telegraf.NewMetric(
@@ -70,7 +67,6 @@ func TestWrite(t *testing.T) {
i.Write(metrics)
wg.Wait()
i.Close()
}
func TCPServer(t *testing.T, wg *sync.WaitGroup) {
@@ -82,10 +78,9 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
tp := textproto.NewReader(reader)
hello, _ := tp.ReadLine()
assert.Equal(t, "hello version go/telegraf/1.0", hello)
assert.Equal(t, "hello version go/telegraf/1.1", hello)
auth, _ := tp.ReadLine()
assert.Equal(t, "authenticate abc123token", auth)
conn.Write([]byte("ok\nok\n"))
data1, _ := tp.ReadLine()
@@ -99,10 +94,9 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
tp = textproto.NewReader(reader)
hello, _ = tp.ReadLine()
assert.Equal(t, "hello version go/telegraf/1.0", hello)
assert.Equal(t, "hello version go/telegraf/1.1", hello)
auth, _ = tp.ReadLine()
assert.Equal(t, "authenticate abc123token", auth)
conn.Write([]byte("ok\nok\n"))
data3, _ := tp.ReadLine()

View File

@@ -153,8 +153,7 @@ func (l *Librato) Description() string {
func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
gauges := []*Gauge{}
serializer := graphite.GraphiteSerializer{Template: l.Template}
bucket := serializer.SerializeBucketName(m.Name(), m.Tags())
bucket := graphite.SerializeBucketName(m.Name(), m.Tags(), l.Template, "")
for fieldName, value := range m.Fields() {
gauge := &Gauge{
Name: graphite.InsertField(bucket, fieldName),

View File

@@ -5,27 +5,21 @@ import (
"log"
"net/http"
"regexp"
"strings"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/prometheus/client_golang/prometheus"
)
var (
sanitizedChars = strings.NewReplacer("/", "_", "@", "_", " ", "_", "-", "_", ".", "_")
// Prometheus metric names must match this regex
// see https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
metricName = regexp.MustCompile("^[a-zA-Z_:][a-zA-Z0-9_:]*$")
// Prometheus labels must match this regex
// see https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
labelName = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]*$")
)
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
type PrometheusClient struct {
Listen string
metrics map[string]prometheus.Metric
sync.Mutex
}
var sampleConfig = `
@@ -34,6 +28,7 @@ var sampleConfig = `
`
func (p *PrometheusClient) Start() error {
prometheus.MustRegister(p)
defer func() {
if r := recover(); r != nil {
// recovering from panic here because there is no way to stop a
@@ -78,25 +73,42 @@ func (p *PrometheusClient) Description() string {
return "Configuration for the Prometheus client to spawn"
}
// Implements prometheus.Collector
func (p *PrometheusClient) Describe(ch chan<- *prometheus.Desc) {
prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch)
}
// Implements prometheus.Collector
func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
p.Lock()
defer p.Unlock()
for _, m := range p.metrics {
ch <- m
}
}
func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
p.Lock()
defer p.Unlock()
p.metrics = make(map[string]prometheus.Metric)
if len(metrics) == 0 {
return nil
}
for _, point := range metrics {
key := point.Name()
key = sanitizedChars.Replace(key)
key = invalidNameCharRE.ReplaceAllString(key, "_")
var labels []string
l := prometheus.Labels{}
for k, v := range point.Tags() {
k = sanitizedChars.Replace(k)
k = invalidNameCharRE.ReplaceAllString(k, "_")
if len(k) == 0 {
continue
}
if !labelName.MatchString(k) {
continue
}
labels = append(labels, k)
l[k] = v
}
@@ -111,7 +123,7 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
}
// sanitize the measurement name
n = sanitizedChars.Replace(n)
n = invalidNameCharRE.ReplaceAllString(n, "_")
var mname string
if n == "value" {
mname = key
@@ -119,50 +131,23 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
mname = fmt.Sprintf("%s_%s", key, n)
}
// verify that it is a valid measurement name
if !metricName.MatchString(mname) {
continue
}
mVec := prometheus.NewUntypedVec(
prometheus.UntypedOpts{
Name: mname,
Help: "Telegraf collected metric",
},
labels,
)
collector, err := prometheus.RegisterOrGet(mVec)
if err != nil {
log.Printf("prometheus_client: Metric failed to register with prometheus, %s", err)
continue
}
mVec, ok := collector.(*prometheus.UntypedVec)
if !ok {
continue
}
desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l)
var metric prometheus.Metric
var err error
switch val := val.(type) {
case int64:
m, err := mVec.GetMetricWith(l)
if err != nil {
log.Printf("ERROR Getting metric in Prometheus output, "+
"key: %s, labels: %v,\nerr: %s\n",
mname, l, err.Error())
continue
}
m.Set(float64(val))
metric, err = prometheus.NewConstMetric(desc, prometheus.UntypedValue, float64(val))
case float64:
m, err := mVec.GetMetricWith(l)
if err != nil {
log.Printf("ERROR Getting metric in Prometheus output, "+
"key: %s, labels: %v,\nerr: %s\n",
mname, l, err.Error())
continue
}
m.Set(val)
metric, err = prometheus.NewConstMetric(desc, prometheus.UntypedValue, val)
default:
continue
}
if err != nil {
log.Printf("ERROR creating prometheus metric, "+
"key: %s, labels: %v,\nerr: %s\n",
mname, l, err.Error())
}
p.metrics[desc.String()] = metric
}
}
return nil