1# This Python file uses the following encoding: utf-8
2from collections.abc import Mapping
3from io import StringIO
4
5import cProfile
6import logging
7import pstats
8import re
9import traceback
10from functools import wraps
11from logging import debug, error
12from aistate_interface import get_aistate
13
14import freeOrionAIInterface as fo  # pylint: disable=import-error
15
16# color wrappers for chat:
17RED = '<rgba 255 0 0 255>%s</rgba>'
18WHITE = '<rgba 255 255 255 255>%s</rgba>'
19
20
def dict_from_map(thismap):
    """Build a python dict from a C++ map exposed to python.

    Every element obtained by iterating ``thismap`` provides its key
    via ``key()`` and its value via ``data()``.

    :param thismap: iterable of elements offering ``key()`` and ``data()``
    :return: dict mapping each element's key to its data
    :rtype: dict
    """
    converted = {}
    for entry in thismap:
        entry_key = entry.key()
        converted[entry_key] = entry.data()
    return converted
24
25
def dict_from_map_recursive(thismap):
    """Convert a (possibly nested) C++ map to a python dict.

    The data of each element is converted recursively. As soon as any
    exception occurs while doing so (e.g. because the data is not a map),
    the partial result is discarded and this level is converted
    non-recursively via dict_from_map() instead.

    :param thismap: iterable of elements offering ``key()`` and ``data()``
    :rtype: dict
    """
    try:
        converted = {}
        for entry in thismap:
            nested = dict_from_map_recursive(entry.data())
            converted[entry.key()] = nested
    except Exception:
        # this level does not hold nested maps, convert it as flat map
        return dict_from_map(thismap)
    return converted
34
35
def get_ai_tag_grade(tag_list, tag_type):
    """
    Return the grade of the first tag in tag_list which matches tag_type.

    A tag matches if it has the structure X_TYPE where TYPE is the upper-cased tag_type
    and X is everything in front of the first underscore. In that case, 'X' is returned.
    X is most commonly (but not necessarily) one of [NO, BAD, AVERAGE, GOOD, GREAT, ULTIMATE]
    If no tag matches, an empty string is returned
    (which for most types should be considered equivalent to AVERAGE).

    :param tag_list: list of string tags
    :param tag_type: a tag type like 'WEAPONS'
    :return: the grade prefix of the first matching tag or an empty string
    """
    for tag in tag_list:
        # tags without an underscore carry no grade and are skipped
        grade, separator, suffix = tag.partition("_")
        if separator and suffix == tag_type.upper():
            return grade
    return ""
50
51
52# this name left with C naming style for compatibility with translation assistance procedures
def UserString(label, default=None):  # pylint: disable=invalid-name
    """
    A translation assistance tool is intended to search for this method to identify translatable strings.

    The lookup is considered failed if the returned string contains "ERROR: " followed by the label.

    :param label: a UserString key
    :param default: value returned instead of the error string if the lookup failed; ignored if falsy
    :return: a translated string for the label
    """
    translated = fo.userString(label)
    lookup_failed = ("ERROR: " + label) in translated
    if not lookup_failed:
        return translated
    return default or translated
68
69
70# this name left with C naming style for compatibility with translation assistance procedures
def UserStringList(label):  # pylint: disable=invalid-name
    """
    A translation assistance tool is intended to search for this method to identify translatable strings.

    The lookup is delegated to fo.userStringList().

    :param label: a UserString key
    :return: a python list of translated strings from the UserString list identified by the label
    """
    translated_strings = fo.userStringList(label)
    return translated_strings
80
81
def tech_is_complete(tech):
    """
    Return the result of techResearched() of the empire returned by fo.getEmpire() for the given tech.

    :param tech: the tech to be checked, passed on unchanged
    """
    empire = fo.getEmpire()
    return empire.techResearched(tech)
87
88
def ppstring(foo):
    """
    Return a compact string representation of foo.

    Lists, tuples, dicts, sets and frozensets are rendered recursively with their
    usual brackets and without whitespace between the entries. Strings are put
    in single quotes as they are, i.e. without escaping any special characters.
    Everything else is converted by str().
    """
    def joined(entries):
        return ",".join(map(ppstring, entries))

    if isinstance(foo, list):
        return "[%s]" % joined(foo)
    if isinstance(foo, dict):
        pairs = (ppstring(key) + ":" + ppstring(value) for key, value in foo.items())
        return "{%s}" % ",".join(pairs)
    if isinstance(foo, tuple):
        return "(%s)" % joined(foo)
    if isinstance(foo, (set, frozenset)):
        return "{%s}" % joined(foo)
    if isinstance(foo, str):
        return "'" + foo + "'"
    return str(foo)
107
108
class ConsoleLogHandler(logging.Handler):
    """A log handler which forwards log records to the in-game chat.

    The formatted records are sent to every player for which fo.playerIsHost() is true.
    """

    def emit(self, record):
        """Emit a record.

        The record is formatted by the handler (using its formatter if one is specified)
        and then sent as chat message to each host player.
        If there is no host player, the record is dropped silently.

        Any error raised while emitting is passed to handleError() so that
        a failing chat message never breaks the code which logged the record.

        :param logging.LogRecord record: the record to be emitted
        """
        try:
            host_ids = [player_id for player_id in fo.allPlayerIDs() if fo.playerIsHost(player_id)]
            if not host_ids:
                return
            msg = self.format(record)

            for host_id in host_ids:
                fo.sendChatMessage(host_id, msg)
        except Exception:
            # Hide errors from within the ConsoleLogHandler.
            # KeyboardInterrupt and SystemExit are no subclasses of Exception
            # and thus still propagate to the caller.
            self.handleError(record)
128
129
# Set up the chat log handler: it only handles records of level ERROR or above,
# colors the formatted message red and listens on the root logger.
console_handler = ConsoleLogHandler()
console_handler.setLevel(logging.ERROR)
console_handler.setFormatter(logging.Formatter(
    RED % (
        # the translated prefix is inserted now, the escaped %%(...) fields are filled per record
        '%s : %%(filename)s:%%(funcName)s():%%(lineno)d  - %%(message)s'
        % fo.userString('AI_ERROR_MSG')
    )
))
logging.getLogger().addHandler(console_handler)
140
141
def remove_tags(message):
    """Return message with all text tags described in Font.h stripped.

    Both opening and closing tags are removed, all other text is kept as it is.

    :param str message: text which may contain tags
    :rtype: str
    """
    tag_pattern = re.compile(
        r'</?(i|u|(rgba ([0-1]\.)?\d+ ([0-1]\.)?\d+ ([0-1]\.)?\d+ ([0-1]\.)?\d+)|rgba|left|center|right|pre)>'
    )
    return tag_pattern.sub('', message)
146
147
def chat_human(message):
    """
    Send chat message to human and print it to log.

    The message is sent to the first player for which fo.playerIsHost() is true.
    If there is no such player, nothing is sent and only a log entry is written.
    The logged message is cleared from tags.

    :param message: the message to send, converted by str()
    """
    message = str(message)
    host_id = next((player_id for player_id in fo.allPlayerIDs() if fo.playerIsHost(player_id)), None)
    if host_id is None:
        # there may be no host player at all, do not fail with an IndexError in that case
        debug("No host player found, chat message not sent: %s", remove_tags(message))
        return
    fo.sendChatMessage(host_id, message)
    debug("Chat Message to human: %s", remove_tags(message))
157
158
def cache_for_session(func):
    """
    Cache a function value for current session.

    Wraps only functions with hashable arguments.
    Use this only if the called function return value is constant throughout the game.

    The cache is exposed as attribute ``_cache`` of the returned wrapper.
    Calls which pass the same keyword arguments in a different order share one cache entry.
    Positional and keyword passing of the same argument are cached separately.

    :param func: function to be wrapped
    :return: the caching wrapper
    """
    _cache = {}

    @wraps(func)
    def wrapper(*args, **kwargs):
        # Keyword names are unique strings, so sorting never has to compare the values.
        # Sorting makes the key independent of the order the keywords were passed in.
        key = (func, args, tuple(sorted(kwargs.items())))
        if key in _cache:
            return _cache[key]
        res = func(*args, **kwargs)
        _cache[key] = res
        return res
    wrapper._cache = _cache
    return wrapper
178
179
def cache_for_current_turn(func):
    """
    Cache a function value updated each turn.

    The cache is non-persistent through loading a game.
    Wraps only functions with hashable arguments.

    The cache is exposed as attribute ``_cache`` of the returned wrapper. Each entry
    stores the turn it was computed in along with the value; an entry from another turn
    is recomputed and replaced.
    Calls which pass the same keyword arguments in a different order share one cache entry.

    :param func: function to be wrapped
    :return: the caching wrapper
    """
    _cache = {}

    @wraps(func)
    def wrapper(*args, **kwargs):
        # Keyword names are unique strings, so sorting never has to compare the values.
        # Sorting makes the key independent of the order the keywords were passed in.
        key = (func, args, tuple(sorted(kwargs.items())))
        this_turn = fo.currentTurn()
        if key in _cache and _cache[key][0] == this_turn:
            return _cache[key][1]
        res = func(*args, **kwargs)
        _cache[key] = (this_turn, res)
        return res
    wrapper._cache = _cache
    return wrapper
200
201
def cache_by_turn_persistent(func):
    """
    Cache a function value by turn, persistent through loading a game.

    The values of all turns are kept, which provides a history that may be analysed.
    The cache lives in the ``misc`` dict of the AIstate under the key 'caches'
    and is keyed by the original function name. It only wraps functions without arguments.
    If there is no AIstate (yet), the function is called without any caching.

    As the result is stored in AIstate, its type must be trusted by the savegame_codec module.
    """
    @wraps(func)
    def wrapper():
        aistate = get_aistate()
        if aistate is None:
            return func()

        all_caches = aistate.misc.setdefault('caches', {})
        turn_cache = all_caches.setdefault(func.__name__, {})
        this_turn = fo.currentTurn()
        if this_turn in turn_cache:
            return turn_cache[this_turn]
        return turn_cache.setdefault(this_turn, func())
    return wrapper
220
221
def dict_to_tuple(dic):
    """Return the items of dic as a tuple of (key, value) pairs.

    :param dict dic: the dict to convert
    :rtype: tuple
    """
    pairs = dic.items()
    return tuple(pairs)
224
225
def tuple_to_dict(tup):
    """Convert a tuple of (key, value) pairs, or a single (key, value) pair, to a dict.

    If the conversion is not possible, an error is logged and an empty dict is returned.

    :param tup: tuple of pairs or a single pair
    :rtype: dict
    """
    try:
        return dict(tup)
    except (TypeError, ValueError):
        # dict() raises TypeError if an element is no sequence at all and ValueError if
        # an element is a sequence of wrong length, e.g. the key 'abc' of the pair ('abc', 1).
        # In both cases, tup may still be a single (key, value) pair.
        try:
            key, value = tup
            return {key: value}
        except Exception:
            error("Can't convert tuple_list to dict: %s", tup)
            return {}
235
236
def profile(func):
    """A decorator which profiles each call of the function using cProfile.

    The statistics of the call, sorted by cumulative time, are written to the debug log.
    The profiler is disabled and the statistics are logged even if the function raises;
    the exception is propagated to the caller afterwards.

    :param func: function to be profiled
    :return: the profiling wrapper, returning the result of func
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        profiler.enable()
        try:
            return func(*args, **kwargs)
        finally:
            # never leave the profiler running, not even if func failed
            profiler.disable()
            stream = StringIO()
            sortby = 'cumulative'
            stats = pstats.Stats(profiler, stream=stream).sort_stats(sortby)
            stats.print_stats()
            debug(stream.getvalue())

    return wrapper
253
254
@cache_for_current_turn
def get_partial_visibility_turn(obj_id):
    """Return the last turn an object had at least partial visibility.

    The turn is looked up in the visibility turns map the universe provides
    for the object and the empire given by fo.empireID().

    :type obj_id: int
    :return: Last turn an object had at least partial visibility, -9999 if never
    :rtype: int
    """
    never_visible = -9999
    universe = fo.getUniverse()
    turns_by_visibility = universe.getVisibilityTurnsMap(obj_id, fo.empireID())
    return turns_by_visibility.get(fo.visibility.partial, never_visible)
265
266
class ReadOnlyDict(Mapping):
    """A dict that offers only read access.

    Note that if the values of the ReadOnlyDict are mutable,
    then those objects may actually be changed.

    It is strongly advised to store only immutable objects.
    A slight protection is offered by checking for hashability of the values.

    Example usage:
        my_dict = ReadOnlyDict({1: 2, 3: 4})
        print(my_dict[1])
        for k in my_dict:
            print(my_dict.get(k, -1))
        for k in my_dict.keys():
            print(my_dict[k])
        for k, v in my_dict.items():
            print(k, v)
        my_dict[5] = 4  # throws TypeError
        del my_dict[1]  # throws TypeError

    Implementation note:

    The checks that values are hashable is the main difference from the built-in types.MappingProxyType.
    MappingProxyType has slightly different signature and cannot be inherited.
    """

    def __init__(self, *args, **kwargs):
        """Create the dict from the same arguments the built-in dict() accepts.

        :raises TypeError: if any of the values is not hashable
        """
        self._data = dict(*args, **kwargs)
        for value in self._data.values():
            try:
                hash(value)
            except TypeError:
                error("Tried to store a non-hashable value in ReadOnlyDict")
                raise

    def __getitem__(self, item):
        return self._data[item]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)

    def __str__(self):
        return str(self._data)

    def __repr__(self):
        return "%s(%r)" % (type(self).__name__, self._data)
314
315
def dump_universe():
    """Dump the universe but not more than once per turn.

    The turn of the latest dump is remembered in the function attribute ``last_dump``.
    """
    cur_turn = fo.currentTurn()
    last_dump = getattr(dump_universe, "last_dump", None)

    if last_dump is not None and last_dump >= cur_turn:
        # already dumped in this turn
        return
    dump_universe.last_dump = cur_turn
    fo.getUniverse().dump()  # goes to debug logger
324
325
class LogLevelSwitcher:
    """A context manager which temporarily changes the log level of the root logger.

    On entering the scope, the current level of the root logger is remembered and
    replaced by the requested one. On leaving the scope, the remembered level is restored.

    Example usage:
    logging.getLogger().setLevel(logging.INFO)

    debug("Some message")  # not printed because of log level
    with LogLevelSwitcher(logging.DEBUG):
        debug("foo")  # printed because we set to DEBUG level

    debug("baz")  # not printed, we are back to INFO level
    """

    def __init__(self, log_level):
        self.old_log_level = 0
        self.target_log_level = log_level

    def __enter__(self):
        root_logger = logging.getLogger()
        self.old_log_level = root_logger.level
        root_logger.setLevel(self.target_log_level)

    def __exit__(self, exc_type, exc_val, exc_tb):
        root_logger = logging.getLogger()
        root_logger.setLevel(self.old_log_level)
348
349
def with_log_level(log_level):
    """A decorator to set a specific logging level for the function call.

    This decorator is useful to selectively activate debugging for a specific function
    while the rest of the code base only logs at a higher level.

    If functions are called within the decorated function, then those will be
    executed with the same logging level. However, if those functions use this
    decorator as well, then that logging level will be respected for its scope.

    Example usage:
    @with_log_level(logging.DEBUG)
    def foo():
        debug("debug stuff")

    :param log_level: the log level to be active while the decorated function runs
    :return: the decorator to be applied to the function
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # the previous log level is restored when the call returns or raises
            with LogLevelSwitcher(log_level):
                return func(*args, **kwargs)

        return wrapper
    return decorator
373
374
def assertion_fails(cond, msg="", logger=logging.error):
    """
    Log a traceback if the condition does not hold, without raising an Exception.

    This may be used for generic sanity checks and as a replacement for
    manual error logging, as the traceback provides more context.

    :param bool cond: condition to be asserted
    :param str msg: additional info to be logged
    :param func logger: may be used to override default log level (error)
    :return: True if assertion failed, otherwise false.
    """
    if cond:
        return False

    header = ("Assertion failed: %s." % msg) if msg else "Assertion failed!"
    # drop the last frame so that this function itself does not show up in the log
    caller_stack = traceback.extract_stack()[:-1]
    formatted_stack = ''.join(traceback.format_list(caller_stack))
    logger("%s Traceback (most recent call last): %s", header, formatted_stack)
    return True
398
399
@cache_for_session
def get_species_tag_grade(species_name, tag_type):
    """Return the grade of the given species for the given tag type.

    The grade is determined by get_ai_tag_grade() from the tags of the species.
    An empty string is returned if no species name is given or if the species
    can not be found; the latter is logged as failed assertion.

    :param species_name: name of the species
    :param tag_type: a tag type like 'WEAPONS'
    :return: the grade or an empty string
    """
    if species_name:
        species = fo.getSpecies(species_name)
        if not assertion_fails(species is not None):
            return get_ai_tag_grade(species.tags, tag_type)
    return ""
409