resilient-lib¶
Python module with library calls which facilitate the development of Apps for IBM SOAR
Usage¶
resilient-lib
is now a direct dependency of resilient-circuits
as of v40.0 to help make development more streamlined
It can also be installed directly with:
$ pip install resilient-lib
To use it within your App development, import it like any other Python Package:
from resilient_lib import readable_datetime
created_date = readable_datetime(fn_inputs.inc_create_date, rtn_format="%m-%d-%Y %H:%M:%S")
Proxies¶
You can now take advantage of the global proxy functionality.
There is no need to add an [integrations]
section like below.
Run the following on your App Host for more information:
$ manageAppHost proxy -h
The hierarchy of proxies is as follows:
RequestsCommon.execute() Function: the
proxies
parameter.Environmental Variables:
HTTPS_PROXY
,HTTP_PROXY
andNO_PROXY
set using themanageAppHost proxy
command on the App Host.Function Options:
http_proxy
orhttps_proxy
configs set in the Function Section ([my_function]
) of your app.config file.Integrations Options:
http_proxy
orhttps_proxy
configs set in the Integrations Section ([integrations]
) of your app.config file.
If using a version of App Host earlier than 1.6, resilient-lib
supports an [integrations]
section in your app.config file.
Add this section to define proxy settings to be used for all integrations that use this library.
[integrations]
verify=True
# These proxy settings will be used by all integrations.
# To override, add any parameter to your specific integration section
http_proxy=
https_proxy=
timeout=30
Certificate Verification¶
You can now take advantage of the global verify functionality.
Valid options for verify
are True, False, or a path to a
valid certificate chain file. The default (and suggested value)
is True
which will use the default Python CA bundle found at
REQUESTS_CA_BUNDLE
.
The hierarchy of proxies is as follows:
RequestsCommon.execute() Function: the
verify
parameter.Function Options:
verify
config set in the Function Section ([my_function]
) of your app.config file.Integrations Options:
verify
config set in the Integrations Section ([integrations]
) of your app.config file.
Example:
[my_function]
verify=/var/rescircuits/cafile.pem
http_proxy=
https_proxy=
timeout=30
Common Helper Methods¶
- build_incident_url(url, incidentId, orgId=None)¶
Build the url to link to a SOAR incident or CP4S case. Add
https
if http/https is not provided at the start. Ifurl
is not a string, returns back the value given.orgId
is optional to maintain backward compatibility, however, it is strongly recommended to provide the organization ID of the incident so that links work without unexpected hiccups when multiple orgs are available on your SOAR instanceReturns a URL in the format
https://<url>/#<incident_id>?orgId=<orgId>
.- Parameters:
url (str) – the URL of your SOAR instance
incidentId (str|int) – the id of the incident
orgId (str|int|None) – (optional) the id of the org the incident lives in. If the user is logged into a different org and this is not set, the link produced may direct the user to a different incident resulting in unexpected results
- Returns:
full URL to the incident
- Return type:
str
- build_resilient_url(host, port)¶
Build basic url to SOAR/CP4S instance. Add ‘https’ if http/https is not provided at the start. If
host
is not a string, returns back the value given.- Parameters:
host (str) – host name
port (str|int) – port
- Returns:
base url
- Return type:
str
- build_task_url(url, incident_id, task_id, org_id)¶
Build the url to link to a SOAR/CP4S task. Add
https
if http/https is not provided at the start. Ifurl
is not a string, returns back the value given.Returns a URL in the format
https://<url>/#<incident_id>?orgId=<org_id>&taskId=<task_id>&tabName=details
.- Parameters:
url (str) – the URL of your SOAR instance
incident_id (str|int) – the id of the incident
task_id (str|int) – the id of the task
org_id (str|int) – the id of the org the incident lives in
- Returns:
full URL to the task’s details tab
- Return type:
str
- clean_html(html_fragment)¶
SOAR textarea fields return HTML fragments. This routine removes the HTML and inserts any code within
<div></div>
with a linefeed.Note
The string returned from this method may not format well as no presentation of line feeds are preserved, tags such as
<br>
or<ol>
,<ul>
, etc. are removed. SeeMarkdownParser
class for a better way to translate html input to markdown.- Parameters:
html_fragment (str) – the html to clean up
- Returns:
cleaned up code
- Return type:
str
- close_incident(res_client, incident_id, kwargs, handle_names=False)¶
Close an incident in SOAR.
Any required on close (roc) fields that are needed, pass them as a
field_name:field_value
dict inkwargs
If any roc select field needs to be identified as its name, set
handle_names
toTrue
Example:
res = close_incident( res_client=self.rest_client(), incident_id=fn_inputs.incident_id, kwargs={"resolution_id": "Duplicate", "resolution_summary": "This ticket is a duplicate"}, handle_names=True )
- Parameters:
res_client (resilient_circuits.ResilientComponent.rest_client()) – required for communication back to SOAR
incident_id (int|str) – id of the incident
kwargs (dict) – required fields needed to close an incident in a
field_name:field_value
formathandle_names (bool) – if
True
, any select field types inkwargs
will takestr
instead ofint
as their value
- Returns:
Response from the server indicating if the incident was closed or not
- Return type:
requests.Response
- get_file_attachment(res_client, incident_id, artifact_id=None, task_id=None, attachment_id=None)¶
Call the SOAR REST API to get the attachment or artifact data for an Incident or a Task
If
incident_id
andartifact_id
are defined it will get that ArtifactIf
incident_id
andattachment_id
are defined it will get that Incident AttachmentIf
incident_id
,task_id
andattachment_id
are defined it will get that Task Attachment
Note
The
artifact_id
must reference an Artifact that is a downloadable type or it will raise aresilient.SimpleHTTPException
Example:
artifact_data = get_file_attachment(self.rest_client(), incident_id=2001, artifact_id=1) with open("malware.eml", "wb") as f: f.write(artifact_data)
- Parameters:
res_client (resilient_circuits.ResilientComponent.rest_client()) – required for communication back to SOAR
incident_id (int|str) – id of the Incident
artifact_id (int|str) – id of the Incident’s Artifact to download
task_id (int|str) – id of the Task to download it’s Attachment from
attachment_id (int|str) – id of the Incident’s Attachment to download
- Returns:
byte string of attachment
- Return type:
str
- get_file_attachment_metadata(res_client, incident_id, artifact_id=None, task_id=None, attachment_id=None)¶
Call the SOAR REST API to get the attachment or artifact attachment metadata
- Parameters:
res_client (resilient_circuits.ResilientComponent.rest_client()) – required for communication back to SOAR
incident_id (int|str) – id of the Incident
artifact_id (int|str) – id of the Incident’s Artifact
task_id (int|str) – id of the Task to get it’s Attachment metadata
attachment_id (int|str) – id of the Incident’s Attachment
- Returns:
File metadata returned from endpoint
- Return type:
dict
- get_file_attachment_name(res_client, incident_id=None, artifact_id=None, task_id=None, attachment_id=None)¶
Call the SOAR REST API to get the attachment or artifact attachment name
- Parameters:
res_client (resilient_circuits.ResilientComponent.rest_client()) – required for communication back to SOAR
incident_id (int|str) – id of the Incident
task_id (int|str) – id of the Task
attachment_id (int|str) – id of the Incident’s Attachment to get the name for
- Returns:
name of the Attachment or Artifact
- Return type:
str
- readable_datetime(timestamp, milliseconds=True, rtn_format='%Y-%m-%dT%H:%M:%SZ')¶
Convert an epoch timestamp to a string using a format
- Parameters:
timestamp (int) – ts of object sent from SOAR Server i.e.
incident.create_date
milliseconds (bool) – Set to
True
if ts in millisecondsrtn_format (str) – Format of resultant string. See https://docs.python.org/3.6/library/datetime.html#strftime-and-strptime-behavior for options
- Returns:
string representation of timestamp
- Return type:
str
- str_to_bool(value)¶
Convert value to either a
True
orFalse
booleanReturns
False
ifvalue
is anything other than:'1', 'true', 'yes' or 'on'
- Parameters:
value (str) – the value to convert
- Returns:
True
orFalse
- Return type:
bool
- unescape(data)¶
- Return unescaped data such as:
>
converts to>
"
converts to'
- Parameters:
data (str) – text to convert
- Returns:
the text unescaped
- Return type:
str
- validate_fields(field_list, kwargs)¶
Ensure each mandatory field in
field_list
is present inkwargs
. Throw ValueError if not.field_list
can be a list/tuple ofstrings
where each string is a field name or it can be a list/tuple ofdicts
where each item has the attributesname
(required) andplaceholder
(optional).kwargs
can be a dict or namedtuple. If a namedtuple, it calls itskwargs._as_dict()
method and raises aValueError
if it does not succeed.If the value of the item in
kwargs
is equal to itsplaceholder
defined infield_list
, aValueError
is raised.If an item in
kwargs
is a Resilient Select Function Input, its value will be adict
that has aname
attribute. This returns the value ofname
.If an item in
kwargs
is a Resilient Multi-Select Function Input, its value will be a list ofdicts
that have thename
attribute. This returns a list of thename
values for that item.
- Parameters:
field_list (list|tuple) – the mandatory fields. Can be an empty list if no mandatory fields.
kwargs (dict|namedtuple) – dict or a namedtuple of all the fields to search.
- Raises:
ValueError – if a field is missing
- Returns:
a Dictionary of all fields with Select/Multi-Select fields handled.
- Return type:
dict
- write_file_attachment(res_client, file_name, datastream, incident_id, task_id=None, content_type=None)¶
Add a file attachment to SOAR using the REST API to an Incident or a Task
Example:
with open("malware.eml", "rb") as data_stream: res = write_file_attachment(self.rest_client(), "malware.eml", data_stream, 2001)
- Parameters:
res_client (
ResilientComponent.rest_client()
) – required for communication back to SOARfile_name (str) – name of the attachment to create
dataStream (stream of bytes) – stream of bytes used to create the attachment
incident_id (int|str) – id of the Incident
task_id (int|str) – (optional) id of the Task
content_type – (optional) MIME type of attachment. Default is
"application/octet-stream"
content_type – str
- Returns:
metadata of new attachment created
- Return type:
dict
- write_to_tmp_file(data, tmp_file_name=None, path_tmp_dir=None)¶
Writes data to a file in a safely created temp directory.
If no
tmp_file_name
is provided, a temp name will be givenIf no
path_tmp_dir
is provided a temp directory is created with the prefix resilient-lib-tmp-
Note
When used, ensure you safely remove the created temp directory in the
finally
block of theFunctionComponent
code.Example:
import os import shutil try: path_tmp_file, path_tmp_dir = write_to_tmp_file(attachment_contents, tmp_file_name=attachment_metadata.get("name")) except Exception: yield FunctionError() finally: if path_tmp_dir and os.path.isdir(path_tmp_dir): shutil.rmtree(path_tmp_dir)
- Parameters:
data (bytes) – bytes to be written to the file
tmp_file_name (str) – name to be given to the file
path_tmp_dir (str) – path to an existing directory to use as the temp dir
- Returns:
a tuple (path_tmp_file, path_tmp_dir)
- Return type:
tuple
- class MarkdownParser(bold='**', italic='*', underline='__', strikeout='~~', bullets='*', number=0, indent=4, monospace=['{{', '}}'], headers=['h1.', 'h2.', 'h3.', 'h4.', 'h5.', 'h6.'], blockquote='```')¶
Convert HTML text into Markdown. A wrapper for html.parser.HTMLParser
Example:
from resilient_lib import MarkdownParser data = "<div class='rte'><div><strong><u>underline and strong</u></strong></div></div>" markdown = "*_underline and strong_*" parser = MarkdownParser(bold="*", underline="_") # override defaults converted = parser.convert(data) self.assertEqual(converted, markdown)
- MarkdownParser.convert(self, data)¶
Converts html
data
to markdown and returns the converted string- Parameters:
data (str) – html text to convert
- Returns:
converted text to markdown
- Return type:
str
- class OAuth2ClientCredentialsSession(url=None, client_id=None, client_secret=None, scope=None, proxies=None)¶
Wrapper around requests.Session which receives authentication tokens through the
client_credentials
type of OAuth2 grant, and adds tokens to the requests made through the session. It also attempts to keep track of the token and refresh it as needed.If proxies are defined, every request will use them.
This session does not request authorization from the user first. The scope should be pre-authorized.
Example:
from resilient_lib import OAuth2ClientCredentialsSession, RequestsCommon api1 = OAuth2ClientCredentialsSession('https://example1.com/<tenant_id>/oauth/v2/', client_id='xxx', client_secret='xxx') api2 = OAuth2ClientCredentialsSession('https://example2.com/<tenant_id>/oauth/v2/', client_id='xxx', client_secret='xxx') api1.post('https://example1.com/v4/me/messages', data={}) # use as a regular requests session object api2.get('https://example2.com/v2/me/updates') # When writing an integration, use RequestsCommon to get the proxies defined in in your app.config file. rc = RequestsCommon(opts, function_opts) api3 = OAuth2ClientCredentialsSession('https://example3.com/{}/test', proxies=rc.get_proxies())
- Parameters:
url (str) – Authorization URL, with tenant_id in it, if required
client_id (str) – API key/User ID
client_secret (str) – secret for API key
scope ([str]) – (optional) list of scopes
proxies (dict) –
(optional) Dictionary mapping protocol to the URL of the proxy. The mapping protocol must be in the format:
{ "https_proxy": "https://localhost:8080, "http_proxy": "http://localhost:8080 }
- add_authorization_header(headers)¶
Create headers needed for authentication/authorization, overriding the default ones if needed.
- Parameters:
headers (dict) – Dictionary containing the
Authorization
header
- authenticate(url, client_id, client_secret, scope, proxies=None)¶
Authenticate with the endpoint
- get_token(token_url, client_id, client_secret, scope=None, proxies=None)¶
Override this method if the request needs specific information in the body of the request
For example, Cloud Foundry asking
grant_type
to bepassword
and API key to be passed inpassword
.The default is:
post_data = { 'client_id': client_id, 'client_secret': client_secret, 'grant_type': 'client_credentials' } if scope: post_data['scope'] = scope
- Parameters:
token_url (str) – Authorization URL, with tenant_id in it, if required
client_id (str) – API key/User ID
client_secret (str) – secret for API key
scope ([str]) – (optional) list of scopes
proxies (dict) – (optional) Dictionary mapping protocol to the URL of the proxy.
- Returns:
the
response
from thetoken_url
- Return type:
- request(method, url, *args, **kwargs)¶
Constructs a requests.Request, injects it with tokens and sends it in a new session to avoid having one session constantly open.
This uses the
requests.request()
function to make a call. The inputs are mapped to this function. See requests.request() for information on any parameters available- Returns:
the
response
object- Return type:
- update_token()¶
Institutes a request for a new access token.
- Raises:
ValueError – If it cannot update the token
- Returns:
True if could get a new token
- Return type:
bool
Common Request Methods¶
- class RequestsCommon(opts=None, function_opts=None)¶
This class represents common functions around the use of the requests package for REST based APIs. It incorporates the app.config section
[integrations]
which can be used to define a common set of proxies for use by all functions using this library.Any similar properties in the function’s section would override the [integrations] properties.
Note
In the Atomic Function template, as of version 41.1,
RequestsCommon
is instantiated and available in a class that inheritsAppFunctionComponent
as anrc
attribute:response = self.rc.execute(method="GET", url="ibm.com")
- Parameters:
opts (dict) – all configurations found in the app.config file
function_opts (dict) – all configurations found in the
[my_function]
section of the app.config file
- RequestsCommon.execute(self, method, url, timeout=None, proxies=None, callback=None, clientauth=None, verify=None, retry_tries=1, retry_delay=1, retry_max_delay=None, retry_backoff=1, retry_jitter=0, retry_exceptions=<class 'requests.exceptions.HTTPError'>, **kwargs)¶
Constructs and sends a request. Returns a requests.Response object.
This uses the
requests.request()
function to make a call. The inputs are mapped to this function. See requests.request() for information on any parameters available, but not documented here.Retries can be achieved through the parameters prefixed with
retry_<param>
. Retry is only available in PY3+.- Parameters:
method (str) – Rest method to execute (
GET
,POST
, etc…)url (str) – URL to execute request against
timeout (float or tuple) – (Optional) Number of seconds to wait for the server to send data before sending a float or a timeout tuple (connect timeout, read timeout). See requests docs for more. If
None
it looks in the[integrations]
section of your app.config for thetimeout
setting.proxies (dict) –
(Optional) Dictionary mapping protocol to the URL of the proxy. The mapping protocol must be in the format:
{ "https_proxy": "https://localhost:8080, "http_proxy": "http://localhost:8080 }
callback (function) –
(Optional) Once a response is received from the endpoint, return this callback function passing in the
response
as its only parameter. Can be used to specifically handle errors. Ifcallback
is given, any retry parameters will be ignored unless thecallback
function raises an error in the parameterretry_exceptions
(see below) in which case the retry logic will kick in.Example:
from resilient_lib import IntegrationError, RequestsCommon def custom_callback(response): """ custom callback function to handle 400 error codes """ if response.status_code >= 400 and response.status_code < 500: # raise ValueError which will be retried raise ValueError("retry me") else: # all other status codes should return normally # note this bypasses the normal rc.execute logic which # would raise an error other 500 errors return response rc = RequestsCommon() try: # will retry 3 times then will raise IntegrationError response = rc.execute("GET", "https://postman-echo.com/status/404", callback=custom_callback, retry_tries=3, retry_exceptions=ValueError) except IntegrationError as err: print(err)
clientauth (str or tuple(str, str)) –
(Optional) Equivalent to the
cert
parameter ofrequests
. Client-side certificates can be configured automatically in the app’s section of the app.config. These values will be read in automatically byRequestsCommon
whenexecute
is called.Example:
[resilient] ... # configs for connection to SOAR [fn_my_app] ... # some other configs client_auth_cert=<path_to_client_auth_cert_file> client_auth_key=<path_to_client_auth_private_key_file>
If not set in the app.config file, the filepath for the client side certificate and the private key can be passed to this function either as the path to a single file or as a tuple of both files’ paths.
verify (bool or str) –
(Optional) Either a boolean, in which case it controls whether requests verifies the server’s TLS certificate, or a string, in which case it must be a path to a CA bundle to use. Defaults to
None
as passed in here. IfNone
, then the value is searched for in the app’s section of the app.config. If no value is found, then the requests.request() default ofTrue
is used. In that case, the CA bundle found atrequests.utils.DEFAULT_CA_BUNDLE_PATH
is the CA bundle used, which is usually a bundle provided by Mozilla. Settingverify=True
is the safest option and should only be changed if you have a self-signed certificate or you want to bypass SSL for testing purposes. A production level app should never disable SSL verification completely.Example:
[fn_my_app] ... # some other configs verify=<path/to/CA/bundle/to/use>
Note
The value held in the app’s config will be overwritten if a value is passed in by the call to
execute
.retry_tries (int) – (PY3 only) The maximum number of attempts. Default:
1
(no retry). Use-1
for unlimited retries. Matchestries
parameter of retry.api.retry_call.retry_delay (int) –
(PY3 only) Initial delay between attempts. Default:
1
. Matchesdelay
parameter of retry.api.retry_call.retry_max_delay (int) –
(PY3 only) The maximum value of delay. Default:
None
(no limit). Matchesmax_delay
parameter of retry.api.retry_call.retry_backoff (int) –
(PY3 only) Multiplier applied to delay between attempts. Default:
1
(no backoff). Matchesbackoff
parameter of retry.api.retry_call.retry_jitter (int | tuple(int, int)) –
(PY3 only) Extra seconds added to delay between attempts. Default:
0
. Fixed if a number, random if a range tuple (min, max). Matchesjitter
parameter of retry.api.retry_call.retry_exceptions (Exception | tuple(Exception)) –
(PY3 only) An exception or a tuple of exceptions to catch. Default:
requests.exceptions.HTTPError
. Matchesexceptions
parameter of retry.api.retry_call.
- Returns:
the
response
from the endpoint or returncallback
if defined.- Return type:
requests.Response object or
callback
function.
- RequestsCommon.get_proxies(self)¶
Proxies can be specified globally for all integrations or specifically per function.
If the environmental variables HTTPS_PROXY, HTTP_PROXY and NO_PROXY are set, this returns
None
, as if for a requests.Request. Ifproxies
areNone
, the environmental variables are usedIf
http_proxy
orhttps_proxy
is set in the Function Section ([my_function]
) of your app.config file, returns a dictionary mapping protocol to the URL of the proxy.If
http_proxy
orhttps_proxy
is set in the Integrations Section ([integrations]
) of your app.config file, returns a dictionary mapping protocol to the URL of the proxy.
- Returns:
A dictionary mapping protocol to the URL of the proxy or
None
- Return type:
dict or
None
- RequestsCommon.get_verify(self)¶
Get
verify
parameter from app config or from env varREQUESTS_CA_BUNDLE
which is the default way to set verify with python requests library.Value can be set in
[integrations]
or in the[fn_my_app]
sectionValue in
[fn_my_app]
takes precedence over[integrations]
which takes precedence overREQUESTS_CA_BUNDLE
- Parameters:
app_options (dict) – App config dict
- Returns:
Value to set
requests.session.verify
to. Either a path or a boolean- Return type:
bool or str or None (which will default to requests default which is
True
)
- RequestsCommon.get_client_auth(self)¶
A client certificate for authenticating calls to external endpoints can be specified on a per function basis.
If
client_auth_cert
andclient_auth_key
are set in the Function Section ([my_function]
) of your app.config file, returns a tuple containing the respective paths to the certificate and private key for the client cert.Example:
[my_function] ... client_auth_cert = <path_to_cert.pem> client_auth_key = <path_to_cert_private_key.pem>
- Returns:
The filepaths for the client side certificate and the private key as a tuple of both files’ paths or
None
if either one of the values are missing- Return type:
tuple(str, str)
- RequestsCommon.get_timeout(self)¶
Get the default timeout value in the Function Section (
[my_function]
) of your app.config file, or from the Integrations Section ([integrations]
) of your app.config file.[my_function]
section takes precedence over[integrations]
section value.- Returns:
timeout value
- Return type:
int
- class RequestsCommonWithoutSession(*args, **kwargs)¶
This class extends
RequestsCommon
maintaining the behavior ofRequestsCommon
that was present in versions <= 47.1.x.The difference is that every time
RequestsCommonWithoutSession.execute()
is called, this class will userequests.request
to execute the request, whileRequestsCommon.execute()
usesSession().request
.This class is interchangeable with
RequestsCommon
and code that usesRequestsCommon
can be simiply refactored to useRequestsCommonWithoutSession
- Parameters:
args (list) – positional arguments matching
RequestsCommon
kwargs (dict) – named keyword arguments matching
RequestsCommon
Common Poller Methods¶
- poller(named_poller_interval, named_last_poller_time)¶
Decorator to wrap a function as the logic for a poller.
Example:
import logging from threading import Thread from resilient_circuits import AppFunctionComponent, is_this_a_selftest from resilient_lib import poller, get_last_poller_date PACKAGE_NAME = "fn_my_app" LOG = logging.getLogger(__name__) class MyPoller(AppFunctionComponent): def __init__(self, opts): super(PollerComponent, self).__init__(opts, PACKAGE_NAME) polling_interval = 5 # set to 5 seconds or could get from ``self.options`` last_poller_time = get_last_poller_date(120) # look back 2 hours if is_this_a_selftest(self): LOG.warn("Running selftest -- disabling poller") else: poller_thread = Thread(target=self.run) poller_thread.daemon = True poller_thread.start() @poller('polling_interval', 'last_poller_time') def run(self, *args, **kwargs): # poll endpoint query_results = query_entities_since_last_poll(kwargs['last_poller_time']) # process any results to create, update, or close case in SOAR if query_results: self.process_entity_list(query_results)
- Parameters:
named_poller_interval (str) – name of instance variable containing the poller interval in seconds
named_last_poller_time (datetime) – name of instance variable containing the lookback value in mseconds
- class SOARCommon(rest_client)¶
Common methods for accessing IBM SOAR cases and their entities: comments, attachments, etc. This class and its methods should be used in conjunction with wrapped polling logic to communicated with the SOAR platform.
- create_case_comment(case_id, note, entity_comment_id=None, entity_comment_header=None)¶
Add a comment to the specified SOAR case by ID.
- Parameters:
case_id (str|int) – SOAR case ID
note (str) – Content to be added as a note to the case
entity_comment_id (str|int) – (Optional) entity comment id if updating an existing comment
entity_comment_header (str) – (Optional) header to place in bold at the top of the note
- Returns:
Response from posting the comment to SOAR
- Return type:
dict
- create_datatable_row(case_id, datatable, rowdata)¶
Create a row in a SOAR datatable.
rowdata
should be formatted as a dictionary of column name and value pairsExample:
from resilient_lib import SOARCommon soar_common = SOARCommon(res_client) case_id = get_case_id() rowdata = {"column_1_api_name": 1, "column_2_api_name": 2} soar_common.create_datatable_row(case_id, "my_dt", rowdata)
- Parameters:
case_id (str|int) – case containing the datatable
datatable (str) – API name of datatable
rowdata (dict) – columns and values to add
- create_soar_case(case_payload)¶
Create a new IBM SOAR case based on a payload formatted for the API call.
- Parameters:
case_payload (dict) – Fields to use for creating a SOAR case
- Returns:
API result with the created case
- Return type:
dict
- filter_soar_comments(case_id, entity_comments, soar_header=None)¶
Read all SOAR comments from a case and remove those comments which have already been synced using
soar_header
as a filter- Parameters:
case_id (str|int) – ([str]): [IBM SOAR case id]
entity_comments (list(str)) – list comments which the endpoint’s entity contains. This will be a mix of comments sync’d from SOAR and new comments
soar_header (str) – title added to SOAR comments to be filtered
- Returns:
list of remaining comments
- Return type:
list
- get_case(case_id)¶
Get a SOAR case based on the case id
- Parameters:
case_id (str|int) – ID of the case to get the details of
- Returns:
Details of the case for a given ID
- Return type:
dict
- get_case_artifacts(case_id)¶
Get all case artifacts
- Parameters:
case_id (str|int) – ID of the case to get the details of
- get_case_attachment(case_id, artifact_id=None, task_id=None, attachment_id=None, return_base64=True)¶
Get contents of a file attachment or artifact.
If
incident_id
andartifact_id
are defined it will get that ArtifactIf
incident_id
andattachment_id
are defined it will get that Incident AttachmentIf
incident_id
,task_id
andattachment_id
are defined it will get that Task Attachment
- Parameters:
case_id (str|int) – ID of the case to get the details of
artifact_id (str|int) – ID of the Incident’s Artifact to download
task_id (str|int) – ID of the Task to download it’s Attachment from
attachment_id (int|str) – id of the Incident’s Attachment to download
return_base64 (bool) – if False, return value of the content will be given as bytes, default True returns a base64 encoded string
- Returns:
name of the artifact or file and base64 encoded string or byte string of attachment
- Return type:
tuple(str, bytes|base64(str))
- get_case_attachments(case_id, return_base64=True)¶
Get all case attachments
- Parameters:
case_id (str|int) – ID of the case to get the details of
return_base64 (bool) – if False, return value of the content will be given as bytes
- Returns:
list of attachments associated with the given case
- Return type:
list[tuple(str, bytes|base64(str))]
- get_case_comments(case_id)¶
Get all case comments
- Parameters:
case_id (str|int) – ID of the case to get the details of
- get_case_tasks(case_id, want_layouts=False, want_notes=False, handle_format='names')¶
Get all the tasks from a SOAR case
- Parameters:
case_id (str|int) – IBM SOAR case id
want_layouts (bool) – If the task layout should be returned. Default is False.
want_notes (bool) – If the task notes should be returned. Default is False.
handle_format (str) – The format to return can be names, ids, or objects. Default is names.
- Returns:
A list of all the tasks for the given SOAR case
- Return type:
list
- get_soar_case(search_fields, open_cases=True, uri_filters='return_level=normal')¶
Find a SOAR case which contains custom field(s) associated with the associated endpoint. Returns only one case. See
SOARCommon.get_soar_cases()
for examples.Note
search_fields
only supports custom fields.- Parameters:
search_fields (dict) – Dictionary containing key/value pairs to search for a case match. Field values can be True/False for
has_a_value
ordoes_not_have_a_value
, otherwise a field will useequals
for the value.uri_filters (str) – Filters for the end of the uri. Default is “return_level=normal”
open_cases (bool) – True if only querying open cases.
- Returns:
A tuple with the matching case, if any, and any associated error message if something went wrong. Returns
None
if no associated case was found.- Return type:
tuple(dict, str)
- get_soar_cases(search_fields, open_cases=True, uri_filters='return_level=normal')¶
Get all IBM SOAR cases that match the given search fields. To find all cases that are synced from the endpoint platform, provide the unique search field that matches the unique id of your endpoint solution.
Example:
from resilient_lib import SOARCommon soar_common = SOARCommon(res_client) found_id = get_id_from_endpoint_query_result() cases = soar_common.get_soar_cases({ "endpoint_id": found_id }, open_cases=False)
Note
search_fields
only supports custom fields.- Parameters:
search_fields (dict) – Dictionary containing key/value pairs to search for a case match. Field values can be True/False for
has_a_value
ordoes_not_have_a_value
, otherwise a field will useequals
for the value.uri_filters (str) – Filters for the end of the uri. Default is “return_level=normal”
open_cases (bool) – True if only querying open cases.
- Returns:
A tuple with a list of cases whose values match the
search_fields
and any associated error message.- Return type:
tuple(dict, str)
- lookup_artifact_type(artifact_type)¶
Return an artifact type based on it’s ID on SOAR. If not found, return None
- Parameters:
artifact_type (str) – artifact type to search for in SOAR
- Returns:
found artifact type if found in SOAR
- Return type:
str
- lookup_incident_types(incident_type_ids)¶
Return an incident type based on it’s ID. If not found, return None
- Parameters:
incident_type_ids (list(str)) – list of incident type IDs to check for in SOAR
- Returns:
list of incident types that match in SOAR what was given
- Return type:
list(str)
- update_soar_case(case_id, case_payload)¶
Update an IBM SOAR case (usually from a rendered Jinja template).
- Parameters:
case_payload (dict) – Fields to be updated and their values
- Returns:
The updated SOAR case case
- Return type:
dict
- eval_mapping(eval_value, wrapper=None)¶
Map a JSON string to a python object. Safely evaluates the values with the use of
ast.literal_eval
and can ONLY convert to a list or a dictionary.Note
wrapper
is optional (but recommended) to ensure that theeval_value
is properly formatted as JSON. IfNone
, then assumes that theeval_value
comes pre-formatted in acceptable JSON formatThis method is intended to be used to safely take input strings from the user’s config file that will eventually need to evaluated as JSON elsewhere in the code.
Example:
from resilient_lib import eval_mapping config_value = app_configs.get("some_config") # Assume that 'config_value' has a value of # '"source":"A","tags":["tagA"],"priorities":[40,50]' # parse the values to a dict, using a dictionary wrapper: parsed_config = eval_mapping(config_value, "{{ {} }}") # the returned value is a dict like: # { # "source": "A", # "tags": ["tagA"], # "priorities": [40, 50] # }
- Parameters:
eval_value (str) – json fragment to evaluate
wrapper (str) – wrapper to put the source value into
[{}]
or{{ {} }}
- Returns:
converted data
- Return type:
list|dict
Common Jinja Methods¶
Common methods and filters used to process Jinja Templates
Usage:
Create a Jinja template in the
/util/templates
directory of the App.
Note
Normally a template is in JSON format:
{
"name": "{{ alert_name }}",
"severity": "{{ alert_severity }}"
}
2. (Optional) Add an option to your app.config
file to specify an absolute
path to a custom Jinja template called: custom_template_path
- if not provided the
value of DEFAULT_TEMPLATE_PATH
is used.
Add the following code to your Function:
import pkg_resources
from resilient_circuits import AppFunctionComponent, FunctionResult, app_function
from resilient_lib import make_payload_from_template
PACKAGE_NAME = "my_app"
FN_NAME = "my_function_using_jinja"
# Creating an absolute path to the template
DEFAULT_TEMPLATE_PATH = pkg_resources.resource_filename(PACKAGE_NAME, "util/templates/<default_name>.jinja2")
class FunctionComponent(AppFunctionComponent):
def __init__(self, opts):
super(FunctionComponent, self).__init__(opts, PACKAGE_NAME)
@app_function(FN_NAME)
def _app_function(self, fn_inputs):
'''
Function: A function showing how to use Jinja templates
Inputs:
- fn_inputs.alert_severity
- fn_inputs.device_id
'''
custom_template_path = self.app_configs.custom_template_path if hasattr(self.app_configs, "custom_template_path") else ""
template_rendered = make_payload_from_template(
template_override=custom_template_path,
default_template=DEFAULT_TEMPLATE_PATH,
payload={
"alert_name": f"Malware found on device {fn_inputs.device_id}",
"alert_severity": fn_inputs.alert_severity
},
return_json=True
)
self.LOG.info(template_rendered)
results = {"template_rendered": template_rendered}
yield FunctionResult(results)
Output:
INFO [my_function_using_jinja] {'name': 'Malware found on device 303', 'severity': 'High'}
4. Update your README.md
documentation file to include relevant information about the templates,
for example:
## Templates for SOAR Cases
It may necessary to modify the templates used to create SOAR cases based on a customer's required custom fields.
Below are the default templates used which can be copied, modified and used with app.config's
`custom_template_path` setting to override the default template.
### custom_template.jinja
```
{
"name": "{{ alert_name }}",
"severity": "{{ alert_severity }}"
}
```
- base64_filter(val, indent=2)¶
base64
Breaks text into fixed-width blocks. You can specify the
indent
.- Parameters:
val (str) – The string to convert
indent (int) – Number of tabs
- Returns:
Converted string
- Return type:
str
- camel_filter(val)¶
camel
Convert text to CamelCase.
- Parameters:
val (str) – The string to convert
- Returns:
Converted string
- Return type:
str
This value is in camel case: {{ a#bc_def | camel }} >>> 'ABcDef'
- global_jinja_env()¶
Return the Jinja environment with our resilient-lib custom filters. This environment can be expanded upon to add additional custom filters.
See Jinja Custom Filters for more.
Current custom filters available:
json, js, html, url, idna, punycode, ldap, ps, sh, pretty, timestamp, iso8601, uniq, sample, camel, base64, soar_datetimeformat, soar_display_datetimeformat, soar_substitute, soar_splitpart, soar_trimlist
- Returns:
The Jinja environment
- Return type:
Example:
from resilient-lib import global_jinja_env addl_custom_filters = { "filter_name": method_name } env = global_jinja_env() env.globals.update(addl_custom_filters) env.filters.update(addl_custom_filters)
- html_filter(val)¶
html
Produces HTML-encoded string of the value.
- Parameters:
val (str) – The string to encode
- Returns:
Encoded string
- Return type:
str
- idna_filter(val)¶
idna
Encodes the value per RFC 3490.
- Parameters:
val (str) – The string to encode
- Returns:
Encoded string
- Return type:
str
- iso8601(val)¶
iso8601
Assuming
val
is an epoch milliseconds timestamp, produce ISO8601 datetime.- Parameters:
val (str|int) – An epoch milliseconds timestamp
- Returns:
ISO8601 datetime
- Return type:
str
- js_filter(val)¶
js
Produces JSONified string of the value, without surrounding quotes.
- Parameters:
val (str) – The string to convert
- Returns:
JSONified string of the value, without surrounding quotes
- Return type:
str
- json_filter(val, indent=0)¶
json
Produces JSONified string of the value.
- Parameters:
val (str) – The string to convert
- Returns:
JSONified string of the value
- Return type:
str
- ldap_filter(val)¶
ldap
Produces LDAP-encoded string of the value.
- Parameters:
val (str) – The string to encode
- Returns:
Encoded string
- Return type:
str
- make_payload_from_template(template_override, default_template, payload, return_json=True)¶
Convert a payload into a new format based on a specified template.
- Parameters:
template_override (str) – Path to the specified template (usually taken from the app.config file. See the Usage example above)
default_template (str) – Path to the default template (usually in the ‘/util/templates’ directory. See the Usage example above)
payload (dict) –
dict
of payload that is passed to Jinja templatereturn_json (bool) – False if template should be render as a
str
and results returned as astr
- Returns:
If the Jinja template is valid JSON and
return_json
isTrue
the result is returned as adict
else it returns the rendered template as astr
- Return type:
str|dict
- Raises:
ValueError – if
return_json
isTrue
and the Jinja template is not valid JSON
- pretty_filter(val, indent=2)¶
pretty
Produces pretty-printed string of the value.
- Parameters:
val (str) – The string to format
indent (int) – Number of tabs to use when formatting
- Returns:
The formatted string
- Return type:
str
- ps_filter(val)¶
ps
Escapes characters in
val
for use in a PowerShell command line.- Parameters:
val (str) – The string to escaped
- Returns:
Escaped string
- Return type:
str
- punycode_filter(val)¶
punycode
Encodes the value per RFC 3492.
- Parameters:
val (str) – The string to encode
- Returns:
Encoded string
- Return type:
str
- render(template, data)¶
Render data into a template, producing a string result. All the additional custom filters are available.
- Parameters:
template (str or dict) – Path to or a dict of the Jinja template
data (dict) – JSON data to apply to the template
- Returns:
result from the rendering of the template. The template is usually a string, but can be a dict
- Return type:
str or dict
Examples:
>>> render("template {{value}}", {"value":"123"}) u'template 123' >>> render({"template": "{{value}}"}, {"value":"123"}) u'{"template": "123"}' You can escape values using the 'json' filter, or the 'url' or 'html' or 'ldap' filters. >>> render('{"template": {{value|json}} }', {"value":'1"23'}) u'{"template": "1\\"23" }' >>> render('{"template": "{{value|js}}" }', {"value":'1"23'}) u'{"template": "1\\"23" }' >>> render('{"template": {{value|ldap}} }', {"value":'1*23'}) u'{"template": 1\\2a23 }' >>> render('shell "{{value|ps}}"', {"value":'$"foo"'}) u'shell "`$`"foo`""' >>> render('shell "{{value|sh}}"', {"value":'$"foo"'}) u'shell "\\$\\"foo\\""' >>> render('template={{value|timestamp}}', {"value":0}) u'template=0' >>> render('template={{value|timestamp}}', {}) u'template=null' >>> render('template={{value|timestamp}}', {"value":{"year":2015, "month":7, "day":15}}) u'template=1436918400000' >>> render('template={{value|timestamp}}', {"value":datetime.datetime(2015, 7, 15)}) u'template=1436918400000'
- render_json(template, data)¶
Render data into a template, producing a JSON result. Also clean up any “really bad” control characters to avoid failure.
- Parameters:
template (str or dict) – Path to or a dict of the Jinja template
data (dict) – dict to apply to the template
- Returns:
result from the rendering of the template as a dictionary
- Return type:
dict
Examples:
>>> d = {"value": "the" + chr(10) + "new" + chr(10) + "thing"} >>> render_json('{"result":"{{value}}"}', d) {u'result': u'the new thing'} >>> d = {"value": "the" + chr(1) + "new" + chr(9) + "thing"} >>> render_json('{"result":"{{value}}"}', d) {u'result': u'the new thing'}
- sample_filter(val, count=None)¶
sample
Return a random sample from a list.
- Parameters:
val (list) – List of str | obj | int
count (int |
None
) – Number of times to repeat items inval
to increase its random probability
- Returns:
The random item
- Return type:
str | obj | int
- sh_filter(val)¶
sh
Escapes characters in
val
for use in a Unix shell command line.- Parameters:
val (str) – The string to escaped
- Returns:
Escaped string
- Return type:
str
- soar_datetimeformat(value, date_format='%Y-%m-%dT%H:%M:%S', split_at=None)¶
soar_datetimeformat
Convert UTC dates to epoch format.
- Parameters:
value (str) – The UTC date string
date_format (str) – (optional) Conversion format. Defaults to
"%Y-%m-%dT%H:%M:%S"
split_at (str) –
(optional) Character to split the date field to scope the date field
split_at='.' to remove milliseconds for "2021-10-22T20:53:53.913Z" split_at='+' to remove tz information "2021-10-22T20:53:53+00:00"
- Returns:
Epoch value of datetime, in milliseconds
- Return type:
int
- soar_splitpart(value, index, split_chars=' - ')¶
soar_splitpart
Split a string and return the index.
- Parameters:
value (str) – string to split
index (int) – index of split to return
split_chars (str) – (optional) split characters. Defaults to
' - '
- Returns:
value of split. If
index
is out of bounds, the originalvalue
is returned- Return type:
str
- soar_substitute(value, json_str)¶
soar_substitute
Replace values based on a lookup dictionary.
- Parameters:
value (str) – original value to lookup
json_str (JSON encoded str) – JSON encoded string with key/value pairs of lookup values
- Returns:
replacement value or original value if no replacement found
- Return type:
str | int
- soar_trimlist(org_list)¶
soar_trimlist
Trim whitespace from elements in a list.
- Parameters:
org_list (list of strings) – list of elements to trim whitespace from
- Returns:
list with elements trimmed of whitespace
- Return type:
list
- timestamp(val)¶
timestamp
Try convert non-timestamp values to a timestamp.
- Parameters:
val (str | dict) – Either
"now"
or a dict containing year / month / day etc.- Returns:
An epoch milliseconds timestamp
- Return type:
int
>>> timestamp({"year": 2018, "month": 8, "day": 1, "timezoneID": "CET"}) 1533078000000 >>> timestamp(Undefined()) 'null' >>> timestamp("now") > 1530000000000 True >>> timestamp("now") > 2000000000000 # 2033 False
- uniq(val, key=None)¶
uniq
Produce the unique list. If
val
is a dict, produce unique list of key values.- Parameters:
val ([str | int | obj]) – The original list
key (str) – If
val
is a dict return a list with dicts with just thatkey
- Returns:
Original list of items with duplicates removed
- Return type:
list
>>> sorted(uniq([1,2,3,2])) [1, 2, 3] >>> sorted(uniq([ {"a":1}, {"a":2}, {"a":3}, {"a":2}])) [{'a': 1}, {'a': 2}, {'a': 3}] >>> sorted(uniq([ {"a":1}, {"a":2}, {"a":3}, {"a":2}, Exception()], "a")) [{'a': 1}, {'a': 2}, {'a': 3}, Exception()]
- url_filter(val)¶
url
Produces URL-encoded string of the value.
- Parameters:
val (str) – The string to encoded
- Returns:
Encoded string
- Return type:
str
Exceptions¶
- exception IntegrationError(value)¶
Class used to signal Integration Errors. It doesn’t add any specific information other than identifying the type of error
from resilient_lib import IntegrationError raise IntegrationError("Example raising custom error")
Change Log¶
2024-07: version 51.0.2.2 * Update dependency version of setuptools version to 70.3.x to address CVE-2024-6345
2024-07: version 51.0.2.1
All IBM SOAR Python libraries now only officially support Python 3.9, 3.11, and 3.12. To continue using SOAR libraries on earlier versions of Python, use v52.0.2.0.974
2024-05: version 51.0.2.0
Added official support for Python 3.12
2024-04: version 51.0.1.1
No major changes. Just bumping build number to coincide with other builds
2024-02: version 51.0.1.0
All SOAR Python libraries now officially support Python 3.11
Updated
pytz
dependency requirement to~= 2024.1
Updated
jinja2
dependency requirement to~= 3.1
to address CVE-2024-22195
2024-01: version 51.0.0.2
Improve debug logs for retry parameters in
resilient_lib.RequestsCommon.execute()
Added internal UI components for HTML Blocks and Headers as well as Summary Section functionality
2023-11: version 51.0.0.0
Added optional retry to
resilient_lib.RequestsCommon.execute()
. This allows developers to control the retry behavior of failed requests. Seeresilient_lib.RequestsCommon.execute()
for more details and examples
2023-10: version 50.1
Unit test improvements to better handle flaky
requests
tests
2023-08: version 50.0
No major changes. Just bumping build number to coincide with other builds
2023-07: version 49.1
No major changes. Just bumping build number to coincide with other builds
2023-05: version 49.0
Added new functions for pollers to interact with SOAR
2023-04: version 48.1
Updated SOAR case default filters for poller helper methods
2023-02: version 48.0
Added support for
verify
config in app.config when usingresilient_lib.RequestsCommon.execute()
.verify
now can be set toTrue
,False
, or<path_to_your_custom_CA_bundle>
to control the value used when verifying SSL certificates. This change is backward compatible with any apps that useRequestsCommon.execute()
so any apps updated to run on v48.0 will pull in these changes and theverify
configuration will be supported for that appUpdated
resilient_lib.RequestsCommon
to take advantage of persistent sessions from therequests
library usingrequests.Session()
. This has multiple advantages, including cookies persisting from the endpoint and significant performance increases when connecting to the same host. If non-session objects are desired, switch your code to use the new classresilient_lib.RequestsCommonWithoutSession
which behaves the same as the oldRequestsCommon
wouldUpdated project to use
pyproject.toml
andsetup.cfg
metadata files. Build backend continues to usesetuptools
. Instead of directly invoking setup.py to get a sdist or wheel, usebuild
as a build frontend:pip install build python -m build
2022-12: version 47.1
resilient_lib.RequestsCommon.get_client_auth()
renamed fromget_clientauth
. Alias provided forget_clientauth
for backward compatiblity. Please useget_client_auth()
from now on
2022-11: version 47.0
Support for poller-based apps added with new Common Poller Methods, including
resilient_lib.poller()
decorator andresilient_lib.SOARCommon
class of common SOAR methods
2022-08: version 46.0
build_incident_url
urlencodes it’sorgId
2022-07: version 45.1
No major changes. Just bumping build number to coincide with other builds
2022-05: version 45.0
Added
build_task_url
to build direct links to incident TasksUpdated
build_incident_url
to support appending Organization ID queriesAdded
client_auth_cert
andclient_auth_key
options for client side certificates. These can be specified per app in the relevant [fn_some_app] section of the app.config and will be used in any requests made throughresilient_lib.RequestsCommon
.
2022-04: version 44.1
Added Common Jinja Methods for help with using Jinja in your App
2022-02: version 44.0
Ensure
tests/
is not included in packaged codeOfficially support
Python 3.9
2022-01: version 43.1
No major changes. Just bumping build number to coincide with other builds
2021-11: version 43.0
Formatted Sphinx documentation and hosted it at https://ibm.biz/soar-python-docs
validate_fields
also handles anamedtuple
2021-11: version 42.3
No major changes. Just bumping build number to coincide with other builds
2021-10: version 42.2
No major changes. Just bumping build number to coincide with other builds
2021-08: version 42.1
No major changes. Just bumping build number to coincide with other builds
2021-08: version 42.0
Added support for
HTTP_PROXY
,HTTPS_PROXY
andNO_PROXY
environmental variables. See the App Host Deployment Guide for details.Fix to OAuth2 to avoid infinite loop in some circumstances.
2021-06: version 41.1
Added
execute
as an alias forexecute_call_v2
Bug fixes
2021-05: version 41.0
No major changes. Just bumping build number to coincide with other builds
2021-03: version 40.2
Bug fix for to use
setuptools_scm < 6.0.0
for Python 2.7 environments
2021-03: version 40.1
No major changes. Just bumping build number to coincide with other builds
2021-02: version 40.0
bug fixes
2020-12: version 39
add a capability to close an incident
fixes a bug where timeout defined in function section is not processed as an int
2020-09: version 38
validate_fields
now handles a field if it’s type is Text with value string
2020-05: version 37
execute_call_v2
might give an error when debugging with PyCharm. Added a workaround in a comment above the line where it occurs.
2020-04: version 36.2.dev
execute_call_v2
might give an error when debugging with PyCharm. Added a workaround in a comment above the line where it occurs.
2020-01-16: version 35.0.195
Added OAuth2 Client Credentials workflow handler
Added support for timeout argument in integrations config section
2019-10-07: version 34.0.194
Add functionality to resilient_lib to upload attachment
Added support for a timeout parameter across all integrations using the
execute_call_v2()
function
2019-08-02: version 33.0.189
Added support for API key and API key secret, now able to authenticate using API keys instead of email/password
- New functions added to
resilient-lib
get_file_attachment_metadata()
write_to_tmp_file()
- New functions added to
Updated
validate_fields()
function inresilient-lib
, adding the ability to validate fields in app.configOther minor bug fixes.
2019-07-03: version 32.0.186
Added more flexible
execute_call_v2()
method inresilient-lib
Fix for deprecated log warnings
Other minor bug fixes
2019-04-12: version 32.0.140
Improvements to
resilient-lib
Other minor bug fixes/improvements
2019-03-06: version 32.0.126
Removed
selftest
from function templateImprovements to
resilient-lib
2019-01-15: version 32.0
Added
resilient-lib
to repo and PyPi - common library calls which facilitate the development of functions for IBM ResilientAdded Sphinx documentation builder