diff --git a/google/cloud/storage/bucket.py b/google/cloud/storage/bucket.py index 1621f879e..b4001e09d 100644 --- a/google/cloud/storage/bucket.py +++ b/google/cloud/storage/bucket.py @@ -43,7 +43,11 @@ from google.cloud.storage.acl import DefaultObjectACL from google.cloud.storage.blob import _quote from google.cloud.storage.blob import Blob -from google.cloud.storage.constants import _DEFAULT_TIMEOUT +from google.cloud.storage.constants import ( + _DEFAULT_TIMEOUT, + ENFORCEMENT_MODE_FULLY_RESTRICTED, + ENFORCEMENT_MODE_NOT_RESTRICTED, +) from google.cloud.storage.constants import ARCHIVE_STORAGE_CLASS from google.cloud.storage.constants import COLDLINE_STORAGE_CLASS from google.cloud.storage.constants import DUAL_REGION_LOCATION_TYPE @@ -65,7 +69,6 @@ from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED - _UBLA_BPO_ENABLED_MESSAGE = ( "Pass only one of 'uniform_bucket_level_access_enabled' / " "'bucket_policy_only_enabled' to 'IAMConfiguration'." @@ -2538,6 +2541,25 @@ def cors(self, entries): :rtype: bool or ``NoneType`` """ + @property + def encryption(self): + """Retrieve encryption configuration for this bucket. + + :rtype: :class:`BucketEncryption` + :returns: an instance for managing the bucket's encryption configuration. + """ + info = self._properties.get("encryption", {}) + return BucketEncryption.from_api_repr(info, self) + + @encryption.setter + def encryption(self, value): + """Set encryption configuration for this bucket. + + :type value: :class:`BucketEncryption` or dict + :param value: The encryption configuration. + """ + self._patch_property("encryption", value) + @property def default_kms_key_name(self): """Retrieve / set default KMS encryption key for objects in the bucket. @@ -3965,6 +3987,247 @@ def ip_filter(self, value): self._patch_property(_IP_FILTER_PROPERTY, value) +class EncryptionEnforcementConfig(dict): + """Map a bucket's encryption enforcement configuration. + + :type restriction_mode: str + :param restriction_mode: + (Optional) The restriction mode for the encryption type. + When set to ``FullyRestricted``, the bucket will only allow objects encrypted with the encryption type corresponding to this configuration. + When set to ``NotRestricted``, the bucket will allow objects encrypted with any encryption type. + + :type effective_time: :class:`datetime.datetime` + :param effective_time: + (Output only) The time when the encryption enforcement configuration became effective. + """ + + def __init__(self, restriction_mode=None): + data = {} + if restriction_mode is not None: + # Validate input against allowed constants + allowed = ( + ENFORCEMENT_MODE_FULLY_RESTRICTED, + ENFORCEMENT_MODE_NOT_RESTRICTED, + ) + if restriction_mode not in allowed: + raise ValueError( + f"Invalid restriction_mode: {restriction_mode}. " + f"Must be one of {allowed}" + ) + data["restrictionMode"] = restriction_mode + + super().__init__(data) + + @classmethod + def from_api_repr(cls, resource): + """Factory: construct instance from resource. + + :type resource: dict + :param resource: mapping as returned from API call. + + :rtype: :class:`EncryptionEnforcementConfig` + :returns: Instance created from resource. + """ + instance = cls() + instance.update(resource) + return instance + + @property + def restriction_mode(self): + """Get the restriction mode. + + :rtype: str or ``NoneType`` + :returns: The restriction mode or ``None`` if the property is not set. + """ + return self.get("restrictionMode") + + @restriction_mode.setter + def restriction_mode(self, value): + """Set the restriction mode. + + :type value: str + :param value: The restriction mode. + """ + self["restrictionMode"] = value + + @property + def effective_time(self): + """Get the effective time. + + :rtype: datetime.datetime or ``NoneType`` + :returns: point-in time at which the configuration is effective, + or ``None`` if the property is not set. + """ + timestamp = self.get("effectiveTime") + if timestamp is not None: + return _rfc3339_nanos_to_datetime(timestamp) + + +class BucketEncryption(dict): + """Map a bucket's encryption configuration. + + :type bucket: :class:`Bucket` + :param bucket: Bucket for which this instance is the policy. + + :type default_kms_key_name: str + :param default_kms_key_name: + (Optional) Resource name of KMS key used to encrypt bucket's content. + + :type google_managed_encryption_enforcement_config: :class:`EncryptionEnforcementConfig` + :param google_managed_encryption_enforcement_config: + (Optional) Encryption enforcement configuration for Google managed encryption. + + :type customer_managed_encryption_enforcement_config: :class:`EncryptionEnforcementConfig` + :param customer_managed_encryption_enforcement_config: + (Optional) Encryption enforcement configuration for Customer managed encryption. + + :type customer_supplied_encryption_enforcement_config: :class:`EncryptionEnforcementConfig` + :param customer_supplied_encryption_enforcement_config: + (Optional) Encryption enforcement configuration for Customer supplied encryption. + """ + + def __init__( + self, + bucket, + default_kms_key_name=None, + google_managed_encryption_enforcement_config=None, + customer_managed_encryption_enforcement_config=None, + customer_supplied_encryption_enforcement_config=None, + ): + data = {} + if default_kms_key_name is not None: + data["defaultKmsKeyName"] = default_kms_key_name + + if google_managed_encryption_enforcement_config is not None: + data["googleManagedEncryptionEnforcementConfig"] = ( + google_managed_encryption_enforcement_config + ) + + if customer_managed_encryption_enforcement_config is not None: + data["customerManagedEncryptionEnforcementConfig"] = ( + customer_managed_encryption_enforcement_config + ) + + if customer_supplied_encryption_enforcement_config is not None: + data["customerSuppliedEncryptionEnforcementConfig"] = ( + customer_supplied_encryption_enforcement_config + ) + + super().__init__(data) + self._bucket = bucket + + @classmethod + def from_api_repr(cls, resource, bucket): + """Factory: construct instance from resource. + + :type resource: dict + :param resource: mapping as returned from API call. + + :type bucket: :class:`Bucket` + :params bucket: Bucket for which this instance is the policy. + + :rtype: :class:`BucketEncryption` + :returns: Instance created from resource. + """ + instance = cls(bucket) + instance.update(resource) + return instance + + @property + def bucket(self): + """Bucket for which this instance is the policy. + + :rtype: :class:`Bucket` + :returns: the instance's bucket. + """ + return self._bucket + + @property + def default_kms_key_name(self): + """Retrieve default KMS encryption key for objects in the bucket. + + :rtype: str or ``NoneType`` + :returns: Default KMS encryption key, or ``None`` if not set. + """ + return self.get("defaultKmsKeyName") + + @default_kms_key_name.setter + def default_kms_key_name(self, value): + """Set default KMS encryption key for objects in the bucket. + + :type value: str or None + :param value: new KMS key name (None to clear any existing key). + """ + self["defaultKmsKeyName"] = value + self.bucket._patch_property("encryption", self) + + @property + def google_managed_encryption_enforcement_config(self): + """Retrieve the encryption enforcement configuration for Google managed encryption. + + :rtype: :class:`EncryptionEnforcementConfig` + :returns: The configuration instance. + """ + data = self.get("googleManagedEncryptionEnforcementConfig") + if data: + return EncryptionEnforcementConfig.from_api_repr(data) + return None + + @google_managed_encryption_enforcement_config.setter + def google_managed_encryption_enforcement_config(self, value): + """Set the encryption enforcement configuration for Google managed encryption. + + :type value: :class:`EncryptionEnforcementConfig` or dict + :param value: The configuration instance or dictionary. + """ + self["googleManagedEncryptionEnforcementConfig"] = value + self.bucket._patch_property("encryption", self) + + @property + def customer_managed_encryption_enforcement_config(self): + """Retrieve the encryption enforcement configuration for Customer managed encryption. + + :rtype: :class:`EncryptionEnforcementConfig` + :returns: The configuration instance. + """ + data = self.get("customerManagedEncryptionEnforcementConfig") + if data: + return EncryptionEnforcementConfig.from_api_repr(data) + return None + + @customer_managed_encryption_enforcement_config.setter + def customer_managed_encryption_enforcement_config(self, value): + """Set the encryption enforcement configuration for Customer managed encryption. + + :type value: :class:`EncryptionEnforcementConfig` or dict + :param value: The configuration instance or dictionary. + """ + self["customerManagedEncryptionEnforcementConfig"] = value + self.bucket._patch_property("encryption", self) + + @property + def customer_supplied_encryption_enforcement_config(self): + """Retrieve the encryption enforcement configuration for Customer supplied encryption. + + :rtype: :class:`EncryptionEnforcementConfig` + :returns: The configuration instance. + """ + data = self.get("customerSuppliedEncryptionEnforcementConfig") + if data: + return EncryptionEnforcementConfig.from_api_repr(data) + return None + + @customer_supplied_encryption_enforcement_config.setter + def customer_supplied_encryption_enforcement_config(self, value): + """Set the encryption enforcement configuration for Customer supplied encryption. + + :type value: :class:`EncryptionEnforcementConfig` or dict + :param value: The configuration instance or dictionary. + """ + self["customerSuppliedEncryptionEnforcementConfig"] = value + self.bucket._patch_property("encryption", self) + + class SoftDeletePolicy(dict): """Map a bucket's soft delete policy. diff --git a/google/cloud/storage/constants.py b/google/cloud/storage/constants.py index eba0a19df..c6c1b63c4 100644 --- a/google/cloud/storage/constants.py +++ b/google/cloud/storage/constants.py @@ -137,3 +137,9 @@ See: https://cloud.google.com/storage/docs/managing-turbo-replication """ + +ENFORCEMENT_MODE_FULLY_RESTRICTED = "FullyRestricted" +"""Bucket encryption restriction mode where encryption is fully restricted.""" + +ENFORCEMENT_MODE_NOT_RESTRICTED = "NotRestricted" +"""Bucket encryption restriction mode where encryption is not restricted.""" diff --git a/tests/system/test_bucket.py b/tests/system/test_bucket.py index 32806bd4c..cbd9d1880 100644 --- a/tests/system/test_bucket.py +++ b/tests/system/test_bucket.py @@ -22,6 +22,11 @@ PublicNetworkSource, VpcNetworkSource, ) +from google.cloud.storage.bucket import EncryptionEnforcementConfig +from google.cloud.storage.constants import ( + ENFORCEMENT_MODE_FULLY_RESTRICTED, + ENFORCEMENT_MODE_NOT_RESTRICTED, +) def test_bucket_create_w_alt_storage_class(storage_client, buckets_to_delete): @@ -1398,3 +1403,99 @@ def test_list_buckets_with_ip_filter(storage_client, buckets_to_delete): # Check that the summarized filter does not include full details. assert summarized_filter.public_network_source is None assert summarized_filter.vpc_network_sources == [] + + +def test_create_bucket_with_encryption_enforcement(storage_client, buckets_to_delete): + bucket_name = _helpers.unique_name("enforce-on-create") + + # Initialize the bucket object locally + bucket = storage_client.bucket(bucket_name) + + # Define and set the enforcement config + enforcement = EncryptionEnforcementConfig( + restriction_mode=ENFORCEMENT_MODE_FULLY_RESTRICTED + ) + # Set the config on the local object + bucket.encryption.google_managed_encryption_enforcement_config = enforcement + + # storage_client.create_bucket(bucket) sends the config in the initial POST + created_bucket = storage_client.create_bucket(bucket) + buckets_to_delete.append(created_bucket) + + # Verify backend persistence and server-generated effective_time + config = created_bucket.encryption.google_managed_encryption_enforcement_config + assert config.restriction_mode == ENFORCEMENT_MODE_FULLY_RESTRICTED + assert isinstance(config.effective_time, datetime.datetime) + + +def test_bucket_encryption_enforcement_config(storage_client, buckets_to_delete): + bucket_name = _helpers.unique_name("encryption-enforcement") + bucket = _helpers.retry_429_503(storage_client.create_bucket)(bucket_name) + buckets_to_delete.append(bucket) + + # 1. Set initial enforcement configuration + # Testing both Google-managed and Customer-managed configurations + google_config = EncryptionEnforcementConfig( + restriction_mode=ENFORCEMENT_MODE_FULLY_RESTRICTED + ) + customer_config = EncryptionEnforcementConfig( + restriction_mode=ENFORCEMENT_MODE_NOT_RESTRICTED + ) + + bucket.encryption.google_managed_encryption_enforcement_config = google_config + bucket.encryption.customer_managed_encryption_enforcement_config = customer_config + + # Patch sends the 'encryption' dict to the server + bucket.patch() + + # 2. Reload and Verify backend persistence + bucket.reload() + + # Verify Google Managed Config and the presence of effective_time + reloaded_google = bucket.encryption.google_managed_encryption_enforcement_config + assert reloaded_google.restriction_mode == ENFORCEMENT_MODE_FULLY_RESTRICTED + assert isinstance(reloaded_google.effective_time, datetime.datetime) + + # Verify Customer Managed Config + reloaded_customer = bucket.encryption.customer_managed_encryption_enforcement_config + assert reloaded_customer.restriction_mode == ENFORCEMENT_MODE_NOT_RESTRICTED + assert reloaded_customer.effective_time is None + + # 3. Test updating an existing config + update_google_config = EncryptionEnforcementConfig( + restriction_mode=ENFORCEMENT_MODE_NOT_RESTRICTED + ) + bucket.encryption.google_managed_encryption_enforcement_config = ( + update_google_config + ) + bucket.patch() + assert ( + bucket.encryption.google_managed_encryption_enforcement_config.restriction_mode + == ENFORCEMENT_MODE_NOT_RESTRICTED + ) + + +def test_delete_bucket_encryption_enforcement_config(storage_client, buckets_to_delete): + bucket_name = _helpers.unique_name("delete-encryption-config") + + # Create a bucket with an initial restricted config + enforcement = EncryptionEnforcementConfig( + restriction_mode=ENFORCEMENT_MODE_FULLY_RESTRICTED + ) + bucket = storage_client.bucket(bucket_name) + bucket.encryption.google_managed_encryption_enforcement_config = enforcement + bucket = storage_client.create_bucket(bucket) + buckets_to_delete.append(bucket) + + # Verify it exists first + assert bucket.encryption.google_managed_encryption_enforcement_config is not None + + # DELETE: Set the specific enforcement config to None + bucket.encryption.google_managed_encryption_enforcement_config = None + + # patch() sends the null value to the server to clear the field + bucket.patch() + + # Reload and verify the field is gone + bucket.reload() + assert bucket.encryption.google_managed_encryption_enforcement_config is None diff --git a/tests/unit/test_bucket.py b/tests/unit/test_bucket.py index 850e89d04..98fc50ac8 100644 --- a/tests/unit/test_bucket.py +++ b/tests/unit/test_bucket.py @@ -2733,6 +2733,41 @@ def test_cors_setter(self): self.assertEqual(bucket.cors, [CORS_ENTRY]) self.assertTrue("cors" in bucket._changes) + def test_encryption_getter(self): + from google.cloud.storage.bucket import BucketEncryption + + NAME = "name" + KMS_RESOURCE = ( + "projects/test-project-123/" + "locations/us/" + "keyRings/test-ring/" + "cryptoKeys/test-key" + ) + ENCRYPTION_CONFIG = {"defaultKmsKeyName": KMS_RESOURCE} + bucket = self._make_one(name=NAME) + self.assertIsNone(bucket.encryption.default_kms_key_name) + bucket._properties["encryption"] = ENCRYPTION_CONFIG + encryption = bucket.encryption + self.assertIsInstance(encryption, BucketEncryption) + self.assertEqual(encryption.default_kms_key_name, KMS_RESOURCE) + + def test_encryption_setter(self): + from google.cloud.storage.bucket import BucketEncryption + + NAME = "name" + KMS_RESOURCE = ( + "projects/test-project-123/" + "locations/us/" + "keyRings/test-ring/" + "cryptoKeys/test-key" + ) + ENCRYPTION_CONFIG = {"defaultKmsKeyName": KMS_RESOURCE} + bucket = self._make_one(name=NAME) + encryption = BucketEncryption(bucket, default_kms_key_name=KMS_RESOURCE) + bucket.encryption = encryption + self.assertEqual(bucket._properties["encryption"], ENCRYPTION_CONFIG) + self.assertTrue("encryption" in bucket._changes) + def test_default_kms_key_name_getter(self): NAME = "name" KMS_RESOURCE = ( @@ -4722,3 +4757,127 @@ def test_it(self): self.assertEqual(notification._topic_name, topic) self.assertEqual(notification._topic_project, project) self.assertEqual(notification._properties, item) + + +class Test_EncryptionEnforcementConfig(unittest.TestCase): + @staticmethod + def _get_target_class(): + from google.cloud.storage.bucket import EncryptionEnforcementConfig + + return EncryptionEnforcementConfig + + def _make_one(self, **kw): + return self._get_target_class()(**kw) + + def test_ctor(self): + + from google.cloud.storage.constants import ENFORCEMENT_MODE_FULLY_RESTRICTED + + config = self._make_one(restriction_mode=ENFORCEMENT_MODE_FULLY_RESTRICTED) + + self.assertEqual(config.restriction_mode, ENFORCEMENT_MODE_FULLY_RESTRICTED) + self.assertIsNone(config.effective_time) + + def test_from_api_repr(self): + from google.cloud._helpers import _datetime_to_rfc3339 + from google.cloud.storage.constants import ENFORCEMENT_MODE_NOT_RESTRICTED + + now = _NOW(_UTC) + resource = { + "restrictionMode": ENFORCEMENT_MODE_NOT_RESTRICTED, + "effectiveTime": _datetime_to_rfc3339(now), + } + klass = self._get_target_class() + config = klass.from_api_repr(resource) + self.assertEqual(config.restriction_mode, ENFORCEMENT_MODE_NOT_RESTRICTED) + self.assertEqual(config.effective_time, now) + + def test_restriction_mode_setter(self): + config = self._make_one() + self.assertIsNone(config.restriction_mode) + config.restriction_mode = "FULLY_RESTRICTED" + self.assertEqual(config.restriction_mode, "FULLY_RESTRICTED") + self.assertEqual(config["restrictionMode"], "FULLY_RESTRICTED") + + +class Test_BucketEncryption(unittest.TestCase): + @staticmethod + def _get_target_class(): + from google.cloud.storage.bucket import BucketEncryption + + return BucketEncryption + + def _make_one(self, bucket, **kw): + return self._get_target_class()(bucket, **kw) + + @staticmethod + def _make_bucket(): + from google.cloud.storage.bucket import Bucket + + return mock.create_autospec(Bucket, instance=True) + + def test_ctor_defaults(self): + bucket = self._make_bucket() + encryption = self._make_one(bucket) + self.assertIs(encryption.bucket, bucket) + self.assertIsNone(encryption.default_kms_key_name) + # Check that the config itself is None, not its sub-property + self.assertIsNone(encryption.google_managed_encryption_enforcement_config) + self.assertIsNone(encryption.customer_managed_encryption_enforcement_config) + self.assertIsNone(encryption.customer_supplied_encryption_enforcement_config) + + def test_ctor_explicit(self): + from google.cloud.storage.bucket import EncryptionEnforcementConfig + + bucket = self._make_bucket() + kms_key = "key" + google_config = EncryptionEnforcementConfig("FullyRestricted") + encryption = self._make_one( + bucket, + default_kms_key_name=kms_key, + google_managed_encryption_enforcement_config=google_config, + ) + self.assertEqual(encryption.default_kms_key_name, kms_key) + self.assertEqual( + encryption.google_managed_encryption_enforcement_config.restriction_mode, + "FullyRestricted", + ) + + def test_from_api_repr(self): + klass = self._get_target_class() + bucket = self._make_bucket() + resource = { + "defaultKmsKeyName": "key", + "googleManagedEncryptionEnforcementConfig": { + "restrictionMode": "FullyRestricted" + }, + } + encryption = klass.from_api_repr(resource, bucket) + self.assertEqual(encryption.default_kms_key_name, "key") + self.assertEqual( + encryption.google_managed_encryption_enforcement_config.restriction_mode, + "FullyRestricted", + ) + + def test_setters_trigger_patch(self): + from google.cloud.storage.bucket import EncryptionEnforcementConfig + + bucket = self._make_bucket() + encryption = self._make_one(bucket) + + encryption.default_kms_key_name = "new-key" + config = EncryptionEnforcementConfig("NotRestricted") + encryption.google_managed_encryption_enforcement_config = config + encryption.customer_managed_encryption_enforcement_config = config + encryption.customer_supplied_encryption_enforcement_config = config + + self.assertEqual(bucket._patch_property.call_count, 4) + bucket._patch_property.assert_called_with("encryption", encryption) + + def test_bucket_encryption_getters_handle_none(self): + bucket = self._make_bucket() + encryption = self._get_target_class()(bucket) + encryption["googleManagedEncryptionEnforcementConfig"] = None + + config = encryption.google_managed_encryption_enforcement_config + self.assertIsNone(config)