# WCS response decoder.
# Decodes response from a WCS (either a Coverages XML document or a Multipart MIME)
# and extracts the urls of the coverage data.
# Copyright (c) 2007 STFC <http://www.stfc.ac.uk>
# Author: Dominic Lowe, STFC
# contact email: d.lowe@rl.ac.uk
#
# Multipart MIME decoding based on http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/86676

# example: used in conjunction with ows lib wcs:

# from owslib import wcsdecoder
# u=wcs.getcoverage(identifier=['TuMYrRQ4'], timeSequence=['2792-06-01T00:00:00.0'], bbox=(-112,36,-106,41),
#                   format='application/netcdf', store='true')
# decoder=wcsdecoder.WCSDecoder(u)
# decoder.getCoverages()

import email
import errno
import mimetypes
import os

try:
    from owslib.etree import etree
except ImportError:
    # Allows the decoder to be used stand-alone, without an OWSLib install.
    from xml.etree import ElementTree as etree

_XLINK_HREF = '{http://www.w3.org/1999/xlink}href'

# Element paths (relative to the document root) under which coverage
# references are looked up, one per supported namespace.
_REFERENCE_PATHS = (
    '{http://www.opengis.net/wcs/1.1}Coverage/{http://www.opengis.net/wcs/1.1}Reference',
    '{http://www.opengis.net/wcs/1.1.0/owcs}Coverage/{http://www.opengis.net/wcs/1.1.0/owcs}Reference',  # noqa
)

_XML_PROLOG = '<?xml version='


def _safe_filename(name):
    """Return the final path component of *name*, or None if it is unusable.

    Filenames come from the remote response, so directory components
    (including ``..``) are dropped to keep files inside the unpack directory.
    """
    if not name:
        return None
    leaf = os.path.basename(name.replace('\\', '/'))
    if leaf in ('', '.', '..'):
        return None
    return leaf


class WCSDecoder(object):
    """Decode a WCS GetCoverage response into a list of coverage locations."""

    def __init__(self, u):
        """Initiate with a urllib url object.

        ``u`` must provide ``readline()`` and ``read()``; its first line is
        read immediately to determine the response type.
        """
        self.u = u
        self._head = None
        self._payload = None
        self._getType()

    def _getType(self):
        """Determine whether it is a Multipart Mime or a Coverages XML file.

        Sets ``self.urlType`` to ``'XML'`` when the response starts with an
        XML declaration and to ``'Multipart'`` otherwise.
        """
        # The first line is kept so it can be put back in front of the rest
        # of the stream; otherwise the XML declaration / first MIME header
        # would be lost to the parser.
        head = self.u.readline()
        self._head = head
        self._payload = None

        probe = head[:len(_XML_PROLOG)]
        if isinstance(probe, bytes):
            # urllib responses yield bytes; the prolog itself is pure ASCII.
            probe = probe.decode('latin-1')
        self.urlType = 'XML' if probe == _XML_PROLOG else 'Multipart'

    def _read_payload(self):
        """Return the complete response body, reading the stream only once."""
        if self._payload is None:
            self._payload = self._head + self.u.read()
        return self._payload

    def getCoverages(self, unpackdir='./unpacked'):
        """Return the locations of the coverages contained in the response.

        For an XML response the ``xlink:href`` of every Coverage/Reference
        element is returned. For a Multipart response each part is written
        into ``unpackdir`` and the written file paths are returned.

        Raises ``ValueError`` if ``urlType`` is not a known response type.
        """
        payload = self._read_payload()

        if self.urlType == 'XML':
            if not isinstance(payload, bytes):
                # Already-decoded text: the encoding named in the declaration
                # no longer applies (and some parsers reject it), so drop it.
                end = payload.find('?>')
                if end != -1:
                    payload = payload[end + 2:]
            tree = etree.fromstring(payload)
            paths = []
            for ref_path in _REFERENCE_PATHS:
                for ref in tree.findall(ref_path):
                    paths.append(ref.attrib[_XLINK_HREF])
            return paths

        if self.urlType == 'Multipart':
            # Decode multipart mime and return the paths of the written files
            return MpartMime(payload).unpackToDir(unpackdir)

        raise ValueError('Unknown response type: %r' % (self.urlType,))


class MpartMime(object):
    """A multipart MIME document that can be unpacked onto disk."""

    def __init__(self, mpartmime):
        """ mpartmime is a multipart mime file that has already been read in."""
        self.mpartmime = mpartmime

    def unpackToDir(self, unpackdir):
        """Unpack the contents of the Multipart mime to a given directory.

        ``unpackdir`` is created if it does not exist. Returns the list of
        file paths written, in document order. Parts without a usable
        filename are named ``part-NNN<ext>``, where NNN is the position of
        the part and the extension is guessed from its content type.
        """
        # create the directory if it doesn't exist:
        try:
            os.mkdir(unpackdir)
        except OSError as e:
            # Ignore directory exists error
            if e.errno != errno.EEXIST:
                raise

        if isinstance(self.mpartmime, bytes) and hasattr(email, 'message_from_bytes'):
            msg = email.message_from_bytes(self.mpartmime)
        else:
            msg = email.message_from_string(self.mpartmime)

        # now walk through the multipart mime and write out files
        names = []
        counter = 1
        for part in msg.walk():
            # multipart/* are just containers, ignore
            if part.get_content_maintype() == 'multipart':
                continue
            data = part.get_payload(decode=True)
            if data is None:
                # Other container parts carry no payload of their own.
                continue

            filename = _safe_filename(part.get_filename())
            if not filename:
                ext = mimetypes.guess_extension(part.get_content_type())
                if not ext:
                    # Use a generic extension
                    ext = '.bin'
                filename = 'part-%03d%s' % (counter, ext)
            # Advance for every part so generated names never collide.
            counter += 1

            fullpath = os.path.join(unpackdir, filename)
            with open(fullpath, 'wb') as fp:
                fp.write(data)
            names.append(fullpath)
        return names