S3 Module#
The s3 module provides classes for specifying S3 paths to read from or write to during job execution. This is particularly useful for data-intensive workflows where input data needs to be downloaded from S3 and output data needs to be uploaded to S3.
S3Mount#
@dataclass
class S3Mount(DataClassJsonMixin):
"""An S3 mount for a job."""
source: str
destination: str
recursive: bool | None = None
sse: str | None = None
sse_kms_key_id: str | None = None
options: str = "--quiet"
Parameters#
source (str): The source path. For read mounts, this is the S3 path to read from. For write mounts, this is the local path to write from.
destination (str): The destination path. For read mounts, this is the local path to write to. For write mounts, this is the S3 path to write to.
recursive (bool | None, optional): Whether to recursively copy directories. Default is None.
sse (str | None, optional): The server-side encryption algorithm to use. Default is None.
sse_kms_key_id (str | None, optional): The KMS key ID to use for server-side encryption. Default is None.
options (str, optional): Additional options to pass to the AWS CLI. Default is "--quiet".
Methods#
validate#
def validate(self):
"""Validate the S3 mount."""
Validates the S3 mount by checking if the source S3 path exists and if the destination S3 path is writable.
S3Mounts#
@dataclass
class S3Mounts(DataClassJsonMixin):
"""S3 mounts for a job."""
read: Sequence[S3Mount | dict] = field(default_factory=list)
write: Sequence[S3Mount | dict] = field(default_factory=list)
Parameters#
read (Sequence[S3Mount | dict], optional): A sequence of S3 mounts to read from. Default is an empty list.
write (Sequence[S3Mount | dict], optional): A sequence of S3 mounts to write to. Default is an empty list.
Methods#
validate#
def validate(self):
"""Validate the S3 mounts."""
Validates all S3 mounts by calling the validate method on each mount.
to_json#
def to_json(self, *args, **kwargs):
"""Convert to JSON."""
Converts the S3 mounts to a JSON string. This is used internally by the preloader script to parse the S3 mounts.
Helper Functions#
check_s3_uri_valid#
def check_s3_uri_valid(s3_uri: str) -> bool:
"""Check if an S3 URI is valid.
Parameters
----------
s3_uri : str
The S3 URI to check.
Returns
-------
bool
Whether the S3 URI is valid.
"""
Checks if an S3 URI is valid by verifying that it starts with “s3://” and that the object exists.
check_bucket_writable#
def check_bucket_writable(
bucket: str,
sse: Literal["AES256", "aws:kms", "aws:kms:dsse"] = CONFIG.Settings.sse,
sse_kms_key_id: str = CONFIG.Settings.sseKmsKeyId,
) -> bool:
"""Check if a bucket is writable.
Parameters
----------
bucket : str
The bucket to check.
sse : Literal["AES256", "aws:kms", "aws:kms:dsse"]
The server-side encryption to use.
sse_kms_key_id : str
The KMS key ID to use.
Returns
-------
bool
Whether the bucket is writable.
"""
Checks if a bucket is writable by attempting to write a test object to the bucket.
Examples#
Creating S3 Mounts#
from ezbatch.s3 import S3Mounts
# Create S3 mounts using dictionaries
mounts = S3Mounts(
read=[
{
"source": "s3://my-bucket/input",
"destination": "/mnt/data",
"recursive": True,
}
],
write=[
{
"source": "/mnt/output",
"destination": "s3://my-bucket/output",
"recursive": True,
"sse": "aws:kms",
"sse_kms_key_id": "mrk-0123456789abcdef0",
}
],
)
# Create S3 mounts using S3Mount objects
from ezbatch.s3 import S3Mount
mounts = S3Mounts(
read=[
S3Mount(
source="s3://my-bucket/input",
destination="/mnt/data",
recursive=True,
)
],
write=[
S3Mount(
source="/mnt/output",
destination="s3://my-bucket/output",
recursive=True,
sse="aws:kms",
sse_kms_key_id="mrk-0123456789abcdef0",
)
],
)
Using S3 Mounts with EZBatchJob#
from ezbatch.workflow import EZBatchJob
from ezbatch.s3 import S3Mounts
job = EZBatchJob(
image="public.ecr.aws/ubuntu/ubuntu:22.04",
command="echo hello, world!; ls -l /mnt/data;",
mounts=S3Mounts(
read=[
{
"source": "s3://my-bucket/input",
"destination": "/mnt/data",
"recursive": True,
}
],
write=[
{
"source": "/mnt/output",
"destination": "s3://my-bucket/output",
"recursive": True,
}
],
),
preloader=True, # Enable the preloader to use S3 mounts
)
Validating S3 Mounts#
from ezbatch.s3 import S3Mounts
mounts = S3Mounts(
read=[
{
"source": "s3://my-bucket/input",
"destination": "/mnt/data",
"recursive": True,
}
],
write=[
{
"source": "/mnt/output",
"destination": "s3://my-bucket/output",
"recursive": True,
}
],
)
# Validate the S3 mounts
mounts.validate()
Converting S3 Mounts to JSON#
from ezbatch.s3 import S3Mounts
mounts = S3Mounts(
read=[
{
"source": "s3://my-bucket/input",
"destination": "/mnt/data",
"recursive": True,
}
],
write=[
{
"source": "/mnt/output",
"destination": "s3://my-bucket/output",
"recursive": True,
}
],
)
# Convert the S3 mounts to JSON
json_string = mounts.to_json()
print(json_string)