1// Copyright 2017 Vector Creations Ltd
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package consumers
16
17import (
18	"context"
19	"encoding/json"
20
21	"github.com/Shopify/sarama"
22	"github.com/getsentry/sentry-go"
23	"github.com/matrix-org/dendrite/internal"
24	"github.com/matrix-org/dendrite/internal/eventutil"
25	"github.com/matrix-org/dendrite/setup/config"
26	"github.com/matrix-org/dendrite/setup/process"
27	"github.com/matrix-org/dendrite/syncapi/notifier"
28	"github.com/matrix-org/dendrite/syncapi/storage"
29	"github.com/matrix-org/dendrite/syncapi/types"
30	log "github.com/sirupsen/logrus"
31)
32
// OutputClientDataConsumer consumes events that originated in the client API server.
//
// Fields:
//   - clientAPIConsumer: the continual consumer that Start() starts; the
//     constructor sets its ProcessMessage callback to onMessage.
//   - db: the sync API database that account data updates are upserted into.
//   - stream: the stream provider that is advanced to the position returned
//     by each successful upsert.
//   - notifier: notified of new account data for the affected user after each
//     successful upsert.
type OutputClientDataConsumer struct {
	clientAPIConsumer *internal.ContinualConsumer
	db                storage.Database
	stream            types.StreamProvider
	notifier          *notifier.Notifier
}
40
41// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
42func NewOutputClientDataConsumer(
43	process *process.ProcessContext,
44	cfg *config.SyncAPI,
45	kafkaConsumer sarama.Consumer,
46	store storage.Database,
47	notifier *notifier.Notifier,
48	stream types.StreamProvider,
49) *OutputClientDataConsumer {
50	consumer := internal.ContinualConsumer{
51		Process:        process,
52		ComponentName:  "syncapi/clientapi",
53		Topic:          string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData)),
54		Consumer:       kafkaConsumer,
55		PartitionStore: store,
56	}
57	s := &OutputClientDataConsumer{
58		clientAPIConsumer: &consumer,
59		db:                store,
60		notifier:          notifier,
61		stream:            stream,
62	}
63	consumer.ProcessMessage = s.onMessage
64
65	return s
66}
67
68// Start consuming from room servers
69func (s *OutputClientDataConsumer) Start() error {
70	return s.clientAPIConsumer.Start()
71}
72
73// onMessage is called when the sync server receives a new event from the client API server output log.
74// It is not safe for this function to be called from multiple goroutines, or else the
75// sync stream position may race and be incorrectly calculated.
76func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error {
77	// Parse out the event JSON
78	var output eventutil.AccountData
79	if err := json.Unmarshal(msg.Value, &output); err != nil {
80		// If the message was invalid, log it and move on to the next message in the stream
81		log.WithError(err).Errorf("client API server output log: message parse failure")
82		sentry.CaptureException(err)
83		return nil
84	}
85
86	log.WithFields(log.Fields{
87		"type":    output.Type,
88		"room_id": output.RoomID,
89	}).Info("received data from client API server")
90
91	streamPos, err := s.db.UpsertAccountData(
92		context.TODO(), string(msg.Key), output.RoomID, output.Type,
93	)
94	if err != nil {
95		sentry.CaptureException(err)
96		log.WithFields(log.Fields{
97			"type":       output.Type,
98			"room_id":    output.RoomID,
99			log.ErrorKey: err,
100		}).Panicf("could not save account data")
101	}
102
103	s.stream.Advance(streamPos)
104	s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: streamPos})
105
106	return nil
107}
108