1// Copyright 2020 The Matrix.org Foundation C.I.C. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package keyserver 16 17import ( 18 "github.com/gorilla/mux" 19 fedsenderapi "github.com/matrix-org/dendrite/federationsender/api" 20 "github.com/matrix-org/dendrite/keyserver/api" 21 "github.com/matrix-org/dendrite/keyserver/consumers" 22 "github.com/matrix-org/dendrite/keyserver/internal" 23 "github.com/matrix-org/dendrite/keyserver/inthttp" 24 "github.com/matrix-org/dendrite/keyserver/producers" 25 "github.com/matrix-org/dendrite/keyserver/storage" 26 "github.com/matrix-org/dendrite/setup" 27 "github.com/matrix-org/dendrite/setup/config" 28 "github.com/matrix-org/dendrite/setup/kafka" 29 "github.com/sirupsen/logrus" 30) 31 32// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions 33// on the given input API. 34func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { 35 inthttp.AddRoutes(router, intAPI) 36} 37 38// NewInternalAPI returns a concerete implementation of the internal API. Callers 39// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. 40func NewInternalAPI( 41 base *setup.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, 42) api.KeyInternalAPI { 43 consumer, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) 44 45 db, err := storage.NewDatabase(&cfg.Database) 46 if err != nil { 47 logrus.WithError(err).Panicf("failed to connect to key server database") 48 } 49 keyChangeProducer := &producers.KeyChange{ 50 Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), 51 Producer: producer, 52 DB: db, 53 } 54 ap := &internal.KeyInternalAPI{ 55 DB: db, 56 ThisServer: cfg.Matrix.ServerName, 57 FedClient: fedClient, 58 Producer: keyChangeProducer, 59 } 60 updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable 61 ap.Updater = updater 62 go func() { 63 if err := updater.Start(); err != nil { 64 logrus.WithError(err).Panicf("failed to start device list updater") 65 } 66 }() 67 68 keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer( 69 base.ProcessContext, base.Cfg, consumer, db, ap, 70 ) 71 if err := keyconsumer.Start(); err != nil { 72 logrus.WithError(err).Panicf("failed to start keyserver EDU server consumer") 73 } 74 75 return ap 76} 77