Skip to content

Commit

Permalink
Commit for M8 tag
Browse files Browse the repository at this point in the history
  • Loading branch information
mondain committed Mar 10, 2016
1 parent 0105aca commit 7a16d7e
Show file tree
Hide file tree
Showing 11 changed files with 221 additions and 138 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.0.7-M7</version>
<version>1.0.7-M8</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-server-common</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/red5/server/api/Red5.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public final class Red5 {
/**
* Server version with revision
*/
public static final String VERSION = "Red5 Server 1.0.7-SNAPSHOT";
public static final String VERSION = "Red5 Server 1.0.7";

/**
* Server version for fmsVer requests
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/red5/server/net/rtmp/RTMPConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa

public static final String RTMP_CONN_MANAGER = "rtmp.connection.manager";

public static final Object RTMP_HANDLER = "rtmp.handler";

/**
* Marker byte for standard or non-encrypted RTMP data.
*/
Expand Down
31 changes: 31 additions & 0 deletions src/main/java/org/red5/server/net/rtmp/ReceivedMessageTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,37 @@ public Thread getTaskThread() {
return taskThread;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((sessionId == null) ? 0 : sessionId.hashCode());
result = prime * result + packet.getHeader().hashCode();
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ReceivedMessageTask other = (ReceivedMessageTask) obj;
if (sessionId == null) {
if (other.sessionId != null) {
return false;
}
} else if (!sessionId.equals(other.sessionId)) {
return false;
}
if (!packet.getHeader().equals(other.packet.getHeader())) {
return false;
}
return true;
}

@Override
public String toString() {
return "[sessionId: " + sessionId + "; packetNumber: " + packetNumber + "; processing: " + processing.get() + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@
import java.util.List;
import java.util.concurrent.Semaphore;

import org.apache.commons.codec.binary.Hex;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecException;
import org.apache.mina.filter.codec.ProtocolDecoderAdapter;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.bouncycastle.util.Arrays;
import org.red5.server.api.Red5;
import org.red5.server.net.IConnectionManager;
import org.red5.server.net.rtmp.RTMPConnection;
Expand Down Expand Up @@ -62,27 +60,32 @@ public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) th
if (conn != null) {
// set the connection to local if its referred to by this session
Red5.setConnectionLocal(conn);
// copy data range from incoming
if (log.isTraceEnabled()) {
log.trace("Incoming: in.position {}, in.limit {}, in.remaining {}", new Object[] { in.position(), in.limit(), in.remaining() });
}
byte[] arr = new byte[in.remaining()];
in.get(arr);
// create a buffer and store it on the session
IoBuffer buf = (IoBuffer) session.getAttribute("buffer");
if (buf == null) {
buf = IoBuffer.allocate(in.limit());
buf = IoBuffer.allocate(arr.length);
buf.setAutoExpand(true);
session.setAttribute("buffer", buf);
}
// copy incoming into buffer
buf.put(in);
buf.put(arr);
// flip so we can read
buf.flip();
if (log.isTraceEnabled()) {
log.trace("Buffer before: {}", Hex.encodeHexString(Arrays.copyOfRange(buf.array(), buf.position(), buf.limit())));
//log.trace("Buffer before: {}", Hex.encodeHexString(arr));
log.trace("Buffers info before: buf.position {}, buf.limit {}, buf.remaining {}", new Object[] { buf.position(), buf.limit(), buf.remaining() });
}
// get the connections decoder lock
final Semaphore lock = conn.getDecoderLock();
try {
// acquire the decoder lock
//log.trace("Decoder lock acquiring.. {}", sessionId);
lock.acquire();
log.trace("Decoder lock acquired {}", sessionId);
// construct any objects from the decoded bugger
List<?> objects = decoder.decodeBuffer(conn, buf);
log.trace("Decoded: {}", objects);
Expand All @@ -97,13 +100,12 @@ public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) th
} catch (Exception e) {
log.error("Error during decode", e);
} finally {
log.trace("Decoder lock releasing.. {}", sessionId);
lock.release();
// clear local
Red5.setConnectionLocal(null);
}
if (log.isTraceEnabled()) {
log.trace("Buffer after: {}", Hex.encodeHexString(Arrays.copyOfRange(buf.array(), buf.position(), buf.limit())));
//log.trace("Buffer after: {}", Hex.encodeHexString(Arrays.copyOfRange(buf.array(), buf.position(), buf.limit())));
log.trace("Buffers info after: buf.position {}, buf.limit {}, buf.remaining {}", new Object[] { buf.position(), buf.limit(), buf.remaining() });
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.red5.server.net.rtmp.codec;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -133,8 +134,8 @@ public List<Object> decodeBuffer(RTMPConnection conn, IoBuffer buffer) {
}
}
} catch (Exception ex) {
log.warn("Failed to decodeBuffer:: pos {}, limit {}, chunk size {}, buffer {}", position, buffer.limit(), conn.getState().getReadChunkSize()
, Hex.encodeHexString(Arrays.copyOfRange(buffer.array(), position, buffer.limit())));
log.warn("Failed to decodeBuffer: pos {}, limit {}, chunk size {}, buffer {}", position, buffer.limit(), conn.getState().getReadChunkSize(),
Hex.encodeHexString(Arrays.copyOfRange(buffer.array(), position, buffer.limit())));
// catch any non-handshake exception in the decoding; close the connection
log.warn("Closing connection because decoding failed: {}", conn, ex);
// clear the buffer to eliminate memory leaks when we can't parse protocol
Expand Down Expand Up @@ -164,7 +165,7 @@ public List<Object> decodeBuffer(RTMPConnection conn, IoBuffer buffer) {
* IoBuffer of data to be decoded
* @return one of three possible values:
*
* <pre>
* <pre>
* 1. null : the object could not be decoded, or some data was skipped, just continue
* 2. ProtocolState : the decoder was unable to decode the whole object, refer to the protocol state
* 3. Object : something was decoded, continue
Expand Down Expand Up @@ -222,8 +223,16 @@ public Packet decodePacket(RTMPConnection conn, RTMPDecodeState state, IoBuffer
RTMP rtmp = conn.getState();
final ChunkHeader chh = ChunkHeader.read(in);
final Header header = decodeHeader(chh, state, in, rtmp);
if (header == null) {
in.position(position);
if (header == null || header.isEmpty()) {
// ensure we dont simply have a buffer full of zeros
byte[] tmp = Arrays.copyOfRange(in.array(), position, in.limit());
BigInteger bi = new BigInteger(tmp);
if (bi.intValue() == 0) {
log.debug("Buffer seems to contain nothing but zeros, reset position to limit");
in.position(in.limit());
} else {
in.position(position);
}
return null;
}
final int channelId = header.getChannelId();
Expand Down Expand Up @@ -321,8 +330,6 @@ public Header decodeHeader(ChunkHeader chh, RTMPDecodeState state, IoBuffer in,
if (log.isTraceEnabled()) {
log.trace("lastHeader: {}", lastHeader);
}
Header header = new Header();
header.setChannelId(channelId);
if (headerSize != HEADER_NEW && lastHeader == null) {
// this will trigger an error status, which in turn will disconnect the "offending" flash player
// preventing a memory leak and bringing the whole server to its knees
Expand All @@ -336,6 +343,8 @@ public Header decodeHeader(ChunkHeader chh, RTMPDecodeState state, IoBuffer in,
return null;
}
int timeValue;
Header header = new Header();
header.setChannelId(channelId);
switch (headerSize) {
case HEADER_NEW:
// an absolute time value
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/org/red5/server/net/rtmp/event/Ping.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,26 @@ public Ping(short eventType, int value2) {
this.value2 = value2;
}

public Ping(short eventType, Number value2) {
super(Type.SYSTEM);
this.eventType = eventType;
this.value2 = value2;
}

public Ping(short eventType, int value2, int value3) {
super(Type.SYSTEM);
this.eventType = eventType;
this.value2 = value2;
this.value3 = value3;
}

public Ping(short eventType, Number value2, int value3) {
super(Type.SYSTEM);
this.eventType = eventType;
this.value2 = value2;
this.value3 = value3;
}

public Ping(short eventType, int value2, int value3, int value4) {
super(Type.SYSTEM);
this.eventType = eventType;
Expand All @@ -149,6 +162,14 @@ public Ping(short eventType, int value2, int value3, int value4) {
this.value4 = value4;
}

public Ping(short eventType, Number value2, int value3, int value4) {
super(Type.SYSTEM);
this.eventType = eventType;
this.value2 = value2;
this.value3 = value3;
this.value4 = value4;
}

public Ping(Ping in) {
super(Type.SYSTEM);
this.eventType = in.getEventType();
Expand Down
39 changes: 17 additions & 22 deletions src/main/java/org/red5/server/net/rtmp/message/ChunkHeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,20 @@
import org.red5.server.net.protocol.ProtocolException;

/**
* RTMP chunk header
* https://www.adobe.com/content/dam/Adobe/en/devnet/rtmp/pdf/rtmp_specification_1.0.pdf (5.3.1.1 page 12)
* RTMP chunk header https://www.adobe.com/content/dam/Adobe/en/devnet/rtmp/pdf/rtmp_specification_1.0.pdf (5.3.1.1 page 12)
*/
public class ChunkHeader implements Constants, Cloneable, Externalizable {
private static final long serialVersionUID = 1L;

/**
* chunk format
* Chunk format
*/
private byte format;

/**
* chunk size
* Chunk size
*/
private byte size;

/**
* Channel
*/
Expand All @@ -66,7 +64,7 @@ public byte getFormat() {
public void setFormat(byte format) {
this.format = format;
}

/**
* Getter for channel id
*
Expand Down Expand Up @@ -106,15 +104,15 @@ public void setSize(byte size) {
}

public static ChunkHeader read(IoBuffer in) {
ChunkHeader h = new ChunkHeader();
final int remaining = in.remaining();
// at least one byte for valid decode
if (remaining < 1) {
throw new ProtocolException("Bad chunk header, at least 1 byte is expected");
}
byte headerByte = in.get();
//going to check highest 2 bits
h.format = (byte)((0b11000000 & headerByte) >> 6);
ChunkHeader h = new ChunkHeader();
// going to check highest 2 bits
h.format = (byte) ((0b11000000 & headerByte) >> 6);
h.size = 1;
h.channelId = 0x3F & headerByte;
if ((headerByte & 0x3f) == 0) {
Expand All @@ -135,19 +133,19 @@ public static ChunkHeader read(IoBuffer in) {
// single byte header
}
if (h.channelId < 0) {
throw new ProtocolException("Bad channel id: " + h.channelId);
throw new ProtocolException("Bad channel id: " + h.channelId);
}
return h;
}
}

/** {@inheritDoc} */
@Override
public boolean equals(Object other) {
if (!(other instanceof ChunkHeader)) {
return false;
if (other instanceof ChunkHeader) {
final ChunkHeader header = (ChunkHeader) other;
return (header.getChannelId() == channelId && header.getFormat() == format);
}
final ChunkHeader header = (ChunkHeader) other;
return (header.getChannelId() == channelId && header.getFormat() == format);
return false;
}

/** {@inheritDoc} */
Expand All @@ -163,17 +161,14 @@ public ChunkHeader clone() {
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
format = in.readByte();
channelId = in.readInt();
size = (byte)(channelId > 319 ? 3 : (channelId > 63 ? 2 : 1));
size = (byte) (channelId > 319 ? 3 : (channelId > 63 ? 2 : 1));
}

public void writeExternal(ObjectOutput out) throws IOException {
out.writeByte(format);
out.writeInt(channelId);
}

/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
// if its new and props are un-set, just return that message
Expand Down
23 changes: 20 additions & 3 deletions src/main/java/org/red5/server/net/rtmp/message/Header.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,23 @@ public int getTimerBase() {
return timerBase;
}

public boolean isEmpty() {
return !((channelId + dataType + size + streamId.doubleValue()) > 0d);
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + channelId;
result = prime * result + dataType;
result = prime * result + size;
result = prime * result + streamId.intValue();
result = prime * result + getTimer();
result = prime * result + extendedTimestamp;
return result;
}

/** {@inheritDoc} */
@Override
public boolean equals(Object other) {
Expand Down Expand Up @@ -246,10 +263,10 @@ public void writeExternal(ObjectOutput out) throws IOException {
@Override
public String toString() {
// if its new and props are un-set, just return that message
if ((channelId + dataType + size + streamId.doubleValue()) > 0d) {
return "Header [streamId=" + streamId + ", channelId=" + channelId + ", dataType=" + dataType + ", timerBase=" + timerBase + ", timerDelta=" + timerDelta + ", size=" + size + ", extendedTimestamp=" + extendedTimestamp + "]";
} else {
if (isEmpty()) {
return "empty";
} else {
return "Header [streamId=" + streamId + ", channelId=" + channelId + ", dataType=" + dataType + ", timerBase=" + timerBase + ", timerDelta=" + timerDelta + ", size=" + size + ", extendedTimestamp=" + extendedTimestamp + "]";
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/main/java/org/red5/server/stream/PlayEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1888,7 +1888,10 @@ public void execute(ISchedulingService svc) {
if (okayToSendMessage(body)) {
log.trace("ts: {}", rtmpMessage.getBody().getTimestamp());
sendMessage(rtmpMessage);
((IStreamData<?>) body).getData().free();
IoBuffer data = ((IStreamData<?>) body).getData();
if (data != null) {
data.free();
}
} else {
pendingMessage = rtmpMessage;
}
Expand Down
Loading

0 comments on commit 7a16d7e

Please sign in to comment.