How to quickly load a large table into Apache Ignite via the Key-Value API

Some time ago, the Apache Ignite platform appeared on the horizon and began to gain popularity. In-memory computing is all about speed, so speed must be ensured at every stage of the work, especially when loading data.



Below the cut is a description of a way to quickly load data from a relational table into a distributed Apache Ignite cluster. It covers preprocessing the SQL query result set on a client node of the cluster and distributing the data across the cluster with a map-reduce task. It also describes the caches and the related relational tables, shows how to create a custom object from a table row, and shows how to use ComputeTaskAdapter to quickly place the created objects. The complete code is available in the FastDataLoad repository.



History of the issue



This text is a translation into Russian of my post in the In-Memory Computing Blog on the GridGain website.



So, a certain company decides to speed up a slow application by moving its computations to an in-memory cluster. The source data for the calculations is in MS SQL, and the results must be written back there. The cluster is distributed because there is already a lot of data, application performance is at its limit, and the data volume keeps growing. Hard time limits are set.



Before writing fast code to process the data, the data has to be loaded quickly. A frantic search of the web reveals a clear dearth of code examples that scale to tables of tens or hundreds of millions of rows: examples that you can download, compile, and step through in a debugger. That is one side of the story.



, Apache Ignite / GridGain, . , . " ?", โ€” , .

, .



(World Database)



, data collocation, . world.sql Apache Ignite.



CSV , โ€” SQL :





countryCache is loaded from country.csv. The key of countryCache is the country code, of type String; the value is a Country object with fields such as name, continent, and region.







The org.h2.tools.Csv utility reads a CSV file into a java.sql.ResultSet. It is available because the SQL engine of Apache Ignite is built on H2.



    // define countryCache
    IgniteCache<String,Country> cache = ignite.cache("countryCache");

    try (ResultSet rs = new Csv().read(csvFileName, null, null)) {
     while (rs.next()) {
      String code = rs.getString("Code");
      String name = rs.getString("Name");
      String continent = rs.getString("Continent");
      Country country = new Country(code,name,continent);
      cache.put(code,country);
     }
    }


. , , . - .



, . , .





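Apache Ignite is a key-value store. In a PARTITIONED cache, every key-value pair belongs to a partition, and the partitions are distributed among the cluster nodes. The affinity function maps a key to its partition.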



ResultSet . , . .





, :



  • Create a HashMap of the form partition_number -> key -> value

    Map<Integer, Map<String, Country>> result = new HashMap<>();
  • Use the affinity function to compute partition_number for each key. Instead of calling cache.put(), put the key-value pair into the HashMap entry for that partition_number

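    // affinity function of countryCache, used to find the partition of each key
    Affinity<String> affinity = ignite.affinity("countryCache");

    try (ResultSet rs = new Csv().read(csvFileName, null, null)) {
     while (rs.next()) {
      String code = rs.getString("Code");
      String name = rs.getString("Name");
      String continent = rs.getString("Continent");
      Country country = new Country(code,name,continent);
      // group by partition number; the cache key is the country code
      result.computeIfAbsent(affinity.partition(code), k -> new HashMap<>()).put(code,country);
     }
    }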
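To make the grouping step concrete without a running cluster, here is a minimal, self-contained sketch. It rests on several assumptions: `partitionOf` is a hypothetical stand-in that spreads keys over 1024 partitions with `Math.floorMod(key.hashCode(), 1024)`, which is not the algorithm of Ignite's affinity function; the class name and the sample rows are illustrative; and plain strings replace the Country object.

```java
import java.util.HashMap;
import java.util.Map;

class PartitionGrouping {
    // Assumed partition count for this sketch.
    static final int PARTITIONS = 1024;

    // Hypothetical stand-in for the affinity function; NOT Ignite's algorithm.
    static int partitionOf(String key) {
        return Math.floorMod(key.hashCode(), PARTITIONS);
    }

    // Builds partition_number -> key -> value from flat (key, value) rows.
    static Map<Integer, Map<String, String>> group(String[][] rows) {
        Map<Integer, Map<String, String>> result = new HashMap<>();
        for (String[] row : rows) {
            String code = row[0];
            String name = row[1];
            result.computeIfAbsent(partitionOf(code), k -> new HashMap<>()).put(code, name);
        }
        return result;
    }

    public static void main(String[] args) {
        String[][] rows = { {"NLD", "Netherlands"}, {"USA", "United States"}, {"IND", "India"} };
        Map<Integer, Map<String, String>> grouped = group(rows);
        // Every row ends up in exactly one per-partition map.
        int total = grouped.values().stream().mapToInt(Map::size).sum();
        System.out.println("partitions used: " + grouped.size() + ", rows grouped: " + total);
    }
}
```

In a real loader the partition number would come from the cluster's affinity function rather than from a hash computed locally, so that each per-partition map matches the partition where the cluster actually stores those keys.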


Next, ComputeTaskAdapter is used together with ComputeJobAdapter. One ComputeJobAdapter is created per partition, 1024 in our case.



ComputeJobAdapter . , .



Compute Task,



, "ComputeTaskAdapter initiates the simplified, in-memory, map-reduce process". ComputeJobAdapter map โ€” , . reduce โ€” .



(RenewLocalCacheJob)



, RenewLocalCacheJob .



targetCache.putAll(addend);


RenewLocalCacheJob partition_number .
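The shape of this map-reduce flow can be imitated on a single JVM. The sketch below is not Ignite code and makes several assumptions: a thread pool stands in for the cluster nodes, a ConcurrentHashMap stands in for the target cache, and `LoadJob` is a hypothetical counterpart of RenewLocalCacheJob that shares only the `targetCache.putAll(addend)` call with it. Returning the entry count from each job and summing the counts in the reduce step is likewise a choice made for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class MapReduceLoadSketch {
    // Hypothetical analogue of RenewLocalCacheJob: stores one partition's entries.
    static class LoadJob implements Supplier<Integer> {
        private final Map<String, String> targetCache;
        private final Map<String, String> addend;

        LoadJob(Map<String, String> targetCache, Map<String, String> addend) {
            this.targetCache = targetCache;
            this.addend = addend;
        }

        @Override
        public Integer get() {
            targetCache.putAll(addend);   // bulk insert of one partition's entries
            return addend.size();         // per-job result handed to the reduce step
        }
    }

    // Map: one job per partition. Reduce: sum of the per-job counts.
    static int load(Map<Integer, Map<String, String>> byPartition,
                    Map<String, String> targetCache) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // stand-in for cluster nodes
        try {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (Map<String, String> addend : byPartition.values()) {
                futures.add(CompletableFuture.supplyAsync(new LoadJob(targetCache, addend), pool));
            }
            int loaded = 0;
            for (CompletableFuture<Integer> f : futures) {
                loaded += f.join();       // wait for each job and accumulate its count
            }
            return loaded;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<Integer, Map<String, String>> byPartition = new HashMap<>();
        byPartition.computeIfAbsent(1, k -> new HashMap<>()).put("NLD", "Netherlands");
        byPartition.computeIfAbsent(2, k -> new HashMap<>()).put("USA", "United States");
        Map<String, String> cache = new ConcurrentHashMap<>();
        System.out.println("entries loaded: " + load(byPartition, cache));
    }
}
```

The point of the structure is that each job carries only the entries of one partition, so on a real cluster a job can be sent to the node that owns that partition and the insert becomes a local operation.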



(AbstractLoadTask)



( loader) โ€” AbstractLoadTask. . ( ), AbstractLoadTask TargetCacheKeyType. HashMap



    Map<Integer, Map<TargetCacheKeyType, BinaryObject>> result;


countryCache String. . AbstractLoadTask TargetCacheKeyType, BinaryObject. , โ€” .



BinaryObject



โ€” . , JVM, - . class definition , JAR- . Country



    IgniteCache<String, Country> countryCache;


, , classpath ClassNotFound.



. โ€” classpath, :



  • JAR- ;
  • classpath ;
  • ;
  • .


โ€” BinaryObject () . :





  • IgniteCache<String, BinaryObject> countryCache;    
  • Convert the Country object to a BinaryObject (see LoadCountries.java)

    Country country = new Country(code, name, .. );    
    BinaryObject binCountry = node.binary().toBinary(country);    
  • Build the HashMap with BinaryObject values

    Map<Integer, Map<String, BinaryObject>> result


, . , , ClassNotFoundException .



. .



Apache Ignite : .





default-config.xml โ€” . :



  • GridGain CE Installing Using ZIP Archive. 8.7.10, FastDataLoad , ;
  • In the {gridgain}\config folder, edit default-config.xml so that it contains:



    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="peerClassLoadingEnabled" value="true"/>
    </bean>
  • , {gridgain}\bin ignite.bat. ; ;
  • , . ,

    [08:40:04] Topology snapshot [ver=2, locNode=d52b1db3, servers=2, clients=0, state=ACTIVE, CPUs=8, offheap=3.2GB, heap=2.0GB]


. , 8.7.25, pom.xml



    <gridgain.version>8.7.25</gridgain.version>




class org.apache.ignite.spi.IgniteSpiException: Local node and remote node have different version numbers (node will not join, Ignite does not support rolling updates, so versions must be exactly the same) [locBuildVer=8.7.25, rmtBuildVer=8.7.10]




, , map-reduce. โ€” JAR-, compute task . Windows, Linux.

:



  • FastDataLoad;
  • ;

    mvn clean package
  • , .

    java -jar .\target\fastDataLoad.jar


main() LoadApp LoaderAgrument . map-reduce LoadCountries.

LoadCountries RenewLocalCacheJob , ( ).



#1





#2









country.csv , CountryCode . cityCache countryLanguageCache; , .





.



.



:



  • Source table properties (from SQL Server Management Studio):

    • number of rows: 44 686 837;
    • size: 1.071 GB;
  • SQL query execution: 0H:1M:35S;
  • RenewLocalCacheJob execution and reduce: 0H:0M:9S.


It takes less time to distribute data across a cluster than to execute an SQL query.
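As a rough back-of-the-envelope check of these figures, the sketch below converts the two reported durations to seconds and divides the row count by each. `LoadThroughput`, `parseSeconds` and `rowsPerSecond` are hypothetical helpers written for this illustration, and the H:M:S format is assumed from how the timings are printed above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class LoadThroughput {
    private static final Pattern DURATION = Pattern.compile("(\\d+)H:(\\d+)M:(\\d+)S");

    // Parses a duration such as "0H:1M:35S" into seconds.
    static long parseSeconds(String text) {
        Matcher m = DURATION.matcher(text);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected duration: " + text);
        }
        return Long.parseLong(m.group(1)) * 3600
             + Long.parseLong(m.group(2)) * 60
             + Long.parseLong(m.group(3));
    }

    // Rows processed per second, rounded down.
    static long rowsPerSecond(long rows, String duration) {
        return rows / parseSeconds(duration);
    }

    public static void main(String[] args) {
        long rows = 44_686_837L;
        System.out.println("0H:1M:35S timing: " + rowsPerSecond(rows, "0H:1M:35S") + " rows/s");
        System.out.println("0H:0M:9S timing:  " + rowsPerSecond(rows, "0H:0M:9S") + " rows/s");
    }
}
```

With the reported numbers this works out to roughly 470 thousand rows per second for the 0H:1M:35S timing and close to 5 million rows per second for the 0H:0M:9S timing, which is about a tenfold difference.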



