From 9ac8060da15dd44146c8c7fbe8bb882a42e93828 Mon Sep 17 00:00:00 2001 From: Dennis Reed Date: Mon, 19 Aug 2013 23:19:31 -0500 Subject: [PATCH] [JGRP-1674] STOP_FLUSH tests --- .../protocols/FLUSH_STOP_FLUSH_Test.java | 193 ++++++++++++++++++ 1 file changed, 193 insertions(+) create mode 100644 tests/junit-functional/org/jgroups/protocols/FLUSH_STOP_FLUSH_Test.java diff --git a/tests/junit-functional/org/jgroups/protocols/FLUSH_STOP_FLUSH_Test.java b/tests/junit-functional/org/jgroups/protocols/FLUSH_STOP_FLUSH_Test.java new file mode 100644 index 00000000000..07e421616a6 --- /dev/null +++ b/tests/junit-functional/org/jgroups/protocols/FLUSH_STOP_FLUSH_Test.java @@ -0,0 +1,193 @@ +package org.jgroups.protocols; + +import org.jgroups.*; +import org.jgroups.util.*; +import org.jgroups.stack.*; +import org.jgroups.conf.ClassConfigurator; +import org.jgroups.stack.Protocol; +import org.jgroups.protocols.TP; +import org.jgroups.protocols.pbcast.FLUSH; +import org.jgroups.protocols.pbcast.FLUSH.FlushHeader; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.lang.reflect.Field; +import java.util.Collection; +import java.util.List; +import java.util.ArrayList; + +/** + * Tests the FLUSH STOP_FLUSH behavior + * @author Dennis Reed + */ +@Test(groups=Global.FUNCTIONAL,sequential=true) +public class FLUSH_STOP_FLUSH_Test { + static final short FLUSH_ID=ClassConfigurator.getProtocolId(FLUSH.class); + IpAddress a1; + FLUSH flush; + StopFlushInterceptor downInterceptor; + BlockInterceptor upInterceptor; + + @BeforeMethod + public void setUp() throws Exception { + a1=new IpAddress(1111); + + flush=new FLUSH(); + downInterceptor = new StopFlushInterceptor(a1); + downInterceptor.setUpProtocol(flush); + flush.setDownProtocol(downInterceptor); + + TP transport=new TP() { + public boolean supportsMulticasting() {return false;} + public void sendMulticast(byte[] data, int offset, int length) throws Exception {} + public void sendUnicast(PhysicalAddress dest, byte[] data, int offset, int length) throws Exception {} + public String getInfo() {return null;} + public Object down(Event evt) { return null; } + protected PhysicalAddress getPhysicalAddress() {return null;} + public TimeScheduler getTimer() {return new DefaultTimeScheduler(1);} + }; + downInterceptor.setDownProtocol(transport); + + upInterceptor = new BlockInterceptor(); + flush.setUpProtocol(upInterceptor); + + flush.start(); + + List
members=new ArrayList
(1); + members.add(a1); + View view=new View(a1, 1, members); + + // set the local address + flush.down(new Event(Event.SET_LOCAL_ADDRESS,a1)); + + // set dummy view + flush.up(new Event(Event.VIEW_CHANGE,view)); + } + + @AfterMethod + protected void tearDown() { + flush.stop(); + } + + @Test + public void testStopFlush() throws InterruptedException { + flush.down(new Event(Event.SUSPEND)); + Assert.assertTrue(upInterceptor.isBlocked()); + + flush.down(new Event(Event.RESUME)); + Assert.assertFalse(upInterceptor.isBlocked()); + + // Verify flushParticipants is set correctly on the STOP_FLUSH message + Collection
flushParticipants = downInterceptor.getFlushParticipants(); + Assert.assertNotNull(flushParticipants); + Assert.assertEquals(1, flushParticipants.size()); + Assert.assertTrue(flushParticipants.contains(a1)); + } + + @Test + public void testRogueStopFlush() throws InterruptedException { + flush.down(new Event(Event.SUSPEND)); + Assert.assertTrue(upInterceptor.isBlocked()); + + // STOP_FLUSH that is not addressed to this member + Address a2 = new IpAddress(2222); + Message msg = new Message(null, a2, null); + Collection
flushMembers = new ArrayList
(); + flushMembers.add(a2); + msg.putHeader(FLUSH_ID, new FlushHeader(FlushHeader.STOP_FLUSH, 1, flushMembers)); + flush.up(new Event(Event.MSG, msg)); + + // Should still be blocked + Assert.assertTrue(upInterceptor.isBlocked()); + } + + static class StopFlushInterceptor extends Protocol { + private Collection
flushParticipants; + private Address address; + private static Field typeField; + private static Field flushParticipantsField; + + static { + try { + typeField = FlushHeader.class.getDeclaredField("type"); + typeField.setAccessible(true); + + flushParticipantsField = FlushHeader.class.getDeclaredField("flushParticipants"); + flushParticipantsField.setAccessible(true); + } + catch ( NoSuchFieldException e ) + { + Assert.fail("FlushHeader is missing fields checked by test case", e); + } + } + + public StopFlushInterceptor ( Address address ) { + this.address = address; + } + + public String getName () { + return "StopFlushInterceptor"; + } + + public Object down(Event evt) { + if(evt.getType() == Event.MSG) { + Message msg=(Message)evt.getArg(); + FlushHeader hdr=(FlushHeader)msg.getHeader(FLUSH_ID); + if(hdr != null) { + try { + byte type = typeField.getByte(hdr); + if(type == FlushHeader.STOP_FLUSH) { + this.flushParticipants = (Collection
)flushParticipantsField.get(hdr); + } + } + catch ( IllegalAccessException e ) + { + Assert.fail("Could not make FlushHeader fields used by test accessible"); + } + } + + // loopback + if(msg.getDest() == null || msg.getDest().equals(this.address)) + { + msg.setSrc(this.address); + getUpProtocol().up(evt); + } + } + + return super.down(evt); + } + + public Collection
getFlushParticipants () + { + return this.flushParticipants; + } + } + + static class BlockInterceptor extends Protocol { + private boolean blocked = false; + + public BlockInterceptor () { + } + + public String getName () { + return "BlockInterceptor"; + } + + public Object up(Event evt) { + if(evt.getType() == Event.BLOCK) { + this.blocked = true; + } else if(evt.getType() == Event.UNBLOCK) { + this.blocked = false; + } + + return null; + } + + public boolean isBlocked() + { + return this.blocked; + } + } +}