from collections import defaultdict
from contextlib import contextmanager
from functools import wraps
import hmac
import json
import os
import urllib

from dotenv import load_dotenv
from flask import (
    Flask, Blueprint, jsonify, request, abort, render_template, flash,
    session, redirect, url_for,
)
from flask_compress import Compress
from mysql.connector import errorcode
from werkzeug.utils import secure_filename
import mysql.connector

import constants as const
import functions as func

# used for admin routes
# NOTE: kept only for backward compatibility. It is refreshed by
# before_request(), but a module global is shared by every concurrent request,
# so authorization decisions below read the session directly instead.
logged_in = False

load_dotenv()
app = Flask(const.appName)
app.secret_key = os.environ.get("APP_SECRET")
cfg = func.loadJSON(const.configFile)
app.config.from_mapping(cfg)
app.config.update(cfg)

# compress to improve page load speed
Compress(app)

# -- blueprints --
api_bp = Blueprint('api', const.appName)
admin_bp = Blueprint('admin', const.appName)


# -- helpers --
@contextmanager
def _dbCursor(commit=False, **cursor_kwargs):
    """Yield a cursor on a fresh connection and always release both.

    commit: when True, commit after the body succeeds and roll back if it
        raises. mysql.connector does not autocommit by default, so write
        routes must commit explicitly (a no-op if connectToDb() already
        enables autocommit).
    cursor_kwargs: forwarded to conn.cursor(), e.g. dictionary=True.
    """
    conn = func.connectToDb()
    cursor = None
    try:
        cursor = conn.cursor(**cursor_kwargs)
        yield cursor
        if commit:
            conn.commit()
    except Exception:
        if commit:
            conn.rollback()
        raise
    finally:
        if cursor is not None:
            cursor.close()
        conn.close()


def _requireQuestionId():
    """Return the 'question_id' query parameter or abort with HTTP 400."""
    question_id = request.args.get('question_id', '')
    if not question_id:
        abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
    return question_id


def _passwordMatches(candidate):
    """Compare a submitted password with ADMIN_PASSWORD in constant time.

    Returns False when either side is missing or empty, so an unset
    ADMIN_PASSWORD can never be matched by a request without the field.
    """
    expected = os.environ.get('ADMIN_PASSWORD')
    if not expected or not candidate:
        return False
    # compare_digest() only accepts ASCII str, so compare encoded bytes
    return hmac.compare_digest(candidate.encode('utf-8'), expected.encode('utf-8'))


def _setPinned(question_id, pinned):
    """Set the 'pinned' flag of one question and commit the change."""
    with _dbCursor(commit=True) as cursor:
        cursor.execute("UPDATE questions SET pinned=%s WHERE id=%s", (pinned, question_id))


# -- cli commands --
@app.cli.command("init-db")
def initDatabase():
    """Create the database if it is missing and load schema.sql into it."""
    dbName = os.environ.get("DB_NAME")
    print(f"Attempting to connect to database {dbName}...")
    try:
        conn = func.connectToDb()
        cursor = conn.cursor()
        print("Connected successfully")
        conn.database = dbName
    except mysql.connector.Error as error:
        if error.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print("Bad credentials")
            # nothing below can work without a connection
            return
        elif error.errno == errorcode.ER_BAD_DB_ERROR:
            dbPort = os.environ.get("DB_PORT") or 3306
            # connect through the always-present 'mysql' database to create ours
            conn = mysql.connector.connect(
                user=os.environ.get("DB_USER"),
                password=os.environ.get("DB_PASS"),
                host=os.environ.get("DB_HOST"),
                port=dbPort,
                database='mysql'
            )
            cursor = conn.cursor()
            func.createDatabase(cursor, dbName)
            conn.database = dbName
        else:
            print("Error:", error)
            return

    try:
        with open('schema.sql', 'r') as schema_file:
            schema = schema_file.read()
        # With multi=True execute() returns an iterator; the statements are
        # only sent to the server as the iterator is consumed.
        results = cursor.execute(schema, multi=True)
        if results is not None:
            for _ in results:
                pass
        conn.commit()
        print(f"Database {dbName} was successfully initialized!")
    except mysql.connector.Error as error:
        print(f"Failed to initialize database {dbName}: {error}")
    finally:
        cursor.close()
        conn.close()


# -- decorators --
def loginRequired(f):
    """Hide the wrapped view behind a 404 unless the session is logged in."""
    @wraps(f)
    def login_required(*args, **kwargs):
        # read the per-request session, not the shared module global
        if not session.get('logged_in'):
            return abort(404)
        return f(*args, **kwargs)
    return login_required


# -- before request --
@app.before_request
def before_request():
    """Mirror the session's login flag into the legacy module global."""
    global logged_in
    logged_in = session.get('logged_in')


# -- context processors --
@app.context_processor
def inject_stuff():
    """Expose config, constants and the login state to every template."""
    cfg = func.loadJSON(const.configFile)
    # for 1.6.0 or later
    # questionCount = getQuestionCount()
    return dict(
        metadata=func.generateMetadata(),
        len=len,
        str=str,
        const=const,
        cfg=cfg,
        logged_in=session.get('logged_in'),
        version_id=const.version_id,
        version=const.version,
        appName=const.appName,
    )


# -- template filters --
@app.template_filter('render_markdown')
def render_markdown(text):
    """Template filter delegating to func.renderMarkdown()."""
    return func.renderMarkdown(text)


# -- client (frontend) routes --
@app.route('/', methods=['GET'])
def index():
    """Render answered questions (pinned first) together with their answers."""
    with _dbCursor(dictionary=True) as cursor:
        cursor.execute("SELECT * FROM questions WHERE answered=%s AND pinned=%s ORDER BY creation_date DESC", (True, True))
        pinned_questions = cursor.fetchall()

        cursor.execute("SELECT * FROM questions WHERE answered=%s AND pinned=%s ORDER BY creation_date DESC", (True, False))
        non_pinned_questions = cursor.fetchall()

        cursor.execute("SELECT * FROM answers ORDER BY creation_date DESC")
        answers = cursor.fetchall()

    questions = pinned_questions + non_pinned_questions
    metadata = func.generateMetadata()

    # group answers once instead of rescanning the full list per question
    answers_by_question = defaultdict(list)
    for answer in answers:
        answers_by_question[answer['question_id']].append(answer)

    combined = [
        {
            'question': question,
            'answers': answers_by_question.get(question['id'], []),
        }
        for question in questions
    ]

    return render_template(
        'index.html',
        combined=combined,
        urllib=urllib,
        trimContent=func.trimContent,
        metadata=metadata,
        getRandomWord=func.getRandomWord,
        formatRelativeTime=func.formatRelativeTime,
    )


@app.route('/inbox/', methods=['GET'])
@loginRequired
def inbox():
    """Render the admin inbox of unanswered questions, newest first."""
    with _dbCursor(dictionary=True) as cursor:
        cursor.execute("SELECT * FROM questions WHERE answered=%s ORDER BY creation_date DESC", (False,))
        questions = cursor.fetchall()
    return render_template('inbox.html', questions=questions, formatRelativeTime=func.formatRelativeTime)


# The URL must carry the id: the view takes 'question_id' as an argument.
@app.route('/q/<int:question_id>/', methods=['GET'])
def viewQuestion(question_id):
    """Render a single question and its answer."""
    question = func.getQuestion(question_id)
    # NOTE(review): assumes getQuestion() returns a falsy value for an
    # unknown id - confirm against functions.py
    if not question:
        abort(404)
    answer = func.getAnswer(question_id)
    metadata = func.generateMetadata(question, answer)
    return render_template(
        'view_question.html',
        question=question,
        urllib=urllib,
        answer=answer,
        metadata=metadata,
        formatRelativeTime=func.formatRelativeTime,
        trimContent=func.trimContent,
    )


# TODO: implement this and private questions should be here too
"""
@app.route('/questions/', methods=['GET'])
def seeAskedQuestions():
    conn = func.connectToDb()
    cursor = conn.cursor(dictionary=True)
    cursor.execute("SELECT * FROM questions WHERE asker_id=%s ORDER BY creation_date DESC", (asker_id,))
    questions = cursor.fetchall()
    cursor.close()
    conn.close()
    return render_template('asked_questions.html', questions=questions, formatRelativeTime=func.formatRelativeTime)
"""


# -- admin client routes --
@admin_bp.route('/login/', methods=['GET', 'POST'])
def login():
    """Show the admin login form (GET) or verify the password (POST)."""
    if request.method == 'POST':
        admin_password = request.form.get('admin_password')
        if _passwordMatches(admin_password):
            session['logged_in'] = True
            return redirect(url_for('index'))
        flash("Wrong password", 'danger')
        return redirect(url_for('admin.login'))

    if session.get('logged_in'):
        # url_for() is required: a bare 'index' is a relative URL, not an endpoint
        return redirect(url_for('index'))
    return render_template('admin/login.html')


@admin_bp.route('/logout/')
@loginRequired
def logout():
    """Clear the login flag and return to the public index."""
    session['logged_in'] = False
    return redirect(url_for('index'))


# NOTE: this rebinds the module-level name 'index'; the public view stays
# registered because Flask stores the function when the decorator runs.
@admin_bp.route('/', methods=['GET', 'POST'])
@loginRequired
def index():
    """Render the admin dashboard (GET) or overwrite the blacklist file (POST)."""
    if request.method == 'POST':
        # default to '' so a missing field empties the file instead of crashing
        blacklist = request.form.get('blacklist', '')
        with open(const.blacklistFile, 'w') as file:
            file.write(blacklist)
        return {'message': 'Changes saved!'}, 200

    blacklist = func.readPlainFile(const.blacklistFile)
    if blacklist == []:
        blacklist = ''
    cfg_vars = func.loadJSON(const.configFile)
    return render_template('admin/index.html', blacklist=blacklist, cfg=cfg_vars)


# TODO: implement first-launch setup route
"""
@admin_bp.route('/post-install/', methods=['GET', 'POST'])
@loginRequired
def postInstall():
    if config.postInstallCompleted:
        return abort(404)
    else:
        pass
"""


# -- server routes --
@api_bp.errorhandler(404)
def notFound(e=None):
    """JSON 404 response; Flask passes the raised exception as 'e'."""
    return jsonify({'error': 'Not found'}), 404


@api_bp.errorhandler(400)
def badRequest(e):
    """JSON 400 response carrying the error description."""
    return jsonify({'error': str(e)}), 400


@api_bp.errorhandler(500)
def internalServerError(e):
    """JSON 500 response carrying the error description."""
    return jsonify({'error': str(e)}), 500


@api_bp.route('/add_question/', methods=['POST'])
def addQuestion():
    """Validate a submitted question and store it as unanswered."""
    # reload the config so limits changed in the admin panel apply immediately
    current_cfg = func.loadJSON(const.configFile)
    char_limit = int(current_cfg['charLimit'])

    from_who = request.form.get('from_who', current_cfg['anonName'])
    question = request.form.get('question', '')
    antispam = request.form.get('antispam', '')
    # reserved for version 1.5.0 or later
    # private = request.form.get('private')

    if not question:
        abort(400, "Question field must not be empty")
    if not antispam:
        abort(400, "Anti-spam word must not be empty")
    if len(question) > char_limit or len(from_who) > char_limit:
        abort(400, "Question exceeds the character limit")

    antispam_wordlist = func.readPlainFile(const.antiSpamFile, split=True)
    if antispam not in antispam_wordlist:
        return {'error': 'An error has occurred'}, 500

    blacklist = func.readPlainFile(const.blacklistFile, split=True)
    # skip empty entries: '' is a substring of every string
    if any(bad_word and (bad_word in question or bad_word in from_who) for bad_word in blacklist):
        # return a generic error message so bad actors wouldn't figure out the blacklist
        return {'error': 'An error has occurred'}, 500

    with _dbCursor(commit=True) as cursor:
        cursor.execute("INSERT INTO questions (from_who, content, answered) VALUES (%s, %s, %s)", (from_who, question, False,))

    return {'message': 'Question asked successfully!'}, 201


@api_bp.route('/delete_question/', methods=['DELETE'])
@loginRequired
def deleteQuestion():
    """Delete the question named by the 'question_id' query parameter."""
    question_id = _requireQuestionId()
    with _dbCursor(commit=True) as cursor:
        cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,))
    return {'message': 'Successfully deleted question.'}, 200


@api_bp.route('/return_to_inbox/', methods=['POST'])
@loginRequired
def returnToInbox():
    """Re-create a question as unanswered, keeping its author, text and date.

    The row is deleted and re-inserted inside one transaction, so a failed
    insert cannot lose the question. Responds 404 if the id is unknown.
    """
    question_id = _requireQuestionId()

    with _dbCursor(commit=True) as cursor:
        cursor.execute("SELECT from_who, content, creation_date FROM questions WHERE id=%s", (question_id,))
        row = cursor.fetchone()
        if row is None:
            abort(404)
        from_who, content, creation_date = row

        cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,))
        cursor.execute("INSERT INTO questions (from_who, content, creation_date, answered) VALUES (%s, %s, %s, %s)", (from_who, content, creation_date, False,))

    return {'message': 'Successfully returned question to inbox.'}, 200


@api_bp.route('/pin_question/', methods=['POST'])
@loginRequired
def pinQuestion():
    """Pin the question named by the 'question_id' query parameter."""
    _setPinned(_requireQuestionId(), True)
    return {'message': 'Successfully pinned question.'}, 200


@api_bp.route('/unpin_question/', methods=['POST'])
@loginRequired
def unpinQuestion():
    """Unpin the question named by the 'question_id' query parameter."""
    _setPinned(_requireQuestionId(), False)
    return {'message': 'Successfully unpinned question.'}, 200
@api_bp.route('/add_answer/', methods=['POST'])
@loginRequired
def addAnswer():
    """Store an answer and mark its question as answered in one transaction."""
    question_id = request.args.get('question_id', '')
    answer = request.form.get('answer')

    if not question_id:
        abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
    if not answer:
        abort(400, "Missing 'answer' attribute or 'answer' is empty")

    conn = func.connectToDb()
    # bound before the try so the finally clause is safe if cursor() raises
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute("INSERT INTO answers (question_id, content) VALUES (%s, %s)", (question_id, answer))
        answer_id = cursor.lastrowid
        cursor.execute("UPDATE questions SET answered=%s, answer_id=%s WHERE id=%s", (True, answer_id, question_id))
        conn.commit()
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        if cursor is not None:
            cursor.close()
        conn.close()

    return jsonify({'message': 'Answer added successfully!'}), 201


@api_bp.route('/upload_favicon/', methods=['POST'])
@loginRequired
def uploadFavicon():
    """Save an uploaded favicon and regenerate the derived icons."""
    favicon = request.files.get('favicon')
    if not favicon:
        return {'error': "favicon is not specified"}, 400
    if not func.allowedFile(favicon.filename):
        return {'error': 'File type is not supported'}, 400

    filename = secure_filename(favicon.filename)
    if not filename:
        # secure_filename() can strip a name down to nothing; saving to the
        # bare directory path would fail
        return {'error': 'Invalid file name'}, 400

    favicon_path = const.faviconDir / filename
    favicon.save(favicon_path)
    func.generateFavicon(filename)
    return {'message': 'Successfully updated favicon!'}, 201


@api_bp.route('/get_question_count/', methods=['GET'])
def getQuestionCount():
    """Return the number of unanswered questions as a string."""
    conn = func.connectToDb()
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(id) FROM questions WHERE answered=%s", (False,))
        question_count = cursor.fetchone()
    finally:
        if cursor is not None:
            cursor.close()
        conn.close()
    return str(question_count[0])


# unused
"""
@api_bp.route('/view_question/', methods=['GET'])
@loginRequired
def viewQuestion():
    question_id = request.args.get('question_id', '')
    if not question_id:
        abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
    conn = func.connectToDb()
    cursor = conn.cursor(dictionary=True)
    cursor.execute("SELECT id, from_who, creation_date, content, answered, answer_id FROM questions WHERE id=%s", (question_id,))
    question = cursor.fetchone()
    cursor.close()
    conn.close()
    return jsonify(question)

@api_bp.route('/view_answer/', methods=['GET'])
@loginRequired
def viewAnswer():
    answer_id = request.args.get('answer_id', '')
    if not answer_id:
        abort(400, "Missing 'answer_id' attribute or 'answer_id' is empty")
    conn = func.connectToDb()
    cursor = conn.cursor(dictionary=True)
    cursor.execute("SELECT id, question_id, creation_date, content FROM answers WHERE id=%s", (answer_id,))
    answer = cursor.fetchone()
    cursor.close()
    conn.close()
    return jsonify(answer)
"""


@api_bp.route('/update_config/', methods=['POST'])
@loginRequired
def updateConfig():
    """Merge submitted form fields into the JSON config file and app.config.

    Field names may use dots to address nested keys ("a.b.c") and may carry
    one leading underscore, which is dropped. The strings 'true'/'false'
    (any case) are stored as booleans; everything else is stored as sent.
    """
    new_cfg = func.loadJSON(const.configFile)
    for key, value in request.form.items():
        nested_keys = key.removeprefix('_').split('.')

        # walk (and create) the intermediate dicts down to the leaf's parent
        current_dict = new_cfg
        for nested_key in nested_keys[:-1]:
            current_dict = current_dict.setdefault(nested_key, {})

        # Convert the checkbox value 'True'/'False' strings to actual booleans
        lowered = value.lower()
        if lowered == 'true':
            value = True
        elif lowered == 'false':
            value = False

        current_dict[nested_keys[-1]] = value

    func.saveJSON(new_cfg, const.configFile)
    app.config.update(new_cfg)
    # Refresh the module-level config in place so routes that read it see the
    # new values without a restart.
    cfg.update(new_cfg)
    return {'message': 'Changes saved!'}


app.register_blueprint(api_bp, url_prefix='/api/v1')
app.register_blueprint(admin_bp, url_prefix='/admin')

if __name__ == '__main__':
    app.run(debug=True)