Add metric_version option to mysql input (#3954)

This commit is contained in:
Daniel Nelson 2018-04-02 13:10:43 -07:00 committed by GitHub
parent 64b239663c
commit 82448a9dd1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 435 additions and 33 deletions

View File

@ -2,15 +2,16 @@
### Release Notes
- The `mysql` input plugin has been updated to convert values to the
correct data type. This may cause a `field type error` when inserting into
InfluxDB due the change of types. It is recommended to drop the `mysql`,
`mysql_variables`, and `mysql_innodb`:
```
DROP MEASUREMENT mysql
DROP MEASUREMENT mysql_variables
DROP MEASUREMENT mysql_innodb
```
- The `mysql` input plugin has been updated fix a number of type convertion
issues. This may cause a `field type error` when inserting into InfluxDB due
the change of types.
To address this we have introduced a new `metric_version` option to control
enabling the new format. For in depth recommendations on upgrading please
reference the [mysql plugin documentation](./plugins/inputs/mysql/README.md#metric-version).
It is encouraged to migrate to the new model when possible as the old version
is deprecated and will be removed in a future version.
- The `postgresql` plugins now defaults to using a persistent connection to the database.
In environments where TCP connections are terminated the `max_lifetime`
@ -26,7 +27,8 @@
is set. It is encouraged to enable this option when possible as the old
ordering is deprecated.
- The `httpjson` is now deprecated, please migrate to the new `http` input.
- The new `http` input configured with `data_format = "json"` can perform the
same task as the, now deprecated, `httpjson` input.
### New Inputs

View File

@ -1,4 +1,4 @@
# MySQL Input plugin
# MySQL Input Plugin
This plugin gathers the statistic data from MySQL server
@ -18,9 +18,9 @@ This plugin gathers the statistic data from MySQL server
* File events statistics
* Table schema statistics
## Configuration
### Configuration
```
```toml
# Read metrics from one or many mysql servers
[[inputs.mysql]]
## specify servers via a url matching:
@ -88,7 +88,90 @@ This plugin gathers the statistic data from MySQL server
ssl_key = "/etc/telegraf/key.pem"
```
## Measurements & Fields
#### Metric Version
When `metric_version = 2`, a variety of field type issues are corrected as well
as naming inconsistencies. If you have existing data on the original version
enabling this feature will cause a `field type error` when inserted into
InfluxDB due to the change of types. For this reason, you should keep the
`metric_version` unset until you are ready to migrate to the new format.
If preserving your old data is not required you may wish to drop conflicting
measurements:
```
DROP SERIES from mysql
DROP SERIES from mysql_variables
DROP SERIES from mysql_innodb
```
Otherwise, migration can be performed using the following steps:
1. Duplicate your `mysql` plugin configuration and add a `name_suffix` and
`metric_version = 2`, this will result in collection using both the old and new
style concurrently:
```toml
[[inputs.mysql]]
servers = ["tcp(127.0.0.1:3306)/"]
[[inputs.mysql]]
name_override = "_2"
metric_version = 2
servers = ["tcp(127.0.0.1:3306)/"]
```
2. Upgrade all affected Telegraf clients to version >=1.6.
New measurements will be created with the `name_suffix`, for example::
- `mysql_v2`
- `mysql_variables_v2`
3. Update charts, alerts, and other supporting code to the new format.
4. You can now remove the old `mysql` plugin configuration and remove old
measurements.
If you wish to remove the `name_suffix` you may use Kapacitor to copy the
historical data to the default name. Do this only after retiring the old
measurement name.
1. Use the techinique described above to write to multiple locations:
```toml
[[inputs.mysql]]
servers = ["tcp(127.0.0.1:3306)/"]
metric_version = 2
[[inputs.mysql]]
name_override = "_2"
metric_version = 2
servers = ["tcp(127.0.0.1:3306)/"]
```
2. Create a TICKScript to copy the historical data:
```
dbrp "telegraf"."autogen"
batch
|query('''
SELECT * FROM "telegraf"."autogen"."mysql_v2"
''')
.period(5m)
.every(5m)
|influxDBOut()
.database('telegraf')
.retentionPolicy('autogen')
.measurement('mysql')
```
3. Define a task for your script:
```sh
kapacitor define copy-measurement -tick copy-measurement.task
```
4. Run the task over the data you would like to migrate:
```sh
kapacitor replay-live batch -start 2018-03-30T20:00:00Z -stop 2018-04-01T12:00:00Z -rec-time -task copy-measurement
```
5. Verify copied data and repeat for other measurements.
### Metrics:
* Global statuses - all numeric and boolean values of `SHOW GLOBAL STATUSES`
* Global variables - all numeric and boolean values of `SHOW GLOBAL VARIABLES`
* Slave status - metrics from `SHOW SLAVE STATUS` the metrics are gathered when

View File

@ -13,6 +13,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/mysql/v1"
"github.com/go-sql-driver/mysql"
)
@ -40,6 +41,7 @@ type Mysql struct {
SSLCA string `toml:"ssl_ca"`
SSLCert string `toml:"ssl_cert"`
SSLKey string `toml:"ssl_key"`
MetricVersion int `toml:"metric_version"`
}
var sampleConfig = `
@ -52,6 +54,20 @@ var sampleConfig = `
#
## If no servers are specified, then localhost is used as the host.
servers = ["tcp(127.0.0.1:3306)/"]
## Selects the metric output format.
##
## This option exists to maintain backwards compatibility, if you have
## existing metrics do not set or change this value until you are ready to
## migrate to the new format.
##
## If you do not have existing metrics from this plugin set to the latest
## version.
##
## Telegraf >=1.6: metric_version = 2
## <1.6: metric_version = 1 (or unset)
metric_version = 2
## the limits for metrics form perf_events_statements
perf_events_statements_digest_text_limit = 120
perf_events_statements_limit = 250
@ -541,7 +557,7 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
fields[key] = string(val)
tags[key] = string(val)
}
if value, ok := parseValue(val); ok {
if value, ok := m.parseValue(val); ok {
fields[key] = value
}
// Send 20 fields at a time
@ -593,7 +609,7 @@ func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumu
// range over columns, and try to parse values
for i, col := range cols {
col = strings.ToLower(col)
if value, ok := parseValue(*vals[i].(*sql.RawBytes)); ok {
if value, ok := m.parseValue(*vals[i].(*sql.RawBytes)); ok {
fields["slave_"+col] = value
}
}
@ -662,10 +678,75 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
return err
}
key = strings.ToLower(key)
if m.MetricVersion < 2 {
var found bool
for _, mapped := range v1.Mappings {
if strings.HasPrefix(key, mapped.OnServer) {
// convert numeric values to integer
i, _ := strconv.Atoi(string(val))
fields[mapped.InExport+key[len(mapped.OnServer):]] = i
found = true
}
}
// Send 20 fields at a time
if len(fields) >= 20 {
acc.AddFields("mysql", fields, tags)
fields = make(map[string]interface{})
}
if found {
continue
}
if value, ok := parseValue(val); ok {
fields[key] = value
// search for specific values
switch key {
case "Queries":
i, err := strconv.ParseInt(string(val), 10, 64)
if err != nil {
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
} else {
fields["queries"] = i
}
case "Questions":
i, err := strconv.ParseInt(string(val), 10, 64)
if err != nil {
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
} else {
fields["questions"] = i
}
case "Slow_queries":
i, err := strconv.ParseInt(string(val), 10, 64)
if err != nil {
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
} else {
fields["slow_queries"] = i
}
case "Connections":
i, err := strconv.ParseInt(string(val), 10, 64)
if err != nil {
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
} else {
fields["connections"] = i
}
case "Syncs":
i, err := strconv.ParseInt(string(val), 10, 64)
if err != nil {
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
} else {
fields["syncs"] = i
}
case "Uptime":
i, err := strconv.ParseInt(string(val), 10, 64)
if err != nil {
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
} else {
fields["uptime"] = i
}
}
} else {
key = strings.ToLower(key)
if value, ok := m.parseValue(val); ok {
fields[key] = value
}
}
// Send 20 fields at a time
@ -820,7 +901,11 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
for s, c := range stateCounts {
fields[newNamespace("threads", s)] = c
}
acc.AddFields("mysql_process_list", fields, tags)
if m.MetricVersion < 2 {
acc.AddFields("mysql_info_schema", fields, tags)
} else {
acc.AddFields("mysql_process_list", fields, tags)
}
return nil
}
@ -1033,7 +1118,11 @@ func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, serv string, acc tel
fields["auto_increment_column"] = incValue
fields["auto_increment_column_max"] = maxInt
acc.AddFields("mysql_table_schema", fields, tags)
if m.MetricVersion < 2 {
acc.AddFields("mysql_info_schema", fields, tags)
} else {
acc.AddFields("mysql_table_schema", fields, tags)
}
}
return nil
}
@ -1059,7 +1148,7 @@ func (m *Mysql) gatherInnoDBMetrics(db *sql.DB, serv string, acc telegraf.Accumu
return err
}
key = strings.ToLower(key)
if value, ok := parseValue(val); ok {
if value, ok := m.parseValue(val); ok {
fields[key] = value
}
// Send 20 fields at a time
@ -1430,17 +1519,37 @@ func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumula
tags["schema"] = tableSchema
tags["table"] = tableName
acc.AddFields("mysql_table_schema",
map[string]interface{}{"rows": tableRows}, tags)
if m.MetricVersion < 2 {
acc.AddFields(newNamespace("info_schema", "table_rows"),
map[string]interface{}{"value": tableRows}, tags)
acc.AddFields("mysql_table_schema",
map[string]interface{}{"data_length": dataLength}, tags)
dlTags := copyTags(tags)
dlTags["component"] = "data_length"
acc.AddFields(newNamespace("info_schema", "table_size", "data_length"),
map[string]interface{}{"value": dataLength}, dlTags)
acc.AddFields("mysql_table_schema",
map[string]interface{}{"index_length": indexLength}, tags)
ilTags := copyTags(tags)
ilTags["component"] = "index_length"
acc.AddFields(newNamespace("info_schema", "table_size", "index_length"),
map[string]interface{}{"value": indexLength}, ilTags)
acc.AddFields("mysql_table_schema",
map[string]interface{}{"data_free": dataFree}, tags)
dfTags := copyTags(tags)
dfTags["component"] = "data_free"
acc.AddFields(newNamespace("info_schema", "table_size", "data_free"),
map[string]interface{}{"value": dataFree}, dfTags)
} else {
acc.AddFields("mysql_table_schema",
map[string]interface{}{"rows": tableRows}, tags)
acc.AddFields("mysql_table_schema",
map[string]interface{}{"data_length": dataLength}, tags)
acc.AddFields("mysql_table_schema",
map[string]interface{}{"index_length": indexLength}, tags)
acc.AddFields("mysql_table_schema",
map[string]interface{}{"data_free": dataFree}, tags)
}
versionTags := copyTags(tags)
versionTags["type"] = tableType
@ -1448,13 +1557,26 @@ func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumula
versionTags["row_format"] = rowFormat
versionTags["create_options"] = createOptions
acc.AddFields("mysql_table_schema_version",
map[string]interface{}{"table_version": version}, versionTags)
if m.MetricVersion < 2 {
acc.AddFields(newNamespace("info_schema", "table_version"),
map[string]interface{}{"value": version}, versionTags)
} else {
acc.AddFields("mysql_table_schema_version",
map[string]interface{}{"table_version": version}, versionTags)
}
}
}
return nil
}
func (m *Mysql) parseValue(value sql.RawBytes) (interface{}, bool) {
if m.MetricVersion < 2 {
return v1.ParseValue(value)
} else {
return parseValue(value)
}
}
// parseValue can be used to convert values such as "ON","OFF","Yes","No" to 0,1
func parseValue(value sql.RawBytes) (interface{}, bool) {
if bytes.EqualFold(value, []byte("YES")) || bytes.Compare(value, []byte("ON")) == 0 {

View File

@ -0,0 +1,195 @@
package v1
import (
"bytes"
"database/sql"
"strconv"
)
type Mapping struct {
OnServer string
InExport string
}
var Mappings = []*Mapping{
{
OnServer: "Aborted_",
InExport: "aborted_",
},
{
OnServer: "Bytes_",
InExport: "bytes_",
},
{
OnServer: "Com_",
InExport: "commands_",
},
{
OnServer: "Created_",
InExport: "created_",
},
{
OnServer: "Handler_",
InExport: "handler_",
},
{
OnServer: "Innodb_",
InExport: "innodb_",
},
{
OnServer: "Key_",
InExport: "key_",
},
{
OnServer: "Open_",
InExport: "open_",
},
{
OnServer: "Opened_",
InExport: "opened_",
},
{
OnServer: "Qcache_",
InExport: "qcache_",
},
{
OnServer: "Table_",
InExport: "table_",
},
{
OnServer: "Tokudb_",
InExport: "tokudb_",
},
{
OnServer: "Threads_",
InExport: "threads_",
},
{
OnServer: "Access_",
InExport: "access_",
},
{
OnServer: "Aria__",
InExport: "aria_",
},
{
OnServer: "Binlog__",
InExport: "binlog_",
},
{
OnServer: "Busy_",
InExport: "busy_",
},
{
OnServer: "Connection_",
InExport: "connection_",
},
{
OnServer: "Delayed_",
InExport: "delayed_",
},
{
OnServer: "Empty_",
InExport: "empty_",
},
{
OnServer: "Executed_",
InExport: "executed_",
},
{
OnServer: "Executed_",
InExport: "executed_",
},
{
OnServer: "Feature_",
InExport: "feature_",
},
{
OnServer: "Flush_",
InExport: "flush_",
},
{
OnServer: "Last_",
InExport: "last_",
},
{
OnServer: "Master_",
InExport: "master_",
},
{
OnServer: "Max_",
InExport: "max_",
},
{
OnServer: "Memory_",
InExport: "memory_",
},
{
OnServer: "Not_",
InExport: "not_",
},
{
OnServer: "Performance_",
InExport: "performance_",
},
{
OnServer: "Prepared_",
InExport: "prepared_",
},
{
OnServer: "Rows_",
InExport: "rows_",
},
{
OnServer: "Rpl_",
InExport: "rpl_",
},
{
OnServer: "Select_",
InExport: "select_",
},
{
OnServer: "Slave_",
InExport: "slave_",
},
{
OnServer: "Slow_",
InExport: "slow_",
},
{
OnServer: "Sort_",
InExport: "sort_",
},
{
OnServer: "Subquery_",
InExport: "subquery_",
},
{
OnServer: "Tc_",
InExport: "tc_",
},
{
OnServer: "Threadpool_",
InExport: "threadpool_",
},
{
OnServer: "wsrep_",
InExport: "wsrep_",
},
{
OnServer: "Uptime_",
InExport: "uptime_",
},
}
func ParseValue(value sql.RawBytes) (float64, bool) {
if bytes.Compare(value, []byte("Yes")) == 0 || bytes.Compare(value, []byte("ON")) == 0 {
return 1, true
}
if bytes.Compare(value, []byte("No")) == 0 || bytes.Compare(value, []byte("OFF")) == 0 {
return 0, true
}
n, err := strconv.ParseFloat(string(value), 64)
return n, err == nil
}