1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3"""" 4Requires: python 3.6+; Windows 7+ 5Code format: PEP-8; line breaks at 120; Black default formatting. 6 7Limits a target process to a given amount of memory. 8If the process use more memory than the given limit, the process allocations 9will start to fail. 10 11``` 12$ python .\\windows_limit_memory.py --help 13usage: windows_limit_memory.py [-h] 14 [-l {NOTSET,DEBUG,INFO,WARNING,ERROR,CRITICAL}] 15 [-m MEMORY] 16 {process,pid} ... 17 18C:DDA Memory Limit test script. 19 20positional arguments: 21 {process,pid} help for sub-commands 22 process Start a new process. 23 pid Enforce limit on a given process pid. 24 25optional arguments: 26 -h, --help show this help message and exit 27 -l,--log-level {NOTSET,DEBUG,INFO,WARNING,ERROR,CRITICAL} 28 Set the logging level. 29 -m MEMORY, --memory MEMORY 30 Maximum process memory size in MiB. 31``` 32 33Examples: 34 ; limiting memory of process with PID 22764 to 50 MiB. 35 $ python windows_limit_memory.py -m 50 pid 22764 36 37 ; starting process "cataclysm-tiles.exe" and limiting its memory to 1GiB. 38 $ python windows_limit_memory.py -m 1024 process \ 39 z:\\CDDA\\Cataclysm-tiles.exe 40""" 41 42import argparse 43import ctypes 44import logging 45import pathlib 46import platform 47import sys 48from typing import Any, Dict, Optional 49 50logger = logging.getLogger(__name__) 51 52# 53# Windows basic types 54# 55LPCWSTR = ctypes.c_wchar_p 56BOOL = ctypes.c_long 57HANDLE = ctypes.c_void_p 58WORD = ctypes.c_uint16 59DWORD = ctypes.c_uint32 60if ctypes.sizeof(ctypes.c_void_p) == 8: 61 ULONG_PTR = ctypes.c_ulonglong 62else: 63 ULONG_PTR = ctypes.c_ulong 64SIZE_T = ULONG_PTR 65PVOID = ctypes.c_void_p 66LPVOID = PVOID 67LPDWORD = ctypes.POINTER(DWORD) 68PULONG_PTR = ctypes.POINTER(ULONG_PTR) 69 70# 71# Windows defines 72# 73CREATE_SUSPENDED = 0x4 74CREATE_BREAKAWAY_FROM_JOB = 0x01000000 75 76# -1 on 32 or 64-bit. 77INVALID_HANDLE_VALUE = 2 ** (ctypes.sizeof(ctypes.c_void_p) * 8) - 1 78JobObjectAssociateCompletionPortInformation = 7 79JobObjectExtendedLimitInformation = 9 80JOB_OBJECT_LIMIT_PROCESS_MEMORY = 0x100 81PROCESS_SET_QUOTA = 0x100 82PROCESS_TERMINATE = 0x1 83INFINITE = 0xFFFFFFFF 84 85# Completion Port Messages for job objects 86JOB_OBJECT_MSG_END_OF_JOB_TIME = 1 87JOB_OBJECT_MSG_END_OF_PROCESS_TIME = 2 88JOB_OBJECT_MSG_ACTIVE_PROCESS_LIMIT = 3 89JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO = 4 90JOB_OBJECT_MSG_NEW_PROCESS = 6 91JOB_OBJECT_MSG_EXIT_PROCESS = 7 92JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS = 8 93JOB_OBJECT_MSG_PROCESS_MEMORY_LIMIT = 9 94JOB_OBJECT_MSG_JOB_MEMORY_LIMIT = 10 95JOB_OBJECT_MSG_NOTIFICATION_LIMIT = 11 96JOB_OBJECT_MSG_JOB_CYCLE_TIME_LIMIT = 12 97JOB_OBJECT_MSG_SILO_TERMINATED = 13 98 99# 100# Windows Structures 101# 102 103 104class SECURITY_ATTRIBUTES(ctypes.Structure): 105 _fields_ = [("nLength", DWORD), ("lpSecurityDescriptor", LPVOID), 106 ("bInheritHandle", BOOL)] 107 108 109class STARTUPINFO(ctypes.Structure): 110 _fields_ = [ 111 ("cb", DWORD), 112 ("lpReserved", LPCWSTR), 113 ("lpDesktop", LPCWSTR), 114 ("lpTitle", LPCWSTR), 115 ("dwX", DWORD), 116 ("dwY", DWORD), 117 ("dwXSize", DWORD), 118 ("dwYSize", DWORD), 119 ("dwXCountChars", DWORD), 120 ("dwYCountChars", DWORD), 121 ("dwFillAttribute", DWORD), 122 ("dwFlags", DWORD), 123 ("wShowWindow", WORD), 124 ("cbReserved2", WORD), 125 ("lpReserved2", LPVOID), 126 ("hStdInput", HANDLE), 127 ("hStdOutput", HANDLE), 128 ("hStdError", HANDLE), 129 ] 130 131 132class PROCESS_INFORMATION(ctypes.Structure): 133 _fields_ = [("hProcess", HANDLE), ("hThread", HANDLE), 134 ("dwProcessId", DWORD), ("dwThreadId", DWORD)] 135 136 137class IO_COUNTERS(ctypes.Structure): 138 _fields_ = [ 139 ('ReadOperationCount', ctypes.c_ulonglong), 140 ('WriteOperationCount', ctypes.c_ulonglong), 141 ('OtherOperationCount', ctypes.c_ulonglong), 142 ('ReadTransferCount', ctypes.c_ulonglong), 143 ('WriteTransferCount', ctypes.c_ulonglong), 144 ('OtherTransferCount', ctypes.c_ulonglong), 145 ] 146 147 148class JOBOBJECT_BASIC_LIMIT_INFORMATION(ctypes.Structure): 149 _fields_ = [ 150 ('PerProcessUserTimeLimit', ctypes.c_int64), 151 ('PerJobUserTimeLimit', ctypes.c_int64), 152 ('LimitFlags', ctypes.c_uint32), 153 ('MinimumWorkingSetSize', ctypes.c_ulonglong), 154 ('MaximumWorkingSetSize', ctypes.c_ulonglong), 155 ('ActiveProcessLimit', ctypes.c_uint32), 156 ('Affinity', ctypes.c_void_p), 157 ('PriorityClass', ctypes.c_uint32), 158 ('SchedulingClass', ctypes.c_uint32), 159 ] 160 161 162class JOBOBJECT_EXTENDED_LIMIT_INFORMATION(ctypes.Structure): 163 _fields_ = [ 164 ('BasicLimitInformation', JOBOBJECT_BASIC_LIMIT_INFORMATION), 165 ('IoInfo', IO_COUNTERS), 166 ('ProcessMemoryLimit', ctypes.c_ulonglong), 167 ('JobMemoryLimit', ctypes.c_ulonglong), 168 ('PeakProcessMemoryUsed', ctypes.c_ulonglong), 169 ('PeakJobMemoryUsed', ctypes.c_ulonglong), 170 ] 171 172 173class JOBOBJECT_ASSOCIATE_COMPLETION_PORT(ctypes.Structure): 174 _fields_ = [('CompletionKey', LPVOID), ('CompletionPort', HANDLE)] 175 176 177class OVERLAPPED(ctypes.Structure): 178 _fields_ = [("Internal", ULONG_PTR), ("InternalHigh", ULONG_PTR), 179 ("Pointer", LPVOID), ("hEvent", HANDLE)] 180 181 182LPSECURITY_ATTRIBUTES = ctypes.POINTER(SECURITY_ATTRIBUTES) 183LPSTARTUPINFO = ctypes.POINTER(STARTUPINFO) 184LPPROCESS_INFORMATION = ctypes.POINTER(PROCESS_INFORMATION) 185LPOVERLAPPED = ctypes.POINTER(OVERLAPPED) 186 187 188class Kernel32Wrapper: 189 """A class used to encapsulate kernel32 library functions. 190 """ 191 192 def __init__(self) -> None: 193 """Initialization. 194 """ 195 # get kernel32 instance 196 self._kernel32: ctypes.WinDLL = ctypes.WinDLL( 197 "kernel32", use_last_error=True) 198 199 # 200 # fetch functions 201 # 202 203 self._create_job_object = self._kernel32.CreateJobObjectW 204 self._create_job_object.argtypes = (LPSECURITY_ATTRIBUTES, LPCWSTR) 205 self._create_job_object.restype = HANDLE 206 207 self._query_information_job_object = \ 208 self._kernel32.QueryInformationJobObject 209 self._query_information_job_object.argtypes = ( 210 HANDLE, ctypes.c_uint32, LPVOID, DWORD, ctypes.POINTER(DWORD)) 211 self._query_information_job_object.restype = BOOL 212 213 self._set_information_job_object = \ 214 self._kernel32.SetInformationJobObject 215 self._set_information_job_object.argtypes = ( 216 HANDLE, ctypes.c_uint32, LPVOID, DWORD) 217 self._set_information_job_object.restype = BOOL 218 219 self._assign_process_to_job_object = \ 220 self._kernel32.AssignProcessToJobObject 221 self._assign_process_to_job_object.argtypes = (HANDLE, HANDLE) 222 self._assign_process_to_job_object.restype = BOOL 223 224 self._create_process = self._kernel32.CreateProcessW 225 self._create_process.argtypes = ( 226 LPCWSTR, 227 LPCWSTR, 228 LPSECURITY_ATTRIBUTES, 229 LPSECURITY_ATTRIBUTES, 230 BOOL, 231 DWORD, 232 LPVOID, 233 LPCWSTR, 234 LPSTARTUPINFO, 235 LPPROCESS_INFORMATION, 236 ) 237 self._create_process.restype = BOOL 238 239 self._open_process = self._kernel32.OpenProcess 240 self._open_process.argtypes = (DWORD, BOOL, HANDLE) 241 self._open_process.restype = HANDLE 242 243 self._create_io_completion_port = self._kernel32.CreateIoCompletionPort 244 self._create_io_completion_port.argtypes = ( 245 HANDLE, HANDLE, ULONG_PTR, DWORD) 246 self._create_io_completion_port.restype = HANDLE 247 248 self._get_queued_completion_status = \ 249 self._kernel32.GetQueuedCompletionStatus 250 self._get_queued_completion_status.argtypes = ( 251 HANDLE, LPDWORD, PULONG_PTR, ctypes.POINTER(LPOVERLAPPED), 252 DWORD) 253 self._get_queued_completion_status.restype = BOOL 254 255 self._resume_thread = self._kernel32.ResumeThread 256 self._resume_thread.argtypes = (HANDLE,) 257 self._resume_thread.restype = DWORD 258 259 self._terminate_process = self._kernel32.TerminateProcess 260 self._terminate_process.argtypes = (HANDLE, ctypes.c_uint32) 261 self._terminate_process.restype = BOOL 262 263 self._close_handle = self._kernel32.CloseHandle 264 self._close_handle.argtypes = (HANDLE,) 265 self._close_handle.restype = BOOL 266 267 self._function_map = { 268 "AssignProcessToJobObject": self._assign_process_to_job_object, 269 "CloseHandle": self._close_handle, 270 "CreateIoCompletionPort": self._create_io_completion_port, 271 "CreateJobObject": self._create_job_object, 272 "CreateProcess": self._create_process, 273 "GetQueuedCompletionStatus": self._get_queued_completion_status, 274 "OpenProcess": self._open_process, 275 "QueryInformationJobObject": self._query_information_job_object, 276 "ResumeThread": self._resume_thread, 277 "TerminateProcess": self._terminate_process, 278 "SetInformationJobObject": self._set_information_job_object, 279 } 280 281 def __getattr__(self, item: str): 282 """Get an attribute from the class. 283 284 Args: 285 item: The attribute to get. 286 287 Raises: 288 AttributeError: the given name is not a known function name. 289 290 Returns: 291 If the attribute is a function name, returns the function pointer, 292 otherwise raise an exception. 293 """ 294 func = self._function_map.get(item) 295 if func is None: 296 raise AttributeError(item) 297 return func 298 299 @staticmethod 300 def create_buffer(obj: Any, max_buffer_len: Optional[int] = None) -> str: 301 """Creates a ctypes unicode buffer given an object convertible to string. 302 303 Args: 304 obj: The object from which to create a buffer. Must be convertible 305 to `str`. 306 max_buffer_len: The maximum buffer length for the buffer. If `None` 307 is supplied, default to the length of 308 the given object string. 309 310 Returns: 311 A unicode buffer of object converted to string. 312 """ 313 str_obj = obj if isinstance(obj, str) else str(obj) 314 315 if max_buffer_len is None: 316 max_len_value = len(str_obj) + 1 317 else: 318 max_len_value = max_buffer_len 319 max_len = max(max_len_value, len(str_obj)) 320 321 return ctypes.create_unicode_buffer(str_obj, max_len) 322 323 324class ProcessLimiter: 325 """A class used to limit a process memory using Windows Jobs. 326 """ 327 328 def __init__(self) -> None: 329 """Initialization. 330 """ 331 logger.debug("instantiating kernel32 wrapper.") 332 self._kernel32: Kernel32Wrapper = Kernel32Wrapper() 333 self._handle_process: Optional[HANDLE] = None 334 self._handle_thread: Optional[HANDLE] = None 335 self._handle_job: Optional[HANDLE] = None 336 self._handle_io_port: Optional[HANDLE] = None 337 try: 338 self._handle_io_port: HANDLE = self._create_io_completion_port() 339 except ctypes.WinError: 340 pass 341 342 def __enter__(self) -> "ProcessLimiter": 343 """Context manager entry. 344 """ 345 return self 346 347 def __exit__(self, exc_type, exc_val, exc_tb) -> None: 348 """Context manager exit. 349 350 Args: 351 exc_type: exception type. 352 exc_val: exception value. 353 exc_tb: Exception trackeback. 354 """ 355 if self._handle_process: 356 self._kernel32.CloseHandle(self._handle_process) 357 self._handle_process = None 358 if self._handle_thread: 359 self._kernel32.CloseHandle(self._handle_thread) 360 self._handle_thread = None 361 if self._handle_io_port: 362 self._kernel32.CloseHandle(self._handle_io_port) 363 self._handle_io_port = None 364 if self._handle_job: 365 self._kernel32.CloseHandle(self._handle_job) 366 self._handle_job = None 367 368 @property 369 def has_io_port(self) -> bool: 370 """Get whether the current class instance holds a Windows I/O port. 371 372 Returns: 373 `True` if the class hold an I/O port, `False` otherwise. 374 """ 375 return self._handle_io_port is not None 376 377 @property 378 def is_started_process(self) -> bool: 379 """Get whether the process object was started by the class or not. 380 381 Returns: 382 True if the process was started by this class instance, False 383 otherwise. 384 """ 385 return (self._handle_process is not None and 386 self._handle_thread is not None) 387 388 def _create_io_completion_port(self) -> HANDLE: 389 """[Internal] Create an I/O completion port. 390 391 Notes: 392 The I/O port is used later in the `wait_for_job()` function. 393 394 Raises: 395 ctypes.WinError: An error occurred while creating the I/O port. 396 397 Returns: 398 A `HANDLE` to the I/O Port. 399 """ 400 logger.info("Creating IO Port") 401 handle_io_port = self._kernel32.CreateIoCompletionPort( 402 INVALID_HANDLE_VALUE, None, 0, 1) 403 if not handle_io_port: 404 raise ctypes.WinError(ctypes.get_last_error()) 405 logger.debug(f"IO Port: {handle_io_port:#x}") 406 return handle_io_port 407 408 def _query_information_job_object(self, structure: ctypes.Structure, 409 query_type: int) -> ctypes.Structure: 410 """[Internal] Retrieves limit and job state information from the job object. 411 412 Args: 413 structure: The limit or job state information. 414 query_type: The information class for the limits to be queried. 415 416 Raises: 417 ctypes.WinError: An error occurred while getting the state or limit 418 for the job object. 419 420 Returns: 421 Returns the given structure filled with the job limit or state 422 information. 423 """ 424 # query its default properties 425 return_length = DWORD(0) 426 logger.debug("Querying job object.") 427 ret_val = self._kernel32.QueryInformationJobObject( 428 self._handle_job, query_type, ctypes.byref(structure), 429 ctypes.sizeof(structure), ctypes.byref(return_length) 430 ) 431 if ret_val == 0 or return_length.value != ctypes.sizeof(structure): 432 raise ctypes.WinError(ctypes.get_last_error()) 433 return structure 434 435 def _resume_main_thread(self) -> None: 436 """[Internal] Resume the main thread of a created process. 437 438 Raises: 439 ValueError: the function was called but there is no main thread 440 handle. 441 ctypes.WinError: There was an error while trying to resume the main 442 thread of the process. Note that this is a critical condition 443 resulting in a zombie process. The code will try to kill the 444 zombie process if it happens, without any guaranty of success. 445 """ 446 if not self._handle_thread: 447 raise ValueError("Thread handle is NULL.") 448 # resume the main thread and let the process run. 449 logger.debug("Resuming main thread.") 450 ret_val = self._kernel32.ResumeThread(self._handle_thread) 451 if ret_val == -1: 452 # oops, we now have a zombie process... we'll try to kill it 453 # nonetheless. 454 logger.error( 455 "Error: ResumeThread failed but the process is started!") 456 # try to kill it 457 if 0 != self._kernel32.TerminateProcess(self._handle_process, 458 0xDEADBEEF): 459 logger.info("Successfully killed the zombie process.") 460 else: 461 logger.warning("The zombie process is sitll alive. Try to " 462 "kill it manually.") 463 # we tried to kill the process, now raise. 464 raise ctypes.WinError(ctypes.get_last_error()) 465 466 def _set_information_job_object(self, structure: ctypes.Structure, 467 set_type: int) -> None: 468 """[Internal] Set limit and job state information for the job object. 469 470 Args: 471 structure: The limit or job state information. 472 set_type: The information class for the limits to be queried. 473 474 Raises: 475 ctypes.WinError: An error occurred while setting the state or limit 476 for the job object. 477 """ 478 ret_val = self._kernel32.SetInformationJobObject( 479 self._handle_job, set_type, ctypes.byref(structure), 480 ctypes.sizeof(structure) 481 ) 482 if ret_val == 0: 483 raise ctypes.WinError(ctypes.get_last_error()) 484 485 def assign_process_to_job(self) -> None: 486 """Assign a process to a job. 487 488 Raises: 489 ValueError: There's no process or job to associate with. 490 ctypes.WinError: There was an error while associating the process 491 with the job. 492 """ 493 if not self._handle_process: 494 raise ValueError("There's no process to associate with the job.") 495 if not self._handle_job: 496 raise ValueError("There's no job.") 497 logger.info("Assigning process to job.") 498 ret_val = self._kernel32.AssignProcessToJobObject( 499 self._handle_job, self._handle_process) 500 if ret_val == 0: 501 raise ctypes.WinError(ctypes.get_last_error()) 502 503 def create_job(self, job_name: Optional[str] = None) -> None: 504 """Create a job. 505 506 Args: 507 job_name: The optional job name; can be `None`. 508 509 Notes: 510 This function will try to associate the job with an I/O completion 511 port, which is used later in the `wait_for_job` function. 512 513 Raises: 514 ctypes.WinError: There was an error while creating the job, or, if 515 a completion port exists, an error 516 occurred while associating the completion port to the job. 517 """ 518 logger.info("Creating job object.") 519 handle_job = self._kernel32.CreateJobObject(None, job_name) 520 if handle_job == 0: 521 raise ctypes.WinError(ctypes.get_last_error()) 522 logger.debug(f"Job object: {handle_job:#x}") 523 self._handle_job = handle_job 524 525 # associate io port completion, if any 526 if not self._handle_io_port: 527 return 528 job_completion_port = JOBOBJECT_ASSOCIATE_COMPLETION_PORT() 529 job_completion_port.CompletionKey = self._handle_job 530 job_completion_port.CompletionPort = self._handle_io_port 531 self._set_information_job_object( 532 job_completion_port, 533 JobObjectAssociateCompletionPortInformation) 534 535 def create_process(self, process_path: pathlib.Path, 536 command_line: Optional[str] = None) -> None: 537 """Create a new process object to be associated with the main job. 538 539 Args: 540 process_path: The path to the main binary executable. 541 command_line: The command line for the process. 542 543 Raises: 544 ctypes.WinError: An error occurred while trying to create the 545 process. 546 547 Notes: 548 The command line is prepended with the full binary path. 549 """ 550 # create the process with its main thread in a suspended state 551 logger.debug("Creating suspended process.") 552 si = STARTUPINFO() 553 si.cb = ctypes.sizeof(STARTUPINFO) 554 pi = PROCESS_INFORMATION() 555 556 full_proc_path = process_path.resolve() 557 cmd_line_str = str(full_proc_path) + ( 558 command_line if command_line is not None else "") 559 560 cmd_line = self._kernel32.create_buffer(cmd_line_str) 561 current_dir = self._kernel32.create_buffer(str(full_proc_path.parent)) 562 563 ret_val = self._kernel32.CreateProcess( 564 None, # lpApplicationName 565 cmd_line, # lpCommandLine 566 None, # lpProcessAttributes 567 None, # lpThreadAttributes 568 False, # bInheritHandles 569 CREATE_SUSPENDED, # dwCreationFlags 570 None, # lpEnvironment 571 current_dir, # lpCurrentDirectory 572 ctypes.byref(si), # lpStartupInfo 573 ctypes.byref(pi), # lpProcessInformation 574 ) 575 if ret_val == 0: 576 raise ctypes.WinError(ctypes.get_last_error()) 577 logger.debug(f"Process: {pi.hProcess:#x}; Thread: {pi.hThread:#x}") 578 579 self._handle_process = pi.hProcess 580 self._handle_thread = pi.hThread 581 582 def get_process(self, pid: int) -> None: 583 """Get a process object from its process identifier (PID). This process 584 will be associated with the main job. 585 586 Args: 587 pid: The pid of the process to associate with the job. 588 589 Raises: 590 ctypes.WinError: An error occurred while trying to ge the process 591 object form its PID. Ensure you have 592 sufficient rights upon the process. 593 """ 594 handle_process = self._kernel32.OpenProcess( 595 PROCESS_SET_QUOTA | PROCESS_TERMINATE, False, pid) 596 if not handle_process: 597 raise ctypes.WinError(ctypes.get_last_error()) 598 logger.debug(f"Process: {handle_process:#x}") 599 self._handle_process = handle_process 600 601 def limit_process_memory(self, memory_limit: int) -> None: 602 """Effectively limit the process memory that the target process can allocates. 603 604 Args: 605 memory_limit: The memory limit of the process, in MiB (MebiBytes). 606 607 Raises: 608 ctypes.WinError: There was an error while trying to limit the 609 process memory. 610 ValueError: The given memory limit is not valid or there's no job. 611 """ 612 logger.info(f"Limiting process memory to {memory_limit} MiB " 613 f"({memory_limit * 1024 * 1024} bytes)") 614 if memory_limit <= 0: 615 raise ValueError(f"Memory limit can be 0 or negative; got: " 616 f"{memory_limit}") 617 if not self._handle_job: 618 raise ValueError("Job handle is NULL.") 619 # query current job object 620 job_info = JOBOBJECT_EXTENDED_LIMIT_INFORMATION() 621 job_info = self._query_information_job_object( 622 job_info, JobObjectExtendedLimitInformation) 623 624 # limit the process memory; note: not the job memory! 625 logger.debug("Setting job information.") 626 job_info.BasicLimitInformation.LimitFlags |= \ 627 JOB_OBJECT_LIMIT_PROCESS_MEMORY 628 # MiB to bytes 629 job_info.ProcessMemoryLimit = memory_limit * 1024 * 1024 630 self._set_information_job_object( 631 job_info, JobObjectExtendedLimitInformation) 632 633 def wait_for_job(self) -> bool: 634 """Wait for the job completion. This function returns when the last 635 process of the job exits. 636 637 Notes: 638 This function returns immediately if there's no IO ports. 639 640 Returns: 641 True if the function successfully waited for the job to finish, 642 False otherwise. 643 """ 644 logger.info("Waiting for the job.") 645 if self.is_started_process: 646 # resume the main thread, as we started the process ourselves. 647 self._resume_main_thread() 648 if not self.has_io_port: 649 return False 650 msg_map: Dict[int, str] = { 651 JOB_OBJECT_MSG_END_OF_JOB_TIME: "End of job time", 652 JOB_OBJECT_MSG_END_OF_PROCESS_TIME: "End of process time", 653 JOB_OBJECT_MSG_ACTIVE_PROCESS_LIMIT: 654 "Active process limit reached", 655 JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO: 656 "No more active process in job", 657 JOB_OBJECT_MSG_NEW_PROCESS: "New process in job", 658 JOB_OBJECT_MSG_EXIT_PROCESS: "A process in the job exited", 659 JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS: 660 "A process in the job exited abnormally", 661 JOB_OBJECT_MSG_PROCESS_MEMORY_LIMIT: 662 "A process in the job reached its memory limit", 663 JOB_OBJECT_MSG_JOB_MEMORY_LIMIT: 664 "The job has reached its memory limit", 665 JOB_OBJECT_MSG_NOTIFICATION_LIMIT: 666 "The job reached a notification limit", 667 JOB_OBJECT_MSG_JOB_CYCLE_TIME_LIMIT: 668 "The CPU cycle limit for the job has been reached", 669 JOB_OBJECT_MSG_SILO_TERMINATED: "A silo a terminated.", 670 } 671 672 return_val = False 673 674 completion_code = DWORD(0) 675 completion_key = ULONG_PTR(0) 676 overlapped = OVERLAPPED() 677 lp_overlapped = ctypes.POINTER(OVERLAPPED)(overlapped) 678 while True: 679 ret_val = self._kernel32.GetQueuedCompletionStatus( 680 self._handle_io_port, 681 ctypes.byref(completion_code), 682 ctypes.byref(completion_key), 683 ctypes.byref(lp_overlapped), 684 INFINITE, 685 ) 686 if ret_val == 0: 687 # error from GetQueuedCompletionStatus 688 logger.error(f"Error GetQueuedCompletionStatus: " 689 f"{ctypes.get_last_error():#x}") 690 break 691 if completion_key.value != self._handle_job: 692 # we received an event, but it's not from our job; we can 693 # ignore it! 694 continue 695 # message 696 msg = msg_map.get(completion_code.value) 697 if msg: 698 logger.info(f"IO Port Message: {msg}") 699 if completion_code.value == JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO: 700 # no more processes in the job, we can exit. 701 return_val = True 702 break 703 logger.info("Job wait finished") 704 return return_val 705 706 707def main(args: argparse.Namespace) -> int: 708 # runs only on Windows 709 if platform.system().lower() != "windows": 710 logger.error("This script only works on Microsoft Windows systems.") 711 return -1 712 713 # check the memory limit 714 if args.memory <= 0: 715 logger.debug(f"Memory limit must be above 0, got: {args.memory}") 716 return -1 717 718 # check the command type 719 if args.command_name == "process": 720 # check the binary file exists 721 if not args.process.is_file(): 722 logger.debug(f"The given file path '{args.process}' is not a file " 723 f"or doesn't exist.") 724 return -1 725 logger.info(f"Process Path: {args.process}; Memory limit (MiB): " 726 f"{args.memory}") 727 elif args.command_name == "pid": 728 # pid to int 729 if args.pid.startswith("0x") or args.pid.startswith("0X"): 730 base = 16 731 else: 732 base = 10 733 args.pid = int(args.pid, base) 734 logger.info(f"Process Pid: {args.pid:#x}; Memory limit (MiB): " 735 f"{args.memory}") 736 else: 737 logger.error(f"Unknown command: '{args.command}'") 738 return -1 739 740 with ProcessLimiter() as proc_limiter: 741 proc_limiter.create_job("MEMORY_LIMITER_JOB") 742 743 if args.command_name == "process": 744 proc_limiter.create_process(args.process) 745 else: 746 proc_limiter.get_process(args.pid) 747 proc_limiter.assign_process_to_job() 748 proc_limiter.limit_process_memory(args.memory) 749 if not proc_limiter.wait_for_job(): 750 input("Press <enter> to exit this script") 751 752 print("Script done.") 753 return 0 754 755 756if __name__ == "__main__": 757 arg_parser = argparse.ArgumentParser( 758 description="C:DDA Memory Limit test script.") 759 760 # 761 # options for all sub-commands 762 # 763 arg_parser.add_argument( 764 "-l", 765 "--log-level", 766 choices=['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], 767 default='INFO', 768 help="Set the logging level.", 769 ) 770 arg_parser.add_argument( 771 "-m", "--memory", action="store", type=int, default=1024, 772 help="Maximum process memory size in MiB." 773 ) 774 775 # 776 # sub-commands 777 # 778 subparsers = arg_parser.add_subparsers(help='help for sub-commands', 779 dest="command_name") 780 781 parser_new_process = subparsers.add_parser( 782 'process', help='Start a new process.') 783 parser_new_process.add_argument( 784 "process", action="store", type=pathlib.Path, 785 help="Path to process binary file.") 786 787 parser_pid = subparsers.add_parser( 788 'pid', help='Enforce limit on a given process pid.') 789 parser_pid.add_argument( 790 "pid", action="store", type=str, 791 help="Process pid on which to enforce memory limit.") 792 793 parsed_args = arg_parser.parse_args() 794 795 if not parsed_args.command_name: 796 arg_parser.error("'process' or 'pid' command is required.") 797 logging_level = logging.getLevelName(parsed_args.log_level) 798 logging.basicConfig(level=logging_level) 799 logger.setLevel(logging_level) 800 801 sys.exit(main(parsed_args)) 802