一、创建一个新的服务模块 zidanmall-search
参考官方文档:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.x/index.html
我们选择:Java High Level REST Client 更强大,封装更彻底;
1、pox.xml:
注意:我们虽然添加了 Java High Level REST Client 的依赖,但是 elasticsearch 本身的版本在springboot的版本依赖中默认定义了;版本较低,我这边在 spring-boot-dependencies 中查出来是6.4.3;
需要修改为配套:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.jiguiquan.zidanmall</groupId>
<artifactId>zidanmall-search</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>zidanmall-search</name>
<description>全文检索ES服务</description>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Greenwich.SR3</spring-cloud.version>
<elasticsearch.version>7.4.2</elasticsearch.version>
</properties>
<dependencies>
<dependency>
<groupId>com.jiguiquan.zidanmall</groupId>
<artifactId>zidanmall-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!--es的版本一定要严格和我们安装的软件保持一致-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.4.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2、ElsaticSearch的配置:
如果我们是通过SpringData来使用ES,那么配置将会比较简单,但是我们没有使用spring data,所以需要自己手动配置ES的配置信息:
bootstrap.yml:
server: port: 10000 spring: application: name: zidanmall-product cloud: nacos: discovery: server-addr: localhost:8848 config: server-addr: localhost:8848 file-extension: yaml namespace: c5a52176-88d8-4aaa-ba1d-65caaab513d5 group: dev
config.ZidanElasticSearchConfig.java:
@Configuration
public class ZidanElasticSearchConfig {
@Bean
public RestHighLevelClient esRestClient(){
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("192.168.174.141", 9200, "http")));
return client;
}
}
启动项目,一切正常;
二、在ElasticSearch中创建索引product及对应的映射
1、相当于,创建了product数据库,并在一种创建了默认表“_doc”,及对应的表结构(_mapping);
PUT product
{
"mappings": {
"properties": {
"skuId":{
"type": "long"
},
"spuId":{
"type": "keyword"
},
"skuTitle":{
"type": "text",
"analyzer": "ik_smart"
},
"skuPrice":{
"type": "keyword"
},
"skuImg":{
"type": "keyword",
"index": false, //不需要作为索引字段
"doc_values": false //不需要进行聚合分析
},
"saleCount":{
"type": "long"
},
"hasStock":{
"type": "boolean"
},
"hotScore":{
"type": "long"
},
"brandId":{
"type": "long"
},
"catalogId":{
"type": "long"
},
"brandName":{
"type": "keyword",
"index": false
},
"brandImg":{
"type": "keyword",
"index": false
},
"catalogName":{
"type": "keyword",
"index": false
},
"attrs":{
"type": "nested", //嵌套结构
"properties": {
"attrId":{
"type":"long"
},
"attrName":{
"type": "keyword",
"index": false
},
"attrValue":{
"type":"keyword"
}
}
}
}
}
}
nested:嵌入式 数据类型(重要:可以解决ES对数据扁平化处理后的错误查询问题)
index:是否需要作为索引字段
doc_values:是否需要进行聚合分析
因为当上面2个位true的时候,ES需要做大量的工作,以满足后续使用需求;如果我们不需要,可以置为false,这样可以节约ES的资源!
执行成功:

2、因为嵌入式nested比较重要,所以记录一下:
2.1、有一个user[]数组:
"user" : [
{
"first" : "John",
"last" : "Smith"
},
{
"first" : "Alice",
"last" : "White"
}
]}
2.2、如果我们正常存入ES,
PUT my_index/_doc/1{
"group" : "fans",
"user" : [
{
"first" : "John",
"last" : "Smith"
},
{
"first" : "Alice",
"last" : "White"
}
]}
因为ES会对数据进行扁平化处理成:
{
"group" : "fans",
"user.first" : [ "alice", "john" ],
"user.last" : [ "smith", "white" ]}
2.3、然后我们搜索:first:Alice,last:Smith的用户
GET my_index/_search{
"query": {
"bool": {
"must": [
{ "match": { "user.first": "Alice" }},
{ "match": { "user.last": "Smith" }}
]
}
}}
其实应该是查找不到 Alice Smith这个user的,但是实际结果却是可以查到:

这样的结果显然是由问题的!,该怎么办呢?这时候就需要用到nested嵌入式类型:
3、创建映射(数据类型)为nested
PUT my_index{
"mappings": {
"properties": {
"user": {
"type": "nested"
}
}
}}
此时再存入数据:
PUT my_index/_doc/1{
"group" : "fans",
"user" : [
{
"first" : "John",
"last" : "Smith"
},
{
"first" : "Alice",
"last" : "White"
}
]}
这时候再去查,就查不到数据了:

三、编写spu商品上架功能

spu商品上架:
找出spu下所有的sku,将其存到ES中;(其中要确认当前sku是否有库存,有哪些可以被检索的规格属性等)
1、构造基本数据结构:SkuEsModel.java:
@Data
public class SkuEsModel {
private Long skuId;
private Long spuId;
private String skuTitle;
private BigDecimal skuPrice;
private String skuImg;
private Long saleCount;
private Boolean hasStock;
private Long hotScore;
private Long brandId;
private Long catelogId;
private String brandName;
private String brandImg;
private String catelogName;
private List<Attrs> attrs;
@Data
public static class Attrs{
private Long attrId;
private String attrName;
private String attrValue;
}
}
后面我们想要往ElasticSearch中存放数据,只需要封装成此entity对象即可;
接口:POST /product/spuinfo/{spuId}/up
2、Service层核心代码 SpuInfoServiceImpl.java:
@Override
public void up(Long spuId) {
//组装需要存入ES中的数据模型
//1、查询出spuId对应的所有sku信息,品牌和名字
List<SkuInfoEntity> skus = skuInfoService.getSkusBySpuId(spuId);
//2、调用远程服务 库存系统,查询是否有库存——>远程服务,为了减少请求次数,我们批量执行——————对应4.1
Map<Long, Boolean> hasStockMap = null;
try{
List<Long> skuIds = skus.stream().map(SkuInfoEntity::getSkuId).collect(Collectors.toList());
TypeReference<List<SkuHasStockVo>> typeReference = new TypeReference<List<SkuHasStockVo>>(){};
List<SkuHasStockVo> data = wareFeignService.getSkuHasStock(skuIds).getData(typeReference);
hasStockMap = data.stream().collect(Collectors.toMap(SkuHasStockVo::getSkuId, SkuHasStockVo::getHasStock));
System.out.println("hasStockMap:"+hasStockMap);
}catch (Exception e){
log.error("库存服务调用失败,原因:{}", e);
}
//3、查询出当前spu的所有“可以被用来检索”的规格属性——>因为属于Spu,所以我们只需要查询一次——————对应4.4
List<ProductAttrValueEntity> baseAttrs = attrValueService.baseAttrListForSpu(spuId);
List<Long> attrIds = baseAttrs.stream().map(ProductAttrValueEntity::getAttrId).collect(Collectors.toList());
//筛选出所有可以被检索的attrIds
Set<Long> searchAttrIds = attrService.selectSearchAttrIds(attrIds);
//从baseAttrs中取出在searchAttrIds中的元素
List<SkuEsModel.Attrs> attrsList = baseAttrs.stream().filter(attr -> searchAttrIds.contains(attr.getAttrId())).map(item -> {
SkuEsModel.Attrs attrs = new SkuEsModel.Attrs();
BeanUtils.copyProperties(item, attrs);
return attrs;
}).collect(Collectors.toList());
//4、封装每个sku的详细信息
Map<Long, Boolean> finalHasStockMap = hasStockMap;
List<SkuEsModel> upProducts = skus.stream().map(sku -> {
SkuEsModel esModel = new SkuEsModel();
BeanUtils.copyProperties(sku, esModel);
//有几个无法拷过来的skuPrice/skuImg/hasStock/hotScore/brandName/brandImg/catalogName /attrs
esModel.setSkuPrice(sku.getPrice());
esModel.setSkuImg(sku.getSkuDefaultImg());
//4.1、调用远程服务 库存系统,查询是否有库存——>远程服务,为了减少请求次数,我们批量执行
if (finalHasStockMap != null) {
esModel.setHasStock(finalHasStockMap.get(sku.getSkuId()));
} else {
esModel.setHasStock(true);
}
//TODO 4.2、热度评分,暂定为0
esModel.setHotScore(0L);
//4.3、查询品牌和分类的名字
BrandEntity brand = brandService.getById(sku.getBrandId());
esModel.setBrandName(brand.getName());
esModel.setBrandImg(brand.getLogo());
CategoryEntity category = categoryService.getById(sku.getCatalogId());
esModel.setCatelogName(category.getName());
//4.4、查询出当前sku的所有“可以被用来检索”的规格属性——>因为属于Spu,所以我们只需要查询一次
esModel.setAttrs(attrsList);
return esModel;
}).collect(Collectors.toList());
//将收集到的需要上架的upProducts通过zidanmall-search服务上传到ES中
R r = searchFeignService.productStatusUp(upProducts);
if (r.getCode() == 0){
//远程调用成功, 修改当前商品的发布状态
SpuInfoEntity spuInfoEntity = new SpuInfoEntity();
spuInfoEntity.setId(spuId);
spuInfoEntity.setPublishStatus(1);
this.updateById(spuInfoEntity);
}else {
//远程调用失败
//TODO 重复调用问题,接口幂等性————>feign的重试机制
}
}
其中,除了需要调用本服务中的接口外,还需要使用Feign远程调用:
-
调用ware库存服务,查询每个sku的库存;
-
调用search检索服务(整合了ES),将数据存到ES中;
4、zidanmall-ware中的批量查询sku对应的是否有库存接口:
/**
* 批量查询对应的sku是否有库存
* @param skuIds
* @return
*/
@Override
public List<SkuHasStockVo> getSkuHasStock(List<Long> skuIds) {
List<SkuHasStockVo> collect = skuIds.stream().map(id -> {
SkuHasStockVo vo = new SkuHasStockVo();
//select SUM(stock-stock_locked) from `wma_ware_sku` where sku_id = 1
Long count = baseMapper.getSkuStock(id);
vo.setSkuId(id);
vo.setHasStock(count == null ? false : (count > 0));
return vo;
}).collect(Collectors.toList());
return collect;
}
5、zidanmall-search中批量上架SkuEsModel到ES存储中:
ElasticSaveController.java:
/**
* 商品上架功能
* @param skuEsModels
* @return
*/
@PostMapping("/product")
public R productStatusUp(@RequestBody List<SkuEsModel> skuEsModels){
Boolean b = false;
try {
b = productSaveService.productStatusUp(skuEsModels);
} catch (IOException e) {
log.error("商品上架异常:{}", e);
return R.error(BizCodeEnum.PRODUCT_UO_EXCEPTION.getCode(), BizCodeEnum.PRODUCT_UO_EXCEPTION.getMsg());
}
if (b){
return R.ok();
}else {
return R.error(BizCodeEnum.PRODUCT_UO_EXCEPTION.getCode(), BizCodeEnum.PRODUCT_UO_EXCEPTION.getMsg());
}
}
ProductSaveServiceImpl.java:
@Service
@Slf4j
public class ProductSaveServiceImpl implements ProductSaveService {
@Autowired
RestHighLevelClient restHighLevelClient;
/**
* 上架功能
* @param skuEsModels
*/
@Override
public Boolean productStatusUp(List<SkuEsModel> skuEsModels) throws IOException {
//将数据保存到ES中
//1、给ES中建立一个索引product,并建好mapping映射,我们已经在ES中手动创建了
//2、给索引中保存数据——批量操作
BulkRequest bulkRequest = new BulkRequest();
skuEsModels.forEach(model -> {
IndexRequest indexRequest = new IndexRequest(EsConstant.PRODUCT_INDEX);
indexRequest.id(model.getSkuId().toString());
String json = JSON.toJSONString(model);
indexRequest.source(json, XContentType.JSON);
bulkRequest.add(indexRequest);
});
BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, ZidanElasticSearchConfig.COMMON_OPTIONS);
//可以对结果进行处理
if (bulk.hasFailures()){
List<String> collect = Arrays.stream(bulk.getItems()).map(item -> item.getId()).collect(Collectors.toList());
log.error("下列商品上架不成功:{}", collect);
return false;
}else {
return true;
}
}
}
6、其中 RestHighLevelClient 的注入配置:
@Configuration
public class ZidanElasticSearchConfig {
public static final RequestOptions COMMON_OPTIONS;
static {
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
COMMON_OPTIONS = builder.build();
}
@Bean
public RestHighLevelClient esRestClient(){
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("192.168.174.141", 9200, "http")));
return client;
}
}
7、补充:为了方便 Feign 调用过程中,获取复杂数据结构,我重新修改了统一返回 R 的内容:

使用方式:
存入:
List<SkuHasStockVo> data = wareSkuService.getSkuHasStock(skuIds);
return R.ok().setData(data);
取出:
TypeReference<List<SkuHasStockVo>> typeReference = new TypeReference<List<SkuHasStockVo>>(){};
List<SkuHasStockVo> data = wareFeignService.getSkuHasStock(skuIds).getData(typeReference);
四、上架结果:
1、上架成功后:

2、到ES中查询是否已经上架成功——即ES中有没有这部分数据:

到这里,借助ElasticSearch实现商品Sku的上架功能就算是完成了;
当然还有一些瑕疵:比如Feign的重复调用等,后续解决;
3、这里,简单描述一下,Feign的调用过程和重试机制的实现:
//Feign 的调用流程:
/**
* SynchronousMethodHandler.java类中的invoke()方法:
* 1、构造请求数据,将对象转为json;
* RequestTemplate template = this.buildTemplateFromArgs.create(argv);
* 2、发送请求,进行执行(执行成功会解码响应数据);
* this.executeAndDecode(template);
* 3、执行请求失败会有重试机制(retryer重试器)
* while(true){
* try{
* this.executeAndDecode(template);
* }catch(Exception ex){
* try{retryer.continueOrPropagate(e)}catch(Exception ex){throw ex}
* continue;
* }
* }
*/


