mysql: do not use plugin-wide maps to avoid data race

closes #1236
This commit is contained in:
Cameron Sparr 2016-05-23 09:33:32 +01:00
parent 3c5c3b98df
commit 54fcc41496
1 changed files with 186 additions and 174 deletions

View File

@ -32,6 +32,10 @@ type Mysql struct {
GatherFileEventsStats bool `toml:"gather_file_events_stats"`
GatherPerfEventsStatements bool `toml:"gather_perf_events_statements"`
IntervalSlow string `toml:"interval_slow"`
mappings []*mapping
generalThreadStates map[string]uint32
stateStatusMappings map[string]string
}
var sampleConfig = `
@ -145,7 +149,9 @@ type mapping struct {
inExport string
}
var mappings = []*mapping{
func NewMysql() *Mysql {
out := &Mysql{}
out.mappings = []*mapping{
{
onServer: "Aborted_",
inExport: "aborted_",
@ -306,11 +312,9 @@ var mappings = []*mapping{
onServer: "Threadpool_",
inExport: "threadpool_",
},
}
}
var (
// status counter
generalThreadStates = map[string]uint32{
out.generalThreadStates = map[string]uint32{
"after create": uint32(0),
"altering table": uint32(0),
"analyzing": uint32(0),
@ -363,8 +367,8 @@ var (
"writing to net": uint32(0),
"other": uint32(0),
}
// plaintext statuses
stateStatusMappings = map[string]string{
out.stateStatusMappings = map[string]string{
"user sleep": "idle",
"creating index": "altering table",
"committing alter table to storage engine": "altering table",
@ -385,7 +389,9 @@ var (
"deleting from main table": "deleting",
"deleting from reference tables": "deleting",
}
)
return out
}
func dsnAddTimeout(dsn string) (string, error) {
@ -825,7 +831,7 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
var found bool
// iterate over mappings and gather metrics that is provided on mapping
for _, mapped := range mappings {
for _, mapped := range m.mappings {
if strings.HasPrefix(name, mapped.onServer) {
// convert numeric values to integer
i, _ := strconv.Atoi(string(val.([]byte)))
@ -927,9 +933,9 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
}
// mapping of state with its counts
stateCounts := make(map[string]uint32, len(generalThreadStates))
stateCounts := make(map[string]uint32, len(m.generalThreadStates))
// set map with keys and default values
for k, v := range generalThreadStates {
for k, v := range m.generalThreadStates {
stateCounts[k] = v
}
@ -939,7 +945,8 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
return err
}
// each state has its mapping
foundState := findThreadState(command, state)
foundState := findThreadState(m.generalThreadStates,
m.stateStatusMappings, command, state)
// count each state
stateCounts[foundState] += count
}
@ -1510,7 +1517,12 @@ func parseValue(value sql.RawBytes) (float64, bool) {
}
// findThreadState can be used to find thread state by command and plain state
func findThreadState(rawCommand, rawState string) string {
func findThreadState(
generalThreadStates map[string]uint32,
stateStatusMappings map[string]string,
rawCommand string,
rawState string,
) string {
var (
// replace '_' symbol with space
command = strings.Replace(strings.ToLower(rawCommand), "_", " ", -1)
@ -1560,6 +1572,6 @@ func copyTags(in map[string]string) map[string]string {
func init() {
inputs.Add("mysql", func() telegraf.Input {
return &Mysql{}
return NewMysql()
})
}