From 2856183120416b32a825f0ae9d2b2fae0661db39 Mon Sep 17 00:00:00 2001 From: Juanjo M Date: Sat, 31 Jan 2026 19:21:21 +0000 Subject: [PATCH] =?UTF-8?q?fix(mastodon):=20improve=20autocompletion=20sca?= =?UTF-8?q?n=20performance=20and=20UX=20-=20Fix=20O(n=C2=B2)=20to=20O(1)?= =?UTF-8?q?=20complexity=20when=20checking=20for=20duplicate=20users=20=20?= =?UTF-8?q?=20The=20original=20code=20used=20`if=20user=20not=20in=20list`?= =?UTF-8?q?=20which=20caused=20scans=20=20=20to=20hang=20for=20hours=20on?= =?UTF-8?q?=20accounts=20with=20many=20followers.=20Now=20uses=20dict=20?= =?UTF-8?q?=20=20keyed=20by=20user=20ID=20for=20instant=20lookups.=20-=20A?= =?UTF-8?q?dd=20detailed=20progress=20dialog=20with:=20=20=20-=20Account?= =?UTF-8?q?=20info=20(followers/following/total=20counts)=20=20=20-=20Curr?= =?UTF-8?q?ent=20status=20and=20page=20number=20=20=20-=20Users=20processe?= =?UTF-8?q?d=20counter=20=20=20-=20Visual=20progress=20bar=20=20=20-=20Can?= =?UTF-8?q?cel=20button=20to=20stop=20scan=20gracefully=20-=20Refactor=20s?= =?UTF-8?q?can=20logic=20into=20reusable=20=5Fscan=5Fusers()=20method=20wi?= =?UTF-8?q?th=20ScanType=20enum.=20-=20Show=20accurate=20completion=20stat?= =?UTF-8?q?s=20(new=20users=20vs=20already=20in=20database).=20-=20Fix=20w?= =?UTF-8?q?indow=20close=20behavior=20(X=20button=20now=20cancels=20instea?= =?UTF-8?q?d=20of=20crashing).=20-=20Add=2015=20unit=20tests=20for=20stora?= =?UTF-8?q?ge=20and=20scan=20logic.=20-=20Fix=20storage.=5F=5Fdel=5F=5F=20?= =?UTF-8?q?to=20handle=20already-closed=20connections.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../autocompletionUsers/mastodon/scan.py | 158 ++++++++++---- .../autocompletionUsers/mastodon/wx_scan.py | 115 +++++++++- src/extra/autocompletionUsers/storage.py | 12 +- src/test/extra/__init__.py | 1 + .../extra/autocompletionUsers/__init__.py | 1 + .../autocompletionUsers/test_mastodon_scan.py | 201 ++++++++++++++++++ .../extra/autocompletionUsers/test_storage.py | 115 ++++++++++ 7 files changed, 547 insertions(+), 56 deletions(-) create mode 100644 src/test/extra/__init__.py create mode 100644 src/test/extra/autocompletionUsers/__init__.py create mode 100644 src/test/extra/autocompletionUsers/test_mastodon_scan.py create mode 100644 src/test/extra/autocompletionUsers/test_storage.py diff --git a/src/extra/autocompletionUsers/mastodon/scan.py b/src/extra/autocompletionUsers/mastodon/scan.py index f0c7c5dd0..cca833206 100644 --- a/src/extra/autocompletionUsers/mastodon/scan.py +++ b/src/extra/autocompletionUsers/mastodon/scan.py @@ -1,13 +1,24 @@ # -*- coding: utf-8 -*- """ Scanning code for autocompletion feature on TWBlue. This module can retrieve user objects from the selected Mastodon account automatically. """ import time +import logging import wx import widgetUtils import output +from enum import Enum from pubsub import pub from . import wx_scan from extra.autocompletionUsers import manage, storage +log = logging.getLogger("extra.autocompletionUsers.mastodon.scan") + + +class ScanType(Enum): + """Enum to distinguish between scanning followers or following users.""" + FOLLOWING = "following" + FOLLOWERS = "followers" + + class autocompletionScan(object): def __init__(self, config, buffer, window): """ Class constructor. This class will take care of scanning the selected Mastodon account to populate the database with users automatically upon request. @@ -23,6 +34,7 @@ def __init__(self, config, buffer, window): self.config = config self.buffer = buffer self.window = window + self.progress_dialog = None def show_dialog(self): """ displays a dialog to confirm which buffers should be scanned (followers or following users). """ @@ -33,67 +45,119 @@ def show_dialog(self): confirmation = wx_scan.confirm() return confirmation + def get_user_counts(self): + """Get the followers and following counts from the user's account.""" + try: + credentials = self.buffer.session.api.account_verify_credentials() + followers_count = credentials.followers_count if self.dialog.get("followers") else 0 + following_count = credentials.following_count if self.dialog.get("friends") else 0 + return followers_count, following_count + except Exception as e: + log.exception(f"Error getting user counts: {e}") + return 0, 0 + def prepare_progress_dialog(self): - self.progress_dialog = wx_scan.autocompletionScanProgressDialog() - # connect method to update progress dialog - pub.subscribe(self.on_update_progress, "on-update-progress") + followers_count, following_count = self.get_user_counts() + self.progress_dialog = wx_scan.autocompletionScanProgressDialog( + followers_count=followers_count, + following_count=following_count + ) self.progress_dialog.Show() - def on_update_progress(self): - wx.CallAfter(self.progress_dialog.progress_bar.Pulse) + def update_progress(self, current_users, current_page, scanning_type): + """Update the progress dialog from the worker thread.""" + if self.progress_dialog: + wx.CallAfter(self.progress_dialog.update_progress, current_users, current_page, scanning_type) + + def is_cancelled(self): + """Check if the user has requested cancellation.""" + if self.progress_dialog: + return self.progress_dialog.cancelled + return False + + def _scan_users(self, scan_type, users_dict): + """Scan users of the specified type and add them to the users dictionary. + + :param scan_type: ScanType.FOLLOWING or ScanType.FOLLOWERS + :param users_dict: Dictionary to store users, keyed by user ID + :returns: True if completed, False if cancelled + """ + # Select the appropriate API method + if scan_type == ScanType.FOLLOWING: + api_method = self.buffer.session.api.account_following + else: + api_method = self.buffer.session.api.account_followers + + log.debug(f"Scanning {scan_type.value}...") + current_page = 1 + first_page = api_method(id=self.buffer.session.db["user_id"], limit=80) + self.update_progress(len(users_dict), current_page, scan_type.value) + + if first_page != None: + for user in first_page: + if user.id not in users_dict: + users_dict[user.id] = user + + next_page = first_page + while next_page != None: + if self.is_cancelled(): + log.info(f"Scan cancelled by user during {scan_type.value} scan.") + return False + time.sleep(0.25) # Small delay to avoid rate limiting + next_page = self.buffer.session.api.fetch_next(next_page) + current_page += 1 + self.update_progress(len(users_dict), current_page, scan_type.value) + if next_page == None: + break + for user in next_page: + if user.id not in users_dict: + users_dict[user.id] = user + log.debug(f"Scanned {len(users_dict)} users so far...") + + return True def scan(self): """ Attempts to add all users selected by current user to the autocomplete database. """ self.config["mysc"]["save_friends_in_autocompletion_db"] = self.dialog.get("friends") self.config["mysc"]["save_followers_in_autocompletion_db"] = self.dialog.get("followers") - output.speak(_("Updating database... You can close this window now. A message will tell you when the process finishes.")) + output.speak(_("Scanning account. Please wait or press cancel to stop.")) database = storage.storage(self.buffer.session.session_id) - percent = 0 - users = [] - if self.dialog.get("friends") == True: - first_page = self.buffer.session.api.account_following(id=self.buffer.session.db["user_id"], limit=80) - pub.sendMessage("on-update-progress") - if first_page != None: - for user in first_page: - users.append(user) - next_page = first_page - while next_page != None: - next_page = self.buffer.session.api.fetch_next(next_page) - pub.sendMessage("on-update-progress") - if next_page == None: - break - for user in next_page: - users.append(user) - # same step, but for followers. - if self.dialog.get("followers") == True: - first_page = self.buffer.session.api.account_followers(id=self.buffer.session.db["user_id"], limit=80) - pub.sendMessage("on-update-progress") - if first_page != None: - for user in first_page: - if user not in users: - users.append(user) - next_page = first_page - while next_page != None: - next_page = self.buffer.session.api.fetch_next(next_page) - pub.sendMessage("on-update-progress") - if next_page == None: - break - for user in next_page: - if user not in users: - users.append(user) -# except TweepyException: -# wx.CallAfter(wx_scan.show_error) -# return self.done() - for user in users: - name = user.display_name if user.display_name != None and user.display_name != "" else user.username - database.set_user(user.acct, name, 1) - wx.CallAfter(wx_scan .show_success, len(users)) + # Use a dictionary keyed by user ID for O(1) lookups instead of O(n) list comparisons + users_dict = {} + cancelled = False + try: + if self.dialog.get("friends") == True: + if not self._scan_users(ScanType.FOLLOWING, users_dict): + cancelled = True + if self.dialog.get("followers") == True and not cancelled: + if not self._scan_users(ScanType.FOLLOWERS, users_dict): + cancelled = True + except Exception as e: + log.exception(f"Error scanning account: {e}") + wx.CallAfter(wx_scan.show_error) + return self.done() + # Save users to database + users = list(users_dict.values()) + new_users_count = 0 + if len(users) > 0: + log.debug(f"Saving {len(users)} users to autocompletion database...") + wx.CallAfter(self.progress_dialog.set_saving_status) + for user in users: + name = user.display_name if user.display_name != None and user.display_name != "" else user.username + if database.set_user(user.acct, name, 1): + new_users_count += 1 + already_existed = len(users) - new_users_count + if cancelled: + log.info(f"Scan cancelled. Found {len(users)} users, {new_users_count} new, {already_existed} already in database.") + wx.CallAfter(wx_scan.show_cancelled, len(users), new_users_count) + else: + log.info(f"Successfully imported {new_users_count} new users ({already_existed} already existed).") + wx.CallAfter(wx_scan.show_success, len(users), new_users_count) self.done() def done(self): wx.CallAfter(self.progress_dialog.Destroy) wx.CallAfter(self.dialog.Destroy) - pub.unsubscribe(self.on_update_progress, "on-update-progress") def add_user(session, database, user): """ Adds an user to the database. """ diff --git a/src/extra/autocompletionUsers/mastodon/wx_scan.py b/src/extra/autocompletionUsers/mastodon/wx_scan.py index 8011f0625..cd0b736a7 100644 --- a/src/extra/autocompletionUsers/mastodon/wx_scan.py +++ b/src/extra/autocompletionUsers/mastodon/wx_scan.py @@ -3,6 +3,9 @@ import widgetUtils import application +def returnTrue(): + return True + class autocompletionScanDialog(widgetUtils.BaseDialog): def __init__(self): super(autocompletionScanDialog, self).__init__(parent=None, id=-1, title=_(u"Autocomplete users' settings")) @@ -22,13 +25,97 @@ def __init__(self): self.SetClientSize(sizer.CalcMin()) class autocompletionScanProgressDialog(widgetUtils.BaseDialog): - def __init__(self, *args, **kwargs): + def __init__(self, followers_count=0, following_count=0, *args, **kwargs): super(autocompletionScanProgressDialog, self).__init__(parent=None, id=wx.ID_ANY, title=_("Updating autocompletion database"), *args, **kwargs) - panel = wx.Panel(self) + self.cancelled = False + self.followers_count = followers_count + self.following_count = following_count + self.total_users = followers_count + following_count + panel = wx.Panel(self, style=wx.TAB_TRAVERSAL) sizer = wx.BoxSizer(wx.VERTICAL) - self.progress_bar = wx.Gauge(parent=panel) - sizer.Add(self.progress_bar) - panel.SetSizerAndFit(sizer) + # Account information label and field + info_label_text = wx.StaticText(panel, label=_("Account information:")) + sizer.Add(info_label_text, 0, wx.LEFT | wx.TOP, 5) + info_text = _("Followers: {followers} | Following: {following} | Total: {total}").format( + followers=followers_count, + following=following_count, + total=self.total_users + ) + self.info_field = wx.TextCtrl(panel, value=info_text, style=wx.TE_READONLY | wx.TE_PROCESS_TAB) + self.info_field.AcceptsFocusFromKeyboard = returnTrue + sizer.Add(self.info_field, 0, wx.ALL | wx.EXPAND, 5) + # Current status label and field + status_label_text = wx.StaticText(panel, label=_("Current status:")) + sizer.Add(status_label_text, 0, wx.LEFT | wx.TOP, 5) + self.status_field = wx.TextCtrl(panel, value=_("Preparing..."), style=wx.TE_READONLY | wx.TE_PROCESS_TAB) + self.status_field.AcceptsFocusFromKeyboard = returnTrue + sizer.Add(self.status_field, 0, wx.ALL | wx.EXPAND, 5) + # Progress label and field + progress_label_text = wx.StaticText(panel, label=_("Progress:")) + sizer.Add(progress_label_text, 0, wx.LEFT | wx.TOP, 5) + self.progress_field = wx.TextCtrl(panel, value="", style=wx.TE_READONLY | wx.TE_PROCESS_TAB) + self.progress_field.AcceptsFocusFromKeyboard = returnTrue + sizer.Add(self.progress_field, 0, wx.ALL | wx.EXPAND, 5) + # Progress bar + self.progress_bar = wx.Gauge(parent=panel, range=self.total_users if self.total_users > 0 else 100) + sizer.Add(self.progress_bar, 0, wx.ALL | wx.EXPAND, 5) + # Cancel button + self.cancel_button = wx.Button(panel, wx.ID_CANCEL, _("&Cancel")) + self.cancel_button.Bind(wx.EVT_BUTTON, self.on_cancel) + sizer.Add(self.cancel_button, 0, wx.ALL | wx.ALIGN_CENTER, 10) + panel.SetSizer(sizer) + self.SetClientSize(sizer.CalcMin()) + self.SetSize(400, -1) + # Handle window close - same as cancel, don't close immediately + self.Bind(wx.EVT_CLOSE, self.on_close) + + def on_close(self, event): + """Handle window close event (X button). Same as cancel, but prevent immediate close.""" + if not self.cancelled: + self.on_cancel() + # Don't call event.Skip() - this prevents the window from closing + # The window will be destroyed by done() after the scan thread finishes + + def on_cancel(self, event=None): + self.cancelled = True + self._update_field_preserving_cursor(self.status_field, _("Cancelling... Please wait.")) + self.cancel_button.Disable() + + def _update_field_preserving_cursor(self, field, new_value): + """Update a text field while preserving cursor position.""" + cursor_pos = field.GetInsertionPoint() + field.SetValue(new_value) + # Restore cursor, but don't go beyond the new text length + new_length = len(new_value) + field.SetInsertionPoint(min(cursor_pos, new_length)) + + def update_progress(self, current_users, current_page, scanning_type): + """Update the progress dialog with current scan status. + + :param current_users: Number of users scanned so far + :param current_page: Current page being processed + :param scanning_type: 'following' or 'followers' + """ + if scanning_type == "following": + status = _("Scanning following users...") + else: + status = _("Scanning followers...") + self._update_field_preserving_cursor(self.status_field, status) + progress_text = _("Page {page} | Users processed: {current} / {total}").format( + page=current_page, + current=current_users, + total=self.total_users + ) + self._update_field_preserving_cursor(self.progress_field, progress_text) + # Update progress bar + if self.total_users > 0: + self.progress_bar.SetValue(min(current_users, self.total_users)) + + def set_saving_status(self): + """Update status to show we're saving to database.""" + self._update_field_preserving_cursor(self.status_field, _("Saving users to database...")) + self._update_field_preserving_cursor(self.progress_field, "") + self.progress_bar.SetValue(self.total_users) def confirm(): with wx.MessageDialog(None, _("This process will retrieve the users you selected from your Mastodon account, and add them to the user autocomplete database. Please note that if there are many users or you have tried to perform this action less than 15 minutes ago, TWBlue may reach a limit in API calls when trying to load the users into the database. If this happens, we will show you an error, in which case you will have to try this process again in a few minutes. If this process ends with no error, you will be redirected back to the account settings dialog. Do you want to continue?"), _("Attention"), style=wx.ICON_QUESTION|wx.YES_NO) as result: @@ -36,8 +123,22 @@ def confirm(): return True return False -def show_success(users): - with wx.MessageDialog(None, _("TWBlue has imported {} users successfully.").format(users), _("Done")) as dlg: +def show_success(total_users, new_users): + already_existed = total_users - new_users + message = _("Scan completed. {new} new users imported, {existing} already in database.").format( + new=new_users, + existing=already_existed + ) + with wx.MessageDialog(None, message, _("Done")) as dlg: + dlg.ShowModal() + +def show_cancelled(total_users, new_users): + already_existed = total_users - new_users + message = _("Operation cancelled. {new} new users imported, {existing} already in database.").format( + new=new_users, + existing=already_existed + ) + with wx.MessageDialog(None, message, _("Cancelled"), style=wx.ICON_INFORMATION) as dlg: dlg.ShowModal() def show_error(): diff --git a/src/extra/autocompletionUsers/storage.py b/src/extra/autocompletionUsers/storage.py index 4ac80ebfd..055bf0bda 100644 --- a/src/extra/autocompletionUsers/storage.py +++ b/src/extra/autocompletionUsers/storage.py @@ -25,8 +25,13 @@ def get_users(self, term): return self.cursor.fetchall() def set_user(self, screen_name, user_name, from_a_buffer): + """Insert a user into the database. + + :returns: True if the user was inserted (new), False if already existed. + """ self.cursor.execute("""insert or ignore into users values(?, ?, ?)""", (screen_name, user_name, from_a_buffer)) self.connection.commit() + return self.cursor.rowcount > 0 def remove_user(self, user): self.cursor.execute("""DELETE FROM users WHERE user = ?""", (user,)) @@ -48,5 +53,8 @@ def create_table(self): )""") def __del__(self): - self.cursor.close() - self.connection.close() + try: + self.cursor.close() + self.connection.close() + except Exception: + pass # Already closed diff --git a/src/test/extra/__init__.py b/src/test/extra/__init__.py new file mode 100644 index 000000000..40a96afc6 --- /dev/null +++ b/src/test/extra/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/src/test/extra/autocompletionUsers/__init__.py b/src/test/extra/autocompletionUsers/__init__.py new file mode 100644 index 000000000..40a96afc6 --- /dev/null +++ b/src/test/extra/autocompletionUsers/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/src/test/extra/autocompletionUsers/test_mastodon_scan.py b/src/test/extra/autocompletionUsers/test_mastodon_scan.py new file mode 100644 index 000000000..c91a975e7 --- /dev/null +++ b/src/test/extra/autocompletionUsers/test_mastodon_scan.py @@ -0,0 +1,201 @@ +# -*- coding: utf-8 -*- +"""Tests for the Mastodon autocompletion scan module.""" +import sys +import types +import pytest +import time +from unittest import mock + +# Mock wx module before importing scan +wx_module = types.ModuleType("wx") +wx_module.CallAfter = mock.MagicMock() +wx_module.Panel = mock.MagicMock() +wx_module.BoxSizer = mock.MagicMock() +wx_module.StaticText = mock.MagicMock() +wx_module.TextCtrl = mock.MagicMock() +wx_module.Gauge = mock.MagicMock() +wx_module.Button = mock.MagicMock() +wx_module.MessageDialog = mock.MagicMock() +wx_module.VERTICAL = 0 +wx_module.HORIZONTAL = 1 +wx_module.ALL = 0 +wx_module.EXPAND = 0 +wx_module.LEFT = 0 +wx_module.TOP = 0 +wx_module.ALIGN_CENTER = 0 +wx_module.TE_READONLY = 0 +wx_module.TE_PROCESS_TAB = 0 +wx_module.TAB_TRAVERSAL = 0 +wx_module.ID_ANY = -1 +wx_module.ID_CANCEL = 0 +wx_module.ID_OK = 0 +wx_module.ID_YES = 0 +wx_module.EVT_BUTTON = mock.MagicMock() +wx_module.EVT_CLOSE = mock.MagicMock() +wx_module.ICON_QUESTION = 0 +wx_module.ICON_INFORMATION = 0 +wx_module.ICON_ERROR = 0 +wx_module.YES_NO = 0 +sys.modules["wx"] = wx_module + +# Mock widgetUtils +widgetUtils_module = types.ModuleType("widgetUtils") +widgetUtils_module.BaseDialog = mock.MagicMock() +widgetUtils_module.OK = 1 +sys.modules["widgetUtils"] = widgetUtils_module + +# Mock output +output_module = types.ModuleType("output") +output_module.speak = mock.MagicMock() +sys.modules["output"] = output_module + +# Mock pubsub +pub_module = types.ModuleType("pubsub") +pub_module.pub = mock.MagicMock() +sys.modules["pubsub"] = pub_module +sys.modules["pubsub.pub"] = pub_module.pub + +# Mock application +application_module = types.ModuleType("application") +sys.modules["application"] = application_module + + +class MockUser: + """Mock Mastodon user object.""" + def __init__(self, user_id, username, display_name="", acct=None): + self.id = user_id + self.username = username + self.display_name = display_name + self.acct = acct or f"{username}@instance.social" + + +### Tests for the scan logic (not UI) + +def test_user_dict_prevents_duplicates(): + """Test that using a dict with user.id as key prevents duplicates.""" + users_dict = {} + + # Simulate adding users from following + user1 = MockUser(1, "alice", "Alice") + user2 = MockUser(2, "bob", "Bob") + + users_dict[user1.id] = user1 + users_dict[user2.id] = user2 + + # Simulate adding same users from followers (should not duplicate) + user1_again = MockUser(1, "alice", "Alice Updated") # Same ID + user3 = MockUser(3, "charlie", "Charlie") + + if user1_again.id not in users_dict: + users_dict[user1_again.id] = user1_again + if user3.id not in users_dict: + users_dict[user3.id] = user3 + + # Should have 3 unique users, not 4 + assert len(users_dict) == 3 + # Original alice should be preserved (not updated) + assert users_dict[1].display_name == "Alice" + +def test_user_dict_performance(): + """Test that dict lookups are O(1) - should complete instantly.""" + users_dict = {} + + # Add 10,000 users + for i in range(10000): + user = MockUser(i, f"user{i}", f"User {i}") + users_dict[user.id] = user + + # Now check 10,000 more users for duplicates + start_time = time.time() + for i in range(10000): + user = MockUser(i, f"user{i}", f"User {i}") + if user.id not in users_dict: + users_dict[user.id] = user + elapsed = time.time() - start_time + + # This should complete in well under 1 second with O(1) lookups + # The original O(n²) implementation would take minutes + assert elapsed < 1.0, f"Duplicate check took {elapsed}s, expected < 1s" + +def test_display_name_fallback_to_username(): + """Test that username is used when display_name is empty.""" + user_with_display = MockUser(1, "alice", "Alice Smith") + user_without_display = MockUser(2, "bob", "") + user_with_none = MockUser(3, "charlie", None) + + def get_name(user): + return user.display_name if user.display_name != None and user.display_name != "" else user.username + + assert get_name(user_with_display) == "Alice Smith" + assert get_name(user_without_display) == "bob" + assert get_name(user_with_none) == "charlie" + +def test_new_users_count(): + """Test counting new vs existing users.""" + # Simulate the storage.set_user return value pattern + existing_users = {"user1@instance.social", "user2@instance.social"} + + def mock_set_user(acct, name, from_buffer): + """Returns True if new, False if already existed.""" + if acct in existing_users: + return False + existing_users.add(acct) + return True + + users_to_add = [ + ("user1@instance.social", "User One"), # existing + ("user2@instance.social", "User Two"), # existing + ("user3@instance.social", "User Three"), # new + ("user4@instance.social", "User Four"), # new + ] + + new_count = 0 + for acct, name in users_to_add: + if mock_set_user(acct, name, 1): + new_count += 1 + + assert new_count == 2 + total = len(users_to_add) + already_existed = total - new_count + assert already_existed == 2 + + +### Tests for progress dialog calculations + +def test_total_users_calculation(): + """Test that total users is sum of followers and following.""" + followers_count = 500 + following_count = 300 + total = followers_count + following_count + assert total == 800 + +def test_progress_bar_value_capped(): + """Test that progress bar value doesn't exceed total.""" + total_users = 100 + + # Simulate progress bar update + def get_progress_value(current_users, total): + return min(current_users, total) + + assert get_progress_value(50, total_users) == 50 + assert get_progress_value(100, total_users) == 100 + assert get_progress_value(150, total_users) == 100 # Capped + +def test_cursor_preservation_logic(): + """Test cursor position preservation when text changes.""" + old_text = "Page 1 | Users: 50" + new_text = "Page 2 | Users: 100" + cursor_pos = 5 # Somewhere in the middle + + new_length = len(new_text) + restored_pos = min(cursor_pos, new_length) + + assert restored_pos == 5 # Position preserved + + # Test with cursor beyond new text length + short_new_text = "Done" + new_length = len(short_new_text) + cursor_pos = 10 + restored_pos = min(cursor_pos, new_length) + + assert restored_pos == 4 # Capped to new length diff --git a/src/test/extra/autocompletionUsers/test_storage.py b/src/test/extra/autocompletionUsers/test_storage.py new file mode 100644 index 000000000..049f4c1c0 --- /dev/null +++ b/src/test/extra/autocompletionUsers/test_storage.py @@ -0,0 +1,115 @@ +# -*- coding: utf-8 -*- +"""Tests for the autocompletion storage module.""" +import os +import pytest +import tempfile +import shutil + +# path where we will save our test config +temp_base_dir = None + +@pytest.fixture +def temp_config_dir(): + """Create a temporary directory for test database.""" + global temp_base_dir + temp_base_dir = tempfile.mkdtemp() + session_dir = os.path.join(temp_base_dir, "test_session") + os.makedirs(session_dir) + yield temp_base_dir + # Cleanup handled by storage_instance fixture + +@pytest.fixture +def storage_instance(temp_config_dir, monkeypatch): + """Create a storage instance with mocked paths.""" + # Mock paths.config_path to return our temp directory + import paths + monkeypatch.setattr(paths, "config_path", lambda: temp_config_dir) + + from extra.autocompletionUsers import storage + s = storage.storage("test_session") + yield s + # Explicitly close database connection before cleanup + s.cursor.close() + s.connection.close() + # Now cleanup temp directory + if os.path.exists(temp_config_dir): + shutil.rmtree(temp_config_dir) + +def test_create_table(storage_instance): + """Test that the users table is created.""" + assert storage_instance.table_exist("users") == True + +def test_set_user_new(storage_instance): + """Test adding a new user returns True.""" + result = storage_instance.set_user("user1@instance.social", "User One", 1) + assert result == True + +def test_set_user_duplicate(storage_instance): + """Test adding a duplicate user returns False.""" + storage_instance.set_user("user1@instance.social", "User One", 1) + result = storage_instance.set_user("user1@instance.social", "User One Updated", 1) + assert result == False + +def test_get_users(storage_instance): + """Test searching for users.""" + storage_instance.set_user("alice@instance.social", "Alice Smith", 1) + storage_instance.set_user("bob@instance.social", "Bob Jones", 1) + storage_instance.set_user("charlie@instance.social", "Charlie Brown", 1) + + # Search by username + results = storage_instance.get_users("alice") + assert len(results) == 1 + assert results[0][0] == "alice@instance.social" + + # Search by display name + results = storage_instance.get_users("jones") + assert len(results) == 1 + assert results[0][1] == "Bob Jones" + +def test_get_all_users(storage_instance): + """Test getting all users.""" + storage_instance.set_user("user1@instance.social", "User One", 1) + storage_instance.set_user("user2@instance.social", "User Two", 1) + storage_instance.set_user("user3@instance.social", "User Three", 1) + + results = storage_instance.get_all_users() + assert len(results) == 3 + +def test_remove_user(storage_instance): + """Test removing a user.""" + storage_instance.set_user("user1@instance.social", "User One", 1) + storage_instance.set_user("user2@instance.social", "User Two", 1) + + storage_instance.remove_user("user1@instance.social") + + results = storage_instance.get_all_users() + assert len(results) == 1 + assert results[0][0] == "user2@instance.social" + +def test_remove_by_buffer(storage_instance): + """Test removing users by buffer type.""" + storage_instance.set_user("friend1@instance.social", "Friend One", 1) + storage_instance.set_user("friend2@instance.social", "Friend Two", 1) + storage_instance.set_user("manual@instance.social", "Manual User", 0) + + # Remove all users added from buffer type 1 (friends/following) + storage_instance.remove_by_buffer(1) + + results = storage_instance.get_all_users() + assert len(results) == 1 + assert results[0][0] == "manual@instance.social" + +def test_case_insensitive_search(storage_instance): + """Test that search is case insensitive.""" + storage_instance.set_user("Alice@Instance.Social", "ALICE SMITH", 1) + + # Search with different cases + results = storage_instance.get_users("alice") + assert len(results) == 1 + + results = storage_instance.get_users("ALICE") + assert len(results) == 1 + + results = storage_instance.get_users("Alice") + assert len(results) == 1 +