main.py

from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel
from zipfile import ZipFile, ZIP_DEFLATED
from tempfile import TemporaryDirectory
from uuid import uuid4
import os
import logging
import asyncio
import aiohttp
import aiofiles

logging.basicConfig(
    format='%(asctime)s %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S',
    filename='urls_to_zip.log',
    encoding='utf-8'
)

# Data model for the items in the POST request body
class URLData(BaseModel):
    url: str
    filename: str
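
# For illustration, a request body for the POST endpoint below is a JSON list
# of such items (hypothetical URLs shown):
#   [
#     {"url": "https://example.com/a.png", "filename": "a.png"},
#     {"url": "https://example.com/b.pdf", "filename": "b.pdf"}
#   ]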

app = FastAPI()

# Dict to track task status (key: zip_id, val: 'pending' | 'complete').
# The entry is deleted after the zip is returned to the client, so an unknown
# id and an already-served id look the same to the GET endpoint below.
task_dict = {}

# Take the list of items from the request body and a temp dir in which to
# store the files; fetch each URL asynchronously with fetch_file(), ensuring
# unique filenames with get_filename_to_arc(), and return the fetched files
async def loop_session_calls(urls: list[URLData], tmpdir: TemporaryDirectory):
    # filename -> next duplicate counter, consumed by get_filename_to_arc()
    file_name_dict = {}

    def get_filename_to_arc(item: URLData) -> str:
        # If the filename has been seen before, append the counter before the
        # extension and increment it for the next duplicate
        if item.filename in file_name_dict:
            # splitext also copes with filenames that have no extension
            stem, ext = os.path.splitext(item.filename)
            new_filename = f'{stem}({file_name_dict[item.filename]}){ext}'
            file_name_dict[item.filename] += 1
            return new_filename
        # First occurrence: record it and keep the name unchanged
        file_name_dict[item.filename] = 1
        return item.filename
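
    # For illustration: three items all named 'report.pdf' are archived as
    # 'report.pdf', 'report(1).pdf', 'report(2).pdf'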

    async def fetch_file(session, item: URLData):
        try:
            async with session.get(item.url) as res:
                # Treat HTTP error statuses as failures rather than saving
                # the body of an error page
                res.raise_for_status()
                filename_to_arc = get_filename_to_arc(item)
                file_contents = await res.content.read()
                file_path = os.path.join(tmpdir.name, filename_to_arc)
                async with aiofiles.open(file_path, 'wb') as file_to_arc:
                    await file_to_arc.write(file_contents)
                logging.info(f'wrote the content for {item.url} to {filename_to_arc}')
                # the handle is returned for its .name attribute (the path on
                # disk), which zip_files() uses to locate the file
                return file_to_arc
        except aiohttp.ClientConnectorError as e:
            logging.error(f'There has been a connection error: {e}')
            return {
                'item': item,
                'error': e,
            }
        except Exception as ex:
            logging.error(f'Unexpected issue: {ex}')
            return {
                'item': item,
                'error': ex,
            }

    # Fire off an async fetch_file() call for each item in the request body
    # and gather the results: file handles for successes, error dicts for
    # failures
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_file(session, item) for item in urls]
        file_list = await asyncio.gather(*tasks)
    return file_list
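
# For illustration: with three items where the second URL is unreachable,
# loop_session_calls() resolves to something like
#   [<handle>, {'item': URLData(...), 'error': ClientConnectorError(...)}, <handle>]
# asyncio.gather() preserves the order in which the tasks were submitted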

# Get the list of files from loop_session_calls(), passing in the urls from
# the POST request body, and write them into a zip file. Files that weren't
# retrieved successfully are currently skipped; they could be handled
# differently in the future.
async def create_zipfile(urls: list[URLData], tmpdir: TemporaryDirectory, zip_id: str):
    file_list = await loop_session_calls(urls, tmpdir)

    def zip_files(file_list: list) -> str:
        file_path = f'./{zip_id}.zip'
        try:
            with ZipFile(file_path, 'w', compression=ZIP_DEFLATED) as myzip:
                for f in file_list:
                    if isinstance(f, dict) and 'error' in f:
                        # skip files that failed to download
                        logging.error(f"URL: {f['item'].url} failed with error: {f['error']}")
                        continue
                    elif f:
                        # append to the zip archive under the bare filename
                        name_to_save = os.path.basename(f.name)
                        myzip.write(f.name, arcname=name_to_save)
                        logging.info(f'wrote {name_to_save} to {file_path}')
                    else:
                        logging.error(f'There has been an issue...file is None\n{f}')
        except Exception as e:
            logging.error(f'An unexpected exception: {e}')
        # return the path so callers know where the zip was written
        return file_path

    return zip_files(file_list)

async def getzip(urls: list[URLData], zip_id: str):
    # temp directory to hold the fetched files while they are zipped
    tmpdir = TemporaryDirectory()
    await create_zipfile(urls, tmpdir, zip_id)
    # the files are in the zip now, so the temp directory can be removed
    tmpdir.cleanup()
    # Mark the task as 'complete' in task_dict
    task_dict[zip_id] = 'complete'


def delete_zip(file_path: str):
    try:
        os.remove(file_path)
    except OSError as e:
        logging.error(f'Error removing file at {file_path}: {e}')

# Return the zip to the client if the task has completed, then kick off a
# background task that deletes the zip after the response is sent.
# Otherwise send an appropriate status response.
@app.get('/get-zip/{zip_id}')
async def retrieve_zip(zip_id: str, background_tasks: BackgroundTasks):
    file_path = f'./{zip_id}.zip'
    try:
        if task_dict[zip_id] == 'complete' and os.path.exists(file_path):
            del task_dict[zip_id]
            background_tasks.add_task(delete_zip, file_path)
            return FileResponse(
                file_path,
                media_type='application/zip',
                filename=f'{zip_id}.zip',
                status_code=200
            )
        else:
            return JSONResponse({
                'message': 'Zip file still processing, try again',
                'id': zip_id
            }, status_code=202)
    except KeyError:
        return JSONResponse({
            'message': 'This zip id is invalid or has already been returned',
            'id': zip_id
        }, status_code=404)
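
# For illustration: polling GET /get-zip/<zip_id> returns 202 with a 'still
# processing' message while the task is pending, the zip bytes with 200 once
# it completes, and 404 on any later attempt, since the task entry is deleted
# when the file is returned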

# Generate a task/zip id, set its status to 'pending' in task_dict, and fire
# off a background task that starts the zipping process after this response
# is sent
@app.post('/get-zip')
async def main(urls: list[URLData], background_tasks: BackgroundTasks):
    zip_id = str(uuid4())
    task_dict[zip_id] = 'pending'
    background_tasks.add_task(getzip, urls, zip_id)
    return JSONResponse({
        'message': 'Zip file processing, send GET request to /get-zip/<zip_id>',
        'id': zip_id
    }, status_code=202)
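
# A minimal sketch for running the app locally; assumes uvicorn is installed
# (it is not imported above) and that port 8000 is free
if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host='127.0.0.1', port=8000)

# Example end-to-end flow with curl (hypothetical URL and id shown):
#   curl -X POST http://localhost:8000/get-zip \
#        -H 'Content-Type: application/json' \
#        -d '[{"url": "https://example.com/a.png", "filename": "a.png"}]'
#   # -> 202 {"message": "Zip file processing, ...", "id": "<zip_id>"}
#   curl -OJ http://localhost:8000/get-zip/<zip_id>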