Source code for pyarcher.archer

# -*- coding: utf-8 -*-
"""Main module."""
import json
import logging

import requests

from pyarcher.application import Application
from pyarcher.base import ArcherBase
from pyarcher.group import Group
from pyarcher.user import User

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)-8s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')


[docs]class Archer(ArcherBase): """Creates archer instance object using following arguments A username and password or a cert and key must be passed to initiate an archer instance. Args: url (str): Full path url to archer instance. example: https://archer.com/rsarcher instance_name (str): Archer instance name. user_domain (optional, str) username (optional, str): Username to login with. password (optional, str): Password to login with. client_cert (optional, tuple(str, str)): Tuple of cert and key file path. Attributes: """ _group_membership: dict = None _version: str = None
[docs] def refresh_metadata(self): """Refresh Metadata.""" pass
@property def _dirty_objects(self): """Keeps track of a list of dirty objects."""
[docs] def get_user(self, obj_id: int): """Get a user. Get a archer user object Args: obj_id (int): User ID Returns: pyarcher.user.User """ return self.resource("user", obj_id=obj_id)
[docs] def query_users(self, params: dict = {}, raw=False): """Query for Users. Args: params (dict): Send a dictionary of ODATA Params without the "$". Example: params['filter'] = "AccountStatus eq '1'" Returns: requests.models.Response: The response of the http call from requests. """ if "Id" not in params.get("select", "Id,DisplayName,FirstName,LastName"): params['select'] = f"Id,{params['select']}" params = {f"${key}": value for key, value in params.items()} resp = self.request_helper("core/system/user/", method="get", params=params) resp_data = resp.json() if raw: return resp return [ self.get_user(user['RequestedObject']['Id']) for user in resp_data ]
[docs] def create_user(self, username: str, first_name: str, last_name: str, password: str, account_status: int = 1): """Create User. Create an archer user. Args: username (str): Desired username first_name (str): User's first name last_name (str): User's last name password (str): User's password account_status (int): Pass the integer value for active, inactive, locked Returns: resp (requests.models.Response) """ data = { "User": { "FirstName": first_name, "LastName": last_name, "Username": username, }, "Password": password } resp = self.request_helper("core/system/user", method="post", data=data) return resp
[docs] def update_user(self, obj_id): """Update User."""
# TODO: Create method
[docs] def delete_user(self, obj_id): """Delete User."""
# TODO: Create method
[docs] def get_group(self, obj_id): """Get a group. Get a archer group object Args: obj_id (int): Group ID Returns: pyarcher.group.Group """ return self.resource("group", obj_id=obj_id)
[docs] def get_group_hierarchy(self): """Get all group hierarchy.""" resp = self.request_helper("core/system/grouphierarchy", method="get") resp_data = resp.json() hierarchy_groups = {} for data in resp_data: _id = data['RequestedObject']['Id'] related_id = data['RequestedObject']['RelatedId'] generation = data['RequestedObject']['Generation'] if hierarchy_groups.get(_id): hold = hierarchy_groups[_id] hold.append({ "group": self.get_group(related_id), "generation": generation }) hierarchy_groups[_id] = hold else: hierarchy_groups[_id] = [{ "group": self.get_group(related_id), "generation": generation }] return hierarchy_groups
[docs] def query_groups(self, params: dict = {}, raw=False): """Query for Groups. Args: params (dict): Send a dictionary of ODATA Params without the "$". Example: params['filter'] = "Name eq 'group_name'" Returns: requests.models.Response: The response of the http call from requests. """ if "Id" not in params.get("select", "Id,DisplayName,FirstName,LastName"): params['select'] = f"Id,{params['select']}" params = {f"${key}": value for key, value in params.items()} resp = self.request_helper("core/system/group/", method="get", params=params) if raw: return resp resp_data = resp.json() return [ self.get_group(user['RequestedObject']['Id']) for user in resp_data ]
[docs] def get_all_groups(self) -> dict: """Get all archer groups. Returns: groups (List[pyarcher.group.Group]) """ resp = self.request_helper("core/system/group/", method="get") resp_data = resp.json() groups = [] for group in resp_data: new_group = self.get_group(group["RequestedObject"]["Id"]) new_group.metadata = group["RequestedObject"] groups.append(new_group) return groups
[docs] def update_group(self, group_id: int, name: str, parent_groups: list = None, child_groups: list = None, child_users: list = None): """Update Group. Update this groups child groups, members, name, description, or parent groups. This function is delcarative, meaning whatever you specify will be the truth. WARNING: This will not update other objects that are affected by this. Args: group_id (int): Group ID to modify name (str): Group Name parent_group (list[int]): List of group ids to be set as a parent. child_group (list[int]): List of group ids to be set as a child. child_users (list[int]): List of user ids to be set as a member. Returns: resp (requests.models.Response) """ api_url = f"core/system/group" data = { "Group": { "Id": group_id, "Name": name }, "ParentGroups": parent_groups, "ChildGroups": child_groups, "ChildUsers": child_users } resp = self.request_helper(api_url, method="put", data=data) return resp
[docs] def delete_group(self, group_id: int): """Delete Group. Args: group_id (int): Group ID Returns: resp (requests.models.Response) """ api_url = f"core/system/group/{group_id}" resp = self.request_helper(api_url, method="delete", platform_api=True) return resp
[docs] def create_group(self, name: str, description: str = None, parent_groups: list = None, child_groups: list = None, child_users: list = None): """Create a group. Args: name (str): Name of the group. description (str, optional): Description of the group. parent_groups (list[int], optional): List of group ids to be set as a parent. child_Groups (list[int], optional): List of group ids to be set as a child. child_users (list[int], optional): List of user ids to be set as a member. Returns: If successful: pyarcher.group.Group else: resp (requests.models.Response) """ api_url = f"core/system/group" data = { "Group": { "Name": name, "Description": description }, "ParentGroups": parent_groups, "ChildGroups": child_groups, "ChildUsers": child_users } resp = self.request_helper(api_url, data=data, method="post", platform_api=True) resp_data = resp.json() if resp_data['IsSuccessful']: return self.get_group(resp_data['RequestedObject']['Id']) return resp
[docs] def modify_group_role(self, group_id: int, role_id: int, is_add: bool): """Modify Group Role. Args: group_id (int): Group id to target role_id (int): Role id to add or remove from group is_add (bool): Add or remove role from group Returns: If successful: pyarcher.group.Group else: resp (requests.models.Response) """ api_url = f"core/system/rolegroup" data = {"GroupId": group_id, "RoleId": role_id, "IsAdd": is_add} resp = self.request_helper(api_url, method="put", data=data, platform_api=True) if resp.json()['IsSuccessful']: return self.get_group(group_id) return resp
[docs] def modify_child_group(self, group_id: int, group_member_id: int, is_add: bool): """Modify child group Args: group_id (int): Group id to target group_member_id (int): Group id to add or remove from group member is_add (bool): Add or remove role from group Returns: If successful: pyarcher.group.Group else: resp (requests.models.Response) """ api_url = "core/system/groupmember" data = { "GroupId": group_id, "GroupMemberId": group_member_id, "IsAdd": is_add } resp = self.request_helper(api_url, method="put", data=data, platform_api=True) if resp.json()['IsSuccessful']: return self.get_group(group_id) return resp
[docs] def get_groups_by_user(self, user_id: int): """Get all groups that are assigned to a user. Args: user_id (int): User id to check groups Returns: List[pyarcher.group.Groups] """ api_url = f"core/system/group/user/{user_id}" resp = self.request_helper(api_url, method="get", platform_api=True) resp_data = resp.json() groups = [] for data in resp_data: if data['IsSuccessful']: request_data = data['RequestedObject'] group = self.get_group(request_data['Id']) group.metadata = request_data groups.append(group) return groups
[docs] def group_membership_setter(self, group_id: int): """Group membership data form specific group. Args: group_id (int): Group ID Returns: hold (dict) """ hold = {} for data in self.group_membership: if data['GroupId'] == group_id: hold.update({"members": []}) if data['UserIds']: hold.update({ "members": [self.get_user(user) for user in data['UserIds']] }) hold.update({"parent_groups": []}) if data['ParentGroupIds']: hold.update({ "parent_groups": [ self.get_group(group) for group in data['ParentGroupIds'] ] }) return hold
@property def group_membership(self): """Group Membership property method. Returns: List[Dict] """ if not self._group_membership: api_url = f"core/system/groupmembership" resp_data = self.request_helper(api_url, method="get").json() self._group_membership = [ data['RequestedObject'] for data in resp_data ] return self._group_membership
[docs] def get_active_users_with_no_login(self): """Prebuilt function for query_user.""" params = { "select": "Id,UserName,DisplayName", "filter": "AccountStatus eq '1'and LastLoginDate eq null", "orderby": "LastName" } return self.query_users(params)
[docs] def get_application(self, obj_id: int): """Get an application. Get an archer application object Args: obj_id (int): Application ID Returns: pyarcher.application.Application """ return self.resource("application", obj_id=obj_id)
[docs] def get_all_application(self): resp = self.request_helper("core/system/application/", method="get") resp_data = resp.json() return [ self.get_application(app['RequestedObject']['Id']) for app in resp_data ]
[docs] def raw_version(self): """Return dictionary of the version.""" api_url = f"core/system/applicationinfo/version" resp_data = self.request_helper(api_url, method="get") return resp_data
@property def version(self): """Property method for version""" if not self._version: _version = self.raw_version().json()['RequestedObject']['Version'] self._version = _version return self._version
# TODO: Remove all old code below # def create_content_record(self, fields_json, record_id=None): # """ # :param fields_json: {field name how you see it in the app: value content # (for text it text, for others it's internal unique ids)} # :param record_id: # :returns int - record_id # """ # api_url = f"{self.api_url_base}core/content/" # post_header = dict(self.header) # transformed_json = {} # for key in fields_json.keys(): # current_key_id = self.get_field_id_by_name(key) # transformed_json[current_key_id] = self.add_value_to_field(current_key_id, fields_json[key]) # if record_id: # post_header["X-Http-Method-Override"] = "PUT" # body = json.dumps({"Content": {"Id": record_id, "LevelId": self.application_level_id, # "FieldContents": transformed_json}}) # else: # post_header["X-Http-Method-Override"] = "POST" # body = json.dumps({"Content": {"LevelId": self.application_level_id, "FieldContents": transformed_json}}) # try: # if record_id: # response = requests.put(api_url, headers=post_header, data=body, verify=False) # data = json.loads(response.content.decode("utf-8")) # log.info("Record updated, %s", data["RequestedObject"]["Id"]) # else: # response = requests.post(api_url, headers=post_header, data=body, verify=False) # data = json.loads(response.content.decode("utf-8")) # log.info("Function create_content_record created record, %s", data["RequestedObject"]["Id"]) # return data["RequestedObject"]["Id"] # except Exception as e: # log.info("Function create_content_record didn't work, %s", e) # def create_sub_record(self, fields_json, subform_name): # """LevelID is an application # :param fields_json: {field name how you see it in the app: value content # (for text it text, for others it's internal unique ids)} # :param subform_name: how you see it in the app # :returns sub_record_id # """ # api_url = f"{self.api_url_base}core/content/" # post_header = dict(self.header) # post_header["X-Http-Method-Override"] = "POST" # subform_field_id = self.get_field_id_by_name(subform_name) # transformed_json = {} # for key in fields_json.keys(): # current_id = self.subforms_json_by_sf_name[subform_name][key] # current_json = dict(self.subforms_json_by_sf_name[subform_name][current_id]) # current_json["Value"] = fields_json[key] # subform_level_id = self.subforms_json_by_sf_name[subform_name]["LevelId"] # transformed_json.update({current_id: current_json}) # body = json.dumps({"Content": {"LevelId": subform_level_id, "FieldContents": transformed_json}, # "SubformFieldId": subform_field_id}) # try: # response = requests.post(api_url, headers=post_header, data=body, verify=False) # data = json.loads(response.content.decode("utf-8")) # log.info("Function create_sub_record created record, %s", data["RequestedObject"]["Id"]) # return data["RequestedObject"]["Id"] # except Exception as e: # log.error("Function create_sub_record didn't work, %s", e) # def post_attachment(self, name, base64_string): # """ # :param name: Name of the attachment # :param base64_string: File in base64_string # :return: # """ # api_url = f"{self.api_url_base}core/content/attachment" # post_header = dict(self.header) # post_header["X-Http-Method-Override"] = "POST" # body = json.dumps({"AttachmentName": name, "AttachmentBytes": base64_string}) # try: # response = requests.post(api_url, headers=post_header, data=body, verify=False) # data = response.json() # log.info("Attachment %s posted to Archer", data["RequestedObject"]["Id"]) # return data["RequestedObject"]["Id"] # except Exception as e: # log.error("Function post_attachment didn't work, %s; Response content: %s", e, response.content) # def update_content_record(self, updated_json, record_id): # """LevelID is an application # :param updated_json: see function create_content_record() # :param record_id: internal archer ID # :returns record_id # """ # return self.create_content_record(updated_json, record_id) # def get_record(self, record_id): # """ # :param record_id: internal archer record id # :return: record object # """ # api_url = f"{self.api_url_base}core/content/fieldcontent/" # cont_id = [str(record_id)] # body = json.dumps({"FieldIds": self.all_application_fields_array, "ContentIds": cont_id}) # post_header = dict(self.header) # post_header["X-Http-Method-Override"] = "POST" # try: # response = requests.post(api_url, headers=post_header, data=body, verify=False) # data = json.loads(response.content.decode("utf-8")) # return Record(self, data[0]["RequestedObject"]) # except Exception as e: # log.error("Function get_record() didn't work, %s", e) # def get_sub_record(self, sub_record_id, sub_record_name): # """ # :param sub_record_id: # :param sub_record_name: # :return: record object # """ # api_url = f"{self.api_url_base}core/content/fieldcontent/" # cont_id = [str(sub_record_id)] # all_fields_arr = self.subforms_json_by_sf_name[sub_record_name]["AllFields"] # body = json.dumps({"FieldIds": all_fields_arr, "ContentIds": cont_id}) # post_header = dict(self.header) # post_header["X-Http-Method-Override"] = "POST" # try: # response = requests.post(api_url, headers=post_header, data=body, verify=False) # data = json.loads(response.content.decode("utf-8")) # return Record(self, data[0]["RequestedObject"]) # except Exception as e: # log.error("Function get_sub_record() didn't work, %s", e) # def all_endpoints(self): # """ # :param app_name: Try a name you see in the app # :return: You will get printout of all similar grc_api endpoints urls that # might be slightly different from the app name, don't ask me why. # For all grc_api calls use the name you get. # """ # resp = self.request_helper("", content_api=True, method="get") # return resp.json()['value'] # def records(self): # pass # def get_grc_endpoint_records(self, endpoint_url, skip=None): # """ # By default gets 1000 records from the endpoint. # :param endpoint_url: get from find_grc_endpoint_url() # :param skip: number of records to skip in thousands (1,2,3) # :return: array of record jsons # """ # if skip: # api_url = self.content_api_url_base + endpoint_url + "?$skip=" + str(skip) # else: # api_url = self.content_api_url_base + endpoint_url # response = requests.get(api_url, headers=self.header, verify=False) # data = json.loads(response.content.decode("utf-8")) # array_jsons = [] # for record in data["value"]: # array_jsons.append(record) # return array_jsons # def build_unique_value_to_id_mapping(self, endpoint_url, key_value_field=None, prefix=None): # """ # :param endpoint_url: get from find_grc_endpoint_url() # :param key_value_field: name of the field with unique value that you # identified in your application(e.g. "Incident #") # :param prefix: adding prefix in front of key_value_field, sometimes in Archer # tranp_key fields are shown like INC-xxx, but in app they only have xxx, # so to solve that add prefix here, in our case it's INC- # :return: Populate Archer_Instance object with self.key_field_value_to_system_id with {field_value:content_record_id} # """ # i = 0 # for_equal_numbers = 0 # breaks out of the loop if the number of records are equal to 1000 # all_records = [] # while True: # current_records = self.get_grc_endpoint_records(endpoint_url, i) # all_records += current_records # if len(current_records) != 1000 or for_equal_numbers > 21: # Attention, if records are more than 21000 increase the value # break # i += 1000 # for_equal_numbers += 1 # for record in all_records: # if key_value_field: # if prefix: # field_value = prefix + str(record[key_value_field]) # else: # field_value = str(record[key_value_field]) # system_id = record[endpoint_url + "_Id"] # self.key_field_value_to_system_id.update({field_value: system_id}) # else: # print(record) # print('Please choose your key_field above: {"KEY_FIELD": "unique value"}') # break # log.info("Updated the mapping between record id and KEY_FIELD") # def get_record_id_by_unique_value(self, key_value_field): # """ # :param key_value_field: field you used in build_unique_value_to_id_mapping() # :return: record id or False # """ # try: # return self.key_field_value_to_system_id[key_value_field] # except: # return False # def add_record_id_to_mapping(self, key_value_field, system_id, prefix=None): # """ # :param key_value_field: field you used in build_unique_value_to_id_mapping() # :param system_id: redord id # :param prefix: # :return: populate self.key_field_value_to_system_id # """ # if prefix: # field_value = prefix + str(key_value_field) # else: # field_value = str(key_value_field) # self.key_field_value_to_system_id.update({field_value: system_id})