Further PEP 8 tidying of server

Continuation of 9514740fc9.
This commit is contained in:
Spotlight 2024-09-23 12:02:14 -05:00
parent 4c1d0db359
commit b18e3650e8
No known key found for this signature in database
GPG Key ID: 874AA355B3209BDC

View File

@ -9,7 +9,6 @@ from sqlalchemy import select, update, insert, delete
from database import start_db_time, get_db_url, Friend, DiscordFriends, Discord, Config from database import start_db_time, get_db_url, Friend, DiscordFriends, Discord, Config
sys.path.append('../') sys.path.append('../')
from api import *
from api.love2 import * from api.love2 import *
from api.private import CLIENT_ID, CLIENT_SECRET, HOST from api.private import CLIENT_ID, CLIENT_SECRET, HOST
from api.public import pretendoBotFC, nintendoBotFC from api.public import pretendoBotFC, nintendoBotFC
@ -22,9 +21,9 @@ app.config["SQLALCHEMY_DATABASE_URI"] = get_db_url()
db = SQLAlchemy() db = SQLAlchemy()
db.init_app(app) db.init_app(app)
limiter = Limiter(app, key_func = lambda : request.access_route[-1]) limiter = Limiter(app, key_func=lambda: request.access_route[-1])
API_ENDPOINT:str = 'https://discord.com/api/v10' API_ENDPOINT: str = 'https://discord.com/api/v10'
local = False local = False
port = 2277 port = 2277
@ -35,65 +34,73 @@ frontend_uptime = datetime.datetime.now()
start_db_time(None, NetworkType.NINTENDO) start_db_time(None, NetworkType.NINTENDO)
start_db_time(None, NetworkType.PRETENDO) start_db_time(None, NetworkType.PRETENDO)
@app.errorhandler(404) @app.errorhandler(404)
def handler404(e): def handler404(e):
return render_template('dist/404.html') return render_template('dist/404.html')
disableBackendWarnings = False
disable_backend_warnings = False
try: try:
if sys.argv[1] == 'ignoreBackend' and local: if sys.argv[1] == 'ignoreBackend' and local:
disableBackendWarnings = True disable_backend_warnings = True
except:pass except:
pass
if local: if local:
HOST = 'http://localhost:2277' HOST = 'http://localhost:2277'
# Limiter limits # Limiter limits
userPresenceLimit = '3/minute' user_presence_limit = '3/minute'
newUserLimit = '2/minute' new_user_limit = '2/minute'
cdnLimit = '60/minute' cdn_limit = '60/minute'
togglerLimit = '5/minute' toggler_limit = '5/minute'
# Database files # Database files
titleDatabase = [] title_database = []
titlesToUID = [] titles_to_uid = []
requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
# Create title cache # Create title cache
def cacheTitles(): def cache_titles():
global titleDatabase, titlesToUID global title_database, titles_to_uid
# Pull databases # Pull databases
databasePath = './cache/' database_path = './cache/'
if not os.path.exists(databasePath): if not os.path.exists(database_path):
os.mkdir(databasePath) os.mkdir(database_path)
databasePath = os.path.join(databasePath, 'databases.dat') database_path = os.path.join(database_path, 'databases.dat')
if os.path.isfile(databasePath): if os.path.isfile(database_path):
with open(databasePath, 'rb') as file: with open(database_path, 'rb') as file:
t = pickle.loads(file.read()) t = pickle.loads(file.read())
titleDatabase = t[0] title_database = t[0]
titlesToUID = t[1] titles_to_uid = t[1]
else: else:
titleDatabase = [] title_database = []
titlesToUID = [] titles_to_uid = []
bar = ProgressBar() # Create progress bar # Create progress bar
bar = ProgressBar()
for region in ['US', 'JP', 'GB', 'KR', 'TW']: for region in ['US', 'JP', 'GB', 'KR', 'TW']:
titleDatabase.append( title_database.append(
xmltodict.parse(requests.get('https://samurai.ctr.shop.nintendo.net/samurai/ws/%s/titles?shop_id=1&limit=5000&offset=0' % region, verify = False).text) xmltodict.parse(requests.get('https://samurai.ctr.shop.nintendo.net/samurai/ws/%s/titles?shop_id=1&limit=5000&offset=0' % region, verify = False).text)
) )
bar.update(.5 / 5) # Update progress bar
titlesToUID += requests.get('https://raw.githubusercontent.com/hax0kartik/3dsdb/master/jsons/list_%s.json' % region).json() # Update progress bar as database requests complete
bar.update(.5 / 5) # Update progress bar bar.update(.5 / 5)
titles_to_uid += requests.get('https://raw.githubusercontent.com/hax0kartik/3dsdb/master/jsons/list_%s.json' % region).json()
bar.update(.5 / 5)
bar.end() # End the progress bar bar.end() # End the progress bar
# Save databases to file # Save databases to file
with open(databasePath, 'wb') as file: with open(database_path, 'wb') as file:
file.write(pickle.dumps( file.write(pickle.dumps(
(titleDatabase, (title_database,
titlesToUID) titles_to_uid)
)) ))
print('[Saved database to file]') print('[Saved database to file]')
@ -139,7 +146,7 @@ def create_user(friend_code: int, network: NetworkType, add_new_instance: bool):
db.session.commit() db.session.commit()
def fetchBearerToken(code:str): def fetch_bearer_token(code: str):
data = { data = {
'client_id': '%s' % CLIENT_ID, 'client_id': '%s' % CLIENT_ID,
'client_secret': '%s' % CLIENT_SECRET, 'client_secret': '%s' % CLIENT_SECRET,
@ -150,13 +157,13 @@ def fetchBearerToken(code:str):
headers = { headers = {
'Content-Type': 'application/x-www-form-urlencoded', 'Content-Type': 'application/x-www-form-urlencoded',
} }
r = requests.post('%s/oauth2/token' % API_ENDPOINT, data = data, headers = headers) r = requests.post('%s/oauth2/token' % API_ENDPOINT, data=data, headers=headers)
r.raise_for_status() r.raise_for_status()
return r.json() return r.json()
def refreshBearer(token:str): def refresh_bearer(token: str):
user = userFromToken(token) user = user_from_token(token)
data = { data = {
'client_id': '%s' % CLIENT_ID, 'client_id': '%s' % CLIENT_ID,
'client_secret': '%s' % CLIENT_SECRET, 'client_secret': '%s' % CLIENT_SECRET,
@ -166,19 +173,19 @@ def refreshBearer(token:str):
headers = { headers = {
'Content-Type': 'application/x-www-form-urlencoded', 'Content-Type': 'application/x-www-form-urlencoded',
} }
r = requests.post('%s/oauth2/token' % API_ENDPOINT, data = data, headers = headers) r = requests.post('%s/oauth2/token' % API_ENDPOINT, data=data, headers=headers)
r.raise_for_status() r.raise_for_status()
token, user, pfp = createDiscordUser('', r.json()) token, user, pfp = create_discord_user('', r.json())
return token, user, pfp return token, user, pfp
def tokenFromID(ID:int) -> str: def token_from_id(discord_id: int) -> str:
stmt = select(Discord).where(Discord.id == ID) stmt = select(Discord).where(Discord.id == discord_id)
result = db.session.scalar(stmt) result = db.session.scalar(stmt)
return result.site_session_token return result.site_session_token
def userFromToken(token: str) -> Discord: def user_from_token(token: str) -> Discord:
stmt = select(Discord).where(Discord.site_session_token == token) stmt = select(Discord).where(Discord.site_session_token == token)
result = db.session.scalar(stmt) result = db.session.scalar(stmt)
if not result: if not result:
@ -186,13 +193,13 @@ def userFromToken(token: str) -> Discord:
return result return result
def createDiscordUser(code:str, response:dict = None): def create_discord_user(code: str, response: dict = None):
if not response: if not response:
response = fetchBearerToken(code) response = fetch_bearer_token(code)
headers = { headers = {
'Authorization': 'Bearer %s' % response['access_token'], 'Authorization': 'Bearer %s' % response['access_token'],
} }
new = requests.get('https://discord.com/api/users/@me', headers = headers) new = requests.get('https://discord.com/api/users/@me', headers=headers)
user = new.json() user = new.json()
token = secrets.token_hex(20) token = secrets.token_hex(20)
try: try:
@ -200,7 +207,7 @@ def createDiscordUser(code:str, response:dict = None):
select(Discord) select(Discord)
.where(Discord.id == user['id']) .where(Discord.id == user['id'])
) )
if already_exist_check != None: if already_exist_check:
raise Exception('UNIQUE constraint failed: discord.id') raise Exception('UNIQUE constraint failed: discord.id')
db.session.add(Discord( db.session.add(Discord(
id=user['id'], id=user['id'],
@ -214,29 +221,39 @@ def createDiscordUser(code:str, response:dict = None):
db.session.commit() db.session.commit()
except Exception as e: except Exception as e:
if 'UNIQUE constraint failed' in str(e): if 'UNIQUE constraint failed' in str(e):
old_token = tokenFromID(user['id']) old_token = token_from_id(user['id'])
discord_user = userFromToken(old_token) discord_user = user_from_token(old_token)
discord_user.refresh_token = response['refresh_token'] discord_user.refresh_token = response['refresh_token']
discord_user.bearer_token = response['access_token'] discord_user.bearer_token = response['access_token']
discord_user.generation_date = time.time() discord_user.generation_date = time.time()
discord_user.site_session_token = token discord_user.site_session_token = token
db.session.commit() db.session.commit()
return token, user['username'], ('https://cdn.discordapp.com/avatars/%s/%s.%s' % (user['id'], user['avatar'], 'gif' if user['avatar'].startswith('a_') else 'png') if user['avatar'] else '')
if user['avatar']:
if user['avatar'].startswith('a_'):
avatar_extension = 'gif'
else:
avatar_extension = 'png'
avatar_url = 'https://cdn.discordapp.com/avatars/%s/%s.%s' % (user['id'], user['avatar'], avatar_extension)
else:
avatar_url = ''
return token, user['username'], avatar_url
def deleteDiscordUser(ID:int): def delete_discord_user(discord_id: int):
db.session.delete(db.session.get(Discord, ID)) db.session.delete(db.session.get(Discord, discord_id))
db.session.delete(db.session.get(DiscordFriends, ID)) db.session.delete(db.session.get(DiscordFriends, discord_id))
db.session.commit() db.session.commit()
def getConnectedConsoles(ID:int): def get_connected_consoles(discord_id: int):
stmt = select(DiscordFriends).where(DiscordFriends.id == ID) stmt = select(DiscordFriends).where(DiscordFriends.id == discord_id)
result = db.session.scalars(stmt).all() result = db.session.scalars(stmt).all()
return [ (result.friend_code, result.active, result.network) for result in result ] return [(result.friend_code, result.active, result.network) for result in result]
def sidenav(): def sidenav():
@ -273,10 +290,10 @@ def sidenav():
return data return data
def userAgentCheck(): def user_agent_check():
userAgent = request.headers['User-Agent'] user_agent = request.headers['User-Agent']
try: try:
if float(userAgent.replace(agent, '')) != version: if float(user_agent.replace(agent, '')) != version:
raise Exception('client is not v%s' % version) raise Exception('client is not v%s' % version)
except: except:
raise Exception('this client is invalid') raise Exception('this client is invalid')
@ -286,14 +303,14 @@ def get_presence(friend_code: int, network: NetworkType, is_api: bool):
try: try:
if is_api: if is_api:
# First, run 3DS-RPC client checks. # First, run 3DS-RPC client checks.
userAgentCheck() user_agent_check()
# Create a user for this friend code, or update its last access date. # Create a user for this friend code, or update its last access date.
# TODO(spotlightishere): This should be restructured! # TODO(spotlightishere): This should be restructured!
create_user(friend_code, network, False) create_user(friend_code, network, False)
network_start_time = db.session.get(Config, network).backend_uptime network_start_time = db.session.get(Config, network).backend_uptime
if network_start_time is None and not disableBackendWarnings: if network_start_time is None and not disable_backend_warnings:
raise Exception('Backend currently offline. please try again later') raise Exception('Backend currently offline. please try again later')
friend_code = str(friend_code).zfill(12) friend_code = str(friend_code).zfill(12)
@ -313,7 +330,7 @@ def get_presence(friend_code: int, network: NetworkType, is_api: bool):
'updateID': result.upd_id, 'updateID': result.upd_id,
'joinable': result.joinable, 'joinable': result.joinable,
'gameDescription': result.game_description, 'gameDescription': result.game_description,
'game': getTitle(result.title_id, titlesToUID, titleDatabase), 'game': getTitle(result.title_id, titles_to_uid, title_database),
'disclaimer': 'all information regarding the title (User/Presence/game) is downloaded from Nintendo APIs', 'disclaimer': 'all information regarding the title (User/Presence/game) is downloaded from Nintendo APIs',
} }
else: else:
@ -348,6 +365,7 @@ def get_presence(friend_code: int, network: NetworkType, is_api: bool):
# NON-API ROUTES # # NON-API ROUTES #
################## ##################
# Index page # Index page
@app.route('/') @app.route('/')
def index(): def index():
@ -360,14 +378,14 @@ def index():
results = db.session.scalars(stmt).all() results = db.session.scalars(stmt).all()
num = len(results) num = len(results)
data = sidenav() data = sidenav()
data['active'] = [ ({ data['active'] = [({
'mii':MiiData().mii_studio_url(user.mii), 'mii': MiiData().mii_studio_url(user.mii),
'username':user.username, 'username': user.username,
'game': getTitle(user.title_id, titlesToUID, titleDatabase), 'game': getTitle(user.title_id, titles_to_uid, title_database),
'friendCode': user.friend_code.zfill(12), 'friendCode': user.friend_code.zfill(12),
'joinable': user.joinable, 'joinable': user.joinable,
'network': user.network.lower_name(), 'network': user.network.lower_name(),
}) for user in results if user.username ] }) for user in results if user.username]
data['active'] = data['active'][:2] data['active'] = data['active'][:2]
stmt = ( stmt = (
@ -378,25 +396,25 @@ def index():
) )
results = db.session.scalars(stmt).all() results = db.session.scalars(stmt).all()
data['new'] = [ ({ data['new'] = [({
'mii':MiiData().mii_studio_url(user.mii), 'mii': MiiData().mii_studio_url(user.mii),
'username': user.username, 'username': user.username,
'game': getTitle(user.title_id, titlesToUID, titleDatabase) if user.online and user.title_id != 0 else '', 'game': getTitle(user.title_id, titles_to_uid, title_database) if user.online and user.title_id != 0 else '',
'friendCode': user.friend_code.zfill(12), 'friendCode': user.friend_code.zfill(12),
'joinable': user.joinable, 'joinable': user.joinable,
'network': user.network.lower_name(), 'network': user.network.lower_name(),
}) for user in results if user.username ] }) for user in results if user.username]
data['new'] = data['new'][:2] data['new'] = data['new'][:2]
data['num'] = num data['num'] = num
response = make_response(render_template('dist/index.html', data = data)) response = make_response(render_template('dist/index.html', data=data))
return response return response
# Index page # Index page
@app.route('/index.html') @app.route('/index.html')
def indexHTML(): def index_html():
return index() return index()
@ -416,26 +434,29 @@ def settings():
} }
data = sidenav() data = sidenav()
try: try:
stmt = select(Discord).where(Discord.site_session_token == request.cookies['token']) stmt = (
select(Discord)
.where(Discord.site_session_token == request.cookies['token'])
)
result = db.session.scalar(stmt) result = db.session.scalar(stmt)
except Exception as e: except Exception as e:
if 'invalid token' in str(e): if 'invalid token' in str(e):
response = make_response(redirect('/')) response = make_response(redirect('/'))
response.set_cookie('token', '', expires = 0) response.set_cookie('token', '', expires=0)
response.set_cookie('user', '', expires = 0) response.set_cookie('user', '', expires=0)
response.set_cookie('pfp', '', expires = 0) response.set_cookie('pfp', '', expires=0)
return response return response
return redirect('/') return redirect('/')
data['profileButton'] = result.show_profile_button data['profileButton'] = result.show_profile_button
data['smallImage'] = result.show_small_image data['smallImage'] = result.show_small_image
response = make_response(render_template('dist/settings.html', data = data)) response = make_response(render_template('dist/settings.html', data=data))
return response return response
@app.route('/settings.html') @app.route('/settings.html')
def settingsRedirect(): def settings_redirect():
return redirect('/settings') return redirect('/settings')
@ -453,18 +474,19 @@ def roster():
data = sidenav() data = sidenav()
data['title'] = 'New Users' data['title'] = 'New Users'
data['users'] = [ ({ data['users'] = [({
'mii':MiiData().mii_studio_url(user.mii), 'mii': MiiData().mii_studio_url(user.mii),
'username':user.username, 'username': user.username,
'game': getTitle(user.title_id, titlesToUID, titleDatabase), 'game': getTitle(user.title_id, titles_to_uid, title_database),
'friendCode': user.friend_code.zfill(12), 'friendCode': user.friend_code.zfill(12),
'joinable': user.joinable, 'joinable': user.joinable,
'network': user.network.lower_name(), 'network': user.network.lower_name(),
}) for user in results if user.username ] }) for user in results if user.username]
response = make_response(render_template('dist/users.html', data = data)) response = make_response(render_template('dist/users.html', data=data))
return response return response
# Active page # Active page
@app.route('/active') @app.route('/active')
def active(): def active():
@ -479,22 +501,22 @@ def active():
data = sidenav() data = sidenav()
data['title'] = 'Active Users' data['title'] = 'Active Users'
data['users'] = [ ({ data['users'] = [({
'mii':MiiData().mii_studio_url(user.mii), 'mii': MiiData().mii_studio_url(user.mii),
'username':user.username, 'username': user.username,
'game': getTitle(user.title_id, titlesToUID, titleDatabase), 'game': getTitle(user.title_id, titles_to_uid, title_database),
'friendCode': user.friend_code.zfill(12), 'friendCode': user.friend_code.zfill(12),
'joinable': user.joinable, 'joinable': user.joinable,
'network': user.network.lower_name(), 'network': user.network.lower_name(),
}) for user in results if user.username ] }) for user in results if user.username]
response = make_response(render_template('dist/users.html', data = data)) response = make_response(render_template('dist/users.html', data=data))
return response return response
# Register page # Register page
@app.route('/register.html') @app.route('/register.html')
def register(): def register():
network = request.args.get('network') network = request.args.get('network')
if network is None: if network is None:
return make_response(render_template('dist/registerselectnetwork.html')) return make_response(render_template('dist/registerselectnetwork.html'))
@ -509,25 +531,30 @@ def register():
except: except:
return make_response(render_template('dist/registerselectnetwork.html')) return make_response(render_template('dist/registerselectnetwork.html'))
# Register page redirect # Register page redirect
@app.route('/register') @app.route('/register')
def registerPage(): def register_page():
return register() return register()
# Connection page # Connection page
@app.route('/connect') @app.route('/connect')
def connect(): def connect():
return render_template('dist/connect.html', data = {'local':local}) return render_template('dist/connect.html', data={'local': local})
@app.route('/discord') @app.route('/discord')
def discordConnect(): def discord_connect():
return redirect('/connect') return redirect('/connect')
# Failure page # Failure page
@app.route('/failure.html') @app.route('/failure.html')
def failure(): def failure():
return render_template('dist/failure.html') return render_template('dist/failure.html')
# Success page # Success page
@app.route('/success.html') @app.route('/success.html')
def success(): def success():
@ -536,7 +563,8 @@ def success():
'fc': request.args.get('fc'), 'fc': request.args.get('fc'),
'network': request.args.get('network') 'network': request.args.get('network')
} }
return render_template('dist/success.html', data = data) return render_template('dist/success.html', data=data)
# Consoles page # Consoles page
@app.route('/consoles') @app.route('/consoles')
@ -547,16 +575,16 @@ def consoles():
'consoles': [], 'consoles': [],
} }
try: try:
id = userFromToken(request.cookies['token']).id discord_id = user_from_token(request.cookies['token']).discord_id
except Exception as e: except Exception as e:
if 'invalid token' in str(e): if 'invalid token' in str(e):
response = make_response(redirect('/')) response = make_response(redirect('/'))
response.set_cookie('token', '', expires = 0) response.set_cookie('token', '', expires=0)
response.set_cookie('user', '', expires = 0) response.set_cookie('user', '', expires=0)
response.set_cookie('pfp', '', expires = 0) response.set_cookie('pfp', '', expires=0)
return response return response
return redirect('/') return redirect('/')
for console, active, network_type in getConnectedConsoles(id): for console, active, network_type in get_connected_consoles(discord_id):
network = NetworkType(network_type) network = NetworkType(network_type)
stmt = ( stmt = (
select(Friend) select(Friend)
@ -571,44 +599,47 @@ def consoles():
'network': network.lower_name() 'network': network.lower_name()
}) })
data.update(sidenav()) data.update(sidenav())
response = render_template('dist/consoles.html', data = data) response = render_template('dist/consoles.html', data=data)
return response return response
@app.route('/user/<string:friendCode>/')
def userPage(friendCode:str): @app.route('/user/<string:friend_code>/')
def user_page(friend_code: str):
network: NetworkType network: NetworkType
try: try:
network = nameToNetworkType(request.args.get('network')) network = nameToNetworkType(request.args.get('network'))
friend_code_int = int(friendCode.replace('-', '')) friend_code_int = int(friend_code.replace('-', ''))
userData = get_presence(friend_code_int, network, False) user_data = get_presence(friend_code_int, network, False)
if userData['Exception'] or not userData['User']['username']: if user_data['Exception'] or not user_data['User']['username']:
raise Exception(userData['Exception']) raise Exception(user_data['Exception'])
except: except:
return render_template('dist/404.html') return render_template('dist/404.html')
if not userData['User']['online'] or not userData['User']['Presence']: if not user_data['User']['online'] or not user_data['User']['Presence']:
userData['User']['Presence']['game'] = None user_data['User']['Presence']['game'] = None
userData['User']['favoriteGame'] = getTitle(userData['User']['favoriteGame'], titlesToUID, titleDatabase) user_data['User']['favoriteGame'] = getTitle(user_data['User']['favoriteGame'], titles_to_uid, title_database)
userData['User']['network'] = network.lower_name() user_data['User']['network'] = network.lower_name()
if userData['User']['favoriteGame']['name'] == 'Home Screen': if user_data['User']['favoriteGame']['name'] == 'Home Screen':
userData['User']['favoriteGame'] = None user_data['User']['favoriteGame'] = None
for i in ('accountCreation','lastAccessed','lastOnline'): for i in ('accountCreation', 'lastAccessed', 'lastOnline'):
if userData['User'][i] == 0: if user_data['User'][i] == 0:
userData['User'][i] = 'Never' user_data['User'][i] = 'Never'
elif time.time() - userData['User'][i] > 86400: elif time.time() - user_data['User'][i] > 86400:
userData['User'][i] = datetime.datetime.fromtimestamp(userData['User'][i]).strftime('%b %d, %Y') user_data['User'][i] = datetime.datetime.fromtimestamp(user_data['User'][i]).strftime('%b %d, %Y')
elif time.time() - userData['User'][i] > 600: elif time.time() - user_data['User'][i] > 600:
s = str(datetime.timedelta(seconds = int(time.time() - userData['User'][i]))).split(':') s = str(datetime.timedelta(seconds=int(time.time() - user_data['User'][i]))).split(':')
userData['User'][i] = s[0] + 'h, ' + s[1] + 'm, ' + s[2] + 's ago' user_data['User'][i] = s[0] + 'h, ' + s[1] + 'm, ' + s[2] + 's ago'
else: else:
userData['User'][i] = 'Just now' user_data['User'][i] = 'Just now'
#print(userData) # COMMENT/DELETE THIS BEFORE COMMITTING # COMMENT/DELETE THIS BEFORE COMMITTING
userData.update(sidenav()) # print(user_data)
response = make_response(render_template('dist/user.html', data = userData)) user_data.update(sidenav())
response = make_response(render_template('dist/user.html', data = user_data))
return response return response
@app.route('/terms') @app.route('/terms')
def terms(): def terms():
return redirect('https://github.com/MCMi460/3DS-RPC/blob/main/TERMS.md') return redirect('https://github.com/MCMi460/3DS-RPC/blob/main/TERMS.md')
@ -617,13 +648,14 @@ def terms():
# API ROUTES # # API ROUTES #
############## ##############
# Create entry in database with friendCode # Create entry in database with friendCode
@app.route('/api/user/create/<int:friendCode>/', methods=['POST']) @app.route('/api/user/create/<int:friend_code>/', methods=['POST'])
@limiter.limit(newUserLimit) @limiter.limit(new_user_limit)
def newUser(friendCode:int, network:int=-1, userCheck:bool = True): def new_user(friend_code: int, network: int = -1, user_check: bool = True):
try: try:
if userCheck: if user_check:
userAgentCheck() user_agent_check()
if network == -1: if network == -1:
network = NetworkType.NINTENDO network = NetworkType.NINTENDO
@ -632,7 +664,7 @@ def newUser(friendCode:int, network:int=-1, userCheck:bool = True):
network = nameToNetworkType(request_arg) network = nameToNetworkType(request_arg)
except: except:
pass pass
create_user(friendCode, network, True) create_user(friend_code, network, True)
return { return {
'Exception': False, 'Exception': False,
} }
@ -646,8 +678,8 @@ def newUser(friendCode:int, network:int=-1, userCheck:bool = True):
# Grab presence from friendCode # Grab presence from friendCode
@app.route('/api/user/<int:friend_code>/', methods=['GET']) @app.route('/api/user/<int:friend_code>/', methods=['GET'])
@limiter.limit(userPresenceLimit) @limiter.limit(user_presence_limit)
def userPresence(friend_code: int): def user_presence(friend_code: int):
# Check if a specific network is being specified as a query parameter. # Check if a specific network is being specified as a query parameter.
network_name = request.args.get('network') network_name = request.args.get('network')
if network_name: if network_name:
@ -659,14 +691,14 @@ def userPresence(friend_code: int):
# Toggle # Toggle
@app.route('/api/toggle/<int:friendCode>/', methods=['POST']) @app.route('/api/toggle/<int:friend_code>/', methods=['POST'])
@limiter.limit(togglerLimit) @limiter.limit(toggler_limit)
def toggler(friendCode:int): def toggler(friend_code: int):
network = NetworkType.NINTENDO network = NetworkType.NINTENDO
if request.data.decode('utf-8').split(',')[2] != None: if request.data.decode('utf-8').split(',')[2]:
network = nameToNetworkType(request.data.decode('utf-8').split(',')[2]) network = nameToNetworkType(request.data.decode('utf-8').split(',')[2])
try: try:
fc = str(principal_id_to_friend_code(friend_code_to_principal_id(friendCode))).zfill(12) fc = str(principal_id_to_friend_code(friend_code_to_principal_id(friend_code))).zfill(12)
except: except:
return 'failure!\nthat is not a real friendCode!' return 'failure!\nthat is not a real friendCode!'
stmt = ( stmt = (
@ -681,17 +713,17 @@ def toggler(friendCode:int):
f = request.data.decode('utf-8').split(',') f = request.data.decode('utf-8').split(',')
token = f[0] token = f[0]
active = bool(int(f[1])) active = bool(int(f[1]))
id = userFromToken(token).id discord_id = user_from_token(token).id
stmt = ( stmt = (
select(DiscordFriends) select(DiscordFriends)
.where(DiscordFriends.id == id) .where(DiscordFriends.id == discord_id)
.where(DiscordFriends.friend_code == fc) .where(DiscordFriends.friend_code == fc)
.where(DiscordFriends.network == network) .where(DiscordFriends.network == network)
) )
result = db.session.scalar(stmt) result = db.session.scalar(stmt)
if not result: if not result:
stmt = select(DiscordFriends).where(DiscordFriends.id == id) stmt = select(DiscordFriends).where(DiscordFriends.id == discord_id)
allFriends = db.session.scalars(stmt).all() allFriends = db.session.scalars(stmt).all()
if len(allFriends) >= 10: if len(allFriends) >= 10:
return 'failure!\nyou can\'t have more than ten consoles added at one time!' return 'failure!\nyou can\'t have more than ten consoles added at one time!'
@ -700,7 +732,7 @@ def toggler(friendCode:int):
db.session.execute( db.session.execute(
update(DiscordFriends) update(DiscordFriends)
.where(DiscordFriends.active) .where(DiscordFriends.active)
.where(DiscordFriends.id == id) .where(DiscordFriends.id == discord_id)
.values(active = False) .values(active = False)
) )
if result: if result:
@ -708,32 +740,32 @@ def toggler(friendCode:int):
update(DiscordFriends) update(DiscordFriends)
.where(DiscordFriends.friend_code == fc) .where(DiscordFriends.friend_code == fc)
.where(DiscordFriends.network == network) .where(DiscordFriends.network == network)
.where(DiscordFriends.id == id) .where(DiscordFriends.id == discord_id)
.values(active=active) .values(active=active)
) )
else: else:
db.session.execute( db.session.execute(
insert(DiscordFriends) insert(DiscordFriends)
.values(id=id, friend_code=fc, active=active, network=network) .values(id=discord_id, friend_code=fc, active=active, network=network)
) )
db.session.commit() db.session.commit()
return 'success!' return 'success!'
# Delete # Delete
@app.route('/api/delete/<int:friendCode>/', methods=['POST']) @app.route('/api/delete/<int:friend_code>/', methods=['POST'])
@limiter.limit(togglerLimit) @limiter.limit(toggler_limit)
def deleter(friendCode:int): def deleter(friend_code: int):
fc = str(principal_id_to_friend_code(friend_code_to_principal_id(friendCode))).zfill(12) fc = str(principal_id_to_friend_code(friend_code_to_principal_id(friend_code))).zfill(12)
if not ',' in request.data.decode('utf-8'): # Old API compatiblity. In the future this should be depercated. if not ',' in request.data.decode('utf-8'): # Old API compatiblity. In the future this should be depercated.
token = request.data.decode('utf-8') token = request.data.decode('utf-8')
id = userFromToken(token).id discord_id = user_from_token(token).id
db.session.execute( db.session.execute(
delete(DiscordFriends) delete(DiscordFriends)
.where(DiscordFriends.friend_code == fc) .where(DiscordFriends.friend_code == fc)
.where(DiscordFriends.network == NetworkType.NINTENDO) .where(DiscordFriends.network == NetworkType.NINTENDO)
.where(DiscordFriends.id == id) .where(DiscordFriends.id == discord_id)
) )
db.session.commit() db.session.commit()
@ -742,13 +774,13 @@ def deleter(friendCode:int):
data = request.data.decode('utf-8').split(',') data = request.data.decode('utf-8').split(',')
token = data[0] token = data[0]
network = nameToNetworkType(data[1]) network = nameToNetworkType(data[1])
id = userFromToken(token).id discord_id = user_from_token(token).id
db.session.execute( db.session.execute(
delete(DiscordFriends) delete(DiscordFriends)
.where(DiscordFriends.friend_code == fc) .where(DiscordFriends.friend_code == fc)
.where(DiscordFriends.network == network) .where(DiscordFriends.network == network)
.where(DiscordFriends.id == id) .where(DiscordFriends.id == discord_id)
) )
db.session.commit() db.session.commit()
return 'success!' return 'success!'
@ -756,8 +788,8 @@ def deleter(friendCode:int):
# Toggle one # Toggle one
@app.route('/api/settings/<string:which>/', methods=['POST']) @app.route('/api/settings/<string:which>/', methods=['POST'])
@limiter.limit(togglerLimit) @limiter.limit(toggler_limit)
def settingsToggler(which:str): def settings_toggler(which: str):
toggle = bool(int(request.data.decode('utf-8'))) toggle = bool(int(request.data.decode('utf-8')))
if not which in ('smallImage', 'profileButton'): if not which in ('smallImage', 'profileButton'):
return 'failure!' return 'failure!'
@ -780,24 +812,24 @@ def settingsToggler(which:str):
# Make Nintendo's cert a 'secure' cert # Make Nintendo's cert a 'secure' cert
@app.route('/cdn/i/<string:file>/', methods=['GET']) @app.route('/cdn/i/<string:file>/', methods=['GET'])
@limiter.limit(cdnLimit) @limiter.limit(cdn_limit)
def cdnImage(file:str): def cdn_image(file: str):
response = make_response(requests.get('https://kanzashi-ctr.cdn.nintendo.net/i/%s' % file, verify = False).content) response = make_response(requests.get('https://kanzashi-ctr.cdn.nintendo.net/i/%s' % file, verify=False).content)
response.headers['Content-Type'] = 'image/jpeg' response.headers['Content-Type'] = 'image/jpeg'
return response return response
# Local image cache # Local image cache
@app.route('/cdn/l/<string:file>/', methods=['GET']) @app.route('/cdn/l/<string:file>/', methods=['GET'])
@limiter.limit(cdnLimit) @limiter.limit(cdn_limit)
def localImageCdn(file:str): def local_image_cdn(file: str):
file = hex(int(file, 16)).replace('0x', '').zfill(16).upper() file = hex(int(file, 16)).replace('0x', '').zfill(16).upper()
return send_file('cache/' + file + '.png') return send_file('cache/' + file + '.png')
# Login route # Login route
@app.route('/login', methods=['POST']) @app.route('/login', methods=['POST'])
@limiter.limit(newUserLimit) @limiter.limit(new_user_limit)
def login(): def login():
try: try:
fc = str(principal_id_to_friend_code(friend_code_to_principal_id(request.form['fc']))).zfill(12) fc = str(principal_id_to_friend_code(friend_code_to_principal_id(request.form['fc']))).zfill(12)
@ -805,7 +837,7 @@ def login():
network = NetworkType.NINTENDO network = NetworkType.NINTENDO
else: else:
network = NetworkType(int(request.form['network'])) network = NetworkType(int(request.form['network']))
newUser(fc, network, False) new_user(fc, network, False)
except: except:
return redirect('/failure.html') return redirect('/failure.html')
return redirect(f'/success.html?fc={fc}&network={network.lower_name()}') return redirect(f'/success.html?fc={fc}&network={network.lower_name()}')
@ -813,15 +845,15 @@ def login():
# Discord route # Discord route
@app.route('/authorize') @app.route('/authorize')
@limiter.limit(newUserLimit) @limiter.limit(new_user_limit)
def authorize(): def authorize():
if not request.args.get('code'): if not request.args.get('code'):
return render_template('dist/404.html') return render_template('dist/404.html')
token, user, pfp = createDiscordUser(request.args['code']) token, user, pfp = create_discord_user(request.args['code'])
response = make_response(redirect('/consoles')) response = make_response(redirect('/consoles'))
response.set_cookie('token', token, expires = datetime.datetime.now() + datetime.timedelta(days = 30)) response.set_cookie('token', token, expires=datetime.datetime.now() + datetime.timedelta(days=30))
response.set_cookie('user', user, expires = datetime.datetime.now() + datetime.timedelta(days = 30)) response.set_cookie('user', user, expires=datetime.datetime.now() + datetime.timedelta(days=30))
response.set_cookie('pfp', pfp, expires = datetime.datetime.now() + datetime.timedelta(days = 30)) response.set_cookie('pfp', pfp, expires=datetime.datetime.now() + datetime.timedelta(days=30))
return response return response
@ -829,21 +861,21 @@ def authorize():
def refresh(): def refresh():
if local: if local:
try: try:
token, user, pfp = refreshBearer(request.cookies['token']) token, user, pfp = refresh_bearer(request.cookies['token'])
response = make_response(redirect('/consoles')) response = make_response(redirect('/consoles'))
response.set_cookie('token', token, expires = datetime.datetime.now() + datetime.timedelta(days = 30)) response.set_cookie('token', token, expires=datetime.datetime.now() + datetime.timedelta(days=30))
response.set_cookie('user', user, expires = datetime.datetime.now() + datetime.timedelta(days = 30)) response.set_cookie('user', user, expires=datetime.datetime.now() + datetime.timedelta(days=30))
response.set_cookie('pfp', pfp, expires = datetime.datetime.now() + datetime.timedelta(days = 30)) response.set_cookie('pfp', pfp, expires=datetime.datetime.now() + datetime.timedelta(days=30))
return response return response
except: except:
deleteDiscordUser(userFromToken(request.cookies['token']).id) delete_discord_user(user_from_token(request.cookies['token']).id)
return redirect('/404.html') return redirect('/404.html')
if __name__ == '__main__': if __name__ == '__main__':
cacheTitles() cache_titles()
if local: if local:
app.run(host = '0.0.0.0', port = port) app.run(host='0.0.0.0', port=port)
else: else:
import gevent.pywsgi import gevent.pywsgi
server = gevent.pywsgi.WSGIServer(('0.0.0.0', port), app) server = gevent.pywsgi.WSGIServer(('0.0.0.0', port), app)