# Copyright (C) 2005 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

# \subsection{\emph{rio} - simple text metaformat}
#
# \emph{r} stands for `restricted', `reproducible', or `rfc822-like'.
#
# The stored data consists of a series of \emph{stanzas}, each of which contains
# \emph{fields} identified by an ascii name, with Unicode or string contents.
# The field tag is constrained to alphanumeric characters.
# There may be more than one field in a stanza with the same name.
#
# The format itself does not deal with character encoding issues, though
# the result will normally be written in Unicode.
#
# The format is intended to be simple enough that there is exactly one character
# stream representation of an object and vice versa, and that this relation
# will continue to hold for future versions of bzr.

import re

from . import osutils
from .iterablefile import IterableFile

# XXX: there is some redundancy in allowing stanzas to be written in
# isolation as well as through a writer object.

# Indent that to_patch_lines() puts in front of every continuation line.
# _patch_stanza_iter() strips exactly two characters from continuation
# lines, so the two must stay in sync.
_PATCH_CONTINUATION_INDENT = b'  '

# A backslash followed by any single character (including a newline), as
# found in RIO-Patch escaped text.
_PATCH_ESCAPE_RE = re.compile(b'\\\\(.|\n)')

# Escape sequences understood by the RIO-Patch reader and what they decode to.
_PATCH_UNESCAPES = {
    b'\\\\': b'\\',
    b'\\r': b'\r',
    b'\\\n': b'',
}


class RioWriter(object):
    """Write stanzas to a binary file, separated by blank lines."""

    def __init__(self, to_file):
        # No separator is needed before the very first stanza.
        self._soft_nl = False
        self._to_file = to_file

    def write_stanza(self, stanza):
        """Write one stanza, preceded by a blank line if it is not the first."""
        if self._soft_nl:
            self._to_file.write(b'\n')
        stanza.write(self._to_file)
        self._soft_nl = True


class RioReader(object):
    """Read stanzas from a file as a sequence.

    from_file can be anything that can be enumerated as a sequence of
    lines (with newlines.)
    """

    def __init__(self, from_file):
        self._from_file = from_file

    def __iter__(self):
        """Iterate over the stanzas until read_stanza reports end of input."""
        return read_stanzas(self._from_file)


def rio_file(stanzas, header=None):
    """Produce a rio IterableFile from an iterable of stanzas.

    :param stanzas: An iterable of Stanza objects.
    :param header: Optional bytes written (followed by a newline) before
        the first stanza.
    :return: An IterableFile yielding the serialized lines; stanzas are
        separated by a single blank line.
    """
    def str_iter():
        if header is not None:
            yield header + b'\n'
        need_separator = False
        for stanza in stanzas:
            if need_separator:
                yield b'\n'
            yield from stanza.to_lines()
            need_separator = True
    return IterableFile(str_iter())


def read_stanzas(from_file):
    """Yield stanzas read with read_stanza until it returns None."""
    while True:
        stanza = read_stanza(from_file)
        if stanza is None:
            break
        yield stanza


def read_stanzas_unicode(from_file):
    """Yield stanzas read with read_stanza_unicode until it returns None."""
    while True:
        stanza = read_stanza_unicode(from_file)
        if stanza is None:
            break
        yield stanza


class Stanza(object):
    """One stanza for rio.

    Each stanza contains a set of named fields.

    Names must be non-empty ascii alphanumeric plus _.  Names can be repeated
    within a stanza.  Names are case-sensitive.  The ordering of fields is
    preserved.

    Each field value must be either a str, or bytes that decode as ascii;
    values are stored as str.
    """

    __slots__ = ['items']

    def __init__(self, **kwargs):
        """Construct a new Stanza.

        The keyword arguments, if any, are added in sorted order to the stanza.
        """
        self.items = []
        for tag, value in sorted(kwargs.items()):
            self.add(tag, value)

    def add(self, tag, value):
        """Append a name and value to the stanza.

        :raises ValueError: if the tag is rejected by valid_tag().
        :raises TypeError: if the value is neither bytes nor str.
        """
        if not valid_tag(tag):
            raise ValueError("invalid tag %r" % (tag,))
        if isinstance(value, bytes):
            value = value.decode('ascii')
        elif not isinstance(value, str):
            raise TypeError("invalid type for rio value: %r of type %s"
                            % (value, type(value)))
        self.items.append((tag, value))

    @classmethod
    def from_pairs(cls, pairs):
        """Build a stanza directly from a list of (tag, value) pairs.

        The list is stored as-is: it is neither copied nor validated, so
        the caller must not keep mutating it afterwards.
        """
        ret = cls()
        ret.items = pairs
        return ret

    def __contains__(self, find_tag):
        """True if there is any field in this stanza with the given tag."""
        return any(tag == find_tag for tag, _ in self.items)

    def __len__(self):
        """Return number of pairs in the stanza."""
        return len(self.items)

    def __eq__(self, other):
        if not isinstance(other, Stanza):
            return False
        return self.items == other.items

    def __ne__(self, other):
        return not self.__eq__(other)

    def __repr__(self):
        return "Stanza(%r)" % self.items

    def iter_pairs(self):
        """Return iterator of tag, value pairs."""
        return iter(self.items)

    def to_lines(self):
        """Generate sequence of lines for external version of this file.

        The lines are always utf-8 encoded strings.  A value containing
        newlines is continued on following lines, each starting with a tab.
        """
        result = []
        for text_tag, text_value in self.items:
            tag = text_tag.encode('ascii')
            value = text_value.encode('utf-8', 'surrogateescape')
            # split() rather than splitlines(): empty lines inside the value
            # (and an empty value) must each still produce one output line.
            first, *continuation = value.split(b'\n')
            result.append(tag + b': ' + first + b'\n')
            result.extend(b'\t' + line + b'\n' for line in continuation)
        return result

    def to_string(self):
        """Return stanza as a single (byte) string."""
        return b''.join(self.to_lines())

    def to_unicode(self):
        """Return stanza as a single Unicode string.

        This is most useful when adding a Stanza to a parent Stanza
        """
        result = []
        for tag, value in self.items:
            # split() rather than splitlines(): empty lines inside the value
            # (and an empty value) must each still produce one output line.
            first, *continuation = value.split('\n')
            result.append(tag + ': ' + first + '\n')
            result.extend('\t' + line + '\n' for line in continuation)
        return ''.join(result)

    def write(self, to_file):
        """Write stanza to a file"""
        to_file.writelines(self.to_lines())

    def get(self, tag):
        """Return the value for a field with given tag.

        If there is more than one value, only the first is returned.  If the
        tag is not present, KeyError is raised.
        """
        for t, v in self.items:
            if t == tag:
                return v
        raise KeyError(tag)

    __getitem__ = get

    def get_all(self, tag):
        """Return a list of every value stored under the given tag."""
        return [v for t, v in self.items if t == tag]

    def as_dict(self):
        """Return a dict mapping each tag to its value.

        If a tag occurs more than once, the last value wins.
        """
        return dict(self.items)


def valid_tag(tag):
    """Return whether tag is acceptable as a field name."""
    return _valid_tag(tag)


def read_stanza(line_iter):
    """Return new Stanza read from list of lines or a file

    Returns one Stanza that was read, or returns None at end of file.  If a
    blank line follows the stanza, it is consumed.  It's not an error for
    there to be no blank at end of file.  If there is a blank line at the
    start of the input this is really an empty stanza and that is returned.

    Only the stanza lines and the trailing blank (if any) are consumed
    from the line_iter.

    The raw lines must be in utf-8 encoding.
    """
    return _read_stanza_utf8(line_iter)


def read_stanza_unicode(unicode_iter):
    """Read a Stanza from a list of lines or a file.

    The lines should already be in unicode form. This returns a single
    stanza that was read. If there is a blank line at the end of the Stanza,
    it is consumed. It is not an error for there to be no blank line at
    the end of the iterable. If there is a blank line at the beginning,
    this is treated as an empty Stanza and None is returned.

    Only the stanza lines and the trailing blank (if any) are consumed
    from the unicode_iter

    :param unicode_iter: An iterable, yielding Unicode strings. See
        read_stanza if you have a utf-8 encoded string.
    :return: A Stanza object if there are any lines in the file.
        None otherwise
    """
    return _read_stanza_unicode(unicode_iter)


def to_patch_lines(stanza, max_width=72):
    """Convert a stanza into RIO-Patch format lines.

    RIO-Patch is a RIO variant designed to be e-mailed as part of a patch.
    It resists common forms of damage such as newline conversion or the removal
    of trailing whitespace, yet is also reasonably easy to read.

    :param max_width: The maximum number of characters per physical line.
    :return: a list of lines
    :raises ValueError: if max_width is 6 or less.
    """
    if max_width <= 6:
        raise ValueError(max_width)
    # Room is reserved for the decoration added around each chunk.
    max_rio_width = max_width - 4
    lines = []
    for pline in stanza.to_lines():
        for line in pline.split(b'\n')[:-1]:
            # Escape backslashes first so that the escapes added below stay
            # unambiguous.
            line = line.replace(b'\\', b'\\\\')
            while line:
                partline = line[:max_rio_width]
                line = line[max_rio_width:]
                # NOTE: this branch historically also carried the test
                # ``line[:1] != [b' ']``, which compares bytes with a list
                # and so was always true.  The effective behaviour (always
                # look for a break point) is kept so output is unchanged.
                if line:
                    # Prefer to break at a space, then after a hyphen, then
                    # before a slash, searching only the last 20 bytes.
                    break_index = partline.rfind(b' ', -20)
                    if break_index < 3:
                        break_index = partline.rfind(b'-', -20)
                        break_index += 1
                    if break_index < 3:
                        break_index = partline.rfind(b'/', -20)
                    if break_index >= 3:
                        line = partline[break_index:] + line
                        partline = partline[:break_index]
                if line:
                    line = _PATCH_CONTINUATION_INDENT + line
                partline = partline.replace(b'\r', b'\\r')
                blank_line = False
                if line:
                    # More to come: escape the newline.
                    partline += b'\\'
                elif partline.endswith(b' '):
                    # Protect trailing whitespace by continuing onto an
                    # otherwise empty line.
                    partline += b'\\'
                    blank_line = True
                lines.append(b'# ' + partline + b'\n')
                if blank_line:
                    lines.append(b'# ' + _PATCH_CONTINUATION_INDENT + b'\n')
    return lines


def _patch_stanza_iter(line_iter):
    """Yield plain RIO lines decoded from RIO-Patch formatted lines.

    :raises ValueError: if a line does not start with '#'.
    :raises KeyError: if a backslash escape other than those in
        _PATCH_UNESCAPES is encountered.
    """
    def unescape(match):
        return _PATCH_UNESCAPES[match.group(0)]

    last_line = None
    for line in line_iter:
        if line.startswith(b'# '):
            line = line[2:]
        elif line.startswith(b'#'):
            line = line[1:]
        else:
            raise ValueError("bad line %r" % (line,))
        # A pending logical line means this is a continuation: drop the
        # two-character continuation indent.
        if last_line is not None and len(line) > 2:
            line = line[2:]
        # Raw carriage returns can only come from newline conversion;
        # genuine ones are written as the two-character escape.
        line = line.replace(b'\r', b'')
        line = _PATCH_ESCAPE_RE.sub(unescape, line)
        if last_line is None:
            last_line = line
        else:
            last_line += line
        # An escaped newline was removed above, so a surviving newline
        # marks the end of a logical line.
        if last_line.endswith(b'\n'):
            yield last_line
            last_line = None
    if last_line is not None:
        yield last_line


def read_patch_stanza(line_iter):
    """Convert an iterable of RIO-Patch format lines into a Stanza.

    RIO-Patch is a RIO variant designed to be e-mailed as part of a patch.
    It resists common forms of damage such as newline conversion or the removal
    of trailing whitespace, yet is also reasonably easy to read.

    :return: a Stanza
    """
    return read_stanza(_patch_stanza_iter(line_iter))


# Prefer the compiled implementation and fall back to pure Python.
# NOTE(review): this import is deliberately kept at the end of the module;
# presumably the implementations import Stanza from here - confirm before
# moving it.
try:
    from ._rio_pyx import (
        _read_stanza_utf8,
        _read_stanza_unicode,
        _valid_tag,
        )
except ImportError as e:
    osutils.failed_to_load_extension(e)
    from ._rio_py import (
        _read_stanza_utf8,
        _read_stanza_unicode,
        _valid_tag,
        )