Continuous Data Sync Across Heterogeneous Persistent Data Systems


INTRODUCTION

Often a need arises to migrate data from one system to another. These persistent data systems, the source and the destination, could be entirely different and come from different vendors.
The migration could be driven by a change in requirements or by technology advancements.

Add to this the changes in the tier above, which makes use of the persistent system.
To make sure that everything works fine on the new system, you may plan to route a small percentage of traffic to the new system and compare its results with those of the old stack.

To calibrate properly and find the differences between the result sets of the old and new systems, the task at hand is to synchronize the data across the two systems, even though each system stores it differently.

If that’s the case, this article can help you achieve Data Synchronization across Heterogeneous Systems on an ongoing basis.

This article presents a concept for seamlessly moving data, incrementally, from your current data storage system to a different data storage system, be it on premise or in the cloud.

TERMS USED

Batch: A collection of data records to be moved across
BatchState: Represents the Status of Batch Transfer, whether it is IN_PROGRESS, FAILED, COMPLETED
Metadata: Represents the batch details which will help in detecting the next batch of data to be synchronized

WHICH COMPONENTS ARE INVOLVED?

Data Source: The actual source containing the original data to be synchronized
Data Destination: The persistent system to which you want your data to be moved
Syncer Component: Responsible for detecting the incremental changes and synchronizing them
Transformer Component: Responsible for transforming the source data structure into the destination data structure. This is required if you restructure the data.
Tracker System: Responsible for storing the status and details of the last batch of data being synced

The diagram below depicts the problem statement: syncing on-premise RDBMS data to a NoSQL store, MongoDB, in the AWS cloud.

[Figure: Pictorial_HL]

WHY INCREMENTALLY?

You may have a huge amount of data in your storage system that you cannot move in a single operation. Resource constraints such as memory or network bandwidth may hinder the synchronization.

And what if this data is changed frequently by business users? Doing a full synchronization each time can prove costly.

How can we reduce this cost? How do we increase the chances of successful data synchronization?

How can we make this process resilient and resume from the point where it stopped or failed the last time?

How about splitting up the Data to be synchronized?

How about defining a batch of data, pulling only the data of this batch, and then transferring it?

In order to accomplish this, we need to store details from which we can determine how much data we have already synced and which batch of data we need to sync next.

HOW IT WORKS?

Before we go further into the steps involved, let's understand the batch metadata.

WHAT COULD BE BATCH METADATA?
{
"batchOffset" : 0,
"batchSize" : 20,
"migrationStartDateTime" : NumberLong(1486026714594),
"migrationEndDateTime" : NumberLong(1486026718245),
"rulesUpdateDateTimeBeginPickedUpForMigration" : null,
"rulesUpdateDateTimeEndPickedUpForMigration" : null,
"status" : "COMPLETED",
"remarks" : "",
"isIncrementalModeOn" : false
}

batchOffset is the marker. Based on the status, you can compute where to begin or resume the process. If the last batch was synced successfully, the next batch starts at batchOffset + batchSize; if the last batch failed, it starts again at batchOffset.
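The resume rule can be sketched in a few lines of Java. This is only an illustration: the class and method names (BatchMetadata, BatchPlanner, nextOffset) are hypothetical, and treating a leftover IN_PROGRESS batch like a failed one is an assumption, not something the metadata above prescribes.

```java
// Hypothetical types for illustration; only the fields needed for the resume rule are modelled.
enum BatchState { IN_PROGRESS, FAILED, COMPLETED }

final class BatchMetadata {
    final long batchOffset;
    final int batchSize;
    final BatchState status;

    BatchMetadata(long batchOffset, int batchSize, BatchState status) {
        this.batchOffset = batchOffset;
        this.batchSize = batchSize;
        this.status = status;
    }
}

final class BatchPlanner {
    /** Returns the offset at which the next sync attempt should start. */
    static long nextOffset(BatchMetadata last) {
        if (last == null) {
            return 0L; // nothing has been synced yet, start from the beginning
        }
        // Advance only when the last batch completed; otherwise retry the same batch.
        // Assumption: a stale IN_PROGRESS batch is retried just like a FAILED one.
        return last.status == BatchState.COMPLETED
                ? last.batchOffset + last.batchSize
                : last.batchOffset;
    }
}
```

With the sample metadata shown earlier (batchOffset 0, batchSize 20, status COMPLETED), the next batch would start at offset 20.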

batchSize denotes the number of records you want to sync in a single operation, and thus also indicates the amount of data.
It should be neither too small (which results in more round trips and more processing time) nor too big (which requires more resources such as memory and network bandwidth).

status denotes the sync operation Status of the batch

isIncrementalModeOn denotes whether the sync process is pulling only the incremental updates (including additions). When it is true, the source data has already been completely synchronized once.

rulesUpdateDateTimeBeginPickedUpForMigration and rulesUpdateDateTimeEndPickedUpForMigration denote the time boundaries for incremental updates. These are useful in pulling up the incremental changes during this time period.

migrationStartDateTime and migrationEndDateTime are useful for tracking purposes, to determine how much time this batch sync took.

With this information, let’s see the sequence of events which happen to sync the batch of data.

The process is initiated or resumed with Syncer component.

  1. Syncer pulls up the last migrated batch details from the Tracker system.
  2. Using Batch Metadata, it identifies the next batch of data to be synchronized.
    It makes an entry into Tracker System to store the next batch metadata with IN_PROGRESS status.
  3. It then builds the query and pulls up the records from the Source system as per the next batch metadata. You can use any ORM or persistence API, such as Hibernate or JPA, to get the data.
  4. It then delegates to Transformer to transform the source data structure to destination data structure.
  5. With the transformed data, it identifies the data to be created and the data to be updated, and splits the data accordingly.
  6. It then sends the data to the Destination System.
  7. Depending upon the operation status, it marks the Batch as either COMPLETED or FAILED.

This sequence of steps repeats until there is no more data to sync.

At this point, isIncrementalModeOn is saved as TRUE in the Tracker system. From then on, the Syncer can tweak the query to pull the data records for a time window.
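The sequence of steps can be sketched as a small in-memory loop. Everything here is a stand-in chosen for illustration: SyncLoopSketch, TrackerEntry and runOnce are hypothetical names, a List plays the Source, a Map plays the Destination, and the incremental time-window mode is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical tracker record; a real Tracker System would persist this.
final class TrackerEntry {
    final int offset;
    final int size;
    String status; // IN_PROGRESS, COMPLETED or FAILED

    TrackerEntry(int offset, int size, String status) {
        this.offset = offset;
        this.size = size;
        this.status = status;
    }
}

final class SyncLoopSketch {
    final List<TrackerEntry> tracker = new ArrayList<>();            // stands in for the Tracker System
    final Map<Integer, String> destination = new LinkedHashMap<>();  // stands in for the Destination

    /** Syncs one batch; returns false once the source has no more records. */
    boolean runOnce(List<String> source, int batchSize, Function<String, String> transformer) {
        // Step 1: read the last batch details from the tracker.
        TrackerEntry last = tracker.isEmpty() ? null : tracker.get(tracker.size() - 1);
        // Step 2: derive the next offset and record the batch as IN_PROGRESS.
        int offset = last == null ? 0
                : "COMPLETED".equals(last.status) ? last.offset + last.size : last.offset;
        if (offset >= source.size()) {
            return false;
        }
        TrackerEntry current = new TrackerEntry(offset, batchSize, "IN_PROGRESS");
        tracker.add(current);
        try {
            // Step 3: pull only the records of this batch from the source.
            List<String> batch = source.subList(offset, Math.min(offset + batchSize, source.size()));
            // Step 4: transform the whole batch before writing anything.
            List<String> transformed = new ArrayList<>();
            for (String record : batch) {
                transformed.add(transformer.apply(record));
            }
            // Steps 5 and 6: write keyed by source position, so a retry overwrites instead of duplicating.
            for (int i = 0; i < transformed.size(); i++) {
                destination.put(offset + i, transformed.get(i));
            }
            // Step 7: mark the outcome.
            current.status = "COMPLETED";
        } catch (RuntimeException ex) {
            current.status = "FAILED";
        }
        return true;
    }
}
```

Calling runOnce in a loop until it returns false walks through the source batch by batch; a batch marked FAILED is picked up again on the next call because the offset does not advance.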

 


BATCH PROCESS STATE

If you want primary and secondary sync processes in order to guarantee high availability of the sync process, you need to maintain and detect the various states of a sync process. With this data, you can ensure that no two sync processes run at the same time.

[Figure: BatchProcessStates]

BATCH STATES aka STATUS

Every individual batch of data goes through a few states. The diagram below represents the various states a batch goes through in a sync process.

[Figure: BatchStates]

THINGS TO KEEP IN MIND

  • Idempotency and Duplicate Prevention:

We are transferring a batch of records. Therefore, a batch may partially succeed, meaning that a few records got synced and the rest failed for some reason. In such cases, if you retry posting the data, the same data may get saved twice or more. To prevent this, determine which records have to be inserted and which have to be updated before writing. Unique indexes or a similar mechanism can help.
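A minimal sketch of that split, assuming each record carries a stable key and that the keys already present in the destination can be looked up (for example through a unique index). The names SplitResult and IdempotentSplit are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical holder for the two groups of record keys.
final class SplitResult {
    final List<String> toCreate = new ArrayList<>();
    final List<String> toUpdate = new ArrayList<>();
}

final class IdempotentSplit {
    /**
     * Splits a batch into records to insert and records to update.
     * keysAlreadyInDestination is assumed to come from a lookup against the destination.
     */
    static SplitResult split(List<String> batchKeys, Set<String> keysAlreadyInDestination) {
        SplitResult result = new SplitResult();
        for (String key : batchKeys) {
            if (keysAlreadyInDestination.contains(key)) {
                result.toUpdate.add(key); // already synced once: update instead of inserting again
            } else {
                result.toCreate.add(key);
            }
        }
        return result;
    }
}
```

When a partially succeeded batch is retried, the records that made it through the first time land in toUpdate, so the retry does not create duplicates.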

  • Timezone Differences:

The Syncer system and the Data Source system can be in different timezones, or the source data may be stored in a specific timezone. So, if you are pulling up records based on a time window, make sure that the window boundaries are converted into the source system's timezone before querying.
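As a small illustration using java.time, a window boundary held as an absolute instant can be converted into the wall-clock time of the source system before it goes into the query. The class name SourceTimeWindow and the choice of zone are assumptions made for the example.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

final class SourceTimeWindow {
    /** Converts an absolute instant into the local date-time used by the source system. */
    static LocalDateTime toSourceLocal(Instant boundary, ZoneId sourceZone) {
        return LocalDateTime.ofInstant(boundary, sourceZone);
    }

    public static void main(String[] args) {
        // Assumption for the example: the source stores timestamps in Asia/Kolkata local time.
        Instant windowStart = Instant.parse("2017-02-02T09:11:54Z");
        System.out.println(toSourceLocal(windowStart, ZoneId.of("Asia/Kolkata")));
        // prints 2017-02-02T14:41:54
    }
}
```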

  • Security:

For sensitive data, you can enable SSL/TLS at the transport layer. You may also want to have authentication and authorization enabled on both data ends: the Source and Destination storage systems.

  • Hard Deletes:

Soft deletes, such as marking a business rule inactive, are taken care of by the Syncer process. But what if a tuple is hard deleted from the source storage? For hard deletes, you may have to use triggers to catch the deleted tuples.

  • Alert Mechanism to detect Stopped Sync Process:

The sync process itself can fail for any reason. Without an alerting mechanism, this may go unnoticed and the heterogeneous systems can go out of sync. To prevent this, log start and stop events into a sink such as Splunk and set up alerts on them.

WHAT QoS PARAMETERS ARE IMPLEMENTED?

  • Eventual Consistency
  • Guaranteed Sync
  • Fault Tolerance
  • Idempotency
  • Updates made while a sync is running are not missed

HOSTING MECHANISM

There can be multiple ways to host a Syncer process. Depending upon the traffic your consuming application takes, you can

  • Either host the syncer process under the same application which relies on this data
  • Or, host it under a separate process and schedule it using AWS Lambda or AWS Batch

ALTERNATIVES

AWS DMS also offers ongoing data migration; however, it supports only selected storage systems. At the time of implementing this, AWS DMS did not support MSSQL to MongoDB.

If you want to sync data to AWS RDS, AWS DMS can be used.

Also, if you have a huge amount of data, in the range of hundreds of TBs, limited network bandwidth, and want to get the transfer done quickly and only once, AWS Snowball is another offering you can use.


Health Checks : Detection, Reporting, Configuration of Server Instance\Process Health Status


In this article, I will talk about the health of a running instance: what can represent health, how we can detect it, and how we can use this information to make the system resilient.

Health, basically, defines how well an instance is responding. Health can be:

  • UP
  • DOWN

REAL LIFE PROBLEM
Imagine you reach a bank and find it closed. Or imagine you are standing in a queue at a bank counter, waiting to be served. By the time your turn arrives, the person sitting at the counter goes away. Maybe that person is not feeling well.

How would you feel in such a situation? Irritated? Frustrated?
What if you had been told upfront about this situation? Your time would not have been wasted. You would not have felt bad.

But what if someone else takes over that counter and starts serving you?

Now, imagine a pool of servers hosting a site which allows you to upload a video, say http://www.Youtube.com. You are trying to upload a small video of yours and, every time you try, you get an error after some time and the video cannot be uploaded.

Basically, software applications like http://www.youtube.com run on machines, physical or virtual, in order to get the desired results. Executing these applications requires the machine's local resources such as memory, CPU, network and disk, or other external dependencies.
These resources are limited, and executing multiple tasks concurrently creates a risk of contention and exhaustion.
It may happen that enough resources are not available, and the task execution will eventually fail.

In order to make the system resilient, one of the things that can be done is to proactively determine the health status and report it, for example to a load balancer or a service discovery component, whenever asked, to prevent or deal with failures.

Reporting the health status with proper HTTP status codes, like 200 for UP and 500 for DOWN, can be quite useful.

WHAT CAN DEFINE INSTANCE\PROCESS HEALTH?
Below is a list of some common metrics that can be useful in detecting the health of an instance:

  • Pending Requests
    • Container Level
    • Message Level
  • Latency Overhead – Defined as the TP99 latency added by this application/layer
    • TP99 or TP95 or TP75 as per your Service SLAs
  • Resources
    • % Memory Utilization – Leading towards OOM
    • % CPU Utilization
      • Host Level
      • Process Level
    • Number of Threads
  • Any Business KPI
  • External Dependencies Failures, optionally

Identifying the list of criteria above is important, and so is choosing the correct threshold or saturation value for each.
Values that are too low or too high can make the system unreliable.

WHY IS IT IMPORTANT?

A system is usually expected to be highly available and reliable. High availability can be achieved through redundancy, wherein multiple server instances run in parallel, processing the requests and thus meeting the demand.

What if one or more instances are running out of resources and are thus not able to meet the demand?

Detecting such a state at an appropriate time and taking an action can help in achieving High Availability and Reliability of the System.

It helps in making the system resilient against failures.

ACTIONS ON DETECTING UNHEALTHY

  • REPLENISH thru REBOOT: If you have a limited server pool and cannot increase its capacity, the unhealthy machine has to be restarted\rebooted in order to get it back to a healthy state.
  • REPLACE: If you have unlimited server capacity or are using a cloud computing platform such as AWS, Azure or Google Cloud, then rather than rebooting the machine you have the option of starting a new machine and removing the old unhealthy machine from request processing.

Once an instance is detected as unhealthy, it should be replenished or replaced.
Either the unhealthy instance is rebooted to get it back to a healthy state, or it is replaced with a new server that is put behind the load balancer while the old one is removed from it.

OTHER CONSIDERATIONS

  • Do enable Connection Draining
  • Do configure Connection Draining timeout
  • Enable HealthCheck Response Caching
  • Scale before Declaring UnHealthy
  • Prefer Recent Trend before Declaring UnHealthy – configure unHealthy, healthy Thresholds

Settings such as connection draining prevent in-flight requests from being aborted prematurely.
Without them, data can be left in an inconsistent state.
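The "Prefer Recent Trend" consideration in the list above can be sketched as a small state tracker: the declared state flips only after a configured number of consecutive probes disagree with it. The class name HealthTrendTracker and its parameters are hypothetical; load balancers that offer healthy and unhealthy thresholds implement this for you.

```java
// Hypothetical helper: flips the declared state only after a run of consecutive contrary probes.
final class HealthTrendTracker {
    private final int unhealthyThreshold; // consecutive failures needed to declare DOWN
    private final int healthyThreshold;   // consecutive successes needed to declare UP again
    private boolean healthy = true;
    private int streak = 0;               // consecutive probes that disagree with the declared state

    HealthTrendTracker(int unhealthyThreshold, int healthyThreshold) {
        this.unhealthyThreshold = unhealthyThreshold;
        this.healthyThreshold = healthyThreshold;
    }

    /** Records one probe result and returns the declared state after taking it into account. */
    boolean record(boolean probeSucceeded) {
        if (probeSucceeded == healthy) {
            streak = 0; // the probe agrees with the declared state, so any contrary run is broken
            return healthy;
        }
        streak++;
        int needed = healthy ? unhealthyThreshold : healthyThreshold;
        if (streak >= needed) {
            healthy = !healthy;
            streak = 0;
        }
        return healthy;
    }
}
```

With thresholds of 3 and 2, for instance, a single failed probe does not mark the instance unhealthy; three failures in a row do, and two successes in a row bring it back.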

  • Report Health with Proper Http Status Codes
    • 200 for UP
    • 500 for DOWN
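As a framework-free illustration of reporting health through status codes, the sketch below uses the JDK's built-in com.sun.net.httpserver package. The class name HealthEndpointSketch, the /health path and the BooleanSupplier that supplies the current state are assumptions made for the example.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.function.BooleanSupplier;

final class HealthEndpointSketch {
    /** Maps the health flag to the status codes listed above. */
    static int statusCodeFor(boolean up) {
        return up ? 200 : 500;
    }

    /** Starts a tiny HTTP server exposing /health; passing port 0 lets the OS pick a free port. */
    static HttpServer start(int port, BooleanSupplier isUp) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/health", exchange -> {
            boolean up = isUp.getAsBoolean();
            byte[] body = (up ? "UP" : "DOWN").getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(statusCodeFor(up), body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }
}
```

A load balancer polling this endpoint only needs to look at the status code; the body is there for humans.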

CODE IMPLEMENTATION

Basically, what we need is to peek into the current metrics and evaluate the Health as UP or DOWN.

So, we need a HealthEvaluator, a list of HealthCriteria, some Operators and a Health definition.

public interface IHealthEvaluator {
    /**
     * Return an indication of health.
     * @return the health after consulting different metrics
     */
    Health health();
}
public final class CompositeMetricBasedHealthEvaluator implements IHealthEvaluator {
    /**
     * Instantiates an object of CompositeMetricBasedHealthEvaluator
     * @param healthCriteriaList List containing Metrics to be used for Health Evaluation
     * @param metricReadersList List containing Metric Readers
     */
    public CompositeMetricBasedHealthEvaluator(List<HealthCriteria<Number>> healthCriteriaList,
                                               List<MetricReader> metricReadersList) {
        this(healthCriteriaList, metricReadersList, null);
    }

    /**
     * Instantiates an object of CompositeMetricBasedHealthEvaluator
     * @param healthCriteriaList List containing Metrics to be used for Health Evaluation
     * @param metricReadersList List containing Metric Readers
     * @param metricsList List containing the Public Metrics
     */
    public CompositeMetricBasedHealthEvaluator(List<HealthCriteria<Number>> healthCriteriaList,
                                               List<MetricReader> metricReadersList,
                                               List<PublicMetrics> metricsList) {
        this.healthCriteriaList = CollectionUtils.isNotEmpty(healthCriteriaList)
                ? ListUtils.unmodifiableList(healthCriteriaList) : ListUtils.EMPTY_LIST;
        this.metricReaderList = metricReadersList;
        this.metricsList = metricsList;
    }

    /**
     * Return an indication of health.
     * @return the health after consulting different metrics
     */
    @Override
    public Health health() {
        Health.Builder curHealth = Health.up();
        // Overall status stays UP unless at least one criteria evaluates to DOWN
        Status status = Status.UP;
        for (HealthCriteria<Number> healthCriteria : this.healthCriteriaList) {
            String metricName = healthCriteria.getMetricName();
            if (StringUtils.isNotBlank(metricName)) {
                Metric<?> metric = this.getFirstMatchingMetric(metricName);
                if (metric != null) {
                    Status criteriaStatus = evaluate(healthCriteria, metric);
                    if (Status.DOWN.equals(criteriaStatus)) {
                        status = Status.DOWN;
                    }
                    curHealth.withDetail(metricName,
                            String.format("Value:%s, Status:%s", metric.getValue(), criteriaStatus));
                } else {
                    // Metric is not available, so health cannot be determined for it
                    curHealth.withDetail(metricName, Status.UNKNOWN);
                }
            }
        }

        curHealth.status(status);

        return curHealth.build();
    }

    private Metric getFirstMatchingMetric(String name) {
        Object metricProvider = this.selectedMetricProvider.get(name);

        if (metricProvider instanceof MetricReader) {
            return find((MetricReader) metricProvider, name);
        } else if (metricProvider instanceof PublicMetrics) {
            return find((PublicMetrics) metricProvider, name);
        }

        // Preference to use MetricReaders
        if (CollectionUtils.isNotEmpty(this.metricReaderList)) {
            for (MetricReader metricReader : this.metricReaderList) {
                Metric<?> metric = find(metricReader, name);
                if (metric != null) {
                    this.selectedMetricProvider.put(name, metricReader);
                    return metric;
                }
            }
        }

        // Fall back to PublicMetrics when no MetricReader knows the metric
        if (CollectionUtils.isNotEmpty(this.metricsList)) {
            for (PublicMetrics publicMetrics : this.metricsList) {
                Metric<?> metric = find(publicMetrics, name);
                if (metric != null) {
                    this.selectedMetricProvider.put(name, publicMetrics);
                    return metric;
                }
            }
        }

        return null;
    }

    private static Status evaluate(HealthCriteria healthCriteria, Metric metric) {
        int result = compare(metric.getValue(), healthCriteria.getThresholdOrSaturationLevel());
        ComparisonOperator op = healthCriteria.getOperator();

        if ((ComparisonOperator.EQUAL.equals(op) && result != 0) ||
                (ComparisonOperator.LESS_THAN.equals(op) && result >= 0) ||
                (ComparisonOperator.LESS_THAN_EQUAL.equals(op) && result > 0) ||
                (ComparisonOperator.GREATER_THAN.equals(op) && result <= 0) ||
                (ComparisonOperator.GREATER_THAN_EQUAL.equals(op) && result < 0)) {
            return Status.DOWN;
        }

        return Status.UP;
    }

    private static Metric<?> find(MetricReader reader, String name) {
        try {
            return reader.findOne(name);
        } catch (RuntimeException ex) {
            // Ignore the Runtime exceptions
            return null;
        }
    }

    private static Metric<?> find(PublicMetrics source, String name) {
        return (Metric<?>) CollectionUtils.find(source.metrics(),
                (met) -> StringUtils.equalsIgnoreCase(((Metric) met).getName(), name));
    }

    private static int compare(Number n1, Number n2) {
        if (n1 != null && n2 != null) {
            return Double.compare(n1.doubleValue(), n2.doubleValue());
        }

        if (n1 != null) {
            return 1;
        }

        if (n2 != null) {
            return -1; // Even for -ive numbers
        }
        return 0;
    }

    private final List<HealthCriteria<Number>> healthCriteriaList;
    private final List<PublicMetrics> metricsList;
    private final List<MetricReader> metricReaderList;
    private final Map<String, Object> selectedMetricProvider = new HashMap<>();
}

HealthCriteria defines 3 things: what has to be checked, its expected value (or a range) and the Operator. The value can be an integer, float, decimal etc.

public class HealthCriteria<TInput extends Number> {
    /**
     * Gets the Operator
     * @return Operator to be used for health evaluation
     */
    public ComparisonOperator getOperator() {
        return operator;
    }

    /**
     * Sets the Operator
     * @param operator Operator to be used for health evaluation
     */
    public void setOperator(ComparisonOperator operator) {
        this.operator = operator;
    }

    /**
     * Gets the Threshold or Saturation value against which health evaluation to be done
     * @return Threshold or Saturation value
     */
    public TInput getThresholdOrSaturationLevel() {
        return thresholdOrSaturationLevel;
    }

    /**
     * Sets the Threshold or Saturation value against which health evaluation to be done
     * @param thresholdOrSaturationLevel Threshold or Saturation value
     */
    public void setThresholdOrSaturationLevel(TInput thresholdOrSaturationLevel) {
        this.thresholdOrSaturationLevel = thresholdOrSaturationLevel;
    }

    /**
     * Gets the name of the metric to be used for health evaluation
     * @return Metric name
     */
    public String getMetricName() {
        return metricName;
    }

    /**
     * Sets the name of the metric to be used for health evaluation
     * @param metricName Metric name
     */
    public void setMetricName(String metricName) {
        this.metricName = metricName;
    }

    private String metricName;
    private TInput thresholdOrSaturationLevel;
    private ComparisonOperator operator;
}

@Configuration
@ConfigurationProperties("healthIndicator")
public class HealthCriteriaList {
    public List<HealthCriteria<Number>> getCriterias() {
        return criterias;
    }

    public void setCriterias(List<HealthCriteria<Number>> criterias) {
        this.criterias = criterias;
    }

    private List<HealthCriteria<Number>> criterias;
}

Some basic Operators that can be supported are:

public enum ComparisonOperator {
    EQUAL,
    LESS_THAN,
    LESS_THAN_EQUAL,
    GREATER_THAN ,
    GREATER_THAN_EQUAL;
}

Using the above code, you can evaluate the health based on metrics and plug it into any application, be it Spring Boot, Dropwizard, CXF etc.

A Spring Boot adapter like the one below can plug into the application and start evaluating the health based on metrics.

public final class MetricBasedSpringBootAdapter implements HealthIndicator {
    /**
     * Instantiates an object of MetricBasedSpringBootAdapter
     * @param healthEvaluator Reference to an instance of IHealthEvaluator impl
     */
    public MetricBasedSpringBootAdapter(IHealthEvaluator healthEvaluator) {
        Assert.notNull(healthEvaluator, "Underlying HealthEvaluator");
        this.underlyingHealthEvaluator = healthEvaluator;
    }

    /**
     * Return an indication of health.
     * @return the health for Server Instance after consulting different metrics
     */
    @Override
    public Health health() {
        return this.underlyingHealthEvaluator.health();
    }

    private final IHealthEvaluator underlyingHealthEvaluator;
}

HOW IT WORKS IN SPRINGBOOT?

Spring Boot includes a number of built-in endpoints.
One of the endpoints is the health endpoint which provides basic application health information.
By default, the health endpoint is mapped to /health

On invoking this endpoint, Health information is collected from all HealthIndicator beans defined in your
ApplicationContext and based on Health Status returned by these HealthIndicators, Aggregated Health Status is returned.

Spring Boot includes a number of auto-configured HealthIndicators and allows us to write our own.

Since we keep track of certain metrics in our applications, we wanted the ability to evaluate Health based on the
values of certain metrics. For example, if the number of threads exceeds ‘n’, Health shall be reported as DOWN.

For this purpose, CompositeMetricBasedHealthEvaluator is implemented.
It relies on either MetricReaders or PublicMetrics to get the current values of the metrics and evaluates the
Health accordingly.

It reports the individual Health of every configured Health indicator criteria and reports the overall Health as
DOWN if any of them is DOWN.

For Unavailable Metric, Health cannot be determined and thus reported as UNKNOWN for that specific metric.
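The aggregation rule described above (overall DOWN if any criteria is DOWN, UNKNOWN for a metric that cannot be found) can be shown without any Spring types. This is a simplified, hypothetical stand-in for the evaluator, not the evaluator itself: it supports only two operators and reads metrics from a plain Map.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AggregationSketch {
    enum Op { LESS_THAN, GREATER_THAN } // subset of ComparisonOperator, kept short for the example

    static final class Criterion {
        final String metric;
        final Op op;
        final double threshold;

        Criterion(String metric, Op op, double threshold) {
            this.metric = metric;
            this.op = op;
            this.threshold = threshold;
        }
    }

    /** Returns one entry per criterion plus an "overall" entry. */
    static Map<String, String> evaluate(List<Criterion> criteria, Map<String, Double> metrics) {
        Map<String, String> details = new LinkedHashMap<>();
        boolean anyDown = false;
        for (Criterion c : criteria) {
            Double value = metrics.get(c.metric);
            if (value == null) {
                details.put(c.metric, "UNKNOWN"); // metric unavailable: no verdict for it
                continue;
            }
            // The operator states the condition that must hold for the criterion to be UP.
            boolean up = c.op == Op.LESS_THAN ? value < c.threshold : value > c.threshold;
            anyDown |= !up;
            details.put(c.metric, up ? "UP" : "DOWN");
        }
        details.put("overall", anyDown ? "DOWN" : "UP");
        return details;
    }
}
```

For example, with criteria "threads LESS_THAN 100" and "queue LESS_THAN 50" and current values threads = 120 and queue = 5, the details are threads DOWN, queue UP and overall DOWN.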

STEPS TO ENABLE IN SPRINGBOOT

* Enable Health Endpoint if not enabled already
* Configure custom endpoint name optionally and other parameters like Caching of results etc
* Configure MetricReader(s) and\or PublicMetric(s)
* Configure the HealthIndicator Metric Criterias
* Instantiate CompositeMetricBasedHealthEvaluator
  * Inject the MetricReaders and\or PublicMetrics and Criterias configured above
* Instantiate and Inject MetricBasedSpringBootAdapter into Spring Application Context
  * Inject CompositeMetricBasedHealthEvaluator while instantiating
* Disable\Enable Auto-Configured HealthIndicators

That’s all that needs to be done to enable Health Evaluation using Metrics.

HOW TO ENABLE HEALTH ENDPOINT?

One of the ways is to enable it through Application Configuration YAML file.
In your application.yml file, put the following configuration:

endpoints:
  health:
    enabled: true
    time-to-live: 1000

With the above configuration, the health endpoint is enabled and its results will be cached for 1000ms.
Default time-to-live = 1000ms.
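To make the effect of time-to-live concrete, here is a generic sketch of a value that is recomputed only after it has expired. It illustrates the idea of caching health results and is not how Spring Boot implements it; TtlCachedValue and the injectable clock are hypothetical.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical helper: caches the loader's result for ttlMillis, then reloads on the next call.
final class TtlCachedValue<T> {
    private final Supplier<T> loader;
    private final long ttlMillis;
    private final LongSupplier clock; // injectable so that expiry can be tested without sleeping
    private T cached;
    private long loadedAt;
    private boolean loaded;

    TtlCachedValue(Supplier<T> loader, long ttlMillis, LongSupplier clock) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    synchronized T get() {
        long now = clock.getAsLong();
        if (!loaded || now - loadedAt >= ttlMillis) {
            cached = loader.get(); // the expensive health evaluation would happen here
            loadedAt = now;
            loaded = true;
        }
        return cached;
    }
}
```

In production code the clock would typically be System::currentTimeMillis. Caching protects the instance from frequent polling, at the cost of reporting a state that may be up to one time-to-live old.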

HOW TO CONFIGURE HEALTH INDICATOR METRIC CRITERIAS?

1) **VIA APPLICATION CONFIGURATION YAML file**

One of the ways is to configure it in Application Configuration YAML file itself.
In your application.yml file, put the following configuration:

healthIndicator:
  criterias:
    - metricName: threads
      thresholdOrSaturationLevel: 100
      operator: LESS_THAN
    - metricName: anotherMetricNameGoesHere
      thresholdOrSaturationLevel: 100.23
      operator: ANY_COMPARISON_OPERATOR(EQUAL, LESS_THAN, LESS_THAN_EQUAL, GREATER_THAN, GREATER_THAN_EQUAL)

With the above configuration, 2 Criterias are defined and **HealthCriteriaList** object gets instantiated using
Configuration Annotation.

Here, Thread Criteria specifies that for Health to be **UP**, number of threads < 100.
If NumberOfThreads >= 100, Health will be reported as **DOWN**

Likewise, more criterias can be defined.

Note that
* **metricName** can contain ‘.’ character as well.
* **thresholdOrSaturationLevel** can have any Valid Number, be it Integer or Decimal Number
* **operator** can be any valid value from ComparisonOperator enum.

2) **Same Configuration can be done through code**

List<HealthCriteria<Number>> criterias = new ArrayList<>();

HealthCriteria<Number> criteria = new HealthCriteria<>();
final String expMetricName = "threads";
criteria.setMetricName(expMetricName);
criteria.setThresholdOrSaturationLevel(100);
criteria.setOperator(ComparisonOperator.LESS_THAN);

criterias.add(criteria);

HOW TO PLUGIN MetricBasedSpringBootAdapter?

MetricBasedSpringBootAdapter implements HealthIndicator interface. Thus, simply injecting it into
Spring Application Context will plugin this component for Health Evaluation.

The below configuration instantiates MetricBasedSpringBootAdapter with MetricReaders only.
Both Parameters, healthCriteriaList and metricReaderList are injected automatically through Spring application
context. This happens due to auto configuration.

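@Bean
public MetricBasedSpringBootAdapter metricBasedHealthIndicator(
        HealthCriteriaList healthCriteriaList,
        List<MetricReader> metricReaderList) {
    // The adapter takes an IHealthEvaluator, so wrap the criterias and readers in the evaluator first
    return new MetricBasedSpringBootAdapter(
            new CompositeMetricBasedHealthEvaluator(healthCriteriaList.getCriterias(), metricReaderList));
}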

OR,

@Bean
public MetricBasedSpringBootAdapter metricBasedHealthIndicator(
        List<HealthCriteria<Number>> healthCriteriaList,
        List<MetricReader> metricReaderList) {
    return new MetricBasedSpringBootAdapter(
            new CompositeMetricBasedHealthEvaluator(healthCriteriaList, metricReaderList));
}

OR,

@Bean
public MetricBasedSpringBootAdapter metricBasedHealthIndicator(
        HealthCriteriaList healthCriteriaList,
        List<MetricReader> metricReaderList,
        List<PublicMetrics> publicMetricsList) {
    // PublicMetrics are consulted only when no MetricReader can supply the metric
    return new MetricBasedSpringBootAdapter(
            new CompositeMetricBasedHealthEvaluator(healthCriteriaList.getCriterias(),
                    metricReaderList, publicMetricsList));
}

The above configuration can be useful wherein MetricReader is not available to read the Metric but Metric is
available publicly through PublicMetrics interface.
With the above configuration, all parameters are injected automatically by Spring.

Things to Note
* Name of Bean minus Suffix HealthIndicator (metricBased) is what is reported as HealthIndicator Name.
* AutoConfiguration of MetricReaders, PublicMetrics or Configuration could be disabled. If this is the case, either
enable AutoConfiguration or manually instantiate MetricReaders, PublicMetrics etc
* PublicMetrics interface can be expensive depending upon the number of metrics being maintained. Use it only if
Custom MetricReader cannot be written or Metrics are small in number.