Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 63 additions & 33 deletions core/importers/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,13 @@ class BaseResourceImporter:
mandatory_fields = set()
allowed_fields = []

def __init__(self, data, user, update_if_exists=False):
def __init__(self, data, user, update_if_exists=False, cache=None):
self.user = user
self.data = data
self.update_if_exists = update_if_exists
self.queryset = None
self.index_resources = False
self.cache = cache if cache is not None else {}

@classmethod
def can_handle(cls, obj):
Expand All @@ -127,6 +128,15 @@ def get_resource_type():
def get(self, attr, default_value=None):
return self.data.get(attr, default_value)

def get_cached_parent_source(self):
cache = self.cache.setdefault('source_by_owner', {})
key = (self.get_owner_type_filter(), self.get('owner'), self.get('source'))
if key not in cache:
cache[key] = Source.objects.filter(
**{self.get_owner_type_filter(): self.get('owner')}, mnemonic=self.get('source'), version=HEAD
).first()
return cache[key]

def parse(self):
self.data = self.get_filter_allowed_fields()
self.data['created_by'] = self.data['updated_by'] = self.user
Expand Down Expand Up @@ -416,8 +426,8 @@ class ConceptImporter(BaseResourceImporter):
def get_resource_type():
return 'Concept'

def __init__(self, data, user, update_if_exists, skip_hierarchy_tasks=False):
super().__init__(data, user, update_if_exists)
def __init__(self, data, user, update_if_exists, skip_hierarchy_tasks=False, cache=None):
super().__init__(data, user, update_if_exists, cache=cache)
self.skip_hierarchy_tasks = skip_hierarchy_tasks
self.version = False
self.instance = None
Expand All @@ -439,9 +449,7 @@ def get_queryset(self):
return self.queryset

def parse(self):
source = Source.objects.filter(
**{self.get_owner_type_filter(): self.get('owner')}, mnemonic=self.get('source'), version=HEAD
).first()
source = self.get_cached_parent_source()
super().parse()
self.data['parent'] = source
self.data['mnemonic'] = str(self.data.pop('id', ''))
Expand Down Expand Up @@ -527,14 +535,26 @@ class MappingImporter(BaseResourceImporter):
def get_resource_type():
return 'Mapping'

def __init__(self, data, user, update_if_exists):
super().__init__(data, user, update_if_exists)
def __init__(self, data, user, update_if_exists, cache=None):
super().__init__(data, user, update_if_exists, cache=cache)
self.version = False
self.instance = None

def exists(self):
return self.get_queryset().exists()

def get_cached_versioned_concept_by_uri(self, uri):
cache = self.cache.setdefault('concept_versioned_by_uri', {})
if uri not in cache:
cache[uri] = Concept.objects.filter(id=F('versioned_object_id'), uri=uri).first()
return cache[uri]

def get_cached_source_exists_by_uri(self, uri):
cache = self.cache.setdefault('source_exists_by_uri', {})
if uri not in cache:
cache[uri] = Source.objects.filter(uri=uri).exists()
return cache[uri]

def get_queryset(self): # pylint: disable=too-many-branches
if self.queryset:
return self.queryset
Expand All @@ -556,21 +576,21 @@ def get_queryset(self): # pylint: disable=too-many-branches
]

versionless_from_concept_url = drop_version(from_concept_url)
from_concept = Concept.objects.filter(id=F('versioned_object_id'), uri=versionless_from_concept_url).first()
from_concept = self.get_cached_versioned_concept_by_uri(versionless_from_concept_url)
if from_concept:
filters['from_concept__versioned_object_id'] = from_concept.versioned_object_id
elif not from_concept_code:
filters['from_concept_code'] = compact(versionless_from_concept_url.split('/'))[-1]
if to_concept_url:
versionless_to_concept_url = drop_version(to_concept_url)
to_concept = Concept.objects.filter(id=F('versioned_object_id'), uri=versionless_to_concept_url).first()
to_concept = self.get_cached_versioned_concept_by_uri(versionless_to_concept_url)
if to_concept:
filters['to_concept__versioned_object_id'] = to_concept.versioned_object_id
else:
filters['to_concept_code'] = compact(versionless_to_concept_url.split('/'))[-1]
if not to_source_url:
to_source_uri = to_parent_uri(versionless_to_concept_url)
if Source.objects.filter(uri=drop_version(to_source_uri)).exists():
if self.get_cached_source_exists_by_uri(drop_version(to_source_uri)):
filters['to_source__uri'] = to_source_uri

if self.get('id'):
Expand All @@ -589,9 +609,7 @@ def get_queryset(self): # pylint: disable=too-many-branches
return self.queryset

def parse(self):
source = Source.objects.filter(
**{self.get_owner_type_filter(): self.get('owner')}, mnemonic=self.get('source'), version=HEAD
).first()
source = self.get_cached_parent_source()
self.data = self.get_filter_allowed_fields()
self.data['parent'] = source

Expand Down Expand Up @@ -631,14 +649,15 @@ def process(self):
self.instance = queryset.first().clone()
self.instance._counted = None # pylint: disable=protected-access
self.instance._index = False # pylint: disable=protected-access
errors = Mapping.create_new_version_for(self.instance, self.data, self.user)
errors = Mapping.create_new_version_for(self.instance, self.data, self.user, cache=self.cache)
if errors and Mapping.is_standard_checksum_error(errors):
return UNCHANGED
return errors or UPDATED
if 'update_comment' in self.data:
self.data['comment'] = self.data['update_comment']
self.data.pop('update_comment')
self.instance = Mapping.persist_new({**self.data, '_counted': None, '_index': False}, self.user)
self.instance = Mapping.persist_new(
{**self.data, '_counted': None, '_index': False}, self.user, cache=self.cache)
if self.instance.id:
return CREATED
return self.instance.errors or errors or FAILED
Expand Down Expand Up @@ -752,13 +771,16 @@ def delete(self): # pylint: disable=too-many-locals,too-many-branches


class BulkImportInline(BaseImporter):
PROGRESS_NOTIFY_INTERVAL_SECONDS = 2

def __init__( # pylint: disable=too-many-arguments
self, content, username, update_if_exists=False, input_list=None, user=None, set_user=True,
self_task_id=None, skip_hierarchy_tasks=False
):
super().__init__(content, username, update_if_exists, user, not bool(input_list), set_user)
self.self_task_id = self_task_id
self.skip_hierarchy_tasks = skip_hierarchy_tasks
self.cache = {}
self.set_task()
if input_list:
self.input_list = input_list
Expand All @@ -777,6 +799,7 @@ def __init__( # pylint: disable=too-many-arguments
self.processed = 0
self.total = len(self.input_list)
self.start_time = time.time()
self.last_progress_notified_at = 0
self.elapsed_seconds = 0
self.index_resources = False

Expand Down Expand Up @@ -819,21 +842,26 @@ def handle_item_import_result(self, result, item): # pylint: disable=too-many-r
print("****Unexpected Result****", result)
self.others.append(item)

def notify_progress(self):
if self.task: # pragma: no cover
self.task.summary = {
'total': self.total,
'processed': self.processed,
'created': len(self.created),
'updated': len(self.updated),
'invalid': len(self.invalid),
'failed': len(self.failed),
'deleted': len(self.deleted),
'not_found': len(self.not_found),
'permission_denied': len(self.permission_denied),
'unchanged': len(self.unchanged),
}
self.task.save()
def notify_progress(self, force=False):
if not self.task:
return
now = time.time()
if not force and (now - self.last_progress_notified_at) < self.PROGRESS_NOTIFY_INTERVAL_SECONDS:
return
self.last_progress_notified_at = now
self.task.summary = { # pragma: no cover
'total': self.total,
'processed': self.processed,
'created': len(self.created),
'updated': len(self.updated),
'invalid': len(self.invalid),
'failed': len(self.failed),
'deleted': len(self.deleted),
'not_found': len(self.not_found),
'permission_denied': len(self.permission_denied),
'unchanged': len(self.unchanged),
}
self.task.save()

def run(self): # pylint: disable=too-many-branches,too-many-statements,too-many-locals
if self.self_task_id: # pragma: no cover
Expand Down Expand Up @@ -883,7 +911,8 @@ def run(self): # pylint: disable=too-many-branches,too-many-statements,too-many
try:
concept_importer = ConceptImporter(
item, self.user, self.update_if_exists,
skip_hierarchy_tasks=self.skip_hierarchy_tasks and bool(item.get('id'))
skip_hierarchy_tasks=self.skip_hierarchy_tasks and bool(item.get('id')),
cache=self.cache
)
_result = concept_importer.delete() if action == 'delete' else concept_importer.run()
if self.index_resources and get(concept_importer.instance, 'id'):
Expand All @@ -902,7 +931,7 @@ def run(self): # pylint: disable=too-many-branches,too-many-statements,too-many
continue
if item_type == 'mapping':
try:
mapping_importer = MappingImporter(item, self.user, self.update_if_exists)
mapping_importer = MappingImporter(item, self.user, self.update_if_exists, cache=self.cache)
_result = mapping_importer.delete() if action == 'delete' else mapping_importer.run()
if self.index_resources and get(mapping_importer.instance, 'id'):
new_mapping_ids.update(set(compact(
Expand All @@ -925,6 +954,7 @@ def run(self): # pylint: disable=too-many-branches,too-many-statements,too-many
)
continue

self.notify_progress(force=True)
if new_concept_ids:
for chunk in chunks(list(set(new_concept_ids)), 5000):
batch_index_resources.apply_async(
Expand Down
30 changes: 22 additions & 8 deletions core/mappings/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def create_initial_version(cls, mapping, **kwargs):
initial_version.save()
return initial_version

def populate_fields_from_relations(self, data): # pylint: disable=too-many-locals
def populate_fields_from_relations(self, data, cache=None): # pylint: disable=too-many-locals
from core.concepts.models import Concept
from core.sources.models import Source

Expand All @@ -328,19 +328,33 @@ def populate_fields_from_relations(self, data): # pylint: disable=too-many-loca
from_source_url = data.get('from_source_url', None) or to_parent_uri(from_concept_url)
to_source_url = data.get('to_source_url', None) or to_parent_uri(to_concept_url)

concept_cache = None if cache is None else cache.setdefault('concept_by_expr', {})
source_cache = None if cache is None else cache.setdefault('source_resolve_ref', {})

def get_concept(expr):
if expr and not expr.endswith('/'):
expr = expr + '/'
if concept_cache is not None and expr in concept_cache:
return concept_cache[expr]
concept = Concept.objects.filter(
uri=expr).first() or Concept.objects.filter(uri=encode_string(expr, safe='/')).first()

return concept or {'mnemonic': expr.replace(to_parent_uri(expr), '').replace('concepts/', '').split('/')[0]}
result = concept or {'mnemonic': expr.replace(to_parent_uri(expr), '').replace('concepts/', '').split('/')[0]}
if concept_cache is not None:
concept_cache[expr] = result
return result

def get_source(url):
if source_cache is not None and url in source_cache:
return source_cache[url]
source, _ = Source.resolve_reference_expression(url, None, HEAD)
if source.id:
return source, source.versioned_object_url or source.resolution_url or url
return None, source.resolution_url or url
result = (source, source.versioned_object_url or source.resolution_url or url)
else:
result = (None, source.resolution_url or url)
if source_cache is not None:
source_cache[url] = result
return result

self.from_source, self.from_source_url = get_source(from_source_url)
self.to_source, self.to_source_url = get_source(to_source_url)
Expand Down Expand Up @@ -392,8 +406,8 @@ def latest_source_version(self):
return self.sources.exclude(version=HEAD).order_by('-created_at').first()

@classmethod
def create_new_version_for(cls, instance, data, user):
instance.populate_fields_from_relations(data)
def create_new_version_for(cls, instance, data, user, cache=None):
instance.populate_fields_from_relations(data, cache=cache)
instance.extras = data.get('extras', instance.extras)
instance.external_id = data.get('external_id', instance.external_id)
instance.mnemonic = data.get('mnemonic', instance.mnemonic)
Expand Down Expand Up @@ -464,7 +478,7 @@ def get_next_sort_weight(self):
return None

@classmethod
def persist_new(cls, data, user): # pylint: disable=too-many-statements
def persist_new(cls, data, user, cache=None): # pylint: disable=too-many-statements
related_fields = ['from_concept_url', 'to_concept_url', 'to_source_url', 'from_source_url']
field_data = {k: v for k, v in data.items() if k not in related_fields}
url_params = {k: v for k, v in data.items() if k in related_fields}
Expand All @@ -478,7 +492,7 @@ def persist_new(cls, data, user): # pylint: disable=too-many-statements
if mapping.is_existing_in_parent():
mapping.errors = {'__all__': [ALREADY_EXISTS]}
return mapping
mapping.populate_fields_from_relations(url_params)
mapping.populate_fields_from_relations(url_params, cache=cache)

try:
mapping.full_clean()
Expand Down