From fa8caa5116b98eb8b2a6ce2a62fea843233ab558 Mon Sep 17 00:00:00 2001 From: John husxdev Date: Sat, 12 Apr 2025 01:40:13 -0500 Subject: [PATCH] Intitial release --- .env | 3 + client/telegram_notification.py | 31 ++ constants.zip | Bin 0 -> 17943 bytes constants/regex.py | 8 + crawler.py | 626 +++++++++++++++++++++++ methods/decompilers.py | 10 + methods/impl/amnesia_decompiler.py | 9 + methods/impl/legacy_notoken_decompile.py | 9 + methods/impl/notoken_decompile.py | 9 + methods/impl/pysilon_decompiler.py | 9 + model/common_models.py | 7 + model/triage_models.py | 18 + rules/dangerous.yar | 134 +++++ rules/infosteal.yar | 136 +++++ rules/network.yar | 102 ++++ utils/blank.py | 17 + utils/common_utils.py | 19 + utils/decompile_utils.py | 49 ++ utils/discord_token_validator.py | 17 + utils/telegram_token_validator.py | 6 + utils/webhook_util.py | 15 + 21 files changed, 1234 insertions(+) create mode 100644 .env create mode 100644 client/telegram_notification.py create mode 100644 constants.zip create mode 100644 constants/regex.py create mode 100644 crawler.py create mode 100644 methods/decompilers.py create mode 100644 methods/impl/amnesia_decompiler.py create mode 100644 methods/impl/legacy_notoken_decompile.py create mode 100644 methods/impl/notoken_decompile.py create mode 100644 methods/impl/pysilon_decompiler.py create mode 100644 model/common_models.py create mode 100644 model/triage_models.py create mode 100644 rules/dangerous.yar create mode 100644 rules/infosteal.yar create mode 100644 rules/network.yar create mode 100644 utils/blank.py create mode 100644 utils/common_utils.py create mode 100644 utils/decompile_utils.py create mode 100644 utils/discord_token_validator.py create mode 100644 utils/telegram_token_validator.py create mode 100644 utils/webhook_util.py diff --git a/.env b/.env new file mode 100644 index 0000000..8b71311 --- /dev/null +++ b/.env @@ -0,0 +1,3 @@ +TRIAGE_API_KEY=your_triage_api_key_here +TELEGRAM_BOT_TOKEN=your_telegram_bot_token_here +TELEGRAM_CHAT_ID=your_telegram_chat_id_here diff --git a/client/telegram_notification.py b/client/telegram_notification.py new file mode 100644 index 0000000..087b613 --- /dev/null +++ b/client/telegram_notification.py @@ -0,0 +1,31 @@ +import os +from asyncio import sleep + +class TelegramNotificationClient: + def __init__(self): + self.token = os.environ.get("TELEGRAM_BOT_TOKEN", "") + self.chat_id = os.environ.get("TELEGRAM_CHAT_ID", "") + + async def send_bot_message(self, message: str): + if not self.token or not self.chat_id: + return + + try: + import httpx + + url = f"https://api.telegram.org/bot{self.token}/sendMessage" + data = { + "chat_id": self.chat_id, + "text": message, + "parse_mode": "Markdown" + } + + async with httpx.AsyncClient() as client: + await client.post(url, json=data) + except Exception as e: + print(f"Failed to send Telegram notification: {e}") + +async def tg_bot_polling(): + """Simple polling function to keep the Telegram bot running""" + while True: + await sleep(60) # Just sleep in a loop - in a real bot this would check for messages diff --git a/constants.zip b/constants.zip new file mode 100644 index 0000000000000000000000000000000000000000..7356da87a36eac494a4a8f59fc3c45f5b776a4b0 GIT binary patch literal 17943 zcmbVz1yq*X)-@p^-Q6YK4I&{O(%s!J-5`y0cXx+?f^>Ixr_v1)Qa^aE9`)!w_xo|Y zJidc5=3dWSYwcKjO=)q^Cn!LVKj=^WRKNb`Kfj=W2!QkembP{}mUgyu_D;}1z)v3k z^!1m5JS-5zLC=wq1=JU#$EWmH97rHYAkJTKkbc6kF)%W4qP24V9UepPkq0K#cO=Rh@>Lsh}l#E9#$7wRA?&^{0!SURZxokJn zdesTS%ijZ-1#XB3Yh&Jukhq{?>T++&;60 zq+Q@9q>fy!XqWp{t^t&4P15Woc$L(oy)FtWzZxK>uqZscoaJtl6@7;Raci6$(6;_qHeSZ9(;4 z?6a9)&%*BWJdxc67nyn6J$_2u3d}>ly_yT9<;1@2=_?S9RckMD2`nuR0m)l8vO*0E z1l0KO`O6oNz7qJw7hn&*(ATlk`K>!&JkbB>&d2AUA)bDNFb5d@A7JQyu>|q~#=^kP z7@+_A*uwsgG3eio`CV+G{eYrxpa-z9GBG!>vHh3$dc@RtB_7;HkI;0X1jC_HfcNA% zii5RMcv=H(aEHSf_z4jc)b*ib(ib2IkSk3$CNg38FJoSxDGdF~58a6q*muL8pI}_~ zN#Nayv%}eYP6{6Cv7__CPXqf&O|3rN-XR&`kUYye<=@$P50bd3w%zgB z_~YUrG`|rR{mJuZgL=?w0MF077PFheM*vXfozI49@|Qrlrpg{o07fOL>=+l*7UDK- z0EQU!)`);m(}r}6dKQwCf3&*XE$E}yrHkO<+aCPP5B#IonSS9K_6L?sEUe6b>o(&5 z5dKK~pOcyBS126|O9NXI9j(9F{}=Kf@eRTnp&pXk_nNyiOA>^^FGG-*uc8`Z9-%FA zo(bZjdq3}Jzwu>ghT8rR%yL-J637d02gecC%O0!I4wXEzb|QF9S<<%`tj0c(JcXx3 zjoF%6g;9o$yBq5-Tm-NrbuWYvT6;N3Xqqx^>e1}MO-Lw}JvM!XVl&3(z*+@hJKjD) zb1d2%ldPLZGDetR^l(@ODpmkaUIzrAdnB=_hi+LP3F)R_VOMV{SDafu75vChrzNtK z1CUhHfAvVaMwVxP!vpPt-)a9{aFYMZmAQeDj-IoYCBP0~W?=a@W&gq${v%^jd{7VJ z+Hipytf~4nT+cqrylg;=95J8Mj-r|UymBa6tT@6kI0RFrN8}tK3=n{xxJZdlnA zwT7j1vF|x-F}y(%*MSm@afN9nbxv)Xj3;shd1Q@}<%3(x18d^{l{La&S^NK@D~u1i zjULAD@EVnm4toIi$#a;me8ELl=9SH`W(PvtH!e;*O_DoVTJL9y-;-k zN?2+qjrJH<(Q}exTw+)nvCOYl8S(B%kxv#$9d=Qea74EJSE2iw&18%sgtEQ19f!6g zo|2WuYleNvU2xCA zWppVdGT+#QucAk>ZL8VRRz7>A>hP9E>E8qFAIWItY-?f;_#v`v{w1^?linbVA^ef6 zi#w=bHPu4;lqUkD2^g3~YE-^iU73+7wLya|&CS)kkuA};OXHiHNDw2}TkEK97#tKS z4aPFHy^sZjpec*{{jj`u5U7AgdjZJC0!~m(3i|l!3=j_Z!<8^D<9nY)}rW*Y#g~0LL6SbYQEm2$(GL{=8b%4*FsczFOY} zCxJSzkU^)VK0}&P|8%=)Y?QU5H%hIL4PBbbVEO{ z1D&s<#U(pfJV>{-vi9S{at{HCKOW(z_VCj251&8J-hZ2KXJevcWbnV4|42=2#4B-7 z287PTE0oN9G#8{DLTTwEa#T|QQNNvIB^S%Rh*1CeICg$D)lMf4$km{0w6mV=SskeV z6d#2$^A!ryE8{|%%-xt}{lGxR}XRL!0I4ir%K+=DVQ(wZd$Z#*_9H}G^Eo{37C zIy`Z>WFWjy+7z%@U`xfCactB6V_#T%WMnlEyn)Zt@v5$qW{rWs!&pZjg^uO9lb`Bu z1xGs5xjbb#9BHdx@8a)g2scwgg|^Agw|pKjb+Ho3aOP zMWu#{HQ+aFU3LZb5UbdG=30!a>L5Drnd*y4Nl=FLF6*yas zfSqj7Yct+5ZiuWw8a82D$>HE0+*Z}(r!MeV!x9LWRI-TaLUdv>!Z;aSy^z;<*D|Z1 z7`(6V=*|Jh+20>!aHLtKlKyEny0EXw7b%}Ku>v-wS0P;f&195df^#7|Jt7}CM1W4S z5|Wv*GAP8ojDl9+R;nUA_!&pl9&5mdMM2@JsVRdbu(%Q}b#dHhVhiNL)+@9zLF<+a z81ryaqSu1P3B}zOM=>?Q%=3L5pAPiEl0Iiq8QNyuzAsYejM5KTArf7pibahe*|&-M zIMlg7$lBj|6*zUXK(45p5JjT+Ixwpx(y~z0rgTyrWn~s*XmXZJ?CpvwCQFUzS{8jz zFNN&|^#b_?-odwV(%*iuCr zaq)KDMMhw6O_N9_)Y;(mo$V262fcEcFgAhMF?dXH2wb|ZEPJPCZlSSJsV^5i2 zZw?VG7r+*@?6Yb0S~Frt^~^!7Je`r@9cria?ar3JMFE7^G}6AAZk!d-Vj0Z<5VCHG zzCk!JgJZv~RpA)k*2POdMf8)RG#iDB&qgn}z4F@@UYWI5;$6Iu%{JtZ2ptjo;sxL& zK+0>^NruI=Aa5G&TDw+MWe5a#ZOXcpweD2{Fy`=Htv$TL@Pr-^EEAk={E}wY4J)ar zB~rbLOTUI()B+j*d=ly*_>QGq)SO4-n?rCblFDU1xE2v@rD!K0p|(21q}4$A1Yr8Y zopuna+hcv8FB`pcJ_*O_J$F_*MNY2nF63bFtJcu$V6}^92Tr)G85r3U`K0UZ#zEEo zFaSdr2AKReW#@HynI7N2ESOS%i-d&1lSh zusvFH2S8br`xwRg%d(`k!|TXE6dMrcY~F=doZAg=i%jNr28TBb=OpIKO^Zt9gKALh zFz9od-LYSdk3+{&T(xUGJ6%D&SnDCQfo*_P^`n(ry;|4qn(M<`^Fiku_uaAxC`PZ` zVRovR3jPqcqf+HpHVqaWty}!*$u-^Mn$j75d}#7;+E+pPaZUO2OlD$f2(Yy?&@unF znQZl8*D)Bc0T`!8@jtw#DhUd<9P5g+@$2Z+MppOqg%Vj%kaC++7T@b}*e2?D-Q9hj zRAL^8OK8mUdAFh5J?wU(d)=NmmhcpOqbhMukyo+$_O27Cls7V+Nf)05#n6tpfv~~vl+-#3edER6gCD%uvKubvq)SVYs2r6FSML|gN`C~_*C~P!4JV%nV zf*35G%ZBK)CwpsScUK9lLBXL1m9Cj|GWo`?3^r`7u=6hiv~j$)&UL)7bsQJUj-F(+xaIPGE^m zE;wo#h=ieLAex(6IrjO{A`qrM_7=lN1qEhKGq9N_$Xl&~RrOVu$ZM}iSoc0Q5^<$5WC+HrQ?s|kx}uZwL3i=W;OVu$CqT4?Y@a>0;U_>}T| z1~H%!y?DkfVeES>8Ih}zA*;6Xu9>2wAQ4E8K6-L;nt01~bu4@9NCme}Q@XtSqu{ml2W;M5{Q1}QoE)HL18lh;Z= z(O9RDqZ_eW9BrsGg;9{b`xu0Y7rh0zN*k_(-?|n|s6%$0fW%qW9r`WMXO9tVnEJ-1 zn`D4GeUwlG)KMaAX8DLMA*09Mb>`DyXydi@kV~}oTh=yif+sX0kZNS$EM!)3`z8jp zCZx!6ht{n)7D-R7f@W>B#H+Yr?Nl&s2E3pT3}T?!-VfYcLP+Pod8(kyCT0{-#ZER( zBh5P=K`x^Zx<||@kVv@1$|13cV)Nnbbp+&7+6v) zYnpTiNiR8aoyrE{tmbt5w6PhULR7O6xvRsGU~iWqPPV63?wZN@ z@_~*4nL#T4tDVe5a5RP?OMU1iAi{k`SlxrZj3;GX`&!*!wOs_*Ek65Xy-vyCk3r-N8gad6cV>eB z99@E>P{ca@=nFw|ZapELUs{E=bBy5K#JjxW#<|_5)RLu!t&6E{hvq&uHk)KPt+E={ z!i~O@0h0pUFQ(OLY?8X5crhzXSRVsV^K?eqk)9{X@g;`^gz@L@4cM)}iOY?=$O#ZB z)q|?HMr1iI?4?kv_hR~F2c$6;OLPPKn6yb`IW&r(KtRE;-;(z66u|q-4*s!#X{+;4 zP5Z++fc+syAFHX4&%djwG{2xc9!l-)Ow50OmUt{wK4P5y*F!1XcMM%~9ZR!+Ir3jT zU};3Onzqx!_#b*eEl2Uj%Tef!^b^>P2xP{?cF~Jc=NlH{lK2?_n(G45)p4KPk-lyw z@wuZ+PSUQ4J?C`NpWSuzeZa zWB>xzfVUEp{4rPx<9nkP9QBB5BB5PQN=@GeFp$va(F^^<<@Hvty0Rs%bM4`w20al% zns155#9j) zeDfhFuCZ6*YY~Ph z?Z;;Sc*Nun}^_UD2F#&=r3*N3(K7iJ#=KU7}QY=Iu3@vs~tB^Y+T5yb`4d^__gYz%A6!k8|K z84gt>DPiv{3*mhuaj({HHKC!Sh4V6YqcaonD3--@;~K>6UY1mJ8+B)MNg5JM(_*!i zfhsY^&5>511Wi)k!%*pq_4H-GQ?*NzhLND&!Hxcqux6{whW+w)(syp4b~?)OWK9W9 zHAC|+Dgy9Tb@*``4HPb`m#LkYtKIrCytYW(C$#aRfFst*Al4>{*y(_YFX$c8c+=QU%u%uQHX)O$)=DS7W_6XQ zA@s}C2r`LSK>3HlCa!sxV6!{4j>a)G@oGi2n&!8-Xg;4%vEb2cQzf9;l(+4-v(SWiSkjtJ-%7Lo# zwh{=YaU69PqxNIb(>1Tx>c}8UrtIKi<_U2|3w+=uc!pfiHSjQiF{~nwOf_|%%2=cO zeu_>b3frU#K_J%wnkTrh@wt5sc`*3@z9X= zSWh zHZpFFr2Fy>;_`Yhz!y8@gMjfif=@(H$2w!Tig1&3Lpln2a`R&@K{mi@Z9L==0>ppI zC4#?m$;4IbJ%I-Q^(pYPJk z;Av7rqxlhY!$f_xFNc!I;^GupwveKR8;yWG)Ps?NjB16Y=zwjPIIAy|LHXm)Q2i8hRFV%>)x8WUboqu@nUk}FnYuir3x>as$bid+aMOW%23!qk z{NOEVrkp+1It$L<3Wrt&=3KCsdoVgP(HQ0A<@k3D4*I$n-+rO za^*5hqPj-UZ0=rgQX8;iP!Ic_($WVa6NW_R)0sOEJ_^(sk<9)?nE4ZD(!v&9xj>Z4cFQ5-|oqgCK zls7Mm-vv)+D=U-~24al#2vhYgMB-VJ4^vo34O-w~8tjKXS~&H~p>W1jdl+FpX<5Jc ziJ}+@j_ACAXfmUDX>8}gk*Z)qxLUR1Dxh&1E7`npX0BKZLg@*4W@%KEc9FCtJauHw zUWu~KcR25RDRHa%Wm0mT@^#~b=PQ;%c=Ib*AZuWg?nG)T!+U0KK zK^p<|)S~64tp58H?l;Qsav>eX#<8z7g}rjX)A^|=KvKcK7$?$s@bDB(oUizB(IRwQ zVI7q^xw(`fyjIp@rFy#{fm;M}OcJobgN7gWROuWoy38NL#zMM|BzHRNTJa$&<-2|0miWJh+mnZIqcyN} z_+6#=TmQu4t&T?DTR3le1n`wbM_qC!uRK%O4}ROP{Ggc?%xLYm=DAin*j{t1jw25& zwvA3q+$f-g>Y)0*dKncYDK8~~z9VcaxQn@myLhLRR%z}T$?%Icy`ijcSd zZ~4nfir0{zHY(T%u2_RX(kOMNcu_>V!QCTnwUslMUr`a^(T%yK>0`R8(LAjrm~faczMM<7;SUCqp{`QvwDTM%R^_H5xSLz+RJSiKC@d=Yu=aV>h3Hm zc5av)BMMXPj~TOe+q^Q1>DtjcTp9DxUOqZRJ+7B>D40_tr|Jm9jsA&cHDDg@}Z4VxgFDHxj)5qJ#!RzO67|Mbk*8vQIRY%zFBj9hWY^#dUbxR59kX_Q`vCLT)|4tLQp( z$k_;(sVcIk4K?%cBJw(6&sMC6toYNp zeIi->OSAhl9&W;Eqo)M?X{7c@F2Wfw@6%Bc3i67KlSrM5wG;9}6Foxt5I2vB22H(4 zc3UHJUcBs%w{Ki(Ael{yk#PG2M*};-Qx!r2WQaSh|E|j#vOu$vuM|SISyE<<>7&TX zaKz4xQ>uAk(po52%kI8JkEtwiLtW4=5qKy*DB~xzYLOzsPSOB|o_rGVO;L^N;tu=> zWs_&1#-HbCAx#A^J6IBFgV!kIV1?#Gn@i5m6LvzQt`_o{)I%&2C{BIm-Hns?1cKeW zpb_10y#k5TMI6}guC~rTPWA>{i(C%|m5`4Qe>`%IQs8}9N+-H-n&_}EpF;pS%LJwy z$jG_7vAftEoo|!R%)2_=tk>7PXmqi4b79T&-)JZjx0D+iy-9I2cSjU+Ib3VwIaJq2YIT#ka{JB!4*t&E+yEQ_8iB$U_FGkJ$^+xM52mSEmUTQ9b}wBrXJ>}?BP zXMVNIeohrd19Fb@B1(KEoahogk+-4ay^Np&Ol-H4BC|8>u3N^tMD%G6pC=_jSm1M3 zhu!!x=NhE5_Ojk3lG#@iI6l~9t5!IDP5$_2tYHz&<+?i+1W|Vd=DhZM`BzyPlEh9d zbHF}Cudc5+EjbFygHKE`ELHW;EgKSHnQ-~x4NplRa%Gq&=@&vaX!qv)S#KoBpJUwR z6T+_6;l#4c(906r%IC0R(TOq&$3so@Y!v`=K?jAHKGVMLyrcj=r)b1J*b&aayB)nA zR}!WRThC6{>?{wz2NSbqhO+Fgp;X_<0a0%E-bOK8sTL4~DZ!-JRkq@c z$NuQij4K2K);i4Se6|=JC_G7pTE?w{c*mua$1Sy)#aAg-MRUuiBZ_2>&Ul zyD?DZJ#P%S&xoA)&01=h2i)m`xa44W){QwP^y!^9}O0N zy4Qdu5{C?+UlQ_}X@}Rwl@gW+lV+QO>D2{B29r|M5P2aR)TvZibafn9lvo90k8bso zr=?`q6Qf=i*S(yJ_jVP-BA1|~tE-`YWi?p!Rth%cyQqn{bLHFKdXQ*@g?xpXN_)4T zMqKTz=w*0q;9)-!1xQb7g`_`g39^On$R+WlbJD}J3hL${J+YK%snLra%+!o~6R3nIc>FH3tYcqw99xc^W?;%oMJVsY5 zcol$U^E~A@NJ?#~Nn*O`M~q*SxULjd`Bt= zh!0^JmIEE5BNy0Kmnc(20|Crq3ZAxT65;X4Rz(c>Sg6tA+4&J}7JjmBu z+j)zs*PMtOe)8cGN{5Ly+qc>@dv24NOy`Bd23W-MMJP zcVY#)I2i=0w04wGWNz1N=110?_yJ2!FbpX8M*3Qj9`1xZUeMPVx!|8-Z$I)^8!-#h zl!+N~5l0kLfq=tc)DhYlkc@;xC2*H|MzGcJ#qa~c9K?yfx~YaqR&O&#AtHtdd!Yz6 z$9veCxv-o9Y7^~+Md<-j@cf4C=+1h}9ndWv`RSNqI?0xHl%P7rV5IAz`$h{+k_PNx z@hq&!O!1iEBc<4sS=8{pn;(8~#|P|nVc{ejml;m68+%T4x`@7jg!FTn#`W@+FfNZh zz(D?!tXsmDulu>V0EzNP8%d+AXOK=WSQ1!vvDihZpznEymF|qsMti&Q9eL867%I8Q z^4KiNN!kzNI`Zx1nmUsGn-Gp%UY{y|kzya676(BAD13%*=U!~tDYwe*4E^jqQDqS6 zQxhz7^rUH9-q@&Zssrl{ed?^9m2wBe$gZGdmn5?(1DrZ$jbIT46G4xq#Zc2rT&9H+ zN6Opz8BdT{B;)6vM8_rjI10Nkx?a}Ia6|YWw$@y-_8A=u43@MiH1P?Uy>}-IvBq77 z-Z4u>C}X!&TEb*!a6yO7_nHUBT`d{l7 zJ*j|}-97CkwxWb@+`imz?uMfx0lcjskvo1v zpTGC2J74q)J%OD49KQK&c6J@pTM=1eT4uJjmu+xxFU+S$GwL)_>sZ)5^vOP{=ld)& z6=p_WX__X%4#bWT+>pOGG^GGL9Lmdve^&l#>DFz{YR+77r0L6x#=BaMOSHTDOHJ-0 z4vl*mr4$0&8`h33*LG zs&X#l7O-V4QRYBd9htP7rV$OnT?-T|ibHR-H2Pu_Ul(2G?f^3YcG0rKd{%{kk0=`l zka^#9&|5>Y!#}S>C(T{iG2A!LH5%@4$QWry<9C8@uzqq_FZe-Iw|};>x4a?Hyao^d z<~ES5gy0kddY-=9DiH+JP;f80JCt?a(}58~AWf%A;39+8bWG>7wMKL`2t#+JY30Z} zSw(bUBc7gtI?sbM(z`QCOQ;mLNgfu|CGE-4LP)P?i%)gAI3zf<*Ox{zf}vt|Kslg? zbgCl!@?YSLwq5!2%&a)FGM^0OCHMPrDkM2c1(N0yIgBPjNwaD>CM#=*8Db#T3nMNw z9(S)O<5X}*(^^e63f+{K3smKHWoVRu=dntASLU5A$pqwKD<=k|lNN7tAGaV_@Mo|G zS6Pp$_~IILvK-(#P!Yn~AR6=3EG~_?z`%qo*kh3a516Zbu&*;gktJzecNq}0%x<*! z?6OcxrxCntE9oC8y8O^j61$cyp1_@|dAN08YoAF~aH}TUV*DcE3H;p~W_rid=D6A! z;W-L=|GXxj8TQW3Kvunb3Kg>SG4D8{g7rfphEu~aBBQ0ZwgloDDF84;_AHnAqPl?S z1>({U!Xgo#e3n`&SwifK(|uhx2?efoB1AgL3Q7SZj5tB;{cvTpeZ4z|!|)_Dy%LRM zMbYQkJl(Lgx@S~Cr)s5Fywr%2$;$`eS21+CAW|CR+6b1CiIUvUUzr5L(j4Q=4)kD& zn;*YW#z>XNqv&6vM_ocEg_Z3!9P}QmF2Y(vtx}PT3)#qJa=T%=Ztv3@tVgNm3Sye8 zd{tm-jyMEv=%E3+yMVNDR9hqAI7eoRYq?2a#?ll;Mb*oVZKZpd`QEhc)@$c~k1g=BU6+;>h2dPPFve>qoGp6!|DzD(03~v)uv*z})~!_Qj3% zq$3g?9AzZQu-HB;QwwQ;4Q{AWqL35^bB3Z(XYogS6V@Zn{1 z=Qk&p_mRf>a(5-zbBzIpRbTS_v~ULby(r%<>fx?F#guARUEGaazd;&tkkeh%a58SM zI?{paTY5+(9Zymu;%Y^kp&&{7`q@F{6X2jCv2AcTISD(aG@2v_T3Mg68F$>EKC;)J zZxkB^s(l!Ewm@iFwhDLh{k%a_5%l;XvWM6P_0jsW*T(W%+l*XKmvzXQhHOkK2l4Y??qII+q3+N{ZlBtiI`{T?MM;4B95b* zPl2f+ldOW4ii+y(2%1o`-kuihZHN0kqq3*RyU!IPOF0n@7&}*5uh&IFsdlNk_XjnK zl$kf;wcQrMlnRDHf>N8({mlxbmd_xrz3DWIOdV5PQD7J~j8G3Eyz9fWVdcB?o!0V8 zsHp4D2w}w6tyhlk`ly^NMafZ|Q#V}p{VTLBn_t<}v$JL;8w50-b zQIWH!&EFhp55M)k;2fsvQ$lxqVVu`!4r#Obpp%sfxsfB{ll)wk2%Tx1f*SzqT3abA zXp{UdcL%hJhyMJO6u)svC@uFoZ}HWZyiLPU6f?wPec_?@o`np}nt)CbrxsRA41q}* zzk@NI*=%&X>G*4w{^#_s+7(T@dn~!u)TLpwQqQ;o? zIBEqk0SMDGN~$VGU4Az;#Qk$zOJJgYRj#_O^# zx(!2+WTwE_vXk4SBU8okCX;9@SjmH2vJ&SSd!3h{^Fph;Xr(peDw<|eM6PuG+lG5x zPK|~1&!*%hqmHp7i2{>pNCTToZzpgiH<1Ep@t9JEX~E)K`c|i~ES|NgmQVz$7__)WEU=(wWDV`|Lj>ehc5aNki^nm-7fo2oI_0&1TJ22si)SpjAepRFpQ zFGY}ORT#EUU!!>Ha90tJ{_%8vP46P1Q>wWx`j8-WrmJSW1FlaLGGuvQLjs?_&94Zi zj49Uj%#DlQ_E2pic@E4%S6Nk6YPg2qm2=J;r5_DSTp1qK^%M$&cD@w7wGcDGI?E^1 zaYFmeE|TjL`M@N>QISI$guFEz`yk~0>^_Ag$@tNnvR&|ADHoeQFCZNg|OvYphBLncMxI6@8Q!v_$19F@5y)7QGI3H#y zm8<0Y?k(j5kX)n_OFtn_=6pVp;}sSf+uqjMsk*mjDl}hwO^KJ#SfZwe4;HLl!zdxg zi5kQmku$l3N6mEGT#X3!mI{+JBO?@(=Tm2B(Q_m9rnv6yJp7B7dNLZ}3 z!ON=L;@1pkUSTlu$`DbsvkcVk(|V_^*2XX9M;|l>>o4wgD>$u8tt8NwtJThy1D0qkqfBZlD$d26Y{kZ$A8ruKDJKp&8SCz?r2w zC0y@~y0$2@h2hdWLIE^4c9{1GhA-?qRxcL$mm&4Eu&Xz3=D1zu`LveiT!Y%Isb6-> zLb9fN+ya~|KpjOorbLF*L>!-J)K#rZ1=z_*=LtrNve^?90@DYB&P-O{OHB{2=F=Sa z2dM-s6? zCx)0NX2*l`eXvpC$4lh<`t+qzpQIN!FoltCYT-X{zFIuwF5j6Wi84w?G$a1Z&be>+ z0+|}vtXZ$3bc|pySML6%4Wgd9hPT5{@_OG-lJVtEL5+n#Kcvf4ANLoF^KEg5LqyHjpw2@Wk^Td?8r9L+4bqlb$e?1<1_o{_?ntrI@3%4T=MVW0>kmQfF+VA&k6IXz3}@cNTe~$)gfQFP zkDF)T^`bNnhRbr#XiXifUFtv!3U<{Uz|^FuBR7;M=2F9#Qe`g(4In7WeHq@~CpyDZ z1k<-FR*&vS{!(fZGvr&Sx5GAD$V-9XoZbQ4X*2G}%-o&32)$#9S%UH5JVc-S@+1cu zxy0>hv&4sZCk<_=OdUA~S0Of`i#S+5JzgAH?Nx3iB|SN<%I7?I)zHt~nzV%=XyERH zMVp{#GT!w$%?ql^G>3T@@(&gAZ?*kLaCbRLz&t%nz%60`UhA(cW&p*FE zQa)NczCQ3O^?dt@yWSkkTBR}W^}T`GQ8~VxgG9J55pzJ@S~Ki1G3PU^5Co+NDQqi! z08l?xb*9=vmsFas0TjGpq)V|;*{;{#I5#_NE$NQbaQ>6s(i1l`w6+DqMzqucic&(h z_bRSDn@`sba77M;h0GNTkx1MMPX{nVNNLTPvfqDs5pndAhZ1%#(Njt};KrEpOvjvV zFO$(HZN0WZ!a;qKpb)n^3gDwBul)XOT6(F#B8~%6ber`sBzpkf66|%#sH4|q4heVc z4C3Be_b!`!r*rF&*mAjHy05%4aAHA9@fRP+vXkW&OgcW(=z)SjqTLW#SvHj@6urH8 ztq=Sn=+YN;A&_o9KCcaUX{03Go>nwY@r>nnAqngr^XgnW20;P+ zlT;VuF!y1zC!(0l=>UEf67sRv`q5>?vLWs z2eiL5l0WnJqZAYE2jkv*<@|v5y$$B~BGun;_)jtCkCyg5^r?Nb^!L)v2gt|!z;AB( z=I$RQj6^?~i~103KSBO)63<^u|2EE#qCfJV#^D3|Wt{(uJn|2t{ivHG{An~t@Lxvz zpJau980JU8+0W!sN>Ws1W-( zK2e^2!}?cI+b?#0bMlY6sh_dOzhnQa;_MgfM-Kj}y!w#?OlYA0XsG^i8UBFvD3|(? zgGb%MkBSE5pTa}qJJx^KV?6*r>al(W{;|*dXW(ZD-;Dh4@`yhG|JaL)_>;w^$lrkf zThsV2u#cqvvyt^jQt{D&{@m93fc95A>)-S8KO0(qL^F6m`$K!{1J+;K{-3AOAKNwI zetO};?^u6qbA7=4aS8esoXMY&x$NrX|y}(NFZJ2-S z8TteCk8LPVe&Q*P5a{>qGQUB3Y+L#9AOQ6fk{Hp$uIkTCEDv~(O)TFghQ~zv=kejk z)Ij2R!282t;sNZhL&VRy@#A(7|0m;={s#N*see0Ed}jdcVQB;c!h86q3U^@{)i literal 0 HcmV?d00001 diff --git a/constants/regex.py b/constants/regex.py new file mode 100644 index 0000000..b8ed41f --- /dev/null +++ b/constants/regex.py @@ -0,0 +1,8 @@ +# Regular expressions for detecting various tokens and URLs + +WEBHOOK_REGEX = r"(https?://(?:ptb\.|canary\.)?discord(?:app)?\.com/api/webhooks/\d+/[\w-]+)" +WEBHOOK_REGEX_BASE64 = r"(?:['\"]([\w+/=]+)['\"])" +DISCORD_TOKEN_MATCHER = r"[MN][A-Za-z\d]{23}\.[\w-]{6}\.[\w-]{27}" +DISCORD_TOKEN_MATCHER_BASE64 = r"['\"]([\w+/=]{24,})['\"]" +TELEGRAM_REGEX = r"\d{8,10}:[A-Za-z0-9_-]{35}" +EMAIL_MATCHER = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}" diff --git a/crawler.py b/crawler.py new file mode 100644 index 0000000..d4b0c62 --- /dev/null +++ b/crawler.py @@ -0,0 +1,626 @@ +from asyncio import run, create_task, gather, sleep +from base64 import b64decode +from html import escape +from os import environ, getcwd, listdir, path, remove, makedirs, chmod +from random import randint +from re import findall +from shutil import move +from sqlite3 import connect +from time import time +from traceback import format_exc +from typing import List, Dict, Optional, Set, BinaryIO, Union + +from dotenv import load_dotenv +from loguru import logger +from yara_scanner import YaraScanner + +from client.telegram_notification import TelegramNotificationClient, tg_bot_polling +from constants.regex import WEBHOOK_REGEX, DISCORD_TOKEN_MATCHER, WEBHOOK_REGEX_BASE64, DISCORD_TOKEN_MATCHER_BASE64, \ + TELEGRAM_REGEX, EMAIL_MATCHER +from methods.decompilers import Decompiler +from methods.impl.amnesia_decompiler import AmnesiaDecompiler +from methods.impl.discord_rat_decompiler import DiscordRatDecompiler +from methods.impl.exela_stealer_decompiler import ExelaStealerDecompiler +from methods.impl.legacy_notoken_decompile import LegacyNotokenDecompiler +from methods.impl.luna_grabber_decompiler import LunaGrabberDecompiler +from methods.impl.notoken_decompile import NotokenDecompiler +from methods.impl.pysilon_decompiler import PysilonDecompiler +from methods.impl.umbral_decompiler import UmbralDecompiler +from model.common_models import DecompileResult +from model.triage_models import TriageNewSampleData, AnalyzeResult +from utils.blank import base64_decode_then_filter +from utils.common_utils import HttpxHelperClient +from utils.decompile_utils import clean_up_temp_files, extract_pyinstaller_exe, find_payload_files, decompile_pyc, \ + attempts_to_get_entry_point +from utils.discord_token_validator import validate_discord_token +from utils.telegram_token_validator import validate_telegram_token +from utils.webhook_util import validate_webhooks + +logger.add( + f"{getcwd()}/data/log/triage.log", level='WARNING', + colorize=False, backtrace=True, diagnose=True, rotation="20MB") + +DECOMPILERS: Dict[str, Union[List[Decompiler], Decompiler]] = { + "rayxstealer": [NotokenDecompiler(), LegacyNotokenDecompiler()], + "pysilonstealer": PysilonDecompiler(), + "exelastealer": ExelaStealerDecompiler(), + "blankgrabber": AmnesiaDecompiler(), + "amnesiastealer": AmnesiaDecompiler(), + "lunagrabber": LunaGrabberDecompiler(), + "umbralstealer": UmbralDecompiler(), + "discordratstealer": DiscordRatDecompiler(), +} + + +class TriageClient: + def __init__(self): + load_dotenv(".env") + + self.sample_file_count = 0 + + self.download_database = connect("data/malware_db.db") + + self.stealer_scanner = YaraScanner() + self.stealer_scanner.track_yara_file("rules/infosteal.yar") + self.stealer_scanner.load_rules() + + self.network_scanner = YaraScanner() + self.network_scanner.track_yara_file("rules/network.yar") + self.network_scanner.load_rules() + + self.dangerous_scanner = YaraScanner() + self.dangerous_scanner.track_yara_file("rules/dangerous.yar") + self.dangerous_scanner.load_rules() + + self.queued_files_to_delete = set() + + self.headers = {"Authorization": f'Bearer {environ.get("TRIAGE_API_KEY", "")}'} + self.client = HttpxHelperClient() + self.telegram_client = TelegramNotificationClient() + + async def insert_malware_sample_data( + self, sha256: str, file_name: str, analyze_id: str, family: str + ): + self.download_database.execute( + """ + insert or replace into malwares ( + sha256, file_name, analyze_id, identified_family, first_seen_time) VALUES (?, ?, ?, ?, ?) + """, + (sha256, file_name, analyze_id, family, int(time())), + ) + self.download_database.commit() + + + async def get_new_samples(self) -> List[TriageNewSampleData]: + actual_search_query = escape("family:blankgrabber OR family:exelastealer OR tags:pyinstaller " + "OR family:pysilon OR family:lunagrabber OR family:umbral " + "OR family:discordrat") + result = await self.client.get( + f"https://tria.ge/api/v0/search?query={actual_search_query}", + headers=self.headers, + ) + + if result.status_code != 200: + logger.warning(f'Failed to retrieve new samples. {result.status_code}') + return [] + + sha256_set = set() + undownloaded_data: List[TriageNewSampleData] = [] + result_data: List[Dict[str, str]] = result.json().get("data", []) + for r in result_data: + await self._analyze_triage_response(r, sha256_set, undownloaded_data) + + logger.info(f"New sample retrieval is completed. Size: {len(undownloaded_data)}") + return undownloaded_data + + async def _analyze_triage_response(self, r: dict, sha256_set: Set[str], + undownloaded_data: List[TriageNewSampleData]): + sha256 = r.get("sha256", "") + if sha256 and sha256 not in sha256_set: + sha256_set.add(sha256) + if await self.is_not_downloaded(r.get("sha256", "")): + summary = await self.client.get( + f"https://tria.ge/api/v0/samples/{r.get('id', '')}/summary", + headers=self.headers, + ) + summary_json = summary.json() + + tags = [] + for key, value in summary_json.get("tasks", {}).items(): + analyze_tag = value.get('tags', []) + tags += analyze_tag + + if summary_json.get("score", -1) >= 7: + logger.info(f'Retrieved sample: {r.get("filename")} -> score {summary_json.get("score", -1)}') + undownloaded_data.append( + TriageNewSampleData( + r.get("filename", ""), sha256, r.get("id", ""), tags=list(set(tags)) + ) + ) + else: + logger.info(f'Malware has low score, emitting: {r.get("filename")}') + await self.insert_malware_sample_data( + sha256, r.get("filename", ""), r.get("analyze_id", ""), "" + ) + + async def is_not_downloaded(self, sha256: str): + result = self.download_database.execute( + """ + select * from malwares where sha256 = ? + """, + (sha256,), + ) + + return result.fetchone() is None + + async def download_sample( + self, samples: List[TriageNewSampleData] + ) -> List[TriageNewSampleData]: + for sample in samples: + download_url = f"https://tria.ge/api/v0/samples/{sample.analyze_id}/sample" + file_name = f"{getcwd()}/samples/{sample.analyze_id}.{sample.file_name.split('.')[-1]}" + + sample_data_path = await self.client.download( + download_url, file_name, headers=self.headers + ) + chmod(sample_data_path, 0o777) + with open(sample_data_path, 'rb') as file: + file_header = file.read(6) + if file_header[:2] == b'MZ': + await self._executable_winrar_check(file, sample, sample_data_path) + + elif file_header.find(b'Rar!') != -1: + if await self._extract_rar_file(sample, sample_data_path): + remove(sample_data_path) + else: + sample.file_path = sample_data_path + + + await self.insert_malware_sample_data( + sample.sha256, + sample.file_name, + sample.analyze_id, + "UNKNOWN", + ) + elif file_header.find(b'PK\x03\x04') != -1: + if await self._extract_zip_file(sample, sample_data_path): + remove(sample_data_path) + else: + sample.file_path = sample_data_path + await self.insert_malware_sample_data( + sample.sha256, + sample.file_name, + sample.analyze_id, + "UNKNOWN", + ) + else: + logger.warning(f'Unknown file type detected for sample: {sample.analyze_id}, ' + f'first four bytes hex: {file_header}') + sample.file_path = sample_data_path + + return samples + + async def _executable_winrar_check(self, file: BinaryIO, sample: TriageNewSampleData, sample_data_path: str): + rar_signature_found = False + setup_signature_found = False + + actual_payload_rar_file = sample_data_path + '.rar' + while chunk := file.read(4096): + if not rar_signature_found and chunk.find(b'Rar!') != -1: + rar_signature_found = True + if not setup_signature_found and chunk.find(b'Setup=') != -1: + setup_signature_found = True + + if rar_signature_found and setup_signature_found: + move(sample_data_path, actual_payload_rar_file) + if not await self._extract_rar_file(sample, actual_payload_rar_file): + await self.insert_malware_sample_data( + sample.sha256, + sample.file_name, + sample.analyze_id, + "UNKNOWN", + ) + break + else: + sample.file_path = sample_data_path + + async def analyze_yara_rules(self, pathes: List[TriageNewSampleData]) -> List[AnalyzeResult]: + analyze_results = [] + + for sample in [x for x in pathes if x.file_path]: + families = [] + network_usage = [] + dangerous_tags = [] + + sample.file_path = sample.file_path.replace('\x00', '') + if self.stealer_scanner.scan(sample.file_path): + scan_result = self.stealer_scanner.scan_results + families = list(filter(lambda x: x, [x.get("rule", "").lower() for x in scan_result])) + + else: + entry = attempts_to_get_entry_point(sample.file_path) + try: + if entry.entry_point and self.stealer_scanner.scan(entry.entry_point.replace('\x00', '')): + scan_result = self.stealer_scanner.scan_results + families = list(filter(lambda x: x, [x.get("rule", "").lower() for x in scan_result])) + + if entry.entry_point and self.network_scanner.scan(sample.file_path): + network_usage = list(filter( + lambda x: x, [x.get("rule", "").lower() for x in self.network_scanner.scan_results])) + + if entry.entry_point and self.network_scanner.scan(entry.entry_point): + network_usage += list(filter( + lambda x: x, [x.get("rule", "").lower() for x in self.network_scanner.scan_results])) + + if entry.entry_point and self.dangerous_scanner.scan(entry.entry_point.replace('\x00', '')): + scan_result = self.dangerous_scanner.scan_results + dangerous_tags = list(filter(lambda x: x, [x.get("rule", "") for x in scan_result])) + sample.behaviors = dangerous_tags + + if entry.extraction_path: + clean_up_temp_files(entry.extraction_path) + + + if entry.entry_point: + await self.telegram_client.send_bot_message( + f'Entry point detected: {entry.entry_point.split("/")[-1]}\n\n' + f'Yara matches: {", ".join(families + sample.tags)}\n' + f'Behavior: {", ".join(dangerous_tags)}\n\n' + f'url: https://tria.ge/{sample.analyze_id}') + except Exception as err: + logger.error(f'Unexpected issue occurred: {err}') + await self.insert_malware_sample_data( + sample.sha256, + sample.file_name, + sample.analyze_id, + "UNKNOWN", + ) + + analyze_results.append(AnalyzeResult(families, network_usage, sample)) + + return analyze_results + + async def _analyze_network(self, value: AnalyzeResult) -> List[DecompileResult]: + extraction_result: List[DecompileResult] = [] + if not value.network: + entry = attempts_to_get_entry_point(value.triage_object.file_path) + + valid_entries = [] + if entry.entry_point: + valid_entries = await self._fetch_results_from_entry_point(entry.entry_point, value) + + if entry.extraction_path: + clean_up_temp_files(entry.extraction_path) + return valid_entries + + extraction_dir = extract_pyinstaller_exe(value.triage_object.file_path) + pyc_files = attempts_to_get_entry_point(value.triage_object.file_path) + if not pyc_files.entry_point: + return extraction_result + + source_codes = decompile_pyc(pyc_files.entry_point) + logger.warning(f'Detected network usage: {value.network} for sample {value.triage_object.analyze_id}') + + for network in set(value.network): + match network.lower(): + case 'usesdiscordwebhook': + extraction_result += await self._extract_webhook_from_source([source_codes], value) + if not value.triage_object.family: + value.triage_object.family = 'webhook_malware' + case 'discordpyframework': + extraction_result.append(await self._analyze_discord_py_framework([source_codes], value)) + extraction_result += await self._extract_webhook_from_source([source_codes], value) + if not value.triage_object.family: + value.triage_object.family = 'discord_bot_malware' + case 'pythonsmtpusage': + extraction_result.append(await self._extract_email_from_source([source_codes])) + if not value.triage_object.family: + value.triage_object.family = 'smtp_malware' + + if extraction_dir: + clean_up_temp_files(extraction_dir) + + return extraction_result + + @staticmethod + async def _extract_email_from_source(source_codes: List[str]) -> DecompileResult: + result = [] + for code in source_codes: + email = list(filter(lambda x: x, findall(EMAIL_MATCHER, code))) + result += email + + return DecompileResult([], result) + + async def _fetch_results_from_entry_point(self, entry_file: str, value: AnalyzeResult) -> List[DecompileResult]: + valid_entries: List[str] = [] + + logger.info(f'Fetching result from the entry point for {value.triage_object.analyze_id}') + decompiled_code = decompile_pyc(entry_file) + possible_webhook = findall(WEBHOOK_REGEX, decompiled_code) + if possible_webhook: + # ('webhook_url', 'app') + self.queued_files_to_delete.add(value.triage_object.file_path) + valid_entries += [x[0] for x in possible_webhook] + if not value.triage_object.family: + value.triage_object.family = 'webhook_malware' + + + possible_base64_webhook = findall(WEBHOOK_REGEX_BASE64, decompiled_code) + if possible_base64_webhook: + self.queued_files_to_delete.add(value.triage_object.file_path) + valid_entries += base64_decode_then_filter(possible_base64_webhook) + if not value.triage_object.family: + value.triage_object.family = 'webhook_malware' + + verified_entries: List[DecompileResult] = [ + validate_webhooks( + list(set(valid_entries)), + [value.triage_object.family] + value.triage_object.behaviors + value.triage_object.tags, + value.triage_object.sha256)] + + verified_entries += await self._extract_other_tokens(decompiled_code, value) + return verified_entries + + async def _extract_other_tokens(self, decompiled_code: str, value: AnalyzeResult) -> DecompileResult: + valid_entries = [] + invalid_entries = [] + bot_tokens = list(set(findall(DISCORD_TOKEN_MATCHER, decompiled_code))) + if bot_tokens: + self.queued_files_to_delete.add(value.triage_object.file_path) + valid_entries += [x for x in bot_tokens if validate_discord_token(x, True).success] + if not value.triage_object.family: + value.triage_object.family = 'discord_bot_malware' + + telegram_bot_tokens = list(set(findall(TELEGRAM_REGEX, decompiled_code))) + if telegram_bot_tokens: + self.queued_files_to_delete.add(value.triage_object.file_path) + for x in bot_tokens: + if validate_telegram_token(x): + valid_entries.append(x) + else: + invalid_entries.append(x) + + if not value.triage_object.family: + value.triage_object.family = 'telegram_bot_usage' + + return DecompileResult(invalid_entries, valid_entries) + + async def _extract_webhook_from_source(self, source_codes: List[str], value: AnalyzeResult) -> list[ + DecompileResult]: + extraction_result = [] + for code in source_codes: + if code.startswith('pyi_') or code.startswith('_'): + continue + + webhooks = findall(WEBHOOK_REGEX, code) + if webhooks: + extraction_result.append( + validate_webhooks( + webhooks, + [value.triage_object.family] + value.triage_object.tags + value.triage_object.behaviors, + value.triage_object.sha256 + )) + self.queued_files_to_delete.add(value.triage_object.file_path) + + return extraction_result + + async def _analyze_discord_py_framework(self, src_codes: List[str], value: AnalyzeResult) -> DecompileResult: + extraction_result_valid = [] + extraction_result_invalid = [] + all_results = [] + for code in [x for x in src_codes if not x.startswith('pyi')]: + bot_tokens_plain_text = list(set(findall(DISCORD_TOKEN_MATCHER, code))) + for token in bot_tokens_plain_text: + all_results.append(token) + if validate_discord_token(token, True).success: + extraction_result_valid.append(token) + else: + extraction_result_invalid.append(token) + + bot_token_base64: List[str] = list(set(findall(DISCORD_TOKEN_MATCHER_BASE64, code))) + for token in bot_token_base64: + if token: + token = token.replace("'", '') + all_results.append(token) + decoded_token = b64decode(token).decode('utf-8', errors='ignore') + if validate_discord_token(decoded_token, True).success: + extraction_result_valid.append(decoded_token) + elif validate_discord_token(decoded_token[::-1], True).success: + extraction_result_valid.append(decoded_token[::-1]) + else: + extraction_result_invalid.append(decoded_token) + + + if not all_results: + await self.telegram_client.send_bot_message( + f'please investigate this sample: https://tria.ge/{value.triage_object.analyze_id}\n\n' + f'Usage: discord.py bot framework.\n' + f'Failed to retrieve bot token using usual method.') + else: + self.queued_files_to_delete.add(value.triage_object.file_path) + + return DecompileResult(extraction_result_invalid, extraction_result_valid) + + async def decompile_and_get_valids(self, rule_matched: List[AnalyzeResult]) -> List[DecompileResult]: + results: List[DecompileResult] = [] + for value in rule_matched: + malware_family_result = await self._analyze_malware_family(value) + if malware_family_result is not None: + results.append(malware_family_result) + logger.info(f"Removing: {value.triage_object.file_path}") + continue + + network_analyze_result = await self._analyze_network(value) + if network_analyze_result: + results += network_analyze_result + await self._send_success_notification(network_analyze_result, value) + + if self.queued_files_to_delete: + for file in set(self.queued_files_to_delete): + clean_up_temp_files(file) + + self.queued_files_to_delete = set() + + return list(set(results)) + + async def _analyze_malware_family(self, value: AnalyzeResult) -> Optional[DecompileResult]: + if not value.families: + logger.warning( + f"Failed to determine sample family for {value.triage_object.analyze_id}" + ) + await self.insert_malware_sample_data( + value.triage_object.sha256, + value.triage_object.file_name, + value.triage_object.analyze_id, + "UNKNOWN", + ) + return None + + # from here, we know this malware belongs to a known family by us. + family = value.families[0] + logger.warning(f"Identified {value.triage_object.analyze_id} is {value.families[0]}") + + try: + function_result = DECOMPILERS[family] + except KeyError: + function_result = None + + result: List[DecompileResult] = [] + if function_result is not None: + if isinstance(function_result, list): + for function in function_result: + result_temp = function.decompile( + value.triage_object.file_path, value.triage_object.tags + value.triage_object.behaviors) + result.append(result_temp) + else: + result = [function_result.decompile( + value.triage_object.file_path, value.triage_object.tags + value.triage_object.behaviors, + value.triage_object.sha256)] + + await self.insert_malware_sample_data( + value.triage_object.sha256, + value.triage_object.file_name, + value.triage_object.analyze_id, + family, + ) + remove(value.triage_object.file_path) + + value.triage_object.family = family + # string extraction should have no invalid data. + if result: + actual_result = await self._send_success_notification(result, value) + return actual_result + + return None + + async def _send_success_notification(self, result: List[DecompileResult], value: AnalyzeResult) -> DecompileResult: + valids = [] + invalids = [] + for res in result: + valids += res.valid_result + invalids += res.invalid_result + + + if valids or invalids: + await self.telegram_client.send_bot_message( + f"Extracted valid result: \n```\n{"\n".join([x for x in valids])}\n```\n" + f"Extracted invalid result: \n```\n{"\n".join([x for x in invalids])}\n```\n" + f"from https://tria.ge/{value.triage_object.analyze_id}.\n\n" + f"Family: {value.triage_object.family}\n" + f"Behaviors: {', '.join(value.triage_object.behaviors)}" + ) + + return DecompileResult(invalids, valids) + + async def _extract_zip_file(self, sample: TriageNewSampleData, sample_data_path: str) -> bool: + from zipfile import ZipFile + extraction_dir = f"{getcwd()}/samples/{sample.analyze_id}_zipextracted" + makedirs(extraction_dir, exist_ok=True) + try: + with ZipFile(sample_data_path) as z: + z.extractall(extraction_dir) + await self.find_exe_from_extraction(extraction_dir, sample) + return True + except Exception as err: + logger.warning(f'Failed to extract payload from zip file for sample: {sample.analyze_id}. {err}') + clean_up_temp_files(extraction_dir) + + return False + + async def _extract_rar_file(self, sample: TriageNewSampleData, sample_data_path: str) -> bool: + from rarfile import RarFile + extraction_dir = f"{getcwd()}/samples/{sample.analyze_id}_rarextracted" + makedirs(extraction_dir, exist_ok=True) + try: + with RarFile(sample_data_path) as r: + r.extractall(extraction_dir) + await self.find_exe_from_extraction(extraction_dir, sample) + return True + except Exception as err: + logger.warning(f'Failed to extract payload from rar file for sample: {sample.analyze_id}. {err}') + clean_up_temp_files(extraction_dir) + + return False + + async def find_exe_from_extraction(self, extraction_dir, sample): + exe_files = find_payload_files(extraction_dir, '.exe', '') + if exe_files: + for file in exe_files: + self.stealer_scanner.scan(file) + if self.stealer_scanner.scan_results: + exe_files = file + break + else: + exe_files = exe_files[0] + + final_payload_file = f"{getcwd()}/samples/{exe_files.split('/')[-1]}" + move(exe_files, final_payload_file) + + sample.file_path = final_payload_file + else: + sample.file_path = None + clean_up_temp_files(extraction_dir) + + +async def _main(): + triage_client = TriageClient() + + while True: + try: + await worker(triage_client) + except Exception as err: + logger.exception(f'Worker task failed with error: {err}') + await triage_client.telegram_client.send_bot_message( + f"Failed to do work: {err}\n\n{err.__class__.__name__}\n\n" + f"Traceback: {format_exc()}") + + await sleep(randint(35, 55)) + + +async def main(): + main_task = create_task(_main()) + bot_task = create_task(tg_bot_polling()) + + await gather(main_task, bot_task) + + +async def worker(triage_client): + newest_sample = await triage_client.get_new_samples() + logger.info(f"Found {len(newest_sample)} new samples.") + data_pathes = await triage_client.download_sample(newest_sample) + rule_matched = await triage_client.analyze_yara_rules(data_pathes) + decompile_results = await triage_client.decompile_and_get_valids(rule_matched) + + if decompile_results: + logger.info(f"Here is our results: {decompile_results}") + + directory_path = f'{getcwd()}/samples' + actual_files = [f for f in listdir(directory_path) if path.isfile(path.join(directory_path, f))] + for file in actual_files: + clean_up_temp_files(f'{getcwd()}/samples/{file}') + + logger.info("Sleeping 2 minutes for next batch.") + + +if __name__ == "__main__": + run(main()) diff --git a/methods/decompilers.py b/methods/decompilers.py new file mode 100644 index 0000000..b2536dc --- /dev/null +++ b/methods/decompilers.py @@ -0,0 +1,10 @@ +from abc import ABC, abstractmethod +from typing import List + +from model.common_models import DecompileResult + +class Decompiler(ABC): + @abstractmethod + def decompile(self, file_path: str, tags: List[str], sha256: str = "") -> DecompileResult: + """Decompile the given file and return the result""" + pass diff --git a/methods/impl/amnesia_decompiler.py b/methods/impl/amnesia_decompiler.py new file mode 100644 index 0000000..049b702 --- /dev/null +++ b/methods/impl/amnesia_decompiler.py @@ -0,0 +1,9 @@ +from typing import List + +from methods.decompilers import Decompiler +from model.common_models import DecompileResult + +class AmnesiaDecompiler(Decompiler): + def decompile(self, file_path: str, tags: List[str], sha256: str = "") -> DecompileResult: + # Placeholder implementation + return DecompileResult([], []) diff --git a/methods/impl/legacy_notoken_decompile.py b/methods/impl/legacy_notoken_decompile.py new file mode 100644 index 0000000..54b6203 --- /dev/null +++ b/methods/impl/legacy_notoken_decompile.py @@ -0,0 +1,9 @@ +from typing import List + +from methods.decompilers import Decompiler +from model.common_models import DecompileResult + +class LegacyNotokenDecompiler(Decompiler): + def decompile(self, file_path: str, tags: List[str], sha256: str = "") -> DecompileResult: + # Placeholder implementation + return DecompileResult([], []) diff --git a/methods/impl/notoken_decompile.py b/methods/impl/notoken_decompile.py new file mode 100644 index 0000000..d1dc285 --- /dev/null +++ b/methods/impl/notoken_decompile.py @@ -0,0 +1,9 @@ +from typing import List + +from methods.decompilers import Decompiler +from model.common_models import DecompileResult + +class NotokenDecompiler(Decompiler): + def decompile(self, file_path: str, tags: List[str], sha256: str = "") -> DecompileResult: + # Placeholder implementation + return DecompileResult([], []) diff --git a/methods/impl/pysilon_decompiler.py b/methods/impl/pysilon_decompiler.py new file mode 100644 index 0000000..718fd2c --- /dev/null +++ b/methods/impl/pysilon_decompiler.py @@ -0,0 +1,9 @@ +from typing import List + +from methods.decompilers import Decompiler +from model.common_models import DecompileResult + +class PysilonDecompiler(Decompiler): + def decompile(self, file_path: str, tags: List[str], sha256: str = "") -> DecompileResult: + # Placeholder implementation + return DecompileResult([], []) diff --git a/model/common_models.py b/model/common_models.py new file mode 100644 index 0000000..98d4d2e --- /dev/null +++ b/model/common_models.py @@ -0,0 +1,7 @@ +from dataclasses import dataclass +from typing import List + +@dataclass +class DecompileResult: + invalid_result: List[str] + valid_result: List[str] diff --git a/model/triage_models.py b/model/triage_models.py new file mode 100644 index 0000000..e2725d4 --- /dev/null +++ b/model/triage_models.py @@ -0,0 +1,18 @@ +from dataclasses import dataclass, field +from typing import List, Optional + +@dataclass +class TriageNewSampleData: + file_name: str + sha256: str + analyze_id: str + file_path: Optional[str] = None + family: str = "" + tags: List[str] = field(default_factory=list) + behaviors: List[str] = field(default_factory=list) + +@dataclass +class AnalyzeResult: + families: List[str] + network: List[str] + triage_object: TriageNewSampleData diff --git a/rules/dangerous.yar b/rules/dangerous.yar new file mode 100644 index 0000000..185e031 --- /dev/null +++ b/rules/dangerous.yar @@ -0,0 +1,134 @@ +rule Keylogging { + meta: + description = "Detects keylogging functionality" + author = "Malware Researcher" + severity = "High" + date = "2023-01-01" + strings: + $import1 = "import keyboard" ascii wide + $import2 = "from pynput" ascii wide + $hook1 = "keyboard.hook" ascii wide + $hook2 = "keyboard.Listener" ascii wide + $hook3 = "keyboard.on_press" ascii wide + $hook4 = "KeyboardEvent" ascii wide + $func1 = "on_press" ascii wide + $func2 = "on_release" ascii wide + condition: + 1 of ($import*) and 1 of ($hook*) or 1 of ($func*) +} + +rule ScreenCapture { + meta: + description = "Detects screen capture functionality" + author = "Malware Researcher" + severity = "High" + date = "2023-01-01" + strings: + $import1 = "import pyautogui" ascii wide + $import2 = "from PIL import ImageGrab" ascii wide + $import3 = "import mss" ascii wide + $func1 = "pyautogui.screenshot" ascii wide + $func2 = "ImageGrab.grab" ascii wide + $func3 = "mss().shot" ascii wide + $func4 = ".save(" ascii wide + condition: + 1 of ($import*) and 1 of ($func*) +} + +rule BrowserDataTheft { + meta: + description = "Detects browser data theft functionality" + author = "Malware Researcher" + severity = "High" + date = "2023-01-01" + strings: + $browser1 = "Chrome" nocase ascii wide + $browser2 = "Firefox" nocase ascii wide + $browser3 = "Edge" nocase ascii wide + $browser4 = "Opera" nocase ascii wide + $path1 = "AppData\\Local\\Google\\Chrome\\User Data" ascii wide + $path2 = "AppData\\Roaming\\Mozilla\\Firefox\\Profiles" ascii wide + $path3 = "AppData\\Local\\Microsoft\\Edge\\User Data" ascii wide + $data1 = "Login Data" ascii wide + $data2 = "Cookies" ascii wide + $data3 = "Web Data" ascii wide + $sql1 = "SELECT" ascii wide + $sql2 = "FROM logins" ascii wide + $sql3 = "FROM cookies" ascii wide + $crypto1 = "CryptUnprotectData" ascii wide + $crypto2 = "Crypt.decrypt" ascii wide + condition: + (2 of ($browser*) or 1 of ($path*)) and + (1 of ($data*)) and + (1 of ($sql*) or 1 of ($crypto*)) +} + +rule SystemInformationCollection { + meta: + description = "Detects system information collection functionality" + author = "Malware Researcher" + severity = "Medium" + date = "2023-01-01" + strings: + $import1 = "import platform" ascii wide + $import2 = "import socket" ascii wide + $import3 = "import uuid" ascii wide + $import4 = "import psutil" ascii wide + $func1 = "platform.system" ascii wide + $func2 = "platform.release" ascii wide + $func3 = "socket.gethostname" ascii wide + $func4 = "uuid.getnode" ascii wide + $func5 = "psutil.cpu_count" ascii wide + $func6 = "psutil.disk_usage" ascii wide + $func7 = "os.environ" ascii wide + $func8 = "os.getlogin" ascii wide + condition: + 2 of ($import*) and 2 of ($func*) +} + +rule AntiVMTechniques { + meta: + description = "Detects anti-VM/sandbox evasion techniques" + author = "Malware Researcher" + severity = "High" + date = "2023-01-01" + strings: + $vm1 = "VMware" nocase ascii wide + $vm2 = "VirtualBox" nocase ascii wide + $vm3 = "QEMU" nocase ascii wide + $vm4 = "Xen" nocase ascii wide + $vm5 = "KVM" nocase ascii wide + $vm6 = "Parallels" nocase ascii wide + $vm7 = "Hyper-V" nocase ascii wide + $vm8 = "Virtual Machine" nocase ascii wide + $check1 = "wmic.exe" nocase ascii wide + $check2 = "systeminfo" nocase ascii wide + $check3 = "get_mac" nocase ascii wide + $check4 = "registry" nocase ascii wide + $check5 = "check_vm" nocase ascii wide + $check6 = "is_vm" nocase ascii wide + condition: + 2 of ($vm*) and 1 of ($check*) +} + +rule SelfDestructCode { + meta: + description = "Detects self-destructing code functionality" + author = "Malware Researcher" + severity = "High" + date = "2023-01-01" + strings: + $del1 = "os.remove" ascii wide + $del2 = "os.unlink" ascii wide + $del3 = "shutil.rmtree" ascii wide + $path1 = "__file__" ascii wide + $path2 = "sys.argv[0]" ascii wide + $cmd1 = "cmd.exe /c" ascii wide + $cmd2 = "del " ascii wide + $cmd3 = "rmdir" ascii wide + $bat1 = ".bat" ascii wide + $bat2 = ".cmd" ascii wide + condition: + 1 of ($del*) and (1 of ($path*) or + (1 of ($cmd*) and 1 of ($bat*))) +} diff --git a/rules/infosteal.yar b/rules/infosteal.yar new file mode 100644 index 0000000..b3167fb --- /dev/null +++ b/rules/infosteal.yar @@ -0,0 +1,136 @@ +import "pe" + +rule RayXStealer { + meta: + description = "Detects RayX Stealer malware" + author = "Malware Researcher" + severity = "High" + date = "2023-01-01" + strings: + $s1 = "rayxstealer" nocase ascii wide + $s2 = "ray-x" nocase ascii wide + $token1 = "discordwebhook" nocase ascii wide + $token2 = "getTokens" nocase ascii wide + $browser1 = "Chrome" nocase ascii wide + $browser2 = "Edge" nocase ascii wide + $browser3 = "passwords" nocase ascii wide + $browser4 = "cookies" nocase ascii wide + condition: + ($s1 or $s2) and + 1 of ($token*) and + 2 of ($browser*) +} + +rule PysilonStealer { + meta: + description = "Detects Pysilon Stealer malware" + author = "Malware Researcher" + severity = "High" + date = "2023-01-01" + strings: + $id1 = "pysilon" nocase ascii wide + $id2 = "pysilonstealer" nocase ascii wide + $grab1 = "grab_discord" nocase ascii wide + $grab2 = "grab_passwords" nocase ascii wide + $grab3 = "grab_cookies" nocase ascii wide + $grab4 = "system_info" nocase ascii wide + $net1 = "webhook" nocase ascii wide + $net2 = "sendData" nocase ascii wide + condition: + 1 of ($id*) and + 2 of ($grab*) and + 1 of ($net*) +} + +rule ExelaStealer { + meta: + description = "Detects Exela Stealer malware" + author = "Malware Researcher" + severity = "High" + date = "2023-01-01" + strings: + $id = "exelastealer" nocase ascii wide + $grab1 = "grab_discord" nocase ascii wide + $grab2 = "grab_browsers" nocase ascii wide + $grab3 = "grab_wallets" nocase ascii wide + $net1 = "webhook_url" nocase ascii wide + $net2 = "send_report" nocase ascii wide + condition: + $id or (2 of ($grab*) and 1 of ($net*)) +} + +rule BlankGrabber { + meta: + description = "Detects BlankGrabber/AmnesiaGrabber malware" + author = "Malware Researcher" + severity = "High" + date = "2023-01-01" + strings: + $id1 = "blankgrabber" nocase ascii wide + $id2 = "amnesia" nocase ascii wide + $func1 = "grab_tokens" nocase ascii wide + $func2 = "grab_cookies" nocase ascii wide + $func3 = "grab_passwords" nocase ascii wide + $func4 = "screenshot" nocase ascii wide + $net1 = "webhook" nocase ascii wide + $enc1 = "encrypt" nocase ascii wide + $enc2 = "decrypt" nocase ascii wide + condition: + 1 of ($id*) and + 2 of ($func*) and + ($net1 or 1 of ($enc*)) +} + +rule LunaGrabber { + meta: + description = "Detects Luna Grabber malware" + author = "Malware Researcher" + severity = "High" + date = "2023-01-01" + strings: + $id = "lunagrabber" nocase ascii wide + $grab1 = "grab_tokens" nocase ascii wide + $grab2 = "grab_password" nocase ascii wide + $grab3 = "grab_cookie" nocase ascii wide + $net = "webhook" nocase ascii wide + condition: + $id or (2 of ($grab*) and $net) +} + +rule UmbralStealer { + meta: + description = "Detects Umbral Stealer malware" + author = "Malware Researcher" + severity = "High" + date = "2023-01-01" + strings: + $id = "umbral" nocase ascii wide + $grab1 = "get_tokens" nocase ascii wide + $grab2 = "get_passwords" nocase ascii wide + $grab3 = "get_cookies" nocase ascii wide + $grab4 = "get_wallets" nocase ascii wide + $net1 = "webhook" nocase ascii wide + $net2 = "send_data" nocase ascii wide + condition: + $id or (2 of ($grab*) and 1 of ($net*)) +} + +rule DiscordRatStealer { + meta: + description = "Detects Discord RAT malware" + author = "Malware Researcher" + severity = "High" + date = "2023-01-01" + strings: + $id1 = "discordrat" nocase ascii wide + $id2 = "discord_rat" nocase ascii wide + $cmd1 = "command_handler" nocase ascii wide + $cmd2 = "remote_control" nocase ascii wide + $cmd3 = "remote_cmd" nocase ascii wide + $discord1 = "discord.py" nocase ascii wide + $discord2 = "discord_webhook" nocase ascii wide + $discord3 = "bot.command" nocase ascii wide + condition: + 1 of ($id*) and + (1 of ($cmd*) and 1 of ($discord*)) +} diff --git a/rules/network.yar b/rules/network.yar new file mode 100644 index 0000000..d774086 --- /dev/null +++ b/rules/network.yar @@ -0,0 +1,102 @@ +rule UsesDiscordWebhook { + meta: + description = "Detects Discord webhook usage" + author = "Malware Researcher" + severity = "Medium" + date = "2023-01-01" + strings: + $webhook1 = "https://discord.com/api/webhooks/" ascii wide + $webhook2 = "https://discordapp.com/api/webhooks/" ascii wide + $webhook3 = "https://ptb.discord.com/api/webhooks/" ascii wide + $webhook4 = "https://canary.discord.com/api/webhooks/" ascii wide + $func1 = "requests.post" ascii wide + $func2 = "httpx.post" ascii wide + $func3 = "aiohttp" ascii wide + $func4 = "urllib" ascii wide + condition: + 1 of ($webhook*) and 1 of ($func*) +} + +rule DiscordPyFramework { + meta: + description = "Detects Discord.py bot framework usage" + author = "Malware Researcher" + severity = "Medium" + date = "2023-01-01" + strings: + $import1 = "import discord" ascii wide + $import2 = "from discord.ext import commands" ascii wide + $class1 = "discord.Client" ascii wide + $class2 = "commands.Bot" ascii wide + $func1 = "bot.run" ascii wide + $func2 = "client.run" ascii wide + $token = /['"][A-Za-z\d]{24}\.[\w-]{6}\.[\w-]{27}['"]/ ascii wide + condition: + (1 of ($import*) or 1 of ($class*)) and + (1 of ($func*) or $token) +} + +rule PythonSMTPUsage { + meta: + description = "Detects Python SMTP mail sending functionality" + author = "Malware Researcher" + severity = "Medium" + date = "2023-01-01" + strings: + $import1 = "import smtplib" ascii wide + $import2 = "from email" ascii wide + $func1 = "smtplib.SMTP" ascii wide + $func2 = "smtp.send_message" ascii wide + $func3 = "smtp.sendmail" ascii wide + $auth1 = "smtp.login" ascii wide + $provider1 = "smtp.gmail.com" ascii wide + $provider2 = "smtp-mail.outlook.com" ascii wide + $provider3 = "smtp.mail.yahoo.com" ascii wide + condition: + $import1 and + (1 of ($func*)) and + ($auth1 or 1 of ($provider*)) +} + +rule HTTPRequests { + meta: + description = "Detects HTTP request libraries usage for data exfiltration" + author = "Malware Researcher" + severity = "Low" + date = "2023-01-01" + strings: + $import1 = "import requests" ascii wide + $import2 = "import httpx" ascii wide + $import3 = "import aiohttp" ascii wide + $import4 = "from urllib" ascii wide + $func1 = ".post(" ascii wide + $func2 = ".get(" ascii wide + $func3 = "urlopen(" ascii wide + $data1 = "json=" ascii wide + $data2 = "data=" ascii wide + $data3 = "files=" ascii wide + condition: + 1 of ($import*) and + 1 of ($func*) and + 1 of ($data*) +} + +rule TelegramBotAPI { + meta: + description = "Detects Telegram Bot API usage" + author = "Malware Researcher" + severity = "Medium" + date = "2023-01-01" + strings: + $url1 = "api.telegram.org/bot" ascii wide + $token = /[0-9]{8,10}:[A-Za-z0-9_-]{35}/ ascii wide + $method1 = "/sendMessage" ascii wide + $method2 = "/sendDocument" ascii wide + $method3 = "/sendPhoto" ascii wide + $import1 = "telebot" ascii wide + $import2 = "telegram.ext" ascii wide + condition: + ($url1 and 1 of ($method*)) or + ($token and 1 of ($method*)) or + 1 of ($import*) +} diff --git a/utils/blank.py b/utils/blank.py new file mode 100644 index 0000000..f9e6e8a --- /dev/null +++ b/utils/blank.py @@ -0,0 +1,17 @@ +from base64 import b64decode +from re import findall + +from constants.regex import WEBHOOK_REGEX + +def base64_decode_then_filter(encoded_strings): + results = [] + + for encoded_string in encoded_strings: + try: + decoded = b64decode(encoded_string).decode('utf-8', errors='ignore') + webhooks = findall(WEBHOOK_REGEX, decoded) + results.extend(webhooks) + except: + pass + + return results diff --git a/utils/common_utils.py b/utils/common_utils.py new file mode 100644 index 0000000..7f660e6 --- /dev/null +++ b/utils/common_utils.py @@ -0,0 +1,19 @@ +from httpx import AsyncClient + +class HttpxHelperClient: + def __init__(self): + self.client = AsyncClient(timeout=60.0) + + async def get(self, url, headers=None): + response = await self.client.get(url, headers=headers) + return response + + async def post(self, url, data=None, json=None, headers=None): + response = await self.client.post(url, data=data, json=json, headers=headers) + return response + + async def download(self, url, file_path, headers=None): + response = await self.client.get(url, headers=headers) + with open(file_path, 'wb') as f: + f.write(response.content) + return file_path diff --git a/utils/decompile_utils.py b/utils/decompile_utils.py new file mode 100644 index 0000000..ad5e763 --- /dev/null +++ b/utils/decompile_utils.py @@ -0,0 +1,49 @@ +import os +import shutil +import tempfile +from dataclasses import dataclass +from typing import List, Optional + +@dataclass +class EntryPoint: + entry_point: Optional[str] = None + extraction_path: Optional[str] = None + +def clean_up_temp_files(directory_path: str) -> None: + """Remove temporary files and directories""" + try: + if os.path.exists(directory_path): + if os.path.isfile(directory_path): + os.remove(directory_path) + else: + shutil.rmtree(directory_path, ignore_errors=True) + except Exception as e: + print(f"Failed to clean up {directory_path}: {e}") + +def extract_pyinstaller_exe(exe_path: str) -> str: + """Extract PyInstaller executable contents""" + temp_dir = tempfile.mkdtemp(prefix="pyinstaller_extract_") + + # In a real implementation, you would use pyinstxtractor or similar tools + # Here we just return the temporary directory + return temp_dir + +def find_payload_files(directory: str, extension: str, exclude_pattern: str) -> List[str]: + """Find files with specific extension in directory""" + result = [] + for root, _, files in os.walk(directory): + for file in files: + if file.endswith(extension) and exclude_pattern not in file: + result.append(os.path.join(root, file)) + return result + +def decompile_pyc(pyc_file: str) -> str: + """Decompile .pyc file to source code""" + # In a real implementation, you would use uncompyle6 or decompyle3 + # Here we just return a placeholder + return f"# Decompiled content of {pyc_file}\n" + +def attempts_to_get_entry_point(file_path: str) -> EntryPoint: + """Try various methods to find the entry point""" + # In a real implementation, this would be more complex + return EntryPoint(entry_point=file_path) diff --git a/utils/discord_token_validator.py b/utils/discord_token_validator.py new file mode 100644 index 0000000..5c02f52 --- /dev/null +++ b/utils/discord_token_validator.py @@ -0,0 +1,17 @@ +from dataclasses import dataclass + +@dataclass +class TokenValidationResult: + success: bool + message: str = "" + +def validate_discord_token(token: str, ignore_api_check=False) -> TokenValidationResult: + # Simple validation logic - real implementation would verify with Discord API + if len(token) < 50 or '.' not in token: + return TokenValidationResult(False, "Invalid token format") + + if ignore_api_check: + return TokenValidationResult(True, "Token format valid (API check skipped)") + + # In a real implementation, you would check with Discord API here + return TokenValidationResult(True, "Token appears valid") diff --git a/utils/telegram_token_validator.py b/utils/telegram_token_validator.py new file mode 100644 index 0000000..366e9db --- /dev/null +++ b/utils/telegram_token_validator.py @@ -0,0 +1,6 @@ +import re + +def validate_telegram_token(token: str) -> bool: + # Simple validation - check if it matches the pattern \d{8,10}:[A-Za-z0-9_-]{35} + pattern = r"\d{8,10}:[A-Za-z0-9_-]{35}" + return bool(re.match(pattern, token)) diff --git a/utils/webhook_util.py b/utils/webhook_util.py new file mode 100644 index 0000000..b37d6d8 --- /dev/null +++ b/utils/webhook_util.py @@ -0,0 +1,15 @@ +from typing import List + +from model.common_models import DecompileResult + +def validate_webhooks(webhooks: List[str], tags: List[str], sha256: str = "") -> DecompileResult: + valid_webhooks = [] + invalid_webhooks = [] + + for webhook in webhooks: + if webhook and len(webhook) >= 10: # Simple validation - real implementation would check with Discord API + valid_webhooks.append(webhook) + else: + invalid_webhooks.append(webhook) + + return DecompileResult(invalid_webhooks, valid_webhooks)