// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumers

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/Shopify/sarama"
	"github.com/getsentry/sentry-go"
	"github.com/matrix-org/dendrite/internal"
	"github.com/matrix-org/dendrite/roomserver/api"
	"github.com/matrix-org/dendrite/setup/config"
	"github.com/matrix-org/dendrite/setup/process"
	"github.com/matrix-org/dendrite/syncapi/notifier"
	"github.com/matrix-org/dendrite/syncapi/storage"
	"github.com/matrix-org/dendrite/syncapi/types"
	"github.com/matrix-org/gomatrixserverlib"
	log "github.com/sirupsen/logrus"
)

// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
	// cfg is the sync API configuration; it supplies the Kafka topic name
	// and the local server name used to recognise local joins.
	cfg *config.SyncAPI
	// rsAPI is the roomserver API handle passed to the constructor. It is
	// stored here but not called anywhere in this file.
	rsAPI api.RoomserverInternalAPI
	// rsConsumer is the underlying consumer that Start() starts and that
	// delivers each message to onMessage.
	rsConsumer *internal.ContinualConsumer
	// db is the sync API database that every handler writes to.
	db storage.Database
	// pduStream is advanced after room events and peeks have been written.
	pduStream types.StreamProvider
	// inviteStream is advanced after invites have been added or retired.
	inviteStream types.StreamProvider
	// notifier is told about new events, invites and peeks once they have
	// been written to the database.
	notifier *notifier.Notifier
}

// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
47func NewOutputRoomEventConsumer( 48 process *process.ProcessContext, 49 cfg *config.SyncAPI, 50 kafkaConsumer sarama.Consumer, 51 store storage.Database, 52 notifier *notifier.Notifier, 53 pduStream types.StreamProvider, 54 inviteStream types.StreamProvider, 55 rsAPI api.RoomserverInternalAPI, 56) *OutputRoomEventConsumer { 57 58 consumer := internal.ContinualConsumer{ 59 Process: process, 60 ComponentName: "syncapi/roomserver", 61 Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), 62 Consumer: kafkaConsumer, 63 PartitionStore: store, 64 } 65 s := &OutputRoomEventConsumer{ 66 cfg: cfg, 67 rsConsumer: &consumer, 68 db: store, 69 notifier: notifier, 70 pduStream: pduStream, 71 inviteStream: inviteStream, 72 rsAPI: rsAPI, 73 } 74 consumer.ProcessMessage = s.onMessage 75 76 return s 77} 78 79// Start consuming from room servers 80func (s *OutputRoomEventConsumer) Start() error { 81 return s.rsConsumer.Start() 82} 83 84// onMessage is called when the sync server receives a new event from the room server output log. 85// It is not safe for this function to be called from multiple goroutines, or else the 86// sync stream position may race and be incorrectly calculated. 87func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { 88 // Parse out the event JSON 89 var output api.OutputEvent 90 if err := json.Unmarshal(msg.Value, &output); err != nil { 91 // If the message was invalid, log it and move on to the next message in the stream 92 log.WithError(err).Errorf("roomserver output log: message parse failure") 93 return nil 94 } 95 96 switch output.Type { 97 case api.OutputTypeNewRoomEvent: 98 // Ignore redaction events. 
We will add them to the database when they are 99 // validated (when we receive OutputTypeRedactedEvent) 100 event := output.NewRoomEvent.Event 101 if event.Type() == gomatrixserverlib.MRoomRedaction && event.StateKey() == nil { 102 // in the special case where the event redacts itself, just pass the message through because 103 // we will never see the other part of the pair 104 if event.Redacts() != event.EventID() { 105 return nil 106 } 107 } 108 return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) 109 case api.OutputTypeOldRoomEvent: 110 return s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent) 111 case api.OutputTypeNewInviteEvent: 112 return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) 113 case api.OutputTypeRetireInviteEvent: 114 return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent) 115 case api.OutputTypeNewPeek: 116 return s.onNewPeek(context.TODO(), *output.NewPeek) 117 case api.OutputTypeRetirePeek: 118 return s.onRetirePeek(context.TODO(), *output.RetirePeek) 119 case api.OutputTypeRedactedEvent: 120 return s.onRedactEvent(context.TODO(), *output.RedactedEvent) 121 default: 122 log.WithField("type", output.Type).Debug( 123 "roomserver output log: ignoring unknown output type", 124 ) 125 return nil 126 } 127} 128 129func (s *OutputRoomEventConsumer) onRedactEvent( 130 ctx context.Context, msg api.OutputRedactedEvent, 131) error { 132 err := s.db.RedactEvent(ctx, msg.RedactedEventID, msg.RedactedBecause) 133 if err != nil { 134 log.WithError(err).Error("RedactEvent error'd") 135 return err 136 } 137 // fake a room event so we notify clients about the redaction, as if it were 138 // a normal event. 
139 return s.onNewRoomEvent(ctx, api.OutputNewRoomEvent{ 140 Event: msg.RedactedBecause, 141 }) 142} 143 144func (s *OutputRoomEventConsumer) onNewRoomEvent( 145 ctx context.Context, msg api.OutputNewRoomEvent, 146) error { 147 ev := msg.Event 148 addsStateEvents := msg.AddsState() 149 150 ev, err := s.updateStateEvent(ev) 151 if err != nil { 152 sentry.CaptureException(err) 153 return err 154 } 155 156 for i := range addsStateEvents { 157 addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i]) 158 if err != nil { 159 sentry.CaptureException(err) 160 return err 161 } 162 } 163 164 if msg.RewritesState { 165 if err = s.db.PurgeRoomState(ctx, ev.RoomID()); err != nil { 166 sentry.CaptureException(err) 167 return fmt.Errorf("s.db.PurgeRoom: %w", err) 168 } 169 } 170 171 pduPos, err := s.db.WriteEvent( 172 ctx, 173 ev, 174 addsStateEvents, 175 msg.AddsStateEventIDs, 176 msg.RemovesStateEventIDs, 177 msg.TransactionID, 178 false, 179 ) 180 if err != nil { 181 // panic rather than continue with an inconsistent database 182 log.WithFields(log.Fields{ 183 "event_id": ev.EventID(), 184 "event": string(ev.JSON()), 185 log.ErrorKey: err, 186 "add": msg.AddsStateEventIDs, 187 "del": msg.RemovesStateEventIDs, 188 }).Panicf("roomserver output log: write new event failure") 189 return nil 190 } 191 192 if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil { 193 log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos) 194 sentry.CaptureException(err) 195 return err 196 } 197 198 s.pduStream.Advance(pduPos) 199 s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos}) 200 201 return nil 202} 203 204func (s *OutputRoomEventConsumer) onOldRoomEvent( 205 ctx context.Context, msg api.OutputOldRoomEvent, 206) error { 207 ev := msg.Event 208 209 // TODO: The state key check when excluding from sync is designed 210 // to stop us from lying to clients with old state, whilst still 211 // allowing normal timeline events 
through. This is an absolute 212 // hack but until we have some better strategy for dealing with 213 // old events in the sync API, this should at least prevent us 214 // from confusing clients into thinking they've joined/left rooms. 215 pduPos, err := s.db.WriteEvent( 216 ctx, 217 ev, 218 []*gomatrixserverlib.HeaderedEvent{}, 219 []string{}, // adds no state 220 []string{}, // removes no state 221 nil, // no transaction 222 ev.StateKey() != nil, // exclude from sync? 223 ) 224 if err != nil { 225 // panic rather than continue with an inconsistent database 226 log.WithFields(log.Fields{ 227 "event_id": ev.EventID(), 228 "event": string(ev.JSON()), 229 log.ErrorKey: err, 230 }).Panicf("roomserver output log: write old event failure") 231 return nil 232 } 233 234 if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil { 235 log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos) 236 return err 237 } 238 239 s.pduStream.Advance(pduPos) 240 s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos}) 241 242 return nil 243} 244 245func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, sp types.StreamPosition) (types.StreamPosition, error) { 246 if ev.Type() != gomatrixserverlib.MRoomMember { 247 return sp, nil 248 } 249 membership, err := ev.Membership() 250 if err != nil { 251 return sp, fmt.Errorf("ev.Membership: %w", err) 252 } 253 // TODO: check that it's a join and not a profile change (means unmarshalling prev_content) 254 if membership == gomatrixserverlib.Join { 255 // check it's a local join 256 _, domain, err := gomatrixserverlib.SplitID('@', *ev.StateKey()) 257 if err != nil { 258 return sp, fmt.Errorf("gomatrixserverlib.SplitID: %w", err) 259 } 260 if domain != s.cfg.Matrix.ServerName { 261 return sp, nil 262 } 263 264 // cancel any peeks for it 265 peekSP, peekErr := s.db.DeletePeeks(ctx, ev.RoomID(), *ev.StateKey()) 266 if peekErr != nil { 
267 return sp, fmt.Errorf("s.db.DeletePeeks: %w", peekErr) 268 } 269 if peekSP > 0 { 270 sp = peekSP 271 } 272 } 273 return sp, nil 274} 275 276func (s *OutputRoomEventConsumer) onNewInviteEvent( 277 ctx context.Context, msg api.OutputNewInviteEvent, 278) error { 279 if msg.Event.StateKey() == nil { 280 log.WithFields(log.Fields{ 281 "event": string(msg.Event.JSON()), 282 }).Panicf("roomserver output log: invite has no state key") 283 return nil 284 } 285 pduPos, err := s.db.AddInviteEvent(ctx, msg.Event) 286 if err != nil { 287 sentry.CaptureException(err) 288 // panic rather than continue with an inconsistent database 289 log.WithFields(log.Fields{ 290 "event_id": msg.Event.EventID(), 291 "event": string(msg.Event.JSON()), 292 "pdupos": pduPos, 293 log.ErrorKey: err, 294 }).Panicf("roomserver output log: write invite failure") 295 return nil 296 } 297 298 s.inviteStream.Advance(pduPos) 299 s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey()) 300 301 return nil 302} 303 304func (s *OutputRoomEventConsumer) onRetireInviteEvent( 305 ctx context.Context, msg api.OutputRetireInviteEvent, 306) error { 307 pduPos, err := s.db.RetireInviteEvent(ctx, msg.EventID) 308 if err != nil { 309 sentry.CaptureException(err) 310 // panic rather than continue with an inconsistent database 311 log.WithFields(log.Fields{ 312 "event_id": msg.EventID, 313 log.ErrorKey: err, 314 }).Panicf("roomserver output log: remove invite failure") 315 return nil 316 } 317 318 // Notify any active sync requests that the invite has been retired. 
319 s.inviteStream.Advance(pduPos) 320 s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID) 321 322 return nil 323} 324 325func (s *OutputRoomEventConsumer) onNewPeek( 326 ctx context.Context, msg api.OutputNewPeek, 327) error { 328 sp, err := s.db.AddPeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID) 329 if err != nil { 330 sentry.CaptureException(err) 331 // panic rather than continue with an inconsistent database 332 log.WithFields(log.Fields{ 333 log.ErrorKey: err, 334 }).Panicf("roomserver output log: write peek failure") 335 return nil 336 } 337 338 // tell the notifier about the new peek so it knows to wake up new devices 339 // TODO: This only works because the peeks table is reusing the same 340 // index as PDUs, but we should fix this 341 s.pduStream.Advance(sp) 342 s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) 343 344 return nil 345} 346 347func (s *OutputRoomEventConsumer) onRetirePeek( 348 ctx context.Context, msg api.OutputRetirePeek, 349) error { 350 sp, err := s.db.DeletePeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID) 351 if err != nil { 352 // panic rather than continue with an inconsistent database 353 log.WithFields(log.Fields{ 354 log.ErrorKey: err, 355 }).Panicf("roomserver output log: write peek failure") 356 return nil 357 } 358 359 // tell the notifier about the new peek so it knows to wake up new devices 360 // TODO: This only works because the peeks table is reusing the same 361 // index as PDUs, but we should fix this 362 s.pduStream.Advance(sp) 363 s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) 364 365 return nil 366} 367 368func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.HeaderedEvent) (*gomatrixserverlib.HeaderedEvent, error) { 369 if event.StateKey() == nil { 370 return event, nil 371 } 372 stateKey := *event.StateKey() 373 374 prevEvent, err := s.db.GetStateEvent( 375 
context.TODO(), event.RoomID(), event.Type(), stateKey, 376 ) 377 if err != nil { 378 return event, err 379 } 380 381 if prevEvent == nil || prevEvent.EventID() == event.EventID() { 382 return event, nil 383 } 384 385 prev := types.PrevEventRef{ 386 PrevContent: prevEvent.Content(), 387 ReplacesState: prevEvent.EventID(), 388 PrevSender: prevEvent.Sender(), 389 } 390 391 event.Event, err = event.SetUnsigned(prev) 392 return event, err 393} 394