#
# Copyright 2009 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Implementation of an S3-like storage server based on local files.

Useful to test features that will eventually run on S3, or if you want to
run something locally that was once running on S3.

We don't support all the features of S3, but it does work with the
standard S3 client for the most basic semantics. To use the standard
S3 client with this module:

    c = S3.AWSAuthConnection("", "", server="localhost", port=8888,
                             is_secure=False)
    c.create_bucket("mybucket")
    c.put("mybucket", "mykey", "a value")
    print(c.get("mybucket", "mykey").body)

"""

import bisect
import datetime
import hashlib
import os
import os.path
import urllib.parse

from tornado import escape
from tornado import httpserver
from tornado import ioloop
from tornado import web
from tornado.util import unicode_type
from tornado.options import options, define

try:
    long
except NameError:
    # Python 3 has no ``long``; alias it so the isinstance checks in
    # _render_parts work on both major versions.
    long = int

define("port", default=9888, help="TCP port to listen on")
define("root_directory", default="/tmp/s3", help="Root storage directory")
define("bucket_depth", default=0, help="Bucket file system depth limit")


def start(port, root_directory, bucket_depth):
    """Starts the mock S3 server on the given port at the given path.

    Blocks forever (runs the IOLoop); intended to be the process entry point.
    """
    application = S3Application(root_directory, bucket_depth)
    http_server = httpserver.HTTPServer(application)
    http_server.listen(port)
    ioloop.IOLoop.current().start()


class S3Application(web.Application):
    """Implementation of an S3-like storage server based on local files.

    If bucket depth is given, we break files up into multiple directories
    to prevent hitting file system limits for number of files in each
    directories. 1 means one level of directories, 2 means 2, etc.
    """
    def __init__(self, root_directory, bucket_depth=0):
        web.Application.__init__(self, [
            (r"/", RootHandler),
            (r"/([^/]+)/(.+)", ObjectHandler),
            (r"/([^/]+)/", BucketHandler),
        ])
        self.directory = os.path.abspath(root_directory)
        if not os.path.exists(self.directory):
            os.makedirs(self.directory)
        self.bucket_depth = bucket_depth


class BaseRequestHandler(web.RequestHandler):
    """Shared helpers for the S3 handlers: XML rendering and path mapping."""

    SUPPORTED_METHODS = ("PUT", "GET", "DELETE")

    def render_xml(self, value):
        """Finishes the request with *value* rendered as an S3-style XML body.

        ``value`` must be a dict with exactly one key, which becomes the
        root element (stamped with the S3 2006-03-01 namespace).
        """
        assert isinstance(value, dict) and len(value) == 1
        self.set_header("Content-Type", "application/xml; charset=UTF-8")
        name = list(value.keys())[0]
        parts = []
        parts.append('<' + name +
                     ' xmlns="http://doc.s3.amazonaws.com/2006-03-01">')
        self._render_parts(value[name], parts)
        parts.append('</' + name + '>')
        self.finish('<?xml version="1.0" encoding="UTF-8"?>\n' +
                    ''.join(parts))

    def _render_parts(self, value, parts=None):
        """Recursively appends the XML rendering of *value* to *parts*.

        Strings are escaped, ints/datetimes formatted, dicts become nested
        elements (list values are repeated under the same tag).
        """
        # Bug fix: ``parts`` previously defaulted to a mutable list, which
        # Python shares across calls; use the None-sentinel idiom instead.
        if parts is None:
            parts = []
        if isinstance(value, (unicode_type, bytes)):
            parts.append(escape.xhtml_escape(value))
        elif isinstance(value, (int, long)):
            parts.append(str(value))
        elif isinstance(value, datetime.datetime):
            parts.append(value.strftime("%Y-%m-%dT%H:%M:%S.000Z"))
        elif isinstance(value, dict):
            for name, subvalue in value.items():
                if not isinstance(subvalue, list):
                    subvalue = [subvalue]
                for subsubvalue in subvalue:
                    parts.append('<' + name + '>')
                    self._render_parts(subsubvalue, parts)
                    parts.append('</' + name + '>')
        else:
            # Bug fix: was Exception("...%r", value), which never
            # interpolated the value into the message.
            raise Exception("Unknown S3 value type %r" % value)

    def _object_path(self, bucket, object_name):
        """Maps (bucket, object_name) to an absolute file system path.

        With bucket_depth >= 1, objects are sharded under nested
        directories named by growing prefixes of the md5 of the name
        (2, 4, 6, ... hex chars) to avoid huge flat directories.
        """
        if self.application.bucket_depth < 1:
            return os.path.abspath(os.path.join(
                self.application.directory, bucket, object_name))
        # Bug fix: hashlib requires bytes; object_name is str on Python 3.
        digest = hashlib.md5(object_name.encode("utf-8")).hexdigest()
        path = os.path.abspath(os.path.join(
            self.application.directory, bucket))
        for i in range(self.application.bucket_depth):
            path = os.path.join(path, digest[:2 * (i + 1)])
        # abspath normalizes any ".." segments in object_name so the
        # callers' prefix checks can reject path traversal.
        return os.path.abspath(os.path.join(path, object_name))


class RootHandler(BaseRequestHandler):
    def get(self):
        """Lists all buckets (ListAllMyBucketsResult)."""
        names = os.listdir(self.application.directory)
        buckets = []
        for name in names:
            path = os.path.join(self.application.directory, name)
            info = os.stat(path)
            buckets.append({
                "Name": name,
                "CreationDate": datetime.datetime.utcfromtimestamp(
                    info.st_ctime),
            })
        self.render_xml({"ListAllMyBucketsResult": {
            "Buckets": {"Bucket": buckets},
        }})


class BucketHandler(BaseRequestHandler):
    def get(self, bucket_name):
        """Lists objects in a bucket (ListBucketResult).

        Supports the standard ``prefix``, ``marker`` and ``max-keys``
        arguments, plus a non-standard ``terse`` flag that omits the
        per-object stat() metadata.
        """
        prefix = self.get_argument("prefix", u"")
        marker = self.get_argument("marker", u"")
        max_keys = int(self.get_argument("max-keys", 50000))
        path = os.path.abspath(os.path.join(self.application.directory,
                                            bucket_name))
        terse = int(self.get_argument("terse", 0))
        # Anchor the containment check on a trailing separator so sibling
        # directories (e.g. /tmp/s3evil vs /tmp/s3) are not accepted.
        if not path.startswith(self.application.directory + os.sep) or \
           not os.path.isdir(path):
            raise web.HTTPError(404)
        object_names = []
        for root, dirs, files in os.walk(path):
            for file_name in files:
                object_names.append(os.path.join(root, file_name))
        # Strip the bucket directory (and, if sharded, the hash-prefix
        # directories, whose name lengths are 2, 4, 6, ...) from each path.
        skip = len(path) + 1
        for i in range(self.application.bucket_depth):
            skip += 2 * (i + 1) + 1
        object_names = [n[skip:] for n in object_names]
        object_names.sort()
        contents = []

        start_pos = 0
        if marker:
            start_pos = bisect.bisect_right(object_names, marker, start_pos)
        if prefix:
            start_pos = bisect.bisect_left(object_names, prefix, start_pos)

        truncated = False
        for object_name in object_names[start_pos:]:
            if not object_name.startswith(prefix):
                break
            if len(contents) >= max_keys:
                truncated = True
                break
            object_path = self._object_path(bucket_name, object_name)
            c = {"Key": object_name}
            if not terse:
                info = os.stat(object_path)
                c.update({
                    "LastModified": datetime.datetime.utcfromtimestamp(
                        info.st_mtime),
                    "Size": info.st_size,
                })
            contents.append(c)
            marker = object_name
        self.render_xml({"ListBucketResult": {
            "Name": bucket_name,
            "Prefix": prefix,
            "Marker": marker,
            "MaxKeys": max_keys,
            "IsTruncated": truncated,
            "Contents": contents,
        }})

    def put(self, bucket_name):
        """Creates a bucket directory; 403 if it exists or escapes the root."""
        path = os.path.abspath(os.path.join(
            self.application.directory, bucket_name))
        if not path.startswith(self.application.directory + os.sep) or \
           os.path.exists(path):
            raise web.HTTPError(403)
        os.makedirs(path)
        self.finish()

    def delete(self, bucket_name):
        """Deletes an empty bucket; 404 if missing, 403 if non-empty."""
        path = os.path.abspath(os.path.join(
            self.application.directory, bucket_name))
        if not path.startswith(self.application.directory + os.sep) or \
           not os.path.isdir(path):
            raise web.HTTPError(404)
        if len(os.listdir(path)) > 0:
            raise web.HTTPError(403)
        os.rmdir(path)
        self.set_status(204)
        self.finish()


class ObjectHandler(BaseRequestHandler):
    def get(self, bucket, object_name):
        """Streams an object's bytes back to the client; 404 if absent."""
        # Bug fix: py3 moved unquote to urllib.parse.
        object_name = urllib.parse.unquote(object_name)
        path = self._object_path(bucket, object_name)
        if not path.startswith(self.application.directory + os.sep) or \
           not os.path.isfile(path):
            raise web.HTTPError(404)
        info = os.stat(path)
        self.set_header("Content-Type", "application/unknown")
        self.set_header("Last-Modified", datetime.datetime.utcfromtimestamp(
            info.st_mtime))
        with open(path, "rb") as object_file:
            self.finish(object_file.read())

    def put(self, bucket, object_name):
        """Stores the request body as the object's content.

        404 if the bucket does not exist; 403 if the target escapes the
        bucket or collides with a directory.
        """
        object_name = urllib.parse.unquote(object_name)
        bucket_dir = os.path.abspath(os.path.join(
            self.application.directory, bucket))
        if not bucket_dir.startswith(self.application.directory + os.sep) or \
           not os.path.isdir(bucket_dir):
            raise web.HTTPError(404)
        path = self._object_path(bucket, object_name)
        if not path.startswith(bucket_dir + os.sep) or os.path.isdir(path):
            raise web.HTTPError(403)
        directory = os.path.dirname(path)
        if not os.path.exists(directory):
            os.makedirs(directory)
        # Bug fix: request.body is bytes; the file must be opened in
        # binary mode (was "w", a TypeError on Python 3).
        with open(path, "wb") as object_file:
            object_file.write(self.request.body)
        self.finish()

    def delete(self, bucket, object_name):
        """Deletes an object; 404 if it does not exist."""
        object_name = urllib.parse.unquote(object_name)
        path = self._object_path(bucket, object_name)
        if not path.startswith(self.application.directory + os.sep) or \
           not os.path.isfile(path):
            raise web.HTTPError(404)
        os.unlink(path)
        self.set_status(204)
        self.finish()


if __name__ == "__main__":
    options.parse_command_line()
    start(options.port, options.root_directory, options.bucket_depth)