diff --git a/Gopkg.lock b/Gopkg.lock index a93816c1e..7dd2261c0 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -376,6 +376,24 @@ revision = "36d01c2b4cbeb3d2a12063e4880ce30800af9560" version = "v1.1.1" +[[projects]] + digest = "1:99a0607f79d36202b64b674c0464781549917cfc4bfb88037aaa98b31e124a18" + name = "github.com/ericchiang/k8s" + packages = [ + ".", + "apis/apiextensions/v1beta1", + "apis/core/v1", + "apis/meta/v1", + "apis/resource", + "runtime", + "runtime/schema", + "util/intstr", + "watch/versioned", + ] + pruneopts = "" + revision = "d1bbc0cffaf9849ddcae7b9efffae33e2dd52e9a" + version = "v1.2.0" + [[projects]] digest = "1:858b7fe7b0f4bc7ef9953926828f2816ea52d01a88d72d1c45bc8c108f23c356" name = "github.com/go-ini/ini" @@ -1154,7 +1172,7 @@ [[projects]] branch = "master" - digest = "1:b697592485cb412be4188c08ca0beed9aab87f36b86418e21acc4a3998f63734" + digest = "0:" name = "golang.org/x/oauth2" packages = [ ".", @@ -1235,7 +1253,7 @@ revision = "19ff8768a5c0b8e46ea281065664787eefc24121" [[projects]] - digest = "1:c1771ca6060335f9768dff6558108bc5ef6c58506821ad43377ee23ff059e472" + digest = "0:" name = "google.golang.org/appengine" packages = [ ".", @@ -1439,6 +1457,9 @@ "github.com/docker/docker/client", "github.com/docker/libnetwork/ipvs", "github.com/eclipse/paho.mqtt.golang", + "github.com/ericchiang/k8s", + "github.com/ericchiang/k8s/apis/core/v1", + "github.com/ericchiang/k8s/apis/meta/v1", "github.com/go-logfmt/logfmt", "github.com/go-redis/redis", "github.com/go-sql-driver/mysql", diff --git a/plugins/inputs/prometheus/README.md b/plugins/inputs/prometheus/README.md index 294d84150..37265d332 100644 --- a/plugins/inputs/prometheus/README.md +++ b/plugins/inputs/prometheus/README.md @@ -14,6 +14,17 @@ in Prometheus format. ## An array of Kubernetes services to scrape metrics from. # kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"] + ## Kubernetes config file to create client from. + # kube_config = "/path/to/kubernetes.config" + + ## Scrape Kubernetes pods for the following prometheus annotations: + ## - prometheus.io/scrape: Enable scraping for this pod + ## - prometheus.io/scheme: If the metrics endpoint is secured then you will need to + ## set this to `https` & most likely set the tls config. + ## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. + ## - prometheus.io/port: If port is not 9102 use this annotation + # monitor_kubernetes_pods = true + ## Use bearer token for authorization # bearer_token = /path/to/bearer/token @@ -39,6 +50,18 @@ by looking up all A records assigned to the hostname as described in This method can be used to locate all [Kubernetes headless services](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services). +#### Kubernetes scraping + +Enabling this option will allow the plugin to scrape for prometheus annotation on Kubernetes +pods. Currently, you can run this plugin in your kubernetes cluster, or we use the kubeconfig +file to determine where to monitor. +Currently the following annotation are supported: + +* `prometheus.io/scrape` Enable scraping for this pod. +* `prometheus.io/scheme` If the metrics endpoint is secured then you will need to set this to `https` & most likely set the tls config. (default 'http') +* `prometheus.io/path` Override the path for the metrics endpoint on the service. (default '/metrics') +* `prometheus.io/port` Used to override the port. (default 9102) + #### Bearer Token If set, the file specified by the `bearer_token` parameter will be read on diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go new file mode 100644 index 000000000..4faf2d55e --- /dev/null +++ b/plugins/inputs/prometheus/kubernetes.go @@ -0,0 +1,198 @@ +package prometheus + +import ( + "context" + "fmt" + "io/ioutil" + "log" + "net" + "net/url" + "os/user" + "path/filepath" + "sync" + "time" + + "github.com/ericchiang/k8s" + corev1 "github.com/ericchiang/k8s/apis/core/v1" + "gopkg.in/yaml.v2" +) + +type payload struct { + eventype string + pod *corev1.Pod +} + +// loadClient parses a kubeconfig from a file and returns a Kubernetes +// client. It does not support extensions or client auth providers. +func loadClient(kubeconfigPath string) (*k8s.Client, error) { + data, err := ioutil.ReadFile(kubeconfigPath) + if err != nil { + return nil, fmt.Errorf("failed reading '%s': %v", kubeconfigPath, err) + } + + // Unmarshal YAML into a Kubernetes config object. + var config k8s.Config + if err := yaml.Unmarshal(data, &config); err != nil { + return nil, err + } + return k8s.NewClient(&config) +} + +func (p *Prometheus) start(ctx context.Context) error { + client, err := k8s.NewInClusterClient() + if err != nil { + u, err := user.Current() + if err != nil { + return fmt.Errorf("Failed to get current user - %v", err) + } + configLocation := filepath.Join(u.HomeDir, ".kube/config") + if p.KubeConfig != "" { + configLocation = p.KubeConfig + } + client, err = loadClient(configLocation) + if err != nil { + return err + } + } + + p.wg = sync.WaitGroup{} + + p.wg.Add(1) + go func() { + defer p.wg.Done() + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Second): + err := p.watch(ctx, client) + if err != nil { + log.Printf("E! [inputs.prometheus] unable to watch resources: %v", err) + } + } + } + }() + + return nil +} + +func (p *Prometheus) watch(ctx context.Context, client *k8s.Client) error { + pod := &corev1.Pod{} + watcher, err := client.Watch(ctx, "", &corev1.Pod{}) + if err != nil { + return err + } + defer watcher.Close() + + for { + select { + case <-ctx.Done(): + return nil + default: + pod = &corev1.Pod{} + // An error here means we need to reconnect the watcher. + eventType, err := watcher.Next(pod) + if err != nil { + return err + } + + switch eventType { + case k8s.EventAdded: + registerPod(pod, p) + case k8s.EventDeleted: + unregisterPod(pod, p) + case k8s.EventModified: + } + } + } +} + +func registerPod(pod *corev1.Pod, p *Prometheus) { + targetURL := getScrapeURL(pod) + if targetURL == nil { + return + } + + log.Printf("D! [inputs.prometheus] will scrape metrics from %s", *targetURL) + // add annotation as metrics tags + tags := pod.GetMetadata().GetAnnotations() + tags["pod_name"] = pod.GetMetadata().GetName() + tags["namespace"] = pod.GetMetadata().GetNamespace() + // add labels as metrics tags + for k, v := range pod.GetMetadata().GetLabels() { + tags[k] = v + } + URL, err := url.Parse(*targetURL) + if err != nil { + log.Printf("E! [inputs.prometheus] could not parse URL %s: %v", *targetURL, err) + return + } + podURL := p.AddressToURL(URL, URL.Hostname()) + p.lock.Lock() + p.kubernetesPods = append(p.kubernetesPods, + URLAndAddress{ + URL: podURL, + Address: URL.Hostname(), + OriginalURL: URL, + Tags: tags}) + p.lock.Unlock() +} + +func getScrapeURL(pod *corev1.Pod) *string { + scrape := pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"] + if scrape != "true" { + return nil + } + ip := pod.Status.GetPodIP() + if ip == "" { + // return as if scrape was disabled, we will be notified again once the pod + // has an IP + return nil + } + + scheme := pod.GetMetadata().GetAnnotations()["prometheus.io/scheme"] + path := pod.GetMetadata().GetAnnotations()["prometheus.io/path"] + port := pod.GetMetadata().GetAnnotations()["prometheus.io/port"] + + if scheme == "" { + scheme = "http" + } + if port == "" { + port = "9102" + } + if path == "" { + path = "/metrics" + } + + u := &url.URL{ + Scheme: scheme, + Host: net.JoinHostPort(ip, port), + Path: path, + } + + x := u.String() + + return &x +} + +func unregisterPod(pod *corev1.Pod, p *Prometheus) { + url := getScrapeURL(pod) + if url == nil { + return + } + + p.lock.Lock() + defer p.lock.Unlock() + log.Printf("D! [inputs.prometheus] registered a delete request for %s in namespace %s", + pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace()) + var result []URLAndAddress + for _, v := range p.kubernetesPods { + if v.URL.String() != *url { + result = append(result, v) + } else { + log.Printf("D! [inputs.prometheus] will stop scraping for %s", *url) + } + + } + p.kubernetesPods = result +} diff --git a/plugins/inputs/prometheus/kubernetes_test.go b/plugins/inputs/prometheus/kubernetes_test.go new file mode 100644 index 000000000..2afdbc5ec --- /dev/null +++ b/plugins/inputs/prometheus/kubernetes_test.go @@ -0,0 +1,88 @@ +package prometheus + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + v1 "github.com/ericchiang/k8s/apis/core/v1" + metav1 "github.com/ericchiang/k8s/apis/meta/v1" +) + +func TestScrapeURLNoAnnotations(t *testing.T) { + p := &v1.Pod{Metadata: &metav1.ObjectMeta{}} + p.GetMetadata().Annotations = map[string]string{} + url := getScrapeURL(p) + assert.Nil(t, url) +} +func TestScrapeURLAnnotationsNoScrape(t *testing.T) { + p := &v1.Pod{Metadata: &metav1.ObjectMeta{}} + p.Metadata.Name = str("myPod") + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "false"} + url := getScrapeURL(p) + assert.Nil(t, url) +} +func TestScrapeURLAnnotations(t *testing.T) { + p := pod() + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} + url := getScrapeURL(p) + assert.Equal(t, "http://127.0.0.1:9102/metrics", *url) +} +func TestScrapeURLAnnotationsCustomPort(t *testing.T) { + p := pod() + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"} + url := getScrapeURL(p) + assert.Equal(t, "http://127.0.0.1:9000/metrics", *url) +} +func TestScrapeURLAnnotationsCustomPath(t *testing.T) { + p := pod() + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"} + url := getScrapeURL(p) + assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) +} + +func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) { + p := pod() + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"} + url := getScrapeURL(p) + assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) +} + +func TestAddPod(t *testing.T) { + prom := &Prometheus{} + p := pod() + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} + registerPod(p, prom) + assert.Equal(t, 1, len(prom.kubernetesPods)) +} +func TestAddMultiplePods(t *testing.T) { + prom := &Prometheus{} + + p := pod() + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} + registerPod(p, prom) + p.Metadata.Name = str("Pod2") + registerPod(p, prom) + assert.Equal(t, 2, len(prom.kubernetesPods)) +} +func TestDeletePods(t *testing.T) { + prom := &Prometheus{} + + p := pod() + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} + registerPod(p, prom) + unregisterPod(p, prom) + assert.Equal(t, 0, len(prom.kubernetesPods)) +} + +func pod() *v1.Pod { + p := &v1.Pod{Metadata: &metav1.ObjectMeta{}, Status: &v1.PodStatus{}} + p.Status.PodIP = str("127.0.0.1") + p.Metadata.Name = str("myPod") + p.Metadata.Namespace = str("default") + return p +} + +func str(x string) *string { + return &x +} diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index b8e346032..84fc31800 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -1,6 +1,7 @@ package prometheus import ( + "context" "errors" "fmt" "io/ioutil" @@ -26,6 +27,9 @@ type Prometheus struct { // An array of Kubernetes services to scrape metrics from. KubernetesServices []string + // Location of kubernetes config file + KubeConfig string + // Bearer Token authorization file path BearerToken string `toml:"bearer_token"` @@ -34,6 +38,13 @@ type Prometheus struct { tls.ClientConfig client *http.Client + + // Should we scrape Kubernetes services for prometheus annotations + MonitorPods bool `toml:"monitor_kubernetes_pods"` + lock sync.Mutex + kubernetesPods []URLAndAddress + cancel context.CancelFunc + wg sync.WaitGroup } var sampleConfig = ` @@ -43,6 +54,17 @@ var sampleConfig = ` ## An array of Kubernetes services to scrape metrics from. # kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"] + ## Kubernetes config file to create client from. + # kube_config = "/path/to/kubernetes.config" + + ## Scrape Kubernetes pods for the following prometheus annotations: + ## - prometheus.io/scrape: Enable scraping for this pod + ## - prometheus.io/scheme: If the metrics endpoint is secured then you will need to + ## set this to 'https' & most likely set the tls config. + ## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. + ## - prometheus.io/port: If port is not 9102 use this annotation + # monitor_kubernetes_pods = true + ## Use bearer token for authorization # bearer_token = /path/to/bearer/token @@ -90,6 +112,7 @@ type URLAndAddress struct { OriginalURL *url.URL URL *url.URL Address string + Tags map[string]string } func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) { @@ -97,20 +120,26 @@ func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) { for _, u := range p.URLs { URL, err := url.Parse(u) if err != nil { - log.Printf("prometheus: Could not parse %s, skipping it. Error: %s", u, err) + log.Printf("prometheus: Could not parse %s, skipping it. Error: %s", u, err.Error()) continue } allURLs = append(allURLs, URLAndAddress{URL: URL, OriginalURL: URL}) } + p.lock.Lock() + defer p.lock.Unlock() + // loop through all pods scraped via the prometheus annotation on the pods + allURLs = append(allURLs, p.kubernetesPods...) + for _, service := range p.KubernetesServices { URL, err := url.Parse(service) if err != nil { return nil, err } + resolvedAddresses, err := net.LookupHost(URL.Hostname()) if err != nil { - log.Printf("prometheus: Could not resolve %s, skipping it. Error: %s", URL.Host, err) + log.Printf("prometheus: Could not resolve %s, skipping it. Error: %s", URL.Host, err.Error()) continue } for _, resolved := range resolvedAddresses { @@ -244,6 +273,9 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error if u.Address != "" { tags["address"] = u.Address } + for k, v := range u.Tags { + tags[k] = v + } switch metric.Type() { case telegraf.Counter: @@ -262,6 +294,21 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error return nil } +// Start will start the Kubernetes scraping if enabled in the configuration +func (p *Prometheus) Start(a telegraf.Accumulator) error { + if p.MonitorPods { + var ctx context.Context + ctx, p.cancel = context.WithCancel(context.Background()) + return p.start(ctx) + } + return nil +} + +func (p *Prometheus) Stop() { + p.cancel() + p.wg.Wait() +} + func init() { inputs.Add("prometheus", func() telegraf.Input { return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}}