diff --git a/plugins/inputs/webhooks/webhooks.go b/plugins/inputs/webhooks/webhooks.go index 9a86b7eff..698cde159 100644 --- a/plugins/inputs/webhooks/webhooks.go +++ b/plugins/inputs/webhooks/webhooks.go @@ -3,6 +3,7 @@ package webhooks import ( "fmt" "log" + "net" "net/http" "reflect" @@ -33,6 +34,8 @@ type Webhooks struct { Mandrill *mandrill.MandrillWebhook Rollbar *rollbar.RollbarWebhook Papertrail *papertrail.PapertrailWebhook + + srv *http.Server } func NewWebhooks() *Webhooks { @@ -70,19 +73,6 @@ func (wb *Webhooks) Gather(_ telegraf.Accumulator) error { return nil } -func (wb *Webhooks) Listen(acc telegraf.Accumulator) { - r := mux.NewRouter() - - for _, webhook := range wb.AvailableWebhooks() { - webhook.Register(r, acc) - } - - err := http.ListenAndServe(fmt.Sprintf("%s", wb.ServiceAddress), r) - if err != nil { - acc.AddError(fmt.Errorf("E! Error starting server: %v", err)) - } -} - // Looks for fields which implement Webhook interface func (wb *Webhooks) AvailableWebhooks() []Webhook { webhooks := make([]Webhook, 0) @@ -105,11 +95,35 @@ func (wb *Webhooks) AvailableWebhooks() []Webhook { } func (wb *Webhooks) Start(acc telegraf.Accumulator) error { - go wb.Listen(acc) + r := mux.NewRouter() + + for _, webhook := range wb.AvailableWebhooks() { + webhook.Register(r, acc) + } + + wb.srv = &http.Server{Handler: r} + + ln, err := net.Listen("tcp", fmt.Sprintf("%s", wb.ServiceAddress)) + if err != nil { + log.Fatalf("E! Error starting server: %v", err) + return err + + } + + go func() { + if err := wb.srv.Serve(ln); err != nil { + if err != http.ErrServerClosed { + acc.AddError(fmt.Errorf("E! Error listening: %v", err)) + } + } + }() + log.Printf("I! Started the webhooks service on %s\n", wb.ServiceAddress) + return nil } func (rb *Webhooks) Stop() { + rb.srv.Close() log.Println("I! Stopping the Webhooks service") }