1// Copyright 2017 Vector Creations Ltd
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package consumers
16
17import (
18	"context"
19	"encoding/json"
20	"fmt"
21
22	"github.com/Shopify/sarama"
23	"github.com/getsentry/sentry-go"
24	"github.com/matrix-org/dendrite/internal"
25	"github.com/matrix-org/dendrite/roomserver/api"
26	"github.com/matrix-org/dendrite/setup/config"
27	"github.com/matrix-org/dendrite/setup/process"
28	"github.com/matrix-org/dendrite/syncapi/notifier"
29	"github.com/matrix-org/dendrite/syncapi/storage"
30	"github.com/matrix-org/dendrite/syncapi/types"
31	"github.com/matrix-org/gomatrixserverlib"
32	log "github.com/sirupsen/logrus"
33)
34
35// OutputRoomEventConsumer consumes events that originated in the room server.
36type OutputRoomEventConsumer struct {
37	cfg          *config.SyncAPI
38	rsAPI        api.RoomserverInternalAPI
39	rsConsumer   *internal.ContinualConsumer
40	db           storage.Database
41	pduStream    types.StreamProvider
42	inviteStream types.StreamProvider
43	notifier     *notifier.Notifier
44}
45
46// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
47func NewOutputRoomEventConsumer(
48	process *process.ProcessContext,
49	cfg *config.SyncAPI,
50	kafkaConsumer sarama.Consumer,
51	store storage.Database,
52	notifier *notifier.Notifier,
53	pduStream types.StreamProvider,
54	inviteStream types.StreamProvider,
55	rsAPI api.RoomserverInternalAPI,
56) *OutputRoomEventConsumer {
57
58	consumer := internal.ContinualConsumer{
59		Process:        process,
60		ComponentName:  "syncapi/roomserver",
61		Topic:          string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
62		Consumer:       kafkaConsumer,
63		PartitionStore: store,
64	}
65	s := &OutputRoomEventConsumer{
66		cfg:          cfg,
67		rsConsumer:   &consumer,
68		db:           store,
69		notifier:     notifier,
70		pduStream:    pduStream,
71		inviteStream: inviteStream,
72		rsAPI:        rsAPI,
73	}
74	consumer.ProcessMessage = s.onMessage
75
76	return s
77}
78
79// Start consuming from room servers
80func (s *OutputRoomEventConsumer) Start() error {
81	return s.rsConsumer.Start()
82}
83
84// onMessage is called when the sync server receives a new event from the room server output log.
85// It is not safe for this function to be called from multiple goroutines, or else the
86// sync stream position may race and be incorrectly calculated.
87func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
88	// Parse out the event JSON
89	var output api.OutputEvent
90	if err := json.Unmarshal(msg.Value, &output); err != nil {
91		// If the message was invalid, log it and move on to the next message in the stream
92		log.WithError(err).Errorf("roomserver output log: message parse failure")
93		return nil
94	}
95
96	switch output.Type {
97	case api.OutputTypeNewRoomEvent:
98		// Ignore redaction events. We will add them to the database when they are
99		// validated (when we receive OutputTypeRedactedEvent)
100		event := output.NewRoomEvent.Event
101		if event.Type() == gomatrixserverlib.MRoomRedaction && event.StateKey() == nil {
102			// in the special case where the event redacts itself, just pass the message through because
103			// we will never see the other part of the pair
104			if event.Redacts() != event.EventID() {
105				return nil
106			}
107		}
108		return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
109	case api.OutputTypeOldRoomEvent:
110		return s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent)
111	case api.OutputTypeNewInviteEvent:
112		return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
113	case api.OutputTypeRetireInviteEvent:
114		return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent)
115	case api.OutputTypeNewPeek:
116		return s.onNewPeek(context.TODO(), *output.NewPeek)
117	case api.OutputTypeRetirePeek:
118		return s.onRetirePeek(context.TODO(), *output.RetirePeek)
119	case api.OutputTypeRedactedEvent:
120		return s.onRedactEvent(context.TODO(), *output.RedactedEvent)
121	default:
122		log.WithField("type", output.Type).Debug(
123			"roomserver output log: ignoring unknown output type",
124		)
125		return nil
126	}
127}
128
129func (s *OutputRoomEventConsumer) onRedactEvent(
130	ctx context.Context, msg api.OutputRedactedEvent,
131) error {
132	err := s.db.RedactEvent(ctx, msg.RedactedEventID, msg.RedactedBecause)
133	if err != nil {
134		log.WithError(err).Error("RedactEvent error'd")
135		return err
136	}
137	// fake a room event so we notify clients about the redaction, as if it were
138	// a normal event.
139	return s.onNewRoomEvent(ctx, api.OutputNewRoomEvent{
140		Event: msg.RedactedBecause,
141	})
142}
143
144func (s *OutputRoomEventConsumer) onNewRoomEvent(
145	ctx context.Context, msg api.OutputNewRoomEvent,
146) error {
147	ev := msg.Event
148	addsStateEvents := msg.AddsState()
149
150	ev, err := s.updateStateEvent(ev)
151	if err != nil {
152		sentry.CaptureException(err)
153		return err
154	}
155
156	for i := range addsStateEvents {
157		addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i])
158		if err != nil {
159			sentry.CaptureException(err)
160			return err
161		}
162	}
163
164	if msg.RewritesState {
165		if err = s.db.PurgeRoomState(ctx, ev.RoomID()); err != nil {
166			sentry.CaptureException(err)
167			return fmt.Errorf("s.db.PurgeRoom: %w", err)
168		}
169	}
170
171	pduPos, err := s.db.WriteEvent(
172		ctx,
173		ev,
174		addsStateEvents,
175		msg.AddsStateEventIDs,
176		msg.RemovesStateEventIDs,
177		msg.TransactionID,
178		false,
179	)
180	if err != nil {
181		// panic rather than continue with an inconsistent database
182		log.WithFields(log.Fields{
183			"event_id":   ev.EventID(),
184			"event":      string(ev.JSON()),
185			log.ErrorKey: err,
186			"add":        msg.AddsStateEventIDs,
187			"del":        msg.RemovesStateEventIDs,
188		}).Panicf("roomserver output log: write new event failure")
189		return nil
190	}
191
192	if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
193		log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
194		sentry.CaptureException(err)
195		return err
196	}
197
198	s.pduStream.Advance(pduPos)
199	s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})
200
201	return nil
202}
203
204func (s *OutputRoomEventConsumer) onOldRoomEvent(
205	ctx context.Context, msg api.OutputOldRoomEvent,
206) error {
207	ev := msg.Event
208
209	// TODO: The state key check when excluding from sync is designed
210	// to stop us from lying to clients with old state, whilst still
211	// allowing normal timeline events through. This is an absolute
212	// hack but until we have some better strategy for dealing with
213	// old events in the sync API, this should at least prevent us
214	// from confusing clients into thinking they've joined/left rooms.
215	pduPos, err := s.db.WriteEvent(
216		ctx,
217		ev,
218		[]*gomatrixserverlib.HeaderedEvent{},
219		[]string{},           // adds no state
220		[]string{},           // removes no state
221		nil,                  // no transaction
222		ev.StateKey() != nil, // exclude from sync?
223	)
224	if err != nil {
225		// panic rather than continue with an inconsistent database
226		log.WithFields(log.Fields{
227			"event_id":   ev.EventID(),
228			"event":      string(ev.JSON()),
229			log.ErrorKey: err,
230		}).Panicf("roomserver output log: write old event failure")
231		return nil
232	}
233
234	if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
235		log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
236		return err
237	}
238
239	s.pduStream.Advance(pduPos)
240	s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})
241
242	return nil
243}
244
245func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, sp types.StreamPosition) (types.StreamPosition, error) {
246	if ev.Type() != gomatrixserverlib.MRoomMember {
247		return sp, nil
248	}
249	membership, err := ev.Membership()
250	if err != nil {
251		return sp, fmt.Errorf("ev.Membership: %w", err)
252	}
253	// TODO: check that it's a join and not a profile change (means unmarshalling prev_content)
254	if membership == gomatrixserverlib.Join {
255		// check it's a local join
256		_, domain, err := gomatrixserverlib.SplitID('@', *ev.StateKey())
257		if err != nil {
258			return sp, fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
259		}
260		if domain != s.cfg.Matrix.ServerName {
261			return sp, nil
262		}
263
264		// cancel any peeks for it
265		peekSP, peekErr := s.db.DeletePeeks(ctx, ev.RoomID(), *ev.StateKey())
266		if peekErr != nil {
267			return sp, fmt.Errorf("s.db.DeletePeeks: %w", peekErr)
268		}
269		if peekSP > 0 {
270			sp = peekSP
271		}
272	}
273	return sp, nil
274}
275
276func (s *OutputRoomEventConsumer) onNewInviteEvent(
277	ctx context.Context, msg api.OutputNewInviteEvent,
278) error {
279	if msg.Event.StateKey() == nil {
280		log.WithFields(log.Fields{
281			"event": string(msg.Event.JSON()),
282		}).Panicf("roomserver output log: invite has no state key")
283		return nil
284	}
285	pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
286	if err != nil {
287		sentry.CaptureException(err)
288		// panic rather than continue with an inconsistent database
289		log.WithFields(log.Fields{
290			"event_id":   msg.Event.EventID(),
291			"event":      string(msg.Event.JSON()),
292			"pdupos":     pduPos,
293			log.ErrorKey: err,
294		}).Panicf("roomserver output log: write invite failure")
295		return nil
296	}
297
298	s.inviteStream.Advance(pduPos)
299	s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey())
300
301	return nil
302}
303
304func (s *OutputRoomEventConsumer) onRetireInviteEvent(
305	ctx context.Context, msg api.OutputRetireInviteEvent,
306) error {
307	pduPos, err := s.db.RetireInviteEvent(ctx, msg.EventID)
308	if err != nil {
309		sentry.CaptureException(err)
310		// panic rather than continue with an inconsistent database
311		log.WithFields(log.Fields{
312			"event_id":   msg.EventID,
313			log.ErrorKey: err,
314		}).Panicf("roomserver output log: remove invite failure")
315		return nil
316	}
317
318	// Notify any active sync requests that the invite has been retired.
319	s.inviteStream.Advance(pduPos)
320	s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID)
321
322	return nil
323}
324
325func (s *OutputRoomEventConsumer) onNewPeek(
326	ctx context.Context, msg api.OutputNewPeek,
327) error {
328	sp, err := s.db.AddPeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
329	if err != nil {
330		sentry.CaptureException(err)
331		// panic rather than continue with an inconsistent database
332		log.WithFields(log.Fields{
333			log.ErrorKey: err,
334		}).Panicf("roomserver output log: write peek failure")
335		return nil
336	}
337
338	// tell the notifier about the new peek so it knows to wake up new devices
339	// TODO: This only works because the peeks table is reusing the same
340	// index as PDUs, but we should fix this
341	s.pduStream.Advance(sp)
342	s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
343
344	return nil
345}
346
347func (s *OutputRoomEventConsumer) onRetirePeek(
348	ctx context.Context, msg api.OutputRetirePeek,
349) error {
350	sp, err := s.db.DeletePeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
351	if err != nil {
352		// panic rather than continue with an inconsistent database
353		log.WithFields(log.Fields{
354			log.ErrorKey: err,
355		}).Panicf("roomserver output log: write peek failure")
356		return nil
357	}
358
359	// tell the notifier about the new peek so it knows to wake up new devices
360	// TODO: This only works because the peeks table is reusing the same
361	// index as PDUs, but we should fix this
362	s.pduStream.Advance(sp)
363	s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
364
365	return nil
366}
367
368func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.HeaderedEvent) (*gomatrixserverlib.HeaderedEvent, error) {
369	if event.StateKey() == nil {
370		return event, nil
371	}
372	stateKey := *event.StateKey()
373
374	prevEvent, err := s.db.GetStateEvent(
375		context.TODO(), event.RoomID(), event.Type(), stateKey,
376	)
377	if err != nil {
378		return event, err
379	}
380
381	if prevEvent == nil || prevEvent.EventID() == event.EventID() {
382		return event, nil
383	}
384
385	prev := types.PrevEventRef{
386		PrevContent:   prevEvent.Content(),
387		ReplacesState: prevEvent.EventID(),
388		PrevSender:    prevEvent.Sender(),
389	}
390
391	event.Event, err = event.SetUnsigned(prev)
392	return event, err
393}
394