forked from mamoe/mirai
-
Notifications
You must be signed in to change notification settings - Fork 0
/
EventChannel.kt
546 lines (503 loc) · 22.8 KB
/
EventChannel.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
/*
* Copyright 2019-2023 Mamoe Technologies and contributors.
*
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
* Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
*
* https://github.com/mamoe/mirai/blob/dev/LICENSE
*/
@file:JvmMultifileClass
@file:JvmName("EventChannelKt")
package net.mamoe.mirai.event
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.sync.Mutex
import net.mamoe.mirai.Bot
import net.mamoe.mirai.IMirai
import net.mamoe.mirai.event.ConcurrencyKind.CONCURRENT
import net.mamoe.mirai.event.ConcurrencyKind.LOCKED
import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.utils.MiraiInternalApi
import net.mamoe.mirai.utils.NotStableForInheritance
import net.mamoe.mirai.utils.context
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.jvm.JvmMultifileClass
import kotlin.jvm.JvmName
import kotlin.jvm.JvmSynthetic
import kotlin.reflect.KClass
/**
* 事件通道.
*
* 事件通道是监听事件的入口, 但不负责广播事件. 要广播事件, 使用 [Event.broadcast] 或 [IMirai.broadcastEvent].
*
* ## 获取事件通道
*
* [EventChannel] 不可自行构造, 只能通过 [GlobalEventChannel], [BotEvent], 或基于一个通道的过滤等操作获得.
*
* ### 全局事件通道
*
* [GlobalEventChannel] 是单例对象, 表示全局事件通道, 可以获取到在其中广播的所有事件.
*
* ### [BotEvent] 事件通道
*
* 若只需要监听某个 [Bot] 的事件, 可通过 [Bot.eventChannel] 获取到这样的 [EventChannel].
*
* ## 通道操作
*
* ### 对通道的操作
* - 过滤通道: 通过 [EventChannel.filter]. 例如 `filter { it is BotEvent }` 得到一个只能监听到 [BotEvent] 的事件通道.
* - 转换为 Kotlin 协程 [Channel]: [EventChannel.forwardToChannel]
* - 添加 [CoroutineContext]: [context], [parentJob], [parentScope], [exceptionHandler]
*
* ### 创建事件监听
* - [EventChannel.subscribe] 创建带条件的一个事件监听器.
* - [EventChannel.subscribeAlways] 创建一个总是监听事件的事件监听器.
* - [EventChannel.subscribeOnce] 创建一个只监听单次的事件监听器.
*
* ### 监听器生命周期
*
* 阅读 [EventChannel.subscribe] 以获取监听器生命周期相关信息.
*
* ## 与 kotlinx-coroutines 交互
*
* mirai [EventChannel] 设计比 kotlinx-coroutines 的 [Flow] 稳定版更早.
* [EventChannel] 的功能与 [Flow] 类似, 不过 [EventChannel] 在 [subscribe] (类似 [Flow.collect]) 时有优先级判定, 也允许[拦截][Event.intercept].
*
* ### 通过 [Flow] 接收事件
*
* 使用 [EventChannel.asFlow] 获得 [Flow], 然后可使用 [Flow.collect] 等操作.
*
* ### 转发事件到 [SendChannel]
*
* 使用 [EventChannel.forwardToChannel] 可将事件转发到指定 [SendChannel].
*/
@NotStableForInheritance // since 2.12, before it was `final class`.
public expect abstract class EventChannel<out BaseEvent : Event> @MiraiInternalApi public constructor(
baseEventClass: KClass<out BaseEvent>,
defaultCoroutineContext: CoroutineContext,
) {
/**
* 此事件通道的默认 [CoroutineScope.coroutineContext]. 将会被添加给所有注册的事件监听器.
*/
public val defaultCoroutineContext: CoroutineContext
public val baseEventClass: KClass<out BaseEvent>
/**
* 创建事件监听并将监听结果转发到 [channel]. 当 [Channel.send] 抛出 [ClosedSendChannelException] 时停止 [Listener] 监听和转发.
*
* 返回创建的会转发监听到的所有事件到 [channel] 的[事件监听器][Listener]. [停止][Listener.complete] 该监听器会停止转发, 不会影响目标 [channel].
*
* 若 [Channel.send] 挂起, 则监听器也会挂起, 也就可能会导致事件广播过程挂起.
*
* 示例:
*
* ```
* val eventChannel: EventChannel<BotEvent> = ...
* val channel = Channel<BotEvent>() // kotlinx.coroutines.channels.Channel
* eventChannel.forwardToChannel(channel, priority = ...)
*
* // 其他地方
* val event: BotEvent = channel.receive() // 挂起并接收一个事件
* ```
*
* @see subscribeAlways
* @see Channel
* @since 2.10
*/
public fun forwardToChannel(
channel: SendChannel<@UnsafeVariance BaseEvent>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
priority: EventPriority = EventPriority.MONITOR,
): Listener<@UnsafeVariance BaseEvent>
/**
* 通过 [Flow] 接收此通道内的所有事件.
*
* ```
* val eventChannel: EventChannel<BotEvent> = ...
* val flow: Flow<BotEvent> = eventChannel.asFlow()
*
* flow.collect { // it
* //
* }
*
* flow.filterIsInstance<GroupMessageEvent>.collect { // it: GroupMessageEvent
* // 处理事件 ...
* }
*
* flow.filterIsInstance<FriendMessageEvent>.collect { // it: FriendMessageEvent
* // 处理事件 ...
* }
* ```
*
* 类似于 [SharedFlow], [EventChannel.asFlow] 返回的 [Flow] 永远都不会停止. 因此上述示例 [Flow.collect] 永远都不会正常 (以抛出异常之外的) 结束.
*
* 通过 [asFlow] 接收事件相当于通过 [subscribeAlways] 以 [EventPriority.MONITOR] 监听事件.
*
* **注意**: [context], [parentJob] 等控制 [EventChannel.defaultCoroutineContext] 的操作对 [asFlow] 无效. 因为 [asFlow] 并不创建协程.
*
* @see Flow
* @since 2.12
*/
public abstract fun asFlow(): Flow<BaseEvent>
// region transforming operations
/**
* 添加一个过滤器. 过滤器将在收到任何事件之后, 传递给通过 [EventChannel.subscribe] 注册的监听器之前调用.
*
* 若 [filter] 返回 `true`, 该事件将会被传给监听器. 否则将会被忽略, **监听器继续监听**.
*
* ## 线性顺序
* 多个 [filter] 的处理是线性且有顺序的. 若一个 [filter] 已经返回了 `false` (代表忽略这个事件), 则会立即忽略, 而不会传递给后续过滤器.
*
* 示例:
* ```
* GlobalEventChannel // GlobalEventChannel 会收到全局所有事件, 事件类型是 Event
* .filterIsInstance<BotEvent>() // 过滤, 只接受 BotEvent
* .filter { event: BotEvent ->
* // 此时的 event 一定是 BotEvent
* event.bot.id == 123456 // 再过滤 event 的 bot.id
* }
* .subscribeAlways { event: BotEvent ->
* // 现在 event 是 BotEvent, 且 bot.id == 123456
* }
* ```
*
* ## 过滤器挂起
* [filter] 允许挂起协程. **过滤器的挂起将被认为是事件监听器的挂起**.
*
* 过滤器挂起是否会影响事件处理,
* 取决于 [subscribe] 时的 [ConcurrencyKind] 和 [EventPriority].
*
* ## 过滤器异常处理
* 若 [filter] 抛出异常, 将被包装为 [ExceptionInEventChannelFilterException] 并重新抛出.
*
* @see filterIsInstance 过滤指定类型的事件
*/
@JvmSynthetic
public fun filter(filter: suspend (event: BaseEvent) -> Boolean): EventChannel<BaseEvent>
/**
* [EventChannel.filter] 的 Java 版本.
*
* 添加一个过滤器. 过滤器将在收到任何事件之后, 传递给通过 [EventChannel.subscribe] 注册的监听器之前调用.
*
* 若 [filter] 返回 `true`, 该事件将会被传给监听器. 否则将会被忽略, **监听器继续监听**.
*
* ## 线性顺序
* 多个 [filter] 的处理是线性且有顺序的. 若一个 [filter] 已经返回了 `false` (代表忽略这个事件), 则会立即忽略, 而不会传递给后续过滤器.
*
* 示例:
* ```
* GlobalEventChannel // GlobalEventChannel 会收到全局所有事件, 事件类型是 Event
* .filterIsInstance(BotEvent.class) // 过滤, 只接受 BotEvent
* .filter(event ->
* // 此时的 event 一定是 BotEvent
* event.bot.id == 123456 // 再过滤 event 的 bot.id
* )
* .subscribeAlways(event -> {
* // 现在 event 是 BotEvent, 且 bot.id == 123456
* })
* ```
*
* ## 过滤器阻塞
* [filter] 允许阻塞线程. **过滤器的阻塞将被认为是事件监听器的阻塞**.
*
* 过滤器阻塞是否会影响事件处理,
* 取决于 [subscribe] 时的 [ConcurrencyKind] 和 [EventPriority].
*
* ## 过滤器异常处理
* 若 [filter] 抛出异常, 将被包装为 [ExceptionInEventChannelFilterException] 并重新抛出.
*
* @see filterIsInstance 过滤指定类型的事件
*
* @since 2.2
*/
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
@kotlin.internal.LowPriorityInOverloadResolution
public fun filter(filter: (event: BaseEvent) -> Boolean): EventChannel<BaseEvent>
/**
* 过滤事件的类型. 返回一个只包含 [E] 类型事件的 [EventChannel]
* @see filter 获取更多信息
*/
@JvmSynthetic
public inline fun <reified E : Event> filterIsInstance(): EventChannel<E>
/**
* 过滤事件的类型. 返回一个只包含 [E] 类型事件的 [EventChannel]
* @see filter 获取更多信息
*/
public fun <E : Event> filterIsInstance(kClass: KClass<out E>): EventChannel<E>
/**
* 创建一个新的 [EventChannel], 该 [EventChannel] 包含 [`this.coroutineContext`][defaultCoroutineContext] 和添加的 [coroutineContexts].
* [coroutineContexts] 会覆盖 [defaultCoroutineContext] 中的重复元素.
*
* 此操作不会修改 [`this.coroutineContext`][defaultCoroutineContext], 只会创建一个新的 [EventChannel].
*/
public abstract fun context(vararg coroutineContexts: CoroutineContext): EventChannel<BaseEvent>
/**
* 创建一个新的 [EventChannel], 该 [EventChannel] 包含 [this.coroutineContext][defaultCoroutineContext] 和添加的 [coroutineExceptionHandler]
* @see context
*/
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
@kotlin.internal.LowPriorityInOverloadResolution
public fun exceptionHandler(coroutineExceptionHandler: CoroutineExceptionHandler): EventChannel<BaseEvent>
/**
* 创建一个新的 [EventChannel], 该 [EventChannel] 包含 [`this.coroutineContext`][defaultCoroutineContext] 和添加的 [coroutineExceptionHandler]
* @see context
*/
public fun exceptionHandler(coroutineExceptionHandler: (exception: Throwable) -> Unit): EventChannel<BaseEvent>
/**
* 将 [coroutineScope] 作为这个 [EventChannel] 的父作用域.
*
* 实际作用为创建一个新的 [EventChannel],
* 该 [EventChannel] 包含 [`this.coroutineContext`][defaultCoroutineContext] 和添加的 [CoroutineScope.coroutineContext],
* 并以 [CoroutineScope] 中 [Job] (如果有) [作为父 Job][parentJob]
*
* @see parentJob
* @see context
*
* @see CoroutineScope.globalEventChannel `GlobalEventChannel.parentScope()` 的扩展
*/
public fun parentScope(coroutineScope: CoroutineScope): EventChannel<BaseEvent>
/**
* 指定协程父 [Job]. 之后在此 [EventChannel] 下创建的事件监听器都会成为 [job] 的子任务, 当 [job] 被取消时, 所有的事件监听器都会被取消.
*
* 注意: 监听器不会失败 ([Job.cancel]). 监听器处理过程的异常都会被捕获然后交由 [CoroutineExceptionHandler] 处理, 因此 [job] 不会因为子任务监听器的失败而被取消.
*
* @see parentScope
* @see context
*/
public fun parentJob(job: Job): EventChannel<BaseEvent>
// endregion
// region subscribe
/**
* 创建一个事件监听器, 监听事件通道中所有 [E] 及其子类事件.
*
* 每当 [事件广播][Event.broadcast] 时, [handler] 都会被执行.
*
*
* ## 创建监听
* 调用本函数:
* ```
* eventChannel.subscribe<E> { /* 会收到此通道中的所有是 E 的事件 */ }
* ```
*
* ## 生命周期
*
* ### 通过协程作用域管理监听器
* 本函数将会创建一个 [Job], 成为 [parentJob] 中的子任务. 可创建一个 [CoroutineScope] 来管理所有的监听器:
* ```
* val scope = CoroutineScope(SupervisorJob())
*
* val scopedChannel = eventChannel.parentScope(scope) // 将协程作用域 scope 附加到这个 EventChannel
*
* scopedChannel.subscribeAlways<MemberJoinEvent> { /* ... */ } // 启动监听, 监听器协程会作为 scope 的子任务
* scopedChannel.subscribeAlways<MemberMuteEvent> { /* ... */ } // 启动监听, 监听器协程会作为 scope 的子任务
*
* scope.cancel() // 停止了协程作用域, 也就取消了两个监听器
* ```
*
* 这个函数返回 [Listener], 它是一个 [CompletableJob]. 它会成为 [parentJob] 或 [parentScope] 的一个 [子任务][Job]
*
* ### 停止监听
* 如果 [handler] 返回 [ListeningStatus.STOPPED] 监听器将被停止.
*
* 也可以通过 [subscribe] 返回值 [Listener] 的 [Listener.complete]
*
* ## 监听器调度
* 监听器会被创建一个协程任务, 语义上在 [parentScope] 下运行.
* 通过 Kotlin [默认协程调度器][Dispatchers.Default] 在固定的全局共享线程池里执行, 除非有 [coroutineContext] 指定.
*
* 默认在 [handler] 中不能处理阻塞任务. 阻塞任务将会阻塞一个 Kotlin 全局协程调度线程并可能导致严重问题.
* 请通过 `withContext(Dispatchers.IO) { }` 等方法执行阻塞工作.
*
* ## 异常处理
*
* **监听过程抛出的异常是需要尽可能避免的, 因为这将产生不确定性.**
*
* 当参数 [handler] 处理事件抛出异常时, 只会从监听方协程上下文 ([CoroutineContext]) 寻找 [CoroutineExceptionHandler] 处理异常, 即如下顺序:
* 1. 本函数参数 [coroutineContext]
* 2. [EventChannel.defaultCoroutineContext]
* 3. 若以上步骤无法获取 [CoroutineExceptionHandler], 则只会在日志记录异常.
* 因此建议先指定 [CoroutineExceptionHandler] (可通过 [EventChannel.exceptionHandler]) 再监听事件, 或者在监听事件中捕获异常.
*
* 因此, 广播方 ([Event.broadcast]) 不会知晓监听方产生的异常, 其 [Event.broadcast] 过程也不会因监听方产生异常而提前结束.
*
* ***备注***: 在 2.11 以前, 发生上述异常时还会从广播方和有关 [Bot] 协程域获取 [CoroutineExceptionHandler]. 因此行为不稳定而在 2.11 变更为上述过程.
*
* 事件处理时抛出异常不会停止监听器.
*
* 建议在事件处理中 (即 [handler] 里) 处理异常,
* 或在参数 [coroutineContext] 中添加 [CoroutineExceptionHandler], 或通过 [EventChannel.exceptionHandler].
*
* ## 并发安全性
* 基于 [concurrency] 参数, 事件监听器可以被允许并行执行.
*
* - 若 [concurrency] 为 [ConcurrencyKind.CONCURRENT], [handler] 可能被并行调用, 需要保证并发安全.
* - 若 [concurrency] 为 [ConcurrencyKind.LOCKED], [handler] 会被 [Mutex] 限制, 串行异步执行.
*
* ## 衍生监听方法
*
* 这些方法仅 Kotlin 可用.
*
* - [syncFromEvent]: 挂起当前协程, 监听一个事件, 并尝试从这个事件中**获取**一个值
* - [nextEvent]: 挂起当前协程, 直到监听到特定类型事件的广播并通过过滤器, 返回这个事件实例.
*
* @param coroutineContext 在 [defaultCoroutineContext] 的基础上, 给事件监听协程的额外的 [CoroutineContext].
* @param concurrency 并发类型. 查看 [ConcurrencyKind]
* @param priority 监听优先级,优先级越高越先执行
* @param handler 事件处理器. 在接收到事件时会调用这个处理器. 其返回值意义参考 [ListeningStatus]. 其异常处理参考上文
*
* @return 监听器实例. 此监听器已经注册到指定事件上, 在事件广播时将会调用 [handler]
*
*
* @see selectMessages 以 `when` 的语法 '选择' 即将到来的一条消息.
* @see whileSelectMessages 以 `when` 的语法 '选择' 即将到来的所有消息, 直到不满足筛选结果.
*
* @see subscribeAlways 一直监听
* @see subscribeOnce 只监听一次
*
* @see subscribeMessages 监听消息 DSL
*/
@JvmSynthetic
public inline fun <reified E : Event> subscribe(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: ConcurrencyKind = LOCKED,
priority: EventPriority = EventPriority.NORMAL,
noinline handler: suspend E.(E) -> ListeningStatus,
): Listener<E>
/**
* 与 [subscribe] 的区别是接受 [eventClass] 参数, 而不使用 `reified` 泛型. 通常推荐使用具体化类型参数.
*
* @return 监听器实例. 此监听器已经注册到指定事件上, 在事件广播时将会调用 [handler]
* @see subscribe
*/
@JvmSynthetic
public fun <E : Event> subscribe(
eventClass: KClass<out E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: ConcurrencyKind = LOCKED,
priority: EventPriority = EventPriority.NORMAL,
handler: suspend E.(E) -> ListeningStatus,
): Listener<E>
/**
* 创建一个事件监听器, 监听事件通道中所有 [E] 及其子类事件.
* 每当 [事件广播][Event.broadcast] 时, [handler] 都会被执行.
*
* 可在任意时候通过 [Listener.complete] 来主动停止监听.
*
* @param concurrency 并发类型默认为 [CONCURRENT]
* @param coroutineContext 在 [defaultCoroutineContext] 的基础上, 给事件监听协程的额外的 [CoroutineContext]
* @param priority 处理优先级, 优先级高的先执行
*
* @return 监听器实例. 此监听器已经注册到指定事件上, 在事件广播时将会调用 [handler]
*
* @see subscribe 获取更多说明
*/
@JvmSynthetic
public inline fun <reified E : Event> subscribeAlways(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: ConcurrencyKind = CONCURRENT,
priority: EventPriority = EventPriority.NORMAL,
noinline handler: suspend E.(E) -> Unit,
): Listener<E>
/**
* @see subscribe
* @see subscribeAlways
*/
@JvmSynthetic
public fun <E : Event> subscribeAlways(
eventClass: KClass<out E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
concurrency: ConcurrencyKind = CONCURRENT,
priority: EventPriority = EventPriority.NORMAL,
handler: suspend E.(E) -> Unit,
): Listener<E>
/**
* 创建一个事件监听器, 监听事件通道中所有 [E] 及其子类事件, 只监听一次.
* 当 [事件广播][Event.broadcast] 时, [handler] 会被执行.
*
* 可在任意时候通过 [Listener.complete] 来主动停止监听.
*
* @param coroutineContext 在 [defaultCoroutineContext] 的基础上, 给事件监听协程的额外的 [CoroutineContext]
* @param priority 处理优先级, 优先级高的先执行
*
* @see subscribe 获取更多说明
*/
@JvmSynthetic
public inline fun <reified E : Event> subscribeOnce(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
priority: EventPriority = EventPriority.NORMAL,
noinline handler: suspend E.(E) -> Unit,
): Listener<E>
/**
* @see subscribeOnce
*/
public fun <E : Event> subscribeOnce(
eventClass: KClass<out E>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
priority: EventPriority = EventPriority.NORMAL,
handler: suspend E.(E) -> Unit,
): Listener<E>
// endregion
// region impl
// protected, to hide from users
@MiraiInternalApi
protected abstract fun <E : Event> registerListener(eventClass: KClass<out E>, listener: Listener<E>)
// to overcome visibility issue
internal fun <E : Event> registerListener0(eventClass: KClass<out E>, listener: Listener<E>)
/**
* Creates [Listener] instance using the [listenerBlock] action.
*/
// @Contract("_ -> new") // always creates new instance
@MiraiInternalApi
protected abstract fun <E : Event> createListener(
coroutineContext: CoroutineContext,
concurrencyKind: ConcurrencyKind,
priority: EventPriority,
listenerBlock: suspend (E) -> ListeningStatus,
): Listener<E>
// to overcome visibility issue
internal fun <E : Event> createListener0(
coroutineContext: CoroutineContext,
concurrencyKind: ConcurrencyKind,
priority: EventPriority,
listenerBlock: suspend (E) -> ListeningStatus,
): Listener<E>
// endregion
}
// used by mirai-core
@OptIn(MiraiInternalApi::class)
internal open class FilterEventChannel<BaseEvent : Event>(
private val delegate: EventChannel<BaseEvent>,
private val filter: suspend (event: BaseEvent) -> Boolean,
) : EventChannel<BaseEvent>(delegate.baseEventClass, delegate.defaultCoroutineContext) {
private fun <E : Event> intercept(block: suspend (E) -> ListeningStatus): suspend (E) -> ListeningStatus {
return { ev ->
val filterResult = try {
@Suppress("UNCHECKED_CAST")
baseEventClass.isInstance(ev) && filter(ev as BaseEvent)
} catch (e: Throwable) {
if (e is ExceptionInEventChannelFilterException) throw e // wrapped by another filter
throw ExceptionInEventChannelFilterException(ev, this, cause = e)
}
if (filterResult) block.invoke(ev)
else ListeningStatus.LISTENING
}
}
override fun asFlow(): Flow<BaseEvent> = delegate.asFlow().filter(filter)
override fun <E : Event> registerListener(eventClass: KClass<out E>, listener: Listener<E>) {
delegate.registerListener0(eventClass, listener)
}
override fun <E : Event> createListener(
coroutineContext: CoroutineContext,
concurrencyKind: ConcurrencyKind,
priority: EventPriority,
listenerBlock: suspend (E) -> ListeningStatus
): Listener<E> = delegate.createListener0(coroutineContext, concurrencyKind, priority, intercept(listenerBlock))
override fun context(vararg coroutineContexts: CoroutineContext): EventChannel<BaseEvent> {
return delegate.context(*coroutineContexts)
}
}