diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto index 1f55927e4ae..016571fd10d 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto @@ -252,6 +252,7 @@ message InitializeExecutorRequest { int32 totalWorkerCount = 1; core.OpExecInitInfo opExecInitInfo = 2; bool isSource = 3; + core.ExecutionIdentity executionId = 4; } message UpdateExecutorRequest { diff --git a/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py b/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py index 2c2dc1ad3c7..46d851f2419 100644 --- a/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py +++ b/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py @@ -26,7 +26,12 @@ class InitializeExecutorHandler(ControlHandler): async def initialize_executor(self, req: InitializeExecutorRequest) -> EmptyReturn: + from pytexera.storage import large_binary_manager + op_exec_with_code: OpExecWithCode = get_one_of(req.op_exec_init_info) + large_binary_manager.set_current_execution_id( + req.execution_id.id if req.execution_id else None + ) self.context.executor_manager.initialize_executor( op_exec_with_code.code, req.is_source, op_exec_with_code.language ) diff --git a/amber/src/main/python/pytexera/storage/large_binary_manager.py b/amber/src/main/python/pytexera/storage/large_binary_manager.py index e061eac6228..35ebd5312c2 100644 --- a/amber/src/main/python/pytexera/storage/large_binary_manager.py +++ b/amber/src/main/python/pytexera/storage/large_binary_manager.py @@ -22,7 +22,6 @@ and LargeBinaryInputStream/LargeBinaryOutputStream instead. """ -import time import uuid from loguru import logger from core.storage.storage_config import StorageConfig @@ -31,6 +30,20 @@ _s3_client = None DEFAULT_BUCKET = "texera-large-binaries" +# Set at executor init and read by create() +_current_execution_id = None + + +def set_current_execution_id(execution_id): + """Sets the execution id used to scope large binaries created by this worker.""" + global _current_execution_id + _current_execution_id = execution_id + + +def get_current_execution_id(): + """Returns the execution id set for this worker, or None if unset.""" + return _current_execution_id + def _get_s3_client(): """Get or initialize S3 client (lazy initialization, cached).""" @@ -68,11 +81,19 @@ def create() -> str: """ Creates a new largebinary reference with a unique S3 URI. + The object key is namespaced by the current execution id so cleanup can delete + only this execution's objects. + Returns: - S3 URI string (format: s3://bucket/key) + S3 URI string (format: s3://bucket/objects/{execution_id}/{uuid}) """ _ensure_bucket_exists(DEFAULT_BUCKET) - timestamp_ms = int(time.time() * 1000) + execution_id = get_current_execution_id() + if execution_id is None: + raise RuntimeError( + "largebinary() requires an execution context, but no execution id " + "has been set for this worker." + ) unique_id = uuid.uuid4() - object_key = f"objects/{timestamp_ms}/{unique_id}" + object_key = f"objects/{execution_id}/{unique_id}" return f"s3://{DEFAULT_BUCKET}/{object_key}" diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 5a9df11b589..91191af139f 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -404,7 +404,8 @@ class RegionExecutionCoordinator( InitializeExecutorRequest( workerConfigs.length, physicalOp.opExecInitInfo, - physicalOp.isSourceOperator + physicalOp.isSourceOperator, + Some(physicalOp.executionId) ), asyncRPCClient.mkContext(workerId) ) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala index 969b466a1b2..8a5c71747bc 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala @@ -27,6 +27,7 @@ import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer import org.apache.texera.amber.util.VirtualIdentityUtils +import org.apache.texera.service.util.LargeBinaryManager trait InitializeExecutorHandler { this: DataProcessorRPCHandlerInitializer => @@ -44,6 +45,7 @@ trait InitializeExecutorHandler { ) ) cachedTotalWorkerCount = req.totalWorkerCount + req.executionId.foreach(eid => LargeBinaryManager.setCurrentExecutionId(eid.id)) setupExecutor(req.opExecInitInfo, workerIdx, cachedTotalWorkerCount) EmptyReturn() } diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala index cb910d11c3c..6d6e68a2b5f 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala @@ -611,7 +611,8 @@ class WorkflowResource extends LazyLogging { .asScala .toList - LargeBinaryManager.deleteAllObjects() + // Delete large binaries for each execution belonging to the workflows being removed + eids.foreach(eid => LargeBinaryManager.deleteByExecution(eid.longValue())) // Collect all URIs related to executions for cleanup val uris = eids.flatMap { eid => diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala index 809faf6a520..3f6a1db9f6e 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala @@ -311,7 +311,7 @@ class WorkflowService( * 2. Clears URI references from the execution registry * 3. Safely clears all result and console message documents * 4. Expires Iceberg snapshots for runtime statistics - * 5. Deletes large binaries from MinIO + * 5. Deletes this execution's large binaries from MinIO * * @param eid The execution identity to clean up resources for */ @@ -348,7 +348,7 @@ class WorkflowService( logger.debug(s"Error processing document at $uri: ${error.getMessage}") } } - // Delete large binaries - LargeBinaryManager.deleteAllObjects() + // Delete this execution's large binaries + LargeBinaryManager.deleteByExecution(eid.id) } } diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py index 1942e91f8bc..7b9b6f3ce11 100644 --- a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py +++ b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. +import re + import pytest from unittest.mock import patch, MagicMock from pytexera.storage import large_binary_manager @@ -42,6 +44,11 @@ def setup_storage_config(self): s3_auth_username="minioadmin", s3_auth_password="minioadmin", ) + # Provide a default execution id so create() doesn't raise. + original_eid = large_binary_manager.get_current_execution_id() + large_binary_manager.set_current_execution_id(1) + yield + large_binary_manager.set_current_execution_id(original_eid) def test_get_s3_client_initializes_once(self): """Test that S3 client is initialized and cached.""" @@ -119,7 +126,7 @@ def test_ensure_bucket_exists_creates_bucket_when_missing(self): mock_client.create_bucket.assert_called_once_with(Bucket="test-bucket") def test_create_generates_unique_uri(self): - """Test that create() generates a unique S3 URI.""" + """Test that create() generates a unique execution-scoped S3 URI.""" large_binary_manager._s3_client = None with patch("boto3.client") as mock_boto3_client: @@ -130,10 +137,10 @@ def test_create_generates_unique_uri(self): uri = large_binary_manager.create() - # Check URI format + # Check URI format: s3://bucket/objects/{eid}/{uuid} assert uri.startswith("s3://") assert uri.startswith(f"s3://{large_binary_manager.DEFAULT_BUCKET}/") - assert "objects/" in uri + assert f"objects/{large_binary_manager.get_current_execution_id()}/" in uri # Verify bucket was checked/created mock_client.head_bucket.assert_called_once_with( @@ -152,3 +159,26 @@ def test_create_uses_default_bucket(self): uri = large_binary_manager.create() assert large_binary_manager.DEFAULT_BUCKET in uri + assert f"objects/{large_binary_manager.get_current_execution_id()}/" in uri + + +def test_create_stamps_execution_id(monkeypatch): + # Avoid touching real S3 while testing key generation. + monkeypatch.setattr( + large_binary_manager, "_ensure_bucket_exists", lambda bucket: None + ) + monkeypatch.setattr(large_binary_manager, "_current_execution_id", 42) + + uri = large_binary_manager.create() + + assert re.fullmatch(r"s3://texera-large-binaries/objects/42/[0-9a-fA-F-]+", uri) + + +def test_create_without_execution_context_raises(monkeypatch): + monkeypatch.setattr( + large_binary_manager, "_ensure_bucket_exists", lambda bucket: None + ) + monkeypatch.setattr(large_binary_manager, "_current_execution_id", None) + + with pytest.raises(RuntimeError): + large_binary_manager.create() diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala index df9cb086a60..5494b763e24 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/WorkerSpec.scala @@ -200,7 +200,8 @@ class WorkerSpec OpExecWithClassName( "org.apache.texera.amber.engine.architecture.worker.DummyOperatorExecutor" ), - isSource = false + isSource = false, + executionId = None ), AsyncRPCContext(CONTROLLER, identifier1), 4 diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala index 44db3929f27..467a3670213 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala @@ -33,31 +33,63 @@ object LargeBinaryManager extends LazyLogging { val DEFAULT_BUCKET: String = "texera-large-binaries" /** - * Creates a new LargeBinary reference. + * Worker-scoped execution context. It is set on the data-processing thread when an + * executor is initialized. + */ + private val currentExecutionId: ThreadLocal[Option[Long]] = + ThreadLocal.withInitial(() => Option.empty[Long]) + + /** Sets the execution id for large binaries created on the current thread. */ + def setCurrentExecutionId(executionId: Long): Unit = + currentExecutionId.set(Some(executionId)) + + /** + * Creates a new LargeBinary reference scoped to the current execution. * The actual data upload happens separately via LargeBinaryOutputStream. * - * @return S3 URI string for the new LargeBinary (format: s3://bucket/key) + * @return S3 URI string for the new LargeBinary (format: s3://bucket/objects/{eid}/{uuid}) */ def create(): String = { - val objectKey = s"objects/${System.currentTimeMillis()}/${UUID.randomUUID()}" - val uri = s"s3://$DEFAULT_BUCKET/$objectKey" - - uri + val eid = currentExecutionId + .get() + .getOrElse( + throw new IllegalStateException( + "LargeBinaryManager.create() requires an execution context, " + + "but none was set on the current thread." + ) + ) + val objectKey = s"objects/$eid/${UUID.randomUUID()}" + s"s3://$DEFAULT_BUCKET/$objectKey" } /** - * Deletes all large binaries from the bucket. + * Deletes all large binaries belonging to a single execution. * - * @throws java.lang.Exception if the deletion fails - * @return Unit + * @param executionId the execution whose large binaries should be removed + */ + def deleteByExecution(executionId: Long): Unit = + deleteByExecution(executionId, S3StorageClient.deleteDirectory) + + /** + * Overload that takes the directory-delete operation as a parameter. Visible for + * testing */ - def deleteAllObjects(): Unit = { + private[util] def deleteByExecution( + executionId: Long, + deleteDir: (String, String) => Unit + ): Unit = { try { - S3StorageClient.deleteDirectory(DEFAULT_BUCKET, "objects") - logger.info(s"Successfully deleted all large binaries from bucket: $DEFAULT_BUCKET") + deleteDir(DEFAULT_BUCKET, s"objects/$executionId") + logger.info( + s"Deleted large binaries for execution $executionId from bucket: $DEFAULT_BUCKET" + ) } catch { case e: Exception => - logger.warn(s"Failed to delete large binaries from bucket: $DEFAULT_BUCKET", e) + logger.warn( + s"Failed to delete large binaries for execution $executionId " + + s"from bucket: $DEFAULT_BUCKET", + e + ) } } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala index 77d142efeeb..0a15f0832fa 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala @@ -21,8 +21,18 @@ package org.apache.texera.service.util import org.apache.texera.amber.core.tuple.LargeBinary import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.BeforeAndAfterEach -class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { +class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase with BeforeAndAfterEach { + + /** Execution id used by the bulk of the tests. */ + private val TestExecutionId: Long = 9999L + + /** Each test creates large binaries; they need an execution context on the thread. */ + override def beforeEach(): Unit = { + super.beforeEach() + LargeBinaryManager.setCurrentExecutionId(TestExecutionId) + } /** Creates a large binary from string data and returns it. */ private def createLargeBinary(data: String): LargeBinary = { @@ -54,7 +64,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(stream.readAllBytes().sameElements(data.getBytes)) stream.close() - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should read exact number of bytes") { @@ -67,7 +77,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(result.sameElements("0123456789".getBytes)) stream.close() - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should handle reading more bytes than available") { @@ -81,7 +91,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(result.sameElements(data.getBytes)) stream.close() - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should support standard single-byte read") { @@ -94,7 +104,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(stream.read() == -1) // EOF stream.close() - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should return -1 at EOF") { @@ -105,7 +115,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(stream.read() == -1) stream.close() - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should throw exception when reading from closed stream") { @@ -117,7 +127,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assertThrows[java.io.IOException](stream.read()) assertThrows[java.io.IOException](stream.readAllBytes()) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should handle multiple close calls") { @@ -127,7 +137,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { stream.close() stream.close() // Should not throw - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should read large data correctly") { @@ -145,7 +155,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(result.sameElements(largeData)) stream.close() - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } // ======================================== @@ -200,18 +210,18 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { out2.close() } - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryManager should handle delete with no objects gracefully") { - LargeBinaryManager.deleteAllObjects() // Should not throw exception + LargeBinaryManager.deleteByExecution(TestExecutionId) // Should not throw exception } test("LargeBinaryManager should delete all objects") { val pointer1 = createLargeBinary("Test data") val pointer2 = createLargeBinary("Test data") - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryManager should create bucket if it doesn't exist") { @@ -219,7 +229,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assertStandardBucket(pointer) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryManager should handle large objects correctly") { @@ -237,7 +247,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { stream.close() assert(readData.sameElements(largeData)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryManager should generate unique URIs for different objects") { @@ -261,7 +271,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(pointer1.getUri != pointer2.getUri) assert(pointer1.getObjectKey != pointer2.getObjectKey) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should handle multiple reads from the same large binary") { @@ -279,7 +289,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData1.sameElements(data.getBytes)) assert(readData2.sameElements(data.getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryManager should properly parse bucket name and object key from large binary") { @@ -289,7 +299,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(largeBinary.getObjectKey.nonEmpty) assert(!largeBinary.getObjectKey.startsWith("/")) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } // ======================================== @@ -309,7 +319,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assertStandardBucket(largeBinary) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream constructor should read large binary contents") { @@ -322,7 +332,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData.sameElements(data.getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryOutputStream and LargeBinaryInputStream should work together end-to-end") { @@ -344,7 +354,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData.sameElements(data.getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } // ======================================== @@ -368,7 +378,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData.sameElements(data.getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryOutputStream should create large binary") { @@ -381,7 +391,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assertStandardBucket(largeBinary) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryOutputStream should handle large data correctly") { @@ -399,7 +409,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData.sameElements(largeData)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryOutputStream should handle multiple writes") { @@ -416,7 +426,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData.sameElements("Hello World!".getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryOutputStream should throw exception when writing to closed stream") { @@ -427,7 +437,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assertThrows[java.io.IOException](outStream.write("more".getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryOutputStream should handle close() being called multiple times") { @@ -437,7 +447,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { outStream.close() outStream.close() // Should not throw - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("New LargeBinary() constructor should create unique URIs") { @@ -447,7 +457,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(largeBinary1.getUri != largeBinary2.getUri) assert(largeBinary1.getObjectKey != largeBinary2.getObjectKey) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinary() and LargeBinaryOutputStream API should be symmetric with input") { @@ -466,6 +476,42 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData.sameElements(data.getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) + } + + test("create() stamps the object key with the current execution id") { + LargeBinaryManager.setCurrentExecutionId(123L) + val uri = LargeBinaryManager.create() + assert(uri.startsWith("s3://texera-large-binaries/objects/123/")) + } + + test("deleteByExecution removes only the target execution's binaries") { + // Create one binary under execution 1001 and another under 1002. + LargeBinaryManager.setCurrentExecutionId(1001L) + createLargeBinary("data for 1001") + LargeBinaryManager.setCurrentExecutionId(1002L) + createLargeBinary("data for 1002") + + // Delete only execution 1001's binaries. + LargeBinaryManager.deleteByExecution(1001L) + + try { + assert(!S3StorageClient.directoryExists("texera-large-binaries", "objects/1001")) + assert(S3StorageClient.directoryExists("texera-large-binaries", "objects/1002")) + } finally { + LargeBinaryManager.deleteByExecution(1002L) + } + } + + test("create() throws when no execution context is set on the thread") { + // Run on a fresh thread, where the thread-local defaults to None. + @volatile var caught: Option[Throwable] = None + val t = new Thread(() => { + try LargeBinaryManager.create() + catch { case e: Throwable => caught = Some(e) } + }) + t.start() + t.join() + assert(caught.exists(_.isInstanceOf[IllegalStateException])) } } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala new file mode 100644 index 00000000000..196a9ca7827 --- /dev/null +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.service.util + +import org.scalatest.funsuite.AnyFunSuite + +/** + * Unit tests for [[LargeBinaryManager.deleteByExecution]] that do not require a live + * S3/MinIO endpoint. The directory-delete operation is injected so both the success + * and the swallow-and-log error path can be exercised deterministically. + */ +class LargeBinaryManagerUnitSpec extends AnyFunSuite { + + test("deleteByExecution issues a delete scoped to the execution's object prefix") { + var captured: Option[(String, String)] = None + LargeBinaryManager.deleteByExecution( + 42L, + (bucket, prefix) => captured = Some((bucket, prefix)) + ) + assert(captured.contains((LargeBinaryManager.DEFAULT_BUCKET, "objects/42"))) + } + + test("deleteByExecution swallows exceptions raised by the underlying delete") { + // The error path logs and returns; it must not propagate the failure to callers. + LargeBinaryManager.deleteByExecution(7L, (_, _) => throw new RuntimeException("boom")) + succeed + } +}