instrumental plugin, rewrite connection retries

closes #1412

separate hello and authenticate functions,
force connection close at end of write cycle so we don't
hold open idle connections,
which has the benefit of mostly removing
the chance of getting hopelessly connection lost

bump instrumental agent version

fix test to deal with better better connect/reconnect logic and changed ident & auth handshake

Update CHANGELOG.md

correct URL from instrumental fork to origin and put the change in the correct part of the file

go fmt

undo split hello and auth commands, to reduce roundtrips
This commit is contained in:
Joel Meador 2016-06-21 16:28:31 -04:00 committed by Cameron Sparr
parent 4651ab88ad
commit 21add2c799
3 changed files with 14 additions and 11 deletions

View File

@ -48,6 +48,7 @@ should now look like:
- [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix. - [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix.
- [#1334](https://github.com/influxdata/telegraf/issues/1334): Prometheus output, metric refresh and caching fixes. - [#1334](https://github.com/influxdata/telegraf/issues/1334): Prometheus output, metric refresh and caching fixes.
- [#1432](https://github.com/influxdata/telegraf/issues/1432): Panic fix for multiple graphite outputs under very high load. - [#1432](https://github.com/influxdata/telegraf/issues/1432): Panic fix for multiple graphite outputs under very high load.
- [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior
## v1.0 beta 2 [2016-06-21] ## v1.0 beta 2 [2016-06-21]

View File

@ -29,7 +29,9 @@ type Instrumental struct {
const ( const (
DefaultHost = "collector.instrumentalapp.com" DefaultHost = "collector.instrumentalapp.com"
AuthFormat = "hello version go/telegraf/1.0\nauthenticate %s\n" HelloMessage = "hello version go/telegraf/1.1\n"
AuthFormat = "authenticate %s\n"
HandshakeFormat = HelloMessage + AuthFormat
) )
var ( var (
@ -52,6 +54,7 @@ var sampleConfig = `
func (i *Instrumental) Connect() error { func (i *Instrumental) Connect() error {
connection, err := net.DialTimeout("tcp", i.Host+":8000", i.Timeout.Duration) connection, err := net.DialTimeout("tcp", i.Host+":8000", i.Timeout.Duration)
if err != nil { if err != nil {
i.conn = nil i.conn = nil
return err return err
@ -151,6 +154,11 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
return err return err
} }
// force the connection closed after sending data
// to deal with various disconnection scenarios and eschew holding
// open idle connections en masse
i.Close()
return nil return nil
} }
@ -163,7 +171,7 @@ func (i *Instrumental) SampleConfig() string {
} }
func (i *Instrumental) authenticate(conn net.Conn) error { func (i *Instrumental) authenticate(conn net.Conn) error {
_, err := fmt.Fprintf(conn, AuthFormat, i.ApiToken) _, err := fmt.Fprintf(conn, HandshakeFormat, i.ApiToken)
if err != nil { if err != nil {
return err return err
} }

View File

@ -24,7 +24,6 @@ func TestWrite(t *testing.T) {
ApiToken: "abc123token", ApiToken: "abc123token",
Prefix: "my.prefix", Prefix: "my.prefix",
} }
i.Connect()
// Default to gauge // Default to gauge
m1, _ := telegraf.NewMetric( m1, _ := telegraf.NewMetric(
@ -40,10 +39,8 @@ func TestWrite(t *testing.T) {
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
) )
// Simulate a connection close and reconnect.
metrics := []telegraf.Metric{m1, m2} metrics := []telegraf.Metric{m1, m2}
i.Write(metrics) i.Write(metrics)
i.Close()
// Counter and Histogram are increments // Counter and Histogram are increments
m3, _ := telegraf.NewMetric( m3, _ := telegraf.NewMetric(
@ -70,7 +67,6 @@ func TestWrite(t *testing.T) {
i.Write(metrics) i.Write(metrics)
wg.Wait() wg.Wait()
i.Close()
} }
func TCPServer(t *testing.T, wg *sync.WaitGroup) { func TCPServer(t *testing.T, wg *sync.WaitGroup) {
@ -82,10 +78,9 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
tp := textproto.NewReader(reader) tp := textproto.NewReader(reader)
hello, _ := tp.ReadLine() hello, _ := tp.ReadLine()
assert.Equal(t, "hello version go/telegraf/1.0", hello) assert.Equal(t, "hello version go/telegraf/1.1", hello)
auth, _ := tp.ReadLine() auth, _ := tp.ReadLine()
assert.Equal(t, "authenticate abc123token", auth) assert.Equal(t, "authenticate abc123token", auth)
conn.Write([]byte("ok\nok\n")) conn.Write([]byte("ok\nok\n"))
data1, _ := tp.ReadLine() data1, _ := tp.ReadLine()
@ -99,10 +94,9 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
tp = textproto.NewReader(reader) tp = textproto.NewReader(reader)
hello, _ = tp.ReadLine() hello, _ = tp.ReadLine()
assert.Equal(t, "hello version go/telegraf/1.0", hello) assert.Equal(t, "hello version go/telegraf/1.1", hello)
auth, _ = tp.ReadLine() auth, _ = tp.ReadLine()
assert.Equal(t, "authenticate abc123token", auth) assert.Equal(t, "authenticate abc123token", auth)
conn.Write([]byte("ok\nok\n")) conn.Write([]byte("ok\nok\n"))
data3, _ := tp.ReadLine() data3, _ := tp.ReadLine()