parent
							
								
									be7ca56872
								
							
						
					
					
						commit
						f32916a5bd
					
				|  | @ -3,6 +3,7 @@ | ||||||
| ### Features | ### Features | ||||||
| 
 | 
 | ||||||
| - [#1138](https://github.com/influxdata/telegraf/pull/1138): nstat input plugin. Thanks @Maksadbek! | - [#1138](https://github.com/influxdata/telegraf/pull/1138): nstat input plugin. Thanks @Maksadbek! | ||||||
|  | - [#1139](https://github.com/influxdata/telegraf/pull/1139): instrumental output plugin. Thanks @jasonroelofs! | ||||||
| 
 | 
 | ||||||
| ### Bugfixes | ### Bugfixes | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -245,6 +245,7 @@ want to add support for another service or third-party API. | ||||||
| * [datadog](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/datadog) | * [datadog](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/datadog) | ||||||
| * [file](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/file) | * [file](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/file) | ||||||
| * [graphite](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/graphite) | * [graphite](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/graphite) | ||||||
|  | * [instrumental](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/instrumental) | ||||||
| * [kafka](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/kafka) | * [kafka](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/kafka) | ||||||
| * [librato](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/librato) | * [librato](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/librato) | ||||||
| * [mqtt](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/mqtt) | * [mqtt](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/mqtt) | ||||||
|  |  | ||||||
|  | @ -8,6 +8,7 @@ import ( | ||||||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/file" | 	_ "github.com/influxdata/telegraf/plugins/outputs/file" | ||||||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/graphite" | 	_ "github.com/influxdata/telegraf/plugins/outputs/graphite" | ||||||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/influxdb" | 	_ "github.com/influxdata/telegraf/plugins/outputs/influxdb" | ||||||
|  | 	_ "github.com/influxdata/telegraf/plugins/outputs/instrumental" | ||||||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/kafka" | 	_ "github.com/influxdata/telegraf/plugins/outputs/kafka" | ||||||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/kinesis" | 	_ "github.com/influxdata/telegraf/plugins/outputs/kinesis" | ||||||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/librato" | 	_ "github.com/influxdata/telegraf/plugins/outputs/librato" | ||||||
|  |  | ||||||
|  | @ -0,0 +1,25 @@ | ||||||
|  | # Instrumental Output Plugin | ||||||
|  | 
 | ||||||
|  | This plugin writes to the [Instrumental Collector API](https://instrumentalapp.com/docs/tcp-collector) | ||||||
|  | and requires a Project-specific API token. | ||||||
|  | 
 | ||||||
|  | Instrumental accepts stats in a format very close to Graphite, with the only difference being that | ||||||
|  | the type of stat (gauge, increment) is the first token, separated from the metric itself | ||||||
|  | by whitespace. The `increment` type is only used if the metric comes in as a counter through `[[input.statsd]]`. | ||||||
|  | 
 | ||||||
|  | ## Configuration: | ||||||
|  | 
 | ||||||
|  | ```toml | ||||||
|  | [[outputs.instrumental]] | ||||||
|  |   ## Project API Token (required) | ||||||
|  |   api_token = "API Token"  # required | ||||||
|  |   ## Prefix the metrics with a given name | ||||||
|  |   prefix = "" | ||||||
|  |   ## Stats output template (Graphite formatting) | ||||||
|  |   ## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite | ||||||
|  |   template = "host.tags.measurement.field" | ||||||
|  |   ## Timeout in seconds to connect | ||||||
|  |   timeout = "2s" | ||||||
|  |   ## Debug true - Print communcation to Instrumental | ||||||
|  |   debug = false | ||||||
|  | ``` | ||||||
|  | @ -0,0 +1,192 @@ | ||||||
|  | package instrumental | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"io" | ||||||
|  | 	"log" | ||||||
|  | 	"net" | ||||||
|  | 	"regexp" | ||||||
|  | 	"strings" | ||||||
|  | 
 | ||||||
|  | 	"github.com/influxdata/telegraf" | ||||||
|  | 	"github.com/influxdata/telegraf/internal" | ||||||
|  | 	"github.com/influxdata/telegraf/plugins/outputs" | ||||||
|  | 	"github.com/influxdata/telegraf/plugins/serializers" | ||||||
|  | 	"github.com/influxdata/telegraf/plugins/serializers/graphite" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type Instrumental struct { | ||||||
|  | 	Host       string | ||||||
|  | 	ApiToken   string | ||||||
|  | 	Prefix     string | ||||||
|  | 	DataFormat string | ||||||
|  | 	Template   string | ||||||
|  | 	Timeout    internal.Duration | ||||||
|  | 	Debug      bool | ||||||
|  | 
 | ||||||
|  | 	conn net.Conn | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	DefaultHost = "collector.instrumentalapp.com" | ||||||
|  | 	AuthFormat  = "hello version go/telegraf/1.0\nauthenticate %s\n" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | var ( | ||||||
|  | 	StatIncludesBadChar = regexp.MustCompile("[^[:alnum:][:blank:]-_.]") | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | var sampleConfig = ` | ||||||
|  |   ## Project API Token (required) | ||||||
|  |   api_token = "API Token" # required | ||||||
|  |   ## Prefix the metrics with a given name | ||||||
|  |   prefix = "" | ||||||
|  |   ## Stats output template (Graphite formatting) | ||||||
|  |   ## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
 | ||||||
|  |   template = "host.tags.measurement.field" | ||||||
|  |   ## Timeout in seconds to connect | ||||||
|  |   timeout = "2s" | ||||||
|  |   ## Display Communcation to Instrumental | ||||||
|  |   debug = false | ||||||
|  | ` | ||||||
|  | 
 | ||||||
|  | func (i *Instrumental) Connect() error { | ||||||
|  | 	connection, err := net.DialTimeout("tcp", i.Host+":8000", i.Timeout.Duration) | ||||||
|  | 	if err != nil { | ||||||
|  | 		i.conn = nil | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	err = i.authenticate(connection) | ||||||
|  | 	if err != nil { | ||||||
|  | 		i.conn = nil | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (i *Instrumental) Close() error { | ||||||
|  | 	i.conn.Close() | ||||||
|  | 	i.conn = nil | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (i *Instrumental) Write(metrics []telegraf.Metric) error { | ||||||
|  | 	if i.conn == nil { | ||||||
|  | 		err := i.Connect() | ||||||
|  | 		if err != nil { | ||||||
|  | 			return fmt.Errorf("FAILED to (re)connect to Instrumental. Error: %s\n", err) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	var points []string | ||||||
|  | 	var metricType string | ||||||
|  | 	var toSerialize telegraf.Metric | ||||||
|  | 	var newTags map[string]string | ||||||
|  | 
 | ||||||
|  | 	for _, metric := range metrics { | ||||||
|  | 		// Pull the metric_type out of the metric's tags. We don't want the type
 | ||||||
|  | 		// to show up with the other tags pulled from the system, as they go in the
 | ||||||
|  | 		// beginning of the line instead.
 | ||||||
|  | 		// e.g we want:
 | ||||||
|  | 		//
 | ||||||
|  | 		//  increment some_prefix.host.tag1.tag2.tag3.field value timestamp
 | ||||||
|  | 		//
 | ||||||
|  | 		// vs
 | ||||||
|  | 		//
 | ||||||
|  | 		//  increment some_prefix.host.tag1.tag2.tag3.counter.field value timestamp
 | ||||||
|  | 		//
 | ||||||
|  | 		newTags = metric.Tags() | ||||||
|  | 		metricType = newTags["metric_type"] | ||||||
|  | 		delete(newTags, "metric_type") | ||||||
|  | 
 | ||||||
|  | 		toSerialize, _ = telegraf.NewMetric( | ||||||
|  | 			metric.Name(), | ||||||
|  | 			newTags, | ||||||
|  | 			metric.Fields(), | ||||||
|  | 			metric.Time(), | ||||||
|  | 		) | ||||||
|  | 
 | ||||||
|  | 		stats, err := s.Serialize(toSerialize) | ||||||
|  | 		if err != nil { | ||||||
|  | 			log.Printf("Error serializing a metric to Instrumental: %s", err) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		switch metricType { | ||||||
|  | 		case "counter": | ||||||
|  | 			fallthrough | ||||||
|  | 		case "histogram": | ||||||
|  | 			metricType = "increment" | ||||||
|  | 		default: | ||||||
|  | 			metricType = "gauge" | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		for _, stat := range stats { | ||||||
|  | 			if !StatIncludesBadChar.MatchString(stat) { | ||||||
|  | 				points = append(points, fmt.Sprintf("%s %s", metricType, stat)) | ||||||
|  | 			} else if i.Debug { | ||||||
|  | 				log.Printf("Unable to send bad stat: %s", stat) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	allPoints := strings.Join(points, "\n") + "\n" | ||||||
|  | 	_, err = fmt.Fprintf(i.conn, allPoints) | ||||||
|  | 
 | ||||||
|  | 	if i.Debug { | ||||||
|  | 		log.Println(allPoints) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if err != nil { | ||||||
|  | 		if err == io.EOF { | ||||||
|  | 			i.Close() | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (i *Instrumental) Description() string { | ||||||
|  | 	return "Configuration for sending metrics to an Instrumental project" | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (i *Instrumental) SampleConfig() string { | ||||||
|  | 	return sampleConfig | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (i *Instrumental) authenticate(conn net.Conn) error { | ||||||
|  | 	_, err := fmt.Fprintf(conn, AuthFormat, i.ApiToken) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// The response here will either be two "ok"s or an error message.
 | ||||||
|  | 	responses := make([]byte, 512) | ||||||
|  | 	if _, err = conn.Read(responses); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if string(responses)[:6] != "ok\nok\n" { | ||||||
|  | 		return fmt.Errorf("Authentication failed: %s", responses) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	i.conn = conn | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func init() { | ||||||
|  | 	outputs.Add("instrumental", func() telegraf.Output { | ||||||
|  | 		return &Instrumental{ | ||||||
|  | 			Host:     DefaultHost, | ||||||
|  | 			Template: graphite.DEFAULT_TEMPLATE, | ||||||
|  | 		} | ||||||
|  | 	}) | ||||||
|  | } | ||||||
|  | @ -0,0 +1,114 @@ | ||||||
|  | package instrumental | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"bufio" | ||||||
|  | 	"net" | ||||||
|  | 	"net/textproto" | ||||||
|  | 	"sync" | ||||||
|  | 	"testing" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"github.com/influxdata/telegraf" | ||||||
|  | 	"github.com/stretchr/testify/assert" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func TestWrite(t *testing.T) { | ||||||
|  | 	var wg sync.WaitGroup | ||||||
|  | 	wg.Add(1) | ||||||
|  | 	go TCPServer(t, &wg) | ||||||
|  | 	// Give the fake TCP server some time to start:
 | ||||||
|  | 	time.Sleep(time.Millisecond * 100) | ||||||
|  | 
 | ||||||
|  | 	i := Instrumental{ | ||||||
|  | 		Host:     "127.0.0.1", | ||||||
|  | 		ApiToken: "abc123token", | ||||||
|  | 		Prefix:   "my.prefix", | ||||||
|  | 	} | ||||||
|  | 	i.Connect() | ||||||
|  | 
 | ||||||
|  | 	// Default to gauge
 | ||||||
|  | 	m1, _ := telegraf.NewMetric( | ||||||
|  | 		"mymeasurement", | ||||||
|  | 		map[string]string{"host": "192.168.0.1"}, | ||||||
|  | 		map[string]interface{}{"myfield": float64(3.14)}, | ||||||
|  | 		time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), | ||||||
|  | 	) | ||||||
|  | 	m2, _ := telegraf.NewMetric( | ||||||
|  | 		"mymeasurement", | ||||||
|  | 		map[string]string{"host": "192.168.0.1", "metric_type": "set"}, | ||||||
|  | 		map[string]interface{}{"value": float64(3.14)}, | ||||||
|  | 		time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	// Simulate a connection close and reconnect.
 | ||||||
|  | 	metrics := []telegraf.Metric{m1, m2} | ||||||
|  | 	i.Write(metrics) | ||||||
|  | 	i.Close() | ||||||
|  | 
 | ||||||
|  | 	// Counter and Histogram are increments
 | ||||||
|  | 	m3, _ := telegraf.NewMetric( | ||||||
|  | 		"my_histogram", | ||||||
|  | 		map[string]string{"host": "192.168.0.1", "metric_type": "histogram"}, | ||||||
|  | 		map[string]interface{}{"value": float64(3.14)}, | ||||||
|  | 		time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), | ||||||
|  | 	) | ||||||
|  | 	// We will drop metrics that simply won't be accepted by Instrumental
 | ||||||
|  | 	m4, _ := telegraf.NewMetric( | ||||||
|  | 		"bad_values", | ||||||
|  | 		map[string]string{"host": "192.168.0.1", "metric_type": "counter"}, | ||||||
|  | 		map[string]interface{}{"value": "\" 3:30\""}, | ||||||
|  | 		time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), | ||||||
|  | 	) | ||||||
|  | 	m5, _ := telegraf.NewMetric( | ||||||
|  | 		"my_counter", | ||||||
|  | 		map[string]string{"host": "192.168.0.1", "metric_type": "counter"}, | ||||||
|  | 		map[string]interface{}{"value": float64(3.14)}, | ||||||
|  | 		time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	metrics = []telegraf.Metric{m3, m4, m5} | ||||||
|  | 	i.Write(metrics) | ||||||
|  | 
 | ||||||
|  | 	wg.Wait() | ||||||
|  | 	i.Close() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TCPServer(t *testing.T, wg *sync.WaitGroup) { | ||||||
|  | 	tcpServer, _ := net.Listen("tcp", "127.0.0.1:8000") | ||||||
|  | 	defer wg.Done() | ||||||
|  | 	conn, _ := tcpServer.Accept() | ||||||
|  | 	conn.SetDeadline(time.Now().Add(1 * time.Second)) | ||||||
|  | 	reader := bufio.NewReader(conn) | ||||||
|  | 	tp := textproto.NewReader(reader) | ||||||
|  | 
 | ||||||
|  | 	hello, _ := tp.ReadLine() | ||||||
|  | 	assert.Equal(t, "hello version go/telegraf/1.0", hello) | ||||||
|  | 	auth, _ := tp.ReadLine() | ||||||
|  | 	assert.Equal(t, "authenticate abc123token", auth) | ||||||
|  | 
 | ||||||
|  | 	conn.Write([]byte("ok\nok\n")) | ||||||
|  | 
 | ||||||
|  | 	data1, _ := tp.ReadLine() | ||||||
|  | 	assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1) | ||||||
|  | 	data2, _ := tp.ReadLine() | ||||||
|  | 	assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2) | ||||||
|  | 
 | ||||||
|  | 	conn, _ = tcpServer.Accept() | ||||||
|  | 	conn.SetDeadline(time.Now().Add(1 * time.Second)) | ||||||
|  | 	reader = bufio.NewReader(conn) | ||||||
|  | 	tp = textproto.NewReader(reader) | ||||||
|  | 
 | ||||||
|  | 	hello, _ = tp.ReadLine() | ||||||
|  | 	assert.Equal(t, "hello version go/telegraf/1.0", hello) | ||||||
|  | 	auth, _ = tp.ReadLine() | ||||||
|  | 	assert.Equal(t, "authenticate abc123token", auth) | ||||||
|  | 
 | ||||||
|  | 	conn.Write([]byte("ok\nok\n")) | ||||||
|  | 
 | ||||||
|  | 	data3, _ := tp.ReadLine() | ||||||
|  | 	assert.Equal(t, "increment my.prefix.192_168_0_1.my_histogram 3.14 1289430000", data3) | ||||||
|  | 	data4, _ := tp.ReadLine() | ||||||
|  | 	assert.Equal(t, "increment my.prefix.192_168_0_1.my_counter 3.14 1289430000", data4) | ||||||
|  | 
 | ||||||
|  | 	conn.Close() | ||||||
|  | } | ||||||
		Loading…
	
		Reference in New Issue