1// Copyright 2020 The Matrix.org Foundation C.I.C. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Package msc2836 'Threading' implements https://github.com/matrix-org/matrix-doc/pull/2836 16package msc2836 17 18import ( 19 "bytes" 20 "context" 21 "crypto/sha256" 22 "encoding/json" 23 "fmt" 24 "io" 25 "net/http" 26 "sort" 27 "strings" 28 "time" 29 30 "github.com/matrix-org/dendrite/clientapi/jsonerror" 31 fs "github.com/matrix-org/dendrite/federationsender/api" 32 "github.com/matrix-org/dendrite/internal/hooks" 33 "github.com/matrix-org/dendrite/internal/httputil" 34 roomserver "github.com/matrix-org/dendrite/roomserver/api" 35 "github.com/matrix-org/dendrite/setup" 36 userapi "github.com/matrix-org/dendrite/userapi/api" 37 "github.com/matrix-org/gomatrixserverlib" 38 "github.com/matrix-org/util" 39) 40 41const ( 42 constRelType = "m.reference" 43) 44 45type EventRelationshipRequest struct { 46 EventID string `json:"event_id"` 47 RoomID string `json:"room_id"` 48 MaxDepth int `json:"max_depth"` 49 MaxBreadth int `json:"max_breadth"` 50 Limit int `json:"limit"` 51 DepthFirst bool `json:"depth_first"` 52 RecentFirst bool `json:"recent_first"` 53 IncludeParent bool `json:"include_parent"` 54 IncludeChildren bool `json:"include_children"` 55 Direction string `json:"direction"` 56 Batch string `json:"batch"` 57} 58 59func NewEventRelationshipRequest(body io.Reader) (*EventRelationshipRequest, error) { 60 var relation EventRelationshipRequest 61 relation.Defaults() 62 if err := json.NewDecoder(body).Decode(&relation); err != nil { 63 return nil, err 64 } 65 return &relation, nil 66} 67 68func (r *EventRelationshipRequest) Defaults() { 69 r.Limit = 100 70 r.MaxBreadth = 10 71 r.MaxDepth = 3 72 r.DepthFirst = false 73 r.RecentFirst = true 74 r.IncludeParent = false 75 r.IncludeChildren = false 76 r.Direction = "down" 77} 78 79type EventRelationshipResponse struct { 80 Events []gomatrixserverlib.ClientEvent `json:"events"` 81 NextBatch string `json:"next_batch"` 82 Limited bool `json:"limited"` 83} 84 85func toClientResponse(res *gomatrixserverlib.MSC2836EventRelationshipsResponse) *EventRelationshipResponse { 86 out := &EventRelationshipResponse{ 87 Events: gomatrixserverlib.ToClientEvents(res.Events, gomatrixserverlib.FormatAll), 88 Limited: res.Limited, 89 NextBatch: res.NextBatch, 90 } 91 return out 92} 93 94// Enable this MSC 95func Enable( 96 base *setup.BaseDendrite, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI, 97 userAPI userapi.UserInternalAPI, keyRing gomatrixserverlib.JSONVerifier, 98) error { 99 db, err := NewDatabase(&base.Cfg.MSCs.Database) 100 if err != nil { 101 return fmt.Errorf("cannot enable MSC2836: %w", err) 102 } 103 hooks.Enable() 104 hooks.Attach(hooks.KindNewEventPersisted, func(headeredEvent interface{}) { 105 he := headeredEvent.(*gomatrixserverlib.HeaderedEvent) 106 hookErr := db.StoreRelation(context.Background(), he) 107 if hookErr != nil { 108 util.GetLogger(context.Background()).WithError(hookErr).WithField("event_id", he.EventID()).Error( 109 "failed to StoreRelation", 110 ) 111 } 112 // we need to update child metadata here as well as after doing remote /event_relationships requests 113 // so we catch child metadata originating from /send transactions 114 hookErr = db.UpdateChildMetadata(context.Background(), he) 115 if hookErr != nil { 116 util.GetLogger(context.Background()).WithError(err).WithField("event_id", he.EventID()).Warn( 117 "failed to update child metadata for event", 118 ) 119 } 120 }) 121 122 base.PublicClientAPIMux.Handle("/unstable/event_relationships", 123 httputil.MakeAuthAPI("eventRelationships", userAPI, eventRelationshipHandler(db, rsAPI, fsAPI)), 124 ).Methods(http.MethodPost, http.MethodOptions) 125 126 base.PublicFederationAPIMux.Handle("/unstable/event_relationships", httputil.MakeExternalAPI( 127 "msc2836_event_relationships", func(req *http.Request) util.JSONResponse { 128 fedReq, errResp := gomatrixserverlib.VerifyHTTPRequest( 129 req, time.Now(), base.Cfg.Global.ServerName, keyRing, 130 ) 131 if fedReq == nil { 132 return errResp 133 } 134 return federatedEventRelationship(req.Context(), fedReq, db, rsAPI, fsAPI) 135 }, 136 )).Methods(http.MethodPost, http.MethodOptions) 137 return nil 138} 139 140type reqCtx struct { 141 ctx context.Context 142 rsAPI roomserver.RoomserverInternalAPI 143 db Database 144 req *EventRelationshipRequest 145 userID string 146 roomVersion gomatrixserverlib.RoomVersion 147 148 // federated request args 149 isFederatedRequest bool 150 serverName gomatrixserverlib.ServerName 151 fsAPI fs.FederationSenderInternalAPI 152} 153 154func eventRelationshipHandler(db Database, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI) func(*http.Request, *userapi.Device) util.JSONResponse { 155 return func(req *http.Request, device *userapi.Device) util.JSONResponse { 156 relation, err := NewEventRelationshipRequest(req.Body) 157 if err != nil { 158 util.GetLogger(req.Context()).WithError(err).Error("failed to decode HTTP request as JSON") 159 return util.JSONResponse{ 160 Code: 400, 161 JSON: jsonerror.BadJSON(fmt.Sprintf("invalid json: %s", err)), 162 } 163 } 164 rc := reqCtx{ 165 ctx: req.Context(), 166 req: relation, 167 userID: device.UserID, 168 rsAPI: rsAPI, 169 fsAPI: fsAPI, 170 isFederatedRequest: false, 171 db: db, 172 } 173 res, resErr := rc.process() 174 if resErr != nil { 175 return *resErr 176 } 177 178 return util.JSONResponse{ 179 Code: 200, 180 JSON: toClientResponse(res), 181 } 182 } 183} 184 185func federatedEventRelationship( 186 ctx context.Context, fedReq *gomatrixserverlib.FederationRequest, db Database, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI, 187) util.JSONResponse { 188 relation, err := NewEventRelationshipRequest(bytes.NewBuffer(fedReq.Content())) 189 if err != nil { 190 util.GetLogger(ctx).WithError(err).Error("failed to decode HTTP request as JSON") 191 return util.JSONResponse{ 192 Code: 400, 193 JSON: jsonerror.BadJSON(fmt.Sprintf("invalid json: %s", err)), 194 } 195 } 196 rc := reqCtx{ 197 ctx: ctx, 198 req: relation, 199 rsAPI: rsAPI, 200 db: db, 201 // federation args 202 isFederatedRequest: true, 203 fsAPI: fsAPI, 204 serverName: fedReq.Origin(), 205 } 206 res, resErr := rc.process() 207 if resErr != nil { 208 return *resErr 209 } 210 // add auth chain information 211 requiredAuthEventsSet := make(map[string]bool) 212 var requiredAuthEvents []string 213 for _, ev := range res.Events { 214 for _, a := range ev.AuthEventIDs() { 215 if requiredAuthEventsSet[a] { 216 continue 217 } 218 requiredAuthEvents = append(requiredAuthEvents, a) 219 requiredAuthEventsSet[a] = true 220 } 221 } 222 var queryRes roomserver.QueryAuthChainResponse 223 err = rsAPI.QueryAuthChain(ctx, &roomserver.QueryAuthChainRequest{ 224 EventIDs: requiredAuthEvents, 225 }, &queryRes) 226 if err != nil { 227 // they may already have the auth events so don't fail this request 228 util.GetLogger(ctx).WithError(err).Error("Failed to QueryAuthChain") 229 } 230 res.AuthChain = make([]*gomatrixserverlib.Event, len(queryRes.AuthChain)) 231 for i := range queryRes.AuthChain { 232 res.AuthChain[i] = queryRes.AuthChain[i].Unwrap() 233 } 234 235 return util.JSONResponse{ 236 Code: 200, 237 JSON: res, 238 } 239} 240 241func (rc *reqCtx) process() (*gomatrixserverlib.MSC2836EventRelationshipsResponse, *util.JSONResponse) { 242 var res gomatrixserverlib.MSC2836EventRelationshipsResponse 243 var returnEvents []*gomatrixserverlib.HeaderedEvent 244 // Can the user see (according to history visibility) event_id? If no, reject the request, else continue. 245 event := rc.getLocalEvent(rc.req.EventID) 246 if event == nil { 247 event = rc.fetchUnknownEvent(rc.req.EventID, rc.req.RoomID) 248 } 249 if rc.req.RoomID == "" && event != nil { 250 rc.req.RoomID = event.RoomID() 251 } 252 if event == nil || !rc.authorisedToSeeEvent(event) { 253 return nil, &util.JSONResponse{ 254 Code: 403, 255 JSON: jsonerror.Forbidden("Event does not exist or you are not authorised to see it"), 256 } 257 } 258 rc.roomVersion = event.Version() 259 260 // Retrieve the event. Add it to response array. 261 returnEvents = append(returnEvents, event) 262 263 if rc.req.IncludeParent { 264 if parentEvent := rc.includeParent(event); parentEvent != nil { 265 returnEvents = append(returnEvents, parentEvent) 266 } 267 } 268 269 if rc.req.IncludeChildren { 270 remaining := rc.req.Limit - len(returnEvents) 271 if remaining > 0 { 272 children, resErr := rc.includeChildren(rc.db, event.EventID(), remaining, rc.req.RecentFirst) 273 if resErr != nil { 274 return nil, resErr 275 } 276 returnEvents = append(returnEvents, children...) 277 } 278 } 279 280 remaining := rc.req.Limit - len(returnEvents) 281 var walkLimited bool 282 if remaining > 0 { 283 included := make(map[string]bool, len(returnEvents)) 284 for _, ev := range returnEvents { 285 included[ev.EventID()] = true 286 } 287 var events []*gomatrixserverlib.HeaderedEvent 288 events, walkLimited = walkThread( 289 rc.ctx, rc.db, rc, included, remaining, 290 ) 291 returnEvents = append(returnEvents, events...) 292 } 293 res.Events = make([]*gomatrixserverlib.Event, len(returnEvents)) 294 for i, ev := range returnEvents { 295 // for each event, extract the children_count | hash and add it as unsigned data. 296 rc.addChildMetadata(ev) 297 res.Events[i] = ev.Unwrap() 298 } 299 res.Limited = remaining == 0 || walkLimited 300 return &res, nil 301} 302 303// fetchUnknownEvent retrieves an unknown event from the room specified. This server must 304// be joined to the room in question. This has the side effect of injecting surround threaded 305// events into the roomserver. 306func (rc *reqCtx) fetchUnknownEvent(eventID, roomID string) *gomatrixserverlib.HeaderedEvent { 307 if rc.isFederatedRequest || roomID == "" { 308 // we don't do fed hits for fed requests, and we can't ask servers without a room ID! 309 return nil 310 } 311 logger := util.GetLogger(rc.ctx).WithField("room_id", roomID) 312 // if they supplied a room_id, check the room exists. 313 var queryVerRes roomserver.QueryRoomVersionForRoomResponse 314 err := rc.rsAPI.QueryRoomVersionForRoom(rc.ctx, &roomserver.QueryRoomVersionForRoomRequest{ 315 RoomID: roomID, 316 }, &queryVerRes) 317 if err != nil { 318 logger.WithError(err).Warn("failed to query room version for room, does this room exist?") 319 return nil 320 } 321 322 // check the user is joined to that room 323 var queryMemRes roomserver.QueryMembershipForUserResponse 324 err = rc.rsAPI.QueryMembershipForUser(rc.ctx, &roomserver.QueryMembershipForUserRequest{ 325 RoomID: roomID, 326 UserID: rc.userID, 327 }, &queryMemRes) 328 if err != nil { 329 logger.WithError(err).Warn("failed to query membership for user in room") 330 return nil 331 } 332 if !queryMemRes.IsInRoom { 333 return nil 334 } 335 336 // ask one of the servers in the room for the event 337 var queryRes fs.QueryJoinedHostServerNamesInRoomResponse 338 err = rc.fsAPI.QueryJoinedHostServerNamesInRoom(rc.ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{ 339 RoomID: roomID, 340 }, &queryRes) 341 if err != nil { 342 logger.WithError(err).Error("failed to QueryJoinedHostServerNamesInRoom") 343 return nil 344 } 345 // query up to 5 servers 346 serversToQuery := queryRes.ServerNames 347 if len(serversToQuery) > 5 { 348 serversToQuery = serversToQuery[:5] 349 } 350 351 // fetch the event, along with some of the surrounding thread (if it's threaded) and the auth chain. 352 // Inject the response into the roomserver to remember the event across multiple calls and to set 353 // unexplored flags correctly. 354 for _, srv := range serversToQuery { 355 res, err := rc.MSC2836EventRelationships(eventID, srv, queryVerRes.RoomVersion) 356 if err != nil { 357 continue 358 } 359 rc.injectResponseToRoomserver(res) 360 for _, ev := range res.Events { 361 if ev.EventID() == eventID { 362 return ev.Headered(ev.Version()) 363 } 364 } 365 } 366 logger.WithField("servers", serversToQuery).Warn("failed to query event relationships") 367 return nil 368} 369 370// If include_parent: true and there is a valid m.relationship field in the event, 371// retrieve the referenced event. Apply history visibility check to that event and if it passes, add it to the response array. 372func (rc *reqCtx) includeParent(childEvent *gomatrixserverlib.HeaderedEvent) (parent *gomatrixserverlib.HeaderedEvent) { 373 parentID, _, _ := parentChildEventIDs(childEvent) 374 if parentID == "" { 375 return nil 376 } 377 return rc.lookForEvent(parentID) 378} 379 380// If include_children: true, lookup all events which have event_id as an m.relationship 381// Apply history visibility checks to all these events and add the ones which pass into the response array, 382// honouring the recent_first flag and the limit. 383func (rc *reqCtx) includeChildren(db Database, parentID string, limit int, recentFirst bool) ([]*gomatrixserverlib.HeaderedEvent, *util.JSONResponse) { 384 if rc.hasUnexploredChildren(parentID) { 385 // we need to do a remote request to pull in the children as we are missing them locally. 386 serversToQuery := rc.getServersForEventID(parentID) 387 var result *gomatrixserverlib.MSC2836EventRelationshipsResponse 388 for _, srv := range serversToQuery { 389 res, err := rc.fsAPI.MSC2836EventRelationships(rc.ctx, srv, gomatrixserverlib.MSC2836EventRelationshipsRequest{ 390 EventID: parentID, 391 Direction: "down", 392 Limit: 100, 393 MaxBreadth: -1, 394 MaxDepth: 1, // we just want the children from this parent 395 RecentFirst: true, 396 }, rc.roomVersion) 397 if err != nil { 398 util.GetLogger(rc.ctx).WithError(err).WithField("server", srv).Error("includeChildren: failed to call MSC2836EventRelationships") 399 } else { 400 result = &res 401 break 402 } 403 } 404 if result != nil { 405 rc.injectResponseToRoomserver(result) 406 } 407 // fallthrough to pull these new events from the DB 408 } 409 children, err := db.ChildrenForParent(rc.ctx, parentID, constRelType, recentFirst) 410 if err != nil { 411 util.GetLogger(rc.ctx).WithError(err).Error("failed to get ChildrenForParent") 412 resErr := jsonerror.InternalServerError() 413 return nil, &resErr 414 } 415 var childEvents []*gomatrixserverlib.HeaderedEvent 416 for _, child := range children { 417 childEvent := rc.lookForEvent(child.EventID) 418 if childEvent != nil { 419 childEvents = append(childEvents, childEvent) 420 } 421 } 422 if len(childEvents) > limit { 423 return childEvents[:limit], nil 424 } 425 return childEvents, nil 426} 427 428// Begin to walk the thread DAG in the direction specified, either depth or breadth first according to the depth_first flag, 429// honouring the limit, max_depth and max_breadth values according to the following rules 430func walkThread( 431 ctx context.Context, db Database, rc *reqCtx, included map[string]bool, limit int, 432) ([]*gomatrixserverlib.HeaderedEvent, bool) { 433 var result []*gomatrixserverlib.HeaderedEvent 434 eventWalker := walker{ 435 ctx: ctx, 436 req: rc.req, 437 db: db, 438 fn: func(wi *walkInfo) bool { 439 // If already processed event, skip. 440 if included[wi.EventID] { 441 return false 442 } 443 444 // If the response array is >= limit, stop. 445 if len(result) >= limit { 446 return true 447 } 448 449 // Process the event. 450 // if event is not found, use remoteEventRelationships to explore that part of the thread remotely. 451 // This will probably be easiest if the event relationships response is directly pumped into the database 452 // so the next walk will do the right thing. This requires those events to be authed and likely injected as 453 // outliers into the roomserver DB, which will de-dupe appropriately. 454 event := rc.lookForEvent(wi.EventID) 455 if event != nil { 456 result = append(result, event) 457 } 458 included[wi.EventID] = true 459 return false 460 }, 461 } 462 limited, err := eventWalker.WalkFrom(rc.req.EventID) 463 if err != nil { 464 util.GetLogger(ctx).WithError(err).Errorf("Failed to WalkFrom %s", rc.req.EventID) 465 } 466 return result, limited 467} 468 469// MSC2836EventRelationships performs an /event_relationships request to a remote server 470func (rc *reqCtx) MSC2836EventRelationships(eventID string, srv gomatrixserverlib.ServerName, ver gomatrixserverlib.RoomVersion) (*gomatrixserverlib.MSC2836EventRelationshipsResponse, error) { 471 res, err := rc.fsAPI.MSC2836EventRelationships(rc.ctx, srv, gomatrixserverlib.MSC2836EventRelationshipsRequest{ 472 EventID: eventID, 473 DepthFirst: rc.req.DepthFirst, 474 Direction: rc.req.Direction, 475 Limit: rc.req.Limit, 476 MaxBreadth: rc.req.MaxBreadth, 477 MaxDepth: rc.req.MaxDepth, 478 RecentFirst: rc.req.RecentFirst, 479 }, ver) 480 if err != nil { 481 util.GetLogger(rc.ctx).WithError(err).Error("Failed to call MSC2836EventRelationships") 482 return nil, err 483 } 484 return &res, nil 485 486} 487 488// authorisedToSeeEvent checks that the user or server is allowed to see this event. Returns true if allowed to 489// see this request. This only needs to be done once per room at present as we just check for joined status. 490func (rc *reqCtx) authorisedToSeeEvent(event *gomatrixserverlib.HeaderedEvent) bool { 491 if rc.isFederatedRequest { 492 // make sure the server is in this room 493 var res fs.QueryJoinedHostServerNamesInRoomResponse 494 err := rc.fsAPI.QueryJoinedHostServerNamesInRoom(rc.ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{ 495 RoomID: event.RoomID(), 496 }, &res) 497 if err != nil { 498 util.GetLogger(rc.ctx).WithError(err).Error("authorisedToSeeEvent: failed to QueryJoinedHostServerNamesInRoom") 499 return false 500 } 501 for _, srv := range res.ServerNames { 502 if srv == rc.serverName { 503 return true 504 } 505 } 506 return false 507 } 508 // make sure the user is in this room 509 // Allow events if the member is in the room 510 // TODO: This does not honour history_visibility 511 // TODO: This does not honour m.room.create content 512 var queryMembershipRes roomserver.QueryMembershipForUserResponse 513 err := rc.rsAPI.QueryMembershipForUser(rc.ctx, &roomserver.QueryMembershipForUserRequest{ 514 RoomID: event.RoomID(), 515 UserID: rc.userID, 516 }, &queryMembershipRes) 517 if err != nil { 518 util.GetLogger(rc.ctx).WithError(err).Error("authorisedToSeeEvent: failed to QueryMembershipForUser") 519 return false 520 } 521 return queryMembershipRes.IsInRoom 522} 523 524func (rc *reqCtx) getServersForEventID(eventID string) []gomatrixserverlib.ServerName { 525 if rc.req.RoomID == "" { 526 util.GetLogger(rc.ctx).WithField("event_id", eventID).Error( 527 "getServersForEventID: event exists in unknown room", 528 ) 529 return nil 530 } 531 if rc.roomVersion == "" { 532 util.GetLogger(rc.ctx).WithField("event_id", eventID).Errorf( 533 "getServersForEventID: event exists in %s with unknown room version", rc.req.RoomID, 534 ) 535 return nil 536 } 537 var queryRes fs.QueryJoinedHostServerNamesInRoomResponse 538 err := rc.fsAPI.QueryJoinedHostServerNamesInRoom(rc.ctx, &fs.QueryJoinedHostServerNamesInRoomRequest{ 539 RoomID: rc.req.RoomID, 540 }, &queryRes) 541 if err != nil { 542 util.GetLogger(rc.ctx).WithError(err).Error("getServersForEventID: failed to QueryJoinedHostServerNamesInRoom") 543 return nil 544 } 545 // query up to 5 servers 546 serversToQuery := queryRes.ServerNames 547 if len(serversToQuery) > 5 { 548 serversToQuery = serversToQuery[:5] 549 } 550 return serversToQuery 551} 552 553func (rc *reqCtx) remoteEventRelationships(eventID string) *gomatrixserverlib.MSC2836EventRelationshipsResponse { 554 if rc.isFederatedRequest { 555 return nil // we don't query remote servers for remote requests 556 } 557 serversToQuery := rc.getServersForEventID(eventID) 558 var res *gomatrixserverlib.MSC2836EventRelationshipsResponse 559 var err error 560 for _, srv := range serversToQuery { 561 res, err = rc.MSC2836EventRelationships(eventID, srv, rc.roomVersion) 562 if err != nil { 563 util.GetLogger(rc.ctx).WithError(err).WithField("server", srv).Error("remoteEventRelationships: failed to call MSC2836EventRelationships") 564 } else { 565 break 566 } 567 } 568 return res 569} 570 571// lookForEvent returns the event for the event ID given, by trying to query remote servers 572// if the event ID is unknown via /event_relationships. 573func (rc *reqCtx) lookForEvent(eventID string) *gomatrixserverlib.HeaderedEvent { 574 event := rc.getLocalEvent(eventID) 575 if event == nil { 576 queryRes := rc.remoteEventRelationships(eventID) 577 if queryRes != nil { 578 // inject all the events into the roomserver then return the event in question 579 rc.injectResponseToRoomserver(queryRes) 580 for _, ev := range queryRes.Events { 581 if ev.EventID() == eventID && rc.req.RoomID == ev.RoomID() { 582 return ev.Headered(ev.Version()) 583 } 584 } 585 } 586 } else if rc.hasUnexploredChildren(eventID) { 587 // we have the local event but we may need to do a remote hit anyway if we are exploring the thread and have unknown children. 588 // If we don't do this then we risk never fetching the children. 589 queryRes := rc.remoteEventRelationships(eventID) 590 if queryRes != nil { 591 rc.injectResponseToRoomserver(queryRes) 592 err := rc.db.MarkChildrenExplored(context.Background(), eventID) 593 if err != nil { 594 util.GetLogger(rc.ctx).WithError(err).Warnf("failed to mark children of %s as explored", eventID) 595 } 596 } 597 } 598 if rc.req.RoomID == event.RoomID() { 599 return event 600 } 601 return nil 602} 603 604func (rc *reqCtx) getLocalEvent(eventID string) *gomatrixserverlib.HeaderedEvent { 605 var queryEventsRes roomserver.QueryEventsByIDResponse 606 err := rc.rsAPI.QueryEventsByID(rc.ctx, &roomserver.QueryEventsByIDRequest{ 607 EventIDs: []string{eventID}, 608 }, &queryEventsRes) 609 if err != nil { 610 util.GetLogger(rc.ctx).WithError(err).Error("getLocalEvent: failed to QueryEventsByID") 611 return nil 612 } 613 if len(queryEventsRes.Events) == 0 { 614 util.GetLogger(rc.ctx).WithField("event_id", eventID).Infof("getLocalEvent: event does not exist") 615 return nil // event does not exist 616 } 617 return queryEventsRes.Events[0] 618} 619 620// injectResponseToRoomserver injects the events 621// into the roomserver as KindOutlier, with auth chains. 622func (rc *reqCtx) injectResponseToRoomserver(res *gomatrixserverlib.MSC2836EventRelationshipsResponse) { 623 var stateEvents []*gomatrixserverlib.Event 624 var messageEvents []*gomatrixserverlib.Event 625 for _, ev := range res.Events { 626 if ev.StateKey() != nil { 627 stateEvents = append(stateEvents, ev) 628 } else { 629 messageEvents = append(messageEvents, ev) 630 } 631 } 632 respState := gomatrixserverlib.RespState{ 633 AuthEvents: res.AuthChain, 634 StateEvents: stateEvents, 635 } 636 eventsInOrder, err := respState.Events() 637 if err != nil { 638 util.GetLogger(rc.ctx).WithError(err).Error("failed to calculate order to send events in MSC2836EventRelationshipsResponse") 639 return 640 } 641 // everything gets sent as an outlier because auth chain events may be disjoint from the DAG 642 // as may the threaded events. 643 var ires []roomserver.InputRoomEvent 644 for _, outlier := range append(eventsInOrder, messageEvents...) { 645 ires = append(ires, roomserver.InputRoomEvent{ 646 Kind: roomserver.KindOutlier, 647 Event: outlier.Headered(outlier.Version()), 648 AuthEventIDs: outlier.AuthEventIDs(), 649 }) 650 } 651 // we've got the data by this point so use a background context 652 err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires) 653 if err != nil { 654 util.GetLogger(rc.ctx).WithError(err).Error("failed to inject MSC2836EventRelationshipsResponse into the roomserver") 655 } 656 // update the child count / hash columns for these nodes. We need to do this here because not all events will make it 657 // through to the KindNewEventPersisted hook because the roomserver will ignore duplicates. Duplicates have meaning though 658 // as the `unsigned` field may differ (if the number of children changes). 659 for _, ev := range ires { 660 err = rc.db.UpdateChildMetadata(context.Background(), ev.Event) 661 if err != nil { 662 util.GetLogger(rc.ctx).WithError(err).WithField("event_id", ev.Event.EventID()).Warn("failed to update child metadata for event") 663 } 664 } 665} 666 667func (rc *reqCtx) addChildMetadata(ev *gomatrixserverlib.HeaderedEvent) { 668 count, hash := rc.getChildMetadata(ev.EventID()) 669 if count == 0 { 670 return 671 } 672 err := ev.SetUnsignedField("children_hash", gomatrixserverlib.Base64Bytes(hash)) 673 if err != nil { 674 util.GetLogger(rc.ctx).WithError(err).Warn("Failed to set children_hash") 675 } 676 err = ev.SetUnsignedField("children", map[string]int{ 677 constRelType: count, 678 }) 679 if err != nil { 680 util.GetLogger(rc.ctx).WithError(err).Warn("Failed to set children count") 681 } 682} 683 684func (rc *reqCtx) getChildMetadata(eventID string) (count int, hash []byte) { 685 children, err := rc.db.ChildrenForParent(rc.ctx, eventID, constRelType, false) 686 if err != nil { 687 util.GetLogger(rc.ctx).WithError(err).Warn("Failed to get ChildrenForParent for getting child metadata") 688 return 689 } 690 if len(children) == 0 { 691 return 692 } 693 // sort it lexiographically 694 sort.Slice(children, func(i, j int) bool { 695 return children[i].EventID < children[j].EventID 696 }) 697 // hash it 698 var eventIDs strings.Builder 699 for _, c := range children { 700 _, _ = eventIDs.WriteString(c.EventID) 701 } 702 hashValBytes := sha256.Sum256([]byte(eventIDs.String())) 703 704 count = len(children) 705 hash = hashValBytes[:] 706 return 707} 708 709// hasUnexploredChildren returns true if this event has unexplored children. 710// "An event has unexplored children if the `unsigned` child count on the parent does not match 711// how many children the server believes the parent to have. In addition, if the counts match but 712// the hashes do not match, then the event is unexplored." 713func (rc *reqCtx) hasUnexploredChildren(eventID string) bool { 714 if rc.isFederatedRequest { 715 return false // we only explore children for clients, not servers. 716 } 717 // extract largest child count from event 718 eventCount, eventHash, explored, err := rc.db.ChildMetadata(rc.ctx, eventID) 719 if err != nil { 720 util.GetLogger(rc.ctx).WithError(err).WithField("event_id", eventID).Warn( 721 "failed to get ChildMetadata from db", 722 ) 723 return false 724 } 725 // if there are no recorded children then we know we have >= children. 726 // if the event has already been explored (read: we hit /event_relationships successfully) 727 // then don't do it again. We'll only re-do this if we get an even bigger children count, 728 // see Database.UpdateChildMetadata 729 if eventCount == 0 || explored { 730 return false // short-circuit 731 } 732 733 // calculate child count for event 734 calcCount, calcHash := rc.getChildMetadata(eventID) 735 736 if eventCount < calcCount { 737 return false // we have more children 738 } else if eventCount > calcCount { 739 return true // the event has more children than we know about 740 } 741 // we have the same count, so a mismatched hash means some children are different 742 return !bytes.Equal(eventHash, calcHash) 743} 744 745type walkInfo struct { 746 eventInfo 747 SiblingNumber int 748 Depth int 749} 750 751type walker struct { 752 ctx context.Context 753 req *EventRelationshipRequest 754 db Database 755 fn func(wi *walkInfo) bool // callback invoked for each event walked, return true to terminate the walk 756} 757 758// WalkFrom the event ID given 759func (w *walker) WalkFrom(eventID string) (limited bool, err error) { 760 children, err := w.childrenForParent(eventID) 761 if err != nil { 762 util.GetLogger(w.ctx).WithError(err).Error("WalkFrom() childrenForParent failed, cannot walk") 763 return false, err 764 } 765 var next *walkInfo 766 toWalk := w.addChildren(nil, children, 1) 767 next, toWalk = w.nextChild(toWalk) 768 for next != nil { 769 stop := w.fn(next) 770 if stop { 771 return true, nil 772 } 773 // find the children's children 774 children, err = w.childrenForParent(next.EventID) 775 if err != nil { 776 util.GetLogger(w.ctx).WithError(err).Error("WalkFrom() childrenForParent failed, cannot walk") 777 return false, err 778 } 779 toWalk = w.addChildren(toWalk, children, next.Depth+1) 780 next, toWalk = w.nextChild(toWalk) 781 } 782 783 return false, nil 784} 785 786// addChildren adds an event's children to the to walk data structure 787func (w *walker) addChildren(toWalk []walkInfo, children []eventInfo, depthOfChildren int) []walkInfo { 788 // Check what number child this event is (ordered by recent_first) compared to its parent, does it exceed (greater than) max_breadth? If yes, skip. 789 if len(children) > w.req.MaxBreadth { 790 children = children[:w.req.MaxBreadth] 791 } 792 // Check how deep the event is compared to event_id, does it exceed (greater than) max_depth? If yes, skip. 793 if depthOfChildren > w.req.MaxDepth { 794 return toWalk 795 } 796 797 if w.req.DepthFirst { 798 // the slice is a stack so push them in reverse order so we pop them in the correct order 799 // e.g [3,2,1] => [3,2] , 1 => [3] , 2 => [] , 3 800 for i := len(children) - 1; i >= 0; i-- { 801 toWalk = append(toWalk, walkInfo{ 802 eventInfo: children[i], 803 SiblingNumber: i + 1, // index from 1 804 Depth: depthOfChildren, 805 }) 806 } 807 } else { 808 // the slice is a queue so push them in normal order to we dequeue them in the correct order 809 // e.g [1,2,3] => 1, [2, 3] => 2 , [3] => 3, [] 810 for i := range children { 811 toWalk = append(toWalk, walkInfo{ 812 eventInfo: children[i], 813 SiblingNumber: i + 1, // index from 1 814 Depth: depthOfChildren, 815 }) 816 } 817 } 818 return toWalk 819} 820 821func (w *walker) nextChild(toWalk []walkInfo) (*walkInfo, []walkInfo) { 822 if len(toWalk) == 0 { 823 return nil, nil 824 } 825 var child walkInfo 826 if w.req.DepthFirst { 827 // toWalk is a stack so pop the child off 828 child, toWalk = toWalk[len(toWalk)-1], toWalk[:len(toWalk)-1] 829 return &child, toWalk 830 } 831 // toWalk is a queue so shift the child off 832 child, toWalk = toWalk[0], toWalk[1:] 833 return &child, toWalk 834} 835 836// childrenForParent returns the children events for this event ID, honouring the direction: up|down flags 837// meaning this can actually be returning the parent for the event instead of the children. 838func (w *walker) childrenForParent(eventID string) ([]eventInfo, error) { 839 if w.req.Direction == "down" { 840 return w.db.ChildrenForParent(w.ctx, eventID, constRelType, w.req.RecentFirst) 841 } 842 // find the event to pull out the parent 843 ei, err := w.db.ParentForChild(w.ctx, eventID, constRelType) 844 if err != nil { 845 return nil, err 846 } 847 if ei != nil { 848 return []eventInfo{*ei}, nil 849 } 850 return nil, nil 851} 852