#!/usr/bin/python3
"""Notes web application: Flask views plus a JSON API backed by SQLite.

Configuration is read from ``config.ini`` (section ``config``) at import
time, data lives in ``database.db``, and the app is served with waitress
when the module is run as a script.

API conventions used throughout this module:

* Every ``/api/...`` data endpoint accepts a JSON object via POST.
* Authenticated endpoints expect the session token under ``secretKey`` and
  answer ``401`` when it does not resolve to an existing user.
* Malformed or missing fields are answered with ``422``.
"""
import configparser
import os
import secrets
import sqlite3
import time
from contextlib import closing

from flask import (
    Flask,
    Response,
    flash,
    make_response,
    redirect,
    render_template,
    request,
    send_from_directory,
    session,
    stream_with_context,
    url_for,
)
from waitress import serve
from werkzeug.security import check_password_hash, generate_password_hash

# Parse configuration file, and check if anything is wrong with it
config = configparser.ConfigParser()
config.read("config.ini")

HOST = config["config"]["HOST"]
PORT = config["config"]["PORT"]
SECRET_KEY = config["config"]["SECRET_KEY"]
# Kept as the raw config string: it is returned verbatim by /api/userinfo
# and converted with int() where a numeric comparison is needed.
MAX_STORAGE = config["config"]["MAX_STORAGE"]

if SECRET_KEY == "placeholder":
    print("[WARNING] Secret key not set")

# Define Flask
app = Flask(__name__)
app.config["SECRET_KEY"] = SECRET_KEY


# Database functions
def get_db_connection():
    """Open a new connection to ``database.db`` whose rows are ``sqlite3.Row``."""
    conn = sqlite3.connect("database.db")
    conn.row_factory = sqlite3.Row
    return conn


def _fetch_one(query, params):
    """Run a single-row SELECT and return the row, or ``"error"`` if none matched."""
    with closing(get_db_connection()) as conn:
        row = conn.execute(query, params).fetchone()
    return "error" if row is None else row


def get_user(id):
    """Return the ``users`` row with this id, or ``"error"`` if it does not exist."""
    return _fetch_one("SELECT * FROM users WHERE id = ?", (id,))


def get_note(id):
    """Return the ``notes`` row with this id, or ``"error"`` if it does not exist."""
    return _fetch_one("SELECT * FROM notes WHERE id = ?", (id,))


def get_space(id):
    """Return the UTF-8 byte size of all titles and contents owned by user ``id``."""
    with closing(get_db_connection()) as conn:
        notes = conn.execute(
            "SELECT content, title FROM notes WHERE creator = ?", (id,)
        ).fetchall()
    return sum(
        len(note["content"].encode("utf-8")) + len(note["title"].encode("utf-8"))
        for note in notes
    )


def get_note_count(id):
    """Return how many notes user ``id`` owns."""
    # COUNT(*) avoids loading every note body just to count rows.
    with closing(get_db_connection()) as conn:
        row = conn.execute(
            "SELECT COUNT(*) FROM notes WHERE creator = ?", (id,)
        ).fetchone()
    return row[0]


def get_session(id):
    """Return the ``sessions`` row whose token equals ``id``, or ``"error"``."""
    return _fetch_one("SELECT * FROM sessions WHERE session = ?", (id,))


def get_session_from_sessionid(id):
    """Return the ``sessions`` row whose ``sessionid`` equals ``id``, or ``"error"``."""
    return _fetch_one("SELECT * FROM sessions WHERE sessionid = ?", (id,))


def check_username_taken(username):
    """Return the id of the user with this name (case-insensitive), or ``"error"``."""
    row = _fetch_one(
        "SELECT * FROM users WHERE lower(username) = ?", (username.lower(),)
    )
    return "error" if row == "error" else row["id"]


def _create_session(user_id):
    """Store a fresh session token for ``user_id`` and return the token."""
    token = secrets.token_hex(512)
    with closing(get_db_connection()) as conn:
        conn.execute(
            "INSERT INTO sessions (session, id, device) VALUES (?, ?, ?)",
            (token, user_id, request.headers.get("user-agent")),
        )
        conn.commit()
    return token


def _authenticate(data):
    """Return the ``users`` row for the request's ``secretKey``, or ``None``.

    ``None`` covers every failure: body is not a JSON object, the key is
    missing or not a string, the session is unknown, or the session points
    at a user that no longer exists.
    """
    if not isinstance(data, dict):
        return None
    secret_key = data.get("secretKey")
    if not isinstance(secret_key, str):
        return None
    session_row = get_session(secret_key)
    if session_row == "error":
        return None
    user = get_user(session_row["id"])
    return None if user == "error" else user


# Main page
@app.route("/")
def main():
    """Render the landing page."""
    return render_template("main.html")


# Web app
@app.route("/app")
def webapp():
    """Render the notes web app page."""
    return render_template("app.html")


# Login and signup
@app.route("/signup")
def signup():
    """Render the signup page."""
    return render_template("signup.html")


@app.route("/login")
def login():
    """Render the login page."""
    return render_template("login.html")


@app.route("/privacy")
def privacy():
    """Render the privacy page."""
    return render_template("privacy.html")


# API
@app.route("/api/signup", methods=("POST",))
def apisignup():
    """Create an account and return a session token under ``key``.

    Returns 422 for an invalid username/password and 409 when the username
    is already taken (compared case-insensitively).
    """
    data = request.get_json()
    if not isinstance(data, dict):
        return {}, 422
    username = data.get("username")
    password = data.get("password")
    if not isinstance(username, str) or not isinstance(password, str):
        return {}, 422

    # Username: 1-20 alphanumeric characters. Password: at least 14 characters.
    if not username or len(username) > 20 or not username.isalnum():
        return {}, 422
    if len(password) < 14:
        return {}, 422

    if check_username_taken(username) != "error":
        return {}, 409

    hashed_password = generate_password_hash(password)
    with closing(get_db_connection()) as conn:
        conn.execute(
            "INSERT INTO users (username, password, created) VALUES (?, ?, ?)",
            (username, hashed_password, str(time.time())),
        )
        conn.commit()

    user_id = check_username_taken(username)
    return {"key": _create_session(user_id)}, 200


@app.route("/api/login", methods=("POST",))
def apilogin():
    """Check credentials and return a new session token under ``key``.

    Returns 401 for an unknown username or a wrong password.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        return {}, 422
    username = data.get("username")
    password = data.get("password")
    if not isinstance(username, str) or not isinstance(password, str):
        return {}, 422

    user_id = check_username_taken(username)
    if user_id == "error":
        return {}, 401

    user = get_user(user_id)
    if user == "error" or not check_password_hash(user["password"], password):
        return {}, 401

    return {"key": _create_session(user_id)}, 200


@app.route("/api/userinfo", methods=("POST",))
def apiuserinfo():
    """Return account details and storage usage for the authenticated user."""
    user = _authenticate(request.get_json())
    if user is None:
        return {}, 401

    return {
        "username": user["username"],
        "id": user["id"],
        "created": user["created"],
        "storageused": get_space(user["id"]),
        "storagemax": MAX_STORAGE,
        "notecount": get_note_count(user["id"]),
    }


@app.route("/api/listnotes", methods=("POST",))
def apilistnotes():
    """Return id and title of the user's notes, newest id first."""
    user = _authenticate(request.get_json())
    if user is None:
        return {}, 401

    with closing(get_db_connection()) as conn:
        notes = conn.execute(
            "SELECT * FROM notes WHERE creator = ? ORDER BY id DESC;",
            (user["id"],),
        ).fetchall()

    return [{"id": note["id"], "title": note["title"]} for note in notes], 200


@app.route("/api/exportnotes", methods=("POST",))
def apiexportnotes():
    """Return every note of the user with its timestamps, title and content."""
    user = _authenticate(request.get_json())
    if user is None:
        return {}, 401

    with closing(get_db_connection()) as conn:
        notes = conn.execute(
            "SELECT * FROM notes WHERE creator = ? ORDER BY id DESC;",
            (user["id"],),
        ).fetchall()

    exported = [
        {
            "id": note["id"],
            "created": note["created"],
            "edited": note["edited"],
            "title": note["title"],
            "content": note["content"],
        }
        for note in notes
    ]
    return exported, 200


@app.route("/api/newnote", methods=("POST",))
def apinewnote():
    """Create an empty note titled ``noteName`` for the authenticated user."""
    data = request.get_json()
    user = _authenticate(data)
    if user is None:
        return {}, 401

    note_name = data.get("noteName")
    # get_space() encodes every title, so a non-string title must not be stored.
    if not isinstance(note_name, str):
        return {}, 422

    now = str(time.time())
    with closing(get_db_connection()) as conn:
        conn.execute(
            "INSERT INTO notes (title, content, creator, created, edited) "
            "VALUES (?, ?, ?, ?, ?)",
            (note_name, "", user["id"], now, now),
        )
        conn.commit()
    return {}, 200


@app.route("/api/readnote", methods=("POST",))
def apireadnote():
    """Return the content of note ``noteId``.

    Answers 422 both when the note does not exist and when it belongs to
    another user.
    """
    data = request.get_json()
    user = _authenticate(data)
    if user is None:
        return {}, 401

    note = get_note(data.get("noteId"))
    if note == "error" or user["id"] != note["creator"]:
        return {}, 422
    return {"content": note["content"]}, 200


@app.route("/api/editnote", methods=("POST",))
def apieditnote():
    """Replace the content of note ``noteId`` with ``content``.

    Returns 422 for an unknown note or non-string content, 403 when the note
    belongs to another user, and 418 when the edit would exceed MAX_STORAGE.
    """
    data = request.get_json()
    user = _authenticate(data)
    if user is None:
        return {}, 401

    content = data.get("content")
    if not isinstance(content, str):
        return {}, 422

    note = get_note(data.get("noteId"))
    if note == "error":
        return {}, 422
    if user["id"] != note["creator"]:
        return {}, 403

    # get_space() already counts this note's current content, so subtract it
    # before adding the replacement; otherwise the note is counted twice.
    old_size = len((note["content"] or "").encode("utf-8"))
    new_size = len(content.encode("utf-8"))
    if get_space(user["id"]) - old_size + new_size > int(MAX_STORAGE):
        return {}, 418

    with closing(get_db_connection()) as conn:
        conn.execute(
            "UPDATE notes SET content = ?, edited = ? WHERE id = ?",
            (content, str(time.time()), note["id"]),
        )
        conn.commit()
    return {}, 200


@app.route("/api/removenote", methods=("POST",))
def apiremovenote():
    """Delete note ``noteId``; 422 if it is unknown, 403 if owned by someone else."""
    data = request.get_json()
    user = _authenticate(data)
    if user is None:
        return {}, 401

    note = get_note(data.get("noteId"))
    if note == "error":
        return {}, 422
    if user["id"] != note["creator"]:
        return {}, 403

    with closing(get_db_connection()) as conn:
        conn.execute("DELETE FROM notes WHERE id = ?", (note["id"],))
        conn.commit()
    return {}, 200


@app.route("/api/deleteaccount", methods=("POST",))
def apideleteaccount():
    """Delete the authenticated user together with their notes and sessions."""
    user = _authenticate(request.get_json())
    if user is None:
        return {}, 401

    # One connection and one commit so the account is removed atomically;
    # sessions are removed too so no token keeps pointing at a deleted user.
    with closing(get_db_connection()) as conn:
        conn.execute("DELETE FROM notes WHERE creator = ?", (user["id"],))
        conn.execute("DELETE FROM sessions WHERE id = ?", (user["id"],))
        conn.execute("DELETE FROM users WHERE id = ?", (user["id"],))
        conn.commit()
    return {}, 200


@app.route("/api/sessions/list", methods=("POST",))
def apisessionslist():
    """List the user's sessions, flagging the one that made this request."""
    data = request.get_json()
    user = _authenticate(data)
    if user is None:
        return {}, 401

    secret_key = data["secretKey"]
    with closing(get_db_connection()) as conn:
        rows = conn.execute(
            "SELECT * FROM sessions WHERE id = ? ORDER BY id DESC;",
            (user["id"],),
        ).fetchall()

    listing = [
        {
            "id": row["sessionid"],
            "thisSession": row["session"] == secret_key,
            "device": row["device"],
        }
        for row in rows
    ]
    return listing, 200


@app.route("/api/sessions/remove", methods=("POST",))
def apisessionsremove():
    """Delete session ``sessionId``; 422 if unknown, 403 if it is another user's."""
    data = request.get_json()
    user = _authenticate(data)
    if user is None:
        return {}, 401

    target = get_session_from_sessionid(data.get("sessionId"))
    if target == "error":
        return {}, 422
    if user["id"] != target["id"]:
        return {}, 403

    with closing(get_db_connection()) as conn:
        conn.execute(
            "DELETE FROM sessions WHERE sessionid = ?", (target["sessionid"],)
        )
        conn.commit()
    return {}, 200


@app.route("/listusers/<secretkey>", methods=("GET", "POST"))
def listusers(secretkey):
    """Return an ``id - username - bytes used`` line per user.

    The listing is only produced when ``secretkey`` from the URL matches the
    configured SECRET_KEY; any other value redirects to ``/``.
    """
    # Constant-time comparison on bytes (compare_digest rejects non-ASCII str).
    if not secrets.compare_digest(
        secretkey.encode("utf-8"), SECRET_KEY.encode("utf-8")
    ):
        return redirect("/")

    with closing(get_db_connection()) as conn:
        users = conn.execute("SELECT * FROM users").fetchall()

    # Rows are emitted in reverse query order, each followed by the separator.
    # NOTE(review): the separator is a plain line break; if this output is
    # meant to be read in a browser, "<br>" may have been intended - confirm.
    return "".join(
        str(row["id"]) + " - " + row["username"] + " - "
        + str(get_space(row["id"])) + "\n"
        for row in reversed(users)
    )


@app.route("/api/logout")
def apilogout():
    """Render the logout page."""
    return render_template("logout.html")


@app.errorhandler(500)
def internal_error(e):
    """Answer unhandled server errors with an empty JSON object."""
    return {}, 500


@app.errorhandler(404)
def not_found(e):
    """Render the error page for unknown URLs."""
    return render_template(
        "error.html", errorCode=404, errorMessage="Page not found"
    ), 404


# Start server
if __name__ == "__main__":
    print("[INFO] Server started")
    serve(app, host=HOST, port=PORT)
    print("[INFO] Server stopped")