DEV Community

Geoffrey Kim
Geoffrey Kim

Posted on

Building a Modern User Permission Management System with FastAPI, SQLAlchemy, and MariaDB

In this article, I will guide you through designing and implementing a user permission management system using FastAPI, SQLAlchemy, and MariaDB. I'll cover the database schema, recent trends in permission models, and how to set up API endpoints to manage users, roles, permissions, and groups effectively.

Recent Trends in Permission Models

While traditional Role-Based Access Control (RBAC) is still widely used, recent trends in permission models have introduced more flexible and granular control mechanisms. Here are some of the notable trends:

1. Attribute-Based Access Control (ABAC)

ABAC determines access rights based on user attributes, resource attributes, and environmental conditions. It allows for highly flexible and detailed access control policies, considering factors such as user roles, departments, and working hours.

2. Policy-Based Access Control (PBAC)

PBAC manages access through policies that define specific conditions for access. These policies are centrally managed and can include complex rules, making it suitable for organizations with intricate access control requirements.

3. JSON-Based Permission Management

Storing permissions in JSON format allows for a more flexible and hierarchical structure, making it easier to manage complex permission sets and conditions.

4. User Groups and Hierarchies

Incorporating user groups and hierarchical structures (e.g., teams, departments, projects) can streamline permission management by grouping users with similar access needs.

5. Integration with External Authentication and Authorization

Using external systems like OAuth and OpenID Connect enables Single Sign-On (SSO) and consistent authentication and authorization across multiple systems.

Database Design

Let's start with designing the database schema that incorporates these trends.

Basic Tables

Users Table

Stores basic user information.

CREATE TABLE Users (
    UserID INT PRIMARY KEY AUTO_INCREMENT,
    Username VARCHAR(50) NOT NULL UNIQUE,
    PasswordHash VARCHAR(255) NOT NULL,
    Email VARCHAR(100) NOT NULL UNIQUE,
    CreatedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Enter fullscreen mode Exit fullscreen mode

Roles Table

Defines roles for users.

CREATE TABLE Roles (
    RoleID INT PRIMARY KEY AUTO_INCREMENT,
    RoleName VARCHAR(50) NOT NULL UNIQUE
);
Enter fullscreen mode Exit fullscreen mode

Permissions Table

Defines specific permissions.

CREATE TABLE Permissions (
    PermissionID INT PRIMARY KEY AUTO_INCREMENT,
    PermissionName VARCHAR(100) NOT NULL UNIQUE,
    PermissionDetails JSON NOT NULL
);
Enter fullscreen mode Exit fullscreen mode

Relationship Tables

UserRoles Table

Manages many-to-many relationships between users and roles.

CREATE TABLE UserRoles (
    UserID INT,
    RoleID INT,
    PRIMARY KEY (UserID, RoleID),
    FOREIGN KEY (UserID) REFERENCES Users(UserID),
    FOREIGN KEY (RoleID) REFERENCES Roles(RoleID)
);
Enter fullscreen mode Exit fullscreen mode

RolePermissions Table

Manages many-to-many relationships between roles and permissions.

CREATE TABLE RolePermissions (
    RoleID INT,
    PermissionID INT,
    PRIMARY KEY (RoleID, PermissionID),
    FOREIGN KEY (RoleID) REFERENCES Roles(RoleID),
    FOREIGN KEY (PermissionID) REFERENCES Permissions(PermissionID)
);
Enter fullscreen mode Exit fullscreen mode

UserPermissions Table

Allows direct assignment of permissions to users.

CREATE TABLE UserPermissions (
    UserID INT,
    PermissionID INT,
    PRIMARY KEY (UserID, PermissionID),
    FOREIGN KEY (UserID) REFERENCES Users(UserID),
    FOREIGN KEY (PermissionID) REFERENCES Permissions(PermissionID)
);
Enter fullscreen mode Exit fullscreen mode

Groups Table

Manages user groups.

CREATE TABLE Groups (
    GroupID INT PRIMARY KEY AUTO_INCREMENT,
    GroupName VARCHAR(100) NOT NULL UNIQUE
);

CREATE TABLE UserGroups (
    UserID INT,
    GroupID INT,
    PRIMARY KEY (UserID, GroupID),
    FOREIGN KEY (UserID) REFERENCES Users(UserID),
    FOREIGN KEY (GroupID) REFERENCES Groups(GroupID)
);
Enter fullscreen mode Exit fullscreen mode

AuditLogs Table

Tracks permission changes.

CREATE TABLE AuditLogs (
    AuditID INT PRIMARY KEY AUTO_INCREMENT,
    UserID INT,
    ChangeType VARCHAR(50),
    ChangeDetails JSON,
    ChangedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (UserID) REFERENCES Users(UserID)
);
Enter fullscreen mode Exit fullscreen mode

Setting Up FastAPI and SQLAlchemy

Database Models

Define the database models using SQLAlchemy.

from sqlalchemy import Column, Integer, String, ForeignKey, Table, create_engine, JSON, TIMESTAMP
from sqlalchemy.orm import relationship, declarative_base, sessionmaker
from sqlalchemy.sql import func

Base = declarative_base()

# Association tables for many-to-many relationships
user_roles = Table(
    'user_roles', Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id')),
    Column('role_id', Integer, ForeignKey('roles.id'))
)

role_permissions = Table(
    'role_permissions', Base.metadata,
    Column('role_id', Integer, ForeignKey('roles.id')),
    Column('permission_id', Integer, ForeignKey('permissions.id'))
)

user_permissions = Table(
    'user_permissions', Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id')),
    Column('permission_id', Integer, ForeignKey('permissions.id'))
)

user_groups = Table(
    'user_groups', Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id')),
    Column('group_id', Integer, ForeignKey('groups.id'))
)

# Models
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, index=True, nullable=False)
    password_hash = Column(String(255), nullable=False)
    email = Column(String(100), unique=True, index=True, nullable=False)
    roles = relationship('Role', secondary=user_roles, back_populates='users')
    permissions = relationship('Permission', secondary=user_permissions, back_populates='users')
    groups = relationship('Group', secondary=user_groups, back_populates='users')

class Role(Base):
    __tablename__ = 'roles'
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), unique=True, index=True, nullable=False)
    users = relationship('User', secondary=user_roles, back_populates='roles')
    permissions = relationship('Permission', secondary=role_permissions, back_populates='roles')

class Permission(Base):
    __tablename__ = 'permissions'
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), unique=True, index=True, nullable=False)
    details = Column(JSON, nullable=False)
    roles = relationship('Role', secondary=role_permissions, back_populates='permissions')
    users = relationship('User', secondary=user_permissions, back_populates='permissions')

class Group(Base):
    __tablename__ = 'groups'
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), unique=True, index=True, nullable=False)
    users = relationship('User', secondary=user_groups, back_populates='groups')

class AuditLog(Base):
    __tablename__ = 'audit_logs'
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    change_type = Column(String(50))
    change_details = Column(JSON)
    changed_at = Column(TIMESTAMP, server_default=func.now())
    user = relationship('User')

# Database setup
DATABASE_URL = "mariadb+mariadbconnector://user:password@localhost/dbname"

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base.metadata.create_all(bind=engine)
Enter fullscreen mode Exit fullscreen mode

FastAPI Endpoints

Set up the API endpoints using FastAPI.

from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session
from pydantic import BaseModel
import bcrypt

app = FastAPI()

# Dependency to get DB session
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Pydantic models for request bodies
class UserCreate(BaseModel):
    username: str
    email: str
    password: str

class RoleCreate(BaseModel):
    name: str

class PermissionCreate(BaseModel):
    name: str
    details: dict

class GroupCreate(BaseModel):
    name: str

# Utility functions
def get_password_hash(password: str) -> str:
    return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')

# Endpoints
@app.post("/users/", response_model=UserCreate)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    hashed_password = get_password_hash(user.password)
    db_user = User(username=user.username, email=user.email, password_hash=hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

@app.post("/roles/", response_model=RoleCreate)
def create_role(role: RoleCreate, db: Session = Depends(get_db)):
    db_role = Role(name=role.name)
    db.add(db_role)
    db.commit()
    db.refresh(db_role)
    return db_role

@app.post("/permissions/", response_model=PermissionCreate)
def create_permission(permission: PermissionCreate, db: Session = Depends(get_db)):
    db_permission = Permission(name=permission.name, details=permission.details)
    db.add(db_permission)
    db.commit()
    db.refresh(db_permission)
    return db_permission

@app.post("/groups/", response_model=GroupCreate)
def create_group(group: GroupCreate, db: Session = Depends(get_db)):
    db_group = Group(name=group.name)
    db.add(db_group)
    db.commit()
    db.refresh(db_group)
    return db_group

@app.post("/users/{user_id}/roles/{role_id}")
def assign_role_to_user(user_id: int, role_id: int, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.id == user_id).first()
    role = db.query(Role).filter(Role.id == role_id).first()
    if not user or not role:
        raise HTTPException(status_code=404, detail="User or Role not found")
    user.roles.append(role)
    db.commit()
    return {"message": "Role assigned to user"}

@app.post("/users/{user_id}/permissions/{permission_id}")
def assign_permission_to_user(user_id: int, permission_id: int, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.id == user_id).first()
    permission = db.query(Permission).filter(Permission.id == permission_id).first()
    if not user or not permission:
        raise HTTPException(status_code=404, detail="User or Permission not found")
    user.permissions.append(permission)
    db.commit()
    return {"message": "Permission assigned to user"}

@app.post("/roles/{role_id}/permissions/{permission_id}")
def assign_permission_to_role(role_id: int, permission_id: int, db: Session = Depends(get_db)):
    role = db.query(Role).filter(Role.id == role_id).first()
    permission = db.query(Permission).filter(Permission.id == permission_id).first()
    if not role or not permission:
        raise HTTPException(status_code=404, detail="Role or Permission not found")
    role.permissions.append(permission)
    db.commit()
    return {"message": "Permission assigned to role"}

@app.post("/users/{user_id}/groups/{group_id}")
def assign_user_to_group(user_id: int, group_id: int, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.id == user_id).first()
    group = db.query(Group).filter(Group.id == group_id).first()
    if not user or not group:
        raise HTTPException(status_code=404, detail="User or Group not found")
    user.groups.append(group)
    db.commit()
    return {"message": "User assigned to group"}

@app.post("/audit/", response_model=dict)
def create_audit_log(user_id: int, change_type: str, change_details: dict, db: Session = Depends(get_db)):
    db_audit = AuditLog(user_id=user_id, change_type=change_type, change_details=change_details)
    db.add(db_audit)
    db.commit()
    db.refresh(db_audit)
    return {"message": "Audit log created"}
Enter fullscreen mode Exit fullscreen mode

Conclusion

This article provided a comprehensive guide to building a user permission management system using FastAPI, SQLAlchemy, and MariaDB. By incorporating recent trends in permission models, I designed a flexible and scalable system. Remember, for a complete production-ready implementation, consider adding further security measures, error handling, validation, and logging.

Top comments (0)