Add support for retrying output writes, using independent threads

Fixes #285
This commit is contained in:
Cameron Sparr 2015-10-21 10:57:51 -06:00
parent ac685d19f8
commit dfc59866e8
3 changed files with 61 additions and 29 deletions

View File

@ -34,6 +34,9 @@ type Agent struct {
// Interval at which to flush data // Interval at which to flush data
FlushInterval Duration FlushInterval Duration
// FlushRetries is the number of times to retry each data flush
FlushRetries int
// TODO(cam): Remove UTC and Precision parameters, they are no longer // TODO(cam): Remove UTC and Precision parameters, they are no longer
// valid for the agent config. Leaving them here for now for backwards- // valid for the agent config. Leaving them here for now for backwards-
// compatibility // compatibility
@ -61,6 +64,7 @@ func NewAgent(config *Config) (*Agent, error) {
Config: config, Config: config,
Interval: Duration{10 * time.Second}, Interval: Duration{10 * time.Second},
FlushInterval: Duration{10 * time.Second}, FlushInterval: Duration{10 * time.Second},
FlushRetries: 2,
UTC: true, UTC: true,
Precision: "s", Precision: "s",
} }
@ -293,28 +297,56 @@ func (a *Agent) Test() error {
return nil return nil
} }
func (a *Agent) flush(points []*client.Point) { // writeOutput writes a list of points to a single output, with retries
var wg sync.WaitGroup func (a *Agent) writeOutput(
points []*client.Point,
ro *runningOutput,
shutdown chan struct{},
) {
retry := 0
retries := a.FlushRetries
start := time.Now() start := time.Now()
counter := 0
for _, o := range a.outputs {
wg.Add(1)
counter++
go func(ro *runningOutput) { for {
defer wg.Done() err := ro.output.Write(points)
// Log all output errors:
if err := ro.output.Write(points); err != nil { select {
log.Printf("Error in output [%s]: %s", ro.name, err.Error()) case <-shutdown:
return
default:
if err == nil {
// Write successful
elapsed := time.Since(start)
log.Printf("Flushed %d metrics to output %s in %s\n",
len(points), ro.name, elapsed)
return
} else if retry >= retries {
// No more retries
msg := "FATAL: Write to output [%s] failed %d times, dropping" +
" %d metrics\n"
log.Printf(msg, ro.name, retries+1, len(points))
return
} else if err != nil {
// Sleep for a retry
log.Printf("Error in output [%s]: %s, retrying in %s",
ro.name, err.Error(), a.FlushInterval.Duration)
time.Sleep(a.FlushInterval.Duration)
} }
}(o) }
retry++
}
}
// flush writes a list of points to all configured outputs
func (a *Agent) flush(points []*client.Point, shutdown chan struct{}) {
if len(points) == 0 {
return
} }
wg.Wait() for _, o := range a.outputs {
elapsed := time.Since(start) go a.writeOutput(points, o, shutdown)
log.Printf("Flushed %d metrics to %d output sinks in %s\n", }
len(points), counter, elapsed)
} }
// flusher monitors the points input channel and flushes on the minimum interval // flusher monitors the points input channel and flushes on the minimum interval
@ -327,9 +359,11 @@ func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) er
for { for {
select { select {
case <-shutdown: case <-shutdown:
log.Println("Hang on, flushing any cached points before shutdown")
a.flush(points, shutdown)
return nil return nil
case <-ticker.C: case <-ticker.C:
a.flush(points) a.flush(points, shutdown)
points = make([]*client.Point, 0) points = make([]*client.Point, 0)
case pt := <-pointChan: case pt := <-pointChan:
points = append(points, pt) points = append(points, pt)

View File

@ -357,6 +357,8 @@ var header = `# Telegraf configuration
interval = "10s" interval = "10s"
# Default data flushing interval for all outputs # Default data flushing interval for all outputs
flush_interval = "10s" flush_interval = "10s"
# Number of times to retry each data flush
flush_retries = 2
# run telegraf in debug mode # run telegraf in debug mode
debug = false debug = false
# Override default hostname, if empty use os.Hostname() # Override default hostname, if empty use os.Hostname()

View File

@ -27,17 +27,12 @@
[agent] [agent]
# Default data collection interval for all plugins # Default data collection interval for all plugins
interval = "10s" interval = "10s"
# Default data flushing interval for all outputs
# If utc = false, uses local time (utc is highly recommended) flush_interval = "10s"
utc = true # Number of times to retry each data flush
flush_retries = 2
# Precision of writes, valid values are n, u, ms, s, m, and h
# note: using second precision greatly helps InfluxDB compression
precision = "s"
# run telegraf in debug mode # run telegraf in debug mode
debug = false debug = false
# Override default hostname, if empty use os.Hostname() # Override default hostname, if empty use os.Hostname()
hostname = "" hostname = ""
@ -54,15 +49,16 @@
# Multiple urls can be specified for InfluxDB cluster support. Server to # Multiple urls can be specified for InfluxDB cluster support. Server to
# write to will be randomly chosen each interval. # write to will be randomly chosen each interval.
urls = ["http://localhost:8086"] # required. urls = ["http://localhost:8086"] # required.
# The target database for metrics. This database must already exist # The target database for metrics. This database must already exist
database = "telegraf" # required. database = "telegraf" # required.
# Precision of writes, valid values are n, u, ms, s, m, and h
# note: using second precision greatly helps InfluxDB compression
precision = "s"
# Connection timeout (for the connection with InfluxDB), formatted as a string. # Connection timeout (for the connection with InfluxDB), formatted as a string.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
# If not provided, will default to 0 (no timeout) # If not provided, will default to 0 (no timeout)
# timeout = "5s" # timeout = "5s"
# username = "telegraf" # username = "telegraf"
# password = "metricsmetricsmetricsmetrics" # password = "metricsmetricsmetricsmetrics"