74
agent.go
74
agent.go
@@ -3,6 +3,7 @@ package telegraf
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"os"
|
||||
"sort"
|
||||
"sync"
|
||||
@@ -32,12 +33,19 @@ type Agent struct {
|
||||
// Interval at which to gather information
|
||||
Interval duration.Duration
|
||||
|
||||
// RoundInterval rounds collection interval to 'interval'.
|
||||
// i.e., if Interval=10s then always collect on :00, :10, :20, etc.
|
||||
RoundInterval bool
|
||||
|
||||
// Interval at which to flush data
|
||||
FlushInterval duration.Duration
|
||||
|
||||
// FlushRetries is the number of times to retry each data flush
|
||||
FlushRetries int
|
||||
|
||||
// FlushJitter is the upper bound of the random delay waited before each flush
|
||||
FlushJitter duration.Duration
|
||||
|
||||
// TODO(cam): Remove UTC and Precision parameters, they are no longer
|
||||
// valid for the agent config. Leaving them here for now for backwards-
|
||||
// compatibility
|
||||
@@ -64,10 +72,10 @@ func NewAgent(config *Config) (*Agent, error) {
|
||||
agent := &Agent{
|
||||
Tags: make(map[string]string),
|
||||
Interval: duration.Duration{10 * time.Second},
|
||||
RoundInterval: true,
|
||||
FlushInterval: duration.Duration{10 * time.Second},
|
||||
FlushRetries: 2,
|
||||
UTC: true,
|
||||
Precision: "s",
|
||||
FlushJitter: duration.Duration{5 * time.Second},
|
||||
}
|
||||
|
||||
// Apply the toml table to the agent config, overriding defaults
|
||||
@@ -294,30 +302,37 @@ func (a *Agent) Test() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// writeOutput writes a list of points to a single output, with retries
|
||||
// writeOutput writes a list of points to a single output, with retries.
|
||||
// Optionally takes a `done` channel to indicate that it is done writing.
|
||||
func (a *Agent) writeOutput(
|
||||
points []*client.Point,
|
||||
ro *runningOutput,
|
||||
shutdown chan struct{},
|
||||
wg *sync.WaitGroup,
|
||||
) {
|
||||
defer wg.Done()
|
||||
if len(points) == 0 {
|
||||
return
|
||||
}
|
||||
retry := 0
|
||||
retries := a.FlushRetries
|
||||
start := time.Now()
|
||||
|
||||
for {
|
||||
err := ro.output.Write(points)
|
||||
if err == nil {
|
||||
// Write successful
|
||||
elapsed := time.Since(start)
|
||||
log.Printf("Flushed %d metrics to output %s in %s\n",
|
||||
len(points), ro.name, elapsed)
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-shutdown:
|
||||
return
|
||||
default:
|
||||
if err == nil {
|
||||
// Write successful
|
||||
elapsed := time.Since(start)
|
||||
log.Printf("Flushed %d metrics to output %s in %s\n",
|
||||
len(points), ro.name, elapsed)
|
||||
return
|
||||
} else if retry >= retries {
|
||||
if retry >= retries {
|
||||
// No more retries
|
||||
msg := "FATAL: Write to output [%s] failed %d times, dropping" +
|
||||
" %d metrics\n"
|
||||
@@ -336,13 +351,18 @@ func (a *Agent) writeOutput(
|
||||
}
|
||||
|
||||
// flush writes a list of points to all configured outputs
|
||||
func (a *Agent) flush(points []*client.Point, shutdown chan struct{}) {
|
||||
if len(points) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
func (a *Agent) flush(
|
||||
points []*client.Point,
|
||||
shutdown chan struct{},
|
||||
wait bool,
|
||||
) {
|
||||
var wg sync.WaitGroup
|
||||
for _, o := range a.outputs {
|
||||
go a.writeOutput(points, o, shutdown)
|
||||
wg.Add(1)
|
||||
go a.writeOutput(points, o, shutdown, &wg)
|
||||
}
|
||||
if wait {
|
||||
wg.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -353,14 +373,23 @@ func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) er
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
ticker := time.NewTicker(a.FlushInterval.Duration)
|
||||
points := make([]*client.Point, 0)
|
||||
jitter := rand.Int63n(int64(a.FlushJitter.Duration))
|
||||
for {
|
||||
select {
|
||||
case <-shutdown:
|
||||
log.Println("Hang on, flushing any cached points before shutdown")
|
||||
a.flush(points, shutdown)
|
||||
a.flush(points, shutdown, true)
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
a.flush(points, shutdown)
|
||||
timer := time.NewTimer(time.Duration(jitter))
|
||||
select {
|
||||
case <-timer.C:
|
||||
a.flush(points, shutdown, false)
|
||||
case <-shutdown:
|
||||
log.Println("Hang on, flushing any cached points before shutdown")
|
||||
a.flush(points, shutdown, true)
|
||||
return nil
|
||||
}
|
||||
points = make([]*client.Point, 0)
|
||||
case pt := <-pointChan:
|
||||
points = append(points, pt)
|
||||
@@ -375,6 +404,13 @@ func (a *Agent) Run(shutdown chan struct{}) error {
|
||||
// channel shared between all plugin threads for accumulating points
|
||||
pointChan := make(chan *client.Point, 1000)
|
||||
|
||||
// Align collection to the next interval boundary by sleeping
|
||||
if a.RoundInterval {
|
||||
i := int64(a.Interval.Duration)
|
||||
time.Sleep(time.Duration(i - (time.Now().UnixNano() % i)))
|
||||
}
|
||||
ticker := time.NewTicker(a.Interval.Duration)
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
@@ -412,8 +448,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
|
||||
|
||||
defer wg.Wait()
|
||||
|
||||
ticker := time.NewTicker(a.Interval.Duration)
|
||||
|
||||
for {
|
||||
if err := a.gatherParallel(pointChan); err != nil {
|
||||
log.Printf(err.Error())
|
||||
|
||||
Reference in New Issue
Block a user