API Reference

Rotated refresh token reuse detection.

RotatedRefreshToken

Bases: Base

Rotated refresh token for token reuse detection.

Attributes:

Name | Type | Description
--- | --- | ---
id | Mapped[int] | Primary key.
token_family_id | Mapped[str] | UUID of the token family.
hashed_token | Mapped[str] | Hashed old refresh token.
rotation_count | Mapped[int] | Which rotation this token belonged to.
rotated_at | Mapped[datetime] | When this token was rotated.
expires_at | Mapped[datetime] | Cleanup marker (rotated_at + 60 seconds).
users_session | relationship("UsersSessions") | Relationship to UsersSessions model.

Source code in backend/app/users/users_sessions/rotated_refresh_tokens/models.py
class RotatedRefreshToken(Base):
    """
    Rotated refresh token for token reuse detection.

    Attributes:
        id: Primary key.
        token_family_id: UUID of the token family.
        hashed_token: Hashed old refresh token.
        rotation_count: Which rotation this token belonged to.
        rotated_at: When this token was rotated.
        expires_at: Cleanup marker (rotated_at + 60 seconds).
        users_session: Relationship to UsersSessions model.
    """

    __tablename__ = "rotated_refresh_tokens"

    id: Mapped[int] = mapped_column(
        primary_key=True,
        autoincrement=True,
    )
    token_family_id: Mapped[str] = mapped_column(
        String(36),
        ForeignKey("users_sessions.token_family_id", ondelete="CASCADE"),
        nullable=False,
        index=True,
        comment="UUID of the token family",
    )
    hashed_token: Mapped[str] = mapped_column(
        String(255),
        nullable=False,
        unique=True,
        comment="Hashed old refresh token",
    )
    rotation_count: Mapped[int] = mapped_column(
        nullable=False,
        comment="Which rotation this token belonged to",
    )
    rotated_at: Mapped[datetime] = mapped_column(
        nullable=False,
        comment="When this token was rotated",
    )
    expires_at: Mapped[datetime] = mapped_column(
        nullable=False,
        comment="Cleanup marker (rotated_at + 60 seconds)",
    )

    # Relationship to UsersSessions model
    # TODO: Change to Mapped["UsersSessions"] when all modules use mapped
    users_session = relationship(
        "UsersSessions",
        back_populates="rotated_refresh_tokens",
    )

RotatedRefreshTokenCreate

Bases: BaseModel

Schema for creating a rotated refresh token record.

Attributes:

Name | Type | Description
--- | --- | ---
token_family_id | StrictStr | UUID of the token family.
hashed_token | StrictStr | Hashed old refresh token.
rotation_count | StrictInt | Sequential rotation number for this token.
rotated_at | datetime | Timestamp when this token was rotated.
expires_at | datetime | Cleanup marker timestamp.

Source code in backend/app/users/users_sessions/rotated_refresh_tokens/schema.py
class RotatedRefreshTokenCreate(BaseModel):
    """
    Schema for creating a rotated refresh token record.

    Attributes:
        token_family_id: UUID of the token family.
        hashed_token: Hashed old refresh token.
        rotation_count: Sequential rotation number for this
            token.
        rotated_at: Timestamp when this token was rotated.
        expires_at: Cleanup marker timestamp.
    """

    token_family_id: StrictStr = Field(
        ...,
        max_length=36,
        description="UUID of the token family",
    )
    hashed_token: StrictStr = Field(
        ...,
        max_length=255,
        description="Hashed old refresh token",
    )
    rotation_count: StrictInt = Field(
        ...,
        ge=0,
        description="Which rotation this token belonged to",
    )
    rotated_at: datetime = Field(
        ...,
        description="When this token was rotated",
    )
    expires_at: datetime = Field(
        ...,
        description="Cleanup marker (rotated_at + 60 seconds)",
    )

    model_config = ConfigDict(
        from_attributes=True,
        extra="forbid",
        validate_assignment=True,
    )

RotatedRefreshTokenRead

Bases: RotatedRefreshTokenCreate

Schema for reading a rotated refresh token record.

Attributes:

Name | Type | Description
--- | --- | ---
id | StrictInt | Unique identifier for the rotated token record.

Source code in backend/app/users/users_sessions/rotated_refresh_tokens/schema.py
class RotatedRefreshTokenRead(RotatedRefreshTokenCreate):
    """
    Schema for reading a rotated refresh token record.

    Attributes:
        id: Unique identifier for the rotated token record.
    """

    id: StrictInt = Field(
        ...,
        ge=1,
        description="Unique identifier for the rotated token record",
    )

    model_config = ConfigDict(
        from_attributes=True,
        extra="forbid",
        validate_assignment=True,
    )

check_token_reuse

check_token_reuse(raw_token, db)

Check if a refresh token has been reused (already rotated).

Uses HMAC-SHA256 with the server secret to hash the token for lookup, ensuring deterministic matching.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
raw_token | str | The raw refresh token to check. | required
db | Session | SQLAlchemy database session. | required

Returns:

Type | Description
--- | ---
tuple[bool, bool] | Tuple of (is_reused, in_grace_period).

Possible return values:

- (False, False): Token is valid, not reused.
- (True, True): Reused but within the 60s grace period.
- (True, False): Reused after the grace period, which likely indicates theft.

Raises:

Type | Description
--- | ---
HTTPException | If lookup fails.

Source code in backend/app/users/users_sessions/rotated_refresh_tokens/utils.py
def check_token_reuse(raw_token: str, db: Session) -> tuple[bool, bool]:
    """
    Check if a refresh token has been reused (already rotated).

    Uses HMAC-SHA256 with the server secret to hash the token
    for lookup, ensuring deterministic matching.

    Args:
        raw_token: The raw refresh token to check.
        db: SQLAlchemy database session.

    Returns:
        Tuple of (is_reused, in_grace_period):
            - (False, False): Token is valid, not reused.
            - (True, True): Reused but within 60s grace period.
            - (True, False): Reused after grace period - THEFT!

    Raises:
        HTTPException: If lookup fails.
    """
    # Use HMAC-SHA256 for deterministic lookup
    hashed_token = hmac_hash_token(raw_token)
    rotated_token = rotated_token_crud.get_rotated_token_by_hash(hashed_token, db)

    if not rotated_token:
        return (False, False)

    # Token was already rotated - check grace period
    now = datetime.now(timezone.utc)

    if now <= rotated_token.expires_at:
        # Within grace period - might be legitimate retry
        core_logger.print_to_log(
            f"Token reuse within grace period for family "
            f"{rotated_token.token_family_id}",
            "warning",
            context={
                "token_family_id": rotated_token.token_family_id,
                "rotation_count": rotated_token.rotation_count,
            },
        )
        return (True, True)

    # Past grace period - likely theft!
    core_logger.print_to_log(
        f"Token reuse detected after grace period for family "
        f"{rotated_token.token_family_id}",
        "error",
        context={
            "token_family_id": rotated_token.token_family_id,
            "rotation_count": rotated_token.rotation_count,
            "rotated_at": rotated_token.rotated_at.isoformat(),
        },
    )
    return (True, False)
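
How a caller reacts to the returned tuple is not shown on this page. As a minimal sketch, assuming the caller accepts unseen tokens, tolerates reuse inside the grace period, and invalidates the family otherwise, the decision could look like the following. `classify_reuse` and `next_action` are hypothetical helper names, and the `expires_at` argument stands in for the database lookup.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def classify_reuse(expires_at: Optional[datetime], now: datetime) -> tuple[bool, bool]:
    """Mirror the (is_reused, in_grace_period) contract of check_token_reuse.

    expires_at is None when no rotated-token record matches the hash.
    """
    if expires_at is None:
        return (False, False)
    if now <= expires_at:
        # Inclusive boundary, matching `now <= rotated_token.expires_at`.
        return (True, True)
    return (True, False)


def next_action(result: tuple[bool, bool]) -> str:
    """Hypothetical caller policy; the real caller is not documented here."""
    is_reused, in_grace_period = result
    if not is_reused:
        return "accept"
    if in_grace_period:
        return "treat_as_retry"
    return "invalidate_family"


# Assumed 60 second grace period, as stated in the attribute descriptions.
rotated_at = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
expires_at = rotated_at + timedelta(seconds=60)

for label, now in [
    ("30s after rotation", rotated_at + timedelta(seconds=30)),
    ("61s after rotation", rotated_at + timedelta(seconds=61)),
]:
    print(label, next_action(classify_reuse(expires_at, now)))
```

Note that a request arriving exactly at `expires_at` still counts as inside the grace period, because the comparison is inclusive.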

cleanup_expired_rotated_tokens

cleanup_expired_rotated_tokens()

Cleanup job to delete expired rotated tokens.

Called by the scheduler to periodically remove tokens that have exceeded the grace period. Should run every 1 minute. Exceptions are caught and logged to avoid breaking the scheduler.

Returns:

Type | Description
--- | ---
None | None.

Source code in backend/app/users/users_sessions/rotated_refresh_tokens/utils.py
def cleanup_expired_rotated_tokens() -> None:
    """
    Cleanup job to delete expired rotated tokens.

    Called by the scheduler to periodically remove tokens that
    have exceeded the grace period. Should run every 1 minute.
    Exceptions are caught and logged to avoid breaking the
    scheduler.

    Returns:
        None.
    """
    with SessionLocal() as db:
        try:
            cutoff_time = datetime.now(timezone.utc)
            deleted_count = rotated_token_crud.delete_expired_tokens(cutoff_time, db)

            if deleted_count > 0:
                core_logger.print_to_log(
                    f"Cleaned up {deleted_count} expired rotated tokens",
                    "info",
                )
        except Exception as err:
            core_logger.print_to_log(
                f"Error in cleanup_expired_rotated_tokens: {err}",
                "error",
                exc=err,
            )

create_rotated_token

create_rotated_token(rotated_token, db)

Store a rotated refresh token in the database.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
rotated_token | RotatedRefreshTokenCreate | The rotated token data to store. | required
db | Session | SQLAlchemy database session. | required

Returns:

Type | Description
--- | ---
RotatedRefreshToken | The created RotatedRefreshToken object.

Raises:

Type | Description
--- | ---
HTTPException | If database error occurs.

Source code in backend/app/users/users_sessions/rotated_refresh_tokens/crud.py
@core_decorators.handle_db_errors
def create_rotated_token(
    rotated_token: rotated_token_schema.RotatedRefreshTokenCreate,
    db: Session,
) -> rotated_token_models.RotatedRefreshToken:
    """
    Store a rotated refresh token in the database.

    Args:
        rotated_token: The rotated token data to store.
        db: SQLAlchemy database session.

    Returns:
        The created RotatedRefreshToken object.

    Raises:
        HTTPException: If database error occurs.
    """
    db_rotated_token = rotated_token_models.RotatedRefreshToken(
        token_family_id=rotated_token.token_family_id,
        hashed_token=rotated_token.hashed_token,
        rotation_count=rotated_token.rotation_count,
        rotated_at=rotated_token.rotated_at,
        expires_at=rotated_token.expires_at,
    )

    db.add(db_rotated_token)
    db.commit()
    db.refresh(db_rotated_token)

    return db_rotated_token

delete_by_family

delete_by_family(token_family_id, db)

Delete all rotated tokens for a specific token family.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
token_family_id | str | The family ID to delete tokens for. | required
db | Session | SQLAlchemy database session. | required

Returns:

Type | Description
--- | ---
int | Number of tokens deleted.

Raises:

Type | Description
--- | ---
HTTPException | If database error occurs.

Source code in backend/app/users/users_sessions/rotated_refresh_tokens/crud.py
@core_decorators.handle_db_errors
def delete_by_family(token_family_id: str, db: Session) -> int:
    """
    Delete all rotated tokens for a specific token family.

    Args:
        token_family_id: The family ID to delete tokens for.
        db: SQLAlchemy database session.

    Returns:
        Number of tokens deleted.

    Raises:
        HTTPException: If database error occurs.
    """
    stmt = delete(rotated_token_models.RotatedRefreshToken).where(
        rotated_token_models.RotatedRefreshToken.token_family_id == token_family_id
    )
    result = db.execute(stmt)
    db.commit()
    return result.rowcount

delete_expired_tokens

delete_expired_tokens(cutoff_time, db)

Delete rotated tokens older than the cutoff time.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
cutoff_time | datetime | Tokens with expires_at before this time are deleted. | required
db | Session | SQLAlchemy database session. | required

Returns:

Type | Description
--- | ---
int | Number of tokens deleted.

Raises:

Type | Description
--- | ---
HTTPException | If database error occurs.

Source code in backend/app/users/users_sessions/rotated_refresh_tokens/crud.py
@core_decorators.handle_db_errors
def delete_expired_tokens(cutoff_time: datetime, db: Session) -> int:
    """
    Delete rotated tokens older than the cutoff time.

    Args:
        cutoff_time: Tokens with expires_at before this are deleted.
        db: SQLAlchemy database session.

    Returns:
        Number of tokens deleted.

    Raises:
        HTTPException: If database error occurs.
    """
    stmt = delete(rotated_token_models.RotatedRefreshToken).where(
        rotated_token_models.RotatedRefreshToken.expires_at < cutoff_time
    )
    result = db.execute(stmt)
    db.commit()
    return result.rowcount
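
The strict `<` comparison means a row whose `expires_at` equals the cutoff survives that run and is removed on a later one. The stand-alone sketch below shows the same delete-and-count pattern with the standard-library `sqlite3` module rather than SQLAlchemy. That substitution, the simplified table layout, and the choice to store timestamps as epoch seconds are assumptions made only to keep the example self-contained.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
# Simplified stand-in for the rotated_refresh_tokens table (assumed layout).
conn.execute(
    "CREATE TABLE rotated_refresh_tokens ("
    "id INTEGER PRIMARY KEY, "
    "hashed_token TEXT NOT NULL UNIQUE, "
    "expires_at REAL NOT NULL)"
)

cutoff = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
rows = [
    ("hash-a", cutoff - timedelta(seconds=1)),   # already expired
    ("hash-b", cutoff),                          # exactly at the cutoff
    ("hash-c", cutoff + timedelta(seconds=30)),  # still inside the grace period
]
conn.executemany(
    "INSERT INTO rotated_refresh_tokens (hashed_token, expires_at) VALUES (?, ?)",
    [(hashed, expires.timestamp()) for hashed, expires in rows],
)

# Same predicate as delete_expired_tokens: expires_at < cutoff_time.
cursor = conn.execute(
    "DELETE FROM rotated_refresh_tokens WHERE expires_at < ?",
    (cutoff.timestamp(),),
)
conn.commit()

deleted_count = cursor.rowcount
remaining = [
    row[0]
    for row in conn.execute(
        "SELECT hashed_token FROM rotated_refresh_tokens ORDER BY hashed_token"
    )
]
print(deleted_count, remaining)
```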

get_rotated_token_by_hash

get_rotated_token_by_hash(hashed_token, db)

Retrieve a rotated token by its hashed value.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
hashed_token | str | The hashed refresh token to search for. | required
db | Session | SQLAlchemy database session. | required

Returns:

Type | Description
--- | ---
RotatedRefreshToken \| None | The RotatedRefreshToken if found, None otherwise.

Raises:

Type | Description
--- | ---
HTTPException | If database error occurs.

Source code in backend/app/users/users_sessions/rotated_refresh_tokens/crud.py
@core_decorators.handle_db_errors
def get_rotated_token_by_hash(
    hashed_token: str,
    db: Session,
) -> rotated_token_models.RotatedRefreshToken | None:
    """
    Retrieve a rotated token by its hashed value.

    Args:
        hashed_token: The hashed refresh token to search for.
        db: SQLAlchemy database session.

    Returns:
        The RotatedRefreshToken if found, None otherwise.

    Raises:
        HTTPException: If database error occurs.
    """
    stmt = select(rotated_token_models.RotatedRefreshToken).where(
        rotated_token_models.RotatedRefreshToken.hashed_token == hashed_token
    )
    return db.execute(stmt).scalar_one_or_none()

hmac_hash_token

hmac_hash_token(token)

Compute HMAC-SHA256 hash of a token for secure lookup.

Uses the server's JWT_SECRET_KEY as the HMAC key, providing defense-in-depth: even if the database is compromised, an attacker cannot verify stolen tokens without the key.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
token | str | The raw refresh token to hash. | required

Returns:

Type | Description
--- | ---
str | Hex-encoded HMAC-SHA256 hash of the token.

Raises:

Type | Description
--- | ---
ValueError | If JWT_SECRET_KEY is not configured.

Source code in backend/app/users/users_sessions/rotated_refresh_tokens/utils.py
def hmac_hash_token(token: str) -> str:
    """
    Compute HMAC-SHA256 hash of a token for secure lookup.

    Uses the server's JWT_SECRET_KEY as the HMAC key, providing
    defense-in-depth: even if the database is compromised,
    an attacker cannot verify stolen tokens without the key.

    Args:
        token: The raw refresh token to hash.

    Returns:
        Hex-encoded HMAC-SHA256 hash of the token.

    Raises:
        ValueError: If JWT_SECRET_KEY is not configured.
    """
    secret_key = auth_constants.JWT_SECRET_KEY
    if not secret_key:
        raise ValueError("JWT_SECRET_KEY is not configured")

    return hmac.new(
        secret_key.encode(),
        token.encode(),
        hashlib.sha256,
    ).hexdigest()
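
To illustrate why a keyed hash suits lookups, here is a minimal stand-alone sketch of the same HMAC-SHA256 call. `hmac_hash_token_sketch` is a hypothetical name, and the key is passed as a parameter instead of being read from `auth_constants.JWT_SECRET_KEY` so the example runs on its own. The token and key strings are placeholders.

```python
import hashlib
import hmac


def hmac_hash_token_sketch(token: str, secret_key: str) -> str:
    """Return the hex-encoded HMAC-SHA256 of token under secret_key."""
    if not secret_key:
        raise ValueError("secret key is not configured")
    return hmac.new(
        secret_key.encode(),
        token.encode(),
        hashlib.sha256,
    ).hexdigest()


# Same token and key always give the same digest, so it can serve as a lookup key.
first = hmac_hash_token_sketch("example-refresh-token", "example-secret")
second = hmac_hash_token_sketch("example-refresh-token", "example-secret")

# A different key gives an unrelated digest for the same token.
other_key = hmac_hash_token_sketch("example-refresh-token", "different-secret")

print(len(first), first == second, first == other_key)
```

The digest is 64 hexadecimal characters, which fits within the `String(255)` column used for `hashed_token`.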

invalidate_token_family

invalidate_token_family(token_family_id, db)

Invalidate all sessions in a token family due to reuse detection.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
token_family_id | str | The family ID to invalidate. | required
db | Session | SQLAlchemy database session. | required

Returns:

Type | Description
--- | ---
int | Number of sessions invalidated.

Raises:

Type | Description
--- | ---
HTTPException | If invalidation fails.

Source code in backend/app/users/users_sessions/rotated_refresh_tokens/utils.py
def invalidate_token_family(token_family_id: str, db: Session) -> int:
    """
    Invalidate all sessions in a token family due to reuse detection.

    Args:
        token_family_id: The family ID to invalidate.
        db: SQLAlchemy database session.

    Returns:
        Number of sessions invalidated.

    Raises:
        HTTPException: If invalidation fails.
    """
    # Delete all sessions in the family
    num_sessions_deleted = users_session_crud.delete_sessions_by_family(
        token_family_id, db
    )

    # Delete all rotated tokens for this family
    num_tokens_deleted = rotated_token_crud.delete_by_family(token_family_id, db)

    core_logger.print_to_log(
        f"Invalidated token family {token_family_id} due to reuse: "
        f"{num_sessions_deleted} sessions, {num_tokens_deleted} tokens",
        "error",
        context={
            "token_family_id": token_family_id,
            "sessions_deleted": num_sessions_deleted,
            "tokens_deleted": num_tokens_deleted,
        },
    )

    return num_sessions_deleted
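
invalidate_token_family deletes sessions first and rotated tokens second. Because the RotatedRefreshToken model declares its foreign key with `ondelete="CASCADE"`, a database that enforces the cascade may already have removed the rotated tokens by the time `delete_by_family` runs, in which case the logged `tokens_deleted` value would be 0. The sketch below illustrates that ordering effect with the standard-library `sqlite3` module. The simplified tables, the UNIQUE constraint on `users_sessions.token_family_id`, and the helper name `invalidate_family_sketch` are assumptions for illustration, not the project's schema or code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# SQLite only enforces foreign keys when this pragma is on.
conn.execute("PRAGMA foreign_keys = ON")
conn.execute(
    "CREATE TABLE users_sessions ("
    "id INTEGER PRIMARY KEY, "
    "token_family_id TEXT NOT NULL UNIQUE)"
)
conn.execute(
    "CREATE TABLE rotated_refresh_tokens ("
    "id INTEGER PRIMARY KEY, "
    "token_family_id TEXT NOT NULL "
    "REFERENCES users_sessions(token_family_id) ON DELETE CASCADE, "
    "hashed_token TEXT NOT NULL UNIQUE)"
)
conn.executemany(
    "INSERT INTO users_sessions (token_family_id) VALUES (?)",
    [("fam-a",), ("fam-b",)],
)
conn.executemany(
    "INSERT INTO rotated_refresh_tokens (token_family_id, hashed_token) VALUES (?, ?)",
    [("fam-a", "h1"), ("fam-a", "h2"), ("fam-b", "h3")],
)
conn.commit()


def invalidate_family_sketch(db: sqlite3.Connection, family_id: str) -> tuple[int, int]:
    """Delete sessions, then rotated tokens, returning both direct row counts."""
    sessions_deleted = db.execute(
        "DELETE FROM users_sessions WHERE token_family_id = ?", (family_id,)
    ).rowcount
    # The cascade has already removed the family's rotated tokens at this point.
    tokens_deleted = db.execute(
        "DELETE FROM rotated_refresh_tokens WHERE token_family_id = ?", (family_id,)
    ).rowcount
    db.commit()
    return sessions_deleted, tokens_deleted


sessions_deleted, tokens_deleted = invalidate_family_sketch(conn, "fam-a")
remaining_tokens = conn.execute(
    "SELECT COUNT(*) FROM rotated_refresh_tokens"
).fetchone()[0]
print(sessions_deleted, tokens_deleted, remaining_tokens)
```

Whether the same happens in a real deployment depends on the database engine and on how the session delete is issued, neither of which this page shows.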

store_rotated_token

store_rotated_token(raw_token, token_family_id, rotation_count, db)

Store an old refresh token after rotation for reuse detection.

Uses HMAC-SHA256 with the server secret to hash the token, enabling deterministic lookups while maintaining security.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
raw_token | str | The raw refresh token being rotated out. | required
token_family_id | str | UUID of the token family. | required
rotation_count | int | Current rotation count for this token. | required
db | Session | SQLAlchemy database session. | required

Raises:

Type | Description
--- | ---
HTTPException | If storage fails.

Source code in backend/app/users/users_sessions/rotated_refresh_tokens/utils.py
def store_rotated_token(
    raw_token: str,
    token_family_id: str,
    rotation_count: int,
    db: Session,
) -> None:
    """
    Store an old refresh token after rotation for reuse detection.

    Uses HMAC-SHA256 with the server secret to hash the token,
    enabling deterministic lookups while maintaining security.

    Args:
        raw_token: The raw refresh token being rotated out.
        token_family_id: UUID of the token family.
        rotation_count: Current rotation count for this token.
        db: SQLAlchemy database session.

    Raises:
        HTTPException: If storage fails.
    """
    now = datetime.now(timezone.utc)
    expires_at = now + timedelta(seconds=TOKEN_REUSE_GRACE_PERIOD_SECONDS)

    # Use HMAC-SHA256 for deterministic, secure hashing
    hashed_token = hmac_hash_token(raw_token)

    rotated_token = rotated_token_schema.RotatedRefreshTokenCreate(
        token_family_id=token_family_id,
        hashed_token=hashed_token,
        rotation_count=rotation_count,
        rotated_at=now,
        expires_at=expires_at,
    )

    rotated_token_crud.create_rotated_token(rotated_token, db)
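
The record that store_rotated_token builds can be sketched without the database or Pydantic. In the example below, `RotatedRecord` and `build_rotated_record` are hypothetical names, the key and clock are passed in explicitly, and `GRACE_PERIOD_SECONDS = 60` is assumed from the "rotated_at + 60 seconds" description, since the value of `TOKEN_REUSE_GRACE_PERIOD_SECONDS` is not shown on this page.

```python
import hashlib
import hmac
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

# Assumed value; the real constant is TOKEN_REUSE_GRACE_PERIOD_SECONDS.
GRACE_PERIOD_SECONDS = 60


@dataclass(frozen=True)
class RotatedRecord:
    """Plain stand-in for the fields of RotatedRefreshTokenCreate."""

    token_family_id: str
    hashed_token: str
    rotation_count: int
    rotated_at: datetime
    expires_at: datetime


def build_rotated_record(
    raw_token: str,
    token_family_id: str,
    rotation_count: int,
    secret_key: str,
    now: datetime,
) -> RotatedRecord:
    """Hash the outgoing token and stamp it with its grace-period expiry."""
    hashed_token = hmac.new(
        secret_key.encode(), raw_token.encode(), hashlib.sha256
    ).hexdigest()
    return RotatedRecord(
        token_family_id=token_family_id,
        hashed_token=hashed_token,
        rotation_count=rotation_count,
        rotated_at=now,
        expires_at=now + timedelta(seconds=GRACE_PERIOD_SECONDS),
    )


record = build_rotated_record(
    raw_token="example-refresh-token",
    token_family_id="00000000-0000-0000-0000-000000000000",
    rotation_count=3,
    secret_key="example-secret",
    now=datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
)
print(record.rotated_at.isoformat(), record.expires_at.isoformat())
```

Only the keyed hash is kept in the record, so the raw token never needs to be written to storage.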