
Storm in Action

In a previous post, I introduced Storm, the distributed and fault-tolerant stream processing system for Big Data. This post covers some details about Storm clusters and then examines an actual Storm example.

In general, a Storm cluster is superficially similar to a Hadoop cluster. One key difference is that a MapReduce job eventually finishes, while a Storm topology processes messages forever (or until the user kills it). Figure 1 illustrates the architecture of a Storm cluster. In principle, there are two kinds of nodes in a Storm cluster: the master node, which runs a daemon called Nimbus that distributes code around the cluster and assigns tasks to machines, and the worker nodes, each of which runs a daemon called a Supervisor that starts and stops worker processes as directed by Nimbus.


Figure 1: Storm Cluster

In a Storm cluster, all of the interactions between Nimbus and the Supervisors are done through a ZooKeeper cluster, an open source configuration and synchronization service for large distributed systems. Both the Nimbus daemon and the Supervisor daemons are fail-fast and stateless; all of the state is kept in ZooKeeper or on local disk. Communication between workers living on the same host or on different machines is based on ZeroMQ sockets, over which serialized Java objects (representing tuples) are passed. ZeroMQ is a fast, embeddable messaging library that provides asynchronous, socket-style communication (read more about Programming with ØMQ in ZeroMQ).
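As a rough illustration of where this coordination surfaces for a developer, the Config keys below tell Storm where the ZooKeeper ensemble and the Nimbus daemon live. This is a minimal sketch with placeholder host names; in a real deployment these values normally live in each node's storm.yaml file rather than in Java code.

import java.util.Arrays;

import backtype.storm.Config;

public class ClusterConfigExample {
     // Build a Config pointing at the cluster's coordination services.
     // Host names are placeholders for illustration only.
     public static Config buildClusterConfig() {
          Config conf = new Config();
          conf.put(Config.STORM_ZOOKEEPER_SERVERS,
               Arrays.asList("zk1.example.com", "zk2.example.com"));
          conf.put(Config.NIMBUS_HOST, "nimbus.example.com");
          return conf;
     }
}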

At runtime, a Storm topology continuously processes streams of incoming tuples (events), which can be continuously produced and read from multiple sources such as files or in-memory queues. In particular, the nextTuple() method of the spout class is called continuously; the user-defined implementation of this method can, for example, read the next line of a text file or take the next item off a queue and transform the data it reads into a new tuple (i.e., a named list of values).

The implementation calls the emit() method on the spout’s SpoutOutputCollector object to actually inject the tuple into the Storm cluster. The output collector applies the stream groupings to the tuple, which associates one or more task ids with it. A continuous loop reads the serialized tuples off the transfer queue and sends them to the destination worker(s). Another continuous loop reads tuples off the receive queue and calls the user-defined execute() method on the bolt referred to by the task id associated with each tuple. When the implementation decides that a new tuple should be produced, it constructs the tuple and emits it through the bolt’s OutputCollector.
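Before walking through the full example in the next section, the following minimal sketch shows what this loop looks like from the developer’s side: a spout that drains an in-memory queue and emits one tuple per entry. The QueueSpout class, its LinkedBlockingQueue field, and the "line" field name are illustrative assumptions rather than part of the article’s example, and because the queue lives in the local JVM the pattern is only meaningful when the topology runs in local mode.

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class QueueSpout extends BaseRichSpout {
     private final LinkedBlockingQueue<String> queue;
     private SpoutOutputCollector collector;

     public QueueSpout(LinkedBlockingQueue<String> queue) {
          this.queue = queue;
     }

     public void open(Map conf, TopologyContext context,
               SpoutOutputCollector collector) {
          this.collector = collector;
     }

     public void nextTuple() {
          // Called repeatedly by the worker: emit one tuple per queued line, if any
          String line = queue.poll();
          if (line != null) {
               collector.emit(new Values(line));
          }
     }

     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("line"));
     }
}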

Storm in Action: Counting Hash Tags

Let’s take a look at an example that counts the number of occurrences of each hash tag in an input stream of tweets. The following snippet of code shows how to create the Storm topology for this task:

public static StormTopology createTopology() {
     TopologyBuilder builder = new TopologyBuilder();
     // Spout that feeds the stream of tweets into the topology (1 task)
     builder.setSpout("tweets-stream", new ApiStreamingSpout(), 1);
     // Bolt that extracts the hash tags from each tweet (2 tasks)
     builder.setBolt("hashtags-reader", new HashtagsReader(), 2)
          .shuffleGrouping("tweets-stream");
     // Bolt that counts the occurrences of each hash tag (2 tasks)
     builder.setBolt("hashtags-counter", new HashtagsCounterBolt(), 2)
          .fieldsGrouping("hashtags-reader", new Fields("hashtag"));
     return builder.createTopology();
}

In this code, the setSpout method is used to create a spout called “tweets-stream” that feeds the stream of tuples into the Storm topology; the number of tasks assigned to execute this spout is 1. The setBolt method is used to create two processing bolts, “hashtags-reader” and “hashtags-counter,” each of which is assigned 2 tasks. The “hashtags-reader” bolt subscribes to the “tweets-stream” spout using a shuffle grouping, while the “hashtags-counter” bolt subscribes to the “hashtags-reader” bolt using a fields grouping on the field “hashtag.”
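To actually run this topology, the configuration and submission code might look like the following. This is a minimal sketch rather than part of the original article: it assumes local-mode testing with Storm’s LocalCluster, that the createTopology() method above is in scope, and that the “server”, “user”, and “pass” values (placeholders here) are whatever the ApiStreamingSpout shown below expects.

import backtype.storm.Config;
import backtype.storm.LocalCluster;

public class HashtagsTopologyRunner {
     public static void main(String[] args) throws Exception {
          // Configuration values read later by ApiStreamingSpout.open();
          // the host and credentials below are placeholders
          Config conf = new Config();
          conf.put("server", "stream.example.com");
          conf.put("user", "some-user");
          conf.put("pass", "some-password");

          // Run the topology inside the local JVM for testing
          LocalCluster cluster = new LocalCluster();
          cluster.submitTopology("hashtags-topology", conf, createTopology());

          Thread.sleep(60000);   // let the topology process tweets for a minute
          cluster.shutdown();
     }
}

On a real cluster, the same topology would be submitted with StormSubmitter.submitTopology instead of LocalCluster.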

The following snippet of code shows the implementation of the “tweets-stream” spout of the topology that generates the input tweets:

public class ApiStreamingSpout extends BaseRichSpout {
     SpoutOutputCollector collector;
     TweetReader reader;

     public void nextTuple() {
          // Read the next tweet (if any) and inject it into the topology
          Tweet tweet = reader.getNextTweet();
          if (tweet != null)
               collector.emit(new Values(tweet));
     }

     public void open(Map conf, TopologyContext context,
               SpoutOutputCollector collector) {
          // Connect to the tweet source using the topology configuration
          reader = new TweetReader((String) conf.get("server"),
               (String) conf.get("user"), (String) conf.get("pass"));
          this.collector = collector;
     }

     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("tweet"));
     }
}

In this code, the nextTuple() method reads the next tweet from the input stream and emits a tweet tuple through the output collector. The declareOutputFields method declares that the spout emits tuples with a single field called “tweet.”
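The article does not show the Tweet and TweetReader helper classes that the spout relies on. The sketch below is purely hypothetical, indicating the minimal shape they would need for the spout and the bolts to compile; it is not the author’s actual implementation.

import java.io.Serializable;
import java.util.List;

// Hypothetical value object; values emitted in tuples need to be serializable
class Tweet implements Serializable {
     private final String text;
     private final List<String> hashTags;

     Tweet(String text, List<String> hashTags) {
          this.text = text;
          this.hashTags = hashTags;
     }

     String getText() {
          return text;
     }

     List<String> getHashTags() {
          return hashTags;
     }
}

// Hypothetical client for the tweet stream used by ApiStreamingSpout
class TweetReader {
     TweetReader(String server, String user, String pass) {
          // Connect to the streaming endpoint here
     }

     Tweet getNextTweet() {
          // Return the next available tweet, or null if none is ready yet
          return null;
     }
}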

The following snippet of code shows the implementation of the “hashtags-reader” bolt of the topology, which reads the input tweet tuples, extracts the hash tags from each tweet tuple, and emits a new tuple for each hash tag:

public class HashtagsReader extends BaseBasicBolt {
     public void execute(Tuple input, BasicOutputCollector collector) {
          Tweet tweet = (Tweet) input.getValueByField("tweet");
          // Emit one tuple per hash tag found in the tweet
          for (String hashtag : tweet.getHashTags()) {
               collector.emit(new Values(hashtag));
          }
     }

     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("hashtag"));
     }
}

In this code, the execute method takes as input a tweet tuple from the “tweets-stream” spout and emits a new tuple for each hash tag through the BasicOutputCollector object. The declareOutputFields method declares that the bolt emits tuples with a single field called “hashtag.”

The following snippet of code shows the implementation of the “hashtags-counter” bolt of the topology, which reads the input hash tag tuples and counts the number of occurrences of each hash tag:

public class HashtagsCounterBolt extends BaseBasicBolt {
     // In-memory map from each hash tag to the number of occurrences seen so far
     private final Map<String, Long> counts = new HashMap<String, Long>();

     public void execute(Tuple input, BasicOutputCollector collector) {
          String key = input.getStringByField("hashtag");
          Long count = counts.get(key);
          counts.put(key, count == null ? 1L : count + 1);
     }

     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          // Nothing is emitted downstream; the counts are kept in memory
     }
}
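The bolt above only accumulates its counts in memory and emits nothing downstream. If the running totals need to be consumed by another component, for example to persist or log them, one variation not shown in the original article is to emit the updated count each time a hash tag is seen. A minimal sketch of that variant:

import java.util.HashMap;
import java.util.Map;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class EmittingHashtagsCounterBolt extends BaseBasicBolt {
     private final Map<String, Long> counts = new HashMap<String, Long>();

     public void execute(Tuple input, BasicOutputCollector collector) {
          String key = input.getStringByField("hashtag");
          Long previous = counts.get(key);
          long updated = (previous == null) ? 1L : previous + 1;
          counts.put(key, updated);
          // Emit the hash tag together with its running count
          collector.emit(new Values(key, updated));
     }

     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("hashtag", "count"));
     }
}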

Figure 2 illustrates the execution of the example Storm topology.

Figure 2: Execution of the Example Storm Topology

Conclusion

The ubiquity of mobile devices, location services, and pervasive sensors are examples of new scenarios that have created a crucial need for scalable computing platforms and parallel architectures that can process vast amounts of streaming data. We have presented a road map for using Storm, a recently introduced distributed and fault-tolerant large-scale stream processing system. In general, Storm provides a new model for implementing big-data analytics over streaming data, and we expect it to continue to attract considerable interest in the Big Data processing community.

Safari Books Online has the content you need

Check out these Storm-related books available from Safari Books Online:

Getting Started with Storm introduces you to Storm, a distributed, JVM-based system for processing streaming data. Through simple tutorials, sample Java code, and a complete real-world scenario, you’ll learn how to build fast, fault-tolerant solutions that process results as soon as the data arrives.
Big Data Bibliography contains a selection of the most useful books for data analysis. This bibliography starts from high level concepts of business intelligence, data analysis and data mining, and works its way down to the tools needed for number crunching mathematical toolkits, machine learning, and natural language processing. Cloud Services and Infrastructure and Amazon Web Services are covered, along with Hadoop and NoSql sections that list the Big Data tools that can be deployed locally or in the cloud.
HBase: The Definitive Guide contains a chapter that covers ZooKeeper – Storm keeps all cluster state either in ZooKeeper or on local disk.

About the author

Dr. Sherif Sakr is a Senior Research Scientist in the Software Systems Group at National ICT Australia (NICTA), Sydney, Australia. He is also a Conjoint Senior Lecturer in the School of Computer Science and Engineering (CSE) at the University of New South Wales (UNSW). He received his PhD degree in Computer Science from Konstanz University, Germany, in 2007, and his BSc and MSc degrees in Computer Science from the Information Systems department at the Faculty of Computers and Information, Cairo University, Egypt, in 2000 and 2003, respectively. In 2011, Dr. Sakr held a visiting research scientist position in the eXtreme Computing Group (XCG) at Microsoft Research, Redmond, USA. In 2012, he held a research MTS position at Alcatel-Lucent Bell Labs. Sherif is a Cloudera Certified Developer for Apache Hadoop and a Cloudera Certified Specialist for HBase. You can reach Sherif at ssakr@cse.unsw.edu.au.

About Safari Books Online

Safari Books Online is an online learning library that provides access to thousands of technical, engineering, business, and digital media books and training videos. Get the latest information on topics like Windows 8, Android Development, iOS Development, Cloud Computing, HTML5, and so much more – sometimes even before the book is published or on bookshelves. Learn something new today with a free subscription to Safari Books Online.
