-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscheduler.py
More file actions
147 lines (123 loc) · 5 KB
/
scheduler.py
File metadata and controls
147 lines (123 loc) · 5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""Proxy rotation scheduler with configurable strategies."""
import asyncio
import logging
import random
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Dict, List, Optional
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class RotationStrategy(Enum):
    """Identifiers for the proxy selection policies a scheduler can use.

    Each member's value is the lowercase string form of its name, so a
    member can be looked up from text with ``RotationStrategy("random")``.
    How a given policy actually picks a proxy is decided by the scheduler
    that reads this value, not by the enum itself.
    """

    ROUND_ROBIN = "round_robin"
    RANDOM = "random"
    LEAST_USED = "least_used"
    WEIGHTED = "weighted"
    FAILOVER = "failover"
@dataclass
class SchedulerConfig:
    """Tunable settings for a proxy scheduler.

    All fields have defaults, so ``SchedulerConfig()`` is a usable
    configuration on its own.
    """

    # Selection policy used when picking the next proxy.
    strategy: RotationStrategy = RotationStrategy.ROUND_ROBIN
    # NOTE(review): not read anywhere in this file; presumably the period
    # (in seconds) of an external health checker - confirm against callers.
    health_check_interval: int = 60
    # Number of consecutive failures after which a proxy is put on cooldown.
    max_failures: int = 3
    # Length of a cooldown, in seconds (added to a ``time.time()`` timestamp).
    cooldown_seconds: int = 300
    # NOTE(review): not read anywhere in this file; presumably asks some
    # other component to drop failing proxies - confirm against callers.
    auto_remove_dead: bool = False
@dataclass
class ProxyStats:
    """Mutable usage and health counters for a single proxy.

    Only ``proxy_url`` is required; every counter starts at zero and every
    flag starts cleared.
    """

    # URL identifying the proxy; also the key it is registered under.
    proxy_url: str
    # Number of times the proxy has been handed out by the scheduler.
    total_requests: int = 0
    # Number of reported successes.
    successful_requests: int = 0
    # Number of reported failures.
    failed_requests: int = 0
    # Failures since the last success (or since a cooldown expired).
    consecutive_failures: int = 0
    # Running mean of the response times passed in with success reports.
    avg_response_time: float = 0.0
    # ``time.time()`` timestamp of the most recent selection (0.0 = never).
    last_used: float = 0.0
    # ``time.time()`` timestamp of the most recent success (0.0 = never).
    last_success: float = 0.0
    # True while the proxy is excluded from selection.
    is_cooling_down: bool = False
    # ``time.time()`` timestamp after which the cooldown may be lifted.
    cooldown_until: float = 0.0
class ProxyScheduler:
    """Manages proxy rotation with multiple strategies.

    Proxies are registered by URL and each one is tracked with a
    ``ProxyStats`` record. ``get_next`` picks one of the proxies that is not
    cooling down, using the policy named by ``config.strategy``. Callers
    feed outcomes back through ``report_success`` / ``report_failure``;
    once a proxy accumulates ``config.max_failures`` consecutive failures it
    is put on cooldown for ``config.cooldown_seconds`` seconds.

    Only ``get_next`` acquires the internal ``asyncio.Lock``; every other
    method is a plain synchronous call.
    """

    def __init__(self, config: Optional[SchedulerConfig] = None):
        """Create an empty scheduler.

        Args:
            config: Settings to use; a default ``SchedulerConfig`` is created
                when omitted.
        """
        self.config = config if config is not None else SchedulerConfig()
        self._proxies: Dict[str, ProxyStats] = {}
        # Selection weights by proxy URL, consulted by the WEIGHTED strategy.
        self._weights: Dict[str, int] = {}
        self._rr_index = 0
        self._lock = asyncio.Lock()

    def add_proxy(self, proxy_url: str, weight: int = 1) -> None:
        """Register a proxy; a URL that is already registered is left as is.

        Args:
            proxy_url: URL identifying the proxy.
            weight: Relative selection weight used by the WEIGHTED strategy.
                Values at or below zero mean "never pick while another proxy
                has a positive weight".
        """
        if proxy_url in self._proxies:
            return
        self._proxies[proxy_url] = ProxyStats(proxy_url=proxy_url)
        self._weights[proxy_url] = weight
        logger.info("Added proxy: %s", proxy_url)

    def remove_proxy(self, proxy_url: str) -> None:
        """Unregister a proxy; unknown URLs are ignored silently."""
        removed = self._proxies.pop(proxy_url, None)
        self._weights.pop(proxy_url, None)
        # Only report a removal that actually happened.
        if removed is not None:
            logger.info("Removed proxy: %s", proxy_url)

    async def get_next(self) -> Optional[str]:
        """Pick the next proxy according to the configured strategy.

        Returns:
            The chosen proxy URL, or ``None`` (after logging a warning) when
            every proxy is cooling down or none is registered.
        """
        async with self._lock:
            available = self._get_available_proxies()
            if not available:
                logger.warning("No available proxies")
                return None

            selectors = {
                RotationStrategy.ROUND_ROBIN: self._round_robin,
                RotationStrategy.RANDOM: self._random,
                RotationStrategy.LEAST_USED: self._least_used,
                RotationStrategy.WEIGHTED: self._weighted,
                RotationStrategy.FAILOVER: self._failover,
            }
            # Unrecognised strategy values keep falling back to round-robin.
            select = selectors.get(self.config.strategy, self._round_robin)
            return select(available)

    def _get_available_proxies(self) -> List[ProxyStats]:
        """Return the stats of all proxies that are not cooling down.

        As a side effect, any proxy whose cooldown deadline has passed is
        taken off cooldown and has its consecutive-failure count reset.
        """
        now = time.time()
        available = []
        for stats in self._proxies.values():
            if stats.is_cooling_down and now > stats.cooldown_until:
                stats.is_cooling_down = False
                stats.consecutive_failures = 0
            if not stats.is_cooling_down:
                available.append(stats)
        return available

    @staticmethod
    def _mark_used(proxy: ProxyStats) -> str:
        """Record that ``proxy`` was handed out and return its URL."""
        proxy.last_used = time.time()
        proxy.total_requests += 1
        return proxy.proxy_url

    def _round_robin(self, proxies: List[ProxyStats]) -> str:
        """Cycle through ``proxies`` in order, wrapping at the end."""
        # The available list can shrink between calls, so wrap before indexing.
        self._rr_index %= len(proxies)
        proxy = proxies[self._rr_index]
        self._rr_index += 1
        return self._mark_used(proxy)

    def _random(self, proxies: List[ProxyStats]) -> str:
        """Pick one of ``proxies`` uniformly at random."""
        return self._mark_used(random.choice(proxies))

    def _least_used(self, proxies: List[ProxyStats]) -> str:
        """Pick the proxy that has been handed out the fewest times."""
        return self._mark_used(min(proxies, key=lambda p: p.total_requests))

    def _weighted(self, proxies: List[ProxyStats]) -> str:
        """Pick one of ``proxies`` at random, proportionally to its weight."""
        # Negative weights are treated as zero; unknown URLs default to 1.
        weights = [max(self._weights.get(p.proxy_url, 1), 0) for p in proxies]
        if sum(weights) > 0:
            chosen = random.choices(proxies, weights=weights, k=1)[0]
        else:
            # random.choices rejects an all-zero weight vector, so fall back
            # to a uniform pick rather than failing the request.
            chosen = random.choice(proxies)
        return self._mark_used(chosen)

    def _failover(self, proxies: List[ProxyStats]) -> str:
        """Pick the proxy with the highest success-to-request ratio."""
        # max(..., 1) avoids dividing by zero for proxies never handed out.
        best = max(
            proxies,
            key=lambda p: p.successful_requests / max(p.total_requests, 1),
        )
        return self._mark_used(best)

    def report_success(self, proxy_url: str, response_time: float = 0) -> None:
        """Record a successful request; unknown URLs are ignored.

        Args:
            proxy_url: URL of the proxy that served the request.
            response_time: Response time to fold into the running average.
        """
        stats = self._proxies.get(proxy_url)
        if stats is None:
            return
        stats.successful_requests += 1
        stats.consecutive_failures = 0
        stats.last_success = time.time()
        # Incremental mean over all successes reported so far.
        n = stats.successful_requests
        stats.avg_response_time = (
            stats.avg_response_time * (n - 1) + response_time
        ) / n

    def report_failure(self, proxy_url: str) -> None:
        """Record a failed request; unknown URLs are ignored.

        When the consecutive-failure count reaches ``config.max_failures``
        the proxy is put on cooldown for ``config.cooldown_seconds`` seconds.
        """
        stats = self._proxies.get(proxy_url)
        if stats is None:
            return
        stats.failed_requests += 1
        stats.consecutive_failures += 1
        if stats.consecutive_failures >= self.config.max_failures:
            stats.is_cooling_down = True
            stats.cooldown_until = time.time() + self.config.cooldown_seconds
            logger.warning(
                "Proxy %s cooling down for %ss",
                proxy_url,
                self.config.cooldown_seconds,
            )

    def get_stats(self) -> List[Dict]:
        """Return one summary dict per registered proxy."""
        return [
            {
                "url": s.proxy_url,
                "total": s.total_requests,
                "success": s.successful_requests,
                "failed": s.failed_requests,
                "avg_response_time": round(s.avg_response_time, 3),
                "cooling_down": s.is_cooling_down,
            }
            for s in self._proxies.values()
        ]

    @property
    def proxy_count(self) -> int:
        """Number of registered proxies, cooling down or not."""
        return len(self._proxies)

    @property
    def available_count(self) -> int:
        """Number of proxies not cooling down.

        Reading this also lifts any cooldown whose deadline has passed,
        because it goes through ``_get_available_proxies``.
        """
        return len(self._get_available_proxies())