1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_FORWARDER_BASE_H_
6 #define MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_FORWARDER_BASE_H_
7 
8 #include <memory>
9 #include <vector>
10 
11 #include "base/callback.h"
12 #include "base/component_export.h"
13 #include "base/macros.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/memory/scoped_refptr.h"
16 #include "base/memory/weak_ptr.h"
17 #include "base/sequenced_task_runner.h"
18 #include "base/synchronization/lock.h"
19 #include "base/synchronization/waitable_event.h"
20 #include "mojo/public/cpp/bindings/associated_group.h"
21 #include "mojo/public/cpp/bindings/message.h"
22 
23 namespace mojo {
24 
25 // This class defines out-of-line logic common to the behavior of
26 // ThreadSafeForwarder<Interface>, which is in turn used to support the
27 // implementation of SharedRemote<Interface> and
28 // (deprecated) ThreadSafeInterfacePtr<Interface>.
29 //
30 // This object is sequence-affine and it provides an opaque interface to an
31 // underlying weakly-referenced interface proxy (e.g. a Remote) which may be
32 // bound on a different sequence and referenced weakly by any number of other
33 // ThreadSafeForwarders. The opaque interface is provide via a set of callbacks
34 // bound internally by e.g. SharedRemote or ThreadSafeInterfacePtr.
COMPONENT_EXPORT(MOJO_CPP_BINDINGS)35 class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) ThreadSafeForwarderBase
36     : public MessageReceiverWithResponder {
37  public:
38   // A callback used to send a message on the underlying interface proxy. Used
39   // only for messages with no reply.
40   using ForwardMessageCallback = base::RepeatingCallback<void(Message)>;
41 
42   // A callback used to send a message on the underlying interface proxy. Used
43   // only for messages with no reply.
44   using ForwardMessageWithResponderCallback =
45       base::RepeatingCallback<void(Message, std::unique_ptr<MessageReceiver>)>;
46 
47   // A callback used to reconfigure the underlying proxy by changing whether or
48   // not it can perform its own blocking waits for [Sync] message replies. When
49   // `force` is false, the proxy behaves normally and will block the calling
50   // thread when used to issue a sync message; when `force` is true however,
51   // [Sync] is effectively ignored when sending messages and the reply is
52   // received asynchronously. ThreadSafeForwarderBase uses this to disable
53   // normal synchronous behavior and implement its own sync waiting from the
54   // caller's thread rather than the proxy's bound thread.
55   using ForceAsyncSendCallback = base::RepeatingCallback<void(bool force)>;
56 
57   // Constructs a new ThreadSafeForwarderBase which forwards requests to a proxy
58   // bound on `task_runner`. Forwarding is done opaquely via the callbacks given
59   // in `forward` (to send one-off messages), `forward_with_responder` (to send
60   // messages expecting replies), and `force_async_send` to control sync IPC
61   // behavior within the underlying proxy.
62   ThreadSafeForwarderBase(
63       scoped_refptr<base::SequencedTaskRunner> task_runner,
64       ForwardMessageCallback forward,
65       ForwardMessageWithResponderCallback forward_with_responder,
66       ForceAsyncSendCallback force_async_send,
67       const AssociatedGroup& associated_group);
68 
69   ~ThreadSafeForwarderBase() override;
70 
71   // MessageReceiverWithResponder implementation:
72   bool PrefersSerializedMessages() override;
73   bool Accept(Message* message) override;
74   bool AcceptWithResponder(Message* message,
75                            std::unique_ptr<MessageReceiver> responder) override;
76 
77  private:
78   // Data that we need to share between the sequences involved in a sync call.
79   struct SyncResponseInfo
80       : public base::RefCountedThreadSafe<SyncResponseInfo> {
81     SyncResponseInfo();
82 
83     Message message;
84     bool received = false;
85     base::WaitableEvent event{base::WaitableEvent::ResetPolicy::MANUAL,
86                               base::WaitableEvent::InitialState::NOT_SIGNALED};
87 
88    private:
89     friend class base::RefCountedThreadSafe<SyncResponseInfo>;
90 
91     ~SyncResponseInfo();
92   };
93 
94   // A MessageReceiver that signals |response| when it either accepts the
95   // response message, or is destructed.
96   class SyncResponseSignaler : public MessageReceiver {
97    public:
98     explicit SyncResponseSignaler(scoped_refptr<SyncResponseInfo> response);
99     ~SyncResponseSignaler() override;
100 
101     bool Accept(Message* message) override;
102 
103    private:
104     scoped_refptr<SyncResponseInfo> response_;
105   };
106 
107   // A record of the pending sync responses for canceling pending sync calls
108   // when the owning ThreadSafeForwarder is destructed.
109   struct InProgressSyncCalls
110       : public base::RefCountedThreadSafe<InProgressSyncCalls> {
111     InProgressSyncCalls();
112 
113     // |lock| protects access to |pending_responses|.
114     base::Lock lock;
115     std::vector<SyncResponseInfo*> pending_responses;
116 
117    private:
118     friend class base::RefCountedThreadSafe<InProgressSyncCalls>;
119 
120     ~InProgressSyncCalls();
121   };
122 
123   class ForwardToCallingThread : public MessageReceiver {
124    public:
125     explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder);
126     ~ForwardToCallingThread() override;
127 
128    private:
129     bool Accept(Message* message) override;
130 
131     static void CallAcceptAndDeleteResponder(
132         std::unique_ptr<MessageReceiver> responder,
133         Message message);
134 
135     std::unique_ptr<MessageReceiver> responder_;
136     scoped_refptr<base::SequencedTaskRunner> caller_task_runner_;
137   };
138 
139   const scoped_refptr<base::SequencedTaskRunner> task_runner_;
140   const ForwardMessageCallback forward_;
141   const ForwardMessageWithResponderCallback forward_with_responder_;
142   const ForceAsyncSendCallback force_async_send_;
143   AssociatedGroup associated_group_;
144   scoped_refptr<InProgressSyncCalls> sync_calls_;
145   int sync_call_nesting_level_ = 0;
146   base::WeakPtrFactory<ThreadSafeForwarderBase> weak_ptr_factory_{this};
147 
148   DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarderBase);
149 };
150 
151 }  // namespace mojo
152 
153 #endif  // MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_FORWARDER_BASE_H_
154