Added userstats to mysql input plugin (#2137)
* Added GatherUserStatistics, row Uptime in gatherGlobalStatuses, and version fields & tags * Updated README file * pulling in latest from master * ran go fmt to fix formatting * fix unreachable code * few fixes * cleaning up and applying suggestions from sparrc
This commit is contained in:
parent
c4a708751a
commit
338f01a8da
|
@ -7,6 +7,7 @@ This plugin gathers the statistic data from MySQL server
|
||||||
* Slave statuses
|
* Slave statuses
|
||||||
* Binlog size
|
* Binlog size
|
||||||
* Process list
|
* Process list
|
||||||
|
* User Statistics
|
||||||
* Info schema auto increment columns
|
* Info schema auto increment columns
|
||||||
* Table I/O waits
|
* Table I/O waits
|
||||||
* Index I/O waits
|
* Index I/O waits
|
||||||
|
@ -44,6 +45,9 @@ This plugin gathers the statistic data from MySQL server
|
||||||
## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST
|
## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST
|
||||||
gather_process_list = true
|
gather_process_list = true
|
||||||
#
|
#
|
||||||
|
## gather thread state counts from INFORMATION_SCHEMA.USER_STATISTICS
|
||||||
|
gather_user_statistics = true
|
||||||
|
#
|
||||||
## gather auto_increment columns and max values from information schema
|
## gather auto_increment columns and max values from information schema
|
||||||
gather_info_schema_auto_inc = true
|
gather_info_schema_auto_inc = true
|
||||||
#
|
#
|
||||||
|
@ -89,6 +93,30 @@ Requires to be turned on in configuration.
|
||||||
* binary_files_count(int, number)
|
* binary_files_count(int, number)
|
||||||
* Process list - connection metrics from processlist for each user. It has the following tags
|
* Process list - connection metrics from processlist for each user. It has the following tags
|
||||||
* connections(int, number)
|
* connections(int, number)
|
||||||
|
* User Statistics - connection metrics from user statistics for each user. It has the following fields
|
||||||
|
* access_denied
|
||||||
|
* binlog_bytes_written
|
||||||
|
* busy_time
|
||||||
|
* bytes_received
|
||||||
|
* bytes_sent
|
||||||
|
* commit_transactions
|
||||||
|
* concurrent_connections
|
||||||
|
* connected_time
|
||||||
|
* cpu_time
|
||||||
|
* denied_connections
|
||||||
|
* empty_queries
|
||||||
|
* hostlost_connections
|
||||||
|
* other_commands
|
||||||
|
* rollback_transactions
|
||||||
|
* rows_fetched
|
||||||
|
* rows_updated
|
||||||
|
* select_commands
|
||||||
|
* server
|
||||||
|
* table_rows_read
|
||||||
|
* total_connections
|
||||||
|
* total_ssl_connections
|
||||||
|
* update_commands
|
||||||
|
* user
|
||||||
* Perf Table IO waits - total count and time of I/O waits event for each table
|
* Perf Table IO waits - total count and time of I/O waits event for each table
|
||||||
and process. It has following fields:
|
and process. It has following fields:
|
||||||
* table_io_waits_total_fetch(float, number)
|
* table_io_waits_total_fetch(float, number)
|
||||||
|
@ -158,6 +186,8 @@ The unit of fields varies by the tags.
|
||||||
* server (the host name from which the metrics are gathered)
|
* server (the host name from which the metrics are gathered)
|
||||||
* Process list measurement has following tags
|
* Process list measurement has following tags
|
||||||
* user (username for whom the metrics are gathered)
|
* user (username for whom the metrics are gathered)
|
||||||
|
* User Statistics measurement has following tags
|
||||||
|
* user (username for whom the metrics are gathered)
|
||||||
* Perf table IO waits measurement has following tags
|
* Perf table IO waits measurement has following tags
|
||||||
* schema
|
* schema
|
||||||
* name (object name for event or process)
|
* name (object name for event or process)
|
||||||
|
|
|
@ -23,6 +23,7 @@ type Mysql struct {
|
||||||
PerfEventsStatementsTimeLimit int64 `toml:"perf_events_statemetns_time_limit"`
|
PerfEventsStatementsTimeLimit int64 `toml:"perf_events_statemetns_time_limit"`
|
||||||
TableSchemaDatabases []string `toml:"table_schema_databases"`
|
TableSchemaDatabases []string `toml:"table_schema_databases"`
|
||||||
GatherProcessList bool `toml:"gather_process_list"`
|
GatherProcessList bool `toml:"gather_process_list"`
|
||||||
|
GatherUserStatistics bool `toml:"gather_user_statistics"`
|
||||||
GatherInfoSchemaAutoInc bool `toml:"gather_info_schema_auto_inc"`
|
GatherInfoSchemaAutoInc bool `toml:"gather_info_schema_auto_inc"`
|
||||||
GatherSlaveStatus bool `toml:"gather_slave_status"`
|
GatherSlaveStatus bool `toml:"gather_slave_status"`
|
||||||
GatherBinaryLogs bool `toml:"gather_binary_logs"`
|
GatherBinaryLogs bool `toml:"gather_binary_logs"`
|
||||||
|
@ -60,6 +61,9 @@ var sampleConfig = `
|
||||||
## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST
|
## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST
|
||||||
gather_process_list = true
|
gather_process_list = true
|
||||||
#
|
#
|
||||||
|
## gather thread state counts from INFORMATION_SCHEMA.USER_STATISTICS
|
||||||
|
gather_user_statistics = true
|
||||||
|
#
|
||||||
## gather auto_increment columns and max values from information schema
|
## gather auto_increment columns and max values from information schema
|
||||||
gather_info_schema_auto_inc = true
|
gather_info_schema_auto_inc = true
|
||||||
#
|
#
|
||||||
|
@ -415,6 +419,10 @@ const (
|
||||||
WHERE ID != connection_id()
|
WHERE ID != connection_id()
|
||||||
GROUP BY command,state
|
GROUP BY command,state
|
||||||
ORDER BY null`
|
ORDER BY null`
|
||||||
|
infoSchemaUserStatisticsQuery = `
|
||||||
|
SELECT *,count(*)
|
||||||
|
FROM information_schema.user_statistics
|
||||||
|
GROUP BY user`
|
||||||
infoSchemaAutoIncQuery = `
|
infoSchemaAutoIncQuery = `
|
||||||
SELECT table_schema, table_name, column_name, auto_increment,
|
SELECT table_schema, table_name, column_name, auto_increment,
|
||||||
CAST(pow(2, case data_type
|
CAST(pow(2, case data_type
|
||||||
|
@ -530,7 +538,6 @@ const (
|
||||||
table_name
|
table_name
|
||||||
FROM information_schema.tables
|
FROM information_schema.tables
|
||||||
WHERE table_schema = 'performance_schema' AND table_name = ?
|
WHERE table_schema = 'performance_schema' AND table_name = ?
|
||||||
|
|
||||||
`
|
`
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -582,6 +589,13 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if m.GatherUserStatistics {
|
||||||
|
err = m.GatherUserStatisticsStatuses(db, serv, acc)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if m.GatherSlaveStatus {
|
if m.GatherSlaveStatus {
|
||||||
err = m.gatherSlaveStatuses(db, serv, acc)
|
err = m.gatherSlaveStatuses(db, serv, acc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -669,6 +683,11 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
key = strings.ToLower(key)
|
key = strings.ToLower(key)
|
||||||
|
// parse mysql version and put into field and tag
|
||||||
|
if strings.Contains(key, "version") {
|
||||||
|
fields[key] = string(val)
|
||||||
|
tags[key] = string(val)
|
||||||
|
}
|
||||||
// parse value, if it is numeric then save, otherwise ignore
|
// parse value, if it is numeric then save, otherwise ignore
|
||||||
if floatVal, ok := parseValue(val); ok {
|
if floatVal, ok := parseValue(val); ok {
|
||||||
fields[key] = floatVal
|
fields[key] = floatVal
|
||||||
|
@ -854,6 +873,12 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
fields["syncs"] = i
|
fields["syncs"] = i
|
||||||
|
case "Uptime":
|
||||||
|
i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fields["uptime"] = i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Send any remaining fields
|
// Send any remaining fields
|
||||||
|
@ -884,6 +909,74 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// gather connection metrics from user_statistics for each user
|
||||||
|
if m.GatherUserStatistics {
|
||||||
|
conn_rows, err := db.Query("select user, total_connections, concurrent_connections, connected_time, busy_time, cpu_time, bytes_received, bytes_sent, binlog_bytes_written, rows_fetched, rows_updated, table_rows_read, select_commands, update_commands, other_commands, commit_transactions, rollback_transactions, denied_connections, lost_connections, access_denied, empty_queries, total_ssl_connections FROM INFORMATION_SCHEMA.USER_STATISTICS GROUP BY user")
|
||||||
|
|
||||||
|
for conn_rows.Next() {
|
||||||
|
var user string
|
||||||
|
var total_connections int64
|
||||||
|
var concurrent_connections int64
|
||||||
|
var connected_time int64
|
||||||
|
var busy_time int64
|
||||||
|
var cpu_time int64
|
||||||
|
var bytes_received int64
|
||||||
|
var bytes_sent int64
|
||||||
|
var binlog_bytes_written int64
|
||||||
|
var rows_fetched int64
|
||||||
|
var rows_updated int64
|
||||||
|
var table_rows_read int64
|
||||||
|
var select_commands int64
|
||||||
|
var update_commands int64
|
||||||
|
var other_commands int64
|
||||||
|
var commit_transactions int64
|
||||||
|
var rollback_transactions int64
|
||||||
|
var denied_connections int64
|
||||||
|
var lost_connections int64
|
||||||
|
var access_denied int64
|
||||||
|
var empty_queries int64
|
||||||
|
var total_ssl_connections int64
|
||||||
|
|
||||||
|
err = conn_rows.Scan(&user, &total_connections, &concurrent_connections,
|
||||||
|
&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
|
||||||
|
&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
|
||||||
|
&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
|
||||||
|
&empty_queries, &total_ssl_connections,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tags := map[string]string{"server": servtag, "user": user}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"total_connections": total_connections,
|
||||||
|
"concurrent_connections": concurrent_connections,
|
||||||
|
"connected_time": connected_time,
|
||||||
|
"busy_time": busy_time,
|
||||||
|
"cpu_time": cpu_time,
|
||||||
|
"bytes_received": bytes_received,
|
||||||
|
"bytes_sent": bytes_sent,
|
||||||
|
"binlog_bytes_written": binlog_bytes_written,
|
||||||
|
"rows_fetched": rows_fetched,
|
||||||
|
"rows_updated": rows_updated,
|
||||||
|
"table_rows_read": table_rows_read,
|
||||||
|
"select_commands": select_commands,
|
||||||
|
"update_commands": update_commands,
|
||||||
|
"other_commands": other_commands,
|
||||||
|
"commit_transactions": commit_transactions,
|
||||||
|
"rollback_transactions": rollback_transactions,
|
||||||
|
"denied_connections": denied_connections,
|
||||||
|
"lost_connections": lost_connections,
|
||||||
|
"access_denied": access_denied,
|
||||||
|
"empty_queries": empty_queries,
|
||||||
|
"total_ssl_connections": total_ssl_connections,
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.AddFields("mysql_user_stats", fields, tags)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -932,6 +1025,88 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GatherUserStatistics can be used to collect metrics on each running command
|
||||||
|
// and its state with its running count
|
||||||
|
func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
|
||||||
|
// run query
|
||||||
|
rows, err := db.Query(infoSchemaUserStatisticsQuery)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
var (
|
||||||
|
user string
|
||||||
|
total_connections int64
|
||||||
|
concurrent_connections int64
|
||||||
|
connected_time int64
|
||||||
|
busy_time int64
|
||||||
|
cpu_time int64
|
||||||
|
bytes_received int64
|
||||||
|
bytes_sent int64
|
||||||
|
binlog_bytes_written int64
|
||||||
|
rows_fetched int64
|
||||||
|
rows_updated int64
|
||||||
|
table_rows_read int64
|
||||||
|
select_commands int64
|
||||||
|
update_commands int64
|
||||||
|
other_commands int64
|
||||||
|
commit_transactions int64
|
||||||
|
rollback_transactions int64
|
||||||
|
denied_connections int64
|
||||||
|
lost_connections int64
|
||||||
|
access_denied int64
|
||||||
|
empty_queries int64
|
||||||
|
total_ssl_connections int64
|
||||||
|
count uint32
|
||||||
|
)
|
||||||
|
|
||||||
|
var servtag string
|
||||||
|
servtag, err = parseDSN(serv)
|
||||||
|
if err != nil {
|
||||||
|
servtag = "localhost"
|
||||||
|
}
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
err = rows.Scan(&user, &total_connections, &concurrent_connections,
|
||||||
|
&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
|
||||||
|
&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
|
||||||
|
&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
|
||||||
|
&empty_queries, &total_ssl_connections, &count,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tags := map[string]string{"server": servtag, "user": user}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
|
||||||
|
"total_connections": total_connections,
|
||||||
|
"concurrent_connections": concurrent_connections,
|
||||||
|
"connected_time": connected_time,
|
||||||
|
"busy_time": busy_time,
|
||||||
|
"cpu_time": cpu_time,
|
||||||
|
"bytes_received": bytes_received,
|
||||||
|
"bytes_sent": bytes_sent,
|
||||||
|
"binlog_bytes_written": binlog_bytes_written,
|
||||||
|
"rows_fetched": rows_fetched,
|
||||||
|
"rows_updated": rows_updated,
|
||||||
|
"table_rows_read": table_rows_read,
|
||||||
|
"select_commands": select_commands,
|
||||||
|
"update_commands": update_commands,
|
||||||
|
"other_commands": other_commands,
|
||||||
|
"commit_transactions": commit_transactions,
|
||||||
|
"rollback_transactions": rollback_transactions,
|
||||||
|
"denied_connections": denied_connections,
|
||||||
|
"lost_connections": lost_connections,
|
||||||
|
"access_denied": access_denied,
|
||||||
|
"empty_queries": empty_queries,
|
||||||
|
"total_ssl_connections": total_ssl_connections,
|
||||||
|
}
|
||||||
|
acc.AddFields("mysql_user_stats", fields, tags)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// gatherPerfTableIOWaits can be used to get total count and time
|
// gatherPerfTableIOWaits can be used to get total count and time
|
||||||
// of I/O wait event for each table and process
|
// of I/O wait event for each table and process
|
||||||
func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {
|
func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {
|
||||||
|
|
Loading…
Reference in New Issue