"""Flask application exposing a public Q&A frontend, an admin panel and a JSON API.

The module wires together three groups of routes:

* client routes registered directly on ``app`` (``/``, ``/inbox/``, ``/q/<id>/``),
* admin routes on ``admin_bp`` (mounted under ``/admin``),
* API routes on ``api_bp`` (mounted under ``/api/v1``).
"""
from collections import defaultdict
from contextlib import contextmanager
from functools import wraps
import hmac
import json
import os

from dotenv import load_dotenv
from flask import (
    Flask,
    Blueprint,
    jsonify,
    request,
    abort,
    render_template,
    flash,
    session,
    redirect,
    url_for,
)
from mysql.connector import errorcode
import mysql.connector

import functions as func
import config as cfg
import constants as const

# Retained only for backward compatibility with code that reads this name.
# It is a process-wide value overwritten on every request, so it must NOT be
# used for authorization decisions; those read the per-request session instead.
logged_in = False

load_dotenv()

app = Flask(cfg.appName)
# The secret key signs the session cookie, so it is never printed or logged.
app.secret_key = os.environ.get("APP_SECRET")

# -- blueprints --

api_bp = Blueprint('api', cfg.appName)
admin_bp = Blueprint('admin', cfg.appName)

# -- helpers --

_TRUTHY_STRINGS = frozenset({'1', 'true', 'yes', 'on'})


def _parseBool(value, default=False):
    """Interpret a query-string/environment value as a boolean.

    ``None`` yields ``default``; real booleans pass through; anything else is
    true only if its lower-cased, stripped text is one of 1/true/yes/on.
    """
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() in _TRUTHY_STRINGS


def _isLoggedIn():
    """Return True when the current request's session is flagged as logged in."""
    return bool(session.get('logged_in'))


def _closeAll(cursor, conn):
    """Close the cursor and then the connection, skipping any that are None."""
    if cursor is not None:
        cursor.close()
    if conn is not None:
        conn.close()


@contextmanager
def _dbCursor(commit=False, **cursor_kwargs):
    """Yield a cursor on a fresh connection and always release both.

    Args:
        commit: when True, commit after the ``with`` body succeeds and roll
            back if it raises (including ``abort()``).
        **cursor_kwargs: forwarded to ``conn.cursor()`` (e.g. ``dictionary=True``).
    """
    conn = func.connectToDb()
    try:
        cursor = conn.cursor(**cursor_kwargs)
        try:
            yield cursor
            if commit:
                conn.commit()
        except Exception:
            if commit:
                conn.rollback()
            raise
        finally:
            cursor.close()
    finally:
        conn.close()


# -- cli commands --

@app.cli.command("init-db")
def initDatabase():
    """CLI command ``init-db``: create the database if needed and load schema.sql.

    Progress and failures are reported with ``print``; the command returns
    early on bad credentials or on any connection error other than a missing
    database.
    """
    dbName = os.environ.get("DB_NAME")
    print(f"Attempting to connect to database {dbName}...")

    conn = None
    cursor = None
    try:
        try:
            conn = func.connectToDb()
            cursor = conn.cursor()
            print("Connected successfully")
            conn.database = dbName
        except mysql.connector.Error as error:
            if error.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Bad credentials")
                return
            if error.errno != errorcode.ER_BAD_DB_ERROR:
                print("Error:", error)
                return
            # The database does not exist yet: drop whatever was opened and
            # reconnect through the always-present 'mysql' schema to create it.
            _closeAll(cursor, conn)
            cursor = None
            conn = mysql.connector.connect(
                user=os.environ.get("DB_USER"),
                password=os.environ.get("DB_PASS"),
                host=os.environ.get("DB_HOST"),
                database='mysql'
            )
            cursor = conn.cursor()
            func.createDatabase(cursor, dbName)
            conn.database = dbName

        with open('schema.sql', 'r') as schema_file:
            schema = schema_file.read()

        try:
            # With multi=True execute() returns an iterator; statements only
            # run as it is consumed, so it has to be drained.
            for result in cursor.execute(schema, multi=True):
                if result.with_rows:
                    result.fetchall()
            conn.commit()
            print(f"Database {dbName} was successfully initialized!")
        except mysql.connector.Error as error:
            print(f"Failed to initialize database {dbName}: {error}")
    finally:
        _closeAll(cursor, conn)


# -- decorators --

def loginRequired(f):
    """Decorator: abort with 403 unless the current session is logged in."""
    @wraps(f)
    def login_required(*args, **kwargs):
        if not _isLoggedIn():
            abort(403)
        return f(*args, **kwargs)
    return login_required


# -- before request --

@app.before_request
def before_request():
    """Mirror the session flag into the legacy module-level ``logged_in``."""
    global logged_in
    logged_in = session.get('logged_in')


# -- context processors --

@app.context_processor
def inject_stuff():
    """Expose config, login state, version and app name to every template."""
    return dict(
        cfg=cfg,
        logged_in=session.get('logged_in'),
        version=const.version,
        appName=cfg.appName,
    )


# -- client (frontend) routes --

@app.route('/', methods=['GET'])
def index():
    """Render the home page: answered questions (newest first) with their answers."""
    with _dbCursor(dictionary=True) as cursor:
        cursor.execute(
            "SELECT * FROM questions WHERE answered=%s ORDER BY creation_date DESC",
            (True,),
        )
        questions = cursor.fetchall()
        cursor.execute("SELECT * FROM answers ORDER BY creation_date DESC")
        answers = cursor.fetchall()

    # Group answers once instead of rescanning the whole list per question;
    # insertion order keeps the creation_date DESC ordering within a group.
    answers_by_question = defaultdict(list)
    for answer in answers:
        answers_by_question[answer['question_id']].append(answer)

    combined = [
        {
            'question': question,
            'answers': answers_by_question.get(question['id'], []),
        }
        for question in questions
    ]

    return render_template(
        'index.html',
        combined=combined,
        getRandomWord=func.getRandomWord,
        formatRelativeTime=func.formatRelativeTime,
        str=str,
    )


@app.route('/inbox/', methods=['GET'])
@loginRequired
def inbox():
    """Render the admin inbox listing all unanswered questions."""
    with _dbCursor(dictionary=True) as cursor:
        cursor.execute("SELECT * FROM questions WHERE answered=%s", (False,))
        questions = cursor.fetchall()
    return render_template(
        'inbox.html',
        questions=questions,
        formatRelativeTime=func.formatRelativeTime,
        str=str,
        len=len,
    )


# The URL must carry the id: the view requires a ``question_id`` argument.
# NOTE(review): converter type is unspecified in the original; confirm whether
# ``<int:question_id>`` is the intended form.
@app.route('/q/<question_id>/', methods=['GET'])
def viewQuestion(question_id):
    """Render a single question together with its answer."""
    question = func.getQuestion(question_id)
    answer = func.getAnswer(question_id)
    return render_template(
        'view_question.html',
        question=question,
        answer=answer,
        formatRelativeTime=func.formatRelativeTime,
        str=str,
    )


# -- admin client routes --

@admin_bp.route('/login/', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or check the submitted admin password (POST)."""
    if request.method == 'POST':
        submitted = request.form.get('admin_password') or ''
        expected = os.environ.get('ADMIN_PASSWORD') or ''
        # Both values must be non-empty: otherwise a missing form field would
        # match an unset ADMIN_PASSWORD. compare_digest avoids timing leaks.
        if submitted and expected and hmac.compare_digest(
            submitted.encode('utf-8'), expected.encode('utf-8')
        ):
            session['logged_in'] = True
            return redirect(url_for('admin.index'))
        flash("Wrong password", 'danger')
        return redirect(url_for('admin.login'))

    if _isLoggedIn():
        return redirect(url_for('admin.index'))
    return render_template('admin/login.html')


@admin_bp.route('/logout/')
@loginRequired
def logout():
    """Clear the login flag and return to the public home page."""
    session['logged_in'] = False
    return redirect(url_for('index'))


# Explicit endpoint keeps url_for('admin.index') working while avoiding a
# second module-level function called ``index``.
@admin_bp.route('/', methods=['GET', 'POST'], endpoint='index')
@loginRequired
def adminIndex():
    """Admin dashboard: display the word blacklist (GET) or save it (POST)."""
    if request.method == 'POST':
        action = request.form.get('action')
        if action == 'update_word_blacklist':
            # Default to '' so a missing field cannot make write() raise.
            blacklist = request.form.get('blacklist', '')
            with open(const.blacklistFile, 'w') as file:
                file.write(blacklist)
            flash("Changes saved!", 'success')
        return redirect(url_for('admin.index'))

    blacklist = func.readPlainFile(const.blacklistFile)
    return render_template('admin/index.html', blacklist=blacklist)


# -- server routes --

# Flask always passes the error to a handler, so each one accepts it.
@api_bp.errorhandler(404)
def notFound(e):
    """Return a JSON body for 404 errors raised inside the API blueprint."""
    return jsonify({'error': 'Not found'}), 404


@api_bp.errorhandler(400)
def badRequest(e):
    """Return a JSON body for 400 errors raised inside the API blueprint."""
    return jsonify({'error': str(e)}), 400


@api_bp.errorhandler(403)
def forbidden(e):
    """Return a JSON body for 403 errors raised inside the API blueprint."""
    return jsonify({'error': str(e)}), 403


@api_bp.errorhandler(500)
def internalServerError(e):
    """Return a JSON body for 500 errors raised inside the API blueprint."""
    return jsonify({'error': str(e)}), 500


@api_bp.route('/add_question/', methods=['POST'])
def addQuestion():
    """Store a new unanswered question after anti-spam and blacklist checks.

    Form fields: ``from_who`` (defaults to 'Anonymous'), ``question``,
    ``antispam``. Aborts with 400 on validation failure and with 500 when the
    question is blacklisted.
    """
    from_who = request.form.get('from_who', 'Anonymous')
    question = request.form.get('question', '')
    antispam = request.form.get('antispam', '')

    if not question:
        abort(400, "Question field must not be empty")
    if not antispam:
        abort(400, "Anti-spam word must not be empty")

    antispam_wordlist = func.readPlainFile(const.antiSpamFile, split=True)
    if antispam not in antispam_wordlist:
        abort(400, "Anti-spam is not valid")

    blacklist = func.readPlainFile(const.blacklistFile, split=True)
    # NOTE(review): this only rejects a question that is exactly equal to a
    # blacklist entry; confirm whether matching words inside it was intended.
    if question in blacklist:
        abort(500, "An error has occurred")

    with _dbCursor(commit=True) as cursor:
        cursor.execute(
            "INSERT INTO questions (from_who, content, answered) VALUES (%s, %s, %s)",
            (from_who, question, False,),
        )

    return {'message': 'Question asked successfully!'}, 201


@api_bp.route('/delete_question/', methods=['DELETE'])
@loginRequired
def deleteQuestion():
    """Delete the question identified by the ``question_id`` query parameter."""
    question_id = request.args.get('question_id', '')
    if not question_id:
        abort(400, "Missing 'question_id' attribute or 'question_id' is empty")

    with _dbCursor(commit=True) as cursor:
        cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,))

    return {'message': 'Successfully deleted question.'}, 200


@api_bp.route('/return_to_inbox/', methods=['POST'])
@loginRequired
def returnToInbox():
    """Move a question back to the inbox by re-inserting it as unanswered.

    The existing row is deleted and a new unanswered row with the same
    ``from_who``/``content`` is inserted, in a single transaction. Aborts with
    404 when no question has the given id.
    """
    question_id = request.args.get('question_id', '')
    if not question_id:
        abort(400, "Missing 'question_id' attribute or 'question_id' is empty")

    with _dbCursor(commit=True) as cursor:
        cursor.execute(
            "SELECT from_who, content FROM questions WHERE id=%s",
            (question_id,),
        )
        row = cursor.fetchone()
        if row is None:
            abort(404, "Question not found")
        from_who, content = row[0], row[1]

        cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,))
        cursor.execute(
            "INSERT INTO questions (from_who, content, answered) VALUES (%s, %s, %s)",
            (from_who, content, False,),
        )

    return {'message': 'Successfully returned question to inbox.'}, 200


@api_bp.route('/add_answer/', methods=['POST'])
@loginRequired
def addAnswer():
    """Store an answer and mark its question as answered, atomically.

    ``question_id`` comes from the query string and ``answer`` from the form.
    Database errors are rolled back and reported as a JSON 500 response.
    """
    question_id = request.args.get('question_id', '')
    answer = request.form.get('answer')

    if not question_id:
        abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
    if not answer:
        abort(400, "Missing 'answer' attribute or 'answer' is empty")

    try:
        with _dbCursor(commit=True) as cursor:
            cursor.execute(
                "INSERT INTO answers (question_id, content) VALUES (%s, %s)",
                (question_id, answer),
            )
            answer_id = cursor.lastrowid
            cursor.execute(
                "UPDATE questions SET answered=%s, answer_id=%s WHERE id=%s",
                (True, answer_id, question_id),
            )
    except mysql.connector.Error as e:
        return jsonify({'error': str(e)}), 500

    return jsonify({'message': 'Answer added successfully!'}), 201


@api_bp.route('/all_questions/', methods=['GET'])
def listQuestions():
    """List questions filtered by the boolean ``answered`` query parameter.

    ``answered`` defaults to false; 1/true/yes/on (case-insensitive) mean true.
    """
    # Query-string values are text; convert before handing them to SQL so
    # that e.g. ``?answered=true`` is not compared as a string.
    answered = _parseBool(request.args.get('answered'), default=False)

    with _dbCursor(dictionary=True) as cursor:
        cursor.execute(
            "SELECT id, from_who, creation_date, content, answered, answer_id "
            "FROM questions WHERE answered=%s",
            (answered,),
        )
        questions = cursor.fetchall()

    return jsonify(questions)


@api_bp.route('/all_answers/', methods=['GET'])
def listAnswers():
    """Return every answer as JSON."""
    with _dbCursor(dictionary=True) as cursor:
        cursor.execute("SELECT id, question_id, creation_date, content FROM answers")
        answers = cursor.fetchall()
    return jsonify(answers)


# Explicit endpoint keeps 'api.viewQuestion' resolvable while avoiding a
# second module-level function called ``viewQuestion``.
@api_bp.route('/view_question/', methods=['GET'], endpoint='viewQuestion')
def apiViewQuestion():
    """Return the question with the given ``question_id`` as JSON (null if absent)."""
    question_id = request.args.get('question_id', '')
    if not question_id:
        abort(400, "Missing 'question_id' attribute or 'question_id' is empty")

    with _dbCursor(dictionary=True) as cursor:
        cursor.execute(
            "SELECT id, from_who, creation_date, content, answered, answer_id "
            "FROM questions WHERE id=%s",
            (question_id,),
        )
        question = cursor.fetchone()

    return jsonify(question)


@api_bp.route('/view_answer/', methods=['GET'])
def viewAnswer():
    """Return the answer with the given ``answer_id`` as JSON (null if absent)."""
    answer_id = request.args.get('answer_id', '')
    if not answer_id:
        abort(400, "Missing 'answer_id' attribute or 'answer_id' is empty")

    with _dbCursor(dictionary=True) as cursor:
        cursor.execute(
            "SELECT id, question_id, creation_date, content FROM answers WHERE id=%s",
            (answer_id,),
        )
        answer = cursor.fetchone()

    return jsonify(answer)


app.register_blueprint(api_bp, url_prefix='/api/v1')
app.register_blueprint(admin_bp, url_prefix='/admin')

if __name__ == '__main__':
    # The interactive debugger allows code execution, so it is opt-in via
    # FLASK_DEBUG rather than always on.
    app.run(debug=_parseBool(os.environ.get("FLASK_DEBUG")))