######################################################################
#
# File: b2sdk/transfer/emerge/planner/planner.py
#
# Copyright 2020 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################

import hashlib
import json

from abc import ABCMeta, abstractmethod
from collections import deque
from itertools import chain

from b2sdk.transfer.emerge.planner.part_definition import (
    CopyEmergePartDefinition,
    UploadEmergePartDefinition,
    UploadSubpartsEmergePartDefinition,
)
from b2sdk.transfer.emerge.planner.upload_subpart import (
    LocalSourceUploadSubpart,
    RemoteSourceUploadSubpart,
)

MEGABYTE = 1000 * 1000
GIGABYTE = 1000 * MEGABYTE


class UploadBuffer(object):
    """ data container used by EmergePlanner for temporary storage of write intents """
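    # Illustrative usage sketch (not executed; `intent_a` and `intent_b` are hypothetical
    # write intents whose fragments end at offsets 50 and 120 respectively):
    #
    #     buff = UploadBuffer(0)
    #     buff.append(intent_a, 50)
    #     buff.append(intent_b, 120)
    #     assert buff.length == 120
    #     tail = buff.get_slice(start_idx=1)  # covers offsets [50, 120)
    #     assert (tail.start_offset, tail.end_offset) == (50, 120)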

    def __init__(self, start_offset, buff=None):
        self._start_offset = start_offset
        self._buff = buff or []
        if self._buff:
            self._end_offset = self._buff[-1][1]
        else:
            self._end_offset = self._start_offset

    @property
    def start_offset(self):
        return self._start_offset

    @property
    def end_offset(self):
        return self._end_offset

    @property
    def length(self):
        return self.end_offset - self.start_offset

    def intent_count(self):
        return len(self._buff)

    def get_intent(self, index):
        return self.get_item(index)[0]

    def get_item(self, index):
        return self._buff[index]

    def iter_items(self):
        return iter(self._buff)

    def append(self, intent, fragment_end):
        self._buff.append((intent, fragment_end))
        self._end_offset = fragment_end

    def get_slice(self, start_idx=None, end_idx=None, start_offset=None):
        start_idx = start_idx or 0
        buff_slice = self._buff[start_idx:end_idx]
        if start_offset is None:
            if start_idx == 0:
                start_offset = self.start_offset
            else:
                start_offset = self._buff[start_idx - 1:start_idx][0][1]
        return self.__class__(start_offset, buff_slice)


class EmergePlanner(object):
    """ Creates a list of actions required for advanced creation of an object in the cloud from an iterator of write intent objects """
    DEFAULT_MIN_PART_SIZE = 5 * MEGABYTE
    DEFAULT_RECOMMENDED_UPLOAD_PART_SIZE = 100 * MEGABYTE
    DEFAULT_MAX_PART_SIZE = 5 * GIGABYTE

    def __init__(
        self,
        min_part_size=None,
        recommended_upload_part_size=None,
        max_part_size=None,
    ):
        self.min_part_size = min_part_size or self.DEFAULT_MIN_PART_SIZE
        self.recommended_upload_part_size = recommended_upload_part_size or self.DEFAULT_RECOMMENDED_UPLOAD_PART_SIZE
        self.max_part_size = max_part_size or self.DEFAULT_MAX_PART_SIZE
        assert self.min_part_size <= self.recommended_upload_part_size <= self.max_part_size

    @classmethod
    def from_account_info(
        cls,
        account_info,
        min_part_size=None,
        recommended_upload_part_size=None,
        max_part_size=None
    ):
        if recommended_upload_part_size is None:
            recommended_upload_part_size = account_info.get_recommended_part_size()
        if min_part_size is None and recommended_upload_part_size < cls.DEFAULT_MIN_PART_SIZE:
            min_part_size = recommended_upload_part_size
        if max_part_size is None and recommended_upload_part_size > cls.DEFAULT_MAX_PART_SIZE:
            max_part_size = recommended_upload_part_size
        kwargs = {
            'min_part_size': min_part_size,
            'recommended_upload_part_size': recommended_upload_part_size,
            'max_part_size': max_part_size,
        }
        return cls(**{key: value for key, value in kwargs.items() if value is not None})
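    # A rough sketch of how the part size bounds are resolved (values assume the class
    # defaults above): if the account reports a recommended part size of 100 MB, the
    # defaults of 5 MB (min) and 5 GB (max) are kept; if it reports e.g. 4 MB, which is
    # below DEFAULT_MIN_PART_SIZE, then min_part_size is lowered to 4 MB as well so that
    # the constructor assertion still holds.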

    def get_emerge_plan(self, write_intents):
        write_intents = sorted(write_intents, key=lambda intent: intent.destination_offset)
        return self._get_emerge_plan(write_intents, EmergePlan)

    def get_streaming_emerge_plan(self, write_intent_iterator):
        return self._get_emerge_plan(write_intent_iterator, StreamingEmergePlan)

    def _get_emerge_plan(self, write_intent_iterator, plan_class):
        return plan_class(
            self._get_emerge_parts(
                self._select_intent_fragments(self._validation_iterator(write_intent_iterator))
            )
        )

    def _get_emerge_parts(self, intent_fragments_iterator):
        # This is where the magic happens. Instead of describing typical inputs and outputs here,
        # we've put them in tests. It is recommended to read those tests before trying to comprehend
        # the implementation details of this function.
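        # One illustrative walk-through (a sketch, assuming the default 5 MB min_part_size):
        # for an upload intent covering [0, 2 MB) followed by a copy intent covering
        # [2 MB, 300 MB), the 2 MB upload buffer alone is below the minimum part size,
        # so 3 MB are "borrowed" from the copy source; the result is one 5 MB upload part
        # (2 MB from the local source plus 3 MB downloaded from the copy source) followed
        # by a single 295 MB server-side copy part.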
        min_part_size = self.min_part_size

        # this stores the current intent that we need to process - we may get
        # it in fragments, so we glue them together just by updating `current_end`
        current_intent = None
        current_end = 0

        upload_buffer = UploadBuffer(0)
        first = True
        last = False
        for intent, fragment_end in intent_fragments_iterator:
            if current_intent is None:
                # this is the first loop run - just initialize the current intent
                current_intent = intent
                current_end = fragment_end
                continue

            if intent is current_intent:
                # the new intent is the same as the previously processed intent, so let's glue them together -
                # this happens when the caller splits N overlapping intents into overlapping fragments
                # and two fragments from the same intent end up streaming into here one after the other
                current_end = fragment_end
                continue

            if intent is None:
                last = True

            # the incoming intent is different - this means that now we have to decide what to do:
            # if this is a copy intent and we want to copy it server-side, then we have to
            # flush the whole upload buffer we accumulated so far, but OTOH we may decide that we just want to
            # append it to the upload buffer (see the complete, nontrivial logic below) and then maybe
            # flush some upload parts from the upload buffer (if there is enough in the buffer)

            current_len = current_end - upload_buffer.end_offset
            # should we flush the upload buffer or do we have to add a chunk of the copy first?
            if current_intent.is_copy() and current_len >= min_part_size:
                # check if we can flush the upload buffer or if there are some bytes missing to fill it to `min_part_size`
                if upload_buffer.intent_count() > 0 and upload_buffer.length < min_part_size:
                    missing_length = min_part_size - upload_buffer.length
                else:
                    missing_length = 0
                if missing_length > 0 and current_len - missing_length < min_part_size:
                    # the current intent is *not* a "small copy", but the upload buffer is small
                    # and the current intent together with the buffer is too short to reach the minimum part size,
                    # so we append the current intent to the upload buffer
                    upload_buffer.append(current_intent, current_end)
                else:
                    if missing_length > 0:
                        # we "borrow" a fragment of the current intent into the upload buffer
                        # to fill it to the minimum part size
                        upload_buffer.append(
                            current_intent, upload_buffer.end_offset + missing_length
                        )
                    # completely flush the upload buffer
                    for upload_buffer_part in self._buff_split(upload_buffer):
                        yield self._get_upload_part(upload_buffer_part)
                    # split the current intent (copy source) into parts and yield them
                    copy_parts = self._get_copy_parts(
                        current_intent,
                        start_offset=upload_buffer.end_offset,
                        end_offset=current_end,
                    )
                    for part in copy_parts:
                        yield part
                    upload_buffer = UploadBuffer(current_end)
            else:
                if current_intent.is_copy() and first and last:
                    # this is a single-intent "small copy" - we force the use of `copy_file`
                    copy_parts = self._get_copy_parts(
                        current_intent,
                        start_offset=upload_buffer.end_offset,
                        end_offset=current_end,
                    )
                    for part in copy_parts:
                        yield part
                else:
                    # this is an upload source or a "small copy" source (that is *not* a single intent) -
                    # either way we just add it to the upload buffer
                    upload_buffer.append(current_intent, current_end)
                    upload_buffer_parts = list(self._buff_split(upload_buffer))
                    # we flush all parts except the last one - we may want to extend
                    # that last part with the incoming intent in the next loop run
                    for upload_buffer_part in upload_buffer_parts[:-1]:
                        yield self._get_upload_part(upload_buffer_part)
                    upload_buffer = upload_buffer_parts[-1]
            current_intent = intent
            first = False
            current_end = fragment_end
            if current_intent is None:
                # this is the sentinel - there will be no more fragments - we have to flush the upload buffer
                for upload_buffer_part in self._buff_split(upload_buffer):
                    yield self._get_upload_part(upload_buffer_part)

    def _get_upload_part(self, upload_buffer):
        """ Build emerge part from upload buffer. """
        if upload_buffer.intent_count() == 1 and upload_buffer.get_intent(0).is_upload():
            intent = upload_buffer.get_intent(0)
            relative_offset = upload_buffer.start_offset - intent.destination_offset
            length = upload_buffer.length
            definition = UploadEmergePartDefinition(intent.outbound_source, relative_offset, length)
        else:
            subparts = []
            fragment_start = upload_buffer.start_offset
            for intent, fragment_end in upload_buffer.iter_items():
                relative_offset = fragment_start - intent.destination_offset
                length = fragment_end - fragment_start
                if intent.is_upload():
                    subpart_class = LocalSourceUploadSubpart
                elif intent.is_copy():
                    subpart_class = RemoteSourceUploadSubpart
                else:
                    raise RuntimeError('This cannot happen!!!')
                subparts.append(subpart_class(intent.outbound_source, relative_offset, length))
                fragment_start = fragment_end
            definition = UploadSubpartsEmergePartDefinition(subparts)
        return EmergePart(definition)

    def _get_copy_parts(self, copy_intent, start_offset, end_offset):
        """ Split a copy intent into emerge parts. """
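        # Worked example (a sketch, assuming the default 5 GB max part size): a 12 GB copy
        # range is not emitted as 5 GB + 5 GB + 2 GB; because the trailing 2 GB part would be
        # smaller than an even split, the part count is bumped once more and the range is
        # emitted as four 3 GB parts instead.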
        fragment_length = end_offset - start_offset
        part_count = int(fragment_length / self.max_part_size)
        last_part_length = fragment_length % self.max_part_size
        if last_part_length == 0:
            last_part_length = self.max_part_size
        else:
            part_count += 1

        if part_count == 1:
            part_sizes = [last_part_length]
        else:
            if last_part_length < int(fragment_length / (part_count + 1)):
                part_count += 1
            base_part_size = int(fragment_length / part_count)
            size_remainder = fragment_length % part_count
            part_sizes = [
                base_part_size + (1 if i < size_remainder else 0) for i in range(part_count)
            ]

        copy_source = copy_intent.outbound_source
        relative_offset = start_offset - copy_intent.destination_offset
        for part_size in part_sizes:
            yield EmergePart(CopyEmergePartDefinition(copy_source, relative_offset, part_size))
            relative_offset += part_size

    def _buff_split(self, upload_buffer):
        """ Split the upload buffer into part candidates - smaller upload buffers.

        :rtype: iterator[b2sdk.transfer.emerge.planner.planner.UploadBuffer]
        """
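        # Sketch of the expected splitting (assuming the default sizes: 100 MB recommended,
        # 5 MB minimum): a 230 MB buffer yields parts of 100 MB, 100 MB and 30 MB - the
        # 30 MB tail is not partitioned further because it is already below
        # recommended_upload_part_size + min_part_size (105 MB).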
        if upload_buffer.intent_count() == 0:
            return
        tail_buffer = upload_buffer
        while True:
            if tail_buffer.length < self.recommended_upload_part_size + self.min_part_size:
                # `EmergePlanner._buff_partition` can split in such a way that the tail part
                # can be smaller than `min_part_size` - to avoid an unnecessary download of a possible
                # incoming copy intent, we don't split further
                yield tail_buffer
                return
            head_buff, tail_buffer = self._buff_partition(tail_buffer)
            yield head_buff

    def _buff_partition(self, upload_buffer):
        """ Split the upload buffer into two parts (smaller upload buffers).

        As a result, the left part cannot be split any further, and nothing can be assumed about the right part.

        :rtype: tuple(b2sdk.transfer.emerge.planner.planner.UploadBuffer,
                      b2sdk.transfer.emerge.planner.planner.UploadBuffer)
        """
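        # For example (assuming the default 100 MB recommended part size): a single-intent
        # buffer covering [0, 230 MB) is partitioned into a left buffer covering [0, 100 MB)
        # and a right buffer covering [100 MB, 230 MB).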
        left_buff = UploadBuffer(upload_buffer.start_offset)
        buff_start = upload_buffer.start_offset
        for idx, (intent, fragment_end) in enumerate(upload_buffer.iter_items()):
            candidate_size = fragment_end - buff_start
            if candidate_size > self.recommended_upload_part_size:
                right_fragment_size = candidate_size - self.recommended_upload_part_size
                left_buff.append(intent, fragment_end - right_fragment_size)
                return left_buff, upload_buffer.get_slice(
                    start_idx=idx, start_offset=left_buff.end_offset
                )
            else:
                left_buff.append(intent, fragment_end)
                if candidate_size == self.recommended_upload_part_size:
                    return left_buff, upload_buffer.get_slice(start_idx=idx + 1)

        return left_buff, UploadBuffer(left_buff.end_offset)

    def _select_intent_fragments(self, write_intent_iterator):
        """ Select overlapping write intent fragments to use.

        To solve overlapping intents selection, intents can be split into smaller fragments.
        Those fragments are yielded as soon as a decision can be made to use them,
        so it is possible that one intent is yielded in multiple fragments. Those
        are merged again by the higher-level iterator that produces emerge parts, but
        in principle this merging could happen here. Not merging here is a design decision
        that makes this function easier to implement and also allows yielding emerge parts
        a bit quicker.
        """
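        # Illustrative outcome (a sketch, assuming the default 5 MB min_part_size): for an
        # upload intent covering [0, 100 MB) overlapping a copy intent covering
        # [50 MB, 200 MB), the yielded fragments are roughly (upload, end=50 MB),
        # (copy, end=100 MB), (copy, end=200 MB) and finally the (None, None) sentinel -
        # the protected copy wins over the upload where they overlap.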

        # `protected_intent_length` for the upload state is 0, so it would generate at most a single intent fragment
        # every loop iteration, but the algorithm does not assume that - one may one day choose to
        # protect the upload fragments' length too - e.g. to avoid a situation where a file is opened to
        # read just a small number of bytes before switching to another overlapping upload source
        upload_intents_state = IntentsState()
        copy_intents_state = IntentsState(protected_intent_length=self.min_part_size)

        last_sent_offset = 0
        incoming_offset = None
        while True:
            incoming_intent = next(write_intent_iterator, None)
            if incoming_intent is None:
                incoming_offset = None
            else:
                incoming_offset = incoming_intent.destination_offset

            upload_intents = list(
                upload_intents_state.state_update(last_sent_offset, incoming_offset)
            )
            copy_intents = list(copy_intents_state.state_update(last_sent_offset, incoming_offset))

            intent_fragments = self._merge_intent_fragments(
                last_sent_offset,
                upload_intents,
                copy_intents,
            )

            for intent, intent_fragment_end in intent_fragments:
                yield intent, intent_fragment_end
                last_sent_offset = intent_fragment_end

            if incoming_offset is not None and last_sent_offset < incoming_offset:
                raise ValueError(
                    'Cannot emerge file with holes. '
                    'Found hole range: ({}, {})'.format(last_sent_offset, incoming_offset)
                )

            if incoming_intent is None:
                yield None, None  # let's yield a sentinel for a cleaner `_get_emerge_parts` implementation
                return
            if incoming_intent.is_upload():
                upload_intents_state.add(incoming_intent)
            elif incoming_intent.is_copy():
                copy_intents_state.add(incoming_intent)
            else:
                raise RuntimeError('This should not happen at all!')

    def _merge_intent_fragments(self, start_offset, upload_intents, copy_intents):
        """ Select "competing" upload and copy fragments.

        Upload and copy fragments may overlap, so we need to choose the right one
        to use - copy fragments are prioritized unless the fragment is unprotected
        (we use "protection" as an abstraction for "short copy" fragments - meaning upload
        fragments have higher priority than a "short copy")
        """
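        # For example (a sketch of the priority rule, not an exhaustive description): given
        # an upload fragment ending at offset 100 and an unprotected ("short copy") fragment
        # ending at offset 80, the upload fragment is yielded up to offset 80, the copy
        # fragment is dropped, and the upload fragment is then yielded again up to offset 100.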
        upload_intents = deque(upload_intents)
        copy_intents = deque(copy_intents)
        while True:
            upload_intent = copy_intent = None
            if upload_intents:
                upload_intent, upload_end, _ = upload_intents[0]
            if copy_intents:
                copy_intent, copy_end, copy_protected = copy_intents[0]

            if upload_intent is not None and copy_intent is not None:
                if not copy_protected:
                    yield_intent = upload_intent
                else:
                    yield_intent = copy_intent
                start_offset = min(upload_end, copy_end)
                yield yield_intent, start_offset
                if start_offset >= upload_end:
                    upload_intents.popleft()
                if start_offset >= copy_end:
                    copy_intents.popleft()
            elif upload_intent is not None:
                yield upload_intent, upload_end
                upload_intents.popleft()
            elif copy_intent is not None:
                yield copy_intent, copy_end
                copy_intents.popleft()
            else:
                return

    def _validation_iterator(self, write_intents):
        """ Iterate over write intents and validate their length and order. """
        last_offset = 0
        for write_intent in write_intents:
            if write_intent.length is None:
                raise ValueError('Planner cannot support write intents of unknown length')
            if write_intent.destination_offset < last_offset:
                raise ValueError('Write intent stream has to be sorted by destination offset')
            last_offset = write_intent.destination_offset
            yield write_intent


class IntentsState(object):
    """ Store and process the state of incoming write intents to solve
    overlapping intents selection in a streaming manner.

    It does not check whether intents are of the same kind (upload/copy), but the intention
    was to use it to split incoming intents by kind (two intent states are required then).
    If there were no need to differentiate incoming intents, this would
    still work - so intent kind is ignored at this level. To address the "short copy"
    prioritization (and avoidance) problem, the ``protected_intent_length`` param was introduced
    to prevent the logic from allowing too small fragments (where possible)
    """

    def __init__(self, protected_intent_length=0):
        self.protected_intent_length = protected_intent_length
        self._current_intent = None
        self._next_intent = None
        self._last_sent_offset = 0
        self._incoming_offset = None
        self._current_intent_start = None
        self._current_intent_end = None
        self._next_intent_end = None

    def add(self, incoming_intent):
        """ Add an incoming intent to the state.

        It has to be called *after* ``IntentsState.state_update``, but this is not verified.
        """
        if self._next_intent is None:
            self._set_next_intent(incoming_intent)
        elif incoming_intent.destination_end_offset > self._next_intent_end:
            # here either the incoming intent starts at the same position as the next intent
            # (and the current intent is None in such a case - it was just cleared in `state_update`
            # or it was cleared some time ago - in previous iterations) or we are in a situation
            # where the current and next intents overlap, and `last_sent_offset` is now set to
            # the incoming intent's `destination_offset` - in both cases we want to choose
            # the intent with the larger `destination_end_offset`
            self._set_next_intent(incoming_intent)

    def state_update(self, last_sent_offset, incoming_offset):
        """ Update the state using the incoming intent offset.

        It has to be called *before* ``IntentsState.add``, even if the incoming intent
        will not be added to this intents state. It yields the state of this stream
        of intents (like copy or upload) from ``last_sent_offset`` to ``incoming_offset``.
        The first stage of solving overlapping intents selection happens here - but
        the write intent iterator can be split into multiple substreams (like copy and upload),
        so an additional stage is required to cover that.
        """
        if self._current_intent is not None:
            if last_sent_offset >= self._current_intent_end:
                self._set_current_intent(None, None)

        # `effective_incoming_offset` is a safeguard after the intent iterator is drained
        if incoming_offset is not None:
            effective_incoming_offset = incoming_offset
        elif self._next_intent is not None:
            effective_incoming_offset = self._next_intent_end
        elif self._current_intent is not None:
            effective_incoming_offset = self._current_intent_end
        else:
            # the intent iterator is drained and this state is empty
            return

        if (
            self._current_intent is None and self._next_intent is not None and (
                self._next_intent.destination_offset != effective_incoming_offset or
                incoming_offset is None
            )
        ):
            self._set_current_intent(self._next_intent, last_sent_offset)
            self._set_next_intent(None)

        # current and next can both be not None at this point only if they overlap
        if (
            self._current_intent is not None and self._next_intent is not None and
            effective_incoming_offset > self._current_intent_end
        ):
            # the incoming intent does not overlap with the current intent,
            # so we switch to the next one because we are sure that we will have to use it anyway
            # (of course the other, overriding (e.g. "copy" over "upload") state can have
            # an overlapping intent, but we have no information about it here)
            # but we also need to protect the current intent's length
            if not self._is_current_intent_protected():
                # we were unable to protect the current intent, so we can safely rotate
                self._set_current_intent(self._next_intent, last_sent_offset)
                self._set_next_intent(None)
            else:
                remaining_len = self.protected_intent_length - (
                    last_sent_offset - self._current_intent_start
                )
                if remaining_len > 0:
                    last_sent_offset += remaining_len
                    if not self._can_be_protected(last_sent_offset, self._next_intent_end):
                        last_sent_offset = self._current_intent_end
                    yield self._current_intent, last_sent_offset, True
                self._set_current_intent(self._next_intent, last_sent_offset)
                self._set_next_intent(None)

        if self._current_intent is not None:
            yield (
                self._current_intent,
                min(effective_incoming_offset, self._current_intent_end),
                self._is_current_intent_protected(),
            )

    def _set_current_intent(self, intent, start_offset):
        self._current_intent = intent
        if self._current_intent is not None:
            self._current_intent_end = self._current_intent.destination_end_offset
        else:
            self._current_intent_end = None
            assert start_offset is None
        self._current_intent_start = start_offset

    def _set_next_intent(self, intent):
        self._next_intent = intent
        if self._next_intent is not None:
            self._next_intent_end = self._next_intent.destination_end_offset
        else:
            self._next_intent_end = None

    def _is_current_intent_protected(self):
        """ Say whether the current intent is protected.

        An intent can be split into smaller fragments, but to choose upload over "small copy"
        we need to know for each fragment whether it is a "small copy" or not. As a result of solving
        overlapping intents selection there might be a situation where the original intent was not
        a small copy, but it ends up being used only partially, effectively becoming a "small copy".
        The algorithm attempts to avoid using fragments smaller than ``protected_intent_length``, but
        sometimes that may be impossible. So if this function returns ``False`` it means
        that the used length of this intent is smaller than ``protected_intent_length`` and the algorithm
        was unable to avoid this.
        """
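        # E.g. with a protected_intent_length of 5 MB, a copy intent that ends up being used
        # only for the range [0, 3 MB) is reported as unprotected, so an overlapping upload
        # fragment may be preferred over it.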
        return self._can_be_protected(self._current_intent_start, self._current_intent_end)

    def _can_be_protected(self, start, end):
        return end - start >= self.protected_intent_length


class BaseEmergePlan(metaclass=ABCMeta):
    def __init__(self, emerge_parts):
        self.emerge_parts = emerge_parts

    @abstractmethod
    def is_large_file(self):
        pass

    @abstractmethod
    def get_total_length(self):
        pass

    @abstractmethod
    def get_plan_id(self):
        pass

    def enumerate_emerge_parts(self):
        return enumerate(self.emerge_parts, 1)


class EmergePlan(BaseEmergePlan):
    def __init__(self, emerge_parts):
        super(EmergePlan, self).__init__(list(emerge_parts))
        self._is_large_file = len(self.emerge_parts) > 1

    def is_large_file(self):
        return self._is_large_file

    def get_total_length(self):
        return sum(emerge_part.get_length() for emerge_part in self.emerge_parts)

    def get_plan_id(self):
        if all(part.is_hashable() for part in self.emerge_parts):
            return None

        json_id = json.dumps([emerge_part.get_part_id() for emerge_part in self.emerge_parts])
        return hashlib.sha1(json_id.encode()).hexdigest()


class StreamingEmergePlan(BaseEmergePlan):
    def __init__(self, emerge_parts_iterator):
        emerge_parts, self._is_large_file = self._peek_for_large_file(emerge_parts_iterator)
        super(StreamingEmergePlan, self).__init__(emerge_parts)

    def is_large_file(self):
        return self._is_large_file

    def get_total_length(self):
        return None

    def get_plan_id(self):
        return None

    def _peek_for_large_file(self, emerge_parts_iterator):
        first_part = next(emerge_parts_iterator, None)
        if first_part is None:
            raise ValueError('Empty emerge parts iterator')

        second_part = next(emerge_parts_iterator, None)
        if second_part is None:
            return iter([first_part]), False
        else:
            return chain([first_part, second_part], emerge_parts_iterator), True


class EmergePart(object):
    def __init__(self, part_definition, verification_ranges=None):
        self.part_definition = part_definition
        self.verification_ranges = verification_ranges

    def __repr__(self):
        return '<{classname} part_definition={part_definition}>'.format(
            classname=self.__class__.__name__,
            part_definition=repr(self.part_definition),
        )

    def get_length(self):
        return self.part_definition.get_length()

    def get_execution_step(self, execution_step_factory):
        return self.part_definition.get_execution_step(execution_step_factory)

    def get_part_id(self):
        return self.part_definition.get_part_id()

    def is_hashable(self):
        return self.part_definition.is_hashable()

    def get_sha1(self):
        return self.part_definition.get_sha1()