From e0de0edeb9155f367c9c78364d7adb6a4c54e153 Mon Sep 17 00:00:00 2001 From: Alex Gaetano Padula Date: Mon, 27 Apr 2026 04:34:42 -0400 Subject: [PATCH] align with tdb 920 with tombstone density capability, and range compaction --- pyproject.toml | 2 +- src/tidesdb/__init__.py | 2 +- src/tidesdb/tidesdb.py | 105 +++++++++++++++++++++++++- tests/test_tidesdb.py | 158 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 263 insertions(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index dbd88c7..9c70be1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "tidesdb" -version = "0.10.0" +version = "0.11.0" description = "Official Python bindings for TidesDB - A high-performance embedded key-value storage engine" readme = "README.md" requires-python = ">=3.10" diff --git a/src/tidesdb/__init__.py b/src/tidesdb/__init__.py index 35517ef..b2c1428 100644 --- a/src/tidesdb/__init__.py +++ b/src/tidesdb/__init__.py @@ -49,7 +49,7 @@ TDB_ERR_READONLY, ) -__version__ = "0.10.0" +__version__ = "0.11.0" __all__ = [ "TidesDB", "Transaction", diff --git a/src/tidesdb/tidesdb.py b/src/tidesdb/tidesdb.py index 9df4211..79e61f4 100644 --- a/src/tidesdb/tidesdb.py +++ b/src/tidesdb/tidesdb.py @@ -209,6 +209,8 @@ class _CColumnFamilyConfig(Structure): ("min_disk_space", c_uint64), ("l1_file_count_trigger", c_int), ("l0_queue_stall_threshold", c_int), + ("tombstone_density_trigger", c_double), + ("tombstone_density_min_entries", c_uint64), ("use_btree", c_int), ("commit_hook_fn", c_void_p), ("commit_hook_ctx", c_void_p), @@ -279,6 +281,7 @@ class _CConfig(Structure): ("unified_memtable_sync_interval_us", c_uint64), ("object_store", c_void_p), ("object_store_config", c_void_p), + ("max_concurrent_flushes", c_int), ] @@ -302,6 +305,11 @@ class _CStats(Structure): ("btree_total_nodes", c_uint64), ("btree_max_height", c_uint32), ("btree_avg_height", c_double), + ("total_tombstones", c_uint64), + ("tombstone_ratio", c_double), + ("level_tombstone_counts", POINTER(c_uint64)), + ("max_sst_density", c_double), + ("max_sst_density_level", c_int), ] @@ -477,6 +485,15 @@ class _CDbStats(Structure): _lib.tidesdb_compact.argtypes = [c_void_p] _lib.tidesdb_compact.restype = c_int +_lib.tidesdb_compact_range.argtypes = [ + c_void_p, + POINTER(c_uint8), + c_size_t, + POINTER(c_uint8), + c_size_t, +] +_lib.tidesdb_compact_range.restype = c_int + _lib.tidesdb_flush_memtable.argtypes = [c_void_p] _lib.tidesdb_flush_memtable.restype = c_int @@ -633,6 +650,7 @@ class Config: unified_memtable_sync_interval_us: int = 0 object_store: c_void_p | None = None object_store_config: ObjStoreConfig | None = None + max_concurrent_flushes: int = 0 @dataclass @@ -659,6 +677,8 @@ class ColumnFamilyConfig: min_disk_space: int = 100 * 1024 * 1024 l1_file_count_trigger: int = 4 l0_queue_stall_threshold: int = 20 + tombstone_density_trigger: float = 0.0 + tombstone_density_min_entries: int = 1024 use_btree: bool = False object_lazy_compaction: bool = False object_prefetch_compaction: bool = True @@ -691,6 +711,8 @@ def _to_c_struct(self, name: str = "") -> _CColumnFamilyConfig: c_config.min_disk_space = self.min_disk_space c_config.l1_file_count_trigger = self.l1_file_count_trigger c_config.l0_queue_stall_threshold = self.l0_queue_stall_threshold + c_config.tombstone_density_trigger = self.tombstone_density_trigger + c_config.tombstone_density_min_entries = self.tombstone_density_min_entries c_config.use_btree = 1 if self.use_btree else 0 c_config.object_lazy_compaction = 1 if self.object_lazy_compaction else 0 c_config.object_prefetch_compaction = 1 if self.object_prefetch_compaction else 0 @@ -721,6 +743,11 @@ class Stats: btree_total_nodes: int = 0 btree_max_height: int = 0 btree_avg_height: float = 0.0 + total_tombstones: int = 0 + tombstone_ratio: float = 0.0 + level_tombstone_counts: list[int] | None = None + max_sst_density: float = 0.0 + max_sst_density_level: int = 0 config: ColumnFamilyConfig | None = None @@ -785,8 +812,26 @@ class CommitOp: def default_config() -> Config: - """Get default database configuration.""" - return Config(db_path="") + """Get default database configuration sourced from the C library.""" + c_cfg = _lib.tidesdb_default_config() + return Config( + db_path="", + num_flush_threads=c_cfg.num_flush_threads, + num_compaction_threads=c_cfg.num_compaction_threads, + log_level=LogLevel(c_cfg.log_level), + block_cache_size=c_cfg.block_cache_size, + max_open_sstables=c_cfg.max_open_sstables, + log_to_file=bool(c_cfg.log_to_file), + log_truncation_at=c_cfg.log_truncation_at, + max_memory_usage=c_cfg.max_memory_usage, + unified_memtable=bool(c_cfg.unified_memtable), + unified_memtable_write_buffer_size=c_cfg.unified_memtable_write_buffer_size, + unified_memtable_skip_list_max_level=c_cfg.unified_memtable_skip_list_max_level, + unified_memtable_skip_list_probability=c_cfg.unified_memtable_skip_list_probability, + unified_memtable_sync_mode=SyncMode(c_cfg.unified_memtable_sync_mode), + unified_memtable_sync_interval_us=c_cfg.unified_memtable_sync_interval_us, + max_concurrent_flushes=c_cfg.max_concurrent_flushes, + ) def objstore_default_config() -> ObjStoreConfig: @@ -864,6 +909,8 @@ def default_column_family_config() -> ColumnFamilyConfig: min_disk_space=c_config.min_disk_space, l1_file_count_trigger=c_config.l1_file_count_trigger, l0_queue_stall_threshold=c_config.l0_queue_stall_threshold, + tombstone_density_trigger=c_config.tombstone_density_trigger, + tombstone_density_min_entries=c_config.tombstone_density_min_entries, use_btree=bool(c_config.use_btree), object_lazy_compaction=bool(c_config.object_lazy_compaction), object_prefetch_compaction=bool(c_config.object_prefetch_compaction), @@ -926,6 +973,8 @@ def load_config_from_ini(file_path: str, cf_name: str) -> ColumnFamilyConfig: min_disk_space=c_config.min_disk_space, l1_file_count_trigger=c_config.l1_file_count_trigger, l0_queue_stall_threshold=c_config.l0_queue_stall_threshold, + tombstone_density_trigger=c_config.tombstone_density_trigger, + tombstone_density_min_entries=c_config.tombstone_density_min_entries, use_btree=bool(c_config.use_btree), object_lazy_compaction=bool(c_config.object_lazy_compaction), object_prefetch_compaction=bool(c_config.object_prefetch_compaction), @@ -1090,6 +1139,42 @@ def compact(self) -> None: if result != TDB_SUCCESS: raise TidesDBError.from_code(result, "failed to compact column family") + def compact_range(self, start_key: bytes | None, end_key: bytes | None) -> None: + """ + Synchronously compact every SSTable whose key range overlaps + [start_key, end_key). + + Output is merged toward the largest level affected. Pass None for an + unbounded endpoint. Both endpoints None/empty is rejected with + TDB_ERR_INVALID_ARGS - use compact() for full-CF compaction. + + Args: + start_key: Inclusive lower bound, or None for unbounded. + end_key: Exclusive upper bound, or None for unbounded. + + Raises: + TidesDBError: TDB_ERR_INVALID_ARGS if both endpoints are unbounded, + TDB_ERR_LOCKED if another compaction is running, or other I/O + errors from the underlying merge. + """ + if start_key is None or len(start_key) == 0: + start_buf = None + start_len = 0 + else: + start_buf = (c_uint8 * len(start_key)).from_buffer_copy(start_key) + start_len = len(start_key) + + if end_key is None or len(end_key) == 0: + end_buf = None + end_len = 0 + else: + end_buf = (c_uint8 * len(end_key)).from_buffer_copy(end_key) + end_len = len(end_key) + + result = _lib.tidesdb_compact_range(self._cf, start_buf, start_len, end_buf, end_len) + if result != TDB_SUCCESS: + raise TidesDBError.from_code(result, "failed to compact range") + def flush_memtable(self) -> None: """Manually trigger memtable flush for this column family.""" result = _lib.tidesdb_flush_memtable(self._cf) @@ -1266,6 +1351,7 @@ def get_stats(self) -> Stats: level_sizes = [] level_num_sstables = [] level_key_counts = [] + level_tombstone_counts: list[int] = [] if c_stats.num_levels > 0: if c_stats.level_sizes: @@ -1277,6 +1363,9 @@ def get_stats(self) -> Stats: if c_stats.level_key_counts: for i in range(c_stats.num_levels): level_key_counts.append(c_stats.level_key_counts[i]) + if c_stats.level_tombstone_counts: + for i in range(c_stats.num_levels): + level_tombstone_counts.append(c_stats.level_tombstone_counts[i]) config = None if c_stats.config: @@ -1302,6 +1391,8 @@ def get_stats(self) -> Stats: min_disk_space=c_cfg.min_disk_space, l1_file_count_trigger=c_cfg.l1_file_count_trigger, l0_queue_stall_threshold=c_cfg.l0_queue_stall_threshold, + tombstone_density_trigger=c_cfg.tombstone_density_trigger, + tombstone_density_min_entries=c_cfg.tombstone_density_min_entries, use_btree=bool(c_cfg.use_btree), object_lazy_compaction=bool(c_cfg.object_lazy_compaction), object_prefetch_compaction=bool(c_cfg.object_prefetch_compaction), @@ -1323,6 +1414,11 @@ def get_stats(self) -> Stats: btree_total_nodes=c_stats.btree_total_nodes, btree_max_height=c_stats.btree_max_height, btree_avg_height=c_stats.btree_avg_height, + total_tombstones=c_stats.total_tombstones, + tombstone_ratio=c_stats.tombstone_ratio, + level_tombstone_counts=level_tombstone_counts, + max_sst_density=c_stats.max_sst_density, + max_sst_density_level=c_stats.max_sst_density_level, config=config, ) @@ -1624,6 +1720,7 @@ def __init__(self, config: Config) -> None: unified_memtable_sync_interval_us=config.unified_memtable_sync_interval_us, object_store=obj_store_ptr, object_store_config=obj_store_config_ptr, + max_concurrent_flushes=config.max_concurrent_flushes, ) db_ptr = c_void_p() @@ -1654,6 +1751,7 @@ def open( unified_memtable_sync_interval_us: int = 0, object_store: c_void_p | None = None, object_store_config: ObjStoreConfig | None = None, + max_concurrent_flushes: int = 0, ) -> TidesDB: """ Convenience method to open a database with individual parameters. @@ -1676,6 +1774,8 @@ def open( unified_memtable_sync_interval_us: Sync interval in microseconds for unified memtable object_store: Object store connector handle (from objstore_fs_create()) object_store_config: Object store behavior configuration + max_concurrent_flushes: Global semaphore on in-flight memtable flushes + across all column families (0 = library default) Returns: TidesDB instance @@ -1698,6 +1798,7 @@ def open( unified_memtable_sync_interval_us=unified_memtable_sync_interval_us, object_store=object_store, object_store_config=object_store_config, + max_concurrent_flushes=max_concurrent_flushes, ) return cls(config) diff --git a/tests/test_tidesdb.py b/tests/test_tidesdb.py index 96323c3..5551ff8 100644 --- a/tests/test_tidesdb.py +++ b/tests/test_tidesdb.py @@ -1514,5 +1514,163 @@ def test_readonly_error_message(self): assert "read-only" in str(err) +class TestTombstoneConfig: + """Tests for the new tombstone density CF config fields.""" + + def test_defaults_are_sensible(self): + """Default tombstone fields should mirror the C library defaults.""" + cfg = tidesdb.default_column_family_config() + # Trigger is 0.0 (disabled) by default; min_entries should be ~1024. + assert cfg.tombstone_density_trigger == 0.0 + assert cfg.tombstone_density_min_entries == 1024 + + def test_roundtrip_through_get_stats(self, db): + """Custom tombstone settings must round-trip via get_stats().config.""" + cfg = tidesdb.default_column_family_config() + cfg.tombstone_density_trigger = 0.5 + cfg.tombstone_density_min_entries = 256 + db.create_column_family("tomb_cf", cfg) + try: + cf = db.get_column_family("tomb_cf") + stats = cf.get_stats() + assert stats.config is not None + assert stats.config.tombstone_density_trigger == 0.5 + assert stats.config.tombstone_density_min_entries == 256 + finally: + try: + db.drop_column_family("tomb_cf") + except tidesdb.TidesDBError: + pass + + def test_ini_roundtrip(self, temp_db_path): + """Tombstone settings must round-trip through INI save/load.""" + original = tidesdb.default_column_family_config() + original.tombstone_density_trigger = 0.75 + original.tombstone_density_min_entries = 2048 + + ini_path = os.path.join(temp_db_path, "tomb.ini") + tidesdb.save_config_to_ini(ini_path, "tomb_cf", original) + loaded = tidesdb.load_config_from_ini(ini_path, "tomb_cf") + assert loaded.tombstone_density_trigger == 0.75 + assert loaded.tombstone_density_min_entries == 2048 + + +class TestTombstoneStats: + """Tests for the new tombstone fields on Stats.""" + + def test_tombstone_stats_populated(self, db, cf): + """After flushing some deletes, tombstone stats should be sensible.""" + n = 200 + with db.begin_txn() as txn: + for i in range(n): + txn.put(cf, f"k{i:04d}".encode(), f"v{i}".encode()) + txn.commit() + + cf.flush_memtable() + time.sleep(0.5) + + with db.begin_txn() as txn: + for i in range(n // 2): + txn.delete(cf, f"k{i:04d}".encode()) + txn.commit() + + cf.flush_memtable() + time.sleep(0.5) + + stats = cf.get_stats() + assert stats.total_tombstones >= 0 + assert 0.0 <= stats.tombstone_ratio <= 1.0 + assert 0.0 <= stats.max_sst_density <= 1.0 + assert isinstance(stats.max_sst_density_level, int) + assert stats.level_tombstone_counts is not None + assert len(stats.level_tombstone_counts) == stats.num_levels + + def test_tombstone_stats_defaults_on_empty_cf(self, db, cf): + """Empty CF should report zero tombstones and a [0,1] ratio.""" + stats = cf.get_stats() + assert stats.total_tombstones == 0 + assert stats.tombstone_ratio == 0.0 + assert stats.max_sst_density == 0.0 + assert stats.max_sst_density_level == 0 + assert stats.level_tombstone_counts is not None + assert len(stats.level_tombstone_counts) == stats.num_levels + + +class TestCompactRange: + """Tests for ColumnFamily.compact_range.""" + + def test_compact_range_narrow(self, db, cf): + """compact_range over a narrow range should succeed and not affect outside data.""" + for batch in range(3): + with db.begin_txn() as txn: + for i in range(50): + key = f"key:{batch:02d}:{i:04d}".encode() + txn.put(cf, key, b"v" * 32) + txn.commit() + cf.flush_memtable() + time.sleep(0.3) + + cf.compact_range(b"key:01:0010", b"key:01:0020") + + with db.begin_txn() as txn: + assert txn.get(cf, b"key:00:0005") == b"v" * 32 + assert txn.get(cf, b"key:02:0049") == b"v" * 32 + + def test_compact_range_unbounded_start(self, db, cf): + """None start means unbounded on the lower side.""" + with db.begin_txn() as txn: + for i in range(20): + txn.put(cf, f"k{i:03d}".encode(), b"value") + txn.commit() + cf.flush_memtable() + time.sleep(0.3) + + cf.compact_range(None, b"k010") + + with db.begin_txn() as txn: + assert txn.get(cf, b"k015") == b"value" + + def test_compact_range_both_unbounded_rejected(self, db, cf): + """Both endpoints unbounded must raise INVALID_ARGS.""" + with pytest.raises(tidesdb.TidesDBError) as excinfo: + cf.compact_range(None, None) + assert excinfo.value.code == tidesdb.TDB_ERR_INVALID_ARGS + + def test_compact_range_both_empty_rejected(self, db, cf): + """Both empty-byte endpoints must raise INVALID_ARGS.""" + with pytest.raises(tidesdb.TidesDBError) as excinfo: + cf.compact_range(b"", b"") + assert excinfo.value.code == tidesdb.TDB_ERR_INVALID_ARGS + + +class TestMaxConcurrentFlushes: + """Tests for the new DB-level max_concurrent_flushes config.""" + + def test_default_is_nonzero(self): + """default_config() must mirror the C library default (non-zero).""" + cfg = tidesdb.default_config() + assert cfg.max_concurrent_flushes != 0 + + def test_open_with_explicit_value(self, temp_db_path): + """Opening with max_concurrent_flushes=1 should succeed and allow normal writes.""" + cfg = tidesdb.default_config() + cfg.db_path = temp_db_path + cfg.max_concurrent_flushes = 1 + + db = tidesdb.TidesDB(cfg) + try: + db.create_column_family("flush_cf") + cf = db.get_column_family("flush_cf") + with db.begin_txn() as txn: + txn.put(cf, b"k", b"v") + txn.commit() + cf.flush_memtable() + time.sleep(0.3) + with db.begin_txn() as txn: + assert txn.get(cf, b"k") == b"v" + finally: + db.close() + + if __name__ == "__main__": pytest.main([__file__, "-v"])