Compare commits

...

7 Commits

Author SHA1 Message Date
Gunnar Aasen 4b8d0ad35d Additional code cleanup 2018-05-24 09:45:45 -07:00
Gunnar Aasen 78e5f52966 Revert Godeps changes 2018-05-24 09:45:45 -07:00
Gunnar Aasen 79b6edadd2 Refactor to use AggregatingOutput 2018-05-24 09:44:16 -07:00
Gunnar Aasen 9490a22aeb Output: Azure Monitor: Cleanup and add README 2018-05-24 09:44:16 -07:00
Gunnar Aasen 17093efad5 Output: Azure Monitor: Initial aggregated metric implementation 2018-05-24 09:44:16 -07:00
Mark Simms d077f5dbc7 Starting on azure monitor metrics integration with MSI auth 2018-05-24 09:42:31 -07:00
Daniel Nelson 6cea487bfc Add idea for an output that aggregates before adding to metric buffer 2018-05-24 09:42:31 -07:00
8 changed files with 764 additions and 0 deletions

1
Godeps
View File

@ -1,6 +1,7 @@
code.cloudfoundry.org/clock e9dc86bbf0e5bbe6bf7ff5a6f71e048959b61f71
collectd.org 2ce144541b8903101fb8f1483cc0497a68798122
github.com/aerospike/aerospike-client-go 9701404f4c60a6ea256595d24bf318f721a7e8b8
github.com/Azure/go-autorest 9ad9326b278af8fa5cc67c30c0ce9a58cc0862b2
github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985
github.com/apache/thrift 4aaa92ece8503a6da9bc6701604f69acf2b99d07
github.com/aws/aws-sdk-go c861d27d0304a79f727e9a8a4e2ac1e74602fdc0

View File

@ -113,6 +113,11 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
m, _ = metric.New(name, tags, fields, t)
}
if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
output.Add(m)
return
}
ro.metrics.Add(m)
if ro.metrics.Len() == ro.MetricBatchSize {
batch := ro.metrics.Batch(ro.MetricBatchSize)
@ -125,6 +130,12 @@ func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
// Write writes all cached points to this output.
func (ro *RunningOutput) Write() error {
if output, ok := ro.Output.(telegraf.AggregatingOutput); ok {
metrics := output.Push()
ro.metrics.Add(metrics...)
output.Reset()
}
nFails, nMetrics := ro.failMetrics.Len(), ro.metrics.Len()
ro.BufferSize.Set(int64(nFails + nMetrics))
log.Printf("D! Output [%s] buffer fullness: %d / %d metrics. ",

View File

@ -13,6 +13,12 @@ type Output interface {
Write(metrics []Metric) error
}
type AggregatingOutput interface {
Add(in Metric)
Push() []Metric
Reset()
}
type ServiceOutput interface {
// Connect to the Output
Connect() error

View File

@ -4,6 +4,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/amon"
_ "github.com/influxdata/telegraf/plugins/outputs/amqp"
_ "github.com/influxdata/telegraf/plugins/outputs/application_insights"
_ "github.com/influxdata/telegraf/plugins/outputs/azuremonitor"
_ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch"
_ "github.com/influxdata/telegraf/plugins/outputs/cratedb"
_ "github.com/influxdata/telegraf/plugins/outputs/datadog"

View File

@ -0,0 +1,74 @@
## Azure Monitor Custom Metrics Output for Telegraf
This plugin will send custom metrics to Azure Monitor.
All metrics are written as summarized values: min, max, sum, count. The Telegraf field name is appended to the metric name. All Telegraf tags are set as the metric dimensions.
## Azure Authentication
This plugin can use one of several different types of credentials to authenticate
with the Azure Monitor Custom Metrics ingestion API endpoint. In the following
order the plugin will attempt to authenticate.
1. Managed Service Identity (MSI) token
- This is the prefered authentication method.
- Note: MSI is only available to ARM-based resources.
2. AAD Application Tokens (Service Principals)
- Primarily useful if Telegraf is writing metrics for other resources. [More information](https://docs.microsoft.com/en-us/azure/active-directory/develop/active-directory-application-objects).
- A Service Principal or User Principal needs to be assigned the `Monitoring Contributor` roles.
3. AAD User Tokens (User Principals)
- Allows Telegraf to authenticate like a user. It is best to use this method for development.
## Config
For this output plugin to function correctly the following variables
must be configured.
* resourceId
* region
### region
The region is the Azure region that you wish to connect to.
Examples include but are not limited to:
* useast
* centralus
* westcentralus
* westeurope
* southeastasia
### resourceId
The resourceId used for Azure Monitor metrics.
### Configuration:
```
# Configuration for sending aggregate metrics to Azure Monitor
[[outputs.azuremonitor]]
## The resource ID against which metric will be logged. If not
## specified, the plugin will attempt to retrieve the resource ID
## of the VM via the instance metadata service (optional if running
## on an Azure VM with MSI)
#resource_id = "/subscriptions/<subscription_id>/resourceGroups/<resource_group>/providers/Microsoft.Compute/virtualMachines/<vm_name>"
## Azure region to publish metrics against. Defaults to eastus.
## Leave blank to automatically query the region via MSI.
#region = "useast"
## Write HTTP timeout, formatted as a string. If not provided, will default
## to 5s. 0s means no timeout (not recommended).
# timeout = "5s"
## Whether or not to use managed service identity.
#useManagedServiceIdentity = true
## Fill in the following values if using Active Directory Service
## Principal or User Principal for authentication.
## Subscription ID
#azureSubscription = ""
## Tenant ID
#azureTenant = ""
## Client ID
#azureClientId = ""
## Client secrete
#azureClientSecret = ""
```

View File

@ -0,0 +1,186 @@
package azuremonitor
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"
)
// VirtualMachineMetadata contains information about a VM from the metadata service
type VirtualMachineMetadata struct {
Raw string
AzureResourceID string
Compute struct {
Location string `json:"location"`
Name string `json:"name"`
Offer string `json:"offer"`
OsType string `json:"osType"`
PlacementGroupID string `json:"placementGroupId"`
PlatformFaultDomain string `json:"platformFaultDomain"`
PlatformUpdateDomain string `json:"platformUpdateDomain"`
Publisher string `json:"publisher"`
ResourceGroupName string `json:"resourceGroupName"`
Sku string `json:"sku"`
SubscriptionID string `json:"subscriptionId"`
Tags string `json:"tags"`
Version string `json:"version"`
VMID string `json:"vmId"`
VMScaleSetName string `json:"vmScaleSetName"`
VMSize string `json:"vmSize"`
Zone string `json:"zone"`
} `json:"compute"`
Network struct {
Interface []struct {
Ipv4 struct {
IPAddress []struct {
PrivateIPAddress string `json:"privateIpAddress"`
PublicIPAddress string `json:"publicIpAddress"`
} `json:"ipAddress"`
Subnet []struct {
Address string `json:"address"`
Prefix string `json:"prefix"`
} `json:"subnet"`
} `json:"ipv4"`
Ipv6 struct {
IPAddress []interface{} `json:"ipAddress"`
} `json:"ipv6"`
MacAddress string `json:"macAddress"`
} `json:"interface"`
} `json:"network"`
}
// msiToken is the managed service identity token
type msiToken struct {
AccessToken string `json:"access_token"`
RefreshToken string `json:"refresh_token"`
ExpiresIn string `json:"expires_in"`
ExpiresOn string `json:"expires_on"`
NotBefore string `json:"not_before"`
Resource string `json:"resource"`
TokenType string `json:"token_type"`
expiresAt time.Time
notBefore time.Time
raw string
}
func (m *msiToken) parseTimes() {
val, err := strconv.ParseInt(m.ExpiresOn, 10, 64)
if err == nil {
m.expiresAt = time.Unix(val, 0)
}
val, err = strconv.ParseInt(m.NotBefore, 10, 64)
if err == nil {
m.notBefore = time.Unix(val, 0)
}
}
// ExpiresInDuration returns the duration until the token expires
func (m *msiToken) expiresInDuration() time.Duration {
expiresDuration := m.expiresAt.Sub(time.Now().UTC())
return expiresDuration
}
// GetMsiToken retrieves a managed service identity token from the specified port on the local VM
func (a *AzureMonitor) getMsiToken(clientID string) (*msiToken, error) {
// Acquire an MSI token. Documented at:
// https://docs.microsoft.com/en-us/azure/active-directory/managed-service-identity/how-to-use-vm-token
//
//GET http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https%3A%2F%2Fmanagement.azure.com%2F&client_id=712eac09-e943-418c-9be6-9fd5c91078bl HTTP/1.1 Metadata: true
// Create HTTP request for MSI token to access Azure Resource Manager
var msiEndpoint *url.URL
msiEndpoint, err := url.Parse(msiInstanceMetadataURL)
if err != nil {
return nil, err
}
msiParameters := url.Values{}
// Resource ID defaults to https://management.azure.com
msiParameters.Add("resource", defaultMSIResource)
msiParameters.Add("api-version", "2018-02-01")
// Client id is optional
if clientID != "" {
msiParameters.Add("client_id", clientID)
}
msiEndpoint.RawQuery = msiParameters.Encode()
req, err := http.NewRequest("GET", msiEndpoint.String(), nil)
if err != nil {
return nil, err
}
req.Header.Add("Metadata", "true")
resp, err := a.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
reply, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
return nil, fmt.Errorf("E! Get Error. %d HTTP response: %s response body: %s",
resp.StatusCode, resp.Status, reply)
}
var token msiToken
if err := json.Unmarshal(reply, &token); err != nil {
return nil, err
}
token.parseTimes()
token.raw = string(reply)
return &token, nil
}
// GetInstanceMetadata retrieves metadata about the current Azure VM
func (a *AzureMonitor) GetInstanceMetadata() error {
req, err := http.NewRequest("GET", vmInstanceMetadataURL, nil)
if err != nil {
return fmt.Errorf("Error creating HTTP request")
}
req.Header.Set("Metadata", "true")
resp, err := a.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
reply, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
return fmt.Errorf("Post Error. HTTP response code:%d message:%s reply:\n%s",
resp.StatusCode, resp.Status, reply)
}
var metadata VirtualMachineMetadata
if err := json.Unmarshal(reply, &metadata); err != nil {
return err
}
if a.ResourceID == "" {
a.ResourceID = fmt.Sprintf(resourceIDTemplate,
metadata.Compute.SubscriptionID, metadata.Compute.ResourceGroupName, metadata.Compute.Name)
}
if a.Region == "" {
a.Region = metadata.Compute.Location
}
a.url = fmt.Sprintf(urlTemplate, a.Region, a.ResourceID)
return nil
}

View File

@ -0,0 +1,445 @@
package azuremonitor
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"hash/fnv"
"io/ioutil"
"log"
"net/http"
"regexp"
"time"
"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs"
)
var _ telegraf.AggregatingOutput = (*AzureMonitor)(nil)
var _ telegraf.Output = (*AzureMonitor)(nil)
// AzureMonitor allows publishing of metrics to the Azure Monitor custom metrics service
type AzureMonitor struct {
useMsi bool `toml:"use_managed_service_identity"`
ResourceID string `toml:"resource_id"`
Region string `toml:"region"`
Timeout internal.Duration `toml:"Timeout"`
AzureSubscriptionID string `toml:"azure_subscription"`
AzureTenantID string `toml:"azure_tenant"`
AzureClientID string `toml:"azure_client_id"`
AzureClientSecret string `toml:"azure_client_secret"`
StringAsDimension bool `toml:"string_as_dimension"`
url string
msiToken *msiToken
oauthConfig *adal.OAuthConfig
adalToken adal.OAuthTokenProvider
client *http.Client
cache map[time.Time]map[uint64]*aggregate
}
type aggregate struct {
telegraf.Metric
updated bool
}
const (
defaultRegion string = "eastus"
defaultMSIResource string = "https://monitoring.azure.com/"
urlTemplate string = "https://%s.monitoring.azure.com%s/metrics"
resourceIDTemplate string = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachines/%s"
vmInstanceMetadataURL string = "http://169.254.169.254/metadata/instance?api-version=2017-12-01"
msiInstanceMetadataURL string = "http://169.254.169.254/metadata/identity/oauth2/token"
)
var sampleConfig = `
## The resource ID against which metric will be logged. If not
## specified, the plugin will attempt to retrieve the resource ID
## of the VM via the instance metadata service (optional if running
## on an Azure VM with MSI)
#resource_id = "/subscriptions/<subscription_id>/resourceGroups/<resource_group>/providers/Microsoft.Compute/virtualMachines/<vm_name>"
## Azure region to publish metrics against. Defaults to eastus.
## Leave blank to automatically query the region via MSI.
#region = "useast"
## Write HTTP timeout, formatted as a string. If not provided, will default
## to 5s. 0s means no timeout (not recommended).
# timeout = "5s"
## Whether or not to use managed service identity.
#use_managed_service_identity = true
## Fill in the following values if using Active Directory Service
## Principal or User Principal for authentication.
## Subscription ID
#azure_subscription = ""
## Tenant ID
#azure_tenant = ""
## Client ID
#azure_client_id = ""
## Client secrete
#azure_client_secret = ""
`
// Description provides a description of the plugin
func (a *AzureMonitor) Description() string {
return "Configuration for sending aggregate metrics to Azure Monitor"
}
// SampleConfig provides a sample configuration for the plugin
func (a *AzureMonitor) SampleConfig() string {
return sampleConfig
}
// Connect initializes the plugin and validates connectivity
func (a *AzureMonitor) Connect() error {
a.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: a.Timeout.Duration,
}
// If no direct AD values provided, fall back to MSI
if a.AzureSubscriptionID == "" && a.AzureTenantID == "" && a.AzureClientID == "" && a.AzureClientSecret == "" {
a.useMsi = true
} else if a.AzureSubscriptionID == "" || a.AzureTenantID == "" || a.AzureClientID == "" || a.AzureClientSecret == "" {
return fmt.Errorf("E! Must provide values for azure_subscription, azure_tenant, azure_client and azure_client_secret, or leave all blank to default to MSI")
}
if !a.useMsi {
// If using direct AD authentication create the AD access client
oauthConfig, err := adal.NewOAuthConfig(azure.PublicCloud.ActiveDirectoryEndpoint, a.AzureTenantID)
if err != nil {
return fmt.Errorf("E! Could not initialize AD client: %s", err)
}
a.oauthConfig = oauthConfig
}
// Pull region and resource identifier
err := a.GetInstanceMetadata()
if err != nil && a.ResourceID == "" && a.Region == "" {
return fmt.Errorf("E! No resource id specified, and Azure Instance metadata service not available. If not running on an Azure VM, provide a value for resource_id")
}
err = a.validateCredentials()
if err != nil {
return fmt.Errorf("E! Unable to fetch authentication credentials: %v", err)
}
a.Reset()
return nil
}
func (a *AzureMonitor) validateCredentials() error {
if a.useMsi {
// Check expiry on the token
if a.msiToken == nil || a.msiToken.expiresInDuration() < time.Minute {
msiToken, err := a.getMsiToken(a.AzureClientID)
if err != nil {
return err
}
a.msiToken = msiToken
}
return nil
}
adToken, err := adal.NewServicePrincipalToken(
*(a.oauthConfig), a.AzureClientID, a.AzureClientSecret,
azure.PublicCloud.ActiveDirectoryEndpoint)
if err != nil {
return fmt.Errorf("Could not acquire ADAL token: %s", err)
}
a.adalToken = adToken
return nil
}
// Close shuts down an any active connections
func (a *AzureMonitor) Close() error {
a.client = nil
return nil
}
type azureMonitorMetric struct {
Time time.Time `json:"time"`
Data *azureMonitorData `json:"data"`
}
type azureMonitorData struct {
BaseData *azureMonitorBaseData `json:"baseData"`
}
type azureMonitorBaseData struct {
Metric string `json:"metric"`
Namespace string `json:"namespace"`
DimensionNames []string `json:"dimNames"`
Series []*azureMonitorSeries `json:"series"`
}
type azureMonitorSeries struct {
DimensionValues []string `json:"dimValues"`
Min float64 `json:"min"`
Max float64 `json:"max"`
Sum float64 `json:"sum"`
Count int64 `json:"count"`
}
// Write writes metrics to the remote endpoint
func (a *AzureMonitor) Write(metrics []telegraf.Metric) error {
azmetrics := make(map[uint64]*azureMonitorMetric, len(metrics))
for _, m := range metrics {
id := hashIDWithTagKeysOnly(m)
if azm, ok := azmetrics[id]; !ok {
azmetrics[id] = translate(m)
} else {
azmetrics[id].Data.BaseData.Series = append(
azm.Data.BaseData.Series,
translate(m).Data.BaseData.Series...,
)
}
}
var body []byte
for _, m := range azmetrics {
// Azure Monitor accepts new batches of points in new-line delimited
// JSON, following RFC 4288 (see https://github.com/ndjson/ndjson-spec).
jsonBytes, err := json.Marshal(&m)
if err != nil {
return err
}
body = append(body, jsonBytes...)
body = append(body, '\n')
}
if err := a.validateCredentials(); err != nil {
return fmt.Errorf("E! Unable to fetch authentication credentials: %v", err)
}
req, err := http.NewRequest("POST", a.url, bytes.NewBuffer(body))
if err != nil {
return err
}
req.Header.Set("Authorization", "Bearer "+a.msiToken.AccessToken)
req.Header.Set("Content-Type", "application/x-ndjson")
resp, err := a.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
reply, err := ioutil.ReadAll(resp.Body)
if err != nil {
reply = nil
}
return fmt.Errorf("E! Get Error. %d HTTP response: %s response body: %s",
resp.StatusCode, resp.Status, reply)
}
return nil
}
func hashIDWithTagKeysOnly(m telegraf.Metric) uint64 {
h := fnv.New64a()
h.Write([]byte(m.Name()))
h.Write([]byte("\n"))
for _, tag := range m.TagList() {
h.Write([]byte(tag.Key))
h.Write([]byte("\n"))
}
b := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(b, uint64(m.Time().UnixNano()))
h.Write(b[:n])
h.Write([]byte("\n"))
return h.Sum64()
}
func translate(m telegraf.Metric) *azureMonitorMetric {
var dimensionNames []string
var dimensionValues []string
for i, tag := range m.TagList() {
// Azure custom metrics service supports up to 10 dimensions
if i > 10 {
log.Printf("W! [outputs.azuremonitor] metric [%s] exceeds 10 dimensions", m.Name())
continue
}
dimensionNames = append(dimensionNames, tag.Key)
dimensionValues = append(dimensionValues, tag.Value)
}
min, _ := m.GetField("min")
max, _ := m.GetField("max")
sum, _ := m.GetField("sum")
count, _ := m.GetField("count")
return &azureMonitorMetric{
Time: m.Time(),
Data: &azureMonitorData{
BaseData: &azureMonitorBaseData{
Metric: m.Name(),
Namespace: "default",
DimensionNames: dimensionNames,
Series: []*azureMonitorSeries{
&azureMonitorSeries{
DimensionValues: dimensionValues,
Min: min.(float64),
Max: max.(float64),
Sum: sum.(float64),
Count: count.(int64),
},
},
},
},
}
}
// Add will append a metric to the output aggregate
func (a *AzureMonitor) Add(m telegraf.Metric) {
// Azure Monitor only supports aggregates 30 minutes into the past
// and 4 minutes into the future. Future metrics are dropped when pushed.
t := m.Time()
tbucket := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), 0, 0, t.Location())
if tbucket.Before(time.Now().Add(-time.Minute * 30)) {
log.Printf("W! attempted to aggregate metric over 30 minutes old: %v, %v", t, tbucket)
return
}
// Azure Monitor doesn't have a string value type, so convert string
// fields to dimensions (a.k.a. tags) if enabled.
if a.StringAsDimension {
for fk, fv := range m.Fields() {
if v, ok := fv.(string); ok {
m.AddTag(fk, v)
}
}
}
for _, f := range m.FieldList() {
fv, ok := convert(f.Value)
if !ok {
continue
}
// Azure Monitor does not support fields so the field
// name is appended to the metric name.
name := m.Name() + "_" + sanitize(f.Key)
id := hashIDWithField(m.HashID(), f.Key)
_, ok = a.cache[tbucket]
if !ok {
// Time bucket does not exist and needs to be created.
a.cache[tbucket] = make(map[uint64]*aggregate)
}
nf := make(map[string]interface{}, 4)
nf["min"] = fv
nf["max"] = fv
nf["sum"] = fv
nf["count"] = 1
// Fetch existing aggregate
agg, ok := a.cache[tbucket][id]
if ok {
aggfields := agg.Fields()
if fv > aggfields["min"].(float64) {
nf["min"] = aggfields["min"]
}
if fv < aggfields["max"].(float64) {
nf["max"] = aggfields["max"]
}
nf["sum"] = fv + aggfields["sum"].(float64)
nf["count"] = aggfields["count"].(int64) + 1
}
na, _ := metric.New(name, m.Tags(), nf, tbucket)
a.cache[tbucket][id] = &aggregate{na, true}
}
}
func convert(in interface{}) (float64, bool) {
switch v := in.(type) {
case int64:
return float64(v), true
case uint64:
return float64(v), true
case float64:
return v, true
case bool:
if v {
return 1, true
}
return 0, true
default:
return 0, false
}
}
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
func sanitize(value string) string {
return invalidNameCharRE.ReplaceAllString(value, "_")
}
func hashIDWithField(id uint64, fk string) uint64 {
h := fnv.New64a()
b := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(b, id)
h.Write(b[:n])
h.Write([]byte("\n"))
h.Write([]byte(fk))
h.Write([]byte("\n"))
return h.Sum64()
}
// Push sends metrics to the output metric buffer
func (a *AzureMonitor) Push() []telegraf.Metric {
var metrics []telegraf.Metric
for tbucket, aggs := range a.cache {
// Do not send metrics early
if tbucket.After(time.Now().Add(-time.Minute)) {
continue
}
for _, agg := range aggs {
// Only send aggregates that have had an update since
// the last push.
if !agg.updated {
continue
}
metrics = append(metrics, agg.Metric)
}
}
return metrics
}
// Reset clears the cache of aggregate metrics
func (a *AzureMonitor) Reset() {
for tbucket := range a.cache {
// Remove aggregates older than 30 minutes
if tbucket.Before(time.Now().Add(-time.Minute * 30)) {
delete(a.cache, tbucket)
continue
}
for id := range a.cache[tbucket] {
a.cache[tbucket][id].updated = false
}
}
}
func init() {
outputs.Add("azuremonitor", func() telegraf.Output {
return &AzureMonitor{
StringAsDimension: false,
Timeout: internal.Duration{Duration: time.Second * 5},
Region: defaultRegion,
cache: make(map[time.Time]map[uint64]*aggregate, 36),
}
})
}

View File

@ -0,0 +1,40 @@
package azuremonitor
import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
// MockMetrics returns a mock []telegraf.Metric object for using in unit tests
// of telegraf output sinks.
func getMockMetrics() []telegraf.Metric {
metrics := make([]telegraf.Metric, 0)
// Create a new point batch
metrics = append(metrics, getTestMetric(1.0))
return metrics
}
// TestMetric Returns a simple test point:
// measurement -> "test1" or name
// tags -> "tag1":"value1"
// value -> value
// time -> time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
func getTestMetric(value interface{}, name ...string) telegraf.Metric {
if value == nil {
panic("Cannot use a nil value")
}
measurement := "test1"
if len(name) > 0 {
measurement = name[0]
}
tags := map[string]string{"tag1": "value1"}
pt, _ := metric.New(
measurement,
tags,
map[string]interface{}{"value": value},
time.Now().UTC(),
)
return pt
}