import ctypes
import json
import os
from typing import Dict, List, Optional, Tuple
import binaryninja
import binaryninja.enterprise as enterprise
from .. import _binaryninjacore as core
from . import databasesync, group, project, user, util
[docs]
class Remote:
"""
Class representing a connection to a Collaboration server
"""
def __init__(self, handle: core.BNRemoteHandle):
    """
    Wrap a core Remote handle in a Python object.

    :param handle: FFI handle for internal use
    """
    # Store the handle as a core.BNRemoteHandle regardless of the pointer type passed in
    remote_handle = ctypes.cast(handle, core.BNRemoteHandle)
    self._handle = remote_handle
def __del__(self):
    """
    Release the core reference held by this object.
    """
    # `core` can be None here (presumably during interpreter teardown); nothing to free then
    if core is None:
        return
    core.BNFreeRemote(self._handle)
def __eq__(self, other):
    """
    Compare two Remotes.

    When both sides have loaded metadata they are compared by unique id;
    otherwise they are compared by address, so that comparing never triggers
    a metadata pull.

    :param other: Object to compare against
    :return: True if both objects refer to the same Remote
    """
    if not isinstance(other, Remote):
        return False
    both_loaded = self.has_loaded_metadata and other.has_loaded_metadata
    if both_loaded:
        return other.unique_id == self.unique_id
    # Don't pull metadata if we haven't yet
    return self.address == other.address
def __str__(self):
    """
    :return: Short human-readable description containing the Remote's name
    """
    remote_name = self.name
    return f'<remote: {remote_name}>'
def __repr__(self):
    """
    :return: Debug representation containing the Remote's name
    """
    display_name = self.name
    return f'<remote: {display_name}>'
[docs]
@staticmethod
def get_for_local_database(database: 'binaryninja.Database') -> Optional['Remote']:
    """
    Look up the Remote associated with a Database.

    :param database: BN database, potentially with collaboration metadata
    :return: Remote from one of the connected remotes, or None if not found
    :rtype: Optional[Remote]
    :raises RuntimeError: If there was an error
    """
    remote = databasesync.get_remote_for_local_database(database)
    return remote
[docs]
@staticmethod
def get_for_bv(bv: 'binaryninja.BinaryView') -> Optional['Remote']:
    """
    Look up the Remote associated with a Binary View.

    :param bv: Binary view, potentially with collaboration metadata
    :return: Remote from one of the connected remotes, or None if the view \
    has no database or no Remote was found
    :raises RuntimeError: If there was an error
    """
    if bv.file.has_database:
        local_db = bv.file.database
        if local_db is not None:
            return databasesync.get_remote_for_local_database(local_db)
    # No backing database, so there is nothing to look a Remote up from
    return None
@property
def has_loaded_metadata(self):
    """
    Whether the remote has pulled metadata like its id, etc.

    :return: True if it has been pulled
    """
    loaded = core.BNRemoteHasLoadedMetadata(self._handle)
    return loaded
@property
def unique_id(self) -> str:
    """
    Unique id of the Remote. Metadata is pulled first if it has not been loaded yet.

    :return: Id string
    :raises RuntimeError: If there was an error pulling metadata.
    """
    metadata_ready = self.has_loaded_metadata
    if not metadata_ready:
        self.load_metadata()
    remote_id = core.BNRemoteGetUniqueId(self._handle)
    return remote_id
@property
def name(self) -> str:
    """
    Name assigned to the Remote.

    :return: Name string
    """
    remote_name = core.BNRemoteGetName(self._handle)
    return remote_name
@property
def address(self) -> str:
    """
    Base address of the Remote.

    :return: URL string
    """
    base_url = core.BNRemoteGetAddress(self._handle)
    return base_url
@property
def is_connected(self) -> bool:
    """
    Whether the Remote is connected (i.e. `Remote.connect` has been called).

    :return: True if connected
    """
    connected = core.BNRemoteIsConnected(self._handle)
    return connected
@property
def username(self) -> str:
    """
    Username used to connect to the remote.

    :return: Username string
    """
    connected_username = core.BNRemoteGetUsername(self._handle)
    return connected_username
@property
def token(self) -> str:
    """
    Token used to connect to the remote.

    :return: Token string
    """
    auth_token = core.BNRemoteGetToken(self._handle)
    return auth_token
@property
def server_version(self) -> int:
    """
    Version of software running on the server. Metadata is pulled first if it
    has not been loaded yet.

    :return: Server version number
    :raises RuntimeError: If there was an error
    """
    metadata_ready = self.has_loaded_metadata
    if not metadata_ready:
        self.load_metadata()
    version = core.BNRemoteGetServerVersion(self._handle)
    return version
@property
def server_build_id(self) -> str:
    """
    Build id of software running on the server. Metadata is pulled first if it
    has not been loaded yet.

    :return: Server build id string
    :raises RuntimeError: If there was an error
    """
    metadata_ready = self.has_loaded_metadata
    if not metadata_ready:
        self.load_metadata()
    build_id = core.BNRemoteGetServerBuildId(self._handle)
    return build_id
@property
def auth_backends(self) -> List[Tuple[str, str]]:
    """
    List of supported authentication backends on the server.
    Metadata is pulled first if it has not been loaded yet.

    :return: List of (backend id, backend display name) tuples
    :raises RuntimeError: If there was an error
    """
    if not self.has_loaded_metadata:
        self.load_metadata()

    # Out-parameters filled in by the core call
    id_list = ctypes.POINTER(ctypes.c_char_p)()
    name_list = ctypes.POINTER(ctypes.c_char_p)()
    backend_count = ctypes.c_size_t()
    fetched = core.BNRemoteGetAuthBackends(self._handle, id_list, name_list, backend_count)
    if not fetched:
        raise RuntimeError(util._last_error())

    total = backend_count.value
    backends = [
        (core.pyNativeStr(id_list[index]), core.pyNativeStr(name_list[index]))
        for index in range(total)
    ]
    # Strings were converted to Python values above; hand the lists back to the core
    core.BNFreeStringList(id_list, total)
    core.BNFreeStringList(name_list, total)
    return backends
@property
def is_admin(self) -> bool:
    """
    Whether the currently connected user is an administrator.

    .. note:: If users have not been pulled, a pull is attempted upon calling this.

    :return: True if the user is an administrator
    """
    # Pulling users is the test by which the api knows whether it is an admin
    users_pulled = self.has_pulled_users
    if not users_pulled:
        self.pull_users()
    admin = core.BNRemoteIsAdmin(self._handle)
    return admin
@property
def is_enterprise(self) -> bool:
    """
    Whether this remote is the same as the Enterprise License server.
    Metadata is loaded first if it has not been loaded yet.

    :return: True if the same
    """
    metadata_ready = self.has_loaded_metadata
    if not metadata_ready:
        self.load_metadata()
    same_as_license_server = core.BNRemoteIsEnterprise(self._handle)
    return same_as_license_server
[docs]
def request_authentication_token(self, username: str, password: str) -> Optional[str]:
    """
    Ask the Remote for an authentication token given a username and password.

    :param username: Username to authenticate with
    :param password: Password of user
    :return: Authentication token string, or None if there was an error
    """
    auth_token = core.BNRemoteRequestAuthenticationToken(self._handle, username, password)
    return auth_token
[docs]
def connect(self, username: Optional[str] = None, token: Optional[str] = None):
    """
    Connect to a Remote, loading metadata and optionally acquiring a token.

    Credentials are resolved in this order, stopping at the first source that
    yields a usable pair:

    1. The ``username`` and ``token`` arguments (both must be given)
    2. The Enterprise session, if this Remote is the Enterprise server and
       Enterprise is authenticated
    3. The secrets provider named by the ``enterprise.secretsProvider`` setting,
       keyed by this Remote's address
    4. The ``BN_ENTERPRISE_USERNAME`` / ``BN_ENTERPRISE_PASSWORD`` environment
       variables, exchanged for a token

    :param username: Optional username to connect with
    :param token: Optional token to authenticate with
    :raises RuntimeError: If no credentials could be resolved or the connection fails
    """
    if not self.has_loaded_metadata:
        self.load_metadata()

    # 1. Explicit arguments only count when both halves are present
    have_creds = username is not None and token is not None

    # 2. Try logging in with the Enterprise defaults
    if not have_creds and self.is_enterprise and enterprise.is_authenticated():
        username = enterprise.username()
        token = enterprise.token()
        have_creds = username is not None and token is not None

    # 3. Try to load from the default secrets provider
    if not have_creds:
        provider_name = binaryninja.Settings().get_string("enterprise.secretsProvider")
        provider = binaryninja.SecretsProvider[provider_name]
        if provider.has_data(self.address):
            stored = json.loads(provider.get_data(self.address))
            username = stored['username']
            token = stored['token']
            have_creds = True

    # 4. Try logging in with creds in the env
    if not have_creds:
        env_username = os.environ.get('BN_ENTERPRISE_USERNAME')
        env_password = os.environ.get('BN_ENTERPRISE_PASSWORD')
        if env_username is not None and env_password is not None:
            token = self.request_authentication_token(env_username, env_password)
            if token is not None:
                username = env_username
                have_creds = True

    if not (have_creds and username is not None and token is not None):
        raise RuntimeError("Cannot connect without a username or token")

    connected = core.BNRemoteConnect(self._handle, username, token)
    if not connected:
        raise RuntimeError(util._last_error())
[docs]
def disconnect(self):
    """
    Disconnect from the remote.

    :raises RuntimeError: If the core reports a failure
    """
    disconnected = core.BNRemoteDisconnect(self._handle)
    if not disconnected:
        raise RuntimeError(util._last_error())
@property
def has_pulled_projects(self) -> bool:
    """
    Whether the Remote has pulled its projects yet.

    :return: True if they have been pulled
    """
    pulled = core.BNRemoteHasPulledProjects(self._handle)
    return pulled
@property
def has_pulled_groups(self) -> bool:
    """
    Whether the Remote has pulled its groups yet.

    :return: True if they have been pulled
    """
    pulled = core.BNRemoteHasPulledGroups(self._handle)
    return pulled
@property
def has_pulled_users(self) -> bool:
    """
    Whether the Remote has pulled its users yet.

    :return: True if they have been pulled
    """
    pulled = core.BNRemoteHasPulledUsers(self._handle)
    return pulled
@property
def projects(self) -> List['project.RemoteProject']:
    """
    Get the list of projects in this Remote.

    .. note:: If projects have not been pulled, they will be pulled upon calling this.

    :return: List of Project objects
    :raises RuntimeError: If there was an error pulling projects
    """
    if not self.has_pulled_projects:
        self.pull_projects()

    project_count = ctypes.c_size_t()
    handles = core.BNRemoteGetProjects(self._handle, project_count)
    if handles is None:
        raise RuntimeError(util._last_error())
    return [project.RemoteProject(handles[index]) for index in range(project_count.value)]
[docs]
def get_project_by_id(self, id: str) -> Optional['project.RemoteProject']:
    """
    Look up a project in the Remote by its id.

    .. note:: If projects have not been pulled, they will be pulled upon calling this.

    :param id: Id of Project
    :return: Project object, if one with that id exists. Else, None
    :raises RuntimeError: If there was an error pulling projects
    """
    if not self.has_pulled_projects:
        self.pull_projects()

    handle = core.BNRemoteGetProjectById(self._handle, id)
    return None if handle is None else project.RemoteProject(handle)
[docs]
def get_project_by_name(self, name: str) -> Optional['project.RemoteProject']:
    """
    Look up a project in the Remote by its name.

    .. note:: If projects have not been pulled, they will be pulled upon calling this.

    :param name: Name of Project
    :return: Project object, if one with that name exists. Else, None
    :raises RuntimeError: If there was an error pulling projects
    """
    if not self.has_pulled_projects:
        self.pull_projects()

    handle = core.BNRemoteGetProjectByName(self._handle, name)
    return None if handle is None else project.RemoteProject(handle)
[docs]
def pull_projects(self, progress: 'util.ProgressFuncType' = util.nop):
    """
    Pull the list of projects from the Remote.

    :param progress: Function to call for progress updates
    :raises RuntimeError: If there was an error pulling projects
    """
    progress_callback = util.wrap_progress(progress)
    pulled = core.BNRemotePullProjects(self._handle, progress_callback, None)
    if not pulled:
        raise RuntimeError(util._last_error())
[docs]
def create_project(self, name: str, description: str) -> 'project.RemoteProject':
    """
    Create a new project on the remote (and pull it).

    :param name: Project name
    :param description: Project description
    :return: Reference to the created project
    :raises RuntimeError: If there was an error
    """
    handle = core.BNRemoteCreateProject(self._handle, name, description)
    if handle is not None:
        return project.RemoteProject(handle)
    raise RuntimeError(util._last_error())
[docs]
def push_project(self, project: 'project.RemoteProject', extra_fields: Optional[Dict[str, str]] = None):
    """
    Push an updated Project object to the Remote.

    :param project: Project object which has been updated
    :param extra_fields: Extra HTTP fields to send with the update
    :raises RuntimeError: If there was an error
    """
    fields = extra_fields if extra_fields is not None else {}
    field_count = len(fields)

    # Parallel C string arrays: c_keys[i] pairs with c_values[i]
    c_keys = (ctypes.c_char_p * field_count)()
    c_values = (ctypes.c_char_p * field_count)()
    for index, field_name in enumerate(fields):
        c_keys[index] = core.cstr(field_name)
        c_values[index] = core.cstr(fields[field_name])

    pushed = core.BNRemotePushProject(self._handle, project._handle, c_keys, c_values, field_count)
    if not pushed:
        raise RuntimeError(util._last_error())
[docs]
def delete_project(self, project: 'project.RemoteProject'):
    """
    Delete a project from the remote.

    :param project: Project to delete
    :raises RuntimeError: If there was an error
    """
    deleted = core.BNRemoteDeleteProject(self._handle, project._handle)
    if not deleted:
        raise RuntimeError(util._last_error())
@property
def groups(self) -> List['group.Group']:
    """
    Get the list of groups in this Remote.

    .. note:: If groups have not been pulled, they will be pulled upon calling this.

    .. note:: This function is only available to accounts with admin status on the Remote

    :return: List of Group objects
    :raises RuntimeError: If there was an error pulling groups
    """
    if not self.has_pulled_groups:
        self.pull_groups()

    group_count = ctypes.c_size_t()
    handles = core.BNRemoteGetGroups(self._handle, group_count)
    if handles is None:
        raise RuntimeError(util._last_error())
    return [group.Group(handles[index]) for index in range(group_count.value)]
[docs]
def get_group_by_id(self, id: int) -> Optional['group.Group']:
    """
    Look up a group in the Remote by its id.

    .. note:: If groups have not been pulled, they will be pulled upon calling this.

    .. note:: This function is only available to accounts with admin status on the Remote

    :param id: Id of Group
    :return: Group object, if one with that id exists. Else, None
    :raises RuntimeError: If there was an error pulling groups
    """
    if not self.has_pulled_groups:
        self.pull_groups()

    handle = core.BNRemoteGetGroupById(self._handle, id)
    return None if handle is None else group.Group(handle)
[docs]
def get_group_by_name(self, name: str) -> Optional['group.Group']:
    """
    Look up a group in the Remote by its name.

    .. note:: If groups have not been pulled, they will be pulled upon calling this.

    .. note:: This function is only available to accounts with admin status on the Remote

    :param name: Name of Group
    :return: Group object, if one with that name exists. Else, None
    :raises RuntimeError: If there was an error pulling groups
    """
    if not self.has_pulled_groups:
        self.pull_groups()

    handle = core.BNRemoteGetGroupByName(self._handle, name)
    return None if handle is None else group.Group(handle)
[docs]
def search_groups(self, prefix: str) -> List[Tuple[int, str]]:
    """
    Search for groups in the Remote whose name starts with a given prefix.

    :param prefix: Prefix of name for groups
    :return: List of (group id, group name) pairs
    :raises RuntimeError: If there was an error
    """
    # Out-parameters filled in by the core call
    match_count = ctypes.c_size_t()
    id_list = ctypes.POINTER(ctypes.c_uint64)()
    name_list = ctypes.POINTER(ctypes.c_char_p)()
    found = core.BNRemoteSearchGroups(self._handle, prefix, id_list, name_list, match_count)
    if not found:
        raise RuntimeError(util._last_error())

    total = match_count.value
    matches = [(id_list[index], core.pyNativeStr(name_list[index])) for index in range(total)]
    # Values were copied into Python objects above; hand the lists back to the core
    core.BNCollaborationFreeIdList(id_list, total)
    core.BNFreeStringList(name_list, total)
    return matches
[docs]
def pull_groups(self, progress: 'util.ProgressFuncType' = util.nop):
    """
    Pull the list of groups from the Remote.

    .. note:: This function is only available to accounts with admin status on the Remote

    :param progress: Function to call for progress updates
    :raises RuntimeError: If there was an error pulling groups
    """
    progress_callback = util.wrap_progress(progress)
    pulled = core.BNRemotePullGroups(self._handle, progress_callback, None)
    if not pulled:
        raise RuntimeError(util._last_error())
[docs]
def create_group(self, name: str, usernames: List[str]) -> 'group.Group':
    """
    Create a new group on the remote (and pull it).

    .. note:: This function is only available to accounts with admin status on the Remote

    :param name: Group name
    :param usernames: List of usernames of users in the group
    :return: Reference to the created group
    :raises RuntimeError: If there was an error
    """
    member_count = len(usernames)
    c_members = (ctypes.c_char_p * member_count)()
    for index, member in enumerate(usernames):
        c_members[index] = core.cstr(member)

    handle = core.BNRemoteCreateGroup(self._handle, name, c_members, member_count)
    if handle is not None:
        return group.Group(handle)
    raise RuntimeError(util._last_error())
[docs]
def push_group(self, group: 'group.Group', extra_fields: Optional[Dict[str, str]] = None):
    """
    Push an updated Group object to the Remote.

    .. note:: This function is only available to accounts with admin status on the Remote

    :param group: Group object which has been updated
    :param extra_fields: Extra HTTP fields to send with the update
    :raises RuntimeError: If there was an error
    """
    fields = extra_fields if extra_fields is not None else {}
    field_count = len(fields)

    # Parallel C string arrays: c_keys[i] pairs with c_values[i]
    c_keys = (ctypes.c_char_p * field_count)()
    c_values = (ctypes.c_char_p * field_count)()
    for index, field_name in enumerate(fields):
        c_keys[index] = core.cstr(field_name)
        c_values[index] = core.cstr(fields[field_name])

    pushed = core.BNRemotePushGroup(self._handle, group._handle, c_keys, c_values, field_count)
    if not pushed:
        raise RuntimeError(util._last_error())
[docs]
def delete_group(self, group: 'group.Group'):
    """
    Delete a group from the remote.

    .. note:: This function is only available to accounts with admin status on the Remote

    :param group: Group to delete
    :raises RuntimeError: If there was an error
    """
    deleted = core.BNRemoteDeleteGroup(self._handle, group._handle)
    if not deleted:
        raise RuntimeError(util._last_error())
raise RuntimeError(util._last_error())
@property
def users(self) -> List['user.User']:
    """
    Get the list of users in this Remote.

    .. note:: If users have not been pulled, they will be pulled upon calling this.

    .. note:: This function is only available to accounts with admin status on the Remote

    :return: List of User objects
    :raises RuntimeError: If there was an error pulling users
    """
    if not self.has_pulled_users:
        self.pull_users()

    user_count = ctypes.c_size_t()
    handles = core.BNRemoteGetUsers(self._handle, user_count)
    if handles is None:
        raise RuntimeError(util._last_error())
    return [user.User(handles[index]) for index in range(user_count.value)]
[docs]
def get_user_by_id(self, id: str) -> Optional['user.User']:
    """
    Look up a user in the Remote by their id.

    .. note:: If users have not been pulled, they will be pulled upon calling this.

    .. note:: This function is only available to accounts with admin status on the Remote

    :param id: Id of User
    :return: User object, if one with that id exists. Else, None
    :raises RuntimeError: If there was an error pulling users
    """
    if not self.has_pulled_users:
        self.pull_users()

    handle = core.BNRemoteGetUserById(self._handle, id)
    return None if handle is None else user.User(handle)
[docs]
def get_user_by_username(self, username: str) -> Optional['user.User']:
    """
    Look up a user in the Remote by their username.

    .. note:: If users have not been pulled, they will be pulled upon calling this.

    .. note:: This function is only available to accounts with admin status on the Remote

    :param username: Username of User
    :return: User object, if one with that name exists. Else, None
    :raises RuntimeError: If there was an error pulling users
    """
    if not self.has_pulled_users:
        self.pull_users()

    handle = core.BNRemoteGetUserByUsername(self._handle, username)
    return None if handle is None else user.User(handle)
@property
def current_user(self) -> Optional['user.User']:
    """
    Get the user object for the currently connected user (only if you are an admin).

    .. note:: If users have not been pulled, they will be pulled upon calling this.

    .. note:: This function is only available to accounts with admin status on the Remote

    :return: User object, or None if the core returned no user
    :raises RuntimeError: If there was an error pulling users
    """
    if not self.has_pulled_users:
        self.pull_users()

    handle = core.BNRemoteGetCurrentUser(self._handle)
    return None if handle is None else user.User(handle)
[docs]
def search_users(self, prefix: str) -> List[Tuple[str, str]]:
    """
    Search for users in the Remote whose name starts with a given prefix.

    :param prefix: Prefix of name for users
    :return: List of (user id, username) pairs
    :raises RuntimeError: If there was an error
    """
    # Out-parameters filled in by the core call
    match_count = ctypes.c_size_t()
    id_list = ctypes.POINTER(ctypes.c_char_p)()
    name_list = ctypes.POINTER(ctypes.c_char_p)()
    found = core.BNRemoteSearchUsers(self._handle, prefix, id_list, name_list, match_count)
    if not found:
        raise RuntimeError(util._last_error())

    total = match_count.value
    matches = [
        (core.pyNativeStr(id_list[index]), core.pyNativeStr(name_list[index]))
        for index in range(total)
    ]
    # Strings were converted to Python values above; hand the lists back to the core
    core.BNFreeStringList(id_list, total)
    core.BNFreeStringList(name_list, total)
    return matches
[docs]
def pull_users(self, progress: 'util.ProgressFuncType' = util.nop):
    """
    Pull the list of users from the Remote.

    .. note:: This function is only available to accounts with admin status on the Remote.
       Non-admin accounts attempting to call this function will pull an empty list of users.

    :param progress: Function to call for progress updates
    :raises RuntimeError: If there was an error pulling users
    """
    progress_callback = util.wrap_progress(progress)
    pulled = core.BNRemotePullUsers(self._handle, progress_callback, None)
    if not pulled:
        raise RuntimeError(util._last_error())
[docs]
def create_user(self, username: str, email: str, is_active: bool, password: str, group_ids: List[int], user_permission_ids: List[int]) -> 'user.User':
    """
    Create a new user on the remote (and pull it).

    .. note:: This function is only available to accounts with admin status on the Remote

    :param username: User username
    :param email: User email
    :param is_active: If the user is enabled
    :param password: User password
    :param group_ids: List of group ids for the user
    :param user_permission_ids: List of permission ids for the user
    :return: Reference to the created user
    :raises RuntimeError: If there was an error
    """
    group_count = len(group_ids)
    permission_count = len(user_permission_ids)

    # Each array is sized from its own list. Sizing the permission array from
    # group_ids raised IndexError whenever more permissions than groups were given.
    group_ids_array = (ctypes.c_uint64 * group_count)(*group_ids)
    user_permission_ids_array = (ctypes.c_uint64 * permission_count)(*user_permission_ids)

    value = core.BNRemoteCreateUser(
        self._handle, username, email, is_active, password,
        group_ids_array, group_count,
        user_permission_ids_array, permission_count
    )
    if value is None:
        raise RuntimeError(util._last_error())
    return user.User(value)
[docs]
def push_user(self, user: 'user.User', extra_fields: Optional[Dict[str, str]] = None):
    """
    Push an updated User object to the Remote.

    .. note:: This function is only available to accounts with admin status on the Remote

    :param user: User object which has been updated
    :param extra_fields: Extra HTTP fields to send with the update
    :raises RuntimeError: If there was an error
    """
    fields = extra_fields if extra_fields is not None else {}
    field_count = len(fields)

    # Parallel C string arrays: c_keys[i] pairs with c_values[i]
    c_keys = (ctypes.c_char_p * field_count)()
    c_values = (ctypes.c_char_p * field_count)()
    for index, field_name in enumerate(fields):
        c_keys[index] = core.cstr(field_name)
        c_values[index] = core.cstr(fields[field_name])

    pushed = core.BNRemotePushUser(self._handle, user._handle, c_keys, c_values, field_count)
    if not pushed:
        raise RuntimeError(util._last_error())