MySQL如何实现一对多关系到Elasticsearch的同步(mysql一对多到es)

MySQL如何实现一对多关系到Elasticsearch的同步?

在实际应用中,经常需要将MySQL中存储的一对多关系数据同步到Elasticsearch中,以提高查询效率和搜索速度。本文将介绍如何使用MySQL和Elasticsearch的相关技术实现这一需求。

MySQL实现一对多关系

我们需要在MySQL中创建存储一对多关系的表。以博客文章和标签为例,一篇博客文章可以对应多个标签,而一个标签也可以对应多篇博客文章。因此,我们需要创建两个表:文章表和标签表,其关系如下图所示:

![图1 一对多关系ER图](https://img-blog.csdn.net/20180829164036977?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lhaG9vX3dx/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/q/75)

在MySQL中,我们可以使用外键约束来实现一对多关系。具体地,我们在标签表中增加一个文章ID字段,作为外键引用文章表中的主键ID。代码如下:

CREATE TABLE IF NOT EXISTS `article` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`title` varchar(255) NOT NULL,
`content` text NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE IF NOT EXISTS `tag` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL,
`article_id` int(11) NOT NULL,
PRIMARY KEY (`id`),
CONSTRNT `fk_article` FOREIGN KEY (`article_id`) REFERENCES `article` (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

通过以上代码,我们可以创建出两张相互关联的表,并建立了外键约束。接下来,我们可以向这两张表中插入数据,并实现数据同步的功能。

数据同步

为了将MySQL中的数据同步到Elasticsearch中,我们可以使用Elasticsearch的Java客户端和MySQL的JDBC驱动。具体地,我们使用JDBC驱动从MySQL中查询数据,然后使用Java数据结构将其转换为Elasticsearch格式,最终使用Elasticsearch的Java客户端将数据导入到Elasticsearch中。

我们需要在项目中导入Elasticsearch和JDBC的相关依赖。在pom.xml文件中增加以下代码:



org.elasticsearch.client
elasticsearch-rest-high-level-client
7.13.2


mysql
mysql-connector-java
8.0.25


然后,在代码中通过JDBC驱动从MySQL中查询数据,并将其转换为Elasticsearch格式。例如,我们可以定义一个Article和Tag实体类来存储数据:

public class Article {
private int id;
private String title;
private String content;
private List tags;
// getter and setter
}
public class Tag {
private int id;
private String name;

// getter and setter
}

我们可以通过以下代码从MySQL中查询文章及对应的标签:

public List
queryArticles() throws SQLException {
List
articles = new ArrayList();
String sql = "SELECT a.id, a.title, a.content, t.id AS tag_id, t.name AS tag_name " +
"FROM article AS a " +
"LEFT JOIN tag AS t ON a.id = t.article_id";
try (Connection conn = getConnection();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
while (rs.next()) {
int articleId = rs.getInt("id");
String title = rs.getString("title");
String content = rs.getString("content");
int tagId = rs.getInt("tag_id");
String tagName = rs.getString("tag_name");
Article article = articles.stream().filter(a -> a.getId() == articleId).findFirst().orElse(null);
if (article == null) {
article = new Article();
article.setId(articleId);
article.setTitle(title);
article.setContent(content);
article.setTags(new ArrayList());
articles.add(article);
}
if (tagId > 0) {
Tag tag = new Tag();
tag.setId(tagId);
tag.setName(tagName);
article.getTags().add(tag);
}
}
}
return articles;
}

在代码中,我们使用了LEFT JOIN来查询文章及对应的标签数据。由于一篇文章可能对应多个标签,因此我们需要使用stream()函数和findFirst()函数来查找已经存在的Article实体对象。如果不存在,我们则创建一个新的Article实体对象,并添加到articles集合中。

接下来,我们将数据转换为Elasticsearch格式,并导入到Elasticsearch中:

public void syncToElasticsearch() throws IOException, SQLException {
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200, "http")).build();
RestHighLevelClient highLevelClient = new RestHighLevelClient(restClient);
List
articles = queryArticles();
for (Article article : articles) {
IndexRequest indexRequest = new IndexRequest("articles");
Map jsonMap = new HashMap();
jsonMap.put("title", article.getTitle());
jsonMap.put("content", article.getContent());
List> tags = new ArrayList();
for (Tag tag : article.getTags()) {
Map tagMap = new HashMap();
tagMap.put("id", tag.getId());
tagMap.put("name", tag.getName());
tags.add(tagMap);
}
jsonMap.put("tags", tags);
indexRequest.source(jsonMap);
highLevelClient.index(indexRequest, RequestOptions.DEFAULT);
}
}

在代码中,我们使用Elasticsearch的Java客户端将数据导入到Elasticsearch中。我们使用RestClient构建连接,然后通过RestHighLevelClient发送数据到Elasticsearch。最终,我们可以在Elasticsearch中查看导入的数据。

完整代码

public class Mn {
private static final String DATABASE_URL = "jdbc:mysql://localhost:3306/blog";
private static final String DATABASE_USERNAME = "root";
private static final String DATABASE_PASSWORD = "123456";

public static void mn(String[] args) throws SQLException, IOException {
syncToElasticsearch();
}

public static void syncToElasticsearch() throws IOException, SQLException {
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200, "http")).build();
RestHighLevelClient highLevelClient = new RestHighLevelClient(restClient);
List
articles = queryArticles();
for (Article article : articles) {
IndexRequest indexRequest = new IndexRequest("articles");
Map jsonMap = new HashMap();
jsonMap.put("title", article.getTitle());
jsonMap.put("content", article.getContent());
List> tags = new ArrayList();
for (Tag tag : article.getTags()) {
Map tagMap = new HashMap();
tagMap.put("id", tag.getId());
tagMap.put("name", tag.getName());
tags.add(tagMap);
}
jsonMap.put("tags", tags);
indexRequest.source(jsonMap);
highLevelClient.index(indexRequest, RequestOptions.DEFAULT);
}
}

public static List
queryArticles() throws SQLException {
List
articles = new ArrayList();
String sql = "SELECT a.id, a.title, a.content, t.id AS tag_id, t.name AS tag_name " +
"FROM article AS a " +
"LEFT JOIN tag AS t ON a.id = t.article_id";
try (Connection conn = getConnection();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
while (rs.next()) {
int articleId = rs.getInt("id");
String title = rs.getString("title");
String content = rs.getString("content");
int tagId = rs.getInt("tag_id");
String tagName = rs.getString("tag_name");
Article article = articles.stream().filter(a -> a.getId() == articleId).findFirst().orElse(null);
if (article == null) {
article = new Article();
article.setId(articleId);
article.setTitle(title);
article.setContent(content);
article.setTags(new ArrayList());
articles.add(article);
}
if (tagId > 0) {
Tag tag = new Tag();
tag.setId(tagId);
tag.setName(tagName);
article.getTags().add(tag);
}
}
}
return articles;
}

public static Connection getConnection() throws SQLException {
return DriverManager.getConnection(
DATABASE_URL, DATABASE_USERNAME, DATABASE_PASSWORD);
}
}

public class Article {
private int id;
private String title;
private String content;
private List tags;
// getter and setter
}
public class Tag {
private int id;
private String name;

// getter and setter
}

总结

本文介绍了如何使用MySQL和Elasticsearch的相关技术实现一对多关系数据的同步。通过以上的代码,我们可以将MySQL中存储的一对多关系数据同步到Elasticsearch中,以提高查询效率和搜索速度。如果您还有其它相关问题或建议,欢迎在评论区留言,我们会及时回复。


数据运维技术 » MySQL如何实现一对多关系到Elasticsearch的同步(mysql一对多到es)