From 69689230876a6d4f23264d1bf774610e7cd20697 Mon Sep 17 00:00:00 2001 From: Paul Gregoire Date: Fri, 18 Mar 2022 12:13:31 -0700 Subject: [PATCH] Added stream alias support and special char handling --- .../api/stream/IClientBroadcastStream.java | 47 +++++++- .../red5/server/messaging/IMessageOutput.java | 2 +- .../java/org/red5/server/scope/Scope.java | 26 ++++- .../server/stream/ClientBroadcastStream.java | 100 +++++++++++++++--- .../stream/PlaylistSubscriberStream.java | 3 +- .../org/red5/server/stream/StreamService.java | 43 +++++++- 6 files changed, 196 insertions(+), 25 deletions(-) diff --git a/src/main/java/org/red5/server/api/stream/IClientBroadcastStream.java b/src/main/java/org/red5/server/api/stream/IClientBroadcastStream.java index 5d0d9159..fa725e59 100644 --- a/src/main/java/org/red5/server/api/stream/IClientBroadcastStream.java +++ b/src/main/java/org/red5/server/api/stream/IClientBroadcastStream.java @@ -8,6 +8,8 @@ package org.red5.server.api.stream; import java.util.Map; +import java.util.Set; + import org.red5.server.api.statistics.IClientBroadcastStreamStatistics; /** @@ -22,14 +24,14 @@ public interface IClientBroadcastStream extends IClientStream, IBroadcastStream /** * Notify client that stream is ready for publishing. */ - public void startPublishing(); + void startPublishing(); /** * Return statistics about the stream. * * @return statistics */ - public IClientBroadcastStreamStatistics getStatistics(); + IClientBroadcastStreamStatistics getStatistics(); /** * Sets streaming parameters as supplied by the publishing application. @@ -37,13 +39,50 @@ public interface IClientBroadcastStream extends IClientStream, IBroadcastStream * @param params * parameter map */ - public void setParameters(Map params); + void setParameters(Map params); /** * Returns streaming parameters. * * @return parameters */ - public Map getParameters(); + Map getParameters(); + + /** + * Adds a stream name alias. + * + * @param alias + * @return true if added to the aliases, false otherwise + */ + boolean addAlias(String alias); + + /** + * Returns whether or not an alias for this stream exists. + * + * @return true if an alias has been added and false otherwise + */ + boolean hasAlias(); + + /** + * Returns an alias. + * + * @return alias if at least one exists or null when there are none + */ + String getAlias(); + + /** + * Returns whether or not a given alias exists. + * + * @param alias + * @return true if found and false otherwise + */ + boolean containsAlias(String alias); + + /** + * Returns all the aliases. + * + * @return all aliases for this instance or an empty set + */ + Set getAliases(); } diff --git a/src/main/java/org/red5/server/messaging/IMessageOutput.java b/src/main/java/org/red5/server/messaging/IMessageOutput.java index a9eb9c08..c57e9efc 100644 --- a/src/main/java/org/red5/server/messaging/IMessageOutput.java +++ b/src/main/java/org/red5/server/messaging/IMessageOutput.java @@ -29,7 +29,7 @@ public interface IMessageOutput { void pushMessage(IMessage message) throws IOException; /** - * Connect to a provider. Note that params passed has nothing to deal with NetConnection.connect in client-side Flex/Flash RIA. + * Connect to a provider. Note that params passed has nothing to do with NetConnection.connect in client-side Flex/Flash RIA. * * @param provider * Provider diff --git a/src/main/java/org/red5/server/scope/Scope.java b/src/main/java/org/red5/server/scope/Scope.java index 9ad1c8fa..46635466 100644 --- a/src/main/java/org/red5/server/scope/Scope.java +++ b/src/main/java/org/red5/server/scope/Scope.java @@ -46,6 +46,7 @@ import org.red5.server.api.scope.ScopeType; import org.red5.server.api.statistics.IScopeStatistics; import org.red5.server.api.statistics.support.StatisticsCounter; +import org.red5.server.api.stream.IClientBroadcastStream; import org.red5.server.exception.ScopeException; import org.red5.server.jmx.mxbeans.ScopeMXBean; import org.slf4j.Logger; @@ -481,7 +482,23 @@ public IBasicScope getBasicScope(ScopeType type, String name) { */ public Set getBasicScopeNames(ScopeType type) { if (type != null) { - return children.stream().filter(child -> child.getType().equals(type)).map(IBasicScope::getName).collect(Collectors.toSet()); + // if its broadcast type then also check aliases + if (type == ScopeType.BROADCAST) { + final Set broadcastNames = new HashSet<>(); + Set broadcastScopes = children.stream().filter(child -> child.getType().equals(type)).collect(Collectors.toSet()); + broadcastScopes.forEach(bs -> { + // add the streams name + broadcastNames.add(bs.getName()); + // add any aliases + IClientBroadcastStream stream = ((IBroadcastScope) bs).getClientBroadcastStream(); + if (stream != null && stream.hasAlias()) { + broadcastNames.addAll(stream.getAliases()); + } + }); + return broadcastNames; + } else { + return children.stream().filter(child -> child.getType().equals(type)).map(IBasicScope::getName).collect(Collectors.toSet()); + } } return getScopeNames(); } @@ -1418,7 +1435,12 @@ public IBasicScope getBasicScope(ScopeType type, String name) { if (ScopeType.UNDEFINED.equals(type)) { scope = stream().filter(child -> name.equals(child.getName())).findFirst(); } else { - scope = stream().filter(child -> child.getType().equals(type) && name.equals(child.getName())).findFirst(); + // if its broadcast type then allow an alias match in addition to the name match + if (type == ScopeType.BROADCAST) { + scope = stream().filter(child -> child.getType().equals(type) && (name.equals(child.getName()) || ((IBroadcastScope) child).getClientBroadcastStream().containsAlias(name))).findFirst(); + } else { + scope = stream().filter(child -> child.getType().equals(type) && name.equals(child.getName())).findFirst(); + } } if (scope.isPresent()) { return scope.get(); diff --git a/src/main/java/org/red5/server/stream/ClientBroadcastStream.java b/src/main/java/org/red5/server/stream/ClientBroadcastStream.java index b9896749..cab9f090 100644 --- a/src/main/java/org/red5/server/stream/ClientBroadcastStream.java +++ b/src/main/java/org/red5/server/stream/ClientBroadcastStream.java @@ -16,6 +16,7 @@ import java.time.format.DateTimeFormatter; import java.util.Calendar; import java.util.Collection; +import java.util.Collections; import java.util.GregorianCalendar; import java.util.HashMap; import java.util.Map; @@ -75,6 +76,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jmx.export.annotation.ManagedResource; +import org.terracotta.statistics.jsr166e.ThreadLocalRandom; /** * Represents live stream broadcasted from client. As Flash Media Server, Red5 supports recording mode for live streams, that is, @@ -177,13 +179,23 @@ public class ClientBroadcastStream extends AbstractClientStream implements IClie */ protected boolean registerJMX = true; + /** + * Stream name aliases for the entire server instance. + */ + protected static CopyOnWriteArraySet localAliases = new CopyOnWriteArraySet<>(); + + /** + * Stream name aliases for this instance. + */ + protected CopyOnWriteArraySet aliases; + /** * Check and send notification if necessary * * @param event * Event */ - private void checkSendNotifications(IEvent event) { + protected void checkSendNotifications(IEvent event) { IEventListener source = event.getSource(); sendStartNotifications(source); } @@ -221,6 +233,11 @@ public void close() { // deregister with jmx unregisterJMX(); setState(StreamState.CLOSED); + // clear our aliases and from local registry + if (aliases != null) { + localAliases.removeAll(aliases); + aliases.clear(); + } } } @@ -404,7 +421,7 @@ public IProvider getProvider() { * Name that used for publishing. Set at client side when begin to broadcast with NetStream#publish. */ public void setPublishedName(String name) { - //log.debug("setPublishedName: {}", name); + log.debug("setPublishedName: {}", name); // a publish name of "false" is a special case, used when stopping a stream if (StringUtils.isNotEmpty(name) && !"false".equals(name)) { this.publishedName = name; @@ -475,7 +492,7 @@ public void setRegisterJMX(boolean registerJMX) { /** * Notifies handler on stream broadcast close */ - private void notifyBroadcastClose() { + protected void notifyBroadcastClose() { final IStreamAwareScopeHandler handler = getStreamAwareHandler(); if (handler != null) { try { @@ -489,7 +506,7 @@ private void notifyBroadcastClose() { /** * Notifies handler on stream recording stop */ - private void notifyRecordingStop() { + protected void notifyRecordingStop() { IStreamAwareScopeHandler handler = getStreamAwareHandler(); if (handler != null) { try { @@ -503,7 +520,7 @@ private void notifyRecordingStop() { /** * Notifies handler on stream broadcast start */ - private void notifyBroadcastStart() { + protected void notifyBroadcastStart() { IStreamAwareScopeHandler handler = getStreamAwareHandler(); if (handler != null) { try { @@ -540,7 +557,7 @@ private void notifyBroadcastStart() { /** * Send OOB control message with chunk size */ - private void notifyChunkSize() { + protected void notifyChunkSize() { if (chunkSize > 0 && livePipe != null) { OOBControlMessage setChunkSize = new OOBControlMessage(); setChunkSize.setTarget("ConnectionConsumer"); @@ -721,7 +738,7 @@ public void saveAs(String name, boolean isAppend) throws IOException { /** * Sends publish start notifications */ - private void sendPublishStartNotify() { + protected void sendPublishStartNotify() { Status publishStatus = new Status(StatusCodes.NS_PUBLISH_START); publishStatus.setClientid(getStreamId()); publishStatus.setDetails(getPublishedName()); @@ -735,7 +752,7 @@ private void sendPublishStartNotify() { /** * Sends publish stop notifications */ - private void sendPublishStopNotify() { + protected void sendPublishStopNotify() { Status stopStatus = new Status(StatusCodes.NS_UNPUBLISHED_SUCCESS); stopStatus.setClientid(getStreamId()); stopStatus.setDetails(getPublishedName()); @@ -749,7 +766,7 @@ private void sendPublishStopNotify() { /** * Sends record failed notifications */ - private void sendRecordFailedNotify(String reason) { + protected void sendRecordFailedNotify(String reason) { Status failedStatus = new Status(StatusCodes.NS_RECORD_FAILED); failedStatus.setLevel(Status.ERROR); failedStatus.setClientid(getStreamId()); @@ -764,7 +781,7 @@ private void sendRecordFailedNotify(String reason) { /** * Sends record start notifications */ - private void sendRecordStartNotify() { + protected void sendRecordStartNotify() { Status recordStatus = new Status(StatusCodes.NS_RECORD_START); recordStatus.setClientid(getStreamId()); recordStatus.setDetails(getPublishedName()); @@ -777,7 +794,7 @@ private void sendRecordStartNotify() { /** * Sends record stop notifications */ - private void sendRecordStopNotify() { + protected void sendRecordStopNotify() { Status stopStatus = new Status(StatusCodes.NS_RECORD_STOP); stopStatus.setClientid(getStreamId()); stopStatus.setDetails(getPublishedName()); @@ -805,7 +822,7 @@ protected void pushMessage(StatusMessage msg) { } } - private void sendStartNotifications(IEventListener source) { + protected void sendStartNotifications(IEventListener source) { if (sendStartNotification) { // notify handler that stream starts recording/publishing sendStartNotification = false; @@ -945,7 +962,8 @@ protected void registerJMX() { // register with jmx MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { - ObjectName oName = new ObjectName(String.format("org.red5.server:type=ClientBroadcastStream,scope=%s,publishedName=%s", getScope().getName(), publishedName)); + // replace any colons with pipes as they are invalid characters for jmx object names + ObjectName oName = new ObjectName(String.format("org.red5.server:type=ClientBroadcastStream,scope=%s,publishedName=%s", getScope().getName(), publishedName.replaceAll(":", "|"))); mbs.registerMBean(new StandardMBean(this, ClientBroadcastStreamMXBean.class, true), oName); } catch (InstanceAlreadyExistsException e) { log.debug("Instance already registered", e); @@ -960,7 +978,8 @@ protected void unregisterJMX() { if (StringUtils.isNotEmpty(publishedName) && !"false".equals(publishedName)) { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { - ObjectName oName = new ObjectName(String.format("org.red5.server:type=ClientBroadcastStream,scope=%s,publishedName=%s", getScope().getName(), publishedName)); + // replace any colons with pipes as they are invalid characters for jmx object names + ObjectName oName = new ObjectName(String.format("org.red5.server:type=ClientBroadcastStream,scope=%s,publishedName=%s", getScope().getName(), publishedName.replaceAll(":", "|"))); mbs.unregisterMBean(oName); } catch (Exception e) { log.warn("Exception unregistering", e); @@ -969,4 +988,57 @@ protected void unregisterJMX() { } } + @Override + public boolean addAlias(String alias) { + log.debug("Adding alias: {}", alias); + if (aliases == null) { + aliases = new CopyOnWriteArraySet<>(); + } + // check local registry first then attempt the add + if (!localAliases.contains(alias) && aliases.add(alias)) { + return true; + } + return false; + } + + @Override + public boolean hasAlias() { + if (aliases != null && !aliases.isEmpty()) { + return true; + } + return false; + } + + @Override + public String getAlias() { + String alias = null; + if (hasAlias()) { + int bound = aliases.size(); + if (bound > 1) { + int index = ThreadLocalRandom.current().nextInt(bound); + alias = aliases.stream().skip(index - 1).findFirst().get(); + } else { + alias = aliases.stream().findFirst().get(); + } + log.debug("Returning alias: {}", alias); + } + return alias; + } + + @Override + public boolean containsAlias(String alias) { + if (aliases != null && !aliases.isEmpty()) { + return aliases.contains(alias); + } + return false; + } + + @Override + public Set getAliases() { + if (aliases != null) { + return aliases; + } + return Collections.emptySet(); + } + } diff --git a/src/main/java/org/red5/server/stream/PlaylistSubscriberStream.java b/src/main/java/org/red5/server/stream/PlaylistSubscriberStream.java index 998d8a09..686e98c5 100644 --- a/src/main/java/org/red5/server/stream/PlaylistSubscriberStream.java +++ b/src/main/java/org/red5/server/stream/PlaylistSubscriberStream.java @@ -42,7 +42,7 @@ public class PlaylistSubscriberStream extends AbstractClientStream implements IP /** * Default playlist controller */ - private IPlaylistController defaultController; + private IPlaylistController defaultController = new SimplePlaylistController(); /** * Playlist items @@ -122,7 +122,6 @@ public class PlaylistSubscriberStream extends AbstractClientStream implements IP /** Constructs a new PlaylistSubscriberStream. */ public PlaylistSubscriberStream() { - defaultController = new SimplePlaylistController(); } /** diff --git a/src/main/java/org/red5/server/stream/StreamService.java b/src/main/java/org/red5/server/stream/StreamService.java index d23fd2e2..e466ad53 100644 --- a/src/main/java/org/red5/server/stream/StreamService.java +++ b/src/main/java/org/red5/server/stream/StreamService.java @@ -11,7 +11,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.stream.Stream; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; import org.red5.io.utils.ObjectMap; import org.red5.server.BaseConnection; @@ -50,6 +52,16 @@ public class StreamService implements IStreamService { private static Logger log = LoggerFactory.getLogger(StreamService.class); + /** + * Whether or not to strip file / media type prefixing. + */ + protected boolean stripTypePrefix = true; + + /** + * Whether or not to enable stream name aliasing. + */ + protected boolean nameAliasingEnabled = true; + /** * Use to determine playback type. */ @@ -614,7 +626,7 @@ public void publish(String name, String mode) { Map params = null; if (name != null && name.contains("?")) { // read and utilize the query string values - params = new HashMap(); + params = new HashMap<>(); String tmp = name; // check if we start with '?' or not if (name.charAt(0) != '?') { @@ -632,7 +644,12 @@ public void publish(String name, String mode) { // grab the streams name name = name.substring(0, name.indexOf("?")); } - log.debug("publish called with name {} and mode {}", name, mode); + log.debug("publish called with name: {} and mode: {}", name, mode); + // if stripping prefixes, do so here + if (stripTypePrefix) { + name = name.replaceAll("(mp4\\:|f4v\\:)", ""); + log.debug("publish name (updated): {}", name); + } IConnection conn = Red5.getConnectionLocal(); if (conn instanceof IStreamCapableConnection) { IScope scope = conn.getScope(); @@ -679,6 +696,20 @@ public void publish(String name, String mode) { if (params != null) { bs.setParameters(params); } + if (nameAliasingEnabled) { + // if aliasing, check for requested aliases before generating random names + if (params != null && params.containsKey("aliases")) { + // comma separated + Stream.of(params.get("aliases").split(",")).forEach(alias -> bs.addAlias(alias)); + } else { + // generate some random alpha/num aliases; based on cpu count x2 + int aliasCount = Runtime.getRuntime().availableProcessors() * 2; + for (int i = 0; i < aliasCount; i++) { + // generate 8-16 long aliases + bs.addAlias(RandomStringUtils.randomAlphanumeric(8, 16)); + } + } + } IContext context = conn.getScope().getContext(); IProviderService providerService = (IProviderService) context.getBean(IProviderService.BEAN_NAME); // TODO handle registration failure @@ -837,4 +868,12 @@ public static void sendNetStreamStatus(IConnection conn, String statusCode, Stri } } + public void setStripTypePrefix(boolean stripTypePrefix) { + this.stripTypePrefix = stripTypePrefix; + } + + public void setNameAliasingEnabled(boolean nameAliasingEnabled) { + this.nameAliasingEnabled = nameAliasingEnabled; + } + }