from asyncio import run, create_task, gather, sleep
from base64 import b64decode
from html import escape
from os import environ, getcwd, listdir, path, remove, makedirs, chmod
from random import randint
from re import findall
from shutil import move
from sqlite3 import connect
from time import time
from traceback import format_exc
from typing import List, Dict, Optional, Set, BinaryIO, Union

from dotenv import load_dotenv
from loguru import logger
from yara_scanner import YaraScanner

from client.telegram_notification import TelegramNotificationClient, tg_bot_polling
from constants.regex import WEBHOOK_REGEX, DISCORD_TOKEN_MATCHER, WEBHOOK_REGEX_BASE64, DISCORD_TOKEN_MATCHER_BASE64, \
    TELEGRAM_REGEX, EMAIL_MATCHER
from methods.decompilers import Decompiler
from methods.impl.amnesia_decompiler import AmnesiaDecompiler
from methods.impl.discord_rat_decompiler import DiscordRatDecompiler
from methods.impl.exela_stealer_decompiler import ExelaStealerDecompiler
from methods.impl.legacy_notoken_decompile import LegacyNotokenDecompiler
from methods.impl.luna_grabber_decompiler import LunaGrabberDecompiler
from methods.impl.notoken_decompile import NotokenDecompiler
from methods.impl.pysilon_decompiler import PysilonDecompiler
from methods.impl.umbral_decompiler import UmbralDecompiler
from model.common_models import DecompileResult
from model.triage_models import TriageNewSampleData, AnalyzeResult
from utils.blank import base64_decode_then_filter
from utils.common_utils import HttpxHelperClient
from utils.decompile_utils import clean_up_temp_files, extract_pyinstaller_exe, find_payload_files, decompile_pyc, \
    attempts_to_get_entry_point
from utils.discord_token_validator import validate_discord_token
from utils.telegram_token_validator import validate_telegram_token
from utils.webhook_util import validate_webhooks

logger.add(
    f"{getcwd()}/data/log/triage.log",
    level='WARNING',
    colorize=False,
    backtrace=True,
    diagnose=True,
    rotation="20MB")

DECOMPILERS: Dict[str, Union[List[Decompiler], Decompiler]] = {
    "rayxstealer": [NotokenDecompiler(), LegacyNotokenDecompiler()],
    "pysilonstealer": PysilonDecompiler(),
    "exelastealer": ExelaStealerDecompiler(),
    "blankgrabber": AmnesiaDecompiler(),
    "amnesiastealer": AmnesiaDecompiler(),
    "lunagrabber": LunaGrabberDecompiler(),
    "umbralstealer": UmbralDecompiler(),
    "discordratstealer": DiscordRatDecompiler(),
}


class TriageClient:
    def __init__(self):
        load_dotenv(".env")
        self.sample_file_count = 0

        self.download_database = connect("data/malware_db.db")

        self.stealer_scanner = YaraScanner()
        self.stealer_scanner.track_yara_file("rules/infosteal.yar")
        self.stealer_scanner.load_rules()

        self.network_scanner = YaraScanner()
        self.network_scanner.track_yara_file("rules/network.yar")
        self.network_scanner.load_rules()

        self.dangerous_scanner = YaraScanner()
        self.dangerous_scanner.track_yara_file("rules/dangerous.yar")
        self.dangerous_scanner.load_rules()

        self.queued_files_to_delete = set()

        self.headers = {"Authorization": f'Bearer {environ.get("TRIAGE_API_KEY", "")}'}
        self.client = HttpxHelperClient()

        self.telegram_client = TelegramNotificationClient()

    # Assumed schema of data/malware_db.db (inferred from the INSERT below; the table is not
    # created anywhere in this module):
    #   malwares(sha256 TEXT PRIMARY KEY, file_name TEXT, analyze_id TEXT,
    #            identified_family TEXT, first_seen_time INTEGER)
    async def insert_malware_sample_data(
            self, sha256: str, file_name: str, analyze_id: str, family: str
    ):
        self.download_database.execute(
            """
            insert or replace into malwares (
                sha256, file_name, analyze_id, identified_family, first_seen_time)
            VALUES (?, ?, ?, ?, ?)
            """,
            (sha256, file_name, analyze_id, family, int(time())),
        )
        self.download_database.commit()

    async def get_new_samples(self) -> List[TriageNewSampleData]:
        actual_search_query = escape("family:blankgrabber OR family:exelastealer OR tags:pyinstaller "
                                     "OR family:pysilon OR family:lunagrabber OR family:umbral "
                                     "OR family:discordrat")

        result = await self.client.get(
            f"https://tria.ge/api/v0/search?query={actual_search_query}",
            headers=self.headers,
        )
        if result.status_code != 200:
            logger.warning(f'Failed to retrieve new samples. {result.status_code}')
            return []

        sha256_set = set()
        undownloaded_data: List[TriageNewSampleData] = []
        result_data: List[Dict[str, str]] = result.json().get("data", [])
        for r in result_data:
            await self._analyze_triage_response(r, sha256_set, undownloaded_data)

        logger.info(f"New sample retrieval is completed. Size: {len(undownloaded_data)}")
        return undownloaded_data

    # Assumed response shapes, inferred from the field access below rather than from the
    # tria.ge API reference: /search returns {"data": [{"id": ..., "sha256": ..., "filename": ...}]}
    # and /samples/<id>/summary returns {"score": <int>, "tasks": {<task>: {"tags": [...]}}}.
    async def _analyze_triage_response(self, r: dict, sha256_set: Set[str],
                                       undownloaded_data: List[TriageNewSampleData]):
        sha256 = r.get("sha256", "")
        if sha256 and sha256 not in sha256_set:
            sha256_set.add(sha256)
            if await self.is_not_downloaded(r.get("sha256", "")):
                summary = await self.client.get(
                    f"https://tria.ge/api/v0/samples/{r.get('id', '')}/summary",
                    headers=self.headers,
                )
                summary_json = summary.json()

                tags = []
                for key, value in summary_json.get("tasks", {}).items():
                    analyze_tag = value.get('tags', [])
                    tags += analyze_tag

                if summary_json.get("score", -1) >= 7:
                    logger.info(f'Retrieved sample: {r.get("filename")} -> score {summary_json.get("score", -1)}')
                    undownloaded_data.append(
                        TriageNewSampleData(
                            r.get("filename", ""), sha256, r.get("id", ""), tags=list(set(tags))
                        )
                    )
                else:
                    logger.info(f'Malware has low score, skipping: {r.get("filename")}')
                    await self.insert_malware_sample_data(
                        sha256, r.get("filename", ""), r.get("analyze_id", ""), ""
                    )

    async def is_not_downloaded(self, sha256: str):
        result = self.download_database.execute(
            """
            select * from malwares where sha256 = ?
            """,
            (sha256,),
        )
        return result.fetchone() is None

    async def download_sample(
            self, samples: List[TriageNewSampleData]
    ) -> List[TriageNewSampleData]:
        for sample in samples:
            download_url = f"https://tria.ge/api/v0/samples/{sample.analyze_id}/sample"
            file_name = f"{getcwd()}/samples/{sample.analyze_id}.{sample.file_name.split('.')[-1]}"
            sample_data_path = await self.client.download(
                download_url, file_name, headers=self.headers
            )
            chmod(sample_data_path, 0o777)

            # Signature-based type detection (a sketch of the assumptions used here):
            # b'MZ' -> PE executable, b'Rar!' -> RAR archive, b'PK\x03\x04' -> ZIP archive.
            with open(sample_data_path, 'rb') as file:
                file_header = file.read(6)
                if file_header[:2] == b'MZ':
                    await self._executable_winrar_check(file, sample, sample_data_path)
                elif file_header.find(b'Rar!') != -1:
                    if await self._extract_rar_file(sample, sample_data_path):
                        remove(sample_data_path)
                    else:
                        sample.file_path = sample_data_path
                        await self.insert_malware_sample_data(
                            sample.sha256, sample.file_name, sample.analyze_id, "UNKNOWN",
                        )
                elif file_header.find(b'PK\x03\x04') != -1:
                    if await self._extract_zip_file(sample, sample_data_path):
                        remove(sample_data_path)
                    else:
                        sample.file_path = sample_data_path
                        await self.insert_malware_sample_data(
                            sample.sha256, sample.file_name, sample.analyze_id, "UNKNOWN",
                        )
                else:
                    logger.warning(f'Unknown file type detected for sample: {sample.analyze_id}, '
                                   f'first bytes (hex): {file_header.hex()}')
                    sample.file_path = sample_data_path

        return samples

    async def _executable_winrar_check(self, file: BinaryIO, sample: TriageNewSampleData,
                                       sample_data_path: str):
        # Heuristic: an 'MZ' file that also contains a 'Rar!' block and a 'Setup=' directive
        # is treated as a WinRAR SFX installer; otherwise the executable is kept as-is.
        rar_signature_found = False
        setup_signature_found = False
        actual_payload_rar_file = sample_data_path + '.rar'
        while chunk := file.read(4096):
            if not rar_signature_found and chunk.find(b'Rar!') != -1:
                rar_signature_found = True
            if not setup_signature_found and chunk.find(b'Setup=') != -1:
                setup_signature_found = True

            if rar_signature_found and setup_signature_found:
                move(sample_data_path, actual_payload_rar_file)
                if not await self._extract_rar_file(sample, actual_payload_rar_file):
                    await self.insert_malware_sample_data(
                        sample.sha256, sample.file_name, sample.analyze_id, "UNKNOWN",
                    )
                break
        else:
            sample.file_path = sample_data_path

    async def analyze_yara_rules(self, pathes: List[TriageNewSampleData]) -> List[AnalyzeResult]:
        analyze_results = []
        for sample in [x for x in pathes if x.file_path]:
            families = []
            network_usage = []
            dangerous_tags = []
            sample.file_path = sample.file_path.replace('\x00', '')
            if self.stealer_scanner.scan(sample.file_path):
                scan_result = self.stealer_scanner.scan_results
                families = list(filter(lambda x: x, [x.get("rule", "").lower() for x in scan_result]))
            else:
                entry = attempts_to_get_entry_point(sample.file_path)
                try:
                    if entry.entry_point and self.stealer_scanner.scan(entry.entry_point.replace('\x00', '')):
                        scan_result = self.stealer_scanner.scan_results
                        families = list(filter(lambda x: x, [x.get("rule", "").lower() for x in scan_result]))

                    if entry.entry_point and self.network_scanner.scan(sample.file_path):
                        network_usage = list(filter(
                            lambda x: x, [x.get("rule", "").lower() for x in self.network_scanner.scan_results]))

                    if entry.entry_point and self.network_scanner.scan(entry.entry_point):
                        network_usage += list(filter(
                            lambda x: x, [x.get("rule", "").lower() for x in self.network_scanner.scan_results]))

                    if entry.entry_point and self.dangerous_scanner.scan(entry.entry_point.replace('\x00', '')):
                        scan_result = self.dangerous_scanner.scan_results
                        dangerous_tags = list(filter(lambda x: x, [x.get("rule", "") for x in scan_result]))
                        sample.behaviors = dangerous_tags

                    if entry.extraction_path:
                        clean_up_temp_files(entry.extraction_path)

                    if entry.entry_point:
                        await self.telegram_client.send_bot_message(
                            f'Entry point detected: {entry.entry_point.split("/")[-1]}\n\n'
                            f'Yara matches: {", ".join(families + sample.tags)}\n'
                            f'Behavior: {", ".join(dangerous_tags)}\n\n'
                            f'url: https://tria.ge/{sample.analyze_id}')
                except Exception as err:
                    logger.error(f'Unexpected issue occurred: {err}')
                    await self.insert_malware_sample_data(
                        sample.sha256, sample.file_name, sample.analyze_id, "UNKNOWN",
                    )

            analyze_results.append(AnalyzeResult(families, network_usage, sample))
        return analyze_results

    async def _analyze_network(self, value: AnalyzeResult) -> List[DecompileResult]:
        extraction_result: List[DecompileResult] = []
        if not value.network:
            entry = attempts_to_get_entry_point(value.triage_object.file_path)
            valid_entries = []
            if entry.entry_point:
                valid_entries = await self._fetch_results_from_entry_point(entry.entry_point, value)

            if entry.extraction_path:
                clean_up_temp_files(entry.extraction_path)
            return valid_entries

        extraction_dir = extract_pyinstaller_exe(value.triage_object.file_path)
        pyc_files = attempts_to_get_entry_point(value.triage_object.file_path)
        if not pyc_files.entry_point:
            return extraction_result

        source_codes = decompile_pyc(pyc_files.entry_point)
        logger.warning(f'Detected network usage: {value.network} for sample {value.triage_object.analyze_id}')
        for network in set(value.network):
            match network.lower():
                case 'usesdiscordwebhook':
                    extraction_result += await self._extract_webhook_from_source([source_codes], value)
                    if not value.triage_object.family:
                        value.triage_object.family = 'webhook_malware'
                case 'discordpyframework':
                    extraction_result.append(await self._analyze_discord_py_framework([source_codes], value))
                    extraction_result += await self._extract_webhook_from_source([source_codes], value)
                    if not value.triage_object.family:
                        value.triage_object.family = 'discord_bot_malware'
                case 'pythonsmtpusage':
                    extraction_result.append(await self._extract_email_from_source([source_codes]))
                    if not value.triage_object.family:
                        value.triage_object.family = 'smtp_malware'

        if extraction_dir:
            clean_up_temp_files(extraction_dir)
        return extraction_result

    @staticmethod
    async def _extract_email_from_source(source_codes: List[str]) -> DecompileResult:
        result = []
        for code in source_codes:
            email = list(filter(lambda x: x, findall(EMAIL_MATCHER, code)))
            result += email
        return DecompileResult([], result)

    async def _fetch_results_from_entry_point(self, entry_file: str, value: AnalyzeResult) -> List[DecompileResult]:
        valid_entries: List[str] = []
        logger.info(f'Fetching result from the entry point for {value.triage_object.analyze_id}')
        decompiled_code = decompile_pyc(entry_file)

        possible_webhook = findall(WEBHOOK_REGEX, decompiled_code)
        if possible_webhook:  # ('webhook_url', 'app')
            self.queued_files_to_delete.add(value.triage_object.file_path)
            valid_entries += [x[0] for x in possible_webhook]
            if not value.triage_object.family:
                value.triage_object.family = 'webhook_malware'

        possible_base64_webhook = findall(WEBHOOK_REGEX_BASE64, decompiled_code)
        if possible_base64_webhook:
            self.queued_files_to_delete.add(value.triage_object.file_path)
            valid_entries += base64_decode_then_filter(possible_base64_webhook)
            if not value.triage_object.family:
                value.triage_object.family = 'webhook_malware'

        verified_entries: List[DecompileResult] = [
            validate_webhooks(
                list(set(valid_entries)),
                [value.triage_object.family] + value.triage_object.behaviors + value.triage_object.tags,
                value.triage_object.sha256)]

        verified_entries.append(await self._extract_other_tokens(decompiled_code, value))
        return verified_entries
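
    # Note on the flow above (inferred from this module, hedged where it goes beyond it):
    # plain-text webhooks are matched first, then base64-encoded candidates are decoded and
    # filtered, the combined set is deduplicated, and validate_webhooks is assumed to split
    # them into live and dead entries. Throughout this module the DecompileResult argument
    # order is (invalid_results, valid_results).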

    async def _extract_other_tokens(self, decompiled_code: str, value: AnalyzeResult) -> DecompileResult:
        valid_entries = []
        invalid_entries = []
        bot_tokens = list(set(findall(DISCORD_TOKEN_MATCHER, decompiled_code)))
        if bot_tokens:
            self.queued_files_to_delete.add(value.triage_object.file_path)
            valid_entries += [x for x in bot_tokens if validate_discord_token(x, True).success]
            if not value.triage_object.family:
                value.triage_object.family = 'discord_bot_malware'

        telegram_bot_tokens = list(set(findall(TELEGRAM_REGEX, decompiled_code)))
        if telegram_bot_tokens:
            self.queued_files_to_delete.add(value.triage_object.file_path)
            for x in telegram_bot_tokens:
                if validate_telegram_token(x):
                    valid_entries.append(x)
                else:
                    invalid_entries.append(x)
            if not value.triage_object.family:
                value.triage_object.family = 'telegram_bot_usage'

        return DecompileResult(invalid_entries, valid_entries)

    async def _extract_webhook_from_source(self, source_codes: List[str],
                                           value: AnalyzeResult) -> List[DecompileResult]:
        extraction_result = []
        for code in source_codes:
            if code.startswith('pyi_') or code.startswith('_'):
                continue

            webhooks = findall(WEBHOOK_REGEX, code)
            if webhooks:
                extraction_result.append(
                    validate_webhooks(
                        webhooks,
                        [value.triage_object.family] + value.triage_object.tags + value.triage_object.behaviors,
                        value.triage_object.sha256
                    ))
                self.queued_files_to_delete.add(value.triage_object.file_path)
        return extraction_result

    async def _analyze_discord_py_framework(self, src_codes: List[str], value: AnalyzeResult) -> DecompileResult:
        extraction_result_valid = []
        extraction_result_invalid = []
        all_results = []
        for code in [x for x in src_codes if not x.startswith('pyi')]:
            bot_tokens_plain_text = list(set(findall(DISCORD_TOKEN_MATCHER, code)))
            for token in bot_tokens_plain_text:
                all_results.append(token)
                if validate_discord_token(token, True).success:
                    extraction_result_valid.append(token)
                else:
                    extraction_result_invalid.append(token)

            bot_token_base64: List[str] = list(set(findall(DISCORD_TOKEN_MATCHER_BASE64, code)))
            for token in bot_token_base64:
                if token:
                    token = token.replace("'", '')
                    all_results.append(token)
                    decoded_token = b64decode(token).decode('utf-8', errors='ignore')
                    if validate_discord_token(decoded_token, True).success:
                        extraction_result_valid.append(decoded_token)
                    elif validate_discord_token(decoded_token[::-1], True).success:
                        extraction_result_valid.append(decoded_token[::-1])
                    else:
                        extraction_result_invalid.append(decoded_token)

        if not all_results:
            await self.telegram_client.send_bot_message(
                f'please investigate this sample: https://tria.ge/{value.triage_object.analyze_id}\n\n'
                f'Usage: discord.py bot framework.\n'
                f'Failed to retrieve bot token using usual method.')
        else:
            self.queued_files_to_delete.add(value.triage_object.file_path)

        return DecompileResult(extraction_result_invalid, extraction_result_valid)

    async def decompile_and_get_valids(self, rule_matched: List[AnalyzeResult]) -> List[DecompileResult]:
        results: List[DecompileResult] = []
        for value in rule_matched:
            malware_family_result = await self._analyze_malware_family(value)
            if malware_family_result is not None:
                results.append(malware_family_result)
                logger.info(f"Removing: {value.triage_object.file_path}")
                continue

            network_analyze_result = await self._analyze_network(value)
            if network_analyze_result:
                results += network_analyze_result
                await self._send_success_notification(network_analyze_result, value)

        if self.queued_files_to_delete:
            for file in set(self.queued_files_to_delete):
                clean_up_temp_files(file)
            self.queued_files_to_delete = set()

        return list(set(results))
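
    # Processing order in decompile_and_get_valids: a family-specific decompiler from
    # DECOMPILERS is tried first; the generic network-based path (_analyze_network) only runs
    # when no known family matched. The decoded_token[::-1] retry in
    # _analyze_discord_py_framework covers configs that store the base64 token reversed,
    # which is treated here as a heuristic rather than a guarantee.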

    async def _analyze_malware_family(self, value: AnalyzeResult) -> Optional[DecompileResult]:
        if not value.families:
            logger.warning(
                f"Failed to determine sample family for {value.triage_object.analyze_id}"
            )
            await self.insert_malware_sample_data(
                value.triage_object.sha256,
                value.triage_object.file_name,
                value.triage_object.analyze_id,
                "UNKNOWN",
            )
            return None

        # from here, we know this malware belongs to a family known to us.
        family = value.families[0]
        logger.warning(f"Identified {value.triage_object.analyze_id} as {family}")
        try:
            function_result = DECOMPILERS[family]
        except KeyError:
            function_result = None

        result: List[DecompileResult] = []
        if function_result is not None:
            if isinstance(function_result, list):
                for function in function_result:
                    result_temp = function.decompile(
                        value.triage_object.file_path,
                        value.triage_object.tags + value.triage_object.behaviors)
                    result.append(result_temp)
            else:
                result = [function_result.decompile(
                    value.triage_object.file_path,
                    value.triage_object.tags + value.triage_object.behaviors,
                    value.triage_object.sha256)]

            await self.insert_malware_sample_data(
                value.triage_object.sha256,
                value.triage_object.file_name,
                value.triage_object.analyze_id,
                family,
            )
            remove(value.triage_object.file_path)
            value.triage_object.family = family

        # string extraction should have no invalid data.
        if result:
            actual_result = await self._send_success_notification(result, value)
            return actual_result
        return None

    async def _send_success_notification(self, result: List[DecompileResult],
                                         value: AnalyzeResult) -> DecompileResult:
        valids = []
        invalids = []
        for res in result:
            valids += res.valid_result
            invalids += res.invalid_result

        if valids or invalids:
            valid_text = "\n".join(valids)
            invalid_text = "\n".join(invalids)
            await self.telegram_client.send_bot_message(
                f"Extracted valid result: \n```\n{valid_text}\n```\n"
                f"Extracted invalid result: \n```\n{invalid_text}\n```\n"
                f"from https://tria.ge/{value.triage_object.analyze_id}.\n\n"
                f"Family: {value.triage_object.family}\n"
                f"Behaviors: {', '.join(value.triage_object.behaviors)}"
            )
        return DecompileResult(invalids, valids)

    async def _extract_zip_file(self, sample: TriageNewSampleData, sample_data_path: str) -> bool:
        from zipfile import ZipFile
        extraction_dir = f"{getcwd()}/samples/{sample.analyze_id}_zipextracted"
        makedirs(extraction_dir, exist_ok=True)
        try:
            with ZipFile(sample_data_path) as z:
                z.extractall(extraction_dir)
            await self.find_exe_from_extraction(extraction_dir, sample)
            return True
        except Exception as err:
            logger.warning(f'Failed to extract payload from zip file for sample: {sample.analyze_id}. {err}')
            clean_up_temp_files(extraction_dir)
            return False

    async def _extract_rar_file(self, sample: TriageNewSampleData, sample_data_path: str) -> bool:
        from rarfile import RarFile
        extraction_dir = f"{getcwd()}/samples/{sample.analyze_id}_rarextracted"
        makedirs(extraction_dir, exist_ok=True)
        try:
            with RarFile(sample_data_path) as r:
                r.extractall(extraction_dir)
            await self.find_exe_from_extraction(extraction_dir, sample)
            return True
        except Exception as err:
            logger.warning(f'Failed to extract payload from rar file for sample: {sample.analyze_id}. {err}')
            clean_up_temp_files(extraction_dir)
            return False
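
    # Environment assumption (not checked in code): the rarfile package usually depends on an
    # external extractor such as unrar or bsdtar being available on PATH. If none is
    # installed, extraction fails and the except branches above only log a warning and
    # return False.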

    async def find_exe_from_extraction(self, extraction_dir, sample):
        exe_files = find_payload_files(extraction_dir, '.exe', '')
        if exe_files:
            for file in exe_files:
                self.stealer_scanner.scan(file)
                if self.stealer_scanner.scan_results:
                    exe_files = file
                    break
            else:
                exe_files = exe_files[0]

            final_payload_file = f"{getcwd()}/samples/{exe_files.split('/')[-1]}"
            move(exe_files, final_payload_file)
            sample.file_path = final_payload_file
        else:
            sample.file_path = None

        clean_up_temp_files(extraction_dir)


async def _main():
    triage_client = TriageClient()
    while True:
        try:
            await worker(triage_client)
        except Exception as err:
            logger.exception(f'Worker task failed with error: {err}')
            await triage_client.telegram_client.send_bot_message(
                f"Failed to do work: {err}\n\n{err.__class__.__name__}\n\n"
                f"Traceback: {format_exc()}")
        await sleep(randint(35, 55))


async def main():
    main_task = create_task(_main())
    bot_task = create_task(tg_bot_polling())
    await gather(main_task, bot_task)


async def worker(triage_client):
    newest_sample = await triage_client.get_new_samples()
    logger.info(f"Found {len(newest_sample)} new samples.")

    data_pathes = await triage_client.download_sample(newest_sample)
    rule_matched = await triage_client.analyze_yara_rules(data_pathes)
    decompile_results = await triage_client.decompile_and_get_valids(rule_matched)
    if decompile_results:
        logger.info(f"Here are our results: {decompile_results}")

    directory_path = f'{getcwd()}/samples'
    actual_files = [f for f in listdir(directory_path) if path.isfile(path.join(directory_path, f))]
    for file in actual_files:
        clean_up_temp_files(f'{getcwd()}/samples/{file}')

    logger.info("Worker finished; sleeping before the next batch.")


if __name__ == "__main__":
    run(main())
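
# Runtime assumptions, inferred from the code above rather than from project documentation:
# a .env file (or environment) providing TRIAGE_API_KEY, a writable data/ directory holding
# malware_db.db and log/, a samples/ working directory, and the YARA rules under rules/
# (infosteal.yar, network.yar, dangerous.yar) must exist relative to the working directory.
# A hypothetical invocation (the module filename is a guess):
#
#   TRIAGE_API_KEY=... python triage_worker.py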