
In a previous post, I introduced Storm, the distributed and fault-tolerant stream processing system for Big Data. This post covers some details about Storm clusters and then examines an actual Storm example.

In general, a Storm cluster is superficially similar to a Hadoop cluster. One key difference is that a MapReduce job eventually finishes, while a Storm topology processes messages forever (or until the user kills it). Figure 1 illustrates the architecture of a Storm cluster, which contains two kinds of nodes:

  • The Master node runs a daemon called Nimbus (similar to Hadoop’s JobTracker), which is responsible for distributing code around the cluster, assigning tasks to machines, and handling failures.
  • The worker nodes run a daemon called the Supervisor. The supervisor listens for work assigned to its machine and starts and stops worker processes as necessary based upon what Nimbus has assigned to it.

Figure 1: Storm Cluster

In a Storm cluster, all of the interaction between Nimbus and the Supervisors is done through a ZooKeeper cluster, an open source configuration and synchronization service for large distributed systems. Both the Nimbus and Supervisor daemons are fail-fast and stateless; all state is kept in ZooKeeper or on local disk. Communication between workers living on the same host or on different machines is based on ZeroMQ sockets, over which serialized Java objects (representing tuples) are passed. Some of the features of ZeroMQ include (read more about Programming with ØMQ in ZeroMQ):

  • Socket library that acts as a concurrency framework
  • Faster than TCP, for clustered products and supercomputing
  • Carries messages across inproc, IPC, TCP, and multicast
  • Asynchronous I/O for scalable multicore message-passing applications
  • Connect N-to-N via fanout, pubsub, pipeline, and request-reply
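
To make this wiring concrete, below is a minimal sketch of the storm.yaml entries a worker (Supervisor) node might use to locate the ZooKeeper ensemble and Nimbus; the host names, local directory, and ports are placeholders and will differ per installation:

storm.zookeeper.servers:
  - "zk1.example.com"
  - "zk2.example.com"
nimbus.host: "nimbus.example.com"
storm.local.dir: "/var/storm"
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703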

The runtime of a Storm topology continuously processes streams of incoming tuples (events), which can be continuously produced and read from multiple sources such as files or in-memory queues. In particular, the nextTuple() method of the spout class is called continuously; the user-defined implementation of this method can, for example, read the next line of a text file or take the next item off the queue, and transform the data into a new tuple (i.e., a named list of values).

The implementation calls the emit() function on the spout’s OutputCollector object to actually inject the tuple into the Storm cluster. The OutputCollector applies the stream groupings to the tuple, which associates one or more task ids with it. A continuous loop reads the serialized tuples off the transfer queue and sends them to the destination worker(s). Another continuous loop reads tuples off the receive queue and calls the user-defined execute() method on the bolt referred to by the task id associated with each tuple. When the implementation decides that a new tuple should be emitted, it constructs the new tuple and emits it through the bolt’s OutputCollector.

Storm in Action

Let’s take a look at an example that counts the number of occurrences of each hash tag in an input stream of tweets. The following snippet of code shows an example of creating the Storm topology for this task:
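
A minimal sketch of such a topology definition (written against the pre-Apache backtype.storm API, with the class names TweetsStreamSpout, HashtagsReaderBolt, and HashtagsCounterBolt assumed here and sketched later in this post) might look like the following:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class HashtagCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // One task reads tweets into the topology.
        builder.setSpout("tweets-stream", new TweetsStreamSpout(), 1);

        // Two tasks extract hash tags; incoming tweets are shuffled evenly across them.
        builder.setBolt("hashtags-reader", new HashtagsReaderBolt(), 2)
               .shuffleGrouping("tweets-stream");

        // Two tasks count hash tags; the fields grouping routes all tuples carrying
        // the same "hashtag" value to the same task.
        builder.setBolt("hashtags-counter", new HashtagsCounterBolt(), 2)
               .fieldsGrouping("hashtags-reader", new Fields("hashtag"));

        // Run the topology in-process for testing.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("hashtag-count", new Config(), builder.createTopology());
    }
}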

In this code, the setSpout method creates a spout called “tweets-stream” that feeds the stream of tweet tuples into the topology; the number of tasks assigned to execute this spout is equal to 1. The setBolt method creates two processing bolts, “hashtags-reader” and “hashtags-counter”; the number of tasks assigned to execute each bolt is equal to 2. The “hashtags-reader” bolt subscribes to the “tweets-stream” spout using a shuffle grouping, while the “hashtags-counter” bolt subscribes to the “hashtags-reader” bolt using a fields grouping on the field “hashtag,” so that all occurrences of the same hash tag are counted by the same task.

The following snippet of code shows the implementation of the “tweets-stream” spout of the topology that generates the input tweets:
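
A minimal sketch of such a spout (assuming the tweets arrive as plain strings on an in-memory queue that is filled elsewhere, for example by a Twitter client) might look like the following:

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class TweetsStreamSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private BlockingQueue<String> tweets;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        // In a real topology this queue would be fed by an external tweet source.
        this.tweets = new LinkedBlockingQueue<String>();
    }

    @Override
    public void nextTuple() {
        // Called repeatedly by Storm; emit the next tweet if one is available.
        String tweet = tweets.poll();
        if (tweet != null) {
            collector.emit(new Values(tweet));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Each emitted tuple carries a single field called "tweet".
        declarer.declare(new Fields("tweet"));
    }
}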

In this code, the nextTuple() method requests and reads the next tweet from the input stream, and the spout emits a tweet tuple through its output collector. The declareOutputFields method declares that the spout emits tuples with one field called “tweet.”

The following snippet of code shows the implementation of the “hashtags-reader” bolt of the topology that reads the input tweet tuples, identifies the hash tag of each tweet tuple and emits a new tuple with the hash tag name:
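
A minimal sketch of such a bolt (treating every whitespace-separated token that starts with “#” as a hash tag, a simplification assumed here) might look like the following:

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HashtagsReaderBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String tweet = input.getStringByField("tweet");
        // Emit one tuple per hash tag found in the tweet text.
        for (String token : tweet.split("\\s+")) {
            if (token.startsWith("#") && token.length() > 1) {
                collector.emit(new Values(token.substring(1).toLowerCase()));
            }
        }
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Each emitted tuple carries a single field called "hashtag".
        declarer.declare(new Fields("hashtag"));
    }
}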

In this code, the execute method takes in as input a tweet tuple from the “tweets-stream” spout and emits new hash tag tuples using the OutputCollector object. The declareOutputFields method declares that the bolt emits tuples with one field called “hashtag.”

The following snippet of code shows the implementation of the “hashtags-counter” bolt of the topology that reads the input hash tag tuples and counts the number of occurrences of each hash tag:
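
A minimal sketch of such a bolt (keeping the running counts in an in-memory map inside each task) might look like the following:

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class HashtagsCounterBolt extends BaseRichBolt {
    private OutputCollector collector;
    private Map<String, Integer> counts;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Integer>();
    }

    @Override
    public void execute(Tuple input) {
        String hashtag = input.getStringByField("hashtag");
        // Thanks to the fields grouping, every occurrence of a given hash tag
        // arrives at the same task, so a local map is sufficient for counting.
        Integer current = counts.get(hashtag);
        counts.put(hashtag, current == null ? 1 : current + 1);
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is a sink in this sketch and does not emit further tuples.
    }
}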

Figure 2 illustrates the execution of the example Storm topology.

Figure 2: Execution of the example Storm topology

Conclusion

The ubiquity of mobile devices, location-based services, and pervasive sensors are examples of new scenarios that have created a crucial need for scalable computing platforms and parallel architectures that can process vast amounts of streaming data. We have presented a road map for using Storm, a recently introduced distributed and fault-tolerant large-scale stream processing system. In general, Storm provides a new model for implementing Big Data analytics over streaming data, and we expect it to continue to attract considerable interest among Big Data processing systems.

Safari Books Online has the content you need

Check out these Storm-related books available from Safari Books Online:

Getting Started with Storm introduces you to Storm, a distributed, JVM-based system for processing streaming data. Through simple tutorials, sample Java code, and a complete real-world scenario, you’ll learn how to build fast, fault-tolerant solutions that process results as soon as the data arrives.
Big Data Bibliography contains a selection of the most useful books for data analysis. This bibliography starts from high-level concepts of business intelligence, data analysis, and data mining, and works its way down to the tools needed for number crunching: mathematical toolkits, machine learning, and natural language processing. Cloud services and infrastructure, including Amazon Web Services, are covered, along with Hadoop and NoSQL sections that list the Big Data tools that can be deployed locally or in the cloud.
HBase: The Definitive Guide contains a chapter that covers ZooKeeper; recall that Storm keeps all cluster state either in ZooKeeper or on local disk.

About the author

Dr. Sherif Sakr is a Senior Research Scientist in the Software Systems Group at National ICT Australia (NICTA), Sydney, Australia. He is also a Conjoint Senior Lecturer in the School of Computer Science and Engineering (CSE) at the University of New South Wales (UNSW). He received his PhD degree in Computer Science from Konstanz University, Germany in 2007. He received his BSc and MSc degrees in Computer Science from the Information Systems department at the Faculty of Computers and Information in Cairo University, Egypt, in 2000 and 2003 respectively. In 2011, Dr. Sakr held a visiting research scientist position in the eXtreme Computing Group (XCG) at Microsoft Research, Redmond, USA. In 2012, he held a research MTS position at Alcatel-Lucent Bell Labs. Sherif is a Cloudera certified developer for Apache Hadoop and a Cloudera certified specialist for HBase. You can reach Sherif at ssakr@cse.unsw.edu.au.

Tags: Hadoop, MapReduce, Nimbus, Storm, Storm Cluster, Supervisor, ZeroMQ, ZooKeeper
