1// Copyright 2017 Vector Creations Ltd
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package routing
16
17import (
18	"net/http"
19	"sync"
20	"time"
21
22	"github.com/matrix-org/dendrite/clientapi/httputil"
23	"github.com/matrix-org/dendrite/clientapi/jsonerror"
24	"github.com/matrix-org/dendrite/internal/eventutil"
25	"github.com/matrix-org/dendrite/internal/transactions"
26	"github.com/matrix-org/dendrite/roomserver/api"
27	"github.com/matrix-org/dendrite/setup/config"
28	userapi "github.com/matrix-org/dendrite/userapi/api"
29	"github.com/matrix-org/gomatrixserverlib"
30	"github.com/matrix-org/util"
31	"github.com/prometheus/client_golang/prometheus"
32	"github.com/sirupsen/logrus"
33)
34
35// http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-rooms-roomid-send-eventtype-txnid
36// http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-rooms-roomid-state-eventtype-statekey
37type sendEventResponse struct {
38	EventID string `json:"event_id"`
39}
40
41var (
42	userRoomSendMutexes sync.Map // (roomID+userID) -> mutex. mutexes to ensure correct ordering of sendEvents
43)
44
45func init() {
46	prometheus.MustRegister(sendEventDuration)
47}
48
49var sendEventDuration = prometheus.NewHistogramVec(
50	prometheus.HistogramOpts{
51		Namespace: "dendrite",
52		Subsystem: "clientapi",
53		Name:      "sendevent_duration_millis",
54		Help:      "How long it takes to build and submit a new event from the client API to the roomserver",
55		Buckets: []float64{ // milliseconds
56			5, 10, 25, 50, 75, 100, 250, 500,
57			1000, 2000, 3000, 4000, 5000, 6000,
58			7000, 8000, 9000, 10000, 15000, 20000,
59		},
60	},
61	[]string{"action"},
62)
63
64// SendEvent implements:
65//   /rooms/{roomID}/send/{eventType}
66//   /rooms/{roomID}/send/{eventType}/{txnID}
67//   /rooms/{roomID}/state/{eventType}/{stateKey}
68func SendEvent(
69	req *http.Request,
70	device *userapi.Device,
71	roomID, eventType string, txnID, stateKey *string,
72	cfg *config.ClientAPI,
73	rsAPI api.RoomserverInternalAPI,
74	txnCache *transactions.Cache,
75) util.JSONResponse {
76	verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
77	verRes := api.QueryRoomVersionForRoomResponse{}
78	if err := rsAPI.QueryRoomVersionForRoom(req.Context(), &verReq, &verRes); err != nil {
79		return util.JSONResponse{
80			Code: http.StatusBadRequest,
81			JSON: jsonerror.UnsupportedRoomVersion(err.Error()),
82		}
83	}
84
85	if txnID != nil {
86		// Try to fetch response from transactionsCache
87		if res, ok := txnCache.FetchTransaction(device.AccessToken, *txnID); ok {
88			return *res
89		}
90	}
91
92	// create a mutex for the specific user in the specific room
93	// this avoids a situation where events that are received in quick succession are sent to the roomserver in a jumbled order
94	userID := device.UserID
95	mutex, _ := userRoomSendMutexes.LoadOrStore(roomID+userID, &sync.Mutex{})
96	mutex.(*sync.Mutex).Lock()
97	defer mutex.(*sync.Mutex).Unlock()
98
99	startedGeneratingEvent := time.Now()
100	e, resErr := generateSendEvent(req, device, roomID, eventType, stateKey, cfg, rsAPI)
101	if resErr != nil {
102		return *resErr
103	}
104	timeToGenerateEvent := time.Since(startedGeneratingEvent)
105
106	var txnAndSessionID *api.TransactionID
107	if txnID != nil {
108		txnAndSessionID = &api.TransactionID{
109			TransactionID: *txnID,
110			SessionID:     device.SessionID,
111		}
112	}
113
114	// pass the new event to the roomserver and receive the correct event ID
115	// event ID in case of duplicate transaction is discarded
116	startedSubmittingEvent := time.Now()
117	if err := api.SendEvents(
118		req.Context(), rsAPI,
119		api.KindNew,
120		[]*gomatrixserverlib.HeaderedEvent{
121			e.Headered(verRes.RoomVersion),
122		},
123		cfg.Matrix.ServerName,
124		txnAndSessionID,
125	); err != nil {
126		util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
127		return jsonerror.InternalServerError()
128	}
129	timeToSubmitEvent := time.Since(startedSubmittingEvent)
130	util.GetLogger(req.Context()).WithFields(logrus.Fields{
131		"event_id":     e.EventID(),
132		"room_id":      roomID,
133		"room_version": verRes.RoomVersion,
134	}).Info("Sent event to roomserver")
135
136	res := util.JSONResponse{
137		Code: http.StatusOK,
138		JSON: sendEventResponse{e.EventID()},
139	}
140	// Add response to transactionsCache
141	if txnID != nil {
142		txnCache.AddTransaction(device.AccessToken, *txnID, &res)
143	}
144
145	// Take a note of how long it took to generate the event vs submit
146	// it to the roomserver.
147	sendEventDuration.With(prometheus.Labels{"action": "build"}).Observe(float64(timeToGenerateEvent.Milliseconds()))
148	sendEventDuration.With(prometheus.Labels{"action": "submit"}).Observe(float64(timeToSubmitEvent.Milliseconds()))
149
150	return res
151}
152
153func generateSendEvent(
154	req *http.Request,
155	device *userapi.Device,
156	roomID, eventType string, stateKey *string,
157	cfg *config.ClientAPI,
158	rsAPI api.RoomserverInternalAPI,
159) (*gomatrixserverlib.Event, *util.JSONResponse) {
160	// parse the incoming http request
161	userID := device.UserID
162	var r map[string]interface{} // must be a JSON object
163	resErr := httputil.UnmarshalJSONRequest(req, &r)
164	if resErr != nil {
165		return nil, resErr
166	}
167
168	evTime, err := httputil.ParseTSParam(req)
169	if err != nil {
170		return nil, &util.JSONResponse{
171			Code: http.StatusBadRequest,
172			JSON: jsonerror.InvalidArgumentValue(err.Error()),
173		}
174	}
175
176	// create the new event and set all the fields we can
177	builder := gomatrixserverlib.EventBuilder{
178		Sender:   userID,
179		RoomID:   roomID,
180		Type:     eventType,
181		StateKey: stateKey,
182	}
183	err = builder.SetContent(r)
184	if err != nil {
185		util.GetLogger(req.Context()).WithError(err).Error("builder.SetContent failed")
186		resErr := jsonerror.InternalServerError()
187		return nil, &resErr
188	}
189
190	var queryRes api.QueryLatestEventsAndStateResponse
191	e, err := eventutil.QueryAndBuildEvent(req.Context(), &builder, cfg.Matrix, evTime, rsAPI, &queryRes)
192	if err == eventutil.ErrRoomNoExists {
193		return nil, &util.JSONResponse{
194			Code: http.StatusNotFound,
195			JSON: jsonerror.NotFound("Room does not exist"),
196		}
197	} else if e, ok := err.(gomatrixserverlib.BadJSONError); ok {
198		return nil, &util.JSONResponse{
199			Code: http.StatusBadRequest,
200			JSON: jsonerror.BadJSON(e.Error()),
201		}
202	} else if e, ok := err.(gomatrixserverlib.EventValidationError); ok {
203		if e.Code == gomatrixserverlib.EventValidationTooLarge {
204			return nil, &util.JSONResponse{
205				Code: http.StatusRequestEntityTooLarge,
206				JSON: jsonerror.BadJSON(e.Error()),
207			}
208		}
209		return nil, &util.JSONResponse{
210			Code: http.StatusBadRequest,
211			JSON: jsonerror.BadJSON(e.Error()),
212		}
213	} else if err != nil {
214		util.GetLogger(req.Context()).WithError(err).Error("eventutil.BuildEvent failed")
215		resErr := jsonerror.InternalServerError()
216		return nil, &resErr
217	}
218
219	// check to see if this user can perform this operation
220	stateEvents := make([]*gomatrixserverlib.Event, len(queryRes.StateEvents))
221	for i := range queryRes.StateEvents {
222		stateEvents[i] = queryRes.StateEvents[i].Event
223	}
224	provider := gomatrixserverlib.NewAuthEvents(stateEvents)
225	if err = gomatrixserverlib.Allowed(e.Event, &provider); err != nil {
226		return nil, &util.JSONResponse{
227			Code: http.StatusForbidden,
228			JSON: jsonerror.Forbidden(err.Error()), // TODO: Is this error string comprehensible to the client?
229		}
230	}
231	return e.Event, nil
232}
233