Add webhook type support.

This commit is contained in:
François de Metz 2016-05-25 19:38:25 +02:00 committed by Cyril Duez
parent c2797c85d1
commit 2221344028
3 changed files with 73 additions and 3 deletions

View File

@ -14,6 +14,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/config" "github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/internal/models" "github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/internal/webserver"
) )
// Agent runs telegraf and collects data based on the given config // Agent runs telegraf and collects data based on the given config
@ -304,6 +305,13 @@ func jitterInterval(ininterval, injitter time.Duration) time.Duration {
return outinterval return outinterval
} }
func createAccumulatorForInput(a *Agent, input *internal_models.RunningInput, metricC chan telegraf.Metric) *accumulator {
acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
acc.setDefaultTags(a.Config.Tags)
return acc
}
// Run runs the agent daemon, gathering every Interval // Run runs the agent daemon, gathering every Interval
func (a *Agent) Run(shutdown chan struct{}) error { func (a *Agent) Run(shutdown chan struct{}) error {
var wg sync.WaitGroup var wg sync.WaitGroup
@ -319,22 +327,30 @@ func (a *Agent) Run(shutdown chan struct{}) error {
// channel shared between all input threads for accumulating metrics // channel shared between all input threads for accumulating metrics
metricC := make(chan telegraf.Metric, 10000) metricC := make(chan telegraf.Metric, 10000)
webserver := webserver.NewWebserver()
webserver.ServiceAddress = ":1619"
for _, input := range a.Config.Inputs { for _, input := range a.Config.Inputs {
// Start service of any ServicePlugins // Start service of any ServicePlugins
switch p := input.Input.(type) { switch p := input.Input.(type) {
case telegraf.ServiceInput: case telegraf.ServiceInput:
acc := NewAccumulator(input.Config, metricC) acc := createAccumulatorForInput(a, input, metricC)
acc.SetDebug(a.Config.Agent.Debug)
acc.setDefaultTags(a.Config.Tags)
if err := p.Start(acc); err != nil { if err := p.Start(acc); err != nil {
log.Printf("Service for input %s failed to start, exiting\n%s\n", log.Printf("Service for input %s failed to start, exiting\n%s\n",
input.Name, err.Error()) input.Name, err.Error())
return err return err
} }
defer p.Stop() defer p.Stop()
case telegraf.WebhookInput:
acc := createAccumulatorForInput(a, input, metricC)
if err := p.Register(webserver.Router(), acc); err != nil {
log.Printf("Webhook for input %s failed to start, exiting\n%s\n",
input.Name, err.Error())
return err
} }
} }
}
webserver.Start()
// Round collection to nearest interval by sleeping // Round collection to nearest interval by sleeping
if a.Config.Agent.RoundInterval { if a.Config.Agent.RoundInterval {

View File

@ -1,5 +1,9 @@
package telegraf package telegraf
import (
"github.com/gorilla/mux"
)
type Input interface { type Input interface {
// SampleConfig returns the default configuration of the Input // SampleConfig returns the default configuration of the Input
SampleConfig() string SampleConfig() string
@ -29,3 +33,18 @@ type ServiceInput interface {
// Stop stops the services and closes any necessary channels and connections // Stop stops the services and closes any necessary channels and connections
Stop() Stop()
} }
type WebhookInput interface {
// SampleConfig returns the default configuration of the Input
SampleConfig() string
// Description returns a one-sentence description on the Input
Description() string
// Gather takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval"
Gather(Accumulator) error
// Register
Register(*mux.Router, Accumulator) error
}

View File

@ -0,0 +1,35 @@
package webserver
import (
"fmt"
"log"
"net/http"
"github.com/gorilla/mux"
)
type Webserver struct {
ServiceAddress string
router *mux.Router
}
func NewWebserver() *Webserver {
return &Webserver{router:mux.NewRouter()}
}
func (wb *Webserver) Router() *mux.Router {
return wb.router
}
func (wb *Webserver) Listen() {
err := http.ListenAndServe(fmt.Sprintf("%s", wb.ServiceAddress), wb.router)
if err != nil {
log.Printf("Error starting server: %v", err)
}
}
func (wb *Webserver) Start() error {
go wb.Listen()
log.Printf("Started the webhook server on %s\n", wb.ServiceAddress)
return nil
}