1#!/usr/bin/python2.5 2# 3# Copyright 2009 Olivier Gillet. 4# 5# Author: Olivier Gillet (ol.gillet@gmail.com) 6# 7# This program is free software: you can redistribute it and/or modify 8# it under the terms of the GNU General Public License as published by 9# the Free Software Foundation, either version 3 of the License, or 10# (at your option) any later version. 11# This program is distributed in the hope that it will be useful, 12# but WITHOUT ANY WARRANTY; without even the implied warranty of 13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14# GNU General Public License for more details. 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18# ----------------------------------------------------------------------------- 19# 20# Midifile Writer and Reader. 21 22"""Midifile writer. 23""" 24 25import bisect 26import math 27import struct 28 29 30def PackInteger(value, size=4): 31 """Packs a python integer into a n-byte big endian byte sequence.""" 32 return struct.pack('>%s' % {1: 'B', 2: 'H', 4: 'L'}[size], value) 33 34 35def UnpackInteger(value, size=4): 36 """Packs a python integer into a n-byte big endian byte sequence.""" 37 return struct.unpack('>%s' % {1: 'B', 2: 'H', 4: 'L'}[size], value)[0] 38 39 40def PackVariableLengthInteger(value): 41 """Packs a python integer into a variable length byte sequence.""" 42 if value == 0: 43 return '\x00' 44 s = value 45 output = [] 46 while value: 47 to_write = value & 0x7f 48 value = value >> 7 49 output.insert(0, to_write) 50 for i in xrange(len(output) - 1): 51 output[i] |= 0x80 52 output = ''.join(map(chr, output)) 53 return output 54 55"""Classes representing a MIDI event, with a Serialize method which encodes 56them into a string.""" 57 58class Event(object): 59 def __init__(self): 60 pass 61 62 def Serialize(self, running_status): 63 raise NotImplementedError 64 65 66class MetaEvent(Event): 67 def __init__(self, id, data): 68 assert len(data) < 256 69 self.id = id 70 self.data = data 71 72 def Serialize(self, running_status): 73 return ''.join([ 74 '\xff', 75 PackInteger(self.id, size=1), 76 PackInteger(len(self.data), size=1), 77 self.data]), None 78 79 80class TextEvent(MetaEvent): 81 def __init__(self, text): 82 self.text = text 83 super(TextEvent, self).__init__(0x01, text) 84 85 86class CopyrightInfoEvent(MetaEvent): 87 def __init__(self, text): 88 self.text = text 89 super(CopyrightInfoEvent, self).__init__(0x02, text) 90 91 92class TrackNameEvent(MetaEvent): 93 def __init__(self, text): 94 self.text = text 95 super(TrackNameEvent, self).__init__(0x03, text) 96 97 98class TrackInstrumentNameEvent(MetaEvent): 99 def __init__(self, text): 100 self.text = text 101 super(TrackInstrumentNameEvent, self).__init__(0x04, text) 102 103 104class LyricEvent(MetaEvent): 105 def __init__(self, text): 106 self.text = text 107 super(LyricEvent, self).__init__(0x05, text) 108 109 110class MarkerEvent(MetaEvent): 111 def __init__(self, text): 112 self.text = text 113 super(MarkerEvent, self).__init__(0x06, text) 114 115 116class CuePointEvent(MetaEvent): 117 def __init__(self, text): 118 self.text = text 119 super(CuePointEvent, self).__init__(0x07, text) 120 121 122class EndOfTrackEvent(MetaEvent): 123 def __init__(self): 124 super(EndOfTrackEvent, self).__init__(0x2f, '') 125 126 127class TempoEvent(MetaEvent): 128 def __init__(self, bpm): 129 self.bpm = bpm 130 value = 60000000.0 / bpm 131 data = PackInteger(int(value), size=4)[1:] 132 super(TempoEvent, self).__init__(0x51, data) 133 134 135class SMPTEOffsetEvent(MetaEvent): 136 def __init__(self, h, m, s, f, sf): 137 self.smpte_offset = (h, m, s, f, sf) 138 data = ''.join(map(chr, [h, m, s, f, sf])) 139 super(SMPTEOffsetEvent, self).__init__(0x54, data) 140 141 142class TimeSignatureEvent(MetaEvent): 143 def __init__(self, numerator, denominator): 144 self.numerator = numerator 145 self.denominator = denominator 146 data = ''.join([ 147 PackInteger(numerator, size=1), 148 PackInteger(int(math.log(denominator) / math.log(2)), size=1), 149 '\x16\x08']) 150 super(TimeSignatureEvent, self).__init__(0x58, data) 151 152 153class KeyEvent(MetaEvent): 154 def __init__(self, sharp_flats, major_minor): 155 self.sharp_flats = sharp_flats 156 self.major_minor = major_minor 157 data = ''.join([ 158 PackInteger(sharp_flats, size=1), 159 PackInteger(major_minor, size=1)]) 160 super(KeyEvent, self).__init__(0x59, data) 161 162 163class BlobEvent(MetaEvent): 164 def __init__(self, blob): 165 self.data = blob 166 super(BlobEvent, self).__init__(0x7f, blob) 167 168 169"""Classic channel-oriented messages.""" 170 171class ChannelEvent(Event): 172 def __init__(self, mask, channel, data): 173 self.channel = channel 174 self._status = mask | (channel - 1) 175 self._data = PackInteger(self._status, size=1) + data 176 177 def Serialize(self, running_status): 178 if self._status == running_status: 179 return self._data[1:], self._status 180 else: 181 return self._data, self._status 182 183 184class NoteOffEvent(ChannelEvent): 185 def __init__(self, channel, note, velocity): 186 data = PackInteger(note, size=1) + PackInteger(velocity, size=1) 187 super(NoteOffEvent, self).__init__(0x80, channel, data) 188 self.note = note 189 self.velocity = velocity 190 191 192class NoteOnEvent(ChannelEvent): 193 def __init__(self, channel, note, velocity): 194 data = PackInteger(note, size=1) + PackInteger(velocity, size=1) 195 super(NoteOnEvent, self).__init__(0x90, channel, data) 196 self.note = note 197 self.velocity = velocity 198 199 200class KeyAftertouchEvent(ChannelEvent): 201 def __init__(self, channel, note, aftertouch): 202 data = PackInteger(note, size=1) + PackInteger(aftertouch, size=1) 203 super(KeyAftertouchEvent, self).__init__(0xa0, channel, data) 204 self.note = note 205 self.aftertouch = aftertouch 206 207 208class ControlChangeEvent(ChannelEvent): 209 def __init__(self, channel, controller, value): 210 data = PackInteger(controller, size=1) + PackInteger(value, size=1) 211 super(ControlChangeEvent, self).__init__(0xb0, channel, data) 212 self.controller = controller 213 self.value = value 214 215 216class ProgramChangeEvent(ChannelEvent): 217 def __init__(self, channel, program_number): 218 data = PackInteger(program_number, size=1) 219 super(ProgramChangeEvent, self).__init__(0xc0, channel, data) 220 self.program_number = program_number 221 222 223class ChannelAftertouchEvent(ChannelEvent): 224 def __init__(self, channel, aftertouch): 225 data = PackInteger(aftertouch, size=1) 226 super(ChannelAftertouchEvent, self).__init__(0xd0, channel, data) 227 self.aftertouch = aftertouch 228 229 230class PitchBendEvent(ChannelEvent): 231 def __init__(self, channel, pitch_bend_14_bits): 232 data = PackInteger(pitch_bend_14_bits >> 7, size=1) + \ 233 PackInteger(pitch_bend_14_bits & 0x7f, size=1) 234 super(PitchBendEvent, self).__init__(0xe0, channel, data) 235 self.pitch_bend = pitch_bend_14_bits 236 237 238class SystemEvent(Event): 239 def __init__(self, id): 240 self._id = id 241 242 def Serialize(self, running_status): 243 return PackInteger(self._id, size=1), running_status 244 245 246class ClockEvent(SystemEvent): 247 def __init__(self): 248 super(ClockEvent, self).__init__(0xf8) 249 250 251class StartEvent(SystemEvent): 252 def __init__(self): 253 super(StartEvent, self).__init__(0xfa) 254 255 256class ContinueEvent(SystemEvent): 257 def __init__(self): 258 super(ContinueEvent, self).__init__(0xfb) 259 260 261class StopEvent(SystemEvent): 262 def __init__(self): 263 super(StopEvent, self).__init__(0xfc) 264 265 266# TODO(pichenettes): also support pauses within a block transmission (F7) 267class SysExEvent(Event): 268 def __init__(self, manufacturer_id, device_id, data): 269 self.data = data 270 self.message = ''.join([ 271 manufacturer_id, 272 device_id, 273 data, 274 '\xf7']) 275 self.raw_message = '\xf0' + self.message 276 assert all(ord(x) < 128 for x in self.message[:-1]) 277 self.message = ''.join([ 278 '\xf0', 279 PackVariableLengthInteger(len(self.message)), 280 self.message]) 281 282 def Serialize(self, running_status): 283 return self.message, None 284 285 286def Nibblize(data, add_checksum=True): 287 """Converts a byte string into a nibble string. Also adds checksum""" 288 output = [] 289 if add_checksum: 290 tail = [chr(sum(ord(char) for char in data) % 256)] 291 else: 292 tail = [] 293 for char in map(ord, list(data) + tail): 294 output.append(chr(char >> 4)) 295 output.append(chr(char & 0x0f)) 296 return ''.join(output) 297 298 299class Track(object): 300 def __init__(self): 301 self._events = [] 302 303 def AddEvent(self, time, event): 304 self._events.append((time, event)) 305 306 def Sort(self): 307 self._events = sorted(self._events) 308 309 def Serialize(self): 310 self.Sort() 311 last_time, last_event = self._events[-1] 312 if type(last_event) != EndOfTrackEvent: 313 self._events.append((last_time + 1, EndOfTrackEvent())) 314 data = [] 315 current_time = 0 316 running_status = None 317 for time, event in self._events: 318 delta = time - current_time 319 data.append(PackVariableLengthInteger(delta)) 320 event_data, running_status = event.Serialize(running_status) 321 data.append(event_data) 322 current_time = time 323 return ''.join(data) 324 325 def Write(self, file_object): 326 file_object.write('MTrk') 327 track_data = self.Serialize() 328 file_object.write(PackInteger(len(track_data))) 329 file_object.write(track_data) 330 331 @property 332 def events(self): 333 return self._events 334 335 336class Writer(object): 337 def __init__(self, ppq=96): 338 self._tracks = [] 339 self._ppq = ppq 340 341 def AddTrack(self): 342 new_track = Track() 343 self._tracks.append(new_track) 344 return new_track 345 346 def _MergeTracks(self): 347 new_track = Track() 348 for track in self._tracks: 349 for time_event in track.events: 350 new_track.AddEvent(*time_event) 351 new_track.Sort() 352 return new_track 353 354 def Write(self, file_object, format=0): 355 tracks = self._tracks 356 if format == 0: 357 tracks = [self._MergeTracks()] 358 359 # File header. 360 file_object.write('MThd') 361 file_object.write(PackInteger(6)) 362 file_object.write(PackInteger(format, size=2)) 363 if format == 0: 364 file_object.write(PackInteger(1, size=2)) 365 else: 366 file_object.write(PackInteger(len(self._tracks), size=2)) 367 file_object.write(PackInteger(self._ppq, size=2)) 368 369 # Tracks. 370 for track in tracks: 371 track.Write(file_object) 372 373 374class Reader(object): 375 def __init__(self): 376 self.tracks = [] 377 self.format = 0 378 self.ppq = 96 379 self._previous_status = 0 380 381 def Read(self, f): 382 assert f.read(4) == 'MThd' 383 assert struct.unpack('>i', f.read(4))[0] == 6 384 self.format = struct.unpack('>h', f.read(2))[0] 385 assert self.format <= 2 386 num_tracks = struct.unpack('>h', f.read(2))[0] 387 self.ppq = struct.unpack('>h', f.read(2))[0] 388 self._tempo_map = [] 389 390 for i in xrange(num_tracks): 391 self.tracks.append(self._ReadTrack(f)) 392 self._CreateCumulativeTempoMap() 393 394 395 def _ReadTrack(self, f): 396 assert f.read(4) == 'MTrk' 397 size = struct.unpack('>i', f.read(4))[0] 398 t = 0 399 events = [] 400 while size > 0: 401 delta_t, e, event_size = self._ReadEvent(f) 402 t += delta_t 403 if e: 404 events.append((t, e)) 405 if type(e) == TempoEvent: 406 self._tempo_map.append((t, e.bpm)) 407 size -= event_size 408 return events 409 410 def _CreateCumulativeTempoMap(self): 411 t = 0.0 412 current_tempo = 120.0 413 previous_beat = 0 414 cumulative_tempo_map = [(0, 0.0, current_tempo)] 415 for beat, tempo in sorted(self._tempo_map): 416 beats = float(beat - previous_beat) / self.ppq 417 t += beats * 60.0 / current_tempo 418 cumulative_tempo_map.append((beat, t, tempo)) 419 current_tempo = tempo 420 previous_beat = beat 421 self._tempo_map = cumulative_tempo_map 422 423 def AbsoluteTime(self, t): 424 index = bisect.bisect_left(self._tempo_map, (t, 0, 0)) 425 index = max(index - 1, 0) 426 start_beat, start_seconds, tempo = self._tempo_map[index] 427 return start_seconds + float(t - start_beat) / self.ppq * 60.0 / tempo 428 429 def _ReadVariableLengthInteger(self, f): 430 v = 0 431 size = 0 432 while True: 433 v <<= 7 434 byte = UnpackInteger(f.read(1), size=1) 435 size += 1 436 v |= (byte & 0x7f) 437 if not (byte & 0x80): 438 break 439 return v, size 440 441 def _ReadEvent(self, f): 442 delta_t, size = self._ReadVariableLengthInteger(f) 443 event_byte = ord(f.read(1)) 444 size += 1 445 if event_byte < 0x80: 446 if self._previous_status: 447 f.seek(f.tell() - 1) 448 size -= 1 449 event_byte = self._previous_status 450 else: 451 return delta_t, None, size 452 453 event_type = event_byte & 0xf0 454 channel = event_byte & 0xf 455 channel += 1 456 if event_type == 0x80: 457 self._previous_status = event_type 458 note = ord(f.read(1)) 459 velo = ord(f.read(1)) 460 event = NoteOffEvent(channel, note, velo) 461 size += 2 462 elif event_type == 0x90: 463 self._previous_status = event_type 464 event = NoteOnEvent(channel, ord(f.read(1)), ord(f.read(1))) 465 size += 2 466 elif event_type == 0xa0: 467 self._previous_status = event_type 468 event = KeyAftertouchEvent(channel, ord(f.read(1)), ord(f.read(1))) 469 size += 2 470 elif event_type == 0xb0: 471 self._previous_status = event_type 472 event = ControlChangeEvent(channel, ord(f.read(1)), ord(f.read(1))) 473 size += 2 474 elif event_type == 0xc0: 475 self._previous_status = event_type 476 event = ProgramChangeEvent(channel, ord(f.read(1))) 477 size += 1 478 elif event_type == 0xd0: 479 self._previous_status = event_type 480 event = ChannelAftertouchEvent(channel, ord(f.read(1))) 481 size += 1 482 elif event_type == 0xe0: 483 self._previous_status = event_type 484 event = PitchBendEvent(channel, (ord(f.read(1)) << 7) | ord(f.read(1))) 485 size += 2 486 elif event_byte == 0xff: 487 event_type = ord(f.read(1)) 488 size += 1 489 event_size, event_size_size = self._ReadVariableLengthInteger(f) 490 size += event_size_size 491 bytes = f.read(event_size) 492 size += event_size 493 if event_type == 0x01: 494 event = TextEvent(bytes) 495 elif event_type == 0x02: 496 event = CopyrightEvent(bytes) 497 elif event_type == 0x03: 498 event = TrackNameEvent(bytes) 499 elif event_type == 0x04: 500 event = TrackInstrumentNameEvent(bytes) 501 elif event_type == 0x05: 502 event = LyricEvent(bytes) 503 elif event_type == 0x06: 504 event = MarkerEvent(bytes) 505 elif event_type == 0x07: 506 event = CuePointEvent(bytes) 507 elif event_type == 0x20: 508 current_channel = ord(bytes[0]) 509 event = None 510 elif event_type == 0x2f: 511 event = EndOfTrackEvent() 512 elif event_type == 0x51: 513 value = UnpackInteger('\x00' + bytes, size=4) 514 event = TempoEvent(60000000.0 / value) 515 elif event_type == 0x54: 516 event = SMPTEOffsetEvent(*map(ord, bytes)) 517 elif event_type == 0x58: 518 event = TimeSignatureEvent(ord(bytes[0]), 2 ** ord(bytes[1])) 519 elif event_type == 0x59: 520 event = KeyEvent(ord(bytes[0]), ord(bytes[1])) 521 elif event_type == 0x7f: 522 event = BlobEvent(bytes) 523 elif event_byte == 0xf0: 524 event_size, event_size_size = self._ReadVariableLengthInteger(f) 525 size += event_size_size 526 bytes = f.read(event_size) 527 size += event_size 528 event = SysExEvent(bytes[0:3], bytes[3:5], bytes[5:-1]) 529 else: 530 print event_byte, '!!' 531 event = None 532 return delta_t, event, size 533 534 535if __name__ == '__main__': 536 m = MidiFile() 537 t = m.AddTrack() 538 t.AddEvent(0, TempoEvent(120.0)) 539 540 t = m.AddTrack() 541 t.AddEvent(1, SysExEvent( 542 '\x00\x20\x77', 543 '\x00\x01', 544 '\x7f\x7f' + Nibblize('\xff\x00\xcc'))) 545 546 f = file('output.mid', 'wb') 547 m.Write(f, format=0) 548 f.close() 549