Flutter数据操作原子性
水花DX 人气:0前言
Flutter 是单线程架构,按道理理说,Flutter 不会出现 Java 的多线程相关的问题。
但在我使用 Flutter 过程中,却发现 Flutter 依然会存在数据操作原子性的问题。
其实 Flutter 中存在多线程的(Isolate 隔离池),只是 Flutter 中的多线程更像 Java 中的多进程,因为 Flutter 中线程不能像 Java 一样,可以两个线程去操作同一个对象。
我们一般将计算任务放在 Flutter 单独的线程中,例如一大段 Json 数据的解析,可以将解析计算放在单独的线程中,然后将解析完后的 Map<String, dynamic> 返回到主线程来用。
Flutter单例模式
在 Java 中,我们一般喜欢用单例模式来理解 Java 多线程问题。这里我们也以单例来举例,我们先来一个正常的:
class FlutterSingleton { static FlutterSingleton? _instance; /// 将构造方法声明成私有的 FlutterSingleton._(); static FlutterSingleton getInstance() { if (_instance == null) { _instance = FlutterSingleton._(); } return _instance!; } }
由于 Flutter 是单线程架构的, 所以上述代码是没有问题的。
问题示例
但是, 和 Java 不同的是, Flutter 中存在异步方法。
做 App 开发肯定会涉及到数据持久化,Android 开发应该都熟悉 SharedPreferences,Flutter 中也存在 SharedPreferences 库,我们就以此来举例。同样实现单例模式,只是这次无可避免的需要使用 Flutter 中的异步:
class SPSingleton { static SPSingleton? _instance; String? data; /// 将构造方法声明成私有的 SPSingleton._fromMap(Map<String, dynamic> map) : data = map['data']; static Future<SPSingleton> _fromSharedPreferences() async { // 模拟从 SharedPreferences 中读取数据, 并以此来初始化当前对象 Map<String, String> map = {'data': 'mockData'}; await Future.delayed(Duration(milliseconds: 10)); return SPSingleton._fromMap(map); } static Future<SPSingleton> getInstance() async { if (_instance == null) { _instance = await SPSingleton._fromSharedPreferences(); } return _instance!; } } void main() async { SPSingleton.getInstance().then((value) { print('instance1.hashcode = ${value.hashCode}'); }); SPSingleton.getInstance().then((value) { print('instance2.hashcode = ${value.hashCode}'); }); }
运行上面的代码,打印日志如下:
instance1.hashcode = 428834223
instance2.hashcode = 324692380
可以发现,我们两次调用 SPSingleton.getInstance() 方法,分别创建了两个对象,说明上面的单例模式实现有问题。
我们来分析一下 getInstance() 方法:
static Future<SPSingleton> getInstance() async { if (_instance == null) { // 1 _instance = await SPSingleton._fromSharedPreferences(); //2 } return _instance!; }
当第一次调用 getInstance() 方法时,代码在运行到 1 处时,发现 _instance 为 null, 就会进入 if 语句里面执行 2 处, 并因为 await 关键字挂起, 并交出代码的执行权, 直到被 await 的 Future 执行完毕,最后将创建的 SPSingleton 对象赋值给 _instance 并返回。
当第二次调用 getInstance() 方法时,代码在运行到 1 处时,可能会发现 _instance 还是为 null (因为 await SPSingleton._fromSharedPreferences() 需要 10ms 才能返回结果), 然后和第一次调用 getInstance() 方法类似, 创建新的 SPSingleton 对象赋值给 _instance。
最后导致两次调用 getInstance() 方法, 分别创建了两个对象。
解决办法
问题原因知道了,那么该怎样解决这个问题呢?
究其本质,就是 getInstance() 方法的执行不具有原子性,即:在一次 getInstance() 方法执行结束前,不能执行下一次 getInstance() 方法。
幸运的是, 我们可以借助 Completer 来将异步操作原子化,下面是借助 Completer 改造后的代码:
import 'dart:async'; class SPSingleton { static SPSingleton? _instance; static Completer<bool>? _monitor; String? data; /// 将构造方法声明成私有的 SPSingleton._fromMap(Map<String, dynamic> map) : data = map['data']; static Future<SPSingleton> _fromSharedPreferences() async { // 模拟从 SharedPreferences 中读取数据, 并以此来初始化当前对象 Map<String, String> map = {'data': 'mockData'}; await Future.delayed(Duration(milliseconds: 10)); return SPSingleton._fromMap(map); } static Future<SPSingleton> getInstance() async { if (_instance == null) { if (_monitor == null) { _monitor = Completer<bool>(); _instance = await SPSingleton._fromSharedPreferences(); _monitor!.complete(true); } else { // Flutter 的 Future 支持被多次 await await _monitor!.future; _monitor = null; } } return _instance!; } } void main() async { SPSingleton.getInstance().then((value) { print('instance1.hashcode = ${value.hashCode}'); }); SPSingleton.getInstance().then((value) { print('instance2.hashcode = ${value.hashCode}'); }); }
我们再次分析一下 getInstance() 方法:
static Future<SPSingleton> getInstance() async { if (_instance == null) { // 1 if (_monitor == null) { // 2 _monitor = Completer<bool>(); // 3 _instance = await SPSingleton._fromSharedPreferences(); // 4 _monitor!.complete(true); // 5 } else { // Flutter 的 Future 支持被多次 await await _monitor!.future; //6 _monitor = null; } } return _instance!; // 7 }
当第一次调用 getInstance() 方法时, 1 处和 2 处都会判定为 true, 然后进入执行到 3 处创建一个的 Completer 对象, 然后在 4 的 await 处挂起, 并交出代码的执行权, 直到被 await 的 Future 执行完毕。
此时第二次调用的 getInstance() 方法开始执行,1 处同样会判定为 true, 但是到 2 处时会判定为 false, 从而进入到 else, 并因为 6 处的 await 挂起, 并交出代码的执行权;
此时, 第一次调用 getInstance() 时的 4 处执行完毕, 并执行到 5, 并通过 Completer 通知第二次调用的 getInstance() 方法可以等待获取代码执行权了。
最后,两次调用 getInstance() 方法都会返回同一个 SPSingleton 对象,以下是打印日志:
instance1.hashcode = 786567983
instance2.hashcode = 786567983
由于 Flutter 的 Future 是支持多次 await 的, 所以即便是连续 n 次调用 getInstance() 方法, 从第 2 到 n 次调用会 await 同一个 Completer.future, 最后也能返回同一个对象。
Flutter任务队列
虽然我们经常拿单例模式来解释说明 Java 多线程问题,可这并不代表着 Java 只有在单例模式时才有多线程问题。
同样的,也并不代表着 Flutter 只有在单例模式下才有原子操作问题。
问题示例
我们同样以数据持久化来举例,只是这次我们以数据库操作来举例。
我们在操作数据库时,经常会有这样的需求:如果数据库表中存在这条数据,就更新这条数据,否则就插入这条数据。
为了实现这样的需求,我们可能会先从数据库表中查询数据,查询到了就更新,没查询到就插入,代码如下:
class Item { int id; String data; Item({ required this.id, required this.data, }); } class DBTest { DBTest._(); static DBTest instance = DBTest._(); bool _existsData = false; Future<void> insert(String data) async { // 模拟数据库插入操作,10毫秒过后,数据库中才有数据 await Future.delayed(Duration(milliseconds: 10)); _existsData = true; print('执行了插入'); } Future<void> update(String data) async { // 模拟数据库更新操作 await Future.delayed(Duration(milliseconds: 10)); print('执行了更新'); } Future<Item?> selected(int id) async { // 模拟数据库查询操作 await Future.delayed(Duration(milliseconds: 10)); if (_existsData) { // 数据库中有数据才返回 return Item(id: 1, data: 'mockData'); } else { // 数据库没有数据时,返回null return null; } } /// 先从数据库表中查询数据,查询到了就更新,没查询到就插入 Future<void> insertOrUpdate(int id, String data) async { Item? item = await selected(id); if (item == null) { await insert(data); } else { await update(data); } } } void main() async { DBTest.instance.insertOrUpdate(1, 'data'); DBTest.instance.insertOrUpdate(1, 'data'); }
我们期望的输出日志为:
执行了插入
执行了更新
但不幸的是, 输出的日志为:
执行了插入
执行了插入
原因也是异步方法操作数据, 不是原子操作, 导致逻辑异常。
也许我们也可以效仿单例模式的实现,利用 Completer 将 insertOrUpdate() 方法原子化。
但对于数据库操作是不合适的,因为我们可能还有其它需求,比如说:调用插入数据的方法,然后立即从数据库中查询这条数据,发现找不到。
如果强行使用 Completer,那么到最后,可能这个类中会出现一大堆的 Completer ,代码难以维护。
解决办法
其实我们想要的效果是,当有异步方法在操作数据库时,别的操作数据的异步方法应该阻塞住,也就是同一时间只能有一个方法来操作数据库。我们其实可以使用任务队列来实现数据库操作的需求。
我这里利用 Completer 实现了一个任务队列:
import 'dart:async'; import 'dart:collection'; /// TaskQueue 不支持 submit await submit, 以下代码就存在问题 /// /// TaskQueue taskQueue = TaskQueue(); /// Future<void> task1(String arg)async{ /// await Future.delayed(Duration(milliseconds: 100)); /// } /// Future<void> task2(String arg)async{ /// 在这里submit时, 任务会被添加到队尾, 且当前方法任务不会结束 /// 添加到队尾的任务必须等到当前方法任务执行完毕后, 才能继续执行 /// 而队尾的任务必须等当前任务执行完毕后, 才能执行 /// 这就导致相互等待, 使任务无法进行下去 /// 解决办法是, 移除当前的 await, 让当前任务结束 /// await taskQueue.submit(task1, arg); /// } /// /// taskQueue.submit(task2, arg); /// /// 总结: /// 被 submit 的方法的内部如果调用 submit 方法, 此方法不能 await, 否则任务队列会被阻塞住 /// /// 如何避免此操作, 可以借鉴以下思想: /// 以数据库操作举例, 有个save方法的逻辑是插入或者更新(先查询数据库select,再进行下一步操作); /// sava方法内部submit,并且select也submit, 就容易出现submit await submit的情况 /// /// 我们可以这样操作,假设当前类为 DBHelper: /// 将数据库的增,删,查,改操作封装成私有的 async 方法, 且私有方法不能使用submit /// DBHelper的公有方法, 可以调用自己的私有 async 方法, 但不能调用自己的公有方法, 公有方法可以使用submit /// 这样就不会存在submit await submit的情况了 class TaskQueue { /// 提交任务 Future<O> submit<A, O>(Function fun, A? arg) async { if (!_isEnable) { throw Exception('current TaskQueue is recycled.'); } Completer<O> result = new Completer<O>(); if (!_isStartLoop) { _isStartLoop = true; _startLoop(); } _queue.addLast(_Runnable<A, O>( fun: fun, arg: arg, completer: result, )); if (!(_emptyMonitor?.isCompleted ?? true)) { _emptyMonitor?.complete(); } return result.future; } /// 回收 TaskQueue void recycle() { _isEnable = false; if (!(_emptyMonitor?.isCompleted ?? true)) { _emptyMonitor?.complete(); } _queue.clear(); } Queue<_Runnable> _queue = Queue<_Runnable>(); Completer? _emptyMonitor; bool _isStartLoop = false; bool _isEnable = true; Future<void> _startLoop() async { while (_isEnable) { if (_queue.isEmpty) { _emptyMonitor = new Completer(); await _emptyMonitor!.future; _emptyMonitor = null; } if (!_isEnable) { // 当前TaskQueue不可用时, 跳出循环 return; } _Runnable runnable = _queue.removeFirst(); try { dynamic result = await runnable.fun(runnable.arg); runnable.completer.complete(result); } catch (e) { runnable.completer.completeError(e); } } } } class _Runnable<A, O> { final Completer<O> completer; final Function fun; final A? arg; _Runnable({ required this.completer, required this.fun, this.arg, }); }
由于 Flutter 中的 future 不支持暂停操作, 一旦开始执行, 就只能等待执行完。
所以这里的任务队列实现是基于方法的延迟调用来实现的。
TaskQueue 的用法示例如下:
void main() async { Future<void> test1(String data) async { await Future.delayed(Duration(milliseconds: 20)); print('执行了test1'); } Future<String> test2(Map<String, dynamic> args) async { await Future.delayed(Duration(milliseconds: 10)); print('执行了test2'); return 'mockResult'; } TaskQueue taskQueue = TaskQueue(); taskQueue.submit(test1, '1'); taskQueue.submit(test2, { 'data1': 1, 'data2': '2', }).then((value) { print('test2返回结果:${value}'); }); await Future.delayed(Duration(milliseconds: 200)); taskQueue.recycle(); } /* 执行输出结果如下: 执行了test1 执行了test2 test2返回结果:mockResult */
值得注意的是: 这里的 TaskQueue 不支持 submit await submit, 原因及示例代码已在注释中说明,这里不再赘述。
为了避免出现 submit await submit 的情况,我代码注释中也做出了建议(假设当前类为 DBHelper):
将数据库的增、删、查、改操作封装成私有的异步方法, 且私有异步方法不能使用 submit;
DBHelper 的公有方法, 可以调用自己的私有异步方法, 但不能调用自己的公有异步方法, 公有异步方法可以使用 submit;
这样就不会出现 submit await submit 的情况了。
于是,上述的数据库操作示例代码就变成了以下的样子:
class Item { int id; String data; Item({ required this.id, required this.data, }); } class DBTest { DBTest._(); static DBTest instance = DBTest._(); TaskQueue _taskQueue = TaskQueue(); bool _existsData = false; Future<void> _insert(String data) async { // 模拟数据库插入操作,10毫秒过后,数据库才有数据 await Future.delayed(Duration(milliseconds: 10)); _existsData = true; print('执行了插入'); } Future<void> insert(String data) async { await _taskQueue.submit(_insert, data); } Future<void> _update(String data) async { // 模拟数据库更新操作 await Future.delayed(Duration(milliseconds: 10)); print('执行了更新'); } Future<void> update(String data) async { await _taskQueue.submit(_update, data); } Future<Item?> _selected(int id) async { // 模拟数据库查询操作 await Future.delayed(Duration(milliseconds: 10)); if (_existsData) { // 数据库中有数据才返回 return Item(id: 1, data: 'mockData'); } else { // 数据库没有数据时,返回null return null; } } Future<Item?> selected(int id) async { return await _taskQueue.submit(_selected, id); } /// 先从数据库表中查询数据,查询到了就更新,没查询到就插入 Future<void> _insertOrUpdate(Map<String, dynamic> args) async { int id = args['id']; String data = args['data']; Item? item = await _selected(id); if (item == null) { await _insert(data); } else { await _update(data); } } Future<Item?> insertOrUpdate(int id, String data) async { return await _taskQueue.submit(_insertOrUpdate, { 'id': id, 'data': data, }); } } void main() async { DBTest.instance.insertOrUpdate(1, 'data'); DBTest.instance.insertOrUpdate(1, 'data'); }
输出日志也变成了我们期望的样子:
执行了插入
执行了更新
总结
Flutter 异步方法修改数据时, 一定要注意数据操作的原子性, 不能因为 Flutter 是单线程架构,就忽略多个异步方法竞争导致数据异常的问题。
Flutter 保证数据操作的原子性,也有可行办法,当逻辑比较简单时,可直接使用 Completer,当逻辑比较复杂时,可以考虑使用任务队列。
另外,本文中的任务队列实现有很大的缺陷,不支持 submit await submit,否则整个任务队列会被阻塞住。
加载全部内容