c4se記:さっちゃんですよ☆

.。oO(さっちゃんですよヾ(〃l _ l)ノ゙☆)

.。oO(此のblogは、主に音樂考察Programming に分類されますよ。ヾ(〃l _ l)ノ゙♬♪♡)

音樂は SoundCloud に公開中です。

考察は現在は主に Scrapbox で公表中です。

Programming は GitHub で開發中です。

node.jsでCPUを使う重い処理のlibrary (child process使い) を1 fileで作る

追記 2013-12-11
問題と対策を書きました。
node.jsでChild Processと大量のdataをやりとりする問題 http://c4se.hatenablog.com/entry/2013/12/11/231318
node.jsでnon-blockingなのはIO丈なので、single threadのnode.jsでは、重い計算関数は矢張り他の処理をblockする。なんたってsingle threadなので。node.jsで重い計算処理をnon-blockingでやるには、Child Process moduleを使う。child_process.fork(filepath)で、node.jsをもう一つ立ち上げ、messageをやり取りする事ができる。子のnode.js processが一生懸命計算してゐる間、親のnode.js processは、子からのmessage eventを待ちつつ、他の処理を行える。Web Workerと同じだ。
Web Workerと同じ様に、fork()には、child processとして立ち上げるentry pointと成るjs fileのpathを渡してやる必要が有る。通常は親processと子processで全く別のjs fileに書けば好いのだが、めんどくさい事が有る。偶に。npmに公開する程じゃないcodeの時とか。
其の時は、自身のfileを読み込み直してやれば好い。__filenameで自身のfile pathが取れる。因みに__dirnameで自身のdirectory pathが取れる。親と子のmessageの形式をちゃんと分けてやれば、問題無く通信できる。

heavyCalcと云ふ重い計算関数が有るとする。requestHeavyCalc()関数で、自身のfileをchild processとして読み直してあげて、計算requestをsend()する。
messageは、親から子へ渡す形式にはrequestと云うkeyが、子から親へ渡す形式にはresultと云うkeyを含んでゐる。実際、形式を区別しなくても支障は無いが、人間の頭が混乱する。

/**
 * @module heavyCalc
 */

'use strict';

var cp = require('child_process');

process.on('message', function(message) {
  var result;

  if (message.request) {
    result = heavyCalc(message.request);
    process.send({ok: true, result: result});
    process.exit(0);
  }
});

/**
 * @static
 * @param {Object} request
 * @param {function(?Error,?Object)} callback function(?Error,?Object)
 */
function requestHeavyCalc(request, callback) {
  var n = cp.fork(__filename);

  n.once('message', function(message) {
    var error = message.error,
        result = message.result;

    if (! result && ! error) { error = new Error('Result is invalid.'); }
    if (error) { callback(error, result); return; }
    callback(null, result);
  });
  n.on('error', function(error) { callback(error, null); });
  n.send({request: request});
}

/*
 * @param {Object} request
 * @return {Object}
 */
function heavyCalc(request) {
  var result;

  // Process heavy calculation here.
  return result;
}

module.exports = requestHeavyCalc;

実例 (random・ゼロ詰め・1詰めのbuffer poolを作る)

因みに此んなcodeで使ってる。random・ゼロ詰め・1詰めのbuffer poolを作る。
./fs.jsは、先日のnode.jsで非同期にdirectory中の全てのfileを読み込む

data_pool.js

'use strict';

var path = require('path'),
    cp = require('child_process'),
    fs = require('./fs.js');
    /* @const */
var MAX_POOL_SIZE = 1000;
    /* @type { {zero:Object.<number,Buffer>,one:Object.<number,Buffer>} } */
var cache = {zero: { }, one: { }};

process.on('message', function (message) {
  if (message.create) {
    switch (message.type) {
    case 'random':
      createRandomPool(message.size);
      break;
    case 'zero':
      createZeroFillPool(message.size);
      break;
    case 'one':
      createOneFillPool(message.size);
      break;
    default:
      throw new Error('Unknown creation type: ' + message.type);
    }
  }
});

/**
 * Create customisable data pools you want.
 *
 * @constructor
 */
function DataPool() {
  if (! (this instanceof DataPool)) {
    return new DataPool();
  }
}

/**
 * Create a random data pool.
 *
 * @example
 * new DataPool().createRandom(1024, function (error, pool) {
 *   if (error) { console.log(error); return; }
 *   console.log(pool);
 * });
 *
 * // Create a random IMIX (Internet MIX) data pool.
 * new DataPool().createRandom([
 *   [40,   7 / 12],
 *   [576,  4 / 12],
 *   [1500, 1 / 12]
 * ], function (error, pool) {
 *   if (error) { console.log(error); return; }
 *   console.log(pool);
 * });
 *
 * @param {(number|Array.<number[]>)} size Size: number or [size: number, ratio: number][].
 * @param {function(?Error,Buffer[])} callback
 */
DataPool.prototype.createRandom = function (size, callback) {
  createPoolAsync(size, callback, 'random');
};

/**
 * Create a data pool filled with zero.
 *
 * @param {(number|Array.<number[]>)} size Size: number or [size: number, ratio: number][].
 * @param {function(?Error,Buffer[])} callback
 */
DataPool.prototype.createZeroFill = function (size, callback) {
  if (Array.isArray(size)) {
    createPoolAsync(size, callback, 'zero');
  } else {
    setImmediate(function () {
      var pool = new Array(MAX_POOL_SIZE),
          i = 0, buffer;

      buffer = new Buffer(createZeroFillBuffer(size));
      for (i = 0; i < MAX_POOL_SIZE; ++i) { pool[i] = buffer; }
      callback(null, pool);
    });
  }
};

/**
 * Create a data pool filled with one.
 *
 * @param {(number|Array.<number[]>)} size Size: number or [size: number, ratio: number][].
 * @param {function(?Error,Buffer[])} callback
 */
DataPool.prototype.createOneFill = function (size, callback) {
  if (Array.isArray(size)) {
    createPoolAsync(size, callback, 'one');
  } else {
    setImmediate(function () {
      var pool = new Array(MAX_POOL_SIZE),
          i = 0, buffer;

      buffer = new Buffer(createOneFillBuffer(size));
      for (i = 0; i < MAX_POOL_SIZE; ++i) { pool[i] = buffer; }
      callback(null, pool);
    });
  }
};

/**
 * @param {string} filepath
 * @param {function(?Error,Buffer[])} callback
 */
DataPool.prototype.createFromFile = function (filepath, callback) {
  filepath = path.join(__dirname, '../../public/', filepath);
  fs.readAllFiles(filepath, function (error, files) {
    if (error) { callback(error, files); return; }
    callback(null, files);
  }.bind(this));
};

/*
 * @param {(number|Array.<number[]>)} size
 * @param {function(?Error,?Buffer[])} callback
 * @param {string} datatype 'random'|'zero'|'one'
 */
function createPoolAsync(size, callback, datatype) {
  var pool = [ ],
      n = cp.fork(__filename),
      onEnd;

  onEnd = function () {
    onEnd = function () { };
    callback(null, pool);
  };

  n.on('message', function (message) {
    var error = message.error,
        data = message.data;

    if (! data && ! error) {
      error = new Error('DataPool data is invalid.');
    } else {
      data = new Buffer(data);
      pool.push(data);
    }
    if (error) {
      n.disconnect();
      callback(error, pool);
      return;
    }
  });
  n.on('error', function (error) {
    onEnd = function () { };
    callback(error, null);
  });
  n.on('exit', function () { onEnd(); }).
    on('close', function () { onEnd(); });
  n.send({create: true, type: datatype, size: size});
}

/*
 * Create a random data pool at background.
 *
 * @param {(number|Array.<number[]>)} size
 */
function createRandomPool(size) {
  var i = 0, thisSize;

  if (Array.isArray(size)) {
    thisSize = selectSize.bind(null, size);
    size = null;
  }
  for (i = 0; i < MAX_POOL_SIZE; ++i) {
    process.send({data: createRandomBuffer(size || thisSize())});
  }
  process.exit(0);
}

/*
 * Create a zero filled data poll at background.
 *
 * @param {Array.<number[]>} size
 */
function createZeroFillPool(size) {
  var i = 0;

  for (i = 0; i < MAX_POOL_SIZE; ++i) {
    process.send({data: createZeroFillBuffer(selectSize(size))});
  }
  process.exit(0);
}

/*
 * Create a one filled data poll at background.
 *
 * @param {Array.<number[]>} size
 */
function createOneFillPool(size) {
  var i = 0;

  for (i = 0; i < MAX_POOL_SIZE; ++i) {
    process.send({data: createOneFillBuffer(selectSize(size))});
  }
  process.exit(0);
}

/*
 * @param {Array.<number[]>} size
 * @return {number}
 */
function selectSize(size) {
  var i = 0, iz = 0, sum = 0, rand = 0;

  sum = size.reduce(function (sum, elm) { return sum + elm[1]; }, 0);
  rand = Math.random() * sum;
  sum = 0;
  for (i = 0, iz = size.length; i < iz; ++i) {
    sum += size[i][1];
    if (rand <= sum) { return size[i][0]; }
  }
  return size[size.length - 1][0];
}

/*
 * Create a random buffer.
 *
 * @param {number} size
 * @return {number[]}
 */
function createRandomBuffer(size) {
  var i = size, buffer = new Array(size);

  while (i) { buffer[-- i] = Math.floor(Math.random() * 0xff); }
  return buffer;
}

/*
 * Create a buffer filled with zero.
 *
 * @param {number}
 * @return {number[]}
 */
function createZeroFillBuffer(size) {
  var _cache = cache.zero,
      i = 0, buffer;

  if (_cache[size]) { return _cache[size]; }
  buffer = new Array(size);
  i = size;
  while (i) { buffer[-- i] = 0; }
  _cache[size] = buffer;
  return buffer;
}

/*
 * Create a buffer filled with one.
 *
 * @param {number}
 * @return {number[]}
 */
function createOneFillBuffer(size) {
  var _cache = cache.one,
      i = 0, buffer;

  if (_cache[size]) { return _cache[size]; }
  buffer = new Array(size);
  i = size;
  while (i) { buffer[-- i] = 0xff; }
  _cache[size] = buffer;
  return buffer;
}

module.exports = DataPool;

data_pool_test.js

'use strict';

var factory = require('../factory.js')();

exports.tearDown = function(callback) {
  factory.destroy();
  callback();
};

exports.testDataPoolCanCreateZeroFilledPool = function(test) {
  test.expect(4);
  factory.create('DataPool.zero', 1024).then(function(pool) {
    test.ok(pool.length >= 1000);
    test.ok(pool[0] instanceof Buffer);
    test.equal(pool[0].length, 1024);
    test.ok(Array.prototype.slice.call(pool[1], 0).
      every(function(data) { return data === 0; }));
    test.done();
  }).done();
};

exports.testDataPoolCanCreateOneFilledPool = function(test) {
  test.expect(4);
  factory.create('DataPool.one', 2048).then(function(pool) {
    test.ok(pool.length >= 1000);
    test.ok(pool[0] instanceof Buffer);
    test.equal(pool[0].length, 2048);
    test.ok(Array.prototype.slice.call(pool[1], 0).
      every(function(data) { return data === 0xff; }));
    test.done();
  }).done();
};

exports.testDataPoolGenerateCorrectImixDatas = function(test) {
  test.expect(6);
  factory.create('DataPool.imix').then(function(pool) {
    test.ok(pool[0] instanceof Buffer);
    test.ok(Array.prototype.slice.call(pool[0], 0).
      some(function(b) { return 0 < b && b < 0xff; }));
    test.ok(pool.some(function(data) { return data.length === 40; }));
    test.ok(pool.some(function(data) { return data.length === 576; }));
    test.ok(pool.some(function(data) { return data.length === 1500; }));
    test.ok(pool.every(function(data) {
      return data.length === 40 ||
             data.length === 576 ||
             data.length === 1500;
    }));
    test.done();
  }).fail(function(error) {
    test.ifError(error);
    test.done();
  }).done();
};