-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
119 lines (94 loc) · 3.51 KB
/
utils.py
File metadata and controls
119 lines (94 loc) · 3.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""
Utility Functions for Proxy Manager.
Common helper functions for proxy parsing, validation,
and formatting used across the application.
"""
import re
import logging
from typing import Tuple, Optional
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
# Regex pattern for proxy URL validation.
#   group 1: protocol  (http, https or socks5)
#   group 2: username  (optional; only matched together with a password)
#   group 3: password  (optional)
#   group 4: address   (must not contain ':', '@', '/' or whitespace, so stray
#                       credentials or paths are never taken as part of the host)
#   group 5: port      (digits only; the numeric range is checked by
#                       parse_proxy_url)
PROXY_URL_PATTERN = re.compile(
    r"^(https?|socks5)://(?:([^:]+):([^@]+)@)?([^:@/\s]+):(\d+)$"
)


def parse_proxy_url(url: str) -> Tuple[str, int, str, Optional[str], Optional[str]]:
    """
    Parse a proxy URL into its components.

    Args:
        url: Proxy URL (e.g., "socks5://user:pass@127.0.0.1:1080").

    Returns:
        Tuple of (address, port, protocol, username, password).
        username and password are None when the URL carries no credentials.

    Raises:
        ValueError: If the URL format is invalid or the port is outside
            the range 1-65535.
    """
    # fullmatch() rather than match(): the pattern's "$" on its own would
    # still accept a URL followed by a trailing newline.
    match = PROXY_URL_PATTERN.fullmatch(url)
    if not match:
        raise ValueError(f"Invalid proxy URL format: {url}")

    protocol, username, password, address, port_text = match.groups()
    port = int(port_text)
    # The pattern only guarantees digits; reject ports such as 0 or 70000.
    if not 1 <= port <= 65535:
        raise ValueError(f"Invalid proxy port (must be 1-65535): {url}")
    return address, port, protocol, username, password
def validate_ip_address(address: str) -> bool:
    """
    Validate an IPv4 address in dotted-decimal format.

    The address must consist of exactly four dot-separated parts, each made
    up solely of ASCII digits and with a numeric value of at most 255.
    Leading zeros are accepted (e.g. "010.0.0.1").

    Args:
        address: Candidate address string.

    Returns:
        True if the string is a well-formed IPv4 address, False otherwise.
    """
    parts = address.split(".")
    if len(parts) != 4:
        return False
    # str.isdigit() alone is also true for non-ASCII digits: "²" would make
    # int() raise ValueError and "١" would be accepted as a valid octet.
    # Requiring ASCII keeps the check total and strict. A digit-only string
    # is never negative, so only the upper bound needs testing.
    return all(
        part.isascii() and part.isdigit() and int(part) <= 255
        for part in parts
    )
def validate_port(port: int) -> bool:
    """Return True when *port* lies within the inclusive range 1-65535."""
    lowest_port, highest_port = 1, 65535
    return lowest_port <= port <= highest_port
def format_proxy_list(proxies: list, include_auth: bool = False) -> str:
    """
    Render proxies as a multi-line, human-readable listing.

    Each proxy becomes one line of the form
    " [+] address:port (protocol)", where the marker is "+" when the
    proxy's status value is "active" and "-" otherwise. A positive latency
    is appended in whole milliseconds, and the username is appended when
    ``include_auth`` is set and the proxy has one.

    Args:
        proxies: List of Proxy objects.
        include_auth: Whether to include authentication details.

    Returns:
        The lines joined by newlines, or " (no proxies)" for an empty list.
    """
    rendered = []
    for proxy in proxies:
        if proxy.status.value == "active":
            status_icon = "+"
        else:
            status_icon = "-"

        # Collect the pieces of this proxy's line, then join them once.
        segments = [f" [{status_icon}] {proxy.address}:{proxy.port} ({proxy.protocol.value})"]
        if proxy.latency_ms > 0:
            segments.append(f" - {proxy.latency_ms:.0f}ms")
        if include_auth and proxy.username:
            segments.append(f" [auth: {proxy.username}]")
        rendered.append("".join(segments))

    if not rendered:
        return " (no proxies)"
    return "\n".join(rendered)
def load_proxies_from_file(filepath: str) -> list:
    """
    Load proxy URLs from a text file (one per line).

    Blank lines and lines starting with "#" are skipped. A line that
    parse_proxy_url rejects is logged as a warning (with its line number)
    and skipped. A missing file is logged as an error and yields an empty
    list.

    Args:
        filepath: Path to the proxy list file.

    Returns:
        List of parsed proxy tuples, in file order, as returned by
        parse_proxy_url: (address, port, protocol, username, password).
    """
    proxies = []
    try:
        # Decode explicitly instead of relying on the platform default.
        # "utf-8-sig" reads plain UTF-8 and also drops a leading BOM, which
        # would otherwise stick to the first line and make it unparseable.
        with open(filepath, "r", encoding="utf-8-sig") as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                try:
                    proxy_data = parse_proxy_url(line)
                except ValueError as e:
                    logger.warning("Line %d: %s", line_num, e)
                else:
                    proxies.append(proxy_data)
    except FileNotFoundError:
        logger.error("Proxy file not found: %s", filepath)
    return proxies
def setup_logging(level: str = "INFO", log_file: Optional[str] = None) -> None:
    """
    Configure application logging through ``logging.basicConfig``.

    Args:
        level: Name of a logging level, case-insensitive. A name that is
            not an attribute of the ``logging`` module falls back to INFO.
        log_file: Optional path; when given, a file handler for it is
            passed alongside the stream handler.
    """
    sinks = [logging.StreamHandler()]
    if log_file:
        sinks += [logging.FileHandler(log_file)]

    # Look the level up by its upper-cased name on the logging module.
    resolved_level = getattr(logging, level.upper(), logging.INFO)
    logging.basicConfig(
        handlers=sinks,
        level=resolved_level,
        datefmt="%Y-%m-%d %H:%M:%S",
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )