追記 2013-12-11node.jsでnon-blockingなのはIO丈なので、single threadのnode.jsでは、重い計算関数は矢張り他の処理をblockする。なんたってsingle threadなので。node.jsで重い計算処理をnon-blockingでやるには、Child Process moduleを使う。
問題と対策を書きました。
node.jsでChild Processと大量のdataをやりとりする問題 http://c4se.hatenablog.com/entry/2013/12/11/231318
child_process.fork(filepath)
で、node.jsをもう一つ立ち上げ、messageをやり取りする事ができる。子のnode.js processが一生懸命計算してゐる間、親のnode.js processは、子からのmessage eventを待ちつつ、他の処理を行える。Web Workerと同じだ。Web Workerと同じ様に、
fork()
には、child processとして立ち上げるentry pointと成るjs fileのpathを渡してやる必要が有る。通常は親processと子processで全く別のjs fileに書けば好いのだが、めんどくさい事が有る。偶に。npmに公開する程じゃないcodeの時とか。其の時は、自身のfileを読み込み直してやれば好い。
__filename
で自身のfile pathが取れる。因みに__dirname
で自身のdirectory pathが取れる。親と子のmessageの形式をちゃんと分けてやれば、問題無く通信できる。
例
heavyCalc
と云ふ重い計算関数が有るとする。requestHeavyCalc()
関数で、自身のfileをchild processとして読み直してあげて、計算requestをsend()
する。
messageは、親から子へ渡す形式にはrequest
と云うkeyが、子から親へ渡す形式にはresult
と云うkeyを含んでゐる。実際、形式を区別しなくても支障は無いが、人間の頭が混乱する。
/** * @module heavyCalc */ 'use strict'; var cp = require('child_process'); process.on('message', function(message) { var result; if (message.request) { result = heavyCalc(message.request); process.send({ok: true, result: result}); process.exit(0); } }); /** * @static * @param {Object} request * @param {function(?Error,?Object)} callback function(?Error,?Object) */ function requestHeavyCalc(request, callback) { var n = cp.fork(__filename); n.once('message', function(message) { var error = message.error, result = message.result; if (! result && ! error) { error = new Error('Result is invalid.'); } if (error) { callback(error, result); return; } callback(null, result); }); n.on('error', function(error) { callback(error, null); }); n.send({request: request}); } /* * @param {Object} request * @return {Object} */ function heavyCalc(request) { var result; // Process heavy calculation here. return result; } module.exports = requestHeavyCalc;
実例 (random・ゼロ詰め・1詰めのbuffer poolを作る)
因みに此んなcodeで使ってる。random・ゼロ詰め・1詰めのbuffer poolを作る。
./fs.jsは、先日のnode.jsで非同期にdirectory中の全てのfileを読み込む。
data_pool.js
'use strict'; var path = require('path'), cp = require('child_process'), fs = require('./fs.js'); /* @const */ var MAX_POOL_SIZE = 1000; /* @type { {zero:Object.<number,Buffer>,one:Object.<number,Buffer>} } */ var cache = {zero: { }, one: { }}; process.on('message', function (message) { if (message.create) { switch (message.type) { case 'random': createRandomPool(message.size); break; case 'zero': createZeroFillPool(message.size); break; case 'one': createOneFillPool(message.size); break; default: throw new Error('Unknown creation type: ' + message.type); } } }); /** * Create customisable data pools you want. * * @constructor */ function DataPool() { if (! (this instanceof DataPool)) { return new DataPool(); } } /** * Create a random data pool. * * @example * new DataPool().createRandom(1024, function (error, pool) { * if (error) { console.log(error); return; } * console.log(pool); * }); * * // Create a random IMIX (Internet MIX) data pool. * new DataPool().createRandom([ * [40, 7 / 12], * [576, 4 / 12], * [1500, 1 / 12] * ], function (error, pool) { * if (error) { console.log(error); return; } * console.log(pool); * }); * * @param {(number|Array.<number[]>)} size Size: number or [size: number, ratio: number][]. * @param {function(?Error,Buffer[])} callback */ DataPool.prototype.createRandom = function (size, callback) { createPoolAsync(size, callback, 'random'); }; /** * Create a data pool filled with zero. * * @param {(number|Array.<number[]>)} size Size: number or [size: number, ratio: number][]. * @param {function(?Error,Buffer[])} callback */ DataPool.prototype.createZeroFill = function (size, callback) { if (Array.isArray(size)) { createPoolAsync(size, callback, 'zero'); } else { setImmediate(function () { var pool = new Array(MAX_POOL_SIZE), i = 0, buffer; buffer = new Buffer(createZeroFillBuffer(size)); for (i = 0; i < MAX_POOL_SIZE; ++i) { pool[i] = buffer; } callback(null, pool); }); } }; /** * Create a data pool filled with one. * * @param {(number|Array.<number[]>)} size Size: number or [size: number, ratio: number][]. * @param {function(?Error,Buffer[])} callback */ DataPool.prototype.createOneFill = function (size, callback) { if (Array.isArray(size)) { createPoolAsync(size, callback, 'one'); } else { setImmediate(function () { var pool = new Array(MAX_POOL_SIZE), i = 0, buffer; buffer = new Buffer(createOneFillBuffer(size)); for (i = 0; i < MAX_POOL_SIZE; ++i) { pool[i] = buffer; } callback(null, pool); }); } }; /** * @param {string} filepath * @param {function(?Error,Buffer[])} callback */ DataPool.prototype.createFromFile = function (filepath, callback) { filepath = path.join(__dirname, '../../public/', filepath); fs.readAllFiles(filepath, function (error, files) { if (error) { callback(error, files); return; } callback(null, files); }.bind(this)); }; /* * @param {(number|Array.<number[]>)} size * @param {function(?Error,?Buffer[])} callback * @param {string} datatype 'random'|'zero'|'one' */ function createPoolAsync(size, callback, datatype) { var pool = [ ], n = cp.fork(__filename), onEnd; onEnd = function () { onEnd = function () { }; callback(null, pool); }; n.on('message', function (message) { var error = message.error, data = message.data; if (! data && ! error) { error = new Error('DataPool data is invalid.'); } else { data = new Buffer(data); pool.push(data); } if (error) { n.disconnect(); callback(error, pool); return; } }); n.on('error', function (error) { onEnd = function () { }; callback(error, null); }); n.on('exit', function () { onEnd(); }). on('close', function () { onEnd(); }); n.send({create: true, type: datatype, size: size}); } /* * Create a random data pool at background. * * @param {(number|Array.<number[]>)} size */ function createRandomPool(size) { var i = 0, thisSize; if (Array.isArray(size)) { thisSize = selectSize.bind(null, size); size = null; } for (i = 0; i < MAX_POOL_SIZE; ++i) { process.send({data: createRandomBuffer(size || thisSize())}); } process.exit(0); } /* * Create a zero filled data poll at background. * * @param {Array.<number[]>} size */ function createZeroFillPool(size) { var i = 0; for (i = 0; i < MAX_POOL_SIZE; ++i) { process.send({data: createZeroFillBuffer(selectSize(size))}); } process.exit(0); } /* * Create a one filled data poll at background. * * @param {Array.<number[]>} size */ function createOneFillPool(size) { var i = 0; for (i = 0; i < MAX_POOL_SIZE; ++i) { process.send({data: createOneFillBuffer(selectSize(size))}); } process.exit(0); } /* * @param {Array.<number[]>} size * @return {number} */ function selectSize(size) { var i = 0, iz = 0, sum = 0, rand = 0; sum = size.reduce(function (sum, elm) { return sum + elm[1]; }, 0); rand = Math.random() * sum; sum = 0; for (i = 0, iz = size.length; i < iz; ++i) { sum += size[i][1]; if (rand <= sum) { return size[i][0]; } } return size[size.length - 1][0]; } /* * Create a random buffer. * * @param {number} size * @return {number[]} */ function createRandomBuffer(size) { var i = size, buffer = new Array(size); while (i) { buffer[-- i] = Math.floor(Math.random() * 0xff); } return buffer; } /* * Create a buffer filled with zero. * * @param {number} * @return {number[]} */ function createZeroFillBuffer(size) { var _cache = cache.zero, i = 0, buffer; if (_cache[size]) { return _cache[size]; } buffer = new Array(size); i = size; while (i) { buffer[-- i] = 0; } _cache[size] = buffer; return buffer; } /* * Create a buffer filled with one. * * @param {number} * @return {number[]} */ function createOneFillBuffer(size) { var _cache = cache.one, i = 0, buffer; if (_cache[size]) { return _cache[size]; } buffer = new Array(size); i = size; while (i) { buffer[-- i] = 0xff; } _cache[size] = buffer; return buffer; } module.exports = DataPool;
data_pool_test.js
'use strict'; var factory = require('../factory.js')(); exports.tearDown = function(callback) { factory.destroy(); callback(); }; exports.testDataPoolCanCreateZeroFilledPool = function(test) { test.expect(4); factory.create('DataPool.zero', 1024).then(function(pool) { test.ok(pool.length >= 1000); test.ok(pool[0] instanceof Buffer); test.equal(pool[0].length, 1024); test.ok(Array.prototype.slice.call(pool[1], 0). every(function(data) { return data === 0; })); test.done(); }).done(); }; exports.testDataPoolCanCreateOneFilledPool = function(test) { test.expect(4); factory.create('DataPool.one', 2048).then(function(pool) { test.ok(pool.length >= 1000); test.ok(pool[0] instanceof Buffer); test.equal(pool[0].length, 2048); test.ok(Array.prototype.slice.call(pool[1], 0). every(function(data) { return data === 0xff; })); test.done(); }).done(); }; exports.testDataPoolGenerateCorrectImixDatas = function(test) { test.expect(6); factory.create('DataPool.imix').then(function(pool) { test.ok(pool[0] instanceof Buffer); test.ok(Array.prototype.slice.call(pool[0], 0). some(function(b) { return 0 < b && b < 0xff; })); test.ok(pool.some(function(data) { return data.length === 40; })); test.ok(pool.some(function(data) { return data.length === 576; })); test.ok(pool.some(function(data) { return data.length === 1500; })); test.ok(pool.every(function(data) { return data.length === 40 || data.length === 576 || data.length === 1500; })); test.done(); }).fail(function(error) { test.ifError(error); test.done(); }).done(); };