AWS S3 Storage¶
This section describes how to interact with Amazon's cloud storage service (S3). To interact with S3 resources, we use the boto3 SDK.
This documentation is a work in progress. We welcome any feedback or requests for extra features. If you find any bugs, please report them as issues on GitHub.
Setting up credentials¶
To use the AWS remote feature, you need to configure your credentials (i.e., the access key and secret key). You can set these credentials either as environment variables or in the file ~/.aws/credentials, as detailed in the boto3 configuration page. In brief, you need to add the keys as follows:
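A minimal sketch of the credentials file, assuming the default profile is used; YOUR_ACCESS_KEY and YOUR_SECRET_KEY are placeholders for your own values:

```ini
[default]
aws_access_key_id = YOUR_ACCESS_KEY
aws_secret_access_key = YOUR_SECRET_KEY
```

The same values can instead be exported as the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, which boto3 also reads.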
These access keys can be found in the AWS Management Console by following these steps:
- Log in to your AWS Management Console.
- Click on your username at the top right of the page.
- Click "My Security Credentials."
- Click "Users" in the left-hand menu and select a user.
- Click the "Security credentials" tab.
- YOUR_ACCESS_KEY is located in the "Access key" section.
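If you have lost YOUR_SECRET_KEY, you will need to create a new access key, because the secret key can only be viewed when the key is created. Please see the AWS documentation for instructions on how to do this. Note that AWS does not rotate access keys automatically; it recommends rotating them regularly, and 90 days is a commonly used interval.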
Additionally, you may want to configure the default region:
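A minimal sketch, assuming the region is stored in the file ~/.aws/config under the default profile; us-east-1 is only an example value and should be replaced with the region of your bucket:

```ini
[default]
region = us-east-1
```

Alternatively, the region can be supplied through the AWS_DEFAULT_REGION environment variable.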
Once the credentials and region have been configured, you are ready to interact with S3 storage.
Using S3 with S3Pipeline¶
The S3Pipeline class in file_handler.py is a convenient tool for integrating AWS S3 operations into data processing workflows. The class provides decorators to simplify working with S3 files in different stages of the pipeline.
First, instantiate the class as follows:

    from cgatcore import pipeline as P
    from cgatcore.remote.file_handler import S3Pipeline

    pipeline = S3Pipeline(name="MyPipeline", temp_dir="/tmp")

The S3Pipeline class provides several decorators:
Download from AWS S3 with s3_transform¶
To download a file, process it, and save the output, use the s3_transform decorator. Here's an example:

    from collections import Counter

    @pipeline.s3_transform('s3://aws-test-boto/pipeline.yml', '_counts', 's3://aws-test-boto/pipeline_counts.yml')
    def countWords(input_file, output_file):
        """Count the number of words in the file."""
        with open(input_file, 'r') as infile, open(output_file, 'w') as outfile:
            words = infile.read().split()
            outfile.write("word\tfreq\n")
            # Counter tallies every word in a single pass over the list.
            for word, count in Counter(words).items():
                outfile.write(f"{word}\t{count}\n")

This decorator downloads pipeline.yml from the S3 bucket aws-test-boto to a local temporary directory, processes it, and uploads the results to S3 as a new file (pipeline_counts.yml).
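Each S3 location passed to the decorators combines a bucket name and an object key in a single s3:// URI. As an illustration of how such a URI breaks down, here is a small sketch using only the standard library; split_s3_uri is a hypothetical helper written for this example and is not part of cgatcore or boto3:

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split an s3:// URI into (bucket, key).

    Hypothetical helper for illustration; not part of cgatcore.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not a valid S3 URI: {uri!r}")
    # The bucket is the network location; the key is the path
    # without its leading slash.
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key = split_s3_uri("s3://aws-test-boto/pipeline.yml")
print(bucket, key)
```

For the URI used in the example above, the bucket is aws-test-boto and the key is pipeline.yml.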
Merging Multiple Files with s3_merge¶
If you need to merge multiple files from S3 into one, use the s3_merge decorator. Here's how:

    @pipeline.s3_merge(['s3://aws-test-boto/file1.txt', 's3://aws-test-boto/file2.txt'], 's3://aws-test-boto/merged_file.txt')
    def mergeFiles(input_files, output_file):
        """Merge multiple input files into one."""
        with open(output_file, 'w') as outfile:
            for path in input_files:
                with open(path, 'r') as infile:
                    outfile.write(infile.read())

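The merge function above reads each input file fully into memory before writing it out. For large objects, a chunked copy may be preferable. The following is a sketch of that alternative, assuming the function receives local file paths as in the example; merge_streaming is a hypothetical name and the temporary files exist only to make the demonstration self-contained:

```python
import os
import shutil
import tempfile


def merge_streaming(input_files, output_file):
    """Concatenate input_files into output_file without loading any file whole."""
    with open(output_file, "wb") as outfile:
        for path in input_files:
            with open(path, "rb") as infile:
                # copyfileobj copies in fixed-size chunks rather than all at once.
                shutil.copyfileobj(infile, outfile)


# Small local demonstration using temporary files (no S3 access involved).
with tempfile.TemporaryDirectory() as tmp:
    first = os.path.join(tmp, "file1.txt")
    second = os.path.join(tmp, "file2.txt")
    target = os.path.join(tmp, "merged_file.txt")
    with open(first, "wb") as handle:
        handle.write(b"alpha\n")
    with open(second, "wb") as handle:
        handle.write(b"beta\n")
    merge_streaming([first, second], target)
    with open(target, "rb") as handle:
        merged = handle.read()

print(merged)
```

The body of merge_streaming could be used inside a function decorated with s3_merge in place of the read-then-write loop.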
Splitting a File with s3_split¶
To split a single input file into multiple output files, use the s3_split decorator:

    @pipeline.s3_split('s3://aws-test-boto/largefile.txt', ['s3://aws-test-boto/part1.txt', 's3://aws-test-boto/part2.txt'])
    def splitFile(input_file, output_files):
        """Split the input file into multiple output files."""
        with open(input_file, 'r') as infile:
            content = infile.readlines()
        # Lines before the midpoint go to the first part, the rest to the second.
        mid = len(content) // 2
        with open(output_files[0], 'w') as outfile1:
            outfile1.writelines(content[:mid])
        with open(output_files[1], 'w') as outfile2:
            outfile2.writelines(content[mid:])

This splits the large input file into two separate parts, saving them as different S3 objects.
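The example hard-codes two output parts. If more outputs are listed, the same idea generalizes to any number of contiguous chunks. A minimal sketch, where chunk_lines is a hypothetical helper and not part of cgatcore (note that it gives any extra lines to the earlier chunks, whereas the example above gives the extra line to the second part):

```python
def chunk_lines(lines, n_parts):
    """Divide lines into n_parts contiguous chunks whose sizes differ by at most one."""
    if n_parts < 1:
        raise ValueError("n_parts must be at least 1")
    base, extra = divmod(len(lines), n_parts)
    chunks = []
    start = 0
    for index in range(n_parts):
        # The first `extra` chunks each take one additional line.
        end = start + base + (1 if index < extra else 0)
        chunks.append(lines[start:end])
        start = end
    return chunks


parts = chunk_lines(["a\n", "b\n", "c\n", "d\n", "e\n"], 2)
print(parts)
```

Inside a function decorated with s3_split, each chunk could then be written to the corresponding entry of output_files.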
Running the Pipeline¶
To run all tasks in the pipeline, call pipeline.run(). This sequentially executes all tasks that have been added to the pipeline through the decorators.
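The pattern of registering tasks through decorators and then executing them in order can be illustrated with a toy class. This is only a sketch of the general technique: MiniPipeline and its task decorator are hypothetical, and nothing here should be read as how S3Pipeline is implemented internally.

```python
class MiniPipeline:
    """Toy stand-in illustrating decorator registration; NOT the cgatcore S3Pipeline."""

    def __init__(self, name):
        self.name = name
        self.tasks = []

    def task(self, *args):
        """Return a decorator that records the function together with its arguments."""
        def decorator(func):
            self.tasks.append((func, args))
            return func
        return decorator

    def run(self):
        """Call every registered task in the order it was registered."""
        return [func(*args) for func, args in self.tasks]


pipeline = MiniPipeline("demo")


@pipeline.task("a.txt", "b.txt")
def first(src, dst):
    return f"{src} -> {dst}"


@pipeline.task("b.txt", "c.txt")
def second(src, dst):
    return f"{src} -> {dst}"


results = pipeline.run()
print(results)
```

Decorating a function does not execute it; the work happens only when run() is called, which is why the call to run() comes last in a pipeline script.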
Example: Full Pipeline¶
Here is an example of a simple pipeline that uses the S3Pipeline class to count words in a file and merge two files:

    from collections import Counter

    from cgatcore.remote.file_handler import S3Pipeline

    pipeline = S3Pipeline(name="ExamplePipeline", temp_dir="/tmp")

    @pipeline.s3_transform('s3://aws-test-boto/pipeline.yml', '_counts', 's3://aws-test-boto/pipeline_counts.yml')
    def countWords(input_file, output_file):
        with open(input_file, 'r') as infile, open(output_file, 'w') as outfile:
            words = infile.read().split()
            outfile.write("word\tfreq\n")
            # Counter tallies every word in a single pass over the list.
            for word, count in Counter(words).items():
                outfile.write(f"{word}\t{count}\n")

    @pipeline.s3_merge(['s3://aws-test-boto/file1.txt', 's3://aws-test-boto/file2.txt'], 's3://aws-test-boto/merged_file.txt')
    def mergeFiles(input_files, output_file):
        with open(output_file, 'w') as outfile:
            for path in input_files:
                with open(path, 'r') as infile:
                    outfile.write(infile.read())

    # Execute the registered tasks.
    pipeline.run()

In this example:
- Download and Transform: The countWords function downloads a file from S3, counts the words, and uploads the output back to S3.
- Merge: The mergeFiles function merges two files from S3 and writes the merged output back to S3.
- Run: Finally, all the tasks are executed sequentially with pipeline.run().
This documentation reflects the current capabilities of the S3Pipeline class, which offers an easier and more efficient way to handle AWS S3 resources within your pipelines.