ACID violation: implicit lose transaction when the connection pool is reopened #209

Closed
rodion-solovev-7 opened this issue Mar 21, 2024 · 5 comments

@rodion-solovev-7

What the hell?

Imagine a web server that intensively processes 8+ parallel requests with a handler like this:

async def create_delayed_send():
    try:
        async with manager.atomic():
            await manager.create(MoneyTransferHistory, amount=5)
            await some_long_working_logic_without_sql_writes()
            await manager.create(QueuedTmpTaskForTransferMoney, amount=5, ready=True)
            # QueuedTmpTaskForTransferMoney will be auto-deleted later (after successful execution)
    except ...:
        # process exception with logic, db connections, etc
        ...

The code above relies on transactionality: a delayed transfer task should only be created together with its corresponding history record. Most of the time it works as expected...

But in rare cases this can lead to a delayed task being created without the corresponding history record.
This happens completely implicitly. You may not even see a corresponding error.

When does it occur?

  • peewee-async==0.9.1
  • Using any of Pooled*Database (example: peewee_async.PooledPostgresqlDatabase)
  • After the database connection breaks
  • 2+ parallel asyncio.Task instances running SQL queries
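
One way to picture the failure is a toy model written in plain asyncio. Everything in it (`FakeConnection`, `FakePool`, the idea that the pool remembers one transaction connection per task) is a hypothetical stand-in and an assumption about the mechanism, not the peewee-async implementation. It only shows how a statement issued after the pool is reopened could end up on a fresh autocommit connection while the first statement is silently dropped.

```python
import asyncio


class FakeConnection:
    """Toy stand-in for a DB connection (hypothetical, not a peewee-async class)."""

    def __init__(self, committed):
        self.committed = committed   # shared list that plays the role of the table
        self.pending = []            # rows written inside an open transaction
        self.in_transaction = False

    def begin(self):
        self.in_transaction = True

    def insert(self, row):
        if self.in_transaction:
            self.pending.append(row)
        else:
            self.committed.append(row)  # autocommit outside a transaction

    def commit(self):
        self.committed.extend(self.pending)
        self.pending = []
        self.in_transaction = False

    def close(self):
        # Closing drops uncommitted work, like a rollback on disconnect
        self.pending = []
        self.in_transaction = False


class FakePool:
    """Toy pool that remembers one transaction connection per task (assumption)."""

    def __init__(self):
        self.committed = []
        self.tx_conns = {}  # task -> connection holding its open transaction

    def begin(self):
        conn = FakeConnection(self.committed)
        conn.begin()
        self.tx_conns[asyncio.current_task()] = conn

    def insert(self, row):
        conn = self.tx_conns.get(asyncio.current_task())
        if conn is None:
            # No known transaction for this task: use a fresh autocommit connection
            conn = FakeConnection(self.committed)
        conn.insert(row)

    def commit(self):
        conn = self.tx_conns.pop(asyncio.current_task(), None)
        if conn is not None:
            conn.commit()

    def reopen(self):
        # Close every connection and forget which task owned which transaction
        for conn in self.tx_conns.values():
            conn.close()
        self.tx_conns.clear()


async def handler(pool, resumed):
    pool.begin()
    pool.insert("history")
    await resumed.wait()   # the pool is reopened while this task is suspended
    pool.insert("task")
    pool.commit()          # silently does nothing: the transaction is gone


async def main():
    pool = FakePool()
    resumed = asyncio.Event()
    worker = asyncio.create_task(handler(pool, resumed))
    await asyncio.sleep(0)  # let the handler run up to its wait
    pool.reopen()
    resumed.set()
    await worker
    return pool.committed


rows = asyncio.run(main())
print(rows)  # ['task']
```

In this model the handler raises no exception, yet only the second row is stored, which matches the "1 row instead of 0 or 2" symptom described in this issue.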

How to reproduce

Environment:

python3.8 -m venv venv
. venv/bin/activate
pip install peewee==3.16.3 peewee-async==0.9.1 aiopg==1.3.1 psycopg2-binary==2.9.1 pytest==6.2.3 pytest-aiohttp==0.3.0

Test:

import asyncio
import logging
import time

# python 3.8
# pip install peewee==3.16.3 peewee-async==0.9.1 pytest==6.2.3 pytest-aiohttp==0.3.0 aiopg==1.3.1 psycopg2-binary==2.9.1
import peewee
import peewee_async
import pytest

log = logging.getLogger(__name__)


pg_db = peewee_async.PooledPostgresqlDatabase(None)
manager = peewee_async.Manager(database=pg_db)


class MySimplestModel(peewee.Model):
    id = peewee.IntegerField(primary_key=True, sequence=True)

    class Meta:
        database = pg_db


@pytest.fixture(scope='session', autouse=True)
def loop():
    return asyncio.new_event_loop()


@pytest.fixture(scope='function', autouse=True)
async def init_db(loop):
    for i in range(10):
        try:
            pg_db.init(
                database='postgres',
                user='cyclops',
                password='cyclops',
                host='localhost',
                port=5432,
                max_connections=4,
            )
            pg_db.connect()
            break  # connected, no need to retry
        except peewee.OperationalError:
            if i == 9:
                raise
            log.info('DB unavailable, retrying')
            time.sleep(1)
            continue


@pytest.fixture(autouse=True)
async def create_and_clear_tables(init_db):
    await peewee_async._run_no_result_sql(  # noqa
        database=manager.database,
        operation='CREATE TABLE IF NOT EXISTS MySimplestModel (id SERIAL PRIMARY KEY);',
    )
    await peewee_async._run_no_result_sql(  # noqa
        database=manager.database,
        operation='TRUNCATE TABLE MySimplestModel;',
    )


async def task_multi_save_atomic(event_for_wait: asyncio.Event):
    await event_for_wait.wait()
    async with manager.atomic():
        # BEGIN
        # INSERT 1
        await manager.create(MySimplestModel)

        # This is where the meteorite hits.
        # The connection breaks here and is then re-established.
        # The event is needed so that the task does not fail with an exception
        # but instead reproduces the rare peewee-async behavior
        await asyncio.sleep(0.05)
        await event_for_wait.wait()
        # The meteorite has passed. The event loop has returned control to the task

        # INSERT 2
        await manager.create(MySimplestModel)
        # COMMIT/ROLLBACK ???

    return None


async def task_restart_connections(event_for_lock: asyncio.Event) -> None:
    event_for_lock.set()
    # From this point the DB is available, the pool is fine, all is well. Tasks can do their work
    await asyncio.sleep(0.05)

    event_for_lock.clear()

    await manager.database.close_async()
    await manager.database.connect_async()

    event_for_lock.set()

    # The DB has recovered on its own (the pool is filled and ready to work).
    # From this point the tasks can do their work again
    return None


async def test_fuckin_peewee_on_connection_broke(create_and_clear_tables):
    # This event semaphore makes it easy to recreate the rare situation
    # where the disconnect + recovery happen before control returns to the asyncio.Task.
    # In the wild this happens rarely and only under particular circumstances,
    # but it causes tangible damage
    event = asyncio.Event()

    results = await asyncio.gather(
        task_restart_connections(event),
        task_multi_save_atomic(event),
        return_exceptions=True,
    )
    # Check that none of the tasks failed with an exception
    # assert list(results) == [None, None]

    # (!) Make sure atomicity holds (!)
    # I.e. either both rows are committed, or neither
    a = list(await manager.execute(MySimplestModel.select()))
    assert len(a) in (0, 2), f'WTF, peewee-async ?! Saved rows: {a}'
    # If the assert above failed, the DB ended up with 1 row instead of 0 or 2,
    # even though at the code level we tried to guarantee this could not happen
@alwavshumilin

I believe this affects all of the Database classes, since they all use the same reconnection mechanism (and the pools from the driver).

@kalombos
Collaborator

Related to #123

@kalombos
Collaborator

kalombos commented Mar 22, 2024

@rodion-solovev-7 I merged your test into master with the skip param (#210). Thank you. I hope we can fix it some time :) I also tried cancelling the transaction tasks:

    async def close_async(self):
        """Close async connection.
        """
        if self._async_wait:
            await self._async_wait
        if self._async_conn:
            conn = self._async_conn
            # Cancel in-flight transaction tasks before dropping the reference to them
            if self._task_data is not None:
                for data in self._task_data.data.values():
                    # I had added code to save the task into _task_data
                    data['task'].cancel()
            self._async_conn = None
            self._async_wait = None
            self._task_data = None
            await conn.close()

The test passed. That's all I can say :)

@rodion-solovev-7
Author

@kalombos, thank you for your response.
I'll try task cancellation as a temporary workaround for this problem.

But I'll be watching for an official fix too :)
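
A minimal sketch of what application-level task cancellation could look like, written in plain asyncio. `TransactionTaskRegistry`, `track` and `cancel_all` are hypothetical names invented for this illustration and are not part of peewee-async. The assumption is that every handler wraps its transaction in `registry.track()`, and that the code which reopens the pool awaits `registry.cancel_all()` first.

```python
import asyncio
from contextlib import asynccontextmanager


class TransactionTaskRegistry:
    """Hypothetical helper that tracks tasks currently inside a transaction."""

    def __init__(self):
        self._tasks = set()

    @asynccontextmanager
    async def track(self):
        task = asyncio.current_task()
        self._tasks.add(task)
        try:
            yield
        finally:
            self._tasks.discard(task)

    async def cancel_all(self):
        tasks = list(self._tasks)
        for task in tasks:
            task.cancel()
        # Wait until the cancelled tasks have finished unwinding
        await asyncio.gather(*tasks, return_exceptions=True)


async def handler(registry, log):
    try:
        async with registry.track():
            # In real code the `async with manager.atomic():` block would go here
            log.append("first write")
            await asyncio.sleep(3600)  # stands in for long-running logic
            log.append("second write")
    except asyncio.CancelledError:
        log.append("cancelled")
        raise


async def main():
    registry = TransactionTaskRegistry()
    log = []
    worker = asyncio.create_task(handler(registry, log))
    await asyncio.sleep(0)        # let the handler enter its transaction
    await registry.cancel_all()   # call this before closing and reopening the pool
    return log, worker.cancelled()


log, was_cancelled = asyncio.run(main())
```

The point of the design is that a task suspended in the middle of a transaction never reaches its second write: it fails loudly with `CancelledError` instead of continuing on a different connection. `cancel_all()` must not be called from a task that is itself tracked, because that task would cancel itself.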

@kalombos
Collaborator

kalombos commented Apr 3, 2024

Some news, guys. Have a look at these problems that I found. This commit should fix them, including this issue. The test is green now :)

@kalombos kalombos closed this as completed May 3, 2024