From ec06b8755457b9ff12aaac1e5d97537b2b641d14 Mon Sep 17 00:00:00 2001 From: Thibault Cohen Date: Sun, 28 Feb 2016 04:37:03 -0500 Subject: [PATCH] Add poller mode to Telegraf --- Makefile | 6 +- agent/accumulator.go | 2 +- agent/agent.go | 12 +- agent/agent_test.go | 10 +- cmd/telegraf/telegraf.go | 80 ++- internal/config/config.go | 57 ++- .../config/testdata/telegraf-poller-bad.toml | 59 +++ internal/config/testdata/telegraf-poller.toml | 59 +++ poller/poller.go | 456 ++++++++++++++++++ poller/poller_test.go | 207 ++++++++ 10 files changed, 905 insertions(+), 43 deletions(-) create mode 100644 internal/config/testdata/telegraf-poller-bad.toml create mode 100644 internal/config/testdata/telegraf-poller.toml create mode 100644 poller/poller.go create mode 100644 poller/poller_test.go diff --git a/Makefile b/Makefile index ef316bd03..c056c391f 100644 --- a/Makefile +++ b/Makefile @@ -71,6 +71,7 @@ endif docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt docker run --name riemann -p "5555:5555" -d blalor/riemann docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim + docker run --name poller -p "5673:5672" -d rabbitmq:3-management # Run docker containers necessary for CircleCI unit tests docker-run-circle: @@ -85,11 +86,12 @@ docker-run-circle: docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt docker run --name riemann -p "5555:5555" -d blalor/riemann docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim + docker run --name poller -p "5673:5672" -d rabbitmq:3-management # Kill all docker containers, ignore errors docker-kill: - -docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp - -docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp + -docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp poller + -docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp poller # Run full unit tests using docker containers (includes setup and teardown) test: vet docker-kill docker-run diff --git a/agent/accumulator.go b/agent/accumulator.go index 7ec22cd7f..e064dfc42 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -165,7 +165,7 @@ func (ac *accumulator) SetDebug(debug bool) { ac.debug = debug } -func (ac *accumulator) setDefaultTags(tags map[string]string) { +func (ac *accumulator) SetDefaultTags(tags map[string]string) { ac.defaultTags = tags } diff --git a/agent/agent.go b/agent/agent.go index 8a8800cc2..0c1311fb6 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -121,7 +121,7 @@ func (a *Agent) gatherParallel(metricC chan telegraf.Metric) error { acc := NewAccumulator(input.Config, metricC) acc.SetDebug(a.Config.Agent.Debug) - acc.setDefaultTags(a.Config.Tags) + acc.SetDefaultTags(a.Config.Tags) if jitter != 0 { nanoSleep := rand.Int63n(jitter) @@ -172,7 +172,7 @@ func (a *Agent) gatherSeparate( acc := NewAccumulator(input.Config, metricC) acc.SetDebug(a.Config.Agent.Debug) - acc.setDefaultTags(a.Config.Tags) + acc.SetDefaultTags(a.Config.Tags) if err := input.Input.Gather(acc); err != nil { log.Printf("Error in input [%s]: %s", input.Name, err) @@ -287,9 +287,9 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er } } -// jitterInterval applies the the interval jitter to the flush interval using +// JitterInterval applies the the interval jitter to the flush interval using // crypto/rand number generator -func jitterInterval(ininterval, injitter 
time.Duration) time.Duration { +func JitterInterval(ininterval, injitter time.Duration) time.Duration { var jitter int64 outinterval := ininterval if injitter.Nanoseconds() != 0 { @@ -312,7 +312,7 @@ func jitterInterval(ininterval, injitter time.Duration) time.Duration { func (a *Agent) Run(shutdown chan struct{}) error { var wg sync.WaitGroup - a.Config.Agent.FlushInterval.Duration = jitterInterval( + a.Config.Agent.FlushInterval.Duration = JitterInterval( a.Config.Agent.FlushInterval.Duration, a.Config.Agent.FlushJitter.Duration) @@ -330,7 +330,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { case telegraf.ServiceInput: acc := NewAccumulator(input.Config, metricC) acc.SetDebug(a.Config.Agent.Debug) - acc.setDefaultTags(a.Config.Tags) + acc.SetDefaultTags(a.Config.Tags) if err := p.Start(acc); err != nil { log.Printf("Service for input %s failed to start, exiting\n%s\n", input.Name, err.Error()) diff --git a/agent/agent_test.go b/agent/agent_test.go index 8bf8a150b..da6525ef0 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -103,7 +103,7 @@ func TestAgent_LoadOutput(t *testing.T) { } func TestAgent_ZeroJitter(t *testing.T) { - flushinterval := jitterInterval(time.Duration(10*time.Second), + flushinterval := JitterInterval(time.Duration(10*time.Second), time.Duration(0*time.Second)) actual := flushinterval.Nanoseconds() @@ -119,7 +119,7 @@ func TestAgent_ZeroInterval(t *testing.T) { max := time.Duration(5 * time.Second).Nanoseconds() for i := 0; i < 1000; i++ { - flushinterval := jitterInterval(time.Duration(0*time.Second), + flushinterval := JitterInterval(time.Duration(0*time.Second), time.Duration(5*time.Second)) actual := flushinterval.Nanoseconds() @@ -135,7 +135,7 @@ func TestAgent_ZeroInterval(t *testing.T) { } func TestAgent_ZeroBoth(t *testing.T) { - flushinterval := jitterInterval(time.Duration(0*time.Second), + flushinterval := JitterInterval(time.Duration(0*time.Second), time.Duration(0*time.Second)) actual := flushinterval @@ -150,7 +150,7 @@ func TestAgent_JitterMax(t *testing.T) { max := time.Duration(32 * time.Second).Nanoseconds() for i := 0; i < 1000; i++ { - flushinterval := jitterInterval(time.Duration(30*time.Second), + flushinterval := JitterInterval(time.Duration(30*time.Second), time.Duration(2*time.Second)) actual := flushinterval.Nanoseconds() if actual > max { @@ -164,7 +164,7 @@ func TestAgent_JitterMin(t *testing.T) { min := time.Duration(30 * time.Second).Nanoseconds() for i := 0; i < 1000; i++ { - flushinterval := jitterInterval(time.Duration(30*time.Second), + flushinterval := JitterInterval(time.Duration(30*time.Second), time.Duration(2*time.Second)) actual := flushinterval.Nanoseconds() if actual < min { diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index d54aaa4e3..59ee0b5a9 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -15,6 +15,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/all" "github.com/influxdata/telegraf/plugins/outputs" _ "github.com/influxdata/telegraf/plugins/outputs/all" + "github.com/influxdata/telegraf/poller" ) var fDebug = flag.Bool("debug", false, @@ -191,34 +192,63 @@ func main() { if len(c.Outputs) == 0 { log.Fatalf("Error: no outputs found, did you provide a valid config file?") } - if len(c.Inputs) == 0 { - log.Fatalf("Error: no inputs found, did you provide a valid config file?") + + if len(c.Inputs) == 0 && c.Poller.AMQPUrl == "" { + log.Fatalf("Error: no inputs found and poller node disabled, did you provide a valid config file?") } - ag, err := 
agent.NewAgent(c) - if err != nil { - log.Fatal(err) - } - - if *fDebug { - ag.Config.Agent.Debug = true - } - - if *fQuiet { - ag.Config.Agent.Quiet = true - } - - if *fTest { - err = ag.Test() + // TODO: Do we want agent and poller mode enabled in the same time ? + var ag *agent.Agent + var po *poller.Poller + if c.Poller.AMQPUrl == "" { + // Agent + if c.Agent == nil { + log.Fatal("err") + } + ag, err = agent.NewAgent(c) if err != nil { log.Fatal(err) } - return - } - err = ag.Connect() - if err != nil { - log.Fatal(err) + if *fDebug { + ag.Config.Agent.Debug = true + } + + if *fQuiet { + ag.Config.Agent.Quiet = true + } + + if *fTest { + err = ag.Test() + if err != nil { + log.Fatal(err) + } + return + } + + err = ag.Connect() + if err != nil { + log.Fatal(err) + } + } else { + // Poller + po, err = poller.NewPoller(c) + if err != nil { + log.Fatal(err) + } + + if *fDebug { + po.Config.Poller.Debug = true + } + + if *fQuiet { + po.Config.Poller.Quiet = true + } + + err = po.Connect() + if err != nil { + log.Fatal(err) + } } shutdown := make(chan struct{}) @@ -253,7 +283,11 @@ func main() { f.Close() } - ag.Run(shutdown) + if c.Poller.AMQPUrl == "" { + ag.Run(shutdown) + } else { + po.Run(shutdown) + } } } diff --git a/internal/config/config.go b/internal/config/config.go index f64e0a56a..51a569429 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -31,6 +31,7 @@ type Config struct { OutputFilters []string Agent *AgentConfig + Poller *PollerConfig Inputs []*internal_models.RunningInput Outputs []*internal_models.RunningOutput } @@ -44,6 +45,11 @@ func NewConfig() *Config { FlushInterval: internal.Duration{Duration: 10 * time.Second}, FlushJitter: internal.Duration{Duration: 5 * time.Second}, }, + // Poller defaults: + Poller: &PollerConfig{ + FlushInterval: internal.Duration{Duration: 10 * time.Second}, + FlushJitter: internal.Duration{Duration: 5 * time.Second}, + }, Tags: make(map[string]string), Inputs: make([]*internal_models.RunningInput, 0), @@ -101,6 +107,40 @@ type AgentConfig struct { Hostname string } +type PollerConfig struct { + // Rabbitmq url amqp://guest:guest@localhost:5672/ + AMQPUrl string `toml:"AMQPurl"` + + // Rabbitmq labels + AMQPlabels []string `toml:"AMQPlabels"` + + // FlushInterval is the Interval at which to flush data + FlushInterval internal.Duration + + // FlushJitter Jitters the flush interval by a random amount. + // This is primarily to avoid large write spikes for users running a large + // number of telegraf instances. + // ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s + FlushJitter internal.Duration + + // MetricBufferLimit is the max number of metrics that each output plugin + // will cache. The buffer is cleared when a successful write occurs. When + // full, the oldest metrics will be overwritten. + MetricBufferLimit int + + // FlushBufferWhenFull tells Telegraf to flush the metric buffer whenever + // it fills up, regardless of FlushInterval. Setting this option to true + // does _not_ deactivate FlushInterval. + FlushBufferWhenFull bool + + // Debug is the option for running in debug mode + Debug bool + + // Quiet is the option for running in quiet mode + Quiet bool + Hostname string +} + // Inputs returns a list of strings of the configured inputs. 
func (c *Config) InputNames() []string { var name []string @@ -339,6 +379,11 @@ func (c *Config) LoadConfig(path string) error { log.Printf("Could not parse [agent] config\n") return err } + case "poller": + if err = config.UnmarshalTable(subTable, c.Poller); err != nil { + log.Printf("Could not parse [poller] config\n") + return err + } case "global_tags", "tags": if err = config.UnmarshalTable(subTable, c.Tags); err != nil { log.Printf("Could not parse [global_tags] config\n") @@ -449,14 +494,14 @@ func (c *Config) addInput(name string, table *ast.Table) error { // arbitrary types of input, so build the parser and set it. switch t := input.(type) { case parsers.ParserInput: - parser, err := buildParser(name, table) + parser, err := BuildParser(name, table) if err != nil { return err } t.SetParser(parser) } - pluginConfig, err := buildInput(name, table) + pluginConfig, err := BuildInput(name, table) if err != nil { return err } @@ -588,10 +633,10 @@ func buildFilter(tbl *ast.Table) internal_models.Filter { return f } -// buildInput parses input specific items from the ast.Table, +// BuildInput parses input specific items from the ast.Table, // builds the filter and returns a // internal_models.InputConfig to be inserted into internal_models.RunningInput -func buildInput(name string, tbl *ast.Table) (*internal_models.InputConfig, error) { +func BuildInput(name string, tbl *ast.Table) (*internal_models.InputConfig, error) { cp := &internal_models.InputConfig{Name: name} if node, ok := tbl.Fields["interval"]; ok { if kv, ok := node.(*ast.KeyValue); ok { @@ -648,10 +693,10 @@ func buildInput(name string, tbl *ast.Table) (*internal_models.InputConfig, erro return cp, nil } -// buildParser grabs the necessary entries from the ast.Table for creating +// BuildParser grabs the necessary entries from the ast.Table for creating // a parsers.Parser object, and creates it, which can then be added onto // an Input object. -func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { +func BuildParser(name string, tbl *ast.Table) (parsers.Parser, error) { c := &parsers.Config{} if node, ok := tbl.Fields["data_format"]; ok { diff --git a/internal/config/testdata/telegraf-poller-bad.toml b/internal/config/testdata/telegraf-poller-bad.toml new file mode 100644 index 000000000..b082fbb67 --- /dev/null +++ b/internal/config/testdata/telegraf-poller-bad.toml @@ -0,0 +1,59 @@ +# Telegraf configuration + +# Telegraf is entirely plugin driven. All metrics are gathered from the +# declared inputs. + +# Even if a plugin has no configuration, it must be declared in here +# to be active. Declaring a plugin means just specifying the name +# as a section with no variables. To deactivate a plugin, comment +# out the name and any variables. + +# Use 'telegraf -config telegraf.toml -test' to see what metrics a config +# file would generate. + +# One rule that plugins conform to is wherever a connection string +# can be passed, the values '' and 'localhost' are treated specially. +# They indicate to the plugin to use their own builtin configuration to +# connect to the local system. + +# NOTE: The configuration has a few required parameters. They are marked +# with 'required'. Be sure to edit those to make this configuration work. 
+ +# Tags can also be specified via a normal map, but only one form at a time: +[global_tags] + dc = "us-east-1" + +[poller] + AMQPurl = "amqp://guest:guest@127.0.0.1:5674/" + AMQPlabels = ["label1"] + + # run telegraf in debug mode + debug = false + + # Override default hostname, if empty use os.Hostname() + hostname = "" + + ## Default flushing interval for all outputs. You shouldn't set this below + ## interval. Maximum flush_interval will be flush_interval + flush_jitter + flush_interval = "3s" + ## Jitter the flush interval by a random amount. This is primarily to avoid + ## large write spikes for users running a large number of telegraf instances. + ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s + flush_jitter = "0s" + + +############################################################################### +# OUTPUTS # +############################################################################### + +[[outputs.file]] + ## Files to write to, "stdout" is a specially handled file. + files = ["/tmp/metrics.out"] + + + ## Data format to output. This can be "influx" or "graphite" + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "influx" + diff --git a/internal/config/testdata/telegraf-poller.toml b/internal/config/testdata/telegraf-poller.toml new file mode 100644 index 000000000..b097233b2 --- /dev/null +++ b/internal/config/testdata/telegraf-poller.toml @@ -0,0 +1,59 @@ +# Telegraf configuration + +# Telegraf is entirely plugin driven. All metrics are gathered from the +# declared inputs. + +# Even if a plugin has no configuration, it must be declared in here +# to be active. Declaring a plugin means just specifying the name +# as a section with no variables. To deactivate a plugin, comment +# out the name and any variables. + +# Use 'telegraf -config telegraf.toml -test' to see what metrics a config +# file would generate. + +# One rule that plugins conform to is wherever a connection string +# can be passed, the values '' and 'localhost' are treated specially. +# They indicate to the plugin to use their own builtin configuration to +# connect to the local system. + +# NOTE: The configuration has a few required parameters. They are marked +# with 'required'. Be sure to edit those to make this configuration work. + +# Tags can also be specified via a normal map, but only one form at a time: +[global_tags] + dc = "us-east-1" + +[poller] + AMQPurl = "amqp://guest:guest@127.0.0.1:5673/" + AMQPlabels = ["label1"] + + # run telegraf in debug mode + debug = false + + # Override default hostname, if empty use os.Hostname() + hostname = "" + + ## Default flushing interval for all outputs. You shouldn't set this below + ## interval. Maximum flush_interval will be flush_interval + flush_jitter + flush_interval = "3s" + ## Jitter the flush interval by a random amount. This is primarily to avoid + ## large write spikes for users running a large number of telegraf instances. + ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s + flush_jitter = "0s" + + +############################################################################### +# OUTPUTS # +############################################################################### + +[[outputs.file]] + ## Files to write to, "stdout" is a specially handled file. + files = ["/tmp/metrics.out"] + + + ## Data format to output. 
This can be "influx" or "graphite" + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "influx" + diff --git a/poller/poller.go b/poller/poller.go new file mode 100644 index 000000000..792422899 --- /dev/null +++ b/poller/poller.go @@ -0,0 +1,456 @@ +package poller + +import ( + "errors" + "fmt" + "log" + "os" + "runtime" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/agent" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" + + "github.com/influxdata/telegraf/internal/config" + "github.com/influxdata/telegraf/internal/models" + + influxconfig "github.com/influxdata/config" + + "github.com/influxdata/toml" + "github.com/influxdata/toml/ast" + "github.com/streadway/amqp" +) + +// Poller runs telegraf and collects data based on the given config +type Poller struct { + Config *config.Config + AMQPconn *amqp.Connection + AMQPchannel *amqp.Channel + rawTasks chan []byte +} + +// NewPoller returns an Poller struct based off the given Config +func NewPoller(config *config.Config) (*Poller, error) { + p := &Poller{ + Config: config, + } + + if p.Config.Poller.Hostname == "" { + hostname, err := os.Hostname() + if err != nil { + return nil, err + } + + p.Config.Poller.Hostname = hostname + } + + config.Tags["host"] = p.Config.Poller.Hostname + + return p, nil +} + +func (p *Poller) getTask(conn *amqp.Connection, queueName string, consumerTag string, toto chan []byte) error { + // defer conn.Close() + tasks, err := p.AMQPchannel.Consume(queueName, consumerTag+"_"+queueName, false, false, false, false, nil) + if err != nil { + // TODO BETER HANDLING + return fmt.Errorf("basic.consume: %v", err) + } + for task := range tasks { + //log.Printf("%s \n", task) + toto <- task.Body + err := task.Nack(false, false) + if err != nil { + //TODO ???? 
+ } + } + return nil +} + +// Conenctio AMQP server +func (p *Poller) AMQPConnect() error { + p.rawTasks = make(chan []byte) + var err error + // Prepare config + // TODO Handle vhost + conf := amqp.Config{ + // Vhost: "/telegraf", + Heartbeat: time.Duration(0) * time.Second, + } + // Dial server + p.AMQPconn, err = amqp.DialConfig(p.Config.Poller.AMQPUrl, conf) + if err != nil { + return err + } + return nil +} + +// Create Channel +func (p *Poller) AMQPCreateChannel() error { + var err error + // Create Channel + p.AMQPchannel, err = p.AMQPconn.Channel() + if err != nil { + return err + } + + for _, AMQPlabel := range p.Config.Poller.AMQPlabels { + // Subscribing to queue + go p.getTask(p.AMQPconn, AMQPlabel, p.Config.Poller.Hostname, p.rawTasks) + } + return nil +} + +// Connect connects to all configured outputs +func (p *Poller) Connect() error { + for _, o := range p.Config.Outputs { + o.Quiet = p.Config.Poller.Quiet + + switch ot := o.Output.(type) { + case telegraf.ServiceOutput: + if err := ot.Start(); err != nil { + log.Printf("Service for output %s failed to start, exiting\n%s\n", + o.Name, err.Error()) + return err + } + } + + if p.Config.Poller.Debug { + log.Printf("Attempting connection to output: %s\n", o.Name) + } + err := o.Output.Connect() + if err != nil { + log.Printf("Failed to connect to output %s, retrying in 15s, "+ + "error was '%s' \n", o.Name, err) + time.Sleep(15 * time.Second) + err = o.Output.Connect() + if err != nil { + return err + } + } + if p.Config.Poller.Debug { + log.Printf("Successfully connected to output: %s\n", o.Name) + } + } + return nil +} + +// Close closes the connection to all configured outputs +func (p *Poller) Close() error { + var err error + for _, o := range p.Config.Outputs { + err = o.Output.Close() + switch ot := o.Output.(type) { + case telegraf.ServiceOutput: + ot.Stop() + } + } + // TODO close AMQP connection + return err +} + +func panicRecover(input *internal_models.RunningInput) { + if err := recover(); err != nil { + trace := make([]byte, 2048) + runtime.Stack(trace, true) + log.Printf("FATAL: Input [%s] panicked: %s, Stack:\n%s\n", + input.Name, err, trace) + log.Println("PLEASE REPORT THIS PANIC ON GITHUB with " + + "stack trace, configuration, and OS information: " + + "https://github.com/influxdata/telegraf/issues/new") + } +} + +func (p *Poller) gather(input *internal_models.RunningInput, metricC chan telegraf.Metric) error { + defer panicRecover(input) + + var outerr error + start := time.Now() + acc := agent.NewAccumulator(input.Config, metricC) + acc.SetDebug(p.Config.Poller.Debug) + acc.SetDefaultTags(p.Config.Tags) + + if err := input.Input.Gather(acc); err != nil { + log.Printf("Error in input [%s]: %s", input.Name, err) + } + + elapsed := time.Since(start) + if !p.Config.Poller.Quiet { + log.Printf("Gathered metric, from polling, from %s in %s\n", + input.Name, elapsed) + } + + return outerr +} + +func (p *Poller) getInput(rawInput []byte) (*internal_models.RunningInput, error) { + // Transform rawInput from Message body to input plugin object + table, err := toml.Parse(rawInput) + if err != nil { + return nil, errors.New("invalid configuration") + } + + for name, val := range table.Fields { + subTable, ok := val.(*ast.Table) + if !ok { + return nil, errors.New("invalid configuration") + } + + switch name { + case "inputs", "plugins": + for pluginName, pluginVal := range subTable.Fields { + + name := pluginName + + var table *ast.Table + switch pluginSubTable := pluginVal.(type) { + case *ast.Table: + table = 
pluginSubTable + case []*ast.Table: + table = pluginSubTable[0] + // TODO handle this case + /* + for _, t := range pluginSubTable { + if err = c.addInput(pluginName, t); err != nil { + return err + } + }*/ + default: + return nil, fmt.Errorf("Unsupported config format: %s", + pluginName) + } + + // TODO factorize copy/paste from config/addInput + // Legacy support renaming io input to diskio + if name == "io" { + name = "diskio" + } + + creator, ok := inputs.Inputs[name] + if !ok { + return nil, fmt.Errorf("Undefined but requested input: %s", name) + } + input := creator() + + // If the input has a SetParser function, then this means it can accept + // arbitrary types of input, so build the parser and set it. + switch t := input.(type) { + case parsers.ParserInput: + parser, err := config.BuildParser(name, table) + if err != nil { + return nil, err + } + t.SetParser(parser) + } + + pluginConfig, err := config.BuildInput(name, table) + if err != nil { + return nil, err + } + + if err := influxconfig.UnmarshalTable(table, input); err != nil { + return nil, err + } + + rp := &internal_models.RunningInput{ + Name: name, + Input: input, + Config: pluginConfig, + } + + return rp, nil + } + default: + // TODO log bad conf + continue + } + } + return nil, nil +} + +// Test verifies that we can 'Gather' from all inputs with their configured +// Config struct +func (p *Poller) Test() error { + //TODO remove it ????? + shutdown := make(chan struct{}) + defer close(shutdown) + metricC := make(chan telegraf.Metric) + + // dummy receiver for the point channel + go func() { + for { + select { + case <-metricC: + // do nothing + case <-shutdown: + return + } + } + }() + + for _, input := range p.Config.Inputs { + acc := agent.NewAccumulator(input.Config, metricC) + acc.SetDebug(true) + + fmt.Printf("* Plugin: %s, Collection 1\n", input.Name) + if input.Config.Interval != 0 { + fmt.Printf("* Internal: %s\n", input.Config.Interval) + } + + if err := input.Input.Gather(acc); err != nil { + return err + } + + // Special instructions for some inputs. cpu, for example, needs to be + // run twice in order to return cpu usage percentages. + switch input.Name { + case "cpu", "mongodb", "procstat": + time.Sleep(500 * time.Millisecond) + fmt.Printf("* Plugin: %s, Collection 2\n", input.Name) + if err := input.Input.Gather(acc); err != nil { + return err + } + } + + } + return nil +} + +// flush writes a list of metrics to all configured outputs +func (p *Poller) flush() { + var wg sync.WaitGroup + + wg.Add(len(p.Config.Outputs)) + for _, o := range p.Config.Outputs { + go func(output *internal_models.RunningOutput) { + defer wg.Done() + err := output.Write() + if err != nil { + log.Printf("Error writing to output [%s]: %s\n", + output.Name, err.Error()) + } + }(o) + } + + wg.Wait() +} + +// flusher monitors the metrics input channel and flushes on the minimum interval +func (p *Poller) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error { + // Inelegant, but this sleep is to allow the Gather threads to run, so that + // the flusher will flush after metrics are collected. 
+ time.Sleep(time.Millisecond * 200) + + ticker := time.NewTicker(p.Config.Poller.FlushInterval.Duration) + + for { + select { + case <-shutdown: + log.Println("Hang on, flushing any cached metrics before shutdown") + p.flush() + return nil + case <-ticker.C: + p.flush() + case m := <-metricC: + for _, o := range p.Config.Outputs { + o.AddMetric(m) + } + } + } +} + +// Run runs the agent daemon, gathering every Interval +func (p *Poller) Run(shutdown chan struct{}) error { + var wg sync.WaitGroup + + p.Config.Agent.FlushInterval.Duration = agent.JitterInterval( + p.Config.Agent.FlushInterval.Duration, + p.Config.Agent.FlushJitter.Duration) + + log.Printf("Agent Config: Debug:%#v, Quiet:%#v, Hostname:%#v, "+ + "Flush Interval:%s \n", + p.Config.Poller.Debug, p.Config.Poller.Quiet, + p.Config.Poller.Hostname, p.Config.Poller.FlushInterval.Duration) + + log.Print("Message queue connection\n") + err := p.AMQPConnect() + if err == nil { + log.Print("Channel creation\n") + err = p.AMQPCreateChannel() + } + if err == nil { + log.Print("Message queue connected\n") + } else { + log.Printf("Message queue connection error: %s\n", err) + } + + // channel shared between all input threads for accumulating metrics + metricC := make(chan telegraf.Metric, 10000) + + wg.Add(1) + go func() { + defer wg.Done() + if err := p.flusher(shutdown, metricC); err != nil { + log.Printf("Flusher routine failed, exiting: %s\n", err.Error()) + close(shutdown) + } + }() + + defer wg.Wait() + + c := make(chan *amqp.Error) +reconnection: + for { + // We need to be sure that channel is open + // TODO handle channelS!!! + if p.AMQPchannel != nil && p.AMQPconn != nil { + select { + case <-shutdown: + return nil + + case rawTask := <-p.rawTasks: + go func(rawTask []byte) { + // Get input obj from message + input, err := p.getInput(rawTask) + if err != nil { + log.Printf(err.Error()) + } else { + // Do the check + if err := p.gather(input, metricC); err != nil { + log.Printf(err.Error()) + } + } + }(rawTask) + case err := <-p.AMQPconn.NotifyClose(c): + // handle connection errors + // and reconnections + log.Printf("Connection error: %s\n", err) + break reconnection + case err := <-p.AMQPchannel.NotifyClose(c): + // handle channel errors + // and reconnections + log.Printf("Channel error: %s\n", err) + break reconnection + } + } else { + break + } + } + + // Handle restart + log.Print("Message queue reconnection in 3 seconds\n") + ticker := time.NewTicker(time.Duration(3) * time.Second) + select { + case <-shutdown: + return nil + case <-ticker.C: + } + // Send shutdown signal to restart routines + log.Print("Shutdown signal send to routines\n") + shutdown <- struct{}{} + + return p.Run(shutdown) +} diff --git a/poller/poller_test.go b/poller/poller_test.go new file mode 100644 index 000000000..2cd71b32e --- /dev/null +++ b/poller/poller_test.go @@ -0,0 +1,207 @@ +package poller + +import ( + "github.com/stretchr/testify/assert" + "log" + "os" + "testing" + "time" + + "github.com/influxdata/telegraf/internal/config" + + // needing to load the plugins + _ "github.com/influxdata/telegraf/plugins/inputs/all" + // needing to load the outputs + _ "github.com/influxdata/telegraf/plugins/outputs/all" + + "github.com/streadway/amqp" +) + +func AMQPcreation() { + // Connect + connection, _ := amqp.Dial("amqp://guest:guest@127.0.0.1:5673/") + // Channel + channel, _ := connection.Channel() + + // SPLIT_BY_TAGS_EXCHANGE + channel.ExchangeDeclare( + "SPLIT_BY_TAGS_EXCHANGE", // name + "headers", // type + true, // durable + false, // delete 
when unused + false, // internal + false, // no-wait + nil, // arguments + ) + // ENTRANCE + channel.ExchangeDeclare( + "ENTRANCE", // name + "fanout", // type + true, // durable + false, // delete when unused + false, // internal + false, // no-wait + nil, // arguments + ) + channel.ExchangeBind( + "SPLIT_BY_TAGS_EXCHANGE", + "", + "ENTRANCE", + false, + nil, + ) + //REQUEUE_AFTER_WAIT_EXCHANGE + channel.ExchangeDeclare( + "REQUEUE_AFTER_WAIT_EXCHANGE", + "fanout", // type + true, // durable + false, // delete when unused + false, // internal + false, // no-wait + nil, // arguments + ) + channel.ExchangeBind( + "SPLIT_BY_TAGS_EXCHANGE", + "", + "REQUEUE_AFTER_WAIT_EXCHANGE", + false, + nil, + ) + //SPLIT_BY_INTERVAL_EXCHANGE + channel.ExchangeDeclare( + "SPLIT_BY_INTERVAL_EXCHANGE", // name + "headers", // type + true, // durable + false, // delete when unused + false, // internal + false, // no-wait + nil, // arguments + ) + // QUEUE wait_queue_5s + args := amqp.Table{} + args["x-dead-letter-exchange"] = "REQUEUE_AFTER_WAIT_EXCHANGE" + args["x-message-ttl"] = int64(5000) + channel.QueueDeclare( + "wait_queue_5s", + true, + false, + false, + false, + args, + ) + // Purge queue + channel.QueuePurge("wait_queue_5s", false) + args = amqp.Table{} + args["interval"] = "5000" + args["x-match"] = "all" + channel.QueueBind( + "wait_queue_5s", + "", + "SPLIT_BY_INTERVAL_EXCHANGE", + false, + args, + ) + // QUEUE label1 + args = amqp.Table{} + args["x-dead-letter-exchange"] = "SPLIT_BY_INTERVAL_EXCHANGE" + channel.QueueDeclare( + "label1", + true, + false, + false, + false, + args, + ) + // Purge queue + channel.QueuePurge("label1", false) + args = amqp.Table{} + args["label1"] = "value1" + args["x-match"] = "all" + channel.QueueBind( + "label1", + "", + "SPLIT_BY_TAGS_EXCHANGE", + false, + args, + ) + // Create tasks + msg := amqp.Publishing{ + Headers: amqp.Table{ + "interval": "5000", + "label1": "value1", + }, + Body: []byte("[[inputs.mem]]"), + } + channel.Publish( + "ENTRANCE", + "", + false, + false, + msg, + ) +} + +func TestPoller_Reconnection(t *testing.T) { + // Remove old file + os.Remove("/tmp/metrics.out") + // Create conf + c := config.NewConfig() + err := c.LoadConfig("../internal/config/testdata/telegraf-poller-bad.toml") + assert.NoError(t, err) + p, _ := NewPoller(c) + // Connect output + p.Connect() + // Prepare shutdown + shutdown := make(chan struct{}) + go func(sdchan chan struct{}) { + log.Println("Waiting 10s before shuting down poller") + time.Sleep(time.Duration(5) * time.Second) + sdchan <- struct{}{} + close(sdchan) + }(shutdown) + err = p.Run(shutdown) + assert.NoError(t, err) + p.Config.Outputs[0].Output.Close() + // Check output + f, err := os.Open("/tmp/metrics.out") + assert.NoError(t, err) + //if err != nil { + finfo, _ := f.Stat() + assert.True(t, finfo.Size() == int64(0)) + f.Close() + //} + +} + +func TestPoller_Polling(t *testing.T) { + // Remove old file + os.Remove("/tmp/metrics.out") + // Create AMQP flow + AMQPcreation() + // Create conf + c := config.NewConfig() + err := c.LoadConfig("../internal/config/testdata/telegraf-poller.toml") + assert.NoError(t, err) + p, _ := NewPoller(c) + // Connect output + p.Connect() + // Prepare shutdown + shutdown := make(chan struct{}) + go func(sdchan chan struct{}) { + log.Println("Waiting 10s before shuting down poller") + time.Sleep(time.Duration(10) * time.Second) + // time.Sleep(time.Duration(5) * time.Second) + sdchan <- struct{}{} + //p.Config.Outputs[0].Output.Close() + // p.Close() + close(sdchan) + }(shutdown) 
+ err = p.Run(shutdown) + assert.NoError(t, err) + p.Config.Outputs[0].Output.Close() + // Check output + f, _ := os.Open("/tmp/metrics.out") + finfo, _ := f.Stat() + assert.True(t, finfo.Size() > int64(0)) + f.Close() +}
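
Usage note (appended, not part of the patch): the new poller mode consumes input-plugin
definitions from RabbitMQ queues named after the configured AMQPlabels, parses each
message body as a TOML [[inputs.*]] block via Poller.getInput(), runs a single gather,
and flushes the results to the configured outputs. The sketch below shows how a
scheduler might enqueue one task for the poller. It mirrors the fixture in
poller_test.go (ENTRANCE fanout exchange, "interval" and "label1" headers,
"[[inputs.mem]]" body); the exchange name, header keys, and broker URL are assumptions
taken from that test topology, not an API defined by this patch.

    package main

    import (
    	"log"

    	"github.com/streadway/amqp"
    )

    func main() {
    	// Connect to the same broker the poller points at via AMQPurl.
    	conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer conn.Close()

    	ch, err := conn.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer ch.Close()

    	// Publish one task: the body is a TOML input definition that the
    	// poller parses and gathers once before flushing to its outputs.
    	task := amqp.Publishing{
    		Headers: amqp.Table{
    			"interval": "5000",   // routing header used by the test topology (assumed)
    			"label1":   "value1", // must match a binding for one of the poller's AMQPlabels queues
    		},
    		Body: []byte("[[inputs.mem]]"),
    	}
    	if err := ch.Publish("ENTRANCE", "", false, false, task); err != nil {
    		log.Fatal(err)
    	}
    }

With the example [poller] section from internal/config/testdata/telegraf-poller.toml,
a poller instance subscribed to the "label1" queue would pick this message up,
run the mem input once, and write the metrics through its configured outputs.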