import asyncio
import os.path
import sys
import typing

from mitmproxy import ctx
from mitmproxy import exceptions
from mitmproxy import flowfilter
from mitmproxy import io
from mitmproxy import command


class ReadFile:
    """
    An addon that handles reading from file on startup.
    """
    def __init__(self):
        # Parsed form of the "readfile_filter" option; None means "no filter".
        self.filter = None
        # True while doread() is in progress; exposed via "readfile.reading".
        self.is_reading = False
        # Strong reference to the task scheduled by running(). The event loop
        # only keeps weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._read_task = None

    def load(self, loader):
        """Register the "rfile" and "readfile_filter" options."""
        loader.add_option(
            "rfile", typing.Optional[str], None,
            "Read flows from file."
        )
        loader.add_option(
            "readfile_filter", typing.Optional[str], None,
            "Read only matching flows."
        )

    def configure(self, updated):
        """
        Re-parse the flow filter whenever "readfile_filter" is updated.

        Raises:
            exceptions.OptionsError: if a filter expression is set but
                flowfilter.parse() returns a falsy result for it.
        """
        if "readfile_filter" in updated:
            filt = None
            if ctx.options.readfile_filter:
                filt = flowfilter.parse(ctx.options.readfile_filter)
                if not filt:
                    raise exceptions.OptionsError(
                        "Invalid readfile filter: %s" % ctx.options.readfile_filter
                    )
            self.filter = filt

    async def load_flows(self, fo: typing.IO[bytes]) -> int:
        """
        Read flows from the binary stream *fo* and hand them to the master.

        Flows rejected by self.filter (when one is set) are skipped and not
        counted.

        Returns:
            The number of flows passed to ctx.master.load_flow().

        Raises:
            exceptions.FlowReadException: if an OSError or FlowReadException
                occurs while reading or loading. A warning is logged when
                some flows were already loaded, an error otherwise.
        """
        cnt = 0
        freader = io.FlowReader(fo)
        try:
            for flow in freader.stream():
                if self.filter and not self.filter(flow):
                    continue
                await ctx.master.load_flow(flow)
                cnt += 1
        except (OSError, exceptions.FlowReadException) as e:
            if cnt:
                ctx.log.warn("Flow file corrupted - loaded %i flows." % cnt)
            else:
                ctx.log.error("Flow file corrupted.")
            raise exceptions.FlowReadException(str(e)) from e
        else:
            return cnt

    async def load_flows_from_path(self, path: str) -> int:
        """
        Open *path* (after "~" expansion) in binary mode and load its flows.

        Returns:
            The number of flows loaded, as reported by load_flows().

        Raises:
            exceptions.FlowReadException: if an OSError is raised while
                opening or reading the file (also logged as an error), or if
                load_flows() raises it.
        """
        path = os.path.expanduser(path)
        try:
            with open(path, "rb") as f:
                return await self.load_flows(f)
        except OSError as e:
            ctx.log.error(f"Cannot load flows: {e}")
            raise exceptions.FlowReadException(str(e)) from e

    async def doread(self, rfile):
        """
        Load flows from *rfile*, keeping self.is_reading set for the duration.

        Raises:
            exceptions.OptionsError: wrapping any FlowReadException raised
                while loading.
        """
        self.is_reading = True
        try:
            # Use the path we were called with rather than re-reading the
            # option, so the caller decides which file is loaded.
            await self.load_flows_from_path(rfile)
        except exceptions.FlowReadException as e:
            raise exceptions.OptionsError(e) from e
        finally:
            self.is_reading = False

    def running(self):
        """Schedule doread() on the event loop if the "rfile" option is set."""
        if ctx.options.rfile:
            # Keep the task referenced so it cannot be collected mid-flight.
            self._read_task = asyncio.get_event_loop().create_task(
                self.doread(ctx.options.rfile)
            )

    @command.command("readfile.reading")
    def reading(self) -> bool:
        """Return True while a read started by doread() is in progress."""
        return self.is_reading


class ReadFileStdin(ReadFile):
    """Support the special case of "-" for reading from stdin"""
    async def load_flows_from_path(self, path: str) -> int:
        """
        Load flows from sys.stdin.buffer when *path* is "-"; otherwise defer
        to ReadFile.load_flows_from_path().
        """
        if path == "-":  # pragma: no cover
            # Need to think about how to test this. This function is scheduled
            # onto the event loop, where a sys.stdin mock has no effect.
            return await self.load_flows(sys.stdin.buffer)
        else:
            return await super().load_flows_from_path(path)