展开目录
nodejs异步回调的并发控制
javascript异步回调并发控制
X
陈尼玛的博客
记录开发生涯的踩坑经历,用时间来验证成长
加载中

nodejs的api设计推崇一切皆异步,然而异步的编程不仅流程控制复杂,而且内存泄露意外的发生可能性更大。异步的任务通常是并行运作的,异步回调意味着在完成任务之前,回调函数会一直堆积在内存中。所以有必要设计一个异步回调的队列机制来避免这种情况。

这个队列需要支持的功能如下:#

  1. 并发执行任务

这些任务应该是相互间没有依赖关系的,可以让他们同时运行减少等待时间。通过在队列中设置并发数上限,来避免同时运行的过多。一个常见的例子是网络文件批量上传,每个文件上传过程是没有关联的,但如果不做并发运行的限制,就会出现n个文件同时上传把本地系统资源耗尽(网络阻塞,内存占满)。 0. 顺序执行任务


如果任务之间存在上下依赖关系,第二个任务成功执行依赖第一个任务的执行结果,那么就要保证他们的运行是顺序的了。比如文件系统操作,一次建立多级目录/a/b/c/d/e/f,实际上被拆分成了六个子任务,依次建立 /a,/a/b,/a/b/c,。。。这六个子任务的顺序不能错,否则无法建立成功。 0. 新任务排队


如果一个新的任务重要性不高于目前队列中的任务,那么应该把这个任务排到队列的尾部等待执行机会。 0. 新任务插队


与其三相反,如果这个任务很重要,那么就把他排到队列的第一个,下一次的执行机会让给这个任务。

因为队列中存在的任务可能有一些是很占用时间的,而新的任务可能不需要很多时间去完成,但是他对内存的要求很大。比如有一个任务需要把网络请求的结果写到本地文件。这个任务通常并不会占用很多时间,但是这样的任务堆积起来对内存的消耗就很大了,必须把他排到队列开头去处理。

最终实现的程序如下:

'use strict';
let getTaskQueue=(SimultaneousMaxCount)=>{ // 参数为并发最大个数
  let f=(fn)=>fn instanceof Array?fn:[fn];
  return new(function(){
    let maxCount=SimultaneousMaxCount||3;
    let queue=[],currentCount=0;
    let t=setInterval(()=>{
      if(queue.length && currentCount<maxCount){
        currentCount++;
        let fn=queue.shift(),
          n=fn.length,
          c=()=>fn.shift()(()=>--n?c():currentCount--);
        c();
      }
    },20);
    // 排队和插队接收参数为数组或者单个function
    // 数组中的function需要依次执行,整个队列是并发执行的
    // 排队
    this.push=(fn)=>queue.push(f(fn));
    // 插队
    this.unshift=(fn)=>queue.unshift(f(fn));
    // 销毁队列
    this.destory=()=>clearInterval(t);
    // 返回当前任务个数
    this.hasTask=()=>currentCount+queue.length;
  });
};

demo程序taskqueuedemo.js,请使用 node --harmony taskqueuedemo来运行查看效果。

'use strict';
let getTaskQueue=(SimultaneousMaxCount)=>{
  let f=(fn)=>fn instanceof Array?fn:[fn];
  return new(function(){
    let maxCount=SimultaneousMaxCount||3;
    let queue=[],currentCount=0;
    let t=setInterval(()=>{
      if(queue.length && currentCount<maxCount){
        currentCount++;
        let fn=queue.shift(),
          n=fn.length,
          c=()=>fn.shift()(()=>--n?c():currentCount--);
        c();
      }
    },20);
    this.push=(fn)=>queue.push(f(fn));
    this.unshift=(fn)=>queue.unshift(f(fn));
    this.destory=()=>clearInterval(t);
    this.hasTask=()=>currentCount+queue.length;
  });
};

let g=getTaskQueue(3);
let print=(...a)=>console.log.apply(console,a);

// Task1
{
  g.push((completed)=>{
    print('Task1 Start..');
    setTimeout(()=>{ print('Task1 completed!'),completed(); },2000);
  });
}

// Task2
{
  let task2=[(completed)=>{
    print('Task2 Start..');
    setTimeout(completed,3000);
  }];'a,b,c,d'.split(',').map(a=>{
    task2.push((completed)=>{
      print('Task2=> ',a);
      setTimeout(completed,1000);
    });
  });task2.push((completed)=>{
    print('Task2 completed!');
    completed();
  });
  g.push(task2);
}

// Task3
{
  let task3=[(completed)=>{
    print('Task3 Start..');
    setImmediate(completed);
  }];'1,2,3,4'.split(',').map(a=>{
    task3.push((completed)=>{
      print('Task3=> ',a);
      setTimeout(completed,1000);
    });
  });task3.push((completed)=>{
    print('Task3 completed!');
    completed();
  });
  g.push(task3);
}

// Task4
{
  g.push((completed)=>{
    print('Task4 Start..');
    setTimeout(()=>{print('Task4 completed!'),completed();},3000);
  });
}

// When task all completed, destory the queue
{
  let c=setInterval(()=>{
    if(g.hasTask())return;
    g.destory();
    clearInterval(c);
  },1000);
}

相关文档

暂无

随便看看

  1. windows电脑防止自动休眠

  2. nodejs 长连接

  3. nginx用域名来转发请求

  4. jxa运动指令脚本

  5. 树莓派实现用pi用户自动登录

  6. git删除远程分支

  7. 模拟307跳转情况

  8. webrtc服务搭建

  9. 树莓派 3B/3B+ usb启动

  10. cdn资源列表

  11. git配置服务端支持http认证

  12. git 设置代理服务器

  13. 把树莓派的存储空间拓展到整张TF卡中

  14. mongodb2.4 添加用户

  15. python下载文件,带进度条控制

  16. mysql导出csv文件

  17. sendmail用nginx做代理

  18. ie8上Image.onload不触发问题

畅言模块加载中