Add new webhooks plugin that superseed github and rollbar plugins.

Signed-off-by: François de Metz <francois@stormz.me>
Signed-off-by: Cyril Duez <cyril@stormz.me>
This commit is contained in:
François de Metz 2016-05-27 17:27:54 +02:00
parent 8173338f8a
commit 6775719975
16 changed files with 209 additions and 189 deletions

View File

@ -19,7 +19,6 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch"
_ "github.com/influxdata/telegraf/plugins/inputs/exec"
_ "github.com/influxdata/telegraf/plugins/inputs/filestat"
_ "github.com/influxdata/telegraf/plugins/inputs/github_webhooks"
_ "github.com/influxdata/telegraf/plugins/inputs/graylog"
_ "github.com/influxdata/telegraf/plugins/inputs/haproxy"
_ "github.com/influxdata/telegraf/plugins/inputs/http_response"
@ -56,7 +55,6 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/redis"
_ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb"
_ "github.com/influxdata/telegraf/plugins/inputs/riak"
_ "github.com/influxdata/telegraf/plugins/inputs/rollbar_webhooks"
_ "github.com/influxdata/telegraf/plugins/inputs/sensors"
_ "github.com/influxdata/telegraf/plugins/inputs/snmp"
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
@ -69,6 +67,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/twemproxy"
_ "github.com/influxdata/telegraf/plugins/inputs/udp_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/varnish"
_ "github.com/influxdata/telegraf/plugins/inputs/webhooks"
_ "github.com/influxdata/telegraf/plugins/inputs/win_perf_counters"
_ "github.com/influxdata/telegraf/plugins/inputs/zfs"
_ "github.com/influxdata/telegraf/plugins/inputs/zookeeper"

View File

@ -1,119 +0,0 @@
package rollbar_webhooks
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/mux"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
func init() {
inputs.Add("rollbar_webhooks", func() telegraf.Input { return NewRollbarWebhooks() })
}
type RollbarWebhooks struct {
ServiceAddress string
// Lock for the struct
sync.Mutex
// Events buffer to store events between Gather calls
events []Event
}
func NewRollbarWebhooks() *RollbarWebhooks {
return &RollbarWebhooks{}
}
func (rb *RollbarWebhooks) SampleConfig() string {
return `
## Address and port to host Webhook listener on
service_address = ":1619"
`
}
func (rb *RollbarWebhooks) Description() string {
return "A Rollbar Webhook Event collector"
}
func (rb *RollbarWebhooks) Gather(acc telegraf.Accumulator) error {
rb.Lock()
defer rb.Unlock()
for _, event := range rb.events {
acc.AddFields("rollbar_webhooks", event.Fields(), event.Tags(), time.Now())
}
rb.events = make([]Event, 0)
return nil
}
func (rb *RollbarWebhooks) Listen() {
r := mux.NewRouter()
r.HandleFunc("/", rb.eventHandler).Methods("POST")
err := http.ListenAndServe(fmt.Sprintf("%s", rb.ServiceAddress), r)
if err != nil {
log.Printf("Error starting server: %v", err)
}
}
func (rb *RollbarWebhooks) Start(_ telegraf.Accumulator) error {
go rb.Listen()
log.Printf("Started the rollbar_webhooks service on %s\n", rb.ServiceAddress)
return nil
}
func (rb *RollbarWebhooks) Stop() {
log.Println("Stopping the rbWebhooks service")
}
func (rb *RollbarWebhooks) eventHandler(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
data, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
dummyEvent := &DummyEvent{}
err = json.Unmarshal(data, dummyEvent)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
event, err := NewEvent(dummyEvent, data)
if err != nil {
w.WriteHeader(http.StatusOK)
return
}
rb.Lock()
rb.events = append(rb.events, event)
rb.Unlock()
w.WriteHeader(http.StatusOK)
}
func generateEvent(event Event, data []byte) (Event, error) {
err := json.Unmarshal(data, event)
if err != nil {
return nil, err
}
return event, nil
}
func NewEvent(dummyEvent *DummyEvent, data []byte) (Event, error) {
switch dummyEvent.EventName {
case "new_item":
return generateEvent(&NewItem{}, data)
case "deploy":
return generateEvent(&Deploy{}, data)
default:
return nil, errors.New("Not implemented type: " + dummyEvent.EventName)
}
}

View File

@ -0,0 +1,2 @@
# Webhooks plugin

View File

@ -1,74 +1,33 @@
package github_webhooks
package github
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"
"github.com/gorilla/mux"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/webhooks/webhooks_models"
)
func init() {
inputs.Add("github_webhooks", func() telegraf.Input { return &GithubWebhooks{} })
webhooks_models.Add("github", func(path string) webhooks_models.Webhook { return NewGithubWebhooks(path) })
}
type GithubWebhooks struct {
ServiceAddress string
// Lock for the struct
sync.Mutex
// Events buffer to store events between Gather calls
events []Event
Path string
acc telegraf.Accumulator
}
func NewGithubWebhooks() *GithubWebhooks {
return &GithubWebhooks{}
func NewGithubWebhooks(path string) *GithubWebhooks {
return &GithubWebhooks{Path: path}
}
func (gh *GithubWebhooks) SampleConfig() string {
return `
## Address and port to host Webhook listener on
service_address = ":1618"
`
}
func (gh *GithubWebhooks) Description() string {
return "A Github Webhook Event collector"
}
// Writes the points from <-gh.in to the Accumulator
func (gh *GithubWebhooks) Gather(acc telegraf.Accumulator) error {
gh.Lock()
defer gh.Unlock()
for _, event := range gh.events {
p := event.NewMetric()
acc.AddFields("github_webhooks", p.Fields(), p.Tags(), p.Time())
}
gh.events = make([]Event, 0)
return nil
}
func (gh *GithubWebhooks) Listen() {
r := mux.NewRouter()
r.HandleFunc("/", gh.eventHandler).Methods("POST")
err := http.ListenAndServe(fmt.Sprintf("%s", gh.ServiceAddress), r)
if err != nil {
log.Printf("Error starting server: %v", err)
}
}
func (gh *GithubWebhooks) Start(_ telegraf.Accumulator) error {
go gh.Listen()
log.Printf("Started the github_webhooks service on %s\n", gh.ServiceAddress)
return nil
}
func (gh *GithubWebhooks) Stop() {
log.Println("Stopping the ghWebhooks service")
func (gh *GithubWebhooks) Register(router *mux.Router, acc telegraf.Accumulator) {
router.HandleFunc(gh.Path, gh.eventHandler).Methods("POST")
log.Printf("Started the webhooks_github on %s\n", gh.Path)
gh.acc = acc
}
// Handles the / route
@ -85,9 +44,10 @@ func (gh *GithubWebhooks) eventHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
return
}
gh.Lock()
gh.events = append(gh.events, e)
gh.Unlock()
p := e.NewMetric()
gh.acc.AddFields("github_webhooks", p.Fields(), p.Tags(), p.Time())
w.WriteHeader(http.StatusOK)
}

View File

@ -1,4 +1,4 @@
package github_webhooks
package github
func CommitCommentEventJSON() string {
return `{

View File

@ -1,4 +1,4 @@
package github_webhooks
package github
import (
"fmt"

View File

@ -1,15 +1,19 @@
package github_webhooks
package github
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/influxdata/telegraf/testutil"
)
func GithubWebhookRequest(event string, jsonString string, t *testing.T) {
gh := NewGithubWebhooks()
req, _ := http.NewRequest("POST", "/", strings.NewReader(jsonString))
var acc testutil.Accumulator
gh := NewGithubWebhooks("/github")
gh.acc = &acc
req, _ := http.NewRequest("POST", "/github", strings.NewReader(jsonString))
req.Header.Add("X-Github-Event", event)
w := httptest.NewRecorder()
gh.eventHandler(w, req)

View File

@ -0,0 +1,79 @@
package rollbar
import (
"encoding/json"
"errors"
"io/ioutil"
"log"
"net/http"
"time"
"github.com/gorilla/mux"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs/webhooks/webhooks_models"
)
func init() {
webhooks_models.Add("rollbar", func(path string) webhooks_models.Webhook { return NewRollbarWebhooks(path) })
}
// FIXME: rename
type RollbarWebhooks struct {
Path string
acc telegraf.Accumulator
}
func NewRollbarWebhooks(path string) *RollbarWebhooks {
return &RollbarWebhooks{Path: path}
}
func (rb *RollbarWebhooks) Register(router *mux.Router, acc telegraf.Accumulator) {
router.HandleFunc(rb.Path, rb.eventHandler).Methods("POST")
log.Printf("Started the webhooks_rollbar on %s\n", rb.Path)
rb.acc = acc
}
func (rb *RollbarWebhooks) eventHandler(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
data, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
dummyEvent := &DummyEvent{}
err = json.Unmarshal(data, dummyEvent)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
event, err := NewEvent(dummyEvent, data)
if err != nil {
w.WriteHeader(http.StatusOK)
return
}
rb.acc.AddFields("rollbar_webhooks", event.Fields(), event.Tags(), time.Now())
w.WriteHeader(http.StatusOK)
}
func generateEvent(event Event, data []byte) (Event, error) {
err := json.Unmarshal(data, event)
if err != nil {
return nil, err
}
return event, nil
}
func NewEvent(dummyEvent *DummyEvent, data []byte) (Event, error) {
switch dummyEvent.EventName {
case "new_item":
return generateEvent(&NewItem{}, data)
case "deploy":
return generateEvent(&Deploy{}, data)
default:
return nil, errors.New("Not implemented type: " + dummyEvent.EventName)
}
}

View File

@ -1,4 +1,4 @@
package rollbar_webhooks
package rollbar
import "strconv"

View File

@ -1,4 +1,4 @@
package rollbar_webhooks
package rollbar
func NewItemJSON() string {
return `

View File

@ -1,4 +1,4 @@
package rollbar_webhooks
package rollbar
import (
"net/http"
@ -21,12 +21,12 @@ func postWebhooks(rb *RollbarWebhooks, eventBody string) *httptest.ResponseRecor
func TestNewItem(t *testing.T) {
var acc testutil.Accumulator
rb := NewRollbarWebhooks()
rb := NewRollbarWebhooks("/rollbar")
rb.acc = &acc
resp := postWebhooks(rb, NewItemJSON())
if resp.Code != http.StatusOK {
t.Errorf("POST new_item returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK)
}
rb.Gather(&acc)
fields := map[string]interface{}{
"id": 272716944,
@ -45,12 +45,12 @@ func TestNewItem(t *testing.T) {
func TestDeploy(t *testing.T) {
var acc testutil.Accumulator
rb := NewRollbarWebhooks()
rb := NewRollbarWebhooks("/rollbar")
rb.acc = &acc
resp := postWebhooks(rb, DeployJSON())
if resp.Code != http.StatusOK {
t.Errorf("POST deploy returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK)
}
rb.Gather(&acc)
fields := map[string]interface{}{
"id": 187585,
@ -66,7 +66,7 @@ func TestDeploy(t *testing.T) {
}
func TestUnknowItem(t *testing.T) {
rb := NewRollbarWebhooks()
rb := NewRollbarWebhooks("/rollbar")
resp := postWebhooks(rb, UnknowJSON())
if resp.Code != http.StatusOK {
t.Errorf("POST unknow returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK)

View File

@ -0,0 +1,73 @@
package webhooks
import (
"fmt"
"log"
"net/http"
"github.com/gorilla/mux"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/plugins/inputs/webhooks/webhooks_all"
"github.com/influxdata/telegraf/plugins/inputs/webhooks/webhooks_models"
)
func init() {
inputs.Add("webhooks", func() telegraf.Input { return NewWebhooks() })
}
type Webhooks struct {
ServiceAddress string
Webhook []WebhookConfig
}
type WebhookConfig struct {
Name string
Path string
}
func NewWebhooks() *Webhooks {
return &Webhooks{}
}
func (wb *Webhooks) SampleConfig() string {
return `
## Address and port to host Webhook listener on
service_address = ":1619"
`
}
func (wb *Webhooks) Description() string {
return "A Webhooks Event collector"
}
func (wb *Webhooks) Gather(_ telegraf.Accumulator) error {
return nil
}
func (wb *Webhooks) Listen(acc telegraf.Accumulator) {
r := mux.NewRouter()
for _, webhook := range wb.Webhook {
if plugin, ok := webhooks_models.Webhooks[webhook.Name]; ok {
sub := plugin(webhook.Path)
sub.Register(r, acc)
} else {
log.Printf("Webhook %s is unknow\n", webhook.Name)
}
}
err := http.ListenAndServe(fmt.Sprintf("%s", wb.ServiceAddress), r)
if err != nil {
log.Printf("Error starting server: %v", err)
}
}
func (wb *Webhooks) Start(acc telegraf.Accumulator) error {
go wb.Listen(acc)
log.Printf("Started the webhooks service on %s\n", wb.ServiceAddress)
return nil
}
func (rb *Webhooks) Stop() {
log.Println("Stopping the Webhooks service")
}

View File

@ -0,0 +1,6 @@
package webhooks_all
import (
_ "github.com/influxdata/telegraf/plugins/inputs/webhooks/github"
_ "github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar"
)

View File

@ -0,0 +1,16 @@
package webhooks_models
import (
"github.com/gorilla/mux"
"github.com/influxdata/telegraf"
)
type Webhook interface {
Register(router *mux.Router, acc telegraf.Accumulator)
}
var Webhooks map[string]func(string) Webhook = make(map[string]func(string) Webhook)
func Add(name string, fun func(string) Webhook) {
Webhooks[name] = fun
}