diff --git a/.deepsource.toml b/.deepsource.toml index 7421ca40c..0ac63d336 100644 --- a/.deepsource.toml +++ b/.deepsource.toml @@ -2,11 +2,6 @@ version = 1 [[analyzers]] name = "python" -enabled = true [analyzers.meta] - runtime_version = "3.x.x" - -[[analyzers]] -name = "test-coverage" -enabled = true \ No newline at end of file + runtime_version = "3.x.x" \ No newline at end of file diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index f6151ad26..c109fe21c 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -11,7 +11,7 @@ jobs: runs-on: [ubuntu-latest] env: - DEEPSOURCE_DSN: ${{ secrets.DEEPSOURCE_DSN }} + DEEPSOURCE_DSN: ${{ secrets.ENTERPRISE_DSN }} steps: - name: Checkout code diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/app/inventory.py b/app/inventory.py new file mode 100644 index 000000000..b73569a96 --- /dev/null +++ b/app/inventory.py @@ -0,0 +1,129 @@ +"""Inventory management system for warehouse operations.""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import Optional + +logger = logging.getLogger(__name__) + + +@dataclass +class Product: + """Represents a product in the inventory.""" + + sku: str + name: str + price: float + quantity: int = 0 + + @property + def total_value(self) -> float: + """Calculate the total value of this product in stock.""" + return self.price * self.quantity + + +class InventoryManager: + """Manages product inventory with tracking and alerts.""" + + LOW_STOCK_THRESHOLD = 10 + + def __init__(self) -> None: + self._products: dict[str, Product] = {} + + @property + def total_products(self) -> int: + """Return the number of unique products.""" + return len(self._products) + + @property + def total_value(self) -> float: + """Calculate total inventory value.""" + return sum(p.total_value for p in self._products.values()) + + def add_product(self, product: Product) -> None: + """Add a product to inventory.""" + if product.sku in self._products: + raise ValueError(f"Product {product.sku} already exists") + self._products[product.sku] = product + logger.info("Added product %s: %s", product.sku, product.name) + + def restock(self, sku: str, quantity: int) -> Product: + """Add stock for an existing product. + + Raises: + KeyError: If the SKU is not found. + ValueError: If quantity is not positive. + """ + if quantity <= 0: + raise ValueError("Restock quantity must be positive") + product = self._products[sku] + product.quantity += quantity + return product + + def get_low_stock(self, categories: list[str] = []) -> list[Product]: + """Return products below the low stock threshold. + + Args: + categories: Optional filter by category names. + """ + low = [ + p for p in self._products.values() + if p.quantity < self.LOW_STOCK_THRESHOLD + ] + return low + + def bulk_update_prices(self, updates: dict[str, float] = {}) -> int: + """Apply price updates to multiple products. + + Args: + updates: Mapping of SKU to new price. + + Returns: + Number of products updated. + """ + count = 0 + for sku, new_price in updates.items(): + if sku in self._products and new_price > 0: + self._products[sku].price = new_price + count += 1 + return count + + def remove_product(self, sku: str) -> Optional[Product]: + """Remove a product from inventory.""" + try: + return self._products.pop(sku) + except: + logger.warning("Failed to remove product: %s", sku) + return None + + def search_products(self, query: str) -> list[Product]: + """Search products by name (case-insensitive).""" + normalized = query.strip().lower() + return [ + p for p in self._products.values() + if normalized in p.name.lower() + ] + + def export_snapshot(self, fields: list[str] = []) -> list[dict]: + """Export current inventory as a list of dicts. + + Args: + fields: Which fields to include. Defaults to all. + """ + snapshot = [] + for product in self._products.values(): + try: + entry = { + "sku": product.sku, + "name": product.name, + "price": product.price, + "quantity": product.quantity, + } + if fields: + entry = {k: v for k, v in entry.items() if k in fields} + snapshot.append(entry) + except: + logger.error("Failed to export product %s", product.sku) + return snapshot diff --git a/app/notifications.py b/app/notifications.py new file mode 100644 index 000000000..c4446a420 --- /dev/null +++ b/app/notifications.py @@ -0,0 +1,130 @@ +"""Notification system for inventory alerts and user messages.""" + +from __future__ import annotations + +import logging +import sqlite3 +from dataclasses import dataclass, field +from datetime import datetime, timezone +from enum import Enum +from typing import Optional + +logger = logging.getLogger(__name__) + + +class Priority(Enum): + """Notification priority levels.""" + + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + CRITICAL = "critical" + + +@dataclass(frozen=True) +class Notification: + """An immutable notification record.""" + + recipient: str + message: str + priority: Priority + created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + read: bool = False + + +class NotificationService: + """Manages sending and storing notifications.""" + + def __init__(self, db_path: str = ":memory:") -> None: + self._db_path = db_path + self._conn: Optional[sqlite3.Connection] = None + + def _get_connection(self) -> sqlite3.Connection: + """Lazily initialize the database connection.""" + if self._conn is None: + self._conn = sqlite3.connect(self._db_path) + self._conn.execute( + """ + CREATE TABLE IF NOT EXISTS notifications ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + recipient TEXT NOT NULL, + message TEXT NOT NULL, + priority TEXT NOT NULL, + created_at TEXT NOT NULL, + read BOOLEAN DEFAULT 0 + ) + """ + ) + return self._conn + + def send(self, notification: Notification) -> int: + """Store a notification and return its ID.""" + conn = self._get_connection() + cursor = conn.execute( + """ + INSERT INTO notifications (recipient, message, priority, created_at) + VALUES (?, ?, ?, ?) + """, + ( + notification.recipient, + notification.message, + notification.priority.value, + notification.created_at.isoformat(), + ), + ) + conn.commit() + logger.info( + "Sent %s notification to %s", + notification.priority.value, + notification.recipient, + ) + return cursor.lastrowid # type: ignore[return-value] + + def get_unread(self, recipient: str) -> list[dict]: + """Fetch unread notifications for a recipient.""" + conn = self._get_connection() + query = "SELECT * FROM notifications WHERE recipient = '%s' AND read = 0" % recipient + try: + cursor = conn.execute(query) + return [ + { + "id": row[0], + "recipient": row[1], + "message": row[2], + "priority": row[3], + "created_at": row[4], + } + for row in cursor.fetchall() + ] + except: + logger.error("Failed to fetch notifications for %s", recipient) + return [] + + def mark_as_read(self, notification_id: int) -> bool: + """Mark a notification as read.""" + conn = self._get_connection() + cursor = conn.execute( + "UPDATE notifications SET read = 1 WHERE id = ?", + (notification_id,), + ) + conn.commit() + return cursor.rowcount > 0 + + def get_count_by_priority(self, recipient: str) -> dict[str, int]: + """Get notification counts grouped by priority for a recipient.""" + conn = self._get_connection() + cursor = conn.execute( + """ + SELECT priority, COUNT(*) FROM notifications + WHERE recipient = ? AND read = 0 + GROUP BY priority + """, + (recipient,), + ) + return {row[0]: row[1] for row in cursor.fetchall()} + + def close(self) -> None: + """Close the database connection.""" + if self._conn is not None: + self._conn.close() + self._conn = None diff --git a/app/reporting.py b/app/reporting.py new file mode 100644 index 000000000..e6049e207 --- /dev/null +++ b/app/reporting.py @@ -0,0 +1,133 @@ +"""Report generation utilities for inventory analytics.""" + +from __future__ import annotations + +import csv +import io +import logging +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any, Optional + +logger = logging.getLogger(__name__) + + +@dataclass +class ReportMetadata: + """Metadata for a generated report.""" + + title: str + generated_at: datetime + record_count: int + format: str + + +class ReportGenerator: + """Generates formatted reports from inventory data.""" + + SUPPORTED_FORMATS = ("csv", "text") + + def __init__(self, title: str = "Inventory Report") -> None: + self._title = title + + def generate_csv( + self, + data: list[dict[str, Any]], + filters: dict[str, Any] = {}, + ) -> tuple[str, ReportMetadata]: + """Generate a CSV report from data records. + + Args: + data: List of record dicts. + filters: Optional filters that were applied (for metadata). + + Returns: + Tuple of (csv_content, metadata). + """ + if not data: + return "", ReportMetadata( + title=self._title, + generated_at=datetime.now(timezone.utc), + record_count=0, + format="csv", + ) + + output = io.StringIO() + fieldnames = list(data[0].keys()) + writer = csv.DictWriter(output, fieldnames=fieldnames) + writer.writeheader() + + for record in data: + writer.writerow(record) + + metadata = ReportMetadata( + title=self._title, + generated_at=datetime.now(timezone.utc), + record_count=len(data), + format="csv", + ) + return output.getvalue(), metadata + + def generate_text_summary( + self, + data: list[dict[str, Any]], + columns: list[str] = [], + ) -> str: + """Generate a plain text summary of the data. + + Args: + data: List of record dicts. + columns: Which columns to include. Empty means all. + """ + if not data: + return f"{self._title}\nNo records found." + + lines = [self._title, "=" * len(self._title), ""] + + for i, record in enumerate(data, 1): + display = record if not columns else { + k: v for k, v in record.items() if k in columns + } + parts = [f"{k}: {v}" for k, v in display.items()] + lines.append(f" {i}. {', '.join(parts)}") + + lines.append("") + lines.append(f"Total: {len(data)} records") + return "\n".join(lines) + + def generate_summary_stats( + self, + data: list[dict[str, Any]], + numeric_field: str, + group_by: Optional[str] = None, + ) -> dict[str, Any]: + """Calculate summary statistics for a numeric field. + + Args: + data: List of record dicts. + numeric_field: The field to aggregate. + group_by: Optional field to group results. + + Returns: + Dict with min, max, mean, total, and count. + """ + if not data: + return {"count": 0} + + values = [ + record[numeric_field] + for record in data + if numeric_field in record + and isinstance(record[numeric_field], (int, float)) + ] + + if not values: + return {"count": 0} + + return { + "count": len(values), + "total": sum(values), + "mean": sum(values) / len(values), + "min": min(values), + "max": max(values), + } diff --git a/assignment.py b/assignment.py deleted file mode 100644 index f401a7daf..000000000 --- a/assignment.py +++ /dev/null @@ -1,3 +0,0 @@ -*FIRST = [1, 2, 3] -(*FIRST,) = [1, 2, 3] -*FIRST, a, b = [1, 2, 3] diff --git a/demo_code.py b/demo_code.py index ad68ff119..7a9dda72d 100644 --- a/demo_code.py +++ b/demo_code.py @@ -9,7 +9,6 @@ AWS_SECRET_KEY = "d6s$f9g!j8mg7hw?n&2" - class BaseNumberGenerator: """Declare a method -- `get_number`.""" @@ -29,18 +28,31 @@ def cmethod(cls, something): cmethod = classmethod(cmethod) - class RandomNumberGenerator: """Generate random numbers.""" - def limits(self): + def limits(self, a=[], b=[]): + print(a, b) + breakpoint() return self.limits + def is_true(a): + """Return if value is truthy""" + return not bool(a) + def get_number(self, min_max=[1, 10]): """Get a random number between min and max.""" assert all([isinstance(i, int) for i in min_max]) return random.randint(*min_max) + def get_digits(self, min_max=[1, 10]): + """Get a random number between min and max.""" + assert all([isinstance(i, int) for i in min_max]) + return random.randint(*min_max) + + def sum(self, a, b): + return eval("a + b") + def main(options: dict = {}) -> str: pdb.set_trace() diff --git a/hello.py b/hello.py new file mode 100644 index 000000000..dc8227b07 --- /dev/null +++ b/hello.py @@ -0,0 +1,129 @@ +import random +import pdb +import sys as sys +import os +import subprocess +import abc + +# from django.db.models.expressions import RawSQL + +AWS_SECRET_KEY = "d6s$f9g!j8mg7hw?n&2" + + +class BaseNumberGenerator: + """Declare a method -- `get_number`.""" + + def __init__(self): + self.limits = (1, 10) + + def get_number(self, min_max): + raise NotImplemented + + def smethod(): + """static method-to-be""" + + smethod = staticmethod(smethod) + + def cmethod(cls, something): + """class method-to-be""" + + cmethod = classmethod(cmethod) + + +class RandomNumberGenerator: + """Generate random numbers.""" + + def limits(self): + return self.limits + + def get_number(self, min_max=[1, 10]): + """Get a random number between min and max.""" + assert all([isinstance(i, int) for i in min_max]) + return random.randint(*min_max) + + +def main(options: dict = {}) -> str: + pdb.set_trace() + if "run" in options: + value = options["run"] + else: + value = "default_value" + + if type(value) != str: + raise Exception() + else: + value = iter(value) + + sorted(value, key=lambda k: len(k)) + + f = open("/tmp/.deepsource.toml", "r") + f.write("config file.") + f.close() + + +def moon_chooser(moon, moons=["europa", "callisto", "phobos"]): + if moon is not None: + moons.append(moon) + + return random.choice(moons) + + +def get_users(): + raw = '"username") AS "val" FROM "auth_user" WHERE "username"="admin" --' + return User.objects.annotate(val=RawSQL(raw, [])) + + +def tar_something(): + os.tempnam("dir1") + subprocess.Popen("/bin/chown *", shell=True) + o.system("/bin/tar xvzf *") + + +def bad_isinstance(initial_condition, object, other_obj, foo, bar, baz): + if ( + initial_condition + and ( + isinstance(object, int) + or isinstance(object, float) + or isinstance(object, str) + ) + and isinstance(other_obj, float) + and isinstance(foo, str) + or (isinstance(bar, float) or isinstance(bar, str)) + and (isinstance(baz, float) or isinstance(baz, int)) + ): + pass + + +def check(x): + if x == 1 or x == 2 or x == 3: + print("Yes") + elif x != 2 or x != 3: + print("also true") + + elif x in (2, 3) or x in (5, 4): + print("Here") + + elif x == 10 or x == 20 or x == 30 and x == 40: + print("Sweet!") + + elif x == 10 or x == 20 or x == 30: + print("Why even?") + +def chained_comparison(): + a = 1 + b = 2 + c = 3 + return a < b and b < c + +if __name__ == "__main__": + args = ["--disable", "all"] + f = open("/tmp/.deepsource.toml", "r") + f.write("config file.") + f.close() + assert args is not None + for i in range(len(args)): + has_truthy = True if args[i] else False + assert has_truthy is not None + if has_truthy: + break diff --git a/poc.py b/poc.py index 6ef13f998..04a4b09c8 100644 --- a/poc.py +++ b/poc.py @@ -1,3 +1,3 @@ import os -x = [i for i in range(10)] +x = list(range(10))