From ef7ffb06141a227fa8184e2bd8c4394f3588c309 Mon Sep 17 00:00:00 2001 From: Paul Gregoire Date: Thu, 24 Mar 2016 09:26:59 -0400 Subject: [PATCH] Fixes for RTMP decoding --- .../org/red5/server/net/rtmp/codec/RTMP.java | 24 -------- .../net/rtmp/codec/RTMPProtocolDecoder.java | 56 ++++--------------- .../rtmp/codec/TestRTMPProtocolDecoder.java | 4 +- 3 files changed, 12 insertions(+), 72 deletions(-) diff --git a/src/main/java/org/red5/server/net/rtmp/codec/RTMP.java b/src/main/java/org/red5/server/net/rtmp/codec/RTMP.java index e364876a..09801140 100644 --- a/src/main/java/org/red5/server/net/rtmp/codec/RTMP.java +++ b/src/main/java/org/red5/server/net/rtmp/codec/RTMP.java @@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentMap; import org.red5.server.api.IConnection.Encoding; -import org.red5.server.net.rtmp.message.ChunkHeader; import org.red5.server.net.rtmp.message.Header; import org.red5.server.net.rtmp.message.Packet; @@ -108,12 +107,6 @@ public class RTMP { */ private Encoding encoding = Encoding.AMF0; - /** - * Temporarily stored chunk header. This is for partial chunk headers which required more bytes than were - * available at the time of initial parsing; mostly to determine the channel id to which it belongs. - */ - private ChunkHeader chunkHeader; - /** * Creates RTMP object; essentially for storing session information. */ @@ -194,23 +187,6 @@ public void setState(byte state) { } } - /** - * Get the temporary chunk header if it exists. - * - * @return chunkHeader - */ - public ChunkHeader getChunkHeader() { - return chunkHeader; - } - - /** - * Set a temporary chunk header. - * @param chunkHeader - */ - public void setChunkHeader(ChunkHeader chunkHeader) { - this.chunkHeader = chunkHeader; - } - /** * Setter for last read header. * diff --git a/src/main/java/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java b/src/main/java/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java index eac509ba..e468a9c7 100644 --- a/src/main/java/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java +++ b/src/main/java/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java @@ -227,46 +227,19 @@ public Packet decodePacket(RTMPConnection conn, RTMPDecodeState state, IoBuffer } // get RTMP state holder RTMP rtmp = conn.getState(); - // get chunk header (a chunk header varies from 1-3 bytes) - ChunkHeader chunkHeader = rtmp.getChunkHeader(); - if (chunkHeader == null) { - if (log.isTraceEnabled()) { - log.trace("No decoded chunk header available"); - } - try { - // read the chunk header - chunkHeader = ChunkHeader.read(in); - // store the chunk header - rtmp.setChunkHeader(chunkHeader); - } catch (ProtocolException pe) { - log.warn("Chunk header read exception", pe); - // back up the position so next read gets the rest - in.position(position); - return null; - } - } + // read the chunk header (variable from 1-3 bytes) + final ChunkHeader chunkHeader = ChunkHeader.read(in); // represents "packet" header length via "format" only 1 byte in the chunk header is needed here int headerLength = RTMPUtils.getHeaderLength(chunkHeader.getFormat()); + headerLength += chunkHeader.getSize() - 1; if (in.remaining() < headerLength) { - log.debug("Not enough data available for the packet header to decode - required: {} remaining: {} ", headerLength, in.remaining()); + state.bufferDecoding(headerLength - in.remaining()); + in.position(position); return null; } + final Header header = decodeHeader(chunkHeader, state, in, rtmp); // get the channel id - int channelId = chunkHeader.getChannelId(); - // decode the header based on the current chunked header data, channel id is needed here so thats 1-3 bytes of chunk header - Header header = rtmp.getLastReadHeader(channelId); - if (header == null) { - if (log.isTraceEnabled()) { - log.trace("No decoded header available"); - } - // decode the packet header - header = decodeHeader(chunkHeader, state, in, rtmp); - if (header != null) { - // store the header based on its channel id - rtmp.setLastReadHeader(channelId, header); - } - } - // if the header is null here, we have issues + final int channelId = header != null ? header.getChannelId() : chunkHeader.getChannelId(); if (header == null || header.isEmpty()) { if (log.isTraceEnabled()) { log.trace("Header was null or empty - chh: {}", chunkHeader); @@ -274,8 +247,6 @@ public Packet decodePacket(RTMPConnection conn, RTMPDecodeState state, IoBuffer // clear / compact the input and close the channel in.clear(); in.compact(); - // clear the chunk header - rtmp.setChunkHeader(null); // send a NetStream.Failed message StreamService.sendNetStreamStatus(conn, StatusCodes.NS_FAILED, "Bad data on channel: " + channelId, "no-name", Status.ERROR, conn.getStreamIdForChannelId(channelId)); // close the channel on which the issue occurred, until we find a way to exclude the current data @@ -284,12 +255,8 @@ public Packet decodePacket(RTMPConnection conn, RTMPDecodeState state, IoBuffer } // get the size of our chunks int readChunkSize = rtmp.getReadChunkSize(); - // ensure theres enough data for the packet to decode -// int totalBytesRequired = header.getSize(); // + (header.getSize() / readChunkSize); -// if (in.remaining() <= totalBytesRequired) { -// log.debug("Not enough data available for the packet to decode - required: {} remaining: {}", totalBytesRequired, in.remaining()); -// return null; -// } + // store the header based on its channel id + rtmp.setLastReadHeader(channelId, header); // check to see if this is a new packet or continue decoding an existing one Packet packet = rtmp.getLastReadPacket(channelId); if (packet == null) { @@ -306,7 +273,7 @@ public Packet decodePacket(RTMPConnection conn, RTMPDecodeState state, IoBuffer // read chunk int length = Math.min(buf.remaining(), readChunkSize); if (in.remaining() < length) { - log.trace("Chunk too small, buffering ({},{})", in.remaining(), length); + log.debug("Chunk too small, buffering ({},{})", in.remaining(), length); // how much more data we need to continue? state.bufferDecoding(in.position() - position + length); // we need to move back position so header will be available during another decode round @@ -360,7 +327,6 @@ public Packet decodePacket(RTMPConnection conn, RTMPDecodeState state, IoBuffer lastHeader.setTimerBase(header.getTimer()); } finally { rtmp.setLastReadPacket(channelId, null); - rtmp.setChunkHeader(null); } return packet; } @@ -410,7 +376,7 @@ public Header decodeHeader(ChunkHeader chh, RTMPDecodeState state, IoBuffer in, log.trace("headerLength: {}", headerLength); } if (remaining < headerLength) { - log.debug("Header too small (hlen: {}), buffering. remaining: {}", headerLength, remaining); + log.trace("Header too small (hlen: {}), buffering. remaining: {}", headerLength, remaining); state.bufferDecoding(headerLength); return null; } diff --git a/src/test/java/org/red5/server/net/rtmp/codec/TestRTMPProtocolDecoder.java b/src/test/java/org/red5/server/net/rtmp/codec/TestRTMPProtocolDecoder.java index aeec3240..2aa607d0 100644 --- a/src/test/java/org/red5/server/net/rtmp/codec/TestRTMPProtocolDecoder.java +++ b/src/test/java/org/red5/server/net/rtmp/codec/TestRTMPProtocolDecoder.java @@ -3,7 +3,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; import java.io.BufferedReader; import java.io.File; @@ -15,8 +14,6 @@ import org.junit.Before; import org.junit.Test; import org.red5.io.utils.IOUtils; -import org.red5.server.net.protocol.RTMPDecodeState; -import org.red5.server.net.rtmp.Channel; import org.red5.server.net.rtmp.IRTMPHandler; import org.red5.server.net.rtmp.RTMPConnection; import org.red5.server.net.rtmp.RTMPMinaConnection; @@ -273,6 +270,7 @@ public void connectionClosed(RTMPConnection conn) { log.debug("connectionClosed - conn: {}", conn); } + @SuppressWarnings("unused") private void fillBufferFromStringData(IoBuffer buf, String byteDumpFile) throws Exception { File f = new File(String.format("%s/target/test-classes/%s", System.getProperty("user.dir"), byteDumpFile)); BufferedReader in = new BufferedReader(new FileReader(f));