from flask import Flask, Blueprint, jsonify, request, abort, render_template, flash, session, redirect, url_for from flask_compress import Compress from dotenv import load_dotenv from mysql.connector import errorcode from functools import wraps from werkzeug.utils import secure_filename from pathlib import Path import secrets import shutil import zipfile import tarfile import mysql.connector import urllib import functions as func import os import json import constants as const # used for admin routes logged_in = False load_dotenv() app = Flask(const.appName) app.secret_key = os.environ.get("APP_SECRET") cfg = func.loadJSON(const.configFile) app.config.from_mapping(cfg) app.config.update(cfg) # compress to improve page load speed Compress(app) # -- blueprints -- api_bp = Blueprint('api', const.appName) admin_bp = Blueprint('admin', const.appName) # -- cli commands -- @app.cli.command("init-db") def initDatabase(): dbName = os.environ.get("DB_NAME") print(f"Attempting to connect to database {dbName}...") try: conn = func.connectToDb() cursor = conn.cursor() print("Connected successfully") conn.database = dbName except mysql.connector.Error as error: if error.errno == errorcode.ER_ACCESS_DENIED_ERROR: print("Bad credentials") elif error.errno == errorcode.ER_BAD_DB_ERROR: dbPort = os.environ.get("DB_PORT") if not dbPort: dbPort = 3306 conn = mysql.connector.connect( user=os.environ.get("DB_USER"), password=os.environ.get("DB_PASS"), host=os.environ.get("DB_HOST"), port=dbPort, database='mysql' ) cursor = conn.cursor() func.createDatabase(cursor, dbName) conn.database = dbName else: print("Error:", error) return with open('schema.sql', 'r') as schema_file: schema = schema_file.read() try: cursor.execute(schema, multi=True) print(f"Database {dbName} was successfully initialized!") except mysql.connector.Error as error: print(f"Failed to initialize database {dbName}: {error}") finally: cursor.close() conn.close() # -- decorators -- def loginRequired(f): @wraps(f) def login_required(*args, **kwargs): if not logged_in: return abort(404) return f(*args, **kwargs) return login_required # -- before request -- @app.before_request def before_request(): # app.logger.debug("[CatAsk] app.before_request triggered") global logged_in logged_in = session.get('logged_in') # -- context processors -- @app.context_processor def inject_stuff(): app.logger.debug("[CatAsk] app.context_processor inject_stuff() triggered") cfg = func.loadJSON(const.configFile) # for 1.6.0 or later # questionCount = getQuestionCount() return dict(metadata=func.generateMetadata(), len=len, str=str, const=const, cfg=cfg, logged_in=logged_in, version_id=const.version_id, version=const.version, appName=const.appName) # -- template filters -- @app.template_filter('render_markdown') def render_markdown(text): # app.logger.debug("[CatAsk] app.template_filter render_markdown(text) triggered") return func.renderMarkdown(text) # -- client (frontend) routes -- @app.route('/', methods=['GET']) def index(): conn = func.connectToDb() cursor = conn.cursor(dictionary=True) app.logger.debug("[CatAsk/Home] SELECT'ing pinned questions") cursor.execute("SELECT * FROM questions WHERE answered=%s AND pinned=%s ORDER BY creation_date DESC", (True, True)) pinned_questions = cursor.fetchall() app.logger.debug("[CatAsk/Home] SELECT'ing non-pinned questions") cursor.execute("SELECT * FROM questions WHERE answered=%s AND pinned=%s ORDER BY creation_date DESC", (True, False)) non_pinned_questions = cursor.fetchall() questions = pinned_questions + non_pinned_questions app.logger.debug("[CatAsk/Home] SELECT'ing answers") cursor.execute("SELECT * FROM answers ORDER BY creation_date DESC") answers = cursor.fetchall() metadata = func.generateMetadata() combined = [] for question in questions: question_answers = [answer for answer in answers if answer['question_id'] == question['id']] combined.append({ 'question': question, 'answers': question_answers }) cursor.close() conn.close() return render_template('index.html', combined=combined, urllib=urllib, trimContent=func.trimContent, metadata=metadata, getRandomWord=func.getRandomWord, formatRelativeTime=func.formatRelativeTime) @app.route('/inbox/', methods=['GET']) @loginRequired def inbox(): conn = func.connectToDb() cursor = conn.cursor(dictionary=True) app.logger.debug("[CatAsk/Inbox] SELECT'ing unanswered questions") cursor.execute("SELECT * FROM questions WHERE answered=%s ORDER BY creation_date DESC", (False,)) questions = cursor.fetchall() cursor.close() conn.close() return render_template('inbox.html', questions=questions, formatRelativeTime=func.formatRelativeTime) @app.route('/q//', methods=['GET']) def viewQuestion(question_id): question = func.getQuestion(question_id) answer = func.getAnswer(question_id) metadata = func.generateMetadata(question, answer) return render_template('view_question.html', question=question, urllib=urllib, answer=answer, metadata=metadata, formatRelativeTime=func.formatRelativeTime, trimContent=func.trimContent) # TODO: implement this and private questions should be here too """ @app.route('/questions/', methods=['GET']) def seeAskedQuestions(): conn = func.connectToDb() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM questions WHERE asker_id=%s ORDER BY creation_date DESC", (asker_id,)) questions = cursor.fetchall() cursor.close() conn.close() return render_template('asked_questions.html', questions=questions, formatRelativeTime=func.formatRelativeTime) """ # -- admin client routes -- @admin_bp.route('/login/', methods=['GET', 'POST']) def login(): global logged_in if request.method == 'POST': admin_password = request.form.get('admin_password') if admin_password == os.environ.get('ADMIN_PASSWORD'): session['logged_in'] = True session.permanent = request.form.get('remember_me', False) return redirect(url_for('index')) else: flash("Wrong password", 'danger') return redirect(url_for('admin.login')) else: if logged_in: return redirect('index') else: return render_template('admin/login.html') @admin_bp.route('/logout/', methods=['POST']) @loginRequired def logout(): session['logged_in'] = False return redirect(url_for('index')) @admin_bp.route('/', methods=['GET', 'POST']) @loginRequired def index(): return redirect(url_for('admin.information')) @admin_bp.route('/information/', methods=['GET', 'POST']) @loginRequired def information(): return render_template('admin/categories/instance.html') @admin_bp.route('/general/', methods=['GET', 'POST']) @loginRequired def general(): return render_template('admin/categories/general.html') @admin_bp.route('/customize/', methods=['GET', 'POST']) @loginRequired def customize(): return render_template('admin/categories/customize.html') @admin_bp.route('/emojis/', methods=['GET', 'POST']) @loginRequired def emojis(): if not os.path.exists(const.emojiPath): os.makedirs(const.emojiPath) packs = func.listEmojiPacks() if packs: for pack in packs: # print(pack,'\n') json_pack = bool(pack.get('website') and pack.get('exportedAt')) print(json_pack) if json_pack: break else: json_pack = False emojis = func.listEmojis() return render_template('admin/categories/emojis.html', json_pack=json_pack, packs=packs, emojis=emojis, formatRelativeTime=func.formatRelativeTime2) @admin_bp.route('/blacklist/', methods=['GET', 'POST']) @loginRequired def blacklist(): if request.method == 'POST': action = request.form.get('action') blacklist = request.form.get('blacklist') with open(const.blacklistFile, 'w') as file: file.write(blacklist) return {'message': 'Blacklist updated!'}, 200 else: blacklist = func.readPlainFile(const.blacklistFile) if blacklist == []: blacklist = '' return render_template('admin/categories/blacklist.html', blacklist=blacklist) # TODO: implement first-launch setup route """ @admin_bp.route('/post-install/', methods=['GET', 'POST']) @loginRequired def postInstall(): if config.postInstallCompleted: return abort(404) else: pass """ # -- server routes -- @api_bp.errorhandler(404) def notFound(): return jsonify({'error': 'Not found'}), 404 @api_bp.errorhandler(400) def badRequest(e): return jsonify({'error': str(e)}), 400 @api_bp.errorhandler(500) def internalServerError(e): return jsonify({'error': str(e)}), 500 # why should i have a manifest.json file when i can just make it a route @api_bp.route("/manifest.json", methods=['GET']) def pwaManifest(): # not sure about theme_color but whateva return jsonify({ "short_name": const.appName, "name": cfg['instance']['title'], "icons": [ { "src": url_for("static", filename="icons/favicon/android-chrome-192x192.png"), "sizes": "192x192", "type": "image/png" }, { "src": url_for("static", filename="icons/favicon/android-chrome-512x512.png"), "sizes": "512x512", "type": "image/png" } ], "start_url": "/", "display": "standalone", "theme_color": cfg['style']['accentLight'], "background_color": "" }) # -- question routes -- @api_bp.route('/add_question/', methods=['POST']) def addQuestion(): from_who = request.form.get('from_who', cfg['anonName']) question = request.form.get('question', '') cw = request.form.get('cw', '') if not question: abort(400, "Question field must not be empty") if len(question) > int(cfg['charLimit']) or len(from_who) > int(cfg['charLimit']): abort(400, "Question exceeds the character limit") return func.addQuestion(from_who, question, cw) @api_bp.route('/delete_question/', methods=['DELETE']) @loginRequired def deleteQuestion(): question_id = request.args.get('question_id', '') if not question_id: abort(400, "Missing 'question_id' attribute or 'question_id' is empty") conn = func.connectToDb() cursor = conn.cursor() app.logger.debug("[CatAsk/API/delete_question] DELETE'ing a question from database") cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,)) cursor.close() conn.close() return {'message': 'Successfully deleted question.'}, 200 @api_bp.route('/return_to_inbox/', methods=['POST']) @loginRequired def returnToInbox(): question_id = request.args.get('question_id', '') if not question_id: abort(400, "Missing 'question_id' attribute or 'question_id' is empty") conn = func.connectToDb() cursor = conn.cursor() app.logger.debug("[CatAsk/API/return_to_inbox] SELECT'ing a question from database") cursor.execute("SELECT from_who, content, creation_date, cw FROM questions WHERE id=%s", (question_id,)) row = cursor.fetchone() question = { 'from_who': row[0], 'content': row[1], 'creation_date': row[2], 'cw': row[3] } app.logger.debug("[CatAsk/API/return_to_inbox] DELETE'ing a question from database") cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,)) app.logger.debug("[CatAsk/API/return_to_inbox] INSERT'ing a question into database") cursor.execute("INSERT INTO questions (from_who, content, creation_date, answered, cw) VALUES (%s, %s, %s, %s, %s)", (question["from_who"], question["content"], question["creation_date"], False, question['cw'])) cursor.close() conn.close() return {'message': 'Successfully returned question to inbox.'}, 200 @api_bp.route('/pin_question/', methods=['POST']) @loginRequired def pinQuestion(): question_id = request.args.get('question_id', '') if not question_id: abort(400, "Missing 'question_id' attribute or 'question_id' is empty") conn = func.connectToDb() cursor = conn.cursor() app.logger.debug("[CatAsk/API/pin_question] UPDATE'ing a question to pin it") cursor.execute("UPDATE questions SET pinned=%s WHERE id=%s", (True, question_id)) cursor.close() conn.close() return {'message': 'Successfully pinned question.'}, 200 @api_bp.route('/unpin_question/', methods=['POST']) @loginRequired def unpinQuestion(): question_id = request.args.get('question_id', '') if not question_id: abort(400, "Missing 'question_id' attribute or 'question_id' is empty") conn = func.connectToDb() cursor = conn.cursor() app.logger.debug("[CatAsk/API/unpin_question] UPDATE'ing a question to unpin it") cursor.execute("UPDATE questions SET pinned=%s WHERE id=%s", (False, question_id)) cursor.close() conn.close() return {'message': 'Successfully unpinned question.'}, 200 @api_bp.route('/add_answer/', methods=['POST']) @loginRequired def addAnswer(): question_id = request.args.get('question_id', '') answer = request.form.get('answer') cw = request.form.get('cw', '') if not question_id: abort(400, "Missing 'question_id' attribute or 'question_id' is empty") if not answer: abort(400, "Missing 'answer' attribute or 'answer' is empty") conn = func.connectToDb() try: cursor = conn.cursor() app.logger.debug("[CatAsk/API/add_answer] INSERT'ing an answer into database") cursor.execute("INSERT INTO answers (question_id, content, cw) VALUES (%s, %s, %s)", (question_id, answer, cw)) answer_id = cursor.lastrowid app.logger.debug("[CatAsk/API/add_answer] UPDATE'ing question to set answered and answer_id") cursor.execute("UPDATE questions SET answered=%s, answer_id=%s WHERE id=%s", (True, answer_id, question_id)) conn.commit() except Exception as e: conn.rollback() return jsonify({'error': str(e)}), 500 finally: cursor.close() conn.close() return jsonify({'message': 'Answer added successfully!'}), 201 # -- uploaders -- @api_bp.route('/upload_favicon/', methods=['POST']) @loginRequired def uploadFavicon(): favicon = request.files.get('favicon') if favicon and func.allowedFile(favicon.filename): filename = secure_filename(favicon.filename) favicon_path = const.faviconDir / filename favicon.save(favicon_path) func.generateFavicon(filename) return {'message': 'Successfully updated favicon!'}, 201 elif favicon and not func.allowedFile(favicon.filename): return {'error': 'File type is not supported'}, 400 elif not favicon: return {'error': "favicon is not specified"}, 400 @api_bp.route('/upload_emoji/', methods=['POST']) @loginRequired def uploadEmoji(): if 'emoji' not in request.files: return jsonify({'error': 'No file part in the request'}), 400 emoji = request.files['emoji'] if emoji.filename == '': return jsonify({'error': 'No file selected for uploading'}), 400 if not func.allowedFile(emoji.filename): return jsonify({'error': 'Invalid file type. Only png, jpg, jpeg, webp, bmp, jxl supported'}), 400 # Secure the filename and determine the archive name filename = secure_filename(emoji.filename) emoji_path = const.emojiPath / filename emoji.save(emoji_path) return jsonify({'message': f'Emoji {filename} successfully uploaded'}), 201 @api_bp.route('/upload_emoji_pack/', methods=['POST']) @loginRequired def uploadEmojiPack(): if 'emoji_archive' not in request.files: return jsonify({'error': 'No file part in the request'}), 400 emoji_archive = request.files['emoji_archive'] if emoji_archive.filename == '': return jsonify({'error': 'No file selected for uploading'}), 400 if not func.allowedArchive(emoji_archive.filename): return jsonify({'error': 'Invalid file type. Only .zip, .tar, .tar.gz, .tar.bz2 allowed'}), 400 filename = secure_filename(emoji_archive.filename) archive_name = func.stripArchExtension(filename) extract_path = const.emojiPath / archive_name extract_path.mkdir(parents=True, exist_ok=True) archive_path = extract_path / filename emoji_archive.save(archive_path) try: if zipfile.is_zipfile(archive_path): with zipfile.ZipFile(archive_path, 'r') as zip_ref: zip_ref.extractall(extract_path) elif tarfile.is_tarfile(archive_path): with tarfile.open(archive_path, 'r:*') as tar_ref: tar_ref.extractall(extract_path) else: return jsonify({'error': 'Unsupported archive format'}), 400 # parse meta.json if it exists meta_json_path = extract_path / 'meta.json' if meta_json_path.exists(): processed_emojis = func.processEmojis(meta_json_path) # emoji_metadata = func.loadJSON(meta_json_path) # emojis = emoji_metadata.get('emojis', []) # processed_emojis = [] # for emoji in emojis: # emoji_info = { # 'name': emoji['emoji']['name'], # 'file_name': emoji['fileName'] # } # processed_emojis.append(emoji_info) # app.logger.debug(f"[CatAsk/API/upload_emoji_pack] Processed emoji: {emoji_info['name']} (File: {emoji_info['file_name']})") # pack_json = { # 'name': emoji_metadata['emojis'][0]['emoji']['category'].capitalize(), # 'exportedAt': emoji_metadata["exportedAt"], # 'emojis': processed_emojis # } # pack_json_name = const.emojiPath / (emoji_metadata['emojis'][0]['emoji']['category'] + '.json') # func.saveJSON(pack_json, pack_json_name) return jsonify({'message': f'Successfully uploaded and processed {len(processed_emojis)} emojis from archive "{filename}".'}), 201 else: return jsonify({'message': f'Archive {filename} successfully uploaded and extracted.'}), 201 except Exception as e: return jsonify({'error': str(e)}), 500 finally: archive_path.unlink() # -- getters -- # this isn't used anywhere yet, but maybe it will in the future @api_bp.route('/get_emojis/', methods=['GET']) @loginRequired def getEmojis(): return func.listEmojis() @api_bp.route('/hyper/get_emoji_packs/', methods=['GET']) @loginRequired def getEmojiPacks(): index = int(request.args.get('index', '')) packs = func.listEmojiPacks() pack = packs[index] rel_path = pack['relative_path'] emojis = pack['emojis'] html = '' for emoji in emojis: html += f"""
{ emoji['name'] }
""" return html # -- delete routes -- @api_bp.route('/delete_emoji/', methods=['DELETE']) @loginRequired def deleteEmoji(): emoji_name = request.args.get('emoji_name') emoji_base_path = const.emojiPath emoji_file_path = emoji_base_path / emoji_name print(emoji_file_path) emoji_ext = os.path.splitext(f"{emoji_base_path}/{emoji_file_path}") print("emoji ext:", emoji_ext) if not emoji_file_path.exists() or not emoji_file_path.is_file(): return jsonify({'error': 'Emoji not found'}), 404 try: emoji_file_path.unlink() return jsonify({'message': f'Emoji "{emoji_name}" deleted successfully'}), 200 except Exception as e: return jsonify({'error': str(e)}), 500 @api_bp.route('/delete_emoji_pack/', methods=['DELETE']) @loginRequired def deleteEmojiPack(): pack_name = request.args.get('pack_name') emojis_path = const.emojiPath emoji_base_path = Path.cwd() / 'static' / 'emojis' / pack_name.lower() if not emoji_base_path.exists() or not emoji_base_path.is_dir(): return jsonify({'error': f'Emoji pack "{pack_name.capitalize()}" not found'}), 404 try: for json_file in emojis_path.glob(f'{pack_name.lower()}.json'): json_file.unlink() shutil.rmtree(emoji_base_path) return jsonify({'message': f'Emoji pack "{pack_name.capitalize()}" deleted successfully.'}), 200 except Exception as e: return jsonify({'error': str(e)}), 500 # -- misc -- @api_bp.route('/get_question_count/', methods=['GET']) def getQuestionCount(): conn = func.connectToDb() cursor = conn.cursor() app.logger.debug("[CatAsk/API/get_question_count] SELECT'ing question count from database") cursor.execute("SELECT COUNT(id) FROM questions WHERE answered=%s", (False,)) question_count = cursor.fetchone() cursor.close() conn.close() return str(question_count[0]) # unused """ @api_bp.route('/view_question/', methods=['GET']) @loginRequired def viewQuestion(): question_id = request.args.get('question_id', '') if not question_id: abort(400, "Missing 'question_id' attribute or 'question_id' is empty") conn = func.connectToDb() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT id, from_who, creation_date, content, answered, answer_id FROM questions WHERE id=%s", (question_id,)) question = cursor.fetchone() cursor.close() conn.close() return jsonify(question) @api_bp.route('/view_answer/', methods=['GET']) @loginRequired def viewAnswer(): answer_id = request.args.get('answer_id', '') if not answer_id: abort(400, "Missing 'answer_id' attribute or 'answer_id' is empty") conn = func.connectToDb() cursor = conn.cursor(dictionary=True) cursor.execute("SELECT id, question_id, creation_date, content FROM answers WHERE id=%s", (answer_id,)) answer = cursor.fetchone() cursor.close() conn.close() return jsonify(answer) """ @api_bp.route('/update_config/', methods=['POST']) @loginRequired def updateConfig(): cfg = func.loadJSON(const.configFile) configuration = request.form for key, value in configuration.items(): cleaned_key = key.removeprefix('_') if key.startswith('_') else key nested_keys = cleaned_key.split('.') current_dict = cfg for nested_key in nested_keys[:-1]: current_dict = current_dict.setdefault(nested_key, {}) # Convert the checkbox value 'True'/'False' strings to actual booleans if value.lower() == 'true': value = True elif value.lower() == 'false': value = False current_dict[nested_keys[-1]] = value func.saveJSON(cfg, const.configFile) app.config.update(cfg) return {'message': 'Changes saved!'} app.register_blueprint(api_bp, url_prefix='/api/v1') app.register_blueprint(admin_bp, url_prefix='/admin') if __name__ == '__main__': app.run(debug=True)