Fix problem with graphite talking to closed connections (#2171)

We were having problems with telegraf talking to
carbon-relay-ng using the graphite output. When
the carbon-relay-ng server restarted the connection
the telegraf side would go into CLOSE_WAIT but telegraf
would continue to send statistics through the connection.

Reading around it seems you need to a read from the connection
and see a EOF error. We've implemented this and added a test
that replicates roughly the error we were having.

Pair: @whpearson @joshmyers
This commit is contained in:
Will Pearson 2017-01-24 20:50:29 +00:00 committed by Cameron Sparr
parent 29933d0835
commit e1faf06974
2 changed files with 65 additions and 7 deletions

View File

@ -2,6 +2,7 @@ package graphite
import ( import (
"errors" "errors"
"io"
"log" "log"
"math/rand" "math/rand"
"net" "net"
@ -71,6 +72,31 @@ func (g *Graphite) Description() string {
return "Configuration for Graphite server to send metrics to" return "Configuration for Graphite server to send metrics to"
} }
// We need check eof as we can write to nothing without noticing anything is wrong
// the connection stays in a close_wait
// We can detect that by finding an eof
// if not for this, we can happily write and flush without getting errors (in Go) but getting RST tcp packets back (!)
// props to Tv via the authors of carbon-relay-ng` for this trick.
func checkEOF(conn net.Conn) {
b := make([]byte, 1024)
conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
num, err := conn.Read(b)
if err == io.EOF {
log.Printf("E! Conn %s is closed. closing conn explicitly", conn)
conn.Close()
return
}
// just in case i misunderstand something or the remote behaves badly
if num != 0 {
log.Printf("I! conn %s .conn.Read data? did not expect that. data: %s\n", conn, b[:num])
}
// Log non-timeout errors or close.
if e, ok := err.(net.Error); !(ok && e.Timeout()) {
log.Printf("E! conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s\n", conn, err)
conn.Close()
}
}
// Choose a random server in the cluster to write to until a successful write // Choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error. // occurs, logging each unsuccessful. If all servers fail, return error.
func (g *Graphite) Write(metrics []telegraf.Metric) error { func (g *Graphite) Write(metrics []telegraf.Metric) error {
@ -91,13 +117,13 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
// This will get set to nil if a successful write occurs // This will get set to nil if a successful write occurs
err = errors.New("Could not write to any Graphite server in cluster\n") err = errors.New("Could not write to any Graphite server in cluster\n")
// Send data to a random server // Send data to a random server
p := rand.Perm(len(g.conns)) p := rand.Perm(len(g.conns))
for _, n := range p { for _, n := range p {
if g.Timeout > 0 { if g.Timeout > 0 {
g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second)) g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second))
} }
checkEOF(g.conns[n])
if _, e := g.conns[n].Write(batch); e != nil { if _, e := g.conns[n].Write(batch); e != nil {
// Error // Error
log.Println("E! Graphite Error: " + e.Error()) log.Println("E! Graphite Error: " + e.Error())
@ -110,6 +136,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
} }
// try to reconnect // try to reconnect
if err != nil { if err != nil {
log.Println("E! Reconnecting: ")
g.Connect() g.Connect()
} }
return err return err

View File

@ -43,7 +43,8 @@ func TestGraphiteOK(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
// Start TCP server // Start TCP server
wg.Add(1) wg.Add(1)
go TCPServer(t, &wg) t.Log("Starting server")
go TCPServer1(t, &wg)
// Give the fake graphite TCP server some time to start: // Give the fake graphite TCP server some time to start:
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
@ -51,6 +52,7 @@ func TestGraphiteOK(t *testing.T) {
g := Graphite{ g := Graphite{
Prefix: "my.prefix", Prefix: "my.prefix",
} }
// Init metrics // Init metrics
m1, _ := metric.New( m1, _ := metric.New(
"mymeasurement", "mymeasurement",
@ -72,29 +74,58 @@ func TestGraphiteOK(t *testing.T) {
) )
// Prepare point list // Prepare point list
metrics := []telegraf.Metric{m1, m2, m3} metrics := []telegraf.Metric{m1}
metrics2 := []telegraf.Metric{m2, m3}
err1 := g.Connect() err1 := g.Connect()
require.NoError(t, err1) require.NoError(t, err1)
// Send Data // Send Data
t.Log("Send first data")
err2 := g.Write(metrics) err2 := g.Write(metrics)
require.NoError(t, err2) require.NoError(t, err2)
// Waiting TCPserver // Waiting TCPserver
wg.Wait() wg.Wait()
t.Log("Finished Waiting for first data")
var wg2 sync.WaitGroup
// Start TCP server
time.Sleep(time.Millisecond * 100)
wg2.Add(1)
go TCPServer2(t, &wg2)
time.Sleep(time.Millisecond * 100)
//Write but expect an error, but reconnect
g.Write(metrics2)
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have failed")
//Actually write the new metrics
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close() g.Close()
} }
func TCPServer(t *testing.T, wg *sync.WaitGroup) { func TCPServer1(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
defer wg.Done() defer wg.Done()
conn, _ := tcpServer.Accept() tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
conn, _ := (tcpServer).Accept()
reader := bufio.NewReader(conn) reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader) tp := textproto.NewReader(reader)
data1, _ := tp.ReadLine() data1, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1) assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1)
conn.Close()
tcpServer.Close()
}
func TCPServer2(t *testing.T, wg *sync.WaitGroup) {
defer wg.Done()
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
conn2, _ := (tcpServer).Accept()
reader := bufio.NewReader(conn2)
tp := textproto.NewReader(reader)
data2, _ := tp.ReadLine() data2, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2) assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2)
data3, _ := tp.ReadLine() data3, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.192_168_0_1.my_measurement 3.14 1289430000", data3) assert.Equal(t, "my.prefix.192_168_0_1.my_measurement 3.14 1289430000", data3)
conn.Close() conn2.Close()
tcpServer.Close()
} }