Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Two bots simultaneously calling get_joined_members causes error #160

Open
MxMarx opened this issue Sep 25, 2023 · 2 comments
Open

Two bots simultaneously calling get_joined_members causes error #160

MxMarx opened this issue Sep 25, 2023 · 2 comments

Comments

@MxMarx
Copy link

MxMarx commented Sep 25, 2023

I'm using get_joined_members() for a bot to determine if a room is a direct message.

If multiple instances of the bot are running, I occasionally get this this error:

Failed to run handler
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/mautrix/client/syncer.py", line 235, in _catch_errors
    await handler(data)
  File "/usr/local/lib/python3.11/site-packages/mautrix/crypto/machine.py", line 167, in handle_device_lists
    await self._fetch_keys(device_lists.changed, include_untracked=False)
  File "/usr/local/lib/python3.11/site-packages/mautrix/crypto/device_lists.py", line 80, in _fetch_keys
    await self.crypto_store.put_devices(user_id, new_devices)
  File "/usr/local/lib/python3.11/site-packages/mautrix/crypto/store/asyncpg/store.py", line 615, in put_devices
    await conn.copy_records_to_table("crypto_device", records=data, columns=columns)
  File "/usr/local/lib/python3.11/site-packages/mautrix/util/async_db/connection.py", line 38, in wrapper
    ret = await func(self, arg, *args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/mautrix/util/async_db/connection.py", line 151, in copy_records_to_table
    return await self.wrapped.copy_records_to_table(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/asyncpg/connection.py", line 983, in copy_records_to_table
    return await self._protocol.copy_in(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "asyncpg/protocol/protocol.pyx", line 529, in copy_in
asyncpg.exceptions.UniqueViolationError: duplicate key value violates unique constraint "crypto_device_pkey"
DETAIL:  Key (user_id, device_id)=(@xxxx:yyyy, zzzz) already exists.

It looks like this is happening because set_members works by deleting a room's members from the database and then inserting the new member list returned by get_joined_members, so if this function is simultaneously called by two bots, the old rows and removed at the same time and then duplicate new rows are added which results in the error.

It seems like this could be fixed by adding an ON CONFLICT (user_id, device_id) DO NOTHING, but I haven't done a pull request because set_members uses copy_records_to_table for postgresql and executemany for the other databases and I'm not familiar enough with them to be confident.

@tulir
Copy link
Member

tulir commented Sep 25, 2023

Having multiple programs using the same database is usually not a good idea, although not sure why it isn't working even then, that method uses a transaction for both the delete and copy https://github.com/mautrix/python/blob/master/mautrix/client/state_store/asyncpg/store.py#L156

@MxMarx
Copy link
Author

MxMarx commented Sep 26, 2023

Weird, it seems like the transaction should prevent this. I'm using ansible to install the maubot container, I'll take a look at my configuration to see if I broke anything.

Here's a minimal example. If I create two instances of this plugin in the Maubot web-ui, around half the time, just one will respond to the command.

from maubot import Plugin, MessageEvent
from maubot.handlers import command, event

class BotThatWorksDifferentlyInDirectMessages(Plugin):
    @command.new(name="isdirect")
    async def is_direct(self, evt: MessageEvent) -> None:
        joined_members = await self.client.get_joined_members(room_id=evt.room_id)
        if len(joined_members) <= 2:
            await evt.reply(f"There are {len(joined_members)} members in this room so it's a direct message!")
        else:
            await evt.reply(f"There are {len(joined_members)} members in this room so it's a group chat!")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

No branches or pull requests

2 participants