Kafka¶
Table of Contents¶
Release Notes¶
| Version | Date | Notes |
| ------- | ---- | ----- |
| 1.0.2 | 05/2024 | Refresh Kafka python dependency; add playbooks |
| 1.0.1 | 10/2023 | Convert workflows to Python 3 |
| 1.0.0 | 04/2021 | Initial release |
Kafka App 1.0.2 Changes¶
In v1.0.2, the existing rules and workflows have been replaced with playbooks. This change supports the ongoing, newer capabilities of playbooks. Each playbook has the same functionality as its previous, corresponding rule/workflow.
If upgrading from a previous release, you'll notice that the previous release's rules/workflows remain in place, and both the rules and the playbooks are active. For manual actions, each playbook has the same name as its corresponding rule, with "(PB)" added at the end. For automatic actions, the playbooks are disabled by default.
You can continue to use the rules/workflows, but migrating to playbooks provides greater functionality along with future app enhancements and bug fixes.
Overview¶
Resilient Circuits components for `fn_kafka`.
This app supports producing and consuming Kafka messages over a number of brokers.
Key Features¶
- Ability to define multiple brokers for producing and consuming messages
- Send to Kafka allows key/value or value-only transmissions on a topic
- Poller for listening on broker topics with configurable templates
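The key/value versus value-only send can be sketched with the kafka-python-ng producer interface. This is a minimal sketch, not the app's implementation: a stub producer stands in for a live broker connection, so the topic name and case-ID key below are illustrative only.

```python
import json

class StubProducer:
    """Records sends instead of talking to a real broker.
    A real client would be kafka.KafkaProducer(bootstrap_servers=...)."""
    def __init__(self):
        self.sent = []

    def send(self, topic, value=None, key=None):
        self.sent.append((topic, key, value))

def send_message(producer, topic, message, key=None):
    # Kafka transmits keys and values as bytes, so encode both before sending.
    encoded_key = key.encode("utf-8") if key is not None else None
    encoded_value = json.dumps(message).encode("utf-8")
    producer.send(topic, value=encoded_value, key=encoded_key)

producer = StubProducer()
send_message(producer, "quickstart-events", {"name": "my test incident"})      # value-only send
send_message(producer, "quickstart-events", {"plan_status": "C"}, key="2095")  # keyed send
```

With a real `kafka.KafkaProducer`, `send()` returns a future; the stub above only records the calls.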
Requirements¶
This app supports the IBM Security QRadar SOAR Platform and the IBM Security QRadar SOAR for IBM Cloud Pak for Security.
SOAR platform¶
The SOAR platform supports two app deployment mechanisms, Edge Gateway (also known as App Host) and integration server.
If deploying to a SOAR platform with an App Host, the requirements are:

- SOAR platform >= 50.0.9097.
- The app is in a container-based format (available from the AppExchange as a `zip` file).
If deploying to a SOAR platform with an integration server, the requirements are:

- SOAR platform >= 50.0.9097.
- The app is in the older integration format (available from the AppExchange as a `zip` file which contains a `tar.gz` file).
- Integration server is running `resilient_circuits>=46.0.0`.
- If using an API key account, make sure the account provides the following minimum permissions:

| Name | Permissions |
| ---- | ----------- |
| Org Data | Read |
| Function | Read |
| Incidents | Create, Read |
| Edit Incidents | Fields |
The following SOAR platform guides provide additional information:
- Edge Gateway Deployment Guide or App Host Deployment Guide: provides installation, configuration, and troubleshooting information, including proxy server settings.
- Integration Server Guide: provides installation, configuration, and troubleshooting information, including proxy server settings.
- System Administrator Guide: provides the procedure to install, configure, and deploy apps.
The above guides are available on the IBM Documentation website at ibm.biz/soar-docs. On this web page, select your SOAR platform version. On the follow-on page, you can find the Edge Gateway Deployment Guide, App Host Deployment Guide, or Integration Server Guide by expanding Apps in the Table of Contents pane. The System Administrator Guide is available by expanding System Administrator.
Cloud Pak for Security¶
If you are deploying to IBM Cloud Pak for Security, the requirements are:

- IBM Cloud Pak for Security >= 1.10.
- Cloud Pak is configured with an Edge Gateway.
- The app is in a container-based format (available from the AppExchange as a `zip` file).
The following Cloud Pak guides provide additional information:
- Edge Gateway Deployment Guide or App Host Deployment Guide: provides installation, configuration, and troubleshooting information, including proxy server settings. From the Table of Contents, select Case Management and Orchestration & Automation > Orchestration and Automation Apps.
- System Administrator Guide: provides information to install, configure, and deploy apps. From the IBM Cloud Pak for Security IBM Documentation table of contents, select Case Management and Orchestration & Automation > System administrator.
These guides are available on the IBM Documentation website at ibm.biz/cp4s-docs. From this web page, select your IBM Cloud Pak for Security version. From the version-specific IBM Documentation page, select Case Management and Orchestration & Automation.
Proxy Server¶
The app does not support a proxy server.
Python Environment¶
Python 3.6 and Python 3.9 are supported. Additional package dependencies may exist for each of these packages:

- `kafka-python-ng~=2.0`
- `resilient_circuits>=51.0.0`
Development Version¶
This app has been implemented using:
| Product Name | Product Version |
| ------------ | --------------- |
| confluentinc/cp-kafka | 7.4.4 |
Prerequisites¶
A running Kafka broker is required for the app to connect to.
Installation¶
Install¶
To install or uninstall an App or Integration on the SOAR platform, see the documentation at ibm.biz/soar-docs.
To install or uninstall an App on IBM Cloud Pak for Security, see the documentation at ibm.biz/cp4s-docs and follow the instructions above to navigate to Orchestration and Automation.
App Configuration¶
The following table provides the settings you need to configure the app. These settings are made in the app.config file. See the documentation discussed in the Requirements section for the procedure.
fn_kafka¶
| Config | Required | Example | Description |
| ------ | -------- | ------- | ----------- |
| `listener_brokers` | Yes | | Comma-separated list of broker labels to listen on. If empty or commented out, no listeners are started |
| `selftest_broker` | Yes | | Broker label used to troubleshoot connection issues |
fn_kafka:broker label¶
| Config | Required | Example | Description |
| ------ | -------- | ------- | ----------- |
| `bootstrap_servers` | Yes | | IP address and port of the Kafka bootstrap server |
| `topics` | Yes | | Comma-separated list of topics to listen on when using the listener logic |
| `template_dir` | No | | Directory where templates reside. Use the naming convention `<topic>_create_template.jinja` or `<topic>_update_template.jinja` |
The remaining settings are all configurable per your Kafka broker. See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html for information on how to use these additional settings.
| Config | Required | Example | Description |
| ------ | -------- | ------- | ----------- |
| `group_id` | No | | Listener group for consumers |
| `client_id` | No | | Client identifier for producers |
| `sasl_mechanism` | No | | Authentication mechanism when `security_protocol` is configured for `SASL_PLAINTEXT` or `SASL_SSL` |
| `security_protocol` | No | | Protocol used to communicate with brokers |
| `ssl_check_hostname` | No | | Flag to configure whether the SSL handshake should verify that the certificate matches the broker's hostname |
| `sasl_plain_username` | No | | Username for SASL PLAIN and SCRAM authentication |
| `sasl_plain_password` | No | | Password for SASL PLAIN and SCRAM authentication |
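Putting these settings together, an app.config might look like the fragment below. The broker label `brokerA`, the address, the topics, and the paths are examples only, not values shipped with the app:

```
[fn_kafka]
# comma-separated broker labels (the section suffixes below) to listen on
listener_brokers=brokerA
selftest_broker=brokerA

[fn_kafka:brokerA]
bootstrap_servers=127.0.0.1:9092
topics=quickstart-events
template_dir=/var/rescircuits/kafka_templates
# optional kafka-python settings passed through to the client
group_id=soar_consumers
security_protocol=PLAINTEXT
```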
Function - Kafka Send¶
Send messages through Kafka based on a named topic.
Inputs:
| Name | Type | Required | Example | Tooltip |
| ---- | ---- | -------- | ------- | ------- |
| `kafka_broker_label` | | Yes | | Broker label defined in app.config |
| `kafka_key` | | No | | Optional key to pass with the Kafka message |
| `kafka_message` | | Yes | | Message to send; can be in JSON format |
| `kafka_topic` | | Yes | | Kafka topic to send the message to |
Outputs:
NOTE: This example might be in JSON format, but `results` is a Python dictionary on the SOAR platform.
```
results = {
  "version": "1.0",
  "success": true,
  "reason": null,
  "content": null,
  "raw": "null",
  "inputs": {
    "kafka_broker_label": "brokerA",
    "kafka_message": "{\"name\": \"my test incident\",\"description\": \"this is from SOAR\",\"status\": \"A\",\"severity_code\": \"High\", \"discovered_date\": 1708705700642, \"start_date\": 1708705700642}",
    "kafka_key": null,
    "kafka_topic": "quickstart-events"
  },
  "metrics": {
    "version": "1.0",
    "package": "fn-kafka",
    "package_version": "1.0.2",
    "host": "PMBP.local",
    "execution_time_ms": 559,
    "timestamp": "2024-05-07 15:32:32"
  }
}
```
Example Function Input Script:
```python
inputs.kafka_broker_label = playbook.inputs.kafka_broker_label
inputs.kafka_topic = playbook.inputs.kafka_topic
inputs.kafka_key = playbook.inputs.kafka_key
inputs.kafka_message = playbook.inputs.kafka_message.get("content")
```
Example Function Post Process Script:
```python
results = playbook.functions.results.kafka_send_results
if not results.success:
    incident.addNote(f"<b>Kafka: Send (PB):</b> Unable to send message to broker {playbook.inputs.kafka_broker_label}.")
else:
    incident.addNote(f"<b>Kafka: Send (PB):</b> Sent message to broker {playbook.inputs.kafka_broker_label}: {playbook.inputs.kafka_message.get('content')}")
```
Kafka Listener¶
It’s possible to listen for messages on Kafka topics. When consumed, the JSON-formatted message retrieved can be used to create and update incidents. Existing incidents are identified by their case ID passed as the Kafka message key.
To use this feature, reference the broker label(s) with the app.config `listener_brokers` setting and specify the `topics` setting to consume messages.
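The create-versus-update decision described above can be sketched as follows. The consumer wiring is omitted; this only illustrates the routing logic we assume the listener applies, with the case ID carried as the message key:

```python
import json

def route_message(key, raw_value):
    """Route a consumed Kafka message: a key holding an existing case ID
    means update that case; no key means create a new incident."""
    payload = json.loads(raw_value)
    if key:
        return ("update", int(key.decode("utf-8")), payload)
    return ("create", None, payload)

print(route_message(None, b'{"title": "incident title"}'))   # unkeyed -> create
print(route_message(b"2095", b'{"status": "truepositive"}')) # keyed -> update case 2095
```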
Templates¶
Two different template types are available to convert JSON messages into the format needed to create or update an incident.
See the `template_dir` app.config setting for the directory used to retain templates. Each template contains the mapping information from a Kafka message to the correct create or update IBM SOAR API call.
Name templates using the following format:

- `<topic>_create_template.jinja`
- `<topic>_update_template.jinja`
This is an example template:
```jinja
{
  {# JINJA template for creating a new IBM SOAR incident #}
  "name": "Kafka Incident - {{ title|e }}",
  {# use the filter resilient_datetimeformat to convert values for resilient's Epoch timestamp #}
  "discovered_date": {{ createdTimeUtc|resilient_datetimeformat }},
  "create_date": {{ firstActivityTimeUtc|resilient_datetimeformat }},
  "description": {
    "format": "text",
    "content": "{{ description|e }}"
  },
  {# use the filter resilient_substitute to convert values for resilient's [N]ew and [A]ctive
     ex. status|resilient_substitute('{"New": "N", "Active": "A"}') #}
  "plan_status": "{{ status }}",
  {# use the filter resilient_substitute to convert values for resilient's Low, Medium, High
     ex. severity|resilient_substitute('{"Informational": "Low", "Critical": "High"}') #}
  "severity_code": "{{ severity }}",
  "properties": {
    "custom_int": {{ custom_int }}
  }
}
```
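The `resilient_datetimeformat` filter used in the template converts a timestamp into the epoch-milliseconds value that SOAR date fields expect. The stand-in below sketches the conversion we assume it performs for ISO 8601 input, treating the value as UTC; it is not the app's own filter implementation:

```python
from datetime import datetime, timezone

def datetimeformat(value):
    # Stand-in for the resilient_datetimeformat Jinja filter: parse an
    # ISO 8601 string as UTC and return epoch milliseconds.
    dt = datetime.fromisoformat(value).replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)

# Timestamp taken from the example Kafka message in this section.
ms = datetimeformat("2021-03-28T14:45:02")
print(ms)
```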
The following JSON-formatted Kafka message would create a new incident:
```json
{
  "title": "incident title",
  "createdTimeUtc": "2021-03-28T14:45:02",
  "firstActivityTimeUtc": "2021-03-28T14:45:02",
  "description": "description field",
  "status": "A",
  "severity": "High",
  "custom_int": 5
}
```
Closing and Updating Incidents¶
You may use one topic for creating and/or updating incidents and another for closing an incident. When closing or updating an incident, include the incident ID as the Kafka message key, `kafka_key`.
The following update template illustrates closing an incident:
```jinja
{
  {# include additional "required on close" fields #}
  "plan_status": "C",
  "resolution_id": "{{ status|resilient_substitute('{"truepositive": "Resolved", "falsepositive": "Not an Issue"}') }}",
  "resolution_summary": "{{ description|e }}"
}
```
This payload corresponds with this template mapping:
```json
{
  "status": "truepositive",
  "description": "closed by Kafka"
}
```
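The `resilient_substitute` filter maps incoming values onto the values SOAR fields expect. The stand-in below sketches the behavior we assume it has, applied to the close payload above; it is not the app's own filter implementation:

```python
import json

def substitute(value, mapping_json):
    # Stand-in for the resilient_substitute Jinja filter: look the value
    # up in a JSON mapping, passing unmapped values through unchanged.
    mapping = json.loads(mapping_json)
    return mapping.get(value, value)

# "truepositive" from the payload becomes the SOAR resolution "Resolved".
print(substitute("truepositive",
                 '{"truepositive": "Resolved", "falsepositive": "Not an Issue"}'))
```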
The following image shows two templates used for creating and updating incidents on the topic `test` and a third template used to close incidents on the `test_close` topic.
Playbooks¶
| Playbook Name | Description | Activation Type | Object | Status | Condition |
| ------------- | ----------- | --------------- | ------ | ------ | --------- |
| Kafka: Send (PB) | Send messages through Kafka based on a named topic | Manual | incident | | |
Troubleshooting & Support¶
Refer to the documentation listed in the Requirements section for troubleshooting information.
For Support¶
This is an IBM Community provided app. Please search the Community at ibm.biz/soarcommunity for assistance.