1"""
2Submit a Storm topology to Nimbus.
3"""
4
5import importlib
6import os
7import sys
8import time
9from itertools import chain
10
11import simplejson as json
12from fabric.api import env
13from pkg_resources import parse_version
14
15from ..dsl.component import JavaComponentSpec
16from ..thrift import ShellComponent, SubmitOptions, TopologyInitialStatus
17
18from ..util import (
19    activate_env,
20    get_config,
21    get_env_config,
22    get_nimbus_client,
23    get_topology_definition,
24    get_topology_from_file,
25    nimbus_storm_version,
26    set_topology_serializer,
27    ssh_tunnel,
28    warn,
29)
30from .common import (
31    add_ackers,
32    add_config,
33    add_debug,
34    add_environment,
35    add_name,
36    add_options,
37    add_override_name,
38    add_overwrite_virtualenv,
39    add_pool_size,
40    add_requirements,
41    add_timeout,
42    add_user,
43    add_wait,
44    add_workers,
45    resolve_options,
46    warn_about_deprecated_user,
47)
48from .jar import jar_for_deploy
49from .kill import _kill_topology
50from .list import _list_topologies
51from .update_virtualenv import create_or_update_virtualenvs
52
53
54THRIFT_CHUNK_SIZE = 307200
55
56
def get_user_tasks():
    """Get tasks defined in a user's tasks.py and fabfile.py files, which are
    assumed to be in the current working directory.

    :returns: a `tuple` of (invoke_tasks, fabric_tasks); either element is
              ``None`` if the corresponding module cannot be imported.
    """
    sys.path.insert(0, os.getcwd())
    try:
        user_invoke = importlib.import_module("tasks")
    except ImportError:
        user_invoke = None
    try:
        user_fabric = importlib.import_module("fabfile")
    except ImportError:
        user_fabric = None
    return user_invoke, user_fabric


def is_safe_to_submit(topology_name, nimbus_client):
    """Check that the given name is not among the currently running topologies."""
    topologies = _list_topologies(nimbus_client)
    safe = not any(topology.name == topology_name for topology in topologies)
    return safe


def _kill_existing_topology(topology_name, force, wait, nimbus_client):
    """If ``force`` is set and a topology with this name is running, kill it
    and block until it has actually gone away."""
    if force and not is_safe_to_submit(topology_name, nimbus_client):
        print(f'Killing current "{topology_name}" topology.')
        sys.stdout.flush()
        _kill_topology(topology_name, nimbus_client, wait=wait)
        while not is_safe_to_submit(topology_name, nimbus_client):
            print(f"Waiting for topology {topology_name} to quit...")
            sys.stdout.flush()
            time.sleep(0.5)
        print("Killed.")
        sys.stdout.flush()


def _submit_topology(
    topology_name,
    topology_class,
    remote_jar_path,
    config,
    env_config,
    nimbus_client,
    options=None,
    active=True,
):
    """Submit the topology to Nimbus via Thrift, optionally starting it inactive."""
    if options is None:
        options = {}
    if options.get("pystorm.log.path"):
        print(f"Routing Python logging to {options['pystorm.log.path']}.")
        sys.stdout.flush()

    set_topology_serializer(env_config, config, topology_class)

    # Validate the topology name on Storm versions (1.1.0+) that support it
    if nimbus_storm_version(nimbus_client) >= parse_version("1.1.0"):
        if not nimbus_client.isTopologyNameAllowed(topology_name):
            raise ValueError(
                f"Nimbus says {topology_name} is an invalid name for a Storm topology."
            )

    print(f"Submitting {topology_name} topology to Nimbus...", end="")
    sys.stdout.flush()
    initial_status = (
        TopologyInitialStatus.ACTIVE if active else TopologyInitialStatus.INACTIVE
    )
    submit_options = SubmitOptions(initial_status=initial_status)
    nimbus_client.submitTopologyWithOpts(
        name=topology_name,
        uploadedJarLocation=remote_jar_path,
        jsonConf=json.dumps(options),
        topology=topology_class.thrift_topology,
        options=submit_options,
    )
    print("done")


def _pre_submit_hooks(topology_name, env_name, env_config, options):
    """Pre-submit hooks for invoke and fabric."""
    user_invoke, user_fabric = get_user_tasks()
    pre_submit_invoke = getattr(user_invoke, "pre_submit", None)
    if callable(pre_submit_invoke):
        pre_submit_invoke(topology_name, env_name, env_config, options)
    pre_submit_fabric = getattr(user_fabric, "pre_submit", None)
    if callable(pre_submit_fabric):
        pre_submit_fabric(topology_name, env_name, env_config, options)


def _post_submit_hooks(topology_name, env_name, env_config, options):
    """Post-submit hooks for invoke and fabric."""
    user_invoke, user_fabric = get_user_tasks()
    post_submit_invoke = getattr(user_invoke, "post_submit", None)
    if callable(post_submit_invoke):
        post_submit_invoke(topology_name, env_name, env_config, options)
    post_submit_fabric = getattr(user_fabric, "post_submit", None)
    if callable(post_submit_fabric):
        post_submit_fabric(topology_name, env_name, env_config, options)


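# A minimal (hypothetical) ``tasks.py`` in the project root that
# ``get_user_tasks`` would discover; the hook signatures match the calls made
# in ``_pre_submit_hooks``/``_post_submit_hooks`` above:
#
#     def pre_submit(topology_name, env_name, env_config, options):
#         print(f"About to submit {topology_name} to {env_name}")
#
#     def post_submit(topology_name, env_name, env_config, options):
#         print(f"Finished submitting {topology_name}")

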
def _upload_jar(nimbus_client, local_path):
    """Upload the topology JAR to Nimbus in THRIFT_CHUNK_SIZE chunks.

    :returns: the remote upload location assigned by Nimbus.
    """
    upload_location = nimbus_client.beginFileUpload()
    print(
        f"Uploading topology jar {local_path} to assigned location: {upload_location}"
    )
    total_bytes = os.path.getsize(local_path)
    bytes_uploaded = 0
    with open(local_path, "rb") as local_jar:
        while True:
            print(f"Uploaded {bytes_uploaded}/{total_bytes} bytes", end="\r")
            sys.stdout.flush()
            curr_chunk = local_jar.read(THRIFT_CHUNK_SIZE)
            if not curr_chunk:
                break
            nimbus_client.uploadChunk(upload_location, curr_chunk)
            bytes_uploaded += len(curr_chunk)
        nimbus_client.finishFileUpload(upload_location)
        print(f"Uploaded {bytes_uploaded}/{total_bytes} bytes")
        sys.stdout.flush()
    return upload_location


def submit_topology(
    name=None,
    env_name=None,
    options=None,
    force=False,
    wait=None,
    simple_jar=True,
    override_name=None,
    requirements_paths=None,
    local_jar_path=None,
    remote_jar_path=None,
    timeout=None,
    config_file=None,
    overwrite_virtualenv=False,
    user=None,
    active=True,
):
    """Submit a topology to a remote Storm cluster."""
    warn_about_deprecated_user(user, "submit_topology")
    config = get_config(config_file=config_file)
    name, topology_file = get_topology_definition(name, config_file=config_file)
    env_name, env_config = get_env_config(env_name, config_file=config_file)
    topology_class = get_topology_from_file(topology_file)
    if override_name is None:
        override_name = name
    if remote_jar_path and local_jar_path:
        warn("Ignoring local_jar_path because remote_jar_path was given")
        local_jar_path = None

    # Set up the Fabric env dictionary
    activate_env(env_name)

    # Resolve conflicts between the different sources of options
    options = resolve_options(options, env_config, topology_class, override_name)

    # Check whether a virtualenv should be used at all
    use_venv = options.get("use_virtualenv", True)

    # Check whether the virtualenv should be created or updated during submission
    install_venv = options.get("install_virtualenv", use_venv)

    # Run pre_submit actions provided by the project
    _pre_submit_hooks(override_name, env_name, env_config, options)

    # If using virtualenv, set it up and make sure paths are correct in specs
    if use_venv:
        virtualenv_name = options.get("virtualenv_name", override_name)
        if install_venv:
            create_or_update_virtualenvs(
                env_name,
                name,
                options,
                virtualenv_name=virtualenv_name,
                requirements_paths=requirements_paths,
                config_file=config_file,
                overwrite_virtualenv=overwrite_virtualenv,
                user=user,
            )
        streamparse_run_path = "/".join(
            [env.virtualenv_root, virtualenv_name, "bin", "streamparse_run"]
        )
        # Update Python paths in bolts
        for thrift_bolt in topology_class.thrift_bolts.values():
            inner_shell = thrift_bolt.bolt_object.shell
            if isinstance(inner_shell, ShellComponent):
                if "streamparse_run" in inner_shell.execution_command:
                    inner_shell.execution_command = streamparse_run_path
        # Update Python paths in spouts
        for thrift_spout in topology_class.thrift_spouts.values():
            inner_shell = thrift_spout.spout_object.shell
            if isinstance(inner_shell, ShellComponent):
                if "streamparse_run" in inner_shell.execution_command:
                    inner_shell.execution_command = streamparse_run_path

    # Save the original name in case we are overriding it
    options["topology.original_name"] = name

    # Resolve dict-valued parallelism hints, which map env names to values
    for thrift_component in chain(
        topology_class.thrift_bolts.values(), topology_class.thrift_spouts.values()
    ):
        par_hint = thrift_component.common.parallelism_hint
        if isinstance(par_hint, dict):
            thrift_component.common.parallelism_hint = par_hint.get(env_name)

    if local_jar_path:
        print(f"Using prebuilt JAR: {local_jar_path}")
    elif not remote_jar_path:
        # Check the topology for JVM components to see if we need an uber-JAR
        if simple_jar:
            simple_jar = not any(
                isinstance(spec, JavaComponentSpec) for spec in topology_class.specs
            )

        # Prepare a JAR that does not have Storm dependencies packaged
        local_jar_path = jar_for_deploy(simple_jar=simple_jar)

    if name != override_name:
        print(f'Deploying "{name}" topology with name "{override_name}"...')
    else:
        print(f'Deploying "{name}" topology...')
    sys.stdout.flush()
    # Use an SSH tunnel to Nimbus unless use_ssh_for_nimbus is set to False
    with ssh_tunnel(env_config) as (host, port):
        nimbus_client = get_nimbus_client(
            env_config, host=host, port=port, timeout=timeout
        )
        if remote_jar_path:
            print(f"Reusing remote JAR on Nimbus server at path: {remote_jar_path}")
        else:
            remote_jar_path = _upload_jar(nimbus_client, local_jar_path)
        _kill_existing_topology(override_name, force, wait, nimbus_client)
        _submit_topology(
            override_name,
            topology_class,
            remote_jar_path,
            config,
            env_config,
            nimbus_client,
            options=options,
            active=active,
        )
    _post_submit_hooks(override_name, env_name, env_config, options)


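# A minimal programmatic invocation of the above (illustrative values; when
# omitted, ``name`` and ``env_name`` are assumed to fall back to the defaults
# resolved by ``get_topology_definition``/``get_env_config``):
#
#     submit_topology(name="wordcount", env_name="prod", force=True)

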
def subparser_hook(subparsers):
    """ Hook to add subparser for this command. """
    subparser = subparsers.add_parser("submit", description=__doc__, help=main.__doc__)
    subparser.set_defaults(func=main)
    add_ackers(subparser)
    add_config(subparser)
    add_debug(subparser)
    add_environment(subparser)
    subparser.add_argument(
        "-f",
        "--force",
        action="store_true",
        help="Force a topology to submit by killing any "
        "currently running topologies with the same "
        "name.",
    )
    subparser.add_argument(
        "-i",
        "--inactive",
        help="Submit topology as inactive instead of active."
        " This is useful if you are migrating the "
        "topology to a new environment and already "
        "have it running actively in an older one.",
        action="store_false",
        dest="active",
    )
    subparser.add_argument(
        "-j",
        "--local_jar_path",
        help="Path to a prebuilt JAR to upload to Nimbus. "
        "This is useful when you have multiple "
        "topologies that all run out of the same JAR, "
        "or you have manually created the JAR.",
    )
    add_name(subparser)
    add_options(subparser)
    add_override_name(subparser)
    add_overwrite_virtualenv(subparser)
    add_pool_size(subparser)
    add_requirements(subparser)
    subparser.add_argument(
        "-R",
        "--remote_jar_path",
        help="Path to a prebuilt JAR that already exists on "
        "your Nimbus server. This is useful when you "
        "have multiple topologies that all run out of "
        "the same JAR, and you do not want to upload it"
        " multiple times.",
    )
    add_timeout(subparser)
    subparser.add_argument(
        "-u",
        "--uber_jar",
        help="Build an uber-JAR even if you have no Java "
        "components in your topology. Useful if you "
        "are providing your own serializer class.",
        dest="simple_jar",
        action="store_false",
    )
    add_user(subparser)
    add_wait(subparser)
    add_workers(subparser)


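# Example CLI invocation (hypothetical topology/environment names; ``--force``
# is registered above, while ``--name``/``--environment`` are assumed to be
# the flags added by ``add_name``/``add_environment``):
#
#     sparse submit --name wordcount --environment prod --force

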
def main(args):
    """ Submit a Storm topology to Nimbus. """
    env.pool_size = args.pool_size
    submit_topology(
        name=args.name,
        env_name=args.environment,
        options=args.options,
        force=args.force,
        wait=args.wait,
        simple_jar=args.simple_jar,
        override_name=args.override_name,
        requirements_paths=args.requirements,
        local_jar_path=args.local_jar_path,
        remote_jar_path=args.remote_jar_path,
        timeout=args.timeout,
        config_file=args.config,
        overwrite_virtualenv=args.overwrite_virtualenv,
        active=args.active,
    )