1import array
2import copy
3import xml.etree.ElementTree as ET
4from xml.dom import minidom
5
6import numpy as np
7from PyQt5.QtCore import QObject, pyqtSignal, Qt
8
9from urh import settings
10from urh.cythonext import signal_functions
11from urh.signalprocessing.Encoding import Encoding
12from urh.signalprocessing.Message import Message
13from urh.signalprocessing.MessageType import MessageType
14from urh.signalprocessing.Modulator import Modulator
15from urh.signalprocessing.Participant import Participant
16from urh.signalprocessing.ProtocoLabel import ProtocolLabel
17from urh.signalprocessing.Signal import Signal
18from urh.util import util as urh_util, util
19from urh.util.Logger import logger
20
21
class ProtocolAnalyzerSignals(QObject):
    """
    Holder for the Qt signals of a ProtocolAnalyzer.

    ProtocolAnalyzer derives from object rather than QObject, so it keeps an
    instance of this class in its qt_signals attribute and emits through it.
    """

    # Emitted without arguments, e.g. at the end of ProtocolAnalyzer.get_protocol_from_signal()
    protocol_updated = pyqtSignal()
    # No payload; not emitted in this file section (presumably tied to the "show" state - verify)
    show_state_changed = pyqtSignal()
    # Carries a str payload (presumably the error text - verify against the emitter)
    sniff_device_errors_changed = pyqtSignal(str)
    # No payload
    line_duplicated = pyqtSignal()
    # Carries an int payload (meaning not visible here - TODO confirm)
    fuzzing_started = pyqtSignal(int)
    # Carries an int payload (presumably a message index - TODO confirm)
    current_fuzzing_message_changed = pyqtSignal(int)
    # No payload
    fuzzing_finished = pyqtSignal()

    def __init__(self, parent=None):
        super().__init__(parent)
33
34
35class ProtocolAnalyzer(object):
36    """
37    The ProtocolAnalyzer is what you would refer to as "protocol".
38    The data is stored in the messages variable.
39    This class offers several methods for protocol analysis.
40    """
41
42    def __init__(self, signal: Signal or None, filename=None):
43        self.messages = []  # type: list[Message]
44        self.signal = signal
45        if filename is None:
46            self.filename = self.signal.filename if self.signal is not None else ""
47        else:
48            assert signal is None
49            self.filename = filename
50
51        self.__name = urh_util.get_name_from_filename(filename)  # Fallback if Signal has no Name
52
53        self.show = Qt.Checked  # Show in Compare Frame?
54        self.qt_signals = ProtocolAnalyzerSignals()
55
56        self.decoder = Encoding(["Non Return To Zero (NRZ)"])  # For Default Encoding of Protocol
57
58        self.message_types = [MessageType("Default")]
59
60    @property
61    def default_message_type(self) -> MessageType:
62        if len(self.message_types) == 0:
63            self.message_types.append(MessageType("Default"))
64
65        return self.message_types[0]
66
67    @default_message_type.setter
68    def default_message_type(self, val: MessageType):
69        if len(self.message_types) > 0:
70            self.message_types[0] = val
71        else:
72            self.message_types.append(val)
73
74    @property
75    def protocol_labels(self):
76        """
77
78        :rtype: list of ProtocolLabel
79        """
80        return [lbl for message_type in self.message_types for lbl in message_type]
81
82    def __deepcopy__(self, memo):
83        cls = self.__class__
84        result = cls.__new__(cls)
85        memo[id(self)] = result
86        for k, v in self.__dict__.items():
87            if k != "qt_signals" and k != "signal":
88                setattr(result, k, copy.deepcopy(v, memo))
89        result.signal = self.signal
90        result.qt_signals = ProtocolAnalyzerSignals()
91        return result
92
93    @property
94    def name(self):
95        name = self.signal.name if self.signal is not None else self.__name
96        return name
97
98    @name.setter
99    def name(self, val: str):
100        if self.signal is None:
101            self.__name = val
102        else:
103            self.signal.name = val
104
105    @property
106    def pauses(self):
107        return [msg.pause for msg in self.messages]
108
109    @property
110    def plain_bits_str(self):
111        return [str(msg) for msg in self.messages]
112
113    @property
114    def plain_hex_str(self):
115        return [msg.plain_hex_str for msg in self.messages]
116
117    @property
118    def plain_ascii_str(self):
119        return [msg.plain_ascii_str for msg in self.messages]
120
121    @property
122    def decoded_bits(self):
123        return [msg.decoded_bits for msg in self.messages]
124
125    @property
126    def decoded_proto_bits_str(self):
127        """
128
129        :rtype: list of str
130        """
131        return [msg.decoded_bits_str for msg in self.messages]
132
133    @property
134    def decoded_hex_str(self):
135        """
136
137        :rtype: list of str
138        """
139        return [msg.decoded_hex_str for msg in self.messages]
140
141    @property
142    def decoded_ascii_str(self):
143        """
144
145        :rtype: list of str
146        """
147        return [msg.decoded_ascii_str for msg in self.messages]
148
149    @property
150    def num_messages(self):
151        return len([msg for msg in self.messages if msg])
152
153    def clear_decoded_bits(self):
154        [msg.clear_decoded_bits() for msg in self.messages]
155
156    def decoded_to_str_list(self, view_type):
157        if view_type == 0:
158            return self.decoded_proto_bits_str
159        elif view_type == 1:
160            return self.decoded_hex_str
161        elif view_type == 2:
162            return self.decoded_ascii_str
163
164    def plain_to_string(self, view: int, show_pauses=True) -> str:
165        """
166
167        :param view: 0 - Bits ## 1 - Hex ## 2 - ASCII
168        """
169        time = settings.read('show_pause_as_time', type=bool)
170        if show_pauses and time and self.signal:
171            srate = self.signal.sample_rate
172        else:
173            srate = None
174
175        return '\n'.join(msg.view_to_string(view, False, show_pauses,
176                                            sample_rate=srate
177                                            ) for msg in self.messages)
178
179    def plain_to_html(self, view, show_pauses=True) -> str:
180        time = settings.read('show_pause_as_time', type=bool)
181        if show_pauses and time and self.signal:
182            srate = self.signal.sample_rate
183        else:
184            srate = None
185
186        result = []
187        for message in self.messages:
188            cur_str = ""
189            if message.participant:
190                color = settings.PARTICIPANT_COLORS[message.participant.color_index]
191                red, green, blue = color.red(), color.green(), color.blue()
192                fgcolor = "#000000" if (red * 0.299 + green * 0.587 + blue * 0.114) > 186 else "#ffffff"
193                cur_str += '<span style="background-color: rgb({0},{1},{2}); color: {3}">'.format(red, green, blue,
194                                                                                                  fgcolor)
195
196                # cur_str += '<span style="color: rgb({0},{1},{2})">'.format(red, green, blue)
197
198            cur_str += message.view_to_string(view=view, decoded=False, show_pauses=False, sample_rate=srate)
199
200            if message.participant:
201                cur_str += '</span>'
202
203            cur_str += message.get_pause_str(sample_rate=srate)
204            result.append(cur_str)
205
206        return "<br>".join(result)
207
208    def set_decoder_for_messages(self, decoder: Encoding, messages=None):
209        messages = messages if messages is not None else self.messages
210        self.decoder = decoder
211        for message in messages:
212            message.decoder = decoder
213
214    def get_protocol_from_signal(self):
215        signal = self.signal
216        if signal is None:
217            self.messages = None
218            return
219
220        if self.messages is not None:
221            self.messages[:] = []
222        else:
223            self.messages = []
224
225        samples_per_symbol = signal.samples_per_symbol
226
227        ppseq = signal_functions.grab_pulse_lens(signal.qad, signal.center, signal.tolerance,
228                                                 signal.modulation_type, signal.samples_per_symbol,
229                                                 signal.bits_per_symbol, signal.center_spacing)
230
231        bit_data, pauses, bit_sample_pos = self._ppseq_to_bits(ppseq, samples_per_symbol, self.signal.bits_per_symbol,
232                                                               pause_threshold=signal.pause_threshold)
233        if signal.message_length_divisor > 1 and signal.modulation_type == "ASK":
234            self.__ensure_message_length_multiple(bit_data, signal.samples_per_symbol, pauses, bit_sample_pos,
235                                                  signal.message_length_divisor)
236
237        i = 0
238        for bits, pause in zip(bit_data, pauses):
239            middle_bit_pos = bit_sample_pos[i][int(len(bits) / 2)]
240            start, end = middle_bit_pos, middle_bit_pos + samples_per_symbol
241            rssi = np.mean(signal.iq_array.subarray(start, end).magnitudes_normalized)
242            message = Message(bits, pause, message_type=self.default_message_type,
243                              samples_per_symbol=samples_per_symbol, rssi=rssi, decoder=self.decoder,
244                              bit_sample_pos=bit_sample_pos[i], bits_per_symbol=signal.bits_per_symbol)
245            self.messages.append(message)
246            i += 1
247
248        self.qt_signals.protocol_updated.emit()
249
250    @staticmethod
251    def __ensure_message_length_multiple(bit_data, samples_per_symbol: int, pauses, bit_sample_pos, divisor: int):
252        """
253        In case of ASK modulation, this method tries to use pauses after messages as zero bits so that
254        the bit lengths of messages are divisible by divisor
255
256        :param bit_data: List of bit arrays
257        :param samples_per_symbol: Symbol length that was used for demodulation
258        :param pauses: List of pauses
259        :param bit_sample_pos: List of Array of bit sample positions
260        :param divisor: Divisor the messages should be divisible by
261        """
262        for i in range(len(bit_data)):
263            missing_bits = (divisor - (len(bit_data[i]) % divisor)) % divisor
264            if missing_bits > 0 and pauses[i] >= samples_per_symbol * missing_bits:
265                bit_data[i].extend([0] * missing_bits)
266                pauses[i] = pauses[i] - missing_bits * samples_per_symbol
267
268                try:
269                    bit_sample_pos[i][-1] = bit_sample_pos[i][-2] + samples_per_symbol
270                except IndexError as e:
271                    logger.warning("Error padding message " + str(e))
272                    continue
273
274                bit_sample_pos[i].extend([bit_sample_pos[i][-1] + (k + 1) * samples_per_symbol for k in range(missing_bits - 1)])
275                bit_sample_pos[i].append(bit_sample_pos[i][-1] + pauses[i])
276
    def _ppseq_to_bits(self, ppseq, samples_per_symbol: int, bits_per_symbol: int, write_bit_sample_pos=True, pause_threshold=8):
        """
        Split a pulse sequence into messages and collect bits, pauses and bit sample positions.

        Each row of ppseq is read as (pulse type, number of samples); pulse type -1 marks a pause.
        A pause of more than pause_threshold symbols that follows data ends the current message.
        Shorter pauses (or every pause if pause_threshold is 0) are turned into zero bits.

        :param ppseq: Two dimensional pulse sequence, indexed as ppseq[row, column]
        :param samples_per_symbol: Number of samples of one symbol
        :param bits_per_symbol: Number of bits each symbol carries
        :param write_bit_sample_pos: If False, no bit sample positions are collected
        :param pause_threshold: Maximum pause length in symbols that still counts as zero bits
        :return: tuple of (list of bit arrays, array of pauses in samples, list of bit sample position arrays)
        """
        # Sample positions of the message currently being assembled
        bit_sampl_pos = array.array("L", [])
        bit_sample_positions = []

        # Bits of the message currently being assembled
        data_bits = array.array("B", [])
        resulting_data_bits = []
        pauses = array.array("L", [])
        start = 0
        total_samples = 0

        pause_type = -1

        # True once a non-pause pulse with at least one symbol was seen for the current message
        there_was_data = False

        samples_per_bit = int(samples_per_symbol/bits_per_symbol)

        if len(ppseq) > 0 and ppseq[0, 0] == pause_type:
            start = 1  # Starts with Pause
            total_samples = ppseq[0, 1]

        for i in range(start, len(ppseq)):
            cur_pulse_type = ppseq[i, 0]
            num_samples = ppseq[i, 1]
            num_symbols_float = num_samples / samples_per_symbol
            num_symbols = int(num_symbols_float)
            decimal_place = num_symbols_float - num_symbols

            # Round up if more than half a symbol is left over
            if decimal_place > 0.5:
                num_symbols += 1

            if cur_pulse_type == pause_type:
                # OOK: a short pause is interpreted as a run of zero bits
                if num_symbols <= pause_threshold or pause_threshold == 0:
                    data_bits.extend([0] * (num_symbols * bits_per_symbol))
                    if write_bit_sample_pos:
                        bit_sampl_pos.extend([total_samples + k * samples_per_bit
                                              for k in range(num_symbols*bits_per_symbol)])

                elif not there_was_data:
                    # Ignore this pause, if there were no information
                    # transmitted previously (bits collected so far are dropped)
                    data_bits[:] = array.array("B", [])
                    bit_sampl_pos[:] = array.array("L", [])

                else:
                    # Long pause after data: finish the current message
                    if write_bit_sample_pos:
                        # Start and end of the pause are stored after the bit positions
                        bit_sampl_pos.append(total_samples)
                        bit_sampl_pos.append(total_samples + num_samples)
                        bit_sample_positions.append(bit_sampl_pos[:])
                        bit_sampl_pos[:] = array.array("L", [])

                    resulting_data_bits.append(data_bits[:])
                    data_bits[:] = array.array("B", [])
                    pauses.append(num_samples)
                    there_was_data = False
            else:
                # Data pulse: the pulse type is expanded to bits_per_symbol bits, once per symbol
                data_bits.extend(util.number_to_bits(cur_pulse_type, bits_per_symbol) * num_symbols)
                if not there_was_data and num_symbols > 0:
                    there_was_data = True
                if write_bit_sample_pos:
                    bit_sampl_pos.extend([total_samples + k * samples_per_bit for k in range(num_symbols*bits_per_symbol)])

            total_samples += num_samples

        # Flush the last message if the sequence did not end with a long pause
        if there_was_data:
            resulting_data_bits.append(data_bits[:])
            if write_bit_sample_pos:
                bit_sample_positions.append(bit_sampl_pos[:] + array.array("L", [total_samples]))
            pause = ppseq[-1, 1] if ppseq[-1, 0] == pause_type else 0
            pauses.append(pause)

        return resulting_data_bits, pauses, bit_sample_positions
349
350    def get_samplepos_of_bitseq(self, start_message: int, start_index: int, end_message: int, end_index: int,
351                                include_pause: bool):
352        """
353        Determine on which place (regarding samples) a bit sequence is
354        :rtype: tuple[int,int]
355        """
356        try:
357            if start_message > end_message:
358                start_message, end_message = end_message, start_message
359
360            if start_index >= len(self.messages[start_message].bit_sample_pos) - 1:
361                start_index = len(self.messages[start_message].bit_sample_pos) - 1
362                if not include_pause:
363                    start_index -= 1
364
365            if end_index >= len(self.messages[end_message].bit_sample_pos) - 1:
366                end_index = len(self.messages[end_message].bit_sample_pos) - 1
367                if not include_pause:
368                    end_index -= 1
369
370            start = self.messages[start_message].bit_sample_pos[start_index]
371            num_samples = self.messages[end_message].bit_sample_pos[end_index] - start
372
373            return start, num_samples
374        except (KeyError, IndexError):
375            return -1, -1
376
    def get_bitseq_from_selection(self, selection_start: int, selection_width: int):
        """
        Get start and end index of the bit sequence that belongs to a selection of samples.

        All four values are -1 if there are no messages, the first message has no
        sample positions, or the selection ends before the first sample position.

        :param selection_start: First selected sample
        :param selection_width: Number of selected samples
        :rtype: tuple[int,int,int,int]
        :return: start_message index, start index, end message index, end index
        """
        start_message, start_index, end_message, end_index = -1, -1, -1, -1
        if not self.messages or not self.messages[0].bit_sample_pos:
            return start_message, start_index, end_message, end_index

        # Selection lies completely before the first message
        if selection_start + selection_width < self.messages[0].bit_sample_pos[0]:
            return start_message, start_index, end_message, end_index

        for i, msg in enumerate(self.messages):
            msg_sample_pos = msg.bit_sample_pos
            # Message lies before the selection
            # NOTE(review): [-2] presumably skips the trailing pause end position - confirm
            if msg_sample_pos[-2] < selection_start:
                continue
            elif start_message == -1:
                # First message that reaches into the selection
                start_message = i
                for j, sample_pos in enumerate(msg_sample_pos):
                    if sample_pos < selection_start:
                        continue
                    elif start_index == -1:
                        start_index = j
                        # Selection reaches beyond this message: the end is searched in later messages
                        if msg_sample_pos[-1] - selection_start < selection_width:
                            break
                    elif sample_pos - selection_start > selection_width:
                        # First position behind the selection, selection ends in this message
                        return start_message, start_index, i, j
            elif msg_sample_pos[-1] - selection_start < selection_width:
                # Message lies completely inside the selection
                continue
            else:
                # Selection ends in this message: find the first position behind it
                for j, sample_pos in enumerate(msg_sample_pos):
                    if sample_pos - selection_start > selection_width:
                        return start_message, start_index, i, j

        # Selection reaches up to (or beyond) the end of the last message
        last_message = len(self.messages) - 1
        last_index = len(self.messages[-1].plain_bits) + 1
        return start_message, start_index, last_message, last_index
416
417    def delete_messages(self, msg_start: int, msg_end: int, start: int, end: int, view: int, decoded: bool,
418                        update_label_ranges=True):
419        removable_msg_indices = []
420
421        for i in range(msg_start, msg_end + 1):
422            try:
423                bs, be = self.convert_range(start, end, view, 0, decoded, message_indx=i)
424                self.messages[i].clear_decoded_bits()
425                if update_label_ranges:
426                    del self.messages[i][bs:be + 1]
427                else:
428                    self.messages[i].delete_range_without_label_range_update(bs, be + 1)
429                if len(self.messages[i]) == 0:
430                    removable_msg_indices.append(i)
431            except IndexError:
432                continue
433
434        # Remove empty messages and Pause after empty message
435        for i in reversed(removable_msg_indices):
436            del self.messages[i]
437
438        return removable_msg_indices
439
440    def convert_index(self, index: int, from_view: int, to_view: int, decoded: bool, message_indx=-1) -> tuple:
441        """
442        Konvertiert einen Index aus der einen Sicht (z.B. Bit) in eine andere (z.B. Hex)
443
444        :param message_indx: if -1, the message with max length is chosen
445        :return:
446        """
447        if len(self.messages) == 0:
448            return 0, 0
449
450        if message_indx == -1:
451            message_indx = self.messages.index(max(self.messages, key=len))  # Longest message
452
453        if message_indx >= len(self.messages):
454            message_indx = len(self.messages) - 1
455
456        return self.messages[message_indx].convert_index(index, from_view, to_view, decoded)
457
458    def convert_range(self, index1: int, index2: int, from_view: int,
459                      to_view: int, decoded: bool, message_indx=-1):
460        if len(self.messages) == 0:
461            return 0, 0
462
463        if message_indx == -1:
464            message_indx = self.messages.index(max(self.messages, key=len))  # Longest message
465
466        if message_indx >= len(self.messages):
467            message_indx = len(self.messages) - 1
468
469        return self.messages[message_indx].convert_range(index1, index2, from_view, to_view, decoded)
470
471    def estimate_frequency_for_one(self, sample_rate: float, nbits=42) -> float:
472        """
473        Calculates the frequency of at most nbits logical ones and returns the mean of these frequencies
474
475        :param nbits:
476        :return:
477        """
478        return self.__estimate_frequency_for_bit(True, sample_rate, nbits)
479
480    def align_messages(self, pattern: str, view_type: int, use_decoded=True):
481        if view_type == 0:
482            bit_pattern = pattern
483        elif view_type == 1:
484            bit_pattern = "".join(map(str, urh_util.hex2bit(pattern)))
485        elif view_type == 2:
486            bit_pattern = "".join(map(str, urh_util.ascii2bit(pattern)))
487        else:
488            raise ValueError("Unknown view type {}".format(view_type))
489
490        indices = [msg.decoded_bits_str.find(bit_pattern) if use_decoded else msg.plain_bits_str.find(bit_pattern)
491                   for msg in self.messages]
492
493        max_index = max(indices)
494        for i, msg in enumerate(self.messages):
495            msg.alignment_offset = 0 if indices[i] == -1 else max_index - indices[i]
496
497    def estimate_frequency_for_zero(self, sample_rate: float, nbits=42) -> float:
498        """
499        Calculates the frequency of at most nbits logical zeros and returns the mean of these frequencies
500
501        :param nbits:
502        :return:
503        """
504        return self.__estimate_frequency_for_bit(False, sample_rate, nbits)
505
506    def __estimate_frequency_for_bit(self, bit: bool, sample_rate: float, nbits: int) -> float:
507        if nbits == 0:
508            return 0
509
510        assert self.signal is not None
511        frequencies = []
512        for i, message in enumerate(self.messages):
513            for j, msg_bit in enumerate(message.plain_bits):
514                if msg_bit == bit:
515                    start, num_samples = self.get_samplepos_of_bitseq(i, j, i, j + 1, False)
516                    freq = self.signal.estimate_frequency(start, start + num_samples, sample_rate)
517                    frequencies.append(freq)
518                    if len(frequencies) == nbits:
519                        return np.mean(frequencies)
520        if frequencies:
521            return np.mean(frequencies)
522        else:
523            return 0
524
525    def __str__(self):
526        return "ProtoAnalyzer " + self.name
527
    def set_labels(self, val):
        """
        Store val in the _protocol_labels attribute.

        NOTE(review): the protocol_labels property of this class is computed from
        message_types and does not read _protocol_labels - confirm this attribute is still used.
        """
        self._protocol_labels = val
530
531    def add_new_message_type(self, labels):
532        names = set(message_type.name for message_type in self.message_types)
533        name = "Message type #"
534        i = 0
535        while True:
536            i += 1
537            if name + str(i) not in names:
538                self.message_types.append(
539                    MessageType(name=name + str(i), iterable=[copy.deepcopy(lbl) for lbl in labels]))
540                break
541
542    def to_binary(self, filename: str, use_decoded: bool):
543        with open(filename, "wb") as f:
544            for msg in self.messages:
545                bits = msg.decoded_bits if use_decoded else msg.plain_bits
546                aggregated = urh_util.aggregate_bits(bits, size=8)
547                f.write(bytes(aggregated))
548
549    def from_binary(self, filename: str):
550        aggregated = np.fromfile(filename, dtype=np.uint8)
551        unaggregated = [int(b) for n in aggregated for b in "{0:08b}".format(n)]
552        self.messages.append(Message(unaggregated, 0, self.default_message_type))
553
554    def to_xml_tag(self, decodings, participants, tag_name="protocol",
555                   include_message_type=False, write_bits=False, messages=None, modulators=None) -> ET.Element:
556        root = ET.Element(tag_name)
557        messages = self.messages if messages is None else messages
558
559        # Save modulators
560        if modulators is not None:  # For protocol analyzer container
561            root.append(Modulator.modulators_to_xml_tag(modulators))
562
563        root.append(Encoding.decodings_to_xml_tag(decodings))
564        root.append(Participant.participants_to_xml_tag(participants))
565
566        # Save data
567        data_tag = ET.SubElement(root, "messages")
568        for i, message in enumerate(messages):
569            message_tag = message.to_xml(decoders=decodings,
570                                         include_message_type=include_message_type,
571                                         write_bits=write_bits)
572            data_tag.append(message_tag)
573
574        # Save message types separatively as not saved in messages already
575        if not include_message_type:
576            message_types_tag = ET.SubElement(root, "message_types")
577            for message_type in self.message_types:
578                message_types_tag.append(message_type.to_xml())
579
580        return root
581
582    def to_xml_file(self, filename: str, decoders, participants, tag_name="protocol",
583                    include_message_types=False, write_bits=False, modulators=None):
584        tag = self.to_xml_tag(decodings=decoders, participants=participants, tag_name=tag_name,
585                              include_message_type=include_message_types, write_bits=write_bits,
586                              modulators=modulators)
587
588        xmlstr = minidom.parseString(ET.tostring(tag)).toprettyxml(indent="   ")
589        with open(filename, "w") as f:
590            for line in xmlstr.split("\n"):
591                if line.strip():
592                    f.write(line + "\n")
593
594    def from_xml_tag(self, root: ET.Element, read_bits=False, participants=None, decodings=None):
595        if not root:
596            return None
597
598        decoders = Encoding.read_decoders_from_xml_tag(root) if decodings is None else decodings
599
600        if participants is None:
601            participants = Participant.read_participants_from_xml_tag(root)
602
603        if read_bits:
604            self.messages[:] = []
605
606        try:
607            message_types = []
608            for message_type_tag in root.find("message_types").findall("message_type"):
609                message_types.append(MessageType.from_xml(message_type_tag))
610        except AttributeError:
611            message_types = []
612
613        for message_type in message_types:
614            if message_type not in self.message_types:
615                self.message_types.append(message_type)
616
617        try:
618            message_tags = root.find("messages").findall("message")
619            for i, message_tag in enumerate(message_tags):
620                if read_bits:
621                    self.messages.append(Message.new_from_xml(tag=message_tag,
622                                                              participants=participants,
623                                                              decoders=decoders,
624                                                              message_types=self.message_types))
625                else:
626                    try:
627                        self.messages[i].from_xml(tag=message_tag, participants=participants,
628                                                  decoders=decoders, message_types=self.message_types)
629                    except IndexError:
630                        pass  # Part of signal was copied in last session but signal was not saved
631
632        except AttributeError:
633            pass
634
635    def from_xml_file(self, filename: str, read_bits=False):
636        try:
637            tree = ET.parse(filename)
638        except FileNotFoundError:
639            logger.error("Could not find file " + filename)
640            return
641        except ET.ParseError:
642            logger.error("Could not parse file " + filename)
643            return
644
645        root = tree.getroot()
646        self.from_xml_tag(root, read_bits=read_bits)
647
648    def eliminate(self):
649        self.message_types = None
650        self.messages = None
651        if self.signal is not None:
652            self.signal.eliminate()
653        self.signal = None
654
655    def update_auto_message_types(self):
656        for message in self.messages:
657            for message_type in filter(lambda m: m.assigned_by_ruleset and len(m.ruleset) > 0, self.message_types):
658                if message_type.ruleset.applies_for_message(message):
659                    message.message_type = message_type
660                    break
661
662    def auto_assign_labels(self):
663        from urh.awre.FormatFinder import FormatFinder
664        format_finder = FormatFinder(self.messages)
665        format_finder.run(max_iterations=10)
666
667        self.message_types[:] = format_finder.message_types
668        for msg_type, indices in format_finder.existing_message_types.items():
669            for i in indices:
670                self.messages[i].message_type = msg_type
671
672    @staticmethod
673    def get_protocol_from_string(message_strings: list, is_hex=None, default_pause=0, sample_rate=1e6):
674        """
675
676        :param message_strings:
677        :param is_hex: None means auto detects
678        :return:
679        """
680        protocol = ProtocolAnalyzer(None)
681
682        def parse_line(line: str):
683            # support transcript files e.g 1 (A->B): 10101111
684            index = line.rfind(" ")
685            line = line[index + 1:]
686
687            # support pauses given like 100101/10s
688            try:
689                data, pause = line.split(settings.PAUSE_SEP)
690            except ValueError:
691                data, pause = line, str(default_pause)
692            if pause.endswith("ms"):
693                pause = float(pause[:-2]) * float(sample_rate) / 1e3
694            elif pause.endswith("µs"):
695                pause = float(pause[:-2]) * float(sample_rate) / 1e6
696            elif pause.endswith("ns"):
697                pause = float(pause[:-2]) * float(sample_rate) / 1e9
698            elif pause.endswith("s"):
699                pause = float(pause[:-1]) * float(sample_rate)
700            else:
701                pause = float(pause)
702
703            return data, int(pause)
704
705        if not is_hex:
706            for line in filter(None, map(str.strip, message_strings)):
707                bits, pause = parse_line(line)
708                try:
709                    protocol.messages.append(Message.from_plain_bits_str(bits, pause=pause))
710                except ValueError:
711                    is_hex = True if is_hex is None else is_hex
712                    break
713
714        if is_hex:
715            protocol.messages.clear()
716            lookup = {"{0:0x}".format(i): "{0:04b}".format(i) for i in range(16)}
717
718            for line in filter(None, map(str.strip, message_strings)):
719                bits, pause = parse_line(line)
720                bit_str = [lookup[bits[i].lower()] for i in range(0, len(bits))]
721                protocol.messages.append(Message.from_plain_bits_str("".join(bit_str), pause=pause))
722
723        return protocol
724