## What You Will Achieve

By the end of this playbook you will have:

- A Splunk external lookup that calls the isMalicious API in real time during search
- An Elastic Logstash filter for inline enrichment of network events
- A Python enrichment script for batch-processing alert queues
- A triage workflow that auto-categorizes alerts as malicious/suspicious/clean
## Prerequisites
| Requirement | Details |
|---|---|
| isMalicious API key + secret | From Account; header X-API-KEY = Base64(key:secret) |
| Splunk 9.x or Elastic 8.x | Cloud or on-premise |
| Python 3.9+ | For the batch script |
Get your API key at ismalicious.com/app/account/api.
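All three integrations use the same `X-API-KEY` header value described in the table above. As a quick sanity check, you can build it with a few lines of Python (the credentials below are placeholders):

```python
import base64


def build_api_key_header(api_key: str, api_secret: str) -> str:
    """Return the X-API-KEY header value: Base64 of "key:secret"."""
    return base64.b64encode(f"{api_key}:{api_secret}".encode()).decode()


# Placeholder credentials; substitute your real key and secret.
print(build_api_key_header("YOUR_API_KEY", "YOUR_API_SECRET"))
```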
## Step 1: Splunk Real-Time Lookup

Add isMalicious as an external lookup in Splunk so you can enrich any field in a search.

### 1a. Create the lookup script

Save as `$SPLUNK_HOME/etc/apps/search/bin/ismalicious_lookup.py` (external lookup scripts must live in the app's `bin` directory):
```python
import base64
import csv
import sys

import requests

ISMALICIOUS_API_KEY = "YOUR_API_KEY"
ISMALICIOUS_API_SECRET = "YOUR_API_SECRET"
BASE_URL = "https://api.ismalicious.com/check"

HDR = {
    "X-API-KEY": base64.b64encode(
        f"{ISMALICIOUS_API_KEY}:{ISMALICIOUS_API_SECRET}".encode()
    ).decode()
}

EMPTY = {"im_malicious": "", "im_score": "", "im_category": "", "im_confidence": ""}


def enrich(value):
    """Query the isMalicious API for a single indicator."""
    try:
        resp = requests.get(
            BASE_URL,
            params={"query": value},
            headers=HDR,
            timeout=5,
        )
        if resp.ok:
            data = resp.json()
            return {
                "im_malicious": str(data.get("malicious", False)),
                "im_score": str(data.get("riskScore", {}).get("score", 0)),
                "im_category": ", ".join(data.get("categories", [])),
                "im_confidence": str(data.get("confidenceScore", 0)),
            }
        return dict(EMPTY, im_malicious="error")
    except requests.RequestException:
        return dict(EMPTY, im_malicious="timeout")


def main():
    # Splunk passes lookup input as CSV on stdin and reads results
    # as CSV from stdout.
    reader = csv.DictReader(sys.stdin)
    writer = csv.DictWriter(sys.stdout, fieldnames=reader.fieldnames)
    writer.writeheader()
    for row in reader:
        row.update(enrich(row["query"]))
        writer.writerow(row)


if __name__ == "__main__":
    main()
```
### 1b. Register the lookup

In `transforms.conf` (external lookups are registered with `external_cmd`, not `filename`):

```
[ismalicious_lookup]
external_cmd = ismalicious_lookup.py query
fields_list = query, im_malicious, im_score, im_category, im_confidence
external_type = python
```
### 1c. Use it in a search

```
index=firewall
| where isnotnull(dest_ip)
| lookup ismalicious_lookup query AS dest_ip OUTPUT im_malicious, im_score, im_category
| where im_malicious="True"
| table _time, src_ip, dest_ip, im_score, im_category
```
## Step 2: Elastic Logstash Enrichment

Add an HTTP filter to enrich network flow events inline. First set the `ISMALICIOUS_X_API_KEY` environment variable to `Base64(apiKey:apiSecret)` using the key and secret from your dashboard.
`logstash.conf`:

```
filter {
  if [destination][ip] {
    http {
      url => "https://api.ismalicious.com/check?query=%{[destination][ip]}"
      headers => {
        "X-API-KEY" => "${ISMALICIOUS_X_API_KEY}"
      }
      verb => "GET"
      target_body => "ismalicious"
    }
    mutate {
      add_field => {
        "[threat][enrichment][malicious]" => "%{[ismalicious][malicious]}"
        "[threat][enrichment][score]" => "%{[ismalicious][riskScore][score]}"
        "[threat][enrichment][category]" => "%{[ismalicious][categories]}"
      }
      remove_field => ["ismalicious"]
    }
  }
}
```
## Step 3: Batch Enrichment Script
Use this for enriching historical data or alert queues:
```python
import base64
import json
from concurrent.futures import ThreadPoolExecutor

import requests

API_KEY = "YOUR_API_KEY"
API_SECRET = "YOUR_API_SECRET"
HEADERS = {
    "X-API-KEY": base64.b64encode(f"{API_KEY}:{API_SECRET}".encode()).decode()
}


def enrich(entity):
    try:
        r = requests.get(
            "https://api.ismalicious.com/check",
            params={"query": entity},
            headers=HEADERS,
            timeout=10,
        )
        r.raise_for_status()  # treat non-2xx responses as errors, not silent None
        data = r.json()
        return {
            "entity": entity,
            "malicious": data.get("malicious"),
            "score": data.get("riskScore", {}).get("score"),
            "categories": data.get("categories", []),
            "confidence": data.get("confidenceScore"),
        }
    except Exception as e:
        return {"entity": entity, "error": str(e)}


# Enrich a batch of indicators concurrently; keep max_workers modest to
# stay within your plan's rate limits.
entities = ["1.2.3.4", "evil.example.com", ...]
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(enrich, entities))
print(json.dumps(results, indent=2))
```
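If large batches start hitting API rate limits, a simple retry wrapper helps. The sketch below assumes the API signals throttling with HTTP 429 (an assumption, not confirmed in this playbook); adjust the status code and backoff to match your plan:

```python
import time

import requests


def get_with_retry(url, params=None, headers=None, max_retries=3, base_delay=2.0):
    """GET with exponential backoff on HTTP 429 (assumed throttling signal).

    Returns the final requests.Response, successful or not.
    """
    resp = None
    for attempt in range(max_retries + 1):
        resp = requests.get(url, params=params, headers=headers, timeout=10)
        if resp.status_code != 429 or attempt == max_retries:
            break
        time.sleep(base_delay * (2 ** attempt))  # 2s, 4s, 8s, ...
    return resp
```

Swap this in for the plain `requests.get` call in `enrich()` if you see throttled responses.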
## Step 4: Triage Workflow
Once enrichment is in place, create a tiered response workflow:
| Score | Action |
|---|---|
| 80–100 | Auto-create P1 incident, notify SOC |
| 60–79 | Create P2 alert, assign to analyst |
| 40–59 | Log for review, add to watchlist |
| 0–39 | Log only, no action |
Use the confidence field to weight auto-close decisions: high confidence + low score = safe to auto-resolve.
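A minimal sketch of that routing logic in Python (the tier names, the 0–1 confidence scale, and the 0.9 auto-close threshold are illustrative assumptions, not API values):

```python
def triage(score: int, confidence: float = 0.0) -> dict:
    """Map an isMalicious risk score to a response tier.

    `confidence` (assumed 0-1 scale) gates auto-resolution of
    low-score alerts.
    """
    if score >= 80:
        result = {"tier": "P1", "action": "auto-create incident, notify SOC"}
    elif score >= 60:
        result = {"tier": "P2", "action": "create alert, assign to analyst"}
    elif score >= 40:
        result = {"tier": "review", "action": "log and add to watchlist"}
    else:
        result = {"tier": "log", "action": "log only"}
    # High confidence + low score: safe to auto-resolve (threshold illustrative)
    result["auto_close"] = score < 40 and confidence >= 0.9
    return result
```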
## Validation
Test with known malicious indicators:
```shell
# Should return malicious: true
curl -G "https://api.ismalicious.com/check" \
  --data-urlencode "query=185.220.101.1" \
  -H "X-API-KEY: $(echo -n 'YOUR_API_KEY:YOUR_API_SECRET' | base64)"
```
## Next Steps
- Set up monitoring/watchlist for your most critical assets
- Connect to the STIX/TAXII feed for automated IOC ingestion
- Export results to your ticketing system (Jira, ServiceNow, PagerDuty)