Clean up logging messages and add flusher startup delay

Fixes #294
This commit is contained in:
Cameron Sparr 2015-10-20 16:45:31 -06:00
parent dd2e9e08df
commit ac685d19f8
2 changed files with 16 additions and 16 deletions

View File

@ -1,7 +1,6 @@
package telegraf package telegraf
import ( import (
"errors"
"fmt" "fmt"
"log" "log"
"os" "os"
@ -212,7 +211,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
wg.Wait() wg.Wait()
elapsed := time.Since(start) elapsed := time.Since(start)
log.Printf("Default (%s) interval, gathered metrics from %d plugins in %s\n", log.Printf("Gathered metrics, (%s interval), from %d plugins in %s\n",
a.Interval, counter, elapsed) a.Interval, counter, elapsed)
return nil return nil
} }
@ -240,7 +239,7 @@ func (a *Agent) gatherSeparate(
} }
elapsed := time.Since(start) elapsed := time.Since(start)
log.Printf("Separate (%s) interval, gathered metrics from %s in %s\n", log.Printf("Gathered metrics, (separate %s interval), from %s in %s\n",
plugin.config.Interval, plugin.name, elapsed) plugin.config.Interval, plugin.name, elapsed)
if outerr != nil { if outerr != nil {
@ -294,29 +293,35 @@ func (a *Agent) Test() error {
return nil return nil
} }
func (a *Agent) flush(points []*client.Point) error { func (a *Agent) flush(points []*client.Point) {
var wg sync.WaitGroup var wg sync.WaitGroup
var outerr error
start := time.Now()
counter := 0
for _, o := range a.outputs { for _, o := range a.outputs {
wg.Add(1) wg.Add(1)
counter++
go func(ro *runningOutput) { go func(ro *runningOutput) {
defer wg.Done() defer wg.Done()
// Log all output errors: // Log all output errors:
if err := ro.output.Write(points); err != nil { if err := ro.output.Write(points); err != nil {
log.Printf("Error in output [%s]: %s", ro.name, err) log.Printf("Error in output [%s]: %s", ro.name, err.Error())
outerr = errors.New("Error encountered flushing outputs")
} }
}(o) }(o)
} }
wg.Wait() wg.Wait()
return outerr elapsed := time.Since(start)
log.Printf("Flushed %d metrics to %d output sinks in %s\n",
len(points), counter, elapsed)
} }
// flusher monitors the points input channel and flushes on the minimum interval // flusher monitors the points input channel and flushes on the minimum interval
func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) error { func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) error {
// Inelegant, but this sleep is to allow the Gather threads to run, so that
// the flusher will flush after metrics are collected.
time.Sleep(time.Millisecond * 100)
ticker := time.NewTicker(a.FlushInterval.Duration) ticker := time.NewTicker(a.FlushInterval.Duration)
points := make([]*client.Point, 0) points := make([]*client.Point, 0)
for { for {
@ -324,12 +329,7 @@ func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) er
case <-shutdown: case <-shutdown:
return nil return nil
case <-ticker.C: case <-ticker.C:
start := time.Now() a.flush(points)
if err := a.flush(points); err != nil {
log.Printf(err.Error())
}
elapsed := time.Since(start)
log.Printf("Flushed %d metrics in %s\n", len(points), elapsed)
points = make([]*client.Point, 0) points = make([]*client.Point, 0)
case pt := <-pointChan: case pt := <-pointChan:
points = append(points, pt) points = append(points, pt)

View File

@ -133,8 +133,8 @@ func main() {
log.Printf("Loaded outputs: %s", strings.Join(outputs, " ")) log.Printf("Loaded outputs: %s", strings.Join(outputs, " "))
log.Printf("Loaded plugins: %s", strings.Join(plugins, " ")) log.Printf("Loaded plugins: %s", strings.Join(plugins, " "))
log.Printf("Agent Config: Interval:%s, Debug:%#v, Hostname:%#v, "+ log.Printf("Agent Config: Interval:%s, Debug:%#v, Hostname:%#v, "+
"Precision:%#v, UTC: %#v\n", "Flush Interval:%s\n",
ag.Interval, ag.Debug, ag.Hostname, ag.Precision, ag.UTC) ag.Interval, ag.Debug, ag.Hostname, ag.FlushInterval)
log.Printf("Tags enabled: %s", config.ListTags()) log.Printf("Tags enabled: %s", config.ListTags())
if *fPidfile != "" { if *fPidfile != "" {