Intitial release

This commit is contained in:
John husxdev 2025-04-12 01:40:13 -05:00
parent 299038fbc7
commit fa8caa5116
21 changed files with 1234 additions and 0 deletions

3
.env Normal file
View file

@ -0,0 +1,3 @@
TRIAGE_API_KEY=your_triage_api_key_here
TELEGRAM_BOT_TOKEN=your_telegram_bot_token_here
TELEGRAM_CHAT_ID=your_telegram_chat_id_here

View file

@ -0,0 +1,31 @@
import os
from asyncio import sleep
class TelegramNotificationClient:
def __init__(self):
self.token = os.environ.get("TELEGRAM_BOT_TOKEN", "")
self.chat_id = os.environ.get("TELEGRAM_CHAT_ID", "")
async def send_bot_message(self, message: str):
if not self.token or not self.chat_id:
return
try:
import httpx
url = f"https://api.telegram.org/bot{self.token}/sendMessage"
data = {
"chat_id": self.chat_id,
"text": message,
"parse_mode": "Markdown"
}
async with httpx.AsyncClient() as client:
await client.post(url, json=data)
except Exception as e:
print(f"Failed to send Telegram notification: {e}")
async def tg_bot_polling():
"""Simple polling function to keep the Telegram bot running"""
while True:
await sleep(60) # Just sleep in a loop - in a real bot this would check for messages

BIN
constants.zip Normal file

Binary file not shown.

8
constants/regex.py Normal file
View file

@ -0,0 +1,8 @@
# Regular expressions for detecting various tokens and URLs
WEBHOOK_REGEX = r"(https?://(?:ptb\.|canary\.)?discord(?:app)?\.com/api/webhooks/\d+/[\w-]+)"
WEBHOOK_REGEX_BASE64 = r"(?:['\"]([\w+/=]+)['\"])"
DISCORD_TOKEN_MATCHER = r"[MN][A-Za-z\d]{23}\.[\w-]{6}\.[\w-]{27}"
DISCORD_TOKEN_MATCHER_BASE64 = r"['\"]([\w+/=]{24,})['\"]"
TELEGRAM_REGEX = r"\d{8,10}:[A-Za-z0-9_-]{35}"
EMAIL_MATCHER = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"

626
crawler.py Normal file
View file

@ -0,0 +1,626 @@
from asyncio import run, create_task, gather, sleep
from base64 import b64decode
from html import escape
from os import environ, getcwd, listdir, path, remove, makedirs, chmod
from random import randint
from re import findall
from shutil import move
from sqlite3 import connect
from time import time
from traceback import format_exc
from typing import List, Dict, Optional, Set, BinaryIO, Union
from dotenv import load_dotenv
from loguru import logger
from yara_scanner import YaraScanner
from client.telegram_notification import TelegramNotificationClient, tg_bot_polling
from constants.regex import WEBHOOK_REGEX, DISCORD_TOKEN_MATCHER, WEBHOOK_REGEX_BASE64, DISCORD_TOKEN_MATCHER_BASE64, \
TELEGRAM_REGEX, EMAIL_MATCHER
from methods.decompilers import Decompiler
from methods.impl.amnesia_decompiler import AmnesiaDecompiler
from methods.impl.discord_rat_decompiler import DiscordRatDecompiler
from methods.impl.exela_stealer_decompiler import ExelaStealerDecompiler
from methods.impl.legacy_notoken_decompile import LegacyNotokenDecompiler
from methods.impl.luna_grabber_decompiler import LunaGrabberDecompiler
from methods.impl.notoken_decompile import NotokenDecompiler
from methods.impl.pysilon_decompiler import PysilonDecompiler
from methods.impl.umbral_decompiler import UmbralDecompiler
from model.common_models import DecompileResult
from model.triage_models import TriageNewSampleData, AnalyzeResult
from utils.blank import base64_decode_then_filter
from utils.common_utils import HttpxHelperClient
from utils.decompile_utils import clean_up_temp_files, extract_pyinstaller_exe, find_payload_files, decompile_pyc, \
attempts_to_get_entry_point
from utils.discord_token_validator import validate_discord_token
from utils.telegram_token_validator import validate_telegram_token
from utils.webhook_util import validate_webhooks
logger.add(
f"{getcwd()}/data/log/triage.log", level='WARNING',
colorize=False, backtrace=True, diagnose=True, rotation="20MB")
DECOMPILERS: Dict[str, Union[List[Decompiler], Decompiler]] = {
"rayxstealer": [NotokenDecompiler(), LegacyNotokenDecompiler()],
"pysilonstealer": PysilonDecompiler(),
"exelastealer": ExelaStealerDecompiler(),
"blankgrabber": AmnesiaDecompiler(),
"amnesiastealer": AmnesiaDecompiler(),
"lunagrabber": LunaGrabberDecompiler(),
"umbralstealer": UmbralDecompiler(),
"discordratstealer": DiscordRatDecompiler(),
}
class TriageClient:
def __init__(self):
load_dotenv(".env")
self.sample_file_count = 0
self.download_database = connect("data/malware_db.db")
self.stealer_scanner = YaraScanner()
self.stealer_scanner.track_yara_file("rules/infosteal.yar")
self.stealer_scanner.load_rules()
self.network_scanner = YaraScanner()
self.network_scanner.track_yara_file("rules/network.yar")
self.network_scanner.load_rules()
self.dangerous_scanner = YaraScanner()
self.dangerous_scanner.track_yara_file("rules/dangerous.yar")
self.dangerous_scanner.load_rules()
self.queued_files_to_delete = set()
self.headers = {"Authorization": f'Bearer {environ.get("TRIAGE_API_KEY", "")}'}
self.client = HttpxHelperClient()
self.telegram_client = TelegramNotificationClient()
async def insert_malware_sample_data(
self, sha256: str, file_name: str, analyze_id: str, family: str
):
self.download_database.execute(
"""
insert or replace into malwares (
sha256, file_name, analyze_id, identified_family, first_seen_time) VALUES (?, ?, ?, ?, ?)
""",
(sha256, file_name, analyze_id, family, int(time())),
)
self.download_database.commit()
async def get_new_samples(self) -> List[TriageNewSampleData]:
actual_search_query = escape("family:blankgrabber OR family:exelastealer OR tags:pyinstaller "
"OR family:pysilon OR family:lunagrabber OR family:umbral "
"OR family:discordrat")
result = await self.client.get(
f"https://tria.ge/api/v0/search?query={actual_search_query}",
headers=self.headers,
)
if result.status_code != 200:
logger.warning(f'Failed to retrieve new samples. {result.status_code}')
return []
sha256_set = set()
undownloaded_data: List[TriageNewSampleData] = []
result_data: List[Dict[str, str]] = result.json().get("data", [])
for r in result_data:
await self._analyze_triage_response(r, sha256_set, undownloaded_data)
logger.info(f"New sample retrieval is completed. Size: {len(undownloaded_data)}")
return undownloaded_data
async def _analyze_triage_response(self, r: dict, sha256_set: Set[str],
undownloaded_data: List[TriageNewSampleData]):
sha256 = r.get("sha256", "")
if sha256 and sha256 not in sha256_set:
sha256_set.add(sha256)
if await self.is_not_downloaded(r.get("sha256", "")):
summary = await self.client.get(
f"https://tria.ge/api/v0/samples/{r.get('id', '')}/summary",
headers=self.headers,
)
summary_json = summary.json()
tags = []
for key, value in summary_json.get("tasks", {}).items():
analyze_tag = value.get('tags', [])
tags += analyze_tag
if summary_json.get("score", -1) >= 7:
logger.info(f'Retrieved sample: {r.get("filename")} -> score {summary_json.get("score", -1)}')
undownloaded_data.append(
TriageNewSampleData(
r.get("filename", ""), sha256, r.get("id", ""), tags=list(set(tags))
)
)
else:
logger.info(f'Malware has low score, emitting: {r.get("filename")}')
await self.insert_malware_sample_data(
sha256, r.get("filename", ""), r.get("analyze_id", ""), ""
)
async def is_not_downloaded(self, sha256: str):
result = self.download_database.execute(
"""
select * from malwares where sha256 = ?
""",
(sha256,),
)
return result.fetchone() is None
async def download_sample(
self, samples: List[TriageNewSampleData]
) -> List[TriageNewSampleData]:
for sample in samples:
download_url = f"https://tria.ge/api/v0/samples/{sample.analyze_id}/sample"
file_name = f"{getcwd()}/samples/{sample.analyze_id}.{sample.file_name.split('.')[-1]}"
sample_data_path = await self.client.download(
download_url, file_name, headers=self.headers
)
chmod(sample_data_path, 0o777)
with open(sample_data_path, 'rb') as file:
file_header = file.read(6)
if file_header[:2] == b'MZ':
await self._executable_winrar_check(file, sample, sample_data_path)
elif file_header.find(b'Rar!') != -1:
if await self._extract_rar_file(sample, sample_data_path):
remove(sample_data_path)
else:
sample.file_path = sample_data_path
await self.insert_malware_sample_data(
sample.sha256,
sample.file_name,
sample.analyze_id,
"UNKNOWN",
)
elif file_header.find(b'PK\x03\x04') != -1:
if await self._extract_zip_file(sample, sample_data_path):
remove(sample_data_path)
else:
sample.file_path = sample_data_path
await self.insert_malware_sample_data(
sample.sha256,
sample.file_name,
sample.analyze_id,
"UNKNOWN",
)
else:
logger.warning(f'Unknown file type detected for sample: {sample.analyze_id}, '
f'first four bytes hex: {file_header}')
sample.file_path = sample_data_path
return samples
async def _executable_winrar_check(self, file: BinaryIO, sample: TriageNewSampleData, sample_data_path: str):
rar_signature_found = False
setup_signature_found = False
actual_payload_rar_file = sample_data_path + '.rar'
while chunk := file.read(4096):
if not rar_signature_found and chunk.find(b'Rar!') != -1:
rar_signature_found = True
if not setup_signature_found and chunk.find(b'Setup=') != -1:
setup_signature_found = True
if rar_signature_found and setup_signature_found:
move(sample_data_path, actual_payload_rar_file)
if not await self._extract_rar_file(sample, actual_payload_rar_file):
await self.insert_malware_sample_data(
sample.sha256,
sample.file_name,
sample.analyze_id,
"UNKNOWN",
)
break
else:
sample.file_path = sample_data_path
async def analyze_yara_rules(self, pathes: List[TriageNewSampleData]) -> List[AnalyzeResult]:
analyze_results = []
for sample in [x for x in pathes if x.file_path]:
families = []
network_usage = []
dangerous_tags = []
sample.file_path = sample.file_path.replace('\x00', '')
if self.stealer_scanner.scan(sample.file_path):
scan_result = self.stealer_scanner.scan_results
families = list(filter(lambda x: x, [x.get("rule", "").lower() for x in scan_result]))
else:
entry = attempts_to_get_entry_point(sample.file_path)
try:
if entry.entry_point and self.stealer_scanner.scan(entry.entry_point.replace('\x00', '')):
scan_result = self.stealer_scanner.scan_results
families = list(filter(lambda x: x, [x.get("rule", "").lower() for x in scan_result]))
if entry.entry_point and self.network_scanner.scan(sample.file_path):
network_usage = list(filter(
lambda x: x, [x.get("rule", "").lower() for x in self.network_scanner.scan_results]))
if entry.entry_point and self.network_scanner.scan(entry.entry_point):
network_usage += list(filter(
lambda x: x, [x.get("rule", "").lower() for x in self.network_scanner.scan_results]))
if entry.entry_point and self.dangerous_scanner.scan(entry.entry_point.replace('\x00', '')):
scan_result = self.dangerous_scanner.scan_results
dangerous_tags = list(filter(lambda x: x, [x.get("rule", "") for x in scan_result]))
sample.behaviors = dangerous_tags
if entry.extraction_path:
clean_up_temp_files(entry.extraction_path)
if entry.entry_point:
await self.telegram_client.send_bot_message(
f'Entry point detected: {entry.entry_point.split("/")[-1]}\n\n'
f'Yara matches: {", ".join(families + sample.tags)}\n'
f'Behavior: {", ".join(dangerous_tags)}\n\n'
f'url: https://tria.ge/{sample.analyze_id}')
except Exception as err:
logger.error(f'Unexpected issue occurred: {err}')
await self.insert_malware_sample_data(
sample.sha256,
sample.file_name,
sample.analyze_id,
"UNKNOWN",
)
analyze_results.append(AnalyzeResult(families, network_usage, sample))
return analyze_results
async def _analyze_network(self, value: AnalyzeResult) -> List[DecompileResult]:
extraction_result: List[DecompileResult] = []
if not value.network:
entry = attempts_to_get_entry_point(value.triage_object.file_path)
valid_entries = []
if entry.entry_point:
valid_entries = await self._fetch_results_from_entry_point(entry.entry_point, value)
if entry.extraction_path:
clean_up_temp_files(entry.extraction_path)
return valid_entries
extraction_dir = extract_pyinstaller_exe(value.triage_object.file_path)
pyc_files = attempts_to_get_entry_point(value.triage_object.file_path)
if not pyc_files.entry_point:
return extraction_result
source_codes = decompile_pyc(pyc_files.entry_point)
logger.warning(f'Detected network usage: {value.network} for sample {value.triage_object.analyze_id}')
for network in set(value.network):
match network.lower():
case 'usesdiscordwebhook':
extraction_result += await self._extract_webhook_from_source([source_codes], value)
if not value.triage_object.family:
value.triage_object.family = 'webhook_malware'
case 'discordpyframework':
extraction_result.append(await self._analyze_discord_py_framework([source_codes], value))
extraction_result += await self._extract_webhook_from_source([source_codes], value)
if not value.triage_object.family:
value.triage_object.family = 'discord_bot_malware'
case 'pythonsmtpusage':
extraction_result.append(await self._extract_email_from_source([source_codes]))
if not value.triage_object.family:
value.triage_object.family = 'smtp_malware'
if extraction_dir:
clean_up_temp_files(extraction_dir)
return extraction_result
@staticmethod
async def _extract_email_from_source(source_codes: List[str]) -> DecompileResult:
result = []
for code in source_codes:
email = list(filter(lambda x: x, findall(EMAIL_MATCHER, code)))
result += email
return DecompileResult([], result)
async def _fetch_results_from_entry_point(self, entry_file: str, value: AnalyzeResult) -> List[DecompileResult]:
valid_entries: List[str] = []
logger.info(f'Fetching result from the entry point for {value.triage_object.analyze_id}')
decompiled_code = decompile_pyc(entry_file)
possible_webhook = findall(WEBHOOK_REGEX, decompiled_code)
if possible_webhook:
# ('webhook_url', 'app')
self.queued_files_to_delete.add(value.triage_object.file_path)
valid_entries += [x[0] for x in possible_webhook]
if not value.triage_object.family:
value.triage_object.family = 'webhook_malware'
possible_base64_webhook = findall(WEBHOOK_REGEX_BASE64, decompiled_code)
if possible_base64_webhook:
self.queued_files_to_delete.add(value.triage_object.file_path)
valid_entries += base64_decode_then_filter(possible_base64_webhook)
if not value.triage_object.family:
value.triage_object.family = 'webhook_malware'
verified_entries: List[DecompileResult] = [
validate_webhooks(
list(set(valid_entries)),
[value.triage_object.family] + value.triage_object.behaviors + value.triage_object.tags,
value.triage_object.sha256)]
verified_entries += await self._extract_other_tokens(decompiled_code, value)
return verified_entries
async def _extract_other_tokens(self, decompiled_code: str, value: AnalyzeResult) -> DecompileResult:
valid_entries = []
invalid_entries = []
bot_tokens = list(set(findall(DISCORD_TOKEN_MATCHER, decompiled_code)))
if bot_tokens:
self.queued_files_to_delete.add(value.triage_object.file_path)
valid_entries += [x for x in bot_tokens if validate_discord_token(x, True).success]
if not value.triage_object.family:
value.triage_object.family = 'discord_bot_malware'
telegram_bot_tokens = list(set(findall(TELEGRAM_REGEX, decompiled_code)))
if telegram_bot_tokens:
self.queued_files_to_delete.add(value.triage_object.file_path)
for x in bot_tokens:
if validate_telegram_token(x):
valid_entries.append(x)
else:
invalid_entries.append(x)
if not value.triage_object.family:
value.triage_object.family = 'telegram_bot_usage'
return DecompileResult(invalid_entries, valid_entries)
async def _extract_webhook_from_source(self, source_codes: List[str], value: AnalyzeResult) -> list[
DecompileResult]:
extraction_result = []
for code in source_codes:
if code.startswith('pyi_') or code.startswith('_'):
continue
webhooks = findall(WEBHOOK_REGEX, code)
if webhooks:
extraction_result.append(
validate_webhooks(
webhooks,
[value.triage_object.family] + value.triage_object.tags + value.triage_object.behaviors,
value.triage_object.sha256
))
self.queued_files_to_delete.add(value.triage_object.file_path)
return extraction_result
async def _analyze_discord_py_framework(self, src_codes: List[str], value: AnalyzeResult) -> DecompileResult:
extraction_result_valid = []
extraction_result_invalid = []
all_results = []
for code in [x for x in src_codes if not x.startswith('pyi')]:
bot_tokens_plain_text = list(set(findall(DISCORD_TOKEN_MATCHER, code)))
for token in bot_tokens_plain_text:
all_results.append(token)
if validate_discord_token(token, True).success:
extraction_result_valid.append(token)
else:
extraction_result_invalid.append(token)
bot_token_base64: List[str] = list(set(findall(DISCORD_TOKEN_MATCHER_BASE64, code)))
for token in bot_token_base64:
if token:
token = token.replace("'", '')
all_results.append(token)
decoded_token = b64decode(token).decode('utf-8', errors='ignore')
if validate_discord_token(decoded_token, True).success:
extraction_result_valid.append(decoded_token)
elif validate_discord_token(decoded_token[::-1], True).success:
extraction_result_valid.append(decoded_token[::-1])
else:
extraction_result_invalid.append(decoded_token)
if not all_results:
await self.telegram_client.send_bot_message(
f'please investigate this sample: https://tria.ge/{value.triage_object.analyze_id}\n\n'
f'Usage: discord.py bot framework.\n'
f'Failed to retrieve bot token using usual method.')
else:
self.queued_files_to_delete.add(value.triage_object.file_path)
return DecompileResult(extraction_result_invalid, extraction_result_valid)
async def decompile_and_get_valids(self, rule_matched: List[AnalyzeResult]) -> List[DecompileResult]:
results: List[DecompileResult] = []
for value in rule_matched:
malware_family_result = await self._analyze_malware_family(value)
if malware_family_result is not None:
results.append(malware_family_result)
logger.info(f"Removing: {value.triage_object.file_path}")
continue
network_analyze_result = await self._analyze_network(value)
if network_analyze_result:
results += network_analyze_result
await self._send_success_notification(network_analyze_result, value)
if self.queued_files_to_delete:
for file in set(self.queued_files_to_delete):
clean_up_temp_files(file)
self.queued_files_to_delete = set()
return list(set(results))
async def _analyze_malware_family(self, value: AnalyzeResult) -> Optional[DecompileResult]:
if not value.families:
logger.warning(
f"Failed to determine sample family for {value.triage_object.analyze_id}"
)
await self.insert_malware_sample_data(
value.triage_object.sha256,
value.triage_object.file_name,
value.triage_object.analyze_id,
"UNKNOWN",
)
return None
# from here, we know this malware belongs to a known family by us.
family = value.families[0]
logger.warning(f"Identified {value.triage_object.analyze_id} is {value.families[0]}")
try:
function_result = DECOMPILERS[family]
except KeyError:
function_result = None
result: List[DecompileResult] = []
if function_result is not None:
if isinstance(function_result, list):
for function in function_result:
result_temp = function.decompile(
value.triage_object.file_path, value.triage_object.tags + value.triage_object.behaviors)
result.append(result_temp)
else:
result = [function_result.decompile(
value.triage_object.file_path, value.triage_object.tags + value.triage_object.behaviors,
value.triage_object.sha256)]
await self.insert_malware_sample_data(
value.triage_object.sha256,
value.triage_object.file_name,
value.triage_object.analyze_id,
family,
)
remove(value.triage_object.file_path)
value.triage_object.family = family
# string extraction should have no invalid data.
if result:
actual_result = await self._send_success_notification(result, value)
return actual_result
return None
async def _send_success_notification(self, result: List[DecompileResult], value: AnalyzeResult) -> DecompileResult:
valids = []
invalids = []
for res in result:
valids += res.valid_result
invalids += res.invalid_result
if valids or invalids:
await self.telegram_client.send_bot_message(
f"Extracted valid result: \n```\n{"\n".join([x for x in valids])}\n```\n"
f"Extracted invalid result: \n```\n{"\n".join([x for x in invalids])}\n```\n"
f"from https://tria.ge/{value.triage_object.analyze_id}.\n\n"
f"Family: {value.triage_object.family}\n"
f"Behaviors: {', '.join(value.triage_object.behaviors)}"
)
return DecompileResult(invalids, valids)
async def _extract_zip_file(self, sample: TriageNewSampleData, sample_data_path: str) -> bool:
from zipfile import ZipFile
extraction_dir = f"{getcwd()}/samples/{sample.analyze_id}_zipextracted"
makedirs(extraction_dir, exist_ok=True)
try:
with ZipFile(sample_data_path) as z:
z.extractall(extraction_dir)
await self.find_exe_from_extraction(extraction_dir, sample)
return True
except Exception as err:
logger.warning(f'Failed to extract payload from zip file for sample: {sample.analyze_id}. {err}')
clean_up_temp_files(extraction_dir)
return False
async def _extract_rar_file(self, sample: TriageNewSampleData, sample_data_path: str) -> bool:
from rarfile import RarFile
extraction_dir = f"{getcwd()}/samples/{sample.analyze_id}_rarextracted"
makedirs(extraction_dir, exist_ok=True)
try:
with RarFile(sample_data_path) as r:
r.extractall(extraction_dir)
await self.find_exe_from_extraction(extraction_dir, sample)
return True
except Exception as err:
logger.warning(f'Failed to extract payload from rar file for sample: {sample.analyze_id}. {err}')
clean_up_temp_files(extraction_dir)
return False
async def find_exe_from_extraction(self, extraction_dir, sample):
exe_files = find_payload_files(extraction_dir, '.exe', '')
if exe_files:
for file in exe_files:
self.stealer_scanner.scan(file)
if self.stealer_scanner.scan_results:
exe_files = file
break
else:
exe_files = exe_files[0]
final_payload_file = f"{getcwd()}/samples/{exe_files.split('/')[-1]}"
move(exe_files, final_payload_file)
sample.file_path = final_payload_file
else:
sample.file_path = None
clean_up_temp_files(extraction_dir)
async def _main():
triage_client = TriageClient()
while True:
try:
await worker(triage_client)
except Exception as err:
logger.exception(f'Worker task failed with error: {err}')
await triage_client.telegram_client.send_bot_message(
f"Failed to do work: {err}\n\n{err.__class__.__name__}\n\n"
f"Traceback: {format_exc()}")
await sleep(randint(35, 55))
async def main():
main_task = create_task(_main())
bot_task = create_task(tg_bot_polling())
await gather(main_task, bot_task)
async def worker(triage_client):
newest_sample = await triage_client.get_new_samples()
logger.info(f"Found {len(newest_sample)} new samples.")
data_pathes = await triage_client.download_sample(newest_sample)
rule_matched = await triage_client.analyze_yara_rules(data_pathes)
decompile_results = await triage_client.decompile_and_get_valids(rule_matched)
if decompile_results:
logger.info(f"Here is our results: {decompile_results}")
directory_path = f'{getcwd()}/samples'
actual_files = [f for f in listdir(directory_path) if path.isfile(path.join(directory_path, f))]
for file in actual_files:
clean_up_temp_files(f'{getcwd()}/samples/{file}')
logger.info("Sleeping 2 minutes for next batch.")
if __name__ == "__main__":
run(main())

10
methods/decompilers.py Normal file
View file

@ -0,0 +1,10 @@
from abc import ABC, abstractmethod
from typing import List
from model.common_models import DecompileResult
class Decompiler(ABC):
@abstractmethod
def decompile(self, file_path: str, tags: List[str], sha256: str = "") -> DecompileResult:
"""Decompile the given file and return the result"""
pass

View file

@ -0,0 +1,9 @@
from typing import List
from methods.decompilers import Decompiler
from model.common_models import DecompileResult
class AmnesiaDecompiler(Decompiler):
def decompile(self, file_path: str, tags: List[str], sha256: str = "") -> DecompileResult:
# Placeholder implementation
return DecompileResult([], [])

View file

@ -0,0 +1,9 @@
from typing import List
from methods.decompilers import Decompiler
from model.common_models import DecompileResult
class LegacyNotokenDecompiler(Decompiler):
def decompile(self, file_path: str, tags: List[str], sha256: str = "") -> DecompileResult:
# Placeholder implementation
return DecompileResult([], [])

View file

@ -0,0 +1,9 @@
from typing import List
from methods.decompilers import Decompiler
from model.common_models import DecompileResult
class NotokenDecompiler(Decompiler):
def decompile(self, file_path: str, tags: List[str], sha256: str = "") -> DecompileResult:
# Placeholder implementation
return DecompileResult([], [])

View file

@ -0,0 +1,9 @@
from typing import List
from methods.decompilers import Decompiler
from model.common_models import DecompileResult
class PysilonDecompiler(Decompiler):
def decompile(self, file_path: str, tags: List[str], sha256: str = "") -> DecompileResult:
# Placeholder implementation
return DecompileResult([], [])

7
model/common_models.py Normal file
View file

@ -0,0 +1,7 @@
from dataclasses import dataclass
from typing import List
@dataclass
class DecompileResult:
invalid_result: List[str]
valid_result: List[str]

18
model/triage_models.py Normal file
View file

@ -0,0 +1,18 @@
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class TriageNewSampleData:
file_name: str
sha256: str
analyze_id: str
file_path: Optional[str] = None
family: str = ""
tags: List[str] = field(default_factory=list)
behaviors: List[str] = field(default_factory=list)
@dataclass
class AnalyzeResult:
families: List[str]
network: List[str]
triage_object: TriageNewSampleData

134
rules/dangerous.yar Normal file
View file

@ -0,0 +1,134 @@
rule Keylogging {
meta:
description = "Detects keylogging functionality"
author = "Malware Researcher"
severity = "High"
date = "2023-01-01"
strings:
$import1 = "import keyboard" ascii wide
$import2 = "from pynput" ascii wide
$hook1 = "keyboard.hook" ascii wide
$hook2 = "keyboard.Listener" ascii wide
$hook3 = "keyboard.on_press" ascii wide
$hook4 = "KeyboardEvent" ascii wide
$func1 = "on_press" ascii wide
$func2 = "on_release" ascii wide
condition:
1 of ($import*) and 1 of ($hook*) or 1 of ($func*)
}
rule ScreenCapture {
meta:
description = "Detects screen capture functionality"
author = "Malware Researcher"
severity = "High"
date = "2023-01-01"
strings:
$import1 = "import pyautogui" ascii wide
$import2 = "from PIL import ImageGrab" ascii wide
$import3 = "import mss" ascii wide
$func1 = "pyautogui.screenshot" ascii wide
$func2 = "ImageGrab.grab" ascii wide
$func3 = "mss().shot" ascii wide
$func4 = ".save(" ascii wide
condition:
1 of ($import*) and 1 of ($func*)
}
rule BrowserDataTheft {
meta:
description = "Detects browser data theft functionality"
author = "Malware Researcher"
severity = "High"
date = "2023-01-01"
strings:
$browser1 = "Chrome" nocase ascii wide
$browser2 = "Firefox" nocase ascii wide
$browser3 = "Edge" nocase ascii wide
$browser4 = "Opera" nocase ascii wide
$path1 = "AppData\\Local\\Google\\Chrome\\User Data" ascii wide
$path2 = "AppData\\Roaming\\Mozilla\\Firefox\\Profiles" ascii wide
$path3 = "AppData\\Local\\Microsoft\\Edge\\User Data" ascii wide
$data1 = "Login Data" ascii wide
$data2 = "Cookies" ascii wide
$data3 = "Web Data" ascii wide
$sql1 = "SELECT" ascii wide
$sql2 = "FROM logins" ascii wide
$sql3 = "FROM cookies" ascii wide
$crypto1 = "CryptUnprotectData" ascii wide
$crypto2 = "Crypt.decrypt" ascii wide
condition:
(2 of ($browser*) or 1 of ($path*)) and
(1 of ($data*)) and
(1 of ($sql*) or 1 of ($crypto*))
}
rule SystemInformationCollection {
meta:
description = "Detects system information collection functionality"
author = "Malware Researcher"
severity = "Medium"
date = "2023-01-01"
strings:
$import1 = "import platform" ascii wide
$import2 = "import socket" ascii wide
$import3 = "import uuid" ascii wide
$import4 = "import psutil" ascii wide
$func1 = "platform.system" ascii wide
$func2 = "platform.release" ascii wide
$func3 = "socket.gethostname" ascii wide
$func4 = "uuid.getnode" ascii wide
$func5 = "psutil.cpu_count" ascii wide
$func6 = "psutil.disk_usage" ascii wide
$func7 = "os.environ" ascii wide
$func8 = "os.getlogin" ascii wide
condition:
2 of ($import*) and 2 of ($func*)
}
rule AntiVMTechniques {
meta:
description = "Detects anti-VM/sandbox evasion techniques"
author = "Malware Researcher"
severity = "High"
date = "2023-01-01"
strings:
$vm1 = "VMware" nocase ascii wide
$vm2 = "VirtualBox" nocase ascii wide
$vm3 = "QEMU" nocase ascii wide
$vm4 = "Xen" nocase ascii wide
$vm5 = "KVM" nocase ascii wide
$vm6 = "Parallels" nocase ascii wide
$vm7 = "Hyper-V" nocase ascii wide
$vm8 = "Virtual Machine" nocase ascii wide
$check1 = "wmic.exe" nocase ascii wide
$check2 = "systeminfo" nocase ascii wide
$check3 = "get_mac" nocase ascii wide
$check4 = "registry" nocase ascii wide
$check5 = "check_vm" nocase ascii wide
$check6 = "is_vm" nocase ascii wide
condition:
2 of ($vm*) and 1 of ($check*)
}
rule SelfDestructCode {
meta:
description = "Detects self-destructing code functionality"
author = "Malware Researcher"
severity = "High"
date = "2023-01-01"
strings:
$del1 = "os.remove" ascii wide
$del2 = "os.unlink" ascii wide
$del3 = "shutil.rmtree" ascii wide
$path1 = "__file__" ascii wide
$path2 = "sys.argv[0]" ascii wide
$cmd1 = "cmd.exe /c" ascii wide
$cmd2 = "del " ascii wide
$cmd3 = "rmdir" ascii wide
$bat1 = ".bat" ascii wide
$bat2 = ".cmd" ascii wide
condition:
1 of ($del*) and (1 of ($path*) or
(1 of ($cmd*) and 1 of ($bat*)))
}

136
rules/infosteal.yar Normal file
View file

@ -0,0 +1,136 @@
import "pe"
rule RayXStealer {
meta:
description = "Detects RayX Stealer malware"
author = "Malware Researcher"
severity = "High"
date = "2023-01-01"
strings:
$s1 = "rayxstealer" nocase ascii wide
$s2 = "ray-x" nocase ascii wide
$token1 = "discordwebhook" nocase ascii wide
$token2 = "getTokens" nocase ascii wide
$browser1 = "Chrome" nocase ascii wide
$browser2 = "Edge" nocase ascii wide
$browser3 = "passwords" nocase ascii wide
$browser4 = "cookies" nocase ascii wide
condition:
($s1 or $s2) and
1 of ($token*) and
2 of ($browser*)
}
rule PysilonStealer {
meta:
description = "Detects Pysilon Stealer malware"
author = "Malware Researcher"
severity = "High"
date = "2023-01-01"
strings:
$id1 = "pysilon" nocase ascii wide
$id2 = "pysilonstealer" nocase ascii wide
$grab1 = "grab_discord" nocase ascii wide
$grab2 = "grab_passwords" nocase ascii wide
$grab3 = "grab_cookies" nocase ascii wide
$grab4 = "system_info" nocase ascii wide
$net1 = "webhook" nocase ascii wide
$net2 = "sendData" nocase ascii wide
condition:
1 of ($id*) and
2 of ($grab*) and
1 of ($net*)
}
rule ExelaStealer {
meta:
description = "Detects Exela Stealer malware"
author = "Malware Researcher"
severity = "High"
date = "2023-01-01"
strings:
$id = "exelastealer" nocase ascii wide
$grab1 = "grab_discord" nocase ascii wide
$grab2 = "grab_browsers" nocase ascii wide
$grab3 = "grab_wallets" nocase ascii wide
$net1 = "webhook_url" nocase ascii wide
$net2 = "send_report" nocase ascii wide
condition:
$id or (2 of ($grab*) and 1 of ($net*))
}
rule BlankGrabber {
meta:
description = "Detects BlankGrabber/AmnesiaGrabber malware"
author = "Malware Researcher"
severity = "High"
date = "2023-01-01"
strings:
$id1 = "blankgrabber" nocase ascii wide
$id2 = "amnesia" nocase ascii wide
$func1 = "grab_tokens" nocase ascii wide
$func2 = "grab_cookies" nocase ascii wide
$func3 = "grab_passwords" nocase ascii wide
$func4 = "screenshot" nocase ascii wide
$net1 = "webhook" nocase ascii wide
$enc1 = "encrypt" nocase ascii wide
$enc2 = "decrypt" nocase ascii wide
condition:
1 of ($id*) and
2 of ($func*) and
($net1 or 1 of ($enc*))
}
rule LunaGrabber {
meta:
description = "Detects Luna Grabber malware"
author = "Malware Researcher"
severity = "High"
date = "2023-01-01"
strings:
$id = "lunagrabber" nocase ascii wide
$grab1 = "grab_tokens" nocase ascii wide
$grab2 = "grab_password" nocase ascii wide
$grab3 = "grab_cookie" nocase ascii wide
$net = "webhook" nocase ascii wide
condition:
$id or (2 of ($grab*) and $net)
}
rule UmbralStealer {
meta:
description = "Detects Umbral Stealer malware"
author = "Malware Researcher"
severity = "High"
date = "2023-01-01"
strings:
$id = "umbral" nocase ascii wide
$grab1 = "get_tokens" nocase ascii wide
$grab2 = "get_passwords" nocase ascii wide
$grab3 = "get_cookies" nocase ascii wide
$grab4 = "get_wallets" nocase ascii wide
$net1 = "webhook" nocase ascii wide
$net2 = "send_data" nocase ascii wide
condition:
$id or (2 of ($grab*) and 1 of ($net*))
}
rule DiscordRatStealer {
meta:
description = "Detects Discord RAT malware"
author = "Malware Researcher"
severity = "High"
date = "2023-01-01"
strings:
$id1 = "discordrat" nocase ascii wide
$id2 = "discord_rat" nocase ascii wide
$cmd1 = "command_handler" nocase ascii wide
$cmd2 = "remote_control" nocase ascii wide
$cmd3 = "remote_cmd" nocase ascii wide
$discord1 = "discord.py" nocase ascii wide
$discord2 = "discord_webhook" nocase ascii wide
$discord3 = "bot.command" nocase ascii wide
condition:
1 of ($id*) and
(1 of ($cmd*) and 1 of ($discord*))
}

102
rules/network.yar Normal file
View file

@ -0,0 +1,102 @@
rule UsesDiscordWebhook {
meta:
description = "Detects Discord webhook usage"
author = "Malware Researcher"
severity = "Medium"
date = "2023-01-01"
strings:
$webhook1 = "https://discord.com/api/webhooks/" ascii wide
$webhook2 = "https://discordapp.com/api/webhooks/" ascii wide
$webhook3 = "https://ptb.discord.com/api/webhooks/" ascii wide
$webhook4 = "https://canary.discord.com/api/webhooks/" ascii wide
$func1 = "requests.post" ascii wide
$func2 = "httpx.post" ascii wide
$func3 = "aiohttp" ascii wide
$func4 = "urllib" ascii wide
condition:
1 of ($webhook*) and 1 of ($func*)
}
rule DiscordPyFramework {
meta:
description = "Detects Discord.py bot framework usage"
author = "Malware Researcher"
severity = "Medium"
date = "2023-01-01"
strings:
$import1 = "import discord" ascii wide
$import2 = "from discord.ext import commands" ascii wide
$class1 = "discord.Client" ascii wide
$class2 = "commands.Bot" ascii wide
$func1 = "bot.run" ascii wide
$func2 = "client.run" ascii wide
$token = /['"][A-Za-z\d]{24}\.[\w-]{6}\.[\w-]{27}['"]/ ascii wide
condition:
(1 of ($import*) or 1 of ($class*)) and
(1 of ($func*) or $token)
}
rule PythonSMTPUsage {
meta:
description = "Detects Python SMTP mail sending functionality"
author = "Malware Researcher"
severity = "Medium"
date = "2023-01-01"
strings:
$import1 = "import smtplib" ascii wide
$import2 = "from email" ascii wide
$func1 = "smtplib.SMTP" ascii wide
$func2 = "smtp.send_message" ascii wide
$func3 = "smtp.sendmail" ascii wide
$auth1 = "smtp.login" ascii wide
$provider1 = "smtp.gmail.com" ascii wide
$provider2 = "smtp-mail.outlook.com" ascii wide
$provider3 = "smtp.mail.yahoo.com" ascii wide
condition:
$import1 and
(1 of ($func*)) and
($auth1 or 1 of ($provider*))
}
rule HTTPRequests {
meta:
description = "Detects HTTP request libraries usage for data exfiltration"
author = "Malware Researcher"
severity = "Low"
date = "2023-01-01"
strings:
$import1 = "import requests" ascii wide
$import2 = "import httpx" ascii wide
$import3 = "import aiohttp" ascii wide
$import4 = "from urllib" ascii wide
$func1 = ".post(" ascii wide
$func2 = ".get(" ascii wide
$func3 = "urlopen(" ascii wide
$data1 = "json=" ascii wide
$data2 = "data=" ascii wide
$data3 = "files=" ascii wide
condition:
1 of ($import*) and
1 of ($func*) and
1 of ($data*)
}
rule TelegramBotAPI {
meta:
description = "Detects Telegram Bot API usage"
author = "Malware Researcher"
severity = "Medium"
date = "2023-01-01"
strings:
$url1 = "api.telegram.org/bot" ascii wide
$token = /[0-9]{8,10}:[A-Za-z0-9_-]{35}/ ascii wide
$method1 = "/sendMessage" ascii wide
$method2 = "/sendDocument" ascii wide
$method3 = "/sendPhoto" ascii wide
$import1 = "telebot" ascii wide
$import2 = "telegram.ext" ascii wide
condition:
($url1 and 1 of ($method*)) or
($token and 1 of ($method*)) or
1 of ($import*)
}

17
utils/blank.py Normal file
View file

@ -0,0 +1,17 @@
from base64 import b64decode
from re import findall
from constants.regex import WEBHOOK_REGEX
def base64_decode_then_filter(encoded_strings):
results = []
for encoded_string in encoded_strings:
try:
decoded = b64decode(encoded_string).decode('utf-8', errors='ignore')
webhooks = findall(WEBHOOK_REGEX, decoded)
results.extend(webhooks)
except:
pass
return results

19
utils/common_utils.py Normal file
View file

@ -0,0 +1,19 @@
from httpx import AsyncClient
class HttpxHelperClient:
def __init__(self):
self.client = AsyncClient(timeout=60.0)
async def get(self, url, headers=None):
response = await self.client.get(url, headers=headers)
return response
async def post(self, url, data=None, json=None, headers=None):
response = await self.client.post(url, data=data, json=json, headers=headers)
return response
async def download(self, url, file_path, headers=None):
response = await self.client.get(url, headers=headers)
with open(file_path, 'wb') as f:
f.write(response.content)
return file_path

49
utils/decompile_utils.py Normal file
View file

@ -0,0 +1,49 @@
import os
import shutil
import tempfile
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class EntryPoint:
entry_point: Optional[str] = None
extraction_path: Optional[str] = None
def clean_up_temp_files(directory_path: str) -> None:
"""Remove temporary files and directories"""
try:
if os.path.exists(directory_path):
if os.path.isfile(directory_path):
os.remove(directory_path)
else:
shutil.rmtree(directory_path, ignore_errors=True)
except Exception as e:
print(f"Failed to clean up {directory_path}: {e}")
def extract_pyinstaller_exe(exe_path: str) -> str:
"""Extract PyInstaller executable contents"""
temp_dir = tempfile.mkdtemp(prefix="pyinstaller_extract_")
# In a real implementation, you would use pyinstxtractor or similar tools
# Here we just return the temporary directory
return temp_dir
def find_payload_files(directory: str, extension: str, exclude_pattern: str) -> List[str]:
"""Find files with specific extension in directory"""
result = []
for root, _, files in os.walk(directory):
for file in files:
if file.endswith(extension) and exclude_pattern not in file:
result.append(os.path.join(root, file))
return result
def decompile_pyc(pyc_file: str) -> str:
"""Decompile .pyc file to source code"""
# In a real implementation, you would use uncompyle6 or decompyle3
# Here we just return a placeholder
return f"# Decompiled content of {pyc_file}\n"
def attempts_to_get_entry_point(file_path: str) -> EntryPoint:
"""Try various methods to find the entry point"""
# In a real implementation, this would be more complex
return EntryPoint(entry_point=file_path)

View file

@ -0,0 +1,17 @@
from dataclasses import dataclass
@dataclass
class TokenValidationResult:
success: bool
message: str = ""
def validate_discord_token(token: str, ignore_api_check=False) -> TokenValidationResult:
# Simple validation logic - real implementation would verify with Discord API
if len(token) < 50 or '.' not in token:
return TokenValidationResult(False, "Invalid token format")
if ignore_api_check:
return TokenValidationResult(True, "Token format valid (API check skipped)")
# In a real implementation, you would check with Discord API here
return TokenValidationResult(True, "Token appears valid")

View file

@ -0,0 +1,6 @@
import re
def validate_telegram_token(token: str) -> bool:
# Simple validation - check if it matches the pattern \d{8,10}:[A-Za-z0-9_-]{35}
pattern = r"\d{8,10}:[A-Za-z0-9_-]{35}"
return bool(re.match(pattern, token))

15
utils/webhook_util.py Normal file
View file

@ -0,0 +1,15 @@
from typing import List
from model.common_models import DecompileResult
def validate_webhooks(webhooks: List[str], tags: List[str], sha256: str = "") -> DecompileResult:
valid_webhooks = []
invalid_webhooks = []
for webhook in webhooks:
if webhook and len(webhook) >= 10: # Simple validation - real implementation would check with Discord API
valid_webhooks.append(webhook)
else:
invalid_webhooks.append(webhook)
return DecompileResult(invalid_webhooks, valid_webhooks)