Source code for binaryninja.collaboration.merge

import abc
import ctypes
import json
import sys
import traceback
from typing import Dict, Optional

from .. import _binaryninjacore as core
from ..enums import MergeConflictDataType
from ..database import KeyValueStore
from . import util

from ..database import Database, Snapshot
from ..filemetadata import FileMetadata

OptionalStringDict = Optional[Dict[str, object]]


[docs] class MergeConflict: """ Structure representing an individual merge conflict """ def __init__(self, handle: core.BNAnalysisMergeConflictHandle): """ FFI constructor :param handle: FFI handle for internal use """ self._handle = ctypes.cast(handle, core.BNAnalysisMergeConflictHandle) def __del__(self): core.BNFreeAnalysisMergeConflict(self._handle) @property def database(self) -> Database: """ Database backing all snapshots in the merge conflict :return: Database object """ result = core.BNAnalysisMergeConflictGetDatabase(self._handle) return Database(handle=ctypes.cast(result, ctypes.POINTER(core.BNDatabase))) @property def base_snapshot(self) -> Optional[Snapshot]: """ Snapshot which is the parent of the two being merged :return: Snapshot object """ result = core.BNAnalysisMergeConflictGetBaseSnapshot(self._handle) if result is None: return None return Snapshot(handle=ctypes.cast(result, ctypes.POINTER(core.BNSnapshot))) @property def first_snapshot(self) -> Optional[Snapshot]: """ First snapshot being merged :return: Snapshot object """ result = core.BNAnalysisMergeConflictGetFirstSnapshot(self._handle) if result is None: return None return Snapshot(handle=ctypes.cast(result, ctypes.POINTER(core.BNSnapshot))) @property def second_snapshot(self) -> Optional[Snapshot]: """ Second snapshot being merged :return: Snapshot object """ result = core.BNAnalysisMergeConflictGetSecondSnapshot(self._handle) if result is None: return None return Snapshot(handle=ctypes.cast(result, ctypes.POINTER(core.BNSnapshot))) @property def base_file(self) -> Optional[FileMetadata]: """ FileMetadata with contents of file for base snapshot This function is slow! Only use it if you really need it. :return: FileMetadata object """ result = core.BNAnalysisMergeConflictGetBaseFile(self._handle) if result is None: return None lazy = util.LazyT(handle=result) file = FileMetadata(handle=ctypes.cast(lazy.get(ctypes.POINTER(core.BNFileMetadata)), ctypes.POINTER(core.BNFileMetadata))) core.BNCollaborationFreeLazyT(result) return file @property def first_file(self) -> Optional[FileMetadata]: """ FileMetadata with contents of file for first snapshot This function is slow! Only use it if you really need it. :return: FileMetadata object """ result = core.BNAnalysisMergeConflictGetFirstFile(self._handle) if result is None: return None lazy = util.LazyT(handle=result) file = FileMetadata(handle=ctypes.cast(lazy.get(ctypes.POINTER(core.BNFileMetadata)), ctypes.POINTER(core.BNFileMetadata))) core.BNCollaborationFreeLazyT(result) return file @property def second_file(self) -> Optional[FileMetadata]: """ FileMetadata with contents of file for second snapshot This function is slow! Only use it if you really need it. :return: FileMetadata object """ result = core.BNAnalysisMergeConflictGetSecondFile(self._handle) if result is None: return None lazy = util.LazyT(handle=result) file = FileMetadata(handle=ctypes.cast(lazy.get(ctypes.POINTER(core.BNFileMetadata)), ctypes.POINTER(core.BNFileMetadata))) core.BNCollaborationFreeLazyT(result) return file @property def base(self) -> OptionalStringDict: """ Json object for conflicting data in the base snapshot :return: Python dictionary from parsed json """ result = core.BNAnalysisMergeConflictGetBase(self._handle) if result is None: return None return json.loads(result) @property def first(self) -> OptionalStringDict: """ Json object for conflicting data in the first snapshot :return: Python dictionary from parsed json """ result = core.BNAnalysisMergeConflictGetFirst(self._handle) if result is None: return None return json.loads(result) @property def second(self) -> OptionalStringDict: """ Json object for conflicting data in the second snapshot :return: Python dictionary from parsed json """ result = core.BNAnalysisMergeConflictGetSecond(self._handle) if result is None: return None return json.loads(result) @property def data_type(self) -> 'MergeConflictDataType': """ Type of data in the conflict, Text/Json/Binary :return: Conflict data type """ return MergeConflictDataType(core.BNAnalysisMergeConflictGetDataType(self._handle)) @property def type(self) -> str: """ String representing the type name of the data, not the same as data_type. This is like "typeName" or "tag" depending on what object the conflict represents. :return: Type name """ return core.BNAnalysisMergeConflictGetType(self._handle) @property def key(self) -> str: """ Lookup key for the merge conflict, ideally a tree path that contains the name of the conflict and all the recursive children leading up to this conflict. :return: Key name """ return core.BNAnalysisMergeConflictGetKey(self._handle)
[docs] def success(self, value: OptionalStringDict) -> bool: """ Call this when you've resolved the conflict to save the result :param value: Resolved value :return: True if successful """ if value is None: printed = None else: printed = json.dumps(value) return core.BNAnalysisMergeConflictSuccess(self._handle, printed)
[docs] def get_path_item(self, path_key: str) -> Optional[object]: """ Get item in the merge conflict's path for a given key. :param path_key: Key for path item :return: Path item, or an None if not found """ value = core.BNAnalysisMergeConflictGetPathItem(self._handle, path_key) if value is None: return None return ctypes.py_object(value)
[docs] class ConflictHandler: """ Helper class that resolves conflicts """ def _handle(self, ctxt: ctypes.c_void_p, keys: ctypes.POINTER(ctypes.c_char_p), conflicts: ctypes.POINTER(core.BNAnalysisMergeConflictHandle), count: ctypes.c_ulonglong) -> bool: try: py_conflicts = {} for i in range(count.value): py_conflicts[core.pyNativeStr(keys[i])] = MergeConflict(handle=conflicts[i]) return self.handle(py_conflicts) except: traceback.print_exc(file=sys.stderr) return False
[docs] @abc.abstractmethod def handle(self, conflicts: Dict[str, MergeConflict]) -> bool: """ Handle any merge conflicts by calling their success() function with a merged value :param conflicts: Map of conflict id to conflict structure :return: True if all conflicts were successfully merged """ raise NotImplementedError("Not implemented")
[docs] class ConflictSplitter: """ Helper class that takes one merge conflict and splits it into multiple conflicts Eg takes conflicts for View/symbols and splits to one conflict per symbol """ def __init__(self, handle=None): if handle is not None: self._handle = handle
[docs] def register(self): self._cb = core.BNAnalysisMergeConflictSplitterCallbacks() self._cb.context = 0 self._cb.getName = self._cb.getName.__class__(self._get_name) self._cb.reset = self._cb.reset.__class__(self._reset) self._cb.finished = self._cb.finished.__class__(self._finished) self._cb.canSplit = self._cb.canSplit.__class__(self._can_split) self._cb.split = self._cb.split.__class__(self._split) self._cb.freeName = self._cb.freeName.__class__(self._free_name) self._cb.freeKeyList = self._cb.freeKeyList.__class__(self._free_key_list) self._cb.freeConflictList = self._cb.freeConflictList.__class__(self._free_conflict_list) self._handle = core.BNRegisterAnalysisMergeConflictSplitter(self._cb) self._split_keys = None self._split_conflicts = None
def _get_name(self, ctxt: ctypes.c_void_p) -> ctypes.c_char_p: try: return core.BNAllocString(core.cstr(self.name)) except: # Not sure why your get_name() would throw but let's handle it anyway traceback.print_exc(file=sys.stderr) return core.BNAllocString(core.cstr(type(self).__name__)) def _reset(self, ctxt: ctypes.c_void_p): try: self.reset() except: traceback.print_exc(file=sys.stderr) def _finished(self, ctxt: ctypes.c_void_p): try: self.finished() except: traceback.print_exc(file=sys.stderr) def _can_split(self, ctxt: ctypes.c_void_p, key: ctypes.c_char_p, conflict: core.BNAnalysisMergeConflictHandle) -> bool: try: py_conflict = MergeConflict(handle=conflict) return self.can_split(core.pyNativeStr(key), py_conflict) except: traceback.print_exc(file=sys.stderr) return False def _split( self, ctxt: ctypes.c_void_p, original_key: ctypes.c_char_p, original_conflict: core.BNAnalysisMergeConflictHandle, result_kvs: core.BNKeyValueStoreHandle, new_keys: ctypes.POINTER(ctypes.POINTER(ctypes.c_char_p)), new_conflicts: ctypes.POINTER(core.BNAnalysisMergeConflictHandle), new_count: ctypes.POINTER(ctypes.c_size_t) ) -> bool: try: py_original_conflict = MergeConflict(handle=original_conflict) py_result_kvs = KeyValueStore(handle=ctypes.cast(result_kvs, core.BNKeyValueStoreHandle)) result = self.split(core.pyNativeStr(original_key), py_original_conflict, py_result_kvs) if result is None: return False new_count[0] = ctypes.c_size_t(len(result)) new_keys[0] = (ctypes.c_char_p * len(result))() new_conflicts[0] = (core.BNAnalysisMergeConflictHandle * len(result))() self._split_keys = [] self._split_conflicts = [] for (i, (key, conflict)) in enumerate(result): self._split_keys.append(key) self._split_conflicts.append(conflict) new_keys[0][i] = core.cstr(self._split_keys[-1]) new_conflicts[0][i] = self._split_conflicts[-1]._handle return True except: traceback.print_exc(file=sys.stderr) return False def _free_name(self, ctxt: ctypes.c_void_p, name: ctypes.c_char_p): core.BNFreeString(name) def _free_key_list(self, ctxt: ctypes.c_void_p, key_list: ctypes.POINTER(ctypes.c_char_p), count: ctypes.c_size_t): del self._split_keys def _free_conflict_list(self, ctxt: ctypes.c_void_p, conflict_list: core.BNAnalysisMergeConflictHandle, count: ctypes.c_size_t): del self._split_conflicts @property def name(self): """ Get a friendly name for the splitter :return: Name of the splitter """ return self.get_name()
[docs] def get_name(self) -> str: """ Get a friendly name for the splitter :return: Name of the splitter """ return type(self).__name__
[docs] def reset(self): """ Reset any internal state the splitter may hold during the merge """ return
[docs] def finished(self): """ Clean up any internal state after the merge operation has finished """ return
[docs] @abc.abstractmethod def can_split(self, key: str, conflict: MergeConflict) -> bool: """ Test if the splitter applies to a given conflict (by key). :param key: Key of the conflicting field :param conflict: Conflict data :return: True if this splitter should be used on the conflict """ raise NotImplementedError("Not implemented")
[docs] @abc.abstractmethod def split(self, key: str, conflict: MergeConflict, result: KeyValueStore) -> Optional[Dict[str, MergeConflict]]: """ Split a field conflict into any number of alternate conflicts. Note: Returned conflicts will also be checked for splitting, beware infinite loops! If this function raises, it will be treated as returning None :param key: Original conflicting field's key :param conflict: Original conflict data :param result: Kvs structure containing the result of all splits. You should use the original conflict's success() function in most cases unless you specifically want to write a new key to this. :return: A collection of conflicts into which the original conflict was split, or None if this splitter cannot handle the conflict """ raise NotImplementedError("Not implemented")