1############################################################################### 2# 3# The MIT License (MIT) 4# 5# Copyright (c) Crossbar.io Technologies GmbH 6# 7# Permission is hereby granted, free of charge, to any person obtaining a copy 8# of this software and associated documentation files (the "Software"), to deal 9# in the Software without restriction, including without limitation the rights 10# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 11# copies of the Software, and to permit persons to whom the Software is 12# furnished to do so, subject to the following conditions: 13# 14# The above copyright notice and this permission notice shall be included in 15# all copies or substantial portions of the Software. 16# 17# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 22# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 23# THE SOFTWARE. 24# 25############################################################################### 26 27from __future__ import absolute_import 28 29# this module is available as the 'wamp' command-line tool or as 30# 'python -m autobahn' 31 32import os 33import sys 34import argparse 35import json 36from copy import copy 37 38try: 39 from autobahn.twisted.component import Component 40except ImportError: 41 print("The 'wamp' command-line tool requires Twisted.") 42 print(" pip install autobahn[twisted]") 43 sys.exit(1) 44 45from twisted.internet.defer import Deferred, ensureDeferred 46from twisted.internet.task import react 47from twisted.internet.protocol import ProcessProtocol 48 49from autobahn.wamp.exception import ApplicationError 50from autobahn.wamp.types import PublishOptions 51from autobahn.wamp.types import SubscribeOptions 52 53 54# XXX other ideas to get 'connection config': 55# - if there .crossbar/ here, load that config and accept a --name or 56# so to idicate which transport to use 57 58# wamp [options] {call,publish,subscribe,register} wamp-uri [args] [kwargs] 59# 60# kwargs are spec'd with a 2-value-consuming --keyword option: 61# --keyword name value 62 63 64top = argparse.ArgumentParser(prog="wamp") 65top.add_argument( 66 '--url', 67 action='store', 68 help='A WAMP URL to connect to, like ws://127.0.0.1:8080/ws or rs://localhost:1234/', 69 required=True, 70) 71top.add_argument( 72 '--realm', '-r', 73 action='store', 74 help='The realm to join', 75 default='default', 76) 77top.add_argument( 78 '--private-key', '-k', 79 action='store', 80 help='Hex-encoded private key (via WAMP_PRIVATE_KEY if not provided here)', 81 default=os.environ.get('WAMP_PRIVATE_KEY', None), 82) 83top.add_argument( 84 '--authid', 85 action='store', 86 help='The authid to use, if authenticating', 87 default=None, 88) 89top.add_argument( 90 '--authrole', 91 action='store', 92 help='The role to use, if authenticating', 93 default=None, 94) 95top.add_argument( 96 '--max-failures', '-m', 97 action='store', 98 type=int, 99 help='Failures before giving up (0 forever)', 100 default=0, 101) 102sub = top.add_subparsers( 103 title="subcommands", 104 dest="subcommand_name", 105) 106 107 108call = sub.add_parser( 109 'call', 110 help='Do a WAMP call() and print any results', 111) 112call.add_argument( 113 'uri', 114 type=str, 115 help="A WAMP URI to call" 116) 117call.add_argument( 118 'call_args', 119 nargs='*', 120 help="All additional arguments are positional args", 121) 122call.add_argument( 123 '--keyword', 124 nargs=2, 125 action='append', 126 help="Specify a keyword argument to send: name value", 127) 128 129 130publish = sub.add_parser( 131 'publish', 132 help='Do a WAMP publish() with the given args, kwargs', 133) 134publish.add_argument( 135 'uri', 136 type=str, 137 help="A WAMP URI to publish" 138) 139publish.add_argument( 140 'publish_args', 141 nargs='*', 142 help="All additional arguments are positional args", 143) 144publish.add_argument( 145 '--keyword', 146 nargs=2, 147 action='append', 148 help="Specify a keyword argument to send: name value", 149) 150 151 152register = sub.add_parser( 153 'register', 154 help='Do a WAMP register() and run a command when called', 155) 156register.add_argument( 157 'uri', 158 type=str, 159 help="A WAMP URI to call" 160) 161register.add_argument( 162 '--times', 163 type=int, 164 default=0, 165 help="Listen for this number of events, then exit. Default: forever", 166) 167register.add_argument( 168 'command', 169 type=str, 170 nargs='*', 171 help=( 172 "Takes one or more args: the executable to call, and any positional " 173 "arguments. As well, the following environment variables are set: " 174 "WAMP_ARGS, WAMP_KWARGS and _JSON variants." 175 ) 176) 177 178 179subscribe = sub.add_parser( 180 'subscribe', 181 help='Do a WAMP subscribe() and print one line of JSON per event', 182) 183subscribe.add_argument( 184 'uri', 185 type=str, 186 help="A WAMP URI to call" 187) 188subscribe.add_argument( 189 '--times', 190 type=int, 191 default=0, 192 help="Listen for this number of events, then exit. Default: forever", 193) 194subscribe.add_argument( 195 '--match', 196 type=str, 197 default='exact', 198 choices=['exact', 'prefix'], 199 help="Massed in the SubscribeOptions, how to match the URI", 200) 201 202 203def _create_component(options): 204 """ 205 Configure and return a Component instance according to the given 206 `options` 207 """ 208 if options.url.startswith('ws://'): 209 kind = 'websocket' 210 elif options.url.startswith('rs://'): 211 kind = 'rawsocket' 212 else: 213 raise ValueError( 214 "URL should start with ws:// or rs://" 215 ) 216 217 authentication = dict() 218 if options.private_key: 219 if not options.authid: 220 raise ValueError( 221 "Require --authid and --authrole if --private-key (or WAMP_PRIVATE_KEY) is provided" 222 ) 223 authentication["cryptosign"] = { 224 "authid": options.authid, 225 "authrole": options.authrole, 226 "privkey": options.private_key, 227 } 228 229 return Component( 230 transports=[{ 231 "type": kind, 232 "url": options.url, 233 }], 234 authentication=authentication if authentication else None, 235 realm=options.realm, 236 ) 237 238 239async def do_call(reactor, session, options): 240 call_args = list(options.call_args) 241 call_kwargs = dict() 242 if options.keyword is not None: 243 call_kwargs = { 244 k: v 245 for k, v in options.keyword 246 } 247 248 results = await session.call(options.uri, *call_args, **call_kwargs) 249 print("result: {}".format(results)) 250 251 252async def do_publish(reactor, session, options): 253 publish_args = list(options.publish_args) 254 publish_kwargs = { 255 k: v 256 for k, v in options.keyword 257 } 258 259 await session.publish( 260 options.uri, 261 *publish_args, 262 options=PublishOptions(acknowledge=True), 263 **publish_kwargs, 264 ) 265 266 267async def do_register(reactor, session, options): 268 """ 269 run a command-line upon an RPC call 270 """ 271 272 all_done = Deferred() 273 countdown = [options.times] 274 275 async def called(*args, **kw): 276 print("called: args={}, kwargs={}".format(args, kw), file=sys.stderr) 277 env = copy(os.environ) 278 env['WAMP_ARGS'] = ' '.join(args) 279 env['WAMP_ARGS_JSON'] = json.dumps(args) 280 env['WAMP_KWARGS'] = ' '.join('{}={}'.format(k, v) for k, v in kw.items()) 281 env['WAMP_KWARGS_JSON'] = json.dumps(kw) 282 283 exe = os.path.abspath(options.command[0]) 284 args = options.command 285 done = Deferred() 286 287 class DumpOutput(ProcessProtocol): 288 def outReceived(self, data): 289 sys.stdout.write(data.decode('utf8')) 290 291 def errReceived(self, data): 292 sys.stderr.write(data.decode('utf8')) 293 294 def processExited(self, reason): 295 done.callback(reason.value.exitCode) 296 297 proto = DumpOutput() 298 reactor.spawnProcess( 299 proto, exe, args, env=env, path="." 300 ) 301 code = await done 302 303 if code != 0: 304 print("Failed with exit-code {}".format(code)) 305 if countdown[0]: 306 countdown[0] -= 1 307 if countdown[0] <= 0: 308 reactor.callLater(0, all_done.callback, None) 309 310 await session.register(called, options.uri) 311 await all_done 312 313 314async def do_subscribe(reactor, session, options): 315 """ 316 print events (one line of JSON per event) 317 """ 318 319 all_done = Deferred() 320 countdown = [options.times] 321 322 async def published(*args, **kw): 323 print( 324 json.dumps({ 325 "args": args, 326 "kwargs": kw, 327 }) 328 ) 329 if countdown[0]: 330 countdown[0] -= 1 331 if countdown[0] <= 0: 332 reactor.callLater(0, all_done.callback, None) 333 334 await session.subscribe(published, options.uri, options=SubscribeOptions(match=options.match)) 335 await all_done 336 337 338def _main(): 339 """ 340 This is a magic name for `python -m autobahn`, and specified as 341 our entry_point in setup.py 342 """ 343 react( 344 lambda reactor: ensureDeferred( 345 _real_main(reactor) 346 ) 347 ) 348 349 350async def _real_main(reactor): 351 """ 352 Sanity check options, create a connection and run our subcommand 353 """ 354 options = top.parse_args() 355 component = _create_component(options) 356 357 if options.subcommand_name is None: 358 print("Must select a subcommand") 359 sys.exit(1) 360 361 if options.subcommand_name == "register": 362 exe = options.command[0] 363 if not os.path.isabs(exe): 364 print("Full path to the executable required. Found: {}".format(exe), file=sys.stderr) 365 sys.exit(1) 366 if not os.path.exists(exe): 367 print("Executable not found: {}".format(exe), file=sys.stderr) 368 sys.exit(1) 369 370 subcommands = { 371 "call": do_call, 372 "register": do_register, 373 "subscribe": do_subscribe, 374 "publish": do_publish, 375 } 376 command_fn = subcommands[options.subcommand_name] 377 378 exit_code = [0] 379 380 @component.on_join 381 async def _(session, details): 382 print("connected: authrole={} authmethod={}".format(details.authrole, details.authmethod), file=sys.stderr) 383 try: 384 await command_fn(reactor, session, options) 385 except ApplicationError as e: 386 print("\n{}: {}\n".format(e.error, ''.join(e.args))) 387 exit_code[0] = 5 388 await session.leave() 389 390 failures = [] 391 392 @component.on_connectfailure 393 async def _(comp, fail): 394 print("connect failure: {}".format(fail)) 395 failures.append(fail) 396 if options.max_failures > 0 and len(failures) > options.max_failures: 397 print("Too many failures ({}). Exiting".format(len(failures))) 398 reactor.stop() 399 400 await component.start(reactor) 401 # sys.exit(exit_code[0]) 402 403 404if __name__ == "__main__": 405 _main() 406