#!/usr/bin/python3 import os import configparser import sqlite3 import time import json import secrets import datetime import socket import threading import subprocess import asyncio from hypercorn.config import Config from hypercorn.asyncio import serve from itertools import groupby from werkzeug.utils import secure_filename from werkzeug.security import generate_password_hash, check_password_hash from quart import Quart, render_template, request, url_for, flash, redirect, session, make_response, send_from_directory, stream_with_context, Response, request, jsonify, websocket from apscheduler.schedulers.background import BackgroundScheduler # read config file config = configparser.ConfigParser() config.read("config.ini") PORT = config["config"]["PORT"] SECRET_KEY = config["config"]["SECRET_KEY"] UPLOAD_FOLDER = config["config"]["UPLOAD_FOLDER"] UPLOAD_LIMIT = config["config"]["UPLOAD_LIMIT"] PASSWORD_REQUIREMENT = config["config"]["PASSWORD_REQUIREMENT"] app = Quart(__name__) app.config["SECRET_KEY"] = SECRET_KEY app.config["MAX_CONTENT_LENGTH"] = int(UPLOAD_LIMIT) * 1000 * 1000 if SECRET_KEY == "placeholder": print("[WARNING] Secret key is not set") if not os.path.exists(UPLOAD_FOLDER): print("[WARNING] Upload folder doesn't exist, creating") os.mkdir(UPLOAD_FOLDER) if not os.path.exists("database.db"): print("[ERROR] No database exists, please run init_db") exit() def makeStrSafe(url): return str(urllib.parse.quote(url)).replace("%20", " ") def get_db_connection(): conn = sqlite3.connect("database.db") conn.row_factory = sqlite3.Row return conn def get_user(id): conn = get_db_connection() post = conn.execute("SELECT * FROM users WHERE id = ?", (id,)).fetchone() conn.close() if post is None: return "error" return post def get_comments(id): conn = get_db_connection() post = conn.execute("SELECT * FROM comments WHERE post_id = ?", (id,)).fetchall() conn.close() if post is None: return "error" return post def get_messages(chatroomid, max): conn = get_db_connection() post = conn.execute("SELECT * FROM chatmessages WHERE chatroom_id = ? ORDER BY created DESC;", (chatroomid,)).fetchmany(max + 1) conn.close() if post is None: return "error" return post app.jinja_env.globals.update(getComments=get_comments) def get_post(id): conn = get_db_connection() post = conn.execute("SELECT * FROM posts WHERE id = ?", (id,)).fetchone() conn.close() if post is None: return "error" return post app.jinja_env.globals.update(getUser=get_user) def check_username_taken(username): conn = get_db_connection() post = conn.execute("SELECT * FROM users WHERE lower(username) = ?", (username.lower(),)).fetchone() conn.close() if post is None: return "error" return post["id"] def get_session(id): conn = get_db_connection() post = conn.execute("SELECT * FROM sessions WHERE session = ?", (id,)).fetchone() conn.close() if post is None: return "error" return post full_hash = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode().strip() short_hash = subprocess.check_output(["git", "rev-parse", "--short=8", "HEAD"]).decode().strip() ALLOWED_EXTENSIONS = {"png", "apng", "jpg", "jpeg", "gif", "svg", "webp", "jxl"} def allowed_file(filename): return '.' in filename and \ filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS @app.route("/", methods=("GET", "POST")) async def main(): usersession = request.cookies.get("session_DO_NOT_SHARE") conn = get_db_connection() posts = conn.execute("SELECT * FROM posts ORDER BY created DESC;").fetchall() conn.close() if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) return await render_template("main.html", userdata=user, posts=posts, full_hash=full_hash, short_hash=short_hash) else: return await render_template("main.html", posts=posts, full_hash=full_hash, short_hash=short_hash) @app.route("/chat", methods=("GET", "POST")) async def chat(): usersession = request.cookies.get("session_DO_NOT_SHARE") if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) return await render_template("chat.html", userdata=user) else: return await render_template("chat.html") @app.route("/api/chat/listrooms") async def chatlistrooms(): conn = get_db_connection() rooms = conn.execute("SELECT * FROM chatrooms ORDER BY id ASC;").fetchall() conn.close() template = [] for room in rooms: roomtemplate = { "id": room["id"], "name": room["roomname"] } template.append(roomtemplate) return(template), 200 @app.route("/api/chat/getmessages/<roomid>") async def chatget(roomid): messages = get_messages(roomid, 150) template = [] for message in messages: creatorid = message["creator"] creatortemplate = { "id": message["creator"], "username": get_user(creatorid)["username"] } messagetemplate = { "id": message["id"], "content": message["content"], "creator": creatortemplate, "created": message["created"] } template.append(messagetemplate) return(template), 200 @app.route("/api/chat/send/<roomid>", methods=("GET", "POST")) async def chatsend(roomid): usersession = request.cookies.get("session_DO_NOT_SHARE") if usersession: if request.method == "POST": data = await request.get_json() content = data["content"] userCookie = get_session(usersession) user = get_user(userCookie["id"]) if not user["banned"] == "0": return { "error": "banned" }, 403 chatMessageContent = { "content": content, "creator": user["username"], "roomid": roomid, "created": str(time.time()) } #message_queue.append({"message": chatMessageContent}) conn = get_db_connection() conn.execute("INSERT INTO chatmessages (content, chatroom_id, creator, created) VALUES (?, ?, ?, ?)", (content, roomid, userCookie["id"], str(time.time()))) conn.commit() conn.close() return "success", 200 @app.route("/@<pageusername>", methods=("GET", "POST")) async def user(pageusername): usersession = request.cookies.get("session_DO_NOT_SHARE") checkusername = check_username_taken(pageusername) if not checkusername == "error": pageuser = get_user(checkusername) if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) return await render_template("user.html", userdata=user, createddate=datetime.datetime.utcfromtimestamp(int(str(pageuser["created"]).split(".")[0])).strftime("%Y-%m-%d"), pageuser=pageuser) else: return await render_template("user.html", createddate=datetime.datetime.utcfromtimestamp(int(str(pageuser["created"]).split(".")[0])).strftime("%Y-%m-%d"), pageuser=pageuser) else: return """<img src="https://http.cat/images/404.jpg">""", 404 @app.route("/api/page/<userid>", methods=("GET", "POST")) async def apipageuser(userid): pageuser = get_user(userid) addhtml = """ <!DOCTYPE html><script> window.stop() </script> <base target="_blank"/> <head><meta http-equiv="Content-Security-Policy" default-src='none'; child-src='none'; content="img-src cdn.discordapp.com cdn.discordapp.net media.tenor.com; media-src: fonts.gstatic.com fonts.googleapis.com;" /></head>""" if not pageuser == "error": return addhtml + pageuser["htmldescription"] else: return """<img src="https://http.cat/images/404.jpg">""", 404 @app.route("/@<pageusername>/edit", methods=("GET", "POST")) async def edituser(pageusername): usersession = request.cookies.get("session_DO_NOT_SHARE") checkusername = check_username_taken(pageusername) if not checkusername == "error": pageuser = get_user(checkusername) if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) if pageuser["username"] == user["username"]: if request.method == "POST": requestData = await request.form code = requestData["code"].replace("Content-Security-Policy", "").replace("<iframe>", "") conn = get_db_connection() conn.execute("UPDATE users SET htmldescription = ? WHERE id = ?", (code, user["id"])) conn.commit() conn.close() return redirect("/@" + user["username"]) else: return await render_template("edituser.html", userdata=user, pageuser=pageuser) else: return """<img src="https://http.cat/images/403.jpg">""", 403 else: return """<img src="https://http.cat/images/403.jpg">""", 403 else: return """<img src="https://http.cat/images/404.jpg">""", 404 @app.route("/api/frontpage", methods=("GET", "POST")) async def apifrontpage(): conn = get_db_connection() posts = conn.execute("SELECT * FROM posts ORDER BY created DESC;").fetchall() conn.close() result = [] for post in posts: comments = [] for comment in get_comments(post["id"]): commentthing = { "title": comment["textstr"], "id": comment["id"], "created": comment["created"], "creator": { "id": comment["creator"], "username": get_user(comment["creator"])["username"] } } comments.append(commentthing) mainthing = { "id": post["id"], "created": post["created"], "title": post["textstr"], "imgurl": post["imageurl"], "creator": { "id": post["creator"], "username": get_user(post["creator"])["username"] }, "comments": comments } result.append(mainthing) return result @app.route("/api/userinfo", methods=("GET", "POST")) async def apiuserinfo(): usersession = request.cookies.get("session_DO_NOT_SHARE") if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) datatemplate = { "username": user["username"], "id": user["id"], "created": user["created"] } return datatemplate else: return { "error": "no authentication" }, 403 @app.route("/api/login", methods=("GET", "POST")) async def apilogin(): usersession = request.cookies.get("session_DO_NOT_SHARE") if request.method == "POST": data = await request.get_json() username = data["username"] password = data["password"] userID = check_username_taken(username) user = get_user(userID) if user == "error": return { "error": "wrong username or password" }, 401 if not check_password_hash(user["password"], (password)): return { "error": "wrong username or password" }, 401 randomCharacters = secrets.token_hex(512) conn = get_db_connection() conn.execute("INSERT INTO sessions (session, id) VALUES (?, ?)", (randomCharacters, userID)) conn.commit() conn.close() return { "key": randomCharacters }, 200 else: return { "error": "https://http.cat/images/400.jpg" }, 400 @app.route("/apidocs", methods=("GET", "POST")) async def apidocs(): usersession = request.cookies.get("session_DO_NOT_SHARE") if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) return await render_template("apidocs.html", userdata=user) else: return await render_template("apidocs.html") @app.route("/post", methods=("GET", "POST")) async def post(): usersession = request.cookies.get("session_DO_NOT_SHARE") if usersession: if request.method == "POST": formData = await request.form formFiles = await request.files title = formData["title"] if title == "": await flash("Text required :3") return redirect(url_for("post")) if len(title) > 300: await flash("Too long title!") return redirect(url_for("post")) file = formFiles["file"] if (file): if not allowed_file(file.filename): await flash("File is not an image!") return redirect(url_for("post")) userCookie = get_session(usersession) user = get_user(userCookie["id"]) if not user["banned"] == "0": await flash("Your account has been suspended. You may no longer post") return redirect(url_for("post")) filename = secure_filename(file.filename) finalfilename = secrets.token_hex(32) + filename await file.save(os.path.join(UPLOAD_FOLDER, finalfilename)) imgurl = "/cdn/" + finalfilename if (not file): imgurl = "" conn = get_db_connection() conn.execute("INSERT INTO posts (textstr, imageurl, creator, created) VALUES (?, ?, ?, ?)", (title, imgurl, userCookie["id"], str(time.time()))) conn.commit() conn.close() return redirect(url_for("main")) else: userCookie = get_session(usersession) user = get_user(userCookie["id"]) return await render_template("post.html", userdata=user) else: await flash("A burgercat account is required to post :3") return redirect(url_for("login")) @app.route("/api/comment", methods=("GET", "POST")) async def comment(): usersession = request.cookies.get("session_DO_NOT_SHARE") if usersession: if request.method == "POST": data = await request.get_json() uid = data["id"] title = data["title"] userCookie = get_session(usersession) user = get_user(userCookie["id"]) if len(title) > 300: return { "error": "too much text" }, 403 if not user["banned"] == "0": return { "error": "banned" }, 403 conn = get_db_connection() conn.execute("INSERT INTO comments (textstr, post_id, creator, created) VALUES (?, ?, ?, ?)", (title, uid, userCookie["id"], str(time.time()))) conn.commit() conn.close() return "success", 200 else: return { "error": "https://http.cat/images/400.jpg" }, 400 else: return { "error": "https://http.cat/images/401.jpg" }, 401 @app.route("/api/post/<post_id>/comments", methods=("GET", "POST")) async def apicomments(post_id): postthing = get_comments(int(post_id)) if not postthing == "error": comments = [] for comment in postthing: commentthing = { "title": comment["textstr"], "created": comment["created"], "creator": { "id": comment["creator"], "username": get_user(comment["creator"])["username"] } } comments.append(commentthing) return comments @app.route("/cdn/<filename>", methods=("GET", "POST")) async def cdn(filename): if os.path.exists(os.path.join(UPLOAD_FOLDER, filename)): return await send_from_directory(UPLOAD_FOLDER, filename) else: return "file doesn't exist!!" @app.route("/signup", methods=("GET", "POST")) async def signup(): usersession = request.cookies.get("session_DO_NOT_SHARE") if usersession: return redirect(url_for("main")) if request.method == "POST": requestData = await request.form if not check_username_taken(requestData["username"]) == "error": await flash("Username already taken :3") return redirect(url_for("signup")) if not requestData["username"].isalnum(): await flash("Username must be alphanumeric :3") return redirect(url_for("signup")) if not len(requestData["password"]) > int(PASSWORD_REQUIREMENT): await flash("Password must contain at least " + PASSWORD_REQUIREMENT + " characters") return redirect(url_for("signup")) hashedpassword = generate_password_hash(requestData["password"]) conn = get_db_connection() conn.execute("INSERT INTO users (username, password, created, htmldescription) VALUES (?, ?, ?, ?)", (requestData["username"], hashedpassword, str(time.time()), "")) conn.commit() conn.close() return redirect(url_for("login")) else: return await render_template("signup.html") @app.route("/login", methods=("GET", "POST")) async def login(): usersession = request.cookies.get("session_DO_NOT_SHARE") if usersession: redirect(url_for("main")) if request.method == "POST": requestData = await request.form userID = check_username_taken(requestData["username"]) user = get_user(userID) if user == "error": await flash("Wrong username or password :3") return redirect(url_for("login")) if not check_password_hash(user["password"], (requestData["password"])): await flash("Wrong username or password :3") return redirect(url_for("login")) randomCharacters = secrets.token_hex(512) conn = get_db_connection() conn.execute("INSERT INTO sessions (session, id) VALUES (?, ?)", (randomCharacters, userID)) conn.commit() conn.close() response = Response("""<script>window.location.href = "/";</script>""") response.set_cookie("session_DO_NOT_SHARE", randomCharacters) return response #resp = await make_response(redirect("/")) #resp.set_cookie("session_DO_NOT_SHARE", randomCharacters, expires=datetime.datetime.now() + datetime.timedelta(days=float(180))) #return redirect("/") else: return await render_template("login.html") @app.route("/settings", methods=("GET", "POST")) async def settings(): usersession = request.cookies.get("session_DO_NOT_SHARE") if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) return await render_template("settings.html", userdata=user, createddate=datetime.datetime.utcfromtimestamp(int(str(user["created"]).split(".")[0])).strftime("%Y-%m-%d %H:%m:%S")) else: return redirect("/") @app.route("/api/delete", methods=("GET", "POST")) async def delete(): usersession = request.cookies.get("session_DO_NOT_SHARE") if request.method == "POST": data = await request.get_json() postid = int(data["id"]) post = get_post(postid) if not post == "error": if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) if (str(user["administrator"]) == "1") or (int(user["id"]) == int(post["creator"])): post = get_post(postid) conn = get_db_connection() conn.execute("DELETE FROM posts WHERE id = ?", (postid,)) conn.commit() conn.close() return "success", 200 else: return { "error": "https://http.cat/images/403.jpg" }, 403 else: return { "error": "https://http.cat/images/400.jpg" }, 400 @app.route("/listusers", methods=("GET", "POST")) async def listusers(): usersession = request.cookies.get("session_DO_NOT_SHARE") if usersession: userCookie = get_session(usersession) user = get_user(userCookie["id"]) if str(user["administrator"]) == "1": thing = "" conn = get_db_connection() users = conn.execute("SELECT * FROM users").fetchall() conn.close() for x in users: thing = str(x["id"]) + " - " + x["username"] + "<br>" + thing return thing else: return """<img src="https://http.cat/images/403.jpg">""" else: return redirect(url_for("login")) @app.route("/settings/logout", methods=("GET", "POST")) async def logout(): resp = redirect(url_for("main")) session = request.cookies.get("session_DO_NOT_SHARE") resp.delete_cookie("session_DO_NOT_SHARE") return resp @app.errorhandler(500) async def page_not_found(e): return """<img src="https://http.cat/images/500.jpg">""", 500 @app.errorhandler(400) async def page_not_found(e): return """<img src="https://http.cat/images/400.jpg">""", 400 @app.errorhandler(429) async def page_not_found(e): return """<img src="https://http.cat/images/429.jpg">""", 429 @app.errorhandler(404) async def page_not_found(e): return """<img src="https://http.cat/images/404.jpg">""", 404 @app.errorhandler(413) async def page_not_found(e): return "Images can't be larger than " + str(UPLOAD_LIMIT) + "MB", 413 if __name__ == "__main__": print("[INFO] Server started") hypercornconfig = Config() hypercornconfig.bind = ("0.0.0.0" + ":" + PORT) asyncio.run(serve(app, hypercornconfig)) print("[INFO] Server stopped")