Skip to content

Commit

Permalink
sweep: #7632 feat (DM): add a protocol option to getReplicas method f…
Browse files Browse the repository at this point in the history
…amily
  • Loading branch information
chaen authored and web-flow committed May 30, 2024
1 parent 3497e5f commit 98e1f42
Showing 1 changed file with 17 additions and 8 deletions.
25 changes: 17 additions & 8 deletions src/DIRAC/DataManagementSystem/Client/DataManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1558,10 +1558,16 @@ def put(self, lfn, fileName, diracSE, path=None):
# File catalog methods
#

def getActiveReplicas(self, lfns, getUrl=True, diskOnly=False, preferDisk=False):
def getActiveReplicas(self, lfns, getUrl=True, diskOnly=False, preferDisk=False, protocol=None):
"""Get all the replicas for the SEs which are in Active status for reading."""
return self.getReplicas(
lfns, allStatus=False, getUrl=getUrl, diskOnly=diskOnly, preferDisk=preferDisk, active=True
lfns,
allStatus=False,
getUrl=getUrl,
diskOnly=diskOnly,
preferDisk=preferDisk,
active=True,
protocol=protocol,
)

def __filterTapeReplicas(self, replicaDict, diskOnly=False):
Expand Down Expand Up @@ -1666,12 +1672,17 @@ def __checkSEStatus(self, se, status="Read"):
"""returns the value of a certain SE status flag (access or other)"""
return StorageElement(se, vo=self.voName).status().get(status, False)

def getReplicas(self, lfns, allStatus=True, getUrl=True, diskOnly=False, preferDisk=False, active=False):
def getReplicas(
self, lfns, allStatus=True, getUrl=True, diskOnly=False, preferDisk=False, active=False, protocol=None
):
"""get replicas from catalogue and filter if requested
Warning: all filters are independent, hence active and preferDisk should be set if using forJobs
"""
catalogReplicas = {}
failed = {}
if not protocol:
protocol = self.registrationProtocol

for lfnChunk in breakListIntoChunks(lfns, 1000):
res = self.fileCatalog.getReplicas(lfnChunk, allStatus=allStatus)
if res["OK"]:
Expand All @@ -1692,9 +1703,7 @@ def getReplicas(self, lfns, allStatus=True, getUrl=True, diskOnly=False, preferD

for se in se_lfn:
seObj = StorageElement(se, vo=self.voName)
succPfn = (
seObj.getURL(se_lfn[se], protocol=self.registrationProtocol).get("Value", {}).get("Successful", {})
)
succPfn = seObj.getURL(se_lfn[se], protocol=protocol).get("Value", {}).get("Successful", {})
for lfn in succPfn:
catalogReplicas[lfn][se] = succPfn[lfn]

Expand All @@ -1705,10 +1714,10 @@ def getReplicas(self, lfns, allStatus=True, getUrl=True, diskOnly=False, preferD
self.__filterTapeReplicas(result, diskOnly=diskOnly)
return S_OK(result)

def getReplicasForJobs(self, lfns, allStatus=False, getUrl=True, diskOnly=False):
def getReplicasForJobs(self, lfns, allStatus=False, getUrl=True, diskOnly=False, protocol=None):
"""get replicas useful for jobs"""
# Call getReplicas with no filter and enforce filters in this method
result = self.getReplicas(lfns, allStatus=allStatus, getUrl=getUrl)
result = self.getReplicas(lfns, allStatus=allStatus, getUrl=getUrl, protocol=protocol)
if not result["OK"]:
return result
replicaDict = result["Value"]
Expand Down

0 comments on commit 98e1f42

Please sign in to comment.