From 7831ec5d62ef877056cc457ba3b64b57e5a42263 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Wed, 9 Oct 2024 01:43:09 +0300 Subject: [PATCH] web: reorganize RPC client inheritance --- web/src/api/client.ts | 48 +-------- web/src/api/rpc.ts | 134 ++++++++++++++++++++++-- web/src/api/wsclient.ts | 75 +++---------- web/src/ui/login/LoginScreen.tsx | 4 +- web/src/ui/login/VerificationScreen.tsx | 2 +- 5 files changed, 143 insertions(+), 120 deletions(-) diff --git a/web/src/api/client.ts b/web/src/api/client.ts index 4a77cb1..5db7bb3 100644 --- a/web/src/api/client.ts +++ b/web/src/api/client.ts @@ -15,10 +15,10 @@ // along with this program. If not, see . import { CachedEventDispatcher } from "../util/eventdispatcher.ts" import type { - ClientWellKnown, DBEvent, EventID, EventRowID, EventType, PaginationResponse, RoomID, TimelineRowID, UserID, + RoomID, } from "./types/hitypes.ts" -import { ClientState, RPCEvent } from "./types/hievents.ts" -import { RPCClient } from "./rpc.ts" +import type { ClientState, RPCEvent } from "./types/hievents.ts" +import type RPCClient from "./rpc.ts" import { StateStore } from "./statestore.ts" export default class Client { @@ -39,56 +39,16 @@ export default class Client { } } - request(command: string, data: Req): Promise { - return this.rpc.request(command, data) - } - - sendMessage(room_id: RoomID, event_type: EventType, content: Record): Promise { - return this.request("send_message", { room_id, event_type, content }) - } - - ensureGroupSessionShared(room_id: RoomID): Promise { - return this.request("ensure_group_session_shared", { room_id }) - } - - getEvent(room_id: RoomID, event_id: EventID): Promise { - return this.request("get_event", { room_id, event_id }) - } - - getEventsByRowIDs(row_ids: EventRowID[]): Promise { - return this.request("get_events_by_row_ids", { row_ids }) - } - async loadMoreHistory(roomID: RoomID): Promise { const room = this.store.rooms.get(roomID) if (!room) { throw new Error("Room not found") } const oldestRowID = room.timeline.current[0]?.timeline_rowid - const resp = await this.paginate(roomID, oldestRowID ?? 0, 100) + const resp = await this.rpc.paginate(roomID, oldestRowID ?? 0, 100) if (room.timeline.current[0]?.timeline_rowid !== oldestRowID) { throw new Error("Timeline changed while loading history") } room.applyPagination(resp.events) } - - paginate(room_id: RoomID, max_timeline_id: TimelineRowID, limit: number): Promise { - return this.request("paginate", { room_id, max_timeline_id, limit }) - } - - paginateServer(room_id: RoomID, limit: number): Promise { - return this.request("paginate_server", { room_id, limit }) - } - - discoverHomeserver(user_id: UserID): Promise { - return this.request("discover_homeserver", { user_id }) - } - - login(homeserver_url: string, username: string, password: string): Promise { - return this.request("login", { homeserver_url, username, password }) - } - - verify(recovery_key: string): Promise { - return this.request("verify", { recovery_key }) - } } diff --git a/web/src/api/rpc.ts b/web/src/api/rpc.ts index 61099f3..1556d82 100644 --- a/web/src/api/rpc.ts +++ b/web/src/api/rpc.ts @@ -13,19 +13,133 @@ // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -import { EventDispatcher, CachedEventDispatcher } from "../util/eventdispatcher.ts" +import { CachedEventDispatcher, EventDispatcher } from "../util/eventdispatcher.ts" import { CancellablePromise } from "../util/promise.ts" -import { RPCEvent } from "./types/hievents.ts" - -export interface RPCClient { - connect: CachedEventDispatcher - event: EventDispatcher - start(): void - stop(): void - request(command: string, data: Req): CancellablePromise -} +import type { + ClientWellKnown, + DBEvent, + EventID, + EventRowID, + EventType, + PaginationResponse, + RoomID, + TimelineRowID, UserID, +} from "./types/hitypes.ts" +import { RPCCommand, RPCEvent } from "./types/hievents.ts" export interface ConnectionEvent { connected: boolean error: Error | null } + +export class ErrorResponse extends Error { + constructor(public data: unknown) { + super(`${data}`) + } +} + +export default abstract class RPCClient { + public readonly connect: CachedEventDispatcher = new CachedEventDispatcher() + public readonly event: EventDispatcher = new EventDispatcher() + protected readonly pendingRequests: Map void, + reject: (err: Error) => void + }> = new Map() + protected nextRequestID: number = 1 + + protected abstract isConnected: boolean + protected abstract send(data: string): void + public abstract start(): void + public abstract stop(): void + + protected onCommand(data: RPCCommand) { + if (data.command === "response" || data.command === "error") { + const target = this.pendingRequests.get(data.request_id) + if (!target) { + console.error("Received response for unknown request:", data) + return + } + this.pendingRequests.delete(data.request_id) + if (data.command === "response") { + target.resolve(data.data) + } else { + target.reject(new ErrorResponse(data.data)) + } + } else { + this.event.emit(data as RPCEvent) + } + } + + protected cancelRequest(request_id: number, reason: string) { + if (!this.pendingRequests.has(request_id)) { + console.debug("Tried to cancel unknown request", request_id) + return + } + this.request("cancel", { request_id, reason }).then( + () => console.debug("Cancelled request", request_id, "for", reason), + err => console.debug("Failed to cancel request", request_id, "for", reason, err), + ) + } + + request(command: string, data: Req): CancellablePromise { + if (!this.isConnected) { + return new CancellablePromise((_resolve, reject) => { + reject(new Error("Websocket not connected")) + }, () => { + }) + } + const request_id = this.nextRequestID++ + return new CancellablePromise((resolve, reject) => { + if (!this.isConnected) { + reject(new Error("Websocket not connected")) + return + } + this.pendingRequests.set(request_id, { resolve: resolve as ((value: unknown) => void), reject }) + this.send(JSON.stringify({ + command, + request_id, + data, + })) + }, this.cancelRequest.bind(this, request_id)) + } + + sendMessage(room_id: RoomID, event_type: EventType, content: Record): Promise { + return this.request("send_message", { room_id, event_type, content }) + } + + ensureGroupSessionShared(room_id: RoomID): Promise { + return this.request("ensure_group_session_shared", { room_id }) + } + + getRoomState(room_id: RoomID, fetch_members = false, refetch = false): Promise { + return this.request("get_room_state", { room_id, fetch_members, refetch }) + } + + getEvent(room_id: RoomID, event_id: EventID): Promise { + return this.request("get_event", { room_id, event_id }) + } + + getEventsByRowIDs(row_ids: EventRowID[]): Promise { + return this.request("get_events_by_row_ids", { row_ids }) + } + + paginate(room_id: RoomID, max_timeline_id: TimelineRowID, limit: number): Promise { + return this.request("paginate", { room_id, max_timeline_id, limit }) + } + + paginateServer(room_id: RoomID, limit: number): Promise { + return this.request("paginate_server", { room_id, limit }) + } + + discoverHomeserver(user_id: UserID): Promise { + return this.request("discover_homeserver", { user_id }) + } + + login(homeserver_url: string, username: string, password: string): Promise { + return this.request("login", { homeserver_url, username, password }) + } + + verify(recovery_key: string): Promise { + return this.request("verify", { recovery_key }) + } +} diff --git a/web/src/api/wsclient.ts b/web/src/api/wsclient.ts index 095733e..865bf43 100644 --- a/web/src/api/wsclient.ts +++ b/web/src/api/wsclient.ts @@ -13,29 +13,14 @@ // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -import { CachedEventDispatcher, EventDispatcher } from "../util/eventdispatcher.ts" -import { CancellablePromise } from "../util/promise.ts" -import { RPCCommand, RPCEvent } from "./types/hievents.ts" -import { ConnectionEvent, RPCClient } from "./rpc.ts" +import type { RPCCommand } from "./types/hievents.ts" +import RPCClient from "./rpc.ts" -export class ErrorResponse extends Error { - constructor(public data: unknown) { - super(`${data}`) - } -} - -export default class WSClient implements RPCClient { +export default class WSClient extends RPCClient { #conn: WebSocket | null = null - readonly connect: CachedEventDispatcher = new CachedEventDispatcher() - readonly event: EventDispatcher = new EventDispatcher() - readonly #pendingRequests: Map void, - reject: (err: Error) => void - }> = new Map() - #nextRequestID: number = 1 constructor(readonly addr: string) { - + super() } start() { @@ -55,37 +40,15 @@ export default class WSClient implements RPCClient { this.#conn?.close(1000, "Client closed") } - #cancelRequest(request_id: number, reason: string) { - if (!this.#pendingRequests.has(request_id)) { - console.debug("Tried to cancel unknown request", request_id) - return - } - this.request("cancel", { request_id, reason }).then( - () => console.debug("Cancelled request", request_id, "for", reason), - err => console.debug("Failed to cancel request", request_id, "for", reason, err), - ) + get isConnected() { + return this.#conn?.readyState === WebSocket.OPEN } - request(command: string, data: Req): CancellablePromise { + send(data: string) { if (!this.#conn) { - return new CancellablePromise((_resolve, reject) => { - reject(new Error("Websocket not connected")) - }, () => { - }) + throw new Error("Websocket not connected") } - const request_id = this.#nextRequestID++ - return new CancellablePromise((resolve, reject) => { - if (!this.#conn) { - reject(new Error("Websocket not connected")) - return - } - this.#pendingRequests.set(request_id, { resolve: resolve as ((value: unknown) => void), reject }) - this.#conn.send(JSON.stringify({ - command, - request_id, - data, - })) - }, this.#cancelRequest.bind(this, request_id)) + this.#conn.send(data) } #onMessage = (ev: MessageEvent) => { @@ -101,21 +64,7 @@ export default class WSClient implements RPCClient { this.#conn?.close(1003, "Malformed JSON") return } - if (parsed.command === "response" || parsed.command === "error") { - const target = this.#pendingRequests.get(parsed.request_id) - if (!target) { - console.error("Received response for unknown request:", parsed) - return - } - this.#pendingRequests.delete(parsed.request_id) - if (parsed.command === "response") { - target.resolve(parsed.data) - } else { - target.reject(new ErrorResponse(parsed.data)) - } - } else { - this.event.emit(parsed as RPCEvent) - } + this.onCommand(parsed) } #dispatchConnectionStatus(connected: boolean, error: Error | null) { @@ -128,10 +77,10 @@ export default class WSClient implements RPCClient { } #clearPending = () => { - for (const { reject } of this.#pendingRequests.values()) { + for (const { reject } of this.pendingRequests.values()) { reject(new Error("Websocket closed")) } - this.#pendingRequests.clear() + this.pendingRequests.clear() } #onError = (ev: Event) => { diff --git a/web/src/ui/login/LoginScreen.tsx b/web/src/ui/login/LoginScreen.tsx index c8ad7f4..186fbc0 100644 --- a/web/src/ui/login/LoginScreen.tsx +++ b/web/src/ui/login/LoginScreen.tsx @@ -31,14 +31,14 @@ export const LoginScreen = ({ client }: LoginScreenProps) => { const login = useCallback((evt: React.FormEvent) => { evt.preventDefault() - client.login(homeserverURL, username, password).then( + client.rpc.login(homeserverURL, username, password).then( () => {}, err => setError(err.toString()), ) }, [homeserverURL, username, password, client]) const resolveHomeserver = useCallback(() => { - client.discoverHomeserver(username).then( + client.rpc.discoverHomeserver(username).then( resp => setHomeserverURL(resp["m.homeserver"].base_url), err => setError(`Failed to resolve homeserver: ${err}`), ) diff --git a/web/src/ui/login/VerificationScreen.tsx b/web/src/ui/login/VerificationScreen.tsx index 2333e0f..d094b60 100644 --- a/web/src/ui/login/VerificationScreen.tsx +++ b/web/src/ui/login/VerificationScreen.tsx @@ -26,7 +26,7 @@ export const VerificationScreen = ({ client, clientState }: LoginScreenProps) => const verify = useCallback((evt: React.FormEvent) => { evt.preventDefault() - client.verify(recoveryKey).then( + client.rpc.verify(recoveryKey).then( () => {}, err => setError(err.toString()), )