#!/usr/local/bin/python3.8
# vim:fileencoding=utf-8
# License: GPLv3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>


import json
import re
from collections import defaultdict, namedtuple
from contextlib import suppress
from operator import attrgetter
from functools import wraps
from lxml import etree

from calibre import prints
from calibre.ebooks.metadata import authors_to_string, check_isbn, string_to_authors
from calibre.ebooks.metadata.book.base import Metadata
from calibre.ebooks.metadata.book.json_codec import (
    decode_is_multiple, encode_is_multiple, object_to_unicode
)
from calibre.ebooks.metadata.utils import (
    create_manifest_item, ensure_unique, normalize_languages, parse_opf,
    pretty_print_opf
)
from calibre.ebooks.oeb.base import DC, OPF, OPF2_NSMAP
from calibre.utils.config import from_json, to_json
from calibre.utils.date import (
    fix_only_date, is_date_undefined, isoformat, parse_date as parse_date_, utcnow,
    w3cdtf
)
from calibre.utils.iso8601 import parse_iso8601
from calibre.utils.localization import canonicalize_lang
from polyglot.builtins import iteritems

# Utils {{{
_xpath_cache = {}
_re_cache = {}


def uniq(vals):
    ''' Return the items of vals as a list with duplicates removed, keeping
    the order of first appearance. A falsy vals gives an empty list. '''
    vals = vals or ()
    seen = set()
    seen_add = seen.add
    # seen_add() returns None, so `not seen_add(x)` records x and is always True
    return [x for x in vals if x not in seen and not seen_add(x)]


def dump_dict(cats):
    ''' Serialize cats (an empty dict when cats is falsy) to a JSON string,
    without escaping non-ASCII characters. '''
    return json.dumps(object_to_unicode(cats or {}), ensure_ascii=False, skipkeys=True)


def XPath(x):
    ''' Return the compiled XPath for the expression x, bound to the OPF 2
    namespace map. Compiled expressions are cached per expression string. '''
    try:
        return _xpath_cache[x]
    except KeyError:
        _xpath_cache[x] = ans = etree.XPath(x, namespaces=OPF2_NSMAP)
        return ans


def regex(r, flags=0):
    ''' Return the compiled regular expression for (r, flags), cached. '''
    try:
        return _re_cache[(r, flags)]
    except KeyError:
        _re_cache[(r, flags)] = ans = re.compile(r, flags)
        return ans


def remove_refines(e, refines):
    ''' Remove from the tree every element recorded in refines for the id of
    e, then drop that id from refines. '''
    for x in refines[e.get('id')]:
        x.getparent().remove(x)
    refines.pop(e.get('id'), None)


def remove_element(e, refines):
    ''' Remove e and all the elements that refine it from the tree. '''
    remove_refines(e, refines)
    e.getparent().remove(e)


def properties_for_id(item_id, refines):
    ''' Return a dict mapping property name to stripped text for the refining
    elements of item_id. Elements without a property or without text are
    ignored; a later element overrides an earlier one with the same property. '''
    ans = {}
    if item_id:
        for elem in refines[item_id]:
            key = elem.get('property')
            if key:
                val = (elem.text or '').strip()
                if val:
                    ans[key] = val
    return ans


def properties_for_id_with_scheme(item_id, prefixes, refines):
    ''' Return a mapping of property name to a list of
    (scheme_namespace, scheme, value) tuples for the refining elements of
    item_id.

    When the scheme attribute has the form prefix:name and prefix is present
    in prefixes, scheme_namespace is the expansion of the prefix and scheme is
    name. Otherwise scheme_namespace is None and scheme is the raw attribute
    value (None when absent or empty). '''
    ans = defaultdict(list)
    if item_id:
        for elem in refines[item_id]:
            key = elem.get('property')
            if key:
                val = (elem.text or '').strip()
                if val:
                    scheme = elem.get('scheme') or None
                    scheme_ns = None
                    if scheme is not None:
                        p, r = scheme.partition(':')[::2]
                        if p and r:
                            ns = prefixes.get(p)
                            if ns:
                                scheme_ns = ns
                                scheme = r
                    ans[key].append((scheme_ns, scheme, val))
    return ans


def getroot(elem):
    ''' Return the topmost ancestor of elem (elem itself if it has no parent). '''
    while True:
        q = elem.getparent()
        if q is None:
            return elem
        elem = q


def ensure_id(elem):
    ''' Return the id of elem, first assigning one that is not in the set of
    ids found by the //*/@id query if elem has none. '''
    root = getroot(elem)
    eid = elem.get('id')
    if not eid:
        eid = ensure_unique('id', frozenset(XPath('//*/@id')(root)))
        elem.set('id', eid)
    return eid


def normalize_whitespace(text):
    ''' Collapse every run of whitespace in text to a single space and strip
    the ends. Falsy input (None, '') is returned unchanged. '''
    if not text:
        return text
    return re.sub(r'\s+', ' ', text).strip()


def simple_text(f):
    ''' Decorator that passes the return value of f through
    normalize_whitespace(). '''
    @wraps(f)
    def wrapper(*args, **kw):
        return normalize_whitespace(f(*args, **kw))
    return wrapper


def items_with_property(root, q, prefixes=None):
    ''' Yield the manifest items whose properties attribute contains the
    property q. The comparison is done on lower-cased, prefix-expanded names;
    prefixes is read from root when not supplied. '''
    if prefixes is None:
        prefixes = read_prefixes(root)
    q = expand_prefix(q, known_prefixes).lower()
    for item in XPath("./opf:manifest/opf:item[@properties]")(root):
        for prop in (item.get('properties') or '').lower().split():
            prop = expand_prefix(prop, prefixes)
            if prop == q:
                yield item
                break

# }}}

# Prefixes {{{

# http://www.idpf.org/epub/vocab/package/pfx/


reserved_prefixes = {
    'dcterms':  'http://purl.org/dc/terms/',
    'epubsc':   'http://idpf.org/epub/vocab/sc/#',
    'marc':     'http://id.loc.gov/vocabulary/',
    'media':    'http://www.idpf.org/epub/vocab/overlays/#',
    'onix':     'http://www.editeur.org/ONIX/book/codelists/current.html#',
    'rendition':'http://www.idpf.org/vocab/rendition/#',
    'schema':   'http://schema.org/',
    'xsd':      'http://www.w3.org/2001/XMLSchema#',
}

CALIBRE_PREFIX = 'https://calibre-ebook.com'
known_prefixes = reserved_prefixes.copy()
known_prefixes['calibre'] = CALIBRE_PREFIX


def parse_prefixes(x):
    ''' Parse the text of a prefix attribute ("name: url name2: url2 ...")
    into a dict mapping name to url. '''
    return {m.group(1):m.group(2) for m in re.finditer(r'(\S+): \s*(\S+)', x)}


def read_prefixes(root):
    ''' Return the reserved prefixes updated with those declared in the
    prefix attribute of root. '''
    ans = reserved_prefixes.copy()
    ans.update(parse_prefixes(root.get('prefix') or ''))
    return ans


def expand_prefix(raw, prefixes):
    ''' Replace the prefix of every prefix:name token in raw with its value
    from prefixes; unknown prefixes are left as they are. A falsy raw is
    treated as the empty string. '''
    return regex(r'(\S+)\s*:\s*(\S+)').sub(lambda m:(prefixes.get(m.group(1), m.group(1)) + ':' + m.group(2)), raw or '')


def ensure_prefix(root, prefixes, prefix, value=None):
    ''' Record prefix in prefixes (using value, or the reserved value when
    value is falsy) and rewrite the prefix attribute of root.

    Only prefixes that differ from the reserved ones are written; the
    attribute is removed when nothing remains to declare. prefixes is read
    from root when it is None. Raises KeyError if value is falsy and prefix
    is not a reserved prefix. '''
    if prefixes is None:
        prefixes = read_prefixes(root)
    prefixes[prefix] = value or reserved_prefixes[prefix]
    prefixes = {k:v for k, v in iteritems(prefixes) if reserved_prefixes.get(k) != v}
    if prefixes:
        root.set('prefix', ' '.join('%s: %s' % (k, v) for k, v in iteritems(prefixes)))
    else:
        root.attrib.pop('prefix', None)

# }}}

# Refines {{{


def read_refines(root):
    ''' Return a defaultdict(list) mapping element id to the meta elements
    whose refines attribute is "#" followed by that id. '''
    ans = defaultdict(list)
    for meta in XPath('./opf:metadata/opf:meta[@refines]')(root):
        r = meta.get('refines') or ''
        if r.startswith('#'):
            ans[r[1:]].append(meta)
    return ans


def refdef(prop, val, scheme=None):
    ''' Build the (property, value, scheme) tuple consumed by set_refines(). '''
    return (prop, val, scheme)


def set_refines(elem, existing_refines, *new_refines):
    ''' Replace the refinements of elem with new_refines, a sequence of
    refdef() tuples. The new meta elements are inserted directly after elem,
    in the order given. '''
    eid = ensure_id(elem)
    remove_refines(elem, existing_refines)
    # Each element is inserted immediately after elem, so iterate in reverse
    # to end up with the refines in the order they were passed in
    for ref in reversed(new_refines):
        prop, val, scheme = ref
        r = elem.makeelement(OPF('meta'))
        r.set('refines', '#' + eid)
        r.set('property', prop)
        r.text = val.strip()
        if scheme:
            r.set('scheme', scheme)
        p = elem.getparent()
        p.insert(p.index(elem)+1, r)
# }}}

# Identifiers {{{


def parse_identifier(ident, val, refines):
    ''' Work out the (scheme, value) pair of the dc:identifier element ident
    whose stripped text is val.

    Returns (None, None) when no scheme can be determined, when the scheme is
    http/https or when the value fails ISBN validation for an isbn scheme.
    The refines parameter is accepted for interface compatibility but is not
    consulted. '''
    lval = val.lower()

    def finalize(scheme, val):
        if not scheme or not val:
            return None, None
        scheme = scheme.lower()
        if scheme in ('http', 'https'):
            return None, None
        if scheme.startswith('isbn'):
            scheme = 'isbn'
        if scheme == 'isbn':
            val = val.split(':')[-1]
            val = check_isbn(val)
            if val is None:
                return None, None
        return scheme, val

    # Try the OPF 2 style opf:scheme attribute, which will be present, for
    # example, in EPUB 3 files that have had their metadata set by an
    # application that only understands EPUB 2.
    scheme = ident.get(OPF('scheme'))
    if scheme and not lval.startswith('urn:'):
        return finalize(scheme, val)

    # Technically, we should be looking for refines that define the scheme, but
    # the IDioticPF created such a bad spec that they got their own
    # examples wrong, so I cannot be bothered doing this.
    # http://www.idpf.org/epub/301/spec/epub-publications-errata/

    # Parse the value for the scheme
    if lval.startswith('urn:'):
        val = val[4:]

    prefix, rest = val.partition(':')[::2]
    return finalize(prefix, rest)


def read_identifiers(root, prefixes, refines):
    ''' Return a defaultdict(list) mapping scheme to the list of values of
    all parseable dc:identifier elements. '''
    ans = defaultdict(list)
    for ident in XPath('./opf:metadata/dc:identifier')(root):
        val = (ident.text or '').strip()
        if val:
            scheme, val = parse_identifier(ident, val, refines)
            if scheme and val:
                ans[scheme].append(val)
    return ans


def set_identifiers(root, prefixes, refines, new_identifiers, force_identifiers=False):
    ''' Write new_identifiers (a scheme -> value mapping) as dc:identifier
    elements.

    The identifier referenced by the unique-identifier attribute of root is
    never touched. Other existing identifiers are removed when they are
    empty, unparseable, have a scheme present in new_identifiers, or when
    force_identifiers is True. New elements go before the package
    identifier when there is one, otherwise at the end of the metadata. '''
    uid = root.get('unique-identifier')
    package_identifier = None
    for ident in XPath('./opf:metadata/dc:identifier')(root):
        if uid is not None and uid == ident.get('id'):
            package_identifier = ident
            continue
        val = (ident.text or '').strip()
        if not val:
            ident.getparent().remove(ident)
            continue
        scheme, val = parse_identifier(ident, val, refines)
        if not scheme or not val or force_identifiers or scheme in new_identifiers:
            remove_element(ident, refines)
            continue
    metadata = XPath('./opf:metadata')(root)[0]
    for scheme, val in iteritems(new_identifiers):
        ident = metadata.makeelement(DC('identifier'))
        ident.text = '%s:%s' % (scheme, val)
        if package_identifier is None:
            metadata.append(ident)
        else:
            p = package_identifier.getparent()
            p.insert(p.index(package_identifier), ident)


def identifier_writer(name):
    ''' Return a function writer(root, prefixes, refines, ival=None) that
    removes the existing identifiers of the scheme name (except the package
    identifier) and, when ival is truthy, adds a new "name:ival" one. '''
    def writer(root, prefixes, refines, ival=None):
        uid = root.get('unique-identifier')
        package_identifier = None
        for ident in XPath('./opf:metadata/dc:identifier')(root):
            is_package_id = uid is not None and uid == ident.get('id')
            if is_package_id:
                package_identifier = ident
            val = (ident.text or '').strip()
            if (val.startswith(name + ':') or ident.get(OPF('scheme')) == name) and not is_package_id:
                remove_element(ident, refines)
        metadata = XPath('./opf:metadata')(root)[0]
        if ival:
            ident = metadata.makeelement(DC('identifier'))
            ident.text = '%s:%s' % (name, ival)
            if package_identifier is None:
                metadata.append(ident)
            else:
                p = package_identifier.getparent()
                p.insert(p.index(package_identifier), ident)
    return writer


set_application_id = identifier_writer('calibre')
set_uuid = identifier_writer('uuid')

# }}}

# Title {{{


def find_main_title(root, refines, remove_blanks=False):
    ''' Return the dc:title element refined with title-type "main", falling
    back to the first non-blank dc:title, or None when there is none. Blank
    titles encountered on the way are removed when remove_blanks is True. '''
    first_title = main_title = None
    for title in XPath('./opf:metadata/dc:title')(root):
        if not title.text or not title.text.strip():
            if remove_blanks:
                remove_element(title, refines)
            continue
        if first_title is None:
            first_title = title
        props = properties_for_id(title.get('id'), refines)
        if props.get('title-type') == 'main':
            main_title = title
            break
    else:
        # No title was explicitly marked as the main one
        main_title = first_title
    return main_title


def find_subtitle(root, refines):
    ''' Return the first non-blank dc:title whose title-type refinement
    contains "subtitle" or "sub-title", or None. '''
    for title in XPath('./opf:metadata/dc:title')(root):
        if not title.text or not title.text.strip():
            continue
        props = properties_for_id(title.get('id'), refines)
        q = props.get('title-type') or ''
        if 'subtitle' in q or 'sub-title' in q:
            return title


@simple_text
def read_title(root, prefixes, refines):
    ''' Return the main title, with ": subtitle" appended when a distinct
    subtitle element exists, or None when there is no title. '''
    main_title = find_main_title(root, refines)
    if main_title is None:
        return None
    ans = main_title.text.strip()
    st = find_subtitle(root, refines)
    if st is not None and st is not main_title:
        ans += ': ' + st.text.strip()
    return ans


@simple_text
def read_title_sort(root, prefixes, refines):
    ''' Return the file-as refinement of the main title, falling back to the
    OPF 2 style calibre:title_sort meta element. None when neither exists. '''
    main_title = find_main_title(root, refines)
    if main_title is not None:
        fa = properties_for_id(main_title.get('id'), refines).get('file-as')
        if fa:
            return fa
    # Look for OPF 2.0 style title_sort
    for m in XPath('./opf:metadata/opf:meta[@name="calibre:title_sort"]')(root):
        ans = m.get('content')
        if ans:
            return ans


def set_title(root, prefixes, refines, title, title_sort=None):
    ''' Set the text of the main title element (creating it as the first
    child of the metadata if needed) and refine it with title-type "main"
    and, when title_sort is truthy, file-as. Blank titles, any subtitle
    element and OPF 2 style calibre:title_sort elements are removed. '''
    main_title = find_main_title(root, refines, remove_blanks=True)
    st = find_subtitle(root, refines)
    if st is not None:
        remove_element(st, refines)
    if main_title is None:
        m = XPath('./opf:metadata')(root)[0]
        main_title = m.makeelement(DC('title'))
        m.insert(0, main_title)
    main_title.text = title or None
    ts = [refdef('file-as', title_sort)] if title_sort else ()
    set_refines(main_title, refines, refdef('title-type', 'main'), *ts)
    for m in XPath('./opf:metadata/opf:meta[@name="calibre:title_sort"]')(root):
        remove_element(m, refines)

# }}}

# Languages {{{


def read_languages(root, prefixes, refines):
    ''' Return the list of distinct canonicalized language codes from the
    dc:language elements, skipping empty values and "und". '''
    ans = []
    for lang in XPath('./opf:metadata/dc:language')(root):
        val = canonicalize_lang((lang.text or '').strip())
        if val and val not in ans and val != 'und':
            ans.append(val)
    return uniq(ans)


def set_languages(root, prefixes, refines, languages):
    ''' Replace all dc:language elements with languages, after passing them
    together with the previous values through normalize_languages(). A single
    "und" element is written when no language remains. '''
    opf_languages = []
    for lang in XPath('./opf:metadata/dc:language')(root):
        remove_element(lang, refines)
        val = (lang.text or '').strip()
        if val:
            opf_languages.append(val)
    languages = [x for x in normalize_languages(opf_languages, languages) if x and x != 'und']
    if not languages:
        # EPUB spec says dc:language is required
        languages = ['und']
    metadata = XPath('./opf:metadata')(root)[0]
    for lang in uniq(languages):
        elem = metadata.makeelement(DC('language'))
        elem.text = lang
        metadata.append(elem)
# }}}

# Creator/Contributor {{{


Author = namedtuple('Author', 'name sort seq', defaults=(0,))


def is_relators_role(props, q):
    ''' Return True if props (as built by properties_for_id_with_scheme())
    contains a role refinement equal to q (compared lower-cased) that either
    has no scheme namespace or uses the marc:relators scheme.

    Returns False when props has no role refinement at all. '''
    # `or ()` so that a creator without any role refinement does not raise
    for role in props.get('role') or ():
        if role:
            scheme_ns, scheme, role = role
            if role.lower() == q and (scheme_ns is None or (scheme_ns, scheme) == (reserved_prefixes['marc'], 'relators')):
                return True
    return False


def read_authors(root, prefixes, refines):
    ''' Return a list of Author tuples read from the dc:creator elements,
    without duplicates and sorted by their display-seq.

    Creators with an explicit "aut" role are preferred, then creators with
    no role at all, and finally creators refined with the "edt" role. '''
    roled_authors, unroled_authors = [], []
    editors_map = {}

    def author(item, props, val):
        aus = None
        file_as = props.get('file-as')
        if file_as:
            aus = file_as[0][-1]
        else:
            aus = item.get(OPF('file-as')) or None
        seq = 0
        ds = props.get('display-seq')
        # A missing or non-numeric display-seq leaves seq at 0
        with suppress(Exception):
            seq = int(ds[0][-1])
        return Author(normalize_whitespace(val), normalize_whitespace(aus), seq)

    for item in XPath('./opf:metadata/dc:creator')(root):
        val = (item.text or '').strip()
        if val:
            props = properties_for_id_with_scheme(item.get('id'), prefixes, refines)
            role = props.get('role')
            opf_role = item.get(OPF('role'))
            if role:
                if is_relators_role(props, 'aut'):
                    roled_authors.append(author(item, props, val))
                if is_relators_role(props, 'edt'):
                    # See https://bugs.launchpad.net/calibre/+bug/1950579
                    a = author(item, props, val)
                    editors_map[a.name] = a
            elif opf_role:
                if opf_role.lower() == 'aut':
                    roled_authors.append(author(item, props, val))
            else:
                unroled_authors.append(author(item, props, val))

    if roled_authors or unroled_authors:
        ans = uniq(roled_authors or unroled_authors)
    else:
        ans = uniq(editors_map.values())
    ans.sort(key=attrgetter('seq'))
    return ans


def set_authors(root, prefixes, refines, authors):
    ''' Replace the author dc:creator elements with authors, a sequence of
    Author tuples. Entries with a falsy name are skipped.

    The elements removed are the creators that count as authors ("aut" role
    or no role); only when there are none are the creators with the "edt"
    role removed instead. '''
    ensure_prefix(root, prefixes, 'marc')
    removals = []
    for role in ('aut', 'edt'):
        for item in XPath('./opf:metadata/dc:creator')(root):
            props = properties_for_id_with_scheme(item.get('id'), prefixes, refines)
            opf_role = item.get(OPF('role'))
            if (opf_role and opf_role.lower() != role) or (props.get('role') and not is_relators_role(props, role)):
                continue
            removals.append(item)
        if removals:
            break
    for item in removals:
        remove_element(item, refines)
    metadata = XPath('./opf:metadata')(root)[0]
    for author in authors:
        if author.name:
            a = metadata.makeelement(DC('creator'))
            aid = ensure_id(a)
            a.text = author.name
            metadata.append(a)
            m = metadata.makeelement(OPF('meta'), attrib={'refines':'#'+aid, 'property':'role', 'scheme':'marc:relators'})
            m.text = 'aut'
            metadata.append(m)
            if author.sort:
                m = metadata.makeelement(OPF('meta'), attrib={'refines':'#'+aid, 'property':'file-as'})
                m.text = author.sort
                metadata.append(m)


def read_book_producers(root, prefixes, refines):
    ''' Return the whitespace-normalized names of the dc:contributor elements
    that have the "bkp" role, either as a refinement or as an OPF 2 style
    opf:role attribute. '''
    ans = []
    for item in XPath('./opf:metadata/dc:contributor')(root):
        val = (item.text or '').strip()
        if val:
            props = properties_for_id_with_scheme(item.get('id'), prefixes, refines)
            role = props.get('role')
            opf_role = item.get(OPF('role'))
            if role:
                if is_relators_role(props, 'bkp'):
                    ans.append(normalize_whitespace(val))
            elif opf_role and opf_role.lower() == 'bkp':
                ans.append(normalize_whitespace(val))
    return ans


def set_book_producers(root, prefixes, refines, producers):
    ''' Replace the dc:contributor elements that are book producers (or have
    no role at all) with one element per truthy entry of producers, each
    refined with the marc:relators "bkp" role. '''
    for item in XPath('./opf:metadata/dc:contributor')(root):
        props = properties_for_id_with_scheme(item.get('id'), prefixes, refines)
        opf_role = item.get(OPF('role'))
        if (opf_role and opf_role.lower() != 'bkp') or (props.get('role') and not is_relators_role(props, 'bkp')):
            continue
        remove_element(item, refines)
    metadata = XPath('./opf:metadata')(root)[0]
    for bkp in producers:
        if bkp:
            a = metadata.makeelement(DC('contributor'))
            aid = ensure_id(a)
            a.text = bkp
            metadata.append(a)
            m = metadata.makeelement(OPF('meta'), attrib={'refines':'#'+aid, 'property':'role', 'scheme':'marc:relators'})
            m.text = 'bkp'
            metadata.append(m)
# }}}

# Dates {{{


def parse_date(raw, is_w3cdtf=False):
    ''' Parse the date string raw, assuming UTC. ISO 8601 parsing is used
    when is_w3cdtf is True, the general calibre date parser otherwise.
    Values that carry no time component are passed through fix_only_date().
    Exceptions raised by the underlying parsers propagate to the caller. '''
    raw = raw.strip()
    if is_w3cdtf:
        ans = parse_iso8601(raw, assume_utc=True)
        if 'T' not in raw and ' ' not in raw:
            ans = fix_only_date(ans)
    else:
        ans = parse_date_(raw, assume_utc=True)
        if ' ' not in raw and 'T' not in raw and (ans.hour, ans.minute, ans.second) == (0, 0, 0):
            ans = fix_only_date(ans)
    return ans


def read_pubdate(root, prefixes, refines):
    ''' Return the first dc:date value that can be parsed, or None. '''
    for date in XPath('./opf:metadata/dc:date')(root):
        val = (date.text or '').strip()
        if val:
            try:
                return parse_date(val)
            except Exception:
                continue


def set_pubdate(root, prefixes, refines, val):
    ''' Remove all dc:date elements and, unless val is an undefined date,
    add a single one holding val in ISO format. '''
    for date in XPath('./opf:metadata/dc:date')(root):
        remove_element(date, refines)
    if not is_date_undefined(val):
        val = isoformat(val)
        m = XPath('./opf:metadata')(root)[0]
        d = m.makeelement(DC('date'))
        d.text = val
        m.append(d)


def read_timestamp(root, prefixes, refines):
    ''' Return the calibre timestamp from the first parseable
    calibre:timestamp property, falling back to the OPF 2 style
    calibre:timestamp meta element. None when neither can be parsed. '''
    pq = '%s:timestamp' % CALIBRE_PREFIX
    sq = '%s:w3cdtf' % reserved_prefixes['dcterms']
    for meta in XPath('./opf:metadata/opf:meta[@property]')(root):
        val = (meta.text or '').strip()
        if val:
            prop = expand_prefix(meta.get('property'), prefixes)
            if prop.lower() == pq:
                scheme = expand_prefix(meta.get('scheme'), prefixes).lower()
                try:
                    return parse_date(val, is_w3cdtf=scheme == sq)
                except Exception:
                    continue
    for meta in XPath('./opf:metadata/opf:meta[@name="calibre:timestamp"]')(root):
        val = meta.get('content')
        if val:
            try:
                return parse_date(val, is_w3cdtf=True)
            except Exception:
                continue


def create_timestamp(root, prefixes, m, val):
    ''' Append a calibre:timestamp meta element in W3CDTF format to the
    metadata element m, unless val is an undefined date. '''
    if not is_date_undefined(val):
        ensure_prefix(root, prefixes, 'calibre', CALIBRE_PREFIX)
        ensure_prefix(root, prefixes, 'dcterms')
        val = w3cdtf(val)
        d = m.makeelement(OPF('meta'), attrib={'property':'calibre:timestamp', 'scheme':'dcterms:W3CDTF'})
        d.text = val
        m.append(d)


def set_timestamp(root, prefixes, refines, val):
    ''' Remove every existing calibre timestamp (OPF 3 property or OPF 2
    name form) and write val as the new one. '''
    pq = '%s:timestamp' % CALIBRE_PREFIX
    for meta in XPath('./opf:metadata/opf:meta')(root):
        prop = expand_prefix(meta.get('property'), prefixes)
        if prop.lower() == pq or meta.get('name') == 'calibre:timestamp':
            remove_element(meta, refines)
    create_timestamp(root, prefixes, XPath('./opf:metadata')(root)[0], val)


def read_last_modified(root, prefixes, refines):
    ''' Return the value of the first parseable dcterms:modified property,
    or None. '''
    pq = '%s:modified' % reserved_prefixes['dcterms']
    sq = '%s:w3cdtf' % reserved_prefixes['dcterms']
    for meta in XPath('./opf:metadata/opf:meta[@property]')(root):
        val = (meta.text or '').strip()
        if val:
            prop = expand_prefix(meta.get('property'), prefixes)
            if prop.lower() == pq:
                scheme = expand_prefix(meta.get('scheme'), prefixes).lower()
                try:
                    return parse_date(val, is_w3cdtf=scheme == sq)
                except Exception:
                    continue


def set_last_modified(root, prefixes, refines, val=None):
    ''' Set the dcterms:modified property to val (the current UTC time when
    val is falsy), in W3CDTF format.

    An existing dcterms:modified meta element that is not itself refined is
    updated in place; otherwise a new element is appended. '''
    pq = '%s:modified' % reserved_prefixes['dcterms']
    val = w3cdtf(val or utcnow())
    for meta in XPath('./opf:metadata/opf:meta[@property]')(root):
        prop = expand_prefix(meta.get('property'), prefixes)
        if prop.lower() == pq:
            iid = meta.get('id')
            if not iid or not refines[iid]:
                break
    else:
        # No reusable element was found, create one
        ensure_prefix(root, prefixes, 'dcterms')
        m = XPath('./opf:metadata')(root)[0]
        meta = m.makeelement(OPF('meta'), attrib={'property':'dcterms:modified', 'scheme':'dcterms:W3CDTF'})
        m.append(meta)
    meta.text = val
# }}}

# Comments {{{


def read_comments(root, prefixes, refines):
    ''' Return the stripped texts of all dc:description elements joined by
    newlines ('' when there are none). '''
    ans = ''
    for dc in XPath('./opf:metadata/dc:description')(root):
        if dc.text:
            ans += '\n' + dc.text.strip()
    return ans.strip()


def set_comments(root, prefixes, refines, val):
    ''' Remove all dc:description elements and, when val is non-blank, add a
    single one holding the stripped val. '''
    for dc in XPath('./opf:metadata/dc:description')(root):
        remove_element(dc, refines)
    m = XPath('./opf:metadata')(root)[0]
    if val:
        val = val.strip()
        if val:
            c = m.makeelement(DC('description'))
            c.text = val
            m.append(c)
# }}}

# Publisher {{{


@simple_text
def read_publisher(root, prefixes, refines):
    ''' Return the text of the first dc:publisher element that has any text,
    or None. '''
    for dc in XPath('./opf:metadata/dc:publisher')(root):
        if dc.text:
            return dc.text


def set_publisher(root, prefixes, refines, val):
    ''' Remove all dc:publisher elements and, when val is non-blank, add a
    single one holding the whitespace-normalized val. '''
    for dc in XPath('./opf:metadata/dc:publisher')(root):
        remove_element(dc, refines)
    m = XPath('./opf:metadata')(root)[0]
    if val:
        val = val.strip()
        if val:
            c = m.makeelement(DC('publisher'))
            c.text = normalize_whitespace(val)
            m.append(c)
# }}}

# Tags {{{


def read_tags(root, prefixes, refines):
    ''' Return the list of distinct tags from the dc:subject elements. The
    text of each element is split on commas and every part is
    whitespace-normalized; empty parts are dropped. '''
    ans = []
    for dc in XPath('./opf:metadata/dc:subject')(root):
        if dc.text:
            ans.extend(map(normalize_whitespace, dc.text.split(',')))
    return uniq(list(filter(None, ans)))


def set_tags(root, prefixes, refines, val):
    ''' Replace all dc:subject elements with one element per distinct,
    non-blank entry of val. '''
    for dc in XPath('./opf:metadata/dc:subject')(root):
        remove_element(dc, refines)
    m = XPath('./opf:metadata')(root)[0]
    if val:
        val = uniq(list(filter(None, val)))
        for x in val:
            c = m.makeelement(DC('subject'))
            c.text = normalize_whitespace(x)
            if c.text:
                m.append(c)
# }}}

# Rating {{{


def read_rating(root, prefixes, refines):
    ''' Return the rating as a float from the first parseable calibre:rating
    property, falling back to the OPF 2 style calibre:rating meta element.
    None when neither can be parsed. '''
    pq = '%s:rating' % CALIBRE_PREFIX
    for meta in XPath('./opf:metadata/opf:meta[@property]')(root):
        val = (meta.text or '').strip()
        if val:
            prop = expand_prefix(meta.get('property'), prefixes)
            if prop.lower() == pq:
                try:
                    return float(val)
                except Exception:
                    continue
    for meta in XPath('./opf:metadata/opf:meta[@name="calibre:rating"]')(root):
        val = meta.get('content')
        if val:
            try:
                return float(val)
            except Exception:
                continue


def create_rating(root, prefixes, val):
    ''' Append a calibre:rating meta element with the text val. '''
    ensure_prefix(root, prefixes, 'calibre', CALIBRE_PREFIX)
    m = XPath('./opf:metadata')(root)[0]
    d = m.makeelement(OPF('meta'), attrib={'property':'calibre:rating'})
    d.text = val
    m.append(d)


def set_rating(root, prefixes, refines, val):
    ''' Remove every existing rating (OPF 2 and OPF 3 forms) and, when val is
    truthy, write it as a calibre:rating property. '''
    pq = '%s:rating' % CALIBRE_PREFIX
    for meta in XPath('./opf:metadata/opf:meta[@name="calibre:rating"]')(root):
        remove_element(meta, refines)
    for meta in XPath('./opf:metadata/opf:meta[@property]')(root):
        prop = expand_prefix(meta.get('property'), prefixes)
        if prop.lower() == pq:
            remove_element(meta, refines)
    if val:
        create_rating(root, prefixes, '%.2g' % float(val))
# }}}

# Series {{{


def read_series(root, prefixes, refines):
    ''' Return a (series, series_index) tuple.

    The first non-empty belongs-to-collection property refined with
    collection-type "series" is used, with its group-position as the index.
    Otherwise the OPF 2 style calibre:series and calibre:series_index meta
    elements are consulted. series is None when no series is found and
    series_index defaults to 1.0 when missing or unparseable. '''
    series_index = 1.0
    for meta in XPath('./opf:metadata/opf:meta[@property="belongs-to-collection" and @id]')(root):
        val = (meta.text or '').strip()
        if val:
            props = properties_for_id(meta.get('id'), refines)
            if props.get('collection-type') == 'series':
                try:
                    series_index = float(props.get('group-position').strip())
                except Exception:
                    # Missing or malformed group-position, keep the default
                    pass
                return normalize_whitespace(val), series_index
    for si in XPath('./opf:metadata/opf:meta[@name="calibre:series_index"]/@content')(root):
        try:
            series_index = float(si)
            break
        except Exception:
            # Malformed index, try the next element
            pass
    for s in XPath('./opf:metadata/opf:meta[@name="calibre:series"]/@content')(root):
        s = normalize_whitespace(s)
        if s:
            return s, series_index
    return None, series_index


def create_series(root, refines, series, series_index):
    ''' Append a belongs-to-collection meta element for series, refined with
    collection-type "series" and group-position series_index (a string). '''
    m = XPath('./opf:metadata')(root)[0]
    d = m.makeelement(OPF('meta'), attrib={'property':'belongs-to-collection'})
    d.text = series
    m.append(d)
    set_refines(d, refines, refdef('collection-type', 'series'), refdef('group-position', series_index))


def _format_series_index(series_index):
    ''' Return series_index as a compact decimal string: whole numbers
    without a fractional part, other values with at most two decimal places
    and no trailing zeros. '''
    val = float(series_index)
    if val.is_integer():
        return '%d' % val
    return ('%.2f' % val).rstrip('0').rstrip('.')


def set_series(root, prefixes, refines, series, series_index):
    ''' Remove every existing series declaration (OPF 2 and OPF 3 forms) and,
    when series is truthy, write series and series_index as a
    belongs-to-collection property. '''
    for meta in XPath('./opf:metadata/opf:meta[@name="calibre:series" or @name="calibre:series_index"]')(root):
        remove_element(meta, refines)
    for meta in XPath('./opf:metadata/opf:meta[@property="belongs-to-collection"]')(root):
        remove_element(meta, refines)
    if series:
        # '%.2g' keeps only two significant digits, which mangles indices
        # such as 12.5 (-> '12') or 123 (-> '1.2e+02')
        create_series(root, refines, series, _format_series_index(series_index))
# }}}

# User metadata {{{


def dict_reader(name, load=json.loads, try2=True):
    ''' Return a function reader(root, prefixes, refines) that returns the
    dict stored in the calibre:name property, or None.

    The text of each matching element is decoded with load; values that fail
    to decode or do not decode to a dict are skipped. When try2 is True the
    OPF 2 style meta element named calibre:name is tried as a fallback. '''
    pq = '%s:%s' % (CALIBRE_PREFIX, name)

    def reader(root, prefixes, refines):
        for meta in XPath('./opf:metadata/opf:meta[@property]')(root):
            val = (meta.text or '').strip()
            if val:
                prop = expand_prefix(meta.get('property'), prefixes)
                if prop.lower() == pq:
                    try:
                        ans = load(val)
                        if isinstance(ans, dict):
                            return ans
                    except Exception:
                        continue
        if try2:
            for meta in XPath('./opf:metadata/opf:meta[@name="calibre:%s"]' % name)(root):
                val = meta.get('content')
                if val:
                    try:
                        ans = load(val)
                        if isinstance(ans, dict):
                            return ans
                    except Exception:
                        continue
    return reader


read_user_categories = dict_reader('user_categories')
read_author_link_map = dict_reader('author_link_map')


def dict_writer(name, serialize=dump_dict, remove2=True):
    ''' Return a function writer(root, prefixes, refines, val) that replaces
    the calibre:name property with serialize(val), or just removes it when
    val is falsy. When remove2 is True the OPF 2 style meta elements named
    calibre:name are removed as well. '''
    pq = '%s:%s' % (CALIBRE_PREFIX, name)

    def writer(root, prefixes, refines, val):
        if remove2:
            for meta in XPath('./opf:metadata/opf:meta[@name="calibre:%s"]' % name)(root):
                remove_element(meta, refines)
        for meta in XPath('./opf:metadata/opf:meta[@property]')(root):
            prop = expand_prefix(meta.get('property'), prefixes)
            if prop.lower() == pq:
                remove_element(meta, refines)
        if val:
            ensure_prefix(root, prefixes, 'calibre', CALIBRE_PREFIX)
            m = XPath('./opf:metadata')(root)[0]
            d = m.makeelement(OPF('meta'), attrib={'property':'calibre:%s' % name})
            d.text = serialize(val)
            m.append(d)
    return writer


set_user_categories = dict_writer('user_categories')
set_author_link_map = dict_writer('author_link_map')


def deserialize_user_metadata(val):
    ''' Decode the JSON string val into a dict mapping field name to field
    metadata, applying decode_is_multiple() to every field. '''
    val = json.loads(val, object_hook=from_json)
    ans = {}
    for name, fm in iteritems(val):
        decode_is_multiple(fm)
        ans[name] = fm
    return ans


read_user_metadata3 = dict_reader('user_metadata', load=deserialize_user_metadata, try2=False)


def read_user_metadata2(root, remove_tags=False):
    ''' Read OPF 2 style user metadata, stored one field per meta element
    named calibre:user_metadata:#field.

    Returns a dict mapping field name to field metadata. Fields that fail to
    decode are reported on the console and skipped. The source elements are
    removed from the tree when remove_tags is True. '''
    ans = {}
    for meta in XPath('./opf:metadata/opf:meta[starts-with(@name, "calibre:user_metadata:")]')(root):
        name = meta.get('name')
        name = ':'.join(name.split(':')[2:])
        if not name or not name.startswith('#'):
            continue
        fm = meta.get('content')
        if remove_tags:
            meta.getparent().remove(meta)
        try:
            fm = json.loads(fm, object_hook=from_json)
            decode_is_multiple(fm)
            ans[name] = fm
        except Exception:
            prints('Failed to read user metadata:', name)
            import traceback
            traceback.print_exc()
            continue
    return ans


def read_user_metadata(root, prefixes, refines):
    ''' Return the user metadata dict, preferring the OPF 3 property and
    falling back to the OPF 2 style meta elements. '''
    return read_user_metadata3(root, prefixes, refines) or read_user_metadata2(root)


def serialize_user_metadata(val):
    ''' Serialize the user metadata dict val to an indented JSON string with
    sorted keys. '''
    return json.dumps(object_to_unicode(val), ensure_ascii=False, default=to_json, indent=2, sort_keys=True)


set_user_metadata3 = dict_writer('user_metadata', serialize=serialize_user_metadata, remove2=False)


def set_user_metadata(root, prefixes, refines, val):
    ''' Replace the stored user metadata with val, a dict mapping field name
    to field metadata. The OPF 2 style elements are always removed; the field
    metadata dicts are copied before being encoded, so val is not modified. '''
    for meta in XPath('./opf:metadata/opf:meta[starts-with(@name, "calibre:user_metadata:")]')(root):
        remove_element(meta, refines)
    if val:
        nval = {}
        for name, fm in val.items():
            fm = fm.copy()
            encode_is_multiple(fm)
            nval[name] = fm
    set_user_metadata3(root, prefixes, refines, nval if val else val)

# }}}

# Covers {{{


def read_raster_cover(root, prefixes, refines):
    ''' Return the href of the raster cover image, or None.

    Manifest items with the cover-image property are tried first, then the
    item referenced by the OPF 2 style <meta name="cover"> element. Items
    whose media type contains "xml" or "html" are not considered raster
    images. '''

    def get_href(item):
        mt = item.get('media-type')
        if mt and 'xml' not in mt and 'html' not in mt:
            href = item.get('href')
            if href:
                return href

    for item in items_with_property(root, 'cover-image', prefixes):
        href = get_href(item)
        if href:
            return href

    for item_id in XPath('./opf:metadata/opf:meta[@name="cover"]/@content')(root):
        for item in XPath('./opf:manifest/opf:item[@id and @href and @media-type]')(root):
            if item.get('id') == item_id:
                href = get_href(item)
                if href:
                    return href


def ensure_is_only_raster_cover(root, prefixes, refines, raster_cover_item_href):
    ''' Make the manifest item(s) whose href is raster_cover_item_href the
    only ones marked as cover-image, removing the OPF 2 style
    <meta name="cover"> elements and the property from all other items. '''
    for item in XPath('./opf:metadata/opf:meta[@name="cover"]')(root):
        remove_element(item, refines)
    for item in items_with_property(root, 'cover-image', prefixes):
        prop = normalize_whitespace(item.get('properties').replace('cover-image', ''))
        if prop:
            item.set('properties', prop)
        else:
            del item.attrib['properties']
    for item in XPath('./opf:manifest/opf:item')(root):
        if item.get('href') == raster_cover_item_href:
            item.set('properties', normalize_whitespace((item.get('properties') or '') + ' cover-image'))

# }}}

# Reading/setting Metadata objects {{{


def first_spine_item(root, prefixes, refines):
    ''' Return the href of the manifest item referenced by the first spine
    itemref that resolves to a manifest item (None when that item has no
    href or when nothing resolves). '''
    for i in XPath('./opf:spine/opf:itemref/@idref')(root):
        for item in XPath('./opf:manifest/opf:item')(root):
            if item.get('id') == i:
                return item.get('href') or None


def set_last_modified_in_opf(root):
    ''' Set the dcterms:modified property of root to the current time. '''
    prefixes, refines = read_prefixes(root), read_refines(root)
    set_last_modified(root, prefixes, refines)


def read_metadata(root, ver=None, return_extra_data=False):
    ''' Build a Metadata object from the parsed OPF root.

    Returns the Metadata object, or, when return_extra_data is True, the
    tuple (metadata, ver, raster_cover_href, first_spine_item_href). Fields
    that are absent from the OPF keep the defaults of the Metadata object. '''
    ans = Metadata(_('Unknown'), [_('Unknown')])
    prefixes, refines = read_prefixes(root), read_refines(root)
    identifiers = read_identifiers(root, prefixes, refines)
    ids = {}
    for key, vals in iteritems(identifiers):
        # Only the first value of every scheme is used
        if key == 'calibre':
            ans.application_id = vals[0]
        elif key == 'uuid':
            ans.uuid = vals[0]
        else:
            ids[key] = vals[0]
    ans.set_identifiers(ids)
    ans.title = read_title(root, prefixes, refines) or ans.title
    ans.title_sort = read_title_sort(root, prefixes, refines) or ans.title_sort
    ans.languages = read_languages(root, prefixes, refines) or ans.languages
    auts, aus = [], []
    for a in read_authors(root, prefixes, refines):
        auts.append(a.name)
        aus.append(a.sort)
    ans.authors = auts or ans.authors
    ans.author_sort = authors_to_string(aus) or ans.author_sort
    bkp = read_book_producers(root, prefixes, refines)
    if bkp and bkp[0]:
        ans.book_producer = bkp[0]
    pd = read_pubdate(root, prefixes, refines)
    if not is_date_undefined(pd):
        ans.pubdate = pd
    ts = read_timestamp(root, prefixes, refines)
    if not is_date_undefined(ts):
        ans.timestamp = ts
    lm = read_last_modified(root, prefixes, refines)
    if not is_date_undefined(lm):
        ans.last_modified = lm
    ans.comments = read_comments(root, prefixes, refines) or ans.comments
    ans.publisher = read_publisher(root, prefixes, refines) or ans.publisher
    ans.tags = read_tags(root, prefixes, refines) or ans.tags
    ans.rating = read_rating(root, prefixes, refines) or ans.rating
    s, si = read_series(root, prefixes, refines)
    if s:
        ans.series, ans.series_index = s, si
    ans.author_link_map = read_author_link_map(root, prefixes, refines) or ans.author_link_map
    ans.user_categories = read_user_categories(root, prefixes, refines) or ans.user_categories
    for name, fm in iteritems(read_user_metadata(root, prefixes, refines) or {}):
        ans.set_user_metadata(name, fm)
    if return_extra_data:
        ans = ans, ver, read_raster_cover(root, prefixes, refines), first_spine_item(root, prefixes, refines)
    return ans


def get_metadata(stream):
    ''' Parse the OPF in stream and return its metadata. '''
    root = parse_opf(stream)
    return read_metadata(root)


def apply_metadata(root, mi, cover_prefix='', cover_data=None, apply_null=False, update_timestamp=False, force_identifiers=False, add_missing_cover=True):
    ''' Write the metadata in mi into the parsed OPF root and pretty print it.

    :param apply_null: When True, null fields in mi are written too, which
        clears the corresponding values in the OPF.
    :param update_timestamp: When True and mi has a timestamp, it is written.
    :param force_identifiers: Passed on to set_identifiers().
    :param cover_prefix, cover_data, add_missing_cover: When the OPF has no
        raster cover, cover_data is truthy and add_missing_cover is True, a
        manifest item named cover_prefix/cover.jpg is created and marked as
        the cover image.
    :return: The href of the raster cover, or None. '''
    prefixes, refines = read_prefixes(root), read_refines(root)
    current_mi = read_metadata(root)
    if apply_null:
        def ok(x):
            return True
    else:
        def ok(x):
            return not mi.is_null(x)
    if ok('identifiers'):
        set_identifiers(root, prefixes, refines, mi.identifiers, force_identifiers=force_identifiers)
    if ok('title'):
        set_title(root, prefixes, refines, mi.title, mi.title_sort)
    if ok('languages'):
        set_languages(root, prefixes, refines, mi.languages)
    if ok('book_producer'):
        set_book_producers(root, prefixes, refines, (mi.book_producer,))
    aus = string_to_authors(mi.author_sort or '')
    authors = []
    for i, aut in enumerate(mi.authors):
        authors.append(Author(aut, aus[i] if i < len(aus) else None))
    if authors or apply_null:
        set_authors(root, prefixes, refines, authors)
    if ok('pubdate'):
        set_pubdate(root, prefixes, refines, mi.pubdate)
    if update_timestamp and mi.timestamp is not None:
        set_timestamp(root, prefixes, refines, mi.timestamp)
    if ok('comments'):
        set_comments(root, prefixes, refines, mi.comments)
    if ok('publisher'):
        set_publisher(root, prefixes, refines, mi.publisher)
    if ok('tags'):
        set_tags(root, prefixes, refines, mi.tags)
    if ok('rating') and mi.rating is not None and float(mi.rating) > 0.1:
        set_rating(root, prefixes, refines, mi.rating)
    if ok('series'):
        set_series(root, prefixes, refines, mi.series, mi.series_index or 1)
    if ok('author_link_map'):
        set_author_link_map(root, prefixes, refines, getattr(mi, 'author_link_map', None))
    if ok('user_categories'):
        set_user_categories(root, prefixes, refines, getattr(mi, 'user_categories', None))
    # We ignore apply_null for the next two to match the behavior with opf2.py
    if mi.application_id:
        set_application_id(root, prefixes, refines, mi.application_id)
    if mi.uuid:
        set_uuid(root, prefixes, refines, mi.uuid)
    current_mi.remove_stale_user_metadata(mi)
    new_user_metadata, current_user_metadata = mi.get_all_user_metadata(True), current_mi.get_all_user_metadata(True)
    missing = object()
    # Merge the custom columns of mi into those already present in the OPF
    for key in tuple(new_user_metadata):
        meta = new_user_metadata.get(key)
        if meta is None:
            if apply_null:
                new_user_metadata[key] = None
            continue
        dt = meta.get('datatype')
        if dt == 'text' and meta.get('is_multiple'):
            val = mi.get(key, [])
            if val or apply_null:
                current_user_metadata[key] = meta
        elif dt in {'int', 'float', 'bool'}:
            # 0 and False are valid values here, so only None counts as null
            val = mi.get(key, missing)
            if val is missing:
                if apply_null:
                    current_user_metadata[key] = meta
            elif apply_null or val is not None:
                current_user_metadata[key] = meta
        elif apply_null or not mi.is_null(key):
            current_user_metadata[key] = meta

    set_user_metadata(root, prefixes, refines, current_user_metadata)
    raster_cover = read_raster_cover(root, prefixes, refines)
    if not raster_cover and cover_data and add_missing_cover:
        if cover_prefix and not cover_prefix.endswith('/'):
            cover_prefix += '/'
        name = cover_prefix + 'cover.jpg'
        i = create_manifest_item(root, name, 'cover')
        if i is not None:
            ensure_is_only_raster_cover(root, prefixes, refines, name)
            raster_cover = name

    pretty_print_opf(root)
    return raster_cover


def set_metadata(stream, mi, cover_prefix='', cover_data=None, apply_null=False, update_timestamp=False, force_identifiers=False, add_missing_cover=True):
    ''' Parse the OPF in stream and apply the metadata in mi to the parsed
    tree. See apply_metadata() for the meaning of the parameters and of the
    return value. '''
    root = parse_opf(stream)
    return apply_metadata(
        root, mi, cover_prefix=cover_prefix, cover_data=cover_data,
        apply_null=apply_null, update_timestamp=update_timestamp,
        force_identifiers=force_identifiers,
        # Previously accepted but never forwarded, so it had no effect
        add_missing_cover=add_missing_cover)
# }}}


if __name__ == '__main__':
    import sys
    with open(sys.argv[-1], 'rb') as stream:
        print(get_metadata(stream))