catask/app.py
2024-08-28 12:21:31 +03:00

327 lines
10 KiB
Python

from flask import Flask, Blueprint, jsonify, request, abort, render_template, flash, session, redirect, url_for
from dotenv import load_dotenv
from mysql.connector import errorcode
from functools import wraps
import mysql.connector
import functions as func
import os
import json
import config as cfg
import constants as const
# used for admin routes
logged_in = False
load_dotenv()
app = Flask(cfg.appName)
app.secret_key = os.environ.get("APP_SECRET")
# -- blueprints --
api_bp = Blueprint('api', cfg.appName)
admin_bp = Blueprint('admin', cfg.appName)
# -- cli commands --
@app.cli.command("init-db")
def initDatabase():
dbName = os.environ.get("DB_NAME")
print(f"Attempting to connect to database {dbName}...")
try:
conn = func.connectToDb()
cursor = conn.cursor()
print("Connected successfully")
conn.database = dbName
except mysql.connector.Error as error:
if error.errno == errorcode.ER_ACCESS_DENIED_ERROR:
print("Bad credentials")
elif error.errno == errorcode.ER_BAD_DB_ERROR:
conn = mysql.connector.connect(
user=os.environ.get("DB_USER"),
password=os.environ.get("DB_PASS"),
host=os.environ.get("DB_HOST"),
database='mysql'
)
cursor = conn.cursor()
func.createDatabase(cursor, dbName)
conn.database = dbName
else:
print("Error:", error)
return
with open('schema.sql', 'r') as schema_file:
schema = schema_file.read()
try:
cursor.execute(schema, multi=True)
print(f"Database {dbName} was successfully initialized!")
except mysql.connector.Error as error:
print(f"Failed to initialize database {dbName}: {error}")
finally:
cursor.close()
conn.close()
# -- decorators --
def loginRequired(f):
@wraps(f)
def login_required(*args, **kwargs):
if not logged_in:
return abort(403)
return f(*args, **kwargs)
return login_required
# -- before request --
@app.before_request
def before_request():
global logged_in
logged_in = session.get('logged_in')
# -- context processors --
@app.context_processor
def inject_stuff():
return dict(cfg=cfg, logged_in=logged_in, version=const.version, appName=cfg.appName)
# -- client (frontend) routes --
@app.route('/', methods=['GET'])
def index():
conn = func.connectToDb()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM questions WHERE answered=%s ORDER BY creation_date DESC", (True,))
questions = cursor.fetchall()
cursor.execute("SELECT * FROM answers ORDER BY creation_date DESC")
answers = cursor.fetchall()
combined = []
for question in questions:
question_answers = [answer for answer in answers if answer['question_id'] == question['id']]
combined.append({
'question': question,
'answers': question_answers
})
cursor.close()
conn.close()
return render_template('index.html', combined=combined, getRandomWord=func.getRandomWord, formatRelativeTime=func.formatRelativeTime, str=str)
@app.route('/inbox/', methods=['GET'])
@loginRequired
def inbox():
conn = func.connectToDb()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM questions WHERE answered=%s", (False,))
questions = cursor.fetchall()
cursor.close()
conn.close()
return render_template('inbox.html', questions=questions, formatRelativeTime=func.formatRelativeTime, str=str, len=len)
@app.route('/q/<int:question_id>/', methods=['GET'])
def viewQuestion(question_id):
question = func.getQuestion(question_id)
answer = func.getAnswer(question_id)
return render_template('view_question.html', question=question, answer=answer, formatRelativeTime=func.formatRelativeTime, str=str)
# -- admin client routes --
@admin_bp.route('/login/', methods=['GET', 'POST'])
def login():
global logged_in
if request.method == 'POST':
admin_password = request.form.get('admin_password')
if admin_password == os.environ.get('ADMIN_PASSWORD'):
session['logged_in'] = True
return redirect(url_for('admin.index'))
else:
flash("Wrong password", 'danger')
return redirect(url_for('admin.login'))
else:
if logged_in:
return redirect('admin.index')
else:
return render_template('admin/login.html')
@admin_bp.route('/logout/')
@loginRequired
def logout():
session['logged_in'] = False
return redirect(url_for('index'))
@admin_bp.route('/', methods=['GET', 'POST'])
@loginRequired
def index():
if request.method == 'POST':
action = request.form.get('action')
blacklist = request.form.get('blacklist')
if action == 'update_word_blacklist':
with open(const.blacklistFile, 'w') as file:
file.write(blacklist)
flash("Changes saved!", 'success')
return redirect(url_for('admin.index'))
else:
blacklist = func.readPlainFile(const.blacklistFile)
return render_template('admin/index.html', blacklist=blacklist)
# -- server routes --
@api_bp.errorhandler(404)
def notFound():
return jsonify({'error': 'Not found'}), 404
@api_bp.errorhandler(400)
def badRequest(e):
return jsonify({'error': str(e)}), 400
@api_bp.errorhandler(500)
def badRequest(e):
return jsonify({'error': str(e)}), 500
@api_bp.route('/add_question/', methods=['POST'])
def addQuestion():
from_who = request.form.get('from_who', 'Anonymous')
question = request.form.get('question', '')
antispam = request.form.get('antispam', '')
if not question:
abort(400, "Question field must not be empty")
if not antispam:
abort(400, "Anti-spam word must not be empty")
antispam_wordlist = func.readPlainFile(const.antiSpamFile, split=True)
antispam_valid = antispam in antispam_wordlist
if not antispam_valid:
abort(400, "Anti-spam is not valid")
blacklist = func.readPlainFile(const.blacklistFile, split=True)
if question in blacklist:
abort(500, "An error has occurred")
conn = func.connectToDb()
cursor = conn.cursor()
cursor.execute("INSERT INTO questions (from_who, content, answered) VALUES (%s, %s, %s)", (from_who, question, False,))
cursor.close()
conn.close()
return {'message': 'Question asked successfully!'}, 201
@api_bp.route('/delete_question/', methods=['DELETE'])
def deleteQuestion():
question_id = request.args.get('question_id', '')
if not question_id:
abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
conn = func.connectToDb()
cursor = conn.cursor()
cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,))
cursor.close()
conn.close()
return {'message': 'Successfully deleted question.'}, 200
@api_bp.route('/return_to_inbox/', methods=['POST'])
def returnToInbox():
question_id = request.args.get('question_id', '')
if not question_id:
abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
conn = func.connectToDb()
cursor = conn.cursor()
cursor.execute("SELECT from_who, content FROM questions WHERE id=%s", (question_id,))
row = cursor.fetchone()
question = {
'from_who': row[0],
'content': row[1]
}
cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,))
cursor.execute("INSERT INTO questions (from_who, content, answered) VALUES (%s, %s, %s)", (question["from_who"], question["content"], False,))
cursor.close()
conn.close()
return {'message': 'Successfully returned question to inbox.'}, 200
@api_bp.route('/add_answer/', methods=['POST'])
def addAnswer():
question_id = request.args.get('question_id', '')
answer = request.form.get('answer')
if not question_id:
abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
if not answer:
abort(400, "Missing 'answer' attribute or 'answer' is empty")
conn = func.connectToDb()
try:
cursor = conn.cursor()
cursor.execute("INSERT INTO answers (question_id, content) VALUES (%s, %s)", (question_id, answer))
answer_id = cursor.lastrowid
cursor.execute("UPDATE questions SET answered=%s, answer_id=%s WHERE id=%s", (True, answer_id, question_id))
conn.commit()
except Exception as e:
conn.rollback()
return jsonify({'error': str(e)}), 500
finally:
cursor.close()
conn.close()
return jsonify({'message': 'Answer added successfully!'}), 201
@api_bp.route('/all_questions/', methods=['GET'])
def listQuestions():
answered = request.args.get('answered', False)
conn = func.connectToDb()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT id, from_who, creation_date, content, answered, answer_id FROM questions WHERE answered=%s", (answered,))
questions = cursor.fetchall()
cursor.close()
conn.close()
return jsonify(questions)
@api_bp.route('/all_answers/', methods=['GET'])
def listAnswers():
conn = func.connectToDb()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT id, question_id, creation_date, content FROM answers")
answers = cursor.fetchall()
cursor.close()
conn.close()
return jsonify(answers)
@api_bp.route('/view_question/', methods=['GET'])
def viewQuestion():
question_id = request.args.get('question_id', '')
if not question_id:
abort(400, "Missing 'question_id' attribute or 'question_id' is empty")
conn = func.connectToDb()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT id, from_who, creation_date, content, answered, answer_id FROM questions WHERE id=%s", (question_id,))
question = cursor.fetchone()
cursor.close()
conn.close()
return jsonify(question)
@api_bp.route('/view_answer/', methods=['GET'])
def viewAnswer():
answer_id = request.args.get('answer_id', '')
if not answer_id:
abort(400, "Missing 'answer_id' attribute or 'answer_id' is empty")
conn = func.connectToDb()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT id, question_id, creation_date, content FROM answers WHERE id=%s", (answer_id,))
answer = cursor.fetchone()
cursor.close()
conn.close()
return jsonify(answer)
app.register_blueprint(api_bp, url_prefix='/api/v1')
app.register_blueprint(admin_bp, url_prefix='/admin')
if __name__ == '__main__':
app.run(debug=True)