from numpy.random import uniform
from netCDF4 import Dataset
from netCDF4.utils import _quantize
from numpy.testing import assert_almost_equal
import os, tempfile, unittest

# test array sizes, chunk sizes, number of temp files and quantization precision.
ndim = 100000
ndim2 = 100
chunk1 = 10; chunk2 = ndim2
nfiles = 7
files = [tempfile.NamedTemporaryFile(suffix='.nc', delete=False).name for nfile in range(nfiles)]
array = uniform(size=(ndim,))
array2 = uniform(size=(ndim,ndim2))
lsd = 3
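# Note: least_significant_digit asks netCDF4 to quantize (round) the data before
# zlib compression so that roughly `lsd` decimal digits are retained; _quantize is
# the library helper that applies the same rounding in numpy, which lets the tests
# below compute the values expected back from the lossy files.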

def write_netcdf(filename,zlib,least_significant_digit,data,dtype='f8',shuffle=False,contiguous=False,
                 chunksizes=None,complevel=6,fletcher32=False):
    # write a 1-d variable with the requested compression settings, then
    # read it back once to confirm the file can be opened and the variable read.
    file = Dataset(filename,'w')
    file.createDimension('n', ndim)
    foo = file.createVariable('data',
            dtype,('n',),zlib=zlib,least_significant_digit=least_significant_digit,
            shuffle=shuffle,contiguous=contiguous,complevel=complevel,fletcher32=fletcher32,chunksizes=chunksizes)
    foo[:] = data
    file.close()
    file = Dataset(filename)
    data = file.variables['data'][:]
    file.close()

def write_netcdf2(filename,zlib,least_significant_digit,data,dtype='f8',shuffle=False,contiguous=False,
                  chunksizes=None,complevel=6,fletcher32=False):
    # 2-d variant of write_netcdf, used to exercise the chunksizes keyword.
    file = Dataset(filename,'w')
    file.createDimension('n', ndim)
    file.createDimension('n2', ndim2)
    foo = file.createVariable('data2',
            dtype,('n','n2'),zlib=zlib,least_significant_digit=least_significant_digit,
            shuffle=shuffle,contiguous=contiguous,complevel=complevel,fletcher32=fletcher32,chunksizes=chunksizes)
    foo[:] = data
    file.close()
    file = Dataset(filename)
    data = file.variables['data2'][:]
    file.close()

class CompressionTestCase(unittest.TestCase):

    def setUp(self):
        self.files = files
        # no compression
        write_netcdf(self.files[0],False,None,array)
        # compressed, lossless, no shuffle.
        write_netcdf(self.files[1],True,None,array)
        # compressed, lossless, with shuffle.
        write_netcdf(self.files[2],True,None,array,shuffle=True)
        # compressed, lossy, no shuffle.
        write_netcdf(self.files[3],True,lsd,array)
        # compressed, lossy, with shuffle.
        write_netcdf(self.files[4],True,lsd,array,shuffle=True)
        # compressed, lossy, with shuffle and fletcher32 checksum.
        write_netcdf(self.files[5],True,lsd,array,shuffle=True,fletcher32=True)
        # 2-d compressed, lossy, with shuffle, fletcher32 checksum and chunksizes.
        write_netcdf2(self.files[6],True,lsd,array2,shuffle=True,fletcher32=True,chunksizes=(chunk1,chunk2))

    def tearDown(self):
        # Remove the temporary files
        for file in self.files:
            os.remove(file)

    def runTest(self):
        """testing zlib and shuffle compression filters"""
        uncompressed_size = os.stat(self.files[0]).st_size
        # check compressed data.
        f = Dataset(self.files[1])
        size = os.stat(self.files[1]).st_size
        assert_almost_equal(array,f.variables['data'][:])
        assert f.variables['data'].filters() == {'zlib':True,'shuffle':False,'complevel':6,'fletcher32':False}
        assert(size < 0.95*uncompressed_size)
        f.close()
        # check compression with shuffle
        f = Dataset(self.files[2])
        size = os.stat(self.files[2]).st_size
        assert_almost_equal(array,f.variables['data'][:])
        assert f.variables['data'].filters() == {'zlib':True,'shuffle':True,'complevel':6,'fletcher32':False}
        assert(size < 0.85*uncompressed_size)
        f.close()
        # check lossy compression without shuffle
        f = Dataset(self.files[3])
        size = os.stat(self.files[3]).st_size
        checkarray = _quantize(array,lsd)
        assert_almost_equal(checkarray,f.variables['data'][:])
        assert(size < 0.27*uncompressed_size)
        f.close()
        # check lossy compression with shuffle
        f = Dataset(self.files[4])
        size = os.stat(self.files[4]).st_size
        assert_almost_equal(checkarray,f.variables['data'][:])
        assert(size < 0.20*uncompressed_size)
        size_save = size
        f.close()
        # check lossy compression with shuffle and fletcher32 checksum.
        f = Dataset(self.files[5])
        size = os.stat(self.files[5]).st_size
        assert_almost_equal(checkarray,f.variables['data'][:])
        assert f.variables['data'].filters() == {'zlib':True,'shuffle':True,'complevel':6,'fletcher32':True}
        assert(size < 0.20*uncompressed_size)
        # should be slightly larger than without fletcher32
        assert(size > size_save)
        f.close()
        # check chunksizes
        f = Dataset(self.files[6])
        checkarray2 = _quantize(array2,lsd)
        assert_almost_equal(checkarray2,f.variables['data2'][:])
        assert f.variables['data2'].filters() == {'zlib':True,'shuffle':True,'complevel':6,'fletcher32':True}
        assert f.variables['data2'].chunking() == [chunk1,chunk2]
        f.close()

if __name__ == '__main__':
    unittest.main()