本文主要介绍 BookKeeper 的 API,文中所使用到的软件版本:Java 1.8.0_341、BookKeeper 4.16.5。
1、引入依赖
<dependency><groupId>org.apache.bookkeeper</groupId><artifactId>bookkeeper-server</artifactId><version>4.16.5</version> </dependency>
2、Ledger API
2.1、传统 API
2.1.1、创建客户端
public void createBookKeeper() throws Exception {ClientConfiguration configuration = new ClientConfiguration();configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");//单机//configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");//集群configuration.setAddEntryTimeout(2000);bookKeeper = new BookKeeper(configuration); }
2.1.2、创建 ledger 并添加 entry
//LedgerHandle ledgerHandle = bookKeeper.createLedger(3, 2, 2, BookKeeper.DigestType.MAC, "123".getBytes()); LedgerHandle ledgerHandle = bookKeeper.createLedger(BookKeeper.DigestType.MAC, "123".getBytes()); log.info("ledgerId={}", ledgerHandle.getId()); for (int i = 0; i < 10; i++) {long id = ledgerHandle.addEntry(("abc" + i).getBytes());log.info("id={}", id); }ledgerHandle.close();
创建 ledger 时可以指定存储该 ledger 的节点个数、副本个数、几个副本 ack 表示写入成功。
2.1.3、从 ledger 中读取 entry
public void readLedger() throws Exception {LedgerHandle ledgerHandle = bookKeeper.openLedger(0L, BookKeeper.DigestType.MAC, "123".getBytes());Enumeration<LedgerEntry> ledgerEntrys = ledgerHandle.readEntries(0, ledgerHandle.getLastAddConfirmed());while (ledgerEntrys.hasMoreElements()) {LedgerEntry ledgerEntry = ledgerEntrys.nextElement();log.info("ledgerId={},entryId={},entry={}", ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), new String(ledgerEntry.getEntry()));}ledgerHandle.close(); }
2.1.4、删除 ledger
public void deleteLedger() throws Exception {bookKeeper.deleteLedger(0L); }
2.1.5、完整代码
package com.abc.demo.bk;import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.conf.ClientConfiguration; import org.junit.After; import org.junit.Before; import org.junit.Test;import java.util.Enumeration;@Slf4j public class LedgerCase {private BookKeeper bookKeeper;@Beforepublic void createBookKeeper() throws Exception {ClientConfiguration configuration = new ClientConfiguration();//configuration.setZkServers("10.49.196.33:2181");configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");//configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");configuration.setAddEntryTimeout(2000);bookKeeper = new BookKeeper(configuration);}@Afterpublic void closeBookKeeper() throws Exception {bookKeeper.close();}@Testpublic void createLedger() throws Exception {//LedgerHandle ledgerHandle = bookKeeper.createLedger(3, 2, 2, BookKeeper.DigestType.MAC, "123".getBytes());LedgerHandle ledgerHandle = bookKeeper.createLedger(BookKeeper.DigestType.MAC, "123".getBytes());log.info("ledgerId={}", ledgerHandle.getId());for (int i = 0; i < 10; i++) {long id = ledgerHandle.addEntry(("abc" + i).getBytes());log.info("id={}", id);}ledgerHandle.close();}@Testpublic void readLedger() throws Exception {LedgerHandle ledgerHandle = bookKeeper.openLedger(0L, BookKeeper.DigestType.MAC, "123".getBytes());Enumeration<LedgerEntry> ledgerEntrys = ledgerHandle.readEntries(0, ledgerHandle.getLastAddConfirmed());while (ledgerEntrys.hasMoreElements()) {LedgerEntry ledgerEntry = ledgerEntrys.nextElement();log.info("ledgerId={},entryId={},entry={}", ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), new String(ledgerEntry.getEntry()));}ledgerHandle.close();}@Testpublic void deleteLedger() throws Exception {bookKeeper.deleteLedger(0L);} }
2.2、新 API
自 4.6 版本开始,BookKeeper 提供了一个新的客户端 API,利用了Java 8 的 CompletableFuture 功能。引入了 WriteHandle、WriteAdvHandle 和 ReadHandle 来替换通用的 LedgerHandle。新的 API 在 org.apache.bookkeeper.client.api 包中,应该只使用该包中定义的接口。4.6 版本的新 API 仍然是实验性的,并可能在后续的次要版本中进行更改。
2.2.1、创建客户端
public void createBookKeeper() throws Exception {ClientConfiguration configuration = new ClientConfiguration();configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");//单机//configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");//集群configuration.setAddEntryTimeout(2000);bookKeeper = BookKeeper.newBuilder(configuration).build(); }
2.2.2、创建 ledger 并添加 entry
public void write() throws Exception {WriteHandle writeHandle = bookKeeper.newCreateLedgerOp().withDigestType(DigestType.MAC).withPassword("123".getBytes()).withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2).execute().get();log.info("ledgerId={}", writeHandle.getId());for (int i = 0; i < 10; i++) {long id = writeHandle.append(("bcd" + i).getBytes());log.info("id={}", id);}writeHandle.close(); }
2.2.3、从 ledger 中读取 entry
public void read() throws Exception {ReadHandle readHandle = bookKeeper.newOpenLedgerOp().withLedgerId(1L).withDigestType(DigestType.MAC).withPassword("123".getBytes()).execute().get();LedgerEntries ledgerEntries = readHandle.read(0, readHandle.getLastAddConfirmed());Iterator<LedgerEntry> iterator = ledgerEntries.iterator();while (iterator.hasNext()) {LedgerEntry ledgerEntry = iterator.next();log.info("ledgerId={},entryId={},entry={}", ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), new String(ledgerEntry.getEntryBytes()));}readHandle.close(); }
2.2.4、删除 ledger
public void delete() throws Exception {bookKeeper.newDeleteLedgerOp().withLedgerId(101L).execute().get(); }
2.2.5、完整代码
package com.abc.demo.bk;import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.api.*; import org.apache.bookkeeper.conf.ClientConfiguration; import org.junit.After; import org.junit.Before; import org.junit.Test;import java.util.Iterator;@Slf4j public class LedgerNewCase {private BookKeeper bookKeeper;@Beforepublic void createBookKeeper() throws Exception {ClientConfiguration configuration = new ClientConfiguration();//configuration.setZkServers("10.49.196.33:2181");//configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");configuration.setAddEntryTimeout(2000);bookKeeper = BookKeeper.newBuilder(configuration).build();}@Afterpublic void closeBookKeeper() throws Exception {bookKeeper.close();}@Testpublic void write() throws Exception {WriteHandle writeHandle = bookKeeper.newCreateLedgerOp().withDigestType(DigestType.MAC).withPassword("123".getBytes()).withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2).execute().get();log.info("ledgerId={}", writeHandle.getId());for (int i = 0; i < 10; i++) {long id = writeHandle.append(("bcd" + i).getBytes());log.info("id={}", id);}writeHandle.close();}@Testpublic void read() throws Exception {ReadHandle readHandle = bookKeeper.newOpenLedgerOp().withLedgerId(1L).withDigestType(DigestType.MAC).withPassword("123".getBytes()).execute().get();LedgerEntries ledgerEntries = readHandle.read(0, readHandle.getLastAddConfirmed());Iterator<LedgerEntry> iterator = ledgerEntries.iterator();while (iterator.hasNext()) {LedgerEntry ledgerEntry = iterator.next();log.info("ledgerId={},entryId={},entry={}", ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), new String(ledgerEntry.getEntryBytes()));}readHandle.close();}@Testpublic void delete() throws Exception {bookKeeper.newDeleteLedgerOp().withLedgerId(101L).execute().get();} }
3、Advanced Ledger API
在 4.5.0 版本中,BookKeeper 引入了一些高级 API 用于高级功能。高级 API 和普通 API 主要区别在写 entry,读 entry 是一致的。
3.1、LedgerHandleAdv
LedgerHandleAdv 是 LedgerHandle 的高级扩展,在创建时可以指定 LedgerId,在添加 entry 时需要传入 entryId。
public void ledgerHandleAdv() throws Exception {ClientConfiguration configuration = new ClientConfiguration();//configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");configuration.setAddEntryTimeout(2000);org.apache.bookkeeper.client.BookKeeper bookKeeper = new org.apache.bookkeeper.client.BookKeeper(configuration);//LedgerHandleAdv ledgerHandle = (LedgerHandleAdv) bookKeeper.createLedgerAdv(3, 2, 2, BookKeeper.DigestType.MAC, "123".getBytes());LedgerHandleAdv ledgerHandleAdv = (LedgerHandleAdv) bookKeeper.createLedgerAdv(100L, 3, 2, 2, org.apache.bookkeeper.client.BookKeeper.DigestType.MAC, "123".getBytes(), Collections.emptyMap());log.info("ledgerId={}", ledgerHandleAdv.getId());for (int i = 0; i < 10; i++) {long id = ledgerHandleAdv.addEntry(i, ("abc" + i).getBytes()); //entry id 需从 0 开始log.info("id={}", id);}ledgerHandleAdv.close();bookKeeper.close(); }
3.2、4.6 版本新 API 的 LedgerHandleAdv
public void writeAdvHandle() throws Exception {ClientConfiguration configuration = new ClientConfiguration();//configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");org.apache.bookkeeper.client.api.BookKeeper bookKeeper = org.apache.bookkeeper.client.api.BookKeeper.newBuilder(configuration).build();WriteAdvHandle writeAdvHandle = bookKeeper.newCreateLedgerOp().withDigestType(DigestType.MAC).withPassword("123".getBytes()).withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2).makeAdv() //CreateBuilder 转为 CreateAdvBuilder.withLedgerId(101L).execute().get();log.info("ledgerId={}", writeAdvHandle.getId());for (int i = 0; i < 10; i++) {long id = writeAdvHandle.write(i, ("bcd" + i).getBytes());log.info("id={}", id);}writeAdvHandle.close();bookKeeper.close(); }
参考:
https://bookkeeper.apache.org/docs/api/overview