API Reference

User session management module.

This module provides session management including CRUD operations, device detection, timeout validation, and token rotation for security.

Exports
  • CRUD: get_user_sessions, get_session_by_id, get_session_by_id_not_expired, get_session_with_oauth_state, create_session, mark_tokens_exchanged, edit_session, delete_session, delete_idle_sessions, delete_sessions_by_family
  • Schemas: UsersSessionsBase, UsersSessionsRead, UsersSessionsInternal
  • Models: UsersSessions (ORM model)
  • Utils: DeviceType, DeviceInfo, validate_session_timeout, create_session_object, edit_session_object, create_session, edit_session, get_user_agent, get_ip_address, parse_user_agent, cleanup_idle_sessions

DeviceInfo dataclass

Device information container.

Attributes:

Name                      Type        Description
device_type               DeviceType  Device type (mobile, tablet, PC).
operating_system          str         OS name.
operating_system_version  str         OS version string.
browser                   str         Browser name.
browser_version           str         Browser version string.

Source code in backend/app/users/users_sessions/utils.py
@dataclass
class DeviceInfo:
    """
    Device information container.

    Attributes:
        device_type: Device type (mobile, tablet, PC).
        operating_system: OS name.
        operating_system_version: OS version string.
        browser: Browser name.
        browser_version: Browser version string.
    """

    device_type: DeviceType
    operating_system: str
    operating_system_version: str
    browser: str
    browser_version: str

DeviceType

Bases: Enum

Device type enumeration.

Attributes:

Name    Description
MOBILE  Mobile device.
TABLET  Tablet device.
PC      Desktop/laptop device.

Source code in backend/app/users/users_sessions/utils.py
class DeviceType(Enum):
    """
    Device type enumeration.

    Attributes:
        MOBILE: Mobile device.
        TABLET: Tablet device.
        PC: Desktop/laptop device.
    """

    MOBILE = "Mobile"
    TABLET = "Tablet"
    PC = "PC"
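
As a minimal sketch of how DeviceType and DeviceInfo fit together, the block below re-declares trimmed copies of both classes so it runs on its own, then builds one instance by hand. The field values are invented for illustration; in the module itself, instances come from parse_user_agent.

```python
from dataclasses import dataclass
from enum import Enum


class DeviceType(Enum):
    MOBILE = "Mobile"
    TABLET = "Tablet"
    PC = "PC"


@dataclass
class DeviceInfo:
    device_type: DeviceType
    operating_system: str
    operating_system_version: str
    browser: str
    browser_version: str


# Illustrative values only, not taken from a real user agent.
info = DeviceInfo(DeviceType.PC, "Linux", "6.8", "Firefox", "128.0")

# The session schemas store device_type as a plain string, so the enum's
# .value is what ends up on the session record.
stored_device_type = info.device_type.value

# A stored string can be turned back into the enum member by value lookup.
round_tripped = DeviceType(stored_device_type)
print(stored_device_type, round_tripped)
```

Storing the enum's value rather than the member keeps the database column a simple string while still allowing a lookup back to the enum.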

UsersSessionsBase

Bases: BaseModel

Base user session schema with safe fields.

Attributes:

Name                      Type       Description
id                        StrictStr  Unique session identifier.
ip_address                StrictStr  Client IP address.
device_type               StrictStr  Device type.
operating_system          StrictStr  Operating system name.
operating_system_version  StrictStr  OS version string.
browser                   StrictStr  Browser name.
browser_version           StrictStr  Browser version string.
created_at                datetime   Session creation timestamp.
last_activity_at          datetime   Last activity timestamp.
expires_at                datetime   Session expiration timestamp.

Source code in backend/app/users/users_sessions/schema.py
class UsersSessionsBase(BaseModel):
    """
    Base user session schema with safe fields.

    Attributes:
        id: Unique session identifier.
        ip_address: Client IP address.
        device_type: Device type.
        operating_system: Operating system name.
        operating_system_version: OS version string.
        browser: Browser name.
        browser_version: Browser version string.
        created_at: Session creation timestamp.
        last_activity_at: Last activity timestamp.
        expires_at: Session expiration timestamp.
    """

    id: StrictStr = Field(..., description="Unique session identifier")
    ip_address: StrictStr = Field(..., max_length=45, description="Client IP address")
    device_type: StrictStr = Field(..., max_length=45, description="Device type")
    operating_system: StrictStr = Field(
        ..., max_length=45, description="Operating system"
    )
    operating_system_version: StrictStr = Field(
        ..., max_length=45, description="OS version"
    )
    browser: StrictStr = Field(..., max_length=45, description="Browser name")
    browser_version: StrictStr = Field(
        ..., max_length=45, description="Browser version"
    )
    created_at: datetime = Field(..., description="Session creation timestamp")
    last_activity_at: datetime = Field(..., description="Last activity timestamp")
    expires_at: datetime = Field(..., description="Session expiration timestamp")

    model_config = ConfigDict(from_attributes=True)

UsersSessionsInternal

Bases: UsersSessionsBase

Internal user session schema with all fields.

Used for CRUD operations. Includes sensitive fields like refresh_token and csrf_token_hash that should never be exposed in API responses.

Attributes:

Name              Type              Description
user_id           StrictInt         User ID that owns this session.
refresh_token     StrictStr | None  Hashed session refresh token.
oauth_state_id    StrictStr | None  Link to OAuth state for PKCE.
tokens_exchanged  StrictBool        Prevents duplicate mobile token exchange.
token_family_id   StrictStr         UUID for token family reuse detection.
rotation_count    StrictInt         Number of times refresh token rotated.
last_rotation_at  datetime | None   Timestamp of last token rotation.
csrf_token_hash   StrictStr | None  Hashed CSRF token for refresh validation.

Source code in backend/app/users/users_sessions/schema.py
class UsersSessionsInternal(UsersSessionsBase):
    """
    Internal user session schema with all fields.

    Used for CRUD operations. Includes sensitive fields like
    refresh_token and csrf_token_hash that should never be
    exposed in API responses.

    Attributes:
        user_id: User ID that owns this session.
        refresh_token: Hashed session refresh token.
        oauth_state_id: Link to OAuth state for PKCE.
        tokens_exchanged: Prevents duplicate mobile token
            exchange.
        token_family_id: UUID for token family reuse
            detection.
        rotation_count: Number of times refresh token
            rotated.
        last_rotation_at: Timestamp of last token rotation.
        csrf_token_hash: Hashed CSRF token for refresh
            validation.
    """

    user_id: StrictInt = Field(..., ge=1, description="User ID that owns this session")
    refresh_token: StrictStr | None = Field(
        None, description="Hashed session refresh token"
    )
    oauth_state_id: StrictStr | None = Field(
        None,
        max_length=64,
        description="Link to OAuth state for PKCE validation",
    )
    tokens_exchanged: StrictBool = Field(
        default=False,
        description="Prevents duplicate token exchange for mobile",
    )
    token_family_id: StrictStr = Field(
        ...,
        description="UUID identifying token family for reuse detection",
    )
    rotation_count: StrictInt = Field(
        default=0,
        ge=0,
        description="Number of times refresh token has been rotated",
    )
    last_rotation_at: datetime | None = Field(
        None, description="Timestamp of last token rotation"
    )
    csrf_token_hash: StrictStr | None = Field(
        None,
        max_length=255,
        description="Hashed CSRF token for refresh validation",
    )

    model_config = ConfigDict(
        from_attributes=True,
        extra="forbid",
        validate_assignment=True,
    )

UsersSessionsModel

Bases: Base

User authentication session for tracking active logins.

Attributes:

Name                      Type                     Description
id                        Mapped[str]              Unique session identifier (UUID).
user_id                   Mapped[int]              Foreign key to users table.
refresh_token             Mapped[str]              Hashed refresh token for the session.
ip_address                Mapped[str]              Client IP address.
device_type               Mapped[str]              Type of device (Mobile, Tablet, PC).
operating_system          Mapped[str]              Operating system name.
operating_system_version  Mapped[str]              Operating system version.
browser                   Mapped[str]              Browser name.
browser_version           Mapped[str]              Browser version.
created_at                Mapped[datetime]         Session creation timestamp.
last_activity_at          Mapped[datetime]         Last activity timestamp for idle timeout.
expires_at                Mapped[datetime]         Session expiration timestamp.
oauth_state_id            Mapped[str | None]       Link to OAuth state for PKCE validation.
tokens_exchanged          Mapped[bool]             Prevents duplicate token exchange for mobile.
token_family_id           Mapped[str]              UUID identifying token family for reuse detection.
rotation_count            Mapped[int]              Number of times refresh token has been rotated.
last_rotation_at          Mapped[datetime | None]  Timestamp of last token rotation.
csrf_token_hash           Mapped[str | None]       Hashed CSRF token for refresh validation.
users                                              Relationship to Users model.
oauth_state                                        Relationship to OAuthState model.
rotated_refresh_tokens                             Relationship to RotatedRefreshToken model.

Source code in backend/app/users/users_sessions/models.py
class UsersSessions(Base):
    """
    User authentication session for tracking active logins.

    Attributes:
        id: Unique session identifier (UUID).
        user_id: Foreign key to users table.
        refresh_token: Hashed refresh token for the session.
        ip_address: Client IP address.
        device_type: Type of device (Mobile, Tablet, PC).
        operating_system: Operating system name.
        operating_system_version: Operating system version.
        browser: Browser name.
        browser_version: Browser version.
        created_at: Session creation timestamp.
        last_activity_at: Last activity timestamp for idle
            timeout.
        expires_at: Session expiration timestamp.
        oauth_state_id: Link to OAuth state for PKCE validation.
        tokens_exchanged: Prevents duplicate token exchange for
            mobile.
        token_family_id: UUID identifying token family for reuse
            detection.
        rotation_count: Number of times refresh token has been
            rotated.
        last_rotation_at: Timestamp of last token rotation.
        csrf_token_hash: Hashed CSRF token for refresh
            validation.
        users: Relationship to Users model.
        oauth_state: Relationship to OAuthState model.
        rotated_refresh_tokens: Relationship to
            RotatedRefreshToken model.
    """

    __tablename__ = "users_sessions"

    id: Mapped[str] = mapped_column(
        String(36),
        primary_key=True,
        nullable=False,
    )
    user_id: Mapped[int] = mapped_column(
        ForeignKey("users.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
        comment="User ID that the session belongs to",
    )
    refresh_token: Mapped[str] = mapped_column(
        String(255),
        nullable=True,
        comment="Session hashed refresh token",
    )
    ip_address: Mapped[str] = mapped_column(
        String(45),
        nullable=False,
        comment="Client IP address",
    )
    device_type: Mapped[str] = mapped_column(
        String(45),
        nullable=False,
        comment="Device type",
    )
    operating_system: Mapped[str] = mapped_column(
        String(45),
        nullable=False,
        comment="Operating system",
    )
    operating_system_version: Mapped[str] = mapped_column(
        String(45),
        nullable=False,
        comment="Operating system version",
    )
    browser: Mapped[str] = mapped_column(
        String(45),
        nullable=False,
        comment="Browser",
    )
    browser_version: Mapped[str] = mapped_column(
        String(45),
        nullable=False,
        comment="Browser version",
    )
    created_at: Mapped[datetime] = mapped_column(
        nullable=False,
        comment="Session creation date (datetime)",
    )
    last_activity_at: Mapped[datetime] = mapped_column(
        nullable=False,
        comment="Last activity timestamp for idle timeout",
    )
    expires_at: Mapped[datetime] = mapped_column(
        nullable=False,
        comment="Session expiration date (datetime)",
    )
    oauth_state_id: Mapped[str | None] = mapped_column(
        String(64),
        ForeignKey("oauth_states.id", ondelete="SET NULL"),
        nullable=True,
        index=True,
        comment="Link to OAuth state for PKCE validation",
    )
    tokens_exchanged: Mapped[bool] = mapped_column(
        default=False,
        nullable=False,
        comment="Prevents duplicate token exchange for mobile",
    )
    token_family_id: Mapped[str] = mapped_column(
        String(36),
        nullable=False,
        unique=True,
        index=True,
        comment="UUID identifying token family for reuse detection",
    )
    rotation_count: Mapped[int] = mapped_column(
        default=0,
        nullable=False,
        comment="Number of times refresh token has been rotated",
    )
    last_rotation_at: Mapped[datetime | None] = mapped_column(
        nullable=True,
        comment="Timestamp of last token rotation",
    )
    csrf_token_hash: Mapped[str | None] = mapped_column(
        String(255),
        nullable=True,
        comment="Hashed CSRF token for refresh validation",
    )

    # Relationship to Users model
    # TODO: Change to Mapped["Users"] when all modules use mapped
    users = relationship("Users", back_populates="users_sessions")

    # Relationship to OAuthState model
    # TODO: Change to Mapped["OAuthState"] when all modules use mapped
    oauth_state = relationship("OAuthState", back_populates="users_sessions")

    # Relationship to RotatedRefreshToken model
    # TODO: Change to Mapped["RotatedRefreshToken"] when all modules use mapped
    rotated_refresh_tokens = relationship(
        "RotatedRefreshToken",
        back_populates="users_session",
        cascade="all, delete-orphan",
    )

UsersSessionsRead

Bases: UsersSessionsBase

User session read schema for API responses.

Extends base with user_id. Excludes sensitive fields like refresh_token and csrf_token_hash.

Source code in backend/app/users/users_sessions/schema.py
class UsersSessionsRead(UsersSessionsBase):
    """
    User session read schema for API responses.

    Extends base with user_id. Excludes sensitive fields
    like refresh_token and csrf_token_hash.
    """

    user_id: StrictInt = Field(..., ge=1, description="User ID that owns this session")

cleanup_idle_sessions

cleanup_idle_sessions()

Clean up idle sessions exceeding timeout threshold.

Removes sessions that have been inactive longer than the configured idle timeout period. Runs only when SESSION_IDLE_TIMEOUT_ENABLED is set. Logs the number of sessions removed when at least one is deleted.

Errors raised inside the cleanup block, including database errors from delete_idle_sessions, are caught and logged rather than re-raised.

Source code in backend/app/users/users_sessions/utils.py
def cleanup_idle_sessions() -> None:
    """
    Clean up idle sessions exceeding timeout threshold.

    Removes sessions inactive longer than the configured idle
    timeout period. Only runs if SESSION_IDLE_TIMEOUT_ENABLED
    is set. Logs count of cleaned sessions.

    Errors raised during cleanup are caught and logged, not
    re-raised.
    """
    if not auth_constants.SESSION_IDLE_TIMEOUT_ENABLED:
        return

    with SessionLocal() as db:
        try:
            cutoff_time = datetime.now(timezone.utc) - timedelta(
                hours=auth_constants.SESSION_IDLE_TIMEOUT_HOURS
            )

            # Delete sessions with last_activity_at older than cutoff
            deleted_count = users_session_crud.delete_idle_sessions(cutoff_time, db)

            if deleted_count > 0:
                core_logger.print_to_log(
                    f"Cleaned up {deleted_count} idle sessions", "info"
                )
        except Exception as err:
            core_logger.print_to_log(
                f"Error in cleanup_idle_sessions: {err}",
                "error",
                exc=err,
            )
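
The cutoff arithmetic above can be sketched in isolation. This is an illustrative stand-alone version: IDLE_TIMEOUT_HOURS is a hypothetical stand-in for auth_constants.SESSION_IDLE_TIMEOUT_HOURS, and idle_cutoff and is_idle are helper names invented for this example.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical stand-in for auth_constants.SESSION_IDLE_TIMEOUT_HOURS
IDLE_TIMEOUT_HOURS = 1


def idle_cutoff(now: datetime, timeout_hours: int) -> datetime:
    """Return the instant before which a session counts as idle."""
    return now - timedelta(hours=timeout_hours)


def is_idle(last_activity_at: datetime, now: datetime, timeout_hours: int) -> bool:
    """Mirror the strict `last_activity_at < cutoff_time` comparison."""
    return last_activity_at < idle_cutoff(now, timeout_hours)


now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
recent = now - timedelta(minutes=30)
stale = now - timedelta(hours=2)
boundary = idle_cutoff(now, IDLE_TIMEOUT_HOURS)

print(is_idle(recent, now, IDLE_TIMEOUT_HOURS))
print(is_idle(stale, now, IDLE_TIMEOUT_HOURS))
print(is_idle(boundary, now, IDLE_TIMEOUT_HOURS))
```

Because the comparison is strict, a session whose last activity falls exactly on the cutoff is kept until the next cleanup run.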

create_session

create_session(session, db)

Create a new user session in the database.

Parameters:

Name     Type                   Description                      Default
session  UsersSessionsInternal  The session data to be created.  required
db       Session                SQLAlchemy database session.     required

Returns:

Type           Description
UsersSessions  The newly created session object.

Raises:

Type           Description
HTTPException  If database error occurs.

Source code in backend/app/users/users_sessions/crud.py
@core_decorators.handle_db_errors
def create_session(
    session: users_session_schema.UsersSessionsInternal,
    db: Session,
) -> users_session_models.UsersSessions:
    """
    Create a new user session in the database.

    Args:
        session: The session data to be created.
        db: SQLAlchemy database session.

    Returns:
        The newly created session object.

    Raises:
        HTTPException: If database error occurs.
    """
    db_session = users_session_models.UsersSessions(**session.model_dump())
    db.add(db_session)
    db.commit()
    db.refresh(db_session)
    return db_session

create_session_object

create_session_object(session_id, user, request, hashed_refresh_token, refresh_token_exp, oauth_state_id=None, csrf_token_hash=None)

Create session object with device and request metadata.

Parameters:

Name                  Type        Description                                  Default
session_id            str         Unique identifier for the session.           required
user                  UsersRead   The user associated with the session.        required
request               Request     HTTP request containing client information.  required
hashed_refresh_token  str | None  Hashed refresh token.                        required
refresh_token_exp     datetime    Refresh token expiration datetime.           required
oauth_state_id        str | None  Optional OAuth state ID for PKCE.            None
csrf_token_hash       str | None  Hashed CSRF token for validation.            None

Returns:

Type                   Description
UsersSessionsInternal  Session object with user, device, and request details.

Source code in backend/app/users/users_sessions/utils.py
def create_session_object(
    session_id: str,
    user: users_schema.UsersRead,
    request: Request,
    hashed_refresh_token: str | None,
    refresh_token_exp: datetime,
    oauth_state_id: str | None = None,
    csrf_token_hash: str | None = None,
) -> users_session_schema.UsersSessionsInternal:
    """
    Create session object with device and request metadata.

    Args:
        session_id: Unique identifier for the session.
        user: The user associated with the session.
        request: HTTP request containing client information.
        hashed_refresh_token: Hashed refresh token.
        refresh_token_exp: Refresh token expiration datetime.
        oauth_state_id: Optional OAuth state ID for PKCE.
        csrf_token_hash: Hashed CSRF token for validation.

    Returns:
        Session object with user, device, and request details.
    """
    user_agent = get_user_agent(request)
    device_info = parse_user_agent(user_agent)

    now = datetime.now(timezone.utc)

    return users_session_schema.UsersSessionsInternal(
        id=session_id,
        user_id=user.id,
        refresh_token=hashed_refresh_token,
        ip_address=get_ip_address(request),
        device_type=device_info.device_type.value,
        operating_system=device_info.operating_system,
        operating_system_version=device_info.operating_system_version,
        browser=device_info.browser,
        browser_version=device_info.browser_version,
        created_at=now,
        last_activity_at=now,
        expires_at=refresh_token_exp,
        oauth_state_id=oauth_state_id,
        tokens_exchanged=False,
        token_family_id=session_id,
        rotation_count=0,
        last_rotation_at=None,
        csrf_token_hash=csrf_token_hash,
    )

delete_idle_sessions

delete_idle_sessions(cutoff_time, db)

Delete sessions exceeding the idle timeout threshold.

Removes sessions where last_activity_at is older than the cutoff time. Used by the cleanup scheduler to remove inactive sessions.

Parameters:

Name         Type      Description                                                       Default
cutoff_time  datetime  Sessions with last_activity_at before this time will be deleted.  required
db           Session   SQLAlchemy database session.                                      required

Returns:

Type  Description
int   Number of sessions deleted.

Raises:

Type           Description
HTTPException  If database error occurs.

Source code in backend/app/users/users_sessions/crud.py
@core_decorators.handle_db_errors
def delete_idle_sessions(
    cutoff_time: datetime,
    db: Session,
) -> int:
    """
    Delete sessions exceeding the idle timeout threshold.

    Removes sessions where last_activity_at is older than the
    cutoff time. Used by cleanup scheduler to remove inactive
    sessions.

    Args:
        cutoff_time: Sessions with last_activity_at before this
            time will be deleted.
        db: SQLAlchemy database session.

    Returns:
        Number of sessions deleted.

    Raises:
        HTTPException: If database error occurs.
    """
    stmt = delete(users_session_models.UsersSessions).where(
        users_session_models.UsersSessions.last_activity_at < cutoff_time
    )
    result = db.execute(stmt)
    db.commit()
    return result.rowcount
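
To illustrate the delete-and-count pattern without the project's ORM, here is a small sketch using the standard-library sqlite3 module. The two-column table is a hypothetical reduction of users_sessions, and timestamps are stored as ISO-8601 text purely for this example; the real function issues a SQLAlchemy delete() against the mapped model.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users_sessions (id TEXT PRIMARY KEY, last_activity_at TEXT NOT NULL)"
)

now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
conn.executemany(
    "INSERT INTO users_sessions VALUES (?, ?)",
    [
        ("active", (now - timedelta(minutes=5)).isoformat()),
        ("idle", (now - timedelta(hours=3)).isoformat()),
    ],
)

cutoff_time = now - timedelta(hours=1)

# ISO-8601 strings that share the same UTC offset sort chronologically,
# so a plain text comparison is enough for this toy table.
cursor = conn.execute(
    "DELETE FROM users_sessions WHERE last_activity_at < ?",
    (cutoff_time.isoformat(),),
)
conn.commit()

deleted_count = cursor.rowcount  # rows removed by the DELETE
remaining = [row[0] for row in conn.execute("SELECT id FROM users_sessions")]
print(deleted_count, remaining)
```

A single bulk DELETE avoids loading each idle session into memory, and the row count gives the caller something concrete to log.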

delete_session

delete_session(session_id, user_id, db)

Delete a user session and its associated resources.

Deletes the session's rotated tokens, the session itself, and any linked OAuth state. Used when a user explicitly logs out of a session.

Parameters:

Name        Type     Description                                      Default
session_id  str      The unique identifier of the session to delete.  required
user_id     int      The ID of the user associated with the session.  required
db          Session  SQLAlchemy database session.                     required

Raises:

Type           Description
HTTPException  If session not found (404) or database error occurs (500).

Source code in backend/app/users/users_sessions/crud.py
@core_decorators.handle_db_errors
def delete_session(
    session_id: str,
    user_id: int,
    db: Session,
) -> None:
    """
    Delete a user session and its associated resources.

    Deletes rotated tokens, the session, and any linked OAuth
    state. Used when user explicitly logs out a session.

    Args:
        session_id: The unique identifier of the session to
            delete.
        user_id: The ID of the user associated with the session.
        db: SQLAlchemy database session.

    Raises:
        HTTPException: If session not found (404) or database
            error occurs (500).
    """
    # Get the session to retrieve token_family_id before deletion
    stmt = select(users_session_models.UsersSessions).where(
        users_session_models.UsersSessions.id == session_id,
        users_session_models.UsersSessions.user_id == user_id,
    )
    session = db.execute(stmt).scalar_one_or_none()

    # Check if the session was found
    if session is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Session {session_id} not found for user {user_id}",
        )

    # Store oauth_state_id before deleting session (if exists)
    oauth_state_id_to_delete = session.oauth_state_id

    # Delete rotated tokens for this session's family
    users_session_rotated_tokens_crud.delete_by_family(session.token_family_id, db)

    # Delete the session
    stmt = delete(users_session_models.UsersSessions).where(
        users_session_models.UsersSessions.id == session_id,
        users_session_models.UsersSessions.user_id == user_id,
    )
    db.execute(stmt)

    # Delete OAuth state after session is deleted if exists
    if oauth_state_id_to_delete:
        oauth_state_crud.delete_oauth_state(oauth_state_id_to_delete, db)

    db.commit()

delete_sessions_by_family

delete_sessions_by_family(token_family_id, db)

Delete all sessions belonging to a token family.

Used when token reuse is detected, to invalidate the entire session family as a security measure.

Parameters:

Name             Type     Description                            Default
token_family_id  str      The family ID to delete sessions for.  required
db               Session  SQLAlchemy database session.           required

Returns:

Type  Description
int   Number of sessions deleted.

Raises:

Type           Description
HTTPException  If database error occurs.

Source code in backend/app/users/users_sessions/crud.py
@core_decorators.handle_db_errors
def delete_sessions_by_family(
    token_family_id: str,
    db: Session,
) -> int:
    """
    Delete all sessions belonging to a token family.

    Used when token reuse is detected to invalidate entire
    session family as security measure.

    Args:
        token_family_id: The family ID to delete sessions for.
        db: SQLAlchemy database session.

    Returns:
        Number of sessions deleted.

    Raises:
        HTTPException: If database error occurs.
    """
    stmt = delete(users_session_models.UsersSessions).where(
        users_session_models.UsersSessions.token_family_id == token_family_id
    )
    result = db.execute(stmt)
    db.commit()
    return result.rowcount
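
The reuse-detection flow that calls this function is not shown on this page, so the following is only a sketch of the general idea under stated assumptions: in-memory dictionaries stand in for the database tables, and rotate and present_token are hypothetical helpers, not functions from this module.

```python
from dataclasses import dataclass


@dataclass
class Session:
    id: str
    token_family_id: str
    refresh_token_hash: str


# Hypothetical in-memory stores standing in for the database tables.
sessions: dict[str, Session] = {}
rotated_hashes: dict[str, str] = {}  # old token hash -> token_family_id


def rotate(session_id: str, new_hash: str) -> None:
    """Remember the old hash as rotated, then store the new one."""
    session = sessions[session_id]
    rotated_hashes[session.refresh_token_hash] = session.token_family_id
    session.refresh_token_hash = new_hash


def delete_sessions_by_family(token_family_id: str) -> int:
    """Remove every session in the family and return how many were removed."""
    doomed = [s.id for s in sessions.values() if s.token_family_id == token_family_id]
    for session_id in doomed:
        del sessions[session_id]
    return len(doomed)


def present_token(token_hash: str) -> str:
    """Classify a presented refresh token hash."""
    family = rotated_hashes.get(token_hash)
    if family is not None:
        # An already-rotated token came back: treat it as reuse and
        # invalidate the whole family.
        delete_sessions_by_family(family)
        return "reuse-detected"
    if any(s.refresh_token_hash == token_hash for s in sessions.values()):
        return "ok"
    return "unknown"


sessions["s1"] = Session("s1", "s1", "hash-1")
rotate("s1", "hash-2")
first = present_token("hash-2")   # current token
second = present_token("hash-1")  # token that was already rotated away
print(first, second, len(sessions))
```

Deleting by family rather than by session ID means a stolen, already-rotated token takes the legitimate holder's current token down with it, which forces a fresh login.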

edit_session

edit_session(session, db)

Update an existing user session with new field values.

Parameters:

Name     Type                   Description                          Default
session  UsersSessionsInternal  Session data with fields to update.  required
db       Session                SQLAlchemy database session.         required

Raises:

Type           Description
HTTPException  If session not found (404) or database error occurs (500).

Source code in backend/app/users/users_sessions/crud.py
@core_decorators.handle_db_errors
def edit_session(
    session: users_session_schema.UsersSessionsInternal,
    db: Session,
) -> None:
    """
    Update an existing user session with new field values.

    Args:
        session: Session data with fields to update.
        db: SQLAlchemy database session.

    Raises:
        HTTPException: If session not found (404) or database
            error occurs (500).
    """
    # Get the session from the database
    db_session = get_session_by_id(session.id, db)

    # Check if the session exists
    if not db_session:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Session {session.id} not found",
        )

    # Update fields dynamically
    session_data = session.model_dump(exclude_unset=True)
    for key, value in session_data.items():
        setattr(db_session, key, value)

    db.commit()
    db.refresh(db_session)
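
The dynamic update loop can be illustrated with plain objects. In this sketch, a SimpleNamespace stands in for the ORM row and a hand-written dict stands in for the result of session.model_dump(exclude_unset=True); both are assumptions made so the example runs without Pydantic or a database.

```python
from types import SimpleNamespace

# Stand-in for the ORM row returned by get_session_by_id.
db_session = SimpleNamespace(id="abc", ip_address="10.0.0.1", rotation_count=0)

# Stand-in for session.model_dump(exclude_unset=True): only the fields
# that were explicitly set on the schema object appear here.
session_data = {"ip_address": "10.0.0.2", "rotation_count": 1}

# Copy each provided field onto the row; untouched attributes keep
# their existing values.
for key, value in session_data.items():
    setattr(db_session, key, value)

print(db_session)
```

Note that objects built by edit_session_object pass every field explicitly, so for them exclude_unset=True does not filter anything out and the loop rewrites every column.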

edit_session_object

edit_session_object(request, hashed_refresh_token, refresh_token_exp, session, csrf_token_hash=None)

Create updated session object with new token and metadata.

Parameters:

Name                  Type                   Description                             Default
request               Request                The incoming HTTP request object.       required
hashed_refresh_token  str                    Hashed refresh token.                   required
refresh_token_exp     datetime               Refresh token expiration datetime.      required
session               UsersSessionsInternal  The existing session object to update.  required
csrf_token_hash       str | None             Hashed CSRF token for validation.       None

Returns:

Type                   Description
UsersSessionsInternal  Updated session object with device and token details.

Source code in backend/app/users/users_sessions/utils.py
def edit_session_object(
    request: Request,
    hashed_refresh_token: str,
    refresh_token_exp: datetime,
    session: users_session_schema.UsersSessionsInternal,
    csrf_token_hash: str | None = None,
) -> users_session_schema.UsersSessionsInternal:
    """
    Create updated session object with new token and metadata.

    Args:
        request: The incoming HTTP request object.
        hashed_refresh_token: Hashed refresh token.
        refresh_token_exp: Refresh token expiration datetime.
        session: The existing session object to update.
        csrf_token_hash: Hashed CSRF token for validation.

    Returns:
        Updated session object with device and token details.
    """
    user_agent = get_user_agent(request)
    device_info = parse_user_agent(user_agent)

    now = datetime.now(timezone.utc)
    new_rotation_count = session.rotation_count + 1

    return users_session_schema.UsersSessionsInternal(
        id=session.id,
        user_id=session.user_id,
        refresh_token=hashed_refresh_token,
        ip_address=get_ip_address(request),
        device_type=device_info.device_type.value,
        operating_system=device_info.operating_system,
        operating_system_version=device_info.operating_system_version,
        browser=device_info.browser,
        browser_version=device_info.browser_version,
        created_at=session.created_at,
        last_activity_at=now,
        expires_at=refresh_token_exp,
        oauth_state_id=session.oauth_state_id,
        tokens_exchanged=session.tokens_exchanged,
        token_family_id=session.token_family_id,
        rotation_count=new_rotation_count,
        last_rotation_at=now,
        csrf_token_hash=csrf_token_hash,
    )
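
The rotation bookkeeping in edit_session_object can be sketched with a frozen dataclass. SessionState is a hypothetical, trimmed-down stand-in for UsersSessionsInternal, and rotated is a helper invented for this example; only the fields relevant to rotation are modelled.

```python
from dataclasses import dataclass, replace
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class SessionState:
    """Hypothetical, trimmed-down stand-in for UsersSessionsInternal."""

    id: str
    token_family_id: str
    refresh_token: str
    created_at: datetime
    last_activity_at: datetime
    rotation_count: int = 0
    last_rotation_at: Optional[datetime] = None


def rotated(session: SessionState, new_hash: str, now: datetime) -> SessionState:
    """Return a copy with the token swapped and rotation metadata bumped."""
    return replace(
        session,
        refresh_token=new_hash,
        last_activity_at=now,
        rotation_count=session.rotation_count + 1,
        last_rotation_at=now,
    )


start = datetime(2025, 1, 1, tzinfo=timezone.utc)
later = datetime(2025, 1, 2, tzinfo=timezone.utc)

original = SessionState("s1", "s1", "hash-1", start, start)
updated = rotated(original, "hash-2", later)
print(updated)
```

As in the function above, the identifiers and created_at carry over unchanged, while the token, activity timestamp, and rotation counters move forward together.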

get_ip_address

get_ip_address(request)

Extract client IP address from request.

Checks proxy headers (X-Forwarded-For, X-Real-IP) first, then falls back to direct client host.

Parameters:

Name     Type     Description                                   Default
request  Request  Request object with headers and client info.  required

Returns:

Type  Description
str   Client IP address or "unknown" if indeterminate.

Source code in backend/app/users/users_sessions/utils.py
def get_ip_address(request: Request) -> str:
    """
    Extract client IP address from request.

    Checks proxy headers (X-Forwarded-For, X-Real-IP) first,
    then falls back to direct client host.

    Args:
        request: Request object with headers and client info.

    Returns:
        Client IP address or "unknown" if indeterminate.
    """
    # Check for proxy headers first
    forwarded_for = request.headers.get("X-Forwarded-For")
    if forwarded_for:
        # Take the first IP in the chain
        return forwarded_for.split(",")[0].strip()

    real_ip = request.headers.get("X-Real-IP")
    if real_ip:
        return real_ip

    return request.client.host if request.client else "unknown"
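
The header precedence can be exercised without a web framework. In this sketch, fake_request builds a hypothetical stand-in for the Request object from SimpleNamespace, and client_ip repeats the same three-step lookup. One difference to keep in mind: a plain dict is case-sensitive, whereas real request headers are looked up case-insensitively. The addresses come from the ranges reserved for documentation.

```python
from types import SimpleNamespace


def client_ip(request) -> str:
    """Same precedence as above: X-Forwarded-For, X-Real-IP, then the peer."""
    forwarded_for = request.headers.get("X-Forwarded-For")
    if forwarded_for:
        # The left-most entry is the one reported for the original client.
        return forwarded_for.split(",")[0].strip()
    real_ip = request.headers.get("X-Real-IP")
    if real_ip:
        return real_ip
    return request.client.host if request.client else "unknown"


def fake_request(headers: dict, host=None):
    """Build a minimal request-like object (hypothetical test helper)."""
    client = SimpleNamespace(host=host) if host else None
    return SimpleNamespace(headers=headers, client=client)


behind_proxy = fake_request({"X-Forwarded-For": "203.0.113.7, 198.51.100.2"}, "10.0.0.5")
real_ip_only = fake_request({"X-Real-IP": "203.0.113.9"}, "10.0.0.5")
direct = fake_request({}, "192.0.2.44")
no_client = fake_request({})

results = [client_ip(r) for r in (behind_proxy, real_ip_only, direct, no_client)]
print(results)
```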

get_session_by_id

get_session_by_id(session_id, db)

Retrieve a user session by ID.

Parameters:

Name        Type     Description                            Default
session_id  str      The unique identifier of the session.  required
db          Session  SQLAlchemy database session.           required

Returns:

Type                  Description
UsersSessions | None  The session object if found, None otherwise.

Raises:

Type           Description
HTTPException  If database error occurs.

Source code in backend/app/users/users_sessions/crud.py
@core_decorators.handle_db_errors
def get_session_by_id(
    session_id: str,
    db: Session,
) -> users_session_models.UsersSessions | None:
    """
    Retrieve a user session by ID.

    Args:
        session_id: The unique identifier of the session.
        db: SQLAlchemy database session.

    Returns:
        The session object if found, None otherwise.

    Raises:
        HTTPException: If database error occurs.
    """
    stmt = select(users_session_models.UsersSessions).where(
        users_session_models.UsersSessions.id == session_id
    )
    return db.execute(stmt).scalar_one_or_none()

get_session_by_id_not_expired

get_session_by_id_not_expired(session_id, db)

Retrieve a user session by ID if not expired.

Parameters:

Name Type Description Default
session_id str

The unique identifier of the session.

required
db Session

SQLAlchemy database session.

required

Returns:

Type Description
UsersSessions | None

The session object if found and not expired, None otherwise.

Raises:

Type Description
HTTPException

If database error occurs.

Source code in backend/app/users/users_sessions/crud.py
@core_decorators.handle_db_errors
def get_session_by_id_not_expired(
    session_id: str,
    db: Session,
) -> users_session_models.UsersSessions | None:
    """
    Retrieve a user session by ID if not expired.

    Args:
        session_id: The unique identifier of the session.
        db: SQLAlchemy database session.

    Returns:
        The session object if found and not expired, None otherwise.

    Raises:
        HTTPException: If database error occurs.
    """
    stmt = (
        select(users_session_models.UsersSessions)
        .where(users_session_models.UsersSessions.id == session_id)
        .where(
            users_session_models.UsersSessions.expires_at > datetime.now(timezone.utc)
        )
    )
    return db.execute(stmt).scalar_one_or_none()

get_session_with_oauth_state

get_session_with_oauth_state(session_id, db)

Retrieve a session with its OAuth state for token exchange.

Looks up a non-expired session and, if one is linked, its OAuth state record. Used during mobile token exchange to validate PKCE and ensure the session is valid.

Parameters:

Name Type Description Default
session_id str

The unique identifier of the session.

required
db Session

SQLAlchemy database session.

required

Returns:

Type Description
tuple[UsersSessions, OAuthState | None] | None

Tuple of (session, oauth_state), where oauth_state is None if no OAuth state is linked. Returns None if the session is not found or has expired.

Raises:

Type Description
HTTPException

If database error occurs.
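A caller has three outcomes to tell apart: no usable session, a session without OAuth state, and a session with OAuth state. The sketch below shows one way to branch on them. `FakeSession`, `FakeOAuthState`, and `describe_lookup` are hypothetical stand-ins for illustration, not names from this module.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class FakeOAuthState:
    """Hypothetical stand-in for the OAuthState model."""
    id: str


@dataclass
class FakeSession:
    """Hypothetical stand-in for the UsersSessions model."""
    id: str
    oauth_state_id: Optional[str] = None


LookupResult = Optional[Tuple[FakeSession, Optional[FakeOAuthState]]]


def describe_lookup(result: LookupResult) -> str:
    """Classify the three shapes the return value can take."""
    if result is None:
        # No row matched: the session is missing or already expired.
        return "no-session"
    session, oauth_state = result
    if oauth_state is None:
        # The session exists, but no OAuth state was returned for it.
        return "session-without-oauth-state"
    return "session-with-oauth-state"


missing = describe_lookup(None)
unlinked = describe_lookup((FakeSession(id="s1"), None))
linked = describe_lookup(
    (FakeSession(id="s2", oauth_state_id="o1"), FakeOAuthState(id="o1"))
)
```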

Source code in backend/app/users/users_sessions/crud.py
@core_decorators.handle_db_errors
def get_session_with_oauth_state(
    session_id: str,
    db: Session,
) -> (
    tuple[
        users_session_models.UsersSessions,
        oauth_state_models.OAuthState | None,
    ]
    | None
):
    """
    Retrieve a session with its OAuth state for token exchange.

    Performs a query to retrieve a session with its linked OAuth
    state record (if any). Used during mobile token exchange to
    validate PKCE and ensure the session is valid.

    Args:
        session_id: The unique identifier of the session.
        db: SQLAlchemy database session.

    Returns:
        Tuple of (session, oauth_state) where oauth_state may be
        None if not linked. Returns None if the session is not
        found or has expired.

    Raises:
        HTTPException: If database error occurs.
    """
    # Query session
    stmt = (
        select(users_session_models.UsersSessions)
        .where(users_session_models.UsersSessions.id == session_id)
        .where(
            users_session_models.UsersSessions.expires_at > datetime.now(timezone.utc)
        )
    )
    db_session = db.execute(stmt).scalar_one_or_none()

    if not db_session:
        return None

    # Get OAuth state if linked
    oauth_state = None
    if db_session.oauth_state_id:
        oauth_state = oauth_state_crud.get_oauth_state_by_id(
            db_session.oauth_state_id, db
        )

    return (db_session, oauth_state)

get_user_agent

get_user_agent(request)

Extract User-Agent string from request headers.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request object.

required

Returns:

Type Description
str

User-Agent header value or empty string.

Source code in backend/app/users/users_sessions/utils.py
def get_user_agent(request: Request) -> str:
    """
    Extract User-Agent string from request headers.

    Args:
        request: The incoming HTTP request object.

    Returns:
        User-Agent header value or empty string.
    """
    return request.headers.get("user-agent", "")

get_user_sessions

get_user_sessions(user_id, db)

Retrieve all sessions for a user, ordered by creation date (newest first).

Parameters:

Name Type Description Default
user_id int

The ID of the user whose sessions to retrieve.

required
db Session

SQLAlchemy database session.

required

Returns:

Type Description
list[UsersSessions]

List of session objects, ordered by most recent first.

Raises:

Type Description
HTTPException

If database error occurs.

Source code in backend/app/users/users_sessions/crud.py
@core_decorators.handle_db_errors
def get_user_sessions(
    user_id: int,
    db: Session,
) -> list[users_session_models.UsersSessions]:
    """
    Retrieve all sessions for a user, ordered by creation date.

    Args:
        user_id: The ID of the user whose sessions to retrieve.
        db: SQLAlchemy database session.

    Returns:
        List of session objects, ordered by most recent first.

    Raises:
        HTTPException: If database error occurs.
    """
    stmt = (
        select(users_session_models.UsersSessions)
        .where(users_session_models.UsersSessions.user_id == user_id)
        .order_by(users_session_models.UsersSessions.created_at.desc())
    )
    return list(db.execute(stmt).scalars().all())

mark_tokens_exchanged

mark_tokens_exchanged(session_id, db)

Mark tokens as exchanged and clear OAuth state.

Sets tokens_exchanged flag to prevent duplicate mobile token exchanges. Deletes the associated OAuth state per OAuth 2.1 best practices (state is ephemeral).

Parameters:

Name Type Description Default
session_id str

The unique identifier of the session.

required
db Session

SQLAlchemy database session.

required

Raises:

Type Description
HTTPException

If session not found (404) or database error occurs (500).
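The function records the exchange first and only then attempts the OAuth state cleanup, so a failed cleanup does not undo the exchange. A minimal sketch of that ordering, using a plain dict and a callback in place of the database: `exchange_then_cleanup` and `failing_delete` are hypothetical names, and the standard `logging` module stands in for the project's logger.

```python
import logging
from typing import Callable, Optional

logger = logging.getLogger("sessions-sketch")


def exchange_then_cleanup(
    session: dict,
    delete_state: Callable[[str], None],
) -> None:
    """Hypothetical sketch: record the exchange, then clean up best-effort."""
    # Remember the reference before clearing it, so cleanup can still find it.
    state_id: Optional[str] = session.get("oauth_state_id")

    # Step 1: the part that must succeed.
    session["tokens_exchanged"] = True
    session["oauth_state_id"] = None

    # Step 2: cleanup whose failure is logged, not raised.
    if state_id:
        try:
            delete_state(state_id)
        except Exception as err:
            logger.warning(
                "Failed to delete OAuth state %s...: %s", state_id[:8], err
            )


def failing_delete(state_id: str) -> None:
    """Simulates a cleanup step that always fails."""
    raise RuntimeError("storage unavailable")


session = {"oauth_state_id": "abcdef0123456789", "tokens_exchanged": False}
exchange_then_cleanup(session, failing_delete)
```

Even though the cleanup raises, the session ends up marked as exchanged with its OAuth state reference cleared.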

Source code in backend/app/users/users_sessions/crud.py
@core_decorators.handle_db_errors
def mark_tokens_exchanged(session_id: str, db: Session) -> None:
    """
    Mark tokens as exchanged and clear OAuth state.

    Sets tokens_exchanged flag to prevent duplicate mobile token
    exchanges. Deletes the associated OAuth state per OAuth 2.1
    best practices (state is ephemeral).

    Args:
        session_id: The unique identifier of the session.
        db: SQLAlchemy database session.

    Raises:
        HTTPException: If session not found (404) or database
            error occurs (500).
    """
    # Get the session from the database
    db_session = get_session_by_id(session_id, db)

    # Check if the session exists
    if not db_session:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Session {session_id} not found",
        )

    # Store oauth_state_id before clearing (for cleanup)
    oauth_state_id_to_delete = db_session.oauth_state_id

    # Mark tokens as exchanged and clear OAuth state reference
    db_session.tokens_exchanged = True
    db_session.oauth_state_id = None
    db.commit()

    # Delete the OAuth state now that tokens are exchanged
    if oauth_state_id_to_delete:
        try:
            oauth_state_crud.delete_oauth_state(oauth_state_id_to_delete, db)
            core_logger.print_to_log(
                f"Deleted OAuth state "
                f"{oauth_state_id_to_delete[:8]}... after token "
                f"exchange",
                "debug",
            )
        except Exception as err:
            # Log but don't fail - cleanup job handles orphaned
            # states
            core_logger.print_to_log(
                f"Failed to delete OAuth state "
                f"{oauth_state_id_to_delete[:8]}... after token "
                f"exchange: {err}",
                "warning",
                exc=err,
            )

parse_user_agent

parse_user_agent(user_agent)

Parse user agent string and extract device information.

Parameters:

Name Type Description Default
user_agent str

The user agent string to parse.

required

Returns:

Type Description
DeviceInfo

Device information including type, OS, and browser details. Unknown fields default to "Unknown".
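The device type is chosen by precedence: the mobile flag is checked first, then the tablet flag, and everything else is reported as PC. The sketch below isolates that decision and the "Unknown" fallback. `classify_device` and `or_unknown` are hypothetical helpers, and the enum values are assumed for illustration.

```python
from enum import Enum


class DeviceType(Enum):
    """Local stand-in; member values are assumed, not taken from the module."""
    MOBILE = "mobile"
    TABLET = "tablet"
    PC = "pc"


def classify_device(is_mobile: bool, is_tablet: bool) -> DeviceType:
    """Hypothetical helper mirroring the documented precedence."""
    if is_mobile:
        return DeviceType.MOBILE
    if is_tablet:
        return DeviceType.TABLET
    # Anything that is neither mobile nor tablet is reported as PC.
    return DeviceType.PC


def or_unknown(value: str) -> str:
    """Empty strings fall back to "Unknown", as with the `or "Unknown"` idiom."""
    return value or "Unknown"


phone = classify_device(is_mobile=True, is_tablet=False)
tablet = classify_device(is_mobile=False, is_tablet=True)
desktop = classify_device(is_mobile=False, is_tablet=False)
blank_version = or_unknown("")
```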

Source code in backend/app/users/users_sessions/utils.py
def parse_user_agent(user_agent: str) -> DeviceInfo:
    """
    Parse user agent string and extract device information.

    Args:
        user_agent: The user agent string to parse.

    Returns:
        Device information including type, OS, and browser
        details. Unknown fields default to "Unknown".
    """
    ua = parse(user_agent)
    device_type = (
        DeviceType.MOBILE
        if ua.is_mobile
        else DeviceType.TABLET if ua.is_tablet else DeviceType.PC
    )

    return DeviceInfo(
        device_type=device_type,
        operating_system=ua.os.family or "Unknown",
        operating_system_version=ua.os.version_string or "Unknown",
        browser=ua.browser.family or "Unknown",
        browser_version=ua.browser.version_string or "Unknown",
    )

validate_session_timeout

validate_session_timeout(session)

Validate session hasn't exceeded idle or absolute timeout.

Timeouts are enforced only when SESSION_IDLE_TIMEOUT_ENABLED is true; this single flag gates both checks. The idle timeout is measured from last_activity_at and the absolute timeout from created_at.

Parameters:

Name Type Description Default
session UsersSessions

The session to validate.

required

Raises:

Type Description
HTTPException

401 if session has timed out.
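A worked example may make the two limits easier to compare. The sketch below assumes an idle limit of 1 hour and an absolute limit of 24 hours. Both numbers are illustrative assumptions (the real values come from `auth_constants`), and `timeout_reason` is a hypothetical helper that returns a label instead of raising.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Assumed values for illustration only; the real limits come from auth_constants.
IDLE_TIMEOUT_HOURS = 1
ABSOLUTE_TIMEOUT_HOURS = 24


def timeout_reason(
    created_at: datetime,
    last_activity_at: datetime,
    now: datetime,
) -> Optional[str]:
    """Hypothetical helper: return why a session timed out, or None if valid."""
    # Idle check first, matching the documented order.
    if now > last_activity_at + timedelta(hours=IDLE_TIMEOUT_HOURS):
        return "idle"
    if now > created_at + timedelta(hours=ABSOLUTE_TIMEOUT_HOURS):
        return "absolute"
    return None


now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)

# Created 3 hours ago, active 10 minutes ago: within both limits.
active = timeout_reason(now - timedelta(hours=3), now - timedelta(minutes=10), now)
# Created 3 hours ago, last active 2 hours ago: idle limit exceeded.
idle = timeout_reason(now - timedelta(hours=3), now - timedelta(hours=2), now)
# Created 25 hours ago, active 5 minutes ago: absolute limit exceeded.
too_old = timeout_reason(now - timedelta(hours=25), now - timedelta(minutes=5), now)
```

All datetimes in the sketch are timezone-aware. Python raises TypeError when a naive datetime is compared with an aware one, so the stored timestamps need to carry timezone information for comparisons like these to work.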

Source code in backend/app/users/users_sessions/utils.py
def validate_session_timeout(
    session: users_session_models.UsersSessions,
) -> None:
    """
    Validate session hasn't exceeded idle or absolute timeout.

    Only enforces when SESSION_IDLE_TIMEOUT_ENABLED=true.
    Checks idle timeout (last_activity_at) and absolute
    timeout (created_at).

    Args:
        session: The session to validate.

    Raises:
        HTTPException: 401 if session has timed out.
    """
    # Skip validation if timeouts are disabled
    if not auth_constants.SESSION_IDLE_TIMEOUT_ENABLED:
        return

    now = datetime.now(timezone.utc)

    # Check idle timeout
    idle_limit = session.last_activity_at + timedelta(
        hours=auth_constants.SESSION_IDLE_TIMEOUT_HOURS
    )
    if now > idle_limit:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Session expired due to inactivity",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Check absolute timeout
    absolute_limit = session.created_at + timedelta(
        hours=auth_constants.SESSION_ABSOLUTE_TIMEOUT_HOURS
    )
    if now > absolute_limit:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Session expired. Please login again for security.",
            headers={"WWW-Authenticate": "Bearer"},
        )