Skip to content

Commit

Permalink
Fix datetime offset comparison error (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
somethingnew2-0 authored Apr 18, 2024
1 parent 814ede8 commit a9c1348
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 5 deletions.
2 changes: 1 addition & 1 deletion api/models/tag.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,4 @@ def coalesce_ended_at(
if initial_ended_at is None:
return constraint_ended_at
else:
return min(constraint_ended_at, initial_ended_at)
return min(constraint_ended_at, initial_ended_at.replace(tzinfo=UTC))
3 changes: 2 additions & 1 deletion api/operations/modify_group_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,8 @@ async def _execute(self) -> OktaGroup:
associated_users_ended_at = role_associated_group_map.ended_at
else:
associated_users_ended_at = (
min(self.members_added_ended_at, role_associated_group_map.ended_at.replace(tzinfo=UTC))
min(self.members_added_ended_at.replace(tzinfo=UTC),
role_associated_group_map.ended_at.replace(tzinfo=UTC))
)

access_to_add = OktaUserGroupMember(
Expand Down
8 changes: 5 additions & 3 deletions api/operations/modify_role_groups.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from datetime import datetime
from datetime import UTC, datetime
from typing import Dict, Optional

from flask import current_app, has_request_context, request
Expand Down Expand Up @@ -342,7 +342,8 @@ async def _execute(self) -> RoleGroup:
associated_users_ended_at = role_associated_group_map.ended_at
else:
associated_users_ended_at = (
min(member.ended_at, role_associated_group_map.ended_at)
min(member.ended_at.replace(tzinfo=UTC),
role_associated_group_map.ended_at.replace(tzinfo=UTC))
)

membership_to_add = OktaUserGroupMember(
Expand Down Expand Up @@ -380,7 +381,8 @@ async def _execute(self) -> RoleGroup:
associated_users_ended_at = role_associated_group_map.ended_at
else:
associated_users_ended_at = (
min(member.ended_at, role_associated_group_map.ended_at)
min(member.ended_at.replace(tzinfo=UTC),
role_associated_group_map.ended_at.replace(tzinfo=UTC))
)
ownership_to_add = OktaUserGroupMember(
user_id=member.user_id,
Expand Down
67 changes: 67 additions & 0 deletions tests/test_access_request.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime, timedelta
from typing import Any

from flask import Flask, url_for
Expand Down Expand Up @@ -29,6 +30,9 @@
from api.services import okta
from tests.factories import AccessRequestFactory, AppGroupFactory, OktaUserFactory

SEVEN_DAYS_IN_SECONDS = 7 * 24 * 60 * 60
THREE_DAYS_IN_SECONDS = 3 * 24 * 60 * 60
ONE_DAY_IN_SECONDS = 24 * 60 * 60

def test_get_access_request(app: Flask, client: FlaskClient, db: SQLAlchemy, mocker: MockerFixture, okta_group: OktaGroup, role_group: RoleGroup, user: OktaUser) -> None:
# test 404
Expand Down Expand Up @@ -674,6 +678,69 @@ def test_auto_resolve_create_access_request(app: Flask,
assert request_created_conditional_access_spy.call_count == 1
assert add_membership_spy.call_count == 0

_, kwargs = request_created_conditional_access_spy.call_args
assert access_request == kwargs['access_request']
assert okta_group == kwargs['group']
assert user == kwargs['requester']
assert len(kwargs['group_tags']) == 1
assert tag in kwargs['group_tags']

def test_auto_resolve_create_access_request_with_time_limit_constraint_tag(
app: Flask,
db: SQLAlchemy,
okta_group: OktaGroup,
user: OktaUser,
tag: Tag,
mocker: MockerFixture) -> None:

db.session.add(user)
db.session.add(okta_group)
tag.constraints = {
Tag.MEMBER_TIME_LIMIT_CONSTRAINT_KEY: THREE_DAYS_IN_SECONDS,
Tag.OWNER_TIME_LIMIT_CONSTRAINT_KEY: THREE_DAYS_IN_SECONDS
}
db.session.add(tag)
db.session.commit()

db.session.add(OktaGroupTagMap(group_id=okta_group.id, tag_id=tag.id))
db.session.commit()

notification_hook = get_notification_hook()
request_created_notification_spy = mocker.patch.object(
notification_hook, "access_request_created"
)
request_completed_notification_spy = mocker.patch.object(
notification_hook, "access_request_completed"
)
request_hook = get_conditional_access_hook()
request_created_conditional_access_spy = mocker.patch.object(
request_hook, "access_request_created", return_value=[
ConditionalAccessResponse(
approved=True,
reason="Auto-Approved",
ending_at=datetime.now() + timedelta(seconds=SEVEN_DAYS_IN_SECONDS)
),
]
)
add_membership_spy = mocker.patch.object(okta, "async_add_user_to_group")

access_request = CreateAccessRequest(
requester_user=user,
requested_group=okta_group,
request_ownership=False,
request_reason="test reason",
).execute()

assert access_request is not None
assert access_request.status == AccessRequestStatus.APPROVED
assert access_request.resolved_at is not None
assert access_request.resolver_user_id is None
assert access_request.resolution_reason == "Auto-Approved"
assert request_created_notification_spy.call_count == 0
assert request_completed_notification_spy.call_count == 0
assert request_created_conditional_access_spy.call_count == 1
assert add_membership_spy.call_count == 1

_, kwargs = request_created_conditional_access_spy.call_args
assert access_request == kwargs['access_request']
assert okta_group == kwargs['group']
Expand Down

0 comments on commit a9c1348

Please sign in to comment.