
My First Lambda – Not Just Hello World


Amazon Web Services, aka AWS, provides many cloud services.
In this post, I want to share my learnings and experiences while working with one of them: LAMBDA.

I’ll begin by explaining our use case a bit, and then move on to implementing and deploying a Lambda.

USE CASE

I was designing and implementing a requirement to ticket air bookings. Without ticketing, a user cannot board a flight and thus cannot fly.

MORE ABOUT TICKETING PROCESS

Ticketing is an orchestration of a series of steps. Some require business logic evaluation, and some require interacting with different third-party services multiple times over the network.

This process can be seen as event driven: it can be done asynchronously, with retry and scheduling capabilities, and it involves interacting with third-party services over the network.

It has to be completed within the time constraints set by the airlines/GDSes; otherwise, the user cannot fly.

After gathering requirements, it seemed to be a use case for building a bot (a Ticketing Bot, more specifically), and the Executor-Scheduler-Supervisor-Agent pattern fit very well technically.

WHAT IS “EXECUTOR-SCHEDULER-SUPERVISOR-AGENT”?

It’s a pattern wherein roles and responsibilities are clearly separated out to different actors/components.
Executor, Scheduler, Supervisor and Agent represent different blocks, and each is responsible for performing a clearly defined task.

The Executor is responsible for executing the orchestration, and likewise for the others. You may choose to use persistent workflow frameworks or queues for orchestration execution.

WHERE DOES LAMBDA FIT IN OUR CASE?

The ticketing process has to be completed for multiple bookings. After all, multiple users are making bookings on our site.

This demands multiple executors running in parallel, each executing an orchestration independently with no interference.

Obviously, you will want each executor to pick a different booking for ticketing.
For this, you will have synchronization and other checks in place so that once a booking is owned by an executor, it does not get executed by another executor.

Let’s say we have a strategy where, once a booking is picked by an executor, the executor updates a work item with its ownership and a timestamp, and changes its status to In_Progress to reflect that the ticketing process has kicked in.
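
A minimal in-memory sketch of that claim step, assuming a hypothetical WorkItemClaimDemo class and a ConcurrentHashMap standing in for the real data store. The point it illustrates is that the ownership update is a single compare-and-set, so two executors racing for the same booking cannot both win. With MongoDB, the analogous move would be a conditional update that matches on the current status; that mapping is an assumption, not the code we actually ran.

```java
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class WorkItemClaimDemo {
    enum Status { QUEUED, IN_PROGRESS }

    // Hypothetical work item: booking id, owning executor, claim timestamp and status.
    // It does not override equals, so map.replace below compares by identity.
    static final class WorkItem {
        final String bookingId;
        final String owner;       // null while the booking is still queued
        final Instant claimedAt;  // null while the booking is still queued
        final Status status;

        WorkItem(String bookingId, String owner, Instant claimedAt, Status status) {
            this.bookingId = bookingId;
            this.owner = owner;
            this.claimedAt = claimedAt;
            this.status = status;
        }
    }

    // Stands in for the real data store holding work items
    private final Map<String, WorkItem> store = new ConcurrentHashMap<>();

    void enqueue(String bookingId) {
        store.put(bookingId, new WorkItem(bookingId, null, null, Status.QUEUED));
    }

    // Claims a queued booking for executorId; returns empty if it is unknown or already owned
    Optional<WorkItem> claim(String bookingId, String executorId, Instant now) {
        WorkItem current = store.get(bookingId);
        if (current == null || current.status != Status.QUEUED) {
            return Optional.empty();
        }
        WorkItem claimed = new WorkItem(bookingId, executorId, now, Status.IN_PROGRESS);
        // replace(key, expected, updated) succeeds only if the stored item is still the one we read
        return store.replace(bookingId, current, claimed) ? Optional.of(claimed) : Optional.empty();
    }

    public static void main(String[] args) {
        WorkItemClaimDemo demo = new WorkItemClaimDemo();
        demo.enqueue("BKG-1");
        Instant now = Instant.now();
        System.out.println(demo.claim("BKG-1", "executor-A", now).isPresent()); // true
        System.out.println(demo.claim("BKG-1", "executor-B", now).isPresent()); // false
    }
}
```

The timestamp written during the claim is what later makes a crashed executor's booking detectable as stuck.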

Now think of scenarios wherein

  • an executor (server) performing a ticketing process crashes in the middle of the process
  • a server has been put out of rotation due to being unhealthy
  • or, you want to deploy incremental changes, which may involve halting/interrupting the currently executing ticketing processes.

The third scenario can be dealt with by publishing events to reach a consistent state and stop further processing.

But what about the other scenarios? In those, ticketing processes will appear to be running with In_Progress status while that’s not the case.

How will you ensure that those Processes get completed later?

We definitely want to complete the ticketing process at any cost.

What if we had something that could detect such stuck bookings and reprocess them from the last checkpoint?

Let’s just focus on the Supervisor.

What is the role of “Supervisor”?

The Supervisor is the component responsible for detecting such stuck bookings and queueing them for re-processing. Note that it does not start executing those processes; instead, it just re-queues them so that an executor can pick them up again.
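
The Supervisor's detection rule can be sketched in memory. This assumes a hypothetical InMemoryQueueingService that mirrors the move(queue1, timeInSeconds, queue2) idea used in the Supervisor code later in this post, with an extra now parameter added (also an assumption) so the age check is deterministic. A real implementation would run the same filter as a query against the data store.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class InMemoryQueueingService {
    // Hypothetical queue entry: which queue a booking sits in and when it was last acted upon
    static final class Entry {
        final String bookingId;
        String queue;
        Instant lastUpdated;

        Entry(String bookingId, String queue, Instant lastUpdated) {
            this.bookingId = bookingId;
            this.queue = queue;
            this.lastUpdated = lastUpdated;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    synchronized void add(String bookingId, String queue, Instant lastUpdated) {
        entries.add(new Entry(bookingId, queue, lastUpdated));
    }

    // Moves entries untouched in fromQueue for at least timeInSeconds into toQueue and returns how many moved.
    // It only re-queues: an executor still has to pick each booking up from toQueue.
    synchronized int move(String fromQueue, int timeInSeconds, String toQueue, Instant now) {
        Instant cutoff = now.minus(Duration.ofSeconds(timeInSeconds));
        int moved = 0;
        for (Entry e : entries) {
            if (e.queue.equals(fromQueue) && !e.lastUpdated.isAfter(cutoff)) {
                e.queue = toQueue;
                e.lastUpdated = now;
                moved++;
            }
        }
        return moved;
    }

    synchronized long count(String queue) {
        return entries.stream().filter(e -> e.queue.equals(queue)).count();
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-01T12:00:00Z");
        InMemoryQueueingService service = new InMemoryQueueingService();
        service.add("BKG-1", "IN_PROGRESS", now.minusSeconds(600)); // untouched for 10 minutes
        service.add("BKG-2", "IN_PROGRESS", now.minusSeconds(30));  // still being worked on
        System.out.println(service.move("IN_PROGRESS", 300, "REPROCESS", now)); // 1
    }
}
```

Because the Supervisor only flips the queue, running it twice is harmless: the second pass finds nothing old enough to move.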

In our case, the Supervisor has to connect to queues/data stores hosted in a VPC.
OK. What are the other expectations from this Supervisor?

  1. It has to be available. You would not want your Supervisor to be down for a long time. However, you would want that when 
  2. A single Supervisor can fulfill the need. There is no need to run multiple Supervisors at a time.
  3. The Supervisor runs periodically.
  4. The Supervisor runs in the background.
  5. The Supervisor has no state attached to it.

All the above expectations made LAMBDA a good fit in our case.

Enough of the story 🙂 Before you start cursing me, let’s start building a Lambda.

LAMBDA

Lambda is a function that is executed in the AWS Cloud environment based on certain trigger policies. A trigger can be a scheduled event, an S3 event, or the like.

Refer to the AWS documentation for more information.

BUILDING AND DEPLOYING LAMBDA

Building a Lambda is simple. It requires a function that is executed based on a trigger policy. At the time of writing, a Lambda can be written in Java, Python or Node.js.

Let’s build a Lambda in Java.

  1. Create a class (any name; here, Supervisor) and a handler method (any name), as below:
    import com.amazonaws.services.lambda.runtime.Context;

    public class Supervisor {
        // Handler method: Lambda invokes it each time the trigger fires
        public void queueStuckOrdersForReprocessing(Context context) {
            // Implement this function as per the tasks to be accomplished
        }
    }



  2. Implement the handler function keeping in mind the task you want to accomplish. In our case, we wanted to detect stuck bookings and queue them for re-processing.
    import com.amazonaws.services.lambda.runtime.Context;
    import com.amazonaws.services.lambda.runtime.LambdaLogger;

    public class Supervisor {
        private static final String IN_PROGRESS = "IN_PROGRESS";
        private static final String REPROCESS = "REPROCESS";

        public void queueStuckOrdersForReprocessing(Context context) {
            LambdaLogger logger = context.getLogger();
            logger.log("Supervisor Cycle Started");

            // Problem: Time consuming, while the actual task is pretty small
            // Problem: How can I initialize based on environment or profile, like Spring Profiles?
            QueueingService queueingService = this.initialize();

            logger.log("Supervisor Initialized");

            // Problem: How can I execute multiple tasks in parallel?
            this.buildTask(queueingService, logger).run();

            logger.log("Supervisor Cycle Completed");
        }

        private String getProperty(String name) {
            return System.getenv(name);
        }

        private QueueingService initialize() {
            // Initializing could be time consuming,
            // e.g. you may be using MongoDB as a queue and connecting might take some time
            return new QueueingService() {
                /**
                 * Moves the products stuck in queue1 for past 'timeInSeconds' seconds to queue2
                 * @param queue1 Current Queue the Product is in
                 * @param timeInSeconds Time in seconds since product is not acted upon
                 * @param queue2 New Queue the Product shall be moved to
                 * @return No of Products got reset
                 */
                @Override
                public int move(String queue1, int timeInSeconds, String queue2) {
                    // your implementation here
                    return 0;
                }
            };
        }

        // Problem: Logger has to be passed everywhere we want to log
        private Runnable buildTask(QueueingService queueingService, LambdaLogger logger) {
            return () -> {
                int noOfProducts = queueingService.move(IN_PROGRESS,
                        Integer.parseInt(getProperty("IN_PROGRESS_AGE")),
                        REPROCESS);

                logger.log(String.format(
                        "Supervisor requeued '%d' Products for ReProcessing", noOfProducts));
            };
        }
    }

    // Queue abstraction used by the Supervisor (implementation not shown)
    interface QueueingService {
        int move(String queue1, int timeInSeconds, String queue2);
    }

    The above code works. However, it can be refactored and optimized further.

    Let’s assume that queues are maintained in a database, MongoDB (NoSQL).
    Initializing a MongoDB client can take a lot of time, while the actual task to be performed may not be that time consuming.

    You may ask: is there a way to initialize just once, and thus be more performant and consume fewer resources?

    Fortunately, there is.

    AWS says that Lambda container can be reused for subsequent invocations.

    Note the words "can be". AWS does not guarantee reuse, but there is a possibility.

    If that’s the case, to avoid re-initialization, how about maintaining fields and using them? We can simply keep QueueingService as a field (state) in the Supervisor class and reuse it.

    Below is the refactored code.

    import com.amazonaws.services.lambda.runtime.Context;
    import com.amazonaws.services.lambda.runtime.LambdaLogger;

    public class Supervisor {
        private static final String IN_PROGRESS = "IN_PROGRESS";
        private static final String REPROCESS = "REPROCESS";

        // Fields outlive a single invocation whenever AWS reuses the container
        private boolean isInitialized = false;
        private QueueingService queueingService;
        private LambdaLogger logger;

        public void queueStuckOrdersForReprocessing(Context context) {
            logger = context.getLogger();
            logger.log("Supervisor Cycle Started");

            // Fields are initialized once and thus, on reuse, will not be re-initialized
            // Problem: How can I initialize based on environment or profile, like Spring Profiles?
            this.initialize();

            logger.log("Supervisor Initialized");

            // Problem: How can I execute multiple tasks in parallel?
            this.buildTask(this.queueingService).run();

            logger.log("Supervisor Cycle Completed");
        }

        private String getProperty(String name) {
            return System.getenv(name);
        }

        private void initialize() {
            if (!this.isInitialized) {
                // Initializing could be time consuming,
                // e.g. you may be using MongoDB as a queue and connecting might take some time
                this.queueingService = new QueueingService() {
                    /**
                     * Moves the products stuck in queue1 for past 'timeInSeconds' seconds to queue2
                     * @param queue1 Current Queue the Product is in
                     * @param timeInSeconds Time in seconds since product is not acted upon
                     * @param queue2 New Queue the Product shall be moved to
                     * @return No of Products got reset
                     */
                    @Override
                    public int move(String queue1, int timeInSeconds, String queue2) {
                        // your implementation here
                        return 0;
                    }
                };
                this.isInitialized = true;
            }
        }

        private Runnable buildTask(QueueingService queueingService) {
            return () -> {
                int noOfProducts = queueingService.move(IN_PROGRESS,
                        Integer.parseInt(getProperty("IN_PROGRESS_AGE")),
                        REPROCESS);

                logger.log(String.format(
                        "Supervisor requeued '%d' Products for ReProcessing", noOfProducts));
            };
        }
    }

    // Queue abstraction used by the Supervisor (implementation not shown)
    interface QueueingService {
        int move(String queue1, int timeInSeconds, String queue2);
    }

    Great. I still have another problem: I want to execute multiple tasks, not sequentially but in parallel.

    AWS does allow creating threads or thread pools, as long as the CPU and memory limits are not crossed. Refer to the AWS documentation.

    The code below makes a simple change: it creates a thread pool of size 1. Just increase the size and submit more tasks to run them in parallel.

    import com.amazonaws.services.lambda.runtime.Context;
    import com.amazonaws.services.lambda.runtime.LambdaLogger;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class Supervisor {
        private static final String IN_PROGRESS = "IN_PROGRESS";
        private static final String REPROCESS = "REPROCESS";

        private boolean isInitialized = false;
        private QueueingService queueingService;
        private LambdaLogger logger;

        public void queueStuckOrdersForReprocessing(Context context)
                throws InterruptedException, ExecutionException {
            logger = context.getLogger();
            logger.log("Supervisor Cycle Started");

            // Fields are initialized once and thus, on reuse, will not be re-initialized
            // Problem: How can I initialize based on environment or profile, like Spring Profiles?
            this.initialize();

            logger.log("Supervisor Initialized");

            ExecutorService executor = Executors.newFixedThreadPool(1);
            try {
                // Submit each task to the pool; submit more tasks (and grow the pool) to run them in parallel
                Future<?> task = executor.submit(this.buildTask(this.queueingService));
                // Block until the task finishes instead of spinning; get() rethrows any failure from the task
                task.get();
            } finally {
                // Stop the pool's threads so they do not accumulate across reused containers
                executor.shutdown();
            }

            logger.log("Supervisor Cycle Completed");
        }

        private String getProperty(String name) {
            return System.getenv(name);
        }

        private void initialize() {
            if (!this.isInitialized) {
                // Initializing could be time consuming,
                // e.g. you may be using MongoDB as a queue and connecting might take some time
                this.queueingService = new QueueingService() {
                    /**
                     * Moves the products stuck in queue1 for past 'timeInSeconds' seconds to queue2
                     * @param queue1 Current Queue the Product is in
                     * @param timeInSeconds Time in seconds since product is not acted upon
                     * @param queue2 New Queue the Product shall be moved to
                     * @return No of Products got reset
                     */
                    @Override
                    public int move(String queue1, int timeInSeconds, String queue2) {
                        // your implementation here
                        return 0;
                    }
                };
                this.isInitialized = true;
            }
        }

        private Runnable buildTask(QueueingService queueingService) {
            return () -> {
                int noOfProducts = queueingService.move(IN_PROGRESS,
                        Integer.parseInt(getProperty("IN_PROGRESS_AGE")),
                        REPROCESS);

                logger.log(String.format(
                        "Supervisor requeued '%d' Products for ReProcessing", noOfProducts));
            };
        }
    }

    // Queue abstraction used by the Supervisor (implementation not shown)
    interface QueueingService {
        int move(String queue1, int timeInSeconds, String queue2);
    }

    Another problem I have: I have different environments set up, and each environment has different settings; say, the MongoDB cluster is different.
    I want to package resource files in the jar and load them per environment, rather than configuring each setting as an environment variable.

    How can I initialize based on an environment?

    Once again, AWS comes to the rescue. It lets you specify environment variables while configuring the function, and these get passed to the Lambda function as environment variables on each execution.
    What if we set the environment name as a variable and, based on its value, load the matching resource file, like Spring loads configuration based on the active profile?

    Let’s see how this can be achieved.



    import com.amazonaws.services.lambda.runtime.Context;
    import com.amazonaws.services.lambda.runtime.LambdaLogger;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class Supervisor {
        private static final String MONGODB_URI_SETTINGNAME = "mongodb.uri";
        private static final String IN_PROGRESS_AGE_SETTINGNAME = "inprogress.ageInSeconds";
        private static final String ENVIRONMENT_SETTINGNAME = "environment";
        private static final String IN_PROGRESS = "IN_PROGRESS";
        private static final String REPROCESS = "REPROCESS";

        private boolean isInitialized = false;
        private String environment;
        private Properties properties;
        private QueueingService queueingService;
        private LambdaLogger logger;

        public void queueStuckOrdersForReprocessing(Context context)
                throws IOException, InterruptedException, ExecutionException {
            logger = context.getLogger();
            logger.log("Supervisor Cycle Started");

            // Fields are initialized based on environment and thus, on reuse, will not be re-initialized
            this.initialize();

            logger.log("Supervisor Initialized");

            ExecutorService executor = Executors.newFixedThreadPool(1);
            try {
                Future<?> task = executor.submit(this.buildTask(this.queueingService));
                // Block until the task finishes; get() rethrows any failure from the task
                task.get();
            } finally {
                executor.shutdown();
            }

            logger.log("Supervisor Cycle Completed");
        }

        private String getSystemEnv(String name) {
            return System.getenv(name);
        }

        // This is to get the profile based properties
        private String getProperty(String name) {
            return this.properties.getProperty(name);
        }

        // This does the initialization
        private void initialize() throws IOException {
            if (!this.isInitialized) {
                this.initializeProps();
                // e.g. build the MongoDB client from this profile's URI; it could be time consuming
                final String mongoUri = getProperty(MONGODB_URI_SETTINGNAME);
                this.queueingService = new QueueingService() {
                    /**
                     * Moves the products stuck in queue1 for past 'timeInSeconds' seconds to queue2
                     * @param queue1 Current Queue the Product is in
                     * @param timeInSeconds Time in seconds since product is not acted upon
                     * @param queue2 New Queue the Product shall be moved to
                     * @return No of Products got reset
                     */
                    @Override
                    public int move(String queue1, int timeInSeconds, String queue2) {
                        // your implementation here, e.g. using a client connected to mongoUri
                        return 0;
                    }
                };
                this.isInitialized = true;
            }
        }

        // Loads application-<environment>.properties packaged in the jar.
        // java.util.Properties reads flat key=value files; it cannot parse nested YAML.
        private void initializeProps() throws IOException {
            this.initializeEnvironment();
            if (this.properties == null) {
                final String propFileName = String.format("application-%s.properties", this.environment);
                try (InputStream in = Supervisor.class.getClassLoader().getResourceAsStream(propFileName)) {
                    if (in == null) {
                        throw new IOException("Resource not found on classpath: " + propFileName);
                    }
                    Properties loaded = new Properties();
                    loaded.load(in);
                    this.properties = loaded;
                }
            }
        }

        // Reads the environment name passed as a Lambda environment variable; defaults to prod
        private void initializeEnvironment() {
            this.environment = getSystemEnv(ENVIRONMENT_SETTINGNAME);
            if (this.environment == null || this.environment.trim().isEmpty()) {
                this.environment = "prod";
            }
        }

        private Runnable buildTask(QueueingService queueingService) {
            return () -> {
                int noOfProducts = queueingService.move(IN_PROGRESS,
                        Integer.parseInt(getProperty(IN_PROGRESS_AGE_SETTINGNAME)),
                        REPROCESS);

                logger.log(String.format(
                        "Supervisor requeued '%d' Products for ReProcessing", noOfProducts));
            };
        }
    }

    // Queue abstraction used by the Supervisor (implementation not shown)
    interface QueueingService {
        int move(String queue1, int timeInSeconds, String queue2);
    }
  3. Package your code into a jar using Maven or another build tool of your choice.

DEPLOYING LAMBDA

  1. Using the AWS CLI (Command Line Interface) to upload the jar along with other required/optional configuration
  2. Through the AWS Console, wherein you can provide different configurations

HOW CAN I/WE ACCOMPLISH THIS?

  1. We use different environments, like Test and Stress, before releasing to PROD, and in each environment we want different settings. How can we pass different settings, the way we can activate different profiles in Spring?
    [ANSWER]: AWS lets you configure environment variables that are passed to a Lambda on execution. While configuring a Lambda function, define which environment variables need to be passed to it, and then act on their values in your code.
  2. Our Lambda needs to connect to components/services deployed in our VPC, but on execution the Lambda function is not able to connect to them.
    [ANSWER]: AWS enforces security here. To allow connections, configure the Lambda with the proper subnet IDs and security groups of your VPC, and grant the required permissions.
  3. Our Lambda is not schedule driven; it is triggered by files written to S3. How can we pass event data to the Lambda?
    [ANSWER]: This blog focused on a Lambda with no event data; however, AWS supports different event sources. Refer to the AWS documentation. To pass event data to a Lambda function, the handler function can accept an input parameter in addition to the Context. The parameter can even be of a custom type, and AWS takes care of serialization and deserialization.
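
A sketch of a handler that takes a custom input type. BookingFileHandler, BookingFileEvent and their bucket/key fields are hypothetical. The flat shape does not match the JSON AWS sends for real S3 notifications (which nests the details under a Records array), so for a genuine S3 trigger you would more likely use the S3Event type from the aws-lambda-java-events library. Here the handler is called directly from main so the example runs without AWS.

```java
public class BookingFileHandler {
    // Hypothetical custom input type: Lambda's Java runtime can map a JSON payload such as
    // {"bucket": "...", "key": "..."} onto a POJO with a public no-arg constructor and setters
    public static class BookingFileEvent {
        private String bucket;
        private String key;

        public BookingFileEvent() {}

        public String getBucket() { return bucket; }
        public void setBucket(String bucket) { this.bucket = bucket; }
        public String getKey() { return key; }
        public void setKey(String key) { this.key = key; }
    }

    // Handler receiving the deserialized event; a Context parameter could be added as a second argument
    public String handleRequest(BookingFileEvent event) {
        return String.format("Processing s3://%s/%s", event.getBucket(), event.getKey());
    }

    public static void main(String[] args) {
        BookingFileEvent event = new BookingFileEvent();
        event.setBucket("my-bucket");
        event.setKey("bookings/1.json");
        // prints "Processing s3://my-bucket/bookings/1.json"
        System.out.println(new BookingFileHandler().handleRequest(event));
    }
}
```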

THINGS TO KEEP IN MIND

  1. AWS puts restrictions on executing a Lambda, be it the size of the deployment package or resources like CPU, memory and execution time. Always check the restrictions on the AWS site before choosing Lambda.
  2. Make sure that you understand the billing. Lambda is billed based on the number of requests and on execution time, weighted by the memory you configure.
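
Because the compute charge scales with memory multiplied by execution time, it helps to estimate usage in GB-seconds up front. The figures below (512 MB, about 2 seconds per run, one scheduled run per minute for 30 days) are hypothetical, and no prices are included: multiply the result by the per-GB-second rate, and the invocation count by the per-request rate, from the AWS Lambda pricing page for your region.

```java
public class LambdaComputeUsage {
    // Compute usage in GB-seconds: memory in GB x billed duration in seconds x number of invocations
    static double gbSeconds(int memoryMb, long billedDurationMs, long invocations) {
        return (memoryMb / 1024.0) * (billedDurationMs / 1000.0) * invocations;
    }

    public static void main(String[] args) {
        // Hypothetical Supervisor: scheduled once a minute for 30 days, 512 MB, about 2 s per run
        long invocations = 60L * 24 * 30;
        System.out.println(invocations);                        // 43200
        System.out.println(gbSeconds(512, 2_000, invocations)); // 43200.0
    }
}
```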

FEW MORE TIPS

  • Give your Lambda a good name
  • Tag your Lambda for proper identification and for enforcing security policies
  • Do not package redundant dependencies. They make your package heavy, and it may not even fit within Lambda's package size limits.
  • Have CloudWatch metrics-based alarms in place
  • Ensure that you do not over-configure your Lambda with all the subnet IDs of your VPC.
  • When deploying your Lambda in a VPC, think through scaling properly
  • Have proper logging for debugging and tracing purposes. Logs are available in CloudWatch as well.