mirror of
https://codeberg.org/catask-org/catask.git
synced 2025-04-20 05:43:41 -05:00
169 lines
4.5 KiB
Python
169 lines
4.5 KiB
Python
from flask import url_for, request
|
|
from markupsafe import Markup
|
|
from bleach.sanitizer import Cleaner
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from mistune import HTMLRenderer, escape
|
|
import mistune
|
|
import humanize
|
|
import mysql.connector
|
|
import os
|
|
import random
|
|
import json
|
|
import constants as const
|
|
|
|
# load json file
|
|
def loadJSON(file_path):
    """Read a UTF-8 JSON file and return the decoded object.

    ``file_path`` is joined onto the current working directory, so a
    relative path is resolved against it while an absolute path is used
    unchanged.
    """
    target = Path.cwd() / file_path
    with target.open('r', encoding="utf-8") as handle:
        return json.load(handle)
|
|
|
|
# save json file
|
|
def saveJSON(dict, file_path):
    """Serialise ``dict`` as JSON (4-space indent) into ``file_path``.

    The path is joined onto the current working directory and the file is
    opened in write mode, so any previous content is replaced.

    The first parameter keeps its historical name ``dict`` (which shadows
    the builtin inside this function) so existing keyword callers keep
    working.
    """
    target = Path.cwd() / file_path
    with target.open('w', encoding="utf-8") as handle:
        json.dump(dict, handle, indent=4)
|
|
|
|
# Instance configuration, loaded once when this module is imported.
cfg = loadJSON(const.configFile)
|
|
|
|
def formatRelativeTime(date_str):
    """Turn a ``YYYY-MM-DD HH:MM:SS`` timestamp into a relative-time phrase.

    The string is parsed as a naive datetime and subtracted from the
    current local time; the resulting delta is handed to
    ``humanize.naturaltime``.

    Raises:
        ValueError: if ``date_str`` does not match the expected format.
    """
    parsed = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
    elapsed = datetime.now() - parsed
    return humanize.naturaltime(elapsed)
|
|
|
|
# Database connection settings, read from the environment at import time.
# Any variable that is not set yields None.
dbHost = os.getenv("DB_HOST")
dbUser = os.getenv("DB_USER")
dbPass = os.getenv("DB_PASS")
dbName = os.getenv("DB_NAME")
# DB_PORT stays a string when provided; an unset or empty value falls back
# to the integer 3306.
dbPort = os.getenv("DB_PORT") or 3306
|
|
|
|
def createDatabase(cursor, dbName):
    """Create the database ``dbName`` through an already-open cursor.

    Args:
        cursor: object exposing ``execute(statement)``.
        dbName: plain (unquoted) database name.

    On success a confirmation line is printed. If the driver raises
    ``mysql.connector.Error`` the error is printed and the process is
    terminated with exit status 1 (``SystemExit``).
    """
    # Identifiers cannot be sent as bound parameters, so quote the name with
    # backticks and double any embedded backtick. Without this, names such
    # as "my-db" produce a syntax error and the value is spliced into the
    # statement verbatim.
    quoted = "`{}`".format(str(dbName).replace("`", "``"))
    try:
        cursor.execute(f"CREATE DATABASE {quoted} DEFAULT CHARACTER SET 'utf8'")
    except mysql.connector.Error as error:
        print("Failed to create database:", error)
        # Raise SystemExit directly: the bare exit() helper is injected by
        # the ``site`` module and is not guaranteed to exist.
        raise SystemExit(1) from error
    print(f"Database {dbName} created successfully")
|
|
|
|
def connectToDb():
    """Open and return a MySQL connection built from the module settings.

    Host, user, password, database and port come from the module-level
    ``db*`` values. The connection is requested with a pool name derived
    from ``const.appName``, a pool size of 32 and autocommit enabled.
    """
    settings = {
        "host": dbHost,
        "user": dbUser,
        "password": dbPass,
        "database": dbName,
        "port": dbPort,
        "pool_name": f"{const.appName}-pool",
        "pool_size": 32,
        "autocommit": True,
    }
    return mysql.connector.connect(**settings)
|
|
|
|
def getQuestion(question_id: int):
    """Fetch the row from ``questions`` whose ``id`` equals ``question_id``.

    Returns:
        The first matching row as returned by a ``dictionary=True`` cursor,
        or ``None`` when ``fetchone()`` finds no row.

    The cursor and the connection are closed in ``finally`` blocks so they
    are released even when the query raises; previously an exception left
    both open.
    """
    conn = connectToDb()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute("SELECT * FROM questions WHERE id=%s", (question_id,))
            return cursor.fetchone()
        finally:
            cursor.close()
    finally:
        conn.close()
|
|
|
|
def getAnswer(question_id: int):
    """Fetch the first row from ``answers`` for the given ``question_id``.

    Returns:
        The first matching row as returned by a ``dictionary=True`` cursor,
        or ``None`` when ``fetchone()`` finds no row.

    The cursor and the connection are closed in ``finally`` blocks so they
    are released even when the query raises; previously an exception left
    both open.
    """
    conn = connectToDb()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute("SELECT * FROM answers WHERE question_id=%s", (question_id,))
            return cursor.fetchone()
        finally:
            cursor.close()
    finally:
        conn.close()
|
|
|
|
def readPlainFile(file, split=False):
    """Read a UTF-8 text file, optionally as a list of lines.

    Args:
        file: path of the file to read.
        split: when truthy, return ``str.splitlines()`` of the content;
            otherwise return the content as one string.

    Returns:
        The file content (``str`` or ``list[str]``), or an empty list when
        the file does not exist. The empty list is returned for a missing
        file regardless of ``split``, as before.
    """
    # Open directly instead of checking os.path.exists() first: this avoids
    # the window in which the file can disappear between check and open.
    try:
        with open(file, 'r', encoding="utf-8") as handle:
            content = handle.read()
    except (FileNotFoundError, NotADirectoryError):
        return []
    # Plain truthiness: the old `== False` / `== True` pair silently
    # returned None for any other value of `split`.
    return content.splitlines() if split else content
|
|
|
|
def getRandomWord():
    """Return one randomly chosen line from the anti-spam word file.

    The file named by ``const.antiSpamFile`` is read as a list of lines.

    Raises:
        IndexError: from ``random.choice`` when that list is empty.
    """
    return random.choice(readPlainFile(const.antiSpamFile, split=True))
|
|
|
|
def trimContent(var, trim):
    """Shorten ``var`` to at most ``trim`` characters plus an ellipsis.

    Args:
        var: the string to shorten.
        trim: maximum number of characters kept from ``var``.

    Returns:
        ``var`` with trailing whitespace removed when it already fits;
        otherwise its first ``trim`` characters, right-stripped, followed
        by ``…``.
    """
    # Strictly greater: a string of exactly `trim` characters loses nothing,
    # so it must not receive an ellipsis.
    if len(var) > trim:
        # Strip before appending so the cut never ends in "word …".
        return var[:trim].rstrip() + '…'
    return var.rstrip()
|
|
|
|
class FixAsciiEmojis(HTMLRenderer):
    """HTML renderer whose plain-text output is always HTML-escaped.

    Earlier, any text run that started with ``>`` and ended with ``<`` (the
    shape of emoticons such as ``>_<``) was emitted verbatim. Everything
    between those two characters was emitted unescaped as well, so a run
    like ``>x<img src=x onerror=... <`` reached the page as live markup.
    Escaping every run closes that hole; the escaped entities
    (``&gt;_&lt;``) are shown by browsers as the same emoticon.
    """

    def text(self, text):
        """Return ``text`` with HTML special characters escaped."""
        return escape(text)
|
|
|
|
def renderMarkdown(text):
    """Render Markdown ``text`` to HTML and wrap the result in ``Markup``.

    The parser is created with the ``strikethrough`` plugin, hard line
    wrapping and the ``FixAsciiEmojis`` renderer.

    NOTE(review): the returned ``Markup`` is not passed through any
    allow-list sanitiser. A bleach ``Cleaner`` pass (tags p, em, b, strong,
    i, br, s, del, a; attribute href on a) had been sketched here but was
    disabled, leaving only unused locals behind; they have been removed.
    If that pass is reinstated, the attribute map must use a list
    (``{'a': ['href']}``), not a bare string.
    """
    # hard_wrap=True means that newlines will be converted into <br> tags.
    #
    # Markdown normally needs two trailing spaces or a <br> tag for a line
    # break, but hard_wrap is enabled to keep the sanity of whoever uses
    # this software (after all, not everyone knows Markdown syntax).
    #
    # NOTE(review): escape=False appears to have no effect when a renderer
    # instance is supplied - confirm against the installed mistune version.
    md = mistune.create_markdown(
        escape=False,
        plugins=['strikethrough'],
        hard_wrap=True,
        renderer=FixAsciiEmojis(),
    )
    return Markup(md(text))
|
|
|
|
def generateMetadata(question=None, answer=None):
    """Build the page metadata dictionary (title, description, url, image).

    Without arguments the instance-wide values from ``cfg['instance']`` are
    returned. When both ``question`` and ``answer`` are truthy, the title
    becomes the question content (trimmed to 150) followed by the instance
    title, the description becomes the answer content (trimmed to 150) and
    the url points at the ``viewQuestion`` route for that question.
    """
    instance = cfg['instance']
    metadata = {
        'title': instance['title'],
        'description': instance['description'],
        'url': instance['fullBaseUrl'],
        'image': instance['image'],
    }

    # Both pieces are required; otherwise the instance defaults are used.
    if not (question and answer):
        return metadata

    metadata['title'] = trimContent(question['content'], 150) + " | " + instance['title']
    metadata['description'] = trimContent(answer['content'], 150)
    metadata['url'] = instance['fullBaseUrl'] + url_for('viewQuestion', question_id=question['id'])
    metadata['image'] = instance['image']
    return metadata
|