# This Python file uses the following encoding: utf-8
from collections.abc import Mapping
from io import StringIO

import cProfile
import logging
import pstats
import re
import traceback
from functools import wraps
from logging import debug, error
from aistate_interface import get_aistate

import freeOrionAIInterface as fo  # pylint: disable=import-error

# color wrappers for chat:
RED = '<rgba 255 0 0 255>%s</rgba>'
WHITE = '<rgba 255 255 255 255>%s</rgba>'


def dict_from_map(thismap):
    """Convert C++ map to python dict."""
    return {el.key(): el.data() for el in thismap}


def dict_from_map_recursive(thismap):
    """Convert a (possibly nested) C++ map to a python dict.

    Every value is converted recursively. If iterating the map or converting
    any of its values raises, this level is converted flat via dict_from_map
    instead; that is how the recursion stops at values that are not maps.
    """
    retval = {}
    try:
        for el in thismap:
            retval[el.key()] = dict_from_map_recursive(el.data())
        return retval
    except Exception:
        return dict_from_map(thismap)


def get_ai_tag_grade(tag_list, tag_type):
    """
    Accepts a list of string tags and a tag_type (like 'WEAPONS').
    Checks for the first tag in the list (if any), for tag_type "TYPE",
    having the structure X_TYPE
    and then returns 'X'
    X is most commonly (but not necessarily) one of [NO, BAD, AVERAGE, GOOD, GREAT, ULTIMATE]
    If no matching tags, returns empty string (which for most types should be considered equivalent to AVERAGE)
    """
    wanted_type = tag_type.upper()
    for tag in tag_list:
        if "_" not in tag:
            continue
        # split at the first underscore only: the grade is everything before it
        grade, found_type = tag.split("_", 1)
        if found_type == wanted_type:
            return grade
    return ""


# this name left with C naming style for compatibility with translation assistance procedures
def UserString(label, default=None):  # pylint: disable=invalid-name
    """
    A translation assistance tool is intended to search for this method to identify translatable strings.

    :param label: a UserString key
    :param default: a default value to return if there is a key error
    :return: a translated string for the label
    """

    table_string = fo.userString(label)

    if "ERROR: " + label in table_string:  # implement test for string lookup not found error
        return default or table_string
    else:
        return table_string


# this name left with C naming style for compatibility with translation assistance procedures
def UserStringList(label):  # pylint: disable=invalid-name
    """
    A translation assistance tool is intended to search for this method to identify translatable strings.

    :param label: a UserString key
    :return: a python list of translated strings from the UserString list identified by the label
    """

    return fo.userStringList(label)


def tech_is_complete(tech):
    """
    Return if tech is complete.
    """
    return fo.getEmpire().techResearched(tech)


def ppstring(foo):
    r"""
    Returns a string version of lists, dicts, sets, such that entries with special characters will be
    printed in legible string format rather than as hex escape characters, i.e.,
    ['Asimov α'] rather than ['Asimov \xce\xb1']."""

    if isinstance(foo, list):
        return "[" + ",".join(map(ppstring, foo)) + "]"
    elif isinstance(foo, dict):
        return "{" + ",".join([ppstring(k) + ":" + ppstring(v) for k, v in foo.items()]) + "}"
    elif isinstance(foo, tuple):
        return "(" + ",".join(map(ppstring, foo)) + ")"
    elif isinstance(foo, (set, frozenset)):
        return "{" + ",".join(map(ppstring, foo)) + "}"
    elif isinstance(foo, str):
        return "'" + foo + "'"
    else:
        return str(foo)


class ConsoleLogHandler(logging.Handler):
    """A log handler to send errors to the console. """
    def emit(self, record):
        """Emit a record.

        The record is formatted with this handler's formatter and sent as a chat
        message to every player for which fo.playerIsHost() is true.
        Nothing is sent if there is no such player.
        """
        try:
            human_ids = [x for x in fo.allPlayerIDs() if fo.playerIsHost(x)]
            if not human_ids:
                return
            msg = self.format(record)

            for human_id in human_ids:
                fo.sendChatMessage(human_id, msg)
        # Hide errors from within the ConsoleLogHandler.
        # KeyboardInterrupt and SystemExit are not Exception subclasses and still propagate.
        except Exception:
            self.handleError(record)


# Create the log handler, format it and attach it to the root logger
console_handler = ConsoleLogHandler()

console_handler.setFormatter(
    logging.Formatter(RED % ('%s : %%(filename)s:%%(funcName)s():%%(lineno)d - %%(message)s'
                             % fo.userString('AI_ERROR_MSG'))))

console_handler.setLevel(logging.ERROR)

logging.getLogger().addHandler(console_handler)


def remove_tags(message):
    """Remove tags described in Font.h from message."""
    expr = r'</?(i|u|(rgba ([0-1]\.)?\d+ ([0-1]\.)?\d+ ([0-1]\.)?\d+ ([0-1]\.)?\d+)|rgba|left|center|right|pre)>'
    return re.sub(expr, '', message)


def chat_human(message):
    """
    Send chat message to human and print it to log.
    Log message is cleared from tags.

    The message goes to the first player for which fo.playerIsHost() is true.
    If there is no such player, nothing is sent and the message is only logged.
    """
    host_ids = [x for x in fo.allPlayerIDs() if fo.playerIsHost(x)]
    message = str(message)
    if not host_ids:
        debug("No host player found, chat message not sent: %s", remove_tags(message))
        return
    fo.sendChatMessage(host_ids[0], message)
    debug("Chat Message to human: %s", remove_tags(message))


def cache_for_session(func):
    """
    Cache a function value for current session.

    Wraps only functions with hashable arguments.
    Use this only if the called function return value is constant throughout the game.
    """
    _cache = {}

    @wraps(func)
    def wrapper(*args, **kwargs):
        # sort keyword arguments by name so that the key does not depend on call order
        key = (func, args, tuple(sorted(kwargs.items())))
        if key in _cache:
            return _cache[key]
        res = func(*args, **kwargs)
        _cache[key] = res
        return res
    wrapper._cache = _cache
    return wrapper


def cache_for_current_turn(func):
    """
    Cache a function value updated each turn.

    The cache is non-persistent through loading a game.
    Wraps only functions with hashable arguments.
    """
    _cache = {}

    @wraps(func)
    def wrapper(*args, **kwargs):
        # sort keyword arguments by name so that the key does not depend on call order
        key = (func, args, tuple(sorted(kwargs.items())))
        this_turn = fo.currentTurn()
        # entries are (turn, value) pairs; a value from another turn is stale
        if key in _cache and _cache[key][0] == this_turn:
            return _cache[key][1]
        res = func(*args, **kwargs)
        _cache[key] = (this_turn, res)
        return res
    wrapper._cache = _cache
    return wrapper


def cache_by_turn_persistent(func):
    """
    Cache a function value by turn, persistent through loading a game.

    It also provides a history that may be analysed.
    The cache is keyed by the original function name. It only wraps functions without arguments.

    As the result is stored in AIstate, its type must be trusted by the savegame_codec module.
    """
    @wraps(func)
    def wrapper():
        aistate = get_aistate()
        if aistate is None:
            # no AIstate to store the value in: compute without caching
            return func()
        cache = aistate.misc.setdefault('caches', {}).setdefault(func.__name__, {})
        this_turn = fo.currentTurn()
        if this_turn not in cache:
            cache[this_turn] = func()
        return cache[this_turn]
    return wrapper


def dict_to_tuple(dic):
    """Return the items of dic as a tuple of (key, value) pairs."""
    return tuple(dic.items())


def tuple_to_dict(tup):
    """Convert a tuple of (key, value) pairs, or a single (key, value) pair, to a dict.

    If neither conversion is possible, an error is logged and an empty dict is returned.
    """
    try:
        return dict(tup)
    except (TypeError, ValueError):
        # not a sequence of pairs, try to interpret tup as a single (key, value) pair
        try:
            key, value = tup
            return {key: value}
        except Exception:
            error("Can't convert tuple_list to dict: %s", tup)
            return {}


def profile(func):
    """Decorator that profiles each call of func with cProfile.

    The statistics, sorted by cumulative time, are written to the debug log
    after a call that returned normally.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        pr = cProfile.Profile()
        pr.enable()
        try:
            retval = func(*args, **kwargs)
        finally:
            # never leave the profiler running, even if func raised
            pr.disable()
        s = StringIO()
        sortby = 'cumulative'
        ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
        ps.print_stats()
        debug(s.getvalue())
        return retval

    return wrapper


@cache_for_current_turn
def get_partial_visibility_turn(obj_id):
    """Return the last turn an object had at least partial visibility.

    :type obj_id: int
    :return: Last turn an object had at least partial visibility, -9999 if never
    :rtype: int
    """
    visibility_turns_map = fo.getUniverse().getVisibilityTurnsMap(obj_id, fo.empireID())
    return visibility_turns_map.get(fo.visibility.partial, -9999)


class ReadOnlyDict(Mapping):
    """A dict that offers only read access.

    Note that if the values of the ReadOnlyDict are mutable,
    then those objects may actually be changed.

    It is strongly advised to store only immutable objects.
    A slight protection is offered by checking for hashability of the values.

    Example usage:
        my_dict = ReadOnlyDict({1: 2, 3: 4})
        print(my_dict[1])
        for k in my_dict:
            print(my_dict.get(k, -1))
        for k in my_dict.keys():
            print(my_dict[k])
        for k, v in my_dict.items():
            print(k, v)
        my_dict[5] = 4  # throws TypeError
        del my_dict[1]  # throws TypeError

    Implementation note:

    The checks that values are hashable is the main difference from the built-in types.MappingProxyType.
    MappingProxyType has slightly different signature and cannot be inherited.
    """

    def __init__(self, *args, **kwargs):
        """Accept the same arguments as dict(); raise TypeError if any value is not hashable."""
        self._data = dict(*args, **kwargs)
        for value in self._data.values():
            try:
                hash(value)
            except TypeError:
                error("Tried to store a non-hashable value in ReadOnlyDict")
                raise

    def __getitem__(self, item):
        return self._data[item]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)

    def __str__(self):
        return str(self._data)


def dump_universe():
    """Dump the universe but not more than once per turn."""
    cur_turn = fo.currentTurn()

    # the turn of the last dump is remembered as an attribute of this function
    if (not hasattr(dump_universe, "last_dump") or
            dump_universe.last_dump < cur_turn):
        dump_universe.last_dump = cur_turn
        fo.getUniverse().dump()  # goes to debug logger


class LogLevelSwitcher:
    """A context manager class which controls the log level within its scope.

    Example usage:
        logging.getLogger().setLevel(logging.INFO)

        debug("Some message")  # not printed because of log level
        with LogLevelSwitcher(logging.DEBUG):
            debug("foo")  # printed because we set to DEBUG level

        debug("baz")  # not printed, we are back to INFO level
    """
    def __init__(self, log_level):
        self.target_log_level = log_level
        self.old_log_level = 0

    def __enter__(self):
        # remember the root logger's level so that __exit__ can restore it
        self.old_log_level = logging.getLogger().level
        logging.getLogger().setLevel(self.target_log_level)

    def __exit__(self, exc_type, exc_val, exc_tb):
        logging.getLogger().setLevel(self.old_log_level)


def with_log_level(log_level):
    """A decorator to set a specific logging level for the function call.

    This decorator is useful to selectively activate debugging for a specific function
    while the rest of the code base only logs at a higher level.

    If functions are called within the decorated function, then those will be
    executed with the same logging level. However, if those functions use this
    decorator as well, then that logging level will be respected for its scope.

    Example usage:
        @with_log_level(logging.DEBUG)
        def foo():
            debug("debug stuff")
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with LogLevelSwitcher(log_level):
                return func(*args, **kwargs)

        return wrapper
    return decorator


def assertion_fails(cond, msg="", logger=logging.error):
    """
    Check if condition fails and if so, log a traceback but raise no Exception.

    This is a useful function for generic sanity checks and may be used to
    replace manual error logging with more context provided by the traceback.

    :param bool cond: condition to be asserted
    :param str msg: additional info to be logged
    :param func logger: may be used to override default log level (error)
    :return: True if assertion failed, otherwise false.
    """
    if cond:
        return False

    if msg:
        header = "Assertion failed: %s." % msg
    else:
        header = "Assertion failed!"
    stack = traceback.extract_stack()[:-1]  # do not log this function
    logger("%s Traceback (most recent call last): %s", header,
           ''.join(traceback.format_list(stack)))
    return True


@cache_for_session
def get_species_tag_grade(species_name, tag_type):
    """Return the grade of the tag of type tag_type of the named species.

    Returns an empty string if species_name is empty, if fo.getSpecies() returns
    None for it (which is also logged as a failed assertion), or if the species
    has no tag of that type.
    """
    if not species_name:
        return ""
    species = fo.getSpecies(species_name)
    if assertion_fails(species is not None):
        return ""

    return get_ai_tag_grade(species.tags, tag_type)