87 lines
3.2 KiB
Python
87 lines
3.2 KiB
Python
import time
|
|
import hmac
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
from django.conf import settings
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
from django.contrib.auth.models import User
|
|
from rest_framework import authentication, exceptions
|
|
from users.models import TGUser
|
|
|
|
|
|
def validate_referred_by_id(referred_by_id):
|
|
if not referred_by_id:
|
|
return None
|
|
if not referred_by_id.isdigit():
|
|
return None
|
|
referred_by_id = int(referred_by_id)
|
|
return referred_by_id if TGUser.objects.filter(pk=referred_by_id).exists() else None
|
|
|
|
|
|
class TelegramValidationAuthentication(authentication.BaseAuthentication):
|
|
def authenticate(self, request):
|
|
token = request.META.get('HTTP_AUTHORIZATION', '')
|
|
if not token:
|
|
return None, None
|
|
|
|
if not token.startswith('TelegramToken '):
|
|
return None, None
|
|
|
|
token = ' '.join(token.split()[1:])
|
|
|
|
split_res = base64.b64decode(token).decode('utf-8').split(':')
|
|
try:
|
|
data_check_string = ':'.join(split_res[:-1]).strip().replace('/', '\\/')
|
|
hash = split_res[-1]
|
|
except IndexError:
|
|
raise exceptions.AuthenticationFailed('Invalid token format')
|
|
secret = hmac.new(
|
|
'WebAppData'.encode(),
|
|
settings.TG_TOKEN.encode('utf-8'),
|
|
digestmod=hashlib.sha256
|
|
).digest()
|
|
actual_hash = hmac.new(
|
|
secret,
|
|
msg=data_check_string.encode('utf-8'),
|
|
digestmod=hashlib.sha256
|
|
).hexdigest()
|
|
if hash != actual_hash:
|
|
raise exceptions.AuthenticationFailed('Invalid token (hash check failed)')
|
|
|
|
data_dict = dict([x.split('=') for x in data_check_string.split('\n')])
|
|
try:
|
|
auth_date = int(data_dict['auth_date'])
|
|
except KeyError:
|
|
raise exceptions.AuthenticationFailed('Invalid token (auth_date not found)')
|
|
except ValueError:
|
|
raise exceptions.AuthenticationFailed('Invalid token (auth_date is not an int)')
|
|
|
|
if auth_date + 60 * 30 < int(time.time()):
|
|
raise exceptions.AuthenticationFailed('Token expired')
|
|
|
|
user_info = json.loads(data_dict['user'])
|
|
try:
|
|
tg_user = TGUser.objects.get(pk=user_info['id'])
|
|
if tg_user.is_blocked:
|
|
raise exceptions.PermissionDenied('Пользователь заблокирован')
|
|
except ObjectDoesNotExist:
|
|
username = user_info.get('username', f'user-{user_info["id"]}')
|
|
pass_data = f'{username} ({user_info["id"]})'.encode()
|
|
referred_by_id = validate_referred_by_id(request.query_params.get('referred_by', None))
|
|
if (user_qs := User.objects.filter(username=username)).exists():
|
|
user = user_qs.first()
|
|
else:
|
|
user = User.objects.create_user(
|
|
username=username,
|
|
password=hashlib.md5(pass_data).hexdigest()
|
|
)
|
|
tg_user = TGUser.objects.create(
|
|
user=user,
|
|
tg_id=user_info['id'],
|
|
username=username,
|
|
referred_by_id=referred_by_id,
|
|
)
|
|
|
|
return tg_user.user, token
|