Add input plugin for rollbar service. (#1247)

* Report rollbar events.

Signed-off-by: Cyril Duez <cyril@stormz.me>
Signed-off-by: François de Metz <francois@stormz.me>

* Fix indent with go fmt.

* Add test for rollbar webhooks.

* Report more data from new_item event.

* Handle new deploy webhook.

Signed-off-by: Cyril Duez <cyril@stormz.me>
Signed-off-by: François de Metz <francois@stormz.me>

* Update default port.

* Add readme.

* Add rollbar_webhooks to the readme.

* Add rollbar_webhooks to plugins list.

* Add tag level for new_item event.

* Update readme.

* Update changelog.
This commit is contained in:
Cyril Duez 2016-05-24 16:32:42 +02:00 committed by Cameron Sparr
parent d736c7235a
commit c78b6cdb4e
8 changed files with 417 additions and 0 deletions

View File

@ -24,6 +24,7 @@ to "stdout".
- [#1173](https://github.com/influxdata/telegraf/pull/1173): varnish input plugin. Thanks @sfox-xmatters!
- [#1138](https://github.com/influxdata/telegraf/pull/1138): nstat input plugin. Thanks @Maksadbek!
- [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar input plugin. Thanks @francois2metz and @cduez!
- [#1139](https://github.com/influxdata/telegraf/pull/1139): instrumental output plugin. Thanks @jasonroelofs!
- [#1172](https://github.com/influxdata/telegraf/pull/1172): Ceph storage stats. Thanks @robinpercy!
- [#1233](https://github.com/influxdata/telegraf/pull/1233): Updated golint gopsutil dependency.

View File

@ -216,6 +216,7 @@ Telegraf can also collect metrics via the following service plugins:
* [kafka_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/kafka_consumer)
* [nats_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nats_consumer)
* [github_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/github_webhooks)
* [rollbar_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/rollbar_webhooks)
We'll be adding support for many more over the coming months. Read on if you
want to add support for another service or third-party API.

View File

@ -53,6 +53,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/redis"
_ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb"
_ "github.com/influxdata/telegraf/plugins/inputs/riak"
_ "github.com/influxdata/telegraf/plugins/inputs/rollbar_webhooks"
_ "github.com/influxdata/telegraf/plugins/inputs/sensors"
_ "github.com/influxdata/telegraf/plugins/inputs/snmp"
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"

View File

@ -0,0 +1,47 @@
# rollbar_webhooks
This is a Telegraf service plugin that listens for events kicked off by Rollbar Webhooks service and persists data from them into configured outputs. To set up the listener first generate the proper configuration:
```sh
$ telegraf -sample-config -input-filter rollbar_webhooks -output-filter influxdb > config.conf.new
```
Change the config file to point to the InfluxDB server you are using and adjust the settings to match your environment. Once that is complete:
```sh
$ cp config.conf.new /etc/telegraf/telegraf.conf
$ sudo service telegraf start
```
Once the server is running you should configure your Rollbar's Webhooks to point at the `rollbar_webhooks` service. To do this go to `rollbar.com/` and click `Settings > Notifications > Webhook`. In the resulting page set `URL` to `http://<my_ip>:1619`, and click on `Enable Webhook Integration`.
## Events
The titles of the following sections are links to the full payloads and details for each event. The body contains what information from the event is persisted. The format is as follows:
```
# TAGS
* 'tagKey' = `tagValue` type
# FIELDS
* 'fieldKey' = `fieldValue` type
```
The tag values and field values show the place on the incoming JSON object where the data is sourced from.
See [webhook doc](https://rollbar.com/docs/webhooks/)
#### `new_item` event
**Tags:**
* 'event' = `event.event_name` string
* 'environment' = `event.data.item.environment` string
* 'project_id = `event.data.item.project_id` int
* 'language' = `event.data.item.last_occurence.language` string
* 'level' = `event.data.item.last_occurence.level` string
**Fields:**
* 'id' = `event.data.item.id` int
#### `deploy` event
**Tags:**
* 'event' = `event.event_name` string
* 'environment' = `event.data.deploy.environment` string
* 'project_id = `event.data.deploy.project_id` int
**Fields:**
* 'id' = `event.data.item.id` int

View File

@ -0,0 +1,119 @@
package rollbar_webhooks
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/mux"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
func init() {
inputs.Add("rollbar_webhooks", func() telegraf.Input { return NewRollbarWebhooks() })
}
type RollbarWebhooks struct {
ServiceAddress string
// Lock for the struct
sync.Mutex
// Events buffer to store events between Gather calls
events []Event
}
func NewRollbarWebhooks() *RollbarWebhooks {
return &RollbarWebhooks{}
}
func (rb *RollbarWebhooks) SampleConfig() string {
return `
## Address and port to host Webhook listener on
service_address = ":1619"
`
}
func (rb *RollbarWebhooks) Description() string {
return "A Rollbar Webhook Event collector"
}
func (rb *RollbarWebhooks) Gather(acc telegraf.Accumulator) error {
rb.Lock()
defer rb.Unlock()
for _, event := range rb.events {
acc.AddFields("rollbar_webhooks", event.Fields(), event.Tags(), time.Now())
}
rb.events = make([]Event, 0)
return nil
}
func (rb *RollbarWebhooks) Listen() {
r := mux.NewRouter()
r.HandleFunc("/", rb.eventHandler).Methods("POST")
err := http.ListenAndServe(fmt.Sprintf("%s", rb.ServiceAddress), r)
if err != nil {
log.Printf("Error starting server: %v", err)
}
}
func (rb *RollbarWebhooks) Start(_ telegraf.Accumulator) error {
go rb.Listen()
log.Printf("Started the rollbar_webhooks service on %s\n", rb.ServiceAddress)
return nil
}
func (rb *RollbarWebhooks) Stop() {
log.Println("Stopping the rbWebhooks service")
}
func (rb *RollbarWebhooks) eventHandler(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
data, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
dummyEvent := &DummyEvent{}
err = json.Unmarshal(data, dummyEvent)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
event, err := NewEvent(dummyEvent, data)
if err != nil {
w.WriteHeader(http.StatusOK)
return
}
rb.Lock()
rb.events = append(rb.events, event)
rb.Unlock()
w.WriteHeader(http.StatusOK)
}
func generateEvent(event Event, data []byte) (Event, error) {
err := json.Unmarshal(data, event)
if err != nil {
return nil, err
}
return event, nil
}
func NewEvent(dummyEvent *DummyEvent, data []byte) (Event, error) {
switch dummyEvent.EventName {
case "new_item":
return generateEvent(&NewItem{}, data)
case "deploy":
return generateEvent(&Deploy{}, data)
default:
return nil, errors.New("Not implemented type: " + dummyEvent.EventName)
}
}

View File

@ -0,0 +1,78 @@
package rollbar_webhooks
import "strconv"
type Event interface {
Tags() map[string]string
Fields() map[string]interface{}
}
type DummyEvent struct {
EventName string `json:"event_name"`
}
type NewItemDataItemLastOccurence struct {
Language string `json:"language"`
Level string `json:"level"`
}
type NewItemDataItem struct {
Id int `json:"id"`
Environment string `json:"environment"`
ProjectId int `json:"project_id"`
LastOccurence NewItemDataItemLastOccurence `json:"last_occurrence"`
}
type NewItemData struct {
Item NewItemDataItem `json:"item"`
}
type NewItem struct {
EventName string `json:"event_name"`
Data NewItemData `json:"data"`
}
func (ni *NewItem) Tags() map[string]string {
return map[string]string{
"event": ni.EventName,
"environment": ni.Data.Item.Environment,
"project_id": strconv.Itoa(ni.Data.Item.ProjectId),
"language": ni.Data.Item.LastOccurence.Language,
"level": ni.Data.Item.LastOccurence.Level,
}
}
func (ni *NewItem) Fields() map[string]interface{} {
return map[string]interface{}{
"id": ni.Data.Item.Id,
}
}
type DeployDataDeploy struct {
Id int `json:"id"`
Environment string `json:"environment"`
ProjectId int `json:"project_id"`
}
type DeployData struct {
Deploy DeployDataDeploy `json:"deploy"`
}
type Deploy struct {
EventName string `json:"event_name"`
Data DeployData `json:"data"`
}
func (ni *Deploy) Tags() map[string]string {
return map[string]string{
"event": ni.EventName,
"environment": ni.Data.Deploy.Environment,
"project_id": strconv.Itoa(ni.Data.Deploy.ProjectId),
}
}
func (ni *Deploy) Fields() map[string]interface{} {
return map[string]interface{}{
"id": ni.Data.Deploy.Id,
}
}

View File

@ -0,0 +1,96 @@
package rollbar_webhooks
func NewItemJSON() string {
return `
{
"event_name": "new_item",
"data": {
"item": {
"public_item_id": null,
"integrations_data": {},
"last_activated_timestamp": 1382655421,
"unique_occurrences": null,
"id": 272716944,
"environment": "production",
"title": "testing aobg98wrwe",
"last_occurrence_id": 481761639,
"last_occurrence_timestamp": 1382655421,
"platform": 0,
"first_occurrence_timestamp": 1382655421,
"project_id": 90,
"resolved_in_version": null,
"status": 1,
"hash": "c595b2ae0af9b397bb6bdafd57104ac4d5f6b382",
"last_occurrence": {
"body": {
"message": {
"body": "testing aobg98wrwe"
}
},
"uuid": "d2036647-e0b7-4cad-bc98-934831b9b6d1",
"language": "python",
"level": "error",
"timestamp": 1382655421,
"server": {
"host": "dev",
"argv": [
""
]
},
"environment": "production",
"framework": "unknown",
"notifier": {
"version": "0.5.12",
"name": "pyrollbar"
},
"metadata": {
"access_token": "",
"debug": {
"routes": {
"start_time": 1382212080401,
"counters": {
"post_item": 3274122
}
}
},
"customer_timestamp": 1382655421,
"api_server_hostname": "web6"
}
},
"framework": 0,
"total_occurrences": 1,
"level": 40,
"counter": 4,
"first_occurrence_id": 481761639,
"activating_occurrence_id": 481761639
}
}
}`
}
func DeployJSON() string {
return `
{
"event_name": "deploy",
"data": {
"deploy": {
"comment": "deploying webs",
"user_id": 1,
"finish_time": 1382656039,
"start_time": 1382656038,
"id": 187585,
"environment": "production",
"project_id": 90,
"local_username": "brian",
"revision": "e4b9b7db860b2e5ac799f8c06b9498b71ab270bb"
}
}
}`
}
func UnknowJSON() string {
return `
{
"event_name": "roger"
}`
}

View File

@ -0,0 +1,74 @@
package rollbar_webhooks
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/influxdata/telegraf/testutil"
)
func postWebhooks(rb *RollbarWebhooks, eventBody string) *httptest.ResponseRecorder {
req, _ := http.NewRequest("POST", "/", strings.NewReader(eventBody))
w := httptest.NewRecorder()
w.Code = 500
rb.eventHandler(w, req)
return w
}
func TestNewItem(t *testing.T) {
var acc testutil.Accumulator
rb := NewRollbarWebhooks()
resp := postWebhooks(rb, NewItemJSON())
if resp.Code != http.StatusOK {
t.Errorf("POST new_item returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK)
}
rb.Gather(&acc)
fields := map[string]interface{}{
"id": 272716944,
}
tags := map[string]string{
"event": "new_item",
"environment": "production",
"project_id": "90",
"language": "python",
"level": "error",
}
acc.AssertContainsTaggedFields(t, "rollbar_webhooks", fields, tags)
}
func TestDeploy(t *testing.T) {
var acc testutil.Accumulator
rb := NewRollbarWebhooks()
resp := postWebhooks(rb, DeployJSON())
if resp.Code != http.StatusOK {
t.Errorf("POST deploy returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK)
}
rb.Gather(&acc)
fields := map[string]interface{}{
"id": 187585,
}
tags := map[string]string{
"event": "deploy",
"environment": "production",
"project_id": "90",
}
acc.AssertContainsTaggedFields(t, "rollbar_webhooks", fields, tags)
}
func TestUnknowItem(t *testing.T) {
rb := NewRollbarWebhooks()
resp := postWebhooks(rb, UnknowJSON())
if resp.Code != http.StatusOK {
t.Errorf("POST unknow returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK)
}
}