# -*- coding: utf-8 -*-
#  This file is part of GNU Dico.
#  Copyright (C) 2008-2010, 2012, 2013, 2015 Wojciech Polak
#
#  GNU Dico is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3, or (at your option)
#  any later version.
#
#  GNU Dico is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with GNU Dico.  If not, see <http://www.gnu.org/licenses/>.

"""Client for the DICT protocol (RFC 2229) as spoken by GNU Dico."""

import re
import socket
import base64
import quopri

# `range` and `reduce` are kept in the module namespace for backward
# compatibility.  When neither copy of six is installed the standard
# library provides `reduce`, and the builtin `range` is used as-is.
try:
    from django.utils.six.moves import range, reduce
except ImportError:
    try:
        from six.moves import range, reduce
    except ImportError:
        from functools import reduce

__version__ = '1.1'

# Header of a definition: 151 "word" database "database description"
_DEFINITION_RX = re.compile(
    r'^\d+ ("[^"]+"|\w+) ([a-zA-Z0-9_\-]+) ("[^"]*"|\w+)')
# A single \NNN octal escape, and a run of consecutive escapes.
_OCTAL_RX = re.compile(r'\\([0-7]{3})')
_OCTAL_RUN_RX = re.compile(r'(?:\\[0-7]{3})+')


class DicoClient:

    """GNU Dico client module written in Python
    (a part of GNU Dico software)"""

    # Server host name; may be given to the constructor or to open().
    host = None
    # When non-zero, match() sends XLEV with this value first (only if
    # the server advertised the "xlev" capability).
    levenshtein_distance = 0
    # Set to True once the server has accepted OPTION MIME; text replies
    # are then parsed for MIME headers.
    mime = False

    # Non-zero enables progress messages on stdout.
    verbose = 0
    # Socket timeout in seconds, applied to the connection made by open().
    timeout = 10
    # When true, every line sent and received is echoed on stdout.
    transcript = False
    __connected = False

    def __init__(self, host=None):
        """Create a client; `host` optionally presets the server name."""
        if host is not None:
            self.host = host

    def __del__(self):
        if self.__connected:
            # Finalizers must not raise: the socket attribute may be
            # missing or already closed when the object is collected.
            try:
                self.socket.close()
            except (AttributeError, socket.error):
                pass

    def open(self, host=None, port=2628):
        """Open the connection to the DICT server.

        Connects to `host` (or the previously configured host) on `port`,
        reads the server banner into `server_banner`, `server_capas` and
        `server_msgid`, and sends the CLIENT command.

        Raises socket errors if the connection cannot be established and
        DicoNotConnectedError if the server closes it during the
        handshake.
        """
        if host is not None:
            self.host = host
        if self.verbose:
            self.__debug('Connecting to %s:%d' % (self.host, port))
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # The timeout is set on this socket only, so other sockets
            # in the process keep their own settings.
            sock.settimeout(int(self.timeout))
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.connect((self.host, port))
        except Exception:
            # Do not leak the descriptor when the connection fails.
            sock.close()
            raise
        self.socket = sock
        self.__connected = True
        try:
            # Python 3: decode server text as UTF-8 rather than with the
            # locale's preferred encoding.
            self.fd = sock.makefile('r', encoding='utf-8')
        except TypeError:
            # Python 2: makefile() has no `encoding` argument.
            self.fd = sock.makefile()

        self.server_banner = self.__read()[0]
        banner = re.search('<(.*)> (<.*>)$', self.server_banner)
        if banner is None:
            # The banner carries no "<capabilities> <msg-id>" pair.
            self.server_capas = []
            self.server_msgid = None
        else:
            capas, msgid = banner.groups()
            self.server_capas = capas.split('.')
            self.server_msgid = msgid

        self.__send_client()
        self.__read()

    def close(self):
        """Close the connection.

        Sends QUIT and reads the reply.  The socket is released even if
        that exchange fails; the failure is still propagated.
        """
        if self.__connected:
            try:
                self.__send_quit()
                self.__read()
            finally:
                self.__connected = False
                stream = getattr(self, 'fd', None)
                if stream is not None:
                    try:
                        stream.close()
                    except (socket.error, ValueError):
                        pass
                self.socket.close()

    def option(self, name, *args):
        """Send the OPTION command.

        Returns True if the server answered 250, False otherwise
        (including when the client is not connected).  Accepting
        "OPTION MIME" switches the client into MIME mode.
        """
        if not self.__connected:
            return False
        self.__send('OPTION %s%s' %
                    (name, ''.join(' ' + str(arg) for arg in args)))
        res = self.__read()
        code, msg = self.__get_rs(res[0])
        if code == 250:
            if name.lower() == 'mime':
                self.mime = True
            return True
        return False

    def __get_mime(self, lines):
        """Strip the MIME preamble from `lines` (modified in place).

        Returns a dictionary of the MIME headers found, keyed by
        lower-cased header name.  A base64 or quoted-printable body is
        decoded and the "content-transfer-encoding" key is then removed
        from the result.
        """
        mimeinfo = {}
        if not lines:
            return mimeinfo
        firstline = lines[0].lower()
        if 'content-type:' in firstline or \
                'content-transfer-encoding:' in firstline:
            consumed = 0
            for line in lines:
                consumed += 1
                if line == '':
                    # Blank line: end of the header section.
                    break
                name, colon, value = line.partition(':')
                if colon:
                    mimeinfo[name.lower()] = value.strip()
            del lines[:consumed]
        else:
            # No headers present: drop the leading separator line.
            del lines[:1]

        encoding = mimeinfo.get('content-transfer-encoding', '').lower()
        if encoding in ('base64', 'quoted-printable'):
            raw = '\n'.join(lines)
            if not isinstance(raw, bytes):
                raw = raw.encode('utf-8')
            if encoding == 'base64':
                decoded = base64.b64decode(raw)
            else:
                decoded = quopri.decodestring(raw)
            if not isinstance(decoded, str):
                # Python 3: the decoders return bytes.
                decoded = decoded.decode('utf-8')
            lines[:] = decoded.split('\r\n')
            if lines[-1] == '':
                del lines[-1]
            del mimeinfo['content-transfer-encoding']
        return mimeinfo

    def __status(self, line):
        """Split a status line into (code string, message text)."""
        code, _, text = line.partition(' ')
        return code, text

    def __get_rs(self, line):
        """Split a status line into (integer code, message text)."""
        code, text = self.__status(line)
        return int(code), text

    def __has_block(self, res):
        """Tell whether reply `res` carries a text block after line 0."""
        return len(res) > 1 and isinstance(res[1], list)

    def __read(self):
        """Read one complete server reply.

        Returns a list whose first item is the status line.  For a 150
        reply it is followed by one [status line, text lines] pair per
        definition and the final status line; for any other 1xx reply by
        the list of text lines and the final status line.

        Raises DicoNotConnectedError if the client is not connected or
        the server closes the connection.
        """
        if not self.__connected:
            raise DicoNotConnectedError('Not connected')
        line = self.__readline()
        if len(line) == 0:
            raise DicoNotConnectedError('Not connected')
        buf = [line]
        code, text = self.__get_rs(line)

        if 100 <= code < 200:
            if code == 150:
                while True:
                    rs = self.__readline()
                    code, text = self.__get_rs(rs)
                    if code != 151:
                        buf.append(rs)
                        break
                    buf.append([rs, self.__readblock()])
            else:
                buf.append(self.__readblock())
                buf.append(self.__readline())
        return buf

    def __readline(self):
        """Read one line from the server, without trailing whitespace.

        Raises DicoNotConnectedError at end of stream, so that callers
        can tell a closed connection from a blank line.
        """
        raw = self.fd.readline()
        if not raw:
            raise DicoNotConnectedError('Not connected')
        line = raw.rstrip()
        if self.transcript:
            self.__debug('S:%s' % line)
        return line

    def __readblock(self):
        """Read text lines up to the terminating "." line."""
        buf = []
        while True:
            line = self.__readline()
            if line == '.':
                break
            if line.startswith('..'):
                # RFC 2229: a leading period is doubled by the server.
                line = line[1:]
            buf.append(line)
        return buf

    def __quote(self, s):
        """Escape `s` for use inside a double-quoted command argument."""
        s = s.replace('\\', '\\\\').replace('"', '\\"')
        # A line break would terminate the command and let the rest of
        # the argument be interpreted as further commands.
        return s.replace('\r', ' ').replace('\n', ' ')

    def __send(self, command):
        """Send `command`, terminated by CRLF, encoded as UTF-8."""
        if not self.__connected:
            raise DicoNotConnectedError('Not connected')
        data = command + "\r\n"
        if not isinstance(data, bytes):
            data = data.encode('utf-8')
        # sendall() keeps writing until the whole command is sent.
        self.socket.sendall(data)
        if self.transcript:
            self.__debug('C:%s' % command)

    def __send_client(self):
        if self.verbose:
            self.__debug('Sending client information')
        self.__send('CLIENT "%s %s"' % ("GNU Dico (Python Edition)",
                                        __version__))

    def __send_quit(self):
        if self.verbose:
            self.__debug('Quitting')
        self.__send('QUIT')

    def __send_show(self, what, arg=None):
        """Send SHOW `what` (with optional quoted `arg`); return reply."""
        if arg is not None:
            self.__send('SHOW %s "%s"' % (what, self.__quote(arg)))
        else:
            self.__send('SHOW %s' % what)
        return self.__read()

    def __send_define(self, database, word):
        """Send DEFINE; the arguments must already be escaped."""
        if self.verbose:
            self.__debug('Sending query for word "%s" in database "%s"' %
                         (word, database))
        self.__send('DEFINE "%s" "%s"' % (database, word))
        return self.__read()

    def __send_match(self, database, strategy, word):
        """Send MATCH; the arguments must already be escaped."""
        if self.verbose:
            self.__debug('Sending query to match word "%s" in database "%s", using "%s"'
                         % (word, database, strategy))
        self.__send('MATCH "%s" "%s" "%s"' % (database, strategy, word))
        return self.__read()

    def __send_xlev(self, distance):
        self.__send('XLEV %u' % distance)
        return self.__read()

    def __name_list(self, res):
        """Parse a reply made of `name "description"` lines.

        Returns a list of [name, description] pairs; the list is empty
        when the reply carries no text block (e.g. a 5xx status).
        """
        if not self.__has_block(res):
            return []
        entries = res[1]
        if self.mime:
            self.__get_mime(entries)
        pairs = []
        for entry in entries:
            if not entry:
                continue
            short_name, _, full_name = entry.partition(' ')
            pairs.append([short_name, self.__unquote(full_name)])
        return pairs

    def show_databases(self):
        """List all accessible databases.

        Returns {'count': n, 'databases': [[name, description], ...]}.
        """
        if self.verbose:
            self.__debug('Getting list of databases')
        dbs = self.__name_list(self.__send_show('DATABASES'))
        return {
            'count': len(dbs),
            'databases': dbs,
        }

    def show_strategies(self):
        """List available matching strategies.

        Returns {'count': n, 'strategies': [[name, description], ...]}.
        """
        if self.verbose:
            self.__debug('Getting list of strategies')
        sts = self.__name_list(self.__send_show('STRATEGIES'))
        return {
            'count': len(sts),
            'strategies': sts,
        }

    def show_info(self, database):
        """Provide information about the database.

        Returns {'desc': text} on success, otherwise
        {'error': code, 'msg': message}.
        """
        res = self.__send_show("INFO", database)
        code, msg = self.__status(res[0])
        if int(code) < 500 and self.__has_block(res):
            if self.mime:
                self.__get_mime(res[1])
            return {'desc': '\n'.join(res[1])}
        return {'error': code, 'msg': msg}

    def show_lang_db(self):
        """Show databases with their language preferences.

        Returns {'desc': text, 'lang_src': [...], 'lang_dst': [...]} on
        success, otherwise {'error': code, 'msg': message}.
        """
        res = self.__send_show('LANG DB')
        code, msg = self.__status(res[0])
        if int(code) < 500 and self.__has_block(res):
            if self.mime:
                self.__get_mime(res[1])
            dsc = res[1]
            # Dictionaries are used as ordered sets of unique values.
            lang_src = {}
            lang_dst = {}
            for entry in dsc:
                # Each line is "<database> <source>:<destination>".
                pair = entry.partition(' ')[2]
                src, _, dst = pair.partition(':')
                if src:
                    lang_src[src.strip()] = True
                if dst:
                    lang_dst[dst.strip()] = True
            return {
                'desc': '\n'.join(dsc),
                'lang_src': list(lang_src.keys()),
                'lang_dst': list(lang_dst.keys()),
            }
        return {'error': code, 'msg': msg}

    def show_lang_pref(self):
        """Show server language preferences.

        Returns {'msg': message} on success, otherwise
        {'error': code, 'msg': message}.
        """
        res = self.__send_show('LANG PREF')
        code, msg = self.__status(res[0])
        if int(code) < 500:
            return {'msg': msg}
        return {'error': code, 'msg': msg}

    def show_server(self):
        """Provide site-specific information.

        Returns {'desc': text} on success, otherwise
        {'error': code, 'msg': message}.
        """
        res = self.__send_show('SERVER')
        code, msg = self.__status(res[0])
        if int(code) < 500 and self.__has_block(res):
            if self.mime:
                # In MIME mode this reply is prefaced like the others.
                self.__get_mime(res[1])
            return {'desc': '\n'.join(res[1])}
        return {'error': code, 'msg': msg}

    def define(self, database, word):
        """Look up word in database.

        Returns {'count': n, 'definitions': [...]} where each definition
        is a dictionary with the keys 'term', 'db', 'db_fullname' and
        'desc' (plus any MIME headers in MIME mode).  On failure returns
        {'error': code, 'msg': message}.
        """
        res = self.__send_define(self.__quote(database), self.__quote(word))
        code, msg = self.__status(res[-1])
        if int(code) >= 500:
            return {'error': code, 'msg': msg}
        defs = []
        for entry in res[1:-1]:
            if not isinstance(entry, list):
                continue
            header, body = entry
            found = _DEFINITION_RX.search(header)
            if found is None:
                # Keep the text even if its header cannot be parsed.
                term, db, db_fullname = '', '', ''
            else:
                term, db, db_fullname = found.groups()
            df = {
                'term': self.__unquote(term),
                'db': db,
                'db_fullname': self.__unquote(db_fullname),
            }
            if self.mime:
                df.update(self.__get_mime(body))
            df['desc'] = '\n'.join(body)
            defs.append(df)
        return {
            'count': len(defs),
            'definitions': defs,
        }

    def match(self, database, strategy, word):
        """Match word in database using strategy.

        Returns {'matches': {database: [term, ...]}} on success,
        otherwise {'error': code, 'msg': message}.  Raises
        DicoNotConnectedError if the client is not connected.
        """
        if not self.__connected:
            raise DicoNotConnectedError('Not connected')

        if self.levenshtein_distance and 'xlev' in self.server_capas:
            res = self.__send_xlev(self.levenshtein_distance)
            code, msg = self.__status(res[-1])
            if int(code) != 250 and self.verbose:
                self.__debug('Server rejected XLEV command')
                self.__debug('Server reply: %s' % msg)

        res = self.__send_match(self.__quote(database),
                                self.__quote(strategy),
                                self.__quote(word))
        code, msg = self.__status(res[-1])
        if int(code) >= 500:
            return {'error': code, 'msg': msg}
        mts = {}
        if self.__has_block(res):
            if self.mime:
                self.__get_mime(res[1])
            for entry in res[1]:
                if not entry:
                    continue
                db, _, term = entry.partition(' ')
                mts.setdefault(db, []).append(self.__unquote(term))
        return {
            'matches': mts,
        }

    def xlev(self, distance):
        """Set Levenshtein distance.

        Returns True if the server answered 250, False otherwise.
        """
        self.levenshtein_distance = distance
        res = self.__send_xlev(distance)
        code, msg = self.__get_rs(res[0])
        return code == 250

    def __unquote(self, s):
        """Remove surrounding double quotes and expand octal escapes."""
        s = s.replace("\\\\'", "'")
        if s and s[0] == '"' and s[-1] == '"':
            s = s[1:-1]
        try:
            s = self.__decode(s)
        except ValueError:
            # Best effort: return the text with its escapes unexpanded.
            pass
        return s

    def __decode(self, encoded):
        """Expand \\NNN octal escapes in `encoded`.

        Consecutive escapes are treated as the bytes of one UTF-8
        sequence; if they are not valid UTF-8 each escape is expanded to
        the character with that code.
        """
        def expand(found):
            codes = [int(octc, 8)
                     for octc in _OCTAL_RX.findall(found.group(0))]
            plain = ''.join(chr(code) for code in codes)
            if str is bytes:
                # Python 2: byte strings already carry the raw bytes.
                return plain
            try:
                return bytes(bytearray(codes)).decode('utf-8')
            except ValueError:
                # Code above 255 or an invalid UTF-8 sequence.
                return plain

        return _OCTAL_RUN_RX.sub(expand, encoded)

    def __debug(self, msg):
        print('dico: Debug: %s' % msg)


class DicoNotConnectedError (Exception):

    """Raised when the connection to the server is missing or lost."""

    def __init__(self, value):
        Exception.__init__(self, value)
        self.parameter = value

    def __str__(self):
        return repr(self.parameter)