My AI Agent Hunted APT29 in Under 60 Seconds. Here’s How I Built It.
Published on infosecwriteups.com, 2026-03-16

Sparsh Ladani

Hello defenders, I hope you are having a great day! In this blog, I am going to talk about an AI Agent I built recently whose task is to perform the duties of a threat hunter. I will be going through my code, how I ingested APT29 logs into Elastic SIEM, and much more!

Step 1

Head to https://cloud.elastic.co/login and sign up for a 14-day free trial. Choose Elastic Cloud Serverless as the deployment type; it’s lightweight and perfect for this lab.

In the setup wizard, select Security as your use case. If asked, “How will you use Elastic for Security?” pick Something else; this keeps the setup flexible for our project.

Verify you’re in a serverless project: After logging in, check the top-left corner of the Elastic Cloud dashboard. It should say “Serverless” under your project name. If not, click Create Project, select Serverless, and name it (e.g., “Agentic AI based Threat Hunting”). In my case, I have kept the default name.

After that, click More, then click Machine Learning, and then click Data Visualizer.


Now, download the APT29 dataset from the OTRF detection hackathon GitHub repository and upload it to Elastic: https://github.com/OTRF/detection-hackathon-apt29/blob/master/datasets/day1/apt29_evals_day1_manual.zip

Congratulations! You have just ingested logs into Elastic SIEM. To proceed further, the next task will be to create/get the following API keys:

  1. Any AI model: I am using the OpenAI API as I had some credits left in it. You can use a newer, more capable model such as Anthropic’s Opus 4.6, or, if you don’t want to spend any money and have good system specifications, you can go with local LLM models such as Qwen 3 running under Ollama. To create an OpenAI API key, go to https://platform.openai.com/home and create one there.
  2. The agent I built uses two secrets from Elastic: ELASTIC_API_KEY and ELASTICSEARCH_CLOUD_ID. To get the ELASTIC_API_KEY, go to Kibana’s Stack Management and create a new API key. To get the ELASTICSEARCH_CLOUD_ID, go to your current deployment, click Manage my deployment, and you should see your Elastic Cloud ID. Take all the secrets, open VS Code (or any other code editor you prefer), create a new project, and paste them into a .env file.
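With all the secrets in hand, your .env file should look roughly like this (the values below are placeholders, not real credentials; the first two variable names are what the agent’s code reads with os.getenv, and OPENAI_API_KEY is the variable the ChatOpenAI wrapper picks up by default):

```ini
# .env -- keep this file out of version control
ELASTIC_API_KEY=...your-elastic-api-key...
ELASTICSEARCH_CLOUD_ID=...your-cloud-id...
OPENAI_API_KEY=sk-...your-openai-key...
```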

Step 2

Let’s code the AI Agent! Create a new file called Agent.ipynb. We will be using Jupyter notebook to code and test our agent.

Install all the libraries needed for this:

langgraph
langchain-core
langchain-openai
openai
elasticsearch
python-dotenv

Put these into a requirements.txt file and run the following command: pip install -r requirements.txt (add --break-system-packages if, like me, you are on a Linux distribution with an externally managed Python environment :D)

When done, add this code by creating a new cell:

from dotenv import load_dotenv
from typing import TypedDict
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage
from langgraph.checkpoint.memory import MemorySaver
import os
from elasticsearch import Elasticsearch
from langchain_openai import ChatOpenAI
import json

load_dotenv()

The function of this code is to import the libraries that we installed:

  • dotenv: loads the environment variables from the .env file so we can access our API keys without hardcoding them in the code.
  • TypedDict: a typing construct that lets us define a dictionary with fixed keys and types, which LangGraph requires to define the agent’s state (AgentState).
  • StateGraph, START, END: the core building blocks of LangGraph. StateGraph is the actual agent graph; START and END are special markers that define where the graph begins and ends.
  • SystemMessage, HumanMessage: used to structure the messages we send to GPT. SystemMessage carries the prompt/instructions; HumanMessage is what we’re asking the model to do.
  • MemorySaver: an in-memory checkpointer that LangGraph uses to save and track the agent’s state as it moves through each node.
  • Elasticsearch: the official Python client that lets us connect to our Elastic Cloud deployment and run queries against our SIEM data.
  • ChatOpenAI: a LangChain wrapper around OpenAI’s API that lets us call GPT-4o-mini cleanly without manually handling API requests.
  • json: a built-in Python library used to convert data between Python dictionaries and JSON strings, needed because GPT returns responses as strings that we need to parse.
  • load_dotenv(): actually loads the .env file into memory so all the os.getenv() calls throughout the code can find the API keys.
class AgentState(TypedDict):
    raw_logs: list[str]
    suspicious_events: list[str]
    category: str
    confidence_score: float
    mitre_ttps: list[str]
    mitre_reasoning: str
    related_events: list[str]
    report: str
    current_timestamp: str
    iteration_count: int

AgentState is the memory of the agent. It is a TypedDict class that defines all the data the agent needs to track as it moves through each node in the graph. Think of it like a shared notepad that every node can read from and write to.

Here’s what each field stores:

  • raw_logs: the raw security logs fetched directly from Elastic SIEM; this is the starting point of every hunt.
  • suspicious_events: the classified findings returned by GPT after analyzing the raw logs, stored as a JSON string.
  • category: the overall classification of the logs: BENIGN, SUSPICIOUS, or MALICIOUS.
  • confidence_score: a float between 0 and 1 that tells the agent how confident GPT is in its findings; this is what drives the conditional branching in should_continue.
  • mitre_ttps: the MITRE ATT&CK mappings returned by the mitre_mapping_node, containing the tactics, techniques, and kill chain phases.
  • mitre_reasoning: the reasoning behind each MITRE mapping (currently stored but reserved for future use).
  • related_events: the enriched context fetched by enrich_content_node, containing all logs related to the suspicious events by hostname, process, and timeframe.
  • report: the final SOC-ready threat hunting report generated by generate_report_node; this is what gets saved to report.md.
  • current_timestamp: tracks where in the timeline the agent currently is, used by fetch_more_information_node to slide the time window forward.
  • iteration_count: counts how many times the agent has looped through classify_logs_node, used to prevent infinite loops by forcing an exit after 5 iterations.
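To see how this shared state evolves, here is a minimal dependency-free sketch: LangGraph merges whatever dict a node returns back into the state, so each node only returns the keys it changed. The classify_stub node and its values below are made up purely for illustration.

```python
from typing import TypedDict

class MiniState(TypedDict):
    raw_logs: list
    confidence_score: float
    iteration_count: int

state: MiniState = {"raw_logs": [], "confidence_score": 0.0, "iteration_count": 0}

def classify_stub(state: MiniState) -> dict:
    # A node returns a partial update, not the whole state
    return {"confidence_score": 0.8, "iteration_count": state["iteration_count"] + 1}

# LangGraph merges the partial return into the shared state; dict.update
# mimics that merge behaviour for this sketch.
state.update(classify_stub(state))
print(state["confidence_score"], state["iteration_count"])  # 0.8 1
```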
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0
)

elastic_search = Elasticsearch(
    cloud_id=os.getenv("ELASTICSEARCH_CLOUD_ID"),
    api_key=os.getenv("ELASTIC_API_KEY"),
)

Here we initialize the OpenAI model and load the ELK stack’s API key and Cloud ID into the Elasticsearch client.

RAW_LOGS_PROMPT = """You are an experienced Threat Hunter with over 10 years at a Managed Security Service Provider (MSSP).
You will be given raw security logs from an Elastic SIEM. Your task is to:
1. Identify any suspicious or anomalous activity in the logs
2. Explain why each event is suspicious
3. Assign a severity level: LOW, MEDIUM, HIGH, or CRITICAL
4. List the affected user accounts, IPs, or systems involved

Focus on detecting:
- Unauthorized access attempts
- Unusual process execution
- Lateral movement indicators
- Privilege escalation
- Data exfiltration patterns
- Persistence mechanisms

Be concise and precise. If no suspicious activity is found, state that clearly.

You MUST respond in the following JSON format only. No extra text outside the JSON:
{{
"suspicious_events": [
{{
"description": "what happened",
"severity": "LOW/MEDIUM/HIGH/CRITICAL",
"hostname": "affected hostname",
"process_id": "affected process id",
"timestamp": "event timestamp",
"reason": "why this is suspicious"
}}
],
"category": "BENIGN/SUSPICIOUS/MALICIOUS",
"confidence_score": 0.0
}}

Raw logs:
{logs}
"""

This predefined SYSTEM prompt tells the agent (or, in simple language, the LLM) to act as a senior threat hunter, analyze raw Sysmon logs from Elastic SIEM, and return structured JSON containing suspicious events, severity levels, and a confidence score.
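One detail worth calling out: the doubled braces ({{ and }}) in the prompt exist because we fill it later with Python’s str.format, which treats single braces as placeholders. A quick sketch with a shortened template and a hand-written model reply (both illustrative only) shows how the escaping and the round-trip parsing work:

```python
import json

# Doubled braces become literal { } after .format(); {logs} stays a placeholder.
template = ('Respond in JSON only:\n'
            '{{"category": "BENIGN/SUSPICIOUS/MALICIOUS", "confidence_score": 0.0}}\n\n'
            'Raw logs:\n{logs}')
prompt = template.format(logs='[{"EventID": 1}]')
print(prompt)

# A reply that follows the schema parses straight back into a dict:
reply = '{"suspicious_events": [], "category": "BENIGN", "confidence_score": 0.2}'
parsed = json.loads(reply)
print(parsed["category"], parsed["confidence_score"])  # BENIGN 0.2
```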

MITRE_MAPPING_PROMPT = """You are an expert in the MITRE ATT&CK framework with over 10 years of experience in threat intelligence.

You will be given a set of security findings from a threat hunting analysis. Your task is to:
1. Map each finding to the most relevant MITRE ATT&CK Tactic and Technique
2. Provide the exact Technique ID (e.g. T1055, T1003)
3. Explain why each finding maps to that specific technique
4. Identify the Kill Chain phase for each finding

Format your response exactly like this for each finding:
Finding: [brief description of the finding]
Tactic: [MITRE Tactic name]
Technique ID: [T####]
Technique Name: [MITRE Technique name]
Kill Chain Phase: [phase name]
Reasoning: [why this maps to this technique]

Security Findings:
{findings}

If a finding cannot be mapped to MITRE ATT&CK, state that clearly and explain why.
"""

This predefined SYSTEM prompt tells the agent (or in simple language the LLM) to map classified threat findings to MITRE ATT&CK tactics, techniques, and Kill Chain phases with exact Technique IDs (e.g. T1055).

REPORT_PROMPT = """You are a senior Threat Hunter writing a formal incident report for a SOC team.

You have completed a full threat hunting analysis. Below are your inputs:

INITIAL FINDINGS:
{findings}

MITRE ATT&CK MAPPINGS:
{mitre_mappings}

ENRICHED CONTEXT (related logs within ±5 minutes of suspicious events):
{related_events}

Your task:
1. Synthesize all three inputs into a coherent threat hunting report
2. Identify the full attack timeline based on the enriched context
3. Confirm or revise your initial severity assessment with justification
4. List all affected systems, accounts, and processes
5. Provide recommended immediate containment actions
6. Write KQL queries that a threat hunter could use in Elastic to hunt for the same TTPs
7. Summarize the overall threat level: BENIGN / LOW / MEDIUM / HIGH / CRITICAL

Format your report as follows:

EXECUTIVE SUMMARY:
[2-3 sentence overview for management]

TECHNICAL FINDINGS:
[Detailed technical breakdown]

ATTACK TIMELINE:
[Chronological sequence of events]

AFFECTED ASSETS:
[Systems, accounts, processes involved]

MITRE ATT&CK SUMMARY:
[TTPs identified with IDs]

KQL HUNTING QUERIES:
[Provide 3-5 KQL queries for Elastic that analysts can use to detect the same activity.
Each query should include:
- Query name/purpose
- The KQL query itself
- What it detects and why it is relevant to the findings]

RECOMMENDATIONS:
[Immediate containment and remediation steps]

OVERALL THREAT LEVEL: [level]
"""

This tells the agent (or in simple words the LLM) to summarize threat findings, MITRE mappings, and enriched context into a formal SOC incident report including attack timeline, affected assets, KQL hunting queries, and recommendations.

def fetch_logs_node(state: AgentState):
    result = elastic_search.search(
        index="malicious_activity",
        size=20,
        query={
            "bool": {
                "must": {"term": {"EventID": 1}},
            }
        }
    )
    logs = [hit["_source"] for hit in result["hits"]["hits"]]
    return {
        "raw_logs": logs,
        "current_timestamp": logs[-1]["EventTime"]
    }

A helper function which fetches 20 logs from the index named malicious_activity (change the index name to match your own) using a query that looks for Event ID 1, Sysmon’s process-creation event. It’s a good idea to give the agent some context about what type of logs it is dealing with, so it knows what kinds of activity to look out for.

def query_by_hostname(hostname: str) -> list:
    result = elastic_search.search(
        index="malicious_activity",
        size=20,
        query={
            "match": {
                "Hostname": hostname
            }
        }
    )

    print(result)
    return [hit["_source"] for hit in result["hits"]["hits"]]

A helper function which fetches 20 logs from the Elastic SIEM for a specific machine. If something suspicious was found on a host, this function pulls everything else that machine was doing, giving the agent more context to work with.

def query_by_process(process_id: str) -> list:
    result = elastic_search.search(
        index="malicious_activity",
        size=20,
        query={
            "match": {
                "ProcessId": process_id
            }
        }
    )
    print(result)
    return [hit["_source"] for hit in result["hits"]["hits"]]

A helper function which fetches 20 logs from the Elastic SIEM for a specific process. If a suspicious process ID was found, this function pulls everything else that particular process was doing, giving the agent more context to work with.

def query_by_timeframe(timestamp: str) -> list:
    timestamp = timestamp.replace(" ", "T")
    result = elastic_search.search(
        index="malicious_activity",
        size=20,
        query={
            "range": {
                "@timestamp": {
                    "gte": f"{timestamp}||-5m",
                    "lte": f"{timestamp}||+5m"
                }
            }
        }
    )
    print(result)
    return [hit["_source"] for hit in result["hits"]["hits"]]

A helper function which fetches 20 logs from the Elastic SIEM within a +/- 5 minute window around a suspicious event’s timestamp. If something happened at 23:22:00, this pulls everything between 23:17:00 and 23:27:00, giving the agent a broader picture of what was happening before and after the suspicious activity.
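The ||-5m and ||+5m suffixes are Elasticsearch date math anchored on the supplied timestamp. You can sanity-check the window boundaries locally with nothing but the standard library:

```python
from datetime import datetime, timedelta

# Reproduce Elastic's "<timestamp>||-5m" / "<timestamp>||+5m" window in plain
# Python: normalize the timestamp, then offset it by five minutes each way.
anchor = datetime.fromisoformat("2020-05-01 23:22:00".replace(" ", "T"))
window_start = anchor - timedelta(minutes=5)
window_end = anchor + timedelta(minutes=5)
print(window_start.isoformat(), window_end.isoformat())
# 2020-05-01T23:17:00 2020-05-01T23:27:00
```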

def classify_logs_node(state: AgentState):
    json_input = json.dumps(state['raw_logs'], indent=2)
    messages = [
        SystemMessage(content=RAW_LOGS_PROMPT.format(logs=json_input)),
        HumanMessage(content="Analyse these logs")
    ]

    response = llm.invoke(messages)

    try:
        parsed = json.loads(response.content)
        return {
            "suspicious_events": response.content,
            "confidence_score": parsed.get("confidence_score", 0.0),
            "category": parsed.get("category", ""),
            "iteration_count": state['iteration_count'] + 1
        }
    except json.JSONDecodeError:
        return {
            "suspicious_events": response.content,
            "confidence_score": 0.0,
            "iteration_count": state['iteration_count'] + 1
        }

This node is the brain of the agent. It takes the raw logs fetched by fetch_logs_node, converts them to JSON (GPT handles structured JSON better than raw Python (nested) dictionaries), and passes them to the LLM along with RAW_LOGS_PROMPT to start classifying the events. The LLM returns a structured JSON response containing the suspicious events, category, and a confidence score. The confidence score is then used by should_continue to decide whether the agent has enough information to proceed to MITRE mapping or needs to fetch more logs and try again.

def mitre_mapping_node(state: AgentState):
    json_input = json.dumps(state['suspicious_events'], indent=2)
    messages = [
        SystemMessage(content=MITRE_MAPPING_PROMPT.format(findings=json_input)),
        HumanMessage(content="Analyze these logs.")
    ]

    response = llm.invoke(messages)

    return {"mitre_ttps": response.content}

This node takes the suspicious events identified by classify_logs_node and asks the agent to map each finding to the MITRE ATT&CK framework. It formats the findings and passes them to the LLM along with the MITRE_MAPPING_PROMPT, which instructs the agent to return the relevant Tactic, Technique ID, Kill Chain phase and reasoning for each finding. Results are stored in mitre_ttps for the report generator to use.

def enrich_content_node(state: AgentState):
    try:
        parsed_suspicious_logs = json.loads(state['suspicious_events'])
        print(parsed_suspicious_logs)
    except json.JSONDecodeError:
        print("JSON parsing failed:", state['suspicious_events'])
        return {"related_events": []}

    related_events = []

    for events in parsed_suspicious_logs['suspicious_events'][:3]:
        related_events += query_by_hostname(events['hostname'])[:5]
        related_events += query_by_process(events['process_id'])[:5]
        related_events += query_by_timeframe(events['timestamp'])[:5]

    return {"related_events": related_events}

This node provides additional context for the agent’s findings by fetching related activity around each suspicious event identified in classify_logs_node. For each suspicious event, it runs three queries (same hostname, same process, and same time window of ±5 minutes) and dumps everything into related_events so the report generator has the full picture.
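Because the three queries overlap (a single hit can match by hostname, by process, and by timeframe at once), related_events can accumulate duplicate log entries. If that becomes a problem, a simple dedupe keyed on fields present in these Sysmon logs is one option. This helper is my suggestion, not part of the original agent:

```python
# Drop duplicate hits keyed on (Hostname, ProcessId, EventTime); these field
# names are assumed to exist in the ingested Sysmon logs.
def dedupe_events(events: list[dict]) -> list[dict]:
    seen = set()
    unique = []
    for ev in events:
        key = (ev.get("Hostname"), ev.get("ProcessId"), ev.get("EventTime"))
        if key not in seen:
            seen.add(key)
            unique.append(ev)
    return unique

sample = [
    {"Hostname": "A", "ProcessId": "4", "EventTime": "t1"},
    {"Hostname": "A", "ProcessId": "4", "EventTime": "t1"},  # duplicate hit
    {"Hostname": "B", "ProcessId": "9", "EventTime": "t2"},
]
print(len(dedupe_events(sample)))  # 2
```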

def should_continue(state: AgentState):
    if state['confidence_score'] >= 0.5 or state['iteration_count'] >= 5:
        return "mitre_mapping_node"
    else:
        return "fetch_more_information_node"

This is the decision making function of the agent. It decides whether the agent has gathered enough information to proceed or needs to fetch more logs. If the confidence score is 0.5 or above, the agent is confident enough in its findings and moves forward to MITRE mapping. If the agent has already looped 5 times without confidence, it forces a move forward anyway to prevent an infinite loop. Otherwise it goes back to fetch more logs from the next time window and tries again.
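The branching is easy to verify in isolation. This standalone copy of the same condition (taking the two inputs directly instead of the state object) shows all three outcomes:

```python
# Same branching as the graph's conditional edge, shown standalone.
def should_continue(confidence_score: float, iteration_count: int) -> str:
    if confidence_score >= 0.5 or iteration_count >= 5:
        return "mitre_mapping_node"
    return "fetch_more_information_node"

# Low confidence keeps looping; high confidence or hitting 5 iterations moves on.
print(should_continue(0.3, 1))  # fetch_more_information_node
print(should_continue(0.9, 1))  # mitre_mapping_node
print(should_continue(0.1, 5))  # mitre_mapping_node
```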

def fetch_more_information_node(state: AgentState):
    current_timestamp = state['current_timestamp']
    current_timestamp = current_timestamp.replace(" ", "T")
    result = elastic_search.search(
        index="malicious_activity",
        size=20,
        query={
            "range": {
                "@timestamp": {
                    "gte": f"{current_timestamp}||+5m",
                    "lte": f"{current_timestamp}||+10m"
                }
            }
        }
    )
    print(result)
    logs = [hit["_source"] for hit in result["hits"]["hits"]]
    return {
        "raw_logs": logs,
        "current_timestamp": logs[-1]["EventTime"] if logs else current_timestamp
    }

This node is triggered when the agent’s confidence score is below 0.5. It fetches the next batch of 20 logs from a 5 minute window ahead of the current timestamp, essentially sliding the time window forward to look at what happened next in the environment. The current_timestamp is then updated to the last log’s EventTime so the next iteration continues from where this one left off.
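The window arithmetic can be verified without touching Elastic. This hypothetical next_window helper (not part of the agent) builds the same gte/lte date-math strings the node sends:

```python
# Build the same Elasticsearch date-math bounds as fetch_more_information_node:
# each call looks at the 5-to-10-minute band after the current timestamp.
def next_window(current_timestamp: str) -> dict:
    ts = current_timestamp.replace(" ", "T")
    return {"gte": f"{ts}||+5m", "lte": f"{ts}||+10m"}

print(next_window("2020-05-01 23:22:00"))
# {'gte': '2020-05-01T23:22:00||+5m', 'lte': '2020-05-01T23:22:00||+10m'}
```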

def generate_report_node(state: AgentState):
    findings_json = json.dumps(state['suspicious_events'])
    mitre_mappings_json = json.dumps(state['mitre_ttps'])
    related_events_json = json.dumps(state.get('related_events', [])[:10], indent=2)

    messages = [
        SystemMessage(content=REPORT_PROMPT.format(
            findings=findings_json,
            mitre_mappings=mitre_mappings_json,
            related_events=related_events_json
        )),
        HumanMessage(content="Generate a full threat hunting report using all the data that is provided to you!")
    ]

    response = llm.invoke(messages)

    return {"report": response.content}

This is the final analysis node. It takes everything the agent has gathered (the classified suspicious events, the MITRE ATT&CK mappings, and the enriched context) and passes it all to the LLM with the REPORT_PROMPT to generate a formal threat hunting report. Related events are capped at 10 to avoid hitting the model-specific token limit. The report is stored in state for save_report to write to disk.

def save_report(state: AgentState):
    response = state['report']
    with open("report.md", "w") as f:
        f.write(response)

A simple helper function that takes the final report from state and writes it to a markdown file called report.md.

def run_agent():
    memory = MemorySaver()
    builder = StateGraph(AgentState)

    builder.add_node("fetch_logs_node", fetch_logs_node)
    builder.add_node("classify_logs_node", classify_logs_node)
    builder.add_node("mitre_mapping_node", mitre_mapping_node)
    builder.add_node("enrich_content_node", enrich_content_node)
    builder.add_node("generate_report_node", generate_report_node)
    builder.add_node("fetch_more_information_node", fetch_more_information_node)

    builder.set_entry_point("fetch_logs_node")

    builder.add_edge("fetch_logs_node", "classify_logs_node")

    builder.add_conditional_edges(
        "classify_logs_node",
        should_continue,
        {
            "mitre_mapping_node": "mitre_mapping_node",
            "fetch_more_information_node": "fetch_more_information_node"
        }
    )

    builder.add_edge("fetch_more_information_node", "classify_logs_node")
    builder.add_edge("mitre_mapping_node", "enrich_content_node")
    builder.add_edge("enrich_content_node", "generate_report_node")
    builder.add_edge("generate_report_node", END)

    graph = builder.compile(checkpointer=memory)

    thread = {"configurable": {"thread_id": "1"}}

    result = graph.invoke(
        {
            "raw_logs": [],
            "suspicious_events": "",
            "category": "",
            "confidence_score": 0.0,
            "mitre_ttps": "",
            "mitre_reasoning": "",
            "related_events": [],
            "report": "",
            "current_timestamp": "2020-05-01 23:22:00",
            "iteration_count": 0
        },
        thread
    )

    return result

This is the main entry point of the agent. This function builds the LangGraph state machine by registering all nodes, defining the edges between them, and compiling the graph with an in-memory checkpointer for state persistence across all nodes. The graph is then invoked with an initial empty state and a starting timestamp. Once complete, it returns the final state containing the full threat hunting report.
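As a sanity check on the wiring, here is a dependency-free dry run of the same control flow with stubbed confidence scores. The scores are invented for illustration; a real run gets them from the LLM:

```python
# Walk the same node order the LangGraph build wires up, with stub nodes.
def run_flow(scores):
    state = {"confidence_score": 0.0, "iteration_count": 0}
    visited = ["fetch_logs_node"]
    for score in scores:
        visited.append("classify_logs_node")
        state["confidence_score"] = score
        state["iteration_count"] += 1
        # Same condition as should_continue
        if state["confidence_score"] >= 0.5 or state["iteration_count"] >= 5:
            break
        visited.append("fetch_more_information_node")
    visited += ["mitre_mapping_node", "enrich_content_node", "generate_report_node"]
    return visited

# First pass is unsure (0.3), second is confident (0.8): one extra fetch, then done.
print(run_flow([0.3, 0.8]))
```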

result = run_agent()
save_report(result)

This code just runs the agent by calling the run_agent function, then saves the report by passing the result to the save_report function.

Now we are all done! Click Run All to execute every cell and test the output of the agent that we have created.

As you can see below, the agent ran in ~55 seconds


Here is the detailed report that the agent created:


Here is a workflow showing the interconnection of all nodes:


That’s it! In under 60 seconds the agent fetched real APT29 attack logs, classified suspicious activity, mapped it to MITRE ATT&CK, enriched the context, and produced a full SOC-ready incident report with KQL hunting queries, all without any human involvement.

The full source code is available on my GitHub. If you found this useful, drop a star on the repo and feel free to open a PR, all contributions are welcome!

If you have any questions or want to discuss the project, feel free to connect with me on LinkedIn.

Elasticsearch python client docs:

https://www.elastic.co/docs/reference/elasticsearch/clients/python


Source: https://infosecwriteups.com/my-ai-agent-hunted-apt29-under-60-seconds-heres-how-i-built-it-f98dfbdd115c?source=rss----7b722bfd1b8d---4