博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
storm安装
阅读量:5949 次
发布时间:2019-06-19

本文共 10355 字,大约阅读时间需要 34 分钟。

  1. 0安装JDK

    wget --no-check-certificate --no-cookie --header "Cookie: oraclelicense=accept-securebackup-cookie;" http://download.oracle.com/otn-pub/java/jdk/8u45-b14/jdk-8u45-linux-x64.rpm

    使用rpm -ivh jdk-8u45-linux-x64.rpm进行安装

    检查安装Javac

    1:centOS安装ZeroMQ所需组件及工具:

    yum install gcc

    yum install gcc-c++

    yum install make

    yum install uuid-devel

    yum install libuuid-devel

     

    yum install libtool

  2.  

    wget http://mirror.bjtu.edu.cn/apache/zookeeper/stable/zookeeper-3.4.6.tar.gz

     

    tar -zxvf zookeeper-3.4.6.tar.gz

     

    cp -R zookeeper-3.4.6 /usr/local/

     

    ln -s /usr/local/zookeeper-3.4.6/  /usr/local/zookeeper

     

    vim /etc/profile

    export ZOOKEEPER_HOME="/path/to/zookeeper" #路径指定,存放日志等文件

     

    cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg

     

    mkdir /tmp/zookeepermkdir /var/log/zookeeper

  3.  

    安装zeromq以及jzmq:

    wget http://download.zeromq.org/zeromq-2.2.0.tar.gz

    tar zxf zeromq-2.2.0.tar.gz

     cd zeromq-2.2.0

    ./configure

    make

    make install

    sudo ldconfig (更新LD_LIBRARY_PATH)zeromq安装完成。

     

    安装jzmq: (提前安装好java)

    yum install git

     git clone git://github.com/nathanmarz/jzmq.git

    cd jzmq

    ./autogen.sh

    ./configure

    make

    make install然后,jzmq就装好了.注意:在./autogen.sh这步如果报错:autogen.sh:error:could not find libtool is required to run autogen.sh,这是因为缺少了libtool,可以用#yum install libtool*来解决。

  4.  

    wget 

    unzip storm-0.8.1.zip
    mv storm-0.8.1 /usr/local/
    ln -s /usr/local/storm-0.8.1/ /usr/local/storm
    vim /etc/profile
    export STORM_HOME=/usr/local/storm-0.8.1
    export PATH=$PATH:$STORM_HOME/bin
    到此为止单机版的Storm就安装完毕了。

  5. 5
    1. 启动ZOOPKEEPER
      zkServer.sh start
    2. 启动NIMBUS
      storm nimbus &
    3. 启动SUPERVISOR
      storm supervisor &
    4. 启动UI
      storm ui &
    5. 部署TOPOLOGY
      storm jar /opt/hadoop/loganalyst/storm-dependend/data/teststorm-1.0.jar teststorm.TopologyMain /opt/hadoop/loganalyst/storm-dependend/data/words.txt
    6. 删除TOPOLOGY
      storm kill {toponame}
    7. 激活TOPOLOGY
      storm active {toponame}
    8. 不激活TOPOLOGY
      storm deactive {toponame}
    9. 列出所有TOPOLOGY
      storm list

    再查看进程jps查看UI:在浏览器中输入http://localhost:8080

     

     

     

    6:storm进程远程kill

    1. String topologyName = "topology001";  
    2. boolean kill = false;  
    3. Map conf = Utils.readStormConfig();  
    4. //nimbus服务器地址  
    5. conf.put(Config.NIMBUS_HOST, "172.16.1.100");  
    6. //nimbus thrift地址  
    7. conf.put(Config.NIMBUS_THRIFT_PORT, 6627);  
    8. Nimbus.Client client = NimbusClient.getConfiguredClient(conf).getClient();  
    9. List<TopologySummary> topologyList = client.getClusterInfo().get_topologies();  
    10. for(TopologySummary topologySummary : topologyList) {  
    11.     if(topologySummary.get_name().equals(topologyName)) {  
    12.     KillOptions killOpts = new KillOptions();  
    13. //延迟杀死时间,单位秒  
    14.     killOpts.set_wait_secs(5);  
    15.     client.killTopologyWithOpts(topologyName, killOpts);  
    16.     kill = true;  
    17.     System.out.println("killed " + topologyName);  
    18.     }  
    19. }  
    20. if(!kill)  
    21.     System.out.println(topologyName + " not started");  

     

     

     

    使用ThriftAPI监控Storm集群和Topology
    2015-01-15       0     来源:xcc的博客  
        

    如要监控Storm集群和运行在其上的Topology,该如何做呢?

    Storm已经为你考虑到了,Storm支持Thrift的C/S架构,在部署Nimbus组件的机器上启动一个Thrift Server进程来提供服务,我们可以通过编写一个Thrift Client来请求Thrift Server,来获取你想得到的集群和Topology的相关数据,来接入监控平台,如Zabbix等,我目前使用的就是Zabbix。

    整体的流程已经清楚了,下面就来实践吧。

    1 安装Thrift

    由于我们要使用Thrift来编译Storm的源代码来获得Thrift Client相关的Java源代码,所以需要先安装Thrift,这里选取的版本为0.9.2。

    到官网好安装包:http://thrift.apache.org/

    编译安装:configure && make && make install

    验证:thrift --version

    如果打印出Thrift version 0.9.2,代表安装成功。

    2 编译Thrift Client代码

    首先下载Storm源代码,这里使用最新的0.9.3版本:http://mirrors.hust.edu.cn/apache/storm/apache-storm-0.9.3/apache-storm-0.9.3-src.tar.gz

    解压后进行编译:thrift -gen java apache-storm-0.9.3/storm-core/src/storm.thrift

    在当前目录下出现gen-java文件夹,此文件夹下就是Thrift Client的源代码了。

    3 使用Thrift Client API

    然后创建一个Maven项目来进行执行监控数据的获取。

    项目生成一个Jar文件,输入一些命令和自定义参数,然后输出结果。

    以命令行的形式进行调用,这样可以方便的接入监控,当然使用形式可以根据自身情况施行。

    创建好后,把gen-java生成的代码拷贝进来。

    在pom.xml里引入Thrift对应版本的库:

     

    1
    2
    3
    4
    5
    <dependency>
        
    <groupId>org.apache.thrift</groupId>
        
    <artifactId>libthrift</artifactId>
        
    <version>
    0.9
    .
    2
    </version>
    </dependency>

    首先写一些Thrift相关的辅助类。

    ClientInfo.java

     

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    package
    com.damacheng009.storm.monitor.thrift;
     
    import
    org.apache.thrift.protocol.TBinaryProtocol;
    import
    org.apache.thrift.transport.TFramedTransport;
    import
    org.apache.thrift.transport.TSocket;
     
    import
    backtype.storm.generated.Nimbus;
     
    /**
     
    * 代表一个Thrift Client的信息
     
    * @author jb-xingchencheng
     
    *
     
    */
    public
    class
    ClientInfo {
        
    private
    TSocket tsocket;
        
    private
    TFramedTransport tTransport;
        
    private
    TBinaryProtocol tBinaryProtocol;
        
    private
    Nimbus.Client client;
     
        
    public
    TSocket getTsocket() {
            
    return
    tsocket;
        
    }
     
        
    public
    void
    setTsocket(TSocket tsocket) {
            
    this
    .tsocket = tsocket;
        
    }
     
        
    public
    TFramedTransport gettTransport() {
            
    return
    tTransport;
        
    }
     
        
    public
    void
    settTransport(TFramedTransport tTransport) {
            
    this
    .tTransport = tTransport;
        
    }
     
        
    public
    TBinaryProtocol gettBinaryProtocol() {
            
    return
    tBinaryProtocol;
        
    }
     
        
    public
    void
    settBinaryProtocol(TBinaryProtocol tBinaryProtocol) {
            
    this
    .tBinaryProtocol = tBinaryProtocol;
        
    }
     
        
    public
    Nimbus.Client getClient() {
            
    return
    client;
        
    }
     
        
    public
    void
    setClient(Nimbus.Client client) {
            
    this
    .client = client;
        
    }
    }
    ClientManager.java

     

     

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    package
    com.damacheng009.storm.monitor.thrift;
     
    import
    org.apache.thrift.protocol.TBinaryProtocol;
    import
    org.apache.thrift.transport.TFramedTransport;
    import
    org.apache.thrift.transport.TSocket;
    import
    org.apache.thrift.transport.TTransportException;
     
    import
    backtype.storm.generated.Nimbus;
     
    /**
     
    * Thrift Client管理类
     
    * @author jb-xingchencheng
     
    *
     
    */
    public
    class
    ClientManager {
        
    public
    static
    ClientInfo getClient(String nimbusHost,
    int
    nimbusPort)
    throws
    TTransportException {
            
    ClientInfo client =
    new
    ClientInfo();
            
    TSocket tsocket =
    new
    TSocket(nimbusHost, nimbusPort);
            
    TFramedTransport tTransport =
    new
    TFramedTransport(tsocket);
            
    TBinaryProtocol tBinaryProtocol =
    new
    TBinaryProtocol(tTransport);
            
    Nimbus.Client c =
    new
    Nimbus.Client(tBinaryProtocol);
            
    tTransport.open();
            
    client.setTsocket(tsocket);
            
    client.settTransport(tTransport);
            
    client.settBinaryProtocol(tBinaryProtocol);
            
    client.setClient(c);
             
            
    return
    client; 
        
    }
         
        
    public
    static
    void
    closeClient(ClientInfo client) {
            
    if
    (
    null
    == client) {
                
    return
    ;
            
    }
             
            
    if
    (
    null
    != client.gettTransport()) {
                
    client.gettTransport().close();
            
    }
             
            
    if
    (
    null
    != client.getTsocket()) {
                
    client.getTsocket().close();
            
    }
        
    }
    }
    然后就可以写自己的逻辑去获取集群和拓扑的数据了,Storm提供的UI界面上展示的数据基本都可以获取到,这里只举出一个简单的例子,我们想获得某个拓扑发生异常的次数,和发生的异常的堆栈。剩下的项目你可以随意的定制。

     

    下面是入口类:

    Main.java

     

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    package
    com.damacheng009.storm.monitor;
     
    import
    com.damacheng009.storm.monitor.logic.Logic;
     
    /**
     
    * 入口类
     
    * @author jb-xingchencheng
     
    *
     
    */
    public
    class
    Main {
        
    // NIMBUS的信息
        
    public
    static
    String NIMBUS_HOST =
    "192.168.180.36"
    ;
        
    public
    static
    int
    NIMBUS_PORT =
    6627
    ;
     
        
    /**
         
    * 命令格式 CMD(命令) [ARG0] [ARG1] ...(更多参数)
         
    * @param args
         
    */
        
    public
    static
    void
    main(String[] args) {
            
    if
    (args.length <
    3
    ) {
                
    return
    ;
            
    }
             
            
    NIMBUS_HOST = args[
    0
    ];
            
    NIMBUS_PORT = Integer.parseInt(args[
    1
    ]);
             
            
    String cmd = args[
    2
    ];
            
    String result =
    "-1"
    ;
            
    if
    (cmd.equals(
    "get_topo_exp_size"
    )) {
                
    String topoName = args[
    3
    ];
                
    result = Logic.getTopoExpSize(topoName);
            
    }
    else
    if
    (cmd.equals(
    "get_topo_exp_stack_trace"
    )) {
                
    String topoName = args[
    3
    ];
                
    result = Logic.getTopoExpStackTrace(topoName);
            
    }
             
            
    System.out.println(result);
        
    }
    }

     

    测试的时候把具体的HOST和PORT改一下即可。

    然后是具体的逻辑类。

    Logic.java

     

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    package
    com.damacheng009.storm.monitor.logic;
     
    import
    java.util.Date;
    import
    java.util.List;
    import
    java.util.Set;
     
    import
    com.damacheng009.storm.monitor.Main;
    import
    com.damacheng009.storm.monitor.thrift.ClientInfo;
    import
    com.damacheng009.storm.monitor.thrift.ClientManager;
     
    import
    backtype.storm.generated.ClusterSummary;
    import
    backtype.storm.generated.ErrorInfo;
    import
    backtype.storm.generated.TopologyInfo;
    import
    backtype.storm.generated.TopologySummary;
     
    public
    class
    Logic {
        
    /**
         
    * 取得某个拓扑的异常个数
         
    * @param topoName
         
    * @return
         
    */
        
    public
    static
    String getTopoExpSize(String topoName) {
            
    ClientInfo client =
    null
    ;
            
    int
    errorTotal =
    0
    ;
             
            
    try
    {
                
    client = ClientManager.getClient(Main.NIMBUS_HOST, Main.NIMBUS_PORT);
                 
                
    ClusterSummary clusterSummary = client.getClient().getClusterInfo();
                
    List<TopologySummary> topoSummaryList = clusterSummary.getTopologies();
                
    for
    (TopologySummary ts : topoSummaryList) {
                    
    if
    (ts.getName().equals(topoName)) {
                        
    TopologyInfo topologyInfo = client.getClient().getTopologyInfo(ts.getId());
                        
    Set<String> errorKeySet = topologyInfo.getErrors().keySet();
                        
    for
    (String errorKey : errorKeySet) {
                            
    List<ErrorInfo> listErrorInfo = topologyInfo.getErrors().get(errorKey);
                            
    errorTotal += listErrorInfo.size();
                        
    }
                        
    break
    ;
                    
    }
                
    }
                 
                
    return
    String.valueOf(errorTotal);
            
    }
    catch
    (Exception e) {
                
    return
    "-1"
    ;
            
    }
    finally
    {
                
    ClientManager.closeClient(client);
            
    }  
        
    }
     
        
    /**
         
    * 返回某个拓扑的异常堆栈
         
    * @param topoName
         
    * @return
         
    */
        
    public
    static
    String getTopoExpStackTrace(String topoName) {
            
    ClientInfo client =
    null
    ;
            
    StringBuilder error =
    new
    StringBuilder();
             
            
    try
    {
                
    client = ClientManager.getClient(Main.NIMBUS_HOST, Main.NIMBUS_PORT);
                 
                
    ClusterSummary clusterSummary = client.getClient().getClusterInfo();
                
    List<TopologySummary> topoSummaryList = clusterSummary.getTopologies();
                
    for
    (TopologySummary ts : topoSummaryList) {
                    
    if
    (ts.getName().equals(topoName)) {
                        
    TopologyInfo topologyInfo = client.getClient().getTopologyInfo(ts.getId());
                        
    // 得到错误信息
                        
    Set<String> errorKeySet = topologyInfo.getErrors().keySet();
                        
    for
    (String errorKey : errorKeySet) {
                            
    List<ErrorInfo> listErrorInfo = topologyInfo.getErrors().get(errorKey);
                            
    for
    (ErrorInfo ei : listErrorInfo) {
                                
    // 发生异常的时间
                                
    long
    expTime = (
    long
    ) ei.getError_time_secs() *
    1000
    ;
                                
    // 现在的时间
                                
    long
    now = System.currentTimeMillis();
                                 
                                
    // 由于获取的是全量的错误堆栈,我们可以设置一个范围来获取指定范围的错误,看情况而定
                                
    // 如果超过5min,那么就不用记录了,因为5min检查一次
                                
    if
    (now - expTime >
    1000
    *
    60
    *
    5
    ) {
                                    
    continue
    ;
                                
    }
                                 
                                
    error.append(
    new
    Date(expTime) +
    "\n"
    );
                                
    error.append(ei.getError() +
    "\n"
    );
                            
    }
                        
    }
                         
                        
    break
    ;
                    
    }
                
    }
                 
                
    return
    error.toString().isEmpty() ?
    "none"
    : error.toString();
            
    }
    catch
    (Exception e) {
                
    return
    "-1"
    ;
            
    }
    finally
    {
                
    ClientManager.closeClient(client);
            
    }
        
    }
    }

     

    最后打成一个Jar包,就可以跑起来接入监控系统了,如在Zabbix中,可以把各个监控项设置为自定义的item,在Zabbix Client中配置命令行来运行Jar取得数据。

    接下来的测试过程先略过。

    对于Storm监控的实践,目前就是这样了。

     

转载于:https://www.cnblogs.com/leo3689/p/5158138.html

你可能感兴趣的文章
web安全问题分析与防御总结
查看>>
React 组件通信之 React context
查看>>
Linux下通过配置Crontab实现进程守护
查看>>
ios 打包上传Appstore 时报的错误 90101 90149
查看>>
密码概述
查看>>
jQuery的技巧01
查看>>
基于泛型实现的ibatis通用分页查询
查看>>
gopacket 使用
查看>>
AlertDialog对话框
查看>>
我的友情链接
查看>>
linux安全---cacti+ntop监控
查看>>
鸟哥的linux私房菜-shell简单学习-1
查看>>
nagios配置监控的一些思路和工作流程
查看>>
通讯组基本管理任务三
查看>>
Centos下基于Hadoop安装Spark(分布式)
查看>>
3D地图的定时高亮和点击事件(基于echarts)
查看>>
mysql开启binlog
查看>>
设置Eclipse编码方式
查看>>
分布式系统唯一ID生成方案汇总【转】
查看>>
并查集hdu1232
查看>>