Fix address already in use with webhooks input during reload (#3206)
This commit is contained in:
parent
8e333492f8
commit
5079187fde
|
@ -3,6 +3,7 @@ package webhooks
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
|
||||||
|
@ -33,6 +34,8 @@ type Webhooks struct {
|
||||||
Mandrill *mandrill.MandrillWebhook
|
Mandrill *mandrill.MandrillWebhook
|
||||||
Rollbar *rollbar.RollbarWebhook
|
Rollbar *rollbar.RollbarWebhook
|
||||||
Papertrail *papertrail.PapertrailWebhook
|
Papertrail *papertrail.PapertrailWebhook
|
||||||
|
|
||||||
|
srv *http.Server
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWebhooks() *Webhooks {
|
func NewWebhooks() *Webhooks {
|
||||||
|
@ -70,19 +73,6 @@ func (wb *Webhooks) Gather(_ telegraf.Accumulator) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wb *Webhooks) Listen(acc telegraf.Accumulator) {
|
|
||||||
r := mux.NewRouter()
|
|
||||||
|
|
||||||
for _, webhook := range wb.AvailableWebhooks() {
|
|
||||||
webhook.Register(r, acc)
|
|
||||||
}
|
|
||||||
|
|
||||||
err := http.ListenAndServe(fmt.Sprintf("%s", wb.ServiceAddress), r)
|
|
||||||
if err != nil {
|
|
||||||
acc.AddError(fmt.Errorf("E! Error starting server: %v", err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Looks for fields which implement Webhook interface
|
// Looks for fields which implement Webhook interface
|
||||||
func (wb *Webhooks) AvailableWebhooks() []Webhook {
|
func (wb *Webhooks) AvailableWebhooks() []Webhook {
|
||||||
webhooks := make([]Webhook, 0)
|
webhooks := make([]Webhook, 0)
|
||||||
|
@ -105,11 +95,35 @@ func (wb *Webhooks) AvailableWebhooks() []Webhook {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wb *Webhooks) Start(acc telegraf.Accumulator) error {
|
func (wb *Webhooks) Start(acc telegraf.Accumulator) error {
|
||||||
go wb.Listen(acc)
|
r := mux.NewRouter()
|
||||||
|
|
||||||
|
for _, webhook := range wb.AvailableWebhooks() {
|
||||||
|
webhook.Register(r, acc)
|
||||||
|
}
|
||||||
|
|
||||||
|
wb.srv = &http.Server{Handler: r}
|
||||||
|
|
||||||
|
ln, err := net.Listen("tcp", fmt.Sprintf("%s", wb.ServiceAddress))
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("E! Error starting server: %v", err)
|
||||||
|
return err
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if err := wb.srv.Serve(ln); err != nil {
|
||||||
|
if err != http.ErrServerClosed {
|
||||||
|
acc.AddError(fmt.Errorf("E! Error listening: %v", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
log.Printf("I! Started the webhooks service on %s\n", wb.ServiceAddress)
|
log.Printf("I! Started the webhooks service on %s\n", wb.ServiceAddress)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rb *Webhooks) Stop() {
|
func (rb *Webhooks) Stop() {
|
||||||
|
rb.srv.Close()
|
||||||
log.Println("I! Stopping the Webhooks service")
|
log.Println("I! Stopping the Webhooks service")
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue