telegraf/plugins/inputs/azure_storage_queue/azure_storage_queue.go

135 lines
3.9 KiB
Go

package azure_storage_queue
import (
"context"
"errors"
"net/url"
"strings"
"time"
"github.com/Azure/azure-storage-queue-go/azqueue"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
type AzureStorageQueue struct {
StorageAccountName string `toml:"account_name"`
StorageAccountKey string `toml:"account_key"`
PeekOldestMessageAge bool `toml:"peek_oldest_message_age"`
Log telegraf.Logger
serviceURL *azqueue.ServiceURL
}
var sampleConfig = `
## Required Azure Storage Account name
account_name = "mystorageaccount"
## Required Azure Storage Account access key
account_key = "storageaccountaccesskey"
## Set to false to disable peeking age of oldest message (executes faster)
# peek_oldest_message_age = true
`
func (a *AzureStorageQueue) Description() string {
return "Gather Azure Storage Queue metrics"
}
func (a *AzureStorageQueue) SampleConfig() string {
return sampleConfig
}
func (a *AzureStorageQueue) Init() error {
if a.StorageAccountName == "" {
return errors.New("account_name must be configured")
}
if a.StorageAccountKey == "" {
return errors.New("account_key must be configured")
}
return nil
}
func (a *AzureStorageQueue) GetServiceURL() (azqueue.ServiceURL, error) {
if a.serviceURL == nil {
_url, err := url.Parse("https://" + a.StorageAccountName + ".queue.core.windows.net")
if err != nil {
return azqueue.ServiceURL{}, err
}
credential, err := azqueue.NewSharedKeyCredential(a.StorageAccountName, a.StorageAccountKey)
if err != nil {
return azqueue.ServiceURL{}, err
}
pipeline := azqueue.NewPipeline(credential, azqueue.PipelineOptions{})
serviceURL := azqueue.NewServiceURL(*_url, pipeline)
a.serviceURL = &serviceURL
}
return *a.serviceURL, nil
}
func (a *AzureStorageQueue) GatherQueueMetrics(acc telegraf.Accumulator, queueItem azqueue.QueueItem, properties *azqueue.QueueGetPropertiesResponse, peekedMessage *azqueue.PeekedMessage) {
fields := make(map[string]interface{})
tags := make(map[string]string)
tags["queue"] = strings.TrimSpace(queueItem.Name)
tags["account"] = a.StorageAccountName
fields["size"] = properties.ApproximateMessagesCount()
if peekedMessage != nil {
fields["oldest_message_age_ns"] = time.Now().UnixNano() - peekedMessage.InsertionTime.UnixNano()
}
acc.AddFields("azure_storage_queues", fields, tags)
}
func (a *AzureStorageQueue) Gather(acc telegraf.Accumulator) error {
serviceURL, err := a.GetServiceURL()
if err != nil {
return err
}
ctx := context.TODO()
for marker := (azqueue.Marker{}); marker.NotDone(); {
a.Log.Debugf("Listing queues of storage account '%s'", a.StorageAccountName)
queuesSegment, err := serviceURL.ListQueuesSegment(ctx, marker,
azqueue.ListQueuesSegmentOptions{
Detail: azqueue.ListQueuesSegmentDetails{Metadata: false},
})
if err != nil {
return err
}
marker = queuesSegment.NextMarker
for _, queueItem := range queuesSegment.QueueItems {
a.Log.Debugf("Processing queue '%s' of storage account '%s'", queueItem.Name, a.StorageAccountName)
queueURL := serviceURL.NewQueueURL(queueItem.Name)
properties, err := queueURL.GetProperties(ctx)
if err != nil {
a.Log.Errorf("Error getting properties for queue %s: %s", queueItem.Name, err.Error())
continue
}
var peekedMessage *azqueue.PeekedMessage
if a.PeekOldestMessageAge {
messagesURL := queueURL.NewMessagesURL()
messagesResponse, err := messagesURL.Peek(ctx, 1)
if err != nil {
a.Log.Errorf("Error peeking queue %s: %s", queueItem.Name, err.Error())
} else if messagesResponse.NumMessages() > 0 {
peekedMessage = messagesResponse.Message(0)
}
}
a.GatherQueueMetrics(acc, queueItem, properties, peekedMessage)
}
}
return nil
}
func init() {
inputs.Add("azure_storage_queue", func() telegraf.Input {
return &AzureStorageQueue{PeekOldestMessageAge: true}
})
}