1 /*
2 * SessionClientEventQueue.cpp
3 *
4 * Copyright (C) 2021 by RStudio, PBC
5 *
6 * Unless you have received this program directly from RStudio pursuant
7 * to the terms of a commercial license agreement with RStudio, then
8 * this program is licensed to you under the terms of version 3 of the
9 * GNU Affero General Public License. This program is distributed WITHOUT
10 * ANY EXPRESS OR IMPLIED WARRANTY, INCLUDING THOSE OF NON-INFRINGEMENT,
11 * MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. Please refer to the
12 * AGPL (http://www.gnu.org/licenses/agpl-3.0.txt) for more details.
13 *
14 */
15
16 #include "SessionClientEventQueue.hpp"
17
18 #include "modules/SessionConsole.hpp"
19
20 #include <core/BoostThread.hpp>
21 #include <core/Thread.hpp>
22 #include <shared_core/json/Json.hpp>
23 #include <core/StringUtils.hpp>
24
25 #include <r/session/RConsoleActions.hpp>
26
27 #include "SessionHttpMethods.hpp"
28
29 using namespace rstudio::core;
30
31 namespace rstudio {
32 namespace session {
33
34 namespace {
35 ClientEventQueue* s_pClientEventQueue = nullptr;
36 }
37
initializeClientEventQueue()38 void initializeClientEventQueue()
39 {
40 BOOST_ASSERT(s_pClientEventQueue == nullptr);
41 s_pClientEventQueue = new ClientEventQueue();
42 }
43
clientEventQueue()44 ClientEventQueue& clientEventQueue()
45 {
46 return *s_pClientEventQueue;
47 }
48
ClientEventQueue()49 ClientEventQueue::ClientEventQueue()
50 : pMutex_(new boost::mutex()),
51 pWaitForEventCondition_(new boost::condition()),
52 lastEventAddTime_(boost::posix_time::not_a_date_time)
53 {
54 }
55
setActiveConsole(const std::string & console)56 bool ClientEventQueue::setActiveConsole(const std::string& console)
57 {
58 bool changed = false;
59 LOCK_MUTEX(*pMutex_)
60 {
61 if (activeConsole_ != console)
62 {
63 // flush events to the previous console
64 flushPendingConsoleOutput();
65
66 // switch to the new one
67 activeConsole_ = console;
68 changed = true;
69 }
70 }
71 END_LOCK_MUTEX
72 return changed;
73 }
74
add(const ClientEvent & event)75 void ClientEventQueue::add(const ClientEvent& event)
76 {
77 if (http_methods::protocolDebugEnabled() && event.type() != client_events::kConsoleWriteError)
78 {
79 if (event.data().getType() == json::Type::STRING)
80 LOG_DEBUG_MESSAGE("Queued event: " + event.typeName() + ": " + event.data().getString());
81 else
82 LOG_DEBUG_MESSAGE("Queued event: " + event.typeName());
83 }
84 LOCK_MUTEX(*pMutex_)
85 {
86 // console output is batched up for compactness/efficiency.
87 if (event.type() == client_events::kConsoleWriteOutput)
88 {
89 if (event.data().getType() == json::Type::STRING)
90 pendingConsoleOutput_ += event.data().getString();
91 }
92 else if (event.type() == client_events::kConsoleWriteError &&
93 event.data().getType() == json::Type::STRING)
94 {
95 flushPendingConsoleOutput();
96 enqueueClientOutputEvent(event.type(), event.data().getString());
97 }
98 else
99 {
100 // flush existing console output prior to adding an
101 // action of another type
102 flushPendingConsoleOutput();
103
104 // add event to queue
105 pendingEvents_.push_back(event);
106 }
107
108 lastEventAddTime_ = boost::posix_time::microsec_clock::universal_time();
109 }
110 END_LOCK_MUTEX
111
112 // notify listeners that an event has been added
113 pWaitForEventCondition_->notify_all();
114 }
115
hasEvents()116 bool ClientEventQueue::hasEvents()
117 {
118 LOCK_MUTEX(*pMutex_)
119 {
120 return pendingEvents_.size() > 0 || pendingConsoleOutput_.length() > 0;
121 }
122 END_LOCK_MUTEX
123
124 // keep compiler happy
125 return false;
126 }
127
remove(std::vector<ClientEvent> * pEvents)128 void ClientEventQueue::remove(std::vector<ClientEvent>* pEvents)
129 {
130 LOCK_MUTEX(*pMutex_)
131 {
132 // flush any pending output
133 flushPendingConsoleOutput();
134
135 // copy the events to the caller
136 pEvents->insert(pEvents->begin(),
137 pendingEvents_.begin(),
138 pendingEvents_.end());
139
140 // clear pending events
141 pendingEvents_.clear();
142 }
143 END_LOCK_MUTEX
144 }
145
clear()146 void ClientEventQueue::clear()
147 {
148 LOCK_MUTEX(*pMutex_)
149 {
150 pendingConsoleOutput_.clear();
151 pendingEvents_.clear();
152 }
153 END_LOCK_MUTEX
154 }
155
156
waitForEvent(const boost::posix_time::time_duration & waitDuration)157 bool ClientEventQueue::waitForEvent(
158 const boost::posix_time::time_duration& waitDuration)
159 {
160 using namespace boost;
161 try
162 {
163 unique_lock<mutex> lock(*pMutex_);
164 system_time timeoutTime = get_system_time() + waitDuration;
165 return pWaitForEventCondition_->timed_wait(lock, timeoutTime);
166 }
167 catch(const thread_resource_error& e)
168 {
169 Error waitError(boost::thread_error::ec_from_exception(e),
170 ERROR_LOCATION);
171 LOG_ERROR(waitError);
172 return false;
173 }
174 }
175
176
eventAddedSince(const boost::posix_time::ptime & time)177 bool ClientEventQueue::eventAddedSince(const boost::posix_time::ptime& time)
178 {
179 LOCK_MUTEX(*pMutex_)
180 {
181 if (lastEventAddTime_.is_not_a_date_time())
182 return false;
183 else
184 return lastEventAddTime_ >= time;
185 }
186 END_LOCK_MUTEX
187
188 // keep compiler happy
189 return false;
190 }
191
192
flushPendingConsoleOutput()193 void ClientEventQueue::flushPendingConsoleOutput()
194 {
195 // NOTE: private helper so no lock required (mutex is not recursive)
196
197 if ( !pendingConsoleOutput_.empty() )
198 {
199 // If there's more console output than the client can even show, then
200 // truncate it to the amount that the client can show. Too much output
201 // can overwhelm the client, causing it to become unresponsive.
202 int limit = r::session::consoleActions().capacity() + 1;
203 string_utils::trimLeadingLines(limit, &pendingConsoleOutput_);
204
205 enqueueClientOutputEvent(client_events::kConsoleWriteOutput,
206 pendingConsoleOutput_);
207 pendingConsoleOutput_.clear();
208 }
209 }
210
enqueueClientOutputEvent(int event,const std::string & text)211 void ClientEventQueue::enqueueClientOutputEvent(
212 int event, const std::string& text)
213 {
214 json::Object output;
215 output[kConsoleText] = text;
216 output[kConsoleId] = activeConsole_;
217 pendingEvents_.push_back(ClientEvent(event, output));
218 }
219
220 } // namespace session
221 } // namespace rstudio
222