from numpy.random.mtrand import uniform
from netCDF4 import Dataset
from netCDF4.utils import _quantize
from numpy.testing import assert_almost_equal
import os, tempfile, unittest

# test parameters: array shapes, chunk sizes, number of temporary files,
# and the number of decimal digits retained by lossy quantization.
ndim = 100000
ndim2 = 100
chunk1 = 10; chunk2 = ndim2
nfiles = 7
files = [tempfile.NamedTemporaryFile(suffix='.nc', delete=False).name for nfile in range(nfiles)]
array = uniform(size=(ndim,))
array2 = uniform(size=(ndim,ndim2))
lsd = 3

def write_netcdf(filename,zlib,least_significant_digit,data,dtype='f8',
                 shuffle=False,contiguous=False,chunksizes=None,complevel=6,
                 fletcher32=False):
    """write a 1-d variable with the given compression settings,
    then read it back to make sure the file is readable."""
    file = Dataset(filename,'w')
    file.createDimension('n', ndim)
    foo = file.createVariable('data',dtype,('n',),zlib=zlib,
                              least_significant_digit=least_significant_digit,
                              shuffle=shuffle,contiguous=contiguous,
                              complevel=complevel,fletcher32=fletcher32,
                              chunksizes=chunksizes)
    foo[:] = data
    file.close()
    file = Dataset(filename)
    data = file.variables['data'][:]
    file.close()

def write_netcdf2(filename,zlib,least_significant_digit,data,dtype='f8',
                  shuffle=False,contiguous=False,chunksizes=None,complevel=6,
                  fletcher32=False):
    """same as write_netcdf, but for a 2-d variable (used to exercise
    chunksizes)."""
    file = Dataset(filename,'w')
    file.createDimension('n', ndim)
    file.createDimension('n2', ndim2)
    foo = file.createVariable('data2',dtype,('n','n2'),zlib=zlib,
                              least_significant_digit=least_significant_digit,
                              shuffle=shuffle,contiguous=contiguous,
                              complevel=complevel,fletcher32=fletcher32,
                              chunksizes=chunksizes)
    foo[:] = data
    file.close()
    file = Dataset(filename)
    data = file.variables['data2'][:]
    file.close()

class CompressionTestCase(unittest.TestCase):

    def setUp(self):
        self.files = files
        # no compression
        write_netcdf(self.files[0],False,None,array)
        # compressed, lossless, no shuffle.
        write_netcdf(self.files[1],True,None,array)
        # compressed, lossless, with shuffle.
        write_netcdf(self.files[2],True,None,array,shuffle=True)
        # compressed, lossy, no shuffle.
        write_netcdf(self.files[3],True,lsd,array)
        # compressed, lossy, with shuffle.
        write_netcdf(self.files[4],True,lsd,array,shuffle=True)
        # compressed, lossy, with shuffle and fletcher32 checksum.
        write_netcdf(self.files[5],True,lsd,array,shuffle=True,fletcher32=True)
        # 2-d compressed, lossy, with shuffle, fletcher32 checksum and
        # chunksizes.
        write_netcdf2(self.files[6],True,lsd,array2,shuffle=True,
                      fletcher32=True,chunksizes=(chunk1,chunk2))

    def tearDown(self):
        # remove the temporary files.
        for file in self.files:
            os.remove(file)

    def runTest(self):
        """testing zlib and shuffle compression filters"""
        uncompressed_size = os.stat(self.files[0]).st_size
        # check lossless compression without shuffle.
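        # (filters() reports the settings the variable was created with;
        # the size thresholds below are empirical expectations for zlib
        # level 6 on uniform random doubles, not hard guarantees.)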
        f = Dataset(self.files[1])
        size = os.stat(self.files[1]).st_size
        assert_almost_equal(array,f.variables['data'][:])
        assert f.variables['data'].filters() == {'zlib':True,'shuffle':False,'complevel':6,'fletcher32':False}
        assert size < 0.95*uncompressed_size
        f.close()
        # check lossless compression with shuffle.
        f = Dataset(self.files[2])
        size = os.stat(self.files[2]).st_size
        assert_almost_equal(array,f.variables['data'][:])
        assert f.variables['data'].filters() == {'zlib':True,'shuffle':True,'complevel':6,'fletcher32':False}
        assert size < 0.85*uncompressed_size
        f.close()
        # check lossy compression without shuffle.
        f = Dataset(self.files[3])
        size = os.stat(self.files[3]).st_size
        checkarray = _quantize(array,lsd)
        assert_almost_equal(checkarray,f.variables['data'][:])
        assert size < 0.27*uncompressed_size
        f.close()
        # check lossy compression with shuffle.
        f = Dataset(self.files[4])
        size = os.stat(self.files[4]).st_size
        assert_almost_equal(checkarray,f.variables['data'][:])
        assert size < 0.20*uncompressed_size
        size_save = size
        f.close()
        # check lossy compression with shuffle and fletcher32 checksum.
        f = Dataset(self.files[5])
        size = os.stat(self.files[5]).st_size
        assert_almost_equal(checkarray,f.variables['data'][:])
        assert f.variables['data'].filters() == {'zlib':True,'shuffle':True,'complevel':6,'fletcher32':True}
        assert size < 0.20*uncompressed_size
        # should be slightly larger than without fletcher32.
        assert size > size_save
        f.close()
        # check chunksizes on the 2-d variable.
        f = Dataset(self.files[6])
        checkarray2 = _quantize(array2,lsd)
        assert_almost_equal(checkarray2,f.variables['data2'][:])
        assert f.variables['data2'].filters() == {'zlib':True,'shuffle':True,'complevel':6,'fletcher32':True}
        assert f.variables['data2'].chunking() == [chunk1,chunk2]
        f.close()

if __name__ == '__main__':
    unittest.main()
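
# Footnote on the lossy checks above: least_significant_digit quantization
# rounds the data so that only about `lsd` decimal digits survive, which is
# why the lossy files compress so much better than the lossless ones and why
# the expected stored values can be predicted with _quantize. The helper
# below is a minimal sketch of that idea for a positive digit count, assuming
# the usual scale-round-unscale scheme; `quantize_sketch` is a hypothetical
# name for illustration only (it approximates, but is not, the library's
# _quantize), and nothing in the tests above uses it.
import numpy as np

def quantize_sketch(data, least_significant_digit):
    # scale by the smallest power of two that still resolves 10**-lsd,
    # round, then unscale; the trailing mantissa bits of every value become
    # zeros, which zlib (especially after the shuffle filter) compresses
    # very effectively.
    bits = np.ceil(np.log2(10.0**least_significant_digit))
    scale = 2.0**bits
    return np.around(scale*data)/scale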