1# -*- coding: utf-8 -*-
2# Copyright 2013 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""This module provides the notification command to gsutil."""
16
17from __future__ import absolute_import
18from __future__ import print_function
19from __future__ import division
20from __future__ import unicode_literals
21
22import getopt
23import re
24import time
25import uuid
26
27from datetime import datetime
28from gslib import metrics
29from gslib.cloud_api import AccessDeniedException
30from gslib.cloud_api import BadRequestException
31from gslib.cloud_api import NotFoundException
32from gslib.cloud_api import PublishPermissionDeniedException
33from gslib.command import Command
34from gslib.command import NO_MAX
35from gslib.command_argument import CommandArgument
36from gslib.cs_api_map import ApiSelector
37from gslib.exception import CommandException
38from gslib.help_provider import CreateHelpText
39from gslib.project_id import PopulateProjectId
40from gslib.pubsub_api import PubsubApi
41from gslib.storage_url import StorageUrlFromString
42from gslib.third_party.pubsub_apitools.pubsub_v1_messages import Binding
43from gslib.utils import copy_helper
44
45# Cloud Pub/Sub commands
46
# Usage line for `notification list`. The optional -o flag switches the
# listing to Object Change Notification channels instead of Cloud Pub/Sub
# notification configs.
_LIST_SYNOPSIS = """
  gsutil notification list [-o] gs://<bucket_name>...
"""

# Usage line for `notification delete`; each argument is either a notification
# config name or a bucket URL.
_DELETE_SYNOPSIS = """
  gsutil notification delete (<notificationConfigName>|gs://<bucket_name>)...
"""

# Usage line for `notification create`. -f is mandatory; -s skips topic
# creation and permission assignment.
_CREATE_SYNOPSIS = """
  gsutil notification create -f (json|none) [-p <prefix>] [-t <topic>] \\
      [-s] [-m <key>:<value>]... [-e <eventType>]... gs://<bucket_name>
"""

# Object Change Notification commands

_WATCHBUCKET_SYNOPSIS = """
  gsutil notification watchbucket [-i <id>] [-t <token>] <app_url> gs://<bucket_name>
"""

_STOPCHANNEL_SYNOPSIS = """
  gsutil notification stopchannel <channel_id> <resource_id>
"""

# Combined synopsis. The Pub/Sub commands form one paragraph and the Object
# Change Notification commands a second one: leading newlines are stripped from
# every entry except the first of each group so the groups stay separated by
# exactly one blank line.
_SYNOPSIS = (
    _CREATE_SYNOPSIS +
    _DELETE_SYNOPSIS.lstrip('\n') +
    _LIST_SYNOPSIS.lstrip('\n') +
    _WATCHBUCKET_SYNOPSIS +
    _STOPCHANNEL_SYNOPSIS.lstrip('\n') + '\n')  # yapf: disable
76
# Help text for the `list` sub-command. It is embedded in the combined
# _DESCRIPTION and also used on its own for the per-subcommand help text.
_LIST_DESCRIPTION = """
<B>LIST</B>
  The list sub-command provides a list of notification configs belonging to a
  given bucket. The listed name of each notification config can be used with
  the delete sub-command to delete that specific notification config.

  For listing Object Change Notifications instead of Cloud Pub/Sub notification
  subscription configs, add a -o flag.

<B>LIST EXAMPLES</B>
  Fetch the list of notification configs for the bucket example-bucket:

    gsutil notification list gs://example-bucket

  The same as above, but for Object Change Notifications instead of Cloud
  Pub/Sub notification subscription configs:

    gsutil notification list -o gs://example-bucket

  Fetch the notification configs in all buckets matching a wildcard:

    gsutil notification list gs://example-*

  Fetch all of the notification configs for buckets in the default project:

    gsutil notification list gs://*
"""
104
# Help text for the `delete` sub-command. It is embedded in the combined
# _DESCRIPTION and also used on its own for the per-subcommand help text.
_DELETE_DESCRIPTION = """
<B>DELETE</B>
  The delete sub-command deletes notification configs from a bucket. If a
  notification config name is passed as a parameter, that notification config
  alone is deleted. If a bucket name is passed, all notification configs
  associated with that bucket are deleted.

  Cloud Pub/Sub topics associated with this notification config are not
  deleted by this command. Those must be deleted separately, for example with
  the gcloud command `gcloud beta pubsub topics delete`.

  Object Change Notification subscriptions cannot be deleted with this command.
  For that, see the command `gsutil notification stopchannel`.

<B>DELETE EXAMPLES</B>
  Delete a single notification config (with ID 3) in the bucket example-bucket:

    gsutil notification delete projects/_/buckets/example-bucket/notificationConfigs/3

  Delete all notification configs in the bucket example-bucket:

    gsutil notification delete gs://example-bucket
"""
128
# Help text for the `create` sub-command. It is embedded in the combined
# _DESCRIPTION and also used on its own for the per-subcommand help text.
# Every `create` example must pass -f, because the command rejects invocations
# without a payload format.
_CREATE_DESCRIPTION = """
<B>CREATE</B>
  The create sub-command creates a notification config on a bucket, establishing
  a flow of event notifications from Cloud Storage to a Cloud Pub/Sub topic. As
  part of creating this flow, the create command also verifies that the
  destination Cloud Pub/Sub topic exists, creating it if necessary, and verifies
  that the Cloud Storage bucket has permission to publish events to that topic,
  granting the permission if necessary.

  If a destination Cloud Pub/Sub topic is not specified with the -t flag, Cloud
  Storage chooses a topic name in the default project whose ID is the same as
  the bucket name. For example, if the default project ID specified is
  'default-project' and the bucket being configured is gs://example-bucket, the
  create command uses the Cloud Pub/Sub topic
  "projects/default-project/topics/example-bucket".

  In order to enable notifications, a `special Cloud Storage service account
  <https://cloud.google.com/storage/docs/projects#service-accounts>`_ unique to
  each project must have the IAM permission "pubsub.topics.publish". This
  command checks to see if the destination Cloud Pub/Sub topic grants the
  service account this permission. If not, the create command attempts to
  grant it.

  You can create multiple notification configurations for a bucket, but their
  triggers cannot overlap such that a single event could send multiple
  notifications. Attempting to create a notification configuration that
  overlaps with an existing notification configuration results in an error.

<B>CREATE EXAMPLES</B>
  Begin sending notifications of all changes to the bucket example-bucket
  to the Cloud Pub/Sub topic projects/default-project/topics/example-bucket:

    gsutil notification create -f json gs://example-bucket

  The same as above, but specifies the destination topic ID 'files-to-process'
  in the default project:

    gsutil notification create -f json \\
      -t files-to-process gs://example-bucket

  The same as above, but specifies a Cloud Pub/Sub topic belonging to the
  specific cloud project 'example-project':

    gsutil notification create -f json \\
      -t projects/example-project/topics/files-to-process gs://example-bucket

  Create a notification config that only sends an event when a new object
  has been created:

    gsutil notification create -f json -e OBJECT_FINALIZE gs://example-bucket

  Create a topic and notification config that only sends an event when
  an object beginning with "photos/" is affected:

    gsutil notification create -f json -p photos/ gs://example-bucket

  List all of the notificationConfigs in bucket example-bucket:

    gsutil notification list gs://example-bucket

  Delete all notificationConfigs for bucket example-bucket:

    gsutil notification delete gs://example-bucket

  Delete one specific notificationConfig for bucket example-bucket:

    gsutil notification delete \\
      projects/_/buckets/example-bucket/notificationConfigs/1

<B>OPTIONS</B>
  The create sub-command has the following options:

  -e        Specify an event type filter for this notification config. Cloud
            Storage only sends notifications of this type. You may specify this
            parameter multiple times to allow multiple event types. If not
            specified, Cloud Storage sends notifications for all event types.
            The valid types are:

              OBJECT_FINALIZE - An object has been created.
              OBJECT_METADATA_UPDATE - The metadata of an object has changed.
              OBJECT_DELETE - An object has been permanently deleted.
              OBJECT_ARCHIVE - A live version of an object has become a
                noncurrent version.

  -f        Specifies the payload format of notification messages. Must be
            either "json" for a payload that matches the object metadata for
            the JSON API, or "none" to specify no payload at all. In either
            case, notification details are available in the message attributes.

  -m        Specifies a key:value attribute that is appended to the set
            of attributes sent to Cloud Pub/Sub for all events associated with
            this notification config. You may specify this parameter multiple
            times to set multiple attributes.

  -p        Specifies a prefix path filter for this notification config. Cloud
            Storage only sends notifications for objects in this bucket whose
            names begin with the specified prefix.

  -s        Skips creation and permission assignment of the Cloud Pub/Sub topic.
            This is useful if the caller does not have permission to access
            the topic in question, or if the topic already exists and has the
            appropriate publish permission assigned.

  -t        The Cloud Pub/Sub topic to which notifications should be sent. If
            not specified, this command chooses a topic whose project is your
            default project and whose ID is the same as the Cloud Storage bucket
            name.

<B>NEXT STEPS</B>
  Once the create command has succeeded, Cloud Storage publishes a message to
  the specified Cloud Pub/Sub topic when eligible changes occur. In order to
  receive these messages, you must create a Pub/Sub subscription for your
  Pub/Sub topic. To learn more about creating Pub/Sub subscriptions, see `the
  Pub/Sub Subscriber Overview <https://cloud.google.com/pubsub/docs/subscriber>`_.

  You can create a simple Pub/Sub subscription using the ``gcloud`` command-line
  tool. For example, to create a new subscription on the topic "myNewTopic" and
  attempt to pull messages from it, you could run:

    gcloud beta pubsub subscriptions create --topic myNewTopic testSubscription
    gcloud beta pubsub subscriptions pull --auto-ack testSubscription
"""
251
# Help text for the `watchbucket` sub-command. It is embedded in the combined
# _DESCRIPTION and also used on its own for the per-subcommand help text.
_WATCHBUCKET_DESCRIPTION = """
<B>WATCHBUCKET</B>
  The watchbucket sub-command can be used to watch a bucket for object changes.
  A service account must be used when running this command.

  The app_url parameter must be an HTTPS URL to an application that will be
  notified of changes to any object in the bucket. The URL endpoint must be
  a verified domain on your project. See `Notification Authorization
  <https://cloud.google.com/storage/docs/object-change-notification#_Authorization>`_
  for details.

  The optional id parameter can be used to assign a unique identifier to the
  created notification channel. If not provided, a random UUID string is
  generated.

  The optional token parameter can be used to validate notification events.
  To do this, set this custom token and store it to later verify that
  notification events contain the client token you expect.

<B>WATCHBUCKET EXAMPLES</B>
  Watch the bucket example-bucket for changes and send notifications to an
  application server running at example.com:

    gsutil notification watchbucket https://example.com/notify \\
      gs://example-bucket

  Assign identifier my-channel-id to the created notification channel:

    gsutil notification watchbucket -i my-channel-id \\
      https://example.com/notify gs://example-bucket

  Set a custom client token that is included with each notification event:

    gsutil notification watchbucket -t my-client-token \\
      https://example.com/notify gs://example-bucket
"""
288
# Help text for the `stopchannel` sub-command. It is embedded in the combined
# _DESCRIPTION and also used on its own for the per-subcommand help text.
_STOPCHANNEL_DESCRIPTION = """
<B>STOPCHANNEL</B>
  The stopchannel sub-command can be used to stop sending change events to a
  notification channel.

  The channel_id and resource_id parameters should match the values from the
  response of a bucket watch request.

<B>STOPCHANNEL EXAMPLES</B>
  Stop the notification event channel with channel identifier channel1 and
  resource identifier SoGqan08XDIFWr1Fv_nGpRJBHh8:

    gsutil notification stopchannel channel1 SoGqan08XDIFWr1Fv_nGpRJBHh8
"""
303
# Full description for the command-level help: an introduction, the Cloud
# Pub/Sub sub-command sections, the Object Change Notification sub-command
# sections, and a closing note on parallel composite uploads.
#
# The trailing .format() call binds only to the final string literal (a method
# call binds tighter than +), so only that last segment has its
# {composite_namespace} placeholder substituted; the concatenated sub-command
# descriptions are not passed through format().
_DESCRIPTION = """
  You can use the ``notification`` command to configure
  `Pub/Sub notifications for Cloud Storage
  <https://cloud.google.com/storage/docs/pubsub-notifications>`_
  and `Object change notification
  <https://cloud.google.com/storage/docs/object-change-notification>`_ channels.

<B>CLOUD PUB/SUB</B>
  The "create", "list", and "delete" sub-commands deal with configuring Cloud
  Storage integration with Google Cloud Pub/Sub.
""" + _CREATE_DESCRIPTION + _LIST_DESCRIPTION + _DELETE_DESCRIPTION + """
<B>OBJECT CHANGE NOTIFICATIONS</B>
  Object change notification is a separate, older feature within Cloud Storage
  for generating notifications. This feature sends HTTPS messages to a client
  application that you've set up separately. This feature is generally not
  recommended, because Pub/Sub notifications are cheaper, easier to use, and
  more flexible. For more information, see
  `Object change notification
  <https://cloud.google.com/storage/docs/object-change-notification>`_.

  The "watchbucket" and "stopchannel" sub-commands enable and disable Object
  change notifications.
""" + _WATCHBUCKET_DESCRIPTION + _STOPCHANNEL_DESCRIPTION + """
<B>NOTIFICATIONS AND PARALLEL COMPOSITE UPLOADS</B>
  gsutil supports `parallel composite uploads
  <https://cloud.google.com/storage/docs/uploads-downloads#parallel-composite-uploads>`_.
  If enabled, an upload can result in multiple temporary component objects
  being uploaded before the actual intended object is created. Any subscriber
  to notifications for this bucket then sees a notification for each of these
  components being created and deleted. If this is a concern for you, note
  that parallel composite uploads can be disabled by setting
  "parallel_composite_upload_threshold = 0" in your .boto config file.
  Alternately, your subscriber code can filter out gsutil's parallel
  composite uploads by ignoring any notification about objects whose names
  contain (but do not start with) the following string:
    "{composite_namespace}".

""".format(composite_namespace=copy_helper.PARALLEL_UPLOAD_TEMP_NAMESPACE)
342
# Template for the warning logged when a watchbucket request is denied.
# Callers fill it in with str.format(), supplying `watch_error` (the error
# text) and `watch_url` (the application URL that was rejected).
NOTIFICATION_AUTHORIZATION_FAILED_MESSAGE = """
Watch bucket attempt failed:
  {watch_error}

You attempted to watch a bucket with an application URL of:

  {watch_url}

which is not authorized for your project. Please ensure that you are using
Service Account authentication and that the Service Account's project is
authorized for the application URL. Notification endpoint URLs must also be
whitelisted in your Cloud Console project. To do that, the domain must also be
verified using Google Webmaster Tools. For instructions, please see
`Notification Authorization
<https://cloud.google.com/storage/docs/object-change-notification#_Authorization>`_.
"""
359
# Command-level help text: the combined synopsis plus the full description.
_DETAILED_HELP_TEXT = CreateHelpText(_SYNOPSIS, _DESCRIPTION)

# Per-subcommand help texts, each built from that sub-command's own synopsis
# and description. They are registered in the command's help_spec under the
# sub-command name.
# yapf: disable
_create_help_text = (
    CreateHelpText(_CREATE_SYNOPSIS, _CREATE_DESCRIPTION))
_list_help_text = (
    CreateHelpText(_LIST_SYNOPSIS, _LIST_DESCRIPTION))
_delete_help_text = (
    CreateHelpText(_DELETE_SYNOPSIS, _DELETE_DESCRIPTION))
_watchbucket_help_text = (
    CreateHelpText(_WATCHBUCKET_SYNOPSIS, _WATCHBUCKET_DESCRIPTION))
_stopchannel_help_text = (
    CreateHelpText(_STOPCHANNEL_SYNOPSIS, _STOPCHANNEL_DESCRIPTION))
# yapf: enable
374
# Maps the user-facing value of the create sub-command's -f option to the
# payload format string passed on when creating the notification config. Any
# -f value that is not a key of this map is rejected by the create sub-command.
PAYLOAD_FORMAT_MAP = {
    'none': 'NONE',
    'json': 'JSON_API_V1',
}
379
380
381class NotificationCommand(Command):
382  """Implementation of gsutil notification command."""
383
384  # Notification names might look like one of these:
385  #  canonical form:  projects/_/buckets/bucket/notificationConfigs/3
386  #  JSON API form:   b/bucket/notificationConfigs/5
387  # Either of the above might start with a / if a user is copying & pasting.
388  def _GetNotificationPathRegex(self):
389    if not NotificationCommand._notification_path_regex:
390      NotificationCommand._notification_path_regex = re.compile(
391          ('/?(projects/[^/]+/)?b(uckets)?/(?P<bucket>[^/]+)/'
392           'notificationConfigs/(?P<notification>[0-9]+)'))
393    return NotificationCommand._notification_path_regex
394
395  _notification_path_regex = None
396
397  # Command specification. See base class for documentation.
398  command_spec = Command.CreateCommandSpec(
399      'notification',
400      command_name_aliases=[
401          'notify',
402          'notifyconfig',
403          'notifications',
404          'notif',
405      ],
406      usage_synopsis=_SYNOPSIS,
407      min_args=2,
408      max_args=NO_MAX,
409      supported_sub_args='i:t:m:t:of:e:p:s',
410      file_url_ok=False,
411      provider_url_ok=False,
412      urls_start_arg=1,
413      gs_api_support=[ApiSelector.JSON],
414      gs_default_api=ApiSelector.JSON,
415      argparse_arguments={
416          'watchbucket': [
417              CommandArgument.MakeFreeTextArgument(),
418              CommandArgument.MakeZeroOrMoreCloudBucketURLsArgument(),
419          ],
420          'stopchannel': [],
421          'list': [CommandArgument.MakeZeroOrMoreCloudBucketURLsArgument(),],
422          'delete': [
423              # Takes a list of one of the following:
424              #   notification: projects/_/buckets/bla/notificationConfigs/5,
425              #   bucket: gs://foobar
426              CommandArgument.MakeZeroOrMoreCloudURLsArgument(),
427          ],
428          'create': [
429              CommandArgument.MakeFreeTextArgument(),  # Cloud Pub/Sub topic
430              CommandArgument.MakeNCloudBucketURLsArgument(1),
431          ]
432      },
433  )
434  # Help specification. See help_provider.py for documentation.
  # Registers the command-level help text plus one help text per sub-command,
  # keyed by sub-command name.
  help_spec = Command.HelpSpec(
      help_name='notification',
      help_name_aliases=[
          'watchbucket',
          'stopchannel',
          'notifyconfig',
      ],
      help_type='command_help',
      help_one_line_summary='Configure object change notification',
      help_text=_DETAILED_HELP_TEXT,
      # Keys match the sub-command names accepted by RunCommand.
      subcommand_help_text={
          'create': _create_help_text,
          'list': _list_help_text,
          'delete': _delete_help_text,
          'watchbucket': _watchbucket_help_text,
          'stopchannel': _stopchannel_help_text,
      },
  )
453
454  def _WatchBucket(self):
455    """Creates a watch on a bucket given in self.args."""
456    self.CheckArguments()
457    identifier = None
458    client_token = None
459    if self.sub_opts:
460      for o, a in self.sub_opts:
461        if o == '-i':
462          identifier = a
463        if o == '-t':
464          client_token = a
465
466    identifier = identifier or str(uuid.uuid4())
467    watch_url = self.args[0]
468    bucket_arg = self.args[-1]
469
470    if not watch_url.lower().startswith('https://'):
471      raise CommandException('The application URL must be an https:// URL.')
472
473    bucket_url = StorageUrlFromString(bucket_arg)
474    if not (bucket_url.IsBucket() and bucket_url.scheme == 'gs'):
475      raise CommandException(
476          'The %s command can only be used with gs:// bucket URLs.' %
477          self.command_name)
478    if not bucket_url.IsBucket():
479      raise CommandException('URL must name a bucket for the %s command.' %
480                             self.command_name)
481
482    self.logger.info('Watching bucket %s with application URL %s ...',
483                     bucket_url, watch_url)
484
485    try:
486      channel = self.gsutil_api.WatchBucket(bucket_url.bucket_name,
487                                            watch_url,
488                                            identifier,
489                                            token=client_token,
490                                            provider=bucket_url.scheme)
491    except AccessDeniedException as e:
492      self.logger.warn(
493          NOTIFICATION_AUTHORIZATION_FAILED_MESSAGE.format(watch_error=str(e),
494                                                           watch_url=watch_url))
495      raise
496
497    channel_id = channel.id
498    resource_id = channel.resourceId
499    client_token = channel.token
500    self.logger.info('Successfully created watch notification channel.')
501    self.logger.info('Watch channel identifier: %s', channel_id)
502    self.logger.info('Canonicalized resource identifier: %s', resource_id)
503    self.logger.info('Client state token: %s', client_token)
504
505    return 0
506
507  def _StopChannel(self):
508    channel_id = self.args[0]
509    resource_id = self.args[1]
510
511    self.logger.info('Removing channel %s with resource identifier %s ...',
512                     channel_id, resource_id)
513    self.gsutil_api.StopChannel(channel_id, resource_id, provider='gs')
514    self.logger.info('Succesfully removed channel.')
515
516    return 0
517
518  def _ListChannels(self, bucket_arg):
519    """Lists active channel watches on a bucket given in self.args."""
520    bucket_url = StorageUrlFromString(bucket_arg)
521    if not (bucket_url.IsBucket() and bucket_url.scheme == 'gs'):
522      raise CommandException(
523          'The %s command can only be used with gs:// bucket URLs.' %
524          self.command_name)
525    if not bucket_url.IsBucket():
526      raise CommandException('URL must name a bucket for the %s command.' %
527                             self.command_name)
528    channels = self.gsutil_api.ListChannels(bucket_url.bucket_name,
529                                            provider='gs').items
530    self.logger.info(
531        'Bucket %s has the following active Object Change Notifications:',
532        bucket_url.bucket_name)
533    for idx, channel in enumerate(channels):
534      self.logger.info('\tNotification channel %d:', idx + 1)
535      self.logger.info('\t\tChannel identifier: %s', channel.channel_id)
536      self.logger.info('\t\tResource identifier: %s', channel.resource_id)
537      self.logger.info('\t\tApplication URL: %s', channel.push_url)
538      self.logger.info('\t\tCreated by: %s', channel.subscriber_email)
539      self.logger.info(
540          '\t\tCreation time: %s',
541          str(datetime.fromtimestamp(channel.creation_time_ms / 1000)))
542
543    return 0
544
545  def _Create(self):
546    self.CheckArguments()
547
548    # User-specified options
549    pubsub_topic = None
550    payload_format = None
551    custom_attributes = {}
552    event_types = []
553    object_name_prefix = None
554    should_setup_topic = True
555
556    if self.sub_opts:
557      for o, a in self.sub_opts:
558        if o == '-e':
559          event_types.append(a)
560        elif o == '-f':
561          payload_format = a
562        elif o == '-m':
563          if ':' not in a:
564            raise CommandException(
565                'Custom attributes specified with -m should be of the form '
566                'key:value')
567          key, value = a.split(':')
568          custom_attributes[key] = value
569        elif o == '-p':
570          object_name_prefix = a
571        elif o == '-s':
572          should_setup_topic = False
573        elif o == '-t':
574          pubsub_topic = a
575
576    if payload_format not in PAYLOAD_FORMAT_MAP:
577      raise CommandException(
578          "Must provide a payload format with -f of either 'json' or 'none'")
579    payload_format = PAYLOAD_FORMAT_MAP[payload_format]
580
581    bucket_arg = self.args[-1]
582
583    bucket_url = StorageUrlFromString(bucket_arg)
584    if not bucket_url.IsCloudUrl() or not bucket_url.IsBucket():
585      raise CommandException(
586          "%s %s requires a GCS bucket name, but got '%s'" %
587          (self.command_name, self.subcommand_name, bucket_arg))
588    if bucket_url.scheme != 'gs':
589      raise CommandException(
590          'The %s command can only be used with gs:// bucket URLs.' %
591          self.command_name)
592    bucket_name = bucket_url.bucket_name
593    self.logger.debug('Creating notification for bucket %s', bucket_url)
594
595    # Find the project this bucket belongs to
596    bucket_metadata = self.gsutil_api.GetBucket(bucket_name,
597                                                fields=['projectNumber'],
598                                                provider=bucket_url.scheme)
599    bucket_project_number = bucket_metadata.projectNumber
600
601    # If not specified, choose a sensible default for the Cloud Pub/Sub topic
602    # name.
603    if not pubsub_topic:
604      pubsub_topic = 'projects/%s/topics/%s' % (PopulateProjectId(None),
605                                                bucket_name)
606    if not pubsub_topic.startswith('projects/'):
607      # If a user picks a topic ID (mytopic) but doesn't pass the whole name (
608      # projects/my-project/topics/mytopic ), pick a default project.
609      pubsub_topic = 'projects/%s/topics/%s' % (PopulateProjectId(None),
610                                                pubsub_topic)
611    self.logger.debug('Using Cloud Pub/Sub topic %s', pubsub_topic)
612
613    just_modified_topic_permissions = False
614    if should_setup_topic:
615      # Ask GCS for the email address that represents GCS's permission to
616      # publish to a Cloud Pub/Sub topic from this project.
617      service_account = self.gsutil_api.GetProjectServiceAccount(
618          bucket_project_number, provider=bucket_url.scheme).email_address
619      self.logger.debug('Service account for project %d: %s',
620                        bucket_project_number, service_account)
621      just_modified_topic_permissions = self._CreateTopic(
622          pubsub_topic, service_account)
623
624    for attempt_number in range(0, 2):
625      try:
626        create_response = self.gsutil_api.CreateNotificationConfig(
627            bucket_name,
628            pubsub_topic=pubsub_topic,
629            payload_format=payload_format,
630            custom_attributes=custom_attributes,
631            event_types=event_types if event_types else None,
632            object_name_prefix=object_name_prefix,
633            provider=bucket_url.scheme)
634        break
635      except PublishPermissionDeniedException:
636        if attempt_number == 0 and just_modified_topic_permissions:
637          # If we have just set the IAM policy, it may take up to 10 seconds to
638          # take effect.
639          self.logger.info(
640              'Retrying create notification in 10 seconds '
641              '(new permissions may take up to 10 seconds to take effect.)')
642          time.sleep(10)
643        else:
644          raise
645
646    notification_name = 'projects/_/buckets/%s/notificationConfigs/%s' % (
647        bucket_name, create_response.id)
648    self.logger.info('Created notification config %s', notification_name)
649
650    return 0
651
652  def _CreateTopic(self, pubsub_topic, service_account):
653    """Assures that a topic exists, creating it if necessary.
654
655    Also adds GCS as a publisher on that bucket, if necessary.
656
657    Args:
658      pubsub_topic: name of the Cloud Pub/Sub topic to use/create.
659      service_account: the GCS service account that needs publish permission.
660
661    Returns:
662      true if we modified IAM permissions, otherwise false.
663    """
664
665    pubsub_api = PubsubApi(logger=self.logger)
666
667    # Verify that the Pub/Sub topic exists. If it does not, create it.
668    try:
669      pubsub_api.GetTopic(topic_name=pubsub_topic)
670      self.logger.debug('Topic %s already exists', pubsub_topic)
671    except NotFoundException:
672      self.logger.debug('Creating topic %s', pubsub_topic)
673      pubsub_api.CreateTopic(topic_name=pubsub_topic)
674      self.logger.info('Created Cloud Pub/Sub topic %s', pubsub_topic)
675
676    # Verify that the service account is in the IAM policy.
677    policy = pubsub_api.GetTopicIamPolicy(topic_name=pubsub_topic)
678    binding = Binding(role='roles/pubsub.publisher',
679                      members=['serviceAccount:%s' % service_account])
680
681    # This could be more extensive. We could, for instance, check for roles
682    # that are stronger that pubsub.publisher, like owner. We could also
683    # recurse up the hierarchy looking to see if there are project-level
684    # permissions. This can get very complex very quickly, as the caller
685    # may not necessarily have access to the project-level IAM policy.
686    # There's no danger in double-granting permission just to make sure it's
687    # there, though.
688    if binding not in policy.bindings:
689      policy.bindings.append(binding)
690      # transactional safety via etag field.
691      pubsub_api.SetTopicIamPolicy(topic_name=pubsub_topic, policy=policy)
692      return True
693    else:
694      self.logger.debug('GCS already has publish permission to topic %s.',
695                        pubsub_topic)
696      return False
697
698  def _EnumerateNotificationsFromArgs(self, accept_notification_configs=True):
699    """Yields bucket/notification tuples from command-line args.
700
701    Given a list of strings that are bucket names (gs://foo) or notification
702    config IDs, yield tuples of bucket names and their associated notifications.
703
704    Args:
705      accept_notification_configs: whether notification configs are valid args.
706    Yields:
707      Tuples of the form (bucket_name, Notification)
708    """
709    path_regex = self._GetNotificationPathRegex()
710
711    for list_entry in self.args:
712      match = path_regex.match(list_entry)
713      if match:
714        if not accept_notification_configs:
715          raise CommandException(
716              '%s %s accepts only bucket names, but you provided %s' %
717              (self.command_name, self.subcommand_name, list_entry))
718        bucket_name = match.group('bucket')
719        notification_id = match.group('notification')
720        found = False
721        for notification in self.gsutil_api.ListNotificationConfigs(
722            bucket_name, provider='gs'):
723          if notification.id == notification_id:
724            yield (bucket_name, notification)
725            found = True
726            break
727        if not found:
728          raise NotFoundException('Could not find notification %s' % list_entry)
729      else:
730        storage_url = StorageUrlFromString(list_entry)
731        if not storage_url.IsCloudUrl():
732          raise CommandException(
733              'The %s command must be used on cloud buckets or notification '
734              'config names.' % self.command_name)
735        if storage_url.scheme != 'gs':
736          raise CommandException('The %s command only works on gs:// buckets.')
737        path = None
738        if storage_url.IsProvider():
739          path = 'gs://*'
740        elif storage_url.IsBucket():
741          path = list_entry
742        if not path:
743          raise CommandException(
744              'The %s command cannot be used on cloud objects, only buckets' %
745              self.command_name)
746        for blr in self.WildcardIterator(path).IterBuckets(
747            bucket_fields=['id']):
748          for notification in self.gsutil_api.ListNotificationConfigs(
749              blr.storage_url.bucket_name, provider='gs'):
750            yield (blr.storage_url.bucket_name, notification)
751
752  def _List(self):
753    self.CheckArguments()
754    if self.sub_opts:
755      if '-o' in dict(self.sub_opts):
756        for bucket_name in self.args:
757          self._ListChannels(bucket_name)
758    else:
759      for bucket_name, notification in self._EnumerateNotificationsFromArgs(
760          accept_notification_configs=False):
761        self._PrintNotificationDetails(bucket_name, notification)
762    return 0
763
764  def _PrintNotificationDetails(self, bucket, notification):
765    print('projects/_/buckets/{bucket}/notificationConfigs/{notification}\n'
766          '\tCloud Pub/Sub topic: {topic}'.format(
767              bucket=bucket,
768              notification=notification.id,
769              topic=notification.topic[len('//pubsub.googleapis.com/'):]))
770    if notification.custom_attributes:
771      print('\tCustom attributes:')
772      for attr in notification.custom_attributes.additionalProperties:
773        print('\t\t%s: %s' % (attr.key, attr.value))
774    filters = []
775    if notification.event_types:
776      filters.append('\t\tEvent Types: %s' %
777                     ', '.join(notification.event_types))
778    if notification.object_name_prefix:
779      filters.append("\t\tObject name prefix: '%s'" %
780                     notification.object_name_prefix)
781    if filters:
782      print('\tFilters:')
783      for line in filters:
784        print(line)
785    self.logger.info('')
786
787  def _Delete(self):
788    for bucket_name, notification in self._EnumerateNotificationsFromArgs():
789      self._DeleteNotification(bucket_name, notification.id)
790    return 0
791
792  def _DeleteNotification(self, bucket_name, notification_id):
793    self.gsutil_api.DeleteNotificationConfig(bucket_name,
794                                             notification=notification_id,
795                                             provider='gs')
796    return 0
797
798  def _RunSubCommand(self, func):
799    try:
800      (self.sub_opts,
801       self.args) = getopt.getopt(self.args,
802                                  self.command_spec.supported_sub_args)
803      # Commands with both suboptions and subcommands need to reparse for
804      # suboptions, so we log again.
805      metrics.LogCommandParams(sub_opts=self.sub_opts)
806      return func(self)
807    except getopt.GetoptError:
808      self.RaiseInvalidArgumentException()
809
  # Dispatch table from sub-command name to its handler. The values are the
  # plain functions defined in this class body (not bound methods), so
  # _RunSubCommand calls them with the command instance passed explicitly.
  SUBCOMMANDS = {
      'create': _Create,
      'list': _List,
      'delete': _Delete,
      'watchbucket': _WatchBucket,
      'stopchannel': _StopChannel
  }
817
818  def RunCommand(self):
819    """Command entry point for the notification command."""
820    self.subcommand_name = self.args.pop(0)
821    if self.subcommand_name in NotificationCommand.SUBCOMMANDS:
822      metrics.LogCommandParams(subcommands=[self.subcommand_name])
823      return self._RunSubCommand(
824          NotificationCommand.SUBCOMMANDS[self.subcommand_name])
825    else:
826      raise CommandException('Invalid subcommand "%s" for the %s command.' %
827                             (self.subcommand_name, self.command_name))
828