from flask import Flask, Blueprint, jsonify, request, abort, render_template, flash, session, redirect, url_for from flask_babel import Babel, _, refresh from flask_compress import Compress from dotenv import load_dotenv from functools import wraps from werkzeug.utils import secure_filename from pathlib import Path import unicodedata import threading import requests import secrets import shutil import zipfile import tarfile import psycopg import urllib import functions as func import os import json import constants as const # used for admin routes logged_in = False load_dotenv() app = Flask(const.appName) app.secret_key = os.environ.get("APP_SECRET") cfg = func.loadJSON(const.configFile) app.config.from_mapping(cfg) app.config.update(cfg) # compress to improve page load speed Compress(app) app.config['BABEL_DEFAULT_LOCALE'] = 'en' app.config['BABEL_TRANSLATION_DIRECTORIES'] = 'locales' # refreshing locale refresh() # update this once more languages are supported app.config['available_languages'] = { "en_US": _("English (US)"), "ru_RU": _("Russian") } def getLocale(): if not session.get('language'): app.config.update(cfg) session['language'] = cfg['languages']['default'] return session.get('language') babel = Babel(app, locale_selector=getLocale) # -- blueprints -- api_bp = Blueprint('api', const.appName) admin_bp = Blueprint('admin', const.appName) # -- cli commands -- @app.cli.command("init-db") def initDatabase(): dbName = os.environ.get("DB_NAME") print("Attempting to connect to database {dbName}...") try: conn = func.connectToDb() cursor = conn.cursor() print("Connected successfully") conn.database = dbName except psycopg.Error as error: app.logger.error("Database error:", error) # if error.errno == errorcode.ER_ACCESS_DENIED_ERROR: # print("Bad credentials") # elif error.errno == errorcode.ER_BAD_DB_ERROR: # dbPort = os.environ.get("DB_PORT") # if not dbPort: # dbPort = 3306 # # conn = func.connectToDb() # cursor = conn.cursor() # func.createDatabase(cursor, dbName) # conn.database = dbName # else: # print("Error:", error) # return with open('schema.sql', 'r') as schema_file: schema = schema_file.read() try: cursor.execute(schema) conn.commit() print(f"Database {dbName} was successfully initialized!") except psycopg.Error as error: print(f"Failed to initialize database {dbName}: {error}") finally: cursor.close() conn.close() # -- decorators -- def loginRequired(f): @wraps(f) def login_required(*args, **kwargs): if not logged_in: return abort(404) return f(*args, **kwargs) return login_required # -- before request -- @app.before_request def before_request(): # app.logger.debug("[CatAsk] app.before_request triggered") global logged_in logged_in = session.get('logged_in') # -- context processors -- @app.context_processor def inject_stuff(): app.logger.debug("[CatAsk] app.context_processor inject_stuff() triggered") cfg = func.loadJSON(const.configFile) # for 1.6.0 or later unreadQuestionCount = int(getQuestionCount(False, True)) totalQuestionCount = int(getQuestionCount()) return dict(metadata=func.generateMetadata(), len=len, str=str, const=const, cfg=cfg, logged_in=logged_in, version_id=const.version_id, version=const.version, appName=const.appName, unreadQuestionCount=unreadQuestionCount, totalQuestionCount=totalQuestionCount) # -- template filters -- # {text} | render_markdown @app.template_filter('render_markdown') def render_markdown(text): if text.startswith("![") and not (text.startswith(':') and text.endswith(':')): return text else: # app.logger.debug("[CatAsk] app.template_filter render_markdown(text) triggered") return func.renderMarkdown(text) # render_markdown({text}, fromWho=False|True) @app.template_global('render_markdown') def render_markdown(text, fromWho=False): if (text.startswith("![") or "![" in text) and not (text.startswith(':') and text.endswith(':')): # ensuring that only inline images get escaped, not emojis escaped = text.replace("![", "!\[") return func.renderMarkdown(escaped) else: # don't want people inserting buttons and other stuff into name field allowed_tags = ["p", "img"] if fromWho else None return func.renderMarkdown(text, allowed_tags) # -- error handlers -- @app.errorhandler(404) def notFound(e): return render_template('errors/404.html'), 404 @app.errorhandler(500) def internalServerError(e): return render_template('errors/500.html'), 500 @app.errorhandler(502) def badGateway(e): return render_template('errors/502.html'), 502 # -- client (frontend) routes -- @app.route('/', methods=['GET']) def index(): per_page = 25 offset = 0 page = 1 func_val = func.getAllQuestions(limit=per_page, offset=offset) combined = func_val[0] metadata = func_val[1] emojis = func.listEmojis() packs = func.listEmojiPacks() return render_template( 'index.html', combined=combined, urllib=urllib, trimContent=func.trimContent, metadata=metadata, getRandomWord=func.getRandomWord, formatRelativeTime=func.formatRelativeTime, emojis=emojis, packs=packs, page=page, per_page=per_page ) @app.route('/inbox/', methods=['GET']) @loginRequired def inbox(): conn = func.connectToDb() cursor = conn.cursor() app.logger.debug("[CatAsk/Inbox] SELECT'ing unanswered questions") cursor.execute("SELECT * FROM questions WHERE answered=%s ORDER BY creation_date DESC", (False,)) questions = cursor.fetchall() # postgres shenanigans for question in questions: question['creation_date'] = question['creation_date'].replace(microsecond=0).replace(tzinfo=None) cursor.execute("UPDATE questions SET unread=%s WHERE id=%s", (False, question['id'])) conn.commit() cursor.close() conn.close() return render_template('inbox.html', questions=questions, formatRelativeTime=func.formatRelativeTime) @app.route('/q//', methods=['GET']) def viewQuestion(question_id): question = func.getQuestion(question_id) answer = func.getAnswer(question_id) if not question or not answer: return abort(404) else: metadata = func.generateMetadata(question, answer) return render_template('view_question.html', question=question, urllib=urllib, answer=answer, metadata=metadata, formatRelativeTime=func.formatRelativeTime, trimContent=func.trimContent) # TODO: implement this and private questions should be here too """ @app.route('/questions/', methods=['GET']) def seeAskedQuestions(): conn = func.connectToDb() cursor = conn.cursor() cursor.execute("SELECT * FROM questions WHERE asker_id=%s ORDER BY creation_date DESC", (asker_id,)) questions = cursor.fetchall() cursor.close() conn.close() return render_template('asked_questions.html', questions=questions, formatRelativeTime=func.formatRelativeTime) """ # -- admin client routes -- @admin_bp.route('/login/', methods=['GET', 'POST']) def login(): global logged_in if request.method == 'POST': admin_password = request.form.get('admin_password') if admin_password == os.environ.get('ADMIN_PASSWORD'): session['logged_in'] = True session.permanent = request.form.get('remember_me', False) return redirect(url_for('index')) else: flash(_("Wrong password"), 'danger') return redirect(url_for('admin.login')) else: if logged_in: return redirect('index') else: return render_template('admin/login.html') @admin_bp.route('/logout/', methods=['POST']) @loginRequired def logout(): session['logged_in'] = False return redirect(url_for('index')) @admin_bp.route('/', methods=['GET', 'POST']) @loginRequired def index(): return redirect(url_for('admin.information')) @admin_bp.route('/information/', methods=['GET', 'POST']) @loginRequired def information(): return render_template('admin/categories/instance.html') @admin_bp.route('/accessibility/', methods=['GET', 'POST']) @loginRequired def accessibility(): return render_template('admin/categories/accessibility.html') @admin_bp.route('/languages/', methods=['GET', 'POST']) @loginRequired def languages(): return render_template('admin/categories/languages.html') @admin_bp.route('/notifications/', methods=['GET', 'POST']) @loginRequired def notifications(): return render_template('admin/categories/notifications.html') @admin_bp.route('/general/', methods=['GET', 'POST']) @loginRequired def general(): return render_template('admin/categories/general.html') @admin_bp.route('/customize/', methods=['GET', 'POST']) @loginRequired def customize(): return render_template('admin/categories/customize.html') @admin_bp.route('/antispam/', methods=['GET', 'POST']) @loginRequired def antispam(): return render_template('admin/categories/antispam.html') @admin_bp.route('/emojis/', methods=['GET', 'POST']) @loginRequired def emojis(): if not os.path.exists(const.emojiPath): os.makedirs(const.emojiPath) packs = func.listEmojiPacks() if packs: for pack in packs: json_pack = bool(pack.get('website') and pack.get('exportedAt')) if json_pack: break else: json_pack = False emojis = func.listEmojis() return render_template('admin/categories/emojis.html', json_pack=json_pack, packs=packs, emojis=emojis, formatRelativeTime=func.formatRelativeTime2) @admin_bp.route('/blacklist/', methods=['GET', 'POST']) @loginRequired def blacklist(): if request.method == 'POST': action = request.form.get('action') blacklist = request.form.get('blacklist') with open(const.blacklistFile, 'w') as file: file.write(blacklist) return {'message': _("Blacklist updated!")}, 200 else: blacklist = func.readPlainFile(const.blacklistFile) if blacklist == []: blacklist = '' return render_template('admin/categories/blacklist.html', blacklist=blacklist) # TODO: implement deleting exports @admin_bp.route('/import-export/', methods=['GET']) @loginRequired def importExport(): if os.path.exists(const.exportsFile): exports = func.loadJSON(const.exportsFile) else: exports = None return render_template('admin/categories/import.html', exports=exports) # TODO: implement first-launch setup route """ @admin_bp.route('/post-install/', methods=['GET', 'POST']) @loginRequired def postInstall(): if config.postInstallCompleted: return abort(404) else: pass """ # -- server routes -- @api_bp.errorhandler(404) def notFound(e): return jsonify({'error': 'Not found'}), 404 @api_bp.errorhandler(400) def badRequest(e): return jsonify({'error': str(e)}), 400 @api_bp.errorhandler(500) def internalServerError(e): return jsonify({'error': str(e)}), 500 # why should i have a manifest.json file when i can just make it a route @api_bp.route("/manifest.json", methods=['GET']) def pwaManifest(): # not sure about theme_color but whateva return jsonify({ "short_name": const.appName, "name": cfg['instance']['title'], "icons": [ { "src": url_for("static", filename="icons/favicon/android-chrome-192x192.png"), "sizes": "192x192", "type": "image/png" }, { "src": url_for("static", filename="icons/favicon/android-chrome-512x512.png"), "sizes": "512x512", "type": "image/png" } ], "start_url": "/", "display": "standalone", "theme_color": cfg['style']['accentLight'], "background_color": "" }) @api_bp.route('/ts/download///', methods=['GET']) @loginRequired def downloadTheme(author, theme): return send_file( requests.get(f"{ cfg['themeStoreUrl'] }/api/v1/dl/{author}/{theme}/"), download_name=f"{ theme }.css") @api_bp.route('/ts/card_row/', methods=['GET']) @loginRequired def themeStore_themes(): cfg = func.loadJSON(const.configFile) url = f"{ cfg['themeStoreUrl'] }/api/v1/themes/" themes = requests.get(url).json() cards = "" i = 0 for theme in themes: i += 1 cards += f"""

{ _("Updated") } { func.formatRelativeTime(theme['updated_at']) }

{ _("by") } { theme['author'] }

""" html = f"""
{cards}
""" return html @api_bp.route('/ts/modals/', methods=['GET']) @loginRequired def themeStore_themeModals(): cfg = func.loadJSON(const.configFile) url = f"{cfg['themeStoreUrl']}/api/v1/themes/" themes = requests.get(url).json() i = 0 modals = "" for theme in themes: i += 1 theme_name = theme['name'] singleton = f"data-bs-toggle='modal' data-bs-target='#theme-modal-confirm-{i}'" if cfg['style']['customCss'] != "" else f"""data-bs-dismiss='modal' onclick='useTheme({i}, "{ theme_name }")'""" modals += f""" """ return modals @api_bp.route("/change_language/", methods=['POST']) def changeLanguage(): if cfg['languages']['allowChanging']: lang = request.form.get('lang') if lang not in app.config['available_languages'].keys(): # fallback to en_US on malformed request session['language'] == 'en_US' flash("400 Bad Request: The browser (or proxy) sent a request that this server could not understand.", "danger") return redirect(request.referrer) session['language'] = lang return redirect(request.referrer) else: return abort(404) # wip, scheduled for 1.8.0 release # @api_bp.route('/hyper/widget/', methods=['GET']) # def widget(): # func_val = func.getAllQuestions() # combined = func_val[0] # metadata = func_val[1] # return render_template('widget.html', combined=combined, urllib=urllib, trimContent=func.trimContent, metadata=metadata, getRandomWord=func.getRandomWord, formatRelativeTime=func.formatRelativeTime) @api_bp.route('/hyper/load_more_questions/', methods=['GET']) def load_more_questions(): page = request.args.get('page', default=1, type=int) per_page = 25 offset = (page - 1) * per_page func_val = func.getAllQuestions(limit=per_page, offset=offset) combined = func_val[0] if not combined: return "" return render_template('snippets/layout/load_more_questions.html', combined=combined, page=page, per_page=per_page, formatRelativeTime=func.formatRelativeTime, trimContent=func.trimContent, urllib=urllib) # -- question routes -- @api_bp.route('/add_question/', methods=['POST']) def addQuestion(): try: app.logger.debug("[CatAsk/API/add_question] started question flow") from_who = request.form.get('from_who', cfg['anonName']) question = request.form.get('question', '') cw = request.form.get('cw', '') question_normalized = unicodedata.normalize('NFKC', question).replace("⠀", "").strip() if not question or not question_normalized: abort(400, _("Question field must not be empty")) if len(question) > int(cfg['charLimit']) or len(from_who) > int(cfg['charLimit']): abort(400, _("Question exceeds the character limit")) return_val = func.addQuestion(from_who, question, cw) app.logger.debug("[CatAsk/API/add_question] finished question flow") return return_val[0] finally: if cfg['ntfy']['enabled']: # cw, return_val, from_who, question ntfy_thread = threading.Thread(target=func.ntfySend, name=f"ntfy thread for {return_val[2]}", args=(cw, return_val, from_who, question,)) ntfy_thread.start() @api_bp.route('/delete_question/', methods=['DELETE']) @loginRequired def deleteQuestion(): question_id = request.args.get('question_id', '') if not question_id: abort(400, _("Missing 'question_id' attribute or 'question_id' is empty")) conn = func.connectToDb() cursor = conn.cursor() app.logger.debug("[CatAsk/API/delete_question] DELETE'ing a question from database") cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,)) conn.commit() cursor.close() conn.close() return {'message': _("Successfully deleted question.")}, 200 @api_bp.route('/return_to_inbox/', methods=['POST']) @loginRequired def returnToInbox(): question_id = request.args.get('question_id', '') if not question_id: abort(400, _("Missing 'question_id' attribute or 'question_id' is empty")) conn = func.connectToDb() cursor = conn.cursor() app.logger.debug("[CatAsk/API/return_to_inbox] SELECT'ing a question from database") cursor.execute("SELECT from_who, content, creation_date, cw FROM questions WHERE id=%s", (question_id,)) question = cursor.fetchone() # question = { # 'from_who': row[0], # 'content': row[1], # 'creation_date': row[2], # 'cw': row[3] # } app.logger.debug("[CatAsk/API/return_to_inbox] DELETE'ing a question from database") cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,)) app.logger.debug("[CatAsk/API/return_to_inbox] INSERT'ing a question into database") cursor.execute("INSERT INTO questions (from_who, content, creation_date, answered, cw) VALUES (%s, %s, %s, %s, %s)", (question["from_who"], question["content"], question["creation_date"], False, question['cw'])) conn.commit() cursor.close() conn.close() return {'message': _("Successfully returned question to inbox.")}, 200 @api_bp.route('/pin_question/', methods=['POST']) @loginRequired def pinQuestion(): question_id = request.args.get('question_id', '') if not question_id: abort(400, _("Missing 'question_id' attribute or 'question_id' is empty")) conn = func.connectToDb() cursor = conn.cursor() app.logger.debug("[CatAsk/API/pin_question] UPDATE'ing a question to pin it") cursor.execute("UPDATE questions SET pinned=%s WHERE id=%s", (True, question_id)) conn.commit() cursor.close() conn.close() return {'message': _("Successfully pinned question.")}, 200 @api_bp.route('/unpin_question/', methods=['POST']) @loginRequired def unpinQuestion(): question_id = request.args.get('question_id', '') if not question_id: abort(400, _("Missing 'question_id' attribute or 'question_id' is empty")) conn = func.connectToDb() cursor = conn.cursor() app.logger.debug("[CatAsk/API/unpin_question] UPDATE'ing a question to unpin it") cursor.execute("UPDATE questions SET pinned=%s WHERE id=%s", (False, question_id)) conn.commit() cursor.close() conn.close() return {'message': _("Successfully unpinned question.")}, 200 @api_bp.route('/add_answer/', methods=['POST']) @loginRequired def addAnswer(): question_id = request.args.get('question_id', '') answer = request.form.get('answer') cw = request.form.get('cw', '') if not question_id: abort(400, _("Missing 'question_id' attribute or 'question_id' is empty")) if not answer: abort(400, _("Missing 'answer' attribute or 'answer' is empty")) return func.addAnswer(question_id, answer, cw) # -- uploaders -- @api_bp.route('/upload_favicon/', methods=['POST']) @loginRequired def uploadFavicon(): favicon = request.files.get('favicon') if favicon and func.allowedFile(favicon.filename): filename = secure_filename(favicon.filename) favicon_path = const.faviconDir / filename favicon.save(favicon_path) func.generateFavicon(filename) return {'message': _("Successfully updated favicon!")}, 201 elif favicon and not func.allowedFile(favicon.filename): return {'error': _("File type is not supported")}, 400 elif not favicon: return {'error': _("favicon is not specified")}, 400 @api_bp.route('/upload_emoji/', methods=['POST']) @loginRequired def uploadEmoji(): if 'emoji' not in request.files: return jsonify({'error': _("No file part in the request")}), 400 emoji = request.files['emoji'] if emoji.filename == '': return jsonify({'error': _("No file selected for uploading")}), 400 if not func.allowedFile(emoji.filename): return jsonify({'error': _("Invalid file type. Only png, jpg, jpeg, webp, bmp, jxl, gif supported")}), 400 # Secure the filename and determine the archive name filename = secure_filename(emoji.filename) emoji_path = const.emojiPath / filename emoji.save(emoji_path) return jsonify({'message': _("Emoji {} successfully uploaded").format(filename)}), 201 @api_bp.route('/upload_emoji_pack/', methods=['POST']) @loginRequired def uploadEmojiPack(): if 'emoji_archive' not in request.files: return jsonify({'error': _("No file part in the request")}), 400 emoji_archive = request.files['emoji_archive'] if emoji_archive.filename == '': return jsonify({'error': _("No file selected for uploading")}), 400 if not func.allowedArchive(emoji_archive.filename): return jsonify({'error': _("Invalid file type. Only .zip, .tar, .tar.gz, .tar.bz2 allowed")}), 400 filename = secure_filename(emoji_archive.filename) archive_name = func.stripArchExtension(filename) extract_path = const.emojiPath / archive_name extract_path.mkdir(parents=True, exist_ok=True) archive_path = extract_path / filename emoji_archive.save(archive_path) try: if zipfile.is_zipfile(archive_path): with zipfile.ZipFile(archive_path, 'r') as zip_ref: zip_ref.extractall(extract_path) elif tarfile.is_tarfile(archive_path): with tarfile.open(archive_path, 'r:*') as tar_ref: tar_ref.extractall(extract_path) else: return jsonify({'error': _("Unsupported archive format")}), 400 # parse meta.json if it exists meta_json_path = extract_path / 'meta.json' if meta_json_path.exists(): processed_emojis = func.processEmojis(meta_json_path) return jsonify({'message': _('Successfully uploaded and processed {} emojis from archive "{}".').format(len(processed_emojis), filename)}), 201 else: return jsonify({'message': _("Archive {} successfully uploaded and extracted.").format(filename)}), 201 except Exception as e: return jsonify({'error': str(e)}), 500 finally: archive_path.unlink() # -- getters -- # this isn't used anywhere yet, but maybe it will in the future @api_bp.route('/get_emojis/', methods=['GET']) @loginRequired def getEmojis(): return func.listEmojis() @api_bp.route('/hyper/get_emoji_packs/', methods=['GET']) @loginRequired def getEmojiPacks(): index = int(request.args.get('index', '')) packs = func.listEmojiPacks() pack = packs[index] rel_path = pack['relative_path'] emojis = pack['emojis'] html = '' for emoji in emojis: html += f"""
{ emoji['name'] }
""" return html # -- delete routes -- @api_bp.route('/delete_emoji/', methods=['DELETE']) @loginRequired def deleteEmoji(): emoji_name = request.args.get('emoji_name') emoji_base_path = const.emojiPath emoji_file_path = emoji_base_path / emoji_name emoji_ext = os.path.splitext(f"{emoji_base_path}/{emoji_file_path}") if not emoji_file_path.exists() or not emoji_file_path.is_file(): return jsonify({'error': _("Emoji not found")}), 404 try: emoji_file_path.unlink() return jsonify({'message': _('Emoji "{}" deleted successfully').format(emoji_name)}), 200 except Exception as e: return jsonify({'error': str(e)}), 500 @api_bp.route('/delete_emoji_pack/', methods=['DELETE']) @loginRequired def deleteEmojiPack(): pack_name = request.args.get('pack_name') emojis_path = const.emojiPath emoji_base_path = Path.cwd() / 'static' / 'emojis' / pack_name.lower() if not emoji_base_path.exists() or not emoji_base_path.is_dir(): return jsonify({'error': _('Emoji pack "{}" not found').format(pack_name)}), 404 try: for json_file in emojis_path.glob(f'{pack_name.lower()}.json'): json_file.unlink() shutil.rmtree(emoji_base_path) return jsonify({'message': _('Emoji pack "{}" deleted successfully.').format(pack_name)}), 200 except Exception as e: return jsonify({'error': str(e)}), 500 # -- misc -- @api_bp.route('/update_blacklist/', methods=['PUT']) @loginRequired def updateBlacklist(): action = request.form.get('action') blacklist = request.form.get('blacklist') with open(const.blacklistFile, 'w') as file: file.write(blacklist) return {'message': _("Blacklist updated!")}, 200 @api_bp.route('/get_question_count/', methods=['GET']) def getQuestionCount(answered: bool = None, unread: bool = None): conn = func.connectToDb() cursor = conn.cursor() app.logger.debug("[CatAsk/API/get_question_count] SELECT'ing question count from database") query = "SELECT COUNT(id) FROM questions" if (answered != None) and not unread: query += " WHERE answered=%s" cursor.execute(query, (answered,)) elif (answered != None) and (unread != None): query += " WHERE answered=%s AND unread=%s" cursor.execute(query, (answered, unread)) elif (unread != None) and not answered: query += " WHERE unread=%s" cursor.execute(query, (unread,)) else: cursor.execute(query) question_count = cursor.fetchone() cursor.close() conn.close() return str(question_count['count']) # -- import/export -- @api_bp.route('/import_data/', methods=['PUT']) def importData(): os.makedirs(const.tempDir, exist_ok=True) archive = request.files['import_archive'] filename = secure_filename(archive.filename) file_path = const.tempDir / filename archive.save(file_path) return func.importData(file_path) @api_bp.route('/import_rs_data/', methods=['PUT']) def importRsData(): os.makedirs(const.tempDir, exist_ok=True) archive = request.files['import_archive'] filename = secure_filename(archive.filename) file_path = const.tempDir / filename archive.save(file_path) return func.retrospringImport(file_path) @api_bp.route('/create_export/', methods=['POST']) @loginRequired def createExport(): return func.createExport() @api_bp.route('/delete_export/', methods=['DELETE']) @loginRequired def deleteExport(): timestamp = request.args.get('timestamp', '') return func.deleteExport(timestamp) # unused @api_bp.route('/view_question/', methods=['GET']) @loginRequired def viewQuestion(): question_id = request.args.get('question_id', '') if not question_id: abort(400, _("Missing 'question_id' attribute or 'question_id' is empty")) conn = func.connectToDb() cursor = conn.cursor() cursor.execute("SELECT id, from_who, creation_date, content, answered, answer_id FROM questions WHERE id=%s", (question_id,)) question = cursor.fetchone() cursor.close() conn.close() return jsonify(question) @api_bp.route('/view_answer/', methods=['GET']) @loginRequired def viewAnswer(): answer_id = request.args.get('answer_id', '') if not answer_id: abort(400, _("Missing 'answer_id' attribute or 'answer_id' is empty")) conn = func.connectToDb() cursor = conn.cursor() cursor.execute("SELECT id, question_id, creation_date, content FROM answers WHERE id=%s", (answer_id,)) answer = cursor.fetchone() cursor.close() conn.close() return jsonify(answer) @api_bp.route('/update_config/', methods=['POST']) @loginRequired def updateConfig(): cfg = func.loadJSON(const.configFile) configuration = request.form for key, value in configuration.items(): cleaned_key = key.removeprefix('_') if key.startswith('_') else key nested_keys = cleaned_key.split('.') current_dict = cfg for nested_key in nested_keys[:-1]: current_dict = current_dict.setdefault(nested_key, {}) # Convert the checkbox value 'True'/'False' strings to actual booleans if value.lower() == 'true': value = True elif value.lower() == 'false': value = False current_dict[nested_keys[-1]] = value func.saveJSON(cfg, const.configFile) app.config.update(cfg) # loading the config file again to read new values func.loadJSON(const.configFile) return {'message': _("Settings saved!")} app.register_blueprint(api_bp, url_prefix='/api/v1') app.register_blueprint(admin_bp, url_prefix='/admin') if __name__ == '__main__': app.run(debug=True)