"""
Functions for adding common CLI arguments to argparse sub-commands.
"""
import argparse
import copy
import warnings

from ruamel import yaml

from streamparse.util import get_storm_workers


class _StoreDictAction(argparse.Action):
    """Action for storing key=val option strings as a single dict.

    Each occurrence of the option contributes one ``key=val`` pair to the
    dict stored at ``dest``.  The ``val`` part is parsed as YAML, so
    ``true``, ``3`` and ``[a, b]`` become a bool, an int and a list.
    """

    def __init__(
        self,
        option_strings,
        dest,
        nargs=None,
        const=None,
        default=None,
        type=None,
        choices=None,
        required=False,
        help=None,
        metavar=None,
    ):
        """Validate ``nargs``/``const`` and defer to ``argparse.Action``.

        :raises ValueError: if ``nargs`` is 0, or if ``const`` is supplied
                            while ``nargs`` is not ``"?"``.
        """
        if nargs == 0:
            raise ValueError("nargs for store_dict actions must be > 0")
        if const is not None and nargs != "?":
            raise ValueError('nargs must be "?" to supply const')
        super().__init__(
            option_strings=option_strings,
            dest=dest,
            nargs=nargs,
            const=const,
            default=default,
            type=type,
            choices=choices,
            required=required,
            help=help,
            metavar=metavar,
        )

    def __call__(self, parser, namespace, values, option_string=None):
        """Merge one ``key=val`` string into the dict stored at ``dest``.

        :raises argparse.ArgumentError: if ``values`` contains no ``=``.
        """
        if getattr(namespace, self.dest, None) is None:
            setattr(namespace, self.dest, {})
        # Only doing a copy here because that's what _AppendAction does
        items = copy.copy(getattr(namespace, self.dest))
        key, separator, val = values.partition("=")
        if not separator:
            # Without this check a bare "-o foo" would surface as a raw
            # tuple-unpacking ValueError traceback; argparse reports an
            # ArgumentError as a regular usage error instead.
            raise argparse.ArgumentError(
                self, f"expected KEY=VALUE, got {values!r}"
            )
        if yaml.version_info < (0, 15):
            items[key] = yaml.safe_load(val)
        else:
            yml = yaml.YAML(typ="safe", pure=True)
            items[key] = yml.load(val)
        setattr(namespace, self.dest, items)


def option_alias(option):
    """Returns a function that will create option=val for _StoreDictAction.

    The returned callable is meant to be used as an argparse ``type`` so
    that a dedicated flag's raw value is rewritten to ``"<option>=<val>"``.
    """

    def _create_key_val_str(val):
        return f"{option}={val}"

    return _create_key_val_str


def add_ackers(parser):
    """Add -a/--ackers to parser; stored as ``topology.acker.executors``."""
    parser.add_argument(
        "-a",
        "--ackers",
        help="Set number of acker executors for your topology. "
        "Defaults to the number of worker nodes in your "
        "Storm environment.",
        type=option_alias("topology.acker.executors"),
        action=_StoreDictAction,
        dest="options",
    )


def add_config(parser):
    """Add --config to parser; the value is opened as a readable file."""
    parser.add_argument(
        "--config", help="Specify path to config.json", type=argparse.FileType("r")
    )


def add_debug(parser):
    """Add -d/--debug to parser; stored as ``topology.debug``.

    The value is optional: a bare ``--debug`` uses the const ``"true"``.
    """
    parser.add_argument(
        "-d",
        "--debug",
        help="Set topology.debug and produce debugging output.",
        type=option_alias("topology.debug"),
        action=_StoreDictAction,
        dest="options",
        const="true",
        nargs="?",
    )


def add_environment(parser):
    """Add -e/--environment option to parser."""
    parser.add_argument(
        "-e",
        "--environment",
        help="The environment to use for the command. "
        "Corresponds to an environment in your "
        '"envs" dictionary in config.json. If you '
        "only have one environment specified, "
        "streamparse will automatically use this.",
    )


def add_name(parser):
    """Add -n/--name option to parser."""
    parser.add_argument(
        "-n",
        "--name",
        help="The name of the topology to act on. If you have "
        'only one topology defined in your "topologies" '
        "directory, streamparse will use it "
        "automatically.",
    )


def add_options(parser):
    """Add -o/--option to parser; each use adds one key=val to ``options``."""
    parser.add_argument(
        "-o",
        "--option",
        dest="options",
        action=_StoreDictAction,
        help="Topology option to pass on to Storm. For example,"
        ' "-o topology.debug=true" is equivalent to '
        '"--debug". May be repeated multiple for multiple'
        " options.",
    )


def add_override_name(parser):
    """Add -N/--override_name option to parser."""
    parser.add_argument(
        "-N",
        "--override_name",
        help="For operations such as creating virtualenvs and "
        "killing/submitting topologies, use this value "
        "instead of NAME. This is useful if you want to "
        "submit the same topology twice without having to "
        "duplicate the topology file.",
    )


def add_overwrite_virtualenv(parser):
    """Add the boolean --overwrite_virtualenv flag to parser."""
    parser.add_argument(
        "--overwrite_virtualenv",
        help="Create the virtualenv even if it already exists."
        " This is useful when you have changed your "
        "virtualenv_flags.",
        action="store_true",
    )


def add_pattern(parser):
    """Add --pattern option to parser."""
    parser.add_argument("--pattern", help="Pattern of log files to operate on.")


def add_pool_size(parser):
    """Add --pool_size option to parser (int, default 10)."""
    parser.add_argument(
        "--pool_size",
        help="Number of simultaneous SSH connections to use when updating "
        "virtualenvs, removing logs, or tailing logs.",
        default=10,
        type=int,
    )


def add_requirements(parser):
    """Add -r/--requirements to parser; accepts zero or more paths."""
    parser.add_argument(
        "-r",
        "--requirements",
        nargs="*",
        help="Path to pip-style requirements file specifying "
        "the dependencies to use for creating the "
        "virtualenv for this topology. If unspecified, "
        "streamparse will look for a file called NAME.txt "
        "in the directory specified by the "
        "virtualenv_specs setting in config.json.",
    )


def add_simple_jar(parser):
    """Add the boolean -s/--simple_jar flag to parser."""
    parser.add_argument(
        "-s",
        "--simple_jar",
        action="store_true",
        help="Instead of creating an Uber-JAR for the "
        "topology, which contains all of its JVM "
        "dependencies, create a simple JAR with just the "
        "code for the project. This is useful when your "
        "project is pure Python and has no JVM "
        "dependencies.",
    )


def add_timeout(parser):
    """Add --timeout option to parser (int milliseconds, default 7000)."""
    parser.add_argument(
        "--timeout",
        type=int,
        default=7000,
        help="Milliseconds to wait for Nimbus to respond. " "(default: %(default)s)",
    )


def add_user(parser, allow_short=False):
    """Add --user option to parser; stored as ``sudo_user`` in ``options``.

    Set allow_short to add -u as well.
    """
    flags = ["--user"]
    if allow_short:
        flags.insert(0, "-u")

    parser.add_argument(
        *flags,
        help="User argument to sudo when creating and deleting virtualenvs.",
        default=None,
        type=option_alias("sudo_user"),
        dest="options",
        action=_StoreDictAction,
    )


def add_wait(parser):
    """Add --wait option to parser (int seconds, default 5)."""
    parser.add_argument(
        "--wait",
        type=int,
        default=5,
        help="Seconds to wait before killing topology. " "(default: %(default)s)",
    )


def add_workers(parser):
    """Add -w/--workers to parser; stored as ``topology.workers``."""
    parser.add_argument(
        "-w",
        "--workers",
        help="Set number of Storm workers for your topology. "
        "Defaults to the number of worker nodes in your "
        "Storm environment.",
        type=option_alias("topology.workers"),
        action=_StoreDictAction,
        dest="options",
    )


# Environment-config keys that are copied verbatim into the resolved options.
VIRTUALENV_OPTIONS = (
    "install_virtualenv",
    "use_virtualenv",
    "virtualenv_flags",
    "virtualenv_root",
    "virtualenv_name",
)


def resolve_options(
    cli_options, env_config, topology_class, topology_name, local_only=False
):
    """Resolve potentially conflicting Storm options from three sources:

    CLI options > Topology options > config.json options

    :param cli_options: Dict of options from the command line, or ``None``.
    :param env_config: Environment configuration dict.  Must contain
                       ``virtualenv_root`` unless ``use_virtualenv`` is set
                       to a false value.
    :param topology_class: Object whose ``config`` mapping holds the
                           topology-level options.
    :param topology_name: Name used to build ``topology.python.path``.
    :param local_only: Whether or not we should talk to Nimbus to get Storm
                       workers and other info.
    :returns: Dict of resolved options.
    """
    storm_options = {}

    # Start with environment options
    storm_options.update(env_config.get("options", {}))

    # Set topology.python.path
    if env_config.get("use_virtualenv", True):
        python_path = "/".join(
            [env_config["virtualenv_root"], topology_name, "bin", "python"]
        )
        # This setting is for information purposes only, and is not actually
        # read by any pystorm or streamparse code.
        storm_options["topology.python.path"] = python_path

    # Set logging options based on environment config.  The nested "log"
    # dict takes precedence over the flat "log_path"/"log_file" keys.
    log_config = env_config.get("log", {})
    log_path = log_config.get("path") or env_config.get("log_path")
    log_file = log_config.get("file") or env_config.get("log_file")
    if log_path:
        storm_options["pystorm.log.path"] = log_path
    if log_file:
        storm_options["pystorm.log.file"] = log_file
    if isinstance(log_config.get("max_bytes"), int):
        storm_options["pystorm.log.max_bytes"] = log_config["max_bytes"]
    if isinstance(log_config.get("backup_count"), int):
        storm_options["pystorm.log.backup_count"] = log_config["backup_count"]
    if isinstance(log_config.get("level"), str):
        storm_options["pystorm.log.level"] = log_config["level"].lower()

    # Make sure virtualenv options are present here
    for venv_option in VIRTUALENV_OPTIONS:
        if venv_option in env_config:
            storm_options[venv_option] = env_config[venv_option]

    # Set sudo_user default to SSH user if we have one
    storm_options["sudo_user"] = env_config.get("user", None)

    # Override options with topology options
    storm_options.update(topology_class.config)

    # Override options with CLI options
    storm_options.update(cli_options or {})

    # Set log level to debug if topology.debug is set
    if storm_options.get("topology.debug", False):
        storm_options["pystorm.log.level"] = "debug"

    # If ackers and executors still aren't set, use number of worker nodes
    if not local_only:
        if not storm_options.get("storm.workers.list"):
            storm_options["storm.workers.list"] = get_storm_workers(env_config)
        elif isinstance(storm_options["storm.workers.list"], str):
            # A comma-separated string is accepted as a list of workers.
            storm_options["storm.workers.list"] = storm_options[
                "storm.workers.list"
            ].split(",")
        num_storm_workers = len(storm_options["storm.workers.list"])
    else:
        storm_options["storm.workers.list"] = []
        num_storm_workers = 1
    if storm_options.get("topology.acker.executors") is None:
        storm_options["topology.acker.executors"] = num_storm_workers
    if storm_options.get("topology.workers") is None:
        storm_options["topology.workers"] = num_storm_workers

    # If sudo_user was not present anywhere, set it to "root".  The key
    # itself always exists by now (it is seeded from env_config above,
    # possibly as None), so test the value instead of using setdefault,
    # which would never apply.
    if storm_options.get("sudo_user") is None:
        storm_options["sudo_user"] = "root"

    return storm_options


def warn_about_deprecated_user(user, func_name):
    """Emit a DeprecationWarning if a non-None ``user`` argument was given.

    :param user: Value of the deprecated ``user`` argument.
    :param func_name: Name of the function that received it; interpolated
                      into the warning message.
    """
    if user is None:
        return
    warnings.warn(
        (
            "The 'user' argument to '{}' will be removed in the next "
            "major release of streamparse. Provide the 'sudo_user' key to"
            " the 'options' dict argument instead."
        ).format(func_name),
        DeprecationWarning,
    )