1###############################################################################
2#
3# The MIT License (MIT)
4#
5# Copyright (c) Crossbar.io Technologies GmbH
6#
7# Permission is hereby granted, free of charge, to any person obtaining a copy
8# of this software and associated documentation files (the "Software"), to deal
9# in the Software without restriction, including without limitation the rights
10# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11# copies of the Software, and to permit persons to whom the Software is
12# furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be included in
15# all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23# THE SOFTWARE.
24#
25###############################################################################
26
27from __future__ import absolute_import
28
29# this module is available as the 'wamp' command-line tool or as
30# 'python -m autobahn'
31
32import os
33import sys
34import argparse
35import json
36from copy import copy
37
38try:
39    from autobahn.twisted.component import Component
40except ImportError:
41    print("The 'wamp' command-line tool requires Twisted.")
42    print("  pip install autobahn[twisted]")
43    sys.exit(1)
44
45from twisted.internet.defer import Deferred, ensureDeferred
46from twisted.internet.task import react
47from twisted.internet.protocol import ProcessProtocol
48
49from autobahn.wamp.exception import ApplicationError
50from autobahn.wamp.types import PublishOptions
51from autobahn.wamp.types import SubscribeOptions
52
53
54# XXX other ideas to get 'connection config':
55# - if there .crossbar/ here, load that config and accept a --name or
56#   so to idicate which transport to use
57
58# wamp [options] {call,publish,subscribe,register} wamp-uri [args] [kwargs]
59#
60# kwargs are spec'd with a 2-value-consuming --keyword option:
61# --keyword name value
62
63
64top = argparse.ArgumentParser(prog="wamp")
65top.add_argument(
66    '--url',
67    action='store',
68    help='A WAMP URL to connect to, like ws://127.0.0.1:8080/ws or rs://localhost:1234/',
69    required=True,
70)
71top.add_argument(
72    '--realm', '-r',
73    action='store',
74    help='The realm to join',
75    default='default',
76)
77top.add_argument(
78    '--private-key', '-k',
79    action='store',
80    help='Hex-encoded private key (via WAMP_PRIVATE_KEY if not provided here)',
81    default=os.environ.get('WAMP_PRIVATE_KEY', None),
82)
83top.add_argument(
84    '--authid',
85    action='store',
86    help='The authid to use, if authenticating',
87    default=None,
88)
89top.add_argument(
90    '--authrole',
91    action='store',
92    help='The role to use, if authenticating',
93    default=None,
94)
95top.add_argument(
96    '--max-failures', '-m',
97    action='store',
98    type=int,
99    help='Failures before giving up (0 forever)',
100    default=0,
101)
102sub = top.add_subparsers(
103    title="subcommands",
104    dest="subcommand_name",
105)
106
107
108call = sub.add_parser(
109    'call',
110    help='Do a WAMP call() and print any results',
111)
112call.add_argument(
113    'uri',
114    type=str,
115    help="A WAMP URI to call"
116)
117call.add_argument(
118    'call_args',
119    nargs='*',
120    help="All additional arguments are positional args",
121)
122call.add_argument(
123    '--keyword',
124    nargs=2,
125    action='append',
126    help="Specify a keyword argument to send: name value",
127)
128
129
130publish = sub.add_parser(
131    'publish',
132    help='Do a WAMP publish() with the given args, kwargs',
133)
134publish.add_argument(
135    'uri',
136    type=str,
137    help="A WAMP URI to publish"
138)
139publish.add_argument(
140    'publish_args',
141    nargs='*',
142    help="All additional arguments are positional args",
143)
144publish.add_argument(
145    '--keyword',
146    nargs=2,
147    action='append',
148    help="Specify a keyword argument to send: name value",
149)
150
151
152register = sub.add_parser(
153    'register',
154    help='Do a WAMP register() and run a command when called',
155)
156register.add_argument(
157    'uri',
158    type=str,
159    help="A WAMP URI to call"
160)
161register.add_argument(
162    '--times',
163    type=int,
164    default=0,
165    help="Listen for this number of events, then exit. Default: forever",
166)
167register.add_argument(
168    'command',
169    type=str,
170    nargs='*',
171    help=(
172        "Takes one or more args: the executable to call, and any positional "
173        "arguments. As well, the following environment variables are set: "
174        "WAMP_ARGS, WAMP_KWARGS and _JSON variants."
175    )
176)
177
178
179subscribe = sub.add_parser(
180    'subscribe',
181    help='Do a WAMP subscribe() and print one line of JSON per event',
182)
183subscribe.add_argument(
184    'uri',
185    type=str,
186    help="A WAMP URI to call"
187)
188subscribe.add_argument(
189    '--times',
190    type=int,
191    default=0,
192    help="Listen for this number of events, then exit. Default: forever",
193)
194subscribe.add_argument(
195    '--match',
196    type=str,
197    default='exact',
198    choices=['exact', 'prefix'],
199    help="Massed in the SubscribeOptions, how to match the URI",
200)
201
202
203def _create_component(options):
204    """
205    Configure and return a Component instance according to the given
206    `options`
207    """
208    if options.url.startswith('ws://'):
209        kind = 'websocket'
210    elif options.url.startswith('rs://'):
211        kind = 'rawsocket'
212    else:
213        raise ValueError(
214            "URL should start with ws:// or rs://"
215        )
216
217    authentication = dict()
218    if options.private_key:
219        if not options.authid:
220            raise ValueError(
221                "Require --authid and --authrole if --private-key (or WAMP_PRIVATE_KEY) is provided"
222            )
223        authentication["cryptosign"] = {
224            "authid": options.authid,
225            "authrole": options.authrole,
226            "privkey": options.private_key,
227        }
228
229    return Component(
230        transports=[{
231            "type": kind,
232            "url": options.url,
233        }],
234        authentication=authentication if authentication else None,
235        realm=options.realm,
236    )
237
238
239async def do_call(reactor, session, options):
240    call_args = list(options.call_args)
241    call_kwargs = dict()
242    if options.keyword is not None:
243        call_kwargs = {
244            k: v
245            for k, v in options.keyword
246        }
247
248    results = await session.call(options.uri, *call_args, **call_kwargs)
249    print("result: {}".format(results))
250
251
252async def do_publish(reactor, session, options):
253    publish_args = list(options.publish_args)
254    publish_kwargs = {
255        k: v
256        for k, v in options.keyword
257    }
258
259    await session.publish(
260        options.uri,
261        *publish_args,
262        options=PublishOptions(acknowledge=True),
263        **publish_kwargs,
264    )
265
266
267async def do_register(reactor, session, options):
268    """
269    run a command-line upon an RPC call
270    """
271
272    all_done = Deferred()
273    countdown = [options.times]
274
275    async def called(*args, **kw):
276        print("called: args={}, kwargs={}".format(args, kw), file=sys.stderr)
277        env = copy(os.environ)
278        env['WAMP_ARGS'] = ' '.join(args)
279        env['WAMP_ARGS_JSON'] = json.dumps(args)
280        env['WAMP_KWARGS'] = ' '.join('{}={}'.format(k, v) for k, v in kw.items())
281        env['WAMP_KWARGS_JSON'] = json.dumps(kw)
282
283        exe = os.path.abspath(options.command[0])
284        args = options.command
285        done = Deferred()
286
287        class DumpOutput(ProcessProtocol):
288            def outReceived(self, data):
289                sys.stdout.write(data.decode('utf8'))
290
291            def errReceived(self, data):
292                sys.stderr.write(data.decode('utf8'))
293
294            def processExited(self, reason):
295                done.callback(reason.value.exitCode)
296
297        proto = DumpOutput()
298        reactor.spawnProcess(
299            proto, exe, args, env=env, path="."
300        )
301        code = await done
302
303        if code != 0:
304            print("Failed with exit-code {}".format(code))
305        if countdown[0]:
306            countdown[0] -= 1
307            if countdown[0] <= 0:
308                reactor.callLater(0, all_done.callback, None)
309
310    await session.register(called, options.uri)
311    await all_done
312
313
314async def do_subscribe(reactor, session, options):
315    """
316    print events (one line of JSON per event)
317    """
318
319    all_done = Deferred()
320    countdown = [options.times]
321
322    async def published(*args, **kw):
323        print(
324            json.dumps({
325                "args": args,
326                "kwargs": kw,
327            })
328        )
329        if countdown[0]:
330            countdown[0] -= 1
331            if countdown[0] <= 0:
332                reactor.callLater(0, all_done.callback, None)
333
334    await session.subscribe(published, options.uri, options=SubscribeOptions(match=options.match))
335    await all_done
336
337
338def _main():
339    """
340    This is a magic name for `python -m autobahn`, and specified as
341    our entry_point in setup.py
342    """
343    react(
344        lambda reactor: ensureDeferred(
345            _real_main(reactor)
346        )
347    )
348
349
350async def _real_main(reactor):
351    """
352    Sanity check options, create a connection and run our subcommand
353    """
354    options = top.parse_args()
355    component = _create_component(options)
356
357    if options.subcommand_name is None:
358        print("Must select a subcommand")
359        sys.exit(1)
360
361    if options.subcommand_name == "register":
362        exe = options.command[0]
363        if not os.path.isabs(exe):
364            print("Full path to the executable required. Found: {}".format(exe), file=sys.stderr)
365            sys.exit(1)
366        if not os.path.exists(exe):
367            print("Executable not found: {}".format(exe), file=sys.stderr)
368            sys.exit(1)
369
370    subcommands = {
371        "call": do_call,
372        "register": do_register,
373        "subscribe": do_subscribe,
374        "publish": do_publish,
375    }
376    command_fn = subcommands[options.subcommand_name]
377
378    exit_code = [0]
379
380    @component.on_join
381    async def _(session, details):
382        print("connected: authrole={} authmethod={}".format(details.authrole, details.authmethod), file=sys.stderr)
383        try:
384            await command_fn(reactor, session, options)
385        except ApplicationError as e:
386            print("\n{}: {}\n".format(e.error, ''.join(e.args)))
387            exit_code[0] = 5
388        await session.leave()
389
390    failures = []
391
392    @component.on_connectfailure
393    async def _(comp, fail):
394        print("connect failure: {}".format(fail))
395        failures.append(fail)
396        if options.max_failures > 0 and len(failures) > options.max_failures:
397            print("Too many failures ({}). Exiting".format(len(failures)))
398            reactor.stop()
399
400    await component.start(reactor)
401    # sys.exit(exit_code[0])
402
403
404if __name__ == "__main__":
405    _main()
406