From ab48331a1133a655d523ea704f7095c352c2973f Mon Sep 17 00:00:00 2001 From: akvlad Date: Wed, 4 Jun 2025 11:49:17 +0300 Subject: [PATCH 1/6] init --- json_index.go | 191 ++++++++++++++++++++++++++++++++++++--------- json_part_index.go | 122 ++++++++++++++++++++--------- redis_index.go | 20 ++++- types.go | 30 ++++++- 4 files changed, 284 insertions(+), 79 deletions(-) diff --git a/json_index.go b/json_index.go index 8f4f816..d1dbb88 100644 --- a/json_index.go +++ b/json_index.go @@ -11,27 +11,53 @@ import ( "time" ) +type jsonLayer struct { + Layer + Path string +} + +func layer2JsonLayer(layer Layer) jsonLayer { + path := "" + if strings.HasPrefix(layer.URL, "file://") { + path = strings.TrimPrefix(layer.URL, "file://") + } + return jsonLayer{ + Layer: layer, + Path: path, + } +} + type JSONIndex struct { root string database string table string lock sync.Mutex - parts map[string]*jsonPartIndex + parts map[string]map[string]*jsonPartIndex + layers []jsonLayer } -func NewJSONIndex(root string, database string, table string) TableIndex { +func NewJSONIndex(root string, database string, table string, layers []Layer) TableIndex { + var jLayers []jsonLayer + for _, layer := range layers { + jLayers = append(jLayers, layer2JsonLayer(layer)) + } return &JSONIndex{ root: root, database: database, table: table, - parts: map[string]*jsonPartIndex{}, + parts: map[string]map[string]*jsonPartIndex{}, + layers: jLayers, } } func (J *JSONIndex) GetMergePlan(layer string, iteration int) (*MergePlan, error) { J.lock.Lock() defer J.lock.Unlock() - for _, part := range J.parts { + parts, ok := J.parts[layer] + if !ok { + return nil, nil + } + for _, part := range parts { plan, err := part.GetMergePlan(layer, iteration) if err != nil { return nil, err @@ -50,8 +76,9 @@ func (J *JSONIndex) EndMerge(plan *MergePlan) error { } J.lock.Lock() defer J.lock.Unlock() + parts := J.parts[plan.Layer] dir := path.Dir(plan.From[0]) - part := J.parts[dir] + part := parts[dir] if part != nil { 
return part.EndMerge(plan) } @@ -67,6 +94,27 @@ func (J *JSONIndex) GetQuerier() TableQuerier { } func (J *JSONIndex) Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32] { + J.lock.Lock() + defer J.lock.Unlock() + addByLayer := make(map[string][]*IndexEntry) + rmByLayer := make(map[string][]*IndexEntry) + layers := make(map[string]bool) + for _, entry := range add { + addByLayer[entry.Layer] = append(addByLayer[entry.Layer], entry) + layers[entry.Layer] = true + } + for _, entry := range rm { + rmByLayer[entry.Layer] = append(rmByLayer[entry.Layer], entry) + layers[entry.Layer] = true + } + var promises []Promise[int32] + for l := range layers { + promises = append(promises, J.batchLayer(J.parts[l], addByLayer[l], rmByLayer[l])) + } + return NewWaitForAll[int32](promises) +} + +func (J *JSONIndex) batchLayer(parts map[string]*jsonPartIndex, add []*IndexEntry, rm []*IndexEntry) Promise[int32] { addByPath := make(map[string][]*IndexEntry) rmByPath := make(map[string][]*IndexEntry) paths := make(map[string]bool) @@ -81,56 +129,80 @@ func (J *JSONIndex) Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32] { paths[_path] = true } - J.lock.Lock() - defer J.lock.Unlock() - var promises []Promise[int32] for partPath := range paths { - idx, err := J.populate(partPath) - if err != nil { - //TODO: we should do something with the error - continue + idx := parts[partPath] + if idx == nil { + return Fulfilled[int32](fmt.Errorf("part \"%s\" not found", partPath), 0) } promises = append(promises, idx.Batch(addByPath[partPath], rmByPath[partPath])) } return NewWaitForAll[int32](promises) } -func (J *JSONIndex) populate(dir string) (*jsonPartIndex, error) { - idx := J.parts[dir] +func (J *JSONIndex) populate(dir string, layer string) (*jsonPartIndex, error) { + layerParts := J.parts[layer] + if layerParts == nil { + layerParts = make(map[string]*jsonPartIndex) + J.parts[layer] = layerParts + } + idx := layerParts[dir] + var _layer *jsonLayer + for _, l := range J.layers { 
+ if l.Name == layer { + _layer = &l + break + } + } + if _layer == nil { + return nil, fmt.Errorf("layer \"%s\" not found", layer) + } + if _layer.Path == "" { + return nil, fmt.Errorf("layer path \"%s\" not supported", _layer.URL) + } + if idx != nil { return idx, nil } - idx, err := newJsonPartIndex(J.root, J.database, J.table, dir) + idx, err := newJsonPartIndex(jsonPartIdxOpts{ + rootPath: _layer.Path, + database: J.database, + table: J.table, + partPath: dir, + layers: J.layers, + layer: layer, + }) if err != nil { return nil, err } idx.Run() - J.parts[dir] = idx + layerParts[dir] = idx return idx, nil } -func (J *JSONIndex) Get(_path string) *IndexEntry { +func (J *JSONIndex) Get(layer string, _path string) *IndexEntry { dir := path.Dir(_path) J.lock.Lock() defer J.lock.Unlock() - idx, err := J.populate(dir) + idx, err := J.populate(layer, dir) if err != nil { return nil } - return idx.Get(_path) + return idx.Get(layer, _path) } func (J *JSONIndex) Run() { } func (J *JSONIndex) Stop() { - for _, idx := range J.parts { - idx.Stop() + for _, l := range J.parts { + for _, idx := range l { + idx.Stop() + } } } -func (J *JSONIndex) RmFromDropQueue(files []string) Promise[int32] { +func (J *JSONIndex) RmFromDropQueue(layer string, files []string) Promise[int32] { filesByPath := make(map[string][]string) for _, file := range files { _path := path.Dir(file) @@ -141,20 +213,24 @@ func (J *JSONIndex) RmFromDropQueue(files []string) Promise[int32] { var promises []Promise[int32] for partPath, files := range filesByPath { - idx, err := J.populate(partPath) + idx, err := J.populate(layer, partPath) if err != nil { //TODO: we should do something with the error continue } - promises = append(promises, idx.RmFromDropQueue(files)) + promises = append(promises, idx.RmFromDropQueue(layer, files)) } return NewWaitForAll[int32](promises) } -func (J *JSONIndex) GetDropQueue() []string { +func (J *JSONIndex) GetDropQueue(layer string) []string { var queue []string - for _, idx := 
range J.parts { - queue = append(queue, idx.GetDropQueue()...) + parts := J.parts[layer] + if parts == nil { + return nil + } + for _, idx := range parts { + queue = append(queue, idx.GetDropQueue(layer)...) } return queue } @@ -217,18 +293,59 @@ func (J *JSONIndex) Query(options QueryOptions) ([]*IndexEntry, error) { return nil, err } var entries []*IndexEntry - for _, hour := range hours { - idx, err := J.populate(path.Join( - fmt.Sprintf("date=%s", hour.Format("2006-01-02")), - fmt.Sprintf("hour=%02d", hour.Hour()))) - if err != nil { - return nil, err + for _, l := range J.layers { + if l.Path == "" { + continue } - _entries, err := idx.Query(options) - if err != nil { - return nil, err + for _, hour := range hours { + idx, err := J.populate(l.Name, path.Join( + fmt.Sprintf("date=%s", hour.Format("2006-01-02")), + fmt.Sprintf("hour=%02d", hour.Hour()))) + if err != nil { + return nil, err + } + _entries, err := idx.Query(options) + if err != nil { + return nil, err + } + entries = append(entries, _entries...) } - entries = append(entries, _entries...) 
} + return entries, nil } + +func (J *JSONIndex) GetMovePlan(layer string) *MovePlan { + J.lock.Lock() + defer J.lock.Unlock() + l := J.parts[layer] + if l == nil { + return nil + } + for _, p := range l { + mp := p.GetMovePlanner().GetMovePlan(layer) + if mp != nil { + return mp + } + } + return nil +} + +func (J *JSONIndex) EndMove(plan *MovePlan) error { + J.lock.Lock() + defer J.lock.Unlock() + l := J.parts[plan.LayerFrom] + if l == nil { + return fmt.Errorf("layer \"%s\" not found", plan.LayerFrom) + } + dir := path.Dir(plan.PathFrom) + part := l[dir] + if part != nil { + return part.EndMove(plan) + } + return nil +} + +func (J *JSONIndex) GetMovePlanner() TableMovePlanner { + return J +} diff --git a/json_part_index.go b/json_part_index.go index 5c9d903..f925cee 100644 --- a/json_part_index.go +++ b/json_part_index.go @@ -11,26 +11,32 @@ import ( "strings" "sync" "sync/atomic" + "time" ) type jsonIndexEntry struct { + IndexEntry Id uint32 `json:"id"` - Layer string `json:"layer"` - Path string `json:"path"` - SizeBytes int64 `json:"size_bytes"` - RowCount int64 `json:"row_count"` - ChunkTime int64 `json:"chunk_time"` - MinTime int64 `json:"min_time"` - MaxTime int64 `json:"max_time"` Range string `json:"range"` Type string `json:"type"` _marshalled string `json:"-"` } +type jsonPartIdxOpts struct { + rootPath string + database string + table string + partPath string + layers []jsonLayer + layer string +} + type jsonPartIndex struct { rootPath string database string table string + layer string + layers []jsonLayer idxPath string @@ -49,17 +55,20 @@ type jsonPartIndex struct { minTime int64 maxTime int64 filesInMerge map[string]bool + filesInMove map[string]bool } -func newJsonPartIndex(rootPath string, database string, table string, partPath string, -) (*jsonPartIndex, error) { +var _ TableIndex = &jsonPartIndex{} + +func newJsonPartIndex(opts jsonPartIdxOpts) (*jsonPartIndex, error) { res := &jsonPartIndex{ - rootPath: rootPath, - database: database, - table: 
table, - idxPath: path.Join(rootPath, database, table, "data", partPath), + rootPath: opts.rootPath, + database: opts.database, + table: opts.table, + idxPath: path.Join(opts.rootPath, opts.database, opts.table, "data", opts.partPath), entries: &sync.Map{}, filesInMerge: make(map[string]bool), + layers: opts.layers, } res.updateCtx, res.doUpdate = context.WithCancel(context.Background()) res.workCtx, res.stop = context.WithCancel(context.Background()) @@ -67,6 +76,15 @@ func newJsonPartIndex(rootPath string, database string, table string, partPath s return res, err } +func (J *jsonPartIndex) getLayer(name string) int { + for i, layer := range J.layers { + if layer.Name == name { + return i + } + } + return -1 +} + func (J *jsonPartIndex) GetMergePlan(layer string, iteration int) (*MergePlan, error) { suffix := fmt.Sprintf(".%d.parquet", iteration) var from []string @@ -125,8 +143,6 @@ func (J *jsonPartIndex) GetMergePlanner() TableMergePlanner { return J } -var _ TableIndex = &jsonPartIndex{} - func (J *jsonPartIndex) Query(options QueryOptions) ([]*IndexEntry, error) { var res []*IndexEntry var suffix string @@ -157,7 +173,7 @@ func (J *jsonPartIndex) addToDropQueue(files []*IndexEntry) { } -func (J *jsonPartIndex) RmFromDropQueue(files []string) Promise[int32] { +func (J *jsonPartIndex) RmFromDropQueue(layer string, files []string) Promise[int32] { J.m.Lock() defer J.m.Unlock() @@ -184,7 +200,7 @@ func (J *jsonPartIndex) RmFromDropQueue(files []string) Promise[int32] { return p } -func (J *jsonPartIndex) GetDropQueue() []string { +func (J *jsonPartIndex) GetDropQueue(layes string) []string { return J.dropQueue } @@ -278,16 +294,10 @@ func (J *jsonPartIndex) entry2JEntry(entries []*IndexEntry) ([]*jsonIndexEntry, for i, entry := range entries { id := atomic.AddUint32(&J.lastId, 1) _entry := &jsonIndexEntry{ - Id: id, - Layer: entry.Layer, - Path: entry.Path, - SizeBytes: entry.SizeBytes, - RowCount: entry.RowCount, - ChunkTime: entry.ChunkTime, - MinTime: 
entry.MinTime, - MaxTime: entry.MaxTime, - Range: "1h", - Type: "compacted", + Id: id, + IndexEntry: *entry, + Range: "1h", + Type: "compacted", } _marshalled, err := json.Marshal(_entry) if err != nil { @@ -499,17 +509,10 @@ func (J *jsonPartIndex) Stop() { } func (J *jsonPartIndex) jEntry2Entry(_e *jsonIndexEntry) *IndexEntry { - return &IndexEntry{ - Path: _e.Path, - SizeBytes: _e.SizeBytes, - RowCount: _e.RowCount, - ChunkTime: _e.ChunkTime, - MinTime: _e.MinTime, - MaxTime: _e.MaxTime, - } + return &_e.IndexEntry } -func (J *jsonPartIndex) Get(path string) *IndexEntry { +func (J *jsonPartIndex) Get(layer string, path string) *IndexEntry { e, _ := J.entries.Load(path) if e == nil { return nil @@ -517,3 +520,50 @@ func (J *jsonPartIndex) Get(path string) *IndexEntry { _e := e.(*jsonIndexEntry) return J.jEntry2Entry(_e) } + +func (J *jsonPartIndex) GetMovePlan(layer string) *MovePlan { + + J.m.Lock() + defer J.m.Unlock() + var plan *MovePlan + J.entries.Range(func(key, value any) bool { + val := value.(*jsonIndexEntry) + if J.filesInMerge[val.Path] { + return true + } + layerTdx := J.getLayer(val.Layer) + if layerTdx < 0 { + return true + } + layerTo := "" + if layerTdx+1 < len(J.layers) { + layerTo = J.layers[layerTdx+1].Name + } + if J.layers[layerTdx].TTL.Nanoseconds() > 0 && + time.Now().UnixNano()-val.ChunkTime >= J.layers[layerTdx].TTL.Nanoseconds() { + plan = &MovePlan{ + ID: "", + Database: J.database, + Table: J.table, + PathFrom: val.Path, + LayerFrom: val.Layer, + PathTo: val.Path, + LayerTo: layerTo, + } + return false + } + return true + }) + return plan +} + +func (J *jsonPartIndex) EndMove(plan *MovePlan) error { + J.m.Lock() + defer J.m.Unlock() + delete(J.filesInMove, plan.PathFrom) + return nil +} + +func (J *jsonPartIndex) GetMovePlanner() TableMovePlanner { + return J +} diff --git a/redis_index.go b/redis_index.go index 98d1535..4fdd667 100644 --- a/redis_index.go +++ b/redis_index.go @@ -223,7 +223,7 @@ func (r *RedisIndex) Batch(add 
[]*IndexEntry, rm []*IndexEntry) Promise[int32] { return res } -func (r *RedisIndex) Get(path string) *IndexEntry { +func (r *RedisIndex) Get(layer string, path string) *IndexEntry { firstFolder := strings.Split(path, "/")[0] res, err := r.c.HGet( context.Background(), @@ -268,7 +268,7 @@ func (r *RedisIndex) AddToDropQueue(files []string) Promise[int32] { return Fulfilled[int32](nil, 0) } -func (r *RedisIndex) RmFromDropQueue(files []string) Promise[int32] { +func (r *RedisIndex) RmFromDropQueue(layer string, files []string) Promise[int32] { res := NewPromise[int32]() go func() { for _, file := range files { @@ -283,7 +283,7 @@ func (r *RedisIndex) RmFromDropQueue(files []string) Promise[int32] { return res } -func (r *RedisIndex) GetDropQueue() []string { +func (r *RedisIndex) GetDropQueue(layer string) []string { res, err := r.c.LRange(context.Background(), "drop", 0, -1).Result() if err != nil { return nil @@ -475,3 +475,17 @@ func (r *RedisIndex) Query(options QueryOptions) ([]*IndexEntry, error) { } return res, nil } + +func (r *RedisIndex) GetMovePlanner() TableMovePlanner { + return r +} + +func (r *RedisIndex) GetMovePlan() *MovePlan { + //TODO implement me + panic("implement me") +} + +func (r *RedisIndex) EndMove(plan *MovePlan) error { + //TODO implement me + panic("implement me") +} diff --git a/types.go b/types.go index 2752736..a303d7d 100644 --- a/types.go +++ b/types.go @@ -11,6 +11,13 @@ type MergeConfigurationsConf [][3]int64 var MergeConfigurations MergeConfigurationsConf +type Layer struct { + URL string + Name string + Type string + TTL time.Duration +} + type IndexEntry struct { Layer string `json:"layer"` Database string `json:"database"` @@ -23,6 +30,7 @@ type IndexEntry struct { Max map[string]any `json:"max"` MinTime int64 `json:"min_time"` MaxTime int64 `json:"max_time"` + WriterID string `json:"writer_id"` } type QueryOptions struct { @@ -42,6 +50,16 @@ type MergePlan struct { Iteration int } +type MovePlan struct { + ID string + 
Database string + Table string + PathFrom string + LayerFrom string + PathTo string + LayerTo string +} + type DBIndex interface { Databases() ([]string, error) Tables(database string) ([]string, error) @@ -50,13 +68,14 @@ type DBIndex interface { type TableIndex interface { Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32] - Get(path string) *IndexEntry + Get(layer string, path string) *IndexEntry Run() Stop() - RmFromDropQueue(files []string) Promise[int32] - GetDropQueue() []string + RmFromDropQueue(layer string, files []string) Promise[int32] + GetDropQueue(layer string) []string GetMergePlanner() TableMergePlanner GetQuerier() TableQuerier + GetMovePlanner() TableMovePlanner } type TableMergePlanner interface { @@ -64,6 +83,11 @@ type TableMergePlanner interface { EndMerge(plan *MergePlan) error } +type TableMovePlanner interface { + GetMovePlan(layer string) *MovePlan + EndMove(plan *MovePlan) error +} + type TableQuerier interface { Query(options QueryOptions) ([]*IndexEntry, error) } From 743458403f99f888298fa1cbc7658e08871139c3 Mon Sep 17 00:00:00 2001 From: akvlad Date: Thu, 5 Jun 2025 20:17:55 +0300 Subject: [PATCH 2/6] layers; writer_id implementation --- json_drop_planner.go | 37 +++++++ json_index.go | 107 ------------------- json_merge_planner.go | 42 ++++++++ json_move_planner.go | 44 ++++++++ json_part_drop_planner.go | 38 +++++++ json_part_index.go | 172 +++++++------------------------ json_part_merge_planner.go | 69 +++++++++++++ json_part_move_planner.go | 58 +++++++++++ redis_drop_planner.go | 32 ++++++ redis_index.go | 144 ++++---------------------- redis_index_test.go | 13 ++- redis_merge_planner.go | 67 ++++++++++++ redis_move_planner.go | 31 ++++++ redis_scripts/end_merge.lua | 6 +- redis_scripts/get_merge_plan.lua | 23 +++-- redis_scripts/patch_index.lua | 79 +++++++++----- redis_task_queue.go | 68 ++++++++++++ types.go | 52 ++++++++-- 18 files changed, 667 insertions(+), 415 deletions(-) create mode 100644 json_drop_planner.go 
create mode 100644 json_merge_planner.go create mode 100644 json_move_planner.go create mode 100644 json_part_drop_planner.go create mode 100644 json_part_merge_planner.go create mode 100644 json_part_move_planner.go create mode 100644 redis_drop_planner.go create mode 100644 redis_merge_planner.go create mode 100644 redis_move_planner.go create mode 100644 redis_task_queue.go diff --git a/json_drop_planner.go b/json_drop_planner.go new file mode 100644 index 0000000..ff4bb57 --- /dev/null +++ b/json_drop_planner.go @@ -0,0 +1,37 @@ +package metadata + +import "path" + +func (J *JSONIndex) GetDropQueue(writerId string, layer string) (DropPlan, error) { + parts := J.parts[layer] + if parts == nil { + return DropPlan{}, nil + } + for _, idx := range parts { + p, err := idx.GetDropPlanner().GetDropQueue(writerId, layer) + if err != nil { + return DropPlan{}, err + } + if p.Path != "" { + return p, nil + } + } + return DropPlan{}, nil +} + +func (J *JSONIndex) GetDropPlanner() TableDropPlanner { + return J +} + +func (J *JSONIndex) RmFromDropQueue(plan DropPlan) Promise[int32] { + l := J.parts[plan.Layer] + if l == nil { + return Fulfilled(nil, int32(0)) + } + dir := path.Dir(plan.Path) + part := l[dir] + if part != nil { + return part.RmFromDropQueue(plan) + } + return Fulfilled(nil, int32(0)) +} diff --git a/json_index.go b/json_index.go index d1dbb88..d35513e 100644 --- a/json_index.go +++ b/json_index.go @@ -50,45 +50,6 @@ func NewJSONIndex(root string, database string, table string, layers []Layer) Ta } } -func (J *JSONIndex) GetMergePlan(layer string, iteration int) (*MergePlan, error) { - J.lock.Lock() - defer J.lock.Unlock() - parts, ok := J.parts[layer] - if !ok { - return nil, nil - } - for _, part := range parts { - plan, err := part.GetMergePlan(layer, iteration) - if err != nil { - return nil, err - } - if plan == nil || len(plan.From) == 0 { - continue - } - return plan, nil - } - return nil, nil -} - -func (J *JSONIndex) EndMerge(plan *MergePlan) error { 
- if len(plan.From) == 0 { - return nil - } - J.lock.Lock() - defer J.lock.Unlock() - parts := J.parts[plan.Layer] - dir := path.Dir(plan.From[0]) - part := parts[dir] - if part != nil { - return part.EndMerge(plan) - } - return nil -} - -func (J *JSONIndex) GetMergePlanner() TableMergePlanner { - return J -} - func (J *JSONIndex) GetQuerier() TableQuerier { return J } @@ -202,39 +163,6 @@ func (J *JSONIndex) Stop() { } } -func (J *JSONIndex) RmFromDropQueue(layer string, files []string) Promise[int32] { - filesByPath := make(map[string][]string) - for _, file := range files { - _path := path.Dir(file) - filesByPath[_path] = append(filesByPath[_path], file) - } - J.lock.Lock() - defer J.lock.Unlock() - - var promises []Promise[int32] - for partPath, files := range filesByPath { - idx, err := J.populate(layer, partPath) - if err != nil { - //TODO: we should do something with the error - continue - } - promises = append(promises, idx.RmFromDropQueue(layer, files)) - } - return NewWaitForAll[int32](promises) -} - -func (J *JSONIndex) GetDropQueue(layer string) []string { - var queue []string - parts := J.parts[layer] - if parts == nil { - return nil - } - for _, idx := range parts { - queue = append(queue, idx.GetDropQueue(layer)...) 
- } - return queue -} - func (J *JSONIndex) findHours(options QueryOptions) ([]time.Time, error) { var hours []time.Time err := filepath.Walk(path.Join(J.root, J.database, J.table), func(path string, info os.FileInfo, err error) error { @@ -314,38 +242,3 @@ func (J *JSONIndex) Query(options QueryOptions) ([]*IndexEntry, error) { return entries, nil } - -func (J *JSONIndex) GetMovePlan(layer string) *MovePlan { - J.lock.Lock() - defer J.lock.Unlock() - l := J.parts[layer] - if l == nil { - return nil - } - for _, p := range l { - mp := p.GetMovePlanner().GetMovePlan(layer) - if mp != nil { - return mp - } - } - return nil -} - -func (J *JSONIndex) EndMove(plan *MovePlan) error { - J.lock.Lock() - defer J.lock.Unlock() - l := J.parts[plan.LayerFrom] - if l == nil { - return fmt.Errorf("layer \"%s\" not found", plan.LayerFrom) - } - dir := path.Dir(plan.PathFrom) - part := l[dir] - if part != nil { - return part.EndMove(plan) - } - return nil -} - -func (J *JSONIndex) GetMovePlanner() TableMovePlanner { - return J -} diff --git a/json_merge_planner.go b/json_merge_planner.go new file mode 100644 index 0000000..5b1d7f1 --- /dev/null +++ b/json_merge_planner.go @@ -0,0 +1,42 @@ +package metadata + +import "path" + +func (J *JSONIndex) GetMergePlan(writerId string, layer string, iteration int) (MergePlan, error) { + J.lock.Lock() + defer J.lock.Unlock() + parts, ok := J.parts[layer] + if !ok { + return MergePlan{}, nil + } + for _, part := range parts { + plan, err := part.GetMergePlan(writerId, layer, iteration) + if err != nil { + return MergePlan{}, err + } + if len(plan.From) != 0 { + return plan, nil + } + + } + return MergePlan{}, nil +} + +func (J *JSONIndex) EndMerge(plan MergePlan) Promise[int32] { + if len(plan.From) == 0 { + return nil + } + J.lock.Lock() + defer J.lock.Unlock() + parts := J.parts[plan.Layer] + dir := path.Dir(plan.From[0]) + part := parts[dir] + if part != nil { + return part.EndMerge(plan) + } + return nil +} + +func (J *JSONIndex) 
GetMergePlanner() TableMergePlanner { + return J +} diff --git a/json_move_planner.go b/json_move_planner.go new file mode 100644 index 0000000..85e14f3 --- /dev/null +++ b/json_move_planner.go @@ -0,0 +1,44 @@ +package metadata + +import ( + "fmt" + "path" +) + +func (J *JSONIndex) GetMovePlan(writerId string, layer string) (MovePlan, error) { + J.lock.Lock() + defer J.lock.Unlock() + l := J.parts[layer] + if l == nil { + return MovePlan{}, nil + } + for _, p := range l { + mp, err := p.GetMovePlanner().GetMovePlan(writerId, layer) + if err != nil { + return MovePlan{}, err + } + if mp.PathFrom != "" { + return mp, nil + } + } + return MovePlan{}, nil +} + +func (J *JSONIndex) EndMove(plan MovePlan) Promise[int32] { + J.lock.Lock() + defer J.lock.Unlock() + l := J.parts[plan.LayerFrom] + if l == nil { + return Fulfilled(fmt.Errorf("layer \"%s\" not found", plan.LayerFrom), int32(0)) + } + dir := path.Dir(plan.PathFrom) + part := l[dir] + if part != nil { + return part.EndMove(plan) + } + return nil +} + +func (J *JSONIndex) GetMovePlanner() TableMovePlanner { + return J +} diff --git a/json_part_drop_planner.go b/json_part_drop_planner.go new file mode 100644 index 0000000..14c2a85 --- /dev/null +++ b/json_part_drop_planner.go @@ -0,0 +1,38 @@ +package metadata + +func (J *jsonPartIndex) GetDropPlanner() TableDropPlanner { + return J +} + +func (J *jsonPartIndex) RmFromDropQueue(plan DropPlan) Promise[int32] { + J.m.Lock() + defer J.m.Unlock() + + updated := false + + for i := len(J.dropQueue) - 1; i >= 0; i-- { + if J.dropQueue[i].Path != plan.Path { + continue + } + J.dropQueue[i] = J.dropQueue[len(J.dropQueue)-1] + J.dropQueue = J.dropQueue[:len(J.dropQueue)-1] + updated = true + break + } + + if !updated { + return Fulfilled[int32](nil, 0) + } + + p := NewPromise[int32]() + J.promises = append(J.promises, p) + J.doUpdate() + return p +} + +func (J *jsonPartIndex) GetDropQueue(writerId string, layer string) (DropPlan, error) { + if len(J.dropQueue) == 0 { + 
return DropPlan{}, nil + } + return J.dropQueue[0], nil +} diff --git a/json_part_index.go b/json_part_index.go index f925cee..02ca9c0 100644 --- a/json_part_index.go +++ b/json_part_index.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/google/uuid" jsoniter "github.com/json-iterator/go" "os" "path" @@ -49,7 +48,7 @@ type jsonPartIndex struct { stop context.CancelFunc lastId uint32 - dropQueue []string + dropQueue []DropPlan parquetSizeBytes int64 rowCount int64 minTime int64 @@ -85,64 +84,10 @@ func (J *jsonPartIndex) getLayer(name string) int { return -1 } -func (J *jsonPartIndex) GetMergePlan(layer string, iteration int) (*MergePlan, error) { - suffix := fmt.Sprintf(".%d.parquet", iteration) - var from []string - var size int64 - if iteration > len(MergeConfigurations) { - return nil, fmt.Errorf("no more merge configurations available for iteration %d", iteration) - } - conf := MergeConfigurations[iteration-1] - J.m.Lock() - defer J.m.Unlock() - J.entries.Range(func(key, value interface{}) bool { - entry := value.(*jsonIndexEntry) - if !strings.HasSuffix(entry.Path, suffix) { - return true - } - if J.filesInMerge[entry.Path] { - return true - } - if size > conf[1] { - return false - } - from = append(from, entry.Path) - size += entry.SizeBytes - return true - }) - for _, file := range from { - J.filesInMerge[file] = true - } - uid, _ := uuid.NewUUID() - - tablePath := path.Join(J.rootPath, J.database, J.table, "data") + "/" - partPath := J.idxPath[len(tablePath):] - return &MergePlan{ - Database: J.database, - Table: J.table, - From: from, - To: path.Join(partPath, fmt.Sprintf("%s.%d.parquet", uid.String(), iteration+1)), - Iteration: iteration, - }, nil -} - -func (J *jsonPartIndex) EndMerge(plan *MergePlan) error { - J.m.Lock() - defer J.m.Unlock() - for _, file := range plan.From { - delete(J.filesInMerge, file) - } - return nil -} - func (J *jsonPartIndex) GetQuerier() TableQuerier { return J } -func (J *jsonPartIndex) 
GetMergePlanner() TableMergePlanner { - return J -} - func (J *jsonPartIndex) Query(options QueryOptions) ([]*IndexEntry, error) { var res []*IndexEntry var suffix string @@ -168,40 +113,16 @@ func (J *jsonPartIndex) Query(options QueryOptions) ([]*IndexEntry, error) { func (J *jsonPartIndex) addToDropQueue(files []*IndexEntry) { for _, f := range files { - J.dropQueue = append(J.dropQueue, f.Path) - } - -} - -func (J *jsonPartIndex) RmFromDropQueue(layer string, files []string) Promise[int32] { - J.m.Lock() - defer J.m.Unlock() - - updated := false - for i := len(J.dropQueue) - 1; i >= 0; i-- { - for _, file := range files { - if J.dropQueue[i] != file { - continue - } - J.dropQueue[i] = J.dropQueue[len(J.dropQueue)-1] - J.dropQueue = J.dropQueue[:len(J.dropQueue)-1] - updated = true - break - } + J.dropQueue = append(J.dropQueue, DropPlan{ + WriterID: f.WriterID, + Layer: f.Layer, + Database: f.Database, + Table: f.Table, + Path: f.Path, + TimeS: int32(time.Now().Add(time.Second * 30).Unix()), + }) } - if !updated { - return Fulfilled[int32](nil, 0) - } - - p := NewPromise[int32]() - J.promises = append(J.promises, p) - J.doUpdate() - return p -} - -func (J *jsonPartIndex) GetDropQueue(layes string) []string { - return J.dropQueue } func (J *jsonPartIndex) populate() error { @@ -221,7 +142,26 @@ func (J *jsonPartIndex) populate() error { switch s { case "drop_queue": for iterator.ReadArray() { - dropQueueEntry := iterator.ReadString() + var dropQueueEntry DropPlan + iterator.ReadMapCB(func(iterator *jsoniter.Iterator, s string) bool { + switch s { + case "writer_id": + dropQueueEntry.WriterID = iterator.ReadString() + case "layer": + dropQueueEntry.Layer = iterator.ReadString() + case "database": + dropQueueEntry.Database = iterator.ReadString() + case "table": + dropQueueEntry.Table = iterator.ReadString() + case "path": + dropQueueEntry.Path = iterator.ReadString() + case "time_s": + dropQueueEntry.TimeS = iterator.ReadInt32() + default: + iterator.Skip() + } + 
return true + }) J.dropQueue = append(J.dropQueue, dropQueueEntry) } case "type": @@ -449,7 +389,12 @@ func (J *jsonPartIndex) flush() { if i > 0 { stream.WriteMore() } - stream.WriteString(d) + strD, err := json.Marshal(d) + if err != nil { + onErr(err) + return + } + stream.WriteString(string(strD)) } stream.WriteArrayEnd() @@ -520,50 +465,3 @@ func (J *jsonPartIndex) Get(layer string, path string) *IndexEntry { _e := e.(*jsonIndexEntry) return J.jEntry2Entry(_e) } - -func (J *jsonPartIndex) GetMovePlan(layer string) *MovePlan { - - J.m.Lock() - defer J.m.Unlock() - var plan *MovePlan - J.entries.Range(func(key, value any) bool { - val := value.(*jsonIndexEntry) - if J.filesInMerge[val.Path] { - return true - } - layerTdx := J.getLayer(val.Layer) - if layerTdx < 0 { - return true - } - layerTo := "" - if layerTdx+1 < len(J.layers) { - layerTo = J.layers[layerTdx+1].Name - } - if J.layers[layerTdx].TTL.Nanoseconds() > 0 && - time.Now().UnixNano()-val.ChunkTime >= J.layers[layerTdx].TTL.Nanoseconds() { - plan = &MovePlan{ - ID: "", - Database: J.database, - Table: J.table, - PathFrom: val.Path, - LayerFrom: val.Layer, - PathTo: val.Path, - LayerTo: layerTo, - } - return false - } - return true - }) - return plan -} - -func (J *jsonPartIndex) EndMove(plan *MovePlan) error { - J.m.Lock() - defer J.m.Unlock() - delete(J.filesInMove, plan.PathFrom) - return nil -} - -func (J *jsonPartIndex) GetMovePlanner() TableMovePlanner { - return J -} diff --git a/json_part_merge_planner.go b/json_part_merge_planner.go new file mode 100644 index 0000000..a2105d5 --- /dev/null +++ b/json_part_merge_planner.go @@ -0,0 +1,69 @@ +package metadata + +import ( + "fmt" + "github.com/google/uuid" + "path" + "strings" +) + +func (J *jsonPartIndex) GetMergePlan(writerId string, layer string, iteration int) (MergePlan, error) { + suffix := fmt.Sprintf(".%d.parquet", iteration) + var from []string + var size int64 + if iteration > len(MergeConfigurations) { + return MergePlan{}, 
fmt.Errorf("no more merge configurations available for iteration %d", iteration) + } + conf := MergeConfigurations[iteration-1] + J.m.Lock() + defer J.m.Unlock() + J.entries.Range(func(key, value interface{}) bool { + entry := value.(*jsonIndexEntry) + if !strings.HasSuffix(entry.Path, suffix) { + return true + } + if J.filesInMerge[entry.Path] { + return true + } + if size > conf[1] { + return false + } + from = append(from, entry.Path) + size += entry.SizeBytes + return true + }) + for _, file := range from { + J.filesInMerge[file] = true + } + uid, _ := uuid.NewUUID() + + tablePath := path.Join(J.rootPath, J.database, J.table, "data") + "/" + partPath := J.idxPath[len(tablePath):] + return MergePlan{ + Database: J.database, + Table: J.table, + From: from, + To: path.Join(partPath, fmt.Sprintf("%s.%d.parquet", uid.String(), iteration+1)), + Iteration: iteration, + }, nil +} + +func (J *jsonPartIndex) EndMerge(plan MergePlan) Promise[int32] { + J.m.Lock() + defer J.m.Unlock() + update := false + for _, file := range plan.From { + update = update || J.filesInMerge[file] + delete(J.filesInMerge, file) + } + if !update { + return Fulfilled[int32](nil, 0) + } + p := NewPromise[int32]() + J.promises = append(J.promises, p) + return p +} + +func (J *jsonPartIndex) GetMergePlanner() TableMergePlanner { + return J +} diff --git a/json_part_move_planner.go b/json_part_move_planner.go new file mode 100644 index 0000000..85c9797 --- /dev/null +++ b/json_part_move_planner.go @@ -0,0 +1,58 @@ +package metadata + +import ( + "time" +) + +func (J *jsonPartIndex) GetMovePlan(writerId string, layer string) (MovePlan, error) { + + J.m.Lock() + defer J.m.Unlock() + var plan *MovePlan + J.entries.Range(func(key, value any) bool { + val := value.(*jsonIndexEntry) + if J.filesInMerge[val.Path] { + return true + } + layerTdx := J.getLayer(val.Layer) + if layerTdx < 0 { + return true + } + layerTo := "" + if layerTdx+1 < len(J.layers) { + layerTo = J.layers[layerTdx+1].Name + } + if 
J.layers[layerTdx].TTLSec > 0 && + time.Now().UnixNano()-val.ChunkTime >= int64(J.layers[layerTdx].TTLSec)*1000000000 { + plan = &MovePlan{ + ID: "", + Database: J.database, + Table: J.table, + PathFrom: val.Path, + LayerFrom: val.Layer, + PathTo: val.Path, + LayerTo: layerTo, + } + return false + } + return true + }) + return *plan, nil +} + +func (J *jsonPartIndex) EndMove(plan MovePlan) Promise[int32] { + J.m.Lock() + defer J.m.Unlock() + if _, ok := J.filesInMove[plan.PathFrom]; !ok { + return Fulfilled[int32](nil, 0) + } + delete(J.filesInMove, plan.PathFrom) + p := NewPromise[int32]() + J.promises = append(J.promises, p) + J.doUpdate() + return p +} + +func (J *jsonPartIndex) GetMovePlanner() TableMovePlanner { + return J +} diff --git a/redis_drop_planner.go b/redis_drop_planner.go new file mode 100644 index 0000000..a12e24b --- /dev/null +++ b/redis_drop_planner.go @@ -0,0 +1,32 @@ +package metadata + +func (r *RedisIndex) GetDropPlanner() TableDropPlanner { + return r +} + +func (r *RedisIndex) RmFromDropQueue(plan DropPlan) Promise[int32] { + return Fulfilled((&redisTaskQueue[DropPlan]{ + prefix: "drop", + database: r.database, + table: r.table, + suffix: "", + writerId: plan.WriterID, + layer: plan.Layer, + getEntrySHA: r.getMergePlanSha, + redis: r.c, + }).finishProcess(plan), int32(0)) +} + +func (r *RedisIndex) GetDropQueue(writerId string, layer string) (DropPlan, error) { + plan, err := (&redisTaskQueue[DropPlan]{ + prefix: "drop", + database: r.database, + table: r.table, + suffix: "", + writerId: writerId, + layer: layer, + getEntrySHA: r.getMergePlanSha, + redis: r.c, + }).processEntry() + return plan, err +} diff --git a/redis_index.go b/redis_index.go index 4fdd667..5be3e1d 100644 --- a/redis_index.go +++ b/redis_index.go @@ -5,13 +5,11 @@ import ( "crypto/tls" "encoding/json" "fmt" - "github.com/google/uuid" "github.com/redis/go-redis/v9" "math" "net" "net/url" "os" - "path/filepath" "strconv" "strings" "time" @@ -60,6 +58,7 @@ type RedisIndex 
struct { database string table string + layers []Layer } func getRedisClient(u *url.URL) (*redis.Client, error) { @@ -93,7 +92,7 @@ func getRedisClient(u *url.URL) (*redis.Client, error) { return redis.NewClient(opts), nil } -func NewRedisIndex(URL string, database string, table string) (TableIndex, error) { +func NewRedisIndex(URL string, database string, table string, layers []Layer) (TableIndex, error) { u, err := url.Parse(URL) if err != nil { return nil, err @@ -102,6 +101,7 @@ func NewRedisIndex(URL string, database string, table string) (TableIndex, error url: u, database: database, table: table, + layers: layers, } client, err := getRedisClient(u) @@ -118,63 +118,6 @@ func NewRedisIndex(URL string, database string, table string) (TableIndex, error return idx, nil } -type redisMergePlan struct { - ID string `json:"id"` - Time int32 `json:"time"` - Paths []string `json:"paths"` -} - -func (r *RedisIndex) GetMergePlan(layer string, iteration int) (*MergePlan, error) { - res, err := r.c.EvalSha(context.Background(), r.getMergePlanSha, []string{ - r.database, - r.table, - strconv.Itoa(iteration), - strconv.FormatInt(MergeConfigurations[iteration-1][1], 10), - }, nil).Result() - if err != nil { - return nil, fmt.Errorf("failed to execute script: %v", err) - } - if res == nil || res.(string) == "" { - return nil, nil - } - var plan redisMergePlan - err = json.Unmarshal([]byte(res.(string)), &plan) - - if len(plan.Paths) == 0 { - return nil, nil - } - - firstFile := plan.Paths[0] - firstFileDir := filepath.Dir(firstFile) - - return &MergePlan{ - ID: plan.ID, - Layer: layer, - Database: r.database, - Table: r.table, - From: plan.Paths, - To: filepath.Join(firstFileDir, fmt.Sprintf("%s.%d.parquet", uuid.New().String(), iteration+1)), - Iteration: iteration, - }, err -} - -func (r *RedisIndex) EndMerge(plan *MergePlan) error { - if plan == nil { - return nil - } - _, err := r.c.EvalSha(context.Background(), r.endMergeSha, []string{ - plan.Database, - plan.Table, - 
strconv.Itoa(plan.Iteration), - plan.ID, - }).Result() - return err -} - -func (r *RedisIndex) GetMergePlanner() TableMergePlanner { - return r -} - func (r *RedisIndex) GetQuerier() TableQuerier { return r } @@ -211,13 +154,28 @@ func (r *RedisIndex) Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32] { } res := NewPromise[int32]() - var keys []string - for _, c := range MergeConfigurations { - keys = append(keys, strconv.FormatInt(c[1], 10)) + var err error + var keys [2][]byte + keys[0], err = json.Marshal(MergeConfigurations) + if err != nil { + res.Done(0, err) + return res + } + lmap := make(map[string]Layer) + for _, layer := range r.layers { + lmap[layer.Name] = layer + } + keys[1], err = json.Marshal(lmap) + if err != nil { + res.Done(0, err) + return res } go func() { - _, err := r.c.EvalSha(context.Background(), r.patchSha, keys, cmds...).Result() + _, err := r.c.EvalSha(context.Background(), r.patchSha, []string{ + string(keys[0]), + string(keys[1]), + }, cmds...).Result() res.Done(0, err) }() return res @@ -247,50 +205,6 @@ func (r *RedisIndex) Run() { func (r *RedisIndex) Stop() { } -type QEntry struct { - Path string `json:"path"` - TimeS int32 `json:"time"` -} - -func (r *RedisIndex) AddToDropQueue(files []string) Promise[int32] { - _files := make([]any, len(files)) - for i, file := range files { - _file, err := json.Marshal(QEntry{ - Path: file, - TimeS: int32(time.Now().Unix()), - }) - if err != nil { - return Fulfilled[int32](err, 0) - } - _files[i] = string(_file) - } - r.c.LPush(context.Background(), "drop", _files...) 
- return Fulfilled[int32](nil, 0) -} - -func (r *RedisIndex) RmFromDropQueue(layer string, files []string) Promise[int32] { - res := NewPromise[int32]() - go func() { - for _, file := range files { - _, err := r.c.LRem(context.Background(), "drop", 1, file).Result() - if err != nil { - res.Done(0, err) - return - } - } - res.Done(0, nil) - }() - return res -} - -func (r *RedisIndex) GetDropQueue(layer string) []string { - res, err := r.c.LRange(context.Background(), "drop", 0, -1).Result() - if err != nil { - return nil - } - return res -} - func redisScan(scanFn func(cursor uint64) (uint64, error)) error { var err error var cursor uint64 = 0 @@ -475,17 +389,3 @@ func (r *RedisIndex) Query(options QueryOptions) ([]*IndexEntry, error) { } return res, nil } - -func (r *RedisIndex) GetMovePlanner() TableMovePlanner { - return r -} - -func (r *RedisIndex) GetMovePlan() *MovePlan { - //TODO implement me - panic("implement me") -} - -func (r *RedisIndex) EndMove(plan *MovePlan) error { - //TODO implement me - panic("implement me") -} diff --git a/redis_index_test.go b/redis_index_test.go index 322b8dd..887fec6 100644 --- a/redis_index_test.go +++ b/redis_index_test.go @@ -11,7 +11,12 @@ func TestSave(t *testing.T) { MergeConfigurations = [][3]int64{ {10, 10 * 1024 * 1024, 1}, } - idx, err := NewRedisIndex("redis://localhost:6379/0", "default", "test") + idx, err := NewRedisIndex( + "redis://localhost:6379/0", + "default", + "test", []Layer{ + {"file:///data", "l1", "fs", 20}, + }) if err != nil { t.Fatalf("Failed to create index: %v", err) } @@ -30,6 +35,8 @@ func TestSave(t *testing.T) { ts.UTC().Hour(), uuid.New().String()), SizeBytes: 1000000, + ChunkTime: time.Now().UnixNano(), + Layer: "l1", }) } @@ -41,7 +48,7 @@ func TestSave(t *testing.T) { fmt.Printf("Items saved: %d\n", len(ents)) } -func TestSaveAndDel(t *testing.T) { +/*func TestSaveAndDel(t *testing.T) { MergeConfigurations = [][3]int64{ {10, 10 * 1024 * 1024, 1}, } @@ -115,4 +122,4 @@ func 
TestRedisDBIndex(t *testing.T) { paths, err := idx.Paths("default", "test") fmt.Println("Paths:", paths) -} +}*/ diff --git a/redis_merge_planner.go b/redis_merge_planner.go new file mode 100644 index 0000000..48e889b --- /dev/null +++ b/redis_merge_planner.go @@ -0,0 +1,67 @@ +package metadata + +import ( + "fmt" + "github.com/google/uuid" + "path/filepath" + "strconv" +) + +type redisMergePlan struct { + ID string `json:"id"` + Time int32 `json:"time"` + Paths []string `json:"paths"` +} + +func (r redisMergePlan) Id() string { + return r.ID +} + +func (r *RedisIndex) GetMergePlan(writerId string, layer string, iteration int) (MergePlan, error) { + plan, err := (&redisTaskQueue[redisMergePlan]{ + prefix: "merge:", + database: r.database, + table: r.table, + suffix: strconv.Itoa(iteration), + writerId: writerId, + layer: layer, + getEntrySHA: r.getMergePlanSha, + redis: r.c, + }).processEntry() + + if len(plan.Paths) == 0 { + return MergePlan{}, nil + } + + firstFile := plan.Paths[0] + firstFileDir := filepath.Dir(firstFile) + + return MergePlan{ + ID: plan.ID, + Layer: layer, + Database: r.database, + Table: r.table, + From: plan.Paths, + To: filepath.Join(firstFileDir, fmt.Sprintf("%s.%d.parquet", uuid.New().String(), iteration+1)), + Iteration: iteration, + WriterID: writerId, + }, err +} + +func (r *RedisIndex) EndMerge(plan MergePlan) Promise[int32] { + err := (&redisTaskQueue[redisMergePlan]{ + prefix: "merge:", + database: r.database, + table: r.table, + suffix: strconv.Itoa(plan.Iteration), + writerId: plan.WriterID, + layer: plan.Layer, + getEntrySHA: r.getMergePlanSha, + redis: r.c, + }).finishProcess(redisMergePlan{ID: plan.ID}) + return Fulfilled(err, int32(0)) +} + +func (r *RedisIndex) GetMergePlanner() TableMergePlanner { + return r +} diff --git a/redis_move_planner.go b/redis_move_planner.go new file mode 100644 index 0000000..d1dba2f --- /dev/null +++ b/redis_move_planner.go @@ -0,0 +1,31 @@ +package metadata + +func (r *RedisIndex) 
GetMovePlanner() TableMovePlanner { + return r +} + +func (r *RedisIndex) GetMovePlan(writerId string, layer string) (MovePlan, error) { + return (&redisTaskQueue[MovePlan]{ + prefix: "move:", + database: r.database, + table: r.table, + suffix: "", + writerId: writerId, + layer: layer, + getEntrySHA: r.getMergePlanSha, + redis: r.c, + }).processEntry() +} + +func (r *RedisIndex) EndMove(plan MovePlan) Promise[int32] { + return Fulfilled((&redisTaskQueue[MovePlan]{ + prefix: "move:", + database: r.database, + table: r.table, + suffix: "", + writerId: plan.WriterID, + layer: plan.LayerFrom, + getEntrySHA: r.getMergePlanSha, + redis: r.c, + }).finishProcess(plan), int32(0)) +} diff --git a/redis_scripts/end_merge.lua b/redis_scripts/end_merge.lua index 0bfe916..262d82a 100644 --- a/redis_scripts/end_merge.lua +++ b/redis_scripts/end_merge.lua @@ -2,10 +2,12 @@ local database = KEYS[1] local table = KEYS[2] local index = KEYS[3] -local merge_id = KEYS[4] +local layer = KEYS[4] +local writer_id = KEYS[5] +local merge_id = KEYS[6] -- Construct the merge key -local merge_key = "merge:" .. database .. ":" .. table .. ":" .. index .. ":processing" +local merge_key = "merge:" .. database .. ":" .. table .. ":" .. index .. ":" .. layer .. ":" .. writer_id .. ":processing" -- Function to remove a merge plan by ID local function remove_merge_plan(key, id) diff --git a/redis_scripts/get_merge_plan.lua b/redis_scripts/get_merge_plan.lua index eb79cb8..33fc666 100644 --- a/redis_scripts/get_merge_plan.lua +++ b/redis_scripts/get_merge_plan.lua @@ -1,10 +1,15 @@ -- Construct the merge key from the provided KEYS -local database = KEYS[1] -local table = KEYS[2] -local index = KEYS[3] --- Timeout before we take an idle merge plan into process -local timeout_s = tonumber(KEYS[4]) -local merge_key_base = "merge:" .. database .. ":" .. table .. ":" .. 
index +local prefix = KEYS[1] +local database = KEYS[2] +local table = KEYS[3] +local suffix = KEYS[4] +local layer = KEYS[5] +local writer_id = KEYS[6] + +if suffix ~= "" then + suffix = suffix .. ":" +end +local merge_key_base = prefix .. ":" .. database .. ":" .. table .. ":" .. suffix .. layer .. ":" .. writer_id local merge_key_idle = merge_key_base .. ":idle" local merge_key_processing = merge_key_base .. ":processing" @@ -16,10 +21,10 @@ local function process_idle_item() local merge_item_json = redis.call("LPOP", merge_key_idle) if merge_item_json then local merge_item = cjson.decode(merge_item_json) - if tonumber(merge_item.time) + timeout_s < tonumber(current_time) then + if tonumber(merge_item.time) < current_time then return false end - merge_item.time = current_time + merge_item.time = current_time + 1800 -- 30m timeout to reprocess the dead items local updated_item_json = cjson.encode(merge_item) -- Push the updated item to the processing list @@ -38,7 +43,7 @@ local function process_processing_item() local processing_item = cjson.decode(processing_item_json) -- Check if the first item in processing is older than the timeout - if tonumber(current_time) - tonumber(processing_item.time) > timeout_s then + if current_time > tonumber(processing_item.time) then -- Remove the item from the processing list redis.call("LPOP", merge_key_processing) diff --git a/redis_scripts/patch_index.lua b/redis_scripts/patch_index.lua index afed5a4..2b97caa 100644 --- a/redis_scripts/patch_index.lua +++ b/redis_scripts/patch_index.lua @@ -1,3 +1,6 @@ +local merge_conf = cjson.decode(KEYS[1]) +local move_conf = cjson.decode(KEYS[2]) + -- Function to generate a pseudo-UUID local function generate_uuid() local random = math.random @@ -51,9 +54,52 @@ local function delete_file(entry) redis.call("DEL", "folders:" .. entry.database .. ":" .. entry.table) end + local drop_queue_key = "drop:" .. entry.database .. ":".. entry.table .. ":".. entry.layer .. ":".. 
entry.writer_id + local new_drop = cjson.encode({ + writer_id = entry.writer_id, + layer = entry.layer, + path = entry.path, + database = entry.database, + table = entry.table, + time_s = tonumber(redis.call("TIME")[1]) + 30 + }) + redis.call("LPUSH", drop_queue_key, new_drop) + + return {success = true} +end + +local function merge_entry(entry, index) + local merge_key = "merge:" .. entry.database .. ":" .. entry.table .. ":" .. index .. ":" .. entry.layer .. ":" .. entry.writer_id .. ":idle" + local last_merge = redis.call("LINDEX", merge_key, -1) + + if not last_merge then + -- Create and push a new merge object + create_and_push_new_merge(merge_key, entry.path, entry.size_bytes) + return {success = true} + end + + -- Parse JSON from the last merge entry + local last_merge_data = cjson.decode(last_merge) + + if last_merge_data.size + entry.size_bytes > tonumber(merge_conf[index][2]) then + -- Create and push a new merge object + create_and_push_new_merge(merge_key, entry.path, entry.size_bytes) + return {success = true} + end + + -- Update the last merge entry + last_merge_data.size = last_merge_data.size + entry.size_bytes + table.insert(last_merge_data.paths, entry.path) + local updated_merge = cjson.encode(last_merge_data) + redis.call("LSET", merge_key, -1, updated_merge) return {success = true} end +local function move_entry(entry) + local move_key = "move:".. entry.database.. ":".. entry.table.. ":".. entry.layer_from .. ":".. entry.writer_id + redis.call("LPUSH", move_key, cjson.encode(entry)) +end + -- Function to process a single file local function process_file(entry) if entry.cmd == "DELETE" then @@ -66,44 +112,27 @@ local function process_file(entry) return {success = false, error = "Invalid file path format: " .. entry.path} end - local index_num = tonumber(index) + local index_num = tonumber(index) local dir = get_dir(entry.path) redis.call("HINCRBY", "folders:" .. entry.database .. ":" .. 
entry.table, dir, 1) -- Create a Redis entry for the file - local main_key = hash_key(entry) + local main_key = hash_key(entry) redis.call("HSET", main_key, entry.path, cjson.encode(entry)) if index_num > #KEYS then return {success = true} end - -- Get the last value from the merge list - local merge_key = "merge:" .. entry.database .. ":" .. entry.table .. ":" .. index .. ":idle" - local last_merge = redis.call("LINDEX", merge_key, -1) + local chunk_time_s = tonumber(entry.str_chunk_time) / 1000000000 + local merge_time_s = chunk_time_s + tonumber(merge_conf[index_num][1]) + local move_time_s = chunk_time_s + move_conf[entry.layer].ttl_sec - if not last_merge then - -- Create and push a new merge object - create_and_push_new_merge(merge_key, entry.path, entry.size_bytes) - return {success = true} + if move_conf[entry.layer].ttl_sec == 0 or merge_time_s <= move_time_s then + return merge_entry(entry, index_num) end - - -- Parse JSON from the last merge entry - local last_merge_data = cjson.decode(last_merge) - - if last_merge_data.size + entry.size_bytes > tonumber(KEYS[index_num]) then - -- Create and push a new merge object - create_and_push_new_merge(merge_key, entry.path, entry.size_bytes) - return {success = true} - end - - -- Update the last merge entry - last_merge_data.size = last_merge_data.size + entry.size_bytes - table.insert(last_merge_data.paths, entry.path) - local updated_merge = cjson.encode(last_merge_data) - redis.call("LSET", merge_key, -1, updated_merge) - return {success = true} + return move_entry(entry) end -- Process all files diff --git a/redis_task_queue.go b/redis_task_queue.go new file mode 100644 index 0000000..a10c928 --- /dev/null +++ b/redis_task_queue.go @@ -0,0 +1,68 @@ +package metadata + +import ( + "context" + "encoding/json" + "fmt" + "github.com/redis/go-redis/v9" +) + +type redisTaskQueue[T Identified] struct { + prefix string + database string + table string + suffix string + writerId string + layer string + + getEntrySHA 
string + redis *redis.Client +} + +func (q *redisTaskQueue[T]) processEntry() (T, error) { + var res T + eStr, err := q.redis.EvalSha(context.Background(), q.getEntrySHA, []string{ + q.prefix, + q.database, + q.table, + q.suffix, + q.layer, + q.writerId, + }, nil).Result() + if err != nil { + return res, err + } + + err = json.Unmarshal([]byte(eStr.(string)), &res) + return res, err +} + +func (q *redisTaskQueue[T]) finishProcess(entry T) error { + suffix := q.suffix + if suffix != "" { + suffix += ":" + } + key := fmt.Sprintf("%s:%s:%s:%s%s:%s:processing", + q.prefix, q.database, q.table, q.suffix, q.layer, q.writerId) + strQ, err := q.redis.LRange(context.Background(), key, 0, -1).Result() + if err != nil { + return err + } + for _, e := range strQ { + var _e T + err = json.Unmarshal([]byte(e), &_e) + if err != nil { + continue + } + if _e.Id() == entry.Id() { + _, err := q.redis.LRem(context.Background(), key, 1, e).Result() + return err + } + } + return nil +} + +func (q *redisTaskQueue[T]) AddEntry(entry T) Promise[int32] { + //TODO: not implemented + return nil +} diff --git a/types.go b/types.go index a303d7d..19b9417 100644 --- a/types.go +++ b/types.go @@ -12,10 +12,10 @@ type MergeConfigurationsConf [][3]int64 var MergeConfigurations MergeConfigurationsConf type Layer struct { - URL string - Name string - Type string - TTL time.Duration + URL string `json:"url"` + Name string `json:"name"` + Type string `json:"type"` + TTLSec int32 `json:"ttl_sec"` } type IndexEntry struct { @@ -40,8 +40,13 @@ type QueryOptions struct { Iteration int } +type Identified interface { + Id() string +} + type MergePlan struct { ID string + WriterID string Layer string Database string Table string @@ -50,8 +55,13 @@ type MergePlan struct { Iteration int } +func (m MergePlan) Id() string { + return m.ID +} + type MovePlan struct { ID string + WriterID string Database string Table string PathFrom string @@ -60,6 +70,24 @@ type MovePlan struct { LayerTo string } +func (m MovePlan) 
Id() string { + return m.ID +} + +type DropPlan struct { + ID string + WriterID string + Layer string + Database string + Table string + Path string + TimeS int32 +} + +func (d DropPlan) Id() string { + return d.ID +} + type DBIndex interface { Databases() ([]string, error) Tables(database string) ([]string, error) @@ -71,21 +99,25 @@ type TableIndex interface { Get(layer string, path string) *IndexEntry Run() Stop() - RmFromDropQueue(layer string, files []string) Promise[int32] - GetDropQueue(layer string) []string GetMergePlanner() TableMergePlanner GetQuerier() TableQuerier GetMovePlanner() TableMovePlanner + GetDropPlanner() TableDropPlanner +} + +type TableDropPlanner interface { + GetDropQueue(writerId string, layer string) (DropPlan, error) + RmFromDropQueue(plan DropPlan) Promise[int32] } type TableMergePlanner interface { - GetMergePlan(layer string, iteration int) (*MergePlan, error) - EndMerge(plan *MergePlan) error + GetMergePlan(writerId string, layer string, iteration int) (MergePlan, error) + EndMerge(plan MergePlan) Promise[int32] } type TableMovePlanner interface { - GetMovePlan(layer string) *MovePlan - EndMove(plan *MovePlan) error + GetMovePlan(writerId string, layer string) (MovePlan, error) + EndMove(plan MovePlan) Promise[int32] } type TableQuerier interface { From 20189b75da671798aff373320942f200e83fb099 Mon Sep 17 00:00:00 2001 From: akvlad Date: Thu, 5 Jun 2025 23:15:30 +0300 Subject: [PATCH 3/6] debug & test json index --- .gitignore | 1 + json_index.go | 10 ++-- json_index_test.go | 88 ++++++++++++++++++++++++++++++++ json_part_index.go | 6 ++- redis_index_test.go | 61 ++++++++++++++++++---- redis_scripts/get_merge_plan.lua | 3 +- redis_scripts/patch_index.lua | 7 +-- redis_task_queue.go | 4 ++ 8 files changed, 161 insertions(+), 19 deletions(-) create mode 100644 json_index_test.go diff --git a/.gitignore b/.gitignore index 85e7c1d..637018e 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /.idea/ +_testdata \ No newline at end of 
file diff --git a/json_index.go b/json_index.go index d35513e..3593b8e 100644 --- a/json_index.go +++ b/json_index.go @@ -70,12 +70,12 @@ func (J *JSONIndex) Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32] { } var promises []Promise[int32] for l := range layers { - promises = append(promises, J.batchLayer(J.parts[l], addByLayer[l], rmByLayer[l])) + promises = append(promises, J.batchLayer(l, addByLayer[l], rmByLayer[l])) } return NewWaitForAll[int32](promises) } -func (J *JSONIndex) batchLayer(parts map[string]*jsonPartIndex, add []*IndexEntry, rm []*IndexEntry) Promise[int32] { +func (J *JSONIndex) batchLayer(layer string, add []*IndexEntry, rm []*IndexEntry) Promise[int32] { addByPath := make(map[string][]*IndexEntry) rmByPath := make(map[string][]*IndexEntry) paths := make(map[string]bool) @@ -92,9 +92,9 @@ func (J *JSONIndex) batchLayer(parts map[string]*jsonPartIndex, add []*IndexEntr var promises []Promise[int32] for partPath := range paths { - idx := parts[partPath] - if idx == nil { - return Fulfilled[int32](fmt.Errorf("part \"%s\" not found", partPath), 0) + idx, err := J.populate(partPath, layer) + if err != nil { + return Fulfilled[int32](err, 0) } promises = append(promises, idx.Batch(addByPath[partPath], rmByPath[partPath])) } diff --git a/json_index_test.go b/json_index_test.go new file mode 100644 index 0000000..53b4f9a --- /dev/null +++ b/json_index_test.go @@ -0,0 +1,88 @@ +package metadata + +import ( + "fmt" + "github.com/google/uuid" + "testing" + "time" +) + +func TestJSONSave(t *testing.T) { + MergeConfigurations = [][3]int64{ + {10, 10 * 1024 * 1024, 1}, + } + idx := NewJSONIndex( + "redis://localhost:6379/0", + "default", + "test", + layers) + var ents []*IndexEntry + now := time.Now() + threeDaysAgo := now.Add(-3 * 24 * time.Hour) + + for ts := threeDaysAgo; ts.Before(now); ts = ts.Add(15 * time.Second) { + ents = append(ents, &IndexEntry{ + Database: "default", + Table: "test", + MinTime: ts.UnixNano(), + MaxTime: ts.Add(15 * 
time.Second).UnixNano(), + Path: fmt.Sprintf("date=%s/hour=%02d/%s.1.parquet", + ts.UTC().Format("2006-01-02"), + ts.UTC().Hour(), + uuid.New().String()), + SizeBytes: 1000000, + ChunkTime: time.Now().UnixNano(), + Layer: "l1", + }) + } + + p := idx.Batch(ents, nil) + _, err := p.Get() + if err != nil { + panic(err) + } + fmt.Printf("Items saved: %d\n", len(ents)) +} + +func TestJSONSaveAndRM(t *testing.T) { + MergeConfigurations = [][3]int64{ + {10, 10 * 1024 * 1024, 1}, + } + idx := NewJSONIndex( + "redis://localhost:6379/0", + "default", + "test", + layers) + var ents []*IndexEntry + now := time.Now() + threeDaysAgo := now.Add(-3 * 24 * time.Hour) + + for ts := threeDaysAgo; ts.Before(now); ts = ts.Add(15 * time.Second) { + ents = append(ents, &IndexEntry{ + Database: "default", + Table: "test", + MinTime: ts.UnixNano(), + MaxTime: ts.Add(15 * time.Second).UnixNano(), + Path: fmt.Sprintf("date=%s/hour=%02d/%s.1.parquet", + ts.UTC().Format("2006-01-02"), + ts.UTC().Hour(), + uuid.New().String()), + SizeBytes: 1000000, + ChunkTime: time.Now().UnixNano(), + Layer: "l1", + }) + } + + p := idx.Batch(ents, nil) + _, err := p.Get() + if err != nil { + panic(err) + } + fmt.Printf("Items saved: %d\n", len(ents)) + + p = idx.Batch(nil, ents) + _, err = p.Get() + if err != nil { + panic(err) + } +} diff --git a/json_part_index.go b/json_part_index.go index 02ca9c0..3a5eff0 100644 --- a/json_part_index.go +++ b/json_part_index.go @@ -69,9 +69,13 @@ func newJsonPartIndex(opts jsonPartIdxOpts) (*jsonPartIndex, error) { filesInMerge: make(map[string]bool), layers: opts.layers, } + _, err := os.Stat(res.idxPath) + if os.IsNotExist(err) { + os.MkdirAll(res.idxPath, 0o755) + } res.updateCtx, res.doUpdate = context.WithCancel(context.Background()) res.workCtx, res.stop = context.WithCancel(context.Background()) - err := res.populate() + err = res.populate() return res, err } diff --git a/redis_index_test.go b/redis_index_test.go index 887fec6..890cf99 100644 --- 
a/redis_index_test.go +++ b/redis_index_test.go @@ -7,6 +7,10 @@ import ( "time" ) +var layers = []Layer{ + {"file://./_testdata", "l1", "fs", 20}, +} + func TestSave(t *testing.T) { MergeConfigurations = [][3]int64{ {10, 10 * 1024 * 1024, 1}, @@ -14,9 +18,8 @@ func TestSave(t *testing.T) { idx, err := NewRedisIndex( "redis://localhost:6379/0", "default", - "test", []Layer{ - {"file:///data", "l1", "fs", 20}, - }) + "test", + layers) if err != nil { t.Fatalf("Failed to create index: %v", err) } @@ -48,11 +51,15 @@ func TestSave(t *testing.T) { fmt.Printf("Items saved: %d\n", len(ents)) } -/*func TestSaveAndDel(t *testing.T) { +func TestSaveAndDel(t *testing.T) { MergeConfigurations = [][3]int64{ {10, 10 * 1024 * 1024, 1}, } - idx, err := NewRedisIndex("redis://localhost:6379/0", "default", "test") + idx, err := NewRedisIndex( + "redis://localhost:6379/0", + "default", + "test", + layers) if err != nil { t.Fatalf("Failed to create index: %v", err) } @@ -72,30 +79,66 @@ func TestSave(t *testing.T) { MaxTime: ts.Add(15 * time.Second).UnixNano(), Path: pth, SizeBytes: 1000000, + ChunkTime: time.Now().UnixNano(), + Layer: "l1", }) } - + start := time.Now() p := idx.Batch(ents, nil) _, err = p.Get() if err != nil { panic(err) } + fmt.Printf("%d items saved in %v\n", len(ents), time.Since(start)) time.Sleep(time.Second) + start = time.Now() p = idx.Batch(nil, ents) _, err = p.Get() if err != nil { panic(err) } - fmt.Printf("Items saved: %d\n", len(ents)) + fmt.Printf("%d items dropped in %v\n", len(ents), time.Since(start)) + + time.Sleep(time.Second) + + start = time.Now() + var drops []DropPlan + d, err := idx.GetDropPlanner().GetDropQueue("", "l1") + if err != nil { + t.Fatalf("Failed to get drop queue: %v", err) + return + } + for d.Path != "" { + d, err = idx.GetDropPlanner().GetDropQueue("", "l1") + if err != nil { + t.Fatalf("Failed to get drop queue: %v", err) + return + } + drops = append(drops, d) + } + fmt.Printf("Acquired drop of %d items in %v\n", len(drops), 
time.Since(start)) + start = time.Now() + for _, d := range drops[:200] { + _, err = idx.GetDropPlanner().RmFromDropQueue(d).Get() + if err != nil { + t.Fatalf("Failed to remove from drop queue: %v", err) + return + } + } + fmt.Printf("Processed 200 drop items in %v\n", time.Since(start)) } func TestRedisIndex2(t *testing.T) { MergeConfigurations = [][3]int64{ {10, 10 * 1024 * 1024, 1}, } - idx, err := NewRedisIndex("redis://localhost:6379/0", "default", "test") + idx, err := NewRedisIndex( + "redis://localhost:6379/0", + "default", + "test", + layers) if err != nil { t.Fatalf("Failed to create index: %v", err) } @@ -122,4 +165,4 @@ func TestRedisDBIndex(t *testing.T) { paths, err := idx.Paths("default", "test") fmt.Println("Paths:", paths) -}*/ +} diff --git a/redis_scripts/get_merge_plan.lua b/redis_scripts/get_merge_plan.lua index 33fc666..ca35711 100644 --- a/redis_scripts/get_merge_plan.lua +++ b/redis_scripts/get_merge_plan.lua @@ -21,7 +21,8 @@ local function process_idle_item() local merge_item_json = redis.call("LPOP", merge_key_idle) if merge_item_json then local merge_item = cjson.decode(merge_item_json) - if tonumber(merge_item.time) < current_time then + if tonumber(merge_item.time_s) > current_time then + redis.call("LPUSH", merge_key_idle, merge_item_json) return false end merge_item.time = current_time + 1800 -- 30m timeout to reprocess the dead items diff --git a/redis_scripts/patch_index.lua b/redis_scripts/patch_index.lua index 2b97caa..674118c 100644 --- a/redis_scripts/patch_index.lua +++ b/redis_scripts/patch_index.lua @@ -21,7 +21,7 @@ local function create_and_push_new_merge(merge_key, path, size) local current_time = redis.call("TIME")[1] local new_merge = cjson.encode({ id = generate_uuid(), - time = current_time, + time_s = current_time, paths = {path}, size = size }) @@ -54,14 +54,15 @@ local function delete_file(entry) redis.call("DEL", "folders:" .. entry.database .. ":" .. entry.table) end - local drop_queue_key = "drop:" .. 
entry.database .. ":".. entry.table .. ":".. entry.layer .. ":".. entry.writer_id + local drop_queue_key = "drop:" .. entry.database .. ":".. entry.table .. ":".. entry.layer .. ":".. + entry.writer_id .. ":idle" local new_drop = cjson.encode({ writer_id = entry.writer_id, layer = entry.layer, path = entry.path, database = entry.database, table = entry.table, - time_s = tonumber(redis.call("TIME")[1]) + 30 + time_s = tonumber(redis.call("TIME")[1]) + 1 }) redis.call("LPUSH", drop_queue_key, new_drop) diff --git a/redis_task_queue.go b/redis_task_queue.go index a10c928..0d25dfc 100644 --- a/redis_task_queue.go +++ b/redis_task_queue.go @@ -32,8 +32,12 @@ func (q *redisTaskQueue[T]) processEntry() (T, error) { if err != nil { return res, err } + if eStr == "" { + return res, nil + } err = json.Unmarshal([]byte(eStr.(string)), &res) + return res, err } From 2bea9447a27ef21b4c1df868f046ad10002fd86e Mon Sep 17 00:00:00 2001 From: akvlad Date: Thu, 5 Jun 2025 23:18:58 +0300 Subject: [PATCH 4/6] debug & test json index --- json_part_index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/json_part_index.go b/json_part_index.go index 3a5eff0..2e5997f 100644 --- a/json_part_index.go +++ b/json_part_index.go @@ -398,7 +398,7 @@ func (J *jsonPartIndex) flush() { onErr(err) return } - stream.WriteString(string(strD)) + stream.WriteRaw(string(strD)) } stream.WriteArrayEnd() From ea3e66e16de888db3d270d039e0fd30c8e187e32 Mon Sep 17 00:00:00 2001 From: akvlad Date: Tue, 10 Jun 2025 18:18:06 +0300 Subject: [PATCH 5/6] layers support debug --- json_db_index.go | 101 +++++++++++++++++++------------ json_index.go | 48 ++++++++++++--- json_part_merge_planner.go | 7 +++ json_part_move_planner.go | 4 +- redis_index.go | 24 +++++++- redis_merge_planner.go | 58 ++++++++++++++---- redis_move_planner.go | 4 +- redis_scripts/get_merge_plan.lua | 31 +++++----- redis_scripts/patch_index.lua | 69 +++++++++++++++++---- redis_task_queue.go | 4 +- types.go | 32 +++++++--- 11 
files changed, 275 insertions(+), 107 deletions(-) diff --git a/json_db_index.go b/json_db_index.go index 0651f64..b39766b 100644 --- a/json_db_index.go +++ b/json_db_index.go @@ -9,58 +9,83 @@ import ( ) type jsonDBIndex struct { - root string + layers []jsonLayer } -/*func NewJSONDBIndex(root string) DBIndex { +func NewJSONDBIndex(layers []Layer) DBIndex { + jLayers := make([]jsonLayer, len(layers)) + for i, layer := range layers { + jLayers[i] = layer2JsonLayer(layer) + } return &jsonDBIndex{ - root: root, + layers: jLayers, } -}*/ +} func (j *jsonDBIndex) Databases() ([]string, error) { - ents, err := os.ReadDir(j.root) - if err != nil { - return nil, err - } - var res []string - for _, ent := range ents { - if ent.IsDir() { - res = append(res, ent.Name()) + res := map[string]bool{} + for _, l := range j.layers { + ents, err := os.ReadDir(path.Join(l.Path)) + if errors.Is(err, os.ErrNotExist) { + continue + } + if err != nil { + return nil, err } + for _, ent := range ents { + if ent.IsDir() { + res[ent.Name()] = true + } + } + } + _res := make([]string, 0, len(res)) + for k := range res { + _res = append(_res, k) } - return res, nil + return _res, nil } func (j *jsonDBIndex) Tables(database string) ([]string, error) { - ents, err := os.ReadDir(path.Join(j.root, database)) - if errors.Is(err, os.ErrNotExist) { - return nil, nil - } - if err != nil { - return nil, err - } - var res []string - for _, ent := range ents { - if ent.IsDir() { - res = append(res, ent.Name()) + res := map[string]bool{} + for _, l := range j.layers { + ents, err := os.ReadDir(path.Join(l.Path, database)) + if errors.Is(err, os.ErrNotExist) { + continue + } + if err != nil { + return nil, err + } + for _, ent := range ents { + if ent.IsDir() { + res[ent.Name()] = true + } } } - return res, nil + _res := make([]string, 0, len(res)) + for k := range res { + _res = append(_res, k) + } + return _res, nil } -func (j *jsonDBIndex) Paths(database string, table string) []string { - root := 
path.Join(j.root, database, table) - var res []string - filepath.Walk(path.Join(j.root, database, table), func(path string, info os.FileInfo, err error) error { - if !info.IsDir() { +func (j *jsonDBIndex) Paths(database string, table string) ([]string, error) { + res := map[string]bool{} + for _, l := range j.layers { + root := path.Join(l.Path, database, table) + filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + if !info.IsDir() { + return nil + } + if strings.HasPrefix(info.Name(), "hour=") { + res[path[len(root)+1:]] = true + return filepath.SkipDir + } return nil - } - if strings.HasPrefix(info.Name(), "hour=") { - res = append(res, path[len(root):]) - return filepath.SkipDir - } - return nil - }) - return res + }) + } + _res := make([]string, 0, len(res)) + for k := range res { + _res = append(_res, k) + } + return _res, nil } diff --git a/json_index.go b/json_index.go index 3593b8e..e4632ff 100644 --- a/json_index.go +++ b/json_index.go @@ -2,6 +2,7 @@ package metadata import ( "fmt" + "io/fs" "os" "path" "path/filepath" @@ -36,18 +37,42 @@ type JSONIndex struct { layers []jsonLayer } -func NewJSONIndex(root string, database string, table string, layers []Layer) TableIndex { +func NewJSONIndex(root string, database string, table string, layers []Layer) (TableIndex, error) { var jLayers []jsonLayer for _, layer := range layers { jLayers = append(jLayers, layer2JsonLayer(layer)) } - return &JSONIndex{ + res := &JSONIndex{ root: root, database: database, table: table, parts: map[string]map[string]*jsonPartIndex{}, layers: jLayers, } + for _, layer := range jLayers { + prefix := filepath.Join(layer.Path, database, table, "data") + err := filepath.Walk(prefix, func(path string, info fs.FileInfo, err error) error { + if info == nil { + return nil + } + if !info.IsDir() { + return nil + } + metadataPath := filepath.Join(path, "metadata.json") + if _, err := os.Stat(metadataPath); !os.IsNotExist(err) { + _, err := res.populate(layer.Name, 
path[len(prefix)+1:]) + if err != nil { + return err + } + return filepath.SkipDir + } + return nil + }) + if err != nil { + return nil, err + } + } + return res, nil } func (J *JSONIndex) GetQuerier() TableQuerier { @@ -92,7 +117,7 @@ func (J *JSONIndex) batchLayer(layer string, add []*IndexEntry, rm []*IndexEntry var promises []Promise[int32] for partPath := range paths { - idx, err := J.populate(partPath, layer) + idx, err := J.populate(layer, partPath) if err != nil { return Fulfilled[int32](err, 0) } @@ -101,7 +126,7 @@ func (J *JSONIndex) batchLayer(layer string, add []*IndexEntry, rm []*IndexEntry return NewWaitForAll[int32](promises) } -func (J *JSONIndex) populate(dir string, layer string) (*jsonPartIndex, error) { +func (J *JSONIndex) populate(layer string, dir string) (*jsonPartIndex, error) { layerParts := J.parts[layer] if layerParts == nil { layerParts = make(map[string]*jsonPartIndex) @@ -163,9 +188,12 @@ func (J *JSONIndex) Stop() { } } -func (J *JSONIndex) findHours(options QueryOptions) ([]time.Time, error) { +func (J *JSONIndex) findHours(options QueryOptions, layer jsonLayer) ([]time.Time, error) { var hours []time.Time - err := filepath.Walk(path.Join(J.root, J.database, J.table), func(path string, info os.FileInfo, err error) error { + err := filepath.Walk(path.Join(layer.Path, J.database, J.table, "data"), func(path string, info os.FileInfo, err error) error { + if info == nil { + return nil + } if !info.IsDir() { return nil } @@ -216,15 +244,15 @@ func (J *JSONIndex) findHours(options QueryOptions) ([]time.Time, error) { } func (J *JSONIndex) Query(options QueryOptions) ([]*IndexEntry, error) { - hours, err := J.findHours(options) - if err != nil { - return nil, err - } var entries []*IndexEntry for _, l := range J.layers { if l.Path == "" { continue } + hours, err := J.findHours(options, l) + if err != nil { + return nil, err + } for _, hour := range hours { idx, err := J.populate(l.Name, path.Join( fmt.Sprintf("date=%s", 
hour.Format("2006-01-02")), diff --git a/json_part_merge_planner.go b/json_part_merge_planner.go index a2105d5..c088441 100644 --- a/json_part_merge_planner.go +++ b/json_part_merge_planner.go @@ -5,6 +5,7 @@ import ( "github.com/google/uuid" "path" "strings" + "time" ) func (J *jsonPartIndex) GetMergePlan(writerId string, layer string, iteration int) (MergePlan, error) { @@ -15,6 +16,7 @@ func (J *jsonPartIndex) GetMergePlan(writerId string, layer string, iteration in return MergePlan{}, fmt.Errorf("no more merge configurations available for iteration %d", iteration) } conf := MergeConfigurations[iteration-1] + now := time.Now() J.m.Lock() defer J.m.Unlock() J.entries.Range(func(key, value interface{}) bool { @@ -25,9 +27,13 @@ func (J *jsonPartIndex) GetMergePlan(writerId string, layer string, iteration in if J.filesInMerge[entry.Path] { return true } + if entry.ChunkTime+conf.TimeoutSec()*1000000000 >= now.UnixNano() { + return true + } if size > conf[1] { return false } + from = append(from, entry.Path) size += entry.SizeBytes return true @@ -40,6 +46,7 @@ func (J *jsonPartIndex) GetMergePlan(writerId string, layer string, iteration in tablePath := path.Join(J.rootPath, J.database, J.table, "data") + "/" partPath := J.idxPath[len(tablePath):] return MergePlan{ + Layer: layer, Database: J.database, Table: J.table, From: from, diff --git a/json_part_move_planner.go b/json_part_move_planner.go index 85c9797..f5e7912 100644 --- a/json_part_move_planner.go +++ b/json_part_move_planner.go @@ -5,7 +5,6 @@ import ( ) func (J *jsonPartIndex) GetMovePlan(writerId string, layer string) (MovePlan, error) { - J.m.Lock() defer J.m.Unlock() var plan *MovePlan @@ -37,6 +36,9 @@ func (J *jsonPartIndex) GetMovePlan(writerId string, layer string) (MovePlan, er } return true }) + if plan == nil { + return MovePlan{}, nil + } return *plan, nil } diff --git a/redis_index.go b/redis_index.go index 5be3e1d..f795638 100644 --- a/redis_index.go +++ b/redis_index.go @@ -15,6 +15,11 @@ 
import ( "time" ) +type redisLayer struct { + Layer + LayerTo string `json:"layer_to"` +} + type redisIndexEntry struct { IndexEntry StrMinTime string `json:"str_min_time"` @@ -58,7 +63,7 @@ type RedisIndex struct { database string table string - layers []Layer + layers []redisLayer } func getRedisClient(u *url.URL) (*redis.Client, error) { @@ -97,11 +102,24 @@ func NewRedisIndex(URL string, database string, table string, layers []Layer) (T if err != nil { return nil, err } + + redisLayers := make([]redisLayer, 0, len(layers)) + for i, layer := range layers { + layerTo := "" + if i < len(layers)-1 { + layerTo = layers[i+1].Name + } + redisLayers = append(redisLayers, redisLayer{ + Layer: layer, + LayerTo: layerTo, + }) + } + idx := &RedisIndex{ url: u, database: database, table: table, - layers: layers, + layers: redisLayers, } client, err := getRedisClient(u) @@ -161,7 +179,7 @@ func (r *RedisIndex) Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32] { res.Done(0, err) return res } - lmap := make(map[string]Layer) + lmap := make(map[string]redisLayer) for _, layer := range r.layers { lmap[layer.Name] = layer } diff --git a/redis_merge_planner.go b/redis_merge_planner.go index 48e889b..913e4e4 100644 --- a/redis_merge_planner.go +++ b/redis_merge_planner.go @@ -1,10 +1,13 @@ package metadata import ( + "context" "fmt" "github.com/google/uuid" "path/filepath" + "slices" "strconv" + "strings" ) type redisMergePlan struct { @@ -18,16 +21,44 @@ func (r redisMergePlan) Id() string { } func (r *RedisIndex) GetMergePlan(writerId string, layer string, iteration int) (MergePlan, error) { - plan, err := (&redisTaskQueue[redisMergePlan]{ - prefix: "merge:", - database: r.database, - table: r.table, - suffix: strconv.Itoa(iteration), - writerId: writerId, - layer: layer, - getEntrySHA: r.getMergePlanSha, - redis: r.c, - }).processEntry() + mergePattern := fmt.Sprintf("merge:%s:%s:%d:*:%s:%s:*", r.database, r.table, iteration, layer, writerId) + var keys []string + err :=
redisScan(func(cursor uint64) (uint64, error) { + _keys, cursor, err := r.c.Scan(context.Background(), cursor, mergePattern, 1000).Result() + if err != nil { + return 0, err + } + keys = append(keys, _keys...) + return cursor, nil + }) + if err != nil { + return MergePlan{}, err + } + slices.Sort(keys) + var plan redisMergePlan + for _, k := range keys { + parts := strings.SplitN(k, ":", 6) + if len(parts) < 6 { + continue + } + dir := parts[4] + plan, err = (&redisTaskQueue[redisMergePlan]{ + prefix: "merge", + database: r.database, + table: r.table, + suffix: strconv.Itoa(iteration) + ":" + dir, + writerId: writerId, + layer: layer, + getEntrySHA: r.getMergePlanSha, + redis: r.c, + }).processEntry() + if err != nil { + return MergePlan{}, err + } + if len(plan.Paths) > 0 { + break + } + } if len(plan.Paths) == 0 { return MergePlan{}, nil @@ -49,16 +80,19 @@ func (r *RedisIndex) GetMergePlan(writerId string, layer string, iteration int) } func (r *RedisIndex) EndMerge(plan MergePlan) Promise[int32] { + fmt.Println("removing merge plan from Redis: ", plan.ID) + dir := filepath.Dir(plan.To) err := (&redisTaskQueue[redisMergePlan]{ - prefix: "merge:", + prefix: "merge", database: r.database, table: r.table, - suffix: strconv.Itoa(plan.Iteration), + suffix: strconv.Itoa(plan.Iteration) + ":" + dir, writerId: plan.WriterID, layer: plan.Layer, getEntrySHA: r.getMergePlanSha, redis: r.c, }).finishProcess(redisMergePlan{ID: plan.ID}) + fmt.Println("removing merge plan from Redis ok") return Fulfilled(err, int32(0)) } diff --git a/redis_move_planner.go b/redis_move_planner.go index d1dba2f..45a6d9f 100644 --- a/redis_move_planner.go +++ b/redis_move_planner.go @@ -6,7 +6,7 @@ func (r *RedisIndex) GetMovePlanner() TableMovePlanner { func (r *RedisIndex) GetMovePlan(writerId string, layer string) (MovePlan, error) { return (&redisTaskQueue[MovePlan]{ - prefix: "move:", + prefix: "move", database: r.database, table: r.table, suffix: "", @@ -19,7 +19,7 @@ func (r *RedisIndex) 
GetMovePlan(writerId string, layer string) (MovePlan, error func (r *RedisIndex) EndMove(plan MovePlan) Promise[int32] { return Fulfilled((&redisTaskQueue[MovePlan]{ - prefix: "move:", + prefix: "move", database: r.database, table: r.table, suffix: "", diff --git a/redis_scripts/get_merge_plan.lua b/redis_scripts/get_merge_plan.lua index ca35711..6b06d3c 100644 --- a/redis_scripts/get_merge_plan.lua +++ b/redis_scripts/get_merge_plan.lua @@ -21,11 +21,11 @@ local function process_idle_item() local merge_item_json = redis.call("LPOP", merge_key_idle) if merge_item_json then local merge_item = cjson.decode(merge_item_json) - if tonumber(merge_item.time_s) > current_time then + if merge_item.time_s > current_time then redis.call("LPUSH", merge_key_idle, merge_item_json) return false end - merge_item.time = current_time + 1800 -- 30m timeout to reprocess the dead items + merge_item.time_s = current_time + 1800 -- 30m timeout to reprocess the dead items local updated_item_json = cjson.encode(merge_item) -- Push the updated item to the processing list @@ -38,23 +38,20 @@ end -- Function to process a processing item local function process_processing_item() - local processing_item_json = redis.call("LINDEX", merge_key_processing, 0) - - if processing_item_json then - local processing_item = cjson.decode(processing_item_json) - - -- Check if the first item in processing is older than the timeout - if current_time > tonumber(processing_item.time) then - -- Remove the item from the processing list - redis.call("LPOP", merge_key_processing) + local merge_item_json = redis.call("LPOP", merge_key_processing) + if merge_item_json then + local merge_item = cjson.decode(merge_item_json) + if merge_item.time_s > current_time then + redis.call("LPUSH", merge_key_processing, merge_item_json) + return false + end + merge_item.time_s = current_time + 1800 -- 30m timeout to reprocess the dead items + local updated_item_json = cjson.encode(merge_item) - -- Update the time and re-add to 
processing - processing_item.time = current_time - local updated_item_json = cjson.encode(processing_item) - redis.call("RPUSH", merge_key_processing, updated_item_json) + -- Push the updated item to the processing list + redis.call("RPUSH", merge_key_processing, updated_item_json) - return updated_item_json - end + return updated_item_json end return false end diff --git a/redis_scripts/patch_index.lua b/redis_scripts/patch_index.lua index 674118c..ea81f12 100644 --- a/redis_scripts/patch_index.lua +++ b/redis_scripts/patch_index.lua @@ -1,12 +1,14 @@ local merge_conf = cjson.decode(KEYS[1]) local move_conf = cjson.decode(KEYS[2]) +math.randomseed(tonumber(redis.call('TIME')[1]) * 1000 + + tonumber(redis.call('TIME')[2]) / 1000) -- Seed the random number generator with the current time + -- Function to generate a pseudo-UUID local function generate_uuid() - local random = math.random local template ='xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx' return string.gsub(template, '[xy]', function (c) - local v = (c == 'x') and random(0, 0xf) or random(8, 0xb) + local v = (c == 'x') and math.random(0, 0xf) or math.random(8, 0xb) return string.format('%x', v) end) end @@ -17,11 +19,12 @@ local function get_dir(path) end -- Function to create and push a new merge object -local function create_and_push_new_merge(merge_key, path, size) - local current_time = redis.call("TIME")[1] +local function create_and_push_new_merge(merge_key, path, size, index) + local current_time = tonumber(redis.call("TIME")[1]) + local merge_ttl_s = merge_conf[index][1] local new_merge = cjson.encode({ id = generate_uuid(), - time_s = current_time, + time_s = current_time + merge_ttl_s, paths = {path}, size = size }) @@ -57,25 +60,27 @@ local function delete_file(entry) local drop_queue_key = "drop:" .. entry.database .. ":".. entry.table .. ":".. entry.layer .. ":".. entry.writer_id .. 
":idle" local new_drop = cjson.encode({ + id = generate_uuid(), writer_id = entry.writer_id, layer = entry.layer, path = entry.path, database = entry.database, table = entry.table, - time_s = tonumber(redis.call("TIME")[1]) + 1 + time_s = tonumber(redis.call("TIME")[1]) + 30 }) - redis.call("LPUSH", drop_queue_key, new_drop) + redis.call("RPUSH", drop_queue_key, new_drop) return {success = true} end local function merge_entry(entry, index) - local merge_key = "merge:" .. entry.database .. ":" .. entry.table .. ":" .. index .. ":" .. entry.layer .. ":" .. entry.writer_id .. ":idle" + local dir = get_dir(entry.path) + local merge_key = "merge:" .. entry.database .. ":" .. entry.table .. ":" .. index .. ":" .. dir .. ":" .. entry.layer .. ":" .. entry.writer_id .. ":idle" local last_merge = redis.call("LINDEX", merge_key, -1) if not last_merge then -- Create and push a new merge object - create_and_push_new_merge(merge_key, entry.path, entry.size_bytes) + create_and_push_new_merge(merge_key, entry.path, entry.size_bytes, index) return {success = true} end @@ -84,7 +89,7 @@ local function merge_entry(entry, index) if last_merge_data.size + entry.size_bytes > tonumber(merge_conf[index][2]) then -- Create and push a new merge object - create_and_push_new_merge(merge_key, entry.path, entry.size_bytes) + create_and_push_new_merge(merge_key, entry.path, entry.size_bytes, index) return {success = true} end @@ -97,8 +102,29 @@ local function merge_entry(entry, index) end local function move_entry(entry) - local move_key = "move:".. entry.database.. ":".. entry.table.. ":".. entry.layer_from .. ":".. entry.writer_id - redis.call("LPUSH", move_key, cjson.encode(entry)) + local move_key = "move:".. entry.database.. ":".. entry.table.. ":".. entry.layer .. ":".. entry.writer_id .. 
":idle" + + -- Extract parts of the path + local folder, uuid, iteration = string.match(entry.path, "(.+)/([^/.]+)%.(%d+)%.parquet$") + + -- Generate a new UUID for the destination path + local new_uuid = generate_uuid() + + -- Construct the new path + local path_to = folder .. "/" .. new_uuid .. "." .. iteration .. ".parquet" + local move_entry = { + id = generate_uuid(), + writer_id = entry.writer_id, + database = entry.database, + table = entry.table, + path_from = entry.path, + layer_from = entry.layer, + path_to = path_to, + layer_to = move_conf[entry.layer].layer_to, + time_s = entry.time_s + } + redis.call("RPUSH", move_key, cjson.encode(move_entry)) + return {success = true} end -- Function to process a single file @@ -122,7 +148,22 @@ local function process_file(entry) local main_key = hash_key(entry) redis.call("HSET", main_key, entry.path, cjson.encode(entry)) - if index_num > #KEYS then + local merge_ttl = -1 + local move_ttl = -1 + if index_num <= #merge_conf then + merge_ttl = merge_conf[index_num][1] + end + if move_conf[entry.layer].ttl_sec > 0 then + move_ttl = move_conf[entry.layer].ttl_sec + end + + if merge_ttl ~= -1 and move_ttl == -1 then + return merge_entry(entry, index_num) + end + if move_ttl ~= -1 and merge_ttl == -1 then + return move_entry(entry) + end + if merge_ttl == -1 and move_ttl == -1 then return {success = true} end @@ -131,8 +172,10 @@ local function process_file(entry) local move_time_s = chunk_time_s + move_conf[entry.layer].ttl_sec if move_conf[entry.layer].ttl_sec == 0 or merge_time_s <= move_time_s then + entry.time_s = merge_time_s return merge_entry(entry, index_num) end + entry.time_s = move_time_s return move_entry(entry) end diff --git a/redis_task_queue.go b/redis_task_queue.go index 0d25dfc..ffb90f2 100644 --- a/redis_task_queue.go +++ b/redis_task_queue.go @@ -47,7 +47,7 @@ func (q *redisTaskQueue[T]) finishProcess(entry T) error { suffix += ":" } key := fmt.Sprintf("%s:%s:%s:%s%s:%s:processing", - q.prefix, 
q.database, q.table, q.suffix, q.layer, q.writerId) + q.prefix, q.database, q.table, suffix, q.layer, q.writerId) strQ, err := q.redis.LRange(context.Background(), key, 0, -1).Result() if err != nil { return err @@ -56,10 +56,12 @@ func (q *redisTaskQueue[T]) finishProcess(entry T) error { var _e T err = json.Unmarshal([]byte(e), &_e) if err != nil { + fmt.Println("Error unmarshalling entry:", err) continue } if _e.Id() == entry.Id() { _, err := q.redis.LRem(context.Background(), key, 1, e).Result() + fmt.Println("Removed entry from processing queue:", _e.Id()) return err } } diff --git a/types.go b/types.go index 19b9417..093756e 100644 --- a/types.go +++ b/types.go @@ -7,9 +7,21 @@ import ( // MergeConfiguration is array of arrays of: // [[timeout_sec, max_size, merge_iteration_id], ...] // You have to init MergeConfigurations in the very beginning -type MergeConfigurationsConf [][3]int64 +type MergeConfigurationsConf [3]int64 -var MergeConfigurations MergeConfigurationsConf +func (m MergeConfigurationsConf) TimeoutSec() int64 { + return m[0] +} + +func (m MergeConfigurationsConf) MaxSize() int64 { + return m[1] +} + +func (m MergeConfigurationsConf) MergeIterationId() int64 { + return m[2] +} + +var MergeConfigurations []MergeConfigurationsConf type Layer struct { URL string `json:"url"` @@ -60,14 +72,14 @@ func (m MergePlan) Id() string { } type MovePlan struct { - ID string - WriterID string - Database string - Table string - PathFrom string - LayerFrom string - PathTo string - LayerTo string + ID string `json:"id"` + WriterID string `json:"writer_id"` + Database string `json:"database"` + Table string `json:"table"` + PathFrom string `json:"path_from"` + LayerFrom string `json:"layer_from"` + PathTo string `json:"path_to"` + LayerTo string `json:"layer_to"` } func (m MovePlan) Id() string { From 9c0c2da7db68bfa3fd4d5999979d2a9e9cf8347a Mon Sep 17 00:00:00 2001 From: akvlad Date: Tue, 10 Jun 2025 19:00:48 +0300 Subject: [PATCH 6/6] debug tests --- 
json_index_test.go | 22 ++++++++++++++-------- redis_index_test.go | 10 +++++----- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/json_index_test.go b/json_index_test.go index 53b4f9a..1dc7822 100644 --- a/json_index_test.go +++ b/json_index_test.go @@ -8,14 +8,17 @@ import ( ) func TestJSONSave(t *testing.T) { - MergeConfigurations = [][3]int64{ + MergeConfigurations = []MergeConfigurationsConf{ {10, 10 * 1024 * 1024, 1}, } - idx := NewJSONIndex( - "redis://localhost:6379/0", + idx, err := NewJSONIndex( + "_testdata", "default", "test", layers) + if err != nil { + panic(err) + } var ents []*IndexEntry now := time.Now() threeDaysAgo := now.Add(-3 * 24 * time.Hour) @@ -37,7 +40,7 @@ func TestJSONSave(t *testing.T) { } p := idx.Batch(ents, nil) - _, err := p.Get() + _, err = p.Get() if err != nil { panic(err) } @@ -45,14 +48,17 @@ func TestJSONSave(t *testing.T) { } func TestJSONSaveAndRM(t *testing.T) { - MergeConfigurations = [][3]int64{ + MergeConfigurations = []MergeConfigurationsConf{ {10, 10 * 1024 * 1024, 1}, } - idx := NewJSONIndex( - "redis://localhost:6379/0", + idx, err := NewJSONIndex( + "_testdata", "default", "test", layers) + if err != nil { + panic(err) + } var ents []*IndexEntry now := time.Now() threeDaysAgo := now.Add(-3 * 24 * time.Hour) @@ -74,7 +80,7 @@ func TestJSONSaveAndRM(t *testing.T) { } p := idx.Batch(ents, nil) - _, err := p.Get() + _, err = p.Get() if err != nil { panic(err) } diff --git a/redis_index_test.go b/redis_index_test.go index 890cf99..828d51b 100644 --- a/redis_index_test.go +++ b/redis_index_test.go @@ -12,7 +12,7 @@ var layers = []Layer{ } func TestSave(t *testing.T) { - MergeConfigurations = [][3]int64{ + MergeConfigurations = []MergeConfigurationsConf{ {10, 10 * 1024 * 1024, 1}, } idx, err := NewRedisIndex( @@ -52,7 +52,7 @@ func TestSave(t *testing.T) { } func TestSaveAndDel(t *testing.T) { - MergeConfigurations = [][3]int64{ + MergeConfigurations = []MergeConfigurationsConf{ {10, 10 * 1024 * 1024, 1}, } 
idx, err := NewRedisIndex( @@ -101,7 +101,7 @@ func TestSaveAndDel(t *testing.T) { } fmt.Printf("%d items dropped in %v\n", len(ents), time.Since(start)) - time.Sleep(time.Second) + time.Sleep(time.Second * 30) start = time.Now() var drops []DropPlan @@ -131,7 +131,7 @@ func TestSaveAndDel(t *testing.T) { } func TestRedisIndex2(t *testing.T) { - MergeConfigurations = [][3]int64{ + MergeConfigurations = []MergeConfigurationsConf{ {10, 10 * 1024 * 1024, 1}, } idx, err := NewRedisIndex( @@ -150,7 +150,7 @@ func TestRedisIndex2(t *testing.T) { } func TestRedisDBIndex(t *testing.T) { - MergeConfigurations = [][3]int64{ + MergeConfigurations = []MergeConfigurationsConf{ {10, 10 * 1024 * 1024, 1}, } idx, err := NewRedisDbIndex("redis://localhost:6379/0")