diff --git a/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/__init__.py b/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/documentdb_tests/compatibility/tests/core/cursors/tailable-cursors/test_smoke_tailable-cursors.py b/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/test_smoke_tailable_cursors.py similarity index 100% rename from documentdb_tests/compatibility/tests/core/cursors/tailable-cursors/test_smoke_tailable-cursors.py rename to documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/test_smoke_tailable_cursors.py diff --git a/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/test_tailable_cursors.py b/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/test_tailable_cursors.py new file mode 100644 index 00000000..cb4b32cf --- /dev/null +++ b/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/test_tailable_cursors.py @@ -0,0 +1,309 @@ +"""Tests for tailable cursor creation and error conditions on capped collections.""" + +from __future__ import annotations + +import pytest + +from documentdb_tests.compatibility.tests.core.collections.commands.utils.command_test_case import ( + CommandContext, + CommandTestCase, +) +from documentdb_tests.framework.assertions import assertResult +from documentdb_tests.framework.error_codes import ( + BAD_VALUE_ERROR, + UNRECOGNIZED_EXPRESSION_ERROR, +) +from documentdb_tests.framework.executor import execute_command +from documentdb_tests.framework.parametrize import pytest_params +from documentdb_tests.framework.property_checks import Eq, Ne +from documentdb_tests.framework.target_collection import ( + CappedCollection, + SiblingCollection, + TargetCollection, + TimeseriesCollection, +) +from documentdb_tests.framework.test_constants import INT64_ZERO + +# Property [Tailable Cursor Creation]: a tailable cursor on a capped collection +# remains open after exhausting available results. +TAILABLE_CREATION_TESTS: list[CommandTestCase] = [ + CommandTestCase( + "creation_nonempty_cursor_open", + target_collection=CappedCollection(), + docs=[{"_id": 1, "x": 1}, {"_id": 2, "x": 2}], + command=lambda ctx: {"find": ctx.collection, "tailable": True, "batchSize": 100}, + expected={ + "cursor": { + "id": Ne(INT64_ZERO), + "firstBatch": Eq([{"_id": 1, "x": 1}, {"_id": 2, "x": 2}]), + }, + }, + msg="Tailable cursor on non-empty capped collection should stay open", + ), + CommandTestCase( + "creation_nontailable_cursor_closed", + target_collection=CappedCollection(), + docs=[{"_id": 1, "x": 1}, {"_id": 2, "x": 2}], + command=lambda ctx: {"find": ctx.collection, "tailable": False, "batchSize": 100}, + expected={ + "cursor": { + "id": Eq(INT64_ZERO), + "firstBatch": Eq([{"_id": 1, "x": 1}, {"_id": 2, "x": 2}]), + }, + }, + msg="Non-tailable cursor on non-empty capped collection should close", + ), + CommandTestCase( + "creation_empty_dead_cursor", + target_collection=CappedCollection(), + docs=[], + command=lambda ctx: {"find": ctx.collection, "tailable": True, "batchSize": 100}, + expected={"cursor": {"id": Eq(INT64_ZERO), "firstBatch": Eq([])}}, + msg="Tailable cursor on empty capped collection should produce a dead cursor", + ), + CommandTestCase( + "creation_nomatch_filter_cursor_open", + target_collection=CappedCollection(), + docs=[{"_id": 1, "x": 1}, {"_id": 2, "x": 2}], + command=lambda ctx: { + "find": ctx.collection, + "tailable": True, + "batchSize": 100, + "filter": {"x": 999}, + }, + expected={"cursor": {"id": Ne(INT64_ZERO), "firstBatch": Eq([])}}, + msg="Tailable cursor with no-match filter on non-empty capped should stay open", + ), + CommandTestCase( + "creation_false_same_as_omitted", + target_collection=CappedCollection(), + docs=[{"_id": 1, "x": 1}], + command=lambda ctx: {"find": ctx.collection, "tailable": False, "batchSize": 100}, + expected={ + "cursor": { + "id": Eq(INT64_ZERO), + "firstBatch": Eq([{"_id": 1, "x": 1}]), + }, + }, + msg="tailable: false should behave identically to omitting the field", + ), +] + +# Property [Non-Capped and Special Collections]: tailable cursors require a capped +# collection. +TAILABLE_NONCAPPED_TESTS: list[CommandTestCase] = [ + CommandTestCase( + "noncapped_error", + target_collection=TargetCollection(), + docs=[{"_id": 1}], + command=lambda ctx: {"find": ctx.collection, "tailable": True}, + error_code=BAD_VALUE_ERROR, + msg="Tailable cursor on non-capped collection should produce an error", + ), + CommandTestCase( + "view_error", + target_collection=CappedCollection(), + docs=[{"_id": 1}], + siblings=[SiblingCollection(suffix="_view", view_on_source=True)], + command=lambda ctx: {"find": f"{ctx.collection}_view", "tailable": True}, + error_code=UNRECOGNIZED_EXPRESSION_ERROR, + msg="Tailable cursor on a view should produce a view error", + ), + CommandTestCase( + "timeseries_error", + target_collection=TimeseriesCollection(), + command=lambda ctx: {"find": ctx.collection, "tailable": True}, + error_code=UNRECOGNIZED_EXPRESSION_ERROR, + msg="Tailable cursor on a timeseries collection should produce an error", + ), + CommandTestCase( + "nonexistent_dead_cursor", + docs=None, + command=lambda ctx: {"find": ctx.collection, "tailable": True}, + expected={"cursor": {"id": Eq(INT64_ZERO), "firstBatch": Eq([])}}, + msg="Tailable cursor on non-existent collection should produce dead cursor", + ), +] + +# Property [Sort Restrictions - Allowed]: tailable cursors only accept forward natural +# sort order. +TAILABLE_SORT_ALLOWED_TESTS: list[CommandTestCase] = [ + CommandTestCase( + "sort_natural_forward", + target_collection=CappedCollection(), + docs=[{"_id": 1}], + command=lambda ctx: { + "find": ctx.collection, + "tailable": True, + "sort": {"$natural": 1}, + }, + expected={ + "cursor": {"id": Ne(INT64_ZERO), "firstBatch": Eq([{"_id": 1}])}, + }, + msg="sort: {$natural: 1} should be allowed with tailable", + ), + CommandTestCase( + "sort_empty", + target_collection=CappedCollection(), + docs=[{"_id": 1}], + command=lambda ctx: { + "find": ctx.collection, + "tailable": True, + "sort": {}, + }, + expected={ + "cursor": {"id": Ne(INT64_ZERO), "firstBatch": Eq([{"_id": 1}])}, + }, + msg="sort: {} should be allowed with tailable", + ), +] + +# Property [Sort Restrictions - Rejected]: non-natural sorts are rejected with tailable. +TAILABLE_SORT_ERROR_TESTS: list[CommandTestCase] = [ + CommandTestCase( + "sort_natural_reverse_error", + target_collection=CappedCollection(), + docs=[{"_id": 1}], + command=lambda ctx: { + "find": ctx.collection, + "tailable": True, + "sort": {"$natural": -1}, + }, + error_code=BAD_VALUE_ERROR, + msg="sort: {$natural: -1} with tailable should produce an error", + ), + CommandTestCase( + "sort_field_error", + target_collection=CappedCollection(), + docs=[{"_id": 1}], + command=lambda ctx: { + "find": ctx.collection, + "tailable": True, + "sort": {"x": 1}, + }, + error_code=BAD_VALUE_ERROR, + msg="Non-$natural sort with tailable should produce an error", + ), + CommandTestCase( + "sort_compound_natural_error", + target_collection=CappedCollection(), + docs=[{"_id": 1}], + command=lambda ctx: { + "find": ctx.collection, + "tailable": True, + "sort": {"$natural": 1, "x": 1}, + }, + error_code=BAD_VALUE_ERROR, + msg="Compound sort with $natural and tailable should produce an error", + ), +] + +# Property [Incompatible Options]: certain find options are incompatible with tailable. +TAILABLE_INCOMPATIBLE_TESTS: list[CommandTestCase] = [ + CommandTestCase( + "singlebatch_conflict", + target_collection=CappedCollection(), + docs=[{"_id": 1}], + command=lambda ctx: { + "find": ctx.collection, + "tailable": True, + "singleBatch": True, + }, + error_code=BAD_VALUE_ERROR, + msg="singleBatch: true with tailable: true should produce an error", + ), + CommandTestCase( + "negative_limit", + target_collection=CappedCollection(), + docs=[{"_id": 1}], + command=lambda ctx: { + "find": ctx.collection, + "tailable": True, + "limit": -1, + }, + error_code=BAD_VALUE_ERROR, + msg="Negative limit with tailable should produce an error", + ), + CommandTestCase( + "negative_skip", + target_collection=CappedCollection(), + docs=[{"_id": 1}], + command=lambda ctx: { + "find": ctx.collection, + "tailable": True, + "skip": -1, + }, + error_code=BAD_VALUE_ERROR, + msg="Negative skip with tailable should produce an error", + ), +] + +# Property [Compatible Find Options]: standard find options are accepted alongside +# tailable. +TAILABLE_COMPATIBLE_TESTS: list[CommandTestCase] = [ + CommandTestCase( + "compatible_mixed_options", + target_collection=CappedCollection(), + docs=[{"_id": 1, "x": 1}, {"_id": 2, "x": 2}], + command=lambda ctx: { + "find": ctx.collection, + "tailable": True, + "filter": {"x": {"$gte": 1}}, + "projection": {"x": 1}, + "skip": 0, + "limit": 10, + "batchSize": 5, + "allowDiskUse": True, + }, + expected={"cursor": {"id": Ne(INT64_ZERO)}}, + msg="Compatible find options should be accepted with tailable cursors", + ), + CommandTestCase( + "compatible_hint_index", + target_collection=CappedCollection(), + docs=[{"_id": 1}], + command=lambda ctx: { + "find": ctx.collection, + "tailable": True, + "hint": {"_id": 1}, + }, + expected={"cursor": {"id": Ne(INT64_ZERO)}}, + msg="hint: {_id: 1} should be accepted with tailable cursors", + ), + CommandTestCase( + "compatible_hint_natural_reverse", + target_collection=CappedCollection(), + docs=[{"_id": 1}], + command=lambda ctx: { + "find": ctx.collection, + "tailable": True, + "hint": {"$natural": -1}, + }, + expected={"cursor": {"id": Ne(INT64_ZERO)}}, + msg="hint: {$natural: -1} should be accepted with tailable cursors (unlike sort)", + ), +] + +ALL_TESTS = ( + TAILABLE_CREATION_TESTS + + TAILABLE_NONCAPPED_TESTS + + TAILABLE_SORT_ALLOWED_TESTS + + TAILABLE_SORT_ERROR_TESTS + + TAILABLE_INCOMPATIBLE_TESTS + + TAILABLE_COMPATIBLE_TESTS +) + + +@pytest.mark.parametrize("test", pytest_params(ALL_TESTS)) +def test_tailable_cursors(database_client, collection, test: CommandTestCase): + """Test tailable cursor creation and error conditions.""" + resolved = test.prepare(database_client, collection) + ctx = CommandContext.from_collection(resolved) + result = execute_command(resolved, test.build_command(ctx)) + assertResult( + result, + expected=test.build_expected(ctx), + error_code=test.error_code, + msg=test.msg, + raw_res=True, + ) diff --git a/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/test_tailable_cursors_awaitdata.py b/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/test_tailable_cursors_awaitdata.py new file mode 100644 index 00000000..1679d4ec --- /dev/null +++ b/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/test_tailable_cursors_awaitdata.py @@ -0,0 +1,192 @@ +"""Tests for tailable cursor awaitData behavior and noCursorTimeout.""" + +from __future__ import annotations + +import pytest + +from documentdb_tests.compatibility.tests.core.collections.commands.utils.command_test_case import ( + CommandContext, + CommandTestCase, +) +from documentdb_tests.framework.assertions import assertFailureCode, assertProperties, assertResult +from documentdb_tests.framework.error_codes import ( + BAD_VALUE_ERROR, + FAILED_TO_PARSE_ERROR, + UNRECOGNIZED_COMMAND_FIELD_ERROR, +) +from documentdb_tests.framework.executor import execute_command +from documentdb_tests.framework.parametrize import pytest_params +from documentdb_tests.framework.property_checks import Eq, Ne +from documentdb_tests.framework.target_collection import CappedCollection +from documentdb_tests.framework.test_constants import INT64_ZERO + +from .utils.capped import create_capped + +# Property [awaitData Behavior]: awaitData requires tailable and modifies cursor +# blocking behavior. +TAILABLE_AWAITDATA_SINGLE_CMD_TESTS: list[CommandTestCase] = [ + CommandTestCase( + "awaitdata_empty_capped_dead_cursor", + target_collection=CappedCollection(), + docs=[], + command=lambda ctx: { + "find": ctx.collection, + "tailable": True, + "awaitData": True, + "batchSize": 100, + }, + expected={"cursor": {"id": Eq(INT64_ZERO), "firstBatch": Eq([])}}, + msg="awaitData on empty capped collection should produce dead cursor", + ), + CommandTestCase( + "awaitdata_requires_tailable", + target_collection=CappedCollection(), + docs=[{"_id": 1}], + command=lambda ctx: {"find": ctx.collection, "awaitData": True}, + error_code=FAILED_TO_PARSE_ERROR, + msg="awaitData: true without tailable should produce a parse error", + ), + CommandTestCase( + "awaitdata_false_without_tailable", + target_collection=CappedCollection(), + docs=[{"_id": 1}], + command=lambda ctx: {"find": ctx.collection, "awaitData": False}, + expected={"cursor": {"firstBatch": Eq([{"_id": 1}])}}, + msg="awaitData: false without tailable should be accepted", + ), + CommandTestCase( + "nocursortimeout_with_tailable", + target_collection=CappedCollection(), + docs=[{"_id": 1}], + command=lambda ctx: { + "find": ctx.collection, + "tailable": True, + "noCursorTimeout": True, + "batchSize": 100, + }, + expected={"cursor": {"id": Ne(INT64_ZERO)}}, + msg="noCursorTimeout with tailable on capped collection should keep cursor open", + ), +] + + +@pytest.mark.parametrize("test", pytest_params(TAILABLE_AWAITDATA_SINGLE_CMD_TESTS)) +def test_tailable_cursors_awaitdata_single(database_client, collection, test: CommandTestCase): + """Test single-command awaitData and noCursorTimeout behavior.""" + resolved = test.prepare(database_client, collection) + ctx = CommandContext.from_collection(resolved) + result = execute_command(resolved, test.build_command(ctx)) + assertResult( + result, + expected=test.build_expected(ctx), + error_code=test.error_code, + msg=test.msg, + raw_res=True, + ) + + +# Property [awaitData getMore Behavior]: an awaitData cursor blocks on getMore +# until maxTimeMS expires when no new data is available. +def test_tailable_cursors_awaitdata_blocks(database_client, collection): + """Test awaitData cursor blocks at end of data.""" + capped = create_capped(database_client, collection, [{"_id": 1}]) + result = execute_command( + capped, + {"find": capped.name, "tailable": True, "awaitData": True, "batchSize": 100}, + ) + cursor_id = result["cursor"]["id"] + + gm_result = execute_command( + capped, {"getMore": cursor_id, "collection": capped.name, "maxTimeMS": 200} + ) + assertProperties( + gm_result, + {"cursor": {"id": Ne(INT64_ZERO), "nextBatch": Eq([])}}, + msg="awaitData cursor should block then return empty batch", + raw_res=True, + ) + + +# Property [awaitData Non-Blocking]: a tailable cursor without awaitData returns +# immediately from getMore when no new data is available. +def test_tailable_cursors_awaitdata_nonblocking(database_client, collection): + """Test non-awaitData tailable cursor returns immediately.""" + capped = create_capped(database_client, collection, [{"_id": 1}]) + result = execute_command( + capped, + {"find": capped.name, "tailable": True, "awaitData": False, "batchSize": 100}, + ) + cursor_id = result["cursor"]["id"] + + gm_result = execute_command(capped, {"getMore": cursor_id, "collection": capped.name}) + assertProperties( + gm_result, + {"cursor": {"id": Ne(INT64_ZERO), "nextBatch": Eq([])}}, + msg="Non-awaitData tailable cursor should return immediately", + raw_res=True, + ) + + +# Property [awaitData maxTimeMS Zero]: maxTimeMS: 0 on getMore causes an awaitData +# cursor to return immediately. +def test_tailable_cursors_awaitdata_maxtime_zero(database_client, collection): + """Test getMore maxTimeMS: 0 on awaitData cursor returns immediately.""" + capped = create_capped(database_client, collection, [{"_id": 1}]) + result = execute_command( + capped, + {"find": capped.name, "tailable": True, "awaitData": True, "batchSize": 100}, + ) + cursor_id = result["cursor"]["id"] + + gm_result = execute_command( + capped, {"getMore": cursor_id, "collection": capped.name, "maxTimeMS": 0} + ) + assertProperties( + gm_result, + {"cursor": {"id": Ne(INT64_ZERO), "nextBatch": Eq([])}}, + msg="getMore maxTimeMS: 0 on awaitData cursor should return immediately", + raw_res=True, + ) + + +# Property [maxTimeMS Requires awaitData]: maxTimeMS on getMore is rejected for +# non-awaitData tailable cursors. +def test_tailable_cursors_awaitdata_maxtime_noawait_error(database_client, collection): + """Test maxTimeMS on getMore for non-awaitData tailable cursor produces error.""" + capped = create_capped(database_client, collection, [{"_id": 1}]) + result = execute_command( + capped, + {"find": capped.name, "tailable": True, "awaitData": False, "batchSize": 100}, + ) + cursor_id = result["cursor"]["id"] + + gm_result = execute_command( + capped, {"getMore": cursor_id, "collection": capped.name, "maxTimeMS": 100} + ) + assertFailureCode( + gm_result, + BAD_VALUE_ERROR, + msg="maxTimeMS on getMore for non-awaitData tailable cursor should produce an error", + ) + + +# Property [getMore Rejects awaitData Field]: the awaitData field is not accepted +# on the getMore command itself. +def test_tailable_cursors_awaitdata_getmore_rejects_awaitdata_field(database_client, collection): + """Test specifying awaitData on getMore command is rejected.""" + capped = create_capped(database_client, collection, [{"_id": 1}]) + result = execute_command( + capped, + {"find": capped.name, "tailable": True, "awaitData": True, "batchSize": 100}, + ) + cursor_id = result["cursor"]["id"] + + gm_result = execute_command( + capped, + {"getMore": cursor_id, "collection": capped.name, "awaitData": True}, + ) + assertFailureCode( + gm_result, + UNRECOGNIZED_COMMAND_FIELD_ERROR, + msg="Specifying awaitData on getMore should produce unrecognized command field error", + ) diff --git a/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/test_tailable_cursors_invalidation.py b/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/test_tailable_cursors_invalidation.py new file mode 100644 index 00000000..194a1b9d --- /dev/null +++ b/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/test_tailable_cursors_invalidation.py @@ -0,0 +1,233 @@ +"""Tests for tailable cursor invalidation and collection operations.""" + +from __future__ import annotations + +import pytest + +from documentdb_tests.framework.assertions import ( + assertFailureCode, + assertProperties, +) +from documentdb_tests.framework.error_codes import ( + CAPPED_POSITION_LOST_ERROR, + CURSOR_NOT_FOUND_ERROR, + QUERY_PLAN_KILLED_ERROR, +) +from documentdb_tests.framework.executor import execute_command +from documentdb_tests.framework.property_checks import Eq, Ne +from documentdb_tests.framework.test_constants import INT64_ZERO + +from .utils.capped import create_capped + + +# Property [killCursors on Tailable Cursors]: killCursors terminates a tailable cursor. +def test_tailable_cursors_getmore_after_kill(database_client, collection): + """Test getMore after killCursors produces cursor not found error.""" + capped = create_capped(database_client, collection, [{"_id": 1}]) + result = execute_command(capped, {"find": capped.name, "tailable": True, "batchSize": 100}) + cursor_id = result["cursor"]["id"] + + execute_command(capped, {"killCursors": capped.name, "cursors": [cursor_id]}) + + gm_result = execute_command(capped, {"getMore": cursor_id, "collection": capped.name}) + assertFailureCode( + gm_result, + CURSOR_NOT_FOUND_ERROR, + msg="getMore after killCursors should produce cursor not found error", + ) + + +# Property [Capped Position Lost]: when eviction overwrites the cursor's read position, +# getMore returns CappedPositionLost error. +def test_tailable_cursors_capped_position_lost(database_client, collection): + """Test getMore returns CappedPositionLost when eviction overwrites cursor position.""" + capped = create_capped( + database_client, + collection, + [{"_id": 1}, {"_id": 2}, {"_id": 3}], + size=100_000, + max=3, + ) + + result = execute_command(capped, {"find": capped.name, "tailable": True, "batchSize": 100}) + cursor_id = result["cursor"]["id"] + + # Evict all documents the cursor has seen. + capped.insert_many([{"_id": 4}, {"_id": 5}, {"_id": 6}]) + + gm_result = execute_command(capped, {"getMore": cursor_id, "collection": capped.name}) + assertFailureCode( + gm_result, + CAPPED_POSITION_LOST_ERROR, + msg="getMore should return CappedPositionLost when eviction overwrites cursor position", + ) + + +# Property [Partial Eviction]: when earlier documents are evicted but the cursor's +# read position is still valid, the cursor continues normally. +def test_tailable_cursors_partial_eviction(database_client, collection): + """Test cursor survives partial eviction and returns new documents.""" + capped = create_capped( + database_client, + collection, + [{"_id": 1}, {"_id": 2}, {"_id": 3}, {"_id": 4}, {"_id": 5}], + size=100_000, + max=5, + ) + + result = execute_command(capped, {"find": capped.name, "tailable": True, "batchSize": 100}) + cursor_id = result["cursor"]["id"] + + # Evict _id:1 and _id:2, but cursor position (after _id:5) is still valid. + capped.insert_many([{"_id": 6}, {"_id": 7}]) + + gm_result = execute_command(capped, {"getMore": cursor_id, "collection": capped.name}) + assertProperties( + gm_result, + {"cursor": {"id": Ne(INT64_ZERO), "nextBatch": Eq([{"_id": 6}, {"_id": 7}])}}, + msg="Cursor should survive partial eviction and return new documents", + raw_res=True, + ) + + +# Property [Collection Drop]: dropping the collection invalidates open tailable +# cursors. +def test_tailable_cursors_getmore_after_drop(database_client, collection): + """Test getMore after collection drop produces query plan killed error.""" + capped = create_capped(database_client, collection, [{"_id": 1}]) + result = execute_command(capped, {"find": capped.name, "tailable": True, "batchSize": 100}) + cursor_id = result["cursor"]["id"] + + capped.drop() + + gm_result = execute_command(capped, {"getMore": cursor_id, "collection": capped.name}) + assertFailureCode( + gm_result, + QUERY_PLAN_KILLED_ERROR, + msg="getMore after collection drop should produce query plan killed error", + ) + + +# Property [Drop and Recreate]: recreating a collection with the same name does not +# revive cursors from the original. +def test_tailable_cursors_getmore_after_drop_recreate(database_client, collection): + """Test getMore after drop and recreate still produces query plan killed error.""" + capped = create_capped(database_client, collection, [{"_id": 1}]) + capped_name = capped.name + result = execute_command(capped, {"find": capped_name, "tailable": True, "batchSize": 100}) + cursor_id = result["cursor"]["id"] + + capped.drop() + # Recreate with same name + execute_command( + collection, + {"create": capped_name, "capped": True, "size": 4096}, + ) + + gm_result = execute_command(capped, {"getMore": cursor_id, "collection": capped_name}) + assertFailureCode( + gm_result, + QUERY_PLAN_KILLED_ERROR, + msg="getMore after drop and recreate should produce query plan killed error", + ) + + +# Property [Rename Invalidation]: renaming the collection invalidates open tailable +# cursors. +def test_tailable_cursors_getmore_after_rename(database_client, collection): + """Test getMore after collection rename produces query plan killed error.""" + capped = create_capped(database_client, collection, [{"_id": 1}]) + capped_name = capped.name + result = execute_command(capped, {"find": capped_name, "tailable": True, "batchSize": 100}) + cursor_id = result["cursor"]["id"] + + renamed = f"{capped_name}_renamed" + collection.database.client["admin"].command( + { + "renameCollection": f"{capped.database.name}.{capped_name}", + "to": f"{capped.database.name}.{renamed}", + } + ) + + gm_result = execute_command(capped, {"getMore": cursor_id, "collection": capped_name}) + assertFailureCode( + gm_result, + QUERY_PLAN_KILLED_ERROR, + msg="getMore after collection rename should produce query plan killed error", + ) + + +# Property [Multiple Independent Tailable Cursors]: multiple tailable cursors on the +# same collection each independently see new documents. +def test_tailable_cursors_multiple_cursor1_sees_new(database_client, collection): + """Test first cursor sees documents inserted while second cursor is also open.""" + capped = create_capped(database_client, collection, [{"_id": 1}]) + + r1 = execute_command(capped, {"find": capped.name, "tailable": True, "batchSize": 100}) + # Open a second cursor to ensure multiple cursors coexist + execute_command(capped, {"find": capped.name, "tailable": True, "batchSize": 100}) + cid1 = r1["cursor"]["id"] + + capped.insert_one({"_id": 2, "x": 2}) + + gm1 = execute_command(capped, {"getMore": cid1, "collection": capped.name}) + assertProperties( + gm1, + {"cursor": {"nextBatch": Eq([{"_id": 2, "x": 2}])}}, + msg="First cursor should see new document while second cursor is open", + raw_res=True, + ) + + +# Property [Independent Cursor Position]: a second tailable cursor sees the same new +# documents as the first, maintaining its own read position. +def test_tailable_cursors_multiple_cursor2_sees_new(database_client, collection): + """Test second cursor independently sees new documents.""" + capped = create_capped(database_client, collection, [{"_id": 1}]) + + # Open two cursors; advance the first past the new insert + r1 = execute_command(capped, {"find": capped.name, "tailable": True, "batchSize": 100}) + r2 = execute_command(capped, {"find": capped.name, "tailable": True, "batchSize": 100}) + cid1 = r1["cursor"]["id"] + cid2 = r2["cursor"]["id"] + + capped.insert_one({"_id": 2, "x": 2}) + + # Consume from first cursor + execute_command(capped, {"getMore": cid1, "collection": capped.name}) + + # Second cursor should still see the same document + gm2 = execute_command(capped, {"getMore": cid2, "collection": capped.name}) + assertProperties( + gm2, + {"cursor": {"nextBatch": Eq([{"_id": 2, "x": 2}])}}, + msg="Second cursor should see document already consumed by first cursor", + raw_res=True, + ) + + +# Property [Concurrent Writers]: concurrent inserts from multiple connections are +# all visible to a tailable cursor. +@pytest.mark.no_parallel +def test_tailable_cursors_concurrent_writers(engine_client, database_client, collection): + """Test concurrent inserts from multiple connections are visible.""" + capped = create_capped(database_client, collection, [{"_id": 0}]) + + result = execute_command(capped, {"find": capped.name, "tailable": True, "batchSize": 100}) + cursor_id = result["cursor"]["id"] + + # Insert from the same client but different collection handle + capped.insert_one({"_id": 1, "src": "conn1"}) + + # Use the engine_client to get a second handle to the same collection + capped2 = engine_client[capped.database.name][capped.name] + capped2.insert_one({"_id": 2, "src": "conn2"}) + + # getMore should see both documents + gm_result = execute_command(capped, {"getMore": cursor_id, "collection": capped.name}) + assertProperties( + gm_result, + {"cursor": {"id": Ne(INT64_ZERO)}}, + msg="Concurrent inserts should be visible to tailable cursor", + raw_res=True, + ) diff --git a/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/test_tailable_cursors_lifecycle.py b/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/test_tailable_cursors_lifecycle.py new file mode 100644 index 00000000..de6f8725 --- /dev/null +++ b/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/test_tailable_cursors_lifecycle.py @@ -0,0 +1,207 @@ +"""Tests for tailable cursor lifecycle with getMore.""" + +from __future__ import annotations + +import pytest + +from documentdb_tests.compatibility.tests.core.collections.commands.utils.command_test_case import ( + CommandContext, + CommandTestCase, +) +from documentdb_tests.framework.assertions import assertProperties, assertResult +from documentdb_tests.framework.executor import execute_command +from documentdb_tests.framework.parametrize import pytest_params +from documentdb_tests.framework.property_checks import Eq, Ne +from documentdb_tests.framework.target_collection import CappedCollection +from documentdb_tests.framework.test_constants import INT64_ZERO + +from .utils.capped import create_capped + +# Property [Cursor Lifecycle with getMore]: find options like skip, batchSize, and +# limit affect the initial tailable cursor response. +TAILABLE_LIFECYCLE_FIND_TESTS: list[CommandTestCase] = [ + CommandTestCase( + "skip_initial", + target_collection=CappedCollection(), + docs=[{"_id": 1, "x": 1}, {"_id": 2, "x": 2}, {"_id": 3, "x": 3}], + command=lambda ctx: { + "find": ctx.collection, + "tailable": True, + "skip": 2, + "batchSize": 100, + }, + expected={"cursor": {"firstBatch": Eq([{"_id": 3, "x": 3}])}}, + msg="Skip should apply to initial query", + ), + CommandTestCase( + "batchsize_zero_find", + target_collection=CappedCollection(), + docs=[{"_id": 1, "x": 1}], + command=lambda ctx: {"find": ctx.collection, "tailable": True, "batchSize": 0}, + expected={"cursor": {"id": Ne(INT64_ZERO), "firstBatch": Eq([])}}, + msg="batchSize: 0 should establish cursor without returning documents", + ), + CommandTestCase( + "sequential_batches_first", + target_collection=CappedCollection(), + docs=[{"_id": i} for i in range(1, 6)], + command=lambda ctx: {"find": ctx.collection, "tailable": True, "batchSize": 2}, + expected={ + "cursor": { + "id": Ne(INT64_ZERO), + "firstBatch": Eq([{"_id": 1}, {"_id": 2}]), + }, + }, + msg="First batch should contain first 2 documents", + ), + CommandTestCase( + "limit_restricts_results", + target_collection=CappedCollection(), + docs=[{"_id": 1}, {"_id": 2}, {"_id": 3}, {"_id": 4}, {"_id": 5}], + command=lambda ctx: { + "find": ctx.collection, + "tailable": True, + "limit": 2, + "batchSize": 100, + }, + expected={ + "cursor": { + "id": Ne(INT64_ZERO), + "firstBatch": Eq([{"_id": 1}, {"_id": 2}]), + }, + }, + msg="Tailable cursor with limit should return at most limit documents", + ), +] + + +@pytest.mark.parametrize("test", pytest_params(TAILABLE_LIFECYCLE_FIND_TESTS)) +def test_tailable_cursors_lifecycle_find(database_client, collection, test: CommandTestCase): + """Test single-command tailable cursor lifecycle behavior.""" + resolved = test.prepare(database_client, collection) + ctx = CommandContext.from_collection(resolved) + result = execute_command(resolved, test.build_command(ctx)) + assertResult(result, expected=test.build_expected(ctx), msg=test.msg, raw_res=True) + + +# Property [Empty getMore]: getMore at end of data returns an empty batch and the +# cursor stays open. +def test_tailable_cursors_lifecycle_empty_getmore(database_client, collection): + """Test getMore at end of data returns empty batch, cursor stays open.""" + capped = create_capped(database_client, collection, [{"_id": 1, "x": 1}]) + result = execute_command(capped, {"find": capped.name, "tailable": True, "batchSize": 100}) + cursor_id = result["cursor"]["id"] + + gm_result = execute_command(capped, {"getMore": cursor_id, "collection": capped.name}) + assertProperties( + gm_result, + {"cursor": {"id": Ne(INT64_ZERO), "nextBatch": Eq([])}}, + msg="getMore at end of data should return empty batch with cursor open", + raw_res=True, + ) + + +# Property [New Insert Visibility]: getMore returns documents inserted after the +# cursor was opened. +def test_tailable_cursors_lifecycle_new_inserts(database_client, collection): + """Test getMore returns newly inserted documents.""" + capped = create_capped(database_client, collection, [{"_id": 1, "x": 1}]) + result = execute_command(capped, {"find": capped.name, "tailable": True, "batchSize": 100}) + cursor_id = result["cursor"]["id"] + + capped.insert_one({"_id": 2, "x": 2}) + + gm_result = execute_command(capped, {"getMore": cursor_id, "collection": capped.name}) + assertProperties( + gm_result, + {"cursor": {"id": Ne(INT64_ZERO), "nextBatch": Eq([{"_id": 2, "x": 2}])}}, + msg="getMore should return newly inserted documents", + raw_res=True, + ) + + +# Property [Limit with Tailable]: limit caps total documents returned but the cursor +# remains open. +def test_tailable_cursors_lifecycle_limit_cursor_stays_open(database_client, collection): + """Test tailable cursor stays open after limit is exhausted.""" + capped = create_capped( + database_client, collection, [{"_id": 1}, {"_id": 2}, {"_id": 3}, {"_id": 4}, {"_id": 5}] + ) + result = execute_command( + capped, {"find": capped.name, "tailable": True, "limit": 2, "batchSize": 100} + ) + cursor_id = result["cursor"]["id"] + + gm_result = execute_command(capped, {"getMore": cursor_id, "collection": capped.name}) + assertProperties( + gm_result, + {"cursor": {"id": Ne(INT64_ZERO), "nextBatch": Eq([])}}, + msg="Tailable cursor should stay open after limit is exhausted", + raw_res=True, + ) + + +# Property [Limit Blocks New Inserts]: new inserts are not visible via getMore after +# the limit is exhausted. +def test_tailable_cursors_lifecycle_limit_blocks_new_inserts(database_client, collection): + """Test new inserts are not visible after limit is exhausted.""" + capped = create_capped( + database_client, collection, [{"_id": 1}, {"_id": 2}, {"_id": 3}, {"_id": 4}, {"_id": 5}] + ) + result = execute_command( + capped, {"find": capped.name, "tailable": True, "limit": 2, "batchSize": 100} + ) + cursor_id = result["cursor"]["id"] + + capped.insert_one({"_id": 6}) + + gm_result = execute_command(capped, {"getMore": cursor_id, "collection": capped.name}) + assertProperties( + gm_result, + {"cursor": {"id": Ne(INT64_ZERO), "nextBatch": Eq([])}}, + msg="New inserts should not be visible after limit is exhausted", + raw_res=True, + ) + + +# Property [Limit Counts Across getMores]: limit counts total documents delivered +# across all getMores, not just the initial batch. +def test_tailable_cursors_lifecycle_limit_counts_across_getmores(database_client, collection): + """Test limit counts total documents across getMores, not just initial batch.""" + capped = create_capped(database_client, collection, [{"_id": 1}]) + result = execute_command( + capped, {"find": capped.name, "tailable": True, "limit": 2, "batchSize": 100} + ) + cursor_id = result["cursor"]["id"] + + capped.insert_many([{"_id": 2}, {"_id": 3}]) + gm1 = execute_command(capped, {"getMore": cursor_id, "collection": capped.name}) + assertProperties( + gm1, + {"cursor": {"id": Ne(INT64_ZERO), "nextBatch": Eq([{"_id": 2}])}}, + msg="getMore should return only one doc when limit of 2 allows one more", + raw_res=True, + ) + + +# Property [Limit Exhausted Across getMores]: documents beyond the limit are not +# returned even via subsequent getMore calls. +def test_tailable_cursors_lifecycle_limit_exhausted_across_getmores(database_client, collection): + """Test documents beyond limit are not returned even via getMore.""" + capped = create_capped(database_client, collection, [{"_id": 1}]) + result = execute_command( + capped, {"find": capped.name, "tailable": True, "limit": 2, "batchSize": 100} + ) + cursor_id = result["cursor"]["id"] + + capped.insert_one({"_id": 2}) + gm1 = execute_command(capped, {"getMore": cursor_id, "collection": capped.name}) + + capped.insert_one({"_id": 3}) + gm2 = execute_command(capped, {"getMore": gm1["cursor"]["id"], "collection": capped.name}) + assertProperties( + gm2, + {"cursor": {"id": Ne(INT64_ZERO), "nextBatch": Eq([])}}, + msg="Third doc should not be visible after limit of 2 is exhausted across getMores", + raw_res=True, + ) diff --git a/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/utils/__init__.py b/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/utils/capped.py b/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/utils/capped.py new file mode 100644 index 00000000..3eb17d1d --- /dev/null +++ b/documentdb_tests/compatibility/tests/core/cursors/tailable_cursors/utils/capped.py @@ -0,0 +1,23 @@ +"""Shared utilities for tailable cursor tests.""" + +from __future__ import annotations + +from typing import Any + +from pymongo.collection import Collection +from pymongo.database import Database + +from documentdb_tests.framework.target_collection import CappedCollection + + +def create_capped( + db: Database, + collection: Collection, + docs: list[dict[str, Any]], + **kwargs: Any, +) -> Collection: + """Create a capped collection and insert docs.""" + capped = CappedCollection(**kwargs).resolve(db, collection) + if docs: + capped.insert_many(docs) + return capped diff --git a/documentdb_tests/framework/error_codes.py b/documentdb_tests/framework/error_codes.py index adbc5c20..00d6a07b 100644 --- a/documentdb_tests/framework/error_codes.py +++ b/documentdb_tests/framework/error_codes.py @@ -12,6 +12,7 @@ ILLEGAL_OPERATION_ERROR = 20 NAMESPACE_NOT_FOUND_ERROR = 26 INDEX_NOT_FOUND_ERROR = 27 +CURSOR_NOT_FOUND_ERROR = 43 NAMESPACE_EXISTS_ERROR = 48 COMMAND_NOT_FOUND_ERROR = 59 CANNOT_CREATE_INDEX_ERROR = 67 @@ -22,11 +23,13 @@ INDEX_OPTIONS_CONFLICT_ERROR = 85 INDEX_KEY_SPECS_CONFLICT_ERROR = 86 DOCUMENT_VALIDATION_FAILURE_ERROR = 121 +CAPPED_POSITION_LOST_ERROR = 136 INCOMPATIBLE_COLLATION_VERSION_ERROR = 161 VIEW_DEPTH_LIMIT_ERROR = 165 COMMAND_NOT_SUPPORTED_ON_VIEW_ERROR = 166 OPTION_NOT_SUPPORTED_ON_VIEW_ERROR = 167 UNRECOGNIZED_EXPRESSION_ERROR = 168 +QUERY_PLAN_KILLED_ERROR = 175 VIEW_PIPELINE_TOO_LARGE_ERROR = 195 INVALID_INDEX_SPEC_OPTION_ERROR = 197 INVALID_UUID_ERROR = 207