1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "remoting/protocol/audio_pump.h"
6 
7 #include <memory>
8 #include <utility>
9 
10 #include "base/bind.h"
11 #include "base/check_op.h"
12 #include "base/location.h"
13 #include "base/macros.h"
14 #include "base/notreached.h"
15 #include "base/single_thread_task_runner.h"
16 #include "base/threading/thread_task_runner_handle.h"
17 #include "media/base/audio_bus.h"
18 #include "media/base/audio_sample_types.h"
19 #include "media/base/channel_layout.h"
20 #include "media/base/channel_mixer.h"
21 #include "remoting/codec/audio_encoder.h"
22 #include "remoting/proto/audio.pb.h"
23 #include "remoting/protocol/audio_source.h"
24 #include "remoting/protocol/audio_stub.h"
25 
26 namespace {
27 
CalculateFrameCount(const remoting::AudioPacket & packet)28 int CalculateFrameCount(const remoting::AudioPacket& packet) {
29   return packet.data(0).size() / packet.channels() / packet.bytes_per_sample();
30 }
31 
AudioPacketToAudioBus(const remoting::AudioPacket & packet)32 std::unique_ptr<media::AudioBus> AudioPacketToAudioBus(
33     const remoting::AudioPacket& packet) {
34   const int frame_count = CalculateFrameCount(packet);
35   DCHECK_GT(frame_count, 0);
36   std::unique_ptr<media::AudioBus> result =
37       media::AudioBus::Create(packet.channels(), frame_count);
38   result->FromInterleaved<media::SignedInt16SampleTypeTraits>(
39       reinterpret_cast<const int16_t*>(packet.data(0).data()), frame_count);
40   return result;
41 }
42 
AudioBusToAudioPacket(const media::AudioBus & packet)43 std::unique_ptr<remoting::AudioPacket> AudioBusToAudioPacket(
44     const media::AudioBus& packet) {
45   std::unique_ptr<remoting::AudioPacket> result =
46       std::make_unique<remoting::AudioPacket>();
47   result->add_data()->resize(
48       packet.channels() * packet.frames() * sizeof(int16_t));
49   packet.ToInterleaved<media::SignedInt16SampleTypeTraits>(
50       packet.frames(),
51       reinterpret_cast<int16_t*>(&(result->mutable_data(0)->at(0))));
52   result->set_encoding(remoting::AudioPacket::ENCODING_RAW);
53   result->set_channels(
54       static_cast<remoting::AudioPacket::Channels>(packet.channels()));
55   result->set_bytes_per_sample(remoting::AudioPacket::BYTES_PER_SAMPLE_2);
56   return result;
57 }
58 
RetrieveLayout(const remoting::AudioPacket & packet)59 media::ChannelLayout RetrieveLayout(const remoting::AudioPacket& packet) {
60   // This switch should match AudioPacket::Channels enum in audio.proto.
61   switch (packet.channels()) {
62     case remoting::AudioPacket::CHANNELS_INVALID:
63       return media::CHANNEL_LAYOUT_UNSUPPORTED;
64     case remoting::AudioPacket::CHANNELS_MONO:
65       return media::CHANNEL_LAYOUT_MONO;
66     case remoting::AudioPacket::CHANNELS_STEREO:
67       return media::CHANNEL_LAYOUT_STEREO;
68     case remoting::AudioPacket::CHANNELS_SURROUND:
69       return media::CHANNEL_LAYOUT_SURROUND;
70     case remoting::AudioPacket::CHANNELS_4_0:
71       return media::CHANNEL_LAYOUT_4_0;
72     case remoting::AudioPacket::CHANNELS_4_1:
73       return media::CHANNEL_LAYOUT_4_1;
74     case remoting::AudioPacket::CHANNELS_5_1:
75       return media::CHANNEL_LAYOUT_5_1;
76     case remoting::AudioPacket::CHANNELS_6_1:
77       return media::CHANNEL_LAYOUT_6_1;
78     case remoting::AudioPacket::CHANNELS_7_1:
79       return media::CHANNEL_LAYOUT_7_1;
80   }
81   NOTREACHED() << "Invalid AudioPacket::Channels";
82   return media::CHANNEL_LAYOUT_UNSUPPORTED;
83 }
84 
85 }  // namespace
86 
87 namespace remoting {
88 namespace protocol {
89 
90 // Limit the data stored in the pending send buffers to 250ms.
91 const int kMaxBufferedIntervalMs = 250;
92 
93 class AudioPump::Core {
94  public:
95   Core(base::WeakPtr<AudioPump> pump,
96        std::unique_ptr<AudioSource> audio_source,
97        std::unique_ptr<AudioEncoder> audio_encoder);
98   ~Core();
99 
100   void Start();
101   void Pause(bool pause);
102 
103   void OnPacketSent(int size);
104 
105  private:
106   std::unique_ptr<AudioPacket> Downmix(std::unique_ptr<AudioPacket> packet);
107 
108   void EncodeAudioPacket(std::unique_ptr<AudioPacket> packet);
109 
110   base::ThreadChecker thread_checker_;
111 
112   base::WeakPtr<AudioPump> pump_;
113 
114   scoped_refptr<base::SingleThreadTaskRunner> pump_task_runner_;
115 
116   std::unique_ptr<AudioSource> audio_source_;
117   std::unique_ptr<AudioEncoder> audio_encoder_;
118 
119   bool enabled_;
120 
121   // Number of bytes in the queue that have been encoded but haven't been sent
122   // yet.
123   int bytes_pending_;
124 
125   std::unique_ptr<media::ChannelMixer> mixer_;
126   media::ChannelLayout mixer_input_layout_ = media::CHANNEL_LAYOUT_NONE;
127 
128   DISALLOW_COPY_AND_ASSIGN(Core);
129 };
130 
Core(base::WeakPtr<AudioPump> pump,std::unique_ptr<AudioSource> audio_source,std::unique_ptr<AudioEncoder> audio_encoder)131 AudioPump::Core::Core(base::WeakPtr<AudioPump> pump,
132                       std::unique_ptr<AudioSource> audio_source,
133                       std::unique_ptr<AudioEncoder> audio_encoder)
134     : pump_(pump),
135       pump_task_runner_(base::ThreadTaskRunnerHandle::Get()),
136       audio_source_(std::move(audio_source)),
137       audio_encoder_(std::move(audio_encoder)),
138       enabled_(true),
139       bytes_pending_(0) {
140   thread_checker_.DetachFromThread();
141 }
142 
~Core()143 AudioPump::Core::~Core() {
144   DCHECK(thread_checker_.CalledOnValidThread());
145 }
146 
Start()147 void AudioPump::Core::Start() {
148   DCHECK(thread_checker_.CalledOnValidThread());
149 
150   audio_source_->Start(
151       base::BindRepeating(&Core::EncodeAudioPacket, base::Unretained(this)));
152 }
153 
Pause(bool pause)154 void AudioPump::Core::Pause(bool pause) {
155   DCHECK(thread_checker_.CalledOnValidThread());
156 
157   enabled_ = !pause;
158 }
159 
OnPacketSent(int size)160 void AudioPump::Core::OnPacketSent(int size) {
161   DCHECK(thread_checker_.CalledOnValidThread());
162 
163   bytes_pending_ -= size;
164   DCHECK_GE(bytes_pending_, 0);
165 }
166 
EncodeAudioPacket(std::unique_ptr<AudioPacket> packet)167 void AudioPump::Core::EncodeAudioPacket(std::unique_ptr<AudioPacket> packet) {
168   DCHECK(thread_checker_.CalledOnValidThread());
169   DCHECK(packet);
170 
171   int max_buffered_bytes =
172       audio_encoder_->GetBitrate() * kMaxBufferedIntervalMs / 1000 / 8;
173   if (!enabled_ || bytes_pending_ > max_buffered_bytes) {
174     return;
175   }
176 
177   if (packet->channels() > AudioPacket::CHANNELS_STEREO) {
178     packet = Downmix(std::move(packet));
179   }
180 
181   std::unique_ptr<AudioPacket> encoded_packet =
182       audio_encoder_->Encode(std::move(packet));
183 
184   // The audio encoder returns a null audio packet if there's no audio to send.
185   if (!encoded_packet) {
186     return;
187   }
188 
189   int packet_size = encoded_packet->ByteSize();
190   bytes_pending_ += packet_size;
191 
192   pump_task_runner_->PostTask(
193       FROM_HERE, base::BindOnce(&AudioPump::SendAudioPacket, pump_,
194                                 std::move(encoded_packet), packet_size));
195 }
196 
Downmix(std::unique_ptr<AudioPacket> packet)197 std::unique_ptr<AudioPacket> AudioPump::Core::Downmix(
198     std::unique_ptr<AudioPacket> packet) {
199   DCHECK(thread_checker_.CalledOnValidThread());
200   DCHECK(packet);
201   DCHECK_EQ(packet->data_size(), 1);
202   DCHECK_EQ(packet->bytes_per_sample(), AudioPacket::BYTES_PER_SAMPLE_2);
203 
204   const media::ChannelLayout input_layout = RetrieveLayout(*packet);
205   DCHECK_NE(input_layout, media::CHANNEL_LAYOUT_UNSUPPORTED);
206   DCHECK_NE(input_layout, media::CHANNEL_LAYOUT_MONO);
207   DCHECK_NE(input_layout, media::CHANNEL_LAYOUT_STEREO);
208 
209   if (!mixer_ || mixer_input_layout_ != input_layout) {
210     mixer_input_layout_ = input_layout;
211     mixer_ = std::make_unique<media::ChannelMixer>(
212         input_layout, media::CHANNEL_LAYOUT_STEREO);
213   }
214 
215   std::unique_ptr<media::AudioBus> input = AudioPacketToAudioBus(*packet);
216   DCHECK(input);
217   std::unique_ptr<media::AudioBus> output =
218       media::AudioBus::Create(AudioPacket::CHANNELS_STEREO, input->frames());
219   mixer_->Transform(input.get(), output.get());
220 
221   std::unique_ptr<AudioPacket> result = AudioBusToAudioPacket(*output);
222   result->set_sampling_rate(packet->sampling_rate());
223   return result;
224 }
225 
AudioPump(scoped_refptr<base::SingleThreadTaskRunner> audio_task_runner,std::unique_ptr<AudioSource> audio_source,std::unique_ptr<AudioEncoder> audio_encoder,AudioStub * audio_stub)226 AudioPump::AudioPump(
227     scoped_refptr<base::SingleThreadTaskRunner> audio_task_runner,
228     std::unique_ptr<AudioSource> audio_source,
229     std::unique_ptr<AudioEncoder> audio_encoder,
230     AudioStub* audio_stub)
231     : audio_task_runner_(audio_task_runner), audio_stub_(audio_stub) {
232   DCHECK(audio_stub_);
233 
234   core_.reset(new Core(weak_factory_.GetWeakPtr(), std::move(audio_source),
235                        std::move(audio_encoder)));
236 
237   audio_task_runner_->PostTask(
238       FROM_HERE, base::BindOnce(&Core::Start, base::Unretained(core_.get())));
239 }
240 
~AudioPump()241 AudioPump::~AudioPump() {
242   DCHECK(thread_checker_.CalledOnValidThread());
243 
244   audio_task_runner_->DeleteSoon(FROM_HERE, core_.release());
245 }
246 
Pause(bool pause)247 void AudioPump::Pause(bool pause) {
248   DCHECK(thread_checker_.CalledOnValidThread());
249 
250   audio_task_runner_->PostTask(
251       FROM_HERE,
252       base::BindOnce(&Core::Pause, base::Unretained(core_.get()), pause));
253 }
254 
SendAudioPacket(std::unique_ptr<AudioPacket> packet,int size)255 void AudioPump::SendAudioPacket(std::unique_ptr<AudioPacket> packet, int size) {
256   DCHECK(thread_checker_.CalledOnValidThread());
257   DCHECK(packet);
258 
259   audio_stub_->ProcessAudioPacket(
260       std::move(packet), base::BindOnce(&AudioPump::OnPacketSent,
261                                         weak_factory_.GetWeakPtr(), size));
262 }
263 
OnPacketSent(int size)264 void AudioPump::OnPacketSent(int size) {
265   audio_task_runner_->PostTask(
266       FROM_HERE,
267       base::BindOnce(&Core::OnPacketSent, base::Unretained(core_.get()), size));
268 }
269 
270 }  // namespace protocol
271 }  // namespace remoting
272