godep update influxdb to 0.9.3-rc1

Cameron Sparr 2015-08-20 14:26:44 -06:00
parent 532d953b5a
commit f8c1e953d4
22 changed files with 3060 additions and 240 deletions

Godeps/Godeps.json

@ -92,33 +92,33 @@
},
{
"ImportPath": "github.com/influxdb/influxdb/client",
"Comment": "v0.9.1-rc1-545-g8de66eb",
"Rev": "8de66eb37024cd6bd953662e5588253f0888874b"
"Comment": "v0.9.3-rc1",
"Rev": "f4077764b2bb2b03241452d88e9db321c62bb560"
},
{
"ImportPath": "github.com/influxdb/influxdb/influxql",
"Comment": "v0.9.1-rc1-545-g8de66eb",
"Rev": "8de66eb37024cd6bd953662e5588253f0888874b"
"Comment": "v0.9.3-rc1",
"Rev": "f4077764b2bb2b03241452d88e9db321c62bb560"
},
{
"ImportPath": "github.com/influxdb/influxdb/meta",
"Comment": "v0.9.1-rc1-545-g8de66eb",
"Rev": "8de66eb37024cd6bd953662e5588253f0888874b"
"Comment": "v0.9.3-rc1",
"Rev": "f4077764b2bb2b03241452d88e9db321c62bb560"
},
{
"ImportPath": "github.com/influxdb/influxdb/snapshot",
"Comment": "v0.9.1-rc1-545-g8de66eb",
"Rev": "8de66eb37024cd6bd953662e5588253f0888874b"
"Comment": "v0.9.3-rc1",
"Rev": "f4077764b2bb2b03241452d88e9db321c62bb560"
},
{
"ImportPath": "github.com/influxdb/influxdb/toml",
"Comment": "v0.9.1-rc1-545-g8de66eb",
"Rev": "8de66eb37024cd6bd953662e5588253f0888874b"
"Comment": "v0.9.3-rc1",
"Rev": "f4077764b2bb2b03241452d88e9db321c62bb560"
},
{
"ImportPath": "github.com/influxdb/influxdb/tsdb",
"Comment": "v0.9.1-rc1-545-g8de66eb",
"Rev": "8de66eb37024cd6bd953662e5588253f0888874b"
"Comment": "v0.9.3-rc1",
"Rev": "f4077764b2bb2b03241452d88e9db321c62bb560"
},
{
"ImportPath": "github.com/lib/pq",


@ -2469,6 +2469,8 @@ func timeExprValue(ref Expr, lit Expr) time.Time {
return lit.Val
case *DurationLiteral:
return time.Unix(0, int64(lit.Val)).UTC()
case *NumberLiteral:
return time.Unix(0, int64(lit.Val)).UTC()
}
}
return time.Time{}


@ -483,6 +483,9 @@ func TestTimeRange(t *testing.T) {
{expr: `'2000-01-01 00:00:00' < time`, min: `2000-01-01T00:00:00.000000001Z`, max: `0001-01-01T00:00:00Z`},
{expr: `'2000-01-01 00:00:00' <= time`, min: `2000-01-01T00:00:00Z`, max: `0001-01-01T00:00:00Z`},
// number literal
{expr: `time < 10`, min: `0001-01-01T00:00:00Z`, max: `1970-01-01T00:00:00.000000009Z`},
// Equality
{expr: `time = '2000-01-01 00:00:00'`, min: `2000-01-01T00:00:00Z`, max: `2000-01-01T00:00:00Z`},


@ -31,6 +31,11 @@ type Row struct {
Err error `json:"err,omitempty"`
}
// SameSeries returns true if r contains values for the same series as o.
func (r *Row) SameSeries(o *Row) bool {
return r.tagsHash() == o.tagsHash() && r.Name == o.Name
}
// tagsHash returns a hash of tag key/value pairs.
func (r *Row) tagsHash() uint64 {
h := fnv.New64a()
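A hedged usage sketch of the new helper (assuming Row also carries the Name and Tags fields the fnv hash implies; the values are illustrative):
```
r := &Row{Name: "cpu", Tags: map[string]string{"host": "server01"}}
o := &Row{Name: "cpu", Tags: map[string]string{"host": "server01"}}
fmt.Println(r.SameSeries(o)) // true; a differing name or tag set prints false
```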


@ -37,8 +37,8 @@ Fields are key-value metrics associated with the measurement. Every line must h
Field keys are always strings and follow the same syntactical rules as described above for tag keys and values. Field values can be one of four types. The first value written for a given field on a given measurement defines the type of that field for all series under that measurement.
* _integer_ - Numeric values that do not include a decimal. (e.g. 1, 345, 2015, -10)
* _float_ - Numeric values that include a decimal. (e.g. 1.0, -3.14, 6.0e+5). Note that all values _must_ have a decimal even if the decimal value is zero (1 is an _integer_, 1.0 is a _float_).
* _integer_ - Numeric values that do not include a decimal and are followed by a trailing i when inserted (e.g. 1i, 345i, 2015i, -10i). Note that all values must have a trailing i. If they do not they will be written as floats.
* _float_ - Numeric values that are not followed by a trailing i. (e.g. 1, 1.0, -3.14, 6.0e+5, 10).
* _boolean_ - A value indicating true or false. Valid boolean strings are (t, T, true, TRUE, f, F, false, and FALSE).
* _string_ - A text value. All string values _must_ be surrounded in double-quotes `"`. If the string contains
a double-quote, it must be escaped with a backslash, e.g. `\"`.
@ -46,9 +46,15 @@ a double-quote, it must be escaped with a backslash, e.g. `\"`.
```
# integer value
cpu value=1
cpu value=1i
cpu value=1.1i # will result in a parse error
# float value
cpu_load value=1
cpu_load value=1.0
cpu_load value=1.2
# boolean value
@ -58,7 +64,7 @@ error fatal=true
event msg="logged out"
# multiple values
cpu load=10.0,alert=true,reason="value above maximum threshold"
cpu load=10,alert=true,reason="value above maximum threshold"
```
## Timestamp
@ -71,13 +77,13 @@ an integer epoch in microseconds, milliseconds, seconds, minutes or hours.
## Full Example
A full example is shown below.
```
cpu,host=server01,region=uswest value=1.0 1434055562000000000
cpu,host=server02,region=uswest value=3.0 1434055562000010000
cpu,host=server01,region=uswest value=1 1434055562000000000
cpu,host=server02,region=uswest value=3 1434055562000010000
```
In this example the first line shows a `measurement` of "cpu", there are two tags "host" and "region", the `value` is 1.0, and the `timestamp` is 1434055562000000000. Following this is a second line, also a point in the `measurement` "cpu" but belonging to a different "host".
```
cpu,host=server\ 01,region=uswest value=1.0,msg="all systems nominal"
cpu,host=server\ 01,region=us\,west value_int=1
cpu,host=server\ 01,region=uswest value=1,msg="all systems nominal"
cpu,host=server\ 01,region=us\,west value_int=1i
```
In these examples, the "host" is set to `server 01`. The field value associated with field key `msg` is double-quoted, as it is a string. The second example shows a region of `us,west` with the comma properly escaped. In the first example `value` is written as a floating point number. In the second, `value_int` is an integer.
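To make the new integer/float distinction concrete, here is a minimal runnable Go sketch (the helper names are illustrative, not part of any InfluxDB API):
```
package main

import (
	"fmt"
	"strconv"
)

// intField renders an integer field value with the required trailing 'i'.
func intField(v int64) string { return strconv.FormatInt(v, 10) + "i" }

// floatField renders a float field value; the absence of an 'i' suffix means float.
func floatField(v float64) string { return strconv.FormatFloat(v, 'g', -1, 64) }

func main() {
	fmt.Println("cpu value=" + intField(1))          // cpu value=1i       (integer)
	fmt.Println("cpu_load value=" + floatField(1.2)) // cpu_load value=1.2 (float)
}
```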


@ -16,13 +16,48 @@ const (
// DefaultWALPartitionFlushDelay is the sleep time between WAL partition flushes.
DefaultWALPartitionFlushDelay = 2 * time.Second
// tsdb/engine/wal configuration options
// DefaultReadySeriesSize of 30KB specifies when a series is eligible to be flushed
DefaultReadySeriesSize = 30 * 1024
// DefaultCompactionThreshold is the ratio of keys over the flush size at which a partition is flushed and compacted
DefaultCompactionThreshold = 0.5
// DefaultMaxSeriesSize specifies the size at which a series will be forced to flush
DefaultMaxSeriesSize = 1024 * 1024
// DefaultFlushColdInterval specifies how long a partition must be cold for
// writes before a full flush and compaction are forced
DefaultFlushColdInterval = 5 * time.Minute
// DefaultPartitionSizeThreshold specifies the size in memory at which a partition
// should start slowing down writes until it gets a chance to compact.
// This forces backpressure on clients that write too fast. We need it
// because the WAL can take writes much faster than the index, so without
// backpressure we would eventually fill up memory and die.
// This number multiplied by the partition count is roughly the max possible
// memory size for the in-memory WAL cache.
DefaultPartitionSizeThreshold = 20 * 1024 * 1024 // 20MB
)
type Config struct {
Dir string `toml:"dir"`
Dir string `toml:"dir"`
// WAL config options for b1 (introduced in 0.9.2)
MaxWALSize int `toml:"max-wal-size"`
WALFlushInterval toml.Duration `toml:"wal-flush-interval"`
WALPartitionFlushDelay toml.Duration `toml:"wal-partition-flush-delay"`
// WAL configuration options for bz1 (introduced in 0.9.3)
WALDir string `toml:"wal-dir"`
WALEnableLogging bool `toml:"wal-enable-logging"`
WALReadySeriesSize int `toml:"wal-ready-series-size"`
WALCompactionThreshold float64 `toml:"wal-compaction-threshold"`
WALMaxSeriesSize int `toml:"wal-max-series-size"`
WALFlushColdInterval toml.Duration `toml:"wal-flush-cold-interval"`
WALPartitionSizeThreshold uint64 `toml:"wal-partition-size-threshold"`
}
func NewConfig() Config {
@ -30,5 +65,12 @@ func NewConfig() Config {
MaxWALSize: DefaultMaxWALSize,
WALFlushInterval: toml.Duration(DefaultWALFlushInterval),
WALPartitionFlushDelay: toml.Duration(DefaultWALPartitionFlushDelay),
WALEnableLogging: true,
WALReadySeriesSize: DefaultReadySeriesSize,
WALCompactionThreshold: DefaultCompactionThreshold,
WALMaxSeriesSize: DefaultMaxSeriesSize,
WALFlushColdInterval: toml.Duration(DefaultFlushColdInterval),
WALPartitionSizeThreshold: DefaultPartitionSizeThreshold,
}
}
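A minimal sketch of overriding the new bz1 WAL defaults, assuming the tsdb and toml packages above (the directory path and values are illustrative):
```
cfg := tsdb.NewConfig()
cfg.WALDir = "/var/opt/influxdb/wal"                       // directory for the bz1 write-ahead log (illustrative)
cfg.WALFlushColdInterval = toml.Duration(10 * time.Minute) // force a full flush after 10m of cold writes
cfg.WALPartitionSizeThreshold = 50 * 1024 * 1024           // raise the backpressure ceiling to 50MB per partition
```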


@ -1,10 +1,12 @@
package tsdb
import (
"bytes"
"errors"
"fmt"
"io"
"os"
"sort"
"time"
"github.com/boltdb/bolt"
@ -16,7 +18,7 @@ var (
)
// DefaultEngine is the default engine used by the shard when initializing.
const DefaultEngine = "b1"
const DefaultEngine = "bz1"
// Engine represents a swappable storage engine for the shard.
type Engine interface {
@ -52,7 +54,7 @@ func RegisterEngine(name string, fn NewEngineFunc) {
func NewEngine(path string, options EngineOptions) (Engine, error) {
// Create a new engine
if _, err := os.Stat(path); os.IsNotExist(err) {
return newEngineFuncs[DefaultEngine](path, options), nil
return newEngineFuncs[options.EngineVersion](path, options), nil
}
// Only bolt-based backends are currently supported so open it and check the format.
@ -96,17 +98,22 @@ func NewEngine(path string, options EngineOptions) (Engine, error) {
// EngineOptions represents the options used to initialize the engine.
type EngineOptions struct {
EngineVersion string
MaxWALSize int
WALFlushInterval time.Duration
WALPartitionFlushDelay time.Duration
Config Config
}
// NewEngineOptions returns the default options.
func NewEngineOptions() EngineOptions {
return EngineOptions{
EngineVersion: DefaultEngine,
MaxWALSize: DefaultMaxWALSize,
WALFlushInterval: DefaultWALFlushInterval,
WALPartitionFlushDelay: DefaultWALPartitionFlushDelay,
Config: NewConfig(),
}
}
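Because newly created shards now default to bz1, a hedged sketch of pinning them to the previous engine via these options:
```
opts := tsdb.NewEngineOptions() // EngineVersion defaults to DefaultEngine ("bz1")
opts.EngineVersion = "b1"       // newly created shards use the 0.9.2-era engine instead
```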
@ -125,3 +132,29 @@ type Cursor interface {
Seek(seek []byte) (key, value []byte)
Next() (key, value []byte)
}
// DedupeEntries returns slices with unique keys (the first 8 bytes).
func DedupeEntries(a [][]byte) [][]byte {
// Convert to a map where the last slice is used.
m := make(map[string][]byte)
for _, b := range a {
m[string(b[0:8])] = b
}
// Convert map back to a slice of byte slices.
other := make([][]byte, 0, len(m))
for _, v := range m {
other = append(other, v)
}
// Sort entries.
sort.Sort(ByteSlices(other))
return other
}
type ByteSlices [][]byte
func (a ByteSlices) Len() int { return len(a) }
func (a ByteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
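An illustrative call (assuming a big-endian u64tob helper like the one in the bz1 engine file below; entries share a key when their first 8 bytes match, and the later entry wins):
```
a := [][]byte{
	append(u64tob(2), 0x30),
	append(u64tob(1), 0x10),
	append(u64tob(1), 0x20), // same 8-byte key as the previous entry, so it replaces it
}
a = DedupeEntries(a) // two entries remain, sorted: timestamp 1 (0x20), then timestamp 2 (0x30)
```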


@ -194,7 +194,7 @@ func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields
meta = tx.Bucket([]byte("series"))
c = meta.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
series := &tsdb.Series{}
series := tsdb.NewSeries("", nil)
if err := series.UnmarshalBinary(v); err != nil {
return err
}


@ -22,7 +22,7 @@ func TestEngine_WritePoints(t *testing.T) {
mf := &tsdb.MeasurementFields{Fields: make(map[string]*tsdb.Field)}
mf.CreateFieldIfNotExists("value", influxql.Float)
seriesToCreate := []*tsdb.SeriesCreate{
{Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("temperature"), nil))}},
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("temperature"), nil)), nil)},
}
// Parse point.


@ -3,11 +3,13 @@ package bz1
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"math"
"path/filepath"
"sort"
"sync"
"time"
@ -15,6 +17,7 @@ import (
"github.com/boltdb/bolt"
"github.com/golang/snappy"
"github.com/influxdb/influxdb/tsdb"
"github.com/influxdb/influxdb/tsdb/engine/wal"
)
var (
@ -22,8 +25,10 @@ var (
ErrSeriesExists = errors.New("series exists")
)
// Format is the file format name of this engine.
const Format = "bz1"
const (
// Format is the file format name of this engine.
Format = "bz1"
)
func init() {
tsdb.RegisterEngine(Format, NewEngine)
@ -44,21 +49,44 @@ type Engine struct {
db *bolt.DB
// Write-ahead log storage.
PointsWriter interface {
WritePoints(points []tsdb.Point) error
}
WAL WAL
// Size of uncompressed points to write to a block.
BlockSize int
}
// WAL represents a write ahead log that can be queried
type WAL interface {
WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error
DeleteSeries(keys []string) error
Cursor(key string) tsdb.Cursor
Open() error
Close() error
}
// NewEngine returns a new instance of Engine.
func NewEngine(path string, opt tsdb.EngineOptions) tsdb.Engine {
return &Engine{
// create the writer with a directory of the same name as the shard, but with the wal extension
w := wal.NewLog(filepath.Join(opt.Config.WALDir, filepath.Base(path)))
w.ReadySeriesSize = opt.Config.WALReadySeriesSize
w.FlushColdInterval = time.Duration(opt.Config.WALFlushColdInterval)
w.MaxSeriesSize = opt.Config.WALMaxSeriesSize
w.CompactionThreshold = opt.Config.WALCompactionThreshold
w.PartitionSizeThreshold = opt.Config.WALPartitionSizeThreshold
e := &Engine{
path: path,
BlockSize: DefaultBlockSize,
WAL: w,
}
w.Index = e
return e
}
// Path returns the path the engine was opened with.
@ -79,8 +107,6 @@ func (e *Engine) Open() error {
// Initialize data file.
if err := e.db.Update(func(tx *bolt.Tx) error {
_, _ = tx.CreateBucketIfNotExists([]byte("series"))
_, _ = tx.CreateBucketIfNotExists([]byte("fields"))
_, _ = tx.CreateBucketIfNotExists([]byte("points"))
// Set file format, if not set yet.
@ -101,6 +127,7 @@ func (e *Engine) Open() error {
e.close()
return err
}
return nil
}
@ -108,7 +135,10 @@ func (e *Engine) Open() error {
func (e *Engine) Close() error {
e.mu.Lock()
defer e.mu.Unlock()
return e.close()
if err := e.close(); err != nil {
return err
}
return e.WAL.Close()
}
func (e *Engine) close() error {
@ -123,16 +153,14 @@ func (e *Engine) SetLogOutput(w io.Writer) {}
// LoadMetadataIndex loads the shard metadata into memory.
func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
return e.db.View(func(tx *bolt.Tx) error {
if err := e.db.View(func(tx *bolt.Tx) error {
// Load measurement metadata
meta := tx.Bucket([]byte("fields"))
c := meta.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
fields, err := e.readFields(tx)
if err != nil {
return err
}
for k, mf := range fields {
m := index.CreateMeasurementIndexIfNotExists(string(k))
mf := &tsdb.MeasurementFields{}
if err := mf.UnmarshalBinary(v); err != nil {
return err
}
for name, _ := range mf.Fields {
m.SetFieldName(name)
}
@ -141,97 +169,59 @@ func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields
}
// Load series metadata
meta = tx.Bucket([]byte("series"))
c = meta.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
series := &tsdb.Series{}
if err := series.UnmarshalBinary(v); err != nil {
return err
}
index.CreateSeriesIndexIfNotExists(tsdb.MeasurementFromSeriesKey(string(k)), series)
}
return nil
})
}
// WritePoints writes metadata and point data into the engine.
// Returns an error if new points are added to an existing key.
func (e *Engine) WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
// Write series & field metadata.
if err := e.db.Update(func(tx *bolt.Tx) error {
if err := e.writeSeries(tx, seriesToCreate); err != nil {
return fmt.Errorf("write series: %s", err)
}
if err := e.writeFields(tx, measurementFieldsToSave); err != nil {
return fmt.Errorf("write fields: %s", err)
series, err := e.readSeries(tx)
if err != nil {
return err
}
// Load the series into the in-memory index in sorted order to ensure
// it's always consistent for testing purposes
a := make([]string, 0, len(series))
for k, _ := range series {
a = append(a, k)
}
sort.Strings(a)
for _, key := range a {
s := series[key]
s.InitializeShards()
index.CreateSeriesIndexIfNotExists(tsdb.MeasurementFromSeriesKey(string(key)), s)
}
return nil
}); err != nil {
return err
}
// now flush the metadata that was in the WAL, but hadn't yet been flushed
if err := e.WAL.LoadMetadataIndex(index, measurementFields); err != nil {
return err
}
// finally open the WAL up
return e.WAL.Open()
}
// WritePoints writes metadata and point data into the engine.
// Returns an error if new points are added to an existing key.
func (e *Engine) WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
// Write points to the WAL.
if err := e.PointsWriter.WritePoints(points); err != nil {
if err := e.WAL.WritePoints(points, measurementFieldsToSave, seriesToCreate); err != nil {
return fmt.Errorf("write points: %s", err)
}
return nil
}
// writeSeries writes a list of series to the metadata.
func (e *Engine) writeSeries(tx *bolt.Tx, a []*tsdb.SeriesCreate) error {
// Ignore if there are no series.
if len(a) == 0 {
return nil
}
// Marshal and insert each series into the metadata.
b := tx.Bucket([]byte("series"))
for _, sc := range a {
// Marshal series into bytes.
data, err := sc.Series.MarshalBinary()
if err != nil {
return fmt.Errorf("marshal series: %s", err)
}
// Insert marshaled data into appropriate key.
if err := b.Put([]byte(sc.Series.Key), data); err != nil {
return fmt.Errorf("put: %s", err)
}
}
return nil
}
// writeFields writes a list of measurement fields to the metadata.
func (e *Engine) writeFields(tx *bolt.Tx, m map[string]*tsdb.MeasurementFields) error {
// Ignore if there are no fields to save.
if len(m) == 0 {
return nil
}
// Persist each measurement field in the map.
b := tx.Bucket([]byte("fields"))
for k, f := range m {
// Marshal field into bytes.
data, err := f.MarshalBinary()
if err != nil {
return fmt.Errorf("marshal measurement field: %s", err)
}
// Insert marshaled data into key.
if err := b.Put([]byte(k), data); err != nil {
return fmt.Errorf("put: %s", err)
}
}
return nil
}
// WriteIndex writes marshaled points to the engine's underlying index.
func (e *Engine) WriteIndex(pointsByKey map[string][][]byte) error {
func (e *Engine) WriteIndex(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
return e.db.Update(func(tx *bolt.Tx) error {
// Write series & field metadata.
if err := e.writeNewSeries(tx, seriesToCreate); err != nil {
return fmt.Errorf("write series: %s", err)
}
if err := e.writeNewFields(tx, measurementFieldsToSave); err != nil {
return fmt.Errorf("write fields: %s", err)
}
for key, values := range pointsByKey {
if err := e.writeIndex(tx, key, values); err != nil {
return fmt.Errorf("write: key=%x, err=%s", key, err)
@ -241,6 +231,103 @@ func (e *Engine) WriteIndex(pointsByKey map[string][][]byte) error {
})
}
func (e *Engine) writeNewFields(tx *bolt.Tx, measurementFieldsToSave map[string]*tsdb.MeasurementFields) error {
if len(measurementFieldsToSave) == 0 {
return nil
}
// read in all the previously saved fields
fields, err := e.readFields(tx)
if err != nil {
return err
}
// add the new ones or overwrite old ones
for name, mf := range measurementFieldsToSave {
fields[name] = mf
}
return e.writeFields(tx, fields)
}
func (e *Engine) writeFields(tx *bolt.Tx, fields map[string]*tsdb.MeasurementFields) error {
// compress and save everything
data, err := json.Marshal(fields)
if err != nil {
return err
}
return tx.Bucket([]byte("meta")).Put([]byte("fields"), snappy.Encode(nil, data))
}
func (e *Engine) readFields(tx *bolt.Tx) (map[string]*tsdb.MeasurementFields, error) {
fields := make(map[string]*tsdb.MeasurementFields)
b := tx.Bucket([]byte("meta")).Get([]byte("fields"))
if b == nil {
return fields, nil
}
data, err := snappy.Decode(nil, b)
if err != nil {
return nil, err
}
if err := json.Unmarshal(data, &fields); err != nil {
return nil, err
}
return fields, nil
}
func (e *Engine) writeNewSeries(tx *bolt.Tx, seriesToCreate []*tsdb.SeriesCreate) error {
if len(seriesToCreate) == 0 {
return nil
}
// read in previously saved series
series, err := e.readSeries(tx)
if err != nil {
return err
}
// add new ones, compress and save
for _, s := range seriesToCreate {
series[s.Series.Key] = s.Series
}
return e.writeSeries(tx, series)
}
func (e *Engine) writeSeries(tx *bolt.Tx, series map[string]*tsdb.Series) error {
data, err := json.Marshal(series)
if err != nil {
return err
}
return tx.Bucket([]byte("meta")).Put([]byte("series"), snappy.Encode(nil, data))
}
func (e *Engine) readSeries(tx *bolt.Tx) (map[string]*tsdb.Series, error) {
series := make(map[string]*tsdb.Series)
b := tx.Bucket([]byte("meta")).Get([]byte("series"))
if b == nil {
return series, nil
}
data, err := snappy.Decode(nil, b)
if err != nil {
return nil, err
}
if err := json.Unmarshal(data, &series); err != nil {
return nil, err
}
return series, nil
}
// writeIndex writes a set of points for a single key.
func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
// Ignore if there are no points.
@ -256,8 +343,13 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
c := bkt.Cursor()
// Ensure the slice is sorted before retrieving the time range.
a = DedupeEntries(a)
sort.Sort(byteSlices(a))
a = tsdb.DedupeEntries(a)
// Convert the raw time and byte slices to entries with lengths
for i, p := range a {
timestamp := int64(btou64(p[0:8]))
a[i] = MarshalEntry(timestamp, p[8:])
}
// Determine time range of new data.
tmin, tmax := int64(btou64(a[0][0:8])), int64(btou64(a[len(a)-1][0:8]))
@ -270,6 +362,7 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
if err := e.writeBlocks(bkt, a); err != nil {
return fmt.Errorf("append blocks: %s", err)
}
return nil
}
// Generate map of inserted keys.
@ -311,7 +404,7 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
// Merge entries before rewriting.
a = append(existing, a...)
sort.Sort(byteSlices(a))
sort.Sort(tsdb.ByteSlices(a))
// Rewrite points to new blocks.
if err := e.writeBlocks(bkt, a); err != nil {
@ -325,9 +418,6 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
func (e *Engine) writeBlocks(bkt *bolt.Bucket, a [][]byte) error {
var block []byte
// Dedupe points by key.
a = DedupeEntries(a)
// Group points into blocks by size.
tmin, tmax := int64(math.MaxInt64), int64(math.MinInt64)
for i, p := range a {
@ -367,36 +457,56 @@ func (e *Engine) writeBlocks(bkt *bolt.Bucket, a [][]byte) error {
// DeleteSeries deletes the series from the engine.
func (e *Engine) DeleteSeries(keys []string) error {
// remove it from the WAL first
if err := e.WAL.DeleteSeries(keys); err != nil {
return err
}
return e.db.Update(func(tx *bolt.Tx) error {
series, err := e.readSeries(tx)
if err != nil {
return err
}
for _, k := range keys {
if err := tx.Bucket([]byte("series")).Delete([]byte(k)); err != nil {
return fmt.Errorf("delete series metadata: %s", err)
}
delete(series, k)
if err := tx.Bucket([]byte("points")).DeleteBucket([]byte(k)); err != nil && err != bolt.ErrBucketNotFound {
return fmt.Errorf("delete series data: %s", err)
}
}
return nil
return e.writeSeries(tx, series)
})
}
// DeleteMeasurement deletes a measurement and all related series.
func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error {
// remove from the WAL first so it won't get flushed after removing from Bolt
if err := e.WAL.DeleteSeries(seriesKeys); err != nil {
return err
}
return e.db.Update(func(tx *bolt.Tx) error {
if err := tx.Bucket([]byte("fields")).Delete([]byte(name)); err != nil {
fields, err := e.readFields(tx)
if err != nil {
return err
}
delete(fields, name)
if err := e.writeFields(tx, fields); err != nil {
return err
}
series, err := e.readSeries(tx)
if err != nil {
return err
}
for _, k := range seriesKeys {
if err := tx.Bucket([]byte("series")).Delete([]byte(k)); err != nil {
return fmt.Errorf("delete series metadata: %s", err)
}
delete(series, k)
if err := tx.Bucket([]byte("points")).DeleteBucket([]byte(k)); err != nil && err != bolt.ErrBucketNotFound {
return fmt.Errorf("delete series data: %s", err)
}
}
return nil
return e.writeSeries(tx, series)
})
}
@ -418,7 +528,7 @@ func (e *Engine) Begin(writable bool) (tsdb.Tx, error) {
if err != nil {
return nil, err
}
return &Tx{Tx: tx, engine: e}, nil
return &Tx{Tx: tx, engine: e, wal: e.WAL}, nil
}
// Stats returns internal statistics for the engine.
@ -439,19 +549,25 @@ type Stats struct {
type Tx struct {
*bolt.Tx
engine *Engine
wal WAL
}
// Cursor returns an iterator for a key.
func (tx *Tx) Cursor(key string) tsdb.Cursor {
walCursor := tx.wal.Cursor(key)
// Retrieve points bucket. Ignore if there is no bucket.
b := tx.Bucket([]byte("points")).Bucket([]byte(key))
if b == nil {
return nil
return walCursor
}
return &Cursor{
c := &Cursor{
cursor: b.Cursor(),
buf: make([]byte, DefaultBlockSize),
}
return tsdb.MultiCursor(walCursor, c)
}
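A hedged sketch of what the merged cursor means for reads (series key illustrative): unflushed WAL points and on-disk index blocks now come back through one ordered iterator.
```
cur := tx.Cursor("cpu,host=server01") // no longer nil for unknown keys; at minimum the WAL cursor
for k, v := cur.Seek(u64tob(0)); k != nil; k, v = cur.Next() {
	// k is the 8-byte big-endian timestamp; v is the encoded field data,
	// whether it came from the WAL cache or a decompressed index block
	_ = v
}
```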
// Cursor provides ordered iteration across a series.
@ -584,26 +700,6 @@ func SplitEntries(b []byte) [][]byte {
}
}
// DedupeEntries returns slices with unique keys (the first 8 bytes).
func DedupeEntries(a [][]byte) [][]byte {
// Convert to a map where the last slice is used.
m := make(map[string][]byte)
for _, b := range a {
m[string(b[0:8])] = b
}
// Convert map back to a slice of byte slices.
other := make([][]byte, 0, len(m))
for _, v := range m {
other = append(other, v)
}
// Sort entries.
sort.Sort(byteSlices(other))
return other
}
// entryHeaderSize is the number of bytes required for the header.
const entryHeaderSize = 8 + 4
@ -619,9 +715,3 @@ func u64tob(v uint64) []byte {
// btou64 converts an 8-byte slice into an uint64.
func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) }
type byteSlices [][]byte
func (a byteSlices) Len() int { return len(a) }
func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }


@ -23,15 +23,16 @@ func TestEngine_LoadMetadataIndex_Series(t *testing.T) {
e := OpenDefaultEngine()
defer e.Close()
// Setup nop mock.
e.PointsWriter.WritePointsFn = func(a []tsdb.Point) error { return nil }
// Setup mock that writes the index
seriesToCreate := []*tsdb.SeriesCreate{
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server0"})), map[string]string{"host": "server0"})},
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server1"})), map[string]string{"host": "server1"})},
{Series: tsdb.NewSeries("series with spaces", nil)},
}
e.PointsWriter.WritePointsFn = func(a []tsdb.Point) error { return e.WriteIndex(nil, nil, seriesToCreate) }
// Write series metadata.
if err := e.WritePoints(nil, nil, []*tsdb.SeriesCreate{
{Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server0"})), Tags: map[string]string{"host": "server0"}}},
{Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server1"})), Tags: map[string]string{"host": "server1"}}},
{Series: &tsdb.Series{Key: "series with spaces"}},
}); err != nil {
if err := e.WritePoints(nil, nil, seriesToCreate); err != nil {
t.Fatal(err)
}
@ -62,17 +63,18 @@ func TestEngine_LoadMetadataIndex_Fields(t *testing.T) {
e := OpenDefaultEngine()
defer e.Close()
// Setup nop mock.
e.PointsWriter.WritePointsFn = func(a []tsdb.Point) error { return nil }
// Write series metadata.
if err := e.WritePoints(nil, map[string]*tsdb.MeasurementFields{
// Setup mock that writes the index
fields := map[string]*tsdb.MeasurementFields{
"cpu": &tsdb.MeasurementFields{
Fields: map[string]*tsdb.Field{
"value": &tsdb.Field{ID: 0, Name: "value"},
},
},
}, nil); err != nil {
}
e.PointsWriter.WritePointsFn = func(a []tsdb.Point) error { return e.WriteIndex(nil, fields, nil) }
// Write series metadata.
if err := e.WritePoints(nil, fields, nil); err != nil {
t.Fatal(err)
}
@ -144,13 +146,13 @@ func TestEngine_WriteIndex_Append(t *testing.T) {
// Append points to index.
if err := e.WriteIndex(map[string][][]byte{
"cpu": [][]byte{
bz1.MarshalEntry(1, []byte{0x10}),
bz1.MarshalEntry(2, []byte{0x20}),
append(u64tob(1), 0x10),
append(u64tob(2), 0x20),
},
"mem": [][]byte{
bz1.MarshalEntry(0, []byte{0x30}),
append(u64tob(0), 0x30),
},
}); err != nil {
}, nil, nil); err != nil {
t.Fatal(err)
}
@ -185,32 +187,32 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
// Write initial points to index.
if err := e.WriteIndex(map[string][][]byte{
"cpu": [][]byte{
bz1.MarshalEntry(10, []byte{0x10}),
bz1.MarshalEntry(20, []byte{0x20}),
bz1.MarshalEntry(30, []byte{0x30}),
append(u64tob(10), 0x10),
append(u64tob(20), 0x20),
append(u64tob(30), 0x30),
},
}); err != nil {
}, nil, nil); err != nil {
t.Fatal(err)
}
// Write overlapping points to index.
if err := e.WriteIndex(map[string][][]byte{
"cpu": [][]byte{
bz1.MarshalEntry(9, []byte{0x09}),
bz1.MarshalEntry(10, []byte{0xFF}),
bz1.MarshalEntry(25, []byte{0x25}),
bz1.MarshalEntry(31, []byte{0x31}),
append(u64tob(9), 0x09),
append(u64tob(10), 0xFF),
append(u64tob(25), 0x25),
append(u64tob(31), 0x31),
},
}); err != nil {
}, nil, nil); err != nil {
t.Fatal(err)
}
// Write overlapping points to index again.
if err := e.WriteIndex(map[string][][]byte{
"cpu": [][]byte{
bz1.MarshalEntry(31, []byte{0xFF}),
append(u64tob(31), 0xFF),
},
}); err != nil {
}, nil, nil); err != nil {
t.Fatal(err)
}
@ -239,7 +241,7 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
func TestEngine_WriteIndex_NoKeys(t *testing.T) {
e := OpenDefaultEngine()
defer e.Close()
if err := e.WriteIndex(nil); err != nil {
if err := e.WriteIndex(nil, nil, nil); err != nil {
t.Fatal(err)
}
}
@ -248,7 +250,7 @@ func TestEngine_WriteIndex_NoKeys(t *testing.T) {
func TestEngine_WriteIndex_NoPoints(t *testing.T) {
e := OpenDefaultEngine()
defer e.Close()
if err := e.WriteIndex(map[string][][]byte{"cpu": nil}); err != nil {
if err := e.WriteIndex(map[string][][]byte{"cpu": nil}, nil, nil); err != nil {
t.Fatal(err)
}
}
@ -266,7 +268,7 @@ func TestEngine_WriteIndex_Quick(t *testing.T) {
// Write points to index in multiple sets.
for _, set := range sets {
if err := e.WriteIndex(map[string][][]byte(set)); err != nil {
if err := e.WriteIndex(map[string][][]byte(set), nil, nil); err != nil {
t.Fatal(err)
}
}
@ -291,15 +293,8 @@ func TestEngine_WriteIndex_Quick(t *testing.T) {
got = append(got, append(copyBytes(k), v...))
}
// Generate expected values.
// We need to remove the data length from the slice.
var exp [][]byte
for _, b := range points[key] {
exp = append(exp, append(copyBytes(b[0:8]), b[12:]...)) // remove data len
}
if !reflect.DeepEqual(got, exp) {
t.Fatalf("points: block size=%d, key=%s:\n\ngot=%x\n\nexp=%x\n\n", e.BlockSize, key, got, exp)
if !reflect.DeepEqual(got, points[key]) {
t.Fatalf("points: block size=%d, key=%s:\n\ngot=%x\n\nexp=%x\n\n", e.BlockSize, key, got, points[key])
}
}
@ -324,7 +319,7 @@ func NewEngine(opt tsdb.EngineOptions) *Engine {
e := &Engine{
Engine: bz1.NewEngine(f.Name(), opt).(*bz1.Engine),
}
e.Engine.PointsWriter = &e.PointsWriter
e.Engine.WAL = &e.PointsWriter
return e
}
@ -361,10 +356,30 @@ type EnginePointsWriter struct {
WritePointsFn func(points []tsdb.Point) error
}
func (w *EnginePointsWriter) WritePoints(points []tsdb.Point) error {
func (w *EnginePointsWriter) WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
return w.WritePointsFn(points)
}
func (w *EnginePointsWriter) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
return nil
}
func (w *EnginePointsWriter) DeleteSeries(keys []string) error { return nil }
func (w *EnginePointsWriter) Open() error { return nil }
func (w *EnginePointsWriter) Close() error { return nil }
func (w *EnginePointsWriter) Cursor(key string) tsdb.Cursor { return &Cursor{} }
// Cursor represents a mock that implements tsdb.Cursor.
type Cursor struct {
}
func (c *Cursor) Seek(key []byte) ([]byte, []byte) { return nil, nil }
func (c *Cursor) Next() ([]byte, []byte) { return nil, nil }
// Points represents a set of encoded points by key. Implements quick.Generator.
type Points map[string][][]byte
@ -411,7 +426,7 @@ func MergePoints(a []Points) Points {
// Dedupe points.
for key, values := range m {
m[key] = bz1.DedupeEntries(values)
m[key] = tsdb.DedupeEntries(values)
}
return m

File diff suppressed because it is too large


@ -0,0 +1,906 @@
package wal
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"reflect"
"testing"
"time"
// "runtime"
// "sync"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/tsdb"
)
func TestWAL_WritePoints(t *testing.T) {
log := openTestWAL()
defer log.Close()
defer os.RemoveAll(log.path)
if err := log.Open(); err != nil {
t.Fatalf("couldn't open wal: %s", err.Error())
}
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
Name: "value",
Type: influxql.Float,
},
})
// test that we can write to two different series
p1 := parsePoint("cpu,host=A value=23.2 1", codec)
p2 := parsePoint("cpu,host=A value=25.3 4", codec)
p3 := parsePoint("cpu,host=B value=1.0 1", codec)
if err := log.WritePoints([]tsdb.Point{p1, p2, p3}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
verify := func() {
c := log.Cursor("cpu,host=A")
k, v := c.Seek(inttob(1))
// ensure the series are there and points are in order
if bytes.Compare(v, p1.Data()) != 0 {
t.Fatalf("expected to seek to first point but got key and value: %v %v", k, v)
}
k, v = c.Next()
if bytes.Compare(v, p2.Data()) != 0 {
t.Fatalf("expected to seek to first point but got key and value: %v %v", k, v)
}
k, v = c.Next()
if k != nil {
t.Fatalf("expected nil on last seek: %v %v", k, v)
}
c = log.Cursor("cpu,host=B")
k, v = c.Next()
if bytes.Compare(v, p3.Data()) != 0 {
t.Fatalf("expected to seek to first point but got key and value: %v %v", k, v)
}
}
verify()
// ensure that we can close and re-open the log with points still there
log.Close()
log.Open()
verify()
// ensure we can write new points into the series
p4 := parsePoint("cpu,host=A value=1.0 7", codec)
// ensure we can write an all new series
p5 := parsePoint("cpu,host=C value=1.4 2", codec)
// ensure we can write a point out of order and get it back
p6 := parsePoint("cpu,host=A value=1.3 2", codec)
// // ensure we can write to a new partition
// p7 := parsePoint("cpu,region=west value=2.2", codec)
if err := log.WritePoints([]tsdb.Point{p4, p5, p6}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
verify2 := func() {
c := log.Cursor("cpu,host=A")
k, v := c.Next()
if bytes.Compare(v, p1.Data()) != 0 {
t.Fatalf("order wrong, expected p1, %v %v %v", v, k, p1.Data())
}
_, v = c.Next()
if bytes.Compare(v, p6.Data()) != 0 {
t.Fatal("order wrong, expected p6")
}
_, v = c.Next()
if bytes.Compare(v, p2.Data()) != 0 {
t.Fatal("order wrong, expected p6")
}
_, v = c.Next()
if bytes.Compare(v, p4.Data()) != 0 {
t.Fatal("order wrong, expected p6")
}
c = log.Cursor("cpu,host=C")
_, v = c.Next()
if bytes.Compare(v, p5.Data()) != 0 {
t.Fatal("order wrong, expected p6")
}
}
verify2()
log.Close()
log.Open()
verify2()
}
func TestWAL_CorruptDataLengthSize(t *testing.T) {
log := openTestWAL()
defer log.Close()
defer os.RemoveAll(log.path)
if err := log.Open(); err != nil {
t.Fatalf("couldn't open wal: %s", err.Error())
}
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
Name: "value",
Type: influxql.Float,
},
})
// test that we can write to two different series
p1 := parsePoint("cpu,host=A value=23.2 1", codec)
p2 := parsePoint("cpu,host=A value=25.3 4", codec)
if err := log.WritePoints([]tsdb.Point{p1, p2}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
verify := func() {
c := log.Cursor("cpu,host=A")
_, v := c.Next()
if bytes.Compare(v, p1.Data()) != 0 {
t.Fatal("p1 value wrong")
}
_, v = c.Next()
if bytes.Compare(v, p2.Data()) != 0 {
t.Fatal("p2 value wrong")
}
_, v = c.Next()
if v != nil {
t.Fatal("expected cursor to return nil")
}
}
verify()
// now write junk data and ensure that we can close, re-open and read
f := log.partitions[1].currentSegmentFile
f.Write([]byte{0x23, 0x12})
f.Sync()
log.Close()
log.Open()
verify()
// now write new data and ensure it's all good
p3 := parsePoint("cpu,host=A value=29.2 6", codec)
if err := log.WritePoints([]tsdb.Point{p3}, nil, nil); err != nil {
t.Fatalf("failed to write point: %s", err.Error())
}
verify = func() {
c := log.Cursor("cpu,host=A")
_, v := c.Next()
if bytes.Compare(v, p1.Data()) != 0 {
t.Fatal("p1 value wrong")
}
_, v = c.Next()
if bytes.Compare(v, p2.Data()) != 0 {
t.Fatal("p2 value wrong")
}
_, v = c.Next()
if bytes.Compare(v, p3.Data()) != 0 {
t.Fatal("p3 value wrong")
}
}
verify()
log.Close()
log.Open()
verify()
}
func TestWAL_CorruptDataBlock(t *testing.T) {
log := openTestWAL()
defer log.Close()
defer os.RemoveAll(log.path)
if err := log.Open(); err != nil {
t.Fatalf("couldn't open wal: %s", err.Error())
}
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
Name: "value",
Type: influxql.Float,
},
})
// test that we can write to two different series
p1 := parsePoint("cpu,host=A value=23.2 1", codec)
p2 := parsePoint("cpu,host=A value=25.3 4", codec)
if err := log.WritePoints([]tsdb.Point{p1, p2}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
verify := func() {
c := log.Cursor("cpu,host=A")
_, v := c.Next()
if bytes.Compare(v, p1.Data()) != 0 {
t.Fatal("p1 value wrong")
}
_, v = c.Next()
if bytes.Compare(v, p2.Data()) != 0 {
t.Fatal("p2 value wrong")
}
_, v = c.Next()
if v != nil {
t.Fatal("expected cursor to return nil")
}
}
verify()
// now write junk data and ensure that we can close, re-open and read
f := log.partitions[1].currentSegmentFile
f.Write(u64tob(23))
// now write a bunch of garbage
for i := 0; i < 1000; i++ {
f.Write([]byte{0x23, 0x78, 0x11, 0x33})
}
f.Sync()
log.Close()
log.Open()
verify()
// now write new data and ensure it's all good
p3 := parsePoint("cpu,host=A value=29.2 6", codec)
if err := log.WritePoints([]tsdb.Point{p3}, nil, nil); err != nil {
t.Fatalf("failed to write point: %s", err.Error())
}
verify = func() {
c := log.Cursor("cpu,host=A")
_, v := c.Next()
if bytes.Compare(v, p1.Data()) != 0 {
t.Fatal("p1 value wrong")
}
_, v = c.Next()
if bytes.Compare(v, p2.Data()) != 0 {
t.Fatal("p2 value wrong")
}
_, v = c.Next()
if bytes.Compare(v, p3.Data()) != 0 {
t.Fatal("p3 value wrong", p3.Data(), v)
}
}
verify()
log.Close()
log.Open()
verify()
}
// Ensure the wal flushes and compacts after a partition has enough series in
// it with enough data to flush
func TestWAL_CompactAfterPercentageThreshold(t *testing.T) {
log := openTestWAL()
log.partitionCount = 2
log.CompactionThreshold = 0.7
log.ReadySeriesSize = 1024
// set this high so that a flush doesn't automatically kick in and mess up our test
log.flushCheckInterval = time.Minute
defer log.Close()
defer os.RemoveAll(log.path)
points := make([]map[string][][]byte, 0)
log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
points = append(points, pointsByKey)
return nil
}}
if err := log.Open(); err != nil {
t.Fatalf("couldn't open wal: %s", err.Error())
}
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
Name: "value",
Type: influxql.Float,
},
})
numSeries := 100
b := make([]byte, 70*5000)
for i := 1; i <= 100; i++ {
buf := bytes.NewBuffer(b)
for j := 1; j <= numSeries; j++ {
buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", j, rand.Float64(), i))
}
// ensure that before we go over the threshold it isn't marked for flushing
if i < 50 {
// interleave data for some series that won't be ready to flush
buf.WriteString(fmt.Sprintf("cpu,host=A,region=useast1 value=%.3f %d\n", rand.Float64(), i))
buf.WriteString(fmt.Sprintf("cpu,host=A,region=useast3 value=%.3f %d\n", rand.Float64(), i))
// ensure that as a whole it's not ready for flushing yet
if log.partitions[1].shouldFlush(tsdb.DefaultMaxSeriesSize, tsdb.DefaultCompactionThreshold) != noFlush {
t.Fatal("expected partition 1 to return false from shouldFlush")
}
}
// write the batch out
if err := log.WritePoints(parsePoints(buf.String(), codec), nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
buf = bytes.NewBuffer(b)
}
// ensure we have some data
c := log.Cursor("cpu,host=A,region=uswest23")
k, v := c.Next()
if btou64(k) != 1 {
t.Fatalf("expected timestamp of 1, but got %v %v", k, v)
}
// ensure it is marked as should flush because of the threshold
if log.partitions[1].shouldFlush(tsdb.DefaultMaxSeriesSize, tsdb.DefaultCompactionThreshold) != thresholdFlush {
t.Fatal("expected partition 1 to return true from shouldFlush")
}
if err := log.partitions[1].flushAndCompact(thresholdFlush); err != nil {
t.Fatalf("error flushing and compacting: %s", err.Error())
}
// should be nil
c = log.Cursor("cpu,host=A,region=uswest23")
k, v = c.Next()
if k != nil || v != nil {
t.Fatal("expected cache to be nil after flush: ", k, v)
}
c = log.Cursor("cpu,host=A,region=useast1")
k, v = c.Next()
if btou64(k) != 1 {
t.Fatal("expected cache to be there after flush and compact: ", k, v)
}
if len(points) == 0 {
t.Fatal("expected points to be flushed to index")
}
// now close and re-open the wal and ensure the compacted data is gone and other data is still there
log.Close()
log.Open()
c = log.Cursor("cpu,host=A,region=uswest23")
k, v = c.Next()
if k != nil || v != nil {
t.Fatal("expected cache to be nil after flush and re-open: ", k, v)
}
c = log.Cursor("cpu,host=A,region=useast1")
k, v = c.Next()
if btou64(k) != 1 {
t.Fatal("expected cache to be there after flush and compact: ", k, v)
}
}
// Ensure the wal forces a full flush after not having a write in a given interval of time
func TestWAL_CompactAfterTimeWithoutWrite(t *testing.T) {
log := openTestWAL()
log.partitionCount = 1
// set this low
log.flushCheckInterval = 10 * time.Millisecond
log.FlushColdInterval = 500 * time.Millisecond
defer log.Close()
defer os.RemoveAll(log.path)
points := make([]map[string][][]byte, 0)
log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
points = append(points, pointsByKey)
return nil
}}
if err := log.Open(); err != nil {
t.Fatalf("couldn't open wal: %s", err.Error())
}
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
Name: "value",
Type: influxql.Float,
},
})
numSeries := 100
b := make([]byte, 70*5000)
for i := 1; i <= 10; i++ {
buf := bytes.NewBuffer(b)
for j := 1; j <= numSeries; j++ {
buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", j, rand.Float64(), i))
}
// write the batch out
if err := log.WritePoints(parsePoints(buf.String(), codec), nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
buf = bytes.NewBuffer(b)
}
// ensure we have some data
c := log.Cursor("cpu,host=A,region=uswest10")
k, _ := c.Next()
if btou64(k) != 1 {
t.Fatalf("expected first data point but got one with key: %v", k)
}
time.Sleep(700 * time.Millisecond)
// ensure that as a whole it's not ready for flushing yet
if f := log.partitions[1].shouldFlush(tsdb.DefaultMaxSeriesSize, tsdb.DefaultCompactionThreshold); f != noFlush {
t.Fatalf("expected partition 1 to return noFlush from shouldFlush %v", f)
}
// ensure that the partition is empty
if log.partitions[1].memorySize != 0 || len(log.partitions[1].cache) != 0 {
t.Fatal("expected partition to be empty")
}
// ensure that we didn't bother to open a new segment file
if log.partitions[1].currentSegmentFile != nil {
t.Fatal("expected partition to not have an open segment file")
}
}
func TestWAL_SeriesAndFieldsGetPersisted(t *testing.T) {
log := openTestWAL()
defer log.Close()
defer os.RemoveAll(log.path)
if err := log.Open(); err != nil {
t.Fatalf("couldn't open wal: %s", err.Error())
}
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
Name: "value",
Type: influxql.Float,
},
})
var measurementsToIndex map[string]*tsdb.MeasurementFields
var seriesToIndex []*tsdb.SeriesCreate
log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
measurementsToIndex = measurementFieldsToSave
seriesToIndex = append(seriesToIndex, seriesToCreate...)
return nil
}}
// test that we can write to two different series
p1 := parsePoint("cpu,host=A value=23.2 1", codec)
p2 := parsePoint("cpu,host=A value=25.3 4", codec)
p3 := parsePoint("cpu,host=B value=1.0 1", codec)
seriesToCreate := []*tsdb.SeriesCreate{
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "A"})), map[string]string{"host": "A"})},
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "B"})), map[string]string{"host": "B"})},
}
measaurementsToCreate := map[string]*tsdb.MeasurementFields{
"cpu": {
Fields: map[string]*tsdb.Field{
"value": {ID: 1, Name: "value"},
},
},
}
if err := log.WritePoints([]tsdb.Point{p1, p2, p3}, measaurementsToCreate, seriesToCreate); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
// now close it and see if loading the metadata index will populate the measurement and series info
log.Close()
idx := tsdb.NewDatabaseIndex()
mf := make(map[string]*tsdb.MeasurementFields)
if err := log.LoadMetadataIndex(idx, mf); err != nil {
t.Fatalf("error loading metadata index: %s", err.Error())
}
s := idx.Series("cpu,host=A")
if s == nil {
t.Fatal("expected to find series cpu,host=A in index")
}
s = idx.Series("cpu,host=B")
if s == nil {
t.Fatal("expected to find series cpu,host=B in index")
}
m := mf["cpu"]
if m == nil {
t.Fatal("expected to find measurement fields for cpu", mf)
}
if m.Fields["value"] == nil {
t.Fatal("expected to find field definition for 'value'")
}
// ensure that they were actually flushed to the index. do it this way because the annoying deepequal doesn't really work for these
for i, s := range seriesToCreate {
if seriesToIndex[i].Measurement != s.Measurement {
t.Fatal("expected measurement to be the same")
}
if seriesToIndex[i].Series.Key != s.Series.Key {
t.Fatal("expected series key to be the same")
}
if !reflect.DeepEqual(seriesToIndex[i].Series.Tags, s.Series.Tags) {
t.Fatal("expected series tags to be the same")
}
}
// ensure that the measurement fields were flushed to the index
for k, v := range measaurementsToCreate {
m := measurementsToIndex[k]
if m == nil {
t.Fatalf("measurement %s wasn't indexed", k)
}
if !reflect.DeepEqual(m.Fields, v.Fields) {
t.Fatal("measurement fields not equal")
}
}
// now open and close the log and try to reload the metadata index, which should now be empty
if err := log.Open(); err != nil {
t.Fatalf("error opening log: %s", err.Error())
}
if err := log.Close(); err != nil {
t.Fatalf("error closing log: %s", err.Error())
}
idx = tsdb.NewDatabaseIndex()
mf = make(map[string]*tsdb.MeasurementFields)
if err := log.LoadMetadataIndex(idx, mf); err != nil {
t.Fatalf("error loading metadata index: %s", err.Error())
}
if len(idx.Measurements()) != 0 || len(mf) != 0 {
t.Fatal("expected index and measurement fields to be empty")
}
}
func TestWAL_DeleteSeries(t *testing.T) {
log := openTestWAL()
defer log.Close()
defer os.RemoveAll(log.path)
if err := log.Open(); err != nil {
t.Fatalf("couldn't open wal: %s", err.Error())
}
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
Name: "value",
Type: influxql.Float,
},
})
var seriesToIndex []*tsdb.SeriesCreate
log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
seriesToIndex = append(seriesToIndex, seriesToCreate...)
return nil
}}
seriesToCreate := []*tsdb.SeriesCreate{
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "A"})), map[string]string{"host": "A"})},
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "B"})), map[string]string{"host": "B"})},
}
// test that we can write to two different series
p1 := parsePoint("cpu,host=A value=23.2 1", codec)
p2 := parsePoint("cpu,host=B value=0.9 2", codec)
p3 := parsePoint("cpu,host=A value=25.3 4", codec)
p4 := parsePoint("cpu,host=B value=1.0 3", codec)
if err := log.WritePoints([]tsdb.Point{p1, p2, p3, p4}, nil, seriesToCreate); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
// ensure data is there
c := log.Cursor("cpu,host=A")
if k, _ := c.Next(); btou64(k) != 1 {
t.Fatal("expected data point for cpu,host=A")
}
c = log.Cursor("cpu,host=B")
if k, _ := c.Next(); btou64(k) != 2 {
t.Fatal("expected data point for cpu,host=B")
}
// delete the series and ensure metadata was flushed and data is gone
if err := log.DeleteSeries([]string{"cpu,host=B"}); err != nil {
t.Fatalf("error deleting series: %s", err.Error())
}
// ensure data is there
c = log.Cursor("cpu,host=A")
if k, _ := c.Next(); btou64(k) != 1 {
t.Fatal("expected data point for cpu,host=A")
}
// ensure series is deleted
c = log.Cursor("cpu,host=B")
if k, _ := c.Next(); k != nil {
t.Fatal("expected no data for cpu,host=B")
}
// ensure that they were actually flushed to the index. do it this way because the annoying deepequal doesn't really work for these
for i, s := range seriesToCreate {
if seriesToIndex[i].Measurement != s.Measurement {
t.Fatal("expected measurement to be the same")
}
if seriesToIndex[i].Series.Key != s.Series.Key {
t.Fatal("expected series key to be the same")
}
if !reflect.DeepEqual(seriesToIndex[i].Series.Tags, s.Series.Tags) {
t.Fatal("expected series tags to be the same")
}
}
// close and re-open the WAL to ensure that the data didn't show back up
if err := log.Close(); err != nil {
t.Fatalf("error closing log: %s", err.Error())
}
if err := log.Open(); err != nil {
t.Fatalf("error opening log: %s", err.Error())
}
// ensure data is there
c = log.Cursor("cpu,host=A")
if k, _ := c.Next(); btou64(k) != 1 {
t.Fatal("expected data point for cpu,host=A")
}
// ensure series is deleted
c = log.Cursor("cpu,host=B")
if k, _ := c.Next(); k != nil {
t.Fatal("expected no data for cpu,host=B")
}
}
// Ensure a partial compaction can be recovered from.
func TestWAL_Compact_Recovery(t *testing.T) {
log := openTestWAL()
log.partitionCount = 1
log.CompactionThreshold = 0.7
log.ReadySeriesSize = 1024
log.flushCheckInterval = time.Minute
defer log.Close()
defer os.RemoveAll(log.path)
points := make([]map[string][][]byte, 0)
log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
points = append(points, pointsByKey)
return nil
}}
if err := log.Open(); err != nil {
t.Fatalf("couldn't open wal: %s", err.Error())
}
// Retrieve partition.
p := log.partitions[1]
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
Name: "value",
Type: influxql.Float,
},
})
b := make([]byte, 70*5000)
for i := 1; i <= 100; i++ {
buf := bytes.NewBuffer(b)
for j := 1; j <= 1000; j++ {
buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", j, rand.Float64(), i))
}
buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", rand.Int(), rand.Float64(), i))
// Write the batch out.
if err := log.WritePoints(parsePoints(buf.String(), codec), nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
}
// Mock second open call to fail.
p.os.OpenSegmentFile = func(name string, flag int, perm os.FileMode) (file *os.File, err error) {
if filepath.Base(name) == "01.000001.wal" {
return os.OpenFile(name, flag, perm)
}
return nil, errors.New("marker")
}
if err := p.flushAndCompact(thresholdFlush); err == nil || err.Error() != "marker" {
t.Fatalf("unexpected flush error: %s", err)
}
p.os.OpenSegmentFile = os.OpenFile
// Append second file to simulate partial write.
func() {
f, err := os.OpenFile(p.compactionFileName(), os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
t.Fatal(err)
}
defer f.Close()
// Append filename and partial data.
if err := p.writeCompactionEntry(f, "01.000002.wal", []*entry{{key: []byte("foo"), data: []byte("bar"), timestamp: 100}}); err != nil {
t.Fatal(err)
}
// Truncate by a few bytes.
if fi, err := f.Stat(); err != nil {
t.Fatal(err)
} else if err = f.Truncate(fi.Size() - 2); err != nil {
t.Fatal(err)
}
}()
// Now close and re-open the wal and ensure there are no errors.
log.Close()
if err := log.Open(); err != nil {
t.Fatalf("unexpected open error: %s", err)
}
}
// test that partitions get compacted and flushed when number of series hits compaction threshold
// test that partitions get compacted and flushed when a single series hits the compaction threshold
// test that writes slow down when the partition size threshold is hit
// func TestWAL_MultipleSegments(t *testing.T) {
// runtime.GOMAXPROCS(8)
// log := openTestWAL()
// defer log.Close()
// defer os.RemoveAll(log.path)
// log.PartitionSizeThreshold = 1024 * 1024 * 100
// flushCount := 0
// log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte) error {
// flushCount += 1
// fmt.Println("FLUSH: ", len(pointsByKey))
// return nil
// }}
// if err := log.Open(); err != nil {
// t.Fatalf("couldn't open wal: ", err.Error())
// }
// codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
// "value": {
// ID: uint8(1),
// Name: "value",
// Type: influxql.Float,
// },
// })
// startTime := time.Now()
// numSeries := 5000
// perPost := 5000
// b := make([]byte, 70*5000)
// totalPoints := 0
// for i := 1; i <= 10000; i++ {
// fmt.Println("WRITING: ", i*numSeries)
// n := 0
// buf := bytes.NewBuffer(b)
// var wg sync.WaitGroup
// for j := 1; j <= numSeries; j++ {
// totalPoints += 1
// n += 1
// buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", j, rand.Float64(), i))
// if n >= perPost {
// go func(b string) {
// wg.Add(1)
// if err := log.WritePoints(parsePoints(b, codec)); err != nil {
// t.Fatalf("failed to write points: %s", err.Error())
// }
// wg.Done()
// }(buf.String())
// buf = bytes.NewBuffer(b)
// n = 0
// }
// }
// wg.Wait()
// }
// fmt.Println("PATH: ", log.path)
// dur := time.Now().Sub(startTime)
// fmt.Println("TIME TO WRITE: ", totalPoints, dur, float64(totalPoints)/dur.Seconds())
// fmt.Println("FLUSH COUNT: ", flushCount)
// for _, p := range log.partitions {
// fmt.Println("SIZE: ", p.memorySize/1024/1024)
// }
// max := 0
// for _, p := range log.partitions {
// for k, s := range p.cacheSizes {
// if s > max {
// fmt.Println(k, s)
// max = s
// }
// }
// }
// fmt.Println("CLOSING")
// log.Close()
// fmt.Println("TEST OPENING")
// startTime = time.Now()
// log.Open()
// fmt.Println("TIME TO OPEN: ", time.Now().Sub(startTime))
// for _, p := range log.partitions {
// fmt.Println("SIZE: ", p.memorySize)
// }
// c := log.Cursor("cpu,host=A,region=uswest10")
// k, v := c.Seek(inttob(23))
// fmt.Println("VALS: ", k, v)
// time.Sleep(time.Minute)
// }
type testIndexWriter struct {
fn func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
}
func (t *testIndexWriter) WriteIndex(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
return t.fn(pointsByKey, measurementFieldsToSave, seriesToCreate)
}
func openTestWAL() *Log {
dir, err := ioutil.TempDir("", "wal-test")
if err != nil {
panic("couldn't get temp dir")
}
return NewLog(dir)
}
func parsePoints(buf string, codec *tsdb.FieldCodec) []tsdb.Point {
points, err := tsdb.ParsePointsString(buf)
if err != nil {
panic(fmt.Sprintf("couldn't parse points: %s", err.Error()))
}
for _, p := range points {
b, err := codec.EncodeFields(p.Fields())
if err != nil {
panic(fmt.Sprintf("couldn't encode fields: %s", err.Error()))
}
p.SetData(b)
}
return points
}
func parsePoint(buf string, codec *tsdb.FieldCodec) tsdb.Point {
return parsePoints(buf, codec)[0]
}
func inttob(v int) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(v))
return b
}


@ -5,6 +5,7 @@ import (
"io/ioutil"
"math"
"os"
"path/filepath"
"testing"
"time"
@ -947,6 +948,8 @@ func testStore() *tsdb.Store {
path, _ := ioutil.TempDir("", "")
store := tsdb.NewStore(path)
store.EngineOptions.Config.WALDir = filepath.Join(path, "wal")
err := store.Open()
if err != nil {
panic(err)


@ -6,6 +6,7 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"strings"
"testing"
@ -426,7 +427,7 @@ func TestShardMapper_WriteAndSingleMapperAggregateQuery(t *testing.T) {
}
}
func TestShardMapper_LocalMapperTagSets(t *testing.T) {
func TestShardMapper_LocalMapperTagSetsFields(t *testing.T) {
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
shard := mustCreateShard(tmpDir)
@ -451,50 +452,64 @@ func TestShardMapper_LocalMapperTagSets(t *testing.T) {
}
var tests = []struct {
stmt string
expected []string
stmt string
expectedFields []string
expectedTags []string
}{
{
stmt: `SELECT sum(value) FROM cpu`,
expected: []string{"cpu"},
stmt: `SELECT sum(value) FROM cpu`,
expectedFields: []string{"value"},
expectedTags: []string{"cpu"},
},
{
stmt: `SELECT sum(value) FROM cpu GROUP BY host`,
expected: []string{"cpu|host|serverA", "cpu|host|serverB"},
stmt: `SELECT sum(value) FROM cpu GROUP BY host`,
expectedFields: []string{"value"},
expectedTags: []string{"cpu|host|serverA", "cpu|host|serverB"},
},
{
stmt: `SELECT sum(value) FROM cpu GROUP BY region`,
expected: []string{"cpu|region|us-east"},
stmt: `SELECT sum(value) FROM cpu GROUP BY region`,
expectedFields: []string{"value"},
expectedTags: []string{"cpu|region|us-east"},
},
{
stmt: `SELECT sum(value) FROM cpu WHERE host='serverA'`,
expected: []string{"cpu"},
stmt: `SELECT sum(value) FROM cpu WHERE host='serverA'`,
expectedFields: []string{"value"},
expectedTags: []string{"cpu"},
},
{
stmt: `SELECT sum(value) FROM cpu WHERE host='serverB'`,
expected: []string{"cpu"},
stmt: `SELECT sum(value) FROM cpu WHERE host='serverB'`,
expectedFields: []string{"value"},
expectedTags: []string{"cpu"},
},
{
stmt: `SELECT sum(value) FROM cpu WHERE host='serverC'`,
expected: []string{},
stmt: `SELECT sum(value) FROM cpu WHERE host='serverC'`,
expectedFields: []string{"value"},
expectedTags: []string{},
},
}
for _, tt := range tests {
stmt := mustParseSelectStatement(tt.stmt)
mapper := openLocalMapperOrFail(t, shard, stmt)
got := mapper.TagSets()
if !reflect.DeepEqual(got, tt.expected) {
t.Errorf("test '%s'\n\tgot %s\n\texpected %s", tt.stmt, got, tt.expected)
fields := mapper.Fields()
if !reflect.DeepEqual(fields, tt.expectedFields) {
t.Errorf("test '%s'\n\tgot %s\n\texpected %s", tt.stmt, fields, tt.expectedFields)
}
tags := mapper.TagSets()
if !reflect.DeepEqual(tags, tt.expectedTags) {
t.Errorf("test '%s'\n\tgot %s\n\texpected %s", tt.stmt, tags, tt.expectedTags)
}
}
}
func mustCreateShard(dir string) *tsdb.Shard {
tmpShard := path.Join(dir, "shard")
index := tsdb.NewDatabaseIndex()
sh := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(dir, "wal")
sh := tsdb.NewShard(1, index, tmpShard, opts)
if err := sh.Open(); err != nil {
panic(fmt.Sprintf("error opening shard: %s", err.Error()))
}

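To decode the expectedTags values above: a tag-set key is the measurement joined with its GROUP BY tag key/value pairs by '|'. A hypothetical helper (not part of tsdb; assumes the sort and strings packages are imported) showing the format:

	// tagSetKey renders the "measurement|tagKey|tagValue" form seen in the
	// expectations above, e.g. tagSetKey("cpu", map[string]string{"host": "serverA"})
	// returns "cpu|host|serverA". Illustrative only; tsdb builds these keys internally.
	func tagSetKey(measurement string, tags map[string]string) string {
		keys := make([]string, 0, len(tags))
		for k := range tags {
			keys = append(keys, k)
		}
		sort.Strings(keys) // deterministic order when grouping by multiple tags
		parts := []string{measurement}
		for _, k := range keys {
			parts = append(parts, k, tags[k])
		}
		return strings.Join(parts, "|")
	}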

@ -980,6 +980,16 @@ type Series struct {
id uint64
measurement *Measurement
shardIDs map[uint64]bool // shards that have this series defined
}
// NewSeries returns an initialized series struct
func NewSeries(key string, tags map[string]string) *Series {
return &Series{
Key: key,
Tags: tags,
shardIDs: make(map[uint64]bool),
}
}
// MarshalBinary encodes the object to a binary format.
@ -1008,6 +1018,10 @@ func (s *Series) UnmarshalBinary(buf []byte) error {
return nil
}
func (s *Series) InitializeShards() {
s.shardIDs = make(map[uint64]bool)
}
// match returns true if all tags match the series' tags.
func (s *Series) match(tags map[string]string) bool {
for k, v := range tags {

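The constructor is not cosmetic: a bare &Series{...} literal leaves the unexported shardIDs map nil, while series read back from disk get the map restored via InitializeShards. A minimal sketch of the intended construction:

	func exampleSeries() *tsdb.Series {
		// NewSeries initializes shardIDs, so the per-shard bookkeeping added
		// to shard.go later in this commit can record membership safely.
		return tsdb.NewSeries("cpu,host=serverA", map[string]string{"host": "serverA"})
	}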

@ -182,10 +182,7 @@ func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries {
for _, ts := range tagSets {
series = append(series, &TestSeries{
Measurement: m,
Series: &tsdb.Series{
Key: fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))),
Tags: ts,
},
Series: tsdb.NewSeries(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))), ts),
})
}
}


@ -54,7 +54,9 @@ func TestWritePointsAndExecuteQuery(t *testing.T) {
}
store.Close()
conf := store.EngineOptions.Config
store = tsdb.NewStore(store.Path())
store.EngineOptions.Config = conf
if err := store.Open(); err != nil {
t.Fatalf(err.Error())
}
@ -84,7 +86,9 @@ func TestWritePointsAndExecuteQuery_Update(t *testing.T) {
// Restart store.
store.Close()
conf := store.EngineOptions.Config
store = tsdb.NewStore(store.Path())
store.EngineOptions.Config = conf
if err := store.Open(); err != nil {
t.Fatalf(err.Error())
}
@ -145,7 +149,9 @@ func TestDropSeriesStatement(t *testing.T) {
}
store.Close()
conf := store.EngineOptions.Config
store = tsdb.NewStore(store.Path())
store.EngineOptions.Config = conf
store.Open()
executor.Store = store
@ -215,7 +221,9 @@ func TestDropMeasurementStatement(t *testing.T) {
validateDrop()
store.Close()
conf := store.EngineOptions.Config
store = tsdb.NewStore(store.Path())
store.EngineOptions.Config = conf
store.Open()
executor.Store = store
validateDrop()
@ -279,7 +287,9 @@ func TestDropDatabase(t *testing.T) {
}
store.Close()
conf := store.EngineOptions.Config
store = tsdb.NewStore(store.Path())
store.EngineOptions.Config = conf
store.Open()
executor.Store = store
executor.ShardMapper = &testShardMapper{store: store}
@ -344,6 +354,8 @@ func testStoreAndExecutor() (*tsdb.Store, *tsdb.QueryExecutor) {
path, _ := ioutil.TempDir("", "")
store := tsdb.NewStore(path)
store.EngineOptions.Config.WALDir = filepath.Join(path, "wal")
err := store.Open()
if err != nil {
panic(err)

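The close-and-reopen dance repeated in the tests above is needed because tsdb.NewStore now installs a fresh default Config, which would silently discard the WAL directory the test configured. A hypothetical helper (not part of the package) capturing the pattern:

	// reopenStore restarts a store at the same path while preserving its
	// engine Config, which tsdb.NewStore would otherwise reset to defaults.
	func reopenStore(s *tsdb.Store) (*tsdb.Store, error) {
		conf := s.EngineOptions.Config
		path := s.Path()
		s.Close()
		ns := tsdb.NewStore(path)
		ns.EngineOptions.Config = conf
		return ns, ns.Open()
	}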

@ -40,6 +40,7 @@ type Shard struct {
db *bolt.DB // underlying data store
index *DatabaseIndex
path string
id uint64
engine Engine
options EngineOptions
@ -52,10 +53,11 @@ type Shard struct {
}
// NewShard returns a new initialized Shard
func NewShard(index *DatabaseIndex, path string, options EngineOptions) *Shard {
func NewShard(id uint64, index *DatabaseIndex, path string, options EngineOptions) *Shard {
return &Shard{
index: index,
path: path,
id: id,
options: options,
measurementFields: make(map[string]*MeasurementFields),
@ -327,8 +329,12 @@ func (s *Shard) validateSeriesAndFields(points []Point) ([]*SeriesCreate, []*Fie
for _, p := range points {
// see if the series should be added to the index
if ss := s.index.series[string(p.Key())]; ss == nil {
series := &Series{Key: string(p.Key()), Tags: p.Tags()}
series := NewSeries(string(p.Key()), p.Tags())
seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), series})
} else if !ss.shardIDs[s.id] {
// this is the first time this series is being written into this shard, persist it
ss.shardIDs[s.id] = true
seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), ss})
}
// see if the field definitions need to be saved to the shard

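The new else-if branch above is what TestStoreEnsureSeriesPersistedInNewShards at the end of this commit exercises: a series that already exists in the database index but has never been written to this particular shard is re-queued in seriesToCreate, so every shard persists its own copy of the series metadata. In store terms (error handling elided; s is an open *tsdb.Store with shards 1 and 2 created):

	p, _ := tsdb.ParsePoints([]byte("cpu val=1"))
	s.WriteToShard(1, p) // first ever write: series created in the index and persisted in shard 1
	s.WriteToShard(2, p) // already indexed, but new to shard 2: persisted there as well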

@ -11,6 +11,7 @@ import (
"time"
"github.com/influxdb/influxdb/tsdb"
"github.com/influxdb/influxdb/tsdb/engine/b1"
)
func TestShardWriteAndIndex(t *testing.T) {
@ -19,7 +20,10 @@ func TestShardWriteAndIndex(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
index := tsdb.NewDatabaseIndex()
sh := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
sh := tsdb.NewShard(1, index, tmpShard, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error openeing shard: %s", err.Error())
}
@ -65,7 +69,7 @@ func TestShardWriteAndIndex(t *testing.T) {
sh.Close()
index = tsdb.NewDatabaseIndex()
sh = tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
sh = tsdb.NewShard(1, index, tmpShard, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error openeing shard: %s", err.Error())
}
@ -86,7 +90,10 @@ func TestShardWriteAddNewField(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
index := tsdb.NewDatabaseIndex()
sh := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
sh := tsdb.NewShard(1, index, tmpShard, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error openeing shard: %s", err.Error())
}
@ -142,7 +149,8 @@ func TestShard_Autoflush(t *testing.T) {
defer os.RemoveAll(path)
// Open shard with a really low size threshold, high flush interval.
sh := tsdb.NewShard(tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{
sh := tsdb.NewShard(1, tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{
EngineVersion: b1.Format,
MaxWALSize: 1024, // 1KB
WALFlushInterval: 1 * time.Hour,
WALPartitionFlushDelay: 1 * time.Millisecond,
@ -181,7 +189,8 @@ func TestShard_Autoflush_FlushInterval(t *testing.T) {
defer os.RemoveAll(path)
// Open shard with a high size threshold, small time threshold.
sh := tsdb.NewShard(tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{
sh := tsdb.NewShard(1, tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{
EngineVersion: b1.Format,
MaxWALSize: 10 * 1024 * 1024, // 10MB
WALFlushInterval: 100 * time.Millisecond,
WALPartitionFlushDelay: 1 * time.Millisecond,
@ -263,7 +272,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
for n := 0; n < b.N; n++ {
tmpDir, _ := ioutil.TempDir("", "shard_test")
tmpShard := path.Join(tmpDir, "shard")
shard := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
shard := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
shard.Open()
b.StartTimer()
@ -298,7 +307,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt
tmpDir, _ := ioutil.TempDir("", "")
defer os.RemoveAll(tmpDir)
tmpShard := path.Join(tmpDir, "shard")
shard := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
shard := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
shard.Open()
defer shard.Close()
chunkedWrite(shard, points)


@ -14,9 +14,12 @@ import (
)
func NewStore(path string) *Store {
opts := NewEngineOptions()
opts.Config = NewConfig()
return &Store{
path: path,
EngineOptions: NewEngineOptions(),
EngineOptions: opts,
Logger: log.New(os.Stderr, "[store] ", log.LstdFlags),
}
}
@ -82,7 +85,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
}
shardPath := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
shard := NewShard(db, shardPath, s.EngineOptions)
shard := NewShard(shardID, db, shardPath, s.EngineOptions)
if err := shard.Open(); err != nil {
return err
}
@ -236,7 +239,7 @@ func (s *Store) loadShards() error {
continue
}
shard := NewShard(s.databaseIndexes[db], path, s.EngineOptions)
shard := NewShard(shardID, s.databaseIndexes[db], path, s.EngineOptions)
err = shard.Open()
if err != nil {
return fmt.Errorf("failed to open shard %d: %s", shardID, err)

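With NewStore seeding EngineOptions.Config, callers override only what differs from the defaults; the store tests below all redirect the WAL the same way (a sketch, with dir standing in for any writable directory):

	s := tsdb.NewStore(dir)
	s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
	if err := s.Open(); err != nil {
		// the tests below call t.Fatalf here
	}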

@ -22,6 +22,7 @@ func TestStoreOpen(t *testing.T) {
}
s := tsdb.NewStore(dir)
s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
if err := s.Open(); err != nil {
t.Fatalf("Store.Open() failed: %v", err)
}
@ -49,6 +50,7 @@ func TestStoreOpenShard(t *testing.T) {
}
s := tsdb.NewStore(dir)
s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
if err := s.Open(); err != nil {
t.Fatalf("Store.Open() failed: %v", err)
}
@ -82,6 +84,7 @@ func TestStoreOpenShardCreateDelete(t *testing.T) {
}
s := tsdb.NewStore(dir)
s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
if err := s.Open(); err != nil {
t.Fatalf("Store.Open() failed: %v", err)
}
@ -129,6 +132,7 @@ func TestStoreOpenNotDatabaseDir(t *testing.T) {
}
s := tsdb.NewStore(dir)
s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
if err := s.Open(); err != nil {
t.Fatalf("Store.Open() failed: %v", err)
}
@ -159,6 +163,7 @@ func TestStoreOpenNotRPDir(t *testing.T) {
}
s := tsdb.NewStore(dir)
s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
if err := s.Open(); err != nil {
t.Fatalf("Store.Open() failed: %v", err)
}
@ -195,6 +200,7 @@ func TestStoreOpenShardBadShardPath(t *testing.T) {
}
s := tsdb.NewStore(dir)
s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
if err := s.Open(); err != nil {
t.Fatalf("Store.Open() failed: %v", err)
}
@ -213,6 +219,63 @@ func TestStoreOpenShardBadShardPath(t *testing.T) {
}
func TestStoreEnsureSeriesPersistedInNewShards(t *testing.T) {
dir, err := ioutil.TempDir("", "store_test")
if err != nil {
t.Fatalf("Store.Open() failed to create temp dir: %v", err)
}
defer os.RemoveAll(dir)
s := tsdb.NewStore(dir)
s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
if err := s.Open(); err != nil {
t.Fatalf("Store.Open() failed: %v", err)
}
if err := s.CreateShard("foo", "default", 1); err != nil {
t.Fatalf("error creating shard: %v", err)
}
p, _ := tsdb.ParsePoints([]byte("cpu val=1"))
if err := s.WriteToShard(1, p); err != nil {
t.Fatalf("error writing to shard: %v", err)
}
if err := s.CreateShard("foo", "default", 2); err != nil {
t.Fatalf("error creating shard: %v", err)
}
if err := s.WriteToShard(2, p); err != nil {
t.Fatalf("error writing to shard: %v", err)
}
d := s.DatabaseIndex("foo")
if d == nil {
t.Fatal("expected to have database index for foo")
}
if d.Series("cpu") == nil {
t.Fatal("expected series cpu to be in the index")
}
// delete the shard, close the store, reopen it, and confirm the series is still in the index
s.DeleteShard(1)
s.Close()
s = tsdb.NewStore(dir)
s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
if err := s.Open(); err != nil {
t.Fatalf("Store.Open() failed: %v", err)
}
d = s.DatabaseIndex("foo")
if d == nil {
t.Fatal("expected to have database index for foo")
}
if d.Series("cpu") == nil {
t.Fatal("expected series cpu to be in the index")
}
}
func BenchmarkStoreOpen_200KSeries_100Shards(b *testing.B) { benchmarkStoreOpen(b, 64, 5, 5, 1, 100) }
func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) {