try:
    import collections.abc as collections_abc  # only works on python 3.3+
except ImportError:
    import collections as collections_abc

from fnmatch import fnmatch

from elasticsearch.exceptions import NotFoundError, RequestError
from six import iteritems, add_metaclass, string_types

from .field import Field
from .mapping import Mapping
from .utils import ObjectBase, merge, DOC_META_FIELDS, META_FIELDS
from .search import Search
from .connections import connections
from .exceptions import ValidationException, IllegalOperation
from .index import Index


class MetaField(object):
    """
    Container for the arguments of a mapping meta field declared on a
    document's ``Meta`` class.

    The stored ``args`` and ``kwargs`` are forwarded to ``Mapping.meta`` by
    :class:`DocumentOptions`.
    """
    def __init__(self, *args, **kwargs):
        self.args, self.kwargs = args, kwargs


class DocumentMeta(type):
    """
    Metaclass that attaches a :class:`DocumentOptions` instance to every
    class it creates, under the ``_doc_type`` attribute.
    """
    def __new__(cls, name, bases, attrs):
        # DocumentOptions filters attrs in place (it pops ``Meta`` and removes
        # all ``Field`` instances), so it has to run before the class is built
        attrs['_doc_type'] = DocumentOptions(name, bases, attrs)
        return super(DocumentMeta, cls).__new__(cls, name, bases, attrs)


class IndexMeta(DocumentMeta):
    """
    Metaclass that, on top of :class:`DocumentMeta`, associates an ``Index``
    with every class created after the first one.
    """
    # global flag to guard us from associating an Index with the base Document
    # class, only user defined subclasses should have an _index attr
    _document_initialized = False

    def __new__(cls, name, bases, attrs):
        new_cls = super(IndexMeta, cls).__new__(cls, name, bases, attrs)
        if cls._document_initialized:
            index_opts = attrs.pop('Index', None)
            index = cls.construct_index(index_opts, bases)
            new_cls._index = index
            index.document(new_cls)
        # the first class created with this metaclass flips the flag, every
        # later class gets an ``_index``
        cls._document_initialized = True
        return new_cls

    @classmethod
    def construct_index(cls, opts, bases):
        """
        Build the ``Index`` object for a new document class.

        :arg opts: the ``Index`` inner class of the document or ``None``
        :arg bases: base classes of the document being created

        If ``opts`` is ``None`` the ``_index`` of the first base class that
        has one is reused; if no base has one an ``Index`` with ``None`` as
        its name is returned. Otherwise a new ``Index`` is created from the
        ``name``, ``using``, ``settings``, ``aliases`` and ``analyzers``
        attributes of ``opts`` (all optional).
        """
        if opts is None:
            for b in bases:
                if hasattr(b, '_index'):
                    return b._index

            # Set None as Index name so it will set _all while making the query
            return Index(name=None)

        i = Index(
            getattr(opts, 'name', '*'),
            using=getattr(opts, 'using', 'default')
        )
        i.settings(**getattr(opts, 'settings', {}))
        i.aliases(**getattr(opts, 'aliases', {}))
        for a in getattr(opts, 'analyzers', ()):
            i.analyzer(a)
        return i


class DocumentOptions(object):
    """
    Collects the mapping of a document class from its ``Meta`` inner class,
    its declared fields and the mappings of its base classes.

    :arg name: name of the class being created (not used here)
    :arg bases: base classes of the class being created
    :arg attrs: class namespace; modified in place - ``Meta`` is popped and
        every ``Field`` instance is removed after being registered
    """
    def __init__(self, name, bases, attrs):
        meta = attrs.pop('Meta', None)

        # get doc_type name, if not defined use 'doc'
        doc_type = getattr(meta, 'doc_type', 'doc')

        # create the mapping instance
        self.mapping = getattr(meta, 'mapping', Mapping(doc_type))

        # register all declared fields into the mapping; iterate over a copy
        # because entries are deleted from attrs inside the loop
        for field_name, value in list(iteritems(attrs)):
            if isinstance(value, Field):
                self.mapping.field(field_name, value)
                del attrs[field_name]

        # add all the mappings for meta fields
        for meta_name in dir(meta):
            params = getattr(meta, meta_name, None)
            if isinstance(params, MetaField):
                self.mapping.meta(meta_name, *params.args, **params.kwargs)

        # document inheritance - include the fields from parents' mappings
        for b in bases:
            if hasattr(b, '_doc_type') and hasattr(b._doc_type, 'mapping'):
                self.mapping.update(b._doc_type.mapping, update_only=True)

    @property
    def name(self):
        """Name taken from the properties of the underlying mapping."""
        return self.mapping.properties.name


@add_metaclass(DocumentMeta)
class InnerDoc(ObjectBase):
    """
    Common class for inner documents like Object or Nested
    """
    @classmethod
    def from_es(cls, data, data_only=False):
        """
        Create an instance from ``data``.

        :arg data_only: if ``True``, ``data`` is wrapped as
            ``{'_source': data}`` before being passed on to the parent
            implementation
        """
        if data_only:
            data = {'_source': data}
        return super(InnerDoc, cls).from_es(data)


@add_metaclass(IndexMeta)
class Document(ObjectBase):
    """
    Model-like class for persisting documents in elasticsearch.
    """
    @classmethod
    def _matches(cls, hit):
        """
        Return ``True`` if ``hit`` belongs to this document class: its
        ``_index`` has to match the index name/pattern of the class (when one
        is set) and its ``_type`` has to equal the doc type name.
        """
        index_pattern = cls._index._name
        # A class without ``Index`` options gets an index with ``None`` as its
        # name; ``fnmatch`` raises ``TypeError`` for a ``None`` pattern, so
        # the index check is skipped in that case.
        if index_pattern is not None and \
                not fnmatch(hit.get('_index', ''), index_pattern):
            return False
        return cls._doc_type.name == hit.get('_type')

    @classmethod
    def _get_using(cls, using=None):
        """Return ``using`` or, if falsy, the alias stored on the index."""
        return using or cls._index._using

    @classmethod
    def _get_connection(cls, using=None):
        """Return the connection registered under the resolved alias."""
        return connections.get_connection(cls._get_using(using))

    @classmethod
    def _default_index(cls, index=None):
        """Return ``index`` or, if falsy, the name stored on the index."""
        return index or cls._index._name

    @classmethod
    def init(cls, index=None, using=None):
        """
        Create the index and populate the mappings in elasticsearch.

        :arg index: if given, a clone of the class' index with this name is
            saved instead of the index itself
        :arg using: connection alias passed on to ``Index.save``
        """
        i = cls._index
        if index:
            i = i.clone(name=index)
        i.save(using=using)

    def _get_index(self, index=None, required=True):
        """
        Resolve the index to write to.

        The first value that is not ``None`` out of ``index``,
        ``self.meta.index`` and the name of the class' index is used.

        :arg required: when ``True`` a missing index is an error
        :raises ValidationException: if no index could be resolved and
            ``required`` is set, or if the resolved index contains ``*``
        """
        if index is None:
            index = getattr(self.meta, 'index', None)
        if index is None:
            index = getattr(self._index, '_name', None)
        if index is None and required:
            raise ValidationException('No index')
        if index and '*' in index:
            raise ValidationException('You cannot write to a wildcard index.')
        return index

    def __repr__(self):
        return '%s(%s)' % (
            self.__class__.__name__,
            ', '.join('%s=%r' % (key, getattr(self.meta, key)) for key in
                      ('index', 'doc_type', 'id') if key in self.meta)
        )

    @classmethod
    def search(cls, using=None, index=None):
        """
        Create an :class:`~elasticsearch_dsl.Search` instance that will search
        over this ``Document``.
        """
        return Search(
            using=cls._get_using(using),
            index=cls._default_index(index),
            doc_type=[cls]
        )

    @classmethod
    def get(cls, id, using=None, index=None, **kwargs):
        """
        Retrieve a single document from elasticsearch using its ``id``.

        :arg id: ``id`` of the document to be retrieved
        :arg index: elasticsearch index to use, if the ``Document`` is
            associated with an index this can be omitted.
        :arg using: connection alias to use, defaults to ``'default'``

        Any additional keyword arguments will be passed to
        ``Elasticsearch.get`` unchanged.

        Returns ``None`` if the response does not report the document as
        found.
        """
        es = cls._get_connection(using)
        doc = es.get(
            index=cls._default_index(index),
            doc_type=cls._doc_type.name,
            id=id,
            **kwargs
        )
        if not doc.get('found', False):
            return None
        return cls.from_es(doc)

    @classmethod
    def mget(cls, docs, using=None, index=None, raise_on_error=True,
             missing='none', **kwargs):
        r"""
        Retrieve multiple document by their ``id``\s. Returns a list of instances
        in the same order as requested.

        :arg docs: list of ``id``\s of the documents to be retrieved or a list
            of document specifications as per
            https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-multi-get.html
        :arg index: elasticsearch index to use, if the ``Document`` is
            associated with an index this can be omitted.
        :arg using: connection alias to use, defaults to ``'default'``
        :arg raise_on_error: if ``True`` (default) a ``RequestError`` is
            raised when any of the returned documents carries an error
        :arg missing: what to do when one of the documents requested is not
            found. Valid options are ``'none'`` (use ``None``), ``'raise'`` (raise
            ``NotFoundError``) or ``'skip'`` (ignore the missing document).

        Any additional keyword arguments will be passed to
        ``Elasticsearch.mget`` unchanged.
        """
        if missing not in ('raise', 'skip', 'none'):
            raise ValueError("'missing' must be 'raise', 'skip', or 'none'.")
        es = cls._get_connection(using)
        body = {
            'docs': [
                doc if isinstance(doc, collections_abc.Mapping) else {'_id': doc}
                for doc in docs
            ]
        }
        results = es.mget(
            body,
            index=cls._default_index(index),
            doc_type=cls._doc_type.name,
            **kwargs
        )

        objs, error_docs, missing_docs = [], [], []
        for doc in results['docs']:
            if doc.get('found'):
                if error_docs or missing_docs:
                    # We're going to raise an exception anyway, so avoid an
                    # expensive call to cls.from_es().
                    continue

                objs.append(cls.from_es(doc))

            elif doc.get('error'):
                if raise_on_error:
                    error_docs.append(doc)
                if missing == 'none':
                    objs.append(None)

            # The doc didn't cause an error, but the doc also wasn't found.
            elif missing == 'raise':
                missing_docs.append(doc)
            elif missing == 'none':
                objs.append(None)

        if error_docs:
            error_ids = [doc['_id'] for doc in error_docs]
            message = 'Required routing not provided for documents %s.'
            message %= ', '.join(error_ids)
            raise RequestError(400, message, error_docs)
        if missing_docs:
            missing_ids = [doc['_id'] for doc in missing_docs]
            message = 'Documents %s not found.' % ', '.join(missing_ids)
            raise NotFoundError(404, message, {'docs': missing_docs})
        return objs

    def delete(self, using=None, index=None, **kwargs):
        """
        Delete the instance in elasticsearch.

        :arg index: elasticsearch index to use, if the ``Document`` is
            associated with an index this can be omitted.
        :arg using: connection alias to use, defaults to ``'default'``

        Any additional keyword arguments will be passed to
        ``Elasticsearch.delete`` unchanged.
        """
        es = self._get_connection(using)
        # extract routing etc from meta
        doc_meta = dict(
            (k, self.meta[k])
            for k in DOC_META_FIELDS
            if k in self.meta
        )
        # explicitly passed keyword arguments win over values from meta
        doc_meta.update(kwargs)
        es.delete(
            index=self._get_index(index),
            doc_type=self._doc_type.name,
            **doc_meta
        )

    def to_dict(self, include_meta=False, skip_empty=True):
        """
        Serialize the instance into a dictionary so that it can be saved in elasticsearch.

        :arg include_meta: if set to ``True`` will include all the metadata
            (``_index``, ``_type``, ``_id`` etc). Otherwise just the document's
            data is serialized. This is useful when passing multiple instances into
            ``elasticsearch.helpers.bulk``.
        :arg skip_empty: if set to ``False`` will cause empty values (``None``,
            ``[]``, ``{}``) to be left on the document. Those values will be
            stripped out otherwise as they make no difference in elasticsearch.
        """
        d = super(Document, self).to_dict(skip_empty=skip_empty)
        if not include_meta:
            return d

        meta = dict(
            ('_' + k, self.meta[k])
            for k in DOC_META_FIELDS
            if k in self.meta
        )

        # in case of to_dict include the index unlike save/update/delete
        index = self._get_index(required=False)
        if index is not None:
            meta['_index'] = index

        meta['_type'] = self._doc_type.name
        meta['_source'] = d
        return meta

    def update(self, using=None, index=None, detect_noop=True,
               doc_as_upsert=False, refresh=False, retry_on_conflict=None,
               script=None, script_id=None, scripted_upsert=False, upsert=None,
               **fields):
        """
        Partial update of the document, specify fields you wish to update and
        both the instance and the document in elasticsearch will be updated::

            doc = MyDocument(title='Document Title!')
            doc.save()
            doc.update(title='New Document Title!')

        :arg index: elasticsearch index to use, if the ``Document`` is
            associated with an index this can be omitted.
        :arg using: connection alias to use, defaults to ``'default'``
        :arg detect_noop: Set to ``False`` to disable noop detection.
        :arg refresh: Control when the changes made by this request are visible
            to search. Set to ``True`` for immediate effect.
        :arg retry_on_conflict: In between the get and indexing phases of the
            update, it is possible that another process might have already
            updated the same document. By default, the update will fail with a
            version conflict exception. The retry_on_conflict parameter
            controls how many times to retry the update before finally throwing
            an exception.
        :arg doc_as_upsert:  Instead of sending a partial doc plus an upsert
            doc, setting doc_as_upsert to true will use the contents of doc as
            the upsert value
        :arg script: source of a script to run, sent as ``{'source': script}``;
            takes precedence over ``script_id`` when both are given
        :arg script_id: id of a script to run, sent as ``{'id': script_id}``
        :arg scripted_upsert: sent as ``scripted_upsert`` in the request body
            of a scripted update
        :arg upsert: sent as ``upsert`` in the request body of a scripted
            update when not ``None``

        Any additional keyword arguments are the fields to update; for a
        scripted update they are sent as the ``params`` of the script instead
        and the local instance is not modified.

        :raises IllegalOperation: if neither a script nor any field is given
        """
        body = {
            'doc_as_upsert': doc_as_upsert,
            'detect_noop': detect_noop,
        }

        # scripted update
        if script or script_id:
            if upsert is not None:
                body['upsert'] = upsert

            if script:
                script = {'source': script}
            else:
                script = {'id': script_id}

            script['params'] = fields

            body['script'] = script
            body['scripted_upsert'] = scripted_upsert

        # partial document update
        else:
            if not fields:
                raise IllegalOperation('You cannot call update() without updating individual fields or a script. '
                                       'If you wish to update the entire object use save().')

            # update given fields locally
            merge(self, fields)

            # prepare data for ES
            values = self.to_dict()

            # if fields were given: partial update
            body['doc'] = dict(
                (k, values.get(k))
                for k in fields.keys()
            )

        # extract routing etc from meta
        doc_meta = dict(
            (k, self.meta[k])
            for k in DOC_META_FIELDS
            if k in self.meta
        )

        if retry_on_conflict is not None:
            doc_meta['retry_on_conflict'] = retry_on_conflict

        meta = self._get_connection(using).update(
            index=self._get_index(index),
            doc_type=self._doc_type.name,
            body=body,
            refresh=refresh,
            **doc_meta
        )
        # update meta information from ES
        for k in META_FIELDS:
            if '_' + k in meta:
                setattr(self.meta, k, meta['_' + k])

    def save(self, using=None, index=None, validate=True, skip_empty=True, **kwargs):
        """
        Save the document into elasticsearch. If the document doesn't exist it
        is created, it is overwritten otherwise. Returns ``True`` if this
        operation resulted in new document being created.

        :arg index: elasticsearch index to use, if the ``Document`` is
            associated with an index this can be omitted.
        :arg using: connection alias to use, defaults to ``'default'``
        :arg validate: set to ``False`` to skip validating the document
        :arg skip_empty: if set to ``False`` will cause empty values (``None``,
            ``[]``, ``{}``) to be left on the document. Those values will be
            stripped out otherwise as they make no difference in elasticsearch.

        Any additional keyword arguments will be passed to
        ``Elasticsearch.index`` unchanged.
        """
        if validate:
            self.full_clean()

        es = self._get_connection(using)
        # extract routing etc from meta
        doc_meta = dict(
            (k, self.meta[k])
            for k in DOC_META_FIELDS
            if k in self.meta
        )
        # explicitly passed keyword arguments win over values from meta
        doc_meta.update(kwargs)
        meta = es.index(
            index=self._get_index(index),
            doc_type=self._doc_type.name,
            body=self.to_dict(skip_empty=skip_empty),
            **doc_meta
        )
        # update meta information from ES
        for k in META_FIELDS:
            if '_' + k in meta:
                setattr(self.meta, k, meta['_' + k])

        # return True/False if the document has been created/updated
        return meta['result'] == 'created'


# limited backwards compatibility, to be removed in 7.0.0
DocType = Document