1import os
2import warnings
3from pathlib import Path
4from typing import AbstractSet, Any, Callable, Dict, List, Mapping, Optional, Tuple, Union
5
6from .fields import ModelField
7from .main import BaseConfig, BaseModel, Extra
8from .typing import display_as_type
9from .utils import deep_update, path_type, sequence_like
10
# Default value of the `_env_file` argument of `BaseSettings.__init__`. `BaseSettings._build_values` compares against
# it to tell "argument not supplied" (fall back to `Config.env_file`) apart from an explicitly passed value such as
# `None`.
env_file_sentinel = str(object())

# A settings source: a callable that receives the settings instance and returns a dict of values for it.
SettingsSourceCallable = Callable[['BaseSettings'], Dict[str, Any]]
14
15
class SettingsError(ValueError):
    """Error raised by the settings sources, e.g. for unparsable JSON values or an invalid secrets directory."""
18
19
class BaseSettings(BaseModel):
    """
    Base class for settings, allowing values to be overridden by environment variables.

    This is useful in production for secrets you do not wish to save in code, it plays nicely with docker(-compose),
    Heroku and any 12 factor app design.
    """

    def __init__(
        __pydantic_self__,
        _env_file: Union[Path, str, None] = env_file_sentinel,
        _env_file_encoding: Optional[str] = None,
        _secrets_dir: Union[Path, str, None] = None,
        **values: Any,
    ) -> None:
        # The first argument is deliberately not named `self`, so that "self" remains usable as a settable attribute
        merged_values = __pydantic_self__._build_values(
            values,
            _env_file=_env_file,
            _env_file_encoding=_env_file_encoding,
            _secrets_dir=_secrets_dir,
        )
        super().__init__(**merged_values)

    def _build_values(
        self,
        init_kwargs: Dict[str, Any],
        _env_file: Union[Path, str, None] = None,
        _env_file_encoding: Optional[str] = None,
        _secrets_dir: Union[Path, str, None] = None,
    ) -> Dict[str, Any]:
        """
        Collect the values for this settings instance from all configured sources.

        Explicitly passed `_env_file`, `_env_file_encoding` and `_secrets_dir` take precedence over the equivalent
        `Config` attributes. The sources returned by `Config.customise_sources` are each called with `self` and
        their results merged with `deep_update`, passed in reverse order.
        """
        config = self.__config__

        # Resolve the effective options: explicit arguments first, Config attributes as the fallback
        if _env_file != env_file_sentinel:
            env_file = _env_file
        else:
            env_file = config.env_file

        if _env_file_encoding is not None:
            env_file_encoding = _env_file_encoding
        else:
            env_file_encoding = config.env_file_encoding

        secrets_dir = _secrets_dir or config.secrets_dir

        # Hand the built-in sources to the hook which may re-prioritise them, add new ones or drop some
        sources = config.customise_sources(
            init_settings=InitSettingsSource(init_kwargs=init_kwargs),
            env_settings=EnvSettingsSource(env_file=env_file, env_file_encoding=env_file_encoding),
            file_secret_settings=SecretsSettingsSource(secrets_dir=secrets_dir),
        )
        if not sources:
            # nobody should intend to configure zero sources, but an empty dict is marginally preferable
            # to an informative error and much better than a confusing one
            return {}

        # Sources are called in the order given, then the results are reversed before merging
        collected = [source(self) for source in sources]
        collected.reverse()
        return deep_update(*collected)

    class Config(BaseConfig):
        # Prepended to the field name when a field declares no explicit "env"
        env_prefix = ''
        # Fallbacks used by `_build_values` when the matching `__init__` argument is not supplied
        env_file = None
        env_file_encoding = None
        secrets_dir = None
        validate_all = True
        extra = Extra.forbid
        arbitrary_types_allowed = True
        # When False, `prepare_field` lower-cases the environment variable names of every field
        case_sensitive = False

        @classmethod
        def prepare_field(cls, field: ModelField) -> None:
            """
            Work out the environment variable names for `field` and store them in
            `field.field_info.extra['env_names']`.

            Raises TypeError when the configured "env" is not a string, set, frozenset or sequence.
            """
            env_names: Union[List[str], AbstractSet[str]]
            config_field_info = cls.get_field_info(field.name)

            # "env" from the Config's field info is preferred over "env" given on the field itself
            env = config_field_info.get('env') or field.field_info.extra.get('env')

            if env is None:
                if field.has_alias:
                    warnings.warn(
                        'aliases are no longer used by BaseSettings to define which environment variables to read. '
                        'Instead use the "env" field setting. '
                        'See https://pydantic-docs.helpmanual.io/usage/settings/#environment-variable-names',
                        FutureWarning,
                    )
                env_names = {cls.env_prefix + field.name}
            elif isinstance(env, str):
                env_names = {env}
            elif isinstance(env, (set, frozenset)):
                env_names = env
            elif sequence_like(env):
                env_names = list(env)
            else:
                raise TypeError(f'invalid field env: {env!r} ({display_as_type(env)}); should be string, list or set')

            if not cls.case_sensitive:
                # rebuild with the same container type so that sets stay sets and lists stay lists
                env_names = type(env_names)(name.lower() for name in env_names)
            field.field_info.extra['env_names'] = env_names

        @classmethod
        def customise_sources(
            cls,
            init_settings: SettingsSourceCallable,
            env_settings: SettingsSourceCallable,
            file_secret_settings: SettingsSourceCallable,
        ) -> Tuple[SettingsSourceCallable, ...]:
            """
            Return the settings sources to use, in the order they should be called.

            The default is init kwargs, then environment, then secret files.
            """
            return (init_settings, env_settings, file_secret_settings)

    __config__: Config  # type: ignore
117
118
class InitSettingsSource:
    """Settings source that returns the `init_kwargs` dict it was constructed with."""

    __slots__ = ('init_kwargs',)

    def __init__(self, init_kwargs: Dict[str, Any]):
        self.init_kwargs = init_kwargs

    def __call__(self, settings: BaseSettings) -> Dict[str, Any]:
        # `settings` is not consulted; the stored dict itself (not a copy) is handed back
        return self.init_kwargs

    def __repr__(self) -> str:
        return 'InitSettingsSource(init_kwargs={!r})'.format(self.init_kwargs)
130
131
class EnvSettingsSource:
    """Settings source that reads field values from `os.environ` and, optionally, from a dotenv file."""

    __slots__ = ('env_file', 'env_file_encoding')

    def __init__(self, env_file: Union[Path, str, None], env_file_encoding: Optional[str]):
        self.env_file: Union[Path, str, None] = env_file
        self.env_file_encoding: Optional[str] = env_file_encoding

    def __call__(self, settings: BaseSettings) -> Dict[str, Any]:
        """
        Build environment variables suitable for passing to the Model.

        The result is keyed by field alias. Raises SettingsError when the value of a complex field cannot be
        decoded by the config's `json_loads`.
        """
        config = settings.__config__
        case_sensitive = config.case_sensitive

        env_vars: Mapping[str, Optional[str]]
        if case_sensitive:
            env_vars = os.environ
        else:
            env_vars = {name.lower(): value for name, value in os.environ.items()}

        if self.env_file is not None:
            dotenv_path = Path(self.env_file).expanduser()
            # a configured but missing env file is skipped without complaint
            if dotenv_path.is_file():
                dotenv_vars = read_env_file(
                    dotenv_path, encoding=self.env_file_encoding, case_sensitive=case_sensitive
                )
                # real environment variables are unpacked last, so they override the file's entries
                env_vars = {**dotenv_vars, **env_vars}

        result: Dict[str, Optional[str]] = {}
        for field in settings.__fields__.values():
            # the first of the field's env names that has a value is used
            for env_name in field.field_info.extra['env_names']:
                env_val = env_vars.get(env_name)
                if env_val is not None:
                    break
            else:
                # none of the names yielded a value: leave this field out of the result
                continue

            if field.is_complex():
                try:
                    env_val = config.json_loads(env_val)  # type: ignore
                except ValueError as e:
                    raise SettingsError(f'error parsing JSON for "{env_name}"') from e
            result[field.alias] = env_val
        return result

    def __repr__(self) -> str:
        return 'EnvSettingsSource(env_file={!r}, env_file_encoding={!r})'.format(
            self.env_file, self.env_file_encoding
        )
180
181
class SecretsSettingsSource:
    """
    Settings source that reads field values from files inside a "secrets" directory.

    A candidate file is named after one of a field's ``env_names``; its text, stripped of surrounding
    whitespace, becomes the value of that field.
    """

    __slots__ = ('secrets_dir',)

    def __init__(self, secrets_dir: Union[Path, str, None]):
        self.secrets_dir: Union[Path, str, None] = secrets_dir

    def __call__(self, settings: 'BaseSettings') -> Dict[str, Any]:
        """
        Build fields from "secrets" files.

        Returns a dict keyed by field alias; it is empty when no secrets directory is configured or when the
        directory does not exist (the latter also emits a warning).

        Values of complex fields are decoded with the config's ``json_loads``, the same treatment
        ``EnvSettingsSource`` gives to environment variables, so a secret file can hold e.g. a JSON list or object.

        Raises SettingsError if ``secrets_dir`` exists but is not a directory, or if the secret of a complex
        field is not valid JSON.
        """
        secrets: Dict[str, Any] = {}

        if self.secrets_dir is None:
            return secrets

        secrets_path = Path(self.secrets_dir).expanduser()

        if not secrets_path.exists():
            warnings.warn(f'directory "{secrets_path}" does not exist')
            return secrets

        if not secrets_path.is_dir():
            raise SettingsError(f'secrets_dir must reference a directory, not a {path_type(secrets_path)}')

        for field in settings.__fields__.values():
            # Every name is checked (there is no early exit), so if several names of one field resolve to files
            # the one iterated last provides the value.
            for env_name in field.field_info.extra['env_names']:
                path = secrets_path / env_name
                if path.is_file():
                    secret_value: Any = path.read_text().strip()
                    if field.is_complex():
                        try:
                            secret_value = settings.__config__.json_loads(secret_value)
                        except ValueError as e:
                            raise SettingsError(f'error parsing JSON for "{env_name}"') from e
                    secrets[field.alias] = secret_value
                elif path.exists():
                    # something with the right name exists but is not a regular file (e.g. a directory)
                    warnings.warn(
                        f'attempted to load secret file "{path}" but found a {path_type(path)} instead.',
                        stacklevel=4,
                    )

        return secrets

    def __repr__(self) -> str:
        return f'SecretsSettingsSource(secrets_dir={self.secrets_dir!r})'
221
222
def read_env_file(
    file_path: Path, *, encoding: Optional[str] = None, case_sensitive: bool = False
) -> Dict[str, Optional[str]]:
    """
    Read the variables defined in a dotenv file.

    :param file_path: path of the dotenv file, passed straight to `dotenv.dotenv_values`
    :param encoding: encoding used to read the file; 'utf8' is used when this is `None` (or otherwise falsy)
    :param case_sensitive: when False (the default) every variable name is lower-cased in the result
    :return: mapping of variable name to value as produced by `dotenv.dotenv_values`
    :raises ImportError: if python-dotenv is not installed
    """
    # imported lazily so that python-dotenv stays an optional dependency
    try:
        from dotenv import dotenv_values
    except ImportError as e:
        raise ImportError('python-dotenv is not installed, run `pip install pydantic[dotenv]`') from e

    file_vars: Dict[str, Optional[str]] = dotenv_values(file_path, encoding=encoding or 'utf8')
    if case_sensitive:
        return file_vars
    return {k.lower(): v for k, v in file_vars.items()}
234