
How to debug a Storm program in Eclipse

I. Introduction

Storm provides two modes of operation: local mode and distributed mode. Local mode is very useful for developing and debugging Storm topologies.

Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process by simulating worker nodes with threads. Local mode is useful for testing and development of topologies.

Most developers write their programs on Windows, so the question is how to develop and debug Storm programs without installing a Storm environment on the local machine. If that problem has been bothering you, use the method described in this article.

II. Implementation Steps

To debug a Storm program with Eclipse and Maven, follow these steps:

1. Set up the development environment (Eclipse + Maven; I used Eclipse Kepler and Maven 3.1.1).

2. Create a Maven project and edit its pom.xml as shown in the pom.xml file below (the machine must be connected to the network so the required dependency jars can be downloaded).

The pom.xml on GitHub pulls in too many dependencies; some of them are not needed here.

3. Write the Storm program and specify local mode when running it. The program provided in this article is word count.

The important line is LocalCluster cluster = new LocalCluster();

Config conf = new Config();
conf.setDebug(true);              // log every emitted tuple
conf.setNumWorkers(2);            // only takes effect on a real cluster; harmless in local mode
LocalCluster cluster = new LocalCluster();   // in-process "cluster" simulated with threads
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);               // give the topology some time to run
cluster.killTopology("test");
cluster.shutdown();
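If you would rather sit at a breakpoint in Eclipse than race a fixed sleep, a small runner such as the one below keeps the local cluster alive until Enter is pressed. This is only a sketch, not part of the original article: the class name DebugLocalRunner is made up for illustration, and it reuses the SplitSentence, WordCount and RandomSentenceSpout classes listed later in this article.

package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.starter.spout.RandomSentenceSpout;

// Hypothetical helper class, not from the original article.
public class DebugLocalRunner {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);
        builder.setBolt("split", new WordCountTopology.SplitSentence(), 1).shuffleGrouping("spout");
        builder.setBolt("count", new WordCountTopology.WordCount(), 1)
               .fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);                       // log every emitted tuple

        LocalCluster cluster = new LocalCluster(); // the whole topology runs inside this JVM
        cluster.submitTopology("debug-test", conf, builder.createTopology());

        // Block until Enter is pressed, so breakpoints can be inspected at leisure.
        System.out.println("Topology running in local mode; press Enter to stop...");
        System.in.read();

        cluster.killTopology("debug-test");
        cluster.shutdown();
    }
}

Run it in Eclipse with Run As > Java Application; because everything executes inside a single JVM, breakpoints set inside SplitSentence.execute or WordCount.execute are hit just like in any ordinary Java program.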

The pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0">
  ...
  <dependencies>
    ...
    <dependency>
      <groupId>commons-collections</groupId>
      <artifactId>commons-collections</artifactId>
      <version>3.2.1</version>
    </dependency>
  </dependencies>
</project>
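Because the pom.xml above survives only as a fragment, here is a minimal sketch of a pom that works for local-mode runs. It is not the article's original file: the project coordinates are made up, and the Storm dependency (org.apache.storm:storm-core:0.9.2-incubating) is an assumption chosen because the code below uses the backtype.storm packages. Keep it at the default compile scope so LocalCluster is on the Eclipse classpath, and adjust the version to match your cluster.

<!-- Minimal sketch, not the article's original pom; adjust coordinates and versions as needed. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>storm.starter</groupId>
  <artifactId>storm-starter</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <dependencies>
    <!-- Assumed Storm dependency: 0.9.x releases still use the backtype.storm packages seen below. -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.2-incubating</version>
    </dependency>
    <dependency>
      <groupId>commons-collections</groupId>
      <artifactId>commons-collections</artifactId>
      <version>3.2.1</version>
    </dependency>
  </dependencies>
</project>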

The Storm program:

package storm.starter;

import java.util.HashMap;
import java.util.Map;

import storm.starter.spout.RandomSentenceSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * This topology demonstrates Storm's stream groupings and multilang
 * capabilities.
 */
public class WordCountTopology {

    /** Splits each incoming sentence into words and emits one tuple per word. */
    public static class SplitSentence extends BaseBasicBolt {

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            try {
                String msg = input.getString(0);
                System.out.println(msg + "-------------------");
                if (msg != null) {
                    String[] words = msg.split(" ");
                    for (String word : words) {
                        collector.emit(new Values(word));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    /** Keeps a running count per word in memory and emits (word, count) on every update. */
    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new RandomSentenceSpout(), 5);
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        // fieldsGrouping sends all tuples with the same "word" to the same WordCount task
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            // A topology name was given: submit to a real cluster.
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // No arguments: run in local mode inside this JVM, which is what Eclipse debugging uses.
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);   // let the topology run for a while before shutting down
            cluster.shutdown();
        }
    }
}

package storm.starter.spout;

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/** Emits a randomly chosen sentence on every call to nextTuple. */
public class RandomSentenceSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;
    Random _rand;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    @Override
    public void nextTuple() {
        Utils.sleep(100);   // throttle emission so the local-mode log stays readable
        String[] sentences = new String[]{
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "snow white and the seven dwarfs",
            "i am at two with nature" };
        String sentence = sentences[_rand.nextInt(sentences.length)];
        _collector.emit(new Values(sentence));
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
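RandomSentenceSpout leaves ack and fail empty and emits tuples without message IDs, so failed tuples are never replayed. As a point of comparison, here is a minimal sketch, not from the original article, of how a BaseRichSpout can emit with message IDs so that Storm calls ack or fail for each tuple; the class name ReliableSentenceSpout and the pending-map bookkeeping are illustrative assumptions.

package storm.starter.spout;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

// Illustrative sketch only: a spout that emits with message IDs and replays failed tuples.
public class ReliableSentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private final String[] sentences = {
        "the cow jumped over the moon",
        "an apple a day keeps the doctor away" };
    private int index = 0;
    // Tuples that have been emitted but not yet acked, keyed by message ID.
    private final Map<Object, String> pending = new HashMap<Object, String>();

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String sentence = sentences[index++ % sentences.length];
        Object msgId = UUID.randomUUID().toString();
        pending.put(msgId, sentence);
        // Passing a message ID tells Storm to track this tuple and call ack or fail later.
        collector.emit(new Values(sentence), msgId);
    }

    @Override
    public void ack(Object msgId) {
        pending.remove(msgId);   // fully processed, so forget it
    }

    @Override
    public void fail(Object msgId) {
        String sentence = pending.get(msgId);
        if (sentence != null) {
            collector.emit(new Values(sentence), msgId);   // replay the failed tuple
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

In local mode this behaves the same way it would on a cluster, so you can set a breakpoint in ack or fail in Eclipse and watch the tuple tracking happen.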