I configured a Fedora 34 VM on VirtualBox with 2048 MB of memory to serve this FastAPI application on localhost:7070. The full application source, dependencies, and instructions are here. Below is the smallest reproducible example I could come up with.
main.py
import os, pathlib

import fastapi as fast
import aiofiles

ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
RESULTS_DIR = pathlib.Path('/'.join((ROOT_DIR, 'results')))

app = fast.FastAPI()


@app.post('/api')
async def upload(
        request: fast.Request,
        file: fast.UploadFile = fast.File(...),
        filedir: str = ''):
    dest = RESULTS_DIR.joinpath(filedir, file.filename)
    dest.parent.mkdir(parents=True, exist_ok=True)
    async with aiofiles.open(dest, 'wb') as buffer:
        await file.seek(0)
        contents = await file.read()
        await buffer.write(contents)
    return f'localhost:7070/{dest.parent.name}/{dest.name}'
start.sh
Serves the application:
#! /bin/bash
uvicorn --host "0.0.0.0" --log-level debug --port 7070 main:app
client.py
import httpx
from pathlib import Path
import asyncio


async def async_post_file_req(url: str, filepath: Path):
    async with httpx.AsyncClient(
            timeout=httpx.Timeout(write=None, read=None, connect=None, pool=None)) as client:
        r = await client.post(
            url,
            files={
                'file': (filepath.name, filepath.open('rb'), 'application/octet-stream')
            }
        )


if __name__ == '__main__':
    url = 'http://localhost:7070'
    asyncio.run(
        async_post_file_req(
            f'{url}/api',
            Path('~/1500M.txt').expanduser()  # expanduser so '~' resolves to $HOME
        ))
Create the 1500 MB file:
truncate -s 1500M 1500M.txt
When uploading the 1500 MB file, the current upload implementation appears to read the entire file into memory; the server then returns {status: 400, reason: 'Bad Request', details: 'There was an error parsing the body.'} and the file is never written to disk. When uploading an 825 MB file, the server returns 200 and the file is written to disk. I don't understand why parsing the larger file fails.
What is happening?
How do I upload a file that is larger than the machine's available memory?
Do I have to stream the body?
Digging into the source, I found that FastAPI raises an HTTPException with status code 400 and detail "There was an error parsing the body" exactly once, while trying to determine whether the request form or body needs to be read. A FastAPI Request is basically the Starlette Request, so I re-implemented the FastAPI server as a Starlette application, hoping to bypass that exception handler and get more information about the problem.
main.py
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route


async def homepage(request):
    return JSONResponse({'hello': 'world'})


async def upload(request):
    form = await request.form()
    print(type(form['upload_file']))
    filename = form['upload_file'].filename or 'not found'
    contents = await form['upload_file'].read()
    b = len(contents) or -1
    return JSONResponse({
        'filename': filename,
        'bytes': b
    })


app = Starlette(debug=True, routes=[
    Route('/', homepage),
    Route('/api', upload, methods=['POST'])
])
Pipfile
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
starlette = "*"
uvicorn = "*"
uvloop = "*"
httpx = "*"
watchgod = "*"
python-multipart = "*"
[dev-packages]
[requires]
python_version = "3.9"
When POSTing files of 989 MiB or larger, the Starlette application raised OSError errno 28, no space left on device. Files of 988 MiB or smaller caused no error.
INFO:     10.0.2.2:46996 - "POST /api HTTP/1.1" 500 Internal Server Error
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/uvicorn/protocols/http/httptools_impl.py", line 398, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "/usr/local/lib/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
    return await self.app(scope, receive, send)
  File "/usr/local/lib/python3.9/site-packages/starlette/applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.9/site-packages/starlette/middleware/errors.py", line 181, in __call__
    raise exc from None
  File "/usr/local/lib/python3.9/site-packages/starlette/middleware/errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.9/site-packages/starlette/exceptions.py", line 82, in __call__
    raise exc from None
  File "/usr/local/lib/python3.9/site-packages/starlette/exceptions.py", line 71, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 580, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 241, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 52, in app
    response = await func(request)
  File "/home/vagrant/star-file-server/./main.py", line 11, in upload
    form = await request.form()
  File "/usr/local/lib/python3.9/site-packages/starlette/requests.py", line 240, in form
    self._form = await multipart_parser.parse()
  File "/usr/local/lib/python3.9/site-packages/starlette/formparsers.py", line 231, in parse
    await file.write(message_bytes)
  File "/usr/local/lib/python3.9/site-packages/starlette/datastructures.py", line 445, in write
    await run_in_threadpool(self.file.write, data)
  File "/usr/local/lib/python3.9/site-packages/starlette/concurrency.py", line 40, in run_in_threadpool
    return await loop.run_in_executor(None, func, *args)
  File "/usr/lib64/python3.9/concurrent/futures/thread.py", line 52, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/lib64/python3.9/tempfile.py", line 755, in write
    rv = file.write(s)
OSError: [Errno 28] No space left on device
Starlette's UploadFile data structure uses a SpooledTemporaryFile. This object writes to your OS's temporary directory. My temporary directory is /tmp, since I'm on Fedora 34 and I haven't set any environment variable telling Python to use anything else.
[vagrant@fedora star-file-server]$ python
Python 3.9.5 (default, May 14 2021, 00:00:00)
[GCC 11.1.1 20210428 (Red Hat 11.1.1-1)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import tempfile
>>> tempfile.gettempdir()
'/tmp'
[vagrant@fedora star-file-server]$ df -h
Filesystem      Size  Used Avail Use% Mounted on
devtmpfs        974M     0  974M   0% /dev
tmpfs           989M  168K  989M   1% /dev/shm
tmpfs           396M  5.6M  390M   2% /run
/dev/sda1        40G  1.6G   36G   5% /
tmpfs           989M     0  989M   0% /tmp
tmpfs           198M   84K  198M   1% /run/user/1000
Starlette sets the SpooledTemporaryFile's max_size to 1 MiB. From the Python tempfile documentation, I take this to mean the uploaded data is buffered in memory only up to 1 MiB; once it grows past that, it is rolled over to a real file in the system temporary directory. So although max_size is only 1 MiB, 989 MiB turns out to be the hard bound on UploadFile size here, because the SpooledTemporaryFile is limited by the storage available to the system temporary directory: on this VM, /tmp is a 989 MiB tmpfs, as the df output above shows.
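The rollover behavior is easy to confirm with the standard library. A minimal sketch, using a 1024-byte max_size as a stand-in for Starlette's 1 MiB (it peeks at the private _rolled flag, which is an implementation detail of CPython's tempfile):

```python
import tempfile

# A SpooledTemporaryFile buffers writes in memory until its size exceeds
# max_size, then transparently rolls the whole buffer over to a real file
# in the OS temporary directory.
with tempfile.SpooledTemporaryFile(max_size=1024) as spool:
    spool.write(b'x' * 1024)   # exactly at max_size: still in memory
    print(spool._rolled)       # False
    spool.write(b'x')          # one byte over: spilled to disk
    print(spool._rolled)       # True
```

This is why an upload only starts consuming /tmp once it outgrows 1 MiB, and why the space available there, not max_size, is the real ceiling.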
If I still wanted to use UploadFile, I could set an environment variable pointing the temporary directory at a device known to always have enough free space, even for the largest uploads.
export TMPDIR=/huge_storage_device
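To check that Python actually honors the override, clear tempfile's cached choice and re-read it. In this sketch the current directory stands in for /huge_storage_device, which is hypothetical:

```python
import os
import tempfile

# tempfile consults TMPDIR first when choosing where temp files go, but it
# caches the answer in tempfile.tempdir on first use, so reset the cache
# after changing the environment variable.
os.environ['TMPDIR'] = os.getcwd()  # stand-in for a large storage mount
tempfile.tempdir = None             # drop the cached directory
print(tempfile.gettempdir())        # now reports the directory TMPDIR points at
```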
The approach I prefer uses the request's stream, which avoids writing the file twice: once to a local temporary directory and a second time to its permanent destination.
import os, pathlib

import fastapi as fast
import aiofiles

ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
RESULTS_DIR = pathlib.Path('/'.join((ROOT_DIR, 'results')))

app = fast.FastAPI()


@app.post('/stream')
async def stream(
        request: fast.Request,
        filename: str,
        filedir: str = ''
):
    dest = RESULTS_DIR.joinpath(filedir, filename)
    dest.parent.mkdir(parents=True, exist_ok=True)
    async with aiofiles.open(dest, 'wb') as buffer:
        async for chunk in request.stream():
            await buffer.write(chunk)
    return {
        'loc': f'localhost:7070/{dest.parent.name}/{dest.name}'
    }
With this approach, when I uploaded files (5M, 450M, and 988M, each measured twice) to the server running on the Fedora VM with 2048 MiB of memory, the server never used much memory, never crashed, and mean latency dropped by about 40% (that is, POSTing to /stream took roughly 60% of the time of POSTing to /api).
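The client can stream too. A sketch assuming httpx 0.18 or later, where the content= parameter accepts an iterable of bytes; iter_chunks and post_stream are names of my own, not part of any library:

```python
from pathlib import Path


def iter_chunks(path: Path, chunk_size: int = 1024 * 1024):
    # Yield the file in fixed-size chunks so the whole body never sits in memory.
    with path.open('rb') as f:
        while chunk := f.read(chunk_size):
            yield chunk


def post_stream(url: str, path: Path):
    import httpx
    # httpx sends an iterable of bytes as a chunked request body instead of
    # buffering it, which pairs with the server-side request.stream() above.
    return httpx.post(
        f'{url}/stream',
        params={'filename': path.name},
        content=iter_chunks(path),
        timeout=None,
    )
```

Usage would look like post_stream('http://localhost:7070', Path('~/1500M.txt').expanduser()).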