bug fixes and refactoring
This commit is contained in:
		
							parent
							
								
									ba462f5c94
								
							
						
					
					
						commit
						86961cc814
					
				|  | @ -2,14 +2,40 @@ package particle | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"github.com/gorilla/mux" |  | ||||||
| 	"github.com/influxdata/telegraf" |  | ||||||
| 	"io/ioutil" |  | ||||||
| 	"log" | 	"log" | ||||||
| 	"net/http" | 	"net/http" | ||||||
| 	"time" | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"github.com/gorilla/mux" | ||||||
|  | 	"github.com/influxdata/telegraf" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | type event struct { | ||||||
|  | 	Name        string `json:"event"` | ||||||
|  | 	Data        data   `json:"data"` | ||||||
|  | 	TTL         int    `json:"ttl"` | ||||||
|  | 	PublishedAt string `json:"published_at"` | ||||||
|  | 	Database    string `json:"influx_db"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type data struct { | ||||||
|  | 	Tags   map[string]string      `json:"tags"` | ||||||
|  | 	Fields map[string]interface{} `json:"values"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func newEvent() *event { | ||||||
|  | 	return &event{ | ||||||
|  | 		Data: data{ | ||||||
|  | 			Tags:   make(map[string]string), | ||||||
|  | 			Fields: make(map[string]interface{}), | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (e *event) Time() (time.Time, error) { | ||||||
|  | 	return time.Parse("2006-01-02T15:04:05Z", e.PublishedAt) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| type ParticleWebhook struct { | type ParticleWebhook struct { | ||||||
| 	Path string | 	Path string | ||||||
| 	acc  telegraf.Accumulator | 	acc  telegraf.Accumulator | ||||||
|  | @ -23,26 +49,19 @@ func (rb *ParticleWebhook) Register(router *mux.Router, acc telegraf.Accumulator | ||||||
| 
 | 
 | ||||||
| func (rb *ParticleWebhook) eventHandler(w http.ResponseWriter, r *http.Request) { | func (rb *ParticleWebhook) eventHandler(w http.ResponseWriter, r *http.Request) { | ||||||
| 	defer r.Body.Close() | 	defer r.Body.Close() | ||||||
| 	data, err := ioutil.ReadAll(r.Body) | 	e := newEvent() | ||||||
|  | 	if err := json.NewDecoder(r.Body).Decode(e); err != nil { | ||||||
|  | 		log.Println(err) | ||||||
|  | 		w.WriteHeader(http.StatusBadRequest) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	pTime, err := e.Time() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		w.WriteHeader(http.StatusBadRequest) |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| 	dummy := &DummyData{} |  | ||||||
| 	if err := json.Unmarshal(data, dummy); err != nil { |  | ||||||
| 		w.WriteHeader(http.StatusBadRequest) |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| 	pd := &ParticleData{} |  | ||||||
| 	if err := json.Unmarshal([]byte(dummy.Data), pd); err != nil { |  | ||||||
| 		w.WriteHeader(http.StatusBadRequest) |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| 	pTime, err := dummy.Time() |  | ||||||
| 	if err != nil { |  | ||||||
| 		log.Printf("Time Conversion Error") |  | ||||||
| 		pTime = time.Now() | 		pTime = time.Now() | ||||||
|  | 		log.Printf("error parsing particle event time: %s. Using telegraf host time instead: %s", e.PublishedAt, pTime) | ||||||
| 	} | 	} | ||||||
| 	rb.acc.AddFields(dummy.InfluxDB, pd.Fields, pd.Tags, pTime) | 
 | ||||||
|  | 	rb.acc.AddFields(e.Name, e.Data.Fields, e.Data.Tags, pTime) | ||||||
| 	w.WriteHeader(http.StatusOK) | 	w.WriteHeader(http.StatusOK) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -1,22 +0,0 @@ | ||||||
| package particle |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"time" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| type DummyData struct { |  | ||||||
| 	Event       string `json:"event"` |  | ||||||
| 	Data        string `json:"data"` |  | ||||||
| 	Ttl         int    `json:"ttl"` |  | ||||||
| 	PublishedAt string `json:"published_at"` |  | ||||||
| 	InfluxDB    string `json:"influx_db"` |  | ||||||
| } |  | ||||||
| type ParticleData struct { |  | ||||||
| 	Event  string                 `json:"event"` |  | ||||||
| 	Tags   map[string]string      `json:"tags"` |  | ||||||
| 	Fields map[string]interface{} `json:"values"` |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (d *DummyData) Time() (time.Time, error) { |  | ||||||
| 	return time.Parse("2006-01-02T15:04:05Z", d.PublishedAt) |  | ||||||
| } |  | ||||||
|  | @ -1,39 +0,0 @@ | ||||||
| package particle |  | ||||||
| 
 |  | ||||||
| func NewItemJSON() string { |  | ||||||
| 	return ` |  | ||||||
| 	{  |  | ||||||
| 	  "event": "temperature", |  | ||||||
| 	  "data": "{  |  | ||||||
| 		  "tags": { |  | ||||||
| 			  "id": "230035001147343438323536",  |  | ||||||
| 			  "location": "TravelingWilbury" |  | ||||||
| 		  },  |  | ||||||
| 		  "values": { |  | ||||||
| 			  "temp_c": 26.680000,  |  | ||||||
| 			  "temp_f": 80.024001,  |  | ||||||
| 			  "humidity": 44.937500,  |  | ||||||
| 			  "pressure": 998.998901,  |  | ||||||
| 			  "altitude": 119.331436,  |  | ||||||
| 			  "broadband": 1266,  |  | ||||||
| 			  "infrared": 528,  |  | ||||||
| 			  "lux": 0 |  | ||||||
| 		  } |  | ||||||
| 	  }", |  | ||||||
| 	  "ttl": 60, |  | ||||||
| 	  "published_at": "2017-09-28T21:54:10.897Z", |  | ||||||
| 	  "coreid": "123456789938323536", |  | ||||||
| 	  "userid": "1234ee123ac8e5ec1231a123d", |  | ||||||
| 	  "version": 10, |  | ||||||
| 	  "public": false, |  | ||||||
| 	  "productID": 1234, |  | ||||||
| 	  "name": "sensor" |  | ||||||
| 	  "influx_db": "mydata" |  | ||||||
|   }` |  | ||||||
| } |  | ||||||
| func UnknowJSON() string { |  | ||||||
| 	return ` |  | ||||||
|     { |  | ||||||
|       "event": "roger" |  | ||||||
|     }` |  | ||||||
| } |  | ||||||
|  | @ -1,17 +1,16 @@ | ||||||
| package particle | package particle | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"github.com/influxdata/telegraf/testutil" |  | ||||||
| 	"log" |  | ||||||
| 	"net/http" | 	"net/http" | ||||||
| 	"net/http/httptest" | 	"net/http/httptest" | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"testing" | 	"testing" | ||||||
|  | 
 | ||||||
|  | 	"github.com/influxdata/telegraf/testutil" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func postWebhooks(rb *ParticleWebhook, eventBody string) *httptest.ResponseRecorder { | func postWebhooks(rb *ParticleWebhook, eventBody string) *httptest.ResponseRecorder { | ||||||
| 	req, _ := http.NewRequest("POST", "/", strings.NewReader(eventBody)) | 	req, _ := http.NewRequest("POST", "/", strings.NewReader(eventBody)) | ||||||
| 	log.Printf("eventBody: %s\n", eventBody) |  | ||||||
| 	w := httptest.NewRecorder() | 	w := httptest.NewRecorder() | ||||||
| 	w.Code = 500 | 	w.Code = 500 | ||||||
| 
 | 
 | ||||||
|  | @ -21,10 +20,10 @@ func postWebhooks(rb *ParticleWebhook, eventBody string) *httptest.ResponseRecor | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func TestNewItem(t *testing.T) { | func TestNewItem(t *testing.T) { | ||||||
|  | 	t.Parallel() | ||||||
| 	var acc testutil.Accumulator | 	var acc testutil.Accumulator | ||||||
| 	rb := &ParticleWebhook{Path: "/particle", acc: &acc} | 	rb := &ParticleWebhook{Path: "/particle", acc: &acc} | ||||||
| 	resp := postWebhooks(rb, NewItemJSON()) | 	resp := postWebhooks(rb, NewItemJSON()) | ||||||
| 	log.Printf("Respnse: %s\n", resp.Body) |  | ||||||
| 	if resp.Code != http.StatusOK { | 	if resp.Code != http.StatusOK { | ||||||
| 		t.Errorf("POST new_item returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) | 		t.Errorf("POST new_item returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) | ||||||
| 	} | 	} | ||||||
|  | @ -32,12 +31,12 @@ func TestNewItem(t *testing.T) { | ||||||
| 	fields := map[string]interface{}{ | 	fields := map[string]interface{}{ | ||||||
| 		"temp_c":    26.680000, | 		"temp_c":    26.680000, | ||||||
| 		"temp_f":    80.024001, | 		"temp_f":    80.024001, | ||||||
|  | 		"infrared":  528.0, | ||||||
|  | 		"lux":       0.0, | ||||||
| 		"humidity":  44.937500, | 		"humidity":  44.937500, | ||||||
| 		"pressure":  998.998901, | 		"pressure":  998.998901, | ||||||
| 		"altitude":  119.331436, | 		"altitude":  119.331436, | ||||||
| 		"broadband": 1266, | 		"broadband": 1266.0, | ||||||
| 		"infrared":  528, |  | ||||||
| 		"lux":       0, |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	tags := map[string]string{ | 	tags := map[string]string{ | ||||||
|  | @ -45,13 +44,54 @@ func TestNewItem(t *testing.T) { | ||||||
| 		"location": "TravelingWilbury", | 		"location": "TravelingWilbury", | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	acc.AssertContainsTaggedFields(t, "particle_webhooks", fields, tags) | 	acc.AssertContainsTaggedFields(t, "temperature", fields, tags) | ||||||
| } | } | ||||||
|  | 
 | ||||||
| func TestUnknowItem(t *testing.T) { | func TestUnknowItem(t *testing.T) { | ||||||
| 	rb := &ParticleWebhook{Path: "/particle"} | 	t.Parallel() | ||||||
|  | 	var acc testutil.Accumulator | ||||||
|  | 	rb := &ParticleWebhook{Path: "/particle", acc: &acc} | ||||||
| 	resp := postWebhooks(rb, UnknowJSON()) | 	resp := postWebhooks(rb, UnknowJSON()) | ||||||
| 	log.Printf("Response: %s\n", resp.Body) |  | ||||||
| 	if resp.Code != http.StatusOK { | 	if resp.Code != http.StatusOK { | ||||||
| 		t.Errorf("POST unknown returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) | 		t.Errorf("POST unknown returned HTTP status code %v.\nExpected %v", resp.Code, http.StatusOK) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func NewItemJSON() string { | ||||||
|  | 	return ` | ||||||
|  | 	{ | ||||||
|  | 	  "event": "temperature", | ||||||
|  | 	  "data": { | ||||||
|  | 		  "tags": { | ||||||
|  | 			  "id": "230035001147343438323536", | ||||||
|  | 			  "location": "TravelingWilbury" | ||||||
|  | 		  }, | ||||||
|  | 		  "values": { | ||||||
|  | 			  "temp_c": 26.680000, | ||||||
|  | 			  "temp_f": 80.024001, | ||||||
|  | 			  "humidity": 44.937500, | ||||||
|  | 			  "pressure": 998.998901, | ||||||
|  | 			  "altitude": 119.331436, | ||||||
|  | 			  "broadband": 1266, | ||||||
|  | 			  "infrared": 528, | ||||||
|  | 			  "lux": 0 | ||||||
|  | 		  } | ||||||
|  | 	  }, | ||||||
|  | 	  "ttl": 60, | ||||||
|  | 	  "published_at": "2017-09-28T21:54:10.897Z", | ||||||
|  | 	  "coreid": "123456789938323536", | ||||||
|  | 	  "userid": "1234ee123ac8e5ec1231a123d", | ||||||
|  | 	  "version": 10, | ||||||
|  | 	  "public": false, | ||||||
|  | 	  "productID": 1234, | ||||||
|  | 	  "name": "sensor", | ||||||
|  | 	  "influx_db": "mydata" | ||||||
|  |   }` | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func UnknowJSON() string { | ||||||
|  | 	return ` | ||||||
|  |     { | ||||||
|  |       "event": "roger" | ||||||
|  |     }` | ||||||
|  | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue