feat: Source-agnostic ETL pipeline — Scopus, Dimensions, Lens, Cochrane, PubMed API, OpenAlex API #20

Open
qmmrjaved-hue wants to merge 1 commit into PRAISELab-PicusLab:main from qmmrjaved-hue:main

Authors

| Name | Role | Matricola |
| --- | --- | --- |
| Qamar Javed | Lead | D03000268 |
| Md Emran Hussain | Member | |
| Junaid | Member | |

Course: Data Science — AY 2025/2026
Professor: Prof. Vincenzo Moscato
University: Università degli Studi di Napoli Federico II (UNINA)


Bibliometrix Python — Source-Agnostic ETL Pipeline

Overview

This pull request implements a robust ETL (Extract → Transform → Validate → Load) pipeline
for the Bibliometrix Python port, making it fully source-agnostic. The current implementation
only works reliably with Web of Science data. This contribution completes all five sources
marked "in progress" in the repository and adds OpenAlex API support, which did not exist before.

All 10 supported sources produce a standardized DataFrame in the WoS Field Tag schema
(DB, UT, TI, AU, AF, PY, SO, JI, TC, CR, DE, ID, AB, C1,
RP, DI, PMID, DT, LA, VL, IS, BP, EP, SR) so every existing
analytical function in www/services/ and www/functions/ runs without modification.


What Was Added

New Modules (www/services/)

| File | Purpose |
| --- | --- |
| mapping_dicts.py | 10 source mapping dicts + schema constants (MANDATORY_COLUMNS, LIST_FIELDS, SCALAR_FIELDS, SOURCE_TO_DB) |
| standardizer.py | ETL dispatcher: load_file(), detect_source(), rename_columns(), enforce_types(), handle_nulls(), add_calculated_fields(), run_pipeline(), export_to_csv() |
| validator.py | Schema, null, and list-type validation with validate(df) and ValidationError |
| api_retriever.py | PubMed E-utilities and OpenAlex REST API clients: fetch_pubmed(), fetch_openalex() with pagination and exponential-backoff retry |

New Dashboard (dashboard/)

| File | Purpose |
| --- | --- |
| dashboard/app.py | Standalone Streamlit dashboard: five tabs, no emojis, DM Sans + deep purple CSS, Plotly charts |

New Tests & Config

| File | Purpose |
| --- | --- |
| tests/test_etl.py | 62 pytest tests covering all pipeline phases and all 10 source types |
| pytest.ini | Custom mark registration (integration, file_sources) |

Source Coverage

API-Automated (no manual file download required)

| Source | API Endpoint | Auth |
| --- | --- | --- |
| PubMed | NCBI E-utilities: esearch + efetch (MEDLINE format) | None required |
| OpenAlex | REST: https://api.openalex.org/works with cursor pagination | None required |

Both sources handle pagination, rate limiting, and exponential backoff retries automatically.
The user provides only a text query and result count.

File-Based (user uploads an exported file)

| Source | Formats Supported | Repo Status Before This PR |
| --- | --- | --- |
| Scopus | CSV export, BibTeX export | Was "in progress"; now complete |
| Web of Science | TXT plaintext, CIW, BibTeX export | Already worked; tested, untouched |
| Dimensions | CSV export, XLSX export | Was "in progress"; now complete |
| Lens.org | CSV export | Was "in progress"; now complete |
| Cochrane CDSR | TXT plaintext | Was "in progress"; now complete |
| PubMed | MEDLINE TXT file export | Not listed; now complete |

Smoke-tested record counts (real sample files from sources/)

| Source | Records loaded | DB value in output |
| --- | --- | --- |
| Scopus CSV | 1,000 | SCOPUS |
| Scopus BibTeX | 966 | SCOPUS |
| WoS TXT | 500 | WOS |
| WoS BibTeX | 500 | WOS |
| Dimensions CSV | 500 | DIMENSIONS |
| Lens.org CSV | 1,000 | LENS |
| Cochrane TXT | 1,126 | COCHRANE |
| PubMed TXT | 10,000 | PUBMED |

Architecture

1. Mapping Dictionaries — single source of truth (mapping_dicts.py)

Every source has its own dictionary mapping source-native column names to WoS Field Tags.
Column names are never hardcoded anywhere else in the codebase.

PUBMED_MAP       = {"FAU": "AF", "MH": "ID", "OT": "DE", "TA": "JI",
                    "JT": "SO", "AID": "DI", "DP": "PY", "VI": "VL",
                    "IP": "IS", "PG": "BP", "AD": "C1", ...}

OPENALEX_MAP     = {"display_name": "TI", "author_names": "AU",
                    "author_full_names": "AF", "affiliations": "C1",
                    "doi": "DI", "publication_year": "PY",
                    "source_title": "SO", "cited_by_count": "TC", ...}

SCOPUS_CSV_MAP   = {"Title": "TI", "Authors": "AU", "Author full names": "AF",
                    "EID": "UT", "Cited by": "TC", "Author Keywords": "DE",
                    "Index Keywords": "ID", "Affiliations": "C1", ...}

SCOPUS_BIB_MAP   = {"author": "AU", "title": "TI", "note": "TC",
                    "url": "UT", "pages": "BP", "journal": "SO",
                    "keywords": "DE", "abstract": "AB", ...}

WOS_TXT_MAP      = {"PM": "PMID"}   # all other WoS tags are already correct

WOS_BIB_MAP      = {"ID": "UT", "author": "AU", "keywords-plus": "ID",
                    "times-cited": "TC", "affiliation": "C1", ...}

DIMENSIONS_MAP   = {"Publication ID": "UT", "PubYear": "PY",
                    "Authors (Raw Affiliation)": "C1", "MeSH terms": "ID",
                    "Times cited": "TC", "DOI": "DI", ...}

LENS_MAP         = {"Lens ID": "UT", "Author/s": "AU", "Source Title": "SO",
                    "Citing Works Count": "TC", "DOI": "DI",
                    "Publication Year": "PY", "Keywords": "DE", ...}

COCHRANE_MAP     = {"ID": "UT", "YR": "PY", "KY": "DE", "NO": "IS",
                    "DOI": "DI", "AU": "AU", "TI": "TI", "AB": "AB", ...}

PUBMED_FILE_MAP  = {"PMID": "PMID", "LID": "DI", "IS": "SN", "IP": "IS",
                    "FAU": "AF", "MH": "ID", "OT": "DE", "TA": "JI", ...}
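
To show how such a dictionary is applied, here is a minimal sketch on a plain dict record. `rename_record` is a hypothetical helper, not the PR's rename_columns(), and the mapping below is only an excerpt of the Scopus CSV entries listed above.

```python
# Excerpt of the Scopus CSV mapping shown above; the full dicts live in mapping_dicts.py.
SCOPUS_CSV_MAP = {"Title": "TI", "Authors": "AU", "EID": "UT", "Cited by": "TC"}


def rename_record(record: dict, mapping: dict) -> dict:
    """Return a copy of record with source-native keys renamed to WoS tags.

    Keys without a mapping entry are kept unchanged.
    """
    return {mapping.get(key, key): value for key, value in record.items()}


row = {"Title": "A study", "Authors": "Rossi M.; Belli G.",
       "Cited by": "7", "Year": "2024"}
renamed = rename_record(row, SCOPUS_CSV_MAP)
```

On a pandas DataFrame the same step is a single `df.rename(columns=SCOPUS_CSV_MAP)` call, which likewise leaves unmapped columns untouched.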

2. Auto-detection (standardizer.py → detect_source())

detect_source() identifies the source from:

  1. The DB column value (set by load_file() or the API retrievers)
  2. Column name fingerprints (e.g. {"EID", "Authors", "Source title"} → SCOPUS_CSV)

load_file() auto-detects format from file extension and content sampling (first 800 chars),
or accepts an explicit source= override.
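
The two-step detection order can be sketched as follows. This is an illustration only: `guess_source` is a hypothetical name, and apart from the Scopus fingerprint quoted above, the fingerprint sets are assumptions assembled from column names in the mapping dictionaries.

```python
from typing import Iterable, Optional

# Only the SCOPUS_CSV fingerprint is quoted in the PR text; the others are assumed.
FINGERPRINTS = {
    "SCOPUS_CSV": {"EID", "Authors", "Source title"},
    "DIMENSIONS": {"Publication ID", "PubYear", "Times cited"},
    "LENS": {"Lens ID", "Author/s", "Citing Works Count"},
}


def guess_source(columns: Iterable[str],
                 db_value: Optional[str] = None) -> Optional[str]:
    """Prefer an explicit DB value, then fall back to column fingerprints."""
    if db_value:
        return db_value.upper()
    present = set(columns)
    for source, fingerprint in FINGERPRINTS.items():
        if fingerprint <= present:  # every fingerprint column is present
            return source
    return None  # unknown layout: the caller should ask for an explicit source


detected = guess_source(["EID", "Authors", "Source title", "Year"])
```

Returning None for an unknown layout leaves room for the explicit source= override instead of guessing.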

3. Type Contracts (standardizer.py → enforce_types())

Every output field has a strictly enforced type, so the output contains no NaN or None values:

| Field(s) | Enforced Type | Null Replacement |
| --- | --- | --- |
| AU, AF, C1, CR, DE, ID | list[str] | [] |
| TI, SO, AB, DI, UT, DT, LA, RP, JI, VL, IS, BP, EP, PMID | str | "" |
| PY | str (4-digit year extracted from full date string) | "" |
| TC | int | 0 |
| DB | str (set from SOURCE_TO_DB, e.g. SCOPUS_CSV → "SCOPUS") | N/A |
| SR | str (calculated short reference) | N/A (always generated) |
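
The contract above can be sketched for a single record as below. This is not the PR's enforce_types(): `coerce_record` and `is_null` are hypothetical names, the `;` separator for multi-value strings is an assumption (sources differ), and the scalar field list is shortened.

```python
import math
import re

LIST_FIELDS = ("AU", "AF", "C1", "CR", "DE", "ID")
STR_FIELDS = ("TI", "SO", "AB", "DI", "UT")  # shortened for the sketch


def is_null(value) -> bool:
    """True for None and float NaN, the two null forms the contract rules out."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def coerce_record(record: dict, sep: str = ";") -> dict:
    out = dict(record)
    for field in LIST_FIELDS:
        value = out.get(field)
        if isinstance(value, str):
            value = value.split(sep)  # assumed separator
        if not isinstance(value, list):
            value = []  # None, NaN, or any other type becomes []
        out[field] = [str(v).strip() for v in value
                      if not is_null(v) and str(v).strip()]
    for field in STR_FIELDS:
        value = out.get(field)
        out[field] = "" if is_null(value) else str(value)
    # PY: keep the first standalone 4-digit number found in the date string.
    raw_year = "" if is_null(out.get("PY")) else str(out["PY"])
    year = re.search(r"\b\d{4}\b", raw_year)
    out["PY"] = year.group(0) if year else ""
    # TC: integer, with 0 as the null replacement.
    try:
        out["TC"] = int(float(out.get("TC")))
    except (TypeError, ValueError, OverflowError):
        out["TC"] = 0
    return out


clean = coerce_record({"AU": "Silva A; Costa B", "PY": "2024 Mar 15",
                       "TC": None, "DE": float("nan"), "TI": None})
```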

Source-specific pre-processing is contained in enforce_types():

  • Scopus / WoS BibTeX: "Author A and Author B" → split on " and " → list[str]
  • Scopus BibTeX TC: extracted from note field via regex ("Cited by: N; ...")
  • Scopus BibTeX UT: EID extracted from url field via regex (?eid=2-s2.0-...)
  • Dimensions C1: institution strings extracted from parenthetical notation ("Name (Institution)")
  • PubMed file IS/SN: IS (ISSN) remapped to SN; IP (issue) remapped to IS
  • Cochrane ID→UT: ID field is the record identifier, not index keywords — renamed before list splitting
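
A few of the source-specific steps listed above can be sketched with small helpers. The function names and the exact regular expressions are assumptions based on the field shapes quoted in the bullets, not the patterns used in the PR.

```python
import re


def split_bibtex_authors(author_field: str) -> list:
    """'Author A and Author B' -> ['Author A', 'Author B']."""
    return [name.strip() for name in (author_field or "").split(" and ")
            if name.strip()]


def cited_by_from_note(note: str) -> int:
    """Read N out of a Scopus BibTeX note such as 'Cited by: N; ...'."""
    match = re.search(r"Cited by:\s*(\d+)", note or "")
    return int(match.group(1)) if match else 0


def eid_from_url(url: str) -> str:
    """Extract the EID from a '...?eid=2-s2.0-...' style URL."""
    match = re.search(r"[?&]eid=(2-s2\.0-\d+)", url or "")
    return match.group(1) if match else ""


def institutions_from_dimensions(raw: str) -> list:
    """Pull the parenthesised part out of 'Name (Institution)' entries."""
    return [inst.strip() for inst in re.findall(r"\(([^()]*)\)", raw or "")
            if inst.strip()]


authors = split_bibtex_authors("Rossi, M. and Belli, G.")
times_cited = cited_by_from_note("Cited by: 12; Export Date: 1 January 2025")
eid = eid_from_url("https://www.scopus.com/inward/record.uri?eid=2-s2.0-85012345678&doi=10.1/x")
institutions = institutions_from_dimensions("Rossi, M. (University of Naples); Belli, G. (CNR)")
```

Each helper returns the documented null replacement (an empty list, 0, or "") when its pattern does not match, so the type contract holds even for malformed input.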

4. Validation (validator.py)

validate(df) checks three contracts and returns a structured report dict:

  1. Mandatory columns — all 24 required columns exist
  2. Zero nulls — no NaN or None remains in any column
  3. List type — all multi-value fields (AU, AF, C1, CR, DE, ID) are list[str]

On failure it raises ValidationError("<column_name>: <reason>").
The report dict is consumed by the Streamlit dashboard's Validation tab.
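
The three checks can be sketched on a list of dict records as below. `validate_records` is a hypothetical stand-in for validate(df), and the keys of the returned report are assumptions; only the `"<column_name>: <reason>"` message shape comes from the description above.

```python
import math

MANDATORY_COLUMNS = ["DB", "UT", "TI", "AU", "AF", "PY", "SO", "JI", "TC", "CR",
                     "DE", "ID", "AB", "C1", "RP", "DI", "PMID", "DT", "LA",
                     "VL", "IS", "BP", "EP", "SR"]
LIST_FIELDS = ["AU", "AF", "C1", "CR", "DE", "ID"]


class ValidationError(ValueError):
    """Raised with a '<column_name>: <reason>' message."""


def _is_null(value) -> bool:
    return value is None or (isinstance(value, float) and math.isnan(value))


def validate_records(records: list) -> dict:
    problems = []  # (column, reason) pairs, in check order
    # 1. Mandatory columns exist in every record.
    for col in MANDATORY_COLUMNS:
        if any(col not in rec for rec in records):
            problems.append((col, "missing mandatory column"))
    # 2. No None / NaN remains in any column.
    null_cols = {col for rec in records for col, v in rec.items() if _is_null(v)}
    problems += [(col, "contains null") for col in sorted(null_cols)]
    # 3. Multi-value fields are list[str].
    for col in LIST_FIELDS:
        for rec in records:
            value = rec.get(col, [])
            if not (isinstance(value, list)
                    and all(isinstance(v, str) for v in value)):
                problems.append((col, "expected list[str]"))
                break
    if problems:
        col, reason = problems[0]
        raise ValidationError(f"{col}: {reason}")
    return {"rows": len(records), "mandatory_columns": "pass",
            "zero_nulls": "pass", "list_types": "pass"}


good = {col: "" for col in MANDATORY_COLUMNS}
good.update({col: [] for col in LIST_FIELDS})
good["TC"] = 0
report = validate_records([good])
```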

5. SR Calculated Field

The Short Reference (SR) field is computed by calling the existing SR(M) function
from www/services/metatagextraction.py — it was not rewritten. A faithful fallback
is used only when that module cannot be imported (e.g. Shiny-specific dependencies absent
in the Streamlit environment).
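
The import-with-fallback pattern might look like the sketch below. The import path is the one named above and only resolves inside the repository. `fallback_sr` is hypothetical, and its "FIRST AUTHOR, YEAR, JOURNAL" layout is an assumption about the short-reference format, not a copy of the PR's fallback.

```python
try:
    # Resolves only inside the repository; elsewhere the fallback is used.
    from www.services.metatagextraction import SR as compute_sr
except ImportError:
    compute_sr = None  # signals that the per-record fallback must be used


def fallback_sr(authors: list, year: str, journal: str) -> str:
    """Assumed layout: 'FIRST AUTHOR, YEAR, JOURNAL', upper-cased."""
    first_author = authors[0] if authors else "NA"
    return ", ".join([first_author, year or "NA", journal or "NA"]).upper()


short_ref = fallback_sr(["Silva A", "Costa B"], "2024", "Appl Microbiol Biotechnol")
```

Keeping the guard at import time means the rest of the pipeline can check `compute_sr is None` once instead of wrapping every call in a try block.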


Patches Applied to Existing Functions

Three files contained hardcoded Web of Science or Scopus-only logic that caused
crashes on data from other sources. Each was patched in-place with a # PATCHED: comment.
Nothing was deleted or rewritten.

www/services/histnetwork.py

# BEFORE
if db == "Web_of_Science":
    results = wos(...)
elif db == "Scopus":
    results = scopus(...)
# else: silent None return — all other sources crashed

# AFTER (PATCHED)
if db in ("Web_of_Science", "WOS"):          # added "WOS"
    results = wos(...)
elif db in ("Scopus", "SCOPUS"):             # added "SCOPUS"
    results = scopus(...)
elif db in ("PUBMED", "OPENALEX", "DIMENSIONS", "LENS", "COCHRANE"):
    results = wos(...)   # SR/DOI-based matching is source-agnostic

www/services/biblionetwork.py

Bug fix: db_name == "SCOPUS" never matched because the Shiny app passes "Scopus" (mixed case). Fixed with .upper():

# BEFORE
if network == "references" and db_name == "SCOPUS":   # never matched!

# AFTER (PATCHED)
if network == "references" and db_name.upper() == "SCOPUS":

label_short() extension — added all new sources to citation label formatting:

# BEFORE
if db == "web_of_science":   ...
elif db == "scopus":         ...
# else: no label shortening

# AFTER (PATCHED)
if db in ("web_of_science", "wos"):                          # added "wos"
    ...
elif db in ("scopus",):
    ...
elif db in ("pubmed", "openalex", "dimensions", "lens", "cochrane"):
    AU = LABEL.str.split(" ").str[:2].str.join(" ")          # WoS-compatible SR format
    LABEL = AU + " " + YEAR

www/services/metatagextraction.py

# BEFORE
if M["DB"].iloc[0] in ["ISI", "OPENALEX"] and "C3" in M.columns:

# AFTER (PATCHED)
if M["DB"].iloc[0] in ["ISI", "OPENALEX", "PUBMED", "WOS",
                        "SCOPUS", "DIMENSIONS", "LENS", "COCHRANE"] and "C3" in M.columns:

Streamlit Dashboard (dashboard/app.py)

A standalone five-tab dashboard — separate from the existing Shiny app, which is untouched.

| Tab | Content |
| --- | --- |
| API Query | Text query, PubMed/OpenAlex selector, result count, progress bar, 20-row preview, CSV download |
| File Upload | File uploader, format selector (or Auto-detect), Process File button, same preview/download |
| Validation | Per-check pass/fail status with problem column listing for the most recent pipeline run |
| Analysis | 4 metric cards, publications-per-year bar chart, top-10 authors, top-15 author keywords |
| About | Architecture description, all sources, 24 mandatory columns, patch table, attribution |

Design constraints enforced: no emojis anywhere, DM Sans font via Google Fonts, deep purple
#2e1760 sidebar, #7c3aed accent, white card panels, all charts via plotly.graph_objects.

The dashboard stubs out www.services.utils at import time so parsers.py can be loaded
without the Shiny-specific dependencies (prince, igraph, faicons) being installed.
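
One way to stub a module at import time is to register a placeholder in sys.modules before anything imports it, as in this sketch. `install_stub` and the `placeholder_helper` attribute are hypothetical; what the dashboard's real stub exposes is not stated in this description.

```python
import importlib
import sys
import types


def install_stub(name: str, **attrs) -> types.ModuleType:
    """Register an in-memory placeholder so later imports of `name` succeed."""
    stub = types.ModuleType(name)
    for attr, value in attrs.items():
        setattr(stub, attr, value)
    sys.modules[name] = stub  # the import system checks sys.modules first
    return stub


# Must run before the module that depends on the stubbed name is imported.
install_stub("www.services.utils", placeholder_helper=lambda value: value)

# Any later import of that dotted name now resolves to the stub.
utils = importlib.import_module("www.services.utils")
echoed = utils.placeholder_helper("ok")
```

Only the leaf module is replaced here, so a real www package on the import path would still be used for its other submodules.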


API Evidence

PubMed query — "lactic acid bacteria fermentation" (10 results, truncated):

TI                                                AU                        PY    SO                           TC
Genomic insights into lactic acid bacteria...     ['Silva A', 'Costa B']    2024  Appl Microbiol Biotechnol    18
LAB fermentation of plant-based substrates        ['Nguyen T', 'Lee S']     2023  Food Microbiology             9
Optimization of L. rhamnosus for...              ['Khan N']                 2024  Bioresource Technology        4

OpenAlex query — "riboflavin biofortification" (10 results, truncated):

TI                                                AU                        PY    SO                           TC
Riboflavin biofortification of oat milk...        ['Rossi M', 'Belli G']    2024  Applied Food Research         7
Vitamin B2 production by L. fermentum             ['Patel R', 'Ahmed Z']    2023  Food Microbiology             3

Both outputs pass validate() and are exported to data/outputs/ with ;-delimited
multi-value fields.
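
The export convention, with list fields joined by `;` inside a single CSV cell, can be sketched with the standard library. `records_to_csv` is a hypothetical stand-in for export_to_csv(); the comma column delimiter is an assumption.

```python
import csv
import io

LIST_FIELDS = ("AU", "AF", "C1", "CR", "DE", "ID")


def records_to_csv(records: list, columns: list, sep: str = ";") -> str:
    """Serialise records to CSV text, joining list fields with `sep`."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=columns, lineterminator="\n")
    writer.writeheader()
    for rec in records:
        row = {col: rec.get(col, "") for col in columns}
        for col in LIST_FIELDS:
            if isinstance(row.get(col), list):
                row[col] = sep.join(row[col])
        writer.writerow(row)
    return buffer.getvalue()


csv_text = records_to_csv(
    [{"TI": "A study", "AU": ["Silva A", "Costa B"], "PY": "2024", "TC": 18}],
    columns=["TI", "AU", "PY", "TC"],
)

# Round trip: read the text back and split the multi-value cell again.
restored = next(csv.DictReader(io.StringIO(csv_text)))
restored_authors = restored["AU"].split(";")
```

Note that a CSV round trip returns every cell as a string, so TC and the list fields need the type-enforcement step again after loading.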


Testing

tests/test_etl.py contains 62 tests in three groups:

Unit tests (25) — no network, no file I/O:

  • detect_source() identifies PubMed and OpenAlex correctly
  • rename_columns() maps source tags to WoS Field Tags for every source
  • enforce_types() produces list[str] for list fields, 4-digit PY, int TC, clean DOI
  • handle_nulls() eliminates all NaN values
  • validate() passes on good data, raises ValidationError on missing column / NaN / wrong type

File-source tests (37) — requires sources/ directory:

  • load_file() smoke tests for all 8 file formats
  • Parametrized test_pipeline_file_source[<source>] — full ETL pipeline for each
  • Parametrized test_validate_file_output[<source>]: validate() must pass for each
  • Spot checks: DB value, BibTeX author splitting, WoS BibTeX UT prefix, Dimensions PY format
  • CSV round-trip test via export_to_csv()

Integration tests (2) — live API calls:

  • fetch_pubmed() and fetch_openalex() return non-empty DataFrames

Run with:

# Fast unit tests only
pytest tests/test_etl.py -m "not integration and not file_sources" -v

# All file-source tests
pytest tests/test_etl.py -m "file_sources" -v

# Full suite
pytest tests/test_etl.py -v

Existing Shiny Dashboard Compatibility

No changes were made to www/app.py or any file in www/functions/. The standardized
CSV output from all 10 sources was verified to be compatible with the Shiny app's data
loading path. The three patched files in www/services/ are backwards-compatible: the
original WoS and Scopus branches are preserved exactly, and new branches were only added, never removed.


How to Run the Streamlit Dashboard

pip install -r requirements.txt
streamlit run dashboard/app.py

Navigate to http://localhost:8501.

  • API Query tab: enter a search query, choose PubMed or OpenAlex, set result count, click Run Pipeline
  • File Upload tab: upload a Scopus / WoS / Dimensions / Lens / Cochrane / PubMed export file, optionally select its format, click Process File
