Source code for strongdoc.api.account

#
# All Rights Reserved 2020
#

from strongdoc import client
from strongdoc import constants
from strongdoc.proto import strongdoc_pb2_grpc, accounts_pb2

from datetime import timezone

[docs]class AccountInfo: """ Attributes: orgid: :class:`str` org_email: :class:`str` subscription: :class:`Subscription` payments: list(:class:`Payment`) org_address: :class:`str` multilevel_sharing: :class:`bool` sharable_orgs: list(:class:`str`) """ def __init__(self, proto_account_info): self.orgid = proto_account_info.orgID self.org_email = proto_account_info.orgEmail self.subscription = Subscription(proto_account_info.subscription) self.payments = [Payment(payment) for payment in proto_account_info.payments] self.org_address = proto_account_info.orgAddress self.multilevel_sharing = proto_account_info.multiLevelShare self.sharable_orgs = [sharable_org for sharable_org in proto_account_info.sharableOrgs] def __repr__(self): result = "\n".join(["{}: {}".format(key, str(value).replace('\n', '\n{}'.format(' '*(2+len(key))))) for key, value in self.__dict__.items()]) return result def __str__(self): return self.__repr__()
[docs]class UserInfo: """ Attributes: userid: :class:`str` name: :class:`str` email: optional :class:`str`, otherwise :class:`None` orgid: optional :class:`str`, otherwise :class:`None` is_admin: :class:`bool` """ def __init__(self, proto_user): self.userid = proto_user.userID self.name = proto_user.userName try: self.email = proto_user.email except: self.email = None try: self.orgid = proto_user.orgID except: self.orgid = None self.is_admin = proto_user.isAdmin def __repr__(self): result = "\n".join(["{}: {}".format(key, str(value).replace('\n', '\n{}'.format(' '*(2+len(key))))) for key, value in self.__dict__.items()]) return result def __str__(self): return self.__repr__()
[docs]class Subscription: """ Attributes: type: :class:`str` status: :class:`str` """ def __init__(self, proto_subscription): self.type = proto_subscription.type self.status = proto_subscription.status def __repr__(self): result = "\n".join(["{}: {}".format(key, str(value).replace('\n', '\n{}'.format(' '*(2+len(key))))) for key, value in self.__dict__.items()]) return result def __str__(self): return self.__repr__()
[docs]class Payment: """ Attributes: billed_at: :class:`datetime.datetime` period_start: :class:`datetime.datetime` period_end: :class:`datetime.datetime` amount: :class:`float` status: :class:`str` """ def __init__(self, proto_payment): self.billed_at = proto_payment.billedAt.ToDatetime().replace(tzinfo=timezone.utc) self.period_start = proto_payment.periodStart.ToDatetime().replace(tzinfo=timezone.utc) self.period_end = proto_payment.periodEnd.ToDatetime().replace(tzinfo=timezone.utc) self.amount = proto_payment.amount self.status = proto_payment.status def __repr__(self): result = "\n".join(["{}: {}".format(key, str(value).replace('\n', '\n{}'.format(' '*(2+len(key))))) for key, value in self.__dict__.items()]) return result def __str__(self): return self.__repr__()
# _register_organization creates an organization def _register_organization(org_name, org_email, org_addr, admin_name, admin_password, admin_email, source, source_data): """ Registers a new organization. A new administrator user will also be created. New users can be added using this administrator account. A gRPC connection timeout will be implemented. :param org_name: The organization name to create. :type org_name: str :param org_email: The organization email address. :type org_email: str :param org_addr: The organization address. :type org_addr: str :param admin_name: The organization administrator name. :type admin_name: str :param admin_password: The organization administrator password. :type admin_password: str :param admin_email: The organization administrator email. :type admin_email: str :param source: The subscription source, such as "AWS Marketplace". :type source: str :param source_data: The registration data for the supplied source. :type source: str :raises grpc.RpcError: Raised by the gRPC library to indicate non-OK-status RPC termination. Returns ------- orgID : :class:`str` The newly created organization ID. userID : :class:`str` The newly created user ID. """ with client.connect_to_server_with_no_auth() as no_auth_conn: client_stub = strongdoc_pb2_grpc.StrongDocServiceStub(no_auth_conn) request = accounts_pb2.RegisterOrganizationReq(orgName=org_name, orgEmail=org_email, orgAddr=org_addr, userName=admin_name, password=admin_password, adminEmail=admin_email, source=source, sourceData=source_data) response = client_stub.RegisterOrganization(request, timeout=constants.GRPC_TIMEOUT) return response.orgID, response.userID # remove_organization removes an organization # :returns bool: Whether the removal was a success
[docs]def remove_organization(token, force): """ Removes an organization, deleting all data stored with the organization. This requires an administrator privilege. A gRPC connection timeout will be implemented. :param token: The user JWT token. :type token: str :param force: If this is false, removal will fail if there are still data stored with the organization. This prevents accidental deletion. :type force: bool :raises grpc.RpcError: Raised by the gRPC library to indicate non-OK-status RPC termination. :returns: Whether the removal was a success. :rtype: bool """ with client.connect_to_server_with_auth(token) as auth_conn: client_stub = strongdoc_pb2_grpc.StrongDocServiceStub(auth_conn) request = accounts_pb2.RemoveOrganizationReq(force=force) response = client_stub.RemoveOrganization(request, timeout=constants.GRPC_TIMEOUT) return response.success
# register_user registers a user for the active organization
[docs]def register_user(token, name, password, email, make_admin): """ Registers a user for the active organization. This requires an administrator privilege. A gRPC connection timeout will be implemented. :param token: The user JWT token. :type token: str :param name: The new user's name. :type name: str :param password: The new user's password. :type password: str :param email: The new user's email address. :type email: str :param make_admin: Whether or not the new user will be an admin. :type make_admin: bool :raises grpc.RpcError: Raised by the gRPC library to indicate non-OK-status RPC termination. :returns: The userID of the new user. :rtype: str """ with client.connect_to_server_with_auth(token) as auth_conn: client_stub = strongdoc_pb2_grpc.StrongDocServiceStub(auth_conn) request = accounts_pb2.RegisterUserReq(userName=name, password=password, email=email, admin=make_admin) response = client_stub.RegisterUser(request, timeout=constants.GRPC_TIMEOUT) return response.userID
# list_users lists the active organization's users
[docs]def list_users(token): """ Lists the users of the active organization. :param token: The user JWT token. :type token: str :raises grpc.RpcError: Raised by the gRPC library to indicate non-OK-status RPC termination. :returns: A list of all the users in the organization. :rtype: list(UserInfo) """ with client.connect_to_server_with_auth(token) as auth_conn: client_stub = strongdoc_pb2_grpc.StrongDocServiceStub(auth_conn) request = accounts_pb2.ListUsersReq() response = client_stub.ListUsers(request, timeout=constants.GRPC_TIMEOUT) return [UserInfo(user) for user in response.orgUsers]
# remove_user removes a user from the active organization
[docs]def remove_user(token, userid): """ Removes a user from the active organization. Deletes the user if this is the user's only organization. This requires an administrator privilege. A gRPC connection timeout will be implemented. :param token: The user JWT token. :type token: str :param userid: The userid of the user to remove. :type userid: str :raises grpc.RpcError: Raised by the gRPC library to indicate non-OK-status RPC termination. :returns: The number of users removed. :rtype: int """ with client.connect_to_server_with_auth(token) as auth_conn: client_stub = strongdoc_pb2_grpc.StrongDocServiceStub(auth_conn) request = accounts_pb2.RemoveUserReq(userID=userid) response = client_stub.RemoveUser(request, timeout=constants.GRPC_TIMEOUT) return response.count
# promote_user makes a user an admin of the active organization
[docs]def promote_user(token, userid): """ Promotes a user to admin status. This requires an administrator privilege. A gRPC connection timeout will be implemented. :param token: The user JWT token. :type token: str :param userid: The userid of the user to promote. :type userid: str :raises grpc.RpcError: Raised by the gRPC library to indicate non-OK-status RPC termination. :returns: Whether the promotion was a success. :rtype: bool """ with client.connect_to_server_with_auth(token) as auth_conn: client_stub = strongdoc_pb2_grpc.StrongDocServiceStub(auth_conn) request = accounts_pb2.PromoteUserReq(userID=userid) response = client_stub.PromoteUser(request, timeout=constants.GRPC_TIMEOUT) return response.success
# demote_user removes admin privileges from a user
[docs]def demote_user(token, userid): """ Removes admin privileges from a user. This requires an administrator privilege. A gRPC connection timeout will be implemented. :param token: The user JWT token. :type token: str :param userid: The userid of the user to demote. :type userid: str :raises grpc.RpcError: Raised by the gRPC library to indicate non-OK-status RPC termination. :returns: Whether the demotion was a success. :rtype: bool """ with client.connect_to_server_with_auth(token) as auth_conn: client_stub = strongdoc_pb2_grpc.StrongDocServiceStub(auth_conn) request = accounts_pb2.DemoteUserReq(userID=userid) response = client_stub.DemoteUser(request, timeout=constants.GRPC_TIMEOUT) return response.success
[docs]def add_sharable_org(token, orgid): """ Adds another org as a sharable org. This requires an administrator privilege. :param token: The user JWT token. :type token: str :param orgid: The orgid of the org to add as a sharable org. :type orgid: str :raises grpc.RpcError: Raised by the gRPC library to indicate non-OK-status RPC termination. :returns: Whether the addition was a success. :rtype: bool """ with client.connect_to_server_with_auth(token) as auth_conn: client_stub = strongdoc_pb2_grpc.StrongDocServiceStub(auth_conn) request = accounts_pb2.AddSharableOrgReq(newOrgID=orgid) response = client_stub.AddSharableOrg(request, timeout=constants.GRPC_TIMEOUT) return response.success
[docs]def remove_sharable_org(token, orgid): """ Removes a sharable org. This requires an administrator privilege. :param token: The user JWT token. :type token: str :param orgid: The orgid of the org to remove as a sharable org. :type orgid: str :raises grpc.RpcError: Raised by the gRPC library to indicate non-OK-status RPC termination. :returns: Whether the removal was a success. :rtype: bool """ with client.connect_to_server_with_auth(token) as auth_conn: client_stub = strongdoc_pb2_grpc.StrongDocServiceStub(auth_conn) request = accounts_pb2.RemoveSharableOrgReq(removeOrgID=orgid) response = client_stub.RemoveSharableOrg(request, timeout=constants.GRPC_TIMEOUT) return response.success
[docs]def set_multilevel_sharing(token, enable): """ Enables or disables multi-level sharing. This requires an administrator privilege. :param token: The user JWT token. :type token: str :param enable: Whether or not multi-level sharing should be enabled. :type enable: bool :raises grpc.RpcError: Raised by the gRPC library to indicate non-OK-status RPC termination. :returns: Whether the change was a success. :rtype: bool """ with client.connect_to_server_with_auth(token) as auth_conn: client_stub = strongdoc_pb2_grpc.StrongDocServiceStub(auth_conn) request = accounts_pb2.SetMultiLevelSharingReq(enable=enable) response = client_stub.SetMultiLevelSharing(request, timeout=constants.GRPC_TIMEOUT) return response.success
[docs]def get_account_info(token): """ Gets account information such as subscription details, payments, sharable orgs, etc. This requires an administrator privilege. :param token: The user JWT token. :type token: str :raises grpc.RpcError: Raised by the gRPC library to indicate non-OK-status RPC termination. :returns: The account information :rtype: AccountInfo """ with client.connect_to_server_with_auth(token) as auth_conn: client_stub = strongdoc_pb2_grpc.StrongDocServiceStub(auth_conn) request = accounts_pb2.GetAccountInfoReq() response = client_stub.GetAccountInfo(request, timeout=constants.GRPC_TIMEOUT) return AccountInfo(response)
[docs]def set_account_info(token, org_email, org_address): """ Set organization account info. This requires an administrator privilege. :param token: The user JWT token. :type token: str :param email: The organization email address. Must be a valid email address. :type email: str :param address: The organization address. :type address: str :raises grpc.RpcError: Raised by the gRPC library to indicate non-OK-status RPC termination. :returns: Whether the change was a success. :rtype: bool """ with client.connect_to_server_with_auth(token) as auth_conn: client_stub = strongdoc_pb2_grpc.StrongDocServiceStub(auth_conn) request = accounts_pb2.SetAccountInfoReq(orgEmail=org_email, orgAddress=org_address) response = client_stub.SetAccountInfo(request, timeout=constants.GRPC_TIMEOUT) return response.success
[docs]def get_user_info(token): """ Gets information about the active user. :param token: The user JWT token. :type token: str :raises grpc.RpcError: Raised by the gRPC library to indicate non-OK-status RPC termination. :returns: The user information :rtype: UserInfo """ with client.connect_to_server_with_auth(token) as auth_conn: client_stub = strongdoc_pb2_grpc.StrongDocServiceStub(auth_conn) request = accounts_pb2.GetUserInfoReq() response = client_stub.GetUserInfo(request, timeout=constants.GRPC_TIMEOUT) return UserInfo(response)
[docs]def set_user_info(token, name, email): """ Set user info. :param token: The user JWT token. :type token: str :param name: The user's name. Cannot be an empty string. :type name: str :param email: The user's email address. Must be a valid email address. :type email: str :raises grpc.RpcError: Raised by the gRPC library to indicate non-OK-status RPC termination. :returns: Whether the change was a success. :rtype: bool """ with client.connect_to_server_with_auth(token) as auth_conn: client_stub = strongdoc_pb2_grpc.StrongDocServiceStub(auth_conn) request = accounts_pb2.SetUserInfoReq(name=name, email=email) response = client_stub.SetUserInfo(request, timeout=constants.GRPC_TIMEOUT) return response.success
[docs]def change_user_password(token, old_password, new_password): """ Change user password. :param token: The user JWT token. :type token: str :param old_password: The user's current password. :type old_password: str :param new_password: The user's new password. :type new_password: str :raises grpc.RpcError: Raised by the gRPC library to indicate non-OK-status RPC termination. :returns: Whether the change was a success. :rtype: bool """ with client.connect_to_server_with_auth(token) as auth_conn: client_stub = strongdoc_pb2_grpc.StrongDocServiceStub(auth_conn) request = accounts_pb2.ChangeUserPasswordReq(oldPassword=old_password, newPassword=new_password) response = client_stub.ChangeUserPassword(request, timeout=constants.GRPC_TIMEOUT) return response.success