Skip to content

Commit

Permalink
Added stream alias support and special char handling
Browse files Browse the repository at this point in the history
  • Loading branch information
mondain committed Mar 18, 2022
1 parent 45177cf commit 6968923
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
package org.red5.server.api.stream;

import java.util.Map;
import java.util.Set;

import org.red5.server.api.statistics.IClientBroadcastStreamStatistics;

/**
Expand All @@ -22,28 +24,65 @@ public interface IClientBroadcastStream extends IClientStream, IBroadcastStream
/**
* Notify client that stream is ready for publishing.
*/
public void startPublishing();
void startPublishing();

/**
* Return statistics about the stream.
*
* @return statistics
*/
public IClientBroadcastStreamStatistics getStatistics();
IClientBroadcastStreamStatistics getStatistics();

/**
* Sets streaming parameters as supplied by the publishing application.
*
* @param params
* parameter map
*/
public void setParameters(Map<String, String> params);
void setParameters(Map<String, String> params);

/**
* Returns streaming parameters.
*
* @return parameters
*/
public Map<String, String> getParameters();
Map<String, String> getParameters();

/**
* Adds a stream name alias.
*
* @param alias
* @return true if added to the aliases, false otherwise
*/
boolean addAlias(String alias);

/**
* Returns whether or not an alias for this stream exists.
*
* @return true if an alias has been added and false otherwise
*/
boolean hasAlias();

/**
* Returns an alias.
*
* @return alias if at least one exists or null when there are none
*/
String getAlias();

/**
* Returns whether or not a given alias exists.
*
* @param alias
* @return true if found and false otherwise
*/
boolean containsAlias(String alias);

/**
* Returns all the aliases.
*
* @return all aliases for this instance or an empty set
*/
Set<String> getAliases();

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface IMessageOutput {
void pushMessage(IMessage message) throws IOException;

/**
* Connect to a provider. Note that params passed has nothing to deal with NetConnection.connect in client-side Flex/Flash RIA.
* Connect to a provider. Note that params passed has nothing to do with NetConnection.connect in client-side Flex/Flash RIA.
*
* @param provider
* Provider
Expand Down
26 changes: 24 additions & 2 deletions src/main/java/org/red5/server/scope/Scope.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.red5.server.api.scope.ScopeType;
import org.red5.server.api.statistics.IScopeStatistics;
import org.red5.server.api.statistics.support.StatisticsCounter;
import org.red5.server.api.stream.IClientBroadcastStream;
import org.red5.server.exception.ScopeException;
import org.red5.server.jmx.mxbeans.ScopeMXBean;
import org.slf4j.Logger;
Expand Down Expand Up @@ -481,7 +482,23 @@ public IBasicScope getBasicScope(ScopeType type, String name) {
*/
public Set<String> getBasicScopeNames(ScopeType type) {
if (type != null) {
return children.stream().filter(child -> child.getType().equals(type)).map(IBasicScope::getName).collect(Collectors.toSet());
// if its broadcast type then also check aliases
if (type == ScopeType.BROADCAST) {
final Set<String> broadcastNames = new HashSet<>();
Set<IBasicScope> broadcastScopes = children.stream().filter(child -> child.getType().equals(type)).collect(Collectors.toSet());
broadcastScopes.forEach(bs -> {
// add the streams name
broadcastNames.add(bs.getName());
// add any aliases
IClientBroadcastStream stream = ((IBroadcastScope) bs).getClientBroadcastStream();
if (stream != null && stream.hasAlias()) {
broadcastNames.addAll(stream.getAliases());
}
});
return broadcastNames;
} else {
return children.stream().filter(child -> child.getType().equals(type)).map(IBasicScope::getName).collect(Collectors.toSet());
}
}
return getScopeNames();
}
Expand Down Expand Up @@ -1418,7 +1435,12 @@ public IBasicScope getBasicScope(ScopeType type, String name) {
if (ScopeType.UNDEFINED.equals(type)) {
scope = stream().filter(child -> name.equals(child.getName())).findFirst();
} else {
scope = stream().filter(child -> child.getType().equals(type) && name.equals(child.getName())).findFirst();
// if its broadcast type then allow an alias match in addition to the name match
if (type == ScopeType.BROADCAST) {
scope = stream().filter(child -> child.getType().equals(type) && (name.equals(child.getName()) || ((IBroadcastScope) child).getClientBroadcastStream().containsAlias(name))).findFirst();
} else {
scope = stream().filter(child -> child.getType().equals(type) && name.equals(child.getName())).findFirst();
}
}
if (scope.isPresent()) {
return scope.get();
Expand Down
100 changes: 86 additions & 14 deletions src/main/java/org/red5/server/stream/ClientBroadcastStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.time.format.DateTimeFormatter;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -75,6 +76,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.terracotta.statistics.jsr166e.ThreadLocalRandom;

/**
* Represents live stream broadcasted from client. As Flash Media Server, Red5 supports recording mode for live streams, that is,
Expand Down Expand Up @@ -177,13 +179,23 @@ public class ClientBroadcastStream extends AbstractClientStream implements IClie
*/
protected boolean registerJMX = true;

/**
* Stream name aliases for the entire server instance.
*/
protected static CopyOnWriteArraySet<String> localAliases = new CopyOnWriteArraySet<>();

/**
* Stream name aliases for this instance.
*/
protected CopyOnWriteArraySet<String> aliases;

/**
* Check and send notification if necessary
*
* @param event
* Event
*/
private void checkSendNotifications(IEvent event) {
protected void checkSendNotifications(IEvent event) {
IEventListener source = event.getSource();
sendStartNotifications(source);
}
Expand Down Expand Up @@ -221,6 +233,11 @@ public void close() {
// deregister with jmx
unregisterJMX();
setState(StreamState.CLOSED);
// clear our aliases and from local registry
if (aliases != null) {
localAliases.removeAll(aliases);
aliases.clear();
}
}
}

Expand Down Expand Up @@ -404,7 +421,7 @@ public IProvider getProvider() {
* Name that used for publishing. Set at client side when begin to broadcast with NetStream#publish.
*/
public void setPublishedName(String name) {
//log.debug("setPublishedName: {}", name);
log.debug("setPublishedName: {}", name);
// a publish name of "false" is a special case, used when stopping a stream
if (StringUtils.isNotEmpty(name) && !"false".equals(name)) {
this.publishedName = name;
Expand Down Expand Up @@ -475,7 +492,7 @@ public void setRegisterJMX(boolean registerJMX) {
/**
* Notifies handler on stream broadcast close
*/
private void notifyBroadcastClose() {
protected void notifyBroadcastClose() {
final IStreamAwareScopeHandler handler = getStreamAwareHandler();
if (handler != null) {
try {
Expand All @@ -489,7 +506,7 @@ private void notifyBroadcastClose() {
/**
* Notifies handler on stream recording stop
*/
private void notifyRecordingStop() {
protected void notifyRecordingStop() {
IStreamAwareScopeHandler handler = getStreamAwareHandler();
if (handler != null) {
try {
Expand All @@ -503,7 +520,7 @@ private void notifyRecordingStop() {
/**
* Notifies handler on stream broadcast start
*/
private void notifyBroadcastStart() {
protected void notifyBroadcastStart() {
IStreamAwareScopeHandler handler = getStreamAwareHandler();
if (handler != null) {
try {
Expand Down Expand Up @@ -540,7 +557,7 @@ private void notifyBroadcastStart() {
/**
* Send OOB control message with chunk size
*/
private void notifyChunkSize() {
protected void notifyChunkSize() {
if (chunkSize > 0 && livePipe != null) {
OOBControlMessage setChunkSize = new OOBControlMessage();
setChunkSize.setTarget("ConnectionConsumer");
Expand Down Expand Up @@ -721,7 +738,7 @@ public void saveAs(String name, boolean isAppend) throws IOException {
/**
* Sends publish start notifications
*/
private void sendPublishStartNotify() {
protected void sendPublishStartNotify() {
Status publishStatus = new Status(StatusCodes.NS_PUBLISH_START);
publishStatus.setClientid(getStreamId());
publishStatus.setDetails(getPublishedName());
Expand All @@ -735,7 +752,7 @@ private void sendPublishStartNotify() {
/**
* Sends publish stop notifications
*/
private void sendPublishStopNotify() {
protected void sendPublishStopNotify() {
Status stopStatus = new Status(StatusCodes.NS_UNPUBLISHED_SUCCESS);
stopStatus.setClientid(getStreamId());
stopStatus.setDetails(getPublishedName());
Expand All @@ -749,7 +766,7 @@ private void sendPublishStopNotify() {
/**
* Sends record failed notifications
*/
private void sendRecordFailedNotify(String reason) {
protected void sendRecordFailedNotify(String reason) {
Status failedStatus = new Status(StatusCodes.NS_RECORD_FAILED);
failedStatus.setLevel(Status.ERROR);
failedStatus.setClientid(getStreamId());
Expand All @@ -764,7 +781,7 @@ private void sendRecordFailedNotify(String reason) {
/**
* Sends record start notifications
*/
private void sendRecordStartNotify() {
protected void sendRecordStartNotify() {
Status recordStatus = new Status(StatusCodes.NS_RECORD_START);
recordStatus.setClientid(getStreamId());
recordStatus.setDetails(getPublishedName());
Expand All @@ -777,7 +794,7 @@ private void sendRecordStartNotify() {
/**
* Sends record stop notifications
*/
private void sendRecordStopNotify() {
protected void sendRecordStopNotify() {
Status stopStatus = new Status(StatusCodes.NS_RECORD_STOP);
stopStatus.setClientid(getStreamId());
stopStatus.setDetails(getPublishedName());
Expand Down Expand Up @@ -805,7 +822,7 @@ protected void pushMessage(StatusMessage msg) {
}
}

private void sendStartNotifications(IEventListener source) {
protected void sendStartNotifications(IEventListener source) {
if (sendStartNotification) {
// notify handler that stream starts recording/publishing
sendStartNotification = false;
Expand Down Expand Up @@ -945,7 +962,8 @@ protected void registerJMX() {
// register with jmx
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try {
ObjectName oName = new ObjectName(String.format("org.red5.server:type=ClientBroadcastStream,scope=%s,publishedName=%s", getScope().getName(), publishedName));
// replace any colons with pipes as they are invalid characters for jmx object names
ObjectName oName = new ObjectName(String.format("org.red5.server:type=ClientBroadcastStream,scope=%s,publishedName=%s", getScope().getName(), publishedName.replaceAll(":", "|")));
mbs.registerMBean(new StandardMBean(this, ClientBroadcastStreamMXBean.class, true), oName);
} catch (InstanceAlreadyExistsException e) {
log.debug("Instance already registered", e);
Expand All @@ -960,7 +978,8 @@ protected void unregisterJMX() {
if (StringUtils.isNotEmpty(publishedName) && !"false".equals(publishedName)) {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try {
ObjectName oName = new ObjectName(String.format("org.red5.server:type=ClientBroadcastStream,scope=%s,publishedName=%s", getScope().getName(), publishedName));
// replace any colons with pipes as they are invalid characters for jmx object names
ObjectName oName = new ObjectName(String.format("org.red5.server:type=ClientBroadcastStream,scope=%s,publishedName=%s", getScope().getName(), publishedName.replaceAll(":", "|")));
mbs.unregisterMBean(oName);
} catch (Exception e) {
log.warn("Exception unregistering", e);
Expand All @@ -969,4 +988,57 @@ protected void unregisterJMX() {
}
}

@Override
public boolean addAlias(String alias) {
log.debug("Adding alias: {}", alias);
if (aliases == null) {
aliases = new CopyOnWriteArraySet<>();
}
// check local registry first then attempt the add
if (!localAliases.contains(alias) && aliases.add(alias)) {
return true;
}
return false;
}

@Override
public boolean hasAlias() {
if (aliases != null && !aliases.isEmpty()) {
return true;
}
return false;
}

@Override
public String getAlias() {
String alias = null;
if (hasAlias()) {
int bound = aliases.size();
if (bound > 1) {
int index = ThreadLocalRandom.current().nextInt(bound);
alias = aliases.stream().skip(index - 1).findFirst().get();
} else {
alias = aliases.stream().findFirst().get();
}
log.debug("Returning alias: {}", alias);
}
return alias;
}

@Override
public boolean containsAlias(String alias) {
if (aliases != null && !aliases.isEmpty()) {
return aliases.contains(alias);
}
return false;
}

@Override
public Set<String> getAliases() {
if (aliases != null) {
return aliases;
}
return Collections.emptySet();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class PlaylistSubscriberStream extends AbstractClientStream implements IP
/**
* Default playlist controller
*/
private IPlaylistController defaultController;
private IPlaylistController defaultController = new SimplePlaylistController();

/**
* Playlist items
Expand Down Expand Up @@ -122,7 +122,6 @@ public class PlaylistSubscriberStream extends AbstractClientStream implements IP

/** Constructs a new PlaylistSubscriberStream. */
public PlaylistSubscriberStream() {
defaultController = new SimplePlaylistController();
}

/**
Expand Down
Loading

0 comments on commit 6968923

Please sign in to comment.