From 1550d534f859769d435cda5b113848d64f4a4d3f Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Thu, 17 Oct 2024 20:37:38 +0300 Subject: [PATCH] websocket: move generating initial sync into hicli --- pkg/hicli/init.go | 79 +++++++++++++++++++++++++++++++++++++++++++++++ websocket.go | 65 ++------------------------------------ 2 files changed, 81 insertions(+), 63 deletions(-) create mode 100644 pkg/hicli/init.go diff --git a/pkg/hicli/init.go b/pkg/hicli/init.go new file mode 100644 index 0000000..3b6175a --- /dev/null +++ b/pkg/hicli/init.go @@ -0,0 +1,79 @@ +package hicli + +import ( + "context" + "iter" + "time" + + "github.com/rs/zerolog" + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" + + "go.mau.fi/gomuks/pkg/hicli/database" +) + +func (h *HiClient) getInitialSyncRoom(ctx context.Context, room *database.Room) *SyncRoom { + syncRoom := &SyncRoom{ + Meta: room, + Events: make([]*database.Event, 0, 2), + Timeline: make([]database.TimelineRowTuple, 0), + State: map[event.Type]map[string]database.EventRowID{}, + Notifications: make([]SyncNotification, 0), + } + if room.PreviewEventRowID != 0 { + previewEvent, err := h.DB.Event.GetByRowID(ctx, room.PreviewEventRowID) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to get preview event for room") + } else if previewEvent != nil { + h.ReprocessExistingEvent(ctx, previewEvent) + previewMember, err := h.DB.CurrentState.Get(ctx, room.ID, event.StateMember, previewEvent.Sender.String()) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to get preview member event for room") + } else if previewMember != nil { + syncRoom.Events = append(syncRoom.Events, previewMember) + syncRoom.State[event.StateMember] = map[string]database.EventRowID{ + *previewMember.StateKey: previewMember.RowID, + } + } + if previewEvent.LastEditRowID != nil { + lastEdit, err := h.DB.Event.GetByRowID(ctx, *previewEvent.LastEditRowID) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to get last edit for preview event") + } else if lastEdit != nil { + h.ReprocessExistingEvent(ctx, lastEdit) + syncRoom.Events = append(syncRoom.Events, lastEdit) + } + } + syncRoom.Events = append(syncRoom.Events, previewEvent) + } + } + return syncRoom +} + +func (h *HiClient) GetInitialSync(ctx context.Context, batchSize int) iter.Seq[*SyncComplete] { + return func(yield func(*SyncComplete) bool) { + maxTS := time.Now().Add(1 * time.Hour) + for { + rooms, err := h.DB.Room.GetBySortTS(ctx, maxTS, batchSize) + if err != nil { + if ctx.Err() == nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to get initial rooms to send to client") + } + return + } + payload := SyncComplete{ + Rooms: make(map[id.RoomID]*SyncRoom, len(rooms)-1), + } + for _, room := range rooms { + if room.SortingTimestamp == rooms[len(rooms)-1].SortingTimestamp { + break + } + maxTS = room.SortingTimestamp.Time + payload.Rooms[room.ID] = h.getInitialSyncRoom(ctx, room) + } + if !yield(&payload) || len(rooms) < batchSize { + break + } + } + } +} diff --git a/websocket.go b/websocket.go index d6a89ba..15c6846 100644 --- a/websocket.go +++ b/websocket.go @@ -29,11 +29,8 @@ import ( "github.com/coder/websocket" "github.com/rs/zerolog" "go.mau.fi/util/exerrors" - "maunium.net/go/mautrix/event" - "maunium.net/go/mautrix/id" "go.mau.fi/gomuks/pkg/hicli" - "go.mau.fi/gomuks/pkg/hicli/database" ) func writeCmd(ctx context.Context, conn *websocket.Conn, cmd *hicli.JSONCommand) error { @@ -238,65 +235,10 @@ func (gmx *Gomuks) HandleWebsocket(w http.ResponseWriter, r *http.Request) { } func (gmx *Gomuks) sendInitialData(ctx context.Context, conn *websocket.Conn) { - maxTS := time.Now().Add(1 * time.Hour) log := zerolog.Ctx(ctx) var roomCount int - const BatchSize = 100 - for { - rooms, err := gmx.Client.DB.Room.GetBySortTS(ctx, maxTS, BatchSize) - if err != nil { - if ctx.Err() == nil { - log.Err(err).Msg("Failed to get initial rooms to send to client") - } - return - } - roomCount += len(rooms) - payload := hicli.SyncComplete{ - Rooms: make(map[id.RoomID]*hicli.SyncRoom, len(rooms)-1), - } - for _, room := range rooms { - if room.SortingTimestamp == rooms[len(rooms)-1].SortingTimestamp { - break - } - maxTS = room.SortingTimestamp.Time - syncRoom := &hicli.SyncRoom{ - Meta: room, - Events: make([]*database.Event, 0, 2), - Timeline: make([]database.TimelineRowTuple, 0), - State: map[event.Type]map[string]database.EventRowID{}, - Notifications: make([]hicli.SyncNotification, 0), - } - payload.Rooms[room.ID] = syncRoom - if room.PreviewEventRowID != 0 { - previewEvent, err := gmx.Client.DB.Event.GetByRowID(ctx, room.PreviewEventRowID) - if err != nil { - log.Err(err).Msg("Failed to get preview event for room") - return - } - if previewEvent != nil { - gmx.Client.ReprocessExistingEvent(ctx, previewEvent) - previewMember, err := gmx.Client.DB.CurrentState.Get(ctx, room.ID, event.StateMember, previewEvent.Sender.String()) - if err != nil { - log.Err(err).Msg("Failed to get preview member event for room") - } else if previewMember != nil { - syncRoom.Events = append(syncRoom.Events, previewMember) - syncRoom.State[event.StateMember] = map[string]database.EventRowID{ - *previewMember.StateKey: previewMember.RowID, - } - } - if previewEvent.LastEditRowID != nil { - lastEdit, err := gmx.Client.DB.Event.GetByRowID(ctx, *previewEvent.LastEditRowID) - if err != nil { - log.Err(err).Msg("Failed to get last edit for preview event") - } else if lastEdit != nil { - gmx.Client.ReprocessExistingEvent(ctx, lastEdit) - syncRoom.Events = append(syncRoom.Events, lastEdit) - } - } - syncRoom.Events = append(syncRoom.Events, previewEvent) - } - } - } + for payload := range gmx.Client.GetInitialSync(ctx, 100) { + roomCount += len(payload.Rooms) marshaledPayload, err := json.Marshal(&payload) if err != nil { log.Err(err).Msg("Failed to marshal initial rooms to send to client") @@ -311,9 +253,6 @@ func (gmx *Gomuks) sendInitialData(ctx context.Context, conn *websocket.Conn) { log.Err(err).Msg("Failed to send initial rooms to client") return } - if len(rooms) < BatchSize { - break - } } log.Info().Int("room_count", roomCount).Msg("Sent initial rooms to client") }