1# -*- coding: utf-8 -*- 2# Copyright 2013 Google Inc. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""This module provides the notification command to gsutil.""" 16 17from __future__ import absolute_import 18from __future__ import print_function 19from __future__ import division 20from __future__ import unicode_literals 21 22import getopt 23import re 24import time 25import uuid 26 27from datetime import datetime 28from gslib import metrics 29from gslib.cloud_api import AccessDeniedException 30from gslib.cloud_api import BadRequestException 31from gslib.cloud_api import NotFoundException 32from gslib.cloud_api import PublishPermissionDeniedException 33from gslib.command import Command 34from gslib.command import NO_MAX 35from gslib.command_argument import CommandArgument 36from gslib.cs_api_map import ApiSelector 37from gslib.exception import CommandException 38from gslib.help_provider import CreateHelpText 39from gslib.project_id import PopulateProjectId 40from gslib.pubsub_api import PubsubApi 41from gslib.storage_url import StorageUrlFromString 42from gslib.third_party.pubsub_apitools.pubsub_v1_messages import Binding 43from gslib.utils import copy_helper 44 45# Cloud Pub/Sub commands 46 47_LIST_SYNOPSIS = """ 48 gsutil notification list gs://<bucket_name>... 49""" 50 51_DELETE_SYNOPSIS = """ 52 gsutil notification delete (<notificationConfigName>|gs://<bucket_name>)... 53""" 54 55_CREATE_SYNOPSIS = """ 56 gsutil notification create -f (json|none) [-p <prefix>] [-t <topic>] \\ 57 [-m <key>:<value>]... [-e <eventType>]... gs://<bucket_name> 58""" 59 60# Object Change Notification commands 61 62_WATCHBUCKET_SYNOPSIS = """ 63 gsutil notification watchbucket [-i <id>] [-t <token>] <app_url> gs://<bucket_name> 64""" 65 66_STOPCHANNEL_SYNOPSIS = """ 67 gsutil notification stopchannel <channel_id> <resource_id> 68""" 69 70_SYNOPSIS = ( 71 _CREATE_SYNOPSIS + 72 _DELETE_SYNOPSIS.lstrip('\n') + 73 _LIST_SYNOPSIS.lstrip('\n') + 74 _WATCHBUCKET_SYNOPSIS + 75 _STOPCHANNEL_SYNOPSIS.lstrip('\n') + '\n') # yapf: disable 76 77_LIST_DESCRIPTION = """ 78<B>LIST</B> 79 The list sub-command provides a list of notification configs belonging to a 80 given bucket. The listed name of each notification config can be used with 81 the delete sub-command to delete that specific notification config. 82 83 For listing Object Change Notifications instead of Cloud Pub/Sub notification 84 subscription configs, add a -o flag. 85 86<B>LIST EXAMPLES</B> 87 Fetch the list of notification configs for the bucket example-bucket: 88 89 gsutil notification list gs://example-bucket 90 91 The same as above, but for Object Change Notifications instead of Cloud 92 Pub/Sub notification subscription configs: 93 94 gsutil notification list -o gs://example-bucket 95 96 Fetch the notification configs in all buckets matching a wildcard: 97 98 gsutil notification list gs://example-* 99 100 Fetch all of the notification configs for buckets in the default project: 101 102 gsutil notification list gs://* 103""" 104 105_DELETE_DESCRIPTION = """ 106<B>DELETE</B> 107 The delete sub-command deletes notification configs from a bucket. If a 108 notification config name is passed as a parameter, that notification config 109 alone is deleted. If a bucket name is passed, all notification configs 110 associated with that bucket are deleted. 111 112 Cloud Pub/Sub topics associated with this notification config are not 113 deleted by this command. Those must be deleted separately, for example with 114 the gcloud command `gcloud beta pubsub topics delete`. 115 116 Object Change Notification subscriptions cannot be deleted with this command. 117 For that, see the command `gsutil notification stopchannel`. 118 119<B>DELETE EXAMPLES</B> 120 Delete a single notification config (with ID 3) in the bucket example-bucket: 121 122 gsutil notification delete projects/_/buckets/example-bucket/notificationConfigs/3 123 124 Delete all notification configs in the bucket example-bucket: 125 126 gsutil notification delete gs://example-bucket 127""" 128 129_CREATE_DESCRIPTION = """ 130<B>CREATE</B> 131 The create sub-command creates a notification config on a bucket, establishing 132 a flow of event notifications from Cloud Storage to a Cloud Pub/Sub topic. As 133 part of creating this flow, the create command also verifies that the 134 destination Cloud Pub/Sub topic exists, creating it if necessary, and verifies 135 that the Cloud Storage bucket has permission to publish events to that topic, 136 granting the permission if necessary. 137 138 If a destination Cloud Pub/Sub topic is not specified with the -t flag, Cloud 139 Storage chooses a topic name in the default project whose ID is the same as 140 the bucket name. For example, if the default project ID specified is 141 'default-project' and the bucket being configured is gs://example-bucket, the 142 create command uses the Cloud Pub/Sub topic 143 "projects/default-project/topics/example-bucket". 144 145 In order to enable notifications, a `special Cloud Storage service account 146 <https://cloud.google.com/storage/docs/projects#service-accounts>`_ unique to 147 each project must have the IAM permission "pubsub.topics.publish". This 148 command checks to see if the destination Cloud Pub/Sub topic grants the 149 service account this permission. If not, the create command attempts to 150 grant it. 151 152 You can create multiple notification configurations for a bucket, but their 153 triggers cannot overlap such that a single event could send multiple 154 notifications. Attempting to create a notification configuration that 155 overlaps with an existing notification configuration results in an error. 156 157<B>CREATE EXAMPLES</B> 158 Begin sending notifications of all changes to the bucket example-bucket 159 to the Cloud Pub/Sub topic projects/default-project/topics/example-bucket: 160 161 gsutil notification create -f json gs://example-bucket 162 163 The same as above, but specifies the destination topic ID 'files-to-process' 164 in the default project: 165 166 gsutil notification create -f json \\ 167 -t files-to-process gs://example-bucket 168 169 The same as above, but specifies a Cloud Pub/Sub topic belonging to the 170 specific cloud project 'example-project': 171 172 gsutil notification create -f json \\ 173 -t projects/example-project/topics/files-to-process gs://example-bucket 174 175 Create a notification config that only sends an event when a new object 176 has been created: 177 178 gsutil notification create -f json -e OBJECT_FINALIZE gs://example-bucket 179 180 Create a topic and notification config that only sends an event when 181 an object beginning with "photos/" is affected: 182 183 gsutil notification create -p photos/ gs://example-bucket 184 185 List all of the notificationConfigs in bucket example-bucket: 186 187 gsutil notification list gs://example-bucket 188 189 Delete all notitificationConfigs for bucket example-bucket: 190 191 gsutil notification delete gs://example-bucket 192 193 Delete one specific notificationConfig for bucket example-bucket: 194 195 gsutil notification delete \\ 196 projects/_/buckets/example-bucket/notificationConfigs/1 197 198<B>OPTIONS</B> 199 The create sub-command has the following options 200 201 -e Specify an event type filter for this notification config. Cloud 202 Storage only sends notifications of this type. You may specify this 203 parameter multiple times to allow multiple event types. If not 204 specified, Cloud Storage sends notifications for all event types. 205 The valid types are: 206 207 OBJECT_FINALIZE - An object has been created. 208 OBJECT_METADATA_UPDATE - The metadata of an object has changed. 209 OBJECT_DELETE - An object has been permanently deleted. 210 OBJECT_ARCHIVE - A live version of an object has become a 211 noncurrent version. 212 213 -f Specifies the payload format of notification messages. Must be 214 either "json" for a payload matches the object metadata for the 215 JSON API, or "none" to specify no payload at all. In either case, 216 notification details are available in the message attributes. 217 218 -m Specifies a key:value attribute that is appended to the set 219 of attributes sent to Cloud Pub/Sub for all events associated with 220 this notification config. You may specify this parameter multiple 221 times to set multiple attributes. 222 223 -p Specifies a prefix path filter for this notification config. Cloud 224 Storage only sends notifications for objects in this bucket whose 225 names begin with the specified prefix. 226 227 -s Skips creation and permission assignment of the Cloud Pub/Sub topic. 228 This is useful if the caller does not have permission to access 229 the topic in question, or if the topic already exists and has the 230 appropriate publish permission assigned. 231 232 -t The Cloud Pub/Sub topic to which notifications should be sent. If 233 not specified, this command chooses a topic whose project is your 234 default project and whose ID is the same as the Cloud Storage bucket 235 name. 236 237<B>NEXT STEPS</B> 238 Once the create command has succeeded, Cloud Storage publishes a message to 239 the specified Cloud Pub/Sub topic when eligible changes occur. In order to 240 receive these messages, you must create a Pub/Sub subscription for your 241 Pub/Sub topic. To learn more about creating Pub/Sub subscriptions, see `the 242 Pub/Sub Subscriber Overview <https://cloud.google.com/pubsub/docs/subscriber>`_. 243 244 You can create a simple Pub/Sub subscription using the ``gcloud`` command-line 245 tool. For example, to create a new subscription on the topic "myNewTopic" and 246 attempt to pull messages from it, you could run: 247 248 gcloud beta pubsub subscriptions create --topic myNewTopic testSubscription 249 gcloud beta pubsub subscriptions pull --auto-ack testSubscription 250""" 251 252_WATCHBUCKET_DESCRIPTION = """ 253<B>WATCHBUCKET</B> 254 The watchbucket sub-command can be used to watch a bucket for object changes. 255 A service account must be used when running this command. 256 257 The app_url parameter must be an HTTPS URL to an application that will be 258 notified of changes to any object in the bucket. The URL endpoint must be 259 a verified domain on your project. See `Notification Authorization 260 <https://cloud.google.com/storage/docs/object-change-notification#_Authorization>`_ 261 for details. 262 263 The optional id parameter can be used to assign a unique identifier to the 264 created notification channel. If not provided, a random UUID string is 265 generated. 266 267 The optional token parameter can be used to validate notifications events. 268 To do this, set this custom token and store it to later verify that 269 notification events contain the client token you expect. 270 271<B>WATCHBUCKET EXAMPLES</B> 272 Watch the bucket example-bucket for changes and send notifications to an 273 application server running at example.com: 274 275 gsutil notification watchbucket https://example.com/notify \\ 276 gs://example-bucket 277 278 Assign identifier my-channel-id to the created notification channel: 279 280 gsutil notification watchbucket -i my-channel-id \\ 281 https://example.com/notify gs://example-bucket 282 283 Set a custom client token that is included with each notification event: 284 285 gsutil notification watchbucket -t my-client-token \\ 286 https://example.com/notify gs://example-bucket 287""" 288 289_STOPCHANNEL_DESCRIPTION = """ 290<B>STOPCHANNEL</B> 291 The stopchannel sub-command can be used to stop sending change events to a 292 notification channel. 293 294 The channel_id and resource_id parameters should match the values from the 295 response of a bucket watch request. 296 297<B>STOPCHANNEL EXAMPLES</B> 298 Stop the notification event channel with channel identifier channel1 and 299 resource identifier SoGqan08XDIFWr1Fv_nGpRJBHh8: 300 301 gsutil notification stopchannel channel1 SoGqan08XDIFWr1Fv_nGpRJBHh8 302""" 303 304_DESCRIPTION = """ 305 You can use the ``notification`` command to configure 306 `Pub/Sub notifications for Cloud Storage 307 <https://cloud.google.com/storage/docs/pubsub-notifications>`_ 308 and `Object change notification 309 <https://cloud.google.com/storage/docs/object-change-notification>`_ channels. 310 311<B>CLOUD PUB/SUB</B> 312 The "create", "list", and "delete" sub-commands deal with configuring Cloud 313 Storage integration with Google Cloud Pub/Sub. 314""" + _CREATE_DESCRIPTION + _LIST_DESCRIPTION + _DELETE_DESCRIPTION + """ 315<B>OBJECT CHANGE NOTIFICATIONS</B> 316 Object change notification is a separate, older feature within Cloud Storage 317 for generating notifications. This feature sends HTTPS messages to a client 318 application that you've set up separately. This feature is generally not 319 recommended, because Pub/Sub notifications are cheaper, easier to use, and 320 more flexible. For more information, see 321 `Object change notification 322 <https://cloud.google.com/storage/docs/object-change-notification>`_. 323 324 The "watchbucket" and "stopchannel" sub-commands enable and disable Object 325 change notifications. 326""" + _WATCHBUCKET_DESCRIPTION + _STOPCHANNEL_DESCRIPTION + """ 327<B>NOTIFICATIONS AND PARALLEL COMPOSITE UPLOADS</B> 328 gsutil supports `parallel composite uploads 329 <https://cloud.google.com/storage/docs/uploads-downloads#parallel-composite-uploads>`_. 330 If enabled, an upload can result in multiple temporary component objects 331 being uploaded before the actual intended object is created. Any subscriber 332 to notifications for this bucket then sees a notification for each of these 333 components being created and deleted. If this is a concern for you, note 334 that parallel composite uploads can be disabled by setting 335 "parallel_composite_upload_threshold = 0" in your .boto config file. 336 Alternately, your subscriber code can filter out gsutil's parallel 337 composite uploads by ignoring any notification about objects whose names 338 contain (but do not start with) the following string: 339 "{composite_namespace}". 340 341""".format(composite_namespace=copy_helper.PARALLEL_UPLOAD_TEMP_NAMESPACE) 342 343NOTIFICATION_AUTHORIZATION_FAILED_MESSAGE = """ 344Watch bucket attempt failed: 345 {watch_error} 346 347You attempted to watch a bucket with an application URL of: 348 349 {watch_url} 350 351which is not authorized for your project. Please ensure that you are using 352Service Account authentication and that the Service Account's project is 353authorized for the application URL. Notification endpoint URLs must also be 354whitelisted in your Cloud Console project. To do that, the domain must also be 355verified using Google Webmaster Tools. For instructions, please see 356`Notification Authorization 357<https://cloud.google.com/storage/docs/object-change-notification#_Authorization>`_. 358""" 359 360_DETAILED_HELP_TEXT = CreateHelpText(_SYNOPSIS, _DESCRIPTION) 361 362# yapf: disable 363_create_help_text = ( 364 CreateHelpText(_CREATE_SYNOPSIS, _CREATE_DESCRIPTION)) 365_list_help_text = ( 366 CreateHelpText(_LIST_SYNOPSIS, _LIST_DESCRIPTION)) 367_delete_help_text = ( 368 CreateHelpText(_DELETE_SYNOPSIS, _DELETE_DESCRIPTION)) 369_watchbucket_help_text = ( 370 CreateHelpText(_WATCHBUCKET_SYNOPSIS, _WATCHBUCKET_DESCRIPTION)) 371_stopchannel_help_text = ( 372 CreateHelpText(_STOPCHANNEL_SYNOPSIS, _STOPCHANNEL_DESCRIPTION)) 373# yapf: enable 374 375PAYLOAD_FORMAT_MAP = { 376 'none': 'NONE', 377 'json': 'JSON_API_V1', 378} 379 380 381class NotificationCommand(Command): 382 """Implementation of gsutil notification command.""" 383 384 # Notification names might look like one of these: 385 # canonical form: projects/_/buckets/bucket/notificationConfigs/3 386 # JSON API form: b/bucket/notificationConfigs/5 387 # Either of the above might start with a / if a user is copying & pasting. 388 def _GetNotificationPathRegex(self): 389 if not NotificationCommand._notification_path_regex: 390 NotificationCommand._notification_path_regex = re.compile( 391 ('/?(projects/[^/]+/)?b(uckets)?/(?P<bucket>[^/]+)/' 392 'notificationConfigs/(?P<notification>[0-9]+)')) 393 return NotificationCommand._notification_path_regex 394 395 _notification_path_regex = None 396 397 # Command specification. See base class for documentation. 398 command_spec = Command.CreateCommandSpec( 399 'notification', 400 command_name_aliases=[ 401 'notify', 402 'notifyconfig', 403 'notifications', 404 'notif', 405 ], 406 usage_synopsis=_SYNOPSIS, 407 min_args=2, 408 max_args=NO_MAX, 409 supported_sub_args='i:t:m:t:of:e:p:s', 410 file_url_ok=False, 411 provider_url_ok=False, 412 urls_start_arg=1, 413 gs_api_support=[ApiSelector.JSON], 414 gs_default_api=ApiSelector.JSON, 415 argparse_arguments={ 416 'watchbucket': [ 417 CommandArgument.MakeFreeTextArgument(), 418 CommandArgument.MakeZeroOrMoreCloudBucketURLsArgument(), 419 ], 420 'stopchannel': [], 421 'list': [CommandArgument.MakeZeroOrMoreCloudBucketURLsArgument(),], 422 'delete': [ 423 # Takes a list of one of the following: 424 # notification: projects/_/buckets/bla/notificationConfigs/5, 425 # bucket: gs://foobar 426 CommandArgument.MakeZeroOrMoreCloudURLsArgument(), 427 ], 428 'create': [ 429 CommandArgument.MakeFreeTextArgument(), # Cloud Pub/Sub topic 430 CommandArgument.MakeNCloudBucketURLsArgument(1), 431 ] 432 }, 433 ) 434 # Help specification. See help_provider.py for documentation. 435 help_spec = Command.HelpSpec( 436 help_name='notification', 437 help_name_aliases=[ 438 'watchbucket', 439 'stopchannel', 440 'notifyconfig', 441 ], 442 help_type='command_help', 443 help_one_line_summary='Configure object change notification', 444 help_text=_DETAILED_HELP_TEXT, 445 subcommand_help_text={ 446 'create': _create_help_text, 447 'list': _list_help_text, 448 'delete': _delete_help_text, 449 'watchbucket': _watchbucket_help_text, 450 'stopchannel': _stopchannel_help_text, 451 }, 452 ) 453 454 def _WatchBucket(self): 455 """Creates a watch on a bucket given in self.args.""" 456 self.CheckArguments() 457 identifier = None 458 client_token = None 459 if self.sub_opts: 460 for o, a in self.sub_opts: 461 if o == '-i': 462 identifier = a 463 if o == '-t': 464 client_token = a 465 466 identifier = identifier or str(uuid.uuid4()) 467 watch_url = self.args[0] 468 bucket_arg = self.args[-1] 469 470 if not watch_url.lower().startswith('https://'): 471 raise CommandException('The application URL must be an https:// URL.') 472 473 bucket_url = StorageUrlFromString(bucket_arg) 474 if not (bucket_url.IsBucket() and bucket_url.scheme == 'gs'): 475 raise CommandException( 476 'The %s command can only be used with gs:// bucket URLs.' % 477 self.command_name) 478 if not bucket_url.IsBucket(): 479 raise CommandException('URL must name a bucket for the %s command.' % 480 self.command_name) 481 482 self.logger.info('Watching bucket %s with application URL %s ...', 483 bucket_url, watch_url) 484 485 try: 486 channel = self.gsutil_api.WatchBucket(bucket_url.bucket_name, 487 watch_url, 488 identifier, 489 token=client_token, 490 provider=bucket_url.scheme) 491 except AccessDeniedException as e: 492 self.logger.warn( 493 NOTIFICATION_AUTHORIZATION_FAILED_MESSAGE.format(watch_error=str(e), 494 watch_url=watch_url)) 495 raise 496 497 channel_id = channel.id 498 resource_id = channel.resourceId 499 client_token = channel.token 500 self.logger.info('Successfully created watch notification channel.') 501 self.logger.info('Watch channel identifier: %s', channel_id) 502 self.logger.info('Canonicalized resource identifier: %s', resource_id) 503 self.logger.info('Client state token: %s', client_token) 504 505 return 0 506 507 def _StopChannel(self): 508 channel_id = self.args[0] 509 resource_id = self.args[1] 510 511 self.logger.info('Removing channel %s with resource identifier %s ...', 512 channel_id, resource_id) 513 self.gsutil_api.StopChannel(channel_id, resource_id, provider='gs') 514 self.logger.info('Succesfully removed channel.') 515 516 return 0 517 518 def _ListChannels(self, bucket_arg): 519 """Lists active channel watches on a bucket given in self.args.""" 520 bucket_url = StorageUrlFromString(bucket_arg) 521 if not (bucket_url.IsBucket() and bucket_url.scheme == 'gs'): 522 raise CommandException( 523 'The %s command can only be used with gs:// bucket URLs.' % 524 self.command_name) 525 if not bucket_url.IsBucket(): 526 raise CommandException('URL must name a bucket for the %s command.' % 527 self.command_name) 528 channels = self.gsutil_api.ListChannels(bucket_url.bucket_name, 529 provider='gs').items 530 self.logger.info( 531 'Bucket %s has the following active Object Change Notifications:', 532 bucket_url.bucket_name) 533 for idx, channel in enumerate(channels): 534 self.logger.info('\tNotification channel %d:', idx + 1) 535 self.logger.info('\t\tChannel identifier: %s', channel.channel_id) 536 self.logger.info('\t\tResource identifier: %s', channel.resource_id) 537 self.logger.info('\t\tApplication URL: %s', channel.push_url) 538 self.logger.info('\t\tCreated by: %s', channel.subscriber_email) 539 self.logger.info( 540 '\t\tCreation time: %s', 541 str(datetime.fromtimestamp(channel.creation_time_ms / 1000))) 542 543 return 0 544 545 def _Create(self): 546 self.CheckArguments() 547 548 # User-specified options 549 pubsub_topic = None 550 payload_format = None 551 custom_attributes = {} 552 event_types = [] 553 object_name_prefix = None 554 should_setup_topic = True 555 556 if self.sub_opts: 557 for o, a in self.sub_opts: 558 if o == '-e': 559 event_types.append(a) 560 elif o == '-f': 561 payload_format = a 562 elif o == '-m': 563 if ':' not in a: 564 raise CommandException( 565 'Custom attributes specified with -m should be of the form ' 566 'key:value') 567 key, value = a.split(':') 568 custom_attributes[key] = value 569 elif o == '-p': 570 object_name_prefix = a 571 elif o == '-s': 572 should_setup_topic = False 573 elif o == '-t': 574 pubsub_topic = a 575 576 if payload_format not in PAYLOAD_FORMAT_MAP: 577 raise CommandException( 578 "Must provide a payload format with -f of either 'json' or 'none'") 579 payload_format = PAYLOAD_FORMAT_MAP[payload_format] 580 581 bucket_arg = self.args[-1] 582 583 bucket_url = StorageUrlFromString(bucket_arg) 584 if not bucket_url.IsCloudUrl() or not bucket_url.IsBucket(): 585 raise CommandException( 586 "%s %s requires a GCS bucket name, but got '%s'" % 587 (self.command_name, self.subcommand_name, bucket_arg)) 588 if bucket_url.scheme != 'gs': 589 raise CommandException( 590 'The %s command can only be used with gs:// bucket URLs.' % 591 self.command_name) 592 bucket_name = bucket_url.bucket_name 593 self.logger.debug('Creating notification for bucket %s', bucket_url) 594 595 # Find the project this bucket belongs to 596 bucket_metadata = self.gsutil_api.GetBucket(bucket_name, 597 fields=['projectNumber'], 598 provider=bucket_url.scheme) 599 bucket_project_number = bucket_metadata.projectNumber 600 601 # If not specified, choose a sensible default for the Cloud Pub/Sub topic 602 # name. 603 if not pubsub_topic: 604 pubsub_topic = 'projects/%s/topics/%s' % (PopulateProjectId(None), 605 bucket_name) 606 if not pubsub_topic.startswith('projects/'): 607 # If a user picks a topic ID (mytopic) but doesn't pass the whole name ( 608 # projects/my-project/topics/mytopic ), pick a default project. 609 pubsub_topic = 'projects/%s/topics/%s' % (PopulateProjectId(None), 610 pubsub_topic) 611 self.logger.debug('Using Cloud Pub/Sub topic %s', pubsub_topic) 612 613 just_modified_topic_permissions = False 614 if should_setup_topic: 615 # Ask GCS for the email address that represents GCS's permission to 616 # publish to a Cloud Pub/Sub topic from this project. 617 service_account = self.gsutil_api.GetProjectServiceAccount( 618 bucket_project_number, provider=bucket_url.scheme).email_address 619 self.logger.debug('Service account for project %d: %s', 620 bucket_project_number, service_account) 621 just_modified_topic_permissions = self._CreateTopic( 622 pubsub_topic, service_account) 623 624 for attempt_number in range(0, 2): 625 try: 626 create_response = self.gsutil_api.CreateNotificationConfig( 627 bucket_name, 628 pubsub_topic=pubsub_topic, 629 payload_format=payload_format, 630 custom_attributes=custom_attributes, 631 event_types=event_types if event_types else None, 632 object_name_prefix=object_name_prefix, 633 provider=bucket_url.scheme) 634 break 635 except PublishPermissionDeniedException: 636 if attempt_number == 0 and just_modified_topic_permissions: 637 # If we have just set the IAM policy, it may take up to 10 seconds to 638 # take effect. 639 self.logger.info( 640 'Retrying create notification in 10 seconds ' 641 '(new permissions may take up to 10 seconds to take effect.)') 642 time.sleep(10) 643 else: 644 raise 645 646 notification_name = 'projects/_/buckets/%s/notificationConfigs/%s' % ( 647 bucket_name, create_response.id) 648 self.logger.info('Created notification config %s', notification_name) 649 650 return 0 651 652 def _CreateTopic(self, pubsub_topic, service_account): 653 """Assures that a topic exists, creating it if necessary. 654 655 Also adds GCS as a publisher on that bucket, if necessary. 656 657 Args: 658 pubsub_topic: name of the Cloud Pub/Sub topic to use/create. 659 service_account: the GCS service account that needs publish permission. 660 661 Returns: 662 true if we modified IAM permissions, otherwise false. 663 """ 664 665 pubsub_api = PubsubApi(logger=self.logger) 666 667 # Verify that the Pub/Sub topic exists. If it does not, create it. 668 try: 669 pubsub_api.GetTopic(topic_name=pubsub_topic) 670 self.logger.debug('Topic %s already exists', pubsub_topic) 671 except NotFoundException: 672 self.logger.debug('Creating topic %s', pubsub_topic) 673 pubsub_api.CreateTopic(topic_name=pubsub_topic) 674 self.logger.info('Created Cloud Pub/Sub topic %s', pubsub_topic) 675 676 # Verify that the service account is in the IAM policy. 677 policy = pubsub_api.GetTopicIamPolicy(topic_name=pubsub_topic) 678 binding = Binding(role='roles/pubsub.publisher', 679 members=['serviceAccount:%s' % service_account]) 680 681 # This could be more extensive. We could, for instance, check for roles 682 # that are stronger that pubsub.publisher, like owner. We could also 683 # recurse up the hierarchy looking to see if there are project-level 684 # permissions. This can get very complex very quickly, as the caller 685 # may not necessarily have access to the project-level IAM policy. 686 # There's no danger in double-granting permission just to make sure it's 687 # there, though. 688 if binding not in policy.bindings: 689 policy.bindings.append(binding) 690 # transactional safety via etag field. 691 pubsub_api.SetTopicIamPolicy(topic_name=pubsub_topic, policy=policy) 692 return True 693 else: 694 self.logger.debug('GCS already has publish permission to topic %s.', 695 pubsub_topic) 696 return False 697 698 def _EnumerateNotificationsFromArgs(self, accept_notification_configs=True): 699 """Yields bucket/notification tuples from command-line args. 700 701 Given a list of strings that are bucket names (gs://foo) or notification 702 config IDs, yield tuples of bucket names and their associated notifications. 703 704 Args: 705 accept_notification_configs: whether notification configs are valid args. 706 Yields: 707 Tuples of the form (bucket_name, Notification) 708 """ 709 path_regex = self._GetNotificationPathRegex() 710 711 for list_entry in self.args: 712 match = path_regex.match(list_entry) 713 if match: 714 if not accept_notification_configs: 715 raise CommandException( 716 '%s %s accepts only bucket names, but you provided %s' % 717 (self.command_name, self.subcommand_name, list_entry)) 718 bucket_name = match.group('bucket') 719 notification_id = match.group('notification') 720 found = False 721 for notification in self.gsutil_api.ListNotificationConfigs( 722 bucket_name, provider='gs'): 723 if notification.id == notification_id: 724 yield (bucket_name, notification) 725 found = True 726 break 727 if not found: 728 raise NotFoundException('Could not find notification %s' % list_entry) 729 else: 730 storage_url = StorageUrlFromString(list_entry) 731 if not storage_url.IsCloudUrl(): 732 raise CommandException( 733 'The %s command must be used on cloud buckets or notification ' 734 'config names.' % self.command_name) 735 if storage_url.scheme != 'gs': 736 raise CommandException('The %s command only works on gs:// buckets.') 737 path = None 738 if storage_url.IsProvider(): 739 path = 'gs://*' 740 elif storage_url.IsBucket(): 741 path = list_entry 742 if not path: 743 raise CommandException( 744 'The %s command cannot be used on cloud objects, only buckets' % 745 self.command_name) 746 for blr in self.WildcardIterator(path).IterBuckets( 747 bucket_fields=['id']): 748 for notification in self.gsutil_api.ListNotificationConfigs( 749 blr.storage_url.bucket_name, provider='gs'): 750 yield (blr.storage_url.bucket_name, notification) 751 752 def _List(self): 753 self.CheckArguments() 754 if self.sub_opts: 755 if '-o' in dict(self.sub_opts): 756 for bucket_name in self.args: 757 self._ListChannels(bucket_name) 758 else: 759 for bucket_name, notification in self._EnumerateNotificationsFromArgs( 760 accept_notification_configs=False): 761 self._PrintNotificationDetails(bucket_name, notification) 762 return 0 763 764 def _PrintNotificationDetails(self, bucket, notification): 765 print('projects/_/buckets/{bucket}/notificationConfigs/{notification}\n' 766 '\tCloud Pub/Sub topic: {topic}'.format( 767 bucket=bucket, 768 notification=notification.id, 769 topic=notification.topic[len('//pubsub.googleapis.com/'):])) 770 if notification.custom_attributes: 771 print('\tCustom attributes:') 772 for attr in notification.custom_attributes.additionalProperties: 773 print('\t\t%s: %s' % (attr.key, attr.value)) 774 filters = [] 775 if notification.event_types: 776 filters.append('\t\tEvent Types: %s' % 777 ', '.join(notification.event_types)) 778 if notification.object_name_prefix: 779 filters.append("\t\tObject name prefix: '%s'" % 780 notification.object_name_prefix) 781 if filters: 782 print('\tFilters:') 783 for line in filters: 784 print(line) 785 self.logger.info('') 786 787 def _Delete(self): 788 for bucket_name, notification in self._EnumerateNotificationsFromArgs(): 789 self._DeleteNotification(bucket_name, notification.id) 790 return 0 791 792 def _DeleteNotification(self, bucket_name, notification_id): 793 self.gsutil_api.DeleteNotificationConfig(bucket_name, 794 notification=notification_id, 795 provider='gs') 796 return 0 797 798 def _RunSubCommand(self, func): 799 try: 800 (self.sub_opts, 801 self.args) = getopt.getopt(self.args, 802 self.command_spec.supported_sub_args) 803 # Commands with both suboptions and subcommands need to reparse for 804 # suboptions, so we log again. 805 metrics.LogCommandParams(sub_opts=self.sub_opts) 806 return func(self) 807 except getopt.GetoptError: 808 self.RaiseInvalidArgumentException() 809 810 SUBCOMMANDS = { 811 'create': _Create, 812 'list': _List, 813 'delete': _Delete, 814 'watchbucket': _WatchBucket, 815 'stopchannel': _StopChannel 816 } 817 818 def RunCommand(self): 819 """Command entry point for the notification command.""" 820 self.subcommand_name = self.args.pop(0) 821 if self.subcommand_name in NotificationCommand.SUBCOMMANDS: 822 metrics.LogCommandParams(subcommands=[self.subcommand_name]) 823 return self._RunSubCommand( 824 NotificationCommand.SUBCOMMANDS[self.subcommand_name]) 825 else: 826 raise CommandException('Invalid subcommand "%s" for the %s command.' % 827 (self.subcommand_name, self.command_name)) 828