import ctypes
import json
import os
from typing import Dict, List, Optional, Tuple
import binaryninja
import binaryninja.enterprise as enterprise
from .. import _binaryninjacore as core
from . import databasesync, group, project, user, util
[docs]
class Remote:
	"""
	Class representing a connection to a Collaboration server
	"""
[docs]
	def __init__(self, handle: core.BNRemoteHandle):
		"""
		:param handle: FFI handle for internal use
		:raises: RuntimeError if there was an error
		"""
		self._handle = ctypes.cast(handle, core.BNRemoteHandle) 
	def __del__(self):
		if core is not None:
			core.BNFreeRemote(self._handle)
	def __eq__(self, other):
		if not isinstance(other, Remote):
			return False
		if not self.has_loaded_metadata or not other.has_loaded_metadata:
			# Don't pull metadata if we haven't yet
			return self.address == other.address
		return other.unique_id == self.unique_id
	def __str__(self):
		return f'<remote: {self.name}>'
	def __repr__(self):
		return f'<remote: {self.name}>'
[docs]
	@staticmethod
	def get_for_local_database(database: 'binaryninja.Database') -> Optional['Remote']:
		"""
		Get the Remote for a Database
		:param database: BN database, potentially with collaboration metadata
		:return: Remote from one of the connected remotes, or None if not found
		:rtype: Optional[Remote]
		:raises RuntimeError: If there was an error
		"""
		return databasesync.get_remote_for_local_database(database) 
[docs]
	@staticmethod
	def get_for_bv(bv: 'binaryninja.BinaryView') -> Optional['Remote']:
		"""
		Get the Remote for a Binary View
		:param bv: Binary view, potentially with collaboration metadata
		:return: Remote from one of the connected remotes, or None if not found
		:raises RuntimeError: If there was an error
		"""
		if not bv.file.has_database:
			return None
		db = bv.file.database
		if db is None:
			return None
		return databasesync.get_remote_for_local_database(db) 
	@property
	def has_loaded_metadata(self):
		"""
		If the remote has pulled metadata like its id, etc
		:return: True if it has been pulled
		"""
		return core.BNRemoteHasLoadedMetadata(self._handle)
	@property
	def unique_id(self) -> str:
		"""
		Unique id. If metadata has not been pulled, it will be pulled upon calling this.
		:return: Id string
		:raises RuntimeError: If there was an error pulling metadata.
		"""
		if not self.has_loaded_metadata:
			self.load_metadata()
		return core.BNRemoteGetUniqueId(self._handle)
	@property
	def name(self) -> str:
		"""
		Assigned name of the Remote
		:return: Name string
		"""
		return core.BNRemoteGetName(self._handle)
	@property
	def address(self) -> str:
		"""
		Base address of the Remote
		:return: URL string
		"""
		return core.BNRemoteGetAddress(self._handle)
	@property
	def is_connected(self) -> bool:
		"""
		If the Remote is connected (has `Remote.connect` been called)
		:return: True if connected
		"""
		return core.BNRemoteIsConnected(self._handle)
	@property
	def username(self) -> str:
		"""
		Username used to connect to the remote
		:return: Username string
		"""
		return core.BNRemoteGetUsername(self._handle)
	@property
	def token(self) -> str:
		"""
		Token used to connect to the remote
		:return: Token string
		"""
		return core.BNRemoteGetToken(self._handle)
	@property
	def server_version(self) -> int:
		"""
		Version of software running on the server. If metadata has not been pulled, it will
		be pulled upon calling this.
		:return: Server version number
		:raises RuntimeError: If there was an error
		"""
		if not self.has_loaded_metadata:
			self.load_metadata()
		return core.BNRemoteGetServerVersion(self._handle)
	@property
	def server_build_id(self) -> str:
		"""
		Build id of software running on the server. If metadata has not been pulled, it will
		be pulled upon calling this.
		:return: Server build id string
		:raises RuntimeError: If there was an error
		"""
		if not self.has_loaded_metadata:
			self.load_metadata()
		return core.BNRemoteGetServerBuildId(self._handle)
	@property
	def auth_backends(self) -> List[Tuple[str, str]]:
		"""
		List of supported authentication backends on the server.
		If metadata has not been pulled, it will be pulled upon calling this.
		:return: List of Backend id <=> backend display name tuples
		:raises RuntimeError: If there was an error
		"""
		if not self.has_loaded_metadata:
			self.load_metadata()
		backend_ids = ctypes.POINTER(ctypes.c_char_p)()
		backend_names = ctypes.POINTER(ctypes.c_char_p)()
		count = ctypes.c_size_t()
		if not core.BNRemoteGetAuthBackends(self._handle, backend_ids, backend_names, count):
			raise RuntimeError(util._last_error())
		result = []
		for i in range(count.value):
			result.append((core.pyNativeStr(backend_ids[i]), core.pyNativeStr(backend_names[i])))
		core.BNFreeStringList(backend_ids, count.value)
		core.BNFreeStringList(backend_names, count.value)
		return result
	@property
	def is_admin(self) -> bool:
		"""
		If the currently connected user is an administrator.
		.. note:: If users have not been pulled, they will attempt to be pulled upon calling this.
		:return: True if the user is an administrator
		"""
		# This is the test by which the api knows it is an admin
		if not self.has_pulled_users:
			self.pull_users()
		return core.BNRemoteIsAdmin(self._handle)
	@property
	def is_enterprise(self) -> bool:
		"""
		If this remote is the same as the Enterprise License server
		:return: True if the same
		"""
		if not self.has_loaded_metadata:
			self.load_metadata()
		return core.BNRemoteIsEnterprise(self._handle)
[docs]
	def request_authentication_token(self, username: str, password: str) -> Optional[str]:
		"""
		Request an authentication token using a username and password.
		:param username: Username to authenticate with
		:param password: Password of user
		:return: Authentication token string, or None if there was an error
		"""
		return core.BNRemoteRequestAuthenticationToken(self._handle, username, password) 
[docs]
	def connect(self, username: Optional[str] = None, token: Optional[str] = None):
		"""
		Connect to a Remote, loading metadata and optionally acquiring a token.
		.. note:: If no username or token are provided, they will be looked up from the keychain, \
		likely saved there by Enterprise authentication.
		:param username: Optional username to connect with
		:param token: Optional token to authenticate with
		:raises RuntimeError: If the connection fails
		"""
		if not self.has_loaded_metadata:
			self.load_metadata()
		got_auth = False
		if username is not None and token is not None:
			got_auth = True
		if not got_auth:
			# Try logging in with defaults
			if self.is_enterprise and enterprise.is_authenticated():
				username = enterprise.username()
				token = enterprise.token()
				if username is not None and token is not None:
					got_auth = True
		if not got_auth:
			# Try to load from default secrets provider
			secrets = binaryninja.SecretsProvider[
				binaryninja.Settings().get_string("enterprise.secretsProvider")]
			if secrets.has_data(self.address):
				creds = json.loads(secrets.get_data(self.address))
				username = creds['username']
				token = creds['token']
				got_auth = True
		if not got_auth:
			# Try logging in with creds in the env
			if os.environ.get('BN_ENTERPRISE_USERNAME') is not None and \
				
os.environ.get('BN_ENTERPRISE_PASSWORD') is not None:
				token = self.request_authentication_token(os.environ['BN_ENTERPRISE_USERNAME'], os.environ['BN_ENTERPRISE_PASSWORD'])
				if token is not None:
					username = os.environ['BN_ENTERPRISE_USERNAME']
					got_auth = True
		if not got_auth or username is None or token is None:
			raise RuntimeError("Cannot connect without a username or token")
		if not core.BNRemoteConnect(self._handle, username, token):
			raise RuntimeError(util._last_error()) 
[docs]
	def disconnect(self):
		"""
		Disconnect from the remote
		:raises RuntimeError: If there was somehow an error
		"""
		if not core.BNRemoteDisconnect(self._handle):
			raise RuntimeError(util._last_error()) 
	@property
	def has_pulled_projects(self) -> bool:
		"""
		If the project has pulled the projects yet
		:return: True if they have been pulled
		"""
		return core.BNRemoteHasPulledProjects(self._handle)
	@property
	def has_pulled_groups(self) -> bool:
		"""
		If the project has pulled the groups yet
		:return: True if they have been pulled
		"""
		return core.BNRemoteHasPulledGroups(self._handle)
	@property
	def has_pulled_users(self) -> bool:
		"""
		If the project has pulled the users yet
		:return: True if they have been pulled
		"""
		return core.BNRemoteHasPulledUsers(self._handle)
	@property
	def projects(self) -> List['project.RemoteProject']:
		"""
		Get the list of projects in this project.
		.. note:: If projects have not been pulled, they will be pulled upon calling this.
		:return: List of Project objects
		:raises: RuntimeError if there was an error pulling projects
		"""
		if not self.has_pulled_projects:
			self.pull_projects()
		count = ctypes.c_size_t()
		value = core.BNRemoteGetProjects(self._handle, count)
		if value is None:
			raise RuntimeError(util._last_error())
		result = []
		for i in range(count.value):
			result.append(project.RemoteProject(value[i]))
		return result
[docs]
	def get_project_by_id(self, id: str) -> Optional['project.RemoteProject']:
		"""
		Get a specific project in the Remote by its id
		.. note:: If projects have not been pulled, they will be pulled upon calling this.
		:param id: Id of Project
		:return: Project object, if one with that id exists. Else, None
		:raises: RuntimeError if there was an error pulling projects
		"""
		if not self.has_pulled_projects:
			self.pull_projects()
		value = core.BNRemoteGetProjectById(self._handle, id)
		if value is None:
			return None
		return project.RemoteProject(value) 
[docs]
	def get_project_by_name(self, name: str) -> Optional['project.RemoteProject']:
		"""
		Get a specific project in the Remote by its name
		.. note:: If projects have not been pulled, they will be pulled upon calling this.
		:param name: Name of Project
		:return: Project object, if one with that name exists. Else, None
		:raises: RuntimeError if there was an error pulling projects
		"""
		if not self.has_pulled_projects:
			self.pull_projects()
		value = core.BNRemoteGetProjectByName(self._handle, name)
		if value is None:
			return None
		return project.RemoteProject(value) 
[docs]
	def pull_projects(self, progress: 'util.ProgressFuncType' = util.nop):
		"""
		Pull the list of projects from the Remote.
		:param progress: Function to call for progress updates
		:raises: RuntimeError if there was an error pulling projects
		"""
		if not core.BNRemotePullProjects(self._handle, util.wrap_progress(progress), None):
			raise RuntimeError(util._last_error()) 
[docs]
	def create_project(self, name: str, description: str) -> 'project.RemoteProject':
		"""
		Create a new project on the remote (and pull it)
		:param name: Project name
		:param description: Project description
		:return: Reference to the created project
		:raises: RuntimeError if there was an error
		"""
		value = core.BNRemoteCreateProject(self._handle, name, description)
		if value is None:
			raise RuntimeError(util._last_error())
		return project.RemoteProject(value) 
[docs]
	def push_project(self, project: 'project.RemoteProject', extra_fields: Optional[Dict[str, str]] = None):
		"""
		Push an updated Project object to the Remote
		:param project: Project object which has been updated
		:param extra_fields: Extra HTTP fields to send with the update
		:raises: RuntimeError if there was an error
		"""
		if extra_fields is None:
			extra_fields = {}
		extra_field_keys = (ctypes.c_char_p * len(extra_fields))()
		extra_field_values = (ctypes.c_char_p * len(extra_fields))()
		for (i, (key, value)) in enumerate(extra_fields.items()):
			extra_field_keys[i] = core.cstr(key)
			extra_field_values[i] = core.cstr(value)
		if not core.BNRemotePushProject(self._handle, project._handle, extra_field_keys, extra_field_values, len(extra_fields)):
			raise RuntimeError(util._last_error()) 
[docs]
	def delete_project(self, project: 'project.RemoteProject'):
		"""
		Delete a project from the remote
		:param project: Project to delete
		:raises: RuntimeError if there was an error
		"""
		if not core.BNRemoteDeleteProject(self._handle, project._handle):
			raise RuntimeError(util._last_error()) 
	@property
	def groups(self) -> List['group.Group']:
		"""
		Get the list of groups in this project.
		.. note:: If groups have not been pulled, they will be pulled upon calling this.
		.. note:: This function is only available to accounts with admin status on the Remote
		:return: List of Group objects
		:raises: RuntimeError if there was an error pulling groups
		"""
		if not self.has_pulled_groups:
			self.pull_groups()
		count = ctypes.c_size_t()
		value = core.BNRemoteGetGroups(self._handle, count)
		if value is None:
			raise RuntimeError(util._last_error())
		result = []
		for i in range(count.value):
			result.append(group.Group(value[i]))
		return result
[docs]
	def get_group_by_id(self, id: int) -> Optional['group.Group']:
		"""
		Get a specific group in the Remote by its id
		.. note:: If groups have not been pulled, they will be pulled upon calling this.
		.. note:: This function is only available to accounts with admin status on the Remote
		:param id: Id of Group
		:return: Group object, if one with that id exists. Else, None
		:raises: RuntimeError if there was an error pulling groups
		"""
		if not self.has_pulled_groups:
			self.pull_groups()
		value = core.BNRemoteGetGroupById(self._handle, id)
		if value is None:
			return None
		return group.Group(value) 
[docs]
	def get_group_by_name(self, name: str) -> Optional['group.Group']:
		"""
		Get a specific group in the Remote by its name
		.. note:: If groups have not been pulled, they will be pulled upon calling this.
		.. note:: This function is only available to accounts with admin status on the Remote
		:param name: Name of Group
		:return: Group object, if one with that name exists. Else, None
		:raises: RuntimeError if there was an error pulling groups
		"""
		if not self.has_pulled_groups:
			self.pull_groups()
		value = core.BNRemoteGetGroupByName(self._handle, name)
		if value is None:
			return None
		return group.Group(value) 
[docs]
	def search_groups(self, prefix: str) -> List[Tuple[int, str]]:
		"""
		Search for groups in the Remote with a given prefix
		:param prefix: Prefix of name for groups
		:return: List of group id <=> group name pairs
		:raises: RuntimeError if there was an error
		"""
		count = ctypes.c_size_t()
		group_ids = ctypes.POINTER(ctypes.c_uint64)()
		group_names = ctypes.POINTER(ctypes.c_char_p)()
		if not core.BNRemoteSearchGroups(self._handle, prefix, group_ids, group_names, count):
			raise RuntimeError(util._last_error())
		result = []
		for i in range(count.value):
			result.append((group_ids[i], core.pyNativeStr(group_names[i])))
		core.BNCollaborationFreeIdList(group_ids, count.value)
		core.BNFreeStringList(group_names, count.value)
		return result 
[docs]
	def pull_groups(self, progress: 'util.ProgressFuncType' = util.nop):
		"""
		Pull the list of groups from the Remote.
		.. note:: This function is only available to accounts with admin status on the Remote
		:param progress: Function to call for progress updates
		:raises: RuntimeError if there was an error pulling groups
		"""
		if not core.BNRemotePullGroups(self._handle, util.wrap_progress(progress), None):
			raise RuntimeError(util._last_error()) 
[docs]
	def create_group(self, name: str, usernames: List[str]) -> 'group.Group':
		"""
		Create a new group on the remote (and pull it)
		.. note:: This function is only available to accounts with admin status on the Remote
		:param name: Group name
		:param usernames: List of usernames of users in the group
		:return: Reference to the created group
		:raises: RuntimeError if there was an error
		"""
		c_usernames = (ctypes.c_char_p * len(usernames))()
		for (i, username) in enumerate(usernames):
			c_usernames[i] = core.cstr(username)
		value = core.BNRemoteCreateGroup(self._handle, name, c_usernames, len(usernames))
		if value is None:
			raise RuntimeError(util._last_error())
		return group.Group(value) 
[docs]
	def push_group(self, group: 'group.Group', extra_fields: Optional[Dict[str, str]] = None):
		"""
		Push an updated Group object to the Remote
		.. note:: This function is only available to accounts with admin status on the Remote
		:param group: Group object which has been updated
		:param extra_fields: Extra HTTP fields to send with the update
		:raises: RuntimeError if there was an error
		"""
		if extra_fields is None:
			extra_fields = {}
		extra_field_keys = (ctypes.c_char_p * len(extra_fields))()
		extra_field_values = (ctypes.c_char_p * len(extra_fields))()
		for (i, (key, value)) in enumerate(extra_fields.items()):
			extra_field_keys[i] = core.cstr(key)
			extra_field_values[i] = core.cstr(value)
		if not core.BNRemotePushGroup(self._handle, group._handle, extra_field_keys, extra_field_values, len(extra_fields)):
			raise RuntimeError(util._last_error()) 
[docs]
	def delete_group(self, group: 'group.Group'):
		"""
		Delete a group from the remote
		.. note:: This function is only available to accounts with admin status on the Remote
		:param group: Group to delete
		:raises: RuntimeError if there was an error
		"""
		if not core.BNRemoteDeleteGroup(self._handle, group._handle):
			raise RuntimeError(util._last_error()) 
	@property
	def users(self) -> List['user.User']:
		"""
		Get the list of users in this project.
		.. note:: If users have not been pulled, they will be pulled upon calling this.
		.. note:: This function is only available to accounts with admin status on the Remote
		:return: List of User objects
		:raises: RuntimeError if there was an error pulling users
		"""
		if not self.has_pulled_users:
			self.pull_users()
		count = ctypes.c_size_t()
		value = core.BNRemoteGetUsers(self._handle, count)
		if value is None:
			raise RuntimeError(util._last_error())
		result = []
		for i in range(count.value):
			result.append(user.User(value[i]))
		return result
[docs]
	def get_user_by_id(self, id: str) -> Optional['user.User']:
		"""
		Get a specific user in the Remote by their id
		.. note:: If users have not been pulled, they will be pulled upon calling this.
		.. note:: This function is only available to accounts with admin status on the Remote
		:param id: Id of User
		:return: User object, if one with that id exists. Else, None
		:raises: RuntimeError if there was an error pulling users
		"""
		if not self.has_pulled_users:
			self.pull_users()
		value = core.BNRemoteGetUserById(self._handle, id)
		if value is None:
			return None
		return user.User(value) 
[docs]
	def get_user_by_username(self, username: str) -> Optional['user.User']:
		"""
		Get a specific user in the Remote by their username
		.. note:: If users have not been pulled, they will be pulled upon calling this.
		.. note:: This function is only available to accounts with admin status on the Remote
		:param username: Username of User
		:return: User object, if one with that name exists. Else, None
		:raises: RuntimeError if there was an error pulling users
		"""
		if not self.has_pulled_users:
			self.pull_users()
		value = core.BNRemoteGetUserByUsername(self._handle, username)
		if value is None:
			return None
		return user.User(value) 
	@property
	def current_user(self) -> Optional['user.User']:
		"""
		Get the user object for the currently connected user (only if you are an admin)
		.. note:: If users have not been pulled, they will be pulled upon calling this.
		.. note:: This function is only available to accounts with admin status on the Remote
		:return: User object
		:raises: RuntimeError if there was an error pulling users
		"""
		if not self.has_pulled_users:
			self.pull_users()
		value = core.BNRemoteGetCurrentUser(self._handle)
		if value is None:
			return None
		return user.User(value)
[docs]
	def search_users(self, prefix: str) -> List[Tuple[str, str]]:
		"""
		Search for users in the Remote with a given prefix
		:param prefix: Prefix of name for users
		:return: List of user id <=> user name pairs
		:raises: RuntimeError if there was an error
		"""
		count = ctypes.c_size_t()
		user_ids = ctypes.POINTER(ctypes.c_char_p)()
		usernames = ctypes.POINTER(ctypes.c_char_p)()
		if not core.BNRemoteSearchUsers(self._handle, prefix, user_ids, usernames, count):
			raise RuntimeError(util._last_error())
		result = []
		for i in range(count.value):
			result.append((core.pyNativeStr(user_ids[i]), core.pyNativeStr(usernames[i])))
		core.BNFreeStringList(user_ids, count.value)
		core.BNFreeStringList(usernames, count.value)
		return result 
[docs]
	def pull_users(self, progress: 'util.ProgressFuncType' = util.nop):
		"""
		Pull the list of users from the Remote.
		.. note:: This function is only available to accounts with admin status on the Remote. \
		Non-admin accounts attempting to call this function will pull an empty list of users.
		:param progress: Function to call for progress updates
		:raises: RuntimeError if there was an error pulling users
		"""
		if not core.BNRemotePullUsers(self._handle, util.wrap_progress(progress), None):
			raise RuntimeError(util._last_error()) 
[docs]
	def create_user(self, username: str, email: str, is_active: bool, password: str, group_ids: List[int], user_permission_ids: List[int]) -> 'user.User':
		"""
		Create a new user on the remote (and pull it)
		.. note:: This function is only available to accounts with admin status on the Remote
		:param username: User username
		:param email: User email
		:param is_active: If the user is enabled
		:param password: User password
		:param group_ids: List of group ids for the user
		:param user_permission_ids: List of permission ids for the user
		:return: Reference to the created user
		:raises: RuntimeError if there was an error
		"""
		group_ids_array = (ctypes.c_uint64 * len(group_ids))()
		for i in range(len(group_ids)):
			group_ids_array[i] = group_ids[i]
		user_permission_ids_array = (ctypes.c_uint64 * len(group_ids))()
		for i in range(len(user_permission_ids)):
			user_permission_ids_array[i] = user_permission_ids[i]
		value = core.BNRemoteCreateUser(self._handle, username, email, is_active, password, group_ids_array, len(group_ids), user_permission_ids_array, len(user_permission_ids))
		if value is None:
			raise RuntimeError(util._last_error())
		return user.User(value) 
[docs]
	def push_user(self, user: 'user.User', extra_fields: Optional[Dict[str, str]] = None):
		"""
		Push an updated User object to the Remote
		.. note:: This function is only available to accounts with admin status on the Remote
		:param group: User object which has been updated
		:param extra_fields: Extra HTTP fields to send with the update
		:raises: RuntimeError if there was an error
		"""
		if extra_fields is None:
			extra_fields = {}
		extra_field_keys = (ctypes.c_char_p * len(extra_fields))()
		extra_field_values = (ctypes.c_char_p * len(extra_fields))()
		for (i, (key, value)) in enumerate(extra_fields.items()):
			extra_field_keys[i] = core.cstr(key)
			extra_field_values[i] = core.cstr(value)
		if not core.BNRemotePushUser(self._handle, user._handle, extra_field_keys, extra_field_values, len(extra_fields)):
			raise RuntimeError(util._last_error())