#!/usr/bin/python3
"""Flask image board backed by SQLite: configuration, app setup and DB helpers."""
import configparser
import datetime
import json
import os
import secrets
import sqlite3
import sys
import time
import urllib.parse
from itertools import groupby

from flask import (
    Flask,
    Response,
    flash,
    make_response,
    redirect,
    render_template,
    request,
    send_from_directory,
    session,
    stream_with_context,
    url_for,
)
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from werkzeug.security import check_password_hash, generate_password_hash
from werkzeug.utils import secure_filename

# read config file
config = configparser.ConfigParser()
config.read("config.ini")

HOST = config["config"]["HOST"]
PORT = config["config"]["PORT"]
SECRET_KEY = config["config"]["SECRET_KEY"]
UPLOAD_FOLDER = config["config"]["UPLOAD_FOLDER"]
UPLOAD_LIMIT = config["config"]["UPLOAD_LIMIT"]
PASSWORD_REQUIREMENT = config["config"]["PASSWORD_REQUIREMENT"]

app = Flask(__name__)
app.config["SECRET_KEY"] = SECRET_KEY
# UPLOAD_LIMIT is expressed in megabytes (decimal, 1 MB = 1000 * 1000 bytes).
app.config["MAX_CONTENT_LENGTH"] = int(UPLOAD_LIMIT) * 1000 * 1000

limiter = Limiter(
    get_remote_address,
    app=app,
    default_limits=["3 per second"],
    storage_uri="memory://",
    strategy="fixed-window",
)

if SECRET_KEY == "placeholder":
    print("[WARNING] Secret key is not set")

if not os.path.exists(UPLOAD_FOLDER):
    print("[WARNING] Upload folder doesn't exist, creating")
    # makedirs also creates missing parent directories, which mkdir does not.
    os.makedirs(UPLOAD_FOLDER, exist_ok=True)

if not os.path.exists("database.db"):
    print("[ERROR] No database exists, please run init_db")
    # sys.exit is always available (the bare exit() builtin comes from `site`)
    # and a non-zero status tells the caller that start-up failed.
    sys.exit(1)


def makeStrSafe(url):
    """Percent-encode *url*, but keep spaces readable instead of ``%20``."""
    return str(urllib.parse.quote(url)).replace("%20", " ")


def get_db_connection():
    """Open a new connection to ``database.db`` whose rows support access by column name."""
    conn = sqlite3.connect("database.db")
    conn.row_factory = sqlite3.Row
    return conn


def get_user(id):
    """Return the ``users`` row whose id is *id*, or the string ``"error"`` if none exists."""
    conn = get_db_connection()
    try:
        row = conn.execute("SELECT * FROM users WHERE id = ?", (id,)).fetchone()
    finally:
        # Close even if the query raises so the connection is never leaked.
        conn.close()
    return "error" if row is None else row


def get_comments(id):
    """Return every ``comments`` row attached to post *id* (an empty list if there are none)."""
    conn = get_db_connection()
    try:
        # fetchall() always returns a list, so no "not found" sentinel is needed.
        return conn.execute(
            "SELECT * FROM comments WHERE post_id = ?", (id,)
        ).fetchall()
    finally:
        conn.close()
app.jinja_env.globals.update(getComments=get_comments)


def get_post(id):
    """Return the ``posts`` row whose id is *id*, or the string ``"error"`` if none exists."""
    conn = get_db_connection()
    try:
        row = conn.execute("SELECT * FROM posts WHERE id = ?", (id,)).fetchone()
    finally:
        conn.close()
    return "error" if row is None else row


app.jinja_env.globals.update(getUser=get_user)


def check_username_taken(username):
    """Return the id of the user named *username* (case-insensitive), or ``"error"`` if free."""
    conn = get_db_connection()
    try:
        row = conn.execute(
            "SELECT * FROM users WHERE lower(username) = ?", (username.lower(),)
        ).fetchone()
    finally:
        conn.close()
    return "error" if row is None else row["id"]


def get_session(id):
    """Return the ``sessions`` row for session token *id*, or ``"error"`` if it is unknown."""
    conn = get_db_connection()
    try:
        row = conn.execute(
            "SELECT * FROM sessions WHERE session = ?", (id,)
        ).fetchone()
    finally:
        conn.close()
    return "error" if row is None else row


def _viewer_from_cookie():
    """Return the user row for the request's session cookie, or None.

    None is returned when there is no cookie, when the token is not in the
    ``sessions`` table (stale or forged cookie), or when the session points at
    a user that no longer exists. Callers treat None as "not logged in"
    instead of indexing into the ``"error"`` sentinel and crashing.
    """
    token = request.cookies.get("session_DO_NOT_SHARE")
    if not token:
        return None
    session_row = get_session(token)
    if session_row == "error":
        return None
    viewer = get_user(session_row["id"])
    return None if viewer == "error" else viewer


def _format_created(created):
    """Format a stored creation timestamp (epoch seconds, possibly fractional) as YYYY-MM-DD in UTC."""
    whole_seconds = int(str(created).split(".")[0])
    moment = datetime.datetime.fromtimestamp(whole_seconds, datetime.timezone.utc)
    return moment.strftime("%Y-%m-%d")


# NOTE(review): "svg" files can carry script; they are served from this origin
# by the /cdn route, so confirm that is acceptable.
ALLOWED_EXTENSIONS = {"png", "apng", "jpg", "jpeg", "gif", "svg", "webp"}


def allowed_file(filename):
    """Return True if *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    return "." in filename and \
        filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS


@app.route("/", methods=("GET", "POST"))
def main():
    """Render the front page with all posts, newest first."""
    conn = get_db_connection()
    try:
        posts = conn.execute("SELECT * FROM posts ORDER BY created DESC;").fetchall()
    finally:
        conn.close()

    viewer = _viewer_from_cookie()
    if viewer is None:
        return render_template("main.html", posts=posts)
    return render_template("main.html", userdata=viewer, posts=posts)


# The view takes ``pageusername``, so the rule must declare that URL variable.
@app.route("/@<pageusername>", methods=("GET", "POST"))
def user(pageusername):
    """Render the profile page of *pageusername*; 404 if no such user exists."""
    page_user_id = check_username_taken(pageusername)
    if page_user_id == "error":
        return "", 404

    pageuser = get_user(page_user_id)
    context = {
        "createddate": _format_created(pageuser["created"]),
        "pageuser": pageuser,
    }
    viewer = _viewer_from_cookie()
    if viewer is not None:
        context["userdata"] = viewer
    return render_template("user.html", **context)


@app.route("/@<pageusername>/edit", methods=("GET", "POST"))
def edituser(pageusername):
    """Show (GET) or save (POST) the HTML description of the viewer's own profile.

    Responds 404 when *pageusername* does not exist and 403 when the request
    is unauthenticated or the viewer is not the profile's owner.
    """
    page_user_id = check_username_taken(pageusername)
    if page_user_id == "error":
        return "", 404

    pageuser = get_user(page_user_id)
    viewer = _viewer_from_cookie()
    if viewer is None or pageuser["username"] != viewer["username"]:
        return "", 403

    if request.method != "POST":
        return render_template("edituser.html", userdata=viewer, pageuser=pageuser)

    # NOTE(review): this only strips one literal string from user-supplied HTML;
    # it is not a sanitizer. Verify how the template renders htmldescription.
    code = request.form["code"].replace("Content-Security-Policy", "")
    conn = get_db_connection()
    try:
        conn.execute(
            "UPDATE users SET htmldescription = ? WHERE id = ?",
            (code, viewer["id"]),
        )
        conn.commit()
    finally:
        conn.close()
    return redirect("/@" + viewer["username"])


@app.route("/api/frontpage", methods=("GET", "POST"))
def apifrontpage():
    """Return every post, newest first, with its comments and creator names as JSON."""
    conn = get_db_connection()
    try:
        posts = conn.execute("SELECT * FROM posts ORDER BY created DESC;").fetchall()
    finally:
        conn.close()

    # Remember usernames already looked up so each creator is queried once.
    usernames = {}

    def username_of(user_id):
        if user_id not in usernames:
            usernames[user_id] = get_user(user_id)["username"]
        return usernames[user_id]

    result = []
    for post in posts:
        comments = [
            {
                "title": comment["textstr"],
                "created": comment["created"],
                "creator": {
                    "id": comment["creator"],
                    "username": username_of(comment["creator"]),
                },
            }
            for comment in get_comments(post["id"])
        ]
        result.append({
            "id": post["id"],
            "created": post["created"],
            "title": post["textstr"],
            "imgurl": post["imageurl"],
            "creator": {
                "id": post["creator"],
                "username": username_of(post["creator"]),
            },
            "comments": comments,
        })
    return result


@app.route("/api/userinfo", methods=("GET", "POST"))
def apiuserinfo():
    """Return the logged-in user's username, id and creation time; 403 without a valid session."""
    viewer = _viewer_from_cookie()
    if viewer is None:
        return {"error": "no authentication"}, 403
    return {
        "username": viewer["username"],
        "id": viewer["id"],
        "created": viewer["created"],
    }
def _load_session_user(token):
    """Return the user row for session *token*, or None.

    None covers a missing token, a token absent from the ``sessions`` table
    (stale or forged cookie) and a session whose user no longer exists, so
    views can treat all of them as "not logged in" instead of crashing.
    """
    if not token:
        return None
    session_row = get_session(token)
    if session_row == "error":
        return None
    account = get_user(session_row["id"])
    return None if account == "error" else account


def _run_write(sql, params):
    """Execute one data-modifying statement, commit it, and always close the connection."""
    conn = get_db_connection()
    try:
        conn.execute(sql, params)
        conn.commit()
    finally:
        conn.close()


def _json_body():
    """Return the request body as a dict, or an empty dict if it is absent or not a JSON object."""
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _save_upload(file):
    """Store an uploaded file under a random prefix and return its ``/cdn/`` URL."""
    filename = secure_filename(file.filename)
    finalfilename = secrets.token_hex(64) + filename
    file.save(os.path.join(UPLOAD_FOLDER, finalfilename))
    return "/cdn/" + finalfilename


def _create_post(title, imgurl, creator_id):
    """Insert a post row stamped with the current time."""
    _run_write(
        "INSERT INTO posts (textstr, imageurl, creator, created) VALUES (?, ?, ?, ?)",
        (title, imgurl, creator_id, str(time.time())),
    )


# Same limit as the HTML login form, so the API cannot be used to bypass it.
@app.route("/api/login", methods=("GET", "POST"))
@limiter.limit("10/minute", override_defaults=False)
def apilogin():
    """Exchange a JSON ``username``/``password`` pair for a new session key.

    Returns ``{"key": ...}`` with 200 on success, 401 for bad credentials and
    400 for a non-POST request or a body without both fields.
    """
    if request.method != "POST":
        return {"error": "https://http.cat/images/400.jpg"}, 400

    data = _json_body()
    username = data.get("username")
    password = data.get("password")
    if not isinstance(username, str) or not isinstance(password, str):
        return {"error": "missing username or password"}, 400

    userID = check_username_taken(username)
    user = get_user(userID)
    if user == "error" or not check_password_hash(user["password"], password):
        return {"error": "wrong username or password"}, 401

    randomCharacters = secrets.token_hex(512)
    _run_write(
        "INSERT INTO sessions (session, id) VALUES (?, ?)",
        (randomCharacters, userID),
    )
    # 100 is an interim "Continue" status, not a final response; success is 200.
    return {"key": randomCharacters}, 200


@app.route("/api/post", methods=("GET", "POST"))
def apipost():
    """Create a post from an uploaded ``file`` and a title; requires a valid session.

    Responds 401 without a valid session, 400 for non-POST requests, 403 for
    validation failures or banned accounts, and 200 on success.
    """
    user = _load_session_user(request.cookies.get("session_DO_NOT_SHARE"))
    if user is None:
        return {"error": "https://http.cat/images/401.jpg"}, 401
    if request.method != "POST":
        return {"error": "https://http.cat/images/400.jpg"}, 400

    # A request carrying a file is multipart and cannot also have a JSON body,
    # so fall back to the form fields when no JSON object was sent.
    # NOTE(review): the title is read from the "id" key as before — confirm
    # that this is the documented field name.
    data = _json_body() or request.form
    title = data.get("id", "")
    if title == "":
        return {"error": "no title"}, 403

    file = request.files.get("file")
    if file is None or file.filename == "":
        return {"error": "no file"}, 403
    if not allowed_file(file.filename):
        return {"error": "invalid file format"}, 403

    # Check the ban before writing anything, so banned accounts cannot fill the disk.
    if not user["banned"] == "0":
        return {"error": "banned", "reason": user["banned"]}, 403

    _create_post(title, _save_upload(file), user["id"])
    return "success", 200


@app.route("/apidocs", methods=("GET", "POST"))
def apidocs():
    """Render the API documentation page."""
    user = _load_session_user(request.cookies.get("session_DO_NOT_SHARE"))
    if user is None:
        return render_template("apidocs.html")
    return render_template("apidocs.html", userdata=user)


@app.route("/post", methods=("GET", "POST"))
def post():
    """Show the post form (GET) or create a post from it (POST); login required."""
    user = _load_session_user(request.cookies.get("session_DO_NOT_SHARE"))
    if user is None:
        flash("A burgercat account is required to post :3")
        return redirect(url_for("login"))
    if request.method != "POST":
        return render_template("post.html", userdata=user)

    title = request.form["title"]
    if title == "":
        flash("Text required :3")
        return redirect(url_for("post"))

    file = request.files.get("file")
    if file is None or file.filename == "":
        flash("No file selected :3")
        return redirect(url_for("post"))
    if not allowed_file(file.filename):
        flash("File is not an image!")
        return redirect(url_for("post"))

    # Check the ban before writing anything, so banned accounts cannot fill the disk.
    if not user["banned"] == "0":
        flash("Your account has been banned. Reason: " + user["banned"])
        return redirect(url_for("post"))

    _create_post(title, _save_upload(file), user["id"])
    return redirect(url_for("main"))


@app.route("/api/comment", methods=("GET", "POST"))
@limiter.limit("1/second", override_defaults=False)
def comment():
    """Attach a comment (JSON ``id`` = post id, ``title`` = text) to a post; login required."""
    user = _load_session_user(request.cookies.get("session_DO_NOT_SHARE"))
    if user is None:
        return {"error": "https://http.cat/images/401.jpg"}, 401
    if request.method != "POST":
        return {"error": "https://http.cat/images/400.jpg"}, 400

    data = _json_body()
    if "id" not in data or "title" not in data:
        return {"error": "https://http.cat/images/400.jpg"}, 400

    if not user["banned"] == "0":
        return {"error": "banned", "reason": user["banned"]}, 403

    _run_write(
        "INSERT INTO comments (textstr, post_id, creator, created) VALUES (?, ?, ?, ?)",
        (data["title"], data["id"], user["id"], str(time.time())),
    )
    # 100 is an interim "Continue" status, not a final response; success is 200.
    return "success", 200


# The view takes ``filename``, so the rule must declare that URL variable.
@app.route("/cdn/<filename>", methods=("GET", "POST"))
def cdn(filename):
    """Serve an uploaded file from UPLOAD_FOLDER, or 404 if it does not exist."""
    if os.path.exists(os.path.join(UPLOAD_FOLDER, filename)):
        return send_from_directory(UPLOAD_FOLDER, filename)
    return "file doesn't exist!!", 404


@app.route("/signup", methods=("GET", "POST"))
@limiter.limit("5/minute", override_defaults=False)
def signup():
    """Show the sign-up form (GET) or create an account from it (POST)."""
    # Only a *valid* session skips the form; a stale cookie must not lock the user out.
    if _load_session_user(request.cookies.get("session_DO_NOT_SHARE")) is not None:
        return redirect(url_for("main"))
    if request.method != "POST":
        return render_template("signup.html")

    username = request.form["username"]
    password = request.form["password"]

    if check_username_taken(username) != "error":
        flash("Username already taken :3")
        return redirect(url_for("signup"))
    if not username.isalnum():
        flash("Username must be alphanumeric :3")
        return redirect(url_for("signup"))
    # "at least N characters" means a password of exactly N characters is accepted.
    if len(password) < int(PASSWORD_REQUIREMENT):
        flash("Password must contain at least " + PASSWORD_REQUIREMENT + " characters")
        return redirect(url_for("signup"))

    _run_write(
        "INSERT INTO users (username, password, created, htmldescription) VALUES (?, ?, ?, ?)",
        (username, generate_password_hash(password), str(time.time()), ""),
    )
    return redirect(url_for("login"))


@app.route("/login", methods=("GET", "POST"))
@limiter.limit("10/minute", override_defaults=False)
def login():
    """Show the login form (GET) or start a session and set its cookie (POST)."""
    # Only a *valid* session skips the form; a stale cookie must not lock the user out.
    if _load_session_user(request.cookies.get("session_DO_NOT_SHARE")) is not None:
        return redirect(url_for("main"))
    if request.method != "POST":
        return render_template("login.html")

    userID = check_username_taken(request.form["username"])
    user = get_user(userID)
    if user == "error" or not check_password_hash(
        user["password"], request.form["password"]
    ):
        flash("Wrong username or password :3")
        return redirect(url_for("login"))

    randomCharacters = secrets.token_hex(512)
    _run_write(
        "INSERT INTO sessions (session, id) VALUES (?, ?)",
        (randomCharacters, userID),
    )

    resp = make_response(redirect("/"))
    resp.set_cookie(
        "session_DO_NOT_SHARE",
        randomCharacters,
        expires=datetime.datetime.now(datetime.timezone.utc)
        + datetime.timedelta(days=180),
    )
    return resp


@app.route("/settings", methods=("GET", "POST"))
def settings():
    """Render the settings page; redirect to the front page without a valid session."""
    user = _load_session_user(request.cookies.get("session_DO_NOT_SHARE"))
    if user is None:
        return redirect("/")
    return render_template("settings.html", userdata=user)


# The view takes ``postid``, so the rule must declare that URL variable.
# NOTE(review): this deletes on GET as well as POST — confirm that is intended.
@app.route("/remove/<postid>", methods=("GET", "POST"))
def remove(postid):
    """Delete post *postid*; administrators only."""
    user = _load_session_user(request.cookies.get("session_DO_NOT_SHARE"))
    if user is None:
        return redirect(url_for("login"))
    if str(user["administrator"]) != "1":
        return "nice try"

    _run_write("DELETE FROM posts WHERE id = ?", (postid,))
    return "Deleted post!"


@app.route("/listusers", methods=("GET", "POST"))
def listusers():
    """Return an ``id - username`` listing of all users, last row first; administrators only."""
    user = _load_session_user(request.cookies.get("session_DO_NOT_SHARE"))
    if user is None:
        return redirect(url_for("login"))
    if str(user["administrator"]) != "1":
        return ""

    conn = get_db_connection()
    try:
        users = conn.execute("SELECT * FROM users").fetchall()
    finally:
        conn.close()
    # The response is served as HTML, so entries are separated with <br>.
    return "".join(
        str(row["id"]) + " - " + row["username"] + "<br>"
        for row in reversed(users)
    )


@app.route("/settings/logout", methods=("GET", "POST"))
def logout():
    """Invalidate the current session server-side and clear its cookie."""
    resp = redirect(url_for("main"))
    token = request.cookies.get("session_DO_NOT_SHARE")
    if token:
        # Remove the row too, otherwise the token stays usable after logout.
        _run_write("DELETE FROM sessions WHERE session = ?", (token,))
    resp.delete_cookie("session_DO_NOT_SHARE")
    return resp


@app.errorhandler(500)
def internal_server_error(e):
    """Return an empty body for unhandled server errors."""
    return "", 500


@app.errorhandler(400)
def bad_request(e):
    """Return an empty body for malformed requests."""
    return "", 400


@app.errorhandler(429)
def too_many_requests(e):
    """Return an empty body when a rate limit is exceeded."""
    return "", 429


@app.errorhandler(404)
def page_not_found(e):
    """Return an empty body for unknown URLs."""
    return "", 404


@app.errorhandler(413)
def payload_too_large(e):
    """Tell the client the configured upload size limit."""
    return "Images can't be larger than " + str(UPLOAD_LIMIT) + "MB", 413


if __name__ == "__main__":
    from waitress import serve

    print("[INFO] Server started")
    serve(app, host=HOST, port=PORT)
    # app.run(host=HOST, port=PORT, debug=True)
    print("[INFO] Server stopped")