diff --git a/librarytrader/library.py b/librarytrader/library.py
index 05f2d09ee844d6011d8db0cffcb1d5600b76ec47..da32c610fd318088a383b036b69568a7ba49b42b 100644
--- a/librarytrader/library.py
+++ b/librarytrader/library.py
@@ -1130,6 +1130,191 @@ class Library:
                 logging.debug('fixing nop gap at %x: %d bytes',
                               cur_end, gap)
                 self.ranges[start] += gap
+    def _reconstruct_functions_linear(self):
+        """Try to reconstruct function boundaries from the disassembly.
+        First, collect all known call target addresses, these form the initial
+        list of code ranges that could be functions. After that, parse all
+        these ranges and follow jumps through the basic blocks."""
+        cs_obj = self.get_capstone_object()
+        section = self._elffile.get_section_by_name('.text')
+        if not section:
+            return
+        boundaries = set()
+        start = section['sh_offset']
+        size = section['sh_size']
+        end = start + size
+        boundaries.add(start)
+        boundaries.add(end)
+        self.fd.seek(start)
+        code_bytes = self.fd.read(size)
+        code = list(cs_obj.disasm(code_bytes, start))
+        # Collect all direct call targets as the initial list of boundaries.
+        # We know that there will be a function if the address is called from
+        # somewhere inside the binary.
+        for instr in code:
+            if instr.group(capstone.x86_const.X86_GRP_CALL):
+                target, op = None, instr.operands[-1]  # None: unresolvable operand
+                if op.type == capstone.x86.X86_OP_IMM:
+                    target = op.value.imm
+                elif op.type == capstone.x86.X86_OP_MEM and op.value.mem.base == capstone.x86.X86_REG_RIP:
+                    target = instr.address + op.value.mem.disp + instr.size
+                if target is not None and start <= target < end:
+                    boundaries.add(target)
+            # Additionally, if we run into a known exported function, use this
+            # as a boundary as well.
+            elif instr.address in self.exported_addrs:
+                boundaries.add(instr.address)
+
+        # Zip up the ranges to create [start, end) pairs
+        cur_ranges = list(zip(sorted(boundaries)[:-1], sorted(boundaries)[1:]))
+        found_ranges = set()
+        prev_ranges = set()
+        while cur_ranges != prev_ranges:
+            # Only process changed ranges to save reevaluating unchanged ranges
+            diff_ranges = [(start, end) for start, end in cur_ranges if (start, end) not in prev_ranges]
+            # Remove the ones to process if they were marked before
+            for start, end in diff_ranges:
+                if (start, end) in found_ranges:
+                    found_ranges.remove((start, end))
+
+            prev_ranges = cur_ranges[:]
+            for start, end in diff_ranges:
+                #if start != 0x243c0:
+                #    continue
+                logging.debug('Scanning range %x-%x', start, end)
+                # Read the code for the range in question
+                self.fd.seek(start)
+                code_bytes = self.fd.read(end - start)
+                code = list(cs_obj.disasm(code_bytes, start))
+                # 'Trim' leading NOPs (which are padding from the range before)
+                while code and code[0].mnemonic == 'nop':
+                    start += code[0].size
+                    code.pop(0)
+                # If there were only NOPs or no code disassembled, continue with
+                # the next range
+                if not code:
+                    continue
+                code_stack = collections.deque()
+                code_stack.append(0)  # start processing at the first instruction
+                code_ends = set()  # addresses beyond terminating instruction
+                worked = set()  # keep track of already processed blocks
+                addr_to_index = {instr.address: index for index, instr in enumerate(code)}
+                # Process all blocks
+                while code_stack:
+                    index = code_stack.pop()
+                    worked.add(index)
+                    logging.debug('Popped index %d', index)
+                    # Iterate over all instructions in the current block
+                    while index < len(code):
+                        instr = code[index]
+                        logging.debug('Checking %x: %s %s', instr.address,
+                                      instr.mnemonic, instr.op_str)
+                        next_addr = instr.address + instr.size
+                        if instr.group(capstone.x86_const.X86_GRP_JUMP):
+                            op = instr.operands[-1]
+                            # First, deal with jumps to immediate addresses.
+                            if op.type == capstone.x86.X86_OP_IMM:
+                                target = op.value.imm
+                                new_index = addr_to_index.get(target)
+                                # If we're jumping into the current range, add the
+                                # target of the jump as a block to work on...
+                                if new_index is not None and new_index not in worked:
+                                    logging.debug('%x: appending %x',
+                                                  instr.address, target)
+                                    code_stack.append(new_index)
+                                # ... and note the following instruction as a
+                                # potential last one
+                                code_ends.add(next_addr)
+                            # in case we're jumping through a register, add the
+                            # next address as a boundary as well
+                            elif op.type == capstone.x86.X86_OP_REG:
+                                code_ends.add(next_addr)
+                            # If we encounter a non-conditional jump, terminate
+                            # processing of the current block
+                            if instr.id == capstone.x86_const.X86_INS_JMP:
+                                code_ends.add(next_addr)
+                                break
+                        # If we're at a return instruction, add the next address
+                        # to the list of potential ends and finish processing
+                        # this block
+                        elif instr.group(capstone.x86_const.X86_GRP_RET):
+                            code_ends.add(next_addr)
+                            break
+                        # If we see a call to a function known to exit or abort,
+                        # add the next instruction to the list of potential ends.
+                        elif instr.group(capstone.x86_const.X86_GRP_CALL):
+                            op = instr.operands[-1]
+                            if op.type == capstone.x86.X86_OP_IMM:
+                                target = op.value.imm
+                                if target in self.imports_plt and \
+                                    ('stack_chk_fail' in self.imports_plt[target] or
+                                     '__assert_fail' in self.imports_plt[target] or
+                                     'err@@' in self.imports_plt[target] or
+                                     'abort@@' in self.imports_plt[target]):
+                                    code_ends.add(next_addr)
+                                    break
+                            if index + 1 < len(code) and code[index + 1].mnemonic == 'nop':
+                                logging.debug('NOP after call')
+                                code_ends.add(next_addr + code[index + 1].size)
+                                break
+                        index += 1
+                # Use original range if no terminating instruction was found
+                logging.debug('code_ends: %s', sorted([hex(x) for x in code_ends]))
+                det_end = max(code_ends) if code_ends else end
+                if start == det_end:
+                    continue
+                found_ranges.add((start, det_end))
+
+            known_boundaries = set()
+            for start, det_end in found_ranges:
+                known_boundaries.update([start, det_end])
+
+            cur_ranges = list(zip(sorted(known_boundaries)[:-1], sorted(known_boundaries)[1:]))
+            logging.debug('Function boundary identification loop iteration done, '\
+                          'previous size %d, current size %d', len(prev_ranges),
+                          len(cur_ranges))
+
+        logging.debug('Identification of boundaries done!')
+
+        # Compare the list of found ranges against the exported and local
+        # functions and report mismatches.
+        nop_count = 0
+        for start, det_end in sorted(found_ranges):
+            det_size = det_end - start
+            #print('Function at {:x}: next at {:x}, end determined at {:x}, size {}'.format(
+            #    start, end, det_end, det_size
+            #))
+            # Filter out padding ranges between functions -> only NOPs
+            if det_size > 0:
+                self.fd.seek(start)
+                code_bytes = self.fd.read(det_size)
+                code = list(cs_obj.disasm(code_bytes, start))
+                if all(instr.mnemonic == 'nop' for instr in code):
+                    nop_count += 1
+                    continue
+            # Crosscheck determined ranges against symtab/dynsym provided ones
+            if start in self.ranges:
+                names = ''
+                symtab_size = self.ranges[start]
+                if start in self.exported_addrs:
+                    names = self.exported_addrs[start]
+                elif start in self.local_functions:
+                    names = self.local_functions[start]
+                if symtab_size != det_size:
+                    end_symtab = start + symtab_size
+                    gap_size = det_end - end_symtab
+                    # filter out if the gap only consists of NOPs
+                    if gap_size > 0:
+                        self.fd.seek(end_symtab)
+                        code_bytes = self.fd.read(gap_size)
+                        code = list(cs_obj.disasm(code_bytes, end_symtab))
+                        if all(instr.mnemonic == 'nop' for instr in code):
+                            continue
+                    logging.debug('difference for %x:%s: symtab %d, determined %d!',
+                                  start, names, symtab_size, det_size)
+        logging.debug('%d/%d ranges were NOP only', nop_count, len(found_ranges))
+        logging.debug('ranges: %d %s', len(found_ranges), list((hex(x), hex(y)) for x, y in sorted(found_ranges)))
+
     def parse_functions(self, release=False):
         before = time.time()
         self.parse_versions()
@@ -1141,6 +1326,7 @@ class Library:
         self.parse_rela_dyn()
         if self.entrypoint and self.entrypoint not in self.ranges:
             self.ranges[self.entrypoint] = 0
+        #self._reconstruct_functions_linear()
         if has_symtab:
            self._check_init_functions()
            self._postprocess_ranges()