from flask import url_for, request, jsonify, Flask, abort, session
from flask_babel import Babel, _, refresh
from markupsafe import Markup
from bleach.sanitizer import Cleaner
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from mistune import HTMLRenderer, escape
from PIL import Image
from psycopg import sql
from psycopg.rows import dict_row
import base64
import time
import zipfile
import shutil
import subprocess
import mistune
import humanize
import psycopg
import re
import os
import random
import json
import requests
import constants as const

app = Flask(const.appName)
app.config['BABEL_DEFAULT_LOCALE'] = 'en'
app.config['BABEL_TRANSLATION_DIRECTORIES'] = 'locales'

# refreshing locale
refresh()

# update this once more languages are supported
app.config['available_languages'] = {
    "en_US": _("English (US)"),
    "ru_RU": _("Russian")
}


def getLocale():
    """Return the session language, initialising it from the config default."""
    if not session.get('language'):
        app.config.update(cfg)
        session['language'] = cfg['languages']['default']
    return session.get('language')


babel = Babel(app, locale_selector=getLocale)


# load json file
def loadJSON(file_path):
    """Load and return the JSON document at ``file_path`` (relative to cwd)."""
    path = Path.cwd() / file_path
    with open(path, 'r', encoding="utf-8") as file:
        return json.load(file)


# save json file
def saveJSON(dict, file_path):
    """Write ``dict`` as indented JSON to ``file_path`` (relative to cwd)."""
    # the parameter name shadows the builtin but is kept for keyword callers
    path = Path.cwd() / file_path
    with open(path, 'w', encoding="utf-8") as file:
        json.dump(dict, file, indent=4)


# append to a json file
def appendToJSON(new_data, file_path):
    """Append ``new_data`` to the JSON list stored in ``file_path``.

    The file is created holding an empty list when it does not exist yet.
    Returns True on success; any failure is logged and False is returned.
    """
    try:
        path = Path(file_path)
        if not path.is_file():
            with open(path, 'w', encoding="utf-8") as file:
                json.dump([], file)
        with open(path, 'r+', encoding="utf-8") as file:
            file_data = json.load(file)
            file_data.append(new_data)
            file.seek(0)
            json.dump(file_data, file, indent=4)
            # drop leftovers in case the new document is shorter than the old
            file.truncate()
        return True
    except Exception as e:
        app.logger.error(str(e))
        return False


cfg = loadJSON(const.configFile)


def formatRelativeTime(date_str):
    """Humanize a naive ``YYYY-MM-DD HH:MM:SS`` timestamp relative to now."""
    date_format = "%Y-%m-%d %H:%M:%S"
    past_date = datetime.strptime(date_str, date_format)
    time_difference = datetime.now() - past_date
    return humanize.naturaltime(time_difference)


def formatRelativeTime2(date_str):
    """Humanize a ``YYYY-MM-DDTHH:MM:SSZ`` timestamp, read as UTC.

    Returns an empty string when ``date_str`` is empty or does not match
    the expected format.
    """
    date_format = "%Y-%m-%dT%H:%M:%SZ"
    if not date_str:
        return ''
    try:
        past_date = datetime.strptime(date_str, date_format)
    except ValueError:
        return ''

    if past_date.tzinfo is None:
        past_date = past_date.replace(tzinfo=timezone.utc)

    time_difference = datetime.now(timezone.utc) - past_date
    return humanize.naturaltime(time_difference)


dbHost = os.environ.get("DB_HOST")
dbUser = os.environ.get("DB_USER")
dbPass = os.environ.get("DB_PASS")
dbName = os.environ.get("DB_NAME")
dbPort = os.environ.get("DB_PORT")
if not dbPort:
    # PostgreSQL's default port (3306 was a leftover from the MySQL days)
    dbPort = 5432


def _stripDate(value):
    """Drop microseconds and timezone info from a datetime."""
    return value.replace(microsecond=0, tzinfo=None)


def createDatabase(cursor, dbName) -> None:
    """Create database ``dbName`` owned by the configured DB user.

    Prints the outcome and terminates the process with status 1 when the
    database cannot be created.
    """
    try:
        # identifiers are quoted by psycopg instead of being pasted into SQL
        cursor.execute(
            sql.SQL("CREATE DATABASE {} OWNER {}").format(
                sql.Identifier(dbName), sql.Identifier(dbUser)
            )
        )
        print(f"Database {dbName} created successfully")
    except psycopg.Error as error:
        print("Failed to create database:", error)
        raise SystemExit(1)


def connectToDb():
    """Open a new PostgreSQL connection whose rows are returned as dicts."""
    # using dict_row factory here because its easier than modifying now-legacy mysql code
    # keyword arguments keep special characters in credentials intact and
    # make DB_PORT actually take effect
    return psycopg.connect(
        host=dbHost,
        port=dbPort,
        user=dbUser,
        password=dbPass,
        dbname=dbName,
        row_factory=dict_row
    )


def getQuestion(question_id: int) -> dict:
    """Fetch one question by id; returns None when it does not exist."""
    with connectToDb() as conn, conn.cursor() as cursor:
        cursor.execute("SELECT * FROM questions WHERE id=%s", (question_id,))
        question = cursor.fetchone()
    if question is None:
        return None
    question['creation_date'] = _stripDate(question['creation_date'])
    return question


def getAllQuestions(limit: int = None, offset: int = None) -> tuple:
    """Return ``(combined, metadata)`` for all answered questions.

    ``combined`` is a list of ``{'question': ..., 'answers': [...]}`` dicts,
    pinned questions first, then ordered by latest answer date.
    ``metadata`` is the instance-level metadata from ``generateMetadata()``.
    """
    query = """
        SELECT q.*, a.creation_date AS latest_answer_date
        FROM questions q
        LEFT JOIN (
            SELECT question_id, MAX(creation_date) AS creation_date
            FROM answers
            GROUP BY question_id
        ) a ON q.id = a.question_id
        WHERE q.answered = %s
        ORDER BY q.pinned DESC, (a.creation_date IS NULL), a.creation_date DESC, q.creation_date DESC
    """
    params = [True]
    if limit is not None:
        query += " LIMIT %s"
        params.append(limit)
    if offset is not None:
        query += " OFFSET %s"
        params.append(offset)

    with connectToDb() as conn, conn.cursor() as cursor:
        app.logger.debug("[CatAsk/functions/getAllQuestions] SELECT'ing all questions with latest answers")
        cursor.execute(query, tuple(params))
        questions = cursor.fetchall()

        app.logger.debug("[CatAsk/functions/getAllQuestions] SELECT'ing answers")
        cursor.execute("SELECT * FROM answers ORDER BY creation_date DESC")
        answers = cursor.fetchall()

    metadata = generateMetadata()

    # normalise every answer once and group by question in a single pass
    answers_by_question = defaultdict(list)
    for answer in answers:
        answer['creation_date'] = _stripDate(answer['creation_date'])
        answers_by_question[answer['question_id']].append(answer)

    combined = []
    for question in questions:
        question['creation_date'] = _stripDate(question['creation_date'])
        combined.append({
            'question': question,
            'answers': answers_by_question.get(question['id'], [])
        })

    return combined, metadata


# form field that carries the anti-spam answer for each anti-spam type
_ANTISPAM_FORM_FIELDS = {
    'basic': 'antispam',
    'recaptcha': 'g-recaptcha-response',
    'turnstile': 'cf-turnstile-response',
    'frc': 'frc-captcha-response'
}

# it's probably bad to hardcode the siteverify urls, but meh, that will do for now
_SITEVERIFY_URLS = {
    'recaptcha': 'https://www.google.com/recaptcha/api/siteverify',
    'turnstile': 'https://challenges.cloudflare.com/turnstile/v0/siteverify',
    'frc': 'https://global.frcapi.com/api/v2/captcha/siteverify'
}


def _verifyCaptcha(antispam_type, token) -> bool:
    """Ask the captcha provider whether ``token`` is valid."""
    settings = cfg['antispam'][antispam_type]
    if antispam_type == 'frc':
        data = {'response': token, 'sitekey': settings['sitekey']}
        headers = {'X-API-Key': settings['apikey']}
    else:
        data = {'response': token, 'secret': settings['secretkey']}
        headers = None
    try:
        r = requests.post(
            _SITEVERIFY_URLS[antispam_type],
            data=data,
            headers=headers,
            timeout=10
        )
        return bool(r.json().get('success'))
    except (requests.RequestException, ValueError) as e:
        # an unreachable or misbehaving provider counts as a failed check
        app.logger.error(f"[CatAsk/functions/addQuestion] captcha verification failed: {e}")
        return False


def addQuestion(from_who, question, cw, noAntispam=False):
    """Validate and store a new question.

    Returns ``(body, 201, question_id)`` on success and ``(body, 500)`` with
    a deliberately generic error when the anti-spam or blacklist check fails.
    Aborts with HTTP 400 when the basic anti-spam word is empty.
    """
    generic_error = ({'error': _('An error has occurred')}, 500)
    antispam_type = cfg['antispam']['type']

    if cfg['antispam']['enabled'] and not noAntispam:
        # the form is only read when a check will actually run, so
        # noAntispam=True also works outside of a request
        field = _ANTISPAM_FORM_FIELDS.get(antispam_type)
        antispam = request.form.get(field, '') if field else ''

        if antispam_type == 'basic':
            if not antispam:
                abort(400, "Anti-spam word must not be empty")
            antispam_wordlist = readPlainFile(const.antiSpamFile, split=True)
            if antispam not in antispam_wordlist:
                # return a generic error message so bad actors wouldn't figure out the antispam list
                return generic_error
        elif antispam_type in _SITEVERIFY_URLS:
            if not _verifyCaptcha(antispam_type, antispam):
                return generic_error

    blacklist = readPlainFile(const.blacklistFile, split=True)
    question_text = question or ''
    author_text = from_who or ''
    for bad_word in blacklist:
        # a blank line would otherwise match every question ('' in s is True)
        if not bad_word:
            continue
        if bad_word in question_text or bad_word in author_text:
            # return a generic error message so bad actors wouldn't figure out the blacklist
            return generic_error

    with connectToDb() as conn, conn.cursor() as cursor:
        app.logger.debug("[CatAsk/API/add_question] INSERT'ing new question into database")
        cursor.execute(
            "INSERT INTO questions (from_who, content, answered, cw) VALUES (%s, %s, %s, %s) RETURNING id",
            (from_who, question, False, cw)
        )
        question_id = cursor.fetchone()['id']
        conn.commit()

    return {'message': _('Question asked successfully!')}, 201, question_id


def getAnswer(question_id: int) -> dict:
    """Fetch one answer for ``question_id``; returns None when there is none."""
    with connectToDb() as conn, conn.cursor() as cursor:
        cursor.execute("SELECT * FROM answers WHERE question_id=%s", (question_id,))
        answer = cursor.fetchone()
    if answer is None:
        return None
    answer['creation_date'] = _stripDate(answer['creation_date'])
    return answer


def addAnswer(question_id: int, answer: str, cw: str) -> tuple:
    """Store an answer and mark its question as answered.

    Returns ``(json_response, 201)``. Database errors propagate to the
    caller; the connection context manager rolls back and closes in that case.
    """
    with connectToDb() as conn, conn.cursor() as cursor:
        app.logger.debug("[CatAsk/API/add_answer] INSERT'ing an answer into database")
        cursor.execute(
            "INSERT INTO answers (question_id, content, cw) VALUES (%s, %s, %s) RETURNING id",
            (question_id, answer, cw)
        )
        answer_id = cursor.fetchone()['id']

        app.logger.debug("[CatAsk/API/add_answer] UPDATE'ing question to set answered and answer_id")
        cursor.execute(
            "UPDATE questions SET answered=%s, answer_id=%s WHERE id=%s",
            (True, answer_id, question_id)
        )
        conn.commit()

    return jsonify({'message': _('Answer added successfully!')}), 201


def _ntfyHeaderValue(value):
    """Return ``value`` in a form that can be sent as an HTTP header.

    Non-ASCII text is wrapped as an RFC 2047 base64 encoded-word.
    NOTE(review): relies on the ntfy server decoding RFC 2047 headers - confirm
    against the deployed ntfy version.
    """
    try:
        value.encode('ascii')
        return value
    except UnicodeEncodeError:
        encoded = base64.b64encode(value.encode('utf-8')).decode('ascii')
        return f"=?UTF-8?B?{encoded}?="


def ntfySend(cw, return_val, from_who, question) -> None:
    """Send a "new question" notification to the configured ntfy topic.

    ``return_val`` is the tuple returned by ``addQuestion``; its third item
    is the new question id. Delivery failures are logged, not raised, since
    the question is already stored at this point.
    """
    app.logger.debug("[CatAsk/functions/ntfySend] started ntfy flow")
    ntfy_cw = f" [CW: {cw}]" if cw else ""
    ntfy_host = cfg['ntfy']['host']
    ntfy_topic = cfg['ntfy']['topic']
    question_id = return_val[2]
    # doesn't work otherwise
    from_who = from_who if from_who else cfg['anonName']

    headers = {
        "Title": _ntfyHeaderValue(f"New question from {from_who}{ntfy_cw}"),
        "Actions": f"view, View question, {cfg['instance']['fullBaseUrl']}/inbox/#question-{question_id}",
        "Tags": "question"
    }
    ntfy_user = cfg['ntfy']['user']
    ntfy_pass = cfg['ntfy']['pass']
    if ntfy_user and ntfy_pass:
        b64_auth = base64.b64encode(f"{ntfy_user}:{ntfy_pass}".encode('utf-8'))
        headers["Authorization"] = f"Basic {b64_auth.decode('ascii')}"

    # encode explicitly: a str body is not guaranteed to be sent as UTF-8
    body = trimContent(question, int(cfg['trimContentAfter'])).encode('utf-8')
    try:
        requests.put(
            f"{ntfy_host}/{ntfy_topic}",
            data=body,
            headers=headers,
            timeout=10
        )
    except requests.RequestException as e:
        app.logger.error(f"[CatAsk/functions/ntfySend] failed to send notification: {e}")
    app.logger.debug("[CatAsk/functions/ntfySend] finished ntfy flow")


def readPlainFile(file, split=False):
    """Read a UTF-8 text file.

    Returns a list of lines when ``split`` is true, the whole text otherwise.
    A missing file yields the empty value of the matching type.
    """
    try:
        with open(file, 'r', encoding="utf-8") as handle:
            contents = handle.read()
    except FileNotFoundError:
        return [] if split else ''
    return contents.splitlines() if split else contents


def savePlainFile(file, contents):
    """Write ``contents`` to ``file`` as UTF-8 (matching ``readPlainFile``)."""
    with open(file, 'w', encoding="utf-8") as handle:
        handle.write(contents)


def getRandomWord():
    """Pick a random word from the anti-spam word list.

    Raises IndexError when the list is empty or the file is missing.
    """
    items = readPlainFile(const.antiSpamFile, split=True)
    return random.choice(items)


def trimContent(var, trim):
    """Cut ``var`` to at most ``trim`` characters, adding an ellipsis if cut.

    A ``trim`` of zero or less returns ``var`` untouched.
    """
    trim = int(trim)
    if trim <= 0:
        return var
    if len(var) > trim:
        # strip before appending so the ellipsis never follows a space
        return var[:trim].rstrip() + '…'
    return var.rstrip()


# mistune plugin
inlineBtnPattern = r'\[btn\](?P<button_text>.+?)\[/btn\]'


def parse_inline_button(inline, m, state):
    """Turn a ``[btn]...[/btn]`` match into an ``inline_button`` token."""
    text = m.group("button_text")
    state.append_token({"type": "inline_button", "raw": text})
    return m.end()


def render_inline_button(renderer, text):
    """Render an ``inline_button`` token as an HTML button."""
    # NOTE(review): markup reconstructed - confirm the intended class names
    return f"<button class='btn btn-secondary' type='button'>{escape(text)}</button>"


def button(md):
    """Register the inline button syntax on a mistune markdown instance."""
    md.inline.register('inline_button', inlineBtnPattern, parse_inline_button, before='link')
    if md.renderer and md.renderer.NAME == 'html':
        md.renderer.register('inline_button', render_inline_button)


# Base directory where emoji packs are stored
EMOJI_BASE_PATH = Path.cwd() / 'static' / 'emojis'

# emoji name -> relative image path, filled by find_emoji_path
emoji_cache = {}


def to_snake_case(name):
    """Convert a camelCase / PascalCase name to snake_case."""
    name = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    return re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', name).lower()


def find_emoji_path(emoji_name):
    """Return the ``static/emojis/...`` path for ``emoji_name`` or None.

    When ``<prefix>.json`` exists for the name's first word, the pack meta
    files are searched; otherwise the emoji directories are scanned for
    ``<emoji_name>.png``. Hits are remembered in ``emoji_cache``.
    """
    cached_path = emoji_cache.get(emoji_name)
    # re-check the file so a deleted pack does not leave a stale entry behind
    if cached_path and (Path.cwd() / cached_path).is_file():
        return cached_path

    if '_' in emoji_name:
        head = emoji_name.partition('_')[0]
    else:
        head = to_snake_case(emoji_name).split('_')[0]

    base_path = Path(EMOJI_BASE_PATH)
    if (base_path / f'{head}.json').exists():
        for json_file in base_path.glob('*.json'):
            app.logger.debug("\n[CatAsk/functions/find_emoji_path] Using JSON meta file\n")
            pack_data = loadJSON(json_file)
            for entry in pack_data.get('emojis', []):
                if entry['name'] == emoji_name:
                    emoji_path = os.path.join('static/emojis', json_file.stem, entry['file_name'])
                    emoji_cache[emoji_name] = emoji_path
                    return emoji_path
    else:
        for address, dirs, files in os.walk(base_path):
            app.logger.debug("\n[CatAsk/functions/find_emoji_path] Falling back to scanning directories\n")
            if f"{emoji_name}.png" in files:
                rel_dir = os.path.relpath(address, base_path)
                emoji_path = os.path.join("static/emojis", rel_dir, f"{emoji_name}.png")
                emoji_cache[emoji_name] = emoji_path
                return emoji_path

    return None


emojiPattern = r':(?P<emoji_name>[a-zA-Z0-9_]+):'


def parse_emoji(inline, m, state):
    """Turn a ``:name:`` match into an ``emoji`` token."""
    emoji_name = m.group("emoji_name")
    state.append_token({"type": "emoji", "raw": emoji_name})
    return m.end()


def render_emoji(renderer, emoji_name):
    """Render an ``emoji`` token as an image, or as plain text if unknown."""
    emoji_path = find_emoji_path(emoji_name)
    if emoji_path:
        absolute_emoji_path = url_for('static', filename=emoji_path.replace('static/', ''))
        # NOTE(review): markup reconstructed - confirm attributes and sizing
        return (
            f'<img src="{escape(absolute_emoji_path)}" alt=":{emoji_name}:" '
            f'title=":{emoji_name}:" class="emoji" loading="lazy">'
        )
    return f":{emoji_name}:"


def emoji(md):
    """Register the ``:emoji:`` syntax on a mistune markdown instance."""
    md.inline.register('emoji', emojiPattern, parse_emoji, before='link')
    if md.renderer and md.renderer.NAME == 'html':
        md.renderer.register('emoji', render_emoji)


def listEmojis():
    """List image files stored directly in the emoji base directory."""
    emoji_base_path = Path.cwd() / 'static' / 'emojis'
    if not emoji_base_path.is_dir():
        return []

    emojis = []
    # Iterate over files that are directly in the emoji base path (not in subdirectories)
    for file in emoji_base_path.iterdir():
        # Only include files, not directories
        if file.is_file() and file.suffix in {'.png', '.jpg', '.jpeg', '.webp'}:
            relative_path = os.path.relpath(file, emoji_base_path)
            emojis.append({
                'name': file.stem,  # file name without the extension
                'image': os.path.join('static/emojis', relative_path),
                'relative_path': relative_path
            })
    return emojis


def listEmojiPacks():
    """List emoji packs (sub-directories of ``const.emojiPath``).

    Pack details come from ``<pack>.json`` next to the directory when it
    exists, otherwise from a scan of the directory itself.
    """
    emoji_packs = []
    emoji_base_path = const.emojiPath

    # Iterate through all directories in the emoji base path
    for pack_dir in emoji_base_path.iterdir():
        if not pack_dir.is_dir():
            continue

        relative_path = os.path.relpath(pack_dir, emoji_base_path)

        # the meta file sits next to the pack directory; build it from the
        # directory *name* so relative base paths work as well
        meta_json_path = emoji_base_path / f"{pack_dir.name}.json"

        if meta_json_path.exists():
            app.logger.debug(f"[CatAsk/functions/listEmojiPacks] Using meta.json file ({meta_json_path})")
            pack_data = loadJSON(meta_json_path)
            emoji_packs.append({
                'name': pack_data.get('name', pack_dir.name).capitalize(),
                'exportedAt': pack_data.get('exportedAt', 'Unknown'),
                'preview_image': pack_data.get('preview_image', ''),
                'website': pack_data.get('website', ''),
                'relative_path': f'static/emojis/{relative_path}',
                'emojis': pack_data.get('emojis', [])
            })
        else:
            app.logger.debug(f"[CatAsk/functions/listEmojiPacks] Falling back to directory scan ({pack_dir})")
            # If no meta.json is found, fall back to directory scan
            preview_image = None
            # Find the first image in the directory for preview
            for file in pack_dir.iterdir():
                if file.suffix in {'.png', '.jpg', '.jpeg', '.webp'}:
                    preview_image = os.path.join('static/emojis', relative_path, file.name)
                    break

            # Append pack info without meta.json
            emoji_packs.append({
                'name': pack_dir.name.capitalize(),
                'preview_image': preview_image,
                'relative_path': f'static/emojis/{relative_path}'
            })

    return emoji_packs


def processEmojis(meta_json_path):
    """Convert an uploaded pack's meta file into ``<pack_name>.json``.

    Returns the list of ``{'name', 'file_name'}`` entries.
    Raises ValueError when the meta file lists no emojis.
    """
    emoji_metadata = loadJSON(meta_json_path)
    emojis = emoji_metadata.get('emojis', [])
    if not emojis:
        raise ValueError("Emoji pack meta file does not list any emojis")

    # the pack is named after the category of its first emoji
    first_emoji = emojis[0]
    pack_name = first_emoji['emoji']['category'].capitalize()
    exported_at = emoji_metadata.get('exportedAt', 'Unknown')
    website = emoji_metadata.get('host', '')
    preview_image = os.path.join('static/emojis', pack_name.lower(), first_emoji['fileName'])
    relative_path = os.path.join('static/emojis', pack_name.lower())

    processed_emojis = []
    for entry in emojis:
        emoji_info = {
            'name': entry['emoji']['name'],
            'file_name': entry['fileName'],
        }
        processed_emojis.append(emoji_info)
        app.logger.debug(f"[CatAsk/API/upload_emoji_pack] Processed emoji: {emoji_info['name']}\t(File: {emoji_info['file_name']})")

    # Create the pack info structure
    pack_info = {
        'name': pack_name,
        'exportedAt': exported_at,
        'preview_image': preview_image,
        'relative_path': relative_path,
        'website': website,
        'emojis': processed_emojis
    }

    # Save the combined pack info to <pack_name>.json
    pack_json_name = const.emojiPath / f"{pack_name.lower()}.json"
    saveJSON(pack_info, pack_json_name)

    return processed_emojis


def _allowImgAttribute(tag, name, value):
    """bleach attribute filter for <img>: only ``class="emoji"`` is allowed."""
    if name == 'class':
        return value == "emoji"
    # Allow specific attributes on emoji images
    return name in {'src', 'alt', 'title', 'width', 'height', 'loading'}


def renderMarkdown(text, allowed_tags=None):
    """Render markdown to sanitised HTML and return it as ``Markup``.

    ``allowed_tags`` overrides the default tag whitelist.
    """
    plugins = [
        'strikethrough',
        button,
        emoji
    ]
    if not allowed_tags:
        allowed_tags = [
            'p', 'em', 'b', 'strong', 'i', 'br', 's', 'del', 'a',
            'button', 'ol', 'li', 'hr', 'img', 'code', 'pre'
        ]
    # bleach expects a list or a callable per tag; a bare string is treated
    # as a substring test and a nested dict never calls its values
    allowed_attrs = {
        'a': ['href'],
        'button': ['class'],
        'img': _allowImgAttribute
    }
    # hard_wrap=True means that newlines will be
    # converted into <br> tags
    #
    # yes, markdown usually lets you make line breaks only
    # with 2 spaces or <br> tag, but hard_wrap is enabled to keep
    # sanity of whoever will use this software
    # (after all, not everyone knows markdown syntax)
    md = mistune.create_markdown(
        escape=True,
        plugins=plugins,
        hard_wrap=True
    )
    html = md(text)
    cleaner = Cleaner(tags=allowed_tags, attributes=allowed_attrs)
    clean_html = cleaner.clean(html)
    return Markup(clean_html)


def generateMetadata(question=None, answer=None):
    """Build page metadata, for the instance or for one answered question."""
    metadata = {
        'title': cfg['instance']['title'],
        'description': cfg['instance']['description'],
        'url': cfg['instance']['fullBaseUrl'],
        'image': cfg['instance']['image']
    }

    # if question is specified, generate metadata for that question
    if question and answer:
        metadata.update({
            'title': trimContent(question['content'], 150) + " | " + cfg['instance']['title'],
            'description': trimContent(answer['content'], 150),
            'url': cfg['instance']['fullBaseUrl'] + url_for('viewQuestion', question_id=question['id']),
            'image': cfg['instance']['image']
        })

    # return 'metadata' dictionary
    return metadata


allowedFileExtensions = {'png', 'jpg', 'jpeg', 'webp', 'bmp', 'jxl'}
allowedArchiveExtensions = {'zip', 'tar', 'gz', 'bz2', 'xz'}


def allowedFile(filename):
    """Return True when ``filename`` has an allowed image extension."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in allowedFileExtensions


def allowedArchive(filename):
    """Return True when ``filename`` has an allowed archive extension."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in allowedArchiveExtensions


def stripArchExtension(filename):
    """Strip the archive extension, treating ``.tar.*`` as one extension."""
    if filename.endswith(('.tar.gz', '.tar.bz2', '.tar.xz')):
        return filename.rsplit('.', 2)[0]
    return filename.rsplit('.', 1)[0]


def generateFavicon(file_name):
    """Generate every favicon size from ``const.faviconDir / file_name``."""
    sizes = {
        'apple-touch-icon.png': (180, 180),
        'android-chrome-192x192.png': (192, 192),
        'android-chrome-512x512.png': (512, 512),
        'favicon-32x32.png': (32, 32),
        'favicon-16x16.png': (16, 16),
        'favicon.ico': (16, 16)
    }

    os.makedirs(const.faviconDir, exist_ok=True)

    # the context manager releases the source file handle when done
    with Image.open(const.faviconDir / file_name) as img:
        for filename, size in sizes.items():
            resized_img = img.resize(size)
            resized_img.save(const.faviconDir / filename)


def _runPgTool(args):
    """Run a PostgreSQL CLI tool to completion.

    Connection settings are passed through the environment; PGHOST/PGPORT
    already present in the environment win over the app's own settings.
    Raises RuntimeError carrying the tool's stderr on a non-zero exit.
    """
    env = dict(os.environ)
    if dbPass:
        env['PGPASSWORD'] = dbPass
    if dbHost:
        env.setdefault('PGHOST', dbHost)
    env.setdefault('PGPORT', str(dbPort))

    result = subprocess.run(
        args,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        encoding="utf-8",
        env=env
    )
    if result.returncode != 0:
        raise RuntimeError(result.stderr.strip() or f"{args[0]} exited with status {result.returncode}")


def _addTreeToZip(archive, directory, base_dir):
    """Add every file under ``directory`` to ``archive``, named relative to ``base_dir``."""
    for root, dirs, files in os.walk(directory):
        for file in files:
            file_path = Path(root) / file
            archive.write(file_path, arcname=file_path.relative_to(base_dir))


def createExport():
    """Create a zip export of the config, database, favicons and emojis.

    Returns ``{'message': ...}`` on success or ``({'error': ...}, 500)``.
    The temporary directory is removed in both cases.
    """
    temp_dir = const.tempDir
    try:
        # just to test if connection works
        connectToDb().close()

        # one clock reading so both timestamps describe the same moment
        now = datetime.now()
        timestamp = now.strftime("%Y-%m-%d_%H-%M-%S")
        timestamp_morereadable = now.strftime('%b %d, %Y %H:%M')

        export_dir = const.exportsDir
        os.makedirs(export_dir, exist_ok=True)
        os.makedirs(temp_dir, exist_ok=True)

        config_dest_path = temp_dir / const.configFile
        shutil.copy(const.configFile, config_dest_path)

        # Export database to SQL file; this blocks until pg_dump has
        # finished, so the dump is complete before it is zipped
        dump_file = temp_dir / 'database.sql'
        _runPgTool([
            'pg_dump', '--no-password', '-U', dbUser, '-d', dbName,
            '-F', 'c', '-E', 'UTF8', '-f', str(dump_file)
        ])

        # Create export zip archive
        zip_file_path = export_dir / f'export-{timestamp}.zip'
        with zipfile.ZipFile(zip_file_path, 'w') as export_zip:
            export_zip.write(config_dest_path, arcname=const.configFile)
            export_zip.write(dump_file, arcname='database.sql')

            # Add favicon and emojis folders to the zip archive
            favicon_dir = Path('static/icons/favicon')
            emojis_dir = Path('static/emojis')

            if favicon_dir.exists():
                _addTreeToZip(export_zip, favicon_dir, favicon_dir.parent.parent)
            if emojis_dir.exists():
                _addTreeToZip(export_zip, emojis_dir, emojis_dir.parent)

        # Record export metadata
        export_data = {
            'timestamp_esc': timestamp,
            'timestamp': timestamp_morereadable,
            'downloadPath': str(zip_file_path)
        }
        appendToJSON(export_data, const.exportsFile)

        return {'message': _('Export created successfully!')}
    except Exception as e:
        # psycopg.Error is handled here too; it got no special treatment
        return {'error': str(e)}, 500
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)


def importData(export_file) -> dict:
    """Restore config, favicons, emojis and database from an export archive.

    Returns ``{'message': ...}`` on success or ``({'error': ...}, 500)``.
    Nothing is replaced until the archive is known to contain both the
    config file and the database dump.
    """
    temp_dir = const.tempDir
    try:
        # make sure the database is reachable before touching any files
        connectToDb().close()

        # NOTE(review): unpack_archive does not protect against path
        # traversal in untrusted archives - only import trusted exports
        shutil.unpack_archive(export_file, temp_dir)

        config_src = temp_dir / const.configFile
        dump_file = temp_dir / 'database.sql'
        for required in (config_src, dump_file):
            if not required.is_file():
                raise FileNotFoundError(f"{required.name} is missing from the export archive")

        # Replace config file
        try:
            os.remove(const.configFile)
        except FileNotFoundError:
            pass
        shutil.move(str(config_src), str(Path.cwd() / const.configFile))

        # Replace favicon and emojis folders (only those present in the
        # archive, so nothing is deleted without a replacement)
        replacements = (
            (temp_dir / 'icons' / 'favicon', Path('static/icons/favicon')),
            (temp_dir / 'emojis', Path('static/emojis'))
        )
        for source_dir, dest_dir in replacements:
            if not source_dir.is_dir():
                continue
            shutil.rmtree(dest_dir, ignore_errors=True)
            shutil.copytree(source_dir, dest_dir)

        # Restore database from SQL file; this blocks until pg_restore is
        # done, so the dump is not deleted while it is still being read
        _runPgTool([
            'pg_restore', '--clean', '--if-exists', '--no-password',
            '-U', dbUser, '-d', dbName, str(dump_file)
        ])

        return {'message': _('Data imported successfully!')}
    except Exception as e:
        return {'error': str(e)}, 500
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)


# will probably get to it in 1.8.0 because my brain can't do it rn
# 2.1.0 maybe -1/12/25
"""
def retrospringImport(export_file):
    shutil.unpack_archive(export_file, const.tempDir)

    # probably a hack but whateva
    export_dirname = Path(export_file).stem
    export_dir = const.tempDir / export_dirname

    conn = connectToDb()
    cursor = conn.cursor()

    questions_file = loadJSON(export_dir / 'questions.json')
    answers_file = loadJSON(export_dir / 'answers.json')

    # Extract answers list
    questions_list = questions_file.get('questions', [])
    answers_list = answers_file.get('answers', [])
    # ['related']['question']['anonymous']

    for question in questions_list:
        # addQuestion(answer['related']['question']['anonymous'], question['content'], None, noAntispam=True)
        for answer in answers_list:
            print("anonymous:", answer['related']['question']['anonymous'])
            print(question['id'], answer['content'], None)
            # addAnswer(question['id'], answer['content'], None)

    # shutil.rmtree(const.tempDir)
    cursor.close()
    conn.close()
"""


def deleteExport(timestamp):
    """Delete the export archive for ``timestamp`` and its metadata entry.

    Returns ``{'message': ...}`` on success, ``({'error': ...}, 400)`` for
    an invalid timestamp and ``({'error': ...}, 500)`` for other failures.
    """
    try:
        file_name = f'export-{timestamp}.zip'
        # refuse anything that would escape the exports directory
        if Path(file_name).name != file_name:
            return {'error': 'Invalid export timestamp'}, 400

        # same directory createExport writes to
        export_file = const.exportsDir / file_name
        data = loadJSON(const.exportsFile)
        data = [export for export in data if export["timestamp_esc"] != timestamp]
        try:
            export_file.unlink()
        except FileNotFoundError:
            # file already gone: still drop the stale metadata entry
            pass
        saveJSON(data, const.exportsFile)
        return {'message': f'Export {timestamp} deleted successfully.'}
    except Exception as e:
        return {'error': str(e)}, 500


# reserved for 1.7.0 or later
"""
def getUserIp():
    if request.environ.get('HTTP_X_FORWARDED_FOR') is None:
        return request.environ['REMOTE_ADDR']
    else:
        return request.environ['HTTP_X_FORWARDED_FOR']

def isIpBlacklisted(user_ip):
    blacklist = readPlainFile(const.ipBlacklistFile, split=True)
    return user_ip in blacklist
"""