Compare commits


10 Commits

Author SHA1 Message Date
Kelvin Wang
54ac4d70c9 add kube state metrics 2018-07-06 11:54:44 -07:00
kelwang
90a38bd125 Merge pull request #2 from kelwang/revert-1-jenkins-gollar-changes (Revert "Jenkins gollar changes") 2018-06-25 17:14:14 -07:00
kelwang
d017718033 Revert "Jenkins gollar changes" 2018-06-25 17:14:03 -07:00
kelwang
8ff50e4327 Merge pull request #1 from kelwang/jenkins-gollar-changes (Jenkins gollar changes) 2018-06-25 17:12:08 -07:00
Kelvin Wang
4ec7999186 use semaphore model 2018-06-25 17:10:15 -07:00
Kelvin Wang
3457c98eb1 remove err struct 2018-06-25 15:43:10 -07:00
Kelvin Wang
e7ff7d506b fix conflicts 2018-06-22 10:48:18 -07:00
Kelvin Wang
cdc15205d8 fix go 1.8 compatibility 2018-06-22 10:35:43 -07:00
Kelvin Wang
73eaa057d1 add requested changes 2018-06-22 10:35:43 -07:00
Kelvin Wang
9c85c05fcb add jenkins lib 2018-06-22 10:35:43 -07:00
26 changed files with 826 additions and 1180 deletions

Gopkg.lock generated
View File

@@ -317,10 +317,19 @@
 [[projects]]
   name = "github.com/gogo/protobuf"
-  packages = ["proto"]
+  packages = [
+    "proto",
+    "sortkeys"
+  ]
   revision = "1adfc126b41513cc696b209667c8656ea7aac67c"
   version = "v1.0.0"

+[[projects]]
+  branch = "master"
+  name = "github.com/golang/glog"
+  packages = ["."]
+  revision = "23def4e6c14b4da8ac2ed8007337bc5eb5007998"
+
 [[projects]]
   name = "github.com/golang/protobuf"
   packages = [
@@ -350,6 +359,12 @@
   revision = "3af367b6b30c263d47e8895973edcca9a49cf029"
   version = "v0.2.0"

+[[projects]]
+  branch = "master"
+  name = "github.com/google/gofuzz"
+  packages = ["."]
+  revision = "24818f796faf91cd76ec7bddd72458fbced7a6c1"
+
 [[projects]]
   name = "github.com/gorilla/context"
   packages = ["."]
@@ -925,6 +940,12 @@
   revision = "7f5bdfd858bb064d80559b2a32b86669c5de5d3b"
   version = "v3.0.5"

+[[projects]]
+  name = "gopkg.in/inf.v0"
+  packages = ["."]
+  revision = "d2d2541c53f18d2a059457998ce2876cc8e67cbf"
+  version = "v0.9.1"
+
 [[projects]]
   name = "gopkg.in/ldap.v2"
   packages = ["."]
@@ -965,9 +986,43 @@
   revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183"
   version = "v2.2.1"

+[[projects]]
+  name = "k8s.io/api"
+  packages = ["core/v1"]
+  revision = "af4bc157c3a209798fc897f6d4aaaaeb6c2e0d6a"
+  version = "kubernetes-1.9.0"
+
+[[projects]]
+  branch = "release-1.11"
+  name = "k8s.io/apimachinery"
+  packages = [
+    "pkg/api/resource",
+    "pkg/apis/meta/v1",
+    "pkg/conversion",
+    "pkg/conversion/queryparams",
+    "pkg/fields",
+    "pkg/labels",
+    "pkg/runtime",
+    "pkg/runtime/schema",
+    "pkg/selection",
+    "pkg/types",
+    "pkg/util/errors",
+    "pkg/util/intstr",
+    "pkg/util/json",
+    "pkg/util/net",
+    "pkg/util/runtime",
+    "pkg/util/sets",
+    "pkg/util/validation",
+    "pkg/util/validation/field",
+    "pkg/util/wait",
+    "pkg/watch",
+    "third_party/forked/golang/reflect"
+  ]
+  revision = "103fd098999dc9c0c88536f5c9ad2e5da39373ae"
+
 [solve-meta]
   analyzer-name = "dep"
   analyzer-version = 1
-  inputs-digest = "024194b983d91b9500fe97e0aa0ddb5fe725030cb51ddfb034e386cae1098370"
+  inputs-digest = "e475e221e1a1bbcd2eced72dfe4c152382581c7588f087d3f36941df8984c8f6"
   solver-name = "gps-cdcl"
   solver-version = 1

View File

@@ -241,3 +241,15 @@
 [[override]]
   source = "https://github.com/fsnotify/fsnotify/archive/v1.4.7.tar.gz"
   name = "gopkg.in/fsnotify.v1"
+
+[[constraint]]
+  name = "k8s.io/api"
+  version = "kubernetes-1.11.0"
+
+[[constraint]]
+  name = "k8s.io/apimachinery"
+  version = "kubernetes-1.11.0"
+
+[[constraint]]
+  name = "k8s.io/kubernetes"
+  version = "v1.11.0"

View File

@@ -54,11 +54,11 @@ fmtcheck:
 	@echo '[INFO] done.'

 test-windows:
-	go test ./plugins/inputs/ping/...
-	go test ./plugins/inputs/win_perf_counters/...
-	go test ./plugins/inputs/win_services/...
-	go test ./plugins/inputs/procstat/...
-	go test ./plugins/inputs/ntpq/...
+	go test -short ./plugins/inputs/ping/...
+	go test -short ./plugins/inputs/win_perf_counters/...
+	go test -short ./plugins/inputs/win_services/...
+	go test -short ./plugins/inputs/procstat/...
+	go test -short ./plugins/inputs/ntpq/...

 # vet runs the Go source code static analysis tool `vet` to find
 # any common errors.
@@ -92,4 +92,4 @@ docker-image:
 plugins/parsers/influx/machine.go: plugins/parsers/influx/machine.go.rl
 	ragel -Z -G2 $^ -o $@

 .PHONY: deps telegraf install test test-windows lint vet test-all package clean docker-image fmtcheck uint64
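For context: `go test -short` tells a test binary to skip tests that guard themselves with `testing.Short()`. A hypothetical guard, for illustration only (this function is not part of the diff):

```go
package ping_test

import "testing"

func TestPingLocalhost(t *testing.T) {
	// -short sets testing.Short() to true, so slow or
	// network-dependent tests can opt out of CI runs.
	if testing.Short() {
		t.Skip("skipping network-dependent test in short mode")
	}
	// ... full test body would run here ...
}
```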

View File

@@ -9,8 +9,6 @@ Telegraf is able to parse the following input data formats into metrics:
 1. [Nagios](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#nagios) (exec input only)
 1. [Collectd](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#collectd)
 1. [Dropwizard](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#dropwizard)
-1. [Grok](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#grok)

 Telegraf metrics, like InfluxDB
 [points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
@@ -653,37 +651,5 @@ For more information about the dropwizard json format see
   # [inputs.exec.dropwizard_tag_paths]
   #   tag1 = "tags.tag1"
   #   tag2 = "tags.tag2"
 ```
-
-#### Grok
-Parse logstash-style "grok" patterns:
-```toml
-[inputs.reader]
-  ## This is a list of patterns to check the given log file(s) for.
-  ## Note that adding patterns here increases processing time. The most
-  ## efficient configuration is to have one pattern per logparser.
-  ## Other common built-in patterns are:
-  ##   %{COMMON_LOG_FORMAT}   (plain apache & nginx access logs)
-  ##   %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
-  patterns = ["%{COMBINED_LOG_FORMAT}"]
-
-  ## Name of the outputted measurement name.
-  name_override = "apache_access_log"
-
-  ## Full path(s) to custom pattern files.
-  custom_pattern_files = []
-
-  ## Custom patterns can also be defined here. Put one pattern per line.
-  custom_patterns = '''
-  '''
-
-  ## Timezone allows you to provide an override for timestamps that
-  ## don't already include an offset
-  ## e.g. 04/06/2016 12:41:45 data one two 5.43µs
-  ##
-  ## Default: "" which renders UTC
-  ## Options are as follows:
-  ##   1. Local             -- interpret based on machine localtime
-  ##   2. "Canada/Eastern"  -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
-  ##   3. UTC               -- or blank/unspecified, will return timestamp in UTC
-  timezone = "Canada/Eastern"
-```

View File

@@ -211,12 +211,16 @@ var header = `# Telegraf Configuration
 # Environment variables can be used anywhere in this config file, simply prepend
 # them with $. For strings the variable must be within quotes (ie, "$STR_VAR"),
 # for numbers and booleans they should be plain (ie, $INT_VAR, $BOOL_VAR)
+
+
 # Global tags can be specified here in key="value" format.
 [global_tags]
   # dc = "us-east-1" # will tag all metrics with dc=us-east-1
   # rack = "1a"
   ## Environment variables can be used as tags, and throughout the config file
   # user = "$USER"
+
+
 # Configuration for telegraf agent
 [agent]
   ## Default data collection interval for all inputs
@@ -224,20 +228,24 @@ var header = `# Telegraf Configuration
   ## Rounds collection interval to 'interval'
   ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
   round_interval = true
+
   ## Telegraf will send metrics to outputs in batches of at most
   ## metric_batch_size metrics.
   ## This controls the size of writes that Telegraf sends to output plugins.
   metric_batch_size = 1000
+
   ## For failed writes, telegraf will cache metric_buffer_limit metrics for each
   ## output, and will flush this buffer on a successful write. Oldest metrics
   ## are dropped first when this buffer fills.
   ## This buffer only fills when writes fail to output plugin(s).
   metric_buffer_limit = 10000
+
   ## Collection jitter is used to jitter the collection by a random amount.
   ## Each plugin will sleep for a random time within jitter before collecting.
   ## This can be used to avoid many plugins querying things like sysfs at the
   ## same time, which can have a measurable effect on the system.
   collection_jitter = "0s"
+
   ## Default flushing interval for all outputs. You shouldn't set this below
   ## interval. Maximum flush_interval will be flush_interval + flush_jitter
   flush_interval = "10s"
@@ -245,6 +253,7 @@ var header = `# Telegraf Configuration
   ## large write spikes for users running a large number of telegraf instances.
   ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
   flush_jitter = "0s"
+
   ## By default or when set to "0s", precision will be set to the same
   ## timestamp order as the collection interval, with the maximum being 1s.
   ## ie, when interval = "10s", precision will be "1s"
@@ -253,6 +262,7 @@ var header = `# Telegraf Configuration
   ## service input to set the timestamp at the appropriate precision.
   ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
   precision = ""
+
   ## Logging configuration:
   ## Run telegraf with debug log messages.
   debug = false
@@ -260,34 +270,41 @@ var header = `# Telegraf Configuration
   quiet = false
   ## Specify the log file name. The empty string means to log to stderr.
   logfile = ""
+
   ## Override default hostname, if empty use os.Hostname()
   hostname = ""
   ## If set to true, do no set the "host" tag in the telegraf agent.
   omit_hostname = false
+
+
 ###############################################################################
 #                            OUTPUT PLUGINS                                   #
 ###############################################################################
 `

 var processorHeader = `
+
 ###############################################################################
 #                            PROCESSOR PLUGINS                                #
 ###############################################################################
 `

 var aggregatorHeader = `
+
 ###############################################################################
 #                            AGGREGATOR PLUGINS                               #
 ###############################################################################
 `

 var inputHeader = `
+
 ###############################################################################
 #                            INPUT PLUGINS                                    #
 ###############################################################################
 `

 var serviceInputHeader = `
+
 ###############################################################################
 #                            SERVICE INPUT PLUGINS                            #
 ###############################################################################
@@ -1321,59 +1338,6 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
 		}
 	}

-	//for grok data_format
-	if node, ok := tbl.Fields["named_patterns"]; ok {
-		if kv, ok := node.(*ast.KeyValue); ok {
-			if ary, ok := kv.Value.(*ast.Array); ok {
-				for _, elem := range ary.Value {
-					if str, ok := elem.(*ast.String); ok {
-						c.NamedPatterns = append(c.NamedPatterns, str.Value)
-					}
-				}
-			}
-		}
-	}
-
-	if node, ok := tbl.Fields["patterns"]; ok {
-		if kv, ok := node.(*ast.KeyValue); ok {
-			if ary, ok := kv.Value.(*ast.Array); ok {
-				for _, elem := range ary.Value {
-					if str, ok := elem.(*ast.String); ok {
-						c.Patterns = append(c.Patterns, str.Value)
-					}
-				}
-			}
-		}
-	}
-
-	if node, ok := tbl.Fields["custom_patterns"]; ok {
-		if kv, ok := node.(*ast.KeyValue); ok {
-			if str, ok := kv.Value.(*ast.String); ok {
-				c.CustomPatterns = str.Value
-			}
-		}
-	}
-
-	if node, ok := tbl.Fields["custom_pattern_files"]; ok {
-		if kv, ok := node.(*ast.KeyValue); ok {
-			if ary, ok := kv.Value.(*ast.Array); ok {
-				for _, elem := range ary.Value {
-					if str, ok := elem.(*ast.String); ok {
-						c.CustomPatternFiles = append(c.CustomPatternFiles, str.Value)
-					}
-				}
-			}
-		}
-	}
-
-	if node, ok := tbl.Fields["timezone"]; ok {
-		if kv, ok := node.(*ast.KeyValue); ok {
-			if str, ok := kv.Value.(*ast.String); ok {
-				c.TimeZone = str.Value
-			}
-		}
-	}

 	c.MetricName = name

 	delete(tbl.Fields, "data_format")
@@ -1389,11 +1353,6 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
 	delete(tbl.Fields, "dropwizard_time_format")
 	delete(tbl.Fields, "dropwizard_tags_path")
 	delete(tbl.Fields, "dropwizard_tag_paths")
-	delete(tbl.Fields, "named_patterns")
-	delete(tbl.Fields, "patterns")
-	delete(tbl.Fields, "custom_patterns")
-	delete(tbl.Fields, "custom_pattern_files")
-	delete(tbl.Fields, "timezone")

 	return parsers.NewParser(c)
 }

View File

@@ -48,6 +48,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
 	_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer_legacy"
 	_ "github.com/influxdata/telegraf/plugins/inputs/kapacitor"
+	_ "github.com/influxdata/telegraf/plugins/inputs/kube_state"
 	_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
 	_ "github.com/influxdata/telegraf/plugins/inputs/leofs"
 	_ "github.com/influxdata/telegraf/plugins/inputs/logparser"
@@ -85,7 +86,6 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/inputs/puppetagent"
 	_ "github.com/influxdata/telegraf/plugins/inputs/rabbitmq"
 	_ "github.com/influxdata/telegraf/plugins/inputs/raindrops"
-	_ "github.com/influxdata/telegraf/plugins/inputs/reader"
 	_ "github.com/influxdata/telegraf/plugins/inputs/redis"
 	_ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb"
 	_ "github.com/influxdata/telegraf/plugins/inputs/riak"

View File

@@ -0,0 +1,77 @@
### Line Protocol
### PODs
#### kube_pod
namespace =
name =
host_ip =
pod_ip =
node =
created_by_kind =
created_by_name =
owner_kind =
owner_name =
owner_is_controller = "true"
label_1 = ""
label_2 = ""
created = ""
start_time =
completion_time =
owner =
label_* =
created =
status_scheduled_time =
#### kube_pod_status_scheduled_time
#### kube_pod_status_phase
#### kube_pod_status_ready
#### kube_pod_status_scheduled
#### kube_pod_container_info
namespace=
pod_name=
container_name=
#### kube_pod_container_status_waiting
#### kube_pod_container_status_waiting_reason
#### kube_pod_container_status_running
#### kube_pod_container_status_terminated
#### kube_pod_container_status_terminated_reason
#### kube_pod_container_status_ready
#### kube_pod_container_status_restarts_total
#### kube_pod_container_resource_requests
#### kube_pod_container_resource_limits
#### kube_pod_container_resource_requests_cpu_cores
#### kube_pod_container_resource_requests_memory_bytes
#### kube_pod_container_resource_limits_cpu_cores
#### kube_pod_container_resource_limits_memory_bytes
#### kube_pod_spec_volumes_persistentvolumeclaims_info
#### kube_pod_spec_volumes_persistentvolumeclaims_readonly
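Based on the pod collector in this changeset, a `kube_pod` point would serialize to InfluxDB line protocol roughly as follows; every tag and field value here is illustrative only:

```
kube_pod,namespace=default,name=nginx-7d9b,node=worker-1,status_phase=Running,status_ready=true,status_scheduled=true created=1530000000i,start_time=1530000005000000000i,status_scheduled_time=1530000003i
```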

View File

@@ -0,0 +1,144 @@
package kube_state
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type client struct {
baseURL string
httpClient *http.Client
bearerToken string
semaphore chan struct{}
}
func newClient(baseURL string, timeout time.Duration, maxConns int, bearerToken string, tlsConfig *tls.Config) *client {
return &client{
baseURL: baseURL,
httpClient: &http.Client{
Transport: &http.Transport{
MaxIdleConns: maxConns,
TLSClientConfig: tlsConfig,
},
Timeout: timeout,
},
bearerToken: bearerToken,
semaphore: make(chan struct{}, maxConns),
}
}
func (c *client) getAPIResourceList(ctx context.Context) (rList *metav1.APIResourceList, err error) {
rList = new(metav1.APIResourceList)
if err = c.doGet(ctx, "", rList); err != nil {
return nil, err
}
if rList.GroupVersion == "" {
return nil, &APIError{
URL: c.baseURL,
StatusCode: http.StatusOK,
Title: "empty group version",
}
}
return rList, nil
}
func (c *client) getNodes(ctx context.Context) (list *v1.NodeList, err error) {
list = new(v1.NodeList)
if err = c.doGet(ctx, "/nodes/", list); err != nil {
return nil, err
}
return list, nil
}
func (c *client) getPods(ctx context.Context) (list *v1.PodList, err error) {
list = new(v1.PodList)
if err = c.doGet(ctx, "/pods/", list); err != nil {
return nil, err
}
return list, nil
}
func (c *client) getConfigMaps(ctx context.Context) (list *v1.ConfigMapList, err error) {
list = new(v1.ConfigMapList)
if err = c.doGet(ctx, "/configmaps/", list); err != nil {
return nil, err
}
return list, nil
}
func (c *client) doGet(ctx context.Context, url string, v interface{}) error {
req, err := createGetRequest(c.baseURL+url, c.bearerToken)
if err != nil {
return err
}
select {
case c.semaphore <- struct{}{}:
break
case <-ctx.Done():
return ctx.Err()
}
resp, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
<-c.semaphore
return err
}
defer func() {
resp.Body.Close()
<-c.semaphore
}()
// Clear invalid token if unauthorized
if resp.StatusCode == http.StatusUnauthorized {
c.bearerToken = ""
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return &APIError{
URL: url,
StatusCode: resp.StatusCode,
Title: resp.Status,
}
}
if resp.StatusCode == http.StatusNoContent {
return nil
}
return json.NewDecoder(resp.Body).Decode(v)
}
func createGetRequest(url string, token string) (*http.Request, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
if token != "" {
req.Header.Set("Authorization", "Bearer "+token)
}
req.Header.Add("Accept", "application/json")
return req, nil
}
type APIError struct {
URL string
StatusCode int
Title string
Description string
}
func (e APIError) Error() string {
if e.Description != "" {
return fmt.Sprintf("[%s] %s: %s", e.URL, e.Title, e.Description)
}
return fmt.Sprintf("[%s] %s", e.URL, e.Title)
}
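The `doGet` method above bounds in-flight API requests with a buffered channel used as a counting semaphore (the "use semaphore model" change from commit 4ec7999186): sending acquires a slot, receiving releases it. A minimal standalone sketch of the same pattern, with hypothetical names:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxConns = 3
	// A buffered channel whose capacity bounds concurrency.
	sem := make(chan struct{}, maxConns)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot; blocks when maxConns are in flight
			defer func() { <-sem }() // release the slot when done
			fmt.Printf("request %d running\n", id)
		}(i)
	}
	wg.Wait()
}
```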

View File

@@ -0,0 +1,42 @@
package kube_state
import (
"context"
"time"
"github.com/influxdata/telegraf"
"k8s.io/api/core/v1"
)
var configMapMeasurement = "kube_configmap"
func registerConfigMapCollector(ctx context.Context, acc telegraf.Accumulator, ks *KubenetesState) {
list, err := ks.client.getConfigMaps(ctx)
if err != nil {
acc.AddError(err)
return
}
for _, s := range list.Items {
if err = ks.gatherConfigMap(s, acc); err != nil {
acc.AddError(err)
return
}
}
}
func (ks *KubenetesState) gatherConfigMap(s v1.ConfigMap, acc telegraf.Accumulator) error {
var creationTime time.Time
if !s.CreationTimestamp.IsZero() {
creationTime = s.CreationTimestamp.Time
}
fields := map[string]interface{}{
"gauge": 1,
}
tags := map[string]string{
"namespace": s.Namespace,
"configmap": s.Name,
"resource_version": s.ResourceVersion,
}
acc.AddFields(configMapMeasurement, fields, tags, creationTime)
return nil
}

View File

@@ -0,0 +1,194 @@
package kube_state
import (
"bytes"
"context"
"crypto/md5"
"fmt"
"log"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// KubenetesState represents the config object for the plugin.
type KubenetesState struct {
URL string
// Bearer Token authorization file path
BearerToken string `toml:"bearer_token"`
// MaxConnections for worker pool tcp connections
MaxConnections int `toml:"max_connections"`
// HTTP Timeout specified as a string - 3s, 1m, 1h
ResponseTimeout internal.Duration `toml:"response_timeout"`
tls.ClientConfig
client *client
rListHash string
filter filter.Filter
lastFilterBuilt int64
ResourceListCheckInterval *internal.Duration `toml:"resource_list_check_interval"`
ResourceExclude []string `toml:"resource_exclude"`
DisablePodNonGenericResourceMetrics bool `toml:"disable_pod_non_generic_resource_metrics"`
DisableNodeNonGenericResourceMetrics bool `toml:"disable_node_non_generic_resource_metrics"`
}
var sampleConfig = `
## URL for the kubelet
url = "http://1.1.1.1:10255"
## Use bearer token for authorization
# bearer_token = /path/to/bearer/token
## Set response_timeout (default 5 seconds)
# response_timeout = "5s"
## Optional TLS Config
# tls_ca = /path/to/cafile
# tls_cert = /path/to/certfile
# tls_key = /path/to/keyfile
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Worker pool for the kube_state plugin only
## If this field is empty, the default value of 30 is used
# max_connections = 30
`
//SampleConfig returns a sample config
func (k *KubenetesState) SampleConfig() string {
return sampleConfig
}
//Description returns the description of this plugin
func (k *KubenetesState) Description() string {
return "Read metrics from the kubernetes kubelet api"
}
//Gather collects kubernetes metrics from a given URL
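// Flow (sketch): on the first call Gather initializes the client and jumps to
// buildFilter; on later calls it rebuilds the resource filter only when
// ResourceListCheckInterval has elapsed, then launches one goroutine per
// registered collector whose name passes the include/exclude filter.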
func (k *KubenetesState) Gather(acc telegraf.Accumulator) (err error) {
var rList *metav1.APIResourceList
if k.client == nil {
if k.client, rList, err = k.initClient(); err != nil {
return err
}
goto buildFilter
}
if k.lastFilterBuilt > 0 && time.Now().Unix()-k.lastFilterBuilt < int64(k.ResourceListCheckInterval.Duration.Seconds()) {
goto doGather
}
rList, err = k.client.getAPIResourceList(context.Background())
if err != nil {
return err
}
buildFilter:
k.lastFilterBuilt = time.Now().Unix()
if err = k.buildFilter(rList); err != nil {
return err
}
doGather:
for n, f := range availableCollectors {
ctx := context.Background()
if k.filter.Match(n) {
go f(ctx, acc, k)
}
}
return nil
}
func (k *KubenetesState) buildFilter(rList *metav1.APIResourceList) error {
hash, err := genHash(rList)
if err != nil {
return err
}
if k.rListHash == hash {
return nil
}
k.rListHash = hash
include := make([]string, len(rList.APIResources))
for i, v := range rList.APIResources {
include[i] = v.Name
}
k.filter, err = filter.NewIncludeExcludeFilter(include, k.ResourceExclude)
return err
}
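// genHash (below) fingerprints the resource list so buildFilter can skip
// rebuilding when nothing has changed; md5 here is a change detector,
// not a security measure.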
func genHash(rList *metav1.APIResourceList) (string, error) {
buf := new(bytes.Buffer)
for _, v := range rList.APIResources {
if _, err := buf.WriteString(v.Name + "|"); err != nil {
return "", err
}
}
sum := md5.Sum(buf.Bytes())
return string(sum[:]), nil
}
var availableCollectors = map[string]func(ctx context.Context, acc telegraf.Accumulator, k *KubenetesState){
// "cronjobs": RegisterCronJobCollector,
// "daemonsets": RegisterDaemonSetCollector,
// "deployments": RegisterDeploymentCollector,
// "jobs": RegisterJobCollector,
// "limitranges": RegisterLimitRangeCollector,
"nodes": registerNodeCollector,
"pods": registerPodCollector,
// "replicasets": RegisterReplicaSetCollector,
// "replicationcontrollers": RegisterReplicationControllerCollector,
// "resourcequotas": RegisterResourceQuotaCollector,
// "services": RegisterServiceCollector,
// "statefulsets": RegisterStatefulSetCollector,
// "persistentvolumes": RegisterPersistentVolumeCollector,
// "persistentvolumeclaims": RegisterPersistentVolumeClaimCollector,
// "namespaces": RegisterNamespaceCollector,
// "horizontalpodautoscalers": RegisterHorizontalPodAutoScalerCollector,
// "endpoints": RegisterEndpointCollector,
// "secrets": RegisterSecretCollector,
"configmaps": registerConfigMapCollector,
}
func (k *KubenetesState) initClient() (*client, *metav1.APIResourceList, error) {
tlsCfg, err := k.ClientConfig.TLSConfig()
if err != nil {
return nil, nil, fmt.Errorf("error parse kube state metrics config[%s]: %v", k.URL, err)
}
// default 30 concurrent TCP connections
if k.MaxConnections == 0 {
k.MaxConnections = 30
}
// default check resourceList every hour
if k.ResourceListCheckInterval == nil {
k.ResourceListCheckInterval = &internal.Duration{
Duration: time.Hour,
}
}
c := newClient(k.URL, k.ResponseTimeout.Duration, k.MaxConnections, k.BearerToken, tlsCfg)
rList, err := c.getAPIResourceList(context.Background())
if err != nil {
return nil, nil, fmt.Errorf("error connect to kubenetes api endpoint[%s]: %v", k.URL, err)
}
log.Printf("I! Kubenetes API group version is %s", rList.GroupVersion)
return c, rList, nil
}
func init() {
inputs.Add("kubernetes_state", func() telegraf.Input {
return &KubenetesState{}
})
}
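For reference, `init` above registers the plugin under the name `kubernetes_state`, so a minimal sketch of an enabling config might look like this; the URL and token path are placeholders:

```toml
[[inputs.kubernetes_state]]
  ## Kubernetes API endpoint (placeholder value)
  url = "https://127.0.0.1:6443"
  # bearer_token = "/path/to/bearer/token"
  # response_timeout = "5s"
  ## Skip collectors by name, e.g. the configmaps collector registered above
  # resource_exclude = ["configmaps"]
```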

View File

@@ -0,0 +1,73 @@
package kube_state
import (
"context"
"strconv"
"github.com/influxdata/telegraf"
"k8s.io/api/core/v1"
)
var (
nodeMeasurement = "kube_node"
nodeTaintMeasurement = "kube_node_spec_taint"
)
func registerNodeCollector(ctx context.Context, acc telegraf.Accumulator, ks *KubenetesState) {
list, err := ks.client.getNodes(ctx)
if err != nil {
acc.AddError(err)
return
}
for _, n := range list.Items {
if err = ks.gatherNode(n, acc); err != nil {
acc.AddError(err)
return
}
}
}
func (ks *KubenetesState) gatherNode(n v1.Node, acc telegraf.Accumulator) error {
fields := map[string]interface{}{}
tags := map[string]string{
"node": n.Name,
"kernel_version": n.Status.NodeInfo.KernelVersion,
"os_image": n.Status.NodeInfo.OSImage,
"container_runtime_version": n.Status.NodeInfo.ContainerRuntimeVersion,
"kubelet_version": n.Status.NodeInfo.KubeletVersion,
"kubeproxy_version": n.Status.NodeInfo.KubeProxyVersion,
"provider_id": n.Spec.ProviderID,
"spec_unschedulable": strconv.FormatBool(n.Spec.Unschedulable)
}
if !n.CreationTimestamp.IsZero() {
fields["created"] = n.CreationTimestamp.Unix()
}
for k, v := range n.Labels {
tags["label_"+sanitizeLabelName(k)] = v
}
// Collect node taints
for _, taint := range n.Spec.Taints {
go gatherNodeTaint(n, taint, acc)
}
acc.AddFields(nodeMeasurement, fields, tags)
return nil
}
func gatherNodeTaint(n v1.Node, taint v1.Taint, acc telegraf.Accumulator) {
fields := map[string]interface{}{
"gauge": 1,
}
tags := map[string]string{
"node": n.Name,
"key": taint.Key,
"value": taint.Value,
"effect": string(taint.Effect),
}
acc.AddFields(nodeTaintMeasurement, fields, tags)
}
}

View File

@@ -0,0 +1,202 @@
package kube_state
import (
"context"
"regexp"
"strconv"
"github.com/influxdata/telegraf"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/util/node"
)
var (
podMeasurement = "kube_pod"
podContainerMeasurement = "kube_pod_container"
podVolumeMeasurement = "kube_pod_spec_volumes"
)
func registerPodCollector(ctx context.Context, acc telegraf.Accumulator, ks *KubenetesState) {
list, err := ks.client.getPods(ctx)
if err != nil {
acc.AddError(err)
return
}
for _, p := range list.Items {
if err = ks.gatherPod(p, acc); err != nil {
acc.AddError(err)
return
}
}
}
func (ks *KubenetesState) gatherPod(p v1.Pod, acc telegraf.Accumulator) error {
nodeName := p.Spec.NodeName
fields := make(map[string]interface{})
tags := make(map[string]string)
createdBy := metav1.GetControllerOf(&p)
createdByKind := ""
createdByName := ""
if createdBy != nil {
if createdBy.Kind != "" {
createdByKind = createdBy.Kind
}
if createdBy.Name != "" {
createdByName = createdBy.Name
}
}
if p.Status.StartTime != nil {
fields["start_time"] = p.Status.StartTime.UnixNano()
}
tags["namesapce"] = p.Namespace
tags["name"] = p.Name
tags["host_ip"] = p.Status.HostIP
tags["pod_ip"] = p.Status.PodIP
tags["node"] = nodeName
tags["created_by_kind"] = createdByKind
tags["created_by_name"] = createdByName
tags["status_scheduled"] = "false"
tags["status_ready"] = "false"
owners := p.GetOwnerReferences()
if len(owners) == 0 {
tags["owner_kind"] = ""
tags["owner_name"] = ""
tags["owner_is_controller"] = ""
} else {
tags["owner_kind"] = owners[0].Kind
tags["owner_name"] = owners[0].Name
if owners[0].Controller != nil {
tags["owner_is_controller"] = strconv.FormatBool(*owners[0].Controller)
} else {
tags["owner_is_controller"] = "false"
}
}
for k, v := range p.Labels {
tags["label_"+sanitizeLabelName(k)] = v
}
if phase := p.Status.Phase; phase != "" {
tags["status_phase"] = string(phase)
// This logic is directly copied from: https://github.com/kubernetes/kubernetes/blob/d39bfa0d138368bbe72b0eaf434501dcb4ec9908/pkg/printers/internalversion/printers.go#L597-L601
// For more info, please go to: https://github.com/kubernetes/kube-state-metrics/issues/410
if p.DeletionTimestamp != nil && p.Status.Reason == node.NodeUnreachablePodReason {
tags["status_phase"] = string(v1.PodUnknown)
}
}
if !p.CreationTimestamp.IsZero() {
fields["created"] = p.CreationTimestamp.Unix()
}
for _, c := range p.Status.Conditions {
switch c.Type {
case v1.PodReady:
tags["status_ready"] = "true"
case v1.PodScheduled:
tags["status_scheduled"] = "true"
fields["status_scheduled_time"] = c.LastTransitionTime.Unix()
}
}
var lastFinishTime int64
for i, cs := range p.Status.ContainerStatuses {
c := p.Spec.Containers[i]
gatherPodContainer(nodeName, p, cs, c, &lastFinishTime, acc)
}
if lastFinishTime > 0 {
fields["completion_time"] = lastFinishTime
}
for _, v := range p.Spec.Volumes {
if v.PersistentVolumeClaim != nil {
gatherPodVolume(v, p, acc)
}
}
acc.AddFields(podMeasurement, fields, tags)
return nil
}
func gatherPodVolume(v v1.Volume, p v1.Pod, acc telegraf.Accumulator) {
fields := map[string]interface{}{
"read_only": 0.0,
}
tags := map[string]string{
"namespace": p.Namespace,
"pod": p.Name,
"volume": v.Name,
"persistentvolumeclaim": v.PersistentVolumeClaim.ClaimName,
}
if v.PersistentVolumeClaim.ReadOnly {
fields["read_only"] = 1.0
}
acc.AddFields(podVolumeMeasurement, fields, tags)
}
func gatherPodContainer(nodeName string, p v1.Pod, cs v1.ContainerStatus, c v1.Container, lastFinishTime *int64, acc telegraf.Accumulator) {
fields := map[string]interface{}{
"status_restarts_total": cs.RestartCount,
}
tags := map[string]string{
"namespace": p.Namespace,
"pod_name": p.Name,
"node_name": nodeName,
"container": c.Name,
"image": cs.Image,
"image_id": cs.ImageID,
"container_id": cs.ContainerID,
"status_waiting": strconv.FormatBool(cs.State.Waiting != nil),
"status_waiting_reason": "",
"status_running": strconv.FormatBool(cs.State.Terminated != nil),
"status_terminated": strconv.FormatBool(cs.State.Running != nil),
"status_terminated_reason": "",
"container_status_ready": strconv.FormatBool(cs.Ready),
}
if cs.State.Waiting != nil {
tags["status_waiting_reason"] = cs.State.Waiting.Reason
}
if cs.State.Terminated != nil {
tags["status_terminated_reason"] = cs.State.Terminated.Reason
if *lastFinishTime == 0 || *lastFinishTime < cs.State.Terminated.FinishedAt.Unix() {
*lastFinishTime = cs.State.Terminated.FinishedAt.Unix()
}
}
req := c.Resources.Requests
lim := c.Resources.Limits
for resourceName, val := range req {
switch resourceName {
case v1.ResourceCPU:
fields["resource_requests_cpu_cores"] = val.MilliValue() / 1000
default:
fields["resource_requests_"+sanitizeLabelName(string(resourceName))+"_bytes"] = val.Value()
}
}
for resourceName, val := range lim {
switch resourceName {
case v1.ResourceCPU:
fields["resource_limits_cpu_cores"] = val.MilliValue() / 1000
default:
fields["resource_limits_"+sanitizeLabelName(string(resourceName))+"_bytes"] = val.Value()
}
}
acc.AddFields(podContainerMeasurement, fields, tags)
}
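// sanitizeLabelName (below) maps every character outside [a-zA-Z0-9_] to "_",
// so a label key like "app.kubernetes.io/name" becomes the tag key
// "label_app_kubernetes_io_name".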
var invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
func sanitizeLabelName(s string) string {
return invalidLabelCharRE.ReplaceAllString(s, "_")
}

View File

@@ -1,23 +0,0 @@
# Reader Input Plugin
The `reader` plugin reads and parses files on every interval, always starting from the top of each file.
It supports all of Telegraf's input data formats.
### Configuration
```toml
## Files to parse each interval.
## These accept standard unix glob matching rules, but with the addition of
## ** as a "super asterisk". ie:
## /var/log/**.log -> recursively find all .log files in /var/log
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/apache/access.log"]
## The dataformat to be read from files
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = ""
```

View File

@@ -1,14 +0,0 @@
version: '3'
services:
telegraf:
image: glinton/scratch
volumes:
- ./telegraf.conf:/telegraf.conf
- ../../../../telegraf:/telegraf
- ./json_a.log:/var/log/test.log
entrypoint:
- /telegraf
- --config
- /telegraf.conf

View File

@@ -1,15 +0,0 @@
{
"parent": {
"child": 3.0,
"ignored_child": "hi"
},
"ignored_null": null,
"integer": 4,
"list": [3, 4],
"ignored_parent": {
"another_ignored_null": null,
"ignored_string": "hello, world!"
},
"another_list": [4]
}

View File

@@ -1,8 +0,0 @@
[[inputs.reader]]
files = ["/var/log/test.log"]
data_format = "json"
name_override = "json_reader"
[[outputs.file]]
files = ["stdout"]

View File

@@ -1,102 +0,0 @@
package reader
import (
"io/ioutil"
"log"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)
type Reader struct {
Filepaths []string `toml:"files"`
FromBeginning bool
parser parsers.Parser
Filenames []string
}
const sampleConfig = `## Files to parse each interval.
## These accept standard unix glob matching rules, but with the addition of
## ** as a "super asterisk". ie:
## /var/log/**.log -> recursively find all .log files in /var/log
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/apache/access.log"]
## The dataformat to be read from files
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = ""
`
// SampleConfig returns the default configuration of the Input
func (r *Reader) SampleConfig() string {
return sampleConfig
}
func (r *Reader) Description() string {
return "reload and gather from file[s] on telegraf's interval"
}
func (r *Reader) Gather(acc telegraf.Accumulator) error {
r.refreshFilePaths()
for _, k := range r.Filenames {
metrics, err := r.readMetric(k)
if err != nil {
return err
}
for i, m := range metrics {
//error if m is nil
if m == nil {
log.Printf("E! Metric could not be parsed from: %v, on line %v", k, i)
continue
}
acc.AddFields(m.Name(), m.Fields(), m.Tags())
}
}
return nil
}
func (r *Reader) SetParser(p parsers.Parser) {
r.parser = p
}
func (r *Reader) refreshFilePaths() {
var allFiles []string
for _, filepath := range r.Filepaths {
g, err := globpath.Compile(filepath)
if err != nil {
log.Printf("E! Error Glob %s failed to compile, %s", filepath, err)
continue
}
files := g.Match()
for k := range files {
allFiles = append(allFiles, k)
}
}
r.Filenames = allFiles
}
//requires that Parser has been compiled
func (r *Reader) readMetric(filename string) ([]telegraf.Metric, error) {
fileContents, err := ioutil.ReadFile(filename)
if err != nil {
log.Printf("E! File could not be opened: %v", filename)
}
return r.parser.Parse(fileContents)
}
func init() {
inputs.Add("reader", func() telegraf.Input {
return &Reader{}
})
}

View File

@@ -1,64 +0,0 @@
package reader
import (
"runtime"
"strings"
"testing"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
func TestRefreshFilePaths(t *testing.T) {
testDir := getPluginDir()
r := Reader{
Filepaths: []string{testDir + "/logparser/grok/testdata/**.log"},
}
r.refreshFilePaths()
assert.Equal(t, len(r.Filenames), 2)
}
func TestJSONParserCompile(t *testing.T) {
testDir := getPluginDir()
var acc testutil.Accumulator
r := Reader{
Filepaths: []string{testDir + "/reader/testfiles/json_a.log"},
}
parserConfig := parsers.Config{
DataFormat: "json",
TagKeys: []string{"parent_ignored_child"},
}
nParser, err := parsers.NewParser(&parserConfig)
r.parser = nParser
assert.NoError(t, err)
r.Gather(&acc)
assert.Equal(t, map[string]string{"parent_ignored_child": "hi"}, acc.Metrics[0].Tags)
assert.Equal(t, 5, len(acc.Metrics[0].Fields))
}
func TestGrokParser(t *testing.T) {
testDir := getPluginDir()
var acc testutil.Accumulator
r := Reader{
Filepaths: []string{testDir + "/reader/testfiles/grok_a.log"},
}
parserConfig := parsers.Config{
DataFormat: "grok",
Patterns: []string{"%{COMMON_LOG_FORMAT}"},
}
nParser, err := parsers.NewParser(&parserConfig)
r.parser = nParser
assert.NoError(t, err)
err = r.Gather(&acc)
assert.Equal(t, 2, len(acc.Metrics))
}
func getPluginDir() string {
_, filename, _, _ := runtime.Caller(1)
return strings.Replace(filename, "/reader/reader_test.go", "", 1)
}

View File

@@ -1,2 +0,0 @@
127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326
128.0.0.1 user-identifier tony [10/Oct/2000:13:55:36 -0800] "GET /apache_pb.gif HTTP/1.0" 300 45

View File

@@ -1,14 +0,0 @@
{
"parent": {
"child": 3.0,
"ignored_child": "hi"
},
"ignored_null": null,
"integer": 4,
"list": [3, 4],
"ignored_parent": {
"another_ignored_null": null,
"ignored_string": "hello, world!"
},
"another_list": [4]
}

View File

@@ -1,73 +0,0 @@
# Captures are a slightly modified version of logstash "grok" patterns, with
# the format %{<capture syntax>[:<semantic name>][:<modifier>]}
# By default all named captures are converted into string fields.
# Modifiers can be used to convert captures to other types or tags.
# Timestamp modifiers can be used to convert captures to the timestamp of the
# parsed metric.
# View logstash grok pattern docs here:
# https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
# All default logstash patterns are supported, these can be viewed here:
# https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
# Available modifiers:
# string (default if nothing is specified)
# int
# float
# duration (ie, 5.23ms gets converted to int nanoseconds)
# tag (converts the field into a tag)
# drop (drops the field completely)
# Timestamp modifiers:
# ts-ansic ("Mon Jan _2 15:04:05 2006")
# ts-unix ("Mon Jan _2 15:04:05 MST 2006")
# ts-ruby ("Mon Jan 02 15:04:05 -0700 2006")
# ts-rfc822 ("02 Jan 06 15:04 MST")
# ts-rfc822z ("02 Jan 06 15:04 -0700")
# ts-rfc850 ("Monday, 02-Jan-06 15:04:05 MST")
# ts-rfc1123 ("Mon, 02 Jan 2006 15:04:05 MST")
# ts-rfc1123z ("Mon, 02 Jan 2006 15:04:05 -0700")
# ts-rfc3339 ("2006-01-02T15:04:05Z07:00")
# ts-rfc3339nano ("2006-01-02T15:04:05.999999999Z07:00")
# ts-httpd ("02/Jan/2006:15:04:05 -0700")
# ts-epoch (seconds since unix epoch)
# ts-epochnano (nanoseconds since unix epoch)
# ts-"CUSTOM"
# CUSTOM time layouts must be within quotes and be the representation of the
# "reference time", which is Mon Jan 2 15:04:05 -0700 MST 2006
# See https://golang.org/pkg/time/#Parse for more details.
# Example log file pattern, example log looks like this:
# [04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs
# Breakdown of the DURATION pattern below:
# NUMBER is a builtin logstash grok pattern matching float & int numbers.
# [nuµm]? is a regex specifying 0 or 1 of the characters within brackets.
# s is also regex, this pattern must end in "s".
# so DURATION will match something like '5.324ms' or '6.1µs' or '10s'
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
RESPONSE_TIME %{DURATION:response_time_ns:duration}
EXAMPLE_LOG \[%{HTTPDATE:ts:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME}
# Wider-ranging username matching vs. logstash built-in %{USER}
NGUSERNAME [a-zA-Z0-9\.\@\-\+_%]+
NGUSER %{NGUSERNAME}
# Wider-ranging client IP matching
CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1)
##
## COMMON LOG PATTERNS
##
# apache & nginx logs, this is also known as the "common log format"
# see https://en.wikipedia.org/wiki/Common_Log_Format
COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-)
# Combined log format is the same as the common log format but with the addition
# of two quoted strings at the end for "referrer" and "agent"
# See Examples at http://httpd.apache.org/docs/current/mod/mod_log_config.html
COMBINED_LOG_FORMAT %{COMMON_LOG_FORMAT} %{QS:referrer} %{QS:agent}
# HTTPD log formats
HTTPD20_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{LOGLEVEL:loglevel:tag}\] (?:\[client %{IPORHOST:clientip}\] ){0,1}%{GREEDYDATA:errormsg}
HTTPD24_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{WORD:module}:%{LOGLEVEL:loglevel:tag}\] \[pid %{POSINT:pid:int}:tid %{NUMBER:tid:int}\]( \(%{POSINT:proxy_errorcode:int}\)%{DATA:proxy_errormessage}:)?( \[client %{IPORHOST:client}:%{POSINT:clientport}\])? %{DATA:errorcode}: %{GREEDYDATA:message}
HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG}

View File

@@ -1,78 +0,0 @@
package grok
// DEFAULT_PATTERNS SHOULD BE KEPT IN-SYNC WITH patterns/influx-patterns
const DEFAULT_PATTERNS = `
# Captures are a slightly modified version of logstash "grok" patterns, with
# the format %{<capture syntax>[:<semantic name>][:<modifier>]}
# By default all named captures are converted into string fields.
# Modifiers can be used to convert captures to other types or tags.
# Timestamp modifiers can be used to convert captures to the timestamp of the
# parsed metric.
# View logstash grok pattern docs here:
# https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
# All default logstash patterns are supported, these can be viewed here:
# https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
# Available modifiers:
# string (default if nothing is specified)
# int
# float
# duration (ie, 5.23ms gets converted to int nanoseconds)
# tag (converts the field into a tag)
# drop (drops the field completely)
# Timestamp modifiers:
# ts-ansic ("Mon Jan _2 15:04:05 2006")
# ts-unix ("Mon Jan _2 15:04:05 MST 2006")
# ts-ruby ("Mon Jan 02 15:04:05 -0700 2006")
# ts-rfc822 ("02 Jan 06 15:04 MST")
# ts-rfc822z ("02 Jan 06 15:04 -0700")
# ts-rfc850 ("Monday, 02-Jan-06 15:04:05 MST")
# ts-rfc1123 ("Mon, 02 Jan 2006 15:04:05 MST")
# ts-rfc1123z ("Mon, 02 Jan 2006 15:04:05 -0700")
# ts-rfc3339 ("2006-01-02T15:04:05Z07:00")
# ts-rfc3339nano ("2006-01-02T15:04:05.999999999Z07:00")
# ts-httpd ("02/Jan/2006:15:04:05 -0700")
# ts-epoch (seconds since unix epoch)
# ts-epochnano (nanoseconds since unix epoch)
# ts-"CUSTOM"
# CUSTOM time layouts must be within quotes and be the representation of the
# "reference time", which is Mon Jan 2 15:04:05 -0700 MST 2006
# See https://golang.org/pkg/time/#Parse for more details.
# Example log file pattern, example log looks like this:
# [04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432µs
# Breakdown of the DURATION pattern below:
# NUMBER is a builtin logstash grok pattern matching float & int numbers.
# [nuµm]? is a regex specifying 0 or 1 of the characters within brackets.
# s is also regex, this pattern must end in "s".
# so DURATION will match something like '5.324ms' or '6.1µs' or '10s'
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
RESPONSE_TIME %{DURATION:response_time_ns:duration}
EXAMPLE_LOG \[%{HTTPDATE:ts:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME}
# Wider-ranging username matching vs. logstash built-in %{USER}
NGUSERNAME [a-zA-Z0-9\.\@\-\+_%]+
NGUSER %{NGUSERNAME}
# Wider-ranging client IP matching
CLIENT (?:%{IPV6}|%{IPV4}|%{HOSTNAME}|%{HOSTPORT})
##
## COMMON LOG PATTERNS
##
# apache & nginx logs, this is also known as the "common log format"
# see https://en.wikipedia.org/wiki/Common_Log_Format
COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NOTSPACE:ident} %{NOTSPACE:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-)
# Combined log format is the same as the common log format but with the addition
# of two quoted strings at the end for "referrer" and "agent"
# See Examples at http://httpd.apache.org/docs/current/mod/mod_log_config.html
COMBINED_LOG_FORMAT %{COMMON_LOG_FORMAT} %{QS:referrer} %{QS:agent}
# HTTPD log formats
HTTPD20_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{LOGLEVEL:loglevel:tag}\] (?:\[client %{IPORHOST:clientip}\] ){0,1}%{GREEDYDATA:errormsg}
HTTPD24_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{WORD:module}:%{LOGLEVEL:loglevel:tag}\] \[pid %{POSINT:pid:int}:tid %{NUMBER:tid:int}\]( \(%{POSINT:proxy_errorcode:int}\)%{DATA:proxy_errormessage}:)?( \[client %{IPORHOST:client}:%{POSINT:clientport}\])? %{DATA:errorcode}: %{GREEDYDATA:message}
HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG}
`

View File

@@ -1,527 +0,0 @@
package grok
import (
"bufio"
"fmt"
"log"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/vjeantet/grok"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
var timeLayouts = map[string]string{
"ts-ansic": "Mon Jan _2 15:04:05 2006",
"ts-unix": "Mon Jan _2 15:04:05 MST 2006",
"ts-ruby": "Mon Jan 02 15:04:05 -0700 2006",
"ts-rfc822": "02 Jan 06 15:04 MST",
"ts-rfc822z": "02 Jan 06 15:04 -0700", // RFC822 with numeric zone
"ts-rfc850": "Monday, 02-Jan-06 15:04:05 MST",
"ts-rfc1123": "Mon, 02 Jan 2006 15:04:05 MST",
"ts-rfc1123z": "Mon, 02 Jan 2006 15:04:05 -0700", // RFC1123 with numeric zone
"ts-rfc3339": "2006-01-02T15:04:05Z07:00",
"ts-rfc3339nano": "2006-01-02T15:04:05.999999999Z07:00",
"ts-httpd": "02/Jan/2006:15:04:05 -0700",
// These three are not exactly "layouts", but they are special cases that
// will get handled in the ParseLine function.
"ts-epoch": "EPOCH",
"ts-epochnano": "EPOCH_NANO",
"ts-syslog": "SYSLOG_TIMESTAMP",
"ts": "GENERIC_TIMESTAMP", // try parsing all known timestamp layouts.
}
const (
INT = "int"
TAG = "tag"
FLOAT = "float"
STRING = "string"
DURATION = "duration"
DROP = "drop"
EPOCH = "EPOCH"
EPOCH_NANO = "EPOCH_NANO"
SYSLOG_TIMESTAMP = "SYSLOG_TIMESTAMP"
GENERIC_TIMESTAMP = "GENERIC_TIMESTAMP"
)
var (
// matches named captures that contain a modifier.
// ie,
// %{NUMBER:bytes:int}
// %{IPORHOST:clientip:tag}
// %{HTTPDATE:ts1:ts-http}
// %{HTTPDATE:ts2:ts-"02 Jan 06 15:04"}
modifierRe = regexp.MustCompile(`%{\w+:(\w+):(ts-".+"|t?s?-?\w+)}`)
// matches a plain pattern name. ie, %{NUMBER}
patternOnlyRe = regexp.MustCompile(`%{(\w+)}`)
)
// Parser is the primary struct to handle and grok-patterns defined in the config toml
type Parser struct {
Patterns []string
// namedPatterns is a list of internally-assigned names to the patterns
// specified by the user in Patterns.
// They will look like:
// GROK_INTERNAL_PATTERN_0, GROK_INTERNAL_PATTERN_1, etc.
NamedPatterns []string
CustomPatterns string
CustomPatternFiles []string
Measurement string
// Timezone is an optional component to help render log dates to
// your chosen zone.
// Default: "" which renders UTC
// Options are as follows:
// 1. Local -- interpret based on machine localtime
// 2. "America/Chicago" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
// 3. UTC -- or blank/unspecified, will return timestamp in UTC
Timezone string
loc *time.Location
// typeMap is a map of patterns -> capture name -> modifier,
// ie, {
// "%{TESTLOG}":
// {
// "bytes": "int",
// "clientip": "tag"
// }
// }
typeMap map[string]map[string]string
// tsMap is a map of patterns -> capture name -> timestamp layout.
// ie, {
// "%{TESTLOG}":
// {
// "httptime": "02/Jan/2006:15:04:05 -0700"
// }
// }
tsMap map[string]map[string]string
// patterns is a map of all of the parsed patterns from CustomPatterns
// and CustomPatternFiles.
// ie, {
// "DURATION": "%{NUMBER}[nuµm]?s"
// "RESPONSE_CODE": "%{NUMBER:rc:tag}"
// }
patterns map[string]string
// foundTsLayouts is a slice of timestamp patterns that have been found
// in the log lines. This slice gets updated if the user uses the generic
// 'ts' modifier for timestamps. This slice is checked first for matches,
// so that previously-matched layouts get priority over all other timestamp
// layouts.
foundTsLayouts []string
timeFunc func() time.Time
g *grok.Grok
tsModder *tsModder
}
// Compile is a bound method to Parser which will process the options for our parser
func (p *Parser) Compile() error {
p.typeMap = make(map[string]map[string]string)
p.tsMap = make(map[string]map[string]string)
p.patterns = make(map[string]string)
p.tsModder = &tsModder{}
var err error
p.g, err = grok.NewWithConfig(&grok.Config{NamedCapturesOnly: true})
if err != nil {
return err
}
// Give Patterns fake names so that they can be treated as named
// "custom patterns"
p.NamedPatterns = make([]string, 0, len(p.Patterns))
for i, pattern := range p.Patterns {
pattern = strings.TrimSpace(pattern)
if pattern == "" {
continue
}
name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i)
p.CustomPatterns += "\n" + name + " " + pattern + "\n"
p.NamedPatterns = append(p.NamedPatterns, "%{"+name+"}")
}
if len(p.NamedPatterns) == 0 {
return fmt.Errorf("pattern required")
}
// Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse
// them together as the same type of pattern.
p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns
if len(p.CustomPatterns) != 0 {
scanner := bufio.NewScanner(strings.NewReader(p.CustomPatterns))
p.addCustomPatterns(scanner)
}
// Parse any custom pattern files supplied.
for _, filename := range p.CustomPatternFiles {
file, fileErr := os.Open(filename)
if fileErr != nil {
return fileErr
}
scanner := bufio.NewScanner(bufio.NewReader(file))
p.addCustomPatterns(scanner)
}
if p.Measurement == "" {
p.Measurement = "logparser_grok"
}
p.loc, err = time.LoadLocation(p.Timezone)
if err != nil {
log.Printf("W! improper timezone supplied (%s), setting loc to UTC", p.Timezone)
p.loc, _ = time.LoadLocation("UTC")
}
if p.timeFunc == nil {
p.timeFunc = time.Now
}
return p.compileCustomPatterns()
}
// ParseLine is the primary function to process individual lines, returning the metrics
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
var err error
// values are the parsed fields from the log line
var values map[string]string
// the matching pattern string
var patternName string
for _, pattern := range p.NamedPatterns {
if values, err = p.g.Parse(pattern, line); err != nil {
return nil, err
}
if len(values) != 0 {
patternName = pattern
break
}
}
if len(values) == 0 {
log.Printf("D! Grok no match found for: %q", line)
return nil, nil
}
fields := make(map[string]interface{})
tags := make(map[string]string)
timestamp := time.Now()
for k, v := range values {
if k == "" || v == "" {
continue
}
// t is the modifier of the field
var t string
// check if pattern has some modifiers
if types, ok := p.typeMap[patternName]; ok {
t = types[k]
}
// if we didn't find a modifier, check if we have a timestamp layout
if t == "" {
if ts, ok := p.tsMap[patternName]; ok {
// check if the modifier is a timestamp layout
if layout, ok := ts[k]; ok {
t = layout
}
}
}
// if we didn't find a type OR timestamp modifier, assume string
if t == "" {
t = STRING
}
switch t {
case INT:
iv, err := strconv.ParseInt(v, 10, 64)
if err != nil {
log.Printf("E! Error parsing %s to int: %s", v, err)
} else {
fields[k] = iv
}
case FLOAT:
fv, err := strconv.ParseFloat(v, 64)
if err != nil {
log.Printf("E! Error parsing %s to float: %s", v, err)
} else {
fields[k] = fv
}
case DURATION:
d, err := time.ParseDuration(v)
if err != nil {
log.Printf("E! Error parsing %s to duration: %s", v, err)
} else {
fields[k] = int64(d)
}
case TAG:
tags[k] = v
case STRING:
fields[k] = strings.Trim(v, `"`)
case EPOCH:
parts := strings.SplitN(v, ".", 2)
if len(parts) == 0 {
log.Printf("E! Error parsing %s to timestamp: %s", v, err)
break
}
sec, err := strconv.ParseInt(parts[0], 10, 64)
if err != nil {
log.Printf("E! Error parsing %s to timestamp: %s", v, err)
break
}
ts := time.Unix(sec, 0)
if len(parts) == 2 {
padded := fmt.Sprintf("%-9s", parts[1])
nsString := strings.Replace(padded[:9], " ", "0", -1)
nanosec, err := strconv.ParseInt(nsString, 10, 64)
if err != nil {
log.Printf("E! Error parsing %s to timestamp: %s", v, err)
break
}
ts = ts.Add(time.Duration(nanosec) * time.Nanosecond)
}
timestamp = ts
case EPOCH_NANO:
iv, err := strconv.ParseInt(v, 10, 64)
if err != nil {
log.Printf("E! Error parsing %s to int: %s", v, err)
} else {
timestamp = time.Unix(0, iv)
}
case SYSLOG_TIMESTAMP:
ts, err := time.ParseInLocation("Jan 02 15:04:05", v, p.loc)
if err == nil {
if ts.Year() == 0 {
ts = ts.AddDate(timestamp.Year(), 0, 0)
}
timestamp = ts
} else {
log.Printf("E! Error parsing %s to time layout [%s]: %s", v, t, err)
}
case GENERIC_TIMESTAMP:
var foundTs bool
// first try timestamp layouts that we've already found
for _, layout := range p.foundTsLayouts {
ts, err := time.ParseInLocation(layout, v, p.loc)
if err == nil {
timestamp = ts
foundTs = true
break
}
}
// if we haven't found a timestamp layout yet, try all timestamp
// layouts.
if !foundTs {
for _, layout := range timeLayouts {
ts, err := time.ParseInLocation(layout, v, p.loc)
if err == nil {
timestamp = ts
foundTs = true
p.foundTsLayouts = append(p.foundTsLayouts, layout)
break
}
}
}
// if we still haven't found a timestamp layout, log it and we will
// just use time.Now()
if !foundTs {
log.Printf("E! Error parsing timestamp [%s], could not find any "+
"suitable time layouts.", v)
}
case DROP:
// goodbye!
default:
ts, err := time.ParseInLocation(t, v, p.loc)
if err == nil {
timestamp = ts
} else {
log.Printf("E! Error parsing %s to time layout [%s]: %s", v, t, err)
}
}
}
if len(fields) == 0 {
return nil, fmt.Errorf("logparser_grok: must have one or more fields")
}
return metric.New(p.Measurement, tags, fields, p.tsModder.tsMod(timestamp))
}
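Continuing the usage sketch from Compile above (a fragment; the input line is chosen to match the hypothetical pattern):
m, err := p.ParseLine("GET 2326")
if err != nil {
log.Fatalf("parse error: %v", err)
}
if m == nil {
log.Println("no pattern matched")
} else {
// with the hypothetical pattern: verb="GET" (string) and bytes=2326 (int64)
log.Printf("fields=%v tags=%v time=%s", m.Fields(), m.Tags(), m.Time())
}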
// Parse splits buf on newlines and parses each line, returning all metrics
// produced. A line that matches no pattern is skipped rather than emitted
// as a nil metric.
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
lines := strings.Split(string(buf), "\n")
var metrics []telegraf.Metric
for _, line := range lines {
m, err := p.ParseLine(line)
if err != nil {
return nil, err
}
// ParseLine returns (nil, nil) when nothing matched
if m != nil {
metrics = append(metrics, m)
}
}
return metrics, nil
}
// SetDefaultTags is currently a no-op; default tag support still needs
// implementation.
func (p *Parser) SetDefaultTags(tags map[string]string) {
}
// addCustomPatterns reads pattern definitions, one "NAME regex" pair per
// line, skipping blank lines and '#' comments.
func (p *Parser) addCustomPatterns(scanner *bufio.Scanner) {
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if len(line) > 0 && line[0] != '#' {
names := strings.SplitN(line, " ", 2)
if len(names) < 2 {
// a name without a pattern body is malformed; skip it
continue
}
p.patterns[names[0]] = names[1]
}
}
}
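For illustration, the format those definitions take, whether supplied inline via custom_patterns or from a pattern file (the names here are hypothetical):
# comments and blank lines are ignored; everything after the first space
# on a line is the regular expression
POSTFIX_QUEUEID [0-9A-F]{10,11}
MY_LOG %{POSTFIX_QUEUEID:queueid:tag} %{GREEDYDATA:message}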
// compileCustomPatterns resolves references between custom patterns, parses
// out capture modifiers, and registers the results with the grok engine.
func (p *Parser) compileCustomPatterns() error {
var err error
// If a pattern references another defined pattern, inline the referenced
// body so its capture modifiers are inherited. Two passes resolve up to
// two levels of nesting.
for i := 0; i < 2; i++ {
for name, pattern := range p.patterns {
subNames := patternOnlyRe.FindAllStringSubmatch(pattern, -1)
for _, subName := range subNames {
if subPattern, ok := p.patterns[subName[1]]; ok {
pattern = strings.Replace(pattern, subName[0], subPattern, 1)
}
}
p.patterns[name] = pattern
}
}
// check if pattern contains modifiers. Parse them out if it does.
for name, pattern := range p.patterns {
if modifierRe.MatchString(pattern) {
// this pattern has modifiers, so parse out the modifiers
pattern, err = p.parseTypedCaptures(name, pattern)
if err != nil {
return err
}
p.patterns[name] = pattern
}
}
return p.g.AddPatternsFromMap(p.patterns)
}
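To make the inlining concrete, a hypothetical before/after for the two passes above:
// Before:  BASE    %{NUMBER:bytes:int}
//          MY_LOG  %{BASE} %{WORD:verb}
// After:   MY_LOG  %{NUMBER:bytes:int} %{WORD:verb}
// The inlining lets parseTypedCaptures (below) see the "int" modifier that
// was declared inside BASE.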
// parseTypedCaptures parses the capture modifiers, and then deletes the
// modifier from the line so that it is a valid "grok" pattern again.
// ie,
// %{NUMBER:bytes:int} => %{NUMBER:bytes} (stores %{NUMBER}->bytes->int)
// %{IPORHOST:clientip:tag} => %{IPORHOST:clientip} (stores %{IPORHOST}->clientip->tag)
func (p *Parser) parseTypedCaptures(name, pattern string) (string, error) {
matches := modifierRe.FindAllStringSubmatch(pattern, -1)
// grab the name of the capture pattern
patternName := "%{" + name + "}"
// create type map for this pattern
p.typeMap[patternName] = make(map[string]string)
p.tsMap[patternName] = make(map[string]string)
// boolean to verify that each pattern only has a single ts- data type.
hasTimestamp := false
for _, match := range matches {
// regex capture 1 is the name of the capture
// regex capture 2 is the modifier of the capture
if strings.HasPrefix(match[2], "ts") {
if hasTimestamp {
return pattern, fmt.Errorf("logparser pattern compile error: "+
"Each pattern is allowed only one named "+
"timestamp data type. pattern: %s", pattern)
}
if layout, ok := timeLayouts[match[2]]; ok {
// built-in time format
p.tsMap[patternName][match[1]] = layout
} else {
// custom time format
p.tsMap[patternName][match[1]] = strings.TrimSuffix(strings.TrimPrefix(match[2], `ts-"`), `"`)
}
hasTimestamp = true
} else {
p.typeMap[patternName][match[1]] = match[2]
}
// the modifier is not a valid part of a "grok" pattern, so remove it
// from the pattern.
pattern = strings.Replace(pattern, ":"+match[2]+"}", "}", 1)
}
return pattern, nil
}
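A worked sketch of the bookkeeping, for a hypothetical pattern registered as MY_LOG:
// parseTypedCaptures("MY_LOG", `%{NUMBER:bytes:int} %{IPORHOST:clientip:tag}`)
// stores:
//   p.typeMap["%{MY_LOG}"] = map[string]string{"bytes": "int", "clientip": "tag"}
//   p.tsMap["%{MY_LOG}"]   = map[string]string{} // no ts- modifier present
// and returns the stripped, grok-valid pattern:
//   %{NUMBER:bytes} %{IPORHOST:clientip}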
// tsModder is a struct for incrementing identical timestamps of log lines
// so that we don't push identical metrics that will get overwritten.
type tsModder struct {
dupe time.Time
last time.Time
incr time.Duration
incrn time.Duration
rollover time.Duration
}
// tsMod increments the given timestamp one unit past the previous
// duplicate timestamp.
// The increment unit is chosen from ns/µs/ms to sit just below the
// precision of ts; ie, if the input is at ms precision, it will be
// incremented by 1µs.
func (t *tsModder) tsMod(ts time.Time) time.Time {
defer func() { t.last = ts }()
// don't mod the time if we don't need to
if t.last.IsZero() || ts.IsZero() {
t.incrn = 0
t.rollover = 0
return ts
}
if !ts.Equal(t.last) && !ts.Equal(t.dupe) {
t.incr = 0
t.incrn = 0
t.rollover = 0
return ts
}
if ts.Equal(t.last) {
t.dupe = ts
}
if ts.Equal(t.dupe) && t.incr == time.Duration(0) {
tsNano := ts.UnixNano()
d := int64(10)
counter := 1
for {
a := tsNano % d
if a > 0 {
break
}
d = d * 10
counter++
}
switch {
case counter <= 6:
t.incr = time.Nanosecond
case counter <= 9:
t.incr = time.Microsecond
case counter > 9:
t.incr = time.Millisecond
}
}
t.incrn++
if t.incrn == 999 && t.incr > time.Nanosecond {
t.rollover = t.incr * t.incrn
t.incrn = 1
t.incr = t.incr / 1000
if t.incr < time.Nanosecond {
t.incr = time.Nanosecond
}
}
return ts.Add(t.incr*t.incrn + t.rollover)
}
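A short sketch of the effect (a fragment; the timestamp is hypothetical): two log lines carrying the identical second-precision timestamp come out of tsMod a millisecond apart, so neither metric overwrites the other:
var mod tsModder
t0 := time.Unix(1526400000, 0)
first := mod.tsMod(t0)  // first occurrence: returned unchanged
second := mod.tsMod(t0) // duplicate: nudged forward by t.incr (1ms here,
                        // since second precision leaves counter > 9 above)
// first.Equal(t0) == true; second.Sub(first) == time.Millisecond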

View File

@@ -1,19 +0,0 @@
package grok
import (
"log"
"testing"
"github.com/stretchr/testify/assert"
)
func TestGrokParse(t *testing.T) {
parser := Parser{
Measurement: "t_met",
Patterns: []string{"%{COMMON_LOG_FORMAT}"},
}
assert.NoError(t, parser.Compile())
metrics, err := parser.Parse([]byte(`127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326`))
assert.NoError(t, err)
log.Printf("metric_tags: %v, metric_fields: %v", metrics[0].Tags(), metrics[0].Fields())
}

View File

@@ -8,7 +8,6 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/collectd" "github.com/influxdata/telegraf/plugins/parsers/collectd"
"github.com/influxdata/telegraf/plugins/parsers/dropwizard" "github.com/influxdata/telegraf/plugins/parsers/dropwizard"
"github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/grok"
"github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/nagios" "github.com/influxdata/telegraf/plugins/parsers/nagios"
@@ -88,13 +87,6 @@ type Config struct {
// an optional map containing tag names as keys and json paths to retrieve the tag values from as values
// used if TagsPath is empty or doesn't return any tags
DropwizardTagPathsMap map[string]string
//grok patterns
Patterns []string
NamedPatterns []string
CustomPatterns string
CustomPatternFiles []string
TimeZone string
}
// NewParser returns a Parser interface based on the given config.
@@ -128,39 +120,12 @@ func NewParser(config *Config) (Parser, error) {
config.DefaultTags,
config.Separator,
config.Templates)
case "grok":
parser, err = NewGrokParser(
config.MetricName,
config.Patterns,
config.NamedPatterns,
config.CustomPatterns,
config.CustomPatternFiles,
config.TimeZone)
default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
}
return parser, err
}
func NewGrokParser(metricName string,
patterns []string,
nPatterns []string,
cPatterns string,
cPatternFiles []string, tZone string) (Parser, error) {
parser := grok.Parser{
Measurement: metricName,
Patterns: patterns,
NamedPatterns: nPatterns,
CustomPatterns: cPatterns,
CustomPatternFiles: cPatternFiles,
Timezone: tZone,
}
parser.Compile()
return &parser, nil
}
func NewJSONParser(
metricName string,
tagKeys []string,

View File

@@ -1,104 +0,0 @@
# Global tags can be specified here in key="value" format.
[global_tags]
# dc = "us-east-1" # will tag all metrics with dc=us-east-1
# rack = "1a"
## Environment variables can be used as tags, and throughout the config file
# user = "$USER"
# Configuration for telegraf agent
[agent]
## Default data collection interval for all inputs
interval = "10s"
## Rounds collection interval to 'interval'
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
## Telegraf will send metrics to outputs in batches of at most
## metric_batch_size metrics.
## This controls the size of writes that Telegraf sends to output plugins.
metric_batch_size = 1000
## For failed writes, telegraf will cache metric_buffer_limit metrics for each
## output, and will flush this buffer on a successful write. Oldest metrics
## are dropped first when this buffer fills.
## This buffer only fills when writes fail to output plugin(s).
metric_buffer_limit = 10000
## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
## This can be used to avoid many plugins querying things like sysfs at the
## same time, which can have a measurable effect on the system.
collection_jitter = "0s"
## Default flushing interval for all outputs. You shouldn't set this below
## interval. Maximum flush_interval will be flush_interval + flush_jitter
flush_interval = "10s"
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"
## By default or when set to "0s", precision will be set to the same
## timestamp order as the collection interval, with the maximum being 1s.
## ie, when interval = "10s", precision will be "1s"
## when interval = "250ms", precision will be "1ms"
## Precision will NOT be used for service inputs. It is up to each individual
## service input to set the timestamp at the appropriate precision.
## Valid time units are "ns", "us" (or "µs"), "ms", "s".
precision = ""
## Logging configuration:
## Run telegraf with debug log messages.
debug = false
## Run telegraf in quiet mode (error log messages only).
quiet = false
## Specify the log file name. The empty string means to log to stderr.
logfile = ""
## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do not set the "host" tag in the telegraf agent.
omit_hostname = false
# # reload and gather from file[s] on telegraf's interval
[[inputs.reader]]
# ## These accept standard unix glob matching rules, but with the addition of
# ## ** as a "super asterisk". ie:
# ## /var/log/**.log -> recursively find all .log files in /var/log
# ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
# ## /var/log/apache.log -> only tail the apache log file
files = ["/Users/maxu/go/src/github.com/influxdata/telegraf/plugins/inputs/logparser/grok/testdata/**.log"]
#
# ## The dataformat to be read from files
# ## Each data format has its own unique set of configuration options, read
# ## more about them here:
# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "grok"
#
patterns = ["%{TEST_LOG_B}","%{TEST_LOG_A}"]
#
# ## Name override for the outputted measurement.
name_override = "grok_reader"
#
# ## Full path(s) to custom pattern files.
custom_pattern_files = ["/Users/maxu/go/src/github.com/influxdata/telegraf/plugins/inputs/logparser/grok/testdata/test-patterns"]
#
# ## Custom patterns can also be defined here. Put one pattern per line.
# custom_patterns = '''
# '''
#
# ## Timezone allows you to provide an override for timestamps that
# ## don't already include an offset
# ## e.g. 04/06/2016 12:41:45 data one two 5.43µs
# ##
# ## Default: "" which renders UTC
# ## Options are as follows:
# ## 1. Local -- interpret based on machine localtime
# ## 2. "Canada/Eastern" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
# ## 3. UTC -- or blank/unspecified, will return timestamp in UTC
# timezone = "Canada/Eastern"
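The custom_pattern_files entry above points at a test-patterns file defining %{TEST_LOG_A} and %{TEST_LOG_B}; that file is not part of this diff, but it would use the same one-definition-per-line format parsed by addCustomPatterns. A purely hypothetical sketch:
# hypothetical sketch only; the real testdata file is not shown in this diff
TEST_LOG_A %{NUMBER:myfloat:float} %{WORD:mystring} %{GREEDYDATA:message}
TEST_LOG_B \[%{HTTPDATE:timestamp:ts-httpd}\] %{NUMBER:response_code:int}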