1// Copyright 2017 Vector Creations Ltd 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package routing 16 17import ( 18 "net/http" 19 "sync" 20 "time" 21 22 "github.com/matrix-org/dendrite/clientapi/httputil" 23 "github.com/matrix-org/dendrite/clientapi/jsonerror" 24 "github.com/matrix-org/dendrite/internal/eventutil" 25 "github.com/matrix-org/dendrite/internal/transactions" 26 "github.com/matrix-org/dendrite/roomserver/api" 27 "github.com/matrix-org/dendrite/setup/config" 28 userapi "github.com/matrix-org/dendrite/userapi/api" 29 "github.com/matrix-org/gomatrixserverlib" 30 "github.com/matrix-org/util" 31 "github.com/prometheus/client_golang/prometheus" 32 "github.com/sirupsen/logrus" 33) 34 35// http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-rooms-roomid-send-eventtype-txnid 36// http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-rooms-roomid-state-eventtype-statekey 37type sendEventResponse struct { 38 EventID string `json:"event_id"` 39} 40 41var ( 42 userRoomSendMutexes sync.Map // (roomID+userID) -> mutex. mutexes to ensure correct ordering of sendEvents 43) 44 45func init() { 46 prometheus.MustRegister(sendEventDuration) 47} 48 49var sendEventDuration = prometheus.NewHistogramVec( 50 prometheus.HistogramOpts{ 51 Namespace: "dendrite", 52 Subsystem: "clientapi", 53 Name: "sendevent_duration_millis", 54 Help: "How long it takes to build and submit a new event from the client API to the roomserver", 55 Buckets: []float64{ // milliseconds 56 5, 10, 25, 50, 75, 100, 250, 500, 57 1000, 2000, 3000, 4000, 5000, 6000, 58 7000, 8000, 9000, 10000, 15000, 20000, 59 }, 60 }, 61 []string{"action"}, 62) 63 64// SendEvent implements: 65// /rooms/{roomID}/send/{eventType} 66// /rooms/{roomID}/send/{eventType}/{txnID} 67// /rooms/{roomID}/state/{eventType}/{stateKey} 68func SendEvent( 69 req *http.Request, 70 device *userapi.Device, 71 roomID, eventType string, txnID, stateKey *string, 72 cfg *config.ClientAPI, 73 rsAPI api.RoomserverInternalAPI, 74 txnCache *transactions.Cache, 75) util.JSONResponse { 76 verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} 77 verRes := api.QueryRoomVersionForRoomResponse{} 78 if err := rsAPI.QueryRoomVersionForRoom(req.Context(), &verReq, &verRes); err != nil { 79 return util.JSONResponse{ 80 Code: http.StatusBadRequest, 81 JSON: jsonerror.UnsupportedRoomVersion(err.Error()), 82 } 83 } 84 85 if txnID != nil { 86 // Try to fetch response from transactionsCache 87 if res, ok := txnCache.FetchTransaction(device.AccessToken, *txnID); ok { 88 return *res 89 } 90 } 91 92 // create a mutex for the specific user in the specific room 93 // this avoids a situation where events that are received in quick succession are sent to the roomserver in a jumbled order 94 userID := device.UserID 95 mutex, _ := userRoomSendMutexes.LoadOrStore(roomID+userID, &sync.Mutex{}) 96 mutex.(*sync.Mutex).Lock() 97 defer mutex.(*sync.Mutex).Unlock() 98 99 startedGeneratingEvent := time.Now() 100 e, resErr := generateSendEvent(req, device, roomID, eventType, stateKey, cfg, rsAPI) 101 if resErr != nil { 102 return *resErr 103 } 104 timeToGenerateEvent := time.Since(startedGeneratingEvent) 105 106 var txnAndSessionID *api.TransactionID 107 if txnID != nil { 108 txnAndSessionID = &api.TransactionID{ 109 TransactionID: *txnID, 110 SessionID: device.SessionID, 111 } 112 } 113 114 // pass the new event to the roomserver and receive the correct event ID 115 // event ID in case of duplicate transaction is discarded 116 startedSubmittingEvent := time.Now() 117 if err := api.SendEvents( 118 req.Context(), rsAPI, 119 api.KindNew, 120 []*gomatrixserverlib.HeaderedEvent{ 121 e.Headered(verRes.RoomVersion), 122 }, 123 cfg.Matrix.ServerName, 124 txnAndSessionID, 125 ); err != nil { 126 util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") 127 return jsonerror.InternalServerError() 128 } 129 timeToSubmitEvent := time.Since(startedSubmittingEvent) 130 util.GetLogger(req.Context()).WithFields(logrus.Fields{ 131 "event_id": e.EventID(), 132 "room_id": roomID, 133 "room_version": verRes.RoomVersion, 134 }).Info("Sent event to roomserver") 135 136 res := util.JSONResponse{ 137 Code: http.StatusOK, 138 JSON: sendEventResponse{e.EventID()}, 139 } 140 // Add response to transactionsCache 141 if txnID != nil { 142 txnCache.AddTransaction(device.AccessToken, *txnID, &res) 143 } 144 145 // Take a note of how long it took to generate the event vs submit 146 // it to the roomserver. 147 sendEventDuration.With(prometheus.Labels{"action": "build"}).Observe(float64(timeToGenerateEvent.Milliseconds())) 148 sendEventDuration.With(prometheus.Labels{"action": "submit"}).Observe(float64(timeToSubmitEvent.Milliseconds())) 149 150 return res 151} 152 153func generateSendEvent( 154 req *http.Request, 155 device *userapi.Device, 156 roomID, eventType string, stateKey *string, 157 cfg *config.ClientAPI, 158 rsAPI api.RoomserverInternalAPI, 159) (*gomatrixserverlib.Event, *util.JSONResponse) { 160 // parse the incoming http request 161 userID := device.UserID 162 var r map[string]interface{} // must be a JSON object 163 resErr := httputil.UnmarshalJSONRequest(req, &r) 164 if resErr != nil { 165 return nil, resErr 166 } 167 168 evTime, err := httputil.ParseTSParam(req) 169 if err != nil { 170 return nil, &util.JSONResponse{ 171 Code: http.StatusBadRequest, 172 JSON: jsonerror.InvalidArgumentValue(err.Error()), 173 } 174 } 175 176 // create the new event and set all the fields we can 177 builder := gomatrixserverlib.EventBuilder{ 178 Sender: userID, 179 RoomID: roomID, 180 Type: eventType, 181 StateKey: stateKey, 182 } 183 err = builder.SetContent(r) 184 if err != nil { 185 util.GetLogger(req.Context()).WithError(err).Error("builder.SetContent failed") 186 resErr := jsonerror.InternalServerError() 187 return nil, &resErr 188 } 189 190 var queryRes api.QueryLatestEventsAndStateResponse 191 e, err := eventutil.QueryAndBuildEvent(req.Context(), &builder, cfg.Matrix, evTime, rsAPI, &queryRes) 192 if err == eventutil.ErrRoomNoExists { 193 return nil, &util.JSONResponse{ 194 Code: http.StatusNotFound, 195 JSON: jsonerror.NotFound("Room does not exist"), 196 } 197 } else if e, ok := err.(gomatrixserverlib.BadJSONError); ok { 198 return nil, &util.JSONResponse{ 199 Code: http.StatusBadRequest, 200 JSON: jsonerror.BadJSON(e.Error()), 201 } 202 } else if e, ok := err.(gomatrixserverlib.EventValidationError); ok { 203 if e.Code == gomatrixserverlib.EventValidationTooLarge { 204 return nil, &util.JSONResponse{ 205 Code: http.StatusRequestEntityTooLarge, 206 JSON: jsonerror.BadJSON(e.Error()), 207 } 208 } 209 return nil, &util.JSONResponse{ 210 Code: http.StatusBadRequest, 211 JSON: jsonerror.BadJSON(e.Error()), 212 } 213 } else if err != nil { 214 util.GetLogger(req.Context()).WithError(err).Error("eventutil.BuildEvent failed") 215 resErr := jsonerror.InternalServerError() 216 return nil, &resErr 217 } 218 219 // check to see if this user can perform this operation 220 stateEvents := make([]*gomatrixserverlib.Event, len(queryRes.StateEvents)) 221 for i := range queryRes.StateEvents { 222 stateEvents[i] = queryRes.StateEvents[i].Event 223 } 224 provider := gomatrixserverlib.NewAuthEvents(stateEvents) 225 if err = gomatrixserverlib.Allowed(e.Event, &provider); err != nil { 226 return nil, &util.JSONResponse{ 227 Code: http.StatusForbidden, 228 JSON: jsonerror.Forbidden(err.Error()), // TODO: Is this error string comprehensible to the client? 229 } 230 } 231 return e.Event, nil 232} 233