This repository has been archived on 2024-10-20. You can view files and clone it, but cannot push or open issues or pull requests.
burgercat/main

297 lines
8.7 KiB
Python

#!/usr/bin/python3
import os
import configparser
import sqlite3
import json
import secrets
from werkzeug.utils import secure_filename
from werkzeug.security import generate_password_hash, check_password_hash
from flask import Flask, render_template, request, url_for, flash, redirect, session, make_response, send_from_directory, stream_with_context, Response, request
# Load deployment settings from the [config] section of config.ini.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open, so verify that the
# file was actually loaded instead of failing below with an opaque KeyError.
if not config.read("config.ini"):
    print("[ERROR] Could not read config.ini")
    raise SystemExit(1)

HOST = config["config"]["HOST"]
PORT = config["config"]["PORT"]
SECRET_KEY = config["config"]["SECRET_KEY"]
UPLOAD_FOLDER = config["config"]["UPLOAD_FOLDER"]
UPLOAD_LIMIT = config["config"]["UPLOAD_LIMIT"]
PASSWORD_REQUIREMENT = config["config"]["PASSWORD_REQUIREMENT"]

app = Flask(__name__)
app.config["SECRET_KEY"] = SECRET_KEY
# UPLOAD_LIMIT is scaled by 10^6, i.e. it is interpreted as megabytes.
app.config["MAX_CONTENT_LENGTH"] = int(UPLOAD_LIMIT) * 1000 * 1000

if SECRET_KEY == "placeholder":
    print("[WARNING] Secret key is not set")

if not os.path.exists(UPLOAD_FOLDER):
    print("[WARNING] Upload folder doesn't exist, creating one")
    # makedirs also creates missing parent directories and tolerates the
    # folder appearing between the check above and this call.
    os.makedirs(UPLOAD_FOLDER, exist_ok=True)

if not os.path.exists("database.db"):
    print("[ERROR] No database exists, please run init_db")
    # Exit with a non-zero status so supervisors and scripts see the failure.
    raise SystemExit(1)
def makeStrSafe(url):
    """Percent-encode ``url`` but keep spaces as literal spaces.

    Args:
        url: Text to encode.

    Returns:
        The encoded string; every character ``urllib.parse.quote`` escapes is
        escaped, except that ``%20`` is turned back into a space.
    """
    # Imported locally: the module-level import block does not import
    # urllib.parse, which made the previous body fail with NameError.
    import urllib.parse

    return urllib.parse.quote(url).replace("%20", " ")
def get_db_connection():
    """Open a new SQLite connection to ``database.db``.

    Returns:
        A ``sqlite3.Connection`` whose rows are produced as ``sqlite3.Row``
        objects, so columns can be read by name. The caller is responsible
        for closing it.
    """
    connection = sqlite3.connect("database.db")
    connection.row_factory = sqlite3.Row
    return connection
def get_user(id):
    """Fetch the ``users`` row whose ``id`` column equals ``id``.

    Args:
        id: Value matched against ``users.id``. (The parameter name shadows
            the builtin but is kept so existing callers keep working.)

    Returns:
        The matching ``sqlite3.Row``, or the string ``"error"`` when no row
        matches.
    """
    conn = get_db_connection()
    try:
        user = conn.execute("SELECT * FROM users WHERE id = ?",
                            (id,)).fetchone()
    finally:
        # Release the connection even when the query raises.
        conn.close()
    return "error" if user is None else user
def get_post(id):
    """Fetch the ``posts`` row whose ``id`` column equals ``id``.

    Args:
        id: Value matched against ``posts.id``. (The parameter name shadows
            the builtin but is kept so existing callers keep working.)

    Returns:
        The matching ``sqlite3.Row``, or the string ``"error"`` when no row
        matches.
    """
    conn = get_db_connection()
    try:
        post = conn.execute("SELECT * FROM posts WHERE id = ?",
                            (id,)).fetchone()
    finally:
        # Release the connection even when the query raises.
        conn.close()
    return "error" if post is None else post
# Expose get_user to every Jinja template under the name ``getUser``.
app.jinja_env.globals["getUser"] = get_user
def check_username_taken(username):
    """Look up the id of the account registered under ``username``.

    Args:
        username: Value matched against ``users.username``.

    Returns:
        The ``id`` column of the matching row, or the string ``"error"`` when
        no account uses that name (i.e. the name is still free).
    """
    conn = get_db_connection()
    try:
        user = conn.execute("SELECT * FROM users WHERE username = ?",
                            (username,)).fetchone()
    finally:
        # Release the connection even when the query raises.
        conn.close()
    return "error" if user is None else user["id"]
def get_session(id):
    """Fetch the ``sessions`` row whose ``session`` column equals ``id``.

    Args:
        id: Session token, as stored in the ``session`` column. (The
            parameter name shadows the builtin but is kept so existing
            callers keep working.)

    Returns:
        The matching ``sqlite3.Row``, or the string ``"error"`` when the
        token is unknown.
    """
    conn = get_db_connection()
    try:
        session_row = conn.execute("SELECT * FROM sessions WHERE session = ?",
                                   (id,)).fetchone()
    finally:
        # Release the connection even when the query raises.
        conn.close()
    return "error" if session_row is None else session_row
# Lower-case file extensions (without the dot) accepted for uploads.
ALLOWED_EXTENSIONS = {"png", "apng", "jpg", "jpeg", "gif", "svg"}


def allowed_file(filename):
    """Report whether ``filename`` ends in an accepted extension.

    The extension is whatever follows the last ``.`` and is compared
    case-insensitively against ``ALLOWED_EXTENSIONS``. Names without any
    dot are rejected.
    """
    _, dot, extension = filename.rpartition(".")
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
@app.route("/", methods=("GET", "POST"))
def main():
    """Render the front page listing all posts, newest first.

    When the request carries a valid ``session_DO_NOT_SHARE`` cookie the
    matching user row is passed to the template as ``userdata``. A cookie
    that no longer maps to a session or user is treated as logged-out and
    cleared, instead of crashing the request.
    """
    conn = get_db_connection()
    try:
        posts = conn.execute(
            "SELECT * FROM posts ORDER BY created DESC;").fetchall()
    finally:
        conn.close()

    usersession = request.cookies.get("session_DO_NOT_SHARE")
    if not usersession:
        return render_template("main.html", posts=posts)

    # get_session/get_user signal "not found" with the string "error".
    userCookie = get_session(usersession)
    user = "error" if userCookie == "error" else get_user(userCookie["id"])
    if user == "error":
        # Stale cookie: show the anonymous page and drop the cookie so the
        # login and signup pages stop treating the visitor as logged in.
        resp = make_response(render_template("main.html", posts=posts))
        resp.delete_cookie("session_DO_NOT_SHARE")
        return resp

    return render_template("main.html", userdata=user, posts=posts)
@app.route("/post", methods=("GET", "POST"))
def post():
    """Show the new-post form (GET) or create a post with an image (POST).

    Requires a valid ``session_DO_NOT_SHARE`` cookie; visitors without one
    are sent to ``/login``. On POST the form must contain a non-empty
    ``title`` and a ``file`` with an allowed image extension. The upload is
    stored in ``UPLOAD_FOLDER`` under a random prefix and a row is inserted
    into ``posts``. Validation failures flash a message and redirect back
    to the form.
    """
    usersession = request.cookies.get("session_DO_NOT_SHARE")
    if not usersession:
        flash("A burgercat account is required to post :3")
        return redirect("/login")

    # get_session/get_user signal "not found" with the string "error".
    userCookie = get_session(usersession)
    user = "error" if userCookie == "error" else get_user(userCookie["id"])
    if user == "error":
        # Stale cookie: clear it, otherwise /login would bounce the visitor
        # straight back to the front page.
        flash("A burgercat account is required to post :3")
        resp = redirect("/login")
        resp.delete_cookie("session_DO_NOT_SHARE")
        return resp

    if request.method != "POST":
        return render_template("post.html", userdata=user)

    # Check the ban before touching the upload so banned accounts cannot
    # write files into the upload folder.
    if not user["banned"] == "0":
        flash("Your account has been banned. Reason: " +
              user["banned"])
        return redirect(request.url)

    title = request.form["title"]
    if title == "":
        flash("Text required :3")
        return redirect(request.url)

    if "file" not in request.files:
        flash("No file selected :3")
        return redirect(request.url)

    file = request.files["file"]
    if file.filename == "":
        flash("No file selected :3")
        return redirect(request.url)

    if not allowed_file(file.filename):
        flash("File is not an image!")
        return redirect(request.url)

    # Prefix the sanitized name with 64 random bytes (hex-encoded) so that
    # uploads never collide and stored names cannot be guessed.
    filename = secure_filename(file.filename)
    finalfilename = secrets.token_hex(64) + filename
    file.save(os.path.join(UPLOAD_FOLDER, finalfilename))
    imgurl = "/cdn/" + finalfilename

    conn = get_db_connection()
    try:
        conn.execute("INSERT INTO posts (textstr, imageurl, creator) VALUES (?, ?, ?)",
                     (title, imgurl, userCookie["id"]))
        conn.commit()
    finally:
        conn.close()
    return redirect("/")
@app.route("/cdn/<filename>", methods=("GET", "POST"))
def cdn(filename):
    """Serve an uploaded file from ``UPLOAD_FOLDER``.

    Args:
        filename: Name of the stored upload, taken from the URL.

    Returns:
        The file contents, or a plain-text message with HTTP 404 when no
        regular file of that name exists.
    """
    # isfile (rather than exists) also rejects directory names such as "..".
    if not os.path.isfile(os.path.join(UPLOAD_FOLDER, filename)):
        # Report the miss with a real 404 so browsers and caches do not treat
        # the message as the image.
        return "file doesn't exist!!", 404
    return send_from_directory(UPLOAD_FOLDER, filename)
@app.route("/signup", methods=("GET", "POST"))
def signup():
    """Show the signup form (GET) or register a new account (POST).

    Visitors holding a valid session are redirected to ``/``. On POST the
    ``username`` must be unused and alphanumeric and the ``password`` must
    be at least ``PASSWORD_REQUIREMENT`` characters long; the password is
    stored hashed. Validation failures flash a message and redirect back
    to the form; success redirects to ``/login``.
    """
    usersession = request.cookies.get("session_DO_NOT_SHARE")
    # Only a cookie that maps to a stored session counts as logged in; a
    # stale cookie must not lock the visitor out of signing up.
    if usersession and get_session(usersession) != "error":
        return redirect("/")

    if request.method != "POST":
        return render_template("signup.html")

    username = request.form["username"]
    password = request.form["password"]

    # check_username_taken returns "error" when the name is still free.
    if not check_username_taken(username) == "error":
        flash("Username already taken :3")
        return redirect(request.url)

    if not username.isalnum():
        flash("Username must be alphanumeric :3")
        return redirect(request.url)

    # "<" so that a password of exactly PASSWORD_REQUIREMENT characters is
    # accepted, matching the "at least" wording of the message.
    if len(password) < int(PASSWORD_REQUIREMENT):
        flash("Password must contain at least " + PASSWORD_REQUIREMENT + " characters")
        return redirect(request.url)

    hashedpassword = generate_password_hash(password)
    conn = get_db_connection()
    try:
        conn.execute("INSERT INTO users (username, password) VALUES (?, ?)",
                     (username, hashedpassword))
        conn.commit()
    finally:
        conn.close()
    return redirect("/login")
@app.route("/login", methods=("GET", "POST"))
def login():
    """Show the login form (GET) or authenticate and start a session (POST).

    Visitors holding a valid session are redirected to ``/``. On POST the
    submitted ``username``/``password`` are checked against the stored hash;
    on success a random token is stored in ``sessions`` and sent back in the
    ``session_DO_NOT_SHARE`` cookie. Failures flash one generic message.
    """
    usersession = request.cookies.get("session_DO_NOT_SHARE")
    # Only a cookie that maps to a stored session counts as logged in; a
    # stale cookie must not lock the visitor out of logging in again.
    if usersession and get_session(usersession) != "error":
        return redirect("/")

    if request.method != "POST":
        return render_template("login.html")

    # Both lookups signal "not found" with the string "error".
    userID = check_username_taken(request.form["username"])
    user = "error" if userID == "error" else get_user(userID)
    # Same message for unknown user and wrong password, so the response does
    # not reveal which usernames exist.
    if user == "error" or not check_password_hash(user["password"],
                                                  request.form["password"]):
        flash("Wrong username or password :3")
        return redirect(request.url)

    randomCharacters = secrets.token_hex(512)
    conn = get_db_connection()
    try:
        conn.execute("INSERT INTO sessions (session, id) VALUES (?, ?)",
                     (randomCharacters, userID))
        conn.commit()
    finally:
        conn.close()

    resp = make_response(redirect("/"))
    # HttpOnly keeps the token away from page scripts; SameSite=Lax stops it
    # from being sent on cross-site POSTs.
    resp.set_cookie("session_DO_NOT_SHARE", randomCharacters,
                    httponly=True, samesite="Lax")
    return resp
@app.route("/settings", methods=("GET", "POST"))
def settings():
    """Render the settings page for the logged-in user.

    Visitors without a ``session_DO_NOT_SHARE`` cookie, or with one that no
    longer maps to a session and user, are redirected to ``/``.
    """
    usersession = request.cookies.get("session_DO_NOT_SHARE")
    if not usersession:
        return redirect("/")

    # get_session/get_user signal "not found" with the string "error".
    userCookie = get_session(usersession)
    user = "error" if userCookie == "error" else get_user(userCookie["id"])
    if user == "error":
        # Stale cookie: drop it rather than crash on the failed lookup.
        resp = redirect("/")
        resp.delete_cookie("session_DO_NOT_SHARE")
        return resp

    return render_template("settings.html", userdata=user)
@app.route("/remove/<postid>", methods=("GET", "POST"))
def remove(postid):
    """Delete the post with id ``postid``; administrators only.

    Args:
        postid: Id of the post to delete, taken from the URL.

    Returns:
        ``"Deleted post!"`` on success, a 403 for non-administrators, a 404
        when the post does not exist, or a redirect to ``/login`` when the
        visitor has no valid session.
    """
    # NOTE(review): deletion is reachable via GET, so a link or image tag on
    # another site can trigger it for a logged-in admin. Consider POST-only
    # plus a CSRF token once the templates are updated.
    usersession = request.cookies.get("session_DO_NOT_SHARE")
    if not usersession:
        return redirect("/login")

    # get_session/get_user signal "not found" with the string "error".
    userCookie = get_session(usersession)
    user = "error" if userCookie == "error" else get_user(userCookie["id"])
    if user == "error":
        # Stale cookie: clear it so /login shows the form again.
        resp = redirect("/login")
        resp.delete_cookie("session_DO_NOT_SHARE")
        return resp

    if str(user["administrator"]) != "1":
        return "nice try", 403

    # Do not claim success for a post that was never there.
    if get_post(postid) == "error":
        return "Post doesn't exist!", 404

    conn = get_db_connection()
    try:
        conn.execute("DELETE FROM posts WHERE id = ?", (postid,))
        conn.commit()
    finally:
        conn.close()
    return "Deleted post!"
@app.route("/settings/logout", methods=("GET", "POST"))
def logout():
    """Log the visitor out and redirect to ``/``.

    Removes the session token from the ``sessions`` table, so a copied
    cookie stops working, and deletes the ``session_DO_NOT_SHARE`` cookie
    from the browser.
    """
    resp = redirect("/")
    usersession = request.cookies.get("session_DO_NOT_SHARE")
    if usersession:
        conn = get_db_connection()
        try:
            conn.execute("DELETE FROM sessions WHERE session = ?",
                         (usersession,))
            conn.commit()
        finally:
            conn.close()
    resp.delete_cookie("session_DO_NOT_SHARE")
    return resp
@app.errorhandler(500)
def internal_server_error(e):
    """Answer unhandled server errors with a short plain-text message."""
    # Plain-text body: this module defines no error-page helper to render.
    return "the server decided to commit die", 500


@app.errorhandler(413)
def request_entity_too_large(e):
    """Answer uploads that exceed ``MAX_CONTENT_LENGTH``."""
    # Report the configured limit instead of a hard-coded size, so the
    # message stays correct when UPLOAD_LIMIT changes.
    return "Images can't be larger than " + UPLOAD_LIMIT + "MB", 413


# Backward-compatible alias: the 413 handler was previously bound to this name.
page_not_found = request_entity_too_large
if __name__ == "__main__":
    # waitress is only needed when this file is run as a script, so it is
    # imported inside the guard rather than at module import time.
    import waitress

    print("[INFO] Server started")
    # serve() blocks until the server shuts down.
    waitress.serve(app, host=HOST, port=PORT)
    print("[INFO] Server stopped")