"""
Remove all logs from Storm workers for the specified Storm topology.
"""

from fabric.api import env, execute, parallel

from .common import (
    add_config,
    add_environment,
    add_name,
    add_override_name,
    add_pattern,
    add_pool_size,
    add_user,
    resolve_options,
    warn_about_deprecated_user,
)
from ..util import (
    activate_env,
    get_env_config,
    get_topology_definition,
    get_topology_from_file,
    get_logfiles_cmd,
    run_cmd,
)


@parallel
def _remove_logs(
    topology_name, pattern, remove_worker_logs, user, remove_all_artifacts
):
    """
    Fabric task that deletes the selected log files.

    It is dispatched by ``remove_logs`` through ``execute`` and is marked
    ``@parallel``.  The file-listing command produced by
    ``get_logfiles_cmd`` is piped into ``xargs rm -f`` and handed to
    ``run_cmd`` together with ``user`` and ``warn_only=True``.
    """
    listing_cmd = get_logfiles_cmd(
        topology_name=topology_name,
        pattern=pattern,
        include_worker_logs=remove_worker_logs,
        include_all_artifacts=remove_all_artifacts,
    )
    # Whatever paths the listing command prints become arguments of rm.
    delete_cmd = listing_cmd + " | xargs rm -f"
    run_cmd(delete_cmd, user, warn_only=True)


def remove_logs(
    topology_name=None,
    env_name=None,
    pattern=None,
    remove_worker_logs=False,
    user=None,
    override_name=None,
    remove_all_artifacts=False,
    options=None,
    config_file=None,
):
    """
    Remove a topology's logs from the Storm workers of an environment.

    Steps, in order: emit the deprecation warning for ``user``, look up the
    topology definition (``override_name`` wins over ``topology_name`` when
    it is truthy), load the topology class and the environment config,
    resolve the Storm options, activate the environment, and finally run
    ``_remove_logs`` on every host in ``env.storm_workers``.

    When ``user`` is falsy, ``storm_options["sudo_user"]`` is passed to the
    task instead.
    """
    warn_about_deprecated_user(user, "remove_logs")

    lookup_name = override_name or topology_name
    topology_name, topology_file = get_topology_definition(
        lookup_name, config_file=config_file
    )
    topology_class = get_topology_from_file(topology_file)

    env_name, env_config = get_env_config(env_name, config_file=config_file)
    storm_options = resolve_options(
        options, env_config, topology_class, topology_name
    )
    activate_env(env_name, storm_options, config_file=config_file)

    # TODO: Remove "user" in next major version
    effective_user = user or storm_options["sudo_user"]
    execute(
        _remove_logs,
        topology_name,
        pattern,
        remove_worker_logs,
        effective_user,
        remove_all_artifacts,
        hosts=env.storm_workers,
    )


def subparser_hook(subparsers):
    """Register the ``remove_logs`` sub-command on ``subparsers``."""
    artifacts_help = (
        "Remove not only topology-specific logs, but "
        "also any other files for the topology in its "
        "workers-artifacts subdirectories."
    )
    worker_logs_help = (
        "Remove not only topology-specific logs, but "
        "also worker logs that may be shared between "
        "topologies."
    )

    subparser = subparsers.add_parser(
        "remove_logs", description=__doc__, help=main.__doc__
    )
    subparser.set_defaults(func=main)

    subparser.add_argument(
        "-A",
        "--remove_all_artifacts",
        help=artifacts_help,
        action="store_true",
    )

    # Shared options, registered in the same order as before so the
    # generated help output is unchanged.
    shared_adders = (
        add_config,
        add_environment,
        add_name,
        add_override_name,
        add_pattern,
        add_pool_size,
    )
    for add_shared_option in shared_adders:
        add_shared_option(subparser)
    add_user(subparser, allow_short=True)

    subparser.add_argument(
        "-w",
        "--remove_worker_logs",
        help=worker_logs_help,
        action="store_true",
    )


def main(args):
    """ Remove logs from Storm workers. """
    env.pool_size = args.pool_size

    # Map the parsed command-line namespace onto remove_logs() keywords.
    call_kwargs = {
        "topology_name": args.name,
        "env_name": args.environment,
        "pattern": args.pattern,
        "remove_worker_logs": args.remove_worker_logs,
        "options": args.options,
        "override_name": args.override_name,
        "remove_all_artifacts": args.remove_all_artifacts,
        "config_file": args.config,
    }
    remove_logs(**call_kwargs)