Co源码以及与Koa的深入理解

tj 大神的 co,将本来应该是一个数据类型的 generator 变成了一种处理异步的解决方案

其实主要就是一个遍历函数,将 promise 或者 generator 的异步函数一直执行到得到最后的结果再返回,这样就可以把本来放到异步中的方法按照同步的顺序来写。

yield

函数内部的 yield 后面传入的可以是以下

  • Promise(就是 promise 嘛)
  • thunks(就是一个偏函数,执行之后只有一个简单的拥有一个 callback 的参数的函数)
  • array(通过 array 可以并行执行里面的 function,并行是主要的价值)
  • objects(和 array 相同,也是并行执行里面的 yieldable,并行是主要的价值)
  • generators
  • generators functions(下面的这两个东西可以支持,但是并不被推荐,因为我们应该转向更加标准的 promise)

API

co(fn*).then

将一个 generator 解决为一个 promise

var fn = co.wrap(fn*)

讲一个 generator 转化为一个返回 promise 的常规函数

本质的探索

他的最初实现是基于 Thunk 函数的。接收了一个生成器函数作为参数,并生成了一个实际操作函数,函数通过接收回调的方式来传入最后的返回值。

所以先了解下 thunk 函数

这东西的发展是由函数的求值策略的分歧决定的,两种求值策略

var b = 1;
function a(x, y) {
  return y;
}
a(b + 1);

上面的代码一b+1在什么时候执行比较好,

一种是传值调用,在进入函数体之前就直接执行完,把值传进去。c 语言是这么做的

一种是传名调用,将表达式传入函数体,只在用到他的时候求值。Hskell 语言是这么做的

前一种会简单一些,但是会有性能损失,所以倾向于传名调用。

传名函数的编译器实现,其实就是放入一个临时函数,再将临时函数传入函数体,这个临时函数就叫做 thunk 函数。

js 语言是传值调用,他的 thunk 含义有些不同,js 中,thunk 函数替换的不是表达式,而是多参数函数,将它替换成单参数的版本,且只接受回调函数作为参数。

//正常的readFile函数
fs.readFile(fileName, callback);
var readFileThunk = Thunk(fileName);
readFileThunk(callback);
//thunk版本的函数
function Thunk(fileName) {
  return function(callback) {
    fs.readFile(fileName, callback);
  };
}

所以其实任何有回调的函数都是可以搞成 thunk 形式的,下面是一个简单的生成器

var Thunk = function(fn) {
  return function() {
    //先传入其他的参数初始化
    var args = Array.prototype.slice.call(arguments);
    //传入callback返回的函数
    return function(callback) {
      args.push(callback);
      //实际调用的时候
      return fn.apply(this, args);
    };
  };
};
var readFileThunk = Thunk(fs.readFile);
readFileThunk(fileA)(callback);

tj 的 thunkify 源码

/**
 * Module dependencies.
 */
var assert = require("assert");
/**
 * Expose `thunkify()`.
 */
module.exports = thunkify;
/**
 * Wrap a regular callback `fn` as a thunk.
 *
 * @param {Function} fn
 * @return {Function}
 * @api public
 */
function thunkify(fn) {
  assert("function" == typeof fn, "function required");
  return function() {
    //这里就是将所有的参数放进了一个新的数组,这里之所以不用[].slice。是因为有人在bluebird docs发现,如果直接这样泄露arguments,v8的一些优化的编译会被搁置,就会有性能上的损失。
    var args = new Array(arguments.length);
    var ctx = this;
    for (var i = 0; i < args.length; ++i) {
      args[i] = arguments[i];
    }
    return function(done) {
      //这里用called是为了标记只执行了一次,类似于promise的resolve和reject只能执行一次一样。
      var called;
      args.push(function() {
        if (called) return;
        called = true;
        //因为arguments是一个list,必须得用apply才能在done传入。
        done.apply(null, arguments);
      });
      //这里用个try catch,可以在执行失败时走一遍callback,传入err信息
      try {
        fn.apply(ctx, args);
      } catch (err) {
        done(err);
      }
    };
  };
}
不用slice处理arguments 为了使回调函数只执行一次

generator 函数的回调流程管理

包装成这样到底有个啥用场?用在了 generator 的流程管理

var fs = require("fs");
var thunkify = require("thunkify");
var readFile = thunkify(fs.readFile);
var gen = function*() {
  var r1 = yield readFile("/etc/fstab");
  console.log(r1.toString());
  var r2 = yield readFile("/etc/shells");
  console.log(r2.toString());
};
var g = gen();
var r1 = g.next();
r1.value(function(err, data) {
  if (err) throw err;
  var r2 = g.next(data);
  r2.value(function(err, data) {
    if (err) throw err;
    g.next(data);
  });
});

就如同上面的,generator 的执行过程实际上是将同一个回调函数,反复传入 next 的 value 结果中。这样我们就可以递归的来自动完成这个过程了。于是据诞生了基于 thunk 函数的执行器,也就是 co 了。

最简单的 co

function run(fn) {
  var gen = fn();
  function next(err, data) {
    var result = gen.next(data);
    if (result.done) return;
    result.value(next);
  }
  next();
}
run(gen);

执行器帮我们不停地调用传入生成器的 next 函数,如果 done 为 true 的时候,代表迭代完成,会将值传给回调函数。

当然前提是每一个一步函数都得是 thunk 函数的形式。

thunk 并不是 generator 函数的自动执行的唯一方案。我们需要的其实只是一个机制,循环调用,并且交出和返回程序的执行权,thunk 可以做到,promise 也可以做到。

首先将 readfile 包装成 promise 形式

var fs = require("fs");
var readFile = function(fileName) {
  return new Promise(function(resolve, reject) {
    fs.readFile(fileName, function(error, data) {
      if (error) {
        reject(error);
      }
      resolve(data);
    });
  });
};
var gen = function*() {
  var f1 = yield readFile("f1.js");
  var f2 = yield readFile("f2.js");
  console.log(f1);
  console.log(f2);
};

然后手动执行下 generator 函数

var g = gen();
g.next().value.then(function(data) {
  g.next(data).value.then(function(data) {
    g.next(data);
  });
});

写一个自动执行器

function run(gen) {
  var g = gen();
  function next(data) {
    var result = g.next(data);
    if (result.done) return result.value;
    result.value.then(function(data) {
      next(data);
    });
  }
  next();
}

co 的源码

下面的是 co 源码的逐行阅读,先把参照的一些图片列举出来

//array原生的slice
var slice = Array.prototype.slice;
//这里写的这么古怪就只是想在es6的模块引入时更加舒服一些,参见下面的图片3
module.exports = co["default"] = co.co = co;
//将传入的generator函数包装成一个返回promise的方法
//这是一个独立的方法,就是将传入的函数包装成了co执行前的形式
co.wrap = function(fn) {
  //存了一个指针指向原generator函数
  createPromise.__generatorFunction__ = fn;
  return createPromise;
  function createPromise() {
    //返回的方法调用就会直接执行co。
    return co.call(this, fn.apply(this, arguments));
  }
};
//执行generator或者generator函数然后返回一个promise
function co(gen) {
  var ctx = this;
  var args = slice.call(arguments, 1);
  // 将所有的东西放到一个promise里面,来防止引起内存泄露错误的promise chaining。
  //tudo:看一下这个issue see https://github.com/tj/co/issues/180
  //参见下面的内存泄露的研究
  //https://github.com/promises-aplus/promises-spec/issues/179 看的我好累,完全没有看懂啊!!!
  //总之不管怎样,他是把传进来的东西包装成了一个promise
  return new Promise(function(resolve, reject) {
    //这里是判断下gen是不是函数,generators function执行之后是一个object
    if (typeof gen === "function") gen = gen.apply(ctx, args);
    //传入的不是generators函数,没有next,就直接resolve返回结果;这里是错误兼容而已,因为co就是基于generator的,传入其他的没有意义
    if (!gen || typeof gen.next !== "function") return resolve(gen);
    //主要就是走下面的onFulfilled方法,这个方法返回的是一个promise(resolve或者reject)
    onFulfilled();
    function onFulfilled(res) {
      var ret;
      try {
        //调用第一次next方法
        ret = gen.next(res);
      } catch (e) {
        //出错了直接reject出去
        return reject(e);
      }
      //将第一次的结果({done:true,value:{\}#\})传入内部方法next
      next(ret);
    }
    //promise失败的时候调用
    //这里在promise错误的时候,就会尝试向外throw err。Genertor的属性,可以内部抛出,外部不活。如果我们对这个yield进行了try catch,就会被捕获,不处理的话,就会reject出去,在co的catch语句中co(*fn).catch处理。
    function onRejected(err) {
      var ret;
      try {
        ret = gen.throw(err);
      } catch (e) {
        return reject(e);
      }
      next(ret);
    }
    //循环得到next的结果,return的还是一个promise
    function next(ret) {
      //如果done为true的话,代表执行结束,返回一个resolve的promise
      if (ret.done) return resolve(ret.value);
      //既然还没执行完,就将ret.value转换成一个promise
      var value = toPromise.call(ctx, ret.value);
      //如果成功转化为了promise,就在这个promise执行完了再调用onFulfilled方法
      if (value && isPromise(value)) return value.then(onFulfilled, onRejected);
      return onRejected(
        new TypeError(
          "You may only yield a function, promise, generator, array, or object, " +
            'but the following object was passed: "' +
            String(ret.value) +
            '"'
        )
      );
    }
  });
}
//将yield后面的东西转化成一个promise
function toPromise(obj) {
  //如果不存在的话,直接返回,走最后的报错流程
  if (!obj) return obj;
  //判断传入的是不是promise,是的话直接返回
  if (isPromise(obj)) return obj;
  //判断传入的是不是generator,或者generator function,是的话,继续调用co函数进行循环~
  if (isGeneratorFunction(obj) || isGenerator(obj)) return co.call(this, obj);
  //如果就是个普通的thunk函数,也把他转化为promise
  if ("function" == typeof obj) return thunkToPromise.call(this, obj);
  //如果是array或者object的话,也走相应地变换方法
  if (Array.isArray(obj)) return arrayToPromise.call(this, obj);
  if (isObject(obj)) return objectToPromise.call(this, obj);
  //如果都不是,直接返回,走最后的报错流程
  return obj;
}
//这里将thunk转化成了promise,thunk就是调用的时候传入一个error和res的function,就在最外面包了个promise就行了
function thunkToPromise(fn) {
  var ctx = this;
  return new Promise(function(resolve, reject) {
    fn.call(ctx, function(err, res) {
      if (err) return reject(err);
      if (arguments.length > 2) res = slice.call(arguments, 1);
      resolve(res);
    });
  });
}
//这里的array转化为promise其实就是通过Promise.all来包裹,这个方法只接受promise的数组,并且装化为一个新的promise
//参见下面的promise平行执行的研究
function arrayToPromise(obj) {
  return Promise.all(obj.map(toPromise, this));
}
//将一个object转化为promise,其实就是内部调用了promise.all方法而已
function objectToPromise(obj) {
  var results = new obj.constructor();
  var keys = Object.keys(obj);
  var promises = [];
  for (var i = 0; i < keys.length; i++) {
    var key = keys[i];
    var promise = toPromise.call(this, obj[key]);
    if (promise && isPromise(promise)) defer(promise, key);
    else results[key] = obj[key];
  }
  return Promise.all(promises).then(function() {
    return results;
  });
  function defer(promise, key) {
    // predefine the key in the result
    results[key] = undefined;
    promises.push(
      promise.then(function(res) {
        results[key] = res;
      })
    );
  }
}
//检查是否是promise,果然就是简单的判断他有没有then方法
function isPromise(obj) {
  return "function" == typeof obj.then;
}
//这里判断是不是generator就是判断他的next和throw方法是不是function
function isGenerator(obj) {
  return "function" == typeof obj.next && "function" == typeof obj.throw;
}
//判断是否是generatorFunction就是判断了他的constructor的name
function isGeneratorFunction(obj) {
  var constructor = obj.constructor;
  //这里是为了解决没有constructor的对象,比如Object.create(null)
  if (!constructor) return false;
  //这里两种情况会返回true,一种是名字正确地,一种是他的prototype是generator
  if ("GeneratorFunction" === constructor.name || "GeneratorFunction" === constructor.displayName) return true;
  return isGenerator(constructor.prototype);
}
//就是通过constructor来判断是不是一个简单的对象
function isObject(val) {
  return Object == val.constructor;
}

图片 3

处理es6模块的引入

promise chaining 导致的内存泄露

这里只是源码的一个小注释,去看了不少东西

阅读了https://github.com/tj/co/issues/180

有人发现在一个无限循环的 for 循环里面使用 co 调用一个异步操作,会发生内存泄露

有人推断是所有的 promise 都被连接了起来,阻止了 gc 的回收

有人测试了基于 thunk 的 v3,发现 ok,v4 发现内存泄露,并且使用工具发现确实是 promise 的问题

死马说这事规范里的问题,bluebird 和 then/promise 已经做出了修复,

最后 hax 说是 es6 spec “bug”

接下来看一个解决方案

就是用一个 promise 从外面包裹住全部,为什么这样有用??!!

接下来阅读https://github.com/promises-aplus/promises-spec/issues/179

promise 的平行执行

promise 被创建的时候就开始了他们的任务,是无法被执行的。他们只代表了结果的状态,将他们传给 promise.all 的时候甚至都是并行执行的。他不关心执行顺序,也不关心是否平行运行。
tudo:看一下 nodejs 的多线程??

Co 的错误处理

这里主要是涉及到 generator.throw 方法,可以在 generator 外部抛出异常,而在 generator 内部来 catch 住异常。

co 里面使用了这个属性,就可以针对某几个 yield 进行 try catch,如果不进行处理,统一的会在后面的 catch 语句中co(*fn).catch找到。

Koa 执行的理解

洋葱模型

请求进来的时候会一次经过各个中间件进行执行,中间件之间的跳转是 yield next,执行完了之后就会逆序执行。

app.use(function*(next) {
  var start = new Date();
  //执行到这句话的时候跳到下一个中间件
  yield next;
  //下面的中间件执行完了之后再执行下面的部分
  var ms = new Date() - start;
  console.log("%s %s - %s", this.method, this.url, ms);
});

写两个 yield next 会有什么问题?

这里遇到 yield next 其实还是会继续向下执行下一个 generator 的,但是因为下一个中间件 done 的状态已经是 true 了,再次调用一下此前已经执行完的 generator,调用返回的结果肯定还是 done 为 true,因为此前已经执行完了。所以后面继续 yield next 是没有意义的~~

Koa 的中间件是运行在 co 函数之下的。

Koa 的中间件的实现

这里看到了一个 Koa 中间件的实现

var gens = [];
function use(generetor) {
  gens.push(generetor);
}
// 实现co函数
function co(flow, isGenerator) {
  var gen;
  if (isGenerator) {
    gen = flow;
  } else {
    gen = flow();
  }
  return new Promise(function(resolve) {
    var next = function(data) {
      var result = gen.next(data);
      var value = result.value;
      // 如果调用完毕,调用resolve
      if (result.done) {
        resolve(value);
        return;
      }
      // 如果为yield后面接的为generator,传入co进行递归,并且将promise返回
      if (typeof value.next === "function" && typeof value.throw === "function") {
        value = co(value, true);
      }
      if (value.then) {
        // 当promise执行完毕,调用next处理下一个yield
        value.then(function(data) {
          next(data);
        });
      }
    };
    next();
  });
}
function trigger() {
  var prev = null;
  var m = gens.length;
  co(function*() {
    while (m--) {
      // 形成链式generator
      prev = gens[m].call(null, prev);
    }
    // 执行最外层generator方法
    yield prev;
  });
}
use(function*(next) {
  var d = yield new Promise(function(resolve) {
    setTimeout(function() {
      resolve("step1");
    }, 1000);
  });
  console.log(d);
  yield next;
  console.log("step2");
});
use(function*(next) {
  console.log("step3");
  yield next;
  var d = yield new Promise(function(resolve) {
    setTimeout(function() {
      resolve("step4");
    }, 1000);
  });
  console.log(d);
});
use(function*() {
  var d = yield new Promise(function(resolve) {
    setTimeout(function() {
      resolve("step5");
    }, 1000);
  });
  console.log(d);
  console.log("step6");
});
trigger();

tudo:啥叫链式的 generator??

Koa 的运行顺序图

Koa的运行顺序

这张图非常详细了,原来本身的 respond,以及自己定义的一些中间件统一的会被整成一个 generator,然后交给 co 来执行。

tudo:看到的一个博主说是这里模拟的 async 和 await,这里可以去了解一下??

todu:那个内存泄露的问题看不懂啊,操!!先不管了,要死了

本文引用:

http://segmentfault.com/a/1190000002783230

http://www.ruanyifeng.com/blog/2015/05/thunk.html

http://www.cnblogs.com/axes/p/4683176.html

http://purplebamboo.github.io/2015/01/16/koa-source-analytics-4/


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 981909093@qq.com

文章标题:Co源码以及与Koa的深入理解

文章字数:3.7k

本文作者:泽鹿

发布时间:2019-08-28, 16:45:23

最后更新:2019-08-28, 16:45:23

原始链接:http://panyifei.github.io/2019/08/28/前端技术/nodejs/Nodejs框架模块/Co源码以及与Koa的深入理解/

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

目录
×

喜欢就点赞,疼爱就打赏