1#!/usr/bin/env python 2""" 3 Patch utility to apply unified diffs 4 5 Brute-force line-by-line non-recursive parsing 6 7 Copyright (c) 2008-2016 anatoly techtonik 8 Available under the terms of MIT license 9 10--- 11 The MIT License (MIT) 12 13 Copyright (c) 2019 JFrog LTD 14 15 Permission is hereby granted, free of charge, to any person obtaining a copy of this software 16 and associated documentation files (the "Software"), to deal in the Software without 17 restriction, including without limitation the rights to use, copy, modify, merge, publish, 18 distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the 19 Software is furnished to do so, subject to the following conditions: 20 21 The above copyright notice and this permission notice shall be included in all copies or 22 substantial portions of the Software. 23 24 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, 25 INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 26 PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR 27 ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, 28 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 29 SOFTWARE. 30""" 31from __future__ import print_function 32 33__author__ = "Conan.io <info@conan.io>" 34__version__ = "1.17.4" 35__license__ = "MIT" 36__url__ = "https://github.com/conan-io/python-patch" 37 38import copy 39import logging 40import re 41import tempfile 42import codecs 43 44# cStringIO doesn't support unicode in 2.5 45try: 46 from StringIO import StringIO 47except ImportError: 48 from io import BytesIO as StringIO # python 3 49try: 50 import urllib2 as urllib_request 51except ImportError: 52 import urllib.request as urllib_request 53 54from os.path import exists, isfile, abspath 55import os 56import posixpath 57import shutil 58import sys 59import stat 60 61 62PY3K = sys.version_info >= (3, 0) 63 64# PEP 3114 65if not PY3K: 66 compat_next = lambda gen: gen.next() 67else: 68 compat_next = lambda gen: gen.__next__() 69 70def tostr(b): 71 """ Python 3 bytes encoder. Used to print filename in 72 diffstat output. Assumes that filenames are in utf-8. 73 """ 74 if not PY3K: 75 return b 76 77 # [ ] figure out how to print non-utf-8 filenames without 78 # information loss 79 return b.decode('utf-8') 80 81 82#------------------------------------------------ 83# Logging is controlled by logger named after the 84# module name (e.g. 'patch' for patch_ng.py module) 85 86logger = logging.getLogger("patch_ng") 87 88debug = logger.debug 89info = logger.info 90warning = logger.warning 91error = logger.error 92 93class NullHandler(logging.Handler): 94 """ Copied from Python 2.7 to avoid getting 95 `No handlers could be found for logger "patch"` 96 http://bugs.python.org/issue16539 97 """ 98 def handle(self, record): 99 pass 100 def emit(self, record): 101 pass 102 def createLock(self): 103 self.lock = None 104 105streamhandler = logging.StreamHandler() 106 107# initialize logger itself 108logger.addHandler(NullHandler()) 109 110debugmode = False 111 112def setdebug(): 113 global debugmode, streamhandler 114 115 debugmode = True 116 loglevel = logging.DEBUG 117 logformat = "%(levelname)8s %(message)s" 118 logger.setLevel(loglevel) 119 120 if streamhandler not in logger.handlers: 121 # when used as a library, streamhandler is not added 122 # by default 123 logger.addHandler(streamhandler) 124 125 streamhandler.setFormatter(logging.Formatter(logformat)) 126 127 128#------------------------------------------------ 129# Constants for Patch/PatchSet types 130 131DIFF = PLAIN = "plain" 132GIT = "git" 133HG = MERCURIAL = "mercurial" 134SVN = SUBVERSION = "svn" 135# mixed type is only actual when PatchSet contains 136# Patches of different type 137MIXED = MIXED = "mixed" 138 139 140#------------------------------------------------ 141# Helpers (these could come with Python stdlib) 142 143# x...() function are used to work with paths in 144# cross-platform manner - all paths use forward 145# slashes even on Windows. 146 147def xisabs(filename): 148 """ Cross-platform version of `os.path.isabs()` 149 Returns True if `filename` is absolute on 150 Linux, OS X or Windows. 151 """ 152 if filename.startswith(b'/'): # Linux/Unix 153 return True 154 elif filename.startswith(b'\\'): # Windows 155 return True 156 elif re.match(b'\\w:[\\\\/]', filename): # Windows 157 return True 158 return False 159 160def xnormpath(path): 161 """ Cross-platform version of os.path.normpath """ 162 # replace escapes and Windows slashes 163 normalized = posixpath.normpath(path).replace(b'\\', b'/') 164 # fold the result 165 return posixpath.normpath(normalized) 166 167def xstrip(filename): 168 """ Make relative path out of absolute by stripping 169 prefixes used on Linux, OS X and Windows. 170 171 This function is critical for security. 172 """ 173 while xisabs(filename): 174 # strip windows drive with all slashes 175 if re.match(b'\\w:[\\\\/]', filename): 176 filename = re.sub(b'^\\w+:[\\\\/]+', b'', filename) 177 # strip all slashes 178 elif re.match(b'[\\\\/]', filename): 179 filename = re.sub(b'^[\\\\/]+', b'', filename) 180 return filename 181 182 183def safe_unlink(filepath): 184 os.chmod(filepath, stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH) 185 os.unlink(filepath) 186 187 188#----------------------------------------------- 189# Main API functions 190 191def fromfile(filename): 192 """ Parse patch file. If successful, returns 193 PatchSet() object. Otherwise returns False. 194 """ 195 patchset = PatchSet() 196 debug("reading %s" % filename) 197 fp = open(filename, "rb") 198 res = patchset.parse(fp) 199 fp.close() 200 if res == True: 201 return patchset 202 return False 203 204 205def fromstring(s): 206 """ Parse text string and return PatchSet() 207 object (or False if parsing fails) 208 """ 209 ps = PatchSet( StringIO(s) ) 210 if ps.errors == 0: 211 return ps 212 return False 213 214 215def fromurl(url): 216 """ Parse patch from an URL, return False 217 if an error occured. Note that this also 218 can throw urlopen() exceptions. 219 """ 220 ps = PatchSet( urllib_request.urlopen(url) ) 221 if ps.errors == 0: 222 return ps 223 return False 224 225 226# --- Utility functions --- 227# [ ] reuse more universal pathsplit() 228def pathstrip(path, n): 229 """ Strip n leading components from the given path """ 230 pathlist = [path] 231 while os.path.dirname(pathlist[0]) != b'': 232 pathlist[0:1] = os.path.split(pathlist[0]) 233 return b'/'.join(pathlist[n:]) 234# --- /Utility function --- 235 236 237def decode_text(text): 238 encodings = {codecs.BOM_UTF8: "utf_8_sig", 239 codecs.BOM_UTF16_BE: "utf_16_be", 240 codecs.BOM_UTF16_LE: "utf_16_le", 241 codecs.BOM_UTF32_BE: "utf_32_be", 242 codecs.BOM_UTF32_LE: "utf_32_le", 243 b'\x2b\x2f\x76\x38': "utf_7", 244 b'\x2b\x2f\x76\x39': "utf_7", 245 b'\x2b\x2f\x76\x2b': "utf_7", 246 b'\x2b\x2f\x76\x2f': "utf_7", 247 b'\x2b\x2f\x76\x38\x2d': "utf_7"} 248 for bom in sorted(encodings, key=len, reverse=True): 249 if text.startswith(bom): 250 try: 251 return text[len(bom):].decode(encodings[bom]) 252 except UnicodeDecodeError: 253 continue 254 decoders = ["utf-8", "Windows-1252"] 255 for decoder in decoders: 256 try: 257 return text.decode(decoder) 258 except UnicodeDecodeError: 259 continue 260 logger.warning("can't decode %s" % str(text)) 261 return text.decode("utf-8", "ignore") # Ignore not compatible characters 262 263 264def to_file_bytes(content): 265 if PY3K: 266 if not isinstance(content, bytes): 267 content = bytes(content, "utf-8") 268 elif isinstance(content, unicode): 269 content = content.encode("utf-8") 270 return content 271 272 273def load(path, binary=False): 274 """ Loads a file content """ 275 with open(path, 'rb') as handle: 276 tmp = handle.read() 277 return tmp if binary else decode_text(tmp) 278 279 280def save(path, content, only_if_modified=False): 281 """ 282 Saves a file with given content 283 Params: 284 path: path to write file to 285 content: contents to save in the file 286 only_if_modified: file won't be modified if the content hasn't changed 287 """ 288 try: 289 os.makedirs(os.path.dirname(path)) 290 except Exception: 291 pass 292 293 new_content = to_file_bytes(content) 294 295 if only_if_modified and os.path.exists(path): 296 old_content = load(path, binary=True) 297 if old_content == new_content: 298 return 299 300 with open(path, "wb") as handle: 301 handle.write(new_content) 302 303 304class Hunk(object): 305 """ Parsed hunk data container (hunk starts with @@ -R +R @@) """ 306 307 def __init__(self): 308 self.startsrc=None #: line count starts with 1 309 self.linessrc=None 310 self.starttgt=None 311 self.linestgt=None 312 self.invalid=False 313 self.desc='' 314 self.text=[] 315 316 317class Patch(object): 318 """ Patch for a single file. 319 If used as an iterable, returns hunks. 320 """ 321 def __init__(self): 322 self.source = None 323 self.target = None 324 self.hunks = [] 325 self.hunkends = [] 326 self.header = [] 327 328 self.type = None 329 330 def __iter__(self): 331 for h in self.hunks: 332 yield h 333 334 335class PatchSet(object): 336 """ PatchSet is a patch parser and container. 337 When used as an iterable, returns patches. 338 """ 339 340 def __init__(self, stream=None): 341 # --- API accessible fields --- 342 343 # name of the PatchSet (filename or ...) 344 self.name = None 345 # patch set type - one of constants 346 self.type = None 347 348 # list of Patch objects 349 self.items = [] 350 351 self.errors = 0 # fatal parsing errors 352 self.warnings = 0 # non-critical warnings 353 # --- /API --- 354 355 if stream: 356 self.parse(stream) 357 358 def __len__(self): 359 return len(self.items) 360 361 def __iter__(self): 362 for i in self.items: 363 yield i 364 365 def parse(self, stream): 366 """ parse unified diff 367 return True on success 368 """ 369 lineends = dict(lf=0, crlf=0, cr=0) 370 nexthunkno = 0 #: even if index starts with 0 user messages number hunks from 1 371 372 p = None 373 hunk = None 374 # hunkactual variable is used to calculate hunk lines for comparison 375 hunkactual = dict(linessrc=None, linestgt=None) 376 377 378 class wrapumerate(enumerate): 379 """Enumerate wrapper that uses boolean end of stream status instead of 380 StopIteration exception, and properties to access line information. 381 """ 382 383 def __init__(self, *args, **kwargs): 384 # we don't call parent, it is magically created by __new__ method 385 386 self._exhausted = False 387 self._lineno = False # after end of stream equal to the num of lines 388 self._line = False # will be reset to False after end of stream 389 390 def next(self): 391 """Try to read the next line and return True if it is available, 392 False if end of stream is reached.""" 393 if self._exhausted: 394 return False 395 396 try: 397 self._lineno, self._line = compat_next(super(wrapumerate, self)) 398 except StopIteration: 399 self._exhausted = True 400 self._line = False 401 return False 402 return True 403 404 @property 405 def is_empty(self): 406 return self._exhausted 407 408 @property 409 def line(self): 410 return self._line 411 412 @property 413 def lineno(self): 414 return self._lineno 415 416 # define states (possible file regions) that direct parse flow 417 headscan = True # start with scanning header 418 filenames = False # lines starting with --- and +++ 419 420 hunkhead = False # @@ -R +R @@ sequence 421 hunkbody = False # 422 hunkskip = False # skipping invalid hunk mode 423 424 hunkparsed = False # state after successfully parsed hunk 425 426 # regexp to match start of hunk, used groups - 1,3,4,6 427 re_hunk_start = re.compile(b"^@@ -(\d+)(,(\d+))? \+(\d+)(,(\d+))? @@") 428 429 self.errors = 0 430 # temp buffers for header and filenames info 431 header = [] 432 srcname = None 433 tgtname = None 434 435 # start of main cycle 436 # each parsing block already has line available in fe.line 437 fe = wrapumerate(stream) 438 while fe.next(): 439 440 # -- deciders: these only switch state to decide who should process 441 # -- line fetched at the start of this cycle 442 if hunkparsed: 443 hunkparsed = False 444 if re_hunk_start.match(fe.line): 445 hunkhead = True 446 elif fe.line.startswith(b"--- "): 447 filenames = True 448 else: 449 headscan = True 450 # -- ------------------------------------ 451 452 # read out header 453 if headscan: 454 while not fe.is_empty and not fe.line.startswith(b"--- "): 455 header.append(fe.line) 456 fe.next() 457 if fe.is_empty: 458 if p is None: 459 debug("no patch data found") # error is shown later 460 self.errors += 1 461 else: 462 info("%d unparsed bytes left at the end of stream" % len(b''.join(header))) 463 self.warnings += 1 464 # TODO check for \No new line at the end.. 465 # TODO test for unparsed bytes 466 # otherwise error += 1 467 # this is actually a loop exit 468 continue 469 470 headscan = False 471 # switch to filenames state 472 filenames = True 473 474 line = fe.line 475 lineno = fe.lineno 476 477 478 # hunkskip and hunkbody code skipped until definition of hunkhead is parsed 479 if hunkbody: 480 # [x] treat empty lines inside hunks as containing single space 481 # (this happens when diff is saved by copy/pasting to editor 482 # that strips trailing whitespace) 483 if line.strip(b"\r\n") == b"": 484 debug("expanding empty line in a middle of hunk body") 485 self.warnings += 1 486 line = b' ' + line 487 488 # process line first 489 if re.match(b"^[- \\+\\\\]", line): 490 # gather stats about line endings 491 if line.endswith(b"\r\n"): 492 p.hunkends["crlf"] += 1 493 elif line.endswith(b"\n"): 494 p.hunkends["lf"] += 1 495 elif line.endswith(b"\r"): 496 p.hunkends["cr"] += 1 497 498 if line.startswith(b"-"): 499 hunkactual["linessrc"] += 1 500 elif line.startswith(b"+"): 501 hunkactual["linestgt"] += 1 502 elif not line.startswith(b"\\"): 503 hunkactual["linessrc"] += 1 504 hunkactual["linestgt"] += 1 505 hunk.text.append(line) 506 # todo: handle \ No newline cases 507 else: 508 warning("invalid hunk no.%d at %d for target file %s" % (nexthunkno, lineno+1, p.target)) 509 # add hunk status node 510 hunk.invalid = True 511 p.hunks.append(hunk) 512 self.errors += 1 513 # switch to hunkskip state 514 hunkbody = False 515 hunkskip = True 516 517 # check exit conditions 518 if hunkactual["linessrc"] > hunk.linessrc or hunkactual["linestgt"] > hunk.linestgt: 519 warning("extra lines for hunk no.%d at %d for target %s" % (nexthunkno, lineno+1, p.target)) 520 # add hunk status node 521 hunk.invalid = True 522 p.hunks.append(hunk) 523 self.errors += 1 524 # switch to hunkskip state 525 hunkbody = False 526 hunkskip = True 527 elif hunk.linessrc == hunkactual["linessrc"] and hunk.linestgt == hunkactual["linestgt"]: 528 # hunk parsed successfully 529 p.hunks.append(hunk) 530 # switch to hunkparsed state 531 hunkbody = False 532 hunkparsed = True 533 534 # detect mixed window/unix line ends 535 ends = p.hunkends 536 if ((ends["cr"]!=0) + (ends["crlf"]!=0) + (ends["lf"]!=0)) > 1: 537 warning("inconsistent line ends in patch hunks for %s" % p.source) 538 self.warnings += 1 539 if debugmode: 540 debuglines = dict(ends) 541 debuglines.update(file=p.target, hunk=nexthunkno) 542 debug("crlf: %(crlf)d lf: %(lf)d cr: %(cr)d\t - file: %(file)s hunk: %(hunk)d" % debuglines) 543 # fetch next line 544 continue 545 546 if hunkskip: 547 if re_hunk_start.match(line): 548 # switch to hunkhead state 549 hunkskip = False 550 hunkhead = True 551 elif line.startswith(b"--- "): 552 # switch to filenames state 553 hunkskip = False 554 filenames = True 555 if debugmode and len(self.items) > 0: 556 debug("- %2d hunks for %s" % (len(p.hunks), p.source)) 557 558 if filenames: 559 if line.startswith(b"--- "): 560 if srcname != None: 561 # XXX testcase 562 warning("skipping false patch for %s" % srcname) 563 srcname = None 564 # XXX header += srcname 565 # double source filename line is encountered 566 # attempt to restart from this second line 567 568 # Files dated at Unix epoch don't exist, e.g.: 569 # '1970-01-01 01:00:00.000000000 +0100' 570 # They include timezone offsets. 571 # .. which can be parsed (if we remove the nanoseconds) 572 # .. by strptime() with: 573 # '%Y-%m-%d %H:%M:%S %z' 574 # .. but unfortunately this relies on the OSes libc 575 # strptime function and %z support is patchy, so we drop 576 # everything from the . onwards and group the year and time 577 # separately. 578 re_filename_date_time = b"^--- ([^\t]+)(?:\s([0-9-]+)\s([0-9:]+)|.*)" 579 match = re.match(re_filename_date_time, line) 580 # todo: support spaces in filenames 581 if match: 582 srcname = match.group(1).strip() 583 date = match.group(2) 584 time = match.group(3) 585 if (date == b'1970-01-01' or date == b'1969-12-31') and time.split(b':',1)[1] == b'00:00': 586 srcname = b'/dev/null' 587 else: 588 warning("skipping invalid filename at line %d" % (lineno+1)) 589 self.errors += 1 590 # XXX p.header += line 591 # switch back to headscan state 592 filenames = False 593 headscan = True 594 elif not line.startswith(b"+++ "): 595 if srcname != None: 596 warning("skipping invalid patch with no target for %s" % srcname) 597 self.errors += 1 598 srcname = None 599 # XXX header += srcname 600 # XXX header += line 601 else: 602 # this should be unreachable 603 warning("skipping invalid target patch") 604 filenames = False 605 headscan = True 606 else: 607 if tgtname != None: 608 # XXX seems to be a dead branch 609 warning("skipping invalid patch - double target at line %d" % (lineno+1)) 610 self.errors += 1 611 srcname = None 612 tgtname = None 613 # XXX header += srcname 614 # XXX header += tgtname 615 # XXX header += line 616 # double target filename line is encountered 617 # switch back to headscan state 618 filenames = False 619 headscan = True 620 else: 621 re_filename_date_time = b"^\+\+\+ ([^\t]+)(?:\s([0-9-]+)\s([0-9:]+)|.*)" 622 match = re.match(re_filename_date_time, line) 623 if not match: 624 warning("skipping invalid patch - no target filename at line %d" % (lineno+1)) 625 self.errors += 1 626 srcname = None 627 # switch back to headscan state 628 filenames = False 629 headscan = True 630 else: 631 tgtname = match.group(1).strip() 632 date = match.group(2) 633 time = match.group(3) 634 if (date == b'1970-01-01' or date == b'1969-12-31') and time.split(b':',1)[1] == b'00:00': 635 tgtname = b'/dev/null' 636 if p: # for the first run p is None 637 self.items.append(p) 638 p = Patch() 639 p.source = srcname 640 srcname = None 641 p.target = tgtname 642 tgtname = None 643 p.header = header 644 header = [] 645 # switch to hunkhead state 646 filenames = False 647 hunkhead = True 648 nexthunkno = 0 649 p.hunkends = lineends.copy() 650 continue 651 652 if hunkhead: 653 match = re.match(b"^@@ -(\d+)(,(\d+))? \+(\d+)(,(\d+))? @@(.*)", line) 654 if not match: 655 if not p.hunks: 656 warning("skipping invalid patch with no hunks for file %s" % p.source) 657 self.errors += 1 658 # XXX review switch 659 # switch to headscan state 660 hunkhead = False 661 headscan = True 662 continue 663 else: 664 # TODO review condition case 665 # switch to headscan state 666 hunkhead = False 667 headscan = True 668 else: 669 hunk = Hunk() 670 hunk.startsrc = int(match.group(1)) 671 hunk.linessrc = 1 672 if match.group(3): hunk.linessrc = int(match.group(3)) 673 hunk.starttgt = int(match.group(4)) 674 hunk.linestgt = 1 675 if match.group(6): hunk.linestgt = int(match.group(6)) 676 hunk.invalid = False 677 hunk.desc = match.group(7)[1:].rstrip() 678 hunk.text = [] 679 680 hunkactual["linessrc"] = hunkactual["linestgt"] = 0 681 682 # switch to hunkbody state 683 hunkhead = False 684 hunkbody = True 685 nexthunkno += 1 686 continue 687 688 # /while fe.next() 689 690 if p: 691 self.items.append(p) 692 693 if not hunkparsed: 694 if hunkskip: 695 warning("warning: finished with errors, some hunks may be invalid") 696 elif headscan: 697 if len(self.items) == 0: 698 warning("error: no patch data found!") 699 return False 700 else: # extra data at the end of file 701 pass 702 else: 703 warning("error: patch stream is incomplete!") 704 self.errors += 1 705 if len(self.items) == 0: 706 return False 707 708 if debugmode and len(self.items) > 0: 709 debug("- %2d hunks for %s" % (len(p.hunks), p.source)) 710 711 # XXX fix total hunks calculation 712 debug("total files: %d total hunks: %d" % (len(self.items), 713 sum(len(p.hunks) for p in self.items))) 714 715 # ---- detect patch and patchset types ---- 716 for idx, p in enumerate(self.items): 717 self.items[idx].type = self._detect_type(p) 718 719 types = set([p.type for p in self.items]) 720 if len(types) > 1: 721 self.type = MIXED 722 else: 723 self.type = types.pop() 724 # -------- 725 726 self._normalize_filenames() 727 728 return (self.errors == 0) 729 730 def _detect_type(self, p): 731 """ detect and return type for the specified Patch object 732 analyzes header and filenames info 733 734 NOTE: must be run before filenames are normalized 735 """ 736 737 # check for SVN 738 # - header starts with Index: 739 # - next line is ===... delimiter 740 # - filename is followed by revision number 741 # TODO add SVN revision 742 if (len(p.header) > 1 and p.header[-2].startswith(b"Index: ") 743 and p.header[-1].startswith(b"="*67)): 744 return SVN 745 746 # common checks for both HG and GIT 747 DVCS = ((p.source.startswith(b'a/') or p.source == b'/dev/null') 748 and (p.target.startswith(b'b/') or p.target == b'/dev/null')) 749 750 # GIT type check 751 # - header[-2] is like "diff --git a/oldname b/newname" 752 # - header[-1] is like "index <hash>..<hash> <mode>" 753 # TODO add git rename diffs and add/remove diffs 754 # add git diff with spaced filename 755 # TODO http://www.kernel.org/pub/software/scm/git/docs/git-diff.html 756 757 # Git patch header len is 2 min 758 if len(p.header) > 1: 759 # detect the start of diff header - there might be some comments before 760 for idx in reversed(range(len(p.header))): 761 if p.header[idx].startswith(b"diff --git"): 762 break 763 if p.header[idx].startswith(b'diff --git a/'): 764 if (idx+1 < len(p.header) 765 and re.match(b'(?:index \\w{7}..\\w{7} \\d{6}|new file mode \\d*)', p.header[idx+1])): 766 if DVCS: 767 return GIT 768 769 # HG check 770 # 771 # - for plain HG format header is like "diff -r b2d9961ff1f5 filename" 772 # - for Git-style HG patches it is "diff --git a/oldname b/newname" 773 # - filename starts with a/, b/ or is equal to /dev/null 774 # - exported changesets also contain the header 775 # # HG changeset patch 776 # # User name@example.com 777 # ... 778 # TODO add MQ 779 # TODO add revision info 780 if len(p.header) > 0: 781 if DVCS and re.match(b'diff -r \\w{12} .*', p.header[-1]): 782 return HG 783 if DVCS and p.header[-1].startswith(b'diff --git a/'): 784 if len(p.header) == 1: # native Git patch header len is 2 785 return HG 786 elif p.header[0].startswith(b'# HG changeset patch'): 787 return HG 788 789 return PLAIN 790 791 792 def _normalize_filenames(self): 793 """ sanitize filenames, normalizing paths, i.e.: 794 1. strip a/ and b/ prefixes from GIT and HG style patches 795 2. remove all references to parent directories (with warning) 796 3. translate any absolute paths to relative (with warning) 797 798 [x] always use forward slashes to be crossplatform 799 (diff/patch were born as a unix utility after all) 800 801 return None 802 """ 803 if debugmode: 804 debug("normalize filenames") 805 for i,p in enumerate(self.items): 806 if debugmode: 807 debug(" patch type = %s" % p.type) 808 debug(" source = %s" % p.source) 809 debug(" target = %s" % p.target) 810 if p.type in (HG, GIT): 811 debug("stripping a/ and b/ prefixes") 812 if p.source != b'/dev/null': 813 if not p.source.startswith(b"a/"): 814 warning("invalid source filename") 815 else: 816 p.source = p.source[2:] 817 if p.target != b'/dev/null': 818 if not p.target.startswith(b"b/"): 819 warning("invalid target filename") 820 else: 821 p.target = p.target[2:] 822 823 p.source = xnormpath(p.source) 824 p.target = xnormpath(p.target) 825 826 sep = b'/' # sep value can be hardcoded, but it looks nice this way 827 828 # references to parent are not allowed 829 if p.source.startswith(b".." + sep): 830 warning("error: stripping parent path for source file patch no.%d" % (i+1)) 831 self.warnings += 1 832 while p.source.startswith(b".." + sep): 833 p.source = p.source.partition(sep)[2] 834 if p.target.startswith(b".." + sep): 835 warning("error: stripping parent path for target file patch no.%d" % (i+1)) 836 self.warnings += 1 837 while p.target.startswith(b".." + sep): 838 p.target = p.target.partition(sep)[2] 839 # absolute paths are not allowed 840 if (xisabs(p.source) and p.source != b'/dev/null') or \ 841 (xisabs(p.target) and p.target != b'/dev/null'): 842 warning("error: absolute paths are not allowed - file no.%d" % (i+1)) 843 self.warnings += 1 844 if xisabs(p.source) and p.source != b'/dev/null': 845 warning("stripping absolute path from source name '%s'" % p.source) 846 p.source = xstrip(p.source) 847 if xisabs(p.target) and p.target != b'/dev/null': 848 warning("stripping absolute path from target name '%s'" % p.target) 849 p.target = xstrip(p.target) 850 851 self.items[i].source = p.source 852 self.items[i].target = p.target 853 854 855 def diffstat(self): 856 """ calculate diffstat and return as a string 857 Notes: 858 - original diffstat ouputs target filename 859 - single + or - shouldn't escape histogram 860 """ 861 names = [] 862 insert = [] 863 delete = [] 864 delta = 0 # size change in bytes 865 namelen = 0 866 maxdiff = 0 # max number of changes for single file 867 # (for histogram width calculation) 868 for patch in self.items: 869 i,d = 0,0 870 for hunk in patch.hunks: 871 for line in hunk.text: 872 if line.startswith(b'+'): 873 i += 1 874 delta += len(line)-1 875 elif line.startswith(b'-'): 876 d += 1 877 delta -= len(line)-1 878 names.append(patch.target) 879 insert.append(i) 880 delete.append(d) 881 namelen = max(namelen, len(patch.target)) 882 maxdiff = max(maxdiff, i+d) 883 output = '' 884 statlen = len(str(maxdiff)) # stats column width 885 for i,n in enumerate(names): 886 # %-19s | %-4d %s 887 format = " %-" + str(namelen) + "s | %" + str(statlen) + "s %s\n" 888 889 hist = '' 890 # -- calculating histogram -- 891 width = len(format % ('', '', '')) 892 histwidth = max(2, 80 - width) 893 if maxdiff < histwidth: 894 hist = "+"*insert[i] + "-"*delete[i] 895 else: 896 iratio = (float(insert[i]) / maxdiff) * histwidth 897 dratio = (float(delete[i]) / maxdiff) * histwidth 898 899 # make sure every entry gets at least one + or - 900 iwidth = 1 if 0 < iratio < 1 else int(iratio) 901 dwidth = 1 if 0 < dratio < 1 else int(dratio) 902 #print(iratio, dratio, iwidth, dwidth, histwidth) 903 hist = "+"*int(iwidth) + "-"*int(dwidth) 904 # -- /calculating +- histogram -- 905 output += (format % (tostr(names[i]), str(insert[i] + delete[i]), hist)) 906 907 output += (" %d files changed, %d insertions(+), %d deletions(-), %+d bytes" 908 % (len(names), sum(insert), sum(delete), delta)) 909 return output 910 911 912 def findfiles(self, old, new): 913 """ return tuple of source file, target file """ 914 if old == b'/dev/null': 915 handle, abspath = tempfile.mkstemp(suffix='pypatch') 916 abspath = abspath.encode() 917 # The source file must contain a line for the hunk matching to succeed. 918 os.write(handle, b' ') 919 os.close(handle) 920 if not exists(new): 921 handle = open(new, 'wb') 922 handle.close() 923 return abspath, new 924 elif exists(old): 925 return old, old 926 elif exists(new): 927 return new, new 928 elif new == b'/dev/null': 929 return None, None 930 else: 931 # [w] Google Code generates broken patches with its online editor 932 debug("broken patch from Google Code, stripping prefixes..") 933 if old.startswith(b'a/') and new.startswith(b'b/'): 934 old, new = old[2:], new[2:] 935 debug(" %s" % old) 936 debug(" %s" % new) 937 if exists(old): 938 return old, old 939 elif exists(new): 940 return new, new 941 return None, None 942 943 def _strip_prefix(self, filename): 944 if filename.startswith(b'a/') or filename.startswith(b'b/'): 945 return filename[2:] 946 return filename 947 948 def decode_clean(self, path, prefix): 949 path = path.decode("utf-8").replace("\\", "/") 950 if path.startswith(prefix): 951 path = path[2:] 952 return path 953 954 def strip_path(self, path, base_path, strip=0): 955 tokens = path.split("/") 956 if len(tokens) > 1: 957 tokens = tokens[strip:] 958 path = "/".join(tokens) 959 if base_path: 960 path = os.path.join(base_path, path) 961 return path 962 # account for new and deleted files, upstream dep won't fix them 963 964 965 966 967 def apply(self, strip=0, root=None, fuzz=False): 968 """ Apply parsed patch, optionally stripping leading components 969 from file paths. `root` parameter specifies working dir. 970 :param strip: Strip patch path 971 :param root: Folder to apply the patch 972 :param fuzz: Accept fuzzy patches 973 return True on success 974 """ 975 items = [] 976 for item in self.items: 977 source = self.decode_clean(item.source, "a/") 978 target = self.decode_clean(item.target, "b/") 979 if "dev/null" in source: 980 target = self.strip_path(target, root, strip) 981 hunks = [s.decode("utf-8") for s in item.hunks[0].text] 982 new_file = "".join(hunk[1:] for hunk in hunks) 983 save(target, new_file) 984 elif "dev/null" in target: 985 source = self.strip_path(source, root, strip) 986 safe_unlink(source) 987 else: 988 items.append(item) 989 self.items = items 990 991 if root: 992 prevdir = os.getcwd() 993 os.chdir(root) 994 995 total = len(self.items) 996 errors = 0 997 if strip: 998 # [ ] test strip level exceeds nesting level 999 # [ ] test the same only for selected files 1000 # [ ] test if files end up being on the same level 1001 try: 1002 strip = int(strip) 1003 except ValueError: 1004 errors += 1 1005 warning("error: strip parameter '%s' must be an integer" % strip) 1006 strip = 0 1007 1008 #for fileno, filename in enumerate(self.source): 1009 for i,p in enumerate(self.items): 1010 if strip: 1011 debug("stripping %s leading component(s) from:" % strip) 1012 debug(" %s" % p.source) 1013 debug(" %s" % p.target) 1014 old = p.source if p.source == b'/dev/null' else pathstrip(p.source, strip) 1015 new = p.target if p.target == b'/dev/null' else pathstrip(p.target, strip) 1016 else: 1017 old, new = p.source, p.target 1018 1019 filenameo, filenamen = self.findfiles(old, new) 1020 1021 if not filenameo or not filenamen: 1022 error("source/target file does not exist:\n --- %s\n +++ %s" % (old, new)) 1023 errors += 1 1024 continue 1025 if not isfile(filenameo): 1026 error("not a file - %s" % filenameo) 1027 errors += 1 1028 continue 1029 1030 # [ ] check absolute paths security here 1031 debug("processing %d/%d:\t %s" % (i+1, total, filenamen)) 1032 1033 # validate before patching 1034 f2fp = open(filenameo, 'rb') 1035 hunkno = 0 1036 hunk = p.hunks[hunkno] 1037 hunkfind = [] 1038 hunkreplace = [] 1039 validhunks = 0 1040 canpatch = False 1041 for lineno, line in enumerate(f2fp): 1042 if lineno+1 < hunk.startsrc: 1043 continue 1044 elif lineno+1 == hunk.startsrc: 1045 hunkfind = [x[1:].rstrip(b"\r\n") for x in hunk.text if x[0] in b" -"] 1046 hunkreplace = [x[1:].rstrip(b"\r\n") for x in hunk.text if x[0] in b" +"] 1047 #pprint(hunkreplace) 1048 hunklineno = 0 1049 1050 # todo \ No newline at end of file 1051 1052 # check hunks in source file 1053 if lineno+1 < hunk.startsrc+len(hunkfind): 1054 if line.rstrip(b"\r\n") == hunkfind[hunklineno]: 1055 hunklineno += 1 1056 else: 1057 warning("file %d/%d:\t %s" % (i+1, total, filenamen)) 1058 warning(" hunk no.%d doesn't match source file at line %d" % (hunkno+1, lineno+1)) 1059 warning(" expected: %s" % hunkfind[hunklineno]) 1060 warning(" actual : %s" % line.rstrip(b"\r\n")) 1061 if fuzz: 1062 hunklineno += 1 1063 else: 1064 # not counting this as error, because file may already be patched. 1065 # check if file is already patched is done after the number of 1066 # invalid hunks if found 1067 # TODO: check hunks against source/target file in one pass 1068 # API - check(stream, srchunks, tgthunks) 1069 # return tuple (srcerrs, tgterrs) 1070 1071 # continue to check other hunks for completeness 1072 hunkno += 1 1073 if hunkno < len(p.hunks): 1074 hunk = p.hunks[hunkno] 1075 continue 1076 else: 1077 break 1078 1079 # check if processed line is the last line 1080 if len(hunkfind) == 0 or lineno+1 == hunk.startsrc+len(hunkfind)-1: 1081 debug(" hunk no.%d for file %s -- is ready to be patched" % (hunkno+1, filenamen)) 1082 hunkno+=1 1083 validhunks+=1 1084 if hunkno < len(p.hunks): 1085 hunk = p.hunks[hunkno] 1086 else: 1087 if validhunks == len(p.hunks): 1088 # patch file 1089 canpatch = True 1090 break 1091 else: 1092 if hunkno < len(p.hunks): 1093 error("premature end of source file %s at hunk %d" % (filenameo, hunkno+1)) 1094 errors += 1 1095 1096 f2fp.close() 1097 1098 if validhunks < len(p.hunks): 1099 if self._match_file_hunks(filenameo, p.hunks): 1100 warning("already patched %s" % filenameo) 1101 else: 1102 if fuzz: 1103 warning("source file is different - %s" % filenameo) 1104 else: 1105 error("source file is different - %s" % filenameo) 1106 errors += 1 1107 if canpatch: 1108 backupname = filenamen+b".orig" 1109 if exists(backupname): 1110 warning("can't backup original file to %s - aborting" % backupname) 1111 errors += 1 1112 else: 1113 shutil.move(filenamen, backupname) 1114 if self.write_hunks(backupname if filenameo == filenamen else filenameo, filenamen, p.hunks): 1115 info("successfully patched %d/%d:\t %s" % (i+1, total, filenamen)) 1116 safe_unlink(backupname) 1117 if new == b'/dev/null': 1118 # check that filename is of size 0 and delete it. 1119 if os.path.getsize(filenamen) > 0: 1120 warning("expected patched file to be empty as it's marked as deletion:\t %s" % filenamen) 1121 safe_unlink(filenamen) 1122 else: 1123 errors += 1 1124 warning("error patching file %s" % filenamen) 1125 shutil.copy(filenamen, filenamen+".invalid") 1126 warning("invalid version is saved to %s" % filenamen+".invalid") 1127 # todo: proper rejects 1128 shutil.move(backupname, filenamen) 1129 1130 if root: 1131 os.chdir(prevdir) 1132 1133 # todo: check for premature eof 1134 return (errors == 0) 1135 1136 1137 def _reverse(self): 1138 """ reverse patch direction (this doesn't touch filenames) """ 1139 for p in self.items: 1140 for h in p.hunks: 1141 h.startsrc, h.starttgt = h.starttgt, h.startsrc 1142 h.linessrc, h.linestgt = h.linestgt, h.linessrc 1143 for i,line in enumerate(h.text): 1144 # need to use line[0:1] here, because line[0] 1145 # returns int instead of bytes on Python 3 1146 if line[0:1] == b'+': 1147 h.text[i] = b'-' + line[1:] 1148 elif line[0:1] == b'-': 1149 h.text[i] = b'+' +line[1:] 1150 1151 def revert(self, strip=0, root=None): 1152 """ apply patch in reverse order """ 1153 reverted = copy.deepcopy(self) 1154 reverted._reverse() 1155 return reverted.apply(strip, root) 1156 1157 1158 def can_patch(self, filename): 1159 """ Check if specified filename can be patched. Returns None if file can 1160 not be found among source filenames. False if patch can not be applied 1161 clearly. True otherwise. 1162 1163 :returns: True, False or None 1164 """ 1165 filename = abspath(filename) 1166 for p in self.items: 1167 if filename == abspath(p.source): 1168 return self._match_file_hunks(filename, p.hunks) 1169 return None 1170 1171 1172 def _match_file_hunks(self, filepath, hunks): 1173 matched = True 1174 fp = open(abspath(filepath), 'rb') 1175 1176 class NoMatch(Exception): 1177 pass 1178 1179 lineno = 1 1180 line = fp.readline() 1181 try: 1182 for hno, h in enumerate(hunks): 1183 # skip to first line of the hunk 1184 while lineno < h.starttgt: 1185 if not len(line): # eof 1186 debug("check failed - premature eof before hunk: %d" % (hno+1)) 1187 raise NoMatch 1188 line = fp.readline() 1189 lineno += 1 1190 for hline in h.text: 1191 if hline.startswith(b"-"): 1192 continue 1193 if not len(line): 1194 debug("check failed - premature eof on hunk: %d" % (hno+1)) 1195 # todo: \ No newline at the end of file 1196 raise NoMatch 1197 if line.rstrip(b"\r\n") != hline[1:].rstrip(b"\r\n"): 1198 debug("file is not patched - failed hunk: %d" % (hno+1)) 1199 raise NoMatch 1200 line = fp.readline() 1201 lineno += 1 1202 1203 except NoMatch: 1204 matched = False 1205 # todo: display failed hunk, i.e. expected/found 1206 1207 fp.close() 1208 return matched 1209 1210 1211 def patch_stream(self, instream, hunks): 1212 """ Generator that yields stream patched with hunks iterable 1213 1214 Converts lineends in hunk lines to the best suitable format 1215 autodetected from input 1216 """ 1217 1218 # todo: At the moment substituted lineends may not be the same 1219 # at the start and at the end of patching. Also issue a 1220 # warning/throw about mixed lineends (is it really needed?) 1221 1222 hunks = iter(hunks) 1223 1224 srclineno = 1 1225 1226 lineends = {b'\n':0, b'\r\n':0, b'\r':0} 1227 def get_line(): 1228 """ 1229 local utility function - return line from source stream 1230 collecting line end statistics on the way 1231 """ 1232 line = instream.readline() 1233 # 'U' mode works only with text files 1234 if line.endswith(b"\r\n"): 1235 lineends[b"\r\n"] += 1 1236 elif line.endswith(b"\n"): 1237 lineends[b"\n"] += 1 1238 elif line.endswith(b"\r"): 1239 lineends[b"\r"] += 1 1240 return line 1241 1242 for hno, h in enumerate(hunks): 1243 debug("hunk %d" % (hno+1)) 1244 # skip to line just before hunk starts 1245 while srclineno < h.startsrc: 1246 yield get_line() 1247 srclineno += 1 1248 1249 for hline in h.text: 1250 # todo: check \ No newline at the end of file 1251 if hline.startswith(b"-") or hline.startswith(b"\\"): 1252 get_line() 1253 srclineno += 1 1254 continue 1255 else: 1256 if not hline.startswith(b"+"): 1257 yield get_line() 1258 srclineno += 1 1259 continue 1260 line2write = hline[1:] 1261 # detect if line ends are consistent in source file 1262 if sum([bool(lineends[x]) for x in lineends]) == 1: 1263 newline = [x for x in lineends if lineends[x] != 0][0] 1264 yield line2write.rstrip(b"\r\n")+newline 1265 else: # newlines are mixed 1266 yield line2write 1267 1268 for line in instream: 1269 yield line 1270 1271 1272 def write_hunks(self, srcname, tgtname, hunks): 1273 src = open(srcname, "rb") 1274 tgt = open(tgtname, "wb") 1275 1276 debug("processing target file %s" % tgtname) 1277 1278 tgt.writelines(self.patch_stream(src, hunks)) 1279 1280 tgt.close() 1281 src.close() 1282 # [ ] TODO: add test for permission copy 1283 shutil.copymode(srcname, tgtname) 1284 return True 1285 1286 1287 def dump(self): 1288 for p in self.items: 1289 for headline in p.header: 1290 print(headline.rstrip('\n')) 1291 print('--- ' + p.source) 1292 print('+++ ' + p.target) 1293 for h in p.hunks: 1294 print('@@ -%s,%s +%s,%s @@' % (h.startsrc, h.linessrc, h.starttgt, h.linestgt)) 1295 for line in h.text: 1296 print(line.rstrip('\n')) 1297 1298 1299def main(): 1300 from optparse import OptionParser 1301 from os.path import exists 1302 import sys 1303 1304 opt = OptionParser(usage="1. %prog [options] unified.diff\n" 1305 " 2. %prog [options] http://host/patch\n" 1306 " 3. %prog [options] -- < unified.diff", 1307 version="python-patch %s" % __version__) 1308 opt.add_option("-q", "--quiet", action="store_const", dest="verbosity", 1309 const=0, help="print only warnings and errors", default=1) 1310 opt.add_option("-v", "--verbose", action="store_const", dest="verbosity", 1311 const=2, help="be verbose") 1312 opt.add_option("--debug", action="store_true", dest="debugmode", help="debug mode") 1313 opt.add_option("--diffstat", action="store_true", dest="diffstat", 1314 help="print diffstat and exit") 1315 opt.add_option("-d", "--directory", metavar='DIR', 1316 help="specify root directory for applying patch") 1317 opt.add_option("-p", "--strip", type="int", metavar='N', default=0, 1318 help="strip N path components from filenames") 1319 opt.add_option("--revert", action="store_true", 1320 help="apply patch in reverse order (unpatch)") 1321 opt.add_option("-f", "--fuzz", action="store_true", dest="fuzz", help="Accept fuuzzy patches") 1322 (options, args) = opt.parse_args() 1323 1324 if not args and sys.argv[-1:] != ['--']: 1325 opt.print_version() 1326 opt.print_help() 1327 sys.exit() 1328 readstdin = (sys.argv[-1:] == ['--'] and not args) 1329 1330 verbosity_levels = {0:logging.WARNING, 1:logging.INFO, 2:logging.DEBUG} 1331 loglevel = verbosity_levels[options.verbosity] 1332 logformat = "%(message)s" 1333 logger.setLevel(loglevel) 1334 streamhandler.setFormatter(logging.Formatter(logformat)) 1335 1336 if options.debugmode: 1337 setdebug() # this sets global debugmode variable 1338 1339 if readstdin: 1340 patch = PatchSet(sys.stdin) 1341 else: 1342 patchfile = args[0] 1343 urltest = patchfile.split(':')[0] 1344 if (':' in patchfile and urltest.isalpha() 1345 and len(urltest) > 1): # one char before : is a windows drive letter 1346 patch = fromurl(patchfile) 1347 else: 1348 if not exists(patchfile) or not isfile(patchfile): 1349 sys.exit("patch file does not exist - %s" % patchfile) 1350 patch = fromfile(patchfile) 1351 1352 if options.diffstat: 1353 print(patch.diffstat()) 1354 sys.exit(0) 1355 1356 if not patch: 1357 error("Could not parse patch") 1358 sys.exit(-1) 1359 1360 #pprint(patch) 1361 if options.revert: 1362 patch.revert(options.strip, root=options.directory) or sys.exit(-1) 1363 else: 1364 patch.apply(options.strip, root=options.directory, fuzz=options.fuzz) or sys.exit(-1) 1365 1366 # todo: document and test line ends handling logic - patch_ng.py detects proper line-endings 1367 # for inserted hunks and issues a warning if patched file has incosistent line ends 1368 1369 1370if __name__ == "__main__": 1371 main() 1372 1373# Legend: 1374# [ ] - some thing to be done 1375# [w] - official wart, external or internal that is unlikely to be fixed 1376 1377# [ ] API break (2.x) wishlist 1378# PatchSet.items --> PatchSet.patches 1379 1380# [ ] run --revert test for all dataset items 1381# [ ] run .parse() / .dump() test for dataset 1382