From 117f675bad7b0f577c8a611aa208e075ede520d5 Mon Sep 17 00:00:00 2001 From: Sumeet Phadnis Date: Wed, 2 Oct 2024 21:57:18 +0530 Subject: [PATCH] Enable rspace module --- gradle/libs.versions.toml | 4 +- modules/rspace/build.gradle | 4 +- .../java/org/jpos/space/ReplicatedSpace.java | 58 +++++++++---------- .../jpos/space/ReplicatedSpaceAdaptor.java | 5 +- .../META-INF/q2/installs/cfg/udp.xml | 3 +- settings.gradle | 2 +- 6 files changed, 40 insertions(+), 36 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 99bbc4845d..21dce1ef49 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -22,6 +22,7 @@ flyway = '10.9.1' h2 = '2.2.224' testcontainers = '1.19.6' httpAsyncClient = '4.1.5' +jgroups = '5.3.11.Final' [libraries] jpos = { module = "org.jpos:jpos", version.ref = "jpos" } @@ -54,4 +55,5 @@ flywayPostgresql = { module = 'org.flywaydb:flyway-database-postgresql', version flywayMysql = { module = 'org.flywaydb:flyway-mysql', version.ref='flyway' } h2 = { module = 'com.h2database:h2', version.ref='h2' } testcontainersPostgresql = { module = 'org.testcontainers:postgresql', version.ref='testcontainers' } -httpAsyncClient = { module = 'org.apache.httpcomponents:httpasyncclient', version.ref = 'httpAsyncClient' } \ No newline at end of file +httpAsyncClient = { module = 'org.apache.httpcomponents:httpasyncclient', version.ref = 'httpAsyncClient' } +jgroups = { module = 'org.jgroups:jgroups', version.ref = 'jgroups' } diff --git a/modules/rspace/build.gradle b/modules/rspace/build.gradle index 8ec3b85ac1..aec621c115 100644 --- a/modules/rspace/build.gradle +++ b/modules/rspace/build.gradle @@ -1,7 +1,7 @@ description = 'jPOS-EE :: RSpace Module' dependencies { - api libraries.jpos - api libraries.jgroups + api libs.jpos + api libs.jgroups } diff --git a/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpace.java b/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpace.java index db43834676..d1b97bf683 100644 --- a/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpace.java +++ b/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpace.java @@ -23,15 +23,12 @@ import java.util.Map; import java.util.List; import java.util.UUID; + +import org.jgroups.*; import org.jpos.iso.ISOUtil; import org.jpos.util.Log; import org.jpos.util.Logger; import org.jpos.util.LogEvent; -import org.jgroups.JChannel; -import org.jgroups.View; -import org.jgroups.Message; -import org.jgroups.Address; -import org.jgroups.Receiver; @SuppressWarnings("unchecked") public class ReplicatedSpace @@ -79,7 +76,7 @@ public ReplicatedSpace ( { this (sp, groupName, configFile, null, null, false, false); } - public void close() throws IOException { + public void close() { block(); channel.close(); } @@ -90,7 +87,7 @@ public void out (Object key, Object value, long timeout) { getCoordinator(); try { Request r = new Request (Request.OUT, key, value, timeout); - channel.send (new Message (null, r)); + channel.send (new BytesMessage(null, r)); Object o = sp.in (r.getUUID(), MAX_OUT_WAIT); if (o == null) throw new SpaceError ("Could not out " + key); @@ -105,7 +102,7 @@ public void push (Object key, Object value, long timeout) { getCoordinator(); try { Request r = new Request (Request.PUSH, key, value, timeout); - channel.send (new Message (null, r)); + channel.send (new BytesMessage (null, r)); Object o = sp.in (r.getUUID(), MAX_OUT_WAIT); if (o == null) throw new SpaceError ("Could not push " + key); @@ -120,7 +117,7 @@ public void put (Object key, Object value, long timeout) { getCoordinator(); try { Request r = new Request (Request.PUT, key, value, timeout); - channel.send (new Message (null, r)); + channel.send (new BytesMessage (null, r)); Object o = sp.in (r.getUUID(), MAX_OUT_WAIT); if (o == null) throw new SpaceError ("Could not put " + key); @@ -146,7 +143,9 @@ public Object inp (Object key) { obj = null; return obj; } - public void receive (Message msg) { + + @Override + public void receive (Message msg) { LogEvent evt = null; Object obj = msg.getObject(); if (trace && logger != null) { @@ -155,8 +154,7 @@ public void receive (Message msg) { evt.addMessage (" object: " + obj.toString()); } } - if (obj instanceof Request) { - Request r = (Request) obj; + if (obj instanceof Request r) { switch (r.type) { case Request.OUT: if (r.timeout != 0) @@ -390,6 +388,7 @@ public void suspect (Address suspected_mbr) { // } /** Block sending and receiving of messages until ViewAccepted is called */ + @Override public void block () { this.view = null; } @@ -410,6 +409,7 @@ public void unblock() { } + @Override public void viewAccepted (View view) { this.view = view; if (logger != null) { @@ -418,18 +418,16 @@ public void viewAccepted (View view) { Logger.log (evt); } if (replicate && isCoordinator() && view.getMembers().size() > 1 && sp instanceof TSpace) { - new Thread () { - public void run() { - info ("New node joined, sending full Space"); - send (null, - new Request ( - Request.SPACE_COPY, - null, - ((TSpace)sp).getEntries() - ) - ); - } - }.start(); + new Thread (() -> { + info ("New node joined, sending full Space"); + send (null, + new Request ( + Request.SPACE_COPY, + null, + ((TSpace)sp).getEntries() + ) + ); + }).start(); } } public boolean isCoordinator () { @@ -448,17 +446,17 @@ public byte[] getState() { return "DummyState".getBytes(); } private void commitOff() { - if (sp instanceof JDBMSpace) - ((JDBMSpace)sp).setAutoCommit(false); + if (sp instanceof JDBMSpace jdbmSpace) + jdbmSpace.setAutoCommit(false); } private void commitOn() { - if (sp instanceof JDBMSpace) - ((JDBMSpace)sp).setAutoCommit(true); + if (sp instanceof JDBMSpace jdbmSpace) + jdbmSpace.setAutoCommit(true); } private void send (Address destination, Request r) { try { - channel.send (new Message (destination, r)); + channel.send (new BytesMessage (destination, r)); } catch (Exception e) { error (e); } @@ -468,7 +466,7 @@ private void sendToCoordinator (Request r) while (true) { Address coordinator = getCoordinator(); try { - channel.send (new Message (coordinator, r)); + channel.send (new BytesMessage (coordinator, r)); break; } catch (Exception e) { error ("error " + e.getMessage() + ", retrying"); diff --git a/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpaceAdaptor.java b/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpaceAdaptor.java index 6e24e920a0..fd982b4696 100644 --- a/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpaceAdaptor.java +++ b/modules/rspace/src/main/java/org/jpos/space/ReplicatedSpaceAdaptor.java @@ -21,7 +21,6 @@ import org.jpos.q2.QBeanSupport; import org.jpos.core.ConfigurationException; import org.jpos.util.NameRegistrar; -import org.jdom2.Element; /** * RemoteSpaceAdaptor @@ -35,6 +34,8 @@ public class ReplicatedSpaceAdaptor extends QBeanSupport { public ReplicatedSpaceAdaptor () { super (); } + + @Override public void initService() throws ConfigurationException { Space sp = SpaceFactory.getSpace (cfg.get ("space", "")); rspaceUri = cfg.get ("rspace", "rspace"); @@ -53,6 +54,8 @@ public void initService() throws ConfigurationException { throw new ConfigurationException (t); } } + + @Override protected void stopService () throws Exception { if (rs != null) rs.close(); diff --git a/modules/rspace/src/main/resources/META-INF/q2/installs/cfg/udp.xml b/modules/rspace/src/main/resources/META-INF/q2/installs/cfg/udp.xml index 6918818a47..9e3a4fb9cc 100644 --- a/modules/rspace/src/main/resources/META-INF/q2/installs/cfg/udp.xml +++ b/modules/rspace/src/main/resources/META-INF/q2/installs/cfg/udp.xml @@ -22,7 +22,8 @@ thread_pool.min_threads="0" thread_pool.max_threads="20" - thread_pool.keep_alive_time="30000"/> + thread_pool.keep_alive_time="30000" + thread_pool.use_virtual_threads="true"/>