image-20241008222015353

背景

电商模式是指电子商务平台和线上商家之间进行业务合作的方式,主要包括以下几种:

B2C

B2C是Business-to-Customer的缩写,而其中文简称为“商对客”。“商对客”是电子商务的一种模式,也就是通常说的直接面向消费者销售产品和服务商业零售模式。这种形式的电子商务一般以网络零售业为主,主要借助于互联网开展在线销售活动。B2C即企业通过互联网为消费者提供一个新型的购物环境——网上商店,消费者通过网络在网上购物、网上支付等消费行为。

案例:唯品会、亚马逊

image-20241005120430236

B2B

B2B ( Business to Business)是指进行电子商务交易的供需双方都是商家(或企业、公司),她(他)们使用了互联网的技术或各种商务网络平台,完成商务交易的过程。电子商务是现代 B2B marketing的一种具体主要的表现形式。

案例:阿里巴巴1688

image-20241005120456425

B2B2C

B2B2C是一种电子商务类型的网络购物商业模式,B是BUSINESS的简称,C是CUSTOMER的简称,第一个B指的是商品或服务的供应商,第二个B指的是从事电子商务的企业,C则是表示消费者。

案例:京东

image-20241005120522854

C2C

C2C即 Customer(Consumer) to Customer(Consumer),意思就是消费者个人间的电子商务行为。比如一个消费者有一台电脑,通过网络进行交易,把它出售给另外一个消费者,此种交易类型就称为C2C电子商务。

案例:闲鱼

image-20241005120542944

O2O

O2O即Online To Offline(在线离线/线上到线下),是指将线下的商务机会与互联网结合,让互联网成为线下交易的平台,这个概念最早来源于美国。O2O的概念非常广泛,既可涉及到线上,又可涉及到线下,可以通称为O2O。O2O这种在线支付购买线下的商品和服务,再到线下享受服务的模式也被证实可以很快被消费者接受。

案例:美团优选

=.=我们的快乐学习就是B2B2C这样模式

优化需求

  • 根据课程id查询课程信息
  • 根据文件id查询视频信息
名称 类型 长度 小数点 不是 null 虚拟 注释
id bigint 🔑1 主键
company_id bigint 机构ID
company_name varchar 255 机构名称
name varchar 100 课程名称
users varchar 500 适用人群
tags varchar 50 课程标签
mt varchar 20 大分类
st varchar 20 小分类
grade varchar 50 课程等级
teachmode varchar 32 教育模式(common普通, record录播, live直播等)
description text 课程介绍
pic varchar 500 课程图片
create_date datetime 创建时间
change_date datetime 修改时间
create_people varchar 50 创建人
change_people varchar 50 更新人
audit_status varchar 32 审核状态
status varchar 10 课程发布状态(未发布、已发布、下线)

为啥要着重优化?

原因:

这些接口在用户未认证状态下也可以访问,如果接口的性能不高,当高并发到来很可能耗尽整个系统的资源,将整个系统压垮,所以特别需要对这些暴露在外边的接口进行优化。

这里接口文档我们是采用Apifox进行管理的。

Apifox 是一个集成了 API 文档管理、API 调试、API 自动化测试Mock 服务 的全流程 API 工具。它的目标是帮助开发者和测试人员简化 API 开发、测试的复杂性,并提高协作效率。

image-20241006144949434

可以很好的对每个模块的接口进行管理。

压力测试

使用Jmeter进行接口压力测试

image-20240930230426768

样本数:200个线程,每个线程请求100次,共20000次

压力机:通常压力机是单独的客户端。

测试gateway+content

吞吐量180左右

image-20240930225333727

测试content

吞吐量300左右

image-20240930225347190

优化日志

内容管理日志级别改为info级别.

image-20240930225443617

单独请求内容管理测试,吞吐量达到1500左右

image-20240930225506973

缓存优化

使用redis缓存

测试用例是根据id查询课程信息,这里不存在复杂的SQL,也不存在数据库连接不释放的问题,暂时不考虑数据库方面的优化。

课程发布信息的特点的是查询较多,修改很少,这里考虑将课程发布信息进行缓存。

课程信息缓存的流程如下:

image-20240930220116737

在nacos配置redis-dev.yaml(group=xuecheng-plus-common)

1
2
3
4
5
6
7
8
9
10
11
12
spring: 
redis:
host: 192.168.101.65
port: 6379
password: redis
database: 0
lettuce:
pool:
max-active: 20
max-idle: 10
min-idle: 0
timeout: 10000

在content-api微服务加载redis-dev.yaml

1
2
3
4
shared-configs:
- data-id: redis-${spring.profiles.active}.yaml
group: xuecheng-plus-common
refresh: true

在content-service微服务中添加依赖

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.2</version>
</dependency>

定义查询缓存接口:

1
2
3
4
5
6
7
8
/**
* @description 查询缓存中的课程信息
* @param courseId
* @return com.xuecheng.content.model.po.CoursePublish
* @author Mr.M
* @date 2022/10/22 16:15
*/
public CoursePublish getCoursePublishCache(Long courseId);

接口实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public CoursePublish getCoursePublishCache(Long courseId){
//查询缓存
Object jsonObj = redisTemplate.opsForValue().get("course:" + courseId);
if(jsonObj!=null){
String jsonString = jsonObj.toString();
System.out.println("=================从缓存查=================");
CoursePublish coursePublish = JSON.parseObject(jsonString, CoursePublish.class);
return coursePublish;
} else {
System.out.println("从数据库查询...");
//从数据库查询
CoursePublish coursePublish = getCoursePublish(courseId);
if(coursePublish!=null){
redisTemplate.opsForValue().set("course:" + courseId, JSON.toJSONString(coursePublish));
}
return coursePublish;
}
}

修改controller接口调用代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@ApiOperation("获取课程发布信息")
@ResponseBody
@GetMapping("/course/whole/{courseId}")
public CoursePreviewDto getCoursePublish(@PathVariable("courseId") Long courseId) {
//查询课程发布信息
CoursePublish coursePublish = coursePublishService.getCoursePublishCache(courseId);
// CoursePublish coursePublish = coursePublishService.getCoursePublish(courseId);
if(coursePublish==null){
return new CoursePreviewDto();
}

//课程基本信息
CourseBaseInfoDto courseBase = new CourseBaseInfoDto();
BeanUtils.copyProperties(coursePublish, courseBase);
//课程计划
List<TeachplanDto> teachplans = JSON.parseArray(coursePublish.getTeachplan(), TeachplanDto.class);
CoursePreviewDto coursePreviewInfo = new CoursePreviewDto();
coursePreviewInfo.setCourseBase(courseBase);
coursePreviewInfo.setTeachplans(teachplans);
return coursePreviewInfo;
}

缓存穿透

使用缓存后代码的性能有了很大的提高,虽然性能有很大的提升但是控制台打出了很多"从数据库查询"的日志,明明判断了如果缓存存在课程信息则从缓存查询,为什么要有这么多从数据库查询的请求的?

这是因为并发数高,很多线程会同时到达查询数据库代码处去执行。

image-20240930220538182

如果存在恶意攻击的可能,如果有大量并发去查询一个不存在的课程信息会出现什么问题呢?

比如去请求/content/course/whole/181,查询181号课程,该课程并不在课程发布表中。

进行压力测试发现会去请求数据库。

大量并发去访问一个数据库不存在的数据,由于缓存中没有该数据导致大量并发查询数据库,这个现象要缓存穿透。

解决方法

1.设置空值【注意设置超时时间】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public CoursePublish getCoursePublishCache(Long courseId) {

// 查询缓存
Object jsonObj = redisTemplate.opsForValue().get("course:" + courseId);

if (jsonObj != null) {
// 如果缓存中存在数据
String jsonString = jsonObj.toString();

if (jsonString.equals("null")) {
// 如果缓存中的值为 "null",返回 null
return null;
}

// 将 JSON 字符串解析为 CoursePublish 对象
CoursePublish coursePublish = JSON.parseObject(jsonString, CoursePublish.class);
return coursePublish;
} else {
// 如果缓存中不存在数据,从数据库查询
System.out.println("从数据库查询数据...");
CoursePublish coursePublish = getCoursePublish(courseId);

// 将查询结果存入缓存,并设置过期时间为 300 秒
redisTemplate.opsForValue().set("course:" + courseId, JSON.toJSONString(coursePublish), 300, TimeUnit.SECONDS);

return coursePublish;
}
}

2.布隆过滤器

Google工具包Guava实现

假设我们预计会有 1000000 个 courseId,并允许 1% 的误判率(即布隆过滤器可能会误判某些不存在的 courseId 存在,但不会误判已存在的 courseId 不存在)。

可以在服务启动时或某个初始化阶段,加载所有有效的 courseId 到布隆过滤器中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;

@Service
public class CoursePublishService {

@Autowired
private StringRedisTemplate redisTemplate;

private BloomFilter<Long> bloomFilter;

public CoursePublishService() {
// 初始化布隆过滤器,假设预计存储100万条数据,误判率为1%
bloomFilter = BloomFilter.create(Funnels.longFunnel(), 1000000, 0.01);
// 在服务启动时或某个初始化阶段,加载所有有效的 courseId 到布隆过滤器
initBloomFilter();
}

// 初始化布隆过滤器,将所有存在的 courseId 加载进去
private void initBloomFilter() {
// 从数据库或者其他来源加载所有有效的 courseId
// Example: List<Long> courseIds = courseRepository.findAllCourseIds();
// for (Long courseId : courseIds) {
// bloomFilter.put(courseId);
// }

// 假设数据源中有一些 courseId
bloomFilter.put(1L);
bloomFilter.put(2L);
bloomFilter.put(3L);
// 继续加载其他 courseId...
}

public CoursePublish getCoursePublishCache(Long courseId) {
// 先检查布隆过滤器,判断 courseId 是否可能存在
if (!bloomFilter.mightContain(courseId)) {
// 如果布隆过滤器判断 courseId 不存在,直接返回 null
return null;
}

// 查询 Redis 缓存
Object jsonObj = redisTemplate.opsForValue().get("course:" + courseId);

if (jsonObj != null) {
// 如果缓存中存在数据
String jsonString = jsonObj.toString();

if (jsonString.equals("null")) {
// 如果缓存中的值为 "null",返回 null
return null;
}

// 将 JSON 字符串解析为 CoursePublish 对象
CoursePublish coursePublish = JSON.parseObject(jsonString, CoursePublish.class);
return coursePublish;
} else {
// 如果缓存中不存在数据,从数据库查询
System.out.println("从数据库查询数据...");
CoursePublish coursePublish = getCoursePublish(courseId);

// 如果查询结果为空,防止缓存穿透,向缓存中写入 "null",并设置较短的过期时间
if (coursePublish == null) {
redisTemplate.opsForValue().set("course:" + courseId, "null", 300, TimeUnit.SECONDS);
} else {
// 将查询结果存入缓存,并设置过期时间为 300 秒
redisTemplate.opsForValue().set("course:" + courseId, JSON.toJSONString(coursePublish), 300, TimeUnit.SECONDS);
}

return coursePublish;
}
}

// 模拟从数据库查询课程发布数据
private CoursePublish getCoursePublish(Long courseId) {
// 查询数据库的逻辑...
// 如果数据库中没有这个 courseId 的数据,返回 null
return null;
}
}

通过使用 Guava 布隆过滤器,我们可以在缓存查询之前先判断 courseId 是否可能存在,避免由于大量查询不存在的数据而导致的缓存穿透问题。这种优化方法非常适合高并发下的大规模缓存场景,并且通过合理设置布隆过滤器的参数(如误判率和预计数据量),可以在 性能准确性 之间找到平衡。

误判率 p = 0.01(1%)

  • 是大多数场景下的推荐值,适合一般业务场景,如防止缓存穿透、日志过滤、请求去重等。

Redis 插件RedisBloom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public CoursePublish getCoursePublishCache(Long courseId) {
// 1. 使用布隆过滤器判断该 courseId 是否可能存在
boolean existsInBloomFilter = bloomFilter.mightContain(courseId);

if (!existsInBloomFilter) {
// 如果布隆过滤器认为该 courseId 不存在,直接返回 null,避免查询缓存和数据库
return null;
}

// 2. 查询缓存
Object jsonObj = redisTemplate.opsForValue().get("course:" + courseId);

if (jsonObj != null) {
// 如果缓存中存在数据
String jsonString = jsonObj.toString();

if (jsonString.equals("null")) {
// 如果缓存中的值为 "null",返回 null
return null;
}

// 将 JSON 字符串解析为 CoursePublish 对象
CoursePublish coursePublish = JSON.parseObject(jsonString, CoursePublish.class);
return coursePublish;
} else {
// 如果缓存中不存在数据,从数据库查询
System.out.println("从数据库查询数据...");
CoursePublish coursePublish = getCoursePublish(courseId);

if (coursePublish == null) {
// 如果数据库中也没有找到该数据,将一个空值写入缓存以防止缓存穿透,并设置过期时间
redisTemplate.opsForValue().set("course:" + courseId, "null", 300, TimeUnit.SECONDS);
return null;
}

// 将查询结果存入缓存,并设置过期时间为 300 秒
redisTemplate.opsForValue().set("course:" + courseId, JSON.toJSONString(coursePublish), 300, TimeUnit.SECONDS);

return coursePublish;
}
}

缓存雪崩

缓存雪崩是缓存中大量key失效后当高并发到来时导致大量请求到数据库,瞬间耗尽数据库资源,导致数据库无法使用。

造成缓存雪崩问题的原因是是大量key拥有了相同的过期时间,比如对课程信息设置缓存过期时间为10分钟,在大量请求同时查询大量的课程信息时,此时就会有大量的课程存在相同的过期时间,一旦失效将同时失效,造成雪崩问题。

对同一类型信息的key设置不同的过期时间

1
2
//设置过期时间300秒
redisTemplate.opsForValue().set("course:" + courseId, JSON.toJSONString(coursePublish),300+new Random().nextInt(100), TimeUnit.SECONDS);

缓存击穿

缓存击穿是指大量并发访问同一个热点数据,当热点数据失效后同时去请求数据库,瞬间耗尽数据库资源,导致数据库无法使用。

比如某手机新品发布,当缓存失效时有大量并发到来导致同时去访问数据库。

image-20240930222111812

解决方法

  • 互斥锁
    • 当缓存中没有找到数据时,先获取一个分布式锁(如Redis的setnx命令)。
    • 只有获取到锁的请求才能去查询数据库并更新缓存,其他请求等待。
    • 查询完数据库后,释放锁,其他等待的请求再从缓存中读取数据。
  • 逻辑过期
    • 缓存中存储的数据不仅包括实际值,还包含一个逻辑过期时间。
    • 当请求到达时,如果数据未过期直接返回;如果已过期,异步更新缓存,同时返回过期前的数据。
    • 异步更新可以通过后台任务或定时任务来实现。【异步更新】

先说说分布式锁

在说分布式锁之前先说本地锁的问题

但如果将同步锁的程序,分布式部署在多个jvm上,则无法保证同一个key只会查询一次数据库

image-20240930222549903

  • 一个同步锁程序只能保证同一个jvm在多个线程只有一个线程去访问数据库
    • 但如果高并发通过网关负载均衡转发给各个虚拟机,此时就会存在多个线程去查询数据库
    • 因为jvm中的锁只能保证该虚拟机自己的线程去同步执行,无法跨jvm保证同步执行

分布式锁

  • 本地锁只能控制所在JVM中的线程同步执行,现在要实现分布式环境下所有虚拟机中的线程去同步执行,就需要让多个JVM使用同一把锁,JVM可以分布式部署,锁也可以分布式部署

image-20240930222758862

  • JVM去抢同一把锁,锁是一个单独的程序,提供加锁、解锁服务,谁抢到锁谁就去查询数据库
  • 该锁不属于某个虚拟机,而是分布式部署,由多个虚拟机共享,这种锁叫分布式锁

分布式锁的方案

分布式锁的方案很多,常用的如下

  1. 基于数据库实现分布式锁
    • 利用数据库主键唯一的特性,或利用数据库唯一索引的特点,多个县城同时去插入相同的记录,谁插入成功谁就抢到锁
  2. 基于Redis实现分布式锁
    • Redis提供了分布式锁的实现方案,例如:SETNX、Redisson等
    • 拿SETNX举例,SETNX是set一个不存在的key(set if not exists),多个线程去设置同一个key,只会有一个线程设置成功,设置成功的线程拿到锁
  3. 使用zookeeper实现
    • zookeeper是一个分布式协调事务,主要解决分布式程序之间的同步问题
    • zookeeper的结构类似文件目录,多线程向zookeeper创建一个子目录(节点)只会有一个创建成功,可以利用此特点实现分布式锁,谁创建该节点成功,谁就获得锁

Redisson

image-20240930223933277

加锁机制

线程去获取锁,获取成功::执行lua脚本,保存数据到redis数据库。

线程去获取锁,获取失败:一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis。

WatchDog自动延期看门狗机制

【主要解决分布式锁自动续期】

第一种情况:在一个分布式环境下,假如一个线程获得锁后,突然服务器宕机了,那么这个时候在一定时间后这个锁会自动释放,你也可以设置锁的有效时间(当不设置默认30秒时),这样的目的主要是防止死锁的发生。

第二种情况:线程A业务还没有执行完,时间就过了,线程A还想持有锁的话,就会启动一个watchdog后台线程,不断的延长锁key的生存时间。

QA:

Q:看门狗续期是否合理?

A:合理,因为任务未完成。加个超时时间就ok

Q:一旦客户端挂了但是锁还没释放怎么办?

A:

当使用Redis分布式锁时,如果某台应用服务器在执行任务的过程中挂掉了,而锁还没有及时释放,可能会导致锁无法被其他进程获取,进而影响系统的正常运行。

但是,Redisson的分布式锁有一个自动过期机制。即使机挂掉,Redisson的后台任务无法继续运行,锁也不会永久存在。到了锁的过期时间后,Redis会自动将锁释放,避免因为某个实例挂掉导致锁一直无法被释放的问题。

这样,即便客户端挂掉,锁也会在设定的过期时间后自动解除,确保其他客户端可以继续获取锁,保证系统的高可用性和稳定性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Autowired
RedissonClient redissonClient;

@Override
public CoursePublish getCoursePublishCache(Long courseId) {
// 1. 先从缓存中查询
String courseCacheJson = redisTemplate.opsForValue().get("course:" + courseId);

// 2. 如果缓存里有,直接返回
if (StringUtils.isNotEmpty(courseCacheJson)) {
log.debug("从缓存中查询");
if ("null".equals(courseCacheJson)) {
return null;
}
CoursePublish coursePublish = JSON.parseObject(courseCacheJson, CoursePublish.class);
return coursePublish;
} else {
// 使用 Redisson 分布式锁,防止缓存击穿
RLock lock = redissonClient.getLock("courseQueryLock" + courseId);
lock.lock();
try {
// 再次从缓存中查询,防止在获取锁的过程中其他线程已经更新了缓存
courseCacheJson = redisTemplate.opsForValue().get("course:" + courseId);

// 如果缓存里有,直接返回
if (StringUtils.isNotEmpty(courseCacheJson)) {
log.debug("从缓存中查询");
if ("null".equals(courseCacheJson)) {
return null;
}
CoursePublish coursePublish = JSON.parseObject(courseCacheJson, CoursePublish.class);
return coursePublish;
}

log.debug("缓存中没有,查询数据库");
System.out.println("缓存中没有,查询数据库");

// 3. 如果缓存里没有,查询数据库
CoursePublish coursePublish = coursePublishMapper.selectById(courseId);

if (coursePublish == null) {
// 如果数据库中也没有,缓存一个空值,防止缓存穿透
redisTemplate.opsForValue().set("course:" + courseId, JSON.toJSONString(null), 30 + new Random().nextInt(100), TimeUnit.SECONDS);
return null;
}

String jsonString = JSON.toJSONString(coursePublish);

// 3.1 将查询结果缓存
redisTemplate.opsForValue().set("course:" + courseId, jsonString, 300 + new Random().nextInt(100), TimeUnit.SECONDS);

// 3.1 返回查询结果
return coursePublish;
} finally {
// 释放锁
lock.unlock();
}
}
}
  • 此方法为加锁,但是锁的有效期采用默认30秒

  • 如果主线程未释放,且当前锁未调用unlock方法,则进入到watchDog机制

  • 如果主线程未释放,且当前锁调用unlock方法,则直接释放锁

其他的一些方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public interface RRLock {

//----------------------Lock接口方法-----------------------
/**
* 加锁 锁的有效期默认30秒
*/
void lock();

/**
* 加锁 可以手动设置锁的有效时间
*
* @param leaseTime 锁有效时间
* @param unit 时间单位 小时、分、秒、毫秒等
*/
void lock(long leaseTime, TimeUnit unit);

/**
* tryLock()方法是有返回值的,用来尝试获取锁,
* 如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false .
*/
boolean tryLock();

/**
* tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,
* 只不过区别在于这个方法在拿不到锁时会等待一定的时间,
* 在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。
*
* @param time 等待时间
* @param unit 时间单位 小时、分、秒、毫秒等
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

/**
* 比上面多一个参数,多添加一个锁的有效时间
*
* @param waitTime 等待时间
* @param leaseTime 锁有效时间
* @param unit 时间单位 小时、分、秒、毫秒等
* waitTime 大于 leaseTime
*/
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

/**
* 解锁
*/
void unlock();
}

image-20240930225611575

多级缓存

当然,我们也可以通过本地缓存和分布式缓存进一步优化。

问题1:本地缓存 VS 分布式缓存

本地缓存在集群环境中,会存在不—致的问题。

问题2:如何保证本地缓存的一致性?

Cache Aside(旁路缓存):最常见的缓存策略,通常被称为“读写分离”模式。

  • :从缓存中读取数据,缓存中没有则从数据库读取,并将结果写入缓存。
  • :当数据库更新时,先更新数据库,再删除缓存中的旧数据(而不是更新缓存)。