From ebcc819d45cd440a1ae5bca5b27a751951100063 Mon Sep 17 00:00:00 2001 From: dsplayerX Date: Wed, 4 Oct 2023 11:56:24 +0530 Subject: [PATCH 1/9] [Automated] Update the native jar versions --- transaction-ballerina/Dependencies.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transaction-ballerina/Dependencies.toml b/transaction-ballerina/Dependencies.toml index 55479ac7..4097a969 100644 --- a/transaction-ballerina/Dependencies.toml +++ b/transaction-ballerina/Dependencies.toml @@ -5,7 +5,7 @@ [ballerina] dependencies-toml-version = "2" -distribution-version = "2201.8.0-20230726-145300-b2bdf796" +distribution-version = "2201.8.0-20230830-220400-8a7556d8" [[package]] org = "ballerina" From 6b4da9df960f6f2d617eb38383fe85dd95fda9c4 Mon Sep 17 00:00:00 2001 From: dsplayerX Date: Wed, 4 Oct 2023 12:22:06 +0530 Subject: [PATCH 2/9] [Automated] Update the native jar versions --- transaction-ballerina/Dependencies.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transaction-ballerina/Dependencies.toml b/transaction-ballerina/Dependencies.toml index 4097a969..03cdb0c5 100644 --- a/transaction-ballerina/Dependencies.toml +++ b/transaction-ballerina/Dependencies.toml @@ -64,7 +64,7 @@ dependencies = [ [[package]] org = "ballerina" name = "http" -version = "2.10.0" +version = "2.10.1" dependencies = [ {org = "ballerina", name = "auth"}, {org = "ballerina", name = "cache"}, From aaa7280027e0aeb38c6af45a6b7478849a2ff866 Mon Sep 17 00:00:00 2001 From: dsplayerX Date: Thu, 5 Oct 2023 16:22:58 +0530 Subject: [PATCH 3/9] Add locks where initiatedTransactions map is accessed --- transaction-ballerina/commons.bal | 146 ++++++++++++-------- transaction-ballerina/transaction_block.bal | 25 ++-- 2 files changed, 100 insertions(+), 71 deletions(-) diff --git a/transaction-ballerina/commons.bal b/transaction-ballerina/commons.bal index 906388e3..9ed19431 100644 --- a/transaction-ballerina/commons.bal +++ b/transaction-ballerina/commons.bal @@ -13,15 +13,14 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - import ballerina/cache; import ballerina/http; +import ballerina/lang.'transaction as lang_trx; +import ballerina/lang.'value as value; import ballerina/log; -import ballerina/uuid; import ballerina/task; import ballerina/time; -import ballerina/lang.'transaction as lang_trx; -import ballerina/lang.'value as value; +import ballerina/uuid; # ID of the local participant used when registering with the initiator. string localParticipantId = uuid:createType4AsString(); @@ -30,7 +29,8 @@ string localParticipantId = uuid:createType4AsString(); map initiatedTransactions = {}; # This map is used for caching transaction that are this Ballerina instance participates in. -@tainted map participatedTransactions = {}; +@tainted +map participatedTransactions = {}; # This cache is used for caching HTTP connectors against the URL, since creating connectors is expensive. cache:Cache httpClientCache = new; @@ -55,8 +55,8 @@ function cleanupTransactions() returns error? { while (i < participatedTransactionsArr.length()) { var twopcTxn = participatedTransactionsArr[i]; i += 1; - //TODO: commenting due to a caching issue - //foreach var twopcTxn in participatedTransactions { + //TODO: commenting due to a caching issue + //foreach var twopcTxn in participatedTransactions { final string participatedTxnId = getParticipatedTransactionId(twopcTxn.transactionId, twopcTxn.transactionBlockId); if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= 120d) { @@ -85,7 +85,7 @@ function cleanupTransactions() returns error? { } } } - if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= 600) { + if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= 600) { // We don't want dead transactions hanging around removeParticipatedTransaction(participatedTxnId); } @@ -94,11 +94,11 @@ function cleanupTransactions() returns error? { worker w2 returns () { TwoPhaseCommitTransaction[] initiatedTransactionsArr = initiatedTransactions.toArray(); int i = 0; - while(i < initiatedTransactionsArr.length()) { + while (i < initiatedTransactionsArr.length()) { var twopcTxn = initiatedTransactionsArr[i]; i += 1; - //TODO:commenting due to a caching issue - //foreach var twopcTxn in initiatedTransactions { + //TODO:commenting due to a caching issue + //foreach var twopcTxn in initiatedTransactions { if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= 120) { if (twopcTxn.state != TXN_STATE_ABORTED) { // Commit the transaction since prepare hasn't been received @@ -114,7 +114,7 @@ function cleanupTransactions() returns error? { } } } - if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= 600) { + if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= 600) { // We don't want dead transactions hanging around removeInitiatedTransaction(twopcTxn.transactionId); } @@ -125,7 +125,6 @@ function cleanupTransactions() returns error? { return value; } - function isRegisteredParticipant(string participantId, map participants) returns boolean { return participants.hasKey(participantId); } @@ -135,8 +134,8 @@ function isValidCoordinationType(string coordinationType) returns boolean { while (i < coordinationTypes.length()) { var coordType = coordinationTypes[i]; i += 1; - //TODO:commenting due to caching issue; - //foreach var coordType in coordinationTypes { + //TODO:commenting due to caching issue; + //foreach var coordType in coordinationTypes { if (coordinationType == coordType) { return true; } @@ -148,7 +147,7 @@ function protoName(UProtocol p) returns string { if (p is RemoteProtocol) { return p.name; } else { - return p.name; + return p.name; } } @@ -156,19 +155,19 @@ function protocolCompatible(string coordinationType, UProtocol?[] participantPro boolean participantProtocolIsValid = false; string[] validProtocols = coordinationTypeToProtocolsMap[coordinationType] ?: []; int i = 0; - while ( i < participantProtocols.length()) { + while (i < participantProtocols.length()) { var p = participantProtocols[i]; i += 1; - //TODO: commenting due to a caching issue - //foreach var p in participantProtocols { + //TODO: commenting due to a caching issue + //foreach var p in participantProtocols { if (p is UProtocol) { UProtocol participantProtocol = p; int j = 0; while (j < validProtocols.length()) { var validProtocol = validProtocols[j]; j += 1; - //TODO: commenting due to a caching issue - //foreach var validProtocol in validProtocols { + //TODO: commenting due to a caching issue + //foreach var validProtocol in validProtocols { if (protoName(participantProtocol) == validProtocol) { participantProtocolIsValid = true; break; @@ -188,11 +187,12 @@ type JsonTypedesc typedesc; function respondToBadRequest(http:Caller ep, string msg) { log:printError(msg); - http:Response res = new; res.statusCode = http:STATUS_BAD_REQUEST; - RequestError requestError = {errorMessage:msg}; + http:Response res = new; + res.statusCode = http:STATUS_BAD_REQUEST; + RequestError requestError = {errorMessage: msg}; var resPayload = requestError.cloneWithType(JsonTypedesc); if (resPayload is json) { - res.setJsonPayload(<@untainted json> resPayload); + res.setJsonPayload(<@untainted json>resPayload); var resResult = ep->respond(res); if (resResult is error) { log:printError("Could not send Bad Request error response to caller", 'error = resResult); @@ -220,7 +220,7 @@ function getParticipantProtocolAt(string protocolName, string transactionBlockId # corresponding to the coordinationType will also be created and stored as an initiated transaction. # # + coordinationType - The type of the coordination relevant to the transaction block for which this TransactionContext -# is being created for. +# is being created for. # + transactionBlockId - The ID of the transaction block. # + return - TransactionContext if the coordination type is valid or an error in case of an invalid coordination type. function createTransactionContext(string coordinationType, string transactionBlockId) returns TransactionContext|error { @@ -229,15 +229,17 @@ function createTransactionContext(string coordinationType, string transactionBlo error err = error(msg); return err; } else { - TwoPhaseCommitTransaction txn = new(uuid(), transactionBlockId, coordinationType = coordinationType); + TwoPhaseCommitTransaction txn = new (uuid(), transactionBlockId, coordinationType = coordinationType); string txnId = txn.transactionId; txn.isInitiated = true; - initiatedTransactions[txnId] = txn; + lock { + initiatedTransactions[txnId] = txn; + } TransactionContext txnContext = { - transactionId:txnId, - transactionBlockId:transactionBlockId, - coordinationType:coordinationType, - registerAtURL:"http://" + value:toString(coordinatorHost) + ":" + value:toString(coordinatorPort) + + transactionId: txnId, + transactionBlockId: transactionBlockId, + coordinationType: coordinationType, + registerAtURL: "http://" + value:toString(coordinatorHost) + ":" + value:toString(coordinatorPort) + initiatorCoordinatorBasePath + "/" + transactionBlockId + registrationPath }; return txnContext; @@ -254,20 +256,24 @@ function createTransactionContext(string coordinationType, string transactionBlo # + return - TransactionContext if the registration is successul or an error in case of a failure. function registerLocalParticipantWithInitiator(string transactionId, string transactionBlockId, string registerAtURL) returns TransactionContext|error { - final string trxId = transactionId; final string participantId = getParticipantId(transactionBlockId); //TODO: Protocol name should be passed down from the transaction statement - LocalProtocol participantProtocol = {name:PROTOCOL_DURABLE}; - var initiatedTxn = initiatedTransactions[transactionId]; + LocalProtocol participantProtocol = {name: PROTOCOL_DURABLE}; + TwoPhaseCommitTransaction? initiatedTxn; + lock { + initiatedTxn = initiatedTransactions[transactionId]; + } if (initiatedTxn is ()) { return error lang_trx:Error("Transaction-Unknown. Invalid TID:" + transactionId); } else { if (isRegisteredParticipant(participantId, initiatedTxn.participants)) { // Already-Registered log:printDebug("Already-Registered. TID:" + trxId + ", participant ID:" + participantId); TransactionContext txnCtx = { - transactionId:transactionId, transactionBlockId:transactionBlockId, - coordinationType:TWO_PHASE_COMMIT, registerAtURL:registerAtURL + transactionId: transactionId, + transactionBlockId: transactionBlockId, + coordinationType: TWO_PHASE_COMMIT, + registerAtURL: registerAtURL }; return txnCtx; } else if (!protocolCompatible(initiatedTxn.coordinationType, [participantProtocol])) { // Invalid-Protocol @@ -275,17 +281,21 @@ function registerLocalParticipantWithInitiator(string transactionId, string tran participantId); } else { //Set initiator protocols - TwoPhaseCommitTransaction participatedTxn = new(transactionId, transactionBlockId); + TwoPhaseCommitTransaction participatedTxn = new (transactionId, transactionBlockId); //Protocol initiatorProto = {name: PROTOCOL_DURABLE, transactionBlockId:transactionBlockId}; //participatedTxn.coordinatorProtocols = [initiatorProto]; - LocalParticipant participant = new(participantId, participatedTxn, [participantProtocol]); + LocalParticipant participant = new (participantId, participatedTxn, [participantProtocol]); initiatedTxn.participants[participantId] = participant; string participatedTxnId = getParticipatedTransactionId(transactionId, transactionBlockId); participatedTransactions[participatedTxnId] = participatedTxn; - TransactionContext txnCtx = {transactionId:transactionId, transactionBlockId:transactionBlockId, - coordinationType:TWO_PHASE_COMMIT, registerAtURL:registerAtURL}; + TransactionContext txnCtx = { + transactionId: transactionId, + transactionBlockId: transactionBlockId, + coordinationType: TWO_PHASE_COMMIT, + registerAtURL: registerAtURL + }; log:printDebug("Registered local participant: " + participantId + " for transaction:" + trxId); return txnCtx; } @@ -299,24 +309,34 @@ function removeParticipatedTransaction(string participatedTxnId) { } } +function hasInitiatedTransaction(string transactionId) returns boolean { + lock { + return initiatedTransactions.hasKey(transactionId); + } +} + function removeInitiatedTransaction(string transactionId) { - var removed = trap initiatedTransactions.remove(transactionId); - if (removed is error) { - panic error lang_trx:Error("Removing initiated transaction: " + transactionId + " failed"); + lock { + var removed = trap initiatedTransactions.remove(transactionId); + if (removed is error) { + panic error lang_trx:Error("Removing initiated transaction: " + transactionId + " failed"); + } } } function getInitiatorClient(string registerAtURL) returns InitiatorClientEP { InitiatorClientEP initiatorEP; if (httpClientCache.hasKey(registerAtURL)) { - return checkpanic httpClientCache.get(registerAtURL); + return checkpanic httpClientCache.get(registerAtURL); } else { lock { if (httpClientCache.hasKey(registerAtURL)) { - return checkpanic httpClientCache.get(registerAtURL); + return checkpanic httpClientCache.get(registerAtURL); } - initiatorEP = new({ registerAtURL: registerAtURL, timeout: 15, - retryConfig: { count: 2, interval: 5 } + initiatorEP = new ({ + registerAtURL: registerAtURL, + timeout: 15, + retryConfig: {count: 2, interval: 5} }); cache:Error? result = httpClientCache.put(registerAtURL, initiatorEP); if (result is cache:Error) { @@ -330,15 +350,17 @@ function getInitiatorClient(string registerAtURL) returns InitiatorClientEP { function getParticipant2pcClient(string participantURL) returns Participant2pcClientEP { Participant2pcClientEP participantEP; - if (httpClientCache.hasKey(<@untainted> participantURL)) { - return checkpanic httpClientCache.get(<@untainted>participantURL); + if (httpClientCache.hasKey(<@untainted>participantURL)) { + return checkpanic httpClientCache.get(<@untainted>participantURL); } else { lock { - if (httpClientCache.hasKey(<@untainted> participantURL)) { - return checkpanic httpClientCache.get(<@untainted>participantURL); + if (httpClientCache.hasKey(<@untainted>participantURL)) { + return checkpanic httpClientCache.get(<@untainted>participantURL); } - participantEP = new({ participantURL: participantURL, - timeout: 15, retryConfig: { count: 2, interval: 5 } + participantEP = new ({ + participantURL: participantURL, + timeout: 15, + retryConfig: {count: 2, interval: 5} }); cache:Error? result = httpClientCache.put(participantURL, participantEP); if (result is cache:Error) { @@ -352,13 +374,13 @@ function getParticipant2pcClient(string participantURL) returns Participant2pcCl # Registers a participant with the initiator's coordinator. This function will be called by the participant. # -# + transactionId - Global transaction ID to which this participant is registering with. +# + transactionId - Global transaction ID to which this participant is registering with. # + transactionBlockId - The local ID of the transaction block on the participant. # + registerAtURL - The URL of the coordinator. # + participantProtocols - The coordination protocals supported by the participant. # + return - TransactionContext if the registration is successful or an error in case of a failure. function registerParticipantWithRemoteInitiator(string transactionId, string transactionBlockId, - string registerAtURL, RemoteProtocol[] participantProtocols) + string registerAtURL, RemoteProtocol[] participantProtocols) returns TransactionContext|error { InitiatorClientEP initiatorEP = getInitiatorClient(registerAtURL); @@ -368,8 +390,10 @@ function registerParticipantWithRemoteInitiator(string transactionId, string tra if (participatedTransactions.hasKey(participatedTxnId)) { log:printDebug("Already registered with initiator for transaction:" + participatedTxnId); TransactionContext txnCtx = { - transactionId:transactionId, transactionBlockId:transactionBlockId, - coordinationType:TWO_PHASE_COMMIT, registerAtURL:registerAtURL + transactionId: transactionId, + transactionBlockId: transactionBlockId, + coordinationType: TWO_PHASE_COMMIT, + registerAtURL: registerAtURL }; return txnCtx; } @@ -385,12 +409,14 @@ function registerParticipantWithRemoteInitiator(string transactionId, string tra return error lang_trx:Error(msg); } else { RemoteProtocol[] coordinatorProtocols = result.coordinatorProtocols; - TwoPhaseCommitTransaction twopcTxn = new(transactionId, transactionBlockId); + TwoPhaseCommitTransaction twopcTxn = new (transactionId, transactionBlockId); twopcTxn.coordinatorProtocols = toProtocolArray(coordinatorProtocols); participatedTransactions[participatedTxnId] = twopcTxn; TransactionContext txnCtx = { - transactionId:transactionId, transactionBlockId:transactionBlockId, - coordinationType:TWO_PHASE_COMMIT, registerAtURL:registerAtURL + transactionId: transactionId, + transactionBlockId: transactionBlockId, + coordinationType: TWO_PHASE_COMMIT, + registerAtURL: registerAtURL }; final string trxId = transactionId; log:printDebug("Registered with coordinator for transaction: " + trxId); diff --git a/transaction-ballerina/transaction_block.bal b/transaction-ballerina/transaction_block.bal index 8ccc922b..12c7fd31 100644 --- a/transaction-ballerina/transaction_block.bal +++ b/transaction-ballerina/transaction_block.bal @@ -13,7 +13,6 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - import ballerina/jballerina.java; import ballerina/lang.'transaction as lang_trx; import ballerina/log; @@ -47,24 +46,27 @@ function beginRemoteParticipant(string transactionBlockId) { # initiator via a network call. # # + transactionId - Globally unique transaction ID. If this is a new transaction which is initiated, then this -# will be null. -# If this is a participant in an existing transaction, then it will have a value. +# will be null. +# If this is a participant in an existing transaction, then it will have a value. # + transactionBlockId - ID of the transaction block. Each transaction block in a process has a unique ID. # + registerAtUrl - The URL of the initiator # + coordinationType - Coordination type of this transaction # + return - Newly created/existing TransactionContext for this transaction. public function beginTransaction(string? transactionId, string transactionBlockId, string registerAtUrl, - string coordinationType) returns TransactionContext|error { + string coordinationType) returns TransactionContext|error { if (transactionId is string) { - if (initiatedTransactions.hasKey(transactionId)) { // if participant & initiator are in the same process + if (hasInitiatedTransaction(transactionId)) { // if participant & initiator are in the same process // we don't need to do a network call and can simply do a local function call return registerLocalParticipantWithInitiator(transactionId, transactionBlockId, registerAtUrl); } else { //TODO: set the proper protocol string protocolName = PROTOCOL_DURABLE; - RemoteProtocol[] protocols = [{ - name:protocolName, url:getParticipantProtocolAt(protocolName, <@untainted> transactionBlockId) - }]; + RemoteProtocol[] protocols = [ + { + name: protocolName, + url: getParticipantProtocolAt(protocolName, <@untainted>transactionBlockId) + } + ]; return registerParticipantWithRemoteInitiator(transactionId, transactionBlockId, registerAtUrl, protocols); } } else { @@ -111,12 +113,13 @@ transactional function endTransaction(string transactionId, string transactionBl } string participatedTxnId = getParticipatedTransactionId(transactionId, transactionBlockId); - if (!initiatedTransactions.hasKey(transactionId) && !participatedTransactions.hasKey(participatedTxnId)) { + + if (!hasInitiatedTransaction(transactionId) && !participatedTransactions.hasKey(participatedTxnId)) { error err = error("Transaction: " + participatedTxnId + " not found"); panic err; } - var initiatedTxn = initiatedTransactions[transactionId]; + TwoPhaseCommitTransaction? initiatedTxn = initiatedTransactions[transactionId]; if (initiatedTxn is ()) { return ""; } else { @@ -134,7 +137,7 @@ transactional function endTransaction(string transactionId, string transactionBl # # + transactionBlockId - ID of the transaction block. Each transaction block in a process has a unique ID. # + return - Transaction context. -function registerRemoteParticipant(string transactionBlockId) returns TransactionContext? = @java:Method { +function registerRemoteParticipant(string transactionBlockId) returns TransactionContext? = @java:Method { 'class: "org.ballerinalang.stdlib.transaction.Utils", name: "registerRemoteParticipant" } external; From ad2536d8371c3e99585f7437fe903b67480c339d Mon Sep 17 00:00:00 2001 From: dsplayerX Date: Thu, 5 Oct 2023 16:28:27 +0530 Subject: [PATCH 4/9] [Automated] Update the native jar versions --- transaction-ballerina/Dependencies.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transaction-ballerina/Dependencies.toml b/transaction-ballerina/Dependencies.toml index 03cdb0c5..ac1c3302 100644 --- a/transaction-ballerina/Dependencies.toml +++ b/transaction-ballerina/Dependencies.toml @@ -5,7 +5,7 @@ [ballerina] dependencies-toml-version = "2" -distribution-version = "2201.8.0-20230830-220400-8a7556d8" +distribution-version = "2201.8.0-20230908-135700-74a59dff" [[package]] org = "ballerina" From 3793861e1179a0929734e565c1f8a378758c03db Mon Sep 17 00:00:00 2001 From: dsplayerX Date: Tue, 10 Oct 2023 14:55:00 +0530 Subject: [PATCH 5/9] [Automated] Update the native jar versions --- transaction-ballerina/Dependencies.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transaction-ballerina/Dependencies.toml b/transaction-ballerina/Dependencies.toml index ac1c3302..a873e8a3 100644 --- a/transaction-ballerina/Dependencies.toml +++ b/transaction-ballerina/Dependencies.toml @@ -64,7 +64,7 @@ dependencies = [ [[package]] org = "ballerina" name = "http" -version = "2.10.1" +version = "2.10.2" dependencies = [ {org = "ballerina", name = "auth"}, {org = "ballerina", name = "cache"}, From cb872a57ddc092d10f3543e5e32bcaf42998c778 Mon Sep 17 00:00:00 2001 From: dsplayerX Date: Wed, 11 Oct 2023 09:58:48 +0530 Subject: [PATCH 6/9] Add lock to initiatedTransactions array conversion --- transaction-ballerina/commons.bal | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/transaction-ballerina/commons.bal b/transaction-ballerina/commons.bal index 9ed19431..acb11e78 100644 --- a/transaction-ballerina/commons.bal +++ b/transaction-ballerina/commons.bal @@ -92,7 +92,10 @@ function cleanupTransactions() returns error? { } } worker w2 returns () { - TwoPhaseCommitTransaction[] initiatedTransactionsArr = initiatedTransactions.toArray(); + TwoPhaseCommitTransaction[] initiatedTransactionsArr; + lock { + initiatedTransactionsArr = initiatedTransactions.toArray(); + } int i = 0; while (i < initiatedTransactionsArr.length()) { var twopcTxn = initiatedTransactionsArr[i]; From 810403f24eb2d9b08c1ea4ea52148e628b46537b Mon Sep 17 00:00:00 2001 From: dsplayerX Date: Wed, 11 Oct 2023 10:23:13 +0530 Subject: [PATCH 7/9] Add transaction concurrency test --- .../tests/transaction_concurrency.bal | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 transaction-ballerina/tests/transaction_concurrency.bal diff --git a/transaction-ballerina/tests/transaction_concurrency.bal b/transaction-ballerina/tests/transaction_concurrency.bal new file mode 100644 index 00000000..7a4e7d18 --- /dev/null +++ b/transaction-ballerina/tests/transaction_concurrency.bal @@ -0,0 +1,97 @@ +// Copyright (c) 2023 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +import ballerina/lang.runtime; +import ballerina/task; +import ballerina/test; + +isolated map taskCounterMap = {"A": 0, "B": 0, "C": 0, "D": 0, "E": 0}; + +public isolated function performTransaction() returns error? { + transaction { + check commit; + } +} + +public isolated class TaskExecuter { + + *task:Job; + private int counter = 0; + private final string name; + + function init(string name) { + self.name = name; + } + + public isolated function execute() { + int count = 0; + lock { + count = self.counter.cloneReadOnly(); + if (count >= 100) { + return; + } + self.counter += 1; + } + + var err = trap performTransaction(); + + if !(err is ()) { + test:assertFail("Error in task " + self.name + " : " + err.toString()); + } + + lock { + int i = taskCounterMap.get(self.name); + taskCounterMap[self.name] = (i + 1); + } + } + + public isolated function scheduleTaskExecution(decimal interval) { + do { + var _ = check task:scheduleJobRecurByFrequency(self, interval); + } on fail error err { + test:assertFail("Error in scheduling task " + self.name + " : " + err.toString()); + } + } +} + +public function scheduleTasks() returns error? { + var taskA = new TaskExecuter("A"); + var taskB = new TaskExecuter("B"); + var taskC = new TaskExecuter("C"); + var taskD = new TaskExecuter("D"); + var taskE = new TaskExecuter("E"); + + decimal interval = 0.1; + + taskA.scheduleTaskExecution(interval); + taskB.scheduleTaskExecution(interval); + taskC.scheduleTaskExecution(interval); + taskD.scheduleTaskExecution(interval); + taskE.scheduleTaskExecution(interval); + + runtime:sleep(10); // Sleep to allow tasks to run +} + +@test:Config { + before: scheduleTasks +} +public function testTransactionConcurrency() { + map expectedCountsMap = {"A": 100, "B": 100, "C": 100, "D": 100, "E": 100}; + map actualCountsMap; + lock { + actualCountsMap = taskCounterMap.cloneReadOnly(); + } + test:assertEquals(actualCountsMap, expectedCountsMap, "Transaction concurrency test failed"); +} From 0db265273ca1b0c4e76b481956f778f7f2e4afc3 Mon Sep 17 00:00:00 2001 From: dsplayerX Date: Fri, 20 Oct 2023 10:47:09 +0530 Subject: [PATCH 8/9] [Automated] Update the native jar versions --- transaction-ballerina/Dependencies.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transaction-ballerina/Dependencies.toml b/transaction-ballerina/Dependencies.toml index a873e8a3..5d971d0d 100644 --- a/transaction-ballerina/Dependencies.toml +++ b/transaction-ballerina/Dependencies.toml @@ -64,7 +64,7 @@ dependencies = [ [[package]] org = "ballerina" name = "http" -version = "2.10.2" +version = "2.10.3" dependencies = [ {org = "ballerina", name = "auth"}, {org = "ballerina", name = "cache"}, From ee909bfcec53393968cfd5b5d99cb732ea4bd0c7 Mon Sep 17 00:00:00 2001 From: dsplayerX Date: Fri, 20 Oct 2023 12:20:51 +0530 Subject: [PATCH 9/9] Refactor transaction concurrency test --- .../tests/transaction_concurrency.bal | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/transaction-ballerina/tests/transaction_concurrency.bal b/transaction-ballerina/tests/transaction_concurrency.bal index 7a4e7d18..1eaaaa8c 100644 --- a/transaction-ballerina/tests/transaction_concurrency.bal +++ b/transaction-ballerina/tests/transaction_concurrency.bal @@ -25,7 +25,7 @@ public isolated function performTransaction() returns error? { } } -public isolated class TaskExecuter { +public isolated class ExecuteTask { *task:Job; private int counter = 0; @@ -39,7 +39,7 @@ public isolated class TaskExecuter { int count = 0; lock { count = self.counter.cloneReadOnly(); - if (count >= 100) { + if count >= 100 { return; } self.counter += 1; @@ -48,7 +48,7 @@ public isolated class TaskExecuter { var err = trap performTransaction(); if !(err is ()) { - test:assertFail("Error in task " + self.name + " : " + err.toString()); + test:assertFail(string`Error in task " ${self.name} : ${err.toString()}`); } lock { @@ -59,27 +59,27 @@ public isolated class TaskExecuter { public isolated function scheduleTaskExecution(decimal interval) { do { - var _ = check task:scheduleJobRecurByFrequency(self, interval); + _ = check task:scheduleJobRecurByFrequency(self, interval); } on fail error err { - test:assertFail("Error in scheduling task " + self.name + " : " + err.toString()); + test:assertFail(string`Error in scheduling task ${self.name}: ${err.toString()}`); } } } public function scheduleTasks() returns error? { - var taskA = new TaskExecuter("A"); - var taskB = new TaskExecuter("B"); - var taskC = new TaskExecuter("C"); - var taskD = new TaskExecuter("D"); - var taskE = new TaskExecuter("E"); + ExecuteTask[] tasks = []; + string[] taskNames = ["A", "B", "C", "D", "E"]; + decimal interval = 0.1; // 100 milliseconds - decimal interval = 0.1; + from string taskName in taskNames + do { + tasks.push(new ExecuteTask(taskName)); + }; - taskA.scheduleTaskExecution(interval); - taskB.scheduleTaskExecution(interval); - taskC.scheduleTaskExecution(interval); - taskD.scheduleTaskExecution(interval); - taskE.scheduleTaskExecution(interval); + from ExecuteTask task in tasks + do { + task.scheduleTaskExecution(interval); + }; runtime:sleep(10); // Sleep to allow tasks to run } @@ -93,5 +93,5 @@ public function testTransactionConcurrency() { lock { actualCountsMap = taskCounterMap.cloneReadOnly(); } - test:assertEquals(actualCountsMap, expectedCountsMap, "Transaction concurrency test failed"); + test:assertEquals(actualCountsMap, expectedCountsMap, "Transaction concurrency test failed."); }