1import typing
2
3from mitmproxy import exceptions
4from mitmproxy import flowfilter
5from mitmproxy import ctx
6
7
class StickyAuth:
    """Addon that remembers Authorization headers per host and replays them.

    While a filter is configured through the ``stickyauth`` option, every
    request carrying an ``authorization`` header has that value recorded
    under its host. Later requests to the same host that lack the header
    and match the filter get the recorded value added.
    """

    def __init__(self):
        # Parsed filter object; None means the addon is inactive.
        self.flt = None
        # Maps request host -> last Authorization header value seen for it.
        self.hosts = {}

    def load(self, loader):
        """Register the ``stickyauth`` option (optional string, default None)."""
        option_name = "stickyauth"
        option_help = "Set sticky auth filter. Matched against requests."
        loader.add_option(option_name, typing.Optional[str], None, option_help)

    def configure(self, updated):
        """Re-parse the filter whenever the ``stickyauth`` option changes.

        Raises:
            exceptions.OptionsError: if the option is set but
                ``flowfilter.parse`` yields a falsy result for it.
        """
        if "stickyauth" not in updated:
            return

        expression = ctx.options.stickyauth
        if not expression:
            # Option unset or empty: switch the addon off.
            self.flt = None
            return

        parsed = flowfilter.parse(expression)
        if not parsed:
            raise exceptions.OptionsError(
                "stickyauth: invalid filter expression: %s" % expression
            )
        self.flt = parsed

    def request(self, flow):
        """Record or replay the Authorization header for ``flow``'s host."""
        if not self.flt:
            return

        req = flow.request
        host = req.host
        headers = req.headers

        if "authorization" in headers:
            # The request brings its own credentials: remember them.
            self.hosts[host] = headers["authorization"]
            return

        # No credentials on the request; only replay for matching flows.
        if not flowfilter.match(self.flt, flow):
            return
        if host in self.hosts:
            headers["authorization"] = self.hosts[host]
39