From c78b6cdb4ee5a71325e1e71ac85b3585d04a33bc Mon Sep 17 00:00:00 2001 From: Cyril Duez Date: Tue, 24 May 2016 16:32:42 +0200 Subject: [PATCH] Add input plugin for rollbar service. (#1247) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Report rollbar events. Signed-off-by: Cyril Duez Signed-off-by: François de Metz * Fix indent with go fmt. * Add test for rollbar webhooks. * Report more data from new_item event. * Handle new deploy webhook. Signed-off-by: Cyril Duez Signed-off-by: François de Metz * Update default port. * Add readme. * Add rollbar_webhooks to the readme. * Add rollbar_webhooks to plugins list. * Add tag level for new_item event. * Update readme. * Update changelog. --- CHANGELOG.md | 1 + README.md | 1 + plugins/inputs/all/all.go | 1 + plugins/inputs/rollbar_webhooks/README.md | 47 +++++++ .../rollbar_webhooks/rollbar_webhooks.go | 119 ++++++++++++++++++ .../rollbar_webhooks_events.go | 78 ++++++++++++ .../rollbar_webhooks_events_json_test.go | 96 ++++++++++++++ .../rollbar_webhooks/rollbar_webhooks_test.go | 74 +++++++++++ 8 files changed, 417 insertions(+) create mode 100644 plugins/inputs/rollbar_webhooks/README.md create mode 100644 plugins/inputs/rollbar_webhooks/rollbar_webhooks.go create mode 100644 plugins/inputs/rollbar_webhooks/rollbar_webhooks_events.go create mode 100644 plugins/inputs/rollbar_webhooks/rollbar_webhooks_events_json_test.go create mode 100644 plugins/inputs/rollbar_webhooks/rollbar_webhooks_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index da53d893d..64d205fb0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ to "stdout". - [#1173](https://github.com/influxdata/telegraf/pull/1173): varnish input plugin. Thanks @sfox-xmatters! - [#1138](https://github.com/influxdata/telegraf/pull/1138): nstat input plugin. Thanks @Maksadbek! +- [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar input plugin. Thanks @francois2metz and @cduez! - [#1139](https://github.com/influxdata/telegraf/pull/1139): instrumental output plugin. Thanks @jasonroelofs! - [#1172](https://github.com/influxdata/telegraf/pull/1172): Ceph storage stats. Thanks @robinpercy! - [#1233](https://github.com/influxdata/telegraf/pull/1233): Updated golint gopsutil dependency. diff --git a/README.md b/README.md index 3343f0f44..1a6a04382 100644 --- a/README.md +++ b/README.md @@ -216,6 +216,7 @@ Telegraf can also collect metrics via the following service plugins: * [kafka_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/kafka_consumer) * [nats_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nats_consumer) * [github_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/github_webhooks) +* [rollbar_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/rollbar_webhooks) We'll be adding support for many more over the coming months. Read on if you want to add support for another service or third-party API. diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 7c3084ff7..c2322c436 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -53,6 +53,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/redis" _ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb" _ "github.com/influxdata/telegraf/plugins/inputs/riak" + _ "github.com/influxdata/telegraf/plugins/inputs/rollbar_webhooks" _ "github.com/influxdata/telegraf/plugins/inputs/sensors" _ "github.com/influxdata/telegraf/plugins/inputs/snmp" _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" diff --git a/plugins/inputs/rollbar_webhooks/README.md b/plugins/inputs/rollbar_webhooks/README.md new file mode 100644 index 000000000..d6938df28 --- /dev/null +++ b/plugins/inputs/rollbar_webhooks/README.md @@ -0,0 +1,47 @@ +# rollbar_webhooks + +This is a Telegraf service plugin that listens for events kicked off by Rollbar Webhooks service and persists data from them into configured outputs. To set up the listener first generate the proper configuration: +```sh +$ telegraf -sample-config -input-filter rollbar_webhooks -output-filter influxdb > config.conf.new +``` +Change the config file to point to the InfluxDB server you are using and adjust the settings to match your environment. Once that is complete: +```sh +$ cp config.conf.new /etc/telegraf/telegraf.conf +$ sudo service telegraf start +``` +Once the server is running you should configure your Rollbar's Webhooks to point at the `rollbar_webhooks` service. To do this go to `rollbar.com/` and click `Settings > Notifications > Webhook`. In the resulting page set `URL` to `http://:1619`, and click on `Enable Webhook Integration`. + +## Events + +The titles of the following sections are links to the full payloads and details for each event. The body contains what information from the event is persisted. The format is as follows: +``` +# TAGS +* 'tagKey' = `tagValue` type +# FIELDS +* 'fieldKey' = `fieldValue` type +``` +The tag values and field values show the place on the incoming JSON object where the data is sourced from. + +See [webhook doc](https://rollbar.com/docs/webhooks/) + +#### `new_item` event + +**Tags:** +* 'event' = `event.event_name` string +* 'environment' = `event.data.item.environment` string +* 'project_id = `event.data.item.project_id` int +* 'language' = `event.data.item.last_occurence.language` string +* 'level' = `event.data.item.last_occurence.level` string + +**Fields:** +* 'id' = `event.data.item.id` int + +#### `deploy` event + +**Tags:** +* 'event' = `event.event_name` string +* 'environment' = `event.data.deploy.environment` string +* 'project_id = `event.data.deploy.project_id` int + +**Fields:** +* 'id' = `event.data.item.id` int diff --git a/plugins/inputs/rollbar_webhooks/rollbar_webhooks.go b/plugins/inputs/rollbar_webhooks/rollbar_webhooks.go new file mode 100644 index 000000000..5e7dc8847 --- /dev/null +++ b/plugins/inputs/rollbar_webhooks/rollbar_webhooks.go @@ -0,0 +1,119 @@ +package rollbar_webhooks + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "log" + "net/http" + "sync" + "time" + + "github.com/gorilla/mux" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +func init() { + inputs.Add("rollbar_webhooks", func() telegraf.Input { return NewRollbarWebhooks() }) +} + +type RollbarWebhooks struct { + ServiceAddress string + // Lock for the struct + sync.Mutex + // Events buffer to store events between Gather calls + events []Event +} + +func NewRollbarWebhooks() *RollbarWebhooks { + return &RollbarWebhooks{} +} + +func (rb *RollbarWebhooks) SampleConfig() string { + return ` + ## Address and port to host Webhook listener on + service_address = ":1619" +` +} + +func (rb *RollbarWebhooks) Description() string { + return "A Rollbar Webhook Event collector" +} + +func (rb *RollbarWebhooks) Gather(acc telegraf.Accumulator) error { + rb.Lock() + defer rb.Unlock() + for _, event := range rb.events { + acc.AddFields("rollbar_webhooks", event.Fields(), event.Tags(), time.Now()) + } + rb.events = make([]Event, 0) + return nil +} + +func (rb *RollbarWebhooks) Listen() { + r := mux.NewRouter() + r.HandleFunc("/", rb.eventHandler).Methods("POST") + err := http.ListenAndServe(fmt.Sprintf("%s", rb.ServiceAddress), r) + if err != nil { + log.Printf("Error starting server: %v", err) + } +} + +func (rb *RollbarWebhooks) Start(_ telegraf.Accumulator) error { + go rb.Listen() + log.Printf("Started the rollbar_webhooks service on %s\n", rb.ServiceAddress) + return nil +} + +func (rb *RollbarWebhooks) Stop() { + log.Println("Stopping the rbWebhooks service") +} + +func (rb *RollbarWebhooks) eventHandler(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + data, err := ioutil.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + dummyEvent := &DummyEvent{} + err = json.Unmarshal(data, dummyEvent) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + event, err := NewEvent(dummyEvent, data) + if err != nil { + w.WriteHeader(http.StatusOK) + return + } + + rb.Lock() + rb.events = append(rb.events, event) + rb.Unlock() + + w.WriteHeader(http.StatusOK) +} + +func generateEvent(event Event, data []byte) (Event, error) { + err := json.Unmarshal(data, event) + if err != nil { + return nil, err + } + return event, nil +} + +func NewEvent(dummyEvent *DummyEvent, data []byte) (Event, error) { + switch dummyEvent.EventName { + case "new_item": + return generateEvent(&NewItem{}, data) + case "deploy": + return generateEvent(&Deploy{}, data) + default: + return nil, errors.New("Not implemented type: " + dummyEvent.EventName) + } +} diff --git a/plugins/inputs/rollbar_webhooks/rollbar_webhooks_events.go b/plugins/inputs/rollbar_webhooks/rollbar_webhooks_events.go new file mode 100644 index 000000000..8cccec336 --- /dev/null +++ b/plugins/inputs/rollbar_webhooks/rollbar_webhooks_events.go @@ -0,0 +1,78 @@ +package rollbar_webhooks + +import "strconv" + +type Event interface { + Tags() map[string]string + Fields() map[string]interface{} +} + +type DummyEvent struct { + EventName string `json:"event_name"` +} + +type NewItemDataItemLastOccurence struct { + Language string `json:"language"` + Level string `json:"level"` +} + +type NewItemDataItem struct { + Id int `json:"id"` + Environment string `json:"environment"` + ProjectId int `json:"project_id"` + LastOccurence NewItemDataItemLastOccurence `json:"last_occurrence"` +} + +type NewItemData struct { + Item NewItemDataItem `json:"item"` +} + +type NewItem struct { + EventName string `json:"event_name"` + Data NewItemData `json:"data"` +} + +func (ni *NewItem) Tags() map[string]string { + return map[string]string{ + "event": ni.EventName, + "environment": ni.Data.Item.Environment, + "project_id": strconv.Itoa(ni.Data.Item.ProjectId), + "language": ni.Data.Item.LastOccurence.Language, + "level": ni.Data.Item.LastOccurence.Level, + } +} + +func (ni *NewItem) Fields() map[string]interface{} { + return map[string]interface{}{ + "id": ni.Data.Item.Id, + } +} + +type DeployDataDeploy struct { + Id int `json:"id"` + Environment string `json:"environment"` + ProjectId int `json:"project_id"` +} + +type DeployData struct { + Deploy DeployDataDeploy `json:"deploy"` +} + +type Deploy struct { + EventName string `json:"event_name"` + Data DeployData `json:"data"` +} + +func (ni *Deploy) Tags() map[string]string { + return map[string]string{ + "event": ni.EventName, + "environment": ni.Data.Deploy.Environment, + "project_id": strconv.Itoa(ni.Data.Deploy.ProjectId), + } +} + +func (ni *Deploy) Fields() map[string]interface{} { + return map[string]interface{}{ + "id": ni.Data.Deploy.Id, + } +} diff --git a/plugins/inputs/rollbar_webhooks/rollbar_webhooks_events_json_test.go b/plugins/inputs/rollbar_webhooks/rollbar_webhooks_events_json_test.go new file mode 100644 index 000000000..99a6db8ff --- /dev/null +++ b/plugins/inputs/rollbar_webhooks/rollbar_webhooks_events_json_test.go @@ -0,0 +1,96 @@ +package rollbar_webhooks + +func NewItemJSON() string { + return ` + { + "event_name": "new_item", + "data": { + "item": { + "public_item_id": null, + "integrations_data": {}, + "last_activated_timestamp": 1382655421, + "unique_occurrences": null, + "id": 272716944, + "environment": "production", + "title": "testing aobg98wrwe", + "last_occurrence_id": 481761639, + "last_occurrence_timestamp": 1382655421, + "platform": 0, + "first_occurrence_timestamp": 1382655421, + "project_id": 90, + "resolved_in_version": null, + "status": 1, + "hash": "c595b2ae0af9b397bb6bdafd57104ac4d5f6b382", + "last_occurrence": { + "body": { + "message": { + "body": "testing aobg98wrwe" + } + }, + "uuid": "d2036647-e0b7-4cad-bc98-934831b9b6d1", + "language": "python", + "level": "error", + "timestamp": 1382655421, + "server": { + "host": "dev", + "argv": [ + "" + ] + }, + "environment": "production", + "framework": "unknown", + "notifier": { + "version": "0.5.12", + "name": "pyrollbar" + }, + "metadata": { + "access_token": "", + "debug": { + "routes": { + "start_time": 1382212080401, + "counters": { + "post_item": 3274122 + } + } + }, + "customer_timestamp": 1382655421, + "api_server_hostname": "web6" + } + }, + "framework": 0, + "total_occurrences": 1, + "level": 40, + "counter": 4, + "first_occurrence_id": 481761639, + "activating_occurrence_id": 481761639 + } + } + }` +} + +func DeployJSON() string { + return ` + { + "event_name": "deploy", + "data": { + "deploy": { + "comment": "deploying webs", + "user_id": 1, + "finish_time": 1382656039, + "start_time": 1382656038, + "id": 187585, + "environment": "production", + "project_id": 90, + "local_username": "brian", + "revision": "e4b9b7db860b2e5ac799f8c06b9498b71ab270bb" + } + } + }` +} + +func UnknowJSON() string { + return ` + { + "event_name": "roger" + }` +} diff --git a/plugins/inputs/rollbar_webhooks/rollbar_webhooks_test.go b/plugins/inputs/rollbar_webhooks/rollbar_webhooks_test.go new file mode 100644 index 000000000..e0b183a8c --- /dev/null +++ b/plugins/inputs/rollbar_webhooks/rollbar_webhooks_test.go @@ -0,0 +1,74 @@ +package rollbar_webhooks + +import ( + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/influxdata/telegraf/testutil" +) + +func postWebhooks(rb *RollbarWebhooks, eventBody string) *httptest.ResponseRecorder { + req, _ := http.NewRequest("POST", "/", strings.NewReader(eventBody)) + w := httptest.NewRecorder() + w.Code = 500 + + rb.eventHandler(w, req) + + return w +} + +func TestNewItem(t *testing.T) { + var acc testutil.Accumulator + rb := NewRollbarWebhooks() + resp := postWebhooks(rb, NewItemJSON()) + if resp.Code != http.StatusOK { + t.Errorf("POST new_item returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) + } + rb.Gather(&acc) + + fields := map[string]interface{}{ + "id": 272716944, + } + + tags := map[string]string{ + "event": "new_item", + "environment": "production", + "project_id": "90", + "language": "python", + "level": "error", + } + + acc.AssertContainsTaggedFields(t, "rollbar_webhooks", fields, tags) +} + +func TestDeploy(t *testing.T) { + var acc testutil.Accumulator + rb := NewRollbarWebhooks() + resp := postWebhooks(rb, DeployJSON()) + if resp.Code != http.StatusOK { + t.Errorf("POST deploy returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) + } + rb.Gather(&acc) + + fields := map[string]interface{}{ + "id": 187585, + } + + tags := map[string]string{ + "event": "deploy", + "environment": "production", + "project_id": "90", + } + + acc.AssertContainsTaggedFields(t, "rollbar_webhooks", fields, tags) +} + +func TestUnknowItem(t *testing.T) { + rb := NewRollbarWebhooks() + resp := postWebhooks(rb, UnknowJSON()) + if resp.Code != http.StatusOK { + t.Errorf("POST unknow returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) + } +}