diff --git a/src/python_picnic_api2/__init__.py b/src/python_picnic_api2/__init__.py index 661bf07..44f0789 100644 --- a/src/python_picnic_api2/__init__.py +++ b/src/python_picnic_api2/__init__.py @@ -1,6 +1,7 @@ from .client import PicnicAPI +from .session import Picnic2FAError, Picnic2FARequired -__all__ = ["PicnicAPI"] +__all__ = ["PicnicAPI", "Picnic2FAError", "Picnic2FARequired"] __title__ = "python-picnic-api" __version__ = "1.1.0" __author__ = "Mike Brink" diff --git a/src/python_picnic_api2/client.py b/src/python_picnic_api2/client.py index 21a3405..d1039b6 100644 --- a/src/python_picnic_api2/client.py +++ b/src/python_picnic_api2/client.py @@ -10,7 +10,12 @@ _url_generator, find_nodes_by_content, ) -from .session import PicnicAPISession, PicnicAuthError +from .session import ( + Picnic2FAError, + Picnic2FARequired, + PicnicAPISession, + PicnicAuthError, +) DEFAULT_URL = "https://storefront-prod.{}.picnicinternational.com/api/{}" GLOBAL_GATEWAY_URL = "https://gateway-prod.global.picnicinternational.com" @@ -60,14 +65,18 @@ def _get(self, path: str, add_picnic_headers=False): return response - def _post(self, path: str, data=None, base_url_override=None): + def _post( + self, path: str, data=None, base_url_override=None, add_picnic_headers=False + ): url = (base_url_override if base_url_override else self._base_url) + path - response = self.session.post(url, json=data).json() + kwargs = {"json": data} + if add_picnic_headers: + kwargs["headers"] = _HEADERS + response = self.session.post(url, **kwargs).json() if self._contains_auth_error(response): raise PicnicAuthError( - f"Picnic authentication error: \ - {response['error'].get('message')}" + f"Picnic authentication error: {response['error'].get('message')}" ) return response @@ -80,12 +89,82 @@ def _contains_auth_error(response): error_code = response.setdefault("error", {}).get("code") return error_code == "AUTH_ERROR" or error_code == "AUTH_INVALID_CRED" + @staticmethod + def _requires_2fa(response): + if not isinstance(response, dict): + return False + + error_code = response.get("error", {}).get("code") + return error_code == "TWO_FACTOR_AUTHENTICATION_REQUIRED" + def login(self, username: str, password: str): path = "/user/login" secret = md5(password.encode("utf-8")).hexdigest() data = {"key": username, "secret": secret, "client_id": 30100} - return self._post(path, data) + response = self._post(path, data, add_picnic_headers=True) + + if self._requires_2fa(response): + raise Picnic2FARequired( + message=response.get("error", {}).get( + "message", "Two-factor authentication required" + ), + response=response, + ) + + return response + + def _post_2fa(self, path: str, data=None): + """POST for 2FA endpoints that may return empty (204) or JSON error bodies.""" + url = self._base_url + path + response = self.session.post(url, json=data, headers=_HEADERS) + + if response.status_code == 204 or not response.content: + return None + + json_body = response.json() + + # This should not happen because password auth is already done + # at this point, but just in case. + if self._contains_auth_error(json_body): + raise PicnicAuthError( + f"Picnic authentication error: {json_body['error'].get('message')}" + ) + + error = json_body.get("error", {}) + if error.get("code"): + raise Picnic2FAError( + message=error.get("message", "Two-factor authentication failed"), + code=error["code"], + ) + + return json_body + + def generate_2fa_code(self, channel: str = "SMS"): + """Request a 2FA code to be sent via the specified channel. + + Args: + channel: The delivery channel ("SMS" or "EMAIL"). + + Raises: + Picnic2FAError: If the server returns an error (e.g. invalid channel). + """ + path = "/user/2fa/generate" + data = {"channel": channel} + self._post_2fa(path, data) + + def verify_2fa_code(self, code: str): + """Verify the 2FA code to complete authentication. + + Args: + code: The OTP code received via SMS or email. + + Raises: + Picnic2FAError: If the OTP code is invalid. + """ + path = "/user/2fa/verify" + data = {"otp": code} + self._post_2fa(path, data) def logged_in(self): return self.session.authenticated diff --git a/src/python_picnic_api2/session.py b/src/python_picnic_api2/session.py index 5a8debc..262c25d 100644 --- a/src/python_picnic_api2/session.py +++ b/src/python_picnic_api2/session.py @@ -5,6 +5,30 @@ class PicnicAuthError(Exception): """Indicates an error when authenticating to the Picnic API.""" +class Picnic2FARequired(Exception): + """Indicates that two-factor authentication is required.""" + + def __init__( + self, + message: str = "Two-factor authentication required", + response: dict = None, + ): + super().__init__(message) + self.response = response or {} + + +class Picnic2FAError(Exception): + """Indicates an error during two-factor authentication (e.g. invalid OTP).""" + + def __init__( + self, + message: str = "Two-factor authentication failed", + code: str = None, + ): + super().__init__(message) + self.code = code + + class PicnicAPISession(Session): AUTH_HEADER = "x-picnic-auth" @@ -52,4 +76,4 @@ def post(self, url, data=None, json=None, **kwargs) -> Response: return response -__all__ = ["PicnicAuthError", "PicnicAPISession"] +__all__ = ["PicnicAuthError", "Picnic2FARequired", "Picnic2FAError", "PicnicAPISession"] diff --git a/tests/test_client.py b/tests/test_client.py index 5e340a0..3d49570 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -5,7 +5,11 @@ from python_picnic_api2 import PicnicAPI from python_picnic_api2.client import DEFAULT_URL -from python_picnic_api2.session import PicnicAuthError +from python_picnic_api2.session import ( + Picnic2FAError, + Picnic2FARequired, + PicnicAuthError, +) PICNIC_HEADERS = { "x-picnic-agent": "30100;1.206.1-#15408", @@ -15,9 +19,10 @@ class TestClient(unittest.TestCase): class MockResponse: - def __init__(self, json_data, status_code): + def __init__(self, json_data, status_code, content=b"data"): self.json_data = json_data self.status_code = status_code + self.content = content def json(self): return self.json_data @@ -42,6 +47,7 @@ def test_login_credentials(self): "secret": "098f6bcd4621d373cade4e832627b4f6", "client_id": 30100, }, + headers=PICNIC_HEADERS, ) def test_login_auth_token(self): @@ -334,3 +340,86 @@ def test_post_auth_exception(self): with self.assertRaises(PicnicAuthError): self.client.clear_cart() + + def test_login_requires_2fa(self): + response = { + "error": { + "code": "TWO_FACTOR_AUTHENTICATION_REQUIRED", + "message": "User must verify their second factor", + "details": {}, + } + } + self.session_mock().post.return_value = self.MockResponse(response, 200) + + client = PicnicAPI() + with self.assertRaises(Picnic2FARequired) as ctx: + client.login("test-user", "test-password") + self.assertEqual( + str(ctx.exception), "User must verify their second factor" + ) + self.assertEqual(ctx.exception.response, response) + + def test_generate_2fa_code(self): + self.session_mock().post.return_value = self.MockResponse( + None, 204, content=b"" + ) + + result = self.client.generate_2fa_code() + self.session_mock().post.assert_called_with( + self.expected_base_url + "/user/2fa/generate", + json={"channel": "SMS"}, + headers=PICNIC_HEADERS, + ) + self.assertIsNone(result) + + def test_generate_2fa_code_email(self): + self.session_mock().post.return_value = self.MockResponse( + None, 204, content=b"" + ) + + self.client.generate_2fa_code(channel="EMAIL") + self.session_mock().post.assert_called_with( + self.expected_base_url + "/user/2fa/generate", + json={"channel": "EMAIL"}, + headers=PICNIC_HEADERS, + ) + + def test_verify_2fa_code_success(self): + self.session_mock().post.return_value = self.MockResponse( + None, 204, content=b"" + ) + + result = self.client.verify_2fa_code("123456") + self.session_mock().post.assert_called_with( + self.expected_base_url + "/user/2fa/verify", + json={"otp": "123456"}, + headers=PICNIC_HEADERS, + ) + self.assertIsNone(result) + + def test_verify_2fa_code_invalid(self): + response = { + "error": { + "code": "OTP_NOT_VALID", + "message": "Otp is not valid", + "details": {}, + } + } + self.session_mock().post.return_value = self.MockResponse(response, 200) + + with self.assertRaises(Picnic2FAError) as ctx: + self.client.verify_2fa_code("000000") + self.assertEqual(str(ctx.exception), "Otp is not valid") + self.assertEqual(ctx.exception.code, "OTP_NOT_VALID") + + def test_2fa_auth_error(self): + response = { + "error": { + "code": "AUTH_ERROR", + "message": "Authentication failed.", + } + } + self.session_mock().post.return_value = self.MockResponse(response, 400) + + with self.assertRaises(PicnicAuthError): + self.client.verify_2fa_code("123456")