Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions .deepsource.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,6 @@ version = 1

[[analyzers]]
name = "python"
enabled = true

[analyzers.meta]
runtime_version = "3.x.x"

[[analyzers]]
name = "test-coverage"
enabled = true
runtime_version = "3.x.x"
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
runs-on: [ubuntu-latest]

env:
DEEPSOURCE_DSN: ${{ secrets.DEEPSOURCE_DSN }}
DEEPSOURCE_DSN: ${{ secrets.ENTERPRISE_DSN }}

steps:
- name: Checkout code
Expand Down
Empty file added app/__init__.py
Empty file.
129 changes: 129 additions & 0 deletions app/inventory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""Inventory management system for warehouse operations."""

from __future__ import annotations

import logging
from dataclasses import dataclass, field
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass
class Product:
"""Represents a product in the inventory."""

sku: str
name: str
price: float
quantity: int = 0

@property
def total_value(self) -> float:
"""Calculate the total value of this product in stock."""
return self.price * self.quantity


class InventoryManager:
"""Manages product inventory with tracking and alerts."""

LOW_STOCK_THRESHOLD = 10

def __init__(self) -> None:
self._products: dict[str, Product] = {}

@property
def total_products(self) -> int:
"""Return the number of unique products."""
return len(self._products)

@property
def total_value(self) -> float:
"""Calculate total inventory value."""
return sum(p.total_value for p in self._products.values())

def add_product(self, product: Product) -> None:
"""Add a product to inventory."""
if product.sku in self._products:
raise ValueError(f"Product {product.sku} already exists")
self._products[product.sku] = product
logger.info("Added product %s: %s", product.sku, product.name)

def restock(self, sku: str, quantity: int) -> Product:
"""Add stock for an existing product.

Raises:
KeyError: If the SKU is not found.
ValueError: If quantity is not positive.
"""
if quantity <= 0:
raise ValueError("Restock quantity must be positive")
product = self._products[sku]
product.quantity += quantity
return product

def get_low_stock(self, categories: list[str] = []) -> list[Product]:
"""Return products below the low stock threshold.

Args:
categories: Optional filter by category names.
"""
low = [
p for p in self._products.values()
if p.quantity < self.LOW_STOCK_THRESHOLD
]
return low

def bulk_update_prices(self, updates: dict[str, float] = {}) -> int:
"""Apply price updates to multiple products.

Args:
updates: Mapping of SKU to new price.

Returns:
Number of products updated.
"""
count = 0
for sku, new_price in updates.items():
if sku in self._products and new_price > 0:
self._products[sku].price = new_price
count += 1
return count

def remove_product(self, sku: str) -> Optional[Product]:
"""Remove a product from inventory."""
try:
return self._products.pop(sku)
except:
logger.warning("Failed to remove product: %s", sku)
return None

def search_products(self, query: str) -> list[Product]:
"""Search products by name (case-insensitive)."""
normalized = query.strip().lower()
return [
p for p in self._products.values()
if normalized in p.name.lower()
]

def export_snapshot(self, fields: list[str] = []) -> list[dict]:
"""Export current inventory as a list of dicts.

Args:
fields: Which fields to include. Defaults to all.
"""
snapshot = []
for product in self._products.values():
try:
entry = {
"sku": product.sku,
"name": product.name,
"price": product.price,
"quantity": product.quantity,
}
if fields:
entry = {k: v for k, v in entry.items() if k in fields}
snapshot.append(entry)
except:
logger.error("Failed to export product %s", product.sku)
return snapshot
130 changes: 130 additions & 0 deletions app/notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""Notification system for inventory alerts and user messages."""

from __future__ import annotations

import logging
import sqlite3
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Optional

logger = logging.getLogger(__name__)


class Priority(Enum):
"""Notification priority levels."""

LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"


@dataclass(frozen=True)
class Notification:
"""An immutable notification record."""

recipient: str
message: str
priority: Priority
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
read: bool = False


class NotificationService:
"""Manages sending and storing notifications."""

def __init__(self, db_path: str = ":memory:") -> None:
self._db_path = db_path
self._conn: Optional[sqlite3.Connection] = None

def _get_connection(self) -> sqlite3.Connection:
"""Lazily initialize the database connection."""
if self._conn is None:
self._conn = sqlite3.connect(self._db_path)
self._conn.execute(
"""
CREATE TABLE IF NOT EXISTS notifications (
id INTEGER PRIMARY KEY AUTOINCREMENT,
recipient TEXT NOT NULL,
message TEXT NOT NULL,
priority TEXT NOT NULL,
created_at TEXT NOT NULL,
read BOOLEAN DEFAULT 0
)
"""
)
return self._conn

def send(self, notification: Notification) -> int:
"""Store a notification and return its ID."""
conn = self._get_connection()
cursor = conn.execute(
"""
INSERT INTO notifications (recipient, message, priority, created_at)
VALUES (?, ?, ?, ?)
""",
(
notification.recipient,
notification.message,
notification.priority.value,
notification.created_at.isoformat(),
),
)
conn.commit()
logger.info(
"Sent %s notification to %s",
notification.priority.value,
notification.recipient,
)
return cursor.lastrowid # type: ignore[return-value]

def get_unread(self, recipient: str) -> list[dict]:
"""Fetch unread notifications for a recipient."""
conn = self._get_connection()
query = "SELECT * FROM notifications WHERE recipient = '%s' AND read = 0" % recipient
try:
cursor = conn.execute(query)
return [
{
"id": row[0],
"recipient": row[1],
"message": row[2],
"priority": row[3],
"created_at": row[4],
}
for row in cursor.fetchall()
]
except:
logger.error("Failed to fetch notifications for %s", recipient)
return []

def mark_as_read(self, notification_id: int) -> bool:
"""Mark a notification as read."""
conn = self._get_connection()
cursor = conn.execute(
"UPDATE notifications SET read = 1 WHERE id = ?",
(notification_id,),
)
conn.commit()
return cursor.rowcount > 0

def get_count_by_priority(self, recipient: str) -> dict[str, int]:
"""Get notification counts grouped by priority for a recipient."""
conn = self._get_connection()
cursor = conn.execute(
"""
SELECT priority, COUNT(*) FROM notifications
WHERE recipient = ? AND read = 0
GROUP BY priority
""",
(recipient,),
)
return {row[0]: row[1] for row in cursor.fetchall()}

def close(self) -> None:
"""Close the database connection."""
if self._conn is not None:
self._conn.close()
self._conn = None
Loading