Skip to content

kwanCCC/rx-fun

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

4 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

RxJava Extension

20 example use Java CompletableFuture

为什么这个被名为CompletableFuture

首先可拆解为 CompletableFuture ,因为其实现了 CompletionStagefuture,首先我们可以看 CompletionStage 的接口的Doc (PS:写好文档无比重要),首先它代表了了一个特定的计算阶段,并且可以被同步或是异步的完成,加上 Future 提供的功能,即为一个未完成的计算单元 ,想象一下,Stage I -> Stage II -> .... 依次类推,这就组成了一条计算的流水线,一个个的完成,一个触发一个

简单的开始

预定义结果创建CompletableFuture

public static void simpleCompletableFuture(){
    CompletableFuture someone = CompletableFuture.completedFuture("I will come")
    someone.isDone()
    assertEquals("I will come", someone.getNow(null));
} 

getNow(null)会在Future完成的情况下返回,但是这个鬼例子🌰和同步有毛区别,别急

简单的异步

public static void runAsAsync(){
    someFuture = CompletableFuture.runAsAsync(()->{
        // go make some coffe,I would finish it
    });
    task_a_break()
    assertTrue(someFuture.isDone)
}

如上的例子 CompletableFuture 中以 Async 结尾的方法,在没有指定 Executor 的情况下会使用 ForkJoinPoolcommonPool ,注意它使用的是守护线程

来一个之前说的 Stage I -> Stage II 的例子

public static void syncPipline(){
    syncPipline = CompletableFuture.completedFuture("I am coming with sugar").thenApply(()-{
        //I need to get some coffee
    });
    syncPipline.getNow(null)
}

这个例子形象的说明两个动作,先拿糖,之后泡了一杯咖啡, then 意味着需要上一阶段得完成, apply 意味着会对上一阶段基础的结果执行,所以函数会阻塞

接着上一个 A 去拿糖 B 去接热水

public static void AsyncPipline(){
    aysnc = CompletableFuture.completedFuture("A: I will take some sugar").thenApplyAsync(()->{
        //B : I got hot water
    });
    // I need to wait here ,Because I have nothing
    assertNull(aysnc.getNow(null))
    // Afterwards, I can easily make a cup of coffee
    aysnc.join()
}

thenAcceptAsync 串联CompletableFuture,异步执行,最后记得 join()

出现计算异常该怎么办

这次选择 thenAcceptAsync(Function,Executor) ,选择一个自定义的 Executor


public static void something_would_happen(){
    something = CompletableFuture.completedFuture("A : I will take some sugar").thenAcceptAysnc(fn->{
        // "B : I will take hot water"
    },DelayedExecutor(5,TimeUnit.Minutes));
    exhand = somehtin.handle((s, th) -> { return (th != null) ? "where is water" : ""; });
                 cf.completeExceptionally(new RuntimeException("completed exceptionally"));
    somehting.join();
    // then somehting wrong but exhand
    exhand.join();
    // then There will be complaints           
}

DelayedExecutor 可以看看 java.util.concurrent.Delayed 接口的实现,有很多现成的,也可以在接口规约上自行定制,如上例子在没有 handle() 的情况下,我们就只剩下糖没有热水了,这样子泡咖啡会出错,但是使用 handle() 会警惕的检查一下,提前定制好计划,没水就不继续了

既然会出现计算异常,可不可以取消呢

可以

public static void something_would_wrong_so_i_want_cancel_it(){
    something = CompletableFuture.completedFuture("A : I will take some sugar").thenAcceptAysnc(fn->{
            // "B : I will take hot water"
        },DelayedExecutor(5,TimeUnit.Minutes));
    exHandle = something.exceptionally(th->" roll back");
    AssertTrue(something.canel(true))
    AssertTrue(something.isCompletedExceptionally())
    // but use exception handle
    exHandle.join()
}

如上的例子,使用 something 会检测出 isCompletedExceptionally(),但是 使用 exHandle() 进行回滚,则不会让 A 白跑一趟拿一堆糖过来

public static void 

convert between Flowable and Computable

About

something fun in Reactive

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages