From f0f155ddbdca00f25b4217a200a7e76a342391a9 Mon Sep 17 00:00:00 2001 From: Schahin Rouhanizadeh Date: Tue, 10 Sep 2024 19:01:21 +0200 Subject: [PATCH] Introduce `nativelink-bridge` This is a fast prototype for subscribing to the redis/dragonflydb "build_events" channel and decode them properly via protobuf and fires them via websocket to the browser. It's a plain prototype without any error handling and formatting. --- flake.nix | 3 + nativelink-config/examples/basic_bes.json | 177 ++++++++++++++++++++++ tools/pre-commit-hooks.nix | 1 + web/bridge/.gitignore | 175 +++++++++++++++++++++ web/bridge/README.md | 74 +++++++++ web/bridge/bun.lockb | Bin 0 -> 11334 bytes web/bridge/image.nix | 62 ++++++++ web/bridge/index.ts | 68 +++++++++ web/bridge/package.json | 15 ++ web/bridge/src/eventHandler.ts | 152 +++++++++++++++++++ web/bridge/src/protobuf.ts | 107 +++++++++++++ web/bridge/src/redis.ts | 28 ++++ web/bridge/src/utils.ts | 27 ++++ web/bridge/src/websocket.ts | 46 ++++++ web/bridge/tsconfig.json | 27 ++++ 15 files changed, 962 insertions(+) create mode 100644 nativelink-config/examples/basic_bes.json create mode 100644 web/bridge/.gitignore create mode 100644 web/bridge/README.md create mode 100755 web/bridge/bun.lockb create mode 100644 web/bridge/image.nix create mode 100644 web/bridge/index.ts create mode 100644 web/bridge/package.json create mode 100644 web/bridge/src/eventHandler.ts create mode 100644 web/bridge/src/protobuf.ts create mode 100644 web/bridge/src/redis.ts create mode 100644 web/bridge/src/utils.ts create mode 100644 web/bridge/src/websocket.ts create mode 100644 web/bridge/tsconfig.json diff --git a/flake.nix b/flake.nix index 479379f02..a9c07eac3 100644 --- a/flake.nix +++ b/flake.nix @@ -262,6 +262,8 @@ }; }; + webBridge = pkgs.callPackage ./web/bridge/image.nix {inherit buildImage pullImage pkgs;}; + nativelink-worker-init = pkgs.callPackage ./tools/nativelink-worker-init.nix {inherit buildImage self nativelink-image;}; rbe-autogen = pkgs.callPackage ./local-remote-execution/rbe-autogen.nix { @@ -373,6 +375,7 @@ nativelink-worker-init nativelink-x86_64-linux publish-ghcr + webBridge ; default = nativelink; diff --git a/nativelink-config/examples/basic_bes.json b/nativelink-config/examples/basic_bes.json new file mode 100644 index 000000000..cca412f12 --- /dev/null +++ b/nativelink-config/examples/basic_bes.json @@ -0,0 +1,177 @@ +{ + "stores": { + "AC_MAIN_STORE": { + "filesystem": { + "content_path": "/tmp/nativelink/data-worker-test/content_path-ac", + "temp_path": "/tmp/nativelink/data-worker-test/tmp_path-ac", + "eviction_policy": { + "max_bytes": 100000000000 + } + } + }, + "BEP_STORE": { + "redis_store": { + "addresses": [ + "redis://@localhost:6379/0" + ], + "response_timeout_s": 5, + "connection_timeout_s": 5, + "experimental_pub_sub_channel": "build_event", + "key_prefix": "nativelink.", + "mode": "standard" + } + }, + "WORKER_FAST_SLOW_STORE": { + "fast_slow": { + "fast": { + "filesystem": { + "content_path": "/tmp/nativelink/data-worker-test/content_path-cas", + "temp_path": "/tmp/nativelink/data-worker-test/tmp_path-cas", + "eviction_policy": { + "max_bytes": 100000000000 + } + } + }, + "slow": { + "noop": {} + } + } + } + }, + "schedulers": { + "MAIN_SCHEDULER": { + "simple": { + "supported_platform_properties": { + "cpu_count": "minimum", + "memory_kb": "minimum", + "network_kbps": "minimum", + "disk_read_iops": "minimum", + "disk_read_bps": "minimum", + "disk_write_iops": "minimum", + "disk_write_bps": "minimum", + "shm_size": "minimum", + "gpu_count": "minimum", + "gpu_model": "exact", + "cpu_vendor": "exact", + "cpu_arch": "exact", + "cpu_model": "exact", + "kernel_version": "exact", + "OSFamily": "priority", + "container-image": "priority" + } + } + } + }, + "workers": [ + { + "local": { + "worker_api_endpoint": { + "uri": "grpc://127.0.0.1:50062" + }, + "cas_fast_slow_store": "WORKER_FAST_SLOW_STORE", + "upload_action_result": { + "ac_store": "AC_MAIN_STORE" + }, + "work_directory": "/tmp/nativelink/work", + "platform_properties": { + "cpu_count": { + "values": [ + "16" + ] + }, + "memory_kb": { + "values": [ + "500000" + ] + }, + "network_kbps": { + "values": [ + "100000" + ] + }, + "cpu_arch": { + "values": [ + "x86_64" + ] + }, + "OSFamily": { + "values": [ + "" + ] + }, + "container-image": { + "values": [ + "" + ] + } + } + } + } + ], + "servers": [ + { + "name": "public", + "listener": { + "http": { + "socket_address": "0.0.0.0:50052" + } + }, + "services": { + "cas": { + "main": { + "cas_store": "WORKER_FAST_SLOW_STORE" + } + }, + "ac": { + "main": { + "ac_store": "AC_MAIN_STORE" + } + }, + "execution": { + "main": { + "cas_store": "WORKER_FAST_SLOW_STORE", + "scheduler": "MAIN_SCHEDULER" + } + }, + "capabilities": { + "main": { + "remote_execution": { + "scheduler": "MAIN_SCHEDULER" + } + } + }, + "bytestream": { + "cas_stores": { + "main": "WORKER_FAST_SLOW_STORE" + } + } + } + }, + { + "name": "private_workers_servers", + "listener": { + "http": { + "socket_address": "0.0.0.0:50062" + } + }, + "services": { + "experimental_prometheus": { + "path": "/metrics" + }, + "experimental_bep": { + "store": "BEP_STORE" + }, + "worker_api": { + "scheduler": "MAIN_SCHEDULER" + }, + "admin": {}, + "health": { + "path": "/status" + } + } + } + ], + "global": { + "max_open_files": 512 + } +} diff --git a/tools/pre-commit-hooks.nix b/tools/pre-commit-hooks.nix index b0b8c90e1..8b93bafa7 100644 --- a/tools/pre-commit-hooks.nix +++ b/tools/pre-commit-hooks.nix @@ -65,6 +65,7 @@ in { # Bun binary lockfile "web/platform/bun.lockb" + "web/bridge/bun.lockb" ]; enable = true; types = ["binary"]; diff --git a/web/bridge/.gitignore b/web/bridge/.gitignore new file mode 100644 index 000000000..9b1ee42e8 --- /dev/null +++ b/web/bridge/.gitignore @@ -0,0 +1,175 @@ +# Based on https://raw.githubusercontent.com/github/gitignore/main/Node.gitignore + +# Logs + +logs +_.log +npm-debug.log_ +yarn-debug.log* +yarn-error.log* +lerna-debug.log* +.pnpm-debug.log* + +# Caches + +.cache + +# Diagnostic reports (https://nodejs.org/api/report.html) + +report.[0-9]_.[0-9]_.[0-9]_.[0-9]_.json + +# Runtime data + +pids +_.pid +_.seed +*.pid.lock + +# Directory for instrumented libs generated by jscoverage/JSCover + +lib-cov + +# Coverage directory used by tools like istanbul + +coverage +*.lcov + +# nyc test coverage + +.nyc_output + +# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files) + +.grunt + +# Bower dependency directory (https://bower.io/) + +bower_components + +# node-waf configuration + +.lock-wscript + +# Compiled binary addons (https://nodejs.org/api/addons.html) + +build/Release + +# Dependency directories + +node_modules/ +jspm_packages/ + +# Snowpack dependency directory (https://snowpack.dev/) + +web_modules/ + +# TypeScript cache + +*.tsbuildinfo + +# Optional npm cache directory + +.npm + +# Optional eslint cache + +.eslintcache + +# Optional stylelint cache + +.stylelintcache + +# Microbundle cache + +.rpt2_cache/ +.rts2_cache_cjs/ +.rts2_cache_es/ +.rts2_cache_umd/ + +# Optional REPL history + +.node_repl_history + +# Output of 'npm pack' + +*.tgz + +# Yarn Integrity file + +.yarn-integrity + +# dotenv environment variable files + +.env +.env.development.local +.env.test.local +.env.production.local +.env.local + +# parcel-bundler cache (https://parceljs.org/) + +.parcel-cache + +# Next.js build output + +.next +out + +# Nuxt.js build / generate output + +.nuxt +dist + +# Gatsby files + +# Comment in the public line in if your project uses Gatsby and not Next.js + +# https://nextjs.org/blog/next-9-1#public-directory-support + +# public + +# vuepress build output + +.vuepress/dist + +# vuepress v2.x temp and cache directory + +.temp + +# Docusaurus cache and generated files + +.docusaurus + +# Serverless directories + +.serverless/ + +# FuseBox cache + +.fusebox/ + +# DynamoDB Local files + +.dynamodb/ + +# TernJS port file + +.tern-port + +# Stores VSCode versions used for testing VSCode extensions + +.vscode-test + +# yarn v2 + +.yarn/cache +.yarn/unplugged +.yarn/build-state.yml +.yarn/install-state.gz +.pnp.* + +# IntelliJ based IDEs +.idea + +# Finder (MacOS) folder config +.DS_Store diff --git a/web/bridge/README.md b/web/bridge/README.md new file mode 100644 index 000000000..3a7e740be --- /dev/null +++ b/web/bridge/README.md @@ -0,0 +1,74 @@ +# NativeLink Bridge + +Make sure you are running an instance of Redis or DragonflyDB in your local network. + +For DragonflyDB inside docker run: + +```bash +docker run -d --name some-dragonfly -p 6379:6379 --ulimit memlock=-1 docker.dragonflydb.io/dragonflydb/dragonfly +``` + +For Redis inside docker run: + +```bash +docker run -d --name some-redis -p 6379:6379 redis +``` + +## You need 4 Shells to be open + +### 1. Shell (NativeLink) + +Start an instance of NativeLink and connect it with the basic_bes_conf.json to the redis/dragonflydb (default values): + +```bash +cd ../.. && ./result/bin/nativelink ./nativelink-config/examples/basic_bes.json +``` + +## 2. Shell (NativeLink Web Bridge) + +Install dependencies of the bridges: + +```bash +bun i +``` + +Run the Bridge: + +```bash +bun run index.ts +``` + +## 3. Shell (NativeLink Web App) + +Inside the web/app directory run: + +```bash +bun i & bun dev +``` + +Now you can open http://localhost:4321/app. + + +## 4. Shell (Bazel) + +Now you can run your Bazel build with NativeLink and see it in real-time going into the web app + +Include this in your .bazelrc +```bash +build --remote_instance_name=main +build --remote_cache=grpc://127.0.0.1:50051 +build --remote_executor=grpc://127.0.0.1:50051 +build --bes_backend=grpc://127.0.0.1:50061 +build --bes_results_url=http://127.0.0.1:50061/ +build --bes_upload_mode=fully_async +build --build_event_publish_all_actions=true +``` + +```bash +bazel build some-target +``` + + + + +This project was created using `bun init` in bun v1.1.25. [Bun](https://bun.sh) is a fast all-in-one JavaScript runtime. diff --git a/web/bridge/bun.lockb b/web/bridge/bun.lockb new file mode 100755 index 0000000000000000000000000000000000000000..653336b553e8bbd65d6fb13b437f0f6b1c1131c7 GIT binary patch literal 11334 zcmeHNc|6p6_a7RC$`X}C(e@&YAgs_;#OrGk*8^<9WTFj@Q@uewOz+=W{;mch0B3p=p#@ zWa`fsnDPYC2L4e3Rd_gI!hp~qei)AvBou}FNjTAFs){TY>%NgCchuPvK{2`hT2)bY zibfGGD-&$j?e%)rW&FT!zVi`ds05@hV=U!A@OV0HL@KcSMf|{E9*ZU71@gsEx*DEt zELOZX@UFn){KCTcVhPJsfyL?q>At`LSxGJQ>go0prW!>l? ztLMy?)ys!8$~i#uZ8TpETnqB&1J?u|O7mX8yF+>ka2?=Iz_A_$9PI^NQhjcnSS%Gt zr_vmCqaA}g4p=Tc##vIcy^HbN*=~|7^Lw%VeJ#$MOgUEm5nuk;J$}&Z-SV4eMzu~V4FUVIj63wa?fKSe z%Y~CS%lNTbew%ltntH7sblAaGMQC4CGbtmlbEqKu?DC}h<1>j!rXURzoqiU<(+a@iKsg2dye?r5}K4e%%P+Up6dj7877nA3ied>2= zppom7SixnQjWbJlKkrrWwWMw@`(<6(6qh;Y7C&=fyovELo_)vJUW!Mkq$VCL7&A^j9 zc?6KU74TvY`q6G2gYpu=--nJf1H2LdF%SDu9wPY8pmZ4EaUFwojin|J80rxGB*2fL z_3J`D+93}S{93?c`_b?6`X1#IeDUAJtAfEy{X}kCZ6Nhr06+3?*q;P=M;gy;m%RFm z82JB>|JbnTxG?M|?UGmjSip~G;M>BHa-wGg;L(314c#dhk@|N5@5Ing^4k(Z>gmA5 zvtr=MNkU${2jI#2h5pAmAP*7!8GyG0{iqkZQaT6QgTDuO9KVRgM_&J-4y=n0xDd|{ z()kBx9O)WD)`36c*iLo2KAdf&I5@*dIh;|XZNPmM2|3p9iwU6iaP*O(6!mS6^83Mq z^#{O%>4ESN`rl}0lX??2{69$`Pk|+q%M{Hn{+O-jq_1)J4 z*%niqS3P3){$As|-c*@*W~SNa4=z=Me_q{VYsK?s_eg_=hI-MZ-o;T2xnv%o*=+Gt zhxf(2f1ahp%k@J{`c8CywY@Q0SZ4ombDw*Artt5dblWw~w~{@M`$oI-wQ0u(jh*@Q z>QXgo&B%0rhXRYepBQqP`yiXIPc3Lve&eR5G2*v`6;BSSym|QLcCUzKrxGi6zBxM} zr|L=SBa^q>FKOwQPfYym>#;26@Wr(0RN0MpD{VG8KX+uv#j!+){iu`6iaVm5bp<8G zJxxX*pRjK2=&p6EBL*4<7@i2vcD~a5>(qahSFgGG;KvaHiYI!QqPTy_c9T;_c~j3U zn>~i}iXj*0E+KZJ(UM`WM?8;uGjyp|!;PR|t+L92+u84~EUqelU#Jx8ab`$Y^BL3Y z)1JDeZF=%$LcQDiWig*l>AaqDj$2c#3%zvfw%Y72y?+^9owaD8uhQ%1+RYUPII+lh zfMutJ*oTulDCNzu_1vam!HP8B<1&RCvABPJ{x|tSm+AGRX*pd_bm%F$ywBM>@ZE9u50}6>A9ZyVpXa`q%Jq%n$1yued%VE75&cdAA~+L%psU zR<8~6$WxrD_-W{fbDs5?mYXxvqoRFgS0zwh^HdpfaUCYaPVe4B<=W**6U^>d*&hgV zRf%klFFjRhckxb_Icjy{|#NaS9ZWiw_Blh`+fDEjb9n>*14v{Wc2)`h#|U%yT=|| zlziCL;$%r-P_ehVR?V;Ob6?ci92;KFuou3eTZo-|sqlFzfA;XGqFCb*ibZSOPfX4& z9X>Rf7vi@{JL7>Q`>gd1@gm0@Nr>Sjv-1k6g<{8)O?=A^jvJI>BE5_=7;?$}gTmQ^ zCa$u3zFjg>b@!ymbwBm>h}ZTAv8X+7cXjyVhGK5qXU268s+pV3kc;~lA-2cI57#mtk4PWDcJDm)>Cw5u`Qi}~@vr}J!S0i7 zX|DZGt{OY7!-6hkR&Uo?@^C-b=4hwLkmlhfwZrsuBNi>R#or8we{nw~#J)A6b4XxP zmu~w->|@2Q?@B*xiOLCGojAxmV25I*dVZ%LBgbCXzdaJ-KHz}Jh|DtV!+Sy{;A{jL%0S*F7DTa*r}yk7Zg_Pd*ZKsezIVZ z<(-G7+WRhdYMLDreE0N8zhfH=>K4DuSTxyaVB_J9)2e4>AFTG+Mcwl?^LCDl$)DSy zfgu-uvuGi9*E>zA)1LZXSrAjTz~TMHBK^^la)a$Xj~{ya>8RVRJ03lT)d#!Q6{Lk9 zyRG<6+`nR5=_|qH(&!gOt`jti50&>~$i?pzLhKEmw|z4<#+?XDw#qAQ^ioMoikJ|R z>20evmHX81lg^Rz`}>vd(fIAd0$6+!#%{bhFtm@sY11!Rm2B@(Hi=_*Fyz8-E-l1P zd7-MMNm2d>&Nvh&Xl*=lFnVHmZr$K$zxC#GbQb% znipss(_1@E>6u84+Rl*MqeUu=-;}Z~TeQF2X;gXt{Pyw46PnuVBqJNn&+NK*ci_1z zy$yH18@sx4+u*`}AGYc?`IO8Z^r)oso2fn-xvN(RCOcQU|IU!xlPNdA&?|ZDs%Hi7 ziw$QeSG?^)y^9S>_`O1T(wUJ8cH8uWDoUq17ld)1Opmo)qVDzDt0Lym9l!XA2e$0i z8tCYAY7j#%{N~g`Y=v|^mYVR#M{g?YYK;49KJ%WpY;(B1jpBypp<2!7&F)Srx6tFb z&T@7dqZ+v5R??}f7dxnBPaQNm*KYoxcbOY0hTL8)Qo#@QtGahrOgU|2l04{z(D;E@ ziK*EXt(1x>Z%f&Ue+o(%&dR2r;!-FQ^H!gmU;)%qR%yV2iMXL_(Dne9b9y%0^}*mXMgNqJr;G$`Uao zx=o_LP-009#gd624eXC6@mxg(bBc=vEN^(g`gjr{#u96337^K0STL5rALK^C1D=m3 z(PAtyqlQ9mJUl3k#FJa~tcDWsT0DtJ%cT981>ge|2Sp?bPomT^3>PGkI6aADD=Jt} zwolz*(8cjqTH<|PB17j&QIdtPy*dU zL1=*71R#-e8Ux-TcLqpIy|skgDj-pJX^9zHK<*uoxO^*S8(5$L7Ldq2T>=<#cY(zA zTQT`GhD85cOUOM4a!)`}!O_9o0>>h`DM4-$0EQm%|GRAqxn`6(m$D(ZDadUDu)v10qRh?y1TzFiA=2RfG9E(nuNQ1F zFFcwjqMTzTJdwaJY>Gt07X&*Rn71wnHeW3W5^;c3T>fY69ip@~+IV3G^VA<^*}vA0 zJRuQ_c_NVLQ(}-XIcVH(#jw!fyk2T!byXo&t!Wc)1Rj|gx`eXi=%a0^3f+r3T z@gpQo(pOWsl@lQnN`(GVL36|*5=n%3n5n6V7YvtUL~)cLB7BaR5{iN$3og=_a%7w% zU1caq@LVTZzJHid7*4+jgz&!8b*qPbIuHH`bH~gAuCo+AxMG9H7i3N4mPTf&Zhi()N69ztkz*HspNOCm=-DIRM2KNiKh4 z&jJt`z+h*P4<;QP*m4{sQd7<414@S*0Hs_mLtDo+taOmj;_MhOD>0ep_M z(l@j+tAQA160NX?m9~x1#c&ul7KH$lj+8b>CIgF(11z%kW1$le3rvb_b22gX(tQeW z`mNpCrNad9QVwX?)7#-Q>8L|Aj=py5ZkYujq2;~ZK48uW5X4Nj+ah|xff)LYBRw;< z!y~du3<6{?^0lUAqeL8BV#STd29;u>tx3>QhXzaQ%k)~K40-~`d~@rQSrCK!Z;Y5D nJ)%g*f$X#(J0W~yB1+-XO& { + await handleEvent(message, commandClient, protoTypes); + }); + + const websocketServer = startWebSocket() + + // Clean up on exit + process.on('SIGINT', async () => { + await redisClient.disconnect(); + await commandClient.disconnect(); + process.exit(); + }); +} + +main().catch(err => console.error(err)); diff --git a/web/bridge/package.json b/web/bridge/package.json new file mode 100644 index 000000000..83e075c7e --- /dev/null +++ b/web/bridge/package.json @@ -0,0 +1,15 @@ +{ + "name": "bridge", + "module": "index.ts", + "type": "module", + "devDependencies": { + "@types/bun": "^1.1.8" + }, + "peerDependencies": { + "typescript": "^5.0.0" + }, + "dependencies": { + "protobufjs": "^7.4.0", + "redis": "^4.7.0" + } +} diff --git a/web/bridge/src/eventHandler.ts b/web/bridge/src/eventHandler.ts new file mode 100644 index 000000000..34409f4be --- /dev/null +++ b/web/bridge/src/eventHandler.ts @@ -0,0 +1,152 @@ +import type protobuf from 'protobufjs'; +import { commandOptions, type RedisClientType } from 'redis'; +import { constructRedisKey, parseMessage } from './utils'; +import { broadcastProgress } from './websocket'; + +interface CustomBuildEvent extends protobuf.Message { + orderedBuildEvent?: { + event: { + eventTime: { + seconds: protobuf.Long; + nanos: number; + }; + // biome-ignore lint/suspicious/noExplicitAny: + bazelEvent?: any; + }; + }; +} + + + +// biome-ignore lint/suspicious/noExplicitAny: +export async function handleEvent(message: string, commandClient: any, types: { PublishBuildToolEventStreamRequest: protobuf.Type, PublishLifecycleEventRequest: protobuf.Type }) { +// console.log(`Received message from build_event channel: ${message}`); + + const parsedMessage = parseMessage(message); +// console.log('Parsed Message:', parsedMessage); + + const redisKey = constructRedisKey(parsedMessage); +// console.log('Constructed Redis Key:', redisKey); + + switch (parsedMessage.eventType) { + case 'LifecycleEvent': + // console.log(`Processing ${parsedMessage.eventType} with key ${redisKey}`); + await fetchAndDecodeBuildData(redisKey, commandClient, types.PublishLifecycleEventRequest); + break; + case 'BuildToolEventStream': + // console.log(`Processing ${parsedMessage.eventType} with key ${redisKey}`); + await fetchAndDecodeBuildData(redisKey, commandClient, types.PublishBuildToolEventStreamRequest); + break; + default: + console.log('Unknown event type:', parsedMessage.eventType); + } +} + +// biome-ignore lint/suspicious/noExplicitAny: +async function fetchAndDecodeBuildData(redisKey: string, commandClient: any, messageType: protobuf.Type) { + try { + const buildData = await commandClient.get(commandOptions({ returnBuffers: true }), redisKey); + if (buildData) { + // console.log(`Fetched build data for key ${redisKey}`); + const buffer = Buffer.from(buildData); + const decodedMessage = messageType.decode(buffer) as CustomBuildEvent; + + // Hier wird der `bazelEvent` dekodiert, falls er existiert + if(decodedMessage.orderedBuildEvent) { + + + const buildId = decodedMessage.orderedBuildEvent.streamId.buildId + const invocationId = decodedMessage.orderedBuildEvent.streamId.invocationId + // const sequenceNumber = decodedMessage.orderedBuildEvent + + console.log("Build ID: ", buildId) + console.log("Invocation ID: ", invocationId) + // console.log("Sequence Number: ", sequenceNumber) + + + const eventTime = decodedMessage.orderedBuildEvent.event.eventTime; + + // Convert seconds to milliseconds and add nanoseconds converted to milliseconds + const milliseconds = eventTime.seconds.low * 1000 + Math.floor(eventTime.nanos / 1000000); + + // Create a new Date object using the computed milliseconds + const eventDate = new Date(milliseconds); + + // const date = new Date(decodedMessage.orderedBuildEvent.event.eventTime.seconds*1000); + console.log("Event time nanos:", eventTime.nanos) + console.log("Event time seconds:", eventTime.seconds.low) + console.log("Event time:", eventDate.toISOString()); + const currentTime = new Date() + const elapsedTime = currentTime.getTime() - eventDate.getTime(); + console.log("Time Now: ", currentTime.toISOString()) + console.log(`Elapsed Time: ${elapsedTime} ms`); + } + if (decodedMessage?.orderedBuildEvent?.event?.bazelEvent) { + console.log("------------------") + // console.log("Got here.") + const decodedBazelEvent = decodeBazelEvent(decodedMessage.orderedBuildEvent.event.bazelEvent, messageType.root); + // console.log("Decoded Bazel Event:", decodedBazelEvent); + } else { + // console.log("No Bazel Event found."); + } + + // console.log("Decoded String:", decodedMessage.toJSON()); + } else { + // console.log(`No build data found for key ${redisKey}`); + } + } catch (err) { + // console.error(`Error fetching build data for key ${redisKey}:`, err); + } +} + +// biome-ignore lint/suspicious/noExplicitAny: +function decodeBazelEvent(bazelEvent: any, root: protobuf.Root): any { + if (!bazelEvent || !bazelEvent.value) return null; + + const decodedBinaryData = Buffer.from(bazelEvent.value, 'base64'); + const messageType = root.lookupType(bazelEvent.typeUrl.split('/').pop()); + const decodedMessage = messageType.decode(decodedBinaryData); + // In ein lesbares JSON-Objekt umwandeln + const decodedObject = messageType.toObject(decodedMessage, { + longs: String, + enums: String, + bytes: String, + }); + + // Progress Informationen verarbeiten + if (decodedObject.progress) { + console.log("Processing progress information...\n\n"); + processProgress(decodedObject.progress); + } + + return decodedObject; +} + +// biome-ignore lint/suspicious/noExplicitAny: +function processProgress(progress: any) { +// console.log(progress.stderr) + if (progress.stderr) { + // console.log(progress) + // const cleanStderr = stripAnsi(progress.stderr); + console.log(progress.stderr); + broadcastProgress(progress.stderr) + } + + if (progress.opaqueCount === 1) { + // console.log(`Progress Opaque Count: ${progress.opaqueCount}`); + // console.log(progress.stderr); + } + + if (progress.children) { + // biome-ignore lint/suspicious/noExplicitAny: + progress.children.forEach((child: any, index: number) => { + // console.log(`Child ${index + 1}:`); + if (child.progress && child.progress.opaqueCount ===2 ) { + // console.log(` Child Progress Opaque Count: ${child.progress.opaqueCount}`); + } + // if (child.configuration && child.configuration.id) { + // // console.log(` Child Configuration ID: ${child.configuration.id}`); + // } + }); + } +} diff --git a/web/bridge/src/protobuf.ts b/web/bridge/src/protobuf.ts new file mode 100644 index 000000000..861383ba4 --- /dev/null +++ b/web/bridge/src/protobuf.ts @@ -0,0 +1,107 @@ +import protobuf from 'protobufjs'; + +export async function initializeProtobuf(protos: string[]) { + console.log("Loading Remote Proto Files"); + + // Create a new Root instance + const combinedRoot = new protobuf.Root(); + + // Track loaded files to avoid circular dependencies + const loadedFiles: Record = {}; + + // Track processed imports to avoid duplicates + const processedImports = new Set(); + + // Load all initial proto files + for (const proto of protos) { + await loadProto(loadedFiles, combinedRoot, proto, processedImports); + } + + console.log("\nDone parsing all proto files.\n"); + + // Now combinedRoot contains your parsed .proto content + // Example: Look up specific message types + const BazelBuildEvent = combinedRoot.lookupType("build_event_stream.BuildEvent"); + const PublishBuildToolEventStreamRequest = combinedRoot.lookupType("google.devtools.build.v1.PublishBuildToolEventStreamRequest"); + const PublishLifecycleEventRequest = combinedRoot.lookupType("google.devtools.build.v1.PublishLifecycleEventRequest"); + + console.log("Loaded Types:\n"); + console.log({ + PublishLifecycleEventRequest: PublishLifecycleEventRequest ? PublishLifecycleEventRequest.fullName : "Not found", + PublishBuildToolEventStreamRequest: PublishBuildToolEventStreamRequest ? PublishBuildToolEventStreamRequest.fullName : "Not found", + BazelBuildEvent: BazelBuildEvent ? BazelBuildEvent.fullName : "Not found" + }); + + return { + PublishLifecycleEventRequest, + PublishBuildToolEventStreamRequest, + BazelBuildEvent + }; +} + +function resolveImportPath(protoUrl: string, importPath: string): string { + // Handle googleapis imports + if (importPath.startsWith("google/api") || importPath.startsWith("google/devtools/build/v1")) { + return `https://raw.githubusercontent.com/googleapis/googleapis/master/${importPath}`; + } + + // Handle protocolbuffers imports + if (importPath.startsWith("google/protobuf")) { + return `https://raw.githubusercontent.com/protocolbuffers/protobuf/master/src/${importPath}`; + } + + // Handle specific case for bazel + if (importPath.includes("com/google/devtools/build/lib/packages/metrics") || importPath.startsWith("src/main/protobuf")) { + return `https://raw.githubusercontent.com/bazelbuild/bazel/master/${importPath}`; + } + + // Default behavior for other imports - resolve relative to protoUrl + return new URL(importPath, protoUrl).toString(); +} + +// Recursive function to fetch, parse, and handle imports +async function loadProto( + loadedFiles: Record, + root: protobuf.Root, + protoUrl: string, + processedImports: Set, + indentLevel = 0, +) { + if (loadedFiles[protoUrl]) { + // If already loaded, skip to prevent circular imports + return; + } + + const indent = ' '.repeat(indentLevel); + + // Fetch the .proto file content + const response = await fetch(protoUrl); + if (!response.ok) { + throw new Error(`Failed to fetch .proto file from ${protoUrl}: ${response.statusText}`); + } + + const protoContent = await response.text(); + + // Parse the proto content + const parsedProto = protobuf.parse(protoContent, root); + + // Mark this proto as loaded + loadedFiles[protoUrl] = true; + + // Log the imports necessary for this proto file + if (indentLevel < 1) { + console.log(`\n${indent} ${protoUrl}:`); + } + + if (parsedProto.imports && parsedProto.imports.length > 0) { + for (const importPath of parsedProto.imports) { + const resolvedImportUrl = resolveImportPath(protoUrl, importPath); + if (!processedImports.has(resolvedImportUrl)) { + console.log(`${indent} - ${importPath}`); + processedImports.add(resolvedImportUrl); + // Recursively handle the imports + await loadProto(loadedFiles, root, resolvedImportUrl, processedImports, indentLevel + 1,); + } + } + } +} diff --git a/web/bridge/src/redis.ts b/web/bridge/src/redis.ts new file mode 100644 index 000000000..fe571ab81 --- /dev/null +++ b/web/bridge/src/redis.ts @@ -0,0 +1,28 @@ +import { createClient } from 'redis'; + +export async function initializeRedisClients() { + try { + const redisClient = createClient({ + socket: { + host: "172.17.0.2", + port: 6379 + } + }); + const commandClient = redisClient.duplicate(); + + redisClient.on('error', (err) => { + console.error('Redis Client Error:', err); + throw new Error('Failed to connect to Redis.'); + }); + + await redisClient.connect(); + await commandClient.connect(); + + console.log('\nRedis clients successfully connected.\n'); + + return { redisClient, commandClient }; + } catch (error) { + console.error('Error during Redis client initialization:', error); + throw new Error('Unable to initialize Redis clients. Please check your connection.'); + } +} diff --git a/web/bridge/src/utils.ts b/web/bridge/src/utils.ts new file mode 100644 index 000000000..c312e3184 --- /dev/null +++ b/web/bridge/src/utils.ts @@ -0,0 +1,27 @@ +export function parseMessage(message: string) { + const parts = message.split(':'); + + const eventType = parts[0].replace('nativelink:', ''); + const eventID = parts.slice(1, 6).join(':'); + const subEventID = parts.slice(6, 11).join(':'); + const sequenceNumber = parts[11]; + + return { + eventType, + eventID, + subEventID, + sequenceNumber + }; +} + +// biome-ignore lint/suspicious/noExplicitAny: +export function constructRedisKey(parsedMessage: any) { + console.log("\nNew Published Event: ") + console.log(" EventID: ", parsedMessage.eventID) + console.log(" Sequence Number: ", parsedMessage.sequenceNumber) + console.log(" Invocation ID: ", parsedMessage.subEventID) + console.log("------------------") + + // console.log( `nativelink:${parsedMessage.eventType}:${parsedMessage.eventID}:${parsedMessage.subEventID}:${parsedMessage.sequenceNumber}`) + return `nativelink:${parsedMessage.eventType}:${parsedMessage.eventID}:${parsedMessage.subEventID}:${parsedMessage.sequenceNumber}`; +} diff --git a/web/bridge/src/websocket.ts b/web/bridge/src/websocket.ts new file mode 100644 index 000000000..f0b29c01e --- /dev/null +++ b/web/bridge/src/websocket.ts @@ -0,0 +1,46 @@ +import type { ServerWebSocket } from "bun"; + +const clients = new Set>(); + +export const startWebSocket = () => { + console.log('\nWebSocket server is running on ws://localhost:8080\n'); + Bun.serve({ + port: 8080, + fetch(req, server) { + // Upgrade the request to a WebSocket + // Here we can also do the websocket auth/token auth + if (server.upgrade(req)) { + return; + } + return new Response("Upgrade failed", { status: 500 }); + }, + websocket: { + open(ws) { + console.log('New client connected'); + clients.add(ws); + ws.send("Hello Web Client") + }, + message(ws, message) { + console.log('Received message from web client:', message); + }, + close(ws) { + console.log('Web Client disconnected'); + clients.delete(ws); + }, + drain(ws) { + console.log('Ready to receive more data'); + }, + }, +});} + +export function broadcastProgress(progress: string) { + // Convert the string to a Uint8Array + // const buffer = new TextEncoder().encode(progress); + // console.log(progress) + // console.log("----------------------------------------------") + const buffer = Buffer.from(progress) + + for (const ws of clients) { + ws.send(new Uint8Array(buffer)); // Send the ArrayBufferView + } +} diff --git a/web/bridge/tsconfig.json b/web/bridge/tsconfig.json new file mode 100644 index 000000000..238655f2c --- /dev/null +++ b/web/bridge/tsconfig.json @@ -0,0 +1,27 @@ +{ + "compilerOptions": { + // Enable latest features + "lib": ["ESNext", "DOM"], + "target": "ESNext", + "module": "ESNext", + "moduleDetection": "force", + "jsx": "react-jsx", + "allowJs": true, + + // Bundler mode + "moduleResolution": "bundler", + "allowImportingTsExtensions": true, + "verbatimModuleSyntax": true, + "noEmit": true, + + // Best practices + "strict": true, + "skipLibCheck": true, + "noFallthroughCasesInSwitch": true, + + // Some stricter flags (disabled by default) + "noUnusedLocals": false, + "noUnusedParameters": false, + "noPropertyAccessFromIndexSignature": false + } +}