Reconnect before sending graphite metrics if disconnected (#3680)
This commit is contained in:
parent
6e253a67a7
commit
5f8d908f74
|
@ -155,8 +155,22 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
|
||||||
batch = append(batch, buf...)
|
batch = append(batch, buf...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = g.send(batch)
|
||||||
|
|
||||||
|
// try to reconnect and retry to send
|
||||||
|
if err != nil {
|
||||||
|
log.Println("E! Graphite: Reconnecting and retrying: ")
|
||||||
|
g.Connect()
|
||||||
|
err = g.send(batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *Graphite) send(batch []byte) error {
|
||||||
// This will get set to nil if a successful write occurs
|
// This will get set to nil if a successful write occurs
|
||||||
err = errors.New("Could not write to any Graphite server in cluster\n")
|
err := errors.New("Could not write to any Graphite server in cluster\n")
|
||||||
|
|
||||||
// Send data to a random server
|
// Send data to a random server
|
||||||
p := rand.Perm(len(g.conns))
|
p := rand.Perm(len(g.conns))
|
||||||
for _, n := range p {
|
for _, n := range p {
|
||||||
|
@ -167,6 +181,8 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
|
||||||
if _, e := g.conns[n].Write(batch); e != nil {
|
if _, e := g.conns[n].Write(batch); e != nil {
|
||||||
// Error
|
// Error
|
||||||
log.Println("E! Graphite Error: " + e.Error())
|
log.Println("E! Graphite Error: " + e.Error())
|
||||||
|
// Close explicitely
|
||||||
|
g.conns[n].Close()
|
||||||
// Let's try the next one
|
// Let's try the next one
|
||||||
} else {
|
} else {
|
||||||
// Success
|
// Success
|
||||||
|
@ -174,11 +190,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// try to reconnect
|
|
||||||
if err != nil {
|
|
||||||
log.Println("E! Reconnecting: ")
|
|
||||||
g.Connect()
|
|
||||||
}
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -81,7 +81,7 @@ func TestGraphiteOK(t *testing.T) {
|
||||||
err2 := g.Write(metrics)
|
err2 := g.Write(metrics)
|
||||||
require.NoError(t, err2)
|
require.NoError(t, err2)
|
||||||
|
|
||||||
// Waiting TCPserver
|
// Waiting TCPserver, should reconnect and resend
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
t.Log("Finished Waiting for first data")
|
t.Log("Finished Waiting for first data")
|
||||||
var wg2 sync.WaitGroup
|
var wg2 sync.WaitGroup
|
||||||
|
@ -89,10 +89,8 @@ func TestGraphiteOK(t *testing.T) {
|
||||||
wg2.Add(1)
|
wg2.Add(1)
|
||||||
TCPServer2(t, &wg2)
|
TCPServer2(t, &wg2)
|
||||||
//Write but expect an error, but reconnect
|
//Write but expect an error, but reconnect
|
||||||
g.Write(metrics2)
|
|
||||||
err3 := g.Write(metrics2)
|
err3 := g.Write(metrics2)
|
||||||
t.Log("Finished writing second data, it should have failed")
|
t.Log("Finished writing second data, it should have reconnected automatically")
|
||||||
//Actually write the new metrics
|
|
||||||
|
|
||||||
require.NoError(t, err3)
|
require.NoError(t, err3)
|
||||||
t.Log("Finished writing third data")
|
t.Log("Finished writing third data")
|
||||||
|
|
Loading…
Reference in New Issue