1""" 2Twisted integration for Urwid. 3 4This module allows you to serve Urwid applications remotely over ssh. 5 6The idea is that the server listens as an SSH server, and each connection is 7routed by Twisted to urwid, and the urwid UI is routed back to the console. 8The concept was a bit of a head-bender for me, but really we are just sending 9escape codes and the what-not back to the console over the shell that ssh has 10created. This is the same service as provided by the UI components in 11twisted.conch.insults.window, except urwid has more features, and seems more 12mature. 13 14This module is not highly configurable, and the API is not great, so 15don't worry about just using it as an example and copy-pasting. 16 17Process 18------- 19 20 21TODO: 22 23- better gpm tracking: there is no place for os.Popen in a Twisted app I 24 think. 25 26Copyright: 2010, Ali Afshar <aafshar@gmail.com> 27License: MIT <http://www.opensource.org/licenses/mit-license.php> 28 29Portions Copyright: 2010, Ian Ward <ian@excess.org> 30Licence: LGPL <http://opensource.org/licenses/lgpl-2.1.php> 31""" 32 33from __future__ import print_function 34 35import os 36 37import urwid 38from urwid.raw_display import Screen 39 40from zope.interface import Interface, Attribute, implements 41from twisted.application.service import Application 42from twisted.application.internet import TCPServer 43from twisted.cred.portal import Portal 44from twisted.conch.interfaces import IConchUser, ISession 45from twisted.conch.insults.insults import TerminalProtocol, ServerProtocol 46from twisted.conch.manhole_ssh import (ConchFactory, TerminalRealm, 47 TerminalUser, TerminalSession, TerminalSessionTransport) 48 49from twisted.python.components import Componentized, Adapter 50 51 52 53class IUrwidUi(Interface): 54 55 """Toplevel urwid widget 56 """ 57 toplevel = Attribute('Urwid Toplevel Widget') 58 palette = Attribute('Urwid Palette') 59 screen = Attribute('Urwid Screen') 60 loop = Attribute('Urwid Main Loop') 61 62 def create_urwid_toplevel(): 63 """Create a toplevel widget. 64 """ 65 66 def create_urwid_mainloop(): 67 """Create the urwid main loop. 68 """ 69 70 71class IUrwidMind(Interface): 72 ui = Attribute('') 73 terminalProtocol = Attribute('') 74 terminal = Attribute('') 75 checkers = Attribute('') 76 avatar = Attribute('The avatar') 77 78 def push(data): 79 """Push data""" 80 81 def draw(): 82 """Refresh the UI""" 83 84 85 86 87class UrwidUi(object): 88 89 def __init__(self, urwid_mind): 90 self.mind = urwid_mind 91 self.toplevel = self.create_urwid_toplevel() 92 self.palette = self.create_urwid_palette() 93 self.screen = TwistedScreen(self.mind.terminalProtocol) 94 self.loop = self.create_urwid_mainloop() 95 96 def create_urwid_toplevel(self): 97 raise NotImplementedError 98 99 def create_urwid_palette(self): 100 return 101 102 def create_urwid_mainloop(self): 103 evl = urwid.TwistedEventLoop(manage_reactor=False) 104 loop = urwid.MainLoop(self.toplevel, screen=self.screen, 105 event_loop=evl, 106 unhandled_input=self.mind.unhandled_key, 107 palette=self.palette) 108 self.screen.loop = loop 109 loop.run() 110 return loop 111 112 113 114class UnhandledKeyHandler(object): 115 116 def __init__(self, mind): 117 self.mind = mind 118 119 def push(self, key): 120 if isinstance(key, tuple): 121 pass 122 else: 123 f = getattr(self, 'key_%s' % key.replace(' ', '_'), None) 124 if f is None: 125 return 126 else: 127 return f(key) 128 129 def key_ctrl_c(self, key): 130 self.mind.terminal.loseConnection() 131 132 133class UrwidMind(Adapter): 134 135 implements(IUrwidMind) 136 137 cred_checkers = [] 138 ui = None 139 140 ui_factory = None 141 unhandled_key_factory = UnhandledKeyHandler 142 143 @property 144 def avatar(self): 145 return IConchUser(self.original) 146 147 def set_terminalProtocol(self, terminalProtocol): 148 self.terminalProtocol = terminalProtocol 149 self.terminal = terminalProtocol.terminal 150 self.unhandled_key_handler = self.unhandled_key_factory(self) 151 self.unhandled_key = self.unhandled_key_handler.push 152 self.ui = self.ui_factory(self) 153 154 def push(self, data): 155 self.ui.screen.push(data) 156 157 def draw(self): 158 self.ui.loop.draw_screen() 159 160 161 162 163 164class TwistedScreen(Screen): 165 """A Urwid screen which knows about the Twisted terminal protocol that is 166 driving it. 167 168 A Urwid screen is responsible for: 169 170 1. Input 171 2. Output 172 173 Input is achieved in normal urwid by passing a list of available readable 174 file descriptors to the event loop for polling/selecting etc. In the 175 Twisted situation, this is not necessary because Twisted polls the input 176 descriptors itself. Urwid allows this by being driven using the main loop 177 instance's `process_input` method which is triggered on Twisted protocol's 178 standard `dataReceived` method. 179 """ 180 181 def __init__(self, terminalProtocol): 182 # We will need these later 183 self.terminalProtocol = terminalProtocol 184 self.terminal = terminalProtocol.terminal 185 Screen.__init__(self) 186 self.colors = 16 187 self._pal_escape = {} 188 self.bright_is_bold = True 189 self.register_palette_entry(None, 'black', 'white') 190 urwid.signals.connect_signal(self, urwid.UPDATE_PALETTE_ENTRY, 191 self._on_update_palette_entry) 192 # Don't need to wait for anything to start 193 self._started = True 194 195 # Urwid Screen API 196 197 def get_cols_rows(self): 198 """Get the size of the terminal as (cols, rows) 199 """ 200 return self.terminalProtocol.width, self.terminalProtocol.height 201 202 def draw_screen(self, maxres, r ): 203 """Render a canvas to the terminal. 204 205 The canvas contains all the information required to render the Urwid 206 UI. The content method returns a list of rows as (attr, cs, text) 207 tuples. This very simple implementation iterates each row and simply 208 writes it out. 209 """ 210 (maxcol, maxrow) = maxres 211 #self.terminal.eraseDisplay() 212 lasta = None 213 for i, row in enumerate(r.content()): 214 self.terminal.cursorPosition(0, i) 215 for (attr, cs, text) in row: 216 if attr != lasta: 217 text = '%s%s' % (self._attr_to_escape(attr), text) 218 lasta = attr 219 #if cs or attr: 220 # print cs, attr 221 self.write(text) 222 cursor = r.get_cursor() 223 if cursor is not None: 224 self.terminal.cursorPosition(*cursor) 225 226 # XXX from base screen 227 def set_mouse_tracking(self, enable=True): 228 """ 229 Enable (or disable) mouse tracking. 230 231 After calling this function get_input will include mouse 232 click events along with keystrokes. 233 """ 234 if enable: 235 self.write(urwid.escape.MOUSE_TRACKING_ON) 236 else: 237 self.write(urwid.escape.MOUSE_TRACKING_OFF) 238 239 # twisted handles polling, so we don't need the loop to do it, we just 240 # push what we get to the loop from dataReceived. 241 def hook_event_loop(self, event_loop, callback): 242 self._urwid_callback = callback 243 self._evl = event_loop 244 245 def unhook_event_loop(self, event_loop): 246 pass 247 248 # Do nothing here either. Not entirely sure when it gets called. 249 def get_input(self, raw_keys=False): 250 return 251 252 def get_available_raw_input(self): 253 data = self._data 254 self._data = [] 255 return data 256 257 # Twisted driven 258 def push(self, data): 259 """Receive data from Twisted and push it into the urwid main loop. 260 261 We must here: 262 263 1. filter the input data against urwid's input filter. 264 2. Calculate escapes and other clever things using urwid's 265 `escape.process_keyqueue`. 266 3. Pass the calculated keys as a list to the Urwid main loop. 267 4. Redraw the screen 268 """ 269 self._data = list(map(ord, data)) 270 self.parse_input(self._evl, self._urwid_callback) 271 self.loop.draw_screen() 272 273 # Convenience 274 def write(self, data): 275 self.terminal.write(data) 276 277 # Private 278 def _on_update_palette_entry(self, name, *attrspecs): 279 # copy the attribute to a dictionary containing the escape sequences 280 self._pal_escape[name] = self._attrspec_to_escape( 281 attrspecs[{16:0,1:1,88:2,256:3}[self.colors]]) 282 283 def _attr_to_escape(self, a): 284 if a in self._pal_escape: 285 return self._pal_escape[a] 286 elif isinstance(a, urwid.AttrSpec): 287 return self._attrspec_to_escape(a) 288 # undefined attributes use default/default 289 # TODO: track and report these 290 return self._attrspec_to_escape( 291 urwid.AttrSpec('default','default')) 292 293 def _attrspec_to_escape(self, a): 294 """ 295 Convert AttrSpec instance a to an escape sequence for the terminal 296 297 >>> s = Screen() 298 >>> s.set_terminal_properties(colors=256) 299 >>> a2e = s._attrspec_to_escape 300 >>> a2e(s.AttrSpec('brown', 'dark green')) 301 '\\x1b[0;33;42m' 302 >>> a2e(s.AttrSpec('#fea,underline', '#d0d')) 303 '\\x1b[0;38;5;229;4;48;5;164m' 304 """ 305 if a.foreground_high: 306 fg = "38;5;%d" % a.foreground_number 307 elif a.foreground_basic: 308 if a.foreground_number > 7: 309 if self.bright_is_bold: 310 fg = "1;%d" % (a.foreground_number - 8 + 30) 311 else: 312 fg = "%d" % (a.foreground_number - 8 + 90) 313 else: 314 fg = "%d" % (a.foreground_number + 30) 315 else: 316 fg = "39" 317 st = "1;" * a.bold + "4;" * a.underline + "7;" * a.standout 318 if a.background_high: 319 bg = "48;5;%d" % a.background_number 320 elif a.background_basic: 321 if a.background_number > 7: 322 # this doesn't work on most terminals 323 bg = "%d" % (a.background_number - 8 + 100) 324 else: 325 bg = "%d" % (a.background_number + 40) 326 else: 327 bg = "49" 328 return urwid.escape.ESC + "[0;%s;%s%sm" % (fg, st, bg) 329 330 331class UrwidTerminalProtocol(TerminalProtocol): 332 """A terminal protocol that knows to proxy input and receive output from 333 Urwid. 334 335 This integrates with the TwistedScreen in a 1:1. 336 """ 337 338 def __init__(self, urwid_mind): 339 self.urwid_mind = urwid_mind 340 self.width = 80 341 self.height = 24 342 343 def connectionMade(self): 344 self.urwid_mind.set_terminalProtocol(self) 345 self.terminalSize(self.height, self.width) 346 347 def terminalSize(self, height, width): 348 """Resize the terminal. 349 """ 350 self.width = width 351 self.height = height 352 self.urwid_mind.ui.loop.screen_size = None 353 self.terminal.eraseDisplay() 354 self.urwid_mind.draw() 355 356 def dataReceived(self, data): 357 """Received data from the connection. 358 359 This overrides the default implementation which parses and passes to 360 the keyReceived method. We don't do that here, and must not do that so 361 that Urwid can get the right juice (which includes things like mouse 362 tracking). 363 364 Instead we just pass the data to the screen instance's dataReceived, 365 which handles the proxying to Urwid. 366 """ 367 self.urwid_mind.push(data) 368 369 def _unhandled_input(self, input): 370 # evil 371 proceed = True 372 if hasattr(self.urwid_toplevel, 'app'): 373 proceed = self.urwid_toplevel.app.unhandled_input(self, input) 374 if not proceed: 375 return 376 if input == 'ctrl c': 377 self.terminal.loseConnection() 378 379 380class UrwidServerProtocol(ServerProtocol): 381 def dataReceived(self, data): 382 self.terminalProtocol.dataReceived(data) 383 384 385class UrwidUser(TerminalUser): 386 """A terminal user that remembers its avatarId 387 388 The default implementation doesn't 389 """ 390 def __init__(self, original, avatarId): 391 TerminalUser.__init__(self, original, avatarId) 392 self.avatarId = avatarId 393 394 395class UrwidTerminalSession(TerminalSession): 396 """A terminal session that remembers the avatar and chained protocol for 397 later use. And implements a missing method for changed Window size. 398 399 Note: This implementation assumes that each SSH connection will only 400 request a single shell, which is not an entirely safe assumption, but is 401 by far the most common case. 402 """ 403 404 def openShell(self, proto): 405 """Open a shell. 406 """ 407 self.chained_protocol = UrwidServerProtocol( 408 UrwidTerminalProtocol, IUrwidMind(self.original)) 409 TerminalSessionTransport( 410 proto, self.chained_protocol, 411 IConchUser(self.original), 412 self.height, self.width) 413 414 def windowChanged(self, dimensions): 415 """Called when the window size has changed. 416 """ 417 (h, w, x, y) = dimensions 418 self.chained_protocol.terminalProtocol.terminalSize(h, w) 419 420 421class UrwidRealm(TerminalRealm): 422 """Custom terminal realm class-configured to use our custom Terminal User 423 Terminal Session. 424 """ 425 def __init__(self, mind_factory): 426 self.mind_factory = mind_factory 427 428 def _getAvatar(self, avatarId): 429 comp = Componentized() 430 user = UrwidUser(comp, avatarId) 431 comp.setComponent(IConchUser, user) 432 sess = UrwidTerminalSession(comp) 433 comp.setComponent(ISession, sess) 434 mind = self.mind_factory(comp) 435 comp.setComponent(IUrwidMind, mind) 436 return user 437 438 def requestAvatar(self, avatarId, mind, *interfaces): 439 for i in interfaces: 440 if i is IConchUser: 441 return (IConchUser, 442 self._getAvatar(avatarId), 443 lambda: None) 444 raise NotImplementedError() 445 446 447def create_server_factory(urwid_mind_factory): 448 """Convenience to create a server factory with a portal that uses a realm 449 serving a given urwid widget against checkers provided. 450 """ 451 rlm = UrwidRealm(urwid_mind_factory) 452 ptl = Portal(rlm, urwid_mind_factory.cred_checkers) 453 return ConchFactory(ptl) 454 455 456def create_service(urwid_mind_factory, port, *args, **kw): 457 """Convenience to create a service for use in tac-ish situations. 458 """ 459 f = create_server_factory(urwid_mind_factory) 460 return TCPServer(port, f, *args, **kw) 461 462 463def create_application(application_name, urwid_mind_factory, 464 port, *args, **kw): 465 """Convenience to create an application suitable for tac file 466 """ 467 application = Application(application_name) 468 svc = create_service(urwid_mind_factory, 6022) 469 svc.setServiceParent(application) 470 return application 471 472