3rd Party Persistence

Write-through or read-through data to and from a 3rd party persistent storage.

❗️

This is legacy Apache Ignite documentation

The new documentation is hosted here: https://ignite.apache.org/docs/latest/

Overview

Ignite can be used as a caching layer (aka an in-memory data grid) on top of an existing 3rd party database such as a Relational Database Management System (RDBMS) or Apache Cassandra. This mode is used to accelerate the underlying database that persists the data. Ignite provides an out-of-the-box implementation for reading and writing database records for any RDBMS and for Apache Cassandra. For other NoSQL databases, where read-through and write-through functionality is not available off-the-shelf, Ignite provides APIs to implement a custom cache store.

The JCache specification comes with the javax.cache.integration.CacheLoader and javax.cache.integration.CacheWriter APIs, which are used for read-through and write-through, respectively, from and to an underlying persistent store such as an RDBMS (e.g., Oracle or MySQL) or a NoSQL database (e.g., MongoDB or Couchbase). In addition to key-value operations, Ignite writes through the results of its SQL INSERT, UPDATE, and MERGE queries. However, note that Ignite SELECT queries never read through data from a 3rd party persistence.
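A sketch of this behavior, assuming a running Ignite node with a cache named "PersonCache" that has write-through enabled and a SQL-indexed Person type (all names here are illustrative):

```java
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.SqlFieldsQuery;

public class WriteThroughSqlSketch {
    public static void demo(Ignite ignite) {
        IgniteCache<Long, Person> cache = ignite.cache("PersonCache");

        // This INSERT is written through to the 3rd party database,
        // because SQL DML results participate in write-through.
        cache.query(new SqlFieldsQuery(
            "INSERT INTO Person (_key, name, salary) VALUES (?, ?, ?)")
            .setArgs(1L, "John", 1000)).getAll();

        // This SELECT only sees entries already loaded into the cache;
        // it never falls back to the 3rd party store.
        cache.query(new SqlFieldsQuery("SELECT name FROM Person")).getAll();
    }
}
```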


While Ignite allows you to configure the CacheLoader and CacheWriter separately, it is very awkward to implement a transactional store within 2 separate classes, as multiple load and put operations have to share the same connection within the same transaction. To mitigate that, Ignite provides the org.apache.ignite.cache.store.CacheStore interface which extends both CacheLoader and CacheWriter.

📘

Transactions

CacheStore is fully transactional and automatically merges into the ongoing cache transaction.

Read-Through and Write-Through

Providing a proper CacheStore implementation is important whenever read-through or write-through behavior is desired. Read-through means that the data will be read from the underlying persistent store whenever it’s not available in cache. Note that this is true only for cache get operations; Ignite SELECT queries never read through data from a 3rd party persistence. To execute select queries, data must be preloaded from the database into Ignite caches (explained below).
Write-through means that the data will be automatically persisted whenever it is updated in cache. All read-through and write-through operations will participate in overall cache transactions and will be committed or rolled back as a whole.

To configure read-through and write-through, you need to implement the CacheStore interface and set the cacheStoreFactory as well as readThrough and writeThrough properties of CacheConfiguration, as shown in the examples below.

Write-Behind Caching

In a simple write-through mode, each cache put and remove operation will involve a corresponding request to the persistent store and therefore the overall duration of the cache update might be relatively long. Additionally, an intensive cache update rate can cause an extremely high storage load.

For such cases, Ignite offers an option to perform an asynchronous persistent store update, also known as write-behind. The key concept of this approach is to accumulate updates and then asynchronously flush them to the underlying database as a bulk operation.

📘

Performance vs. Consistency

Enabling write-behind caching increases performance by performing asynchronous updates, but this can lead to a potential drop in consistency as some updates could be lost due to node failures or crashes.

The actual data persistence can be triggered by time-based events (the maximum time that a data entry can reside in the queue is limited), by queue-size events (the queue is flushed when its size reaches a certain threshold), or by combining the two, in which case whichever event occurs first triggers the flush.

📘

Update Sequence

With the write-behind approach, only the last update to an entry will be written to the underlying storage. If a cache entry with a key named key1 is sequentially updated with values value1, value2, and value3 respectively, then only a single store request for (key1, value3) pair will be propagated to the persistent store.

📘

Update Performance

Batch store operations are usually more efficient than a sequence of single store operations. One can exploit this feature by enabling batch operations in the write-behind mode. Update sequences of similar types (put or remove) can be grouped to a single batch. For example, sequential cache puts of (key1, value1), (key2, value2), (key3, value3) will be batched into a single CacheStore.putAll(...) operation.

Write-behind caching can be enabled via the CacheConfiguration.setWriteBehindEnabled(boolean) configuration property. See configuration section below for a full list of configuration properties that allow you to customize the behavior of write-behind caching.

Configuration

The following configuration parameters of cacheConfiguration can be used to enable and configure write-behind caching:

  • setWriteBehindEnabled(boolean) - Sets the flag indicating whether write-behind is enabled. Default: false
  • setWriteBehindFlushSize(int) - Maximum size of the write-behind cache. If the cache size exceeds this value, all cached items are flushed to the CacheStore and the write cache is cleared. If this value is 0, flush is performed according to the flush frequency interval only. Note that you cannot set both flush size and flush frequency to 0. Default: 10240
  • setWriteBehindFlushFrequency(long) - Frequency, in milliseconds, with which the write-behind cache is flushed to the CacheStore. This value defines the maximum time interval between an object being inserted into/deleted from the cache and the moment when the corresponding operation is applied to the CacheStore. If this value is 0, flush is performed according to the flush size only. Note that you cannot set both flush size and flush frequency to 0. Default: 5000 milliseconds
  • setWriteBehindFlushThreadCount(int) - Number of threads that perform cache flushing. Default: 1
  • setWriteBehindBatchSize(int) - Maximum batch size for write-behind CacheStore operations. Default: 512
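Applied in code, a minimal write-behind configuration might look as follows (MyPersonStore is a hypothetical CacheStore implementation and the thresholds are illustrative):

```java
import javax.cache.configuration.FactoryBuilder;
import org.apache.ignite.configuration.CacheConfiguration;

CacheConfiguration<Long, Person> cacheCfg = new CacheConfiguration<>("PersonCache");

cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(MyPersonStore.class));
cacheCfg.setWriteThrough(true);

// Turn on write-behind mode.
cacheCfg.setWriteBehindEnabled(true);

// Flush when either 1024 updates accumulate or 2 seconds pass,
// whichever occurs first.
cacheCfg.setWriteBehindFlushSize(1024);
cacheCfg.setWriteBehindFlushFrequency(2000);

// Group up to 256 updates into a single CacheStore batch operation.
cacheCfg.setWriteBehindBatchSize(256);
```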

CacheStore interface can be set on CacheConfiguration via a Factory in much the same way that CacheLoader and CacheWriter are set.

🚧

For distributed cache configuration, Factory should be serializable.

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
  ...
    <property name="cacheConfiguration">
      <list>
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
          ...
          <property name="cacheStoreFactory">
            <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
              <constructor-arg value="foo.bar.MyPersonStore"/>
            </bean>
          </property>
          <property name="readThrough" value="true"/>
          <property name="writeThrough"  value="true"/>
        </bean>
      </list>
    </property>
  ...
</bean>
IgniteConfiguration cfg = new IgniteConfiguration();

CacheConfiguration<Long, Person> cacheCfg = new CacheConfiguration<>();

cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(MyPersonStore.class));
cacheCfg.setReadThrough(true);
cacheCfg.setWriteThrough(true);

cfg.setCacheConfiguration(cacheCfg);

// Start Ignite node.
Ignition.start(cfg);

RDBMS Integration

You can hook Ignite with any RDBMS, load data into Ignite caches, and perform key-value operations including ACID transactions. This can be done in two ways:

Automatic

Use Apache Ignite Web Console to automatically import metadata from an RDBMS and create Ignite cluster configurations. See Automatic RDBMS Integration documentation for more details.

Manual

Enable the JDBC POJO store manually in the Ignite XML configuration file (or via code). For this you need to:

  1. Download the JDBC driver of the database you are using, and make sure it is in the CLASSPATH of your application.
  2. Set the cacheStoreFactory property of CacheConfiguration by initializing the CacheJdbcPojoStoreFactory bean in which you should provide the following properties:
    • dataSourceBean - the name of the data source bean that holds the database connection settings: URL, user, password.
    • dialect - A dialect compatible with your database. Ignite provides out-of-the-box implementations for MySQL, Oracle, H2, SQLServer, and DB2 databases. These dialects can be found in the org.apache.ignite.cache.store.jdbc.dialect package of your Ignite distribution.
    • types - defines the mapping between a database table and the corresponding POJO (see the POJO configuration example below).

Once the configuration is set, you can use the IgniteCache.loadCache() method to load the data from the database into the respective caches.

Example

For this example, we are using MySQL as the database, and let's assume that we have a "PERSON" table with the fields id, orgId, name, and salary.

<!-- Data source beans -->
<bean id="dsMySQL_Test" class="com.mysql.cj.jdbc.MysqlDataSource">
  <property name="URL" value="jdbc:mysql://[host]:[port]/[database]"/>
  <property name="user" value="YOUR_USER_NAME"/>
  <property name="password" value="YOUR_PASSWORD"/>
</bean>

<!-- Ignite Configuration -->
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
  <property name="cacheConfiguration">
    <list>
      <!-- Configuration for PersonCache -->
      <bean class="org.apache.ignite.configuration.CacheConfiguration">
        <property name="name" value="PersonCache"/>
        <property name="cacheMode" value="PARTITIONED"/>
        <property name="atomicityMode" value="ATOMIC"/>

        <property name="cacheStoreFactory">
          <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory">
            <property name="dataSourceBean" value="dsMySQL_Test"/>
            <property name="dialect">
              <bean class="org.apache.ignite.cache.store.jdbc.dialect.MySQLDialect">
              </bean>
            </property>

            <property name="types">
              <list>
                 <bean class="org.apache.ignite.cache.store.jdbc.JdbcType">
                    <property name="cacheName" value="PersonCache"/>
                    <property name="keyType" value="java.lang.Integer"/>
                    <property name="valueType" value="com.gridgain.pgarg.model.Person"/>
                    <property name="databaseSchema" value="MY_DB_SCHEMA"/>
                    <property name="databaseTable" value="PERSON"/>

                    <property name="keyFields">
                      <list>
                        <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                          <constructor-arg>
                            <util:constant static-field="java.sql.Types.INTEGER"/>
                          </constructor-arg>
                          <constructor-arg value="id"/>
                          <constructor-arg value="int"/>
                          <constructor-arg value="id"/>
                        </bean>
                      </list>
                    </property>

                    <property name="valueFields">
                      <list>
                        <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                          <constructor-arg>
                            <util:constant static-field="java.sql.Types.INTEGER"/>
                          </constructor-arg>
                          <constructor-arg value="orgId"/>
                          <constructor-arg value="java.lang.Integer"/>
                          <constructor-arg value="orgid"/>
                        </bean>

                        <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                          <constructor-arg>
                            <util:constant static-field="java.sql.Types.VARCHAR"/>
                          </constructor-arg>
                          <constructor-arg value="name"/>
                          <constructor-arg value="java.lang.String"/>
                          <constructor-arg value="name"/>
                        </bean>

                        <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                          <constructor-arg>
                            <util:constant static-field="java.sql.Types.INTEGER"/>
                          </constructor-arg>
                          <constructor-arg value="salary"/>
                          <constructor-arg value="java.lang.Integer"/>
                          <constructor-arg value="salary"/>
                        </bean>
                      </list>
                    </property>
                  </bean>
                </list>
              </property>
            </bean>
          </property>

          <property name="readThrough" value="true"/>
          <property name="writeThrough" value="true"/>

          <property name="queryEntities">
            <list>
              <bean class="org.apache.ignite.cache.QueryEntity">
                <property name="keyType" value="java.lang.Integer"/>
                <property name="valueType" value="com.gridgain.pgarg.model.Person"/>
                <property name="keyFieldName" value="id"/>

                 <property name="keyFields">
                   <list>
                     <value>id</value>
                    </list>
                  </property>

                  <property name="fields">
                    <map>
                      <entry key="orgid" value="java.lang.Integer"/>
                      <entry key="name" value="java.lang.String"/>
                      <entry key="salary" value="java.lang.Integer"/>
                      <entry key="id" value="java.lang.Integer"/>
                    </map>
                  </property>
                </bean>
              </list>
            </property>
          </bean>
          
          <!-- Provide similar configurations for other caches/tables -->
      </list>
  </property>
</bean>
try (Ignite ignite = Ignition.start("path/to/xml-config/file")) {
  // Load data from person table into PersonCache.
  ignite.cache("PersonCache").loadCache(null);
  
  // Populate other caches
  ...
}
import java.io.Serializable;

public class Person implements Serializable {
    /** */
    private static final long serialVersionUID = 0L;

    /** Value for orgid. */
    private Integer orgid;

    /** Value for name. */
    private String name;

    /** Value for salary. */
    private Integer salary;

    /** Value for id. */
    private int id;

    /** Empty constructor. **/
    public Person() {
        // No-op.
    }

    /** Full constructor. **/
    public Person(Integer orgid,
        String name,
        Integer salary,
        int id) {
        this.orgid = orgid;
        this.name = name;
        this.salary = salary;
        this.id = id;
    }

    /**
     * Gets orgid
     * 
     * @return Value for orgid.
     **/
    public Integer getOrgid() {
        return orgid;
    }

    /**
     * Sets orgid
     * 
     * @param orgid New value for orgid.
     **/
    public void setOrgid(Integer orgid) {
        this.orgid = orgid;
    }

    /**
     * Gets name
     * 
     * @return Value for name.
     **/
    public String getName() {
        return name;
    }

    /**
     * Sets name
     * 
     * @param name New value for name.
     **/
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Gets salary
     * 
     * @return Value for salary.
     **/
    public Integer getSalary() {
        return salary;
    }

    /**
     * Sets salary
     * 
     * @param salary New value for salary.
     **/
    public void setSalary(Integer salary) {
        this.salary = salary;
    }

    /**
     * Gets id
     * 
     * @return Value for id.
     **/
    public int getId() {
        return id;
    }

    /**
     * Sets id
     * 
     * @param id New value for id.
     **/
    public void setId(int id) {
        this.id = id;
    }

    /** {@inheritDoc} **/
    @Override public boolean equals(Object o) {
        if (this == o)
            return true;
        
        if (!(o instanceof Person))
            return false;
        
        Person that = (Person)o;

        if (orgid != null ? !orgid.equals(that.orgid) : that.orgid != null)
            return false;
        

        if (name != null ? !name.equals(that.name) : that.name != null)
            return false;
        

        if (salary != null ? !salary.equals(that.salary) : that.salary != null)
            return false;
        

        if (id != that.id)
            return false;
        
        return true;
    }

    /** {@inheritDoc} **/
    @Override public int hashCode() {
        int res = orgid != null ? orgid.hashCode() : 0;

        res = 31 * res + (name != null ? name.hashCode() : 0);

        res = 31 * res + (salary != null ? salary.hashCode() : 0);

        res = 31 * res + (id);

        return res;
    }

    /** {@inheritDoc} **/
    @Override public String toString() {
        return "Person [" + 
            "orgid=" + orgid + ", " + 
            "name=" + name + ", " + 
            "salary=" + salary + ", " + 
            "id=" + id +
        "]";
    }
}

CacheJdbcBlobStore

The CacheJdbcBlobStore implementation is backed by JDBC and stores objects in the underlying database in BLOB format. The store creates a table named ENTRIES in the database, with key and val fields, to hold the data.
If custom DDL and DML statements are provided, table and field names must be consistent across all statements, and the order of parameters must be preserved.

Use the CacheJdbcBlobStoreFactory factory to pass CacheJdbcBlobStore to CacheConfiguration.

<bean id= "simpleDataSource" class="org.h2.jdbcx.JdbcDataSource">
        <property name="url" value="jdbc:h2:mem:jdbcCacheStore;DB_CLOSE_DELAY=-1" />
</bean>
  
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  ...
   <property name="cacheConfiguration">
     <list>
       <bean class="org.apache.ignite.configuration.CacheConfiguration">
         ...
           <property name="cacheStoreFactory">
             <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcBlobStoreFactory">
               <property name="user" value = "user" />
               <property name="dataSourceBean" value = "simpleDataSource" />
             </bean>
           </property>
       </bean>
      </list>
    </property>
  ...
</bean>
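For reference, here is a Java counterpart of this configuration; a sketch assuming the same simpleDataSource bean. The custom DDL/DML statements shown are optional and illustrative; if you override any of them, keep table and column names and parameter order consistent across all of them:

```java
import org.apache.ignite.cache.store.jdbc.CacheJdbcBlobStoreFactory;
import org.apache.ignite.configuration.CacheConfiguration;

CacheJdbcBlobStoreFactory<Long, Person> storeFactory = new CacheJdbcBlobStoreFactory<>();

storeFactory.setUser("user");
storeFactory.setDataSourceBean("simpleDataSource");

// Optional custom statements; by default the store targets the ENTRIES table.
storeFactory.setCreateTableQuery(
    "create table if not exists ENTRIES (key binary primary key, val binary)");
storeFactory.setLoadQuery("select * from ENTRIES where key=?");
storeFactory.setInsertQuery("insert into ENTRIES (key, val) values (?, ?)");
storeFactory.setUpdateQuery("update ENTRIES set val=? where key=?");
storeFactory.setDeleteQuery("delete from ENTRIES where key=?");

CacheConfiguration<Long, Person> cacheCfg = new CacheConfiguration<>();
cacheCfg.setCacheStoreFactory(storeFactory);
```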

CacheJdbcPojoStore

CacheJdbcPojoStore is a CacheStore implementation backed by JDBC that stores objects in the underlying database using a Java Beans mapping description applied via reflection.

Use the CacheJdbcPojoStoreFactory factory to pass CacheJdbcPojoStore to CacheConfiguration.

<bean id= "simpleDataSource" class="org.h2.jdbcx.JdbcDataSource"/>
  
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  ...
    <property name="cacheConfiguration">
      <list>
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
          ...
            <property name="cacheStoreFactory">
              <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory">
                <property name="dataSourceBean" value = "simpleDataSource" />
              </bean>
            </property>
        </bean>
      </list>
    </property>
</bean>
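A Java counterpart of this configuration, including a JdbcType mapping; a sketch in which the PERSON table and Person class mirror the RDBMS example above:

```java
import java.sql.Types;
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory;
import org.apache.ignite.cache.store.jdbc.JdbcType;
import org.apache.ignite.cache.store.jdbc.JdbcTypeField;
import org.apache.ignite.cache.store.jdbc.dialect.MySQLDialect;
import org.apache.ignite.configuration.CacheConfiguration;

CacheJdbcPojoStoreFactory<Integer, Person> storeFactory = new CacheJdbcPojoStoreFactory<>();

storeFactory.setDataSourceBean("simpleDataSource");
storeFactory.setDialect(new MySQLDialect());

JdbcType personType = new JdbcType();
personType.setCacheName("PersonCache");
personType.setKeyType(Integer.class);
personType.setValueType(Person.class);
personType.setDatabaseSchema("MY_DB_SCHEMA");
personType.setDatabaseTable("PERSON");

// JdbcTypeField arguments: SQL type, database column, Java type, Java field.
personType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Integer.class, "id"));
personType.setValueFields(
    new JdbcTypeField(Types.INTEGER, "orgId", Integer.class, "orgid"),
    new JdbcTypeField(Types.VARCHAR, "name", String.class, "name"),
    new JdbcTypeField(Types.INTEGER, "salary", Integer.class, "salary"));

storeFactory.setTypes(personType);

CacheConfiguration<Integer, Person> cacheCfg = new CacheConfiguration<>("PersonCache");
cacheCfg.setCacheStoreFactory(storeFactory);
```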

CacheHibernateBlobStore

The CacheHibernateBlobStore implementation is backed by Hibernate and stores objects in the underlying database in BLOB format.

Use the CacheHibernateBlobStoreFactory factory to pass CacheHibernateBlobStore to CacheConfiguration.

<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  ...
    <property name="cacheConfiguration">
      <list>
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
          <property name="cacheStoreFactory">
            <bean class="org.apache.ignite.cache.store.hibernate.CacheHibernateBlobStoreFactory">
              <property name="hibernateProperties">
                <props>
                  <prop key="connection.url">jdbc:h2:mem:</prop>
                  <prop key="hbm2ddl.auto">update</prop>
                  <prop key="show_sql">true</prop>
                </props>
              </property>
            </bean>
          </property>
        </bean>
      </list>
    </property>
  ...    
</bean>

NoSQL Integration

Ignite allows integration with NoSQL databases such as Apache Cassandra. Refer to Cassandra CacheStore documentation to see how to use Cassandra as an Apache Ignite persistent store. For other NoSQL databases for which Ignite does not provide any out-of-the-box implementation, you can implement your own CacheStore.

Note that even though Ignite supports distributed transactions, using a NoSQL database as a persistence layer for Ignite does not make that database transactional, unless it supports transactions out of the box. For instance, transactions performed on Ignite caches will not be propagated to Cassandra.

Custom CacheStore

For NoSQL databases for which Ignite does not provide an out-of-the-box implementation, you can implement your own CacheStore. The CacheStore interface allows you to write and read data from the underlying data store. In addition to the standard JCache loading and storing methods, it also introduces end-of-transaction demarcation and the ability to bulk load a cache from the database.

loadCache()

The CacheStore.loadCache() method allows for cache loading without passing all the keys that need to be loaded. This method is generally used for hot-loading the cache on initialization, but can be also called at any point after the cache has been started.

The IgniteCache.loadCache() method delegates to the CacheStore.loadCache() method on every cluster node that is running the cache. To invoke loading only on the local cluster node, use the IgniteCache.localLoadCache() method.
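For example (a sketch; the entry-count argument is only meaningful if your CacheStore.loadCache() implementation interprets it, as the custom store examples in this section do):

```java
IgniteCache<Long, Person> cache = ignite.cache("PersonCache");

// Delegates to CacheStore.loadCache() on every node that runs the cache.
// Arguments after the first are passed through to the store.
cache.loadCache(null, 100_000);

// Invokes loading only on the local node.
cache.localLoadCache(null, 100_000);
```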

📘

In case of partitioned caches, keys that are not mapped to this node, either as primary or backups, will be automatically discarded by the cache.

load(), write(), delete()

The load(), write(), and delete() methods of the CacheStore are called whenever the get(), put(), and remove() methods, respectively, are called on the IgniteCache interface. These methods are used to enable read-through and write-through behavior when working with individual cache entries.

loadAll(), writeAll(), deleteAll()

The loadAll(), writeAll(), and deleteAll() methods of the CacheStore are called whenever the getAll(), putAll(), and removeAll() methods, respectively, are called on the IgniteCache interface. These methods are used to enable read-through and write-through behavior when working with multiple cache entries, and should generally be implemented using batch operations to provide better performance.

📘

CacheStoreAdapter provides a default implementation of the loadAll(), writeAll(), and deleteAll() methods that simply iterates through all keys one by one.

sessionEnd()

Ignite has a concept of store session which may span more than one CacheStore operation. Sessions are especially useful when working with transactions.

In case of ATOMIC caches, sessionEnd() is called after completion of each CacheStore method. In case of TRANSACTIONAL caches, sessionEnd() is called at the end of each transaction, which provides the ability to either commit or rollback multiple operations on the underlying persistent store.

📘

CacheStoreAdapter provides a default empty implementation of the sessionEnd() method.

CacheStoreSession

The main purpose of a CacheStoreSession is to hold the context between multiple store invocations whenever CacheStore is used in a cache transaction. For example, if using JDBC, you can store the ongoing database connection via the CacheStoreSession.attach() method. You can then commit this connection in the CacheStore#sessionEnd(boolean) method.

CacheStoreSession can be injected into your CacheStore implementation via the @CacheStoreSessionResource annotation.

📘

Cassandra CacheStore

Ignite provides out of the box integration with Apache Cassandra that is used as a CacheStore at the level of the in-memory data grid. To learn more about the integration, refer to the following documentation section.

Example

Below are a couple of possible CacheStore implementations. Note that the transactional implementation works both with and without transactions.

public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
  // This method is called whenever "get(...)" methods are called on IgniteCache.
  @Override public Person load(Long key) {
    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
        st.setLong(1, key);

        ResultSet rs = st.executeQuery();

        return rs.next() ? new Person(rs.getLong(1), rs.getString(2), rs.getString(3)) : null;
      }
    }
    catch (SQLException e) {
      throw new CacheLoaderException("Failed to load: " + key, e);
    }
  }

  // This method is called whenever "put(...)" methods are called on IgniteCache.
  @Override public void write(Cache.Entry<Long, Person> entry) {
    try (Connection conn = connection()) {
      // Syntax of the MERGE statement is database specific and should be adapted for your database.
      // If your database does not support MERGE, use update and insert statements sequentially instead.
      try (PreparedStatement st = conn.prepareStatement(
        "merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
        Person val = entry.getValue();

        st.setLong(1, entry.getKey());
        st.setString(2, val.getFirstName());
        st.setString(3, val.getLastName());

        st.executeUpdate();
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to write entry: " + entry, e);
    }
  }

  // This method is called whenever "remove(...)" methods are called on IgniteCache.
  @Override public void delete(Object key) {
    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
        st.setLong(1, (Long)key);

        st.executeUpdate();
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to delete: " + key, e);
    }
  }

  // This method is called whenever "loadCache()" and "localLoadCache()"
  // methods are called on IgniteCache. It is used for bulk-loading the cache.
  // If you don't need to bulk-load the cache, skip this method.
  @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
    if (args == null || args.length == 0 || args[0] == null)
      throw new CacheLoaderException("Expected entry count parameter is not provided.");

    final int entryCnt = (Integer)args[0];

    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
        try (ResultSet rs = st.executeQuery()) {
          int cnt = 0;

          while (cnt < entryCnt && rs.next()) {
            Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));

            clo.apply(person.getId(), person);

            cnt++;
          }
        }
      }
    }
    catch (SQLException e) {
      throw new CacheLoaderException("Failed to load values from cache store.", e);
    }
  }

  // Open JDBC connection.
  private Connection connection() throws SQLException  {
    // Open connection to your RDBMS systems (Oracle, MySQL, Postgres, DB2, Microsoft SQL, etc.)
    // In this example we use H2 Database for simplification.
    Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");

    conn.setAutoCommit(true);

    return conn;
  }
}
public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
  /** Auto-injected store session. */
  @CacheStoreSessionResource
  private CacheStoreSession ses;

  // Complete transaction or simply close connection if there is no transaction.
  @Override public void sessionEnd(boolean commit) {
    try {
      Connection conn = ses.getAttached();
      if (conn != null && ses.isWithinTransaction()) {
        if (commit)
          conn.commit();
        else
          conn.rollback();
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to end store session.", e);
    }
  }

  // This method is called whenever "get(...)" methods are called on IgniteCache.
  @Override public Person load(Long key) {
    try {
      Connection conn = connection();
      try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
        st.setLong(1, key);

        ResultSet rs = st.executeQuery();

        return rs.next() ? new Person(rs.getLong(1), rs.getString(2), rs.getString(3)) : null;
      }
    }
    catch (SQLException e) {
      throw new CacheLoaderException("Failed to load: " + key, e);
    }
  }

  // This method is called whenever "put(...)" methods are called on IgniteCache.
  @Override public void write(Cache.Entry<Long, Person> entry) {
    try {
      Connection conn = connection();

      // Syntax of the MERGE statement is database specific and should be adapted for your database.
      // If your database does not support MERGE, use update and insert statements sequentially instead.
      try (PreparedStatement st = conn.prepareStatement(
        "merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
        Person val = entry.getValue();

        st.setLong(1, entry.getKey());
        st.setString(2, val.getFirstName());
        st.setString(3, val.getLastName());

        st.executeUpdate();
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to write entry: " + entry, e);
    }
  }

  // This method is called whenever "remove(...)" methods are called on IgniteCache.
  @Override public void delete(Object key) {
    try {
      Connection conn = connection();
      try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
        st.setLong(1, (Long)key);

        st.executeUpdate();
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to delete: " + key, e);
    }
  }

  // This method is called whenever "loadCache()" and "localLoadCache()"
  // methods are called on IgniteCache. It is used for bulk-loading the cache.
  // If you don't need to bulk-load the cache, skip this method.
  @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
    if (args == null || args.length == 0 || args[0] == null)
      throw new CacheLoaderException("Expected entry count parameter is not provided.");

    final int entryCnt = (Integer)args[0];

    try {
      Connection conn = connection();
      try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
        try (ResultSet rs = st.executeQuery()) {
          int cnt = 0;

          while (cnt < entryCnt && rs.next()) {
            Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));

            clo.apply(person.getId(), person);

            cnt++;
          }
        }
      }
    }
    catch (SQLException e) {
      throw new CacheLoaderException("Failed to load values from cache store.", e);
    }
  }

  // Opens JDBC connection and attaches it to the ongoing
  // session if within a transaction.
  private Connection connection() throws SQLException  {
    if (ses.isWithinTransaction()) {
      Connection conn = ses.getAttached();

      if (conn == null) {
        conn = openConnection(false);

        // Store connection in the session, so it can be accessed
        // for other operations within the same transaction.
        ses.attach(conn);
      }

      return conn;
    }
    // Transaction can be null in case of simple load or put operation.
    else
      return openConnection(true);
  }

  // Opens JDBC connection.
  private Connection openConnection(boolean autocommit) throws SQLException {
    // Open connection to your RDBMS systems (Oracle, MySQL, Postgres, DB2, Microsoft SQL, etc.)
    // In this example we use H2 Database for simplification.
    Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");

    conn.setAutoCommit(autocommit);

    return conn;
  }
}
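A store implementation like the one above has to be registered with a cache via `CacheConfiguration`, with read-through and write-through explicitly enabled (both are off by default). Below is a minimal Spring XML sketch; the cache name is illustrative, and it assumes the `CacheJdbcPersonStore` class from the example is on the classpath of every node:

```xml
<bean class="org.apache.ignite.configuration.CacheConfiguration">
  <property name="name" value="personCache"/>
  <!-- Enable read-through and write-through behavior (disabled by default). -->
  <property name="readThrough" value="true"/>
  <property name="writeThrough" value="true"/>
  <!-- Factory that instantiates the store on every node. -->
  <property name="cacheStoreFactory">
    <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
      <constructor-arg value="CacheJdbcPersonStore"/>
    </bean>
  </property>
</bean>
```

The same can be done programmatically via `CacheConfiguration.setCacheStoreFactory(...)`, `setReadThrough(true)`, and `setWriteThrough(true)`.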
public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
  // Single-entry operations and the connection methods are skipped here.
  // You can copy them from the JDBC non-transactional or JDBC transactional examples above.
  ...
  
  // This method is called whenever "getAll(...)" methods are called on IgniteCache.
  @Override public Map<Long, Person> loadAll(Iterable<? extends Long> keys) {
    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement(
        "select firstName, lastName from PERSONS where id=?")) {
        Map<Long, Person> loaded = new HashMap<>();
        
        for (Long key : keys) {
          st.setLong(1, key);
          
          try (ResultSet rs = st.executeQuery()) {
            if (rs.next())
              loaded.put(key, new Person(key, rs.getString(1), rs.getString(2)));
          }
        }

        return loaded;
      }
    }
    catch (SQLException e) {
      throw new CacheLoaderException("Failed to loadAll: " + keys, e);
    }
  }
  
  // This method is called whenever "putAll(...)" methods are called on IgniteCache.
  @Override public void writeAll(Collection<Cache.Entry<Long, Person>> entries) {
    try (Connection conn = connection()) {
      // The syntax of the MERGE statement is database-specific and should be adapted to your database.
      // If your database does not support MERGE, issue separate update and insert statements instead.
      try (PreparedStatement st = conn.prepareStatement(
        "merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
        for (Cache.Entry<Long, Person> entry : entries) {
          Person val = entry.getValue();
          
          st.setLong(1, entry.getKey());
          st.setString(2, val.getFirstName());
          st.setString(3, val.getLastName());
          
          st.addBatch();
        }
        
        st.executeBatch();
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to writeAll: " + entries, e);
    }
  }
  
  // This method is called whenever "removeAll(...)" methods are called on IgniteCache.
  @Override public void deleteAll(Collection<Long> keys) {
    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
        for (Long key : keys) {
          st.setLong(1, key);
          
          st.addBatch();
        }
        
        st.executeBatch();
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to deleteAll: " + keys, e);
    }
  }
}

Using 3rd Party Persistence Together with Ignite Persistence

Starting with Apache Ignite 2.4, it is possible to use 3rd party persistence together with Ignite native persistence in a single cluster deployment. When native persistence is enabled, Apache Ignite makes a best effort to keep Ignite and the 3rd party storage consistent.

However, the data can get out of sync if transactions are used and all nodes responsible for a partition go down during an ongoing transaction commit (losing the whole cluster is the general case of losing a single partition). Depending on the timing, the transaction's state may differ between the 3rd party persistence and Ignite persistence.
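Configuration-wise, combining the two amounts to enabling native persistence for a data region while keeping the cache store configured on the cache. A minimal Spring XML sketch, assuming the `CacheJdbcPersonStore` class from the examples above and an illustrative cache name:

```xml
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
  <!-- Enable Ignite native persistence for the default data region. -->
  <property name="dataStorageConfiguration">
    <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
      <property name="defaultDataRegionConfiguration">
        <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
          <property name="persistenceEnabled" value="true"/>
        </bean>
      </property>
    </bean>
  </property>
  <!-- Keep the 3rd party store configured on the cache as before. -->
  <property name="cacheConfiguration">
    <bean class="org.apache.ignite.configuration.CacheConfiguration">
      <property name="name" value="personCache"/>
      <property name="readThrough" value="true"/>
      <property name="writeThrough" value="true"/>
      <property name="cacheStoreFactory">
        <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
          <constructor-arg value="CacheJdbcPersonStore"/>
        </bean>
      </property>
    </bean>
  </property>
</bean>
```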

❗️

Apache Ignite does not guarantee strict consistency between native persistence and 3rd party persistence because the current CacheStore interface does not support a two-phase commit protocol.

On recovery, the commit logs of both storages have to be compared. If they are inconsistent, the missing transactions have to be either re-applied or rolled back.