1# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
2# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
3#
4# MDAnalysis --- http://www.MDAnalysis.org
5# Copyright (c) 2006-2015 Naveen Michaud-Agrawal, Elizabeth J. Denning, Oliver
6# Beckstein and contributors (see AUTHORS for the full list)
7#
8# Released under the GNU Public Licence, v2 or any higher version
9#
10# Please cite your use of MDAnalysis in published work:
11#
12# N. Michaud-Agrawal, E. J. Denning, T. B. Woolf, and O. Beckstein.
13# MDAnalysis: A Toolkit for the Analysis of Molecular Dynamics Simulations.
14# J. Comput. Chem. 32 (2011), 2319--2327, doi:10.1002/jcc.21787
15#
16from __future__ import absolute_import, print_function
17from six.moves import zip
18from six.moves import cPickle as pickle
19
20from collections import namedtuple
21import os
22import string
23import struct
24
25import hypothesis.strategies as strategies
26from hypothesis import example, given
27import numpy as np
28
29from numpy.testing import (assert_array_almost_equal, assert_equal,
30                           assert_array_equal, assert_almost_equal)
31
32from MDAnalysis.lib.formats.libdcd import DCDFile, DCD_IS_CHARMM, DCD_HAS_EXTRA_BLOCK
33
34from MDAnalysisTests.datafiles import (
35    DCD, DCD_NAMD_TRICLINIC, legacy_DCD_ADK_coords, legacy_DCD_NAMD_coords,
36    legacy_DCD_c36_coords, DCD_TRICLINIC)
37
38import pytest
39
40
41@pytest.mark.parametrize("dcdfile, is_periodic",
42                         [(DCD, False), (DCD_NAMD_TRICLINIC, True),
43                          (DCD_TRICLINIC, True)])
44def test_is_periodic(dcdfile, is_periodic):
45    with DCDFile(dcdfile) as f:
46        assert f.is_periodic == is_periodic
47
48
49@pytest.mark.parametrize("dcdfile, natoms", [(DCD, 3341), (DCD_NAMD_TRICLINIC,
50                                                           5545),
51                                             (DCD_TRICLINIC, 375)])
52def test_read_coordsshape(dcdfile, natoms):
53    # confirm shape of coordinate data against result from previous
54    # MDAnalysis implementation of DCD file handling
55    with DCDFile(dcdfile) as dcd:
56        dcd_frame = dcd.read()
57    xyz = dcd_frame[0]
58    assert xyz.shape == (natoms, 3)
59
60
61@pytest.mark.parametrize(
62    "dcdfile, unit_cell",
63    [(DCD, [0., 90., 0., 90., 90., 0.]),
64     (DCD_NAMD_TRICLINIC, [38.42659378, 0.499563, 38.393102, 0., 0., 44.7598]),
65     (DCD_TRICLINIC,
66      [30.841836, 14.578635, 31.780088, 9.626323, -2.60815, 32.67009])])
67def test_read_unit_cell(dcdfile, unit_cell):
68    # confirm unit cell read against result from previous
69    # MDAnalysis implementation of DCD file handling
70    with DCDFile(dcdfile) as dcd:
71        dcd_frame = dcd.read()
72    assert_array_almost_equal(dcd_frame.unitcell, unit_cell)
73
74
75def test_seek_over_max():
76    with DCDFile(DCD) as dcd:
77        with pytest.raises(EOFError):
78            dcd.seek(102)
79
80
81@pytest.fixture
82def dcd():
83    with DCDFile(DCD) as dcd:
84        yield dcd
85
86
87def test_pickle(dcd):
88    dcd.seek(len(dcd) - 1)
89    dump = pickle.dumps(dcd)
90    new_dcd = pickle.loads(dump)
91
92    assert dcd.fname == new_dcd.fname
93    assert dcd.tell() == new_dcd.tell()
94
95
96def test_pickle_closed(dcd):
97    dcd.seek(len(dcd) - 1)
98    dcd.close()
99    dump = pickle.dumps(dcd)
100    new_dcd = pickle.loads(dump)
101
102    assert dcd.fname == new_dcd.fname
103    assert dcd.tell() != new_dcd.tell()
104
105
106@pytest.mark.parametrize("new_frame", (10, 42, 21))
107def test_seek_normal(new_frame, dcd):
108    # frame seek within range is tested
109    dcd.seek(new_frame)
110    assert dcd.tell() == new_frame
111
112
113def test_seek_negative(dcd):
114    with pytest.raises(IOError):
115        dcd.seek(-78)
116
117
118def test_iteration(dcd):
119    num_iters = 10
120    for _ in range(num_iters):
121        dcd.__next__()
122    assert dcd.tell() == num_iters
123
124
125def test_open_wrong_mode():
126    with pytest.raises(IOError):
127        DCDFile(DCD, 'e')
128
129
130def test_raise_not_existing():
131    with pytest.raises(IOError):
132        DCDFile('foo')
133
134
135def test_zero_based_frames_counting(dcd):
136    assert dcd.tell() == 0
137
138
139@pytest.mark.parametrize("dcdfile, natoms", [(DCD, 3341), (DCD_NAMD_TRICLINIC,
140                                                           5545),
141                                             (DCD_TRICLINIC, 375)])
142def test_natoms(dcdfile, natoms):
143    with DCDFile(dcdfile) as dcd:
144        assert dcd.header['natoms'] == natoms
145
146
147def test_read_closed(dcd):
148    dcd.close()
149    with pytest.raises(IOError):
150        dcd.read()
151
152
153@pytest.mark.parametrize("dcdfile, nframes", [(DCD, 98), (DCD_NAMD_TRICLINIC,
154                                                          1), (DCD_TRICLINIC,
155                                                               10)])
156def test_length_traj(dcdfile, nframes):
157    with DCDFile(dcdfile) as dcd:
158        assert len(dcd) == nframes
159
160
161def test_read_write_mode_file(tmpdir):
162    fname = str(tmpdir.join('foo'))
163    with DCDFile(fname, 'w') as f:
164        with pytest.raises(IOError):
165            f.read()
166
167
168def test_iterating_twice(dcd):
169    with dcd as f:
170        for i, _ in enumerate(f):
171            assert_equal(i + 1, f.tell())
172        # second iteration should work from start again
173        for i, _ in enumerate(f):
174            assert_equal(i + 1, f.tell())
175
176
177DCD_HEADER = '''* DIMS ADK SEQUENCE FOR PORE PROGRAM                                            * WRITTEN BY LIZ DENNING (6.2008)                                               *  DATE:     6/ 6/ 8     17:23:56      CREATED BY USER: denniej0                '''
178DCD_NAMD_TRICLINIC_HEADER = 'Created by DCD pluginREMARKS Created 06 July, 2014 at 17:29Y5~CORD,'
179DCD_TRICLINIC_HEADER = '* CHARMM TRICLINIC BOX TESTING                                                  * (OLIVER BECKSTEIN 2014)                                                       * BASED ON NPTDYN.INP : SCOTT FELLER, NIH, 7/15/95                              * TEST EXTENDED SYSTEM CONSTANT PRESSURE AND TEMPERATURE                        * DYNAMICS WITH WATER BOX.                                                      *  DATE:     7/ 7/14     13:59:46      CREATED BY USER: oliver                  '
180
181
182@pytest.mark.parametrize("dcdfile, remarks",
183                         ((DCD, DCD_HEADER), (DCD_NAMD_TRICLINIC,
184                                              DCD_NAMD_TRICLINIC_HEADER),
185                          (DCD_TRICLINIC, DCD_TRICLINIC_HEADER)))
186def test_header_remarks(dcdfile, remarks):
187    # confirm correct header remarks section reading
188    with DCDFile(dcdfile) as f:
189        assert len(f.header['remarks']) == len(remarks)
190
191
192@pytest.mark.parametrize("dcdfile, legacy_data, frames",
193                         ((DCD, legacy_DCD_ADK_coords, [5, 29]),
194                          (DCD_NAMD_TRICLINIC, legacy_DCD_NAMD_coords, [0]),
195                          (DCD_TRICLINIC, legacy_DCD_c36_coords, [1, 4])))
196def test_read_coord_values(dcdfile, legacy_data, frames):
197    # test the actual values of coordinates read in versus
198    # stored values read in by the legacy DCD handling framework
199    # to reduce repo storage burden, we only compare for a few
200    # randomly selected frames
201    legacy = np.load(legacy_data)
202    with DCDFile(dcdfile) as dcd:
203        for index, frame_num in enumerate(frames):
204            dcd.seek(frame_num)
205            actual_coords = dcd.read()[0]
206            desired_coords = legacy[index]
207            assert_array_equal(actual_coords, desired_coords)
208
209
210@pytest.mark.parametrize("dcdfile, legacy_data, frame_idx",
211                         ((DCD, legacy_DCD_ADK_coords, [5, 29]),
212                          (DCD_NAMD_TRICLINIC, legacy_DCD_NAMD_coords, [0]),
213                          (DCD_TRICLINIC, legacy_DCD_c36_coords, [1, 4])))
214def test_readframes(dcdfile, legacy_data, frame_idx):
215    legacy = np.load(legacy_data)
216    with DCDFile(dcdfile) as dcd:
217        frames = dcd.readframes()
218        xyz = frames.xyz
219        assert_equal(len(xyz), len(dcd))
220        for index, frame_num in enumerate(frame_idx):
221            assert_array_almost_equal(xyz[frame_num], legacy[index])
222
223
224def test_write_header(tmpdir):
225    # test that _write_header() can produce a very crude
226    # header for a new / empty file
227    testfile = str(tmpdir.join('test.dcd'))
228    with DCDFile(testfile, 'w') as dcd:
229        dcd.write_header(
230            remarks='Crazy!',
231            natoms=22,
232            istart=12,
233            nsavc=10,
234            delta=0.02,
235            is_periodic=1)
236
237    with DCDFile(testfile) as dcd:
238        header = dcd.header
239        assert header['remarks'] == 'Crazy!'
240        assert header['natoms'] == 22
241        assert header['istart'] == 12
242        assert header['is_periodic'] == 1
243        assert header['nsavc'] == 10
244        assert np.allclose(header['delta'], .02)
245
246    # we also check the bytes written directly.
247    with open(testfile, 'rb') as fh:
248        header_bytes = fh.read()
249    # check for magic number
250    assert struct.unpack('i', header_bytes[:4])[0] == 84
251    # magic number should be written again before remark section
252    assert struct.unpack('i', header_bytes[88:92])[0] == 84
253    # length of remark section. We hard code this to 244 right now
254    assert struct.unpack('i', header_bytes[92:96])[0] == 244
255    # say we have 3 block of length 80
256    assert struct.unpack('i', header_bytes[96:100])[0] == 3
257    # after the remark section the length should be reported again
258    assert struct.unpack('i', header_bytes[340:344])[0] == 244
259    # this is a magic number as far as I see
260    assert struct.unpack('i', header_bytes[344:348])[0] == 4
261
262
263def test_write_no_header(tmpdir):
264    fname = str(tmpdir.join('test.dcd'))
265    with DCDFile(fname, 'w') as dcd:
266        with pytest.raises(IOError):
267            dcd.write(np.ones(3), np.ones(6))
268
269
270def test_write_header_twice(tmpdir):
271    # an IOError should be raised if a duplicate
272    # header writing is attempted
273
274    header = {
275        "remarks": 'Crazy!',
276        "natoms": 22,
277        "istart": 12,
278        "nsavc": 10,
279        "delta": 0.02,
280        "is_periodic": 1
281    }
282
283    fname = str(tmpdir.join('test.dcd'))
284    with DCDFile(fname, 'w') as dcd:
285        dcd.write_header(**header)
286        with pytest.raises(IOError):
287            dcd.write_header(**header)
288
289
290def test_write_header_wrong_mode(dcd):
291    # an exception should be raised on any attempt to use
292    # write_header with a DCDFile object in 'r' mode
293    with pytest.raises(IOError):
294        dcd.write_header(
295            remarks='Crazy!',
296            natoms=22,
297            istart=12,
298            nsavc=10,
299            delta=0.02,
300            is_periodic=1)
301
302
303def test_write_mode(dcd):
304    # ensure that writing of DCD files only occurs with properly
305    # opened files
306    with pytest.raises(IOError):
307        dcd.write(xyz=np.zeros((3, 3)), box=np.zeros(6, dtype=np.float64))
308
309
310def write_dcd(in_name, out_name, remarks='testing', header=None):
311    with DCDFile(in_name) as f_in, DCDFile(out_name, 'w') as f_out:
312        if header is None:
313            header = f_in.header
314        f_out.write_header(**header)
315        for frame in f_in:
316            f_out.write(xyz=frame.xyz, box=frame.unitcell)
317
318
319@given(remarks=strategies.text(
320    alphabet=string.printable, min_size=0,
321    max_size=239))  # handle the printable ASCII strings
322@example(remarks='')
323def test_written_remarks_property(remarks, tmpdir, dcd):
324    # property based testing for writing of a wide range of string
325    # values to REMARKS field
326    testfile = str(tmpdir.join('test.dcd'))
327    header = dcd.header
328    header['remarks'] = remarks
329    write_dcd(DCD, testfile, header=header)
330    expected_remarks = remarks
331    with DCDFile(testfile) as f:
332        assert f.header['remarks'] == expected_remarks
333
334
335@pytest.fixture(scope='session')
336def written_dcd(tmpdir_factory):
337    with DCDFile(DCD) as dcd:
338        header = dcd.header
339    testfile = tmpdir_factory.mktemp('dcd').join('test.dcd')
340    testfile = str(testfile)
341    write_dcd(DCD, testfile)
342    Result = namedtuple("Result", "testfile, header, orgfile")
343    # throw away last char we didn't save due to null termination
344    header['remarks'] = header['remarks'][:-1]
345    return Result(testfile, header, DCD)
346
347
348def test_written_header(written_dcd):
349    header = written_dcd.header
350    with DCDFile(written_dcd.testfile) as dcd:
351        dcdheader = dcd.header
352        assert dcdheader == header
353
354
355def test_written_num_frames(written_dcd):
356    with DCDFile(written_dcd.testfile) as dcd, DCDFile(
357            written_dcd.orgfile) as other:
358        assert len(dcd) == len(other)
359
360
361def test_written_dcd_coordinate_data_shape(written_dcd):
362    with DCDFile(written_dcd.testfile) as dcd, DCDFile(
363            written_dcd.orgfile) as other:
364        for frame, other_frame in zip(dcd, other):
365            assert frame.xyz.shape == other_frame.xyz.shape
366
367
368def test_written_seek(written_dcd):
369    # ensure that we can seek properly on written DCD file
370    with DCDFile(written_dcd.testfile) as f:
371        f.seek(40)
372        assert_equal(f.tell(), 40)
373
374
375def test_written_coord_match(written_dcd):
376    with DCDFile(written_dcd.testfile) as test, DCDFile(
377            written_dcd.orgfile) as ref:
378        for frame, o_frame in zip(test, ref):
379            assert_array_almost_equal(frame.xyz, o_frame.xyz)
380
381
382def test_written_unit_cell(written_dcd):
383    with DCDFile(written_dcd.testfile) as test, DCDFile(
384            written_dcd.orgfile) as ref:
385        for frame, o_frame in zip(test, ref):
386            assert_array_almost_equal(frame.unitcell, o_frame.unitcell)
387
388
389@pytest.mark.parametrize("dtype", (np.int32, np.int64, np.float32, np.float64,
390                                   int, float))
391def test_write_all_dtypes(tmpdir, dtype):
392    fname = str(tmpdir.join('foo.dcd'))
393    with DCDFile(fname, 'w') as out:
394        natoms = 10
395        xyz = np.ones((natoms, 3), dtype=dtype)
396        box = np.ones(6, dtype=dtype)
397        out.write_header(
398            remarks='test',
399            natoms=natoms,
400            is_periodic=1,
401            delta=1,
402            nsavc=1,
403            istart=1)
404        out.write(xyz=xyz, box=box)
405
406
407@pytest.mark.parametrize("array_like", (np.array, list))
408def test_write_array_like(tmpdir, array_like):
409    fname = str(tmpdir.join('foo.dcd'))
410    with DCDFile(fname, 'w') as out:
411        natoms = 10
412        xyz = array_like([[1, 1, 1] for i in range(natoms)])
413        box = array_like([i for i in range(6)])
414        out.write_header(
415            remarks='test',
416            natoms=natoms,
417            is_periodic=1,
418            delta=1,
419            nsavc=1,
420            istart=1)
421        out.write(xyz=xyz, box=box)
422
423
424def test_write_wrong_shape_xyz(tmpdir):
425    fname = str(tmpdir.join('foo.dcd'))
426    with DCDFile(fname, 'w') as out:
427        natoms = 10
428        xyz = np.ones((natoms + 1, 3))
429        box = np.ones(6)
430        out.write_header(
431            remarks='test',
432            natoms=natoms,
433            is_periodic=1,
434            delta=1,
435            nsavc=1,
436            istart=1)
437        with pytest.raises(ValueError):
438            out.write(xyz=xyz, box=box)
439
440
441def test_write_wrong_shape_box(tmpdir):
442    fname = str(tmpdir.join('foo.dcd'))
443    with DCDFile(fname, 'w') as out:
444        natoms = 10
445        xyz = np.ones((natoms, 3))
446        box = np.ones(7)
447        out.write_header(
448            remarks='test',
449            natoms=natoms,
450            is_periodic=1,
451            delta=1,
452            nsavc=1,
453            istart=1)
454        with pytest.raises(ValueError):
455            out.write(xyz=xyz, box=box)
456
457
458@pytest.mark.parametrize("dcdfile", (DCD, DCD_TRICLINIC, DCD_NAMD_TRICLINIC))
459def test_relative_frame_sizes(dcdfile):
460    # the first frame of a DCD file should always be >= in size
461    # to subsequent frames, as the first frame contains the same
462    # atoms + (optional) fixed atoms
463    with DCDFile(dcdfile) as dcd:
464        first_frame_size = dcd._firstframesize
465        general_frame_size = dcd._framesize
466
467    assert first_frame_size >= general_frame_size
468
469
470@pytest.mark.parametrize("dcdfile", (DCD, DCD_TRICLINIC, DCD_NAMD_TRICLINIC))
471def test_file_size_breakdown(dcdfile):
472    # the size of a DCD file is equivalent to the sum of the header
473    # size, first frame size, and (N - 1 frames) * size per general
474    # frame
475
476    expected = os.path.getsize(dcdfile)
477    with DCDFile(dcdfile) as dcd:
478        actual = dcd._header_size + dcd._firstframesize + (
479            (dcd.n_frames - 1) * dcd._framesize)
480    assert actual == expected
481
482
483@pytest.mark.parametrize("dcdfile", (DCD, DCD_TRICLINIC, DCD_NAMD_TRICLINIC))
484def test_nframessize_int(dcdfile):
485    # require that the (nframessize / framesize) value used by DCDFile
486    # is an integer (because nframessize / framesize + 1 = total frames,
487    # which must also be an int)
488    filesize = os.path.getsize(dcdfile)
489    with DCDFile(dcdfile) as dcd:
490        nframessize = filesize - dcd._header_size - dcd._firstframesize
491        assert float(nframessize) % float(dcd._framesize) == 0
492
493
494@pytest.mark.parametrize(
495    "slice, length",
496    [([None, None, None], 98), ([0, None, None], 98), ([None, 98, None], 98),
497     ([None, None, 1], 98), ([None, None, -1], 98), ([2, 6, 2], 2),
498     ([0, 10, None], 10), ([2, 10, None], 8), ([0, 1, 1], 1), ([1, 1, 1], 0),
499     ([1, 2, 1], 1), ([1, 2, 2], 1), ([1, 4, 2], 2), ([1, 4, 4], 1), ([
500         0, 5, 5
501     ], 1), ([3, 5, 1], 2), ([4, 0, -1], 4), ([5, 0, -2], 3), ([5, 0, -4], 2)])
502def test_readframes_slices(slice, length, dcd):
503    start, stop, step = slice
504    allframes = dcd.readframes().xyz
505    frames = dcd.readframes(start=start, stop=stop, step=step)
506    xyz = frames.xyz
507    assert len(xyz) == length
508    assert_array_almost_equal(xyz, allframes[start:stop:step])
509
510
511@pytest.mark.parametrize("order, shape", (
512    ('fac', (98, 3341, 3)),
513    ('fca', (98, 3, 3341)),
514    ('afc', (3341, 98, 3)),
515    ('acf', (3341, 3, 98)),
516    ('caf', (3, 3341, 98)),
517    ('cfa', (3, 98, 3341)), ))
518def test_readframes_order(order, shape, dcd):
519    x = dcd.readframes(order=order).xyz
520    assert x.shape == shape
521
522
523@pytest.mark.parametrize("indices", [[1, 2, 3, 4], [5, 10, 15, 19],
524                                     [9, 4, 2, 0, 50]])
525def test_readframes_atomindices(indices, dcd):
526    allframes = dcd.readframes(order='afc').xyz
527    frames = dcd.readframes(indices=indices, order='afc')
528    xyz = frames.xyz
529    assert len(xyz) == len(indices)
530    assert_array_almost_equal(xyz, allframes[indices])
531
532
533def test_write_random_unitcell(tmpdir):
534    testname = str(tmpdir.join('test.dcd'))
535    rstate = np.random.RandomState(1178083)
536    random_unitcells = rstate.uniform(high=80, size=(98, 6)).astype(np.float64)
537
538    with DCDFile(DCD) as f_in, DCDFile(testname, 'w') as f_out:
539        header = f_in.header
540        header['is_periodic'] = True
541        f_out.write_header(**header)
542        for index, frame in enumerate(f_in):
543            f_out.write(xyz=frame.xyz, box=random_unitcells[index])
544
545    with DCDFile(testname) as test:
546        for index, frame in enumerate(test):
547            assert_array_almost_equal(frame.unitcell, random_unitcells[index])
548