1#!/usr/bin/python2.5
2#
3# Copyright 2009 Olivier Gillet.
4#
5# Author: Olivier Gillet (ol.gillet@gmail.com)
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation, either version 3 of the License, or
10# (at your option) any later version.
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18# -----------------------------------------------------------------------------
19#
20# Midifile Writer and Reader.
21
22"""Midifile writer.
23"""
24
25import bisect
26import math
27import struct
28
29
30def PackInteger(value, size=4):
31  """Packs a python integer into a n-byte big endian byte sequence."""
32  return struct.pack('>%s' % {1: 'B', 2: 'H', 4: 'L'}[size], value)
33
34
35def UnpackInteger(value, size=4):
36  """Packs a python integer into a n-byte big endian byte sequence."""
37  return struct.unpack('>%s' % {1: 'B', 2: 'H', 4: 'L'}[size], value)[0]
38
39
40def PackVariableLengthInteger(value):
41  """Packs a python integer into a variable length byte sequence."""
42  if value == 0:
43    return '\x00'
44  s = value
45  output = []
46  while value:
47    to_write = value & 0x7f
48    value = value >> 7
49    output.insert(0, to_write)
50  for i in xrange(len(output) - 1):
51    output[i] |= 0x80
52  output = ''.join(map(chr, output))
53  return output
54
55"""Classes representing a MIDI event, with a Serialize method which encodes
56them into a string."""
57
58class Event(object):
59  def __init__(self):
60    pass
61
62  def Serialize(self, running_status):
63    raise NotImplementedError
64
65
66class MetaEvent(Event):
67  def __init__(self, id, data):
68    assert len(data) < 256
69    self.id = id
70    self.data = data
71
72  def Serialize(self, running_status):
73    return ''.join([
74        '\xff',
75        PackInteger(self.id, size=1),
76        PackInteger(len(self.data), size=1),
77        self.data]), None
78
79
80class TextEvent(MetaEvent):
81  def __init__(self, text):
82    self.text = text
83    super(TextEvent, self).__init__(0x01, text)
84
85
86class CopyrightInfoEvent(MetaEvent):
87  def __init__(self, text):
88    self.text = text
89    super(CopyrightInfoEvent, self).__init__(0x02, text)
90
91
92class TrackNameEvent(MetaEvent):
93  def __init__(self, text):
94    self.text = text
95    super(TrackNameEvent, self).__init__(0x03, text)
96
97
98class TrackInstrumentNameEvent(MetaEvent):
99  def __init__(self, text):
100    self.text = text
101    super(TrackInstrumentNameEvent, self).__init__(0x04, text)
102
103
104class LyricEvent(MetaEvent):
105  def __init__(self, text):
106    self.text = text
107    super(LyricEvent, self).__init__(0x05, text)
108
109
110class MarkerEvent(MetaEvent):
111  def __init__(self, text):
112    self.text = text
113    super(MarkerEvent, self).__init__(0x06, text)
114
115
116class CuePointEvent(MetaEvent):
117  def __init__(self, text):
118    self.text = text
119    super(CuePointEvent, self).__init__(0x07, text)
120
121
122class EndOfTrackEvent(MetaEvent):
123  def __init__(self):
124    super(EndOfTrackEvent, self).__init__(0x2f, '')
125
126
127class TempoEvent(MetaEvent):
128  def __init__(self, bpm):
129    self.bpm = bpm
130    value = 60000000.0 / bpm
131    data = PackInteger(int(value), size=4)[1:]
132    super(TempoEvent, self).__init__(0x51, data)
133
134
135class SMPTEOffsetEvent(MetaEvent):
136  def __init__(self, h, m, s, f, sf):
137    self.smpte_offset = (h, m, s, f, sf)
138    data = ''.join(map(chr, [h, m, s, f, sf]))
139    super(SMPTEOffsetEvent, self).__init__(0x54, data)
140
141
142class TimeSignatureEvent(MetaEvent):
143  def __init__(self, numerator, denominator):
144    self.numerator = numerator
145    self.denominator = denominator
146    data = ''.join([
147        PackInteger(numerator, size=1),
148        PackInteger(int(math.log(denominator) / math.log(2)), size=1),
149        '\x16\x08'])
150    super(TimeSignatureEvent, self).__init__(0x58, data)
151
152
153class KeyEvent(MetaEvent):
154  def __init__(self, sharp_flats, major_minor):
155    self.sharp_flats = sharp_flats
156    self.major_minor = major_minor
157    data = ''.join([
158        PackInteger(sharp_flats, size=1),
159        PackInteger(major_minor, size=1)])
160    super(KeyEvent, self).__init__(0x59, data)
161
162
163class BlobEvent(MetaEvent):
164  def __init__(self, blob):
165    self.data = blob
166    super(BlobEvent, self).__init__(0x7f, blob)
167
168
169"""Classic channel-oriented messages."""
170
171class ChannelEvent(Event):
172  def __init__(self, mask, channel, data):
173    self.channel = channel
174    self._status = mask | (channel - 1)
175    self._data = PackInteger(self._status, size=1) + data
176
177  def Serialize(self, running_status):
178    if self._status == running_status:
179      return self._data[1:], self._status
180    else:
181      return self._data, self._status
182
183
184class NoteOffEvent(ChannelEvent):
185  def __init__(self, channel, note, velocity):
186    data = PackInteger(note, size=1) + PackInteger(velocity, size=1)
187    super(NoteOffEvent, self).__init__(0x80, channel, data)
188    self.note = note
189    self.velocity = velocity
190
191
192class NoteOnEvent(ChannelEvent):
193  def __init__(self, channel, note, velocity):
194    data = PackInteger(note, size=1) + PackInteger(velocity, size=1)
195    super(NoteOnEvent, self).__init__(0x90, channel, data)
196    self.note = note
197    self.velocity = velocity
198
199
200class KeyAftertouchEvent(ChannelEvent):
201  def __init__(self, channel, note, aftertouch):
202    data = PackInteger(note, size=1) + PackInteger(aftertouch, size=1)
203    super(KeyAftertouchEvent, self).__init__(0xa0, channel, data)
204    self.note = note
205    self.aftertouch = aftertouch
206
207
208class ControlChangeEvent(ChannelEvent):
209  def __init__(self, channel, controller, value):
210    data = PackInteger(controller, size=1) + PackInteger(value, size=1)
211    super(ControlChangeEvent, self).__init__(0xb0, channel, data)
212    self.controller = controller
213    self.value = value
214
215
216class ProgramChangeEvent(ChannelEvent):
217  def __init__(self, channel, program_number):
218    data = PackInteger(program_number, size=1)
219    super(ProgramChangeEvent, self).__init__(0xc0, channel, data)
220    self.program_number = program_number
221
222
223class ChannelAftertouchEvent(ChannelEvent):
224  def __init__(self, channel, aftertouch):
225    data = PackInteger(aftertouch, size=1)
226    super(ChannelAftertouchEvent, self).__init__(0xd0, channel, data)
227    self.aftertouch = aftertouch
228
229
230class PitchBendEvent(ChannelEvent):
231  def __init__(self, channel, pitch_bend_14_bits):
232    data = PackInteger(pitch_bend_14_bits >> 7, size=1) + \
233        PackInteger(pitch_bend_14_bits & 0x7f, size=1)
234    super(PitchBendEvent, self).__init__(0xe0, channel, data)
235    self.pitch_bend = pitch_bend_14_bits
236
237
238class SystemEvent(Event):
239  def __init__(self, id):
240    self._id = id
241
242  def Serialize(self, running_status):
243    return PackInteger(self._id, size=1), running_status
244
245
246class ClockEvent(SystemEvent):
247  def __init__(self):
248    super(ClockEvent, self).__init__(0xf8)
249
250
251class StartEvent(SystemEvent):
252  def __init__(self):
253    super(StartEvent, self).__init__(0xfa)
254
255
256class ContinueEvent(SystemEvent):
257  def __init__(self):
258    super(ContinueEvent, self).__init__(0xfb)
259
260
261class StopEvent(SystemEvent):
262  def __init__(self):
263    super(StopEvent, self).__init__(0xfc)
264
265
266# TODO(pichenettes): also support pauses within a block transmission (F7)
267class SysExEvent(Event):
268  def __init__(self, manufacturer_id, device_id, data):
269    self.data = data
270    self.message = ''.join([
271        manufacturer_id,
272        device_id,
273        data,
274        '\xf7'])
275    self.raw_message = '\xf0' + self.message
276    assert all(ord(x) < 128 for x in self.message[:-1])
277    self.message = ''.join([
278        '\xf0',
279        PackVariableLengthInteger(len(self.message)),
280        self.message])
281
282  def Serialize(self, running_status):
283    return self.message, None
284
285
286def Nibblize(data, add_checksum=True):
287  """Converts a byte string into a nibble string. Also adds checksum"""
288  output = []
289  if add_checksum:
290    tail = [chr(sum(ord(char) for char in data) % 256)]
291  else:
292    tail = []
293  for char in map(ord, list(data) + tail):
294    output.append(chr(char >> 4))
295    output.append(chr(char & 0x0f))
296  return ''.join(output)
297
298
299class Track(object):
300  def __init__(self):
301    self._events = []
302
303  def AddEvent(self, time, event):
304    self._events.append((time, event))
305
306  def Sort(self):
307    self._events = sorted(self._events)
308
309  def Serialize(self):
310    self.Sort()
311    last_time, last_event = self._events[-1]
312    if type(last_event) != EndOfTrackEvent:
313      self._events.append((last_time + 1, EndOfTrackEvent()))
314    data = []
315    current_time = 0
316    running_status = None
317    for time, event in self._events:
318      delta = time - current_time
319      data.append(PackVariableLengthInteger(delta))
320      event_data, running_status = event.Serialize(running_status)
321      data.append(event_data)
322      current_time = time
323    return ''.join(data)
324
325  def Write(self, file_object):
326    file_object.write('MTrk')
327    track_data = self.Serialize()
328    file_object.write(PackInteger(len(track_data)))
329    file_object.write(track_data)
330
331  @property
332  def events(self):
333    return self._events
334
335
336class Writer(object):
337  def __init__(self, ppq=96):
338    self._tracks = []
339    self._ppq = ppq
340
341  def AddTrack(self):
342    new_track = Track()
343    self._tracks.append(new_track)
344    return new_track
345
346  def _MergeTracks(self):
347    new_track = Track()
348    for track in self._tracks:
349      for time_event in track.events:
350        new_track.AddEvent(*time_event)
351    new_track.Sort()
352    return new_track
353
354  def Write(self, file_object, format=0):
355    tracks = self._tracks
356    if format == 0:
357      tracks = [self._MergeTracks()]
358
359    # File header.
360    file_object.write('MThd')
361    file_object.write(PackInteger(6))
362    file_object.write(PackInteger(format, size=2))
363    if format == 0:
364      file_object.write(PackInteger(1, size=2))
365    else:
366      file_object.write(PackInteger(len(self._tracks), size=2))
367    file_object.write(PackInteger(self._ppq, size=2))
368
369    # Tracks.
370    for track in tracks:
371      track.Write(file_object)
372
373
374class Reader(object):
375  def __init__(self):
376    self.tracks = []
377    self.format = 0
378    self.ppq = 96
379    self._previous_status = 0
380
381  def Read(self, f):
382    assert f.read(4) == 'MThd'
383    assert struct.unpack('>i', f.read(4))[0] == 6
384    self.format = struct.unpack('>h', f.read(2))[0]
385    assert self.format <= 2
386    num_tracks = struct.unpack('>h', f.read(2))[0]
387    self.ppq = struct.unpack('>h', f.read(2))[0]
388    self._tempo_map = []
389
390    for i in xrange(num_tracks):
391      self.tracks.append(self._ReadTrack(f))
392    self._CreateCumulativeTempoMap()
393
394
395  def _ReadTrack(self, f):
396    assert f.read(4) == 'MTrk'
397    size = struct.unpack('>i', f.read(4))[0]
398    t = 0
399    events = []
400    while size > 0:
401      delta_t, e, event_size = self._ReadEvent(f)
402      t += delta_t
403      if e:
404        events.append((t, e))
405        if type(e) == TempoEvent:
406          self._tempo_map.append((t, e.bpm))
407      size -= event_size
408    return events
409
410  def _CreateCumulativeTempoMap(self):
411    t = 0.0
412    current_tempo = 120.0
413    previous_beat = 0
414    cumulative_tempo_map = [(0, 0.0, current_tempo)]
415    for beat, tempo in sorted(self._tempo_map):
416      beats = float(beat - previous_beat) / self.ppq
417      t += beats * 60.0 / current_tempo
418      cumulative_tempo_map.append((beat, t, tempo))
419      current_tempo = tempo
420      previous_beat = beat
421    self._tempo_map = cumulative_tempo_map
422
423  def AbsoluteTime(self, t):
424    index = bisect.bisect_left(self._tempo_map, (t, 0, 0))
425    index = max(index - 1, 0)
426    start_beat, start_seconds, tempo = self._tempo_map[index]
427    return start_seconds + float(t - start_beat) / self.ppq * 60.0 / tempo
428
429  def _ReadVariableLengthInteger(self, f):
430    v = 0
431    size = 0
432    while True:
433      v <<= 7
434      byte = UnpackInteger(f.read(1), size=1)
435      size += 1
436      v |= (byte & 0x7f)
437      if not (byte & 0x80):
438        break
439    return v, size
440
441  def _ReadEvent(self, f):
442    delta_t, size = self._ReadVariableLengthInteger(f)
443    event_byte = ord(f.read(1))
444    size += 1
445    if event_byte < 0x80:
446      if self._previous_status:
447        f.seek(f.tell() - 1)
448        size -= 1
449        event_byte = self._previous_status
450      else:
451        return delta_t, None, size
452
453    event_type = event_byte & 0xf0
454    channel = event_byte & 0xf
455    channel += 1
456    if event_type == 0x80:
457      self._previous_status = event_type
458      note = ord(f.read(1))
459      velo = ord(f.read(1))
460      event = NoteOffEvent(channel, note, velo)
461      size += 2
462    elif event_type == 0x90:
463      self._previous_status = event_type
464      event = NoteOnEvent(channel, ord(f.read(1)), ord(f.read(1)))
465      size += 2
466    elif event_type == 0xa0:
467      self._previous_status = event_type
468      event = KeyAftertouchEvent(channel, ord(f.read(1)), ord(f.read(1)))
469      size += 2
470    elif event_type == 0xb0:
471      self._previous_status = event_type
472      event = ControlChangeEvent(channel, ord(f.read(1)), ord(f.read(1)))
473      size += 2
474    elif event_type == 0xc0:
475      self._previous_status = event_type
476      event = ProgramChangeEvent(channel, ord(f.read(1)))
477      size += 1
478    elif event_type == 0xd0:
479      self._previous_status = event_type
480      event = ChannelAftertouchEvent(channel, ord(f.read(1)))
481      size += 1
482    elif event_type == 0xe0:
483      self._previous_status = event_type
484      event = PitchBendEvent(channel, (ord(f.read(1)) << 7) | ord(f.read(1)))
485      size += 2
486    elif event_byte == 0xff:
487      event_type = ord(f.read(1))
488      size += 1
489      event_size, event_size_size = self._ReadVariableLengthInteger(f)
490      size += event_size_size
491      bytes = f.read(event_size)
492      size += event_size
493      if event_type == 0x01:
494        event = TextEvent(bytes)
495      elif event_type == 0x02:
496        event = CopyrightEvent(bytes)
497      elif event_type == 0x03:
498        event = TrackNameEvent(bytes)
499      elif event_type == 0x04:
500        event = TrackInstrumentNameEvent(bytes)
501      elif event_type == 0x05:
502        event = LyricEvent(bytes)
503      elif event_type == 0x06:
504        event = MarkerEvent(bytes)
505      elif event_type == 0x07:
506        event = CuePointEvent(bytes)
507      elif event_type == 0x20:
508        current_channel = ord(bytes[0])
509        event = None
510      elif event_type == 0x2f:
511        event = EndOfTrackEvent()
512      elif event_type == 0x51:
513        value = UnpackInteger('\x00' + bytes, size=4)
514        event = TempoEvent(60000000.0 / value)
515      elif event_type == 0x54:
516        event = SMPTEOffsetEvent(*map(ord, bytes))
517      elif event_type == 0x58:
518        event = TimeSignatureEvent(ord(bytes[0]), 2 ** ord(bytes[1]))
519      elif event_type == 0x59:
520        event = KeyEvent(ord(bytes[0]), ord(bytes[1]))
521      elif event_type == 0x7f:
522        event = BlobEvent(bytes)
523    elif event_byte == 0xf0:
524      event_size, event_size_size = self._ReadVariableLengthInteger(f)
525      size += event_size_size
526      bytes = f.read(event_size)
527      size += event_size
528      event = SysExEvent(bytes[0:3], bytes[3:5], bytes[5:-1])
529    else:
530      print event_byte, '!!'
531      event = None
532    return delta_t, event, size
533
534
535if __name__ == '__main__':
536  m = MidiFile()
537  t = m.AddTrack()
538  t.AddEvent(0, TempoEvent(120.0))
539
540  t = m.AddTrack()
541  t.AddEvent(1, SysExEvent(
542      '\x00\x20\x77',
543      '\x00\x01',
544      '\x7f\x7f' + Nibblize('\xff\x00\xcc')))
545
546  f = file('output.mid', 'wb')
547  m.Write(f, format=0)
548  f.close()
549