diff --git a/pom.xml b/pom.xml index b5feea6a..3d1df2ed 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ org.red5 red5-parent - 1.0.8-M4 + 1.0.8-M5 4.0.0 red5-server-common diff --git a/src/main/java/org/red5/server/messaging/AbstractMessage.java b/src/main/java/org/red5/server/messaging/AbstractMessage.java index 53353ea2..8504f1a9 100644 --- a/src/main/java/org/red5/server/messaging/AbstractMessage.java +++ b/src/main/java/org/red5/server/messaging/AbstractMessage.java @@ -33,7 +33,7 @@ public class AbstractMessage implements IMessage { protected String messageType; - protected Map extraHeaders = null; + protected Map extraHeaders; /** {@inheritDoc} */ public String getMessageID() { diff --git a/src/main/java/org/red5/server/messaging/AbstractPipe.java b/src/main/java/org/red5/server/messaging/AbstractPipe.java index 7a8b2d98..ef472f78 100644 --- a/src/main/java/org/red5/server/messaging/AbstractPipe.java +++ b/src/main/java/org/red5/server/messaging/AbstractPipe.java @@ -68,17 +68,7 @@ public abstract class AbstractPipe implements IPipe { * Consumer * @param paramMap * Parameters passed with connection, used in concrete pipe implementations - * @return
-     * true
-     * 
- * - * if consumer was added, - * - *
-     * false
-     * 
- * - * otherwise + * @return true if consumer was added, false otherwise */ public boolean subscribe(IConsumer consumer, Map paramMap) { // pipe is possibly used by dozens of threads at once (like many subscribers for one server stream) @@ -87,7 +77,6 @@ public boolean subscribe(IConsumer consumer, Map paramMap) { if (success && consumer instanceof IPipeConnectionListener) { listeners.addIfAbsent((IPipeConnectionListener) consumer); } - return success; } @@ -98,17 +87,7 @@ public boolean subscribe(IConsumer consumer, Map paramMap) { * Provider * @param paramMap * Parameters passed with connection, used in concrete pipe implementations - * @return
-     * true
-     * 
- * - * if provider was added, - * - *
-     * false
-     * 
- * - * otherwise + * @return true if provider was added, false otherwise */ public boolean subscribe(IProvider provider, Map paramMap) { boolean success = providers.addIfAbsent(provider); @@ -124,21 +103,11 @@ public boolean subscribe(IProvider provider, Map paramMap) { * * @param provider * Provider that should be removed - * @return
-     * true
-     * 
- * - * on success, - * - *
-     * false
-     * 
- * - * otherwise + * @return true on success, false otherwise */ public boolean unsubscribe(IProvider provider) { if (providers.remove(provider)) { - fireProviderConnectionEvent(provider, PipeConnectionEvent.PROVIDER_DISCONNECT, null); + fireProviderConnectionEvent(provider, PipeConnectionEvent.EventType.PROVIDER_DISCONNECT, null); listeners.remove(provider); return true; } @@ -150,21 +119,11 @@ public boolean unsubscribe(IProvider provider) { * * @param consumer * Consumer that should be removed - * @return
-     * true
-     * 
- * - * on success, - * - *
-     * false
-     * 
- * - * otherwise + * @return true on success, false otherwise */ public boolean unsubscribe(IConsumer consumer) { if (consumers.remove(consumer)) { - fireConsumerConnectionEvent(consumer, PipeConnectionEvent.CONSUMER_DISCONNECT, null); + fireConsumerConnectionEvent(consumer, PipeConnectionEvent.EventType.CONSUMER_DISCONNECT, null); listeners.remove(consumer); return true; } @@ -275,15 +234,8 @@ public List getConsumers() { * @param paramMap * Parameters passed with connection */ - protected void fireConsumerConnectionEvent(IConsumer consumer, int type, Map paramMap) { - // Create event object - PipeConnectionEvent event = new PipeConnectionEvent(this); - // Fill it up - event.setConsumer(consumer); - event.setType(type); - event.setParamMap(paramMap); - // Fire it - firePipeConnectionEvent(event); + protected void fireConsumerConnectionEvent(IConsumer consumer, PipeConnectionEvent.EventType type, Map paramMap) { + firePipeConnectionEvent(PipeConnectionEvent.build(this, type, consumer, paramMap)); } /** @@ -296,12 +248,8 @@ protected void fireConsumerConnectionEvent(IConsumer consumer, int type, Map paramMap) { - PipeConnectionEvent event = new PipeConnectionEvent(this); - event.setProvider(provider); - event.setType(type); - event.setParamMap(paramMap); - firePipeConnectionEvent(event); + protected void fireProviderConnectionEvent(IProvider provider, PipeConnectionEvent.EventType type, Map paramMap) { + firePipeConnectionEvent(PipeConnectionEvent.build(this, type, provider, paramMap)); } /** diff --git a/src/main/java/org/red5/server/messaging/IPassive.java b/src/main/java/org/red5/server/messaging/IPassive.java index d34cc449..b38468c9 100644 --- a/src/main/java/org/red5/server/messaging/IPassive.java +++ b/src/main/java/org/red5/server/messaging/IPassive.java @@ -25,5 +25,7 @@ * @author Steven Gong (steven.gong@gmail.com) */ public interface IPassive { + public static final String KEY = IPassive.class.getName(); + } diff --git a/src/main/java/org/red5/server/messaging/IPipe.java b/src/main/java/org/red5/server/messaging/IPipe.java index c7e75747..d0717f3a 100644 --- a/src/main/java/org/red5/server/messaging/IPipe.java +++ b/src/main/java/org/red5/server/messaging/IPipe.java @@ -27,6 +27,7 @@ * @author Steven Gong (steven.gong@gmail.com) */ public interface IPipe extends IMessageInput, IMessageOutput { + /** * Add connection event listener to pipe * @@ -42,4 +43,5 @@ public interface IPipe extends IMessageInput, IMessageOutput { * Connection event listener */ void removePipeConnectionListener(IPipeConnectionListener listener); + } diff --git a/src/main/java/org/red5/server/messaging/InMemoryPushPushPipe.java b/src/main/java/org/red5/server/messaging/InMemoryPushPushPipe.java index bfdfd1f5..837658b9 100644 --- a/src/main/java/org/red5/server/messaging/InMemoryPushPushPipe.java +++ b/src/main/java/org/red5/server/messaging/InMemoryPushPushPipe.java @@ -54,7 +54,7 @@ public boolean subscribe(IConsumer consumer, Map paramMap) { log.debug("Consumer subscribe{} {} params: {}", new Object[] { (success ? "d" : " failed"), consumer, paramMap }); } if (success) { - fireConsumerConnectionEvent(consumer, PipeConnectionEvent.CONSUMER_CONNECT_PUSH, paramMap); + fireConsumerConnectionEvent(consumer, PipeConnectionEvent.EventType.CONSUMER_CONNECT_PUSH, paramMap); } return success; } else { @@ -70,7 +70,7 @@ public boolean subscribe(IProvider provider, Map paramMap) { log.debug("Provider subscribe{} {} params: {}", new Object[] { (success ? "d" : " failed"), provider, paramMap }); } if (success) { - fireProviderConnectionEvent(provider, PipeConnectionEvent.PROVIDER_CONNECT_PUSH, paramMap); + fireProviderConnectionEvent(provider, PipeConnectionEvent.EventType.PROVIDER_CONNECT_PUSH, paramMap); } return success; } @@ -91,12 +91,11 @@ public IMessage pullMessage(long wait) { * @param message * the message to be pushed to consumers * @throws IOException - * In case IOException of some sort is occured + * In case IOException of some sort is occurred */ public void pushMessage(IMessage message) throws IOException { if (log.isDebugEnabled()) { - log.debug("pushMessage: {}", message); - log.debug("pushMessage - consumers: {}", consumers.size()); + log.debug("pushMessage: {} to {} consumers", message, consumers.size()); } for (IConsumer consumer : consumers) { try { diff --git a/src/main/java/org/red5/server/messaging/OOBControlMessage.java b/src/main/java/org/red5/server/messaging/OOBControlMessage.java index b1d72626..933fa308 100644 --- a/src/main/java/org/red5/server/messaging/OOBControlMessage.java +++ b/src/main/java/org/red5/server/messaging/OOBControlMessage.java @@ -22,14 +22,17 @@ import java.util.Map; /** - * Out-of-band control message used by inter-components communication which are connected with pipes. Out-of-band data is a separate data stream used for specific purposes (in TCP it's referenced as "urgent data"), like lifecycle control. + * Out-of-band control message used by inter-components communication which are connected with pipes. + * Out-of-band data is a separate data stream used for specific purposes (in TCP it's referenced as "urgent data"), like lifecycle control. * - * 'Target' is used to represent the receiver who may be interested for receiving. It's a string of any form. XXX shall we design a standard form for Target, like "class.instance"? + * 'Target' is used to represent the receiver who may be interested for receiving. + * It's a string of any form. XXX shall we design a standard form for Target, like "class.instance"? * * @author The Red5 Project * @author Steven Gong (steven.gong@gmail.com) */ public class OOBControlMessage implements Serializable { + private static final long serialVersionUID = -6037348177653934300L; /** diff --git a/src/main/java/org/red5/server/messaging/PipeConnectionEvent.java b/src/main/java/org/red5/server/messaging/PipeConnectionEvent.java index ccac1a30..9bdb73d3 100644 --- a/src/main/java/org/red5/server/messaging/PipeConnectionEvent.java +++ b/src/main/java/org/red5/server/messaging/PipeConnectionEvent.java @@ -18,81 +18,96 @@ package org.red5.server.messaging; -import java.util.ArrayList; import java.util.EventObject; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** - * Event object corresponds to the connect/disconnect events among providers/consumers and pipes. + * Event object corresponds to the connect/disconnect events among providers/consumers on pipes. + * This object is immutable except for the parameter map and tasks. * * @author The Red5 Project * @author Steven Gong (steven.gong@gmail.com) + * @author Paul Gregoire (mondain@gmail.com) */ public class PipeConnectionEvent extends EventObject { private static final long serialVersionUID = 9078843765378168072L; - private List taskList = new ArrayList(3); + /** Pipe connection event type */ + public enum EventType { + /** Provider connects in pull mode */ + PROVIDER_CONNECT_PULL, + /** Provider connects in push mode */ + PROVIDER_CONNECT_PUSH, + /** Provider disconnects */ + PROVIDER_DISCONNECT, + /** Consumer connects in pull mode */ + CONSUMER_CONNECT_PULL, + /** Consumer connects in push mode */ + CONSUMER_CONNECT_PUSH, + /** Consumer disconnects */ + CONSUMER_DISCONNECT + }; /** - * A provider connects as pull mode. - */ - public static final int PROVIDER_CONNECT_PULL = 0; - - /** - * A provider connects as push mode. - */ - public static final int PROVIDER_CONNECT_PUSH = 1; - - /** - * A provider disconnects. - */ - public static final int PROVIDER_DISCONNECT = 2; - - /** - * A consumer connects as pull mode. - */ - public static final int CONSUMER_CONNECT_PULL = 3; - - /** - * A consumer connects as push mode. + * Provider */ - public static final int CONSUMER_CONNECT_PUSH = 4; + private final transient IProvider provider; /** - * A consumer disconnects. + * Consumer */ - public static final int CONSUMER_DISCONNECT = 5; + private final transient IConsumer consumer; /** - * Provider + * Event type */ - private transient IProvider provider; + private final EventType type; /** - * Consumer + * Parameters map */ - private transient IConsumer consumer; + private final ConcurrentMap paramMap = new ConcurrentHashMap<>(); /** - * Event type + * List of tasks to be executed for the event */ - private int type; + private final LinkedList taskList = new LinkedList<>(); /** - * Parameters map + * Construct an object with the specific pipe as the source + * + * @param source pipe that triggers this event + * @param type event type + * @param consumer the consumer + * @param paramMap parameters map */ - private Map paramMap; + private PipeConnectionEvent(AbstractPipe source, EventType type, IConsumer consumer, Map paramMap) { + super(source); + this.type = type; + this.consumer = consumer; + this.provider = null; + setParamMap(paramMap); + } /** * Construct an object with the specific pipe as the source * - * @param source - * A pipe that triggers this event. + * @param source pipe that triggers this event + * @param type event type + * @param provider the provider + * @param paramMap parameters map */ - public PipeConnectionEvent(Object source) { + private PipeConnectionEvent(AbstractPipe source, EventType type, IProvider provider, Map paramMap) { super(source); + this.type = type; + this.consumer = null; + this.provider = provider; + setParamMap(paramMap); } /** @@ -104,16 +119,6 @@ public IProvider getProvider() { return provider; } - /** - * Setter for pipe connection provider - * - * @param provider - * Provider - */ - public void setProvider(IProvider provider) { - this.provider = provider; - } - /** * Return pipe connection consumer * @@ -123,35 +128,15 @@ public IConsumer getConsumer() { return consumer; } - /** - * Setter for pipe connection consumer - * - * @param consumer - * Consumer - */ - public void setConsumer(IConsumer consumer) { - this.consumer = consumer; - } - /** * Return event type * * @return Event type */ - public int getType() { + public EventType getType() { return type; } - /** - * Setter for event type - * - * @param type - * Event type - */ - public void setType(int type) { - this.type = type; - } - /** * Return event parameters as Map * @@ -168,7 +153,9 @@ public Map getParamMap() { * Event parameters as Map */ public void setParamMap(Map paramMap) { - this.paramMap = paramMap; + if (paramMap != null && !paramMap.isEmpty()) { + this.paramMap.putAll(paramMap); + } } /** @@ -189,4 +176,31 @@ public void addTask(Runnable task) { List getTaskList() { return taskList; } + + /** + * Builds a PipeConnectionEvent with a source pipe and consumer. + * + * @param source pipe that triggers this event + * @param type event type + * @param consumer the consumer + * @param paramMap parameters map + * @return event + */ + public final static PipeConnectionEvent build(AbstractPipe source, EventType type, IConsumer consumer, Map paramMap) { + return new PipeConnectionEvent(source, type, consumer, paramMap); + } + + /** + * Builds a PipeConnectionEvent with a source pipe and provider. + * + * @param source pipe that triggers this event + * @param type event type + * @param provider the provider + * @param paramMap parameters map + * @return event + */ + public final static PipeConnectionEvent build(AbstractPipe source, EventType type, IProvider provider, Map paramMap) { + return new PipeConnectionEvent(source, type, provider, paramMap); + } + } diff --git a/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java b/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java index b1166dc9..12fcaeaf 100755 --- a/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java +++ b/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java @@ -653,13 +653,15 @@ public void closeChannel(int channelId) { } ReceivedMessageTaskQueue queue = tasksByChannels.remove(channelId); if (queue != null) { + if (isConnected()) { + // if connected, drain and process the tasks queued-up + log.debug("Processing remaining tasks at close for channel: {}", channelId); + processTasksQueue(queue); + } queue.removeAllTasks(); } else if (log.isTraceEnabled()) { log.trace("No task queue for id: {}", channelId); } - if (log.isDebugEnabled()) { - log.debug("Closing / removing channel: {}", chan); - } chan = null; } diff --git a/src/main/java/org/red5/server/net/rtmp/ReceivedMessageTaskQueue.java b/src/main/java/org/red5/server/net/rtmp/ReceivedMessageTaskQueue.java index 2ab2c93d..85c30723 100644 --- a/src/main/java/org/red5/server/net/rtmp/ReceivedMessageTaskQueue.java +++ b/src/main/java/org/red5/server/net/rtmp/ReceivedMessageTaskQueue.java @@ -98,7 +98,6 @@ public ReceivedMessageTask getTaskToProcess() { if (task != null && task.setProcessing()) { return task; } - return null; } @@ -147,7 +146,7 @@ public void run() { if (packet.isProcessed()) { log.debug("DeadlockGuard skipping task for processed packet {}", task); } else if (packet.isExpired()) { - //I believe we also should try to interrupt thread + // try to interrupt thread log.debug("DeadlockGuard skipping task for expired packet {}", task); } else { // if the message task is not yet done or is not expired interrupt @@ -163,7 +162,7 @@ public void run() { log.debug("Unfinished task {} already interrupted", task); } } - //remove this task from the queue in any case + // remove this task from the queue in any case removeTask(task); } } diff --git a/src/main/java/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java b/src/main/java/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java index 57b8475b..e6457117 100644 --- a/src/main/java/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java +++ b/src/main/java/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java @@ -82,7 +82,7 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder { protected Logger log = LoggerFactory.getLogger(RTMPProtocolDecoder.class); // close when header errors occur - protected boolean closeOnHeaderError = false; + protected boolean closeOnHeaderError; /** Constructs a new RTMPProtocolDecoder. */ public RTMPProtocolDecoder() { @@ -465,7 +465,9 @@ public IRTMPEvent decodeMessage(RTMPConnection conn, Header header, IoBuffer in) message = decodeInvoke(conn.getEncoding(), in); break; case TYPE_NOTIFY: - log.trace("Sending notify on stream id: {}", header.getStreamId()); + if (log.isTraceEnabled()) { + log.trace("Sending notify on stream id: {}", header.getStreamId()); + } if (header.getStreamId().doubleValue() == 0.0d) { message = decodeNotify(conn.getEncoding(), in, header); } else { @@ -551,6 +553,9 @@ private IRTMPEvent decodeClientBW(IoBuffer in) { /** {@inheritDoc} */ public Unknown decodeUnknown(byte dataType, IoBuffer in) { + if (log.isDebugEnabled()) { + log.debug("decodeUnknown: {}", dataType); + } return new Unknown(dataType, in); } @@ -690,6 +695,9 @@ protected void doDecodeSharedObject(SharedObjectMessage so, IoBuffer in, Input i /** {@inheritDoc} */ public Notify decodeNotify(Encoding encoding, IoBuffer in) { + if (log.isDebugEnabled()) { + log.debug("decodeNotify: {}", encoding); + } return decodeNotify(encoding, in, null); } @@ -705,12 +713,39 @@ public Notify decodeNotify(Encoding encoding, IoBuffer in) { * @return decoded notify result */ public Notify decodeNotify(Encoding encoding, IoBuffer in, Header header) { + if (log.isDebugEnabled()) { + log.debug("decodeNotify - encoding: {} header: {}", encoding, header); + } + // instance a new Notify Notify notify = new Notify(); - int start = in.position(); - Input input; + // throw a runtime exception if there is no action + return decodeAction(encoding, in, header, notify); + } + + /** {@inheritDoc} */ + public Invoke decodeInvoke(Encoding encoding, IoBuffer in) { + if (log.isDebugEnabled()) { + log.debug("decodeInvoke - encoding: {}", encoding); + } + // throw a runtime exception if there is no action + return (Invoke) decodeAction(encoding, in, null, null); + } + + /** + * Decode the 'action' for a supplied Notify or Invoke. + * + * @param encoding AMF encoding + * @param in buffer + * @param header data header + * @param notify the notify or invoke + * @return notify + */ + private Notify decodeAction(Encoding encoding, IoBuffer in, Header header, Notify notify) { // for response, the action string and invokeId is always encoded as AMF0 we use the first byte to decide which encoding to use + in.mark(); byte tmp = in.get(); - in.position(start); + in.reset(); + Input input; if (encoding == Encoding.AMF3 && tmp == AMF.TYPE_AMF3_OBJECT) { input = new org.red5.io.amf3.Input(in); ((org.red5.io.amf3.Input) input).enforceAMF3(); @@ -719,15 +754,24 @@ public Notify decodeNotify(Encoding encoding, IoBuffer in, Header header) { } // get the action String action = Deserializer.deserialize(input, String.class); + if (action == null) { + throw new RuntimeException("Action was null"); + } if (log.isTraceEnabled()) { - log.trace("Action: {} stream id: {}", action, header.getStreamId()); + log.trace("Action: {}", action); } - // throw a runtime exception if there is no action - if (action != null) { - // TODO Handle NetStream.send? Where and how? + if (notify == null) { + // instance an Invoke + notify = new Invoke(); + // set the transaction id + ((Invoke) notify).setTransactionId(Deserializer. deserialize(input, Number.class).intValue()); + } else { + if (log.isTraceEnabled()) { + log.trace("Stream id: {}", header.getStreamId()); + } if (header != null && header.getStreamId().doubleValue() != 0.0d && !isStreamCommand(action)) { // don't decode "NetStream.send" requests - in.position(start); + //in.position(start); notify.setData(in.asReadOnlyBuffer()); return notify; } @@ -737,86 +781,27 @@ public Notify decodeNotify(Encoding encoding, IoBuffer in, Header header) { throw new RuntimeException("Notify invoke / transaction id was non-zero"); } } - // now go back to the actual encoding to decode parameters - if (encoding == Encoding.AMF3) { - input = new org.red5.io.amf3.Input(in); - ((org.red5.io.amf3.Input) input).enforceAMF3(); - } else { - input = new org.red5.io.amf.Input(in); - } - // get / set the parameters if there any - Object[] params = handleParameters(in, notify, input); - // determine service information - final int dotIndex = action.lastIndexOf('.'); - String serviceName = (dotIndex == -1) ? null : action.substring(0, dotIndex); - // pull off the prefixes since java doesn't allow this on a method name - if (serviceName != null && (serviceName.startsWith("@") || serviceName.startsWith("|"))) { - serviceName = serviceName.substring(1); - } - String serviceMethod = (dotIndex == -1) ? action : action.substring(dotIndex + 1, action.length()); - // pull off the prefixes since java doesnt allow this on a method name - if (serviceMethod.startsWith("@") || serviceMethod.startsWith("|")) { - serviceMethod = serviceMethod.substring(1); - } - Call call = new Call(serviceName, serviceMethod, params); - notify.setCall(call); - return notify; - } else { - //TODO replace this with something better as time permits - throw new RuntimeException("Action was null"); } - } - - /** {@inheritDoc} */ - public Invoke decodeInvoke(Encoding encoding, IoBuffer in) { - // for response, the action string and invokeId is always encoded as AMF0 we use the first byte to decide which encoding to use - in.mark(); - byte tmp = in.get(); - in.reset(); - Input input; - if (encoding == Encoding.AMF3 && tmp == AMF.TYPE_AMF3_OBJECT) { - input = new org.red5.io.amf3.Input(in); - ((org.red5.io.amf3.Input) input).enforceAMF3(); - } else { - input = new org.red5.io.amf.Input(in); - } - // get the action - String action = Deserializer.deserialize(input, String.class); - if (log.isTraceEnabled()) { - log.trace("Action {}", action); + // reset and decode parameters + input.reset(); + // get / set the parameters if there any + Object[] params = in.hasRemaining() ? handleParameters(in, notify, input) : new Object[0]; + // determine service information + final int dotIndex = action.lastIndexOf('.'); + String serviceName = (dotIndex == -1) ? null : action.substring(0, dotIndex); + // pull off the prefixes since java doesn't allow this on a method name + if (serviceName != null && (serviceName.startsWith("@") || serviceName.startsWith("|"))) { + serviceName = serviceName.substring(1); } - // throw a runtime exception if there is no action - if (action != null) { - Invoke invoke = new Invoke(); - invoke.setTransactionId(Deserializer. deserialize(input, Number.class).intValue()); - // now go back to the actual encoding to decode parameters - if (encoding == Encoding.AMF3) { - input = new org.red5.io.amf3.Input(in); - ((org.red5.io.amf3.Input) input).enforceAMF3(); - } else { - input = new org.red5.io.amf.Input(in); - } - // get / set the parameters if there any - Object[] params = in.hasRemaining() ? handleParameters(in, invoke, input) : new Object[0]; - // determine service information - final int dotIndex = action.lastIndexOf('.'); - String serviceName = (dotIndex == -1) ? null : action.substring(0, dotIndex); - // pull off the prefixes since java doesnt allow this on a method name - if (serviceName != null && (serviceName.startsWith("@") || serviceName.startsWith("|"))) { - serviceName = serviceName.substring(1); - } - String serviceMethod = (dotIndex == -1) ? action : action.substring(dotIndex + 1, action.length()); - // pull off the prefixes since java doesn't allow this on a method name - if (serviceMethod.startsWith("@") || serviceMethod.startsWith("|")) { - serviceMethod = serviceMethod.substring(1); - } - PendingCall call = new PendingCall(serviceName, serviceMethod, params); - invoke.setCall(call); - return invoke; - } else { - // TODO replace this with something better as time permits - throw new RuntimeException("Action was null"); + String serviceMethod = (dotIndex == -1) ? action : action.substring(dotIndex + 1, action.length()); + // pull off the prefixes since java doesnt allow this on a method name + if (serviceMethod.startsWith("@") || serviceMethod.startsWith("|")) { + serviceMethod = serviceMethod.substring(1); } + // create the pending call for invoke or call for standard notify + Call call = (notify instanceof Invoke) ? new PendingCall(serviceName, serviceMethod, params) : new Call(serviceName, serviceMethod, params); + notify.setCall(call); + return notify; } /** @@ -882,6 +867,9 @@ public VideoData decodeVideoData(IoBuffer in) { */ @SuppressWarnings("unchecked") public Notify decodeStreamMetadata(IoBuffer in) { + if (log.isDebugEnabled()) { + log.debug("decodeStreamMetadata"); + } Encoding encoding = ((RTMPConnection) Red5.getConnectionLocal()).getEncoding(); Input input = null; // check the encoding, if its AMF3 check to see if first byte is set to AMF0 @@ -947,7 +935,7 @@ public Notify decodeStreamMetadata(IoBuffer in) { log.debug("onFI params: {}", params.toString()); } } else { - log.info("stream send: {}", setData); + log.info("Stream send: {}", setData); if (log.isDebugEnabled()) { byte object = input.readDataType(); log.debug("Params type: {}", object); @@ -974,6 +962,9 @@ public Notify decodeStreamMetadata(IoBuffer in) { * @return FlexMessage event */ public FlexMessage decodeFlexMessage(IoBuffer in) { + if (log.isDebugEnabled()) { + log.debug("decodeFlexMessage"); + } // TODO: Unknown byte, probably encoding as with Flex SOs? byte flexByte = in.get(); log.trace("Flex byte: {}", flexByte); @@ -995,8 +986,8 @@ public FlexMessage decodeFlexMessage(IoBuffer in) { while (in.hasRemaining()) { // Check for AMF3 encoding of parameters byte objectEncodingType = in.get(); - in.position(in.position() - 1); log.debug("Object encoding: {}", objectEncodingType); + in.position(in.position() - 1); switch (objectEncodingType) { case AMF.TYPE_AMF3_OBJECT: case AMF3.TYPE_VECTOR_NUMBER: @@ -1035,6 +1026,9 @@ public FlexMessage decodeFlexMessage(IoBuffer in) { } public Notify decodeFlexStreamSend(IoBuffer in) { + if (log.isDebugEnabled()) { + log.debug("decodeFlexStreamSend"); + } // remove the first byte in.get(); // copy remaining to pos 0, reset mark, move to pos 0 @@ -1089,7 +1083,7 @@ private boolean isStreamCommand(String action) { */ private Object[] handleParameters(IoBuffer in, Notify notify, Input input) { Object[] params = new Object[] {}; - List paramList = new ArrayList(); + List paramList = new ArrayList<>(); final Object obj = Deserializer.deserialize(input, Object.class); if (obj instanceof Map) { // Before the actual parameters we sometimes (connect) get a map of parameters, this is usually null, but if set should be diff --git a/src/main/java/org/red5/server/stream/ClientBroadcastStream.java b/src/main/java/org/red5/server/stream/ClientBroadcastStream.java index 023d3271..ea71e522 100644 --- a/src/main/java/org/red5/server/stream/ClientBroadcastStream.java +++ b/src/main/java/org/red5/server/stream/ClientBroadcastStream.java @@ -309,15 +309,16 @@ public void dispatchEvent(IEvent event) { eventTime = rtmpEvent.getTimestamp(); log.trace("Video: {}", eventTime); } else if (rtmpEvent instanceof Invoke) { + Invoke invokeEvent = (Invoke) rtmpEvent; + log.debug("Invoke action: {}", invokeEvent.getAction()); eventTime = rtmpEvent.getTimestamp(); - //do we want to return from here? - //event / stream listeners will not be notified of invokes + // event / stream listeners will not be notified of invokes return; } else if (rtmpEvent instanceof Notify) { - // store the metadata Notify notifyEvent = (Notify) rtmpEvent; - log.debug("Notify action:{}", notifyEvent.getAction()); + log.debug("Notify action: {}", notifyEvent.getAction()); if (notifyEvent.getAction() != null && notifyEvent.getAction().equals("onMetaData")) { + // store the metadata try { log.debug("Setting metadata"); metaData = notifyEvent.duplicate(); @@ -565,37 +566,37 @@ public void onOOBControlMessage(IMessageComponent source, IPipe pipe, OOBControl @SuppressWarnings("unused") public void onPipeConnectionEvent(PipeConnectionEvent event) { switch (event.getType()) { - case PipeConnectionEvent.PROVIDER_CONNECT_PUSH: + case PROVIDER_CONNECT_PUSH: log.debug("Provider connect"); if (event.getProvider() == this && event.getSource() != connMsgOut && (event.getParamMap() == null || !event.getParamMap().containsKey("record"))) { - this.livePipe = (IPipe) event.getSource(); - log.debug("Provider: {}", this.livePipe.getClass().getName()); - for (IConsumer consumer : this.livePipe.getConsumers()) { + livePipe = (IPipe) event.getSource(); + log.debug("Provider: {}", livePipe.getClass().getName()); + for (IConsumer consumer : livePipe.getConsumers()) { subscriberStats.increment(); } } break; - case PipeConnectionEvent.PROVIDER_DISCONNECT: + case PROVIDER_DISCONNECT: log.debug("Provider disconnect"); - if (log.isDebugEnabled() && this.livePipe != null) { - log.debug("Provider: {}", this.livePipe.getClass().getName()); + if (log.isDebugEnabled() && livePipe != null) { + log.debug("Provider: {}", livePipe.getClass().getName()); } - if (this.livePipe == event.getSource()) { - this.livePipe = null; + if (livePipe == event.getSource()) { + livePipe = null; } break; - case PipeConnectionEvent.CONSUMER_CONNECT_PUSH: + case CONSUMER_CONNECT_PUSH: log.debug("Consumer connect"); IPipe pipe = (IPipe) event.getSource(); if (log.isDebugEnabled() && pipe != null) { log.debug("Consumer: {}", pipe.getClass().getName()); } - if (this.livePipe == pipe) { + if (livePipe == pipe) { notifyChunkSize(); } subscriberStats.increment(); break; - case PipeConnectionEvent.CONSUMER_DISCONNECT: + case CONSUMER_DISCONNECT: log.debug("Consumer disconnect: {}", event.getSource().getClass().getName()); subscriberStats.decrement(); break; diff --git a/src/main/java/org/red5/server/stream/PlayEngine.java b/src/main/java/org/red5/server/stream/PlayEngine.java index 8965cc63..57f2c7b6 100755 --- a/src/main/java/org/red5/server/stream/PlayEngine.java +++ b/src/main/java/org/red5/server/stream/PlayEngine.java @@ -851,7 +851,7 @@ public void close() { // XXX is clear ping required? //sendClearPing(); InMemoryPushPushPipe out = (InMemoryPushPushPipe) msgOut.get(); - if (msgOut != null) { + if (out != null) { List consumers = out.getConsumers(); // assume a list of 1 in most cases if (log.isDebugEnabled()) { @@ -1403,7 +1403,7 @@ public void onOOBControlMessage(IMessageComponent source, IPipe pipe, OOBControl /** {@inheritDoc} */ public void onPipeConnectionEvent(PipeConnectionEvent event) { switch (event.getType()) { - case PipeConnectionEvent.PROVIDER_CONNECT_PUSH: + case PROVIDER_CONNECT_PUSH: if (event.getProvider() != this) { if (waitLiveJob != null) { schedulingService.removeScheduledJob(waitLiveJob); @@ -1412,19 +1412,19 @@ public void onPipeConnectionEvent(PipeConnectionEvent event) { sendPublishedStatus(currentItem); } break; - case PipeConnectionEvent.PROVIDER_DISCONNECT: + case PROVIDER_DISCONNECT: if (pullMode) { sendStopStatus(currentItem); } else { sendUnpublishedStatus(currentItem); } break; - case PipeConnectionEvent.CONSUMER_CONNECT_PULL: + case CONSUMER_CONNECT_PULL: if (event.getConsumer() == this) { pullMode = true; } break; - case PipeConnectionEvent.CONSUMER_CONNECT_PUSH: + case CONSUMER_CONNECT_PUSH: if (event.getConsumer() == this) { pullMode = false; } diff --git a/src/main/java/org/red5/server/stream/consumer/ConnectionConsumer.java b/src/main/java/org/red5/server/stream/consumer/ConnectionConsumer.java index 49f9a358..4cc62f82 100644 --- a/src/main/java/org/red5/server/stream/consumer/ConnectionConsumer.java +++ b/src/main/java/org/red5/server/stream/consumer/ConnectionConsumer.java @@ -18,6 +18,8 @@ package org.red5.server.stream.consumer; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.mina.core.buffer.IoBuffer; import org.red5.server.api.stream.IClientStream; import org.red5.server.messaging.IMessage; @@ -85,7 +87,7 @@ public class ConnectionConsumer implements IPushableConsumer, IPipeConnectionLis /** * Whether or not the chunk size has been sent. This seems to be required for h264. */ - private boolean chunkSizeSent; + private AtomicBoolean chunkSizeSent = new AtomicBoolean(false); /** * Create RTMP connection consumer for given connection and channels. @@ -132,10 +134,8 @@ public void pushMessage(IPipe pipe, IMessage message) { StatusMessage statusMsg = (StatusMessage) message; data.sendStatus(statusMsg.getBody()); } else if (message instanceof RTMPMessage) { - //make sure chunk size has been sent - if (!chunkSizeSent) { - sendChunkSize(); - } + // make sure chunk size has been sent + sendChunkSize(); // cast to rtmp message RTMPMessage rtmpMsg = (RTMPMessage) message; IRTMPEvent msg = rtmpMsg.getBody(); @@ -149,10 +149,10 @@ public void pushMessage(IPipe pipe, IMessage message) { // get the data type byte dataType = msg.getDataType(); log.trace("Data type: {}", dataType); - //create a new header for the consumer + // create a new header for the consumer final Header header = new Header(); header.setTimerBase(eventTime); - //data buffer + // data buffer IoBuffer buf = null; switch (dataType) { case Constants.TYPE_AGGREGATE: @@ -230,12 +230,8 @@ public void pushMessage(IPipe pipe, IMessage message) { /** {@inheritDoc} */ public void onPipeConnectionEvent(PipeConnectionEvent event) { - switch (event.getType()) { - case PipeConnectionEvent.PROVIDER_DISCONNECT: - // XXX should put the channel release code in ConsumerService - closeChannels(); - break; - default: + if (event.getType().equals(PipeConnectionEvent.EventType.PROVIDER_DISCONNECT)) { + closeChannels(); } } @@ -264,6 +260,7 @@ public void onOOBControlMessage(IMessageComponent source, IPipe pipe, OOBControl int newSize = (Integer) oobCtrlMsg.getServiceParamMap().get("chunkSize"); if (newSize != chunkSize) { chunkSize = newSize; + chunkSizeSent.set(false); sendChunkSize(); } } @@ -274,10 +271,11 @@ public void onOOBControlMessage(IMessageComponent source, IPipe pipe, OOBControl * Send the chunk size */ private void sendChunkSize() { - log.debug("Sending chunk size: {}", chunkSize); - ChunkSize chunkSizeMsg = new ChunkSize(chunkSize); - conn.getChannel((byte) 2).write(chunkSizeMsg); - chunkSizeSent = true; + if (chunkSizeSent.compareAndSet(false, true)) { + log.debug("Sending chunk size: {}", chunkSize); + ChunkSize chunkSizeMsg = new ChunkSize(chunkSize); + conn.getChannel((byte) 2).write(chunkSizeMsg); + } } /** diff --git a/src/main/java/org/red5/server/stream/consumer/FileConsumer.java b/src/main/java/org/red5/server/stream/consumer/FileConsumer.java index 7ac1a82b..05973949 100644 --- a/src/main/java/org/red5/server/stream/consumer/FileConsumer.java +++ b/src/main/java/org/red5/server/stream/consumer/FileConsumer.java @@ -406,7 +406,7 @@ public void onOOBControlMessage(IMessageComponent source, IPipe pipe, OOBControl */ public void onPipeConnectionEvent(PipeConnectionEvent event) { switch (event.getType()) { - case PipeConnectionEvent.CONSUMER_CONNECT_PUSH: + case CONSUMER_CONNECT_PUSH: if (event.getConsumer() == this) { Map paramMap = event.getParamMap(); if (paramMap != null) { @@ -414,11 +414,11 @@ public void onPipeConnectionEvent(PipeConnectionEvent event) { } } break; - case PipeConnectionEvent.CONSUMER_DISCONNECT: + case CONSUMER_DISCONNECT: if (event.getConsumer() != this) { break; } - case PipeConnectionEvent.PROVIDER_DISCONNECT: + case PROVIDER_DISCONNECT: // we only support one provider at a time so releasing when provider disconnects uninit(); break; default: