当我们谈起 nodejs 时,由于 JavaScript 只能在单线程上运行, 导致 一个 Node 进程只能运行在一个CPU上, 无法发挥现代 CPU 多核的特性。 这对于一个 服务端语言来说, 是比较掣肘其发展的。 好在 Node 在 v0.10 后, 可以使用 Cluster 模块搭建 多进程服务, 并在 v0.12 重写了该模块, 大幅提高其性能, 下面我们将走进 Node-Cluster ,看看 Node 是如何实现集群的。
首先,这涉及一个 程序设计 的两种模型 本文提及的 Nodejs 便是使用 多进程单线程 实现的, 而 Java 等语言 则使用 单进程多线程 模型 故本文着重 多进程单线程 模型, 至于 单进程多线程 模型,则会在比较中稍微涉及。
正如上文说到, JavaScript 只能在单线程中运行, 故 Node 在 JS层面只能 在某进程的一个线程中运行。 若要实现集群, 则必须创建多个进程, 以实现多个应用实例同时运行。
Cluster 模块, 提供了 master-worker 模式 启动多个应用模式。
接下来我们就走入这个模块, 看看其内部具体做了哪些事情。
Cluster 是什么?
其中:
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
// Fork workers.
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', function(worker, code, signal) {
console.log('worker ' + worker.process.pid + ' died');
});
} else {
// Workers can share any TCP connection
// In this case it is an HTTP server
http.createServer(function(req, res) {
res.writeHead(200);
res.end("hello world\n");
}).listen(8000);
}
使用 Cluster 模块的 fork 方法来创建出子进程
cluster.fork();
先从 worker 进程的初始化开始看, master 进程在 fork 其 worker进程时 会在其环境变量(workerEnv)中 附加上一个 唯一ID(NODE UNIQUE ID) 该ID 是一个从 0 开始的递增数。
var ids = 0;
//.....
cluster.fork = function(env) {
cluster.setupMaster();
const id = ++ids;
const workerProcess = createWorkerProcess(id, env);
const worker = new Worker({
id: id,
process: workerProcess
});
// ....
function createWorkerProcess(id, env) {
const workerEnv = util._extend({}, process.env);
const execArgv = cluster.settings.execArgv.slice();
const debugArgRegex = /--inspect(?:-brk|-port)?|--debug-port/;
util._extend(workerEnv, env);
workerEnv.NODE_UNIQUE_ID = '' + id;
// .....
return fork(cluster.settings.exec, cluster.settings.args, {
cwd: cluster.settings.cwd,
env: workerEnv,
silent: cluster.settings.silent,
windowsHide: cluster.settings.windowsHide,
execArgv: execArgv,
stdio: cluster.settings.stdio,
gid: cluster.settings.gid,
uid: cluster.settings.uid
});
}
然后 Node 在实例初始化时,使用该 ID 判断使用 clild.js Or master.js
const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';
module.exports = require(`internal/cluster/${childOrMaster}`);
把目光再集中到 这个 fork() 函数中来,
exports.fork = function(modulePath /*, args, options*/) {
// Get options and args arguments.
var execArgv;
var options = {};
var args = [];
var pos = 1;
if (pos < arguments.length && Array.isArray(arguments[pos])) {
args = arguments[pos++];
}
if (pos < arguments.length && arguments[pos] != null) {
if (typeof arguments[pos] !== 'object') {
throw new TypeError('Incorrect value of args option');
}
options = util._extend({}, arguments[pos++]);
}
// Prepare arguments for fork:
execArgv = options.execArgv || process.execArgv;
if (execArgv === process.execArgv && process._eval != null) {
const index = execArgv.lastIndexOf(process._eval);
if (index > 0) {
// Remove the -e switch to avoid fork bombing ourselves.
execArgv = execArgv.slice();
execArgv.splice(index - 1, 2);
}
}
args = execArgv.concat([modulePath], args);
if (typeof options.stdio === 'string') {
options.stdio = stdioStringToArray(options.stdio);
} else if (!Array.isArray(options.stdio)) {
// Use a separate fd=3 for the IPC channel. Inherit stdin, stdout,
// and stderr from the parent if silent isn't set.
options.stdio = options.silent ? stdioStringToArray('pipe') :
stdioStringToArray('inherit');
} else if (options.stdio.indexOf('ipc') === -1) {
throw new TypeError('Forked processes must have an IPC channel');
}
options.execPath = options.execPath || process.execPath;
options.shell = false;
return spawn(options.execPath, args, options);
};
在函数中 做了一些 参数准备,而重点在于 对 这个 options.stdio 的处理。
options.stdio 用于配置子进程与父进程之间建立的管道, 其值应该为一个数组, 但是为了方便, 值可以是以下的字符串之一:
'pipe' - 等同于 ['pipe', 'pipe', 'pipe'] (默认)
'ignore' - 等同于 ['ignore', 'ignore', 'ignore']
'inherit' - 等同于 [process.stdin, process.stdout, process.stderr] 或 [0,1,2]
其每个值 分别对应 [process.stdin, process.stdout, process.stderr] 标准输入、标准输出、标准错误 输出到父进程的方式。
而使用 Fork 方式 衍生出的子进程,又必须加上 一个 IPC通道 用于父子间 传递消息或文件描述符, 故 stdioStringToArray 函数代码如下
function stdioStringToArray(option) {
switch (option) {
case 'ignore':
case 'pipe':
case 'inherit':
return [option, option, option, 'ipc'];
default:
throw new TypeError('Incorrect value of stdio option: ' + option);
}
}
下面我们就重点来看,这个 IPC通道 是如何实现的,以及其工作原理
进程间通信(Inter-process communication, IPC)其实是个很简单的概念,只要你将这个进程的数据传到 另外一个进程就是 IPC 了, 要实现 这个数据传递的方式有非常的多, 如以下
在 Node 中 IPC 实现分两种, 在 Windows 上通过 命名管道, 在 UNIX 上则使用 UNIX domain sockets (UDS), 详情参见 官方文档
目前 Linux 还是主流的服务端操作系统, 主要分析下在 Linux 下 UDS 的使用方式
UDS 是在 Socket 的基础上发展而来的, Socket 一般我们指的都是 IP Socket , 通过网络协议进行通信, 但是在同一台设备上的通信,是否可以绕开网络层的限制呢。 这里我们就可以通过 UDS 来实现, 所以把 它称之为 LocalSocket ,看起来更贴切一点。
在 Linux 中, 一切都可以当做是 文件。 UDS 也不例外。
在 Node 的 child_process.js 模块中, 有如下代码
stdio = stdio.reduce(function(acc, stdio, i) {
// ......
else if (stdio === 'ipc') {
if (sync || ipc !== undefined) {
// Cleanup previously created pipes
cleanup();
if (!sync)
throw new errors.Error('ERR_IPC_ONE_PIPE');
else
throw new errors.Error('ERR_IPC_SYNC_FORK');
}
ipc = new Pipe(PipeConstants.IPC);
ipcFd = i;
acc.push({
type: 'pipe',
handle: ipc,
ipc: true
});
}
当 stdio 的 类型为 ipc 时,会创建一个 ipc 管道, 其fd 为 'ipc' 在stdio数组 中的索引
ipc = new Pipe(PipeConstants.IPC);
ipcFd = i;
此时目光应该被这个 Pipe 所吸引了吧, 那么它又是什么呢, 话不多少直接上 libuv 中对应 Pipe 的实现。
void PipeWrap::New(const FunctionCallbackInfo<Value>& args) {
// This constructor should not be exposed to public javascript.
// Therefore we assert that we are not trying to call this as a
// normal function.
CHECK(args.IsConstructCall());
CHECK(args[0]->IsInt32());
Environment* env = Environment::GetCurrent(args);
int type_value = args[0].As<Int32>()->Value();
PipeWrap::SocketType type = static_cast<PipeWrap::SocketType>(type_value);
bool ipc;
ProviderType provider;
switch (type) {
case SOCKET:
provider = PROVIDER_PIPEWRAP;
ipc = false;
break;
case SERVER:
provider = PROVIDER_PIPESERVERWRAP;
ipc = false;
break;
case IPC:
provider = PROVIDER_PIPEWRAP;
ipc = true;
break;
default:
UNREACHABLE();
}
new PipeWrap(env, args.This(), provider, ipc);
}
PipeWrap::PipeWrap(Environment* env,
Local<Object> object,
ProviderType provider,
bool ipc)
: ConnectionWrap(env, object, provider) {
int r = uv_pipe_init(env->event_loop(), &handle_, ipc);
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
// Suggestion: uv_pipe_init() returns void.
UpdateWriteQueueSize();
}
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
handle->shutdown_req = NULL;
handle->connect_req = NULL;
handle->pipe_fname = NULL;
handle->ipc = ipc;
return 0;
}
void uv__stream_init(uv_loop_t* loop,
uv_stream_t* stream,
uv_handle_type type) {
int err;
uv__handle_init(loop, (uv_handle_t*)stream, type);
stream->read_cb = NULL;
stream->alloc_cb = NULL;
stream->close_cb = NULL;
stream->connection_cb = NULL;
stream->connect_req = NULL;
stream->shutdown_req = NULL;
stream->accepted_fd = -1;
stream->queued_fds = NULL;
stream->delayed_error = 0;
QUEUE_INIT(&stream->write_queue);
QUEUE_INIT(&stream->write_completed_queue);
stream->write_queue_size = 0;
if (loop->emfile_fd == -1) {
err = uv__open_cloexec("/dev/null", O_RDONLY);
if (err < 0)
/* In the rare case that "/dev/null" isn't mounted open "/"
* instead.
*/
err = uv__open_cloexec("/", O_RDONLY);
if (err >= 0)
loop->emfile_fd = err;
}
#if defined(__APPLE__)
stream->select = NULL;
#endif /* defined(__APPLE_) */
uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}
可以看到对应 ipc 管道, 底层是通过文件流的方式实现的。 那么这个管道 就一样拥有 和 stream 一样的模式,即 open—write/read—close
父进程在实际创建子进程前,会创建IPC通道并监听它,然后才真正创建出子进程,并通过环境变量(NODE CHANNEL FD)告诉子进程这个IPC通信的文件描述符(fd)。子进程在启动的过程中,根据文件描述符去连接这个已存在的IPC通道,从而完成父子进程之间的连接。
当父进程 send 数据到子进程时, 便通过这个 fd 向 这个特殊的文件开始写入数据,此时调用底层stream 的 write 方法,而子进程由于在启动的过程中便已经连接上 该通道, 在应用层通过 message 事件(底层应是 stream 的 read方法),来接收数据。
由于这个IPC通道是 双工通信的, 故子进程也可以实现向父进程通信。
那么 node 又是如何实现多进程 监听同一个端口呢,这里的关键在于 句柄传递 。
如下代码:
//主进程代码
var child = require('child_process').fork('child.js');
// Open up the server object and send the handle
var server = require('net').createServer();
server.on('connection', function (socket) {
socket.end('handled by parent\n');
});
server.listen(1337, function () {
child.send('server', server);
});
//子进程代码
process.on('message', function (m, server) {
if (m === 'server') {
server.on('connection', function (socket) {
socket.end('handled by child\n');
});
}
});
在示例中,父进程直接将创建出来的 Tcp对象 传递到子进程中,但是我们知道两个进程之间又是无法直接共享内存的, 那么这又是怎么实现的呢?
来看这个 send方法
child.send(message, [sendHandle])
目前,子进程对象send()方法可以发送的句柄类型包括如下几种:
1.net.socket,tcp套接字
2.net.Server,tcp服务器,任意建立在tcp服务上的应用层服务都可以享受到它带来的好处。
3.net.Native,c++层面的tcp套接字或IPC管道。
4.dgram.socket,UDP套接字
5.dgram.Native,C++层面的UDP套接字
send()方法在将消息发送到IPC管道前,将消息组装成两个对象,一个参数是handle,另一个是message, 而发送到子进程的 实际上是这个句柄的 文件描述符, message对象也会序列化为字符串。
而子进程由于之前连接了IPC通道, 可以读取到父进程发送的消息。获取到这个消息后, 通过JSON.parse 还原为对象, 并分析 其中的cmd值, 若 message.cmd = NODE_HANDLE, 则表示父进程传递的是一个句柄,此时便会通过 获取到的 fd 和传递的句柄类型,还原出这个Tcp对象。
还原出这个server 对象后, 便是去监听这个端口了。
setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))
而在 libuv 中在 setsockopt 时设置了 SO_REUSEADDR 选项(关于这个选项的含义可以自行百度),简而言之,设置了这个选项后, 在Linux层面允许我们使用不同的进程 就相同的网卡和端口进行监听, 对于独立启动的进程,并不知道彼此的fd, 所以当一个进程监听成功后, 后面的进程便会失败,但是 通过 send 方式还原出来的 server对象,他们的fd是相同的,此时该子进程就可以通过这个fd 去监听这个端口。
但是 fd 在同一时刻只能被一个进程所占用, 换言之就是网络请求向服务器端发送时,只有一个幸运的进程能够抢到连接,也就是说只有他能为这个请求进行服务。这些进程也都是抢占式的。
随着这些模块逐渐完善, Nodejs 在服务端的使用场景也越来越丰富,如果你仅仅是因为JS 这个后缀而注意到它的话, 那么我希望你能暂停脚步,好好了解一下这门年轻的语言,相信它会给你带来惊喜。
https://github.com/nodejs/node
https://nodejs.org/dist/latest-v10.x/docs/api/net.html#net
https://www.jianshu.com/p/335a9e101c3f
http://docs.libuv.org/en/v1.x/stream.html
原文: https://blog.souche.com/untitled-25/
在日常 Coding 中,码农们肯定少不了对数组的操作,其中很常用的一个操作就是对数组进行遍历,查看数组中的元素,然后一顿操作猛如虎。今天暂且简单地说说在 JavaScript 中 forEach。
克隆项目代码到本地(git应该都要会哈,现在源码几乎都会放github上,会git才方便,不会的可以自学一下哦,不会的也没关系,gitHub上也提供直接下载的链接);打开微信开发者工具;
在 Vue 内部,有一段这样的代码:上面5个函数的作用是在Vue的原型上面挂载方法。initMixin 函数;可以看到在 initMixin 方法中,实现了一系列的初始化操作,包括生命周期流程以及响应式系统流程的启动
nextTick的使用:vue中dom的更像并不是实时的,当数据改变后,vue会把渲染watcher添加到异步队列,异步执行,同步代码执行完成后再统一修改dom,我们看下面的代码。
React更新的方式有三种:(1)ReactDOM.render() || hydrate(ReactDOMServer渲染)(2)setState(3)forceUpdate;接下来,我们就来看下ReactDOM.render()源码
在React中,为防止某个update因为优先级的原因一直被打断而未能执行。React会设置一个ExpirationTime,当时间到了ExpirationTime的时候,如果某个update还未执行的话,React将会强制执行该update,这就是ExpirationTime的作用。
算法对于前端工程师来说总有一层神秘色彩,这篇文章通过解读V8源码,带你探索 Array.prototype.sort 函数下的算法实现。来,先把你用过的和听说过的排序算法都列出来:
extend是jQuery中一个比较核心的代码,如果有查看jQuery的源码的话,就会发现jQuery在多处调用了extend方法。作用:对任意对象进行扩;’扩展某个实例对象
state也就是vuex里的值,也即是整个vuex的状态,而strict和state的设置有关,如果设置strict为true,那么不能直接修改state里的值,只能通过mutation来设置
最近在看webpack的源码,发现有个比较头疼的点是:代码看起来非常跳跃,往往看不到几行就插入一段新内容,为了理解又不得不先学习相关的前置知识。层层嵌套之后,发现最基础的还是tapable模型,因此先对这部分的内容做一个介绍。
内容以共享、参考、研究为目的,不存在任何商业目的。其版权属原作者所有,如有侵权或违规,请与小编联系!情况属实本人将予以删除!