hicli/database: store spaces edges

This commit is contained in:
Tulir Asokan 2024-12-28 18:36:41 +02:00
parent 622bc5d804
commit 326b06c702
10 changed files with 533 additions and 27 deletions

View file

@ -17,16 +17,17 @@ import (
type Database struct {
*dbutil.Database
Account AccountQuery
AccountData AccountDataQuery
Room RoomQuery
InvitedRoom InvitedRoomQuery
Event EventQuery
CurrentState CurrentStateQuery
Timeline TimelineQuery
SessionRequest SessionRequestQuery
Receipt ReceiptQuery
Media MediaQuery
Account *AccountQuery
AccountData *AccountDataQuery
Room *RoomQuery
InvitedRoom *InvitedRoomQuery
Event *EventQuery
CurrentState *CurrentStateQuery
Timeline *TimelineQuery
SessionRequest *SessionRequestQuery
Receipt *ReceiptQuery
Media *MediaQuery
SpaceEdge *SpaceEdgeQuery
}
func New(rawDB *dbutil.Database) *Database {
@ -35,16 +36,17 @@ func New(rawDB *dbutil.Database) *Database {
return &Database{
Database: rawDB,
Account: AccountQuery{QueryHelper: dbutil.MakeQueryHelper(rawDB, newAccount)},
AccountData: AccountDataQuery{QueryHelper: dbutil.MakeQueryHelper(rawDB, newAccountData)},
Room: RoomQuery{QueryHelper: dbutil.MakeQueryHelper(rawDB, newRoom)},
InvitedRoom: InvitedRoomQuery{QueryHelper: dbutil.MakeQueryHelper(rawDB, newInvitedRoom)},
Event: EventQuery{QueryHelper: eventQH},
CurrentState: CurrentStateQuery{QueryHelper: eventQH},
Timeline: TimelineQuery{QueryHelper: eventQH},
SessionRequest: SessionRequestQuery{QueryHelper: dbutil.MakeQueryHelper(rawDB, newSessionRequest)},
Receipt: ReceiptQuery{QueryHelper: dbutil.MakeQueryHelper(rawDB, newReceipt)},
Media: MediaQuery{QueryHelper: dbutil.MakeQueryHelper(rawDB, newMedia)},
Account: &AccountQuery{QueryHelper: dbutil.MakeQueryHelper(rawDB, newAccount)},
AccountData: &AccountDataQuery{QueryHelper: dbutil.MakeQueryHelper(rawDB, newAccountData)},
Room: &RoomQuery{QueryHelper: dbutil.MakeQueryHelper(rawDB, newRoom)},
InvitedRoom: &InvitedRoomQuery{QueryHelper: dbutil.MakeQueryHelper(rawDB, newInvitedRoom)},
Event: &EventQuery{QueryHelper: eventQH},
CurrentState: &CurrentStateQuery{QueryHelper: eventQH},
Timeline: &TimelineQuery{QueryHelper: eventQH},
SessionRequest: &SessionRequestQuery{QueryHelper: dbutil.MakeQueryHelper(rawDB, newSessionRequest)},
Receipt: &ReceiptQuery{QueryHelper: dbutil.MakeQueryHelper(rawDB, newReceipt)},
Media: &MediaQuery{QueryHelper: dbutil.MakeQueryHelper(rawDB, newMedia)},
SpaceEdge: &SpaceEdgeQuery{QueryHelper: dbutil.MakeQueryHelper(rawDB, newSpaceEdge)},
}
}
@ -79,3 +81,7 @@ func newAccountData(_ *dbutil.QueryHelper[*AccountData]) *AccountData {
func newAccount(_ *dbutil.QueryHelper[*Account]) *Account {
return &Account{}
}
func newSpaceEdge(_ *dbutil.QueryHelper[*SpaceEdge]) *SpaceEdge {
return &SpaceEdge{}
}

View file

@ -34,7 +34,8 @@ const (
`
upsertRoomFromSyncQuery = `
UPDATE room
SET creation_content = COALESCE(room.creation_content, $2),
SET room_type = COALESCE(room.room_type, json($2)->>'$.type'),
creation_content = COALESCE(room.creation_content, $2),
tombstone_content = COALESCE(room.tombstone_content, $3),
name = COALESCE($4, room.name),
name_quality = CASE WHEN $4 IS NOT NULL THEN $5 ELSE room.name_quality END,

250
pkg/hicli/database/space.go Normal file
View file

@ -0,0 +1,250 @@
// Copyright (c) 2024 Tulir Asokan
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package database
import (
"context"
"database/sql"
"go.mau.fi/util/dbutil"
"maunium.net/go/mautrix/id"
)
const (
getAllSpaceChildren = `
SELECT space_id, child_id, depth, child_event_rowid, "order", suggested, parent_event_rowid, canonical, parent_validated
FROM space_edge
WHERE (space_id = $1 OR $1 = '') AND depth IS NOT NULL AND (child_event_rowid IS NOT NULL OR parent_validated)
ORDER BY depth, space_id, "order", child_id
`
// language=sqlite - for some reason GoLand doesn't auto-detect SQL when using WITH RECURSIVE
recalculateAllSpaceChildDepths = `
UPDATE space_edge SET depth = NULL;
WITH RECURSIVE
top_level_spaces AS (
SELECT space_id
FROM (SELECT DISTINCT(space_id) FROM space_edge) outeredge
INNER JOIN room ON outeredge.space_id = room.room_id AND room.room_type = 'm.space'
WHERE NOT EXISTS(
SELECT 1
FROM space_edge inneredge
INNER JOIN room ON inneredge.space_id = room.room_id
WHERE inneredge.child_id=outeredge.space_id
AND (inneredge.child_event_rowid IS NOT NULL OR inneredge.parent_validated)
)
),
children AS (
SELECT space_id, child_id, 1 AS depth, space_id AS path
FROM space_edge
WHERE space_id IN top_level_spaces AND (child_event_rowid IS NOT NULL OR parent_validated)
UNION
SELECT se.space_id, se.child_id, c.depth+1, c.path || se.space_id
FROM space_edge se
INNER JOIN children c ON se.space_id = c.child_id
WHERE instr(c.path, se.space_id) = 0
AND c.depth < 10
AND (child_event_rowid IS NOT NULL OR parent_validated)
)
UPDATE space_edge
SET depth = c.depth
FROM children c
WHERE space_edge.space_id = c.space_id AND space_edge.child_id = c.child_id;
`
revalidateAllParents = `
UPDATE space_edge
SET parent_validated=(SELECT EXISTS(
SELECT 1
FROM room
INNER JOIN current_state cs ON cs.room_id = room.room_id AND cs.event_type = 'm.room.power_levels' AND cs.state_key = ''
INNER JOIN event pls ON cs.event_rowid = pls.rowid
INNER JOIN event edgeevt ON space_edge.parent_event_rowid = edgeevt.rowid
WHERE room.room_id = space_edge.space_id
AND room.room_type = 'm.space'
AND COALESCE(
(
SELECT value
FROM json_each(pls.content, 'users')
WHERE key=edgeevt.sender AND type='integer'
),
pls.content->>'$.users_default',
0
) >= COALESCE(
pls.content->>'$.events."m.space.child"',
pls.content->>'$.state_default',
50
)
))
WHERE parent_event_rowid IS NOT NULL
`
revalidateAllParentsPointingAtSpaceQuery = revalidateAllParents + ` AND space_id=$1`
revalidateAllParentsOfRoomQuery = revalidateAllParents + ` AND child_id=$1`
revalidateSpecificParentQuery = revalidateAllParents + ` AND space_id=$1 AND child_id=$2`
clearSpaceChildrenQuery = `
UPDATE space_edge SET child_event_rowid=NULL, "order"=NULL, suggested=false
WHERE space_id=$1
`
clearSpaceParentsQuery = `
UPDATE space_edge SET parent_event_rowid=NULL, canonical=false, parent_validated=false
WHERE child_id=$1
`
deleteEmptySpaceEdgeRowsQuery = `
DELETE FROM space_edge WHERE child_event_rowid IS NULL AND parent_event_rowid IS NULL
`
addSpaceChildQuery = `
INSERT INTO space_edge (space_id, child_id, child_event_rowid, "order", suggested)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (space_id, child_id) DO UPDATE
SET child_event_rowid=EXCLUDED.child_event_rowid,
"order"=EXCLUDED."order",
suggested=EXCLUDED.suggested
`
addSpaceParentQuery = `
INSERT INTO space_edge (space_id, child_id, parent_event_rowid, canonical)
VALUES ($1, $2, $3, $4)
ON CONFLICT (space_id, child_id) DO UPDATE
SET parent_event_rowid=EXCLUDED.parent_event_rowid,
canonical=EXCLUDED.canonical,
parent_validated=false
`
)
var massInsertSpaceParentBuilder = dbutil.NewMassInsertBuilder[SpaceParentEntry, [1]any](addSpaceParentQuery, "($%d, $1, $%d, $%d)")
var massInsertSpaceChildBuilder = dbutil.NewMassInsertBuilder[SpaceChildEntry, [1]any](addSpaceChildQuery, "($1, $%d, $%d, $%d, $%d)")
type SpaceEdgeQuery struct {
*dbutil.QueryHelper[*SpaceEdge]
}
func (seq *SpaceEdgeQuery) AddChild(ctx context.Context, spaceID, childID id.RoomID, childEventRowID EventRowID, order string, suggested bool) error {
return seq.Exec(ctx, addSpaceChildQuery, spaceID, childID, childEventRowID, order, suggested)
}
func (seq *SpaceEdgeQuery) AddParent(ctx context.Context, spaceID, childID id.RoomID, parentEventRowID EventRowID, canonical bool) error {
return seq.Exec(ctx, addSpaceParentQuery, spaceID, childID, parentEventRowID, canonical)
}
type SpaceParentEntry struct {
ParentID id.RoomID
EventRowID EventRowID
Canonical bool
}
func (spe SpaceParentEntry) GetMassInsertValues() [3]any {
return [...]any{spe.ParentID, spe.EventRowID, spe.Canonical}
}
type SpaceChildEntry struct {
ChildID id.RoomID
EventRowID EventRowID
Order string
Suggested bool
}
func (sce SpaceChildEntry) GetMassInsertValues() [4]any {
return [...]any{sce.ChildID, sce.EventRowID, sce.Order, sce.Suggested}
}
func (seq *SpaceEdgeQuery) SetChildren(ctx context.Context, spaceID id.RoomID, children []SpaceChildEntry, removedChildren []id.RoomID, clear bool) error {
if clear {
err := seq.Exec(ctx, clearSpaceChildrenQuery, spaceID)
if err != nil {
return err
}
} else {
}
if len(removedChildren) > 0 {
err := seq.Exec(ctx, deleteEmptySpaceEdgeRowsQuery, spaceID)
if err != nil {
return err
}
}
if len(children) == 0 {
return nil
}
query, params := massInsertSpaceChildBuilder.Build([1]any{spaceID}, children)
return seq.Exec(ctx, query, params)
}
func (seq *SpaceEdgeQuery) SetParents(ctx context.Context, childID id.RoomID, parents []SpaceParentEntry, removedParents []id.RoomID, clear bool) error {
if clear {
err := seq.Exec(ctx, clearSpaceParentsQuery, childID)
if err != nil {
return err
}
}
if len(removedParents) > 0 {
err := seq.Exec(ctx, deleteEmptySpaceEdgeRowsQuery)
if err != nil {
return err
}
}
if len(parents) == 0 {
return nil
}
query, params := massInsertSpaceParentBuilder.Build([1]any{childID}, parents)
return seq.Exec(ctx, query, params)
}
func (seq *SpaceEdgeQuery) RevalidateAllChildrenOfParentValidity(ctx context.Context, spaceID id.RoomID) error {
return seq.Exec(ctx, revalidateAllParentsPointingAtSpaceQuery, spaceID)
}
func (seq *SpaceEdgeQuery) RevalidateAllParentsOfRoomValidity(ctx context.Context, childID id.RoomID) error {
return seq.Exec(ctx, revalidateAllParentsOfRoomQuery, childID)
}
func (seq *SpaceEdgeQuery) RevalidateSpecificParentValidity(ctx context.Context, spaceID, childID id.RoomID) error {
return seq.Exec(ctx, revalidateSpecificParentQuery, spaceID, childID)
}
func (seq *SpaceEdgeQuery) RecalculateAllChildDepths(ctx context.Context) error {
return seq.Exec(ctx, recalculateAllSpaceChildDepths)
}
func (seq *SpaceEdgeQuery) GetAll(ctx context.Context, spaceID id.RoomID) (map[id.RoomID][]*SpaceEdge, error) {
edges := make(map[id.RoomID][]*SpaceEdge)
err := seq.QueryManyIter(ctx, getAllSpaceChildren, spaceID).Iter(func(edge *SpaceEdge) (bool, error) {
edges[edge.SpaceID] = append(edges[edge.SpaceID], edge)
edge.SpaceID = ""
if !edge.ParentValidated {
edge.ParentEventRowID = 0
edge.Canonical = false
}
return true, nil
})
return edges, err
}
type SpaceEdge struct {
SpaceID id.RoomID `json:"space_id,omitempty"`
ChildID id.RoomID `json:"child_id"`
Depth int `json:"-"`
ChildEventRowID EventRowID `json:"child_event_rowid,omitempty"`
Order string `json:"order,omitempty"`
Suggested bool `json:"suggested,omitempty"`
ParentEventRowID EventRowID `json:"parent_event_rowid,omitempty"`
Canonical bool `json:"canonical,omitempty"`
ParentValidated bool `json:"-"`
}
func (se *SpaceEdge) Scan(row dbutil.Scannable) (*SpaceEdge, error) {
var childRowID, parentRowID sql.NullInt64
err := row.Scan(
&se.SpaceID, &se.ChildID, &se.Depth,
&childRowID, &se.Order, &se.Suggested,
&parentRowID, &se.Canonical, &se.ParentValidated,
)
if err != nil {
return nil, err
}
se.ChildEventRowID = EventRowID(childRowID.Int64)
se.ParentEventRowID = EventRowID(parentRowID.Int64)
return se, nil
}

View file

@ -1,4 +1,4 @@
-- v0 -> v9 (compatible with v5+): Latest revision
-- v0 -> v10 (compatible with v10+): Latest revision
CREATE TABLE account (
user_id TEXT NOT NULL PRIMARY KEY,
device_id TEXT NOT NULL,
@ -10,6 +10,7 @@ CREATE TABLE account (
CREATE TABLE room (
room_id TEXT NOT NULL PRIMARY KEY,
room_type TEXT,
creation_content TEXT,
tombstone_content TEXT,
@ -35,7 +36,7 @@ CREATE TABLE room (
CONSTRAINT room_preview_event_fkey FOREIGN KEY (preview_event_rowid) REFERENCES event (rowid) ON DELETE SET NULL
) STRICT;
CREATE INDEX room_type_idx ON room (creation_content ->> 'type');
CREATE INDEX room_type_idx ON room (room_type);
CREATE INDEX room_sorting_timestamp_idx ON room (sorting_timestamp DESC);
CREATE INDEX room_preview_idx ON room (preview_event_rowid);
-- CREATE INDEX room_sorting_timestamp_idx ON room (unread_notifications > 0);
@ -278,3 +279,25 @@ CREATE TABLE receipt (
CONSTRAINT receipt_room_fkey FOREIGN KEY (room_id) REFERENCES room (room_id) ON DELETE CASCADE
-- note: there's no foreign key on event ID because receipts could point at events that are too far in history.
) STRICT;
CREATE TABLE space_edge (
space_id TEXT NOT NULL,
child_id TEXT NOT NULL,
depth INTEGER,
-- m.space.child fields
child_event_rowid INTEGER,
"order" TEXT NOT NULL DEFAULT '',
suggested INTEGER NOT NULL DEFAULT false CHECK ( suggested IN (false, true) ),
-- m.space.parent fields
parent_event_rowid INTEGER,
canonical INTEGER NOT NULL DEFAULT false CHECK ( canonical IN (false, true) ),
parent_validated INTEGER NOT NULL DEFAULT false CHECK ( parent_validated IN (false, true) ),
PRIMARY KEY (space_id, child_id),
CONSTRAINT space_edge_child_event_fkey FOREIGN KEY (child_event_rowid) REFERENCES event (rowid) ON DELETE CASCADE,
CONSTRAINT space_edge_parent_event_fkey FOREIGN KEY (parent_event_rowid) REFERENCES event (rowid) ON DELETE CASCADE,
CONSTRAINT space_edge_child_event_unique UNIQUE (child_event_rowid),
CONSTRAINT space_edge_parent_event_unique UNIQUE (parent_event_rowid)
) STRICT;
CREATE INDEX space_edge_child_idx ON space_edge (child_id);

View file

@ -0,0 +1,113 @@
-- v10 (compatible with v10+): Add support for spaces
ALTER TABLE room ADD COLUMN room_type TEXT;
UPDATE room SET room_type=COALESCE(creation_content->>'$.type', '');
DROP INDEX room_type_idx;
CREATE INDEX room_type_idx ON room (room_type);
CREATE TABLE space_edge (
space_id TEXT NOT NULL,
child_id TEXT NOT NULL,
depth INTEGER,
-- m.space.child fields
child_event_rowid INTEGER,
"order" TEXT NOT NULL DEFAULT '',
suggested INTEGER NOT NULL DEFAULT false CHECK ( suggested IN (false, true) ),
-- m.space.parent fields
parent_event_rowid INTEGER,
canonical INTEGER NOT NULL DEFAULT false CHECK ( canonical IN (false, true) ),
parent_validated INTEGER NOT NULL DEFAULT false CHECK ( parent_validated IN (false, true) ),
PRIMARY KEY (space_id, child_id),
CONSTRAINT space_edge_child_event_fkey FOREIGN KEY (child_event_rowid) REFERENCES event (rowid) ON DELETE CASCADE,
CONSTRAINT space_edge_parent_event_fkey FOREIGN KEY (parent_event_rowid) REFERENCES event (rowid) ON DELETE CASCADE,
CONSTRAINT space_edge_child_event_unique UNIQUE (child_event_rowid),
CONSTRAINT space_edge_parent_event_unique UNIQUE (parent_event_rowid)
) STRICT;
CREATE INDEX space_edge_child_idx ON space_edge (child_id);
INSERT INTO space_edge (space_id, child_id, child_event_rowid, "order", suggested)
SELECT
event.room_id,
event.state_key,
event.rowid,
CASE WHEN typeof(content->>'$.order')='TEXT' THEN content->>'$.order' ELSE '' END,
CASE WHEN json_type(content, '$.suggested') IN ('true', 'false') THEN content->>'$.suggested' ELSE false END
FROM current_state
INNER JOIN event ON current_state.event_rowid = event.rowid
LEFT JOIN room ON current_state.room_id = room.room_id
WHERE type = 'm.space.child'
AND json_array_length(event.content, '$.via') > 0
AND event.state_key LIKE '!%'
AND (room.room_id IS NULL OR room.room_type = 'm.space');
INSERT INTO space_edge (space_id, child_id, parent_event_rowid, canonical)
SELECT
event.state_key,
event.room_id,
event.rowid,
CASE WHEN json_type(content, '$.canonical') IN ('true', 'false') THEN content->>'$.canonical' ELSE false END
FROM current_state
INNER JOIN event ON current_state.event_rowid = event.rowid
LEFT JOIN room ON event.state_key = room.room_id
WHERE type = 'm.space.parent'
AND json_array_length(event.content, '$.via') > 0
AND event.state_key LIKE '!%'
AND (room.room_id IS NULL OR room.room_type = 'm.space')
ON CONFLICT (space_id, child_id) DO UPDATE
SET parent_event_rowid = excluded.parent_event_rowid,
canonical = excluded.canonical;
UPDATE space_edge
SET parent_validated=(SELECT EXISTS(
SELECT 1
FROM room
INNER JOIN current_state cs ON cs.room_id = room.room_id AND cs.event_type = 'm.room.power_levels' AND cs.state_key = ''
INNER JOIN event pls ON cs.event_rowid = pls.rowid
INNER JOIN event edgeevt ON space_edge.parent_event_rowid = edgeevt.rowid
WHERE room.room_id = space_edge.space_id
AND room.room_type = 'm.space'
AND COALESCE(
(
SELECT value
FROM json_each(pls.content, '$.users')
WHERE key=edgeevt.sender AND type='integer'
),
pls.content->>'$.users_default',
0
) >= COALESCE(
pls.content->>'$.events."m.space.child"',
pls.content->>'$.state_default',
50
)
))
WHERE parent_event_rowid IS NOT NULL;
WITH RECURSIVE
top_level_spaces AS (
SELECT space_id
FROM (SELECT DISTINCT(space_id) FROM space_edge) outeredge
WHERE NOT EXISTS(
SELECT 1
FROM space_edge inneredge
INNER JOIN room ON inneredge.space_id = room.room_id
WHERE inneredge.child_id=outeredge.space_id
AND (inneredge.child_event_rowid IS NOT NULL OR inneredge.parent_validated)
)
),
children AS (
SELECT space_id, child_id, 1 AS depth, space_id AS path
FROM space_edge
WHERE space_id IN top_level_spaces AND (child_event_rowid IS NOT NULL OR parent_validated)
UNION
SELECT se.space_id, se.child_id, c.depth+1, c.path || se.space_id
FROM space_edge se
INNER JOIN children c ON se.space_id=c.child_id
WHERE instr(c.path, se.space_id)=0
AND c.depth < 10
AND (child_event_rowid IS NOT NULL OR parent_validated)
)
UPDATE space_edge
SET depth = c.depth
FROM children c
WHERE space_edge.space_id = c.space_id AND space_edge.child_id = c.child_id;

View file

@ -37,6 +37,7 @@ type SyncComplete struct {
Rooms map[id.RoomID]*SyncRoom `json:"rooms"`
LeftRooms []id.RoomID `json:"left_rooms"`
InvitedRooms []*database.InvitedRoom `json:"invited_rooms"`
SpaceEdges map[id.RoomID][]*database.SpaceEdge `json:"space_edges"`
}
func (c *SyncComplete) IsEmpty() bool {

View file

@ -91,11 +91,21 @@ func (h *HiClient) GetInitialSync(ctx context.Context, batchSize int) iter.Seq[*
}
return
}
payload.SpaceEdges, err = h.DB.SpaceEdge.GetAll(ctx, "")
if err != nil {
if ctx.Err() == nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to get space edges to send to client")
}
return
}
payload.ClearState = true
}
if payload.InvitedRooms == nil {
payload.InvitedRooms = make([]*database.InvitedRoom, 0)
}
if payload.SpaceEdges == nil {
payload.SpaceEdges = make(map[id.RoomID][]*database.SpaceEdge)
}
for _, room := range rooms {
if room.SortingTimestamp == rooms[len(rooms)-1].SortingTimestamp {
break

View file

@ -121,13 +121,14 @@ func (h *HiClient) processGetRoomState(ctx context.Context, roomID id.RoomID, fe
if err != nil {
return fmt.Errorf("failed to save events: %w", err)
}
sdc := &spaceDataCollector{}
for i := range currentStateEntries {
currentStateEntries[i].EventRowID = dbEvts[i].RowID
if mediaReferenceEntries[i] != nil {
mediaReferenceEntries[i].EventRowID = dbEvts[i].RowID
}
if evts[i].Type != event.StateMember {
processImportantEvent(ctx, evts[i], room, updatedRoom)
processImportantEvent(ctx, evts[i], room, updatedRoom, dbEvts[i].RowID, sdc)
}
}
err = h.DB.Media.AddMany(ctx, mediaCacheEntries)
@ -146,6 +147,10 @@ func (h *HiClient) processGetRoomState(ctx context.Context, roomID id.RoomID, fe
return fmt.Errorf("failed to save current state entries: %w", err)
}
roomChanged := updatedRoom.CheckChangesAndCopyInto(room)
err = sdc.Apply(ctx, room, h.DB.SpaceEdge)
if err != nil {
return err
}
if roomChanged {
err = h.DB.Room.Upsert(ctx, updatedRoom)
if err != nil {
@ -168,6 +173,8 @@ func (h *HiClient) processGetRoomState(ctx context.Context, roomID id.RoomID, fe
InvitedRooms: make([]*database.InvitedRoom, 0),
AccountData: make(map[event.Type]*database.AccountData),
LeftRooms: make([]id.RoomID, 0),
// TODO dispatch space edge changes if something changed? (fairly unlikely though)
SpaceEdges: make(map[id.RoomID][]*database.SpaceEdge),
})
}
}

View file

@ -667,6 +667,7 @@ func (h *HiClient) processStateAndTimeline(
updatedRoom.LazyLoadSummary = summary
heroesChanged = true
}
sdc := &spaceDataCollector{}
decryptionQueue := make(map[id.SessionID]*database.SessionRequest)
allNewEvents := make([]*database.Event, 0, len(state.Events)+len(timeline.Events))
addedEvents := make(map[database.EventRowID]struct{})
@ -750,7 +751,7 @@ func (h *HiClient) processStateAndTimeline(
if err != nil {
return -1, fmt.Errorf("failed to save current state event ID %s for %s/%s: %w", evt.ID, evt.Type.Type, *evt.StateKey, err)
}
processImportantEvent(ctx, evt, room, updatedRoom)
processImportantEvent(ctx, evt, room, updatedRoom, dbEvt.RowID, sdc)
}
allNewEvents = append(allNewEvents, dbEvt)
addedEvents[dbEvt.RowID] = struct{}{}
@ -932,6 +933,10 @@ func (h *HiClient) processStateAndTimeline(
return fmt.Errorf("failed to save room data: %w", err)
}
}
err = sdc.Apply(ctx, room, h.DB.SpaceEdge)
if err != nil {
return err
}
// TODO why is *old* unread count sometimes zero when processing the read receipt that is making it zero?
if roomChanged || len(accountData) > 0 || len(newOwnReceipts) > 0 || len(receipts) > 0 || len(timelineRowTuples) > 0 || len(allNewEvents) > 0 {
for _, receipt := range receipts {
@ -1033,13 +1038,101 @@ func intPtrEqual(a, b *int) bool {
return *a == *b
}
func processImportantEvent(ctx context.Context, evt *event.Event, existingRoomData, updatedRoom *database.Room) (roomDataChanged bool) {
type spaceDataCollector struct {
Children []database.SpaceChildEntry
Parents []database.SpaceParentEntry
RemovedChildren []id.RoomID
RemovedParents []id.RoomID
PowerLevelChanged bool
IsFullState bool
}
func (sdc *spaceDataCollector) Collect(evt *event.Event, rowID database.EventRowID) {
switch evt.Type {
case event.StatePowerLevels:
sdc.PowerLevelChanged = true
case event.StateCreate:
sdc.IsFullState = true
case event.StateSpaceChild:
content := evt.Content.AsSpaceChild()
if len(content.Via) == 0 {
sdc.RemovedChildren = append(sdc.RemovedChildren, id.RoomID(*evt.StateKey))
} else {
sdc.Children = append(sdc.Children, database.SpaceChildEntry{
ChildID: id.RoomID(*evt.StateKey),
EventRowID: rowID,
Order: content.Order,
Suggested: content.Suggested,
})
}
case event.StateSpaceParent:
content := evt.Content.AsSpaceParent()
if len(content.Via) == 0 {
sdc.RemovedParents = append(sdc.RemovedParents, id.RoomID(*evt.StateKey))
} else {
sdc.Parents = append(sdc.Parents, database.SpaceParentEntry{
ParentID: id.RoomID(*evt.StateKey),
EventRowID: rowID,
Canonical: content.Canonical,
})
}
}
}
func (sdc *spaceDataCollector) Apply(ctx context.Context, room *database.Room, seq *database.SpaceEdgeQuery) error {
if room.CreationContent == nil || room.CreationContent.Type != event.RoomTypeSpace {
sdc.Children = nil
sdc.RemovedChildren = nil
sdc.PowerLevelChanged = false
}
if len(sdc.Children) == 0 && len(sdc.RemovedChildren) == 0 &&
len(sdc.Parents) == 0 && len(sdc.RemovedParents) == 0 &&
!sdc.PowerLevelChanged {
return nil
}
return seq.GetDB().DoTxn(ctx, nil, func(ctx context.Context) error {
if len(sdc.Children) > 0 || len(sdc.RemovedChildren) > 0 {
err := seq.SetChildren(ctx, room.ID, sdc.Children, sdc.RemovedChildren, sdc.IsFullState)
if err != nil {
return fmt.Errorf("failed to set space children: %w", err)
}
}
if len(sdc.Parents) > 0 || len(sdc.RemovedParents) > 0 {
err := seq.SetParents(ctx, room.ID, sdc.Parents, sdc.RemovedParents, sdc.IsFullState)
if err != nil {
return fmt.Errorf("failed to set space parents: %w", err)
}
if len(sdc.Parents) > 0 {
err = seq.RevalidateAllParentsOfRoomValidity(ctx, room.ID)
if err != nil {
return fmt.Errorf("failed to revalidate own parent references: %w", err)
}
}
}
if sdc.PowerLevelChanged {
err := seq.RevalidateAllChildrenOfParentValidity(ctx, room.ID)
if err != nil {
return fmt.Errorf("failed to revalidate child parent references to self: %w", err)
}
}
return nil
})
}
func processImportantEvent(
ctx context.Context,
evt *event.Event,
existingRoomData, updatedRoom *database.Room,
rowID database.EventRowID,
sdc *spaceDataCollector,
) (roomDataChanged bool) {
if evt.StateKey == nil {
return
}
switch evt.Type {
case event.StateCreate, event.StateTombstone, event.StateRoomName, event.StateCanonicalAlias,
event.StateRoomAvatar, event.StateTopic, event.StateEncryption:
event.StateRoomAvatar, event.StateTopic, event.StateEncryption,
event.StateSpaceChild, event.StateSpaceParent, event.StatePowerLevels:
if *evt.StateKey != "" {
return
}
@ -1047,6 +1140,7 @@ func processImportantEvent(ctx context.Context, evt *event.Event, existingRoomDa
return
}
err := evt.Content.ParseRaw(evt.Type)
sdc.Collect(evt, rowID)
if err != nil && !errors.Is(err, event.ErrContentAlreadyParsed) {
zerolog.Ctx(ctx).Warn().Err(err).
Stringer("event_type", &evt.Type).

View file

@ -38,6 +38,7 @@ func (h *hiSyncer) ProcessResponse(ctx context.Context, resp *mautrix.RespSync,
Rooms: make(map[id.RoomID]*SyncRoom, len(resp.Rooms.Join)),
InvitedRooms: make([]*database.InvitedRoom, 0, len(resp.Rooms.Invite)),
LeftRooms: make([]id.RoomID, 0, len(resp.Rooms.Leave)),
SpaceEdges: make(map[id.RoomID][]*database.SpaceEdge),
}})
err := c.preProcessSyncResponse(ctx, resp, since)
if err != nil {