diff --git a/.gitignore b/.gitignore index 7617c11..2b0c955 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ .DS_Store __pycache__/ *.py[cod] +celerybeat-schedule +backend/static diff --git a/backend/clicker/settings.py b/backend/clicker/settings.py index 6379648..ca18d57 100644 --- a/backend/clicker/settings.py +++ b/backend/clicker/settings.py @@ -25,7 +25,10 @@ SECRET_KEY = os.getenv('SECRET_KEY', 'django-insecure-4nww@d-th@7^(chggt5q+$e*d_ DEBUG = int(os.getenv('DEBUG', 0)) PROD = 1 - DEBUG -ALLOWED_HOSTS = ['crowngame.ru', 'backend', '127.0.0.1'] +ALLOWED_HOSTS = ['backend', '127.0.0.1'] +if app_url := os.getenv('APP_URL', None): + ALLOWED_HOSTS.append(app_url) + SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'https') USE_X_FORWARDED_HOST = True USE_X_FORWARDED_PORT = True diff --git a/backend/misc/apps.py b/backend/misc/apps.py index 2528402..a9c3da5 100644 --- a/backend/misc/apps.py +++ b/backend/misc/apps.py @@ -6,9 +6,5 @@ class MiscConfig(AppConfig): name = "misc" def ready(self): - from .celery import deliver_setting as deliver_setting_celery from .signals import deliver_setting - from misc.models import Setting - for setting in Setting.objects.all(): - deliver_setting_celery.delay(setting.name) diff --git a/backend/misc/management/__init__.py b/backend/misc/management/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/misc/management/commands/__init__.py b/backend/misc/management/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/misc/management/commands/send_settings.py b/backend/misc/management/commands/send_settings.py new file mode 100644 index 0000000..ab49748 --- /dev/null +++ b/backend/misc/management/commands/send_settings.py @@ -0,0 +1,10 @@ +from django.core.management.base import BaseCommand, CommandError +from misc.celery import deliver_setting +from misc.models import Setting + +class Command(BaseCommand): + help = 'Sends all settings to rmq for batcher to consume' + + def handle(self, *args, **options): + for setting in Setting.objects.all(): + deliver_setting.delay(setting.name) diff --git a/backend/scripts/gunicorn.sh b/backend/scripts/gunicorn.sh index b8a9f9e..a48058a 100644 --- a/backend/scripts/gunicorn.sh +++ b/backend/scripts/gunicorn.sh @@ -5,5 +5,6 @@ set -o pipefail set -o nounset python manage.py migrate +python manage.py send_settings python manage.py collectstatic --noinput --verbosity 0 gunicorn clicker.wsgi -b 0.0.0.0:8000 -w 17 --timeout 600 --chdir=/app --access-logfile - diff --git a/backend/scripts/start.sh b/backend/scripts/start.sh index e8171a5..11001f4 100644 --- a/backend/scripts/start.sh +++ b/backend/scripts/start.sh @@ -6,5 +6,6 @@ set -o nounset set -o xtrace python manage.py migrate +python manage.py send_settings python manage.py collectstatic --noinput --verbosity 0 python manage.py runserver 0.0.0.0:8000 diff --git a/backend/users/authentication.py b/backend/users/authentication.py index 4d426b5..93eded2 100644 --- a/backend/users/authentication.py +++ b/backend/users/authentication.py @@ -33,7 +33,7 @@ class TelegramValidationAuthentication(authentication.BaseAuthentication): split_res = base64.b64decode(token).decode('utf-8').split(':') try: data_check_string = ':'.join(split_res[:-1]).strip().replace('/', '\\/') - hash = split_res[-1] + _hash = split_res[-1] except IndexError: raise exceptions.AuthenticationFailed('Invalid token format') secret = hmac.new( @@ -46,7 +46,7 @@ class TelegramValidationAuthentication(authentication.BaseAuthentication): msg=data_check_string.encode('utf-8'), digestmod=hashlib.sha256 ).hexdigest() - if hash != actual_hash: + if _hash != actual_hash: raise exceptions.AuthenticationFailed('Invalid token (hash check failed)') data_dict = dict([x.split('=') for x in data_check_string.split('\n')]) diff --git a/batcher/Dockerfile b/batcher/Dockerfile index 7432fed..7c5b6e8 100644 --- a/batcher/Dockerfile +++ b/batcher/Dockerfile @@ -8,4 +8,6 @@ RUN pip install --no-cache-dir --upgrade -r /batcher/requirements.txt COPY ./app /batcher/app -CMD ["fastapi", "run", "app/main.py", "--port", "$HTTP_PORT"] \ No newline at end of file +ENV PYTHONPATH="${PYTHONPATH}:/batcher/app" + +CMD uvicorn app.main:app --host 0.0.0.0 --port "${HTTP_PORT}" \ No newline at end of file diff --git a/batcher/Pipfile b/batcher/Pipfile deleted file mode 100644 index 645a67e..0000000 --- a/batcher/Pipfile +++ /dev/null @@ -1,11 +0,0 @@ -[[source]] -url = "https://pypi.org/simple" -verify_ssl = true -name = "pypi" - -[packages] - -[dev-packages] - -[requires] -python_version = "3.12" diff --git a/batcher/app/main.py b/batcher/app/main.py index 9e6f5bd..76f5593 100644 --- a/batcher/app/main.py +++ b/batcher/app/main.py @@ -1,9 +1,13 @@ +import aio_pika from fastapi import Depends, FastAPI, Request, Response from fastapi.middleware.cors import CORSMiddleware +from functools import partial from starlette.exceptions import HTTPException -from .src.routers.api import router as router_api -from .src.routers.handlers import http_error_handler +from app.src.routers.api import router as router_api +from app.src.routers.handlers import http_error_handler +from app.src.domain.setting import launch_consumer +from app.src.db import connect_pg, connect_redis, get_connection, get_channel, get_rmq def get_application() -> FastAPI: @@ -23,5 +27,23 @@ def get_application() -> FastAPI: return application +app = get_application() + +@app.on_event("startup") +async def startup(): + launch_consumer(get_connection) + + app.state.pg_pool = await connect_pg() + + app.state.redis_pool = connect_redis() + + rmq_conn_pool = aio_pika.pool.Pool(get_connection, max_size=2) + rmq_chan_pool = aio_pika.pool.Pool(partial(get_channel, conn_pool=rmq_conn_pool), max_size=10) + app.state.rmq_chan_pool = rmq_chan_pool + + +@app.on_event("shutdown") +async def shutdown(): + await app.state.pg_pool.close() + await app.state.redis.close() -app = get_application() \ No newline at end of file diff --git a/batcher/app/migrate.py b/batcher/app/migrate.py index 7643a71..b97289b 100644 --- a/batcher/app/migrate.py +++ b/batcher/app/migrate.py @@ -1,6 +1,6 @@ import sys import asyncio -from .src.db.pg import migrate +from app.src.db.pg import migrate if __name__ == '__main__': diff --git a/batcher/app/src/config.py b/batcher/app/src/config.py index dce7c4f..bb2ec0d 100644 --- a/batcher/app/src/config.py +++ b/batcher/app/src/config.py @@ -2,7 +2,7 @@ from starlette.config import Config from starlette.datastructures import Secret from functools import lru_cache -config = Config('.env') +config = Config() REDIS_USER = config('REDIS_USER') @@ -11,8 +11,6 @@ REDIS_PORT = config('REDIS_PORT', cast=int) REDIS_HOST = config('REDIS_HOST') REDIS_DB = config('REDIS_DB') -HTTP_PORT = config('HTTP_PORT', cast=int) - PG_HOST = config('POSTGRES_HOST') PG_PORT = config('POSTGRES_PORT', cast=int) PG_USER = config('POSTGRES_USER') @@ -22,7 +20,7 @@ PG_DB = config('POSTGRES_DB') RMQ_HOST = config('RABBITMQ_HOST') RMQ_PORT = config('RABBITMQ_PORT', cast=int) RMQ_USER = config('RABBITMQ_DEFAULT_USER') -RMQ_PASSWORD = config('RABBITMQ_DEFAULT_PASSWORD', cast=Secret) +RMQ_PASSWORD = config('RABBITMQ_DEFAULT_PASS', cast=Secret) TG_TOKEN = config('TG_TOKEN', cast=Secret) diff --git a/batcher/app/src/db/__init__.py b/batcher/app/src/db/__init__.py index ac05ff3..ce5d7cb 100644 --- a/batcher/app/src/db/__init__.py +++ b/batcher/app/src/db/__init__.py @@ -1,3 +1,3 @@ -from .pg import get_pg -from .redis import get_redis -from .rmq import get_rmq \ No newline at end of file +from .pg import get_pg, connect_pg +from .redis import get_redis, connect_redis +from .rmq import get_rmq, get_channel, get_connection \ No newline at end of file diff --git a/batcher/app/src/db/pg/__init__.py b/batcher/app/src/db/pg/__init__.py index 1e976f8..54858c6 100644 --- a/batcher/app/src/db/pg/__init__.py +++ b/batcher/app/src/db/pg/__init__.py @@ -1 +1 @@ -from .pg import get_pg, migrate +from .pg import get_pg, migrate, connect_pg diff --git a/batcher/app/src/db/pg/migrations/20241023_init_down_zero.sql b/batcher/app/src/db/pg/migrations/20241023_first_down_initial.sql similarity index 100% rename from batcher/app/src/db/pg/migrations/20241023_init_down_zero.sql rename to batcher/app/src/db/pg/migrations/20241023_first_down_initial.sql diff --git a/batcher/app/src/db/pg/migrations/20241023_zero_up_init.sql b/batcher/app/src/db/pg/migrations/20241023_initial_up_first.sql similarity index 100% rename from batcher/app/src/db/pg/migrations/20241023_zero_up_init.sql rename to batcher/app/src/db/pg/migrations/20241023_initial_up_first.sql diff --git a/batcher/app/src/db/pg/pg.py b/batcher/app/src/db/pg/pg.py index f998230..757fdc2 100644 --- a/batcher/app/src/db/pg/pg.py +++ b/batcher/app/src/db/pg/pg.py @@ -1,32 +1,29 @@ -from batcher.app.src.config import PG_HOST, PG_PORT, PG_USER, PG_PASSWORD, PG_DB +from app.src.config import PG_HOST, PG_PORT, PG_USER, PG_PASSWORD, PG_DB from pathlib import Path -import asyncio +from starlette.requests import Request import asyncpg from asyncpg_trek import plan, execute, Direction from asyncpg_trek.asyncpg import AsyncpgBackend DB_URL = f'postgresql://{PG_USER}:{str(PG_PASSWORD)}@{PG_HOST}:{PG_PORT}/{PG_DB}' -MIGRATIONS_DIR = Path(__file__) / "migrations" +MIGRATIONS_DIR = Path(__file__).parent.resolve() / "migrations" -async def connect_db() -> asyncpg.Pool: +async def connect_pg() -> asyncpg.Pool: return await asyncpg.create_pool(DB_URL) -pool = asyncio.run(connect_db()) - - -async def get_pg() -> asyncpg.Connection: - async with pool.acquire() as conn: +async def get_pg(request: Request) -> asyncpg.Connection: + async with request.app.state.pg_pool.acquire() as conn: yield conn async def migrate( - target_revision: str, + target_revision: str, ) -> None: + pool = await connect_pg() async with pool.acquire() as conn: backend = AsyncpgBackend(conn) - async with backend.connect() as conn: - planned = await plan(conn, backend, MIGRATIONS_DIR, target_revision=target_revision, direction=Direction.up) - await execute(conn, backend, planned) + planned = await plan(backend, MIGRATIONS_DIR, target_revision=target_revision, direction=Direction.up) + await execute(backend, planned) diff --git a/batcher/app/src/db/redis.py b/batcher/app/src/db/redis.py index 4430e3f..10a0577 100644 --- a/batcher/app/src/db/redis.py +++ b/batcher/app/src/db/redis.py @@ -1,11 +1,14 @@ -import asyncio +from starlette.requests import Request import redis.asyncio as redis from ..config import REDIS_HOST, REDIS_PORT, REDIS_USER, REDIS_PASSWORD, REDIS_DB -r = asyncio.run(redis.Redis(host=REDIS_HOST, port=REDIS_PORT, username=REDIS_USER, password=REDIS_PASSWORD, db=REDIS_DB)) +def connect_redis() -> redis.ConnectionPool: + return redis.ConnectionPool(host=REDIS_HOST, port=REDIS_PORT, username=REDIS_USER, password=str(REDIS_PASSWORD), db=REDIS_DB) -def get_redis() -> redis.Redis: +async def get_redis(request: Request) -> redis.Redis: + r = redis.Redis(connection_pool=request.app.state.redis_pool) yield r + await r.aclose() diff --git a/batcher/app/src/db/rmq.py b/batcher/app/src/db/rmq.py index eac9504..4b86267 100644 --- a/batcher/app/src/db/rmq.py +++ b/batcher/app/src/db/rmq.py @@ -1,26 +1,28 @@ -import aio_pika -from aio_pika.abc import AbstractRobustConnection import asyncio +import aio_pika +from starlette.requests import Request +from aio_pika.abc import AbstractRobustConnection from ..config import RMQ_HOST, RMQ_PORT, RMQ_USER, RMQ_PASSWORD +fqdn = f'amqp://{RMQ_USER}:{str(RMQ_PASSWORD)}@{RMQ_HOST}:{RMQ_PORT}/' + async def get_connection() -> AbstractRobustConnection: - return await aio_pika.connect_robust(f'amqp://{RMQ_USER}:{RMQ_PASSWORD}@{RMQ_HOST}:{RMQ_PORT}/') + while True: + try: + conn = await aio_pika.connect_robust(fqdn) + return conn + except ConnectionError: + await asyncio.sleep(2) -conn_pool = aio_pika.pool.Pool(get_connection, max_size=2) - - -async def get_channel() -> aio_pika.Channel: +async def get_channel(conn_pool: AbstractRobustConnection) -> aio_pika.Channel: async with conn_pool.acquire() as connection: return await connection.channel() -chan_pool = aio_pika.pool.Pool(get_channel, max_size=10) - - -async def get_rmq() -> aio_pika.Channel: - async with chan_pool.acquire() as chan: +async def get_rmq(request: Request) -> aio_pika.Channel: + async with request.app.state.rmq_chan_pool.acquire() as chan: yield chan diff --git a/batcher/app/src/dependencies.py b/batcher/app/src/dependencies.py index faee168..b1356ff 100644 --- a/batcher/app/src/dependencies.py +++ b/batcher/app/src/dependencies.py @@ -25,7 +25,7 @@ async def get_token_header(authorization: str = Header()) -> (int, str): raise HTTPException(status_code=403, detail='Unauthorized') secret = hmac.new( 'WebAppData'.encode(), - TG_TOKEN.encode('utf-8'), + str(TG_TOKEN).encode('utf-8'), digestmod=hashlib.sha256 ).digest() actual_hash = hmac.new( @@ -33,7 +33,7 @@ async def get_token_header(authorization: str = Header()) -> (int, str): msg=data_check_string.encode('utf-8'), digestmod=hashlib.sha256 ).hexdigest() - if hash != actual_hash: + if _hash != actual_hash: raise HTTPException(status_code=403, detail='Unauthorized') data_dict = dict([x.split('=') for x in data_check_string.split('\n')]) diff --git a/batcher/app/src/domain/click/models.py b/batcher/app/src/domain/click/models.py index 300b121..bc2c90c 100644 --- a/batcher/app/src/domain/click/models.py +++ b/batcher/app/src/domain/click/models.py @@ -4,6 +4,6 @@ import pydantic class Click(pydantic.BaseModel): - UserID: int - DateTime: datetime.datetime - Value: decimal.Decimal + userId: int + dateTime: datetime.datetime + value: decimal.Decimal diff --git a/batcher/app/src/domain/click/repos/pg.py b/batcher/app/src/domain/click/repos/pg.py index b1ad83c..0c1ae28 100644 --- a/batcher/app/src/domain/click/repos/pg.py +++ b/batcher/app/src/domain/click/repos/pg.py @@ -33,13 +33,13 @@ async def store(conn: Connection, click: Click) -> int: RETURNING id ; ''' - return await conn.fetchval(query, click.UserID, click.DateTime, click.Value) + return await conn.fetchval(query, click.userId, click.dateTime, click.value) async def bulk_store_copy(conn: Connection, click: Click, count: int) -> None: - args = [(click.UserID, click.DateTime. click.Value) for _ in range(count)] + args = [(click.userId, click.dateTime, click.value) for _ in range(count)] query = ''' - INSERT INTO clicks(user_id, time, values, expiry_info) + INSERT INTO clicks(user_id, time, value, expiry_info) VALUES($1, $2, $3, '{"period_24": false, "period_168": false}') ; ''' diff --git a/batcher/app/src/domain/click/repos/redis.py b/batcher/app/src/domain/click/repos/redis.py index 5a64365..68b67c3 100644 --- a/batcher/app/src/domain/click/repos/redis.py +++ b/batcher/app/src/domain/click/repos/redis.py @@ -5,10 +5,10 @@ import redis.asyncio as redis async def get_period_sum(r: redis.Redis, user_id: int, period: int) -> decimal.Decimal: - sum_str = await r.get(f'period_{period}_user_{user_id}') - if sum_str is None: + sum_bytes = await r.get(f'period_{period}_user_{user_id}') + if sum_bytes is None: return decimal.Decimal(0) - return decimal.Decimal(sum_str) + return decimal.Decimal(sum_bytes.decode()) async def incr_period_sum(r: redis.Redis, user_id: int, _period: int, value: decimal.Decimal) -> decimal.Decimal: @@ -16,10 +16,10 @@ async def incr_period_sum(r: redis.Redis, user_id: int, _period: int, value: dec async def get_max_period_sum(r: redis.Redis, _period: int) -> decimal.Decimal: - max_sum_str = await r.get(f'max_period_{_period}') - if max_sum_str is None: + max_sum_bytes = await r.get(f'max_period_{_period}') + if max_sum_bytes is None: return decimal.Decimal(0) - return decimal.Decimal(max_sum_str) + return decimal.Decimal(max_sum_bytes.decode()) async def compare_max_period_sum(r: redis.Redis, _period: int, _sum: decimal.Decimal) -> None: @@ -64,10 +64,10 @@ async def decr_energy(r: redis.Redis, user_id: int, amount: int) -> (int, int): async def get_global_average(r: redis.Redis) -> decimal.Decimal: - avg_str = await r.get('global_average') - if avg_str is None: + avg_bytes = await r.get('global_average') + if avg_bytes is None: return decimal.Decimal(0) - return decimal.Decimal(avg_str) + return decimal.Decimal(avg_bytes.decode()) async def update_global_average(r: redis.Redis, value_to_add: decimal.Decimal) -> decimal.Decimal: @@ -75,14 +75,14 @@ async def update_global_average(r: redis.Redis, value_to_add: decimal.Decimal) - local delta = tonumber(ARGV[1]) / tonumber(redis.call('GET', KEYS[1])) return redis.call('INCRBYFLOAT', KEYS[2], delta) ''') - return decimal.Decimal(await _script(keys=["user_count", "global_average"], args=[float(value_to_add)])) + return decimal.Decimal((await _script(keys=["user_count", "global_average"], args=[float(value_to_add)])).decode()) async def get_user_total(r: redis.Redis, user_id: int) -> decimal.Decimal: - total_str = await r.get(f'total_{user_id}') - if total_str is None: + total_bytes = await r.get(f'total_{user_id}') + if total_bytes is None: return decimal.Decimal(0) - return decimal.Decimal(total_str) + return decimal.Decimal(total_bytes.decode()) async def incr_user_count_if_no_clicks(r: redis.Redis, user_id: int) -> int: @@ -111,7 +111,10 @@ async def incr_user_total(r: redis.Redis, user_id: int, value: decimal.Decimal) async def get_user_session(r: redis.Redis, user_id: int) -> Optional[str]: - return await r.get(f'session_{user_id}') + session_bytes = await r.get(f'session_{user_id}') + if session_bytes is None: + return None + return session_bytes.decode() async def set_user_session(r: redis.Redis, user_id: int, token: str) -> None: diff --git a/batcher/app/src/domain/click/repos/rmq.py b/batcher/app/src/domain/click/repos/rmq.py index a016a0d..598bc16 100644 --- a/batcher/app/src/domain/click/repos/rmq.py +++ b/batcher/app/src/domain/click/repos/rmq.py @@ -1,6 +1,7 @@ import json import aio_pika import uuid +from datetime import datetime from ..models import Click @@ -9,14 +10,28 @@ CELERY_QUEUE_NAME = "celery" CLICK_TASK_NAME = "clicks.celery.click.handle_click" -def send_click_batch_copy(chan: aio_pika.Channel, click: Click, count: int): +async def send_click_batch_copy(chan: aio_pika.Channel, click: Click, count: int): + args = (click.userId, int(click.dateTime.timestamp() * 1e3), str(click.value), count) await chan.default_exchange.publish( - message=aio_pika.Message(json.dumps({ - 'id': str(uuid.uuid4()), - 'task': CLICK_TASK_NAME, - 'args': [click.UserID, int(click.DateTime.timestamp() * 1e3), str(click.Value), count], - 'kwargs': dict(), - }).encode('utf-8')), + message=aio_pika.Message( + body=json.dumps([ + args, + {}, + {"callbacks": None, "errbacks": None, "chain": None, "chord": None}, + ]).encode('utf-8'), + headers={ + 'task': CLICK_TASK_NAME, + 'lang': 'py', + 'argsrepr': repr(args), + 'kwargsrepr': '{}', + 'id': str(uuid.uuid4()), + 'eta': datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f'), + 'retries': 0, + }, + content_type='application/json', + content_encoding='utf-8', + ), routing_key=CELERY_QUEUE_NAME, mandatory=False, + immediate=False, ) diff --git a/batcher/app/src/domain/click/usecase.py b/batcher/app/src/domain/click/usecase.py index 2158dc8..ae65cf7 100644 --- a/batcher/app/src/domain/click/usecase.py +++ b/batcher/app/src/domain/click/usecase.py @@ -6,7 +6,7 @@ import redis.asyncio as redis import aio_pika import asyncpg -from batcher.app.src.domain.setting import get_setting +from app.src.domain.setting import get_setting from .repos.redis import ( get_period_sum, incr_period_sum, get_max_period_sum, get_user_total, get_global_average, incr_user_count_if_no_clicks, update_global_average, incr_user_total, compare_max_period_sum, @@ -35,17 +35,16 @@ async def add_click_batch_copy(r: redis.Redis, pg: asyncpg.Connection, rmq: aio await compare_max_period_sum(r, period, new_period_sum) click = Click( - UserID=user_id, - DateTime=datetime.now(), - Value=_click_value, - + userId=user_id, + dateTime=datetime.now(), + value=_click_value, ) # insert click await bulk_store_copy(pg, click, count) # send click to backend - send_click_batch_copy(rmq, click, count) + await send_click_batch_copy(rmq, click, count) return click diff --git a/batcher/app/src/domain/setting/repos/rmq.py b/batcher/app/src/domain/setting/repos/rmq.py index 52d9de8..bbdd515 100644 --- a/batcher/app/src/domain/setting/repos/rmq.py +++ b/batcher/app/src/domain/setting/repos/rmq.py @@ -1,15 +1,12 @@ import decimal import json - import aio_pika from typing import Callable SETTING_QUEUE_NAME = "settings" -SETTING_TASK_NAME = "misc.celery.deliver_setting.deliver_setting" - -async def consume_setting_updates(update_setting_func: Callable[[str, decimal.Decimal], None], chan: aio_pika.Channel): +async def consume_setting_updates(set_setting_func: Callable[[str, decimal.Decimal], None], chan: aio_pika.abc.AbstractChannel): queue = await chan.get_queue(SETTING_QUEUE_NAME) async with queue.iterator() as queue_iter: @@ -17,4 +14,4 @@ async def consume_setting_updates(update_setting_func: Callable[[str, decimal.De async with msg.process(): settings = json.loads(msg.body.decode('utf-8')) for name, value in settings.items(): - update_setting_func(name, value) + set_setting_func(name, decimal.Decimal(value)) diff --git a/batcher/app/src/domain/setting/usecase.py b/batcher/app/src/domain/setting/usecase.py index 11865a4..babb8a1 100644 --- a/batcher/app/src/domain/setting/usecase.py +++ b/batcher/app/src/domain/setting/usecase.py @@ -1,16 +1,24 @@ import decimal import threading - +import asyncio +from collections.abc import Callable, Awaitable import aio_pika -from .repos.in_memory_storage import get_setting as ims_get_setting +from .repos.in_memory_storage import set_setting, get_setting as ims_get_setting from .repos.rmq import consume_setting_updates + def get_setting(name: str) -> decimal.Decimal: return ims_get_setting(name) +async def start_thread(rmq_connect_func: Callable[[], Awaitable[aio_pika.abc.AbstractRobustConnection]], *args): + conn = await rmq_connect_func() + async with conn: + chan = await conn.channel() + await consume_setting_updates(set_setting, chan) -def launch_consumer(rmq: aio_pika.Connection): - t = threading.Thread(target=consume_setting_updates, args=(ims_get_setting, rmq)) + +def launch_consumer(rmq_connect_func: Callable[[], Awaitable[aio_pika.abc.AbstractRobustConnection]]): + t = threading.Thread(target=asyncio.run, args=(start_thread(rmq_connect_func),)) t.start() diff --git a/batcher/app/src/routers/click.py b/batcher/app/src/routers/click.py index 65a2412..9d66857 100644 --- a/batcher/app/src/routers/click.py +++ b/batcher/app/src/routers/click.py @@ -2,7 +2,7 @@ import aio_pika import asyncpg import redis from fastapi import APIRouter, Depends, HTTPException -from typing import Annotated +from typing import Annotated, Tuple from ..domain.click import ( ClickResponse, BatchClickRequest, EnergyResponse, ClickValueResponse, add_click_batch_copy, check_registration, check_energy, get_energy, click_value, delete_user_info @@ -22,9 +22,9 @@ router = APIRouter( @router.post("/batch-click/", response_model=ClickResponse, status_code=200) -async def batch_click(req: BatchClickRequest, auth_info: Annotated[(int, str), Depends(get_token_header)], pg: Annotated[asyncpg.Connection, Depends(get_pg)], r: Annotated[redis.Redis, Depends(get_redis)], rmq: Annotated[aio_pika.Channel, Depends(get_rmq)]): +async def batch_click(req: BatchClickRequest, auth_info: Annotated[Tuple[int, str], Depends(get_token_header)], pg: Annotated[asyncpg.Connection, Depends(get_pg)], r: Annotated[redis.Redis, Depends(get_redis)], rmq: Annotated[aio_pika.Channel, Depends(get_rmq)]): user_id, token = auth_info - if not check_registration(r, user_id, token, BACKEND_URL): + if not await check_registration(r, user_id, token, BACKEND_URL): raise HTTPException(status_code=403, detail='Unauthorized') _energy, spent = await check_energy(r, user_id, req.count, token) @@ -39,9 +39,9 @@ async def batch_click(req: BatchClickRequest, auth_info: Annotated[(int, str), D @router.get("/energy", response_model=EnergyResponse, status_code=200) -async def energy(auth_info: Annotated[(int, str), Depends(get_token_header)], r: Annotated[redis.Redis, Depends(get_redis)]): +async def energy(auth_info: Annotated[Tuple[int, str], Depends(get_token_header)], r: Annotated[redis.Redis, Depends(get_redis)]): user_id, token = auth_info - if not check_registration(r, user_id, token, BACKEND_URL): + if not await check_registration(r, user_id, token, BACKEND_URL): raise HTTPException(status_code=403, detail='Unauthorized') _energy = await get_energy(r, user_id, token) @@ -51,9 +51,9 @@ async def energy(auth_info: Annotated[(int, str), Depends(get_token_header)], r: @router.get('/coefficient', response_model=ClickValueResponse, status_code=200) -async def coefficient(auth_info: Annotated[(int, str), Depends(get_token_header)], r: Annotated[redis.Redis, Depends(get_redis)], pg: Annotated[asyncpg.Connection, Depends(get_pg)]): +async def coefficient(auth_info: Annotated[Tuple[int, str], Depends(get_token_header)], r: Annotated[redis.Redis, Depends(get_redis)], pg: Annotated[asyncpg.Connection, Depends(get_pg)]): user_id, token = auth_info - if not check_registration(r, user_id, token, BACKEND_URL): + if not await check_registration(r, user_id, token, BACKEND_URL): raise HTTPException(status_code=403, detail='Unauthorized') value = await click_value(r, pg, user_id) @@ -63,9 +63,9 @@ async def coefficient(auth_info: Annotated[(int, str), Depends(get_token_header) @router.delete('/internal/user', status_code=204) -async def delete_user(auth_info: Annotated[(int, str), Depends(get_token_header())], r: Annotated[redis.Redis, Depends(get_redis)], pg: Annotated[asyncpg.Connection, Depends(get_pg)]): +async def delete_user(auth_info: Annotated[Tuple[int, str], Depends(get_token_header)], r: Annotated[redis.Redis, Depends(get_redis)], pg: Annotated[asyncpg.Connection, Depends(get_pg)]): user_id, token = auth_info - if not check_registration(r, user_id, token, BACKEND_URL): + if not await check_registration(r, user_id, token, BACKEND_URL): raise HTTPException(status_code=403, detail='Unauthorized') await delete_user_info(r, pg, user_id) \ No newline at end of file diff --git a/batcher/migrate.sh b/batcher/migrate.sh old mode 100644 new mode 100755 diff --git a/batcher/requirements.txt b/batcher/requirements.txt index 84debec..9849d19 100644 --- a/batcher/requirements.txt +++ b/batcher/requirements.txt @@ -27,5 +27,6 @@ starlette==0.40.0 typing_extensions==4.12.2 tzdata==2024.2 urllib3==2.2.3 +uvicorn==0.32.1 vine==5.1.0 yarl==1.15.5 diff --git a/bot/.DS_Store b/bot/.DS_Store deleted file mode 100644 index c7b068c..0000000 Binary files a/bot/.DS_Store and /dev/null differ diff --git a/bot/__pycache__/create_bot.cpython-312.pyc b/bot/__pycache__/create_bot.cpython-312.pyc deleted file mode 100644 index 54b70b4..0000000 Binary files a/bot/__pycache__/create_bot.cpython-312.pyc and /dev/null differ diff --git a/bot/__pycache__/memcached_def.cpython-312.pyc b/bot/__pycache__/memcached_def.cpython-312.pyc deleted file mode 100644 index 7f834b7..0000000 Binary files a/bot/__pycache__/memcached_def.cpython-312.pyc and /dev/null differ diff --git a/bot/__pycache__/messages.cpython-312.pyc b/bot/__pycache__/messages.cpython-312.pyc deleted file mode 100644 index 3ff6d4d..0000000 Binary files a/bot/__pycache__/messages.cpython-312.pyc and /dev/null differ diff --git a/bot/__pycache__/req.cpython-312.pyc b/bot/__pycache__/req.cpython-312.pyc deleted file mode 100644 index 767513f..0000000 Binary files a/bot/__pycache__/req.cpython-312.pyc and /dev/null differ diff --git a/bot/__pycache__/wrapper.cpython-312.pyc b/bot/__pycache__/wrapper.cpython-312.pyc deleted file mode 100644 index ee3a9b6..0000000 Binary files a/bot/__pycache__/wrapper.cpython-312.pyc and /dev/null differ diff --git a/bot/create_bot.py b/bot/create_bot.py index a17ec01..a0f79fd 100644 --- a/bot/create_bot.py +++ b/bot/create_bot.py @@ -5,7 +5,7 @@ token = os.getenv('TG_TOKEN', '7748003961:AAEIXu8NFICPabNaQP5JQ3AcY79nZdUbKdI') api_token = os.getenv('API_TOKEN', 'b43fa8ccea5b6dd5e889a8ad3890ce14ce36a8bc') # TODO: remove backend_url = os.getenv('BACKEND_URL', 'http://backend:8000') request_url = f'{backend_url}/api' -url = os.getenv('URL', 'https://google.com') +url = os.getenv('APP_URL', 'https://google.com') bot_name = os.getenv('BOT_NAME', 'https://t.me/danyadjan_test_bot') bucket_name = 'brawny-basket' diff --git a/bot/handlers/__pycache__/__init__.cpython-312.pyc b/bot/handlers/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index f349995..0000000 Binary files a/bot/handlers/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git a/bot/handlers/__pycache__/instruction.cpython-312.pyc b/bot/handlers/__pycache__/instruction.cpython-312.pyc deleted file mode 100644 index 3970864..0000000 Binary files a/bot/handlers/__pycache__/instruction.cpython-312.pyc and /dev/null differ diff --git a/bot/handlers/__pycache__/register_handlers.cpython-312.pyc b/bot/handlers/__pycache__/register_handlers.cpython-312.pyc deleted file mode 100644 index efb8663..0000000 Binary files a/bot/handlers/__pycache__/register_handlers.cpython-312.pyc and /dev/null differ diff --git a/bot/handlers/__pycache__/start_handler.cpython-312.pyc b/bot/handlers/__pycache__/start_handler.cpython-312.pyc deleted file mode 100644 index 88a2c46..0000000 Binary files a/bot/handlers/__pycache__/start_handler.cpython-312.pyc and /dev/null differ diff --git a/bot/pictures/.DS_Store b/bot/pictures/.DS_Store deleted file mode 100644 index 5008ddf..0000000 Binary files a/bot/pictures/.DS_Store and /dev/null differ diff --git a/docker-compose-prod.yml b/docker-compose-prod.yml index a0f36bf..be01d25 100644 --- a/docker-compose-prod.yml +++ b/docker-compose-prod.yml @@ -1,5 +1,3 @@ -version: '3.9' - volumes: db_data: {} batcher_db_data: {} diff --git a/docker-compose.yml b/docker-compose.yml index 22561f0..fbf7993 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: '3.9' - volumes: db_data: {} batcher_db_data: {} diff --git a/frontend/.DS_Store b/frontend/.DS_Store deleted file mode 100644 index 9a9ce6d..0000000 Binary files a/frontend/.DS_Store and /dev/null differ diff --git a/frontend/public/.DS_Store b/frontend/public/.DS_Store deleted file mode 100644 index 0331f6a..0000000 Binary files a/frontend/public/.DS_Store and /dev/null differ diff --git a/frontend/public/assets/.DS_Store b/frontend/public/assets/.DS_Store deleted file mode 100644 index 5008ddf..0000000 Binary files a/frontend/public/assets/.DS_Store and /dev/null differ diff --git a/frontend/src/.DS_Store b/frontend/src/.DS_Store deleted file mode 100644 index 18ab8a0..0000000 Binary files a/frontend/src/.DS_Store and /dev/null differ diff --git a/frontend/src/assets/.DS_Store b/frontend/src/assets/.DS_Store deleted file mode 100644 index 5008ddf..0000000 Binary files a/frontend/src/assets/.DS_Store and /dev/null differ diff --git a/frontend/src/shared/Pages/StoragePage/StoragePage.tsx b/frontend/src/shared/Pages/StoragePage/StoragePage.tsx index e4c1d11..ae0b3ae 100644 --- a/frontend/src/shared/Pages/StoragePage/StoragePage.tsx +++ b/frontend/src/shared/Pages/StoragePage/StoragePage.tsx @@ -14,7 +14,7 @@ import { useNavigate } from 'react-router-dom'; export function StoragePage() { const userId = useAppSelector(state => state.userTg.id); const [page, setPage] = useState('storage'); - const refLink = `https://t.me/sapphirecrown_bot?start=user_${userId}`; + const refLink = `https://t.me/kyc_clicker_bot?start=user_${userId}`; const [showNotif, setShow] = useState(false); const navigate = useNavigate(); diff --git a/frontend/src/shared/Pages/WrongSourcePage/WrongSourcePage.tsx b/frontend/src/shared/Pages/WrongSourcePage/WrongSourcePage.tsx index 2f6737d..4842093 100644 --- a/frontend/src/shared/Pages/WrongSourcePage/WrongSourcePage.tsx +++ b/frontend/src/shared/Pages/WrongSourcePage/WrongSourcePage.tsx @@ -11,7 +11,7 @@ export function WrongSourcePage() {

Похоже вы вошли не по той ссылке...

-
); diff --git a/nginx/nginx.conf b/nginx/nginx.conf index f3a20bb..0bac9ec 100644 --- a/nginx/nginx.conf +++ b/nginx/nginx.conf @@ -42,32 +42,32 @@ http { access_log /var/log/nginx/access.log upstreamlog; error_log /var/log/nginx/error.log; listen 80; - listen 443 ssl http2; +; listen 443 ssl http2; charset utf-8; - server_name crowngame.ru www.crowngame.ru; +; server_name kyc_clicker.ru www.kyc_clicker.ru; root /dist/; index index.html; - ssl_certificate /etc/letsencrypt/live/crowngame.ru/fullchain.pem; - ssl_certificate_key /etc/letsencrypt/live/crowngame.ru/privkey.pem; +; ssl_certificate /etc/letsencrypt/live/kyc_clicker.ru/fullchain.pem; +; ssl_certificate_key /etc/letsencrypt/live/kyc_clicker.ru/privkey.pem; - include /etc/letsencrypt/options-ssl-nginx.conf; - ssl_dhparam /etc/letsencrypt/ssl-dhparams.pem; +; include /etc/letsencrypt/options-ssl-nginx.conf; +; ssl_dhparam /etc/letsencrypt/ssl-dhparams.pem; - if ($server_port = 80) { - set $https_redirect 1; - } - if ($host ~ '^www\.') { - set $https_redirect 1; - } - if ($https_redirect = 1) { - return 301 https://crowngame.ru$request_uri; - } +; if ($server_port = 80) { +; set $https_redirect 1; +; } +; if ($host ~ '^www\.') { +; set $https_redirect 1; +; } +; if ($https_redirect = 1) { +; return 301 https://crowngame.ru$request_uri; +; } - location /.well-known/acme-challenge/ { - root /var/www/certbot; - } +; location /.well-known/acme-challenge/ { +; root /var/www/certbot; +; } # frontend location / {