Add Azure IoT Hub / Event Hub input plugin (#6928)
This commit is contained in:
parent
2a465cc687
commit
f69b639aa9
|
@ -185,6 +185,7 @@ For documentation on the latest development code see the [documentation index][d
|
|||
* [aws ecs](./plugins/inputs/ecs) (Amazon Elastic Container Service, Fargate)
|
||||
* [elasticsearch](./plugins/inputs/elasticsearch)
|
||||
* [ethtool](./plugins/inputs/ethtool)
|
||||
* [eventhub](./plugins/inputs/eventhub) (Azure Event Hubs \& Azure IoT Hub)
|
||||
* [exec](./plugins/inputs/exec) (generic executable plugin, support JSON, influx, graphite and nagios)
|
||||
* [execd](./plugins/inputs/execd)
|
||||
* [fail2ban](./plugins/inputs/fail2ban)
|
||||
|
|
|
@ -6,8 +6,12 @@ following works:
|
|||
- cloud.google.com/go [Apache License 2.0](https://github.com/GoogleCloudPlatform/google-cloud-go/blob/master/LICENSE)
|
||||
- code.cloudfoundry.org/clock [Apache License 2.0](https://github.com/cloudfoundry/clock/blob/master/LICENSE)
|
||||
- collectd.org [MIT License](https://git.octo.it/?p=collectd.git;a=blob;f=COPYING;hb=HEAD)
|
||||
- github.com/Azure/azure-amqp-common-go [MIT License](https://github.com/Azure/azure-amqp-common-go/blob/master/LICENSE)
|
||||
- github.com/Azure/azure-event-hubs-go [MIT License](https://github.com/Azure/azure-event-hubs-go/blob/master/LICENSE)
|
||||
- github.com/Azure/azure-pipeline-go [MIT License](https://github.com/Azure/azure-pipeline-go/blob/master/LICENSE)
|
||||
- github.com/Azure/azure-sdk-for-go [Apache License 2.0](https://github.com/Azure/azure-sdk-for-go/blob/master/LICENSE)
|
||||
- github.com/Azure/azure-storage-queue-go [MIT License](https://github.com/Azure/azure-storage-queue-go/blob/master/LICENSE)
|
||||
- github.com/Azure/go-amqp [MIT License](https://github.com/Azure/go-amqp/blob/master/LICENSE)
|
||||
- github.com/Azure/go-autorest [Apache License 2.0](https://github.com/Azure/go-autorest/blob/master/LICENSE)
|
||||
- github.com/Mellanox/rdmamap [Apache License 2.0](https://github.com/Mellanox/rdmamap/blob/master/LICENSE)
|
||||
- github.com/Microsoft/ApplicationInsights-Go [MIT License](https://github.com/Microsoft/ApplicationInsights-Go/blob/master/LICENSE)
|
||||
|
@ -30,6 +34,7 @@ following works:
|
|||
- github.com/couchbase/goutils [COUCHBASE INC. COMMUNITY EDITION LICENSE](https://github.com/couchbase/goutils/blob/master/LICENSE.md)
|
||||
- github.com/davecgh/go-spew [ISC License](https://github.com/davecgh/go-spew/blob/master/LICENSE)
|
||||
- github.com/denisenkom/go-mssqldb [BSD 3-Clause "New" or "Revised" License](https://github.com/denisenkom/go-mssqldb/blob/master/LICENSE.txt)
|
||||
- github.com/devigned/tab [MIT License](https://github.com/devigned/tab/blob/master/LICENSE)
|
||||
- github.com/dgrijalva/jwt-go [MIT License](https://github.com/dgrijalva/jwt-go/blob/master/LICENSE)
|
||||
- github.com/dimchansky/utfbom [Apache License 2.0](https://github.com/dimchansky/utfbom/blob/master/LICENSE)
|
||||
- github.com/docker/distribution [Apache License 2.0](https://github.com/docker/distribution/blob/master/LICENSE)
|
||||
|
@ -76,6 +81,7 @@ following works:
|
|||
- github.com/jackc/pgx [MIT License](https://github.com/jackc/pgx/blob/master/LICENSE)
|
||||
- github.com/jcmturner/gofork [BSD 3-Clause "New" or "Revised" License](https://github.com/jcmturner/gofork/blob/master/LICENSE)
|
||||
- github.com/jmespath/go-jmespath [Apache License 2.0](https://github.com/jmespath/go-jmespath/blob/master/LICENSE)
|
||||
- github.com/jpillora/backoff [MIT License](https://github.com/jpillora/backoff/blob/master/LICENSE)
|
||||
- github.com/kardianos/service [zlib License](https://github.com/kardianos/service/blob/master/LICENSE)
|
||||
- github.com/karrick/godirwalk [BSD 2-Clause "Simplified" License](https://github.com/karrick/godirwalk/blob/master/LICENSE)
|
||||
- github.com/kballard/go-shellquote [MIT License](https://github.com/kballard/go-shellquote/blob/master/LICENSE)
|
||||
|
|
2
go.mod
2
go.mod
|
@ -6,6 +6,7 @@ require (
|
|||
cloud.google.com/go v0.37.4
|
||||
code.cloudfoundry.org/clock v1.0.0 // indirect
|
||||
collectd.org v0.3.0
|
||||
github.com/Azure/azure-event-hubs-go/v3 v3.2.0
|
||||
github.com/Azure/azure-storage-queue-go v0.0.0-20181215014128-6ed74e755687
|
||||
github.com/Azure/go-autorest/autorest v0.9.3
|
||||
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2
|
||||
|
@ -86,7 +87,6 @@ require (
|
|||
github.com/mdlayher/apcupsd v0.0.0-20190314144147-eb3dd99a75fe
|
||||
github.com/miekg/dns v1.0.14
|
||||
github.com/mitchellh/go-testing-interface v1.0.0 // indirect
|
||||
github.com/mitchellh/mapstructure v0.0.0-20180715050151-f15292f7a699 // indirect
|
||||
github.com/multiplay/go-ts3 v1.0.0
|
||||
github.com/naoina/go-stringutil v0.1.0 // indirect
|
||||
github.com/nats-io/nats-server/v2 v2.1.4
|
||||
|
|
43
go.sum
43
go.sum
|
@ -6,10 +6,20 @@ code.cloudfoundry.org/clock v1.0.0 h1:kFXWQM4bxYvdBw2X8BbBeXwQNgfoWv1vqAk2ZZyBN2
|
|||
code.cloudfoundry.org/clock v1.0.0/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8=
|
||||
collectd.org v0.3.0 h1:iNBHGw1VvPJxH2B6RiFWFZ+vsjo1lCdRszBeOuwGi00=
|
||||
collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE=
|
||||
github.com/Azure/azure-pipeline-go v0.1.8 h1:KmVRa8oFMaargVesEuuEoiLCQ4zCCwQ8QX/xg++KS20=
|
||||
github.com/Azure/azure-amqp-common-go/v3 v3.0.0 h1:j9tjcwhypb/jek3raNrwlCIl7iKQYOug7CLpSyBBodc=
|
||||
github.com/Azure/azure-amqp-common-go/v3 v3.0.0/go.mod h1:SY08giD/XbhTz07tJdpw1SoxQXHPN30+DI3Z04SYqyg=
|
||||
github.com/Azure/azure-event-hubs-go/v3 v3.2.0 h1:CQlxKH5a4NX1ZmbdqXUPRwuNGh2XvtgmhkZvkEuWzhs=
|
||||
github.com/Azure/azure-event-hubs-go/v3 v3.2.0/go.mod h1:BPIIJNH/l/fVHYq3Rm6eg4clbrULrQ3q7+icmqHyyLc=
|
||||
github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
|
||||
github.com/Azure/azure-pipeline-go v0.1.9 h1:u7JFb9fFTE6Y/j8ae2VK33ePrRqJqoCM/IWkQdAZ+rg=
|
||||
github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
|
||||
github.com/Azure/azure-sdk-for-go v37.1.0+incompatible h1:aFlw3lP7ZHQi4m1kWCpcwYtczhDkGhDoRaMTaxcOf68=
|
||||
github.com/Azure/azure-sdk-for-go v37.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
|
||||
github.com/Azure/azure-storage-queue-go v0.0.0-20181215014128-6ed74e755687 h1:7MiZ6Th+YTmwUdrKmFg5OMsGYz7IdQwjqL0RPxkhhOQ=
|
||||
github.com/Azure/azure-storage-queue-go v0.0.0-20181215014128-6ed74e755687/go.mod h1:K6am8mT+5iFXgingS9LUc7TmbsW6XBw3nxaRyaMyWc8=
|
||||
github.com/Azure/go-amqp v0.12.6 h1:34yItuwhA/nusvq2sPSNPQxZLCf/CtaogYH8n578mnY=
|
||||
github.com/Azure/go-amqp v0.12.6/go.mod h1:qApuH6OFTSKZFmCOxccvAv5rLizBQf4v8pRmG138DPo=
|
||||
github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
|
||||
github.com/Azure/go-autorest/autorest v0.9.3 h1:OZEIaBbMdUE/Js+BQKlpO81XlISgipr6yDJ+PSwsgi4=
|
||||
github.com/Azure/go-autorest/autorest v0.9.3/go.mod h1:GsRuLYvwzLjjjRoWEIyMUaYq8GNUx2nRB378IPt/1p0=
|
||||
|
@ -28,6 +38,10 @@ github.com/Azure/go-autorest/autorest/mocks v0.1.0/go.mod h1:OTyCOPRA2IgIlWxVYxB
|
|||
github.com/Azure/go-autorest/autorest/mocks v0.2.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0=
|
||||
github.com/Azure/go-autorest/autorest/mocks v0.3.0 h1:qJumjCaCudz+OcqE9/XtEPfvtOjOmKaui4EOpFI6zZc=
|
||||
github.com/Azure/go-autorest/autorest/mocks v0.3.0/go.mod h1:a8FDP3DYzQ4RYfVAxAN3SVSiiO77gL2j2ronKKP0syM=
|
||||
github.com/Azure/go-autorest/autorest/to v0.3.0 h1:zebkZaadz7+wIQYgC7GXaz3Wb28yKYfVkkBKwc38VF8=
|
||||
github.com/Azure/go-autorest/autorest/to v0.3.0/go.mod h1:MgwOyqaIuKdG4TL/2ywSsIWKAfJfgHDo8ObuUk3t5sA=
|
||||
github.com/Azure/go-autorest/autorest/validation v0.2.0 h1:15vMO4y76dehZSq7pAaOLQxC6dZYsSrj2GQpflyM/L4=
|
||||
github.com/Azure/go-autorest/autorest/validation v0.2.0/go.mod h1:3EEqHnBxQGHXRYq3HT1WyXAvT7LLY3tl70hw6tQIbjI=
|
||||
github.com/Azure/go-autorest/logger v0.1.0 h1:ruG4BSDXONFRrZZJ2GUXDiUyVpayPmb1GnWeHDdaNKY=
|
||||
github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc=
|
||||
github.com/Azure/go-autorest/tracing v0.5.0 h1:TRn4WjSnkcSy5AEG3pnbtFSwNtwzjr4VYyQflFE619k=
|
||||
|
@ -98,6 +112,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
|
|||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4 h1:YcpmyvADGYw5LqMnHqSkyIELsHCGF6PkrmM31V8rF7o=
|
||||
github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4/go.mod h1:zAg7JM8CkOJ43xKXIj7eRO9kmWm/TW578qo+oDO6tuM=
|
||||
github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA=
|
||||
github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY=
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
|
||||
github.com/dimchansky/utfbom v1.1.0 h1:FcM3g+nofKgUteL8dm/UpdRXNC9KmADgTpLKsu0TRo4=
|
||||
|
@ -165,7 +181,6 @@ github.com/gofrs/uuid v2.1.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRx
|
|||
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
|
||||
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
|
||||
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
|
||||
github.com/gogo/protobuf v1.2.0 h1:xU6/SpYbvkNYiptHJYEDRseDLvYE7wSqhYYNy0QSUzI=
|
||||
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
|
||||
github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d h1:3PaI8p3seN09VjbTYC/QWlUZdZ1qS1zGjy7LH2Wt07I=
|
||||
github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
|
||||
|
@ -180,7 +195,6 @@ github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb
|
|||
github.com/golang/mock v1.3.1-0.20190508161146-9fa652df1129 h1:tT8iWCYw4uOem71yYA3htfH+LNopJvcqZQshm56G5L4=
|
||||
github.com/golang/mock v1.3.1-0.20190508161146-9fa652df1129/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
|
||||
github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
|
||||
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
|
@ -190,10 +204,13 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
|
|||
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw=
|
||||
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
||||
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
||||
github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
|
||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg=
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
|
||||
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4rEjNlfyDHW9dolSY=
|
||||
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
|
||||
|
@ -240,7 +257,6 @@ github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerX
|
|||
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
|
||||
github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE=
|
||||
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
|
||||
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
|
||||
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
|
||||
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
|
||||
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
|
||||
|
@ -267,6 +283,10 @@ github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem
|
|||
github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
|
||||
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
|
||||
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
|
||||
github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
|
||||
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
|
||||
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7 h1:K//n/AqR5HjG3qxbrBCL4vJPW0MVFSs9CPK1OOJdRME=
|
||||
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
|
||||
github.com/jsimonetti/rtnetlink v0.0.0-20190606172950-9527aa82566a/go.mod h1:Oz+70psSo5OFh8DBl0Zv2ACw7Esh6pPUphlvZG9x7uw=
|
||||
github.com/jsimonetti/rtnetlink v0.0.0-20200117123717-f846d4f6c1f4 h1:nwOc1YaOrYJ37sEBrtWZrdqzK22hiJs3GpDmP3sR2Yw=
|
||||
github.com/jsimonetti/rtnetlink v0.0.0-20200117123717-f846d4f6c1f4/go.mod h1:WGuG/smIU4J/54PblvSbh+xvCZmpJnFgr3ds6Z55XMQ=
|
||||
|
@ -326,8 +346,8 @@ github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG
|
|||
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
|
||||
github.com/mitchellh/go-testing-interface v1.0.0 h1:fzU/JVNcaqHQEcVFAKeR41fkiLdIPrefOvVG1VZ96U0=
|
||||
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
|
||||
github.com/mitchellh/mapstructure v0.0.0-20180715050151-f15292f7a699 h1:KXZJFdun9knAVAR8tg/aHJEr5DgtcbqyvzacK+CDCaI=
|
||||
github.com/mitchellh/mapstructure v0.0.0-20180715050151-f15292f7a699/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
|
||||
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/reflect2 v0.0.0-20180320133207-05fbef0ca5da/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
|
||||
|
@ -356,12 +376,10 @@ github.com/nsqio/go-nsq v1.0.7 h1:O0pIZJYTf+x7cZBA0UMY8WxFG79lYTURmWzAAh48ljY=
|
|||
github.com/nsqio/go-nsq v1.0.7/go.mod h1:XP5zaUs3pqf+Q71EqUJs3HYfBIqfK6G83WQMdNN+Ito=
|
||||
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
||||
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
||||
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
|
||||
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
||||
github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
|
||||
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
||||
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
|
||||
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
|
||||
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
|
||||
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
|
||||
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
|
||||
|
@ -384,7 +402,6 @@ github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144T
|
|||
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
||||
github.com/pierrec/lz4 v2.2.6+incompatible h1:6aCX4/YZ9v8q69hTyiR7dNLnTA3fgtKHVVW5BCd5Znw=
|
||||
github.com/pierrec/lz4 v2.2.6+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
||||
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
|
||||
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
|
@ -430,7 +447,6 @@ github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
|
|||
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v0.0.0-20151208002404-e3a8ff8ce365/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
|
@ -497,7 +513,7 @@ golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73r
|
|||
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190628185345-da137c7871d7 h1:rTIdg5QFRR7XCaK4LCjBiPbx8j4DQRpdYMnGn/bJUEU=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20191003171128-d98b1b443823/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
|
@ -538,7 +554,6 @@ golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4 h1:sfkvUWPNGwSV+8/fNqctR5lS2
|
|||
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 h1:z99zHgr7hKfrUcX/KsoJk5FJfjTceCKIp96+biqP4To=
|
||||
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
|
@ -556,6 +571,7 @@ golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3
|
|||
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.zx2c4.com/wireguard v0.0.20200121 h1:vcswa5Q6f+sylDfjqyrVNNrjsFUUbPsgAQTBCAg/Qf8=
|
||||
golang.zx2c4.com/wireguard v0.0.20200121/go.mod h1:P2HsVp8SKwZEufsnezXZA4GRX/T49/HlU7DGuelXsU4=
|
||||
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200205215550-e35592f146e4 h1:KTi97NIQGgSMaN0v/oxniJV0MEzfzmrDUOAWxombQVc=
|
||||
|
@ -613,7 +629,6 @@ gopkg.in/olivere/elastic.v5 v5.0.70/go.mod h1:FylZT6jQWtfHsicejzOm3jIMVPOAksa80i
|
|||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
|
||||
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
|
||||
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
|
|
|
@ -40,6 +40,7 @@ import (
|
|||
_ "github.com/influxdata/telegraf/plugins/inputs/ecs"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/ethtool"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/eventhub"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/exec"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/execd"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/fail2ban"
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
# Azure Event Hubs input plugin
|
||||
|
||||
This plugin provides a consumer for use with Azure Event Hubs and Azure IoT Hub. The implementation is in essence a wrapper for [Microsoft Azure Event Hubs Client for Golang](https://github.com/Azure/azure-event-hubs-go).
|
||||
|
||||
## Configuration
|
||||
|
||||
```toml
|
||||
[[inputs.eventhub]]
|
||||
## The default behavior is to create a new Event Hub client from environment variables.
|
||||
## This requires one of the following sets of environment variables to be set:
|
||||
##
|
||||
## 1) Expected Environment Variables:
|
||||
## - "EVENTHUB_NAMESPACE"
|
||||
## - "EVENTHUB_NAME"
|
||||
## - "EVENTHUB_CONNECTION_STRING"
|
||||
##
|
||||
## 2) Expected Environment Variables:
|
||||
## - "EVENTHUB_NAMESPACE"
|
||||
## - "EVENTHUB_NAME"
|
||||
## - "EVENTHUB_KEY_NAME"
|
||||
## - "EVENTHUB_KEY_VALUE"
|
||||
|
||||
## Uncommenting the option below will create an Event Hub client based solely on the connection string.
|
||||
## This can either be the associated environment variable or hard coded directly.
|
||||
# connection_string = "$EVENTHUB_CONNECTION_STRING"
|
||||
|
||||
## Set persistence directory to a valid folder to use a file persister instead of an in-memory persister
|
||||
# persistence_dir = ""
|
||||
|
||||
## Change the default consumer group
|
||||
# consumer_group = ""
|
||||
|
||||
## By default the event hub receives all messages present on the broker, alternative modes can be set below.
|
||||
## The timestamp should be in https://github.com/toml-lang/toml#offset-date-time format (RFC 3339).
|
||||
## The 3 options below only apply if no valid persister is read from memory or file (e.g. first run).
|
||||
# from_timestamp =
|
||||
# latest = true
|
||||
|
||||
## Set a custom prefetch count for the receiver(s)
|
||||
# prefetch_count = 1000
|
||||
|
||||
## Add an epoch to the receiver(s)
|
||||
# epoch = 0
|
||||
|
||||
## Change to set a custom user agent, "telegraf" is used by default
|
||||
# user_agent = "telegraf"
|
||||
|
||||
## To consume from a specific partition, set the partition_ids option.
|
||||
## An empty array will result in receiving from all partitions.
|
||||
# partition_ids = ["0","1"]
|
||||
|
||||
## Max undelivered messages
|
||||
# max_undelivered_messages = 1000
|
||||
|
||||
## Set either option below to true to use a system property as timestamp.
|
||||
## You have the choice between EnqueuedTime and IoTHubEnqueuedTime.
|
||||
## It is recommended to use this setting when the data itself has no timestamp.
|
||||
# enqueued_time_as_ts = true
|
||||
# iot_hub_enqueued_time_as_ts = true
|
||||
|
||||
## Tags or fields to create from keys present in the application property bag.
|
||||
## These could for example be set by message enrichments in Azure IoT Hub.
|
||||
application_property_tags = []
|
||||
application_property_fields = []
|
||||
|
||||
## Tag or field name to use for metadata
|
||||
## By default all metadata is disabled
|
||||
# sequence_number_field = "SequenceNumber"
|
||||
# enqueued_time_field = "EnqueuedTime"
|
||||
# offset_field = "Offset"
|
||||
# partition_id_tag = "PartitionID"
|
||||
# partition_key_tag = "PartitionKey"
|
||||
# iot_hub_device_connection_id_tag = "IoTHubDeviceConnectionID"
|
||||
# iot_hub_auth_generation_id_tag = "IoTHubAuthGenerationID"
|
||||
# iot_hub_connection_auth_method_tag = "IoTHubConnectionAuthMethod"
|
||||
# iot_hub_connection_module_id_tag = "IoTHubConnectionModuleID"
|
||||
# iot_hub_enqueued_time_field = "IoTHubEnqueuedTime"
|
||||
|
||||
## Data format to consume.
|
||||
## Each data format has its own unique set of configuration options, read
|
||||
## more about them here:
|
||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||
data_format = "influx"
|
||||
```
|
||||
## Testing
|
||||
|
||||
The main focus for development of this plugin is Azure IoT hub:
|
||||
|
||||
1. Create an Azure IoT Hub by following any of the guides provided here: https://docs.microsoft.com/en-us/azure/iot-hub/
|
||||
2. Create a device, for example a [simulated Raspberry Pi](https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-raspberry-pi-web-simulator-get-started)
|
||||
3. The connection string needed for the plugin is located under *Shared access policies*, both the *iothubowner* and *service* policies should work
|
||||
|
||||
## Untested features:
|
||||
|
||||
- Authentication with [AAD TokenProvider environment variables](https://github.com/Azure/azure-event-hubs-go#aad-tokenprovider-environment-variables)
|
||||
|
||||
## Not implemented:
|
||||
- [Event Processor Host](https://github.com/Azure/azure-event-hubs-go#event-processor-host) (should only be needed when using multiple Telegraf instances consuming the same partition)
|
|
@ -0,0 +1,422 @@
|
|||
package eventhub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
eventhub "github.com/Azure/azure-event-hubs-go/v3"
|
||||
"github.com/Azure/azure-event-hubs-go/v3/persist"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultMaxUndeliveredMessages = 1000
|
||||
)
|
||||
|
||||
type empty struct{}
|
||||
type semaphore chan empty
|
||||
|
||||
// EventHub is the top level struct for this plugin
|
||||
type EventHub struct {
|
||||
// Configuration
|
||||
ConnectionString string `toml:"connection_string"`
|
||||
PersistenceDir string `toml:"persistence_dir"`
|
||||
ConsumerGroup string `toml:"consumer_group"`
|
||||
FromTimestamp time.Time `toml:"from_timestamp"`
|
||||
Latest bool `toml:"latest"`
|
||||
PrefetchCount uint32 `toml:"prefetch_count"`
|
||||
Epoch int64 `toml:"epoch"`
|
||||
UserAgent string `toml:"user_agent"`
|
||||
PartitionIDs []string `toml:"partition_ids"`
|
||||
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
|
||||
EnqueuedTimeAsTs bool `toml:"enqueued_time_as_ts"`
|
||||
IotHubEnqueuedTimeAsTs bool `toml:"iot_hub_enqueued_time_as_ts"`
|
||||
|
||||
// Metadata
|
||||
ApplicationPropertyFields []string `toml:"application_property_fields"`
|
||||
ApplicationPropertyTags []string `toml:"application_property_tags"`
|
||||
SequenceNumberField string `toml:"sequence_number_field"`
|
||||
EnqueuedTimeField string `toml:"enqueued_time_field"`
|
||||
OffsetField string `toml:"offset_field"`
|
||||
PartitionIDTag string `toml:"partition_id_tag"`
|
||||
PartitionKeyTag string `toml:"partition_key_tag"`
|
||||
IoTHubDeviceConnectionIDTag string `toml:"iot_hub_device_connection_id_tag"`
|
||||
IoTHubAuthGenerationIDTag string `toml:"iot_hub_auth_generation_id_tag"`
|
||||
IoTHubConnectionAuthMethodTag string `toml:"iot_hub_connection_auth_method_tag"`
|
||||
IoTHubConnectionModuleIDTag string `toml:"iot_hub_connection_module_id_tag"`
|
||||
IoTHubEnqueuedTimeField string `toml:"iot_hub_enqueued_time_field"`
|
||||
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
// Azure
|
||||
hub *eventhub.Hub
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
|
||||
parser parsers.Parser
|
||||
in chan []telegraf.Metric
|
||||
}
|
||||
|
||||
// SampleConfig is provided here
|
||||
func (*EventHub) SampleConfig() string {
|
||||
return `
|
||||
## The default behavior is to create a new Event Hub client from environment variables.
|
||||
## This requires one of the following sets of environment variables to be set:
|
||||
##
|
||||
## 1) Expected Environment Variables:
|
||||
## - "EVENTHUB_NAMESPACE"
|
||||
## - "EVENTHUB_NAME"
|
||||
## - "EVENTHUB_CONNECTION_STRING"
|
||||
##
|
||||
## 2) Expected Environment Variables:
|
||||
## - "EVENTHUB_NAMESPACE"
|
||||
## - "EVENTHUB_NAME"
|
||||
## - "EVENTHUB_KEY_NAME"
|
||||
## - "EVENTHUB_KEY_VALUE"
|
||||
|
||||
## Uncommenting the option below will create an Event Hub client based solely on the connection string.
|
||||
## This can either be the associated environment variable or hard coded directly.
|
||||
# connection_string = "$EVENTHUB_CONNECTION_STRING"
|
||||
|
||||
## Set persistence directory to a valid folder to use a file persister instead of an in-memory persister
|
||||
# persistence_dir = ""
|
||||
|
||||
## Change the default consumer group
|
||||
# consumer_group = ""
|
||||
|
||||
## By default the event hub receives all messages present on the broker, alternative modes can be set below.
|
||||
## The timestamp should be in https://github.com/toml-lang/toml#offset-date-time format (RFC 3339).
|
||||
## The 3 options below only apply if no valid persister is read from memory or file (e.g. first run).
|
||||
# from_timestamp =
|
||||
# latest = true
|
||||
|
||||
## Set a custom prefetch count for the receiver(s)
|
||||
# prefetch_count = 1000
|
||||
|
||||
## Add an epoch to the receiver(s)
|
||||
# epoch = 0
|
||||
|
||||
## Change to set a custom user agent, "telegraf" is used by default
|
||||
# user_agent = "telegraf"
|
||||
|
||||
## To consume from a specific partition, set the partition_ids option.
|
||||
## An empty array will result in receiving from all partitions.
|
||||
# partition_ids = ["0","1"]
|
||||
|
||||
## Max undelivered messages
|
||||
# max_undelivered_messages = 1000
|
||||
|
||||
## Set either option below to true to use a system property as timestamp.
|
||||
## You have the choice between EnqueuedTime and IoTHubEnqueuedTime.
|
||||
## It is recommended to use this setting when the data itself has no timestamp.
|
||||
# enqueued_time_as_ts = true
|
||||
# iot_hub_enqueued_time_as_ts = true
|
||||
|
||||
## Tags or fields to create from keys present in the application property bag.
|
||||
## These could for example be set by message enrichments in Azure IoT Hub.
|
||||
application_property_tags = []
|
||||
application_property_fields = []
|
||||
|
||||
## Tag or field name to use for metadata
|
||||
## By default all metadata is disabled
|
||||
# sequence_number_field = "SequenceNumber"
|
||||
# enqueued_time_field = "EnqueuedTime"
|
||||
# offset_field = "Offset"
|
||||
# partition_id_tag = "PartitionID"
|
||||
# partition_key_tag = "PartitionKey"
|
||||
# iot_hub_device_connection_id_tag = "IoTHubDeviceConnectionID"
|
||||
# iot_hub_auth_generation_id_tag = "IoTHubAuthGenerationID"
|
||||
# iot_hub_connection_auth_method_tag = "IoTHubConnectionAuthMethod"
|
||||
# iot_hub_connection_module_id_tag = "IoTHubConnectionModuleID"
|
||||
# iot_hub_enqueued_time_field = "IoTHubEnqueuedTime"
|
||||
|
||||
## Data format to consume.
|
||||
## Each data format has its own unique set of configuration options, read
|
||||
## more about them here:
|
||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||
data_format = "influx"
|
||||
`
|
||||
}
|
||||
|
||||
// Description of the plugin
|
||||
func (*EventHub) Description() string {
|
||||
return "Azure Event Hubs service input plugin"
|
||||
}
|
||||
|
||||
// SetParser sets the parser
|
||||
func (e *EventHub) SetParser(parser parsers.Parser) {
|
||||
e.parser = parser
|
||||
}
|
||||
|
||||
// Gather function is unused
|
||||
func (*EventHub) Gather(telegraf.Accumulator) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Init the EventHub ServiceInput
|
||||
func (e *EventHub) Init() (err error) {
|
||||
if e.MaxUndeliveredMessages == 0 {
|
||||
e.MaxUndeliveredMessages = defaultMaxUndeliveredMessages
|
||||
}
|
||||
|
||||
// Set hub options
|
||||
hubOpts := []eventhub.HubOption{}
|
||||
|
||||
if e.PersistenceDir != "" {
|
||||
persister, err := persist.NewFilePersister(e.PersistenceDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hubOpts = append(hubOpts, eventhub.HubWithOffsetPersistence(persister))
|
||||
}
|
||||
|
||||
if e.UserAgent != "" {
|
||||
hubOpts = append(hubOpts, eventhub.HubWithUserAgent(e.UserAgent))
|
||||
} else {
|
||||
hubOpts = append(hubOpts, eventhub.HubWithUserAgent(internal.ProductToken()))
|
||||
}
|
||||
|
||||
// Create event hub connection
|
||||
if e.ConnectionString != "" {
|
||||
e.hub, err = eventhub.NewHubFromConnectionString(e.ConnectionString, hubOpts...)
|
||||
} else {
|
||||
e.hub, err = eventhub.NewHubFromEnvironment(hubOpts...)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Start the EventHub ServiceInput
|
||||
func (e *EventHub) Start(acc telegraf.Accumulator) error {
|
||||
e.in = make(chan []telegraf.Metric)
|
||||
|
||||
var ctx context.Context
|
||||
ctx, e.cancel = context.WithCancel(context.Background())
|
||||
|
||||
// Start tracking
|
||||
e.wg.Add(1)
|
||||
go func() {
|
||||
defer e.wg.Done()
|
||||
e.startTracking(ctx, acc)
|
||||
}()
|
||||
|
||||
// Configure receiver options
|
||||
receiveOpts, err := e.configureReceiver()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
partitions := e.PartitionIDs
|
||||
|
||||
if len(e.PartitionIDs) == 0 {
|
||||
runtimeinfo, err := e.hub.GetRuntimeInformation(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
partitions = runtimeinfo.PartitionIDs
|
||||
}
|
||||
|
||||
for _, partitionID := range partitions {
|
||||
_, err = e.hub.Receive(ctx, partitionID, e.onMessage, receiveOpts...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating receiver for partition %q: %v", partitionID, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *EventHub) configureReceiver() ([]eventhub.ReceiveOption, error) {
|
||||
receiveOpts := []eventhub.ReceiveOption{}
|
||||
|
||||
if e.ConsumerGroup != "" {
|
||||
receiveOpts = append(receiveOpts, eventhub.ReceiveWithConsumerGroup(e.ConsumerGroup))
|
||||
}
|
||||
|
||||
if !e.FromTimestamp.IsZero() {
|
||||
receiveOpts = append(receiveOpts, eventhub.ReceiveFromTimestamp(e.FromTimestamp))
|
||||
} else if e.Latest {
|
||||
receiveOpts = append(receiveOpts, eventhub.ReceiveWithLatestOffset())
|
||||
}
|
||||
|
||||
if e.PrefetchCount != 0 {
|
||||
receiveOpts = append(receiveOpts, eventhub.ReceiveWithPrefetchCount(e.PrefetchCount))
|
||||
}
|
||||
|
||||
if e.Epoch != 0 {
|
||||
receiveOpts = append(receiveOpts, eventhub.ReceiveWithEpoch(e.Epoch))
|
||||
}
|
||||
|
||||
return receiveOpts, nil
|
||||
}
|
||||
|
||||
// OnMessage handles an Event. When this function returns without error the
|
||||
// Event is immediately accepted and the offset is updated. If an error is
|
||||
// returned the Event is marked for redelivery.
|
||||
func (e *EventHub) onMessage(ctx context.Context, event *eventhub.Event) error {
|
||||
metrics, err := e.createMetrics(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case e.in <- metrics:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// OnDelivery returns true if a new slot has opened up in the TrackingAccumulator.
|
||||
func (e *EventHub) onDelivery(
|
||||
acc telegraf.TrackingAccumulator,
|
||||
groups map[telegraf.TrackingID][]telegraf.Metric,
|
||||
track telegraf.DeliveryInfo,
|
||||
) bool {
|
||||
if track.Delivered() {
|
||||
delete(groups, track.ID())
|
||||
return true
|
||||
}
|
||||
|
||||
// The metric was already accepted when onMessage completed, so we can't
|
||||
// fallback on redelivery from Event Hub. Add a new copy of the metric for
|
||||
// reprocessing.
|
||||
metrics, ok := groups[track.ID()]
|
||||
delete(groups, track.ID())
|
||||
if !ok {
|
||||
// The metrics should always be found, this message indicates a programming error.
|
||||
e.Log.Errorf("Could not find delievery: %d", track.ID())
|
||||
return true
|
||||
}
|
||||
|
||||
backup := deepCopyMetrics(metrics)
|
||||
id := acc.AddTrackingMetricGroup(metrics)
|
||||
groups[id] = backup
|
||||
return false
|
||||
}
|
||||
|
||||
func (e *EventHub) startTracking(ctx context.Context, ac telegraf.Accumulator) {
|
||||
acc := ac.WithTracking(e.MaxUndeliveredMessages)
|
||||
sem := make(semaphore, e.MaxUndeliveredMessages)
|
||||
groups := make(map[telegraf.TrackingID][]telegraf.Metric, e.MaxUndeliveredMessages)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case track := <-acc.Delivered():
|
||||
if e.onDelivery(acc, groups, track) {
|
||||
<-sem
|
||||
}
|
||||
case sem <- empty{}:
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case track := <-acc.Delivered():
|
||||
if e.onDelivery(acc, groups, track) {
|
||||
<-sem
|
||||
<-sem
|
||||
}
|
||||
case metrics := <-e.in:
|
||||
backup := deepCopyMetrics(metrics)
|
||||
id := acc.AddTrackingMetricGroup(metrics)
|
||||
groups[id] = backup
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func deepCopyMetrics(in []telegraf.Metric) []telegraf.Metric {
|
||||
metrics := make([]telegraf.Metric, 0, len(in))
|
||||
for _, m := range in {
|
||||
metrics = append(metrics, m.Copy())
|
||||
}
|
||||
return metrics
|
||||
}
|
||||
|
||||
// CreateMetrics returns the Metrics from the Event.
|
||||
func (e *EventHub) createMetrics(event *eventhub.Event) ([]telegraf.Metric, error) {
|
||||
metrics, err := e.parser.Parse(event.Data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i := range metrics {
|
||||
for _, field := range e.ApplicationPropertyFields {
|
||||
if val, ok := event.Get(field); ok {
|
||||
metrics[i].AddField(field, val)
|
||||
}
|
||||
}
|
||||
|
||||
for _, tag := range e.ApplicationPropertyTags {
|
||||
if val, ok := event.Get(tag); ok {
|
||||
metrics[i].AddTag(tag, fmt.Sprintf("%v", val))
|
||||
}
|
||||
}
|
||||
|
||||
if e.SequenceNumberField != "" {
|
||||
metrics[i].AddField(e.SequenceNumberField, *event.SystemProperties.SequenceNumber)
|
||||
}
|
||||
|
||||
if e.EnqueuedTimeAsTs {
|
||||
metrics[i].SetTime(*event.SystemProperties.EnqueuedTime)
|
||||
} else if e.EnqueuedTimeField != "" {
|
||||
metrics[i].AddField(e.EnqueuedTimeField, (*event.SystemProperties.EnqueuedTime).UnixNano()/int64(time.Millisecond))
|
||||
}
|
||||
|
||||
if e.OffsetField != "" {
|
||||
metrics[i].AddField(e.OffsetField, *event.SystemProperties.Offset)
|
||||
}
|
||||
|
||||
if event.SystemProperties.PartitionID != nil && e.PartitionIDTag != "" {
|
||||
metrics[i].AddTag(e.PartitionIDTag, string(*event.SystemProperties.PartitionID))
|
||||
}
|
||||
if event.SystemProperties.PartitionKey != nil && e.PartitionKeyTag != "" {
|
||||
metrics[i].AddTag(e.PartitionKeyTag, *event.SystemProperties.PartitionKey)
|
||||
}
|
||||
if event.SystemProperties.IoTHubDeviceConnectionID != nil && e.IoTHubDeviceConnectionIDTag != "" {
|
||||
metrics[i].AddTag(e.IoTHubDeviceConnectionIDTag, *event.SystemProperties.IoTHubDeviceConnectionID)
|
||||
}
|
||||
if event.SystemProperties.IoTHubAuthGenerationID != nil && e.IoTHubAuthGenerationIDTag != "" {
|
||||
metrics[i].AddTag(e.IoTHubAuthGenerationIDTag, *event.SystemProperties.IoTHubAuthGenerationID)
|
||||
}
|
||||
if event.SystemProperties.IoTHubConnectionAuthMethod != nil && e.IoTHubConnectionAuthMethodTag != "" {
|
||||
metrics[i].AddTag(e.IoTHubConnectionAuthMethodTag, *event.SystemProperties.IoTHubConnectionAuthMethod)
|
||||
}
|
||||
if event.SystemProperties.IoTHubConnectionModuleID != nil && e.IoTHubConnectionModuleIDTag != "" {
|
||||
metrics[i].AddTag(e.IoTHubConnectionModuleIDTag, *event.SystemProperties.IoTHubConnectionModuleID)
|
||||
}
|
||||
if event.SystemProperties.IoTHubEnqueuedTime != nil {
|
||||
if e.IotHubEnqueuedTimeAsTs {
|
||||
metrics[i].SetTime(*event.SystemProperties.IoTHubEnqueuedTime)
|
||||
} else if e.IoTHubEnqueuedTimeField != "" {
|
||||
metrics[i].AddField(e.IoTHubEnqueuedTimeField, (*event.SystemProperties.IoTHubEnqueuedTime).UnixNano()/int64(time.Millisecond))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
// Stop the EventHub ServiceInput
|
||||
func (e *EventHub) Stop() {
|
||||
err := e.hub.Close(context.Background())
|
||||
if err != nil {
|
||||
e.Log.Errorf("Error closing Event Hub connection: %v", err)
|
||||
}
|
||||
e.cancel()
|
||||
e.wg.Wait()
|
||||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("eventhub", func() telegraf.Input {
|
||||
return &EventHub{}
|
||||
})
|
||||
}
|
Loading…
Reference in New Issue