1import re 2import warnings 3from collections import defaultdict 4from datetime import date, datetime, time, timedelta 5from decimal import Decimal 6from enum import Enum 7from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network 8from pathlib import Path 9from typing import ( 10 TYPE_CHECKING, 11 Any, 12 Callable, 13 Dict, 14 FrozenSet, 15 Generic, 16 Iterable, 17 List, 18 Optional, 19 Pattern, 20 Sequence, 21 Set, 22 Tuple, 23 Type, 24 TypeVar, 25 Union, 26 cast, 27) 28from uuid import UUID 29 30from typing_extensions import Annotated, Literal 31 32from .fields import ( 33 MAPPING_LIKE_SHAPES, 34 SHAPE_FROZENSET, 35 SHAPE_GENERIC, 36 SHAPE_ITERABLE, 37 SHAPE_LIST, 38 SHAPE_SEQUENCE, 39 SHAPE_SET, 40 SHAPE_SINGLETON, 41 SHAPE_TUPLE, 42 SHAPE_TUPLE_ELLIPSIS, 43 FieldInfo, 44 ModelField, 45) 46from .json import pydantic_encoder 47from .networks import AnyUrl, EmailStr 48from .types import ( 49 ConstrainedDecimal, 50 ConstrainedFloat, 51 ConstrainedInt, 52 ConstrainedList, 53 ConstrainedSet, 54 ConstrainedStr, 55 SecretBytes, 56 SecretStr, 57 conbytes, 58 condecimal, 59 confloat, 60 conint, 61 conlist, 62 conset, 63 constr, 64) 65from .typing import ( 66 NONE_TYPES, 67 ForwardRef, 68 all_literal_values, 69 get_args, 70 get_origin, 71 is_callable_type, 72 is_literal_type, 73 is_namedtuple, 74) 75from .utils import ROOT_KEY, get_model, lenient_issubclass, sequence_like 76 77if TYPE_CHECKING: 78 from .dataclasses import Dataclass # noqa: F401 79 from .main import BaseModel # noqa: F401 80 81default_prefix = '#/definitions/' 82default_ref_template = '#/definitions/{model}' 83 84TypeModelOrEnum = Union[Type['BaseModel'], Type[Enum]] 85TypeModelSet = Set[TypeModelOrEnum] 86 87 88def schema( 89 models: Sequence[Union[Type['BaseModel'], Type['Dataclass']]], 90 *, 91 by_alias: bool = True, 92 title: Optional[str] = None, 93 description: Optional[str] = None, 94 ref_prefix: Optional[str] = None, 95 ref_template: str = default_ref_template, 96) -> Dict[str, Any]: 97 """ 98 Process a list of models and generate a single JSON Schema with all of them defined in the ``definitions`` 99 top-level JSON key, including their sub-models. 100 101 :param models: a list of models to include in the generated JSON Schema 102 :param by_alias: generate the schemas using the aliases defined, if any 103 :param title: title for the generated schema that includes the definitions 104 :param description: description for the generated schema 105 :param ref_prefix: the JSON Pointer prefix for schema references with ``$ref``, if None, will be set to the 106 default of ``#/definitions/``. Update it if you want the schemas to reference the definitions somewhere 107 else, e.g. for OpenAPI use ``#/components/schemas/``. The resulting generated schemas will still be at the 108 top-level key ``definitions``, so you can extract them from there. But all the references will have the set 109 prefix. 110 :param ref_template: Use a ``string.format()`` template for ``$ref`` instead of a prefix. This can be useful 111 for references that cannot be represented by ``ref_prefix`` such as a definition stored in another file. For 112 a sibling json file in a ``/schemas`` directory use ``"/schemas/${model}.json#"``. 113 :return: dict with the JSON Schema with a ``definitions`` top-level key including the schema definitions for 114 the models and sub-models passed in ``models``. 115 """ 116 clean_models = [get_model(model) for model in models] 117 flat_models = get_flat_models_from_models(clean_models) 118 model_name_map = get_model_name_map(flat_models) 119 definitions = {} 120 output_schema: Dict[str, Any] = {} 121 if title: 122 output_schema['title'] = title 123 if description: 124 output_schema['description'] = description 125 for model in clean_models: 126 m_schema, m_definitions, m_nested_models = model_process_schema( 127 model, 128 by_alias=by_alias, 129 model_name_map=model_name_map, 130 ref_prefix=ref_prefix, 131 ref_template=ref_template, 132 ) 133 definitions.update(m_definitions) 134 model_name = model_name_map[model] 135 definitions[model_name] = m_schema 136 if definitions: 137 output_schema['definitions'] = definitions 138 return output_schema 139 140 141def model_schema( 142 model: Union[Type['BaseModel'], Type['Dataclass']], 143 by_alias: bool = True, 144 ref_prefix: Optional[str] = None, 145 ref_template: str = default_ref_template, 146) -> Dict[str, Any]: 147 """ 148 Generate a JSON Schema for one model. With all the sub-models defined in the ``definitions`` top-level 149 JSON key. 150 151 :param model: a Pydantic model (a class that inherits from BaseModel) 152 :param by_alias: generate the schemas using the aliases defined, if any 153 :param ref_prefix: the JSON Pointer prefix for schema references with ``$ref``, if None, will be set to the 154 default of ``#/definitions/``. Update it if you want the schemas to reference the definitions somewhere 155 else, e.g. for OpenAPI use ``#/components/schemas/``. The resulting generated schemas will still be at the 156 top-level key ``definitions``, so you can extract them from there. But all the references will have the set 157 prefix. 158 :param ref_template: Use a ``string.format()`` template for ``$ref`` instead of a prefix. This can be useful for 159 references that cannot be represented by ``ref_prefix`` such as a definition stored in another file. For a 160 sibling json file in a ``/schemas`` directory use ``"/schemas/${model}.json#"``. 161 :return: dict with the JSON Schema for the passed ``model`` 162 """ 163 model = get_model(model) 164 flat_models = get_flat_models_from_model(model) 165 model_name_map = get_model_name_map(flat_models) 166 model_name = model_name_map[model] 167 m_schema, m_definitions, nested_models = model_process_schema( 168 model, by_alias=by_alias, model_name_map=model_name_map, ref_prefix=ref_prefix, ref_template=ref_template 169 ) 170 if model_name in nested_models: 171 # model_name is in Nested models, it has circular references 172 m_definitions[model_name] = m_schema 173 m_schema = get_schema_ref(model_name, ref_prefix, ref_template, False) 174 if m_definitions: 175 m_schema.update({'definitions': m_definitions}) 176 return m_schema 177 178 179def get_field_info_schema(field: ModelField) -> Tuple[Dict[str, Any], bool]: 180 schema_overrides = False 181 182 # If no title is explicitly set, we don't set title in the schema for enums. 183 # The behaviour is the same as `BaseModel` reference, where the default title 184 # is in the definitions part of the schema. 185 schema: Dict[str, Any] = {} 186 if field.field_info.title or not lenient_issubclass(field.type_, Enum): 187 schema['title'] = field.field_info.title or field.alias.title().replace('_', ' ') 188 189 if field.field_info.title: 190 schema_overrides = True 191 192 if field.field_info.description: 193 schema['description'] = field.field_info.description 194 schema_overrides = True 195 196 if ( 197 not field.required 198 and not field.field_info.const 199 and field.default is not None 200 and not is_callable_type(field.outer_type_) 201 ): 202 schema['default'] = encode_default(field.default) 203 schema_overrides = True 204 205 return schema, schema_overrides 206 207 208def field_schema( 209 field: ModelField, 210 *, 211 by_alias: bool = True, 212 model_name_map: Dict[TypeModelOrEnum, str], 213 ref_prefix: Optional[str] = None, 214 ref_template: str = default_ref_template, 215 known_models: TypeModelSet = None, 216) -> Tuple[Dict[str, Any], Dict[str, Any], Set[str]]: 217 """ 218 Process a Pydantic field and return a tuple with a JSON Schema for it as the first item. 219 Also return a dictionary of definitions with models as keys and their schemas as values. If the passed field 220 is a model and has sub-models, and those sub-models don't have overrides (as ``title``, ``default``, etc), they 221 will be included in the definitions and referenced in the schema instead of included recursively. 222 223 :param field: a Pydantic ``ModelField`` 224 :param by_alias: use the defined alias (if any) in the returned schema 225 :param model_name_map: used to generate the JSON Schema references to other models included in the definitions 226 :param ref_prefix: the JSON Pointer prefix to use for references to other schemas, if None, the default of 227 #/definitions/ will be used 228 :param ref_template: Use a ``string.format()`` template for ``$ref`` instead of a prefix. This can be useful for 229 references that cannot be represented by ``ref_prefix`` such as a definition stored in another file. For a 230 sibling json file in a ``/schemas`` directory use ``"/schemas/${model}.json#"``. 231 :param known_models: used to solve circular references 232 :return: tuple of the schema for this field and additional definitions 233 """ 234 s, schema_overrides = get_field_info_schema(field) 235 236 validation_schema = get_field_schema_validations(field) 237 if validation_schema: 238 s.update(validation_schema) 239 schema_overrides = True 240 241 f_schema, f_definitions, f_nested_models = field_type_schema( 242 field, 243 by_alias=by_alias, 244 model_name_map=model_name_map, 245 schema_overrides=schema_overrides, 246 ref_prefix=ref_prefix, 247 ref_template=ref_template, 248 known_models=known_models or set(), 249 ) 250 # $ref will only be returned when there are no schema_overrides 251 if '$ref' in f_schema: 252 return f_schema, f_definitions, f_nested_models 253 else: 254 s.update(f_schema) 255 return s, f_definitions, f_nested_models 256 257 258numeric_types = (int, float, Decimal) 259_str_types_attrs: Tuple[Tuple[str, Union[type, Tuple[type, ...]], str], ...] = ( 260 ('max_length', numeric_types, 'maxLength'), 261 ('min_length', numeric_types, 'minLength'), 262 ('regex', str, 'pattern'), 263) 264 265_numeric_types_attrs: Tuple[Tuple[str, Union[type, Tuple[type, ...]], str], ...] = ( 266 ('gt', numeric_types, 'exclusiveMinimum'), 267 ('lt', numeric_types, 'exclusiveMaximum'), 268 ('ge', numeric_types, 'minimum'), 269 ('le', numeric_types, 'maximum'), 270 ('multiple_of', numeric_types, 'multipleOf'), 271) 272 273 274def get_field_schema_validations(field: ModelField) -> Dict[str, Any]: 275 """ 276 Get the JSON Schema validation keywords for a ``field`` with an annotation of 277 a Pydantic ``FieldInfo`` with validation arguments. 278 """ 279 f_schema: Dict[str, Any] = {} 280 281 if lenient_issubclass(field.type_, Enum): 282 # schema is already updated by `enum_process_schema`; just update with field extra 283 if field.field_info.extra: 284 f_schema.update(field.field_info.extra) 285 return f_schema 286 287 if lenient_issubclass(field.type_, (str, bytes)): 288 for attr_name, t, keyword in _str_types_attrs: 289 attr = getattr(field.field_info, attr_name, None) 290 if isinstance(attr, t): 291 f_schema[keyword] = attr 292 if lenient_issubclass(field.type_, numeric_types) and not issubclass(field.type_, bool): 293 for attr_name, t, keyword in _numeric_types_attrs: 294 attr = getattr(field.field_info, attr_name, None) 295 if isinstance(attr, t): 296 f_schema[keyword] = attr 297 if field.field_info is not None and field.field_info.const: 298 f_schema['const'] = field.default 299 if field.field_info.extra: 300 f_schema.update(field.field_info.extra) 301 modify_schema = getattr(field.outer_type_, '__modify_schema__', None) 302 if modify_schema: 303 modify_schema(f_schema) 304 return f_schema 305 306 307def get_model_name_map(unique_models: TypeModelSet) -> Dict[TypeModelOrEnum, str]: 308 """ 309 Process a set of models and generate unique names for them to be used as keys in the JSON Schema 310 definitions. By default the names are the same as the class name. But if two models in different Python 311 modules have the same name (e.g. "users.Model" and "items.Model"), the generated names will be 312 based on the Python module path for those conflicting models to prevent name collisions. 313 314 :param unique_models: a Python set of models 315 :return: dict mapping models to names 316 """ 317 name_model_map = {} 318 conflicting_names: Set[str] = set() 319 for model in unique_models: 320 model_name = normalize_name(model.__name__) 321 if model_name in conflicting_names: 322 model_name = get_long_model_name(model) 323 name_model_map[model_name] = model 324 elif model_name in name_model_map: 325 conflicting_names.add(model_name) 326 conflicting_model = name_model_map.pop(model_name) 327 name_model_map[get_long_model_name(conflicting_model)] = conflicting_model 328 name_model_map[get_long_model_name(model)] = model 329 else: 330 name_model_map[model_name] = model 331 return {v: k for k, v in name_model_map.items()} 332 333 334def get_flat_models_from_model(model: Type['BaseModel'], known_models: TypeModelSet = None) -> TypeModelSet: 335 """ 336 Take a single ``model`` and generate a set with itself and all the sub-models in the tree. I.e. if you pass 337 model ``Foo`` (subclass of Pydantic ``BaseModel``) as ``model``, and it has a field of type ``Bar`` (also 338 subclass of ``BaseModel``) and that model ``Bar`` has a field of type ``Baz`` (also subclass of ``BaseModel``), 339 the return value will be ``set([Foo, Bar, Baz])``. 340 341 :param model: a Pydantic ``BaseModel`` subclass 342 :param known_models: used to solve circular references 343 :return: a set with the initial model and all its sub-models 344 """ 345 known_models = known_models or set() 346 flat_models: TypeModelSet = set() 347 flat_models.add(model) 348 known_models |= flat_models 349 fields = cast(Sequence[ModelField], model.__fields__.values()) 350 flat_models |= get_flat_models_from_fields(fields, known_models=known_models) 351 return flat_models 352 353 354def get_flat_models_from_field(field: ModelField, known_models: TypeModelSet) -> TypeModelSet: 355 """ 356 Take a single Pydantic ``ModelField`` (from a model) that could have been declared as a sublcass of BaseModel 357 (so, it could be a submodel), and generate a set with its model and all the sub-models in the tree. 358 I.e. if you pass a field that was declared to be of type ``Foo`` (subclass of BaseModel) as ``field``, and that 359 model ``Foo`` has a field of type ``Bar`` (also subclass of ``BaseModel``) and that model ``Bar`` has a field of 360 type ``Baz`` (also subclass of ``BaseModel``), the return value will be ``set([Foo, Bar, Baz])``. 361 362 :param field: a Pydantic ``ModelField`` 363 :param known_models: used to solve circular references 364 :return: a set with the model used in the declaration for this field, if any, and all its sub-models 365 """ 366 from .dataclasses import dataclass, is_builtin_dataclass 367 from .main import BaseModel # noqa: F811 368 369 flat_models: TypeModelSet = set() 370 371 # Handle dataclass-based models 372 if is_builtin_dataclass(field.type_): 373 field.type_ = dataclass(field.type_) 374 field_type = field.type_ 375 if lenient_issubclass(getattr(field_type, '__pydantic_model__', None), BaseModel): 376 field_type = field_type.__pydantic_model__ 377 if field.sub_fields: 378 flat_models |= get_flat_models_from_fields(field.sub_fields, known_models=known_models) 379 elif lenient_issubclass(field_type, BaseModel) and field_type not in known_models: 380 flat_models |= get_flat_models_from_model(field_type, known_models=known_models) 381 elif lenient_issubclass(field_type, Enum): 382 flat_models.add(field_type) 383 return flat_models 384 385 386def get_flat_models_from_fields(fields: Sequence[ModelField], known_models: TypeModelSet) -> TypeModelSet: 387 """ 388 Take a list of Pydantic ``ModelField``s (from a model) that could have been declared as sublcasses of ``BaseModel`` 389 (so, any of them could be a submodel), and generate a set with their models and all the sub-models in the tree. 390 I.e. if you pass a the fields of a model ``Foo`` (subclass of ``BaseModel``) as ``fields``, and on of them has a 391 field of type ``Bar`` (also subclass of ``BaseModel``) and that model ``Bar`` has a field of type ``Baz`` (also 392 subclass of ``BaseModel``), the return value will be ``set([Foo, Bar, Baz])``. 393 394 :param fields: a list of Pydantic ``ModelField``s 395 :param known_models: used to solve circular references 396 :return: a set with any model declared in the fields, and all their sub-models 397 """ 398 flat_models: TypeModelSet = set() 399 for field in fields: 400 flat_models |= get_flat_models_from_field(field, known_models=known_models) 401 return flat_models 402 403 404def get_flat_models_from_models(models: Sequence[Type['BaseModel']]) -> TypeModelSet: 405 """ 406 Take a list of ``models`` and generate a set with them and all their sub-models in their trees. I.e. if you pass 407 a list of two models, ``Foo`` and ``Bar``, both subclasses of Pydantic ``BaseModel`` as models, and ``Bar`` has 408 a field of type ``Baz`` (also subclass of ``BaseModel``), the return value will be ``set([Foo, Bar, Baz])``. 409 """ 410 flat_models: TypeModelSet = set() 411 for model in models: 412 flat_models |= get_flat_models_from_model(model) 413 return flat_models 414 415 416def get_long_model_name(model: TypeModelOrEnum) -> str: 417 return f'{model.__module__}__{model.__qualname__}'.replace('.', '__') 418 419 420def field_type_schema( 421 field: ModelField, 422 *, 423 by_alias: bool, 424 model_name_map: Dict[TypeModelOrEnum, str], 425 ref_template: str, 426 schema_overrides: bool = False, 427 ref_prefix: Optional[str] = None, 428 known_models: TypeModelSet, 429) -> Tuple[Dict[str, Any], Dict[str, Any], Set[str]]: 430 """ 431 Used by ``field_schema()``, you probably should be using that function. 432 433 Take a single ``field`` and generate the schema for its type only, not including additional 434 information as title, etc. Also return additional schema definitions, from sub-models. 435 """ 436 definitions = {} 437 nested_models: Set[str] = set() 438 f_schema: Dict[str, Any] 439 if field.shape in {SHAPE_LIST, SHAPE_TUPLE_ELLIPSIS, SHAPE_SEQUENCE, SHAPE_SET, SHAPE_FROZENSET, SHAPE_ITERABLE}: 440 items_schema, f_definitions, f_nested_models = field_singleton_schema( 441 field, 442 by_alias=by_alias, 443 model_name_map=model_name_map, 444 ref_prefix=ref_prefix, 445 ref_template=ref_template, 446 known_models=known_models, 447 ) 448 definitions.update(f_definitions) 449 nested_models.update(f_nested_models) 450 f_schema = {'type': 'array', 'items': items_schema} 451 if field.shape in {SHAPE_SET, SHAPE_FROZENSET}: 452 f_schema['uniqueItems'] = True 453 454 elif field.shape in MAPPING_LIKE_SHAPES: 455 f_schema = {'type': 'object'} 456 key_field = cast(ModelField, field.key_field) 457 regex = getattr(key_field.type_, 'regex', None) 458 items_schema, f_definitions, f_nested_models = field_singleton_schema( 459 field, 460 by_alias=by_alias, 461 model_name_map=model_name_map, 462 ref_prefix=ref_prefix, 463 ref_template=ref_template, 464 known_models=known_models, 465 ) 466 definitions.update(f_definitions) 467 nested_models.update(f_nested_models) 468 if regex: 469 # Dict keys have a regex pattern 470 # items_schema might be a schema or empty dict, add it either way 471 f_schema['patternProperties'] = {regex.pattern: items_schema} 472 elif items_schema: 473 # The dict values are not simply Any, so they need a schema 474 f_schema['additionalProperties'] = items_schema 475 elif field.shape == SHAPE_TUPLE: 476 sub_schema = [] 477 sub_fields = cast(List[ModelField], field.sub_fields) 478 for sf in sub_fields: 479 sf_schema, sf_definitions, sf_nested_models = field_type_schema( 480 sf, 481 by_alias=by_alias, 482 model_name_map=model_name_map, 483 ref_prefix=ref_prefix, 484 ref_template=ref_template, 485 known_models=known_models, 486 ) 487 definitions.update(sf_definitions) 488 nested_models.update(sf_nested_models) 489 sub_schema.append(sf_schema) 490 if len(sub_schema) == 1: 491 sub_schema = sub_schema[0] # type: ignore 492 f_schema = {'type': 'array', 'items': sub_schema} 493 else: 494 assert field.shape in {SHAPE_SINGLETON, SHAPE_GENERIC}, field.shape 495 f_schema, f_definitions, f_nested_models = field_singleton_schema( 496 field, 497 by_alias=by_alias, 498 model_name_map=model_name_map, 499 schema_overrides=schema_overrides, 500 ref_prefix=ref_prefix, 501 ref_template=ref_template, 502 known_models=known_models, 503 ) 504 definitions.update(f_definitions) 505 nested_models.update(f_nested_models) 506 507 # check field type to avoid repeated calls to the same __modify_schema__ method 508 if field.type_ != field.outer_type_: 509 if field.shape == SHAPE_GENERIC: 510 field_type = field.type_ 511 else: 512 field_type = field.outer_type_ 513 modify_schema = getattr(field_type, '__modify_schema__', None) 514 if modify_schema: 515 modify_schema(f_schema) 516 return f_schema, definitions, nested_models 517 518 519def model_process_schema( 520 model: TypeModelOrEnum, 521 *, 522 by_alias: bool = True, 523 model_name_map: Dict[TypeModelOrEnum, str], 524 ref_prefix: Optional[str] = None, 525 ref_template: str = default_ref_template, 526 known_models: TypeModelSet = None, 527) -> Tuple[Dict[str, Any], Dict[str, Any], Set[str]]: 528 """ 529 Used by ``model_schema()``, you probably should be using that function. 530 531 Take a single ``model`` and generate its schema. Also return additional schema definitions, from sub-models. The 532 sub-models of the returned schema will be referenced, but their definitions will not be included in the schema. All 533 the definitions are returned as the second value. 534 """ 535 from inspect import getdoc, signature 536 537 known_models = known_models or set() 538 if lenient_issubclass(model, Enum): 539 model = cast(Type[Enum], model) 540 s = enum_process_schema(model) 541 return s, {}, set() 542 model = cast(Type['BaseModel'], model) 543 s = {'title': model.__config__.title or model.__name__} 544 doc = getdoc(model) 545 if doc: 546 s['description'] = doc 547 known_models.add(model) 548 m_schema, m_definitions, nested_models = model_type_schema( 549 model, 550 by_alias=by_alias, 551 model_name_map=model_name_map, 552 ref_prefix=ref_prefix, 553 ref_template=ref_template, 554 known_models=known_models, 555 ) 556 s.update(m_schema) 557 schema_extra = model.__config__.schema_extra 558 if callable(schema_extra): 559 if len(signature(schema_extra).parameters) == 1: 560 schema_extra(s) 561 else: 562 schema_extra(s, model) 563 else: 564 s.update(schema_extra) 565 return s, m_definitions, nested_models 566 567 568def model_type_schema( 569 model: Type['BaseModel'], 570 *, 571 by_alias: bool, 572 model_name_map: Dict[TypeModelOrEnum, str], 573 ref_template: str, 574 ref_prefix: Optional[str] = None, 575 known_models: TypeModelSet, 576) -> Tuple[Dict[str, Any], Dict[str, Any], Set[str]]: 577 """ 578 You probably should be using ``model_schema()``, this function is indirectly used by that function. 579 580 Take a single ``model`` and generate the schema for its type only, not including additional 581 information as title, etc. Also return additional schema definitions, from sub-models. 582 """ 583 properties = {} 584 required = [] 585 definitions: Dict[str, Any] = {} 586 nested_models: Set[str] = set() 587 for k, f in model.__fields__.items(): 588 try: 589 f_schema, f_definitions, f_nested_models = field_schema( 590 f, 591 by_alias=by_alias, 592 model_name_map=model_name_map, 593 ref_prefix=ref_prefix, 594 ref_template=ref_template, 595 known_models=known_models, 596 ) 597 except SkipField as skip: 598 warnings.warn(skip.message, UserWarning) 599 continue 600 definitions.update(f_definitions) 601 nested_models.update(f_nested_models) 602 if by_alias: 603 properties[f.alias] = f_schema 604 if f.required: 605 required.append(f.alias) 606 else: 607 properties[k] = f_schema 608 if f.required: 609 required.append(k) 610 if ROOT_KEY in properties: 611 out_schema = properties[ROOT_KEY] 612 out_schema['title'] = model.__config__.title or model.__name__ 613 else: 614 out_schema = {'type': 'object', 'properties': properties} 615 if required: 616 out_schema['required'] = required 617 if model.__config__.extra == 'forbid': 618 out_schema['additionalProperties'] = False 619 return out_schema, definitions, nested_models 620 621 622def enum_process_schema(enum: Type[Enum]) -> Dict[str, Any]: 623 """ 624 Take a single `enum` and generate its schema. 625 626 This is similar to the `model_process_schema` function, but applies to ``Enum`` objects. 627 """ 628 from inspect import getdoc 629 630 schema: Dict[str, Any] = { 631 'title': enum.__name__, 632 # Python assigns all enums a default docstring value of 'An enumeration', so 633 # all enums will have a description field even if not explicitly provided. 634 'description': getdoc(enum), 635 # Add enum values and the enum field type to the schema. 636 'enum': [item.value for item in cast(Iterable[Enum], enum)], 637 } 638 639 add_field_type_to_schema(enum, schema) 640 641 modify_schema = getattr(enum, '__modify_schema__', None) 642 if modify_schema: 643 modify_schema(schema) 644 645 return schema 646 647 648def field_singleton_sub_fields_schema( 649 sub_fields: Sequence[ModelField], 650 *, 651 by_alias: bool, 652 model_name_map: Dict[TypeModelOrEnum, str], 653 ref_template: str, 654 schema_overrides: bool = False, 655 ref_prefix: Optional[str] = None, 656 known_models: TypeModelSet, 657) -> Tuple[Dict[str, Any], Dict[str, Any], Set[str]]: 658 """ 659 This function is indirectly used by ``field_schema()``, you probably should be using that function. 660 661 Take a list of Pydantic ``ModelField`` from the declaration of a type with parameters, and generate their 662 schema. I.e., fields used as "type parameters", like ``str`` and ``int`` in ``Tuple[str, int]``. 663 """ 664 definitions = {} 665 nested_models: Set[str] = set() 666 if len(sub_fields) == 1: 667 return field_type_schema( 668 sub_fields[0], 669 by_alias=by_alias, 670 model_name_map=model_name_map, 671 schema_overrides=schema_overrides, 672 ref_prefix=ref_prefix, 673 ref_template=ref_template, 674 known_models=known_models, 675 ) 676 else: 677 sub_field_schemas = [] 678 for sf in sub_fields: 679 sub_schema, sub_definitions, sub_nested_models = field_type_schema( 680 sf, 681 by_alias=by_alias, 682 model_name_map=model_name_map, 683 schema_overrides=schema_overrides, 684 ref_prefix=ref_prefix, 685 ref_template=ref_template, 686 known_models=known_models, 687 ) 688 definitions.update(sub_definitions) 689 if schema_overrides and 'allOf' in sub_schema: 690 # if the sub_field is a referenced schema we only need the referenced 691 # object. Otherwise we will end up with several allOf inside anyOf. 692 # See https://github.com/samuelcolvin/pydantic/issues/1209 693 sub_schema = sub_schema['allOf'][0] 694 sub_field_schemas.append(sub_schema) 695 nested_models.update(sub_nested_models) 696 return {'anyOf': sub_field_schemas}, definitions, nested_models 697 698 699# Order is important, e.g. subclasses of str must go before str 700# this is used only for standard library types, custom types should use __modify_schema__ instead 701field_class_to_schema: Tuple[Tuple[Any, Dict[str, Any]], ...] = ( 702 (Path, {'type': 'string', 'format': 'path'}), 703 (datetime, {'type': 'string', 'format': 'date-time'}), 704 (date, {'type': 'string', 'format': 'date'}), 705 (time, {'type': 'string', 'format': 'time'}), 706 (timedelta, {'type': 'number', 'format': 'time-delta'}), 707 (IPv4Network, {'type': 'string', 'format': 'ipv4network'}), 708 (IPv6Network, {'type': 'string', 'format': 'ipv6network'}), 709 (IPv4Interface, {'type': 'string', 'format': 'ipv4interface'}), 710 (IPv6Interface, {'type': 'string', 'format': 'ipv6interface'}), 711 (IPv4Address, {'type': 'string', 'format': 'ipv4'}), 712 (IPv6Address, {'type': 'string', 'format': 'ipv6'}), 713 (Pattern, {'type': 'string', 'format': 'regex'}), 714 (str, {'type': 'string'}), 715 (bytes, {'type': 'string', 'format': 'binary'}), 716 (bool, {'type': 'boolean'}), 717 (int, {'type': 'integer'}), 718 (float, {'type': 'number'}), 719 (Decimal, {'type': 'number'}), 720 (UUID, {'type': 'string', 'format': 'uuid'}), 721 (dict, {'type': 'object'}), 722 (list, {'type': 'array', 'items': {}}), 723 (tuple, {'type': 'array', 'items': {}}), 724 (set, {'type': 'array', 'items': {}, 'uniqueItems': True}), 725 (frozenset, {'type': 'array', 'items': {}, 'uniqueItems': True}), 726) 727 728json_scheme = {'type': 'string', 'format': 'json-string'} 729 730 731def add_field_type_to_schema(field_type: Any, schema: Dict[str, Any]) -> None: 732 """ 733 Update the given `schema` with the type-specific metadata for the given `field_type`. 734 735 This function looks through `field_class_to_schema` for a class that matches the given `field_type`, 736 and then modifies the given `schema` with the information from that type. 737 """ 738 for type_, t_schema in field_class_to_schema: 739 # Fallback for `typing.Pattern` as it is not a valid class 740 if lenient_issubclass(field_type, type_) or field_type is type_ is Pattern: 741 schema.update(t_schema) 742 break 743 744 745def get_schema_ref(name: str, ref_prefix: Optional[str], ref_template: str, schema_overrides: bool) -> Dict[str, Any]: 746 if ref_prefix: 747 schema_ref = {'$ref': ref_prefix + name} 748 else: 749 schema_ref = {'$ref': ref_template.format(model=name)} 750 return {'allOf': [schema_ref]} if schema_overrides else schema_ref 751 752 753def field_singleton_schema( # noqa: C901 (ignore complexity) 754 field: ModelField, 755 *, 756 by_alias: bool, 757 model_name_map: Dict[TypeModelOrEnum, str], 758 ref_template: str, 759 schema_overrides: bool = False, 760 ref_prefix: Optional[str] = None, 761 known_models: TypeModelSet, 762) -> Tuple[Dict[str, Any], Dict[str, Any], Set[str]]: 763 """ 764 This function is indirectly used by ``field_schema()``, you should probably be using that function. 765 766 Take a single Pydantic ``ModelField``, and return its schema and any additional definitions from sub-models. 767 """ 768 from .main import BaseModel # noqa: F811 769 770 definitions: Dict[str, Any] = {} 771 nested_models: Set[str] = set() 772 if field.sub_fields: 773 return field_singleton_sub_fields_schema( 774 field.sub_fields, 775 by_alias=by_alias, 776 model_name_map=model_name_map, 777 schema_overrides=schema_overrides, 778 ref_prefix=ref_prefix, 779 ref_template=ref_template, 780 known_models=known_models, 781 ) 782 if field.type_ is Any or field.type_.__class__ == TypeVar: 783 return {}, definitions, nested_models # no restrictions 784 if field.type_ in NONE_TYPES: 785 return {'type': 'null'}, definitions, nested_models 786 if is_callable_type(field.type_): 787 raise SkipField(f'Callable {field.name} was excluded from schema since JSON schema has no equivalent type.') 788 f_schema: Dict[str, Any] = {} 789 if field.field_info is not None and field.field_info.const: 790 f_schema['const'] = field.default 791 792 field_type = field.type_ 793 if is_literal_type(field_type): 794 values = all_literal_values(field_type) 795 796 if len({v.__class__ for v in values}) > 1: 797 return field_schema( 798 multitypes_literal_field_for_schema(values, field), 799 by_alias=by_alias, 800 model_name_map=model_name_map, 801 ref_prefix=ref_prefix, 802 ref_template=ref_template, 803 known_models=known_models, 804 ) 805 806 # All values have the same type 807 field_type = values[0].__class__ 808 f_schema['enum'] = list(values) 809 add_field_type_to_schema(field_type, f_schema) 810 elif lenient_issubclass(field_type, Enum): 811 enum_name = model_name_map[field_type] 812 f_schema, schema_overrides = get_field_info_schema(field) 813 f_schema.update(get_schema_ref(enum_name, ref_prefix, ref_template, schema_overrides)) 814 definitions[enum_name] = enum_process_schema(field_type) 815 elif is_namedtuple(field_type): 816 sub_schema, *_ = model_process_schema( 817 field_type.__pydantic_model__, 818 by_alias=by_alias, 819 model_name_map=model_name_map, 820 ref_prefix=ref_prefix, 821 ref_template=ref_template, 822 known_models=known_models, 823 ) 824 f_schema.update({'type': 'array', 'items': list(sub_schema['properties'].values())}) 825 elif not hasattr(field_type, '__pydantic_model__'): 826 add_field_type_to_schema(field_type, f_schema) 827 828 modify_schema = getattr(field_type, '__modify_schema__', None) 829 if modify_schema: 830 modify_schema(f_schema) 831 832 if f_schema: 833 return f_schema, definitions, nested_models 834 835 # Handle dataclass-based models 836 if lenient_issubclass(getattr(field_type, '__pydantic_model__', None), BaseModel): 837 field_type = field_type.__pydantic_model__ 838 839 if issubclass(field_type, BaseModel): 840 model_name = model_name_map[field_type] 841 if field_type not in known_models: 842 sub_schema, sub_definitions, sub_nested_models = model_process_schema( 843 field_type, 844 by_alias=by_alias, 845 model_name_map=model_name_map, 846 ref_prefix=ref_prefix, 847 ref_template=ref_template, 848 known_models=known_models, 849 ) 850 definitions.update(sub_definitions) 851 definitions[model_name] = sub_schema 852 nested_models.update(sub_nested_models) 853 else: 854 nested_models.add(model_name) 855 schema_ref = get_schema_ref(model_name, ref_prefix, ref_template, schema_overrides) 856 return schema_ref, definitions, nested_models 857 858 # For generics with no args 859 args = get_args(field_type) 860 if args is not None and not args and Generic in field_type.__bases__: 861 return f_schema, definitions, nested_models 862 863 raise ValueError(f'Value not declarable with JSON Schema, field: {field}') 864 865 866def multitypes_literal_field_for_schema(values: Tuple[Any, ...], field: ModelField) -> ModelField: 867 """ 868 To support `Literal` with values of different types, we split it into multiple `Literal` with same type 869 e.g. `Literal['qwe', 'asd', 1, 2]` becomes `Union[Literal['qwe', 'asd'], Literal[1, 2]]` 870 """ 871 literal_distinct_types = defaultdict(list) 872 for v in values: 873 literal_distinct_types[v.__class__].append(v) 874 distinct_literals = (Literal[tuple(same_type_values)] for same_type_values in literal_distinct_types.values()) 875 876 return ModelField( 877 name=field.name, 878 type_=Union[tuple(distinct_literals)], # type: ignore 879 class_validators=field.class_validators, 880 model_config=field.model_config, 881 default=field.default, 882 required=field.required, 883 alias=field.alias, 884 field_info=field.field_info, 885 ) 886 887 888def encode_default(dft: Any) -> Any: 889 if isinstance(dft, (int, float, str)): 890 return dft 891 elif sequence_like(dft): 892 t = dft.__class__ 893 return t(encode_default(v) for v in dft) 894 elif isinstance(dft, dict): 895 return {encode_default(k): encode_default(v) for k, v in dft.items()} 896 elif dft is None: 897 return None 898 else: 899 return pydantic_encoder(dft) 900 901 902_map_types_constraint: Dict[Any, Callable[..., type]] = {int: conint, float: confloat, Decimal: condecimal} 903 904 905def get_annotation_from_field_info( 906 annotation: Any, field_info: FieldInfo, field_name: str, validate_assignment: bool = False 907) -> Type[Any]: 908 """ 909 Get an annotation with validation implemented for numbers and strings based on the field_info. 910 :param annotation: an annotation from a field specification, as ``str``, ``ConstrainedStr`` 911 :param field_info: an instance of FieldInfo, possibly with declarations for validations and JSON Schema 912 :param field_name: name of the field for use in error messages 913 :param validate_assignment: default False, flag for BaseModel Config value of validate_assignment 914 :return: the same ``annotation`` if unmodified or a new annotation with validation in place 915 """ 916 constraints = field_info.get_constraints() 917 918 used_constraints: Set[str] = set() 919 if constraints: 920 annotation, used_constraints = get_annotation_with_constraints(annotation, field_info) 921 922 if validate_assignment: 923 used_constraints.add('allow_mutation') 924 925 unused_constraints = constraints - used_constraints 926 if unused_constraints: 927 raise ValueError( 928 f'On field "{field_name}" the following field constraints are set but not enforced: ' 929 f'{", ".join(unused_constraints)}. ' 930 f'\nFor more details see https://pydantic-docs.helpmanual.io/usage/schema/#unenforced-field-constraints' 931 ) 932 933 return annotation 934 935 936def get_annotation_with_constraints(annotation: Any, field_info: FieldInfo) -> Tuple[Type[Any], Set[str]]: # noqa: C901 937 """ 938 Get an annotation with used constraints implemented for numbers and strings based on the field_info. 939 940 :param annotation: an annotation from a field specification, as ``str``, ``ConstrainedStr`` 941 :param field_info: an instance of FieldInfo, possibly with declarations for validations and JSON Schema 942 :return: the same ``annotation`` if unmodified or a new annotation along with the used constraints. 943 """ 944 used_constraints: Set[str] = set() 945 946 def go(type_: Any) -> Type[Any]: 947 if ( 948 is_literal_type(annotation) 949 or isinstance(type_, ForwardRef) 950 or lenient_issubclass(type_, (ConstrainedList, ConstrainedSet)) 951 ): 952 return type_ 953 origin = get_origin(type_) 954 if origin is not None: 955 args: Tuple[Any, ...] = get_args(type_) 956 if any(isinstance(a, ForwardRef) for a in args): 957 # forward refs cause infinite recursion below 958 return type_ 959 960 if origin is Annotated: 961 return go(args[0]) 962 if origin is Union: 963 return Union[tuple(go(a) for a in args)] # type: ignore 964 965 if issubclass(origin, List) and (field_info.min_items is not None or field_info.max_items is not None): 966 used_constraints.update({'min_items', 'max_items'}) 967 return conlist(go(args[0]), min_items=field_info.min_items, max_items=field_info.max_items) 968 969 if issubclass(origin, Set) and (field_info.min_items is not None or field_info.max_items is not None): 970 used_constraints.update({'min_items', 'max_items'}) 971 return conset(go(args[0]), min_items=field_info.min_items, max_items=field_info.max_items) 972 973 for t in (Tuple, List, Set, FrozenSet, Sequence): 974 if issubclass(origin, t): # type: ignore 975 return t[tuple(go(a) for a in args)] # type: ignore 976 977 if issubclass(origin, Dict): 978 return Dict[args[0], go(args[1])] # type: ignore 979 980 attrs: Optional[Tuple[str, ...]] = None 981 constraint_func: Optional[Callable[..., type]] = None 982 if isinstance(type_, type): 983 if issubclass(type_, (SecretStr, SecretBytes)): 984 attrs = ('max_length', 'min_length') 985 986 def constraint_func(**kwargs: Any) -> Type[Any]: 987 return type(type_.__name__, (type_,), kwargs) 988 989 elif issubclass(type_, str) and not issubclass(type_, (EmailStr, AnyUrl, ConstrainedStr)): 990 attrs = ('max_length', 'min_length', 'regex') 991 constraint_func = constr 992 elif issubclass(type_, bytes): 993 attrs = ('max_length', 'min_length', 'regex') 994 constraint_func = conbytes 995 elif issubclass(type_, numeric_types) and not issubclass( 996 type_, (ConstrainedInt, ConstrainedFloat, ConstrainedDecimal, ConstrainedList, ConstrainedSet, bool) 997 ): 998 # Is numeric type 999 attrs = ('gt', 'lt', 'ge', 'le', 'multiple_of') 1000 numeric_type = next(t for t in numeric_types if issubclass(type_, t)) # pragma: no branch 1001 constraint_func = _map_types_constraint[numeric_type] 1002 1003 if attrs: 1004 used_constraints.update(set(attrs)) 1005 kwargs = { 1006 attr_name: attr 1007 for attr_name, attr in ((attr_name, getattr(field_info, attr_name)) for attr_name in attrs) 1008 if attr is not None 1009 } 1010 if kwargs: 1011 constraint_func = cast(Callable[..., type], constraint_func) 1012 return constraint_func(**kwargs) 1013 return type_ 1014 1015 ans = go(annotation) 1016 1017 return ans, used_constraints 1018 1019 1020def normalize_name(name: str) -> str: 1021 """ 1022 Normalizes the given name. This can be applied to either a model *or* enum. 1023 """ 1024 return re.sub(r'[^a-zA-Z0-9.\-_]', '_', name) 1025 1026 1027class SkipField(Exception): 1028 """ 1029 Utility exception used to exclude fields from schema. 1030 """ 1031 1032 def __init__(self, message: str) -> None: 1033 self.message = message 1034