关键词搜索

源码搜索 ×
×

Java使用Map模拟事务控制-跨系统数据一致性处理

发布2016-08-15浏览1888次

详情内容

首先,在复杂的业务系统中,尤其是跨系统交互如何保证两边数据一致性是极其重要的。下面就来模拟一下使用Map与Mybatis实现的事务控制的实例:

1.业务场景描述

WMS-OMS库存同步,实现两边库存一致性。

2.WMS-OMS接口交互缺陷

业务处理逻辑代码:

  1. try{
  2. // ====需要事务的业务操作====
  3. //WMS操作1
  4. //WMS操作2
  5. //WMS/操作3
  6. //OMS操作1----异步线程处理
  7. //WMS操作4
  8. //OMS操作2----异步线程处理
  9.   //WMS操作5
  10. }catch(Exception e){
  11. // 事务回滚
  12. TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
  13. }finally{
  14. // 其他处理
  15. }

这里处理过程中,从WMS操作4开始就已经出现弊端了,万一后面操作异常WMS这边的数据都会回滚,而线程已经执行不能够回滚处理就存在两边数据不一致的情况。

3.使用Map管理两边库存差异数据

以新增库存记录到OMS为例:

  1. public Integer insert(String key,WmStock model) {
  2. String memberCode = EhcacheUtil.get("JZTD_OMS_MEMBERCODE").toString();
  3. // 九州通达OMS同步处理
  4. if(EhcacheUtil.get("OMS_API_ISENABLE").toString().equals("true")&&memberCode.equals(model.getWmstCustomerCode()+"")){
  5. int flag=wmStockMapper.insert(model);
  6. if(flag>0){
  7. Log.getLogger(getClass()).info("<<<<<<<<<<<<<<更新数据到九州通达OMS>>>>>>>>>>>>EVENT:insert(WmStock model):"+(flag>0?true:false));
  8. // 线程更新库存(increment)
  9. //this.updateStockToOMSBySkuIndentifyInfo(true, model);
  10. // ===放入待处理缓存对象===
  11. List<JZTDOmsProduct> stocks=StockMapHandler.get(key);
  12. if(stocks==null||stocks.size()==0){
  13. List<JZTDOmsProduct> stocks2=new ArrayList<JZTDOmsProduct>();
  14. JZTDOmsProduct product=getProductDifferenceNumberFromStock(true, model);
  15. stocks2.add(product);
  16. StockMapHandler.put(key, stocks2);
  17. }else{
  18. JZTDOmsProduct product=getProductDifferenceNumberFromStock(true, model);
  19. stocks.add(product);
  20. StockMapHandler.put(key, stocks);
  21. }
  22. }
  23. return flag;
  24. }
  25. // 一般处理
  26. return wmStockMapper.insert(model);
  27. }
注:新增库存两边直接同步不需要计算差异。
交互数据结构:
  1. /**
  2. * 封装差异库存数据
  3. *
  4. * @MethodName: getProductDifferenceNumberFromStock
  5. * @Description:
  6. * @param key
  7. * @param isIncrement
  8. * @param wmStock
  9. * @return
  10. * @throws
  11. */
  12. private JZTDOmsProduct getProductDifferenceNumberFromStock(boolean isIncrement,WmStock wmStock){
  13. int memberCode = Integer.valueOf(EhcacheUtil.get("JZTD_OMS_MEMBERCODE").toString());
  14. CdWhItme item=CdWhItmeService.selectByPrimaryKeyOfTabel(memberCode, wmStock.getWmstSkuId());
  15. String productCode=item==null?"":item.getCdskItemCode();
  16. String productDate=wmStock.getWmstProductDate()==null?null:DateUtil.date2String(wmStock.getWmstProductDate(), DateUtil.PATTERN_STANDARD);
  17. CdCustomer customer=cdWhCustomerService.selectByCustomerName(wmStock.getWmstCustomer(), memberCode+"");
  18. String customerCode=customer==null?"":customer.getCdstCustomerCode();
  19. JZTDOmsProduct product=null;
  20. if(isIncrement){
  21. product=new JZTDOmsProduct(customerCode,productCode, wmStock.getWmstNowNumber(), wmStock.getWmstEnabledNumber(), wmStock.getWmstSkuUnit(), wmStock.getWmstSkuBatch(),productDate );
  22. }else{
  23. product=new JZTDOmsProduct(customerCode,productCode, wmStock.getWmstNowNumber()==0?0:-wmStock.getWmstNowNumber(), wmStock.getWmstEnabledNumber()==0?0:-wmStock.getWmstEnabledNumber(), wmStock.getWmstSkuUnit(), wmStock.getWmstSkuBatch(),productDate );
  24. }
  25. return product;
  26. }
  27. @Override
  28. public void executeStockToOMSBySkuIndentifyInfo(String key,String token) {
  29. List<JZTDOmsProduct> produts=StockMapHandler.get(key);
  30. if(produts==null||produts.size()==0){
  31. Log.getLogger(getClass()).info(">>>>>>>>开启线程处理库存同步到九州通达OMS:.................验证缓存库存同步数据失败:key="+key);
  32. return ;
  33. }
  34. Map<String,Object> map=new HashMap<String, Object>();
  35. map.put("Products", produts);
  36. String requestBody= Tools.toJson(map);
  37. // 线程处理库存同步
  38. Log.getLogger(getClass()).info(">>>>>>>>开启线程处理库存同步到九州通达OMS:.................线程处理中---DATA:"+requestBody);
  39. new UpdateProductStockThread(jztdapiService, ebInterfaceLogService,requestBody, token, StaticProperty.OMS_WMS_INTERFACE_UPDATEPRODUCTSTOCK,true).start();
  40. }

创建Handler:

  1. package com.wlyd.fmcgwms.util;
  2. import java.util.List;
  3. import java.util.Map;
  4. import java.util.concurrent.ConcurrentHashMap;
  5. import com.wlyd.fmcgwms.persistence.beans.api.JZTDOmsProduct;
  6. import com.wlyd.fmcgwms.service.basic.WmStockService;
  7. /**
  8. * 库存缓存对象处理类
  9. *
  10. * @packge com.wlyd.fmcgwms.util.StockMapHandler
  11. * @date 2016年8月15日 上午10:30:10
  12. * @author pengjunlin
  13. * @comment
  14. * @update
  15. */
  16. public class StockMapHandler{
  17. /**
  18. * WmStock库存差异对象存储
  19. */
  20. public static Map<String, List<JZTDOmsProduct>> map = new ConcurrentHashMap<String, List<JZTDOmsProduct>>(
  21. 5000);
  22. /**
  23. * 获取List对象
  24. *
  25. * @MethodName: get
  26. * @Description:
  27. * @param key
  28. * @return
  29. * @throws
  30. */
  31. public static List<JZTDOmsProduct> get(String key) {
  32. if (map.containsKey(key)) {
  33. return map.get(key);
  34. }
  35. return null;
  36. }
  37. /**
  38. * 存储List对象
  39. *
  40. * @MethodName: put
  41. * @Description:
  42. * @param key
  43. * @param produts
  44. * @throws
  45. */
  46. public static void put(String key, List<JZTDOmsProduct> produts) {
  47. map.put(key, produts);
  48. }
  49. /**
  50. * 移除List对象
  51. *
  52. * @MethodName: put
  53. * @Description:
  54. * @param key
  55. * @throws
  56. */
  57. public static void remove(String key) {
  58. if (map.containsKey(key)) {
  59. map.remove(key);
  60. }
  61. }
  62. /**
  63. * 清除Map中存储的所有对象
  64. *
  65. * @MethodName: clear
  66. * @Description:
  67. * @throws
  68. */
  69. public static void clear(){
  70. map.clear();
  71. }
  72. public class Handler implements Runnable{
  73. private WmStockService wmStockService;
  74. private String key;
  75. public Handler( WmStockService wmStockService, String key){
  76. this.wmStockService=wmStockService;
  77. this.key=key;
  78. }
  79. @Override
  80. public void run() {
  81. wmStockService.executeStockToOMSBySkuIndentifyInfo(key, SAASTokenManager.getToken());
  82. }
  83. }
  84. }

测试类方法:
  1. package fmcgwms;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.test.context.ContextConfiguration;
  6. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  7. import com.wlyd.fmcgwms.service.basic.WmStockService;
  8. import com.wlyd.fmcgwms.util.StockMapHandler;
  9. @RunWith(SpringJUnit4ClassRunner.class)
  10. @ContextConfiguration(locations = "classpath:applicationContext.xml")
  11. public class StockMapHandlerTest {
  12. @Autowired
  13. WmStockService wmStockService;
  14. @Test
  15. public void testHandler(){
  16. String key="10000"+System.currentTimeMillis();
  17.         
  18.         StockMapHandler stockMapHandler=new StockMapHandler();
  19.         
  20.         new Thread(stockMapHandler.new Handler(jztdapiService,ebInterfaceLogService, key,false)).start();
  21.         
  22.         try {
  23.             Thread.sleep(8000);// 线程等待执行,避免未执行完现场spring容器已销毁
  24.         } catch (InterruptedException e) {
  25.             e.printStackTrace();
  26.         }
  27. }
  28. }

4.Map控制的缓存数据调用

  1. //对应的多个业务事务方法处理
  2. public void doBusiness(String key){
  3. //key="uniqueString";
  4. try{
  5. // ====需要事务的业务操作====
  6. //WMS操作1
  7. //WMS操作2
  8. //WMS/操作3
  9. //OMS操作1----异步线程处理(不执行线程)
  10. //WMS操作4(For instance:insertStock----map set different stock)
  11. //OMS操作2----异步线程处理(不执行线程)
  12. //WMS操作5(For instance:updateStock----map set different stock)
  13. }catch(Exception e){
  14. // 事务回滚
  15. TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
  16. }finally{
  17. // 统一执行异步线程同步库存
  18. //handler 处理
  19. StockMapHandler stockMapHandler=new StockMapHandler();
  20. stockMapHandler.new Handler(wmStockService, key);
  21. // 移除该业务操作的缓存对象    
  22. StockMapHandler.remove(key);
  23. }
  24. }

5.关于key是否会重复的问题

在StockMapHandler.java 中新增一个生成:企业编码+UUID的唯一标识

  1. /**
  2. * 生成企业的UUID编码
  3. *
  4. * @MethodName: generateUuidKey
  5. * @Description:
  6. * @param esCorCode
  7. * @return
  8. * @throws
  9. */
  10. public static String generateUuidKey(String esCorCode){
  11. String uuid=UUID.randomUUID().toString();
  12. uuid=uuid.replaceAll("-", "");
  13. uuid=esCorCode+uuid;
  14. return uuid;
  15. }

另外需要注意,以下操作应该放在线程内执行,避免出错:

  1. // 移除该业务操作的缓存对象    
  2. StockMapHandler.remove(key);

6.测试UUID的唯一性

原理:入库key不重复,那么map输出的长度就是循环的长度。

  1. package ebwms;
  2. import java.util.Map;
  3. import java.util.UUID;
  4. import java.util.concurrent.ConcurrentHashMap;
  5. public class Test {
  6. public static Map<String, Object> map = new ConcurrentHashMap<String,Object>(
  7. 5000);
  8. public String generateUuidKey(String esCorCode){
  9. String uuid=UUID.randomUUID().toString();
  10. uuid=uuid.replaceAll("-", "");
  11. uuid=esCorCode+uuid;
  12. return uuid;
  13. }
  14. public static void main(String[] args) {
  15. Test t=new Test();
  16. // 快速验证1000000个UUID
  17. for (int i = 1; i <= 1000000; i++) {
  18. String key=t.generateUuidKey("P0000007");
  19. map.put(key, key);
  20. }
  21. System.out.println(map.size());
  22. try {
  23. Thread.sleep(20000);
  24. } catch (InterruptedException e) {
  25. // TODO Auto-generated catch block
  26. e.printStackTrace();
  27. }
  28. }
  29. }
运行测试输出:1000000.

 至此,我们的事务控制配合就完美解决了线程引起的数据不一致问题。executeStockToOMSBySkuIndentifyInfo实际上就是Handler线程需要处理的方法,可以将UpdateProductStockThread线程内部的处理放到Handler稍加修改,避免线程中又开线程。



相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载