diff --git a/Makefile b/Makefile index 60555765e..02c9ca13f 100644 --- a/Makefile +++ b/Makefile @@ -9,6 +9,11 @@ build: prepare "-X main.Version=$(VERSION)" \ ./cmd/telegraf/telegraf.go +dev: prepare + $(GOBIN)/godep go build -race -o telegraf -ldflags \ + "-X main.Version=$(VERSION)" \ + ./cmd/telegraf/telegraf.go + build-linux-bins: prepare GOARCH=amd64 GOOS=linux $(GOBIN)/godep go build -o telegraf_linux_amd64 \ -ldflags "-X main.Version=$(VERSION)" \ diff --git a/accumulator.go b/accumulator.go index 6c1962ac4..28d31b75e 100644 --- a/accumulator.go +++ b/accumulator.go @@ -24,6 +24,47 @@ type BatchPoints struct { Config *ConfiguredPlugin } +// deepcopy returns a deep copy of the BatchPoints object. This is primarily so +// we can do multithreaded output flushing (see Agent.flush) +func (bp *BatchPoints) deepcopy() *BatchPoints { + bp.mu.Lock() + defer bp.mu.Unlock() + + var bpc BatchPoints + bpc.Time = bp.Time + bpc.Precision = bp.Precision + + bpc.Tags = make(map[string]string) + for k, v := range bp.Tags { + bpc.Tags[k] = v + } + + var pts []client.Point + for _, pt := range bp.Points { + var ptc client.Point + + ptc.Measurement = pt.Measurement + ptc.Time = pt.Time + ptc.Precision = pt.Precision + ptc.Raw = pt.Raw + + ptc.Tags = make(map[string]string) + ptc.Fields = make(map[string]interface{}) + + for k, v := range pt.Tags { + ptc.Tags[k] = v + } + + for k, v := range pt.Fields { + ptc.Fields[k] = v + } + pts = append(pts, ptc) + } + + bpc.Points = pts + return &bpc +} + // Add adds a measurement func (bp *BatchPoints) Add( measurement string, diff --git a/agent.go b/agent.go index c1c9fcf14..5af5696e4 100644 --- a/agent.go +++ b/agent.go @@ -303,10 +303,14 @@ func (a *Agent) flush(bp *BatchPoints) error { for _, o := range a.outputs { wg.Add(1) + + // Copy BatchPoints + bpc := bp.deepcopy() + go func(ro *runningOutput) { defer wg.Done() // Log all output errors: - if err := ro.output.Write(bp.BatchPoints); err != nil { + if err := ro.output.Write(bpc.BatchPoints); err != nil { log.Printf("Error in output [%s]: %s", ro.name, err) outerr = errors.New("Error encountered flushing outputs") }