Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ message InitializeExecutorRequest {
int32 totalWorkerCount = 1;
core.OpExecInitInfo opExecInitInfo = 2;
bool isSource = 3;
core.ExecutionIdentity executionId = 4;
}

message UpdateExecutorRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@

class InitializeExecutorHandler(ControlHandler):
async def initialize_executor(self, req: InitializeExecutorRequest) -> EmptyReturn:
from pytexera.storage import large_binary_manager

op_exec_with_code: OpExecWithCode = get_one_of(req.op_exec_init_info)
large_binary_manager.set_current_execution_id(
req.execution_id.id if req.execution_id else None
)
self.context.executor_manager.initialize_executor(
op_exec_with_code.code, req.is_source, op_exec_with_code.language
)
Expand Down
29 changes: 25 additions & 4 deletions amber/src/main/python/pytexera/storage/large_binary_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
and LargeBinaryInputStream/LargeBinaryOutputStream instead.
"""

import time
import uuid
from loguru import logger
from core.storage.storage_config import StorageConfig
Expand All @@ -31,6 +30,20 @@
_s3_client = None
DEFAULT_BUCKET = "texera-large-binaries"

# Worker-wide execution id: assigned when the executor is initialized and
# consulted by create() to namespace object keys.
_current_execution_id = None


def set_current_execution_id(execution_id):
    """
    Record the execution id under which this worker creates large binaries.

    Args:
        execution_id: id of the running execution, or None to clear it.
    """
    global _current_execution_id
    _current_execution_id = execution_id


def get_current_execution_id():
    """
    Look up the execution id previously stored for this worker.

    Returns:
        The stored execution id, or None when none has been set.
    """
    return _current_execution_id


def _get_s3_client():
"""Get or initialize S3 client (lazy initialization, cached)."""
Expand Down Expand Up @@ -68,11 +81,19 @@ def create() -> str:
"""
Creates a new largebinary reference with a unique S3 URI.

The object key is namespaced by the current execution id so cleanup can delete
only this execution's objects.

Returns:
S3 URI string (format: s3://bucket/key)
S3 URI string (format: s3://bucket/objects/{execution_id}/{uuid})
"""
_ensure_bucket_exists(DEFAULT_BUCKET)
timestamp_ms = int(time.time() * 1000)
execution_id = get_current_execution_id()
if execution_id is None:
raise RuntimeError(
"largebinary() requires an execution context, but no execution id "
"has been set for this worker."
)
unique_id = uuid.uuid4()
object_key = f"objects/{timestamp_ms}/{unique_id}"
object_key = f"objects/{execution_id}/{unique_id}"
return f"s3://{DEFAULT_BUCKET}/{object_key}"
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,8 @@ class RegionExecutionCoordinator(
InitializeExecutorRequest(
workerConfigs.length,
physicalOp.opExecInitInfo,
physicalOp.isSourceOperator
physicalOp.isSourceOperator,
Some(physicalOp.executionId)
),
asyncRPCClient.mkContext(workerId)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer
import org.apache.texera.amber.util.VirtualIdentityUtils
import org.apache.texera.service.util.LargeBinaryManager

trait InitializeExecutorHandler {
this: DataProcessorRPCHandlerInitializer =>
Expand All @@ -44,6 +45,7 @@ trait InitializeExecutorHandler {
)
)
cachedTotalWorkerCount = req.totalWorkerCount
req.executionId.foreach(eid => LargeBinaryManager.setCurrentExecutionId(eid.id))
setupExecutor(req.opExecInitInfo, workerIdx, cachedTotalWorkerCount)
EmptyReturn()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,8 @@ class WorkflowResource extends LazyLogging {
.asScala
.toList

LargeBinaryManager.deleteAllObjects()
// Delete large binaries for each execution belonging to the workflows being removed
eids.foreach(eid => LargeBinaryManager.deleteByExecution(eid.longValue()))

// Collect all URIs related to executions for cleanup
val uris = eids.flatMap { eid =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ class WorkflowService(
* 2. Clears URI references from the execution registry
* 3. Safely clears all result and console message documents
* 4. Expires Iceberg snapshots for runtime statistics
* 5. Deletes large binaries from MinIO
* 5. Deletes this execution's large binaries from MinIO
*
* @param eid The execution identity to clean up resources for
*/
Expand Down Expand Up @@ -348,7 +348,7 @@ class WorkflowService(
logger.debug(s"Error processing document at $uri: ${error.getMessage}")
}
}
// Delete large binaries
LargeBinaryManager.deleteAllObjects()
// Delete this execution's large binaries
LargeBinaryManager.deleteByExecution(eid.id)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.

import re

import pytest
from unittest.mock import patch, MagicMock
from pytexera.storage import large_binary_manager
Expand Down Expand Up @@ -42,6 +44,11 @@ def setup_storage_config(self):
s3_auth_username="minioadmin",
s3_auth_password="minioadmin",
)
# Provide a default execution id so create() doesn't raise.
original_eid = large_binary_manager.get_current_execution_id()
large_binary_manager.set_current_execution_id(1)
yield
large_binary_manager.set_current_execution_id(original_eid)

def test_get_s3_client_initializes_once(self):
"""Test that S3 client is initialized and cached."""
Expand Down Expand Up @@ -119,7 +126,7 @@ def test_ensure_bucket_exists_creates_bucket_when_missing(self):
mock_client.create_bucket.assert_called_once_with(Bucket="test-bucket")

def test_create_generates_unique_uri(self):
"""Test that create() generates a unique S3 URI."""
"""Test that create() generates a unique execution-scoped S3 URI."""
large_binary_manager._s3_client = None

with patch("boto3.client") as mock_boto3_client:
Expand All @@ -130,10 +137,10 @@ def test_create_generates_unique_uri(self):

uri = large_binary_manager.create()

# Check URI format
# Check URI format: s3://bucket/objects/{eid}/{uuid}
assert uri.startswith("s3://")
assert uri.startswith(f"s3://{large_binary_manager.DEFAULT_BUCKET}/")
assert "objects/" in uri
assert f"objects/{large_binary_manager.get_current_execution_id()}/" in uri

# Verify bucket was checked/created
mock_client.head_bucket.assert_called_once_with(
Expand All @@ -152,3 +159,26 @@ def test_create_uses_default_bucket(self):

uri = large_binary_manager.create()
assert large_binary_manager.DEFAULT_BUCKET in uri
assert f"objects/{large_binary_manager.get_current_execution_id()}/" in uri


def test_create_stamps_execution_id(monkeypatch):
    """create() embeds the worker's current execution id in the object key."""

    def _skip_bucket_check(bucket):
        # Stand-in for _ensure_bucket_exists so the test never reaches S3.
        return None

    monkeypatch.setattr(
        large_binary_manager, "_ensure_bucket_exists", _skip_bucket_check
    )
    monkeypatch.setattr(large_binary_manager, "_current_execution_id", 42)

    expected_uri = re.compile(r"s3://texera-large-binaries/objects/42/[0-9a-fA-F-]+")
    generated_uri = large_binary_manager.create()

    # The whole URI must match: bucket, "objects", execution id 42, then the id part.
    assert expected_uri.fullmatch(generated_uri)


def test_create_without_execution_context_raises(monkeypatch):
    """create() refuses to mint a URI when no execution id has been set."""
    # Neutralize the bucket check so only the execution-id guard can fail.
    for attr_name, replacement in (
        ("_ensure_bucket_exists", lambda bucket: None),
        ("_current_execution_id", None),
    ):
        monkeypatch.setattr(large_binary_manager, attr_name, replacement)

    with pytest.raises(RuntimeError):
        large_binary_manager.create()
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ class WorkerSpec
OpExecWithClassName(
"org.apache.texera.amber.engine.architecture.worker.DummyOperatorExecutor"
),
isSource = false
isSource = false,
executionId = None
),
AsyncRPCContext(CONTROLLER, identifier1),
4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,31 +33,63 @@ object LargeBinaryManager extends LazyLogging {
val DEFAULT_BUCKET: String = "texera-large-binaries"

/**
 * Execution id used to scope large binaries created on the current thread.
 * The thread-local starts out empty and is populated via `setCurrentExecutionId`;
 * it is intended to be set on the data-processing thread when an executor is
 * initialized.
 */
private val currentExecutionId: ThreadLocal[Option[Long]] =
  ThreadLocal.withInitial(() => Option.empty[Long])

/** Sets the execution id for large binaries created on the current thread. */
def setCurrentExecutionId(executionId: Long): Unit =
  currentExecutionId.set(Some(executionId))

/**
 * Creates a new LargeBinary reference scoped to the current execution.
 * The actual data upload happens separately via LargeBinaryOutputStream.
 *
 * The object key is `objects/{eid}/{uuid}`, so every object created by one
 * execution shares the `objects/{eid}` prefix.
 *
 * @return S3 URI string for the new LargeBinary (format: s3://bucket/objects/{eid}/{uuid})
 * @throws java.lang.IllegalStateException if no execution id was set on the calling thread
 */
def create(): String = {
  // Fail loudly instead of producing a key that is not tied to any execution.
  val eid = currentExecutionId
    .get()
    .getOrElse(
      throw new IllegalStateException(
        "LargeBinaryManager.create() requires an execution context, " +
          "but none was set on the current thread."
      )
    )
  val objectKey = s"objects/$eid/${UUID.randomUUID()}"
  s"s3://$DEFAULT_BUCKET/$objectKey"
}

/**
 * Removes the large binaries that belong to one execution.
 *
 * Delegates to the two-argument overload, supplying
 * `S3StorageClient.deleteDirectory` as the delete operation.
 *
 * @param executionId the execution whose large binaries should be removed
 */
def deleteByExecution(executionId: Long): Unit = {
  val removeDirectory: (String, String) => Unit = S3StorageClient.deleteDirectory
  deleteByExecution(executionId, removeDirectory)
}

/**
 * Overload that takes the directory-delete operation as a parameter. Visible for
 * testing.
 *
 * Deletion is best-effort: an exception raised by `deleteDir` is logged as a
 * warning and not rethrown.
 *
 * @param executionId the execution whose large binaries should be removed
 * @param deleteDir   operation invoked with the bucket name and the
 *                    `objects/{executionId}` prefix to delete
 */
private[util] def deleteByExecution(
    executionId: Long,
    deleteDir: (String, String) => Unit
): Unit = {
  try {
    // NOTE(review): the prefix carries no trailing slash; this assumes deleteDir
    // treats it as a directory so that `objects/1` cannot also match
    // `objects/10/...` — confirm against the delete implementation.
    deleteDir(DEFAULT_BUCKET, s"objects/$executionId")
    logger.info(
      s"Deleted large binaries for execution $executionId from bucket: $DEFAULT_BUCKET"
    )
  } catch {
    case e: Exception =>
      logger.warn(
        s"Failed to delete large binaries for execution $executionId " +
          s"from bucket: $DEFAULT_BUCKET",
        e
      )
  }
}

Expand Down
Loading
Loading