6 changes: 6 additions & 0 deletions pyproject.toml
@@ -77,6 +77,9 @@ select = [
"B", # flake8-bugbear
"SIM", # flake8-simplify
]
ignore = [
"E501",
]

[tool.pytest.ini_options]
testpaths = [
@@ -85,3 +88,6 @@ testpaths = [
addopts = [
"--import-mode=importlib",
]
markers = [
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
]
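
For context, a minimal sketch of how a test might opt into the newly registered `slow` marker; the test name and body here are hypothetical, not part of this PR:

```python
import time

import pytest


@pytest.mark.slow
def test_large_archive_roundtrip():
    # Stand-in for a long-running integration step.
    time.sleep(5)
    assert True
```

With the marker registered in `pyproject.toml`, `pytest -m "not slow"` deselects such tests, and pytest no longer emits `PytestUnknownMarkWarning` for them.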
30 changes: 21 additions & 9 deletions src/onc/modules/_DataProductFile.py
@@ -15,11 +15,23 @@ def __init__(self, max_retries):

class _DataProductFile:
"""
Donwloads a single data product file
Downloads a single data product file
Is able to poll and wait if required
"""

def __init__(self, dpRunId: int, index: str, baseUrl: str, token: str):
def __init__(
self,
dpRunId: int,
index: str,
baseUrl: str,
token: str,
verbosity: str = "INFO",
):
self.verbosity = verbosity
# Use child logger 'onc.poll' consistent with _PollLog
from ._Messages import setup_logger

self._log = setup_logger("onc.poll", verbosity)
self._retries = 0
self._status = 202
self._downloaded = False
Expand Down Expand Up @@ -50,7 +62,7 @@ def download(
Can poll, wait and retry if the file is not ready to download
Return the file information
"""
log = _PollLog(True)
log = _PollLog(self.verbosity)
self._status = 202
while self._status == 202:
# Run timed request
@@ -74,20 +86,19 @@ def download(
response, outPath, filename, overwrite
)
except FileExistsError:
if self._retries > 1:
print("")
print(f' Skipping "{self._filePath}": File already exists.')
self._log.info(
f' Skipping "{self._filePath}": File already exists.'
)
self._status = 777

elif self._status == 202: # Still processing, wait and retry
log.logMessage(response.json())
sleep(pollPeriod)

elif self._status == 204: # No data found
print(" No data found.")
self._log.info(" No data found.")

elif self._status == 404: # Index too high, no more files to download
log.printNewLine()
pass

elif self._status == 410: # Status 410: gone (file deleted from FTP)
@@ -98,7 +109,8 @@ def download(
stacklevel=2,
)
else:
raise requests.HTTPError(_createErrorMessage(response))
self._log.error(_createErrorMessage(response))
response.raise_for_status()

return self._status

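A quick sketch (assuming the new `_Messages.setup_logger` shown below) of why creating a logger per `_DataProductFile` instance is safe: the helper funnels through `logging.getLogger`, so repeated calls with the same name share one handler and only adjust levels.

```python
import logging

from onc.modules._Messages import setup_logger

log_a = setup_logger("onc.poll", "DEBUG")
log_b = setup_logger("onc.poll", "INFO")

assert log_a is log_b                            # same underlying logger object
assert len(log_a.handlers) == 1                  # no duplicate console handlers
assert log_a.handlers[0].level == logging.INFO   # second call re-levels the handler
```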
139 changes: 139 additions & 0 deletions src/onc/modules/_Messages.py
@@ -0,0 +1,139 @@
import logging
import re
import time

import requests

REQ_MSG = "Requested: {}" # get request url
RESPONSE_TIME_MSG = "Response received in {} seconds." # requests.elapsed value.
RESPONSE_MSG = "HTTP Response: {} ({})" # Brief description, status code
MULTIPAGE_MSG = (
"The requested data quantity is greater than the "
"supplied row limit and will be downloaded over multiple requests."
)


LEVEL_MAP = {
"CRITICAL": logging.CRITICAL,
"ERROR": logging.ERROR,
"WARNING": logging.WARNING,
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
}


class OnclibFormatter(logging.Formatter):
"""
Custom formatter that omits the log prefix for INFO-level records.
"""

def format(self, record):
if record.levelno == logging.INFO:
return record.getMessage()
return super().format(record)


def setup_logger(logger_name: str = "onc", level: int | str = "INFO") -> logging.Logger:
"""
Set up a logger object for displaying verbose messages to console.

Parameters
----------
logger_name : str, optional
The unique logger name to use. Can be shared between modules.
level : int or str, optional
The logging level to use. Default is 'INFO'.

Returns
-------
logging.Logger
The configured logging.Logger object.
"""

# Ensure level is a valid logging level integer
if isinstance(level, str):
level = LEVEL_MAP.get(level.upper(), logging.INFO)

logger = logging.getLogger(logger_name)
logger.propagate = False

# Apply level to the logger itself
logger.setLevel(level)

if not logger.handlers:
console = logging.StreamHandler()
console.setLevel(level)

# Set the logging format.
dtfmt = "%Y-%m-%dT%H:%M:%S"
strfmt = (
"%(asctime)s.%(msecs)03dZ | %(name)-12s | %(levelname)-8s | %(message)s"
)
fmt = OnclibFormatter(strfmt, datefmt=dtfmt)
fmt.converter = time.gmtime

console.setFormatter(fmt)
logger.addHandler(console)
else:
# If handlers exist, ensure they have the correct level
for handler in logger.handlers:
handler.setLevel(level)

return logger


def scrub_token(input: str) -> str:
"""
Replace a token in a query URL or other string with the string 'REDACTED'
so that users don't accidentally commit their tokens to public repositories
when verbose ONC info/warning messages echo the full request URL.

Parameters
----------
input : str
An Oceans 3.0 API URL or string with a token query parameter.

Returns
-------
str
The scrubbed URL.
"""
return re.sub(r"([?&]token=)[a-f0-9-]{36}", r"\1REDACTED", input)


def build_error_message(response: requests.Response, redact_token: bool) -> str:
"""
Build an error message from a requests.Response object.

Parameters
----------
response : requests.Response
A requests.Response object.
redact_token : bool
If true, redact tokens before returning an error message.

Returns
-------
str
An error message.
"""
payload = response.json()
message = payload.get("message")

if "errors" in payload:
errors = payload["errors"]
error_messages = []
for error in errors:
emsg = (
f"(API Error Code {error['errorCode']}) "
f"{error['errorMessage']} for query parameter(s) "
f"'{error['parameter']}'."
)
error_messages.append(emsg)
error_message = "\n".join(error_messages)
else:
error_message = None
msg = "\n".join([m for m in (message, error_message) if m is not None])
if redact_token is True and "token=" in msg:
msg = scrub_token(msg)
return msg
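
A short usage sketch of the helpers above; the logger name `onc.demo` and the token value are made up for illustration:

```python
from onc.modules._Messages import scrub_token, setup_logger

log = setup_logger("onc.demo", "INFO")
log.info("No data found.")       # INFO records print bare, per OnclibFormatter
log.warning("Retrying request")  # other levels keep the timestamp | name | level prefix

url = (
    "https://data.oceannetworks.ca/api/locations"
    "?token=0123abcd-0123-abcd-0123-0123456789ab"
)
print(scrub_token(url))
# https://data.oceannetworks.ca/api/locations?token=REDACTED
```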
119 changes: 77 additions & 42 deletions src/onc/modules/_MultiPage.py
@@ -4,7 +4,9 @@
from time import time

import dateutil.parser
import humanize
from onc.modules._Messages import (
setup_logger,
)

from ._util import _formatDuration

@@ -14,12 +16,25 @@ class _MultiPage:
def __init__(self, parent: object):
self.parent = weakref.ref(parent)
self.result = None
self._log = setup_logger("onc.multi", self._config("verbosity") or "INFO")

def _config(self, key):
p = self.parent()
if p is None:
return None
if hasattr(p, "_config"):
return p._config(key)
return getattr(p, key, None)

def getAllPages(self, service: str, url: str, filters: dict):
"""
Requests all pages from the service, with the url and filters
Multiple pages will be downloaded until completed
@return: Service response with concatenated data for all pages obtained

Returns
-------
dict
Service response with concatenated data for all pages obtained.
"""
# pop archivefiles extension
extension = None
@@ -30,48 +45,54 @@ def getAllPages(self, service: str, url: str, filters: dict):
# download first page
start = time()
response, responseTime = self._doPageRequest(url, filters, service, extension)
rNext = response["next"]

if rNext is not None:
print(
"Data quantity is greater than the row limit and",
"will be downloaded in multiple pages.",
)

pageCount = 1
pageEstimate = self._estimatePages(response, service)
if pageEstimate > 0:
# Exclude the first page when calculating the time estimation
timeEstimate = _formatDuration((pageEstimate - 1) * responseTime)
print(
f"Downloading time for the first page: {humanize.naturaldelta(responseTime)}" # noqa: E501
)
print(f"Estimated approx. {pageEstimate} pages in total.")
print(
f"Estimated approx. {timeEstimate} to complete for the rest of the pages." # noqa: E501
)

# keep downloading pages until next is None
print("")
while rNext is not None:
pageCount += 1
rowCount = self._rowCount(response, service)
if isinstance(response, dict):
rNext = response["next"]

print(f" ({rowCount} samples) Downloading page {pageCount}...")
nextResponse, nextTime = self._doPageRequest(
url, rNext["parameters"], service, extension
if rNext is not None:
self._log.info(
"The requested data quantity is greater than the supplied "
"row limit and will be downloaded over multiple requests."
)
rNext = nextResponse["next"]

# concatenate new data obtained
self._catenateData(response, nextResponse, service)

totalTime = _formatDuration(time() - start)
print(
f" ({self._rowCount(response, service):d} samples)"
f" Completed in {totalTime}."
)
response["next"] = None
pageCount = 1
pageEstimate = self._estimatePages(response, service)
if pageEstimate > 0:
# Exclude the first page when calculating the time estimation
timeEstimate = _formatDuration((pageEstimate - 1) * responseTime)
self._log.info(
f"Download time for page {pageCount}: {round(responseTime, 2)} seconds"
)
self._log.info(
f"Est. number of pages remaining for download: {pageEstimate - 1}"
)
self._log.info(
f"Est. number of seconds to download remaining data: {timeEstimate}"
)

# keep downloading pages until next is None
while rNext is not None:
pageCount += 1
rowCount = self._rowCount(response, service)

self._log.info(
f" Submitting request for page {pageCount} ({rowCount} samples)..."
)

nextResponse, nextTime = self._doPageRequest(
url, rNext["parameters"], service, extension
)
rNext = nextResponse["next"]

# concatenate new data obtained
self._catenateData(response, nextResponse, service)

totalTime = _formatDuration(time() - start)

self._log.info(
f" Downloaded {self._rowCount(response, service):d} total samples in {totalTime}."
)
response["next"] = None

return response

@@ -81,8 +102,22 @@ def _doPageRequest(
"""
Wraps the _doRequest method
Performs additional processing of the response for certain services
@param extension: Only provide for archivefiles filtering
Returns a tuple (jsonResponse, duration)

Parameters
----------
url : str
API endpoint URL.
filters : dict
Filters for the request.
service : str
Name of the service (e.g. archivefile).
extension : str, optional
Only provide for archivefiles filtering.

Returns
-------
tuple
(jsonResponse, duration)
"""
if service.startswith("archivefile"):
response, duration = self.parent()._doRequest(url, filters, getTime=True)
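
A standalone sketch of the weak-reference pattern `_MultiPage` now uses for configuration lookups; the class names here are illustrative, not from the library:

```python
import weakref


class Parent:
    verbosity = "DEBUG"


class Child:
    def __init__(self, parent):
        # Hold a weak reference to avoid a parent<->child reference cycle.
        self.parent = weakref.ref(parent)

    def config(self, key):
        p = self.parent()  # None once the parent has been garbage-collected
        if p is None:
            return None
        return getattr(p, key, None)


parent = Parent()
child = Child(parent)
print(child.config("verbosity"))  # DEBUG
del parent                        # under CPython refcounting, Parent is freed here
print(child.config("verbosity"))  # None: the dead reference is handled gracefully
```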