diff --git a/.gitignore b/.gitignore
index 85e7c1d..637018e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
 /.idea/
+_testdata
\ No newline at end of file
diff --git a/json_db_index.go b/json_db_index.go
index 0651f64..b39766b 100644
--- a/json_db_index.go
+++ b/json_db_index.go
@@ -9,58 +9,87 @@ import (
 )
 
 type jsonDBIndex struct {
-	root string
+	layers []jsonLayer
 }
 
-/*func NewJSONDBIndex(root string) DBIndex {
+func NewJSONDBIndex(layers []Layer) DBIndex {
+	jLayers := make([]jsonLayer, len(layers))
+	for i, layer := range layers {
+		jLayers[i] = layer2JsonLayer(layer)
+	}
 	return &jsonDBIndex{
-		root: root,
+		layers: jLayers,
 	}
-}*/
+}
 
 func (j *jsonDBIndex) Databases() ([]string, error) {
-	ents, err := os.ReadDir(j.root)
-	if err != nil {
-		return nil, err
-	}
-	var res []string
-	for _, ent := range ents {
-		if ent.IsDir() {
-			res = append(res, ent.Name())
+	res := map[string]bool{}
+	for _, l := range j.layers {
+		ents, err := os.ReadDir(l.Path)
+		if errors.Is(err, os.ErrNotExist) {
+			continue
+		}
+		if err != nil {
+			return nil, err
 		}
+		for _, ent := range ents {
+			if ent.IsDir() {
+				res[ent.Name()] = true
+			}
+		}
+	}
+	_res := make([]string, 0, len(res))
+	for k := range res {
+		_res = append(_res, k)
 	}
-	return res, nil
+	return _res, nil
 }
 
 func (j *jsonDBIndex) Tables(database string) ([]string, error) {
-	ents, err := os.ReadDir(path.Join(j.root, database))
-	if errors.Is(err, os.ErrNotExist) {
-		return nil, nil
-	}
-	if err != nil {
-		return nil, err
-	}
-	var res []string
-	for _, ent := range ents {
-		if ent.IsDir() {
-			res = append(res, ent.Name())
+	res := map[string]bool{}
+	for _, l := range j.layers {
+		ents, err := os.ReadDir(path.Join(l.Path, database))
+		if errors.Is(err, os.ErrNotExist) {
+			continue
+		}
+		if err != nil {
+			return nil, err
+		}
+		for _, ent := range ents {
+			if ent.IsDir() {
+				res[ent.Name()] = true
+			}
 		}
 	}
-	return res, nil
+	_res := make([]string, 0, len(res))
+	for k := range res {
+		_res = append(_res, k)
+	}
+	return _res, nil
 }
 
-func (j *jsonDBIndex) Paths(database string, table string) []string {
-	root := path.Join(j.root, database, table)
-	var res []string
-	filepath.Walk(path.Join(j.root, database, table), func(path string, info os.FileInfo, err error) error {
-		if !info.IsDir() {
+func (j *jsonDBIndex) Paths(database string, table string) ([]string, error) {
+	res := map[string]bool{}
+	for _, l := range j.layers {
+		root := path.Join(l.Path, database, table)
+		filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+			// tolerate layers that do not contain this table: Walk passes a nil info on error
+			if err != nil || info == nil {
+				return nil
+			}
+			if !info.IsDir() {
+				return nil
+			}
+			if strings.HasPrefix(info.Name(), "hour=") {
+				res[path[len(root)+1:]] = true
+				return filepath.SkipDir
+			}
 			return nil
-		}
-		if strings.HasPrefix(info.Name(), "hour=") {
-			res = append(res, path[len(root):])
-			return filepath.SkipDir
-		}
-		return nil
-	})
-	return res
+		})
+	}
+	_res := make([]string, 0, len(res))
+	for k := range res {
+		_res = append(_res, k)
+	}
+	return _res, nil
 }
diff --git a/json_drop_planner.go b/json_drop_planner.go
new file mode 100644
index 0000000..ff4bb57
--- /dev/null
+++ b/json_drop_planner.go
@@ -0,0 +1,37 @@
+package metadata
+
+import "path"
+
+func (J *JSONIndex) GetDropQueue(writerId string, layer string) (DropPlan, error) {
+	parts := J.parts[layer]
+	if parts == nil {
+		return DropPlan{}, nil
+	}
+	for _, idx := range parts {
+		p, err := idx.GetDropPlanner().GetDropQueue(writerId, layer)
+		if err != nil {
+			return DropPlan{}, err
+		}
+		if p.Path != "" {
+			return p, nil
+		}
+	}
+	return DropPlan{}, nil
+}
+
+func (J 
*JSONIndex) GetDropPlanner() TableDropPlanner { + return J +} + +func (J *JSONIndex) RmFromDropQueue(plan DropPlan) Promise[int32] { + l := J.parts[plan.Layer] + if l == nil { + return Fulfilled(nil, int32(0)) + } + dir := path.Dir(plan.Path) + part := l[dir] + if part != nil { + return part.RmFromDropQueue(plan) + } + return Fulfilled(nil, int32(0)) +} diff --git a/json_index.go b/json_index.go index 8f4f816..e4632ff 100644 --- a/json_index.go +++ b/json_index.go @@ -2,6 +2,7 @@ package metadata import ( "fmt" + "io/fs" "os" "path" "path/filepath" @@ -11,55 +12,67 @@ import ( "time" ) +type jsonLayer struct { + Layer + Path string +} + +func layer2JsonLayer(layer Layer) jsonLayer { + path := "" + if strings.HasPrefix(layer.URL, "file://") { + path = strings.TrimPrefix(layer.URL, "file://") + } + return jsonLayer{ + Layer: layer, + Path: path, + } +} + type JSONIndex struct { root string database string table string lock sync.Mutex - parts map[string]*jsonPartIndex + parts map[string]map[string]*jsonPartIndex + layers []jsonLayer } -func NewJSONIndex(root string, database string, table string) TableIndex { - return &JSONIndex{ +func NewJSONIndex(root string, database string, table string, layers []Layer) (TableIndex, error) { + var jLayers []jsonLayer + for _, layer := range layers { + jLayers = append(jLayers, layer2JsonLayer(layer)) + } + res := &JSONIndex{ root: root, database: database, table: table, - parts: map[string]*jsonPartIndex{}, + parts: map[string]map[string]*jsonPartIndex{}, + layers: jLayers, } -} - -func (J *JSONIndex) GetMergePlan(layer string, iteration int) (*MergePlan, error) { - J.lock.Lock() - defer J.lock.Unlock() - for _, part := range J.parts { - plan, err := part.GetMergePlan(layer, iteration) + for _, layer := range jLayers { + prefix := filepath.Join(layer.Path, database, table, "data") + err := filepath.Walk(prefix, func(path string, info fs.FileInfo, err error) error { + if info == nil { + return nil + } + if !info.IsDir() { + return nil + } + metadataPath := filepath.Join(path, "metadata.json") + if _, err := os.Stat(metadataPath); !os.IsNotExist(err) { + _, err := res.populate(layer.Name, path[len(prefix)+1:]) + if err != nil { + return err + } + return filepath.SkipDir + } + return nil + }) if err != nil { return nil, err } - if plan == nil || len(plan.From) == 0 { - continue - } - return plan, nil - } - return nil, nil -} - -func (J *JSONIndex) EndMerge(plan *MergePlan) error { - if len(plan.From) == 0 { - return nil - } - J.lock.Lock() - defer J.lock.Unlock() - dir := path.Dir(plan.From[0]) - part := J.parts[dir] - if part != nil { - return part.EndMerge(plan) } - return nil -} - -func (J *JSONIndex) GetMergePlanner() TableMergePlanner { - return J + return res, nil } func (J *JSONIndex) GetQuerier() TableQuerier { @@ -67,6 +80,27 @@ func (J *JSONIndex) GetQuerier() TableQuerier { } func (J *JSONIndex) Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32] { + J.lock.Lock() + defer J.lock.Unlock() + addByLayer := make(map[string][]*IndexEntry) + rmByLayer := make(map[string][]*IndexEntry) + layers := make(map[string]bool) + for _, entry := range add { + addByLayer[entry.Layer] = append(addByLayer[entry.Layer], entry) + layers[entry.Layer] = true + } + for _, entry := range rm { + rmByLayer[entry.Layer] = append(rmByLayer[entry.Layer], entry) + layers[entry.Layer] = true + } + var promises []Promise[int32] + for l := range layers { + promises = append(promises, J.batchLayer(l, addByLayer[l], rmByLayer[l])) + } + return NewWaitForAll[int32](promises) +} 
+ +func (J *JSONIndex) batchLayer(layer string, add []*IndexEntry, rm []*IndexEntry) Promise[int32] { addByPath := make(map[string][]*IndexEntry) rmByPath := make(map[string][]*IndexEntry) paths := make(map[string]bool) @@ -81,87 +115,85 @@ func (J *JSONIndex) Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32] { paths[_path] = true } - J.lock.Lock() - defer J.lock.Unlock() - var promises []Promise[int32] for partPath := range paths { - idx, err := J.populate(partPath) + idx, err := J.populate(layer, partPath) if err != nil { - //TODO: we should do something with the error - continue + return Fulfilled[int32](err, 0) } promises = append(promises, idx.Batch(addByPath[partPath], rmByPath[partPath])) } return NewWaitForAll[int32](promises) } -func (J *JSONIndex) populate(dir string) (*jsonPartIndex, error) { - idx := J.parts[dir] +func (J *JSONIndex) populate(layer string, dir string) (*jsonPartIndex, error) { + layerParts := J.parts[layer] + if layerParts == nil { + layerParts = make(map[string]*jsonPartIndex) + J.parts[layer] = layerParts + } + idx := layerParts[dir] + var _layer *jsonLayer + for _, l := range J.layers { + if l.Name == layer { + _layer = &l + break + } + } + if _layer == nil { + return nil, fmt.Errorf("layer \"%s\" not found", layer) + } + if _layer.Path == "" { + return nil, fmt.Errorf("layer path \"%s\" not supported", _layer.URL) + } + if idx != nil { return idx, nil } - idx, err := newJsonPartIndex(J.root, J.database, J.table, dir) + idx, err := newJsonPartIndex(jsonPartIdxOpts{ + rootPath: _layer.Path, + database: J.database, + table: J.table, + partPath: dir, + layers: J.layers, + layer: layer, + }) if err != nil { return nil, err } idx.Run() - J.parts[dir] = idx + layerParts[dir] = idx return idx, nil } -func (J *JSONIndex) Get(_path string) *IndexEntry { +func (J *JSONIndex) Get(layer string, _path string) *IndexEntry { dir := path.Dir(_path) J.lock.Lock() defer J.lock.Unlock() - idx, err := J.populate(dir) + idx, err := J.populate(layer, dir) if err != nil { return nil } - return idx.Get(_path) + return idx.Get(layer, _path) } func (J *JSONIndex) Run() { } func (J *JSONIndex) Stop() { - for _, idx := range J.parts { - idx.Stop() - } -} - -func (J *JSONIndex) RmFromDropQueue(files []string) Promise[int32] { - filesByPath := make(map[string][]string) - for _, file := range files { - _path := path.Dir(file) - filesByPath[_path] = append(filesByPath[_path], file) - } - J.lock.Lock() - defer J.lock.Unlock() - - var promises []Promise[int32] - for partPath, files := range filesByPath { - idx, err := J.populate(partPath) - if err != nil { - //TODO: we should do something with the error - continue + for _, l := range J.parts { + for _, idx := range l { + idx.Stop() } - promises = append(promises, idx.RmFromDropQueue(files)) } - return NewWaitForAll[int32](promises) } -func (J *JSONIndex) GetDropQueue() []string { - var queue []string - for _, idx := range J.parts { - queue = append(queue, idx.GetDropQueue()...) 
- } - return queue -} - -func (J *JSONIndex) findHours(options QueryOptions) ([]time.Time, error) { +func (J *JSONIndex) findHours(options QueryOptions, layer jsonLayer) ([]time.Time, error) { var hours []time.Time - err := filepath.Walk(path.Join(J.root, J.database, J.table), func(path string, info os.FileInfo, err error) error { + err := filepath.Walk(path.Join(layer.Path, J.database, J.table, "data"), func(path string, info os.FileInfo, err error) error { + if info == nil { + return nil + } if !info.IsDir() { return nil } @@ -212,23 +244,29 @@ func (J *JSONIndex) findHours(options QueryOptions) ([]time.Time, error) { } func (J *JSONIndex) Query(options QueryOptions) ([]*IndexEntry, error) { - hours, err := J.findHours(options) - if err != nil { - return nil, err - } var entries []*IndexEntry - for _, hour := range hours { - idx, err := J.populate(path.Join( - fmt.Sprintf("date=%s", hour.Format("2006-01-02")), - fmt.Sprintf("hour=%02d", hour.Hour()))) - if err != nil { - return nil, err + for _, l := range J.layers { + if l.Path == "" { + continue } - _entries, err := idx.Query(options) + hours, err := J.findHours(options, l) if err != nil { return nil, err } - entries = append(entries, _entries...) + for _, hour := range hours { + idx, err := J.populate(l.Name, path.Join( + fmt.Sprintf("date=%s", hour.Format("2006-01-02")), + fmt.Sprintf("hour=%02d", hour.Hour()))) + if err != nil { + return nil, err + } + _entries, err := idx.Query(options) + if err != nil { + return nil, err + } + entries = append(entries, _entries...) + } } + return entries, nil } diff --git a/json_index_test.go b/json_index_test.go new file mode 100644 index 0000000..1dc7822 --- /dev/null +++ b/json_index_test.go @@ -0,0 +1,94 @@ +package metadata + +import ( + "fmt" + "github.com/google/uuid" + "testing" + "time" +) + +func TestJSONSave(t *testing.T) { + MergeConfigurations = []MergeConfigurationsConf{ + {10, 10 * 1024 * 1024, 1}, + } + idx, err := NewJSONIndex( + "_testdata", + "default", + "test", + layers) + if err != nil { + panic(err) + } + var ents []*IndexEntry + now := time.Now() + threeDaysAgo := now.Add(-3 * 24 * time.Hour) + + for ts := threeDaysAgo; ts.Before(now); ts = ts.Add(15 * time.Second) { + ents = append(ents, &IndexEntry{ + Database: "default", + Table: "test", + MinTime: ts.UnixNano(), + MaxTime: ts.Add(15 * time.Second).UnixNano(), + Path: fmt.Sprintf("date=%s/hour=%02d/%s.1.parquet", + ts.UTC().Format("2006-01-02"), + ts.UTC().Hour(), + uuid.New().String()), + SizeBytes: 1000000, + ChunkTime: time.Now().UnixNano(), + Layer: "l1", + }) + } + + p := idx.Batch(ents, nil) + _, err = p.Get() + if err != nil { + panic(err) + } + fmt.Printf("Items saved: %d\n", len(ents)) +} + +func TestJSONSaveAndRM(t *testing.T) { + MergeConfigurations = []MergeConfigurationsConf{ + {10, 10 * 1024 * 1024, 1}, + } + idx, err := NewJSONIndex( + "_testdata", + "default", + "test", + layers) + if err != nil { + panic(err) + } + var ents []*IndexEntry + now := time.Now() + threeDaysAgo := now.Add(-3 * 24 * time.Hour) + + for ts := threeDaysAgo; ts.Before(now); ts = ts.Add(15 * time.Second) { + ents = append(ents, &IndexEntry{ + Database: "default", + Table: "test", + MinTime: ts.UnixNano(), + MaxTime: ts.Add(15 * time.Second).UnixNano(), + Path: fmt.Sprintf("date=%s/hour=%02d/%s.1.parquet", + ts.UTC().Format("2006-01-02"), + ts.UTC().Hour(), + uuid.New().String()), + SizeBytes: 1000000, + ChunkTime: time.Now().UnixNano(), + Layer: "l1", + }) + } + + p := idx.Batch(ents, nil) + _, err = p.Get() + if err != nil { + 
panic(err) + } + fmt.Printf("Items saved: %d\n", len(ents)) + + p = idx.Batch(nil, ents) + _, err = p.Get() + if err != nil { + panic(err) + } +} diff --git a/json_merge_planner.go b/json_merge_planner.go new file mode 100644 index 0000000..5b1d7f1 --- /dev/null +++ b/json_merge_planner.go @@ -0,0 +1,42 @@ +package metadata + +import "path" + +func (J *JSONIndex) GetMergePlan(writerId string, layer string, iteration int) (MergePlan, error) { + J.lock.Lock() + defer J.lock.Unlock() + parts, ok := J.parts[layer] + if !ok { + return MergePlan{}, nil + } + for _, part := range parts { + plan, err := part.GetMergePlan(writerId, layer, iteration) + if err != nil { + return MergePlan{}, err + } + if len(plan.From) != 0 { + return plan, nil + } + + } + return MergePlan{}, nil +} + +func (J *JSONIndex) EndMerge(plan MergePlan) Promise[int32] { + if len(plan.From) == 0 { + return nil + } + J.lock.Lock() + defer J.lock.Unlock() + parts := J.parts[plan.Layer] + dir := path.Dir(plan.From[0]) + part := parts[dir] + if part != nil { + return part.EndMerge(plan) + } + return nil +} + +func (J *JSONIndex) GetMergePlanner() TableMergePlanner { + return J +} diff --git a/json_move_planner.go b/json_move_planner.go new file mode 100644 index 0000000..85e14f3 --- /dev/null +++ b/json_move_planner.go @@ -0,0 +1,44 @@ +package metadata + +import ( + "fmt" + "path" +) + +func (J *JSONIndex) GetMovePlan(writerId string, layer string) (MovePlan, error) { + J.lock.Lock() + defer J.lock.Unlock() + l := J.parts[layer] + if l == nil { + return MovePlan{}, nil + } + for _, p := range l { + mp, err := p.GetMovePlanner().GetMovePlan(writerId, layer) + if err != nil { + return MovePlan{}, err + } + if mp.PathFrom != "" { + return mp, nil + } + } + return MovePlan{}, nil +} + +func (J *JSONIndex) EndMove(plan MovePlan) Promise[int32] { + J.lock.Lock() + defer J.lock.Unlock() + l := J.parts[plan.LayerFrom] + if l == nil { + return Fulfilled(fmt.Errorf("layer \"%s\" not found", plan.LayerFrom), int32(0)) + } + dir := path.Dir(plan.PathFrom) + part := l[dir] + if part != nil { + return part.EndMove(plan) + } + return nil +} + +func (J *JSONIndex) GetMovePlanner() TableMovePlanner { + return J +} diff --git a/json_part_drop_planner.go b/json_part_drop_planner.go new file mode 100644 index 0000000..14c2a85 --- /dev/null +++ b/json_part_drop_planner.go @@ -0,0 +1,38 @@ +package metadata + +func (J *jsonPartIndex) GetDropPlanner() TableDropPlanner { + return J +} + +func (J *jsonPartIndex) RmFromDropQueue(plan DropPlan) Promise[int32] { + J.m.Lock() + defer J.m.Unlock() + + updated := false + + for i := len(J.dropQueue) - 1; i >= 0; i-- { + if J.dropQueue[i].Path != plan.Path { + continue + } + J.dropQueue[i] = J.dropQueue[len(J.dropQueue)-1] + J.dropQueue = J.dropQueue[:len(J.dropQueue)-1] + updated = true + break + } + + if !updated { + return Fulfilled[int32](nil, 0) + } + + p := NewPromise[int32]() + J.promises = append(J.promises, p) + J.doUpdate() + return p +} + +func (J *jsonPartIndex) GetDropQueue(writerId string, layer string) (DropPlan, error) { + if len(J.dropQueue) == 0 { + return DropPlan{}, nil + } + return J.dropQueue[0], nil +} diff --git a/json_part_index.go b/json_part_index.go index 5c9d903..2e5997f 100644 --- a/json_part_index.go +++ b/json_part_index.go @@ -4,33 +4,38 @@ import ( "context" "encoding/json" "fmt" - "github.com/google/uuid" jsoniter "github.com/json-iterator/go" "os" "path" "strings" "sync" "sync/atomic" + "time" ) type jsonIndexEntry struct { + IndexEntry Id uint32 `json:"id"` - Layer string 
`json:"layer"` - Path string `json:"path"` - SizeBytes int64 `json:"size_bytes"` - RowCount int64 `json:"row_count"` - ChunkTime int64 `json:"chunk_time"` - MinTime int64 `json:"min_time"` - MaxTime int64 `json:"max_time"` Range string `json:"range"` Type string `json:"type"` _marshalled string `json:"-"` } +type jsonPartIdxOpts struct { + rootPath string + database string + table string + partPath string + layers []jsonLayer + layer string +} + type jsonPartIndex struct { rootPath string database string table string + layer string + layers []jsonLayer idxPath string @@ -43,90 +48,50 @@ type jsonPartIndex struct { stop context.CancelFunc lastId uint32 - dropQueue []string + dropQueue []DropPlan parquetSizeBytes int64 rowCount int64 minTime int64 maxTime int64 filesInMerge map[string]bool + filesInMove map[string]bool } -func newJsonPartIndex(rootPath string, database string, table string, partPath string, -) (*jsonPartIndex, error) { +var _ TableIndex = &jsonPartIndex{} + +func newJsonPartIndex(opts jsonPartIdxOpts) (*jsonPartIndex, error) { res := &jsonPartIndex{ - rootPath: rootPath, - database: database, - table: table, - idxPath: path.Join(rootPath, database, table, "data", partPath), + rootPath: opts.rootPath, + database: opts.database, + table: opts.table, + idxPath: path.Join(opts.rootPath, opts.database, opts.table, "data", opts.partPath), entries: &sync.Map{}, filesInMerge: make(map[string]bool), + layers: opts.layers, + } + _, err := os.Stat(res.idxPath) + if os.IsNotExist(err) { + os.MkdirAll(res.idxPath, 0o755) } res.updateCtx, res.doUpdate = context.WithCancel(context.Background()) res.workCtx, res.stop = context.WithCancel(context.Background()) - err := res.populate() + err = res.populate() return res, err } -func (J *jsonPartIndex) GetMergePlan(layer string, iteration int) (*MergePlan, error) { - suffix := fmt.Sprintf(".%d.parquet", iteration) - var from []string - var size int64 - if iteration > len(MergeConfigurations) { - return nil, fmt.Errorf("no more merge configurations available for iteration %d", iteration) - } - conf := MergeConfigurations[iteration-1] - J.m.Lock() - defer J.m.Unlock() - J.entries.Range(func(key, value interface{}) bool { - entry := value.(*jsonIndexEntry) - if !strings.HasSuffix(entry.Path, suffix) { - return true - } - if J.filesInMerge[entry.Path] { - return true +func (J *jsonPartIndex) getLayer(name string) int { + for i, layer := range J.layers { + if layer.Name == name { + return i } - if size > conf[1] { - return false - } - from = append(from, entry.Path) - size += entry.SizeBytes - return true - }) - for _, file := range from { - J.filesInMerge[file] = true } - uid, _ := uuid.NewUUID() - - tablePath := path.Join(J.rootPath, J.database, J.table, "data") + "/" - partPath := J.idxPath[len(tablePath):] - return &MergePlan{ - Database: J.database, - Table: J.table, - From: from, - To: path.Join(partPath, fmt.Sprintf("%s.%d.parquet", uid.String(), iteration+1)), - Iteration: iteration, - }, nil -} - -func (J *jsonPartIndex) EndMerge(plan *MergePlan) error { - J.m.Lock() - defer J.m.Unlock() - for _, file := range plan.From { - delete(J.filesInMerge, file) - } - return nil + return -1 } func (J *jsonPartIndex) GetQuerier() TableQuerier { return J } -func (J *jsonPartIndex) GetMergePlanner() TableMergePlanner { - return J -} - -var _ TableIndex = &jsonPartIndex{} - func (J *jsonPartIndex) Query(options QueryOptions) ([]*IndexEntry, error) { var res []*IndexEntry var suffix string @@ -152,42 +117,18 @@ func (J *jsonPartIndex) Query(options 
QueryOptions) ([]*IndexEntry, error) { func (J *jsonPartIndex) addToDropQueue(files []*IndexEntry) { for _, f := range files { - J.dropQueue = append(J.dropQueue, f.Path) + J.dropQueue = append(J.dropQueue, DropPlan{ + WriterID: f.WriterID, + Layer: f.Layer, + Database: f.Database, + Table: f.Table, + Path: f.Path, + TimeS: int32(time.Now().Add(time.Second * 30).Unix()), + }) } } -func (J *jsonPartIndex) RmFromDropQueue(files []string) Promise[int32] { - J.m.Lock() - defer J.m.Unlock() - - updated := false - for i := len(J.dropQueue) - 1; i >= 0; i-- { - for _, file := range files { - if J.dropQueue[i] != file { - continue - } - J.dropQueue[i] = J.dropQueue[len(J.dropQueue)-1] - J.dropQueue = J.dropQueue[:len(J.dropQueue)-1] - updated = true - break - } - } - - if !updated { - return Fulfilled[int32](nil, 0) - } - - p := NewPromise[int32]() - J.promises = append(J.promises, p) - J.doUpdate() - return p -} - -func (J *jsonPartIndex) GetDropQueue() []string { - return J.dropQueue -} - func (J *jsonPartIndex) populate() error { partPath := J.idxPath if _, err := os.Stat(path.Join(partPath, "metadata.json")); os.IsNotExist(err) { @@ -205,7 +146,26 @@ func (J *jsonPartIndex) populate() error { switch s { case "drop_queue": for iterator.ReadArray() { - dropQueueEntry := iterator.ReadString() + var dropQueueEntry DropPlan + iterator.ReadMapCB(func(iterator *jsoniter.Iterator, s string) bool { + switch s { + case "writer_id": + dropQueueEntry.WriterID = iterator.ReadString() + case "layer": + dropQueueEntry.Layer = iterator.ReadString() + case "database": + dropQueueEntry.Database = iterator.ReadString() + case "table": + dropQueueEntry.Table = iterator.ReadString() + case "path": + dropQueueEntry.Path = iterator.ReadString() + case "time_s": + dropQueueEntry.TimeS = iterator.ReadInt32() + default: + iterator.Skip() + } + return true + }) J.dropQueue = append(J.dropQueue, dropQueueEntry) } case "type": @@ -278,16 +238,10 @@ func (J *jsonPartIndex) entry2JEntry(entries []*IndexEntry) ([]*jsonIndexEntry, for i, entry := range entries { id := atomic.AddUint32(&J.lastId, 1) _entry := &jsonIndexEntry{ - Id: id, - Layer: entry.Layer, - Path: entry.Path, - SizeBytes: entry.SizeBytes, - RowCount: entry.RowCount, - ChunkTime: entry.ChunkTime, - MinTime: entry.MinTime, - MaxTime: entry.MaxTime, - Range: "1h", - Type: "compacted", + Id: id, + IndexEntry: *entry, + Range: "1h", + Type: "compacted", } _marshalled, err := json.Marshal(_entry) if err != nil { @@ -439,7 +393,12 @@ func (J *jsonPartIndex) flush() { if i > 0 { stream.WriteMore() } - stream.WriteString(d) + strD, err := json.Marshal(d) + if err != nil { + onErr(err) + return + } + stream.WriteRaw(string(strD)) } stream.WriteArrayEnd() @@ -499,17 +458,10 @@ func (J *jsonPartIndex) Stop() { } func (J *jsonPartIndex) jEntry2Entry(_e *jsonIndexEntry) *IndexEntry { - return &IndexEntry{ - Path: _e.Path, - SizeBytes: _e.SizeBytes, - RowCount: _e.RowCount, - ChunkTime: _e.ChunkTime, - MinTime: _e.MinTime, - MaxTime: _e.MaxTime, - } + return &_e.IndexEntry } -func (J *jsonPartIndex) Get(path string) *IndexEntry { +func (J *jsonPartIndex) Get(layer string, path string) *IndexEntry { e, _ := J.entries.Load(path) if e == nil { return nil diff --git a/json_part_merge_planner.go b/json_part_merge_planner.go new file mode 100644 index 0000000..c088441 --- /dev/null +++ b/json_part_merge_planner.go @@ -0,0 +1,76 @@ +package metadata + +import ( + "fmt" + "github.com/google/uuid" + "path" + "strings" + "time" +) + +func (J *jsonPartIndex) GetMergePlan(writerId 
string, layer string, iteration int) (MergePlan, error) {
+	suffix := fmt.Sprintf(".%d.parquet", iteration)
+	var from []string
+	var size int64
+	if iteration > len(MergeConfigurations) {
+		return MergePlan{}, fmt.Errorf("no more merge configurations available for iteration %d", iteration)
+	}
+	conf := MergeConfigurations[iteration-1]
+	now := time.Now()
+	J.m.Lock()
+	defer J.m.Unlock()
+	J.entries.Range(func(key, value interface{}) bool {
+		entry := value.(*jsonIndexEntry)
+		if !strings.HasSuffix(entry.Path, suffix) {
+			return true
+		}
+		if J.filesInMerge[entry.Path] {
+			return true
+		}
+		if entry.ChunkTime+conf.TimeoutSec()*1000000000 >= now.UnixNano() {
+			return true
+		}
+		if size > conf.MaxSize() {
+			return false
+		}
+
+		from = append(from, entry.Path)
+		size += entry.SizeBytes
+		return true
+	})
+	for _, file := range from {
+		J.filesInMerge[file] = true
+	}
+	uid, _ := uuid.NewUUID()
+
+	tablePath := path.Join(J.rootPath, J.database, J.table, "data") + "/"
+	partPath := J.idxPath[len(tablePath):]
+	return MergePlan{
+		Layer:     layer,
+		Database:  J.database,
+		Table:     J.table,
+		From:      from,
+		To:        path.Join(partPath, fmt.Sprintf("%s.%d.parquet", uid.String(), iteration+1)),
+		Iteration: iteration,
+	}, nil
+}
+
+func (J *jsonPartIndex) EndMerge(plan MergePlan) Promise[int32] {
+	J.m.Lock()
+	defer J.m.Unlock()
+	update := false
+	for _, file := range plan.From {
+		update = update || J.filesInMerge[file]
+		delete(J.filesInMerge, file)
+	}
+	if !update {
+		return Fulfilled[int32](nil, 0)
+	}
+	p := NewPromise[int32]()
+	J.promises = append(J.promises, p)
+	// trigger a flush cycle so the promise gets resolved, as in RmFromDropQueue
+	J.doUpdate()
+	return p
+}
+
+func (J *jsonPartIndex) GetMergePlanner() TableMergePlanner {
+	return J
+}
diff --git a/json_part_move_planner.go b/json_part_move_planner.go
new file mode 100644
index 0000000..f5e7912
--- /dev/null
+++ b/json_part_move_planner.go
@@ -0,0 +1,66 @@
+package metadata
+
+import (
+	"time"
+)
+
+func (J *jsonPartIndex) GetMovePlan(writerId string, layer string) (MovePlan, error) {
+	J.m.Lock()
+	defer J.m.Unlock()
+	if J.filesInMove == nil {
+		// newJsonPartIndex only pre-creates filesInMerge, so initialise lazily
+		J.filesInMove = make(map[string]bool)
+	}
+	var plan *MovePlan
+	J.entries.Range(func(key, value any) bool {
+		val := value.(*jsonIndexEntry)
+		if J.filesInMerge[val.Path] || J.filesInMove[val.Path] {
+			return true
+		}
+		layerIdx := J.getLayer(val.Layer)
+		if layerIdx < 0 {
+			return true
+		}
+		layerTo := ""
+		if layerIdx+1 < len(J.layers) {
+			layerTo = J.layers[layerIdx+1].Name
+		}
+		if J.layers[layerIdx].TTLSec > 0 &&
+			time.Now().UnixNano()-val.ChunkTime >= int64(J.layers[layerIdx].TTLSec)*1000000000 {
+			plan = &MovePlan{
+				ID:        "",
+				Database:  J.database,
+				Table:     J.table,
+				PathFrom:  val.Path,
+				LayerFrom: val.Layer,
+				PathTo:    val.Path,
+				LayerTo:   layerTo,
+			}
+			return false
+		}
+		return true
+	})
+	if plan == nil {
+		return MovePlan{}, nil
+	}
+	// reserve the file so EndMove can find and acknowledge it later
+	J.filesInMove[plan.PathFrom] = true
+	return *plan, nil
+}
+
+func (J *jsonPartIndex) EndMove(plan MovePlan) Promise[int32] {
+	J.m.Lock()
+	defer J.m.Unlock()
+	if _, ok := J.filesInMove[plan.PathFrom]; !ok {
+		return Fulfilled[int32](nil, 0)
+	}
+	delete(J.filesInMove, plan.PathFrom)
+	p := NewPromise[int32]()
+	J.promises = append(J.promises, p)
+	J.doUpdate()
+	return p
+}
+
+func (J *jsonPartIndex) GetMovePlanner() TableMovePlanner {
+	return J
+}
diff --git a/redis_drop_planner.go b/redis_drop_planner.go
new file mode 100644
index 0000000..a12e24b
--- /dev/null
+++ b/redis_drop_planner.go
@@ -0,0 +1,32 @@
+package metadata
+
+func (r *RedisIndex) GetDropPlanner() TableDropPlanner {
+	return r
+}
+
+func (r *RedisIndex) RmFromDropQueue(plan DropPlan) Promise[int32] {
+	return Fulfilled((&redisTaskQueue[DropPlan]{
+		prefix:      "drop",
+		database:    r.database,
+		table:       r.table,
+		
suffix:      "",
+		writerId:    plan.WriterID,
+		layer:       plan.Layer,
+		getEntrySHA: r.getMergePlanSha,
+		redis:       r.c,
+	}).finishProcess(plan), int32(0))
+}
+
+func (r *RedisIndex) GetDropQueue(writerId string, layer string) (DropPlan, error) {
+	plan, err := (&redisTaskQueue[DropPlan]{
+		prefix:      "drop",
+		database:    r.database,
+		table:       r.table,
+		suffix:      "",
+		writerId:    writerId,
+		layer:       layer,
+		getEntrySHA: r.getMergePlanSha,
+		redis:       r.c,
+	}).processEntry()
+	return plan, err
+}
diff --git a/redis_index.go b/redis_index.go
index 98d1535..f795638 100644
--- a/redis_index.go
+++ b/redis_index.go
@@ -5,18 +5,21 @@ import (
 	"crypto/tls"
 	"encoding/json"
 	"fmt"
-	"github.com/google/uuid"
 	"github.com/redis/go-redis/v9"
 	"math"
 	"net"
 	"net/url"
 	"os"
-	"path/filepath"
 	"strconv"
 	"strings"
 	"time"
 )
 
+type redisLayer struct {
+	Layer
+	LayerTo string `json:"layer_to"`
+}
+
 type redisIndexEntry struct {
 	IndexEntry
 	StrMinTime string `json:"str_min_time"`
@@ -60,6 +63,7 @@ type RedisIndex struct {
 
 	database string
 	table    string
+	layers   []redisLayer
 }
 
 func getRedisClient(u *url.URL) (*redis.Client, error) {
@@ -93,15 +97,29 @@ func getRedisClient(u *url.URL) (*redis.Client, error) {
 	return redis.NewClient(opts), nil
 }
 
-func NewRedisIndex(URL string, database string, table string) (TableIndex, error) {
+func NewRedisIndex(URL string, database string, table string, layers []Layer) (TableIndex, error) {
 	u, err := url.Parse(URL)
 	if err != nil {
 		return nil, err
 	}
+
+	redisLayers := make([]redisLayer, 0, len(layers))
+	for i, layer := range layers {
+		layerTo := ""
+		if i < len(layers)-1 {
+			layerTo = layers[i+1].Name
+		}
+		redisLayers = append(redisLayers, redisLayer{
+			Layer:   layer,
+			LayerTo: layerTo,
+		})
+	}
+
 	idx := &RedisIndex{
 		url:      u,
 		database: database,
 		table:    table,
+		layers:   redisLayers,
 	}
 
 	client, err := getRedisClient(u)
@@ -118,63 +136,6 @@
 	return idx, nil
 }
 
-type redisMergePlan struct {
-	ID    string   `json:"id"`
-	Time  int32    `json:"time"`
-	Paths []string `json:"paths"`
-}
-
-func (r *RedisIndex) GetMergePlan(layer string, iteration int) (*MergePlan, error) {
-	res, err := r.c.EvalSha(context.Background(), r.getMergePlanSha, []string{
-		r.database,
-		r.table,
-		strconv.Itoa(iteration),
-		strconv.FormatInt(MergeConfigurations[iteration-1][1], 10),
-	}, nil).Result()
-	if err != nil {
-		return nil, fmt.Errorf("failed to execute script: %v", err)
-	}
-	if res == nil || res.(string) == "" {
-		return nil, nil
-	}
-	var plan redisMergePlan
-	err = json.Unmarshal([]byte(res.(string)), &plan)
-
-	if len(plan.Paths) == 0 {
-		return nil, nil
-	}
-
-	firstFile := plan.Paths[0]
-	firstFileDir := filepath.Dir(firstFile)
-
-	return &MergePlan{
-		ID:        plan.ID,
-		Layer:     layer,
-		Database:  r.database,
-		Table:     r.table,
-		From:      plan.Paths,
-		To:        filepath.Join(firstFileDir, fmt.Sprintf("%s.%d.parquet", uuid.New().String(), iteration+1)),
-		Iteration: iteration,
-	}, err
-}
-
-func (r *RedisIndex) EndMerge(plan *MergePlan) error {
-	if plan == nil {
-		return nil
-	}
-	_, err := r.c.EvalSha(context.Background(), r.endMergeSha, []string{
-		plan.Database,
-		plan.Table,
-		strconv.Itoa(plan.Iteration),
-		plan.ID,
-	}).Result()
-	return err
-}
-
-func (r *RedisIndex) GetMergePlanner() TableMergePlanner {
-	return r
-}
-
 func (r *RedisIndex) GetQuerier() TableQuerier {
 	return r
 }
@@ -211,19 +172,34 @@ func (r *RedisIndex) Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32] {
 	}
 
 	res := NewPromise[int32]()
-	var keys []string
-	for _, c := range 
MergeConfigurations { - keys = append(keys, strconv.FormatInt(c[1], 10)) + var err error + var keys [2][]byte + keys[0], err = json.Marshal(MergeConfigurations) + if err != nil { + res.Done(0, err) + return res + } + lmap := make(map[string]redisLayer) + for _, layer := range r.layers { + lmap[layer.Name] = layer + } + keys[1], err = json.Marshal(lmap) + if err != nil { + res.Done(0, err) + return res } go func() { - _, err := r.c.EvalSha(context.Background(), r.patchSha, keys, cmds...).Result() + _, err := r.c.EvalSha(context.Background(), r.patchSha, []string{ + string(keys[0]), + string(keys[1]), + }, cmds...).Result() res.Done(0, err) }() return res } -func (r *RedisIndex) Get(path string) *IndexEntry { +func (r *RedisIndex) Get(layer string, path string) *IndexEntry { firstFolder := strings.Split(path, "/")[0] res, err := r.c.HGet( context.Background(), @@ -247,50 +223,6 @@ func (r *RedisIndex) Run() { func (r *RedisIndex) Stop() { } -type QEntry struct { - Path string `json:"path"` - TimeS int32 `json:"time"` -} - -func (r *RedisIndex) AddToDropQueue(files []string) Promise[int32] { - _files := make([]any, len(files)) - for i, file := range files { - _file, err := json.Marshal(QEntry{ - Path: file, - TimeS: int32(time.Now().Unix()), - }) - if err != nil { - return Fulfilled[int32](err, 0) - } - _files[i] = string(_file) - } - r.c.LPush(context.Background(), "drop", _files...) - return Fulfilled[int32](nil, 0) -} - -func (r *RedisIndex) RmFromDropQueue(files []string) Promise[int32] { - res := NewPromise[int32]() - go func() { - for _, file := range files { - _, err := r.c.LRem(context.Background(), "drop", 1, file).Result() - if err != nil { - res.Done(0, err) - return - } - } - res.Done(0, nil) - }() - return res -} - -func (r *RedisIndex) GetDropQueue() []string { - res, err := r.c.LRange(context.Background(), "drop", 0, -1).Result() - if err != nil { - return nil - } - return res -} - func redisScan(scanFn func(cursor uint64) (uint64, error)) error { var err error var cursor uint64 = 0 diff --git a/redis_index_test.go b/redis_index_test.go index 322b8dd..828d51b 100644 --- a/redis_index_test.go +++ b/redis_index_test.go @@ -7,11 +7,19 @@ import ( "time" ) +var layers = []Layer{ + {"file://./_testdata", "l1", "fs", 20}, +} + func TestSave(t *testing.T) { - MergeConfigurations = [][3]int64{ + MergeConfigurations = []MergeConfigurationsConf{ {10, 10 * 1024 * 1024, 1}, } - idx, err := NewRedisIndex("redis://localhost:6379/0", "default", "test") + idx, err := NewRedisIndex( + "redis://localhost:6379/0", + "default", + "test", + layers) if err != nil { t.Fatalf("Failed to create index: %v", err) } @@ -30,6 +38,8 @@ func TestSave(t *testing.T) { ts.UTC().Hour(), uuid.New().String()), SizeBytes: 1000000, + ChunkTime: time.Now().UnixNano(), + Layer: "l1", }) } @@ -42,10 +52,14 @@ func TestSave(t *testing.T) { } func TestSaveAndDel(t *testing.T) { - MergeConfigurations = [][3]int64{ + MergeConfigurations = []MergeConfigurationsConf{ {10, 10 * 1024 * 1024, 1}, } - idx, err := NewRedisIndex("redis://localhost:6379/0", "default", "test") + idx, err := NewRedisIndex( + "redis://localhost:6379/0", + "default", + "test", + layers) if err != nil { t.Fatalf("Failed to create index: %v", err) } @@ -65,30 +79,66 @@ func TestSaveAndDel(t *testing.T) { MaxTime: ts.Add(15 * time.Second).UnixNano(), Path: pth, SizeBytes: 1000000, + ChunkTime: time.Now().UnixNano(), + Layer: "l1", }) } - + start := time.Now() p := idx.Batch(ents, nil) _, err = p.Get() if err != nil { panic(err) } + fmt.Printf("%d items 
saved in %v\n", len(ents), time.Since(start)) time.Sleep(time.Second) + start = time.Now() p = idx.Batch(nil, ents) _, err = p.Get() if err != nil { panic(err) } - fmt.Printf("Items saved: %d\n", len(ents)) + fmt.Printf("%d items dropped in %v\n", len(ents), time.Since(start)) + + time.Sleep(time.Second * 30) + + start = time.Now() + var drops []DropPlan + d, err := idx.GetDropPlanner().GetDropQueue("", "l1") + if err != nil { + t.Fatalf("Failed to get drop queue: %v", err) + return + } + for d.Path != "" { + d, err = idx.GetDropPlanner().GetDropQueue("", "l1") + if err != nil { + t.Fatalf("Failed to get drop queue: %v", err) + return + } + drops = append(drops, d) + } + fmt.Printf("Acquired drop of %d items in %v\n", len(drops), time.Since(start)) + start = time.Now() + for _, d := range drops[:200] { + _, err = idx.GetDropPlanner().RmFromDropQueue(d).Get() + if err != nil { + t.Fatalf("Failed to remove from drop queue: %v", err) + return + } + } + fmt.Printf("Processed 200 drop items in %v\n", time.Since(start)) } func TestRedisIndex2(t *testing.T) { - MergeConfigurations = [][3]int64{ + MergeConfigurations = []MergeConfigurationsConf{ {10, 10 * 1024 * 1024, 1}, } - idx, err := NewRedisIndex("redis://localhost:6379/0", "default", "test") + idx, err := NewRedisIndex( + "redis://localhost:6379/0", + "default", + "test", + layers) if err != nil { t.Fatalf("Failed to create index: %v", err) } @@ -100,7 +150,7 @@ func TestRedisIndex2(t *testing.T) { } func TestRedisDBIndex(t *testing.T) { - MergeConfigurations = [][3]int64{ + MergeConfigurations = []MergeConfigurationsConf{ {10, 10 * 1024 * 1024, 1}, } idx, err := NewRedisDbIndex("redis://localhost:6379/0") diff --git a/redis_merge_planner.go b/redis_merge_planner.go new file mode 100644 index 0000000..913e4e4 --- /dev/null +++ b/redis_merge_planner.go @@ -0,0 +1,101 @@ +package metadata + +import ( + "context" + "fmt" + "github.com/google/uuid" + "path/filepath" + "slices" + "strconv" + "strings" +) + +type redisMergePlan struct { + ID string `json:"id"` + Time int32 `json:"time"` + Paths []string `json:"paths"` +} + +func (r redisMergePlan) Id() string { + return r.ID +} + +func (r *RedisIndex) GetMergePlan(writerId string, layer string, iteration int) (MergePlan, error) { + mergePattern := fmt.Sprintf("merge:%s:%s:%d:*:%s:%s:*", r.database, r.table, iteration, layer, writerId) + var keys []string + err := redisScan(func(cursor uint64) (uint64, error) { + _keys, cursor, err := r.c.Scan(context.Background(), cursor, mergePattern, 1000).Result() + if err != nil { + return 0, err + } + keys = append(keys, _keys...) 
+ return cursor, nil + }) + if err != nil { + return MergePlan{}, err + } + slices.Sort(keys) + var plan redisMergePlan + for _, k := range keys { + parts := strings.SplitN(k, ":", 6) + if len(parts) < 6 { + continue + } + dir := parts[4] + plan, err = (&redisTaskQueue[redisMergePlan]{ + prefix: "merge", + database: r.database, + table: r.table, + suffix: strconv.Itoa(iteration) + ":" + dir, + writerId: writerId, + layer: layer, + getEntrySHA: r.getMergePlanSha, + redis: r.c, + }).processEntry() + if err != nil { + return MergePlan{}, err + } + if len(plan.Paths) > 0 { + break + } + } + + if len(plan.Paths) == 0 { + return MergePlan{}, nil + } + + firstFile := plan.Paths[0] + firstFileDir := filepath.Dir(firstFile) + + return MergePlan{ + ID: plan.ID, + Layer: layer, + Database: r.database, + Table: r.table, + From: plan.Paths, + To: filepath.Join(firstFileDir, fmt.Sprintf("%s.%d.parquet", uuid.New().String(), iteration+1)), + Iteration: iteration, + WriterID: writerId, + }, err +} + +func (r *RedisIndex) EndMerge(plan MergePlan) Promise[int32] { + fmt.Println("removing merge plan from Redis: ", plan.ID) + dir := filepath.Dir(plan.To) + err := (&redisTaskQueue[redisMergePlan]{ + prefix: "merge", + database: r.database, + table: r.table, + suffix: strconv.Itoa(plan.Iteration) + ":" + dir, + writerId: plan.WriterID, + layer: plan.Layer, + getEntrySHA: r.getMergePlanSha, + redis: r.c, + }).finishProcess(redisMergePlan{ID: plan.ID}) + fmt.Println("removing merge plan from Redis ok") + return Fulfilled(err, int32(0)) +} + +func (r *RedisIndex) GetMergePlanner() TableMergePlanner { + return r +} diff --git a/redis_move_planner.go b/redis_move_planner.go new file mode 100644 index 0000000..45a6d9f --- /dev/null +++ b/redis_move_planner.go @@ -0,0 +1,31 @@ +package metadata + +func (r *RedisIndex) GetMovePlanner() TableMovePlanner { + return r +} + +func (r *RedisIndex) GetMovePlan(writerId string, layer string) (MovePlan, error) { + return (&redisTaskQueue[MovePlan]{ + prefix: "move", + database: r.database, + table: r.table, + suffix: "", + writerId: writerId, + layer: layer, + getEntrySHA: r.getMergePlanSha, + redis: r.c, + }).processEntry() +} + +func (r *RedisIndex) EndMove(plan MovePlan) Promise[int32] { + return Fulfilled((&redisTaskQueue[MovePlan]{ + prefix: "move", + database: r.database, + table: r.table, + suffix: "", + writerId: plan.WriterID, + layer: plan.LayerFrom, + getEntrySHA: r.getMergePlanSha, + redis: r.c, + }).finishProcess(plan), int32(0)) +} diff --git a/redis_scripts/end_merge.lua b/redis_scripts/end_merge.lua index 0bfe916..262d82a 100644 --- a/redis_scripts/end_merge.lua +++ b/redis_scripts/end_merge.lua @@ -2,10 +2,12 @@ local database = KEYS[1] local table = KEYS[2] local index = KEYS[3] -local merge_id = KEYS[4] +local layer = KEYS[4] +local writer_id = KEYS[5] +local merge_id = KEYS[6] -- Construct the merge key -local merge_key = "merge:" .. database .. ":" .. table .. ":" .. index .. ":processing" +local merge_key = "merge:" .. database .. ":" .. table .. ":" .. index .. ":" .. layer .. ":" .. writer_id .. 
":processing" -- Function to remove a merge plan by ID local function remove_merge_plan(key, id) diff --git a/redis_scripts/get_merge_plan.lua b/redis_scripts/get_merge_plan.lua index eb79cb8..6b06d3c 100644 --- a/redis_scripts/get_merge_plan.lua +++ b/redis_scripts/get_merge_plan.lua @@ -1,10 +1,15 @@ -- Construct the merge key from the provided KEYS -local database = KEYS[1] -local table = KEYS[2] -local index = KEYS[3] --- Timeout before we take an idle merge plan into process -local timeout_s = tonumber(KEYS[4]) -local merge_key_base = "merge:" .. database .. ":" .. table .. ":" .. index +local prefix = KEYS[1] +local database = KEYS[2] +local table = KEYS[3] +local suffix = KEYS[4] +local layer = KEYS[5] +local writer_id = KEYS[6] + +if suffix ~= "" then + suffix = suffix .. ":" +end +local merge_key_base = prefix .. ":" .. database .. ":" .. table .. ":" .. suffix .. layer .. ":" .. writer_id local merge_key_idle = merge_key_base .. ":idle" local merge_key_processing = merge_key_base .. ":processing" @@ -16,10 +21,11 @@ local function process_idle_item() local merge_item_json = redis.call("LPOP", merge_key_idle) if merge_item_json then local merge_item = cjson.decode(merge_item_json) - if tonumber(merge_item.time) + timeout_s < tonumber(current_time) then + if merge_item.time_s > current_time then + redis.call("LPUSH", merge_key_idle, merge_item_json) return false end - merge_item.time = current_time + merge_item.time_s = current_time + 1800 -- 30m timeout to reprocess the dead items local updated_item_json = cjson.encode(merge_item) -- Push the updated item to the processing list @@ -32,23 +38,20 @@ end -- Function to process a processing item local function process_processing_item() - local processing_item_json = redis.call("LINDEX", merge_key_processing, 0) - - if processing_item_json then - local processing_item = cjson.decode(processing_item_json) - - -- Check if the first item in processing is older than the timeout - if tonumber(current_time) - tonumber(processing_item.time) > timeout_s then - -- Remove the item from the processing list - redis.call("LPOP", merge_key_processing) + local merge_item_json = redis.call("LPOP", merge_key_processing) + if merge_item_json then + local merge_item = cjson.decode(merge_item_json) + if merge_item.time_s > current_time then + redis.call("LPUSH", merge_key_processing, merge_item_json) + return false + end + merge_item.time_s = current_time + 1800 -- 30m timeout to reprocess the dead items + local updated_item_json = cjson.encode(merge_item) - -- Update the time and re-add to processing - processing_item.time = current_time - local updated_item_json = cjson.encode(processing_item) - redis.call("RPUSH", merge_key_processing, updated_item_json) + -- Push the updated item to the processing list + redis.call("RPUSH", merge_key_processing, updated_item_json) - return updated_item_json - end + return updated_item_json end return false end diff --git a/redis_scripts/patch_index.lua b/redis_scripts/patch_index.lua index afed5a4..ea81f12 100644 --- a/redis_scripts/patch_index.lua +++ b/redis_scripts/patch_index.lua @@ -1,9 +1,14 @@ +local merge_conf = cjson.decode(KEYS[1]) +local move_conf = cjson.decode(KEYS[2]) + +math.randomseed(tonumber(redis.call('TIME')[1]) * 1000 + + tonumber(redis.call('TIME')[2]) / 1000) -- Seed the random number generator with the current time + -- Function to generate a pseudo-UUID local function generate_uuid() - local random = math.random local template ='xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx' return 
string.gsub(template, '[xy]', function (c) - local v = (c == 'x') and random(0, 0xf) or random(8, 0xb) + local v = (c == 'x') and math.random(0, 0xf) or math.random(8, 0xb) return string.format('%x', v) end) end @@ -14,11 +19,12 @@ local function get_dir(path) end -- Function to create and push a new merge object -local function create_and_push_new_merge(merge_key, path, size) - local current_time = redis.call("TIME")[1] +local function create_and_push_new_merge(merge_key, path, size, index) + local current_time = tonumber(redis.call("TIME")[1]) + local merge_ttl_s = merge_conf[index][1] local new_merge = cjson.encode({ id = generate_uuid(), - time = current_time, + time_s = current_time + merge_ttl_s, paths = {path}, size = size }) @@ -51,6 +57,73 @@ local function delete_file(entry) redis.call("DEL", "folders:" .. entry.database .. ":" .. entry.table) end + local drop_queue_key = "drop:" .. entry.database .. ":".. entry.table .. ":".. entry.layer .. ":".. + entry.writer_id .. ":idle" + local new_drop = cjson.encode({ + id = generate_uuid(), + writer_id = entry.writer_id, + layer = entry.layer, + path = entry.path, + database = entry.database, + table = entry.table, + time_s = tonumber(redis.call("TIME")[1]) + 30 + }) + redis.call("RPUSH", drop_queue_key, new_drop) + + return {success = true} +end + +local function merge_entry(entry, index) + local dir = get_dir(entry.path) + local merge_key = "merge:" .. entry.database .. ":" .. entry.table .. ":" .. index .. ":" .. dir .. ":" .. entry.layer .. ":" .. entry.writer_id .. ":idle" + local last_merge = redis.call("LINDEX", merge_key, -1) + + if not last_merge then + -- Create and push a new merge object + create_and_push_new_merge(merge_key, entry.path, entry.size_bytes, index) + return {success = true} + end + + -- Parse JSON from the last merge entry + local last_merge_data = cjson.decode(last_merge) + + if last_merge_data.size + entry.size_bytes > tonumber(merge_conf[index][2]) then + -- Create and push a new merge object + create_and_push_new_merge(merge_key, entry.path, entry.size_bytes, index) + return {success = true} + end + + -- Update the last merge entry + last_merge_data.size = last_merge_data.size + entry.size_bytes + table.insert(last_merge_data.paths, entry.path) + local updated_merge = cjson.encode(last_merge_data) + redis.call("LSET", merge_key, -1, updated_merge) + return {success = true} +end + +local function move_entry(entry) + local move_key = "move:".. entry.database.. ":".. entry.table.. ":".. entry.layer .. ":".. entry.writer_id .. ":idle" + + -- Extract parts of the path + local folder, uuid, iteration = string.match(entry.path, "(.+)/([^/.]+)%.(%d+)%.parquet$") + + -- Generate a new UUID for the destination path + local new_uuid = generate_uuid() + + -- Construct the new path + local path_to = folder .. "/" .. new_uuid .. "." .. iteration .. ".parquet" + local move_entry = { + id = generate_uuid(), + writer_id = entry.writer_id, + database = entry.database, + table = entry.table, + path_from = entry.path, + layer_from = entry.layer, + path_to = path_to, + layer_to = move_conf[entry.layer].layer_to, + time_s = entry.time_s + } + redis.call("RPUSH", move_key, cjson.encode(move_entry)) return {success = true} end @@ -66,44 +139,44 @@ local function process_file(entry) return {success = false, error = "Invalid file path format: " .. entry.path} end - local index_num = tonumber(index) + local index_num = tonumber(index) local dir = get_dir(entry.path) redis.call("HINCRBY", "folders:" .. entry.database .. ":" .. 
entry.table, dir, 1) -- Create a Redis entry for the file - local main_key = hash_key(entry) + local main_key = hash_key(entry) redis.call("HSET", main_key, entry.path, cjson.encode(entry)) - if index_num > #KEYS then - return {success = true} + local merge_ttl = -1 + local move_ttl = -1 + if index_num <= #merge_conf then + merge_ttl = merge_conf[index_num][1] + end + if move_conf[entry.layer].ttl_sec > 0 then + move_ttl = move_conf[entry.layer].ttl_sec end - -- Get the last value from the merge list - local merge_key = "merge:" .. entry.database .. ":" .. entry.table .. ":" .. index .. ":idle" - local last_merge = redis.call("LINDEX", merge_key, -1) - - if not last_merge then - -- Create and push a new merge object - create_and_push_new_merge(merge_key, entry.path, entry.size_bytes) + if merge_ttl ~= -1 and move_ttl == -1 then + return merge_entry(entry, index_num) + end + if move_ttl ~= -1 and merge_ttl == -1 then + return move_entry(entry) + end + if merge_ttl == -1 and move_ttl == -1 then return {success = true} end - -- Parse JSON from the last merge entry - local last_merge_data = cjson.decode(last_merge) + local chunk_time_s = tonumber(entry.str_chunk_time) / 1000000000 + local merge_time_s = chunk_time_s + tonumber(merge_conf[index_num][1]) + local move_time_s = chunk_time_s + move_conf[entry.layer].ttl_sec - if last_merge_data.size + entry.size_bytes > tonumber(KEYS[index_num]) then - -- Create and push a new merge object - create_and_push_new_merge(merge_key, entry.path, entry.size_bytes) - return {success = true} + if move_conf[entry.layer].ttl_sec == 0 or merge_time_s <= move_time_s then + entry.time_s = merge_time_s + return merge_entry(entry, index_num) end - - -- Update the last merge entry - last_merge_data.size = last_merge_data.size + entry.size_bytes - table.insert(last_merge_data.paths, entry.path) - local updated_merge = cjson.encode(last_merge_data) - redis.call("LSET", merge_key, -1, updated_merge) - return {success = true} + entry.time_s = move_time_s + return move_entry(entry) end -- Process all files diff --git a/redis_task_queue.go b/redis_task_queue.go new file mode 100644 index 0000000..ffb90f2 --- /dev/null +++ b/redis_task_queue.go @@ -0,0 +1,74 @@ +package metadata + +import ( + "context" + "encoding/json" + "fmt" + "github.com/redis/go-redis/v9" +) + +type redisTaskQueue[T Identified] struct { + prefix string + database string + table string + suffix string + writerId string + layer string + + getEntrySHA string + redis *redis.Client +} + +func (q *redisTaskQueue[T]) processEntry() (T, error) { + var res T + eStr, err := q.redis.EvalSha(context.Background(), q.getEntrySHA, []string{ + q.prefix, + q.database, + q.table, + q.suffix, + q.layer, + q.writerId, + }, nil).Result() + if err != nil { + return res, err + } + if eStr == "" { + return res, nil + } + + err = json.Unmarshal([]byte(eStr.(string)), &res) + + return res, err +} + +func (q *redisTaskQueue[T]) finishProcess(entry T) error { + suffix := q.suffix + if suffix != "" { + suffix += ":" + } + key := fmt.Sprintf("%s:%s:%s:%s%s:%s:processing", + q.prefix, q.database, q.table, suffix, q.layer, q.writerId) + strQ, err := q.redis.LRange(context.Background(), key, 0, -1).Result() + if err != nil { + return err + } + for _, e := range strQ { + var _e T + err = json.Unmarshal([]byte(e), &_e) + if err != nil { + fmt.Println("Error unmarshalling entry:", err) + continue + } + if _e.Id() == entry.Id() { + _, err := q.redis.LRem(context.Background(), key, 1, e).Result() + fmt.Println("Removed entry from 
processing queue:", _e.Id())
+			return err
+		}
+	}
+	return nil
+}
+
+func (q *redisTaskQueue[T]) AddEntry(entry T) Promise[int32] {
+	//TODO: not implemented
+	return nil
+}
diff --git a/types.go b/types.go
index 2752736..093756e 100644
--- a/types.go
+++ b/types.go
@@ -7,9 +7,28 @@ import (
-// MergeConfiguration is array of arrays of:
-// [[timeout_sec, max_size, merge_iteration_id], ...]
-// You have to init MergeConfigurations in the very beginning
-type MergeConfigurationsConf [][3]int64
+// MergeConfigurationsConf is a single merge configuration:
+// [timeout_sec, max_size, merge_iteration_id]
+// You have to init MergeConfigurations at the very beginning
+type MergeConfigurationsConf [3]int64
 
-var MergeConfigurations MergeConfigurationsConf
+func (m MergeConfigurationsConf) TimeoutSec() int64 {
+	return m[0]
+}
+
+func (m MergeConfigurationsConf) MaxSize() int64 {
+	return m[1]
+}
+
+func (m MergeConfigurationsConf) MergeIterationId() int64 {
+	return m[2]
+}
+
+var MergeConfigurations []MergeConfigurationsConf
+
+type Layer struct {
+	URL    string `json:"url"`
+	Name   string `json:"name"`
+	Type   string `json:"type"`
+	TTLSec int32  `json:"ttl_sec"`
+}
 
 type IndexEntry struct {
 	Layer     string         `json:"layer"`
@@ -23,6 +42,7 @@ type IndexEntry struct {
 	Max       map[string]any `json:"max"`
 	MinTime   int64          `json:"min_time"`
 	MaxTime   int64          `json:"max_time"`
+	WriterID  string         `json:"writer_id"`
 }
 
 type QueryOptions struct {
@@ -32,8 +52,13 @@ type QueryOptions struct {
 	Iteration int
 }
 
+type Identified interface {
+	Id() string
+}
+
 type MergePlan struct {
 	ID        string
+	WriterID  string
 	Layer     string
 	Database  string
 	Table     string
@@ -42,6 +67,39 @@ type MergePlan struct {
 	Iteration int
 }
 
+func (m MergePlan) Id() string {
+	return m.ID
+}
+
+type MovePlan struct {
+	ID        string `json:"id"`
+	WriterID  string `json:"writer_id"`
+	Database  string `json:"database"`
+	Table     string `json:"table"`
+	PathFrom  string `json:"path_from"`
+	LayerFrom string `json:"layer_from"`
+	PathTo    string `json:"path_to"`
+	LayerTo   string `json:"layer_to"`
+}
+
+func (m MovePlan) Id() string {
+	return m.ID
+}
+
+type DropPlan struct {
+	ID       string `json:"id"`
+	WriterID string `json:"writer_id"`
+	Layer    string `json:"layer"`
+	Database string `json:"database"`
+	Table    string `json:"table"`
+	Path     string `json:"path"`
+	TimeS    int32  `json:"time_s"`
+}
+
+func (d DropPlan) Id() string {
+	return d.ID
+}
+
 type DBIndex interface {
 	Databases() ([]string, error)
 	Tables(database string) ([]string, error)
@@ -50,18 +108,28 @@
 
 type TableIndex interface {
 	Batch(add []*IndexEntry, rm []*IndexEntry) Promise[int32]
-	Get(path string) *IndexEntry
+	Get(layer string, path string) *IndexEntry
 	Run()
 	Stop()
-	RmFromDropQueue(files []string) Promise[int32]
-	GetDropQueue() []string
 	GetMergePlanner() TableMergePlanner
 	GetQuerier() TableQuerier
+	GetMovePlanner() TableMovePlanner
+	GetDropPlanner() TableDropPlanner
+}
+
+type TableDropPlanner interface {
+	GetDropQueue(writerId string, layer string) (DropPlan, error)
+	RmFromDropQueue(plan DropPlan) Promise[int32]
 }
 
 type TableMergePlanner interface {
-	GetMergePlan(layer string, iteration int) (*MergePlan, error)
-	EndMerge(plan *MergePlan) error
+	GetMergePlan(writerId string, layer string, iteration int) (MergePlan, error)
+	EndMerge(plan MergePlan) Promise[int32]
+}
+
+type TableMovePlanner interface {
+	GetMovePlan(writerId string, layer string) (MovePlan, error)
+	EndMove(plan MovePlan) Promise[int32]
 }
 
 type TableQuerier interface {
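
Usage sketch (not part of the patch). The snippet below wires the new layered API end to end the way the tests above do: it registers one parquet file in a JSON index built over two `file://` layers, then asks the merge planner for work and acknowledges it. The layer names l1/l2, the `_testdata` directories, the zero-UUID file name and the writer id "w1" are invented for illustration; every signature (`NewJSONIndex`, `Batch`, `GetMergePlanner().GetMergePlan`/`EndMerge`, `MergeConfigurationsConf`) is taken from the diff itself.

```go
package metadata

import (
	"fmt"
	"time"
)

// demoLayeredIndex is a sketch, not a test: error handling is reduced to panics.
func demoLayeredIndex() {
	// One merge configuration: [timeout_sec, max_size, merge_iteration_id].
	MergeConfigurations = []MergeConfigurationsConf{
		{10, 10 * 1024 * 1024, 1},
	}
	// Two storage layers; TTLSec=20 makes l1 entries move candidates after 20s.
	layers := []Layer{
		{URL: "file://./_testdata/hot", Name: "l1", Type: "fs", TTLSec: 20},
		{URL: "file://./_testdata/cold", Name: "l2", Type: "fs", TTLSec: 0},
	}
	idx, err := NewJSONIndex("_testdata", "default", "test", layers)
	if err != nil {
		panic(err)
	}
	defer idx.Stop()

	// Register one freshly written parquet file in layer l1.
	if _, err = idx.Batch([]*IndexEntry{{
		Layer:     "l1",
		Database:  "default",
		Table:     "test",
		Path:      "date=2024-01-01/hour=00/00000000-0000-0000-0000-000000000000.1.parquet",
		SizeBytes: 1 << 20,
		MinTime:   time.Now().Add(-time.Minute).UnixNano(),
		MaxTime:   time.Now().UnixNano(),
		ChunkTime: time.Now().UnixNano(),
	}}, nil).Get(); err != nil {
		panic(err)
	}

	// Ask for merge work in l1 at iteration 1; From stays empty until the
	// entries outlive the 10s merge timeout configured above.
	plan, err := idx.GetMergePlanner().GetMergePlan("w1", "l1", 1)
	if err != nil {
		panic(err)
	}
	if len(plan.From) > 0 {
		// ...merge plan.From into plan.To on disk here, then acknowledge:
		if _, err := idx.GetMergePlanner().EndMerge(plan).Get(); err != nil {
			panic(err)
		}
	}
	fmt.Println("merge candidates:", len(plan.From))
}
```

Because the sketch lives in package metadata, like the tests in this diff, it needs no import path for the library itself.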