Skip to content

API Reference

Data migration tracking and execution module.

Provides utilities for running and tracking schema/data migrations at application startup.

Exports
  • CRUD: get_migrations_not_executed, set_migration_as_executed
  • Models: Migration (ORM model)
  • Enums: StreamType
  • Utils: check_migrations_not_executed

MigrationModel

Bases: Base

Tracks data migration execution state.

Attributes:

Name Type Description
id Mapped[int]

Primary key.

name Mapped[str]

Migration name (max 250 chars).

description Mapped[str]

Migration description (max 2500 chars).

executed Mapped[bool]

Whether the migration has been executed.

Source code in backend/app/migrations/models.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class Migration(Base):
    """
    Tracks data migration execution state.

    Attributes:
        id: Primary key.
        name: Migration name (max 250 chars).
        description: Migration description (max 2500 chars).
        executed: Whether the migration has been executed.
    """

    __tablename__ = "migrations"

    id: Mapped[int] = mapped_column(
        primary_key=True,
        autoincrement=True,
    )
    name: Mapped[str] = mapped_column(
        String(250),
        nullable=False,
        comment="Migration name",
    )
    description: Mapped[str] = mapped_column(
        String(2500),
        nullable=False,
        comment="Migration description",
    )
    executed: Mapped[bool] = mapped_column(
        nullable=False,
        default=False,
        comment="Whether the migration was executed",
    )

StreamType

Bases: Enum

Activity data stream type enumeration.

Attributes:

Name Type Description
HEART_RATE

Heart rate data stream.

POWER

Power output data stream.

CADENCE

Cadence data stream.

ELEVATION

Elevation data stream.

SPEED

Speed data stream.

PACE

Pace data stream.

LATLONG

Latitude/longitude data stream.

Source code in backend/app/migrations/schema.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class StreamType(Enum):
    """
    Activity data stream type enumeration.

    Attributes:
        HEART_RATE: Heart rate data stream.
        POWER: Power output data stream.
        CADENCE: Cadence data stream.
        ELEVATION: Elevation data stream.
        SPEED: Speed data stream.
        PACE: Pace data stream.
        LATLONG: Latitude/longitude data stream.
    """

    HEART_RATE = 1
    POWER = 2
    CADENCE = 3
    ELEVATION = 4
    SPEED = 5
    PACE = 6
    LATLONG = 7

check_migrations_not_executed async

check_migrations_not_executed(db)

Check and execute any pending migrations.

Iterates over unexecuted migrations and runs each migration handler in order. Sync handlers are called directly; async handlers are awaited.

Parameters:

Name Type Description Default
db Session

Database session.

required

Returns:

Type Description
None

None.

Source code in backend/app/migrations/utils.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
async def check_migrations_not_executed(
    db: Session,
) -> None:
    """
    Check and execute any pending migrations.

    Iterates over unexecuted migrations and runs each migration handler in
    order. Sync handlers are called directly; async handlers are awaited.

    Args:
        db: Database session.

    Returns:
        None.
    """
    migrations_not_executed = migrations_crud.get_migrations_not_executed(db)

    if not migrations_not_executed:
        return

    for migration in migrations_not_executed:
        core_logger.print_to_log(
            f"Migration not executed: {migration.name}" " - Migration will be executed"
        )

        if migration.id in _SYNC_MIGRATIONS:
            _SYNC_MIGRATIONS[migration.id](db)
        elif migration.id in _ASYNC_MIGRATIONS:
            await _ASYNC_MIGRATIONS[migration.id](db)

get_migrations_not_executed

get_migrations_not_executed(db)

Retrieve all unexecuted migrations.

Parameters:

Name Type Description Default
db Session

Database session.

required

Returns:

Type Description
list[Migration] | None

List of unexecuted Migration models, or None if all migrations are executed.

Raises:

Type Description
HTTPException

500 error on database failure.

Source code in backend/app/migrations/crud.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@core_decorators.handle_db_errors
def get_migrations_not_executed(
    db: Session,
) -> list[migrations_models.Migration] | None:
    """
    Retrieve all unexecuted migrations.

    Args:
        db: Database session.

    Returns:
        List of unexecuted Migration models, or None if all migrations are
            executed.

    Raises:
        HTTPException: 500 error on database failure.
    """
    stmt = select(migrations_models.Migration).where(
        migrations_models.Migration.executed.is_(False)
    )
    results = db.execute(stmt).scalars().all()
    return results if results else None

set_migration_as_executed

set_migration_as_executed(migration_id, db)

Mark a migration as executed by its ID.

Parameters:

Name Type Description Default
migration_id int

ID of the migration to mark as executed.

required
db Session

Database session.

required

Returns:

Type Description
None

None.

Raises:

Type Description
HTTPException

404 if migration not found.

HTTPException

500 error on database failure.

Source code in backend/app/migrations/crud.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@core_decorators.handle_db_errors
def set_migration_as_executed(
    migration_id: int,
    db: Session,
) -> None:
    """
    Mark a migration as executed by its ID.

    Args:
        migration_id: ID of the migration to mark as executed.
        db: Database session.

    Returns:
        None.

    Raises:
        HTTPException: 404 if migration not found.
        HTTPException: 500 error on database failure.
    """
    stmt = select(migrations_models.Migration).where(
        migrations_models.Migration.id == migration_id
    )
    db_migration = db.execute(stmt).scalar_one_or_none()

    if db_migration is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Migration not found",
            headers={"WWW-Authenticate": "Bearer"},
        )

    db_migration.executed = True
    db.commit()
    db.refresh(db_migration)