#!/usr/bin/python3
"""burgercat: a small Flask image board backed by sqlite3.

Settings are read from ``config.ini`` (section ``[config]``), data lives in
``database.db`` and uploaded images are stored in ``UPLOAD_FOLDER`` and served
under ``/cdn/<filename>``.  Logins are tracked with a random token stored in
the ``sessions`` table and mirrored in the ``session_DO_NOT_SHARE`` cookie.
"""
import configparser
import json
import os
import secrets
import sqlite3
import sys
import urllib.parse
from contextlib import closing

from flask import (
    Flask,
    Response,
    flash,
    make_response,
    redirect,
    render_template,
    request,
    send_from_directory,
    session,
    stream_with_context,
    url_for,
)
from werkzeug.security import check_password_hash, generate_password_hash
from werkzeug.utils import secure_filename

# read config file
config = configparser.ConfigParser()
config.read("config.ini")

HOST = config["config"]["HOST"]
PORT = config["config"]["PORT"]
SECRET_KEY = config["config"]["SECRET_KEY"]
UPLOAD_FOLDER = config["config"]["UPLOAD_FOLDER"]
UPLOAD_LIMIT = config["config"]["UPLOAD_LIMIT"]
PASSWORD_REQUIREMENT = config["config"]["PASSWORD_REQUIREMENT"]

# Name of the cookie that carries the login token.
SESSION_COOKIE = "session_DO_NOT_SHARE"

app = Flask(__name__)
app.config["SECRET_KEY"] = SECRET_KEY
# UPLOAD_LIMIT is expressed in (decimal) megabytes.
app.config["MAX_CONTENT_LENGTH"] = int(UPLOAD_LIMIT) * 1000 * 1000

if SECRET_KEY == "placeholder":
    print("[WARNING] Secret key is not set")

if not os.path.exists(UPLOAD_FOLDER):
    print("[WARNING] Upload folder doesn't exist, creating one")
    os.mkdir(UPLOAD_FOLDER)

if not os.path.exists("database.db"):
    print("[ERROR] No database exists, please run init_db")
    # sys.exit instead of the interactive-only exit() helper; non-zero
    # status because this is a fatal start-up error.
    sys.exit(1)


def makeStrSafe(url):
    """Percent-encode *url* but keep spaces readable."""
    return str(urllib.parse.quote(url)).replace("%20", " ")


def get_db_connection():
    """Open a new connection to ``database.db`` with dict-like rows."""
    conn = sqlite3.connect("database.db")
    conn.row_factory = sqlite3.Row
    return conn


def _fetch_one(query, params=()):
    """Run *query* and return its first row (or None), always closing the connection."""
    with closing(get_db_connection()) as conn:
        return conn.execute(query, params).fetchone()


def _execute(query, params=()):
    """Run a write *query*, commit it, and always close the connection."""
    with closing(get_db_connection()) as conn:
        conn.execute(query, params)
        conn.commit()


def get_user(id):
    """Return the ``users`` row with the given id, or the string "error"."""
    row = _fetch_one("SELECT * FROM users WHERE id = ?", (id,))
    return "error" if row is None else row


def get_post(id):
    """Return the ``posts`` row with the given id, or the string "error"."""
    row = _fetch_one("SELECT * FROM posts WHERE id = ?", (id,))
    return "error" if row is None else row


# Templates look users up through getUser(...).
app.jinja_env.globals.update(getUser=get_user)


def check_username_taken(username):
    """Return the id of the user called *username*, or "error" if the name is free."""
    row = _fetch_one("SELECT * FROM users WHERE username = ?", (username,))
    return "error" if row is None else row["id"]


def get_session(id):
    """Return the ``sessions`` row whose token equals *id*, or "error"."""
    row = _fetch_one("SELECT * FROM sessions WHERE session = ?", (id,))
    return "error" if row is None else row


def _current_user():
    """Return the row of the user owning the request's session cookie, or None.

    A missing cookie, an unknown token and a token pointing at a user that no
    longer exists are all treated as "not logged in" rather than crashing on
    the "error" sentinel.
    """
    token = request.cookies.get(SESSION_COOKIE)
    if not token:
        return None
    session_row = get_session(token)
    if session_row == "error":
        return None
    user = get_user(session_row["id"])
    if user == "error":
        return None
    return user


# NOTE(review): "svg" files can carry scripts and are served from this origin
# by /cdn/ - consider dropping it or serving uploads with a restrictive CSP.
ALLOWED_EXTENSIONS = {"png", "apng", "jpg", "jpeg", "gif", "svg", "webp"}


def allowed_file(filename):
    """Return True if *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    return "." in filename and \
        filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS


@app.route("/", methods=("GET", "POST"))
def main():
    """Front page: every post, newest first."""
    with closing(get_db_connection()) as conn:
        posts = conn.execute(
            "SELECT * FROM posts ORDER BY created DESC;").fetchall()

    user = _current_user()
    if user is None:
        return render_template("main.html", posts=posts)
    return render_template("main.html", userdata=user, posts=posts)


@app.route("/post", methods=("GET", "POST"))
def post():
    """Show the post form (GET) or create a post with an image (POST)."""
    user = _current_user()
    if user is None:
        flash("A burgercat account is required to post :3")
        return redirect("/login")

    if request.method != "POST":
        return render_template("post.html", userdata=user)

    # Check the ban before touching the disk so banned accounts cannot
    # fill the upload folder.
    if str(user["banned"]) != "0":
        flash("Your account has been banned. Reason: " + str(user["banned"]))
        return redirect(request.url)

    title = request.form["title"]
    if title == "":
        flash("Text required :3")
        return redirect(request.url)

    upload = request.files.get("file")
    if upload is None or upload.filename == "":
        flash("No file selected :3")
        return redirect(request.url)

    if not allowed_file(upload.filename):
        flash("File is not an image!")
        return redirect(request.url)

    # Store under a random name plus the validated extension: this keeps the
    # name short enough for any filesystem and always keeps the ".ext" suffix
    # that the served content type is guessed from.
    extension = upload.filename.rsplit(".", 1)[1].lower()
    stored_name = f"{secrets.token_hex(64)}.{extension}"
    upload.save(os.path.join(UPLOAD_FOLDER, stored_name))

    _execute(
        "INSERT INTO posts (textstr, imageurl, creator) VALUES (?, ?, ?)",
        (title, "/cdn/" + stored_name, user["id"]),
    )
    return redirect("/")


@app.route("/cdn/<filename>", methods=("GET", "POST"))
def cdn(filename):
    """Serve an uploaded image from UPLOAD_FOLDER."""
    if os.path.isfile(os.path.join(UPLOAD_FOLDER, filename)):
        return send_from_directory(UPLOAD_FOLDER, filename)
    return "file doesn't exist!!", 404


@app.route("/signup", methods=("GET", "POST"))
def signup():
    """Show the sign-up form (GET) or create an account (POST)."""
    if _current_user() is not None:
        return redirect("/")

    if request.method != "POST":
        return render_template("signup.html")

    username = request.form["username"]
    password = request.form["password"]

    if check_username_taken(username) != "error":
        flash("Username already taken :3")
        return redirect(request.url)

    if not username.isalnum():
        flash("Username must be alphanumeric :3")
        return redirect(request.url)

    # "at least N characters" means a password of exactly N is acceptable.
    if len(password) < int(PASSWORD_REQUIREMENT):
        flash("Password must contain at least " + PASSWORD_REQUIREMENT + " characters")
        return redirect(request.url)

    _execute(
        "INSERT INTO users (username, password) VALUES (?, ?)",
        (username, generate_password_hash(password)),
    )
    return redirect("/login")


@app.route("/login", methods=("GET", "POST"))
def login():
    """Show the login form (GET) or start a session (POST)."""
    # Only a *valid* session skips the form; a stale cookie must not lock
    # the visitor out of logging in again.
    if _current_user() is not None:
        return redirect("/")

    if request.method != "POST":
        return render_template("login.html")

    user_id = check_username_taken(request.form["username"])
    user = "error" if user_id == "error" else get_user(user_id)

    if user == "error" or not check_password_hash(
            user["password"], request.form["password"]):
        flash("Wrong username or password :3")
        return redirect(request.url)

    token = secrets.token_hex(512)
    _execute("INSERT INTO sessions (session, id) VALUES (?, ?)", (token, user_id))

    resp = make_response(redirect("/"))
    # HttpOnly keeps the token away from page scripts; SameSite=Lax stops it
    # from being sent on cross-site sub-requests.
    resp.set_cookie(SESSION_COOKIE, token, httponly=True, samesite="Lax")
    return resp


@app.route("/settings", methods=("GET", "POST"))
def settings():
    """Render the settings page for the logged-in user."""
    user = _current_user()
    if user is None:
        return redirect("/")
    return render_template("settings.html", userdata=user)


# NOTE(review): this deletes on GET, so it can be triggered by a link on
# another site; consider restricting it to POST with a CSRF token.
@app.route("/remove/<postid>", methods=("GET", "POST"))
def remove(postid):
    """Delete the post *postid*; administrators only."""
    user = _current_user()
    if user is None:
        return redirect("/login")

    if str(user["administrator"]) != "1":
        return "nice try"

    _execute("DELETE FROM posts WHERE id = ?", (postid,))
    return "Deleted post!"


@app.route("/settings/logout", methods=("GET", "POST"))
def logout():
    """End the current session on the server and clear the cookie."""
    token = request.cookies.get(SESSION_COOKIE)
    if token:
        # Invalidate the token server-side too, otherwise a copied cookie
        # would keep working after logout.
        _execute("DELETE FROM sessions WHERE session = ?", (token,))

    resp = redirect("/")
    resp.delete_cookie(SESSION_COOKIE)
    return resp


# NOTE(review): the original registered this message for 413 as well, where it
# was shadowed by the handler below; the wording indicates it was meant for
# internal server errors - confirm.
@app.errorhandler(500)
def internal_server_error(e):
    """Plain-text response for unhandled server errors."""
    return "the server decided to commit die", 500


@app.errorhandler(413)
def payload_too_large(e):
    """Plain-text response when an upload exceeds MAX_CONTENT_LENGTH."""
    return f"Images can't be larger than {UPLOAD_LIMIT}MB", 413


if __name__ == "__main__":
    from waitress import serve

    print("[INFO] Server started")
    serve(app, host=HOST, port=PORT)
    print("[INFO] Server stopped")