API Reference

Password reset tokens module for user password recovery.

This module provides CRUD operations, email messaging, and utilities for password reset token management.

Exports
  • CRUD: create_password_reset_token, get_password_reset_token_by_hash, claim_password_reset_token, mark_password_reset_token_used, mark_user_password_reset_tokens_used, delete_expired_password_reset_tokens
  • Schemas: PasswordResetToken, PasswordResetRequest, PasswordResetConfirm, PasswordResetResponse
  • Models: PasswordResetToken (ORM model)

PasswordResetConfirm

Bases: BaseModel

Request schema for confirming a password reset.

Attributes:

Name          Type       Description
token         StrictStr  The reset token received by the user.
new_password  StrictStr  The new password to set.

Source code in backend/app/password_reset_tokens/schema.py
class PasswordResetConfirm(BaseModel):
    """
    Request schema for confirming a password reset.

    Attributes:
        token: The reset token received by the user.
        new_password: The new password to set.
    """

    token: StrictStr = Field(
        ...,
        description="The reset token received by the user",
        min_length=1,
        max_length=256,
    )
    new_password: StrictStr = Field(
        ...,
        description="The new password to set",
        min_length=8,
        max_length=256,
    )

    model_config = ConfigDict(extra="forbid")

PasswordResetRequest

Bases: BaseModel

Request schema for initiating a password reset.

Attributes:

Name   Type      Description
email  EmailStr  Email address for the reset.

Source code in backend/app/password_reset_tokens/schema.py
class PasswordResetRequest(BaseModel):
    """
    Request schema for initiating a password reset.

    Attributes:
        email: Email address for the reset.
    """

    email: EmailStr = Field(
        ...,
        description="Email address for the reset",
    )

    model_config = ConfigDict(extra="forbid")

PasswordResetResponse

Bases: BaseModel

Response schema for password reset operations.

Attributes:

Name     Type       Description
message  StrictStr  Informational message for the client.

Source code in backend/app/password_reset_tokens/schema.py
class PasswordResetResponse(BaseModel):
    """
    Response schema for password reset operations.

    Attributes:
        message: Informational message for the client.
    """

    message: StrictStr = Field(
        ...,
        description="Informational message for the client",
    )

    model_config = ConfigDict(extra="forbid")

PasswordResetToken

Bases: BaseModel

Schema representing a password reset token record.

Attributes:

Name        Type        Description
id          StrictStr   Unique identifier for the token.
user_id     StrictInt   ID of the user who requested the reset.
token_hash  StrictStr   Hashed token value.
created_at  datetime    Timestamp when the token was created.
expires_at  datetime    Timestamp when the token expires.
used        StrictBool  Whether the token has already been used.

Source code in backend/app/password_reset_tokens/schema.py
class PasswordResetToken(BaseModel):
    """
    Schema representing a password reset token record.

    Attributes:
        id: Unique identifier for the token.
        user_id: ID of the user who requested the reset.
        token_hash: Hashed token value.
        created_at: Timestamp when the token was created.
        expires_at: Timestamp when the token expires.
        used: Whether the token has already been used.
    """

    id: StrictStr = Field(
        ...,
        description="Unique identifier for the token",
        max_length=64,
    )
    user_id: StrictInt = Field(
        ...,
        description="ID of the user who requested the reset",
    )
    token_hash: StrictStr = Field(
        ...,
        description="Hashed token value",
        max_length=128,
    )
    created_at: datetime = Field(
        ...,
        description="Timestamp when the token was created",
    )
    expires_at: datetime = Field(
        ...,
        description="Timestamp when the token expires",
    )
    used: StrictBool = Field(
        ...,
        description="Whether the token has already been used",
    )

    model_config = ConfigDict(
        from_attributes=True,
        extra="forbid",
        validate_assignment=True,
    )

PasswordResetTokenModel

Bases: Base

Password reset token database model.

Attributes:

Name        Type              Description
id          Mapped[str]       Unique token identifier (string, 64 chars).
user_id     Mapped[int]       ID of the user who owns the token.
token_hash  Mapped[str]       Hashed password reset token.
created_at  Mapped[datetime]  Token creation date.
expires_at  Mapped[datetime]  Token expiration date.
used        Mapped[bool]      Whether the token has been used.
users       Mapped[Users]     Relationship to the Users model.

Source code in backend/app/password_reset_tokens/models.py
class PasswordResetToken(Base):
    """
    Password reset token database model.

    Attributes:
        id: Unique token identifier (string, 64 chars).
        user_id: ID of the user who owns the token.
        token_hash: Hashed password reset token.
        created_at: Token creation date.
        expires_at: Token expiration date.
        used: Whether the token has been used.
        users: Relationship to the Users model.
    """

    __tablename__ = "password_reset_tokens"

    id: Mapped[str] = mapped_column(String(64), primary_key=True)
    user_id: Mapped[int] = mapped_column(
        ForeignKey("users.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
        comment=("User ID that the password reset token belongs to"),
    )
    token_hash: Mapped[str] = mapped_column(
        String(128),
        nullable=False,
        comment="Hashed password reset token",
    )
    created_at: Mapped[datetime_type] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        comment="Token creation date (datetime)",
    )
    expires_at: Mapped[datetime_type] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        comment="Token expiration date (datetime)",
    )
    used: Mapped[bool] = mapped_column(
        default=False,
        nullable=False,
        comment="Token usage status",
    )

    # Define a relationship to the Users model
    users: Mapped["Users"] = relationship(back_populates="password_reset_tokens")

claim_password_reset_token

claim_password_reset_token(token_hash, db)

Atomically claim a valid password reset token.

Parameters:

Name        Type     Description                                 Default
token_hash  str      SHA-256 hash of the plaintext reset token.  required
db          Session  SQLAlchemy database session.                required

Returns:

Type        Description
int | None  User ID owning the claimed token, or None if the token is missing, expired, or already used.

Raises:

Type           Description
HTTPException  500 error if database operation fails.

Source code in backend/app/password_reset_tokens/crud.py
@core_decorators.handle_db_errors
def claim_password_reset_token(token_hash: str, db: Session) -> int | None:
    """Atomically claim a valid password reset token.

    Args:
        token_hash: SHA-256 hash of the plaintext reset token.
        db: SQLAlchemy database session.

    Returns:
        User ID owning the claimed token, or None if the token is missing,
        expired, or already used.

    Raises:
        HTTPException: 500 error if database operation fails.
    """
    stmt = (
        sa_update(password_reset_tokens_models.PasswordResetToken)
        .where(
            password_reset_tokens_models.PasswordResetToken.token_hash
            == token_hash,
            password_reset_tokens_models.PasswordResetToken.used.is_(False),
            password_reset_tokens_models.PasswordResetToken.expires_at
            > datetime.now(timezone.utc),
        )
        .values(used=True)
        .returning(password_reset_tokens_models.PasswordResetToken.user_id)
    )
    return db.execute(stmt).scalar_one_or_none()
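
The claim above relies on a single conditional UPDATE, so only one caller can flip a given token from unused to used. The following is a minimal sketch of that pattern using the standard-library sqlite3 module rather than SQLAlchemy. The function name claim_token, the simplified table layout, and the epoch-seconds expires_at column are assumptions made for illustration. It also reads rowcount and issues a follow-up SELECT instead of using RETURNING, so that it runs on older SQLite builds.

```python
import sqlite3
import time
from typing import Optional


def claim_token(conn: sqlite3.Connection, token_hash: str, now: float) -> Optional[int]:
    """Mark a valid token as used and return its user_id, or None if nothing matched."""
    cur = conn.execute(
        "UPDATE password_reset_tokens SET used = 1 "
        "WHERE token_hash = ? AND used = 0 AND expires_at > ?",
        (token_hash, now),
    )
    if cur.rowcount == 0:
        # Missing, expired, or already claimed by an earlier call.
        return None
    row = conn.execute(
        "SELECT user_id FROM password_reset_tokens WHERE token_hash = ?",
        (token_hash,),
    ).fetchone()
    return row[0]


# Hypothetical in-memory table holding one valid and one expired token.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE password_reset_tokens ("
    "id TEXT PRIMARY KEY, user_id INTEGER NOT NULL, token_hash TEXT NOT NULL, "
    "expires_at REAL NOT NULL, used INTEGER NOT NULL DEFAULT 0)"
)
now = time.time()
conn.execute(
    "INSERT INTO password_reset_tokens VALUES ('t1', 42, 'abc', ?, 0)", (now + 3600,)
)
conn.execute(
    "INSERT INTO password_reset_tokens VALUES ('t2', 42, 'old', ?, 0)", (now - 1,)
)

first = claim_token(conn, "abc", now)    # 42: the token was valid and is now used
second = claim_token(conn, "abc", now)   # None: the same token cannot be claimed twice
expired = claim_token(conn, "old", now)  # None: the token had already expired
```

Unlike a read with get_password_reset_token_by_hash followed by a separate write, the validity check and the state change happen in one statement, which is what prevents two concurrent requests from both succeeding.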

create_password_reset_token

create_password_reset_token(token, db)

Create and persist a new password reset token.

Parameters:

Name   Type                Description                                Default
token  PasswordResetToken  Schema object with token data to persist.  required
db     Session             SQLAlchemy database session.               required

Returns:

Type                Description
PasswordResetToken  The persisted PasswordResetToken ORM instance.

Raises:

Type           Description
HTTPException  500 error if database operation fails.

Source code in backend/app/password_reset_tokens/crud.py
@core_decorators.handle_db_errors
def create_password_reset_token(
    token: password_reset_tokens_schema.PasswordResetToken,
    db: Session,
) -> password_reset_tokens_models.PasswordResetToken:
    """Create and persist a new password reset token.

    Args:
        token: Schema object with token data to persist.
        db: SQLAlchemy database session.

    Returns:
        The persisted PasswordResetToken ORM instance.

    Raises:
        HTTPException: 500 error if database operation fails.
    """
    # Create a new password reset token
    db_token = password_reset_tokens_models.PasswordResetToken(
        id=token.id,
        user_id=token.user_id,
        token_hash=token.token_hash,
        created_at=token.created_at,
        expires_at=token.expires_at,
        used=token.used,
    )

    # Add the token to the database
    db.add(db_token)
    db.commit()
    db.refresh(db_token)

    return db_token
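
The function above persists whatever values the schema object carries, so the caller is responsible for producing them. As a rough sketch of how those fields might be built with the standard library: the helper name new_reset_token_fields, the one-hour lifetime, the UUID-based id, and the use of secrets.token_urlsafe are all assumptions, not taken from this module. The SHA-256 hashing matches what claim_password_reset_token documents for token_hash.

```python
import hashlib
import secrets
import uuid
from datetime import datetime, timedelta, timezone


def new_reset_token_fields(user_id: int, lifetime: timedelta = timedelta(hours=1)):
    """Return (plaintext_token, fields); only the hash is meant to be stored."""
    # Hypothetical choice: 32 random bytes, URL-safe so it can go in an email link.
    plaintext = secrets.token_urlsafe(32)
    created_at = datetime.now(timezone.utc)
    fields = {
        "id": uuid.uuid4().hex,  # 32 hex characters, within the 64-character limit
        "user_id": user_id,
        # SHA-256 hex digest is 64 characters, within the 128-character limit
        "token_hash": hashlib.sha256(plaintext.encode("utf-8")).hexdigest(),
        "created_at": created_at,
        "expires_at": created_at + lifetime,
        "used": False,
    }
    return plaintext, fields


plaintext, fields = new_reset_token_fields(user_id=42)
```

In a flow like this, the plaintext value would be sent to the user while the fields dictionary feeds the PasswordResetToken schema, so the database never holds the token itself.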

delete_expired_password_reset_tokens

delete_expired_password_reset_tokens(db)

Delete all expired password reset tokens.

Parameters:

Name  Type     Description                   Default
db    Session  SQLAlchemy database session.  required

Returns:

Type  Description
int   Number of deleted rows.

Raises:

Type           Description
HTTPException  500 error if database operation fails.

Source code in backend/app/password_reset_tokens/crud.py
@core_decorators.handle_db_errors
def delete_expired_password_reset_tokens(db: Session) -> int:
    """Delete all expired password reset tokens.

    Args:
        db: SQLAlchemy database session.

    Returns:
        Number of deleted rows.

    Raises:
        HTTPException: 500 error if database operation fails.
    """
    stmt = sa_delete(password_reset_tokens_models.PasswordResetToken).where(
        password_reset_tokens_models.PasswordResetToken.expires_at
        < datetime.now(timezone.utc)
    )
    result = db.execute(stmt)
    db.commit()
    return result.rowcount
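
The cleanup above is a single bulk DELETE whose row count is returned to the caller. A minimal sketch of the same idea with the standard-library sqlite3 module follows. The function name delete_expired, the reduced table layout, and the epoch-seconds storage of expires_at are assumptions for illustration only.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def delete_expired(conn: sqlite3.Connection, now: datetime) -> int:
    """Delete rows whose expiry is strictly before `now`; return how many were removed."""
    cur = conn.execute(
        "DELETE FROM password_reset_tokens WHERE expires_at < ?",
        (now.timestamp(),),
    )
    conn.commit()
    return cur.rowcount


# Hypothetical table with two expired tokens and one that is still valid.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE password_reset_tokens (id TEXT PRIMARY KEY, expires_at REAL NOT NULL)"
)
now = datetime.now(timezone.utc)
conn.executemany(
    "INSERT INTO password_reset_tokens VALUES (?, ?)",
    [
        ("a", (now - timedelta(days=1)).timestamp()),
        ("b", (now - timedelta(seconds=1)).timestamp()),
        ("c", (now + timedelta(hours=1)).timestamp()),
    ],
)
conn.commit()

deleted = delete_expired(conn, now)  # 2
remaining = conn.execute("SELECT COUNT(*) FROM password_reset_tokens").fetchone()[0]  # 1
```

Note that the comparison is strict, as in the source: a token whose expires_at equals the current instant is not deleted by this pass, although the claim and lookup functions already treat it as invalid.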

get_password_reset_token_by_hash

get_password_reset_token_by_hash(token_hash, db)

Retrieve an unused, unexpired token matching the given hash.

Parameters:

Name        Type     Description                         Default
token_hash  str      The hashed token value to look up.  required
db          Session  SQLAlchemy database session.        required

Returns:

Type                       Description
PasswordResetToken | None  The matching PasswordResetToken if found and valid, None otherwise.

Raises:

Type           Description
HTTPException  500 error if database query fails.

Source code in backend/app/password_reset_tokens/crud.py
@core_decorators.handle_db_errors
def get_password_reset_token_by_hash(
    token_hash: str, db: Session
) -> password_reset_tokens_models.PasswordResetToken | None:
    """Retrieve an unused, unexpired token matching the given hash.

    Args:
        token_hash: The hashed token value to look up.
        db: SQLAlchemy database session.

    Returns:
        The matching PasswordResetToken if found and valid, None otherwise.

    Raises:
        HTTPException: 500 error if database query fails.
    """
    stmt = select(password_reset_tokens_models.PasswordResetToken).where(
        password_reset_tokens_models.PasswordResetToken.token_hash
        == token_hash,
        password_reset_tokens_models.PasswordResetToken.used.is_(False),
        password_reset_tokens_models.PasswordResetToken.expires_at
        > datetime.now(timezone.utc),
    )
    return db.execute(stmt).scalar_one_or_none()

mark_password_reset_token_used

mark_password_reset_token_used(token_id, db)

Mark a password reset token as used.

Parameters:

Name      Type     Description                                  Default
token_id  str      The unique identifier of the token to mark.  required
db        Session  SQLAlchemy database session.                 required

Returns:

Type                       Description
PasswordResetToken | None  Updated PasswordResetToken instance if found, None otherwise.

Raises:

Type           Description
HTTPException  500 error if database operation fails.

Source code in backend/app/password_reset_tokens/crud.py
@core_decorators.handle_db_errors
def mark_password_reset_token_used(
    token_id: str, db: Session
) -> password_reset_tokens_models.PasswordResetToken | None:
    """Mark a password reset token as used.

    Args:
        token_id: The unique identifier of the token to mark.
        db: SQLAlchemy database session.

    Returns:
        Updated PasswordResetToken instance if found, None otherwise.

    Raises:
        HTTPException: 500 error if database operation fails.
    """
    stmt = select(password_reset_tokens_models.PasswordResetToken).where(
        password_reset_tokens_models.PasswordResetToken.id == token_id,
    )
    db_token = db.execute(stmt).scalar_one_or_none()

    if db_token:
        # Mark the token as used
        db_token.used = True
        db.commit()
        db.refresh(db_token)

    return db_token

mark_user_password_reset_tokens_used

mark_user_password_reset_tokens_used(user_id, db)

Mark all unused password reset tokens for a user as used.

Parameters:

Name     Type     Description                                        Default
user_id  int      User ID whose reset tokens should be invalidated.  required
db       Session  SQLAlchemy database session.                       required

Returns:

Type  Description
int   Number of rows marked as used.

Raises:

Type           Description
HTTPException  500 error if database operation fails.

Source code in backend/app/password_reset_tokens/crud.py
@core_decorators.handle_db_errors
def mark_user_password_reset_tokens_used(user_id: int, db: Session) -> int:
    """Mark all unused password reset tokens for a user as used.

    Args:
        user_id: User ID whose reset tokens should be invalidated.
        db: SQLAlchemy database session.

    Returns:
        Number of rows marked as used.

    Raises:
        HTTPException: 500 error if database operation fails.
    """
    stmt = (
        sa_update(password_reset_tokens_models.PasswordResetToken)
        .where(
            password_reset_tokens_models.PasswordResetToken.user_id == user_id,
            password_reset_tokens_models.PasswordResetToken.used.is_(False),
        )
        .values(used=True)
    )
    result = db.execute(stmt)
    return result.rowcount or 0
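
As shown, this function executes the UPDATE but does not call db.commit(), so the change presumably becomes permanent only when the caller commits the surrounding transaction. The sketch below illustrates that behavior with the standard-library sqlite3 module. The function name mark_user_tokens_used and the reduced table layout are assumptions, and the rollback is included only to show that nothing is persisted until a commit happens.

```python
import sqlite3


def mark_user_tokens_used(conn: sqlite3.Connection, user_id: int) -> int:
    """Mark every unused token of one user as used; the caller decides when to commit."""
    cur = conn.execute(
        "UPDATE password_reset_tokens SET used = 1 WHERE user_id = ? AND used = 0",
        (user_id,),
    )
    return cur.rowcount or 0


# Hypothetical table: two unused tokens for user 7, one for user 8.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE password_reset_tokens ("
    "id TEXT PRIMARY KEY, user_id INTEGER NOT NULL, used INTEGER NOT NULL DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO password_reset_tokens VALUES (?, ?, 0)",
    [("a", 7), ("b", 7), ("c", 8)],
)
conn.commit()

marked = mark_user_tokens_used(conn, 7)        # 2: both of user 7's tokens were unused
marked_again = mark_user_tokens_used(conn, 7)  # 0: nothing left to mark

# Without a commit the change is still pending, so a rollback restores both tokens.
conn.rollback()
unused_after_rollback = conn.execute(
    "SELECT COUNT(*) FROM password_reset_tokens WHERE user_id = 7 AND used = 0"
).fetchone()[0]  # 2
```

Leaving the commit to the caller lets the invalidation share one transaction with related work, such as the password update itself, so either everything is applied or nothing is.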