1// Copyright 2020 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Package msc2836 'Threading' implements https://github.com/matrix-org/matrix-doc/pull/2836
16package msc2836
17
18import (
19	"bytes"
20	"context"
21	"crypto/sha256"
22	"encoding/json"
23	"fmt"
24	"io"
25	"net/http"
26	"sort"
27	"strings"
28	"time"
29
30	"github.com/matrix-org/dendrite/clientapi/jsonerror"
31	fs "github.com/matrix-org/dendrite/federationsender/api"
32	"github.com/matrix-org/dendrite/internal/hooks"
33	"github.com/matrix-org/dendrite/internal/httputil"
34	roomserver "github.com/matrix-org/dendrite/roomserver/api"
35	"github.com/matrix-org/dendrite/setup"
36	userapi "github.com/matrix-org/dendrite/userapi/api"
37	"github.com/matrix-org/gomatrixserverlib"
38	"github.com/matrix-org/util"
39)
40
const (
	// constRelType is the relation type passed to the database when looking up
	// parent/child relations, and the key used for the child count that is added
	// to an event's unsigned data.
	constRelType = "m.reference"
)
44
// EventRelationshipRequest is the JSON body accepted by the /event_relationships
// endpoints. When decoded via NewEventRelationshipRequest, fields absent from the
// JSON keep the values assigned by Defaults.
type EventRelationshipRequest struct {
	// EventID is the event the walk starts from.
	EventID         string `json:"event_id"`
	// RoomID is the room the event is expected to be in. If empty it is filled in
	// from the event once the event has been found.
	RoomID          string `json:"room_id"`
	// MaxDepth bounds how many levels away from EventID the walk may go.
	MaxDepth        int    `json:"max_depth"`
	// MaxBreadth bounds how many children of a single event are walked.
	MaxBreadth      int    `json:"max_breadth"`
	// Limit bounds the total number of events returned.
	Limit           int    `json:"limit"`
	// DepthFirst selects a depth-first walk; otherwise the walk is breadth-first.
	DepthFirst      bool   `json:"depth_first"`
	// RecentFirst is passed to the database when ordering children.
	RecentFirst     bool   `json:"recent_first"`
	// IncludeParent asks for the parent of EventID to be included in the response.
	IncludeParent   bool   `json:"include_parent"`
	// IncludeChildren asks for the direct children of EventID to be included.
	IncludeChildren bool   `json:"include_children"`
	// Direction is "down" to walk towards children; any other value walks towards parents.
	Direction       string `json:"direction"`
	// Batch is decoded but not read anywhere in this file.
	// NOTE(review): presumably a pagination token - confirm against the MSC.
	Batch           string `json:"batch"`
}
58
59func NewEventRelationshipRequest(body io.Reader) (*EventRelationshipRequest, error) {
60	var relation EventRelationshipRequest
61	relation.Defaults()
62	if err := json.NewDecoder(body).Decode(&relation); err != nil {
63		return nil, err
64	}
65	return &relation, nil
66}
67
68func (r *EventRelationshipRequest) Defaults() {
69	r.Limit = 100
70	r.MaxBreadth = 10
71	r.MaxDepth = 3
72	r.DepthFirst = false
73	r.RecentFirst = true
74	r.IncludeParent = false
75	r.IncludeChildren = false
76	r.Direction = "down"
77}
78
// EventRelationshipResponse is the JSON body returned to clients from the
// /event_relationships endpoint. It is built from the federation-format response
// by toClientResponse.
type EventRelationshipResponse struct {
	// Events are the events found, in client format.
	Events    []gomatrixserverlib.ClientEvent `json:"events"`
	// NextBatch is copied verbatim from the federation-format response.
	NextBatch string                          `json:"next_batch"`
	// Limited is copied verbatim from the federation-format response.
	Limited   bool                            `json:"limited"`
}
84
85func toClientResponse(res *gomatrixserverlib.MSC2836EventRelationshipsResponse) *EventRelationshipResponse {
86	out := &EventRelationshipResponse{
87		Events:    gomatrixserverlib.ToClientEvents(res.Events, gomatrixserverlib.FormatAll),
88		Limited:   res.Limited,
89		NextBatch: res.NextBatch,
90	}
91	return out
92}
93
94// Enable this MSC
95func Enable(
96	base *setup.BaseDendrite, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI,
97	userAPI userapi.UserInternalAPI, keyRing gomatrixserverlib.JSONVerifier,
98) error {
99	db, err := NewDatabase(&base.Cfg.MSCs.Database)
100	if err != nil {
101		return fmt.Errorf("cannot enable MSC2836: %w", err)
102	}
103	hooks.Enable()
104	hooks.Attach(hooks.KindNewEventPersisted, func(headeredEvent interface{}) {
105		he := headeredEvent.(*gomatrixserverlib.HeaderedEvent)
106		hookErr := db.StoreRelation(context.Background(), he)
107		if hookErr != nil {
108			util.GetLogger(context.Background()).WithError(hookErr).WithField("event_id", he.EventID()).Error(
109				"failed to StoreRelation",
110			)
111		}
112		// we need to update child metadata here as well as after doing remote /event_relationships requests
113		// so we catch child metadata originating from /send transactions
114		hookErr = db.UpdateChildMetadata(context.Background(), he)
115		if hookErr != nil {
116			util.GetLogger(context.Background()).WithError(err).WithField("event_id", he.EventID()).Warn(
117				"failed to update child metadata for event",
118			)
119		}
120	})
121
122	base.PublicClientAPIMux.Handle("/unstable/event_relationships",
123		httputil.MakeAuthAPI("eventRelationships", userAPI, eventRelationshipHandler(db, rsAPI, fsAPI)),
124	).Methods(http.MethodPost, http.MethodOptions)
125
126	base.PublicFederationAPIMux.Handle("/unstable/event_relationships", httputil.MakeExternalAPI(
127		"msc2836_event_relationships", func(req *http.Request) util.JSONResponse {
128			fedReq, errResp := gomatrixserverlib.VerifyHTTPRequest(
129				req, time.Now(), base.Cfg.Global.ServerName, keyRing,
130			)
131			if fedReq == nil {
132				return errResp
133			}
134			return federatedEventRelationship(req.Context(), fedReq, db, rsAPI, fsAPI)
135		},
136	)).Methods(http.MethodPost, http.MethodOptions)
137	return nil
138}
139
// reqCtx carries the state for a single /event_relationships request, for both
// client and federation callers.
type reqCtx struct {
	ctx         context.Context
	rsAPI       roomserver.RoomserverInternalAPI
	db          Database
	req         *EventRelationshipRequest
	// userID is the requesting user; it is only populated for client requests.
	userID      string
	// roomVersion is set by process() from the event being queried.
	roomVersion gomatrixserverlib.RoomVersion

	// federated request args
	// isFederatedRequest is true when the request came from another server; such
	// requests never trigger outbound federation lookups.
	isFederatedRequest bool
	// serverName is the origin of a federated request.
	serverName         gomatrixserverlib.ServerName
	fsAPI              fs.FederationSenderInternalAPI
}
153
154func eventRelationshipHandler(db Database, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI) func(*http.Request, *userapi.Device) util.JSONResponse {
155	return func(req *http.Request, device *userapi.Device) util.JSONResponse {
156		relation, err := NewEventRelationshipRequest(req.Body)
157		if err != nil {
158			util.GetLogger(req.Context()).WithError(err).Error("failed to decode HTTP request as JSON")
159			return util.JSONResponse{
160				Code: 400,
161				JSON: jsonerror.BadJSON(fmt.Sprintf("invalid json: %s", err)),
162			}
163		}
164		rc := reqCtx{
165			ctx:                req.Context(),
166			req:                relation,
167			userID:             device.UserID,
168			rsAPI:              rsAPI,
169			fsAPI:              fsAPI,
170			isFederatedRequest: false,
171			db:                 db,
172		}
173		res, resErr := rc.process()
174		if resErr != nil {
175			return *resErr
176		}
177
178		return util.JSONResponse{
179			Code: 200,
180			JSON: toClientResponse(res),
181		}
182	}
183}
184
185func federatedEventRelationship(
186	ctx context.Context, fedReq *gomatrixserverlib.FederationRequest, db Database, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI,
187) util.JSONResponse {
188	relation, err := NewEventRelationshipRequest(bytes.NewBuffer(fedReq.Content()))
189	if err != nil {
190		util.GetLogger(ctx).WithError(err).Error("failed to decode HTTP request as JSON")
191		return util.JSONResponse{
192			Code: 400,
193			JSON: jsonerror.BadJSON(fmt.Sprintf("invalid json: %s", err)),
194		}
195	}
196	rc := reqCtx{
197		ctx:   ctx,
198		req:   relation,
199		rsAPI: rsAPI,
200		db:    db,
201		// federation args
202		isFederatedRequest: true,
203		fsAPI:              fsAPI,
204		serverName:         fedReq.Origin(),
205	}
206	res, resErr := rc.process()
207	if resErr != nil {
208		return *resErr
209	}
210	// add auth chain information
211	requiredAuthEventsSet := make(map[string]bool)
212	var requiredAuthEvents []string
213	for _, ev := range res.Events {
214		for _, a := range ev.AuthEventIDs() {
215			if requiredAuthEventsSet[a] {
216				continue
217			}
218			requiredAuthEvents = append(requiredAuthEvents, a)
219			requiredAuthEventsSet[a] = true
220		}
221	}
222	var queryRes roomserver.QueryAuthChainResponse
223	err = rsAPI.QueryAuthChain(ctx, &roomserver.QueryAuthChainRequest{
224		EventIDs: requiredAuthEvents,
225	}, &queryRes)
226	if err != nil {
227		// they may already have the auth events so don't fail this request
228		util.GetLogger(ctx).WithError(err).Error("Failed to QueryAuthChain")
229	}
230	res.AuthChain = make([]*gomatrixserverlib.Event, len(queryRes.AuthChain))
231	for i := range queryRes.AuthChain {
232		res.AuthChain[i] = queryRes.AuthChain[i].Unwrap()
233	}
234
235	return util.JSONResponse{
236		Code: 200,
237		JSON: res,
238	}
239}
240
241func (rc *reqCtx) process() (*gomatrixserverlib.MSC2836EventRelationshipsResponse, *util.JSONResponse) {
242	var res gomatrixserverlib.MSC2836EventRelationshipsResponse
243	var returnEvents []*gomatrixserverlib.HeaderedEvent
244	// Can the user see (according to history visibility) event_id? If no, reject the request, else continue.
245	event := rc.getLocalEvent(rc.req.EventID)
246	if event == nil {
247		event = rc.fetchUnknownEvent(rc.req.EventID, rc.req.RoomID)
248	}
249	if rc.req.RoomID == "" && event != nil {
250		rc.req.RoomID = event.RoomID()
251	}
252	if event == nil || !rc.authorisedToSeeEvent(event) {
253		return nil, &util.JSONResponse{
254			Code: 403,
255			JSON: jsonerror.Forbidden("Event does not exist or you are not authorised to see it"),
256		}
257	}
258	rc.roomVersion = event.Version()
259
260	// Retrieve the event. Add it to response array.
261	returnEvents = append(returnEvents, event)
262
263	if rc.req.IncludeParent {
264		if parentEvent := rc.includeParent(event); parentEvent != nil {
265			returnEvents = append(returnEvents, parentEvent)
266		}
267	}
268
269	if rc.req.IncludeChildren {
270		remaining := rc.req.Limit - len(returnEvents)
271		if remaining > 0 {
272			children, resErr := rc.includeChildren(rc.db, event.EventID(), remaining, rc.req.RecentFirst)
273			if resErr != nil {
274				return nil, resErr
275			}
276			returnEvents = append(returnEvents, children...)
277		}
278	}
279
280	remaining := rc.req.Limit - len(returnEvents)
281	var walkLimited bool
282	if remaining > 0 {
283		included := make(map[string]bool, len(returnEvents))
284		for _, ev := range returnEvents {
285			included[ev.EventID()] = true
286		}
287		var events []*gomatrixserverlib.HeaderedEvent
288		events, walkLimited = walkThread(
289			rc.ctx, rc.db, rc, included, remaining,
290		)
291		returnEvents = append(returnEvents, events...)
292	}
293	res.Events = make([]*gomatrixserverlib.Event, len(returnEvents))
294	for i, ev := range returnEvents {
295		// for each event, extract the children_count | hash and add it as unsigned data.
296		rc.addChildMetadata(ev)
297		res.Events[i] = ev.Unwrap()
298	}
299	res.Limited = remaining == 0 || walkLimited
300	return &res, nil
301}
302
303// fetchUnknownEvent retrieves an unknown event from the room specified. This server must
304// be joined to the room in question. This has the side effect of injecting surround threaded
305// events into the roomserver.
306func (rc *reqCtx) fetchUnknownEvent(eventID, roomID string) *gomatrixserverlib.HeaderedEvent {
307	if rc.isFederatedRequest || roomID == "" {
308		// we don't do fed hits for fed requests, and we can't ask servers without a room ID!
309		return nil
310	}
311	logger := util.GetLogger(rc.ctx).WithField("room_id", roomID)
312	// if they supplied a room_id, check the room exists.
313	var queryVerRes roomserver.QueryRoomVersionForRoomResponse
314	err := rc.rsAPI.QueryRoomVersionForRoom(rc.ctx, &roomserver.QueryRoomVersionForRoomRequest{
315		RoomID: roomID,
316	}, &queryVerRes)
317	if err != nil {
318		logger.WithError(err).Warn("failed to query room version for room, does this room exist?")
319		return nil
320	}
321
322	// check the user is joined to that room
323	var queryMemRes roomserver.QueryMembershipForUserResponse
324	err = rc.rsAPI.QueryMembershipForUser(rc.ctx, &roomserver.QueryMembershipForUserRequest{
325		RoomID: roomID,
326		UserID: rc.userID,
327	}, &queryMemRes)
328	if err != nil {
329		logger.WithError(err).Warn("failed to query membership for user in room")
330		return nil
331	}
332	if !queryMemRes.IsInRoom {
333		return nil
334	}
335
336	// ask one of the servers in the room for the event
337	var queryRes fs.QueryJoinedHostServerNamesInRoomResponse
338	err = rc.fsAPI.QueryJoinedHostServerNamesInRoom(rc.ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{
339		RoomID: roomID,
340	}, &queryRes)
341	if err != nil {
342		logger.WithError(err).Error("failed to QueryJoinedHostServerNamesInRoom")
343		return nil
344	}
345	// query up to 5 servers
346	serversToQuery := queryRes.ServerNames
347	if len(serversToQuery) > 5 {
348		serversToQuery = serversToQuery[:5]
349	}
350
351	// fetch the event, along with some of the surrounding thread (if it's threaded) and the auth chain.
352	// Inject the response into the roomserver to remember the event across multiple calls and to set
353	// unexplored flags correctly.
354	for _, srv := range serversToQuery {
355		res, err := rc.MSC2836EventRelationships(eventID, srv, queryVerRes.RoomVersion)
356		if err != nil {
357			continue
358		}
359		rc.injectResponseToRoomserver(res)
360		for _, ev := range res.Events {
361			if ev.EventID() == eventID {
362				return ev.Headered(ev.Version())
363			}
364		}
365	}
366	logger.WithField("servers", serversToQuery).Warn("failed to query event relationships")
367	return nil
368}
369
370// If include_parent: true and there is a valid m.relationship field in the event,
371// retrieve the referenced event. Apply history visibility check to that event and if it passes, add it to the response array.
372func (rc *reqCtx) includeParent(childEvent *gomatrixserverlib.HeaderedEvent) (parent *gomatrixserverlib.HeaderedEvent) {
373	parentID, _, _ := parentChildEventIDs(childEvent)
374	if parentID == "" {
375		return nil
376	}
377	return rc.lookForEvent(parentID)
378}
379
380// If include_children: true, lookup all events which have event_id as an m.relationship
381// Apply history visibility checks to all these events and add the ones which pass into the response array,
382// honouring the recent_first flag and the limit.
383func (rc *reqCtx) includeChildren(db Database, parentID string, limit int, recentFirst bool) ([]*gomatrixserverlib.HeaderedEvent, *util.JSONResponse) {
384	if rc.hasUnexploredChildren(parentID) {
385		// we need to do a remote request to pull in the children as we are missing them locally.
386		serversToQuery := rc.getServersForEventID(parentID)
387		var result *gomatrixserverlib.MSC2836EventRelationshipsResponse
388		for _, srv := range serversToQuery {
389			res, err := rc.fsAPI.MSC2836EventRelationships(rc.ctx, srv, gomatrixserverlib.MSC2836EventRelationshipsRequest{
390				EventID:     parentID,
391				Direction:   "down",
392				Limit:       100,
393				MaxBreadth:  -1,
394				MaxDepth:    1, // we just want the children from this parent
395				RecentFirst: true,
396			}, rc.roomVersion)
397			if err != nil {
398				util.GetLogger(rc.ctx).WithError(err).WithField("server", srv).Error("includeChildren: failed to call MSC2836EventRelationships")
399			} else {
400				result = &res
401				break
402			}
403		}
404		if result != nil {
405			rc.injectResponseToRoomserver(result)
406		}
407		// fallthrough to pull these new events from the DB
408	}
409	children, err := db.ChildrenForParent(rc.ctx, parentID, constRelType, recentFirst)
410	if err != nil {
411		util.GetLogger(rc.ctx).WithError(err).Error("failed to get ChildrenForParent")
412		resErr := jsonerror.InternalServerError()
413		return nil, &resErr
414	}
415	var childEvents []*gomatrixserverlib.HeaderedEvent
416	for _, child := range children {
417		childEvent := rc.lookForEvent(child.EventID)
418		if childEvent != nil {
419			childEvents = append(childEvents, childEvent)
420		}
421	}
422	if len(childEvents) > limit {
423		return childEvents[:limit], nil
424	}
425	return childEvents, nil
426}
427
428// Begin to walk the thread DAG in the direction specified, either depth or breadth first according to the depth_first flag,
429// honouring the limit, max_depth and max_breadth values according to the following rules
430func walkThread(
431	ctx context.Context, db Database, rc *reqCtx, included map[string]bool, limit int,
432) ([]*gomatrixserverlib.HeaderedEvent, bool) {
433	var result []*gomatrixserverlib.HeaderedEvent
434	eventWalker := walker{
435		ctx: ctx,
436		req: rc.req,
437		db:  db,
438		fn: func(wi *walkInfo) bool {
439			// If already processed event, skip.
440			if included[wi.EventID] {
441				return false
442			}
443
444			// If the response array is >= limit, stop.
445			if len(result) >= limit {
446				return true
447			}
448
449			// Process the event.
450			// if event is not found, use remoteEventRelationships to explore that part of the thread remotely.
451			// This will probably be easiest if the event relationships response is directly pumped into the database
452			// so the next walk will do the right thing. This requires those events to be authed and likely injected as
453			// outliers into the roomserver DB, which will de-dupe appropriately.
454			event := rc.lookForEvent(wi.EventID)
455			if event != nil {
456				result = append(result, event)
457			}
458			included[wi.EventID] = true
459			return false
460		},
461	}
462	limited, err := eventWalker.WalkFrom(rc.req.EventID)
463	if err != nil {
464		util.GetLogger(ctx).WithError(err).Errorf("Failed to WalkFrom %s", rc.req.EventID)
465	}
466	return result, limited
467}
468
469// MSC2836EventRelationships performs an /event_relationships request to a remote server
470func (rc *reqCtx) MSC2836EventRelationships(eventID string, srv gomatrixserverlib.ServerName, ver gomatrixserverlib.RoomVersion) (*gomatrixserverlib.MSC2836EventRelationshipsResponse, error) {
471	res, err := rc.fsAPI.MSC2836EventRelationships(rc.ctx, srv, gomatrixserverlib.MSC2836EventRelationshipsRequest{
472		EventID:     eventID,
473		DepthFirst:  rc.req.DepthFirst,
474		Direction:   rc.req.Direction,
475		Limit:       rc.req.Limit,
476		MaxBreadth:  rc.req.MaxBreadth,
477		MaxDepth:    rc.req.MaxDepth,
478		RecentFirst: rc.req.RecentFirst,
479	}, ver)
480	if err != nil {
481		util.GetLogger(rc.ctx).WithError(err).Error("Failed to call MSC2836EventRelationships")
482		return nil, err
483	}
484	return &res, nil
485
486}
487
488// authorisedToSeeEvent checks that the user or server is allowed to see this event. Returns true if allowed to
489// see this request. This only needs to be done once per room at present as we just check for joined status.
490func (rc *reqCtx) authorisedToSeeEvent(event *gomatrixserverlib.HeaderedEvent) bool {
491	if rc.isFederatedRequest {
492		// make sure the server is in this room
493		var res fs.QueryJoinedHostServerNamesInRoomResponse
494		err := rc.fsAPI.QueryJoinedHostServerNamesInRoom(rc.ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{
495			RoomID: event.RoomID(),
496		}, &res)
497		if err != nil {
498			util.GetLogger(rc.ctx).WithError(err).Error("authorisedToSeeEvent: failed to QueryJoinedHostServerNamesInRoom")
499			return false
500		}
501		for _, srv := range res.ServerNames {
502			if srv == rc.serverName {
503				return true
504			}
505		}
506		return false
507	}
508	// make sure the user is in this room
509	// Allow events if the member is in the room
510	// TODO: This does not honour history_visibility
511	// TODO: This does not honour m.room.create content
512	var queryMembershipRes roomserver.QueryMembershipForUserResponse
513	err := rc.rsAPI.QueryMembershipForUser(rc.ctx, &roomserver.QueryMembershipForUserRequest{
514		RoomID: event.RoomID(),
515		UserID: rc.userID,
516	}, &queryMembershipRes)
517	if err != nil {
518		util.GetLogger(rc.ctx).WithError(err).Error("authorisedToSeeEvent: failed to QueryMembershipForUser")
519		return false
520	}
521	return queryMembershipRes.IsInRoom
522}
523
524func (rc *reqCtx) getServersForEventID(eventID string) []gomatrixserverlib.ServerName {
525	if rc.req.RoomID == "" {
526		util.GetLogger(rc.ctx).WithField("event_id", eventID).Error(
527			"getServersForEventID: event exists in unknown room",
528		)
529		return nil
530	}
531	if rc.roomVersion == "" {
532		util.GetLogger(rc.ctx).WithField("event_id", eventID).Errorf(
533			"getServersForEventID: event exists in %s with unknown room version", rc.req.RoomID,
534		)
535		return nil
536	}
537	var queryRes fs.QueryJoinedHostServerNamesInRoomResponse
538	err := rc.fsAPI.QueryJoinedHostServerNamesInRoom(rc.ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{
539		RoomID: rc.req.RoomID,
540	}, &queryRes)
541	if err != nil {
542		util.GetLogger(rc.ctx).WithError(err).Error("getServersForEventID: failed to QueryJoinedHostServerNamesInRoom")
543		return nil
544	}
545	// query up to 5 servers
546	serversToQuery := queryRes.ServerNames
547	if len(serversToQuery) > 5 {
548		serversToQuery = serversToQuery[:5]
549	}
550	return serversToQuery
551}
552
553func (rc *reqCtx) remoteEventRelationships(eventID string) *gomatrixserverlib.MSC2836EventRelationshipsResponse {
554	if rc.isFederatedRequest {
555		return nil // we don't query remote servers for remote requests
556	}
557	serversToQuery := rc.getServersForEventID(eventID)
558	var res *gomatrixserverlib.MSC2836EventRelationshipsResponse
559	var err error
560	for _, srv := range serversToQuery {
561		res, err = rc.MSC2836EventRelationships(eventID, srv, rc.roomVersion)
562		if err != nil {
563			util.GetLogger(rc.ctx).WithError(err).WithField("server", srv).Error("remoteEventRelationships: failed to call MSC2836EventRelationships")
564		} else {
565			break
566		}
567	}
568	return res
569}
570
571// lookForEvent returns the event for the event ID given, by trying to query remote servers
572// if the event ID is unknown via /event_relationships.
573func (rc *reqCtx) lookForEvent(eventID string) *gomatrixserverlib.HeaderedEvent {
574	event := rc.getLocalEvent(eventID)
575	if event == nil {
576		queryRes := rc.remoteEventRelationships(eventID)
577		if queryRes != nil {
578			// inject all the events into the roomserver then return the event in question
579			rc.injectResponseToRoomserver(queryRes)
580			for _, ev := range queryRes.Events {
581				if ev.EventID() == eventID && rc.req.RoomID == ev.RoomID() {
582					return ev.Headered(ev.Version())
583				}
584			}
585		}
586	} else if rc.hasUnexploredChildren(eventID) {
587		// we have the local event but we may need to do a remote hit anyway if we are exploring the thread and have unknown children.
588		// If we don't do this then we risk never fetching the children.
589		queryRes := rc.remoteEventRelationships(eventID)
590		if queryRes != nil {
591			rc.injectResponseToRoomserver(queryRes)
592			err := rc.db.MarkChildrenExplored(context.Background(), eventID)
593			if err != nil {
594				util.GetLogger(rc.ctx).WithError(err).Warnf("failed to mark children of %s as explored", eventID)
595			}
596		}
597	}
598	if rc.req.RoomID == event.RoomID() {
599		return event
600	}
601	return nil
602}
603
604func (rc *reqCtx) getLocalEvent(eventID string) *gomatrixserverlib.HeaderedEvent {
605	var queryEventsRes roomserver.QueryEventsByIDResponse
606	err := rc.rsAPI.QueryEventsByID(rc.ctx, &roomserver.QueryEventsByIDRequest{
607		EventIDs: []string{eventID},
608	}, &queryEventsRes)
609	if err != nil {
610		util.GetLogger(rc.ctx).WithError(err).Error("getLocalEvent: failed to QueryEventsByID")
611		return nil
612	}
613	if len(queryEventsRes.Events) == 0 {
614		util.GetLogger(rc.ctx).WithField("event_id", eventID).Infof("getLocalEvent: event does not exist")
615		return nil // event does not exist
616	}
617	return queryEventsRes.Events[0]
618}
619
620// injectResponseToRoomserver injects the events
621// into the roomserver as KindOutlier, with auth chains.
622func (rc *reqCtx) injectResponseToRoomserver(res *gomatrixserverlib.MSC2836EventRelationshipsResponse) {
623	var stateEvents []*gomatrixserverlib.Event
624	var messageEvents []*gomatrixserverlib.Event
625	for _, ev := range res.Events {
626		if ev.StateKey() != nil {
627			stateEvents = append(stateEvents, ev)
628		} else {
629			messageEvents = append(messageEvents, ev)
630		}
631	}
632	respState := gomatrixserverlib.RespState{
633		AuthEvents:  res.AuthChain,
634		StateEvents: stateEvents,
635	}
636	eventsInOrder, err := respState.Events()
637	if err != nil {
638		util.GetLogger(rc.ctx).WithError(err).Error("failed to calculate order to send events in MSC2836EventRelationshipsResponse")
639		return
640	}
641	// everything gets sent as an outlier because auth chain events may be disjoint from the DAG
642	// as may the threaded events.
643	var ires []roomserver.InputRoomEvent
644	for _, outlier := range append(eventsInOrder, messageEvents...) {
645		ires = append(ires, roomserver.InputRoomEvent{
646			Kind:         roomserver.KindOutlier,
647			Event:        outlier.Headered(outlier.Version()),
648			AuthEventIDs: outlier.AuthEventIDs(),
649		})
650	}
651	// we've got the data by this point so use a background context
652	err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires)
653	if err != nil {
654		util.GetLogger(rc.ctx).WithError(err).Error("failed to inject MSC2836EventRelationshipsResponse into the roomserver")
655	}
656	// update the child count / hash columns for these nodes. We need to do this here because not all events will make it
657	// through to the KindNewEventPersisted hook because the roomserver will ignore duplicates. Duplicates have meaning though
658	// as the `unsigned` field may differ (if the number of children changes).
659	for _, ev := range ires {
660		err = rc.db.UpdateChildMetadata(context.Background(), ev.Event)
661		if err != nil {
662			util.GetLogger(rc.ctx).WithError(err).WithField("event_id", ev.Event.EventID()).Warn("failed to update child metadata for event")
663		}
664	}
665}
666
667func (rc *reqCtx) addChildMetadata(ev *gomatrixserverlib.HeaderedEvent) {
668	count, hash := rc.getChildMetadata(ev.EventID())
669	if count == 0 {
670		return
671	}
672	err := ev.SetUnsignedField("children_hash", gomatrixserverlib.Base64Bytes(hash))
673	if err != nil {
674		util.GetLogger(rc.ctx).WithError(err).Warn("Failed to set children_hash")
675	}
676	err = ev.SetUnsignedField("children", map[string]int{
677		constRelType: count,
678	})
679	if err != nil {
680		util.GetLogger(rc.ctx).WithError(err).Warn("Failed to set children count")
681	}
682}
683
684func (rc *reqCtx) getChildMetadata(eventID string) (count int, hash []byte) {
685	children, err := rc.db.ChildrenForParent(rc.ctx, eventID, constRelType, false)
686	if err != nil {
687		util.GetLogger(rc.ctx).WithError(err).Warn("Failed to get ChildrenForParent for getting child metadata")
688		return
689	}
690	if len(children) == 0 {
691		return
692	}
693	// sort it lexiographically
694	sort.Slice(children, func(i, j int) bool {
695		return children[i].EventID < children[j].EventID
696	})
697	// hash it
698	var eventIDs strings.Builder
699	for _, c := range children {
700		_, _ = eventIDs.WriteString(c.EventID)
701	}
702	hashValBytes := sha256.Sum256([]byte(eventIDs.String()))
703
704	count = len(children)
705	hash = hashValBytes[:]
706	return
707}
708
709// hasUnexploredChildren returns true if this event has unexplored children.
710// "An event has unexplored children if the `unsigned` child count on the parent does not match
711// how many children the server believes the parent to have. In addition, if the counts match but
712// the hashes do not match, then the event is unexplored."
713func (rc *reqCtx) hasUnexploredChildren(eventID string) bool {
714	if rc.isFederatedRequest {
715		return false // we only explore children for clients, not servers.
716	}
717	// extract largest child count from event
718	eventCount, eventHash, explored, err := rc.db.ChildMetadata(rc.ctx, eventID)
719	if err != nil {
720		util.GetLogger(rc.ctx).WithError(err).WithField("event_id", eventID).Warn(
721			"failed to get ChildMetadata from db",
722		)
723		return false
724	}
725	// if there are no recorded children then we know we have >= children.
726	// if the event has already been explored (read: we hit /event_relationships successfully)
727	// then don't do it again. We'll only re-do this if we get an even bigger children count,
728	// see Database.UpdateChildMetadata
729	if eventCount == 0 || explored {
730		return false // short-circuit
731	}
732
733	// calculate child count for event
734	calcCount, calcHash := rc.getChildMetadata(eventID)
735
736	if eventCount < calcCount {
737		return false // we have more children
738	} else if eventCount > calcCount {
739		return true // the event has more children than we know about
740	}
741	// we have the same count, so a mismatched hash means some children are different
742	return !bytes.Equal(eventHash, calcHash)
743}
744
// walkInfo describes a single event visited during a walk, along with its
// position relative to the event the walk started from.
type walkInfo struct {
	eventInfo
	// SiblingNumber is the 1-based position of this event among the children
	// that were queued for its parent.
	SiblingNumber int
	// Depth is the number of hops from the starting event; its direct
	// children (or parent, when walking up) have depth 1.
	Depth         int
}
750
// walker walks the thread DAG around an event, reading relations from db and
// honouring the direction, depth_first, max_depth and max_breadth settings of req.
type walker struct {
	ctx context.Context
	req *EventRelationshipRequest
	db  Database
	fn  func(wi *walkInfo) bool // callback invoked for each event walked, return true to terminate the walk
}
757
758// WalkFrom the event ID given
759func (w *walker) WalkFrom(eventID string) (limited bool, err error) {
760	children, err := w.childrenForParent(eventID)
761	if err != nil {
762		util.GetLogger(w.ctx).WithError(err).Error("WalkFrom() childrenForParent failed, cannot walk")
763		return false, err
764	}
765	var next *walkInfo
766	toWalk := w.addChildren(nil, children, 1)
767	next, toWalk = w.nextChild(toWalk)
768	for next != nil {
769		stop := w.fn(next)
770		if stop {
771			return true, nil
772		}
773		// find the children's children
774		children, err = w.childrenForParent(next.EventID)
775		if err != nil {
776			util.GetLogger(w.ctx).WithError(err).Error("WalkFrom() childrenForParent failed, cannot walk")
777			return false, err
778		}
779		toWalk = w.addChildren(toWalk, children, next.Depth+1)
780		next, toWalk = w.nextChild(toWalk)
781	}
782
783	return false, nil
784}
785
786// addChildren adds an event's children to the to walk data structure
787func (w *walker) addChildren(toWalk []walkInfo, children []eventInfo, depthOfChildren int) []walkInfo {
788	// Check what number child this event is (ordered by recent_first) compared to its parent, does it exceed (greater than) max_breadth? If yes, skip.
789	if len(children) > w.req.MaxBreadth {
790		children = children[:w.req.MaxBreadth]
791	}
792	// Check how deep the event is compared to event_id, does it exceed (greater than) max_depth? If yes, skip.
793	if depthOfChildren > w.req.MaxDepth {
794		return toWalk
795	}
796
797	if w.req.DepthFirst {
798		// the slice is a stack so push them in reverse order so we pop them in the correct order
799		// e.g [3,2,1] => [3,2] , 1 => [3] , 2 => [] , 3
800		for i := len(children) - 1; i >= 0; i-- {
801			toWalk = append(toWalk, walkInfo{
802				eventInfo:     children[i],
803				SiblingNumber: i + 1, // index from 1
804				Depth:         depthOfChildren,
805			})
806		}
807	} else {
808		// the slice is a queue so push them in normal order to we dequeue them in the correct order
809		// e.g [1,2,3] => 1, [2, 3] => 2 , [3] => 3, []
810		for i := range children {
811			toWalk = append(toWalk, walkInfo{
812				eventInfo:     children[i],
813				SiblingNumber: i + 1, // index from 1
814				Depth:         depthOfChildren,
815			})
816		}
817	}
818	return toWalk
819}
820
821func (w *walker) nextChild(toWalk []walkInfo) (*walkInfo, []walkInfo) {
822	if len(toWalk) == 0 {
823		return nil, nil
824	}
825	var child walkInfo
826	if w.req.DepthFirst {
827		// toWalk is a stack so pop the child off
828		child, toWalk = toWalk[len(toWalk)-1], toWalk[:len(toWalk)-1]
829		return &child, toWalk
830	}
831	// toWalk is a queue so shift the child off
832	child, toWalk = toWalk[0], toWalk[1:]
833	return &child, toWalk
834}
835
836// childrenForParent returns the children events for this event ID, honouring the direction: up|down flags
837// meaning this can actually be returning the parent for the event instead of the children.
838func (w *walker) childrenForParent(eventID string) ([]eventInfo, error) {
839	if w.req.Direction == "down" {
840		return w.db.ChildrenForParent(w.ctx, eventID, constRelType, w.req.RecentFirst)
841	}
842	// find the event to pull out the parent
843	ei, err := w.db.ParentForChild(w.ctx, eventID, constRelType)
844	if err != nil {
845		return nil, err
846	}
847	if ei != nil {
848		return []eventInfo{*ei}, nil
849	}
850	return nil, nil
851}
852