import array
import copy
import xml.etree.ElementTree as ET
from typing import Optional
from xml.dom import minidom

import numpy as np
from PyQt5.QtCore import QObject, pyqtSignal, Qt

from urh import settings
from urh.cythonext import signal_functions
from urh.signalprocessing.Encoding import Encoding
from urh.signalprocessing.Message import Message
from urh.signalprocessing.MessageType import MessageType
from urh.signalprocessing.Modulator import Modulator
from urh.signalprocessing.Participant import Participant
from urh.signalprocessing.ProtocoLabel import ProtocolLabel
from urh.signalprocessing.Signal import Signal
from urh.util import util as urh_util, util
from urh.util.Logger import logger


class ProtocolAnalyzerSignals(QObject):
    """Qt signals emitted on behalf of a ProtocolAnalyzer."""

    protocol_updated = pyqtSignal()
    show_state_changed = pyqtSignal()
    sniff_device_errors_changed = pyqtSignal(str)
    line_duplicated = pyqtSignal()
    fuzzing_started = pyqtSignal(int)
    current_fuzzing_message_changed = pyqtSignal(int)
    fuzzing_finished = pyqtSignal()

    def __init__(self, parent=None):
        super().__init__(parent)


class ProtocolAnalyzer(object):
    """
    The ProtocolAnalyzer is what you would refer to as "protocol".
    The data is stored in the messages variable.
    This class offers several methods for protocol analysis.
    """

    def __init__(self, signal: Optional[Signal], filename=None):
        """
        :param signal: Signal the messages are demodulated from, or None
        :param filename: Explicit filename; may only be given when signal is None
        """
        self.messages = []  # type: list[Message]
        self.signal = signal
        if filename is None:
            self.filename = self.signal.filename if self.signal is not None else ""
        else:
            assert signal is None
            self.filename = filename

        self.__name = urh_util.get_name_from_filename(filename)  # Fallback if Signal has no Name

        self.show = Qt.Checked  # Show in Compare Frame?
        self.qt_signals = ProtocolAnalyzerSignals()

        self.decoder = Encoding(["Non Return To Zero (NRZ)"])  # For Default Encoding of Protocol

        self.message_types = [MessageType("Default")]

    @property
    def default_message_type(self) -> MessageType:
        """First entry of message_types; a "Default" type is created if the list is empty."""
        if len(self.message_types) == 0:
            self.message_types.append(MessageType("Default"))

        return self.message_types[0]

    @default_message_type.setter
    def default_message_type(self, val: MessageType):
        if len(self.message_types) > 0:
            self.message_types[0] = val
        else:
            self.message_types.append(val)

    @property
    def protocol_labels(self):
        """
        All labels of all message types, flattened into one list.

        :rtype: list of ProtocolLabel
        """
        return [lbl for message_type in self.message_types for lbl in message_type]

    def __deepcopy__(self, memo):
        """
        Deep copy every attribute except the signal (shared by reference)
        and the Qt signal holder (a fresh one is created for the copy).
        """
        cls = self.__class__
        result = cls.__new__(cls)
        memo[id(self)] = result
        for k, v in self.__dict__.items():
            if k != "qt_signals" and k != "signal":
                setattr(result, k, copy.deepcopy(v, memo))
        result.signal = self.signal
        result.qt_signals = ProtocolAnalyzerSignals()
        return result

    @property
    def name(self):
        """Name of the signal if there is one, otherwise the locally stored name."""
        return self.signal.name if self.signal is not None else self.__name

    @name.setter
    def name(self, val: str):
        if self.signal is None:
            self.__name = val
        else:
            self.signal.name = val

    @property
    def pauses(self):
        return [msg.pause for msg in self.messages]

    @property
    def plain_bits_str(self):
        return [str(msg) for msg in self.messages]

    @property
    def plain_hex_str(self):
        return [msg.plain_hex_str for msg in self.messages]

    @property
    def plain_ascii_str(self):
        return [msg.plain_ascii_str for msg in self.messages]

    @property
    def decoded_bits(self):
        return [msg.decoded_bits for msg in self.messages]

    @property
    def decoded_proto_bits_str(self):
        """

        :rtype: list of str
        """
        return [msg.decoded_bits_str for msg in self.messages]

    @property
    def decoded_hex_str(self):
        """

        :rtype: list of str
        """
        return [msg.decoded_hex_str for msg in self.messages]

    @property
    def decoded_ascii_str(self):
        """

        :rtype: list of str
        """
        return [msg.decoded_ascii_str for msg in self.messages]

    @property
    def num_messages(self):
        """Number of truthy entries in messages."""
        return sum(1 for msg in self.messages if msg)

    def clear_decoded_bits(self):
        """Call clear_decoded_bits() on every message."""
        for msg in self.messages:
            msg.clear_decoded_bits()

    def decoded_to_str_list(self, view_type):
        """
        :param view_type: 0 - Bits ## 1 - Hex ## 2 - ASCII
        :return: decoded messages in the requested view, None for any other view_type
        """
        if view_type == 0:
            return self.decoded_proto_bits_str
        elif view_type == 1:
            return self.decoded_hex_str
        elif view_type == 2:
            return self.decoded_ascii_str

    def plain_to_string(self, view: int, show_pauses=True) -> str:
        """
        Join the plain (not decoded) string of every message with newlines.

        :param view: 0 - Bits ## 1 - Hex ## 2 - ASCII
        :param show_pauses: forwarded to Message.view_to_string
        """
        time = settings.read('show_pause_as_time', type=bool)
        # A sample rate is only handed to the messages if pauses shall be shown as time
        if show_pauses and time and self.signal:
            srate = self.signal.sample_rate
        else:
            srate = None

        return '\n'.join(msg.view_to_string(view, False, show_pauses,
                                            sample_rate=srate
                                            ) for msg in self.messages)

    def plain_to_html(self, view, show_pauses=True) -> str:
        """
        Build an HTML string of all plain messages joined with <br>.
        Messages that have a participant are wrapped in a span colored with the participant's color.

        :param view: view type forwarded to Message.view_to_string
        """
        time = settings.read('show_pause_as_time', type=bool)
        if show_pauses and time and self.signal:
            srate = self.signal.sample_rate
        else:
            srate = None

        result = []
        for message in self.messages:
            cur_str = ""
            if message.participant:
                color = settings.PARTICIPANT_COLORS[message.participant.color_index]
                red, green, blue = color.red(), color.green(), color.blue()
                # Pick black or white text depending on the weighted brightness of the background
                fgcolor = "#000000" if (red * 0.299 + green * 0.587 + blue * 0.114) > 186 else "#ffffff"
                cur_str += '<span style="background-color: rgb({0},{1},{2}); color: {3}">'.format(red, green, blue,
                                                                                                   fgcolor)

            cur_str += message.view_to_string(view=view, decoded=False, show_pauses=False, sample_rate=srate)

            if message.participant:
                cur_str += '</span>'

            cur_str += message.get_pause_str(sample_rate=srate)
            result.append(cur_str)

        return "<br>".join(result)

    def set_decoder_for_messages(self, decoder: Encoding, messages=None):
        """
        Set decoder as the protocol's decoder and as decoder of the given messages.

        :param messages: messages to update, all messages of this protocol if None
        """
        messages = messages if messages is not None else self.messages
        self.decoder = decoder
        for message in messages:
            message.decoder = decoder

    def get_protocol_from_signal(self):
        """
        Rebuild the messages from the signal and emit protocol_updated.
        If there is no signal, messages is set to None and nothing is emitted.
        """
        signal = self.signal
        if signal is None:
            self.messages = None
            return

        # Clear in place so that existing references to the list stay valid
        if self.messages is not None:
            self.messages[:] = []
        else:
            self.messages = []

        samples_per_symbol = signal.samples_per_symbol

        ppseq = signal_functions.grab_pulse_lens(signal.qad, signal.center, signal.tolerance,
                                                 signal.modulation_type, signal.samples_per_symbol,
                                                 signal.bits_per_symbol, signal.center_spacing)

        bit_data, pauses, bit_sample_pos = self._ppseq_to_bits(ppseq, samples_per_symbol, self.signal.bits_per_symbol,
                                                               pause_threshold=signal.pause_threshold)
        if signal.message_length_divisor > 1 and signal.modulation_type == "ASK":
            self.__ensure_message_length_multiple(bit_data, signal.samples_per_symbol, pauses, bit_sample_pos,
                                                  signal.message_length_divisor)

        for i, (bits, pause) in enumerate(zip(bit_data, pauses)):
            # RSSI is taken over one symbol starting at the middle bit of the message
            middle_bit_pos = bit_sample_pos[i][int(len(bits) / 2)]
            start, end = middle_bit_pos, middle_bit_pos + samples_per_symbol
            rssi = np.mean(signal.iq_array.subarray(start, end).magnitudes_normalized)
            message = Message(bits, pause, message_type=self.default_message_type,
                              samples_per_symbol=samples_per_symbol, rssi=rssi, decoder=self.decoder,
                              bit_sample_pos=bit_sample_pos[i], bits_per_symbol=signal.bits_per_symbol)
            self.messages.append(message)

        self.qt_signals.protocol_updated.emit()

    @staticmethod
    def __ensure_message_length_multiple(bit_data, samples_per_symbol: int, pauses, bit_sample_pos, divisor: int):
        """
        In case of ASK modulation, this method tries to use pauses after messages as zero bits so that
        the bit lengths of messages are divisible by divisor

        The given lists are modified in place.

        :param bit_data: List of bit arrays
        :param samples_per_symbol: Symbol length that was used for demodulation
        :param pauses: List of pauses
        :param bit_sample_pos: List of Array of bit sample positions
        :param divisor: Divisor the messages should be divisible by
        """
        for i in range(len(bit_data)):
            missing_bits = (divisor - (len(bit_data[i]) % divisor)) % divisor
            # Only pad if the pause is long enough to hold the missing bits
            if missing_bits > 0 and pauses[i] >= samples_per_symbol * missing_bits:
                bit_data[i].extend([0] * missing_bits)
                pauses[i] = pauses[i] - missing_bits * samples_per_symbol

                try:
                    bit_sample_pos[i][-1] = bit_sample_pos[i][-2] + samples_per_symbol
                except IndexError as e:
                    logger.warning("Error padding message " + str(e))
                    continue

                bit_sample_pos[i].extend([bit_sample_pos[i][-1] + (k + 1) * samples_per_symbol
                                          for k in range(missing_bits - 1)])
                bit_sample_pos[i].append(bit_sample_pos[i][-1] + pauses[i])

    def _ppseq_to_bits(self, ppseq, samples_per_symbol: int, bits_per_symbol: int,
                       write_bit_sample_pos=True, pause_threshold=8):
        """
        Convert a pulse sequence to bit arrays, pauses and bit sample positions.

        :param ppseq: 2D indexable sequence; column 0 is read as pulse type (-1 is treated as pause),
                      column 1 as number of samples of the pulse
        :param samples_per_symbol: number of samples one symbol consists of
        :param bits_per_symbol: number of bits one symbol encodes
        :param write_bit_sample_pos: if False, no bit sample positions are recorded
        :param pause_threshold: pauses of at most this many symbols are written as zero bits,
                                0 means every pause is written as zero bits
        :return: list of bit arrays, array of pauses, list of bit sample position arrays
        """
        bit_sampl_pos = array.array("L", [])
        bit_sample_positions = []

        data_bits = array.array("B", [])
        resulting_data_bits = []
        pauses = array.array("L", [])
        start = 0
        total_samples = 0

        pause_type = -1

        there_was_data = False

        samples_per_bit = int(samples_per_symbol/bits_per_symbol)

        if len(ppseq) > 0 and ppseq[0, 0] == pause_type:
            start = 1  # Starts with Pause
            total_samples = ppseq[0, 1]

        for i in range(start, len(ppseq)):
            cur_pulse_type = ppseq[i, 0]
            num_samples = ppseq[i, 1]
            num_symbols_float = num_samples / samples_per_symbol
            num_symbols = int(num_symbols_float)
            decimal_place = num_symbols_float - num_symbols

            # Round up if more than half a symbol is left over
            if decimal_place > 0.5:
                num_symbols += 1

            if cur_pulse_type == pause_type:
                # OOK
                if num_symbols <= pause_threshold or pause_threshold == 0:
                    data_bits.extend([0] * (num_symbols * bits_per_symbol))
                    if write_bit_sample_pos:
                        bit_sampl_pos.extend([total_samples + k * samples_per_bit
                                              for k in range(num_symbols*bits_per_symbol)])

                elif not there_was_data:
                    # Ignore this pause, if there were no information
                    # transmitted previously
                    data_bits[:] = array.array("B", [])
                    bit_sampl_pos[:] = array.array("L", [])

                else:
                    # Pause ends the current message
                    if write_bit_sample_pos:
                        bit_sampl_pos.append(total_samples)
                        bit_sampl_pos.append(total_samples + num_samples)
                        bit_sample_positions.append(bit_sampl_pos[:])
                        bit_sampl_pos[:] = array.array("L", [])

                    resulting_data_bits.append(data_bits[:])
                    data_bits[:] = array.array("B", [])
                    pauses.append(num_samples)
                    there_was_data = False
            else:
                data_bits.extend(util.number_to_bits(cur_pulse_type, bits_per_symbol) * num_symbols)
                if not there_was_data and num_symbols > 0:
                    there_was_data = True
                if write_bit_sample_pos:
                    bit_sampl_pos.extend([total_samples + k * samples_per_bit
                                          for k in range(num_symbols*bits_per_symbol)])

            total_samples += num_samples

        if there_was_data:
            # Sequence did not end with a message-terminating pause, flush the remaining bits
            resulting_data_bits.append(data_bits[:])
            if write_bit_sample_pos:
                bit_sample_positions.append(bit_sampl_pos[:] + array.array("L", [total_samples]))
            pause = ppseq[-1, 1] if ppseq[-1, 0] == pause_type else 0
            pauses.append(pause)

        return resulting_data_bits, pauses, bit_sample_positions

    def get_samplepos_of_bitseq(self, start_message: int, start_index: int, end_message: int, end_index: int,
                                include_pause: bool):
        """
        Determine on which place (regarding samples) a bit sequence is

        Indices beyond the last bit sample position of a message are clamped.

        :rtype: tuple[int,int]
        :return: start sample and number of samples, (-1, -1) if a message or index does not exist
        """
        try:
            if start_message > end_message:
                start_message, end_message = end_message, start_message

            if start_index >= len(self.messages[start_message].bit_sample_pos) - 1:
                start_index = len(self.messages[start_message].bit_sample_pos) - 1
                if not include_pause:
                    start_index -= 1

            if end_index >= len(self.messages[end_message].bit_sample_pos) - 1:
                end_index = len(self.messages[end_message].bit_sample_pos) - 1
                if not include_pause:
                    end_index -= 1

            start = self.messages[start_message].bit_sample_pos[start_index]
            num_samples = self.messages[end_message].bit_sample_pos[end_index] - start

            return start, num_samples
        except (KeyError, IndexError):
            return -1, -1

    def get_bitseq_from_selection(self, selection_start: int, selection_width: int):
        """
        get start and end index of bit sequence from selected samples

        :rtype: tuple[int,int,int,int]
        :return: start_message index, start index, end message index, end index
        """
        start_message, start_index, end_message, end_index = -1, -1, -1, -1
        if not self.messages or not self.messages[0].bit_sample_pos:
            return start_message, start_index, end_message, end_index

        # Selection ends before the first message begins
        if selection_start + selection_width < self.messages[0].bit_sample_pos[0]:
            return start_message, start_index, end_message, end_index

        for i, msg in enumerate(self.messages):
            msg_sample_pos = msg.bit_sample_pos
            if msg_sample_pos[-2] < selection_start:
                continue
            elif start_message == -1:
                start_message = i
                for j, sample_pos in enumerate(msg_sample_pos):
                    if sample_pos < selection_start:
                        continue
                    elif start_index == -1:
                        start_index = j
                        if msg_sample_pos[-1] - selection_start < selection_width:
                            break
                    elif sample_pos - selection_start > selection_width:
                        return start_message, start_index, i, j
            elif msg_sample_pos[-1] - selection_start < selection_width:
                continue
            else:
                for j, sample_pos in enumerate(msg_sample_pos):
                    if sample_pos - selection_start > selection_width:
                        return start_message, start_index, i, j

        # Selection reaches beyond the last message
        last_message = len(self.messages) - 1
        last_index = len(self.messages[-1].plain_bits) + 1
        return start_message, start_index, last_message, last_index

    def delete_messages(self, msg_start: int, msg_end: int, start: int, end: int, view: int, decoded: bool,
                        update_label_ranges=True):
        """
        Delete the range start..end (given in view) from the messages msg_start..msg_end (inclusive).
        Messages that are empty afterwards are removed from the protocol.

        :param update_label_ranges: if False, the range is deleted without updating label ranges
        :return: indices of the messages that were removed completely
        """
        removable_msg_indices = []

        for i in range(msg_start, msg_end + 1):
            try:
                bs, be = self.convert_range(start, end, view, 0, decoded, message_indx=i)
                self.messages[i].clear_decoded_bits()
                if update_label_ranges:
                    del self.messages[i][bs:be + 1]
                else:
                    self.messages[i].delete_range_without_label_range_update(bs, be + 1)
                if len(self.messages[i]) == 0:
                    removable_msg_indices.append(i)
            except IndexError:
                continue

        # Remove empty messages and Pause after empty message
        for i in reversed(removable_msg_indices):
            del self.messages[i]

        return removable_msg_indices

    def convert_index(self, index: int, from_view: int, to_view: int, decoded: bool, message_indx=-1) -> tuple:
        """
        Convert an index from one view (e.g. bit) to another view (e.g. hex)

        :param message_indx: if -1, the message with max length is chosen
        :return: result of Message.convert_index, (0, 0) if there are no messages
        """
        if len(self.messages) == 0:
            return 0, 0

        if message_indx == -1:
            message_indx = self.messages.index(max(self.messages, key=len))  # Longest message

        if message_indx >= len(self.messages):
            message_indx = len(self.messages) - 1

        return self.messages[message_indx].convert_index(index, from_view, to_view, decoded)

    def convert_range(self, index1: int, index2: int, from_view: int,
                      to_view: int, decoded: bool, message_indx=-1):
        """
        Convert a range from one view to another view

        :param message_indx: if -1, the message with max length is chosen
        :return: result of Message.convert_range, (0, 0) if there are no messages
        """
        if len(self.messages) == 0:
            return 0, 0

        if message_indx == -1:
            message_indx = self.messages.index(max(self.messages, key=len))  # Longest message

        if message_indx >= len(self.messages):
            message_indx = len(self.messages) - 1

        return self.messages[message_indx].convert_range(index1, index2, from_view, to_view, decoded)

    def estimate_frequency_for_one(self, sample_rate: float, nbits=42) -> float:
        """
        Calculates the frequency of at most nbits logical ones and returns the mean of these frequencies

        :param nbits:
        :return:
        """
        return self.__estimate_frequency_for_bit(True, sample_rate, nbits)

    def align_messages(self, pattern: str, view_type: int, use_decoded=True):
        """
        Set the alignment offset of every message so that the first occurrence of pattern lines up.
        Messages that do not contain the pattern get an alignment offset of 0.

        :param pattern: pattern given in the view described by view_type
        :param view_type: 0 - Bits ## 1 - Hex ## 2 - ASCII
        :param use_decoded: search in decoded bits if True, in plain bits otherwise
        :raises ValueError: if view_type is unknown
        """
        if view_type == 0:
            bit_pattern = pattern
        elif view_type == 1:
            bit_pattern = "".join(map(str, urh_util.hex2bit(pattern)))
        elif view_type == 2:
            bit_pattern = "".join(map(str, urh_util.ascii2bit(pattern)))
        else:
            raise ValueError("Unknown view type {}".format(view_type))

        indices = [msg.decoded_bits_str.find(bit_pattern) if use_decoded else msg.plain_bits_str.find(bit_pattern)
                   for msg in self.messages]

        if not indices:
            # No messages: nothing to align, and max() would raise on an empty list
            return

        max_index = max(indices)
        for i, msg in enumerate(self.messages):
            msg.alignment_offset = 0 if indices[i] == -1 else max_index - indices[i]

    def estimate_frequency_for_zero(self, sample_rate: float, nbits=42) -> float:
        """
        Calculates the frequency of at most nbits logical zeros and returns the mean of these frequencies

        :param nbits:
        :return:
        """
        return self.__estimate_frequency_for_bit(False, sample_rate, nbits)

    def __estimate_frequency_for_bit(self, bit: bool, sample_rate: float, nbits: int) -> float:
        """
        Mean of the estimated frequencies of the first nbits occurrences of bit in the plain bits.

        :return: mean frequency, 0 if nbits is 0 or the bit does not occur
        """
        if nbits == 0:
            return 0

        assert self.signal is not None
        frequencies = []
        for i, message in enumerate(self.messages):
            for j, msg_bit in enumerate(message.plain_bits):
                if msg_bit == bit:
                    start, num_samples = self.get_samplepos_of_bitseq(i, j, i, j + 1, False)
                    freq = self.signal.estimate_frequency(start, start + num_samples, sample_rate)
                    frequencies.append(freq)
                    if len(frequencies) == nbits:
                        return np.mean(frequencies)
        if frequencies:
            return np.mean(frequencies)
        else:
            return 0

    def __str__(self):
        return "ProtoAnalyzer " + self.name

    def set_labels(self, val):
        # NOTE(review): _protocol_labels is not read anywhere in this file;
        # the protocol_labels property is derived from message_types instead.
        self._protocol_labels = val

    def add_new_message_type(self, labels):
        """
        Append a message type named "Message type #<i>" with the smallest i >= 1 that is not taken yet.

        :param labels: labels that are deep copied into the new message type
        """
        names = set(message_type.name for message_type in self.message_types)
        name = "Message type #"
        i = 0
        while True:
            i += 1
            if name + str(i) not in names:
                self.message_types.append(
                    MessageType(name=name + str(i), iterable=[copy.deepcopy(lbl) for lbl in labels]))
                break

    def to_binary(self, filename: str, use_decoded: bool):
        """
        Write the bits of all messages, aggregated with size 8, to a binary file.

        :param use_decoded: write decoded bits if True, plain bits otherwise
        """
        with open(filename, "wb") as f:
            for msg in self.messages:
                bits = msg.decoded_bits if use_decoded else msg.plain_bits
                aggregated = urh_util.aggregate_bits(bits, size=8)
                f.write(bytes(aggregated))

    def from_binary(self, filename: str):
        """Read a binary file and append its content as one single message with pause 0."""
        aggregated = np.fromfile(filename, dtype=np.uint8)
        unaggregated = [int(b) for n in aggregated for b in "{0:08b}".format(n)]
        self.messages.append(Message(unaggregated, 0, self.default_message_type))

    def to_xml_tag(self, decodings, participants, tag_name="protocol",
                   include_message_type=False, write_bits=False, messages=None, modulators=None) -> ET.Element:
        """
        Serialize the protocol to an XML element.

        :param messages: messages to serialize, all messages of this protocol if None
        :param modulators: if not None, the modulators are saved as well
        :param include_message_type: forwarded to Message.to_xml; if False, the message types
                                     are saved in a separate message_types tag
        """
        root = ET.Element(tag_name)
        messages = self.messages if messages is None else messages

        # Save modulators
        if modulators is not None:  # For protocol analyzer container
            root.append(Modulator.modulators_to_xml_tag(modulators))

        root.append(Encoding.decodings_to_xml_tag(decodings))
        root.append(Participant.participants_to_xml_tag(participants))

        # Save data
        data_tag = ET.SubElement(root, "messages")
        for message in messages:
            message_tag = message.to_xml(decoders=decodings,
                                         include_message_type=include_message_type,
                                         write_bits=write_bits)
            data_tag.append(message_tag)

        # Save message types separately as not saved in messages already
        if not include_message_type:
            message_types_tag = ET.SubElement(root, "message_types")
            for message_type in self.message_types:
                message_types_tag.append(message_type.to_xml())

        return root

    def to_xml_file(self, filename: str, decoders, participants, tag_name="protocol",
                    include_message_types=False, write_bits=False, modulators=None):
        """Serialize the protocol with to_xml_tag and write it pretty printed to filename."""
        tag = self.to_xml_tag(decodings=decoders, participants=participants, tag_name=tag_name,
                              include_message_type=include_message_types, write_bits=write_bits,
                              modulators=modulators)

        xmlstr = minidom.parseString(ET.tostring(tag)).toprettyxml(indent=" ")
        # The XML declaration written by minidom names no encoding, so parsers assume UTF-8.
        # Write UTF-8 explicitly instead of relying on the platform default encoding.
        with open(filename, "w", encoding="utf-8") as f:
            for line in xmlstr.split("\n"):
                if line.strip():  # skip blank lines
                    f.write(line + "\n")

    def from_xml_tag(self, root: ET.Element, read_bits=False, participants=None, decodings=None):
        """
        Load message types and messages from an XML element.

        :param root: element to read from; nothing happens if it is None or has no children
        :param read_bits: if True, existing messages are discarded and new messages are created from
                          the XML, otherwise the existing messages are updated by position
        :param participants: participants to use, read from root if None
        :param decodings: decodings to use, read from root if None
        """
        # Explicit form of the deprecated truth value test of an Element
        if root is None or len(root) == 0:
            return None

        decoders = Encoding.read_decoders_from_xml_tag(root) if decodings is None else decodings

        if participants is None:
            participants = Participant.read_participants_from_xml_tag(root)

        if read_bits:
            self.messages[:] = []

        try:
            message_types = []
            for message_type_tag in root.find("message_types").findall("message_type"):
                message_types.append(MessageType.from_xml(message_type_tag))
        except AttributeError:
            # e.g. there is no message_types tag so that find() returned None
            message_types = []

        for message_type in message_types:
            if message_type not in self.message_types:
                self.message_types.append(message_type)

        try:
            message_tags = root.find("messages").findall("message")
            for i, message_tag in enumerate(message_tags):
                if read_bits:
                    self.messages.append(Message.new_from_xml(tag=message_tag,
                                                              participants=participants,
                                                              decoders=decoders,
                                                              message_types=self.message_types))
                else:
                    try:
                        self.messages[i].from_xml(tag=message_tag, participants=participants,
                                                  decoders=decoders, message_types=self.message_types)
                    except IndexError:
                        pass  # Part of signal was copied in last session but signal was not saved

        except AttributeError:
            pass

    def from_xml_file(self, filename: str, read_bits=False):
        """Parse filename and load it with from_xml_tag. Errors are logged, not raised."""
        try:
            tree = ET.parse(filename)
        except FileNotFoundError:
            logger.error("Could not find file " + filename)
            return
        except ET.ParseError:
            logger.error("Could not parse file " + filename)
            return

        root = tree.getroot()
        self.from_xml_tag(root, read_bits=read_bits)

    def eliminate(self):
        """Drop the references to message types, messages and signal; the signal is eliminated as well."""
        self.message_types = None
        self.messages = None
        if self.signal is not None:
            self.signal.eliminate()
            self.signal = None

    def update_auto_message_types(self):
        """
        Assign to every message the first message type that is assigned by ruleset,
        has a non empty ruleset and whose ruleset applies for the message.
        """
        for message in self.messages:
            for message_type in filter(lambda m: m.assigned_by_ruleset and len(m.ruleset) > 0, self.message_types):
                if message_type.ruleset.applies_for_message(message):
                    message.message_type = message_type
                    break

    def auto_assign_labels(self):
        """Run the FormatFinder on the messages and take over its message types."""
        from urh.awre.FormatFinder import FormatFinder
        format_finder = FormatFinder(self.messages)
        format_finder.run(max_iterations=10)

        self.message_types[:] = format_finder.message_types
        for msg_type, indices in format_finder.existing_message_types.items():
            for i in indices:
                self.messages[i].message_type = msg_type

    @staticmethod
    def get_protocol_from_string(message_strings: list, is_hex=None, default_pause=0, sample_rate=1e6):
        """
        Create a protocol from a list of strings, one message per non empty string.

        :param message_strings: lines with bits or hex values, optionally followed by a pause
        :param is_hex: None means auto detects
        :param default_pause: pause for lines that do not specify one
        :param sample_rate: sample rate used to convert pauses given as time to samples
        :return: new ProtocolAnalyzer without signal
        """
        protocol = ProtocolAnalyzer(None)

        def parse_line(line: str):
            # support transcript files e.g 1 (A->B): 10101111
            index = line.rfind(" ")
            line = line[index + 1:]

            # support pauses given like 100101/10s
            try:
                data, pause = line.split(settings.PAUSE_SEP)
            except ValueError:
                data, pause = line, str(default_pause)
            # "s" must be checked last as all other units end with it
            if pause.endswith("ms"):
                pause = float(pause[:-2]) * float(sample_rate) / 1e3
            elif pause.endswith("µs"):
                pause = float(pause[:-2]) * float(sample_rate) / 1e6
            elif pause.endswith("ns"):
                pause = float(pause[:-2]) * float(sample_rate) / 1e9
            elif pause.endswith("s"):
                pause = float(pause[:-1]) * float(sample_rate)
            else:
                pause = float(pause)

            return data, int(pause)

        if not is_hex:
            for line in filter(None, map(str.strip, message_strings)):
                bits, pause = parse_line(line)
                try:
                    protocol.messages.append(Message.from_plain_bits_str(bits, pause=pause))
                except ValueError:
                    # Not a bit string: fall back to hex if auto detection was requested
                    is_hex = True if is_hex is None else is_hex
                    break

        if is_hex:
            protocol.messages.clear()
            lookup = {"{0:0x}".format(i): "{0:04b}".format(i) for i in range(16)}

            for line in filter(None, map(str.strip, message_strings)):
                bits, pause = parse_line(line)
                bit_str = [lookup[char.lower()] for char in bits]
                protocol.messages.append(Message.from_plain_bits_str("".join(bit_str), pause=pause))

        return protocol