From 5f8d908f74f65b06f3338fa29dcbae8efb1b0435 Mon Sep 17 00:00:00 2001 From: Piotr Popieluch Date: Thu, 18 Jan 2018 00:27:24 +0100 Subject: [PATCH] Reconnect before sending graphite metrics if disconnected (#3680) --- plugins/outputs/graphite/graphite.go | 24 +++++++++++++++++------ plugins/outputs/graphite/graphite_test.go | 6 ++---- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/plugins/outputs/graphite/graphite.go b/plugins/outputs/graphite/graphite.go index 53c4bdc1b..7bad4be07 100644 --- a/plugins/outputs/graphite/graphite.go +++ b/plugins/outputs/graphite/graphite.go @@ -155,8 +155,22 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error { batch = append(batch, buf...) } + err = g.send(batch) + + // try to reconnect and retry to send + if err != nil { + log.Println("E! Graphite: Reconnecting and retrying: ") + g.Connect() + err = g.send(batch) + } + + return err +} + +func (g *Graphite) send(batch []byte) error { // This will get set to nil if a successful write occurs - err = errors.New("Could not write to any Graphite server in cluster\n") + err := errors.New("Could not write to any Graphite server in cluster\n") + // Send data to a random server p := rand.Perm(len(g.conns)) for _, n := range p { @@ -167,6 +181,8 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error { if _, e := g.conns[n].Write(batch); e != nil { // Error log.Println("E! Graphite Error: " + e.Error()) + // Close explicitely + g.conns[n].Close() // Let's try the next one } else { // Success @@ -174,11 +190,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error { break } } - // try to reconnect - if err != nil { - log.Println("E! Reconnecting: ") - g.Connect() - } + return err } diff --git a/plugins/outputs/graphite/graphite_test.go b/plugins/outputs/graphite/graphite_test.go index 3984728af..485829fec 100644 --- a/plugins/outputs/graphite/graphite_test.go +++ b/plugins/outputs/graphite/graphite_test.go @@ -81,7 +81,7 @@ func TestGraphiteOK(t *testing.T) { err2 := g.Write(metrics) require.NoError(t, err2) - // Waiting TCPserver + // Waiting TCPserver, should reconnect and resend wg.Wait() t.Log("Finished Waiting for first data") var wg2 sync.WaitGroup @@ -89,10 +89,8 @@ func TestGraphiteOK(t *testing.T) { wg2.Add(1) TCPServer2(t, &wg2) //Write but expect an error, but reconnect - g.Write(metrics2) err3 := g.Write(metrics2) - t.Log("Finished writing second data, it should have failed") - //Actually write the new metrics + t.Log("Finished writing second data, it should have reconnected automatically") require.NoError(t, err3) t.Log("Finished writing third data")