diff --git a/agent/agent.go b/agent/agent.go index 6b6714760..4ce7a3aab 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -14,6 +14,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/config" "github.com/influxdata/telegraf/internal/models" + "github.com/influxdata/telegraf/internal/webserver" ) // Agent runs telegraf and collects data based on the given config @@ -304,6 +305,13 @@ func jitterInterval(ininterval, injitter time.Duration) time.Duration { return outinterval } +func createAccumulatorForInput(a *Agent, input *internal_models.RunningInput, metricC chan telegraf.Metric) *accumulator { + acc := NewAccumulator(input.Config, metricC) + acc.SetDebug(a.Config.Agent.Debug) + acc.setDefaultTags(a.Config.Tags) + return acc +} + // Run runs the agent daemon, gathering every Interval func (a *Agent) Run(shutdown chan struct{}) error { var wg sync.WaitGroup @@ -319,22 +327,30 @@ func (a *Agent) Run(shutdown chan struct{}) error { // channel shared between all input threads for accumulating metrics metricC := make(chan telegraf.Metric, 10000) + webserver := webserver.NewWebserver() + webserver.ServiceAddress = ":1619" for _, input := range a.Config.Inputs { // Start service of any ServicePlugins switch p := input.Input.(type) { case telegraf.ServiceInput: - acc := NewAccumulator(input.Config, metricC) - acc.SetDebug(a.Config.Agent.Debug) - acc.setDefaultTags(a.Config.Tags) + acc := createAccumulatorForInput(a, input, metricC) if err := p.Start(acc); err != nil { log.Printf("Service for input %s failed to start, exiting\n%s\n", input.Name, err.Error()) return err } defer p.Stop() + case telegraf.WebhookInput: + acc := createAccumulatorForInput(a, input, metricC) + if err := p.Register(webserver.Router(), acc); err != nil { + log.Printf("Webhook for input %s failed to start, exiting\n%s\n", + input.Name, err.Error()) + return err + } } } + webserver.Start() // Round collection to nearest interval by sleeping if a.Config.Agent.RoundInterval { diff --git a/input.go b/input.go index f7e1493e2..db799023a 100644 --- a/input.go +++ b/input.go @@ -1,5 +1,9 @@ package telegraf +import ( + "github.com/gorilla/mux" +) + type Input interface { // SampleConfig returns the default configuration of the Input SampleConfig() string @@ -29,3 +33,18 @@ type ServiceInput interface { // Stop stops the services and closes any necessary channels and connections Stop() } + +type WebhookInput interface { + // SampleConfig returns the default configuration of the Input + SampleConfig() string + + // Description returns a one-sentence description on the Input + Description() string + + // Gather takes in an accumulator and adds the metrics that the Input + // gathers. This is called every "interval" + Gather(Accumulator) error + + // Register + Register(*mux.Router, Accumulator) error +} diff --git a/internal/webserver/webserver.go b/internal/webserver/webserver.go new file mode 100644 index 000000000..07909c339 --- /dev/null +++ b/internal/webserver/webserver.go @@ -0,0 +1,35 @@ +package webserver + +import ( + "fmt" + "log" + "net/http" + + "github.com/gorilla/mux" +) + +type Webserver struct { + ServiceAddress string + router *mux.Router +} + +func NewWebserver() *Webserver { + return &Webserver{router:mux.NewRouter()} +} + +func (wb *Webserver) Router() *mux.Router { + return wb.router +} + +func (wb *Webserver) Listen() { + err := http.ListenAndServe(fmt.Sprintf("%s", wb.ServiceAddress), wb.router) + if err != nil { + log.Printf("Error starting server: %v", err) + } +} + +func (wb *Webserver) Start() error { + go wb.Listen() + log.Printf("Started the webhook server on %s\n", wb.ServiceAddress) + return nil +}