1"""UNIX terminal recording functionalities 2 3This module exposes functions for 4 - recording the output of a shell process in asciicast v2 format (`record`) 5 - producing frames (2D array of CharacterCell) from the raw output of 6 `record` (`timed_frames`) 7 8A context manager named `TerminalMode` is also provided and is to be used with 9the `record` function to ensure that the terminal state is always properly 10restored, otherwise a failure during a call to `record` could render the 11terminal unusable. 12""" 13 14import codecs 15import datetime 16import fcntl 17import os 18import pty 19import select 20import struct 21import termios 22import tty 23from collections import defaultdict, namedtuple 24from typing import Iterator 25 26import pyte 27import pyte.screens 28 29from termtosvg import anim 30from termtosvg.asciicast import AsciiCastV2Event, AsciiCastV2Header 31 32TimedFrame = namedtuple('TimedFrame', ['time', 'duration', 'buffer']) 33 34 35class TerminalMode: 36 """Save terminal mode and size on entry, restore them on exit 37 38 This context manager exists to ensure that the state of the terminal is 39 properly restored when functions like `_record` (which relies on setting 40 the terminal mode to raw and changing the geometry of the screen) fail. 41 """ 42 def __init__(self, fileno): 43 self.fileno = fileno 44 self.mode = None 45 self.ttysize = None 46 47 def __enter__(self): 48 try: 49 self.mode = tty.tcgetattr(self.fileno) 50 except tty.error: 51 pass 52 53 try: 54 columns, lines = os.get_terminal_size(self.fileno) 55 except OSError: 56 pass 57 else: 58 self.ttysize = struct.pack("HHHH", lines, columns, 0, 0) 59 60 return self.mode, self.ttysize 61 62 def __exit__(self, exc_type, exc_val, exc_tb): 63 if self.ttysize is not None: 64 fcntl.ioctl(self.fileno, termios.TIOCSWINSZ, self.ttysize) 65 66 if self.mode is not None: 67 tty.tcsetattr(self.fileno, tty.TCSAFLUSH, self.mode) 68 69 70def _record(process_args, columns, lines, input_fileno, output_fileno): 71 """Record raw input and output of a process 72 73 This function forks the current process. The child process runs the command 74 specified by 'process_args' which is a session leader and has a controlling 75 terminal and is run in the background. The parent process, which runs in 76 the foreground, transmits data between the standard input, output and the 77 child process and logs it. From the user point of view, it appears they are 78 communicating with the process they intend to record (through their 79 terminal emulator) when in fact they communicate with our parent process 80 which logs all data exchanges with the user 81 82 The implementation of this method is mostly copied from the pty.spawn 83 function of the CPython standard library. It has been modified in order to 84 make the record function a generator. 85 See https://github.com/python/cpython/blob/master/Lib/pty.py 86 87 :param process_args: List of arguments to run the process to be recorded 88 :param columns: Initial number of columns of the terminal 89 :param lines: Initial number of lines of the terminal 90 :param input_fileno: File descriptor of the input data stream 91 :param output_fileno: File descriptor of the output data stream 92 """ 93 pid, master_fd = pty.fork() 94 if pid == 0: 95 # Child process - this call never returns 96 os.execlp(process_args[0], *process_args) 97 98 # Parent process 99 # Set the terminal size for master_fd 100 ttysize = struct.pack("HHHH", lines, columns, 0, 0) 101 fcntl.ioctl(master_fd, termios.TIOCSWINSZ, ttysize) 102 103 try: 104 tty.setraw(input_fileno) 105 except tty.error: 106 pass 107 108 for data, time in _capture_output(input_fileno, output_fileno, master_fd): 109 yield data, time 110 111 os.close(master_fd) 112 113 _, child_exit_status = os.waitpid(pid, 0) 114 return child_exit_status 115 116 117def _capture_output(input_fileno, output_fileno, master_fd, buffer_size=1024): 118 """Send data from input_fileno to master_fd and send data from master_fd to 119 output_fileno and to the caller 120 121 The implementation of this method is mostly copied from the pty.spawn 122 function of the CPython standard library. It has been modified in order to 123 make the `record` function a generator. 124 125 See https://github.com/python/cpython/blob/master/Lib/pty.py 126 """ 127 rlist = [input_fileno, master_fd] 128 xlist = [input_fileno, output_fileno, master_fd] 129 130 xfds = [] 131 while not xfds: 132 rfds, _, xfds = select.select(rlist, [], xlist) 133 for fd in rfds: 134 try: 135 data = os.read(fd, buffer_size) 136 except OSError: 137 xfds.append(fd) 138 continue 139 140 if not data: 141 xfds.append(fd) 142 continue 143 144 if fd == input_fileno: 145 write_fileno = master_fd 146 else: 147 write_fileno = output_fileno 148 yield data, datetime.datetime.now() 149 150 while data: 151 n = os.write(write_fileno, data) 152 data = data[n:] 153 154 155def _group_by_time(event_records, min_rec_duration, max_rec_duration, last_rec_duration): 156 """Merge event records together if they are close enough 157 158 The time elapsed between two consecutive event records returned by this 159 function is guaranteed to be at least min_rec_duration. 160 161 The duration of each record is also computed. Any record with a duration 162 greater than `max_rec_duration` will see its duration reduce to this value. 163 The duration of the last record can't be computed and is simply set to 164 `last_rec_duration`. 165 166 :param event_records: Sequence of records in asciicast v2 format 167 :param min_rec_duration: Minimum time between two records returned by the 168 function in milliseconds. 169 :param max_rec_duration: Maximum duration of a record in milliseconds 170 :param last_rec_duration: Duration of the last record in milliseconds 171 :return: Sequence of records with duration 172 """ 173 # TODO: itertools.accumulate? 174 current_string = '' 175 current_time = 0 176 dropped_time = 0 177 178 if max_rec_duration: 179 max_rec_duration /= 1000 180 181 for event_record in event_records: 182 assert isinstance(event_record, AsciiCastV2Event) 183 # Silently ignoring the duration on input records is a source 184 # of confusion so fail hard if the duration is set 185 assert event_record.duration is None 186 if event_record.event_type != 'o': 187 continue 188 189 time_between_events = event_record.time - (current_time + dropped_time) 190 if time_between_events * 1000 >= min_rec_duration: 191 if max_rec_duration: 192 if max_rec_duration < time_between_events: 193 dropped_time += time_between_events - max_rec_duration 194 time_between_events = max_rec_duration 195 accumulator_event = AsciiCastV2Event(time=current_time, 196 event_type='o', 197 event_data=current_string, 198 duration=time_between_events) 199 yield accumulator_event 200 current_string = '' 201 current_time += time_between_events 202 203 current_string += event_record.event_data 204 205 accumulator_event = AsciiCastV2Event(time=current_time, 206 event_type='o', 207 event_data=current_string, 208 duration=last_rec_duration / 1000) 209 yield accumulator_event 210 211 212def record(process_args, columns, lines, input_fileno, output_fileno): 213 """Record a process in asciicast v2 format 214 215 The records returned by this method are: 216 - a single header containing configuration information 217 - multiple event records made of data captured from the terminal and 218 timing information (except for record duration which needs to be 219 computed separately) 220 221 :param process_args: Arguments required to spawn the process (list of 222 string) 223 :param columns: Width of the terminal screen (integer) 224 :param lines: Height of the terminal screen (integer) 225 :param input_fileno: File descriptor that will be used as the standard 226 input of the process 227 :param output_fileno: File descriptor that will be used as the standard 228 output of the process 229 230 When using `sys.stdout.fileno()` for `output_fileno` there is a risk 231 that the terminal is left in an unusable state if `record` fails. To 232 prevent this, `record` should be called inside the `TerminalMode` 233 context manager. 234 """ 235 yield AsciiCastV2Header(version=2, width=columns, height=lines, theme=None) 236 237 # TODO: why start != 0? 238 start = None 239 utf8_decoder = codecs.getincrementaldecoder('utf-8')('replace') 240 for data, time in _record(process_args, columns, lines, input_fileno, output_fileno): 241 if start is None: 242 start = time 243 244 yield AsciiCastV2Event(time=(time - start).total_seconds(), 245 event_type='o', 246 event_data=utf8_decoder.decode(data), 247 duration=None) 248 249 250def timed_frames(records, min_frame_dur=1, max_frame_dur=None, last_frame_dur=1000): 251 """Return a tuple made of the geometry of the screen and a generator of 252 instances of TimedFrame computed from asciicast records 253 254 Asciicast records are first coalesced so that the mininum duration between 255 two frames is at least `min_frame_dur` milliseconds. Events with a duration 256 greater than `max_frame_dur` will see their duration reduced to that value. 257 258 The duration of all frames lasting until the end of the animation 259 will be adjusted so that the last frame of the animation lasts 260 `last_frame_dur` 261 262 :param records: Terminal session record in Asciicast v2 format 263 :param min_frame_dur: Minimum frame duration in milliseconds (integer) 264 :param min_frame_dur: Minimum frame duration in milliseconds (integer) 265 :param max_frame_dur: Maximum frame duration in milliseconds (None or 266 integer) 267 :param last_frame_dur: Duration of the last frame of the animation 268 (integer) 269 """ 270 if not isinstance(records, Iterator): 271 records = iter(records) 272 273 header = next(records) 274 assert isinstance(header, AsciiCastV2Header) 275 276 if not max_frame_dur and header.idle_time_limit: 277 max_frame_dur = int(header.idle_time_limit * 1000) 278 279 def generator(): 280 screen = pyte.Screen(header.width, header.height) 281 stream = pyte.Stream(screen) 282 timed_records = _group_by_time(records, min_frame_dur, max_frame_dur, 283 last_frame_dur) 284 285 for record_ in timed_records: 286 assert isinstance(record_, AsciiCastV2Event) 287 for char in record_.event_data: 288 stream.feed(char) 289 yield TimedFrame(int(1000 * record_.time), 290 int(1000 * record_.duration), 291 _screen_buffer(screen)) 292 293 return (header.width, header.height), generator() 294 295 296def _screen_buffer(screen): 297 assert isinstance(screen, pyte.Screen) 298 299 buffer = defaultdict(dict) 300 for row in range(screen.lines): 301 buffer[row] = { 302 column: anim.CharacterCell.from_pyte(screen.buffer[row][column]) 303 for column in screen.buffer[row] 304 } 305 306 if not screen.cursor.hidden: 307 row, column = screen.cursor.y, screen.cursor.x 308 try: 309 data = screen.buffer[row][column].data 310 except KeyError: 311 data = ' ' 312 313 cursor_char = pyte.screens.Char(data=data, 314 fg=screen.cursor.attrs.fg, 315 bg=screen.cursor.attrs.bg, 316 reverse=True) 317 buffer[row][column] = anim.CharacterCell.from_pyte(cursor_char) 318 return buffer 319 320 321def get_terminal_size(fileno): 322 try: 323 columns, lines = os.get_terminal_size(fileno) 324 except OSError: 325 columns, lines = 80, 24 326 327 return columns, lines 328