
本文主要介绍使用 coreos 提供的 Java 客户端(jetcd)来操作 etcd,文中所使用到的软件版本:etcd 3.5.18、jetcd 0.7.7。
1、引入依赖
<dependency> <groupId>io.etcd</groupId> <artifactId>jetcd-core</artifactId> <version>0.7.7</version> </dependency>
2、jetcd 使用
2.1、初始化客户端
@Before public void before() { client = Client.builder() .endpoints("http://10.49.196.33:2379") //.endpoints("http://10.49.196.30:2379", "http://10.49.196.31:2379", "http://10.49.196.33:2379") .connectTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); }
2.2、键值操作
A、新增/修改
/**
 * Writes {@code key2 -> value2} (creating or overwriting the key) and logs the put response.
 *
 * @throws Exception if waiting for the put to complete fails
 */
@Test
public void kvPut() throws Exception {
    ByteSequence putKey = ByteSequence.from("key2", StandardCharsets.UTF_8);
    ByteSequence putValue = ByteSequence.from("value2", StandardCharsets.UTF_8);

    // Block until the asynchronous put has finished, then log its response.
    PutResponse putResponse = client.getKVClient().put(putKey, putValue).get();
    log.info("completableFuture={}", putResponse);
}
B、查询
@Test public void kvGet() throws Exception { KV kv = client.getKVClient(); ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8); CompletableFuture<GetResponse> completableFuture = kv.get(key); GetResponse getResponse = completableFuture.get(); if (getResponse.getCount() > 0) { log.info("value={}", getResponse.getKvs().get(0).getValue()); } key = ByteSequence.from("key", StandardCharsets.UTF_8); GetOption getOption = GetOption.builder().isPrefix(true).build(); completableFuture = kv.get(key, getOption);//查询健以”key“开头的数据 for (KeyValue keyValue : completableFuture.get().getKvs()) { log.info("key={},value={}", keyValue.getKey(), keyValue.getValue()); } }
C、删除
/**
 * Deletes {@code key1} and logs the delete response.
 *
 * @throws Exception if waiting for the delete to complete fails
 */
@Test
public void kvDelete() throws Exception {
    ByteSequence doomedKey = ByteSequence.from("key1", StandardCharsets.UTF_8);

    // Block until the asynchronous delete has finished, then log its response.
    DeleteResponse deleteResponse = client.getKVClient().delete(doomedKey).get();
    log.info("completableFuture={}", deleteResponse);
}
2.3、监控
/**
 * Watches {@code key1} for five minutes and logs every event that arrives.
 *
 * <p>The watcher returned by {@code watch(...)} is held in try-with-resources so it is
 * closed when the observation window ends instead of being left open.
 *
 * @throws Exception if the waiting thread is interrupted
 */
@Test
public void watch() throws Exception {
    Watch watch = client.getWatchClient();
    ByteSequence watchedKey = ByteSequence.from("key1", StandardCharsets.UTF_8);

    Watch.Listener listener = new Watch.Listener() {
        @Override
        public void onNext(WatchResponse response) {
            List<WatchEvent> events = response.getEvents();
            for (WatchEvent watchEvent : events) {
                log.info("eventType={},value={}", watchEvent.getEventType(), watchEvent.getKeyValue().getValue());
            }
        }

        @Override
        public void onError(Throwable throwable) {
            // The trailing throwable argument makes SLF4J print the stack trace too.
            log.error("发生异常:{}", throwable.getMessage(), throwable);
        }

        @Override
        public void onCompleted() {
            log.info("complete");
        }
    };

    try (Watch.Watcher watcher = watch.watch(watchedKey, listener)) {
        // Keep the test alive so events can be observed; the watcher closes afterwards.
        Thread.sleep(1000 * 60 * 5);
    }
}
2.4、租约
@Test public void lease() throws Exception { Lease lease = client.getLeaseClient(); //创建租约 LeaseGrantResponse leaseGrantResponse = lease.grant(10).get(); long leaseId = leaseGrantResponse.getID(); //租约与键值数据绑定 ByteSequence key = ByteSequence.from("lease-key", StandardCharsets.UTF_8); ByteSequence value = ByteSequence.from("lease-value", StandardCharsets.UTF_8); PutOption putOption = PutOption.builder().withLeaseId(leaseId).build(); client.getKVClient().put(key, value, putOption).get(); Thread.sleep(1000); //查看租约剩余时间 LeaseOption leaseOption = LeaseOption.builder().withAttachedKeys().build(); LeaseTimeToLiveResponse leaseTimeToLiveResponse = lease.timeToLive(leaseId, leaseOption).get(); log.info("leaseTimeToLiveResponse={}", leaseTimeToLiveResponse); //使租约一直有效 lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() { @Override public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) { log.info("Lease keep-alive response:{}", leaseGrantResponse.getTTL()); } @Override public void onError(Throwable throwable) { log.info("发生异常:{}", throwable.getMessage()); } @Override public void onCompleted() { log.info("Complete"); } }); Thread.sleep(1000 * 30); //撤销租约 LeaseRevokeResponse leaseRevokeResponse = lease.revoke(leaseId).get(); log.info("leaseRevokeResponse={}", leaseRevokeResponse); }
2.5、锁
@Test public void lock() throws Exception { ByteSequence lockName = ByteSequence.from("my-lock", StandardCharsets.UTF_8); for (int i = 1; i <= 3; i++) { new Thread(() -> { try { LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(10).get(); long leaseId = leaseGrantResponse.getID(); Lock lock = client.getLockClient(); //阻塞获取锁 LockResponse lockResponse = lock.lock(lockName, leaseId).get(); ByteSequence lockKey = lockResponse.getKey(); log.info("{} 获得锁 {}", Thread.currentThread().getName(), lockResponse.getKey()); Thread.sleep(3000); //释放锁,租约撤销或到期也会释放锁 lock.unlock(lockKey).get(); } catch (Exception e) { log.error("", e); } }).start(); } Thread.sleep(1000 * 20); }
2.6、选举
@Test public void election() throws Exception { ByteSequence electionName = ByteSequence.from("electionName", StandardCharsets.UTF_8); ByteSequence proposal1 = ByteSequence.from("proposal1", StandardCharsets.UTF_8); ByteSequence proposal2 = ByteSequence.from("proposal2", StandardCharsets.UTF_8); ByteSequence proposal3 = ByteSequence.from("proposal3", StandardCharsets.UTF_8); ByteSequence[] proposals = new ByteSequence[]{proposal1, proposal2, proposal3}; for (ByteSequence proposal : proposals) { new Thread(() -> { try { Election election = client.getElectionClient(); //监听选举事件(可选) election.observe(electionName, new Election.Listener() { @Override public void onNext(LeaderResponse leaderResponse) { log.info("proposal={},key={},value={}", proposal, leaderResponse.getKv().getKey(), leaderResponse.getKv().getValue()); } @Override public void onError(Throwable throwable) { log.error("发生异常:{}", throwable.getMessage()); } @Override public void onCompleted() { log.info("complete"); } }); LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(5).get(); long leaseId = leaseGrantResponse.getID(); client.getLeaseClient().keepAlive(leaseId, null); //获得领导权限或租约到期退出等待 CampaignResponse campaignResponse = election.campaign(electionName, leaseId, proposal).get(); LeaderKey leaderKey = campaignResponse.getLeader(); log.info("{},获得领导权,{}", proposal, leaderKey.getKey()); //获取领导者,如果是租约到期则改行代码会抛出异常NoLeaderException LeaderResponse leaderResponse = election.leader(electionName).get(); log.info("领导者:{}", leaderResponse.getKv().getValue()); //TODO:业务处理 Thread.sleep(1000 * 6); //释放领导权 election.resign(leaderKey).get(); client.getLeaseClient().revoke(leaseId); } catch (Exception e) { log.error("", e); } }).start(); } Thread.sleep(1000 * 30); }
2.7、完整代码
package com.abc.etcd;

import io.etcd.jetcd.*;
import io.etcd.jetcd.election.CampaignResponse;
import io.etcd.jetcd.election.LeaderKey;
import io.etcd.jetcd.election.LeaderResponse;
import io.etcd.jetcd.kv.DeleteResponse;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.kv.PutResponse;
import io.etcd.jetcd.lease.LeaseGrantResponse;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.lease.LeaseRevokeResponse;
import io.etcd.jetcd.lease.LeaseTimeToLiveResponse;
import io.etcd.jetcd.lock.LockResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.LeaseOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.watch.WatchEvent;
import io.etcd.jetcd.watch.WatchResponse;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * JUnit 4 walkthrough of the jetcd client: key/value operations, watch, lease, lock and
 * election. Every test talks to the etcd endpoint configured in {@link #before()}.
 */
@Slf4j
public class EtcdCase {
    private Client client;

    /** Builds the client against a single endpoint with a ten-second connect timeout. */
    @Before
    public void before() {
        // To target a cluster instead, pass every member to endpoints(...), e.g.
        // .endpoints("http://10.49.196.30:2379", "http://10.49.196.31:2379", "http://10.49.196.33:2379")
        client = Client.builder()
                .endpoints("http://10.49.196.33:2379")
                .connectTimeout(Duration.of(10, ChronoUnit.SECONDS))
                .build();
    }

    /** Closes the client after each test. */
    @After
    public void after() {
        client.close();
    }

    /**
     * Writes {@code key2 -> value2} (creating or overwriting the key) and logs the response.
     *
     * @throws Exception if waiting for the put to complete fails
     */
    @Test
    public void kvPut() throws Exception {
        KV kv = client.getKVClient();
        ByteSequence key = ByteSequence.from("key2", StandardCharsets.UTF_8);
        ByteSequence value = ByteSequence.from("value2", StandardCharsets.UTF_8);
        CompletableFuture<PutResponse> completableFuture = kv.put(key, value);
        log.info("completableFuture={}", completableFuture.get());
    }

    /**
     * Reads {@code key1} by exact match, then lists every key that starts with {@code "key"}.
     *
     * @throws Exception if waiting for either read to complete fails
     */
    @Test
    public void kvGet() throws Exception {
        KV kv = client.getKVClient();

        // Exact-key lookup: log the value only when the key exists.
        ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8);
        CompletableFuture<GetResponse> completableFuture = kv.get(key);
        GetResponse getResponse = completableFuture.get();
        if (getResponse.getCount() > 0) {
            log.info("value={}", getResponse.getKvs().get(0).getValue());
        }

        // Prefix lookup: fetch all entries whose key starts with "key".
        key = ByteSequence.from("key", StandardCharsets.UTF_8);
        GetOption getOption = GetOption.builder().isPrefix(true).build();
        completableFuture = kv.get(key, getOption);
        for (KeyValue keyValue : completableFuture.get().getKvs()) {
            log.info("key={},value={}", keyValue.getKey(), keyValue.getValue());
        }
    }

    /**
     * Deletes {@code key1} and logs the delete response.
     *
     * @throws Exception if waiting for the delete to complete fails
     */
    @Test
    public void kvDelete() throws Exception {
        KV kv = client.getKVClient();
        ByteSequence key = ByteSequence.from("key1", StandardCharsets.UTF_8);
        CompletableFuture<DeleteResponse> completableFuture = kv.delete(key);
        log.info("completableFuture={}", completableFuture.get());
    }

    /**
     * Watches {@code key1} for five minutes and logs every event that arrives.
     *
     * <p>The watcher returned by {@code watch(...)} is held in try-with-resources so it is
     * closed when the observation window ends instead of being left open.
     *
     * @throws Exception if the waiting thread is interrupted
     */
    @Test
    public void watch() throws Exception {
        Watch watch = client.getWatchClient();
        ByteSequence watchedKey = ByteSequence.from("key1", StandardCharsets.UTF_8);

        Watch.Listener listener = new Watch.Listener() {
            @Override
            public void onNext(WatchResponse response) {
                List<WatchEvent> events = response.getEvents();
                for (WatchEvent watchEvent : events) {
                    log.info("eventType={},value={}", watchEvent.getEventType(), watchEvent.getKeyValue().getValue());
                }
            }

            @Override
            public void onError(Throwable throwable) {
                // The trailing throwable argument makes SLF4J print the stack trace too.
                log.error("发生异常:{}", throwable.getMessage(), throwable);
            }

            @Override
            public void onCompleted() {
                log.info("complete");
            }
        };

        try (Watch.Watcher watcher = watch.watch(watchedKey, listener)) {
            // Keep the test alive so events can be observed; the watcher closes afterwards.
            Thread.sleep(1000 * 60 * 5);
        }
    }

    /**
     * Walks through the lease lifecycle: grant a lease, attach a key to it, query its
     * remaining time-to-live, keep it alive for thirty seconds, then revoke it.
     *
     * @throws Exception if any etcd call fails or the waiting thread is interrupted
     */
    @Test
    public void lease() throws Exception {
        Lease lease = client.getLeaseClient();

        // Create the lease (TTL value 10).
        LeaseGrantResponse leaseGrantResponse = lease.grant(10).get();
        long leaseId = leaseGrantResponse.getID();

        // Bind a key/value pair to the lease.
        ByteSequence key = ByteSequence.from("lease-key", StandardCharsets.UTF_8);
        ByteSequence value = ByteSequence.from("lease-value", StandardCharsets.UTF_8);
        PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
        client.getKVClient().put(key, value, putOption).get();

        Thread.sleep(1000);

        // Inspect the remaining time-to-live, including the keys attached to the lease.
        LeaseOption leaseOption = LeaseOption.builder().withAttachedKeys().build();
        LeaseTimeToLiveResponse leaseTimeToLiveResponse = lease.timeToLive(leaseId, leaseOption).get();
        log.info("leaseTimeToLiveResponse={}", leaseTimeToLiveResponse);

        // Keep the lease alive until it is revoked below.
        lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
            @Override
            public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
                // Log the TTL carried by this keep-alive response, not the one from the
                // original grant, which never changes.
                log.info("Lease keep-alive response:{}", leaseKeepAliveResponse.getTTL());
            }

            @Override
            public void onError(Throwable throwable) {
                // Failures are logged at ERROR level together with the stack trace.
                log.error("发生异常:{}", throwable.getMessage(), throwable);
            }

            @Override
            public void onCompleted() {
                log.info("Complete");
            }
        });

        Thread.sleep(1000 * 30);

        // Revoke the lease.
        LeaseRevokeResponse leaseRevokeResponse = lease.revoke(leaseId).get();
        log.info("leaseRevokeResponse={}", leaseRevokeResponse);
    }

    /**
     * Starts three threads that compete for the distributed lock {@code my-lock}; each
     * holder keeps the lock for three seconds before releasing it.
     *
     * <p>The unlock runs in a {@code finally} block so the lock is released even when the
     * work done while holding it fails.
     *
     * @throws Exception if the main thread is interrupted while waiting for the workers
     */
    @Test
    public void lock() throws Exception {
        ByteSequence lockName = ByteSequence.from("my-lock", StandardCharsets.UTF_8);
        for (int i = 1; i <= 3; i++) {
            new Thread(() -> {
                try {
                    LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(10).get();
                    long leaseId = leaseGrantResponse.getID();
                    Lock lock = client.getLockClient();

                    // Blocks until this thread owns the lock.
                    LockResponse lockResponse = lock.lock(lockName, leaseId).get();
                    ByteSequence lockKey = lockResponse.getKey();
                    try {
                        log.info("{} 获得锁 {}", Thread.currentThread().getName(), lockKey);
                        Thread.sleep(3000);
                    } finally {
                        // Release explicitly. Per the original author's note, revoking or
                        // expiring the lease would release the lock as well.
                        lock.unlock(lockKey).get();
                    }
                } catch (Exception e) {
                    log.error("", e);
                }
            }).start();
        }
        // Give the worker threads time to take turns before the test (and client) shuts down.
        Thread.sleep(1000 * 20);
    }

    /**
     * Starts three candidates that campaign in the election {@code electionName}; each
     * winner logs the current leader, holds leadership for six seconds and then resigns.
     *
     * <p>Each candidate's lease is revoked in a {@code finally} block and the revoke is
     * awaited, so a failing candidate does not leave a kept-alive lease behind.
     *
     * @throws Exception if the main thread is interrupted while waiting for the candidates
     */
    @Test
    public void election() throws Exception {
        ByteSequence electionName = ByteSequence.from("electionName", StandardCharsets.UTF_8);
        ByteSequence proposal1 = ByteSequence.from("proposal1", StandardCharsets.UTF_8);
        ByteSequence proposal2 = ByteSequence.from("proposal2", StandardCharsets.UTF_8);
        ByteSequence proposal3 = ByteSequence.from("proposal3", StandardCharsets.UTF_8);
        ByteSequence[] proposals = new ByteSequence[]{proposal1, proposal2, proposal3};

        for (ByteSequence proposal : proposals) {
            new Thread(() -> {
                try {
                    Election election = client.getElectionClient();

                    // Observe election events (optional).
                    election.observe(electionName, new Election.Listener() {
                        @Override
                        public void onNext(LeaderResponse leaderResponse) {
                            log.info("proposal={},key={},value={}", proposal, leaderResponse.getKv().getKey(), leaderResponse.getKv().getValue());
                        }

                        @Override
                        public void onError(Throwable throwable) {
                            // The trailing throwable argument makes SLF4J print the stack trace too.
                            log.error("发生异常:{}", throwable.getMessage(), throwable);
                        }

                        @Override
                        public void onCompleted() {
                            log.info("complete");
                        }
                    });

                    LeaseGrantResponse leaseGrantResponse = client.getLeaseClient().grant(5).get();
                    long leaseId = leaseGrantResponse.getID();
                    try {
                        // Register a real observer rather than null so keep-alive callbacks
                        // always have a target. NOTE(review): confirm how jetcd treats a
                        // null observer before reverting this.
                        client.getLeaseClient().keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
                            @Override
                            public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
                                // Nothing to do: the lease only needs to stay alive.
                            }

                            @Override
                            public void onError(Throwable throwable) {
                                log.error("发生异常:{}", throwable.getMessage(), throwable);
                            }

                            @Override
                            public void onCompleted() {
                                // Nothing to do.
                            }
                        });

                        // Per the original author's note: returns once leadership is won,
                        // or stops waiting when the lease expires.
                        CampaignResponse campaignResponse = election.campaign(electionName, leaseId, proposal).get();
                        LeaderKey leaderKey = campaignResponse.getLeader();
                        log.info("{},获得领导权,{}", proposal, leaderKey.getKey());

                        // Per the original author's note: throws NoLeaderException here if
                        // the campaign ended because the lease expired.
                        LeaderResponse leaderResponse = election.leader(electionName).get();
                        log.info("领导者:{}", leaderResponse.getKv().getValue());

                        // TODO: business logic goes here.
                        Thread.sleep(1000 * 6);

                        // Give up leadership.
                        election.resign(leaderKey).get();
                    } finally {
                        // Await the revoke so a failure is reported instead of being dropped.
                        // NOTE(review): revoking is also expected to release leadership
                        // still bound to this lease if the steps above failed - verify.
                        client.getLeaseClient().revoke(leaseId).get();
                    }
                } catch (Exception e) {
                    log.error("", e);
                }
            }).start();
        }
        // Give the candidates time to take turns before the test (and client) shuts down.
        Thread.sleep(1000 * 30);
    }
}
EtcdCase.java
参考:
https://github.com/etcd-io/jetcd