1// Copyright 2018 Vector Creations Ltd 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package appservice 16 17import ( 18 "context" 19 "crypto/tls" 20 "net/http" 21 "sync" 22 "time" 23 24 "github.com/gorilla/mux" 25 appserviceAPI "github.com/matrix-org/dendrite/appservice/api" 26 "github.com/matrix-org/dendrite/appservice/consumers" 27 "github.com/matrix-org/dendrite/appservice/inthttp" 28 "github.com/matrix-org/dendrite/appservice/query" 29 "github.com/matrix-org/dendrite/appservice/storage" 30 "github.com/matrix-org/dendrite/appservice/types" 31 "github.com/matrix-org/dendrite/appservice/workers" 32 roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" 33 "github.com/matrix-org/dendrite/setup" 34 "github.com/matrix-org/dendrite/setup/config" 35 "github.com/matrix-org/dendrite/setup/kafka" 36 userapi "github.com/matrix-org/dendrite/userapi/api" 37 "github.com/sirupsen/logrus" 38) 39 40// AddInternalRoutes registers HTTP handlers for internal API calls 41func AddInternalRoutes(router *mux.Router, queryAPI appserviceAPI.AppServiceQueryAPI) { 42 inthttp.AddRoutes(queryAPI, router) 43} 44 45// NewInternalAPI returns a concerete implementation of the internal API. Callers 46// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. 47func NewInternalAPI( 48 base *setup.BaseDendrite, 49 userAPI userapi.UserInternalAPI, 50 rsAPI roomserverAPI.RoomserverInternalAPI, 51) appserviceAPI.AppServiceQueryAPI { 52 client := &http.Client{ 53 Timeout: time.Second * 30, 54 Transport: &http.Transport{ 55 DisableKeepAlives: true, 56 TLSClientConfig: &tls.Config{ 57 InsecureSkipVerify: base.Cfg.AppServiceAPI.DisableTLSValidation, 58 }, 59 }, 60 } 61 consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka) 62 63 // Create a connection to the appservice postgres DB 64 appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database) 65 if err != nil { 66 logrus.WithError(err).Panicf("failed to connect to appservice db") 67 } 68 69 // Wrap application services in a type that relates the application service and 70 // a sync.Cond object that can be used to notify workers when there are new 71 // events to be sent out. 72 workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices)) 73 for i, appservice := range base.Cfg.Derived.ApplicationServices { 74 m := sync.Mutex{} 75 ws := types.ApplicationServiceWorkerState{ 76 AppService: appservice, 77 Cond: sync.NewCond(&m), 78 } 79 workerStates[i] = ws 80 81 // Create bot account for this AS if it doesn't already exist 82 if err = generateAppServiceAccount(userAPI, appservice); err != nil { 83 logrus.WithFields(logrus.Fields{ 84 "appservice": appservice.ID, 85 }).WithError(err).Panicf("failed to generate bot account for appservice") 86 } 87 } 88 89 // Create appserivce query API with an HTTP client that will be used for all 90 // outbound and inbound requests (inbound only for the internal API) 91 appserviceQueryAPI := &query.AppServiceQueryAPI{ 92 HTTPClient: client, 93 Cfg: base.Cfg, 94 } 95 96 // Only consume if we actually have ASes to track, else we'll just chew cycles needlessly. 97 // We can't add ASes at runtime so this is safe to do. 98 if len(workerStates) > 0 { 99 consumer := consumers.NewOutputRoomEventConsumer( 100 base.ProcessContext, base.Cfg, consumer, appserviceDB, 101 rsAPI, workerStates, 102 ) 103 if err := consumer.Start(); err != nil { 104 logrus.WithError(err).Panicf("failed to start appservice roomserver consumer") 105 } 106 } 107 108 // Create application service transaction workers 109 if err := workers.SetupTransactionWorkers(client, appserviceDB, workerStates); err != nil { 110 logrus.WithError(err).Panicf("failed to start app service transaction workers") 111 } 112 return appserviceQueryAPI 113} 114 115// generateAppServiceAccounts creates a dummy account based off the 116// `sender_localpart` field of each application service if it doesn't 117// exist already 118func generateAppServiceAccount( 119 userAPI userapi.UserInternalAPI, 120 as config.ApplicationService, 121) error { 122 var accRes userapi.PerformAccountCreationResponse 123 err := userAPI.PerformAccountCreation(context.Background(), &userapi.PerformAccountCreationRequest{ 124 AccountType: userapi.AccountTypeUser, 125 Localpart: as.SenderLocalpart, 126 AppServiceID: as.ID, 127 OnConflict: userapi.ConflictUpdate, 128 }, &accRes) 129 if err != nil { 130 return err 131 } 132 var devRes userapi.PerformDeviceCreationResponse 133 err = userAPI.PerformDeviceCreation(context.Background(), &userapi.PerformDeviceCreationRequest{ 134 Localpart: as.SenderLocalpart, 135 AccessToken: as.ASToken, 136 DeviceID: &as.SenderLocalpart, 137 DeviceDisplayName: &as.SenderLocalpart, 138 }, &devRes) 139 return err 140} 141