Output: Azure Monitor: Cleanup and add README

This commit is contained in:
Gunnar Aasen 2018-04-29 00:31:24 -07:00
parent 5c4c3a1ca9
commit 93a579d7e4
4 changed files with 375 additions and 316 deletions

View File

@ -0,0 +1,74 @@
## Azure Monitor Custom Metrics Output for Telegraf
This plugin will send custom metrics to Azure Monitor.
All metrics are written as summarized values: min, max, sum, count. The Telegraf field name is appended to the metric name. All Telegraf tags are set as the metric dimensions.
## Azure Authentication
This plugin can use one of several different types of credentials to authenticate
with the Azure Monitor Custom Metrics ingestion API endpoint. The plugin attempts
the following authentication methods, in order:
1. Managed Service Identity (MSI) token
- This is the preferred authentication method.
- Note: MSI is only available to ARM-based resources.
2. AAD Application Tokens (Service Principals)
- Primarily useful if Telegraf is writing metrics for other resources. [More information](https://docs.microsoft.com/en-us/azure/active-directory/develop/active-directory-application-objects).
- A Service Principal or User Principal needs to be assigned the `Monitoring Contributor` role.
3. AAD User Tokens (User Principals)
- Allows Telegraf to authenticate like a user. It is best to use this method for development.
## Config
For this output plugin to function correctly the following variables
must be configured.
* resource_id
* region
### region
The region is the Azure region that you wish to connect to.
Examples include but are not limited to:
* eastus
* centralus
* westcentralus
* westeurope
* southeastasia
### resource_id
The resource ID used for Azure Monitor metrics.
### Configuration:
```
# Configuration for sending aggregate metrics to Azure Monitor
[[outputs.azuremonitor]]
## The resource ID against which metrics will be logged. If not
## specified, the plugin will attempt to retrieve the resource ID
## of the VM via the instance metadata service (optional if running
## on an Azure VM with MSI)
#resource_id = "/subscriptions/<subscription-id>/resourceGroups/<resource-group>/providers/Microsoft.Compute/virtualMachines/<vm-name>"
## Azure region to publish metrics against. Defaults to eastus.
## Leave blank to automatically query the region via MSI.
#region = "eastus"
## Write HTTP timeout, formatted as a string. If not provided, will default
## to 5s. 0s means no timeout (not recommended).
# timeout = "5s"
## Whether or not to use managed service identity.
#use_managed_service_identity = true
## Fill in the following values if using Active Directory Service
## Principal or User Principal for authentication.
## Subscription ID
#azure_subscription = ""
## Tenant ID
#azure_tenant = ""
## Client ID
#azure_client_id = ""
## Client secret
#azure_client_secret = ""
```

View File

@ -59,8 +59,8 @@ type VirtualMachineMetadata struct {
} `json:"network"`
}
// MsiToken is the managed service identity token
type MsiToken struct {
// msiToken is the managed service identity token
type msiToken struct {
AccessToken string `json:"access_token"`
RefreshToken string `json:"refresh_token"`
ExpiresIn string `json:"expires_in"`
@ -74,7 +74,7 @@ type MsiToken struct {
raw string
}
func (m *MsiToken) parseTimes() {
func (m *msiToken) parseTimes() {
val, err := strconv.ParseInt(m.ExpiresOn, 10, 64)
if err == nil {
m.expiresAt = time.Unix(val, 0)
@ -87,23 +87,23 @@ func (m *MsiToken) parseTimes() {
}
// ExpiresAt is the time at which the token expires
func (m *MsiToken) ExpiresAt() time.Time {
func (m *msiToken) ExpiresAt() time.Time {
return m.expiresAt
}
// ExpiresInDuration returns the duration until the token expires
func (m *MsiToken) ExpiresInDuration() time.Duration {
func (m *msiToken) ExpiresInDuration() time.Duration {
expiresDuration := m.expiresAt.Sub(time.Now().UTC())
return expiresDuration
}
// NotBeforeTime returns the time at which the token becomes valid
func (m *MsiToken) NotBeforeTime() time.Time {
func (m *msiToken) NotBeforeTime() time.Time {
return m.notBefore
}
// GetMsiToken retrieves a managed service identity token from the specified port on the local VM
func (s *AzureInstanceMetadata) GetMsiToken(clientID string, resourceID string) (*MsiToken, error) {
func (s *AzureInstanceMetadata) getMsiToken(clientID string, resourceID string) (*msiToken, error) {
// Acquire an MSI token. Documented at:
// https://docs.microsoft.com/en-us/azure/active-directory/managed-service-identity/how-to-use-vm-token
//
@ -159,7 +159,7 @@ func (s *AzureInstanceMetadata) GetMsiToken(clientID string, resourceID string)
resp.StatusCode, resp.Status, reply)
}
var token MsiToken
var token msiToken
if err := json.Unmarshal(reply, &token); err != nil {
return nil, err
}

View File

@ -30,7 +30,7 @@ func TestGetTOKEN(t *testing.T) {
azureMetadata := &AzureInstanceMetadata{}
resourceID := "https://ingestion.monitor.azure.com/"
token, err := azureMetadata.GetMsiToken("", resourceID)
token, err := azureMetadata.getMsiToken("", resourceID)
require.NoError(t, err)
require.NotEmpty(t, token.AccessToken)

View File

@ -14,23 +14,25 @@ import (
"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
)
// AzureMonitor allows publishing of metrics to the Azure Monitor custom metrics service
type AzureMonitor struct {
ResourceID string `toml:"resourceId"`
Region string `toml:"region"`
HTTPPostTimeout int `toml:"httpPostTimeout"`
AzureSubscriptionID string `toml:"azureSubscription"`
AzureTenantID string `toml:"azureTenant"`
AzureClientID string `toml:"azureClientId"`
AzureClientSecret string `toml:"azureClientSecret"`
ResourceID string `toml:"resource_id"`
Region string `toml:"region"`
Timeout internal.Duration `toml:"Timeout"`
AzureSubscriptionID string `toml:"azure_subscription"`
AzureTenantID string `toml:"azure_tenant"`
AzureClientID string `toml:"azure_client_id"`
AzureClientSecret string `toml:"azure_client_secret"`
StringAsDimension bool `toml:"string_as_dimension"`
useMsi bool
useMsi bool `toml:"use_managed_service_identity"`
metadataService *AzureInstanceMetadata
instanceMetadata *VirtualMachineMetadata
msiToken *MsiToken
msiToken *msiToken
msiResource string
bearerToken string
expiryWatermark time.Duration
@ -40,7 +42,7 @@ type AzureMonitor struct {
client *http.Client
cache map[uint64]azureMonitorMetric
cache map[string]*azureMonitorMetric
period time.Duration
delay time.Duration
periodStart time.Time
@ -75,31 +77,38 @@ type azureMonitorSeries struct {
}
var sampleConfig = `
## The resource ID against which metric will be logged. If not
## specified, the plugin will attempt to retrieve the resource ID
## of the VM via the instance metadata service (optional if running
## on an Azure VM with MSI)
resourceId = "/subscriptions/3e9c2afc-52b3-4137-9bba-02b6eb204331/resourceGroups/someresourcegroup-rg/providers/Microsoft.Compute/virtualMachines/somevmname"
## Azure region to publish metrics against. Defaults to eastus
region = "useast"
## Maximum duration to wait for HTTP post (in seconds). Defaults to 15
httpPostTimeout = 15
## Whether or not to use managed service identity (defaults to true).
useManagedServiceIdentity = true
## The resource ID against which metric will be logged. If not
## specified, the plugin will attempt to retrieve the resource ID
## of the VM via the instance metadata service (optional if running
## on an Azure VM with MSI)
#resource_id = "/subscriptions/<subscription-id>/resourceGroups/<resource-group>/providers/Microsoft.Compute/virtualMachines/<vm-name>"
## Azure region to publish metrics against. Defaults to eastus.
## Leave blank to automatically query the region via MSI.
#region = "useast"
## Leave this section blank to use Managed Service Identity.
## TODO
azureSubscription = "TODO"
## TODO
azureTenant = "TODO"
## TODO
azureClientId = "TODO"
## TODO
azureClientSecret = "TODO"
## Write HTTP timeout, formatted as a string. If not provided, will default
## to 5s. 0s means no timeout (not recommended).
# timeout = "5s"
## Whether or not to use managed service identity.
#use_managed_service_identity = true
## Fill in the following values if using Active Directory Service
## Principal or User Principal for authentication.
## Subscription ID
#azure_subscription = ""
## Tenant ID
#azure_tenant = ""
## Client ID
#azure_client_id = ""
## Client secrete
#azure_client_secret = ""
`
const (
azureMonitorDefaultRegion = "eastus"
defaultRegion = "eastus"
defaultMSIResource = "https://monitoring.azure.com/"
)
// Connect initializes the plugin and validates connectivity
@ -113,18 +122,13 @@ func (a *AzureMonitor) Connect() error {
return fmt.Errorf("Must provide values for azureSubscription, azureTenant, azureClient and azureClientSecret, or leave all blank to default to MSI")
}
if a.useMsi == false {
if !a.useMsi {
// If using direct AD authentication create the AD access client
oauthConfig, err := adal.NewOAuthConfig(azure.PublicCloud.ActiveDirectoryEndpoint, a.AzureTenantID)
if err != nil {
return fmt.Errorf("Could not initialize AD client: %s", err)
}
a.oauthConfig = oauthConfig
}
if a.HTTPPostTimeout == 0 {
a.HTTPPostTimeout = 10
}
a.metadataService = &AzureInstanceMetadata{}
@ -133,24 +137,18 @@ func (a *AzureMonitor) Connect() error {
a.msiResource = "https://monitoring.azure.com/"
// Validate the resource identifier
if a.ResourceID == "" {
metadata, err := a.metadataService.GetInstanceMetadata()
if err != nil {
return fmt.Errorf("No resource id specified, and Azure Instance metadata service not available. If not running on an Azure VM, provide a value for resourceId")
}
a.ResourceID = metadata.AzureResourceID
if a.Region == "" {
a.Region = metadata.Compute.Location
}
metadata, err := a.metadataService.GetInstanceMetadata()
if err != nil {
return fmt.Errorf("No resource id specified, and Azure Instance metadata service not available. If not running on an Azure VM, provide a value for resourceId")
}
a.ResourceID = metadata.AzureResourceID
if a.Region == "" {
a.Region = azureMonitorDefaultRegion
a.Region = metadata.Compute.Location
}
// Validate credentials
err := a.validateCredentials()
err = a.validateCredentials()
if err != nil {
return err
}
@ -161,6 +159,45 @@ func (a *AzureMonitor) Connect() error {
return nil
}
// validateCredentials makes sure the plugin holds credentials for the
// configured authentication mode.
//
// With MSI enabled, a cached token is reused while its remaining lifetime is
// above a.expiryWatermark; otherwise a fresh token is requested from the
// metadata service and its access token becomes the bearer token. Without
// MSI, a service principal token is created through ADAL and stored on
// a.adalToken.
func (a *AzureMonitor) validateCredentials() error {
	if !a.useMsi {
		// Direct Active Directory authentication.
		// NOTE(review): this branch stores the token on a.adalToken but does
		// not update a.bearerToken — confirm the token is applied to outgoing
		// requests somewhere else.
		spt, err := adal.NewServicePrincipalToken(
			*(a.oauthConfig), a.AzureClientID, a.AzureClientSecret,
			azure.PublicCloud.ActiveDirectoryEndpoint)
		if err != nil {
			return fmt.Errorf("Could not acquire ADAL token: %s", err)
		}
		a.adalToken = spt
		return nil
	}

	// Managed service identity: keep the cached token while it is still
	// comfortably valid.
	if a.msiToken != nil {
		remaining := a.msiToken.ExpiresInDuration()
		if remaining > a.expiryWatermark {
			return nil
		}

		// The token is close to expiry; drop it and fetch a new one below.
		log.Printf("Bearer token expiring in %s; acquiring new token\n", remaining.String())
		a.msiToken = nil
	}

	// No usable token at this point, so acquire one.
	fresh, err := a.metadataService.getMsiToken(a.AzureClientID, a.msiResource)
	if err != nil {
		return err
	}
	log.Printf("Bearer token acquired; expiring in %s\n", fresh.ExpiresInDuration().String())
	a.msiToken = fresh
	a.bearerToken = fresh.AccessToken
	return nil
}
// Description provides a description of the plugin
func (a *AzureMonitor) Description() string {
return "Configuration for sending aggregate metrics to Azure Monitor"
@ -180,9 +217,7 @@ func (a *AzureMonitor) Close() error {
// Write writes metrics to the remote endpoint
func (a *AzureMonitor) Write(metrics []telegraf.Metric) error {
log.Printf("metrics collected: %+v", metrics)
// Assemble stats on incoming metrics
// Assemble basic stats on incoming metrics
for _, metric := range metrics {
select {
case a.metrics <- metric:
@ -194,257 +229,6 @@ func (a *AzureMonitor) Write(metrics []telegraf.Metric) error {
return nil
}
func (a *AzureMonitor) validateCredentials() error {
// Use managed service identity
if a.useMsi {
// Check expiry on the token
if a.msiToken != nil {
expiryDuration := a.msiToken.ExpiresInDuration()
if expiryDuration > a.expiryWatermark {
return nil
}
// Token is about to expire
log.Printf("Bearer token expiring in %s; acquiring new token\n", expiryDuration.String())
a.msiToken = nil
}
// No token, acquire an MSI token
if a.msiToken == nil {
msiToken, err := a.metadataService.GetMsiToken(a.AzureClientID, a.msiResource)
if err != nil {
return err
}
log.Printf("Bearer token acquired; expiring in %s\n", msiToken.ExpiresInDuration().String())
a.msiToken = msiToken
a.bearerToken = msiToken.AccessToken
}
// Otherwise directory acquire a token
} else {
adToken, err := adal.NewServicePrincipalToken(
*(a.oauthConfig), a.AzureClientID, a.AzureClientSecret,
azure.PublicCloud.ActiveDirectoryEndpoint)
if err != nil {
return fmt.Errorf("Could not acquire ADAL token: %s", err)
}
a.adalToken = adToken
}
return nil
}
func (a *AzureMonitor) add(metric telegraf.Metric) {
id := metric.HashID()
if azm, ok := a.cache[id]; !ok {
// hit an uncached metric, create caches for first time:
var dimensionNames []string
var dimensionValues []string
for i, tag := range metric.TagList() {
// Azure custom metrics service supports up to 10 dimensions
if i > 9 {
continue
}
dimensionNames = append(dimensionNames, tag.Key)
dimensionValues = append(dimensionValues, tag.Value)
}
// Field keys are stored as the last dimension
dimensionNames = append(dimensionNames, "field")
var seriesList []*azureMonitorSeries
// Store each field as a separate series with field key as a new dimension
for _, field := range metric.FieldList() {
azmseries := newAzureMonitorSeries(field, dimensionValues)
seriesList = append(seriesList, azmseries)
}
if len(seriesList) < 1 {
log.Printf("no valid fields for metric: %s", metric)
return
}
a.cache[id] = azureMonitorMetric{
Time: metric.Time(),
Data: &azureMonitorData{
BaseData: &azureMonitorBaseData{
Metric: metric.Name(),
Namespace: "default",
DimensionNames: dimensionNames,
Series: seriesList,
},
},
}
} else {
for _, f := range metric.FieldList() {
fv, ok := convert(f.Value)
if !ok {
continue
}
tmp, ok := azm.findSeriesWithField(f.Key)
if !ok {
// hit an uncached field of a cached metric
var dimensionValues []string
for i, tag := range metric.TagList() {
// Azure custom metrics service supports up to 10 dimensions
if i > 9 {
continue
}
dimensionValues = append(dimensionValues, tag.Value)
}
azm.Data.BaseData.Series = append(azm.Data.BaseData.Series, newAzureMonitorSeries(f, dimensionValues))
continue
}
//counter compute
n := tmp.Count + 1
tmp.Count = n
//max/min compute
if fv < tmp.Min {
tmp.Min = fv
} else if fv > tmp.Max {
tmp.Max = fv
}
//sum compute
tmp.Sum += fv
//store final data
a.cache[id].Data.BaseData.Series = append(a.cache[id].Data.BaseData.Series, tmp)
}
}
}
func (b *azureMonitorMetric) findSeriesWithField(f string) (*azureMonitorSeries, bool) {
if len(b.Data.BaseData.Series) > 0 {
for _, s := range b.Data.BaseData.Series {
if f == s.DimensionValues[len(s.DimensionValues)-1] {
return s, true
}
}
}
return nil, false
}
func newAzureMonitorSeries(f *telegraf.Field, dv []string) *azureMonitorSeries {
fv, ok := convert(f.Value)
if !ok {
log.Printf("unable to convert field %s (type %T) to float type: %v", f.Key, fv, fv)
return nil
}
return &azureMonitorSeries{
DimensionValues: append(append([]string{}, dv...), f.Key),
Min: fv,
Max: fv,
Sum: fv,
Count: 1,
}
}
func (a *AzureMonitor) reset() {
a.cache = make(map[uint64]azureMonitorMetric)
}
func convert(in interface{}) (float64, bool) {
switch v := in.(type) {
case int:
return float64(v), true
case int8:
return float64(v), true
case int16:
return float64(v), true
case int32:
return float64(v), true
case int64:
return float64(v), true
case uint:
return float64(v), true
case uint8:
return float64(v), true
case uint16:
return float64(v), true
case uint32:
return float64(v), true
case uint64:
return float64(v), true
case float32:
return float64(v), true
case float64:
return v, true
case string:
f, err := strconv.ParseFloat(v, 64)
if err != nil {
log.Printf("converted string: %s to %v", v, f)
return 0, false
}
return f, true
default:
log.Printf("did not convert %T: %s", v, v)
return 0, false
}
}
func (a *AzureMonitor) push() {
var body []byte
for _, metric := range a.cache {
jsonBytes, err := json.Marshal(&metric)
log.Printf("marshalled point %s", jsonBytes)
if err != nil {
log.Printf("Error marshalling metrics %s", err)
return
}
body = append(body, jsonBytes...)
body = append(body, '\n')
}
log.Printf("Publishing metrics %s", body)
_, err := a.postData(&body)
if err != nil {
log.Printf("Error publishing metrics %s", err)
return
}
return
}
func (a *AzureMonitor) postData(msg *[]byte) (*http.Request, error) {
metricsEndpoint := fmt.Sprintf("https://%s.monitoring.azure.com%s/metrics",
a.Region, a.ResourceID)
req, err := http.NewRequest("POST", metricsEndpoint, bytes.NewBuffer(*msg))
if err != nil {
log.Printf("Error creating HTTP request")
return nil, err
}
req.Header.Set("Authorization", "Bearer "+a.bearerToken)
req.Header.Set("Content-Type", "application/x-ndjson")
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client := http.Client{
Transport: tr,
// TODO - fix this
//Timeout: time.Duration(s.HTTPPostTimeout * time.Second),
Timeout: time.Duration(10 * time.Second),
}
resp, err := client.Do(req)
if err != nil {
return req, err
}
defer resp.Body.Close()
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
var reply []byte
reply, err = ioutil.ReadAll(resp.Body)
if err != nil {
reply = nil
}
return req, fmt.Errorf("Post Error. HTTP response code:%d message:%s reply:\n%s",
resp.StatusCode, resp.Status, reply)
}
return req, nil
}
func (a *AzureMonitor) run() {
// The start of the period is truncated to the nearest minute.
//
@ -495,13 +279,214 @@ func (a *AzureMonitor) run() {
}
}
// reset discards every aggregated metric by replacing the cache with a new,
// empty map.
func (a *AzureMonitor) reset() {
	a.cache = map[string]*azureMonitorMetric{}
}
// add folds a single Telegraf metric into the aggregation cache.
//
// Each field becomes its own Azure Monitor metric named
// "<measurement>_<field>". Tags become dimensions and, when StringAsDimension
// is set, string fields become additional dimensions instead of values.
// Within a cached metric, values are aggregated (min, max, sum, count) per
// distinct set of dimension values.
//
// The incoming metric is not modified.
func (a *AzureMonitor) add(metric telegraf.Metric) {
	// Azure custom metrics service supports up to 10 dimensions.
	const maxDimensions = 10

	var dimensionNames []string
	var dimensionValues []string
	for i, tag := range metric.TagList() {
		if i >= maxDimensions {
			break
		}
		dimensionNames = append(dimensionNames, tag.Key)
		dimensionValues = append(dimensionValues, tag.Value)
	}

	fields := metric.FieldList()

	// Azure Monitor does not support string value types, so convert string
	// fields to dimensions if enabled. String fields are skipped in the value
	// loop below rather than removed from the metric: removing fields while
	// ranging over the field list would mutate the collection being iterated.
	// NOTE(review): dimensions added from string fields are not counted
	// against maxDimensions — confirm whether they should be.
	if a.StringAsDimension {
		for _, f := range fields {
			if sv, isString := f.Value.(string); isString {
				dimensionNames = append(dimensionNames, f.Key)
				dimensionValues = append(dimensionValues, sv)
			}
		}
	}

	for _, f := range fields {
		if _, isString := f.Value.(string); isString && a.StringAsDimension {
			// Already recorded as a dimension above.
			continue
		}

		name := metric.Name() + "_" + f.Key
		fv, ok := convert(f.Value)
		if !ok {
			// Report the original value and its type, not the zero float
			// returned by the failed conversion.
			log.Printf("unable to convert field %s (type %T) to float type: %v", f.Key, f.Value, f.Value)
			continue
		}

		azm, cached := a.cache[name]
		if !cached {
			// hit an uncached metric, create it for first time
			a.cache[name] = &azureMonitorMetric{
				Time: metric.Time(),
				Data: &azureMonitorData{
					BaseData: &azureMonitorBaseData{
						Metric:         name,
						Namespace:      "default",
						DimensionNames: dimensionNames,
						Series: []*azureMonitorSeries{
							newAzureMonitorSeries(dimensionValues, fv),
						},
					},
				},
			}
			continue
		}

		series, _, found := azm.findSeries(dimensionValues)
		if !found {
			// New combination of dimension values for a cached metric
			// (should be rare).
			azm.Data.BaseData.Series = append(azm.Data.BaseData.Series,
				newAzureMonitorSeries(dimensionValues, fv))
			continue
		}

		// series points at the cached entry, so updating it in place is
		// enough; no write-back into the slice is needed.
		series.Count++
		if fv < series.Min {
			series.Min = fv
		} else if fv > series.Max {
			series.Max = fv
		}
		series.Sum += fv
	}
}
// findSeries looks up the series whose dimension values are exactly dv.
// On a match it returns the series, its index in the metric's series list,
// and true; otherwise it returns nil, 0 and false. A dv whose length differs
// from the metric's dimension names is rejected without scanning.
func (m *azureMonitorMetric) findSeries(dv []string) (*azureMonitorSeries, int, bool) {
	base := m.Data.BaseData
	if len(dv) != len(base.DimensionNames) {
		return nil, 0, false
	}

	for idx, candidate := range base.Series {
		if candidate.equal(dv) {
			return candidate, idx, true
		}
	}
	return nil, 0, false
}
// newAzureMonitorSeries starts a series for the given dimension values with
// a single observation fv: min, max and sum are all fv and the count is 1.
// The dimension values are copied so the series does not alias dv.
func newAzureMonitorSeries(dv []string, fv float64) *azureMonitorSeries {
	values := make([]string, len(dv))
	copy(values, dv)

	series := &azureMonitorSeries{
		DimensionValues: values,
		Count:           1,
	}
	series.Min, series.Max, series.Sum = fv, fv, fv
	return series
}
// equal reports whether the series' dimension values match dv element by
// element, in order. Slices of different lengths never match.
func (s *azureMonitorSeries) equal(dv []string) bool {
	if len(dv) != len(s.DimensionValues) {
		return false
	}
	for pos, want := range dv {
		if s.DimensionValues[pos] != want {
			return false
		}
	}
	return true
}
// convert turns a field value into a float64 for aggregation.
//
// Supported inputs are int64, uint64, float64, bool (true is 1, false is 0)
// and strings that parse as a float. The second return value is false when
// the value has an unsupported type or is a string that is not numeric; the
// returned number is then 0.
func convert(in interface{}) (float64, bool) {
	switch v := in.(type) {
	case int64:
		return float64(v), true
	case uint64:
		return float64(v), true
	case float64:
		return v, true
	case bool:
		if v {
			return 1, true
		}
		// false must map to 0; returning 1 here would make both boolean
		// values indistinguishable.
		return 0, true
	case string:
		f, err := strconv.ParseFloat(v, 64)
		if err != nil {
			return 0, false
		}
		return f, true
	default:
		return 0, false
	}
}
// push serializes every cached metric as one JSON document per line and
// posts the combined payload. A marshalling failure is logged and aborts the
// push without sending anything; a failed post is logged.
func (a *AzureMonitor) push() {
	var payload []byte

	for _, m := range a.cache {
		encoded, err := json.Marshal(&m)
		if err != nil {
			log.Printf("Error marshalling metrics %s", err)
			return
		}
		payload = append(append(payload, encoded...), '\n')
	}

	if _, err := a.postData(&payload); err != nil {
		log.Printf("Error publishing aggregate metrics %s", err)
	}
}
// postData sends msg to the regional Azure Monitor custom metrics endpoint
// for the configured resource.
//
// Credentials are validated (and refreshed if needed) before the request is
// built. The body is sent as application/x-ndjson with the current bearer
// token. It returns the request that was issued together with an error when
// authentication fails, the request cannot be created or sent, or the
// service answers with a non-2xx status. The request is nil when the
// failure happened before it was created.
func (a *AzureMonitor) postData(msg *[]byte) (*http.Request, error) {
	if err := a.validateCredentials(); err != nil {
		return nil, fmt.Errorf("Error authenticating: %v", err)
	}

	metricsEndpoint := fmt.Sprintf("https://%s.monitoring.azure.com%s/metrics",
		a.Region, a.ResourceID)

	req, err := http.NewRequest("POST", metricsEndpoint, bytes.NewBuffer(*msg))
	if err != nil {
		log.Printf("Error creating HTTP request")
		return nil, err
	}

	req.Header.Set("Authorization", "Bearer "+a.bearerToken)
	req.Header.Set("Content-Type", "application/x-ndjson")

	// Certificate verification stays enabled: the request carries a bearer
	// token, so the server's identity must be checked before sending it.
	tr := &http.Transport{
		TLSClientConfig: &tls.Config{MinVersion: tls.VersionTLS12},
	}
	// The transport is created per call; release its pooled connections on
	// return so they do not accumulate across pushes.
	defer tr.CloseIdleConnections()

	client := http.Client{
		Transport: tr,
		Timeout:   a.Timeout.Duration,
	}
	resp, err := client.Do(req)
	if err != nil {
		return req, err
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 300 || resp.StatusCode < 200 {
		// Include the response body in the error when it can be read.
		var reply []byte
		reply, err = ioutil.ReadAll(resp.Body)
		if err != nil {
			reply = nil
		}
		return req, fmt.Errorf("Post Error. HTTP response code:%d message:%s reply:\n%s",
			resp.StatusCode, resp.Status, reply)
	}
	return req, nil
}
func init() {
outputs.Add("azuremonitor", func() telegraf.Output {
return &AzureMonitor{
period: time.Minute,
delay: time.Second * 5,
metrics: make(chan telegraf.Metric, 100),
shutdown: make(chan struct{}),
StringAsDimension: true,
Timeout: internal.Duration{Duration: time.Second * 5},
Region: defaultRegion,
period: time.Minute,
delay: time.Second * 5,
metrics: make(chan telegraf.Metric, 100),
shutdown: make(chan struct{}),
}
})
}