forked from nkuntz1934/matrix-workers
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsync.ts
More file actions
594 lines (510 loc) · 19 KB
/
sync.ts
File metadata and controls
594 lines (510 loc) · 19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
// Matrix sync endpoint
import { Hono } from 'hono';
import type { AppEnv, SyncResponse, JoinedRoom, InvitedRoom, LeftRoom, Env } from '../types';
import { requireAuth } from '../middleware/auth';
import {
getUserRooms,
getRoomState,
getEventsSince,
getLatestStreamPosition,
} from '../services/database';
import { getToDeviceMessages } from './to-device';
import {
getGlobalAccountData,
getRoomAccountData,
} from './account-data';
import { getReceiptsForRoom } from './receipts';
import { getTypingUsers } from './typing';
// ============================================
// Sync Filter Types and Helpers
// ============================================
interface EventFilter {
types?: string[];
not_types?: string[];
senders?: string[];
not_senders?: string[];
limit?: number;
}
interface RoomFilter {
rooms?: string[];
not_rooms?: string[];
timeline?: EventFilter;
state?: EventFilter;
ephemeral?: EventFilter;
account_data?: EventFilter;
include_leave?: boolean;
}
interface SyncFilter {
room?: RoomFilter;
presence?: EventFilter;
account_data?: EventFilter;
event_format?: 'client' | 'federation';
event_fields?: string[];
}
// Load a filter from KV storage or parse inline JSON
async function loadFilter(env: Env, userId: string, filterParam?: string): Promise<SyncFilter | null> {
if (!filterParam) return null;
// Check if it's inline JSON (starts with '{')
if (filterParam.startsWith('{')) {
try {
return JSON.parse(filterParam);
} catch {
console.warn('[sync] Failed to parse inline filter JSON');
return null;
}
}
// Otherwise it's a filter ID - load from KV
const filterJson = await env.CACHE.get(`filter:${userId}:${filterParam}`);
if (!filterJson) {
console.warn('[sync] Filter not found:', filterParam);
return null;
}
try {
return JSON.parse(filterJson);
} catch {
console.warn('[sync] Failed to parse stored filter JSON');
return null;
}
}
// Apply an event filter to a list of events
function applyEventFilter(events: any[], filter?: EventFilter): any[] {
if (!filter) return events;
let result = events.filter(event => {
// Filter by type whitelist
if (filter.types && filter.types.length > 0) {
const matches = filter.types.some(pattern => {
if (pattern.endsWith('*')) {
return event.type.startsWith(pattern.slice(0, -1));
}
return event.type === pattern;
});
if (!matches) return false;
}
// Filter by type blacklist
if (filter.not_types && filter.not_types.length > 0) {
const excluded = filter.not_types.some(pattern => {
if (pattern.endsWith('*')) {
return event.type.startsWith(pattern.slice(0, -1));
}
return event.type === pattern;
});
if (excluded) return false;
}
// Filter by sender whitelist
if (filter.senders && filter.senders.length > 0) {
if (!filter.senders.includes(event.sender)) return false;
}
// Filter by sender blacklist
if (filter.not_senders && filter.not_senders.length > 0) {
if (filter.not_senders.includes(event.sender)) return false;
}
return true;
});
// Apply limit
if (filter.limit && filter.limit > 0) {
result = result.slice(0, filter.limit);
}
return result;
}
// Check if a room should be included based on room filter
function shouldIncludeRoom(roomId: string, filter?: RoomFilter): boolean {
if (!filter) return true;
// Room whitelist
if (filter.rooms && filter.rooms.length > 0) {
if (!filter.rooms.includes(roomId)) return false;
}
// Room blacklist
if (filter.not_rooms && filter.not_rooms.length > 0) {
if (filter.not_rooms.includes(roomId)) return false;
}
return true;
}
// Helper to get one-time key counts for a device
async function getOneTimeKeyCounts(
db: D1Database,
userId: string,
deviceId: string
): Promise<Record<string, number>> {
const counts = await db.prepare(`
SELECT algorithm, COUNT(*) as count
FROM one_time_keys
WHERE user_id = ? AND device_id = ? AND claimed = 0
GROUP BY algorithm
`).bind(userId, deviceId).all<{ algorithm: string; count: number }>();
const result: Record<string, number> = {};
for (const row of counts.results) {
result[row.algorithm] = row.count;
}
return result;
}
// Helper to get unused fallback key types for a device
async function getUnusedFallbackKeyTypes(
db: D1Database,
userId: string,
deviceId: string
): Promise<string[]> {
const keys = await db.prepare(`
SELECT DISTINCT algorithm
FROM fallback_keys
WHERE user_id = ? AND device_id = ? AND used = 0
`).bind(userId, deviceId).all<{ algorithm: string }>();
return keys.results.map(row => row.algorithm);
}
// Helper to get device list changes (users whose keys have changed since last sync)
async function getDeviceListChanges(
db: D1Database,
userId: string,
sincePosition: number
): Promise<{ changed: string[]; left: string[] }> {
// Get users in shared rooms whose device keys have changed
// Note: We now include the user's own changes as well, because
// cross-signing signature uploads need to trigger key refresh
const otherUsersChanged = await db.prepare(`
SELECT DISTINCT dkc.user_id
FROM device_key_changes dkc
WHERE dkc.stream_position > ?
AND dkc.user_id != ?
AND EXISTS (
SELECT 1 FROM room_memberships rm1
JOIN room_memberships rm2 ON rm1.room_id = rm2.room_id
WHERE rm1.user_id = ? AND rm1.membership = 'join'
AND rm2.user_id = dkc.user_id AND rm2.membership = 'join'
)
`).bind(sincePosition, userId, userId).all<{ user_id: string }>();
// Check if the user's own keys have changed (for cross-signing signatures)
const selfChanged = await db.prepare(`
SELECT COUNT(*) as count
FROM device_key_changes dkc
WHERE dkc.stream_position > ?
AND dkc.user_id = ?
`).bind(sincePosition, userId).first<{ count: number }>();
const changedUsers = otherUsersChanged.results.map(row => row.user_id);
// Include self in changed list if own keys updated (for cross-signing verification)
if (selfChanged && selfChanged.count > 0) {
changedUsers.push(userId);
}
// For left, we'd track users who left shared rooms, but for simplicity return empty for now
return {
changed: changedUsers,
left: [],
};
}
const app = new Hono<AppEnv>();
// GET /_matrix/client/v3/sync - Sync with server
// Parse composite sync token: "s{events}_td{to_device}" or legacy plain number
function parseSyncToken(token: string | undefined): { events: number; toDevice: number } {
if (!token) {
return { events: 0, toDevice: 0 };
}
// Try composite format first: s84_td119
const match = token.match(/^s(\d+)_td(\d+)$/);
if (match) {
return { events: parseInt(match[1]), toDevice: parseInt(match[2]) };
}
// Legacy format: plain number (use for both streams for backwards compat)
const num = parseInt(token);
if (!isNaN(num)) {
return { events: num, toDevice: num };
}
return { events: 0, toDevice: 0 };
}
// Build composite sync token
function buildSyncToken(eventsPos: number, toDevicePos: number): string {
return `s${eventsPos}_td${toDevicePos}`;
}
app.get('/_matrix/client/v3/sync', requireAuth(), async (c) => {
const userId = c.get('userId');
const deviceId = c.get('deviceId');
// Parse query parameters
const since = c.req.query('since');
const fullState = c.req.query('full_state') === 'true';
const filterParam = c.req.query('filter');
// Load filter if specified
const filter = await loadFilter(c.env, userId, filterParam);
if (filterParam && !filter) {
console.log('[sync] Using no filter (filter not found or invalid)');
}
// Parse composite sync token (separate positions for events and to-device)
const { events: sincePosition, toDevice: sinceToDevice } = parseSyncToken(since);
// Get current position
const currentPosition = await getLatestStreamPosition(c.env.DB);
// Track to-device position for next_batch
let currentToDevicePos = sinceToDevice;
// If no changes and timeout, wait (using Durable Objects for long-polling)
// For now, just return immediately
// Build sync response (next_batch will be set at the end)
const response: SyncResponse = {
next_batch: '', // Set below
rooms: {
join: {},
invite: {},
leave: {},
},
presence: {
events: [],
},
account_data: {
events: [],
},
to_device: {
events: [],
},
device_one_time_keys_count: {},
device_unused_fallback_key_types: [],
};
// Get to-device messages (E2E encryption key exchange, verification, etc.)
if (deviceId) {
// Pass the to-device specific position for proper acknowledgment
const toDeviceResult = await getToDeviceMessages(c.env.DB, userId, deviceId, String(sinceToDevice));
response.to_device!.events = toDeviceResult.events;
// Update to-device position for next_batch
currentToDevicePos = parseInt(toDeviceResult.nextBatch) || sinceToDevice;
// Get E2E encryption key counts for this device
response.device_one_time_keys_count = await getOneTimeKeyCounts(c.env.DB, userId, deviceId);
response.device_unused_fallback_key_types = await getUnusedFallbackKeyTypes(c.env.DB, userId, deviceId);
// Debug E2EE state for first sync
if (sincePosition === 0) {
console.log('[sync] Initial sync E2EE state for', userId, ':', {
otk_counts: response.device_one_time_keys_count,
fallback_types: response.device_unused_fallback_key_types,
to_device_count: response.to_device!.events.length,
});
}
}
// Get device list changes (users whose keys have changed since last sync)
if (sincePosition > 0) {
const deviceListChanges = await getDeviceListChanges(c.env.DB, userId, sincePosition);
if (deviceListChanges.changed.length > 0 || deviceListChanges.left.length > 0) {
response.device_lists = deviceListChanges;
}
} else {
// For initial sync, include the user's own ID in device_lists.changed
// This tells Element X to fetch device keys immediately, which is important
// for cross-signing verification to work correctly after first login
response.device_lists = {
changed: [userId],
left: [],
};
console.log('[sync] Initial sync - including self in device_lists.changed to trigger key fetch');
}
// Get global account data
// For initial sync (no since token), get all account data
// For incremental sync, only get changed account data since last sync
let globalAccountData = await getGlobalAccountData(
c.env.DB,
userId,
sincePosition > 0 ? sincePosition : undefined
);
// Apply account_data filter to global account data
globalAccountData = applyEventFilter(globalAccountData, filter?.account_data);
response.account_data!.events = globalAccountData;
// Debug: Log global account_data that will be returned (for initial sync)
if (sincePosition === 0) {
console.log('[sync] Initial sync account_data for', userId, ':',
globalAccountData.length > 0 ? globalAccountData.map(e => e.type) : 'none');
}
// Get user's joined rooms
const joinedRoomIds = await getUserRooms(c.env.DB, userId, 'join');
for (const roomId of joinedRoomIds) {
// Check if room should be included based on filter
if (!shouldIncludeRoom(roomId, filter?.room)) {
continue;
}
const joinedRoom: JoinedRoom = {
timeline: {
events: [],
limited: false,
},
state: {
events: [],
},
ephemeral: {
events: [],
},
account_data: {
events: [],
},
};
// Get events since last sync
const events = await getEventsSince(c.env.DB, roomId, sincePosition);
// Separate state and timeline events
let stateEvents: any[] = [];
let timelineEvents: any[] = [];
for (const event of events) {
const clientEvent = {
type: event.type,
state_key: event.state_key,
content: event.content,
sender: event.sender,
origin_server_ts: event.origin_server_ts,
event_id: event.event_id,
room_id: event.room_id,
unsigned: event.unsigned,
};
if (event.state_key !== undefined) {
// State event - include in both state and timeline
stateEvents.push(clientEvent);
}
timelineEvents.push(clientEvent);
}
// Include full state if requested or initial sync
if (fullState || sincePosition === 0) {
const state = await getRoomState(c.env.DB, roomId);
for (const event of state) {
const clientEvent = {
type: event.type,
state_key: event.state_key,
content: event.content,
sender: event.sender,
origin_server_ts: event.origin_server_ts,
event_id: event.event_id,
room_id: event.room_id,
};
// Only add if not already in state events from timeline
if (!stateEvents.find(e => e.event_id === event.event_id)) {
stateEvents.push(clientEvent);
}
}
}
// Apply filters to state and timeline events
stateEvents = applyEventFilter(stateEvents, filter?.room?.state);
timelineEvents = applyEventFilter(timelineEvents, filter?.room?.timeline);
joinedRoom.state!.events = stateEvents;
joinedRoom.timeline!.events = timelineEvents;
joinedRoom.timeline!.prev_batch = sincePosition.toString();
// Get room-level account data
let roomAccountData = await getRoomAccountData(
c.env.DB,
userId,
roomId,
sincePosition > 0 ? sincePosition : undefined
);
// Apply account_data filter to room account data
roomAccountData = applyEventFilter(roomAccountData, filter?.room?.account_data);
joinedRoom.account_data!.events = roomAccountData;
// Get read receipts for this room (from Room Durable Object)
// Pass userId to filter m.read.private receipts (only visible to owner)
const receipts = await getReceiptsForRoom(c.env, roomId, userId);
if (Object.keys(receipts.content).length > 0) {
joinedRoom.ephemeral!.events.push(receipts);
}
// Get typing indicators for this room (from Room Durable Object)
const typingUsers = await getTypingUsers(c.env, roomId);
if (typingUsers.length > 0) {
joinedRoom.ephemeral!.events.push({
type: 'm.typing',
content: { user_ids: typingUsers }
});
}
// Apply ephemeral filter
joinedRoom.ephemeral!.events = applyEventFilter(
joinedRoom.ephemeral!.events,
filter?.room?.ephemeral
);
response.rooms!.join![roomId] = joinedRoom;
}
// Get invited rooms
const invitedRoomIds = await getUserRooms(c.env.DB, userId, 'invite');
for (const roomId of invitedRoomIds) {
// Check if room should be included based on filter
if (!shouldIncludeRoom(roomId, filter?.room)) {
continue;
}
const state = await getRoomState(c.env.DB, roomId);
// Strip state for invited rooms
let strippedState = state.map(event => ({
type: event.type,
state_key: event.state_key!,
content: event.content,
sender: event.sender,
}));
// Apply state filter to invited room state
strippedState = applyEventFilter(strippedState, filter?.room?.state);
const invitedRoom: InvitedRoom = {
invite_state: {
events: strippedState,
},
};
response.rooms!.invite![roomId] = invitedRoom;
}
// Get left rooms (rooms user left since last sync)
// Check if filter allows left rooms (default: false per spec)
const includeLeave = filter?.room?.include_leave ?? false;
if (sincePosition > 0 && (includeLeave || !filter)) {
const leftRoomIds = await getUserRooms(c.env.DB, userId, 'leave');
for (const roomId of leftRoomIds) {
// Check if room should be included based on filter
if (!shouldIncludeRoom(roomId, filter?.room)) {
continue;
}
// Only include if membership changed since last sync
const events = await getEventsSince(c.env.DB, roomId, sincePosition);
const leaveEvent = events.find(
e => e.type === 'm.room.member' && e.state_key === userId
);
if (leaveEvent) {
const leftRoom: LeftRoom = {
timeline: {
events: [
{
type: leaveEvent.type,
state_key: leaveEvent.state_key,
content: leaveEvent.content,
sender: leaveEvent.sender,
origin_server_ts: leaveEvent.origin_server_ts,
event_id: leaveEvent.event_id,
room_id: leaveEvent.room_id,
},
],
},
};
response.rooms!.leave![roomId] = leftRoom;
}
}
}
// Check if there are any changes to return
const hasRoomChanges = Object.keys(response.rooms!.join!).some(roomId => {
const room = response.rooms!.join![roomId];
return room.timeline!.events.length > 0 || room.state!.events.length > 0;
});
const hasInvites = Object.keys(response.rooms!.invite!).length > 0;
const hasLeaves = Object.keys(response.rooms!.leave!).length > 0;
const hasToDevice = response.to_device!.events.length > 0;
const hasAccountData = response.account_data!.events.length > 0;
const hasChanges = hasRoomChanges || hasInvites || hasLeaves || hasToDevice || hasAccountData;
// Parse timeout from query params (default 0 for no wait, max 30s)
const timeout = Math.min(parseInt(c.req.query('timeout') || '0'), 30000);
// If no changes and timeout > 0, wait for events via Durable Object
if (!hasChanges && timeout > 0 && sincePosition > 0) {
console.log('[sync] Entering DO wait for', userId, 'timeout:', timeout);
const syncDO = c.env.SYNC;
const doId = syncDO.idFromName(userId);
const stub = syncDO.get(doId);
// Wait for up to 25s (leave buffer for response)
const waitTimeout = Math.min(timeout, 25000);
const waitResponse = await stub.fetch(new Request('http://internal/wait-for-events', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ timeout: waitTimeout }),
}));
const waitResult = await waitResponse.json() as { hasEvents: boolean };
console.log('[sync] DO wait result for', userId, ':', waitResult);
if (waitResult.hasEvents) {
console.log('[sync] Woken up early - events arrived for', userId);
// New events arrived - return empty response with SAME next_batch
// Client will immediately sync again and get the new events
// We intentionally do NOT advance next_batch here, so the client
// re-syncs from the same position and actually sees the events
}
} else if (timeout > 0 && sincePosition > 0) {
console.log('[sync] Skipping DO wait for', userId, '- hasChanges:', hasChanges,
'roomChanges:', hasRoomChanges, 'invites:', hasInvites, 'leaves:', hasLeaves,
'toDevice:', hasToDevice, 'accountData:', hasAccountData);
}
// Build composite next_batch token with separate positions for each stream
if (!response.next_batch) {
response.next_batch = buildSyncToken(currentPosition, currentToDevicePos);
}
return c.json(response);
});
export default app;