

Apache Ignite: More than a simple cache by Gastón Lucero


Image credit meetup.com

In this article, Gastón Lucero, Big Data Architect, talks about Apache Ignite. We hope you enjoy!

 

"Let us suppose that we start to develop a web server for our IoT app with a few endpoints: a POST to receive events, a GET devicesBySensorType, a GET all, a PUT to update device metadata, and so on.

At first, a cache for common data could seem like a secondary issue, but if we start to think long term, and if we want to improve performance and/or decrease response time (e.g. when a service retrieves data from a database), we realize that a cache is a mandatory requirement.

An initial solution could be to use the always reliable HashMap class (or ConcurrentHashMap after the first ConcurrentModificationException): this is a first attempt that, by caching objects in different maps in memory, may give us quick benefits. But it is not enough.
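A first attempt of that kind might look like the following sketch (the object and method names are illustrative, not from any library), caching device metadata in a single ConcurrentHashMap:

```scala
import java.util.concurrent.ConcurrentHashMap

// Naive in-process cache: one ConcurrentHashMap per data type.
// Works for a single JVM, but offers no sharing between apps, no eviction, no HA.
object NaiveDeviceCache {
  private val devicesById = new ConcurrentHashMap[String, String]()

  def put(id: String, metadata: String): Unit = devicesById.put(id, metadata)

  // Returns None when the key was never cached.
  def get(id: String): Option[String] = Option(devicesById.get(id))
}
```

This works until a second instance of the app needs the same data, which is exactly the limitation the rest of the article addresses.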

 

Hello Data Grid

With Ignite, this is simple: just add the dependency to your Scala/Java project:
sbt:

    "org.apache.ignite" % "ignite-core" % "2.4.0"

maven:

    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-core</artifactId>
        <version>2.4.0</version>
    </dependency>

Scala code for the example:

import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.{IgniteCache, Ignition}

object IgniteSimpleDataGrid extends App {

  val igniteConfig = new IgniteConfiguration()
  val cacheConfig = new CacheConfiguration("ignite")
  igniteConfig.setCacheConfiguration(cacheConfig)
  val ignite = Ignition.start(igniteConfig)
  val cache: IgniteCache[Int, String] = ignite.getOrCreateCache[Int, String]("ignite")
  for (i <- 1 to 10) {
    cache.put(i, s"value-$i")
  }
  println(s"From cache 7 -> ${cache.get(7)}")
  println(s"Not exists 20 -> ${cache.get(20)}")
}

Run the class and look at the output.
(Note: ONLY one running Ignite instance per JVM.)

Perfect, we have our first single-node version of Ignite Data Grid. But what if we need HA and have to deploy another instance of the app? Or consider a common situation: the metadata endpoint is related to another service that uses the same information, so we need to share data between the apps, and, sadly, the map-based cache solution no longer works.

What if I told you that, with an in-memory data grid, data can be distributed and every node in the cluster (forming a ring topology) can access and maintain the shared information, besides computing remote functions or custom predicates? Would you believe that this is possible? The answer is absolutely yes, and this is where Ignite stands out among other solutions.

Let's test the IgniteSimpleDataGrid class with two instances (nodes): node1 is already running, so before we start the app again (node2) we need to comment out the for loop (the block in the code snippet), because node1 has already populated the cache with values, which means node2 will only be reading from the cache.

/* for (i <- 1 to 10) {
  cache.put(i, s"value-$i")
} */
println(s"From cache 7 -> ${cache.get(7)}")
println(s"Not exists 20 -> ${cache.get(20)}")

Pay attention to the output of node2: the console prints almost the same data as node1 in the first example (look at the node id, it is different).

And now, look at the output in node1:

Both nodes print this line:

Great!! That means we have our Ignite in-memory distributed key-value store cluster up and running!! (More than a simple cache, and we are starting to see why.)
The nodes have discovered each other automatically, and all of them see the same data. If we start more nodes, the topology keeps growing: each node is assigned a unique id and the server count increases (e.g. servers=3, servers=4, etc.).

If we stop node1, the topology shrinks and the remaining nodes print this change, because they are listening to topology change events.
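You can also subscribe to those topology events from your own code. A minimal sketch (note that event types must be enabled explicitly in the configuration before they can be listened to; this snippet assumes a node started with the defaults otherwise):

```scala
import org.apache.ignite.Ignition
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.events.{Event, EventType}
import org.apache.ignite.lang.IgnitePredicate

object TopologyListener extends App {
  val igniteConfig = new IgniteConfiguration()
  // Event types must be enabled explicitly before they can be listened to.
  igniteConfig.setIncludeEventTypes(EventType.EVT_NODE_JOINED, EventType.EVT_NODE_LEFT)
  val ignite = Ignition.start(igniteConfig)

  // Print every join/leave observed by this node; returning true keeps the listener active.
  ignite.events().localListen(new IgnitePredicate[Event] {
    override def apply(event: Event): Boolean = {
      println(s"Topology change: ${event.name()}")
      true
    }
  }, EventType.EVT_NODE_JOINED, EventType.EVT_NODE_LEFT)
}
```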

Ignite implements high availability: if a node goes down, the data is re-balanced across the cluster without any user intervention.

Let us clarify this point.

My definition of Ignite is that it is a distributed in-memory cache, query and processing platform for working with large-scale data sets in real time (leaving aside stream processing, Spark integration, the machine learning grid, Ignite File System, persistence, transactions…).

How can Ignite automagically create a cluster? It provides TcpDiscoverySpi as the default implementation of DiscoverySpi, which uses TCP/IP for node discovery. The discovery SPI can be configured for multicast-based or static-IP-based node discovery (the IP finder is the key piece of that configuration).

By default, nodes use multicast to find each other:

val igniteConfig = new IgniteConfiguration()
val spi = new TcpDiscoverySpi()
val ipFinder = new TcpDiscoveryMulticastIpFinder()
ipFinder.setMulticastGroup("228.10.10.157") // default value: DFLT_MCAST_GROUP = "228.1.2.4"
spi.setIpFinder(ipFinder)
igniteConfig.setDiscoverySpi(spi)
val ignite = Ignition.start(igniteConfig)
val cache: IgniteCache[Int, String] = ignite.getOrCreateCache[Int, String]("ignite")
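When multicast is not available (typical in cloud environments), a static-IP finder is a common alternative; a sketch, with illustrative addresses:

```scala
import org.apache.ignite.Ignition
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
import scala.collection.JavaConverters._

val igniteConfig = new IgniteConfiguration()
val spi = new TcpDiscoverySpi()
val ipFinder = new TcpDiscoveryVmIpFinder()
// List the known node addresses; a discovery port range may be appended to a host.
ipFinder.setAddresses(Seq("10.0.0.1", "10.0.0.2:47500..47509").asJava)
spi.setIpFinder(ipFinder)
igniteConfig.setDiscoverySpi(spi)
val ignite = Ignition.start(igniteConfig)
```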

 

Ignite Ring Topology

 

In the image the nodes form a ring of server nodes, but inside an Ignite cluster nodes can either have roles and belong to a cluster group, or they can be client nodes (that is, "outside" the ring and without the possibility of holding cached data, which may prove useful for external applications).

Nodes can broadcast messages to other nodes in a particular group:

val igniteCluster = ignite.cluster()
// Send this code to the cluster's remote nodes (ignoring this one)
ignite.compute(igniteCluster.forRemotes()).broadcast(
  new IgniteRunnable {
    override def run(): Unit = println(s"Hello node ${igniteCluster.localNode()}, this message was sent by igniteCompute broadcast")
  }
)

As this example shows, when a node starts it sends a message to the forRemotes cluster group, i.e. to all other nodes (except itself) that have been configured in server mode. You might see an exception on the first node right after the cluster has been created, but don't worry: it is normal.

Furthermore, it is possible to tag nodes with custom attributes:

igniteConfig.setUserAttributes(Map[String, Any]("ROLE" -> "MASTER").asJava)
ignite.compute(igniteCluster.forAttribute("ROLE", "WORKER")).broadcast(new IgniteRunnable {
  override def run(): Unit = println(s"Hello worker node ${igniteCluster.localNode()}, " + s"this message was sent by the ignite master node")
})

Here the node has the attribute ROLE = MASTER and broadcasts only to nodes with ROLE = WORKER.

Nodes with:

igniteConfig.setUserAttributes(Map[String, Any]("ROLE" -> "WORKER").asJava)

will receive the message:

As long as your cluster is alive, Ignite will guarantee that the data between different cluster nodes will always remain consistent regardless of crashes or topology changes.

So far so good: we have seen nodes, topology, cluster groups and broadcast. Now let us move on to one of the best features of Ignite (PS: durable memory, persistence, Spark RDDs, streaming and transactions will be tackled in the next posts XD): SQL queries!!
Does it sound crazy? Yes, SQL queries! Over your data!
Ignite is fully ANSI-99 compliant, and it also supports text queries and predicate-based scan queries.

 

Cache Queries

Before starting to play with code, a few considerations:
  • Add the dependency:

sbt:

    "org.apache.ignite" % "ignite-indexing" % "2.4.0"

maven:

    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-indexing</artifactId>
        <version>2.4.0</version>
    </dependency>
  • Tell Ignite which entities are allowed in queries; it is easy, just add annotations to the classes:

case class IotDevice(
  @(QuerySqlField@field)(index = true) name: String,
  @(QuerySqlField@field)(index = true) gpio: String,
  @(QueryTextField@field) sensorType: String,
  @(QueryTextField@field) model: String)

Here we said that all fields are available for use in queries, and added indexes over the name and gpio attributes (like in any SQL database).

  • After the indexed and queryable fields are defined, they have to be registered in the SQL engine along with the object types they belong to:

val igniteConfig = new IgniteConfiguration()
val cacheConfig = new CacheConfiguration("ignite")
cacheConfig.setIndexedTypes(Seq(classOf[String], classOf[IotDevice]): _*)
igniteConfig.setCacheConfiguration(cacheConfig)
val ignite = Ignition.start(igniteConfig)
val cacheIot: IgniteCache[String, IotDevice] = ignite.getOrCreateCache[String, IotDevice]("ignite")

Following the idea of the IoT web server, suppose we develop the web server and put data into the cacheIot defined above:

val temp1 = IotDevice(name = "temp1", gpio = "123ASD", sensorType = "temperature", model = "test")
cacheIot.put(temp1.gpio, temp1)
val temp2 = IotDevice(name = "temp2", gpio = "456ASD", sensorType = "temperature", model = "test")
cacheIot.put(temp2.gpio, temp2)

Now a user calls the endpoint: GET /devicesBySensorType?sensor=temperature

In our IotDevice case class sensorType is valid for queries, thus we can execute this query in three ways in Ignite:

Simple SQL:

val sqlText = "sensorType = 'temperature'"
val sql = new SqlQuery[String, IotDevice](classOf[IotDevice], sqlText)
val temperatureQueryResult = cacheIot.query(sql).getAll.asScala.map(_.getValue)
println(s"SqlQuery = $temperatureQueryResult")

ScanQuery:

val cursor = cacheIot.query(new ScanQuery(new IgniteBiPredicate[String, IotDevice] {
  override def apply(key: String, entryValue: IotDevice): Boolean = entryValue.sensorType == "temperature"
}))
val temperatureScanResult = cursor.getAll.asScala
println(s"ScanQuery = $temperatureScanResult")

Text-based queries rely on Lucene indexing; here we find all IotDevices where sensorType == "temperature" (the QueryTextField annotation on the sensorType attribute is what allows this query; Ignite supports more than one QueryTextField per class if you want).

val textQuery = new TextQuery[String, IotDevice](classOf[IotDevice], "temperature")
val temperatureTextResult = cacheIot.query(textQuery).getAll.asScala
println(s"TextQuery = $temperatureTextResult") // all devices with sensorType = temperature
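Besides the three above, Ignite also offers SqlFieldsQuery for when you only need individual columns instead of whole cached objects. A sketch against the same cache (not part of the original example):

```scala
import org.apache.ignite.cache.query.SqlFieldsQuery
import scala.collection.JavaConverters._

// Select single fields instead of whole IotDevice values; '?' is bound via setArgs.
val fieldsQuery = new SqlFieldsQuery("select name, gpio from IotDevice where sensorType = ?")
  .setArgs("temperature")
val rows = cacheIot.query(fieldsQuery).getAll.asScala.map(_.asScala)
println(s"SqlFieldsQuery = $rows")
```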

The result of queries:

Here is the full code for the example:

import org.apache.ignite.cache.query.annotations.{QuerySqlField, QueryTextField}
import org.apache.ignite.cache.query.{ScanQuery, SqlQuery, TextQuery}
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.lang.IgniteBiPredicate
import org.apache.ignite.{IgniteCache, Ignition}
import scala.annotation.meta.field
import scala.collection.JavaConverters._

object IgniteSql extends App {

  val igniteConfig = new IgniteConfiguration()
  val cacheConfig = new CacheConfiguration("ignite")
  cacheConfig.setIndexedTypes(Seq(classOf[String], classOf[IotDevice]): _*)
  igniteConfig.setCacheConfiguration(cacheConfig)

  val ignite = Ignition.start(igniteConfig)
  val cacheIot: IgniteCache[String, IotDevice] = ignite.getOrCreateCache[String, IotDevice]("ignite")

  val temp1 = IotDevice(name = "temp1", gpio = "123ASD", sensorType = "temperature", model = "testTemp")
  cacheIot.put(temp1.gpio, temp1)
  val temp2 = IotDevice(name = "temp2", gpio = "456ASD", sensorType = "temperature", model = "testTemp")
  cacheIot.put(temp2.gpio, temp2)

  val sqlText = "sensorType = 'temperature'"
  val sql = new SqlQuery[String, IotDevice](classOf[IotDevice], sqlText)
  val temperatureQueryResult = cacheIot.query(sql).getAll.asScala.map(_.getValue)
  println(s"SqlQuery = $temperatureQueryResult")

  val cursor = cacheIot.query(new ScanQuery(new IgniteBiPredicate[String, IotDevice] {
    override def apply(key: String, entryValue: IotDevice): Boolean = entryValue.sensorType == "temperature"
  }))
  val temperatureScanResult = cursor.getAll.asScala
  println(s"ScanQuery = $temperatureScanResult")

  val textQuery = new TextQuery[String, IotDevice](classOf[IotDevice], "temperature")
  val temperatureTextResult = cacheIot.query(textQuery).getAll.asScala
  println(s"TextQuery = $temperatureTextResult") // all devices with sensorType = temperature
}

case class IotDevice(@(QuerySqlField@field)(index = true) name: String,
                     @(QuerySqlField@field)(index = true) gpio: String,
                     @(QueryTextField@field) sensorType: String,
                     @(QueryTextField@field) model: String)
 

 

 Partition and Replication

Besides having our cluster and executing queries, we might ask ourselves: where exactly is our data? Ignite provides three different cache modes: PARTITIONED, REPLICATED, and LOCAL:
  • Partitioned: this mode is the most scalable among the distributed cache modes. The overall data set is divided into partitions, and the partitions are split equally between participating nodes. It is ideal when working with large data sets with frequent updates. In this mode you can also optionally configure any number of backup copies for cached data:

cacheCfg.setCacheMode(CacheMode.PARTITIONED)
cacheCfg.setBackups(1)

  • Replicated: this mode is expensive because all the data is replicated to every node in the cluster, and every update must be propagated to all other nodes, which can impact performance and scalability. It is ideal when working with small data sets that are not updated frequently:

cacheCfg.setCacheMode(CacheMode.REPLICATED)

  • Local: this is the most lightweight mode of cache operation, as no data is distributed to other cache nodes.
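In PARTITIONED mode you can actually ask Ignite where a given key lives through the affinity API. A sketch (the cache name matches the earlier examples; the helper is illustrative):

```scala
import org.apache.ignite.Ignite

// Locate the partition and the primary node that currently own a given key.
def whereIs(ignite: Ignite, key: String): Unit = {
  val affinity = ignite.affinity[String]("ignite")
  val partition = affinity.partition(key)
  val primaryNode = affinity.mapKeyToNode(key)
  println(s"Key $key -> partition $partition on node ${primaryNode.id()}")
}
```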
 

In-Memory features

Since Ignite's architecture is *memory friendly*, RAM is always treated as the first memory tier, where all the processing happens. Some benefits of this:

– Off-heap based: data and indexes are stored outside the Java heap, therefore only your application code may trigger garbage collection.
– Predictable memory usage: it is possible to cap memory utilization.
– Automatic memory defragmentation: Apache Ignite uses memory as efficiently as possible and executes defragmentation routines in the background, avoiding fragmentation.
– Improved performance and memory utilization: all data and indexes are stored in a paged format with a similar representation in memory and on disk, which removes any need to serialize or deserialize data.
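In Ignite 2.x, the "predictable memory usage" point maps to data regions. A configuration sketch capping off-heap memory for the default region (the region name and sizes are illustrative):

```scala
import org.apache.ignite.configuration.{DataRegionConfiguration, DataStorageConfiguration, IgniteConfiguration}

val storageConfig = new DataStorageConfiguration()
val regionConfig = new DataRegionConfiguration()
regionConfig.setName("Default_Region")
// Start at 256 MB of off-heap memory and never grow beyond 1 GB.
regionConfig.setInitialSize(256L * 1024 * 1024)
regionConfig.setMaxSize(1024L * 1024 * 1024)
storageConfig.setDefaultDataRegionConfiguration(regionConfig)

val igniteConfig = new IgniteConfiguration()
igniteConfig.setDataStorageConfiguration(storageConfig)
```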

 

Conclusion

Could Ignite help your distributed architectures? Would it be costly to integrate Ignite in your apps that are already in production?

There is a lot more to cover and discuss…and a lot of code to try out! We only scratched the surface of this great tool."

This article was written by Gastón Lucero and posted on www.stratio.com