nodejs的api设计推崇一切皆异步,然而异步的编程不仅流程控制复杂,而且内存泄露意外的发生可能性更大。异步的任务通常是并行运作的,异步回调意味着在完成任务之前,回调函数会一直堆积在内存中。所以有必要设计一个异步回调的队列机制来避免这种情况。
这个队列需要支持的功能如下:#
- 并发执行任务
这些任务应该是相互间没有依赖关系的,可以让他们同时运行减少等待时间。通过在队列中设置并发数上限,来避免同时运行的过多。一个常见的例子是网络文件批量上传,每个文件上传过程是没有关联的,但如果不做并发运行的限制,就会出现n个文件同时上传把本地系统资源耗尽(网络阻塞,内存占满)。 0. 顺序执行任务
如果任务之间存在上下依赖关系,第二个任务成功执行依赖第一个任务的执行结果,那么就要保证他们的运行是顺序的了。比如文件系统操作,一次建立多级目录/a/b/c/d/e/f,实际上被拆分成了六个子任务,依次建立 /a,/a/b,/a/b/c,。。。这六个子任务的顺序不能错,否则无法建立成功。 0. 新任务排队
如果一个新的任务重要性不高于目前队列中的任务,那么应该把这个任务排到队列的尾部等待执行机会。 0. 新任务插队
与其三相反,如果这个任务很重要,那么就把他排到队列的第一个,下一次的执行机会让给这个任务。
- 关于新任务的重要性应该对两种情况综合考虑:
- 这个任务是否必须在其他任务之前执行;
- 如果把这个任务堆积到后面是否会造成过多的内存堆积。
因为队列中存在的任务可能有一些是很占用时间的,而新的任务可能不需要很多时间去完成,但是他对内存的要求很大。比如有一个任务需要把网络请求的结果写到本地文件。这个任务通常并不会占用很多时间,但是这样的任务堆积起来对内存的消耗就很大了,必须把他排到队列开头去处理。
最终实现的程序如下:
'use strict';
let getTaskQueue=(SimultaneousMaxCount)=>{ // 参数为并发最大个数
let f=(fn)=>fn instanceof Array?fn:[fn];
return new(function(){
let maxCount=SimultaneousMaxCount||3;
let queue=[],currentCount=0;
let t=setInterval(()=>{
if(queue.length && currentCount<maxCount){
currentCount++;
let fn=queue.shift(),
n=fn.length,
c=()=>fn.shift()(()=>--n?c():currentCount--);
c();
}
},20);
// 排队和插队接收参数为数组或者单个function
// 数组中的function需要依次执行,整个队列是并发执行的
// 排队
this.push=(fn)=>queue.push(f(fn));
// 插队
this.unshift=(fn)=>queue.unshift(f(fn));
// 销毁队列
this.destory=()=>clearInterval(t);
// 返回当前任务个数
this.hasTask=()=>currentCount+queue.length;
});
};
demo程序taskqueuedemo.js,请使用 node --harmony taskqueuedemo来运行查看效果。
'use strict';
let getTaskQueue=(SimultaneousMaxCount)=>{
let f=(fn)=>fn instanceof Array?fn:[fn];
return new(function(){
let maxCount=SimultaneousMaxCount||3;
let queue=[],currentCount=0;
let t=setInterval(()=>{
if(queue.length && currentCount<maxCount){
currentCount++;
let fn=queue.shift(),
n=fn.length,
c=()=>fn.shift()(()=>--n?c():currentCount--);
c();
}
},20);
this.push=(fn)=>queue.push(f(fn));
this.unshift=(fn)=>queue.unshift(f(fn));
this.destory=()=>clearInterval(t);
this.hasTask=()=>currentCount+queue.length;
});
};
let g=getTaskQueue(3);
let print=(...a)=>console.log.apply(console,a);
// Task1
{
g.push((completed)=>{
print('Task1 Start..');
setTimeout(()=>{ print('Task1 completed!'),completed(); },2000);
});
}
// Task2
{
let task2=[(completed)=>{
print('Task2 Start..');
setTimeout(completed,3000);
}];'a,b,c,d'.split(',').map(a=>{
task2.push((completed)=>{
print('Task2=> ',a);
setTimeout(completed,1000);
});
});task2.push((completed)=>{
print('Task2 completed!');
completed();
});
g.push(task2);
}
// Task3
{
let task3=[(completed)=>{
print('Task3 Start..');
setImmediate(completed);
}];'1,2,3,4'.split(',').map(a=>{
task3.push((completed)=>{
print('Task3=> ',a);
setTimeout(completed,1000);
});
});task3.push((completed)=>{
print('Task3 completed!');
completed();
});
g.push(task3);
}
// Task4
{
g.push((completed)=>{
print('Task4 Start..');
setTimeout(()=>{print('Task4 completed!'),completed();},3000);
});
}
// When task all completed, destory the queue
{
let c=setInterval(()=>{
if(g.hasTask())return;
g.destory();
clearInterval(c);
},1000);
}
相关文档
暂无
随便看看
畅言模块加载中