Add azure_storage_queue input plugin (#5323)
This commit is contained in:
parent
558c825478
commit
4f54b11973
|
@ -47,6 +47,22 @@
|
|||
revision = "2b93072101d466aa4120b3c23c2e1b08af01541c"
|
||||
version = "v0.6.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:bd444f85703c5aff1ba686cb52766fd38c3730d4e1dfb02327b2481bfe674997"
|
||||
name = "github.com/Azure/azure-pipeline-go"
|
||||
packages = ["pipeline"]
|
||||
pruneopts = ""
|
||||
revision = "b8e3409182fd52e74f7d7bdfbff5833591b3b655"
|
||||
version = "v0.1.8"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:6ef03ecdaf3e9a003c2ebd67bfa673bbe8df2c23c82217a4448da766e8ef6b30"
|
||||
name = "github.com/Azure/azure-storage-queue-go"
|
||||
packages = ["azqueue"]
|
||||
pruneopts = ""
|
||||
revision = "6ed74e755687d1a74f08d9aab5a9e3f2fbe7d162"
|
||||
version = "0.2.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:5923e22a060ab818a015593422f9e8a35b9d881d4cfcfed0669a82959b11c7ee"
|
||||
name = "github.com/Azure/go-autorest"
|
||||
|
@ -1669,6 +1685,7 @@
|
|||
"cloud.google.com/go/pubsub",
|
||||
"collectd.org/api",
|
||||
"collectd.org/network",
|
||||
"github.com/Azure/azure-storage-queue-go/azqueue",
|
||||
"github.com/Azure/go-autorest/autorest",
|
||||
"github.com/Azure/go-autorest/autorest/azure/auth",
|
||||
"github.com/Microsoft/ApplicationInsights-Go/appinsights",
|
||||
|
|
|
@ -236,6 +236,10 @@
|
|||
name = "github.com/Azure/go-autorest"
|
||||
version = "10.12.0"
|
||||
|
||||
[[constraint]]
|
||||
name = "github.com/Azure/azure-storage-queue-go"
|
||||
version = "0.2.0"
|
||||
|
||||
[[constraint]]
|
||||
branch = "master"
|
||||
name = "golang.org/x/oauth2"
|
||||
|
@ -257,6 +261,10 @@
|
|||
name = "github.com/karrick/godirwalk"
|
||||
version = "1.7.5"
|
||||
|
||||
[[constraint]]
|
||||
name = "github.com/Azure/azure-pipeline-go"
|
||||
version = "0.1.8"
|
||||
|
||||
[[override]]
|
||||
name = "github.com/harlow/kinesis-consumer"
|
||||
branch = "master"
|
||||
|
|
|
@ -12,6 +12,7 @@ following works:
|
|||
- github.com/amir/raidman [The Unlicense](https://github.com/amir/raidman/blob/master/UNLICENSE)
|
||||
- github.com/apache/thrift [Apache License 2.0](https://github.com/apache/thrift/blob/master/LICENSE)
|
||||
- github.com/aws/aws-sdk-go [Apache License 2.0](https://github.com/aws/aws-sdk-go/blob/master/LICENSE.txt)
|
||||
- github.com/Azure/azure-storage-queue-go [MIT License](https://github.com/Azure/azure-storage-queue-go/blob/master/LICENSE)
|
||||
- github.com/Azure/go-autorest [Apache License 2.0](https://github.com/Azure/go-autorest/blob/master/LICENSE)
|
||||
- github.com/beorn7/perks [MIT License](https://github.com/beorn7/perks/blob/master/LICENSE)
|
||||
- github.com/cenkalti/backoff [MIT License](https://github.com/cenkalti/backoff/blob/master/LICENSE)
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/apcupsd"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/aurora"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/azure_storage_queue"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/beanstalkd"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/bind"
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
# Telegraf Input Plugin: Azure Storage Queue
|
||||
|
||||
This plugin gathers sizes of Azure Storage Queues.
|
||||
|
||||
### Configuration:
|
||||
|
||||
```toml
|
||||
# Description
|
||||
[[inputs.azure_storage_queue]]
|
||||
## Required Azure Storage Account name
|
||||
account_name = "mystorageaccount"
|
||||
|
||||
## Required Azure Storage Account access key
|
||||
account_key = "storageaccountaccesskey"
|
||||
|
||||
## Set to false to disable peeking age of oldest message (executes faster)
|
||||
# peek_oldest_message_age = true
|
||||
```
|
||||
|
||||
### Metrics
|
||||
- azure_storage_queues
|
||||
- tags:
|
||||
- queue
|
||||
- account
|
||||
- fields:
|
||||
- size (integer, count)
|
||||
- oldest_message_age_ns (integer, nanoseconds) Age of message at the head of the queue.
|
||||
Requires `peek_oldest_message_age` to be configured to `true`.
|
||||
|
||||
### Example Output
|
||||
|
||||
```
|
||||
azure_storage_queues,queue=myqueue,account=mystorageaccount oldest_message_age=799714900i,size=7i 1565970503000000000
|
||||
azure_storage_queues,queue=myemptyqueue,account=mystorageaccount size=0i 1565970502000000000
|
||||
```
|
|
@ -0,0 +1,134 @@
|
|||
package activemq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-storage-queue-go/azqueue"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
type AzureStorageQueue struct {
|
||||
StorageAccountName string `toml:"account_name"`
|
||||
StorageAccountKey string `toml:"account_key"`
|
||||
PeekOldestMessageAge bool `toml:"peek_oldest_message_age"`
|
||||
|
||||
serviceURL *azqueue.ServiceURL
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
## Required Azure Storage Account name
|
||||
account_name = "mystorageaccount"
|
||||
|
||||
## Required Azure Storage Account access key
|
||||
account_key = "storageaccountaccesskey"
|
||||
|
||||
## Set to false to disable peeking age of oldest message (executes faster)
|
||||
# peek_oldest_message_age = true
|
||||
`
|
||||
|
||||
func (a *AzureStorageQueue) Description() string {
|
||||
return "Gather Azure Storage Queue metrics"
|
||||
}
|
||||
|
||||
func (a *AzureStorageQueue) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (a *AzureStorageQueue) Init() error {
|
||||
if a.StorageAccountName == "" {
|
||||
return errors.New("account_name must be configured")
|
||||
}
|
||||
|
||||
if a.StorageAccountKey == "" {
|
||||
return errors.New("account_key must be configured")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *AzureStorageQueue) GetServiceURL() (azqueue.ServiceURL, error) {
|
||||
if a.serviceURL == nil {
|
||||
_url, err := url.Parse("https://" + a.StorageAccountName + ".queue.core.windows.net")
|
||||
if err != nil {
|
||||
return azqueue.ServiceURL{}, err
|
||||
}
|
||||
|
||||
credential, err := azqueue.NewSharedKeyCredential(a.StorageAccountName, a.StorageAccountKey)
|
||||
if err != nil {
|
||||
return azqueue.ServiceURL{}, err
|
||||
}
|
||||
|
||||
pipeline := azqueue.NewPipeline(credential, azqueue.PipelineOptions{})
|
||||
|
||||
serviceURL := azqueue.NewServiceURL(*_url, pipeline)
|
||||
a.serviceURL = &serviceURL
|
||||
}
|
||||
return *a.serviceURL, nil
|
||||
}
|
||||
|
||||
func (a *AzureStorageQueue) GatherQueueMetrics(acc telegraf.Accumulator, queueItem azqueue.QueueItem, properties *azqueue.QueueGetPropertiesResponse, peekedMessage *azqueue.PeekedMessage) {
|
||||
fields := make(map[string]interface{})
|
||||
tags := make(map[string]string)
|
||||
tags["queue"] = strings.TrimSpace(queueItem.Name)
|
||||
tags["account"] = a.StorageAccountName
|
||||
fields["size"] = properties.ApproximateMessagesCount()
|
||||
if peekedMessage != nil {
|
||||
fields["oldest_message_age_ns"] = time.Now().UnixNano() - peekedMessage.InsertionTime.UnixNano()
|
||||
}
|
||||
acc.AddFields("azure_storage_queues", fields, tags)
|
||||
}
|
||||
|
||||
func (a *AzureStorageQueue) Gather(acc telegraf.Accumulator) error {
|
||||
serviceURL, err := a.GetServiceURL()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
|
||||
for marker := (azqueue.Marker{}); marker.NotDone(); {
|
||||
log.Printf("D! [inputs.azure_storage_queue] Listing queues of storage account '%s'", a.StorageAccountName)
|
||||
queuesSegment, err := serviceURL.ListQueuesSegment(ctx, marker,
|
||||
azqueue.ListQueuesSegmentOptions{
|
||||
Detail: azqueue.ListQueuesSegmentDetails{Metadata: false},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
marker = queuesSegment.NextMarker
|
||||
|
||||
for _, queueItem := range queuesSegment.QueueItems {
|
||||
log.Printf("D! [inputs.azure_storage_queue] Processing queue '%s' of storage account '%s'", queueItem.Name, a.StorageAccountName)
|
||||
queueURL := serviceURL.NewQueueURL(queueItem.Name)
|
||||
properties, err := queueURL.GetProperties(ctx)
|
||||
if err != nil {
|
||||
log.Printf("E! [inputs.azure_storage_queue] Error getting properties for queue %s: %s", queueItem.Name, err.Error())
|
||||
continue
|
||||
}
|
||||
var peekedMessage *azqueue.PeekedMessage
|
||||
if a.PeekOldestMessageAge {
|
||||
messagesURL := queueURL.NewMessagesURL()
|
||||
messagesResponse, err := messagesURL.Peek(ctx, 1)
|
||||
if err != nil {
|
||||
log.Printf("E! [inputs.azure_storage_queue] Error peeking queue %s: %s", queueItem.Name, err.Error())
|
||||
} else if messagesResponse.NumMessages() > 0 {
|
||||
peekedMessage = messagesResponse.Message(0)
|
||||
}
|
||||
}
|
||||
|
||||
a.GatherQueueMetrics(acc, queueItem, properties, peekedMessage)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("azure_storage_queue", func() telegraf.Input {
|
||||
return &AzureStorageQueue{PeekOldestMessageAge: true}
|
||||
})
|
||||
}
|
Loading…
Reference in New Issue