From 21add2c79995eb2297f020abb2d59872c7c3047e Mon Sep 17 00:00:00 2001 From: Joel Meador Date: Tue, 21 Jun 2016 16:28:31 -0400 Subject: [PATCH] instrumental plugin, rewrite connection retries closes #1412 separate hello and authenticate functions, force connection close at end of write cycle so we don't hold open idle connections, which has the benefit of mostly removing the chance of getting hopelessly connection lost bump instrumental agent version fix test to deal with better better connect/reconnect logic and changed ident & auth handshake Update CHANGELOG.md correct URL from instrumental fork to origin and put the change in the correct part of the file go fmt undo split hello and auth commands, to reduce roundtrips --- CHANGELOG.md | 1 + plugins/outputs/instrumental/instrumental.go | 14 +++++++++++--- plugins/outputs/instrumental/instrumental_test.go | 10 ++-------- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b1daa60ac..da4cbf5cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ should now look like: - [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix. - [#1334](https://github.com/influxdata/telegraf/issues/1334): Prometheus output, metric refresh and caching fixes. - [#1432](https://github.com/influxdata/telegraf/issues/1432): Panic fix for multiple graphite outputs under very high load. +- [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior ## v1.0 beta 2 [2016-06-21] diff --git a/plugins/outputs/instrumental/instrumental.go b/plugins/outputs/instrumental/instrumental.go index 461ba9d9e..2fcc28cc0 100644 --- a/plugins/outputs/instrumental/instrumental.go +++ b/plugins/outputs/instrumental/instrumental.go @@ -28,8 +28,10 @@ type Instrumental struct { } const ( - DefaultHost = "collector.instrumentalapp.com" - AuthFormat = "hello version go/telegraf/1.0\nauthenticate %s\n" + DefaultHost = "collector.instrumentalapp.com" + HelloMessage = "hello version go/telegraf/1.1\n" + AuthFormat = "authenticate %s\n" + HandshakeFormat = HelloMessage + AuthFormat ) var ( @@ -52,6 +54,7 @@ var sampleConfig = ` func (i *Instrumental) Connect() error { connection, err := net.DialTimeout("tcp", i.Host+":8000", i.Timeout.Duration) + if err != nil { i.conn = nil return err @@ -151,6 +154,11 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error { return err } + // force the connection closed after sending data + // to deal with various disconnection scenarios and eschew holding + // open idle connections en masse + i.Close() + return nil } @@ -163,7 +171,7 @@ func (i *Instrumental) SampleConfig() string { } func (i *Instrumental) authenticate(conn net.Conn) error { - _, err := fmt.Fprintf(conn, AuthFormat, i.ApiToken) + _, err := fmt.Fprintf(conn, HandshakeFormat, i.ApiToken) if err != nil { return err } diff --git a/plugins/outputs/instrumental/instrumental_test.go b/plugins/outputs/instrumental/instrumental_test.go index ceb53bac6..9708a2590 100644 --- a/plugins/outputs/instrumental/instrumental_test.go +++ b/plugins/outputs/instrumental/instrumental_test.go @@ -24,7 +24,6 @@ func TestWrite(t *testing.T) { ApiToken: "abc123token", Prefix: "my.prefix", } - i.Connect() // Default to gauge m1, _ := telegraf.NewMetric( @@ -40,10 +39,8 @@ func TestWrite(t *testing.T) { time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), ) - // Simulate a connection close and reconnect. metrics := []telegraf.Metric{m1, m2} i.Write(metrics) - i.Close() // Counter and Histogram are increments m3, _ := telegraf.NewMetric( @@ -70,7 +67,6 @@ func TestWrite(t *testing.T) { i.Write(metrics) wg.Wait() - i.Close() } func TCPServer(t *testing.T, wg *sync.WaitGroup) { @@ -82,10 +78,9 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) { tp := textproto.NewReader(reader) hello, _ := tp.ReadLine() - assert.Equal(t, "hello version go/telegraf/1.0", hello) + assert.Equal(t, "hello version go/telegraf/1.1", hello) auth, _ := tp.ReadLine() assert.Equal(t, "authenticate abc123token", auth) - conn.Write([]byte("ok\nok\n")) data1, _ := tp.ReadLine() @@ -99,10 +94,9 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) { tp = textproto.NewReader(reader) hello, _ = tp.ReadLine() - assert.Equal(t, "hello version go/telegraf/1.0", hello) + assert.Equal(t, "hello version go/telegraf/1.1", hello) auth, _ = tp.ReadLine() assert.Equal(t, "authenticate abc123token", auth) - conn.Write([]byte("ok\nok\n")) data3, _ := tp.ReadLine()