Skip to content

Commit

Permalink
Conflicts resolved with rel-v7r1
Browse files Browse the repository at this point in the history
  • Loading branch information
atsareg committed Mar 31, 2021
2 parents 83e4d8a + a61a752 commit 4176025
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 11 deletions.
17 changes: 17 additions & 0 deletions release.notes
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,18 @@ NEW: (#4910) --runslow option on unit tests to allow faster local tests
NEW: (#4938) added a helloworld test for the (yet to be implemented) cloud testing in certification
CHANGE: (#4968) Change the defaults for tests (to MySQL 8 and ES 7)

[v7r1p36]

*DMS
FIX: (#5080) SQLAlchemy 1.4 support for FTS3DB
FIX: (#5054) Submit FTS3 jobs with lowercase checksum

*RMS
FIX: (#5080) SQLAlchemy 1.4 support for ReqDB

*Resources
FIX: (#5077) Host.py - Fix for the case where no subprocess32 module found

[v7r1p35]

FIX: Fixes from v7r0p52
Expand Down Expand Up @@ -740,6 +752,11 @@ FIX: (#4551) align ProxyDB test to current changes
NEW: (#4289) Document how to run integration tests in docker
NEW: (#4551) add DNProperties description to Registry/Users subsection

[v7r0p53]

*Interface
FIX: (#5069) parseArguments strip arguments correctly

[v7r0p52]

*Core
Expand Down
3 changes: 2 additions & 1 deletion src/DIRAC/Core/Utilities/MySQL.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
__RCSID__ = "$Id$"

MAXCONNECTRETRY = 10
RETRY_SLEEP_DURATION = 5


def _checkFields(inFields, inValues):
Expand Down Expand Up @@ -254,7 +255,7 @@ def get(self, dbName, retries=10):
return self.__getWithRetry(dbName, retries, retries)

def __getWithRetry(self, dbName, totalRetries, retriesLeft):
sleepTime = 5 * (totalRetries - retriesLeft)
sleepTime = RETRY_SLEEP_DURATION * (totalRetries - retriesLeft)
if sleepTime > 0:
time.sleep(sleepTime)
try:
Expand Down
8 changes: 7 additions & 1 deletion src/DIRAC/DataManagementSystem/Client/FTS3Job.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,9 +383,15 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
transfers.append(stageTrans)

trans_metadata = {'desc': 'Transfer %s' % ftsFileID, 'fileID': ftsFileID}

# because of an xroot bug (https://github.com/xrootd/xrootd/issues/1433)
# the checksum needs to be lowercase. It does not impact the other
# protocol, so it's fine to put it here.
# I only add it in this transfer and not the "staging" one above because it
# impacts only root -> root transfers
trans = fts3.new_transfer(sourceSURL,
targetSURL,
checksum='ADLER32:%s' % ftsFile.checksum,
checksum='ADLER32:%s' % ftsFile.checksum.lower(),
filesize=ftsFile.size,
metadata=trans_metadata,
activity=self.activity)
Expand Down
32 changes: 27 additions & 5 deletions src/DIRAC/DataManagementSystem/DB/FTS3DB.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,18 @@
)


# About synchronize_session:
# The FTS3DB class uses SQLAlchemy in a mixed mode:
# both the ORM and the Core style.
# Up to 1.3, the `session.execute` statements had no ORM functionality,
# meaning that the session cache were not updated when using `update` or `delete`
# Although it could be an issue, it was not really one in our case, since we always close
# the session
# As of 1.4, `session.execute` supports ORM functionality, and thus needs more info to know
# how to manage `update` or `delete`. Hense the `synchronize_session` option.
# We set it to `False` simply because we do not rely on the session cache.
# Please see https://github.com/sqlalchemy/sqlalchemy/discussions/6159 for detailed discussion

########################################################################
class FTS3DB(object):
"""
Expand Down Expand Up @@ -312,6 +324,7 @@ def getActiveJobs(self, limit=20, lastMonitor=None, jobAssignmentTag="Assigned")
.where(FTS3Job.jobID.in_(jobIds)
)
.values({'assignment': jobAssignmentTag})
.execution_options(synchronize_session=False) # see comment about synchronize_session
)

session.commit()
Expand Down Expand Up @@ -384,7 +397,8 @@ def updateFileStatus(self, fileStatusDict, ftsGUID=None):
updateQuery = update(FTS3File)\
.where(and_(*whereConditions)
)\
.values(updateDict)
.values(updateDict)\
.execution_options(synchronize_session=False) # see comment about synchronize_session

session.execute(updateQuery)

Expand Down Expand Up @@ -435,6 +449,7 @@ def updateJobStatus(self, jobStatusDict):
)
)
.values(updateDict)
.execution_options(synchronize_session=False) # see comment about synchronize_session
)
session.commit()

Expand Down Expand Up @@ -480,7 +495,8 @@ def cancelNonExistingJob(self, operationID, ftsGUID):
.where(and_(FTS3File.operationID == FTS3Job.operationID,
FTS3File.ftsGUID == FTS3Job.ftsGUID,
FTS3Job.operationID == operationID,
FTS3Job.ftsGUID == ftsGUID))
FTS3Job.ftsGUID == ftsGUID))\
.execution_options(synchronize_session=False) # see comment about synchronize_session

session.execute(updStmt)
session.commit()
Expand Down Expand Up @@ -541,6 +557,7 @@ def getNonFinishedOperations(self, limit=20, operationAssignmentTag="Assigned"):
.where(FTS3Operation.operationID.in_(operationIDs)
)
.values({'assignment': operationAssignmentTag})
.execution_options(synchronize_session=False) # see comment about synchronize_session
)

session.commit()
Expand Down Expand Up @@ -588,7 +605,9 @@ def kickStuckOperations(self, limit=20, kickDelay=2):
'INTERVAL %d HOUR' %
kickDelay)))) .values(
{
'assignment': None}))
'assignment': None})
.execution_options(synchronize_session=False) # see comment about synchronize_session
)
rowCount = result.rowcount

session.commit()
Expand Down Expand Up @@ -636,7 +655,9 @@ def kickStuckJobs(self, limit=20, kickDelay=2):
'INTERVAL %d HOUR' %
kickDelay)))) .values(
{
'assignment': None}))
'assignment': None})
.execution_options(synchronize_session=False) # see comment about synchronize_session
)
rowCount = result.rowcount

session.commit()
Expand Down Expand Up @@ -677,7 +698,8 @@ def deleteFinalOperations(self, limit=20, deleteDelay=180):
rowCount = 0
if opIDs:
result = session.execute(delete(FTS3Operation)
.where(FTS3Operation.operationID.in_(opIDs)))
.where(FTS3Operation.operationID.in_(opIDs))
.execution_options(synchronize_session=False))
rowCount = result.rowcount

session.commit()
Expand Down
2 changes: 1 addition & 1 deletion src/DIRAC/Interfaces/API/Dirac.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
def parseArguments(args):
argList = []
for arg in args:
argList += arg.split(',')
argList += [a.strip() for a in arg.split(',') if a.strip()]
return argList


Expand Down
3 changes: 2 additions & 1 deletion src/DIRAC/RequestManagementSystem/DB/RequestDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ def cancelRequest(self, requestID):
.where(Request.RequestID == requestID)
.values({Request._Status: 'Canceled',
Request._LastUpdate: datetime.datetime.utcnow()
.strftime(Request._datetimeFormat)}))
.strftime(Request._datetimeFormat)})
.execution_options(synchronize_session=False)) # See FTS3DB for synchronize_session
session.commit()

# No row was changed
Expand Down
5 changes: 4 additions & 1 deletion src/DIRAC/Resources/Computing/BatchSystems/Host.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import glob
import shutil
import signal
import subprocess32 as subprocess
try:
import subprocess32 as subprocess
except ImportError:
import subprocess
import stat
import json
import multiprocessing
Expand Down
7 changes: 6 additions & 1 deletion tests/Integration/Core/Test_MySQLDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import time
import pytest

import DIRAC
from DIRAC import gLogger, gConfig
from DIRAC.Core.Utilities import Time
from DIRAC.Core.Utilities.MySQL import MySQL
Expand Down Expand Up @@ -108,9 +109,13 @@ def genVal2():
('mysql', 'Dirac', 'Dirac', 'AccountingDB', 3306, True),
('fake', 'fake', 'fake', 'FakeDB', 0000, False),
])
def test_connection(host, user, password, dbName, port, expected):
def test_connection(host, user, password, dbName, port, expected, monkeypatch):
""" Try to connect to a DB
"""
# Avoid having many retries which sleep for long durations
monkeypatch.setattr(DIRAC.Core.Utilities.MySQL, "MAXCONNECTRETRY", 1)
monkeypatch.setattr(DIRAC.Core.Utilities.MySQL, "RETRY_SLEEP_DURATION", 0.5)

mysqlDB = getDB(host, user, password, dbName, port)
result = mysqlDB._connect()
assert result['OK'] is expected
Expand Down

0 comments on commit 4176025

Please sign in to comment.