diff --git a/Gopkg.lock b/Gopkg.lock index 248d55456..c38cbb506 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -47,6 +47,22 @@ revision = "2b93072101d466aa4120b3c23c2e1b08af01541c" version = "v0.6.0" +[[projects]] + digest = "1:bd444f85703c5aff1ba686cb52766fd38c3730d4e1dfb02327b2481bfe674997" + name = "github.com/Azure/azure-pipeline-go" + packages = ["pipeline"] + pruneopts = "" + revision = "b8e3409182fd52e74f7d7bdfbff5833591b3b655" + version = "v0.1.8" + +[[projects]] + digest = "1:6ef03ecdaf3e9a003c2ebd67bfa673bbe8df2c23c82217a4448da766e8ef6b30" + name = "github.com/Azure/azure-storage-queue-go" + packages = ["azqueue"] + pruneopts = "" + revision = "6ed74e755687d1a74f08d9aab5a9e3f2fbe7d162" + version = "0.2.0" + [[projects]] digest = "1:5923e22a060ab818a015593422f9e8a35b9d881d4cfcfed0669a82959b11c7ee" name = "github.com/Azure/go-autorest" @@ -1669,6 +1685,7 @@ "cloud.google.com/go/pubsub", "collectd.org/api", "collectd.org/network", + "github.com/Azure/azure-storage-queue-go/azqueue", "github.com/Azure/go-autorest/autorest", "github.com/Azure/go-autorest/autorest/azure/auth", "github.com/Microsoft/ApplicationInsights-Go/appinsights", diff --git a/Gopkg.toml b/Gopkg.toml index 2cc57dd71..6848b947f 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -236,6 +236,10 @@ name = "github.com/Azure/go-autorest" version = "10.12.0" +[[constraint]] + name = "github.com/Azure/azure-storage-queue-go" + version = "0.2.0" + [[constraint]] branch = "master" name = "golang.org/x/oauth2" @@ -257,6 +261,10 @@ name = "github.com/karrick/godirwalk" version = "1.7.5" +[[constraint]] + name = "github.com/Azure/azure-pipeline-go" + version = "0.1.8" + [[override]] name = "github.com/harlow/kinesis-consumer" branch = "master" diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 81ecaac81..bb1e90007 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -12,6 +12,7 @@ following works: - github.com/amir/raidman [The Unlicense](https://github.com/amir/raidman/blob/master/UNLICENSE) - github.com/apache/thrift [Apache License 2.0](https://github.com/apache/thrift/blob/master/LICENSE) - github.com/aws/aws-sdk-go [Apache License 2.0](https://github.com/aws/aws-sdk-go/blob/master/LICENSE.txt) +- github.com/Azure/azure-storage-queue-go [MIT License](https://github.com/Azure/azure-storage-queue-go/blob/master/LICENSE) - github.com/Azure/go-autorest [Apache License 2.0](https://github.com/Azure/go-autorest/blob/master/LICENSE) - github.com/beorn7/perks [MIT License](https://github.com/beorn7/perks/blob/master/LICENSE) - github.com/cenkalti/backoff [MIT License](https://github.com/cenkalti/backoff/blob/master/LICENSE) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 7381487d5..f8ff6c879 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -7,6 +7,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/apache" _ "github.com/influxdata/telegraf/plugins/inputs/apcupsd" _ "github.com/influxdata/telegraf/plugins/inputs/aurora" + _ "github.com/influxdata/telegraf/plugins/inputs/azure_storage_queue" _ "github.com/influxdata/telegraf/plugins/inputs/bcache" _ "github.com/influxdata/telegraf/plugins/inputs/beanstalkd" _ "github.com/influxdata/telegraf/plugins/inputs/bind" diff --git a/plugins/inputs/azure_storage_queue/README.md b/plugins/inputs/azure_storage_queue/README.md new file mode 100644 index 000000000..7985c886e --- /dev/null +++ b/plugins/inputs/azure_storage_queue/README.md @@ -0,0 +1,35 @@ +# Telegraf Input Plugin: Azure Storage Queue + +This plugin gathers sizes of Azure Storage Queues. + +### Configuration: + +```toml +# Description +[[inputs.azure_storage_queue]] + ## Required Azure Storage Account name + account_name = "mystorageaccount" + + ## Required Azure Storage Account access key + account_key = "storageaccountaccesskey" + + ## Set to false to disable peeking age of oldest message (executes faster) + # peek_oldest_message_age = true +``` + +### Metrics +- azure_storage_queues + - tags: + - queue + - account + - fields: + - size (integer, count) + - oldest_message_age_ns (integer, nanoseconds) Age of message at the head of the queue. + Requires `peek_oldest_message_age` to be configured to `true`. + +### Example Output + +``` +azure_storage_queues,queue=myqueue,account=mystorageaccount oldest_message_age=799714900i,size=7i 1565970503000000000 +azure_storage_queues,queue=myemptyqueue,account=mystorageaccount size=0i 1565970502000000000 +``` \ No newline at end of file diff --git a/plugins/inputs/azure_storage_queue/azure_storage_queue.go b/plugins/inputs/azure_storage_queue/azure_storage_queue.go new file mode 100644 index 000000000..0fa7b0fd6 --- /dev/null +++ b/plugins/inputs/azure_storage_queue/azure_storage_queue.go @@ -0,0 +1,134 @@ +package activemq + +import ( + "context" + "errors" + "log" + "net/url" + "strings" + "time" + + "github.com/Azure/azure-storage-queue-go/azqueue" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +type AzureStorageQueue struct { + StorageAccountName string `toml:"account_name"` + StorageAccountKey string `toml:"account_key"` + PeekOldestMessageAge bool `toml:"peek_oldest_message_age"` + + serviceURL *azqueue.ServiceURL +} + +var sampleConfig = ` + ## Required Azure Storage Account name + account_name = "mystorageaccount" + + ## Required Azure Storage Account access key + account_key = "storageaccountaccesskey" + + ## Set to false to disable peeking age of oldest message (executes faster) + # peek_oldest_message_age = true + ` + +func (a *AzureStorageQueue) Description() string { + return "Gather Azure Storage Queue metrics" +} + +func (a *AzureStorageQueue) SampleConfig() string { + return sampleConfig +} + +func (a *AzureStorageQueue) Init() error { + if a.StorageAccountName == "" { + return errors.New("account_name must be configured") + } + + if a.StorageAccountKey == "" { + return errors.New("account_key must be configured") + } + return nil +} + +func (a *AzureStorageQueue) GetServiceURL() (azqueue.ServiceURL, error) { + if a.serviceURL == nil { + _url, err := url.Parse("https://" + a.StorageAccountName + ".queue.core.windows.net") + if err != nil { + return azqueue.ServiceURL{}, err + } + + credential, err := azqueue.NewSharedKeyCredential(a.StorageAccountName, a.StorageAccountKey) + if err != nil { + return azqueue.ServiceURL{}, err + } + + pipeline := azqueue.NewPipeline(credential, azqueue.PipelineOptions{}) + + serviceURL := azqueue.NewServiceURL(*_url, pipeline) + a.serviceURL = &serviceURL + } + return *a.serviceURL, nil +} + +func (a *AzureStorageQueue) GatherQueueMetrics(acc telegraf.Accumulator, queueItem azqueue.QueueItem, properties *azqueue.QueueGetPropertiesResponse, peekedMessage *azqueue.PeekedMessage) { + fields := make(map[string]interface{}) + tags := make(map[string]string) + tags["queue"] = strings.TrimSpace(queueItem.Name) + tags["account"] = a.StorageAccountName + fields["size"] = properties.ApproximateMessagesCount() + if peekedMessage != nil { + fields["oldest_message_age_ns"] = time.Now().UnixNano() - peekedMessage.InsertionTime.UnixNano() + } + acc.AddFields("azure_storage_queues", fields, tags) +} + +func (a *AzureStorageQueue) Gather(acc telegraf.Accumulator) error { + serviceURL, err := a.GetServiceURL() + if err != nil { + return err + } + + ctx := context.TODO() + + for marker := (azqueue.Marker{}); marker.NotDone(); { + log.Printf("D! [inputs.azure_storage_queue] Listing queues of storage account '%s'", a.StorageAccountName) + queuesSegment, err := serviceURL.ListQueuesSegment(ctx, marker, + azqueue.ListQueuesSegmentOptions{ + Detail: azqueue.ListQueuesSegmentDetails{Metadata: false}, + }) + if err != nil { + return err + } + marker = queuesSegment.NextMarker + + for _, queueItem := range queuesSegment.QueueItems { + log.Printf("D! [inputs.azure_storage_queue] Processing queue '%s' of storage account '%s'", queueItem.Name, a.StorageAccountName) + queueURL := serviceURL.NewQueueURL(queueItem.Name) + properties, err := queueURL.GetProperties(ctx) + if err != nil { + log.Printf("E! [inputs.azure_storage_queue] Error getting properties for queue %s: %s", queueItem.Name, err.Error()) + continue + } + var peekedMessage *azqueue.PeekedMessage + if a.PeekOldestMessageAge { + messagesURL := queueURL.NewMessagesURL() + messagesResponse, err := messagesURL.Peek(ctx, 1) + if err != nil { + log.Printf("E! [inputs.azure_storage_queue] Error peeking queue %s: %s", queueItem.Name, err.Error()) + } else if messagesResponse.NumMessages() > 0 { + peekedMessage = messagesResponse.Message(0) + } + } + + a.GatherQueueMetrics(acc, queueItem, properties, peekedMessage) + } + } + return nil +} + +func init() { + inputs.Add("azure_storage_queue", func() telegraf.Input { + return &AzureStorageQueue{PeekOldestMessageAge: true} + }) +}