slurm集群搭建,pulsar部署
chanong
|集群规划至少需要三个组件来构建Pulsar 集群:ZooKeeper 集群、BookKeeper 集群和Broker 集群(Broker 是Pulsar 自己的实例)。三个集群组件是:
大数据12
大数据13
大数据14
动物园管理员
动物园管理员
动物园管理员
簿记员
簿记员
簿记员
经纪人
经纪人
经纪人
集群部署及JDK1.8安装(略)
安装ZooKeeper(略)
准备pulsar环境解压安装包
[bigdata@bigdata12 软件]$ tar -zxvf apache-pulsar-2.6.4-bin.tar.gz -C /opt/module/bigdata@bigdata12 软件]$ cd ./module/[bigdata@bigdata12 模块]$ mv apache-pulsar-2.6.4 pulsar-2.6.4 启动zk 集群
[bigdata@bigdata12 software]$ zkcluster.sh start[bigdata@bigdata12 software]$ zkcluster.sh status 初始化元数据初始化ZooKeeper 节点的元数据。
[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsarInitialize-cluster-metadata \ --cluster pulsar-cluster-zk \ --zookeeper bigdata12:2181 \ --configuration-store bigdata12:2181 \ --web-service-url http://bigdata12:8088 \ - -web-service-url-tls https://bigdata12:8443 \ --broker-service-url pulsar://bigdata12:6650 \ --broker-service-url-tls pulsar+ssl://bigdata12:6651 如果初始化成功,您将看到以下提示。 19:41:47. 168 [main] INFO org.apache.pulsar.PulsarClusterMetadataSetup - “pulsar-cluster-zk”的集群元数据已正确设置
检查元数据是否初始化成功
zk下有这些目录就说明初始化成功了。
[zk: localhost:2181(已连接) 0] ls /admin/clusters/pulsar-cluster-zk [zk: localhost:2181(已连接) 1] ls /namespace 部署BookKeeper 部署bookies
BookKeeper 书键可以通过配置文件conf/bookkeeper.conf 进行配置。配置bookkey 时最重要的步骤是确保zkServers 设置为Pulsar 集群的本地Zookeeper 集群连接信息。修改配置文件bookkeeper.conf。
[bigdata@bigdata12 conf]$ vi bookkeeper.conf#将dedicatedAddress改为自己服务器对应的IP,另外两台服务器也做相应修改advertisementAddress=bigdata12#更改以下两个文件目录地址:journalDirectories=/opt/module /pulsar - 2.6.4/tmp/journalledgerDirectories=/opt/module/pulsar-2.6.4/tmp/ledger# zk 更改地址和端口信息zkServers=bigdata12:2181,bigdata13:2181,bigdata14:2181 bookies 元数据的初始化
# 执行初始化元数据命令。出现提示时,输入Y 继续(只需在Bookie 节点上运行一次) [bigdata@bigdata12 pulsar-2.6.4]$ bin/bookkeeper Shell 元格式部署Broker 集群配置文件更改broker.conf
[bigdata@bigdata12 conf]$ vi Broker.conf#更改集群名称。这与您在ZooKeeper 中初始化元数据时指定的集群名称相同(--cluster pulsar-cluster-zk)。更改clusterName=pulsar-cluster-zk#。指定以下两个配置的ZooKeeper集群地址和端口号zookeeperServers=bigdata12:2181,bigdata13:2181,bigdata14:2181configurationStoreServers=bigdata12:2181,bigdata13:2181,bigdata14:2181#修改以下参数为此将改为IP服务器的地址。另外两个broker节点配置文件也要做相应修改。AdvertishedAddress=bigdata12发行版安装包[bigdata@bigdata12模块]$ xsync pulsar-2.6.4 修改bookkeeper.conf和Broker.conf。在配置文件中修改AdvertizedAddress=bigdata13和AdvertishedAddress=bigdata14。注意:广告地址不能重复。
修改client.conf 修改配置文件client.conf,将localhost修改为你的本地IP。
[bigdata@bigdata12 pulsar-2.6.4]$ vi conf/client.conf webServiceUrl=http://bigdata12:8088/brokerServiceUrl=pulsar://bigdata12:6650/Start Cluster 启动Bookie 集群
# 启动bookie作为后台进程[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-daemon start bookie# 检查启动是否成功[bigdata@bigdata12 pulsar-2.6.4]$ bin/bookkeeper shell bookiesanity[bigdata ] @bigdata13 pulsar -2.6.4]$ bin/pulsar-daemon 启动bookie[bigdata@bigdata13 pulsar-2.6.4]$ bin/bookkeeper shell bookiesanity[bigdata@bigdata14 pulsar-2.6.4]$ bin/pulsar-daemon 启动bookie [bigdata@ bigdata14 pulsar-2.6.4]$ bin/bookkeeper shell bookiesanity 启动代理集群
[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-daemon 启动代理[bigdata@bigdata13 pulsar-2.6.4]$ bin/pulsar-daemon 启动代理[bigdata@bigdata14 pulsar-2.6.4]$ bin/pulsar -daemon start Broker# 检查集群broker节点状态[bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-admin Brokers list pulsar-cluster group start script#!/bin/bashif [ $# -lt 1 ]then echo 'no args' exit;ficase $1 in'start') echo '====================启动Bookie 集群====================' for i in bigdata12 bigdata13 bigdata14 doecho '-------------开始$i ---------- ---- -'ssh $i '/opt/module/pulsar-2.6.4/bin/pulsar-daemon start bookie' did echo '====================启动代理cluster====================' for i in bigdata12 bigdata13 bigdata14 doecho '------------- -- 启动$i - - ------- ------ 'ssh $i '/opt/module/pulsar-2.6.4/bin/pulsar-daemon start Broker' 完成;'stop' ) echo '==================关闭Bookie集群====================' for i in bigdata12 bigdata13 bigdata14 doecho '-- -- ------- ---- 关闭$i --------------- 'ssh $i '/opt/module/pulsar -2.6.4/bin/pulsar -daemon stop bookie' Done echo '==================关闭代理集群===================' for i in bigdata12 bigdata13 bigdata14 doecho '- -------------- 关闭$i --------------- 'ssh $i '/opt/module/pulsar -2.6.4/bin/pulsar-daemon stop Broker' Done;*) echo '输入参数错误' echo '启动或停止输入';esac verify cluster# Tenant create [bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-admin租户创建first-tenant#创建命名空间(命名空间名称:first-tenant/first-ns1,指定租户first-tenant) [bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-admin namespaces create first-tenant/first-ns1#创建持久分区topic(topic全名:persistent://first-tenant/first-ns1/first-topic,分区数量为3)[bigdata@bigdata12 pulsar-2.6 .4]$ bin/pulsar-admin topic create-partitioned-topicpersistent://first-tenant/first-ns1/my-topic -p 3#测试主题订阅(-n:消费消息数,-s:订阅名称),-t:订阅类型)[ bigdata@bigdata12 pulsar-2.6.4]$ bin/pulsar-clientconsumerspersistent://first-tenant/first -ns1/first-topic -n 100 -s 'consumer-first-topic' - t 'Exclusive'# 发送消息给first话题。 topic(-n: 发送消息的次数,-m: 消息内容) [bigdata@bigdata13 pulsar-2.6.4]$ bin/pulsar-client applypersistent://first-tenant /first-ns1/first-topic -n 10 -m 'Hello Pulsar'Pulsar API操作,将以下内容添加到你的pom文件中
org.apache.pulsar pulsar-client 2.6.4 准备log4j.properties 文件
log4j.rootLogger=信息, stdoutlog4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%nlog4j.appender.logfile=org.apache.log4j.FileAppenderlog4j.appender.logfile.File=目标/pulsar.loglog4j.appender.logfile.layout=org.apache.log4j.PatternLayoutlog4j.appender.logfile.layout。 ConversionPattern=%d %p [%c]- %m%nProducer 包com.yunclass.pulsar;导入org.apache.pulsar.client.api.MessageId;导入org.apache.pulsar.client.api.Producer;导入org .apache.pulsar.client.api.PulsarClient;import org.apache.pulsar.client.api.PulsarClientException;public类生产者分析{private static final string brokerList='pulsar: //bigdata12:6650,bigdata13:6650,bigdata14:6650'; public Static void domain ( String[] args) throws PulsarClientException { PulsarClient client=PulsarClient.builder().serviceUrl(brokerList).build(); ProducerProducer 分析=client.newProducer().topic('persistent://scott/ns1 /topic-partition ' ) .create(); String msg='Hello Pulsar!'; Longstart=System.currentTimeMillis(); MessageId msgId=Producer Analysis.send(msg.getBytes()); System.out.println('spend=' + ( system.currentTimeMillis() - start) + ';发送消息msgId=' + msgId.toString());ProducerAnalysis.close(); client.close(); }} 消费者包com.yunclass.pulsar;import org.apache .pulsar.client.api.Consumer;导入org.apache.pulsar.client.api.Message;导入org.apache.pulsar.client.api.PulsarClient;导入org.apache.pulsar.client.api.PulsarClientException ;公共类ConsumerAnalysis { private static Final String BrokerList='pulsar: //bigdata12:6650,bigdata13:6650,bigdata14:6650'; public static void main(String[] args) throws PulsarClientException { PulsarClient client=PulsarClient.builder().serviceU rl (bro) kerList).build (); Consumer Consumer=client.newConsumer() .topic('persistent://scott/ns1/topic-partition') .subscriptionName('ConsumerAnalysis') .subscribe(); while (true) { 收到消息=Consumer.receive (); System.out.println('consumer-Message selected:' + new String(receive.getData())); //确认消息,以便代理可以将其删除Consumer.acknowledge(receive); } }}








