
📦 Backing Up Elasticsearch Snapshots to MinIO from the Same or a Different Machine (Java Implementation)

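In one sentence: combining the ES snapshot API, local or remote file retrieval, tar.gz compression, and MinIO storage gives a general-purpose, automated, and safe ES backup scheme that works whether the application runs on the ES host or on a different machine.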


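🎯 Background

  • ES has built-in snapshot support, but with a file-system (fs) repository the snapshots exist only on the ES server's own disk;
  • If ES and the application are not on the same machine, the snapshot directory has to be pulled over SSH/SFTP;
  • To guard against data loss, the snapshot repository is periodically packed and uploaded to MinIO;
  • ES keeps the 7 most recent snapshots, while MinIO keeps only the latest full archive of the repository.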

🔧 Prerequisites

1️⃣ Edit elasticsearch.yml (on the ES server)

⚠️ path.repo must be configured, otherwise the snapshot repository cannot be registered!

# elasticsearch.yml
path.repo: ["/aaatmp/es_backupdata"]
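  • The path must exist, and the ES process must have read and write permission on it;
  • In a multi-node cluster, the path must be a shared file system mounted at the same location on every master and data node;
  • Restart ES after the change.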
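The two requirements above (the directory is usable, and the repository location lies inside a path.repo entry) can be checked before registering anything. The following is a minimal sketch using only the JDK; the class and method names are made up for illustration, and the permission check reflects the user running this code, which is only meaningful if that is the same user as the ES process.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

/** Hypothetical pre-flight checks for an fs snapshot repository location. */
final class RepoPathCheckSketch {

    /** True if the directory exists and the current user can read and write it. */
    static boolean isUsableDirectory(Path dir) {
        return Files.isDirectory(dir) && Files.isReadable(dir) && Files.isWritable(dir);
    }

    /** True if location equals, or is nested under, one of the path.repo entries. */
    static boolean isUnderRepoPath(String location, List<String> repoPaths) {
        Path candidate = Paths.get(location).toAbsolutePath().normalize();
        for (String repoPath : repoPaths) {
            Path allowed = Paths.get(repoPath).toAbsolutePath().normalize();
            // Path.startsWith compares whole path elements, not string prefixes
            if (candidate.startsWith(allowed)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> repoPaths = List.of("/aaatmp/es_backupdata");
        System.out.println(isUnderRepoPath("/aaatmp/es_backupdata", repoPaths));
        System.out.println(isUsableDirectory(Paths.get("/aaatmp/es_backupdata")));
    }
}
```

Running it on the ES host as the ES user gives a quick answer before the registration request is sent.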

2️⃣ Register the snapshot repository (one time only)

Register it through Kibana Dev Tools or curl:

PUT /_snapshot/gxsj_eslog_backup
{
  "type": "fs",
  "settings": {
    "location": "/aaatmp/es_backupdata",
    "compress": true
  }
}
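  • gxsj_eslog_backup: the repository name (must match repoName in the code);
  • location: must be listed in path.repo, or be a directory under one of its entries.

✅ Successful response: {"acknowledged":true}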
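The same registration can also be issued from Java without Kibana or curl. Below is a rough sketch using the JDK's built-in java.net.http client (Java 11+). The class name, the base URL http://localhost:9200, and the absence of authentication are assumptions for illustration; a secured cluster needs credentials and TLS on top of this.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper that registers an fs repository over the REST API. */
final class RepoRegistrationSketch {

    /** Builds the JSON body; assumes a plain path with no quotes or backslashes. */
    static String buildBody(String location, boolean compress) {
        return "{\"type\":\"fs\",\"settings\":{\"location\":\""
                + location + "\",\"compress\":" + compress + "}}";
    }

    /** Builds PUT {baseUrl}/_snapshot/{repoName}; nothing is sent yet. */
    static HttpRequest buildRequest(String baseUrl, String repoName, String location) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/_snapshot/" + repoName))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(buildBody(location, true)))
                .build();
    }

    /** Sends the request and returns the raw response body. */
    static String register(String baseUrl, String repoName, String location) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(baseUrl, repoName, location),
                        HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + " " + response.body();
    }

    public static void main(String[] args) {
        // Only prints the request line; call register(...) against a live node
        HttpRequest request = buildRequest("http://localhost:9200",
                "gxsj_eslog_backup", "/aaatmp/es_backupdata");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Registering the same repository twice with the same settings is harmless, so this can run at application startup if that suits the deployment.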

📄 Core Java code (SnapshotService)

Dependencies: elasticsearch-java, jsch, commons-compress, minio

@Service
public class SnapshotService {

    protected static final String TEMP_DIR = System.getProperty("java.io.tmpdir") + File.separator;

    @Autowired
    private ElasticsearchClient client;

    @Value("${es.snapshot.repository}")
    private String repoName;      // gxsj_eslog_backup

    @Value("${es.snapshot.repo.path}")
    private String repoPath;      // /aaatmp/es_backupdata

    @Value("${es.server.host}")
    String esHost;                // 192.168.1.158

    @Value("${es.server.ssh.user}")
    private String sshUser;       // elasticsearch

    @Value("${es.server.ssh.password}")
    private String sshPassword;   // 123456

    // MinioService and FileUtil are project classes that are not shown in this post
    @Autowired
    private MinioService minioService;

    // Main entry point: create a snapshot, then back up the repository to MinIO
    public Long createSnapshot(List<String> indices) throws Exception {
        String name = "backup_" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss"));
        return createSnapshotWithName(name, indices);
    }

    public Long createSnapshotWithName(String name, List<String> indices) throws Exception {
        CreateSnapshotRequest request = CreateSnapshotRequest.of(b -> b
                .repository(repoName)
                .snapshot(name)
                .indices(indices)
                .ignoreUnavailable(true)
                .includeGlobalState(false)
                .waitForCompletion(true)); // block until the snapshot has finished
        CreateSnapshotResponse response = client.snapshot().create(request);
        SnapshotInfo info = response.snapshot();
        if (info != null && "SUCCESS".equals(info.state())) {
            deleteOldBackups();           // keep the 7 most recent snapshots in ES
            return backupRepoToMinio();   // pack the whole repository and upload it to MinIO
        }
        return null;
    }

    // Decides automatically between the same-machine and remote-machine paths
    public Long backupRepoToMinio() throws Exception {
        boolean isLocal = isSameMachine();
        String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss"));
        String tarGzPath = TEMP_DIR + "es-repo-" + timestamp + ".tar.gz";
        File remoteCopy = null;
        try {
            if (isLocal) {
                System.out.println("📁 Local snapshot repository, compressing directly...");
                createTarGz(repoPath, tarGzPath);
            } else {
                System.out.println("🌐 Remote snapshot repository, pulling via SFTP...");
                remoteCopy = new File(TEMP_DIR + "es-repo-remote-" + timestamp);
                downloadDirectoryViaSsh(repoPath, remoteCopy.getPath());
                createTarGz(remoteCopy.getPath(), tarGzPath);
            }
            String objectName = "full-repo/es-repo-" + timestamp + ".tar.gz";
            // Keep only the latest archive. The old one is removed before the upload,
            // so a failed upload leaves no archive in MinIO.
            minioService.removeAll("backup", "full-repo/es-repo-");
            minioService.uploadObject("backup", objectName, tarGzPath);
            StatObjectResponse stat = minioService.statObject("backup", objectName);
            System.out.println("✅ ES snapshot repository uploaded to MinIO: " + FileUtil.convertFileSize(stat.size()));
            return stat.size() / (1024 * 1024); // size in MB (integer division)
        } finally {
            // Clean up temporary files even when a step above fails
            if (remoteCopy != null) {
                deleteDirectory(remoteCopy);
            }
            new File(tarGzPath).delete();
        }
    }

    // Same machine only if the host is local AND the repository path exists here
    private boolean isSameMachine() {
        if (isLocalhost(esHost)) {
            return Files.exists(java.nio.file.Paths.get(repoPath));
        }
        return false;
    }

    private boolean isLocalhost(String host) {
        if (host == null) return false;
        if ("localhost".equalsIgnoreCase(host) || "127.0.0.1".equals(host) || "::1".equals(host)) {
            return true;
        }
        try {
            Set<String> localIps = getLocalIpAddresses();
            return localIps.contains(host);
        } catch (Exception e) {
            return false;
        }
    }

    // Download a remote directory over SFTP (JSch)
    private void downloadDirectoryViaSsh(String remotePath, String localPath) throws JSchException, SftpException {
        JSch jsch = new JSch();
        Session session = jsch.getSession(sshUser, esHost, 22);
        session.setPassword(sshPassword);
        // Disables host key verification; only acceptable in a trusted test network
        session.setConfig("StrictHostKeyChecking", "no");
        ChannelSftp sftp = null;
        try {
            session.connect();
            sftp = (ChannelSftp) session.openChannel("sftp");
            sftp.connect();
            new File(localPath).mkdirs();
            downloadRecursive(sftp, remotePath, localPath);
        } finally {
            // Release the connection even if the download fails
            if (sftp != null) {
                sftp.disconnect();
            }
            session.disconnect();
        }
    }

    // Recursive download
    private void downloadRecursive(ChannelSftp sftp, String remotePath, String localPath) throws SftpException {
        Vector<ChannelSftp.LsEntry> files = sftp.ls(remotePath);
        for (ChannelSftp.LsEntry entry : files) {
            if (".".equals(entry.getFilename()) || "..".equals(entry.getFilename())) continue;
            String remoteFile = remotePath + "/" + entry.getFilename();
            String localFile = localPath + "/" + entry.getFilename();
            if (entry.getAttrs().isDir()) {
                new File(localFile).mkdirs();
                downloadRecursive(sftp, remoteFile, localFile);
            } else {
                sftp.get(remoteFile, localFile);
            }
        }
    }

    // tar.gz compression
    private void createTarGz(String sourceDir, String tarGzPath) throws IOException {
        try (FileOutputStream fOut = new FileOutputStream(tarGzPath);
             BufferedOutputStream bOut = new BufferedOutputStream(fOut);
             GzipCompressorOutputStream gzOut = new GzipCompressorOutputStream(bOut);
             TarArchiveOutputStream tOut = new TarArchiveOutputStream(gzOut)) {
            tOut.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
            addFilesToTarGz(tOut, new File(sourceDir), "");
        }
    }

    private void addFilesToTarGz(TarArchiveOutputStream tOut, File file, String base) throws IOException {
        String entryName = base + file.getName();
        TarArchiveEntry tarEntry = new TarArchiveEntry(file, entryName);
        tOut.putArchiveEntry(tarEntry);
        if (file.isFile()) {
            try (FileInputStream fIn = new FileInputStream(file)) {
                byte[] buffer = new byte[4096];
                int bytesRead;
                while ((bytesRead = fIn.read(buffer)) != -1) {
                    tOut.write(buffer, 0, bytesRead);
                }
            }
            tOut.closeArchiveEntry();
        } else if (file.isDirectory()) {
            tOut.closeArchiveEntry();
            File[] children = file.listFiles();
            if (children != null) {
                for (File child : children) {
                    addFilesToTarGz(tOut, child, entryName + "/");
                }
            }
        }
    }

    // Remove old snapshots (keep 7). Relies on the response listing the oldest
    // snapshots first, which is the default sort order (start time, ascending).
    public void deleteOldBackups() throws IOException {
        GetSnapshotRequest request = GetSnapshotRequest.of(b -> b.repository(repoName).snapshot("_all"));
        GetSnapshotResponse response = client.snapshot().get(request);
        int excess = response.total() - 7;
        if (excess > 0) {
            for (int i = 0; i < excess; i++) {
                deleteSnapshot(response.snapshots().get(i).snapshot());
            }
        }
    }

    public String deleteSnapshot(String name) throws IOException {
        client.snapshot().delete(b -> b.repository(repoName).snapshot(name));
        return "Deleted: " + name;
    }

    // Utilities
    private void deleteDirectory(File dir) {
        if (dir.isDirectory()) {
            File[] children = dir.listFiles();
            if (children != null) {
                for (File child : children) deleteDirectory(child);
            }
        }
        dir.delete();
    }

    // Collects the IPv4 addresses of all local network interfaces
    private Set<String> getLocalIpAddresses() throws Exception {
        return Collections.list(NetworkInterface.getNetworkInterfaces()).stream()
                .flatMap(ni -> {
                    try {
                        return Collections.list(ni.getInetAddresses()).stream();
                    } catch (Exception e) {
                        return Stream.of();
                    }
                })
                .filter(ia -> ia instanceof Inet4Address)
                .map(InetAddress::getHostAddress)
                .collect(Collectors.toSet());
    }
}
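The retention rule in deleteOldBackups depends on the order in which ES returns the snapshots. As an illustration only, the selection step can be isolated into a pure function that sorts by name, which works here because backup_yyyyMMdd_HHmmss names sort chronologically. The class and method names below are hypothetical and not part of the service above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, dependency-free version of the "keep the newest N" rule. */
final class SnapshotRetentionSketch {

    /** Returns the names to delete so that only the newest {@code keep} remain. */
    static List<String> namesToDelete(List<String> snapshotNames, int keep) {
        List<String> sorted = new ArrayList<>(snapshotNames);
        // backup_yyyyMMdd_HHmmss sorts lexicographically in chronological order
        Collections.sort(sorted);
        int excess = Math.max(0, sorted.size() - keep);
        return new ArrayList<>(sorted.subList(0, excess));
    }

    public static void main(String[] args) {
        List<String> names = List.of(
                "backup_20250103_020000",
                "backup_20250101_020000",
                "backup_20250102_020000");
        // With keep = 2, only the oldest snapshot is selected for deletion
        System.out.println(namesToDelete(names, 2));
    }
}
```

Separating the selection from the delete calls makes the rule easy to unit test, and it no longer matters in which order the snapshots arrive.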

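🛠️ Configuration file (application.yml)

es:
  server:
    host: 192.168.1.111          # ES server IP (localhost or a local IP takes the local path)
    ssh:
      user: elasticsearch        # only needed when host is not local
      password: 123456           # use SSH keys in production
  snapshot:
    repository: es_backup        # must match the name of the registered repository
    repo:
      path: /tmp/es_backup       # must match the repository's location

💡 If host is localhost or one of this machine's IPs, and repoPath exists, the files are read locally and SSH is not used.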
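The local-host test compares the configured value against literal strings and the machine's IPv4 addresses. A possible variation, shown here as an alternative rather than what the service does, lets the JDK resolve the host and ask whether that address is bound to a local interface, which also covers host names. The class name is invented for this sketch.

```java
import java.net.InetAddress;
import java.net.NetworkInterface;

/** Hypothetical alternative to isLocalhost, based on address resolution. */
final class LocalHostCheckSketch {

    /** True if host resolves to a loopback address or to an address of this machine. */
    static boolean isLocalAddress(String host) {
        if (host == null || host.isBlank()) {
            return false;
        }
        try {
            InetAddress address = InetAddress.getByName(host);
            if (address.isLoopbackAddress()) {
                return true;
            }
            // Non-null only if some local network interface owns this address
            return NetworkInterface.getByInetAddress(address) != null;
        } catch (Exception e) {
            // Unknown host or interface lookup failure: treat as remote
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isLocalAddress("127.0.0.1"));
        System.out.println(isLocalAddress("192.168.1.111"));
    }
}
```

Treating lookup failures as "remote" mirrors the original fallback: when in doubt, the service takes the SFTP path.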


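✅ Usage summary

  1. ES server: configure path.repo, then restart ES;
  2. Register the repository: register an fs repository through Kibana or curl;
  3. Application config: fill in es.server.host, es.snapshot.repository, and es.snapshot.repo.path (plus the SSH settings for a remote host);
  4. Scheduled call: invoke snapshotService.createSnapshot(Arrays.asList("index1", "index2")) on a schedule;
  5. Everything else is automatic: create snapshot → remove old snapshots → pack the repository → upload to MinIO.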
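The post does not show how step 4 is scheduled. In a Spring application an @Scheduled method is the usual route; the sketch below instead uses only the JDK so that it stands alone. The class name, the 02:00 run time, and the task passed in are assumptions for illustration.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical daily scheduler for the backup call. */
final class BackupSchedulerSketch {

    /** Time from {@code now} until the next occurrence of {@code runAt}. */
    static Duration delayUntilNextRun(LocalDateTime now, LocalTime runAt) {
        LocalDateTime next = now.toLocalDate().atTime(runAt);
        if (!next.isAfter(now)) {
            next = next.plusDays(1); // today's slot has passed, use tomorrow's
        }
        return Duration.between(now, next);
    }

    /** Runs the task once a day at {@code runAt}; the caller owns the executor. */
    static ScheduledExecutorService scheduleDaily(Runnable backupTask, LocalTime runAt) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        long initialDelayMs = delayUntilNextRun(LocalDateTime.now(), runAt).toMillis();
        Runnable guarded = () -> {
            try {
                backupTask.run();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all later runs
                System.err.println("Backup failed: " + e.getMessage());
            }
        };
        scheduler.scheduleAtFixedRate(guarded, initialDelayMs,
                TimeUnit.DAYS.toMillis(1), TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) {
        // Prints how long until the next 02:00 run without starting a scheduler
        System.out.println(delayUntilNextRun(LocalDateTime.now(), LocalTime.of(2, 0)));
    }
}
```

Because createSnapshot declares a checked exception, the task passed to scheduleDaily would have to wrap that call in its own try/catch.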

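📝 Note: this scheme backs up the entire snapshot repository directory rather than a single snapshot, which suits disaster recovery of a whole ES environment. If only some indices need to be restored, use the ES snapshot restore API directly.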
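For the partial-restore case, the restore endpoint is POST /_snapshot/{repository}/{snapshot}/_restore. Here is a rough sketch that only builds such a request with the JDK HTTP client. The class name, base URL, and snapshot name are placeholders, and index names are assumed to contain no characters that need JSON escaping.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;

/** Hypothetical builder for a snapshot restore request. */
final class RestoreRequestSketch {

    /** JSON body that restores only the given indices, without global state. */
    static String buildBody(List<String> indices) {
        return "{\"indices\":\"" + String.join(",", indices)
                + "\",\"include_global_state\":false}";
    }

    /** Builds POST {baseUrl}/_snapshot/{repo}/{snapshot}/_restore; nothing is sent. */
    static HttpRequest buildRequest(String baseUrl, String repo, String snapshot,
                                    List<String> indices) {
        URI uri = URI.create(baseUrl + "/_snapshot/" + repo + "/" + snapshot + "/_restore");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(buildBody(indices)))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest("http://localhost:9200",
                "gxsj_eslog_backup", "backup_20250101_020000", List.of("index1", "index2"));
        System.out.println(request.method() + " " + request.uri());
        System.out.println(buildBody(List.of("index1", "index2")));
    }
}
```

An index that already exists and is open cannot be restored over; it has to be closed or deleted first, or the restore has to rename the index.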

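🔐 Security advice: in production, use SSH keys instead of a password, and restrict MinIO access with IAM-style policies.

A snapshot repository can also be registered directly on an S3-compatible store. That approach was not used here because the business requirements differ.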

Complete code:

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.snapshot.*;
import com.jcraft.jsch.*;
import io.minio.StatObjectResponse;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.*;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.Vector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * ES snapshot service.
 * MinioService and FileUtil are project classes that are not shown in this post.
 */
@Service
public class SnapshotService {

    protected static final String TEMP_DIR = System.getProperty("java.io.tmpdir") + File.separator;

    @Autowired
    private ElasticsearchClient client;

    @Value("${es.snapshot.repository}")
    private String repoName;

    @Value("${es.snapshot.repo.path}")
    private String repoPath;

    @Value("${es.server.host}")
    String esHost;

    @Value("${es.server.ssh.user}")
    private String sshUser;

    @Value("${es.server.ssh.password}")
    private String sshPassword;

    @Autowired
    private MinioService minioService;

    // Create a snapshot
    public Long createSnapshot(List<String> indices) throws Exception {
        String name = "backup_" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss"));
        return createSnapshotWithName(name, indices);
    }

    public Long createSnapshotWithName(String name, List<String> indices) throws Exception {
        CreateSnapshotRequest request = CreateSnapshotRequest.of(b -> b
                .repository(repoName)
                .snapshot(name)
                .indices(indices)
                .ignoreUnavailable(true)
                .includeGlobalState(false)
                .waitForCompletion(true) // synchronous: block until the snapshot has finished
        );
        CreateSnapshotResponse response = client.snapshot().create(request);
        // Keep the 7 most recent snapshots
        SnapshotInfo info = response.snapshot();
        if (info != null && "SUCCESS".equals(info.state())) {
            deleteOldBackups();
            return backupRepoToMinio();
        }
        return null;
    }

    // Relies on the response listing the oldest snapshots first,
    // which is the default sort order (start time, ascending)
    public void deleteOldBackups() throws IOException {
        GetSnapshotRequest request = GetSnapshotRequest.of(b -> b.repository(repoName).snapshot("_all"));
        GetSnapshotResponse response = client.snapshot().get(request);
        int excessCount = response.total() - 7;
        if (excessCount > 0) {
            for (int i = 0; i < excessCount; i++) {
                deleteSnapshot(response.snapshots().get(i).snapshot());
            }
        }
    }

    // Delete a snapshot
    public String deleteSnapshot(String snapshotName) throws IOException {
        DeleteSnapshotRequest request = DeleteSnapshotRequest.of(b -> b.repository(repoName).snapshot(snapshotName));
        client.snapshot().delete(request);
        return "Snapshot '" + snapshotName + "' deleted.";
    }

    // ==============================
    // Main entry point: detects automatically whether ES is on the same machine
    // ==============================
    public Long backupRepoToMinio() throws Exception {
        boolean isLocal = isSameMachine();
        String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss"));
        // Build temporary paths from TEMP_DIR (cross-platform)
        String tarGzPath = TEMP_DIR + "es-repo-" + timestamp + ".tar.gz";
        File remoteCopy = null;
        try {
            if (isLocal) {
                System.out.println("📁 Snapshot repository is local, reading the file system directly...");
                createTarGz(repoPath, tarGzPath);
            } else {
                System.out.println("🌐 Snapshot repository is on a remote server, pulling via SFTP...");
                remoteCopy = new File(TEMP_DIR + "es-repo-remote-" + timestamp);
                downloadDirectoryViaSsh(repoPath, remoteCopy.getPath());
                createTarGz(remoteCopy.getPath(), tarGzPath);
            }
            // Upload to MinIO. The old archive is removed before the upload,
            // so a failed upload leaves no archive in MinIO.
            String objectName = "full-repo/es-repo-" + timestamp + ".tar.gz";
            minioService.removeAll("backup", "full-repo/es-repo-");
            minioService.uploadObject("backup", objectName, tarGzPath);
            StatObjectResponse stat = minioService.statObject("backup", objectName);
            System.out.println("ES backup file size: " + FileUtil.convertFileSize(stat.size()));
            System.out.println("✅ Snapshot repository uploaded to MinIO: " + objectName);
            return stat.size() / (1024 * 1024); // size in MB (integer division)
        } finally {
            // Clean up the temporary directory and archive even when a step fails
            if (remoteCopy != null) {
                deleteDirectory(remoteCopy);
            }
            new File(tarGzPath).delete();
        }
    }

    // ==============================
    // Same-machine check: the host must be local AND the repository path must exist
    // ==============================
    private boolean isSameMachine() {
        if (isLocalhost(esHost)) {
            return Files.exists(Paths.get(repoPath));
        }
        return false;
    }

    private boolean isLocalhost(String host) {
        if (host == null) return false;
        if ("localhost".equalsIgnoreCase(host) ||
                "127.0.0.1".equals(host) ||
                "::1".equals(host)) {
            return true;
        }
        try {
            Set<String> localIps = getLocalIpAddresses();
            return localIps.contains(host);
        } catch (Exception e) {
            System.err.println("⚠️ Could not read local IPs, skipping IP match: " + e.getMessage());
            return false;
        }
    }

    // Collects the IPv4 addresses of all local network interfaces
    private Set<String> getLocalIpAddresses() throws Exception {
        return Collections.list(NetworkInterface.getNetworkInterfaces()).stream()
                .flatMap(ni -> {
                    try {
                        return Collections.list(ni.getInetAddresses()).stream();
                    } catch (Exception e) {
                        return Stream.of();
                    }
                })
                .filter(ia -> ia instanceof Inet4Address)
                .map(InetAddress::getHostAddress)
                .collect(Collectors.toSet());
    }

    // ==============================
    // Option 1: compress the local directory directly
    // ==============================
    private void createTarGz(String sourceDir, String tarGzPath) throws IOException {
        try (FileOutputStream fOut = new FileOutputStream(tarGzPath);
             BufferedOutputStream bOut = new BufferedOutputStream(fOut);
             GzipCompressorOutputStream gzOut = new GzipCompressorOutputStream(bOut);
             TarArchiveOutputStream tOut = new TarArchiveOutputStream(gzOut)) {
            tOut.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
            addFilesToTarGz(tOut, new File(sourceDir), "");
        }
    }

    private void addFilesToTarGz(TarArchiveOutputStream tOut, File file, String base) throws IOException {
        String entryName = base + file.getName();
        TarArchiveEntry tarEntry = new TarArchiveEntry(file, entryName);
        tOut.putArchiveEntry(tarEntry);
        if (file.isFile()) {
            try (FileInputStream fIn = new FileInputStream(file)) {
                byte[] buffer = new byte[4096];
                int bytesRead;
                while ((bytesRead = fIn.read(buffer)) != -1) {
                    tOut.write(buffer, 0, bytesRead);
                }
            }
            tOut.closeArchiveEntry();
        } else if (file.isDirectory()) {
            tOut.closeArchiveEntry();
            File[] children = file.listFiles();
            if (children != null) {
                for (File child : children) {
                    addFilesToTarGz(tOut, child, entryName + "/");
                }
            }
        }
    }

    // ==============================
    // Option 2: pull from the remote server over SFTP
    // ==============================
    private void downloadDirectoryViaSsh(String remotePath, String localPath) throws JSchException, SftpException {
        JSch jsch = new JSch();
        Session session = jsch.getSession(sshUser, esHost, 22);
        if (sshPassword != null && !sshPassword.isEmpty()) {
            session.setPassword(sshPassword);
        }
        // Disables host key verification; only acceptable in a trusted test network
        session.setConfig("StrictHostKeyChecking", "no");
        ChannelSftp sftp = null;
        try {
            session.connect();
            sftp = (ChannelSftp) session.openChannel("sftp");
            sftp.connect();
            new File(localPath).mkdirs();
            downloadRecursive(sftp, remotePath, localPath);
        } finally {
            // Release the connection even if the download fails
            if (sftp != null) {
                sftp.disconnect();
            }
            session.disconnect();
        }
    }

    private void downloadRecursive(ChannelSftp sftp, String remotePath, String localPath) throws SftpException {
        Vector<ChannelSftp.LsEntry> files = sftp.ls(remotePath);
        for (ChannelSftp.LsEntry entry : files) {
            if (".".equals(entry.getFilename()) || "..".equals(entry.getFilename())) continue;
            String remoteFilePath = remotePath + "/" + entry.getFilename();
            String localFilePath = localPath + "/" + entry.getFilename();
            if (entry.getAttrs().isDir()) {
                new File(localFilePath).mkdirs();
                downloadRecursive(sftp, remoteFilePath, localFilePath);
            } else {
                sftp.get(remoteFilePath, localFilePath);
            }
        }
    }

    // ==============================
    // Utility methods
    // ==============================
    private void deleteDirectory(File dir) {
        if (dir.isDirectory()) {
            File[] children = dir.listFiles();
            if (children != null) {
                for (File child : children) {
                    deleteDirectory(child);
                }
            }
        }
        dir.delete();
    }
}