# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import logging
from typing import Callable, List, Generator, Iterable

from volatility.framework import renderers, interfaces
from volatility.framework.configuration import requirements
from volatility.framework.objects import utility
from volatility.framework.renderers import format_hints
from volatility.plugins.windows import pslist

vollog = logging.getLogger(__name__)

# these are from WinNT.h
winnt_protections = {
    "PAGE_NOACCESS": 0x01,
    "PAGE_READONLY": 0x02,
    "PAGE_READWRITE": 0x04,
    "PAGE_WRITECOPY": 0x08,
    "PAGE_EXECUTE": 0x10,
    "PAGE_EXECUTE_READ": 0x20,
    "PAGE_EXECUTE_READWRITE": 0x40,
    "PAGE_EXECUTE_WRITECOPY": 0x80,
    "PAGE_GUARD": 0x100,
    "PAGE_NOCACHE": 0x200,
    "PAGE_WRITECOMBINE": 0x400,
    "PAGE_TARGETS_INVALID": 0x40000000,
}


class VadInfo(interfaces.plugins.PluginInterface):
    """Lists process memory ranges."""

    _version = (1, 0, 0)

    def __init__(self, *args, **kwargs):
        """Forwards all arguments to the base plugin and prepares the
        per-instance cache used for the memory protection constants."""
        super().__init__(*args, **kwargs)
        # Filled in lazily by _get_protect_values() the first time a row needs it
        self._protect_values = None

    @classmethod
    def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
        """Returns the configuration requirements of this plugin: the memory
        layer, the kernel symbol table, the optional 'address' and 'pid'
        filters and the pslist plugin this plugin calls into."""
        # Since we're calling the plugin, make sure we have the plugin's requirements
        return [
            requirements.TranslationLayerRequirement(name = 'primary',
                                                     description = 'Memory layer for the kernel',
                                                     architectures = ["Intel32", "Intel64"]),
            requirements.SymbolTableRequirement(name = "nt_symbols", description = "Windows kernel symbols"),
            # TODO: Convert this to a ListRequirement so that people can filter on sets of ranges
            requirements.IntRequirement(name = 'address',
                                        description = "Process virtual memory address to include "
                                        "(all other address ranges are excluded). This must be "
                                        "a base address, not an address within the desired range.",
                                        optional = True),
            requirements.IntRequirement(name = 'pid',
                                        description = "Process ID to include (all other processes are excluded)",
                                        optional = True),
            requirements.PluginRequirement(name = 'pslist', plugin = pslist.PsList, version = (1, 0, 0)),
        ]

    @classmethod
    def protect_values(cls, context: interfaces.context.ContextInterface, layer_name: str,
                       symbol_table: str) -> Iterable[int]:
        """Look up the array of memory protection constants from the memory
        sample. These don't change often, but if they do in the future, then
        finding them dynamically versus hard-coding here will ensure we parse
        them properly.

        Args:
            context: The context to retrieve required elements (layers, symbol tables) from
            layer_name: The name of the layer on which to operate
            symbol_table: The name of the table containing the kernel symbols

        Returns:
            An array object of 32 `int` entries located at the address of the
            kernel symbol `MmProtectToValue`
        """

        kvo = context.layers[layer_name].config["kernel_virtual_offset"]
        ntkrnlmp = context.module(symbol_table, layer_name = layer_name, offset = kvo)
        addr = ntkrnlmp.get_symbol("MmProtectToValue").address
        values = ntkrnlmp.object(object_type = "array", offset = addr, subtype = ntkrnlmp.get_type("int"), count = 32)
        return values  # type: ignore

    @classmethod
    def list_vads(cls, proc: interfaces.objects.ObjectInterface,
                  filter_func: Callable[[interfaces.objects.ObjectInterface], bool] = lambda _: False) -> \
            Generator[interfaces.objects.ObjectInterface, None, None]:
        """Lists the Virtual Address Descriptors of a specific process.

        Args:
            proc: _EPROCESS object from which to list the VADs
            filter_func: Function to take a virtual address descriptor value and return True if it should be filtered out

        Returns:
            A generator of virtual address descriptors based on the process and filtered based on the filter function
        """
        for vad in proc.get_vad_root().traverse():
            if not filter_func(vad):
                yield vad

    def _get_protect_values(self) -> Iterable[int]:
        """Returns the memory protection constants, looking them up on first
        use only and reusing the cached result afterwards."""
        if self._protect_values is None:
            self._protect_values = self.protect_values(self.context, self.config['primary'],
                                                       self.config['nt_symbols'])
        return self._protect_values

    def _generator(self, procs):
        """Yields one result row per virtual address descriptor of every
        process in `procs`.

        Args:
            procs: Iterable of process objects whose VADs should be listed

        When the optional 'address' setting is present, only descriptors whose
        start equals that address are reported. The row layout matches the
        columns declared in `run`.
        """

        def passthrough(_: 'interfaces.objects.ObjectInterface') -> bool:
            return False

        filter_func = passthrough
        wanted_start = self.config.get('address', None)
        if wanted_start is not None:

            def filter_function(x: 'interfaces.objects.ObjectInterface') -> bool:
                # True means "filter out": drop everything not starting at the requested address
                return x.get_start() != wanted_start

            filter_func = filter_function

        for proc in procs:
            process_name = utility.array_to_string(proc.ImageFileName)

            for vad in self.list_vads(proc, filter_func = filter_func):
                # The protection table is independent of the VAD, so it is
                # resolved once (on the first row) instead of once per row.
                protection = vad.get_protection(self._get_protect_values(), winnt_protections)
                yield (0, (proc.UniqueProcessId, process_name, format_hints.Hex(vad.vol.offset),
                           format_hints.Hex(vad.get_start()), format_hints.Hex(vad.get_end()), vad.get_tag(),
                           protection, vad.get_commit_charge(), vad.get_private_memory(),
                           format_hints.Hex(vad.get_parent()), vad.get_file_name()))

    def run(self):
        """Builds the process filter from the optional 'pid' setting and
        returns a TreeGrid whose rows are produced by `_generator` over the
        processes listed by the pslist plugin."""

        filter_func = pslist.PsList.create_pid_filter([self.config.get('pid', None)])

        return renderers.TreeGrid([("PID", int), ("Process", str), ("Offset", format_hints.Hex),
                                   ("Start VPN", format_hints.Hex), ("End VPN", format_hints.Hex), ("Tag", str),
                                   ("Protection", str), ("CommitCharge", int), ("PrivateMemory", int),
                                   ("Parent", format_hints.Hex), ("File", str)],
                                  self._generator(
                                      pslist.PsList.list_processes(context = self.context,
                                                                   layer_name = self.config['primary'],
                                                                   symbol_table = self.config['nt_symbols'],
                                                                   filter_func = filter_func)))