Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/python_picnic_api2/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .client import PicnicAPI
from .session import Picnic2FAError, Picnic2FARequired

__all__ = ["PicnicAPI"]
__all__ = ["PicnicAPI", "Picnic2FAError", "Picnic2FARequired"]
__title__ = "python-picnic-api"
__version__ = "1.1.0"
__author__ = "Mike Brink"
91 changes: 85 additions & 6 deletions src/python_picnic_api2/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@
_url_generator,
find_nodes_by_content,
)
from .session import PicnicAPISession, PicnicAuthError
from .session import (
Picnic2FAError,
Picnic2FARequired,
PicnicAPISession,
PicnicAuthError,
)

DEFAULT_URL = "https://storefront-prod.{}.picnicinternational.com/api/{}"
GLOBAL_GATEWAY_URL = "https://gateway-prod.global.picnicinternational.com"
Expand Down Expand Up @@ -60,14 +65,18 @@ def _get(self, path: str, add_picnic_headers=False):

return response

def _post(self, path: str, data=None, base_url_override=None):
def _post(
self, path: str, data=None, base_url_override=None, add_picnic_headers=False
):
url = (base_url_override if base_url_override else self._base_url) + path
response = self.session.post(url, json=data).json()
kwargs = {"json": data}
if add_picnic_headers:
kwargs["headers"] = _HEADERS
response = self.session.post(url, **kwargs).json()

if self._contains_auth_error(response):
raise PicnicAuthError(
f"Picnic authentication error: \
{response['error'].get('message')}"
f"Picnic authentication error: {response['error'].get('message')}"
)

return response
Expand All @@ -80,12 +89,82 @@ def _contains_auth_error(response):
error_code = response.setdefault("error", {}).get("code")
return error_code == "AUTH_ERROR" or error_code == "AUTH_INVALID_CRED"

@staticmethod
def _requires_2fa(response):
if not isinstance(response, dict):
return False

error_code = response.get("error", {}).get("code")
return error_code == "TWO_FACTOR_AUTHENTICATION_REQUIRED"

def login(self, username: str, password: str):
path = "/user/login"
secret = md5(password.encode("utf-8")).hexdigest()
data = {"key": username, "secret": secret, "client_id": 30100}

return self._post(path, data)
response = self._post(path, data, add_picnic_headers=True)

if self._requires_2fa(response):
raise Picnic2FARequired(
message=response.get("error", {}).get(
"message", "Two-factor authentication required"
),
response=response,
)

return response

def _post_2fa(self, path: str, data=None):
"""POST for 2FA endpoints that may return empty (204) or JSON error bodies."""
url = self._base_url + path
response = self.session.post(url, json=data, headers=_HEADERS)

if response.status_code == 204 or not response.content:
return None

json_body = response.json()

# This should not happen because password auth is already done
# at this point, but just in case.
if self._contains_auth_error(json_body):
raise PicnicAuthError(
f"Picnic authentication error: {json_body['error'].get('message')}"
)

error = json_body.get("error", {})
if error.get("code"):
raise Picnic2FAError(
message=error.get("message", "Two-factor authentication failed"),
code=error["code"],
)

return json_body

def generate_2fa_code(self, channel: str = "SMS"):
"""Request a 2FA code to be sent via the specified channel.

Args:
channel: The delivery channel ("SMS" or "EMAIL").

Raises:
Picnic2FAError: If the server returns an error (e.g. invalid channel).
"""
path = "/user/2fa/generate"
data = {"channel": channel}
self._post_2fa(path, data)

def verify_2fa_code(self, code: str):
"""Verify the 2FA code to complete authentication.

Args:
code: The OTP code received via SMS or email.

Raises:
Picnic2FAError: If the OTP code is invalid.
"""
path = "/user/2fa/verify"
data = {"otp": code}
self._post_2fa(path, data)

def logged_in(self):
return self.session.authenticated
Expand Down
26 changes: 25 additions & 1 deletion src/python_picnic_api2/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,30 @@ class PicnicAuthError(Exception):
"""Indicates an error when authenticating to the Picnic API."""


class Picnic2FARequired(Exception):
"""Indicates that two-factor authentication is required."""

def __init__(
self,
message: str = "Two-factor authentication required",
response: dict = None,
):
super().__init__(message)
self.response = response or {}


class Picnic2FAError(Exception):
"""Indicates an error during two-factor authentication (e.g. invalid OTP)."""

def __init__(
self,
message: str = "Two-factor authentication failed",
code: str = None,
):
super().__init__(message)
self.code = code


class PicnicAPISession(Session):
AUTH_HEADER = "x-picnic-auth"

Expand Down Expand Up @@ -52,4 +76,4 @@ def post(self, url, data=None, json=None, **kwargs) -> Response:
return response


__all__ = ["PicnicAuthError", "PicnicAPISession"]
__all__ = ["PicnicAuthError", "Picnic2FARequired", "Picnic2FAError", "PicnicAPISession"]
93 changes: 91 additions & 2 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@

from python_picnic_api2 import PicnicAPI
from python_picnic_api2.client import DEFAULT_URL
from python_picnic_api2.session import PicnicAuthError
from python_picnic_api2.session import (
Picnic2FAError,
Picnic2FARequired,
PicnicAuthError,
)

PICNIC_HEADERS = {
"x-picnic-agent": "30100;1.206.1-#15408",
Expand All @@ -15,9 +19,10 @@

class TestClient(unittest.TestCase):
class MockResponse:
def __init__(self, json_data, status_code):
def __init__(self, json_data, status_code, content=b"data"):
self.json_data = json_data
self.status_code = status_code
self.content = content

def json(self):
return self.json_data
Expand All @@ -42,6 +47,7 @@ def test_login_credentials(self):
"secret": "098f6bcd4621d373cade4e832627b4f6",
"client_id": 30100,
},
headers=PICNIC_HEADERS,
)

def test_login_auth_token(self):
Expand Down Expand Up @@ -334,3 +340,86 @@ def test_post_auth_exception(self):

with self.assertRaises(PicnicAuthError):
self.client.clear_cart()

def test_login_requires_2fa(self):
response = {
"error": {
"code": "TWO_FACTOR_AUTHENTICATION_REQUIRED",
"message": "User must verify their second factor",
"details": {},
}
}
self.session_mock().post.return_value = self.MockResponse(response, 200)

client = PicnicAPI()
with self.assertRaises(Picnic2FARequired) as ctx:
client.login("test-user", "test-password")
self.assertEqual(
str(ctx.exception), "User must verify their second factor"
)
self.assertEqual(ctx.exception.response, response)

def test_generate_2fa_code(self):
self.session_mock().post.return_value = self.MockResponse(
None, 204, content=b""
)

result = self.client.generate_2fa_code()
self.session_mock().post.assert_called_with(
self.expected_base_url + "/user/2fa/generate",
json={"channel": "SMS"},
headers=PICNIC_HEADERS,
)
self.assertIsNone(result)

def test_generate_2fa_code_email(self):
self.session_mock().post.return_value = self.MockResponse(
None, 204, content=b""
)

self.client.generate_2fa_code(channel="EMAIL")
self.session_mock().post.assert_called_with(
self.expected_base_url + "/user/2fa/generate",
json={"channel": "EMAIL"},
headers=PICNIC_HEADERS,
)

def test_verify_2fa_code_success(self):
self.session_mock().post.return_value = self.MockResponse(
None, 204, content=b""
)

result = self.client.verify_2fa_code("123456")
self.session_mock().post.assert_called_with(
self.expected_base_url + "/user/2fa/verify",
json={"otp": "123456"},
headers=PICNIC_HEADERS,
)
self.assertIsNone(result)

def test_verify_2fa_code_invalid(self):
response = {
"error": {
"code": "OTP_NOT_VALID",
"message": "Otp is not valid",
"details": {},
}
}
self.session_mock().post.return_value = self.MockResponse(response, 200)

with self.assertRaises(Picnic2FAError) as ctx:
self.client.verify_2fa_code("000000")
self.assertEqual(str(ctx.exception), "Otp is not valid")
self.assertEqual(ctx.exception.code, "OTP_NOT_VALID")

def test_2fa_auth_error(self):
response = {
"error": {
"code": "AUTH_ERROR",
"message": "Authentication failed.",
}
}
self.session_mock().post.return_value = self.MockResponse(response, 400)

with self.assertRaises(PicnicAuthError):
self.client.verify_2fa_code("123456")