diff --git a/.txt b/.txt new file mode 100644 index 000000000..e69de29bb diff --git a/plugins/inputs/webhooks/particle/README.md b/plugins/inputs/webhooks/particle/README.md new file mode 100644 index 000000000..1212b742a --- /dev/null +++ b/plugins/inputs/webhooks/particle/README.md @@ -0,0 +1,26 @@ +# particle webhooks + +You should configure your Rollbar's Webhooks to point at the `webhooks` service. To do this go to `particle.com/` and click `Settings > Notifications > Webhook`. In the resulting page set `URL` to `http://:1619/particle`, and click on `Enable Webhook Integration`. + +## Events + +Your Particle device should publish an event that contains a JSON in the form of: +``` +String data = String::format("{ \"tags\" : { + \"tag_name\": \"tag_value\", + \"other_tag\": \"other_value\" + }, + \"values\": { + \"value_name\": %f, + \"other_value\": %f, + } + }", value_value, other_value + ); + Particle.publish("event_name", data, PRIVATE); +``` +Escaping the "" is required in the source file. +The number of tag values and field values is not restrictied so you can send as many values per webhook call as you'd like. + +You will need to enable JSON messages in the Webhooks setup of Particle.io + +See [webhook doc](https://docs.particle.io/reference/webhooks/) diff --git a/plugins/inputs/webhooks/particle/particle_webhooks.go b/plugins/inputs/webhooks/particle/particle_webhooks.go new file mode 100644 index 000000000..813bd0665 --- /dev/null +++ b/plugins/inputs/webhooks/particle/particle_webhooks.go @@ -0,0 +1,48 @@ +package particle + +import ( + "encoding/json" + "github.com/gorilla/mux" + "github.com/influxdata/telegraf" + "io/ioutil" + "log" + "net/http" + "time" +) + +type ParticleWebhook struct { + Path string + acc telegraf.Accumulator +} + +func (rb *ParticleWebhook) Register(router *mux.Router, acc telegraf.Accumulator) { + router.HandleFunc(rb.Path, rb.eventHandler).Methods("POST") + log.Printf("I! Started the webhooks_particle on %s\n", rb.Path) + rb.acc = acc +} + +func (rb *ParticleWebhook) eventHandler(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + data, err := ioutil.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + dummy := &DummyData{} + if err := json.Unmarshal(data, dummy); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + pd := &ParticleData{} + if err := json.Unmarshal([]byte(dummy.Data), pd); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + pTime, err := dummy.Time() + if err != nil { + log.Printf("Time Conversion Error") + pTime = time.Now() + } + rb.acc.AddFields(dummy.InfluxDB, pd.Fields, pd.Tags, pTime) + w.WriteHeader(http.StatusOK) +} diff --git a/plugins/inputs/webhooks/particle/particle_webhooks_events.go b/plugins/inputs/webhooks/particle/particle_webhooks_events.go new file mode 100644 index 000000000..089536525 --- /dev/null +++ b/plugins/inputs/webhooks/particle/particle_webhooks_events.go @@ -0,0 +1,22 @@ +package particle + +import ( + "time" +) + +type DummyData struct { + Event string `json:"event"` + Data string `json:"data"` + Ttl int `json:"ttl"` + PublishedAt string `json:"published_at"` + InfluxDB string `json:"influx_db"` +} +type ParticleData struct { + Event string `json:"event"` + Tags map[string]string `json:"tags"` + Fields map[string]interface{} `json:"values"` +} + +func (d *DummyData) Time() (time.Time, error) { + return time.Parse("2006-01-02T15:04:05Z", d.PublishedAt) +} diff --git a/plugins/inputs/webhooks/particle/particle_webhooks_events_json_test.go b/plugins/inputs/webhooks/particle/particle_webhooks_events_json_test.go new file mode 100644 index 000000000..aef0537e9 --- /dev/null +++ b/plugins/inputs/webhooks/particle/particle_webhooks_events_json_test.go @@ -0,0 +1,39 @@ +package particle + +func NewItemJSON() string { + return ` + { + "event": "temperature", + "data": "{ + "tags": { + "id": "230035001147343438323536", + "location": "TravelingWilbury" + }, + "values": { + "temp_c": 26.680000, + "temp_f": 80.024001, + "humidity": 44.937500, + "pressure": 998.998901, + "altitude": 119.331436, + "broadband": 1266, + "infrared": 528, + "lux": 0 + } + }", + "ttl": 60, + "published_at": "2017-09-28T21:54:10.897Z", + "coreid": "123456789938323536", + "userid": "1234ee123ac8e5ec1231a123d", + "version": 10, + "public": false, + "productID": 1234, + "name": "sensor" + "influx_db": "mydata" + }` +} +func UnknowJSON() string { + return ` + { + "event": "roger" + }` +} diff --git a/plugins/inputs/webhooks/particle/particle_webhooks_test.go b/plugins/inputs/webhooks/particle/particle_webhooks_test.go new file mode 100644 index 000000000..c62e0f0c8 --- /dev/null +++ b/plugins/inputs/webhooks/particle/particle_webhooks_test.go @@ -0,0 +1,57 @@ +package particle + +import ( + "github.com/influxdata/telegraf/testutil" + "log" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +func postWebhooks(rb *ParticleWebhook, eventBody string) *httptest.ResponseRecorder { + req, _ := http.NewRequest("POST", "/", strings.NewReader(eventBody)) + log.Printf("eventBody: %s\n", eventBody) + w := httptest.NewRecorder() + w.Code = 500 + + rb.eventHandler(w, req) + + return w +} + +func TestNewItem(t *testing.T) { + var acc testutil.Accumulator + rb := &ParticleWebhook{Path: "/particle", acc: &acc} + resp := postWebhooks(rb, NewItemJSON()) + log.Printf("Respnse: %s\n", resp.Body) + if resp.Code != http.StatusOK { + t.Errorf("POST new_item returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) + } + + fields := map[string]interface{}{ + "temp_c": 26.680000, + "temp_f": 80.024001, + "humidity": 44.937500, + "pressure": 998.998901, + "altitude": 119.331436, + "broadband": 1266, + "infrared": 528, + "lux": 0, + } + + tags := map[string]string{ + "id": "230035001147343438323536", + "location": "TravelingWilbury", + } + + acc.AssertContainsTaggedFields(t, "particle_webhooks", fields, tags) +} +func TestUnknowItem(t *testing.T) { + rb := &ParticleWebhook{Path: "/particle"} + resp := postWebhooks(rb, UnknowJSON()) + log.Printf("Response: %s\n", resp.Body) + if resp.Code != http.StatusOK { + t.Errorf("POST unknown returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) + } +} diff --git a/plugins/inputs/webhooks/webhooks.go b/plugins/inputs/webhooks/webhooks.go index 698cde159..794b55168 100644 --- a/plugins/inputs/webhooks/webhooks.go +++ b/plugins/inputs/webhooks/webhooks.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/telegraf/plugins/inputs/webhooks/github" "github.com/influxdata/telegraf/plugins/inputs/webhooks/mandrill" "github.com/influxdata/telegraf/plugins/inputs/webhooks/papertrail" + "github.com/influxdata/telegraf/plugins/inputs/webhooks/particle" "github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar" ) @@ -34,6 +35,7 @@ type Webhooks struct { Mandrill *mandrill.MandrillWebhook Rollbar *rollbar.RollbarWebhook Papertrail *papertrail.PapertrailWebhook + Particle *particle.ParticleWebhook srv *http.Server } @@ -62,6 +64,9 @@ func (wb *Webhooks) SampleConfig() string { [inputs.webhooks.papertrail] path = "/papertrail" + + [inputs.webhooks.particle] + path = "/particle" ` } diff --git a/plugins/inputs/webhooks/webhooks_test.go b/plugins/inputs/webhooks/webhooks_test.go index 6d3448870..1a5fa4aa1 100644 --- a/plugins/inputs/webhooks/webhooks_test.go +++ b/plugins/inputs/webhooks/webhooks_test.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/telegraf/plugins/inputs/webhooks/github" "github.com/influxdata/telegraf/plugins/inputs/webhooks/papertrail" + "github.com/influxdata/telegraf/plugins/inputs/webhooks/particle" "github.com/influxdata/telegraf/plugins/inputs/webhooks/rollbar" ) @@ -33,4 +34,9 @@ func TestAvailableWebhooks(t *testing.T) { if !reflect.DeepEqual(wb.AvailableWebhooks(), expected) { t.Errorf("expected to be %v.\nGot %v", expected, wb.AvailableWebhooks()) } + wb.Particle = &particle.ParticleWebhook{Path: "/particle"} + expected = append(expected, wb.Particle) + if !reflect.DeepEqual(wb.AvailableWebhooks(), expected) { + t.Errorf("expected to be %v.\nGot %v", expected, wb.AvailableWebhooks()) + } }