forked from wherobots/wherobots-python-dbapi
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcursor.py
More file actions
183 lines (143 loc) · 5.63 KB
/
cursor.py
File metadata and controls
183 lines (143 loc) · 5.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import queue
import re
from typing import Any, List, Tuple, Dict
from .errors import ProgrammingError
from .models import ExecutionResult, Store, StoreResult
# Matches pyformat parameter markers: %(name)s
_PYFORMAT_RE = re.compile(r"%\(([^)]+)\)s")
def _substitute_parameters(operation: str, parameters: Dict[str, Any] | None) -> str:
"""Substitute pyformat parameters into a SQL operation string.
Uses regex to match only %(name)s tokens, leaving literal percent
characters (e.g. SQL LIKE wildcards) untouched.
"""
if not parameters:
return operation
def replacer(match: re.Match) -> str:
key = match.group(1)
if key not in parameters:
raise ProgrammingError(
f"Parameter '{key}' not found in provided parameters"
)
return str(parameters[key])
return _PYFORMAT_RE.sub(replacer, operation)
_TYPE_MAP = {
"object": "STRING",
"int64": "NUMBER",
"float64": "NUMBER",
"datetime64[ns]": "DATETIME",
"timedelta[ns]": "DATETIME",
"bool": "NUMBER", # Assuming boolean is stored as number
"bytes": "BINARY",
}
class Cursor:
def __init__(self, exec_fn, cancel_fn) -> None:
self.__exec_fn = exec_fn
self.__cancel_fn = cancel_fn
self.__queue: queue.Queue = queue.Queue()
self.__results: list[Any] | None = None
self.__store_result: StoreResult | None = None
self.__current_execution_id: str | None = None
self.__current_row: int = 0
# Description and row count are set by the last executed operation.
# Their default values are defined by PEP-0249.
self.__description: List[Tuple] | None = None
self.__rowcount: int = -1
# Array-size is also defined by PEP-0249 and is expected to be read/writable.
self.arraysize: int = 1
@property
def description(self) -> List[Tuple] | None:
return self.__description
@property
def rowcount(self) -> int:
return self.__rowcount
def __on_execution_result(self, result) -> None:
self.__queue.put(result)
def __get_results(self) -> List[Tuple[Any, ...]] | None:
if not self.__current_execution_id:
raise ProgrammingError("No query has been executed yet")
if self.__results is not None:
return self.__results
execution_result = self.__queue.get()
if not isinstance(execution_result, ExecutionResult):
raise ProgrammingError("Unexpected result type")
if execution_result.error:
raise execution_result.error
self.__store_result = execution_result.store_result
results = execution_result.results
# Results is None when results are stored in cloud storage
if results is None:
return None
self.__rowcount = len(results)
self.__results = results
if not results.empty:
self.__description = [
(
col_name, # name
_TYPE_MAP.get(str(results[col_name].dtype), "STRING"), # type_code
None, # display_size
results[col_name].memory_usage(), # internal_size
None, # precision
None, # scale
True, # null_ok; Assuming all columns can accept NULL values
)
for col_name in results.columns
]
return self.__results
def execute(
self,
operation: str,
parameters: Dict[str, Any] | None = None,
store: Store | None = None,
) -> None:
if self.__current_execution_id:
self.__cancel_fn(self.__current_execution_id)
self.__results = None
self.__store_result = None
self.__current_row = 0
self.__rowcount = -1
self.__description = None
self.__current_execution_id = self.__exec_fn(
_substitute_parameters(operation, parameters),
self.__on_execution_result,
store,
)
def get_store_result(self) -> StoreResult | None:
"""Get the store result for the last executed query.
Returns the StoreResult containing the URI and size of the stored
results, or None if the query was not configured to store results.
This method blocks until the query completes.
"""
if not self.__current_execution_id:
raise ProgrammingError("No query has been executed yet")
# Ensure we've waited for the result
self.__get_results()
return self.__store_result
def executemany(
self, operation: str, seq_of_parameters: List[Dict[str, Any]]
) -> None:
raise NotImplementedError
def fetchone(self) -> Any:
results = self.__get_results()[self.__current_row :]
if len(results) == 0:
return None
self.__current_row += 1
return results[0]
def fetchmany(self, size: int = None) -> List[Any]:
size = size or self.arraysize
results = self.__get_results()[self.__current_row : self.__current_row + size]
self.__current_row += size
return results
def fetchall(self) -> List[Any]:
return self.__get_results()[self.__current_row :]
def close(self) -> None:
"""Close the cursor."""
if self.__results is None and self.__current_execution_id:
self.__cancel_fn(self.__current_execution_id)
def __iter__(self):
return self
def __next__(self) -> None:
raise StopIteration
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.close()