Utilizing new client and overhauling Accumulator interface

Fixes #280
Fixes #281
Fixes #289
This commit is contained in:
Cameron Sparr
2015-10-16 16:13:32 -06:00
parent 6263bc2d1b
commit c26ce9c4fe
27 changed files with 498 additions and 550 deletions

225
agent.go
View File

@@ -11,6 +11,8 @@ import (
"github.com/influxdb/telegraf/outputs"
"github.com/influxdb/telegraf/plugins"
"github.com/influxdb/influxdb/client/v2"
)
type runningOutput struct {
@@ -30,6 +32,13 @@ type Agent struct {
// Interval at which to gather information
Interval Duration
// Interval at which to flush data
FlushInterval Duration
// TODO(cam): Remove UTC and Precision parameters, they are no longer
// valid for the agent config. Leaving them here for now for backwards-
// compatability
// Option for outputting data in UTC
UTC bool `toml:"utc"`
@@ -50,10 +59,11 @@ type Agent struct {
// NewAgent returns an Agent struct based off the given Config
func NewAgent(config *Config) (*Agent, error) {
agent := &Agent{
Config: config,
Interval: Duration{10 * time.Second},
UTC: true,
Precision: "s",
Config: config,
Interval: Duration{10 * time.Second},
FlushInterval: Duration{10 * time.Second},
UTC: true,
Precision: "s",
}
// Apply the toml table to the agent config, overriding defaults
@@ -170,11 +180,9 @@ func (a *Agent) LoadPlugins(filters []string) ([]string, error) {
return names, nil
}
// crankParallel runs the plugins that are using the same reporting interval
// gatherParallel runs the plugins that are using the same reporting interval
// as the telegraf agent.
func (a *Agent) crankParallel() error {
points := make(chan *BatchPoints, len(a.plugins))
func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
var wg sync.WaitGroup
start := time.Now()
@@ -189,100 +197,51 @@ func (a *Agent) crankParallel() error {
go func(plugin *runningPlugin) {
defer wg.Done()
var bp BatchPoints
bp.Debug = a.Debug
bp.Prefix = plugin.name + "_"
bp.Config = plugin.config
bp.Precision = a.Precision
bp.Tags = a.Config.Tags
acc := NewAccumulator(plugin.config, pointChan)
acc.SetDebug(a.Debug)
acc.SetPrefix(plugin.name + "_")
acc.SetDefaultTags(a.Config.Tags)
if err := plugin.plugin.Gather(&bp); err != nil {
if err := plugin.plugin.Gather(acc); err != nil {
log.Printf("Error in plugin [%s]: %s", plugin.name, err)
}
points <- &bp
}(plugin)
}
wg.Wait()
close(points)
var bp BatchPoints
bp.Time = time.Now()
if a.UTC {
bp.Time = bp.Time.UTC()
}
bp.Precision = a.Precision
for sub := range points {
bp.Points = append(bp.Points, sub.Points...)
}
elapsed := time.Since(start)
log.Printf("Cranking default (%s) interval, gathered %d metrics from %d plugins in %s\n",
a.Interval, len(bp.Points), counter, elapsed)
return a.flush(&bp)
log.Printf("Default (%s) interval, gathered metrics from %d plugins in %s\n",
a.Interval, counter, elapsed)
return nil
}
// crank is mostly for test purposes.
func (a *Agent) crank() error {
var bp BatchPoints
bp.Debug = a.Debug
bp.Precision = a.Precision
for _, plugin := range a.plugins {
bp.Prefix = plugin.name + "_"
bp.Config = plugin.config
err := plugin.plugin.Gather(&bp)
if err != nil {
return err
}
}
bp.Tags = a.Config.Tags
bp.Time = time.Now()
if a.UTC {
bp.Time = bp.Time.UTC()
}
return a.flush(&bp)
}
// crankSeparate runs the plugins that have been configured with their own
// gatherSeparate runs the plugins that have been configured with their own
// reporting interval.
func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error {
func (a *Agent) gatherSeparate(
shutdown chan struct{},
plugin *runningPlugin,
pointChan chan *client.Point,
) error {
ticker := time.NewTicker(plugin.config.Interval)
for {
var bp BatchPoints
var outerr error
start := time.Now()
bp.Debug = a.Debug
acc := NewAccumulator(plugin.config, pointChan)
acc.SetDebug(a.Debug)
acc.SetPrefix(plugin.name + "_")
acc.SetDefaultTags(a.Config.Tags)
bp.Prefix = plugin.name + "_"
bp.Config = plugin.config
bp.Precision = a.Precision
bp.Tags = a.Config.Tags
if err := plugin.plugin.Gather(&bp); err != nil {
if err := plugin.plugin.Gather(acc); err != nil {
log.Printf("Error in plugin [%s]: %s", plugin.name, err)
outerr = errors.New("Error encountered processing plugins & outputs")
}
bp.Time = time.Now()
if a.UTC {
bp.Time = bp.Time.UTC()
}
elapsed := time.Since(start)
log.Printf("Cranking separate (%s) interval, gathered %d metrics from %s in %s\n",
plugin.config.Interval, len(bp.Points), plugin.name, elapsed)
if err := a.flush(&bp); err != nil {
outerr = errors.New("Error encountered processing plugins & outputs")
}
log.Printf("Separate (%s) interval, gathered metrics from %s in %s\n",
plugin.config.Interval, plugin.name, elapsed)
if outerr != nil {
return outerr
@@ -297,20 +256,55 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
}
}
func (a *Agent) flush(bp *BatchPoints) error {
// Test verifies that we can 'Gather' from all plugins with their configured
// Config struct
func (a *Agent) Test() error {
shutdown := make(chan struct{})
defer close(shutdown)
pointChan := make(chan *client.Point)
go a.flusher(shutdown, pointChan)
for _, plugin := range a.plugins {
acc := NewAccumulator(plugin.config, pointChan)
acc.SetDebug(true)
acc.SetPrefix(plugin.name + "_")
fmt.Printf("* Plugin: %s, Collection 1\n", plugin.name)
if plugin.config.Interval != 0 {
fmt.Printf("* Internal: %s\n", plugin.config.Interval)
}
if err := plugin.plugin.Gather(acc); err != nil {
return err
}
// Special instructions for some plugins. cpu, for example, needs to be
// run twice in order to return cpu usage percentages.
switch plugin.name {
case "cpu":
time.Sleep(500 * time.Millisecond)
fmt.Printf("* Plugin: %s, Collection 2\n", plugin.name)
if err := plugin.plugin.Gather(acc); err != nil {
return err
}
}
}
return nil
}
func (a *Agent) flush(points []*client.Point) error {
var wg sync.WaitGroup
var outerr error
for _, o := range a.outputs {
wg.Add(1)
// Copy BatchPoints
bpc := bp.deepcopy()
go func(ro *runningOutput) {
defer wg.Done()
// Log all output errors:
if err := ro.output.Write(bpc.BatchPoints); err != nil {
if err := ro.output.Write(points); err != nil {
log.Printf("Error in output [%s]: %s", ro.name, err)
outerr = errors.New("Error encountered flushing outputs")
}
@@ -321,45 +315,44 @@ func (a *Agent) flush(bp *BatchPoints) error {
return outerr
}
// Test verifies that we can 'Gather' from all plugins with their configured
// Config struct
func (a *Agent) Test() error {
var acc BatchPoints
acc.Debug = true
for _, plugin := range a.plugins {
acc.Prefix = plugin.name + "_"
acc.Config = plugin.config
fmt.Printf("* Plugin: %s, Collection 1\n", plugin.name)
if plugin.config.Interval != 0 {
fmt.Printf("* Internal: %s\n", plugin.config.Interval)
}
if err := plugin.plugin.Gather(&acc); err != nil {
return err
}
// Special instructions for some plugins. cpu, for example, needs to be
// run twice in order to return cpu usage percentages.
switch plugin.name {
case "cpu":
time.Sleep(500 * time.Millisecond)
fmt.Printf("* Plugin: %s, Collection 2\n", plugin.name)
if err := plugin.plugin.Gather(&acc); err != nil {
return err
// flusher monitors the points input channel and flushes on the minimum interval
func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) error {
ticker := time.NewTicker(a.FlushInterval.Duration)
points := make([]*client.Point, 0)
for {
select {
case <-shutdown:
return nil
case <-ticker.C:
start := time.Now()
if err := a.flush(points); err != nil {
log.Printf(err.Error())
}
elapsed := time.Since(start)
log.Printf("Flushed %d metrics in %s\n", len(points), elapsed)
points = make([]*client.Point, 0)
case pt := <-pointChan:
points = append(points, pt)
}
}
return nil
}
// Run runs the agent daemon, gathering every Interval
func (a *Agent) Run(shutdown chan struct{}) error {
var wg sync.WaitGroup
// channel shared between all plugin threads for accumulating points
pointChan := make(chan *client.Point, 1000)
wg.Add(1)
go func() {
defer wg.Done()
if err := a.flusher(shutdown, pointChan); err != nil {
log.Printf("Flusher routine failed, exiting: %s\n", err.Error())
close(shutdown)
}
}()
for _, plugin := range a.plugins {
// Start service of any ServicePlugins
@@ -374,12 +367,12 @@ func (a *Agent) Run(shutdown chan struct{}) error {
}
// Special handling for plugins that have their own collection interval
// configured. Default intervals are handled below with crankParallel
// configured. Default intervals are handled below with gatherParallel
if plugin.config.Interval != 0 {
wg.Add(1)
go func(plugin *runningPlugin) {
defer wg.Done()
if err := a.crankSeparate(shutdown, plugin); err != nil {
if err := a.gatherSeparate(shutdown, plugin, pointChan); err != nil {
log.Printf(err.Error())
}
}(plugin)
@@ -391,7 +384,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
ticker := time.NewTicker(a.Interval.Duration)
for {
if err := a.crankParallel(); err != nil {
if err := a.gatherParallel(pointChan); err != nil {
log.Printf(err.Error())
}