1 /*
2 This file is part of Telegram Desktop,
3 the official desktop application for the Telegram messaging service.
4
5 For license and copyright information please follow this link:
6 https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
7 */
8 #include "data/data_group_call.h"
9
10 #include "base/unixtime.h"
11 #include "data/data_channel.h"
12 #include "data/data_chat.h"
13 #include "data/data_changes.h"
14 #include "data/data_session.h"
15 #include "main/main_session.h"
16 #include "calls/calls_instance.h"
17 #include "calls/group/calls_group_call.h"
18 #include "calls/group/calls_group_common.h"
19 #include "core/application.h"
20 #include "apiwrap.h"
21
22 namespace Data {
23 namespace {
24
25 constexpr auto kRequestPerPage = 50;
26 constexpr auto kSpeakingAfterActive = crl::time(6000);
27 constexpr auto kActiveAfterJoined = crl::time(1000);
28 constexpr auto kWaitForUpdatesTimeout = 3 * crl::time(1000);
29
ExtractNextOffset(const MTPphone_GroupCall & call)30 [[nodiscard]] QString ExtractNextOffset(const MTPphone_GroupCall &call) {
31 return call.match([&](const MTPDphone_groupCall &data) {
32 return qs(data.vparticipants_next_offset());
33 });
34 }
35
36 } // namespace
37
cameraEndpoint() const38 const std::string &GroupCallParticipant::cameraEndpoint() const {
39 return GetCameraEndpoint(videoParams);
40 }
41
screenEndpoint() const42 const std::string &GroupCallParticipant::screenEndpoint() const {
43 return GetScreenEndpoint(videoParams);
44 }
45
cameraPaused() const46 bool GroupCallParticipant::cameraPaused() const {
47 return IsCameraPaused(videoParams);
48 }
49
screenPaused() const50 bool GroupCallParticipant::screenPaused() const {
51 return IsScreenPaused(videoParams);
52 }
53
GroupCall(not_null<PeerData * > peer,CallId id,CallId accessHash,TimeId scheduleDate)54 GroupCall::GroupCall(
55 not_null<PeerData*> peer,
56 CallId id,
57 CallId accessHash,
58 TimeId scheduleDate)
59 : _id(id)
60 , _accessHash(accessHash)
61 , _peer(peer)
62 , _reloadByQueuedUpdatesTimer([=] { reload(); })
__anonbe5f47380402null63 , _speakingByActiveFinishTimer([=] { checkFinishSpeakingByActive(); })
64 , _scheduleDate(scheduleDate) {
65 }
66
~GroupCall()67 GroupCall::~GroupCall() {
68 api().request(_unknownParticipantPeersRequestId).cancel();
69 api().request(_participantsRequestId).cancel();
70 api().request(_reloadRequestId).cancel();
71 }
72
id() const73 CallId GroupCall::id() const {
74 return _id;
75 }
76
loaded() const77 bool GroupCall::loaded() const {
78 return _version > 0;
79 }
80
peer() const81 not_null<PeerData*> GroupCall::peer() const {
82 return _peer;
83 }
84
input() const85 MTPInputGroupCall GroupCall::input() const {
86 return MTP_inputGroupCall(MTP_long(_id), MTP_long(_accessHash));
87 }
88
setPeer(not_null<PeerData * > peer)89 void GroupCall::setPeer(not_null<PeerData*> peer) {
90 Expects(peer->migrateFrom() == _peer);
91 Expects(_peer->migrateTo() == peer);
92
93 _peer = peer;
94 }
95
participants() const96 auto GroupCall::participants() const
97 -> const std::vector<Participant> & {
98 return _participants;
99 }
100
requestParticipants()101 void GroupCall::requestParticipants() {
102 if (!_savedFull) {
103 if (_participantsRequestId || _reloadRequestId) {
104 return;
105 } else if (_allParticipantsLoaded) {
106 return;
107 }
108 }
109 _participantsRequestId = api().request(MTPphone_GetGroupParticipants(
110 input(),
111 MTP_vector<MTPInputPeer>(), // ids
112 MTP_vector<MTPint>(), // ssrcs
113 MTP_string(_savedFull
114 ? ExtractNextOffset(*_savedFull)
115 : _nextOffset),
116 MTP_int(kRequestPerPage)
117 )).done([=](const MTPphone_GroupParticipants &result) {
118 result.match([&](const MTPDphone_groupParticipants &data) {
119 _participantsRequestId = 0;
120 const auto reloaded = processSavedFullCall();
121 _nextOffset = qs(data.vnext_offset());
122 _peer->owner().processUsers(data.vusers());
123 _peer->owner().processChats(data.vchats());
124 applyParticipantsSlice(
125 data.vparticipants().v,
126 (reloaded
127 ? ApplySliceSource::FullReloaded
128 : ApplySliceSource::SliceLoaded));
129 setServerParticipantsCount(data.vcount().v);
130 if (data.vparticipants().v.isEmpty()) {
131 _allParticipantsLoaded = true;
132 }
133 finishParticipantsSliceRequest();
134 if (reloaded) {
135 _participantsReloaded.fire({});
136 }
137 });
138 }).fail([=](const MTP::Error &error) {
139 _participantsRequestId = 0;
140 const auto reloaded = processSavedFullCall();
141 setServerParticipantsCount(_participants.size());
142 _allParticipantsLoaded = true;
143 finishParticipantsSliceRequest();
144 if (reloaded) {
145 _participantsReloaded.fire({});
146 }
147 }).send();
148 }
149
processSavedFullCall()150 bool GroupCall::processSavedFullCall() {
151 if (!_savedFull) {
152 return false;
153 }
154 _reloadRequestId = 0;
155 processFullCallFields(*base::take(_savedFull));
156 return true;
157 }
158
finishParticipantsSliceRequest()159 void GroupCall::finishParticipantsSliceRequest() {
160 computeParticipantsCount();
161 processQueuedUpdates();
162 }
163
setServerParticipantsCount(int count)164 void GroupCall::setServerParticipantsCount(int count) {
165 _serverParticipantsCount = count;
166 changePeerEmptyCallFlag();
167 }
168
changePeerEmptyCallFlag()169 void GroupCall::changePeerEmptyCallFlag() {
170 const auto chat = _peer->asChat();
171 const auto channel = _peer->asChannel();
172 constexpr auto chatFlag = ChatDataFlag::CallNotEmpty;
173 constexpr auto channelFlag = ChannelDataFlag::CallNotEmpty;
174 if (_peer->groupCall() != this) {
175 return;
176 } else if (_serverParticipantsCount > 0) {
177 if (chat && !(chat->flags() & chatFlag)) {
178 chat->addFlags(chatFlag);
179 chat->session().changes().peerUpdated(
180 chat,
181 Data::PeerUpdate::Flag::GroupCall);
182 } else if (channel && !(channel->flags() & channelFlag)) {
183 channel->addFlags(channelFlag);
184 channel->session().changes().peerUpdated(
185 channel,
186 Data::PeerUpdate::Flag::GroupCall);
187 }
188 } else if (chat && (chat->flags() & chatFlag)) {
189 chat->removeFlags(chatFlag);
190 chat->session().changes().peerUpdated(
191 chat,
192 Data::PeerUpdate::Flag::GroupCall);
193 } else if (channel && (channel->flags() & channelFlag)) {
194 channel->removeFlags(channelFlag);
195 channel->session().changes().peerUpdated(
196 channel,
197 Data::PeerUpdate::Flag::GroupCall);
198 }
199 }
200
fullCount() const201 int GroupCall::fullCount() const {
202 return _fullCount.current();
203 }
204
fullCountValue() const205 rpl::producer<int> GroupCall::fullCountValue() const {
206 return _fullCount.value();
207 }
208
participantsLoaded() const209 bool GroupCall::participantsLoaded() const {
210 return _allParticipantsLoaded;
211 }
212
participantPeerByAudioSsrc(uint32 ssrc) const213 PeerData *GroupCall::participantPeerByAudioSsrc(uint32 ssrc) const {
214 const auto i = _participantPeerByAudioSsrc.find(ssrc);
215 return (i != end(_participantPeerByAudioSsrc))
216 ? i->second.get()
217 : nullptr;
218 }
219
participantByPeer(not_null<PeerData * > peer) const220 const GroupCallParticipant *GroupCall::participantByPeer(
221 not_null<PeerData*> peer) const {
222 return const_cast<GroupCall*>(this)->findParticipant(peer);
223 }
224
findParticipant(not_null<PeerData * > peer)225 GroupCallParticipant *GroupCall::findParticipant(
226 not_null<PeerData*> peer) {
227 const auto i = ranges::find(_participants, peer, &Participant::peer);
228 return (i != end(_participants)) ? &*i : nullptr;
229 }
230
participantByEndpoint(const std::string & endpoint) const231 const GroupCallParticipant *GroupCall::participantByEndpoint(
232 const std::string &endpoint) const {
233 if (endpoint.empty()) {
234 return nullptr;
235 }
236 for (const auto &participant : _participants) {
237 if (GetCameraEndpoint(participant.videoParams) == endpoint
238 || GetScreenEndpoint(participant.videoParams) == endpoint) {
239 return &participant;
240 }
241 }
242 return nullptr;
243 }
244
participantsReloaded()245 rpl::producer<> GroupCall::participantsReloaded() {
246 return _participantsReloaded.events();
247 }
248
participantUpdated() const249 auto GroupCall::participantUpdated() const
250 -> rpl::producer<ParticipantUpdate> {
251 return _participantUpdates.events();
252 }
253
participantSpeaking() const254 auto GroupCall::participantSpeaking() const
255 -> rpl::producer<not_null<Participant*>> {
256 return _participantSpeaking.events();
257 }
258
enqueueUpdate(const MTPUpdate & update)259 void GroupCall::enqueueUpdate(const MTPUpdate &update) {
260 update.match([&](const MTPDupdateGroupCall &updateData) {
261 updateData.vcall().match([&](const MTPDgroupCall &data) {
262 const auto version = data.vversion().v;
263 if (!_applyingQueuedUpdates
264 && (!_version || _version == version)) {
265 DEBUG_LOG(("Group Call Participants: "
266 "Apply updateGroupCall %1 -> %2"
267 ).arg(_version
268 ).arg(version));
269 applyEnqueuedUpdate(update);
270 } else if (!_version || _version <= version) {
271 DEBUG_LOG(("Group Call Participants: "
272 "Queue updateGroupCall %1 -> %2"
273 ).arg(_version
274 ).arg(version));
275 const auto type = QueuedType::Call;
276 _queuedUpdates.emplace(std::pair{ version, type }, update);
277 }
278 }, [&](const MTPDgroupCallDiscarded &data) {
279 discard(data);
280 });
281 }, [&](const MTPDupdateGroupCallParticipants &updateData) {
282 const auto version = updateData.vversion().v;
283 const auto proj = [](const MTPGroupCallParticipant &data) {
284 return data.match([&](const MTPDgroupCallParticipant &data) {
285 return data.is_versioned();
286 });
287 };
288 const auto increment = ranges::contains(
289 updateData.vparticipants().v,
290 true,
291 proj);
292 const auto required = increment ? (version - 1) : version;
293 if (!_applyingQueuedUpdates && (_version == required)) {
294 DEBUG_LOG(("Group Call Participants: "
295 "Apply updateGroupCallParticipant %1 (%2)"
296 ).arg(_version
297 ).arg(Logs::b(increment)));
298 applyEnqueuedUpdate(update);
299 } else if (_version <= required) {
300 DEBUG_LOG(("Group Call Participants: "
301 "Queue updateGroupCallParticipant %1 -> %2 (%3)"
302 ).arg(_version
303 ).arg(version
304 ).arg(Logs::b(increment)));
305 const auto type = increment
306 ? QueuedType::VersionedParticipant
307 : QueuedType::Participant;
308 _queuedUpdates.emplace(std::pair{ version, type }, update);
309 }
310 }, [](const auto &) {
311 Unexpected("Type in GroupCall::enqueueUpdate.");
312 });
313 processQueuedUpdates();
314 }
315
discard(const MTPDgroupCallDiscarded & data)316 void GroupCall::discard(const MTPDgroupCallDiscarded &data) {
317 const auto id = _id;
318 const auto peer = _peer;
319 crl::on_main(&peer->session(), [=] {
320 if (peer->groupCall() && peer->groupCall()->id() == id) {
321 if (const auto chat = peer->asChat()) {
322 chat->clearGroupCall();
323 } else if (const auto channel = peer->asChannel()) {
324 channel->clearGroupCall();
325 }
326 }
327 });
328 Core::App().calls().applyGroupCallUpdateChecked(
329 &peer->session(),
330 MTP_updateGroupCall(
331 MTP_long(peer->isChat()
332 ? peerToChat(peer->id).bare
333 : peerToChannel(peer->id).bare),
334 MTP_groupCallDiscarded(
335 data.vid(),
336 data.vaccess_hash(),
337 data.vduration())));
338 }
339
processFullCallUsersChats(const MTPphone_GroupCall & call)340 void GroupCall::processFullCallUsersChats(const MTPphone_GroupCall &call) {
341 call.match([&](const MTPDphone_groupCall &data) {
342 _peer->owner().processUsers(data.vusers());
343 _peer->owner().processChats(data.vchats());
344 });
345 }
346
processFullCallFields(const MTPphone_GroupCall & call)347 void GroupCall::processFullCallFields(const MTPphone_GroupCall &call) {
348 call.match([&](const MTPDphone_groupCall &data) {
349 const auto &participants = data.vparticipants().v;
350 const auto nextOffset = qs(data.vparticipants_next_offset());
351 data.vcall().match([&](const MTPDgroupCall &data) {
352 _participants.clear();
353 _speakingByActiveFinishes.clear();
354 _participantPeerByAudioSsrc.clear();
355 _allParticipantsLoaded = false;
356
357 applyParticipantsSlice(
358 participants,
359 ApplySliceSource::FullReloaded);
360 _nextOffset = nextOffset;
361
362 applyCallFields(data);
363 }, [&](const MTPDgroupCallDiscarded &data) {
364 discard(data);
365 });
366 });
367 }
368
processFullCall(const MTPphone_GroupCall & call)369 void GroupCall::processFullCall(const MTPphone_GroupCall &call) {
370 processFullCallUsersChats(call);
371 processFullCallFields(call);
372 finishParticipantsSliceRequest();
373 _participantsReloaded.fire({});
374 }
375
applyCallFields(const MTPDgroupCall & data)376 void GroupCall::applyCallFields(const MTPDgroupCall &data) {
377 DEBUG_LOG(("Group Call Participants: "
378 "Set from groupCall %1 -> %2"
379 ).arg(_version
380 ).arg(data.vversion().v));
381 _version = data.vversion().v;
382 if (!_version) {
383 LOG(("API Error: Got zero version in groupCall."));
384 _version = 1;
385 }
386 _joinMuted = data.is_join_muted();
387 _canChangeJoinMuted = data.is_can_change_join_muted();
388 _joinedToTop = !data.is_join_date_asc();
389 setServerParticipantsCount(data.vparticipants_count().v);
390 changePeerEmptyCallFlag();
391 _title = qs(data.vtitle().value_or_empty());
392 {
393 _recordVideo = data.is_record_video_active();
394 _recordStartDate = data.vrecord_start_date().value_or_empty();
395 }
396 _scheduleDate = data.vschedule_date().value_or_empty();
397 _scheduleStartSubscribed = data.is_schedule_start_subscribed();
398 _unmutedVideoLimit = data.vunmuted_video_limit().v;
399 _allParticipantsLoaded
400 = (_serverParticipantsCount == _participants.size());
401 }
402
applyLocalUpdate(const MTPDupdateGroupCallParticipants & update)403 void GroupCall::applyLocalUpdate(
404 const MTPDupdateGroupCallParticipants &update) {
405 applyParticipantsSlice(
406 update.vparticipants().v,
407 ApplySliceSource::UpdateConstructed);
408 }
409
applyEnqueuedUpdate(const MTPUpdate & update)410 void GroupCall::applyEnqueuedUpdate(const MTPUpdate &update) {
411 Expects(!_applyingQueuedUpdates);
412
413 _applyingQueuedUpdates = true;
414 const auto guard = gsl::finally([&] { _applyingQueuedUpdates = false; });
415
416 update.match([&](const MTPDupdateGroupCall &data) {
417 data.vcall().match([&](const MTPDgroupCall &data) {
418 applyCallFields(data);
419 computeParticipantsCount();
420 }, [&](const MTPDgroupCallDiscarded &data) {
421 discard(data);
422 });
423 }, [&](const MTPDupdateGroupCallParticipants &data) {
424 DEBUG_LOG(("Group Call Participants: "
425 "Set from updateGroupCallParticipants %1 -> %2"
426 ).arg(_version
427 ).arg(data.vversion().v));
428 _version = data.vversion().v;
429 if (!_version) {
430 LOG(("API Error: "
431 "Got zero version in updateGroupCallParticipants."));
432 _version = 1;
433 }
434 applyParticipantsSlice(
435 data.vparticipants().v,
436 ApplySliceSource::UpdateReceived);
437 }, [](const auto &) {
438 Unexpected("Type in GroupCall::applyEnqueuedUpdate.");
439 });
440 Core::App().calls().applyGroupCallUpdateChecked(
441 &_peer->session(),
442 update);
443 }
444
processQueuedUpdates()445 void GroupCall::processQueuedUpdates() {
446 if (!_version || _applyingQueuedUpdates) {
447 return;
448 }
449
450 const auto size = _queuedUpdates.size();
451 while (!_queuedUpdates.empty()) {
452 const auto &entry = _queuedUpdates.front();
453 const auto version = entry.first.first;
454 const auto type = entry.first.second;
455 const auto incremented = (type == QueuedType::VersionedParticipant);
456 if ((version < _version)
457 || (version == _version && incremented)) {
458 _queuedUpdates.erase(_queuedUpdates.begin());
459 } else if (version == _version
460 || (version == _version + 1 && incremented)) {
461 const auto update = entry.second;
462 _queuedUpdates.erase(_queuedUpdates.begin());
463 applyEnqueuedUpdate(update);
464 } else {
465 break;
466 }
467 }
468 if (_queuedUpdates.empty()) {
469 _reloadByQueuedUpdatesTimer.cancel();
470 } else if (_queuedUpdates.size() != size
471 || !_reloadByQueuedUpdatesTimer.isActive()) {
472 _reloadByQueuedUpdatesTimer.callOnce(kWaitForUpdatesTimeout);
473 }
474 }
475
computeParticipantsCount()476 void GroupCall::computeParticipantsCount() {
477 _fullCount = _allParticipantsLoaded
478 ? int(_participants.size())
479 : std::max(int(_participants.size()), _serverParticipantsCount);
480 }
481
reload()482 void GroupCall::reload() {
483 if (_reloadRequestId || _applyingQueuedUpdates) {
484 return;
485 } else if (_participantsRequestId) {
486 api().request(_participantsRequestId).cancel();
487 _participantsRequestId = 0;
488 }
489
490 DEBUG_LOG(("Group Call Participants: "
491 "Reloading with queued: %1"
492 ).arg(_queuedUpdates.size()));
493
494 while (!_queuedUpdates.empty()) {
495 const auto &entry = _queuedUpdates.front();
496 const auto update = entry.second;
497 _queuedUpdates.erase(_queuedUpdates.begin());
498 applyEnqueuedUpdate(update);
499 }
500 _reloadByQueuedUpdatesTimer.cancel();
501
502 const auto limit = 3;
503 _reloadRequestId = api().request(
504 MTPphone_GetGroupCall(input(), MTP_int(limit))
505 ).done([=](const MTPphone_GroupCall &result) {
506 if (requestParticipantsAfterReload(result)) {
507 _savedFull = result;
508 processFullCallUsersChats(result);
509 requestParticipants();
510 return;
511 }
512 _reloadRequestId = 0;
513 processFullCall(result);
514 }).fail([=](const MTP::Error &error) {
515 _reloadRequestId = 0;
516 }).send();
517 }
518
requestParticipantsAfterReload(const MTPphone_GroupCall & call) const519 bool GroupCall::requestParticipantsAfterReload(
520 const MTPphone_GroupCall &call) const {
521 return call.match([&](const MTPDphone_groupCall &data) {
522 const auto received = data.vparticipants().v.size();
523 const auto size = data.vcall().match([&](const MTPDgroupCall &data) {
524 return data.vparticipants_count().v;
525 }, [](const auto &) {
526 return 0;
527 });
528 return (received < size) && (received < _participants.size());
529 });
530 }
531
applyParticipantsSlice(const QVector<MTPGroupCallParticipant> & list,ApplySliceSource sliceSource)532 void GroupCall::applyParticipantsSlice(
533 const QVector<MTPGroupCallParticipant> &list,
534 ApplySliceSource sliceSource) {
535 for (const auto &participant : list) {
536 participant.match([&](const MTPDgroupCallParticipant &data) {
537 const auto participantPeerId = peerFromMTP(data.vpeer());
538 const auto participantPeer = _peer->owner().peer(
539 participantPeerId);
540 const auto i = ranges::find(
541 _participants,
542 participantPeer,
543 &Participant::peer);
544 if (data.is_left()) {
545 if (i != end(_participants)) {
546 auto update = ParticipantUpdate{
547 .was = *i,
548 };
549 _participantPeerByAudioSsrc.erase(i->ssrc);
550 _participantPeerByAudioSsrc.erase(
551 GetAdditionalAudioSsrc(i->videoParams));
552 _speakingByActiveFinishes.remove(participantPeer);
553 _participants.erase(i);
554 if (sliceSource != ApplySliceSource::FullReloaded) {
555 _participantUpdates.fire(std::move(update));
556 }
557 }
558 if (_serverParticipantsCount > 0) {
559 --_serverParticipantsCount;
560 }
561 return;
562 }
563 if (const auto about = data.vabout()) {
564 participantPeer->setAbout(qs(*about));
565 }
566 const auto was = (i != end(_participants))
567 ? std::make_optional(*i)
568 : std::nullopt;
569 const auto canSelfUnmute = !data.is_muted()
570 || data.is_can_self_unmute();
571 const auto lastActive = data.vactive_date().value_or(
572 was ? was->lastActive : 0);
573 const auto volume = (was
574 && !was->applyVolumeFromMin
575 && data.is_min())
576 ? was->volume
577 : data.vvolume().value_or(Calls::Group::kDefaultVolume);
578 const auto applyVolumeFromMin = (was && data.is_min())
579 ? was->applyVolumeFromMin
580 : (data.is_min() || data.is_volume_by_admin());
581 const auto mutedByMe = (was && data.is_min())
582 ? was->mutedByMe
583 : data.is_muted_by_you();
584 const auto onlyMinLoaded = data.is_min()
585 && (!was || was->onlyMinLoaded);
586 const auto videoJoined = data.is_video_joined();
587 const auto raisedHandRating
588 = data.vraise_hand_rating().value_or_empty();
589 const auto localUpdate = (sliceSource
590 == ApplySliceSource::UpdateConstructed);
591 const auto existingVideoParams = (i != end(_participants))
592 ? i->videoParams
593 : nullptr;
594 auto videoParams = localUpdate
595 ? existingVideoParams
596 : Calls::ParseVideoParams(
597 data.vvideo(),
598 data.vpresentation(),
599 existingVideoParams);
600 const auto value = Participant{
601 .peer = participantPeer,
602 .videoParams = std::move(videoParams),
603 .date = data.vdate().v,
604 .lastActive = lastActive,
605 .raisedHandRating = raisedHandRating,
606 .ssrc = uint32(data.vsource().v),
607 .volume = volume,
608 .sounding = canSelfUnmute && was && was->sounding,
609 .speaking = canSelfUnmute && was && was->speaking,
610 .additionalSounding = (canSelfUnmute
611 && was
612 && was->additionalSounding),
613 .additionalSpeaking = (canSelfUnmute
614 && was
615 && was->additionalSpeaking),
616 .muted = data.is_muted(),
617 .mutedByMe = mutedByMe,
618 .canSelfUnmute = canSelfUnmute,
619 .onlyMinLoaded = onlyMinLoaded,
620 .videoJoined = videoJoined,
621 .applyVolumeFromMin = applyVolumeFromMin,
622 };
623 if (i == end(_participants)) {
624 if (value.ssrc) {
625 _participantPeerByAudioSsrc.emplace(
626 value.ssrc,
627 participantPeer);
628 }
629 if (const auto additional = GetAdditionalAudioSsrc(
630 value.videoParams)) {
631 _participantPeerByAudioSsrc.emplace(
632 additional,
633 participantPeer);
634 }
635 _participants.push_back(value);
636 if (const auto user = participantPeer->asUser()) {
637 _peer->owner().unregisterInvitedToCallUser(_id, user);
638 }
639 } else {
640 if (i->ssrc != value.ssrc) {
641 _participantPeerByAudioSsrc.erase(i->ssrc);
642 if (value.ssrc) {
643 _participantPeerByAudioSsrc.emplace(
644 value.ssrc,
645 participantPeer);
646 }
647 }
648 if (GetAdditionalAudioSsrc(i->videoParams)
649 != GetAdditionalAudioSsrc(value.videoParams)) {
650 _participantPeerByAudioSsrc.erase(
651 GetAdditionalAudioSsrc(i->videoParams));
652 if (const auto additional = GetAdditionalAudioSsrc(
653 value.videoParams)) {
654 _participantPeerByAudioSsrc.emplace(
655 additional,
656 participantPeer);
657 }
658 }
659 *i = value;
660 }
661 if (data.is_just_joined()) {
662 ++_serverParticipantsCount;
663 }
664 if (sliceSource != ApplySliceSource::FullReloaded) {
665 _participantUpdates.fire({
666 .was = was,
667 .now = value,
668 });
669 }
670 });
671 }
672 if (sliceSource == ApplySliceSource::UpdateReceived) {
673 changePeerEmptyCallFlag();
674 computeParticipantsCount();
675 }
676 }
677
applyLastSpoke(uint32 ssrc,LastSpokeTimes when,crl::time now)678 void GroupCall::applyLastSpoke(
679 uint32 ssrc,
680 LastSpokeTimes when,
681 crl::time now) {
682 const auto i = _participantPeerByAudioSsrc.find(ssrc);
683 if (i == end(_participantPeerByAudioSsrc)) {
684 _unknownSpokenSsrcs[ssrc] = when;
685 requestUnknownParticipants();
686 return;
687 }
688 const auto participant = findParticipant(i->second);
689 Assert(participant != nullptr);
690
691 _speakingByActiveFinishes.remove(participant->peer);
692 const auto sounding = (when.anything + kSoundStatusKeptFor >= now)
693 && participant->canSelfUnmute;
694 const auto speaking = sounding
695 && (when.voice + kSoundStatusKeptFor >= now);
696 if (speaking) {
697 _participantSpeaking.fire({ participant });
698 }
699 const auto useAdditional = (ssrc != participant->ssrc);
700 const auto nowSounding = useAdditional
701 ? participant->additionalSounding
702 : participant->sounding;
703 const auto nowSpeaking = useAdditional
704 ? participant->additionalSpeaking
705 : participant->speaking;
706 if (nowSounding != sounding || nowSpeaking != speaking) {
707 const auto was = *participant;
708 if (useAdditional) {
709 participant->additionalSounding = sounding;
710 participant->additionalSpeaking = speaking;
711 } else {
712 participant->sounding = sounding;
713 participant->speaking = speaking;
714 }
715 _participantUpdates.fire({
716 .was = was,
717 .now = *participant,
718 });
719 }
720 }
721
resolveParticipants(const base::flat_set<uint32> & ssrcs)722 void GroupCall::resolveParticipants(const base::flat_set<uint32> &ssrcs) {
723 if (ssrcs.empty()) {
724 return;
725 }
726 for (const auto ssrc : ssrcs) {
727 _unknownSpokenSsrcs.emplace(ssrc, LastSpokeTimes());
728 }
729 requestUnknownParticipants();
730 }
731
applyActiveUpdate(PeerId participantPeerId,LastSpokeTimes when,PeerData * participantPeerLoaded)732 void GroupCall::applyActiveUpdate(
733 PeerId participantPeerId,
734 LastSpokeTimes when,
735 PeerData *participantPeerLoaded) {
736 if (inCall()) {
737 return;
738 }
739 const auto participant = participantPeerLoaded
740 ? findParticipant(participantPeerLoaded)
741 : nullptr;
742 const auto loadByUserId = !participant || participant->onlyMinLoaded;
743 if (loadByUserId) {
744 _unknownSpokenPeerIds[participantPeerId] = when;
745 requestUnknownParticipants();
746 }
747 if (!participant || !participant->canSelfUnmute) {
748 return;
749 }
750 const auto was = std::make_optional(*participant);
751 const auto now = crl::now();
752 const auto elapsed = TimeId((now - when.anything) / crl::time(1000));
753 const auto lastActive = base::unixtime::now() - elapsed;
754 const auto finishes = when.anything + kSpeakingAfterActive;
755 if (lastActive <= participant->lastActive || finishes <= now) {
756 return;
757 }
758 _speakingByActiveFinishes[participant->peer] = finishes;
759 if (!_speakingByActiveFinishTimer.isActive()) {
760 _speakingByActiveFinishTimer.callOnce(finishes - now);
761 }
762
763 participant->lastActive = lastActive;
764 participant->speaking = true;
765 participant->canSelfUnmute = true;
766 if (!was->speaking || !was->canSelfUnmute) {
767 _participantUpdates.fire({
768 .was = was,
769 .now = *participant,
770 });
771 }
772 }
773
checkFinishSpeakingByActive()774 void GroupCall::checkFinishSpeakingByActive() {
775 const auto now = crl::now();
776 auto nearest = crl::time(0);
777 auto stop = std::vector<not_null<PeerData*>>();
778 for (auto i = begin(_speakingByActiveFinishes)
779 ; i != end(_speakingByActiveFinishes);) {
780 const auto when = i->second;
781 if (now >= when) {
782 stop.push_back(i->first);
783 i = _speakingByActiveFinishes.erase(i);
784 } else {
785 if (!nearest || nearest > when) {
786 nearest = when;
787 }
788 ++i;
789 }
790 }
791 for (const auto participantPeer : stop) {
792 const auto participant = findParticipant(participantPeer);
793 Assert(participant != nullptr);
794 if (participant->speaking) {
795 const auto was = *participant;
796 participant->speaking = false;
797 _participantUpdates.fire({
798 .was = was,
799 .now = *participant,
800 });
801 }
802 }
803 if (nearest) {
804 _speakingByActiveFinishTimer.callOnce(nearest - now);
805 }
806 }
807
requestUnknownParticipants()808 void GroupCall::requestUnknownParticipants() {
809 if (_unknownParticipantPeersRequestId
810 || (_unknownSpokenSsrcs.empty() && _unknownSpokenPeerIds.empty())) {
811 return;
812 }
813 const auto ssrcs = [&] {
814 if (_unknownSpokenSsrcs.size() < kRequestPerPage) {
815 return base::take(_unknownSpokenSsrcs);
816 }
817 auto result = base::flat_map<uint32, LastSpokeTimes>();
818 result.reserve(kRequestPerPage);
819 while (result.size() < kRequestPerPage) {
820 const auto [ssrc, when] = _unknownSpokenSsrcs.back();
821 result.emplace(ssrc, when);
822 _unknownSpokenSsrcs.erase(_unknownSpokenSsrcs.end() - 1);
823 }
824 return result;
825 }();
826 const auto participantPeerIds = [&] {
827 if (_unknownSpokenPeerIds.size() + ssrcs.size() < kRequestPerPage) {
828 return base::take(_unknownSpokenPeerIds);
829 }
830 auto result = base::flat_map<PeerId, LastSpokeTimes>();
831 const auto available = (kRequestPerPage - int(ssrcs.size()));
832 if (available > 0) {
833 result.reserve(available);
834 while (result.size() < available) {
835 const auto &back = _unknownSpokenPeerIds.back();
836 const auto [participantPeerId, when] = back;
837 result.emplace(participantPeerId, when);
838 _unknownSpokenPeerIds.erase(_unknownSpokenPeerIds.end() - 1);
839 }
840 }
841 return result;
842 }();
843 auto ssrcInputs = QVector<MTPint>();
844 ssrcInputs.reserve(ssrcs.size());
845 for (const auto &[ssrc, when] : ssrcs) {
846 ssrcInputs.push_back(MTP_int(ssrc));
847 }
848 auto peerInputs = QVector<MTPInputPeer>();
849 peerInputs.reserve(participantPeerIds.size());
850 for (const auto &[participantPeerId, when] : participantPeerIds) {
851 if (const auto userId = peerToUser(participantPeerId)) {
852 peerInputs.push_back(
853 MTP_inputPeerUser(MTP_long(userId.bare), MTP_long(0)));
854 } else if (const auto chatId = peerToChat(participantPeerId)) {
855 peerInputs.push_back(MTP_inputPeerChat(MTP_long(chatId.bare)));
856 } else if (const auto channelId = peerToChannel(participantPeerId)) {
857 peerInputs.push_back(
858 MTP_inputPeerChannel(MTP_long(channelId.bare), MTP_long(0)));
859 }
860 }
861 _unknownParticipantPeersRequestId = api().request(
862 MTPphone_GetGroupParticipants(
863 input(),
864 MTP_vector<MTPInputPeer>(peerInputs),
865 MTP_vector<MTPint>(ssrcInputs),
866 MTP_string(QString()),
867 MTP_int(kRequestPerPage)
868 )
869 ).done([=](const MTPphone_GroupParticipants &result) {
870 result.match([&](const MTPDphone_groupParticipants &data) {
871 _peer->owner().processUsers(data.vusers());
872 _peer->owner().processChats(data.vchats());
873 applyParticipantsSlice(
874 data.vparticipants().v,
875 ApplySliceSource::UnknownLoaded);
876 });
877 _unknownParticipantPeersRequestId = 0;
878 const auto now = crl::now();
879 for (const auto &[ssrc, when] : ssrcs) {
880 if (when.voice || when.anything) {
881 applyLastSpoke(ssrc, when, now);
882 }
883 _unknownSpokenSsrcs.remove(ssrc);
884 }
885 for (const auto &[id, when] : participantPeerIds) {
886 if (const auto participantPeer = _peer->owner().peerLoaded(id)) {
887 const auto isParticipant = ranges::contains(
888 _participants,
889 not_null{ participantPeer },
890 &Participant::peer);
891 if (isParticipant) {
892 applyActiveUpdate(id, when, participantPeer);
893 }
894 }
895 _unknownSpokenPeerIds.remove(id);
896 }
897 if (!ssrcs.empty()) {
898 _participantsResolved.fire(&ssrcs);
899 }
900 requestUnknownParticipants();
901 }).fail([=](const MTP::Error &error) {
902 _unknownParticipantPeersRequestId = 0;
903 for (const auto &[ssrc, when] : ssrcs) {
904 _unknownSpokenSsrcs.remove(ssrc);
905 }
906 for (const auto &[participantPeerId, when] : participantPeerIds) {
907 _unknownSpokenPeerIds.remove(participantPeerId);
908 }
909 requestUnknownParticipants();
910 }).send();
911 }
912
setInCall()913 void GroupCall::setInCall() {
914 _unknownSpokenPeerIds.clear();
915 if (_speakingByActiveFinishes.empty()) {
916 return;
917 }
918 auto restartTimer = true;
919 const auto latest = crl::now() + kActiveAfterJoined;
920 for (auto &[peer, when] : _speakingByActiveFinishes) {
921 if (when > latest) {
922 when = latest;
923 } else {
924 restartTimer = false;
925 }
926 }
927 if (restartTimer) {
928 _speakingByActiveFinishTimer.callOnce(kActiveAfterJoined);
929 }
930 }
931
inCall() const932 bool GroupCall::inCall() const {
933 const auto current = Core::App().calls().currentGroupCall();
934 return (current != nullptr)
935 && (current->id() == _id)
936 && (current->state() == Calls::GroupCall::State::Joined);
937 }
938
setJoinMutedLocally(bool muted)939 void GroupCall::setJoinMutedLocally(bool muted) {
940 _joinMuted = muted;
941 }
942
joinMuted() const943 bool GroupCall::joinMuted() const {
944 return _joinMuted;
945 }
946
canChangeJoinMuted() const947 bool GroupCall::canChangeJoinMuted() const {
948 return _canChangeJoinMuted;
949 }
950
joinedToTop() const951 bool GroupCall::joinedToTop() const {
952 return _joinedToTop;
953 }
954
api() const955 ApiWrap &GroupCall::api() const {
956 return _peer->session().api();
957 }
958
959 } // namespace Data
960