1import asyncio
2import os.path
3import sys
4import typing
5
6from mitmproxy import ctx
7from mitmproxy import exceptions
8from mitmproxy import flowfilter
9from mitmproxy import io
10from mitmproxy import command
11
12
class ReadFile:
    """
        An addon that handles reading from file on startup.

        When the ``running`` hook fires and the ``rfile`` option is set, the
        flows stored in that file are loaded into the master, optionally
        restricted to those matching the ``readfile_filter`` option.
    """
    def __init__(self):
        # Parsed form of the readfile_filter option; None means "no filter".
        self.filter = None
        # True for as long as doread() is executing.
        self.is_reading = False
        # Strong reference to the task scheduled by running(). The event loop
        # only keeps weak references to tasks, so without this the read could
        # be garbage-collected before it finishes.
        self._read_task: typing.Optional[asyncio.Task] = None

    def load(self, loader):
        """Register the ``rfile`` and ``readfile_filter`` options."""
        loader.add_option(
            "rfile", typing.Optional[str], None,
            "Read flows from file."
        )
        loader.add_option(
            "readfile_filter", typing.Optional[str], None,
            "Read only matching flows."
        )

    def configure(self, updated):
        """
            Re-parse the flow filter whenever ``readfile_filter`` changes.

            Raises:
                exceptions.OptionsError: if the option is set but
                    ``flowfilter.parse`` returns a falsy result for it.
        """
        if "readfile_filter" in updated:
            filt = None
            if ctx.options.readfile_filter:
                filt = flowfilter.parse(ctx.options.readfile_filter)
                if not filt:
                    raise exceptions.OptionsError(
                        "Invalid readfile filter: %s" % ctx.options.readfile_filter
                    )
            # An unset/empty option clears any previously configured filter.
            self.filter = filt

    async def load_flows(self, fo: typing.IO[bytes]) -> int:
        """
            Load every flow from the binary stream ``fo`` into the master.

            Flows rejected by the configured filter are skipped and not
            counted.

            Returns:
                The number of flows handed to ``ctx.master.load_flow``.

            Raises:
                exceptions.FlowReadException: if reading the stream fails
                    with an ``OSError`` or a ``FlowReadException``. Flows
                    loaded before the failure remain loaded.
        """
        cnt = 0
        freader = io.FlowReader(fo)
        try:
            for flow in freader.stream():
                if self.filter and not self.filter(flow):
                    continue
                await ctx.master.load_flow(flow)
                cnt += 1
        except (OSError, exceptions.FlowReadException) as e:
            # Distinguish a partially usable file from a wholly unreadable one.
            if cnt:
                ctx.log.warn("Flow file corrupted - loaded %i flows." % cnt)
            else:
                ctx.log.error("Flow file corrupted.")
            raise exceptions.FlowReadException(str(e)) from e
        else:
            return cnt

    async def load_flows_from_path(self, path: str) -> int:
        """
            Open ``path`` (after ``~`` expansion) and load its flows.

            Returns:
                The number of flows loaded, as reported by ``load_flows``.

            Raises:
                exceptions.FlowReadException: if the file cannot be opened,
                    or if ``load_flows`` fails.
        """
        path = os.path.expanduser(path)
        try:
            with open(path, "rb") as f:
                return await self.load_flows(f)
        except OSError as e:
            ctx.log.error(f"Cannot load flows: {e}")
            raise exceptions.FlowReadException(str(e)) from e

    async def doread(self, rfile):
        """
            Load flows from ``rfile`` while flagging the addon as reading.

            Args:
                rfile: path of the flow file to read. If falsy, the current
                    value of the ``rfile`` option is used instead.

            Raises:
                exceptions.OptionsError: if loading raises
                    ``FlowReadException``.
        """
        # Honour the argument; previously it was ignored in favour of the
        # option value. Fall back to the option so callers that passed a
        # placeholder keep working.
        path = rfile or ctx.options.rfile
        self.is_reading = True
        try:
            await self.load_flows_from_path(path)
        except exceptions.FlowReadException as e:
            raise exceptions.OptionsError(e) from e
        finally:
            # Always clear the flag, even when loading failed.
            self.is_reading = False

    def running(self):
        """Schedule the read on the event loop if ``rfile`` is set."""
        if ctx.options.rfile:
            # Keep the task referenced so it cannot be collected mid-read.
            self._read_task = asyncio.get_event_loop().create_task(
                self.doread(ctx.options.rfile)
            )

    @command.command("readfile.reading")
    def reading(self) -> bool:
        """Return whether doread() is currently in progress."""
        return self.is_reading
85
86
class ReadFileStdin(ReadFile):
    """Variant of ReadFile that treats the path "-" as standard input."""
    async def load_flows_from_path(self, path: str) -> int:
        """
            Load flows from ``path``, reading from stdin when it is "-".

            Any other path is delegated to the parent implementation.
        """
        if path != "-":
            return await super().load_flows_from_path(path)
        # Not covered by tests yet: this coroutine runs on the event loop,
        # where mocking sys.stdin has no effect.
        return await self.load_flows(sys.stdin.buffer)  # pragma: no cover
96