// checkEOF probes conn for a shutdown initiated by the remote end and closes
// the local side of the connection when one is detected.
//
// A peer that has gone away can leave the connection in CLOSE_WAIT. Writing
// and flushing on such a connection may keep succeeding from Go's point of
// view while only RST packets come back, so the broken link is not noticed by
// writing alone. A short, deadline-bounded Read exposes that state as io.EOF.
// Props to Tv, via the authors of carbon-relay-ng, for this trick.
//
// Outcomes of the probe:
//   - the read times out: nothing is pending and conn is left open;
//   - data arrives without an error: it is logged as unexpected and conn is
//     left open;
//   - io.EOF or any other non-timeout error: it is logged and conn is closed.
func checkEOF(conn net.Conn) {
	// Identify the connection by its peer address; formatting the net.Conn
	// value itself with %s does not produce a readable name.
	peer := conn.RemoteAddr()

	// Bound the probe so that an idle, healthy connection reports a timeout
	// instead of blocking the caller.
	if err := conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond)); err != nil {
		// Without a deadline the Read below could block indefinitely, so the
		// probe is skipped rather than risked.
		log.Printf("E! conn %v checkEOF could not set read deadline, skipping EOF check. error: %s\n", peer, err)
		return
	}

	b := make([]byte, 1024)
	num, err := conn.Read(b)

	// Just in case something is misunderstood or the remote behaves badly:
	// report any bytes received, including bytes delivered together with an
	// error.
	if num != 0 {
		log.Printf("I! conn %v .conn.Read data? did not expect that. data: %s\n", peer, b[:num])
	}

	switch {
	case err == nil:
		// The peer sent data but the connection is alive; keep it open.
		return
	case err == io.EOF:
		log.Printf("E! Conn %v is closed. closing conn explicitly", peer)
		// Best-effort close: the connection is already known to be dead.
		conn.Close()
	default:
		// A timeout is the expected result for a healthy, silent peer.
		if e, ok := err.(net.Error); ok && e.Timeout() {
			return
		}
		log.Printf("E! conn %v checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s\n", peer, err)
		// Best-effort close: the connection is in an unknown state.
		conn.Close()
	}
}

// Choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error.
func (g *Graphite) Write(metrics []telegraf.Metric) error { @@ -91,13 +117,13 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error { // This will get set to nil if a successful write occurs err = errors.New("Could not write to any Graphite server in cluster\n") - // Send data to a random server p := rand.Perm(len(g.conns)) for _, n := range p { if g.Timeout > 0 { g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second)) } + checkEOF(g.conns[n]) if _, e := g.conns[n].Write(batch); e != nil { // Error log.Println("E! Graphite Error: " + e.Error()) @@ -110,6 +136,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error { } // try to reconnect if err != nil { + log.Println("E! Reconnecting: ") g.Connect() } return err diff --git a/plugins/outputs/graphite/graphite_test.go b/plugins/outputs/graphite/graphite_test.go index c4f132725..4f1f2fef6 100644 --- a/plugins/outputs/graphite/graphite_test.go +++ b/plugins/outputs/graphite/graphite_test.go @@ -43,7 +43,8 @@ func TestGraphiteOK(t *testing.T) { var wg sync.WaitGroup // Start TCP server wg.Add(1) - go TCPServer(t, &wg) + t.Log("Starting server") + go TCPServer1(t, &wg) // Give the fake graphite TCP server some time to start: time.Sleep(time.Millisecond * 100) @@ -51,6 +52,7 @@ func TestGraphiteOK(t *testing.T) { g := Graphite{ Prefix: "my.prefix", } + // Init metrics m1, _ := metric.New( "mymeasurement", @@ -72,29 +74,58 @@ func TestGraphiteOK(t *testing.T) { ) // Prepare point list - metrics := []telegraf.Metric{m1, m2, m3} + metrics := []telegraf.Metric{m1} + metrics2 := []telegraf.Metric{m2, m3} err1 := g.Connect() require.NoError(t, err1) // Send Data + t.Log("Send first data") err2 := g.Write(metrics) require.NoError(t, err2) // Waiting TCPserver wg.Wait() + t.Log("Finished Waiting for first data") + var wg2 sync.WaitGroup + // Start TCP server + time.Sleep(time.Millisecond * 100) + wg2.Add(1) + go TCPServer2(t, &wg2) + time.Sleep(time.Millisecond * 100) + //Write but expect 
an error, but reconnect + g.Write(metrics2) + err3 := g.Write(metrics2) + t.Log("Finished writing second data, it should have failed") + //Actually write the new metrics + + require.NoError(t, err3) + t.Log("Finished writing third data") + wg2.Wait() g.Close() } -func TCPServer(t *testing.T, wg *sync.WaitGroup) { - tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") +func TCPServer1(t *testing.T, wg *sync.WaitGroup) { defer wg.Done() - conn, _ := tcpServer.Accept() + tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") + conn, _ := (tcpServer).Accept() reader := bufio.NewReader(conn) tp := textproto.NewReader(reader) data1, _ := tp.ReadLine() assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1) + conn.Close() + tcpServer.Close() +} + +func TCPServer2(t *testing.T, wg *sync.WaitGroup) { + defer wg.Done() + tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") + conn2, _ := (tcpServer).Accept() + reader := bufio.NewReader(conn2) + tp := textproto.NewReader(reader) data2, _ := tp.ReadLine() assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2) data3, _ := tp.ReadLine() assert.Equal(t, "my.prefix.192_168_0_1.my_measurement 3.14 1289430000", data3) - conn.Close() + conn2.Close() + tcpServer.Close() }