1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "remoting/protocol/audio_pump.h"
6
7 #include <memory>
8 #include <utility>
9
10 #include "base/bind.h"
11 #include "base/check_op.h"
12 #include "base/location.h"
13 #include "base/macros.h"
14 #include "base/notreached.h"
15 #include "base/single_thread_task_runner.h"
16 #include "base/threading/thread_task_runner_handle.h"
17 #include "media/base/audio_bus.h"
18 #include "media/base/audio_sample_types.h"
19 #include "media/base/channel_layout.h"
20 #include "media/base/channel_mixer.h"
21 #include "remoting/codec/audio_encoder.h"
22 #include "remoting/proto/audio.pb.h"
23 #include "remoting/protocol/audio_source.h"
24 #include "remoting/protocol/audio_stub.h"
25
26 namespace {
27
CalculateFrameCount(const remoting::AudioPacket & packet)28 int CalculateFrameCount(const remoting::AudioPacket& packet) {
29 return packet.data(0).size() / packet.channels() / packet.bytes_per_sample();
30 }
31
AudioPacketToAudioBus(const remoting::AudioPacket & packet)32 std::unique_ptr<media::AudioBus> AudioPacketToAudioBus(
33 const remoting::AudioPacket& packet) {
34 const int frame_count = CalculateFrameCount(packet);
35 DCHECK_GT(frame_count, 0);
36 std::unique_ptr<media::AudioBus> result =
37 media::AudioBus::Create(packet.channels(), frame_count);
38 result->FromInterleaved<media::SignedInt16SampleTypeTraits>(
39 reinterpret_cast<const int16_t*>(packet.data(0).data()), frame_count);
40 return result;
41 }
42
AudioBusToAudioPacket(const media::AudioBus & packet)43 std::unique_ptr<remoting::AudioPacket> AudioBusToAudioPacket(
44 const media::AudioBus& packet) {
45 std::unique_ptr<remoting::AudioPacket> result =
46 std::make_unique<remoting::AudioPacket>();
47 result->add_data()->resize(
48 packet.channels() * packet.frames() * sizeof(int16_t));
49 packet.ToInterleaved<media::SignedInt16SampleTypeTraits>(
50 packet.frames(),
51 reinterpret_cast<int16_t*>(&(result->mutable_data(0)->at(0))));
52 result->set_encoding(remoting::AudioPacket::ENCODING_RAW);
53 result->set_channels(
54 static_cast<remoting::AudioPacket::Channels>(packet.channels()));
55 result->set_bytes_per_sample(remoting::AudioPacket::BYTES_PER_SAMPLE_2);
56 return result;
57 }
58
RetrieveLayout(const remoting::AudioPacket & packet)59 media::ChannelLayout RetrieveLayout(const remoting::AudioPacket& packet) {
60 // This switch should match AudioPacket::Channels enum in audio.proto.
61 switch (packet.channels()) {
62 case remoting::AudioPacket::CHANNELS_INVALID:
63 return media::CHANNEL_LAYOUT_UNSUPPORTED;
64 case remoting::AudioPacket::CHANNELS_MONO:
65 return media::CHANNEL_LAYOUT_MONO;
66 case remoting::AudioPacket::CHANNELS_STEREO:
67 return media::CHANNEL_LAYOUT_STEREO;
68 case remoting::AudioPacket::CHANNELS_SURROUND:
69 return media::CHANNEL_LAYOUT_SURROUND;
70 case remoting::AudioPacket::CHANNELS_4_0:
71 return media::CHANNEL_LAYOUT_4_0;
72 case remoting::AudioPacket::CHANNELS_4_1:
73 return media::CHANNEL_LAYOUT_4_1;
74 case remoting::AudioPacket::CHANNELS_5_1:
75 return media::CHANNEL_LAYOUT_5_1;
76 case remoting::AudioPacket::CHANNELS_6_1:
77 return media::CHANNEL_LAYOUT_6_1;
78 case remoting::AudioPacket::CHANNELS_7_1:
79 return media::CHANNEL_LAYOUT_7_1;
80 }
81 NOTREACHED() << "Invalid AudioPacket::Channels";
82 return media::CHANNEL_LAYOUT_UNSUPPORTED;
83 }
84
85 } // namespace
86
87 namespace remoting {
88 namespace protocol {
89
// Upper bound, in milliseconds of encoded audio, on the data that may sit in
// the pending send buffers. Audio captured while more than this is pending is
// dropped instead of being queued.
constexpr int kMaxBufferedIntervalMs = 250;
92
93 class AudioPump::Core {
94 public:
95 Core(base::WeakPtr<AudioPump> pump,
96 std::unique_ptr<AudioSource> audio_source,
97 std::unique_ptr<AudioEncoder> audio_encoder);
98 ~Core();
99
100 void Start();
101 void Pause(bool pause);
102
103 void OnPacketSent(int size);
104
105 private:
106 std::unique_ptr<AudioPacket> Downmix(std::unique_ptr<AudioPacket> packet);
107
108 void EncodeAudioPacket(std::unique_ptr<AudioPacket> packet);
109
110 base::ThreadChecker thread_checker_;
111
112 base::WeakPtr<AudioPump> pump_;
113
114 scoped_refptr<base::SingleThreadTaskRunner> pump_task_runner_;
115
116 std::unique_ptr<AudioSource> audio_source_;
117 std::unique_ptr<AudioEncoder> audio_encoder_;
118
119 bool enabled_;
120
121 // Number of bytes in the queue that have been encoded but haven't been sent
122 // yet.
123 int bytes_pending_;
124
125 std::unique_ptr<media::ChannelMixer> mixer_;
126 media::ChannelLayout mixer_input_layout_ = media::CHANNEL_LAYOUT_NONE;
127
128 DISALLOW_COPY_AND_ASSIGN(Core);
129 };
130
Core(base::WeakPtr<AudioPump> pump,std::unique_ptr<AudioSource> audio_source,std::unique_ptr<AudioEncoder> audio_encoder)131 AudioPump::Core::Core(base::WeakPtr<AudioPump> pump,
132 std::unique_ptr<AudioSource> audio_source,
133 std::unique_ptr<AudioEncoder> audio_encoder)
134 : pump_(pump),
135 pump_task_runner_(base::ThreadTaskRunnerHandle::Get()),
136 audio_source_(std::move(audio_source)),
137 audio_encoder_(std::move(audio_encoder)),
138 enabled_(true),
139 bytes_pending_(0) {
140 thread_checker_.DetachFromThread();
141 }
142
~Core()143 AudioPump::Core::~Core() {
144 DCHECK(thread_checker_.CalledOnValidThread());
145 }
146
Start()147 void AudioPump::Core::Start() {
148 DCHECK(thread_checker_.CalledOnValidThread());
149
150 audio_source_->Start(
151 base::BindRepeating(&Core::EncodeAudioPacket, base::Unretained(this)));
152 }
153
Pause(bool pause)154 void AudioPump::Core::Pause(bool pause) {
155 DCHECK(thread_checker_.CalledOnValidThread());
156
157 enabled_ = !pause;
158 }
159
OnPacketSent(int size)160 void AudioPump::Core::OnPacketSent(int size) {
161 DCHECK(thread_checker_.CalledOnValidThread());
162
163 bytes_pending_ -= size;
164 DCHECK_GE(bytes_pending_, 0);
165 }
166
EncodeAudioPacket(std::unique_ptr<AudioPacket> packet)167 void AudioPump::Core::EncodeAudioPacket(std::unique_ptr<AudioPacket> packet) {
168 DCHECK(thread_checker_.CalledOnValidThread());
169 DCHECK(packet);
170
171 int max_buffered_bytes =
172 audio_encoder_->GetBitrate() * kMaxBufferedIntervalMs / 1000 / 8;
173 if (!enabled_ || bytes_pending_ > max_buffered_bytes) {
174 return;
175 }
176
177 if (packet->channels() > AudioPacket::CHANNELS_STEREO) {
178 packet = Downmix(std::move(packet));
179 }
180
181 std::unique_ptr<AudioPacket> encoded_packet =
182 audio_encoder_->Encode(std::move(packet));
183
184 // The audio encoder returns a null audio packet if there's no audio to send.
185 if (!encoded_packet) {
186 return;
187 }
188
189 int packet_size = encoded_packet->ByteSize();
190 bytes_pending_ += packet_size;
191
192 pump_task_runner_->PostTask(
193 FROM_HERE, base::BindOnce(&AudioPump::SendAudioPacket, pump_,
194 std::move(encoded_packet), packet_size));
195 }
196
Downmix(std::unique_ptr<AudioPacket> packet)197 std::unique_ptr<AudioPacket> AudioPump::Core::Downmix(
198 std::unique_ptr<AudioPacket> packet) {
199 DCHECK(thread_checker_.CalledOnValidThread());
200 DCHECK(packet);
201 DCHECK_EQ(packet->data_size(), 1);
202 DCHECK_EQ(packet->bytes_per_sample(), AudioPacket::BYTES_PER_SAMPLE_2);
203
204 const media::ChannelLayout input_layout = RetrieveLayout(*packet);
205 DCHECK_NE(input_layout, media::CHANNEL_LAYOUT_UNSUPPORTED);
206 DCHECK_NE(input_layout, media::CHANNEL_LAYOUT_MONO);
207 DCHECK_NE(input_layout, media::CHANNEL_LAYOUT_STEREO);
208
209 if (!mixer_ || mixer_input_layout_ != input_layout) {
210 mixer_input_layout_ = input_layout;
211 mixer_ = std::make_unique<media::ChannelMixer>(
212 input_layout, media::CHANNEL_LAYOUT_STEREO);
213 }
214
215 std::unique_ptr<media::AudioBus> input = AudioPacketToAudioBus(*packet);
216 DCHECK(input);
217 std::unique_ptr<media::AudioBus> output =
218 media::AudioBus::Create(AudioPacket::CHANNELS_STEREO, input->frames());
219 mixer_->Transform(input.get(), output.get());
220
221 std::unique_ptr<AudioPacket> result = AudioBusToAudioPacket(*output);
222 result->set_sampling_rate(packet->sampling_rate());
223 return result;
224 }
225
AudioPump(scoped_refptr<base::SingleThreadTaskRunner> audio_task_runner,std::unique_ptr<AudioSource> audio_source,std::unique_ptr<AudioEncoder> audio_encoder,AudioStub * audio_stub)226 AudioPump::AudioPump(
227 scoped_refptr<base::SingleThreadTaskRunner> audio_task_runner,
228 std::unique_ptr<AudioSource> audio_source,
229 std::unique_ptr<AudioEncoder> audio_encoder,
230 AudioStub* audio_stub)
231 : audio_task_runner_(audio_task_runner), audio_stub_(audio_stub) {
232 DCHECK(audio_stub_);
233
234 core_.reset(new Core(weak_factory_.GetWeakPtr(), std::move(audio_source),
235 std::move(audio_encoder)));
236
237 audio_task_runner_->PostTask(
238 FROM_HERE, base::BindOnce(&Core::Start, base::Unretained(core_.get())));
239 }
240
~AudioPump()241 AudioPump::~AudioPump() {
242 DCHECK(thread_checker_.CalledOnValidThread());
243
244 audio_task_runner_->DeleteSoon(FROM_HERE, core_.release());
245 }
246
Pause(bool pause)247 void AudioPump::Pause(bool pause) {
248 DCHECK(thread_checker_.CalledOnValidThread());
249
250 audio_task_runner_->PostTask(
251 FROM_HERE,
252 base::BindOnce(&Core::Pause, base::Unretained(core_.get()), pause));
253 }
254
SendAudioPacket(std::unique_ptr<AudioPacket> packet,int size)255 void AudioPump::SendAudioPacket(std::unique_ptr<AudioPacket> packet, int size) {
256 DCHECK(thread_checker_.CalledOnValidThread());
257 DCHECK(packet);
258
259 audio_stub_->ProcessAudioPacket(
260 std::move(packet), base::BindOnce(&AudioPump::OnPacketSent,
261 weak_factory_.GetWeakPtr(), size));
262 }
263
OnPacketSent(int size)264 void AudioPump::OnPacketSent(int size) {
265 audio_task_runner_->PostTask(
266 FROM_HERE,
267 base::BindOnce(&Core::OnPacketSent, base::Unretained(core_.get()), size));
268 }
269
270 } // namespace protocol
271 } // namespace remoting
272