1"""
2Remove all logs from Storm workers for the specified Storm topology.
3"""
4
5from fabric.api import env, execute, parallel
6
7from .common import (
8    add_config,
9    add_environment,
10    add_name,
11    add_override_name,
12    add_pattern,
13    add_pool_size,
14    add_user,
15    resolve_options,
16    warn_about_deprecated_user,
17)
18from ..util import (
19    activate_env,
20    get_env_config,
21    get_topology_definition,
22    get_topology_from_file,
23    get_logfiles_cmd,
24    run_cmd,
25)
26
27
@parallel
def _remove_logs(
    topology_name, pattern, remove_worker_logs, user, remove_all_artifacts
):
    """
    Delete the matching log files on a single host.

    This is the Fabric task that ``remove_logs`` dispatches; the ``@parallel``
    decorator lets Fabric run it on the target hosts concurrently.

    :param topology_name: topology name forwarded to ``get_logfiles_cmd``.
    :param pattern: file pattern forwarded to ``get_logfiles_cmd``.
    :param remove_worker_logs: forwarded as ``include_worker_logs``.
    :param user: passed to ``run_cmd`` as its second positional argument
                 (presumably the account the command runs as -- confirm
                 against ``run_cmd``).
    :param remove_all_artifacts: forwarded as ``include_all_artifacts``.
    """
    list_files_cmd = get_logfiles_cmd(
        include_all_artifacts=remove_all_artifacts,
        include_worker_logs=remove_worker_logs,
        pattern=pattern,
        topology_name=topology_name,
    )
    # Feed every path printed by the listing command to ``rm -f``.
    delete_cmd = list_files_cmd + " | xargs rm -f"
    # NOTE(review): warn_only=True presumably keeps a failing command from
    # aborting the whole run -- confirm against ``run_cmd``.
    run_cmd(delete_cmd, user, warn_only=True)
43
44
def remove_logs(
    topology_name=None,
    env_name=None,
    pattern=None,
    remove_worker_logs=False,
    user=None,
    override_name=None,
    remove_all_artifacts=False,
    options=None,
    config_file=None,
):
    """Remove a topology's log files from the Storm worker hosts.

    Resolves the topology and environment, activates the environment, and
    then runs ``_remove_logs`` through Fabric on every host listed in
    ``env.storm_workers`` (presumably populated by ``activate_env`` --
    confirm).

    :param topology_name: name of the topology; ignored when
                          ``override_name`` is truthy.
    :param env_name: environment name handed to ``get_env_config``.
    :param pattern: file pattern forwarded to ``_remove_logs``.
    :param remove_worker_logs: forwarded to ``_remove_logs``.
    :param user: deprecated; when falsy, ``storm_options["sudo_user"]`` is
                 used instead.
    :param override_name: takes precedence over ``topology_name`` when
                          looking up the topology definition.
    :param remove_all_artifacts: forwarded to ``_remove_logs``.
    :param options: extra options handed to ``resolve_options``.
    :param config_file: configuration file path forwarded to the lookup
                        helpers and ``activate_env``.
    """
    warn_about_deprecated_user(user, "remove_logs")

    # Work out which topology and environment we are operating on.
    requested_name = override_name or topology_name
    resolved_name, topology_file = get_topology_definition(
        requested_name, config_file=config_file
    )
    topology_class = get_topology_from_file(topology_file)
    resolved_env, env_config = get_env_config(env_name, config_file=config_file)

    storm_options = resolve_options(
        options, env_config, topology_class, resolved_name
    )
    activate_env(resolved_env, storm_options, config_file=config_file)

    # TODO: Remove "user" in next major version
    run_as = user or storm_options["sudo_user"]

    execute(
        _remove_logs,
        resolved_name,
        pattern,
        remove_worker_logs,
        run_as,
        remove_all_artifacts,
        hosts=env.storm_workers,
    )
75
76
def subparser_hook(subparsers):
    """Register the ``remove_logs`` sub-command on ``subparsers``.

    The module docstring becomes the parser description, ``main``'s
    docstring becomes its help text, and ``main`` is installed as the
    ``func`` default for the sub-command.

    :param subparsers: object exposing ``add_parser`` (e.g. the result of
                       ``ArgumentParser.add_subparsers()``).
    """
    parser = subparsers.add_parser(
        "remove_logs", description=__doc__, help=main.__doc__
    )
    parser.set_defaults(func=main)

    all_artifacts_help = (
        "Remove not only topology-specific logs, but also any other "
        "files for the topology in its workers-artifacts subdirectories."
    )
    parser.add_argument(
        "-A", "--remove_all_artifacts", help=all_artifacts_help, action="store_true"
    )

    # Shared options; registration order is kept as-is because it determines
    # the order in which they are added to the parser.
    shared_options = (
        add_config,
        add_environment,
        add_name,
        add_override_name,
        add_pattern,
        add_pool_size,
    )
    for register in shared_options:
        register(parser)
    add_user(parser, allow_short=True)

    worker_logs_help = (
        "Remove not only topology-specific logs, but also worker "
        "logs that may be shared between topologies."
    )
    parser.add_argument(
        "-w", "--remove_worker_logs", help=worker_logs_help, action="store_true"
    )
106
107
def main(args):
    """ Remove logs from Storm workers. """
    # NOTE: the docstring above is reused verbatim as the sub-command's help
    # text (``help=main.__doc__`` in ``subparser_hook``), so keep it short.

    # Copy the requested pool size from the parsed arguments onto Fabric's
    # ``env`` before any task is dispatched.
    env.pool_size = args.pool_size

    # Translate the parsed command-line namespace into ``remove_logs`` keywords.
    call_kwargs = {
        "topology_name": args.name,
        "env_name": args.environment,
        "pattern": args.pattern,
        "remove_worker_logs": args.remove_worker_logs,
        "options": args.options,
        "override_name": args.override_name,
        "remove_all_artifacts": args.remove_all_artifacts,
        "config_file": args.config,
    }
    remove_logs(**call_kwargs)
121