mirror of
https://codeberg.org/catask-org/catask.git
synced 2025-04-16 12:13:42 -05:00
1015 lines
37 KiB
Python
1015 lines
37 KiB
Python
from flask import Flask, Blueprint, jsonify, request, abort, render_template, flash, session, redirect, url_for
|
||
from flask_babel import Babel, _, refresh
|
||
from flask_compress import Compress
|
||
from dotenv import load_dotenv
|
||
from functools import wraps
|
||
from werkzeug.utils import secure_filename
|
||
from pathlib import Path
|
||
import unicodedata
|
||
import threading
|
||
import requests
|
||
import secrets
|
||
import shutil
|
||
import zipfile
|
||
import tarfile
|
||
import psycopg
|
||
import urllib
|
||
import functions as func
|
||
import os
|
||
import json
|
||
import constants as const
|
||
|
||
# used for admin routes
|
||
logged_in = False
|
||
|
||
load_dotenv()
|
||
|
||
app = Flask(const.appName)
|
||
app.secret_key = os.environ.get("APP_SECRET")
|
||
cfg = func.loadJSON(const.configFile)
|
||
app.config.from_mapping(cfg)
|
||
app.config.update(cfg)
|
||
# compress to improve page load speed
|
||
Compress(app)
|
||
|
||
app.config['BABEL_DEFAULT_LOCALE'] = 'en'
|
||
app.config['BABEL_TRANSLATION_DIRECTORIES'] = 'locales'
|
||
# refreshing locale
|
||
refresh()
|
||
|
||
# update this once more languages are supported
|
||
app.config['available_languages'] = {
|
||
"en_US": _("English (US)"),
|
||
"ru_RU": _("Russian")
|
||
}
|
||
|
||
def getLocale():
|
||
if not session.get('language'):
|
||
app.config.update(cfg)
|
||
session['language'] = cfg['languages']['default']
|
||
return session.get('language')
|
||
|
||
babel = Babel(app, locale_selector=getLocale)
|
||
|
||
# -- blueprints --
|
||
api_bp = Blueprint('api', const.appName)
|
||
admin_bp = Blueprint('admin', const.appName)
|
||
|
||
# -- cli commands --
|
||
|
||
@app.cli.command("init-db")
|
||
def initDatabase():
|
||
dbName = os.environ.get("DB_NAME")
|
||
print("Attempting to connect to database {dbName}...")
|
||
try:
|
||
conn = func.connectToDb()
|
||
cursor = conn.cursor()
|
||
print("Connected successfully")
|
||
|
||
conn.database = dbName
|
||
|
||
except psycopg.Error as error:
|
||
app.logger.error("Database error:", error)
|
||
# if error.errno == errorcode.ER_ACCESS_DENIED_ERROR:
|
||
# print("Bad credentials")
|
||
# elif error.errno == errorcode.ER_BAD_DB_ERROR:
|
||
# dbPort = os.environ.get("DB_PORT")
|
||
# if not dbPort:
|
||
# dbPort = 3306
|
||
#
|
||
# conn = func.connectToDb()
|
||
# cursor = conn.cursor()
|
||
# func.createDatabase(cursor, dbName)
|
||
# conn.database = dbName
|
||
# else:
|
||
# print("Error:", error)
|
||
# return
|
||
|
||
with open('schema.sql', 'r') as schema_file:
|
||
schema = schema_file.read()
|
||
try:
|
||
cursor.execute(schema)
|
||
conn.commit()
|
||
print(f"Database {dbName} was successfully initialized!")
|
||
except psycopg.Error as error:
|
||
print(f"Failed to initialize database {dbName}: {error}")
|
||
finally:
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
# -- decorators --
|
||
|
||
def loginRequired(f):
|
||
@wraps(f)
|
||
def login_required(*args, **kwargs):
|
||
if not logged_in:
|
||
return abort(404)
|
||
return f(*args, **kwargs)
|
||
return login_required
|
||
|
||
# -- before request --
|
||
|
||
@app.before_request
|
||
def before_request():
|
||
# app.logger.debug("[CatAsk] app.before_request triggered")
|
||
global logged_in
|
||
logged_in = session.get('logged_in')
|
||
|
||
# -- context processors --
|
||
|
||
@app.context_processor
|
||
def inject_stuff():
|
||
app.logger.debug("[CatAsk] app.context_processor inject_stuff() triggered")
|
||
cfg = func.loadJSON(const.configFile)
|
||
# for 1.6.0 or later
|
||
unreadQuestionCount = int(getQuestionCount(False, True))
|
||
totalQuestionCount = int(getQuestionCount())
|
||
return dict(metadata=func.generateMetadata(), len=len, str=str, const=const, cfg=cfg, logged_in=logged_in, version_id=const.version_id, version=const.version, appName=const.appName, unreadQuestionCount=unreadQuestionCount, totalQuestionCount=totalQuestionCount)
|
||
|
||
# -- template filters --
|
||
|
||
# {text} | render_markdown
|
||
@app.template_filter('render_markdown')
|
||
def render_markdown(text):
|
||
if text.startswith("![") and not (text.startswith(':') and text.endswith(':')):
|
||
return text
|
||
else:
|
||
# app.logger.debug("[CatAsk] app.template_filter render_markdown(text) triggered")
|
||
return func.renderMarkdown(text)
|
||
|
||
# render_markdown({text}, fromWho=False|True)
|
||
@app.template_global('render_markdown')
|
||
def render_markdown(text, fromWho=False):
|
||
if (text.startswith("![") or "![" in text) and not (text.startswith(':') and text.endswith(':')):
|
||
# ensuring that only inline images get escaped, not emojis
|
||
escaped = text.replace("![", "!\[")
|
||
return func.renderMarkdown(escaped)
|
||
else:
|
||
# don't want people inserting buttons and other stuff into name field
|
||
allowed_tags = ["p", "img"] if fromWho else None
|
||
return func.renderMarkdown(text, allowed_tags)
|
||
|
||
# -- error handlers --
|
||
|
||
@app.errorhandler(404)
|
||
def notFound(e):
|
||
return render_template('errors/404.html'), 404
|
||
|
||
@app.errorhandler(500)
|
||
def internalServerError(e):
|
||
return render_template('errors/500.html'), 500
|
||
|
||
@app.errorhandler(502)
|
||
def badGateway(e):
|
||
return render_template('errors/502.html'), 502
|
||
|
||
# -- client (frontend) routes --
|
||
|
||
@app.route('/', methods=['GET'])
|
||
def index():
|
||
per_page = 25
|
||
offset = 0
|
||
page = 1
|
||
|
||
func_val = func.getAllQuestions(limit=per_page, offset=offset)
|
||
combined = func_val[0]
|
||
metadata = func_val[1]
|
||
emojis = func.listEmojis()
|
||
packs = func.listEmojiPacks()
|
||
|
||
return render_template(
|
||
'index.html',
|
||
combined=combined,
|
||
urllib=urllib,
|
||
trimContent=func.trimContent,
|
||
metadata=metadata,
|
||
getRandomWord=func.getRandomWord,
|
||
formatRelativeTime=func.formatRelativeTime,
|
||
emojis=emojis,
|
||
packs=packs,
|
||
page=page,
|
||
per_page=per_page
|
||
)
|
||
|
||
|
||
@app.route('/inbox/', methods=['GET'])
|
||
@loginRequired
|
||
def inbox():
|
||
conn = func.connectToDb()
|
||
cursor = conn.cursor()
|
||
|
||
app.logger.debug("[CatAsk/Inbox] SELECT'ing unanswered questions")
|
||
|
||
cursor.execute("SELECT * FROM questions WHERE answered=%s ORDER BY creation_date DESC", (False,))
|
||
questions = cursor.fetchall()
|
||
# postgres shenanigans
|
||
for question in questions:
|
||
question['creation_date'] = question['creation_date'].replace(microsecond=0).replace(tzinfo=None)
|
||
cursor.execute("UPDATE questions SET unread=%s WHERE id=%s", (False, question['id']))
|
||
conn.commit()
|
||
cursor.close()
|
||
conn.close()
|
||
return render_template('inbox.html', questions=questions, formatRelativeTime=func.formatRelativeTime)
|
||
|
||
@app.route('/q/<int:question_id>/', methods=['GET'])
|
||
def viewQuestion(question_id):
|
||
question = func.getQuestion(question_id)
|
||
answer = func.getAnswer(question_id)
|
||
if not question or not answer:
|
||
return abort(404)
|
||
else:
|
||
metadata = func.generateMetadata(question, answer)
|
||
return render_template('view_question.html', question=question, urllib=urllib, answer=answer, metadata=metadata, formatRelativeTime=func.formatRelativeTime, trimContent=func.trimContent)
|
||
|
||
# TODO: implement this and private questions should be here too
|
||
"""
|
||
@app.route('/questions/', methods=['GET'])
|
||
def seeAskedQuestions():
|
||
conn = func.connectToDb()
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT * FROM questions WHERE asker_id=%s ORDER BY creation_date DESC", (asker_id,))
|
||
questions = cursor.fetchall()
|
||
|
||
cursor.close()
|
||
conn.close()
|
||
return render_template('asked_questions.html', questions=questions, formatRelativeTime=func.formatRelativeTime)
|
||
"""
|
||
|
||
# -- admin client routes --
|
||
|
||
@admin_bp.route('/login/', methods=['GET', 'POST'])
|
||
def login():
|
||
global logged_in
|
||
if request.method == 'POST':
|
||
admin_password = request.form.get('admin_password')
|
||
if admin_password == os.environ.get('ADMIN_PASSWORD'):
|
||
session['logged_in'] = True
|
||
session.permanent = request.form.get('remember_me', False)
|
||
return redirect(url_for('index'))
|
||
else:
|
||
flash(_("Wrong password"), 'danger')
|
||
return redirect(url_for('admin.login'))
|
||
else:
|
||
if logged_in:
|
||
return redirect('index')
|
||
else:
|
||
return render_template('admin/login.html')
|
||
|
||
@admin_bp.route('/logout/', methods=['POST'])
|
||
@loginRequired
|
||
def logout():
|
||
session['logged_in'] = False
|
||
return redirect(url_for('index'))
|
||
|
||
@admin_bp.route('/', methods=['GET', 'POST'])
|
||
@loginRequired
|
||
def index():
|
||
return redirect(url_for('admin.information'))
|
||
|
||
@admin_bp.route('/information/', methods=['GET', 'POST'])
|
||
@loginRequired
|
||
def information():
|
||
return render_template('admin/categories/instance.html')
|
||
|
||
@admin_bp.route('/accessibility/', methods=['GET', 'POST'])
|
||
@loginRequired
|
||
def accessibility():
|
||
return render_template('admin/categories/accessibility.html')
|
||
|
||
@admin_bp.route('/crosspost/', methods=['GET', 'POST'])
|
||
@loginRequired
|
||
def crosspost():
|
||
return render_template('admin/categories/crosspost.html')
|
||
|
||
@admin_bp.route('/languages/', methods=['GET', 'POST'])
|
||
@loginRequired
|
||
def languages():
|
||
return render_template('admin/categories/languages.html')
|
||
|
||
@admin_bp.route('/notifications/', methods=['GET', 'POST'])
|
||
@loginRequired
|
||
def notifications():
|
||
return render_template('admin/categories/notifications.html')
|
||
|
||
@admin_bp.route('/general/', methods=['GET', 'POST'])
|
||
@loginRequired
|
||
def general():
|
||
return render_template('admin/categories/general.html')
|
||
|
||
@admin_bp.route('/customize/', methods=['GET', 'POST'])
|
||
@loginRequired
|
||
def customize():
|
||
return render_template('admin/categories/customize.html')
|
||
|
||
@admin_bp.route('/antispam/', methods=['GET', 'POST'])
|
||
@loginRequired
|
||
def antispam():
|
||
return render_template('admin/categories/antispam.html')
|
||
|
||
@admin_bp.route('/emojis/', methods=['GET', 'POST'])
|
||
@loginRequired
|
||
def emojis():
|
||
if not os.path.exists(const.emojiPath):
|
||
os.makedirs(const.emojiPath)
|
||
|
||
packs = func.listEmojiPacks()
|
||
if packs:
|
||
for pack in packs:
|
||
json_pack = bool(pack.get('website') and pack.get('exportedAt'))
|
||
if json_pack:
|
||
break
|
||
else:
|
||
json_pack = False
|
||
emojis = func.listEmojis()
|
||
|
||
return render_template('admin/categories/emojis.html', json_pack=json_pack, packs=packs, emojis=emojis, formatRelativeTime=func.formatRelativeTime2)
|
||
|
||
@admin_bp.route('/blacklist/', methods=['GET', 'POST'])
|
||
@loginRequired
|
||
def blacklist():
|
||
if request.method == 'POST':
|
||
action = request.form.get('action')
|
||
blacklist = request.form.get('blacklist')
|
||
with open(const.blacklistFile, 'w') as file:
|
||
file.write(blacklist)
|
||
return {'message': _("Blacklist updated!")}, 200
|
||
|
||
else:
|
||
blacklist = func.readPlainFile(const.blacklistFile)
|
||
if blacklist == []:
|
||
blacklist = ''
|
||
|
||
return render_template('admin/categories/blacklist.html', blacklist=blacklist)
|
||
|
||
# TODO: implement deleting exports
|
||
|
||
@admin_bp.route('/import-export/', methods=['GET'])
|
||
@loginRequired
|
||
def importExport():
|
||
if os.path.exists(const.exportsFile):
|
||
exports = func.loadJSON(const.exportsFile)
|
||
else:
|
||
exports = None
|
||
return render_template('admin/categories/import.html', exports=exports)
|
||
|
||
# TODO: implement first-launch setup route
|
||
"""
|
||
@admin_bp.route('/post-install/', methods=['GET', 'POST'])
|
||
@loginRequired
|
||
def postInstall():
|
||
if config.postInstallCompleted:
|
||
return abort(404)
|
||
else:
|
||
pass
|
||
"""
|
||
|
||
# -- server routes --
|
||
|
||
@api_bp.errorhandler(404)
|
||
def notFound(e):
|
||
return jsonify({'error': 'Not found'}), 404
|
||
|
||
@api_bp.errorhandler(400)
|
||
def badRequest(e):
|
||
return jsonify({'error': str(e)}), 400
|
||
|
||
@api_bp.errorhandler(500)
|
||
def internalServerError(e):
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
# why should i have a manifest.json file when i can just make it a route
|
||
@api_bp.route("/manifest.json", methods=['GET'])
|
||
def pwaManifest():
|
||
# not sure about theme_color but whateva
|
||
return jsonify({
|
||
"short_name": const.appName,
|
||
"name": cfg['instance']['title'],
|
||
"icons": [
|
||
{
|
||
"src": url_for("static", filename="icons/favicon/android-chrome-192x192.png"),
|
||
"sizes": "192x192",
|
||
"type": "image/png"
|
||
},
|
||
{
|
||
"src": url_for("static", filename="icons/favicon/android-chrome-512x512.png"),
|
||
"sizes": "512x512",
|
||
"type": "image/png"
|
||
}
|
||
],
|
||
"start_url": "/",
|
||
"display": "standalone",
|
||
"theme_color": cfg['style']['accentLight'],
|
||
"background_color": ""
|
||
})
|
||
|
||
@api_bp.route('/ts/download/<author>/<theme>/', methods=['GET'])
|
||
@loginRequired
|
||
def downloadTheme(author, theme):
|
||
return send_file(
|
||
requests.get(f"{ cfg['themeStoreUrl'] }/api/v1/dl/{author}/{theme}/"),
|
||
download_name=f"{ theme }.css")
|
||
|
||
@api_bp.route('/ts/card_row/', methods=['GET'])
|
||
@loginRequired
|
||
def themeStore_themes():
|
||
cfg = func.loadJSON(const.configFile)
|
||
url = f"{ cfg['themeStoreUrl'] }/api/v1/themes/"
|
||
themes = requests.get(url).json()
|
||
cards = ""
|
||
i = 0
|
||
for theme in themes:
|
||
i += 1
|
||
cards += f"""
|
||
<div class="col-md-6 col-lg-4 col-xl-3 mb-4">
|
||
<div class="card">
|
||
<div class="card-header p-0 preview-img">
|
||
<button type="button" class="btn border-0 p-0" data-bs-toggle="modal" data-bs-target="#theme-modal-{i}" data-dontshowtoast>
|
||
<img src="{ cfg['themeStoreUrl'] }{ theme['preview_url'] }" class="card-img-top" alt="Preview of { theme['name'] }" style="min-height: 160px; object-fit: cover;">
|
||
</button>
|
||
</div>
|
||
<div class="card-body">
|
||
<p class="card-text text-body-secondary mb-1 small">{ _("Updated") } { func.formatRelativeTime(theme['updated_at']) }</p>
|
||
<h5 class="card-title m-0"><button type="button" class="btn p-0 fs-5" data-bs-toggle="modal" data-bs-target="#theme-modal-{i}" data-dontshowtoast>{ theme['name'] }</button></h5>
|
||
<p class="card-text text-body-secondary">
|
||
{ _("by") }
|
||
<a href="{ theme['author_url'] or 'javascript:;' }" class="link-offset-2 text-body text-decoration-none underline-hover">{ theme['author'] }</a>
|
||
</p>
|
||
</div>
|
||
</div>
|
||
</div>
|
||
"""
|
||
html = f"""
|
||
<div class="row">
|
||
{cards}
|
||
</div>
|
||
"""
|
||
return html
|
||
|
||
@api_bp.route('/ts/modals/', methods=['GET'])
|
||
@loginRequired
|
||
def themeStore_themeModals():
|
||
cfg = func.loadJSON(const.configFile)
|
||
url = f"{cfg['themeStoreUrl']}/api/v1/themes/"
|
||
themes = requests.get(url).json()
|
||
i = 0
|
||
modals = ""
|
||
|
||
for theme in themes:
|
||
i += 1
|
||
theme_name = theme['name']
|
||
singleton = f"data-bs-toggle='modal' data-bs-target='#theme-modal-confirm-{i}'" if cfg['style']['customCss'] != "" else f"""data-bs-dismiss='modal' onclick='useTheme({i}, "{ theme_name }")'"""
|
||
modals += f"""
|
||
<div class="modal fade" id="theme-modal-{i}" tabindex="-1" aria-labelledby="ts-modal-label" aria-hidden="true">
|
||
<div class="modal-dialog modal-fullscreen-md-down modal-xl modal-dialog-centered">
|
||
<div class="modal-content">
|
||
<div class="modal-header border-0 d-flex align-items-center ps-2 ms-1">
|
||
<h1 class="modal-title fs-5 fw-normal d-flex align-items-center" id="q-modal-label">
|
||
<button type="button" class="btn btn-basic fs-5 me-1 px-2 py-1" data-bs-target="#theme-store-modal" data-bs-toggle="modal">
|
||
<i class="bi bi-chevron-left"></i>
|
||
</button>
|
||
<span>{ theme['name'] } <span class="text-body-secondary">{ _("by") } { theme['author'] }</span></span>
|
||
</h1>
|
||
<button type="button" class="btn-close d-flex align-items-center fs-5" data-bs-dismiss="modal" aria-label="Close"><i class="bi bi-x-lg"></i></button>
|
||
</div>
|
||
<div class="modal-body py-0">
|
||
<section id="preview">
|
||
<img src="{ cfg['themeStoreUrl'] }{ theme['preview_url'] }" class="img-fluid rounded mx-auto d-block" alt="Screenshot of { theme['name'] } by { theme['author'] }">
|
||
</section>
|
||
<section id="header">
|
||
<div class="d-md-flex justify-content-between align-items-center mb-4 mt-3">
|
||
<div class="d-flex">
|
||
<span class="border py-1 px-2 rounded-start user-select-all overflow-hidden text-nowrap ts-share">{ cfg['themeStoreUrl'] }/themes/{ theme['escaped_name'] }/</span>
|
||
<button class="btn btn-secondary rounded-start-0" type="button" onclick="copyFull(`{ cfg['themeStoreUrl'] }/themes/{ theme['escaped_name'] }/`)" style="min-width: 6em;"><i class="bi bi-copy me-1"></i> { _("Copy") }</button>
|
||
</div>
|
||
<br class="d-md-none">
|
||
<div class="d-flex flex-column flex-md-row align-items-md-center gap-2">
|
||
<button type="button" class="btn btn-primary" {singleton}><i class="bi bi-palette me-1"></i> { _("Use") }</button>
|
||
{ f"""<a href="{ theme['homepage'] }/" class="btn btn-secondary">
|
||
<i class="bi bi-house-door"></i>
|
||
<span class="d-inline d-sm-none ms-1">{ _("Homepage") }</span>
|
||
</a>""" if theme['homepage'] else "" }
|
||
<a href="{ cfg['themeStoreUrl'] }/dl/{ theme['author'] }/{ theme['name'] }/" class="btn btn-secondary">
|
||
<i class="bi bi-download"></i>
|
||
<span class="d-inline d-sm-none ms-1">{ _("Download") }</span>
|
||
</a>
|
||
<a href="{ cfg['themeStoreUrl'] }/themes/{ theme['escaped_name'] }/" class="btn btn-secondary" target="_blank">
|
||
<i class="bi bi-box-arrow-up-right"></i>
|
||
<span class="d-inline d-sm-none ms-1">{ _("Go to Store page") }</span>
|
||
</a>
|
||
</div>
|
||
</div>
|
||
</section>
|
||
<section id="details" class="mb-4">
|
||
<h2>{ _("Details") }</h2>
|
||
<p class="mb-2">
|
||
<span class="tab">{ _("Author") }</span>
|
||
<a href="{ theme['author_url'] }" target="_blank" class="text-decoration-none underline-hover">{ theme['author'] }</a>
|
||
</p>
|
||
<p class="mb-2">
|
||
<span class="tab">{ _("Size") }</span>
|
||
{ theme['size'] }
|
||
</p>
|
||
<p class="mb-2">
|
||
<span class="tab">{ _("Created") }</span>
|
||
{ func.formatRelativeTime(theme['created_at']) }
|
||
</p>
|
||
<p class="mb-2">
|
||
<span class="tab">{ _("Updated") }</span>
|
||
{ func.formatRelativeTime(theme['updated_at']) }
|
||
</p>
|
||
</section>
|
||
<section id="source-code">
|
||
<h2>{ _("Source code") }</h2>
|
||
<pre class="border p-2 rounded"><code style="tab-size: 4;" id="theme-source-code-{i}">{ theme['file_contents'] }</code></pre>
|
||
</section>
|
||
</div>
|
||
</div>
|
||
</div>
|
||
</div>
|
||
<div class="modal fade" id="theme-modal-confirm-{i}" tabindex="-1" aria-labelledby="ts-modal-label" aria-hidden="true">
|
||
<div class="modal-dialog modal-dialog-centered">
|
||
<div class="modal-content">
|
||
<div class="modal-header border-0">
|
||
<h1 class="modal-title fs-5 fw-normal" id="q-modal-label">{ _('Confirmation') }</h1>
|
||
<button type="button" class="btn-close d-flex align-items-center fs-5" data-bs-toggle='modal' data-bs-target='#theme-modal-{i}' aria-label="Close"><i class="bi bi-x-lg"></i></button>
|
||
</div>
|
||
<div class="modal-body py-0">
|
||
<p>{ _('This action will overwrite your existing custom CSS, are you sure you want to apply this theme?') }</p>
|
||
</div>
|
||
<div class="modal-footer pt-1 flex-row align-items-stretch w-100 border-0">
|
||
<button type="button" class="btn btn-outline-secondary flex-fill flex-lg-grow-0" data-bs-toggle='modal' data-bs-target='#theme-modal-{i}'>{ _('Cancel') }</button>
|
||
<button type="button" class="btn btn-danger flex-fill flex-lg-grow-0" data-bs-dismiss="modal" onclick="useTheme({ i }, '{ theme_name }')">{ _('Confirm') }</button>
|
||
</div>
|
||
</div>
|
||
</div>
|
||
</div>
|
||
"""
|
||
return modals
|
||
|
||
@api_bp.route("/change_language/", methods=['POST'])
|
||
def changeLanguage():
|
||
if cfg['languages']['allowChanging']:
|
||
lang = request.form.get('lang')
|
||
if lang not in app.config['available_languages'].keys():
|
||
# fallback to en_US on malformed request
|
||
session['language'] == 'en_US'
|
||
flash("400 Bad Request: The browser (or proxy) sent a request that this server could not understand.", "danger")
|
||
return redirect(request.referrer)
|
||
session['language'] = lang
|
||
return redirect(request.referrer)
|
||
else:
|
||
return abort(404)
|
||
|
||
# wip, scheduled for 1.8.0 release
|
||
# @api_bp.route('/hyper/widget/', methods=['GET'])
|
||
# def widget():
|
||
# func_val = func.getAllQuestions()
|
||
# combined = func_val[0]
|
||
# metadata = func_val[1]
|
||
# return render_template('widget.html', combined=combined, urllib=urllib, trimContent=func.trimContent, metadata=metadata, getRandomWord=func.getRandomWord, formatRelativeTime=func.formatRelativeTime)
|
||
|
||
@api_bp.route('/hyper/load_more_questions/', methods=['GET'])
|
||
def load_more_questions():
|
||
page = request.args.get('page', default=1, type=int)
|
||
per_page = 25
|
||
offset = (page - 1) * per_page
|
||
|
||
func_val = func.getAllQuestions(limit=per_page, offset=offset)
|
||
combined = func_val[0]
|
||
|
||
if not combined:
|
||
return ""
|
||
|
||
return render_template('snippets/layout/load_more_questions.html', combined=combined, page=page, per_page=per_page, formatRelativeTime=func.formatRelativeTime, trimContent=func.trimContent, urllib=urllib)
|
||
|
||
# -- question routes --
|
||
|
||
@api_bp.route('/add_question/', methods=['POST'])
|
||
def addQuestion():
|
||
try:
|
||
app.logger.debug("[CatAsk/API/add_question] started question flow")
|
||
from_who = request.form.get('from_who', cfg['anonName'])
|
||
question = request.form.get('question', '')
|
||
cw = request.form.get('cw', '')
|
||
|
||
question_normalized = unicodedata.normalize('NFKC', question).replace("⠀", "").strip()
|
||
|
||
if not question or not question_normalized:
|
||
abort(400, _("Question field must not be empty"))
|
||
if len(question) > int(cfg['charLimit']) or len(from_who) > int(cfg['charLimit']):
|
||
abort(400, _("Question exceeds the character limit"))
|
||
return_val = func.addQuestion(from_who, question, cw)
|
||
app.logger.debug("[CatAsk/API/add_question] finished question flow")
|
||
return return_val[0]
|
||
finally:
|
||
if cfg['ntfy']['enabled']:
|
||
# cw, return_val, from_who, question
|
||
ntfy_thread = threading.Thread(target=func.ntfySend, name=f"ntfy thread for {return_val[2]}", args=(cw, return_val, from_who, question,))
|
||
ntfy_thread.start()
|
||
|
||
@api_bp.route('/delete_question/', methods=['DELETE'])
|
||
@loginRequired
|
||
def deleteQuestion():
|
||
question_id = request.args.get('question_id', '')
|
||
if not question_id:
|
||
abort(400, _("Missing 'question_id' attribute or 'question_id' is empty"))
|
||
|
||
conn = func.connectToDb()
|
||
cursor = conn.cursor()
|
||
|
||
app.logger.debug("[CatAsk/API/delete_question] DELETE'ing a question from database")
|
||
|
||
cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,))
|
||
conn.commit()
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
return {'message': _("Successfully deleted question.")}, 200
|
||
|
||
@api_bp.route('/return_to_inbox/', methods=['POST'])
|
||
@loginRequired
|
||
def returnToInbox():
|
||
question_id = request.args.get('question_id', '')
|
||
if not question_id:
|
||
abort(400, _("Missing 'question_id' attribute or 'question_id' is empty"))
|
||
|
||
conn = func.connectToDb()
|
||
cursor = conn.cursor()
|
||
|
||
app.logger.debug("[CatAsk/API/return_to_inbox] SELECT'ing a question from database")
|
||
|
||
cursor.execute("SELECT from_who, content, creation_date, cw FROM questions WHERE id=%s", (question_id,))
|
||
question = cursor.fetchone()
|
||
|
||
# question = {
|
||
# 'from_who': row[0],
|
||
# 'content': row[1],
|
||
# 'creation_date': row[2],
|
||
# 'cw': row[3]
|
||
# }
|
||
|
||
app.logger.debug("[CatAsk/API/return_to_inbox] DELETE'ing a question from database")
|
||
|
||
cursor.execute("DELETE FROM questions WHERE id=%s", (question_id,))
|
||
|
||
app.logger.debug("[CatAsk/API/return_to_inbox] INSERT'ing a question into database")
|
||
|
||
cursor.execute("INSERT INTO questions (from_who, content, creation_date, answered, cw) VALUES (%s, %s, %s, %s, %s)", (question["from_who"], question["content"], question["creation_date"], False, question['cw']))
|
||
conn.commit()
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
return {'message': _("Successfully returned question to inbox.")}, 200
|
||
|
||
@api_bp.route('/pin_question/', methods=['POST'])
|
||
@loginRequired
|
||
def pinQuestion():
|
||
question_id = request.args.get('question_id', '')
|
||
if not question_id:
|
||
abort(400, _("Missing 'question_id' attribute or 'question_id' is empty"))
|
||
|
||
conn = func.connectToDb()
|
||
cursor = conn.cursor()
|
||
|
||
app.logger.debug("[CatAsk/API/pin_question] UPDATE'ing a question to pin it")
|
||
|
||
cursor.execute("UPDATE questions SET pinned=%s WHERE id=%s", (True, question_id))
|
||
conn.commit()
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
return {'message': _("Successfully pinned question.")}, 200
|
||
|
||
@api_bp.route('/unpin_question/', methods=['POST'])
|
||
@loginRequired
|
||
def unpinQuestion():
|
||
question_id = request.args.get('question_id', '')
|
||
if not question_id:
|
||
abort(400, _("Missing 'question_id' attribute or 'question_id' is empty"))
|
||
|
||
conn = func.connectToDb()
|
||
cursor = conn.cursor()
|
||
|
||
app.logger.debug("[CatAsk/API/unpin_question] UPDATE'ing a question to unpin it")
|
||
|
||
cursor.execute("UPDATE questions SET pinned=%s WHERE id=%s", (False, question_id))
|
||
|
||
conn.commit()
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
return {'message': _("Successfully unpinned question.")}, 200
|
||
|
||
@api_bp.route('/add_answer/', methods=['POST'])
|
||
@loginRequired
|
||
def addAnswer():
|
||
question_id = request.args.get('question_id', '')
|
||
answer = request.form.get('answer')
|
||
cw = request.form.get('cw', '')
|
||
|
||
if not question_id:
|
||
abort(400, _("Missing 'question_id' attribute or 'question_id' is empty"))
|
||
if not answer:
|
||
abort(400, _("Missing 'answer' attribute or 'answer' is empty"))
|
||
|
||
return_val = func.addAnswer(question_id, answer, cw)
|
||
|
||
if cfg['crosspost']['fediverse']['enabled']:
|
||
fedi_crosspost_thread = threading.Thread(target=func.postOnFediverse, name=f"fediverse crosspost thread for {question_id}", args=(question_id, answer, cw,))
|
||
fedi_crosspost_thread.start()
|
||
|
||
if cfg['crosspost']['bluesky']['enabled']:
|
||
bsky_crosspost_thread = threading.Thread(target=func.postOnBluesky, name=f"bluesky crosspost thread for {question_id}", args=(question_id, answer, cw,))
|
||
bsky_crosspost_thread.start()
|
||
|
||
return return_val
|
||
|
||
# -- uploaders --
|
||
|
||
@api_bp.route('/upload_favicon/', methods=['POST'])
|
||
@loginRequired
|
||
def uploadFavicon():
|
||
favicon = request.files.get('favicon')
|
||
|
||
if favicon and func.allowedFile(favicon.filename):
|
||
filename = secure_filename(favicon.filename)
|
||
favicon_path = const.faviconDir / filename
|
||
favicon.save(favicon_path)
|
||
|
||
func.generateFavicon(filename)
|
||
|
||
return {'message': _("Successfully updated favicon!")}, 201
|
||
elif favicon and not func.allowedFile(favicon.filename):
|
||
return {'error': _("File type is not supported")}, 400
|
||
elif not favicon:
|
||
return {'error': _("favicon is not specified")}, 400
|
||
|
||
@api_bp.route('/upload_emoji/', methods=['POST'])
|
||
@loginRequired
|
||
def uploadEmoji():
|
||
if 'emoji' not in request.files:
|
||
return jsonify({'error': _("No file part in the request")}), 400
|
||
|
||
emoji = request.files['emoji']
|
||
if emoji.filename == '':
|
||
return jsonify({'error': _("No file selected for uploading")}), 400
|
||
|
||
if not func.allowedFile(emoji.filename):
|
||
return jsonify({'error': _("Invalid file type. Only png, jpg, jpeg, webp, bmp, jxl, gif supported")}), 400
|
||
|
||
# Secure the filename and determine the archive name
|
||
filename = secure_filename(emoji.filename)
|
||
|
||
emoji_path = const.emojiPath / filename
|
||
|
||
emoji.save(emoji_path)
|
||
|
||
return jsonify({'message': _("Emoji {} successfully uploaded").format(filename)}), 201
|
||
|
||
@api_bp.route('/upload_emoji_pack/', methods=['POST'])
|
||
@loginRequired
|
||
def uploadEmojiPack():
|
||
if 'emoji_archive' not in request.files:
|
||
return jsonify({'error': _("No file part in the request")}), 400
|
||
|
||
emoji_archive = request.files['emoji_archive']
|
||
if emoji_archive.filename == '':
|
||
return jsonify({'error': _("No file selected for uploading")}), 400
|
||
|
||
if not func.allowedArchive(emoji_archive.filename):
|
||
return jsonify({'error': _("Invalid file type. Only .zip, .tar, .tar.gz, .tar.bz2 allowed")}), 400
|
||
|
||
filename = secure_filename(emoji_archive.filename)
|
||
archive_name = func.stripArchExtension(filename)
|
||
|
||
extract_path = const.emojiPath / archive_name
|
||
extract_path.mkdir(parents=True, exist_ok=True)
|
||
|
||
archive_path = extract_path / filename
|
||
emoji_archive.save(archive_path)
|
||
|
||
try:
|
||
if zipfile.is_zipfile(archive_path):
|
||
with zipfile.ZipFile(archive_path, 'r') as zip_ref:
|
||
zip_ref.extractall(extract_path)
|
||
elif tarfile.is_tarfile(archive_path):
|
||
with tarfile.open(archive_path, 'r:*') as tar_ref:
|
||
tar_ref.extractall(extract_path)
|
||
else:
|
||
return jsonify({'error': _("Unsupported archive format")}), 400
|
||
|
||
# parse meta.json if it exists
|
||
meta_json_path = extract_path / 'meta.json'
|
||
if meta_json_path.exists():
|
||
processed_emojis = func.processEmojis(meta_json_path)
|
||
return jsonify({'message': _('Successfully uploaded and processed {} emojis from archive "{}".').format(len(processed_emojis), filename)}), 201
|
||
else:
|
||
return jsonify({'message': _("Archive {} successfully uploaded and extracted.").format(filename)}), 201
|
||
|
||
except Exception as e:
|
||
return jsonify({'error': str(e)}), 500
|
||
finally:
|
||
archive_path.unlink()
|
||
|
||
# -- getters --
|
||
|
||
# this isn't used anywhere yet, but maybe it will in the future
|
||
|
||
@api_bp.route('/get_emojis/', methods=['GET'])
|
||
@loginRequired
|
||
def getEmojis():
|
||
return func.listEmojis()
|
||
|
||
@api_bp.route('/hyper/get_emoji_packs/', methods=['GET'])
|
||
@loginRequired
|
||
def getEmojiPacks():
|
||
index = int(request.args.get('index', ''))
|
||
packs = func.listEmojiPacks()
|
||
pack = packs[index]
|
||
rel_path = pack['relative_path']
|
||
emojis = pack['emojis']
|
||
html = ''
|
||
for emoji in emojis:
|
||
html += f"""
|
||
<div class="mb-2">
|
||
<img src="/{ rel_path }/{ emoji['file_name'] }" width="28" height="28" class="emoji" title=":{ emoji['name'] }:">
|
||
<span>{ emoji['name'] }</span>
|
||
</div>
|
||
"""
|
||
return html
|
||
|
||
# -- delete routes --
|
||
|
||
@api_bp.route('/delete_emoji/', methods=['DELETE'])
|
||
@loginRequired
|
||
def deleteEmoji():
|
||
emoji_name = request.args.get('emoji_name')
|
||
emoji_base_path = const.emojiPath
|
||
emoji_file_path = emoji_base_path / emoji_name
|
||
emoji_ext = os.path.splitext(f"{emoji_base_path}/{emoji_file_path}")
|
||
|
||
if not emoji_file_path.exists() or not emoji_file_path.is_file():
|
||
return jsonify({'error': _("Emoji not found")}), 404
|
||
|
||
try:
|
||
emoji_file_path.unlink()
|
||
return jsonify({'message': _('Emoji "{}" deleted successfully').format(emoji_name)}), 200
|
||
except Exception as e:
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
@api_bp.route('/delete_emoji_pack/', methods=['DELETE'])
|
||
@loginRequired
|
||
def deleteEmojiPack():
|
||
pack_name = request.args.get('pack_name')
|
||
emojis_path = const.emojiPath
|
||
emoji_base_path = Path.cwd() / 'static' / 'emojis' / pack_name.lower()
|
||
|
||
if not emoji_base_path.exists() or not emoji_base_path.is_dir():
|
||
return jsonify({'error': _('Emoji pack "{}" not found').format(pack_name)}), 404
|
||
|
||
try:
|
||
for json_file in emojis_path.glob(f'{pack_name.lower()}.json'):
|
||
json_file.unlink()
|
||
|
||
shutil.rmtree(emoji_base_path)
|
||
return jsonify({'message': _('Emoji pack "{}" deleted successfully.').format(pack_name)}), 200
|
||
except Exception as e:
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
# -- misc --
|
||
|
||
@api_bp.route('/update_blacklist/', methods=['PUT'])
|
||
@loginRequired
|
||
def updateBlacklist():
|
||
action = request.form.get('action')
|
||
blacklist = request.form.get('blacklist')
|
||
with open(const.blacklistFile, 'w') as file:
|
||
file.write(blacklist)
|
||
return {'message': _("Blacklist updated!")}, 200
|
||
|
||
@api_bp.route('/get_question_count/', methods=['GET'])
|
||
def getQuestionCount(answered: bool = None, unread: bool = None):
|
||
conn = func.connectToDb()
|
||
cursor = conn.cursor()
|
||
|
||
app.logger.debug("[CatAsk/API/get_question_count] SELECT'ing question count from database")
|
||
query = "SELECT COUNT(id) FROM questions"
|
||
if (answered != None) and not unread:
|
||
query += " WHERE answered=%s"
|
||
cursor.execute(query, (answered,))
|
||
elif (answered != None) and (unread != None):
|
||
query += " WHERE answered=%s AND unread=%s"
|
||
cursor.execute(query, (answered, unread))
|
||
elif (unread != None) and not answered:
|
||
query += " WHERE unread=%s"
|
||
cursor.execute(query, (unread,))
|
||
else:
|
||
cursor.execute(query)
|
||
|
||
question_count = cursor.fetchone()
|
||
|
||
cursor.close()
|
||
conn.close()
|
||
return str(question_count['count'])
|
||
|
||
# -- import/export --
|
||
|
||
@api_bp.route('/import_data/', methods=['PUT'])
|
||
def importData():
|
||
os.makedirs(const.tempDir, exist_ok=True)
|
||
|
||
archive = request.files['import_archive']
|
||
filename = secure_filename(archive.filename)
|
||
|
||
file_path = const.tempDir / filename
|
||
|
||
archive.save(file_path)
|
||
return func.importData(file_path)
|
||
|
||
@api_bp.route('/import_rs_data/', methods=['PUT'])
|
||
def importRsData():
|
||
os.makedirs(const.tempDir, exist_ok=True)
|
||
|
||
archive = request.files['import_archive']
|
||
filename = secure_filename(archive.filename)
|
||
|
||
file_path = const.tempDir / filename
|
||
|
||
archive.save(file_path)
|
||
return func.retrospringImport(file_path)
|
||
|
||
@api_bp.route('/create_export/', methods=['POST'])
|
||
@loginRequired
|
||
def createExport():
|
||
return func.createExport()
|
||
|
||
@api_bp.route('/delete_export/', methods=['DELETE'])
|
||
@loginRequired
|
||
def deleteExport():
|
||
timestamp = request.args.get('timestamp', '')
|
||
return func.deleteExport(timestamp)
|
||
|
||
# unused
|
||
@api_bp.route('/view_question/', methods=['GET'])
|
||
@loginRequired
|
||
def viewQuestion():
|
||
question_id = request.args.get('question_id', '')
|
||
if not question_id:
|
||
abort(400, _("Missing 'question_id' attribute or 'question_id' is empty"))
|
||
|
||
conn = func.connectToDb()
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT id, from_who, creation_date, content, answered, answer_id FROM questions WHERE id=%s", (question_id,))
|
||
question = cursor.fetchone()
|
||
cursor.close()
|
||
conn.close()
|
||
return jsonify(question)
|
||
|
||
@api_bp.route('/view_answer/', methods=['GET'])
|
||
@loginRequired
|
||
def viewAnswer():
|
||
answer_id = request.args.get('answer_id', '')
|
||
if not answer_id:
|
||
abort(400, _("Missing 'answer_id' attribute or 'answer_id' is empty"))
|
||
|
||
conn = func.connectToDb()
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT id, question_id, creation_date, content FROM answers WHERE id=%s", (answer_id,))
|
||
answer = cursor.fetchone()
|
||
cursor.close()
|
||
conn.close()
|
||
return jsonify(answer)
|
||
|
||
@api_bp.route('/update_config/', methods=['POST'])
|
||
@loginRequired
|
||
def updateConfig():
|
||
cfg = func.loadJSON(const.configFile)
|
||
configuration = request.form
|
||
|
||
for key, value in configuration.items():
|
||
cleaned_key = key.removeprefix('_') if key.startswith('_') else key
|
||
nested_keys = cleaned_key.split('.')
|
||
current_dict = cfg
|
||
|
||
for nested_key in nested_keys[:-1]:
|
||
current_dict = current_dict.setdefault(nested_key, {})
|
||
|
||
# Convert the checkbox value 'True'/'False' strings to actual booleans
|
||
if value.lower() == 'true':
|
||
value = True
|
||
elif value.lower() == 'false':
|
||
value = False
|
||
|
||
current_dict[nested_keys[-1]] = value
|
||
|
||
func.saveJSON(cfg, const.configFile)
|
||
app.config.update(cfg)
|
||
# loading the config file again to read new values
|
||
func.loadJSON(const.configFile)
|
||
return {'message': _("Settings saved!")}
|
||
|
||
app.register_blueprint(api_bp, url_prefix='/api/v1')
|
||
app.register_blueprint(admin_bp, url_prefix='/admin')
|
||
|
||
if __name__ == '__main__':
|
||
app.run(debug=True)
|