Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import net.essentialsx.discord.util.ConsoleInjector;
import net.essentialsx.discord.util.DiscordUtil;
import net.essentialsx.discord.util.MessageUtil;
import net.essentialsx.discord.util.WrappedWebhookClient;
import net.essentialsx.discord.util.WebhookDispatcher;
import org.bukkit.Bukkit;
import org.bukkit.entity.Player;
import org.bukkit.event.HandlerList;
Expand Down Expand Up @@ -84,11 +84,11 @@ public class JDADiscordService implements DiscordService, IEssentialsModule {
private JDA jda;
private Guild guild;
private TextChannel primaryChannel;
private WrappedWebhookClient consoleWebhook;
private WebhookDispatcher consoleWebhook;
private String lastConsoleId;
private final Map<String, MessageType> registeredTypes = new HashMap<>();
private final Map<MessageType, String> typeToChannelId = new HashMap<>();
private final Map<String, WrappedWebhookClient> channelIdToWebhook = new HashMap<>();
private final Map<String, WebhookDispatcher> channelIdToWebhook = new HashMap<>();
private ConsoleInjector injector;
private DiscordCommandDispatcher commandDispatcher;
private InteractionControllerImpl interactionController;
Expand Down Expand Up @@ -145,11 +145,11 @@ public void sendMessage(DiscordMessageEvent event, String message, boolean group

final String webhookChannelId = typeToChannelId.get(event.getType());
if (webhookChannelId != null) {
final WrappedWebhookClient client = channelIdToWebhook.get(webhookChannelId);
if (client != null) {
final WebhookDispatcher dispatcher = channelIdToWebhook.get(webhookChannelId);
if (dispatcher != null) {
final String avatarUrl = event.getAvatarUrl() != null ? event.getAvatarUrl() : jda.getSelfUser().getAvatarUrl();
final String name = event.getName() != null ? event.getName() : guild.getSelfMember().getEffectiveName();
client.send(getWebhookMessage(strippedContent, avatarUrl, name, groupMentions));
dispatcher.send(getWebhookMessage(strippedContent, avatarUrl, name, groupMentions));
return;
}
}
Expand All @@ -160,7 +160,7 @@ public void sendMessage(DiscordMessageEvent event, String message, boolean group
}
channel.sendMessage(strippedContent)
.setAllowedMentions(groupMentions ? null : DiscordUtil.NO_GROUP_MENTIONS)
.queue();
.queue(null, error -> logger.log(Level.WARNING, "Failed to send message to channel " + channel.getName(), error));
}

public void startup() throws LoginException, InterruptedException {
Expand Down Expand Up @@ -229,7 +229,7 @@ public void startup() throws LoginException, InterruptedException {
}

// Load emotes into cache, JDA will handle updates from here on out.
guild.retrieveEmojis().queue();
guild.retrieveEmojis().queue(null, error -> logger.log(Level.WARNING, "Failed to retrieve emojis from guild", error));

updatePrimaryChannel();

Expand Down Expand Up @@ -390,8 +390,8 @@ public void updatePresence() {

public void updateTypesRelay() {
if (!getSettings().isShowAvatar() && !getSettings().isCustomBotName()) {
for (WrappedWebhookClient webhook : channelIdToWebhook.values()) {
webhook.close();
for (WebhookDispatcher dispatcher : channelIdToWebhook.values()) {
dispatcher.close();
}
typeToChannelId.clear();
channelIdToWebhook.clear();
Expand All @@ -410,14 +410,14 @@ public void updateTypesRelay() {

final Webhook webhook = DiscordUtil.getOrCreateWebhook(channel, DiscordUtil.ADVANCED_RELAY_NAME).join();
if (webhook == null) {
final WrappedWebhookClient current = channelIdToWebhook.remove(channel.getId());
final WebhookDispatcher current = channelIdToWebhook.remove(channel.getId());
if (current != null) {
current.close();
}
continue;
}
typeToChannelId.put(type, channel.getId());
channelIdToWebhook.put(channel.getId(), DiscordUtil.getWebhookClient(webhook.getIdLong(), webhook.getToken(), jda.getHttpClient()));
channelIdToWebhook.put(channel.getId(), new WebhookDispatcher(DiscordUtil.getWebhookClient(webhook.getIdLong(), webhook.getToken(), jda.getHttpClient())));
}
}

Expand Down Expand Up @@ -471,7 +471,7 @@ public void updateConsoleRelay() {
}

shutdownConsoleRelay(false);
consoleWebhook = DiscordUtil.getWebhookClient(webhookId, webhookToken, jda.getHttpClient());
consoleWebhook = new WebhookDispatcher(DiscordUtil.getWebhookClient(webhookId, webhookToken, jda.getHttpClient()), 50);
if (injector == null || injector.isRemoved()) {
injector = new ConsoleInjector(this);
injector.start();
Expand Down Expand Up @@ -510,8 +510,8 @@ public void shutdown() {

shutdownConsoleRelay(true);

for (WrappedWebhookClient webhook : channelIdToWebhook.values()) {
webhook.close();
for (WebhookDispatcher dispatcher : channelIdToWebhook.values()) {
dispatcher.close();
}

// Unregister leftover jda listeners
Expand Down Expand Up @@ -583,7 +583,10 @@ public CompletableFuture<Void> modifyMemberRoles(InteractionMember member, Colle
}

final CompletableFuture<Void> future = new CompletableFuture<>();
guild.modifyMemberRoles(((InteractionMemberImpl) member).getJdaObject(), add, remove).queue(future::complete);
guild.modifyMemberRoles(((InteractionMemberImpl) member).getJdaObject(), add, remove).queue(future::complete, error -> {
logger.log(Level.WARNING, "Failed to modify member roles", error);
future.complete(null);
});
return future;
}

Expand Down Expand Up @@ -613,7 +616,7 @@ public DiscordSettings getSettings() {
return plugin.getSettings();
}

public WrappedWebhookClient getConsoleWebhook() {
public WebhookDispatcher getConsoleWebhook() {
return consoleWebhook;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
import org.bukkit.Bukkit;

import java.time.Instant;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;

import static com.earth2me.essentials.I18n.tlLiteral;
Expand All @@ -27,40 +25,18 @@ public class ConsoleInjector extends AbstractAppender {
private final static java.util.logging.Logger logger = EssentialsDiscord.getWrappedLogger();

private final static long QUEUE_PROCESS_PERIOD_SECONDS = 2;
private final static int QUEUE_CAPACITY = 500;

private final JDADiscordService jda;
private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
private final int taskId;
private boolean removed = false;

private final AtomicLong lastRateLimitTime = new AtomicLong(0);
private final AtomicInteger recentRateLimit = new AtomicInteger(0);
private final AtomicInteger totalBackoffEvents = new AtomicInteger();

public ConsoleInjector(JDADiscordService jda) {
super("EssentialsX-ConsoleInjector", null, null, false);
this.jda = jda;
((Logger) LogManager.getRootLogger()).addAppender(this);
taskId = Bukkit.getScheduler().runTaskTimerAsynchronously(jda.getPlugin(), () -> {
// Check to see if we're supposed to be backing off, preform backoff if the case.
if (recentRateLimit.get() < 0) {
if (totalBackoffEvents.get() * 20 >= jda.getSettings().getConsoleSkipDelay() * 60) {
logger.warning("EssXBackoff: Reached console skip delay, attempt to skip");
jda.getConsoleWebhook().abandonRequests();
messageQueue.clear();
totalBackoffEvents.set(0);
recentRateLimit.set(0);
lastRateLimitTime.set(0);
return;
}

final int backoff = recentRateLimit.incrementAndGet();
if (jda.isDebug()) {
logger.warning("EssXBackoff: Webhook backoff in progress, skipping queue processing. Resuming in " + Math.abs(backoff) + " cycles.");
}
return;
}

final StringBuilder buffer = new StringBuilder();
String curLine;
while ((curLine = messageQueue.peek()) != null) {
Expand All @@ -78,7 +54,11 @@ public ConsoleInjector(JDADiscordService jda) {
}

private void sendMessage(String content) {
jda.getConsoleWebhook().send(jda.getWebhookMessage(content)).exceptionally(e -> {
final WebhookDispatcher webhook = jda.getConsoleWebhook();
if (webhook == null || webhook.isShutdown()) {
return;
}
webhook.send(jda.getWebhookMessage(content)).exceptionally(e -> {
logger.severe(tlLiteral("discordErrorWebhook"));
remove();
return null;
Expand All @@ -97,40 +77,13 @@ public void append(LogEvent event) {
return;
}

if (entry.startsWith("EssXBackoff: ")) {
return;
}

if (event.getLoggerName().contains("club.minnced.discord.webhook.WebhookClient") && entry.startsWith("Encountered 429, retrying after ")) {
if (recentRateLimit.get() >= 0) {
recentRateLimit.incrementAndGet();
}

if (lastRateLimitTime.get() == 0 || System.currentTimeMillis() - lastRateLimitTime.get() > 5000) {
lastRateLimitTime.set(System.currentTimeMillis());

// A negative value would mean the timer is current preforming a backoff, don't stop it.
if (recentRateLimit.get() >= 0) {
recentRateLimit.set(0);
}
} else if (recentRateLimit.get() >= 2) {
// Start the webhook backoff, defaulting to 20s, which should reset our bucket.
if (jda.isDebug()) {
totalBackoffEvents.getAndIncrement();
logger.warning("EssXBackoff: Beginning Webhook Backoff");
}
recentRateLimit.set(-20);
}
return;
}

final String[] loggerNameSplit = event.getLoggerName().split("\\.");
final String loggerName = loggerNameSplit[loggerNameSplit.length - 1].trim();

if (!loggerName.isEmpty()) {
entry = "[" + loggerName + "] " + entry;
}

if (!jda.getSettings().getConsoleFilters().isEmpty()) {
for (final Pattern pattern : jda.getSettings().getConsoleFilters()) {
if (pattern.matcher(entry).find()) {
Expand All @@ -139,11 +92,18 @@ public void append(LogEvent event) {
}
}

messageQueue.addAll(Splitter.fixedLength(Message.MAX_CONTENT_LENGTH - 50).splitToList(
for (final String line : Splitter.fixedLength(Message.MAX_CONTENT_LENGTH - 50).splitToList(
MessageUtil.formatMessage(jda.getSettings().getConsoleFormat(),
TimeFormat.TIME_LONG.format(Instant.now()),
event.getLevel().name(),
MessageUtil.sanitizeDiscordMarkdown(entry))));
MessageUtil.sanitizeDiscordMarkdown(entry)))) {

if (!messageQueue.offer(line)) {
if (jda.isDebug()) {
logger.fine("Console relay queue full, dropping message.");
}
}
}
}

public void remove() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import net.dv8tion.jda.api.entities.channel.concrete.TextChannel;
import net.essentialsx.api.v2.events.discord.DiscordMessageEvent;
import net.essentialsx.api.v2.services.discord.MessageType;
import net.essentialsx.discord.EssentialsDiscord;
import net.essentialsx.discord.JDADiscordService;
import okhttp3.OkHttpClient;
import org.bukkit.Bukkit;
Expand All @@ -26,8 +27,11 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class DiscordUtil {
private static final Logger logger = EssentialsDiscord.getWrappedLogger();
public final static String ADVANCED_RELAY_NAME = "EssX Advanced Relay";
public final static String CONSOLE_RELAY_NAME = "EssX Console Relay";
public final static List<Message.MentionType> NO_GROUP_MENTIONS;
Expand Down Expand Up @@ -63,7 +67,6 @@ public static WrappedWebhookClient getWebhookClient(long id, String token, OkHtt
*
* @param channel The channel to search for/create webhooks in.
* @param webhookName The name of the webhook to search for/create.
*
* @return A future which completes with the webhook by the given name in the given channel, or null
* if the bot lacks the proper permissions.
*/
Expand All @@ -82,7 +85,7 @@ public static CompletableFuture<Webhook> getOrCreateWebhook(final TextChannel ch
}
}
createWebhook(channel, webhookName).thenAccept(future::complete);
});
}, error -> logger.log(Level.WARNING, "Failed to retrieve webhooks from channel " + channel.getName(), error));
return future;
}

Expand All @@ -101,17 +104,17 @@ private static void cleanWebhooks(final Guild guild, String webhookName) {
guild.retrieveWebhooks().queue(webhooks -> {
for (final Webhook webhook : webhooks) {
if (webhook.getName().equalsIgnoreCase(webhookName) && !ACTIVE_WEBHOOKS.contains(webhook.getId())) {
webhook.delete().reason("EssentialsX Discord: webhook cleanup").queue();
webhook.delete().reason("EssentialsX Discord: webhook cleanup").queue(null, error -> logger.log(Level.WARNING, "Failed to delete webhook " + webhook.getName(), error));
}
}
});
}, error -> logger.log(Level.WARNING, "Failed to retrieve webhooks from guild " + guild.getName(), error));
}

/**
* Creates a webhook with the given name in the given channel.
*
* @param channel The channel to search for webhooks in.
* @param webhookName The name of the webhook to look for.
* @param channel The channel to search for webhooks in.
* @param webhookName The name of the webhook to look for.
* @return A future which completes with the webhook by the given name in the given channel or null if no permissions.
*/
public static CompletableFuture<Webhook> createWebhook(TextChannel channel, String webhookName) {
Expand All @@ -123,7 +126,7 @@ public static CompletableFuture<Webhook> createWebhook(TextChannel channel, Stri
channel.createWebhook(webhookName).queue(webhook -> {
future.complete(webhook);
ACTIVE_WEBHOOKS.addIfAbsent(webhook.getId());
});
}, error -> logger.log(Level.WARNING, "Failed to create webhook " + webhookName + " in channel " + channel.getName(), error));
return future;
}

Expand Down
Loading
Loading