import argparse
import sys
import re
import json
from . import SchemaBuilder, __version__


def main():
    """Entry point: build a CLI from the process arguments and run it."""
    CLI().run()


class CLI:
    """
    Command-line front end that reads JSON Schemas and JSON objects from
    files (or stdin), feeds them to a SchemaBuilder and prints the
    builder's JSON output.
    """

    # values of --delimiter that stand in for whitespace characters
    # which are awkward to type in a shell
    _DELIMITER_ALIASES = {'newline': '\n', 'tab': '\t', 'space': ' '}

    # Matches either a complete JSON string literal (no capture group) or
    # a closing brace (group 1) followed by optional whitespace and an
    # opening brace. String literals are matched only so that any '}{'
    # they contain is consumed and never mistaken for a boundary.
    _JSON_BOUNDARY = re.compile(
        r'"[^"\\]*(?:\\.[^"\\]*)*"|(\})\s*(?=\{)', re.DOTALL)

    def __init__(self):
        """Parse the command line and create the SchemaBuilder."""
        self._make_parser()
        self._prepare_args()
        self.builder = SchemaBuilder(schema_uri=self.args.schema_uri)

    def run(self):
        """
        Add every schema and object to the builder, then print the
        result. Reports an error through ``fail`` when neither schemas
        nor objects were supplied.
        """
        if not self.args.schema and not self.args.object:
            self.fail('nothing to do - no schemas or objects given')
        self.add_schemas()
        self.add_objects()
        self.print_output()

    def add_schemas(self):
        """Pass the JSON in each ``--schema`` file to ``add_schema``."""
        self._add_from_files(self.builder.add_schema, self.args.schema)

    def add_objects(self):
        """Pass the JSON in each object file to ``add_object``."""
        self._add_from_files(self.builder.add_object, self.args.object)

    def print_output(self):
        """Print the builder's JSON using the requested indent."""
        print(self.builder.to_json(indent=self.args.indent))

    def fail(self, message):
        """Report ``message`` through the argument parser's ``error``."""
        self.parser.error(message)

    def _add_from_files(self, method, fps):
        """
        Call ``method`` with every JSON document found in each open file
        of ``fps``. Each file is closed once it has been processed, even
        when reading, parsing or ``method`` raises.
        """
        for fp in fps:
            try:
                self._call_with_json_from_fp(method, fp)
            finally:
                fp.close()

    def _make_parser(self):
        """Create ``self.parser`` with all supported options."""
        is_py3 = sys.version_info.major == 3

        # only support encoding option for Python 3
        if is_py3:
            file_type = argparse.FileType('r', encoding=self._get_encoding())
        else:
            file_type = argparse.FileType('r')

        self.parser = argparse.ArgumentParser(
            add_help=False,
            description="""Generate one, unified JSON Schema from one or more
            JSON objects and/or JSON Schemas. Compatible with JSON-Schema Draft
            4 and above.""")

        self.parser.add_argument(
            '-h', '--help', action='help', default=argparse.SUPPRESS,
            help='Show this help message and exit.')
        self.parser.add_argument(
            '--version', action='version', default=argparse.SUPPRESS,
            version='%(prog)s {}'.format(__version__),
            help='Show version number and exit.')
        self.parser.add_argument(
            '-d', '--delimiter', metavar='DELIM',
            help="""Set a delimiter. Use this option if the input files
            contain multiple JSON objects/schemas. You can pass any string. A
            few cases ('newline', 'tab', 'space') will get converted to a
            whitespace character. If this option is omitted, the parser will
            try to auto-detect boundaries.""")
        if is_py3:
            self.parser.add_argument(
                '-e', '--encoding', type=str, metavar='ENCODING',
                help="""Use ENCODING instead of the default system encoding
                when reading files. ENCODING must be a valid codec name or
                alias.""")
        self.parser.add_argument(
            '-i', '--indent', type=int, metavar='SPACES',
            help="""Pretty-print the output, indenting SPACES spaces.""")
        self.parser.add_argument(
            '-s', '--schema', action='append', default=[], type=file_type,
            help="""File containing a JSON Schema (can be specified multiple
            times to merge schemas).""")
        self.parser.add_argument(
            '-$', '--schema-uri', metavar='SCHEMA_URI', dest='schema_uri',
            default=SchemaBuilder.DEFAULT_URI,
            help="""The value of the '$schema' keyword (defaults to {default!r}
            or can be specified in a schema with the -s option). If {null!r} is
            passed, the "$schema" keyword will not be included in the
            result.""".format(default=SchemaBuilder.DEFAULT_URI,
                              null=SchemaBuilder.NULL_URI))
        self.parser.add_argument(
            'object', nargs=argparse.REMAINDER, type=file_type,
            help="""Files containing JSON objects (defaults to stdin if no
            arguments are passed).""")

    def _get_encoding(self):
        """
        use separate arg parser to grab encoding argument before
        defining FileType args

        Returns the value given to -e/--encoding, or None when the
        option is absent. Unrecognized arguments are ignored here.
        """
        parser = argparse.ArgumentParser(add_help=False)
        parser.add_argument('-e', '--encoding', type=str)
        args, _ = parser.parse_known_args()
        return args.encoding

    def _prepare_args(self):
        """Parse the command line into ``self.args`` and normalize it."""
        self.args = self.parser.parse_args()
        self._prepare_delimiter()

        # read objects from stdin when no object files were given and
        # stdin is not an interactive terminal
        if not self.args.object and not sys.stdin.isatty():
            self.args.object.append(sys.stdin)

    def _prepare_delimiter(self):
        """
        manage special conversions for difficult bash characters

        'newline', 'tab' and 'space' are replaced by the character they
        name; any other value (including None) is left untouched.
        """
        delimiter = self.args.delimiter
        self.args.delimiter = self._DELIMITER_ALIASES.get(delimiter, delimiter)

    def _call_with_json_from_fp(self, method, fp):
        """
        Read all of ``fp``, split it into JSON documents and call
        ``method`` with each decoded document in order.
        """
        for json_string in self._get_json_strings(fp.read().strip()):
            method(json.loads(json_string))

    def _get_json_strings(self, raw_text):
        """
        Split ``raw_text`` into individual JSON strings, using the
        configured delimiter when one is set and boundary detection
        otherwise. Returns the non-blank pieces with surrounding
        whitespace removed.
        """
        delimiter = self.args.delimiter
        if delimiter:
            candidates = raw_text.split(delimiter)
        else:
            candidates = self._detect_json_strings(raw_text)

        # sanitize data before returning
        stripped = (candidate.strip() for candidate in candidates)
        return [text for text in stripped if text]

    @staticmethod
    def _detect_json_strings(raw_text):
        """
        Spot the boundaries between consecutive JSON objects, i.e. a
        closing brace followed (after optional whitespace) by an opening
        brace, and return the pieces between them as a list of strings.

        Braces inside JSON string literals are ignored, so a value such
        as "{a}{b}" does not split its enclosing object. The list always
        contains at least one item (the whole text when no boundary is
        found).
        """
        json_strings = []
        start = 0
        for match in CLI._JSON_BOUNDARY.finditer(raw_text):
            if match.group(1) is None:
                # a string literal was consumed; nothing to split here
                continue
            # keep the closing brace, drop the whitespace that follows it
            json_strings.append(raw_text[start:match.end(1)])
            start = match.end()

        # whatever follows the last boundary is the final document
        json_strings.append(raw_text[start:])

        return json_strings