db_kyc_project/batcher/app/src/db/pg/pg.py

41 lines
1.2 KiB
Python
Raw Normal View History

2024-12-14 03:08:46 +03:00
import asyncio
2024-12-13 16:45:21 +03:00
from app.src.config import PG_HOST, PG_PORT, PG_USER, PG_PASSWORD, PG_DB
2024-12-12 22:12:00 +03:00
from pathlib import Path
2024-12-13 16:45:21 +03:00
from starlette.requests import Request
2024-12-12 22:12:00 +03:00
import asyncpg
2024-12-14 03:08:46 +03:00
import logging
2024-12-12 22:12:00 +03:00
from asyncpg_trek import plan, execute, Direction
from asyncpg_trek.asyncpg import AsyncpgBackend
# Postgres connection DSN assembled from the application config values.
# The string embeds the password in clear text, so treat it as a secret.
# NOTE(review): the credentials are interpolated without URL-escaping —
# confirm they never contain reserved characters such as '@', ':' or '/'.
DB_URL = f'postgresql://{PG_USER}:{str(PG_PASSWORD)}@{PG_HOST}:{PG_PORT}/{PG_DB}'
2024-12-13 16:45:21 +03:00
# Absolute path of the "migrations" directory that sits next to this module;
# passed to asyncpg_trek's plan() in migrate().
MIGRATIONS_DIR = Path(__file__).parent.resolve() / "migrations"
2024-12-12 22:12:00 +03:00
2024-12-14 03:08:46 +03:00
# Uses the logger named "uvicorn" rather than a module-specific one
# (presumably so messages show up in the server's own log output).
logger = logging.getLogger("uvicorn")
2024-12-12 22:12:00 +03:00
2024-12-13 16:45:21 +03:00
async def connect_pg() -> asyncpg.Pool:
    """Create an asyncpg connection pool, waiting until Postgres is reachable.

    The pool is created from ``DB_URL``. If the attempt fails with an
    ``OSError`` (e.g. connection refused while the database is still
    starting), the coroutine sleeps for two seconds and tries again, with no
    upper bound on the number of attempts. Any other exception propagates to
    the caller.

    Returns:
        The connection pool returned by ``asyncpg.create_pool``.
    """
    while True:
        try:
            # Log only non-secret connection details: DB_URL contains the
            # password and must not end up in the logs.
            logger.info(
                "Connecting to Postgres at %s:%s/%s", PG_HOST, PG_PORT, PG_DB
            )
            return await asyncpg.create_pool(DB_URL)
        except OSError:
            logger.info("Postgres is unavailable - sleeping")
            await asyncio.sleep(2)
2024-12-12 22:12:00 +03:00
2024-12-13 16:45:21 +03:00
async def get_pg(request: Request) -> asyncpg.Connection:
    """Yield a pooled Postgres connection taken from the application state.

    This is an async generator: it acquires a connection from
    ``request.app.state.pg_pool``, yields it to the consumer, and leaves the
    ``async with`` block (handing the connection back to the pool) once the
    generator is resumed or closed.

    Args:
        request: Incoming request whose ``app.state.pg_pool`` holds the pool.

    Yields:
        The connection acquired from the pool.
    """
    pool = request.app.state.pg_pool
    async with pool.acquire() as connection:
        yield connection
async def migrate(
    target_revision: str,
) -> None:
    """Apply upward migrations from ``MIGRATIONS_DIR`` up to a revision.

    A dedicated pool is opened via ``connect_pg()``, one connection is
    acquired from it, and asyncpg_trek plans and executes the migration in
    the ``Direction.up`` direction on that connection.

    Args:
        target_revision: Revision identifier passed to asyncpg_trek's
            ``plan`` as the migration target.
    """
    pool = await connect_pg()
    try:
        async with pool.acquire() as conn:
            backend = AsyncpgBackend(conn)
            planned = await plan(
                backend,
                MIGRATIONS_DIR,
                target_revision=target_revision,
                direction=Direction.up,
            )
            await execute(backend, planned)
    finally:
        # The pool exists only for this migration run; close it so its
        # connections are not leaked, even when planning/execution fails.
        await pool.close()