Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/link networks and resources #5

Merged
merged 5 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion clients/directory_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,16 @@ def get_all_collections():
id
name
}
}
network {
id
name
url
contact
{
email
}
}
}
}

'''
Expand Down
52 changes: 49 additions & 3 deletions clients/negotiator_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import requests

from utils import dump
from exceptions import TokenExpiredException
from models.dto.network import NegotiatorNetworkDTO, NetworkDirectoryDTO
from models.dto.organization import NegotiatorOrganizationDTO, OrganizationDirectoryDTO
from models.dto.resource import NegotiatorResourceDTO, ResourceDirectoryDTO
from utils import dump


class NegotiatorAPIClient:
Expand Down Expand Up @@ -51,6 +51,13 @@ def patch(self, endpoint, data=None):
raise TokenExpiredException()
return response # Return the JSON response

def delete(self, endpoint, data=None):
url = f"{self._base_url}/{endpoint}"
response = requests.delete(url, headers=self.get_headers(), data=data)
if response.status_code == 401:
raise TokenExpiredException()
return response # Return the JSON response

def get_all_organizations(self):
return NegotiatorOrganizationDTO.parse(
self.get('organizations?size=10000').json()['_embedded']['organizations'])
Expand All @@ -69,14 +76,31 @@ def update_organization_name(self, id, name, external_id):
self.put(f'organizations/{id}', data=json.dumps({'name': name, 'externalId': external_id}))

def add_resources(self, resources: list):
self.post('resources', data=json.dumps(resources))
added_resources = self.post('resources', data=json.dumps(resources))
return added_resources.json()

def update_resource_name_or_description(self, id, name, description):
self.patch(f'resources/{id}',
data=json.dumps({'name': name, 'description': description}))

def add_networks(self, networks: list):
self.post('networks', data=json.dumps(networks))
added_networks = self.post('networks', data=json.dumps(networks))
return added_networks.json()

def add_resources_to_network(self, network_id, resources: list):
response = self.post(f'networks/{network_id}/resources', data=json.dumps(resources))
if response.status_code != 204:
raise Exception(f'Error occurred while trying to link network {network_id} with resources {resources}')

def delete_resource_from_network(self, network_id, resource_id):
self.delete(f'networks/{network_id}/resources/{resource_id}')

def get_network_resources(self, network_id):
response = self.get(f'networks/{network_id}/resources?size=10000')
try:
return [{'id': resource['id'], 'sourceId': resource['sourceId']} for resource in response.json()['_embedded']['resources']]
except KeyError:
return []

def update_network_info(self, id, name, url, email, external_id):
self.put(f'networks/{id}',
Expand Down Expand Up @@ -108,3 +132,25 @@ def network_create_dto(network: NetworkDirectoryDTO):
'contactEmail': network.contact.email,
'uri': network.url
}


def get_network_id_by_external_id(external_id, added_networks_json):
for network in added_networks_json['_embedded']['networks']:
if network['externalId'] == external_id:
return network['id']
return None


def lookup_resource_id(source_id, added_resources_json):
for resource in added_resources_json['_embedded']['resources']:
if resource['sourceId'] == source_id:
return resource['id']
return None


def get_resource_id_by_source_id(source_id, negotiator_resources: [NegotiatorResourceDTO]):
for r in negotiator_resources:
if r.sourceId == source_id:
return r.id


2 changes: 1 addition & 1 deletion compose/docker-compose-run-services-for-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ services:
networks:
- directory-negotiator-sync-test
emx2:
image: molgenis/molgenis-emx2:latest
image: molgenis/molgenis-emx2:v11.28.2
environment:
- TZ=Europe/Rome
- MOLGENIS_POSTGRES_URI=jdbc:postgresql://postgres:5432/molgenis
Expand Down
2 changes: 2 additions & 0 deletions models/dto/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from pydantic import BaseModel

from .network import NetworkDirectoryDTO
from ..dto.organization import OrganizationDirectoryDTO


Expand All @@ -10,6 +11,7 @@ class ResourceDirectoryDTO(BaseModel):
name: str
description: Optional[str]
biobank: OrganizationDirectoryDTO
network: Optional[list[NetworkDirectoryDTO]] = None

@staticmethod
def parse(directory_data):
Expand Down
53 changes: 47 additions & 6 deletions synchronization/sync_service.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from auth import renew_access_token
from clients.directory_client import (get_all_biobanks, get_all_collections, get_all_directory_networks)
from clients.negotiator_client import resource_create_dto, network_create_dto, NegotiatorAPIClient
from clients.negotiator_client import resource_create_dto, network_create_dto, NegotiatorAPIClient, \
get_resource_id_by_source_id
from config import LOG
from models.dto.network import NetworkDirectoryDTO, NegotiatorNetworkDTO
from models.dto.organization import OrganizationDirectoryDTO, NegotiatorOrganizationDTO
from models.dto.resource import ResourceDirectoryDTO, NegotiatorResourceDTO
from utils import get_all_directory_resources_networks_links


def get_negotiator_organization_by_external_id(negotiator_organizations: list[NegotiatorOrganizationDTO],
Expand Down Expand Up @@ -45,13 +47,14 @@ def sync_all(negotiator_client: NegotiatorAPIClient):
directory_organizations = get_all_biobanks()
negotiator_organizations = negotiator_client.get_all_organizations()
directory_resources = get_all_collections()
negotiator_resources = negotiator_client.get_all_resources()
sync_organizations(negotiator_client, directory_organizations, negotiator_organizations)
directory_network_resources_links = get_all_directory_resources_networks_links(directory_resources)
negotiator_resources = negotiator_client.get_all_resources()
sync_resources(negotiator_client, directory_resources, negotiator_resources)
directory_networks = get_all_directory_networks()
negotiator_networks = negotiator_client.get_all_negotiator_networks()
sync_networks(negotiator_client, directory_networks, negotiator_networks)
negotiator_client.update_sync_job(job_id, 'COMPLETED')
sync_networks(negotiator_client, directory_networks, negotiator_networks,
directory_network_resources_links)


@renew_access_token
Expand Down Expand Up @@ -106,10 +109,12 @@ def sync_resources(negotiator_client: NegotiatorAPIClient, directory_resources:
negotiator_client.add_resources(resources_to_add)



@renew_access_token
def sync_networks(negotiator_client: NegotiatorAPIClient, directory_networks: list[NetworkDirectoryDTO],
negotiator_networks: list[NegotiatorNetworkDTO]):
negotiator_networks: list[NegotiatorNetworkDTO], directory_network_resources_links: dict):
networks_to_add = list()
added_networks = None
LOG.info("Starting sync for networks")
for directory_network in directory_networks:
external_id = directory_network.id
Expand All @@ -120,8 +125,44 @@ def sync_networks(negotiator_client: NegotiatorAPIClient, directory_networks: li
negotiator_client.update_network_info(network.id, directory_network.name,
directory_network.url,
directory_network.contact.email, external_id)
LOG.info(f'Updating linked resources for network: {network.id}')
update_network_resources(negotiator_client, network.id, network.externalId,
directory_network_resources_links)
else:
LOG.info(f'Network with id {external_id} not found, adding it to the list of networks to add')
networks_to_add.append(network_create_dto(directory_network))

if len(networks_to_add) > 0:
negotiator_client.add_networks(networks_to_add)
added_networks = negotiator_client.add_networks(networks_to_add)
LOG.info("Adding resource links for the new networks")
for network in added_networks['_embedded']['networks']:
update_network_resources(negotiator_client, network['id'], network['externalId'],
directory_network_resources_links)

return added_networks


@renew_access_token
def update_network_resources(negotiator_client: NegotiatorAPIClient, network_id, network_external_id,
directory_network_resources_links):
negotiator_network_resources = negotiator_client.get_network_resources(network_id)
negotiator_network_resources_external_ids = [r['sourceId'] for r in negotiator_network_resources]
try:
directory_network_resources = directory_network_resources_links[network_external_id]
except KeyError:
directory_network_resources = []
if set(directory_network_resources) == set(negotiator_network_resources_external_ids):
LOG.info(f'No resources to update for network: {network_id}')
else:
resources_to_unlink = set(negotiator_network_resources_external_ids) - set(directory_network_resources)
resources_to_add = set(directory_network_resources) - set(negotiator_network_resources_external_ids)
negotiator_resources = negotiator_client.get_all_resources()
for r in resources_to_unlink:
LOG.info(f'Removing resource {r} from network: {network_external_id}')
negotiator_resource_id = get_resource_id_by_source_id(r, negotiator_resources)
negotiator_client.delete_resource_from_network(network_id, negotiator_resource_id)
if len(resources_to_add) > 0:
negotiator_resources_to_add = [get_resource_id_by_source_id(res, negotiator_resources) for res in
resources_to_add]
LOG.info(f'Adding resources {resources_to_add} to network {network_external_id}')
negotiator_client.add_resources_to_network(network_id, negotiator_resources_to_add)
Loading