Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sweep:integration] fix(Resources): AREX can submit jobs without proxy (token only) #7832

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 66 additions & 71 deletions src/DIRAC/Resources/Computing/AREXComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def _request(self, method, query, params=None, data=None, headers=None, timeout=
except requests.RequestException as e:
return S_ERROR(f"Request exception: {e}")

def _checkSession(self, mandatoryProxy: bool = False):
def _checkSession(self):
"""Check that the session exists and carries a valid proxy."""
if not self.session:
return S_ERROR("REST interface not initialised.")
Expand All @@ -248,13 +248,12 @@ def _checkSession(self, mandatoryProxy: bool = False):
self.log.error("Proxy or token not set")
return S_ERROR("Proxy or token not set")

# A proxy might be required: in this case, it should be present
if mandatoryProxy and not self.proxy:
self.log.error("Proxy is mandatory but not set")
return S_ERROR("Proxy is mandatory but not set")

# If a proxy is required or if no token is set
if mandatoryProxy or not self.token:
# If a token is set, we use it
if self.token:
# Attach the token to the headers if present
self.headers["Authorization"] = f"Bearer {self.token['access_token']}"
self.log.verbose("A token is attached to the header of the request(s)")
else:
# Prepare the proxy in X509_USER_PROXY
if not (result := self._prepareProxy())["OK"]:
self.log.error("Failed to set up proxy", result["Message"])
Expand All @@ -264,12 +263,6 @@ def _checkSession(self, mandatoryProxy: bool = False):
self.session.cert = os.environ.get("X509_USER_PROXY")
self.log.verbose("A proxy is attached to the session")

# If a token is set, we use it
if self.token:
# Attach the token to the headers if present
self.headers["Authorization"] = f"Bearer {self.token['access_token']}"
self.log.verbose("A token is attached to the header of the request(s)")

return S_OK()

#############################################################################
Expand Down Expand Up @@ -566,63 +559,66 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
Assume that the ARC queues are always of the format nordugrid-<batchSystem>-<queue>
And none of our supported batch systems have a "-" in their name
"""
result = self._checkSession(mandatoryProxy=True)
result = self._checkSession()
if not result["OK"]:
self.log.error("Cannot submit jobs", result["Message"])
return result

self.log.verbose(f"Executable file path: {executableFile}")

# Get existing delegations
result = self._getDelegationIDs()
if not result["OK"]:
self.log.error("Could not get delegation IDs.", result["Message"])
return S_ERROR("Could not get delegation IDs")
delegationIDs = result["Value"]

# Get the delegationID which corresponds to the DIRAC group of the proxy if it exists
currentDelegationID = None
proxyGroup = self.proxy.getDIRACGroup()
for delegationID in delegationIDs:
# Get the proxy attached to the delegationID
result = self._getProxyFromDelegationID(delegationID)

# Bug in AREX, sometimes delegationID does not exist anymore,
# but still appears in the list of delegationIDs.
# Issue submitted here: https://bugzilla.nordugrid.org/show_bug.cgi?id=4133
# In this case, we just try with the next one
if not result["OK"] and "404" in result["Message"]:
continue

# Else, it means there was an issue with the CE,
# we stop the execution
if not result["OK"]:
return result

proxy = result["Value"]

if proxy.getDIRACGroup() != proxyGroup:
continue

# If we are here, we have found the right delegationID to use
currentDelegationID = delegationID
break

if not currentDelegationID:
# No existing delegation, we need to prepare one
result = self._prepareDelegation()
if not result["OK"]:
self.log.warn("Could not get a new delegation", f"for CE {self.ceName}")
return S_ERROR("Could not get a new delegation")
currentDelegationID = result["Value"]

delegation = f"\n(delegationid={currentDelegationID})"

if not inputs:
inputs = []
if not outputs:
outputs = []

# Delegation cannot be used with a token
delegation = ""
if not self.token:
# Get existing delegations
result = self._getDelegationIDs()
if not result["OK"]:
self.log.error("Could not get delegation IDs.", result["Message"])
return S_ERROR("Could not get delegation IDs")
delegationIDs = result["Value"]

# Get the delegationID which corresponds to the DIRAC group of the proxy if it exists
currentDelegationID = None
proxyGroup = self.proxy.getDIRACGroup()
for delegationID in delegationIDs:
# Get the proxy attached to the delegationID
result = self._getProxyFromDelegationID(delegationID)

# Bug in AREX, sometimes delegationID does not exist anymore,
# but still appears in the list of delegationIDs.
# Issue submitted here: https://bugzilla.nordugrid.org/show_bug.cgi?id=4133
# In this case, we just try with the next one
if not result["OK"] and "404" in result["Message"]:
continue

# Else, it means there was an issue with the CE,
# we stop the execution
if not result["OK"]:
return result

proxy = result["Value"]

if proxy.getDIRACGroup() != proxyGroup:
continue

# If we are here, we have found the right delegationID to use
currentDelegationID = delegationID
break

if not currentDelegationID:
# No existing delegation, we need to prepare one
result = self._prepareDelegation()
if not result["OK"]:
self.log.warn("Could not get a new delegation", f"for CE {self.ceName}")
return S_ERROR("Could not get a new delegation")
currentDelegationID = result["Value"]

delegation = f"\n(delegationid={currentDelegationID})"

# If there is a preamble, then we bundle it in an executable file
if self.preamble:
inputs.append(executableFile)
Expand Down Expand Up @@ -871,14 +867,12 @@ def _renewDelegation(self, delegationID):

return S_OK()

#############################################################################

def getJobStatus(self, jobIDList):
"""Get the status information for the given list of jobs.

:param list jobIDList: list of job references, followed by the DIRAC stamp.
"""
result = self._checkSession(mandatoryProxy=True)
result = self._checkSession()
if not result["OK"]:
self.log.error("Cannot get status of the jobs", result["Message"])
return result
Expand Down Expand Up @@ -924,15 +918,16 @@ def getJobStatus(self, jobIDList):
self.log.debug(f"Killing held job {jobReference}")

# Renew delegations to renew the proxies of the jobs
result = self._getDelegationIDs()
if not result["OK"]:
return result
delegationIDs = result["Value"]
for delegationID in delegationIDs:
result = self._renewDelegation(delegationID)
if not self.token:
result = self._getDelegationIDs()
if not result["OK"]:
# Only log here as we still want to return statuses
self.log.warn("Failed to renew delegation", f"{delegationID}: {result['Message']}")
return result
delegationIDs = result["Value"]
for delegationID in delegationIDs:
result = self._renewDelegation(delegationID)
if not result["OK"]:
# Only log here as we still want to return statuses
self.log.warn("Failed to renew delegation", f"{delegationID}: {result['Message']}")

# Kill held jobs
if jobsToCancel:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,25 +68,11 @@ def side_effect():
assert not arex.session.cert
assert "Authorization" in arex.headers, arex.headers

# 6. We make the proxy mandatory: the session should include both the proxy and the token
result = arex._checkSession(mandatoryProxy=True)
assert result["OK"], result
assert arex.session
assert arex.session.cert
assert "Authorization" in arex.headers, arex.headers

# 7. Now we just include the token, the proxy is not mandatory:
# 7. Now we just include the token:
# the session should only include the token
arex.proxy = None
result = arex._checkSession()
assert result["OK"], result
assert arex.session
assert not arex.session.cert
assert "Authorization" in arex.headers, arex.headers

# 8. Now we just include the token, but the proxy is mandatory: it should return an error
result = arex._checkSession(mandatoryProxy=True)
assert not result["OK"], result
assert arex.session
assert not arex.session.cert
assert "Authorization" not in arex.headers, arex.headers
Loading