1 /*
2  * SessionClientEventQueue.cpp
3  *
4  * Copyright (C) 2021 by RStudio, PBC
5  *
6  * Unless you have received this program directly from RStudio pursuant
7  * to the terms of a commercial license agreement with RStudio, then
8  * this program is licensed to you under the terms of version 3 of the
9  * GNU Affero General Public License. This program is distributed WITHOUT
10  * ANY EXPRESS OR IMPLIED WARRANTY, INCLUDING THOSE OF NON-INFRINGEMENT,
11  * MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. Please refer to the
12  * AGPL (http://www.gnu.org/licenses/agpl-3.0.txt) for more details.
13  *
14  */
15 
16 #include "SessionClientEventQueue.hpp"
17 
18 #include "modules/SessionConsole.hpp"
19 
20 #include <core/BoostThread.hpp>
21 #include <core/Thread.hpp>
22 #include <shared_core/json/Json.hpp>
23 #include <core/StringUtils.hpp>
24 
25 #include <r/session/RConsoleActions.hpp>
26 
27 #include "SessionHttpMethods.hpp"
28 
29 using namespace rstudio::core;
30 
31 namespace rstudio {
32 namespace session {
33 
34 namespace {
35 ClientEventQueue* s_pClientEventQueue = nullptr;
36 }
37 
initializeClientEventQueue()38 void initializeClientEventQueue()
39 {
40    BOOST_ASSERT(s_pClientEventQueue == nullptr);
41    s_pClientEventQueue = new ClientEventQueue();
42 }
43 
clientEventQueue()44 ClientEventQueue& clientEventQueue()
45 {
46    return *s_pClientEventQueue;
47 }
48 
ClientEventQueue()49 ClientEventQueue::ClientEventQueue()
50    :  pMutex_(new boost::mutex()),
51       pWaitForEventCondition_(new boost::condition()),
52       lastEventAddTime_(boost::posix_time::not_a_date_time)
53 {
54 }
55 
setActiveConsole(const std::string & console)56 bool ClientEventQueue::setActiveConsole(const std::string& console)
57 {
58    bool changed = false;
59    LOCK_MUTEX(*pMutex_)
60    {
61       if (activeConsole_ != console)
62       {
63          // flush events to the previous console
64          flushPendingConsoleOutput();
65 
66          // switch to the new one
67          activeConsole_ = console;
68          changed = true;
69       }
70    }
71    END_LOCK_MUTEX
72    return changed;
73 }
74 
add(const ClientEvent & event)75 void ClientEventQueue::add(const ClientEvent& event)
76 {
77    if (http_methods::protocolDebugEnabled() && event.type() != client_events::kConsoleWriteError)
78    {
79       if (event.data().getType() == json::Type::STRING)
80          LOG_DEBUG_MESSAGE("Queued event: " + event.typeName() + ": " + event.data().getString());
81       else
82          LOG_DEBUG_MESSAGE("Queued event: " + event.typeName());
83    }
84    LOCK_MUTEX(*pMutex_)
85    {
86       // console output is batched up for compactness/efficiency.
87       if (event.type() == client_events::kConsoleWriteOutput)
88       {
89          if (event.data().getType() == json::Type::STRING)
90             pendingConsoleOutput_ += event.data().getString();
91       }
92       else if (event.type() == client_events::kConsoleWriteError &&
93                event.data().getType() == json::Type::STRING)
94       {
95          flushPendingConsoleOutput();
96          enqueueClientOutputEvent(event.type(), event.data().getString());
97       }
98       else
99       {
100          // flush existing console output prior to adding an
101          // action of another type
102          flushPendingConsoleOutput();
103 
104          // add event to queue
105          pendingEvents_.push_back(event);
106       }
107 
108       lastEventAddTime_ = boost::posix_time::microsec_clock::universal_time();
109    }
110    END_LOCK_MUTEX
111 
112    // notify listeners that an event has been added
113    pWaitForEventCondition_->notify_all();
114 }
115 
hasEvents()116 bool ClientEventQueue::hasEvents()
117 {
118    LOCK_MUTEX(*pMutex_)
119    {
120       return pendingEvents_.size() > 0 || pendingConsoleOutput_.length() > 0;
121    }
122    END_LOCK_MUTEX
123 
124    // keep compiler happy
125    return false;
126 }
127 
remove(std::vector<ClientEvent> * pEvents)128 void ClientEventQueue::remove(std::vector<ClientEvent>* pEvents)
129 {
130    LOCK_MUTEX(*pMutex_)
131    {
132       // flush any pending output
133       flushPendingConsoleOutput();
134 
135       // copy the events to the caller
136       pEvents->insert(pEvents->begin(),
137                       pendingEvents_.begin(),
138                       pendingEvents_.end());
139 
140       // clear pending events
141       pendingEvents_.clear();
142    }
143    END_LOCK_MUTEX
144 }
145 
clear()146 void ClientEventQueue::clear()
147 {
148    LOCK_MUTEX(*pMutex_)
149    {
150       pendingConsoleOutput_.clear();
151       pendingEvents_.clear();
152    }
153    END_LOCK_MUTEX
154 }
155 
156 
waitForEvent(const boost::posix_time::time_duration & waitDuration)157 bool ClientEventQueue::waitForEvent(
158                         const boost::posix_time::time_duration& waitDuration)
159 {
160    using namespace boost;
161    try
162    {
163       unique_lock<mutex> lock(*pMutex_);
164       system_time timeoutTime = get_system_time() + waitDuration;
165       return pWaitForEventCondition_->timed_wait(lock, timeoutTime);
166    }
167    catch(const thread_resource_error& e)
168    {
169       Error waitError(boost::thread_error::ec_from_exception(e),
170                         ERROR_LOCATION);
171       LOG_ERROR(waitError);
172       return false;
173    }
174 }
175 
176 
eventAddedSince(const boost::posix_time::ptime & time)177 bool ClientEventQueue::eventAddedSince(const boost::posix_time::ptime& time)
178 {
179    LOCK_MUTEX(*pMutex_)
180    {
181       if (lastEventAddTime_.is_not_a_date_time())
182          return false;
183       else
184          return lastEventAddTime_ >= time;
185    }
186    END_LOCK_MUTEX
187 
188    // keep compiler happy
189    return false;
190 }
191 
192 
flushPendingConsoleOutput()193 void ClientEventQueue::flushPendingConsoleOutput()
194 {
195    // NOTE: private helper so no lock required (mutex is not recursive)
196 
197    if ( !pendingConsoleOutput_.empty() )
198    {
199       // If there's more console output than the client can even show, then
200       // truncate it to the amount that the client can show. Too much output
201       // can overwhelm the client, causing it to become unresponsive.
202       int limit = r::session::consoleActions().capacity() + 1;
203       string_utils::trimLeadingLines(limit, &pendingConsoleOutput_);
204 
205       enqueueClientOutputEvent(client_events::kConsoleWriteOutput,
206             pendingConsoleOutput_);
207       pendingConsoleOutput_.clear();
208    }
209 }
210 
enqueueClientOutputEvent(int event,const std::string & text)211 void ClientEventQueue::enqueueClientOutputEvent(
212       int event, const std::string& text)
213 {
214    json::Object output;
215    output[kConsoleText] = text;
216    output[kConsoleId]   = activeConsole_;
217    pendingEvents_.push_back(ClientEvent(event, output));
218 }
219 
220 } // namespace session
221 } // namespace rstudio
222