From 5391b7f32254623704458d1b8415109e97c1f489 Mon Sep 17 00:00:00 2001
From: livrasand
Date: Sun, 13 Oct 2024 19:49:08 -0600
Subject: [PATCH] =?UTF-8?q?Implementaci=C3=B3n=20de=20Autenticaci=C3=B3n?=
=?UTF-8?q?=20y=20Seguridad=20Mejorada?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
app.py | 321 +++++++++++++++++++-----------------
templates/activate_2fa.html | 40 +++++
templates/auth.html | 19 ---
templates/authorize.html | 18 --
templates/login.html | 2 +-
templates/logout.html | 4 +-
templates/sent.html | 8 +-
templates/verify_2fa.html | 42 +++++
8 files changed, 262 insertions(+), 192 deletions(-)
create mode 100644 templates/activate_2fa.html
delete mode 100644 templates/auth.html
delete mode 100644 templates/authorize.html
create mode 100644 templates/verify_2fa.html
diff --git a/app.py b/app.py
index 91dcbe5..00b325a 100644
--- a/app.py
+++ b/app.py
@@ -21,6 +21,8 @@
import threading
import re
from dotenv import load_dotenv
+import pyotp
+import qrcode
load_dotenv()
app = Flask(__name__)
@@ -43,6 +45,8 @@
bcrypt = Bcrypt(app)
jwt = JWTManager(app)
+secret = pyotp.random_base32()
+
#@babel.localeselector
def get_locale():
return session.get('language')
@@ -135,121 +139,6 @@ def sitemap():
def login():
return render_template('login.html')
-@app.route('/helloworld')
-def helloworld():
- return render_template('status.html')
-
-@app.route('/status', methods=['GET'])
-def status():
- try:
- status = {
- "domain": check_domain(),
- "server": check_server(),
- "database": check_database(),
- "code": check_code()
- }
- return jsonify(status)
- except Exception as e:
- # Registra el error con detalles para el desarrollador
- app.logger.error(f'Error occurred in status check: {str(e)}')
- # Muestra un mensaje genérico al usuario
- return jsonify({"error": "Ocurrió un error al verificar el estado, por favor inténtelo más tarde."}), 500
-
-@app.errorhandler(Exception)
-def handle_exception(e):
- # Registra el error con detalles para el desarrollador
- app.logger.error(f'Error occurred: {str(e)}')
- # Muestra un mensaje genérico al usuario
- return jsonify({"error": "Ocurrió un error, por favor inténtelo más tarde."}), 500
-
-def send_email_status(subject, body, recipients):
- msg = Message(subject, sender=app.config['MAIL_USERNAME'], recipients=recipients)
- msg.body = body
- mail.send(msg)
-
-previous_status = None
-
-def monitor_status():
- global previous_status
- while True:
- current_status = {
- "domain": check_domain(),
- "server": check_server(),
- "database": check_database(),
- "code": check_code()
- }
-
- if current_status != previous_status:
- # Si el estado ha cambiado, envía correos a los suscriptores
- conn = sqlite3.connect(DATABASE)
- cursor = conn.cursor()
- cursor.execute("SELECT email FROM subscriptions")
- subscribers = [row[0] for row in cursor.fetchall()]
- conn.close()
-
- status_message = '\n'.join([f"{key}: {value}" for key, value in current_status.items()])
- send_email_status("Kingdom Hall Attendant Status Update", status_message, subscribers)
-
- previous_status = current_status
-
- time.sleep(10) # Revisa cada 10 segundos
-
-# Inicia el hilo de monitoreo
-monitor_thread = threading.Thread(target=monitor_status, daemon=True)
-monitor_thread.start()
-
-@app.route('/subscribe', methods=['POST'])
-def subscribe():
- data = request.json
- email = data.get('email')
-
- if not email:
- return jsonify({"error": "Email is required"}), 400
-
- conn = sqlite3.connect(DATABASE)
- cursor = conn.cursor()
- cursor.execute("INSERT OR IGNORE INTO subscriptions (email) VALUES (?)", (email,))
- conn.commit()
- conn.close()
-
- return jsonify({"message": "Subscribed successfully"}), 201
-
-def check_domain():
- try:
- response = requests.get('https://www.getkha.org', timeout=5) # 5 segundos
- if response.status_code == 200:
- return "Online"
- else:
- return "Offline"
- except requests.exceptions.Timeout:
- return "Request timed out"
- except requests.exceptions.RequestException as e:
- return f"An error occurred: {e}"
-
-def check_server():
- try:
- response = requests.get('https://www.getkha.org', timeout=5)
- if response.status_code == 200:
- return "Online"
- else:
- return "Offline"
- except requests.exceptions.Timeout:
- return "Request timed out"
- except requests.exceptions.RequestException as e:
- return f"An error occurred: {e}"
-
-def check_database():
- try:
- conn = sqlite3.connect('cavea.db')
- cursor = conn.cursor()
- cursor.execute('SELECT 1')
- return "Online"
- except sqlite3.Error:
- return "Offline"
-
-def check_code():
- return "Up-to-date"
-
@app.route('/login-desktop-client')
def login_desktop():
return render_template('login-desktop-client.html')
@@ -2198,41 +2087,150 @@ def inject_user_info():
@app.route('/accessing', methods=['POST'])
def accessing():
- email = request.form['email']
+ email = request.form.get('email')
+ if not email:
+ flash('El campo de correo electrónico es obligatorio.')
+ return redirect(url_for('login'))
+
password = request.form['password']
- # Crear una conexión directa a cavea.db
+ # Crear una conexión directa a la base de datos
conn = sqlite3.connect(DATABASE)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
- cursor.execute("SELECT id, database, contraseña, golden_edition FROM emptor WHERE correo = ?", (email,))
- result = cursor.fetchone()
+ # Consultar la base de datos para obtener la información del usuario
+ cursor.execute("SELECT id, database, contraseña, golden_edition, secret FROM emptor WHERE correo = ?", (email,))
+ result = cursor.fetchone()
+
if result and bcrypt.check_password_hash(result['contraseña'], password):
+ # Si el usuario tiene 2FA configurado
+ secret = result['secret']
user_id = result['id']
- user_db_name = result['database']
- user_golden_edition = result['golden_edition']
- session['user_db'] = user_db_name
- session['user_id'] = user_id
- session['user_ge'] = user_golden_edition
- logging.debug(f"Usuario {email} ha iniciado sesión. Conectado a la base de datos del usuario: {user_db_name}. ¿Es Golden Edition?: {user_golden_edition}")
-
- # Actualizar el último inicio de sesión del usuario
- update_last_login(user_id)
- last_login = get_last_login(user_id)
-
- # Enviar correo electrónico de notificación
- requester_ip = get_requester_ip()
- sender = app.config['MAIL_USERNAME']
- send_login_notification(sender, email, email, last_login, requester_ip)
- conn.close()
- return redirect(url_for('index'))
+ if secret:
+ # Actualizar el último inicio de sesión del usuario
+ update_last_login(user_id)
+ last_login = get_last_login(user_id)
+
+ # Guardar temporalmente el ID del usuario para verificar el 2FA
+ session['temp_user_id'] = user_id
+ session['temp_user_email'] = email # Guardar el email temporal
+ conn.close() # Asegurarse de cerrar la conexión
+ return redirect(url_for('verify_2fa'))
+ else:
+ # No tiene 2FA, así que generamos un nuevo secreto y código QR
+ # Generar nuevo secreto y URI para el código QR
+ secret = pyotp.random_base32()
+ totp = pyotp.TOTP(secret)
+ uri = totp.provisioning_uri(name=email, issuer_name="Kingdom Hall Attendant")
+ img = qrcode.make(uri)
+
+ # Guardar el código QR en el sistema
+ qr_folder_path = os.path.join('static', 'qrcodes')
+ os.makedirs(qr_folder_path, exist_ok=True) # Crear la carpeta si no existe
+ qr_code_path = os.path.join(qr_folder_path, f'{email}_qr.png')
+ img.save(qr_code_path)
+
+ # Actualizar la base de datos con el nuevo secreto
+ cursor.execute("UPDATE emptor SET secret = ? WHERE correo = ?", (secret, email))
+ conn.commit()
+ conn.close() # Asegurarse de cerrar la conexión
+
+ # Guardar temporalmente el email para obligar al usuario a activar 2FA
+ session['temp_user_email'] = email
+
+ return render_template('activate_2fa.html', qr_code_path=qr_code_path) # Mostrar el QR al usuario
+
else:
- conn.close()
+ # Correo o contraseña incorrectos
+ conn.close() # Cerrar conexión en caso de error
flash('Correo o contraseña incorrectos.')
return redirect(url_for('login'))
+@app.route('/activate_2fa', methods=['GET', 'POST'])
+def activate_2fa():
+ if request.method == 'POST':
+ email = session.get('temp_user_email') # Recuperar el correo del usuario temporal
+ if not email:
+ flash('Error en la sesión. Por favor, inicie sesión de nuevo.')
+ return redirect(url_for('login'))
+
+ secret = pyotp.random_base32() # Generar un nuevo secreto para 2FA
+ totp = pyotp.TOTP(secret)
+
+ # Generar URI para el código QR
+ uri = totp.provisioning_uri(name=email, issuer_name="Kingdom Hall Attendant")
+ img = qrcode.make(uri)
+
+ # Definir la ruta del código QR y crear la carpeta si no existe
+ qr_folder_path = os.path.join('static', 'qrcodes')
+ os.makedirs(qr_folder_path, exist_ok=True) # Crear la carpeta si no existe
+
+ qr_code_path = os.path.join(qr_folder_path, f'{email}_qr.png')
+ img.save(qr_code_path)
+
+ # Actualizar la base de datos con el nuevo secreto
+ conn = sqlite3.connect(DATABASE)
+ conn.row_factory = sqlite3.Row
+ cursor = conn.cursor()
+ cursor.execute("UPDATE emptor SET secret = ? WHERE correo = ?", (secret, email))
+ conn.commit()
+ conn.close()
+
+ return render_template('activate_2fa.html', qr_code_path=qr_code_path) # Mostrar el QR al usuario
+
+ return render_template('activate_2fa.html')
+
+@app.route('/verify_2fa', methods=['GET', 'POST'])
+def verify_2fa():
+ if request.method == 'POST':
+ codigo_ingresado = request.form['codigo']
+ user_id = session.get('temp_user_id') # Recuperar el ID de usuario temporal
+ email = session.get('temp_user_email') # Recuperar el email temporal
+
+ if not user_id or not email:
+ flash('La sesión ha expirado. Por favor, inicie sesión nuevamente.')
+ return redirect(url_for('login'))
+
+ # Conectar a la base de datos y obtener el secreto 2FA
+ conn = sqlite3.connect(DATABASE)
+ conn.row_factory = sqlite3.Row
+ cursor = conn.cursor()
+ cursor.execute("SELECT secret, database, golden_edition FROM emptor WHERE id = ?", (user_id,))
+ result = cursor.fetchone()
+
+ if result:
+ totp = pyotp.TOTP(result['secret'])
+ if totp.verify(codigo_ingresado, valid_window=1): # Verificación con ventana de validación de 1
+ # Código 2FA es correcto, proceder a autenticar al usuario
+ session['user_id'] = user_id
+ session['user_db'] = result['database']
+ session['user_ge'] = result['golden_edition']
+
+ # Limpiar los datos temporales
+ session.pop('temp_user_id', None)
+ session.pop('temp_user_email', None)
+
+ # Actualizar el último inicio de sesión del usuario
+ update_last_login(user_id)
+ last_login = get_last_login(user_id)
+
+ # Enviar correo electrónico de notificación
+ requester_ip = get_requester_ip()
+ sender = app.config['MAIL_USERNAME']
+ send_login_notification(sender, email, email, last_login, requester_ip)
+ # Redirigir al dashboard o página principal
+ conn.close()
+ return redirect(url_for('index'))
+ else:
+ flash('Código 2FA incorrecto.')
+ else:
+ flash('Error al verificar el código 2FA.')
+
+ conn.close()
+ return render_template('verify_2fa.html')
+
@app.route('/signup')
def signup():
return render_template('signup.html')
@@ -2271,12 +2269,11 @@ def register():
if request.method == 'POST':
email = request.form['email']
- # Validar el email para asegurarse de que sea seguro
+ # Validar el email
if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
flash('Error: dirección de correo electrónico no válida.')
return redirect(url_for('register'))
- # Conectar a la base de datos
conn = sqlite3.connect(DATABASE)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
@@ -2294,20 +2291,30 @@ def register():
token = secrets.token_urlsafe(16)
token_expiration = datetime.datetime.now() + timedelta(hours=1)
confirm_url = url_for('confirm_email', token=token, _external=True)
- requester_ip = get_requester_ip()
- sender = app.config['MAIL_USERNAME']
- send_email(sender, email, confirm_url, requester_ip)
+
+ # Generar clave secreta para 2FA
+ secret = pyotp.random_base32()
+ totp = pyotp.TOTP(secret)
+
+ # Generar URI para el código QR
+ uri = totp.provisioning_uri(name=email, issuer_name="Kingdom Hall Attendant")
+
+ # Generar código QR
+ img = qrcode.make(uri)
+ qr_code_path = os.path.join('static', 'qrcodes', f'{email}_qr.png')
+ img.save(qr_code_path)
user_db_name = f"{email.split('@')[0]}.db"
- # Validar el nombre de archivo para asegurarse de que no contenga caracteres peligrosos
+ # Validar el nombre de archivo
if re.match(r'^[a-zA-Z0-9_-]+\.db$', user_db_name):
- cursor.execute("INSERT INTO emptor (correo, token, token_expiration, database) VALUES (?, ?, ?, ?)",
- (email, token, token_expiration, user_db_name))
+ # Insertar en la base de datos
+ cursor.execute("INSERT INTO emptor (correo, token, token_expiration, database, secret) VALUES (?, ?, ?, ?, ?)",
+ (email, token, token_expiration, user_db_name, secret))
conn.commit()
+ # Copiar la base de datos
try:
- # Copiar la base de datos
shutil.copy("kha.db", user_db_name)
user_conn = sqlite3.connect(user_db_name)
user_cursor = user_conn.cursor()
@@ -2318,20 +2325,30 @@ def register():
flash('Error: la base de datos "kha.db" no se encontró.')
except Exception as e:
flash(f'Error al copiar la base de datos: {str(e)}')
-
- return redirect(url_for('register_sent', email=email))
+
+ # Enviar el correo de confirmación
+ send_email(app.config['MAIL_DEFAULT_SENDER'], email, confirm_url, request.remote_addr)
+
+ return redirect(url_for('register_sent', email=email)) # Redirigir a la vista que muestra el QR
else:
flash('Error: nombre de archivo no válido.')
- conn.close() # Asegúrate de cerrar la conexión a la base de datos
+ conn.close()
return redirect(url_for('register'))
return render_template('signup.html')
-@app.route('/register/sent', methods=['GET'])
+@app.route('/register/sent', methods=['GET', 'POST'])
def register_sent():
email = request.args.get('email') # Obtener el correo de los parámetros de la URL
- return render_template('sent.html', email=email)
+ qr_code_path = os.path.join('static', 'qrcodes', f'{email}_qr.png')
+
+ # Comprobar si el archivo QR existe
+ if not os.path.exists(qr_code_path):
+ flash('Error: el código QR no se encontró.')
+ return redirect(url_for('register'))
+
+ return render_template('sent.html', email=email, qr_code_path=qr_code_path) # Pasar la ruta del QR a la plantilla
@app.route('/sign-up-system')
def log_in_system():
@@ -4021,4 +4038,4 @@ def eliminar_all_ava():
return redirect(url_for('audio_video_acomodadores'))
if __name__ == '__main__':
- app.run(debug=False)
\ No newline at end of file
+ app.run(debug=True) #or False
\ No newline at end of file
diff --git a/templates/activate_2fa.html b/templates/activate_2fa.html
new file mode 100644
index 0000000..cd11c71
--- /dev/null
+++ b/templates/activate_2fa.html
@@ -0,0 +1,40 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Activar 2FA - Kingdom Hall Attendant
+
+
+
+
+
+
+
+
Active la Autenticación 2FA
+ {% if qr_code_path %}
+
Escanea el código QR utilizando Google Authenticator, Microsoft Authenticator o cualquier aplicación de autenticación de tu elección compatible con códigos de un solo uso (TOTP) para activar la verificación en dos pasos (2FA) y mejorar la seguridad de tu cuenta:
Escanea el código QR utilizando Google Authenticator, Microsoft Authenticator o cualquier aplicación de autenticación de tu elección compatible con códigos de un solo uso (TOTP) para activar la verificación en dos pasos (2FA) y mejorar la seguridad de tu cuenta.
+
+{% endif %}
+
+
Será redirigido desde el enlace del correo electrónico. Puede cerrar esta pestaña sin problemas.