1
0
Fork 0
forked from Mirrors/gomuks

web: reorganize RPC client inheritance

This commit is contained in:
Tulir Asokan 2024-10-09 01:43:09 +03:00
parent c048eedabe
commit 7831ec5d62
5 changed files with 143 additions and 120 deletions

View file

@ -15,10 +15,10 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
import { CachedEventDispatcher } from "../util/eventdispatcher.ts" import { CachedEventDispatcher } from "../util/eventdispatcher.ts"
import type { import type {
ClientWellKnown, DBEvent, EventID, EventRowID, EventType, PaginationResponse, RoomID, TimelineRowID, UserID, RoomID,
} from "./types/hitypes.ts" } from "./types/hitypes.ts"
import { ClientState, RPCEvent } from "./types/hievents.ts" import type { ClientState, RPCEvent } from "./types/hievents.ts"
import { RPCClient } from "./rpc.ts" import type RPCClient from "./rpc.ts"
import { StateStore } from "./statestore.ts" import { StateStore } from "./statestore.ts"
export default class Client { export default class Client {
@ -39,56 +39,16 @@ export default class Client {
} }
} }
request<Req, Resp>(command: string, data: Req): Promise<Resp> {
return this.rpc.request(command, data)
}
sendMessage(room_id: RoomID, event_type: EventType, content: Record<string, unknown>): Promise<DBEvent> {
return this.request("send_message", { room_id, event_type, content })
}
ensureGroupSessionShared(room_id: RoomID): Promise<boolean> {
return this.request("ensure_group_session_shared", { room_id })
}
getEvent(room_id: RoomID, event_id: EventID): Promise<DBEvent> {
return this.request("get_event", { room_id, event_id })
}
getEventsByRowIDs(row_ids: EventRowID[]): Promise<DBEvent[]> {
return this.request("get_events_by_row_ids", { row_ids })
}
async loadMoreHistory(roomID: RoomID): Promise<void> { async loadMoreHistory(roomID: RoomID): Promise<void> {
const room = this.store.rooms.get(roomID) const room = this.store.rooms.get(roomID)
if (!room) { if (!room) {
throw new Error("Room not found") throw new Error("Room not found")
} }
const oldestRowID = room.timeline.current[0]?.timeline_rowid const oldestRowID = room.timeline.current[0]?.timeline_rowid
const resp = await this.paginate(roomID, oldestRowID ?? 0, 100) const resp = await this.rpc.paginate(roomID, oldestRowID ?? 0, 100)
if (room.timeline.current[0]?.timeline_rowid !== oldestRowID) { if (room.timeline.current[0]?.timeline_rowid !== oldestRowID) {
throw new Error("Timeline changed while loading history") throw new Error("Timeline changed while loading history")
} }
room.applyPagination(resp.events) room.applyPagination(resp.events)
} }
paginate(room_id: RoomID, max_timeline_id: TimelineRowID, limit: number): Promise<PaginationResponse> {
return this.request("paginate", { room_id, max_timeline_id, limit })
}
paginateServer(room_id: RoomID, limit: number): Promise<PaginationResponse> {
return this.request("paginate_server", { room_id, limit })
}
discoverHomeserver(user_id: UserID): Promise<ClientWellKnown> {
return this.request("discover_homeserver", { user_id })
}
login(homeserver_url: string, username: string, password: string): Promise<boolean> {
return this.request("login", { homeserver_url, username, password })
}
verify(recovery_key: string): Promise<boolean> {
return this.request("verify", { recovery_key })
}
} }

View file

@ -13,19 +13,133 @@
// //
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
import { EventDispatcher, CachedEventDispatcher } from "../util/eventdispatcher.ts" import { CachedEventDispatcher, EventDispatcher } from "../util/eventdispatcher.ts"
import { CancellablePromise } from "../util/promise.ts" import { CancellablePromise } from "../util/promise.ts"
import { RPCEvent } from "./types/hievents.ts" import type {
ClientWellKnown,
export interface RPCClient { DBEvent,
connect: CachedEventDispatcher<ConnectionEvent> EventID,
event: EventDispatcher<RPCEvent> EventRowID,
start(): void EventType,
stop(): void PaginationResponse,
request<Req, Resp>(command: string, data: Req): CancellablePromise<Resp> RoomID,
} TimelineRowID, UserID,
} from "./types/hitypes.ts"
import { RPCCommand, RPCEvent } from "./types/hievents.ts"
export interface ConnectionEvent { export interface ConnectionEvent {
connected: boolean connected: boolean
error: Error | null error: Error | null
} }
export class ErrorResponse extends Error {
constructor(public data: unknown) {
super(`${data}`)
}
}
export default abstract class RPCClient {
public readonly connect: CachedEventDispatcher<ConnectionEvent> = new CachedEventDispatcher()
public readonly event: EventDispatcher<RPCEvent> = new EventDispatcher()
protected readonly pendingRequests: Map<number, {
resolve: (data: unknown) => void,
reject: (err: Error) => void
}> = new Map()
protected nextRequestID: number = 1
protected abstract isConnected: boolean
protected abstract send(data: string): void
public abstract start(): void
public abstract stop(): void
protected onCommand(data: RPCCommand<unknown>) {
if (data.command === "response" || data.command === "error") {
const target = this.pendingRequests.get(data.request_id)
if (!target) {
console.error("Received response for unknown request:", data)
return
}
this.pendingRequests.delete(data.request_id)
if (data.command === "response") {
target.resolve(data.data)
} else {
target.reject(new ErrorResponse(data.data))
}
} else {
this.event.emit(data as RPCEvent)
}
}
protected cancelRequest(request_id: number, reason: string) {
if (!this.pendingRequests.has(request_id)) {
console.debug("Tried to cancel unknown request", request_id)
return
}
this.request("cancel", { request_id, reason }).then(
() => console.debug("Cancelled request", request_id, "for", reason),
err => console.debug("Failed to cancel request", request_id, "for", reason, err),
)
}
request<Req, Resp>(command: string, data: Req): CancellablePromise<Resp> {
if (!this.isConnected) {
return new CancellablePromise((_resolve, reject) => {
reject(new Error("Websocket not connected"))
}, () => {
})
}
const request_id = this.nextRequestID++
return new CancellablePromise((resolve, reject) => {
if (!this.isConnected) {
reject(new Error("Websocket not connected"))
return
}
this.pendingRequests.set(request_id, { resolve: resolve as ((value: unknown) => void), reject })
this.send(JSON.stringify({
command,
request_id,
data,
}))
}, this.cancelRequest.bind(this, request_id))
}
sendMessage(room_id: RoomID, event_type: EventType, content: Record<string, unknown>): Promise<DBEvent> {
return this.request("send_message", { room_id, event_type, content })
}
ensureGroupSessionShared(room_id: RoomID): Promise<boolean> {
return this.request("ensure_group_session_shared", { room_id })
}
getRoomState(room_id: RoomID, fetch_members = false, refetch = false): Promise<DBEvent[]> {
return this.request("get_room_state", { room_id, fetch_members, refetch })
}
getEvent(room_id: RoomID, event_id: EventID): Promise<DBEvent> {
return this.request("get_event", { room_id, event_id })
}
getEventsByRowIDs(row_ids: EventRowID[]): Promise<DBEvent[]> {
return this.request("get_events_by_row_ids", { row_ids })
}
paginate(room_id: RoomID, max_timeline_id: TimelineRowID, limit: number): Promise<PaginationResponse> {
return this.request("paginate", { room_id, max_timeline_id, limit })
}
paginateServer(room_id: RoomID, limit: number): Promise<PaginationResponse> {
return this.request("paginate_server", { room_id, limit })
}
discoverHomeserver(user_id: UserID): Promise<ClientWellKnown> {
return this.request("discover_homeserver", { user_id })
}
login(homeserver_url: string, username: string, password: string): Promise<boolean> {
return this.request("login", { homeserver_url, username, password })
}
verify(recovery_key: string): Promise<boolean> {
return this.request("verify", { recovery_key })
}
}

View file

@ -13,29 +13,14 @@
// //
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
import { CachedEventDispatcher, EventDispatcher } from "../util/eventdispatcher.ts" import type { RPCCommand } from "./types/hievents.ts"
import { CancellablePromise } from "../util/promise.ts" import RPCClient from "./rpc.ts"
import { RPCCommand, RPCEvent } from "./types/hievents.ts"
import { ConnectionEvent, RPCClient } from "./rpc.ts"
export class ErrorResponse extends Error { export default class WSClient extends RPCClient {
constructor(public data: unknown) {
super(`${data}`)
}
}
export default class WSClient implements RPCClient {
#conn: WebSocket | null = null #conn: WebSocket | null = null
readonly connect: CachedEventDispatcher<ConnectionEvent> = new CachedEventDispatcher()
readonly event: EventDispatcher<RPCEvent> = new EventDispatcher()
readonly #pendingRequests: Map<number, {
resolve: (data: unknown) => void,
reject: (err: Error) => void
}> = new Map()
#nextRequestID: number = 1
constructor(readonly addr: string) { constructor(readonly addr: string) {
super()
} }
start() { start() {
@ -55,37 +40,15 @@ export default class WSClient implements RPCClient {
this.#conn?.close(1000, "Client closed") this.#conn?.close(1000, "Client closed")
} }
#cancelRequest(request_id: number, reason: string) { get isConnected() {
if (!this.#pendingRequests.has(request_id)) { return this.#conn?.readyState === WebSocket.OPEN
console.debug("Tried to cancel unknown request", request_id)
return
}
this.request("cancel", { request_id, reason }).then(
() => console.debug("Cancelled request", request_id, "for", reason),
err => console.debug("Failed to cancel request", request_id, "for", reason, err),
)
} }
request<Req, Resp>(command: string, data: Req): CancellablePromise<Resp> { send(data: string) {
if (!this.#conn) { if (!this.#conn) {
return new CancellablePromise((_resolve, reject) => { throw new Error("Websocket not connected")
reject(new Error("Websocket not connected"))
}, () => {
})
} }
const request_id = this.#nextRequestID++ this.#conn.send(data)
return new CancellablePromise((resolve, reject) => {
if (!this.#conn) {
reject(new Error("Websocket not connected"))
return
}
this.#pendingRequests.set(request_id, { resolve: resolve as ((value: unknown) => void), reject })
this.#conn.send(JSON.stringify({
command,
request_id,
data,
}))
}, this.#cancelRequest.bind(this, request_id))
} }
#onMessage = (ev: MessageEvent) => { #onMessage = (ev: MessageEvent) => {
@ -101,21 +64,7 @@ export default class WSClient implements RPCClient {
this.#conn?.close(1003, "Malformed JSON") this.#conn?.close(1003, "Malformed JSON")
return return
} }
if (parsed.command === "response" || parsed.command === "error") { this.onCommand(parsed)
const target = this.#pendingRequests.get(parsed.request_id)
if (!target) {
console.error("Received response for unknown request:", parsed)
return
}
this.#pendingRequests.delete(parsed.request_id)
if (parsed.command === "response") {
target.resolve(parsed.data)
} else {
target.reject(new ErrorResponse(parsed.data))
}
} else {
this.event.emit(parsed as RPCEvent)
}
} }
#dispatchConnectionStatus(connected: boolean, error: Error | null) { #dispatchConnectionStatus(connected: boolean, error: Error | null) {
@ -128,10 +77,10 @@ export default class WSClient implements RPCClient {
} }
#clearPending = () => { #clearPending = () => {
for (const { reject } of this.#pendingRequests.values()) { for (const { reject } of this.pendingRequests.values()) {
reject(new Error("Websocket closed")) reject(new Error("Websocket closed"))
} }
this.#pendingRequests.clear() this.pendingRequests.clear()
} }
#onError = (ev: Event) => { #onError = (ev: Event) => {

View file

@ -31,14 +31,14 @@ export const LoginScreen = ({ client }: LoginScreenProps) => {
const login = useCallback((evt: React.FormEvent) => { const login = useCallback((evt: React.FormEvent) => {
evt.preventDefault() evt.preventDefault()
client.login(homeserverURL, username, password).then( client.rpc.login(homeserverURL, username, password).then(
() => {}, () => {},
err => setError(err.toString()), err => setError(err.toString()),
) )
}, [homeserverURL, username, password, client]) }, [homeserverURL, username, password, client])
const resolveHomeserver = useCallback(() => { const resolveHomeserver = useCallback(() => {
client.discoverHomeserver(username).then( client.rpc.discoverHomeserver(username).then(
resp => setHomeserverURL(resp["m.homeserver"].base_url), resp => setHomeserverURL(resp["m.homeserver"].base_url),
err => setError(`Failed to resolve homeserver: ${err}`), err => setError(`Failed to resolve homeserver: ${err}`),
) )

View file

@ -26,7 +26,7 @@ export const VerificationScreen = ({ client, clientState }: LoginScreenProps) =>
const verify = useCallback((evt: React.FormEvent) => { const verify = useCallback((evt: React.FormEvent) => {
evt.preventDefault() evt.preventDefault()
client.verify(recoveryKey).then( client.rpc.verify(recoveryKey).then(
() => {}, () => {},
err => setError(err.toString()), err => setError(err.toString()),
) )