Hello defenders, I hope you are having a great day! In this blog, I am going to talk about an AI Agent I built recently whose task is to perform the duties of a threat hunter. I will walk through my code, how I ingested APT29 logs into Elastic SIEM, and much more!
Head to https://cloud.elastic.co/login and sign up for a 14-day free trial. Choose Elastic Cloud Serverless as the deployment type; it’s lightweight and perfect for this lab.
In the setup wizard, select Security as your use case. If asked, “How will you use Elastic for Security?” pick Something else; this keeps the setup flexible for our project.
Verify you’re in a serverless project: After logging in, check the top-left corner of the Elastic Cloud dashboard. It should say “Serverless” under your project name. If not, click Create Project, select Serverless, and name it (e.g., “Agentic AI based Threat Hunting”). In my case, I have kept the default name.
After that, click More, then Machine Learning, and then open Data Visualizer.
Now, download the APT29 dataset from the detection-hackathon-apt29 GitHub repository and upload it to Elastic: https://github.com/OTRF/detection-hackathon-apt29/blob/master/datasets/day1/apt29_evals_day1_manual.zip
Congratulations! You have just ingested logs into Elastic SIEM. To proceed further, the next task is to create/get the following API keys: an OpenAI API key, plus your deployment’s Elastic Cloud ID and an Elastic API key.
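Concretely, the agent’s code later reads these values from a .env file sitting next to the notebook. A minimal sketch of that file (the names ELASTICSEARCH_CLOUD_ID and ELASTIC_API_KEY come from the code further down; OPENAI_API_KEY is the variable the langchain-openai client reads by default, and all values shown are placeholders):

```shell
# .env — loaded by load_dotenv() at the top of the notebook
OPENAI_API_KEY="sk-..."                  # from platform.openai.com
ELASTICSEARCH_CLOUD_ID="deployment:..."  # Cloud ID of your Elastic project
ELASTIC_API_KEY="..."                    # Elastic API key you generated
```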
Let’s code the AI Agent! Create a new file called Agent.ipynb. We will be using a Jupyter notebook to code and test our agent.
Install all the libraries needed for this:
langgraph
langchain-core
langchain-openai
openai
elasticsearch
python-dotenv
Put these into a requirements.txt file and run the following command: pip install -r requirements.txt (or pip install -r requirements.txt --break-system-packages if you are running it on Linux as I am :D)
When done, add this code in a new cell:
from dotenv import load_dotenv
from typing import TypedDict
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage
from langgraph.checkpoint.memory import MemorySaver
import os
from elasticsearch import Elasticsearch
from langchain_openai import ChatOpenAI
import json

load_dotenv()
This cell imports the libraries that we installed and calls load_dotenv() to load our API keys from the .env file.
class AgentState(TypedDict):
raw_logs: list[str]
suspicious_events: list[str]
category: str
confidence_score: float
mitre_ttps: list[str]
mitre_reasoning: str
related_events: list[str]
report: str
current_timestamp: str
iteration_count: int
AgentState is the memory of the agent. It is a TypedDict class that defines all the data the agent needs to track as it moves through each node in the graph. Think of it like a shared notepad that every node can read from and write to.
Here’s what each field stores:
- raw_logs: the current batch of logs pulled from Elastic
- suspicious_events: the LLM’s JSON analysis of those logs
- category: BENIGN / SUSPICIOUS / MALICIOUS
- confidence_score: how confident the LLM is in its classification
- mitre_ttps: the MITRE ATT&CK mappings
- mitre_reasoning: the reasoning behind those mappings
- related_events: enrichment logs gathered around each suspicious event
- report: the final incident report
- current_timestamp: where the sliding time window currently sits
- iteration_count: how many classification loops have run
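This shared-notepad model is why every node below returns only the fields it changed: LangGraph merges each node’s returned dictionary back into the state. A tiny standalone sketch of that merge pattern (MiniState and classify_stub are illustrative names, not part of the agent, and this is a simplification of LangGraph’s real internals):

```python
from typing import TypedDict

class MiniState(TypedDict, total=False):
    confidence_score: float
    iteration_count: int

def classify_stub(state: MiniState) -> dict:
    # A node returns ONLY the keys it changed...
    return {"confidence_score": 0.8,
            "iteration_count": state["iteration_count"] + 1}

state: MiniState = {"confidence_score": 0.0, "iteration_count": 0}
state.update(classify_stub(state))  # ...and the graph merges them back in
print(state)  # {'confidence_score': 0.8, 'iteration_count': 1}
```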
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0
)

elastic_search = Elasticsearch(
cloud_id=os.getenv("ELASTICSEARCH_CLOUD_ID"),
api_key=os.getenv("ELASTIC_API_KEY"),
)
This initializes the OpenAI model and creates the Elasticsearch client using the ELK stack’s Cloud ID and API key loaded from the .env file.
RAW_LOGS_PROMPT = """You are an experienced Threat Hunter with over 10 years at a Managed Security Service Provider (MSSP).
You will be given raw security logs from an Elastic SIEM. Your task is to:
1. Identify any suspicious or anomalous activity in the logs
2. Explain why each event is suspicious
3. Assign a severity level: LOW, MEDIUM, HIGH, or CRITICAL
4. List the affected user accounts, IPs, or systems involved
Focus on detecting:
- Unauthorized access attempts
- Unusual process execution
- Lateral movement indicators
- Privilege escalation
- Data exfiltration patterns
- Persistence mechanisms
Be concise and precise. If no suspicious activity is found, state that clearly.
You MUST respond in the following JSON format only. No extra text outside the JSON:
{{
"suspicious_events": [
{{
"description": "what happened",
"severity": "LOW/MEDIUM/HIGH/CRITICAL",
"hostname": "affected hostname",
"process_id": "affected process id",
"timestamp": "event timestamp",
"reason": "why this is suspicious"
}}
],
"category": "BENIGN/SUSPICIOUS/MALICIOUS",
"confidence_score": 0.0
}}
Raw logs:
{logs}
"""
This predefined SYSTEM prompt tells the agent (or, in simple language, the LLM) to act as a senior threat hunter, analyze raw Sysmon logs from Elastic SIEM, and return structured JSON containing suspicious events, severity levels, and a confidence score.
MITRE_MAPPING_PROMPT = """You are an expert in the MITRE ATT&CK framework with over 10 years of experience in threat intelligence.
You will be given a set of security findings from a threat hunting analysis. Your task is to:
1. Map each finding to the most relevant MITRE ATT&CK Tactic and Technique
2. Provide the exact Technique ID (e.g. T1055, T1003)
3. Explain why each finding maps to that specific technique
4. Identify the Kill Chain phase for each finding
Format your response exactly like this for each finding:
Finding: [brief description of the finding]
Tactic: [MITRE Tactic name]
Technique ID: [T####]
Technique Name: [MITRE Technique name]
Kill Chain Phase: [phase name]
Reasoning: [why this maps to this technique]
Security Findings:
{findings}
If a finding cannot be mapped to MITRE ATT&CK, state that clearly and explain why.
"""
This predefined SYSTEM prompt tells the agent (or in simple language the LLM) to map classified threat findings to MITRE ATT&CK tactics, techniques, and Kill Chain phases with exact Technique IDs (e.g. T1055).
REPORT_PROMPT = """You are a senior Threat Hunter writing a formal incident report for a SOC team.
You have completed a full threat hunting analysis. Below are your inputs:
INITIAL FINDINGS:
{findings}
MITRE ATT&CK MAPPINGS:
{mitre_mappings}
ENRICHED CONTEXT (related logs within ±5 minutes of suspicious events):
{related_events}
Your task:
1. Synthesize all three inputs into a coherent threat hunting report
2. Identify the full attack timeline based on the enriched context
3. Confirm or revise your initial severity assessment with justification
4. List all affected systems, accounts, and processes
5. Provide recommended immediate containment actions
6. Write KQL queries that a threat hunter could use in Elastic to hunt for the same TTPs
7. Summarize the overall threat level: BENIGN / LOW / MEDIUM / HIGH / CRITICAL
Format your report as follows:
EXECUTIVE SUMMARY:
[2-3 sentence overview for management]
TECHNICAL FINDINGS:
[Detailed technical breakdown]
ATTACK TIMELINE:
[Chronological sequence of events]
AFFECTED ASSETS:
[Systems, accounts, processes involved]
MITRE ATT&CK SUMMARY:
[TTPs identified with IDs]
KQL HUNTING QUERIES:
[Provide 3-5 KQL queries for Elastic that analysts can use to detect the same activity.
Each query should include:
- Query name/purpose
- The KQL query itself
- What it detects and why it is relevant to the findings]
RECOMMENDATIONS:
[Immediate containment and remediation steps]
OVERALL THREAT LEVEL: [level]
"""
This tells the agent (or in simple words the LLM) to summarize threat findings, MITRE mappings, and enriched context into a formal SOC incident report including attack timeline, affected assets, KQL hunting queries, and recommendations.
def fetch_logs_node(state: AgentState):
result = elastic_search.search(
index="malicious_activity",
size=20,
query={
"bool": {
"must": {"term": {"EventID": 1}},
}
}
)
logs = [hit["_source"] for hit in result["hits"]["hits"]]
return {
"raw_logs": logs,
"current_timestamp": logs[-1]["EventTime"]
}
A helper function which fetches 20 logs from the index named malicious_activity (you can change the index name accordingly), using a query that filters for Event ID 1 (Sysmon process creation). It’s a good idea to give the agent some context about what type of logs it is dealing with, so it knows what kinds of activity to look for.
def query_by_hostname(hostname: str) -> list:
result = elastic_search.search(
index="malicious_activity",
size=20,
query={
"match": {
"Hostname": hostname
}
}
)
print(result)
return [hit["_source"] for hit in result["hits"]["hits"]]
A helper function which fetches 20 logs from the Elastic SIEM for a specific machine. If something suspicious was found on a host, this function pulls everything else that machine was doing, giving the agent more context to work with.
def query_by_process(process_id: str) -> list:
result = elastic_search.search(
index="malicious_activity",
size=20,
query={
"match": {
"ProcessId": process_id
}
}
)
print(result)
return [hit["_source"] for hit in result["hits"]["hits"]]
A helper function which fetches 20 logs from the Elastic SIEM for a specific process ID. If a suspicious process ID was found, this function pulls everything else that particular process was doing, giving the agent more context to work with.
def query_by_timeframe(timestamp: str) -> list:
timestamp = timestamp.replace(" ", "T")
result = elastic_search.search(
index="malicious_activity",
size=20,
query={
"range": {
"@timestamp": {
"gte": f"{timestamp}||-5m",
"lte": f"{timestamp}||+5m"
}
}
}
)
print(result)
return [hit["_source"] for hit in result["hits"]["hits"]]
A helper function which fetches 20 logs from the Elastic SIEM within a ±5 minute window around a suspicious event’s timestamp. If something happened at 23:22:00, this pulls everything between 23:17:00 and 23:27:00, giving the agent a broader picture of what was happening before and after the suspicious activity.
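The `"{timestamp}||-5m"` / `"{timestamp}||+5m"` strings use Elasticsearch date math: the part before `||` is the anchor timestamp and the part after is an offset applied to it. The same window can be computed client-side with plain datetime, which makes the query easy to reason about (time_window is a hypothetical helper for illustration, not part of the agent):

```python
from datetime import datetime, timedelta

def time_window(timestamp: str, minutes: int = 5) -> tuple[str, str]:
    """Compute the gte/lte bounds that '{ts}||-5m' / '{ts}||+5m' expand to."""
    anchor = datetime.fromisoformat(timestamp.replace(" ", "T"))
    fmt = "%Y-%m-%dT%H:%M:%S"
    return ((anchor - timedelta(minutes=minutes)).strftime(fmt),
            (anchor + timedelta(minutes=minutes)).strftime(fmt))

print(time_window("2020-05-01 23:22:00"))
# ('2020-05-01T23:17:00', '2020-05-01T23:27:00')
```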
def classify_logs_node(state: AgentState):
json_input = json.dumps(state['raw_logs'], indent=2)
messages = [
SystemMessage(content=RAW_LOGS_PROMPT.format(logs=json_input)),
HumanMessage(content="Analyse these logs")
]
response = llm.invoke(messages)
try:
parsed = json.loads(response.content)
return {
"suspicious_events": response.content,
"confidence_score": parsed.get("confidence_score", 0.0),
"category": parsed.get("category", ""),
"iteration_count": state['iteration_count'] + 1
}
except json.JSONDecodeError:
return {
"suspicious_events": response.content,
"confidence_score": 0.0,
"iteration_count": state['iteration_count'] + 1
}
This node is the brain of the agent. It takes the raw logs fetched by fetch_logs_node, converts them to JSON (GPT handles structured JSON better than raw Python (nested) dictionaries), and passes them to the LLM along with RAW_LOGS_PROMPT to start classifying the events. The LLM returns a structured JSON response containing the suspicious events, category, and a confidence score. The confidence score is then used by should_continue to decide whether the agent has enough information to proceed to MITRE mapping or needs to fetch more logs and try again.
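One caveat worth knowing: chat models sometimes wrap their JSON in markdown code fences, which makes json.loads fail and drops the agent into the except branch with a confidence of 0.0. A small defensive parser you could swap in before json.loads (parse_llm_json is my own illustrative helper, not part of the code above):

```python
import json

def parse_llm_json(content: str) -> dict:
    """Strip optional ```json ... ``` fences an LLM may add, then parse."""
    text = content.strip()
    if text.startswith("```"):
        text = text.split("\n", 1)[1]      # drop the opening fence line
        text = text.rsplit("```", 1)[0]    # drop the closing fence
    return json.loads(text)

print(parse_llm_json('```json\n{"confidence_score": 0.9}\n```'))
# {'confidence_score': 0.9}
```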
def mitre_mapping_node(state: AgentState):
json_input = json.dumps(state['suspicious_events'], indent=2)
messages = [
SystemMessage(content=MITRE_MAPPING_PROMPT.format(findings=json_input)),
HumanMessage(content="Analyze these logs.")
]
response = llm.invoke(messages)
return {"mitre_ttps": response.content}
This node takes the suspicious events identified by classify_logs_node and asks the agent to map each finding to the MITRE ATT&CK framework. It formats the findings and passes them to the LLM along with the MITRE_MAPPING_PROMPT, which instructs the agent to return the relevant Tactic, Technique ID, Kill Chain phase and reasoning for each finding. Results are stored in mitre_ttps for the report generator to use.
def enrich_content_node(state: AgentState):
try:
parsed_suspicious_logs = json.loads(state['suspicious_events'])
print(parsed_suspicious_logs)
except json.JSONDecodeError:
print("JSON parsing failed:", state['suspicious_events'])
return {"related_events": []}
related_events = []
for events in parsed_suspicious_logs['suspicious_events'][:3]:
related_events += query_by_hostname(events['hostname'])[:5]
related_events += query_by_process(events['process_id'])[:5]
related_events += query_by_timeframe(events['timestamp'])[:5]
return {"related_events": related_events}
This node provides additional context for the agent’s findings by fetching related logs around each suspicious event identified in classify_logs_node. For each suspicious event, it runs three queries: same hostname, same process, and same time window (±5 minutes), and dumps everything into related_events so the report generator has the full picture.
def should_continue(state: AgentState):
if state['confidence_score'] >= 0.5 or state['iteration_count'] >= 5:
return "mitre_mapping_node"
else:
return "fetch_more_information_node"
This is the decision making function of the agent. It decides whether the agent has gathered enough information to proceed or needs to fetch more logs. If the confidence score is 0.5 or above, the agent is confident enough in its findings and moves forward to MITRE mapping. If the agent has already looped 5 times without confidence, it forces a move forward anyway to prevent an infinite loop. Otherwise it goes back to fetch more logs from the next time window and tries again.
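You can sanity-check this routing rule in isolation with a plain function and a few toy values (route is an illustrative stand-in for should_continue, taking the two state fields directly instead of the AgentState dict):

```python
def route(confidence_score: float, iteration_count: int) -> str:
    """Mirror of should_continue's decision rule."""
    if confidence_score >= 0.5 or iteration_count >= 5:
        return "mitre_mapping_node"
    return "fetch_more_information_node"

print(route(0.8, 1))  # mitre_mapping_node (confident in the findings)
print(route(0.2, 5))  # mitre_mapping_node (loop cap reached, forced forward)
print(route(0.2, 2))  # fetch_more_information_node (keep hunting)
```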
def fetch_more_information_node(state: AgentState):
current_timestamp = state['current_timestamp']
current_timestamp = current_timestamp.replace(" ", "T")
result = elastic_search.search(
index="malicious_activity",
size=20,
query={
"range": {
"@timestamp": {
"gte": f"{current_timestamp}||+5m",
"lte": f"{current_timestamp}||+10m"
}
}
}
)
print(result)
logs = [hit["_source"] for hit in result["hits"]["hits"]]
return {
"raw_logs": logs,
"current_timestamp": logs[-1]["EventTime"] if logs else current_timestamp
}
This node is triggered when the agent’s confidence score is below 0.5. It fetches the next batch of 20 logs from a 5-minute window ahead of the current timestamp, essentially sliding the time window forward to look at what happened next in the environment. The current_timestamp is then updated to the last log’s EventTime so the next iteration continues from where this one left off.
def generate_report_node(state: AgentState):
findings_json = json.dumps(state['suspicious_events'])
mitre_mappings_json = json.dumps(state['mitre_ttps'])
related_events_json = json.dumps(state.get('related_events', [])[:10], indent=2)
messages = [
SystemMessage(content=REPORT_PROMPT.format(
findings=findings_json,
mitre_mappings=mitre_mappings_json,
related_events=related_events_json
)),
HumanMessage(content="Generate a full threat hunting report using all the data that is provided to you!")
]
response = llm.invoke(messages)
return {"report": response.content}
This is the final analysis node. It takes everything the agent has gathered: the classified suspicious events, the MITRE ATT&CK mappings, and the enriched context, and then passes it all to the LLM with the REPORT_PROMPT to generate a formal threat hunting report. Related events are capped at 10 to avoid hitting the model’s token limit. The report is stored in state for save_report to write to disk.
def save_report(state: AgentState):
response = state['report']
with open("report.md", "w") as f:
f.write(response)
A simple helper function that takes the final report from state and writes it to a markdown file called report.md.
def run_agent():
memory = MemorySaver()
builder = StateGraph(AgentState)
builder.add_node("fetch_logs_node", fetch_logs_node)
builder.add_node("classify_logs_node", classify_logs_node)
builder.add_node("mitre_mapping_node", mitre_mapping_node)
builder.add_node("enrich_content_node", enrich_content_node)
builder.add_node("generate_report_node", generate_report_node)
builder.add_node("fetch_more_information_node", fetch_more_information_node)
builder.set_entry_point("fetch_logs_node")
builder.add_edge("fetch_logs_node", "classify_logs_node")
builder.add_conditional_edges(
"classify_logs_node",
should_continue,
{
"mitre_mapping_node": "mitre_mapping_node",
"fetch_more_information_node": "fetch_more_information_node"
}
)
builder.add_edge("fetch_more_information_node", "classify_logs_node")
builder.add_edge("mitre_mapping_node", "enrich_content_node")
builder.add_edge("enrich_content_node", "generate_report_node")
builder.add_edge("generate_report_node", END)
graph = builder.compile(checkpointer=memory)
thread = {"configurable": {"thread_id": "1"}}
result = graph.invoke(
{
"raw_logs": [],
"suspicious_events": "",
"category": "",
"confidence_score": 0.0,
"mitre_ttps": "",
"mitre_reasoning": "",
"related_events": [],
"report": "",
"current_timestamp": "2020-05-01 23:22:00",
"iteration_count": 0
},
thread
)
return result
This is the main entry point of the agent. This function builds the LangGraph state machine by registering all nodes, defining the edges between them, and compiling the graph with an in-memory checkpointer for state persistence across all nodes. The graph is then invoked with an initial empty state and a starting timestamp. Once complete, it returns the final state containing the full threat hunting report.
result = run_agent()
save_report(result)
This code runs the agent by calling the run_agent function, then saves the report by passing the result to the save_report function.
Now we are all done! Click Run All to execute every cell and test the output of the agent that we have created.
As you can see below, the agent ran in ~55 seconds
Here is the detailed report that the agent created:
Here is a workflow showing the interconnection of all nodes:
That’s it! In under 60 seconds the agent fetched real APT29 attack logs, classified suspicious activity, mapped it to MITRE ATT&CK, enriched the context, and produced a full SOC-ready incident report with KQL hunting queries, all without any human involvement.
The full source code is available on my GitHub. If you found this useful, drop a star on the repo and feel free to open a PR, all contributions are welcome!
If you have any questions or want to discuss the project, feel free to connect with me on LinkedIn.
Elasticsearch python client docs:
https://www.elastic.co/docs/reference/elasticsearch/clients/python