Skip to content

Commit

Permalink
New version 1.2.26; fixes mostly around WebSocket handling
Browse files Browse the repository at this point in the history
  • Loading branch information
mondain committed Jun 28, 2022
1 parent fed1648 commit e0926e9
Show file tree
Hide file tree
Showing 15 changed files with 141 additions and 60 deletions.
2 changes: 1 addition & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.2.25</version>
<version>1.2.26</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-client</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion client/src/main/java/org/red5/client/Red5Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public final class Red5Client {
/**
* Current server version with revision
*/
public static final String VERSION = "Red5 Client 1.2.25";
public static final String VERSION = "Red5 Client 1.2.26";

/**
* Create a new Red5Client object using the connection local to the current thread A bit of magic that lets you access the red5 scope
Expand Down
2 changes: 1 addition & 1 deletion common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.2.25</version>
<version>1.2.26</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-server-common</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions common/src/main/java/org/red5/server/api/Red5.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ public final class Red5 {
/**
* Server version with revision
*/
public static final String VERSION = "Red5 Server 1.2.25";
public static final String VERSION = "Red5 Server 1.2.26";

/**
* Server version for fmsVer requests
*/
public static final String FMS_VERSION = "RED5/1,2,25,0";
public static final String FMS_VERSION = "RED5/1,2,26,0";

/**
* Server capabilities
Expand Down
2 changes: 1 addition & 1 deletion io/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.2.25</version>
<version>1.2.26</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-io</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<name>Red5</name>
<description>The Red5 server</description>
<groupId>org.red5</groupId>
<version>1.2.25</version>
<version>1.2.26</version>
<url>https://github.com/Red5/red5-server</url>
<inceptionYear>2005</inceptionYear>
<organization>
Expand Down
2 changes: 1 addition & 1 deletion server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.2.25</version>
<version>1.2.26</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-server</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -60,7 +59,13 @@ public class WebSocketConnection extends AttributeStore {
// associated websocket session
private final WsSession wsSession;

private String httpSessionId;
private final WebSocketScope scope;

// unique identifier for the session
private final String wsSessionId;

// unique identifier for this instance based upon the websocket session id
private final int hashCode;

private String host;

Expand Down Expand Up @@ -96,12 +101,16 @@ public class WebSocketConnection extends AttributeStore {
private final static AtomicLongFieldUpdater<WebSocketConnection> writeUpdater = AtomicLongFieldUpdater.newUpdater(WebSocketConnection.class, "writtenBytes");

public WebSocketConnection(WebSocketScope scope, Session session) {
// set the scope for ease of use later
this.scope = scope;
// set our path
path = scope.getPath();
// cast ws session
this.wsSession = (WsSession) session;
// set the local session id
httpSessionId = Optional.ofNullable(wsSession.getHttpSessionId()).orElse(wsSession.getId());
// the websocket session id will be used for hash code comparison, its the only usable value currently
wsSessionId = wsSession.getId();
hashCode = Integer.valueOf(wsSessionId);
log.info("wsSessionId: {}", wsSessionId);
// get extensions
wsSession.getNegotiatedExtensions().forEach(extension -> {
if (extensions == null) {
Expand Down Expand Up @@ -259,26 +268,24 @@ public void sendPong(byte[] buf) throws IllegalArgumentException, IOException {
public void close() {
if (connected.compareAndSet(true, false)) {
// TODO disconnect from scope etc...
scope.removeConnection(this);
// normal close
if (wsSession.isOpen()) {
try {
wsSession.close();
} catch (IOException e) {
} catch (Exception e) {
}
}
}
}

public long getReadBytes() {
return readBytes;
}

public void updateReadBytes(long read) {
readUpdater.addAndGet(this, read);
}

public long getWrittenBytes() {
return writtenBytes;
/**
* Return the WebSocketScope to which we're connected/connecting.
*
* @return WebSocketScope
*/
public WebSocketScope getScope() {
return scope;
}

/**
Expand Down Expand Up @@ -357,25 +364,29 @@ public void setPath(String path) {
* @return sessionId
*/
public String getSessionId() {
return wsSession.getId();
return wsSessionId;
}

/**
* Sets / overrides this connections HttpSession id.
*
* @param httpSessionId
* @deprecated Session id read from WSSession
*/
@Deprecated(since = "1.2.26")
public void setHttpSessionId(String httpSessionId) {
this.httpSessionId = httpSessionId;
//this.httpSessionId = httpSessionId;
}

/**
* Returns the HttpSession id associated with this connection.
*
* @return sessionId
* @deprecated Session id read from WSSession
*/
@Deprecated(since = "1.2.26")
public String getHttpSessionId() {
return httpSessionId;
return wsSessionId;
}

/**
Expand Down Expand Up @@ -526,9 +537,21 @@ public WsSession getWsSession() {
return wsSession;
}

public long getReadBytes() {
return readBytes;
}

public void updateReadBytes(long read) {
readUpdater.addAndGet(this, read);
}

public long getWrittenBytes() {
return writtenBytes;
}

@Override
public int hashCode() {
return Objects.hash(httpSessionId);
return hashCode;
}

@Override
Expand All @@ -540,16 +563,16 @@ public boolean equals(Object obj) {
if (getClass() != obj.getClass())
return false;
WebSocketConnection other = (WebSocketConnection) obj;
return Objects.equals(httpSessionId, other.httpSessionId);
return hashCode == other.hashCode();
}

@Override
public String toString() {
if (wsSession != null && connected.get()) {
return "WebSocketConnection [wsId=" + wsSession.getId() + ", sessionId=" + httpSessionId + ", host=" + host + ", origin=" + origin + ", path=" + path + ", secure=" + isSecure() + ", connected=" + connected + "]";
if (wsSessionId != null) {
return "WebSocketConnection [wsId=" + wsSessionId + ", host=" + host + ", origin=" + origin + ", path=" + path + ", secure=" + isSecure() + ", connected=" + connected + "]";
}
if (wsSession == null) {
return "WebSocketConnection [wsId=not-set, sessionId=not-set, host=" + host + ", origin=" + origin + ", path=" + path + ", secure=not-set, connected=" + connected + "]";
return "WebSocketConnection [wsId=not-set, host=" + host + ", origin=" + origin + ", path=" + path + ", secure=not-set, connected=" + connected + "]";
}
return "WebSocketConnection [host=" + host + ", origin=" + origin + ", path=" + path + " connected=false]";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.red5.server.api.listeners.IScopeListener;
import org.red5.server.api.scope.IScope;
import org.red5.server.api.scope.ScopeType;
import org.red5.server.plugin.PluginRegistry;
import org.red5.server.plugin.Red5Plugin;
import org.red5.server.util.ScopeUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -129,6 +130,7 @@ public void notifyScopeRemoved(IScope scope) {
@Override
public void doStop() throws Exception {
log.trace("WebSocketPlugin stop");
PluginRegistry.unregister(this);
managerMap.entrySet().forEach(entry -> {
entry.getValue().stop();
});
Expand Down Expand Up @@ -264,8 +266,8 @@ public WebSocketScopeManager getManager(String path) {
if (parts.length > 1) {
// skip default in a path if it exists in slot #1
String name = !"default".equals(parts[1]) ? parts[1] : ((parts.length >= 3) ? parts[2] : parts[1]);
if (log.isDebugEnabled()) {
log.debug("Managers: {}", managerMap.entrySet());
if (log.isTraceEnabled()) {
log.trace("Managers: {}", managerMap.entrySet());
}
for (Entry<IScope, WebSocketScopeManager> entry : managerMap.entrySet()) {
IScope appScope = entry.getKey();
Expand Down Expand Up @@ -410,7 +412,7 @@ public static ServerContainer getWsServerContainerInstance(ServletContext servle
try {
// locate the endpoint class
Class<?> endpointClass = Class.forName(wsEndpointClass);
log.debug("startWebSocket - endpointPath: {} endpointClass: {}", path, endpointClass);
log.debug("startWebSocket - endpointPath: {} endpointClass: {}", path, wsEndpointClass);
// build an endpoint config
ServerEndpointConfig serverEndpointConfig = ServerEndpointConfig.Builder.create(endpointClass, path).configurator(configurator).subprotocols(subProtocols).build();
// set the endpoint on the server container
Expand Down
49 changes: 36 additions & 13 deletions server/src/main/java/org/red5/net/websocket/WebSocketScope.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.red5.net.websocket.model.WSMessage;
import org.red5.server.api.scope.IScope;
import org.red5.server.plugin.PluginRegistry;
import org.red5.server.util.ScopeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
Expand All @@ -31,6 +32,8 @@ public class WebSocketScope implements InitializingBean, DisposableBean {

private static Logger log = LoggerFactory.getLogger(WebSocketScope.class);

private WebSocketScopeManager manager;

protected CopyOnWriteArraySet<WebSocketConnection> conns = new CopyOnWriteArraySet<>();

protected CopyOnWriteArraySet<IWebSocketDataListener> listeners = new CopyOnWriteArraySet<>();
Expand Down Expand Up @@ -63,7 +66,8 @@ public void destroy() throws Exception {
*/
public void register() {
log.info("Application scope: {}", scope);
WebSocketScopeManager manager = ((WebSocketPlugin) PluginRegistry.getPlugin(WebSocketPlugin.NAME)).getManager(scope);
// get the manager on registration and keep a reference locally
manager = ((WebSocketPlugin) PluginRegistry.getPlugin(WebSocketPlugin.NAME)).getManager(scope);
if (manager.setApplication(scope)) {
log.info("WebSocket app added: {}", scope.getName());
}
Expand All @@ -76,6 +80,12 @@ public void register() {
* Un-registers from the WebSocketScopeManager.
*/
public void unregister() {
// remove app scope registration only if we're an app scope
if (ScopeUtils.isApp(scope)) {
manager.removeApplication(scope);
}
// remove ourself
manager.removeWebSocketScope(this);
// clean up the connections by first closing them
conns.forEach(conn -> {
conn.close();
Expand All @@ -95,7 +105,8 @@ public void unregister() {
* @return WebSocketConnection for the given id or null if not found
*/
public WebSocketConnection getConnectionBySessionId(String id) {
Optional<WebSocketConnection> opt = conns.stream().filter(conn -> id.equals(conn.getHttpSessionId())).findFirst();
log.debug("getConnectionBySessionId: {}", id);
Optional<WebSocketConnection> opt = conns.stream().filter(conn -> id.equals(conn.getSessionId())).findFirst();
if (opt.isPresent()) {
return opt.get();
}
Expand Down Expand Up @@ -155,12 +166,18 @@ public String getPath() {
* @param conn WebSocketConnection
*/
public void addConnection(WebSocketConnection conn) {
if (conns.add(conn)) {
for (IWebSocketDataListener listener : listeners) {
listener.onWSConnect(conn);
// prevent false failed logging when a connection is already registered
if (!conns.contains(conn)) {
if (conns.add(conn)) {
log.debug("Added connection: {}", conn);
for (IWebSocketDataListener listener : listeners) {
listener.onWSConnect(conn);
}
} else {
log.warn("Add connection failed for: {}", conn);
}
} else {
log.warn("Add connection failed for: {}", conn);
log.debug("Add connection skipped, already registered: {}", conn);
}
}

Expand All @@ -170,12 +187,18 @@ public void addConnection(WebSocketConnection conn) {
* @param conn WebSocketConnection
*/
public void removeConnection(WebSocketConnection conn) {
if (conns.remove(conn)) {
for (IWebSocketDataListener listener : listeners) {
listener.onWSDisconnect(conn);
// prevent false failed logging when a connection isnt registered
if (conns.contains(conn)) {
if (conns.remove(conn)) {
log.debug("Removed connection: {}", conn);
for (IWebSocketDataListener listener : listeners) {
listener.onWSDisconnect(conn);
}
} else {
log.warn("Remove connection failed for: {}", conn);
}
} else {
log.warn("Remove connection failed for: {}", conn);
log.debug("Remove connection skipped, not registered: {}", conn);
}
}

Expand All @@ -185,7 +208,7 @@ public void removeConnection(WebSocketConnection conn) {
* @param listener IWebSocketDataListener
*/
public void addListener(IWebSocketDataListener listener) {
log.info("addListener: {}", listener);
log.debug("addListener to {}: {}", path, listener);
listeners.add(listener);
}

Expand All @@ -195,7 +218,7 @@ public void addListener(IWebSocketDataListener listener) {
* @param listener IWebSocketDataListener
*/
public void removeListener(IWebSocketDataListener listener) {
log.info("removeListener: {}", listener);
log.debug("removeListener from {}: {}", path, listener);
listeners.remove(listener);
}

Expand All @@ -206,7 +229,7 @@ public void removeListener(IWebSocketDataListener listener) {
* list of IWebSocketDataListener
*/
public void setListeners(Collection<IWebSocketDataListener> listeners) {
log.trace("setListeners: {}", listeners);
log.trace("setListeners on {}: {}", path, listeners);
this.listeners.addAll(listeners);
}

Expand Down
Loading

0 comments on commit e0926e9

Please sign in to comment.