diff --git a/go.mod b/go.mod index de62dc620..5b23b61db 100644 --- a/go.mod +++ b/go.mod @@ -49,6 +49,7 @@ require ( github.com/gobwas/glob v0.2.3 github.com/gofrs/uuid v2.1.0+incompatible github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d + github.com/golang/geo v0.0.0-20190916061304-5b978397cfec github.com/golang/mock v1.3.1-0.20190508161146-9fa652df1129 // indirect github.com/golang/protobuf v1.3.2 github.com/google/go-cmp v0.4.0 diff --git a/go.sum b/go.sum index b15ec7343..995bcce10 100644 --- a/go.sum +++ b/go.sum @@ -166,6 +166,8 @@ github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d h1:3PaI8p3seN09VjbTYC/QWlUZdZ1qS1zGjy7LH2Wt07I= github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= +github.com/golang/geo v0.0.0-20190916061304-5b978397cfec h1:lJwO/92dFXWeXOZdoGXgptLmNLwynMSHUmU6besqtiw= +github.com/golang/geo v0.0.0-20190916061304-5b978397cfec/go.mod h1:QZ0nwyI2jOfgRAoBvP+ab5aRr7c9x7lhGEJrKvBwjWI= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go index ba72ee10e..e47445059 100644 --- a/plugins/processors/all/all.go +++ b/plugins/processors/all/all.go @@ -5,6 +5,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/processors/converter" _ "github.com/influxdata/telegraf/plugins/processors/date" _ "github.com/influxdata/telegraf/plugins/processors/enum" + _ "github.com/influxdata/telegraf/plugins/processors/geo" _ "github.com/influxdata/telegraf/plugins/processors/override" _ "github.com/influxdata/telegraf/plugins/processors/parser" _ "github.com/influxdata/telegraf/plugins/processors/pivot" diff --git a/plugins/processors/geo/README.md b/plugins/processors/geo/README.md new file mode 100644 index 000000000..5a65d5e7d --- /dev/null +++ b/plugins/processors/geo/README.md @@ -0,0 +1,29 @@ +# S2 Geo Processor Plugin + +Use the `s2geo` processor to add tag with S2 cell ID token of specified [cell level][cell levels]. +The tag is used in `experimental/geo` Flux package functions. +The `lat` and `lon` fields values should contain WGS-84 coordinates in decimal degrees. + +### Configuration + +```toml +[[processors.geo]] + ## The name of the lat and lon fields containing WGS-84 latitude and longitude in decimal degrees + lat_field = "lat" + lon_field = "lon" + + ## New tag to create + tag_key = "s2_cell_id" + + ## Cell level (see https://s2geometry.io/resources/s2cell_statistics.html) + cell_level = 11 +``` + +### Example + +```diff +- mta,area=llir,id=GO505_20_2704,status=1 lat=40.878738,lon=-72.517572 1560540094 ++ mta,area=llir,id=GO505_20_2704,status=1,s2_cell_id=89e8ed4 lat=40.878738,lon=-72.517572 1560540094 +``` + +[cell levels]: https://s2geometry.io/resources/s2cell_statistics.html diff --git a/plugins/processors/geo/geo.go b/plugins/processors/geo/geo.go new file mode 100644 index 000000000..85f80c3df --- /dev/null +++ b/plugins/processors/geo/geo.go @@ -0,0 +1,76 @@ +package geo + +import ( + "fmt" + "github.com/golang/geo/s2" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/processors" +) + +type Geo struct { + LatField string `toml:"lat_field"` + LonField string `toml:"lon_field"` + TagKey string `toml:"tag_key"` + CellLevel int `toml:"cell_level"` +} + +var SampleConfig = ` + ## The name of the lat and lon fields containing WGS-84 latitude and longitude in decimal degrees + lat_field = "lat" + lon_field = "lon" + + ## New tag to create + tag_key = "s2_cell_id" + + ## Cell level (see https://s2geometry.io/resources/s2cell_statistics.html) + cell_level = 9 +` + +func (g *Geo) SampleConfig() string { + return SampleConfig +} + +func (g *Geo) Description() string { + return "Reads latitude and longitude fields and adds tag with with S2 cell ID token of specified level." +} + +func (g *Geo) Init() error { + if g.CellLevel < 0 || g.CellLevel > 30 { + return fmt.Errorf("invalid cell level %d", g.CellLevel) + } + return nil +} + +func (g *Geo) Apply(in ...telegraf.Metric) []telegraf.Metric { + for _, point := range in { + var latOk, lonOk bool + var lat, lon float64 + for _, field := range point.FieldList() { + switch field.Key { + case g.LatField: + lat, latOk = field.Value.(float64) + case g.LonField: + lon, lonOk = field.Value.(float64) + } + } + if latOk && lonOk { + cellID := s2.CellIDFromLatLng(s2.LatLngFromDegrees(lat, lon)) + if cellID.IsValid() { + value := cellID.Parent(g.CellLevel).ToToken() + point.AddTag(g.TagKey, value) + } + } + } + return in +} + +func init() { + processors.Add("s2geo", func() telegraf.Processor { + return &Geo{ + LatField: "lat", + LonField: "lon", + TagKey: "s2_cell_id", + CellLevel: 9, + } + }) +} diff --git a/plugins/processors/geo/geo_test.go b/plugins/processors/geo/geo_test.go new file mode 100644 index 000000000..b06a1a06d --- /dev/null +++ b/plugins/processors/geo/geo_test.go @@ -0,0 +1,55 @@ +package geo + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestGeo(t *testing.T) { + plugin := &Geo{ + LatField: "lat", + LonField: "lon", + TagKey: "s2_cell_id", + CellLevel: 11, + } + + pluginMostlyDefault := &Geo{ + CellLevel: 11, + } + + err := plugin.Init() + require.NoError(t, err) + + metric := testutil.MustMetric( + "mta", + map[string]string{}, + map[string]interface{}{ + "lat": 40.878738, + "lon": -72.517572, + }, + time.Unix(1578603600, 0), + ) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "mta", + map[string]string{ + "s2_cell_id": "89e8ed4", + }, + map[string]interface{}{ + "lat": 40.878738, + "lon": -72.517572, + }, + time.Unix(1578603600, 0), + ), + } + + actual := plugin.Apply(metric) + testutil.RequireMetricsEqual(t, expected, actual) + actual = pluginMostlyDefault.Apply(metric) + testutil.RequireMetricsEqual(t, expected, actual) +}