mirror of
https://codeberg.org/catask-org/catask.git
synced 2025-04-19 21:33:41 -05:00
73 lines
1.9 KiB
Python
73 lines
1.9 KiB
Python
from datetime import datetime
|
|
import humanize
|
|
import mysql.connector
|
|
import config as cfg
|
|
import os
|
|
import random
|
|
import constants as const
|
|
|
|
def formatRelativeTime(date_str):
|
|
date_format = "%Y-%m-%d %H:%M:%S"
|
|
past_date = datetime.strptime(date_str, date_format)
|
|
|
|
now = datetime.now()
|
|
time_difference = now - past_date
|
|
|
|
return humanize.naturaltime(time_difference)
|
|
|
|
dbHost = os.environ.get("DB_HOST")
|
|
dbUser = os.environ.get("DB_USER")
|
|
dbPass = os.environ.get("DB_PASS")
|
|
dbName = os.environ.get("DB_NAME")
|
|
|
|
def createDatabase(cursor, dbName):
|
|
try:
|
|
cursor.execute("CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(dbName))
|
|
print(f"Database {dbName} created successfully")
|
|
except mysql.connector.Error as error:
|
|
print("Failed to create database:", error)
|
|
exit(1)
|
|
|
|
def connectToDb():
|
|
conn = mysql.connector.connect(
|
|
host=dbHost,
|
|
user=dbUser,
|
|
password=dbPass,
|
|
database=dbName,
|
|
pool_name=f"{cfg.appName}-pool",
|
|
pool_size=32,
|
|
autocommit=True
|
|
)
|
|
return conn
|
|
|
|
def getQuestion(question_id: int):
|
|
conn = connectToDb()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT * FROM questions WHERE id=%s", (question_id,))
|
|
question = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
return question
|
|
|
|
def getAnswer(question_id: int):
|
|
conn = connectToDb()
|
|
cursor = conn.cursor(dictionary=True)
|
|
cursor.execute("SELECT * FROM answers WHERE question_id=%s", (question_id,))
|
|
answer = cursor.fetchone()
|
|
cursor.close()
|
|
conn.close()
|
|
return answer
|
|
|
|
def readPlainFile(file, split=False):
|
|
if os.path.exists(file):
|
|
with open(file, 'r', encoding="utf-8") as file:
|
|
if split == False:
|
|
return file.read()
|
|
if split == True:
|
|
return file.read().splitlines()
|
|
else:
|
|
return []
|
|
|
|
def getRandomWord():
|
|
items = readPlainFile(const.antiSpamFile, split=True)
|
|
return random.choice(items)
|