
API Reference

Activity streams module for stream data management.

This module provides activity stream data operations, including retrieval, creation, and transformation of heart rate, power, cadence, elevation, speed, pace, and map streams.

Exports
  • CRUD: get_activity_streams, get_activities_streams, get_public_activity_streams, get_activity_stream_by_type, get_public_activity_stream_by_type, create_activity_streams
  • Utils: transform_activity_streams, is_stream_hidden, filter_visible_streams
  • Schemas: ActivityStreams
  • Models: ActivityStreams (ORM model)
  • Constants: STREAM_TYPE_HR, STREAM_TYPE_POWER, STREAM_TYPE_CADENCE, STREAM_TYPE_ELEVATION, STREAM_TYPE_SPEED, STREAM_TYPE_PACE, STREAM_TYPE_MAP
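The numeric values behind the constants are not listed on this page, but the `stream_type` column comment in the ORM model gives a mapping from 1 (HR) to 7 (lat/lon). The following is a minimal sketch that assumes the constants follow that comment; `STREAM_TYPE_LABELS` and `describe_stream_type` are hypothetical helpers added only for illustration.

```python
# Assumed values, taken from the ORM column comment:
# "1-HR, 2-Power, 3-Cadence, 4-Elevation, 5-Velocity, 6-Pace, 7-lat/lon".
STREAM_TYPE_HR = 1
STREAM_TYPE_POWER = 2
STREAM_TYPE_CADENCE = 3
STREAM_TYPE_ELEVATION = 4
STREAM_TYPE_SPEED = 5  # the column comment calls this "Velocity"
STREAM_TYPE_PACE = 6
STREAM_TYPE_MAP = 7  # latitude/longitude waypoints

# Hypothetical lookup table, not part of the documented module.
STREAM_TYPE_LABELS = {
    STREAM_TYPE_HR: "heart rate",
    STREAM_TYPE_POWER: "power",
    STREAM_TYPE_CADENCE: "cadence",
    STREAM_TYPE_ELEVATION: "elevation",
    STREAM_TYPE_SPEED: "speed",
    STREAM_TYPE_PACE: "pace",
    STREAM_TYPE_MAP: "map",
}


def describe_stream_type(code: int) -> str:
    """Return a readable label for a stream type code (hypothetical helper)."""
    return STREAM_TYPE_LABELS.get(code, f"unknown ({code})")
```

A lookup like this can make log messages easier to read than bare integer codes.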

ActivityStreamsBase

Bases: BaseModel

Schema for activity stream data.

Attributes:

Name Type Description
id

Unique stream identifier.

activity_id StrictInt

Parent activity identifier.

stream_type StrictInt

Stream type code (1-7).

stream_waypoints list[dict]

Waypoint data points.

strava_activity_stream_id StrictInt | None

Strava stream ID.

zone_percentages dict | None

Zone breakdowns keyed by metric (e.g. 'hr').

Source code in backend/app/activities/activity_streams/schema.py
class ActivityStreamsBase(BaseModel):
    """
    Schema for activity stream data.

    Attributes:
        id: Unique stream identifier.
        activity_id: Parent activity identifier.
        stream_type: Stream type code (1-7).
        stream_waypoints: Waypoint data points.
        strava_activity_stream_id: Strava stream ID.
        zone_percentages: Zone breakdowns keyed by metric (e.g. 'hr').
    """

    model_config = ConfigDict(
        from_attributes=True,
    )

    activity_id: StrictInt
    stream_type: StrictInt
    stream_waypoints: list[dict]
    strava_activity_stream_id: StrictInt | None = None
    zone_percentages: dict | None = None

ActivityStreamsCreate

Bases: ActivityStreamsBase

Schema for activity stream creation.

Inherits all fields from ActivityStreamsBase.

Source code in backend/app/activities/activity_streams/schema.py
class ActivityStreamsCreate(ActivityStreamsBase):
    """
    Schema for activity stream creation.

    Inherits all fields from ActivityStreamsBase.
    """

ActivityStreamsModel

Bases: Base

ORM model for activity stream data.

Attributes:

Name Type Description
id Mapped[int]

Primary key, auto-incremented.

activity_id Mapped[int]

FK to the parent activity.

stream_type Mapped[int]

Stream type integer code.

stream_waypoints Mapped[list]

JSON waypoint data.

strava_activity_stream_id Mapped[int | None]

Strava stream ID.

zone_percentages Mapped[dict | None]

Pre-computed zone breakdowns keyed by metric (e.g. 'hr').

Source code in backend/app/activities/activity_streams/models.py
class ActivityStreams(Base):
    """
    ORM model for activity stream data.

    Attributes:
        id: Primary key, auto-incremented.
        activity_id: FK to the parent activity.
        stream_type: Stream type integer code.
        stream_waypoints: JSON waypoint data.
        strava_activity_stream_id: Strava stream ID.
        zone_percentages: Pre-computed zone breakdowns keyed by metric (e.g. 'hr').
    """

    __tablename__ = "activities_streams"

    id: Mapped[int] = mapped_column(
        primary_key=True,
        autoincrement=True,
    )
    activity_id: Mapped[int] = mapped_column(
        ForeignKey(
            "activities.id",
            ondelete="CASCADE",
        ),
        nullable=False,
        index=True,
        comment=("Activity ID that the activity stream belongs to"),
    )
    stream_type: Mapped[int] = mapped_column(
        nullable=False,
        comment=("Stream type (1-HR, 2-Power, 3-Cadence, 4-Elevation, 5-Velocity, 6-Pace, 7-lat/lon)"),
    )
    stream_waypoints: Mapped[list] = mapped_column(
        JSON,
        nullable=False,
        comment="Store waypoints data",
    )
    strava_activity_stream_id: Mapped[int | None] = mapped_column(
        nullable=True,
        comment="Strava activity stream ID",
    )
    zone_percentages: Mapped[dict | None] = mapped_column(
        JSON(none_as_null=True),
        nullable=True,
        comment="Pre-computed zone breakdowns keyed by metric (e.g. 'hr')",
    )

    # Define a relationship to the Activity model
    activity: Mapped["Activity"] = relationship(
        back_populates="activities_streams",
    )
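The `ondelete="CASCADE"` foreign key means that deleting an activity should also remove its stream rows at the database level. The sketch below illustrates that behaviour with the standard-library `sqlite3` module instead of SQLAlchemy, using a cut-down version of the two tables; the reduced column set and the `count_streams` helper are assumptions made for the example, not the project's schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# SQLite only enforces foreign keys when this pragma is switched on.
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("CREATE TABLE activities (id INTEGER PRIMARY KEY)")
conn.execute(
    """
    CREATE TABLE activities_streams (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        activity_id INTEGER NOT NULL
            REFERENCES activities(id) ON DELETE CASCADE,
        stream_type INTEGER NOT NULL,
        stream_waypoints TEXT NOT NULL
    )
    """
)
conn.execute("INSERT INTO activities (id) VALUES (1)")
conn.executemany(
    "INSERT INTO activities_streams (activity_id, stream_type, stream_waypoints) "
    "VALUES (?, ?, ?)",
    [(1, 1, "[]"), (1, 7, "[]")],
)


def count_streams(connection: sqlite3.Connection) -> int:
    """Count rows in the streams table (helper for this example only)."""
    return connection.execute("SELECT COUNT(*) FROM activities_streams").fetchone()[0]


streams_before = count_streams(conn)
conn.execute("DELETE FROM activities WHERE id = 1")  # cascades to the streams
streams_after = count_streams(conn)
```

Whether the cascade is enforced in a real deployment depends on the database backend and its foreign key settings.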

ActivityStreamsRead

Bases: ActivityStreamsBase

Schema for activity stream reading.

Attributes:

Name Type Description
id StrictInt | None

Unique stream identifier.

Source code in backend/app/activities/activity_streams/schema.py
class ActivityStreamsRead(ActivityStreamsBase):
    """
    Schema for activity stream reading.

    Attributes:
        id: Unique stream identifier.
    """

    id: StrictInt | None = None

create_activity_streams async

create_activity_streams(activity_streams, activity, db)

Bulk create activity streams.

Parameters:

Name Type Description Default
activity_streams list[ActivityStreamsCreate]

List of stream schemas.

required
activity Activity

Activity schema to associate streams with.

required
db Session

Database session.

required
Source code in backend/app/activities/activity_streams/crud.py
@core_decorators.handle_db_errors
async def create_activity_streams(
    activity_streams: list[activity_streams_schema.ActivityStreamsCreate],
    activity: activity_schema.Activity,
    db: Session,
) -> None:
    """
    Bulk create activity streams.

    Args:
        activity_streams: List of stream schemas.
        activity: Activity schema to associate streams with.
        db: Database session.
    """

    if activity.user_id is None:
        core_logger.print_to_log_and_console(
            f"Failed to create activity streams: activity {activity.id} has no user_id",
            "warning",
        )
        return

    user: users_models.Users | None = users_crud.get_user_by_id(activity.user_id, db)
    if not user:
        core_logger.print_to_log_and_console(
            f"Failed to create activity streams: user {activity.user_id} not found",
            "warning",
        )
        return

    streams: list[activity_streams_models.ActivityStreams] = []
    for stream in activity_streams:
        zone_percentages: dict | None = None
        if stream.stream_type == activity_streams_constants.STREAM_TYPE_HR:
            try:
                zone_percentages = await activity_streams_utils.build_zone_percentages(
                    user,
                    activity,
                    stream.stream_waypoints,
                )
            except Exception as err:
                core_logger.print_to_log(
                    f"Zone % computation failed for stream (activity {stream.activity_id}): {err}",
                    "error",
                    exc=err,
                )
        streams.append(
            activity_streams_models.ActivityStreams(
                activity_id=stream.activity_id,
                stream_type=stream.stream_type,
                stream_waypoints=stream.stream_waypoints,
                strava_activity_stream_id=stream.strava_activity_stream_id,
                zone_percentages=zone_percentages,
            )
        )

    if streams:
        db.add_all(streams)
        db.commit()
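One detail of the function above is that a failure while computing zone percentages is logged and swallowed, so the stream is still stored with `zone_percentages` left as `None`. The sketch below reproduces only that control flow with standard-library stand-ins: `StreamIn`, `prepare_rows`, the threshold-based `build_zone_percentages`, and the value of `STREAM_TYPE_HR` are all assumptions for illustration, and no database is involved.

```python
import asyncio
from dataclasses import dataclass

STREAM_TYPE_HR = 1  # assumed value, based on the ORM column comment


@dataclass
class StreamIn:
    activity_id: int
    stream_type: int
    stream_waypoints: list


async def build_zone_percentages(waypoints: list) -> dict:
    """Hypothetical stand-in: share of samples at or above 150 bpm."""
    if not waypoints:
        raise ValueError("no waypoints")
    high = sum(1 for w in waypoints if w["hr"] >= 150)
    return {"hr": {"high_pct": 100 * high / len(waypoints)}}


async def prepare_rows(streams: list) -> list:
    rows = []
    for stream in streams:
        zone_percentages = None
        if stream.stream_type == STREAM_TYPE_HR:
            try:
                zone_percentages = await build_zone_percentages(stream.stream_waypoints)
            except Exception as err:
                # Log and continue: the stream is still kept without zone data.
                print(f"Zone % computation failed (activity {stream.activity_id}): {err}")
        rows.append(
            {
                "activity_id": stream.activity_id,
                "stream_type": stream.stream_type,
                "zone_percentages": zone_percentages,
            }
        )
    return rows


rows = asyncio.run(
    prepare_rows(
        [
            StreamIn(1, STREAM_TYPE_HR, [{"hr": 140}, {"hr": 160}]),
            StreamIn(1, STREAM_TYPE_HR, []),  # computation fails, row is kept
            StreamIn(1, 2, [{"power": 200}]),  # non-HR stream, no zone data
        ]
    )
)
```

All three inputs produce a row; only the first carries zone data.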

filter_visible_streams

filter_visible_streams(streams, activity)

Filter out streams hidden by the activity.

Parameters:

Name Type Description Default
streams list[ActivityStreams]

List of stream ORM instances.

required
activity Activity

The activity schema instance.

required

Returns:

Type Description
list[ActivityStreams]

Streams that are not hidden.

Source code in backend/app/activities/activity_streams/utils.py
def filter_visible_streams(
    streams: list[activity_streams_models.ActivityStreams],
    activity: activity_schema.Activity,
) -> list[activity_streams_models.ActivityStreams]:
    """
    Filter out streams hidden by the activity.

    Args:
        streams: List of stream ORM instances.
        activity: The activity schema instance.

    Returns:
        Streams that are not hidden.
    """
    return [s for s in streams if not is_stream_hidden(activity, s.stream_type)]

get_activities_streams

get_activities_streams(activity_ids, _user_id, db, _activities)

Get streams for multiple activities.

Parameters:

Name Type Description Default
activity_ids list[int]

List of activity IDs.

required
_user_id int

Authenticated user ID. Not used in the function body shown below.

required
db Session

Database session.

required
_activities list[Activity]

Pre-fetched activity list. Not used in the function body shown below.

required

Returns:

Type Description
list[ActivityStreamsRead]

List of activity streams.

Raises:

Type Description
HTTPException

On database errors.

Source code in backend/app/activities/activity_streams/crud.py
@core_decorators.handle_db_errors
def get_activities_streams(
    activity_ids: list[int],
    _user_id: int,
    db: Session,
    _activities: list[activity_models.Activity],
) -> list[activity_streams_schema.ActivityStreamsRead]:
    """
    Get streams for multiple activities.

    Args:
        activity_ids: List of activity IDs.
        _user_id: Authenticated user ID.
        db: Database session.
        _activities: Pre-fetched activity list.

    Returns:
        List of activity streams.

    Raises:
        HTTPException: On database errors.
    """
    stmt = select(activity_streams_models.ActivityStreams).where(
        activity_streams_models.ActivityStreams.activity_id.in_(activity_ids)
    )
    all_streams: list[activity_streams_models.ActivityStreams] = list(db.scalars(stmt).all())

    if not all_streams:
        return []

    return activity_streams_utils.transform_activity_streams(all_streams)

get_activity_stream_by_type

get_activity_stream_by_type(activity_id, stream_type, token_user_id, db)

Get a specific stream type for an activity.

Parameters:

Name Type Description Default
activity_id int

The activity identifier.

required
stream_type int

The stream type constant.

required
token_user_id int

Authenticated user ID.

required
db Session

Database session.

required

Returns:

Type Description
ActivityStreamsRead | None

The activity stream or None.

Raises:

Type Description
HTTPException

On database errors.

Source code in backend/app/activities/activity_streams/crud.py
@core_decorators.handle_db_errors
def get_activity_stream_by_type(
    activity_id: int,
    stream_type: int,
    token_user_id: int,
    db: Session,
) -> activity_streams_schema.ActivityStreamsRead | None:
    """
    Get a specific stream type for an activity.

    Args:
        activity_id: The activity identifier.
        stream_type: The stream type constant.
        token_user_id: Authenticated user ID.
        db: Database session.

    Returns:
        The activity stream or None.

    Raises:
        HTTPException: On database errors.
    """
    activity: activity_schema.Activity | None = activity_crud.get_activity_by_id(activity_id, db)

    if not activity:
        return None

    stmt = select(activity_streams_models.ActivityStreams).where(
        activity_streams_models.ActivityStreams.activity_id == activity_id,
        activity_streams_models.ActivityStreams.stream_type == stream_type,
    )
    activity_stream: activity_streams_models.ActivityStreams | None = db.scalars(stmt).first()

    if not activity_stream:
        return None

    if token_user_id != activity.user_id and activity_streams_utils.is_stream_hidden(
        activity,
        activity_stream.stream_type,
    ):
        return None

    return activity_streams_utils.transform_activity_streams(activity_stream)

get_activity_streams

get_activity_streams(activity_id, token_user_id, db)

Get all streams for an activity.

Parameters:

Name Type Description Default
activity_id int

The activity identifier.

required
token_user_id int

Authenticated user ID.

required
db Session

Database session.

required

Returns:

Type Description
list[ActivityStreamsRead]

List of activity streams; empty if the activity is not found or no streams are available.

Raises:

Type Description
HTTPException

On database errors.

Source code in backend/app/activities/activity_streams/crud.py
@core_decorators.handle_db_errors
def get_activity_streams(
    activity_id: int,
    token_user_id: int,
    db: Session,
) -> list[activity_streams_schema.ActivityStreamsRead]:
    """
    Get all streams for an activity.

    Args:
        activity_id: The activity identifier.
        token_user_id: Authenticated user ID.
        db: Database session.

    Returns:
        List of activity streams; empty if the activity is not found or no streams are available.

    Raises:
        HTTPException: On database errors.
    """
    activity: activity_schema.Activity | None = activity_crud.get_activity_by_id(activity_id, db)

    if not activity:
        return []

    stmt = select(activity_streams_models.ActivityStreams).where(
        activity_streams_models.ActivityStreams.activity_id == activity_id,
    )
    activity_streams: list[activity_streams_models.ActivityStreams] = list(db.scalars(stmt).all())

    if not activity_streams:
        return []

    if token_user_id != activity.user_id:
        activity_streams = activity_streams_utils.filter_visible_streams(activity_streams, activity)

    return activity_streams_utils.transform_activity_streams(activity_streams)
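The function above applies the visibility filter only when the caller is not the activity owner. The sketch below isolates that rule with plain dataclasses; `hidden_types` is a hypothetical simplification of whatever per-stream flags `is_stream_hidden` reads, and `streams_for_viewer` is an illustrative name, not a function in the module.

```python
from dataclasses import dataclass, field


@dataclass
class Activity:
    user_id: int
    # Hypothetical simplification: a set of hidden stream type codes stands in
    # for the activity attributes consulted by is_stream_hidden().
    hidden_types: set = field(default_factory=set)


@dataclass
class Stream:
    stream_type: int


def streams_for_viewer(streams, activity, token_user_id):
    """Return every stream to the owner, only visible ones to anyone else."""
    if token_user_id == activity.user_id:
        return list(streams)
    return [s for s in streams if s.stream_type not in activity.hidden_types]


activity = Activity(user_id=10, hidden_types={1})
streams = [Stream(stream_type=1), Stream(stream_type=7)]
owner_view = streams_for_viewer(streams, activity, token_user_id=10)
other_view = streams_for_viewer(streams, activity, token_user_id=22)
```

Here the owner sees both streams, while another user sees only the stream with type code 7.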

get_public_activity_stream_by_type

get_public_activity_stream_by_type(activity_id, stream_type, db)

Get a public stream by type for an activity.

Parameters:

Name Type Description Default
activity_id int

The activity identifier.

required
stream_type int

The stream type constant.

required
db Session

Database session.

required

Returns:

Type Description
ActivityStreamsRead | None

The activity stream or None.

Raises:

Type Description
HTTPException

On database errors.

Source code in backend/app/activities/activity_streams/crud.py
@core_decorators.handle_db_errors
def get_public_activity_stream_by_type(
    activity_id: int,
    stream_type: int,
    db: Session,
) -> activity_streams_schema.ActivityStreamsRead | None:
    """
    Get a public stream by type for an activity.

    Args:
        activity_id: The activity identifier.
        stream_type: The stream type constant.
        db: Database session.

    Returns:
        The activity stream or None.

    Raises:
        HTTPException: On database errors.
    """
    server_settings = server_settings_utils.get_server_settings_or_404(db)

    if not server_settings.public_shareable_links:
        return None

    activity: activity_schema.Activity | None = activity_crud.get_activity_by_id_if_is_public(activity_id, db)

    if not activity:
        return None

    stmt = (
        select(activity_streams_models.ActivityStreams)
        .join(
            activity_models.Activity,
            activity_models.Activity.id == (activity_streams_models.ActivityStreams.activity_id),
        )
        .where(
            activity_streams_models.ActivityStreams.activity_id == activity_id,
            activity_streams_models.ActivityStreams.stream_type == stream_type,
            activity_models.Activity.visibility == 0,
            activity_models.Activity.id == activity_id,
        )
    )
    activity_stream = db.scalars(stmt).first()

    if not activity_stream:
        return None

    if activity_streams_utils.is_stream_hidden(
        activity,
        activity_stream.stream_type,
    ):
        return None

    return activity_streams_utils.transform_activity_streams(activity_stream)
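The public lookup above returns `None` at four separate checkpoints: the server setting, the activity's public status, the existence of the stream, and the hide flag. The sketch below mirrors that ordering with in-memory dictionaries in place of the database; `public_stream`, `hidden_types`, and the dictionary layout are assumptions for illustration, while `visibility == 0` follows the filter used in the query on this page.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ServerSettings:
    public_shareable_links: bool


@dataclass
class Activity:
    id: int
    visibility: int  # the query on this page filters on visibility == 0
    hidden_types: set = field(default_factory=set)  # hypothetical stand-in


def public_stream(settings, activities, streams, activity_id, stream_type) -> Optional[dict]:
    # 1. Server-wide switch for public links.
    if not settings.public_shareable_links:
        return None
    # 2. The activity must exist and be public.
    activity = activities.get(activity_id)
    if activity is None or activity.visibility != 0:
        return None
    # 3. The requested stream must exist.
    stream = streams.get((activity_id, stream_type))
    if stream is None:
        return None
    # 4. The owner may have hidden this stream type.
    if stream_type in activity.hidden_types:
        return None
    return stream


activities = {1: Activity(1, 0, {1}), 2: Activity(2, 1)}
streams = {
    (1, 1): {"stream_type": 1},
    (1, 7): {"stream_type": 7},
    (2, 7): {"stream_type": 7},
}
enabled = ServerSettings(True)
disabled = ServerSettings(False)
```

Because every failure path returns the same `None`, a caller cannot tell a hidden stream from a missing one.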

get_public_activity_streams

get_public_activity_streams(activity_id, db)

Get public streams for an activity.

Parameters:

Name Type Description Default
activity_id int

The activity identifier.

required
db Session

Database session.

required

Returns:

Type Description
list[ActivityStreamsRead]

List of activity streams.

Raises:

Type Description
HTTPException

On database errors.

Source code in backend/app/activities/activity_streams/crud.py
@core_decorators.handle_db_errors
def get_public_activity_streams(
    activity_id: int,
    db: Session,
) -> list[activity_streams_schema.ActivityStreamsRead]:
    """
    Get public streams for an activity.

    Args:
        activity_id: The activity identifier.
        db: Database session.

    Returns:
        List of activity streams.

    Raises:
        HTTPException: On database errors.
    """
    server_settings = server_settings_utils.get_server_settings_or_404(db)

    if not server_settings.public_shareable_links:
        return []

    activity = activity_crud.get_activity_by_id_if_is_public(activity_id, db)

    if not activity:
        return []

    stmt = (
        select(activity_streams_models.ActivityStreams)
        .join(
            activity_models.Activity,
            activity_models.Activity.id == (activity_streams_models.ActivityStreams.activity_id),
        )
        .where(
            activity_streams_models.ActivityStreams.activity_id == activity_id,
            activity_models.Activity.visibility == 0,
            activity_models.Activity.id == activity_id,
        )
    )
    activity_streams: list[activity_streams_models.ActivityStreams] = list(db.scalars(stmt).all())

    if not activity_streams:
        return []

    activity_streams = activity_streams_utils.filter_visible_streams(activity_streams, activity)

    return activity_streams_utils.transform_activity_streams(activity_streams)

is_stream_hidden

is_stream_hidden(activity, stream_type)

Check if a stream type is hidden.

Parameters:

Name Type Description Default
activity Activity

The activity schema instance.

required
stream_type int

The stream type constant.

required

Returns:

Type Description
bool

True if the stream should be hidden.

Source code in backend/app/activities/activity_streams/utils.py
def is_stream_hidden(
    activity: activity_schema.Activity,
    stream_type: int,
) -> bool:
    """
    Check if a stream type is hidden.

    Args:
        activity: The activity schema instance.
        stream_type: The stream type constant.

    Returns:
        True if the stream should be hidden.
    """
    attr = _STREAM_HIDE_MAP.get(stream_type)
    return bool(attr and getattr(activity, attr, False))
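The contents of `_STREAM_HIDE_MAP` are not shown on this page. The sketch below reuses the two-line function body with a made-up map to show how the lookup behaves for mapped, unmapped, and missing attributes; the attribute names `hide_hr`, `hide_power`, and `hide_map` are hypothetical.

```python
from types import SimpleNamespace

# Hypothetical map from stream type code to an activity attribute name.
_STREAM_HIDE_MAP = {1: "hide_hr", 2: "hide_power", 7: "hide_map"}


def is_stream_hidden(activity, stream_type: int) -> bool:
    attr = _STREAM_HIDE_MAP.get(stream_type)
    # Unmapped types yield attr=None; missing attributes default to False.
    return bool(attr and getattr(activity, attr, False))


# Stand-in activity: hide_map is deliberately left undefined.
activity = SimpleNamespace(hide_hr=True, hide_power=False)
```

With these inputs, type 1 is hidden, type 2 is visible, type 7 is visible because the attribute is absent, and an unmapped code such as 99 is always visible.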

transform_activity_streams

transform_activity_streams(activity_streams: list[ActivityStreams]) -> list[activity_streams_schema.ActivityStreamsRead]
transform_activity_streams(activity_streams: ActivityStreams) -> activity_streams_schema.ActivityStreamsRead
transform_activity_streams(activity_streams)

Transform a stream or list of streams to a Pydantic schema or list of schemas.

Parameters:

Name Type Description Default
activity_streams ActivityStreams | list[ActivityStreams]

The stream ORM instance or list of stream ORM instances.

required

Returns:

Type Description
ActivityStreamsRead | list[ActivityStreamsRead]

The activity stream as a schema or list of schemas.

Source code in backend/app/activities/activity_streams/utils.py
def transform_activity_streams(
    activity_streams: activity_streams_models.ActivityStreams | list[activity_streams_models.ActivityStreams],
) -> activity_streams_schema.ActivityStreamsRead | list[activity_streams_schema.ActivityStreamsRead]:
    """
    Transform a stream or list of streams to a Pydantic schema or list of schemas.

    Args:
        activity_streams: The stream ORM instance or list of stream ORM instances.

    Returns:
        The activity stream as a schema or list of schemas.
    """
    if isinstance(activity_streams, list):
        return [activity_streams_schema.ActivityStreamsRead.model_validate(stream) for stream in activity_streams]
    return activity_streams_schema.ActivityStreamsRead.model_validate(activity_streams)
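The three signatures listed for this function suggest it is declared with `typing.overload`, so type checkers can tell that a list input gives a list output and a single instance gives a single schema. The sketch below shows that pattern with plain dataclasses in place of the ORM model and Pydantic schema; `StreamRow`, `StreamRead`, and `transform` are illustrative names only.

```python
from dataclasses import dataclass
from typing import overload


@dataclass
class StreamRow:
    """Stand-in for the ORM model."""
    id: int
    stream_type: int


@dataclass
class StreamRead:
    """Stand-in for the read schema."""
    id: int
    stream_type: int


@overload
def transform(rows: list) -> list: ...
@overload
def transform(rows: StreamRow) -> StreamRead: ...
def transform(rows):
    # A single runtime implementation dispatches on the argument type.
    if isinstance(rows, list):
        return [StreamRead(r.id, r.stream_type) for r in rows]
    return StreamRead(rows.id, rows.stream_type)


single = transform(StreamRow(1, 1))
many = transform([StreamRow(1, 1), StreamRow(2, 7)])
```

The overload declarations affect static typing only; at runtime the final undecorated definition is the one that executes.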