如何在单独的文件中使用FastAPI依赖于端点/路由?路由、文件、如何在、依赖于

2023-09-04 01:57:24 作者:抬头、阳光依旧温暖

我在单独的文件中定义了一个WebSocket端点,如下所示:

from starlette.endpoints import WebSocketEndpoint
from connection_service import ConnectionService


class WSEndpoint(WebSocketEndpoint):
    """Handles Websocket connections"""

    async def on_connect(self,
            websocket: WebSocket,
            connectionService: ConnectionService = Depends(ConnectionService)):
        """Handles new connection"""
        self.connectionService = connectionService
        ...

main.py中,我将终结点注册为:

from fastapi import FastAPI
from starlette.routing import WebSocketRoute
from ws_endpoint import WSEndpoint

app = FastAPI(routes=[ WebSocketRoute("/ws", WSEndpoint) ])
但我的终结点的Depends从未解析。有没有办法让它起作用?

此外,此机制在FastAPI中的用途是什么?我们不能只使用局部/全局变量吗?

推荐答案

TL;DR

好的需求文档是什么样的结构

文档似乎提示您只能使用Depends执行请求函数。

说明

我在FastAPI存储库中找到了相关的issue #2057,Depends(...)似乎只对请求有效。

我确认了这一点,

from fastapi import Depends, FastAPI

app = FastAPI()


async def foo_func():
    return "This is from foo"


async def test_depends(foo: str = Depends(foo_func)):
    return foo


@app.get("/")
async def read_items():
    depends_result = await test_depends()
    return depends_result

在这种情况下,未解析依赖项。

就您的案例而言,您可以像这样解决依赖关系

from starlette.endpoints import WebSocketEndpoint
from connection_service import ConnectionService


class WSEndpoint(WebSocketEndpoint):
    async def on_connect(
            self,
            websocket: WebSocket,
            connectionService=None
    ):
        if connectionService is None:
            connectionService = ConnectionService()  # calling the depend function

        self.connectionService = connectionService
 
精彩推荐
图片推荐