|
4 | 4 |
|
5 | 5 | from django.contrib.auth import get_user_model |
6 | 6 | from django.contrib.auth.models import Group |
7 | | -from django.core.management import call_command |
| 7 | +from django.core.management import CommandError, call_command |
8 | 8 | from django.test import TestCase |
9 | 9 |
|
10 | 10 | from openedx_authz.api.users import batch_unassign_role_from_users, get_user_role_assignments_in_scope |
@@ -762,6 +762,73 @@ def test_migrate_legacy_course_roles_to_authz_using_org_id(self): |
762 | 762 |
|
763 | 763 | # Must be different before and after migration since we set delete_after_migration |
764 | 764 | # to True and we are deleting all |
| 765 | + # Now let's rollback |
| 766 | + |
| 767 | + # Capture the state of permissions before rollback to verify that rollback restores the original state |
| 768 | + original_state_user_subjects = list( |
| 769 | + UserSubject.objects.filter(casbin_rules__scope__coursescope__course_overview__isnull=False) |
| 770 | + .distinct() |
| 771 | + .order_by("id") |
| 772 | + .values("id", "user_id") |
| 773 | + ) |
| 774 | + original_state_access_roles = list( |
| 775 | + CourseAccessRole.objects.all().order_by("id").values("id", "user_id", "org", "course_id", "role") |
| 776 | + ) |
| 777 | + |
| 778 | + permissions_with_errors, permissions_with_no_errors = migrate_authz_to_legacy_course_roles( |
| 779 | + CourseAccessRole, UserSubject, course_id_list=None, org_id=self.org, delete_after_migration=True |
| 780 | + ) |
| 781 | + |
| 782 | + # Check that each user has the expected legacy role after rollback |
| 783 | + # and that errors are logged for any permissions that could not be rolled back |
| 784 | + for user in self.admin_users: |
| 785 | + assignments = get_user_role_assignments_in_scope( |
| 786 | + user_external_key=user.username, scope_external_key=self.course_id |
| 787 | + ) |
| 788 | + self.assertEqual(len(assignments), 0) |
| 789 | + for user in self.staff_users: |
| 790 | + assignments = get_user_role_assignments_in_scope( |
| 791 | + user_external_key=user.username, scope_external_key=self.course_id |
| 792 | + ) |
| 793 | + self.assertEqual(len(assignments), 0) |
| 794 | + for user in self.limited_staff: |
| 795 | + assignments = get_user_role_assignments_in_scope( |
| 796 | + user_external_key=user.username, scope_external_key=self.course_id |
| 797 | + ) |
| 798 | + self.assertEqual(len(assignments), 0) |
| 799 | + for user in self.data_researcher: |
| 800 | + assignments = get_user_role_assignments_in_scope( |
| 801 | + user_external_key=user.username, scope_external_key=self.course_id |
| 802 | + ) |
| 803 | + self.assertEqual(len(assignments), 0) |
| 804 | + |
| 805 | + self.assertEqual(len(permissions_with_errors), 0) |
| 806 | + self.assertEqual(len(permissions_with_no_errors), 12) |
| 807 | + |
| 808 | + state_after_migration_user_subjects = list( |
| 809 | + UserSubject.objects.filter(casbin_rules__scope__coursescope__course_overview__isnull=False) |
| 810 | + .distinct() |
| 811 | + .order_by("id") |
| 812 | + .values("id", "user_id") |
| 813 | + ) |
| 814 | + after_migrate_state_access_roles = list( |
| 815 | + CourseAccessRole.objects.all().order_by("id").values("id", "user_id", "org", "course_id", "role") |
| 816 | + ) |
| 817 | + |
| 818 | + # Before the rollback, we should only have the 1 invalid role entry |
| 819 | + # since we set delete_after_migration to True in the migration. |
| 820 | + self.assertEqual(len(original_state_access_roles), 1) |
| 821 | + |
| 822 | + # All original entries + 3 users * 4 roles = 12 |
| 823 | + # plus the original invalid entry = 1 + 12 = 13 total entries |
| 824 | + self.assertEqual(len(after_migrate_state_access_roles), 1 + 12) |
| 825 | + |
| 826 | + # Sanity check to ensure we have the expected number of UserSubjects related to |
| 827 | + # the course permissions before migration (3 users * 4 roles = 12) |
| 828 | + self.assertEqual(len(original_state_user_subjects), 12) |
| 829 | + |
| 830 | + # After rollback, we should have 0 UserSubjects related to the course permissions |
| 831 | + self.assertEqual(len(state_after_migration_user_subjects), 0) |
765 | 832 |
|
766 | 833 | @patch("openedx_authz.api.data.CourseOverview", CourseOverview) |
767 | 834 | def test_migrate_authz_to_legacy_course_roles_with_no_org_and_courses(self): |
@@ -838,3 +905,25 @@ def test_authz_rollback_course_authoring_command(self, mock_rollback): |
838 | 905 | call_kwargs = mock_rollback.call_args_list[0][1] |
839 | 906 |
|
840 | 907 | self.assertEqual(call_kwargs["delete_after_migration"], True) |
| 908 | + |
| 909 | + @patch("openedx_authz.management.commands.authz_migrate_course_authoring.CourseAccessRole", CourseAccessRole) |
| 910 | + @patch("openedx_authz.management.commands.authz_migrate_course_authoring.migrate_legacy_course_roles_to_authz") |
| 911 | + def test_authz_migrate_course_authoring_command_no_org_and_courses(self): |
| 912 | + """ |
| 913 | + Verify that the authz_migrate_course_authoring command raises an error |
| 914 | + when neither course_id_list nor org_id is provided. |
| 915 | + """ |
| 916 | + |
| 917 | + with self.assertRaises(CommandError): |
| 918 | + call_command("authz_migrate_course_authoring") |
| 919 | + |
| 920 | + @patch("openedx_authz.management.commands.authz_migrate_course_authoring.CourseAccessRole", CourseAccessRole) |
| 921 | + @patch("openedx_authz.management.commands.authz_migrate_course_authoring.migrate_legacy_course_roles_to_authz") |
| 922 | + def test_authz_migrate_course_authoring_command_with_org_and_courses(self): |
| 923 | + """ |
| 924 | + Verify that the authz_migrate_course_authoring command raises an error |
| 925 | + when both course_id_list and org_id are provided. |
| 926 | + """ |
| 927 | + |
| 928 | + with self.assertRaises(CommandError): |
| 929 | + call_command("authz_migrate_course_authoring", "--course-id-list", self.course_id, "--org-id", self.org) |
0 commit comments