forked from constverum/ProxyBroker
-
-
Notifications
You must be signed in to change notification settings - Fork 136
Expand file tree
/
Copy pathjudge.py
More file actions
136 lines (120 loc) · 4.17 KB
/
judge.py
File metadata and controls
136 lines (120 loc) · 4.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import asyncio
import random
from urllib.parse import urlparse
import aiohttp
from .errors import ResolveError
from .resolver import Resolver
from .utils import get_headers, log
class Judge:
    """Proxy judge: a remote endpoint used to verify that a proxy works.

    A judge is built from a URL whose scheme (upper-cased) selects one of
    the ``HTTP``, ``HTTPS`` or ``SMTP`` groups.  Once :meth:`check` succeeds,
    the judge registers itself in the class-level :attr:`available` list for
    its scheme and sets the matching event in :attr:`ev`.
    """

    # Verified judges grouped by scheme; shared by all instances.
    available = {"HTTP": [], "HTTPS": [], "SMTP": []}
    # Per-scheme events, set as soon as one judge of that scheme is verified.
    ev = {
        "HTTP": asyncio.Event(),
        "HTTPS": asyncio.Event(),
        "SMTP": asyncio.Event(),
    }

    def __init__(self, url, timeout=8, verify_ssl=False, loop=None):
        """Create a judge for *url*.

        Args:
            url: Full judge URL, including scheme and host.
            timeout: Total request timeout (seconds) used by :meth:`check`.
            verify_ssl: Value handed to the aiohttp connector as ``ssl``.
            loop: Event loop to use; when omitted the running loop is used
                if there is one, otherwise the loop stays unset.

        Raises:
            ValueError: If *url* has no host part.
        """
        parsed = urlparse(url)  # parse once instead of once per attribute
        self.url = url
        self.scheme = parsed.scheme.upper()
        self.host = parsed.netloc
        self.path = self._path_after_host(url, self.host)
        self.ip = None
        self.is_working = False
        self.marks = {"via": 0, "proxy": 0}
        self.timeout = timeout
        self.verify_ssl = verify_ssl
        try:
            self._loop = loop or asyncio.get_running_loop()
        except RuntimeError:
            # No running event loop, will be set later
            self._loop = loop
        self._resolver = Resolver(loop=self._loop)

    def __repr__(self):
        """Class representation"""
        return "<Judge [%s] %s>" % (self.scheme, self.host)

    @staticmethod
    def _path_after_host(url, host):
        """Return the part of *url* that follows the host (netloc).

        The host is located right after the ``//`` marker, so a host string
        that happens to reappear later in the path or query does not
        truncate the result.
        """
        if not host:
            # Splitting on an empty host raised a bare "empty separator"
            # ValueError before; keep the type, make the message useful.
            raise ValueError("Judge URL has no host: %r" % (url,))
        _, marker, rest = url.partition("//")
        if marker and rest.startswith(host):
            return rest[len(host):]
        # Host is not literally located after "//" (unusual URL text):
        # fall back to splitting on the host string.
        return url.split(host)[-1]

    @classmethod
    def get_random(cls, proto):
        """Return a random verified judge suitable for *proto*.

        ``"HTTPS"`` maps to the HTTPS judges, ``"CONNECT:25"`` to the SMTP
        judges, and anything else to the HTTP judges.

        Raises:
            IndexError: If no judge of the selected scheme is available
                (raised by ``random.choice`` on an empty list).
        """
        if proto == "HTTPS":
            scheme = "HTTPS"
        elif proto == "CONNECT:25":
            scheme = "SMTP"
        else:
            scheme = "HTTP"
        return random.choice(cls.available[scheme])

    @classmethod
    def clear(cls):
        """Empty every list of available judges and reset every event."""
        for scheme in ("HTTP", "HTTPS", "SMTP"):
            cls.available[scheme].clear()
        for scheme in ("HTTP", "HTTPS", "SMTP"):
            cls.ev[scheme].clear()

    def _mark_working(self):
        """Flag this judge as working and publish it for its scheme."""
        self.is_working = True
        self.available[self.scheme].append(self)
        self.ev[self.scheme].set()

    async def check(self, real_ext_ip):
        """Verify this judge and register it when it is usable.

        The host is resolved first; a ``ResolveError`` ends the check
        silently.  SMTP judges are accepted as soon as the host resolves.
        Other judges are fetched with a GET request (redirects disabled) and
        are accepted only when the response status is 200 and the
        lower-cased body contains both *real_ext_ip* and the ``rv`` value
        returned by ``get_headers(rv=True)``.

        Args:
            real_ext_ip: String expected to appear in the judge's response.
        """
        try:
            self.ip = await self._resolver.resolve(self.host)
        except ResolveError:
            return

        if self.scheme == "SMTP":
            # No HTTP round-trip for SMTP judges: resolving is enough.
            self._mark_working()
            return

        # NOTE(review): rv is presumably a token carried in the request
        # headers that the judge is expected to echo back - confirm in utils.
        headers, rv = get_headers(rv=True)
        connector = aiohttp.TCPConnector(
            loop=self._loop, ssl=self.verify_ssl, force_close=True
        )
        try:
            timeout = aiohttp.ClientTimeout(total=self.timeout)
            async with (
                aiohttp.ClientSession(connector=connector, timeout=timeout) as session,
                session.get(
                    url=self.url, headers=headers, allow_redirects=False
                ) as resp,
            ):
                page = await resp.text()
        except (
            asyncio.TimeoutError,
            aiohttp.ClientOSError,
            aiohttp.ClientResponseError,
            aiohttp.ServerDisconnectedError,
        ) as e:
            log.debug("%s is failed. Error: %r;" % (self, e))
            return

        page = page.lower()
        ip_on_page = real_ext_ip in page
        rv_on_page = rv in page
        if resp.status == 200 and ip_on_page and rv_on_page:
            # Occurrence counts of these words in the judge's response.
            self.marks["via"] = page.count("via")
            self.marks["proxy"] = page.count("proxy")
            self._mark_working()
            log.debug("%s is verified" % self)
        else:
            log.debug(
                f"{self} is failed. HTTP status code: {resp.status}; "
                f"Real IP on page: {ip_on_page}; Version: {rv_on_page}; "
                f"Response: {page}"
            )
def get_judges(judges=None, timeout=8, verify_ssl=False):
    """Build the list of judges to use for proxy checking.

    Args:
        judges: Iterable of judge URLs and/or ``Judge`` instances.  When it
            is empty or omitted, the built-in list of judge URLs is used.
        timeout: Timeout assigned to every returned judge.
        verify_ssl: ``verify_ssl`` value assigned to every returned judge.

    Returns:
        A new list of ``Judge`` objects, in input order.  ``Judge`` instances
        passed in are reused and have their ``timeout`` and ``verify_ssl``
        overwritten.
    """
    if not judges:
        judges = [
            'http://httpbin.org/get?show_env',
            'https://httpbin.org/get?show_env',
            'smtp://smtp.gmail.com',
            'smtp://aspmx.l.google.com',
            'http://azenv.net/',
            'https://www.proxy-listen.de/azenv.php',
            'http://www.proxyfire.net/fastenv',
            'http://proxyjudge.us/azenv.php',
            'http://ip.spys.ru/',
            'http://www.proxy-listen.de/azenv.php',
            'http://httpbin.com.br/anything?show_env',
            'https://httpbin.com.br/anything?show_env',
        ]

    def _prepare(candidate):
        # Wrap plain URLs; settings are applied to new and existing judges.
        judge = candidate if isinstance(candidate, Judge) else Judge(candidate)
        judge.timeout = timeout
        judge.verify_ssl = verify_ssl
        return judge

    return [_prepare(candidate) for candidate in judges]