bedrock.schedules.schedule_endpoint

 1from bedrock.log import log_config
 2
 3
 4class ScheduleEndpoint:  # pragma: unit
 5    """
 6    Base class for scheduled Lambda functions.
 7
 8    Scheduled endpoints are triggered by EventBridge Scheduler on a cron schedule.
 9    They do not respond to HTTP requests like regular Endpoints.
10
11    To implement a scheduled endpoint:
12    1. Create a class that inherits from ScheduleEndpoint
13    2. Decorate with @cron("cron_expression")
14    3. Override the on_schedule() method
15
16    Example:
17        @cron("0 8 * * ? *")  # Every day at 8 AM UTC
18        class DailyCleanup(ScheduleEndpoint):
19            def on_schedule(self, event, context):
20                # Your scheduled task logic here
21                return {"status": "completed"}
22    """
23    __cron__ = None  # Set by @cron decorator
24
25    def __init__(self):
26        self.logger = log_config(self.__class__.__name__)
27
28    def handle(self, event, context):
29        """
30        Entry point for Lambda invocation from EventBridge.
31        """
32        try:
33            return self.on_schedule(event, context)
34        except Exception as e:
35            self.logger.error(f"Error executing scheduled task {self.__class__.__name__}: {e}")
36            raise
37
38    def on_schedule(self, event, context):
39        """
40        Override this method to implement your scheduled task logic.
41
42        :param event: The EventBridge event payload
43        :param context: Lambda context object
44        :return: Result of the scheduled task
45        """
46        raise NotImplementedError("Subclasses must implement on_schedule()")
class ScheduleEndpoint:
 5class ScheduleEndpoint:  # pragma: unit
 6    """
 7    Base class for scheduled Lambda functions.
 8
 9    Scheduled endpoints are triggered by EventBridge Scheduler on a cron schedule.
10    They do not respond to HTTP requests like regular Endpoints.
11
12    To implement a scheduled endpoint:
13    1. Create a class that inherits from ScheduleEndpoint
14    2. Decorate with @cron("cron_expression")
15    3. Override the on_schedule() method
16
17    Example:
18        @cron("0 8 * * ? *")  # Every day at 8 AM UTC
19        class DailyCleanup(ScheduleEndpoint):
20            def on_schedule(self, event, context):
21                # Your scheduled task logic here
22                return {"status": "completed"}
23    """
24    __cron__ = None  # Set by @cron decorator
25
26    def __init__(self):
27        self.logger = log_config(self.__class__.__name__)
28
29    def handle(self, event, context):
30        """
31        Entry point for Lambda invocation from EventBridge.
32        """
33        try:
34            return self.on_schedule(event, context)
35        except Exception as e:
36            self.logger.error(f"Error executing scheduled task {self.__class__.__name__}: {e}")
37            raise
38
39    def on_schedule(self, event, context):
40        """
41        Override this method to implement your scheduled task logic.
42
43        :param event: The EventBridge event payload
44        :param context: Lambda context object
45        :return: Result of the scheduled task
46        """
47        raise NotImplementedError("Subclasses must implement on_schedule()")

Base class for scheduled Lambda functions.

Scheduled endpoints are triggered by EventBridge Scheduler on a cron schedule. They do not respond to HTTP requests like regular Endpoints.

To implement a scheduled endpoint:

  1. Create a class that inherits from ScheduleEndpoint
  2. Decorate with @cron("cron_expression")
  3. Override the on_schedule() method

Example:

    @cron("0 8 * * ? *")  # Every day at 8 AM UTC
    class DailyCleanup(ScheduleEndpoint):
        def on_schedule(self, event, context):
            # Your scheduled task logic here
            return {"status": "completed"}

logger
def handle(self, event, context):
29    def handle(self, event, context):
30        """
31        Entry point for Lambda invocation from EventBridge.
32        """
33        try:
34            return self.on_schedule(event, context)
35        except Exception as e:
36            self.logger.error(f"Error executing scheduled task {self.__class__.__name__}: {e}")
37            raise

Entry point for Lambda invocation from EventBridge.

def on_schedule(self, event, context):
39    def on_schedule(self, event, context):
40        """
41        Override this method to implement your scheduled task logic.
42
43        :param event: The EventBridge event payload
44        :param context: Lambda context object
45        :return: Result of the scheduled task
46        """
47        raise NotImplementedError("Subclasses must implement on_schedule()")

Override this method to implement your scheduled task logic.

Parameters
  • event: The EventBridge event payload
  • context: Lambda context object
Returns

Result of the scheduled task