bedrock.schedules.schedule_endpoint
from bedrock.log import log_config


class ScheduleEndpoint:  # pragma: unit
    """
    Parent class for Lambda functions that run on a schedule.

    EventBridge Scheduler triggers a scheduled endpoint on a cron schedule.
    Unlike a regular Endpoint, it does not respond to HTTP requests.

    Steps to add a scheduled endpoint:
        1. Subclass ScheduleEndpoint
        2. Apply the @cron("cron_expression") decorator
        3. Provide your own on_schedule() implementation

    Example:
        @cron("0 8 * * ? *")  # Every day at 8 AM UTC
        class DailyCleanup(ScheduleEndpoint):
            def on_schedule(self, event, context):
                # Your scheduled task logic here
                return {"status": "completed"}
    """
    __cron__ = None  # Set by @cron decorator

    def __init__(self):
        # The logger is named after the concrete subclass.
        endpoint_name = self.__class__.__name__
        self.logger = log_config(endpoint_name)

    def handle(self, event, context):
        """
        Lambda entry point used when EventBridge invokes the function.

        Delegates to on_schedule(). Any exception is logged and then
        re-raised unchanged.
        """
        try:
            outcome = self.on_schedule(event, context)
        except Exception as e:
            self.logger.error(f"Error executing scheduled task {self.__class__.__name__}: {e}")
            raise
        else:
            return outcome

    def on_schedule(self, event, context):
        """
        Hook holding the scheduled task logic; subclasses must override it.

        :param event: The EventBridge event payload
        :param context: Lambda context object
        :return: Result of the scheduled task
        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError("Subclasses must implement on_schedule()")
class
ScheduleEndpoint:
class ScheduleEndpoint:  # pragma: unit
    """
    Base for Lambda functions that are executed on a schedule.

    A scheduled endpoint is triggered by EventBridge Scheduler on a cron
    schedule and, unlike regular Endpoints, does not respond to HTTP requests.

    How to implement one:
        1. Define a class inheriting from ScheduleEndpoint
        2. Decorate it with @cron("cron_expression")
        3. Override on_schedule() with the task logic

    Example:
        @cron("0 8 * * ? *")  # Every day at 8 AM UTC
        class DailyCleanup(ScheduleEndpoint):
            def on_schedule(self, event, context):
                # Your scheduled task logic here
                return {"status": "completed"}
    """
    __cron__ = None  # Set by @cron decorator

    def __init__(self):
        # The logger is named after the concrete subclass.
        endpoint_name = self.__class__.__name__
        self.logger = log_config(endpoint_name)

    def handle(self, event, context):
        """
        Lambda entry point used when EventBridge invokes the function.

        Delegates to on_schedule(). Any exception is logged and then
        re-raised unchanged.
        """
        try:
            outcome = self.on_schedule(event, context)
        except Exception as e:
            self.logger.error(f"Error executing scheduled task {self.__class__.__name__}: {e}")
            raise
        else:
            return outcome

    def on_schedule(self, event, context):
        """
        Hook holding the scheduled task logic; subclasses must override it.

        :param event: The EventBridge event payload
        :param context: Lambda context object
        :return: Result of the scheduled task
        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError("Subclasses must implement on_schedule()")
Base class for scheduled Lambda functions.
Scheduled endpoints are triggered by EventBridge Scheduler on a cron schedule. They do not respond to HTTP requests like regular Endpoints.
To implement a scheduled endpoint:
- Create a class that inherits from ScheduleEndpoint
- Decorate with @cron("cron_expression")
- Override the on_schedule() method
Example:
    @cron("0 8 * * ? *")  # Every day at 8 AM UTC
    class DailyCleanup(ScheduleEndpoint):
        def on_schedule(self, event, context):
            # Your scheduled task logic here
            return {"status": "completed"}
def
handle(self, event, context):
def handle(self, event, context):
    """
    Lambda entry point used when EventBridge invokes the function.

    Delegates to on_schedule(). Any exception is logged through
    self.logger and then re-raised unchanged.
    """
    try:
        outcome = self.on_schedule(event, context)
    except Exception as e:
        self.logger.error(f"Error executing scheduled task {self.__class__.__name__}: {e}")
        raise
    else:
        return outcome
Entry point for Lambda invocation from EventBridge.
def
on_schedule(self, event, context):
def on_schedule(self, event, context):
    """
    Hook holding the scheduled task logic; subclasses must override it.

    :param event: The EventBridge event payload
    :param context: Lambda context object
    :return: Result of the scheduled task
    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError("Subclasses must implement on_schedule()")
Override this method to implement your scheduled task logic.
Parameters
- event: The EventBridge event payload
- context: Lambda context object
Returns
Result of the scheduled task