Skip to content

Commit

Permalink
Use a global SSE channel
Browse files Browse the repository at this point in the history
  • Loading branch information
louptheron committed Nov 20, 2023
1 parent a52c236 commit 328c9d1
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,8 @@ context('Side Window > Mission Form > Main Form', () => {
.its('mockEventSources' as any)
.then(mockEventSources => {
cy.log(mockEventSources.toString())
mockEventSources['http://0.0.0.0:8081/api/v1/missions/43/sse'].emitOpen()
mockEventSources['http://0.0.0.0:8081/api/v1/missions/43/sse'].emit(
mockEventSources['http://0.0.0.0:8081/api/v1/missions/sse'].emitOpen()
mockEventSources['http://0.0.0.0:8081/api/v1/missions/sse'].emit(
'MISSION_UPDATE',
new MessageEvent('MISSION_UPDATE', {
data: JSON.stringify({
Expand Down
30 changes: 7 additions & 23 deletions frontend/src/features/SideWindow/MissionForm/apis.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { logSoftError } from '@mtes-mct/monitor-ui'

import { MONITORENV_API_URL, monitorenvApi, monitorfishApi } from '../../../api'
import { addNewMissionListener, missionEventListener, removeMissionListener } from './sse'
import { monitorenvApi, monitorfishApi } from '../../../api'
import { Mission } from '../../../domain/entities/mission/types'
import { ApiError } from '../../../libs/ApiError'
import { ReconnectingEventSource } from '../../../libs/ReconnectingEventSource'

import type { ControlUnit } from '../../../domain/types/ControlUnit'

Expand Down Expand Up @@ -43,37 +43,21 @@ export const monitorenvMissionApi = monitorenvApi.injectEndpoints({
}),

getMission: builder.query<Mission.Mission, Mission.Mission['id']>({
keepUnusedDataFor: 0,
async onCacheEntryAdded(id, { cacheDataLoaded, cacheEntryRemoved, updateCachedData }) {
const url = `${MONITORENV_API_URL}/api/v1/missions/${id}/sse`

try {
const eventSource = new ReconnectingEventSource(url)
// eslint-disable-next-line no-console
console.log(`SSE: listening for updates of mission id ${id}...`)

// wait for the initial query to resolve before proceeding
await cacheDataLoaded

const listener = (event: MessageEvent) => {
const mission = JSON.parse(event.data) as Mission.Mission
// eslint-disable-next-line no-console
console.log(`SSE: received an update for mission id ${mission.id}.`)

updateCachedData(() => mission)
}

eventSource.addEventListener('MISSION_UPDATE', listener)
const listener = missionEventListener(id, updateCachedData)
addNewMissionListener(id, listener)

// cacheEntryRemoved will resolve when the cache subscription is no longer active
await cacheEntryRemoved

// perform cleanup steps once the `cacheEntryRemoved` promise resolves
eventSource.close()
// perform cleanup once the `cacheEntryRemoved` promise resolves
removeMissionListener(id)
} catch (e) {
logSoftError({
context: {
url
},
isSideWindowError: true,
message: "SSE: Can't connect or receive messages",
originalError: e
Expand Down
53 changes: 53 additions & 0 deletions frontend/src/features/SideWindow/MissionForm/sse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { MONITORENV_API_URL } from '../../../api'
import { Mission } from '../../../domain/entities/mission/types'
import { ReconnectingEventSource } from '../../../libs/ReconnectingEventSource'

import type { PatchCollection, Recipe } from '@reduxjs/toolkit/dist/query/core/buildThunks'

const MISSION_UPDATES_URL = `${MONITORENV_API_URL}/api/v1/missions/sse`
export const MISSION_UPDATE_EVENT = `MISSION_UPDATE`

export const EVENT_SOURCE = new ReconnectingEventSource(MISSION_UPDATES_URL, { max_retry_time: 10000 })
// eslint-disable-next-line no-console
console.log(`SSE: connected to missions endpoint.`)

const missionIdToListenerMap = new Map<number, (event: MessageEvent) => void>()

export function addNewMissionListener(missionId: number, listener: (event: MessageEvent) => void) {
removeMissionListener(missionId)

// eslint-disable-next-line no-console
console.log(`SSE: listening for updates of mission id ${missionId}...`)

missionIdToListenerMap.set(missionId, listener)

EVENT_SOURCE.addEventListener(MISSION_UPDATE_EVENT, listener)
}

export function removeMissionListener(missionId: number) {
const listener = missionIdToListenerMap.get(missionId)
if (!listener) {
return
}

EVENT_SOURCE.removeEventListener(MISSION_UPDATE_EVENT, listener)
missionIdToListenerMap.delete(missionId)
// eslint-disable-next-line no-console
console.log(`SSE: removed listener of mission id ${missionId}.`)
}

export const missionEventListener =
(id: number, updateCachedData: (updateRecipe: Recipe<any>) => PatchCollection) => (event: MessageEvent) => {
const mission = JSON.parse(event.data) as Mission.Mission
if (mission.id !== id) {
// eslint-disable-next-line no-console
console.log(`SSE: filtered an update for mission id ${id} (received mission id ${mission.id}).`)

return
}

// eslint-disable-next-line no-console
console.log(`SSE: received an update for mission id ${id}.`)

updateCachedData(() => mission)
}

0 comments on commit 328c9d1

Please sign in to comment.