#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This code is in the public domain, so as to serve as a template for
# real-world plugins.
# or, at the choice of the licensee,
# Copyright 2019 Even Rouault
# SPDX-License-Identifier: MIT

# Metadata parsed by GDAL C++ code at driver pre-loading, starting with '# gdal: '
# Required and with that exact syntax since it is parsed by non-Python
# aware code. So just literal values, no expressions, etc.
# gdal: DRIVER_NAME = "PASSTHROUGH"
# API version(s) supported. Must include 1 currently
# gdal: DRIVER_SUPPORTED_API_VERSION = [1]
# gdal: DRIVER_DCAP_VECTOR = "YES"
# gdal: DRIVER_DMD_LONGNAME = "Passthrough driver"
# gdal: DRIVER_DMD_CONNECTION_PREFIX = "PASSTHROUGH:"

from osgeo import gdal, ogr

try:
    # The gdal_python_driver module is defined by the GDAL library at runtime
    from gdal_python_driver import BaseDriver, BaseDataset, BaseLayer
except ImportError:
    # To be able to run in standalone mode: minimal stand-ins exposing only
    # the names this file uses.
    class BaseDriver(object):
        pass

    class BaseDataset(object):
        pass

    class BaseLayer(object):
        # Capability names, passed as-is to OGR's TestCapability()
        RandomRead = 'RandomRead'
        FastSpatialFilter = 'FastSpatialFilter'
        FastFeatureCount = 'FastFeatureCount'
        FastGetExtent = 'FastGetExtent'
        StringsAsUTF8 = 'StringsAsUTF8'


class Layer(BaseLayer):
    """Plugin layer that forwards every request to a wrapped OGR layer."""

    def __init__(self, gdal_layer):
        """Wrap ``gdal_layer`` and cache its name, FID column and metadata."""
        self.gdal_layer = gdal_layer
        self.name = gdal_layer.GetName()
        self.fid_name = gdal_layer.GetFIDColumn()
        self.metadata = gdal_layer.GetMetadata_Dict()
        # Filters are forwarded to the wrapped layer (see
        # attribute_filter_changed() / spatial_filter_changed()), so both
        # iteration and feature counting already take them into account.
        self.iterator_honour_attribute_filter = True
        self.iterator_honour_spatial_filter = True
        self.feature_count_honour_attribute_filter = True
        self.feature_count_honour_spatial_filter = True

    def fields(self):
        """Return one ``{"name", "type"}`` dict per attribute field."""
        layer_defn = self.gdal_layer.GetLayerDefn()
        field_defns = (layer_defn.GetFieldDefn(i)
                       for i in range(layer_defn.GetFieldCount()))
        return [{"name": defn.GetName(), "type": defn.GetType()}
                for defn in field_defns]

    def geometry_fields(self):
        """Return one dict per geometry field.

        Each dict has "name" and "type" keys, plus "srs" (WKT string) when
        the geometry field has a spatial reference attached.
        """
        res = []
        layer_defn = self.gdal_layer.GetLayerDefn()
        for i in range(layer_defn.GetGeomFieldCount()):
            geom_defn = layer_defn.GetGeomFieldDefn(i)
            field_def = {"name": geom_defn.GetName(),
                         "type": geom_defn.GetType()}
            srs = geom_defn.GetSpatialRef()
            if srs:
                field_def["srs"] = srs.ExportToWkt()
            res.append(field_def)
        return res

    def test_capability(self, cap):
        """Report whether capability ``cap`` is supported.

        Only the capabilities listed below are delegated to the wrapped
        layer; any other capability is reported as unsupported.
        """
        delegated = (BaseLayer.FastGetExtent, BaseLayer.StringsAsUTF8,
                     BaseLayer.RandomRead, BaseLayer.FastFeatureCount)
        if cap in delegated:
            return self.gdal_layer.TestCapability(cap)
        return False

    def extent(self, force_computation):
        """Return the layer extent as ``[minx, miny, maxx, maxy]``."""
        # Impedance mismatch between SWIG GetExtent(), which returns
        # (minx, maxx, miny, maxy), and the Python driver API
        minx, maxx, miny, maxy = self.gdal_layer.GetExtent(force_computation)
        return [minx, miny, maxx, maxy]

    def feature_count(self, force_computation):
        """Return the feature count reported by the wrapped layer.

        ``force_computation`` is forwarded to GetFeatureCount() so that the
        caller, not this wrapper, decides whether a potentially expensive
        count may be performed (same convention as extent()).
        """
        return self.gdal_layer.GetFeatureCount(force_computation)

    def attribute_filter_changed(self):
        """Propagate ``self.attribute_filter`` to the wrapped layer.

        ``attribute_filter`` is not assigned anywhere in this file;
        presumably the GDAL Python driver framework sets it before calling
        this method -- confirm against the GDAL documentation.
        """
        if self.attribute_filter:
            self.gdal_layer.SetAttributeFilter(str(self.attribute_filter))
        else:
            self.gdal_layer.SetAttributeFilter(None)

    def spatial_filter_changed(self):
        """Propagate ``self.spatial_filter`` (a WKT string) to the wrapped layer.

        ``spatial_filter`` is not assigned anywhere in this file; presumably
        the GDAL Python driver framework sets it before calling this
        method -- confirm against the GDAL documentation.
        """
        # the 'inf' test is just for a test_ogrsf oddity
        if self.spatial_filter and 'inf' not in self.spatial_filter:
            self.gdal_layer.SetSpatialFilter(
                ogr.CreateGeometryFromWkt(self.spatial_filter))
        else:
            self.gdal_layer.SetSpatialFilter(None)

    def _translate_feature(self, ogr_f):
        """Convert an OGR feature into the dict form returned to the caller.

        Unset attribute fields and missing geometries are left out of the
        "fields" and "geometry_fields" sub-dicts respectively.
        """
        feature_defn = ogr_f.GetDefnRef()
        fields = {feature_defn.GetFieldDefn(i).GetName(): ogr_f.GetField(i)
                  for i in range(ogr_f.GetFieldCount())
                  if ogr_f.IsFieldSet(i)}
        geom_fields = {}
        for i in range(ogr_f.GetGeomFieldCount()):
            geom = ogr_f.GetGeomFieldRef(i)
            if geom:
                geom_name = feature_defn.GetGeomFieldDefn(i).GetName()
                geom_fields[geom_name] = geom.ExportToIsoWkt()
        return {'id': ogr_f.GetFID(),
                'type': 'OGRFeature',
                'style': ogr_f.GetStyleString(),
                'fields': fields,
                'geometry_fields': geom_fields}

    def __iter__(self):
        """Yield every feature of the wrapped layer as a translated dict."""
        for ogr_f in self.gdal_layer:
            yield self._translate_feature(ogr_f)

    def feature_by_id(self, fid):
        """Return the translated feature with id ``fid``, or None if absent."""
        ogr_f = self.gdal_layer.GetFeature(fid)
        if not ogr_f:
            return None
        return self._translate_feature(ogr_f)


class Dataset(BaseDataset):
    """Plugin dataset exposing each layer of a wrapped GDAL dataset."""

    def __init__(self, gdal_ds):
        """Wrap ``gdal_ds``, building one Layer per underlying layer."""
        self.gdal_ds = gdal_ds
        self.layers = [Layer(gdal_ds.GetLayer(idx))
                       for idx in range(gdal_ds.GetLayerCount())]
        self.metadata = gdal_ds.GetMetadata_Dict()

    def close(self):
        """Drop this object's reference to the wrapped dataset."""
        # Rebinding the attribute releases the reference and keeps
        # 'gdal_ds' defined, so calling close() again is harmless.
        self.gdal_ds = None


class Driver(BaseDriver):
    """Driver handling connection strings of the form 'PASSTHROUGH:<name>'."""

    def _identify(self, filename):
        """Open the datasource named after the 'PASSTHROUGH:' prefix.

        Returns None when ``filename`` lacks the prefix; otherwise returns
        whatever gdal.OpenEx() returns for the remainder of the string
        (presumably None when the open fails and GDAL exceptions are not
        enabled -- confirm against the GDAL documentation).
        """
        prefix = 'PASSTHROUGH:'
        if not filename.startswith(prefix):
            return None
        return gdal.OpenEx(filename[len(prefix):], gdal.OF_VECTOR)

    # Required
    def identify(self, filename, first_bytes, open_flags, open_options=None):
        """Return True when ``filename`` can be opened through this driver.

        Identification actually opens the underlying datasource. The other
        parameters are accepted for API compatibility and are not used.
        """
        return self._identify(filename) is not None

    # Required
    def open(self, filename, first_bytes, open_flags, open_options=None):
        """Return a Dataset for ``filename``, or None if it cannot be opened.

        Only ``filename`` is used; the other parameters are accepted for
        API compatibility.
        """
        gdal_ds = self._identify(filename)
        if not gdal_ds:
            return None
        return Dataset(gdal_ds)


# Test as standalone
if __name__ == '__main__':
    import sys

    if len(sys.argv) < 2:
        sys.exit('Usage: %s PASSTHROUGH:<datasource>' % sys.argv[0])
    connection = sys.argv[1]

    drv = Driver()
    # Explicit checks rather than assert, which is stripped under -O
    if not drv.identify(connection, None, 0):
        sys.exit('Not recognized by the PASSTHROUGH driver: %s' % connection)
    ds = drv.open(connection, None, 0)
    if ds is None:
        sys.exit('Cannot open: %s' % connection)

    # Exercise each entry point of every layer, then dump its features
    for layer in ds.layers:
        layer.geometry_fields()
        layer.fields()
        layer.test_capability(BaseLayer.FastGetExtent)
        layer.extent(True)
        layer.feature_count(True)
        for feature in layer:
            print(feature)