SpringBoot_Elasticsearch


SpringBoot_Elasticsearch

第一步

  • 在 pom.xml 中添加对应的依赖包
<properties>
    <java.version>1.8</java.version>
    <!-- Shared by both Elasticsearch artifacts below so the client and the core library stay on the same release. -->
    <elasticsearch.version>7.15.0</elasticsearch.version>
</properties>

<dependencies>
    <!-- High-level REST client used by RestHighLevelClientConfig / ElasticSearchServiceImpl. -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>

    <!-- Core Elasticsearch classes (query builders, request/response types). -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>

    <!-- No version given here: it is expected to come from the parent POM / dependency management. -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>

    <!-- JSON (de)serialization of document payloads.
         1.2.83 is the 1.x release that fixes the autoType deserialization vulnerability (CVE-2022-25845)
         present in earlier 1.2.x versions such as 1.2.71. -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
</dependencies>

第二步

  • 配置文件添加对应的配置信息
spring:
  elasticsearch:
    rest:
      # Comma-separated Elasticsearch node addresses in host:port form; replace the placeholder with real ones.
      # Do not prefix the value with '#': YAML would treat it as a comment and the property would be empty.
      uris: xxxx.xx.x.xx:9200

第三步

  • 定义配置类RestHighLevelClientConfig
/**
 * Spring configuration that exposes a {@link RestHighLevelClient} bean built from the
 * {@code spring.elasticsearch.rest.uris} property.
 */
@Configuration
public class RestHighLevelClientConfig {

    /** Comma-separated list of Elasticsearch node addresses, e.g. {@code host1:9200,host2:9200}. */
    @Value("${spring.elasticsearch.rest.uris}")
    private String uris;

    /**
     * Creates the high-level REST client connected to every configured node.
     *
     * @return the client bean
     * @throws IllegalStateException if no address is configured
     */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(RestClient.builder(this.createHttpHosts()));
    }

    /**
     * Converts the configured address list into {@link HttpHost} objects.
     * <p>
     * Each entry may be {@code host:port} or carry a scheme ({@code https://host:port});
     * parsing is delegated to {@link HttpHost#create(String)}.
     *
     * @return one {@link HttpHost} per comma-separated entry
     * @throws IllegalStateException if the property is empty
     */
    private HttpHost[] createHttpHosts() {
        // Asserts.check throws when the expression is FALSE, so the condition must be "not empty".
        Asserts.check(!ObjectUtils.isEmpty(uris), "ElasticSearch cluster ip address cannot empty");
        String[] addresses = uris.split(",");
        HttpHost[] hosts = new HttpHost[addresses.length];
        for (int i = 0; i < addresses.length; i++) {
            hosts[i] = HttpHost.create(addresses[i].trim());
        }
        return hosts;
    }
}

第四步

  • 定义数据实体类ElasticSearchDocument
/**
 * Generic envelope that pairs a document ID with its payload.
 * <p>
 * All accessors, {@code equals}/{@code hashCode}/{@code toString} and both constructors
 * (all-args and no-args) are generated by the Lombok annotations below.
 *
 * @param <T> type of the payload held in {@code data}
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ElasticSearchDocument<T> {
    /** Document ID. */
    private String id;
    /** Document payload. */
    private T data;
}

第五步

  • 定义公共错误类BaseException
public class BaseException extends RuntimeException {

    public BaseException(String message) {
        super(message);
    }

    public BaseException(String message, Throwable cause) {
        super(message, cause);
    }
}

第六步

  • 定义方法接口ElasticSearchService
/**
 * Operations for managing Elasticsearch indices and documents.
 * <p>
 * The interface itself carries no Spring stereotype: component scanning only registers
 * concrete classes, so the annotation belongs on the implementation.
 */
public interface ElasticSearchService {

    /**
     * Creates an index.
     *
     * @param index      index name
     * @param properties mapping properties, keyed by field name
     * @return {@code true} if the creation was acknowledged
     * @throws IOException if the REST client request fails
     */
    boolean createIndex(String index, Map<String, Map<String, Object>> properties) throws IOException;

    /**
     * Checks whether an index exists.
     *
     * @param index index name
     * @return {@code true} if the index exists
     * @throws IOException if the REST client request fails
     */
    boolean isExistIndex(String index) throws IOException;

    /**
     * Deletes an index.
     *
     * @param index index name
     * @return {@code true} if the deletion was acknowledged
     * @throws IOException if the REST client request fails
     */
    boolean deleteIndex(String index) throws IOException;

    /**
     * Saves a document.
     * <p>
     * If the document already exists it is updated; otherwise it is created.
     *
     * @param index    index name
     * @param document document to save
     * @throws IOException if the REST client request fails
     */
    void save(String index, ElasticSearchDocument<?> document) throws IOException;

    /**
     * Updates a document.
     *
     * @param index    index name
     * @param document document carrying the ID and the new data
     * @throws IOException if the REST client request fails
     */
    void update(String index, ElasticSearchDocument<?> document) throws IOException;

    /**
     * Saves a batch of documents.
     * <p>
     * Documents that already exist are updated; the others are created.
     *
     * @param index        index name
     * @param documentList documents to save
     * @param <T>          payload type
     * @throws IOException if the REST client request fails
     */
    <T> void saveAll(String index, List<ElasticSearchDocument<T>> documentList) throws IOException;

    /**
     * Deletes a document by ID.
     *
     * @param index index name
     * @param id    document ID
     * @throws IOException if the REST client request fails
     */
    void delete(String index, String id) throws IOException;

    /**
     * Deletes every document matching a query.
     *
     * @param index        index name
     * @param queryBuilder query selecting the documents to delete
     * @throws IOException if the REST client request fails
     */
    void deleteByQuery(String index, QueryBuilder queryBuilder) throws IOException;

    /**
     * Deletes a batch of documents by ID.
     *
     * @param index  index name
     * @param idList IDs of the documents to delete
     * @throws IOException if the REST client request fails
     */
    void deleteAll(String index, List<String> idList) throws IOException;

    /**
     * Fetches a document by index and ID.
     *
     * @param index      index name
     * @param id         document ID
     * @param resultType class the document source is converted to
     * @param <T>        result type
     * @return the document converted to {@code T}
     * @throws IOException if the REST client request fails
     */
    <T> T get(String index, String id, Class<T> resultType) throws IOException;

    /**
     * Runs a search and converts every hit.
     *
     * @param index         index name
     * @param sourceBuilder search definition
     * @param resultType    class each hit source is converted to
     * @param <T>           result type
     * @return the converted hits
     * @throws IOException if the REST client request fails
     */
    <T> List<T> searchByQuery(String index, SearchSourceBuilder sourceBuilder, Class<T> resultType) throws IOException;

    /**
     * Runs a search and returns its aggregations.
     * <p>
     * Unlike the other operations this method declares no checked exception.
     *
     * @param searchSourceBuilder search definition containing the aggregations
     * @param indices             indices to search
     * @return the aggregations of the search response
     */
    Aggregations searchAgg(SearchSourceBuilder searchSourceBuilder, String... indices);
}

第七步

  • 实现接口ElasticSearchService
/**
 * {@link ElasticSearchService} implementation backed by the Elasticsearch
 * {@link RestHighLevelClient}; document payloads are (de)serialized with fastjson.
 */
@Service
public class ElasticSearchServiceImpl implements ElasticSearchService {

    /** Value written to the {@code number_of_shards} setting of created indices. */
    private static final int DEFAULT_SHARDS = 3;

    /** Value written to the {@code number_of_replicas} setting of created indices. */
    private static final int DEFAULT_REPLICAS = 1;

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * Creates an index with the given mapping properties and the default shard/replica settings.
     *
     * @param index      index name
     * @param properties mapping properties, keyed by field name
     * @return {@code true} if the creation was acknowledged
     * @throws IOException if the REST client request fails
     */
    @Override
    public boolean createIndex(String index, Map<String, Map<String, Object>> properties) throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder();
        // Mapping types were deprecated in ES 7.0, so "properties" sits directly under "mappings".
        builder.startObject()
            .startObject("mappings")
            .field("properties", properties)
            .endObject()
            .startObject("settings")
            .field("number_of_shards", DEFAULT_SHARDS)
            .field("number_of_replicas", DEFAULT_REPLICAS)
            .endObject()
            .endObject();

        CreateIndexRequest request = new CreateIndexRequest(index).source(builder);
        CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);

        return response.isAcknowledged();
    }

    /**
     * Checks whether an index exists.
     *
     * @param index index name
     * @return {@code true} if the index exists
     * @throws IOException if the REST client request fails
     */
    @Override
    public boolean isExistIndex(String index) throws IOException {
        GetIndexRequest getIndexRequest = new GetIndexRequest(index);
        getIndexRequest.local(false);
        getIndexRequest.humanReadable(true);
        getIndexRequest.includeDefaults(false);

        return restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
    }

    /**
     * Deletes an index.
     *
     * @param index index name
     * @return {@code true} if the deletion was acknowledged
     * @throws IOException   if the REST client request fails
     * @throws BaseException if the index does not exist
     */
    @Override
    public boolean deleteIndex(String index) throws IOException {
        try {
            DeleteIndexRequest request = new DeleteIndexRequest(index);
            AcknowledgedResponse response = restHighLevelClient.indices().delete(request, RequestOptions.DEFAULT);

            return response.isAcknowledged();
        } catch (ElasticsearchException exception) {
            if (exception.status() == RestStatus.NOT_FOUND) {
                // Keep the original exception as the cause instead of printing it to stderr.
                throw new BaseException("Not found index: " + index, exception);
            }
            throw exception;
        }
    }

    /**
     * Saves a document.
     * <p>
     * If the document already exists it is updated; otherwise it is created.
     *
     * @param index    index name
     * @param document document to save
     * @throws IOException if the REST client request fails
     */
    @Override
    public void save(String index, ElasticSearchDocument<?> document) throws IOException {
        IndexRequest indexRequest = new IndexRequest(index);
        indexRequest.id(document.getId());
        indexRequest.source(JSON.toJSONString(document.getData()), XContentType.JSON);

        restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
    }

    /**
     * Updates a document.
     *
     * @param index    index name
     * @param document document carrying the ID and the new data
     * @throws IOException if the REST client request fails
     */
    @Override
    public void update(String index, ElasticSearchDocument<?> document) throws IOException {
        UpdateRequest updateRequest = new UpdateRequest(index, document.getId());
        updateRequest.doc(JSON.toJSONString(document.getData()), XContentType.JSON);

        restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
    }

    /**
     * Saves a batch of documents in a single bulk request.
     * <p>
     * Documents that already exist are updated; the others are created.
     * An empty list is a no-op.
     *
     * @param index        index name
     * @param documentList documents to save
     * @param <T>          payload type
     * @throws IOException   if the REST client request fails
     * @throws BaseException if at least one item of the bulk request failed
     */
    @Override
    public <T> void saveAll(String index, List<ElasticSearchDocument<T>> documentList) throws IOException {
        if (ObjectUtils.isEmpty(documentList)) {
            return;
        }
        BulkRequest bulkRequest = new BulkRequest();
        for (ElasticSearchDocument<T> doc : documentList) {
            bulkRequest.add(new IndexRequest(index)
                            .id(doc.getId())
                            .source(JSON.toJSONString(doc.getData()), XContentType.JSON));
        }

        executeBulk(index, bulkRequest);
    }

    /**
     * Deletes a document by ID.
     *
     * @param index index name
     * @param id    document ID
     * @throws IOException if the REST client request fails
     */
    @Override
    public void delete(String index, String id) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(index, id);

        restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
    }

    /**
     * Deletes every document matching a query; version conflicts do not abort the operation.
     *
     * @param index        index name
     * @param queryBuilder query selecting the documents to delete
     * @throws IOException if the REST client request fails
     */
    @Override
    public void deleteByQuery(String index, QueryBuilder queryBuilder) throws IOException {
        DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(index).setQuery(queryBuilder);
        deleteRequest.setConflicts("proceed");
        restHighLevelClient.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
    }

    /**
     * Deletes a batch of documents by ID in a single bulk request.
     * An empty list is a no-op.
     *
     * @param index  index name
     * @param idList IDs of the documents to delete
     * @throws IOException   if the REST client request fails
     * @throws BaseException if at least one item of the bulk request failed
     */
    @Override
    public void deleteAll(String index, List<String> idList) throws IOException {
        if (ObjectUtils.isEmpty(idList)) {
            return;
        }
        BulkRequest bulkRequest = new BulkRequest();
        for (String id : idList) {
            bulkRequest.add(new DeleteRequest(index, id));
        }

        executeBulk(index, bulkRequest);
    }

    /**
     * Fetches a document by index and ID.
     *
     * @param index      index name
     * @param id         document ID
     * @param resultType class the document source is converted to
     * @param <T>        result type
     * @return the document converted to {@code T}, or {@code null} if it does not exist
     * @throws IOException if the REST client request fails
     */
    @Override
    public <T> T get(String index, String id, Class<T> resultType) throws IOException {
        GetRequest getRequest = new GetRequest(index, id);
        GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
        if (!response.isExists()) {
            // A missing document has no source; make the "not found" result explicit.
            return null;
        }

        return JSON.parseObject(response.getSourceAsString(), resultType);
    }

    /**
     * Runs a search and converts the source of every hit.
     *
     * @param index         index name
     * @param sourceBuilder search definition
     * @param resultType    class each hit source is converted to
     * @param <T>           result type
     * @return the converted hits, in response order
     * @throws IOException if the REST client request fails
     */
    @Override
    public <T> List<T> searchByQuery(String index, SearchSourceBuilder sourceBuilder, Class<T> resultType) throws IOException {
        SearchRequest searchRequest = new SearchRequest(index).source(sourceBuilder);
        SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        SearchHit[] hits = response.getHits().getHits();

        List<T> results = new ArrayList<>(hits.length);
        for (SearchHit hit : hits) {
            results.add(JSON.parseObject(hit.getSourceAsString(), resultType));
        }

        return results;
    }

    /**
     * Runs a search and returns its aggregations.
     *
     * @param searchSourceBuilder search definition containing the aggregations
     * @param indices             indices to search
     * @return the aggregations of the search response
     * @throws BaseException if the REST client request fails; the {@link IOException} is kept as the cause
     */
    @Override
    public Aggregations searchAgg(SearchSourceBuilder searchSourceBuilder, String... indices) {
        try {
            SearchRequest searchRequest = new SearchRequest(indices).source(searchSourceBuilder);
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

            return searchResponse.getAggregations();
        } catch (IOException e) {
            throw new BaseException("ElasticSearch client exception", e);
        }
    }

    /**
     * Sends a bulk request and fails loudly when any item was rejected.
     * <p>
     * A bulk call can succeed at the HTTP level while individual items fail, so the
     * response has to be inspected explicitly.
     *
     * @param index       index name, used for the error message only
     * @param bulkRequest request to execute
     * @throws IOException   if the REST client request fails
     * @throws BaseException if at least one item of the bulk request failed
     */
    private void executeBulk(String index, BulkRequest bulkRequest) throws IOException {
        if (restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT).hasFailures()) {
            throw new BaseException("ElasticSearch bulk request has failures, index: " + index);
        }
    }
}

文章作者: L Q
版权声明: 本博客所有文章除特别声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 L Q !
  目录