Skip to content

Commit

Permalink
Refactor codecs, logger, and rtmp client executor. Update spring
Browse files Browse the repository at this point in the history
  • Loading branch information
mondain committed Mar 18, 2024
1 parent 5e896e7 commit c40dfa7
Show file tree
Hide file tree
Showing 29 changed files with 249 additions and 269 deletions.
2 changes: 1 addition & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.3.29</version>
<version>1.3.30</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-client</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion client/src/main/java/org/red5/client/Red5Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public final class Red5Client {
/**
* Current server version with revision
*/
public static final String VERSION = "Red5 Client 1.3.29";
public static final String VERSION = "Red5 Client 1.3.30";

/**
* Create a new Red5Client object using the connection local to the current thread A bit of magic that lets you access the red5 scope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.red5.server.stream.consumer.ConnectionConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
* Base class for clients (RTMP and RTMPT)
Expand Down Expand Up @@ -907,6 +908,15 @@ public void setProtocol(String protocol) throws Exception {
public void setConnection(RTMPConnection conn) {
this.conn = conn;
this.conn.setHandler(this);
if (conn.getExecutor() == null) {
// setup executor
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setDaemon(true);
executor.setMaxPoolSize(1);
executor.initialize();
conn.setExecutor(executor);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class RTMPClientConnManager implements IConnectionManager<BaseConnection>
private static int executorQueueCapacity = 32;

// whether or not to use the ThreadPoolTaskExecutor for incoming messages
protected static boolean enableTaskExecutor;
protected static boolean enableTaskExecutor = true;

protected static IConnectionManager<BaseConnection> instance;

Expand Down
4 changes: 2 additions & 2 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.3.29</version>
<version>1.3.30</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-server-common</artifactId>
Expand Down Expand Up @@ -113,7 +113,7 @@
<dependency>
<groupId>net.engio</groupId>
<artifactId>mbassador</artifactId>
<version>1.3.29</version>
<version>1.3.30</version>
</dependency> -->
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package org.red5.logging;

import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -37,7 +36,7 @@ public class LoggingContextSelector implements ContextSelector {

private static final Semaphore lock = new Semaphore(1, true);

private static final ConcurrentMap<String, LoggerContext> contextMap = new ConcurrentHashMap<>(6, 0.9f, 1);
private static final ConcurrentMap<String, LoggerContext> contextMap = new ConcurrentHashMap<>();

private static LoggerContext DEFAULT_CONTEXT;

Expand Down Expand Up @@ -194,9 +193,7 @@ public LoggerContext detachLoggerContext(String contextName) {
}

public List<String> getContextNames() {
List<String> list = new ArrayList<>();
list.addAll(contextMap.keySet());
return list;
return List.copyOf(contextMap.keySet());
}

public void setContextConfigFile(String contextConfigFile) {
Expand Down
66 changes: 30 additions & 36 deletions common/src/main/java/org/red5/logging/Red5LoggerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,20 @@ public class Red5LoggerFactory {

public static boolean DEBUG = true;

// root logger
private static Logger rootLogger;

// context selector
private static ContextSelector contextSelector;

static {
DEBUG = Boolean.valueOf(System.getProperty("logback.debug", "false"));
try {
Logger logger = LoggerFactory.getILoggerFactory().getLogger(Logger.ROOT_LOGGER_NAME);
logger.debug("Red5LoggerFactory instanced by Thread: {}", Thread.currentThread().getName());
rootLogger = LoggerFactory.getILoggerFactory().getLogger(Logger.ROOT_LOGGER_NAME);
rootLogger.debug("Red5LoggerFactory instanced by Thread: {}", Thread.currentThread().getName());
rootLogger.debug("Logging context selector: {} impl: {}", System.getProperty("logback.ContextSelector"), getContextSelector());
// get the context selector here
contextSelector = getContextSelector();
} catch (Throwable t) {
t.printStackTrace();
}
Expand All @@ -43,7 +52,7 @@ public static Logger getLogger(Class<?> clazz) {
if (DEBUG) {
System.out.printf("getLogger for: %s thread: %s%n", clazz.getName(), Thread.currentThread().getName());
ClassLoader cl = Thread.currentThread().getContextClassLoader();
System.out.printf("class loader: %s%n", cl);
rootLogger.debug("Class loader: {}", cl);
// if cl is WebappClassLoader type we can probably get the context from it
//if (cl instanceof WebappClassLoader) {
// getContextName()
Expand All @@ -52,37 +61,22 @@ public static Logger getLogger(Class<?> clazz) {
Logger logger = null;
if (useLogback) {
// determine the red5 app name or servlet context name
String contextName = CoreConstants.DEFAULT_CONTEXT_NAME;
final String threadName = Thread.currentThread().getName();
// route the Launcher entries to the correct context
String[] parts = Thread.currentThread().getName().split("Loader:/");
if (parts.length > 1) {
contextName = parts[1];
if (threadName.startsWith("Loader:/")) {
String contextName = threadName.split("Loader:/")[1];
logger = getLogger(clazz, contextName);
} else {
logger = getLogger(clazz, CoreConstants.DEFAULT_CONTEXT_NAME);
}
logger = Red5LoggerFactory.getLogger(clazz, contextName);
/*
* // get a reference to our caller Class caller = Thread.currentThread().getStackTrace()[2].getClassName(); if (DEBUG) { System.out.printf("Caller class: %s classloader: %s%n",
* caller, caller.getClassLoader()); } // if the incoming class extends StatefulScopeWrappingAdapter we lookup the context by scope name boolean scopeAware =
* StatefulScopeWrappingAdapter.class.isAssignableFrom(caller); if (DEBUG) { System.out.printf("scopeAware: %s%n", scopeAware); } if (scopeAware) { try { Class wrapper = null; if
* ((wrapper = caller.asSubclass(StatefulScopeWrappingAdapter.class)) != null) { Method getScope = wrapper.getMethod("getScope", new Class[0]); // NPE will occur here if the scope
* is not yet set on the application adapter IScope scope = (IScope) getScope.invoke(null, new Object[0]); if (DEBUG) { System.out.printf("scope: %s%n", scope); } contextName =
* scope.getName(); } } catch (Exception cce) { //cclog.warn("Exception {}", e); } } else { // if the incoming class is a servlet we lookup the context name boolean
* servletScopeAware = Servlet.class.isAssignableFrom(caller); if (DEBUG) { System.out.printf("servletScopeAware: %s%n", servletScopeAware); } if (servletScopeAware) { try { Class
* wrapper = null; if ((wrapper = caller.asSubclass(Servlet.class)) != null) { //ServletConfig getServletConfig Method getServletConfig = wrapper.getMethod("getServletConfig", new
* Class[0]); // NPE will occur here if the scope is not yet set on the application adapter ServletConfig config = (ServletConfig) getServletConfig.invoke(null, new Object[0]); if
* (DEBUG) { System.out.printf("config: %s%n", config); } contextName = config.getServletContext().getContextPath().replaceAll("/", ""); if ("".equals(contextName)) { contextName =
* "root"; } } } catch (Exception cce) { //cclog.warn("Exception {}", e); } } else { // route the Launcher entries to the correct context String[] parts =
* Thread.currentThread().getName().split("Loader:/"); if (parts.length > 1) { contextName = parts[1]; } else { contextName = CoreConstants.DEFAULT_CONTEXT_NAME; } } } logger =
* Red5LoggerFactory.getLogger(clazz, contextName);
*/
}
if (logger == null) {
logger = LoggerFactory.getLogger(clazz);
}
return logger;
}

@SuppressWarnings({ "rawtypes" })
public static Logger getLogger(Class clazz, String contextName) {
public static Logger getLogger(Class<?> clazz, String contextName) {
return getLogger(clazz.getName(), contextName);
}

Expand All @@ -97,22 +91,22 @@ public static Logger getLogger(String name, String contextName) {
contextName = CoreConstants.DEFAULT_CONTEXT_NAME;
}
try {
ContextSelector selector = Red5LoggerFactory.getContextSelector();
// get the context for the given context name or default if null
LoggerContext context = selector.getLoggerContext(contextName);
LoggerContext context = contextSelector.getLoggerContext(contextName);
// and if we get here, fall back to the default context
if (context == null) {
System.err.printf("No context named %s was found!!%n", contextName);
}
// get the logger from the context or default context
if (context != null) {
logger = context.getLogger(name);
// System.out.printf("Application name: %s in context: %s%n", context.getProperty(KEY_APP_NAME), contextName);
if (DEBUG) {
rootLogger.debug("Application name: {} in context: {}", context.getProperty(CoreConstants.CONTEXT_NAME_KEY), contextName);
}
}
} catch (Exception e) {
// no logback, use whatever logger is in-place
System.err.printf("Exception %s%n", e.getMessage());
e.printStackTrace();
rootLogger.error("Exception {}", e);
}
}
if (logger == null) {
Expand All @@ -122,26 +116,26 @@ public static Logger getLogger(String name, String contextName) {
}

public static ContextSelector getContextSelector() {
ContextSelector selector = null;
if (useLogback) {
ContextSelectorStaticBinder contextSelectorBinder = ContextSelectorStaticBinder.getSingleton();
ContextSelector selector = contextSelectorBinder.getContextSelector();
selector = contextSelectorBinder.getContextSelector();
if (selector == null) {
if (DEBUG) {
System.err.println("Context selector was null, creating default context");
rootLogger.error("Context selector was null, creating default context");
}
LoggerContext defaultLoggerContext = new LoggerContext();
defaultLoggerContext.setName(CoreConstants.DEFAULT_CONTEXT_NAME);
try {
contextSelectorBinder.init(defaultLoggerContext, null);
selector = contextSelectorBinder.getContextSelector();
rootLogger.debug("Context selector: {}", selector.getClass().getName());
} catch (Exception e) {
e.printStackTrace();
rootLogger.error("Exception {}", e);
}
}
//System.out.printf("Context selector: %s%n", selector.getClass().getName());
return selector;
}
return null;
return selector;
}

public static void setUseLogback(boolean useLogback) {
Expand Down
4 changes: 2 additions & 2 deletions common/src/main/java/org/red5/server/api/Red5.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ public final class Red5 {
/**
* Server version with revision
*/
public static final String VERSION = "Red5 Server 1.3.29";
public static final String VERSION = "Red5 Server 1.3.30";

/**
* Server version for fmsVer requests
*/
public static final String FMS_VERSION = "RED5/1,3,29,0";
public static final String FMS_VERSION = "RED5/1,3,30,0";

/**
* Server capabilities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1483,10 +1483,9 @@ public void onSuccess(Packet packet) {
}
log.info("Rejected task: {}", task);
} catch (Throwable e) {
log.error("Incoming message failed task: {}", task, e);
log.warn("Incoming message failed task: {}", task, e);
if (isDebug) {
log.debug("Execution rejected on {} - {}", getSessionId(), RTMP.states[getStateCode()]);
log.debug("Lock permits - decode: {} encode: {}", decoderLock.availablePermits(), encoderLock.availablePermits());
log.debug("Execution rejected on {} - {} lock permits - decode: {} encode: {}", getSessionId(), RTMP.states[getStateCode()], decoderLock.availablePermits(), encoderLock.availablePermits());
}
}
}
Expand Down
Loading

0 comments on commit c40dfa7

Please sign in to comment.