Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 51 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -703,18 +703,65 @@ options = {

Embed context values into a bearer token during generation so you can reference those values in your policies. This enables more flexible access controls, such as tracking end-user identity when making API calls using service accounts, and facilitates using signed data tokens during detokenization.

Generate bearer tokens containing context information using a service account with the context_id identifier. Context information is represented as a JWT claim in a Skyflow-generated bearer token. Tokens generated from such service accounts include a context_identifier claim, are valid for 60 minutes, and can be used to make API calls to the Data and Management APIs, depending on the service account's permissions.
Generate bearer tokens containing context information using a service account with the `context_id` identifier. Context information is represented as a JWT claim in a Skyflow-generated bearer token. Tokens generated from such service accounts include a `context_identifier` claim, are valid for 60 minutes, and can be used to make API calls to the Data and Management APIs, depending on the service account's permissions.

The `ctx` parameter accepts either a **string** or a **dict**:

**String context** — use when your policy references a single context value:

```python
options = {'ctx': 'user_12345'}
token, _ = generate_bearer_token(filepath, options)
```

**Dict context** — use when your policy needs multiple context values for conditional data access. Each key in the dict maps to a Skyflow CEL policy variable under `request.context.*`:

```python
options = {
'ctx': {
'role': 'admin',
'department': 'finance',
'user_id': 'user_12345',
}
}
token, _ = generate_bearer_token(filepath, options)
```

With the dict above, your Skyflow policies can reference `request.context.role`, `request.context.department`, and `request.context.user_id` to make conditional access decisions.

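Dict keys must be strings containing only alphanumeric characters and underscores (`[a-zA-Z0-9_]`); an invalid key raises a `SkyflowError`. Passing a `ctx` value that is neither a string nor a dict also raises a `SkyflowError`. An empty or whitespace-only string, or an empty dict, is treated as no context and the `ctx` claim is omitted from the token.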

> [!TIP]
> See the full example in the samples directory: [token_generation_with_context_example.py](samples/service_account/token_generation_with_context_example.py)
> See [docs.skyflow.com](https://docs.skyflow.com) for more details on authentication, access control, and governance for Skyflow.
> See the full example in the samples directory: [token_generation_with_context_example.py](samples/service_account/token_generation_with_context_example.py)
> See Skyflow's [context-aware authorization](https://docs.skyflow.com) and [conditional data access](https://docs.skyflow.com) docs for policy variable syntax like `request.context.*`.

#### Generate signed data tokens: `generate_signed_data_tokens(filepath, options)`

Digitally sign data tokens with a service account's private key to add an extra layer of protection. Skyflow generates data tokens when sensitive data is inserted into the vault. A signed data token can be detokenized only by providing it along with a bearer token generated from the service account's credentials. The service account must have the necessary permissions and context to successfully detokenize the signed data tokens.

The `ctx` parameter on signed data tokens also accepts either a **string** or a **dict**, using the same format as bearer tokens:

```python
# String context
options = {
'ctx': 'user_12345',
'data_tokens': ['dataToken1', 'dataToken2'],
'time_to_live': 90,
}

# Dict context
options = {
'ctx': {
'role': 'analyst',
'department': 'research',
},
'data_tokens': ['dataToken1', 'dataToken2'],
'time_to_live': 90,
}
```

> [!TIP]
> See the full example in the samples directory: [signed_token_generation_example.py](samples/service_account/signed_token_generation_example.py)
> See the full example in the samples directory: [signed_token_generation_example.py](samples/service_account/signed_token_generation_example.py)
> See [docs.skyflow.com](https://docs.skyflow.com) for more details on authentication, access control, and governance for Skyflow.

## Logging
Expand Down
70 changes: 41 additions & 29 deletions samples/service_account/signed_token_generation_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,54 @@
credentials_string = json.dumps(skyflow_credentials)


options = {
'ctx': 'CONTEXT_ID',
'data_tokens': ['DATA_TOKEN1', 'DATA_TOKEN2'],
'time_to_live': 90, # in seconds
}
# Approach 1: Signed data tokens with string context
def get_signed_tokens_with_string_context():
    """Generate signed data tokens using a plain string as the context.

    Returns:
        A ``(data_token, signed_data_token)`` pair as unpacked from
        ``generate_signed_data_tokens``. If anything raises, the error is
        printed and the function falls through, returning ``None``.
    """
    request_options = {
        'ctx': 'user_12345',
        'data_tokens': ['DATA_TOKEN1', 'DATA_TOKEN2'],
        'time_to_live': 90,  # in seconds
    }
    try:
        plain_token, signed_token = generate_signed_data_tokens(file_path, request_options)
    except Exception as err:
        print(f'Error: {str(err)}')
    else:
        return plain_token, signed_token

def get_signed_bearer_token_from_file_path():
# Generate signed bearer token from credentials file path.
global bearer_token

# Approach 2: Signed data tokens with JSON object context (dict)
# Each key maps to a Skyflow CEL policy variable under request.context.*
# For example: request.context.role == "analyst" and request.context.department == "research"
def get_signed_tokens_with_object_context():
options = {
'ctx': {
'role': 'analyst',
'department': 'research',
'user_id': 'user_67890',
},
'data_tokens': ['DATA_TOKEN1', 'DATA_TOKEN2'],
'time_to_live': 90,
}
try:
if not is_expired(bearer_token):
return bearer_token
else:
data_token, signed_data_token = generate_signed_data_tokens(file_path, options)
return data_token, signed_data_token

data_token, signed_data_token = generate_signed_data_tokens(file_path, options)
return data_token, signed_data_token
except Exception as e:
print(f'Error generating token from file path: {str(e)}')
print(f'Error: {str(e)}')


def get_signed_bearer_token_from_credentials_string():
# Generate signed bearer token from credentials string.
global bearer_token

# Approach 3: Signed data tokens from credentials string
def get_signed_tokens_from_credentials_string():
options = {
'ctx': 'user_12345',
'data_tokens': ['DATA_TOKEN1', 'DATA_TOKEN2'],
'time_to_live': 90,
}
try:
if not is_expired(bearer_token):
return bearer_token
else:
data_token, signed_data_token = generate_signed_data_tokens_from_creds(credentials_string, options)
return data_token, signed_data_token

data_token, signed_data_token = generate_signed_data_tokens_from_creds(credentials_string, options)
return data_token, signed_data_token
except Exception as e:
print(f'Error generating token from credentials string: {str(e)}')

print(f'Error: {str(e)}')

print(get_signed_bearer_token_from_file_path())

print(get_signed_bearer_token_from_credentials_string())
print("String context:", get_signed_tokens_with_string_context())
print("Object context:", get_signed_tokens_with_object_context())
print("Creds string:", get_signed_tokens_from_credentials_string())
46 changes: 37 additions & 9 deletions samples/service_account/token_generation_with_context_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
}
credentials_string = json.dumps(skyflow_credentials)

options = {'ctx': '<CONTEXT_ID>'}

def get_bearer_token_with_context_from_file_path():
# Generate bearer token with context from credentials file path.
# Approach 1: Bearer token with string context
# Use a simple string identifier when your policy references a single context value.
# In your Skyflow policy, reference this as: request.context
def get_bearer_token_with_string_context():
global bearer_token
options = {'ctx': 'user_12345'}

try:
if not is_expired(bearer_token):
Expand All @@ -31,14 +33,40 @@ def get_bearer_token_with_context_from_file_path():
token, _ = generate_bearer_token(file_path, options)
bearer_token = token
return bearer_token
except Exception as e:
print(f'Error generating token: {str(e)}')


# Approach 2: Bearer token with JSON object context (dict)
# Use a dict when your policy needs multiple context values for conditional data access.
# Each key maps to a Skyflow CEL policy variable under request.context.*
# For example: request.context.role == "admin" and request.context.department == "finance"
def get_bearer_token_with_object_context():
global bearer_token
options = {
'ctx': {
'role': 'admin',
'department': 'finance',
'user_id': 'user_12345',
}
}

try:
if not is_expired(bearer_token):
return bearer_token
else:
token, _ = generate_bearer_token(file_path, options)
bearer_token = token
return bearer_token
except Exception as e:
print(f'Error generating token from file path: {str(e)}')
print(f'Error generating token: {str(e)}')


# Approach 3: Bearer token with string context from credentials string
def get_bearer_token_with_context_from_credentials_string():
# Generate bearer token with context from credentials string.
global bearer_token
options = {'ctx': 'user_12345'}

try:
if not is_expired(bearer_token):
return bearer_token
Expand All @@ -47,9 +75,9 @@ def get_bearer_token_with_context_from_credentials_string():
bearer_token = token
return bearer_token
except Exception as e:
print(f"Error generating token from credentials string: {str(e)}")

print(f"Error generating token: {str(e)}")

print(get_bearer_token_with_context_from_file_path())

print(get_bearer_token_with_context_from_credentials_string())
print("String context:", get_bearer_token_with_string_context())
print("Object context:", get_bearer_token_with_object_context())
print("Creds string:", get_bearer_token_with_context_from_credentials_string())
37 changes: 35 additions & 2 deletions skyflow/service_account/_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import datetime
import re
import time
import jwt
from skyflow.error import SkyflowError
Expand All @@ -10,6 +11,34 @@

invalid_input_error_code = SkyflowMessages.ErrorCodes.INVALID_INPUT.value

_CTX_KEY_PATTERN = re.compile(r'^[a-zA-Z0-9_]+$')


def _validate_and_resolve_ctx(ctx):
    """Validate a ``ctx`` option and return the value to place in the JWT claims.

    Args:
        ctx: The caller-supplied context. Accepted forms are ``None``, a
            string, or a dict whose keys are strings made up only of ASCII
            letters, digits and underscores. Dict values are not inspected.

    Returns:
        ``None`` when the claim should be omitted (``ctx`` is ``None``, an
        empty or whitespace-only string, or an empty dict); otherwise ``ctx``
        itself, unchanged.

    Raises:
        SkyflowError: If ``ctx`` is neither a string nor a dict, or if any
            dict key is not a string or contains a disallowed character.
    """
    if ctx is None:
        return None
    if isinstance(ctx, str):
        # Whitespace-only strings carry no context; a non-blank string is
        # returned as given (not stripped).
        return ctx if ctx.strip() else None
    if isinstance(ctx, dict):
        if not ctx:
            return None
        for key in ctx:
            # fullmatch() rather than match(): the pattern's trailing ``$``
            # also matches just before a final newline, so match() would let
            # a key such as "role\n" through.
            if not isinstance(key, str) or not _CTX_KEY_PATTERN.fullmatch(key):
                raise SkyflowError(
                    SkyflowMessages.Error.INVALID_CTX_MAP_KEY.value.format(key),
                    invalid_input_error_code
                )
        return ctx
    raise SkyflowError(
        SkyflowMessages.Error.INVALID_CTX_TYPE.value,
        invalid_input_error_code
    )

def is_expired(token, logger = None):
if len(token) == 0:
log_error_log(SkyflowMessages.ErrorLogs.INVALID_BEARER_TOKEN.value)
Expand Down Expand Up @@ -103,7 +132,9 @@ def get_signed_jwt(options, client_id, key_id, token_uri, private_key, logger):
"exp": datetime.datetime.utcnow() + datetime.timedelta(minutes=60)
}
if options and "ctx" in options:
payload["ctx"] = options.get("ctx")
resolved_ctx = _validate_and_resolve_ctx(options.get("ctx"))
if resolved_ctx is not None:
payload["ctx"] = resolved_ctx
try:
return jwt.encode(payload=payload, key=private_key, algorithm="RS256")
except Exception:
Expand All @@ -128,7 +159,9 @@ def get_signed_tokens(credentials_obj, options):
}

if "ctx" in options:
claims["ctx"] = options["ctx"]
resolved_ctx = _validate_and_resolve_ctx(options["ctx"])
if resolved_ctx is not None:
claims["ctx"] = resolved_ctx

private_key = credentials_obj.get("privateKey")
signed_jwt = jwt.encode(claims, private_key, algorithm="RS256")
Expand Down
2 changes: 2 additions & 0 deletions skyflow/utils/_skyflow_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class Error(Enum):
EMPTY_CONTEXT = f"{error_prefix} Initialization failed. Invalid context provided. Specify context as type Context."
INVALID_CONTEXT_IN_CONFIG = f"{error_prefix} Initialization failed. Invalid context for {{}} with id {{}}. Specify a valid context."
INVALID_CONTEXT = f"{error_prefix} Initialization failed. Invalid context. Specify a valid context."
INVALID_CTX_TYPE = f"{error_prefix} Initialization failed. Invalid ctx type. Specify ctx as a string or a dict."
INVALID_CTX_MAP_KEY = f"{error_prefix} Initialization failed. Invalid key '{{}}' in ctx dict. Keys must contain only alphanumeric characters and underscores."
INVALID_LOG_LEVEL = f"{error_prefix} Initialization failed. Invalid log level. Specify a valid log level."
EMPTY_LOG_LEVEL = f"{error_prefix} Initialization failed. Specify a valid log level."

Expand Down
69 changes: 67 additions & 2 deletions tests/service_account/test__utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from skyflow.service_account import is_expired, generate_bearer_token, \
generate_bearer_token_from_creds
from skyflow.utils import SkyflowMessages
from skyflow.service_account._utils import get_service_account_token, get_signed_jwt, generate_signed_data_tokens, get_signed_data_token_response_object, generate_signed_data_tokens_from_creds
from skyflow.service_account._utils import get_service_account_token, get_signed_jwt, generate_signed_data_tokens, get_signed_data_token_response_object, generate_signed_data_tokens_from_creds, _validate_and_resolve_ctx

creds_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "credentials.json")
with open(creds_path, 'r') as file:
Expand Down Expand Up @@ -143,4 +143,69 @@ def test_generate_signed_data_tokens_from_creds_with_invalid_string(self):
credentials_string = '{'
with self.assertRaises(SkyflowError) as context:
result = generate_signed_data_tokens_from_creds(credentials_string, options)
self.assertEqual(context.exception.message, SkyflowMessages.Error.INVALID_CREDENTIALS_STRING.value)
self.assertEqual(context.exception.message, SkyflowMessages.Error.INVALID_CREDENTIALS_STRING.value)

# ctx JSON object support tests

def test_validate_and_resolve_ctx_none(self):
    # A missing context resolves to None.
    resolved = _validate_and_resolve_ctx(None)
    self.assertIsNone(resolved)

def test_validate_and_resolve_ctx_empty_string(self):
    # Empty and whitespace-only strings both resolve to None.
    for blank in ('', ' '):
        self.assertIsNone(_validate_and_resolve_ctx(blank))

def test_validate_and_resolve_ctx_valid_string(self):
    # A non-blank string is handed back as-is.
    resolved = _validate_and_resolve_ctx('user_12345')
    self.assertEqual(resolved, 'user_12345')

def test_validate_and_resolve_ctx_empty_dict(self):
    # An empty dict resolves to None.
    resolved = _validate_and_resolve_ctx({})
    self.assertIsNone(resolved)

def test_validate_and_resolve_ctx_valid_dict(self):
    # A dict with well-formed keys comes back equal to the input.
    supplied = {"role": "admin", "department": "finance"}
    resolved = _validate_and_resolve_ctx(supplied)
    self.assertEqual(resolved, supplied)

def test_validate_and_resolve_ctx_dict_with_alphanumeric_keys(self):
    # Digits, underscores and upper-case letters are all allowed in keys.
    supplied = {"role_1": "admin", "dept2": "finance", "ABC_123": "value"}
    resolved = _validate_and_resolve_ctx(supplied)
    self.assertEqual(resolved, supplied)

def test_validate_and_resolve_ctx_dict_with_invalid_key_hyphen(self):
    # A hyphenated key must be rejected even when other keys are valid, and
    # the error must name the offending key. Checking the message guards
    # against a different SkyflowError satisfying the test.
    ctx = {"valid_key": "value", "invalid-key": "value"}
    with self.assertRaises(SkyflowError) as context:
        _validate_and_resolve_ctx(ctx)
    self.assertEqual(
        context.exception.message,
        SkyflowMessages.Error.INVALID_CTX_MAP_KEY.value.format("invalid-key")
    )

def test_validate_and_resolve_ctx_dict_with_invalid_key_space(self):
    # A key containing a space must be rejected with the invalid-key error;
    # the message is asserted so an unrelated SkyflowError cannot pass.
    ctx = {"invalid key": "value"}
    with self.assertRaises(SkyflowError) as context:
        _validate_and_resolve_ctx(ctx)
    self.assertEqual(
        context.exception.message,
        SkyflowMessages.Error.INVALID_CTX_MAP_KEY.value.format("invalid key")
    )

def test_validate_and_resolve_ctx_dict_with_invalid_key_dot(self):
    # A dotted key must be rejected with the invalid-key error; the message
    # is asserted so an unrelated SkyflowError cannot pass.
    ctx = {"invalid.key": "value"}
    with self.assertRaises(SkyflowError) as context:
        _validate_and_resolve_ctx(ctx)
    self.assertEqual(
        context.exception.message,
        SkyflowMessages.Error.INVALID_CTX_MAP_KEY.value.format("invalid.key")
    )

def test_validate_and_resolve_ctx_invalid_type_int(self):
    # An int is neither str nor dict, so the invalid-type error is expected.
    # The message is asserted to tell it apart from the invalid-key error.
    with self.assertRaises(SkyflowError) as context:
        _validate_and_resolve_ctx(42)
    self.assertEqual(
        context.exception.message,
        SkyflowMessages.Error.INVALID_CTX_TYPE.value
    )

def test_validate_and_resolve_ctx_invalid_type_list(self):
    # A list is neither str nor dict, so the invalid-type error is expected.
    # The message is asserted to tell it apart from the invalid-key error.
    with self.assertRaises(SkyflowError) as context:
        _validate_and_resolve_ctx(["a", "b"])
    self.assertEqual(
        context.exception.message,
        SkyflowMessages.Error.INVALID_CTX_TYPE.value
    )

def test_validate_and_resolve_ctx_dict_with_mixed_value_types(self):
    # Only keys are validated; values of differing types pass through.
    supplied = {"role": "admin", "level": 3, "active": True, "timestamp": "2025-12-25T10:30:00Z"}
    resolved = _validate_and_resolve_ctx(supplied)
    self.assertEqual(resolved, supplied)

def test_validate_and_resolve_ctx_dict_with_nested_objects(self):
    # Nested dicts and lists as values are accepted and returned unchanged.
    supplied = {"role": "admin", "metadata": {"level": 2, "tags": ["a", "b"]}}
    resolved = _validate_and_resolve_ctx(supplied)
    self.assertEqual(resolved, supplied)

def test_generate_signed_data_tokens_with_dict_ctx(self):
    # Signing from a credentials file with a dict ctx yields a 2-item result.
    project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    creds_path = os.path.join(project_root, "credentials.json")
    options = {
        "data_tokens": ["token1"],
        "ctx": {"role": "admin", "department": "finance"},
    }
    signed = generate_signed_data_tokens(creds_path, options)
    self.assertEqual(len(signed), 2)

def test_generate_signed_data_tokens_from_creds_with_dict_ctx(self):
    # Signing from a credentials string with a dict ctx yields a 2-item result.
    options = {
        "data_tokens": ["token1"],
        "ctx": {"role": "admin", "level": 3},
    }
    signed = generate_signed_data_tokens_from_creds(VALID_CREDENTIALS_STRING, options)
    self.assertEqual(len(signed), 2)
Loading