As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!
In today's digital landscape, securing web applications is more critical than ever. Authentication serves as the first line of defense, and Python offers numerous advanced strategies to protect your applications. I've implemented these techniques across various projects and found them essential for maintaining robust security without compromising user experience.
JSON Web Tokens (JWT)
JWT authentication has transformed how I handle user sessions in distributed applications. These compact, self-contained tokens eliminate the need for server-side storage while providing a secure method for information exchange.
import jwt
from datetime import datetime, timedelta
from flask import Flask, request, jsonify
app = Flask(__name__)
SECRET_KEY = "your-secure-secret-key"
ALGORITHM = "HS256"
def generate_jwt(user_id):
payload = {
"user_id": user_id,
"exp": datetime.utcnow() + timedelta(hours=1),
"iat": datetime.utcnow()
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def decode_jwt(token):
try:
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
@app.route("/login", methods=["POST"])
def login():
# Authenticate user (simplified)
user_id = authenticate_user(request.json)
if not user_id:
return jsonify({"error": "Invalid credentials"}), 401
token = generate_jwt(user_id)
return jsonify({"token": token})
@app.route("/protected", methods=["GET"])
def protected_route():
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return jsonify({"error": "Missing token"}), 401
token = auth_header.split(" ")[1]
payload = decode_jwt(token)
if not payload:
return jsonify({"error": "Invalid or expired token"}), 401
# Access granted
return jsonify({"message": "Access granted", "user_id": payload["user_id"]})
When implementing JWT authentication, I've found it essential to handle token expiration and rotation properly. For added security, consider storing a token identifier in your database with the ability to invalidate specific tokens if needed.
OAuth2 Integration
OAuth2 allows users to authenticate through trusted third-party providers. I've implemented this in several applications to reduce friction during user registration and login.
from authlib.integrations.flask_client import OAuth
from flask import Flask, redirect, url_for, session
app = Flask(__name__)
app.secret_key = "your-secret-key"
oauth = OAuth(app)
# Configure Google OAuth
google = oauth.register(
name='google',
client_id='your-google-client-id',
client_secret='your-google-client-secret',
access_token_url='https://accounts.google.com/o/oauth2/token',
access_token_params=None,
authorize_url='https://accounts.google.com/o/oauth2/auth',
authorize_params=None,
api_base_url='https://www.googleapis.com/oauth2/v1/',
client_kwargs={'scope': 'openid email profile'},
)
@app.route('/login/google')
def google_login():
redirect_uri = url_for('google_authorize', _external=True)
return google.authorize_redirect(redirect_uri)
@app.route('/login/google/callback')
def google_authorize():
token = google.authorize_access_token()
user_info = google.get('userinfo').json()
# Process user info and create/update user in your database
user_id = process_oauth_user(user_info)
# Set session or generate JWT
session['user_id'] = user_id
return redirect('/dashboard')
When working with OAuth, I always implement proper state validation to prevent CSRF attacks and ensure secure handling of tokens. It's also crucial to handle account linking if users might authenticate through multiple providers.
Two-Factor Authentication with TOTP
Adding TOTP as a second authentication factor significantly enhances security. I've found pyotp makes this implementation straightforward.
import pyotp
import qrcode
from io import BytesIO
import base64
from flask import Flask, request, jsonify, render_template
app = Flask(__name__)
# Store user secrets securely in your database
user_totp_secrets = {}
@app.route("/setup-2fa", methods=["POST"])
def setup_2fa():
user_id = get_authenticated_user_id(request)
if not user_id:
return jsonify({"error": "Authentication required"}), 401
# Generate a secret key
totp_secret = pyotp.random_base32()
user_totp_secrets[user_id] = totp_secret
# Generate provisioning URI for QR code
totp = pyotp.TOTP(totp_secret)
uri = totp.provisioning_uri(f"user:{user_id}", issuer_name="YourApp")
# Generate QR code
qr = qrcode.make(uri)
buffered = BytesIO()
qr.save(buffered)
qr_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
return jsonify({
"secret": totp_secret,
"qr_code": f"data:image/png;base64,{qr_base64}"
})
@app.route("/verify-2fa", methods=["POST"])
def verify_2fa():
user_id = get_authenticated_user_id(request)
if not user_id:
return jsonify({"error": "Authentication required"}), 401
code = request.json.get("code")
if not code:
return jsonify({"error": "TOTP code required"}), 400
totp_secret = user_totp_secrets.get(user_id)
if not totp_secret:
return jsonify({"error": "2FA not set up for this user"}), 400
totp = pyotp.TOTP(totp_secret)
if totp.verify(code):
# Mark session as 2FA verified
return jsonify({"success": True})
else:
return jsonify({"error": "Invalid code"}), 400
In production, I ensure TOTP secrets are stored securely using encryption. I also always provide backup codes for users who might lose access to their authentication device.
WebAuthn for Passwordless Authentication
WebAuthn represents the future of authentication, enabling users to authenticate using biometrics or security keys. Here's how I've implemented it:
from flask import Flask, request, jsonify, session
from py_webauthn import generate_registration_options, verify_registration_response
from py_webauthn import generate_authentication_options, verify_authentication_response
import base64
import os
app = Flask(__name__)
app.secret_key = os.urandom(32)
# In-memory storage (use a database in production)
registered_credentials = {}
@app.route("/register/begin", methods=["POST"])
def begin_registration():
username = request.json.get("username")
if not username:
return jsonify({"error": "Username required"}), 400
# Generate registration options
options = generate_registration_options(
rp_id=request.host,
rp_name="My WebAuthn App",
user_id=username.encode(),
user_name=username,
user_display_name=username,
attestation="direct"
)
# Store challenge for verification
session['current_registration_challenge'] = options.challenge
return jsonify(options._asdict())
@app.route("/register/complete", methods=["POST"])
def complete_registration():
challenge = session.pop('current_registration_challenge', None)
if not challenge:
return jsonify({"error": "Registration session expired"}), 400
credential_data = request.json
try:
verification = verify_registration_response(
credential=credential_data,
expected_challenge=challenge,
expected_origin=f"https://{request.host}",
expected_rp_id=request.host
)
# Store credential for the user
user_id = verification.user_id.decode()
registered_credentials[user_id] = {
"public_key": verification.credential_public_key,
"credential_id": verification.credential_id,
"sign_count": verification.sign_count
}
return jsonify({"success": True})
except Exception as e:
return jsonify({"error": str(e)}), 400
@app.route("/authenticate/begin", methods=["POST"])
def begin_authentication():
username = request.json.get("username")
if not username or username not in registered_credentials:
return jsonify({"error": "User not registered"}), 400
credential = registered_credentials[username]
options = generate_authentication_options(
rp_id=request.host,
allow_credentials=[{
"id": credential["credential_id"],
"type": "public-key"
}]
)
# Store challenge for verification
session['current_authentication_challenge'] = options.challenge
session['authenticating_username'] = username
return jsonify(options._asdict())
@app.route("/authenticate/complete", methods=["POST"])
def complete_authentication():
challenge = session.pop('current_authentication_challenge', None)
username = session.pop('authenticating_username', None)
if not challenge or not username:
return jsonify({"error": "Authentication session expired"}), 400
credential_data = request.json
credential = registered_credentials[username]
try:
verification = verify_authentication_response(
credential=credential_data,
expected_challenge=challenge,
expected_origin=f"https://{request.host}",
expected_rp_id=request.host,
credential_public_key=credential["public_key"],
credential_current_sign_count=credential["sign_count"]
)
# Update sign count
registered_credentials[username]["sign_count"] = verification.new_sign_count
# Authentication successful, create session
session['user_id'] = username
return jsonify({"success": True})
except Exception as e:
return jsonify({"error": str(e)}), 400
WebAuthn can be challenging to implement, but the security benefits are substantial. I've found it particularly valuable for applications requiring high security with minimal user friction.
Rate Limiting and Brute Force Protection
Preventing automated attacks is essential for any authentication system. Here's how I implement rate limiting with Redis:
import time
import redis
from functools import wraps
from flask import Flask, request, jsonify
app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def rate_limit(limit=5, period=60, key_prefix='rl'):
def decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
# Use IP as identifier (consider using username for login endpoints)
identifier = request.remote_addr
key = f"{key_prefix}:{identifier}"
# Get current count
current = redis_client.get(key)
if current and int(current) >= limit:
return jsonify({"error": "Too many attempts, please try again later"}), 429
# Increment and set expiry if needed
pipe = redis_client.pipeline()
pipe.incr(key)
if not current:
pipe.expire(key, period)
pipe.execute()
return f(*args, **kwargs)
return wrapped
return decorator
@app.route("/login", methods=["POST"])
@rate_limit(limit=5, period=60, key_prefix='login')
def login():
# Normal login logic here
username = request.json.get("username")
password = request.json.get("password")
# Track failed attempts for brute force protection
key = f"failed:{username}"
if not authenticate_user(username, password):
# Increment failed attempts
redis_client.incr(key)
redis_client.expire(key, 3600) # Reset after an hour
# If too many failures, require additional verification
failures = int(redis_client.get(key) or 0)
if failures >= 5:
return jsonify({"error": "Account locked, please reset your password"}), 403
return jsonify({"error": "Invalid credentials"}), 401
# Success - reset failed attempts counter
redis_client.delete(key)
# Proceed with login...
return jsonify({"success": True, "token": generate_token(username)})
For distributed applications, I always ensure rate limiting works across multiple instances. Redis provides an excellent solution for this, but you can also use other distributed caching systems.
Role-Based Access Control (RBAC)
Implementing proper authorization is as important as authentication. RBAC provides a scalable approach:
from flask import Flask, request, jsonify
from functools import wraps
import jwt
app = Flask(__name__)
SECRET_KEY = "your-secure-secret-key"
# Sample user-role database
users = {
"alice": {"roles": ["admin", "user"]},
"bob": {"roles": ["user"]},
"charlie": {"roles": ["moderator", "user"]}
}
# Sample role-permission mapping
role_permissions = {
"admin": ["read", "write", "delete", "manage_users"],
"moderator": ["read", "write", "delete"],
"user": ["read", "write_own"]
}
def get_user_from_token():
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return None
token = auth_header.split(" ")[1]
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
username = payload.get("username")
return users.get(username)
except:
return None
def requires_role(role):
def decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
user = get_user_from_token()
if not user:
return jsonify({"error": "Authentication required"}), 401
if role not in user["roles"]:
return jsonify({"error": "Insufficient permissions"}), 403
return f(*args, **kwargs)
return wrapped
return decorator
def requires_permission(permission):
def decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
user = get_user_from_token()
if not user:
return jsonify({"error": "Authentication required"}), 401
user_permissions = []
for role in user["roles"]:
user_permissions.extend(role_permissions.get(role, []))
if permission not in user_permissions:
return jsonify({"error": "Insufficient permissions"}), 403
return f(*args, **kwargs)
return wrapped
return decorator
@app.route("/admin-panel")
@requires_role("admin")
def admin_panel():
return jsonify({"data": "Admin panel content"})
@app.route("/posts/<int:post_id>", methods=["DELETE"])
@requires_permission("delete")
def delete_post(post_id):
# Delete post logic
return jsonify({"success": True})
In larger applications, I've found it helpful to use a dedicated permissions library like Flask-Principal or implement a custom solution with database-backed roles and permissions.
Secure Password Storage with Argon2
Modern password hashing is essential for protecting user credentials. Argon2 provides excellent protection against various attack vectors:
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
from flask import Flask, request, jsonify
app = Flask(__name__)
ph = PasswordHasher()
# In-memory user storage (use a database in production)
users_db = {}
@app.route("/register", methods=["POST"])
def register():
username = request.json.get("username")
password = request.json.get("password")
if not username or not password:
return jsonify({"error": "Username and password required"}), 400
if username in users_db:
return jsonify({"error": "Username already exists"}), 400
# Hash password with Argon2
password_hash = ph.hash(password)
# Store user with hashed password
users_db[username] = {
"password_hash": password_hash
}
return jsonify({"success": True}), 201
@app.route("/login", methods=["POST"])
def login():
username = request.json.get("username")
password = request.json.get("password")
if not username or not password:
return jsonify({"error": "Username and password required"}), 400
user = users_db.get(username)
if not user:
# Use constant-time comparison to prevent timing attacks
return jsonify({"error": "Invalid credentials"}), 401
try:
# Verify password
ph.verify(user["password_hash"], password)
# Check if rehash is needed
if ph.check_needs_rehash(user["password_hash"]):
user["password_hash"] = ph.hash(password)
# Authentication successful
return jsonify({
"success": True,
"token": generate_auth_token(username)
})
except VerifyMismatchError:
return jsonify({"error": "Invalid credentials"}), 401
I always implement automatic password rehashing when parameters change and ensure secure handling of passwords in memory by avoiding string operations that might leave copies in memory.
Bringing It All Together
In practice, you'll often combine these strategies. Here's a simplified Flask application demonstrating several techniques together:
from flask import Flask, request, jsonify, session
from werkzeug.security import safe_str_cmp
from functools import wraps
import jwt
import redis
import pyotp
from argon2 import PasswordHasher
from datetime import datetime, timedelta
import os
app = Flask(__name__)
app.secret_key = os.urandom(24)
redis_client = redis.Redis(host='localhost', port=6379, db=0)
ph = PasswordHasher()
JWT_SECRET = os.urandom(32)
# User database (use a real database in production)
users = {}
# Authentication decorator
def authenticate(f):
@wraps(f)
def decorated(*args, **kwargs):
token = None
if 'Authorization' in request.headers:
auth_header = request.headers['Authorization']
if auth_header.startswith('Bearer '):
token = auth_header.split(' ')[1]
if not token:
return jsonify({'error': 'Authentication token is missing'}), 401
try:
data = jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
current_user = users.get(data['username'])
if not current_user:
return jsonify({'error': 'Invalid user'}), 401
except:
return jsonify({'error': 'Invalid or expired token'}), 401
return f(current_user, *args, **kwargs)
return decorated
@app.route('/register', methods=['POST'])
def register():
data = request.get_json()
username = data.get('username')
password = data.get('password')
if not username or not password:
return jsonify({'error': 'Username and password are required'}), 400
if username in users:
return jsonify({'error': 'Username already exists'}), 400
# Rate limit registrations
ip = request.remote_addr
key = f'register_ip:{ip}'
if redis_client.get(key) and int(redis_client.get(key)) > 5:
return jsonify({'error': 'Too many registration attempts'}), 429
redis_client.incr(key)
redis_client.expire(key, 3600)
# Create user
totp_secret = pyotp.random_base32()
users[username] = {
'password_hash': ph.hash(password),
'totp_secret': totp_secret,
'totp_enabled': False,
'roles': ['user']
}
return jsonify({
'success': True,
'totp_secret': totp_secret,
'totp_uri': pyotp.TOTP(totp_secret).provisioning_uri(username, issuer_name="MyApp")
})
@app.route('/enable-2fa', methods=['POST'])
@authenticate
def enable_2fa(current_user):
data = request.get_json()
code = data.get('code')
if not code:
return jsonify({'error': 'TOTP code is required'}), 400
totp = pyotp.TOTP(current_user['totp_secret'])
if totp.verify(code):
current_user['totp_enabled'] = True
return jsonify({'success': True})
else:
return jsonify({'error': 'Invalid TOTP code'}), 400
@app.route('/login', methods=['POST'])
def login():
data = request.get_json()
username = data.get('username')
password = data.get('password')
if not username or not password:
return jsonify({'error': 'Username and password are required'}), 400
# Rate limit login attempts
key = f'login_attempts:{username}'
if redis_client.get(key) and int(redis_client.get(key)) > 5:
return jsonify({'error': 'Account locked, too many attempts'}), 429
user = users.get(username)
if not user:
redis_client.incr(key)
redis_client.expire(key, 1800) # 30 minutes
return jsonify({'error': 'Invalid credentials'}), 401
try:
ph.verify(user['password_hash'], password)
# Check if rehash is needed
if ph.check_needs_rehash(user['password_hash']):
user['password_hash'] = ph.hash(password)
# Reset failed attempts
redis_client.delete(key)
# If 2FA is enabled, require TOTP
if user['totp_enabled']:
# Generate a short-lived token for 2FA validation
temp_token = jwt.encode({
'username': username,
'exp': datetime.utcnow() + timedelta(minutes=5),
'type': '2fa_required'
}, JWT_SECRET, algorithm='HS256')
return jsonify({
'requires_2fa': True,
'temp_token': temp_token
})
# Generate access token
token = jwt.encode({
'username': username,
'exp': datetime.utcnow() + timedelta(hours=1),
'roles': user['roles']
}, JWT_SECRET, algorithm='HS256')
return jsonify({'token': token})
except:
redis_client.incr(key)
redis_client.expire(key, 1800) # 30 minutes
return jsonify({'error': 'Invalid credentials'}), 401
@app.route('/verify-2fa', methods=['POST'])
def verify_2fa():
data = request.get_json()
temp_token = data.get('temp_token')
totp_code = data.get('code')
if not temp_token or not totp_code:
return jsonify({'error': 'Token and TOTP code are required'}), 400
try:
payload = jwt.decode(temp_token, JWT_SECRET, algorithms=['HS256'])
if payload.get('type') != '2fa_required':
return jsonify({'error': 'Invalid token type'}), 400
username = payload.get('username')
user = users.get(username)
if not user:
return jsonify({'error': 'User not found'}), 404
totp = pyotp.TOTP(user['totp_secret'])
if not totp.verify(totp_code):
return jsonify({'error': 'Invalid TOTP code'}), 401
# Generate access token
token = jwt.encode({
'username': username,
'exp': datetime.utcnow() + timedelta(hours=1),
'roles': user['roles']
}, JWT_SECRET, algorithm='HS256')
return jsonify({'token': token})
except jwt.ExpiredSignatureError:
return jsonify({'error': 'Token expired'}), 401
except jwt.InvalidTokenError:
return jsonify({'error': 'Invalid token'}), 401
@app.route('/protected', methods=['GET'])
@authenticate
def protected(current_user):
return jsonify({
'message': 'This is a protected endpoint',
'username': current_user.get('username'),
'roles': current_user.get('roles')
})
if __name__ == '__main__':
app.run(debug=True)
I've found that combining these strategies provides a defense-in-depth approach that significantly enhances application security. The specific combination depends on your application's requirements and threat model.
Remember that security is a continuous process. Regular security audits, keeping dependencies updated, and staying informed about new vulnerabilities are essential practices that complement these authentication strategies.
Implementing these advanced authentication methods has not only improved security in my applications but also enhanced user trust. With the right combination of techniques, you can achieve both robust security and a positive user experience.
101 Books
101 Books is an AI-driven publishing company co-founded by author Aarav Joshi. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as $4—making quality knowledge accessible to everyone.
Check out our book Golang Clean Code available on Amazon.
Stay tuned for updates and exciting news. When shopping for books, search for Aarav Joshi to find more of our titles. Use the provided link to enjoy special discounts!
Our Creations
Be sure to check out our creations:
Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | JS Schools
We are on Medium
Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva
Top comments (0)