S3 Module#

The s3 module provides classes for specifying S3 paths to read from or write to during job execution. This is particularly useful for data-intensive workflows where input data needs to be downloaded from S3 and output data needs to be uploaded to S3.

S3Mount#

@dataclass
class S3Mount(DataClassJsonMixin):
    """An S3 mount for a job."""

    source: str
    destination: str
    recursive: bool | None = None
    sse: str | None = None
    sse_kms_key_id: str | None = None
    options: str = "--quiet"

Parameters#

  • source (str): The source path. For read mounts, this is the S3 path to read from. For write mounts, this is the local path to upload from.

  • destination (str): The destination path. For read mounts, this is the local path to write to. For write mounts, this is the S3 path to write to.

  • recursive (bool | None, optional): Whether to recursively copy directories. Default is None.

  • sse (str | None, optional): The server-side encryption algorithm to use. Default is None.

  • sse_kms_key_id (str | None, optional): The KMS key ID to use for server-side encryption. Default is None.

  • options (str, optional): Additional options to pass to the AWS CLI. Default is "--quiet".

Methods#

validate#

def validate(self):
    """Validate the S3 mount."""

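Validates the S3 mount by checking that the source S3 path exists (relevant for read mounts, whose source is on S3) and that the destination S3 path is writable (relevant for write mounts, whose destination is on S3).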

S3Mounts#

@dataclass
class S3Mounts(DataClassJsonMixin):
    """S3 mounts for a job."""

    read: Sequence[S3Mount | dict] = field(default_factory=list)
    write: Sequence[S3Mount | dict] = field(default_factory=list)

Parameters#

  • read (Sequence[S3Mount | dict], optional): A sequence of S3 mounts to read from. Default is an empty list.

  • write (Sequence[S3Mount | dict], optional): A sequence of S3 mounts to write to. Default is an empty list.

Methods#

validate#

def validate(self):
    """Validate the S3 mounts."""

Validates all S3 mounts by calling the validate method on each mount.

to_json#

def to_json(self, *args, **kwargs):
    """Convert to JSON."""

Converts the S3 mounts to a JSON string. The preloader script parses this string internally to read the S3 mounts.

Helper Functions#

check_s3_uri_valid#

def check_s3_uri_valid(s3_uri: str) -> bool:
    """Check if an S3 URI is valid.

    Parameters
    ----------
    s3_uri : str
        The S3 URI to check.

    Returns
    -------
    bool
        Whether the S3 URI is valid.
    """

Checks if an S3 URI is valid by verifying that it starts with “s3://” and that the object exists.

check_bucket_writable#

def check_bucket_writable(
    bucket: str,
    sse: Literal["AES256", "aws:kms", "aws:kms:dsse"] = CONFIG.Settings.sse,
    sse_kms_key_id: str = CONFIG.Settings.sseKmsKeyId,
) -> bool:
    """Check if a bucket is writable.

    Parameters
    ----------
    bucket : str
        The bucket to check.
    sse : Literal["AES256", "aws:kms", "aws:kms:dsse"]
        The server-side encryption to use.
    sse_kms_key_id : str
        The KMS key ID to use.

    Returns
    -------
    bool
        Whether the bucket is writable.
    """

Checks if a bucket is writable by attempting to write a test object to the bucket.

Examples#

Creating S3 Mounts#

from ezbatch.s3 import S3Mounts

# Create S3 mounts using dictionaries
mounts = S3Mounts(
    read=[
        {
            "source": "s3://my-bucket/input",
            "destination": "/mnt/data",
            "recursive": True,
        }
    ],
    write=[
        {
            "source": "/mnt/output",
            "destination": "s3://my-bucket/output",
            "recursive": True,
            "sse": "aws:kms",
            "sse_kms_key_id": "mrk-0123456789abcdef0",
        }
    ],
)

# Create S3 mounts using S3Mount objects
from ezbatch.s3 import S3Mount

mounts = S3Mounts(
    read=[
        S3Mount(
            source="s3://my-bucket/input",
            destination="/mnt/data",
            recursive=True,
        )
    ],
    write=[
        S3Mount(
            source="/mnt/output",
            destination="s3://my-bucket/output",
            recursive=True,
            sse="aws:kms",
            sse_kms_key_id="mrk-0123456789abcdef0",
        )
    ],
)

Using S3 Mounts with EZBatchJob#

from ezbatch.workflow import EZBatchJob
from ezbatch.s3 import S3Mounts

job = EZBatchJob(
    image="public.ecr.aws/ubuntu/ubuntu:22.04",
    command="echo hello, world!; ls -l /mnt/data;",
    mounts=S3Mounts(
        read=[
            {
                "source": "s3://my-bucket/input",
                "destination": "/mnt/data",
                "recursive": True,
            }
        ],
        write=[
            {
                "source": "/mnt/output",
                "destination": "s3://my-bucket/output",
                "recursive": True,
            }
        ],
    ),
    preloader=True,  # Enable the preloader to use S3 mounts
)

Validating S3 Mounts#

from ezbatch.s3 import S3Mounts

mounts = S3Mounts(
    read=[
        {
            "source": "s3://my-bucket/input",
            "destination": "/mnt/data",
            "recursive": True,
        }
    ],
    write=[
        {
            "source": "/mnt/output",
            "destination": "s3://my-bucket/output",
            "recursive": True,
        }
    ],
)

# Validate the S3 mounts
mounts.validate()

Converting S3 Mounts to JSON#

from ezbatch.s3 import S3Mounts

mounts = S3Mounts(
    read=[
        {
            "source": "s3://my-bucket/input",
            "destination": "/mnt/data",
            "recursive": True,
        }
    ],
    write=[
        {
            "source": "/mnt/output",
            "destination": "s3://my-bucket/output",
            "recursive": True,
        }
    ],
)

# Convert the S3 mounts to JSON
json_string = mounts.to_json()
print(json_string)