Add scraping for Prometheus endpoint in Kubernetes (#4920)

This commit is contained in:
Greg 2018-11-05 14:30:16 -07:00 committed by Daniel Nelson
parent 19a338b922
commit 9c866553e8
5 changed files with 381 additions and 4 deletions

25
Gopkg.lock generated
View File

@ -376,6 +376,24 @@
revision = "36d01c2b4cbeb3d2a12063e4880ce30800af9560" revision = "36d01c2b4cbeb3d2a12063e4880ce30800af9560"
version = "v1.1.1" version = "v1.1.1"
[[projects]]
digest = "1:99a0607f79d36202b64b674c0464781549917cfc4bfb88037aaa98b31e124a18"
name = "github.com/ericchiang/k8s"
packages = [
".",
"apis/apiextensions/v1beta1",
"apis/core/v1",
"apis/meta/v1",
"apis/resource",
"runtime",
"runtime/schema",
"util/intstr",
"watch/versioned",
]
pruneopts = ""
revision = "d1bbc0cffaf9849ddcae7b9efffae33e2dd52e9a"
version = "v1.2.0"
[[projects]] [[projects]]
digest = "1:858b7fe7b0f4bc7ef9953926828f2816ea52d01a88d72d1c45bc8c108f23c356" digest = "1:858b7fe7b0f4bc7ef9953926828f2816ea52d01a88d72d1c45bc8c108f23c356"
name = "github.com/go-ini/ini" name = "github.com/go-ini/ini"
@ -1154,7 +1172,7 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:b697592485cb412be4188c08ca0beed9aab87f36b86418e21acc4a3998f63734" digest = "0:"
name = "golang.org/x/oauth2" name = "golang.org/x/oauth2"
packages = [ packages = [
".", ".",
@ -1235,7 +1253,7 @@
revision = "19ff8768a5c0b8e46ea281065664787eefc24121" revision = "19ff8768a5c0b8e46ea281065664787eefc24121"
[[projects]] [[projects]]
digest = "1:c1771ca6060335f9768dff6558108bc5ef6c58506821ad43377ee23ff059e472" digest = "0:"
name = "google.golang.org/appengine" name = "google.golang.org/appengine"
packages = [ packages = [
".", ".",
@ -1439,6 +1457,9 @@
"github.com/docker/docker/client", "github.com/docker/docker/client",
"github.com/docker/libnetwork/ipvs", "github.com/docker/libnetwork/ipvs",
"github.com/eclipse/paho.mqtt.golang", "github.com/eclipse/paho.mqtt.golang",
"github.com/ericchiang/k8s",
"github.com/ericchiang/k8s/apis/core/v1",
"github.com/ericchiang/k8s/apis/meta/v1",
"github.com/go-logfmt/logfmt", "github.com/go-logfmt/logfmt",
"github.com/go-redis/redis", "github.com/go-redis/redis",
"github.com/go-sql-driver/mysql", "github.com/go-sql-driver/mysql",

View File

@ -14,6 +14,17 @@ in Prometheus format.
## An array of Kubernetes services to scrape metrics from. ## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"] # kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]
## Kubernetes config file to create client from.
# kube_config = "/path/to/kubernetes.config"
## Scrape Kubernetes pods for the following prometheus annotations:
## - prometheus.io/scrape: Enable scraping for this pod
## - prometheus.io/scheme: If the metrics endpoint is secured then you will need to
## set this to `https` & most likely set the tls config.
## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
## - prometheus.io/port: If port is not 9102 use this annotation
# monitor_kubernetes_pods = true
## Use bearer token for authorization ## Use bearer token for authorization
# bearer_token = /path/to/bearer/token # bearer_token = /path/to/bearer/token
@ -39,6 +50,18 @@ by looking up all A records assigned to the hostname as described in
This method can be used to locate all This method can be used to locate all
[Kubernetes headless services](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services). [Kubernetes headless services](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services).
#### Kubernetes scraping
Enabling this option will allow the plugin to scrape for prometheus annotation on Kubernetes
pods. Currently, you can run this plugin in your kubernetes cluster, or we use the kubeconfig
file to determine where to monitor.
Currently the following annotation are supported:
* `prometheus.io/scrape` Enable scraping for this pod.
* `prometheus.io/scheme` If the metrics endpoint is secured then you will need to set this to `https` & most likely set the tls config. (default 'http')
* `prometheus.io/path` Override the path for the metrics endpoint on the service. (default '/metrics')
* `prometheus.io/port` Used to override the port. (default 9102)
#### Bearer Token #### Bearer Token
If set, the file specified by the `bearer_token` parameter will be read on If set, the file specified by the `bearer_token` parameter will be read on

View File

@ -0,0 +1,198 @@
package prometheus
import (
"context"
"fmt"
"io/ioutil"
"log"
"net"
"net/url"
"os/user"
"path/filepath"
"sync"
"time"
"github.com/ericchiang/k8s"
corev1 "github.com/ericchiang/k8s/apis/core/v1"
"gopkg.in/yaml.v2"
)
type payload struct {
eventype string
pod *corev1.Pod
}
// loadClient parses a kubeconfig from a file and returns a Kubernetes
// client. It does not support extensions or client auth providers.
func loadClient(kubeconfigPath string) (*k8s.Client, error) {
data, err := ioutil.ReadFile(kubeconfigPath)
if err != nil {
return nil, fmt.Errorf("failed reading '%s': %v", kubeconfigPath, err)
}
// Unmarshal YAML into a Kubernetes config object.
var config k8s.Config
if err := yaml.Unmarshal(data, &config); err != nil {
return nil, err
}
return k8s.NewClient(&config)
}
func (p *Prometheus) start(ctx context.Context) error {
client, err := k8s.NewInClusterClient()
if err != nil {
u, err := user.Current()
if err != nil {
return fmt.Errorf("Failed to get current user - %v", err)
}
configLocation := filepath.Join(u.HomeDir, ".kube/config")
if p.KubeConfig != "" {
configLocation = p.KubeConfig
}
client, err = loadClient(configLocation)
if err != nil {
return err
}
}
p.wg = sync.WaitGroup{}
p.wg.Add(1)
go func() {
defer p.wg.Done()
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
err := p.watch(ctx, client)
if err != nil {
log.Printf("E! [inputs.prometheus] unable to watch resources: %v", err)
}
}
}
}()
return nil
}
func (p *Prometheus) watch(ctx context.Context, client *k8s.Client) error {
pod := &corev1.Pod{}
watcher, err := client.Watch(ctx, "", &corev1.Pod{})
if err != nil {
return err
}
defer watcher.Close()
for {
select {
case <-ctx.Done():
return nil
default:
pod = &corev1.Pod{}
// An error here means we need to reconnect the watcher.
eventType, err := watcher.Next(pod)
if err != nil {
return err
}
switch eventType {
case k8s.EventAdded:
registerPod(pod, p)
case k8s.EventDeleted:
unregisterPod(pod, p)
case k8s.EventModified:
}
}
}
}
func registerPod(pod *corev1.Pod, p *Prometheus) {
targetURL := getScrapeURL(pod)
if targetURL == nil {
return
}
log.Printf("D! [inputs.prometheus] will scrape metrics from %s", *targetURL)
// add annotation as metrics tags
tags := pod.GetMetadata().GetAnnotations()
tags["pod_name"] = pod.GetMetadata().GetName()
tags["namespace"] = pod.GetMetadata().GetNamespace()
// add labels as metrics tags
for k, v := range pod.GetMetadata().GetLabels() {
tags[k] = v
}
URL, err := url.Parse(*targetURL)
if err != nil {
log.Printf("E! [inputs.prometheus] could not parse URL %s: %v", *targetURL, err)
return
}
podURL := p.AddressToURL(URL, URL.Hostname())
p.lock.Lock()
p.kubernetesPods = append(p.kubernetesPods,
URLAndAddress{
URL: podURL,
Address: URL.Hostname(),
OriginalURL: URL,
Tags: tags})
p.lock.Unlock()
}
func getScrapeURL(pod *corev1.Pod) *string {
scrape := pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"]
if scrape != "true" {
return nil
}
ip := pod.Status.GetPodIP()
if ip == "" {
// return as if scrape was disabled, we will be notified again once the pod
// has an IP
return nil
}
scheme := pod.GetMetadata().GetAnnotations()["prometheus.io/scheme"]
path := pod.GetMetadata().GetAnnotations()["prometheus.io/path"]
port := pod.GetMetadata().GetAnnotations()["prometheus.io/port"]
if scheme == "" {
scheme = "http"
}
if port == "" {
port = "9102"
}
if path == "" {
path = "/metrics"
}
u := &url.URL{
Scheme: scheme,
Host: net.JoinHostPort(ip, port),
Path: path,
}
x := u.String()
return &x
}
func unregisterPod(pod *corev1.Pod, p *Prometheus) {
url := getScrapeURL(pod)
if url == nil {
return
}
p.lock.Lock()
defer p.lock.Unlock()
log.Printf("D! [inputs.prometheus] registered a delete request for %s in namespace %s",
pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace())
var result []URLAndAddress
for _, v := range p.kubernetesPods {
if v.URL.String() != *url {
result = append(result, v)
} else {
log.Printf("D! [inputs.prometheus] will stop scraping for %s", *url)
}
}
p.kubernetesPods = result
}

View File

@ -0,0 +1,88 @@
package prometheus
import (
"testing"
"github.com/stretchr/testify/assert"
v1 "github.com/ericchiang/k8s/apis/core/v1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
)
func TestScrapeURLNoAnnotations(t *testing.T) {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}}
p.GetMetadata().Annotations = map[string]string{}
url := getScrapeURL(p)
assert.Nil(t, url)
}
func TestScrapeURLAnnotationsNoScrape(t *testing.T) {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}}
p.Metadata.Name = str("myPod")
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "false"}
url := getScrapeURL(p)
assert.Nil(t, url)
}
func TestScrapeURLAnnotations(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/metrics", *url)
}
func TestScrapeURLAnnotationsCustomPort(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9000/metrics", *url)
}
func TestScrapeURLAnnotationsCustomPath(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url)
}
func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"}
url := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url)
}
func TestAddPod(t *testing.T) {
prom := &Prometheus{}
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
assert.Equal(t, 1, len(prom.kubernetesPods))
}
func TestAddMultiplePods(t *testing.T) {
prom := &Prometheus{}
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
p.Metadata.Name = str("Pod2")
registerPod(p, prom)
assert.Equal(t, 2, len(prom.kubernetesPods))
}
func TestDeletePods(t *testing.T) {
prom := &Prometheus{}
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
unregisterPod(p, prom)
assert.Equal(t, 0, len(prom.kubernetesPods))
}
func pod() *v1.Pod {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}, Status: &v1.PodStatus{}}
p.Status.PodIP = str("127.0.0.1")
p.Metadata.Name = str("myPod")
p.Metadata.Namespace = str("default")
return p
}
func str(x string) *string {
return &x
}

View File

@ -1,6 +1,7 @@
package prometheus package prometheus
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
@ -26,6 +27,9 @@ type Prometheus struct {
// An array of Kubernetes services to scrape metrics from. // An array of Kubernetes services to scrape metrics from.
KubernetesServices []string KubernetesServices []string
// Location of kubernetes config file
KubeConfig string
// Bearer Token authorization file path // Bearer Token authorization file path
BearerToken string `toml:"bearer_token"` BearerToken string `toml:"bearer_token"`
@ -34,6 +38,13 @@ type Prometheus struct {
tls.ClientConfig tls.ClientConfig
client *http.Client client *http.Client
// Should we scrape Kubernetes services for prometheus annotations
MonitorPods bool `toml:"monitor_kubernetes_pods"`
lock sync.Mutex
kubernetesPods []URLAndAddress
cancel context.CancelFunc
wg sync.WaitGroup
} }
var sampleConfig = ` var sampleConfig = `
@ -43,6 +54,17 @@ var sampleConfig = `
## An array of Kubernetes services to scrape metrics from. ## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"] # kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]
## Kubernetes config file to create client from.
# kube_config = "/path/to/kubernetes.config"
## Scrape Kubernetes pods for the following prometheus annotations:
## - prometheus.io/scrape: Enable scraping for this pod
## - prometheus.io/scheme: If the metrics endpoint is secured then you will need to
## set this to 'https' & most likely set the tls config.
## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
## - prometheus.io/port: If port is not 9102 use this annotation
# monitor_kubernetes_pods = true
## Use bearer token for authorization ## Use bearer token for authorization
# bearer_token = /path/to/bearer/token # bearer_token = /path/to/bearer/token
@ -90,6 +112,7 @@ type URLAndAddress struct {
OriginalURL *url.URL OriginalURL *url.URL
URL *url.URL URL *url.URL
Address string Address string
Tags map[string]string
} }
func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) { func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) {
@ -97,20 +120,26 @@ func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) {
for _, u := range p.URLs { for _, u := range p.URLs {
URL, err := url.Parse(u) URL, err := url.Parse(u)
if err != nil { if err != nil {
log.Printf("prometheus: Could not parse %s, skipping it. Error: %s", u, err) log.Printf("prometheus: Could not parse %s, skipping it. Error: %s", u, err.Error())
continue continue
} }
allURLs = append(allURLs, URLAndAddress{URL: URL, OriginalURL: URL}) allURLs = append(allURLs, URLAndAddress{URL: URL, OriginalURL: URL})
} }
p.lock.Lock()
defer p.lock.Unlock()
// loop through all pods scraped via the prometheus annotation on the pods
allURLs = append(allURLs, p.kubernetesPods...)
for _, service := range p.KubernetesServices { for _, service := range p.KubernetesServices {
URL, err := url.Parse(service) URL, err := url.Parse(service)
if err != nil { if err != nil {
return nil, err return nil, err
} }
resolvedAddresses, err := net.LookupHost(URL.Hostname()) resolvedAddresses, err := net.LookupHost(URL.Hostname())
if err != nil { if err != nil {
log.Printf("prometheus: Could not resolve %s, skipping it. Error: %s", URL.Host, err) log.Printf("prometheus: Could not resolve %s, skipping it. Error: %s", URL.Host, err.Error())
continue continue
} }
for _, resolved := range resolvedAddresses { for _, resolved := range resolvedAddresses {
@ -244,6 +273,9 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
if u.Address != "" { if u.Address != "" {
tags["address"] = u.Address tags["address"] = u.Address
} }
for k, v := range u.Tags {
tags[k] = v
}
switch metric.Type() { switch metric.Type() {
case telegraf.Counter: case telegraf.Counter:
@ -262,6 +294,21 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
return nil return nil
} }
// Start will start the Kubernetes scraping if enabled in the configuration
func (p *Prometheus) Start(a telegraf.Accumulator) error {
if p.MonitorPods {
var ctx context.Context
ctx, p.cancel = context.WithCancel(context.Background())
return p.start(ctx)
}
return nil
}
func (p *Prometheus) Stop() {
p.cancel()
p.wg.Wait()
}
func init() { func init() {
inputs.Add("prometheus", func() telegraf.Input { inputs.Add("prometheus", func() telegraf.Input {
return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}} return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}}