import json import math from dotenv import load_dotenv from typing import List import logging import os import datetime import dateutil from pathlib import Path load_dotenv() # load environment before initializing flaskoidc logger = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG) from flask import render_template, session, request, abort, redirect, url_for, flash from flask_seasurf import SeaSurf from sqlalchemy import extract import requests if "ENV" in os.environ and os.environ["ENV"] == "development": # if we're running on localhost we cannot use the production OIDC provider from oidcmock import FlaskOIDCMock app = FlaskOIDCMock(__name__) else: from flaskoidc import FlaskOIDC app = FlaskOIDC(__name__) csrf = SeaSurf(app) # CSRF protection db = app.db # FlaskOIDC already creates a db instance with app.app_context(): from models import User app.db.create_all() import cli import template_filters from radicale import Radicale app.radicale = Radicale(data_path=os.environ["RADICALE_DATA_PATH"]) @app.before_request def _populate_rich_user(): """Ensure the current user object is always up-to-date and available.""" # skip flaskoidc routes; user session variable is not guaranteed to be ready if request.path in ["/logout", "/login", app.config.get("REDIRECT_URI", "/auth")]: return session.rich_user = User.create_or_update(session.get("user")) db.session.commit() def get_map_label(user: User): return user.display_name @app.route("/") def overview_map(): all_users = User.all_viewable_users() markers = dict() for u in all_users: if u.lat and u.lon: if (coords := (u.lat, u.lon)) in markers: markers[coords].append(get_map_label(u)) else: markers[coords] = [get_map_label(u)] markers = [ { "label": " & ".join(sorted(v)), "lat": lat, "lon": lon, } for (lat, lon), v in markers.items() ] return render_template("map.html.j2", markers=markers, user=session.rich_user) @app.route("/profile", methods=["GET", "POST"]) def profile(): user = session.rich_user if request.method == "POST": user.first_name = request.form.get("first_name", "") user.last_name = request.form.get("last_name", "") user.email = request.form.get("email", "") user.phone = request.form.get("phone", "") user.street = request.form.get("street", "") user.number = request.form.get("number", "") user.postal = request.form.get("postal", "") user.city = request.form.get("city", "") user.country = request.form.get("country", "") user.lat = request.form.get("lat", "") user.lon = request.form.get("lon", "") birthdate = request.form.get("birthdate") if birthdate == "": user.birthdate = None else: user.birthdate = datetime.date.fromisoformat(birthdate) user.last_updated = datetime.datetime.now() # user.include_in_views = request.form.get("include_in_views", "") == "on" # do not give users a choice; include after first modification user.include_in_views = True db.session.commit() app.radicale.create_or_update_vcf(user) flash("Je wijzigingen zijn succesvol opgeslagen!", "success") return redirect(url_for("profile")) return render_template("profile.html.j2", user=user) @app.route("/calendar") def calendar(): def days_in_month(m): return [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31][m - 1] now = datetime.datetime.now().astimezone(dateutil.tz.gettz("Europe/Amsterdam")) try: month = ((int(request.args.get("month", now.month)) - 1) % 12) + 1 except ValueError: month = now.month all_users: List[User] = db.session.execute( db.select(User) .filter_by(include_in_views=True) .filter(extract("month", User.birthdate) == month) ).scalars() buckets = [{"day": i + 1, "bucket": []} for i in range(days_in_month(month))] for u in all_users: buckets[u.birthdate.day - 1]["bucket"].append(u.display_name) ncols = 3 column_size = math.ceil(len(buckets) / ncols) columns = [ buckets[i : i + column_size] for i in range(0, len(buckets), column_size) ] try: img_src = next(Path("static/img/calendar/").glob(f"{month}-*.webp")) except StopIteration: img_src = next(Path("static/img/calendar/").glob("default*.webp")) img_credits = None if "-" in img_src.stem: img_credits = img_src.stem.split("-")[1] return render_template( "calendar.html.j2", user=session.rich_user, month=month, columns=columns, img_src=img_src, img_credits=img_credits, today=now.day if month == now.month else None, ) def get_profile(username): if username is None: return None profile = db.session.execute( db.select(User) .filter_by(include_in_views=True) .filter(User.username == username) ).first() if profile is not None: profile: User = profile[0] profile.marker = { "lat": profile.lat, "lon": profile.lon, "label": get_map_label(profile), } return profile @app.route("/addressbook_card//") @app.route("/addressbook_card/", defaults={"username": None}) def addressbook_card(username): profile = get_profile(username) return render_template( "addressbook_card.html.j2", user=session.rich_user, profile=profile, ) @app.route("/addressbook//") @app.route("/addressbook/", defaults={"username": None}) def addressbook(username): contacts = sorted(User.all_viewable_users(), key=lambda x: x.display_name) profile = get_profile(username) if username and profile is None: return redirect(url_for("addressbook")) return render_template( "addressbook.html.j2", user=session.rich_user, contacts=contacts, profile=profile, ) @app.route("/generate_carddav_password", methods=["POST"]) def generate_carddav_password(): session.rich_user.reset_carddav_password() db.session.commit() flash("Er is een nieuw CardDAV-wachtwoord gegenereerd!", "carddav") return redirect(url_for("carddav")) @app.route("/carddav") def carddav(): return render_template( "carddav.html.j2", user=session.rich_user, carddav_url=os.environ["CARDDAV_URL"] ) @app.route("/geocoding") def geocoding(): params = dict(request.args) params.update({"format": "jsonv2"}) headers = { "User-Agent": "adRUsboek v1.0", } resp = requests.get( "https://nominatim.openstreetmap.org/search", params=params, headers=headers ) data = json.loads(resp.content.decode("utf8")) if not data or "lat" not in data[0] or "lon" not in data[0]: abort(404) return json.dumps({"lat": data[0]["lat"], "lon": data[0]["lon"]}) if __name__ == "__main__": app.run(host="0.0.0.0", debug=True, port=5000)