1// Copyright 2020 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package keyserver
16
17import (
18	"github.com/gorilla/mux"
19	fedsenderapi "github.com/matrix-org/dendrite/federationsender/api"
20	"github.com/matrix-org/dendrite/keyserver/api"
21	"github.com/matrix-org/dendrite/keyserver/consumers"
22	"github.com/matrix-org/dendrite/keyserver/internal"
23	"github.com/matrix-org/dendrite/keyserver/inthttp"
24	"github.com/matrix-org/dendrite/keyserver/producers"
25	"github.com/matrix-org/dendrite/keyserver/storage"
26	"github.com/matrix-org/dendrite/setup"
27	"github.com/matrix-org/dendrite/setup/config"
28	"github.com/matrix-org/dendrite/setup/kafka"
29	"github.com/sirupsen/logrus"
30)
31
32// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
33// on the given input API.
34func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
35	inthttp.AddRoutes(router, intAPI)
36}
37
38// NewInternalAPI returns a concerete implementation of the internal API. Callers
39// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
40func NewInternalAPI(
41	base *setup.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient,
42) api.KeyInternalAPI {
43	consumer, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
44
45	db, err := storage.NewDatabase(&cfg.Database)
46	if err != nil {
47		logrus.WithError(err).Panicf("failed to connect to key server database")
48	}
49	keyChangeProducer := &producers.KeyChange{
50		Topic:    string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
51		Producer: producer,
52		DB:       db,
53	}
54	ap := &internal.KeyInternalAPI{
55		DB:         db,
56		ThisServer: cfg.Matrix.ServerName,
57		FedClient:  fedClient,
58		Producer:   keyChangeProducer,
59	}
60	updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
61	ap.Updater = updater
62	go func() {
63		if err := updater.Start(); err != nil {
64			logrus.WithError(err).Panicf("failed to start device list updater")
65		}
66	}()
67
68	keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer(
69		base.ProcessContext, base.Cfg, consumer, db, ap,
70	)
71	if err := keyconsumer.Start(); err != nil {
72		logrus.WithError(err).Panicf("failed to start keyserver EDU server consumer")
73	}
74
75	return ap
76}
77