亲宝软件园·资讯

展开

Flutter数据操作原子性

水花DX 人气:0

前言

Flutter 是单线程架构,按道理理说,Flutter 不会出现 Java 的多线程相关的问题。

但在我使用 Flutter 过程中,却发现 Flutter 依然会存在数据操作原子性的问题。

其实 Flutter 中存在多线程的(Isolate 隔离池),只是 Flutter 中的多线程更像 Java 中的多进程,因为 Flutter 中线程不能像 Java 一样,可以两个线程去操作同一个对象。

我们一般将计算任务放在 Flutter 单独的线程中,例如一大段 Json 数据的解析,可以将解析计算放在单独的线程中,然后将解析完后的 Map<String, dynamic> 返回到主线程来用。

Flutter单例模式

在 Java 中,我们一般喜欢用单例模式来理解 Java 多线程问题。这里我们也以单例来举例,我们先来一个正常的:

class FlutterSingleton {
  static FlutterSingleton? _instance;

  /// 将构造方法声明成私有的
  FlutterSingleton._();

  static FlutterSingleton getInstance() {
    if (_instance == null) {
      _instance = FlutterSingleton._();
    }
    return _instance!;
  }
}

由于 Flutter 是单线程架构的, 所以上述代码是没有问题的。

问题示例

但是, 和 Java 不同的是, Flutter 中存在异步方法。

做 App 开发肯定会涉及到数据持久化,Android 开发应该都熟悉 SharedPreferences,Flutter 中也存在 SharedPreferences 库,我们就以此来举例。同样实现单例模式,只是这次无可避免的需要使用 Flutter 中的异步:

class SPSingleton {
  static SPSingleton? _instance;

  String? data;

  /// 将构造方法声明成私有的
  SPSingleton._fromMap(Map<String, dynamic> map) : data = map['data'];

  static Future<SPSingleton> _fromSharedPreferences() async {
    // 模拟从 SharedPreferences 中读取数据, 并以此来初始化当前对象
    Map<String, String> map = {'data': 'mockData'};
    await Future.delayed(Duration(milliseconds: 10));
    return SPSingleton._fromMap(map);
  }

  static Future<SPSingleton> getInstance() async {
    if (_instance == null) {
      _instance = await SPSingleton._fromSharedPreferences();
    }
    return _instance!;
  }
}

void main() async {
  SPSingleton.getInstance().then((value) {
    print('instance1.hashcode = ${value.hashCode}');
  });
  SPSingleton.getInstance().then((value) {
    print('instance2.hashcode = ${value.hashCode}');
  });
}

运行上面的代码,打印日志如下:

instance1.hashcode = 428834223
instance2.hashcode = 324692380

可以发现,我们两次调用 SPSingleton.getInstance() 方法,分别创建了两个对象,说明上面的单例模式实现有问题。

我们来分析一下 getInstance() 方法:

static Future<SPSingleton> getInstance() async {
  if (_instance == null) { // 1
    _instance = await SPSingleton._fromSharedPreferences(); //2
  }
  return _instance!;
}

当第一次调用 getInstance() 方法时,代码在运行到 1 处时,发现 _instance 为 null, 就会进入 if 语句里面执行 2 处, 并因为 await 关键字挂起, 并交出代码的执行权, 直到被 await 的 Future 执行完毕,最后将创建的 SPSingleton 对象赋值给 _instance 并返回。

当第二次调用 getInstance() 方法时,代码在运行到 1 处时,可能会发现 _instance 还是为 null (因为 await SPSingleton._fromSharedPreferences() 需要 10ms 才能返回结果), 然后和第一次调用 getInstance() 方法类似, 创建新的 SPSingleton 对象赋值给 _instance。

最后导致两次调用 getInstance() 方法, 分别创建了两个对象。

解决办法

问题原因知道了,那么该怎样解决这个问题呢?

究其本质,就是 getInstance() 方法的执行不具有原子性,即:在一次 getInstance() 方法执行结束前,不能执行下一次 getInstance() 方法。

幸运的是, 我们可以借助 Completer 来将异步操作原子化,下面是借助 Completer 改造后的代码:

import 'dart:async';

class SPSingleton {
  static SPSingleton? _instance;
  static Completer<bool>? _monitor;

  String? data;

  /// 将构造方法声明成私有的
  SPSingleton._fromMap(Map<String, dynamic> map) : data = map['data'];

  static Future<SPSingleton> _fromSharedPreferences() async {
    // 模拟从 SharedPreferences 中读取数据, 并以此来初始化当前对象
    Map<String, String> map = {'data': 'mockData'};
    await Future.delayed(Duration(milliseconds: 10));
    return SPSingleton._fromMap(map);
  }

  static Future<SPSingleton> getInstance() async {
    if (_instance == null) {
      if (_monitor == null) {
        _monitor = Completer<bool>();
        _instance = await SPSingleton._fromSharedPreferences();
        _monitor!.complete(true);
      } else {
        // Flutter 的 Future 支持被多次 await
        await _monitor!.future;
        _monitor = null;
      }
    }
    return _instance!;
  }
}

void main() async {
  SPSingleton.getInstance().then((value) {
    print('instance1.hashcode = ${value.hashCode}');
  });
  SPSingleton.getInstance().then((value) {
    print('instance2.hashcode = ${value.hashCode}');
  });
}

我们再次分析一下 getInstance() 方法:

static Future<SPSingleton> getInstance() async {
  if (_instance == null) { // 1
    if (_monitor == null) { // 2
      _monitor = Completer<bool>(); // 3
      _instance = await SPSingleton._fromSharedPreferences(); // 4
      _monitor!.complete(true); // 5
    } else {
      // Flutter 的 Future 支持被多次 await
      await _monitor!.future; //6
      _monitor = null;
    }
  }
  return _instance!; // 7
}

当第一次调用 getInstance() 方法时, 1 处和 2 处都会判定为 true, 然后进入执行到 3 处创建一个的 Completer 对象, 然后在 4 的 await 处挂起, 并交出代码的执行权, 直到被 await 的 Future 执行完毕。

此时第二次调用的 getInstance() 方法开始执行,1 处同样会判定为 true, 但是到 2 处时会判定为 false, 从而进入到 else, 并因为 6 处的 await 挂起, 并交出代码的执行权;

此时, 第一次调用 getInstance() 时的 4 处执行完毕, 并执行到 5, 并通过 Completer 通知第二次调用的 getInstance() 方法可以等待获取代码执行权了。

最后,两次调用 getInstance() 方法都会返回同一个 SPSingleton 对象,以下是打印日志:

instance1.hashcode = 786567983
instance2.hashcode = 786567983

由于 Flutter 的 Future 是支持多次 await 的, 所以即便是连续 n 次调用 getInstance() 方法, 从第 2 到 n 次调用会 await 同一个 Completer.future, 最后也能返回同一个对象。

Flutter任务队列

虽然我们经常拿单例模式来解释说明 Java 多线程问题,可这并不代表着 Java 只有在单例模式时才有多线程问题。

同样的,也并不代表着 Flutter 只有在单例模式下才有原子操作问题。

问题示例

我们同样以数据持久化来举例,只是这次我们以数据库操作来举例。

我们在操作数据库时,经常会有这样的需求:如果数据库表中存在这条数据,就更新这条数据,否则就插入这条数据。

为了实现这样的需求,我们可能会先从数据库表中查询数据,查询到了就更新,没查询到就插入,代码如下:

class Item {
  int id;
  String data;
  Item({
    required this.id,
    required this.data,
  });
}

class DBTest {
  DBTest._();
  static DBTest instance = DBTest._();
  bool _existsData = false;
  Future<void> insert(String data) async {
    // 模拟数据库插入操作,10毫秒过后,数据库中才有数据
    await Future.delayed(Duration(milliseconds: 10));
    _existsData = true;
    print('执行了插入');
  }

  Future<void> update(String data) async {
    // 模拟数据库更新操作
    await Future.delayed(Duration(milliseconds: 10));
    print('执行了更新');
  }

  Future<Item?> selected(int id) async {
    // 模拟数据库查询操作
    await Future.delayed(Duration(milliseconds: 10));
    if (_existsData) {
      // 数据库中有数据才返回
      return Item(id: 1, data: 'mockData');
    } else {
      // 数据库没有数据时,返回null
      return null;
    }
  }

  /// 先从数据库表中查询数据,查询到了就更新,没查询到就插入
  Future<void> insertOrUpdate(int id, String data) async {
    Item? item = await selected(id);
    if (item == null) {
      await insert(data);
    } else {
      await update(data);
    }
  }
}

void main() async {
  DBTest.instance.insertOrUpdate(1, 'data');
  DBTest.instance.insertOrUpdate(1, 'data');
}

我们期望的输出日志为:

执行了插入
执行了更新

但不幸的是, 输出的日志为:

执行了插入
执行了插入

原因也是异步方法操作数据, 不是原子操作, 导致逻辑异常。

也许我们也可以效仿单例模式的实现,利用 Completer 将 insertOrUpdate() 方法原子化。

但对于数据库操作是不合适的,因为我们可能还有其它需求,比如说:调用插入数据的方法,然后立即从数据库中查询这条数据,发现找不到。

如果强行使用 Completer,那么到最后,可能这个类中会出现一大堆的 Completer ,代码难以维护。

解决办法

其实我们想要的效果是,当有异步方法在操作数据库时,别的操作数据的异步方法应该阻塞住,也就是同一时间只能有一个方法来操作数据库。我们其实可以使用任务队列来实现数据库操作的需求。

我这里利用 Completer 实现了一个任务队列:

import 'dart:async';
import 'dart:collection';

/// TaskQueue 不支持 submit await submit, 以下代码就存在问题
///
/// TaskQueue taskQueue = TaskQueue();
/// Future<void> task1(String arg)async{
///   await Future.delayed(Duration(milliseconds: 100));
/// }
/// Future<void> task2(String arg)async{
///   在这里submit时, 任务会被添加到队尾, 且当前方法任务不会结束
///   添加到队尾的任务必须等到当前方法任务执行完毕后, 才能继续执行
///   而队尾的任务必须等当前任务执行完毕后, 才能执行
///   这就导致相互等待, 使任务无法进行下去
///   解决办法是, 移除当前的 await, 让当前任务结束
///   await taskQueue.submit(task1, arg);
/// }
///
/// taskQueue.submit(task2, arg);
///
/// 总结:
/// 被 submit 的方法的内部如果调用 submit 方法, 此方法不能 await, 否则任务队列会被阻塞住
///
/// 如何避免此操作, 可以借鉴以下思想:
/// 以数据库操作举例, 有个save方法的逻辑是插入或者更新(先查询数据库select,再进行下一步操作);
/// sava方法内部submit,并且select也submit, 就容易出现submit await submit的情况
///
/// 我们可以这样操作,假设当前类为 DBHelper:
/// 将数据库的增,删,查,改操作封装成私有的 async 方法, 且私有方法不能使用submit
/// DBHelper的公有方法, 可以调用自己的私有 async 方法, 但不能调用自己的公有方法, 公有方法可以使用submit
/// 这样就不会存在submit await submit的情况了
class TaskQueue {
  /// 提交任务
  Future<O> submit<A, O>(Function fun, A? arg) async {
    if (!_isEnable) {
      throw Exception('current TaskQueue is recycled.');
    }
    Completer<O> result = new Completer<O>();

    if (!_isStartLoop) {
      _isStartLoop = true;
      _startLoop();
    }

    _queue.addLast(_Runnable<A, O>(
      fun: fun,
      arg: arg,
      completer: result,
    ));
    if (!(_emptyMonitor?.isCompleted ?? true)) {
      _emptyMonitor?.complete();
    }

    return result.future;
  }

  /// 回收 TaskQueue
  void recycle() {
    _isEnable = false;
    if (!(_emptyMonitor?.isCompleted ?? true)) {
      _emptyMonitor?.complete();
    }
    _queue.clear();
  }

  Queue<_Runnable> _queue = Queue<_Runnable>();
  Completer? _emptyMonitor;
  bool _isStartLoop = false;
  bool _isEnable = true;

  Future<void> _startLoop() async {
    while (_isEnable) {
      if (_queue.isEmpty) {
        _emptyMonitor = new Completer();
        await _emptyMonitor!.future;
        _emptyMonitor = null;
      }

      if (!_isEnable) {
        // 当前TaskQueue不可用时, 跳出循环
        return;
      }

      _Runnable runnable = _queue.removeFirst();
      try {
        dynamic result = await runnable.fun(runnable.arg);
        runnable.completer.complete(result);
      } catch (e) {
        runnable.completer.completeError(e);
      }
    }
  }
}

class _Runnable<A, O> {
  final Completer<O> completer;
  final Function fun;
  final A? arg;

  _Runnable({
    required this.completer,
    required this.fun,
    this.arg,
  });
}

由于 Flutter 中的 future 不支持暂停操作, 一旦开始执行, 就只能等待执行完。

所以这里的任务队列实现是基于方法的延迟调用来实现的。

TaskQueue 的用法示例如下:

void main() async {
  Future<void> test1(String data) async {
    await Future.delayed(Duration(milliseconds: 20));
    print('执行了test1');
  }

  Future<String> test2(Map<String, dynamic> args) async {
    await Future.delayed(Duration(milliseconds: 10));
    print('执行了test2');
    return 'mockResult';
  }

  TaskQueue taskQueue = TaskQueue();
  taskQueue.submit(test1, '1');
  taskQueue.submit(test2, {
    'data1': 1,
    'data2': '2',
  }).then((value) {
    print('test2返回结果:${value}');
  });

  await Future.delayed(Duration(milliseconds: 200));
  taskQueue.recycle();
}
/*
执行输出结果如下:

执行了test1
执行了test2
test2返回结果:mockResult
*/

值得注意的是: 这里的 TaskQueue 不支持 submit await submit, 原因及示例代码已在注释中说明,这里不再赘述。

为了避免出现 submit await submit 的情况,我代码注释中也做出了建议(假设当前类为 DBHelper):

这样就不会出现 submit await submit 的情况了。

于是,上述的数据库操作示例代码就变成了以下的样子:

class Item {
  int id;
  String data;
  Item({
    required this.id,
    required this.data,
  });
}

class DBTest {
  DBTest._();
  static DBTest instance = DBTest._();
  TaskQueue _taskQueue = TaskQueue();
  bool _existsData = false;
  Future<void> _insert(String data) async {
    // 模拟数据库插入操作,10毫秒过后,数据库才有数据
    await Future.delayed(Duration(milliseconds: 10));
    _existsData = true;
    print('执行了插入');
  }

  Future<void> insert(String data) async {
    await _taskQueue.submit(_insert, data);
  }

  Future<void> _update(String data) async {
    // 模拟数据库更新操作
    await Future.delayed(Duration(milliseconds: 10));
    print('执行了更新');
  }

  Future<void> update(String data) async {
    await _taskQueue.submit(_update, data);
  }

  Future<Item?> _selected(int id) async {
    // 模拟数据库查询操作
    await Future.delayed(Duration(milliseconds: 10));
    if (_existsData) {
      // 数据库中有数据才返回
      return Item(id: 1, data: 'mockData');
    } else {
      // 数据库没有数据时,返回null
      return null;
    }
  }

  Future<Item?> selected(int id) async {
    return await _taskQueue.submit(_selected, id);
  }

  /// 先从数据库表中查询数据,查询到了就更新,没查询到就插入
  Future<void> _insertOrUpdate(Map<String, dynamic> args) async {
    int id = args['id'];
    String data = args['data'];
    Item? item = await _selected(id);
    if (item == null) {
      await _insert(data);
    } else {
      await _update(data);
    }
  }

  Future<Item?> insertOrUpdate(int id, String data) async {
    return await _taskQueue.submit(_insertOrUpdate, {
      'id': id,
      'data': data,
    });
  }
}

void main() async {
  DBTest.instance.insertOrUpdate(1, 'data');
  DBTest.instance.insertOrUpdate(1, 'data');
}

输出日志也变成了我们期望的样子:

执行了插入
执行了更新

总结

另外,本文中的任务队列实现有很大的缺陷,不支持 submit await submit,否则整个任务队列会被阻塞住。

加载全部内容

相关教程
猜你喜欢
用户评论