1 // Copyright 2020 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/policy/messaging_layer/public/report_client.h"
6
7 #include <memory>
8 #include <utility>
9
10 #include "base/bind.h"
11 #include "base/callback.h"
12 #include "base/containers/queue.h"
13 #include "base/memory/ptr_util.h"
14 #include "base/memory/scoped_refptr.h"
15 #include "base/memory/singleton.h"
16 #include "base/path_service.h"
17 #include "base/strings/strcat.h"
18 #include "base/task/post_task.h"
19 #include "chrome/browser/browser_process.h"
20 #include "chrome/browser/browser_process_platform_part.h"
21 #include "chrome/browser/net/system_network_context_manager.h"
22 #include "chrome/browser/policy/messaging_layer/public/report_queue.h"
23 #include "chrome/browser/policy/messaging_layer/public/report_queue_configuration.h"
24 #include "chrome/browser/policy/messaging_layer/storage/storage_module.h"
25 #include "chrome/browser/policy/messaging_layer/util/status.h"
26 #include "chrome/browser/policy/messaging_layer/util/status_macros.h"
27 #include "chrome/browser/policy/messaging_layer/util/statusor.h"
28 #include "chrome/browser/policy/messaging_layer/util/task_runner_context.h"
29 #include "chrome/browser/signin/identity_manager_factory.h"
30 #include "chrome/common/chrome_paths.h"
31 #include "components/enterprise/browser/controller/browser_dm_token_storage.h"
32 #include "components/policy/core/common/cloud/cloud_policy_client_registration_helper.h"
33 #include "components/policy/core/common/cloud/device_management_service.h"
34 #include "components/policy/proto/record.pb.h"
35 #include "components/signin/public/identity_manager/identity_manager.h"
36 #include "content/public/browser/browser_task_traits.h"
37 #include "content/public/browser/browser_thread.h"
38 #include "services/network/public/cpp/shared_url_loader_factory.h"
39
40 #ifdef OS_CHROMEOS
41 #include "chrome/browser/chromeos/login/users/chrome_user_manager.h"
42 #include "chrome/browser/chromeos/policy/browser_policy_connector_chromeos.h"
43 #include "chrome/browser/chromeos/profiles/profile_helper.h"
44 #include "chrome/browser/chromeos/settings/device_settings_service.h"
45 #include "components/policy/proto/chrome_device_policy.pb.h"
46 #else
47 #include "chrome/browser/policy/chrome_browser_policy_connector.h"
48 #endif
49
50 namespace reporting {
51
52 namespace {
53
54 // policy::CloudPolicyClient is needed by the UploadClient, but is built in two
55 // different ways for ChromeOS and non-ChromeOS browsers.
56 // NOT THREAD SAFE - these functions must be called on the main thread.
57 // TODO(chromium:1078512) Wrap CloudPolicyClient in a new object so that its
58 // methods, constructor, and destructor are accessed on the correct thread.
59 #ifdef OS_CHROMEOS
BuildCloudPolicyClient(base::OnceCallback<void (StatusOr<std::unique_ptr<policy::CloudPolicyClient>>)> build_cb)60 void BuildCloudPolicyClient(
61 base::OnceCallback<
62 void(StatusOr<std::unique_ptr<policy::CloudPolicyClient>>)> build_cb) {
63 auto* const browser_policy_connector =
64 g_browser_process->browser_policy_connector();
65 if (!browser_policy_connector) {
66 std::move(build_cb).Run(
67 Status(error::FAILED_PRECONDITION, "This is not a managed device."));
68 return;
69 }
70
71 policy::DeviceManagementService* const device_management_service =
72 browser_policy_connector->device_management_service();
73 if (!device_management_service) {
74 std::move(build_cb).Run(
75 Status(error::FAILED_PRECONDITION, "This is not a managed device."));
76 return;
77 }
78
79 if (!chromeos::DeviceSettingsService::IsInitialized()) {
80 chromeos::DeviceSettingsService::Initialize();
81 }
82 const enterprise_management::PolicyData* policy_data =
83 chromeos::DeviceSettingsService::Get()->policy_data();
84 if (!policy_data || !policy_data->has_request_token() ||
85 !policy_data->has_device_id()) {
86 std::move(build_cb).Run(
87 Status(error::UNAVAILABLE, "PolicyData is unavailable."));
88 return;
89 }
90
91 scoped_refptr<network::SharedURLLoaderFactory>
92 signin_profile_url_loader_factory =
93 g_browser_process->system_network_context_manager()
94 ->GetSharedURLLoaderFactory();
95
96 auto* user_manager_ptr = g_browser_process->platform_part()->user_manager();
97 auto* primary_user = user_manager_ptr->GetPrimaryUser();
98 auto dm_token_getter = chromeos::GetDeviceDMTokenForUserPolicyGetter(
99 primary_user->GetAccountId());
100
101 auto client = std::make_unique<policy::CloudPolicyClient>(
102 device_management_service, signin_profile_url_loader_factory,
103 dm_token_getter);
104
105 std::vector<std::string> affiliation_ids(
106 policy_data->user_affiliation_ids().begin(),
107 policy_data->user_affiliation_ids().end());
108 client->SetupRegistration(policy_data->request_token(),
109 policy_data->device_id(), affiliation_ids);
110 if (!client->is_registered()) {
111 std::move(build_cb).Run(
112 Status(error::UNAVAILABLE, "Unable to start CloudPolicyClient."));
113 return;
114 }
115 std::move(build_cb).Run(std::move(client));
116 }
117 #else
118 void BuildCloudPolicyClient(
119 base::OnceCallback<
120 void(StatusOr<std::unique_ptr<policy::CloudPolicyClient>>)> build_cb) {
121 policy::DeviceManagementService* const device_management_service =
122 g_browser_process->browser_policy_connector()
123 ->device_management_service();
124
125 scoped_refptr<network::SharedURLLoaderFactory>
126 signin_profile_url_loader_factory =
127 g_browser_process->system_network_context_manager()
128 ->GetSharedURLLoaderFactory();
129
130 auto client = std::make_unique<policy::CloudPolicyClient>(
131 device_management_service, signin_profile_url_loader_factory,
132 policy::CloudPolicyClient::DeviceDMTokenCallback());
133
134 policy::DMToken browser_dm_token =
135 policy::BrowserDMTokenStorage::Get()->RetrieveDMToken();
136 std::string client_id =
137 policy::BrowserDMTokenStorage::Get()->RetrieveClientId();
138
139 client->SetupRegistration(browser_dm_token.value(), client_id,
140 std::vector<std::string>());
141 std::move(build_cb).Run(std::move(client));
142 }
143 #endif
144
145 const base::FilePath::CharType kReportingDirectory[] =
146 FILE_PATH_LITERAL("reporting");
147
148 } // namespace
149
Uploader(UploadCallback upload_callback)150 ReportingClient::Uploader::Uploader(UploadCallback upload_callback)
151 : upload_callback_(std::move(upload_callback)),
152 encrypted_records_(std::make_unique<std::vector<EncryptedRecord>>()),
153 sequenced_task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})) {}
154
155 ReportingClient::Uploader::~Uploader() = default;
156
157 StatusOr<std::unique_ptr<ReportingClient::Uploader>>
Create(UploadCallback upload_callback)158 ReportingClient::Uploader::Create(UploadCallback upload_callback) {
159 auto uploader = base::WrapUnique(new Uploader(std::move(upload_callback)));
160 return uploader;
161 }
162
ProcessRecord(EncryptedRecord data,base::OnceCallback<void (bool)> processed_cb)163 void ReportingClient::Uploader::ProcessRecord(
164 EncryptedRecord data,
165 base::OnceCallback<void(bool)> processed_cb) {
166 if (completed_) {
167 std::move(processed_cb).Run(false);
168 return;
169 }
170
171 sequenced_task_runner_->PostTask(
172 FROM_HERE,
173 base::BindOnce(
174 [](std::vector<EncryptedRecord>* records, EncryptedRecord record,
175 base::OnceCallback<void(bool)> processed_cb) {
176 records->emplace_back(std::move(record));
177 std::move(processed_cb).Run(true);
178 },
179 base::Unretained(encrypted_records_.get()), std::move(data),
180 std::move(processed_cb)));
181 }
182
ProcessGap(SequencingInformation start,uint64_t count,base::OnceCallback<void (bool)> processed_cb)183 void ReportingClient::Uploader::ProcessGap(
184 SequencingInformation start,
185 uint64_t count,
186 base::OnceCallback<void(bool)> processed_cb) {
187 if (completed_) {
188 std::move(processed_cb).Run(false);
189 return;
190 }
191
192 sequenced_task_runner_->PostTask(
193 FROM_HERE,
194 base::BindOnce(
195 [](std::vector<EncryptedRecord>* records, SequencingInformation start,
196 uint64_t count, base::OnceCallback<void(bool)> processed_cb) {
197 EncryptedRecord record;
198 *record.mutable_sequencing_information() = std::move(start);
199 for (uint64_t i = 0; i < count; ++i) {
200 records->emplace_back(record);
201 record.mutable_sequencing_information()->set_sequencing_id(
202 record.sequencing_information().sequencing_id() + 1);
203 }
204 std::move(processed_cb).Run(true);
205 },
206 base::Unretained(encrypted_records_.get()), std::move(start), count,
207 std::move(processed_cb)));
208 }
209
Completed(Status final_status)210 void ReportingClient::Uploader::Completed(Status final_status) {
211 if (!final_status.ok()) {
212 // No work to do - something went wrong with storage and it no longer wants
213 // to upload the records. Let the records die with |this|.
214 return;
215 }
216
217 if (completed_) {
218 // RunUpload has already been invoked. Return.
219 return;
220 }
221 completed_ = true;
222
223 sequenced_task_runner_->PostTask(
224 FROM_HERE,
225 base::BindOnce(&Uploader::RunUpload, std::move(upload_callback_),
226 std::move(encrypted_records_)));
227 }
228
229 // static
RunUpload(ReportingClient::Uploader::UploadCallback upload_callback,std::unique_ptr<std::vector<EncryptedRecord>> encrypted_records)230 void ReportingClient::Uploader::RunUpload(
231 ReportingClient::Uploader::UploadCallback upload_callback,
232 std::unique_ptr<std::vector<EncryptedRecord>> encrypted_records) {
233 DCHECK(encrypted_records);
234 if (encrypted_records->empty()) {
235 return;
236 }
237
238 Status upload_status =
239 std::move(upload_callback).Run(std::move(encrypted_records));
240 if (!upload_status.ok()) {
241 LOG(ERROR) << "Unable to upload records: " << upload_status;
242 }
243 }
244
245 ReportingClient::Configuration::Configuration() = default;
~Configuration()246 ReportingClient::Configuration::~Configuration() {
247 if (cloud_policy_client) {
248 base::PostTask(
249 FROM_HERE, {content::BrowserThread::UI},
250 base::BindOnce(
251 [](std::unique_ptr<policy::CloudPolicyClient> cloud_policy_client) {
252 cloud_policy_client.reset();
253 },
254 std::move(cloud_policy_client)));
255 }
256 }
257
InitializationStateTracker()258 ReportingClient::InitializationStateTracker::InitializationStateTracker()
259 : sequenced_task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})) {}
260
261 ReportingClient::InitializationStateTracker::~InitializationStateTracker() =
262 default;
263
264 // static
265 scoped_refptr<ReportingClient::InitializationStateTracker>
Create()266 ReportingClient::InitializationStateTracker::Create() {
267 return base::WrapRefCounted(
268 new ReportingClient::InitializationStateTracker());
269 }
270
GetInitState(GetInitStateCallback get_init_state_cb)271 void ReportingClient::InitializationStateTracker::GetInitState(
272 GetInitStateCallback get_init_state_cb) {
273 sequenced_task_runner_->PostTask(
274 FROM_HERE,
275 base::BindOnce(
276 &ReportingClient::InitializationStateTracker::OnIsInitializedRequest,
277 this, std::move(get_init_state_cb)));
278 }
279
RequestLeaderPromotion(LeaderPromotionRequestCallback promo_request_cb)280 void ReportingClient::InitializationStateTracker::RequestLeaderPromotion(
281 LeaderPromotionRequestCallback promo_request_cb) {
282 sequenced_task_runner_->PostTask(
283 FROM_HERE, base::BindOnce(&ReportingClient::InitializationStateTracker::
284 OnLeaderPromotionRequest,
285 this, std::move(promo_request_cb)));
286 }
287
OnIsInitializedRequest(GetInitStateCallback get_init_state_cb)288 void ReportingClient::InitializationStateTracker::OnIsInitializedRequest(
289 GetInitStateCallback get_init_state_cb) {
290 base::ThreadPool::PostTask(
291 FROM_HERE,
292 base::BindOnce(
293 [](GetInitStateCallback get_init_state_cb, bool is_initialized) {
294 std::move(get_init_state_cb).Run(is_initialized);
295 },
296 std::move(get_init_state_cb), is_initialized_));
297 }
298
OnLeaderPromotionRequest(LeaderPromotionRequestCallback promo_request_cb)299 void ReportingClient::InitializationStateTracker::OnLeaderPromotionRequest(
300 LeaderPromotionRequestCallback promo_request_cb) {
301 StatusOr<ReleaseLeaderCallback> result;
302 if (is_initialized_) {
303 result = Status(error::FAILED_PRECONDITION,
304 "ReportClient is already configured");
305 } else if (has_promoted_initializing_context_) {
306 result = Status(error::RESOURCE_EXHAUSTED,
307 "ReportClient already has a lead initializing context.");
308 } else {
309 result = base::BindOnce(
310 &ReportingClient::InitializationStateTracker::ReleaseLeader, this);
311 }
312
313 base::ThreadPool::PostTask(
314 FROM_HERE, base::BindOnce(
315 [](LeaderPromotionRequestCallback promo_request_cb,
316 StatusOr<ReleaseLeaderCallback> result) {
317 std::move(promo_request_cb).Run(std::move(result));
318 },
319 std::move(promo_request_cb), std::move(result)));
320 }
321
ReleaseLeader(bool initialization_successful)322 void ReportingClient::InitializationStateTracker::ReleaseLeader(
323 bool initialization_successful) {
324 sequenced_task_runner_->PostTask(
325 FROM_HERE,
326 base::BindOnce(
327 &ReportingClient::InitializationStateTracker::OnLeaderRelease, this,
328 initialization_successful));
329 }
330
OnLeaderRelease(bool initialization_successful)331 void ReportingClient::InitializationStateTracker::OnLeaderRelease(
332 bool initialization_successful) {
333 if (initialization_successful) {
334 is_initialized_ = true;
335 }
336 has_promoted_initializing_context_ = false;
337 }
338
CreateReportQueueRequest(std::unique_ptr<ReportQueueConfiguration> config,CreateReportQueueCallback create_cb)339 ReportingClient::CreateReportQueueRequest::CreateReportQueueRequest(
340 std::unique_ptr<ReportQueueConfiguration> config,
341 CreateReportQueueCallback create_cb)
342 : config_(std::move(config)), create_cb_(std::move(create_cb)) {}
343
344 ReportingClient::CreateReportQueueRequest::~CreateReportQueueRequest() =
345 default;
346
CreateReportQueueRequest(ReportingClient::CreateReportQueueRequest && other)347 ReportingClient::CreateReportQueueRequest::CreateReportQueueRequest(
348 ReportingClient::CreateReportQueueRequest&& other)
349 : config_(other.config()), create_cb_(other.create_cb()) {}
350
351 std::unique_ptr<ReportQueueConfiguration>
config()352 ReportingClient::CreateReportQueueRequest::config() {
353 return std::move(config_);
354 }
355
356 ReportingClient::CreateReportQueueCallback
create_cb()357 ReportingClient::CreateReportQueueRequest::create_cb() {
358 return std::move(create_cb_);
359 }
360
InitializingContext(BuildCloudPolicyClientCallback build_client_cb,Storage::StartUploadCb start_upload_cb,UpdateConfigurationCallback update_config_cb,InitCompleteCallback init_complete_cb,scoped_refptr<ReportingClient::InitializationStateTracker> init_state_tracker,scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner)361 ReportingClient::InitializingContext::InitializingContext(
362 BuildCloudPolicyClientCallback build_client_cb,
363 Storage::StartUploadCb start_upload_cb,
364 UpdateConfigurationCallback update_config_cb,
365 InitCompleteCallback init_complete_cb,
366 scoped_refptr<ReportingClient::InitializationStateTracker>
367 init_state_tracker,
368 scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner)
369 : TaskRunnerContext<Status>(std::move(init_complete_cb),
370 sequenced_task_runner),
371 build_client_cb_(std::move(build_client_cb)),
372 start_upload_cb_(std::move(start_upload_cb)),
373 update_config_cb_(std::move(update_config_cb)),
374 init_state_tracker_(init_state_tracker),
375 client_config_(std::make_unique<Configuration>()) {}
376
377 ReportingClient::InitializingContext::~InitializingContext() = default;
378
OnStart()379 void ReportingClient::InitializingContext::OnStart() {
380 init_state_tracker_->RequestLeaderPromotion(base::BindOnce(
381 &ReportingClient::InitializingContext::OnLeaderPromotionResult,
382 base::Unretained(this)));
383 }
384
OnLeaderPromotionResult(StatusOr<ReportingClient::InitializationStateTracker::ReleaseLeaderCallback> promo_result)385 void ReportingClient::InitializingContext::OnLeaderPromotionResult(
386 StatusOr<ReportingClient::InitializationStateTracker::ReleaseLeaderCallback>
387 promo_result) {
388 if (promo_result.status().error_code() == error::FAILED_PRECONDITION) {
389 // Between building this InitializingContext and attempting to promote to
390 // leader, the ReportingClient was configured. Ok response.
391 Complete(Status::StatusOK());
392 return;
393 }
394
395 if (!promo_result.ok()) {
396 Complete(promo_result.status());
397 return;
398 }
399
400 release_leader_cb_ = std::move(promo_result.ValueOrDie());
401 Schedule(&ReportingClient::InitializingContext::ConfigureCloudPolicyClient,
402 base::Unretained(this));
403 }
404
ConfigureCloudPolicyClient()405 void ReportingClient::InitializingContext::ConfigureCloudPolicyClient() {
406 // CloudPolicyClient requires posting to the main UI thread.
407 base::PostTask(
408 FROM_HERE, {content::BrowserThread::UI},
409 base::BindOnce(
410 [](BuildCloudPolicyClientCallback build_client_cb,
411 base::OnceCallback<void(
412 StatusOr<std::unique_ptr<policy::CloudPolicyClient>>)>
413 on_client_configured) {
414 std::move(build_client_cb).Run(std::move(on_client_configured));
415 },
416 std::move(build_client_cb_),
417 base::BindOnce(&ReportingClient::InitializingContext::
418 OnCloudPolicyClientConfigured,
419 base::Unretained(this))));
420 }
421
OnCloudPolicyClientConfigured(StatusOr<std::unique_ptr<policy::CloudPolicyClient>> client_result)422 void ReportingClient::InitializingContext::OnCloudPolicyClientConfigured(
423 StatusOr<std::unique_ptr<policy::CloudPolicyClient>> client_result) {
424 if (!client_result.ok()) {
425 Complete(Status(error::FAILED_PRECONDITION,
426 base::StrCat({"Unable to build CloudPolicyClient: ",
427 client_result.status().message()})));
428 return;
429 }
430 client_config_->cloud_policy_client = std::move(client_result.ValueOrDie());
431 Schedule(&ReportingClient::InitializingContext::ConfigureStorageModule,
432 base::Unretained(this));
433 }
434
ConfigureStorageModule()435 void ReportingClient::InitializingContext::ConfigureStorageModule() {
436 base::FilePath user_data_dir;
437 if (!base::PathService::Get(chrome::DIR_USER_DATA, &user_data_dir)) {
438 Complete(
439 Status(error::FAILED_PRECONDITION, "Could not retrieve base path"));
440 return;
441 }
442
443 base::FilePath reporting_path = user_data_dir.Append(kReportingDirectory);
444 StorageModule::Create(
445 Storage::Options().set_directory(reporting_path),
446 std::move(start_upload_cb_), base::MakeRefCounted<EncryptionModule>(),
447 base::BindOnce(
448 &ReportingClient::InitializingContext::OnStorageModuleConfigured,
449 base::Unretained(this)));
450 }
451
OnStorageModuleConfigured(StatusOr<scoped_refptr<StorageModule>> storage_result)452 void ReportingClient::InitializingContext::OnStorageModuleConfigured(
453 StatusOr<scoped_refptr<StorageModule>> storage_result) {
454 if (!storage_result.ok()) {
455 Complete(Status(error::FAILED_PRECONDITION,
456 base::StrCat({"Unable to build StorageModule: ",
457 storage_result.status().message()})));
458 return;
459 }
460
461 client_config_->storage = storage_result.ValueOrDie();
462 Schedule(
463 base::BindOnce(&ReportingClient::InitializingContext::CreateUploadClient,
464 base::Unretained(this)));
465 }
466
CreateUploadClient()467 void ReportingClient::InitializingContext::CreateUploadClient() {
468 ReportingClient* const instance = GetInstance();
469 DCHECK(!instance->upload_client_);
470 UploadClient::Create(
471 std::move(client_config_->cloud_policy_client),
472 base::BindRepeating(&StorageModule::ReportSuccess,
473 client_config_->storage),
474 base::BindOnce(&InitializingContext::OnUploadClientCreated,
475 base::Unretained(this)));
476 }
477
OnUploadClientCreated(StatusOr<std::unique_ptr<UploadClient>> upload_client_result)478 void ReportingClient::InitializingContext::OnUploadClientCreated(
479 StatusOr<std::unique_ptr<UploadClient>> upload_client_result) {
480 if (!upload_client_result.ok()) {
481 Complete(Status(error::FAILED_PRECONDITION,
482 base::StrCat({"Unable to create UploadClient: ",
483 upload_client_result.status().message()})));
484 return;
485 }
486 Schedule(&ReportingClient::InitializingContext::UpdateConfiguration,
487 base::Unretained(this),
488 std::move(upload_client_result.ValueOrDie()));
489 }
490
UpdateConfiguration(std::unique_ptr<UploadClient> upload_client)491 void ReportingClient::InitializingContext::UpdateConfiguration(
492 std::unique_ptr<UploadClient> upload_client) {
493 ReportingClient* const instance = GetInstance();
494 DCHECK(!instance->upload_client_);
495 instance->upload_client_ = std::move(upload_client);
496
497 std::move(update_config_cb_)
498 .Run(std::move(client_config_),
499 base::BindOnce(&ReportingClient::InitializingContext::Complete,
500 base::Unretained(this)));
501 }
502
Complete(Status status)503 void ReportingClient::InitializingContext::Complete(Status status) {
504 std::move(release_leader_cb_).Run(/*initialization_successful=*/status.ok());
505 Schedule(&ReportingClient::InitializingContext::Response,
506 base::Unretained(this), status);
507 }
508
ReportingClient()509 ReportingClient::ReportingClient()
510 : create_request_queue_(SharedQueue<CreateReportQueueRequest>::Create()),
511 init_state_tracker_(
512 ReportingClient::InitializationStateTracker::Create()),
513 build_cloud_policy_client_cb_(base::BindOnce(&BuildCloudPolicyClient)) {}
514
515 ReportingClient::~ReportingClient() = default;
516
GetInstance()517 ReportingClient* ReportingClient::GetInstance() {
518 return base::Singleton<ReportingClient>::get();
519 }
520
CreateReportQueue(std::unique_ptr<ReportQueueConfiguration> config,CreateReportQueueCallback create_cb)521 void ReportingClient::CreateReportQueue(
522 std::unique_ptr<ReportQueueConfiguration> config,
523 CreateReportQueueCallback create_cb) {
524 auto* instance = GetInstance();
525 instance->create_request_queue_->Push(
526 CreateReportQueueRequest(std::move(config), std::move(create_cb)),
527 base::BindOnce(&ReportingClient::OnPushComplete,
528 base::Unretained(instance)));
529 }
530
OnPushComplete()531 void ReportingClient::OnPushComplete() {
532 init_state_tracker_->GetInitState(
533 base::BindOnce(&ReportingClient::OnInitState, base::Unretained(this)));
534 }
535
OnInitState(bool reporting_client_configured)536 void ReportingClient::OnInitState(bool reporting_client_configured) {
537 if (!reporting_client_configured) {
538 // Schedule an InitializingContext to take care of initialization.
539 Start<ReportingClient::InitializingContext>(
540 std::move(build_cloud_policy_client_cb_),
541 base::BindRepeating(&ReportingClient::BuildUploader),
542 base::BindOnce(&ReportingClient::OnConfigResult,
543 base::Unretained(this)),
544 base::BindOnce(&ReportingClient::OnInitializationComplete,
545 base::Unretained(this)),
546 init_state_tracker_, base::ThreadPool::CreateSequencedTaskRunner({}));
547 return;
548 }
549
550 // Client was configured, build the queue!
551 create_request_queue_->Pop(base::BindOnce(&ReportingClient::BuildRequestQueue,
552 base::Unretained(this)));
553 }
554
OnConfigResult(std::unique_ptr<ReportingClient::Configuration> config,base::OnceCallback<void (Status)> continue_init_cb)555 void ReportingClient::OnConfigResult(
556 std::unique_ptr<ReportingClient::Configuration> config,
557 base::OnceCallback<void(Status)> continue_init_cb) {
558 config_ = std::move(config);
559 std::move(continue_init_cb).Run(Status::StatusOK());
560 }
561
OnInitializationComplete(Status init_status)562 void ReportingClient::OnInitializationComplete(Status init_status) {
563 if (init_status.error_code() == error::RESOURCE_EXHAUSTED) {
564 // This happens when a new request comes in while the ReportingClient is
565 // undergoing initialization. The leader will either clear or build the
566 // queue when it completes.
567 return;
568 }
569
570 // Configuration failed. Clear out all the requests that came in while we were
571 // attempting to configure.
572 if (!init_status.ok()) {
573 create_request_queue_->Swap(
574 base::queue<CreateReportQueueRequest>(),
575 base::BindOnce(&ReportingClient::ClearRequestQueue,
576 base::Unretained(this)));
577 return;
578 }
579 create_request_queue_->Pop(base::BindOnce(&ReportingClient::BuildRequestQueue,
580 base::Unretained(this)));
581 }
582
ClearRequestQueue(base::queue<CreateReportQueueRequest> failed_requests)583 void ReportingClient::ClearRequestQueue(
584 base::queue<CreateReportQueueRequest> failed_requests) {
585 while (!failed_requests.empty()) {
586 // Post to general thread.
587 base::ThreadPool::PostTask(
588 FROM_HERE, base::BindOnce(
589 [](CreateReportQueueRequest queue_request) {
590 std::move(queue_request.create_cb())
591 .Run(Status(error::UNAVAILABLE,
592 "Unable to build a ReportQueue"));
593 },
594 std::move(failed_requests.front())));
595 failed_requests.pop();
596 }
597 }
598
BuildRequestQueue(StatusOr<CreateReportQueueRequest> pop_result)599 void ReportingClient::BuildRequestQueue(
600 StatusOr<CreateReportQueueRequest> pop_result) {
601 // Queue is clear - nothing more to do.
602 if (!pop_result.ok()) {
603 return;
604 }
605
606 // We don't want to block either the ReportingClient sequenced_task_runner_ or
607 // the create_request_queue_.sequenced_task_runner_, so we post the task to a
608 // general thread.
609 base::ThreadPool::PostTask(
610 FROM_HERE, base::BindOnce(
611 [](scoped_refptr<StorageModule> storage_module,
612 CreateReportQueueRequest report_queue_request) {
613 std::move(report_queue_request.create_cb())
614 .Run(ReportQueue::Create(
615 report_queue_request.config(), storage_module));
616 },
617 config_->storage, std::move(pop_result.ValueOrDie())));
618
619 // Build the next item asynchronously
620 create_request_queue_->Pop(base::BindOnce(&ReportingClient::BuildRequestQueue,
621 base::Unretained(this)));
622 }
623
624 // static
625 StatusOr<std::unique_ptr<Storage::UploaderInterface>>
BuildUploader(Priority priority)626 ReportingClient::BuildUploader(Priority priority) {
627 ReportingClient* const instance = GetInstance();
628 DCHECK(instance->upload_client_);
629 return Uploader::Create(
630 base::BindOnce(&UploadClient::EnqueueUpload,
631 base::Unretained(instance->upload_client_.get())));
632 }
633
TestEnvironment(std::unique_ptr<policy::CloudPolicyClient> client)634 ReportingClient::TestEnvironment::TestEnvironment(
635 std::unique_ptr<policy::CloudPolicyClient> client)
636 : saved_build_cloud_policy_client_cb_(std::move(
637 ReportingClient::GetInstance()->build_cloud_policy_client_cb_)) {
638 ReportingClient::GetInstance()
639 ->build_cloud_policy_client_cb_ = base::BindOnce(
640 [](std::unique_ptr<policy::CloudPolicyClient> client,
641 base::OnceCallback<void(
642 StatusOr<std::unique_ptr<policy::CloudPolicyClient>>)> build_cb) {
643 std::move(build_cb).Run(std::move(client));
644 },
645 std::move(client));
646 }
647
~TestEnvironment()648 ReportingClient::TestEnvironment::~TestEnvironment() {
649 ReportingClient::GetInstance()->build_cloud_policy_client_cb_ =
650 std::move(saved_build_cloud_policy_client_cb_);
651 base::Singleton<ReportingClient>::OnExit(nullptr);
652 }
653
654 } // namespace reporting
655