Configuring AWS S3 Integration

CGAT-core provides native support for working with AWS S3 storage, allowing pipelines to read from and write to S3 buckets seamlessly.

Prerequisites

  1. AWS Account Setup

     • Active AWS account
     • IAM user with S3 access
     • Access key and secret key

  2. Required Packages

    pip install boto3
    pip install cgatcore[s3]
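
Once the packages are installed and credentials are configured (see below), an optional sanity check with boto3 confirms that the account is reachable. This is only a sketch; listing buckets requires the s3:ListAllMyBuckets permission:

import boto3

# Uses whatever credentials boto3 finds (environment, ~/.aws files, or an IAM role)
s3 = boto3.client("s3")
buckets = s3.list_buckets()
print([b["Name"] for b in buckets["Buckets"]])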
    

Configuration

1. AWS Credentials

Configure AWS credentials using one of these methods:

a. Environment Variables

export AWS_ACCESS_KEY_ID='your_access_key'
export AWS_SECRET_ACCESS_KEY='your_secret_key'
export AWS_DEFAULT_REGION='your_region'

b. AWS Credentials File

Create ~/.aws/credentials:

[default]
aws_access_key_id = your_access_key
aws_secret_access_key = your_secret_key
region = your_region

c. Pipeline Configuration

In pipeline.yml (avoid committing real credentials to version control):

s3:
    access_key: your_access_key
    secret_key: your_secret_key
    region: your_region
    bucket: your_default_bucket
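
For illustration only, the credentials above are conceptually equivalent to building an explicit boto3 session; cgatcore reads pipeline.yml itself, and real keys should never be hard-coded:

import boto3

# Placeholder values mirroring the pipeline.yml keys above
session = boto3.Session(
    aws_access_key_id="your_access_key",
    aws_secret_access_key="your_secret_key",
    region_name="your_region",
)
s3 = session.client("s3")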

2. S3 Pipeline Configuration

Configure S3-specific settings in pipeline.yml:

s3:
    # Default bucket for pipeline
    bucket: my-pipeline-bucket

    # Temporary directory for downloaded files
    local_tmpdir: /tmp/s3_cache

    # File transfer settings
    transfer:
        multipart_threshold: 8388608  # 8MB
        max_concurrency: 10
        multipart_chunksize: 8388608  # 8MB

    # Retry configuration
    retry:
        max_attempts: 5
        mode: standard
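
For reference, the transfer and retry settings above correspond roughly to the following boto3/botocore options. This is an illustrative sketch, not the exact code cgatcore executes; the bucket and file names are placeholders:

import boto3
from boto3.s3.transfer import TransferConfig
from botocore.config import Config

# Mirror the multipart and concurrency settings from pipeline.yml
transfer_config = TransferConfig(
    multipart_threshold=8 * 1024 * 1024,   # 8MB
    max_concurrency=10,
    multipart_chunksize=8 * 1024 * 1024,   # 8MB
)

# Mirror the retry settings from pipeline.yml
client = boto3.client(
    "s3",
    config=Config(retries={"max_attempts": 5, "mode": "standard"}),
)

client.upload_file("results.txt", "my-pipeline-bucket",
                   "results/results.txt", Config=transfer_config)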

Usage Examples

1. Basic S3 Operations

Reading from S3

from cgatcore import pipeline as P
from ruffus import suffix

@P.s3_transform("s3://bucket/input.txt", suffix(".txt"), ".processed")
def process_s3_file(infile, outfile):
    """Process a file from S3."""
    statement = """
    cat %(infile)s | process_data > %(outfile)s
    """
    P.run(statement)

Writing to S3

@P.s3_transform("input.txt", suffix(".txt"), 
                "s3://bucket/output.processed")
def write_to_s3(infile, outfile):
    """Write results to S3."""
    statement = """
    process_data %(infile)s > %(outfile)s
    """
    P.run(statement)

2. Advanced Operations

Working with Multiple Files

@P.s3_merge(["s3://bucket/*.txt"], "s3://bucket/merged.txt")
def merge_s3_files(infiles, outfile):
    """Merge multiple S3 files."""
    # infiles is a list of paths; join it into a space-separated string for the shell
    infiles = " ".join(infiles)
    statement = """
    cat %(infiles)s > %(outfile)s
    """
    P.run(statement)

Conditional S3 Usage

@P.transform("*.txt", suffix(".txt"), 
             P.s3_path_if("use_s3", ".processed"))
def conditional_s3(infile, outfile):
    """Use S3 based on configuration."""
    statement = """
    process_data %(infile)s > %(outfile)s
    """
    P.run(statement)
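
The example above switches on a configuration flag. One way such a flag might be declared in pipeline.yml is sketched below; the exact name and how it is interpreted depend on your pipeline:

# pipeline.yml (assumed flag, matching the example above)
use_s3: 1   # set to 0 to keep outputs on the local filesystem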

Best Practices

1. Performance Optimization

  • Batch Operations: Group small files for transfers
  • Multipart Uploads: Configure for large files
  • Concurrent Transfers: Set appropriate concurrency
  • Local Caching: Use temporary directory efficiently

For example, in pipeline.yml:

s3:
    transfer:
        multipart_threshold: 100_000_000  # 100MB
        max_concurrency: 20
        multipart_chunksize: 10_000_000  # 10MB
    local_tmpdir: /fast/local/disk/s3_cache

2. Cost Management

  • Data Transfer: Minimize cross-region transfers
  • Storage Classes: Use appropriate storage tiers
  • Cleanup: Remove temporary files
  • Lifecycle Rules: Configure bucket lifecycle rules (see the sketch below)
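
A lifecycle configuration that expires temporary objects and moves results to a cheaper storage class might look like the following sketch; the bucket name and prefixes are placeholders:

import boto3

s3 = boto3.client("s3")
s3.put_bucket_lifecycle_configuration(
    Bucket="my-pipeline-bucket",
    LifecycleConfiguration={
        "Rules": [
            # Delete temporary files after 30 days
            {"ID": "expire-tmp",
             "Filter": {"Prefix": "tmp/"},
             "Status": "Enabled",
             "Expiration": {"Days": 30}},
            # Move results to infrequent-access storage after 60 days
            {"ID": "tier-results",
             "Filter": {"Prefix": "results/"},
             "Status": "Enabled",
             "Transitions": [{"Days": 60, "StorageClass": "STANDARD_IA"}]},
        ]
    },
)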

3. Error Handling

import logging

L = logging.getLogger(__name__)


@P.s3_transform("s3://bucket/input.txt", suffix(".txt"), ".processed")
def robust_s3_processing(infile, outfile):
    """Handle S3 operations with proper error checking."""
    try:
        statement = """
        process_data %(infile)s > %(outfile)s
        """
        P.run(statement)
    except P.S3Error as e:
        L.error("S3 operation failed: %s", e)
        raise
    finally:
        # Clean up local temporary files
        P.cleanup_tmpdir()

Troubleshooting

Common Issues

  1. Access Denied

     • Check AWS credentials
     • Verify IAM permissions
     • Ensure bucket policy allows access

  2. Transfer Failures

     • Check network connectivity
     • Verify file permissions
     • Monitor transfer logs

  3. Performance Issues

     • Adjust multipart settings
     • Check network bandwidth
     • Monitor memory usage
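
For the "Access Denied" case, a quick boto3 check distinguishes a permissions problem from a missing bucket; the bucket name is a placeholder:

import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")
try:
    s3.head_bucket(Bucket="my-pipeline-bucket")
    print("bucket reachable")
except ClientError as e:
    # 403 indicates a permissions problem, 404 a missing bucket
    print("S3 check failed:", e.response["Error"]["Code"])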

Debugging

Enable detailed S3 logging:

import logging

# Make sure log output is emitted somewhere before raising the levels
logging.basicConfig()
logging.getLogger('boto3').setLevel(logging.DEBUG)
logging.getLogger('botocore').setLevel(logging.DEBUG)

Security Considerations

  1. Credentials Management

     • Use IAM roles when possible
     • Rotate access keys regularly
     • Never commit credentials

  2. Data Protection

     • Enable bucket encryption
     • Use HTTPS endpoints
     • Configure appropriate bucket policies

  3. Access Control

     • Implement least privilege
     • Use bucket policies
     • Enable access logging
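
As an illustration of the data-protection points above, server-side encryption can be requested explicitly on upload; the bucket and key are placeholders, and boto3 uses HTTPS endpoints by default:

import boto3

s3 = boto3.client("s3")
s3.upload_file(
    "results.txt",
    "my-pipeline-bucket",
    "results/results.txt",
    ExtraArgs={"ServerSideEncryption": "AES256"},
)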

For more examples of using S3 in your pipelines, see the S3 Pipeline Examples section.

Additional resources:

  • AWS S3 Documentation
  • Boto3 Documentation