1 // Copyright 2020 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "chrome/browser/policy/messaging_layer/public/report_client.h"
6 
7 #include <memory>
8 #include <utility>
9 
10 #include "base/bind.h"
11 #include "base/callback.h"
12 #include "base/containers/queue.h"
13 #include "base/memory/ptr_util.h"
14 #include "base/memory/scoped_refptr.h"
15 #include "base/memory/singleton.h"
16 #include "base/path_service.h"
17 #include "base/strings/strcat.h"
18 #include "base/task/post_task.h"
19 #include "chrome/browser/browser_process.h"
20 #include "chrome/browser/browser_process_platform_part.h"
21 #include "chrome/browser/net/system_network_context_manager.h"
22 #include "chrome/browser/policy/messaging_layer/public/report_queue.h"
23 #include "chrome/browser/policy/messaging_layer/public/report_queue_configuration.h"
24 #include "chrome/browser/policy/messaging_layer/storage/storage_module.h"
25 #include "chrome/browser/policy/messaging_layer/util/status.h"
26 #include "chrome/browser/policy/messaging_layer/util/status_macros.h"
27 #include "chrome/browser/policy/messaging_layer/util/statusor.h"
28 #include "chrome/browser/policy/messaging_layer/util/task_runner_context.h"
29 #include "chrome/browser/signin/identity_manager_factory.h"
30 #include "chrome/common/chrome_paths.h"
31 #include "components/enterprise/browser/controller/browser_dm_token_storage.h"
32 #include "components/policy/core/common/cloud/cloud_policy_client_registration_helper.h"
33 #include "components/policy/core/common/cloud/device_management_service.h"
34 #include "components/policy/proto/record.pb.h"
35 #include "components/signin/public/identity_manager/identity_manager.h"
36 #include "content/public/browser/browser_task_traits.h"
37 #include "content/public/browser/browser_thread.h"
38 #include "services/network/public/cpp/shared_url_loader_factory.h"
39 
40 #ifdef OS_CHROMEOS
41 #include "chrome/browser/chromeos/login/users/chrome_user_manager.h"
42 #include "chrome/browser/chromeos/policy/browser_policy_connector_chromeos.h"
43 #include "chrome/browser/chromeos/profiles/profile_helper.h"
44 #include "chrome/browser/chromeos/settings/device_settings_service.h"
45 #include "components/policy/proto/chrome_device_policy.pb.h"
46 #else
47 #include "chrome/browser/policy/chrome_browser_policy_connector.h"
48 #endif
49 
50 namespace reporting {
51 
52 namespace {
53 
54 // policy::CloudPolicyClient is needed by the UploadClient, but is built in two
55 // different ways for ChromeOS and non-ChromeOS browsers.
56 // NOT THREAD SAFE - these functions must be called on the main thread.
57 // TODO(chromium:1078512) Wrap CloudPolicyClient in a new object so that its
58 // methods, constructor, and destructor are accessed on the correct thread.
59 #ifdef OS_CHROMEOS
BuildCloudPolicyClient(base::OnceCallback<void (StatusOr<std::unique_ptr<policy::CloudPolicyClient>>)> build_cb)60 void BuildCloudPolicyClient(
61     base::OnceCallback<
62         void(StatusOr<std::unique_ptr<policy::CloudPolicyClient>>)> build_cb) {
63   auto* const browser_policy_connector =
64       g_browser_process->browser_policy_connector();
65   if (!browser_policy_connector) {
66     std::move(build_cb).Run(
67         Status(error::FAILED_PRECONDITION, "This is not a managed device."));
68     return;
69   }
70 
71   policy::DeviceManagementService* const device_management_service =
72       browser_policy_connector->device_management_service();
73   if (!device_management_service) {
74     std::move(build_cb).Run(
75         Status(error::FAILED_PRECONDITION, "This is not a managed device."));
76     return;
77   }
78 
79   if (!chromeos::DeviceSettingsService::IsInitialized()) {
80     chromeos::DeviceSettingsService::Initialize();
81   }
82   const enterprise_management::PolicyData* policy_data =
83       chromeos::DeviceSettingsService::Get()->policy_data();
84   if (!policy_data || !policy_data->has_request_token() ||
85       !policy_data->has_device_id()) {
86     std::move(build_cb).Run(
87         Status(error::UNAVAILABLE, "PolicyData is unavailable."));
88     return;
89   }
90 
91   scoped_refptr<network::SharedURLLoaderFactory>
92       signin_profile_url_loader_factory =
93           g_browser_process->system_network_context_manager()
94               ->GetSharedURLLoaderFactory();
95 
96   auto* user_manager_ptr = g_browser_process->platform_part()->user_manager();
97   auto* primary_user = user_manager_ptr->GetPrimaryUser();
98   auto dm_token_getter = chromeos::GetDeviceDMTokenForUserPolicyGetter(
99       primary_user->GetAccountId());
100 
101   auto client = std::make_unique<policy::CloudPolicyClient>(
102       device_management_service, signin_profile_url_loader_factory,
103       dm_token_getter);
104 
105   std::vector<std::string> affiliation_ids(
106       policy_data->user_affiliation_ids().begin(),
107       policy_data->user_affiliation_ids().end());
108   client->SetupRegistration(policy_data->request_token(),
109                             policy_data->device_id(), affiliation_ids);
110   if (!client->is_registered()) {
111     std::move(build_cb).Run(
112         Status(error::UNAVAILABLE, "Unable to start CloudPolicyClient."));
113     return;
114   }
115   std::move(build_cb).Run(std::move(client));
116 }
117 #else
118 void BuildCloudPolicyClient(
119     base::OnceCallback<
120         void(StatusOr<std::unique_ptr<policy::CloudPolicyClient>>)> build_cb) {
121   policy::DeviceManagementService* const device_management_service =
122       g_browser_process->browser_policy_connector()
123           ->device_management_service();
124 
125   scoped_refptr<network::SharedURLLoaderFactory>
126       signin_profile_url_loader_factory =
127           g_browser_process->system_network_context_manager()
128               ->GetSharedURLLoaderFactory();
129 
130   auto client = std::make_unique<policy::CloudPolicyClient>(
131       device_management_service, signin_profile_url_loader_factory,
132       policy::CloudPolicyClient::DeviceDMTokenCallback());
133 
134   policy::DMToken browser_dm_token =
135       policy::BrowserDMTokenStorage::Get()->RetrieveDMToken();
136   std::string client_id =
137       policy::BrowserDMTokenStorage::Get()->RetrieveClientId();
138 
139   client->SetupRegistration(browser_dm_token.value(), client_id,
140                             std::vector<std::string>());
141   std::move(build_cb).Run(std::move(client));
142 }
143 #endif
144 
145 const base::FilePath::CharType kReportingDirectory[] =
146     FILE_PATH_LITERAL("reporting");
147 
148 }  // namespace
149 
Uploader(UploadCallback upload_callback)150 ReportingClient::Uploader::Uploader(UploadCallback upload_callback)
151     : upload_callback_(std::move(upload_callback)),
152       encrypted_records_(std::make_unique<std::vector<EncryptedRecord>>()),
153       sequenced_task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})) {}
154 
155 ReportingClient::Uploader::~Uploader() = default;
156 
157 StatusOr<std::unique_ptr<ReportingClient::Uploader>>
Create(UploadCallback upload_callback)158 ReportingClient::Uploader::Create(UploadCallback upload_callback) {
159   auto uploader = base::WrapUnique(new Uploader(std::move(upload_callback)));
160   return uploader;
161 }
162 
ProcessRecord(EncryptedRecord data,base::OnceCallback<void (bool)> processed_cb)163 void ReportingClient::Uploader::ProcessRecord(
164     EncryptedRecord data,
165     base::OnceCallback<void(bool)> processed_cb) {
166   if (completed_) {
167     std::move(processed_cb).Run(false);
168     return;
169   }
170 
171   sequenced_task_runner_->PostTask(
172       FROM_HERE,
173       base::BindOnce(
174           [](std::vector<EncryptedRecord>* records, EncryptedRecord record,
175              base::OnceCallback<void(bool)> processed_cb) {
176             records->emplace_back(std::move(record));
177             std::move(processed_cb).Run(true);
178           },
179           base::Unretained(encrypted_records_.get()), std::move(data),
180           std::move(processed_cb)));
181 }
182 
ProcessGap(SequencingInformation start,uint64_t count,base::OnceCallback<void (bool)> processed_cb)183 void ReportingClient::Uploader::ProcessGap(
184     SequencingInformation start,
185     uint64_t count,
186     base::OnceCallback<void(bool)> processed_cb) {
187   if (completed_) {
188     std::move(processed_cb).Run(false);
189     return;
190   }
191 
192   sequenced_task_runner_->PostTask(
193       FROM_HERE,
194       base::BindOnce(
195           [](std::vector<EncryptedRecord>* records, SequencingInformation start,
196              uint64_t count, base::OnceCallback<void(bool)> processed_cb) {
197             EncryptedRecord record;
198             *record.mutable_sequencing_information() = std::move(start);
199             for (uint64_t i = 0; i < count; ++i) {
200               records->emplace_back(record);
201               record.mutable_sequencing_information()->set_sequencing_id(
202                   record.sequencing_information().sequencing_id() + 1);
203             }
204             std::move(processed_cb).Run(true);
205           },
206           base::Unretained(encrypted_records_.get()), std::move(start), count,
207           std::move(processed_cb)));
208 }
209 
Completed(Status final_status)210 void ReportingClient::Uploader::Completed(Status final_status) {
211   if (!final_status.ok()) {
212     // No work to do - something went wrong with storage and it no longer wants
213     // to upload the records. Let the records die with |this|.
214     return;
215   }
216 
217   if (completed_) {
218     // RunUpload has already been invoked. Return.
219     return;
220   }
221   completed_ = true;
222 
223   sequenced_task_runner_->PostTask(
224       FROM_HERE,
225       base::BindOnce(&Uploader::RunUpload, std::move(upload_callback_),
226                      std::move(encrypted_records_)));
227 }
228 
229 // static
RunUpload(ReportingClient::Uploader::UploadCallback upload_callback,std::unique_ptr<std::vector<EncryptedRecord>> encrypted_records)230 void ReportingClient::Uploader::RunUpload(
231     ReportingClient::Uploader::UploadCallback upload_callback,
232     std::unique_ptr<std::vector<EncryptedRecord>> encrypted_records) {
233   DCHECK(encrypted_records);
234   if (encrypted_records->empty()) {
235     return;
236   }
237 
238   Status upload_status =
239       std::move(upload_callback).Run(std::move(encrypted_records));
240   if (!upload_status.ok()) {
241     LOG(ERROR) << "Unable to upload records: " << upload_status;
242   }
243 }
244 
245 ReportingClient::Configuration::Configuration() = default;
~Configuration()246 ReportingClient::Configuration::~Configuration() {
247   if (cloud_policy_client) {
248     base::PostTask(
249         FROM_HERE, {content::BrowserThread::UI},
250         base::BindOnce(
251             [](std::unique_ptr<policy::CloudPolicyClient> cloud_policy_client) {
252               cloud_policy_client.reset();
253             },
254             std::move(cloud_policy_client)));
255   }
256 }
257 
InitializationStateTracker()258 ReportingClient::InitializationStateTracker::InitializationStateTracker()
259     : sequenced_task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})) {}
260 
261 ReportingClient::InitializationStateTracker::~InitializationStateTracker() =
262     default;
263 
264 // static
265 scoped_refptr<ReportingClient::InitializationStateTracker>
Create()266 ReportingClient::InitializationStateTracker::Create() {
267   return base::WrapRefCounted(
268       new ReportingClient::InitializationStateTracker());
269 }
270 
GetInitState(GetInitStateCallback get_init_state_cb)271 void ReportingClient::InitializationStateTracker::GetInitState(
272     GetInitStateCallback get_init_state_cb) {
273   sequenced_task_runner_->PostTask(
274       FROM_HERE,
275       base::BindOnce(
276           &ReportingClient::InitializationStateTracker::OnIsInitializedRequest,
277           this, std::move(get_init_state_cb)));
278 }
279 
RequestLeaderPromotion(LeaderPromotionRequestCallback promo_request_cb)280 void ReportingClient::InitializationStateTracker::RequestLeaderPromotion(
281     LeaderPromotionRequestCallback promo_request_cb) {
282   sequenced_task_runner_->PostTask(
283       FROM_HERE, base::BindOnce(&ReportingClient::InitializationStateTracker::
284                                     OnLeaderPromotionRequest,
285                                 this, std::move(promo_request_cb)));
286 }
287 
OnIsInitializedRequest(GetInitStateCallback get_init_state_cb)288 void ReportingClient::InitializationStateTracker::OnIsInitializedRequest(
289     GetInitStateCallback get_init_state_cb) {
290   base::ThreadPool::PostTask(
291       FROM_HERE,
292       base::BindOnce(
293           [](GetInitStateCallback get_init_state_cb, bool is_initialized) {
294             std::move(get_init_state_cb).Run(is_initialized);
295           },
296           std::move(get_init_state_cb), is_initialized_));
297 }
298 
OnLeaderPromotionRequest(LeaderPromotionRequestCallback promo_request_cb)299 void ReportingClient::InitializationStateTracker::OnLeaderPromotionRequest(
300     LeaderPromotionRequestCallback promo_request_cb) {
301   StatusOr<ReleaseLeaderCallback> result;
302   if (is_initialized_) {
303     result = Status(error::FAILED_PRECONDITION,
304                     "ReportClient is already configured");
305   } else if (has_promoted_initializing_context_) {
306     result = Status(error::RESOURCE_EXHAUSTED,
307                     "ReportClient already has a lead initializing context.");
308   } else {
309     result = base::BindOnce(
310         &ReportingClient::InitializationStateTracker::ReleaseLeader, this);
311   }
312 
313   base::ThreadPool::PostTask(
314       FROM_HERE, base::BindOnce(
315                      [](LeaderPromotionRequestCallback promo_request_cb,
316                         StatusOr<ReleaseLeaderCallback> result) {
317                        std::move(promo_request_cb).Run(std::move(result));
318                      },
319                      std::move(promo_request_cb), std::move(result)));
320 }
321 
ReleaseLeader(bool initialization_successful)322 void ReportingClient::InitializationStateTracker::ReleaseLeader(
323     bool initialization_successful) {
324   sequenced_task_runner_->PostTask(
325       FROM_HERE,
326       base::BindOnce(
327           &ReportingClient::InitializationStateTracker::OnLeaderRelease, this,
328           initialization_successful));
329 }
330 
OnLeaderRelease(bool initialization_successful)331 void ReportingClient::InitializationStateTracker::OnLeaderRelease(
332     bool initialization_successful) {
333   if (initialization_successful) {
334     is_initialized_ = true;
335   }
336   has_promoted_initializing_context_ = false;
337 }
338 
CreateReportQueueRequest(std::unique_ptr<ReportQueueConfiguration> config,CreateReportQueueCallback create_cb)339 ReportingClient::CreateReportQueueRequest::CreateReportQueueRequest(
340     std::unique_ptr<ReportQueueConfiguration> config,
341     CreateReportQueueCallback create_cb)
342     : config_(std::move(config)), create_cb_(std::move(create_cb)) {}
343 
344 ReportingClient::CreateReportQueueRequest::~CreateReportQueueRequest() =
345     default;
346 
CreateReportQueueRequest(ReportingClient::CreateReportQueueRequest && other)347 ReportingClient::CreateReportQueueRequest::CreateReportQueueRequest(
348     ReportingClient::CreateReportQueueRequest&& other)
349     : config_(other.config()), create_cb_(other.create_cb()) {}
350 
351 std::unique_ptr<ReportQueueConfiguration>
config()352 ReportingClient::CreateReportQueueRequest::config() {
353   return std::move(config_);
354 }
355 
356 ReportingClient::CreateReportQueueCallback
create_cb()357 ReportingClient::CreateReportQueueRequest::create_cb() {
358   return std::move(create_cb_);
359 }
360 
InitializingContext(BuildCloudPolicyClientCallback build_client_cb,Storage::StartUploadCb start_upload_cb,UpdateConfigurationCallback update_config_cb,InitCompleteCallback init_complete_cb,scoped_refptr<ReportingClient::InitializationStateTracker> init_state_tracker,scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner)361 ReportingClient::InitializingContext::InitializingContext(
362     BuildCloudPolicyClientCallback build_client_cb,
363     Storage::StartUploadCb start_upload_cb,
364     UpdateConfigurationCallback update_config_cb,
365     InitCompleteCallback init_complete_cb,
366     scoped_refptr<ReportingClient::InitializationStateTracker>
367         init_state_tracker,
368     scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner)
369     : TaskRunnerContext<Status>(std::move(init_complete_cb),
370                                 sequenced_task_runner),
371       build_client_cb_(std::move(build_client_cb)),
372       start_upload_cb_(std::move(start_upload_cb)),
373       update_config_cb_(std::move(update_config_cb)),
374       init_state_tracker_(init_state_tracker),
375       client_config_(std::make_unique<Configuration>()) {}
376 
377 ReportingClient::InitializingContext::~InitializingContext() = default;
378 
OnStart()379 void ReportingClient::InitializingContext::OnStart() {
380   init_state_tracker_->RequestLeaderPromotion(base::BindOnce(
381       &ReportingClient::InitializingContext::OnLeaderPromotionResult,
382       base::Unretained(this)));
383 }
384 
OnLeaderPromotionResult(StatusOr<ReportingClient::InitializationStateTracker::ReleaseLeaderCallback> promo_result)385 void ReportingClient::InitializingContext::OnLeaderPromotionResult(
386     StatusOr<ReportingClient::InitializationStateTracker::ReleaseLeaderCallback>
387         promo_result) {
388   if (promo_result.status().error_code() == error::FAILED_PRECONDITION) {
389     // Between building this InitializingContext and attempting to promote to
390     // leader, the ReportingClient was configured. Ok response.
391     Complete(Status::StatusOK());
392     return;
393   }
394 
395   if (!promo_result.ok()) {
396     Complete(promo_result.status());
397     return;
398   }
399 
400   release_leader_cb_ = std::move(promo_result.ValueOrDie());
401   Schedule(&ReportingClient::InitializingContext::ConfigureCloudPolicyClient,
402            base::Unretained(this));
403 }
404 
ConfigureCloudPolicyClient()405 void ReportingClient::InitializingContext::ConfigureCloudPolicyClient() {
406   // CloudPolicyClient requires posting to the main UI thread.
407   base::PostTask(
408       FROM_HERE, {content::BrowserThread::UI},
409       base::BindOnce(
410           [](BuildCloudPolicyClientCallback build_client_cb,
411              base::OnceCallback<void(
412                  StatusOr<std::unique_ptr<policy::CloudPolicyClient>>)>
413                  on_client_configured) {
414             std::move(build_client_cb).Run(std::move(on_client_configured));
415           },
416           std::move(build_client_cb_),
417           base::BindOnce(&ReportingClient::InitializingContext::
418                              OnCloudPolicyClientConfigured,
419                          base::Unretained(this))));
420 }
421 
OnCloudPolicyClientConfigured(StatusOr<std::unique_ptr<policy::CloudPolicyClient>> client_result)422 void ReportingClient::InitializingContext::OnCloudPolicyClientConfigured(
423     StatusOr<std::unique_ptr<policy::CloudPolicyClient>> client_result) {
424   if (!client_result.ok()) {
425     Complete(Status(error::FAILED_PRECONDITION,
426                     base::StrCat({"Unable to build CloudPolicyClient: ",
427                                   client_result.status().message()})));
428     return;
429   }
430   client_config_->cloud_policy_client = std::move(client_result.ValueOrDie());
431   Schedule(&ReportingClient::InitializingContext::ConfigureStorageModule,
432            base::Unretained(this));
433 }
434 
ConfigureStorageModule()435 void ReportingClient::InitializingContext::ConfigureStorageModule() {
436   base::FilePath user_data_dir;
437   if (!base::PathService::Get(chrome::DIR_USER_DATA, &user_data_dir)) {
438     Complete(
439         Status(error::FAILED_PRECONDITION, "Could not retrieve base path"));
440     return;
441   }
442 
443   base::FilePath reporting_path = user_data_dir.Append(kReportingDirectory);
444   StorageModule::Create(
445       Storage::Options().set_directory(reporting_path),
446       std::move(start_upload_cb_), base::MakeRefCounted<EncryptionModule>(),
447       base::BindOnce(
448           &ReportingClient::InitializingContext::OnStorageModuleConfigured,
449           base::Unretained(this)));
450 }
451 
OnStorageModuleConfigured(StatusOr<scoped_refptr<StorageModule>> storage_result)452 void ReportingClient::InitializingContext::OnStorageModuleConfigured(
453     StatusOr<scoped_refptr<StorageModule>> storage_result) {
454   if (!storage_result.ok()) {
455     Complete(Status(error::FAILED_PRECONDITION,
456                     base::StrCat({"Unable to build StorageModule: ",
457                                   storage_result.status().message()})));
458     return;
459   }
460 
461   client_config_->storage = storage_result.ValueOrDie();
462   Schedule(
463       base::BindOnce(&ReportingClient::InitializingContext::CreateUploadClient,
464                      base::Unretained(this)));
465 }
466 
CreateUploadClient()467 void ReportingClient::InitializingContext::CreateUploadClient() {
468   ReportingClient* const instance = GetInstance();
469   DCHECK(!instance->upload_client_);
470   UploadClient::Create(
471       std::move(client_config_->cloud_policy_client),
472       base::BindRepeating(&StorageModule::ReportSuccess,
473                           client_config_->storage),
474       base::BindOnce(&InitializingContext::OnUploadClientCreated,
475                      base::Unretained(this)));
476 }
477 
OnUploadClientCreated(StatusOr<std::unique_ptr<UploadClient>> upload_client_result)478 void ReportingClient::InitializingContext::OnUploadClientCreated(
479     StatusOr<std::unique_ptr<UploadClient>> upload_client_result) {
480   if (!upload_client_result.ok()) {
481     Complete(Status(error::FAILED_PRECONDITION,
482                     base::StrCat({"Unable to create UploadClient: ",
483                                   upload_client_result.status().message()})));
484     return;
485   }
486   Schedule(&ReportingClient::InitializingContext::UpdateConfiguration,
487            base::Unretained(this),
488            std::move(upload_client_result.ValueOrDie()));
489 }
490 
UpdateConfiguration(std::unique_ptr<UploadClient> upload_client)491 void ReportingClient::InitializingContext::UpdateConfiguration(
492     std::unique_ptr<UploadClient> upload_client) {
493   ReportingClient* const instance = GetInstance();
494   DCHECK(!instance->upload_client_);
495   instance->upload_client_ = std::move(upload_client);
496 
497   std::move(update_config_cb_)
498       .Run(std::move(client_config_),
499            base::BindOnce(&ReportingClient::InitializingContext::Complete,
500                           base::Unretained(this)));
501 }
502 
Complete(Status status)503 void ReportingClient::InitializingContext::Complete(Status status) {
504   std::move(release_leader_cb_).Run(/*initialization_successful=*/status.ok());
505   Schedule(&ReportingClient::InitializingContext::Response,
506            base::Unretained(this), status);
507 }
508 
ReportingClient()509 ReportingClient::ReportingClient()
510     : create_request_queue_(SharedQueue<CreateReportQueueRequest>::Create()),
511       init_state_tracker_(
512           ReportingClient::InitializationStateTracker::Create()),
513       build_cloud_policy_client_cb_(base::BindOnce(&BuildCloudPolicyClient)) {}
514 
515 ReportingClient::~ReportingClient() = default;
516 
GetInstance()517 ReportingClient* ReportingClient::GetInstance() {
518   return base::Singleton<ReportingClient>::get();
519 }
520 
CreateReportQueue(std::unique_ptr<ReportQueueConfiguration> config,CreateReportQueueCallback create_cb)521 void ReportingClient::CreateReportQueue(
522     std::unique_ptr<ReportQueueConfiguration> config,
523     CreateReportQueueCallback create_cb) {
524   auto* instance = GetInstance();
525   instance->create_request_queue_->Push(
526       CreateReportQueueRequest(std::move(config), std::move(create_cb)),
527       base::BindOnce(&ReportingClient::OnPushComplete,
528                      base::Unretained(instance)));
529 }
530 
OnPushComplete()531 void ReportingClient::OnPushComplete() {
532   init_state_tracker_->GetInitState(
533       base::BindOnce(&ReportingClient::OnInitState, base::Unretained(this)));
534 }
535 
OnInitState(bool reporting_client_configured)536 void ReportingClient::OnInitState(bool reporting_client_configured) {
537   if (!reporting_client_configured) {
538     // Schedule an InitializingContext to take care of initialization.
539     Start<ReportingClient::InitializingContext>(
540         std::move(build_cloud_policy_client_cb_),
541         base::BindRepeating(&ReportingClient::BuildUploader),
542         base::BindOnce(&ReportingClient::OnConfigResult,
543                        base::Unretained(this)),
544         base::BindOnce(&ReportingClient::OnInitializationComplete,
545                        base::Unretained(this)),
546         init_state_tracker_, base::ThreadPool::CreateSequencedTaskRunner({}));
547     return;
548   }
549 
550   // Client was configured, build the queue!
551   create_request_queue_->Pop(base::BindOnce(&ReportingClient::BuildRequestQueue,
552                                             base::Unretained(this)));
553 }
554 
OnConfigResult(std::unique_ptr<ReportingClient::Configuration> config,base::OnceCallback<void (Status)> continue_init_cb)555 void ReportingClient::OnConfigResult(
556     std::unique_ptr<ReportingClient::Configuration> config,
557     base::OnceCallback<void(Status)> continue_init_cb) {
558   config_ = std::move(config);
559   std::move(continue_init_cb).Run(Status::StatusOK());
560 }
561 
OnInitializationComplete(Status init_status)562 void ReportingClient::OnInitializationComplete(Status init_status) {
563   if (init_status.error_code() == error::RESOURCE_EXHAUSTED) {
564     // This happens when a new request comes in while the ReportingClient is
565     // undergoing initialization. The leader will either clear or build the
566     // queue when it completes.
567     return;
568   }
569 
570   // Configuration failed. Clear out all the requests that came in while we were
571   // attempting to configure.
572   if (!init_status.ok()) {
573     create_request_queue_->Swap(
574         base::queue<CreateReportQueueRequest>(),
575         base::BindOnce(&ReportingClient::ClearRequestQueue,
576                        base::Unretained(this)));
577     return;
578   }
579   create_request_queue_->Pop(base::BindOnce(&ReportingClient::BuildRequestQueue,
580                                             base::Unretained(this)));
581 }
582 
ClearRequestQueue(base::queue<CreateReportQueueRequest> failed_requests)583 void ReportingClient::ClearRequestQueue(
584     base::queue<CreateReportQueueRequest> failed_requests) {
585   while (!failed_requests.empty()) {
586     // Post to general thread.
587     base::ThreadPool::PostTask(
588         FROM_HERE, base::BindOnce(
589                        [](CreateReportQueueRequest queue_request) {
590                          std::move(queue_request.create_cb())
591                              .Run(Status(error::UNAVAILABLE,
592                                          "Unable to build a ReportQueue"));
593                        },
594                        std::move(failed_requests.front())));
595     failed_requests.pop();
596   }
597 }
598 
BuildRequestQueue(StatusOr<CreateReportQueueRequest> pop_result)599 void ReportingClient::BuildRequestQueue(
600     StatusOr<CreateReportQueueRequest> pop_result) {
601   // Queue is clear - nothing more to do.
602   if (!pop_result.ok()) {
603     return;
604   }
605 
606   // We don't want to block either the ReportingClient sequenced_task_runner_ or
607   // the create_request_queue_.sequenced_task_runner_, so we post the task to a
608   // general thread.
609   base::ThreadPool::PostTask(
610       FROM_HERE, base::BindOnce(
611                      [](scoped_refptr<StorageModule> storage_module,
612                         CreateReportQueueRequest report_queue_request) {
613                        std::move(report_queue_request.create_cb())
614                            .Run(ReportQueue::Create(
615                                report_queue_request.config(), storage_module));
616                      },
617                      config_->storage, std::move(pop_result.ValueOrDie())));
618 
619   // Build the next item asynchronously
620   create_request_queue_->Pop(base::BindOnce(&ReportingClient::BuildRequestQueue,
621                                             base::Unretained(this)));
622 }
623 
624 // static
625 StatusOr<std::unique_ptr<Storage::UploaderInterface>>
BuildUploader(Priority priority)626 ReportingClient::BuildUploader(Priority priority) {
627   ReportingClient* const instance = GetInstance();
628   DCHECK(instance->upload_client_);
629   return Uploader::Create(
630       base::BindOnce(&UploadClient::EnqueueUpload,
631                      base::Unretained(instance->upload_client_.get())));
632 }
633 
TestEnvironment(std::unique_ptr<policy::CloudPolicyClient> client)634 ReportingClient::TestEnvironment::TestEnvironment(
635     std::unique_ptr<policy::CloudPolicyClient> client)
636     : saved_build_cloud_policy_client_cb_(std::move(
637           ReportingClient::GetInstance()->build_cloud_policy_client_cb_)) {
638   ReportingClient::GetInstance()
639       ->build_cloud_policy_client_cb_ = base::BindOnce(
640       [](std::unique_ptr<policy::CloudPolicyClient> client,
641          base::OnceCallback<void(
642              StatusOr<std::unique_ptr<policy::CloudPolicyClient>>)> build_cb) {
643         std::move(build_cb).Run(std::move(client));
644       },
645       std::move(client));
646 }
647 
~TestEnvironment()648 ReportingClient::TestEnvironment::~TestEnvironment() {
649   ReportingClient::GetInstance()->build_cloud_policy_client_cb_ =
650       std::move(saved_build_cloud_policy_client_cb_);
651   base::Singleton<ReportingClient>::OnExit(nullptr);
652 }
653 
654 }  // namespace reporting
655