1# -*- coding: utf-8 -*- # 2# Copyright 2016 Google LLC. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16"""Methods for looking up completions from the static CLI tree.""" 17 18from __future__ import absolute_import 19from __future__ import division 20from __future__ import unicode_literals 21 22import os 23import shlex 24import sys 25from googlecloudsdk.core.util import encoding 26import six 27 28 29LINE_ENV_VAR = 'COMP_LINE' 30POINT_ENV_VAR = 'COMP_POINT' 31IFS_ENV_VAR = '_ARGCOMPLETE_IFS' 32IFS_ENV_DEFAULT = '\013' 33COMPLETIONS_OUTPUT_FD = 8 34 35FLAG_PREFIX = '--' 36 37FLAG_BOOLEAN = 'bool' 38FLAG_DYNAMIC = 'dynamic' 39FLAG_VALUE = 'value' 40 41LOOKUP_COMMANDS = 'commands' 42LOOKUP_FLAGS = 'flags' 43 44_EMPTY_STRING = '' 45_VALUE_SEP = '=' 46_SPACE = ' ' 47 48 49class CannotHandleCompletionError(Exception): 50 """Error for when completions cannot be handled.""" 51 pass 52 53 54def _GetCmdLineFromEnv(): 55 """Gets the command line from the environment. 56 57 Returns: 58 str, Command line. 59 """ 60 cmd_line = encoding.GetEncodedValue(os.environ, LINE_ENV_VAR) 61 completion_point = int(encoding.GetEncodedValue(os.environ, POINT_ENV_VAR)) 62 cmd_line = cmd_line[:completion_point] 63 return cmd_line 64 65 66def _GetCmdWordQueue(cmd_line): 67 """Converts the given cmd_line to a queue of command line words. 68 69 Args: 70 cmd_line: str, full command line before parsing. 71 72 Returns: 73 [str], Queue of command line words. 74 """ 75 cmd_words = shlex.split(cmd_line)[1:] # First word should always be 'gcloud' 76 77 # We need to know if last word was empty. Shlex removes trailing whitespaces. 78 if cmd_line[-1] == _SPACE: 79 cmd_words.append(_EMPTY_STRING) 80 81 # Reverse so we can use as a queue 82 cmd_words.reverse() 83 return cmd_words 84 85 86def _FindCompletions(root, cmd_line): 87 """Try to perform a completion based on the static CLI tree. 88 89 Args: 90 root: The root of the tree that will be traversed to find completions. 91 cmd_line: [str], original command line. 92 93 Raises: 94 CannotHandleCompletionError: If FindCompletions cannot handle completion. 95 96 Returns: 97 []: No completions. 98 [completions]: List, all possible sorted completions. 99 """ 100 words = _GetCmdWordQueue(cmd_line) 101 node = root 102 103 global_flags = node[LOOKUP_FLAGS] 104 105 completions = [] 106 flag_mode = FLAG_BOOLEAN 107 while words: 108 word = words.pop() 109 110 if word.startswith(FLAG_PREFIX): 111 is_flag_word = True 112 child_nodes = node.get(LOOKUP_FLAGS, {}) 113 child_nodes.update(global_flags) 114 # Add the value part back to the queue if it exists 115 if _VALUE_SEP in word: 116 word, flag_value = word.split(_VALUE_SEP, 1) 117 words.append(flag_value) 118 else: 119 is_flag_word = False 120 child_nodes = node.get(LOOKUP_COMMANDS, {}) 121 122 # Consume word 123 if words: 124 if word in child_nodes: 125 if is_flag_word: 126 flag_mode = child_nodes[word] 127 else: 128 flag_mode = FLAG_BOOLEAN 129 node = child_nodes[word] # Progress to next command node 130 elif flag_mode != FLAG_BOOLEAN: 131 flag_mode = FLAG_BOOLEAN 132 continue # Just consume if we are expecting a flag value 133 else: 134 return [] # Non-existing command/flag, so nothing to do 135 136 # Complete word 137 else: 138 if flag_mode == FLAG_DYNAMIC: 139 raise CannotHandleCompletionError( 140 'Dynamic completions are not handled by this module') 141 elif flag_mode == FLAG_VALUE: 142 return [] # Cannot complete, so nothing to do 143 elif flag_mode != FLAG_BOOLEAN: # Must be list of choices 144 for value in flag_mode: 145 if value.startswith(word): 146 completions.append(value) 147 elif not child_nodes: 148 raise CannotHandleCompletionError( 149 'Positional completions are not handled by this module') 150 else: # Command/flag completion 151 for child, value in six.iteritems(child_nodes): 152 if not child.startswith(word): 153 continue 154 if is_flag_word and value != FLAG_BOOLEAN: 155 child += _VALUE_SEP 156 completions.append(child) 157 return sorted(completions) 158 159 160def _GetInstallationRootDir(): 161 """Returns the SDK installation root dir.""" 162 # Intentionally ignoring config path abstraction imports. 163 return os.path.sep.join(__file__.split(os.path.sep)[:-5]) 164 165 166def _GetCompletionCliTreeDir(): 167 """Returns the SDK static completion CLI tree dir.""" 168 # Intentionally ignoring config path abstraction imports. 169 return os.path.join(_GetInstallationRootDir(), 'data', 'cli') 170 171 172def CompletionCliTreePath(directory=None): 173 """Returns the SDK static completion CLI tree path.""" 174 # Intentionally ignoring config path abstraction imports. 175 return os.path.join( 176 directory or _GetCompletionCliTreeDir(), 'gcloud_completions.py') 177 178 179def LoadCompletionCliTree(): 180 """Loads and returns the static completion CLI tree.""" 181 try: 182 sys_path = sys.path[:] 183 sys.path.append(_GetCompletionCliTreeDir()) 184 import gcloud_completions # pylint: disable=g-import-not-at-top 185 tree = gcloud_completions.STATIC_COMPLETION_CLI_TREE 186 except ImportError: 187 raise CannotHandleCompletionError( 188 'Cannot find static completion CLI tree module.') 189 finally: 190 sys.path = sys_path 191 return tree 192 193 194def _OpenCompletionsOutputStream(): 195 """Returns the completions output stream.""" 196 return os.fdopen(COMPLETIONS_OUTPUT_FD, 'wb') 197 198 199def _GetCompletions(): 200 """Returns the static completions, None if there are none.""" 201 root = LoadCompletionCliTree() 202 cmd_line = _GetCmdLineFromEnv() 203 return _FindCompletions(root, cmd_line) 204 205 206def Complete(): 207 """Attempts completions and writes them to the completion stream.""" 208 completions = _GetCompletions() 209 if completions: 210 # The bash/zsh completion scripts set IFS_ENV_VAR to one character. 211 ifs = encoding.GetEncodedValue(os.environ, IFS_ENV_VAR, IFS_ENV_DEFAULT) 212 # Write completions to stream 213 f = None 214 try: 215 f = _OpenCompletionsOutputStream() 216 # the other side also uses the console encoding 217 f.write(ifs.join(completions).encode()) 218 finally: 219 if f: 220 f.close() 221