前言 本文从操作系统实际调用角度(以CentOS Linux release 7.5操作系统为示例),力求追根溯源看IO的每一步操作到底发生了什么。
关于如何查看系统调用,Linux可以使用 strace 来查看任何软件的系统调动(这是个很好的分析学习方法):strace -ff -o ./out java TestJava
/**
* Alipay.com Inc. Copyright (c) 2004-2020 All Rights Reserved.
*/
package io;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author xiangyong.ding
* @version $Id: TestSocket.java, v 0.1 2020年08月02日 20:56 xiangyong.ding Exp $
*/
public class BIOSocket {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8090);
System.out.println("step1: new ServerSocket ");
while (true) {
Socket client = serverSocket.accept();
System.out.println("step2: client\t" + client.getPort());
new Thread(() -> {
try {
InputStream in = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
while (true) {
System.out.println(reader.readLine());
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
}
启动时
socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 5
bind(5, {sa_family=AF_INET, sin_port=htons(8090), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
listen(5, 50) = 0
poll([{fd=5, events=POLLIN|POLLERR}], 1, -1) = 1 ([{fd=5, revents=POLLIN}])
poll函数会阻塞直到其中任何一个fd发生事件。
有客户端连接后
accept(5, {sa_family=AF_INET, sin_port=htons(10253), sin_addr=inet_addr("42.120.74.252")}, [16]) = 6
clone(child_stack=0x7f013e5c4fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f013e5c59d0, tls=0x7f013e5c5700, child_tidptr=0x7f013e5c59d0) = 13168
poll([{fd=5, events=POLLIN|POLLERR}], 1, -1
抛出线程(即我们代码里的 new Thread() )后,继续poll阻塞等待连接。 clone出来的线程
recvfrom(6, "hello,bio\n", 8192, 0, NULL, NULL) =
关于对recvfrom函数的说明,其中第四个参数0 表示这是一个阻塞调用。
客户端发送数据后
recvfrom(6, "hello,bio\n", 8192, 0, NULL, NULL) = 10
优点 代码简单,逻辑清晰。 缺点
由于stream的read操作是阻塞读,面对多个连接时 每个连接需要每线程。无法处理大量连接(C10K问题)。
误区:可见JDK1.8中对于最初的BIO,在Linux OS下仍然使用的poll,poll本身也是相对比较高效的多路复用函数(支持非阻塞、多个socket同时检查event),只是限于JDK最初的stream API限制,无法支持非阻塞读取。
改进:使用NIO API,将阻塞变为非阻塞, 不需要大量线程。
/**
* Alipay.com Inc. Copyright (c) 2004-2020 All Rights Reserved.
*/
package io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
/**
* @author xiangyong.ding
* @version $Id: NioSocket.java, v 0.1 2020年08月09日 11:25 xiangyong.ding Exp $
*/
public class NIOSocket {
private static LinkedList<SocketChannel> clients = new LinkedList<>();
private static void startClientChannelHandleThread(){
new Thread(() -> {
while (true){
ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
//处理客户端连接
for (SocketChannel c : clients) {
// 非阻塞, >0 表示读取到的字节数量, 0或-1表示未读取到或读取异常
int num = 0;
try {
num = c.read(buffer);
} catch (IOException e) {
e.printStackTrace();
}
if (num > 0) {
buffer.flip();
byte[] clientBytes = new byte[buffer.limit()];
//从缓冲区 读取到内存中
buffer.get(clientBytes);
System.out.println(c.socket().getPort() + ":" + new String(clientBytes));
//清空缓冲区
buffer.clear();
}
}
}
}).start();
}
public static void main(String[] args) throws IOException {
//new socket,开启监听
ServerSocketChannel socketChannel = ServerSocketChannel.open();
socketChannel.bind(new InetSocketAddress(9090));
//设置阻塞接受客户端连接
socketChannel.configureBlocking(true);
//开始client处理线程
startClientChannelHandleThread();
while (true) {
//接受客户端连接; 非阻塞,无客户端返回null(操作系统返回-1)
SocketChannel client = socketChannel.accept();
if (client == null) {
//System.out.println("no client");
} else {
//设置读非阻塞
client.configureBlocking(false);
int port = client.socket().getPort();
System.out.println("client port :" + port);
clients.add(client);
}
}
}
}
主线程
socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 4
bind(4, {sa_family=AF_INET, sin_port=htons(9090), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
listen(4, 50) = 0
fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0
accept(4, 0x7fe26414e680, 0x7fe26c376710) = -1 EAGAIN (Resource temporarily unavailable)
有连接后,子线程
read(6, 0x7f3f415b1c50, 4096) = -1 EAGAIN (Resource temporarily unavailable)
read(6, 0x7f3f415b1c50, 4096) = -1 EAGAIN (Resource temporarily unavailable)
...
资源使用情况:
优点
线程数大大减少。
缺点
需要程序自己扫描 每个连接read,需要 O(n)时间复杂度系统调用 (此时可能只有一个连接发送了数据),高频系统调用(导致CPU 用户态内核态切换)高。导致CPU消耗很高。
改进:不需要用户扫描所有连接,由kernel 给出哪些连接有数据,然后应用从有数据的连接读取数据。
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
/**
* 多路复用socket
*
* @author xiangyong.ding
* @version $Id: MultiplexingSocket.java, v 0.1 2020年08月09日 12:19 xiangyong.ding Exp $
*/
public class MultiplexingSocket {
static ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
public static void main(String[] args) throws Exception {
LinkedList<SocketChannel> clients = new LinkedList<>();
//1.启动server
//new socket,开启监听
ServerSocketChannel socketChannel = ServerSocketChannel.open();
socketChannel.bind(new InetSocketAddress(9090));
//设置非阻塞,接受客户端
socketChannel.configureBlocking(false);
//多路复用器(JDK包装的代理,select /poll/epoll/kqueue)
Selector selector = Selector.open(); //java自动代理,默认为epoll
//Selector selector = PollSelectorProvider.provider().openSelector();//指定为poll
//将服务端socket 注册到 多路复用器
socketChannel.register(selector, SelectionKey.OP_ACCEPT);
//2. 轮训多路复用器
// 先询问有没有连接,如果有则返回数量以及对应的对象(fd)
while (selector.select() > 0) {
System.out.println();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectionKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
//2.1 处理新的连接
if (key.isAcceptable()) {
//接受客户端连接; 非阻塞,无客户端返回null(操作系统返回-1)
SocketChannel client = socketChannel.accept();
//设置读非阻塞
client.configureBlocking(false);
//同样,把client也注册到selector
client.register(selector, SelectionKey.OP_READ);
System.out.println("new client : " + client.getRemoteAddress());
}
//2.2 处理读取数据
else if (key.isReadable()) {
readDataFromSocket(key);
}
}
}
}
protected static void readDataFromSocket(SelectionKey key) throws Exception {
SocketChannel socketChannel = (SocketChannel) key.channel();
// 非阻塞, >0 表示读取到的字节数量, 0或-1表示未读取到或读取异常
// 请注意:这个例子降低复杂度,不考虑报文大于buffer size的情况
int num = socketChannel.read(buffer);
if (num > 0) {
buffer.flip();
byte[] clientBytes = new byte[buffer.limit()];
//从缓冲区 读取到内存中
buffer.get(clientBytes);
System.out.println(socketChannel.socket().getPort() + ":" + new String(clientBytes));
//清空缓冲区
buffer.clear();
}
}
}
启动
socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 4
bind(4, {sa_family=AF_INET, sin_port=htons(9090), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
listen(4, 50)
fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0
epoll_create(256) = 7
epoll_ctl(7, EPOLL_CTL_ADD, 5, {EPOLLIN, {u32=5, u64=4324783852322029573}}) = 0
epoll_ctl(7, EPOLL_CTL_ADD, 4, {EPOLLIN, {u32=4, u64=158913789956}}) = 0
epoll_wait(7
关于对epoll_create(对应着Java的 Selector selector = Selector.open()) 的说明,本质上是在内存的操作系统保留区,创建一个epoll数据结构。用于后面当有client连接时,向该epoll区中添加监听。 有连接
epoll_wait(7,[{EPOLLIN, {u32=4, u64=158913789956}}], 8192, -1) = 1
accept(4, {sa_family=AF_INET, sin_port=htons(29597), sin_addr=inet_addr("42.120.74.252")}, [16]) = 8
fcntl(8, F_SETFL, O_RDWR|O_NONBLOCK) = 0
epoll_ctl(7, EPOLL_CTL_ADD, 8, {EPOLLIN, {u32=8, u64=3212844375897800712}}) = 0
关于epoll_ctl (对应着Java的 client.register(selector, SelectionKey.OP_READ) )。其中 EPOLLIN 恰好对应着Java的 SelectionKey.OP_READ 即监听数据到达读取事件。
客户端发送数据
epoll_wait(7,[{EPOLLIN, {u32=8, u64=3212844375897800712}}], 8192, -1) = 1
read(8, "hello,multiplex\n", 4096) = 16
epoll_wait(7,
note:epoll_wait第四个参数-1表示block。
poll 和 epoll 对比 根据“1.BIO”中的poll函数调用和epoll函数对比如下:
poll和epoll本质上都是同步IO, 区别于BIO的是 多路复用充分降低了 system call,而epoll更进一步,再次降低了system call的时间复杂度。
优点
线程数同样很少,甚至可以把acceptor线程和worker线程使用同一个。
时间复杂度低,Java实现的Selector(在Linux OS下使用的epoll函数)支持多个clientChannel事件的一次性获取,且时间复杂度维持在O(1)。
CPU使用低:得益于Selector,我们不用向 “2.NIO”中需要自己一个个ClientChannel手动去检查事件,因此使得CPU使用率大大降低。
缺点
数据处理麻烦:目前socketChannel.read 读取数据完全是基于字节的,当我们需要需要作为HTTP服务网关时,对于HTTP协议的处理完全需要自己解析,这是个庞大、烦杂、容易出错的工作。
性能
现有socket数据的读取(socketChannel.read(buffer))全部通过一个buffer 缓冲区来接受,一旦连接多起来,这无疑是一个单线程读取,性能无疑是个问题。
那么此时buffer我们每次读取都重新new出来呢?如果每次都new出来,这样的内存碎片对于GC无疑是一场灾难。如何平衡地协调好buffer的共享,既保证性能,又保证线程安全,这是个难题。
TelnetServer
package telnet;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
/**
* Simplistic telnet server.
*/
public final class TelnetServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8992" : "8023"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new TelnetServerInitializer(sslCtx));
b.bind(PORT).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
TelnetServerHandler
package telnet;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.net.InetAddress;
import java.util.Date;
/**
* Handles a server-side channel.
*/
@Sharable
public class TelnetServerHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Send greeting for a new connection.
ctx.write("Welcome to " + InetAddress.getLocalHost().getHostName() + "!\r\n");
ctx.write("It is " + new Date() + " now.\r\n");
ctx.flush();
}
@Override
public void channelRead0(ChannelHandlerContext ctx, String request) throws Exception {
// Generate and write a response.
String response;
boolean close = false;
if (request.isEmpty()) {
response = "Please type something.\r\n";
} else if ("bye".equals(request.toLowerCase())) {
response = "Have a good day!\r\n";
close = true;
} else {
response = "Did you say '" + request + "'?\r\n";
}
// We do not need to write a ChannelBuffer here.
// We know the encoder inserted at TelnetPipelineFactory will do the conversion.
ChannelFuture future = ctx.write(response);
// Close the connection after sending 'Have a good day!'
// if the client has sent 'bye'.
if (close) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
TelnetServerInitializer
package telnet;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.ssl.SslContext;
/**
* Creates a newly configured {@link ChannelPipeline} for a new channel.
*/
public class TelnetServerInitializer extends ChannelInitializer<SocketChannel> {
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
private static final TelnetServerHandler SERVER_HANDLER = new TelnetServerHandler();
private final SslContext sslCtx;
public TelnetServerInitializer(SslContext sslCtx) {
this.sslCtx = sslCtx;
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
// Add the text line codec combination first,
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
// the encoder and decoder are static as these are sharable
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// and then business logic.
pipeline.addLast(SERVER_HANDLER);
}
}
主线程(23109)
## 256无实际作用,这里只为了兼容旧版kernel api
epoll_create(256) = 7epoll_ctl(7, EPOLL_CTL_ADD, 5, {EPOLLIN, {u32=5, u64=5477705356928876549}}) = 0
epoll_create(256) = 10epoll_ctl(10, EPOLL_CTL_ADD, 8, {EPOLLIN, {u32=8, u64=17041805914081853448}}) = 0
epoll_create(256) = 13
epoll_ctl(13, EPOLL_CTL_ADD, 11, {EPOLLIN, {u32=11, u64=17042151607409573899}}) = 0
epoll_create(256) = 16
epoll_ctl(16, EPOLL_CTL_ADD, 14, {EPOLLIN, {u32=14, u64=17042497300737294350}}) = 0
epoll_create(256) = 19
epoll_ctl(19, EPOLL_CTL_ADD, 17, {EPOLLIN, {u32=17, u64=17042561450368827409}}) = 0
epoll_create(256) = 10
socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 20
clone(child_stack=0x7fc3c509afb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7fc3c509b9d0, tls=0x7fc3c509b700, child_tidptr=0x7fc3c509b9d0) = 23130
概括为:
向OS新建socket,并开启clone boss线程23130。
为BOSS创建了一个epoll(论证参见下面“boss”),每个worker创建一个epoll数据结构(本质上是在kernel内存区创建了一个数据结构,用于后续监听)。
创建boss线程监听的socket(本质上在kernel中创建一个数据结构)。
boss(23130)
bind(20, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
listen(20, 128) = 0
getsockname(20, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("0.0.0.0")}, [16]) = 0
getsockname(20, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("0.0.0.0")}, [16]) = 0
##将fd为7号epoll和fd为20号的socket绑定,事件:epoll_ctl_add和epoll_ctl_mod
epoll_ctl(7, EPOLL_CTL_ADD, 20, {EPOLLIN, {u32=20, u64=14198059139132817428}}) = 0
epoll_ctl(7, EPOLL_CTL_MOD, 20, {EPOLLIN, {u32=20, u64=20}}) = 0
epoll_wait(7, [{EPOLLIN, {u32=5, u64=17295150779149058053}}], 8192, 1000) = 1
epoll_wait(7, [], 8192, 1000) = 0(不断轮训,1S超时一次)
概括为:
将上一步中main线程创建的fd:20绑定端口8023,并开启监听(网卡负责监听和接受连接和数据,kernel则负责路由到具体进程,具体参见:关于socket和bind和listen,TODO )。
将7号socket对应的fd绑定到20号对应的epoll数据结构上去(都是操作kernel中的内存)。
开始1S中一次阻塞等待epoll有任何连接或数据到达。
boss (23130)
accept(20, {sa_family=AF_INET, sin_port=htons(11144), sin_addr=inet_addr("42.120.74.122")}, [16]) = 24
getsockname(24, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("192.168.0.120")}, [16]) = 0
getsockname(24, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("192.168.0.120")}, [16]) = 0
setsockopt(24, SOL_TCP, TCP_NODELAY, [1], 4) = 0
getsockopt(24, SOL_SOCKET, SO_SNDBUF, [87040], [4]) = 0
getsockopt(24, SOL_SOCKET, SO_SNDBUF, [87040], [4]) = 0
##抛出 work线程
clone(child_stack=0x7fc3c4c98fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7fc3c4c999d0, tls=0x7fc3c4c99700, child_tidptr=0x7fc3c4c999d0) = 2301
worker (2301)
writev(24, [{"Welcome to iZbp14e1g9ztpshfrla9m"..., 37}, {"It is Sun Aug 23 15:44:14 CST 20"..., 41}], 2) = 78
epoll_ctl(13, EPOLL_CTL_ADD, 24, {EPOLLIN, {u32=24, u64=24}}) = 0
epoll_ctl(13, EPOLL_CTL_MOD, 24, {EPOLLIN, {u32=24, u64=14180008216221450264}}) = 0
epoll_wait(13, [{EPOLLIN, {u32=11, u64=17042151607409573899}}], 8192, 1000) = 1
read(11, "\1", 128) = 1
##开始无限loop
epoll_wait(13, [], 8192, 1000) = 0
epoll_wait(13, [{EPOLLIN, {u32=24, u64=24}}], 8192, 1000) = 1
概括:
当BOSS轮训epoll_wait等到了连接后,首先accept得到该socket对应的fd。
连接建立后 BOSS立马抛出一个线程(clone函数)。
worker(即新建的线程)写入了一段数据(这里是业务逻辑)。
worker将该client对应的fd绑定到了13号epoll上。
worker继续轮训监听13号epoll。
worker(2301)
read(24, "i am daojian\r\n", 1024) = 14
write(24, "Did you say 'i am daojian'?\r\n", 29) = 29
##继续无限loop
epoll_wait(13, [], 8192, 1000) = 0
概括为:
wait到数据后,立即read到用户控件内存中(读取1024个字节到 用户控件某个buff中)。
写入数据(业务逻辑,不必太关注)。
继续轮训等待13号epoll。
worker(2301)
read(24, "bye\r\n", 1024) = 5
write(24, "Have a good day!\r\n", 18) = 18
getsockopt(24, SOL_SOCKET, SO_LINGER, {onoff=0, linger=0}, [8]) = 0
dup2(25, 24) = 24
##从epoll数据结构中(OS)中删除fd为24的socket
epoll_ctl(13, EPOLL_CTL_DEL, 24, 0x7f702dd531e0) = -1 ENOENT
##关闭24 socket
close(24) = 0
##继续等待13 epoll数据
epoll_wait(13, [], 8192, 1000) = 0
断开客户端连接概括为:
从epoll中删除该客户端对应的fd(这里触发源头没找到,可能是boss)。
close关闭客户端24号fd。
继续轮训epoll。
boss线程(23130)
accept(20, {sa_family=AF_INET, sin_port=htons(1846), sin_addr=inet_addr("42.120.74.122")}, [16]) = 24
clone(child_stack=0x7f702cc51fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f702cc529d0, tls=0x7f702cc52700, child_tidptr=0x7f702cc529d0) = 10035
accept(20, {sa_family=AF_INET, sin_port=htons(42067), sin_addr=inet_addr("42.120.74.122")}, [16]) = 26
clone(child_stack=0x7f702cb50fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f702cb519d0, tls=0x7f702cb51700, child_tidptr=0x7f702cb519d0) = 10067
...
woker线程(10035,第一个连接)
epoll_ctl(13, EPOLL_CTL_ADD, 24, {EPOLLIN, {u32=24, u64=24}}) = 0
epoll_ctl(13, EPOLL_CTL_MOD, 24, {EPOLLIN, {u32=24, u64=3226004877247250456}}) = 0
epoll_wait(13, [{EPOLLIN, {u32=11, u64=17042151607409573899}}], 8192, 1000) = 1 = 1
epoll_wait(13, [], 8192, 1000) = 0
worker线程(10067,第二个连接)
epoll_ctl(16, EPOLL_CTL_ADD, 26, {EPOLLIN, {u32=26, u64=26}}) = 0
epoll_ctl(16, EPOLL_CTL_MOD, 26, {EPOLLIN, {u32=26, u64=3221483685433835546}}) = 0
epoll_wait(16, [{EPOLLIN, {u32=14, u64=17042497300737294350}}], 8192, 1000) = 1
epoll_wait(16, [], 8192, 1000) = 0
epoll_wait(16, [], 8192, 1000) = 0
worker线程(10067,第二个连接)
epoll_ctl(19, EPOLL_CTL_ADD, 27, {EPOLLIN, {u32=27, u64=27}}) = 0
epoll_ctl(19, EPOLL_CTL_MOD, 27, {EPOLLIN, {u32=27, u64=3216966479350071323}}) = 0
worker线程(8055,第四个连接)
epoll_ctl(10, EPOLL_CTL_ADD, 28, {EPOLLIN, {u32=28, u64=28}}) = 0
epoll_ctl(10, EPOLL_CTL_MOD, 28, {EPOLLIN, {u32=28, u64=3302604828697427996}}) = 0
worker线程(10035,第五个连接,不在clone线程,而是复用了第一个epoll对应的worker)
epoll_ctl(13, EPOLL_CTL_ADD, 29, {EPOLLIN, {u32=29, u64=29}}) = 0
epoll_ctl(13, EPOLL_CTL_MOD, 29, {EPOLLIN, {u32=29, u64=29}}) = 0
概括为:
epoll和boss、worker之间的关系:一共有4个worker对应着4个epoll对象,boss和每个worker都有对应自己的epoll。
boss根据epoll数量,平衡分配连接到每个worker对应的epoll中。
下图通过对系统调用的调查得出 netty 和 kernel 交互图:
初始化直接创建5个epoll,其中7号为boss使用,专门用于处理和客户端连接;其余4个用来给worker使用,用户处理和客户端的数据交互。
work的线程数量,取决于初始化时创建了几个epoll,worker的复用本质上是epoll的复用。
work之间为什么要独立使用epoll?为什么不共享?
为了避免各个worker之间发生争抢连接处理,netty直接做了物理隔离,避免竞争。各个worker只负责处理自己管理的连接,并且后续该worker中的每个client的读写操作完全由 该线程单独处理,天然避免了资源竞争,避免了锁。
worker单线程,性能考虑:worker不仅仅要epoll_wait,还是处理read、write逻辑,加入worker处理了过多的连接,势必造成这部分消耗时间片过多,来不及处理更多连接,性能下降。
优点
数据处理:netty提供了大量成熟的数据处理组件(ENCODER、DECODER),HTTP、POP3拿来即用。
编码复杂度、可维护性:netty充分使得业务逻辑与网络处理解耦,只需要少量的BootStrap配置即可,更多的集中在业务逻辑处理上。
性能:netty提供了的ByteBuf(底层Java原生的ByteBuffer),提供了池化的ByteBuf,兼顾读取性能和ByteBuf内存分配(在后续文档中会再做详解)。
缺点
main线程
epoll_create(256) = 5
epoll_ctl(5, EPOLL_CTL_ADD, 6, {EPOLLIN, {u32=6, u64=11590018039084482566}}) = 0
##创建BOSS 线程(Proactor)
clone(child_stack=0x7f340ac06fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f340ac079d0, tls=0x7f340ac07700, child_tidptr=0x7f340ac079d0) = 22704
socket(AF_INET6, SOCK_STREAM, IPPROTO_IP) = 8
setsockopt(8, SOL_IPV6, IPV6_V6ONLY, [0], 4) = 0
setsockopt(8, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
bind(8, {sa_family=AF_INET6, sin6_port=htons(9090), inet_pton(AF_INET6, "::", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, 28) = 0
listen(8, 50)
accept(8, 0x7f67d01b3120, 0x7f67d9246690) = -1
epoll_ctl(5, EPOLL_CTL_MOD, 8, {EPOLLIN|EPOLLONESHOT, {u32=8, u64=15380749440025362440}}) = -1 ENOENT (No such file or directory)
epoll_ctl(5, EPOLL_CTL_ADD, 8, {EPOLLIN|EPOLLONESHOT, {u32=8, u64=15380749440025362440}}) = 0
read(0,
22704(BOSS 线程(Proactor))
epoll_wait(5, <unfinished ...>
22704(BOSS 线程(Proactor))处理连接
epoll_wait(5,[{EPOLLIN, {u32=9, u64=4294967305}}], 512, -1) = 1
accept(8, {sa_family=AF_INET6, sin6_port=htons(55320), inet_pton(AF_INET6, "::ffff:36.24.32.140", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, [28]) = 9
clone(child_stack=0x7ff35c99ffb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7ff35c9a09d0, tls=0x7ff35c9a0700, child_tidptr=0x7ff35c9a09d0) = 26241
epoll_wait(5, <unfinished ...>
26241
#将client 连接的FD加入到BOSS的epoll中,以便BOSS线程监听网络事件
epoll_ctl(5, EPOLL_CTL_MOD, 9, {EPOLLIN|EPOLLONESHOT, {u32=9, u64=4398046511113}}) = -1 ENOENT (No such file or directory)
epoll_ctl(5, EPOLL_CTL_ADD, 9, {EPOLLIN|EPOLLONESHOT, {u32=9, u64=4398046511113}}) = 0
accept(8, 0x7ff3440008c0, 0x7ff35c99f4d0) = -1 EAGAIN (Resource temporarily unavailable)
epoll_ctl(5, EPOLL_CTL_MOD, 8, {EPOLLIN|EPOLLONESHOT, {u32=8, u64=8}}) = 0
22704(BOSS 线程(Proactor))处理连接
epoll_wait(5,[{EPOLLIN, {u32=9, u64=4294967305}}], 512, -1) = 1
##数据读出
read(9, "daojian111\r\n", 1024) = 12
##数据处理交给其他线程,这里由于线程池为空,需要先clone线程
clone(child_stack=0x7ff35c99ffb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7ff35c9a09d0, tls=0x7ff35c9a0700, child_tidptr=0x7ff35c9a09d0) = 26532
复制线程处理,线程号26532
write(1, "pool-1-thread-2-10received : dao"..., 41) = 41
write(1, "\n", 1)
accept(8, 0x7f11c400b5f0, 0x7f11f42fd4d0) = -1 EAGAIN (Resource temporarily unavailable)
epoll_ctl(5, EPOLL_CTL_MOD, 8, {EPOLLIN|EPOLLONESHOT, {u32=8, u64=8}}) = 0
从系统调用角度,Java的AIO事实上是以多路复用(Linux上为epoll)等同步IO为基础,自行实现了异步事件分发。
BOSS Thread负责处理连接,并分发事件。
WORKER Thread只负责从BOSS接收的事件执行,不负责任何网络事件监听。
优点 相比于前面的BIO、NIO,AIO已经封装好了任务调度,使用时只需关心任务处理。 缺点
事件处理完全由Thread Pool完成,对于同一个channel的多个事件可能会出现并发问题。
相比netty,buffer API不友好容易出错;编解码工作复杂。
本文由哈喽比特于3年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/KgJFyEmZApF7l5UUJeWf8Q
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。