From: Simon Glass <simon.glass@canonical.com>

get_schema_version() catches OperationalError and returns 0 to indicate
a fresh database. However, it catches all OperationalError variants,
including "attempt to write a readonly database" and "database is
locked". When this happens, migrate_to() misinterprets an existing
database as empty and re-runs all migrations from scratch, destroying
the data.

Narrow the catch to only match "no such table", which is the genuine
fresh-database case. All other OperationalError variants are re-raised
so that callers see the real problem instead of silently losing data.

Signed-off-by: Simon Glass <simon.glass@canonical.com>
---
 tools/pickman/database.py | 14 ++++++++++++--
 tools/pickman/ftest.py    | 34 ++++++++++++++++++++++++++++++++++
 2 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/tools/pickman/database.py b/tools/pickman/database.py
index 317668a979d..28b28896cb2 100644
--- a/tools/pickman/database.py
+++ b/tools/pickman/database.py
@@ -190,14 +190,24 @@ class Database:  # pylint: disable=too-many-public-methods
     def get_schema_version(self):
         """Get the version of the database's schema
 
+        Only returns 0 when the schema_version table genuinely does not
+        exist (i.e. a fresh database). Other errors (read-only, locked,
+        corrupt) are re-raised so that migrate_to() does not accidentally
+        destroy an existing database by re-running all migrations.
+
         Return:
             int: Database version, 0 means there is no data
+
+        Raises:
+            sqlite3.OperationalError: For errors other than a missing table
         """
         try:
             self.cur.execute('SELECT version FROM schema_version')
             return self.cur.fetchone()[0]
-        except sqlite3.OperationalError:
-            return 0
+        except sqlite3.OperationalError as exc:
+            if 'no such table' in str(exc):
+                return 0
+            raise
 
     def execute(self, query, parameters=()):
         """Execute a database query
diff --git a/tools/pickman/ftest.py b/tools/pickman/ftest.py
index 52c807cf8cc..261ca4cd2d5 100644
--- a/tools/pickman/ftest.py
+++ b/tools/pickman/ftest.py
@@ -10,6 +10,7 @@ import asyncio
 import argparse
 import os
 import shutil
+import sqlite3
 import subprocess
 import sys
 import tempfile
@@ -398,6 +399,39 @@ class TestDatabase(unittest.TestCase):
         self.assertEqual(sources[1], ('branch-b', 'def456'))
         dbs.close()
 
+    def test_schema_version_readonly_raises(self):
+        """Test that a read-only database raises instead of returning 0.
+
+        Previously get_schema_version() caught all OperationalError and
+        returned 0, which caused migrate_to() to re-create all tables on
+        top of an existing (read-only) database, wiping the data.
+        """
+        with terminal.capture():
+            dbs = database.Database(self.db_path)
+            dbs.start()
+            dbs.source_set('us/next', 'abc123')
+            dbs.commit()
+
+            # Simulate a read-only error by replacing the cursor
+            real_cur = dbs.cur
+            mock_cur = mock.MagicMock()
+            mock_cur.execute.side_effect = sqlite3.OperationalError(
+                'attempt to write a readonly database')
+            dbs.cur = mock_cur
+            with self.assertRaises(sqlite3.OperationalError):
+                dbs.get_schema_version()
+            dbs.cur = real_cur
+            dbs.close()
+
+    def test_schema_version_missing_table_returns_zero(self):
+        """Test that a missing schema_version table returns 0."""
+        with terminal.capture():
+            dbs = database.Database(self.db_path)
+            dbs.open_it()
+            # Fresh database has no tables, should return 0
+            self.assertEqual(dbs.get_schema_version(), 0)
+            dbs.close()
+
 
 class TestDatabaseCommit(unittest.TestCase):
     """Tests for Database commit functions."""
-- 
2.43.0