1"""
2Functions for adding common CLI arguments to argparse sub-commands.
3"""
4import argparse
5import copy
6import warnings
7
8from ruamel import yaml
9
10from streamparse.util import get_storm_workers
11
12
class _StoreDictAction(argparse.Action):
    """Action for storing key=val option strings as a single dict.

    Each occurrence of an option using this action contributes ``key=val``
    pair(s) to the dict stored at ``dest``; a later pair with the same key
    overwrites the earlier one.  The ``val`` part is parsed as YAML with the
    safe loader, so e.g. ``true`` is stored as ``True`` and ``3`` as ``3``.
    """

    def __init__(
        self,
        option_strings,
        dest,
        nargs=None,
        const=None,
        default=None,
        type=None,
        choices=None,
        required=False,
        help=None,
        metavar=None,
    ):
        """Validate ``nargs``/``const`` and defer to ``argparse.Action``.

        :raises ValueError: if ``nargs`` is 0, or if ``const`` is given
                            without ``nargs="?"``.
        """
        if nargs == 0:
            raise ValueError("nargs for store_dict actions must be > 0")
        if const is not None and nargs != "?":
            raise ValueError('nargs must be "?" to supply const')
        super().__init__(
            option_strings=option_strings,
            dest=dest,
            nargs=nargs,
            const=const,
            default=default,
            type=type,
            choices=choices,
            required=required,
            help=help,
            metavar=metavar,
        )

    def __call__(self, parser, namespace, values, option_string=None):
        """Merge the parsed ``key=val`` pair(s) into the dict at ``self.dest``.

        :param values: a single ``key=val`` string, or a list of them when
                       the option was declared with a list-producing
                       ``nargs``.
        :raises argparse.ArgumentError: if a value is not a ``key=val``
            string.  Raising this (rather than letting the unpacking fail)
            lets argparse report a normal usage error instead of a traceback.
        """
        existing = getattr(namespace, self.dest, None)
        # Only doing a copy here because that's what _AppendAction does
        items = {} if existing is None else copy.copy(existing)
        pairs = values if isinstance(values, (list, tuple)) else [values]
        for pair in pairs:
            # ``pair`` is None for nargs="?" without const and no value given.
            if not isinstance(pair, str) or "=" not in pair:
                raise argparse.ArgumentError(
                    self, f"expected KEY=VALUE, got {pair!r}"
                )
            key, val = pair.split("=", 1)
            items[key] = self._parse_value(val)
        setattr(namespace, self.dest, items)

    @staticmethod
    def _parse_value(text):
        """Parse ``text`` as YAML using the safe loader.

        ruamel.yaml before 0.15 only offers the module-level ``safe_load``;
        newer versions use the ``YAML`` object API.
        """
        if yaml.version_info < (0, 15):
            return yaml.safe_load(text)
        return yaml.YAML(typ="safe", pure=True).load(text)
58
59
def option_alias(option):
    """Build a converter that turns a bare value into ``option=value``.

    The returned callable is meant to be used as an argparse ``type`` so that
    a dedicated flag can feed ``_StoreDictAction`` the ``key=val`` string it
    expects.

    :param option: the key to put in front of the ``=``.
    :returns: a one-argument function producing ``"<option>=<value>"``.
    """

    def _to_key_val(raw):
        key_val = f"{option}={raw}"
        return key_val

    return _to_key_val
67
68
def add_ackers(parser):
    """Register the ``-a``/``--ackers`` option on ``parser``.

    The value is rewritten to ``topology.acker.executors=<value>`` and stored
    in the ``options`` dict through ``_StoreDictAction``.
    """
    help_text = (
        "Set number of acker executors for your topology. Defaults to the "
        "number of worker nodes in your Storm environment."
    )
    to_key_val = option_alias("topology.acker.executors")
    parser.add_argument(
        "-a",
        "--ackers",
        dest="options",
        action=_StoreDictAction,
        type=to_key_val,
        help=help_text,
    )
81
82
def add_config(parser):
    """Register the ``--config`` option on ``parser``.

    The argument is converted with ``argparse.FileType("r")``, so the parsed
    value is a file object opened for reading rather than a path string.
    """
    config_reader = argparse.FileType("r")
    parser.add_argument(
        "--config",
        type=config_reader,
        help="Specify path to config.json",
    )
88
89
def add_debug(parser):
    """Register the ``-d``/``--debug`` option on ``parser``.

    The option takes an optional value (``nargs="?"``) with ``const="true"``,
    and is stored under the ``topology.debug`` key of the ``options`` dict via
    ``_StoreDictAction``.
    """
    debug_kwargs = {
        "dest": "options",
        "action": _StoreDictAction,
        "type": option_alias("topology.debug"),
        "nargs": "?",
        "const": "true",
        "help": "Set topology.debug and produce debugging output.",
    }
    parser.add_argument("-d", "--debug", **debug_kwargs)
102
103
def add_environment(parser):
    """Register the ``-e``/``--environment`` option on ``parser``.

    The value is stored as a plain string in ``environment`` (``None`` when
    the option is not given).
    """
    description = (
        "The environment to use for the command.  Corresponds to an "
        'environment in your "envs" dictionary in config.json.  If you only '
        "have one environment specified, streamparse will automatically use "
        "this."
    )
    parser.add_argument("-e", "--environment", help=description)
115
116
def add_name(parser):
    """Register the ``-n``/``--name`` option on ``parser``.

    The value is stored as a plain string in ``name`` (``None`` when the
    option is not given).
    """
    description = (
        "The name of the topology to act on.  If you have only one topology "
        'defined in your "topologies" directory, streamparse will use it '
        "automatically."
    )
    parser.add_argument("-n", "--name", help=description)
127
128
def add_options(parser):
    """Register the repeatable ``-o``/``--option`` option on ``parser``.

    Each occurrence supplies one ``key=val`` string, which
    ``_StoreDictAction`` merges into the ``options`` dict.
    """
    # Help text previously read "repeated multiple for multiple options";
    # the missing word "times" is restored here.
    help_text = (
        'Topology option to pass on to Storm. For example, "-o '
        'topology.debug=true" is equivalent to "--debug".  May be repeated '
        "multiple times for multiple options."
    )
    parser.add_argument(
        "-o",
        "--option",
        dest="options",
        action=_StoreDictAction,
        help=help_text,
    )
141
142
def add_override_name(parser):
    """Register the ``-N``/``--override_name`` option on ``parser``.

    The value is stored as a plain string in ``override_name`` (``None`` when
    the option is not given).
    """
    description = (
        "For operations such as creating virtualenvs and killing/submitting "
        "topologies, use this value instead of NAME.  This is useful if you "
        "want to submit the same topology twice without having to duplicate "
        "the topology file."
    )
    parser.add_argument("-N", "--override_name", help=description)
154
155
def add_overwrite_virtualenv(parser):
    """Register the boolean ``--overwrite_virtualenv`` flag on ``parser``.

    The flag uses ``store_true``: ``overwrite_virtualenv`` is ``False`` unless
    the flag is present.
    """
    description = (
        "Create the virtualenv even if it already exists. This is useful "
        "when you have changed your virtualenv_flags."
    )
    parser.add_argument(
        "--overwrite_virtualenv", action="store_true", help=description
    )
165
166
def add_pattern(parser):
    """Register the ``--pattern`` option (a plain string) on ``parser``."""
    pattern_kwargs = {"help": "Pattern of log files to operate on."}
    parser.add_argument("--pattern", **pattern_kwargs)
170
171
def add_pool_size(parser):
    """Register the integer ``--pool_size`` option on ``parser``.

    The value is converted with ``int`` and defaults to 10.
    """
    description = (
        "Number of simultaneous SSH connections to use when updating virtualenvs, "
        "removing logs, or tailing logs."
    )
    parser.add_argument("--pool_size", type=int, default=10, help=description)
181
182
def add_requirements(parser):
    """Register the ``-r``/``--requirements`` option on ``parser``.

    Declared with ``nargs="*"``, so the parsed value is a list of zero or
    more strings, or ``None`` when the option is not given at all.
    """
    description = (
        "Path to pip-style requirements file specifying the dependencies to "
        "use for creating the virtualenv for this topology.  If unspecified, "
        "streamparse will look for a file called NAME.txt in the directory "
        "specified by the virtualenv_specs setting in config.json."
    )
    parser.add_argument("-r", "--requirements", nargs="*", help=description)
196
197
def add_simple_jar(parser):
    """Register the boolean ``-s``/``--simple_jar`` flag on ``parser``.

    The flag uses ``store_true``: ``simple_jar`` is ``False`` unless the flag
    is present.
    """
    description = (
        "Instead of creating an Uber-JAR for the topology, which contains all "
        "of its JVM dependencies, create a simple JAR with just the code for "
        "the project.  This is useful when your project is pure Python and "
        "has no JVM dependencies."
    )
    parser.add_argument("-s", "--simple_jar", help=description, action="store_true")
211
212
def add_timeout(parser):
    """Register the integer ``--timeout`` option on ``parser``.

    The value is converted with ``int`` and defaults to 7000; the help text
    describes it as milliseconds to wait for Nimbus.
    """
    timeout_kwargs = {
        "default": 7000,
        "type": int,
        "help": "Milliseconds to wait for Nimbus to respond. (default: %(default)s)",
    }
    parser.add_argument("--timeout", **timeout_kwargs)
221
222
def add_user(parser, allow_short=False):
    """Register the ``--user`` option on ``parser``.

    The value is stored under the ``sudo_user`` key of the ``options`` dict
    via ``_StoreDictAction``.

    :param allow_short: when true, ``-u`` is registered as a short form too.
    """
    flags = ["-u", "--user"] if allow_short else ["--user"]
    parser.add_argument(
        *flags,
        action=_StoreDictAction,
        dest="options",
        type=option_alias("sudo_user"),
        default=None,
        help="User argument to sudo when creating and deleting virtualenvs.",
    )
241
242
def add_wait(parser):
    """Register the integer ``--wait`` option on ``parser``.

    The value is converted with ``int`` and defaults to 5; the help text
    describes it as seconds to wait before killing the topology.
    """
    wait_kwargs = {
        "default": 5,
        "type": int,
        "help": "Seconds to wait before killing topology. (default: %(default)s)",
    }
    parser.add_argument("--wait", **wait_kwargs)
251
252
def add_workers(parser):
    """Register the ``-w``/``--workers`` option on ``parser``.

    The value is rewritten to ``topology.workers=<value>`` and stored in the
    ``options`` dict through ``_StoreDictAction``.
    """
    help_text = (
        "Set number of Storm workers for your topology. Defaults to the "
        "number of worker nodes in your Storm environment."
    )
    to_key_val = option_alias("topology.workers")
    parser.add_argument(
        "-w",
        "--workers",
        dest="options",
        action=_StoreDictAction,
        type=to_key_val,
        help=help_text,
    )
265
266
# Environment-config keys that are copied verbatim into the resolved options.
VIRTUALENV_OPTIONS = (
    "install_virtualenv",
    "use_virtualenv",
    "virtualenv_flags",
    "virtualenv_root",
    "virtualenv_name",
)


def resolve_options(
    cli_options, env_config, topology_class, topology_name, local_only=False
):
    """Resolve potentially conflicting Storm options from three sources:

    CLI options > Topology options > config.json options

    :param cli_options: dict of options given on the command line, or a
                        falsy value when there are none.
    :param env_config: dict describing the selected environment from
                       config.json.
    :param topology_class: object whose ``config`` attribute holds the
                           topology-level options.
    :param topology_name: name used to build ``topology.python.path``.
    :param local_only: Whether or not we should talk to Nimbus to get Storm
                       workers and other info.
    :returns: a new dict with the merged options.  ``storm.workers.list``,
              ``topology.acker.executors``, ``topology.workers`` and
              ``sudo_user`` are always populated.
    :raises KeyError: if virtualenvs are in use (the default) and
                      ``env_config`` has no ``virtualenv_root``.
    """
    storm_options = {}

    # Start with environment options
    storm_options.update(env_config.get("options", {}))

    # Set topology.python.path
    if env_config.get("use_virtualenv", True):
        python_path = "/".join(
            [env_config["virtualenv_root"], topology_name, "bin", "python"]
        )
        # This setting is for information purposes only, and is not actually
        # read by any pystorm or streamparse code.
        storm_options["topology.python.path"] = python_path

    # Set logging options based on environment config.  The nested "log"
    # dict wins over the flat "log_path"/"log_file" keys.
    log_config = env_config.get("log", {})
    log_path = log_config.get("path") or env_config.get("log_path")
    log_file = log_config.get("file") or env_config.get("log_file")
    if log_path:
        storm_options["pystorm.log.path"] = log_path
    if log_file:
        storm_options["pystorm.log.file"] = log_file
    if isinstance(log_config.get("max_bytes"), int):
        storm_options["pystorm.log.max_bytes"] = log_config["max_bytes"]
    if isinstance(log_config.get("backup_count"), int):
        storm_options["pystorm.log.backup_count"] = log_config["backup_count"]
    if isinstance(log_config.get("level"), str):
        storm_options["pystorm.log.level"] = log_config["level"].lower()

    # Make sure virtualenv options are present here
    for venv_option in VIRTUALENV_OPTIONS:
        if venv_option in env_config:
            storm_options[venv_option] = env_config[venv_option]

    # Set sudo_user default to SSH user if we have one (None otherwise; the
    # "root" fallback is applied at the end once every source was consulted)
    storm_options["sudo_user"] = env_config.get("user", None)

    # Override options with topology options
    storm_options.update(topology_class.config)

    # Override options with CLI options
    storm_options.update(cli_options or {})

    # Set log level to debug if topology.debug is set
    if storm_options.get("topology.debug", False):
        storm_options["pystorm.log.level"] = "debug"

    # If ackers and workers still aren't set, use number of worker nodes
    if not local_only:
        workers = storm_options.get("storm.workers.list")
        if not workers:
            workers = get_storm_workers(env_config)
        elif isinstance(workers, str):
            # Accept "host1, host2" as well as "host1,host2"
            workers = [host.strip() for host in workers.split(",")]
        storm_options["storm.workers.list"] = workers
        num_storm_workers = len(workers)
    else:
        storm_options["storm.workers.list"] = []
        num_storm_workers = 1
    if storm_options.get("topology.acker.executors") is None:
        storm_options["topology.acker.executors"] = num_storm_workers
    if storm_options.get("topology.workers") is None:
        storm_options["topology.workers"] = num_storm_workers

    # If sudo_user was not present anywhere, set it to "root".  The key always
    # exists by now (possibly as None), so setdefault() would never fire.
    if storm_options.get("sudo_user") is None:
        storm_options["sudo_user"] = "root"

    return storm_options
354
355
def warn_about_deprecated_user(user, func_name, stacklevel=3):
    """Emit a ``DeprecationWarning`` when the legacy ``user`` argument is used.

    :param user: value of the deprecated ``user`` argument.  ``None`` means
                 it was not supplied, and no warning is issued.
    :param func_name: name of the function whose ``user`` argument is
                      deprecated; interpolated into the message.
    :param stacklevel: forwarded to ``warnings.warn``.  The default of 3
                       attributes the warning to the code that called
                       ``func_name`` (assuming this helper is invoked
                       directly from that function) rather than to this
                       helper, so the report points at the line to fix.
    """
    if user is None:
        return
    warnings.warn(
        f"The 'user' argument to '{func_name}' will be removed in the next "
        "major release of streamparse. Provide the 'sudo_user' key to"
        " the 'options' dict argument instead.",
        DeprecationWarning,
        stacklevel=stacklevel,
    )
366