Skip to content

Commit

Permalink
Send connection and reconnection time in stats telemetry (#1274)
Browse files Browse the repository at this point in the history
  • Loading branch information
liviu-timar authored Jan 9, 2025
1 parent 3b46d50 commit 1d69320
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ public class Call(
internal var session: RtcSession? = null
var sessionId = UUID.randomUUID().toString()

internal var connectedAt: Long? = null
internal var reconnectAt: Pair<WebsocketReconnectStrategy, Long>? = null
internal var connectStartTime = 0L
internal var reconnectStartTime = 0L

internal var peerConnectionFactory: StreamPeerConnectionFactory = StreamPeerConnectionFactory(
context = clientImpl.context,
Expand Down Expand Up @@ -452,8 +452,9 @@ public class Call(
"[joinInternal] #track; create: $create, ring: $ring, notify: $notify, createOptions: $createOptions"
}

// step 1. call the join endpoint to get a list of SFUs
connectStartTime = System.currentTimeMillis()

// step 1. call the join endpoint to get a list of SFUs
val locationResult = clientImpl.getCachedLocation()
if (locationResult !is Success) {
return locationResult as Failure
Expand All @@ -479,7 +480,6 @@ public class Call(
session = if (testInstanceProvider.rtcSessionCreator != null) {
testInstanceProvider.rtcSessionCreator!!.invoke()
} else {
connectedAt = System.currentTimeMillis()
RtcSession(
sessionId = this.sessionId,
apiKey = clientImpl.apiKey,
Expand All @@ -503,11 +503,12 @@ public class Call(
} catch (e: Exception) {
return Failure(Error.GenericError(e.message ?: "RtcSession error occurred."))
}

monitorSession(result.value)
return Success(value = session!!)
}

private suspend fun Call.monitorSession(result: JoinCallResponse) {
private fun Call.monitorSession(result: JoinCallResponse) {
sfuEvents?.cancel()
sfuListener?.cancel()
startCallStatsReporting(result.statsOptions.reportingIntervalMs.toLong())
Expand Down Expand Up @@ -583,39 +584,45 @@ public class Call(
network.subscribe(listener)
}

private suspend fun startCallStatsReporting(reportingIntervalMs: Long = 10_000) {
private fun startCallStatsReporting(reportingIntervalMs: Long = 10_000) {
callStatsReportingJob?.cancel()
callStatsReportingJob = scope.launch {
// Wait a bit before we start capturing stats
delay(reportingIntervalMs)

while (isActive) {
delay(reportingIntervalMs)

val publisherStats = session?.getPublisherStats()
val subscriberStats = session?.getSubscriberStats()
state.stats.updateFromRTCStats(publisherStats, isPublisher = true)
state.stats.updateFromRTCStats(subscriberStats, isPublisher = false)
state.stats.updateLocalStats()
val local = state.stats._local.value

val report = CallStatsReport(
publisher = publisherStats,
subscriber = subscriberStats,
local = local,
stateStats = state.stats,
session?.sendCallStats(
report = collectStats(),
)
statsReport.value = report
statLatencyHistory.value += report.stateStats.publisher.latency.value
if (statLatencyHistory.value.size > 20) {
statLatencyHistory.value = statLatencyHistory.value.takeLast(20)
}

session?.sendCallStats(report)
}
}
}

/**
 * Builds a fresh [CallStatsReport] and publishes it to the call's observable state.
 *
 * The publisher and subscriber stats are read from the current [session] (both are
 * `null` when there is no session), folded into `state.stats`, and then packaged into
 * a report together with the local stats snapshot.
 *
 * Side effects: updates `state.stats`, overwrites `statsReport`, and appends the
 * publisher latency to `statLatencyHistory`, which is kept to at most 20 entries.
 *
 * @return the report that was just published to `statsReport`.
 */
internal suspend fun collectStats(): CallStatsReport {
    // Raw stats for each direction; either may be null.
    val publisherRtcStats = session?.getPublisherStats()
    val subscriberRtcStats = session?.getSubscriberStats()

    // Refresh the derived state before taking the local snapshot below.
    state.stats.updateFromRTCStats(publisherRtcStats, isPublisher = true)
    state.stats.updateFromRTCStats(subscriberRtcStats, isPublisher = false)
    state.stats.updateLocalStats()

    return CallStatsReport(
        publisher = publisherRtcStats,
        subscriber = subscriberRtcStats,
        local = state.stats._local.value,
        stateStats = state.stats,
    ).also { snapshot ->
        statsReport.value = snapshot

        // Record the latest publisher latency, then trim to the 20 most recent samples.
        statLatencyHistory.value += snapshot.stateStats.publisher.latency.value
        if (statLatencyHistory.value.size > 20) {
            statLatencyHistory.value = statLatencyHistory.value.takeLast(20)
        }
    }
}

/**
* Fast reconnect to the same SFU with the same participant session.
*/
Expand All @@ -624,6 +631,8 @@ public class Call(
session?.prepareReconnect()
this@Call.state._connection.value = RealtimeConnection.Reconnecting
if (session != null) {
reconnectStartTime = System.currentTimeMillis()

val session = session!!
val (prevSessionId, subscriptionsInfo, publishingInfo) = session.currentSfuInfo()
val reconnectDetails = ReconnectDetails(
Expand All @@ -633,7 +642,6 @@ public class Call(
subscriptions = subscriptionsInfo,
reconnect_attempt = reconnectAttepmts,
)
reconnectAt = Pair(WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_FAST, System.currentTimeMillis())
session.fastReconnect(reconnectDetails)
} else {
logger.e { "[reconnect] Disconnecting" }
Expand All @@ -646,10 +654,11 @@ public class Call(
*/
suspend fun rejoin() = schedule {
logger.d { "[rejoin] Rejoining" }
reconnectAt = Pair(WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_REJOIN, System.currentTimeMillis())
reconnectAttepmts++
state._connection.value = RealtimeConnection.Reconnecting
location?.let {
reconnectStartTime = System.currentTimeMillis()

val joinResponse = joinRequest(location = it)
if (joinResponse is Success) {
// switch to the new SFU
Expand Down Expand Up @@ -701,6 +710,8 @@ public class Call(
logger.d { "[migrate] Migrating" }
state._connection.value = RealtimeConnection.Migrating
location?.let {
reconnectStartTime = System.currentTimeMillis()

val joinResponse = joinRequest(location = it)
if (joinResponse is Success) {
// switch to the new SFU
Expand All @@ -720,7 +731,6 @@ public class Call(
reconnect_attempt = reconnectAttepmts,
)
session.prepareRejoin()
reconnectAt = Pair(WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_MIGRATE, System.currentTimeMillis())
val newSession = RtcSession(
clientImpl,
powerManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,24 @@ public class RtcSession internal constructor(
sfuConnectionModule.socketConnection.connect(request)
sfuConnectionModule.socketConnection.whenConnected {
connectRtc()
sendConnectionTimeStats(reconnectDetails?.strategy)
}
}

/**
 * Sends a single stats report that carries connection-time telemetry.
 *
 * When [reconnectStrategy] is `null` the report carries the time elapsed since
 * `call.connectStartTime` as the initial connection time. Otherwise it carries the
 * time elapsed since `call.reconnectStartTime`, paired with the given strategy, as
 * the reconnection time.
 *
 * The elapsed time is sampled BEFORE `call.collectStats()` is invoked. Kotlin evaluates
 * call arguments in call-site order and `collectStats()` is a suspending call, so
 * sampling the clock inside the argument list would add the stats-collection latency
 * to the reported connection time.
 *
 * NOTE(review): the elapsed time is only meaningful if the corresponding start
 * timestamp was set before this runs (both default to 0L in `Call`) — confirm callers.
 *
 * @param reconnectStrategy the strategy used for the reconnect, or `null` for a first connect.
 */
private suspend fun sendConnectionTimeStats(reconnectStrategy: WebsocketReconnectStrategy? = null) {
    if (reconnectStrategy == null) {
        // Measure first, collect stats second (see KDoc).
        val connectionTimeSeconds = (System.currentTimeMillis() - call.connectStartTime) / 1000f
        sendCallStats(
            report = call.collectStats(),
            connectionTimeSeconds = connectionTimeSeconds,
        )
    } else {
        // Measure first, collect stats second (see KDoc).
        val reconnectionTimeSeconds = (System.currentTimeMillis() - call.reconnectStartTime) / 1000f
        sendCallStats(
            report = call.collectStats(),
            reconnectionTimeSeconds = Pair(reconnectionTimeSeconds, reconnectStrategy),
        )
    }
}

Expand Down Expand Up @@ -1631,9 +1649,12 @@ public class RtcSession internal constructor(
return subscriber?.getStats()
}

internal suspend fun sendCallStats(report: CallStatsReport) {
internal suspend fun sendCallStats(
report: CallStatsReport,
connectionTimeSeconds: Float? = null,
reconnectionTimeSeconds: Pair<Float, WebsocketReconnectStrategy>? = null,
) {
val result = wrapAPICall {
val now = System.currentTimeMillis()
val androidThermalState =
safeCallWithDefault(AndroidThermalState.ANDROID_THERMAL_STATE_UNSPECIFIED) {
val thermalState = powerManager?.currentThermalStatus
Expand Down Expand Up @@ -1667,19 +1688,16 @@ public class RtcSession internal constructor(
is_power_saver_mode = powerSaving,
),
telemetry = safeCallWithDefault(null) {
if (call.reconnectAt != null) {
if (connectionTimeSeconds != null) {
Telemetry(
reconnection = call.reconnectAt?.let {
Reconnection(
time_seconds = ((now - it.second) / 1000).toFloat(),
strategy = it.first,
)
},
connection_time_seconds = connectionTimeSeconds.toFloat(),
)
} else if (call.connectedAt != null) {
} else if (reconnectionTimeSeconds != null) {
Telemetry(
connection_time_seconds = call.connectedAt?.let { (now - it) / 1000 }
?.toFloat(),
reconnection = Reconnection(
time_seconds = reconnectionTimeSeconds.first.toFloat(),
strategy = reconnectionTimeSeconds.second,
),
)
} else {
null
Expand All @@ -1691,7 +1709,7 @@ public class RtcSession internal constructor(

logger.d {
"sendStats: " + when (result) {
is Success -> "Success"
is Success -> "Success. Response: ${result.value}. Telemetry: connectionTimeSeconds: $connectionTimeSeconds, reconnectionTimeSeconds: ${reconnectionTimeSeconds?.first}, strategy: ${reconnectionTimeSeconds?.second}"
is Failure -> "Failure. Reason: ${result.value.message}"
}
}
Expand Down Expand Up @@ -1898,6 +1916,15 @@ public class RtcSession internal constructor(
} else {
subscriber?.connection?.restartIce()
publisher?.connection?.restartIce()

sendCallStats(
report = call.collectStats(),
reconnectionTimeSeconds = Pair(
(System.currentTimeMillis() - call.reconnectStartTime) / 1000f,
WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_FAST,
),
)

setVideoSubscriptions(true)
}
}
Expand Down

0 comments on commit 1d69320

Please sign in to comment.