V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐关注
Meteor
JSLint - a JavaScript code quality tool
jsFiddle
D3.js
WebStorm
推荐书目
JavaScript 权威指南第 5 版
Closure: The Definitive Guide
dvsilch
V2EX  ›  JavaScript

JS 中,串行异步任务的取消是否有更好的处理方式

  •  
  •   dvsilch · 131 天前 · 2995 次点击
    这是一个创建于 131 天前的主题,其中的信息可能已经有所发展或是发生改变。

    最近遇到的业务场景:

    1. 用户输入时进行初始化,初始化过程中包含了异步任务的串行
    2. 初始化过程中,不阻塞用户输入

    如果用户输入时上一次的初始化还没结束,就会出现多次初始化的并行。我目前的处理方式是初始化开始时记下当前状态,每结束一个串行的异步任务,都进行一次状态比对。简化后的代码如下:

    /**
     * @type {number | undefined}
     */
    let state = undefined
    
    /**
     * @param {number} ms
     * @returns {Promise<void>}
     * @description sleep for ms milliseconds to simulate async task
     */
    function sleep(ms) {
      return new Promise((resolve) => setTimeout(resolve, ms))
    }
    
    /**
     * @returns {Promise<void>}
     * @description init async task
     */
    async function init() {
      const current = Date.now()
      state = current
    
      await sleep(500)
      if (state !== current) {
        console.warn(`state ${current} init: canceled`)
        return
      }
    
      await sleep(500)
      if (state !== current) {
        console.warn(`state ${current} init: canceled`)
        return
      }
    
      console.log(`state ${current} init: done`)
      state = undefined
    }
    

    但我感觉这种做法不太合理,虽然上面用当前时间戳模拟了状态,但实际上业务场景中可能会出现某几次用户输入一致,所以状态一致,进而导致这几次输入执行的初始化过程无法正常中断。像 C#中有 CancellationToken 可以直接调用 Cancel 取消异步任务。不知道 JS 是否拥有类似的设计,或者对于这类业务场景有更好的处理方式?

    33 条回复    2024-08-16 10:42:29 +08:00
    chenliangngng
        1
    chenliangngng  
       131 天前
    为什么要用时间戳,直接存储状态不就可以了
    不论是 promise 和 ajax 都可以取消
    Pencillll
        2
    Pencillll  
       131 天前 via Android
    我一般把整个流程写成一个 task ,并提供 cancel 函数,需要取消时就主动调用 task.cancel()
    pursuer
        3
    pursuer  
       131 天前
    js 现在还没有自动取消异步任务的方法。可以用 AbortSignal ,但要每次 await 前 throwIfAbort 。或实现一个类似效果的东西。
    rabbbit
        4
    rabbbit  
       131 天前
    const sleep = (time) => {
    return new Promise((resolve) => {
    setTimeout(() => resolve(), time);
    });
    };

    const task = (id) => {
    let _reject;
    const p = new Promise(async (resolve, reject) => {
    _reject = reject;
    const time = Math.random() * 10;
    await sleep(time);
    resolve({ id, time });
    }).then(({ id, time }) => console.log(id, time));

    return () => _reject();
    };

    const main = () => {
    let cancelFn;
    for (let i = 0; i < 5; i++) {
    cancelFn?.();
    cancelFn = task(i);
    }
    };
    main();
    DOLLOR
        5
    DOLLOR  
       131 天前
    不要用时间戳,可以用 symbol 来标记状态。Symbol()创建的每个 symbol 都是唯一的。
    dvsilch
        6
    dvsilch  
    OP
       131 天前
    @rabbbit 这个方式我之前实现过,但是存在麻烦的地方是:业务场景需要外部和内部都拥有 cancel 的能力,结果就是需要暴露一个额外的方法同时内部持有这个 reject ,担心会出现因为持有 reject 导致 promise 无法从内存中释放的情况
    dvsilch
        7
    dvsilch  
    OP
       131 天前
    @chenliangngng
    @DOLLOR
    @pursuer
    不好意思主贴里没说清楚,代码里的状态只是随便写了一个东西,实际业务并不是这么实现的。而且,与其说会因为状态出现 bug ,更不如说是我感觉这种「每次异步任务后需要主动判断状态」的做法太傻逼了...
    doommm
        8
    doommm  
       131 天前
    我在业务中也遇到了这种问题,有一连串的异步调用链需要取消(可能执行到任意一步,不需要撤销,只要停止执行后续就行),没啥头绪。
    有一些比较简单的场景,像异步拉取一些列表数据之类的,我尝试过用 Rxjs 来处理。switchMap 还是挺好用的
    rabbbit
        9
    rabbbit  
       131 天前
    脑洞,不推荐这么写
    const sleep = (time) => {
    return new Promise((resolve) => {
    setTimeout(() => resolve(), time);
    });
    };

    async function* task(id) {
    yield sleep(Math.random() * 100);
    console.log('id: ', id, 'num: ', 1);
    yield sleep(Math.random() * 100);
    console.log('id: ', id, 'num: ', 2);
    yield sleep(Math.random() * 100);
    console.log('id: ', id, 'num: ', 3);
    }

    const main = async () => {
    for (let i = 0; i < 5; i++) {
    const start = Date.now();
    for await (const t of task(i)) {
    const diff = Date.now() - start;
    if (diff > 100) {
    break;
    } else {
    console.log("time: ", diff);
    }
    }
    console.log('end ---')
    }
    };
    main();
    rabbbit
        10
    rabbbit  
       131 天前   ❤️ 1
    排版不好,放 github 了,另外还有一些例子
    https://gist.github.com/Aaron-Bird/42359bf78fe0e868946cb5897f6ca7cd
    chnwillliu
        11
    chnwillliu  
       131 天前 via Android   ❤️ 1
    rxjs switchMap ?
    Projection
        12
    Projection  
       131 天前
    用 RxJS 可以方便地实现,跟 AI 交流了一会后给出了如下代码:

    import { Subject, from } from 'rxjs';
    import { switchMap } from 'rxjs/operators';

    // 模拟异步任务 A 、B 、C
    const taskA = (input) => {
    console.log(`Starting task A with input: ${input}`);
    return new Promise((resolve) => {
    setTimeout(() => {
    console.log(`Finishing task A with input: ${input}`);
    resolve(input + 'A');
    }, 1000); // 假设任务 A 需要 1 秒钟
    });
    };

    const taskB = (input) => {
    console.log(`Starting task B with input: ${input}`);
    return new Promise((resolve) => {
    setTimeout(() => {
    console.log(`Finishing task B with input: ${input}`);
    resolve(input + 'B');
    }, 1000); // 假设任务 B 需要 1 秒钟
    });
    };

    const taskC = (input) => {
    console.log(`Starting task C with input: ${input}`);
    return new Promise((resolve) => {
    setTimeout(() => {
    console.log(`Finishing task C with input: ${input}`);
    resolve(input + 'C');
    }, 1000); // 假设任务 C 需要 1 秒钟
    });
    };

    const subject = new Subject();

    subject
    .pipe(
    switchMap((value) =>
    from(taskA(value)).pipe(
    switchMap((resultA) => from(taskB(resultA))),
    switchMap((resultB) => from(taskC(resultB))),
    ),
    ),
    )
    .subscribe((result) => {
    console.log('Final result:', result);
    });

    // 发出一些值
    subject.next('1'); // 发出第一个值
    setTimeout(() => subject.next('2'), 500); // 在 0.5 秒后发出第二个值,中止第一个任务并开始新的任务
    setTimeout(() => subject.next('3'), 2000); // 在 2 秒后发出第三个值

    输出结果:

    Starting task A with input: 1
    Starting task A with input: 2
    Finishing task A with input: 1
    Finishing task A with input: 2
    Starting task B with input: 2A
    Starting task A with input: 3
    Finishing task B with input: 2A
    Finishing task A with input: 3
    Starting task B with input: 3A
    Finishing task B with input: 3A
    Starting task C with input: 3AB
    Finishing task C with input: 3AB
    Final result: 3ABC

    实际上用全套 RxJS API 更简单,无奈 API 忘了好多
    Projection
        13
    Projection  
       131 天前
    给输出结果加个时间线:

    [1] Starting task A with input: 1
    [514] Starting task A with input: 2
    [1010] Finishing task A with input: 1
    [1537] Finishing task A with input: 2
    [1539] Starting task B with input: 2A
    [2019] Starting task A with input: 3
    [2542] Finishing task B with input: 2A
    [3023] Finishing task A with input: 3
    [3024] Starting task B with input: 3A
    [4027] Finishing task B with input: 3A
    [4028] Starting task C with input: 3AB
    [5038] Finishing task C with input: 3AB
    [5040] Final result: 3ABC
    Projection
        14
    Projection  
       131 天前
    @rabbbit 用生成器确实是一个好办法,在可被取消的时间点添加 yield 。暂停后只要不调用 next() 方法就不会继续执行,这样就需要自己写一个执行器,调度的时机可以自己灵活控制,像 co 那种执行器是自动执行的,async 函数也是自动执行的。
    Projection
        15
    Projection  
       131 天前   ❤️ 1
    我用生成器简单实现了一下,代码 https://pastebin.com/hi04KbPd

    function delay(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
    }

    const then = Date.now();

    async function task(name, input) {
    console.log(`[${Date.now() - then}] ${name}: start, input: ${input}`);
    await delay(1_000);
    console.log(`[${Date.now() - then}] ${name}: done, input: ${input}`);
    return input + name;
    }

    let token = 0;
    async function onUpdate(input) {
    let current = ++token;
    async function* gen() {
    let result = await task('a', input);
    yield;
    result = await task('b', result);
    yield;
    result = await task('c', result);
    console.log(`[${Date.now() - then}] final result: ${result}`);
    return result;
    }
    for await (const _ of gen()) {
    if (token !== current) break;
    }
    }

    onUpdate('1');
    await delay(500);
    onUpdate('2');
    await delay(2_000);
    onUpdate('3');

    输出结果:

    [0] a: start, input: 1
    [510] a: start, input: 2
    [1006] a: done, input: 1
    [1526] a: done, input: 2
    [1527] b: start, input: 2a
    [2516] a: start, input: 3
    [2533] b: done, input: 2a
    [3520] a: done, input: 3
    [3520] b: start, input: 3a
    [4528] b: done, input: 3a
    [4530] c: start, input: 3ab
    [5536] c: done, input: 3ab
    [5537] final result: 3abc
    paopjian
        16
    paopjian  
       131 天前
    这好像是个面试题,串行异步任务链的中止与继续,你看是这个么
    function processTasks(...tasks) {
    let isRunning = false; // 是否正在执行,初始化 false,不然第一次启动不了
    const results = []; // 保存执行结果
    let i = 0; // 当前任务索引
    let prom = null;
    return {
    start() {
    return new Promise(async (resolve, reject) => {
    if (prom) {
    // 结束了
    prom.then(resolve, reject);
    return;
    }
    if (isRunning) return;
    isRunning = true;
    while (i < tasks.length) {
    try {
    results.push(await tasks[i]());
    } catch (err) {
    isRunning = false;
    reject(err);
    prom = Promise.reject(err);
    return;
    }
    i++;
    if (!isRunning && i < tasks.length) return; //中断
    }
    // 全部执行完毕
    isRunning = false;
    resolve(results);
    prom = Promise.resolve(results);
    });
    },
    pause() {
    console.log('暂停任务');
    isRunning = false;
    },
    }
    }
    amlee
        17
    amlee  
       130 天前
    https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal

    有一个 AbortSignal ,似乎可以做到,看下上面那个链接里面的 implementing_an_abortable_api 这一节
    iidear2015
        18
    iidear2015  
       130 天前
    感觉你逻辑挺合理的,好像只能写个工具方法重构一下

    ```
    // 一个 task 包含多个异步 job
    // 每个 job 运行前需要校验当前 task 是否在运行
    // 运行新 task 时取消之前的 task

    const createScheduler = (jobs) => {
    let runningTask = 0;

    const run = async () => {
    runningTask += 1;
    const thisTask = runningTask;

    for (let job of jobs) {
    if (thisTask === runningTask) {
    await job();
    } else {
    return;
    }
    }
    };

    return run;
    };

    const init = createScheduler([
    () => sleep(500),
    () => sleep(500),
    () => sleep(500)
    ]);

    ```
    jones2000
        19
    jones2000  
       130 天前
    做一个任务队列,输入一次,移除上一次剩余的没有完成的任务,再往队列里面加一组异步的任务, 异步的任务处理从任务队列里面取。
    dvsilch
        20
    dvsilch  
    OP
       130 天前
    @pursuer
    @amlee
    看了下,AbortSignal 大概也需要在没做支持的异步任务上套一层 Promise 或者每次异步结束前做状态检测,也不是很好

    @rabbbit
    @Projection
    rxjs 之前也有看过,无奈确实是经验不够一眼瞎。目前看来对我而言修改起来最简单的方式,就是自己写一个 generator 然后来手动判定是否执行下一步了,至少把状态判断统一在了 for 循环里,少写不少东西
    dvsilch
        21
    dvsilch  
    OP
       130 天前
    @jones2000 是的,其实跟上面使用 generator 的思路一致
    edward1987
        22
    edward1987  
       130 天前
    也可以简单的写个装饰函数,把所有异步任务的函数都装饰一遍再调用
    function trans(funcN){
    return function(...args){
    if(state!==current){return};
    return funcN(...args)
    }
    }
    然后重新赋值每个异步任务函数
    funcA = trans(funcA);
    funcB= trans(funcB);
    yigefanqie
        23
    yigefanqie  
       129 天前
    没太理解业务场景,这不应该是给用户输入的事件加 debounce 的事吗
    nzbin
        24
    nzbin  
       129 天前
    只要是异步,毫无疑问 rxjs 就是更好的处理方式
    ddch1997
        25
    ddch1997  
       129 天前
    我是用 redux-saga ,fork 一个异步 task 任务,然后 take 一个停止 action ,cancel 掉这个 task 任务就好
    cjd6568358
        26
    cjd6568358  
       128 天前
    把所有的串行异步用 promise.then 链式连接起来。对外暴露 promise,每次用户有新输入调用 promise.reject 终止异步任务链。然后生成新的
    RabbitDR
        27
    RabbitDR  
       128 天前
    @yigefanqie 不一样,他是想取消并行的初始化,防抖不会取消函数的执行。比如连续输入 abc 后 abc 三次输入被防抖了,开始初始化,但这时用户再输入 d ,会进行第二次初始化,他想第二次初始化时如果第一次还在初始化则中止第一次的初始化。
    yigefanqie
        28
    yigefanqie  
       128 天前
    我理解的这样:用户输入时的异步串行任务是不是都是无副作用的,如果是无作用的,直接丢弃之前的初始化,重新初始化就行了,配合上 debounce 降低下初始化触发频率。丢弃掉的初始化如果都是 promise 不用担心内存问题,会自动回收的。
    yigefanqie
        29
    yigefanqie  
       128 天前
    @RabbitDR 不太会回复。回复发在楼上一层了。
    tsanie
        30
    tsanie  
       127 天前
    @yigefanqie #28 一般初始化都是资源占用型过程,最佳实践还是直接中断。

    不过对 op 提到的 CancellationToken 有些疑问,token 自身是无法 Cancel 的,只能通过 IsCancellationRequested 获取终止状态或者调用 ThrowIfCancellationRequested() 在被终止时抛出异常,和 js 中的 AbortSignal 是一样样的( aborted, throwIfAborted())

    可以主动调用 Cancel 的是 CancellationTokenSource ,对应在 js 中是 AbortController
    dvsilch
        31
    dvsilch  
    OP
       127 天前
    @tsanie
    是我笔误了,确实是 CancellationTokenSource(cts),C#里可以做到外部内部各提供一个 cts ,初始化时将两个 cts link 然后将 ct 链式传递到各个异步任务,需要中断时任意一个 cts 直接 cancel 即可,会立刻走到 catch OperationCanceledException 的分支

    但我看了一下 AbortController 似乎只能做到对 fetch api 执行中断,不是特别符合我当前的业务场景...不仅仅是网络请求,还有一些文件的读取、以及等待用户另外输入的 Promise 。这部分如果取消不掉的话,我能想到的形式就是要么把各个 reject 动态推入移出队列,要么就是每一轮异步任务结束后判断状态。两种做法都感觉不太合理,所以目前的做法是暂时跟 #9 #15 一样写一个自定义的 generator 来统一这部分逻辑,有空再去研究研究 rxjs 的实现
    zy445566
        32
    zy445566  
       127 天前
    嘿嘿 这里还挺有讨论意义 自己也写了一个暂停取消任务给自己使用

    https://github.com/zy445566/pause-task-line
    forty
        33
    forty  
       97 天前
    用单独的线程执行任务,需要取消的时候直接杀线程,简单粗暴又有效
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3117 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 12:38 · PVG 20:38 · LAX 04:38 · JFK 07:38
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.