# Copyright (C) 2005-2012 Canonical Ltd
# Copyright (C) 2020 Breezy Developers
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

from __future__ import absolute_import

from ..tag import Tags

from .. import (
    bencode,
    errors,
    trace,
    )


class BasicTags(Tags):
    """Tag storage in an unversioned branch control file.

    Tags are kept as a single bencoded dictionary that is read and written
    through ``self.branch._get_tags_bytes()`` and
    ``self.branch._set_tags_bytes()``.  Tag names are text in memory and are
    encoded as UTF-8 when serialized.
    """

    def set_tag(self, tag_name, tag_target):
        """Add a tag definition to the branch.

        If the branch has a master branch, the tag is set there first and
        then recorded locally.  An existing local entry with the same name
        is overwritten in the stored dictionary.

        :param tag_name: Name of the tag.
        :param tag_target: Value the tag should refer to.
        """
        # all done with a write lock held, so this looks atomic
        with self.branch.lock_write():
            master = self.branch.get_master_branch()
            if master is not None:
                master.tags.set_tag(tag_name, tag_target)
            td = self.get_tag_dict()
            td[tag_name] = tag_target
            self._set_tag_dict(td)

    def lookup_tag(self, tag_name):
        """Return the referent string of a tag.

        :param tag_name: Name of the tag to look up.
        :raises errors.NoSuchTag: if no tag with that name is stored.
        """
        td = self.get_tag_dict()
        try:
            return td[tag_name]
        except KeyError:
            raise errors.NoSuchTag(tag_name)

    def get_tag_dict(self):
        """Return all tags as a dictionary from tag name to target.

        The tags file is read under a branch read lock.  If the file does
        not exist, a warning is emitted and an empty dictionary is returned.
        """
        with self.branch.lock_read():
            try:
                tag_content = self.branch._get_tags_bytes()
            except errors.NoSuchFile:
                # ugly, but only abentley should see this :)
                trace.warning('No branch/tags file in %s. '
                              'This branch was probably created by bzr 0.15pre. '
                              'Create an empty file to silence this message.'
                              % (self.branch, ))
                return {}
            return self._deserialize_tag_dict(tag_content)

    def delete_tag(self, tag_name):
        """Delete a tag definition.

        The tag must exist locally.  If the branch has a master branch, the
        tag is also deleted there; a tag that is absent from the master is
        not treated as an error.

        :param tag_name: Name of the tag to remove.
        :raises errors.NoSuchTag: if the tag is not present locally.
        """
        with self.branch.lock_write():
            d = self.get_tag_dict()
            try:
                del d[tag_name]
            except KeyError:
                raise errors.NoSuchTag(tag_name)
            master = self.branch.get_master_branch()
            if master is not None:
                try:
                    master.tags.delete_tag(tag_name)
                except errors.NoSuchTag:
                    # Deliberate best-effort: the master may never have had
                    # this tag, which is fine when removing it.
                    pass
            self._set_tag_dict(d)

    def _set_tag_dict(self, new_dict):
        """Replace all tag definitions

        WARNING: Calling this on an unlocked branch will lock it, and will
        replace the tags without warning on conflicts.

        :param new_dict: Dictionary from tag name to target.
        """
        return self.branch._set_tags_bytes(self._serialize_tag_dict(new_dict))

    def _serialize_tag_dict(self, tag_dict):
        """Convert a dictionary of tags into the on-disk byte form.

        Tag names are encoded as UTF-8; targets are passed through to the
        bencoder unchanged.

        :param tag_dict: Dictionary from tag name to target.
        :return: The bencoded representation of the dictionary.
        """
        td = {k.encode('utf-8'): v for k, v in tag_dict.items()}
        return bencode.bencode(td)

    def _deserialize_tag_dict(self, tag_content):
        """Convert the tag file into a dictionary of tags.

        :param tag_content: Raw bytes of the tags file.
        :return: Dictionary from tag name (decoded from UTF-8) to target.
        :raises ValueError: if the content cannot be decoded; the message
            includes the offending content and the underlying error.
        """
        # was a special case to make initialization easy, an empty definition
        # is an empty dictionary
        if tag_content == b'':
            return {}
        try:
            decoded = bencode.bdecode(tag_content)
            return {k.decode('utf-8'): v for k, v in decoded.items()}
        except ValueError as e:
            raise ValueError("failed to deserialize tag dictionary %r: %s"
                             % (tag_content, e))