From 0b1944211c34a2237b060cbdf3e10feb96adf882 Mon Sep 17 00:00:00 2001 From: igalshilman Date: Mon, 18 Sep 2023 11:59:32 +0200 Subject: [PATCH] Support KeyedEventHandler This commit adds the support for the Event API for the keyed dynamic handlers. Since the dynamic handler API requires a string key, this means that the registered gRPC methods are of the form rpc Handle(StringKeyedEvent) returns (google.protobuf.Empty) {}; --- buf.lock | 4 +- examples/handler_example.ts | 56 ++++++++++++ package.json | 3 +- proto/dynrpc.proto | 3 + src/public_api.ts | 3 + src/server/base_restate_server.ts | 144 +++++++++++++++++++++++++----- src/types/router.ts | 36 +++++++- src/types/types.ts | 17 ++++ 8 files changed, 236 insertions(+), 30 deletions(-) create mode 100644 examples/handler_example.ts diff --git a/buf.lock b/buf.lock index 8e061003..dae0d5f8 100644 --- a/buf.lock +++ b/buf.lock @@ -4,5 +4,5 @@ deps: - remote: buf.build owner: restatedev repository: proto - commit: 7c1d0063691147dab51a87fc7d2befa5 - digest: shake256:41fb2128bd34a84f363d2b0cafbf0746b82d2b18c400cf85a1b3a94b28a2f4643fc17c9cf523cf98720b227b8d75b24fdc03fb87f8fa9a7ef75f788a5cbfe0c4 + commit: 4c536701ef5348ecbf3cd1ef6cf825fc + digest: shake256:0fdebe27d9653dc31f9951623e0c8dc68a161d2b55146cc22c0501d2bccb22d49ab9b2b80c8f4de8827c4ba168119296f14d323eed5162ef63f128803cc64f47 diff --git a/examples/handler_example.ts b/examples/handler_example.ts new file mode 100644 index 00000000..b0bc9abf --- /dev/null +++ b/examples/handler_example.ts @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2023 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate SDK for Node.js/TypeScript, + * which is released under the MIT license. + * + * You can find a copy of the license in file LICENSE in the root + * directory of this repository or package, or at + * https://github.com/restatedev/sdk-typescript/blob/main/LICENSE + */ + +/* eslint-disable no-console */ + +/* + * A simple example program using the Restate's event handlers. + */ + +import * as restate from "../src/public_api"; + +const registration = async (ctx: restate.RpcContext, event: restate.Event) => { + // store in state the user's information as coming from the registeration event + const { name } = event.json<{ name: string }>(); + ctx.set("name", name); +}; + +const email = async (ctx: restate.RpcContext, event: restate.Event) => { + // store in state the user's information as coming from the email event + const { email } = event.json<{ email: string }>(); + ctx.set("email", email); +}; + +type UserProfile = { + id: string; + name: string; + email: string; +}; + +const get = async ( + ctx: restate.RpcContext, + id: string +): Promise => { + return { + id, + name: (await ctx.get("name")) ?? "", + email: (await ctx.get("email")) ?? "", + }; +}; + +const profile = restate.keyedRouter({ + registration: restate.keyedEventHandler(registration), + email: restate.keyedEventHandler(email), + get, +}); + +// restate server +restate.createServer().bindKeyedRouter("profile", profile).listen(8080); diff --git a/package.json b/package.json index 09133ac5..e909e39c 100644 --- a/package.json +++ b/package.json @@ -38,7 +38,8 @@ "verify": "npm run format -- --check && npm run test && npm run lint && npm run build", "release": "release-it", "example": "RESTATE_DEBUG_LOGGING=JOURNAL ts-node-dev --respawn --transpile-only ./examples/example.ts", - "grpcexample": "RESTATE_DEBUG_LOGGING=JOURNAL ts-node-dev --respawn --transpile-only ./examples/grpc_example.ts" + "grpcexample": "RESTATE_DEBUG_LOGGING=JOURNAL ts-node-dev --respawn --transpile-only ./examples/grpc_example.ts", + "handlerexample": "RESTATE_DEBUG_LOGGING=JOURNAL ts-node-dev --respawn --transpile-only ./examples/handler_example.ts" }, "files": [ "dist" diff --git a/proto/dynrpc.proto b/proto/dynrpc.proto index e695e8a5..57782dec 100644 --- a/proto/dynrpc.proto +++ b/proto/dynrpc.proto @@ -12,13 +12,16 @@ syntax = "proto3"; import "dev/restate/ext.proto"; +import "dev/restate/events.proto"; import "google/protobuf/struct.proto"; +import "google/protobuf/empty.proto"; service RpcEndpoint { option (dev.restate.ext.service_type) = KEYED; rpc call(RpcRequest) returns (RpcResponse) {}; + rpc handle(dev.restate.StringKeyedEvent) returns (google.protobuf.Empty) {}; } service UnkeyedRpcEndpoint { diff --git a/src/public_api.ts b/src/public_api.ts index 378cf460..c2466f91 100644 --- a/src/public_api.ts +++ b/src/public_api.ts @@ -19,8 +19,10 @@ export { export { router, keyedRouter, + keyedEventHandler, UnKeyedRouter, KeyedRouter, + KeyedEventHandler, Client, SendClient, } from "./types/router"; @@ -32,3 +34,4 @@ export { } from "./server/restate_lambda_handler"; export * as RestateUtils from "./utils/public_utils"; export { ErrorCodes, RestateError, TerminalError } from "./types/errors"; +export { Event } from "./types/types"; diff --git a/src/server/base_restate_server.ts b/src/server/base_restate_server.ts index 3109bc31..9f7f8fe0 100644 --- a/src/server/base_restate_server.ts +++ b/src/server/base_restate_server.ts @@ -22,10 +22,13 @@ import { ProtocolMode, ServiceDiscoveryResponse, } from "../generated/proto/discovery"; +import { Event } from "../types/types"; +import { StringKeyedEvent } from "../generated/dev/restate/events"; import { FileDescriptorProto, UninterpretedOption, } from "../generated/google/protobuf/descriptor"; +import { Empty } from "../generated/google/protobuf/empty"; import { FileDescriptorProto as FileDescriptorProto1, ServiceDescriptorProto as ServiceDescriptorProto1, @@ -45,6 +48,7 @@ import { RestateContext, useContext } from "../restate_context"; import { RpcContextImpl } from "../restate_context_impl"; import { verifyAssumptions } from "../utils/assumpsions"; import { TerminalError } from "../public_api"; +import { isEventHandler } from "../types/router"; export interface ServiceOpts { descriptor: ProtoMetadata; @@ -148,6 +152,78 @@ export abstract class BaseRestateServer { } } + rpcHandler( + keyed: boolean, + route: string, + handler: Function + ): { + descriptor: MethodDescriptorProto1; + method: GrpcServiceMethod; + } { + const descriptor = createRpcMethodDescriptor(route); + + const localMethod = (instance: unknown, input: RpcRequest) => { + const ctx = useContext(instance); + if (keyed) { + return dispatchKeyedRpcHandler(ctx, input, handler); + } else { + return dispatchUnkeyedRpcHandler(ctx, input, handler); + } + }; + + const decoder = RpcRequest.decode; + const encoder = (message: RpcResponse) => + RpcResponse.encode(message).finish(); + + const method = new GrpcServiceMethod( + route, + route, + localMethod, + decoder, + encoder + ); + + return { + descriptor: descriptor, + method: method as GrpcServiceMethod, + }; + } + + stringKeyedEventHandler( + keyed: boolean, + route: string, + handler: Function + ): { + descriptor: MethodDescriptorProto1; + method: GrpcServiceMethod; + } { + if (!keyed) { + // TODO: support unkeyed rpc event handler + throw new TerminalError("Unkeyed Event handlers are not yet supported."); + } + const descriptor = createStringKeyedMethodDescriptor(route); + const localMethod = (instance: unknown, input: StringKeyedEvent) => { + const ctx = useContext(instance); + return dispatchKeyedEventHandler(ctx, input, handler); + }; + + const decoder = StringKeyedEvent.decode; + const encoder = (message: Empty) => Empty.encode(message).finish(); + + const method = new GrpcServiceMethod( + route, + route, + localMethod, + decoder, + encoder + ); + + return { + descriptor, + method: method as GrpcServiceMethod, + }; + } + protected bindRpcService(name: string, router: RpcRouter, keyed: boolean) { const lastDot = name.indexOf("."); const serviceName = lastDot === -1 ? name : name.substring(lastDot + 1); @@ -161,40 +237,33 @@ export abstract class BaseRestateServer { ? pushKeyedService(desc, name) : pushUnKeyedService(desc, name); - const decoder = RpcRequest.decode; - const encoder = (message: RpcResponse) => - RpcResponse.encode(message).finish(); - for (const [route, handler] of Object.entries(router)) { - serviceGrpcSpec.method.push(createRpcMethodDescriptor(route)); - - const localFn = (instance: unknown, input: RpcRequest) => { - const ctx = useContext(instance); - if (keyed) { - return dispatchKeyedRpcHandler(ctx, input, handler); - } else { - return dispatchUnkeyedRpcHandler(ctx, input, handler); - } + let registration: { + descriptor: MethodDescriptorProto1; + method: GrpcServiceMethod; }; - const method = new GrpcServiceMethod( - route, - route, - localFn, - decoder, - encoder - ); - + if (isEventHandler(handler)) { + const theHandler = handler.handler; + registration = this.stringKeyedEventHandler(keyed, route, theHandler); + } else { + registration = this.rpcHandler(keyed, route, handler); + } + serviceGrpcSpec.method.push(registration.descriptor); const url = `/invoke/${name}/${route}`; this.methods[url] = new HostedGrpcServiceMethod( {}, // we don't actually execute on any class instance servicePackage, serviceName, - method + registration.method ) as HostedGrpcServiceMethod; rlog.info( - `Registering: ${url} -> ${JSON.stringify(method, null, "\t")}` + `Registering: ${url} -> ${JSON.stringify( + registration.method, + null, + "\t" + )}` ); } @@ -376,6 +445,25 @@ async function dispatchUnkeyedRpcHandler( return RpcResponse.create({ response: result }); } +async function dispatchKeyedEventHandler( + origCtx: RestateContext, + req: StringKeyedEvent, + handler: Function +): Promise { + const ctx = new RpcContextImpl(origCtx); + const key = req.key; + if (typeof key !== "string" || key.length === 0) { + // we throw a terminal error here, because this cannot be patched by updating code: + // if the request is wrong (missing a key), the request can never make it + throw new TerminalError( + "Keyed handlers must recieve a non null or empty string key" + ); + } + const jsEvent = new Event(key, req.payload, req.source, req.attributes); + await handler(ctx, jsEvent); + return Empty.create({}); +} + function copyProtoMetadata( original: RpcServiceProtoMetadata ): RpcServiceProtoMetadata { @@ -458,4 +546,14 @@ function createRpcMethodDescriptor(methodName: string): MethodDescriptorProto1 { return desc; } +function createStringKeyedMethodDescriptor( + methodName: string +): MethodDescriptorProto1 { + const desc = { + ...rpcServiceProtoMetadata.fileDescriptor.service[0].method[1], + } as MethodDescriptorProto1; + desc.name = methodName; + return desc; +} + const dynrpcDescriptor = copyProtoMetadata(rpcServiceProtoMetadata); diff --git a/src/types/router.ts b/src/types/router.ts index 20bbc066..f1146847 100644 --- a/src/types/router.ts +++ b/src/types/router.ts @@ -12,6 +12,7 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import { RpcContext } from "../restate_context"; +import { Event } from "../types/types"; // ----------- generics ------------------------------------------------------- @@ -25,11 +26,13 @@ type WithoutRpcContext = F extends ( : never; export type Client = { - [K in keyof M]: M[K]; + [K in keyof M as M[K] extends never ? never : K]: M[K]; }; export type SendClient = { - [K in keyof M]: M[K] extends (...args: infer P) => any + [K in keyof M as M[K] extends never ? never : K]: M[K] extends ( + ...args: infer P + ) => any ? (...args: P) => void : never; }; @@ -68,11 +71,15 @@ export type KeyedHandler = F extends (ctx: RpcContext) => Promise : never; export type KeyedRouterOpts = { - [K in keyof U]: U[K] extends KeyedHandler ? U[K] : never; + [K in keyof U]: U[K] extends KeyedHandler | KeyedEventHandler + ? U[K] + : never; }; export type KeyedRouter = { - [K in keyof U]: U[K] extends KeyedHandler + [K in keyof U]: U[K] extends KeyedEventHandler + ? never + : U[K] extends KeyedHandler ? WithKeyArgument> : never; }; @@ -83,3 +90,24 @@ export const keyedRouter = (opts: KeyedRouterOpts): KeyedRouter => { } return opts as KeyedRouter; }; + +// ----------- event handlers ---------------------------------------------- + +export type KeyedEventHandler = U extends ( + ctx: RpcContext, + event: Event +) => Promise + ? U + : never; + +export const keyedEventHandler = (handler: KeyedEventHandler): H => { + return { eventHandler: true, handler: handler } as H; +}; + +export const isEventHandler = ( + handler: any +): handler is { + handler: (ctx: RpcContext, event: Event) => Promise; +} => { + return typeof handler === "object" && handler["eventHandler"]; +}; diff --git a/src/types/types.ts b/src/types/types.ts index 29a46d6d..f1473910 100644 --- a/src/types/types.ts +++ b/src/types/types.ts @@ -120,3 +120,20 @@ export class Header { return res; } } + +export class Event { + constructor( + readonly key: string, + readonly payload: Buffer, + readonly source: string, + readonly attributes: Record + ) {} + + public json(): T { + return JSON.parse(this.payload.toString("utf-8")) as T; + } + + public body(): Uint8Array { + return this.payload; + } +}