resilient-lib

Python module with library calls which facilitate the development of Apps for IBM SOAR

Usage

resilient-lib is a direct dependency of resilient-circuits as of v40.0, which helps streamline development.

It can also be installed directly with:

$ pip install resilient-lib

To use it within your App development, import it like any other Python Package:

from resilient_lib import readable_datetime

created_date = readable_datetime(fn_inputs.inc_create_date, rtn_format="%m-%d-%Y %H:%M:%S")

Proxies

You can now take advantage of the global proxy functionality. There is no need to add an [integrations] section like below. Run the following on your App Host for more information:

$ manageAppHost proxy -h

The hierarchy of proxies is as follows:

  1. RequestsCommon.execute() Function: the proxies parameter.

  2. Environmental Variables: HTTPS_PROXY, HTTP_PROXY and NO_PROXY set using the manageAppHost proxy command on the App Host.

  3. Function Options: http_proxy or https_proxy configs set in the Function Section ([my_function]) of your app.config file.

  4. Integrations Options: http_proxy or https_proxy configs set in the Integrations Section ([integrations]) of your app.config file.

If using a version of App Host earlier than 1.6, resilient-lib supports an [integrations] section in your app.config file. Add this section to define proxy settings to be used for all integrations that use this library.

[integrations]
verify=True
# These proxy settings will be used by all integrations.
# To override, add any parameter to your specific integration section
http_proxy=
https_proxy=
timeout=30

Certificate Verification

You can now take advantage of the global verify functionality. Valid options for verify are True, False, or a path to a valid certificate chain file. The default (and suggested value) is True which will use the default Python CA bundle found at REQUESTS_CA_BUNDLE.

The hierarchy of verify settings is as follows:

  1. RequestsCommon.execute() Function: the verify parameter.

  2. Function Options: verify config set in the Function Section ([my_function]) of your app.config file.

  3. Integrations Options: verify config set in the Integrations Section ([integrations]) of your app.config file.

Example:

[my_function]
verify=/var/rescircuits/cafile.pem
http_proxy=
https_proxy=
timeout=30
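As a rough illustration of how a verify value could be resolved from an app.config file, the sketch below reads two sections with the standard library's configparser and applies the order described above. `parse_verify` and `resolve_verify` are hypothetical helpers written for this example; they are not part of resilient-lib.

```python
import configparser

APP_CONFIG = """
[integrations]
verify=True
timeout=30

[my_function]
verify=/var/rescircuits/cafile.pem
"""


def parse_verify(raw):
    """Map a config string to True, False, a certificate path, or None if unset."""
    if raw is None or raw.strip() == "":
        return None
    lowered = raw.strip().lower()
    if lowered in ("true", "false"):
        return lowered == "true"
    return raw.strip()


def resolve_verify(config, function_section, execute_verify=None):
    """Apply the order: execute() argument, function section, [integrations], default True."""
    if execute_verify is not None:
        return execute_verify
    for section in (function_section, "integrations"):
        if config.has_section(section):
            value = parse_verify(config.get(section, "verify", fallback=None))
            if value is not None:
                return value
    return True


config = configparser.ConfigParser()
config.read_string(APP_CONFIG)

print(resolve_verify(config, "my_function"))
```

Here the function section wins over [integrations], so the certificate path is printed.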

Common Helper Methods

build_incident_url(url, incidentId, orgId=None)

Build the url to link to a SOAR incident or CP4S case. Add https if http/https is not provided at the start. If url is not a string, returns back the value given.

orgId is optional to maintain backward compatibility. However, it is strongly recommended to provide the organization ID of the incident so that links work reliably when multiple orgs are available on your SOAR instance.

Returns a URL in the format https://<url>/#<incident_id>?orgId=<orgId>.

Parameters:
  • url (str) – the URL of your SOAR instance

  • incidentId (str|int) – the id of the incident

  • orgId (str|int|None) – (optional) the id of the org the incident lives in. If the user is logged into a different org and this is not set, the link produced may direct the user to a different incident resulting in unexpected results

Returns:

full URL to the incident

Return type:

str
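To make the documented link format concrete, here is a small sketch that assembles a string in the shape given above. It is not the library's code: `incident_url_sketch` is a hypothetical name, stripping a trailing slash is an assumption, and the links produced by the real function may differ in detail.

```python
def incident_url_sketch(url, incident_id, org_id=None):
    """Build a link shaped like https://<url>/#<incident_id>?orgId=<orgId>."""
    if not isinstance(url, str):
        return url  # non-string input is handed back unchanged

    # add https when no scheme was given
    if not url.lower().startswith(("http://", "https://")):
        url = "https://" + url

    link = "{0}/#{1}".format(url.rstrip("/"), incident_id)
    if org_id is not None:
        link += "?orgId={0}".format(org_id)
    return link


print(incident_url_sketch("soar.example.com", 2001, 201))
```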

build_resilient_url(host, port)

Build basic url to SOAR/CP4S instance. Add ‘https’ if http/https is not provided at the start. If host is not a string, returns back the value given.

Parameters:
  • host (str) – host name

  • port (str|int) – port

Returns:

base url

Return type:

str

build_task_url(url, incident_id, task_id, org_id)

Build the url to link to a SOAR/CP4S task. Add https if http/https is not provided at the start. If url is not a string, returns back the value given.

Returns a URL in the format https://<url>/#<incident_id>?orgId=<org_id>&taskId=<task_id>&tabName=details.

Parameters:
  • url (str) – the URL of your SOAR instance

  • incident_id (str|int) – the id of the incident

  • task_id (str|int) – the id of the task

  • org_id (str|int) – the id of the org the incident lives in

Returns:

full URL to the task’s details tab

Return type:

str

clean_html(html_fragment)

SOAR textarea fields return HTML fragments. This routine removes the HTML tags and follows the content of each <div></div> with a linefeed.

Note

The string returned from this method may not format well, because line-feed presentation is not preserved: tags such as <br>, <ol>, <ul>, etc. are removed. See the MarkdownParser class for a better way to translate HTML input to markdown.

Parameters:

html_fragment (str) – the html to clean up

Returns:

cleaned up code

Return type:

str
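The general technique of stripping tags while breaking lines at each closing div can be sketched with the standard library's html.parser. This is an approximation written for illustration, with hypothetical names (`DivTextExtractor`, `strip_html_sketch`); the library's own routine may treat whitespace and other tags differently.

```python
from html.parser import HTMLParser


class DivTextExtractor(HTMLParser):
    """Collect text content, starting a new line whenever a <div> closes."""

    def __init__(self):
        super().__init__()
        self.lines = []
        self._buffer = []

    def handle_data(self, data):
        self._buffer.append(data)

    def handle_endtag(self, tag):
        if tag == "div":
            self._flush()

    def _flush(self):
        text = "".join(self._buffer).strip()
        if text:
            self.lines.append(text)
        self._buffer = []


def strip_html_sketch(html_fragment):
    if not isinstance(html_fragment, str):
        return html_fragment
    extractor = DivTextExtractor()
    extractor.feed(html_fragment)
    extractor.close()
    extractor._flush()  # keep any text that followed the last </div>
    return "\n".join(extractor.lines)


print(strip_html_sketch("<div>line one</div><div>line <b>two</b></div>"))
```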

close_incident(res_client, incident_id, kwargs, handle_names=False)

Close an incident in SOAR.

  • Pass any required on close (roc) fields as a field_name:field_value dict in kwargs

  • If any roc select field needs to be identified by its name, set handle_names to True

Example:

res = close_incident(
    res_client=self.rest_client(),
    incident_id=fn_inputs.incident_id,
    kwargs={"resolution_id": "Duplicate", "resolution_summary": "This ticket is a duplicate"},
    handle_names=True
)

Parameters:
  • res_client (resilient_circuits.ResilientComponent.rest_client()) – required for communication back to SOAR

  • incident_id (int|str) – id of the incident

  • kwargs (dict) – required fields needed to close an incident in a field_name:field_value format

  • handle_names (bool) – if True, any select field types in kwargs will take str instead of int as their value

Returns:

Response from the server indicating if the incident was closed or not

Return type:

requests.Response

get_file_attachment(res_client, incident_id, artifact_id=None, task_id=None, attachment_id=None)

Call the SOAR REST API to get the attachment or artifact data for an Incident or a Task

  • If incident_id and artifact_id are defined it will get that Artifact

  • If incident_id and attachment_id are defined it will get that Incident Attachment

  • If incident_id, task_id and attachment_id are defined it will get that Task Attachment

Note

The artifact_id must reference an Artifact that is a downloadable type or it will raise a resilient.SimpleHTTPException

Example:

artifact_data = get_file_attachment(self.rest_client(), incident_id=2001, artifact_id=1)

with open("malware.eml", "wb") as f:
    f.write(artifact_data)

Parameters:
  • res_client (resilient_circuits.ResilientComponent.rest_client()) – required for communication back to SOAR

  • incident_id (int|str) – id of the Incident

  • artifact_id (int|str) – id of the Incident’s Artifact to download

  • task_id (int|str) – id of the Task to download its Attachment from

  • attachment_id (int|str) – id of the Incident’s Attachment to download

Returns:

byte string of attachment

Return type:

str

get_file_attachment_metadata(res_client, incident_id, artifact_id=None, task_id=None, attachment_id=None)

Call the SOAR REST API to get the attachment or artifact attachment metadata

Parameters:
  • res_client (resilient_circuits.ResilientComponent.rest_client()) – required for communication back to SOAR

  • incident_id (int|str) – id of the Incident

  • artifact_id (int|str) – id of the Incident’s Artifact

  • task_id (int|str) – id of the Task to get its Attachment metadata

  • attachment_id (int|str) – id of the Incident’s Attachment

Returns:

File metadata returned from endpoint

Return type:

dict

get_file_attachment_name(res_client, incident_id=None, artifact_id=None, task_id=None, attachment_id=None)

Call the SOAR REST API to get the attachment or artifact attachment name

Parameters:
  • res_client (resilient_circuits.ResilientComponent.rest_client()) – required for communication back to SOAR

  • incident_id (int|str) – id of the Incident

  • task_id (int|str) – id of the Task

  • attachment_id (int|str) – id of the Incident’s Attachment to get the name for

Returns:

name of the Attachment or Artifact

Return type:

str

readable_datetime(timestamp, milliseconds=True, rtn_format='%Y-%m-%dT%H:%M:%SZ')

Convert an epoch timestamp to a string using a format

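Parameters:
  • timestamp (int) – the epoch timestamp to convert

  • milliseconds (bool) – True if timestamp is in milliseconds, False if it is in seconds

  • rtn_format (str) – format of the returned string

Returns: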

string representation of timestamp

Return type:

str
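The conversion can be approximated with the standard library alone. The sketch below is not the library's code: `readable_datetime_sketch` is a hypothetical name, and interpreting the timestamp as UTC is an assumption suggested by the trailing Z in the default format.

```python
from datetime import datetime, timezone


def readable_datetime_sketch(timestamp, milliseconds=True, rtn_format="%Y-%m-%dT%H:%M:%SZ"):
    """Format an epoch timestamp (milliseconds by default) as a string, assuming UTC."""
    seconds = timestamp / 1000 if milliseconds else timestamp
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime(rtn_format)


print(readable_datetime_sketch(1000000000000))
print(readable_datetime_sketch(1000000000, milliseconds=False, rtn_format="%m-%d-%Y %H:%M:%S"))
```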

str_to_bool(value)

Convert value to either a True or False boolean

Returns False if value is anything other than: '1', 'true', 'yes' or 'on'

Parameters:

value (str) – the value to convert

Returns:

True or False

Return type:

bool
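A minimal sketch of the rule stated above, for illustration only. `str_to_bool_sketch` is a hypothetical name, and treating the comparison as case-insensitive and whitespace-tolerant is an assumption rather than documented behaviour.

```python
TRUE_STRINGS = ("1", "true", "yes", "on")


def str_to_bool_sketch(value):
    """Return True only for the accepted 'truthy' strings; everything else is False."""
    return str(value).strip().lower() in TRUE_STRINGS


for sample in ("Yes", "on", "0", "", None):
    print(repr(sample), str_to_bool_sketch(sample))
```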

unescape(data)

Return unescaped data such as:
  • &gt; converts to >

  • &quot; converts to "

Parameters:

data (str) – text to convert

Returns:

the text unescaped

Return type:

str
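For comparison, the Python standard library offers html.unescape, which performs this kind of entity conversion. The snippet below only demonstrates the standard-library call; whether resilient-lib delegates to it is not stated here.

```python
import html

samples = ["&gt;", "&lt;b&gt;", "&quot;quoted&quot;", "&amp;"]

# convert each HTML entity back to the character it stands for
unescaped = [html.unescape(sample) for sample in samples]

for before, after in zip(samples, unescaped):
    print(before, "->", after)
```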

validate_fields(field_list, kwargs)

Ensure each mandatory field in field_list is present in kwargs. Raises ValueError if not.

field_list can be a list/tuple of strings where each string is a field name or it can be a list/tuple of dicts where each item has the attributes name (required) and placeholder (optional).

kwargs can be a dict or namedtuple. If a namedtuple, it calls its kwargs._as_dict() method and raises a ValueError if it does not succeed.

  • If the value of the item in kwargs is equal to its placeholder defined in field_list, a ValueError is raised.

  • If an item in kwargs is a Resilient Select Function Input, its value will be a dict that has a name attribute. This returns the value of name.

  • If an item in kwargs is a Resilient Multi-Select Function Input, its value will be a list of dicts that have the name attribute. This returns a list of the name values for that item.

Parameters:
  • field_list (list|tuple) – the mandatory fields. Can be an empty list if no mandatory fields.

  • kwargs (dict|namedtuple) – dict or a namedtuple of all the fields to search.

Raises:

ValueError – if a field is missing

Returns:

a Dictionary of all fields with Select/Multi-Select fields handled.

Return type:

dict
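The rules above can be sketched in a few lines. This is a simplified illustration and not the library's implementation: `validate_fields_sketch` is a hypothetical name, it accepts only a dict for kwargs, and treating an empty string as "missing" is an assumption.

```python
def validate_fields_sketch(field_list, kwargs):
    """Check mandatory fields, then flatten Select/Multi-Select values to their names."""
    if not isinstance(kwargs, dict):
        raise ValueError("kwargs must be a dict in this sketch")

    for field in field_list:
        # each entry is either a field name or a dict with name/placeholder
        if isinstance(field, dict):
            name, placeholder = field.get("name"), field.get("placeholder")
        else:
            name, placeholder = field, None

        value = kwargs.get(name)
        if value is None or value == "":
            raise ValueError("'{0}' is mandatory and is not set".format(name))
        if placeholder is not None and value == placeholder:
            raise ValueError("'{0}' still holds its placeholder value".format(name))

    result = {}
    for key, value in kwargs.items():
        if isinstance(value, dict) and "name" in value:
            result[key] = value["name"]  # Select input
        elif isinstance(value, list) and value and all(
                isinstance(item, dict) and "name" in item for item in value):
            result[key] = [item["name"] for item in value]  # Multi-Select input
        else:
            result[key] = value
    return result


inputs = {
    "host": "example.com",
    "severity": {"id": 5, "name": "High"},
    "tags": [{"name": "a"}, {"name": "b"}],
}
print(validate_fields_sketch(["host"], inputs))
```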

write_file_attachment(res_client, file_name, datastream, incident_id, task_id=None, content_type=None)

Add a file attachment to SOAR using the REST API to an Incident or a Task

Example:

with open("malware.eml", "rb") as data_stream:
    res = write_file_attachment(self.rest_client(), "malware.eml", data_stream, 2001)

Parameters:
  • res_client (ResilientComponent.rest_client()) – required for communication back to SOAR

  • file_name (str) – name of the attachment to create

  • datastream (stream of bytes) – stream of bytes used to create the attachment

  • incident_id (int|str) – id of the Incident

  • task_id (int|str) – (optional) id of the Task

  • content_type (str) – (optional) MIME type of attachment. Default is "application/octet-stream"

Returns:

metadata of new attachment created

Return type:

dict

write_to_tmp_file(data, tmp_file_name=None, path_tmp_dir=None)

Writes data to a file in a safely created temp directory.

  • If no tmp_file_name is provided, a temp name will be given

  • If no path_tmp_dir is provided a temp directory is created with the prefix resilient-lib-tmp-

Note

When used, ensure you safely remove the created temp directory in the finally block of the FunctionComponent code.

Example:

import os
import shutil

path_tmp_dir = None  # defined up front so the finally block can always test it

try:
    path_tmp_file, path_tmp_dir = write_to_tmp_file(attachment_contents, tmp_file_name=attachment_metadata.get("name"))

except Exception:
    yield FunctionError()

finally:
    if path_tmp_dir and os.path.isdir(path_tmp_dir):
        shutil.rmtree(path_tmp_dir)

Parameters:
  • data (bytes) – bytes to be written to the file

  • tmp_file_name (str) – name to be given to the file

  • path_tmp_dir (str) – path to an existing directory to use as the temp dir

Returns:

a tuple (path_tmp_file, path_tmp_dir)

Return type:

tuple
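The create-then-clean-up pattern described above can be tried with the standard library alone. In this sketch `write_to_tmp_file_sketch` is a hypothetical stand-in that mirrors the documented behaviour (including the resilient-lib-tmp- prefix); it is not the library's code.

```python
import os
import shutil
import tempfile


def write_to_tmp_file_sketch(data, tmp_file_name=None, path_tmp_dir=None):
    """Write bytes to a file in a temp directory and return (file path, dir path)."""
    if path_tmp_dir is None:
        path_tmp_dir = tempfile.mkdtemp(prefix="resilient-lib-tmp-")

    if tmp_file_name:
        path_tmp_file = os.path.join(path_tmp_dir, tmp_file_name)
        with open(path_tmp_file, "wb") as handle:
            handle.write(data)
    else:
        # no name given: let tempfile pick a unique one inside the directory
        fd, path_tmp_file = tempfile.mkstemp(dir=path_tmp_dir)
        with os.fdopen(fd, "wb") as handle:
            handle.write(data)

    return path_tmp_file, path_tmp_dir


path_tmp_dir = None
try:
    path_tmp_file, path_tmp_dir = write_to_tmp_file_sketch(b"hello", tmp_file_name="note.txt")
    with open(path_tmp_file, "rb") as handle:
        contents = handle.read()
finally:
    # always remove the temp directory, even if reading or writing failed
    if path_tmp_dir and os.path.isdir(path_tmp_dir):
        shutil.rmtree(path_tmp_dir)
```

Initialising `path_tmp_dir` before the try block keeps the finally clause safe when the write itself raises.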

class MarkdownParser(bold='**', italic='*', underline='__', strikeout='~~', bullets='*', number=0, indent=4, monospace=['{{', '}}'], headers=['h1.', 'h2.', 'h3.', 'h4.', 'h5.', 'h6.'], blockquote='```')

Convert HTML text into Markdown. A wrapper for html.parser.HTMLParser

Example:

from resilient_lib import MarkdownParser

data = "<div class='rte'><div><strong><u>underline and strong</u></strong></div></div>"
markdown = "*_underline and strong_*"

parser = MarkdownParser(bold="*", underline="_") # override defaults
converted = parser.convert(data)
self.assertEqual(converted, markdown)

MarkdownParser.convert(self, data)

Converts html data to markdown and returns the converted string

Parameters:

data (str) – html text to convert

Returns:

converted text to markdown

Return type:

str

class OAuth2ClientCredentialsSession(url=None, client_id=None, client_secret=None, scope=None, proxies=None)

Wrapper around requests.Session which receives authentication tokens through the client_credentials type of OAuth2 grant, and adds tokens to the requests made through the session. It also attempts to keep track of the token and refresh it as needed.

  • If proxies are defined, every request will use them.

This session does not request authorization from the user first. The scope should be pre-authorized.

Example:

from resilient_lib import OAuth2ClientCredentialsSession, RequestsCommon

api1 = OAuth2ClientCredentialsSession('https://example1.com/<tenant_id>/oauth/v2/', client_id='xxx', client_secret='xxx')
api2 = OAuth2ClientCredentialsSession('https://example2.com/<tenant_id>/oauth/v2/', client_id='xxx', client_secret='xxx')

api1.post('https://example1.com/v4/me/messages', data={}) # use as a regular requests session object

api2.get('https://example2.com/v2/me/updates')

# When writing an integration, use RequestsCommon to get the proxies defined in your app.config file.
rc = RequestsCommon(opts, function_opts)
api3 = OAuth2ClientCredentialsSession('https://example3.com/{}/test', proxies=rc.get_proxies())

Parameters:
  • url (str) – Authorization URL, with tenant_id in it, if required

  • client_id (str) – API key/User ID

  • client_secret (str) – secret for API key

  • scope ([str]) – (optional) list of scopes

  • proxies (dict) –

    (optional) Dictionary mapping protocol to the URL of the proxy. The mapping protocol must be in the format:

    {
        "https_proxy": "https://localhost:8080",
        "http_proxy": "http://localhost:8080"
    }
    

add_authorization_header(headers)

Create headers needed for authentication/authorization, overriding the default ones if needed.

Parameters:

headers (dict) – Dictionary containing the Authorization header

authenticate(url, client_id, client_secret, scope, proxies=None)

Authenticate with the endpoint

get_token(token_url, client_id, client_secret, scope=None, proxies=None)

Override this method if the request needs specific information in the body of the request

For example, Cloud Foundry requires grant_type to be password and the API key to be passed as password.

The default is:

post_data = {
    'client_id': client_id,
    'client_secret': client_secret,
    'grant_type': 'client_credentials'
}

if scope:
    post_data['scope'] = scope

Parameters:
  • token_url (str) – Authorization URL, with tenant_id in it, if required

  • client_id (str) – API key/User ID

  • client_secret (str) – secret for API key

  • scope ([str]) – (optional) list of scopes

  • proxies (dict) – (optional) Dictionary mapping protocol to the URL of the proxy.

Returns:

the response from the token_url

Return type:

requests.Response

request(method, url, *args, **kwargs)

Constructs a requests.Request, injects it with tokens and sends it in a new session to avoid having one session constantly open.

This uses the requests.request() function to make a call. The inputs are mapped to this function. See requests.request() for information on any parameters available

Returns:

the response object

Return type:

requests.Response

update_token()

Initiates a request for a new access token.

Raises:

ValueError – If it cannot update the token

Returns:

True if a new token was obtained

Return type:

bool

Common Request Methods

class RequestsCommon(opts=None, function_opts=None)

This class represents common functions around the use of the requests package for REST based APIs. It incorporates the app.config section [integrations] which can be used to define a common set of proxies for use by all functions using this library.

Any similar properties in the function’s section would override the [integrations] properties.

Note

In the Atomic Function template, as of version 41.1, RequestsCommon is instantiated and available in a class that inherits AppFunctionComponent as an rc attribute:

response = self.rc.execute(method="GET", url="ibm.com")

Parameters:
  • opts (dict) – all configurations found in the app.config file

  • function_opts (dict) – all configurations found in the [my_function] section of the app.config file

RequestsCommon.execute(self, method, url, timeout=None, proxies=None, callback=None, clientauth=None, verify=None, retry_tries=1, retry_delay=1, retry_max_delay=None, retry_backoff=1, retry_jitter=0, retry_exceptions=<class 'requests.exceptions.HTTPError'>, **kwargs)

Constructs and sends a request. Returns a requests.Response object.

This uses the requests.request() function to make a call. The inputs are mapped to this function. See requests.request() for information on any parameters available, but not documented here.

Retries can be achieved through the parameters prefixed with retry_<param>. Retry is only available in PY3+.

Parameters:
  • method (str) – Rest method to execute (GET, POST, etc…)

  • url (str) – URL to execute request against

  • timeout (float or tuple) – (Optional) Number of seconds to wait for the server to send data before giving up, as a float or a (connect timeout, read timeout) tuple. See requests docs for more. If None it looks in the [integrations] section of your app.config for the timeout setting.

  • proxies (dict) –

    (Optional) Dictionary mapping protocol to the URL of the proxy. The mapping protocol must be in the format:

    {
        "https_proxy": "https://localhost:8080",
        "http_proxy": "http://localhost:8080"
    }
    

  • callback (function) –

    (Optional) Once a response is received from the endpoint, this callback function is called with the response as its only parameter, and its return value is returned. Can be used to specifically handle errors. If callback is given, any retry parameters are ignored unless the callback function raises one of the exceptions listed in retry_exceptions (see below), in which case the retry logic kicks in.

    Example:

    from resilient_lib import IntegrationError, RequestsCommon
    
    def custom_callback(response):
        """ custom callback function to handle 400 error codes """
        if 400 <= response.status_code < 500:
            # raise ValueError which will be retried
            raise ValueError("retry me")
        else:
            # all other status codes should return normally
            # note this bypasses the normal rc.execute logic which
            # would raise an error on other (e.g. 500) errors
            return response
    
    rc = RequestsCommon()
    try:
        # will retry 3 times then will raise IntegrationError
        response = rc.execute("GET", "https://postman-echo.com/status/404",
                                   callback=custom_callback, retry_tries=3,
                                   retry_exceptions=ValueError)
    except IntegrationError as err:
        print(err)
    

  • clientauth (str or tuple(str, str)) –

    (Optional) Equivalent to the cert parameter of requests. Client-side certificates can be configured automatically in the app’s section of the app.config. These values will be read in automatically by RequestsCommon when execute is called.

    Example:

    [resilient]
    ... # configs for connection to SOAR
    
    [fn_my_app]
    ... # some other configs
    client_auth_cert=<path_to_client_auth_cert_file>
    client_auth_key=<path_to_client_auth_private_key_file>
    

    If not set in the app.config file, the filepath for the client side certificate and the private key can be passed to this function either as the path to a single file or as a tuple of both files’ paths.

  • verify (bool or str) –

    (Optional) Either a boolean, in which case it controls whether requests verifies the server’s TLS certificate, or a string, in which case it must be a path to a CA bundle to use. Defaults to None as passed in here. If None, then the value is searched for in the app’s section of the app.config. If no value is found, then the requests.request() default of True is used. In that case, the CA bundle found at requests.utils.DEFAULT_CA_BUNDLE_PATH is the CA bundle used, which is usually a bundle provided by Mozilla. Setting verify=True is the safest option and should only be changed if you have a self-signed certificate or you want to bypass SSL for testing purposes. A production level app should never disable SSL verification completely.

    Example:

    [fn_my_app]
    ... # some other configs
    verify=<path/to/CA/bundle/to/use>
    

    Note

    The value held in the app’s config will be overwritten if a value is passed in by the call to execute.

  • retry_tries (int) – (PY3 only) The maximum number of attempts. Default: 1 (no retry). Use -1 for unlimited retries. Matches tries parameter of retry.api.retry_call.

  • retry_delay (int) –

    (PY3 only) Initial delay between attempts. Default: 1. Matches delay parameter of retry.api.retry_call.

  • retry_max_delay (int) –

    (PY3 only) The maximum value of delay. Default: None (no limit). Matches max_delay parameter of retry.api.retry_call.

  • retry_backoff (int) –

    (PY3 only) Multiplier applied to delay between attempts. Default: 1 (no backoff). Matches backoff parameter of retry.api.retry_call.

  • retry_jitter (int | tuple(int, int)) –

    (PY3 only) Extra seconds added to delay between attempts. Default: 0. Fixed if a number, random if a range tuple (min, max). Matches jitter parameter of retry.api.retry_call.

  • retry_exceptions (Exception | tuple(Exception)) –

    (PY3 only) An exception or a tuple of exceptions to catch. Default: requests.exceptions.HTTPError. Matches exceptions parameter of retry.api.retry_call.

Returns:

the response from the endpoint or return callback if defined.

Return type:

requests.Response object or callback function.
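The retry_* parameters are documented above as matching retry.api.retry_call. The sketch below approximates how such a loop combines tries, delay, backoff, jitter and max_delay; it is an illustration and not the library's code. `retry_call_sketch` and its `sleep` argument are hypothetical (the latter lets the waits be recorded instead of slept), and only a fixed numeric jitter is handled.

```python
import time


def retry_call_sketch(func, tries=1, delay=1, max_delay=None, backoff=1,
                      jitter=0, exceptions=Exception, sleep=time.sleep):
    """Call func until it succeeds or the attempts run out (tries=-1 never runs out)."""
    remaining, wait = tries, delay
    while remaining:
        try:
            return func()
        except exceptions:
            remaining -= 1
            if not remaining:
                raise  # attempts exhausted: re-raise the last error
            sleep(wait)
            wait = wait * backoff + jitter
            if max_delay is not None:
                wait = min(wait, max_delay)


attempts = []
waits = []


def flaky():
    """Fail on the first two calls, succeed on the third."""
    attempts.append(1)
    if len(attempts) < 3:
        raise ValueError("retry me")
    return "ok"


result = retry_call_sketch(flaky, tries=3, delay=1, backoff=2, max_delay=5,
                           exceptions=ValueError, sleep=waits.append)
print(result, waits)
```

With delay=1 and backoff=2 the recorded waits grow from one attempt to the next until max_delay caps them.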

RequestsCommon.get_proxies(self)

Proxies can be specified globally for all integrations or specifically per function.

  • If the environmental variables HTTPS_PROXY, HTTP_PROXY and NO_PROXY are set, this returns None, as if for a requests.Request. If proxies are None, the environmental variables are used

  • If http_proxy or https_proxy is set in the Function Section ([my_function]) of your app.config file, returns a dictionary mapping protocol to the URL of the proxy.

  • If http_proxy or https_proxy is set in the Integrations Section ([integrations]) of your app.config file, returns a dictionary mapping protocol to the URL of the proxy.

Returns:

A dictionary mapping protocol to the URL of the proxy or None

Return type:

dict or None

RequestsCommon.get_verify(self)

Get verify parameter from app config or from env var REQUESTS_CA_BUNDLE which is the default way to set verify with python requests library.

Value can be set in [integrations] or in the [fn_my_app] section

Value in [fn_my_app] takes precedence over [integrations] which takes precedence over REQUESTS_CA_BUNDLE

Parameters:

app_options (dict) – App config dict

Returns:

Value to set requests.session.verify to. Either a path or a boolean

Return type:

bool or str or None (which will default to requests default which is True)

RequestsCommon.get_client_auth(self)

A client certificate for authenticating calls to external endpoints can be specified on a per function basis.

If client_auth_cert and client_auth_key are set in the Function Section ([my_function]) of your app.config file, returns a tuple containing the respective paths to the certificate and private key for the client cert.

Example:

[my_function]
...
client_auth_cert = <path_to_cert.pem>
client_auth_key = <path_to_cert_private_key.pem>

Returns:

The filepaths for the client side certificate and the private key as a tuple of both files’ paths or None if either one of the values are missing

Return type:

tuple(str, str)

RequestsCommon.get_timeout(self)

Get the default timeout value in the Function Section ([my_function]) of your app.config file, or from the Integrations Section ([integrations]) of your app.config file.

[my_function] section takes precedence over [integrations] section value.

Returns:

timeout value

Return type:

int

class RequestsCommonWithoutSession(*args, **kwargs)

This class extends RequestsCommon maintaining the behavior of RequestsCommon that was present in versions <= 47.1.x.

The difference is that every time RequestsCommonWithoutSession.execute() is called, this class will use requests.request to execute the request, while RequestsCommon.execute() uses Session().request.

This class is interchangeable with RequestsCommon, and code that uses RequestsCommon can simply be refactored to use RequestsCommonWithoutSession.

Parameters:

Common Poller Methods

poller(named_poller_interval, named_last_poller_time)

Decorator to wrap a function as the logic for a poller.

Example:

import logging
from threading import Thread

from resilient_circuits import AppFunctionComponent, is_this_a_selftest
from resilient_lib import poller, get_last_poller_date

PACKAGE_NAME = "fn_my_app"

LOG = logging.getLogger(__name__)

class MyPoller(AppFunctionComponent):

    def __init__(self, opts):
        super(MyPoller, self).__init__(opts, PACKAGE_NAME)

        # instance attributes, looked up by name by the @poller decorator
        self.polling_interval = 5 # set to 5 seconds or could get from ``self.options``
        self.last_poller_time = get_last_poller_date(120) # look back 2 hours

        if is_this_a_selftest(self):
            LOG.warning("Running selftest -- disabling poller")
        else:
            poller_thread = Thread(target=self.run)
            poller_thread.daemon = True
            poller_thread.start()

    @poller('polling_interval', 'last_poller_time')
    def run(self, *args, **kwargs):

        # poll endpoint
        query_results = query_entities_since_last_poll(kwargs['last_poller_time'])

        # process any results to create, update, or close case in SOAR
        if query_results:
            self.process_entity_list(query_results)

Parameters:
  • named_poller_interval (str) – name of instance variable containing the poller interval in seconds

  • named_last_poller_time (datetime) – name of instance variable containing the lookback value in milliseconds
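Because the decorator receives the names of instance variables rather than their values, it has to look them up on the instance at run time. The sketch below illustrates that pattern under stated assumptions; it is not the library's decorator. `poller_sketch`, `DemoPoller` and the `max_cycles` argument are hypothetical (the latter exists only so the example terminates), and resetting the last poll time to the start of each cycle is an assumption.

```python
import functools
import time
from datetime import datetime


def poller_sketch(named_poller_interval, named_last_poller_time, max_cycles=None):
    """Wrap a method so that it runs repeatedly, sleeping between cycles."""
    def decorator(func):
        @functools.wraps(func)
        def wrapped(self, *args, **kwargs):
            cycles = 0
            while max_cycles is None or cycles < max_cycles:
                cycle_started = datetime.now()
                # resolve the instance attributes by name on every cycle
                kwargs["last_poller_time"] = getattr(self, named_last_poller_time)
                func(self, *args, **kwargs)
                setattr(self, named_last_poller_time, cycle_started)
                cycles += 1
                time.sleep(getattr(self, named_poller_interval))
        return wrapped
    return decorator


class DemoPoller:
    def __init__(self):
        self.polling_interval = 0  # no wait, to keep the demo fast
        self.last_poller_time = datetime(2024, 1, 1)
        self.seen = []

    @poller_sketch("polling_interval", "last_poller_time", max_cycles=2)
    def run(self, *args, **kwargs):
        self.seen.append(kwargs["last_poller_time"])


demo = DemoPoller()
demo.run()
print(len(demo.seen))
```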

class SOARCommon(rest_client)

Common methods for accessing IBM SOAR cases and their entities: comments, attachments, etc. This class and its methods should be used in conjunction with wrapped polling logic to communicate with the SOAR platform.

create_case_comment(case_id, note, entity_comment_id=None, entity_comment_header=None)

Add a comment to the specified SOAR case by ID.

Parameters:
  • case_id (str|int) – SOAR case ID

  • note (str) – Content to be added as a note to the case

  • entity_comment_id (str|int) – (Optional) entity comment id if updating an existing comment

  • entity_comment_header (str) – (Optional) header to place in bold at the top of the note

Returns:

Response from posting the comment to SOAR

Return type:

dict

create_datatable_row(case_id, datatable, rowdata)

Create a row in a SOAR datatable. rowdata should be formatted as a dictionary of column name and value pairs

Example:

from resilient_lib import SOARCommon

soar_common = SOARCommon(res_client)

case_id = get_case_id()
rowdata = {"column_1_api_name": 1, "column_2_api_name": 2}

soar_common.create_datatable_row(case_id, "my_dt", rowdata)

Parameters:
  • case_id (str|int) – case containing the datatable

  • datatable (str) – API name of datatable

  • rowdata (dict) – columns and values to add

create_soar_case(case_payload)

Create a new IBM SOAR case based on a payload formatted for the API call.

Parameters:

case_payload (dict) – Fields to use for creating a SOAR case

Returns:

API result with the created case

Return type:

dict

filter_soar_comments(case_id, entity_comments, soar_header=None)

Read all SOAR comments from a case and remove those comments which have already been synced using soar_header as a filter

Parameters:
  • case_id (str|int) – IBM SOAR case id

  • entity_comments (list(str)) – list of comments which the endpoint’s entity contains. This will be a mix of comments sync’d from SOAR and new comments

  • soar_header (str) – title added to SOAR comments to be filtered

Returns:

list of remaining comments

Return type:

list

get_case(case_id)

Get a SOAR case based on the case id

Parameters:

case_id (str|int) – ID of the case to get the details of

Returns:

Details of the case for a given ID

Return type:

dict

get_case_artifacts(case_id)

Get all case artifacts

Parameters:

case_id (str|int) – ID of the case to get the details of

get_case_attachment(case_id, artifact_id=None, task_id=None, attachment_id=None, return_base64=True)

Get contents of a file attachment or artifact.

  • If case_id and artifact_id are defined it will get that Artifact

  • If case_id and attachment_id are defined it will get that Incident Attachment

  • If case_id, task_id and attachment_id are defined it will get that Task Attachment

Parameters:
  • case_id (str|int) – ID of the case to get the details of

  • artifact_id (str|int) – ID of the Incident’s Artifact to download

  • task_id (str|int) – ID of the Task to download its Attachment from

  • attachment_id (int|str) – id of the Incident’s Attachment to download

  • return_base64 (bool) – if False, return value of the content will be given as bytes, default True returns a base64 encoded string

Returns:

name of the artifact or file and base64 encoded string or byte string of attachment

Return type:

tuple(str, bytes|base64(str))
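The two return shapes controlled by return_base64 can be illustrated with the standard library's base64 module. `encode_attachment_sketch` is a hypothetical helper for this example only; it shows the encoding step, not how the library fetches the content.

```python
import base64


def encode_attachment_sketch(name, content, return_base64=True):
    """Return (name, content) with content either base64 text or raw bytes."""
    if return_base64:
        return name, base64.b64encode(content).decode("utf-8")
    return name, content


name, encoded = encode_attachment_sketch("note.txt", b"hello")
print(name, encoded)

# a caller that needs the original bytes back can decode the string
original = base64.b64decode(encoded)
```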

get_case_attachments(case_id, return_base64=True)

Get all case attachments

Parameters:
  • case_id (str|int) – ID of the case to get the details of

  • return_base64 (bool) – if False, return value of the content will be given as bytes

Returns:

list of attachments associated with the given case

Return type:

list[tuple(str, bytes|base64(str))]

get_case_comments(case_id)

Get all case comments

Parameters:

case_id (str|int) – ID of the case to get the details of

get_case_tasks(case_id, want_layouts=False, want_notes=False, handle_format='names')

Get all the tasks from a SOAR case

Parameters:
  • case_id (str|int) – IBM SOAR case id

  • want_layouts (bool) – If the task layout should be returned. Default is False.

  • want_notes (bool) – If the task notes should be returned. Default is False.

  • handle_format (str) – The format to return can be names, ids, or objects. Default is names.

Returns:

A list of all the tasks for the given SOAR case

Return type:

list

get_soar_case(search_fields, open_cases=True, uri_filters='return_level=normal')

Find a SOAR case which contains custom field(s) associated with the endpoint. Returns only one case. See SOARCommon.get_soar_cases() for examples.

Note

search_fields only supports custom fields.

Parameters:
  • search_fields (dict) – Dictionary containing key/value pairs to search for a case match. Field values can be True/False for has_a_value or does_not_have_a_value, otherwise a field will use equals for the value.

  • uri_filters (str) – Filters for the end of the uri. Default is “return_level=normal”

  • open_cases (bool) – True if only querying open cases.

Returns:

A tuple with the matching case, if any, and any associated error message if something went wrong. Returns None if no associated case was found.

Return type:

tuple(dict, str)

get_soar_cases(search_fields, open_cases=True, uri_filters='return_level=normal')

Get all IBM SOAR cases that match the given search fields. To find all cases that are synced from the endpoint platform, provide the unique search field that matches the unique id of your endpoint solution.

Example:

from resilient_lib import SOARCommon

soar_common = SOARCommon(res_client)
found_id = get_id_from_endpoint_query_result()
cases = soar_common.get_soar_cases({ "endpoint_id": found_id }, open_cases=False)

Note

search_fields only supports custom fields.

Parameters:
  • search_fields (dict) – Dictionary containing key/value pairs to search for a case match. Field values can be True/False for has_a_value or does_not_have_a_value, otherwise a field will use equals for the value.

  • uri_filters (str) – Filters for the end of the uri. Default is “return_level=normal”

  • open_cases (bool) – True if only querying open cases.

Returns:

A tuple with a list of cases whose values match the search_fields and any associated error message.

Return type:

tuple(dict, str)
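
The search_fields rules described above (True/False for has_a_value or does_not_have_a_value, otherwise equals) can be sketched in plain Python as follows. The function name build_conditions and the layout of each condition dict are assumptions made for illustration. The query format the library actually sends to SOAR may differ.

```python
def build_conditions(search_fields):
    """Translate a search_fields dict into a list of illustrative conditions."""
    conditions = []
    for field_name, value in search_fields.items():
        if isinstance(value, bool):
            # True/False select a presence check instead of a comparison
            method = "has_a_value" if value else "does_not_have_a_value"
            conditions.append({"field_name": field_name, "method": method})
        else:
            # Any other value is compared with equals
            conditions.append(
                {"field_name": field_name, "method": "equals", "value": value}
            )
    return conditions


conditions = build_conditions({"endpoint_id": "abc-123", "endpoint_owner": True})
```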

lookup_artifact_type(artifact_type)

Return an artifact type based on its ID on SOAR. If not found, return None

Parameters:

artifact_type (str) – artifact type to search for in SOAR

Returns:

found artifact type if found in SOAR

Return type:

str

lookup_incident_types(incident_type_ids)

Return an incident type based on its ID. If not found, return None

Parameters:

incident_type_ids (list(str)) – list of incident type IDs to check for in SOAR

Returns:

list of incident types that match in SOAR what was given

Return type:

list(str)

update_soar_case(case_id, case_payload)

Update an IBM SOAR case (usually from a rendered Jinja template).

Parameters:
  • case_id (str|int) – ID of the case to update

  • case_payload (dict) – Fields to be updated and their values

Returns:

The updated SOAR case

Return type:

dict

eval_mapping(eval_value, wrapper=None)

Map a JSON string to a Python object. Safely evaluates the values with the use of ast.literal_eval and can ONLY convert to a list or a dictionary.

Note

wrapper is optional (but recommended) to ensure that the eval_value is properly formatted as JSON. If None, then assumes that the eval_value comes pre-formatted in acceptable JSON format

This method is intended to be used to safely take input strings from the user’s config file that will eventually need to be evaluated as JSON elsewhere in the code.

Example:

from resilient_lib import eval_mapping

config_value = app_configs.get("some_config")
# Assume that 'config_value' has a value of
# '"source":"A","tags":["tagA"],"priorities":[40,50]'

# parse the values to a dict, using a dictionary wrapper:
parsed_config = eval_mapping(config_value, "{{ {} }}")

# the returned value is a dict like:
# {
#   "source": "A",
#   "tags": ["tagA"],
#   "priorities": [40, 50]
# }

Parameters:
  • eval_value (str) – json fragment to evaluate

  • wrapper (str) – wrapper to put the source value into, for example "[{}]" or "{{ {} }}"

Returns:

converted data

Return type:

list|dict
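
To illustrate how the wrapper and ast.literal_eval work together, here is a minimal stand-in written with the standard library only. The name eval_mapping_sketch is hypothetical, and returning None for input that is not a list or dict is a choice made for this sketch, not confirmed behavior of eval_mapping.

```python
import ast


def eval_mapping_sketch(eval_value, wrapper=None):
    """Safely evaluate a JSON-like fragment to a list or dict, else None."""
    try:
        # "{{ {} }}".format(x) yields "{ x }" and "[{}]".format(x) yields "[x]"
        text = wrapper.format(eval_value) if wrapper else eval_value
        result = ast.literal_eval(text)
    except (ValueError, SyntaxError):
        # literal_eval rejects anything that is not a plain literal
        return None
    return result if isinstance(result, (list, dict)) else None


as_dict = eval_mapping_sketch('"source":"A","tags":["tagA"]', "{{ {} }}")
as_list = eval_mapping_sketch('"a","b"', "[{}]")
```

Because ast.literal_eval only accepts literals, an input such as a function call is rejected rather than executed.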

Common Jinja Methods

Common methods and filters used to process Jinja Templates

Usage:

  1. Create a Jinja template in the /util/templates directory of the App.

Note

Normally a template is in JSON format:

{
    "name": "{{ alert_name }}",
    "severity": "{{ alert_severity }}"
}

2. (Optional) Add an option called custom_template_path to your app.config file to specify an absolute path to a custom Jinja template. If it is not provided, the value of DEFAULT_TEMPLATE_PATH is used.

3. Add the following code to your Function:

import pkg_resources
from resilient_circuits import AppFunctionComponent, FunctionResult, app_function
from resilient_lib import make_payload_from_template

PACKAGE_NAME = "my_app"
FN_NAME = "my_function_using_jinja"

# Creating an absolute path to the template
DEFAULT_TEMPLATE_PATH = pkg_resources.resource_filename(PACKAGE_NAME, "util/templates/<default_name>.jinja2")

class FunctionComponent(AppFunctionComponent):

    def __init__(self, opts):
        super(FunctionComponent, self).__init__(opts, PACKAGE_NAME)

    @app_function(FN_NAME)
    def _app_function(self, fn_inputs):
        '''
        Function: A function showing how to use Jinja templates
        Inputs:
            -   fn_inputs.alert_severity
            -   fn_inputs.device_id
        '''

        custom_template_path = self.app_configs.custom_template_path if hasattr(self.app_configs, "custom_template_path") else ""

        template_rendered = make_payload_from_template(
            template_override=custom_template_path,
            default_template=DEFAULT_TEMPLATE_PATH,
            payload={
                "alert_name": f"Malware found on device {fn_inputs.device_id}",
                "alert_severity": fn_inputs.alert_severity
            },
            return_json=True
        )

        self.LOG.info(template_rendered)

        results = {"template_rendered": template_rendered}

        yield FunctionResult(results)

Output:

INFO [my_function_using_jinja] {'name': 'Malware found on device 303', 'severity': 'High'}

4. Update your README.md documentation file to include relevant information about the templates, for example:

## Templates for SOAR Cases
It may be necessary to modify the templates used to create SOAR cases based on a customer's required custom fields.
Below are the default templates used which can be copied, modified and used with app.config's
`custom_template_path` setting to override the default template.

### custom_template.jinja
```
{
    "name": "{{ alert_name }}",
    "severity": "{{ alert_severity }}"
}
```

base64_filter(val, indent=2)

base64

Breaks text into fixed-width blocks. You can specify the indent.

Parameters:
  • val (str) – The string to convert

  • indent (int) – Number of tabs

Returns:

Converted string

Return type:

str

camel_filter(val)

camel

Convert text to CamelCase.

Parameters:

val (str) – The string to convert

Returns:

Converted string

Return type:

str
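
The conversion can be approximated in plain Python as shown below. This is a sketch, assuming the filter title-cases the text and then removes non-word characters and underscores. The name camel_sketch is hypothetical and the library's own implementation may differ.

```python
import re


def camel_sketch(val):
    """Title-case each run of letters, then drop separators."""
    # "a#bc_def".title() gives "A#Bc_Def"; the regex then strips "#" and "_"
    return re.sub(r"[\W_]", "", val.title())


converted = camel_sketch("a#bc_def")
```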

Example:

This value is in camel case: {{ a#bc_def | camel }}
>>> 'ABcDef'

global_jinja_env()

Return the Jinja environment with our resilient-lib custom filters. This environment can be expanded upon to add additional custom filters.

See Jinja Custom Filters for more.

Current custom filters available:

json, js, html, url, idna, punycode, ldap, ps, sh, pretty, timestamp, iso8601, uniq, sample, camel, base64, soar_datetimeformat, soar_display_datetimeformat, soar_substitute, soar_splitpart, soar_trimlist

Returns:

The Jinja environment

Return type:

jinja2.Environment

Example:

from resilient_lib import global_jinja_env

addl_custom_filters = {
    "filter_name": method_name
}
env = global_jinja_env()
env.globals.update(addl_custom_filters)
env.filters.update(addl_custom_filters)

html_filter(val)

html

Produces HTML-encoded string of the value.

Parameters:

val (str) – The string to encode

Returns:

Encoded string

Return type:

str

idna_filter(val)

idna

Encodes the value per RFC 3490.

Parameters:

val (str) – The string to encode

Returns:

Encoded string

Return type:

str

iso8601(val)

iso8601

Assuming val is an epoch milliseconds timestamp, produce ISO8601 datetime.

Parameters:

val (str|int) – An epoch milliseconds timestamp

Returns:

ISO8601 datetime

Return type:

str
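
A minimal sketch of the conversion using only the standard library is shown below. The name iso8601_sketch is hypothetical, and the exact string layout produced by the library's filter (for example, whether a UTC offset is included) may differ from this sketch.

```python
from datetime import datetime, timezone


def iso8601_sketch(val):
    """Convert an epoch milliseconds timestamp to an ISO8601 string in UTC."""
    # Accept str or int input, then scale milliseconds down to seconds
    seconds = int(val) / 1000
    return datetime.fromtimestamp(seconds, tz=timezone.utc).isoformat()


formatted = iso8601_sketch(1436918400000)
```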

js_filter(val)

js

Produces JSONified string of the value, without surrounding quotes.

Parameters:

val (str) – The string to convert

Returns:

JSONified string of the value, without surrounding quotes

Return type:

str

json_filter(val, indent=0)

json

Produces JSONified string of the value.

Parameters:

val (str) – The string to convert

Returns:

JSONified string of the value

Return type:

str
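
The difference between the json and js filters can be sketched with the standard json module. This assumes that js behaves like json with the surrounding quotes removed, as its description states. The names json_sketch and js_sketch are hypothetical stand-ins, not the library functions.

```python
import json


def json_sketch(val):
    """JSON-encode a value, including the surrounding quotes for strings."""
    return json.dumps(val)


def js_sketch(val):
    """JSON-encode a string, then strip the surrounding quotes."""
    return json.dumps(val)[1:-1]


quoted = json_sketch('1"23')    # safe to place unquoted in a JSON template
unquoted = js_sketch('1"23')    # safe to place inside existing quotes
```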

ldap_filter(val)

ldap

Produces LDAP-encoded string of the value.

Parameters:

val (str) – The string to encode

Returns:

Encoded string

Return type:

str

make_payload_from_template(template_override, default_template, payload, return_json=True)

Convert a payload into a new format based on a specified template.

Parameters:
  • template_override (str) – Path to the specified template (usually taken from the app.config file. See the Usage example above)

  • default_template (str) – Path to the default template (usually in the ‘/util/templates’ directory. See the Usage example above)

  • payload (dict) – dict of payload that is passed to Jinja template

  • return_json (bool) – False if template should be rendered as a str and results returned as a str

Returns:

If the Jinja template is valid JSON and return_json is True the result is returned as a dict else it returns the rendered template as a str

Return type:

str|dict

Raises:

ValueError – if return_json is True and the Jinja template is not valid JSON

pretty_filter(val, indent=2)

pretty

Produces pretty-printed string of the value.

Parameters:
  • val (str) – The string to format

  • indent (int) – Number of tabs to use when formatting

Returns:

The formatted string

Return type:

str

ps_filter(val)

ps

Escapes characters in val for use in a PowerShell command line.

Parameters:

val (str) – The string to escape

Returns:

Escaped string

Return type:

str

punycode_filter(val)

punycode

Encodes the value per RFC 3492.

Parameters:

val (str) – The string to encode

Returns:

Encoded string

Return type:

str
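
Python ships codecs for both RFC 3490 (idna) and RFC 3492 (punycode), so the two encodings can be compared with a short standard-library sketch. Whether the library filters are built on these codecs is an assumption. The sample values are hypothetical.

```python
# "b\u00fccher" is the German word for "books", written with a u-umlaut
hostname = "b\u00fccher.example"
label = "b\u00fccher"

# IDNA encodes each label of a hostname and adds the "xn--" prefix where needed
idna_encoded = hostname.encode("idna").decode("ascii")

# Punycode encodes a single string and adds no prefix
punycode_encoded = label.encode("punycode").decode("ascii")
```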

render(template, data)

Render data into a template, producing a string result. All the additional custom filters are available.

Parameters:
  • template (str or dict) – Path to or a dict of the Jinja template

  • data (dict) – JSON data to apply to the template

Returns:

result from the rendering of the template. The template is usually a string, but can be a dict

Return type:

str or dict

Examples:

>>> render("template {{value}}", {"value":"123"})
u'template 123'

>>> render({"template": "{{value}}"}, {"value":"123"})
u'{"template": "123"}'

You can escape values using the 'json' filter,
or the 'url' or 'html' or 'ldap' filters.

>>> render('{"template": {{value|json}} }', {"value":'1"23'})
u'{"template": "1\\"23" }'

>>> render('{"template": "{{value|js}}" }', {"value":'1"23'})
u'{"template": "1\\"23" }'

>>> render('{"template": {{value|ldap}} }', {"value":'1*23'})
u'{"template": 1\\2a23 }'

>>> render('shell "{{value|ps}}"', {"value":'$"foo"'})
u'shell "`$`"foo`""'

>>> render('shell "{{value|sh}}"', {"value":'$"foo"'})
u'shell "\\$\\"foo\\""'

>>> render('template={{value|timestamp}}', {"value":0})
u'template=0'

>>> render('template={{value|timestamp}}', {})
u'template=null'

>>> render('template={{value|timestamp}}', {"value":{"year":2015, "month":7, "day":15}})
u'template=1436918400000'

>>> render('template={{value|timestamp}}', {"value":datetime.datetime(2015, 7, 15)})
u'template=1436918400000'

render_json(template, data)

Render data into a template, producing a JSON result. Also clean up any “really bad” control characters to avoid failure.

Parameters:
  • template (str or dict) – Path to or a dict of the Jinja template

  • data (dict) – dict to apply to the template

Returns:

result from the rendering of the template as a dictionary

Return type:

dict

Examples:

>>> d = {"value": "the" + chr(10) + "new" + chr(10) + "thing"}
>>> render_json('{"result":"{{value}}"}', d)
{u'result': u'the new thing'}

>>> d = {"value": "the" + chr(1) + "new" + chr(9) + "thing"}
>>> render_json('{"result":"{{value}}"}', d)
{u'result': u'the new thing'}

sample_filter(val, count=None)

sample

Return a random sample from a list.

Parameters:
  • val (list) – List of str | obj | int

  • count (int | None) – Number of times to repeat items in val to increase its random probability

Returns:

The random item

Return type:

str | obj | int

sh_filter(val)

sh

Escapes characters in val for use in a Unix shell command line.

Parameters:

val (str) – The string to escape

Returns:

Escaped string

Return type:

str

soar_datetimeformat(value, date_format='%Y-%m-%dT%H:%M:%S', split_at=None)

soar_datetimeformat

Convert UTC dates to epoch format.

Parameters:
  • value (str) – The UTC date string

  • date_format (str) – (optional) Conversion format. Defaults to "%Y-%m-%dT%H:%M:%S"

  • split_at (str) –

    (optional) Character to split the date field to scope the date field

    split_at='.' to remove milliseconds for "2021-10-22T20:53:53.913Z"
    split_at='+' to remove tz information "2021-10-22T20:53:53+00:00"
    

Returns:

Epoch value of datetime, in milliseconds

Return type:

int
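
The effect of split_at and date_format can be sketched with the standard library as follows. The name soar_datetimeformat_sketch is hypothetical, and the sketch assumes the text before split_at is parsed with date_format and interpreted as UTC.

```python
import calendar
from datetime import datetime


def soar_datetimeformat_sketch(value, date_format="%Y-%m-%dT%H:%M:%S", split_at=None):
    """Convert a UTC date string to epoch milliseconds."""
    if split_at:
        # Keep only the part before the separator, e.g. drop ".913Z" or "+00:00"
        value = value.split(split_at)[0]
    parsed = datetime.strptime(value, date_format)
    # timegm treats the time tuple as UTC, unlike time.mktime
    return calendar.timegm(parsed.timetuple()) * 1000


without_millis = soar_datetimeformat_sketch("2021-10-22T20:53:53.913Z", split_at=".")
without_tz = soar_datetimeformat_sketch("2021-10-22T20:53:53+00:00", split_at="+")
```

Both calls scope the input to "2021-10-22T20:53:53" before parsing, so they produce the same epoch value.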

soar_splitpart(value, index, split_chars=' - ')

soar_splitpart

Split a string and return the part at the given index.

Parameters:
  • value (str) – string to split

  • index (int) – index of split to return

  • split_chars (str) – (optional) split characters. Defaults to ' - '

Returns:

value of split. If index is out of bounds, the original value is returned

Return type:

str
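
A minimal stand-in for the documented behavior, including the out-of-bounds fallback, could look like this. The name soar_splitpart_sketch is hypothetical, and treating negative indexes as out of bounds is an assumption of the sketch.

```python
def soar_splitpart_sketch(value, index, split_chars=" - "):
    """Return the part of value at index, or value itself if out of bounds."""
    parts = value.split(split_chars)
    if 0 <= index < len(parts):
        return parts[index]
    # Out of bounds: fall back to the original value
    return value


second_part = soar_splitpart_sketch("High - Malware detected", 1)
fallback = soar_splitpart_sketch("High - Malware detected", 5)
```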

soar_substitute(value, json_str)

soar_substitute

Replace values based on a lookup dictionary.

Parameters:
  • value (str) – original value to lookup

  • json_str (JSON encoded str) – JSON encoded string with key/value pairs of lookup values

Returns:

replacement value or original value if no replacement found

Return type:

str | int
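
The lookup can be sketched with the standard json module. The name soar_substitute_sketch and the sample mapping are hypothetical. The sketch covers only the behavior documented above: return the replacement if one exists, otherwise the original value.

```python
import json


def soar_substitute_sketch(value, json_str):
    """Look up value in a JSON-encoded mapping, falling back to value."""
    lookup = json.loads(json_str)
    return lookup.get(value, value)


# Hypothetical mapping from an endpoint's severity names to numeric codes
severity_map = '{"High": 3, "Medium": 2, "Low": 1}'
replaced = soar_substitute_sketch("High", severity_map)
unchanged = soar_substitute_sketch("Unknown", severity_map)
```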

soar_trimlist(org_list)

soar_trimlist

Trim whitespace from elements in a list.

Parameters:

org_list (list of strings) – list of elements to trim whitespace from

Returns:

list with elements trimmed of whitespace

Return type:

list
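
In plain Python the trimming amounts to a list comprehension, sketched below. The name soar_trimlist_sketch is hypothetical, and the sketch assumes every element is a string.

```python
def soar_trimlist_sketch(org_list):
    """Strip leading and trailing whitespace from each string in the list."""
    return [item.strip() for item in org_list]


trimmed = soar_trimlist_sketch([" alpha", "beta ", "  gamma  "])
```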

timestamp(val)

timestamp

Try to convert non-timestamp values to a timestamp.

Parameters:

val (str | dict) – Either "now" or a dict containing year / month / day etc.

Returns:

An epoch milliseconds timestamp

Return type:

int

>>> timestamp({"year": 2018, "month": 8, "day": 1, "timezoneID": "CET"})
1533078000000

>>> timestamp(Undefined())
'null'

>>> timestamp("now") > 1530000000000
True

>>> timestamp("now") > 2000000000000 # 2033
False

uniq(val, key=None)

uniq

Produce the unique list. If val is a dict, produce unique list of key values.

Parameters:
  • val ([str | int | obj]) – The original list

  • key (str) – If val is a dict return a list with dicts with just that key

Returns:

Original list of items with duplicates removed

Return type:

list

>>> sorted(uniq([1,2,3,2]))
[1, 2, 3]

>>> sorted(uniq([ {"a":1}, {"a":2}, {"a":3}, {"a":2}]))
[{'a': 1}, {'a': 2}, {'a': 3}]

>>> sorted(uniq([ {"a":1}, {"a":2}, {"a":3}, {"a":2}, Exception()], "a"))
[{'a': 1}, {'a': 2}, {'a': 3}, Exception()]

url_filter(val)

url

Produces URL-encoded string of the value.

Parameters:

val (str) – The string to encode

Returns:

Encoded string

Return type:

str

Exceptions

exception IntegrationError(value)

Class used to signal Integration Errors. It doesn’t add any specific information other than identifying the type of error

from resilient_lib import IntegrationError

raise IntegrationError("Example raising custom error")

Change Log

2024-07: version 51.0.2.1

  • All IBM SOAR Python libraries now only officially support Python 3.9, 3.11, and 3.12. To continue using SOAR libraries on earlier versions of Python, use v51.0.2.0.974

2024-05: version 51.0.2.0

  • Added official support for Python 3.12

2024-04: version 51.0.1.1

  • No major changes. Just bumping build number to coincide with other builds

2024-02: version 51.0.1.0

  • All SOAR Python libraries now officially support Python 3.11

  • Updated pytz dependency requirement to ~= 2024.1

  • Updated jinja2 dependency requirement to ~= 3.1 to address CVE-2024-22195

2024-01: version 51.0.0.2

2023-11: version 51.0.0.0

2023-10: version 50.1

  • Unit test improvements to better handle flaky requests tests

2023-08: version 50.0

  • No major changes. Just bumping build number to coincide with other builds

2023-07: version 49.1

  • No major changes. Just bumping build number to coincide with other builds

2023-05: version 49.0

  • Added new functions for pollers to interact with SOAR

2023-04: version 48.1

  • Updated SOAR case default filters for poller helper methods

2023-02: version 48.0

  • Added support for verify config in app.config when using resilient_lib.RequestsCommon.execute(). verify now can be set to True, False, or <path_to_your_custom_CA_bundle> to control the value used when verifying SSL certificates. This change is backward compatible with any apps that use RequestsCommon.execute() so any apps updated to run on v48.0 will pull in these changes and the verify configuration will be supported for that app

  • Updated resilient_lib.RequestsCommon to take advantage of persistent sessions from the requests library using requests.Session(). This has multiple advantages, including cookies persisting from the endpoint and significant performance increases when connecting to the same host. If non-session objects are desired, switch your code to use the new class resilient_lib.RequestsCommonWithoutSession which behaves the same as the old RequestsCommon would

  • Updated project to use pyproject.toml and setup.cfg metadata files. Build backend continues to use setuptools. Instead of directly invoking setup.py to get a sdist or wheel, use build as a build frontend:

    pip install build
    python -m build
    

2022-12: version 47.1

2022-11: version 47.0

  • Support for poller-based apps added with new Common Poller Methods, including resilient_lib.poller() decorator and resilient_lib.SOARCommon class of common SOAR methods

2022-08: version 46.0

  • build_incident_url urlencodes its orgId

2022-07: version 45.1

  • No major changes. Just bumping build number to coincide with other builds

2022-05: version 45.0

  • Added build_task_url to build direct links to incident Tasks

  • Updated build_incident_url to support appending Organization ID queries

  • Added client_auth_cert and client_auth_key options for client side certificates. These can be specified per app in the relevant [fn_some_app] section of the app.config and will be used in any requests made through resilient_lib.RequestsCommon.

2022-04: version 44.1

2022-02: version 44.0

  • Ensure tests/ is not included in packaged code

  • Officially support Python 3.9

2022-01: version 43.1

  • No major changes. Just bumping build number to coincide with other builds

2021-11: version 43.0

2021-11: version 42.3

  • No major changes. Just bumping build number to coincide with other builds

2021-10: version 42.2

  • No major changes. Just bumping build number to coincide with other builds

2021-08: version 42.1

  • No major changes. Just bumping build number to coincide with other builds

2021-08: version 42.0

  • Added support for HTTP_PROXY, HTTPS_PROXY and NO_PROXY environmental variables. See the App Host Deployment Guide for details.

  • Fix to OAuth2 to avoid infinite loop in some circumstances.

2021-06: version 41.1

  • Added execute as an alias for execute_call_v2

  • Bug fixes

2021-05: version 41.0

  • No major changes. Just bumping build number to coincide with other builds

2021-03: version 40.2

  • Bug fix to use setuptools_scm < 6.0.0 for Python 2.7 environments

2021-03: version 40.1

  • No major changes. Just bumping build number to coincide with other builds

2021-02: version 40.0

  • bug fixes

2020-12: version 39

  • add a capability to close an incident

  • fixes a bug where timeout defined in function section is not processed as an int

2020-09: version 38

  • validate_fields now handles a field if its type is Text with value string

2020-05: version 37

  • execute_call_v2 might give an error when debugging with PyCharm. Added a workaround in a comment above the line where it occurs.

2020-04: version 36.2.dev

  • execute_call_v2 might give an error when debugging with PyCharm. Added a workaround in a comment above the line where it occurs.

2020-01-16: version 35.0.195

  • Added OAuth2 Client Credentials workflow handler

  • Added support for timeout argument in integrations config section

2019-10-07: version 34.0.194

  • Add functionality to resilient_lib to upload attachment

  • Added support for a timeout parameter across all integrations using the execute_call_v2() function

2019-08-02: version 33.0.189

  • Added support for API key and API key secret, now able to authenticate using API keys instead of email/password

  • New functions added to resilient-lib
    • get_file_attachment_metadata()

    • write_to_tmp_file()

  • Updated validate_fields() function in resilient-lib, adding the ability to validate fields in app.config

  • Other minor bug fixes.

2019-07-03: version 32.0.186

  • Added more flexible execute_call_v2() method in resilient-lib

  • Fix for deprecated log warnings

  • Other minor bug fixes

2019-04-12: version 32.0.140

  • Improvements to resilient-lib

  • Other minor bug fixes/improvements

2019-03-06: version 32.0.126

  • Removed selftest from function template

  • Improvements to resilient-lib

2019-01-15: version 32.0

  • Added resilient-lib to repo and PyPi - common library calls which facilitate the development of functions for IBM Resilient

  • Added Sphinx documentation builder