diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 1a386d97c..8c76718fa 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -19,7 +19,6 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch" _ "github.com/influxdata/telegraf/plugins/inputs/exec" _ "github.com/influxdata/telegraf/plugins/inputs/filestat" - _ "github.com/influxdata/telegraf/plugins/inputs/github_webhooks" _ "github.com/influxdata/telegraf/plugins/inputs/graylog" _ "github.com/influxdata/telegraf/plugins/inputs/haproxy" _ "github.com/influxdata/telegraf/plugins/inputs/http_response" @@ -56,7 +55,6 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/redis" _ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb" _ "github.com/influxdata/telegraf/plugins/inputs/riak" - _ "github.com/influxdata/telegraf/plugins/inputs/rollbar_webhooks" _ "github.com/influxdata/telegraf/plugins/inputs/sensors" _ "github.com/influxdata/telegraf/plugins/inputs/snmp" _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" @@ -69,6 +67,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/twemproxy" _ "github.com/influxdata/telegraf/plugins/inputs/udp_listener" _ "github.com/influxdata/telegraf/plugins/inputs/varnish" + _ "github.com/influxdata/telegraf/plugins/inputs/webhooks" _ "github.com/influxdata/telegraf/plugins/inputs/win_perf_counters" _ "github.com/influxdata/telegraf/plugins/inputs/zfs" _ "github.com/influxdata/telegraf/plugins/inputs/zookeeper" diff --git a/plugins/inputs/rollbar_webhooks/rollbar_webhooks.go b/plugins/inputs/rollbar_webhooks/rollbar_webhooks.go deleted file mode 100644 index 5e7dc8847..000000000 --- a/plugins/inputs/rollbar_webhooks/rollbar_webhooks.go +++ /dev/null @@ -1,119 +0,0 @@ -package rollbar_webhooks - -import ( - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "log" - "net/http" - "sync" - "time" - - "github.com/gorilla/mux" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/inputs" -) - -func init() { - inputs.Add("rollbar_webhooks", func() telegraf.Input { return NewRollbarWebhooks() }) -} - -type RollbarWebhooks struct { - ServiceAddress string - // Lock for the struct - sync.Mutex - // Events buffer to store events between Gather calls - events []Event -} - -func NewRollbarWebhooks() *RollbarWebhooks { - return &RollbarWebhooks{} -} - -func (rb *RollbarWebhooks) SampleConfig() string { - return ` - ## Address and port to host Webhook listener on - service_address = ":1619" -` -} - -func (rb *RollbarWebhooks) Description() string { - return "A Rollbar Webhook Event collector" -} - -func (rb *RollbarWebhooks) Gather(acc telegraf.Accumulator) error { - rb.Lock() - defer rb.Unlock() - for _, event := range rb.events { - acc.AddFields("rollbar_webhooks", event.Fields(), event.Tags(), time.Now()) - } - rb.events = make([]Event, 0) - return nil -} - -func (rb *RollbarWebhooks) Listen() { - r := mux.NewRouter() - r.HandleFunc("/", rb.eventHandler).Methods("POST") - err := http.ListenAndServe(fmt.Sprintf("%s", rb.ServiceAddress), r) - if err != nil { - log.Printf("Error starting server: %v", err) - } -} - -func (rb *RollbarWebhooks) Start(_ telegraf.Accumulator) error { - go rb.Listen() - log.Printf("Started the rollbar_webhooks service on %s\n", rb.ServiceAddress) - return nil -} - -func (rb *RollbarWebhooks) Stop() { - log.Println("Stopping the rbWebhooks service") -} - -func (rb *RollbarWebhooks) eventHandler(w http.ResponseWriter, r *http.Request) { - defer r.Body.Close() - data, err := ioutil.ReadAll(r.Body) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - dummyEvent := &DummyEvent{} - err = json.Unmarshal(data, dummyEvent) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - event, err := NewEvent(dummyEvent, data) - if err != nil { - w.WriteHeader(http.StatusOK) - return - } - - rb.Lock() - rb.events = append(rb.events, event) - rb.Unlock() - - w.WriteHeader(http.StatusOK) -} - -func generateEvent(event Event, data []byte) (Event, error) { - err := json.Unmarshal(data, event) - if err != nil { - return nil, err - } - return event, nil -} - -func NewEvent(dummyEvent *DummyEvent, data []byte) (Event, error) { - switch dummyEvent.EventName { - case "new_item": - return generateEvent(&NewItem{}, data) - case "deploy": - return generateEvent(&Deploy{}, data) - default: - return nil, errors.New("Not implemented type: " + dummyEvent.EventName) - } -} diff --git a/plugins/inputs/webhooks/README.md b/plugins/inputs/webhooks/README.md new file mode 100644 index 000000000..550338523 --- /dev/null +++ b/plugins/inputs/webhooks/README.md @@ -0,0 +1,2 @@ +# Webhooks plugin + diff --git a/plugins/inputs/github_webhooks/README.md b/plugins/inputs/webhooks/github/README.md similarity index 100% rename from plugins/inputs/github_webhooks/README.md rename to plugins/inputs/webhooks/github/README.md diff --git a/plugins/inputs/github_webhooks/github_webhooks.go b/plugins/inputs/webhooks/github/github_webhooks.go similarity index 61% rename from plugins/inputs/github_webhooks/github_webhooks.go rename to plugins/inputs/webhooks/github/github_webhooks.go index 9e8fc22cd..b0cf0bec9 100644 --- a/plugins/inputs/github_webhooks/github_webhooks.go +++ b/plugins/inputs/webhooks/github/github_webhooks.go @@ -1,74 +1,33 @@ -package github_webhooks +package github import ( "encoding/json" - "fmt" "io/ioutil" "log" "net/http" - "sync" "github.com/gorilla/mux" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/inputs/webhooks/webhooks_models" ) func init() { - inputs.Add("github_webhooks", func() telegraf.Input { return &GithubWebhooks{} }) + webhooks_models.Add("github", func(path string) webhooks_models.Webhook { return NewGithubWebhooks(path) }) } type GithubWebhooks struct { - ServiceAddress string - // Lock for the struct - sync.Mutex - // Events buffer to store events between Gather calls - events []Event + Path string + acc telegraf.Accumulator } -func NewGithubWebhooks() *GithubWebhooks { - return &GithubWebhooks{} +func NewGithubWebhooks(path string) *GithubWebhooks { + return &GithubWebhooks{Path: path} } -func (gh *GithubWebhooks) SampleConfig() string { - return ` - ## Address and port to host Webhook listener on - service_address = ":1618" -` -} - -func (gh *GithubWebhooks) Description() string { - return "A Github Webhook Event collector" -} - -// Writes the points from <-gh.in to the Accumulator -func (gh *GithubWebhooks) Gather(acc telegraf.Accumulator) error { - gh.Lock() - defer gh.Unlock() - for _, event := range gh.events { - p := event.NewMetric() - acc.AddFields("github_webhooks", p.Fields(), p.Tags(), p.Time()) - } - gh.events = make([]Event, 0) - return nil -} - -func (gh *GithubWebhooks) Listen() { - r := mux.NewRouter() - r.HandleFunc("/", gh.eventHandler).Methods("POST") - err := http.ListenAndServe(fmt.Sprintf("%s", gh.ServiceAddress), r) - if err != nil { - log.Printf("Error starting server: %v", err) - } -} - -func (gh *GithubWebhooks) Start(_ telegraf.Accumulator) error { - go gh.Listen() - log.Printf("Started the github_webhooks service on %s\n", gh.ServiceAddress) - return nil -} - -func (gh *GithubWebhooks) Stop() { - log.Println("Stopping the ghWebhooks service") +func (gh *GithubWebhooks) Register(router *mux.Router, acc telegraf.Accumulator) { + router.HandleFunc(gh.Path, gh.eventHandler).Methods("POST") + log.Printf("Started the webhooks_github on %s\n", gh.Path) + gh.acc = acc } // Handles the / route @@ -85,9 +44,10 @@ func (gh *GithubWebhooks) eventHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) return } - gh.Lock() - gh.events = append(gh.events, e) - gh.Unlock() + + p := e.NewMetric() + gh.acc.AddFields("github_webhooks", p.Fields(), p.Tags(), p.Time()) + w.WriteHeader(http.StatusOK) } diff --git a/plugins/inputs/github_webhooks/github_webhooks_mock_json.go b/plugins/inputs/webhooks/github/github_webhooks_mock_json.go similarity index 99% rename from plugins/inputs/github_webhooks/github_webhooks_mock_json.go rename to plugins/inputs/webhooks/github/github_webhooks_mock_json.go index 386d62e65..91af9a330 100644 --- a/plugins/inputs/github_webhooks/github_webhooks_mock_json.go +++ b/plugins/inputs/webhooks/github/github_webhooks_mock_json.go @@ -1,4 +1,4 @@ -package github_webhooks +package github func CommitCommentEventJSON() string { return `{ diff --git a/plugins/inputs/github_webhooks/github_webhooks_models.go b/plugins/inputs/webhooks/github/github_webhooks_models.go similarity index 99% rename from plugins/inputs/github_webhooks/github_webhooks_models.go rename to plugins/inputs/webhooks/github/github_webhooks_models.go index 2902708c2..9cbcef9f4 100644 --- a/plugins/inputs/github_webhooks/github_webhooks_models.go +++ b/plugins/inputs/webhooks/github/github_webhooks_models.go @@ -1,4 +1,4 @@ -package github_webhooks +package github import ( "fmt" diff --git a/plugins/inputs/github_webhooks/github_webhooks_test.go b/plugins/inputs/webhooks/github/github_webhooks_test.go similarity index 91% rename from plugins/inputs/github_webhooks/github_webhooks_test.go rename to plugins/inputs/webhooks/github/github_webhooks_test.go index a71d68548..4af51b536 100644 --- a/plugins/inputs/github_webhooks/github_webhooks_test.go +++ b/plugins/inputs/webhooks/github/github_webhooks_test.go @@ -1,15 +1,19 @@ -package github_webhooks +package github import ( "net/http" "net/http/httptest" "strings" "testing" + + "github.com/influxdata/telegraf/testutil" ) func GithubWebhookRequest(event string, jsonString string, t *testing.T) { - gh := NewGithubWebhooks() - req, _ := http.NewRequest("POST", "/", strings.NewReader(jsonString)) + var acc testutil.Accumulator + gh := NewGithubWebhooks("/github") + gh.acc = &acc + req, _ := http.NewRequest("POST", "/github", strings.NewReader(jsonString)) req.Header.Add("X-Github-Event", event) w := httptest.NewRecorder() gh.eventHandler(w, req) diff --git a/plugins/inputs/rollbar_webhooks/README.md b/plugins/inputs/webhooks/rollbar/README.md similarity index 100% rename from plugins/inputs/rollbar_webhooks/README.md rename to plugins/inputs/webhooks/rollbar/README.md diff --git a/plugins/inputs/webhooks/rollbar/rollbar_webhooks.go b/plugins/inputs/webhooks/rollbar/rollbar_webhooks.go new file mode 100644 index 000000000..985d5aa50 --- /dev/null +++ b/plugins/inputs/webhooks/rollbar/rollbar_webhooks.go @@ -0,0 +1,79 @@ +package rollbar + +import ( + "encoding/json" + "errors" + "io/ioutil" + "log" + "net/http" + "time" + + "github.com/gorilla/mux" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs/webhooks/webhooks_models" +) + +func init() { + webhooks_models.Add("rollbar", func(path string) webhooks_models.Webhook { return NewRollbarWebhooks(path) }) +} + +// FIXME: rename +type RollbarWebhooks struct { + Path string + acc telegraf.Accumulator +} + +func NewRollbarWebhooks(path string) *RollbarWebhooks { + return &RollbarWebhooks{Path: path} +} + +func (rb *RollbarWebhooks) Register(router *mux.Router, acc telegraf.Accumulator) { + router.HandleFunc(rb.Path, rb.eventHandler).Methods("POST") + log.Printf("Started the webhooks_rollbar on %s\n", rb.Path) + rb.acc = acc +} + +func (rb *RollbarWebhooks) eventHandler(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + data, err := ioutil.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + dummyEvent := &DummyEvent{} + err = json.Unmarshal(data, dummyEvent) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + event, err := NewEvent(dummyEvent, data) + if err != nil { + w.WriteHeader(http.StatusOK) + return + } + + rb.acc.AddFields("rollbar_webhooks", event.Fields(), event.Tags(), time.Now()) + + w.WriteHeader(http.StatusOK) +} + +func generateEvent(event Event, data []byte) (Event, error) { + err := json.Unmarshal(data, event) + if err != nil { + return nil, err + } + return event, nil +} + +func NewEvent(dummyEvent *DummyEvent, data []byte) (Event, error) { + switch dummyEvent.EventName { + case "new_item": + return generateEvent(&NewItem{}, data) + case "deploy": + return generateEvent(&Deploy{}, data) + default: + return nil, errors.New("Not implemented type: " + dummyEvent.EventName) + } +} diff --git a/plugins/inputs/rollbar_webhooks/rollbar_webhooks_events.go b/plugins/inputs/webhooks/rollbar/rollbar_webhooks_events.go similarity index 98% rename from plugins/inputs/rollbar_webhooks/rollbar_webhooks_events.go rename to plugins/inputs/webhooks/rollbar/rollbar_webhooks_events.go index 8cccec336..e40e95858 100644 --- a/plugins/inputs/rollbar_webhooks/rollbar_webhooks_events.go +++ b/plugins/inputs/webhooks/rollbar/rollbar_webhooks_events.go @@ -1,4 +1,4 @@ -package rollbar_webhooks +package rollbar import "strconv" diff --git a/plugins/inputs/rollbar_webhooks/rollbar_webhooks_events_json_test.go b/plugins/inputs/webhooks/rollbar/rollbar_webhooks_events_json_test.go similarity index 98% rename from plugins/inputs/rollbar_webhooks/rollbar_webhooks_events_json_test.go rename to plugins/inputs/webhooks/rollbar/rollbar_webhooks_events_json_test.go index 99a6db8ff..5244a9d2f 100644 --- a/plugins/inputs/rollbar_webhooks/rollbar_webhooks_events_json_test.go +++ b/plugins/inputs/webhooks/rollbar/rollbar_webhooks_events_json_test.go @@ -1,4 +1,4 @@ -package rollbar_webhooks +package rollbar func NewItemJSON() string { return ` diff --git a/plugins/inputs/rollbar_webhooks/rollbar_webhooks_test.go b/plugins/inputs/webhooks/rollbar/rollbar_webhooks_test.go similarity index 90% rename from plugins/inputs/rollbar_webhooks/rollbar_webhooks_test.go rename to plugins/inputs/webhooks/rollbar/rollbar_webhooks_test.go index e0b183a8c..eda94ef74 100644 --- a/plugins/inputs/rollbar_webhooks/rollbar_webhooks_test.go +++ b/plugins/inputs/webhooks/rollbar/rollbar_webhooks_test.go @@ -1,4 +1,4 @@ -package rollbar_webhooks +package rollbar import ( "net/http" @@ -21,12 +21,12 @@ func postWebhooks(rb *RollbarWebhooks, eventBody string) *httptest.ResponseRecor func TestNewItem(t *testing.T) { var acc testutil.Accumulator - rb := NewRollbarWebhooks() + rb := NewRollbarWebhooks("/rollbar") + rb.acc = &acc resp := postWebhooks(rb, NewItemJSON()) if resp.Code != http.StatusOK { t.Errorf("POST new_item returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) } - rb.Gather(&acc) fields := map[string]interface{}{ "id": 272716944, @@ -45,12 +45,12 @@ func TestNewItem(t *testing.T) { func TestDeploy(t *testing.T) { var acc testutil.Accumulator - rb := NewRollbarWebhooks() + rb := NewRollbarWebhooks("/rollbar") + rb.acc = &acc resp := postWebhooks(rb, DeployJSON()) if resp.Code != http.StatusOK { t.Errorf("POST deploy returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) } - rb.Gather(&acc) fields := map[string]interface{}{ "id": 187585, @@ -66,7 +66,7 @@ func TestDeploy(t *testing.T) { } func TestUnknowItem(t *testing.T) { - rb := NewRollbarWebhooks() + rb := NewRollbarWebhooks("/rollbar") resp := postWebhooks(rb, UnknowJSON()) if resp.Code != http.StatusOK { t.Errorf("POST unknow returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) diff --git a/plugins/inputs/webhooks/webhooks.go b/plugins/inputs/webhooks/webhooks.go new file mode 100644 index 000000000..d0ac30ba9 --- /dev/null +++ b/plugins/inputs/webhooks/webhooks.go @@ -0,0 +1,73 @@ +package webhooks + +import ( + "fmt" + "log" + "net/http" + + "github.com/gorilla/mux" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + _ "github.com/influxdata/telegraf/plugins/inputs/webhooks/webhooks_all" + "github.com/influxdata/telegraf/plugins/inputs/webhooks/webhooks_models" +) + +func init() { + inputs.Add("webhooks", func() telegraf.Input { return NewWebhooks() }) +} + +type Webhooks struct { + ServiceAddress string + + Webhook []WebhookConfig +} + +type WebhookConfig struct { + Name string + Path string +} + +func NewWebhooks() *Webhooks { + return &Webhooks{} +} + +func (wb *Webhooks) SampleConfig() string { + return ` + ## Address and port to host Webhook listener on + service_address = ":1619" +` +} + +func (wb *Webhooks) Description() string { + return "A Webhooks Event collector" +} + +func (wb *Webhooks) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (wb *Webhooks) Listen(acc telegraf.Accumulator) { + r := mux.NewRouter() + for _, webhook := range wb.Webhook { + if plugin, ok := webhooks_models.Webhooks[webhook.Name]; ok { + sub := plugin(webhook.Path) + sub.Register(r, acc) + } else { + log.Printf("Webhook %s is unknow\n", webhook.Name) + } + } + err := http.ListenAndServe(fmt.Sprintf("%s", wb.ServiceAddress), r) + if err != nil { + log.Printf("Error starting server: %v", err) + } +} + +func (wb *Webhooks) Start(acc telegraf.Accumulator) error { + go wb.Listen(acc) + log.Printf("Started the webhooks service on %s\n", wb.ServiceAddress) + return nil +} + +func (rb *Webhooks) Stop() { + log.Println("Stopping the Webhooks service") +} diff --git a/plugins/inputs/webhooks/webhooks_all/all.go b/plugins/inputs/webhooks/webhooks_all/all.go new file mode 100644 index 000000000..b60566608 --- /dev/null +++ b/plugins/inputs/webhooks/webhooks_all/all.go @@ -0,0 +1,6 @@ +package webhooks_all + +import ( + _ "github.com/influxdata/telegraf/plugins/inputs/webhooks/github" + _ "github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar" +) diff --git a/plugins/inputs/webhooks/webhooks_models/models.go b/plugins/inputs/webhooks/webhooks_models/models.go new file mode 100644 index 000000000..b7a4c76f7 --- /dev/null +++ b/plugins/inputs/webhooks/webhooks_models/models.go @@ -0,0 +1,16 @@ +package webhooks_models + +import ( + "github.com/gorilla/mux" + "github.com/influxdata/telegraf" +) + +type Webhook interface { + Register(router *mux.Router, acc telegraf.Accumulator) +} + +var Webhooks map[string]func(string) Webhook = make(map[string]func(string) Webhook) + +func Add(name string, fun func(string) Webhook) { + Webhooks[name] = fun +}