diff --git a/core/importers/models.py b/core/importers/models.py index ce99e80e..a1131333 100644 --- a/core/importers/models.py +++ b/core/importers/models.py @@ -109,12 +109,13 @@ class BaseResourceImporter: mandatory_fields = set() allowed_fields = [] - def __init__(self, data, user, update_if_exists=False): + def __init__(self, data, user, update_if_exists=False, cache=None): self.user = user self.data = data self.update_if_exists = update_if_exists self.queryset = None self.index_resources = False + self.cache = cache if cache is not None else {} @classmethod def can_handle(cls, obj): @@ -127,6 +128,15 @@ def get_resource_type(): def get(self, attr, default_value=None): return self.data.get(attr, default_value) + def get_cached_parent_source(self): + cache = self.cache.setdefault('source_by_owner', {}) + key = (self.get_owner_type_filter(), self.get('owner'), self.get('source')) + if key not in cache: + cache[key] = Source.objects.filter( + **{self.get_owner_type_filter(): self.get('owner')}, mnemonic=self.get('source'), version=HEAD + ).first() + return cache[key] + def parse(self): self.data = self.get_filter_allowed_fields() self.data['created_by'] = self.data['updated_by'] = self.user @@ -416,8 +426,8 @@ class ConceptImporter(BaseResourceImporter): def get_resource_type(): return 'Concept' - def __init__(self, data, user, update_if_exists, skip_hierarchy_tasks=False): - super().__init__(data, user, update_if_exists) + def __init__(self, data, user, update_if_exists, skip_hierarchy_tasks=False, cache=None): + super().__init__(data, user, update_if_exists, cache=cache) self.skip_hierarchy_tasks = skip_hierarchy_tasks self.version = False self.instance = None @@ -439,9 +449,7 @@ def get_queryset(self): return self.queryset def parse(self): - source = Source.objects.filter( - **{self.get_owner_type_filter(): self.get('owner')}, mnemonic=self.get('source'), version=HEAD - ).first() + source = self.get_cached_parent_source() super().parse() self.data['parent'] = source self.data['mnemonic'] = str(self.data.pop('id', '')) @@ -527,14 +535,26 @@ class MappingImporter(BaseResourceImporter): def get_resource_type(): return 'Mapping' - def __init__(self, data, user, update_if_exists): - super().__init__(data, user, update_if_exists) + def __init__(self, data, user, update_if_exists, cache=None): + super().__init__(data, user, update_if_exists, cache=cache) self.version = False self.instance = None def exists(self): return self.get_queryset().exists() + def get_cached_versioned_concept_by_uri(self, uri): + cache = self.cache.setdefault('concept_versioned_by_uri', {}) + if uri not in cache: + cache[uri] = Concept.objects.filter(id=F('versioned_object_id'), uri=uri).first() + return cache[uri] + + def get_cached_source_exists_by_uri(self, uri): + cache = self.cache.setdefault('source_exists_by_uri', {}) + if uri not in cache: + cache[uri] = Source.objects.filter(uri=uri).exists() + return cache[uri] + def get_queryset(self): # pylint: disable=too-many-branches if self.queryset: return self.queryset @@ -556,21 +576,21 @@ def get_queryset(self): # pylint: disable=too-many-branches ] versionless_from_concept_url = drop_version(from_concept_url) - from_concept = Concept.objects.filter(id=F('versioned_object_id'), uri=versionless_from_concept_url).first() + from_concept = self.get_cached_versioned_concept_by_uri(versionless_from_concept_url) if from_concept: filters['from_concept__versioned_object_id'] = from_concept.versioned_object_id elif not from_concept_code: filters['from_concept_code'] = compact(versionless_from_concept_url.split('/'))[-1] if to_concept_url: versionless_to_concept_url = drop_version(to_concept_url) - to_concept = Concept.objects.filter(id=F('versioned_object_id'), uri=versionless_to_concept_url).first() + to_concept = self.get_cached_versioned_concept_by_uri(versionless_to_concept_url) if to_concept: filters['to_concept__versioned_object_id'] = to_concept.versioned_object_id else: filters['to_concept_code'] = compact(versionless_to_concept_url.split('/'))[-1] if not to_source_url: to_source_uri = to_parent_uri(versionless_to_concept_url) - if Source.objects.filter(uri=drop_version(to_source_uri)).exists(): + if self.get_cached_source_exists_by_uri(drop_version(to_source_uri)): filters['to_source__uri'] = to_source_uri if self.get('id'): @@ -589,9 +609,7 @@ def get_queryset(self): # pylint: disable=too-many-branches return self.queryset def parse(self): - source = Source.objects.filter( - **{self.get_owner_type_filter(): self.get('owner')}, mnemonic=self.get('source'), version=HEAD - ).first() + source = self.get_cached_parent_source() self.data = self.get_filter_allowed_fields() self.data['parent'] = source @@ -631,14 +649,15 @@ def process(self): self.instance = queryset.first().clone() self.instance._counted = None # pylint: disable=protected-access self.instance._index = False # pylint: disable=protected-access - errors = Mapping.create_new_version_for(self.instance, self.data, self.user) + errors = Mapping.create_new_version_for(self.instance, self.data, self.user, cache=self.cache) if errors and Mapping.is_standard_checksum_error(errors): return UNCHANGED return errors or UPDATED if 'update_comment' in self.data: self.data['comment'] = self.data['update_comment'] self.data.pop('update_comment') - self.instance = Mapping.persist_new({**self.data, '_counted': None, '_index': False}, self.user) + self.instance = Mapping.persist_new( + {**self.data, '_counted': None, '_index': False}, self.user, cache=self.cache) if self.instance.id: return CREATED return self.instance.errors or errors or FAILED @@ -752,6 +771,8 @@ def delete(self): # pylint: disable=too-many-locals,too-many-branches class BulkImportInline(BaseImporter): + PROGRESS_NOTIFY_INTERVAL_SECONDS = 2 + def __init__( # pylint: disable=too-many-arguments self, content, username, update_if_exists=False, input_list=None, user=None, set_user=True, self_task_id=None, skip_hierarchy_tasks=False @@ -759,6 +780,7 @@ def __init__( # pylint: disable=too-many-arguments super().__init__(content, username, update_if_exists, user, not bool(input_list), set_user) self.self_task_id = self_task_id self.skip_hierarchy_tasks = skip_hierarchy_tasks + self.cache = {} self.set_task() if input_list: self.input_list = input_list @@ -777,6 +799,7 @@ def __init__( # pylint: disable=too-many-arguments self.processed = 0 self.total = len(self.input_list) self.start_time = time.time() + self.last_progress_notified_at = 0 self.elapsed_seconds = 0 self.index_resources = False @@ -819,21 +842,26 @@ def handle_item_import_result(self, result, item): # pylint: disable=too-many-r print("****Unexpected Result****", result) self.others.append(item) - def notify_progress(self): - if self.task: # pragma: no cover - self.task.summary = { - 'total': self.total, - 'processed': self.processed, - 'created': len(self.created), - 'updated': len(self.updated), - 'invalid': len(self.invalid), - 'failed': len(self.failed), - 'deleted': len(self.deleted), - 'not_found': len(self.not_found), - 'permission_denied': len(self.permission_denied), - 'unchanged': len(self.unchanged), - } - self.task.save() + def notify_progress(self, force=False): + if not self.task: + return + now = time.time() + if not force and (now - self.last_progress_notified_at) < self.PROGRESS_NOTIFY_INTERVAL_SECONDS: + return + self.last_progress_notified_at = now + self.task.summary = { # pragma: no cover + 'total': self.total, + 'processed': self.processed, + 'created': len(self.created), + 'updated': len(self.updated), + 'invalid': len(self.invalid), + 'failed': len(self.failed), + 'deleted': len(self.deleted), + 'not_found': len(self.not_found), + 'permission_denied': len(self.permission_denied), + 'unchanged': len(self.unchanged), + } + self.task.save() def run(self): # pylint: disable=too-many-branches,too-many-statements,too-many-locals if self.self_task_id: # pragma: no cover @@ -883,7 +911,8 @@ def run(self): # pylint: disable=too-many-branches,too-many-statements,too-many try: concept_importer = ConceptImporter( item, self.user, self.update_if_exists, - skip_hierarchy_tasks=self.skip_hierarchy_tasks and bool(item.get('id')) + skip_hierarchy_tasks=self.skip_hierarchy_tasks and bool(item.get('id')), + cache=self.cache ) _result = concept_importer.delete() if action == 'delete' else concept_importer.run() if self.index_resources and get(concept_importer.instance, 'id'): @@ -902,7 +931,7 @@ def run(self): # pylint: disable=too-many-branches,too-many-statements,too-many continue if item_type == 'mapping': try: - mapping_importer = MappingImporter(item, self.user, self.update_if_exists) + mapping_importer = MappingImporter(item, self.user, self.update_if_exists, cache=self.cache) _result = mapping_importer.delete() if action == 'delete' else mapping_importer.run() if self.index_resources and get(mapping_importer.instance, 'id'): new_mapping_ids.update(set(compact( @@ -925,6 +954,7 @@ def run(self): # pylint: disable=too-many-branches,too-many-statements,too-many ) continue + self.notify_progress(force=True) if new_concept_ids: for chunk in chunks(list(set(new_concept_ids)), 5000): batch_index_resources.apply_async( diff --git a/core/mappings/models.py b/core/mappings/models.py index ea985234..3b994b44 100644 --- a/core/mappings/models.py +++ b/core/mappings/models.py @@ -317,7 +317,7 @@ def create_initial_version(cls, mapping, **kwargs): initial_version.save() return initial_version - def populate_fields_from_relations(self, data): # pylint: disable=too-many-locals + def populate_fields_from_relations(self, data, cache=None): # pylint: disable=too-many-locals from core.concepts.models import Concept from core.sources.models import Source @@ -328,19 +328,33 @@ def populate_fields_from_relations(self, data): # pylint: disable=too-many-loca from_source_url = data.get('from_source_url', None) or to_parent_uri(from_concept_url) to_source_url = data.get('to_source_url', None) or to_parent_uri(to_concept_url) + concept_cache = None if cache is None else cache.setdefault('concept_by_expr', {}) + source_cache = None if cache is None else cache.setdefault('source_resolve_ref', {}) + def get_concept(expr): if expr and not expr.endswith('/'): expr = expr + '/' + if concept_cache is not None and expr in concept_cache: + return concept_cache[expr] concept = Concept.objects.filter( uri=expr).first() or Concept.objects.filter(uri=encode_string(expr, safe='/')).first() - return concept or {'mnemonic': expr.replace(to_parent_uri(expr), '').replace('concepts/', '').split('/')[0]} + result = concept or {'mnemonic': expr.replace(to_parent_uri(expr), '').replace('concepts/', '').split('/')[0]} + if concept_cache is not None: + concept_cache[expr] = result + return result def get_source(url): + if source_cache is not None and url in source_cache: + return source_cache[url] source, _ = Source.resolve_reference_expression(url, None, HEAD) if source.id: - return source, source.versioned_object_url or source.resolution_url or url - return None, source.resolution_url or url + result = (source, source.versioned_object_url or source.resolution_url or url) + else: + result = (None, source.resolution_url or url) + if source_cache is not None: + source_cache[url] = result + return result self.from_source, self.from_source_url = get_source(from_source_url) self.to_source, self.to_source_url = get_source(to_source_url) @@ -392,8 +406,8 @@ def latest_source_version(self): return self.sources.exclude(version=HEAD).order_by('-created_at').first() @classmethod - def create_new_version_for(cls, instance, data, user): - instance.populate_fields_from_relations(data) + def create_new_version_for(cls, instance, data, user, cache=None): + instance.populate_fields_from_relations(data, cache=cache) instance.extras = data.get('extras', instance.extras) instance.external_id = data.get('external_id', instance.external_id) instance.mnemonic = data.get('mnemonic', instance.mnemonic) @@ -464,7 +478,7 @@ def get_next_sort_weight(self): return None @classmethod - def persist_new(cls, data, user): # pylint: disable=too-many-statements + def persist_new(cls, data, user, cache=None): # pylint: disable=too-many-statements related_fields = ['from_concept_url', 'to_concept_url', 'to_source_url', 'from_source_url'] field_data = {k: v for k, v in data.items() if k not in related_fields} url_params = {k: v for k, v in data.items() if k in related_fields} @@ -478,7 +492,7 @@ def persist_new(cls, data, user): # pylint: disable=too-many-statements if mapping.is_existing_in_parent(): mapping.errors = {'__all__': [ALREADY_EXISTS]} return mapping - mapping.populate_fields_from_relations(url_params) + mapping.populate_fields_from_relations(url_params, cache=cache) try: mapping.full_clean()