Burgernotes/main

318 lines
8.7 KiB
Plaintext
Raw Normal View History

2023-07-21 20:52:06 +01:00
#!/usr/bin/python3
import os
import sqlite3
import time
import secrets
import configparser
from waitress import serve
from flask import Flask, render_template, request, url_for, flash, redirect, session, make_response, send_from_directory, stream_with_context, Response, request
# Parse configuration file, and check if anything is wrong with it
config = configparser.ConfigParser()
config.read("config.ini")
HOST = config["config"]["HOST"]
PORT = config["config"]["PORT"]
SECRET_KEY = config["config"]["SECRET_KEY"]
2023-07-22 17:15:59 +01:00
MAX_STORAGE = config["config"]["MAX_STORAGE"]
2023-07-21 20:52:06 +01:00
if SECRET_KEY == "placeholder":
print("[WARNING] Secret key not set")
# Define Flask
app = Flask(__name__)
app.config["SECRET_KEY"] = SECRET_KEY
# Database functions
def get_db_connection():
conn = sqlite3.connect("database.db")
conn.row_factory = sqlite3.Row
return conn
def get_user(id):
conn = get_db_connection()
post = conn.execute("SELECT * FROM users WHERE id = ?",
(id,)).fetchone()
conn.close()
if post is None:
return "error"
return post
def get_note(id):
conn = get_db_connection()
post = conn.execute("SELECT * FROM notes WHERE id = ?",
(id,)).fetchone()
conn.close()
if post is None:
return "error"
return post
2023-07-22 17:15:59 +01:00
def get_space(id):
conn = get_db_connection()
notes = conn.execute("SELECT content FROM notes WHERE creator = ? ORDER BY id DESC;", (id,)).fetchall()
conn.close()
spacetaken = 0
for x in notes:
spacetaken = spacetaken + len(x["content"].encode("utf-8"))
return spacetaken
2023-07-21 20:52:06 +01:00
def get_session(id):
conn = get_db_connection()
post = conn.execute("SELECT * FROM sessions WHERE session = ?",
(id,)).fetchone()
conn.close()
if post is None:
return "error"
return post
def check_username_taken(username):
conn = get_db_connection()
post = conn.execute("SELECT * FROM users WHERE lower(username) = ?",
(username.lower(),)).fetchone()
conn.close()
if post is None:
return "error"
return post["id"]
# Main page
@app.route("/")
def main():
return render_template("main.html")
# Web app
@app.route("/app")
def webapp():
return render_template("app.html")
# Login and signup
@app.route("/signup")
def signup():
return render_template("signup.html")
@app.route("/login")
def login():
return render_template("login.html")
# API
@app.route("/api/signup", methods=("GET", "POST"))
def apisignup():
if request.method == "POST":
data = request.get_json()
username = data["username"]
password = data["password"]
if username == "":
return {}, 422
if len(username) > 20:
return {}, 422
if not username.isalnum():
return {}, 422
if password == "":
return {}, 422
if len(password) < 14:
return {}, 422
if not check_username_taken(username) == "error":
return {}, 409
conn = get_db_connection()
conn.execute("INSERT INTO users (username, password, created) VALUES (?, ?, ?)",
(username, password, str(time.time())))
conn.commit()
conn.close()
userID = check_username_taken(username)
user = get_user(userID)
randomCharacters = secrets.token_hex(512)
conn = get_db_connection()
conn.execute("INSERT INTO sessions (session, id) VALUES (?, ?)",
(randomCharacters, userID))
conn.commit()
conn.close()
return {
"key": randomCharacters
}, 200
@app.route("/api/login", methods=("GET", "POST"))
def apilogin():
if request.method == "POST":
data = request.get_json()
username = data["username"]
password = data["password"]
check_username_thing = check_username_taken(username)
if check_username_thing == "error":
return {}, 401
userID = check_username_taken(username)
user = get_user(userID)
if not password == user["password"]:
return {}, 401
randomCharacters = secrets.token_hex(512)
conn = get_db_connection()
conn.execute("INSERT INTO sessions (session, id) VALUES (?, ?)",
(randomCharacters, userID))
conn.commit()
conn.close()
return {
"key": randomCharacters
}, 200
@app.route("/api/userinfo", methods=("GET", "POST"))
def apiuserinfo():
if request.method == "POST":
data = request.get_json()
secretKey = data["secretKey"]
userCookie = get_session(secretKey)
user = get_user(userCookie["id"])
datatemplate = {
"username": user["username"],
"id": user["id"],
2023-07-22 17:15:59 +01:00
"created": user["created"],
"storageused": get_space(user["id"]),
"storagemax": int(MAX_STORAGE)
2023-07-21 20:52:06 +01:00
}
return datatemplate
@app.route("/api/listnotes", methods=("GET", "POST"))
def apilistnotes():
if request.method == "POST":
data = request.get_json()
secretKey = data["secretKey"]
userCookie = get_session(secretKey)
user = get_user(userCookie["id"])
conn = get_db_connection()
notes = conn.execute("SELECT * FROM notes WHERE creator = ? ORDER BY id DESC;", (user["id"],)).fetchall()
conn.close()
datatemplate = []
for note in notes:
notetemplate = {
"id": note["id"],
"title": note["title"]
}
datatemplate.append(notetemplate)
return datatemplate, 200
@app.route("/api/newnote", methods=("GET", "POST"))
def apinewnote():
if request.method == "POST":
data = request.get_json()
secretKey = data["secretKey"]
noteName = data["noteName"]
userCookie = get_session(secretKey)
user = get_user(userCookie["id"])
conn = get_db_connection()
conn.execute("INSERT INTO notes (title, content, creator, created, edited) VALUES (?, ?, ?, ?, ?)",
(noteName, "", user["id"], str(time.time()), str(time.time())))
conn.commit()
conn.close()
return {}, 200
@app.route("/api/readnote", methods=("GET", "POST"))
def apireadnote():
if request.method == "POST":
data = request.get_json()
secretKey = data["secretKey"]
noteId = data["noteId"]
userCookie = get_session(secretKey)
user = get_user(userCookie["id"])
note = get_note(noteId)
if (note != "error"):
if (user["id"] == note["creator"]):
contenttemplate = {
"content": note["content"]
}
return contenttemplate, 200
else:
return {}, 422
else:
return {}, 422
@app.route("/api/editnote", methods=("GET", "POST"))
def apieditnote():
if request.method == "POST":
data = request.get_json()
secretKey = data["secretKey"]
noteId = data["noteId"]
content = data["content"]
userCookie = get_session(secretKey)
user = get_user(userCookie["id"])
note = get_note(noteId)
2023-07-22 17:15:59 +01:00
if get_space(user["id"]) + len(content.encode("utf-8")) > int(MAX_STORAGE):
return {}, 418
2023-07-21 20:52:06 +01:00
if (note != "error"):
if (user["id"] == note["creator"]):
conn = get_db_connection()
conn.execute("UPDATE notes SET content = ?, edited = ? WHERE id = ?", (content, str(time.time()), noteId))
conn.commit()
conn.close()
return {}, 200
else:
return {}, 403
else:
return {}, 422
@app.route("/api/removenote", methods=("GET", "POST"))
def apiremovenote():
if request.method == "POST":
data = request.get_json()
secretKey = data["secretKey"]
noteId = data["noteId"]
userCookie = get_session(secretKey)
user = get_user(userCookie["id"])
note = get_note(noteId)
if (note != "error"):
if (user["id"] == note["creator"]):
conn = get_db_connection()
conn.execute("DELETE FROM notes WHERE id = ?", (noteId,))
conn.commit()
conn.close()
return {}, 200
else:
return {}, 403
else:
return {}, 422
@app.route("/api/logout")
def apilogout():
return render_template("logout.html")
# Start server
if __name__ == "__main__":
print("[INFO] Server started")
serve(app, host=HOST, port=PORT)
print("[INFO] Server stopped")