Merge remote-tracking branch 'origin/master' into pagerduty

This commit is contained in:
Ranjib Dey
2016-08-21 00:35:56 -07:00
201 changed files with 8242 additions and 2318 deletions

View File

@@ -9,6 +9,8 @@ via raw TCP.
# Configuration for Graphite server to send metrics to
[[outputs.graphite]]
## TCP endpoint for your graphite instance.
## If multiple endpoints are configured, the output will be load balanced.
## Only one of the endpoints will be written to with each iteration.
servers = ["localhost:2003"]
## Prefix metrics name
prefix = ""

View File

@@ -2,7 +2,6 @@ package graphite
import (
"errors"
"fmt"
"log"
"math/rand"
"net"
@@ -25,6 +24,8 @@ type Graphite struct {
var sampleConfig = `
## TCP endpoint for your graphite instance.
## If multiple endpoints are configured, output will be load balanced.
## Only one of the endpoints will be written to with each iteration.
servers = ["localhost:2003"]
## Prefix metrics name
prefix = ""
@@ -96,9 +97,12 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
// Send data to a random server
p := rand.Perm(len(g.conns))
for _, n := range p {
if _, e := fmt.Fprintf(g.conns[n], graphitePoints); e != nil {
if g.Timeout > 0 {
g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second))
}
if _, e := g.conns[n].Write([]byte(graphitePoints)); e != nil {
// Error
log.Println("ERROR: " + err.Error())
log.Println("ERROR: " + e.Error())
// Let's try the next one
} else {
// Success

View File

@@ -2,6 +2,42 @@
This plugin writes to [InfluxDB](https://www.influxdb.com) via HTTP or UDP.
### Configuration:
```toml
# Configuration for influxdb server to send metrics to
[[outputs.influxdb]]
## The full HTTP or UDP endpoint URL for your InfluxDB instance.
## Multiple urls can be specified as part of the same cluster,
## this means that only ONE of the urls will be written to each interval.
# urls = ["udp://localhost:8089"] # UDP endpoint example
urls = ["http://localhost:8086"] # required
## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required
## Retention policy to write to. Empty string writes to the default rp.
retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
## Write timeout (for the InfluxDB client), formatted as a string.
## If not provided, will default to 5s. 0s means no timeout (not recommended).
timeout = "5s"
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
## Set the user agent for HTTP POSTs (can be useful for log differentiation)
# user_agent = "telegraf"
## Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
# udp_payload = 512
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
```
### Required parameters:
* `urls`: List of strings, this is for InfluxDB clustering
@@ -12,16 +48,14 @@ to write to. Each URL should start with either `http://` or `udp://`
### Optional parameters:
* `write_consistency`: Write consistency (clusters only), can be: "any", "one", "quorum", "all".
* `retention_policy`: Retention policy to write to.
* `precision`: Precision of writes, valid values are "ns", "us" (or "µs"), "ms", "s", "m", "h". note: using "s" precision greatly improves InfluxDB compression.
* `timeout`: Write timeout (for the InfluxDB client), formatted as a string. If not provided, will default to 5s. 0s means no timeout (not recommended).
* `username`: Username for influxdb
* `password`: Password for influxdb
* `user_agent`: Set the user agent for HTTP POSTs (can be useful for log differentiation)
* `udp_payload`: Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
## Optional SSL Config
* `ssl_ca`: SSL CA
* `ssl_cert`: SSL CERT
* `ssl_key`: SSL key
* `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false)
* `write_consistency`: Write consistency for clusters only, can be: "any", "one", "quorom", "all"

View File

@@ -24,7 +24,6 @@ type InfluxDB struct {
Password string
Database string
UserAgent string
Precision string
RetentionPolicy string
WriteConsistency string
Timeout internal.Duration
@@ -39,6 +38,9 @@ type InfluxDB struct {
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
// Precision is only here for legacy support. It will be ignored.
Precision string
conns []client.Client
}
@@ -50,13 +52,10 @@ var sampleConfig = `
urls = ["http://localhost:8086"] # required
## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required
## Precision of writes, valid values are "ns", "us" (or "µs"), "ms", "s", "m", "h".
## note: using "s" precision greatly improves InfluxDB compression.
precision = "s"
## Retention policy to write to.
retention_policy = "default"
## Write consistency (clusters only), can be: "any", "one", "quorom", "all"
## Retention policy to write to. Empty string writes to the default rp.
retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
## Write timeout (for the InfluxDB client), formatted as a string.
@@ -147,7 +146,7 @@ func (i *InfluxDB) Connect() error {
func createDatabase(c client.Client, database string) error {
// Create Database if it doesn't exist
_, err := c.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE IF NOT EXISTS \"%s\"", database),
Command: fmt.Sprintf("CREATE DATABASE \"%s\"", database),
})
return err
}
@@ -184,7 +183,6 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
}
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: i.Database,
Precision: i.Precision,
RetentionPolicy: i.RetentionPolicy,
WriteConsistency: i.WriteConsistency,
})

View File

@@ -28,8 +28,10 @@ type Instrumental struct {
}
const (
DefaultHost = "collector.instrumentalapp.com"
AuthFormat = "hello version go/telegraf/1.0\nauthenticate %s\n"
DefaultHost = "collector.instrumentalapp.com"
HelloMessage = "hello version go/telegraf/1.1\n"
AuthFormat = "authenticate %s\n"
HandshakeFormat = HelloMessage + AuthFormat
)
var (
@@ -52,6 +54,7 @@ var sampleConfig = `
func (i *Instrumental) Connect() error {
connection, err := net.DialTimeout("tcp", i.Host+":8000", i.Timeout.Duration)
if err != nil {
i.conn = nil
return err
@@ -151,6 +154,11 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
return err
}
// force the connection closed after sending data
// to deal with various disconnection scenarios and eschew holding
// open idle connections en masse
i.Close()
return nil
}
@@ -163,7 +171,7 @@ func (i *Instrumental) SampleConfig() string {
}
func (i *Instrumental) authenticate(conn net.Conn) error {
_, err := fmt.Fprintf(conn, AuthFormat, i.ApiToken)
_, err := fmt.Fprintf(conn, HandshakeFormat, i.ApiToken)
if err != nil {
return err
}

View File

@@ -24,7 +24,6 @@ func TestWrite(t *testing.T) {
ApiToken: "abc123token",
Prefix: "my.prefix",
}
i.Connect()
// Default to gauge
m1, _ := telegraf.NewMetric(
@@ -40,10 +39,8 @@ func TestWrite(t *testing.T) {
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
// Simulate a connection close and reconnect.
metrics := []telegraf.Metric{m1, m2}
i.Write(metrics)
i.Close()
// Counter and Histogram are increments
m3, _ := telegraf.NewMetric(
@@ -70,7 +67,6 @@ func TestWrite(t *testing.T) {
i.Write(metrics)
wg.Wait()
i.Close()
}
func TCPServer(t *testing.T, wg *sync.WaitGroup) {
@@ -82,10 +78,9 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
tp := textproto.NewReader(reader)
hello, _ := tp.ReadLine()
assert.Equal(t, "hello version go/telegraf/1.0", hello)
assert.Equal(t, "hello version go/telegraf/1.1", hello)
auth, _ := tp.ReadLine()
assert.Equal(t, "authenticate abc123token", auth)
conn.Write([]byte("ok\nok\n"))
data1, _ := tp.ReadLine()
@@ -99,10 +94,9 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
tp = textproto.NewReader(reader)
hello, _ = tp.ReadLine()
assert.Equal(t, "hello version go/telegraf/1.0", hello)
assert.Equal(t, "hello version go/telegraf/1.1", hello)
auth, _ = tp.ReadLine()
assert.Equal(t, "authenticate abc123token", auth)
conn.Write([]byte("ok\nok\n"))
data3, _ := tp.ReadLine()

View File

@@ -0,0 +1,67 @@
# Kafka Producer Output Plugin
This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.html) acting a Kafka Producer.
```
[[outputs.kafka]]
## URLs of kafka brokers
brokers = ["localhost:9092"]
## Kafka topic for producer messages
topic = "telegraf"
## Telegraf tag to use as a routing key
## ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host"
## CompressionCodec represents the various compression codecs recognized by
## Kafka in messages.
## 0 : No compression
## 1 : Gzip compression
## 2 : Snappy compression
compression_codec = 0
## RequiredAcks is used in Produce Requests to tell the broker how many
## replica acknowledgements it must see before responding
## 0 : the producer never waits for an acknowledgement from the broker.
## This option provides the lowest latency but the weakest durability
## guarantees (some data will be lost when a server fails).
## 1 : the producer gets an acknowledgement after the leader replica has
## received the data. This option provides better durability as the
## client waits until the server acknowledges the request as successful
## (only messages that were written to the now-dead leader but not yet
## replicated will be lost).
## -1: the producer gets an acknowledgement after all in-sync replicas have
## received the data. This option provides the best durability, we
## guarantee that no messages will be lost as long as at least one in
## sync replica remains.
required_acks = -1
## The total number of times to retry sending a message
max_retry = 3
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
data_format = "influx"
```
### Required parameters:
* `brokers`: List of strings, this is for speaking to a cluster of `kafka` brokers. On each flush interval, Telegraf will randomly choose one of the urls to write to. Each URL should just include host and port e.g. -> `["{host}:{port}","{host2}:{port2}"]`
* `topic`: The `kafka` topic to publish to.
### Optional parameters:
* `routing_tag`: if this tag exists, it's value will be used as the routing key
* `compression_codec`: What level of compression to use: `0` -> no compression, `1` -> gzip compression, `2` -> snappy compression
* `required_acks`: a setting for how may `acks` required from the `kafka` broker cluster.
* `max_retry`: Max number of times to retry failed write
* `ssl_ca`: SSL CA
* `ssl_cert`: SSL CERT
* `ssl_key`: SSL key
* `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false)
* `data_format`: [About Telegraf data formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md)

View File

@@ -7,6 +7,7 @@ import (
"io/ioutil"
"log"
"net/http"
"regexp"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
@@ -14,19 +15,22 @@ import (
"github.com/influxdata/telegraf/plugins/serializers/graphite"
)
// Librato structure for configuration and client
type Librato struct {
ApiUser string
ApiToken string
Debug bool
NameFromTags bool
SourceTag string
Timeout internal.Duration
Template string
APIUser string
APIToken string
Debug bool
SourceTag string // Deprecated, keeping for backward-compatibility
Timeout internal.Duration
Template string
apiUrl string
APIUrl string
client *http.Client
}
// https://www.librato.com/docs/kb/faq/best_practices/naming_convention_metrics_sources.html#naming-limitations-for-sources-and-metrics
var reUnacceptedChar = regexp.MustCompile("[^.a-zA-Z0-9_-]")
var sampleConfig = `
## Librator API Docs
## http://dev.librato.com/v1/metrics-authentication
@@ -36,20 +40,21 @@ var sampleConfig = `
api_token = "my-secret-token" # required.
## Debug
# debug = false
## Tag Field to populate source attribute (optional)
## This is typically the _hostname_ from which the metric was obtained.
source_tag = "host"
## Connection timeout.
# timeout = "5s"
## Output Name Template (same as graphite buckets)
## Output source Template (same as graphite buckets)
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
template = "host.tags.measurement.field"
## This template is used in librato's source (not metric's name)
template = "host"
`
// LMetrics is the default struct for Librato's API fromat
type LMetrics struct {
Gauges []*Gauge `json:"gauges"`
}
// Gauge is the gauge format for Librato's API fromat
type Gauge struct {
Name string `json:"name"`
Value float64 `json:"value"`
@@ -57,17 +62,22 @@ type Gauge struct {
MeasureTime int64 `json:"measure_time"`
}
const librato_api = "https://metrics-api.librato.com/v1/metrics"
const libratoAPI = "https://metrics-api.librato.com/v1/metrics"
func NewLibrato(apiUrl string) *Librato {
// NewLibrato is the main constructor for librato output plugins
func NewLibrato(apiURL string) *Librato {
return &Librato{
apiUrl: apiUrl,
APIUrl: apiURL,
Template: "host",
}
}
// Connect is the default output plugin connection function who make sure it
// can connect to the endpoint
func (l *Librato) Connect() error {
if l.ApiUser == "" || l.ApiToken == "" {
return fmt.Errorf("api_user and api_token are required fields for librato output")
if l.APIUser == "" || l.APIToken == "" {
return fmt.Errorf(
"api_user and api_token are required fields for librato output")
}
l.client = &http.Client{
Timeout: l.Timeout.Duration,
@@ -76,18 +86,23 @@ func (l *Librato) Connect() error {
}
func (l *Librato) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}
lmetrics := LMetrics{}
if l.Template == "" {
l.Template = "host"
}
if l.SourceTag != "" {
l.Template = l.SourceTag
}
tempGauges := []*Gauge{}
metricCounter := 0
for _, m := range metrics {
if gauges, err := l.buildGauges(m); err == nil {
for _, gauge := range gauges {
tempGauges = append(tempGauges, gauge)
metricCounter++
if l.Debug {
log.Printf("[DEBUG] Got a gauge: %v\n", gauge)
}
@@ -100,82 +115,115 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
}
}
lmetrics.Gauges = make([]*Gauge, metricCounter)
copy(lmetrics.Gauges, tempGauges[0:])
metricsBytes, err := json.Marshal(lmetrics)
if err != nil {
return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error())
} else {
metricCounter := len(tempGauges)
// make sur we send a batch of maximum 300
sizeBatch := 300
for start := 0; start < metricCounter; start += sizeBatch {
lmetrics := LMetrics{}
end := start + sizeBatch
if end > metricCounter {
end = metricCounter
sizeBatch = end - start
}
lmetrics.Gauges = make([]*Gauge, sizeBatch)
copy(lmetrics.Gauges, tempGauges[start:end])
metricsBytes, err := json.Marshal(lmetrics)
if err != nil {
return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error())
}
if l.Debug {
log.Printf("[DEBUG] Librato request: %v\n", string(metricsBytes))
}
}
req, err := http.NewRequest("POST", l.apiUrl, bytes.NewBuffer(metricsBytes))
if err != nil {
return fmt.Errorf("unable to create http.Request, %s\n", err.Error())
}
req.Header.Add("Content-Type", "application/json")
req.SetBasicAuth(l.ApiUser, l.ApiToken)
resp, err := l.client.Do(req)
if err != nil {
if l.Debug {
log.Printf("[DEBUG] Error POSTing metrics: %v\n", err.Error())
req, err := http.NewRequest(
"POST",
l.APIUrl,
bytes.NewBuffer(metricsBytes))
if err != nil {
return fmt.Errorf(
"unable to create http.Request, %s\n",
err.Error())
}
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
} else {
if l.Debug {
req.Header.Add("Content-Type", "application/json")
req.SetBasicAuth(l.APIUser, l.APIToken)
resp, err := l.client.Do(req)
if err != nil {
if l.Debug {
log.Printf("[DEBUG] Error POSTing metrics: %v\n", err.Error())
}
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
}
defer resp.Body.Close()
if resp.StatusCode != 200 || l.Debug {
htmlData, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("[DEBUG] Couldn't get response! (%v)\n", err)
} else {
}
if resp.StatusCode != 200 {
return fmt.Errorf(
"received bad status code, %d\n %s",
resp.StatusCode,
string(htmlData))
}
if l.Debug {
log.Printf("[DEBUG] Librato response: %v\n", string(htmlData))
}
}
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("received bad status code, %d\n", resp.StatusCode)
}
return nil
}
// SampleConfig is function who return the default configuration for this
// output
func (l *Librato) SampleConfig() string {
return sampleConfig
}
// Description is function who return the Description of this output
func (l *Librato) Description() string {
return "Configuration for Librato API to send metrics to."
}
func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
gauges := []*Gauge{}
serializer := graphite.GraphiteSerializer{Template: l.Template}
bucket := serializer.SerializeBucketName(m.Name(), m.Tags())
if m.Time().Unix() == 0 {
return gauges, fmt.Errorf(
"Measure time must not be zero\n <%s> \n",
m.String())
}
metricSource := graphite.InsertField(
graphite.SerializeBucketName("", m.Tags(), l.Template, ""),
"value")
if metricSource == "" {
return gauges,
fmt.Errorf("undeterminable Source type from Field, %s\n",
l.Template)
}
for fieldName, value := range m.Fields() {
metricName := m.Name()
if fieldName != "value" {
metricName = fmt.Sprintf("%s.%s", m.Name(), fieldName)
}
gauge := &Gauge{
Name: graphite.InsertField(bucket, fieldName),
Source: reUnacceptedChar.ReplaceAllString(metricSource, "-"),
Name: reUnacceptedChar.ReplaceAllString(metricName, "-"),
MeasureTime: m.Time().Unix(),
}
if !gauge.verifyValue(value) {
if !verifyValue(value) {
continue
}
if err := gauge.setValue(value); err != nil {
return gauges, fmt.Errorf("unable to extract value from Fields, %s\n",
return gauges, fmt.Errorf(
"unable to extract value from Fields, %s\n",
err.Error())
}
if l.SourceTag != "" {
if source, ok := m.Tags()[l.SourceTag]; ok {
gauge.Source = source
} else {
return gauges,
fmt.Errorf("undeterminable Source type from Field, %s\n",
l.SourceTag)
}
}
gauges = append(gauges, gauge)
}
if l.Debug {
@@ -184,7 +232,7 @@ func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
return gauges, nil
}
func (g *Gauge) verifyValue(v interface{}) bool {
func verifyValue(v interface{}) bool {
switch v.(type) {
case string:
return false
@@ -210,12 +258,13 @@ func (g *Gauge) setValue(v interface{}) error {
return nil
}
//Close is used to close the connection to librato Output
func (l *Librato) Close() error {
return nil
}
func init() {
outputs.Add("librato", func() telegraf.Output {
return NewLibrato(librato_api)
return NewLibrato(libratoAPI)
})
}

View File

@@ -1,7 +1,6 @@
package librato
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
@@ -10,141 +9,137 @@ import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
var (
fakeUrl = "http://test.librato.com"
fakeURL = "http://test.librato.com"
fakeUser = "telegraf@influxdb.com"
fakeToken = "123456"
)
func fakeLibrato() *Librato {
l := NewLibrato(fakeUrl)
l.ApiUser = fakeUser
l.ApiToken = fakeToken
l := NewLibrato(fakeURL)
l.APIUser = fakeUser
l.APIToken = fakeToken
return l
}
func BuildTags(t *testing.T) {
testMetric := testutil.TestMetric(0.0, "test1")
graphiteSerializer := graphite.GraphiteSerializer{}
tags, err := graphiteSerializer.Serialize(testMetric)
fmt.Printf("Tags: %v", tags)
require.NoError(t, err)
}
func TestUriOverride(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
ts := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()
l := NewLibrato(ts.URL)
l.ApiUser = "telegraf@influxdb.com"
l.ApiToken = "123456"
l.APIUser = "telegraf@influxdb.com"
l.APIToken = "123456"
err := l.Connect()
require.NoError(t, err)
err = l.Write(testutil.MockMetrics())
err = l.Write([]telegraf.Metric{newHostMetric(int32(0), "name", "host")})
require.NoError(t, err)
}
func TestBadStatusCode(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(`{
"errors": {
"system": [
"The API is currently down for maintenance. It'll be back shortly."
]
}
}`)
}))
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
}))
defer ts.Close()
l := NewLibrato(ts.URL)
l.ApiUser = "telegraf@influxdb.com"
l.ApiToken = "123456"
l.APIUser = "telegraf@influxdb.com"
l.APIToken = "123456"
err := l.Connect()
require.NoError(t, err)
err = l.Write(testutil.MockMetrics())
err = l.Write([]telegraf.Metric{newHostMetric(int32(0), "name", "host")})
if err == nil {
t.Errorf("error expected but none returned")
} else {
require.EqualError(t, fmt.Errorf("received bad status code, 503\n"), err.Error())
require.EqualError(
t,
fmt.Errorf("received bad status code, 503\n "), err.Error())
}
}
func TestBuildGauge(t *testing.T) {
mtime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()
var gaugeTests = []struct {
ptIn telegraf.Metric
outGauge *Gauge
err error
}{
{
testutil.TestMetric(0.0, "test1"),
newHostMetric(0.0, "test1", "host1"),
&Gauge{
Name: "value1.test1",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Name: "test1",
MeasureTime: mtime,
Value: 0.0,
Source: "host1",
},
nil,
},
{
testutil.TestMetric(1.0, "test2"),
newHostMetric(1.0, "test2", "host2"),
&Gauge{
Name: "value1.test2",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Name: "test2",
MeasureTime: mtime,
Value: 1.0,
Source: "host2",
},
nil,
},
{
testutil.TestMetric(10, "test3"),
newHostMetric(10, "test3", "host3"),
&Gauge{
Name: "value1.test3",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Name: "test3",
MeasureTime: mtime,
Value: 10.0,
Source: "host3",
},
nil,
},
{
testutil.TestMetric(int32(112345), "test4"),
newHostMetric(int32(112345), "test4", "host4"),
&Gauge{
Name: "value1.test4",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Name: "test4",
MeasureTime: mtime,
Value: 112345.0,
Source: "host4",
},
nil,
},
{
testutil.TestMetric(int64(112345), "test5"),
newHostMetric(int64(112345), "test5", "host5"),
&Gauge{
Name: "value1.test5",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Name: "test5",
MeasureTime: mtime,
Value: 112345.0,
Source: "host5",
},
nil,
},
{
testutil.TestMetric(float32(11234.5), "test6"),
newHostMetric(float32(11234.5), "test6", "host6"),
&Gauge{
Name: "value1.test6",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Name: "test6",
MeasureTime: mtime,
Value: 11234.5,
Source: "host6",
},
nil,
},
{
testutil.TestMetric("11234.5", "test7"),
newHostMetric("11234.5", "test7", "host7"),
nil,
nil,
},
}
l := NewLibrato(fakeUrl)
l := NewLibrato(fakeURL)
for _, gt := range gaugeTests {
gauges, err := l.buildGauges(gt.ptIn)
if err != nil && gt.err == nil {
@@ -167,61 +162,121 @@ func TestBuildGauge(t *testing.T) {
}
}
func newHostMetric(value interface{}, name, host string) (metric telegraf.Metric) {
metric, _ = telegraf.NewMetric(
name,
map[string]string{"host": host},
map[string]interface{}{"value": value},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
)
return
}
func TestBuildGaugeWithSource(t *testing.T) {
mtime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
pt1, _ := telegraf.NewMetric(
"test1",
map[string]string{"hostname": "192.168.0.1", "tag1": "value1"},
map[string]interface{}{"value": 0.0},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
mtime,
)
pt2, _ := telegraf.NewMetric(
"test2",
map[string]string{"hostnam": "192.168.0.1", "tag1": "value1"},
map[string]interface{}{"value": 1.0},
time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC),
mtime,
)
pt3, _ := telegraf.NewMetric(
"test3",
map[string]string{
"hostname": "192.168.0.1",
"tag2": "value2",
"tag1": "value1"},
map[string]interface{}{"value": 1.0},
mtime,
)
pt4, _ := telegraf.NewMetric(
"test4",
map[string]string{
"hostname": "192.168.0.1",
"tag2": "value2",
"tag1": "value1"},
map[string]interface{}{"value": 1.0},
mtime,
)
var gaugeTests = []struct {
ptIn telegraf.Metric
template string
outGauge *Gauge
err error
}{
{
pt1,
"hostname",
&Gauge{
Name: "192_168_0_1.value1.test1",
MeasureTime: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Name: "test1",
MeasureTime: mtime.Unix(),
Value: 0.0,
Source: "192.168.0.1",
Source: "192_168_0_1",
},
nil,
},
{
pt2,
"hostname",
&Gauge{
Name: "192_168_0_1.value1.test1",
MeasureTime: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix(),
Name: "test2",
MeasureTime: mtime.Unix(),
Value: 1.0,
},
fmt.Errorf("undeterminable Source type from Field, hostname"),
},
{
pt3,
"tags",
&Gauge{
Name: "test3",
MeasureTime: mtime.Unix(),
Value: 1.0,
Source: "192_168_0_1.value1.value2",
},
nil,
},
{
pt4,
"hostname.tag2",
&Gauge{
Name: "test4",
MeasureTime: mtime.Unix(),
Value: 1.0,
Source: "192_168_0_1.value2",
},
nil,
},
}
l := NewLibrato(fakeUrl)
l.SourceTag = "hostname"
l := NewLibrato(fakeURL)
for _, gt := range gaugeTests {
l.Template = gt.template
gauges, err := l.buildGauges(gt.ptIn)
if err != nil && gt.err == nil {
t.Errorf("%s: unexpected error, %+v\n", gt.ptIn.Name(), err)
}
if gt.err != nil && err == nil {
t.Errorf("%s: expected an error (%s) but none returned", gt.ptIn.Name(), gt.err.Error())
t.Errorf(
"%s: expected an error (%s) but none returned",
gt.ptIn.Name(),
gt.err.Error())
}
if len(gauges) == 0 {
continue
}
if gt.err == nil && !reflect.DeepEqual(gauges[0], gt.outGauge) {
t.Errorf("%s: \nexpected %+v\ngot %+v\n", gt.ptIn.Name(), gt.outGauge, gauges[0])
t.Errorf(
"%s: \nexpected %+v\ngot %+v\n",
gt.ptIn.Name(),
gt.outGauge, gauges[0])
}
}
}

View File

@@ -3,9 +3,8 @@ package opentsdb
import (
"reflect"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
// "github.com/influxdata/telegraf/testutil"
// "github.com/stretchr/testify/require"
)
func TestBuildTagsTelnet(t *testing.T) {
@@ -42,40 +41,40 @@ func TestBuildTagsTelnet(t *testing.T) {
}
}
func TestWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
// func TestWrite(t *testing.T) {
// if testing.Short() {
// t.Skip("Skipping integration test in short mode")
// }
o := &OpenTSDB{
Host: testutil.GetLocalHost(),
Port: 4242,
Prefix: "prefix.test.",
}
// o := &OpenTSDB{
// Host: testutil.GetLocalHost(),
// Port: 4242,
// Prefix: "prefix.test.",
// }
// Verify that we can connect to the OpenTSDB instance
err := o.Connect()
require.NoError(t, err)
// // Verify that we can connect to the OpenTSDB instance
// err := o.Connect()
// require.NoError(t, err)
// Verify that we can successfully write data to OpenTSDB
err = o.Write(testutil.MockMetrics())
require.NoError(t, err)
// // Verify that we can successfully write data to OpenTSDB
// err = o.Write(testutil.MockMetrics())
// require.NoError(t, err)
// Verify postive and negative test cases of writing data
metrics := testutil.MockMetrics()
metrics = append(metrics, testutil.TestMetric(float64(1.0),
"justametric.float"))
metrics = append(metrics, testutil.TestMetric(int64(123456789),
"justametric.int"))
metrics = append(metrics, testutil.TestMetric(uint64(123456789012345),
"justametric.uint"))
metrics = append(metrics, testutil.TestMetric("Lorem Ipsum",
"justametric.string"))
metrics = append(metrics, testutil.TestMetric(float64(42.0),
"justametric.anotherfloat"))
metrics = append(metrics, testutil.TestMetric(float64(42.0),
"metric w/ specialchars"))
// // Verify postive and negative test cases of writing data
// metrics := testutil.MockMetrics()
// metrics = append(metrics, testutil.TestMetric(float64(1.0),
// "justametric.float"))
// metrics = append(metrics, testutil.TestMetric(int64(123456789),
// "justametric.int"))
// metrics = append(metrics, testutil.TestMetric(uint64(123456789012345),
// "justametric.uint"))
// metrics = append(metrics, testutil.TestMetric("Lorem Ipsum",
// "justametric.string"))
// metrics = append(metrics, testutil.TestMetric(float64(42.0),
// "justametric.anotherfloat"))
// metrics = append(metrics, testutil.TestMetric(float64(42.0),
// "metric w/ specialchars"))
err = o.Write(metrics)
require.NoError(t, err)
}
// err = o.Write(metrics)
// require.NoError(t, err)
// }

View File

@@ -5,28 +5,21 @@ import (
"log"
"net/http"
"regexp"
"strings"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/prometheus/client_golang/prometheus"
)
var (
sanitizedChars = strings.NewReplacer("/", "_", "@", "_", " ", "_", "-", "_", ".", "_")
// Prometheus metric names must match this regex
// see https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
metricName = regexp.MustCompile("^[a-zA-Z_:][a-zA-Z0-9_:]*$")
// Prometheus labels must match this regex
// see https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
labelName = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]*$")
)
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
type PrometheusClient struct {
Listen string
metrics map[string]*prometheus.UntypedVec
Listen string
metrics map[string]prometheus.Metric
sync.Mutex
}
var sampleConfig = `
@@ -35,6 +28,15 @@ var sampleConfig = `
`
func (p *PrometheusClient) Start() error {
prometheus.MustRegister(p)
defer func() {
if r := recover(); r != nil {
// recovering from panic here because there is no way to stop a
// running http go server except by a kill signal. Since the server
// does not stop on SIGHUP, Start() will panic when the process
// is reloaded.
}
}()
if p.Listen == "" {
p.Listen = "localhost:9126"
}
@@ -44,7 +46,6 @@ func (p *PrometheusClient) Start() error {
Addr: p.Listen,
}
p.metrics = make(map[string]*prometheus.UntypedVec)
go server.ListenAndServe()
return nil
}
@@ -72,25 +73,42 @@ func (p *PrometheusClient) Description() string {
return "Configuration for the Prometheus client to spawn"
}
// Implements prometheus.Collector
func (p *PrometheusClient) Describe(ch chan<- *prometheus.Desc) {
prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch)
}
// Implements prometheus.Collector
func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
p.Lock()
defer p.Unlock()
for _, m := range p.metrics {
ch <- m
}
}
func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
p.Lock()
defer p.Unlock()
p.metrics = make(map[string]prometheus.Metric)
if len(metrics) == 0 {
return nil
}
for _, point := range metrics {
key := point.Name()
key = sanitizedChars.Replace(key)
key = invalidNameCharRE.ReplaceAllString(key, "_")
var labels []string
l := prometheus.Labels{}
for k, v := range point.Tags() {
k = sanitizedChars.Replace(k)
k = invalidNameCharRE.ReplaceAllString(k, "_")
if len(k) == 0 {
continue
}
if !labelName.MatchString(k) {
continue
}
labels = append(labels, k)
l[k] = v
}
@@ -105,7 +123,7 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
}
// sanitize the measurement name
n = sanitizedChars.Replace(n)
n = invalidNameCharRE.ReplaceAllString(n, "_")
var mname string
if n == "value" {
mname = key
@@ -113,48 +131,23 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
mname = fmt.Sprintf("%s_%s", key, n)
}
// verify that it is a valid measurement name
if !metricName.MatchString(mname) {
continue
}
// Create a new metric if it hasn't been created yet.
if _, ok := p.metrics[mname]; !ok {
p.metrics[mname] = prometheus.NewUntypedVec(
prometheus.UntypedOpts{
Name: mname,
Help: "Telegraf collected metric",
},
labels,
)
if err := prometheus.Register(p.metrics[mname]); err != nil {
log.Printf("prometheus_client: Metric failed to register with prometheus, %s", err)
continue
}
}
desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l)
var metric prometheus.Metric
var err error
switch val := val.(type) {
case int64:
m, err := p.metrics[mname].GetMetricWith(l)
if err != nil {
log.Printf("ERROR Getting metric in Prometheus output, "+
"key: %s, labels: %v,\nerr: %s\n",
mname, l, err.Error())
continue
}
m.Set(float64(val))
metric, err = prometheus.NewConstMetric(desc, prometheus.UntypedValue, float64(val))
case float64:
m, err := p.metrics[mname].GetMetricWith(l)
if err != nil {
log.Printf("ERROR Getting metric in Prometheus output, "+
"key: %s, labels: %v,\nerr: %s\n",
mname, l, err.Error())
continue
}
m.Set(val)
metric, err = prometheus.NewConstMetric(desc, prometheus.UntypedValue, val)
default:
continue
}
if err != nil {
log.Printf("ERROR creating prometheus metric, "+
"key: %s, labels: %v,\nerr: %s\n",
mname, l, err.Error())
}
p.metrics[desc.String()] = metric
}
}
return nil

View File

@@ -17,6 +17,7 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
now := time.Now()
pTesting = &PrometheusClient{Listen: "localhost:9127"}
err := pTesting.Start()
time.Sleep(time.Millisecond * 200)
@@ -30,11 +31,13 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
pt1, _ := telegraf.NewMetric(
"test_point_1",
tags,
map[string]interface{}{"value": 0.0})
map[string]interface{}{"value": 0.0},
now)
pt2, _ := telegraf.NewMetric(
"test_point_2",
tags,
map[string]interface{}{"value": 1.0})
map[string]interface{}{"value": 1.0},
now)
var metrics = []telegraf.Metric{
pt1,
pt2,
@@ -63,11 +66,13 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
pt3, _ := telegraf.NewMetric(
"test_point_3",
tags,
map[string]interface{}{"value": 0.0})
map[string]interface{}{"value": 0.0},
now)
pt4, _ := telegraf.NewMetric(
"test_point_4",
tags,
map[string]interface{}{"value": 1.0})
map[string]interface{}{"value": 1.0},
now)
metrics = []telegraf.Metric{
pt3,
pt4,