详解Reactor中Context的用法
临虹路365号 人气:0在响应式编程中,多线程异步性成为天然的内在,多线程之间的切换也成为原生的,在处理一个数据流Flux/Mono时,基本无法知道是运行在哪个线程上或哪个线程池里,可以说,每一个操作符operator以及内部的函数都可能运行在不同的线程上。这就意味着,以前用ThreadLocal来作为方法间透明传递共享变量的方式不再行得通。为此,Reactor提供了Context来替代ThreadLocal实现一个跨线程的共享变量的透明方式。
本文会从以下几个方面来介绍Context的相关知识:
- context的基本用法
- 从源码上解读context的用法
- 用log的MDC案例介绍如何用context实现与threadlocal的桥接
- 总结下context以及目前的一些局限性
一、使用介绍
static String KEY = "TEST_CONTEXT_KEY"; static String KEY2 = "TEST_CONTEXT_KEY2"; public static void main(String[] args) { Flux<String> flux = convert("hello", Flux.just(1, 2, 3)); flux .subscriberContext(Context.of(KEY, "Outside")) .subscribe(v -> System.out.println(v)); } public static Flux<String> convert(String prefix, Flux<Integer> publisher) { return publisher.map(v -> prefix + " " + v) .subscriberContext(Context.of(KEY, "NotUsed")) .flatMap(v -> Mono.subscriberContext().map(ctx -> ctx.get(KEY) + " " + ctx.get(KEY2) + " " + v)) .subscriberContext(context -> context.put(KEY2, "Inside")) .flatMap(v -> Mono.subscriberContext().map(ctx -> ctx.get(KEY) + " " + v)); }
上面是context的使用方案介绍,其输出如下:
Outside Outside Inside hello 1
Outside Outside Inside hello 2
Outside Outside Inside hello 3
上面的使用案例展示了一个使用context的常见例子。通过在外部方法里传入context,如flux.subscriberContext(Context.of(KEY, "Outside"))
,使得内部方法convert能够获取外界环境的context,同时内部方法还可以增加自己的context数据,如subscriberContext(context -> context.put(KEY2, "Inside"))
,结合之后,在让内部的方法(flatMap里的方法)感知到整个上下文context的数据内容。
对于context的使用,主要分为几个部分: 1. context的创建 2. context的写入(传入)与读取 3. 执行顺序
1. context —— 不可变对象
由于reactor天然是跨线程的,所以context设计为了不可变的对象,即每次的更新都是创建一个新的对象。每次的put/putAll操作,都是先把旧对象的值复制到新对象,然后再进行put/putAll等更新操作。
2. context的写入与读取
context写入是使用subscriberContext方法,其入参有两种形式:传值方式subscriberContext(ctx)与lambda函数方式 —— subscriberContext(ctx -> ctx.put(key,value))。
context的读取是利用Mono的静态方法subscriberContext()来获取,由于其返回的是一个Mono, 所以通常与flatMap结合使用。
3. 执行顺序
context的传入是发生在subscribe()订阅阶段的,所以其写入的顺序是从下往上的,即在示例中,先执行subscriberContext(Context.of(KEY, "Outside"))
,再执行subscriberContext(context -> context.put(KEY2, "Inside"))
, 最后执行subscriberContext(Context.of(KEY, "NotUsed"))
。在订阅阶段执行完后,进入运行阶段,数据流从上往下执行,每次读取context的时候Mono.subscriberContext()
都是读取下一个的context。所以"NotUsed"的context并没有生效。
此外,context.put()操作是复制旧的再update新的对象,所以Mono.subscriberContext().map(ctx -> ctx.get(KEY) + " " + ctx.get(KEY2) + " " + v)
这个阶段仍能读取前一个context关于KEY的内容。
总结
- context是不可变对象,每次更新都是新的context
- context是存在于subscriber的内部的,一个context是绑定在当前subscriber上的,如
FluxContextStart
的对象 - context的写入顺序是从下而上的,读取的时候是从上而下的,只能读取之后的subscriber里的context。
- 每个subscriber中的context都是独有的,运行阶段的时候,无法改变其他subscriber的context。
注意
subscriberContext(Context.of("Outside")
与subscriberContext(context -> Context.of("Outside"))
是有区别,前者是会结合复用前面的context,而后者是直接返回一个新的context并不会复用前面的context。 其原因是,subscriberContext(Context.of("Outside"))
其实内部调用的是subscriberContext(context -> context.putAll(Context.of("Outside"))
,其入参的context就是前面的context,putAll方法会复用前面的context。而 subscriberContext(context -> Context.of("Outside"))不复用的原因就是因为放弃了入参的context。所以,可以利用这种方式来放弃之前的context,当然不鼓励这么做,因为你不清楚之前context会不会影响后续的程序。
本文章的代码用的事reactor 3.3的版本,自3.5之后,subscriberContext方法改为contextWrite
,读取的方法改为deferContextual
。
二、源码解读
现在我们从源代码上看看,context写入为什么是自下而上的,读取的时候又是依附于下一个subscriber并且自上而下的。
public final Flux<T> subscriberContext(Function<Context, Context> doOnContext) { return new FluxContextStart<>(this, doOnContext); } FluxContextStart(Flux<? extends T> source, Function<Context, Context> doOnContext) { super(source); this.doOnContext = Objects.requireNonNull(doOnContext, "doOnContext"); } @Override public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) { Context c = doOnContext.apply(actual.currentContext()); return new ContextStartSubscriber<>(actual, c); } ContextStartSubscriber(CoreSubscriber<? super T> actual, Context context) { this.actual = actual; this.context = context; if (actual instanceof ConditionalSubscriber) { this.actualConditional = (ConditionalSubscriber<? super T>) actual; } else { this.actualConditional = null; } } @Override public Context currentContext() { return this.context; }
上面截取了subscriberContext方法的源代码,可以看到subscriberContext方法最终会创建ContextStartSubscriber的对象,并将生成的context赋值Context c = doOnContext.apply(actual.currentContext())
,所以context是伴随subscriberContext方法对应的subscriber里的。
由于context赋值操作Context c = doOnContext.apply(actual.currentContext())
是发生在subscribeOrReturn方法里,即发生在subscribe()订阅阶段,所以整个执行的顺序是自下而上的(沿着整个flow自下而上至源头的publisher)。
那读取context的时候为什么是自上而下的呢?我们来看下读取操作Mono.subscribeContext()的源码。
public static Mono<Context> subscriberContext() { return onAssembly(MonoCurrentContext.INSTANCE); } final class MonoCurrentContext extends Mono<Context> implements Fuseable, Scannable { static final MonoCurrentContext INSTANCE = new MonoCurrentContext(); public void subscribe(CoreSubscriber<? super Context> actual) { Context ctx = actual.currentContext(); actual.onSubscribe(Operators.scalarSubscription(actual, ctx)); } } interface InnerOperator<I, O> extends InnerConsumer<I>, InnerProducer<O> { @Override default Context currentContext() { return actual().currentContext(); } }
Mono.subscribeContext()
方法返回的是一个MonoCurrentContext的静态对象,在订阅subscribe时期,就会去读取当前的context,即Context ctx = actual.currentContext()
。而对于一个InnerOperator的接口而言,其currentContext()方法会不断寻找下一个subscriber的context,即 actual().currentContext()
,直到有哪个subscriber覆写了currentContext方法,如先前的ContextStartSubscriber对象。对于InnerOperator接口,是大多数subscriber都会实现的接口,例如map、filter、flatmap这些,都会实现这个接口。
在找到context之后,通过Operators.scalarSubscription(actual, ctx)
写入,这个方法其实也是Mono.just()的实现,所以相当于把context当做value,生成了一个Mono.just(ctx)来完成了context读取。
所以,context读取的是从当前操作operator之后的那个最接近的subscriber的context。这也解释了前面使用案例中,subscriberContext(Context.of(KEY, "NotUsed"))
,没有作用的缘故。
三、如何桥接现有的ThreadLocal系统
虽然reactor提供了context来替代ThreadLocal的使用,但目前大多数的代码库仍然是命令式编程的,使用的方式仍然是基于ThreadLocal的,如Logger里的MDC。本小节以Logger中的MDC来介绍,如何利用context实现与旧系统中的基于ThreadLocal方式的打通。
我们假设有这样的一个场景,每一次的Http请求都有一个trace id,我们称为request id,并通过Http Header "X-Request-Id"来命名,打印日志的时候,希望每条日志里都包含请求id,这样方便跟踪整个请求链路的情况。
为此,我们把日志配置里的pattern设置为:[%X{X-Request-Id}] [%thread] %-5level - %msg %n
。
可以在SpringBoot
的application.yml
里设置,如:
logging.pattern.level: "[%X{X-Request-Id}] [%thread] %-5level - %msg %n"
因此,要使得每条日志里有request id,那就必须要MDC里有key为X-Request-Id
的内容。下面来看下,reactor中是如何实现的。
@SpringBootApplication @Slf4j @RestController public class MdcApplication { public static void main(String[] args) { SpringApplication.run(MdcApplication.class, args); } private final static String X_REQUEST_ID_KEY = "X-Request-Id"; @GetMapping("/") Flux<String> split(@RequestParam("value") String value, @RequestHeader(X_REQUEST_ID_KEY) String requestId) { return Flux.fromArray(value.split("_")) .doOnEach(logWithContext(ch -> log.info("handling one item: {}", ch))) .subscriberContext(Context.of(X_REQUEST_ID_KEY, requestId)); } private static <T> Consumer<Signal<T>> logWithContext(Consumer<T> logStatement) { return signal -> { if (!signal.isOnNext()) { return; } String requestId = signal.getContext().get(X_REQUEST_ID_KEY); try (MDC.MDCCloseable closeable = MDC.putCloseable(X_REQUEST_ID_KEY, requestId)) { logStatement.accept(signal.get()); } }; } }
这是一个简单的示例程序,对于请求输入的value值通过"-"分割后,再一个个返回给客户端。首先利用subscriberContext方法,将http header里的X-Request-Id
作为context来传入。然后利用doOnEach的方式获取signal。doOnEach的方法可以工作在onNext、onComplete、onError等所有事件,每一个信号signal里都包含有context,当为onNext则还包含value值,当为onError时,则还包含有exception。因此可以通过signal来获取context。
在从context获取X-Request-Id后,可以利用try-with-resource方式来更新MDC,其效果是在执行完try里面的程序后,将更新的value回退。等价于:
try { MDC.put(X_REQUEST_ID_KEY, requestId); logStatement.accept(signal.get()); } finally { MDC.remove(X_REQUEST_ID_KEY); }
置于为什么需要操作完之后回退掉MDC中的更新,那是因为reactor中所有的操作都是异步执行在不同线程中的,如果不回退的话,很有可能造成污染,其原因还是MDC内部是用ThreadLocal实现的,所以跨线程的时候,如果不把ThreadLocal值清理干净,很容易造成互相污染。
用curl命令发送请求:curl --header "X-Request-Id:12345" localhost:8080?value=a_b_c
,返回的结果是abc
,打印的日志如下:
[12345] [reactor-http-nio-2] INFO - handling one item: a
[12345] [reactor-http-nio-2] INFO - handling one item: b
[12345] [reactor-http-nio-2] INFO - handling one item: c
其中12345
就是从context里获取到的request id。
如果想要将request id继续贯穿后续请求流程,如请求第三方服务,可以在用webClient发送请求的时候,把request id作为header加入到它的request请求里,如:
Mono.subscriberContext().map(ctx -> { RequestHeadersSpec<?> request = webClient.get().uri(uri); request = request.header("X-Request-ID", ctx.get(X_REQUEST_ID_KEY)); // The rest of your request logic... });
四、总结
本文介绍了reactor中context的概念,并用代码示例的方式介绍了如何使用。再然后,通过源码的解读来加深对context使用规则的理解:自下而上的context写入,以及与subscriber绑定后的自上而下的读取。 在这之后,用以传递并打印日志中包含request id的一个实际例子,来介绍如何使用context与log的MDC一起使用。
虽然reactor自3.1开始提供了context来弥补无法使用ThreadLocal的不足,但与ThreaLocal相比,context仍然有不少局限。比如使用上的不方便,要么利用Mono.subscribeContext().map并搭配flatmap来使用,要么需要将数据流转化成信号signal流来使用,总之远不如ThreadLocal来的简单易用。另外,context的不可变特性,虽然有助于thread safe,但使得不同方法之间无法传递更新,比如方法A内修改后再传递给方法B,因为context是只读的,但这在ThreadLocal上却是轻而易举就能实现。
好消息的是,reactor在3.5开始,提供了新的方法deferContextual来简化context的使用。以及提出了context view的概念来简化context传递问题,感兴趣的可以阅读reactor文档。
加载全部内容