Skip to content

Commit

Permalink
feat: auto-unsubscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
manzt committed Mar 17, 2024
1 parent 190b31f commit f118747
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 101 deletions.
135 changes: 76 additions & 59 deletions packages/anywidget/src/model.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import * as utils from "./util.js";

/** @template {Record<string, unknown>} State */
/**
* @template {Record<string, unknown>} T
* @typedef {import('./view.js').View<T>} View
*/

/** @template {Record<string, unknown>} T */
export class Model {
/** @type {import("./types.js").Comm=} */
#comm;
/** @type {Omit<import("./types.js").ModelOptions, "comm">} */
#options;
/** @type {{ [evt_name: string]: Map<() => void, (event: Event) => void> }} */
#listeners = {};
/** @type {State} */
/** @type {Map<any, { [evt_name: string]: Map<() => void, (event: Event) => void> }>} */
#listeners = new Map();
/** @type {T} */
#state;
/** @type {Set<string>} */
#need_sync = new Set();
Expand All @@ -17,7 +22,6 @@ export class Model {
/** @type {EventTarget} */
#events = new EventTarget();


// NOTE: (from Jupyter Team): keep track of the msg id for each attr for updates
// we send out so that we can ignore old messages that we send in
// order to avoid 'drunken' sliders going back and forward
Expand All @@ -28,11 +32,11 @@ export class Model {
/** @type {Promise<void>} */
state_change;

/** @type {Record<string, Promise<unknown>>} */
views = {}
/** @type {Record<string, Promise<View<T>>>} */
views = {};

/**
* @param {State} state
* @param {T} state
* @param {import("./types.js").ModelOptions} options
*/
constructor(state, options) {
Expand Down Expand Up @@ -86,7 +90,7 @@ export class Model {

/**
* Serialize the model state.
* @template {Partial<State>} T
* @template {Partial<T>} T
* @param {T} ser
* @returns {Promise<T>}
*/
Expand All @@ -109,7 +113,7 @@ export class Model {

/**
* Deserialize the model state.
* @template {Partial<State>} T
* @template {Partial<T>} T
* @param {T} de
* @returns {Promise<T>}
*/
Expand All @@ -133,22 +137,24 @@ export class Model {
*/
async #handle_comm_msg(msg) {
if (utils.is_update_msg(msg)) {
return this.#handle_update(msg);
await this.#handle_update(msg);
return;
}
if (utils.is_custom_msg(msg)) {
this.#emit("msg:custom", [msg.content.data.content, msg.buffers]);
return;
}
throw new Error(`unhandled comm message: ${JSON.stringify(msg)}`);
}

/**
* @param {import("./types.js").UpdateMessage | import("./types.js").EchoUpdateMessage} msg
*/
async #handle_update({ content, buffers, parent_header }) {
let state = content.data.state;
utils.put_buffers(state, content.data.buffer_paths, buffers);
if (content.data.method === "echo_update" && parent_header?.msg_id) {
this.#resolve_echo(state, parent_header.msg_id);
async #handle_update(msg) {
let state = msg.content.data.state;
utils.put_buffers(state, msg.content.data.buffer_paths, msg.buffers);
if (utils.is_echo_update_msg(msg)) {
this.#handle_echo_update(state, msg.parent_header.msg_id);
}
// @ts-expect-error - we don't validate this
let deserialized = await this.#deserialize(state);
Expand All @@ -159,7 +165,7 @@ export class Model {
* @param {Record<string, unknown>} state
* @param {string} msg_id
*/
#resolve_echo(state, msg_id) {
#handle_echo_update(state, msg_id) {
// we may have echos coming from other clients, we only care about
// dropping echos for which we expected a reply
for (let name of Object.keys(state)) {
Expand All @@ -180,7 +186,6 @@ export class Model {
}
}


/**
* @param {string} name
* @param {unknown} [value]
Expand All @@ -198,32 +203,30 @@ export class Model {
async #handle_comm_close() {
// can only be closed once.
if (!this.#comm) return;
this.#emit("comm:close");
this.#comm.close();
for (let [event, map] of Object.entries(this.#listeners)) {
for (let listener of map.values()) {
this.#events.removeEventListener(event, listener);
}
}
this.#listeners = {};
this.#comm = undefined;
for await (let view of Object.values(this.views)) view.remove();
this.views.clear();
this.#emit("comm:close");
this.off();
this.#listeners.clear();
for await (let view of Object.values(this.views)) {
view.remove();
}
this.views = {};
}

/**
* @template {keyof State} K
* @template {keyof T} K
* @param {K} key
* @returns {State[K]}
* @returns {T[K]}
*/
get(key) {
return this.#state[key];
}

/**
* @template {keyof State & string} K
* @template {keyof T & string} K
* @param {K} key
* @param {State[K]} value
* @param {T[K]} value
*/
set(key, value) {
this.#state[key] = value;
Expand All @@ -234,44 +237,43 @@ export class Model {

async save_changes() {
if (!this.#comm) return;
/** @type {Partial<State>} */
/** @type {Partial<T>} */
let to_send = {};
for (let key of this.#need_sync) {
// @ts-expect-error - we know this is a valid key
to_send[key] = this.#state[key];
}
let serialized = await this.serialize(to_send);
this.#need_sync.clear();
let split = utils.extract_buffers(serialized);
let { state, buffer_paths, buffers } = utils.extract_buffers(serialized);
this.#comm.send(
{
method: "update",
state: split.state,
buffer_paths: split.buffer_paths,
},
{ method: "update", state, buffer_paths },
undefined,
{},
split.buffers,
buffers,
);
}

/**
* @overload
* @param {string} event
* @param {() => void} callback
* @param {unknown} [scope]
* @returns {void}
*/
/**
* @overload
* @param {"msg:custom"} event
* @param {(content: unknown, buffers: ArrayBuffer[]) => void} callback
* @param {unknown} scope
* @returns {void}
*/
/**
* @param {string} event
* @param {(...args: any[]) => void} callback
* @param {unknown} [scope]
*/
on(event, callback) {
on(event, callback, scope = this) {
/** @type {(event?: unknown) => void} */
let handler;
if (event === "msg:custom") {
Expand All @@ -280,13 +282,15 @@ export class Model {
} else {
handler = () => callback();
}
this.#listeners[event] = this.#listeners[event] ?? new Map();
this.#listeners[event].set(callback, handler);
let scope_listeners = this.#listeners.get(scope) ?? {};
this.#listeners.set(scope, scope_listeners);
scope_listeners[event] = scope_listeners[event] ?? new Map();
scope_listeners[event].set(callback, handler);
this.#events.addEventListener(event, handler);
}

/**
* @param {Partial<State>} state
* @param {Partial<T>} state
*/
set_state(state) {
for (let key in state) {
Expand All @@ -302,25 +306,38 @@ export class Model {
}

/**
* @param {string} event
* @param {() => void} [callback]
* @param {string | null} [event]
* @param {null | (() => void)} [callback]
* @param {unknown} [scope]
*/
off(event, callback) {
let listeners = this.#listeners[event];
if (!listeners) {
return;
}
if (!callback) {
for (let handler of listeners.values()) {
this.#events.removeEventListener(event, handler);
off(event, callback, scope) {
let callbacks = [];
for (let [s, scope_listeners] of this.#listeners.entries()) {
if (scope && scope !== s) {
continue;
}
listeners.clear();
return;
for (let [e, listeners] of Object.entries(scope_listeners)) {
if (event && event !== e) {
continue;
}
for (let [cb, handler] of listeners.entries()) {
if (callback && callback !== cb) {
continue;
}
callbacks.push({ event: e, handler });
listeners.delete(cb);
}
if (!listeners.size) {
delete scope_listeners[e];
}
}
if (Object.keys(scope_listeners).length == 0) {
this.#listeners.delete(s);
}
}
for (let { event, handler } of callbacks) {
this.#events.removeEventListener(event, handler);
}
let handler = listeners.get(callback);
if (!handler) return;
this.#events.removeEventListener(event, handler);
listeners.delete(callback);
}

/**
Expand Down
4 changes: 2 additions & 2 deletions packages/anywidget/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export type UpdateMessage = {
};
};
export type EchoUpdateMessage = {
parent_header?: { msg_id: string };
parent_header: { msg_id: string };
buffers?: ReadonlyArray<ArrayBuffer | DataView>;
content: {
data: {
Expand All @@ -83,7 +83,7 @@ export type CustomMessage = {
};
};

export type WidgetManager = { get_model: (model_id: string) => unknown };
export type WidgetManager = { get_model: (model_id: string) => any };
export type FieldSerializer<A, B> = {
serialize: (value: A) => B | Promise<B>;
deserialize: (value: B, widget_manager: WidgetManager) => A | Promise<A>;
Expand Down
9 changes: 9 additions & 0 deletions packages/anywidget/src/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,12 @@ export function is_update_msg(msg) {
return msg.content.data.method === "update" ||
msg.content.data.method === "echo_update";
}

/**
* @param {import("./types.js").UpdateMessage | import("./types.js").EchoUpdateMessage} msg
* @return {msg is import("./types.js").EchoUpdateMessage}
*/
export function is_echo_update_msg(msg) {
return msg.content.data.method === "echo_update" &&
!!msg.parent_header?.msg_id;
}
46 changes: 15 additions & 31 deletions packages/anywidget/src/view.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { assert } from "./util.js";
import * as utils from "./util.js";
import { Widget } from "@lumino/widgets";

/**
Expand Down Expand Up @@ -32,12 +32,12 @@ export class View {
/** @type {Record<string, unknown>} */
options;
/** @type {() => void} */
#remove_callback = () => { };
#remove_callback = () => {};

/** @param {ViewOptions<T>} options */
constructor(options) {
this.el = options.el ?? document.createElement("div");
this.model = options.model;
constructor({ el, model, ...options }) {
this.el = el ?? document.createElement("div");
this.model = model;
this.options = options;
this.luminoWidget = new Widget({ node: this.el });
}
Expand All @@ -48,16 +48,22 @@ export class View {
* @param {() => void} callback
*/
listenTo(model, name, callback) {
assert(name === "destroy", "Only 'destroy' event is supported in `listenTo`.");
model.on(name, callback);
utils.assert(
name === "destroy",
"[anywidget]: Only 'destroy' event is supported in `listenTo`.",
);
model.once("destroy", callback);
}

/**
* @param {"remove"} name
* @param {() => void} callback
*/
once(name, callback) {
assert(name === "remove", "Only 'remove' event is supported in `once`.");
utils.assert(
name === "remove",
"[anywidget]: Only 'remove' event is supported in `once`.",
);
this.#remove_callback = callback;
}

Expand All @@ -66,30 +72,8 @@ export class View {
this.el.remove();
}

/**
* @param {LuminoMessage} msg
*/
processLuminoMessage(msg) {
switch (msg.type) {
case 'after-attach':
this.trigger('displayed');
break;
case 'show':
this.trigger('shown');
break;
}
}

/**
* @param {"displayed" | "shown"} name
*/
trigger(name) {
console.log(name)
}

/**
* Render the view.
*/
async render() { }

async render() {}
}
Loading

0 comments on commit f118747

Please sign in to comment.