1// Copyright 2017 Vector Creations Ltd 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package consumers 16 17import ( 18 "context" 19 "encoding/json" 20 21 "github.com/Shopify/sarama" 22 "github.com/getsentry/sentry-go" 23 "github.com/matrix-org/dendrite/internal" 24 "github.com/matrix-org/dendrite/internal/eventutil" 25 "github.com/matrix-org/dendrite/setup/config" 26 "github.com/matrix-org/dendrite/setup/process" 27 "github.com/matrix-org/dendrite/syncapi/notifier" 28 "github.com/matrix-org/dendrite/syncapi/storage" 29 "github.com/matrix-org/dendrite/syncapi/types" 30 log "github.com/sirupsen/logrus" 31) 32 33// OutputClientDataConsumer consumes events that originated in the client API server. 34type OutputClientDataConsumer struct { 35 clientAPIConsumer *internal.ContinualConsumer 36 db storage.Database 37 stream types.StreamProvider 38 notifier *notifier.Notifier 39} 40 41// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. 42func NewOutputClientDataConsumer( 43 process *process.ProcessContext, 44 cfg *config.SyncAPI, 45 kafkaConsumer sarama.Consumer, 46 store storage.Database, 47 notifier *notifier.Notifier, 48 stream types.StreamProvider, 49) *OutputClientDataConsumer { 50 consumer := internal.ContinualConsumer{ 51 Process: process, 52 ComponentName: "syncapi/clientapi", 53 Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData)), 54 Consumer: kafkaConsumer, 55 PartitionStore: store, 56 } 57 s := &OutputClientDataConsumer{ 58 clientAPIConsumer: &consumer, 59 db: store, 60 notifier: notifier, 61 stream: stream, 62 } 63 consumer.ProcessMessage = s.onMessage 64 65 return s 66} 67 68// Start consuming from room servers 69func (s *OutputClientDataConsumer) Start() error { 70 return s.clientAPIConsumer.Start() 71} 72 73// onMessage is called when the sync server receives a new event from the client API server output log. 74// It is not safe for this function to be called from multiple goroutines, or else the 75// sync stream position may race and be incorrectly calculated. 76func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error { 77 // Parse out the event JSON 78 var output eventutil.AccountData 79 if err := json.Unmarshal(msg.Value, &output); err != nil { 80 // If the message was invalid, log it and move on to the next message in the stream 81 log.WithError(err).Errorf("client API server output log: message parse failure") 82 sentry.CaptureException(err) 83 return nil 84 } 85 86 log.WithFields(log.Fields{ 87 "type": output.Type, 88 "room_id": output.RoomID, 89 }).Info("received data from client API server") 90 91 streamPos, err := s.db.UpsertAccountData( 92 context.TODO(), string(msg.Key), output.RoomID, output.Type, 93 ) 94 if err != nil { 95 sentry.CaptureException(err) 96 log.WithFields(log.Fields{ 97 "type": output.Type, 98 "room_id": output.RoomID, 99 log.ErrorKey: err, 100 }).Panicf("could not save account data") 101 } 102 103 s.stream.Advance(streamPos) 104 s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: streamPos}) 105 106 return nil 107} 108