Skip to content

Commit

Permalink
Implement a lot more virutal threads
Browse files Browse the repository at this point in the history
  • Loading branch information
duncte123 committed Dec 13, 2023
1 parent e39e413 commit 1409a8a
Show file tree
Hide file tree
Showing 14 changed files with 210 additions and 114 deletions.
54 changes: 28 additions & 26 deletions bot/src/main/java/ml/duncte123/skybot/CommandManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import java.util.stream.Collectors;

import static me.duncte123.botcommons.messaging.MessageUtils.sendMsg;
import static ml.duncte123.skybot.utils.ThreadUtils.runOnVirtual;
import static ml.duncte123.skybot.utils.AirUtils.setJDAContext;
import static net.dv8tion.jda.api.requests.ErrorResponse.MISSING_ACCESS;
import static net.dv8tion.jda.api.requests.ErrorResponse.UNKNOWN_CHANNEL;
Expand All @@ -98,11 +99,6 @@ public class CommandManager {
private static final TObjectLongMap<String> COOLDOWNS = MapUtils.newObjectLongMap();
private static final Logger LOGGER = LoggerFactory.getLogger(CommandManager.class);
private static final Pattern COMMAND_PATTERN = Pattern.compile("([^\"]\\S*|\".+?\")\\s*");
private static final ScheduledExecutorService COOLDOWN_THREAD = Executors.newSingleThreadScheduledExecutor((r) -> {
final Thread thread = new Thread(r, "Command-cooldown-thread");
thread.setDaemon(true);
return thread;
});
private final ExecutorService commandThread = Executors.newThreadPerTaskExecutor(
(r) -> Thread.ofVirtual().name("Command-execute-thread").start(r)
);
Expand All @@ -112,26 +108,31 @@ public class CommandManager {
private final Variables variables;

static {
COOLDOWN_THREAD.scheduleWithFixedDelay(() -> {
try {
// Loop over all cooldowns with a 5 minute interval
// This makes sure that we don't have any useless cooldowns in the system hogging up memory
COOLDOWNS.forEachEntry((key, val) -> {
final long remaining = calcTimeRemaining(val);

// Remove the value from the cooldowns if it is less or equal to 0
if (remaining <= 0) {
COOLDOWNS.remove(key);
}

// Return true to indicate that we are allowed to continue the loop
return true;
});
}
catch (Exception e) {
e.printStackTrace();
}
}, 5, 5, TimeUnit.MINUTES);
SkyBot.SYSTEM_POOL.scheduleWithFixedDelay(
() -> runOnVirtual(CommandManager::handleCooldowns),
5, 5, TimeUnit.MINUTES
);
}

private static void handleCooldowns() {
try {
// Loop over all cooldowns with a 5 minute interval
// This makes sure that we don't have any useless cooldowns in the system hogging up memory
COOLDOWNS.forEachEntry((key, val) -> {
final long remaining = calcTimeRemaining(val);

// Remove the value from the cooldowns if it is less or equal to 0
if (remaining <= 0) {
COOLDOWNS.remove(key);
}

// Return true to indicate that we are allowed to continue the loop
return true;
});
}
catch (Exception e) {
LOGGER.error("Parsing cooldowns failed", e);
}
}

public CommandManager(Variables variables) {
Expand Down Expand Up @@ -763,7 +764,8 @@ public void executeSlashCommand(SlashCommandInteractionEvent event) {
if (command != null) {
command.handleEvent(event, variables);
}
} catch (Exception e) {
}
catch (Exception e) {
e.printStackTrace();
}
}
Expand Down
11 changes: 2 additions & 9 deletions bot/src/main/java/ml/duncte123/skybot/ShardWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static gnu.trove.impl.Constants.DEFAULT_CAPACITY;
import static gnu.trove.impl.Constants.DEFAULT_LOAD_FACTOR;
import static ml.duncte123.skybot.utils.ThreadUtils.runOnVirtual;
import static ml.duncte123.skybot.utils.AirUtils.setJDAContext;

public class ShardWatcher implements EventListener {
Expand All @@ -48,13 +47,7 @@ public class ShardWatcher implements EventListener {
-1, -1
);

@SuppressWarnings("PMD.CloseResource")
final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor((r) -> {
final Thread thread = new Thread(r, "Shard-watcher");
thread.setDaemon(true);
return thread;
});
service.scheduleAtFixedRate(this::checkShards, 5, 5, TimeUnit.MINUTES);
SkyBot.SYSTEM_POOL.scheduleAtFixedRate(() -> runOnVirtual("Shard-watcher", this::checkShards), 5, 5, TimeUnit.MINUTES);
}

@Override
Expand Down
16 changes: 15 additions & 1 deletion bot/src/main/java/ml/duncte123/skybot/SkyBot.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@

import javax.security.auth.login.LoginException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static net.dv8tion.jda.api.exceptions.ErrorResponseException.ignore;
Expand All @@ -54,6 +56,12 @@
import static net.dv8tion.jda.api.utils.MemberCachePolicy.PENDING;

public final class SkyBot {
public static ScheduledExecutorService SYSTEM_POOL = Executors.newSingleThreadScheduledExecutor((r) -> {
final Thread t = new Thread(r, "System Pool");
t.setDaemon(true);
return t;
});

private static SkyBot instance;
private static final MemberCachePolicy PATRON_POLICY = (member) -> {
// Member needs to be cached for JDA to fire role update event
Expand Down Expand Up @@ -100,6 +108,8 @@ private SkyBot() throws LoginException {
logger.info("{} commands with {} aliases loaded.", commandCount.getFirst(), commandCount.getSecond());
LavalinkManager.INS.start(config, variables.getAudioUtils());

final var jdaVirtualPool = Executors.newVirtualThreadPerTaskExecutor();

final EventManager eventManager = new EventManager(variables);
// Build our shard manager
final DefaultShardManagerBuilder builder = DefaultShardManagerBuilder.create(
Expand Down Expand Up @@ -130,7 +140,11 @@ private SkyBot() throws LoginException {
// Can't enable CLIENT_STATUS because we don't have GatewayIntent.GUILD_PRESENCES
// (is it worth it to enable it for one command?)
.disableCache(CacheFlag.ACTIVITY, CacheFlag.CLIENT_STATUS, CacheFlag.ONLINE_STATUS, CacheFlag.SCHEDULED_EVENTS)
.setGatewayEncoding(GatewayEncoding.ETF);
.setGatewayEncoding(GatewayEncoding.ETF)
// Configure JDA to use virtual threads/project loom
.setCallbackPool(jdaVirtualPool, true)
.setEventPool(jdaVirtualPool, true)
.setRateLimitElastic(jdaVirtualPool, true);

// If lavalink is enabled we will hook it into jda
if (LavalinkManager.INS.isEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import ml.duncte123.skybot.entities.jda.DunctebotGuild;
import ml.duncte123.skybot.objects.command.CommandCategory;
import ml.duncte123.skybot.objects.command.CommandContext;
import ml.duncte123.skybot.objects.command.ICommand;
import ml.duncte123.skybot.objects.command.CustomCommand;
import ml.duncte123.skybot.objects.command.ICommand;
import ml.duncte123.skybot.objects.discord.MessageData;
import ml.duncte123.skybot.objects.user.UnknownUser;
import ml.duncte123.skybot.utils.GuildSettingsUtils;
Expand All @@ -45,7 +45,10 @@
import net.dv8tion.jda.api.EmbedBuilder;
import net.dv8tion.jda.api.JDA;
import net.dv8tion.jda.api.Permission;
import net.dv8tion.jda.api.entities.*;
import net.dv8tion.jda.api.entities.Guild;
import net.dv8tion.jda.api.entities.Member;
import net.dv8tion.jda.api.entities.Message;
import net.dv8tion.jda.api.entities.User;
import net.dv8tion.jda.api.entities.channel.ChannelType;
import net.dv8tion.jda.api.entities.channel.middleman.MessageChannel;
import net.dv8tion.jda.api.entities.channel.unions.MessageChannelUnion;
Expand All @@ -63,8 +66,6 @@
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Matcher;
Expand All @@ -85,8 +86,6 @@ public abstract class MessageListener extends BaseListener {
protected final CommandManager commandManager = variables.getCommandManager();
private static final String PROFANITY_DISABLE = "--no-filter";
protected final SpamFilter spamFilter = new SpamFilter(variables);
protected final ScheduledExecutorService systemPool = Executors.newSingleThreadScheduledExecutor(
(r) -> new Thread(r, "Bot-Service-Thread"));

protected MessageListener(Variables variables) {
super(variables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static ml.duncte123.skybot.utils.ThreadUtils.runOnVirtual;

public class ReadyShutdownListener extends MessageListener {
// Using an atomic boolean because multiple shards are writing to it
private final AtomicBoolean arePoolsRunning = new AtomicBoolean(false);
Expand Down Expand Up @@ -83,7 +85,7 @@ private void onReady(ReadyEvent event) {
//Start the timers if they have not been started yet
if (!arePoolsRunning.get()) {
LOGGER.info("Starting spam-cache-cleaner!");
systemPool.scheduleAtFixedRate(spamFilter::clearMessages, 20, 13, TimeUnit.SECONDS);
SkyBot.SYSTEM_POOL.scheduleAtFixedRate(() -> runOnVirtual(spamFilter::clearMessages), 20, 13, TimeUnit.SECONDS);

if (
"psql".equals(this.variables.getConfig().useDatabase) ||
Expand Down Expand Up @@ -129,7 +131,7 @@ private void onShutdown() {
LOGGER.info("Music system shutdown");

// Kill all threads
this.systemPool.shutdown();
SkyBot.SYSTEM_POOL.shutdown();
LOGGER.info("System pool shutdown");

// kill the websocket
Expand Down
28 changes: 13 additions & 15 deletions bot/src/main/kotlin/ml/duncte123/skybot/ReactionHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,23 @@

package ml.duncte123.skybot

import me.duncte123.botcommons.messaging.MessageUtils.*
import me.duncte123.botcommons.messaging.MessageUtils.sendErrorWithMessage
import me.duncte123.botcommons.messaging.MessageUtils.sendMsg
import ml.duncte123.skybot.objects.Emotes.SEARCH_EMOTE
import ml.duncte123.skybot.objects.command.CommandContext
import ml.duncte123.skybot.utils.ThreadUtils.runOnVirtual
import net.dv8tion.jda.api.entities.Message
import net.dv8tion.jda.api.entities.channel.unions.MessageChannelUnion
import net.dv8tion.jda.api.events.GenericEvent
import net.dv8tion.jda.api.events.interaction.component.StringSelectInteractionEvent
import net.dv8tion.jda.api.exceptions.ErrorResponseException.ignore
import net.dv8tion.jda.api.hooks.EventListener
import net.dv8tion.jda.api.requests.ErrorResponse.UNKNOWN_MESSAGE
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

class ReactionHandler : EventListener {
private val requirementsCache = arrayListOf<ReactionCacheElement>()
private val consumerCache = hashMapOf<String, CommandContext>()
private val executor = Executors.newScheduledThreadPool(2) { r ->
val t = Thread(r, "ReactionAwaiter")
t.isDaemon = true
return@newScheduledThreadPool t
}

private fun MessageChannelUnion.editMsg(id: Long, msg: String) = this.asGuildMessageChannel()
.editMessageById(id, msg)
Expand Down Expand Up @@ -98,16 +94,18 @@ class ReactionHandler : EventListener {
requirementsCache.add(cacheElement)
consumerCache[componentId] = ctx.applySentId(userId)

executor.schedule(
SkyBot.SYSTEM_POOL.schedule(
{
try {
if (requirementsCache.contains(cacheElement)) {
requirementsCache.remove(cacheElement)
consumerCache.remove(componentId)
ctx.channel.editMsg(msg.idLong, "$SEARCH_EMOTE Search timed out")
runOnVirtual {
try {
if (requirementsCache.contains(cacheElement)) {
requirementsCache.remove(cacheElement)
consumerCache.remove(componentId)
ctx.channel.editMsg(msg.idLong, "$SEARCH_EMOTE Search timed out")
}
} catch (e: Exception) {
e.printStackTrace()
}
} catch (e: Exception) {
e.printStackTrace()
}
},
timeoutInMillis, TimeUnit.MILLISECONDS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,17 @@ import me.duncte123.botcommons.web.WebParserUtils
import me.duncte123.botcommons.web.WebUtils
import me.duncte123.botcommons.web.requests.FormRequestBody
import ml.duncte123.skybot.Settings.NO_STATIC
import ml.duncte123.skybot.SkyBot
import ml.duncte123.skybot.objects.command.Command
import ml.duncte123.skybot.objects.command.CommandCategory
import ml.duncte123.skybot.objects.command.CommandContext
import ml.duncte123.skybot.utils.AirUtils
import ml.duncte123.skybot.utils.MapUtils
import net.dv8tion.jda.api.Permission
import net.dv8tion.jda.api.events.message.MessageReceivedEvent
import org.jsoup.Jsoup
import java.io.InputStream
import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.MINUTES
Expand All @@ -53,9 +54,6 @@ class ChatCommand : Command() {
"In this server my prefix is *`{PREFIX}`*"
)

// The size should match the usage for stability but not more than 4.
private val cleanupService = Executors.newSingleThreadScheduledExecutor()

init {
this.category = CommandCategory.FUN
this.name = "chat"
Expand All @@ -66,19 +64,9 @@ class ChatCommand : Command() {
Permission.MESSAGE_HISTORY
)

cleanupService.scheduleAtFixedRate(
SkyBot.SYSTEM_POOL.scheduleAtFixedRate(
{
val temp = TLongObjectHashMap(sessions)
val now = Date()
var cleared = 0
for (it in temp.keys()) {
val duration = now.time - sessions.get(it).time.time
if (duration >= maxDuration) {
sessions.remove(it)
cleared++
}
}
LOGGER.debug("Removed $cleared chat sessions that have been inactive for 20 minutes.")
AirUtils.runOnVirtual(this::doCleanup)
},
1L, 1L, TimeUnit.HOURS
)
Expand All @@ -105,7 +93,7 @@ class ChatCommand : Command() {
val time = System.currentTimeMillis()
var message = ctx.argsJoined

message = replaceStuff(event, message)
message = replaceMentionsWithText(event, message)

if (!sessions.containsKey(event.author.idLong)) {
sessions.put(event.author.idLong, ChatSession(event.author.idLong))
Expand Down Expand Up @@ -155,7 +143,7 @@ class ChatCommand : Command() {
return response1
}

private fun replaceStuff(event: MessageReceivedEvent, m: String): String {
private fun replaceMentionsWithText(event: MessageReceivedEvent, m: String): String {
var message = m
val mentions = event.message.mentions

Expand All @@ -174,6 +162,20 @@ class ChatCommand : Command() {
message = message.replace("@here", "here").replace("@everyone", "everyone")
return message
}

private fun doCleanup() {
val temp = TLongObjectHashMap(sessions)
val now = Date()
var cleared = 0
for (it in temp.keys()) {
val duration = now.time - sessions.get(it).time.time
if (duration >= maxDuration) {
sessions.remove(it)
cleared++
}
}
LOGGER.debug("Removed $cleared chat sessions that have been inactive for 20 minutes.")
}
}

class ChatSession(userId: Long) {
Expand Down
Loading

0 comments on commit 1409a8a

Please sign in to comment.