Skip to content

Commit

Permalink
Merge pull request #317 from sumeetphadnis/next
Browse files Browse the repository at this point in the history
Enable rspace module
  • Loading branch information
ar authored Oct 2, 2024
2 parents d3610e8 + 117f675 commit 197fa8c
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 36 deletions.
4 changes: 3 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ flyway = '10.9.1'
h2 = '2.2.224'
testcontainers = '1.19.6'
httpAsyncClient = '4.1.5'
jgroups = '5.3.11.Final'

[libraries]
jpos = { module = "org.jpos:jpos", version.ref = "jpos" }
Expand Down Expand Up @@ -54,4 +55,5 @@ flywayPostgresql = { module = 'org.flywaydb:flyway-database-postgresql', version
flywayMysql = { module = 'org.flywaydb:flyway-mysql', version.ref='flyway' }
h2 = { module = 'com.h2database:h2', version.ref='h2' }
testcontainersPostgresql = { module = 'org.testcontainers:postgresql', version.ref='testcontainers' }
httpAsyncClient = { module = 'org.apache.httpcomponents:httpasyncclient', version.ref = 'httpAsyncClient' }
httpAsyncClient = { module = 'org.apache.httpcomponents:httpasyncclient', version.ref = 'httpAsyncClient' }
jgroups = { module = 'org.jgroups:jgroups', version.ref = 'jgroups' }
4 changes: 2 additions & 2 deletions modules/rspace/build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
description = 'jPOS-EE :: RSpace Module'

dependencies {
api libraries.jpos
api libraries.jgroups
api libs.jpos
api libs.jgroups
}

58 changes: 28 additions & 30 deletions modules/rspace/src/main/java/org/jpos/space/ReplicatedSpace.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,12 @@
import java.util.Map;
import java.util.List;
import java.util.UUID;

import org.jgroups.*;
import org.jpos.iso.ISOUtil;
import org.jpos.util.Log;
import org.jpos.util.Logger;
import org.jpos.util.LogEvent;
import org.jgroups.JChannel;
import org.jgroups.View;
import org.jgroups.Message;
import org.jgroups.Address;
import org.jgroups.Receiver;

@SuppressWarnings("unchecked")
public class ReplicatedSpace
Expand Down Expand Up @@ -79,7 +76,7 @@ public ReplicatedSpace (
{
this (sp, groupName, configFile, null, null, false, false);
}
public void close() throws IOException {
public void close() {
block();
channel.close();
}
Expand All @@ -90,7 +87,7 @@ public void out (Object key, Object value, long timeout) {
getCoordinator();
try {
Request r = new Request (Request.OUT, key, value, timeout);
channel.send (new Message (null, r));
channel.send (new BytesMessage(null, r));
Object o = sp.in (r.getUUID(), MAX_OUT_WAIT);
if (o == null)
throw new SpaceError ("Could not out " + key);
Expand All @@ -105,7 +102,7 @@ public void push (Object key, Object value, long timeout) {
getCoordinator();
try {
Request r = new Request (Request.PUSH, key, value, timeout);
channel.send (new Message (null, r));
channel.send (new BytesMessage (null, r));
Object o = sp.in (r.getUUID(), MAX_OUT_WAIT);
if (o == null)
throw new SpaceError ("Could not push " + key);
Expand All @@ -120,7 +117,7 @@ public void put (Object key, Object value, long timeout) {
getCoordinator();
try {
Request r = new Request (Request.PUT, key, value, timeout);
channel.send (new Message (null, r));
channel.send (new BytesMessage (null, r));
Object o = sp.in (r.getUUID(), MAX_OUT_WAIT);
if (o == null)
throw new SpaceError ("Could not put " + key);
Expand All @@ -146,7 +143,9 @@ public Object inp (Object key) {
obj = null;
return obj;
}
public void receive (Message msg) {

@Override
public void receive (Message msg) {
LogEvent evt = null;
Object obj = msg.getObject();
if (trace && logger != null) {
Expand All @@ -155,8 +154,7 @@ public void receive (Message msg) {
evt.addMessage (" object: " + obj.toString());
}
}
if (obj instanceof Request) {
Request r = (Request) obj;
if (obj instanceof Request r) {
switch (r.type) {
case Request.OUT:
if (r.timeout != 0)
Expand Down Expand Up @@ -390,6 +388,7 @@ public void suspect (Address suspected_mbr) {
//
}
/** Block sending and receiving of messages until ViewAccepted is called */
@Override
public void block () {
this.view = null;
}
Expand All @@ -410,6 +409,7 @@ public void unblock() {

}

@Override
public void viewAccepted (View view) {
this.view = view;
if (logger != null) {
Expand All @@ -418,18 +418,16 @@ public void viewAccepted (View view) {
Logger.log (evt);
}
if (replicate && isCoordinator() && view.getMembers().size() > 1 && sp instanceof TSpace) {
new Thread () {
public void run() {
info ("New node joined, sending full Space");
send (null,
new Request (
Request.SPACE_COPY,
null,
((TSpace)sp).getEntries()
)
);
}
}.start();
new Thread (() -> {
info ("New node joined, sending full Space");
send (null,
new Request (
Request.SPACE_COPY,
null,
((TSpace)sp).getEntries()
)
);
}).start();
}
}
public boolean isCoordinator () {
Expand All @@ -448,17 +446,17 @@ public byte[] getState() {
return "DummyState".getBytes();
}
private void commitOff() {
if (sp instanceof JDBMSpace)
((JDBMSpace)sp).setAutoCommit(false);
if (sp instanceof JDBMSpace jdbmSpace)
jdbmSpace.setAutoCommit(false);
}
private void commitOn() {
if (sp instanceof JDBMSpace)
((JDBMSpace)sp).setAutoCommit(true);
if (sp instanceof JDBMSpace jdbmSpace)
jdbmSpace.setAutoCommit(true);
}
private void send (Address destination, Request r)
{
try {
channel.send (new Message (destination, r));
channel.send (new BytesMessage (destination, r));
} catch (Exception e) {
error (e);
}
Expand All @@ -468,7 +466,7 @@ private void sendToCoordinator (Request r)
while (true) {
Address coordinator = getCoordinator();
try {
channel.send (new Message (coordinator, r));
channel.send (new BytesMessage (coordinator, r));
break;
} catch (Exception e) {
error ("error " + e.getMessage() + ", retrying");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.jpos.q2.QBeanSupport;
import org.jpos.core.ConfigurationException;
import org.jpos.util.NameRegistrar;
import org.jdom2.Element;

/**
* RemoteSpaceAdaptor
Expand All @@ -35,6 +34,8 @@ public class ReplicatedSpaceAdaptor extends QBeanSupport {
public ReplicatedSpaceAdaptor () {
super ();
}

@Override
public void initService() throws ConfigurationException {
Space sp = SpaceFactory.getSpace (cfg.get ("space", ""));
rspaceUri = cfg.get ("rspace", "rspace");
Expand All @@ -53,6 +54,8 @@ public void initService() throws ConfigurationException {
throw new ConfigurationException (t);
}
}

@Override
protected void stopService () throws Exception {
if (rs != null)
rs.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@

thread_pool.min_threads="0"
thread_pool.max_threads="20"
thread_pool.keep_alive_time="30000"/>
thread_pool.keep_alive_time="30000"
thread_pool.use_virtual_threads="true"/>

<PING />
<MERGE3 max_interval="30000"
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ include ':modules:http-client'
include ':modules:resultcode'
include ':modules:eeuser'
include ':modules:status'
include ':modules:rspace'
// include ':modules:dbtest'

dependencyResolutionManagement {
Expand All @@ -50,7 +51,6 @@ dependencyResolutionManagement {
// include ':modules:saf-monitor'
// include ':modules:things'
// include ':modules:visitor'
// include ':modules:rspace'
// include ':modules:rest-testbed'
// include ':modules:testbed'
// include ':modules:eerest'
Expand Down

0 comments on commit 197fa8c

Please sign in to comment.