mirror of
https://github.com/tulir/gomuks.git
synced 2025-04-19 18:13:41 -05:00
websocket: move generating initial sync into hicli
This commit is contained in:
parent
1db1d2db5c
commit
1550d534f8
2 changed files with 81 additions and 63 deletions
79
pkg/hicli/init.go
Normal file
79
pkg/hicli/init.go
Normal file
|
@ -0,0 +1,79 @@
|
||||||
|
package hicli
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"iter"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/rs/zerolog"
|
||||||
|
"maunium.net/go/mautrix/event"
|
||||||
|
"maunium.net/go/mautrix/id"
|
||||||
|
|
||||||
|
"go.mau.fi/gomuks/pkg/hicli/database"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (h *HiClient) getInitialSyncRoom(ctx context.Context, room *database.Room) *SyncRoom {
|
||||||
|
syncRoom := &SyncRoom{
|
||||||
|
Meta: room,
|
||||||
|
Events: make([]*database.Event, 0, 2),
|
||||||
|
Timeline: make([]database.TimelineRowTuple, 0),
|
||||||
|
State: map[event.Type]map[string]database.EventRowID{},
|
||||||
|
Notifications: make([]SyncNotification, 0),
|
||||||
|
}
|
||||||
|
if room.PreviewEventRowID != 0 {
|
||||||
|
previewEvent, err := h.DB.Event.GetByRowID(ctx, room.PreviewEventRowID)
|
||||||
|
if err != nil {
|
||||||
|
zerolog.Ctx(ctx).Err(err).Msg("Failed to get preview event for room")
|
||||||
|
} else if previewEvent != nil {
|
||||||
|
h.ReprocessExistingEvent(ctx, previewEvent)
|
||||||
|
previewMember, err := h.DB.CurrentState.Get(ctx, room.ID, event.StateMember, previewEvent.Sender.String())
|
||||||
|
if err != nil {
|
||||||
|
zerolog.Ctx(ctx).Err(err).Msg("Failed to get preview member event for room")
|
||||||
|
} else if previewMember != nil {
|
||||||
|
syncRoom.Events = append(syncRoom.Events, previewMember)
|
||||||
|
syncRoom.State[event.StateMember] = map[string]database.EventRowID{
|
||||||
|
*previewMember.StateKey: previewMember.RowID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if previewEvent.LastEditRowID != nil {
|
||||||
|
lastEdit, err := h.DB.Event.GetByRowID(ctx, *previewEvent.LastEditRowID)
|
||||||
|
if err != nil {
|
||||||
|
zerolog.Ctx(ctx).Err(err).Msg("Failed to get last edit for preview event")
|
||||||
|
} else if lastEdit != nil {
|
||||||
|
h.ReprocessExistingEvent(ctx, lastEdit)
|
||||||
|
syncRoom.Events = append(syncRoom.Events, lastEdit)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
syncRoom.Events = append(syncRoom.Events, previewEvent)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return syncRoom
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *HiClient) GetInitialSync(ctx context.Context, batchSize int) iter.Seq[*SyncComplete] {
|
||||||
|
return func(yield func(*SyncComplete) bool) {
|
||||||
|
maxTS := time.Now().Add(1 * time.Hour)
|
||||||
|
for {
|
||||||
|
rooms, err := h.DB.Room.GetBySortTS(ctx, maxTS, batchSize)
|
||||||
|
if err != nil {
|
||||||
|
if ctx.Err() == nil {
|
||||||
|
zerolog.Ctx(ctx).Err(err).Msg("Failed to get initial rooms to send to client")
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
payload := SyncComplete{
|
||||||
|
Rooms: make(map[id.RoomID]*SyncRoom, len(rooms)-1),
|
||||||
|
}
|
||||||
|
for _, room := range rooms {
|
||||||
|
if room.SortingTimestamp == rooms[len(rooms)-1].SortingTimestamp {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
maxTS = room.SortingTimestamp.Time
|
||||||
|
payload.Rooms[room.ID] = h.getInitialSyncRoom(ctx, room)
|
||||||
|
}
|
||||||
|
if !yield(&payload) || len(rooms) < batchSize {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
65
websocket.go
65
websocket.go
|
@ -29,11 +29,8 @@ import (
|
||||||
"github.com/coder/websocket"
|
"github.com/coder/websocket"
|
||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
"go.mau.fi/util/exerrors"
|
"go.mau.fi/util/exerrors"
|
||||||
"maunium.net/go/mautrix/event"
|
|
||||||
"maunium.net/go/mautrix/id"
|
|
||||||
|
|
||||||
"go.mau.fi/gomuks/pkg/hicli"
|
"go.mau.fi/gomuks/pkg/hicli"
|
||||||
"go.mau.fi/gomuks/pkg/hicli/database"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func writeCmd(ctx context.Context, conn *websocket.Conn, cmd *hicli.JSONCommand) error {
|
func writeCmd(ctx context.Context, conn *websocket.Conn, cmd *hicli.JSONCommand) error {
|
||||||
|
@ -238,65 +235,10 @@ func (gmx *Gomuks) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gmx *Gomuks) sendInitialData(ctx context.Context, conn *websocket.Conn) {
|
func (gmx *Gomuks) sendInitialData(ctx context.Context, conn *websocket.Conn) {
|
||||||
maxTS := time.Now().Add(1 * time.Hour)
|
|
||||||
log := zerolog.Ctx(ctx)
|
log := zerolog.Ctx(ctx)
|
||||||
var roomCount int
|
var roomCount int
|
||||||
const BatchSize = 100
|
for payload := range gmx.Client.GetInitialSync(ctx, 100) {
|
||||||
for {
|
roomCount += len(payload.Rooms)
|
||||||
rooms, err := gmx.Client.DB.Room.GetBySortTS(ctx, maxTS, BatchSize)
|
|
||||||
if err != nil {
|
|
||||||
if ctx.Err() == nil {
|
|
||||||
log.Err(err).Msg("Failed to get initial rooms to send to client")
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
roomCount += len(rooms)
|
|
||||||
payload := hicli.SyncComplete{
|
|
||||||
Rooms: make(map[id.RoomID]*hicli.SyncRoom, len(rooms)-1),
|
|
||||||
}
|
|
||||||
for _, room := range rooms {
|
|
||||||
if room.SortingTimestamp == rooms[len(rooms)-1].SortingTimestamp {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
maxTS = room.SortingTimestamp.Time
|
|
||||||
syncRoom := &hicli.SyncRoom{
|
|
||||||
Meta: room,
|
|
||||||
Events: make([]*database.Event, 0, 2),
|
|
||||||
Timeline: make([]database.TimelineRowTuple, 0),
|
|
||||||
State: map[event.Type]map[string]database.EventRowID{},
|
|
||||||
Notifications: make([]hicli.SyncNotification, 0),
|
|
||||||
}
|
|
||||||
payload.Rooms[room.ID] = syncRoom
|
|
||||||
if room.PreviewEventRowID != 0 {
|
|
||||||
previewEvent, err := gmx.Client.DB.Event.GetByRowID(ctx, room.PreviewEventRowID)
|
|
||||||
if err != nil {
|
|
||||||
log.Err(err).Msg("Failed to get preview event for room")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if previewEvent != nil {
|
|
||||||
gmx.Client.ReprocessExistingEvent(ctx, previewEvent)
|
|
||||||
previewMember, err := gmx.Client.DB.CurrentState.Get(ctx, room.ID, event.StateMember, previewEvent.Sender.String())
|
|
||||||
if err != nil {
|
|
||||||
log.Err(err).Msg("Failed to get preview member event for room")
|
|
||||||
} else if previewMember != nil {
|
|
||||||
syncRoom.Events = append(syncRoom.Events, previewMember)
|
|
||||||
syncRoom.State[event.StateMember] = map[string]database.EventRowID{
|
|
||||||
*previewMember.StateKey: previewMember.RowID,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if previewEvent.LastEditRowID != nil {
|
|
||||||
lastEdit, err := gmx.Client.DB.Event.GetByRowID(ctx, *previewEvent.LastEditRowID)
|
|
||||||
if err != nil {
|
|
||||||
log.Err(err).Msg("Failed to get last edit for preview event")
|
|
||||||
} else if lastEdit != nil {
|
|
||||||
gmx.Client.ReprocessExistingEvent(ctx, lastEdit)
|
|
||||||
syncRoom.Events = append(syncRoom.Events, lastEdit)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
syncRoom.Events = append(syncRoom.Events, previewEvent)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
marshaledPayload, err := json.Marshal(&payload)
|
marshaledPayload, err := json.Marshal(&payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Err(err).Msg("Failed to marshal initial rooms to send to client")
|
log.Err(err).Msg("Failed to marshal initial rooms to send to client")
|
||||||
|
@ -311,9 +253,6 @@ func (gmx *Gomuks) sendInitialData(ctx context.Context, conn *websocket.Conn) {
|
||||||
log.Err(err).Msg("Failed to send initial rooms to client")
|
log.Err(err).Msg("Failed to send initial rooms to client")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if len(rooms) < BatchSize {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
log.Info().Int("room_count", roomCount).Msg("Sent initial rooms to client")
|
log.Info().Int("room_count", roomCount).Msg("Sent initial rooms to client")
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue