"""Source code for data_manipulation.boto3_: AWS SES and S3 helper utilities."""

import os
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

from loguru import logger

if TYPE_CHECKING:
    # pandas is only needed to resolve the "pd.DataFrame" return annotation;
    # it is imported lazily at runtime inside list_s3_bucket_files.
    import pandas as pd

# Constants - now serving as defaults
DEFAULT_MAX_ATTACHMENT_SIZE = 10 * 1024 * 1024  # 10MB
DEFAULT_MAX_RETRIES = 3


def send_aws_ses_email(
    sender: str,
    recipient: List[str],
    subject: str,
    body_text: str,
    body_type: str,
    ses_client: Any,
    attachment: Optional[str] = None,
    max_attachment_size: int = DEFAULT_MAX_ATTACHMENT_SIZE,
    max_retries: int = DEFAULT_MAX_RETRIES,
) -> Optional[Dict[str, Any]]:
    """Sends an email using AWS SES service.

    Args:
        sender (str): Sender's email address.
        recipient (list): List of recipient email addresses.
        subject (str): Email subject line.
        body_text (str): Email body content.
        body_type (str): MIME type of email body (e.g., 'plain', 'html').
        ses_client: AWS SES client instance.
        attachment (str, optional): Path to file to attach. Defaults to None.
        max_attachment_size (int, optional): Maximum allowed attachment size
            in bytes. Defaults to 10MB.
        max_retries (int, optional): Maximum number of retry attempts for
            failed sends. Defaults to 3.

    Returns:
        dict: AWS SES response dictionary if successful, None if failed.

    Note:
        Reference:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ses/client/send_raw_email.html
    """
    # Lazy imports keep module import cheap; email.mime is stdlib,
    # botocore ships with boto3.
    from email.mime.application import MIMEApplication
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText

    from botocore.exceptions import BotoCoreError, ClientError

    # Input validation: this function reports failures by returning None
    # (never raising), so callers must check the result.
    if not sender or not recipient or not subject or not body_text:
        logger.error("Missing required email parameters")
        return None

    if not isinstance(recipient, list):
        logger.error("Recipient must be a list")
        return None

    if body_type not in ["plain", "html"]:
        logger.error("Invalid body type. Must be 'plain' or 'html'")
        return None

    # Validate retry count
    if max_retries < 1:
        logger.error("max_retries must be at least 1")
        return None

    # Build the MIME message: body first, then optional attachment.
    msg = MIMEMultipart()
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = ", ".join(recipient)

    part = MIMEText(body_text, body_type)
    msg.attach(part)

    if attachment:
        try:
            if not os.path.exists(attachment):
                logger.error(f"Attachment not found: {attachment}")
                return None

            # Check file size with parameterized max size
            if os.path.getsize(attachment) > max_attachment_size:
                logger.error(
                    f"Attachment size exceeds maximum allowed size of {max_attachment_size/1024/1024}MB"
                )
                return None

            with open(attachment, "rb") as f:
                part = MIMEApplication(f.read())
                part.add_header(
                    "Content-Disposition",
                    "attachment",
                    filename=os.path.basename(attachment),
                )
                msg.attach(part)
        except (IOError, OSError) as e:
            logger.error(f"Error processing attachment: {str(e)}")
            return None

    # Retry loop: return on first success; on the final failed attempt
    # log and give up with None.
    for attempt in range(max_retries):
        try:
            response = ses_client.send_raw_email(
                Source=sender,
                Destinations=recipient,
                RawMessage={"Data": msg.as_string()},
            )
            logger.info(f"Email sent successfully on attempt {attempt + 1}")
            return response
        except (BotoCoreError, ClientError) as error:
            if attempt == max_retries - 1:
                logger.error(
                    f"Failed to send email after {max_retries} attempts: {error}"
                )
                return None
            logger.warning(f"Attempt {attempt + 1} failed, retrying...")
            continue

    # Unreachable when max_retries >= 1 (every loop path returns), but makes
    # the None-on-failure contract explicit.
    return None
def list_s3_bucket_files(
    bucket: str,
    to_dateframe: bool = False,
    prefix: Optional[str] = None,
) -> Union[List[str], "pd.DataFrame"]:
    """Lists all files in an S3 bucket.

    Directory placeholder keys (ending in ``/``) are skipped.

    Args:
        bucket (str): Name of the S3 bucket.
        to_dateframe (bool, optional): Whether to return results as a pandas
            DataFrame. Defaults to False. (NOTE: parameter name is a
            historical misspelling of "to_dataframe"; kept for backward
            compatibility with existing callers.)
        prefix (Optional[str], optional): Filter results to files with this
            prefix. Defaults to None.

    Returns:
        Union[List[str], pd.DataFrame]: List of file keys or DataFrame
            containing file keys.
            If to_dateframe is True, returns DataFrame with 'key' column.
            If to_dateframe is False, returns list of file keys.

    Raises:
        ValueError: If ``bucket`` is empty.
        BotoCoreError, ClientError: If the bucket cannot be accessed
            (logged, then re-raised).

    Examples:
        >>> files = list_s3_bucket_files('my-bucket')
        >>> type(files)
        <class 'list'>
        >>> df = list_s3_bucket_files('my-bucket', to_dateframe=True)
        >>> type(df)
        <class 'pandas.core.frame.DataFrame'>
    """
    # Validate before the (lazy) boto3 import so an empty bucket name always
    # fails fast with ValueError rather than a possible ImportError.
    if not bucket:
        raise ValueError("Bucket name cannot be empty")

    import boto3
    from botocore.exceptions import BotoCoreError, ClientError

    s3_client = boto3.client("s3")
    # Paginate: list_objects_v2 returns at most 1000 keys per page.
    paginator = s3_client.get_paginator("list_objects_v2")

    params: Dict[str, Any] = {"Bucket": bucket}
    if prefix:
        params["Prefix"] = prefix

    keys: List[str] = []
    try:
        for page in paginator.paginate(**params):
            # An empty page has no "Contents" key at all.
            for obj in page.get("Contents", []):
                key = obj["Key"]
                # Skip directory placeholder objects.
                if not key.endswith("/"):
                    keys.append(key)
    except (BotoCoreError, ClientError) as e:
        logger.error(f"Error accessing S3 bucket {bucket}: {str(e)}")
        raise

    if to_dateframe:
        import pandas as pd

        return pd.DataFrame(keys, columns=["key"])
    return keys
# Run the module's doctests when executed directly.
if __name__ == "__main__":
    import doctest

    doctest.testmod()