From dfc59866e896122cbf80cbad858093a8f12d38f6 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 21 Oct 2015 10:57:51 -0600 Subject: [PATCH] Add support for retrying output writes, using independent threads Fixes #285 --- agent.go | 70 +++++++++++++++++++++++++++++++----------- config.go | 2 ++ etc/config.sample.toml | 18 +++++------ 3 files changed, 61 insertions(+), 29 deletions(-) diff --git a/agent.go b/agent.go index aa146ca3d..d13c171f8 100644 --- a/agent.go +++ b/agent.go @@ -34,6 +34,9 @@ type Agent struct { // Interval at which to flush data FlushInterval Duration + // FlushRetries is the number of times to retry each data flush + FlushRetries int + // TODO(cam): Remove UTC and Precision parameters, they are no longer // valid for the agent config. Leaving them here for now for backwards- // compatability @@ -61,6 +64,7 @@ func NewAgent(config *Config) (*Agent, error) { Config: config, Interval: Duration{10 * time.Second}, FlushInterval: Duration{10 * time.Second}, + FlushRetries: 2, UTC: true, Precision: "s", } @@ -293,28 +297,56 @@ func (a *Agent) Test() error { return nil } -func (a *Agent) flush(points []*client.Point) { - var wg sync.WaitGroup - +// writeOutput writes a list of points to a single output, with retries +func (a *Agent) writeOutput( + points []*client.Point, + ro *runningOutput, + shutdown chan struct{}, +) { + retry := 0 + retries := a.FlushRetries start := time.Now() - counter := 0 - for _, o := range a.outputs { - wg.Add(1) - counter++ - go func(ro *runningOutput) { - defer wg.Done() - // Log all output errors: - if err := ro.output.Write(points); err != nil { - log.Printf("Error in output [%s]: %s", ro.name, err.Error()) + for { + err := ro.output.Write(points) + + select { + case <-shutdown: + return + default: + if err == nil { + // Write successful + elapsed := time.Since(start) + log.Printf("Flushed %d metrics to output %s in %s\n", + len(points), ro.name, elapsed) + return + } else if retry >= retries { + // No more retries + msg := "FATAL: Write to output [%s] failed %d times, dropping" + + " %d metrics\n" + log.Printf(msg, ro.name, retries+1, len(points)) + return + } else if err != nil { + // Sleep for a retry + log.Printf("Error in output [%s]: %s, retrying in %s", + ro.name, err.Error(), a.FlushInterval.Duration) + time.Sleep(a.FlushInterval.Duration) } - }(o) + } + + retry++ + } +} + +// flush writes a list of points to all configured outputs +func (a *Agent) flush(points []*client.Point, shutdown chan struct{}) { + if len(points) == 0 { + return } - wg.Wait() - elapsed := time.Since(start) - log.Printf("Flushed %d metrics to %d output sinks in %s\n", - len(points), counter, elapsed) + for _, o := range a.outputs { + go a.writeOutput(points, o, shutdown) + } } // flusher monitors the points input channel and flushes on the minimum interval @@ -327,9 +359,11 @@ func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) er for { select { case <-shutdown: + log.Println("Hang on, flushing any cached points before shutdown") + a.flush(points, shutdown) return nil case <-ticker.C: - a.flush(points) + a.flush(points, shutdown) points = make([]*client.Point, 0) case pt := <-pointChan: points = append(points, pt) diff --git a/config.go b/config.go index 13f62ce55..ac0fba425 100644 --- a/config.go +++ b/config.go @@ -357,6 +357,8 @@ var header = `# Telegraf configuration interval = "10s" # Default data flushing interval for all outputs flush_interval = "10s" + # Number of times to retry each data flush + flush_retries = 2 # run telegraf in debug mode debug = false # Override default hostname, if empty use os.Hostname() diff --git a/etc/config.sample.toml b/etc/config.sample.toml index c66ea50f4..160d56630 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -27,17 +27,12 @@ [agent] # Default data collection interval for all plugins interval = "10s" - - # If utc = false, uses local time (utc is highly recommended) - utc = true - - # Precision of writes, valid values are n, u, ms, s, m, and h - # note: using second precision greatly helps InfluxDB compression - precision = "s" - + # Default data flushing interval for all outputs + flush_interval = "10s" + # Number of times to retry each data flush + flush_retries = 2 # run telegraf in debug mode debug = false - # Override default hostname, if empty use os.Hostname() hostname = "" @@ -54,15 +49,16 @@ # Multiple urls can be specified for InfluxDB cluster support. Server to # write to will be randomly chosen each interval. urls = ["http://localhost:8086"] # required. - # The target database for metrics. This database must already exist database = "telegraf" # required. + # Precision of writes, valid values are n, u, ms, s, m, and h + # note: using second precision greatly helps InfluxDB compression + precision = "s" # Connection timeout (for the connection with InfluxDB), formatted as a string. # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". # If not provided, will default to 0 (no timeout) # timeout = "5s" - # username = "telegraf" # password = "metricsmetricsmetricsmetrics"