def _get_tables(tables: typing.Union[None, str, list], param_name: str) -> list:
    """Normalise a table-filter setting into a list of table names.

    Args:
        tables: Raw value read from a connection entry: ``None``, a single
            table name, or a list of names.
        param_name: Name of the config option; used only in the error message.

    Returns:
        ``[]`` for ``None``, a one-element list for a string. A list that
        offers ``to_list()`` (Box-style list) is converted through it; any
        other list is returned as-is.

    Raises:
        ConfigError: If ``tables`` is not ``None``, a string or a list.
    """
    if tables is None:
        return []
    if isinstance(tables, str):
        return [tables]
    if isinstance(tables, list):
        # Box-style lists are detected by their to_list() method rather than by
        # class, so this module does not depend on dynaconf's vendored
        # ``dynaconf.vendor.box`` import path.
        to_list = getattr(tables, "to_list", None)
        return to_list() if callable(to_list) else tables
    raise ConfigError(f"Config error: `{param_name}` parameter should be a list or a string.")


def get_ignored_tables(
    connection,
):
    """Return the tables listed under ``skip_tables`` of ``connection`` ([] when unset)."""
    if "skip_tables" in connection:
        return _get_tables(connection.skip_tables, "skip_tables")
    return []


def get_include_tables(
    connection,
):
    """Return the tables listed under ``include_tables`` of ``connection`` ([] when unset)."""
    if "include_tables" in connection:
        return _get_tables(connection.include_tables, "include_tables")
    return []
def _tables_filter_args(
    ignored_tables,
    include_tables,
):
    """Return the geodiff command-line arguments that select which tables are processed.

    ``validate_config`` rejects connections that define both ``skip_tables``
    and ``include_tables``, so normally only one of the two lists is non-empty.
    The include list is checked first. When neither list has entries the
    result is an empty list, i.e. no filtering arguments are added.
    """
    candidates = (
        ("--include-tables", include_tables),
        ("--skip-tables", ignored_tables),
    )
    for flag, tables in candidates:
        if tables:
            return [flag, _tables_list_to_string(tables)]
    return []
base2their, - conflicts, - ] - ) + _run_geodiff( + [ + config.geodiff_exe, + "rebase-db", + "--driver", + driver, + conn_info, + *_tables_filter_args(ignored_tables, include_tables), + base, + our, + base2their, + conflicts, + ] + ) def _geodiff_list_changes_details( @@ -368,39 +344,23 @@ def _geodiff_make_copy( dst_conn_info, dst, ignored_tables, + include_tables, ): - if ignored_tables: - _run_geodiff( - [ - config.geodiff_exe, - "copy", - "--driver-1", - src_driver, - src_conn_info, - "--driver-2", - dst_driver, - dst_conn_info, - "--skip-tables", - _tables_list_to_string(ignored_tables), - src, - dst, - ] - ) - else: - _run_geodiff( - [ - config.geodiff_exe, - "copy", - "--driver-1", - src_driver, - src_conn_info, - "--driver-2", - dst_driver, - dst_conn_info, - src, - dst, - ] - ) + _run_geodiff( + [ + config.geodiff_exe, + "copy", + "--driver-1", + src_driver, + src_conn_info, + "--driver-2", + dst_driver, + dst_conn_info, + *_tables_filter_args(ignored_tables, include_tables), + src, + dst, + ] + ) def _geodiff_create_changeset_dr( @@ -412,41 +372,24 @@ def _geodiff_create_changeset_dr( dst, changeset, ignored_tables, + include_tables, ): - if ignored_tables: - _run_geodiff( - [ - config.geodiff_exe, - "diff", - "--driver-1", - src_driver, - src_conn_info, - "--driver-2", - dst_driver, - dst_conn_info, - "--skip-tables", - _tables_list_to_string(ignored_tables), - src, - dst, - changeset, - ] - ) - else: - _run_geodiff( - [ - config.geodiff_exe, - "diff", - "--driver-1", - src_driver, - src_conn_info, - "--driver-2", - dst_driver, - dst_conn_info, - src, - dst, - changeset, - ] - ) + _run_geodiff( + [ + config.geodiff_exe, + "diff", + "--driver-1", + src_driver, + src_conn_info, + "--driver-2", + dst_driver, + dst_conn_info, + *_tables_filter_args(ignored_tables, include_tables), + src, + dst, + changeset, + ] + ) def _compare_datasets( @@ -457,6 +400,7 @@ def _compare_datasets( dst_conn_info, dst, ignored_tables, + include_tables, summary_only=True, ): 
"""Compare content of two datasets (from various drivers) and return geodiff JSON summary of changes""" @@ -480,6 +424,7 @@ def _compare_datasets( dst, tmp_changeset, ignored_tables, + include_tables, ) if summary_only: return _geodiff_list_changes_summary(tmp_changeset) @@ -730,6 +675,7 @@ def pull(conn_cfg, mc): logging.debug(f"Processing Mergin Maps project '{conn_cfg.mergin_project}'") ignored_tables = get_ignored_tables(conn_cfg) + include_tables = get_include_tables(conn_cfg) project_name = conn_cfg.mergin_project.split("/")[1] work_dir = os.path.join( @@ -745,7 +691,10 @@ def pull(conn_cfg, mc): _check_has_sync_file(gpkg_full_path) mp = _get_mergin_project(work_dir) - mp.set_tables_to_skip(ignored_tables) + if include_tables: + mp.set_tables_to_include(include_tables) + else: + mp.set_tables_to_skip(ignored_tables) if mp.geodiff is None: raise DbSyncError("Mergin Maps client installation problem: geodiff not available") @@ -807,6 +756,7 @@ def pull(conn_cfg, mc): conn_cfg.modified, tmp_base2our, ignored_tables, + include_tables, ) needs_rebase = False @@ -834,6 +784,7 @@ def pull(conn_cfg, mc): gpkg_basefile, tmp_base2their, ignored_tables, + include_tables, ) # summarize changes @@ -845,8 +796,12 @@ def pull(conn_cfg, mc): if not needs_rebase: logging.debug("Applying new version [no rebase]") - _geodiff_apply_changeset(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, tmp_base2their, ignored_tables) - _geodiff_apply_changeset(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.modified, tmp_base2their, ignored_tables) + _geodiff_apply_changeset( + conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, tmp_base2their, ignored_tables, include_tables + ) + _geodiff_apply_changeset( + conn_cfg.driver, conn_cfg.conn_info, conn_cfg.modified, tmp_base2their, ignored_tables, include_tables + ) else: logging.debug("Applying new version [WITH rebase]") tmp_conflicts = os.path.join(tmp_dir, f"{project_name}-dbsync-pull-conflicts") @@ -858,8 +813,11 @@ def pull(conn_cfg, mc): 
tmp_base2their, tmp_conflicts, ignored_tables, + include_tables, + ) + _geodiff_apply_changeset( + conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, tmp_base2their, ignored_tables, include_tables ) - _geodiff_apply_changeset(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, tmp_base2their, ignored_tables) os.remove(gpkg_basefile_old) conn = psycopg2.connect(conn_cfg.conn_info) @@ -877,6 +835,7 @@ def status(conn_cfg, mc): logging.debug(f"Processing Mergin Maps project '{conn_cfg.mergin_project}'") ignored_tables = get_ignored_tables(conn_cfg) + include_tables = get_include_tables(conn_cfg) project_name = conn_cfg.mergin_project.split("/")[1] @@ -894,7 +853,10 @@ def status(conn_cfg, mc): # get basic information mp = _get_mergin_project(work_dir) - mp.set_tables_to_skip(ignored_tables) + if include_tables: + mp.set_tables_to_include(include_tables) + else: + mp.set_tables_to_skip(ignored_tables) if mp.geodiff is None: raise DbSyncError("Mergin Maps client installation problem: geodiff not available") project_path = mp.project_full_name() @@ -924,7 +886,7 @@ def status(conn_cfg, mc): logging.debug("") logging.debug("Server is at version " + server_info["version"]) - status_pull = mp.get_pull_changes(server_info["files"]) + status_pull = mp.get_pull_changes(server_info["files"], server_info["version"]) if status_pull["added"] or status_pull["updated"] or status_pull["removed"]: logging.debug("There are pending changes on server:") _print_mergin_changes(status_pull) @@ -960,6 +922,7 @@ def status(conn_cfg, mc): conn_cfg.modified, tmp_changeset_file, ignored_tables, + include_tables, ) if os.path.getsize(tmp_changeset_file) == 0: @@ -976,6 +939,7 @@ def push(conn_cfg, mc): logging.debug(f"Processing Mergin Maps project '{conn_cfg.mergin_project}'") ignored_tables = get_ignored_tables(conn_cfg) + include_tables = get_include_tables(conn_cfg) project_name = conn_cfg.mergin_project.split("/")[1] @@ -999,7 +963,10 @@ def push(conn_cfg, mc): 
_check_has_sync_file(gpkg_full_path) mp = _get_mergin_project(work_dir) - mp.set_tables_to_skip(ignored_tables) + if include_tables: + mp.set_tables_to_include(include_tables) + else: + mp.set_tables_to_skip(ignored_tables) if mp.geodiff is None: raise DbSyncError("Mergin Maps client installation problem: geodiff not available") @@ -1046,6 +1013,7 @@ def push(conn_cfg, mc): conn_cfg.modified, tmp_changeset_file, ignored_tables, + include_tables, ) if os.path.getsize(tmp_changeset_file) == 0: @@ -1058,7 +1026,7 @@ def push(conn_cfg, mc): # write changes to the local geopackage logging.debug("Writing DB changes to working dir...") - _geodiff_apply_changeset("sqlite", "", gpkg_full_path, tmp_changeset_file, ignored_tables) + _geodiff_apply_changeset("sqlite", "", gpkg_full_path, tmp_changeset_file, ignored_tables, include_tables) # write to the server try: @@ -1072,7 +1040,9 @@ def push(conn_cfg, mc): # update base schema in the DB logging.debug("Updating DB base schema...") - _geodiff_apply_changeset(conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, tmp_changeset_file, ignored_tables) + _geodiff_apply_changeset( + conn_cfg.driver, conn_cfg.conn_info, conn_cfg.base, tmp_changeset_file, ignored_tables, include_tables + ) _set_db_project_comment(conn, conn_cfg.base, conn_cfg.mergin_project, version) @@ -1085,6 +1055,7 @@ def init( logging.debug(f"Processing Mergin Maps project '{conn_cfg.mergin_project}'") ignored_tables = get_ignored_tables(conn_cfg) + include_tables = get_include_tables(conn_cfg) project_name = conn_cfg.mergin_project.split("/")[1] @@ -1140,6 +1111,7 @@ def init( conn_cfg.conn_info, conn_cfg.base, ignored_tables, + include_tables, summary_only=False, ) changes = json.dumps(changes_gpkg_base, indent=2) @@ -1221,6 +1193,7 @@ def init( conn_cfg.conn_info, conn_cfg.modified, ignored_tables, + include_tables, ) logging.debug("Checking 'base' schema content...") summary_base = _compare_datasets( @@ -1231,6 +1204,7 @@ def init( conn_cfg.conn_info, 
conn_cfg.base, ignored_tables, + include_tables, ) if len(summary_base): # seems someone modified base schema manually - this should never happen! @@ -1274,6 +1248,7 @@ def init( conn_cfg.conn_info, conn_cfg.modified, ignored_tables, + include_tables, ) # COPY: modified -> base @@ -1285,6 +1260,7 @@ def init( conn_cfg.conn_info, conn_cfg.base, ignored_tables, + include_tables, ) # sanity check to verify that right after initialization we do not have any changes @@ -1298,6 +1274,7 @@ def init( conn_cfg.conn_info, conn_cfg.base, ignored_tables, + include_tables, summary_only=False, ) # mark project version into db schema @@ -1340,6 +1317,7 @@ def init( "", gpkg_full_path, ignored_tables, + include_tables, ) logging.debug("Checking 'base' schema content...") summary_base = _compare_datasets( @@ -1350,6 +1328,7 @@ def init( "", gpkg_full_path, ignored_tables, + include_tables, ) if len(summary_base): logging.debug( @@ -1393,6 +1372,7 @@ def init( conn_cfg.conn_info, conn_cfg.base, ignored_tables, + include_tables, ) # COPY: modified -> gpkg @@ -1404,6 +1384,7 @@ def init( "", gpkg_full_path, ignored_tables, + include_tables, ) # sanity check to verify that right after initialization we do not have any changes @@ -1417,6 +1398,7 @@ def init( conn_cfg.conn_info, conn_cfg.base, ignored_tables, + include_tables, summary_only=False, ) if len(changes_gpkg_base): diff --git a/docs/using.md b/docs/using.md index 2d83b6e..a058734 100644 --- a/docs/using.md +++ b/docs/using.md @@ -78,11 +78,15 @@ daemon: - `--test-notification-email` used to test send notification email (see below for details about sending emails in case of sync fails) -## Excluding tables from sync +## Selecting which tables are synced -Sometimes in the database there are tables that should not be synchronised to Mergin Maps projects. It is possible to ignore -these tables and not sync them. 
To do so add `skip_tables` setting to the corresponding `connections` entry in the config -file: +Sometimes in the database there are tables that should not be synchronised to Mergin Maps projects. There are two +mutually exclusive ways to control which tables get synced. Use **only one** of them per connection - setting both +`skip_tables` and `include_tables` for the same connection is a configuration error. + +### Excluding tables (`skip_tables`) + +Add `skip_tables` to the corresponding `connections` entry to ignore the listed tables and sync everything else: ```yaml connections: @@ -95,6 +99,22 @@ connections: - table2 ``` +### Including only specific tables (`include_tables`) + +Alternatively, add `include_tables` to sync **only** the listed tables and ignore everything else. This is useful when +a database contains many tables but only a few should be synced: + +```yaml +connections: + - driver: postgres + # ... + mergin_project: john/myproject + sync_file: sync.gpkg + include_tables: + - table1 + - table2 +``` + ## Email notifications on sync failures To simplify db-sync monitoring, it is possible to set up notification emails when a sync failure happens. Simply add `notification` section in the configuration file as described below. 
def test_init_with_include_tables(
    mc: MerginClient,
):
    """Init and pull with ``include_tables=["points"]`` must never create the ``lines`` table."""
    project_name = "test_init_include_tables"
    source_gpkg_path = os.path.join(TEST_DATA_DIR, "base_2tables.gpkg")
    project_dir = os.path.join(TMP_DIR, project_name + "_work")
    db_schema_main = project_name + "_main"

    init_sync_from_geopackage(
        mc,
        project_name,
        source_gpkg_path,
        include_tables=["points"],
    )

    conn = psycopg2.connect(DB_CONNINFO)
    cur = conn.cursor()

    def lines_table_exists():
        # The schema name is compared as a *value* here, so it is bound as a
        # query parameter. Formatting sql.Identifier inside quotes would yield
        # schemaname = '"<schema>"', which never matches and would make the
        # "table is absent" assertions pass vacuously.
        cur.execute(
            "SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = %s AND tablename = %s);",
            (db_schema_main, "lines"),
        )
        return cur.fetchone()[0]

    def points_count():
        # Here the schema name is an identifier, so sql.Identifier is the right tool.
        cur.execute(sql.SQL("SELECT count(*) from {}.points").format(sql.Identifier(db_schema_main)))
        return cur.fetchone()[0]

    # only points should exist in the main schema, lines should be absent
    assert not lines_table_exists()
    assert points_count() == 0

    # run init again, nothing should change
    dbsync_init(mc)
    assert not lines_table_exists()

    # push a change that touches both tables, only points should be pulled
    shutil.copy(
        os.path.join(TEST_DATA_DIR, "modified_all.gpkg"),
        os.path.join(project_dir, "test_sync.gpkg"),
    )
    mc.push_project(project_dir)

    dbsync_pull(mc)
    assert not lines_table_exists()
    assert points_count() == 4
def test_get_include_tables():
    """get_include_tables() returns a list for every accepted form of the setting."""
    _reset_config()
    base = dict(config.connections[0])

    # (raw config value, expected normalised result)
    expectations = (
        (None, []),
        ([], []),
        ("table", ["table"]),
        (["table"], ["table"]),
    )
    for raw_value, expected in expectations:
        config.update({"CONNECTIONS": [{**base, "include_tables": raw_value}]})
        assert get_include_tables(config.connections[0]) == expected

    # connection without include_tables configured
    config.update({"CONNECTIONS": [base]})
    assert get_include_tables(config.connections[0]) == []


def test_skip_and_include_tables_mutually_exclusive():
    """A connection that sets both skip_tables and include_tables fails validation."""
    _reset_config()
    base = dict(config.connections[0])

    conflicting = dict(base, skip_tables=["a"], include_tables=["b"])
    config.update({"CONNECTIONS": [conflicting]})
    with pytest.raises(ConfigError, match="cannot both be set"):
        validate_config(config)