Skip to content

Commit

Permalink
Update channel handling
Browse files Browse the repository at this point in the history
  • Loading branch information
parzival418 committed Nov 22, 2024
1 parent e72525e commit ec697d3
Showing 1 changed file with 38 additions and 22 deletions.
60 changes: 38 additions & 22 deletions packages/server/agents/src/lib/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,42 @@ export class Agent
this.initializationPromise = this.initialize()

this.logger.info('New agent created: %s | %s', this.name, this.id)

// Set up global event routing to channels
this.on('messageReceived', (data: ActionPayload) => {
const channelId = (data as any)?.channel
if (channelId && this.channels.has(channelId)) {
this.channels.get(channelId)!.emit('messageReceived', data)
}
})

this.on('message', (data: EventPayload) => {
const channelId = (data as any)?.channel
if (channelId && this.channels.has(channelId)) {
this.channels.get(channelId)!.emit('message', data)
}
})

this.on('messageStream', (data: ActionPayload) => {
const channelId = (data as any)?.channel
if (channelId && this.channels.has(channelId)) {
this.channels.get(channelId)!.emit('messageStream', data)
}
})

this.on('eventComplete', (data: EventPayload | null) => {
const channelId = (data as any)?.channel
if (channelId && this.channels.has(channelId)) {
this.channels.get(channelId)!.emit('eventComplete', data)
}
})

this.on('error', (data: ActionPayload) => {
const channelId = (data as any)?.channel
if (channelId && this.channels.has(channelId)) {
this.channels.get(channelId)!.emit('error', data)
}
})
}

async initialize() {
Expand Down Expand Up @@ -487,37 +523,17 @@ export class Agent
channel(channelId: string): Channel {
if (!this.channels.has(channelId)) {
const channel = new Channel(channelId, this)

// Forward channel events to global agent events
const forwardEvent = <K extends keyof ChannelEvents>(
event: K,
data: Parameters<ChannelEvents[K]>[0]
) => {
super.emit(event, ...([data] as Parameters<AgentEvents[K]>))
}

// Set up event forwarding
channel.on('message', data => forwardEvent('message', data))
channel.on('messageReceived', data =>
forwardEvent('messageReceived', data)
)
channel.on('messageStream', data => forwardEvent('messageStream', data))
channel.on('eventComplete', data => forwardEvent('eventComplete', data))
channel.on('error', data => forwardEvent('error', data))

this.channels.set(channelId, channel)
}
return this.channels.get(channelId)!
}

// Simplified emit - we don't need to forward channel events back to global
// since we're now forwarding global events to channels
emit<K extends keyof AgentEvents>(
event: K,
data: Parameters<AgentEvents[K]>[0]
): boolean {
const channelId = (data as any)?.channel
if (channelId) {
return this.channel(channelId).emit(event, data)
}
return super.emit(event, ...([data] as Parameters<AgentEvents[K]>))
}
}
Expand Down

0 comments on commit ec697d3

Please sign in to comment.