1
0
Fork 0
forked from Mirrors/gomuks

hicli/init: send spaces in first payload

This commit is contained in:
Tulir Asokan 2024-12-28 19:13:21 +02:00
parent 2ea80dac6f
commit 5483b077c7
5 changed files with 72 additions and 27 deletions

View file

@ -27,6 +27,7 @@ const (
FROM room
`
getRoomsBySortingTimestampQuery = getRoomBaseQuery + `WHERE sorting_timestamp < $1 AND sorting_timestamp > 0 ORDER BY sorting_timestamp DESC LIMIT $2`
getRoomsByTypeQuery = getRoomBaseQuery + `WHERE room_type = $1`
getRoomByIDQuery = getRoomBaseQuery + `WHERE room_id = $1`
ensureRoomExistsQuery = `
INSERT INTO room (room_id) VALUES ($1)
@ -96,6 +97,10 @@ func (rq *RoomQuery) GetBySortTS(ctx context.Context, maxTS time.Time, limit int
return rq.QueryMany(ctx, getRoomsBySortingTimestampQuery, maxTS.UnixMilli(), limit)
}
func (rq *RoomQuery) GetAllSpaces(ctx context.Context) ([]*Room, error) {
return rq.QueryMany(ctx, getRoomsByTypeQuery, event.RoomTypeSpace)
}
func (rq *RoomQuery) Upsert(ctx context.Context, room *Room) error {
return rq.Exec(ctx, upsertRoomFromSyncQuery, room.sqlVariables()...)
}

View file

@ -26,13 +26,18 @@ const (
getTopLevelSpaces = `
SELECT space_id
FROM (SELECT DISTINCT(space_id) FROM space_edge) outeredge
LEFT JOIN room_account_data ON
room_account_data.user_id = $1
AND room_account_data.room_id = outeredge.space_id
AND room_account_data.type = 'org.matrix.msc3230.space_order'
WHERE NOT EXISTS(
SELECT 1
FROM space_edge inneredge
INNER JOIN room ON inneredge.space_id = room.room_id
WHERE inneredge.child_id = outeredge.space_id
AND (inneredge.child_event_rowid IS NOT NULL OR inneredge.parent_validated)
)
) AND EXISTS(SELECT 1 FROM room WHERE room_id = space_id AND room_type = 'm.space')
ORDER BY room_account_data.content->>'$.order' NULLS LAST, space_id
`
revalidateAllParents = `
UPDATE space_edge
@ -196,6 +201,12 @@ func (seq *SpaceEdgeQuery) GetAll(ctx context.Context, spaceID id.RoomID) (map[i
return edges, err
}
var roomIDScanner = dbutil.ConvertRowFn[id.RoomID](dbutil.ScanSingleColumn[id.RoomID])
func (seq *SpaceEdgeQuery) GetTopLevelIDs(ctx context.Context, userID id.UserID) ([]id.RoomID, error) {
return roomIDScanner.NewRowIter(seq.GetDB().Query(ctx, getTopLevelSpaces, userID)).AsList()
}
type SpaceEdge struct {
SpaceID id.RoomID `json:"space_id,omitempty"`
ChildID id.RoomID `json:"child_id"`

View file

@ -31,13 +31,14 @@ type SyncNotification struct {
}
type SyncComplete struct {
Since *string `json:"since,omitempty"`
ClearState bool `json:"clear_state,omitempty"`
AccountData map[event.Type]*database.AccountData `json:"account_data"`
Rooms map[id.RoomID]*SyncRoom `json:"rooms"`
LeftRooms []id.RoomID `json:"left_rooms"`
InvitedRooms []*database.InvitedRoom `json:"invited_rooms"`
SpaceEdges map[id.RoomID][]*database.SpaceEdge `json:"space_edges"`
Since *string `json:"since,omitempty"`
ClearState bool `json:"clear_state,omitempty"`
AccountData map[event.Type]*database.AccountData `json:"account_data"`
Rooms map[id.RoomID]*SyncRoom `json:"rooms"`
LeftRooms []id.RoomID `json:"left_rooms"`
InvitedRooms []*database.InvitedRoom `json:"invited_rooms"`
SpaceEdges map[id.RoomID][]*database.SpaceEdge `json:"space_edges"`
TopLevelSpaces []id.RoomID `json:"top_level_spaces"`
}
func (c *SyncComplete) IsEmpty() bool {

View file

@ -66,6 +66,49 @@ func (h *HiClient) getInitialSyncRoom(ctx context.Context, room *database.Room)
func (h *HiClient) GetInitialSync(ctx context.Context, batchSize int) iter.Seq[*SyncComplete] {
return func(yield func(*SyncComplete) bool) {
maxTS := time.Now().Add(1 * time.Hour)
{
spaces, err := h.DB.Room.GetAllSpaces(ctx)
if err != nil {
if ctx.Err() == nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to get initial spaces to send to client")
}
return
}
payload := SyncComplete{
Rooms: make(map[id.RoomID]*SyncRoom, len(spaces)),
}
for _, room := range spaces {
payload.Rooms[room.ID] = h.getInitialSyncRoom(ctx, room)
if ctx.Err() != nil {
return
}
}
payload.TopLevelSpaces, err = h.DB.SpaceEdge.GetTopLevelIDs(ctx, h.Account.UserID)
if err != nil {
if ctx.Err() == nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to get top-level space IDs to send to client")
}
return
}
payload.SpaceEdges, err = h.DB.SpaceEdge.GetAll(ctx, "")
if err != nil {
if ctx.Err() == nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to get space edges to send to client")
}
return
}
payload.InvitedRooms, err = h.DB.InvitedRoom.GetAll(ctx)
if err != nil {
if ctx.Err() == nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to get invited rooms to send to client")
}
return
}
payload.ClearState = true
if !yield(&payload) {
return
}
}
for i := 0; ; i++ {
rooms, err := h.DB.Room.GetBySortTS(ctx, maxTS, batchSize)
if err != nil {
@ -77,24 +120,6 @@ func (h *HiClient) GetInitialSync(ctx context.Context, batchSize int) iter.Seq[*
payload := SyncComplete{
Rooms: make(map[id.RoomID]*SyncRoom, len(rooms)),
}
if i == 0 {
payload.InvitedRooms, err = h.DB.InvitedRoom.GetAll(ctx)
if err != nil {
if ctx.Err() == nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to get invited rooms to send to client")
}
return
}
// TODO include space rooms in first batch too?
payload.SpaceEdges, err = h.DB.SpaceEdge.GetAll(ctx, "")
if err != nil {
if ctx.Err() == nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to get space edges to send to client")
}
return
}
payload.ClearState = true
}
for _, room := range rooms {
if room.SortingTimestamp == rooms[len(rooms)-1].SortingTimestamp {
break
@ -105,7 +130,9 @@ func (h *HiClient) GetInitialSync(ctx context.Context, batchSize int) iter.Seq[*
return
}
}
if !yield(&payload) || len(rooms) < batchSize {
if !yield(&payload) {
return
} else if len(rooms) < batchSize {
break
}
}

View file

@ -92,6 +92,7 @@ export interface SyncCompleteData {
left_rooms: RoomID[] | null
account_data: Record<EventType, DBAccountData> | null
space_edges: Record<RoomID, Omit<DBSpaceEdge, "space_id">[]> | null
top_level_spaces: RoomID[] | null
since?: string
clear_state?: boolean
}