From e86b5d6fb2ec7186b4911b4b4e1f16caf407ff6a Mon Sep 17 00:00:00 2001 From: Mahmoud Hashemi Date: Thu, 26 Feb 2026 12:46:27 -0800 Subject: [PATCH 1/2] threaded healthcheck server and db off the critical path for healthcheck --- bq/app.py | 69 ++++++++++---------- tests/unit/test_healthcheck.py | 112 +++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+), 35 deletions(-) create mode 100644 tests/unit/test_healthcheck.py diff --git a/bq/app.py b/bq/app.py index 5e44aa7..a765404 100644 --- a/bq/app.py +++ b/bq/app.py @@ -1,3 +1,4 @@ +import socketserver import functools import importlib import json @@ -13,6 +14,7 @@ from importlib.metadata import version from wsgiref.simple_server import make_server from wsgiref.simple_server import WSGIRequestHandler +from wsgiref.simple_server import WSGIServer import venusian from sqlalchemy import func @@ -53,6 +55,10 @@ def log_message(self, format, *args): ) + +class ThreadingWSGIServer(socketserver.ThreadingMixIn, WSGIServer): + daemon_threads = True + class BeanQueue: def __init__( self, @@ -70,6 +76,8 @@ def __init__( self._worker_update_shutdown_event: threading.Event = threading.Event() # noop if metrics thread is not started yet, shutdown if it is started self._metrics_server_shutdown: typing.Callable[[], None] = lambda: None + self._health_ok: bool = False + self._health_info: dict = {} def create_default_engine(self): # Use thread-safe connection pool when thread pool executor is enabled @@ -195,6 +203,10 @@ def update_workers( db.commit() if current_worker.state != models.WorkerState.RUNNING: + self._health_ok = False + self._health_info = { + "state": str(current_worker.state), + } # This probably means we are somehow very slow to update the heartbeat in time, or the timeout window # is set too short. It could also be the administrator update the worker state to something else than # RUNNING. Regardless the reason, let's stop processing. @@ -203,6 +215,7 @@ def update_workers( current_worker.id, current_worker.state, ) + self._health_ok = False sys.exit(0) do_shutdown = self._worker_update_shutdown_event.wait( @@ -214,51 +227,34 @@ def update_workers( current_worker.last_heartbeat = func.now() db.add(current_worker) db.commit() + self._health_ok = (current_worker.state == models.WorkerState.RUNNING) + self._health_info = { + "state": str(current_worker.state), + } def _serve_http_request( self, worker_id: typing.Any, environ: dict, start_response: typing.Callable ) -> list[bytes]: path = environ["PATH_INFO"] if path == "/healthz": - db = self.make_session() - worker_service = self._make_worker_service(db) - worker = worker_service.get_worker(worker_id) - if worker is not None and worker.state == models.WorkerState.RUNNING: - start_response( - "200 OK", - [ - ("Content-Type", "application/json"), - ], - ) - return [ - json.dumps(dict(status="ok", worker_id=str(worker_id))).encode( - "utf8" - ) - ] + if self._health_ok: + start_response("200 OK", [("Content-Type", "application/json")]) + return [json.dumps(dict( + status="ok", + worker_id=str(worker_id), + **self._health_info, + )).encode("utf8")] else: - logger.warning("Bad worker %s state %s", worker_id, worker.state) start_response( "500 Internal Server Error", - [ - ("Content-Type", "application/json"), - ], + [("Content-Type", "application/json")], ) - return [ - json.dumps( - dict( - status="internal error", - worker_id=str(worker_id), - state=str(worker.state), - ) - ).encode("utf8") - ] - # TODO: add other metrics endpoints - start_response( - "404 NOT FOUND", - [ - ("Content-Type", "application/json"), - ], - ) + return [json.dumps(dict( + status="error", + worker_id=str(worker_id), + **self._health_info, + )).encode("utf8")] + start_response("404 NOT FOUND", [("Content-Type", "application/json")]) return [json.dumps(dict(status="not found")).encode("utf8")] def run_metrics_http_server(self, worker_id: typing.Any): @@ -269,6 +265,7 @@ def run_metrics_http_server(self, worker_id: typing.Any): port, functools.partial(self._serve_http_request, worker_id), handler_class=WSGIRequestHandlerWithLogger, + server_class=ThreadingWSGIServer, ) as httpd: # expose graceful shutdown to the main thread self._metrics_server_shutdown = httpd.shutdown @@ -475,6 +472,8 @@ def process_tasks( db.add(worker) dispatch_service.listen(channels) db.commit() + self._health_ok = True + self._health_info = {"state": "RUNNING"} metrics_server_thread = None if self.config.METRICS_HTTP_SERVER_ENABLED: diff --git a/tests/unit/test_healthcheck.py b/tests/unit/test_healthcheck.py new file mode 100644 index 0000000..ffe4eda --- /dev/null +++ b/tests/unit/test_healthcheck.py @@ -0,0 +1,112 @@ +import json +from unittest.mock import MagicMock, patch + +import pytest + +from bq.app import BeanQueue + + +def _make_environ(path: str) -> dict: + """Build a minimal WSGI environ dict.""" + return {"PATH_INFO": path, "REQUEST_METHOD": "GET"} + + +@pytest.fixture +def bq(): + """Create a BeanQueue with stubbed config (no real DB needed).""" + with patch("bq.app.Config") as MockConfig: + config = MockConfig.return_value + config.DATABASE_URL = "postgresql://test@localhost/test" + config.METRICS_HTTP_SERVER_INTERFACE = "127.0.0.1" + config.METRICS_HTTP_SERVER_PORT = 0 + config.METRICS_HTTP_SERVER_ENABLED = False + config.METRICS_HTTP_SERVER_LOG_LEVEL = "WARNING" + config.WORKER_HEARTBEAT_PERIOD = 30 + config.WORKER_HEARTBEAT_TIMEOUT = 60 + config.MAX_WORKER_THREADS = 1 + instance = BeanQueue(config=config) + yield instance + + +class TestHealthzEndpoint: + """Tests for the /healthz HTTP handler.""" + + def test_healthz_returns_200_when_healthy(self, bq): + bq._health_ok = True + bq._health_info = {"state": "RUNNING"} + + start_response = MagicMock() + result = bq._serve_http_request("42", _make_environ("/healthz"), start_response) + + start_response.assert_called_once_with( + "200 OK", [("Content-Type", "application/json")] + ) + body = json.loads(result[0]) + assert body["status"] == "ok" + assert body["worker_id"] == "42" + assert body["state"] == "RUNNING" + + def test_healthz_returns_500_when_unhealthy(self, bq): + bq._health_ok = False + bq._health_info = {"state": "SHUTDOWN"} + + start_response = MagicMock() + result = bq._serve_http_request("42", _make_environ("/healthz"), start_response) + + start_response.assert_called_once_with( + "500 Internal Server Error", + [("Content-Type", "application/json")], + ) + body = json.loads(result[0]) + assert body["status"] == "error" + assert body["worker_id"] == "42" + assert body["state"] == "SHUTDOWN" + + def test_healthz_returns_500_before_worker_initialized(self, bq): + """Before process_tasks runs, _health_ok is False and _health_info is empty.""" + start_response = MagicMock() + result = bq._serve_http_request("1", _make_environ("/healthz"), start_response) + + start_response.assert_called_once_with( + "500 Internal Server Error", + [("Content-Type", "application/json")], + ) + body = json.loads(result[0]) + assert body["status"] == "error" + assert body["worker_id"] == "1" + + def test_healthz_does_not_create_db_session(self, bq): + """The critical fix: /healthz must never touch the DB.""" + bq._health_ok = True + bq._health_info = {"state": "RUNNING"} + + bq.make_session = MagicMock() + start_response = MagicMock() + bq._serve_http_request("42", _make_environ("/healthz"), start_response) + + bq.make_session.assert_not_called() + + def test_unknown_path_returns_404(self, bq): + start_response = MagicMock() + result = bq._serve_http_request("42", _make_environ("/unknown"), start_response) + + start_response.assert_called_once_with( + "404 NOT FOUND", [("Content-Type", "application/json")] + ) + body = json.loads(result[0]) + assert body["status"] == "not found" + + def test_404_does_not_create_db_session(self, bq): + bq.make_session = MagicMock() + start_response = MagicMock() + bq._serve_http_request("42", _make_environ("/anything"), start_response) + + bq.make_session.assert_not_called() + + +class TestHealthStateInitialization: + """Tests that _health_ok defaults correctly.""" + + def test_defaults_to_unhealthy(self, bq): + assert bq._health_ok is False + assert bq._health_info == {} From 193bdc4bf3cb13392a84356afc9f431c4a99e89d Mon Sep 17 00:00:00 2001 From: Mohamad Fazeli Date: Mon, 13 Apr 2026 20:12:52 +0330 Subject: [PATCH 2/2] Fix healthcheck race condition and cleanup - Combine _health_ok + _health_info into atomic _health_state tuple to prevent torn reads between HTTP handler and heartbeat threads - Set health state to unhealthy during graceful shutdown - Remove redundant _health_ok = False assignment - Fix import ordering (alphabetical stdlib) - Remove extra blank line before ThreadingWSGIServer - Simplify test fixture: use real Config instead of mock.patch - Remove stray files unrelated to healthcheck fix --- THREAD_EXECUTOR_IMPROVEMENTS.md | 259 ------------------------- bq/app.py | 34 ++-- test_custom_fields_summary.md | 145 -------------- test_custom_models.py | 319 ------------------------------- test_real_data.py | 329 -------------------------------- tests/unit/test_healthcheck.py | 34 ++-- 6 files changed, 26 insertions(+), 1094 deletions(-) delete mode 100644 THREAD_EXECUTOR_IMPROVEMENTS.md delete mode 100644 test_custom_fields_summary.md delete mode 100644 test_custom_models.py delete mode 100644 test_real_data.py diff --git a/THREAD_EXECUTOR_IMPROVEMENTS.md b/THREAD_EXECUTOR_IMPROVEMENTS.md deleted file mode 100644 index f5ddcbd..0000000 --- a/THREAD_EXECUTOR_IMPROVEMENTS.md +++ /dev/null @@ -1,259 +0,0 @@ -# Thread Executor Improvements - Learning from Celery - -## Current Implementation Analysis - -### Architecture -- **Pattern**: Batch dispatch → Commit → Submit all → Wait for all -- **BATCH_SIZE**: Number of tasks fetched per iteration -- **MAX_WORKER_THREADS**: Concurrent processing threads -- **Issue**: If BATCH_SIZE > MAX_WORKER_THREADS, tasks wait unnecessarily - -### Comparison with Celery - -| Feature | Current BeanQueue | Celery Thread Pool | Should Implement? | -|---------|-------------------|-------------------|-------------------| -| Task feeding | Batch-and-wait | Continuous feeding | ✅ Yes | -| Prefetch multiplier | No (fixed BATCH_SIZE) | Yes (configurable) | ✅ Yes | -| Result tracking | DB only | In-memory futures | 🤔 Maybe | -| Pool stats | No | Yes (_get_info) | ✅ Yes | -| Acknowledgement | Implicit (PROCESSING) | Explicit (early/late) | ✅ Yes | -| Dynamic resize | No | Yes (autoscale) | ⏸️ Future | - -## Improvement Proposals - -### 1. **Continuous Task Feeding** (High Priority) - -**Problem**: Current implementation fetches batch of tasks, waits for all to complete, then fetches next batch. - -**Current Code**: -```python -while True: - tasks = dispatch(..., limit=BATCH_SIZE).all() # Fetch 10 - if executor: - db.commit() - futures = [] - for task in tasks: # Submit all 10 - futures.append(executor.submit(...)) - for f in futures: # Wait for all 10 - f.result() - if not tasks: - break -``` - -**Proposed Improvement**: -```python -# Keep executor queue full continuously -from collections import deque - -running_futures = deque() -max_queued = MAX_WORKER_THREADS * 2 # Allow some queueing - -while True: - # Remove completed futures - while running_futures and running_futures[0].done(): - try: - running_futures.popleft().result() - except Exception as e: - logger.error("Task failed: %s", e) - - # Keep pool fed - while len(running_futures) < max_queued: - tasks = dispatch(..., limit=1).all() # Fetch one at a time - if not tasks: - break - db.commit() - task = tasks[0] - future = executor.submit(process_task, task.id) - running_futures.append(future) - - # Wait briefly if queue is full - if len(running_futures) >= max_queued: - time.sleep(0.01) # Prevent busy waiting -``` - -**Benefits**: -- Threads never starve -- Better resource utilization -- Faster overall throughput - -### 2. **Add Prefetch Multiplier** (Medium Priority) - -**Purpose**: Optimize for different task durations - -**Configuration**: -```python -class Config(BaseSettings): - MAX_WORKER_THREADS: int = 1 - BATCH_SIZE: int = 1 # Deprecated, use PREFETCH_MULTIPLIER - - # New config - WORKER_PREFETCH_MULTIPLIER: int = 1 - - @property - def max_prefetch(self): - """Maximum tasks to reserve at once""" - if self.MAX_WORKER_THREADS == 1: - return self.BATCH_SIZE # Backward compat - return self.MAX_WORKER_THREADS * self.WORKER_PREFETCH_MULTIPLIER -``` - -**Usage Guide**: -```python -# For long-running tasks (5-60 seconds each) -WORKER_PREFETCH_MULTIPLIER = 1 # Only reserve as many as can process -# With 4 threads: reserves max 4 tasks - -# For short tasks (< 1 second each) -WORKER_PREFETCH_MULTIPLIER = 4 # Reserve more to reduce latency -# With 4 threads: reserves up to 16 tasks -``` - -### 3. **Add Late Acknowledgement Mode** (Low Priority) - -**Current Behavior**: Tasks marked PROCESSING immediately on dispatch (early ACK) - -**Problem**: If worker crashes after dispatch but before processing, tasks are stuck in PROCESSING state - -**Proposed**: Add configuration for late acknowledgement - -```python -class Config(BaseSettings): - # When True: commit PROCESSING state only after task completes successfully - # When False: commit PROCESSING state immediately (current behavior) - TASK_ACKS_LATE: bool = False -``` - -**Implementation**: -```python -def _process_task_in_thread(self, task_id, registry): - db = self.make_session() - try: - task = db.query(self.task_model).filter(...).one() - - if self.config.TASK_ACKS_LATE: - # Task still in PENDING, set to PROCESSING now - task.state = TaskState.PROCESSING - db.commit() - - registry.process(task, event_cls=self.event_model) - db.commit() - except: - if self.config.TASK_ACKS_LATE: - # Reset to PENDING so another worker can try - task.state = TaskState.PENDING - db.commit() - raise -``` - -**Tradeoff**: -- ✅ Better failure recovery -- ❌ Same task might be processed twice if crash happens mid-processing -- ❌ More database transactions - -### 4. **Pool Statistics** (Low Priority) - -**Celery Pattern**: -```python -def _get_info(self): - return { - 'max-concurrency': self.limit, - 'threads': len(self.executor._threads), - } -``` - -**BeanQueue Implementation**: -```python -def get_pool_stats(self): - """Get thread pool statistics""" - if not hasattr(self, '_executor') or self._executor is None: - return { - 'mode': 'sequential', - 'max_workers': 1, - 'active_threads': 0, - } - - return { - 'mode': 'threaded', - 'max_workers': self._executor._max_workers, - 'active_threads': len(self._executor._threads), - 'queue_size': self._executor._work_queue.qsize(), - } -``` - -**Usage**: Expose via metrics endpoint or logging - -### 5. **Optimize BATCH_SIZE Default** (High Priority) - -**Current**: `BATCH_SIZE = 1` (safe but slow) - -**Proposed**: -```python -class Config(BaseSettings): - MAX_WORKER_THREADS: int = 1 - - # Auto-calculate optimal batch size - BATCH_SIZE: int = None # None = auto - - @field_validator("BATCH_SIZE", mode="after") - def set_batch_size_default(cls, v, info): - if v is not None: - return v - # Default: match thread count for thread mode - max_workers = info.data.get("MAX_WORKER_THREADS", 1) - if max_workers > 1: - return max_workers # Fetch as many as can process - return 1 # Sequential mode -``` - -## Implementation Priority - -1. **Critical** (Do Now): - - ✅ Already fixed: Commit after dispatch - - 🔄 Optimize BATCH_SIZE default - -2. **High** (Next): - - Continuous task feeding - - Prefetch multiplier - -3. **Medium** (Soon): - - Pool statistics/monitoring - -4. **Low** (Future): - - Late acknowledgement mode - - Dynamic pool resizing (like Celery autoscale) - -## Testing Strategy - -### Benchmark Tests -```python -def test_throughput_short_tasks(): - """Test throughput with 1000 tasks of 10ms each""" - # Compare: batch-and-wait vs continuous feeding - -def test_resource_utilization(): - """Verify threads don't starve""" - # Monitor: executor._threads length over time - # Should stay at max_workers, not fluctuate - -def test_latency_distribution(): - """Measure p50, p95, p99 latency""" - # Lower is better -``` - -## Migration Path - -1. **Phase 1** (Current): Basic thread pool ✅ -2. **Phase 2**: Optimize BATCH_SIZE defaults -3. **Phase 3**: Continuous feeding pattern -4. **Phase 4**: Prefetch multiplier support -5. **Phase 5**: Advanced features (late ack, autoscale) - -## Conclusion - -Celery's thread pool is mature and battle-tested. Key learnings: -- **Continuous feeding** > Batch-and-wait -- **Prefetch multiplier** allows optimization -- **Explicit acknowledgement** gives control -- **Pool monitoring** enables observability - -Our current implementation is **solid for v1** but can improve throughput with these patterns. diff --git a/bq/app.py b/bq/app.py index a765404..1a74a3d 100644 --- a/bq/app.py +++ b/bq/app.py @@ -1,9 +1,9 @@ -import socketserver import functools import importlib import json import logging import platform +import socketserver import sys import threading import typing @@ -54,8 +54,6 @@ def log_message(self, format, *args): ) ) - - class ThreadingWSGIServer(socketserver.ThreadingMixIn, WSGIServer): daemon_threads = True @@ -76,8 +74,9 @@ def __init__( self._worker_update_shutdown_event: threading.Event = threading.Event() # noop if metrics thread is not started yet, shutdown if it is started self._metrics_server_shutdown: typing.Callable[[], None] = lambda: None - self._health_ok: bool = False - self._health_info: dict = {} + # Health state as atomic tuple: (is_ok, info_dict) + # Written by heartbeat/main threads, read by HTTP handler threads + self._health_state: tuple[bool, dict] = (False, {}) def create_default_engine(self): # Use thread-safe connection pool when thread pool executor is enabled @@ -203,10 +202,7 @@ def update_workers( db.commit() if current_worker.state != models.WorkerState.RUNNING: - self._health_ok = False - self._health_info = { - "state": str(current_worker.state), - } + self._health_state = (False, {"state": str(current_worker.state)}) # This probably means we are somehow very slow to update the heartbeat in time, or the timeout window # is set too short. It could also be the administrator update the worker state to something else than # RUNNING. Regardless the reason, let's stop processing. @@ -215,7 +211,6 @@ def update_workers( current_worker.id, current_worker.state, ) - self._health_ok = False sys.exit(0) do_shutdown = self._worker_update_shutdown_event.wait( @@ -227,22 +222,23 @@ def update_workers( current_worker.last_heartbeat = func.now() db.add(current_worker) db.commit() - self._health_ok = (current_worker.state == models.WorkerState.RUNNING) - self._health_info = { - "state": str(current_worker.state), - } + self._health_state = ( + current_worker.state == models.WorkerState.RUNNING, + {"state": str(current_worker.state)}, + ) def _serve_http_request( self, worker_id: typing.Any, environ: dict, start_response: typing.Callable ) -> list[bytes]: path = environ["PATH_INFO"] if path == "/healthz": - if self._health_ok: + health_ok, health_info = self._health_state + if health_ok: start_response("200 OK", [("Content-Type", "application/json")]) return [json.dumps(dict( status="ok", worker_id=str(worker_id), - **self._health_info, + **health_info, )).encode("utf8")] else: start_response( @@ -252,7 +248,7 @@ def _serve_http_request( return [json.dumps(dict( status="error", worker_id=str(worker_id), - **self._health_info, + **health_info, )).encode("utf8")] start_response("404 NOT FOUND", [("Content-Type", "application/json")]) return [json.dumps(dict(status="not found")).encode("utf8")] @@ -472,8 +468,7 @@ def process_tasks( db.add(worker) dispatch_service.listen(channels) db.commit() - self._health_ok = True - self._health_info = {"state": "RUNNING"} + self._health_state = (True, {"state": "RUNNING"}) metrics_server_thread = None if self.config.METRICS_HTTP_SERVER_ENABLED: @@ -537,6 +532,7 @@ def process_tasks( ) except (SystemExit, KeyboardInterrupt): db.rollback() + self._health_state = (False, {}) logger.info("Shutting down ...") # Shutdown the executor if it was created diff --git a/test_custom_fields_summary.md b/test_custom_fields_summary.md deleted file mode 100644 index 2f9a43c..0000000 --- a/test_custom_fields_summary.md +++ /dev/null @@ -1,145 +0,0 @@ -# Thread Executor with Custom Models - Summary - -## Question -> Does the thread executor work with customized models and such? - -## Answer: Yes, with limitations - -The thread executor **works correctly** with the standard bq.Task and bq.Event models. Thread-safe session handling, connection pooling, and concurrent processing all function as expected. - -### What Works ✅ - -1. **Standard Models + Thread Executor**: Fully supported - - Multiple worker threads process tasks concurrently - - Thread-safe connection pooling (QueuePool) - - Proper session isolation per thread - - No race conditions or deadlocks - -2. **Custom Processors**: Fully supported - - You can create any processor logic - - Access task kwargs, state, metadata - - Return complex results - - Handle retries and failures - -### Custom Models Limitation ⚠️ - -**SQLAlchemy Polymorphic Inheritance Issue**: -When you extend `bq.Task` with custom fields (e.g., priority, category, user_id), SQLAlchemy requires proper polymorphic mapper configuration. The current BeanQueue codebase doesn't configure polymorphic inheritance, which causes this error: - -``` -FlushError: Attempting to flush an item of type as a member -of collection "Event.task". Expected an object of type -``` - -This happens because: -- Events create a relationship to Task (not CustomTask) -- SQLAlchemy doesn't know CustomTask is a valid subtype -- The mapper needs `__mapper_args__ = {"polymorphic_identity": "custom"}` configuration - -## Workaround Solutions - -### Option 1: Use Task Kwargs (Recommended) -Instead of custom model fields, store extra data in task kwargs: - -```python -task = my_processor.run( - priority=10, - category="critical", - user_id="admin-1" -) - -# Access in processor -@app.processor(channel="processing") -def my_processor(task: bq.Task, priority: int, category: str, user_id: str): - # Use the parameters directly - if priority > 5: - # High priority logic - pass -``` - -### Option 2: Use JSON Metadata Field -Store custom data in task metadata: - -```python -import json - -task = my_processor.run(data={"foo": "bar"}) -task.metadata = json.dumps({ - "priority": 10, - "category": "critical", - "user_id": "admin-1" -}) -db.commit() - -# Access in processor -@app.processor(channel="processing") -def my_processor(task: bq.Task, data: dict): - custom_data = json.loads(task.metadata or "{}") - priority = custom_data.get("priority", 0) -``` - -### Option 3: Disable Events -If you don't need event tracking, disable it: - -```python -app = bq.BeanQueue( - config=bq.Config( - EVENT_MODEL=None, # Disable events - # ... other config - ) -) -``` - -This removes the relationship constraint, but you lose event history. - -## Test Results - -### ✅ Thread Executor (Standard Models) -- **70 tasks** processed successfully -- **100% success rate** -- **6.82x speedup** with 8 threads -- **45.46 tasks/second** throughput -- **No race conditions** or deadlocks -- **Proper data integrity** maintained - -### ❌ Custom Models with Events -- Requires polymorphic mapper configuration -- Not currently supported without code changes to BeanQueue - -## Recommendations - -For most use cases, **Option 1 (kwargs)** or **Option 2 (JSON metadata)** provide all the flexibility you need without requiring custom models: - -```python -# Example: Priority-based processing with standard models -@app.processor(channel="processing") -def process_transaction( - task: bq.Task, - transaction_id: str, - amount: float, - merchant: str, - priority: int = 5, # Default priority - user_id: str | None = None, -): - # Your processing logic here - if priority > 7: - # Urgent processing - pass - - return { - "transaction_id": transaction_id, - "status": "completed", - "processed_by": user_id, - } - -# Usage -task = process_transaction.run( - transaction_id="TXN-12345", - amount=100.50, - merchant="Acme Corp", - priority=10, # High priority - user_id="user-123", -) -``` - -The thread executor works perfectly with this approach! 🚀 diff --git a/test_custom_models.py b/test_custom_models.py deleted file mode 100644 index cb195a5..0000000 --- a/test_custom_models.py +++ /dev/null @@ -1,319 +0,0 @@ -#!/usr/bin/env python3 -""" -Test thread executor with customized Task and Event models. - -This test verifies that the thread executor works correctly with: -- Custom task models with additional fields -- Custom event models with additional fields -- Custom processors that access custom fields -""" - -import bq -import time -import datetime -from multiprocessing import Process -from sqlalchemy import Column, String, Integer, DateTime, create_engine -from sqlalchemy.orm import Session - - -# Define custom models with additional fields -class CustomTask(bq.Task): - """Custom task model with additional tracking fields.""" - # Use same table as bq.Task (single table inheritance) - # SQLAlchemy will add these columns to the bq_tasks table - - # Additional custom fields - priority = Column(Integer, default=0) - category = Column(String(50)) - user_id = Column(String(50)) - metadata_json = Column(String(1000)) - - -class CustomEvent(bq.Event): - """Custom event model with additional tracking fields.""" - # Use same table as bq.Event (single table inheritance) - - # Additional custom fields - severity = Column(String(20)) - source = Column(String(50)) - - -# Create app with custom models via config -app = bq.BeanQueue( - config=bq.Config( - PROCESSOR_PACKAGES=["__main__"], - DATABASE_URL="postgresql://bq:@localhost/bq_test_custom", - MAX_WORKER_THREADS=4, - BATCH_SIZE=8, - TASK_MODEL="__main__.CustomTask", - EVENT_MODEL="__main__.CustomEvent", - ), -) - - -# Processor that uses custom task fields -@app.processor(channel="custom-processing") -def process_with_custom_fields(task: CustomTask, data: dict): - """Process task using custom fields.""" - time.sleep(0.1) # Simulate work - - # Access custom fields from task - priority = task.priority - category = task.category - user_id = task.user_id - - # Process based on custom fields - result = { - "data": data, - "priority": priority, - "category": category, - "user_id": user_id, - "processed_at": datetime.datetime.now().isoformat(), - "processing_thread": f"thread-{id(task) % 1000}", - } - - return result - - -# High priority processor -@app.processor(channel="priority-processing") -def high_priority_task(task: CustomTask, operation: str): - """Process high priority tasks.""" - time.sleep(0.05) - - return { - "operation": operation, - "priority": task.priority, - "status": "completed", - "processed_at": datetime.datetime.now().isoformat(), - } - - -def run_worker_process(db_url: str): - """Run worker process with custom models and thread pool.""" - from bq.config import Config - - # Create app with same custom models - worker_app = bq.BeanQueue( - config=Config( - PROCESSOR_PACKAGES=["__main__"], - DATABASE_URL=db_url, - MAX_WORKER_THREADS=4, - BATCH_SIZE=8, - TASK_MODEL="__main__.CustomTask", - EVENT_MODEL="__main__.CustomEvent", - ), - ) - worker_app.process_tasks(channels=("custom-processing", "priority-processing")) - - -def run_custom_model_test(): - """Run integration test with custom models.""" - - print("=" * 80) - print("THREAD EXECUTOR TEST - CUSTOM MODELS") - print("=" * 80) - - # Setup database - engine = create_engine("postgresql://bq:@localhost/bq_test_custom") - - # Drop and recreate tables - from bq.db.base import Base - Base.metadata.drop_all(bind=engine) - Base.metadata.create_all(bind=engine) - - db = Session(bind=engine) - - print("\n📊 Creating tasks with custom fields...") - - # Create tasks with custom fields - tasks_created = [] - - # High priority tasks - for i in range(10): - task = high_priority_task.run(operation=f"priority-op-{i}") - # Update custom fields - task.priority = 10 - task.category = "critical" - task.user_id = f"admin-{i % 3}" - task.metadata_json = f'{{"request_id": "REQ-{i:04d}"}}' - db.add(task) - db.commit() - tasks_created.append(task.id) - - # Normal priority tasks with custom processing - for i in range(15): - task = process_with_custom_fields.run(data={"item_id": i, "value": i * 10}) - # Update custom fields - task.priority = 5 - task.category = "standard" - task.user_id = f"user-{i % 5}" - task.metadata_json = f'{{"batch": "BATCH-{i // 5}"}}' - db.add(task) - db.commit() - tasks_created.append(task.id) - - print(f"✓ Created {len(tasks_created)} tasks with custom fields") - print(f" - 10 high priority tasks (priority=10)") - print(f" - 15 standard priority tasks (priority=5)") - - # Verify custom fields are set - sample_task = db.query(CustomTask).filter(CustomTask.priority == 10).first() - print(f"\n📋 Sample task with custom fields:") - print(f" - ID: {sample_task.id}") - print(f" - Priority: {sample_task.priority}") - print(f" - Category: {sample_task.category}") - print(f" - User ID: {sample_task.user_id}") - print(f" - Metadata: {sample_task.metadata_json}") - - print(f"\n🚀 Starting worker with 4 threads...") - - # Start worker process - proc = Process( - target=run_worker_process, - args=("postgresql://bq:@localhost/bq_test_custom",), - ) - proc.start() - - # Wait for tasks to complete - start_time = time.time() - begin = datetime.datetime.now() - - while True: - db.expire_all() - completed = ( - db.query(CustomTask) - .filter(CustomTask.state == bq.TaskState.DONE) - .count() - ) - - if completed == len(tasks_created): - break - - delta = datetime.datetime.now() - begin - if delta.total_seconds() > 30: - proc.kill() - proc.join(3) - raise TimeoutError( - f"Timeout waiting for tasks. Only {completed}/{len(tasks_created)} completed" - ) - time.sleep(0.5) - - elapsed = time.time() - start_time - - # Stop worker - proc.kill() - proc.join(3) - - # Check results - print(f"\n📊 Verifying results...") - - completed_tasks = db.query(CustomTask).filter( - CustomTask.state == bq.TaskState.DONE - ).all() - - failed_tasks = db.query(CustomTask).filter( - CustomTask.state == bq.TaskState.FAILED - ).all() - - processing_tasks = db.query(CustomTask).filter( - CustomTask.state == bq.TaskState.PROCESSING - ).all() - - # Check custom events - events = db.query(CustomEvent).all() - - print("\n" + "=" * 80) - print("📊 TEST RESULTS") - print("=" * 80) - print(f"\n✓ Total Tasks: {len(tasks_created)}") - print(f"✓ Completed: {len(completed_tasks)} ({len(completed_tasks)/len(tasks_created)*100:.1f}%)") - print(f"✓ Failed: {len(failed_tasks)}") - print(f"✓ Time Elapsed: {elapsed:.2f} seconds") - print(f"✓ Throughput: {len(completed_tasks)/elapsed:.2f} tasks/second") - - print(f"\n📝 Events Generated: {len(events)}") - - # Verify results contain custom field data - print(f"\n🔍 Verifying custom field data in results...") - - tasks_with_results = [t for t in completed_tasks if t.result] - high_priority_completed = [ - t for t in completed_tasks - if t.priority == 10 - ] - standard_priority_completed = [ - t for t in completed_tasks - if t.priority == 5 - ] - - print(f" ✓ Tasks with results: {len(tasks_with_results)}/{len(completed_tasks)}") - print(f" ✓ High priority tasks completed: {len(high_priority_completed)}/10") - print(f" ✓ Standard priority tasks completed: {len(standard_priority_completed)}/15") - - # Sample some results - if tasks_with_results: - print(f"\n📋 Sample Results:") - for i, task in enumerate(tasks_with_results[:3], 1): - print(f"\n Task {i}:") - print(f" Priority: {task.priority}") - print(f" Category: {task.category}") - print(f" User ID: {task.user_id}") - print(f" Result: {task.result}") - - # Data integrity checks - print(f"\n🔍 Data Integrity Check:") - if processing_tasks: - print(f" ⚠️ Warning: {len(processing_tasks)} tasks stuck in PROCESSING state") - else: - print(f" ✓ No tasks stuck in PROCESSING state") - - # Verify custom fields preserved - corrupted_tasks = [ - t for t in completed_tasks - if t.priority is None or t.category is None - ] - if corrupted_tasks: - print(f" ⚠️ Warning: {len(corrupted_tasks)} tasks lost custom field data") - else: - print(f" ✓ All custom fields preserved correctly") - - # Cleanup - print(f"\n🧹 Cleaning up...") - db.query(CustomEvent).delete() - db.query(CustomTask).delete() - db.commit() - db.close() - Base.metadata.drop_all(bind=engine) - - print("\n" + "=" * 80) - - # Determine success - success = ( - len(completed_tasks) == len(tasks_created) and - len(failed_tasks) == 0 and - len(processing_tasks) == 0 and - len(corrupted_tasks) == 0 and - len(tasks_with_results) == len(completed_tasks) - ) - - if success: - print("✅ TEST PASSED") - print("=" * 80) - print("\n🎉 SUCCESS: Thread executor works correctly with custom models!") - return True - else: - print("❌ TEST FAILED") - print("=" * 80) - return False - - -if __name__ == "__main__": - try: - result = run_custom_model_test() - exit(0 if result else 1) - except Exception as e: - print(f"\n💥 ERROR: {e}") - import traceback - traceback.print_exc() - exit(1) diff --git a/test_real_data.py b/test_real_data.py deleted file mode 100644 index 537fb19..0000000 --- a/test_real_data.py +++ /dev/null @@ -1,329 +0,0 @@ -#!/usr/bin/env python -""" -Real-world integration test for thread executor. -Simulates a data processing pipeline with realistic tasks. -""" -import datetime -import json -import random -import time -from dataclasses import dataclass -from typing import List - -from sqlalchemy import create_engine -from sqlalchemy.orm import Session - -import bq -from bq.config import Config -from bq.db.base import Base -from bq.models import TaskState - - -# Define realistic task processors -app = bq.BeanQueue( - config=Config( - PROCESSOR_PACKAGES=["__main__"], - DATABASE_URL="postgresql://bq:@localhost/bq_test", - MAX_WORKER_THREADS=8, - BATCH_SIZE=16, - ) -) - - -@dataclass -class Transaction: - """Simulated financial transaction""" - id: str - amount: float - currency: str - merchant: str - timestamp: str - category: str - - -@app.processor(channel="data-processing") -def process_transaction(task: bq.Task, transaction_data: dict): - """Process a financial transaction with validation and categorization.""" - time.sleep(0.1) # Simulate DB lookup/API call - - # Parse transaction - txn = Transaction(**transaction_data) - - # Simulate processing logic - categorized = { - "transaction_id": txn.id, - "amount": txn.amount, - "currency": txn.currency, - "merchant": txn.merchant, - "category": txn.category, - "processed_at": datetime.datetime.now().isoformat(), - "risk_score": random.uniform(0, 1), - } - - return categorized - - -@app.processor(channel="data-processing") -def analyze_merchant(task: bq.Task, merchant_name: str, transaction_count: int): - """Analyze merchant data and compute statistics.""" - time.sleep(0.15) # Simulate data aggregation - - return { - "merchant": merchant_name, - "total_transactions": transaction_count, - "average_amount": random.uniform(10, 500), - "fraud_probability": random.uniform(0, 0.1), - "analyzed_at": datetime.datetime.now().isoformat(), - } - - -@app.processor(channel="data-processing") -def generate_report(task: bq.Task, report_type: str, data_count: int): - """Generate a comprehensive report.""" - time.sleep(0.2) # Simulate report generation - - return { - "report_type": report_type, - "records_processed": data_count, - "generated_at": datetime.datetime.now().isoformat(), - "status": "completed", - } - - -def generate_realistic_transactions(count: int) -> List[dict]: - """Generate realistic transaction data.""" - merchants = [ - "Amazon", "Walmart", "Target", "Starbucks", "McDonald's", - "Shell Gas", "Uber", "Netflix", "Spotify", "Apple Store", - "Best Buy", "Home Depot", "Kroger", "Costco", "CVS Pharmacy" - ] - - categories = [ - "groceries", "entertainment", "transportation", "utilities", - "shopping", "dining", "healthcare", "education", "travel" - ] - - currencies = ["USD", "EUR", "GBP", "CAD"] - - transactions = [] - base_time = datetime.datetime.now() - - for i in range(count): - txn = { - "id": f"TXN-{i+1:06d}", - "amount": round(random.uniform(5.0, 500.0), 2), - "currency": random.choice(currencies), - "merchant": random.choice(merchants), - "timestamp": (base_time - datetime.timedelta(hours=random.randint(0, 720))).isoformat(), - "category": random.choice(categories), - } - transactions.append(txn) - - return transactions - - -def run_integration_test(): - """Run comprehensive integration test with realistic data.""" - print("=" * 80) - print("THREAD EXECUTOR INTEGRATION TEST - REALISTIC DATA") - print("=" * 80) - - # Setup - engine = create_engine("postgresql://bq:@localhost/bq_test") - Base.metadata.create_all(bind=engine) - db = Session(bind=engine) - - # Clean up existing tasks (delete events first due to FK constraint) - from bq.models import Event - db.query(Event).delete() - db.query(bq.Task).delete() - db.commit() - - print("\n📊 Generating realistic test data...") - - # Generate realistic transaction data - transactions = generate_realistic_transactions(50) - merchants = list(set(txn["merchant"] for txn in transactions)) - - print(f"✓ Generated {len(transactions)} transactions") - print(f"✓ From {len(merchants)} unique merchants") - print(f"✓ Sample transaction: {json.dumps(transactions[0], indent=2)}") - - # Create tasks - print("\n📝 Creating tasks...") - task_count = 0 - - # 1. Transaction processing tasks (50 tasks) - for txn in transactions: - task = process_transaction.run(transaction_data=txn) - db.add(task) - task_count += 1 - - # 2. Merchant analysis tasks (15 tasks) - for merchant in merchants: - merchant_txns = [t for t in transactions if t["merchant"] == merchant] - task = analyze_merchant.run( - merchant_name=merchant, - transaction_count=len(merchant_txns) - ) - db.add(task) - task_count += 1 - - # 3. Report generation tasks (5 tasks) - report_types = ["daily_summary", "fraud_analysis", "merchant_ranking", "category_breakdown", "currency_report"] - for report_type in report_types: - task = generate_report.run( - report_type=report_type, - data_count=len(transactions) - ) - db.add(task) - task_count += 1 - - db.commit() - print(f"✓ Created {task_count} tasks total") - print(f" - {len(transactions)} transaction processing tasks") - print(f" - {len(merchants)} merchant analysis tasks") - print(f" - {len(report_types)} report generation tasks") - - # Start worker process - print(f"\n🚀 Starting worker with configuration:") - print(f" - MAX_WORKER_THREADS: {app.config.MAX_WORKER_THREADS}") - print(f" - BATCH_SIZE: {app.config.BATCH_SIZE}") - print(f" - Connection Pool: {app.engine.pool.__class__.__name__}") - print(f" - Pool Size: {app.engine.pool.size()}") - - from multiprocessing import Process - - def run_worker(): - app.process_tasks(channels=("data-processing",)) - - worker = Process(target=run_worker) - worker.start() - - # Monitor progress - print("\n⏱️ Processing tasks...") - start_time = time.time() - last_done = 0 - - while True: - db.expire_all() - - done = db.query(bq.Task).filter(bq.Task.state == TaskState.DONE).count() - processing = db.query(bq.Task).filter(bq.Task.state == TaskState.PROCESSING).count() - failed = db.query(bq.Task).filter(bq.Task.state == TaskState.FAILED).count() - pending = db.query(bq.Task).filter(bq.Task.state == TaskState.PENDING).count() - - elapsed = time.time() - start_time - - if done != last_done: - progress = (done / task_count) * 100 - throughput = done / elapsed if elapsed > 0 else 0 - print(f" Progress: {done}/{task_count} ({progress:.1f}%) | " - f"Processing: {processing} | Failed: {failed} | " - f"Throughput: {throughput:.1f} tasks/sec") - last_done = done - - if done + failed == task_count: - break - - if elapsed > 60: - print("\n⚠️ Timeout - stopping test") - break - - time.sleep(0.5) - - worker.kill() - worker.join(3) - - # Results - elapsed_time = time.time() - start_time - - print("\n" + "=" * 80) - print("📊 TEST RESULTS") - print("=" * 80) - - # Summary statistics - done_tasks = db.query(bq.Task).filter(bq.Task.state == TaskState.DONE).all() - failed_tasks = db.query(bq.Task).filter(bq.Task.state == TaskState.FAILED).all() - - print(f"\n✓ Total Tasks: {task_count}") - print(f"✓ Completed: {len(done_tasks)} ({len(done_tasks)/task_count*100:.1f}%)") - print(f"✓ Failed: {len(failed_tasks)}") - print(f"✓ Time Elapsed: {elapsed_time:.2f} seconds") - print(f"✓ Throughput: {len(done_tasks)/elapsed_time:.2f} tasks/second") - - # Expected vs actual performance - sequential_time = task_count * 0.15 # Average task duration - print(f"\n⚡ Performance Comparison:") - print(f" - Sequential (estimated): {sequential_time:.1f}s") - print(f" - Parallel (actual): {elapsed_time:.1f}s") - print(f" - Speedup: {sequential_time/elapsed_time:.2f}x") - - # Sample results - if done_tasks: - print(f"\n📋 Sample Processed Results:") - for i, task in enumerate(done_tasks[:3]): - print(f"\n Task {i+1}: {task.func_name}") - print(f" Result: {json.dumps(task.result, indent=6)}") - - # Task breakdown by type - print(f"\n📈 Task Breakdown:") - txn_tasks = [t for t in done_tasks if t.func_name == "process_transaction"] - merchant_tasks = [t for t in done_tasks if t.func_name == "analyze_merchant"] - report_tasks = [t for t in done_tasks if t.func_name == "generate_report"] - - print(f" - Transactions processed: {len(txn_tasks)}") - print(f" - Merchant analyses: {len(merchant_tasks)}") - print(f" - Reports generated: {len(report_tasks)}") - - # Verify data integrity - print(f"\n🔍 Data Integrity Check:") - all_results = [t.result for t in done_tasks if t.result] - print(f" ✓ All {len(all_results)} tasks have valid results") - - # Check for any stuck tasks - stuck_processing = db.query(bq.Task).filter(bq.Task.state == TaskState.PROCESSING).count() - if stuck_processing > 0: - print(f" ⚠️ Warning: {stuck_processing} tasks stuck in PROCESSING state") - else: - print(f" ✓ No tasks stuck in PROCESSING state") - - # Cleanup - print(f"\n🧹 Cleaning up...") - # Delete events first due to foreign key constraint - from bq.models import Event - db.query(Event).delete() - db.query(bq.Task).delete() - db.commit() - db.close() - Base.metadata.drop_all(bind=engine) - - print("\n" + "=" * 80) - print("✅ INTEGRATION TEST COMPLETE") - print("=" * 80) - - return { - "success": len(failed_tasks) == 0 and stuck_processing == 0, - "total_tasks": task_count, - "completed": len(done_tasks), - "failed": len(failed_tasks), - "elapsed_time": elapsed_time, - "throughput": len(done_tasks) / elapsed_time, - } - - -if __name__ == "__main__": - try: - result = run_integration_test() - - if result["success"]: - print("\n🎉 SUCCESS: Thread executor working correctly with realistic data!") - exit(0) - else: - print("\n❌ FAILURE: Some tasks failed or got stuck") - exit(1) - except Exception as e: - print(f"\n💥 ERROR: {e}") - import traceback - traceback.print_exc() - exit(1) diff --git a/tests/unit/test_healthcheck.py b/tests/unit/test_healthcheck.py index ffe4eda..445a7f8 100644 --- a/tests/unit/test_healthcheck.py +++ b/tests/unit/test_healthcheck.py @@ -1,9 +1,10 @@ import json -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock import pytest from bq.app import BeanQueue +from bq.config import Config def _make_environ(path: str) -> dict: @@ -13,27 +14,17 @@ def _make_environ(path: str) -> dict: @pytest.fixture def bq(): - """Create a BeanQueue with stubbed config (no real DB needed).""" - with patch("bq.app.Config") as MockConfig: - config = MockConfig.return_value - config.DATABASE_URL = "postgresql://test@localhost/test" - config.METRICS_HTTP_SERVER_INTERFACE = "127.0.0.1" - config.METRICS_HTTP_SERVER_PORT = 0 - config.METRICS_HTTP_SERVER_ENABLED = False - config.METRICS_HTTP_SERVER_LOG_LEVEL = "WARNING" - config.WORKER_HEARTBEAT_PERIOD = 30 - config.WORKER_HEARTBEAT_TIMEOUT = 60 - config.MAX_WORKER_THREADS = 1 - instance = BeanQueue(config=config) - yield instance - + """Create a BeanQueue with real Config (no real DB needed).""" + instance = BeanQueue(config=Config( + DATABASE_URL="postgresql://test@localhost/test", + )) + return instance class TestHealthzEndpoint: """Tests for the /healthz HTTP handler.""" def test_healthz_returns_200_when_healthy(self, bq): - bq._health_ok = True - bq._health_info = {"state": "RUNNING"} + bq._health_state = (True, {"state": "RUNNING"}) start_response = MagicMock() result = bq._serve_http_request("42", _make_environ("/healthz"), start_response) @@ -47,8 +38,7 @@ def test_healthz_returns_200_when_healthy(self, bq): assert body["state"] == "RUNNING" def test_healthz_returns_500_when_unhealthy(self, bq): - bq._health_ok = False - bq._health_info = {"state": "SHUTDOWN"} + bq._health_state = (False, {"state": "SHUTDOWN"}) start_response = MagicMock() result = bq._serve_http_request("42", _make_environ("/healthz"), start_response) @@ -77,8 +67,7 @@ def test_healthz_returns_500_before_worker_initialized(self, bq): def test_healthz_does_not_create_db_session(self, bq): """The critical fix: /healthz must never touch the DB.""" - bq._health_ok = True - bq._health_info = {"state": "RUNNING"} + bq._health_state = (True, {"state": "RUNNING"}) bq.make_session = MagicMock() start_response = MagicMock() @@ -108,5 +97,4 @@ class TestHealthStateInitialization: """Tests that _health_ok defaults correctly.""" def test_defaults_to_unhealthy(self, bq): - assert bq._health_ok is False - assert bq._health_info == {} + assert bq._health_state == (False, {})