基于消息队列的异步秒杀

This commit is contained in:
fang 2025-12-24 20:05:03 +08:00
parent 1559befe29
commit d3a3654bec
4 changed files with 260 additions and 38 deletions

View File

@ -12,7 +12,8 @@ public class RedissonConfig {
public RedissonClient redissonClient() {
//1.创建配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.56.10:6379").setPassword("123456");
// 这里的地址估计不太对
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("ningzaichun");
//2.根据 Config 创建出 RedissonClient 实例
return Redisson.create(config);
}

View File

@ -17,4 +17,6 @@ public interface IVoucherOrderService extends IService<VoucherOrder> {
Result seckillVoucher(Long voucherId);
Result creatVoucheOrder(Long voucherId);
void creatVoucheOrderByvoucherOrder(VoucherOrder voucherOrder);
}

View File

@ -1,5 +1,6 @@
package com.hmdp.service.impl;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.db.handler.RsHandler;
import com.hmdp.config.RedissonConfig;
import com.hmdp.dto.Result;
@ -14,17 +15,28 @@ import com.hmdp.service.IVoucherService;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.SimpleRedisLock;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.aop.framework.AopProxy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
/**
* <p>
@ -34,6 +46,7 @@ import java.time.LocalDateTime;
* @author 虎哥
* @since 2021-12-22
*/
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
@ -44,48 +57,214 @@ public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, Vou
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId) {
private IVoucherOrderService proxy;
// 根据vorcherId查询优惠券
SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);
// 判断秒杀功能是否已经开始或结束
if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀功能还未开始");
} else if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀功能已经结束");
}
// 判断库存是否充足
if(seckillVoucher.getStock()<1){
return Result.fail("库存不足");
}
Long userID = UserHolder.getUser().getId();
// // 基于悲观锁但是这个在集群环境下还是会导致线程不安全
// synchronized(userID.toString().intern()) {
// // 获取代理对象才能让事务生效
// IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
// return proxy.creatVoucheOrder(voucherId);
// 初始的秒杀功能速度慢一个进程串行执行
// public Result seckillVoucher(Long voucherId) {
//
// // 根据vorcherId查询优惠券
// SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);
// // 判断秒杀功能是否已经开始或结束
// if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
// return Result.fail("秒杀功能还未开始");
// } else if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
// return Result.fail("秒杀功能已经结束");
// }
// 不再用自己实现的分布式锁了因为它存在不可重入不可重试超时释放主从一致的问题
// SimpleRedisLock simpleRedisLock = new SimpleRedisLock("oreder:" + userID, stringRedisTemplate);
// boolean isLock = simpleRedisLock.tryLock(1200L);
// 利用redission 的分布式锁可以解决上述问题
RLock lock = redissonClient.getLock("lock:oreder:" + userID);
boolean isLock = lock.tryLock();
if(!isLock){
// // 判断库存是否充足
// if(seckillVoucher.getStock()<1){
// return Result.fail("库存不足");
// }
// Long userID = UserHolder.getUser().getId();
//
//// // 基于悲观锁但是这个在集群环境下还是会导致线程不安全
//// synchronized(userID.toString().intern()) {
//// // 获取代理对象才能让事务生效
//// IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
//// return proxy.creatVoucheOrder(voucherId);
//// }
// // 不再用自己实现的分布式锁了因为它存在不可重入不可重试超时释放主从一致的问题
//// SimpleRedisLock simpleRedisLock = new SimpleRedisLock("oreder:" + userID, stringRedisTemplate);
//// boolean isLock = simpleRedisLock.tryLock(1200L);
// // 利用redission 的分布式锁可以解决上述问题
// RLock lock = redissonClient.getLock("lock:oreder:" + userID);
// boolean isLock = lock.tryLock();
// if(!isLock){
// return Result.fail("请勿重复下单");
// }
// // 获取代理对象才能让事务生效
// try {
// IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
// return proxy.creatVoucheOrder(voucherId);
// } finally {
// // 释放锁
// lock.unlock();
// }
//
// }
private static final DefaultRedisScript<Long> SECKIL_SCRIPT;
// 通过静态代码块在类初始化的时候就创建了lua脚本
static {
SECKIL_SCRIPT = new DefaultRedisScript<>();
SECKIL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKIL_SCRIPT.setResultType(Long.class);
}
// 线程池处理器
private static final ExecutorService SECKIL_USER_ORDER = Executors.newSingleThreadExecutor();
@PostConstruct // 这个注解会在类初始化完成就执行
public void init() {
SECKIL_USER_ORDER.submit(new vorcherOrderHandler());
}
//消息队列
public class vorcherOrderHandler implements Runnable{
@Override
public void run() {
while (true){
try {
// 1.获取消息队列seckill:orders里的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS seckill:orders >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("seckill:orders", ReadOffset.lastConsumed())
);
// 2. 获取失败继续监听获取
if(list==null || list.isEmpty()){
continue;
}
MapRecord<String, Object, Object> entries = list.get(0);
Map<Object, Object> values = entries.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// 3. 获取成功往数据库添加
proxy.creatVoucheOrderByvoucherOrder(voucherOrder);
// 4. XACK来确认消息已被处理
stringRedisTemplate.opsForStream().acknowledge("seckill:orders", "g1", entries.getId());
} catch (Exception e) {
// throw new RuntimeException(e);
log.error("异步处理秒杀订单异常",e);
handelePendingList();
}
}
}
}
private void handelePendingList() {
while (true){
try {
// 1.获取消息队列pending-list里的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS seckill:orders 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create("seckill:orders", ReadOffset.from("0"))
);
// 2. 获取pending-list失败结束循环
if(list==null || list.isEmpty()){
break;
}
MapRecord<String, Object, Object> entries = list.get(0);
Map<Object, Object> values = entries.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// 3. 获取成功往数据库添加
proxy.creatVoucheOrderByvoucherOrder(voucherOrder);
// 4. XACK来确认消息已被处理
stringRedisTemplate.opsForStream().acknowledge("seckill:orders", "g1", entries.getId());
} catch (Exception e) {
// throw new RuntimeException(e);
log.error("异步处理秒杀订单异常",e);
}
}
}
// 阻塞队列
// private BlockingQueue<VoucherOrder> ORDER_TASK = new ArrayBlockingQueue<>(1024*1024);
// public class vorcherOrderHandler implements Runnable{
//
// @Override
// public void run() {
// while (true){
// try {
// // 获取阻塞队列里的订单信息
// VoucherOrder voucherOrder = ORDER_TASK.take();
// // 往数据集添加实际上不需要像下面一样加锁但是为了给redis兜底还是加了锁
// Long userID = voucherOrder.getUserId();
// RLock lock = redissonClient.getLock("lock:oreder:" + userID);
// boolean isLock = lock.tryLock();
// if(!isLock){
// log.error("异步下单:请勿重复下单");
// return ;
// }
// // 获取代理对象才能让事务生效
// try {
//
//// return proxy.creatVoucheOrder(voucherId);
// proxy.creatVoucheOrderByvoucherOrder(voucherOrder);
// } finally {
// // 释放锁
// lock.unlock();
// }
// } catch (Exception e) {
//// throw new RuntimeException(e);
// log.error("异步处理秒杀订单异常",e);
// }
// }
// }
// }
@Override
// 用lua脚本判断是否有下单资格有资格将订单id用户id优惠券id加入消息队列seckill:orders
public Result seckillVoucher(Long voucherId) {
// 获取用户id
Long userID = UserHolder.getUser().getId();
Long orderId = redisIdWorker.nextId("order");
// 调用lua脚本
Long result =stringRedisTemplate.execute(
SECKIL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),userID.toString(),orderId.toString()
);
int r = result.intValue();
if(r==1){
return Result.fail("库存不足");
} else if (r == 2) {
return Result.fail("请勿重复下单");
}
// 获取代理对象才能让事务生效
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.creatVoucheOrder(voucherId);
} finally {
// 释放锁
lock.unlock();
}
// 因为其他方法子线程没法获取到这个代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
System.out.println("下单成功的id");
System.out.println(orderId);
return Result.ok(orderId);
}
// @Override
// // 用lua脚本判断是否有下单资格然后存入阻塞队列其他进程执行异步下单
// public Result seckillVoucher(Long voucherId) {
// // 获取用户id
// Long userID = UserHolder.getUser().getId();
// // 调用lua脚本
// Long result =stringRedisTemplate.execute(
// SECKIL_SCRIPT,
// Collections.emptyList(),
// voucherId.toString(),userID.toString()
// );
// int r = result.intValue();
// if(r==1){
// return Result.fail("库存不足");
// } else if (r == 2) {
// return Result.fail("请勿重复下单");
// }
// // 成功的话需要加入到阻塞队列中等待其他进程处理数据库操作
// Long orderId = redisIdWorker.nextId("order");
// VoucherOrder voucherOrder = new VoucherOrder();
// voucherOrder.setId(orderId);
// voucherOrder.setUserId(userID);
// voucherOrder.setVoucherId(voucherId);
// ORDER_TASK.add(voucherOrder);
// // 因为其他方法子线程没法获取到这个代理对象
// proxy = (IVoucherOrderService) AopContext.currentProxy();
// System.out.println("下单成功的id");
// System.out.println(orderId);
// return Result.ok(orderId);
// }
@Transactional
public Result creatVoucheOrder(Long voucherId) {
Long userID = UserHolder.getUser().getId();
@ -117,4 +296,30 @@ public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, Vou
return Result.ok(orderId);
}
@Transactional
@Override
public void creatVoucheOrderByvoucherOrder(VoucherOrder voucherOrder) {
Long userID = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
// 判断当前用户是否已经抢过订单了
Integer count = query().eq("user_id", userID).eq("voucher_id", voucherId).count();
if(count>0){
log.error("您已经抢过该优惠券了");
return;
}
// 扣减库存,乐观锁CAS法
// boolean succed = iSeckillVoucherService.update().setSql("stock = stock-1").
// eq("voucher_id",voucherId).eq("stock",seckillVoucher.getStock()).update(); 这种情况会导致即使有很多余额也会说库存不足
boolean succed = iSeckillVoucherService.update().setSql("stock = stock-1").
eq("voucher_id",voucherId).gt("stock",0).update();
if(!succed){
System.out.println("失败");
return;
}
save(voucherOrder);
}
}

View File

@ -11,6 +11,20 @@ local userID = ARGV[2]
local orderID = ARGV[3]
local stockKey = "seckill:stock:" .. voucherID
--- 解决一人一单问题保存当前优惠券有哪些用户购买key是优惠券idvalue是用户id一个set集合
local orderKey = "seckill:order:" .. voucherID
local stock = redis.call("get", stockKey);
--- 判断库存是否充足
if tonumber(redis.call("get", stockKey)) <= 0 then
return 1
end
--- 判断用户是否下单
if redis.call("sismember",orderKey, userID) ==1 then
return 2
end
--- 扣库存加用户id
redis.call("incrby",stockKey,-1)
redis.call("sadd",orderKey, userID)
--- 前面已经判断好了用户是否有下单资格这里可以直接往消息队列seckill:orders添加消息
redis.call("xadd","seckill:orders", "*", "userId", userID, "voucherId", voucherID, "id", orderID)
return 0