1""" 2Submit a Storm topology to Nimbus. 3""" 4 5import importlib 6import os 7import sys 8import time 9from itertools import chain 10 11import simplejson as json 12from fabric.api import env 13from pkg_resources import parse_version 14 15from ..dsl.component import JavaComponentSpec 16from ..thrift import ShellComponent, SubmitOptions, TopologyInitialStatus 17 18from ..util import ( 19 activate_env, 20 get_config, 21 get_env_config, 22 get_nimbus_client, 23 get_topology_definition, 24 get_topology_from_file, 25 nimbus_storm_version, 26 set_topology_serializer, 27 ssh_tunnel, 28 warn, 29) 30from .common import ( 31 add_ackers, 32 add_config, 33 add_debug, 34 add_environment, 35 add_name, 36 add_options, 37 add_override_name, 38 add_overwrite_virtualenv, 39 add_pool_size, 40 add_requirements, 41 add_timeout, 42 add_user, 43 add_wait, 44 add_workers, 45 resolve_options, 46 warn_about_deprecated_user, 47) 48from .jar import jar_for_deploy 49from .kill import _kill_topology 50from .list import _list_topologies 51from .update_virtualenv import create_or_update_virtualenvs 52 53 54THRIFT_CHUNK_SIZE = 307200 55 56 57def get_user_tasks(): 58 """Get tasks defined in a user's tasks.py and fabfile.py file which is 59 assumed to be in the current working directory. 60 61 :returns: a `tuple` (invoke_tasks, fabric_tasks) 62 """ 63 sys.path.insert(0, os.getcwd()) 64 try: 65 user_invoke = importlib.import_module("tasks") 66 except ImportError: 67 user_invoke = None 68 try: 69 user_fabric = importlib.import_module("fabfile") 70 except ImportError: 71 user_fabric = None 72 return user_invoke, user_fabric 73 74 75def is_safe_to_submit(topology_name, nimbus_client): 76 """Is topology not in list of current topologies?""" 77 topologies = _list_topologies(nimbus_client) 78 safe = not any(topology.name == topology_name for topology in topologies) 79 return safe 80 81 82def _kill_existing_topology(topology_name, force, wait, nimbus_client): 83 if force and not is_safe_to_submit(topology_name, nimbus_client): 84 print(f'Killing current "{topology_name}" topology.') 85 sys.stdout.flush() 86 _kill_topology(topology_name, nimbus_client, wait=wait) 87 while not is_safe_to_submit(topology_name, nimbus_client): 88 print(f"Waiting for topology {topology_name} to quit...") 89 sys.stdout.flush() 90 time.sleep(0.5) 91 print("Killed.") 92 sys.stdout.flush() 93 94 95def _submit_topology( 96 topology_name, 97 topology_class, 98 remote_jar_path, 99 config, 100 env_config, 101 nimbus_client, 102 options=None, 103 active=True, 104): 105 if options.get("pystorm.log.path"): 106 print(f"Routing Python logging to {options['pystorm.log.path']}.") 107 sys.stdout.flush() 108 109 set_topology_serializer(env_config, config, topology_class) 110 111 # Check if topology name is okay on Storm versions that support that 112 if nimbus_storm_version(nimbus_client) >= parse_version("1.1.0"): 113 if not nimbus_client.isTopologyNameAllowed(topology_name): 114 raise ValueError( 115 f"Nimbus says {topology_name} is an invalid name for a Storm topology." 116 ) 117 118 print(f"Submitting {topology_name} topology to nimbus...", end="") 119 sys.stdout.flush() 120 initial_status = ( 121 TopologyInitialStatus.ACTIVE if active else TopologyInitialStatus.INACTIVE 122 ) 123 submit_options = SubmitOptions(initial_status=initial_status) 124 nimbus_client.submitTopologyWithOpts( 125 name=topology_name, 126 uploadedJarLocation=remote_jar_path, 127 jsonConf=json.dumps(options), 128 topology=topology_class.thrift_topology, 129 options=submit_options, 130 ) 131 print("done") 132 133 134def _pre_submit_hooks(topology_name, env_name, env_config, options): 135 """Pre-submit hooks for invoke and fabric.""" 136 user_invoke, user_fabric = get_user_tasks() 137 pre_submit_invoke = getattr(user_invoke, "pre_submit", None) 138 if callable(pre_submit_invoke): 139 pre_submit_invoke(topology_name, env_name, env_config, options) 140 pre_submit_fabric = getattr(user_fabric, "pre_submit", None) 141 if callable(pre_submit_fabric): 142 pre_submit_fabric(topology_name, env_name, env_config, options) 143 144 145def _post_submit_hooks(topology_name, env_name, env_config, options): 146 """Post-submit hooks for invoke and fabric.""" 147 user_invoke, user_fabric = get_user_tasks() 148 post_submit_invoke = getattr(user_invoke, "post_submit", None) 149 if callable(post_submit_invoke): 150 post_submit_invoke(topology_name, env_name, env_config, options) 151 post_submit_fabric = getattr(user_fabric, "post_submit", None) 152 if callable(post_submit_fabric): 153 post_submit_fabric(topology_name, env_name, env_config, options) 154 155 156def _upload_jar(nimbus_client, local_path): 157 upload_location = nimbus_client.beginFileUpload() 158 print( 159 f"Uploading topology jar {local_path} to assigned location: {upload_location}" 160 ) 161 total_bytes = os.path.getsize(local_path) 162 bytes_uploaded = 0 163 with open(local_path, "rb") as local_jar: 164 while True: 165 print(f"Uploaded {bytes_uploaded}/{total_bytes} bytes", end="\r") 166 sys.stdout.flush() 167 curr_chunk = local_jar.read(THRIFT_CHUNK_SIZE) 168 if not curr_chunk: 169 break 170 nimbus_client.uploadChunk(upload_location, curr_chunk) 171 bytes_uploaded += len(curr_chunk) 172 nimbus_client.finishFileUpload(upload_location) 173 print(f"Uploaded {bytes_uploaded}/{total_bytes} bytes") 174 sys.stdout.flush() 175 return upload_location 176 177 178def submit_topology( 179 name=None, 180 env_name=None, 181 options=None, 182 force=False, 183 wait=None, 184 simple_jar=True, 185 override_name=None, 186 requirements_paths=None, 187 local_jar_path=None, 188 remote_jar_path=None, 189 timeout=None, 190 config_file=None, 191 overwrite_virtualenv=False, 192 user=None, 193 active=True, 194): 195 """Submit a topology to a remote Storm cluster.""" 196 warn_about_deprecated_user(user, "submit_topology") 197 config = get_config(config_file=config_file) 198 name, topology_file = get_topology_definition(name, config_file=config_file) 199 env_name, env_config = get_env_config(env_name, config_file=config_file) 200 topology_class = get_topology_from_file(topology_file) 201 if override_name is None: 202 override_name = name 203 if remote_jar_path and local_jar_path: 204 warn("Ignoring local_jar_path because given remote_jar_path") 205 local_jar_path = None 206 207 # Setup the fabric env dictionary 208 activate_env(env_name) 209 210 # Handle option conflicts 211 options = resolve_options(options, env_config, topology_class, override_name) 212 213 # Check if we need to maintain virtualenv during the process 214 use_venv = options.get("use_virtualenv", True) 215 216 # Check if user wants to install virtualenv during the process 217 install_venv = options.get("install_virtualenv", use_venv) 218 219 # Run pre_submit actions provided by project 220 _pre_submit_hooks(override_name, env_name, env_config, options) 221 222 # If using virtualenv, set it up, and make sure paths are correct in specs 223 if use_venv: 224 virtualenv_name = options.get("virtualenv_name", override_name) 225 if install_venv: 226 create_or_update_virtualenvs( 227 env_name, 228 name, 229 options, 230 virtualenv_name=virtualenv_name, 231 requirements_paths=requirements_paths, 232 config_file=config_file, 233 overwrite_virtualenv=overwrite_virtualenv, 234 user=user, 235 ) 236 streamparse_run_path = "/".join( 237 [env.virtualenv_root, virtualenv_name, "bin", "streamparse_run"] 238 ) 239 # Update python paths in bolts 240 for thrift_bolt in topology_class.thrift_bolts.values(): 241 inner_shell = thrift_bolt.bolt_object.shell 242 if isinstance(inner_shell, ShellComponent): 243 if "streamparse_run" in inner_shell.execution_command: 244 inner_shell.execution_command = streamparse_run_path 245 # Update python paths in spouts 246 for thrift_spout in topology_class.thrift_spouts.values(): 247 inner_shell = thrift_spout.spout_object.shell 248 if isinstance(inner_shell, ShellComponent): 249 if "streamparse_run" in inner_shell.execution_command: 250 inner_shell.execution_command = streamparse_run_path 251 252 # In case we're overriding things, let's save the original name 253 options["topology.original_name"] = name 254 255 # Set parallelism based on env_name if necessary 256 for thrift_component in chain( 257 topology_class.thrift_bolts.values(), topology_class.thrift_spouts.values() 258 ): 259 par_hint = thrift_component.common.parallelism_hint 260 if isinstance(par_hint, dict): 261 thrift_component.common.parallelism_hint = par_hint.get(env_name) 262 263 if local_jar_path: 264 print(f"Using prebuilt JAR: {local_jar_path}") 265 elif not remote_jar_path: 266 # Check topology for JVM stuff to see if we need to create uber-jar 267 if simple_jar: 268 simple_jar = not any( 269 isinstance(spec, JavaComponentSpec) for spec in topology_class.specs 270 ) 271 272 # Prepare a JAR that doesn't have Storm dependencies packaged 273 local_jar_path = jar_for_deploy(simple_jar=simple_jar) 274 275 if name != override_name: 276 print(f'Deploying "{name}" topology with name "{override_name}"...') 277 else: 278 print(f'Deploying "{name}" topology...') 279 sys.stdout.flush() 280 # Use ssh tunnel with Nimbus if use_ssh_for_nimbus is unspecified or True 281 with ssh_tunnel(env_config) as (host, port): 282 nimbus_client = get_nimbus_client( 283 env_config, host=host, port=port, timeout=timeout 284 ) 285 if remote_jar_path: 286 print( 287 f"Reusing remote JAR on Nimbus server at path: {remote_jar_path}" 288 ) 289 else: 290 remote_jar_path = _upload_jar(nimbus_client, local_jar_path) 291 _kill_existing_topology(override_name, force, wait, nimbus_client) 292 _submit_topology( 293 override_name, 294 topology_class, 295 remote_jar_path, 296 config, 297 env_config, 298 nimbus_client, 299 options=options, 300 active=active, 301 ) 302 _post_submit_hooks(override_name, env_name, env_config, options) 303 304 305def subparser_hook(subparsers): 306 """ Hook to add subparser for this command. """ 307 subparser = subparsers.add_parser("submit", description=__doc__, help=main.__doc__) 308 subparser.set_defaults(func=main) 309 add_ackers(subparser) 310 add_config(subparser) 311 add_debug(subparser) 312 add_environment(subparser) 313 subparser.add_argument( 314 "-f", 315 "--force", 316 action="store_true", 317 help="Force a topology to submit by killing any " 318 "currently running topologies with the same " 319 "name.", 320 ) 321 subparser.add_argument( 322 "-i", 323 "--inactive", 324 help="Submit topology as inactive instead of active." 325 " This is useful if you are migrating the " 326 "topology to a new environment and already " 327 "have it running actively in an older one.", 328 action="store_false", 329 dest="active", 330 ) 331 subparser.add_argument( 332 "-j", 333 "--local_jar_path", 334 help="Path to a prebuilt JAR to upload to Nimbus. " 335 "This is useful when you have multiple " 336 "topologies that all run out of the same JAR, " 337 "or you have manually created the JAR.", 338 ) 339 add_name(subparser) 340 add_options(subparser) 341 add_override_name(subparser) 342 add_overwrite_virtualenv(subparser) 343 add_pool_size(subparser) 344 add_requirements(subparser) 345 subparser.add_argument( 346 "-R", 347 "--remote_jar_path", 348 help="Path to a prebuilt JAR that already exists on " 349 "your Nimbus server. This is useful when you " 350 "have multiple topologies that all run out of " 351 "the same JAR, and you do not want to upload it" 352 " multiple times.", 353 ) 354 add_timeout(subparser) 355 subparser.add_argument( 356 "-u", 357 "--uber_jar", 358 help="Build an Uber-JAR even if you have no Java " 359 "components in your topology. Useful if you " 360 "are providing your own seriailzer class.", 361 dest="simple_jar", 362 action="store_false", 363 ) 364 add_user(subparser) 365 add_wait(subparser) 366 add_workers(subparser) 367 368 369def main(args): 370 """ Submit a Storm topology to Nimbus. """ 371 env.pool_size = args.pool_size 372 submit_topology( 373 name=args.name, 374 env_name=args.environment, 375 options=args.options, 376 force=args.force, 377 wait=args.wait, 378 simple_jar=args.simple_jar, 379 override_name=args.override_name, 380 requirements_paths=args.requirements, 381 local_jar_path=args.local_jar_path, 382 remote_jar_path=args.remote_jar_path, 383 timeout=args.timeout, 384 config_file=args.config, 385 overwrite_virtualenv=args.overwrite_virtualenv, 386 active=args.active, 387 ) 388