Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVA-3624: Add biosamples communicators #57

Merged
merged 1 commit into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 180 additions & 0 deletions ebi_eva_common_pyutils/biosamples_communicators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
#!/usr/bin/env python
# Copyright 2020 EMBL - European Bioinformatics Institute
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re

import requests
from functools import cached_property
from ebi_eva_common_pyutils.logger import AppLogger
from retry import retry


class HALNotReadyError(Exception):
    """
    Exception type made available to users of the HAL communicators.

    Presumably meant to signal that a HAL resource is not ready yet (inferred from the name only);
    it is not raised anywhere in this module.
    """


class HALCommunicator(AppLogger):
    """
    This class helps navigate through REST API that uses the HAL standard.

    A token is retrieved from ``auth_url`` with the username and password, cached on the instance, and sent as a
    bearer token with every request. ``bsd_url`` is the entry point of the API whose links are then followed.
    """
    # HTTP status codes accepted by _validate_response; any other code raises a ValueError
    acceptable_code = [200, 201]

    def __init__(self, auth_url, bsd_url, username, password):
        """
        :param auth_url: URL queried to retrieve the authentication token
        :param bsd_url: root URL of the HAL API
        :param username: username used to retrieve the token
        :param password: password used to retrieve the token
        """
        self.auth_url = auth_url
        self.bsd_url = bsd_url
        self.username = username
        self.password = password

    @staticmethod
    def _redact_headers(headers):
        """Return the headers as a plain dict in which the value of the Authorization header is masked."""
        return {
            name: '<redacted>' if str(name).lower() == 'authorization' else value
            for name, value in headers.items()
        }

    def _validate_response(self, response):
        """
        Check that the response has an acceptable code and raise if it does not.

        :param response: response object as returned by requests
        :return: the same response when its status code is one of ``acceptable_code``
        :raises ValueError: when the status code is not acceptable; the request and the response are logged first
        """
        if response.status_code not in self.acceptable_code:
            request = response.request
            # NOTE(review): the request body is logged verbatim and may itself contain credentials
            # (e.g. a JSON login payload) - consider masking it as well.
            self.error('{}: {} with {}'.format(request.method, request.url, request.body))
            # The Authorization header carries the bearer token or the basic credentials: keep it out of the logs
            self.error("headers: {}".format(self._redact_headers(request.headers)))
            self.error("<{}>: {}".format(response.status_code, response.text))
            raise ValueError('The HTTP status code ({}) is not one of the acceptable codes ({})'.format(
                str(response.status_code), str(self.acceptable_code))
            )
        return response

    @cached_property
    def token(self):
        """
        Retrieve the token from the AAP REST API then cache it for further querying.

        The value is cached for the lifetime of the instance and is never refreshed by this class.

        :raises ValueError: when the authentication endpoint answers with an unacceptable status code
        """
        response = requests.get(self.auth_url, auth=(self.username, self.password))
        self._validate_response(response)
        return response.text

    @retry(exceptions=(ValueError, requests.RequestException), tries=3, delay=2, backoff=1.2, jitter=(1, 3))
    def _req(self, method, url, **kwargs):
        """
        Private method that sends a request using the specified method. It adds the headers required by bsd.

        :param method: HTTP verb passed to requests.request
        :param url: URL to query
        :param kwargs: any other keyword argument accepted by requests.request; ``headers`` are merged with the
                       ones set here
        :return: the validated response
        :raises ValueError: when the status code is still unacceptable after the retries
        """
        # Work on a copy so that the dictionary supplied by the caller is never modified
        headers = dict(kwargs.pop('headers', None) or {})
        headers['Accept'] = 'application/hal+json'
        if self.token is not None:
            headers['Authorization'] = 'Bearer ' + self.token
        if 'json' in kwargs:
            headers['Content-Type'] = 'application/json'
        response = requests.request(
            method=method,
            url=url,
            headers=headers,
            **kwargs
        )
        self._validate_response(response)
        return response

    @staticmethod
    def _find_url(query, json_obj):
        """Drill down into json_obj along the elements of query and return the url found at the end."""
        path = query.split('.') if isinstance(query, str) else query
        node = json_obj
        for element in path:
            try:
                node = node[element]
            except (KeyError, IndexError, TypeError) as err:
                # TypeError covers an intermediate value that cannot be indexed with this element
                raise KeyError('{} does not exist in json object'.format(element)) from err
        if not isinstance(node, str):
            raise ValueError('The result of the query_string must be a string to use as a url')
        return node

    @staticmethod
    def _fill_url_template(url, url_template_values):
        """Replace each ``{name}`` or ``{name:spec}`` placeholder of url with the value provided for name."""
        for name, value in url_template_values.items():
            # The optional ':spec' part stops at the closing brace of this placeholder (one level of nested
            # braces is allowed inside it) so that other placeholders of the url are left untouched.
            pattern = r'\{' + re.escape(name) + r'(?::(?:[^{}]|\{[^{}]*\})*)?\}'
            # A function is used as replacement so that the value is inserted literally
            url = re.sub(pattern, lambda _match, text=str(value): text, url)
        return url

    def _merge_following_pages(self, first_page, method, **kwargs):
        """
        Query every page that follows first_page and store their embedded elements in first_page.

        Iteration stops when a page comes back without a usable ``next`` link. The pagination information is
        then removed from first_page as it is not relevant to the depaginated response.
        """
        page = first_page
        while True:
            next_link = (page.get('_links') or {}).get('next')
            if not next_link or not next_link.get('href'):
                break
            page = self._req(method, next_link['href'], **kwargs).json()
            for key, elements in (page.get('_embedded') or {}).items():
                first_page.setdefault('_embedded', {}).setdefault(key, []).extend(elements)
        first_page.pop('page', None)
        links = first_page.get('_links') or {}
        for relation in ('first', 'last', 'next'):
            links.pop(relation, None)
        return first_page

    def follows(self, query, json_obj=None, method='GET', url_template_values=None, join_url=None, **kwargs):
        """
        Finds a link within the json_obj using a query string or list, modify the link using the
        url_template_values dictionary then query the link using the method and any additional keyword argument.
        If the json_obj is not specified then it will use the root query defined by the base url.

        :param query: dot separated string, or list of keys, leading to the url within json_obj
        :param json_obj: json object to search; defaults to the root of the API
        :param method: HTTP verb used to query the url
        :param url_template_values: dictionary of placeholder name to value used to fill the url template
        :param join_url: path appended to the url after a '/'
        :param kwargs: ``all_pages=True`` requests every page of a paginated response; anything else is passed
                       to the request
        :return: the json response of the query
        :raises KeyError: when an element of the query cannot be found in json_obj
        :raises ValueError: when the query does not lead to a string
        """
        all_pages = kwargs.pop('all_pages', False)

        if json_obj is None:
            json_obj = self.root
        url = self._find_url(query, json_obj)
        # replace the template in the url with the value provided
        if url_template_values:
            url = self._fill_url_template(url, url_template_values)
        if join_url:
            url += '/' + join_url
        # Now query the url
        json_response = self._req(method, url, **kwargs).json()

        # Depaginate the call if requested
        if all_pages is True:
            self._merge_following_pages(json_response, method, **kwargs)
        return json_response

    def follows_link(self, key, json_obj=None, method='GET', url_template_values=None, join_url=None, **kwargs):
        """
        Same function as follows but construct the query_string from a single keyword surrounded by '_links' and 'href'.
        """
        return self.follows(('_links', key, 'href'),
                            json_obj=json_obj, method=method, url_template_values=url_template_values,
                            join_url=join_url, **kwargs)

    @cached_property
    def root(self):
        """Json response of a GET on the root url of the API, cached after the first access."""
        return self._req('GET', self.bsd_url).json()

    @property
    def communicator_attributes(self):
        """Attributes specific to a type of communicator; not provided by this base class."""
        raise NotImplementedError


class AAPHALCommunicator(HALCommunicator):
    """HAL communicator for the BioSamples API that authenticates with AAP and carries an optional domain."""

    def __init__(self, auth_url, bsd_url, username, password, domain=None):
        """Store the connection details through the parent class and keep the domain on the instance."""
        super().__init__(auth_url, bsd_url, username, password)
        self.domain = domain

    @property
    def communicator_attributes(self):
        """Dictionary holding the domain this communicator was created with."""
        return dict(domain=self.domain)


class WebinHALCommunicator(HALCommunicator):
    """HAL communicator for the BioSamples API that authenticates with a Webin account."""

    @cached_property
    def token(self):
        """
        Post the Webin credentials to the authentication url and cache the text of the response as the token.

        Raises a ValueError through _validate_response when the status code is not acceptable.
        """
        credentials = {"authRealms": ["ENA"], "password": self.password, "username": self.username}
        auth_response = requests.post(self.auth_url, json=credentials)
        self._validate_response(auth_response)
        return auth_response.text

    @property
    def communicator_attributes(self):
        """Dictionary holding the Webin submission account, i.e. the username of this communicator."""
        return dict(webinSubmissionAccountId=self.username)


class NoAuthHALCommunicator(HALCommunicator):
    """HAL communicator for the BioSamples API that sends its requests without any authentication."""

    def __init__(self, bsd_url):
        """Only the root url of the API is needed: the authentication url and credentials are set to None."""
        super().__init__(auth_url=None, bsd_url=bsd_url, username=None, password=None)

    @cached_property
    def token(self):
        """Always None: no Authorization header is sent, so requests that require auth will raise errors."""
        return None
127 changes: 127 additions & 0 deletions tests/common/test_biosamples_communicators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from copy import deepcopy
from unittest import TestCase
from unittest.mock import Mock, patch, PropertyMock

from ebi_eva_common_pyutils.biosamples_communicators import HALCommunicator, WebinHALCommunicator


class TestHALCommunicator(TestCase):
    """Unit tests of HALCommunicator where every HTTP call is replaced by a mock."""

    @staticmethod
    def patch_token(token='token'):
        """Creates a patch for the HALCommunicator token property. It returns the token provided"""
        return patch.object(HALCommunicator, 'token', new_callable=PropertyMock, return_value=token)

    def setUp(self) -> None:
        self.comm = HALCommunicator('http://aap.example.org', 'http://BSD.example.org', 'user', 'pass')

    def test_token(self):
        with patch('requests.get', return_value=Mock(text='token', status_code=200)) as mocked_get:
            self.assertEqual(self.comm.token, 'token')
            mocked_get.assert_called_once_with('http://aap.example.org', auth=('user', 'pass'))

    def test_req(self):
        # An acceptable status code: the request is sent once with the HAL and bearer headers
        with patch('requests.request', return_value=Mock(status_code=200)) as mocked_request, \
                self.patch_token():
            self.comm._req('GET', 'http://BSD.example.org')
            mocked_request.assert_called_once_with(
                method='GET', url='http://BSD.example.org',
                headers={'Accept': 'application/hal+json', 'Authorization': 'Bearer token'}
            )

        # An unacceptable status code raises once the retries are exhausted. Sleeping is patched out on the
        # assumption that the retry decorator waits through time.sleep; if it does not the test is only slower.
        with self.patch_token(), \
                patch('time.sleep'), \
                patch('requests.request') as mocked_request:
            mocked_request.return_value = Mock(status_code=500, request=PropertyMock(url='text'))
            self.assertRaises(ValueError, self.comm._req, 'GET', 'http://BSD.example.org')

    def test_root(self):
        expected_json = {'json': 'values'}
        with patch.object(HALCommunicator, '_req') as mocked_req:
            mocked_req.return_value = Mock(json=Mock(return_value={'json': 'values'}))
            self.assertEqual(self.comm.root, expected_json)
            mocked_req.assert_called_once_with('GET', 'http://BSD.example.org')

    def test_follows(self):
        json_response = {'json': 'values'}
        # Patches the _req function that returns the Response object with a json function
        patch_req = patch.object(HALCommunicator, '_req', return_value=Mock(json=Mock(return_value=json_response)))

        # test follow url
        with patch_req as mocked_req:
            self.assertEqual(self.comm.follows('test', {'test': 'url'}), json_response)
            mocked_req.assert_any_call('GET', 'url')

        # test follow url with a template
        with patch_req as mocked_req:
            self.assertEqual(self.comm.follows('test', {'test': 'url/{id:*.}'}, url_template_values={'id': '1'}),
                             json_response)
            mocked_req.assert_any_call('GET', 'url/1')

        # test follow url deep in the json_obj
        with patch_req as mocked_req:
            self.assertEqual(self.comm.follows('test1.test2', {'test1': {'test2': 'url'}}), json_response)
            mocked_req.assert_any_call('GET', 'url')

        # test follow url with specific verb and payload
        with patch_req as mocked_req:
            self.assertEqual(
                self.comm.follows('test', {'test': 'url'}, method='POST', json={'data': 'value'}),
                json_response
            )
            mocked_req.assert_any_call('POST', 'url', json={'data': 'value'})

        # test follow with depagination
        json_entries_with_next = {
            '_embedded': {'samples': [json_response, json_response]},
            '_links': {'next': {'href': 'url'}, 'first': {}, 'last': {}},
            'page': {}
        }
        json_entries_without_next = {
            '_embedded': {'samples': [json_response]},
            '_links': {},
            'page': {}
        }
        patch_req_with_pages = patch.object(HALCommunicator, '_req', side_effect=[
            Mock(json=Mock(return_value=deepcopy(json_entries_with_next))),
            Mock(json=Mock(return_value=deepcopy(json_entries_with_next))),
            Mock(json=Mock(return_value=deepcopy(json_entries_with_next))),
            Mock(json=Mock(return_value=deepcopy(json_entries_without_next))),
        ])
        # Without all_pages=True only returns the first page
        with patch_req_with_pages as mocked_req:
            observed_json = self.comm.follows('test', {'test': 'url'})
            self.assertEqual(observed_json, json_entries_with_next)
            self.assertEqual(len(observed_json['_embedded']['samples']), 2)
            mocked_req.assert_any_call('GET', 'url')

        # With all_pages=True returns the first page that contains all the embedded elements
        with patch_req_with_pages as mocked_req:
            observed_json = self.comm.follows('test', {'test': 'url'}, all_pages=True)
            self.assertEqual(len(observed_json['_embedded']['samples']), 7)
            self.assertEqual(mocked_req.call_count, 4)

    def test_follows_link(self):
        json_response = {'json': 'values'}
        # Patches the _req function that returns the Response object with a json function
        patch_req = patch.object(HALCommunicator, '_req', return_value=Mock(json=Mock(return_value=json_response)))

        # test basic follow
        with patch_req as mocked_req:
            self.assertEqual(self.comm.follows_link('test', {'_links': {'test': {'href': 'url'}}}), json_response)
            mocked_req.assert_any_call('GET', 'url')


class TestWebinHALCommunicator(TestCase):
    """Unit tests of WebinHALCommunicator where the call to the authentication url is replaced by a mock."""

    def setUp(self) -> None:
        self.comm = WebinHALCommunicator('http://webin.example.org', 'http://BSD.example.org', 'user', 'pass')

    def test_communicator_attributes(self):
        self.assertEqual(self.comm.communicator_attributes, {'webinSubmissionAccountId': 'user'})

    def test_token(self):
        with patch('requests.post', return_value=Mock(text='token', status_code=200)) as mocked_post:
            self.assertEqual(self.comm.token, 'token')
            mocked_post.assert_called_once_with('http://webin.example.org',
                                                json={'authRealms': ['ENA'], 'password': 'pass', 'username': 'user'})
Loading