1# -*- coding: utf-8 -*- #
2# Copyright 2016 Google LLC. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#    http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""Methods for looking up completions from the static CLI tree."""
17
18from __future__ import absolute_import
19from __future__ import division
20from __future__ import unicode_literals
21
22import os
23import shlex
24import sys
25from googlecloudsdk.core.util import encoding
26import six
27
28
29LINE_ENV_VAR = 'COMP_LINE'
30POINT_ENV_VAR = 'COMP_POINT'
31IFS_ENV_VAR = '_ARGCOMPLETE_IFS'
32IFS_ENV_DEFAULT = '\013'
33COMPLETIONS_OUTPUT_FD = 8
34
35FLAG_PREFIX = '--'
36
37FLAG_BOOLEAN = 'bool'
38FLAG_DYNAMIC = 'dynamic'
39FLAG_VALUE = 'value'
40
41LOOKUP_COMMANDS = 'commands'
42LOOKUP_FLAGS = 'flags'
43
44_EMPTY_STRING = ''
45_VALUE_SEP = '='
46_SPACE = ' '
47
48
49class CannotHandleCompletionError(Exception):
50  """Error for when completions cannot be handled."""
51  pass
52
53
54def _GetCmdLineFromEnv():
55  """Gets the command line from the environment.
56
57  Returns:
58    str, Command line.
59  """
60  cmd_line = encoding.GetEncodedValue(os.environ, LINE_ENV_VAR)
61  completion_point = int(encoding.GetEncodedValue(os.environ, POINT_ENV_VAR))
62  cmd_line = cmd_line[:completion_point]
63  return cmd_line
64
65
66def _GetCmdWordQueue(cmd_line):
67  """Converts the given cmd_line to a queue of command line words.
68
69  Args:
70    cmd_line: str, full command line before parsing.
71
72  Returns:
73    [str], Queue of command line words.
74  """
75  cmd_words = shlex.split(cmd_line)[1:]  # First word should always be 'gcloud'
76
77  # We need to know if last word was empty. Shlex removes trailing whitespaces.
78  if cmd_line[-1] == _SPACE:
79    cmd_words.append(_EMPTY_STRING)
80
81  # Reverse so we can use as a queue
82  cmd_words.reverse()
83  return cmd_words
84
85
86def _FindCompletions(root, cmd_line):
87  """Try to perform a completion based on the static CLI tree.
88
89  Args:
90    root: The root of the tree that will be traversed to find completions.
91    cmd_line: [str], original command line.
92
93  Raises:
94    CannotHandleCompletionError: If FindCompletions cannot handle completion.
95
96  Returns:
97    []: No completions.
98    [completions]: List, all possible sorted completions.
99  """
100  words = _GetCmdWordQueue(cmd_line)
101  node = root
102
103  global_flags = node[LOOKUP_FLAGS]
104
105  completions = []
106  flag_mode = FLAG_BOOLEAN
107  while words:
108    word = words.pop()
109
110    if word.startswith(FLAG_PREFIX):
111      is_flag_word = True
112      child_nodes = node.get(LOOKUP_FLAGS, {})
113      child_nodes.update(global_flags)
114      # Add the value part back to the queue if it exists
115      if _VALUE_SEP in word:
116        word, flag_value = word.split(_VALUE_SEP, 1)
117        words.append(flag_value)
118    else:
119      is_flag_word = False
120      child_nodes = node.get(LOOKUP_COMMANDS, {})
121
122    # Consume word
123    if words:
124      if word in child_nodes:
125        if is_flag_word:
126          flag_mode = child_nodes[word]
127        else:
128          flag_mode = FLAG_BOOLEAN
129          node = child_nodes[word]  # Progress to next command node
130      elif flag_mode != FLAG_BOOLEAN:
131        flag_mode = FLAG_BOOLEAN
132        continue  # Just consume if we are expecting a flag value
133      else:
134        return []  # Non-existing command/flag, so nothing to do
135
136    # Complete word
137    else:
138      if flag_mode == FLAG_DYNAMIC:
139        raise CannotHandleCompletionError(
140            'Dynamic completions are not handled by this module')
141      elif flag_mode == FLAG_VALUE:
142        return []  # Cannot complete, so nothing to do
143      elif flag_mode != FLAG_BOOLEAN:  # Must be list of choices
144        for value in flag_mode:
145          if value.startswith(word):
146            completions.append(value)
147      elif not child_nodes:
148        raise CannotHandleCompletionError(
149            'Positional completions are not handled by this module')
150      else:  # Command/flag completion
151        for child, value in six.iteritems(child_nodes):
152          if not child.startswith(word):
153            continue
154          if is_flag_word and value != FLAG_BOOLEAN:
155            child += _VALUE_SEP
156          completions.append(child)
157  return sorted(completions)
158
159
160def _GetInstallationRootDir():
161  """Returns the SDK installation root dir."""
162  # Intentionally ignoring config path abstraction imports.
163  return os.path.sep.join(__file__.split(os.path.sep)[:-5])
164
165
166def _GetCompletionCliTreeDir():
167  """Returns the SDK static completion CLI tree dir."""
168  # Intentionally ignoring config path abstraction imports.
169  return os.path.join(_GetInstallationRootDir(), 'data', 'cli')
170
171
172def CompletionCliTreePath(directory=None):
173  """Returns the SDK static completion CLI tree path."""
174  # Intentionally ignoring config path abstraction imports.
175  return os.path.join(
176      directory or _GetCompletionCliTreeDir(), 'gcloud_completions.py')
177
178
179def LoadCompletionCliTree():
180  """Loads and returns the static completion CLI tree."""
181  try:
182    sys_path = sys.path[:]
183    sys.path.append(_GetCompletionCliTreeDir())
184    import gcloud_completions  # pylint: disable=g-import-not-at-top
185    tree = gcloud_completions.STATIC_COMPLETION_CLI_TREE
186  except ImportError:
187    raise CannotHandleCompletionError(
188        'Cannot find static completion CLI tree module.')
189  finally:
190    sys.path = sys_path
191  return tree
192
193
194def _OpenCompletionsOutputStream():
195  """Returns the completions output stream."""
196  return os.fdopen(COMPLETIONS_OUTPUT_FD, 'wb')
197
198
199def _GetCompletions():
200  """Returns the static completions, None if there are none."""
201  root = LoadCompletionCliTree()
202  cmd_line = _GetCmdLineFromEnv()
203  return _FindCompletions(root, cmd_line)
204
205
206def Complete():
207  """Attempts completions and writes them to the completion stream."""
208  completions = _GetCompletions()
209  if completions:
210    # The bash/zsh completion scripts set IFS_ENV_VAR to one character.
211    ifs = encoding.GetEncodedValue(os.environ, IFS_ENV_VAR, IFS_ENV_DEFAULT)
212    # Write completions to stream
213    f = None
214    try:
215      f = _OpenCompletionsOutputStream()
216      # the other side also uses the console encoding
217      f.write(ifs.join(completions).encode())
218    finally:
219      if f:
220        f.close()
221