本文将详细讲解EF Core与MySQL的事务和并发处理,分为三个部分:使用事务、处理并发冲突(乐观并发)以及悲观并发(MySQL中使用锁)。
-
使用事务
在EF Core中,可以使用事务来确保一系列操作要么全部成功,要么全部失败。EF Core支持多种事务管理方式,包括自动事务(SaveChanges自动包装事务)和显式事务。 -
处理并发冲突(乐观并发)
乐观并发假设多个事务很少冲突,因此不会立即锁定资源。而是在更新时检查数据是否被其他事务修改过。EF Core通过配置并发令牌(Concurrency Token)来实现乐观并发。 -
悲观并发(MySQL中使用锁)
悲观并发假设冲突经常发生,因此在读取数据时就会锁定资源,防止其他事务修改。在MySQL中,可以通过使用事务和SELECT … FOR UPDATE语句来实现悲观并发。
1. 使用事务
基本事务用法
// 使用DbContext的Database.BeginTransaction() using (var transaction = context.Database.BeginTransaction()) { try { // 执行多个数据库操作 var product = new Product { Name = "New Product", Price = 99.99m, Stock = 10 }; context.Products.Add(product); var order = new Order { CustomerId = 1, OrderDate = DateTime.Now }; context.Orders.Add(order); // 保存更改 context.SaveChanges(); // 提交事务 transaction.Commit(); } catch (Exception) { // 回滚事务 transaction.Rollback(); throw; } }
使用TransactionScope(分布式事务)
// 需要安装System.Transactions包 using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled)) { try { // 多个数据库操作 using (var context1 = new ApplicationDbContext()) using (var context2 = new ApplicationDbContext()) { context1.Products.Add(new Product { Name = "Product 1", Price = 10.99m }); context2.Orders.Add(new Order { CustomerId = 1, OrderDate = DateTime.Now }); await context1.SaveChangesAsync(); await context2.SaveChangesAsync(); } // 完成事务 scope.Complete(); } catch (Exception) { // 事务会自动回滚 throw; } }
设置事务隔离级别
using (var transaction = context.Database.BeginTransaction(IsolationLevel.ReadCommitted)) { try { // 执行操作 context.Products.Add(new Product { Name = "New Product", Price = 99.99m }); context.SaveChanges(); transaction.Commit(); } catch (Exception) { transaction.Rollback(); throw; } }
异步事务处理
public async Task<bool> ProcessOrderAsync(int productId, int quantity) { using (var transaction = await context.Database.BeginTransactionAsync()) { try { // 检查库存 var product = await context.Products .FirstOrDefaultAsync(p => p.Id == productId); if (product == null || product.Stock < quantity) return false; // 减少库存 product.Stock -= quantity; context.Products.Update(product); // 创建订单 var order = new Order { ProductId = productId, Quantity = quantity, OrderDate = DateTime.Now }; context.Orders.Add(order); await context.SaveChangesAsync(); await transaction.CommitAsync(); return true; } catch (Exception) { await transaction.RollbackAsync(); throw; } } }
2. 处理并发冲突(乐观并发)
配置并发令牌
// 在实体中定义并发令牌属性 public class Product { public int Id { get; set; } public string Name { get; set; } public decimal Price { get; set; } public int Stock { get; set; } // 使用时间戳作为并发令牌 [Timestamp] public byte[] Version { get; set; } // 或者使用自定义的并发令牌 // public uint RowVersion { get; set; } } // 在DbContext中配置并发令牌 protected override void OnModelCreating(ModelBuilder modelBuilder) { modelBuilder.Entity<Product>() .Property(p => p.Version) .IsRowVersion() .IsConcurrencyToken(); // 或者使用非时间戳字段作为并发令牌 modelBuilder.Entity<Product>() .Property(p => p.Name) .IsConcurrencyToken(); }
处理并发异常
public async Task<bool> UpdateProductAsync(Product updatedProduct) { try { context.Products.Update(updatedProduct); await context.SaveChangesAsync(); return true; } catch (DbUpdateConcurrencyException ex) { // 处理并发冲突 foreach (var entry in ex.Entries) { if (entry.Entity is Product) { // 获取数据库中的当前值 var databaseValues = await entry.GetDatabaseValuesAsync(); if (databaseValues == null) { // 实体已被删除 return false; } // 转换为实体 var databaseProduct = (Product)databaseValues.ToObject(); // 决定如何解决冲突 // 选项1: 使用客户端值 // entry.OriginalValues.SetValues(databaseValues); // await context.SaveChangesAsync(); // 选项2: 使用数据库值 // entry.CurrentValues.SetValues(databaseValues); // 选项3: 合并值 var currentValues = entry.CurrentValues; var resolvedValues = currentValues.Clone(); // 合并逻辑示例 resolvedValues["Name"] = currentValues["Name"]; // 保留新名称 resolvedValues["Price"] = databaseValues["Price"]; // 使用数据库价格 resolvedValues["Stock"] = Math.Max( (int)currentValues["Stock"], (int)databaseValues["Stock"]); // 使用较大库存 // 设置解析后的值 entry.OriginalValues.SetValues(databaseValues); entry.CurrentValues.SetValues(resolvedValues); // 重试保存 await context.SaveChangesAsync(); return true; } } return false; } }
自定义并发冲突解决策略
public class ConcurrencyHandler { public static async Task<bool> ResolveConcurrencyAsync( DbUpdateConcurrencyException ex, ApplicationDbContext context) { var saved = false; foreach (var entry in ex.Entries) { // 获取数据库当前值 var databaseValues = await entry.GetDatabaseValuesAsync(); if (databaseValues == null) { // 实体已被删除 entry.State = EntityState.Detached; continue; } // 获取原始值和当前值 var originalValues = entry.OriginalValues; var currentValues = entry.CurrentValues; // 根据实体类型应用不同的解决策略 if (entry.Entity is Product product) { await ResolveProductConcurrency(entry, databaseValues); } else if (entry.Entity is Order order) { await ResolveOrderConcurrency(entry, databaseValues); } } // 重试保存 try { await context.SaveChangesAsync(); saved = true; } catch (DbUpdateConcurrencyException) { // 如果再次失败,可能需要递归处理或放弃 } return saved; } private static async Task ResolveProductConcurrency( EntityEntry entry, PropertyValues databaseValues) { var databaseProduct = (Product)databaseValues.ToObject(); var currentProduct = (Product)entry.Entity; // 解决策略:保留较高的库存值,其他字段使用最新值 var resolvedStock = Math.Max(currentProduct.Stock, databaseProduct.Stock); // 更新实体 entry.OriginalValues.SetValues(databaseValues); currentProduct.Stock = resolvedStock; } }
3. 悲观并发(MySQL中使用锁)
使用SELECT … FOR UPDATE
// 使用原始SQL进行行级锁定 public async Task<Product> GetProductWithLockAsync(int productId) { // 开始事务 using (var transaction = await context.Database.BeginTransactionAsync(IsolationLevel.Serializable)) { try { // 使用SELECT ... FOR UPDATE锁定行 var product = await context.Products .FromSqlRaw("SELECT * FROM Products WHERE Id = {0} FOR UPDATE", productId) .AsNoTracking() .FirstOrDefaultAsync(); // 执行需要原子性的操作 // ... await transaction.CommitAsync(); return product; } catch (Exception) { await transaction.RollbackAsync(); throw; } } }
使用EF Core的锁机制
// 在EF Core中模拟行级锁 public async Task<bool> ReserveProductAsync(int productId, int quantity) { using (var transaction = await context.Database.BeginTransactionAsync(IsolationLevel.Serializable)) { try { // 获取产品并锁定行 var product = await context.Products .FirstOrDefaultAsync(p => p.Id == productId); if (product == null || product.Stock < quantity) { await transaction.RollbackAsync(); return false; } // 更新库存 product.Stock -= quantity; await context.SaveChangesAsync(); await transaction.CommitAsync(); return true; } catch (Exception) { await transaction.RollbackAsync(); throw; } } }
处理锁超时
// 在MySQL连接字符串中设置锁等待超时 var connectionString = "server=localhost;database=efcoredb;user=root;password=yourpassword;" + "Connection Timeout=30;Default Command Timeout=30;"; // 或者在DbContext中配置 protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) { optionsBuilder.UseMySql(connectionString, ServerVersion.AutoDetect(connectionString), options => options.EnableRetryOnFailure().CommandTimeout(30)); } // 处理锁超时异常 public async Task<bool> TryUpdateWithLockAsync(int productId, Action<Product> updateAction) { const int maxRetries = 3; for (int attempt = 0; attempt < maxRetries; attempt++) { using (var transaction = await context.Database.BeginTransactionAsync(IsolationLevel.Serializable)) { try { // 获取并锁定产品 var product = await context.Products .FirstOrDefaultAsync(p => p.Id == productId); if (product == null) return false; // 执行更新操作 updateAction(product); await context.SaveChangesAsync(); await transaction.CommitAsync(); return true; } catch (MySqlException ex) when (ex.Number == 1205) // Lock wait timeout { // 锁等待超时,重试 await transaction.RollbackAsync(); if (attempt == maxRetries - 1) throw; // 指数退避策略 await Task.Delay(TimeSpan.FromMilliseconds(100 * Math.Pow(2, attempt))); } catch (Exception) { await transaction.RollbackAsync(); throw; } } } return false; }
表级锁定
// 使用表级锁定(谨慎使用,影响性能) public async Task<bool> PerformMaintenanceAsync() { using (var transaction = await context.Database.BeginTransactionAsync(IsolationLevel.Serializable)) { try { // 锁定整个表 await context.Database.ExecuteSqlRawAsync("LOCK TABLES Products WRITE"); // 执行维护操作 await context.Database.ExecuteSqlRawAsync("UPDATE Products SET Price = Price * 1.1"); // 解锁表 await context.Database.ExecuteSqlRawAsync("UNLOCK TABLES"); await transaction.CommitAsync(); return true; } catch (Exception) { await context.Database.ExecuteSqlRawAsync("UNLOCK TABLES"); await transaction.RollbackAsync(); throw; } } }
4. 高级并发控制模式
使用版本号实现乐观并发
public class VersionedEntity { public int Id { get; set; } public uint Version { get; set; } // 使用无符号整数作为版本号 } // 配置版本号并发令牌 protected override void OnModelCreating(ModelBuilder modelBuilder) { modelBuilder.Entity<Product>() .Property(p => p.Version) .IsConcurrencyToken() .ValueGeneratedOnAddOrUpdate(); } // 在更新时自动增加版本号 public override int SaveChanges() { var entries = ChangeTracker.Entries() .Where(e => e.State == EntityState.Modified || e.State == EntityState.Added); foreach (var entry in entries) { if (entry.Entity is VersionedEntity entity) { entity.Version++; } } return base.SaveChanges(); }
使用自定义锁机制
// 实现简单的应用级锁机制 public class ApplicationLockService { private static readonly ConcurrentDictionary<string, SemaphoreSlim> Locks = new ConcurrentDictionary<string, SemaphoreSlim>(); public async Task<T> ExecuteWithLockAsync<T>(string lockKey, Func<Task<T>> operation) { var lockObject = Locks.GetOrAdd(lockKey, key => new SemaphoreSlim(1, 1)); await lockObject.WaitAsync(); try { return await operation(); } finally { lockObject.Release(); } } } // 使用应用级锁 public async Task<bool> UpdateProductSafelyAsync(int productId, Action<Product> updateAction) { var lockService = new ApplicationLockService(); var lockKey = $"product_{productId}"; return await lockService.ExecuteWithLockAsync(lockKey, async () => { using (var context = new ApplicationDbContext()) { var product = await context.Products.FindAsync(productId); if (product == null) return false; updateAction(product); await context.SaveChangesAsync(); return true; } }); }
5. 监控和诊断并发问题
记录并发冲突
public class ConcurrencyLogger { private readonly ILogger<ConcurrencyLogger> _logger; public ConcurrencyLogger(ILogger<ConcurrencyLogger> logger) { _logger = logger; } public void LogConcurrencyConflict(DbUpdateConcurrencyException ex, string operation) { _logger.LogWarning("并发冲突发生在操作: {Operation}", operation); foreach (var entry in ex.Entries) { var databaseValues = entry.GetDatabaseValues(); var currentValues = entry.CurrentValues; var originalValues = entry.OriginalValues; _logger.LogWarning("实体: {EntityType}", entry.Metadata.Name); _logger.LogWarning("数据库值: {DatabaseValues}", databaseValues?.Properties); _logger.LogWarning("当前值: {CurrentValues}", currentValues?.Properties); _logger.LogWarning("原始值: {OriginalValues}", originalValues?.Properties); } } }
性能计数器监控
// 监控并发冲突率 public class ConcurrencyMonitor { private long _totalOperations; private long _concurrencyExceptions; public void RecordOperation(bool hadConcurrencyException = false) { Interlocked.Increment(ref _totalOperations); if (hadConcurrencyException) { Interlocked.Increment(ref _concurrencyExceptions); } } public double GetConcurrencyExceptionRate() { var total = Interlocked.Read(ref _totalOperations); var exceptions = Interlocked.Read(ref _concurrencyExceptions); return total > 0 ? (double)exceptions / total : 0; } }
总结
本文详细介绍了EF Core与MySQL中的事务和并发处理,包括:
- 事务管理:
-
-
基本事务用法和TransactionScope
-
设置事务隔离级别
-
异步事务处理
-
-
乐观并发控制:
-
配置并发令牌(时间戳或自定义字段)
-
处理DbUpdateConcurrencyException
-
实现自定义冲突解决策略
-
-
悲观并发控制:
-
使用SELECT … FOR UPDATE进行行级锁定
-
处理锁超时和重试机制
-
表级锁定(谨慎使用)
-
-
高级并发模式:
-
版本号实现乐观并发
-
应用级锁机制
-
监控和诊断并发问题
-
在实际应用中,应根据具体场景选择合适的并发控制策略:
-
对于读多写少的场景,乐观并发通常更高效
-
对于写密集或需要强一致性的场景,悲观并发可能更合适
-
总是要考虑超时和重试机制,以提高系统的健壮性
-
监控并发冲突率,及时发现和解决性能瓶颈
正确实施事务和并发控制是构建高并发、高可用应用程序的关键。建议在生产环境中进行充分的压力测试,确保并发控制策略能够满足实际需求。