From 54fcc41496c05756bdb046ced1b5d74420d61c8e Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 23 May 2016 09:33:32 +0100 Subject: [PATCH] mysql: do not use plugin-wide maps to avoid data race closes #1236 --- plugins/inputs/mysql/mysql.go | 360 ++++++++++++++++++---------------- 1 file changed, 186 insertions(+), 174 deletions(-) diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go index b8ff3945a..ca6012038 100644 --- a/plugins/inputs/mysql/mysql.go +++ b/plugins/inputs/mysql/mysql.go @@ -32,6 +32,10 @@ type Mysql struct { GatherFileEventsStats bool `toml:"gather_file_events_stats"` GatherPerfEventsStatements bool `toml:"gather_perf_events_statements"` IntervalSlow string `toml:"interval_slow"` + + mappings []*mapping + generalThreadStates map[string]uint32 + stateStatusMappings map[string]string } var sampleConfig = ` @@ -145,172 +149,172 @@ type mapping struct { inExport string } -var mappings = []*mapping{ - { - onServer: "Aborted_", - inExport: "aborted_", - }, - { - onServer: "Bytes_", - inExport: "bytes_", - }, - { - onServer: "Com_", - inExport: "commands_", - }, - { - onServer: "Created_", - inExport: "created_", - }, - { - onServer: "Handler_", - inExport: "handler_", - }, - { - onServer: "Innodb_", - inExport: "innodb_", - }, - { - onServer: "Key_", - inExport: "key_", - }, - { - onServer: "Open_", - inExport: "open_", - }, - { - onServer: "Opened_", - inExport: "opened_", - }, - { - onServer: "Qcache_", - inExport: "qcache_", - }, - { - onServer: "Table_", - inExport: "table_", - }, - { - onServer: "Tokudb_", - inExport: "tokudb_", - }, - { - onServer: "Threads_", - inExport: "threads_", - }, - { - onServer: "Access_", - inExport: "access_", - }, - { - onServer: "Aria__", - inExport: "aria_", - }, - { - onServer: "Binlog__", - inExport: "binlog_", - }, - { - onServer: "Busy_", - inExport: "busy_", - }, - { - onServer: "Connection_", - inExport: "connection_", - }, - { - onServer: "Delayed_", - inExport: "delayed_", - }, - { - onServer: "Empty_", - inExport: "empty_", - }, - { - onServer: "Executed_", - inExport: "executed_", - }, - { - onServer: "Executed_", - inExport: "executed_", - }, - { - onServer: "Feature_", - inExport: "feature_", - }, - { - onServer: "Flush_", - inExport: "flush_", - }, - { - onServer: "Last_", - inExport: "last_", - }, - { - onServer: "Master_", - inExport: "master_", - }, - { - onServer: "Max_", - inExport: "max_", - }, - { - onServer: "Memory_", - inExport: "memory_", - }, - { - onServer: "Not_", - inExport: "not_", - }, - { - onServer: "Performance_", - inExport: "performance_", - }, - { - onServer: "Prepared_", - inExport: "prepared_", - }, - { - onServer: "Rows_", - inExport: "rows_", - }, - { - onServer: "Rpl_", - inExport: "rpl_", - }, - { - onServer: "Select_", - inExport: "select_", - }, - { - onServer: "Slave_", - inExport: "slave_", - }, - { - onServer: "Slow_", - inExport: "slow_", - }, - { - onServer: "Sort_", - inExport: "sort_", - }, - { - onServer: "Subquery_", - inExport: "subquery_", - }, - { - onServer: "Tc_", - inExport: "tc_", - }, - { - onServer: "Threadpool_", - inExport: "threadpool_", - }, -} +func NewMysql() *Mysql { + out := &Mysql{} + out.mappings = []*mapping{ + { + onServer: "Aborted_", + inExport: "aborted_", + }, + { + onServer: "Bytes_", + inExport: "bytes_", + }, + { + onServer: "Com_", + inExport: "commands_", + }, + { + onServer: "Created_", + inExport: "created_", + }, + { + onServer: "Handler_", + inExport: "handler_", + }, + { + onServer: "Innodb_", + inExport: "innodb_", + }, + { + onServer: "Key_", + inExport: "key_", + }, + { + onServer: "Open_", + inExport: "open_", + }, + { + onServer: "Opened_", + inExport: "opened_", + }, + { + onServer: "Qcache_", + inExport: "qcache_", + }, + { + onServer: "Table_", + inExport: "table_", + }, + { + onServer: "Tokudb_", + inExport: "tokudb_", + }, + { + onServer: "Threads_", + inExport: "threads_", + }, + { + onServer: "Access_", + inExport: "access_", + }, + { + onServer: "Aria__", + inExport: "aria_", + }, + { + onServer: "Binlog__", + inExport: "binlog_", + }, + { + onServer: "Busy_", + inExport: "busy_", + }, + { + onServer: "Connection_", + inExport: "connection_", + }, + { + onServer: "Delayed_", + inExport: "delayed_", + }, + { + onServer: "Empty_", + inExport: "empty_", + }, + { + onServer: "Executed_", + inExport: "executed_", + }, + { + onServer: "Executed_", + inExport: "executed_", + }, + { + onServer: "Feature_", + inExport: "feature_", + }, + { + onServer: "Flush_", + inExport: "flush_", + }, + { + onServer: "Last_", + inExport: "last_", + }, + { + onServer: "Master_", + inExport: "master_", + }, + { + onServer: "Max_", + inExport: "max_", + }, + { + onServer: "Memory_", + inExport: "memory_", + }, + { + onServer: "Not_", + inExport: "not_", + }, + { + onServer: "Performance_", + inExport: "performance_", + }, + { + onServer: "Prepared_", + inExport: "prepared_", + }, + { + onServer: "Rows_", + inExport: "rows_", + }, + { + onServer: "Rpl_", + inExport: "rpl_", + }, + { + onServer: "Select_", + inExport: "select_", + }, + { + onServer: "Slave_", + inExport: "slave_", + }, + { + onServer: "Slow_", + inExport: "slow_", + }, + { + onServer: "Sort_", + inExport: "sort_", + }, + { + onServer: "Subquery_", + inExport: "subquery_", + }, + { + onServer: "Tc_", + inExport: "tc_", + }, + { + onServer: "Threadpool_", + inExport: "threadpool_", + }, + } -var ( - // status counter - generalThreadStates = map[string]uint32{ + out.generalThreadStates = map[string]uint32{ "after create": uint32(0), "altering table": uint32(0), "analyzing": uint32(0), @@ -363,8 +367,8 @@ var ( "writing to net": uint32(0), "other": uint32(0), } - // plaintext statuses - stateStatusMappings = map[string]string{ + + out.stateStatusMappings = map[string]string{ "user sleep": "idle", "creating index": "altering table", "committing alter table to storage engine": "altering table", @@ -385,7 +389,9 @@ var ( "deleting from main table": "deleting", "deleting from reference tables": "deleting", } -) + + return out +} func dsnAddTimeout(dsn string) (string, error) { @@ -825,7 +831,7 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum var found bool // iterate over mappings and gather metrics that is provided on mapping - for _, mapped := range mappings { + for _, mapped := range m.mappings { if strings.HasPrefix(name, mapped.onServer) { // convert numeric values to integer i, _ := strconv.Atoi(string(val.([]byte))) @@ -927,9 +933,9 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf. } // mapping of state with its counts - stateCounts := make(map[string]uint32, len(generalThreadStates)) + stateCounts := make(map[string]uint32, len(m.generalThreadStates)) // set map with keys and default values - for k, v := range generalThreadStates { + for k, v := range m.generalThreadStates { stateCounts[k] = v } @@ -939,7 +945,8 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf. return err } // each state has its mapping - foundState := findThreadState(command, state) + foundState := findThreadState(m.generalThreadStates, + m.stateStatusMappings, command, state) // count each state stateCounts[foundState] += count } @@ -1510,7 +1517,12 @@ func parseValue(value sql.RawBytes) (float64, bool) { } // findThreadState can be used to find thread state by command and plain state -func findThreadState(rawCommand, rawState string) string { +func findThreadState( + generalThreadStates map[string]uint32, + stateStatusMappings map[string]string, + rawCommand string, + rawState string, +) string { var ( // replace '_' symbol with space command = strings.Replace(strings.ToLower(rawCommand), "_", " ", -1) @@ -1560,6 +1572,6 @@ func copyTags(in map[string]string) map[string]string { func init() { inputs.Add("mysql", func() telegraf.Input { - return &Mysql{} + return NewMysql() }) }