forked from Mirrors/gomuks
web/statestore: fix state subscribers
This commit is contained in:
parent
11e1eef5e2
commit
1f521f8fac
4 changed files with 52 additions and 51 deletions
|
@ -48,14 +48,10 @@ export default class Client {
|
|||
if (typeof room === "string") {
|
||||
room = this.store.rooms.get(room)
|
||||
}
|
||||
if (!room || room.eventsByID.has(eventID)) {
|
||||
if (!room || room.eventsByID.has(eventID) || room.requestedEvents.has(eventID)) {
|
||||
return
|
||||
}
|
||||
const sub = room.getEventSubscriber(eventID)
|
||||
if (sub.requested) {
|
||||
return
|
||||
}
|
||||
sub.requested = true
|
||||
room.requestedEvents.add(eventID)
|
||||
this.rpc.getEvent(room.roomID, eventID).then(
|
||||
evt => room.applyEvent(evt),
|
||||
err => console.error(`Failed to fetch event ${eventID}`, err),
|
||||
|
|
|
@ -28,7 +28,7 @@ export function useRoomState(
|
|||
room: RoomStateStore, type: EventType, stateKey: string | undefined = "",
|
||||
): MemDBEvent | null {
|
||||
return useSyncExternalStore(
|
||||
stateKey === undefined ? noopSubscribe : room.getStateSubscriber(type, stateKey).subscribe,
|
||||
stateKey === undefined ? noopSubscribe : room.stateSubs.getSubscriber(room.stateSubKey(type, stateKey)),
|
||||
stateKey === undefined ? returnNull : (() => room.getStateEvent(type, stateKey) ?? null),
|
||||
)
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ const returnNull = () => null
|
|||
|
||||
export function useRoomEvent(room: RoomStateStore, eventID: EventID | null): MemDBEvent | null {
|
||||
return useSyncExternalStore(
|
||||
eventID ? room.getEventSubscriber(eventID).subscribe : noopSubscribe,
|
||||
eventID ? room.eventSubs.getSubscriber(eventID) : noopSubscribe,
|
||||
eventID ? (() => room.eventsByID.get(eventID) ?? null) : returnNull,
|
||||
)
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
import { NonNullCachedEventDispatcher } from "@/util/eventdispatcher.ts"
|
||||
import Subscribable from "@/util/subscribable.ts"
|
||||
import Subscribable, { MultiSubscribable } from "@/util/subscribable.ts"
|
||||
import type {
|
||||
DBRoom,
|
||||
EncryptedEventContent,
|
||||
|
@ -61,10 +61,6 @@ function visibleMetaIsEqual(meta1: DBRoom, meta2: DBRoom): boolean {
|
|||
meta1.has_member_list === meta2.has_member_list
|
||||
}
|
||||
|
||||
class EventSubscribable extends Subscribable {
|
||||
requested: boolean = false
|
||||
}
|
||||
|
||||
export class RoomStateStore {
|
||||
readonly roomID: RoomID
|
||||
readonly meta: NonNullCachedEventDispatcher<DBRoom>
|
||||
|
@ -75,8 +71,9 @@ export class RoomStateStore {
|
|||
readonly eventsByRowID: Map<EventRowID, MemDBEvent> = new Map()
|
||||
readonly eventsByID: Map<EventID, MemDBEvent> = new Map()
|
||||
readonly timelineSub = new Subscribable()
|
||||
readonly stateSubs: Map<EventID, Subscribable> = new Map()
|
||||
readonly eventSubs: Map<EventID, EventSubscribable> = new Map()
|
||||
readonly stateSubs = new MultiSubscribable()
|
||||
readonly eventSubs = new MultiSubscribable()
|
||||
readonly requestedEvents: Set<EventID> = new Set()
|
||||
readonly openNotifications: Map<EventRowID, Notification> = new Map()
|
||||
readonly pendingEvents: EventRowID[] = []
|
||||
paginating = false
|
||||
|
@ -102,28 +99,8 @@ export class RoomStateStore {
|
|||
this.timelineSub.notify()
|
||||
}
|
||||
|
||||
getEventSubscriber(eventID: EventID): EventSubscribable {
|
||||
let sub = this.eventSubs.get(eventID)
|
||||
if (!sub) {
|
||||
sub = new EventSubscribable(() => this.eventsByID.has(eventID) && this.eventSubs.delete(eventID))
|
||||
this.eventSubs.set(eventID, sub)
|
||||
}
|
||||
return sub
|
||||
}
|
||||
|
||||
notifyStateSubscribers(eventType: EventType, stateKey: string) {
|
||||
const subKey = `${eventType}:${stateKey}`
|
||||
this.stateSubs.get(subKey)?.notify()
|
||||
}
|
||||
|
||||
getStateSubscriber(eventType: EventType, stateKey: string): Subscribable {
|
||||
const subKey = `${eventType}:${stateKey}`
|
||||
let sub = this.stateSubs.get(subKey)
|
||||
if (!sub) {
|
||||
sub = new Subscribable(() => this.stateSubs.delete(subKey))
|
||||
this.stateSubs.set(subKey, sub)
|
||||
}
|
||||
return sub
|
||||
stateSubKey(eventType: EventType, stateKey: string): string {
|
||||
return `${eventType}:${stateKey}`
|
||||
}
|
||||
|
||||
getStateEvent(type: EventType, stateKey: string): MemDBEvent | undefined {
|
||||
|
@ -176,18 +153,19 @@ export class RoomStateStore {
|
|||
content: memEvt.content["m.new_content"],
|
||||
local_content: memEvt.local_content,
|
||||
})
|
||||
this.eventSubs.get(editTarget.event_id)?.notify()
|
||||
this.eventSubs.notify(editTarget.event_id)
|
||||
}
|
||||
}
|
||||
this.eventsByRowID.set(memEvt.rowid, memEvt)
|
||||
this.eventsByID.set(memEvt.event_id, memEvt)
|
||||
this.requestedEvents.delete(memEvt.event_id)
|
||||
if (!pending) {
|
||||
const pendingIdx = this.pendingEvents.indexOf(evt.rowid)
|
||||
const pendingIdx = this.pendingEvents.indexOf(memEvt.rowid)
|
||||
if (pendingIdx !== -1) {
|
||||
this.pendingEvents.splice(pendingIdx, 1)
|
||||
}
|
||||
}
|
||||
this.eventSubs.get(evt.event_id)?.notify()
|
||||
this.eventSubs.notify(memEvt.event_id)
|
||||
}
|
||||
|
||||
applySendComplete(evt: RawDBEvent) {
|
||||
|
@ -216,7 +194,7 @@ export class RoomStateStore {
|
|||
}
|
||||
for (const [key, rowID] of Object.entries(changedEvts)) {
|
||||
stateMap.set(key, rowID)
|
||||
this.notifyStateSubscribers(evtType, key)
|
||||
this.stateSubs.notify(this.stateSubKey(evtType, key))
|
||||
}
|
||||
}
|
||||
if (sync.reset) {
|
||||
|
@ -252,7 +230,7 @@ export class RoomStateStore {
|
|||
this.stateLoaded = true
|
||||
for (const [evtType, stateMap] of newStateMap) {
|
||||
for (const [key] of stateMap) {
|
||||
this.notifyStateSubscribers(evtType, key)
|
||||
this.stateSubs.notify(this.stateSubKey(evtType, key))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,17 +20,9 @@ export type SubscribeFunc = (callback: Subscriber) => () => void
|
|||
export default class Subscribable {
|
||||
readonly subscribers: Set<Subscriber> = new Set()
|
||||
|
||||
constructor(private onEmpty?: () => void) {
|
||||
}
|
||||
|
||||
subscribe: SubscribeFunc = callback => {
|
||||
this.subscribers.add(callback)
|
||||
return () => {
|
||||
this.subscribers.delete(callback)
|
||||
if (this.subscribers.size === 0) {
|
||||
this.onEmpty?.()
|
||||
}
|
||||
}
|
||||
return () => this.subscribers.delete(callback)
|
||||
}
|
||||
|
||||
notify() {
|
||||
|
@ -39,3 +31,38 @@ export default class Subscribable {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
export class MultiSubscribable {
|
||||
readonly subscribers: Map<string, Set<Subscriber>> = new Map()
|
||||
readonly subscribeFuncs: Map<string, SubscribeFunc> = new Map()
|
||||
|
||||
getSubscriber(key: string): SubscribeFunc {
|
||||
let subscribe = this.subscribeFuncs.get(key)
|
||||
if (!subscribe) {
|
||||
const subs = new Set<Subscriber>()
|
||||
subscribe = callback => {
|
||||
subs.add(callback)
|
||||
return () => {
|
||||
subs.delete(callback)
|
||||
// if (subs.size === 0 && Object.is(subs, this.subscribers.get(key))) {
|
||||
// this.subscribers.delete(key)
|
||||
// this.subscribeFuncs.delete(key)
|
||||
// }
|
||||
}
|
||||
}
|
||||
this.subscribers.set(key, subs)
|
||||
this.subscribeFuncs.set(key, subscribe)
|
||||
}
|
||||
return subscribe
|
||||
}
|
||||
|
||||
notify(key: string) {
|
||||
const subs = this.subscribers.get(key)
|
||||
if (!subs) {
|
||||
return
|
||||
}
|
||||
for (const sub of subs) {
|
||||
sub()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue