2023-07-08 17:00:32 +01:00
|
|
|
#!/usr/bin/python3
|
|
|
|
import os
|
|
|
|
import configparser
|
|
|
|
import sqlite3
|
|
|
|
import time
|
|
|
|
import json
|
|
|
|
import secrets
|
|
|
|
from itertools import groupby
|
|
|
|
from werkzeug.utils import secure_filename
|
|
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
|
|
from flask import Flask, render_template, request, url_for, flash, redirect, session, make_response, send_from_directory, stream_with_context, Response, request
|
|
|
|
|
|
|
|
# read config file
|
|
|
|
config = configparser.ConfigParser()
|
|
|
|
config.read("config.ini")
|
|
|
|
|
|
|
|
HOST = config["config"]["HOST"]
|
|
|
|
PORT = config["config"]["PORT"]
|
|
|
|
SECRET_KEY = config["config"]["SECRET_KEY"]
|
|
|
|
UPLOAD_FOLDER = config["config"]["UPLOAD_FOLDER"]
|
|
|
|
UPLOAD_LIMIT = config["config"]["UPLOAD_LIMIT"]
|
|
|
|
PASSWORD_REQUIREMENT = config["config"]["PASSWORD_REQUIREMENT"]
|
|
|
|
|
|
|
|
app = Flask(__name__)
|
|
|
|
app.config["SECRET_KEY"] = SECRET_KEY
|
|
|
|
app.config["MAX_CONTENT_LENGTH"] = int(UPLOAD_LIMIT) * 1000 * 1000
|
|
|
|
|
|
|
|
if SECRET_KEY == "placeholder":
|
|
|
|
print("[WARNING] Secret key is not set")
|
|
|
|
|
|
|
|
if not os.path.exists(UPLOAD_FOLDER):
|
|
|
|
print("[WARNING] Upload folder doesn't exist, creating")
|
|
|
|
os.mkdir(UPLOAD_FOLDER)
|
|
|
|
|
|
|
|
if not os.path.exists("database.db"):
|
|
|
|
print("[ERROR] No database exists, please run init_db")
|
|
|
|
exit()
|
|
|
|
|
|
|
|
|
|
|
|
def makeStrSafe(url):
|
|
|
|
return str(urllib.parse.quote(url)).replace("%20", " ")
|
|
|
|
|
|
|
|
|
|
|
|
def get_db_connection():
|
|
|
|
conn = sqlite3.connect("database.db")
|
|
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
return conn
|
|
|
|
|
|
|
|
|
|
|
|
def get_user(id):
|
|
|
|
conn = get_db_connection()
|
|
|
|
post = conn.execute("SELECT * FROM users WHERE id = ?",
|
|
|
|
(id,)).fetchone()
|
|
|
|
conn.close()
|
|
|
|
if post is None:
|
|
|
|
return "error"
|
|
|
|
return post
|
|
|
|
|
|
|
|
|
|
|
|
def get_comments(id):
|
|
|
|
conn = get_db_connection()
|
|
|
|
post = conn.execute("SELECT * FROM comments WHERE post_id = ?",
|
|
|
|
(id,)).fetchall()
|
|
|
|
conn.close()
|
|
|
|
if post is None:
|
|
|
|
return "error"
|
|
|
|
return post
|
|
|
|
|
|
|
|
|
|
|
|
app.jinja_env.globals.update(getComments=get_comments)
|
|
|
|
|
|
|
|
def get_post(id):
|
|
|
|
conn = get_db_connection()
|
|
|
|
post = conn.execute("SELECT * FROM posts WHERE id = ?",
|
|
|
|
(id,)).fetchone()
|
|
|
|
conn.close()
|
|
|
|
if post is None:
|
|
|
|
return "error"
|
|
|
|
return post
|
|
|
|
|
|
|
|
|
|
|
|
app.jinja_env.globals.update(getUser=get_user)
|
|
|
|
|
|
|
|
|
|
|
|
def check_username_taken(username):
|
|
|
|
conn = get_db_connection()
|
|
|
|
post = conn.execute("SELECT * FROM users WHERE username = ?",
|
|
|
|
(username,)).fetchone()
|
|
|
|
conn.close()
|
|
|
|
if post is None:
|
|
|
|
return "error"
|
|
|
|
return post["id"]
|
|
|
|
|
|
|
|
|
|
|
|
def get_session(id):
|
|
|
|
conn = get_db_connection()
|
|
|
|
post = conn.execute("SELECT * FROM sessions WHERE session = ?",
|
|
|
|
(id,)).fetchone()
|
|
|
|
conn.close()
|
|
|
|
if post is None:
|
|
|
|
return "error"
|
|
|
|
return post
|
|
|
|
|
|
|
|
|
|
|
|
ALLOWED_EXTENSIONS = {"png", "apng", "jpg", "jpeg", "gif", "svg", "webp"}
|
|
|
|
|
|
|
|
|
|
|
|
def allowed_file(filename):
|
|
|
|
return '.' in filename and \
|
|
|
|
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/", methods=("GET", "POST"))
|
|
|
|
def main():
|
|
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
|
|
conn = get_db_connection()
|
|
|
|
posts = conn.execute("SELECT * FROM posts ORDER BY created DESC;").fetchall()
|
|
|
|
conn.close()
|
|
|
|
|
|
|
|
if usersession:
|
|
|
|
userCookie = get_session(usersession)
|
|
|
|
user = get_user(userCookie["id"])
|
|
|
|
return render_template("main.html", userdata=user, posts=posts)
|
|
|
|
else:
|
|
|
|
return render_template("main.html", posts=posts)
|
|
|
|
|
|
|
|
@app.route("/post", methods=("GET", "POST"))
|
|
|
|
def post():
|
|
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
|
|
if usersession:
|
|
|
|
if request.method == "POST":
|
|
|
|
title = request.form["title"]
|
|
|
|
if title == "":
|
|
|
|
flash("Text required :3")
|
|
|
|
return redirect(url_for("post"))
|
|
|
|
|
|
|
|
if "file" not in request.files:
|
|
|
|
flash("No file selected :3")
|
|
|
|
return redirect(url_for("post"))
|
|
|
|
|
|
|
|
file = request.files["file"]
|
|
|
|
if file.filename == "":
|
|
|
|
flash("No file selected :3")
|
|
|
|
return redirect(url_for("post"))
|
|
|
|
|
|
|
|
if not allowed_file(file.filename):
|
|
|
|
flash("File is not an image!")
|
|
|
|
return redirect(url_for("post"))
|
|
|
|
|
|
|
|
filename = secure_filename(file.filename)
|
|
|
|
finalfilename = secrets.token_hex(64) + filename
|
|
|
|
|
|
|
|
file.save(os.path.join(UPLOAD_FOLDER, finalfilename))
|
|
|
|
imgurl = "/cdn/" + finalfilename
|
|
|
|
|
|
|
|
userCookie = get_session(usersession)
|
|
|
|
user = get_user(userCookie["id"])
|
|
|
|
|
|
|
|
if not user["banned"] == "0":
|
|
|
|
flash("Your account has been banned. Reason: " +
|
|
|
|
user["banned"])
|
|
|
|
return redirect(url_for("post"))
|
|
|
|
|
|
|
|
conn = get_db_connection()
|
|
|
|
conn.execute("INSERT INTO posts (textstr, imageurl, creator, created) VALUES (?, ?, ?, ?)",
|
|
|
|
(title, imgurl, userCookie["id"], str(time.time())))
|
|
|
|
conn.commit()
|
|
|
|
conn.close()
|
|
|
|
return redirect(url_for("main"))
|
|
|
|
|
|
|
|
else:
|
|
|
|
userCookie = get_session(usersession)
|
|
|
|
user = get_user(userCookie["id"])
|
|
|
|
return render_template("post.html", userdata=user)
|
|
|
|
else:
|
|
|
|
flash("A burgercat account is required to post :3")
|
|
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/comment", methods=("GET", "POST"))
|
|
|
|
def comment():
|
|
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
|
|
if usersession:
|
|
|
|
if request.method == "POST":
|
|
|
|
|
|
|
|
data = request.get_json()
|
|
|
|
uid = data["id"]
|
|
|
|
title = data["title"]
|
|
|
|
|
|
|
|
userCookie = get_session(usersession)
|
|
|
|
user = get_user(userCookie["id"])
|
|
|
|
|
|
|
|
if not user["banned"] == "0":
|
|
|
|
flash("Your account has been banned. Reason: " +
|
|
|
|
user["banned"])
|
|
|
|
return redirect(url_for("comment"))
|
|
|
|
|
|
|
|
conn = get_db_connection()
|
|
|
|
conn.execute("INSERT INTO comments (textstr, post_id, creator, created) VALUES (?, ?, ?, ?)",
|
|
|
|
(title, uid, userCookie["id"], str(time.time())))
|
|
|
|
conn.commit()
|
|
|
|
conn.close()
|
|
|
|
|
|
|
|
return "success"
|
|
|
|
|
|
|
|
else:
|
|
|
|
return """<img src="https://http.cat/images/400.jpg">""", 400
|
|
|
|
else:
|
|
|
|
flash("A burgercat account is required to post :3")
|
|
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/cdn/<filename>", methods=("GET", "POST"))
|
|
|
|
def cdn(filename):
|
|
|
|
if os.path.exists(os.path.join(UPLOAD_FOLDER, filename)):
|
|
|
|
return send_from_directory(UPLOAD_FOLDER, filename)
|
|
|
|
else:
|
|
|
|
return "file doesn't exist!!"
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/signup", methods=("GET", "POST"))
|
|
|
|
def signup():
|
|
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
|
|
if usersession:
|
|
|
|
return redirect(url_for("main"))
|
|
|
|
if request.method == "POST":
|
|
|
|
if not check_username_taken(request.form["username"]) == "error":
|
|
|
|
flash("Username already taken :3")
|
|
|
|
return redirect(url_for("signup"))
|
|
|
|
|
|
|
|
if not request.form["username"].isalnum():
|
|
|
|
flash("Username must be alphanumeric :3")
|
|
|
|
return redirect(url_for("signup"))
|
|
|
|
|
|
|
|
if not len(request.form["password"]) > int(PASSWORD_REQUIREMENT):
|
|
|
|
flash("Password must contain at least " + PASSWORD_REQUIREMENT + " characters")
|
|
|
|
return redirect(url_for("signup"))
|
|
|
|
|
|
|
|
hashedpassword = generate_password_hash(request.form["password"])
|
|
|
|
|
|
|
|
conn = get_db_connection()
|
|
|
|
conn.execute("INSERT INTO users (username, password, created) VALUES (?, ?, ?)",
|
|
|
|
(request.form["username"], hashedpassword, str(time.time())))
|
|
|
|
conn.commit()
|
|
|
|
conn.close()
|
|
|
|
|
|
|
|
return redirect(url_for("login"))
|
|
|
|
else:
|
|
|
|
return render_template("signup.html")
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/login", methods=("GET", "POST"))
|
|
|
|
def login():
|
|
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
|
|
if usersession:
|
|
|
|
redirect(url_for("main"))
|
|
|
|
if request.method == "POST":
|
|
|
|
userID = check_username_taken(request.form["username"])
|
|
|
|
user = get_user(userID)
|
|
|
|
|
|
|
|
if user == "error":
|
|
|
|
flash("Wrong username or password :3")
|
|
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
|
|
if not check_password_hash(user["password"], (request.form["password"])):
|
|
|
|
flash("Wrong username or password :3")
|
|
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
|
|
randomCharacters = secrets.token_hex(512)
|
|
|
|
|
|
|
|
conn = get_db_connection()
|
|
|
|
conn.execute("INSERT INTO sessions (session, id) VALUES (?, ?)",
|
|
|
|
(randomCharacters, userID))
|
|
|
|
conn.commit()
|
|
|
|
conn.close()
|
|
|
|
|
|
|
|
resp = make_response(redirect("/"))
|
|
|
|
resp.set_cookie("session_DO_NOT_SHARE", randomCharacters)
|
|
|
|
|
|
|
|
return resp
|
|
|
|
else:
|
|
|
|
return render_template("login.html")
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/settings", methods=("GET", "POST"))
|
|
|
|
def settings():
|
|
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
|
|
if usersession:
|
|
|
|
userCookie = get_session(usersession)
|
|
|
|
user = get_user(userCookie["id"])
|
|
|
|
|
|
|
|
return render_template("settings.html", userdata=user)
|
|
|
|
else:
|
|
|
|
return redirect("/")
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/remove/<postid>", methods=("GET", "POST"))
|
|
|
|
def remove(postid):
|
|
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
|
|
if usersession:
|
|
|
|
userCookie = get_session(usersession)
|
|
|
|
user = get_user(userCookie["id"])
|
|
|
|
if str(user["administrator"]) == "1":
|
|
|
|
post = get_post(postid)
|
|
|
|
conn = get_db_connection()
|
|
|
|
conn.execute("DELETE FROM posts WHERE id = ?", (postid,))
|
|
|
|
conn.commit()
|
|
|
|
conn.close()
|
|
|
|
return "Deleted post!"
|
|
|
|
else:
|
|
|
|
return "nice try"
|
|
|
|
else:
|
|
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
|
|
@app.route("/listusers", methods=("GET", "POST"))
|
|
|
|
def listusers():
|
|
|
|
usersession = request.cookies.get("session_DO_NOT_SHARE")
|
|
|
|
if usersession:
|
|
|
|
userCookie = get_session(usersession)
|
|
|
|
user = get_user(userCookie["id"])
|
|
|
|
if str(user["administrator"]) == "1":
|
|
|
|
thing = ""
|
|
|
|
|
|
|
|
conn = get_db_connection()
|
|
|
|
users = conn.execute("SELECT * FROM users").fetchall()
|
|
|
|
conn.close()
|
|
|
|
for x in users:
|
|
|
|
thing = str(x["id"]) + " - " + x["username"] + "<br>" + thing
|
|
|
|
|
|
|
|
return thing
|
|
|
|
else:
|
|
|
|
return """<img src="https://http.cat/images/403.jpg">"""
|
|
|
|
else:
|
|
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/settings/logout", methods=("GET", "POST"))
|
|
|
|
def logout():
|
|
|
|
resp = redirect(url_for("main"))
|
|
|
|
session = request.cookies.get("session_DO_NOT_SHARE")
|
|
|
|
resp.delete_cookie("session_DO_NOT_SHARE")
|
|
|
|
|
|
|
|
return resp
|
|
|
|
|
|
|
|
@app.errorhandler(500)
|
|
|
|
def page_not_found(e):
|
|
|
|
return """<img src="https://http.cat/images/500.jpg">""", 500
|
|
|
|
|
|
|
|
@app.errorhandler(400)
|
|
|
|
def page_not_found(e):
|
|
|
|
return """<img src="https://http.cat/images/400.jpg">""", 400
|
|
|
|
|
|
|
|
@app.errorhandler(404)
|
|
|
|
def page_not_found(e):
|
|
|
|
return """<img src="https://http.cat/images/404.jpg">""", 404
|
|
|
|
|
|
|
|
@app.errorhandler(413)
|
|
|
|
def page_not_found(e):
|
|
|
|
return "Images can't be larger than 4MB", 413
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
from waitress import serve
|
|
|
|
print("[INFO] Server started")
|
|
|
|
serve(app, host=HOST, port=PORT)
|
|
|
|
#app.run(host=HOST, port=PORT, debug=True)
|
|
|
|
print("[INFO] Server stopped")
|