1#!/usr/bin/env python
2"""
3    Patch utility to apply unified diffs
4
5    Brute-force line-by-line non-recursive parsing
6
7    Copyright (c) 2008-2016 anatoly techtonik
8    Available under the terms of MIT license
9
10---
11    The MIT License (MIT)
12
13    Copyright (c) 2019 JFrog LTD
14
15    Permission is hereby granted, free of charge, to any person obtaining a copy of this software
16    and associated documentation files (the "Software"), to deal in the Software without
17    restriction, including without limitation the rights to use, copy, modify, merge, publish,
18    distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the
19    Software is furnished to do so, subject to the following conditions:
20
21    The above copyright notice and this permission notice shall be included in all copies or
22    substantial portions of the Software.
23
24    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
25    INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
26    PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
27    ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
28    ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29    SOFTWARE.
30"""
31from __future__ import print_function
32
33__author__ = "Conan.io <info@conan.io>"
34__version__ = "1.17.4"
35__license__ = "MIT"
36__url__ = "https://github.com/conan-io/python-patch"
37
38import copy
39import logging
40import re
41import tempfile
42import codecs
43
44# cStringIO doesn't support unicode in 2.5
45try:
46  from StringIO import StringIO
47except ImportError:
48  from io import BytesIO as StringIO # python 3
49try:
50  import urllib2 as urllib_request
51except ImportError:
52  import urllib.request as urllib_request
53
54from os.path import exists, isfile, abspath
55import os
56import posixpath
57import shutil
58import sys
59import stat
60
61
PY3K = sys.version_info >= (3, 0)  # True when running under Python 3.x

# PEP 3114: generator.next() was renamed to __next__() in Python 3;
# compat_next hides that difference for both interpreter families.
if not PY3K:
  compat_next = lambda gen: gen.next()
else:
  compat_next = lambda gen: gen.__next__()
69
def tostr(b):
  """ Convert bytes to a printable string under Python 3.

      Used to print filenames in diffstat output; filenames
      are assumed to be utf-8 encoded.
  """
  if PY3K:
    # [ ] figure out how to print non-utf-8 filenames without
    #     information loss
    return b.decode('utf-8')
  return b
80
81
82#------------------------------------------------
83# Logging is controlled by logger named after the
84# module name (e.g. 'patch' for patch_ng.py module)
85
86logger = logging.getLogger("patch_ng")
87
88debug = logger.debug
89info = logger.info
90warning = logger.warning
91error = logger.error
92
class NullHandler(logging.Handler):
  """ No-op logging handler.

      Copied from Python 2.7 to avoid getting
      `No handlers could be found for logger "patch"`
      http://bugs.python.org/issue16539
  """
  def handle(self, record):
    # deliberately discard the record
    pass

  def emit(self, record):
    # nothing is ever emitted
    pass

  def createLock(self):
    # no I/O happens here, so no lock is needed
    self.lock = None
104
# stderr handler; only attached to the logger by setdebug()
streamhandler = logging.StreamHandler()

# initialize logger itself
logger.addHandler(NullHandler())

# module-wide flag flipped by setdebug(); consulted during parsing
debugmode = False
111
def setdebug():
  """ Enable DEBUG-level logging to stderr for this module. """
  global debugmode, streamhandler

  debugmode = True
  logger.setLevel(logging.DEBUG)

  # when used as a library, streamhandler is not added by default
  if streamhandler not in logger.handlers:
    logger.addHandler(streamhandler)

  streamhandler.setFormatter(logging.Formatter("%(levelname)8s %(message)s"))
126
127
128#------------------------------------------------
129# Constants for Patch/PatchSet types
130
131DIFF = PLAIN = "plain"
132GIT = "git"
133HG = MERCURIAL = "mercurial"
134SVN = SUBVERSION = "svn"
135# mixed type is only actual when PatchSet contains
136# Patches of different type
137MIXED = MIXED = "mixed"
138
139
140#------------------------------------------------
141# Helpers (these could come with Python stdlib)
142
143# x...() function are used to work with paths in
144# cross-platform manner - all paths use forward
145# slashes even on Windows.
146
def xisabs(filename):
  """ Cross-platform version of `os.path.isabs()`
      Returns True if `filename` is absolute on
      Linux, OS X or Windows.
  """
  return (filename.startswith(b'/')                      # Linux/Unix
          or filename.startswith(b'\\')                  # Windows
          or bool(re.match(b'\\w:[\\\\/]', filename)))   # Windows drive
159
def xnormpath(path):
  """ Cross-platform version of os.path.normpath """
  # first pass collapses the path, then Windows backslashes are
  # converted to forward slashes
  slashed = posixpath.normpath(path).replace(b'\\', b'/')
  # second pass folds the result now that separators are uniform
  return posixpath.normpath(slashed)
166
def xstrip(filename):
  """ Make relative path out of absolute by stripping
      prefixes used on Linux, OS X and Windows.

      This function is critical for security.
  """
  drive = re.compile(b'^\\w+:[\\\\/]+')
  while xisabs(filename):
    # strip a Windows drive prefix (e.g. b'c:\\') with all slashes
    stripped = drive.sub(b'', filename)
    if stripped == filename:
      # no drive prefix - drop leading forward/back slashes instead
      stripped = filename.lstrip(b'\\/')
    filename = stripped
  return filename
181
182
def safe_unlink(filepath):
  """ Remove a file, first adding write permission so that
      read-only files can be deleted too. """
  writable = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
  os.chmod(filepath, writable)
  os.unlink(filepath)
186
187
188#-----------------------------------------------
189# Main API functions
190
def fromfile(filename):
  """ Parse patch file. If successful, returns
      PatchSet() object. Otherwise returns False.

      :param filename: path of the unified diff to read (opened binary)
  """
  patchset = PatchSet()
  debug("reading %s" % filename)
  # context manager guarantees the handle is closed even if
  # parse() raises (the original leaked the handle in that case)
  with open(filename, "rb") as fp:
    res = patchset.parse(fp)
  # parse() returns True/False, so no `== True` comparison needed
  if res:
    return patchset
  return False
203
204
def fromstring(s):
  """ Parse text string and return PatchSet()
      object (or False if parsing fails)
  """
  patchset = PatchSet( StringIO(s) )
  return patchset if patchset.errors == 0 else False
213
214
def fromurl(url):
  """ Parse patch from an URL, return False
      if an error occurred. Note that this also
      can throw urlopen() exceptions.
  """
  patchset = PatchSet( urllib_request.urlopen(url) )
  return patchset if patchset.errors == 0 else False
224
225
226# --- Utility functions ---
227# [ ] reuse more universal pathsplit()
def pathstrip(path, n):
  """ Strip n leading components from the given path """
  components = [path]
  # keep splitting the head until it has no directory part left
  while os.path.dirname(components[0]) != b'':
    components[0:1] = os.path.split(components[0])
  return b'/'.join(components[n:])
234# --- /Utility function ---
235
236
def decode_text(text):
  """ Decode bytes to str.

      A recognized BOM prefix selects the encoding; otherwise utf-8
      is tried, then Windows-1252.  As a last resort utf-8 is used
      with undecodable bytes dropped.
  """
  bom_encodings = {codecs.BOM_UTF8: "utf_8_sig",
                   codecs.BOM_UTF16_BE: "utf_16_be",
                   codecs.BOM_UTF16_LE: "utf_16_le",
                   codecs.BOM_UTF32_BE: "utf_32_be",
                   codecs.BOM_UTF32_LE: "utf_32_le",
                   b'\x2b\x2f\x76\x38': "utf_7",
                   b'\x2b\x2f\x76\x39': "utf_7",
                   b'\x2b\x2f\x76\x2b': "utf_7",
                   b'\x2b\x2f\x76\x2f': "utf_7",
                   b'\x2b\x2f\x76\x38\x2d': "utf_7"}
  # longest BOMs first, so e.g. UTF-32 LE wins over its UTF-16 LE prefix
  for bom in sorted(bom_encodings, key=len, reverse=True):
    if text.startswith(bom):
      try:
        return text[len(bom):].decode(bom_encodings[bom])
      except UnicodeDecodeError:
        continue
  for fallback in ("utf-8", "Windows-1252"):
    try:
      return text.decode(fallback)
    except UnicodeDecodeError:
      continue
  logger.warning("can't decode %s" % str(text))
  return text.decode("utf-8", "ignore")  # Ignore not compatible characters
262
263
def to_file_bytes(content):
  """ Return `content` as bytes, encoding text as utf-8. """
  if PY3K:
    if isinstance(content, bytes):
      return content
    return bytes(content, "utf-8")
  if isinstance(content, unicode):
    return content.encode("utf-8")
  return content
271
272
def load(path, binary=False):
  """ Loads a file content """
  with open(path, 'rb') as handle:
    raw = handle.read()
  if binary:
    return raw
  return decode_text(raw)
278
279
def save(path, content, only_if_modified=False):
  """
  Saves a file with given content
  Params:
      path: path to write file to
      content: contents to save in the file
      only_if_modified: file won't be modified if the content hasn't changed
  """
  try:
    # best effort: the parent directory may already exist
    os.makedirs(os.path.dirname(path))
  except Exception:
    pass

  data = to_file_bytes(content)

  if only_if_modified and os.path.exists(path):
    if load(path, binary=True) == data:
      return

  with open(path, "wb") as handle:
    handle.write(data)
302
303
class Hunk(object):
  """ Parsed hunk data container (hunk starts with @@ -R +R @@) """

  def __init__(self):
    # line numbers are 1-based
    self.startsrc = None  # first affected line in the source file
    self.linessrc = None  # number of source lines in the hunk
    self.starttgt = None  # first affected line in the target file
    self.linestgt = None  # number of target lines in the hunk
    self.invalid = False  # set when the hunk fails to parse
    self.desc = ''        # text following the closing @@
    self.text = []        # raw hunk lines, with -/+/space prefixes
315
316
class Patch(object):
  """ Patch for a single file.
      If used as an iterable, returns hunks.
  """
  def __init__(self):
    self.source = None   # source filename (bytes)
    self.target = None   # target filename (bytes)
    self.hunks = []      # list of Hunk objects
    self.hunkends = []   # line-ending statistics, filled in by the parser
    self.header = []     # header lines preceding this patch
    self.type = None     # one of the patch type constants

  def __iter__(self):
    return iter(self.hunks)
333
334
335class PatchSet(object):
336  """ PatchSet is a patch parser and container.
337      When used as an iterable, returns patches.
338  """
339
340  def __init__(self, stream=None):
341    # --- API accessible fields ---
342
343    # name of the PatchSet (filename or ...)
344    self.name = None
345    # patch set type - one of constants
346    self.type = None
347
348    # list of Patch objects
349    self.items = []
350
351    self.errors = 0    # fatal parsing errors
352    self.warnings = 0  # non-critical warnings
353    # --- /API ---
354
355    if stream:
356      self.parse(stream)
357
  def __len__(self):
    """ Return the number of file patches in the set. """
    return len(self.items)
360
361  def __iter__(self):
362    for i in self.items:
363      yield i
364
  def parse(self, stream):
    """ parse unified diff
        return True on success

        `stream` is an iterable yielding bytes lines.  Parsing is a
        single-pass state machine; states flow roughly
        headscan -> filenames -> hunkhead -> hunkbody ->
        (hunkparsed | hunkskip), and self.errors / self.warnings
        accumulate any problems found along the way.
    """
    lineends = dict(lf=0, crlf=0, cr=0)
    nexthunkno = 0    #: even if index starts with 0 user messages number hunks from 1

    p = None
    hunk = None
    # hunkactual variable is used to calculate hunk lines for comparison
    hunkactual = dict(linessrc=None, linestgt=None)


    class wrapumerate(enumerate):
      """Enumerate wrapper that uses boolean end of stream status instead of
      StopIteration exception, and properties to access line information.
      """

      def __init__(self, *args, **kwargs):
        # we don't call parent, it is magically created by __new__ method

        self._exhausted = False
        self._lineno = False     # after end of stream equal to the num of lines
        self._line = False       # will be reset to False after end of stream

      def next(self):
        """Try to read the next line and return True if it is available,
           False if end of stream is reached."""
        if self._exhausted:
          return False

        try:
          self._lineno, self._line = compat_next(super(wrapumerate, self))
        except StopIteration:
          self._exhausted = True
          self._line = False
          return False
        return True

      @property
      def is_empty(self):
        return self._exhausted

      @property
      def line(self):
        return self._line

      @property
      def lineno(self):
        return self._lineno

    # define states (possible file regions) that direct parse flow
    headscan  = True  # start with scanning header
    filenames = False # lines starting with --- and +++

    hunkhead = False  # @@ -R +R @@ sequence
    hunkbody = False  #
    hunkskip = False  # skipping invalid hunk mode

    hunkparsed = False # state after successfully parsed hunk

    # regexp to match start of hunk, used groups - 1,3,4,6
    re_hunk_start = re.compile(b"^@@ -(\d+)(,(\d+))? \+(\d+)(,(\d+))? @@")

    self.errors = 0
    # temp buffers for header and filenames info
    header = []
    srcname = None
    tgtname = None

    # start of main cycle
    # each parsing block already has line available in fe.line
    fe = wrapumerate(stream)
    while fe.next():

      # -- deciders: these only switch state to decide who should process
      # --           line fetched at the start of this cycle
      if hunkparsed:
        hunkparsed = False
        if re_hunk_start.match(fe.line):
            hunkhead = True
        elif fe.line.startswith(b"--- "):
            filenames = True
        else:
            headscan = True
      # -- ------------------------------------

      # read out header
      if headscan:
        while not fe.is_empty and not fe.line.startswith(b"--- "):
            header.append(fe.line)
            fe.next()
        if fe.is_empty:
            if p is None:
              debug("no patch data found")  # error is shown later
              self.errors += 1
            else:
              info("%d unparsed bytes left at the end of stream" % len(b''.join(header)))
              self.warnings += 1
              # TODO check for \No new line at the end..
              # TODO test for unparsed bytes
              # otherwise error += 1
            # this is actually a loop exit
            continue

        headscan = False
        # switch to filenames state
        filenames = True

      # cache the current line/number for the state handlers below
      line = fe.line
      lineno = fe.lineno


      # hunkskip and hunkbody code skipped until definition of hunkhead is parsed
      if hunkbody:
        # [x] treat empty lines inside hunks as containing single space
        #     (this happens when diff is saved by copy/pasting to editor
        #      that strips trailing whitespace)
        if line.strip(b"\r\n") == b"":
            debug("expanding empty line in a middle of hunk body")
            self.warnings += 1
            line = b' ' + line

        # process line first
        if re.match(b"^[- \\+\\\\]", line):
            # gather stats about line endings
            if line.endswith(b"\r\n"):
              p.hunkends["crlf"] += 1
            elif line.endswith(b"\n"):
              p.hunkends["lf"] += 1
            elif line.endswith(b"\r"):
              p.hunkends["cr"] += 1

            if line.startswith(b"-"):
              hunkactual["linessrc"] += 1
            elif line.startswith(b"+"):
              hunkactual["linestgt"] += 1
            elif not line.startswith(b"\\"):
              # context line counts against both sides
              hunkactual["linessrc"] += 1
              hunkactual["linestgt"] += 1
            hunk.text.append(line)
            # todo: handle \ No newline cases
        else:
            warning("invalid hunk no.%d at %d for target file %s" % (nexthunkno, lineno+1, p.target))
            # add hunk status node
            hunk.invalid = True
            p.hunks.append(hunk)
            self.errors += 1
            # switch to hunkskip state
            hunkbody = False
            hunkskip = True

        # check exit conditions
        if hunkactual["linessrc"] > hunk.linessrc or hunkactual["linestgt"] > hunk.linestgt:
            warning("extra lines for hunk no.%d at %d for target %s" % (nexthunkno, lineno+1, p.target))
            # add hunk status node
            hunk.invalid = True
            p.hunks.append(hunk)
            self.errors += 1
            # switch to hunkskip state
            hunkbody = False
            hunkskip = True
        elif hunk.linessrc == hunkactual["linessrc"] and hunk.linestgt == hunkactual["linestgt"]:
            # hunk parsed successfully
            p.hunks.append(hunk)
            # switch to hunkparsed state
            hunkbody = False
            hunkparsed = True

            # detect mixed window/unix line ends
            ends = p.hunkends
            if ((ends["cr"]!=0) + (ends["crlf"]!=0) + (ends["lf"]!=0)) > 1:
              warning("inconsistent line ends in patch hunks for %s" % p.source)
              self.warnings += 1
            if debugmode:
              debuglines = dict(ends)
              debuglines.update(file=p.target, hunk=nexthunkno)
              debug("crlf: %(crlf)d  lf: %(lf)d  cr: %(cr)d\t - file: %(file)s hunk: %(hunk)d" % debuglines)
            # fetch next line
            continue

      if hunkskip:
        if re_hunk_start.match(line):
          # switch to hunkhead state
          hunkskip = False
          hunkhead = True
        elif line.startswith(b"--- "):
          # switch to filenames state
          hunkskip = False
          filenames = True
          if debugmode and len(self.items) > 0:
            debug("- %2d hunks for %s" % (len(p.hunks), p.source))

      if filenames:
        if line.startswith(b"--- "):
          if srcname != None:
            # XXX testcase
            warning("skipping false patch for %s" % srcname)
            srcname = None
            # XXX header += srcname
            # double source filename line is encountered
            # attempt to restart from this second line

            # Files dated at Unix epoch don't exist, e.g.:
            # '1970-01-01 01:00:00.000000000 +0100'
            # They include timezone offsets.
            # .. which can be parsed (if we remove the nanoseconds)
            # .. by strptime() with:
            # '%Y-%m-%d %H:%M:%S %z'
            # .. but unfortunately this relies on the OSes libc
            # strptime function and %z support is patchy, so we drop
            # everything from the . onwards and group the year and time
            # separately.
          re_filename_date_time = b"^--- ([^\t]+)(?:\s([0-9-]+)\s([0-9:]+)|.*)"
          match = re.match(re_filename_date_time, line)
          # todo: support spaces in filenames
          if match:
            srcname = match.group(1).strip()
            date = match.group(2)
            time = match.group(3)
            # epoch-dated files are treated as nonexistent (/dev/null)
            if (date == b'1970-01-01' or date == b'1969-12-31') and time.split(b':',1)[1] == b'00:00':
              srcname = b'/dev/null'
          else:
            warning("skipping invalid filename at line %d" % (lineno+1))
            self.errors += 1
            # XXX p.header += line
            # switch back to headscan state
            filenames = False
            headscan = True
        elif not line.startswith(b"+++ "):
          if srcname != None:
            warning("skipping invalid patch with no target for %s" % srcname)
            self.errors += 1
            srcname = None
            # XXX header += srcname
            # XXX header += line
          else:
            # this should be unreachable
            warning("skipping invalid target patch")
          filenames = False
          headscan = True
        else:
          if tgtname != None:
            # XXX seems to be a dead branch
            warning("skipping invalid patch - double target at line %d" % (lineno+1))
            self.errors += 1
            srcname = None
            tgtname = None
            # XXX header += srcname
            # XXX header += tgtname
            # XXX header += line
            # double target filename line is encountered
            # switch back to headscan state
            filenames = False
            headscan = True
          else:
            re_filename_date_time = b"^\+\+\+ ([^\t]+)(?:\s([0-9-]+)\s([0-9:]+)|.*)"
            match = re.match(re_filename_date_time, line)
            if not match:
              warning("skipping invalid patch - no target filename at line %d" % (lineno+1))
              self.errors += 1
              srcname = None
              # switch back to headscan state
              filenames = False
              headscan = True
            else:
              tgtname = match.group(1).strip()
              date = match.group(2)
              time = match.group(3)
              if (date == b'1970-01-01' or date == b'1969-12-31') and time.split(b':',1)[1] == b'00:00':
                  tgtname = b'/dev/null'
              if p: # for the first run p is None
                self.items.append(p)
              p = Patch()
              p.source = srcname
              srcname = None
              p.target = tgtname
              tgtname = None
              p.header = header
              header = []
              # switch to hunkhead state
              filenames = False
              hunkhead = True
              nexthunkno = 0
              p.hunkends = lineends.copy()
              continue

      if hunkhead:
        match = re.match(b"^@@ -(\d+)(,(\d+))? \+(\d+)(,(\d+))? @@(.*)", line)
        if not match:
          if not p.hunks:
            warning("skipping invalid patch with no hunks for file %s" % p.source)
            self.errors += 1
            # XXX review switch
            # switch to headscan state
            hunkhead = False
            headscan = True
            continue
          else:
            # TODO review condition case
            # switch to headscan state
            hunkhead = False
            headscan = True
        else:
          hunk = Hunk()
          hunk.startsrc = int(match.group(1))
          hunk.linessrc = 1
          if match.group(3): hunk.linessrc = int(match.group(3))
          hunk.starttgt = int(match.group(4))
          hunk.linestgt = 1
          if match.group(6): hunk.linestgt = int(match.group(6))
          hunk.invalid = False
          hunk.desc = match.group(7)[1:].rstrip()
          hunk.text = []

          hunkactual["linessrc"] = hunkactual["linestgt"] = 0

          # switch to hunkbody state
          hunkhead = False
          hunkbody = True
          nexthunkno += 1
          continue

    # /while fe.next()

    if p:
      self.items.append(p)

    # post-loop diagnostics: report on the state we finished in
    if not hunkparsed:
      if hunkskip:
        warning("warning: finished with errors, some hunks may be invalid")
      elif headscan:
        if len(self.items) == 0:
          warning("error: no patch data found!")
          return False
        else: # extra data at the end of file
          pass
      else:
        warning("error: patch stream is incomplete!")
        self.errors += 1
        if len(self.items) == 0:
          return False

    if debugmode and len(self.items) > 0:
        debug("- %2d hunks for %s" % (len(p.hunks), p.source))

    # XXX fix total hunks calculation
    debug("total files: %d  total hunks: %d" % (len(self.items),
        sum(len(p.hunks) for p in self.items)))

    # ---- detect patch and patchset types ----
    for idx, p in enumerate(self.items):
      self.items[idx].type = self._detect_type(p)

    types = set([p.type for p in self.items])
    if len(types) > 1:
      self.type = MIXED
    else:
      self.type = types.pop()
    # --------

    self._normalize_filenames()

    return (self.errors == 0)
729
  def _detect_type(self, p):
    """ detect and return type for the specified Patch object
        analyzes header and filenames info

        NOTE: must be run before filenames are normalized

        :param p: Patch object to classify
        :return: one of SVN, GIT, HG or PLAIN
    """

    # check for SVN
    #  - header starts with Index:
    #  - next line is ===... delimiter
    #  - filename is followed by revision number
    # TODO add SVN revision
    if (len(p.header) > 1 and p.header[-2].startswith(b"Index: ")
          and p.header[-1].startswith(b"="*67)):
        return SVN

    # common checks for both HG and GIT
    DVCS = ((p.source.startswith(b'a/') or p.source == b'/dev/null')
        and (p.target.startswith(b'b/') or p.target == b'/dev/null'))

    # GIT type check
    #  - header[-2] is like "diff --git a/oldname b/newname"
    #  - header[-1] is like "index <hash>..<hash> <mode>"
    # TODO add git rename diffs and add/remove diffs
    #      add git diff with spaced filename
    # TODO http://www.kernel.org/pub/software/scm/git/docs/git-diff.html

    # Git patch header len is 2 min
    if len(p.header) > 1:
      # detect the start of diff header - there might be some comments before
      for idx in reversed(range(len(p.header))):
        if p.header[idx].startswith(b"diff --git"):
          break
      if p.header[idx].startswith(b'diff --git a/'):
        # NOTE(review): the `..` in this regex is unescaped so it matches
        # any two characters, and the pattern assumes 7-char abbreviated
        # hashes - confirm against real `git diff` output before tightening
        if (idx+1 < len(p.header)
            and re.match(b'(?:index \\w{7}..\\w{7} \\d{6}|new file mode \\d*)', p.header[idx+1])):
          if DVCS:
            return GIT

    # HG check
    #
    #  - for plain HG format header is like "diff -r b2d9961ff1f5 filename"
    #  - for Git-style HG patches it is "diff --git a/oldname b/newname"
    #  - filename starts with a/, b/ or is equal to /dev/null
    #  - exported changesets also contain the header
    #    # HG changeset patch
    #    # User name@example.com
    #    ...
    # TODO add MQ
    # TODO add revision info
    if len(p.header) > 0:
      if DVCS and re.match(b'diff -r \\w{12} .*', p.header[-1]):
        return HG
      if DVCS and p.header[-1].startswith(b'diff --git a/'):
        if len(p.header) == 1:  # native Git patch header len is 2
          return HG
        elif p.header[0].startswith(b'# HG changeset patch'):
          return HG

    return PLAIN
790
791
  def _normalize_filenames(self):
    """ sanitize filenames, normalizing paths, i.e.:
        1. strip a/ and b/ prefixes from GIT and HG style patches
        2. remove all references to parent directories (with warning)
        3. translate any absolute paths to relative (with warning)

        [x] always use forward slashes to be crossplatform
            (diff/patch were born as a unix utility after all)

        NOTE: this step is security-relevant - it prevents patches
        from escaping the target directory via ../ or absolute paths.

        return None
    """
    if debugmode:
      debug("normalize filenames")
    for i,p in enumerate(self.items):
      if debugmode:
        debug("    patch type = %s" % p.type)
        debug("    source = %s" % p.source)
        debug("    target = %s" % p.target)
      if p.type in (HG, GIT):
        debug("stripping a/ and b/ prefixes")
        if p.source != b'/dev/null':
          if not p.source.startswith(b"a/"):
            warning("invalid source filename")
          else:
            p.source = p.source[2:]
        if p.target != b'/dev/null':
          if not p.target.startswith(b"b/"):
            warning("invalid target filename")
          else:
            p.target = p.target[2:]

      p.source = xnormpath(p.source)
      p.target = xnormpath(p.target)

      sep = b'/'  # sep value can be hardcoded, but it looks nice this way

      # references to parent are not allowed
      if p.source.startswith(b".." + sep):
        warning("error: stripping parent path for source file patch no.%d" % (i+1))
        self.warnings += 1
        while p.source.startswith(b".." + sep):
          p.source = p.source.partition(sep)[2]
      if p.target.startswith(b".." + sep):
        warning("error: stripping parent path for target file patch no.%d" % (i+1))
        self.warnings += 1
        while p.target.startswith(b".." + sep):
          p.target = p.target.partition(sep)[2]
      # absolute paths are not allowed
      if (xisabs(p.source) and p.source != b'/dev/null') or \
         (xisabs(p.target) and p.target != b'/dev/null'):
        warning("error: absolute paths are not allowed - file no.%d" % (i+1))
        self.warnings += 1
        if xisabs(p.source) and p.source != b'/dev/null':
          warning("stripping absolute path from source name '%s'" % p.source)
          p.source = xstrip(p.source)
        if xisabs(p.target) and p.target != b'/dev/null':
          warning("stripping absolute path from target name '%s'" % p.target)
          p.target = xstrip(p.target)

      self.items[i].source = p.source
      self.items[i].target = p.target
853
854
855  def diffstat(self):
856    """ calculate diffstat and return as a string
857        Notes:
858          - original diffstat ouputs target filename
859          - single + or - shouldn't escape histogram
860    """
861    names = []
862    insert = []
863    delete = []
864    delta = 0    # size change in bytes
865    namelen = 0
866    maxdiff = 0  # max number of changes for single file
867                 # (for histogram width calculation)
868    for patch in self.items:
869      i,d = 0,0
870      for hunk in patch.hunks:
871        for line in hunk.text:
872          if line.startswith(b'+'):
873            i += 1
874            delta += len(line)-1
875          elif line.startswith(b'-'):
876            d += 1
877            delta -= len(line)-1
878      names.append(patch.target)
879      insert.append(i)
880      delete.append(d)
881      namelen = max(namelen, len(patch.target))
882      maxdiff = max(maxdiff, i+d)
883    output = ''
884    statlen = len(str(maxdiff))  # stats column width
885    for i,n in enumerate(names):
886      # %-19s | %-4d %s
887      format = " %-" + str(namelen) + "s | %" + str(statlen) + "s %s\n"
888
889      hist = ''
890      # -- calculating histogram --
891      width = len(format % ('', '', ''))
892      histwidth = max(2, 80 - width)
893      if maxdiff < histwidth:
894        hist = "+"*insert[i] + "-"*delete[i]
895      else:
896        iratio = (float(insert[i]) / maxdiff) * histwidth
897        dratio = (float(delete[i]) / maxdiff) * histwidth
898
899        # make sure every entry gets at least one + or -
900        iwidth = 1 if 0 < iratio < 1 else int(iratio)
901        dwidth = 1 if 0 < dratio < 1 else int(dratio)
902        #print(iratio, dratio, iwidth, dwidth, histwidth)
903        hist = "+"*int(iwidth) + "-"*int(dwidth)
904      # -- /calculating +- histogram --
905      output += (format % (tostr(names[i]), str(insert[i] + delete[i]), hist))
906
907    output += (" %d files changed, %d insertions(+), %d deletions(-), %+d bytes"
908               % (len(names), sum(insert), sum(delete), delta))
909    return output
910
911
912  def findfiles(self, old, new):
913    """ return tuple of source file, target file """
914    if old == b'/dev/null':
915      handle, abspath = tempfile.mkstemp(suffix='pypatch')
916      abspath = abspath.encode()
917      # The source file must contain a line for the hunk matching to succeed.
918      os.write(handle, b' ')
919      os.close(handle)
920      if not exists(new):
921        handle = open(new, 'wb')
922        handle.close()
923      return abspath, new
924    elif exists(old):
925      return old, old
926    elif exists(new):
927      return new, new
928    elif new == b'/dev/null':
929      return None, None
930    else:
931      # [w] Google Code generates broken patches with its online editor
932      debug("broken patch from Google Code, stripping prefixes..")
933      if old.startswith(b'a/') and new.startswith(b'b/'):
934        old, new = old[2:], new[2:]
935        debug("   %s" % old)
936        debug("   %s" % new)
937        if exists(old):
938          return old, old
939        elif exists(new):
940          return new, new
941      return None, None
942
943  def _strip_prefix(self, filename):
944    if filename.startswith(b'a/') or filename.startswith(b'b/'):
945        return filename[2:]
946    return filename
947
948  def decode_clean(self, path, prefix):
949    path = path.decode("utf-8").replace("\\", "/")
950    if path.startswith(prefix):
951      path = path[2:]
952    return path
953
954  def strip_path(self, path, base_path, strip=0):
955    tokens = path.split("/")
956    if len(tokens) > 1:
957      tokens = tokens[strip:]
958    path = "/".join(tokens)
959    if base_path:
960      path = os.path.join(base_path, path)
961    return path
962    # account for new and deleted files, upstream dep won't fix them
963
964
965
966
  def apply(self, strip=0, root=None, fuzz=False):
    """ Apply parsed patch, optionally stripping leading components
        from file paths. `root` parameter specifies working dir.
        :param strip: number of leading path components to strip
        :param root: folder to apply the patch in (chdir'd into for the
                     duration of the call)
        :param fuzz: accept hunks whose context does not match exactly
        :return: True on success, False if any file failed to patch
    """
    # Pre-pass: handle file creations (source is /dev/null) and deletions
    # (target is /dev/null) immediately; keep only in-place edits in items.
    items = []
    for item in self.items:
      source = self.decode_clean(item.source, "a/")
      target = self.decode_clean(item.target, "b/")
      if "dev/null" in source:
        # new file: its single hunk holds only '+' lines - strip the
        # one-char markers and write the content out
        target = self.strip_path(target, root, strip)
        hunks = [s.decode("utf-8") for s in item.hunks[0].text]
        new_file = "".join(hunk[1:] for hunk in hunks)
        save(target, new_file)
      elif "dev/null" in target:
        # deleted file: just remove the source
        source = self.strip_path(source, root, strip)
        safe_unlink(source)
      else:
        items.append(item)
    self.items = items

    if root:
      prevdir = os.getcwd()
      os.chdir(root)

    total = len(self.items)
    errors = 0
    if strip:
      # [ ] test strip level exceeds nesting level
      #   [ ] test the same only for selected files
      #     [ ] test if files end up being on the same level
      try:
        strip = int(strip)
      except ValueError:
        errors += 1
        warning("error: strip parameter '%s' must be an integer" % strip)
        strip = 0

    #for fileno, filename in enumerate(self.source):
    for i,p in enumerate(self.items):
      if strip:
        debug("stripping %s leading component(s) from:" % strip)
        debug("   %s" % p.source)
        debug("   %s" % p.target)
        # /dev/null markers must survive stripping untouched
        old = p.source if p.source == b'/dev/null' else pathstrip(p.source, strip)
        new = p.target if p.target == b'/dev/null' else pathstrip(p.target, strip)
      else:
        old, new = p.source, p.target

      filenameo, filenamen = self.findfiles(old, new)

      if not filenameo or not filenamen:
        error("source/target file does not exist:\n  --- %s\n  +++ %s" % (old, new))
        errors += 1
        continue
      if not isfile(filenameo):
        error("not a file - %s" % filenameo)
        errors += 1
        continue

      # [ ] check absolute paths security here
      debug("processing %d/%d:\t %s" % (i+1, total, filenamen))

      # validate every hunk against the source file before touching it
      f2fp = open(filenameo, 'rb')
      hunkno = 0
      hunk = p.hunks[hunkno]
      hunkfind = []
      hunkreplace = []
      validhunks = 0
      canpatch = False
      for lineno, line in enumerate(f2fp):
        if lineno+1 < hunk.startsrc:
          continue
        elif lineno+1 == hunk.startsrc:
          # entering the hunk: expected source lines are ' ' and '-',
          # replacement lines are ' ' and '+' (markers stripped)
          hunkfind = [x[1:].rstrip(b"\r\n") for x in hunk.text if x[0] in b" -"]
          hunkreplace = [x[1:].rstrip(b"\r\n") for x in hunk.text if x[0] in b" +"]
          #pprint(hunkreplace)
          hunklineno = 0

          # todo \ No newline at end of file

        # check hunks in source file
        if lineno+1 < hunk.startsrc+len(hunkfind):
          if line.rstrip(b"\r\n") == hunkfind[hunklineno]:
            hunklineno += 1
          else:
            warning("file %d/%d:\t %s" % (i+1, total, filenamen))
            warning(" hunk no.%d doesn't match source file at line %d" % (hunkno+1, lineno+1))
            warning("  expected: %s" % hunkfind[hunklineno])
            warning("  actual  : %s" % line.rstrip(b"\r\n"))
            if fuzz:
              # fuzzy mode: tolerate the mismatched context line
              hunklineno += 1
            else:
              # not counting this as error, because file may already be patched.
              # check if file is already patched is done after the number of
              # invalid hunks if found
              # TODO: check hunks against source/target file in one pass
              #   API - check(stream, srchunks, tgthunks)
              #           return tuple (srcerrs, tgterrs)

              # continue to check other hunks for completeness
              hunkno += 1
              if hunkno < len(p.hunks):
                hunk = p.hunks[hunkno]
                continue
              else:
                break

        # check if processed line is the last line
        if len(hunkfind) == 0 or lineno+1 == hunk.startsrc+len(hunkfind)-1:
          debug(" hunk no.%d for file %s  -- is ready to be patched" % (hunkno+1, filenamen))
          hunkno+=1
          validhunks+=1
          if hunkno < len(p.hunks):
            hunk = p.hunks[hunkno]
          else:
            if validhunks == len(p.hunks):
              # patch file
              canpatch = True
              break
      else:
        # for/else: source file ended before all hunks were located
        if hunkno < len(p.hunks):
          error("premature end of source file %s at hunk %d" % (filenameo, hunkno+1))
          errors += 1

      f2fp.close()

      if validhunks < len(p.hunks):
        # some hunks failed - check whether the file is already patched
        if self._match_file_hunks(filenameo, p.hunks):
          warning("already patched  %s" % filenameo)
        else:
          if fuzz:
            warning("source file is different - %s" % filenameo)
          else:
            error("source file is different - %s" % filenameo)
            errors += 1
      if canpatch:
        # write via a .orig backup so a failed patch can be rolled back
        backupname = filenamen+b".orig"
        if exists(backupname):
          warning("can't backup original file to %s - aborting" % backupname)
          errors += 1
        else:
          shutil.move(filenamen, backupname)
          if self.write_hunks(backupname if filenameo == filenamen else filenameo, filenamen, p.hunks):
            info("successfully patched %d/%d:\t %s" % (i+1, total, filenamen))
            safe_unlink(backupname)
            if new == b'/dev/null':
              # check that filename is of size 0 and delete it.
              if os.path.getsize(filenamen) > 0:
                warning("expected patched file to be empty as it's marked as deletion:\t %s" % filenamen)
              safe_unlink(filenamen)
          else:
            errors += 1
            warning("error patching file %s" % filenamen)
            shutil.copy(filenamen, filenamen+".invalid")
            warning("invalid version is saved to %s" % filenamen+".invalid")
            # todo: proper rejects
            shutil.move(backupname, filenamen)

    if root:
      os.chdir(prevdir)

    # todo: check for premature eof
    return (errors == 0)
1135
1136
1137  def _reverse(self):
1138    """ reverse patch direction (this doesn't touch filenames) """
1139    for p in self.items:
1140      for h in p.hunks:
1141        h.startsrc, h.starttgt = h.starttgt, h.startsrc
1142        h.linessrc, h.linestgt = h.linestgt, h.linessrc
1143        for i,line in enumerate(h.text):
1144          # need to use line[0:1] here, because line[0]
1145          # returns int instead of bytes on Python 3
1146          if line[0:1] == b'+':
1147            h.text[i] = b'-' + line[1:]
1148          elif line[0:1] == b'-':
1149            h.text[i] = b'+' +line[1:]
1150
1151  def revert(self, strip=0, root=None):
1152    """ apply patch in reverse order """
1153    reverted = copy.deepcopy(self)
1154    reverted._reverse()
1155    return reverted.apply(strip, root)
1156
1157
1158  def can_patch(self, filename):
1159    """ Check if specified filename can be patched. Returns None if file can
1160    not be found among source filenames. False if patch can not be applied
1161    clearly. True otherwise.
1162
1163    :returns: True, False or None
1164    """
1165    filename = abspath(filename)
1166    for p in self.items:
1167      if filename == abspath(p.source):
1168        return self._match_file_hunks(filename, p.hunks)
1169    return None
1170
1171
1172  def _match_file_hunks(self, filepath, hunks):
1173    matched = True
1174    fp = open(abspath(filepath), 'rb')
1175
1176    class NoMatch(Exception):
1177      pass
1178
1179    lineno = 1
1180    line = fp.readline()
1181    try:
1182      for hno, h in enumerate(hunks):
1183        # skip to first line of the hunk
1184        while lineno < h.starttgt:
1185          if not len(line): # eof
1186            debug("check failed - premature eof before hunk: %d" % (hno+1))
1187            raise NoMatch
1188          line = fp.readline()
1189          lineno += 1
1190        for hline in h.text:
1191          if hline.startswith(b"-"):
1192            continue
1193          if not len(line):
1194            debug("check failed - premature eof on hunk: %d" % (hno+1))
1195            # todo: \ No newline at the end of file
1196            raise NoMatch
1197          if line.rstrip(b"\r\n") != hline[1:].rstrip(b"\r\n"):
1198            debug("file is not patched - failed hunk: %d" % (hno+1))
1199            raise NoMatch
1200          line = fp.readline()
1201          lineno += 1
1202
1203    except NoMatch:
1204      matched = False
1205      # todo: display failed hunk, i.e. expected/found
1206
1207    fp.close()
1208    return matched
1209
1210
  def patch_stream(self, instream, hunks):
    """ Generator that yields the content of *instream* (a binary
        file-like object) with the *hunks* iterable applied.

        Line ends of inserted lines are converted to the format
        autodetected from the input; when the input mixes several
        styles, inserted lines are passed through unchanged.
    """

    # todo: At the moment substituted lineends may not be the same
    #       at the start and at the end of patching. Also issue a
    #       warning/throw about mixed lineends (is it really needed?)

    hunks = iter(hunks)

    srclineno = 1

    # running tally of line-ending styles seen in the source stream
    lineends = {b'\n':0, b'\r\n':0, b'\r':0}
    def get_line():
      """
      local utility function - return line from source stream
      collecting line end statistics on the way
      """
      line = instream.readline()
      # 'U' (universal newline) mode works only with text files, so the
      # endings are tallied manually on this binary stream
      if line.endswith(b"\r\n"):
        lineends[b"\r\n"] += 1
      elif line.endswith(b"\n"):
        lineends[b"\n"] += 1
      elif line.endswith(b"\r"):
        lineends[b"\r"] += 1
      return line

    for hno, h in enumerate(hunks):
      debug("hunk %d" % (hno+1))
      # skip to line just before hunk starts
      while srclineno < h.startsrc:
        yield get_line()
        srclineno += 1

      for hline in h.text:
        # todo: check \ No newline at the end of file
        if hline.startswith(b"-") or hline.startswith(b"\\"):
          # removed line (or "\ No newline" marker): consume the
          # corresponding source line without emitting it
          get_line()
          srclineno += 1
          continue
        else:
          if not hline.startswith(b"+"):
            # context line: pass the source line through unchanged
            yield get_line()
            srclineno += 1
            continue
          # added line: emit it with a normalized line ending
          line2write = hline[1:]
          # detect if line ends are consistent in source file
          if sum([bool(lineends[x]) for x in lineends]) == 1:
            newline = [x for x in lineends if lineends[x] != 0][0]
            yield line2write.rstrip(b"\r\n")+newline
          else: # newlines are mixed
            yield line2write

    # all hunks consumed: emit the rest of the source verbatim
    for line in instream:
      yield line
1270
1271
1272  def write_hunks(self, srcname, tgtname, hunks):
1273    src = open(srcname, "rb")
1274    tgt = open(tgtname, "wb")
1275
1276    debug("processing target file %s" % tgtname)
1277
1278    tgt.writelines(self.patch_stream(src, hunks))
1279
1280    tgt.close()
1281    src.close()
1282    # [ ] TODO: add test for permission copy
1283    shutil.copymode(srcname, tgtname)
1284    return True
1285
1286
1287  def dump(self):
1288    for p in self.items:
1289      for headline in p.header:
1290        print(headline.rstrip('\n'))
1291      print('--- ' + p.source)
1292      print('+++ ' + p.target)
1293      for h in p.hunks:
1294        print('@@ -%s,%s +%s,%s @@' % (h.startsrc, h.linessrc, h.starttgt, h.linestgt))
1295        for line in h.text:
1296          print(line.rstrip('\n'))
1297
1298
def main():
  """ Command-line entry point: parse options, load the patch from a
      file, a URL or stdin, then apply or revert it. Exits non-zero on
      failure.

      Fix: user-facing typo in the --fuzz help text ("fuuzzy").
  """
  from optparse import OptionParser
  from os.path import exists
  import sys

  opt = OptionParser(usage="1. %prog [options] unified.diff\n"
                    "       2. %prog [options] http://host/patch\n"
                    "       3. %prog [options] -- < unified.diff",
                     version="python-patch %s" % __version__)
  opt.add_option("-q", "--quiet", action="store_const", dest="verbosity",
                                  const=0, help="print only warnings and errors", default=1)
  opt.add_option("-v", "--verbose", action="store_const", dest="verbosity",
                                  const=2, help="be verbose")
  opt.add_option("--debug", action="store_true", dest="debugmode", help="debug mode")
  opt.add_option("--diffstat", action="store_true", dest="diffstat",
                                           help="print diffstat and exit")
  opt.add_option("-d", "--directory", metavar='DIR',
                                           help="specify root directory for applying patch")
  opt.add_option("-p", "--strip", type="int", metavar='N', default=0,
                                           help="strip N path components from filenames")
  opt.add_option("--revert", action="store_true",
                                           help="apply patch in reverse order (unpatch)")
  opt.add_option("-f", "--fuzz", action="store_true", dest="fuzz", help="Accept fuzzy patches")
  (options, args) = opt.parse_args()

  if not args and sys.argv[-1:] != ['--']:
    opt.print_version()
    opt.print_help()
    sys.exit()
  # trailing '--' with no file argument means "read the patch from stdin"
  readstdin = (sys.argv[-1:] == ['--'] and not args)

  verbosity_levels = {0:logging.WARNING, 1:logging.INFO, 2:logging.DEBUG}
  loglevel = verbosity_levels[options.verbosity]
  logformat = "%(message)s"
  logger.setLevel(loglevel)
  streamhandler.setFormatter(logging.Formatter(logformat))

  if options.debugmode:
    setdebug()  # this sets global debugmode variable

  if readstdin:
    patch = PatchSet(sys.stdin)
  else:
    patchfile = args[0]
    urltest = patchfile.split(':')[0]
    if (':' in patchfile and urltest.isalpha()
        and len(urltest) > 1): # one char before : is a windows drive letter
      patch = fromurl(patchfile)
    else:
      if not exists(patchfile) or not isfile(patchfile):
        sys.exit("patch file does not exist - %s" % patchfile)
      patch = fromfile(patchfile)

  if options.diffstat:
    print(patch.diffstat())
    sys.exit(0)

  if not patch:
    error("Could not parse patch")
    sys.exit(-1)

  #pprint(patch)
  if options.revert:
    patch.revert(options.strip, root=options.directory) or sys.exit(-1)
  else:
    patch.apply(options.strip, root=options.directory, fuzz=options.fuzz) or sys.exit(-1)

  # todo: document and test line ends handling logic - patch_ng.py detects proper line-endings
  #       for inserted hunks and issues a warning if patched file has inconsistent line ends
1368
1369
# Script entry point: delegate to the command-line driver.
if __name__ == "__main__":
  main()
1372
1373# Legend:
1374# [ ]  - some thing to be done
1375# [w]  - official wart, external or internal that is unlikely to be fixed
1376
1377# [ ] API break (2.x) wishlist
1378# PatchSet.items  -->  PatchSet.patches
1379
1380# [ ] run --revert test for all dataset items
1381# [ ] run .parse() / .dump() test for dataset
1382