【FastAPI】backgroundtasksが同期的に実行される場合
環境
Python=3.8.1
fastapi==0.62.0 starlette==0.13.6
同期的に実行される(非同期実行にならなかった)コード
以下のような感じでFastAPIのbackgroundtaskを活用して、非同期に処理を行おうとしていました。 (実際の処理からかなり簡略化しています。)
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
async def do_async(counter):
time.sleep(3)
print(f"{datetime.utcnow()} [{threading.get_ident()}]{counter} async end")
@app.get("/")
async def docroot(background_tasks: BackgroundTasks):
for i in range(3):
background_tasks.add_task(do_async, i)
print(f"main thread: {threading.get_ident()}")
return "ok"
しかし、実際にこのエンドポイントを呼び出すと結果が返ってくるのは9秒後となりました。
これはbackground_tasks.add_taskで追加したタスクがすべて完了したタイミングなので、上手く非同期実行が出来ていませんでした。
ちょっと込み入ったことをしていたので、バグなのか仕様なのか実装ミスなのか分からなかったので調査しました。
良くある想定通り非同期実行されるコード
「do_async」関数の内容を変えることで、通常通り非同期に実行されるようになります。
①非同期になる例-1
async def do_async(counter):
# asyncio.sleepを使う
await asyncio.sleep(3)
print(f"{datetime.utcnow()} [{threading.get_ident()}]{counter} async end")
②非同期になる例-2
# asyncを外す
def do_async(counter):
time.sleep(3)
print(f"{datetime.utcnow()} [{threading.get_ident()}]{counter} async end")
どうやらasyncを付けた関数の中でasyncっぽいことをしないのが原因なのでしょうかねえ…。
非同期にならない理由(推測)
FastAPIは裏側でstarletteというASGIサーバーを使っていて、backgrouondtaskはこのstarletteの機能をふんだんに使ったものとなっています。
なのでこちらを覗いてみてあげましょう。 そうすると、以下のようにasync/syncによって挙動が変わっています。
class BackgroundTask:
def __init__(
self, func: typing.Callable, *args: typing.Any, **kwargs: typing.Any
) -> None:
self.func = func
self.args = args
self.kwargs = kwargs
self.is_async = asyncio.iscoroutinefunction(func)
async def __call__(self) -> None:
if self.is_async:
await self.func(*self.args, **self.kwargs)
else:
await run_in_threadpool(self.func, *self.args, **self.kwargs)
通常の関数の場合はthredpoolに入れるので明示的に非同期になるようです。 ただ、asyncの場合はただ実行するだけ。ううむ… なんでこれで非同期になるんだ…。
と、いうことでasyncの場合についてもう少し詳しく調査してみます。
③即座にレスポンスが返ってくる例
async def do_sync(counter):
await asyncio.sleep(0.00001)
time.sleep(3)
print(f"{datetime.utcnow()} [{threading.get_ident()}]{counter} sync end")
④3秒後にレスポンスが返ってくる例
async def do_sync(counter):
time.sleep(3)
await asyncio.sleep(0.00001)
print(f"{datetime.utcnow()} [{threading.get_ident()}]{counter} sync end")
眺めていると何となく条件が見えてきます。
- backgrouondtaskと通所の処理が全て完了(っぽい状態)になったらレスポンスを返却する仕組み
- awaitが実行されることをFastAPI(starlette)が監視
- awaitが実行されたタイミングでbackgrouondtaskのステータスを「仮完了」のようなものに変更
という流れなのかなあと推測できます。中々情報が見つからずに厄介でしたがおおよそこんな所でしょう…。
解決策
可能であればFastAPIやStarlette側の対応に期待したところですが、そうも言ってられない方たちも多いかと思います。
そのような場合はこの例のように、
async def do_sync(counter):
await asyncio.sleep(0.00001)
time.sleep(3)
print(f"{datetime.utcnow()} [{threading.get_ident()}]{counter} sync end")
backgrouondtaskに登録する処理の最初で
await asyncio.sleep(0.00001)
のような処理を追加することが簡易的な解決策となります。
バグなのか仕様なのかは分かりませんが、とにかく分かりずらいですね。。
参考
https://github.com/tiangolo/fastapi/issues/1313