Skip to content

Commit

Permalink
Merge pull request #529 from scidsg/me
Browse files Browse the repository at this point in the history
Add Verified URLs
  • Loading branch information
micahflee committed Sep 6, 2024
2 parents e215d18 + dc86e6f commit 82677d3
Show file tree
Hide file tree
Showing 14 changed files with 880 additions and 220 deletions.
10 changes: 10 additions & 0 deletions hushline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from flask import Flask, flash, redirect, request, session, url_for
from flask_migrate import Migrate
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.wrappers.response import Response

from . import admin, routes, settings
Expand Down Expand Up @@ -47,6 +48,15 @@ def create_app() -> Flask:
app.config["SMTP_ENCRYPTION"] = os.environ.get("SMTP_ENCRYPTION", "StartTLS")
app.config["REQUIRE_PGP"] = os.environ.get("REQUIRE_PGP", "False").lower() == "true"

# Handle the tips domain for profile verification
app.config["SERVER_NAME"] = os.getenv("SERVER_NAME")
app.config["PREFERRED_URL_SCHEME"] = "https" if os.getenv("SERVER_NAME") is not None else "http"

if not app.config["IS_PERSONAL_SERVER"]:
# if were running the managed service, we are behind a proxy
app.wsgi_app = ProxyFix( # type: ignore[method-assign]
app.wsgi_app, x_for=2, x_proto=1, x_host=0, x_port=0, x_prefix=0
)
# Run migrations
db.init_app(app)
Migrate(app, db)
Expand Down
4 changes: 4 additions & 0 deletions hushline/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ class User(Model):
extra_field_value3: Mapped[Optional[str]]
extra_field_label4: Mapped[Optional[str]]
extra_field_value4: Mapped[Optional[str]]
extra_field_verified1: Mapped[Optional[bool]] = mapped_column(default=False)
extra_field_verified2: Mapped[Optional[bool]] = mapped_column(default=False)
extra_field_verified3: Mapped[Optional[bool]] = mapped_column(default=False)
extra_field_verified4: Mapped[Optional[bool]] = mapped_column(default=False)

@property
def password_hash(self) -> str:
Expand Down
25 changes: 25 additions & 0 deletions hushline/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,30 @@ def profile(username: str) -> Response | str:
math_problem = f"{num1} + {num2} ="
session["math_answer"] = str(num1 + num2) # Store the answer in session as a string

# Prepare extra fields and verification status
extra_fields = [
{
"label": user.extra_field_label1,
"value": user.extra_field_value1,
"verified": user.extra_field_verified1,
},
{
"label": user.extra_field_label2,
"value": user.extra_field_value2,
"verified": user.extra_field_verified2,
},
{
"label": user.extra_field_label3,
"value": user.extra_field_value3,
"verified": user.extra_field_verified3,
},
{
"label": user.extra_field_label4,
"value": user.extra_field_value4,
"verified": user.extra_field_verified4,
},
]

return render_template(
"profile.html",
form=form,
Expand All @@ -135,6 +159,7 @@ def profile(username: str) -> Response | str:
is_personal_server=app.config["IS_PERSONAL_SERVER"],
require_pgp=app.config["REQUIRE_PGP"],
math_problem=math_problem,
extra_fields=extra_fields, # Pass extra fields to template
)

@app.route("/to/<username>", methods=["POST"])
Expand Down
204 changes: 126 additions & 78 deletions hushline/settings.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import asyncio
import base64
import io
import re
from datetime import UTC, datetime
from typing import Optional
from typing import Any, Optional

import aiohttp
import pyotp
import qrcode
import requests
from bs4 import BeautifulSoup
from flask import (
Blueprint,
current_app,
Expand Down Expand Up @@ -141,31 +144,53 @@ class DirectoryVisibilityForm(FlaskForm):
show_in_directory = BooleanField("Show on public directory")


def strip_whitespace(value: Optional[Any]) -> Optional[str]:
if value is not None and hasattr(value, "strip"):
return value.strip()
return value


class ProfileForm(FlaskForm):
bio = TextAreaField("Bio", validators=[Length(max=250)])
bio = TextAreaField("Bio", filters=[strip_whitespace], validators=[Length(max=250)])
extra_field_label1 = StringField(
"Extra Field Label 1", validators=[OptionalField(), Length(max=50)]
"Extra Field Label 1",
filters=[strip_whitespace],
validators=[OptionalField(), Length(max=50)],
)
extra_field_value1 = StringField(
"Extra Field Value 1", validators=[OptionalField(), Length(max=4096)]
"Extra Field Value 1",
filters=[strip_whitespace],
validators=[OptionalField(), Length(max=4096)],
)
extra_field_label2 = StringField(
"Extra Field Label 2", validators=[OptionalField(), Length(max=50)]
"Extra Field Label 2",
filters=[strip_whitespace],
validators=[OptionalField(), Length(max=50)],
)
extra_field_value2 = StringField(
"Extra Field Value 2", validators=[OptionalField(), Length(max=4096)]
"Extra Field Value 2",
filters=[strip_whitespace],
validators=[OptionalField(), Length(max=4096)],
)
extra_field_label3 = StringField(
"Extra Field Label 3", validators=[OptionalField(), Length(max=50)]
"Extra Field Label 3",
filters=[strip_whitespace],
validators=[OptionalField(), Length(max=50)],
)
extra_field_value3 = StringField(
"Extra Field Value 3", validators=[OptionalField(), Length(max=4096)]
"Extra Field Value 3",
filters=[strip_whitespace],
validators=[OptionalField(), Length(max=4096)],
)
extra_field_label4 = StringField(
"Extra Field Label 4", validators=[OptionalField(), Length(max=50)]
"Extra Field Label 4",
filters=[strip_whitespace],
validators=[OptionalField(), Length(max=50)],
)
extra_field_value4 = StringField(
"Extra Field Value 4", validators=[OptionalField(), Length(max=4096)]
"Extra Field Value 4",
filters=[strip_whitespace],
validators=[OptionalField(), Length(max=4096)],
)


Expand Down Expand Up @@ -194,12 +219,36 @@ def set_input_disabled(input_field: Field, disabled: bool = True) -> None:
unset_field_attribute(input_field, "disabled")


# Define the async function for URL verification
async def verify_url(
session: aiohttp.ClientSession, user: User, i: int, url_to_verify: str, profile_url: str
) -> None:
try:
async with session.get(url_to_verify, timeout=aiohttp.ClientTimeout(total=5)) as response:
response.raise_for_status()
html_content = await response.text()

soup = BeautifulSoup(html_content, "html.parser")
verified = False
for link in soup.find_all("a"):
href = link.get("href")
rel = link.get("rel", [])
if href == profile_url and "me" in rel:
verified = True
break

setattr(user, f"extra_field_verified{i}", verified)
except aiohttp.ClientError as e:
current_app.logger.error(f"Error fetching URL for field {i}: {e}")
setattr(user, f"extra_field_verified{i}", False)


def create_blueprint() -> Blueprint:
bp = Blueprint("settings", __file__, url_prefix="/settings")

@bp.route("/", methods=["GET", "POST"])
@authentication_required
def index() -> str | Response:
@bp.route("/", methods=["GET", "POST"])
async def index() -> str | Response:
user_id = session.get("user_id")
if not user_id:
return redirect(url_for("login"))
Expand All @@ -224,57 +273,56 @@ def index() -> str | Response:
directory_visibility_form = DirectoryVisibilityForm()
profile_form = ProfileForm()

# Check if the bio update form was submitted
if (
request.method == "POST"
and "update_bio" in request.form
and profile_form.validate_on_submit()
):
user.bio = request.form["bio"]
user.extra_field_label1 = profile_form.extra_field_label1.data.strip()
user.extra_field_value1 = profile_form.extra_field_value1.data.strip()
user.extra_field_label2 = profile_form.extra_field_label2.data.strip()
user.extra_field_value2 = profile_form.extra_field_value2.data.strip()
user.extra_field_label3 = profile_form.extra_field_label3.data.strip()
user.extra_field_value3 = profile_form.extra_field_value3.data.strip()
user.extra_field_label4 = profile_form.extra_field_label4.data.strip()
user.extra_field_value4 = profile_form.extra_field_value4.data.strip()
db.session.commit()
flash("👍 Bio updated successfully.")
return redirect(url_for("settings.index"))
# Handle form submissions
if request.method == "POST":
# Update bio and custom fields
if "update_bio" in request.form and profile_form.validate_on_submit():
user.bio = profile_form.bio.data

if request.method == "POST" and (
directory_visibility_form.validate_on_submit()
and "update_directory_visibility" in request.form
):
user.show_in_directory = directory_visibility_form.show_in_directory.data
db.session.commit()
flash("👍 Directory visibility updated successfully.")
return redirect(url_for("settings.index"))
# Define base_url from the environment or config
profile_url = url_for("profile", _external=True, username=user.primary_username)

# Additional admin-specific data initialization
user_count = two_fa_count = pgp_key_count = two_fa_percentage = pgp_key_percentage = None
all_users = []
async with aiohttp.ClientSession() as client_session:
tasks = []
for i in range(1, 5):
label_field = getattr(profile_form, f"extra_field_label{i}", "")
value_field = getattr(profile_form, f"extra_field_value{i}", "")

# Check if user is admin and add admin-specific data
if user.is_admin:
user_count = db.session.scalar(db.func.count(User.id))
two_fa_count = db.session.scalar(
db.select(db.func.count(User.id).filter(User._totp_secret.isnot(None)))
)
pgp_key_count = db.session.scalar(
db.select(
db.func.count(User.id)
.filter(User._pgp_key.isnot(None))
.filter(User._pgp_key != "")
)
)
two_fa_percentage = (two_fa_count / user_count * 100) if user_count else 0
pgp_key_percentage = (pgp_key_count / user_count * 100) if user_count else 0
all_users = list(db.session.scalars(db.select(User)).all()) # Fetch all users for admin
label = label_field.data if hasattr(label_field, "data") else label_field
setattr(user, f"extra_field_label{i}", label)

value = value_field.data if hasattr(value_field, "data") else value_field
setattr(user, f"extra_field_value{i}", value)

# If the value is empty, reset the verification status
if not value:
setattr(user, f"extra_field_verified{i}", False)
continue

# Verify the URL only if it starts with "https://"
url_to_verify = value
if url_to_verify.startswith("https://"):
task = verify_url(client_session, user, i, url_to_verify, profile_url)
tasks.append(task)

# Run all the tasks concurrently
if tasks: # Only gather if there are tasks to run
await asyncio.gather(*tasks)

db.session.commit()
flash("👍 Bio and fields updated successfully.")
return redirect(url_for("settings.index"))

# Update directory visibility
if (
"update_directory_visibility" in request.form
and directory_visibility_form.validate_on_submit()
):
user.show_in_directory = directory_visibility_form.show_in_directory.data
db.session.commit()
flash("👍 Directory visibility updated successfully.")
return redirect(url_for("settings.index"))

# Handle form submissions
if request.method == "POST":
# Handle Display Name Form Submission
if "update_display_name" in request.form and display_name_form.validate_on_submit():
user.update_display_name(display_name_form.display_name.data.strip())
Expand Down Expand Up @@ -305,26 +353,26 @@ def index() -> str | Response:
)
return redirect(url_for(".index"))

# Check if user is admin and add admin-specific data
is_admin = user.is_admin
if is_admin:
user_count = db.session.scalar(db.func.count(User.id))
two_fa_count = db.session.scalar(
db.select(db.func.count(User.id).filter(User._totp_secret.isnot(None)))
)
pgp_key_count = db.session.scalar(
db.select(
db.func.count(User.id)
.filter(User._pgp_key.isnot(None))
.filter(User._pgp_key != "")
)
# Additional admin-specific data initialization
user_count = two_fa_count = pgp_key_count = two_fa_percentage = pgp_key_percentage = None
all_users = []

# Check if user is admin and add admin-specific data
if user.is_admin:
user_count = db.session.scalar(db.func.count(User.id))
two_fa_count = db.session.scalar(
db.select(db.func.count(User.id).filter(User._totp_secret.isnot(None)))
)
pgp_key_count = db.session.scalar(
db.select(
db.func.count(User.id)
.filter(User._pgp_key.isnot(None))
.filter(User._pgp_key != "")
)
two_fa_percentage = (two_fa_count / user_count * 100) if user_count else 0
pgp_key_percentage = (pgp_key_count / user_count * 100) if user_count else 0
else:
user_count = two_fa_count = pgp_key_count = two_fa_percentage = (
pgp_key_percentage
) = None
)
two_fa_percentage = (two_fa_count / user_count * 100) if user_count else 0
pgp_key_percentage = (pgp_key_count / user_count * 100) if user_count else 0
all_users = list(db.session.scalars(db.select(User)).all()) # Fetch all users for admin

# Prepopulate form fields
email_forwarding_form.forwarding_enabled.data = user.email is not None
Expand Down
Loading

0 comments on commit 82677d3

Please sign in to comment.