Add new webhooks plugin that superseed github and rollbar plugins.

closes #1289

Signed-off-by: François de Metz <francois@stormz.me>
Signed-off-by: Cyril Duez <cyril@stormz.me>

Rename internals struct.

Signed-off-by: François de Metz <francois@stormz.me>
Signed-off-by: Cyril Duez <cyril@stormz.me>

Update changelog.

Signed-off-by: François de Metz <francois@stormz.me>
Signed-off-by: Cyril Duez <cyril@stormz.me>

Update READMEs and CHANGELOG.

Signed-off-by: François de Metz <francois@stormz.me>
Signed-off-by: Cyril Duez <cyril@stormz.me>

Update SampleConfig.

Update the config format.

Update telegraf config.

Update the webhooks README.

Update changelog.

Update the changelog with an upgrade path.

Update default ports.

Fix indent.

Check for nil value on AvailableWebhooks.

Check for CanInterface.
This commit is contained in:
François de Metz 2016-05-27 17:27:54 +02:00 committed by Cameron Sparr
parent e3448153e1
commit e603825e37
18 changed files with 302 additions and 235 deletions

View File

@ -1,7 +1,36 @@
## v1.0 ## v1.0
### Release Notes
**Breaking Change**: users of github_webhooks must change to the new
`[[inputs.webhooks]]` plugin.
This means that the default github_webhooks config:
```
# A Github Webhook Event collector
[[inputs.github_webhooks]]
## Address and port to host Webhook listener on
service_address = ":1618"
```
should now look like:
```
# A Webhooks Event collector
[[inputs.webhooks]]
## Address and port to host Webhook listener on
service_address = ":1618"
[inputs.webhooks.github]
path = "/"
```
### Features ### Features
- [#1289](https://github.com/influxdata/telegraf/pull/1289): webhooks input plugin. Thanks @francois2metz and @cduez!
- [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar webhook plugin.
### Bugfixes ### Bugfixes
- [#1384](https://github.com/influxdata/telegraf/pull/1384): Fix datarace in apache input plugin. - [#1384](https://github.com/influxdata/telegraf/pull/1384): Fix datarace in apache input plugin.
@ -50,11 +79,11 @@ in conjunction with wildcard dimension values as it will control the amount of
time before a new metric is included by the plugin. time before a new metric is included by the plugin.
### Features ### Features
- [#1262](https://github.com/influxdata/telegraf/pull/1261): Add graylog input pluging. - [#1262](https://github.com/influxdata/telegraf/pull/1261): Add graylog input pluging.
- [#1294](https://github.com/influxdata/telegraf/pull/1294): consul input plugin. Thanks @harnash - [#1294](https://github.com/influxdata/telegraf/pull/1294): consul input plugin. Thanks @harnash
- [#1164](https://github.com/influxdata/telegraf/pull/1164): conntrack input plugin. Thanks @robinpercy! - [#1164](https://github.com/influxdata/telegraf/pull/1164): conntrack input plugin. Thanks @robinpercy!
- [#1165](https://github.com/influxdata/telegraf/pull/1165): vmstat input plugin. Thanks @jshim-xm! - [#1165](https://github.com/influxdata/telegraf/pull/1165): vmstat input plugin. Thanks @jshim-xm!
- [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar input plugin. Thanks @francois2metz and @cduez!
- [#1208](https://github.com/influxdata/telegraf/pull/1208): Standardized AWS credentials evaluation & wildcard CloudWatch dimensions. Thanks @johnrengelman! - [#1208](https://github.com/influxdata/telegraf/pull/1208): Standardized AWS credentials evaluation & wildcard CloudWatch dimensions. Thanks @johnrengelman!
- [#1264](https://github.com/influxdata/telegraf/pull/1264): Add SSL config options to http_response plugin. - [#1264](https://github.com/influxdata/telegraf/pull/1264): Add SSL config options to http_response plugin.
- [#1272](https://github.com/influxdata/telegraf/pull/1272): graphite parser: add ability to specify multiple tag keys, for consistency with influxdb parser. - [#1272](https://github.com/influxdata/telegraf/pull/1272): graphite parser: add ability to specify multiple tag keys, for consistency with influxdb parser.

View File

@ -217,8 +217,9 @@ Telegraf can also collect metrics via the following service plugins:
* [mqtt_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/mqtt_consumer) * [mqtt_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/mqtt_consumer)
* [kafka_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/kafka_consumer) * [kafka_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/kafka_consumer)
* [nats_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nats_consumer) * [nats_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nats_consumer)
* [github_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/github_webhooks) * [webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks)
* [rollbar_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/rollbar_webhooks) * [github](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/github)
* [rollbar](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/rollbar)
We'll be adding support for many more over the coming months. Read on if you We'll be adding support for many more over the coming months. Read on if you
want to add support for another service or third-party API. want to add support for another service or third-party API.

View File

@ -1490,12 +1490,6 @@
# SERVICE INPUT PLUGINS # # SERVICE INPUT PLUGINS #
############################################################################### ###############################################################################
# # A Github Webhook Event collector
# [[inputs.github_webhooks]]
# ## Address and port to host Webhook listener on
# service_address = ":1618"
# # Read metrics from Kafka topic(s) # # Read metrics from Kafka topic(s)
# [[inputs.kafka_consumer]] # [[inputs.kafka_consumer]]
# ## topic(s) to consume # ## topic(s) to consume
@ -1601,12 +1595,6 @@
# data_format = "influx" # data_format = "influx"
# # A Rollbar Webhook Event collector
# [[inputs.rollbar_webhooks]]
# ## Address and port to host Webhook listener on
# service_address = ":1619"
# # Statsd Server # # Statsd Server
# [[inputs.statsd]] # [[inputs.statsd]]
# ## Address and port to host UDP listener on # ## Address and port to host UDP listener on
@ -1701,3 +1689,15 @@
# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md # ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
# data_format = "influx" # data_format = "influx"
# # A Webhooks Event collector
# [[inputs.webhooks]]
# ## Address and port to host Webhook listener on
# service_address = ":1619"
#
# [inputs.webhooks.github]
# path = "/github"
#
# [inputs.webhooks.rollbar]
# path = "/rollbar"

View File

@ -19,7 +19,6 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch" _ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch"
_ "github.com/influxdata/telegraf/plugins/inputs/exec" _ "github.com/influxdata/telegraf/plugins/inputs/exec"
_ "github.com/influxdata/telegraf/plugins/inputs/filestat" _ "github.com/influxdata/telegraf/plugins/inputs/filestat"
_ "github.com/influxdata/telegraf/plugins/inputs/github_webhooks"
_ "github.com/influxdata/telegraf/plugins/inputs/graylog" _ "github.com/influxdata/telegraf/plugins/inputs/graylog"
_ "github.com/influxdata/telegraf/plugins/inputs/haproxy" _ "github.com/influxdata/telegraf/plugins/inputs/haproxy"
_ "github.com/influxdata/telegraf/plugins/inputs/http_response" _ "github.com/influxdata/telegraf/plugins/inputs/http_response"
@ -57,7 +56,6 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/redis" _ "github.com/influxdata/telegraf/plugins/inputs/redis"
_ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb" _ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb"
_ "github.com/influxdata/telegraf/plugins/inputs/riak" _ "github.com/influxdata/telegraf/plugins/inputs/riak"
_ "github.com/influxdata/telegraf/plugins/inputs/rollbar_webhooks"
_ "github.com/influxdata/telegraf/plugins/inputs/sensors" _ "github.com/influxdata/telegraf/plugins/inputs/sensors"
_ "github.com/influxdata/telegraf/plugins/inputs/snmp" _ "github.com/influxdata/telegraf/plugins/inputs/snmp"
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
@ -70,6 +68,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/twemproxy" _ "github.com/influxdata/telegraf/plugins/inputs/twemproxy"
_ "github.com/influxdata/telegraf/plugins/inputs/udp_listener" _ "github.com/influxdata/telegraf/plugins/inputs/udp_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/varnish" _ "github.com/influxdata/telegraf/plugins/inputs/varnish"
_ "github.com/influxdata/telegraf/plugins/inputs/webhooks"
_ "github.com/influxdata/telegraf/plugins/inputs/win_perf_counters" _ "github.com/influxdata/telegraf/plugins/inputs/win_perf_counters"
_ "github.com/influxdata/telegraf/plugins/inputs/zfs" _ "github.com/influxdata/telegraf/plugins/inputs/zfs"
_ "github.com/influxdata/telegraf/plugins/inputs/zookeeper" _ "github.com/influxdata/telegraf/plugins/inputs/zookeeper"

View File

@ -1,119 +0,0 @@
package rollbar_webhooks
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/mux"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
func init() {
inputs.Add("rollbar_webhooks", func() telegraf.Input { return NewRollbarWebhooks() })
}
type RollbarWebhooks struct {
ServiceAddress string
// Lock for the struct
sync.Mutex
// Events buffer to store events between Gather calls
events []Event
}
func NewRollbarWebhooks() *RollbarWebhooks {
return &RollbarWebhooks{}
}
func (rb *RollbarWebhooks) SampleConfig() string {
return `
## Address and port to host Webhook listener on
service_address = ":1619"
`
}
func (rb *RollbarWebhooks) Description() string {
return "A Rollbar Webhook Event collector"
}
func (rb *RollbarWebhooks) Gather(acc telegraf.Accumulator) error {
rb.Lock()
defer rb.Unlock()
for _, event := range rb.events {
acc.AddFields("rollbar_webhooks", event.Fields(), event.Tags(), time.Now())
}
rb.events = make([]Event, 0)
return nil
}
func (rb *RollbarWebhooks) Listen() {
r := mux.NewRouter()
r.HandleFunc("/", rb.eventHandler).Methods("POST")
err := http.ListenAndServe(fmt.Sprintf("%s", rb.ServiceAddress), r)
if err != nil {
log.Printf("Error starting server: %v", err)
}
}
func (rb *RollbarWebhooks) Start(_ telegraf.Accumulator) error {
go rb.Listen()
log.Printf("Started the rollbar_webhooks service on %s\n", rb.ServiceAddress)
return nil
}
func (rb *RollbarWebhooks) Stop() {
log.Println("Stopping the rbWebhooks service")
}
func (rb *RollbarWebhooks) eventHandler(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
data, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
dummyEvent := &DummyEvent{}
err = json.Unmarshal(data, dummyEvent)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
event, err := NewEvent(dummyEvent, data)
if err != nil {
w.WriteHeader(http.StatusOK)
return
}
rb.Lock()
rb.events = append(rb.events, event)
rb.Unlock()
w.WriteHeader(http.StatusOK)
}
func generateEvent(event Event, data []byte) (Event, error) {
err := json.Unmarshal(data, event)
if err != nil {
return nil, err
}
return event, nil
}
func NewEvent(dummyEvent *DummyEvent, data []byte) (Event, error) {
switch dummyEvent.EventName {
case "new_item":
return generateEvent(&NewItem{}, data)
case "deploy":
return generateEvent(&Deploy{}, data)
default:
return nil, errors.New("Not implemented type: " + dummyEvent.EventName)
}
}

View File

@ -0,0 +1,27 @@
# Webhooks
This is a Telegraf service plugin that start an http server and register multiple webhook listeners.
```sh
$ telegraf -sample-config -input-filter webhooks -output-filter influxdb > config.conf.new
```
Change the config file to point to the InfluxDB server you are using and adjust the settings to match your environment. Once that is complete:
```sh
$ cp config.conf.new /etc/telegraf/telegraf.conf
$ sudo service telegraf start
```
## Available webhooks
- [Github](github/)
- [Rollbar](rollbar/)
## Adding new webhooks plugin
1. Add your webhook plugin inside the `webhooks` folder
1. Your plugin must implement the `Webhook` interface
1. Import your plugin in the `webhooks.go` file and add it to the `Webhooks` struct
Both [Github](github/) and [Rollbar](rollbar/) are good example to follow.

View File

@ -1,15 +1,6 @@
# github_webhooks # github webhooks
This is a Telegraf service plugin that listens for events kicked off by Github's Webhooks service and persists data from them into configured outputs. To set up the listener first generate the proper configuration: You should configure your Organization's Webhooks to point at the `webhooks` service. To do this go to `github.com/{my_organization}` and click `Settings > Webhooks > Add webhook`. In the resulting menu set `Payload URL` to `http://<my_ip>:1619/github`, `Content type` to `application/json` and under the section `Which events would you like to trigger this webhook?` select 'Send me <b>everything</b>'. By default all of the events will write to the `github_webhooks` measurement, this is configurable by setting the `measurement_name` in the config file.
```sh
$ telegraf -sample-config -input-filter github_webhooks -output-filter influxdb > config.conf.new
```
Change the config file to point to the InfluxDB server you are using and adjust the settings to match your environment. Once that is complete:
```sh
$ cp config.conf.new /etc/telegraf/telegraf.conf
$ sudo service telegraf start
```
Once the server is running you should configure your Organization's Webhooks to point at the `github_webhooks` service. To do this go to `github.com/{my_organization}` and click `Settings > Webhooks > Add webhook`. In the resulting menu set `Payload URL` to `http://<my_ip>:1618`, `Content type` to `application/json` and under the section `Which events would you like to trigger this webhook?` select 'Send me <b>everything</b>'. By default all of the events will write to the `github_webhooks` measurement, this is configurable by setting the `measurement_name` in the config file.
## Events ## Events

View File

@ -1,78 +1,27 @@
package github_webhooks package github
import ( import (
"encoding/json" "encoding/json"
"fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"net/http" "net/http"
"sync"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
) )
func init() { type GithubWebhook struct {
inputs.Add("github_webhooks", func() telegraf.Input { return &GithubWebhooks{} }) Path string
acc telegraf.Accumulator
} }
type GithubWebhooks struct { func (gh *GithubWebhook) Register(router *mux.Router, acc telegraf.Accumulator) {
ServiceAddress string router.HandleFunc(gh.Path, gh.eventHandler).Methods("POST")
// Lock for the struct log.Printf("Started the webhooks_github on %s\n", gh.Path)
sync.Mutex gh.acc = acc
// Events buffer to store events between Gather calls
events []Event
} }
func NewGithubWebhooks() *GithubWebhooks { func (gh *GithubWebhook) eventHandler(w http.ResponseWriter, r *http.Request) {
return &GithubWebhooks{}
}
func (gh *GithubWebhooks) SampleConfig() string {
return `
## Address and port to host Webhook listener on
service_address = ":1618"
`
}
func (gh *GithubWebhooks) Description() string {
return "A Github Webhook Event collector"
}
// Writes the points from <-gh.in to the Accumulator
func (gh *GithubWebhooks) Gather(acc telegraf.Accumulator) error {
gh.Lock()
defer gh.Unlock()
for _, event := range gh.events {
p := event.NewMetric()
acc.AddFields("github_webhooks", p.Fields(), p.Tags(), p.Time())
}
gh.events = make([]Event, 0)
return nil
}
func (gh *GithubWebhooks) Listen() {
r := mux.NewRouter()
r.HandleFunc("/", gh.eventHandler).Methods("POST")
err := http.ListenAndServe(fmt.Sprintf("%s", gh.ServiceAddress), r)
if err != nil {
log.Printf("Error starting server: %v", err)
}
}
func (gh *GithubWebhooks) Start(_ telegraf.Accumulator) error {
go gh.Listen()
log.Printf("Started the github_webhooks service on %s\n", gh.ServiceAddress)
return nil
}
func (gh *GithubWebhooks) Stop() {
log.Println("Stopping the ghWebhooks service")
}
// Handles the / route
func (gh *GithubWebhooks) eventHandler(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close() defer r.Body.Close()
eventType := r.Header["X-Github-Event"][0] eventType := r.Header["X-Github-Event"][0]
data, err := ioutil.ReadAll(r.Body) data, err := ioutil.ReadAll(r.Body)
@ -85,9 +34,10 @@ func (gh *GithubWebhooks) eventHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }
gh.Lock()
gh.events = append(gh.events, e) p := e.NewMetric()
gh.Unlock() gh.acc.AddFields("github_webhooks", p.Fields(), p.Tags(), p.Time())
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} }

View File

@ -1,4 +1,4 @@
package github_webhooks package github
func CommitCommentEventJSON() string { func CommitCommentEventJSON() string {
return `{ return `{

View File

@ -1,4 +1,4 @@
package github_webhooks package github
import ( import (
"fmt" "fmt"

View File

@ -1,15 +1,18 @@
package github_webhooks package github
import ( import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"strings" "strings"
"testing" "testing"
"github.com/influxdata/telegraf/testutil"
) )
func GithubWebhookRequest(event string, jsonString string, t *testing.T) { func GithubWebhookRequest(event string, jsonString string, t *testing.T) {
gh := NewGithubWebhooks() var acc testutil.Accumulator
req, _ := http.NewRequest("POST", "/", strings.NewReader(jsonString)) gh := &GithubWebhook{Path: "/github", acc: &acc}
req, _ := http.NewRequest("POST", "/github", strings.NewReader(jsonString))
req.Header.Add("X-Github-Event", event) req.Header.Add("X-Github-Event", event)
w := httptest.NewRecorder() w := httptest.NewRecorder()
gh.eventHandler(w, req) gh.eventHandler(w, req)

View File

@ -1,15 +1,6 @@
# rollbar_webhooks # rollbar webhooks
This is a Telegraf service plugin that listens for events kicked off by Rollbar Webhooks service and persists data from them into configured outputs. To set up the listener first generate the proper configuration: You should configure your Rollbar's Webhooks to point at the `webhooks` service. To do this go to `rollbar.com/` and click `Settings > Notifications > Webhook`. In the resulting page set `URL` to `http://<my_ip>:1619/rollbar`, and click on `Enable Webhook Integration`.
```sh
$ telegraf -sample-config -input-filter rollbar_webhooks -output-filter influxdb > config.conf.new
```
Change the config file to point to the InfluxDB server you are using and adjust the settings to match your environment. Once that is complete:
```sh
$ cp config.conf.new /etc/telegraf/telegraf.conf
$ sudo service telegraf start
```
Once the server is running you should configure your Rollbar's Webhooks to point at the `rollbar_webhooks` service. To do this go to `rollbar.com/` and click `Settings > Notifications > Webhook`. In the resulting page set `URL` to `http://<my_ip>:1619`, and click on `Enable Webhook Integration`.
## Events ## Events

View File

@ -0,0 +1,69 @@
package rollbar
import (
"encoding/json"
"errors"
"io/ioutil"
"log"
"net/http"
"time"
"github.com/gorilla/mux"
"github.com/influxdata/telegraf"
)
type RollbarWebhook struct {
Path string
acc telegraf.Accumulator
}
func (rb *RollbarWebhook) Register(router *mux.Router, acc telegraf.Accumulator) {
router.HandleFunc(rb.Path, rb.eventHandler).Methods("POST")
log.Printf("Started the webhooks_rollbar on %s\n", rb.Path)
rb.acc = acc
}
func (rb *RollbarWebhook) eventHandler(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
data, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
dummyEvent := &DummyEvent{}
err = json.Unmarshal(data, dummyEvent)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
event, err := NewEvent(dummyEvent, data)
if err != nil {
w.WriteHeader(http.StatusOK)
return
}
rb.acc.AddFields("rollbar_webhooks", event.Fields(), event.Tags(), time.Now())
w.WriteHeader(http.StatusOK)
}
func generateEvent(event Event, data []byte) (Event, error) {
err := json.Unmarshal(data, event)
if err != nil {
return nil, err
}
return event, nil
}
func NewEvent(dummyEvent *DummyEvent, data []byte) (Event, error) {
switch dummyEvent.EventName {
case "new_item":
return generateEvent(&NewItem{}, data)
case "deploy":
return generateEvent(&Deploy{}, data)
default:
return nil, errors.New("Not implemented type: " + dummyEvent.EventName)
}
}

View File

@ -1,4 +1,4 @@
package rollbar_webhooks package rollbar
import "strconv" import "strconv"

View File

@ -1,4 +1,4 @@
package rollbar_webhooks package rollbar
func NewItemJSON() string { func NewItemJSON() string {
return ` return `

View File

@ -1,4 +1,4 @@
package rollbar_webhooks package rollbar
import ( import (
"net/http" "net/http"
@ -9,7 +9,7 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
func postWebhooks(rb *RollbarWebhooks, eventBody string) *httptest.ResponseRecorder { func postWebhooks(rb *RollbarWebhook, eventBody string) *httptest.ResponseRecorder {
req, _ := http.NewRequest("POST", "/", strings.NewReader(eventBody)) req, _ := http.NewRequest("POST", "/", strings.NewReader(eventBody))
w := httptest.NewRecorder() w := httptest.NewRecorder()
w.Code = 500 w.Code = 500
@ -21,12 +21,11 @@ func postWebhooks(rb *RollbarWebhooks, eventBody string) *httptest.ResponseRecor
func TestNewItem(t *testing.T) { func TestNewItem(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
rb := NewRollbarWebhooks() rb := &RollbarWebhook{Path: "/rollbar", acc: &acc}
resp := postWebhooks(rb, NewItemJSON()) resp := postWebhooks(rb, NewItemJSON())
if resp.Code != http.StatusOK { if resp.Code != http.StatusOK {
t.Errorf("POST new_item returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) t.Errorf("POST new_item returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK)
} }
rb.Gather(&acc)
fields := map[string]interface{}{ fields := map[string]interface{}{
"id": 272716944, "id": 272716944,
@ -45,12 +44,11 @@ func TestNewItem(t *testing.T) {
func TestDeploy(t *testing.T) { func TestDeploy(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
rb := NewRollbarWebhooks() rb := &RollbarWebhook{Path: "/rollbar", acc: &acc}
resp := postWebhooks(rb, DeployJSON()) resp := postWebhooks(rb, DeployJSON())
if resp.Code != http.StatusOK { if resp.Code != http.StatusOK {
t.Errorf("POST deploy returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) t.Errorf("POST deploy returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK)
} }
rb.Gather(&acc)
fields := map[string]interface{}{ fields := map[string]interface{}{
"id": 187585, "id": 187585,
@ -66,7 +64,7 @@ func TestDeploy(t *testing.T) {
} }
func TestUnknowItem(t *testing.T) { func TestUnknowItem(t *testing.T) {
rb := NewRollbarWebhooks() rb := &RollbarWebhook{Path: "/rollbar"}
resp := postWebhooks(rb, UnknowJSON()) resp := postWebhooks(rb, UnknowJSON())
if resp.Code != http.StatusOK { if resp.Code != http.StatusOK {
t.Errorf("POST unknow returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) t.Errorf("POST unknow returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK)

View File

@ -0,0 +1,99 @@
package webhooks
import (
"fmt"
"log"
"net/http"
"reflect"
"github.com/gorilla/mux"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/webhooks/github"
"github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar"
)
type Webhook interface {
Register(router *mux.Router, acc telegraf.Accumulator)
}
func init() {
inputs.Add("webhooks", func() telegraf.Input { return NewWebhooks() })
}
type Webhooks struct {
ServiceAddress string
Github *github.GithubWebhook
Rollbar *rollbar.RollbarWebhook
}
func NewWebhooks() *Webhooks {
return &Webhooks{}
}
func (wb *Webhooks) SampleConfig() string {
return `
## Address and port to host Webhook listener on
service_address = ":1619"
[inputs.webhooks.github]
path = "/github"
[inputs.webhooks.rollbar]
path = "/rollbar"
`
}
func (wb *Webhooks) Description() string {
return "A Webhooks Event collector"
}
func (wb *Webhooks) Gather(_ telegraf.Accumulator) error {
return nil
}
func (wb *Webhooks) Listen(acc telegraf.Accumulator) {
r := mux.NewRouter()
for _, webhook := range wb.AvailableWebhooks() {
webhook.Register(r, acc)
}
err := http.ListenAndServe(fmt.Sprintf("%s", wb.ServiceAddress), r)
if err != nil {
log.Printf("Error starting server: %v", err)
}
}
// Looks for fields which implement Webhook interface
func (wb *Webhooks) AvailableWebhooks() []Webhook {
webhooks := make([]Webhook, 0)
s := reflect.ValueOf(wb).Elem()
for i := 0; i < s.NumField(); i++ {
f := s.Field(i)
if !f.CanInterface() {
continue
}
if wbPlugin, ok := f.Interface().(Webhook); ok {
if !reflect.ValueOf(wbPlugin).IsNil() {
webhooks = append(webhooks, wbPlugin)
}
}
}
return webhooks
}
func (wb *Webhooks) Start(acc telegraf.Accumulator) error {
go wb.Listen(acc)
log.Printf("Started the webhooks service on %s\n", wb.ServiceAddress)
return nil
}
func (rb *Webhooks) Stop() {
log.Println("Stopping the Webhooks service")
}

View File

@ -0,0 +1,29 @@
package webhooks
import (
"reflect"
"testing"
"github.com/influxdata/telegraf/plugins/inputs/webhooks/github"
"github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar"
)
func TestAvailableWebhooks(t *testing.T) {
wb := NewWebhooks()
expected := make([]Webhook, 0)
if !reflect.DeepEqual(wb.AvailableWebhooks(), expected) {
t.Errorf("expected to %v.\nGot %v", expected, wb.AvailableWebhooks())
}
wb.Github = &github.GithubWebhook{Path: "/github"}
expected = append(expected, wb.Github)
if !reflect.DeepEqual(wb.AvailableWebhooks(), expected) {
t.Errorf("expected to be %v.\nGot %v", expected, wb.AvailableWebhooks())
}
wb.Rollbar = &rollbar.RollbarWebhook{Path: "/rollbar"}
expected = append(expected, wb.Rollbar)
if !reflect.DeepEqual(wb.AvailableWebhooks(), expected) {
t.Errorf("expected to be %v.\nGot %v", expected, wb.AvailableWebhooks())
}
}