IO知识整理
IO
面向系统IO
page cache
程序虚拟内存到物理内存的转换依靠cpu中的mmu映射
物理内存以page(4k)为单位做分配
多个程序访问磁盘上同一个文件,步骤
- kernel将文件内容加载到pagecache
- 多个程序读取同一份文件指向的同一个pagecache
- 多个程序各自维护各自的fd,fd中seek记录偏移量,指向具体数据
pagecache特点
- 为内核维护的中间层
- 淘汰机制
- 持久化机制,是否会丢失数据
- 占用内存大小
Java输出流
- 不使用buffer的输出流
- 使用buffer的输出流
- 速度超过不使用buffer的输出流,是因为先在jvm内存中写入,默认到8kb写完调用一次systemcall
- 随机输出流
内存充足的情况下,会优先写入pagecache,只要未达到脏页持久化阈值,就不会写入磁盘,不论pagecache里面多少数据,关机重启后将全部丢失;内存不充足的情况下,pagecache中的新数据的写入会导致老数据的持久化,进而写入磁盘。
可以通过手动调用系统flush将脏页写入磁盘
脏页持久化之后cache依然存在于内存,只有内存不够分配时才会淘汰cache,且淘汰的cache一定不是脏页。
ByteBuffer
@Test
public void whatByteBuffer() {// 堆内分配内存// ByteBuffer buffer = ByteBuffer.allocate(1024);// 堆外分配内存ByteBuffer buffer = ByteBuffer.allocateDirect(1024);System.out.println("postition: " + buffer.position());System.out.println("limit: " + buffer.limit());System.out.println("capacity: " + buffer.capacity());// position起点 limit终点 capacity容量System.out.println("mark: " + buffer);// position->3 buffer.put("123".getBytes());System.out.println("-------------put:123......");System.out.println("mark: " + buffer);// 读写交替 position->0 limit->3buffer.flip();System.out.println("-------------flip......");System.out.println("mark: " + buffer);// 读取一个byte pos->1 limit->3buffer.get();System.out.println("-------------get......");System.out.println("mark: " + buffer);// 读写交替 pos->2 limit->1024 已读取的字节删除buffer.compact();System.out.println("-------------compact......");System.out.println("mark: " + buffer);buffer.clear();System.out.println("-------------clear......");System.out.println("mark: " + buffer);
}
mmap
堆外创建一个地址空间,只有文件系统可以直接调用
该内存空间的数据不需要通过系统调用及用户态和内核态的切换,直接写入数据就可以同步
但是依然收到系统page cache限制,存在数据丢失的可能
Direct IO
可以绕过系统控制的page cache,不受系统page cache参数控制
程序自己维护page cache,通过自定义代码逻辑维护一致性/dirty等…
好处是自己维护page cache刷磁盘的阈值,与系统通用配置隔离,但是依然存在丢失数据的逻辑
@Test
public void testRandomAccessFileWrite() throws Exception {RandomAccessFile raf = new RandomAccessFile(path, "rw");raf.write("hello mashibing\n".getBytes());raf.write("hello seanzhou\n".getBytes());System.out.println("write------------");System.in.read();// 指针调整,往前调整后继续写会直接覆盖原有数据raf.seek(4);raf.write("ooxx".getBytes());System.out.println("seek---------");System.in.read();FileChannel rafchannel = raf.getChannel();// mmap 堆外(jvm堆外且是linux的java进程堆外的内存) 和文件映射的 byte not object// 此时文件大小会变为4096kbMappedByteBuffer map = rafchannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);map.put("@@@".getBytes()); //不是系统调用 但是数据会到达 内核的pagecache//曾经我们是需要out.write() 这样的系统调用,才能让程序的data 进入内核的pagecache//曾经必须有用户态内核态切换//mmap的内存映射,依然是内核的pagecache体系所约束的!!!//换言之,丢数据System.out.println("map--put--------");System.in.read();// 数据刷入磁盘 等于flush// map.force(); raf.seek(0);ByteBuffer buffer = ByteBuffer.allocate(8192);// ByteBuffer buffer = ByteBuffer.allocateDirect(1024);// 将channel中的数据读入buffer 等价于buffer.put()int read = rafchannel.read(buffer); System.out.println(buffer);// 此时翻转后可以开始读取数据 pos->0 limit->4096buffer.flip();System.out.println(buffer);for (int i = 0; i < buffer.limit(); i++) {Thread.sleep(200);System.out.print(((char)buffer.get(i)));}
}

面向网络IO
TCP
面向连接的,可靠的传输协议
- 三次握手
- 四次分手
- 内核级开辟资源,建立连接
- 即使服务端没有accept,也可以建立连接(established状态),只是不分配具体的pid
- 而从java进程内部,看到的连接依然是listen状态
- 此时可以从客户端发送数据,而服务端不能接收,服务端开启accept后可以接收到已发送的数据
三次握手报文
- win 滑动窗口长度 协商结果为115
- tcp拥塞控制,提速
- 服务端窗口已满
- 客户端阻塞不再继续发包(如果继续发包会丢弃后发的数据)
- tcp拥塞控制,提速
- seq 序列号 client->server server将序列号+1作为ack返回给client
- mtu ifconfig可以看到网口字节长度
- mss 实际数据字节长度,基本上为mtu-ip长度-port长度 各自20字节

四次分手
- 异常状态
- CLOSE_WAIT(接收方第三次FIN信号没有发送成功)
- TIME_WAIT (连接关闭后,为防止接收方没有收到最后一次ACK,保留连接对应资源)
- FIN_WAIT2(没有收到FIN信号)

Socket
四元组(cip + sport +sip + sport)
- 服务端不需要给客户端连接分配新的端口号(四元组标识唯一,客户端服务端各存一份)
内核级别
关键配置
-
backlog
-
配置后续队列,超过配置条数+1之后,会将连接状态置为SYNC_REC状态,不可连接
-
应当根据处理速度、cpu条数来进行配置,防止建立过多连接
-
-
nodelay
- false:使用优化,会积攒超过buffer长度的字节一次发到服务端,但是会有延时
- true:不使用优化,根据内核调度尽快发送,会多出几次网络io
-
oobinline
- 结合nodelay配置使用效果明显,会单独发出第一个字节
-
keepalive
- 开启后会保持心跳,判断相互之间连接是否生效
IO模型
模型分类
- 同步:程序自己进行R/W操作
- 异步:内核完成R/W 程序像是不访问IO一样,直接使用buffer,linux不可用
- 阻塞:BLOCKING BIO
- 非阻塞:NONBLOCKING NIO
linux
- 同步阻塞:BIO
- 同步非阻塞:NIO、多路复用
BIO
多线程BIO模型,主线程accept+创建子线程,子线程做recv
public static void main(String[] args) throws Exception {ServerSocket server = new ServerSocket(9090,20);System.out.println("step1: new ServerSocket(9090) ");while (true) {// linux指令 socket() ->fd3 bind(fd3,8090) accept(fd3)->fd5Socket client = server.accept(); //阻塞1System.out.println("step2:client\t" + client.getPort());// 主线程只负责创建子线程用来接收数据new Thread(new Runnable(){public void run() {InputStream in = null;try {in = client.getInputStream();BufferedReader reader = new BufferedReader(new InputStreamReader(in));while(true){// linux指令 recv(fd5) 子线程负责读取数据String dataline = reader.readLine(); //阻塞2if(null != dataline){System.out.println(dataline);}else{client.close();break;}}System.out.println("客户端断开");} catch (IOException e) {e.printStackTrace();}}}).start();}

BIO多线程模型需要创建新线程(系统调用clone)+线程调度,导致速率降低
BIO的弊端主要来自于阻塞,系统内核级别阻塞(accept+recv)
- accept等待连接和recv接收数据会阻塞,所以通过创建子线程的方式进行规避,但是clone指令+线程切换也是有很高成本的,当客户端连接数量增加时,处理速度会明显降低
- 因为recv接收数据会发生阻塞,所以当多个客户端连接的时候只能使用多个线程的方式来进行读取,如果只使用一个线程,当连接阻塞时,没办法读取其他连接发送的数据
NIO
accept指令不会阻塞,如果没有连接会直接返回-1,在Java api中会体现为对象为null
不阻塞的好处
- 可以不额外创建线程,在一个线程中执行建立连接和接收信息两个操作,节省了创建连接和线程切换的成本
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kpaWtaly-1676460489179)(IO.assets/image-20230209151431908.png)]
public static void main(String[] args) throws Exception {LinkedList<SocketChannel> clients = new LinkedList<>();ServerSocketChannel ss = ServerSocketChannel.open(); //服务端开启监听:接受客户端ss.bind(new InetSocketAddress(9090));ss.configureBlocking(false); //重点 OS NONBLOCKING!!! //只让接受客户端 不阻塞while (true) {//接受客户端的连接Thread.sleep(1000);SocketChannel client = ss.accept(); //不会阻塞? -1 NULL//accept 调用内核了:1,没有客户端连接进来在BIO 的时候一直卡着,但是在NIO不卡着,返回-1,NULL//如果来客户端的连接,accept 返回的是这个客户端的fd 5,client object//NONBLOCKING 就是代码能往下走了,只不过有不同的情况if (client == null) {System.out.println("null.....");} else {client.configureBlocking(false); //重点 连接socket非阻塞// socket// 服务端的listen socket(连接请求三次握手后,通过accept 得到 连接的socket)// 连接socket(连接后的数据读写使用的)int port = client.socket().getPort();System.out.println("client..port: " + port);clients.add(client);}ByteBuffer buffer = ByteBuffer.allocateDirect(4096); //可以在堆里 堆外//遍历已经连接进来的客户端能不能读写数据for (SocketChannel c : clients) { //串行化!!!! 多线程!!int num = c.read(buffer); // >0 -1 0 //不会阻塞if (num > 0) {buffer.flip();byte[] aaa = new byte[buffer.limit()];buffer.get(aaa);String b = new String(aaa);System.out.println(c.socket().getPort() + " : " + b);buffer.clear();}}}
}
NIO的弊端
-
每次获取数据是O(n)级别的时间复杂度
- 例如有10w连接,只有100个数据传输,nio需要10w次recv系统调用,这里面大部分的系统调用是无意义的(没有数据传输,徒增成本)
多路复用IO
程序通过一次系统调用获得其中IO状态,然后程序自己实现对于有状态的IO进行R/W,时间负责度O(m) + O(1)
无论select、poll还是nio,本质上都是对程序持有的fds依次遍历,只不过区别是nio的依次遍历发生在程序侧,需要发生n次系统调用,以及n次用户态内核态的切换;而select和poll,是需要程序传递fds给内核,内核触发遍历,只发生一次系统调用。
linux下 多路复用器
- SELECT POSIX规范
- 受到FD_SETSIZE的限制,一次最多1024个fd
- 每次都要重新传递fds,每次内核调用都需要触发全量调用传递的fds
- POLL
- 不受到FD_SETSIZE的限制
- 每次都要重新传递fds,每次内核调用都需要触发全量调用传递的fds
- EPOLL
- 内核开辟空间保存fd,规避程序重复传递fd的问题和遍历全量fd的问题

public class SocketMultiplexingSingleThreadv1 {private ServerSocketChannel server = null;//linux 多路复用器(select poll epoll kqueue) nginx event{}private Selector selector = null; int port = 9090;public void initServer() {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));//如果在epoll模型下,open--》 epoll_create -> fd3selector = Selector.open(); // select poll *epoll 优先选择:epoll 但是可以 -D修正//server 约等于 listen状态的 fd4/*register如果:select,poll:jvm里开辟一个数组 fd4 放进去epoll: epoll_ctl(fd3,ADD,fd4,EPOLLIN*/server.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}public void start() {initServer();System.out.println("服务器启动了。。。。。");try {while (true) { //死循环Set<SelectionKey> keys = selector.keys();System.out.println(keys.size()+" size");//1,调用多路复用器(select,poll or epoll (epoll_wait))/*select()是啥意思:1,select,poll 其实 内核的select(fd4) poll(fd4)2,epoll: 其实 内核的 epoll_wait()*, 参数可以带时间:没有时间,0 : 阻塞,有时间设置一个超时selector.wakeup() 结果返回0懒加载:其实再触碰到selector.select()调用的时候触发了epoll_ctl的调用*/while (selector.select() > 0) {Set<SelectionKey> selectionKeys = selector.selectedKeys(); //返回的有状态的fd集合Iterator<SelectionKey> iter = selectionKeys.iterator();// NIO 对着每一个fd调用系统调用,浪费资源// 多路复用IO调用一次select方法,返回的那些fc可以R/Wwhile (iter.hasNext()) {SelectionKey key = iter.next();iter.remove(); //set 不移除会重复循环处理if (key.isAcceptable()) {//看代码的时候,这里是重点,如果要去接受一个新的连接//语义上,accept接受连接且返回新连接的FD对吧?//那新的FD怎么办?//select,poll,因为他们内核没有空间,那么在jvm中保存和前边的fd4那个listen的一起//epoll: 我们希望通过epoll_ctl把新的客户端fd注册到内核空间acceptHandler(key);} else if (key.isReadable()) {readHandler(key); //连read 还有 write都处理了//在当前线程,这个方法可能会阻塞 ,如果阻塞了十年,其他的IO早就没电了。。。//所以,为什么提出了 IO THREADS//redis 是不是用了epoll,redis是不是有个io threads的概念 ,redis是不是单线程的//tomcat 8,9 异步的处理方式 IO 和 处理上 解耦}}}}} catch (IOException e) {e.printStackTrace();}}public void acceptHandler(SelectionKey key) {try {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel client = ssc.accept(); //来啦,目的是调用accept接受客户端 fd7client.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(8192);//你看,调用了register/*select,poll:jvm里开辟一个数组 fd7 放进去epoll: epoll_ctl(fd3,ADD,fd7,EPOLLIN*/client.register(selector, SelectionKey.OP_READ, buffer);System.out.println("-------------------------------------------");System.out.println("新客户端:" + client.getRemoteAddress());System.out.println("-------------------------------------------");} catch (IOException e) {e.printStackTrace();}}public void readHandler(SelectionKey key) {SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.clear();int read = 0;try {while (true) {read = client.read(buffer);if (read > 0) {buffer.flip();while (buffer.hasRemaining()) {client.write(buffer);}buffer.clear();} else if (read == 0) {break;} else {client.close();break;}}} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) {SocketMultiplexingSingleThreadv1 service = new SocketMultiplexingSingleThreadv1();service.start();}
}
不同多路复用器对应的系统指令
- POLL(jdk native 用户空间 保存了fd)
- 创建server并进行监听
- socket(PF_INET, SOCK_STREAM, IPPROTO_IP) = 4
- fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0 //server.configureBlocking(false);
- bind(4, {sa_family=AF_INET, sin_port=htons(9090)
- listen(4, 50)
- 多路复用建立连接
- poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])
- accept(4, = 7 //获取到新的客户端
- fcntl(7, F_SETFL, O_RDWR|O_NONBLOCK) //非阻塞
- poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}, {fd=7, events=POLLIN}], 3, -1) = 1 //selector.select() 注册新的客户端并查询客户端fd是否有状态变更
- 创建server并进行监听
- EPOLL
- 创建server并进行监听
- socket(PF_INET, SOCK_STREAM, IPPROTO_IP) = 4
- fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0 //server.configureBlocking(false);
- bind(4, {sa_family=AF_INET, sin_port=htons(9090)
- listen(4, 50)
- 多路复用创建连接
- epoll_create(256) = 7 (epfd)
- epoll_ctl(7, EPOLL_CTL_ADD, 4,
- epoll_wait(7, {{EPOLLIN, {u32=4, u64=2216749036554158084}}}, 4096, -1) = 1 // selector.select()
- accept(4 =8 //建立连接获取新client的fd
- fcntl(8, F_SETFL, O_RDWR|O_NONBLOCK) //设置为非阻塞
- epoll_ctl(7, EPOLL_CTL_ADD, 8, {EPOLLIN, //注册新的客户端到多路复用器
- epoll_wait(7, //等待有状态变更的fd返回
- 创建server并进行监听
多线程Selector
多线程 多路复用IO
分一个bossGroup 和一个workerGroup;bossGroup负责listen,workGroup负责人R/W
public class MainThread {public static void main(String[] args) {//1,创建 IO Thread (一个或者多个)SelectorThreadGroup boss = new SelectorThreadGroup(3); //混杂模式//boss有自己的线程组SelectorThreadGroup worker = new SelectorThreadGroup(3); //混杂模式//worker有自己的线程组boss.setWorker(worker);//但是,boss得多持有worker的引用:/*** boss里选一个线程注册listen , 触发bind,从而,这个不选中的线程得持有 workerGroup的引用* 因为未来 listen 一旦accept得到client后得去worker中 next出一个线程分配*/boss.bind(9999);boss.bind(8888);boss.bind(6666);boss.bind(7777);}
}
public class SelectorThread extends ThreadLocal<LinkedBlockingQueue<Channel>> implements Runnable{// 每线程对应一个selector,// 多线程情况下,该主机,该程序的并发客户端被分配到多个selector上//注意,每个客户端,只绑定到其中一个selector//其实不会有交互问题Selector selector = null;LinkedBlockingQueue<Channel> lbq = get(); //lbq 在接口或者类中是固定使用方式逻辑写死了。你需要是lbq每个线程持有自己的独立对象SelectorThreadGroup stg;@Overrideprotected LinkedBlockingQueue<Channel> initialValue() {return new LinkedBlockingQueue<>();}SelectorThread(SelectorThreadGroup stg){try {this.stg = stg;selector = Selector.open();} catch (IOException e) {e.printStackTrace();}}@Overridepublic void run() {//Loopwhile (true){try {//1,select()int nums = selector.select(); //阻塞 wakeup()//2,处理selectkeysif(nums>0){Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> iter = keys.iterator();while(iter.hasNext()){ //线程处理的过程SelectionKey key = iter.next();iter.remove();if(key.isAcceptable()){ //复杂,接受客户端的过程(接收之后,要注册,多线程下,新的客户端,注册到那里呢?)acceptHandler(key);}else if(key.isReadable()){readHander(key);}else if(key.isWritable()){}}}//3,处理一些task : listen clientif(!lbq.isEmpty()){//只有方法的逻辑,本地变量是线程隔离的Channel c = lbq.take();if(c instanceof ServerSocketChannel){ServerSocketChannel server = (ServerSocketChannel) c;server.register(selector,SelectionKey.OP_ACCEPT);System.out.println(Thread.currentThread().getName()+" register listen");}else if(c instanceof SocketChannel){SocketChannel client = (SocketChannel) c;ByteBuffer buffer = ByteBuffer.allocateDirect(4096);client.register(selector, SelectionKey.OP_READ, buffer);System.out.println(Thread.currentThread().getName()+" register client: " + client.getRemoteAddress());}}} catch (Exception e) {e.printStackTrace();} }}private void readHander(SelectionKey key) {System.out.println(Thread.currentThread().getName()+" read......");ByteBuffer buffer = (ByteBuffer)key.attachment();SocketChannel client = (SocketChannel)key.channel();buffer.clear();while(true){try {int num = client.read(buffer);if(num > 0){buffer.flip(); //将读到的内容翻转,然后直接写出while(buffer.hasRemaining()){client.write(buffer);}buffer.clear();}else if(num == 0){break;}else {//客户端断开了System.out.println("client: " + client.getRemoteAddress()+"closed......");key.cancel();break;}} catch (IOException e) {e.printStackTrace();}}}private void acceptHandler(SelectionKey key) {System.out.println(Thread.currentThread().getName()+" acceptHandler......");ServerSocketChannel server = (ServerSocketChannel)key.channel();try {SocketChannel client = server.accept();client.configureBlocking(false);stg.nextSelectorV3(client);// stg.nextSelectorV2(client);} catch (IOException e) {e.printStackTrace();}}public void setWorker(SelectorThreadGroup stgWorker) {this.stg = stgWorker;}
}
public class SelectorThreadGroup { //天生都是bossSelectorThread[] sts;ServerSocketChannel server=null;AtomicInteger xid = new AtomicInteger(0);SelectorThreadGroup stg = this;public void setWorker(SelectorThreadGroup stg){this.stg = stg;}SelectorThreadGroup(int num){//num 线程数sts = new SelectorThread[num];for (int i = 0; i < num; i++) {sts[i] = new SelectorThread(this);new Thread(sts[i]).start();}}public void bind(int port) {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));//注册到那个selector上呢?nextSelectorV3(server);} catch (IOException e) {e.printStackTrace();}}public void nextSelectorV3(Channel c) {try {if(c instanceof ServerSocketChannel){SelectorThread st = next(); //listen 选择了 boss组中的一个线程后,要更新这个线程的work组st.lbq.put(c);st.setWorker(stg);st.selector.wakeup();}else {SelectorThread st = nextV3(); //在 main线程种,取到堆里的selectorThread对象//1,通过队列传递数据 消息st.lbq.add(c);//2,通过打断阻塞,让对应的线程去自己在打断后完成注册selectorst.selector.wakeup();}} catch (InterruptedException e) {e.printStackTrace();}}//无论 serversocket socket 都复用这个方法private SelectorThread next() {int index = xid.incrementAndGet() % sts.length; //轮询就会很尴尬,倾斜return sts[index];}private SelectorThread nextV3() {int index = xid.incrementAndGet() % stg.sts.length; //动用worker的线程分配return stg.sts[index];}
}
Netty
ButeBuf
类似于jdk原生ByteBuffer封装
//initialCapacity maxCapacity 默认分配为堆外内存
//ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(8, 20);
ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(8, 20);System.out.println("buf.isReadable() :" + buf.isReadable());
System.out.println("buf.readerIndex() :" + buf.readerIndex());
System.out.println("buf.readableBytes() " + buf.readableBytes());
System.out.println("buf.isWritable() :" + buf.isWritable());
System.out.println("buf.writerIndex() :" + buf.writerIndex());
System.out.println("buf.writableBytes() :" + buf.writableBytes());
System.out.println("buf.capacity() :" + buf.capacity());
System.out.println("buf.maxCapacity() :" + buf.maxCapacity());
//是否为堆外内存
System.out.println("buf.isDirect() :" + buf.isDirect());
Client端
NioEventLoopGroup
NioSocketChannel
客户端读写需要注册到类似于多路复用器
@Test
public void clientMode() throws Exception {NioEventLoopGroup thread = new NioEventLoopGroup(1);//客户端模式:NioSocketChannel client = new NioSocketChannel();thread.register(client); //epoll_ctl(5,ADD,3)//响应式:输入处理ChannelPipeline p = client.pipeline();p.addLast(new MyInHandler());//reactor 异步的特征ChannelFuture connect = client.connect(new InetSocketAddress("192.168.150.11", 9090));ChannelFuture sync = connect.sync();ByteBuf buf = Unpooled.copiedBuffer("hello server".getBytes());ChannelFuture send = client.writeAndFlush(buf);//同步处理 send.sync();//同步处理 等待服务端断开连接sync.channel().closeFuture().sync();System.out.println("client over....");
}class MyInHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("client registed...");}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("client active...");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;// 这里readCharSequence会移动ByteBuffer里面的指针,所以再写回去是没有数据的// CharSequence str = buf.readCharSequence(buf.readableBytes(), CharsetUtil.UTF_8);CharSequence str = buf.getCharSequence(0, buf.readableBytes(), CharsetUtil.UTF_8);System.out.println(str);ctx.writeAndFlush(buf);}
}
Server端
NioEventLoopGroup
NioServerSocketChannel
还是响应式编程,有客户端连接后通过acceptHandler进行accept和注册R/W Handler
@Test
public void serverMode() throws Exception {NioEventLoopGroup thread = new NioEventLoopGroup(1);NioServerSocketChannel server = new NioServerSocketChannel();thread.register(server);ChannelPipeline p = server.pipeline();//这里通过ChannelInit处理注册R/W处理器//accept接收客户端,并且注册到selectorp.addLast(new MyAcceptHandler(thread, new ChannelInit())); ChannelFuture bind = server.bind(new InetSocketAddress("192.168.150.1", 9090));bind.sync().channel().closeFuture().sync();System.out.println("server close....");
}class MyAcceptHandler extends ChannelInboundHandlerAdapter {private final EventLoopGroup selector;private final ChannelHandler handler;public MyAcceptHandler(EventLoopGroup thread, ChannelHandler myInitHandler) {this.selector = thread;this.handler = myInitHandler; //ChannelInit}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("server registerd...");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// listen socket accept client// socket R/W ByteBufSocketChannel client = (SocketChannel) msg;//响应式的 handlerChannelPipeline p = client.pipeline();p.addLast(handler); //1,client::pipeline[ChannelInit,]//注册selector.register(client);}
}/**
* @ChannelHandler.Sharable表示线程间共享
* 通过ChannelInit初始化来进行与业务处理器之间的解耦
*/
@ChannelHandler.Sharable
class ChannelInit extends ChannelInboundHandlerAdapter {@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {Channel client = ctx.channel();ChannelPipeline p = client.pipeline();p.addLast(new MyInHandler());//2,client::pipeline[ChannelInit,MyInHandler]//用完清除自己即可ctx.pipeline().remove(this);//3,client::pipeline[MyInHandler]}
}
Nio Bootstrap
Client端
@Test
public void nettyClient() throws InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup(1);Bootstrap bs = new Bootstrap();ChannelFuture connect = bs.group(group).channel(NioSocketChannel.class)// .handler(new ChannelInit()).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new MyInHandler());}}).connect(new InetSocketAddress("192.168.150.11", 9090));Channel client = connect.sync().channel();ByteBuf buf = Unpooled.copiedBuffer("hello server".getBytes());ChannelFuture send = client.writeAndFlush(buf);send.sync();client.closeFuture().sync();
}
Server端
@Test
public void nettyServer() throws InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup(1);ServerBootstrap bs = new ServerBootstrap();ChannelFuture bind = bs.group(group, group).channel(NioServerSocketChannel.class)// .childHandler(new ChannelInit()).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new MyInHandler());}}).bind(new InetSocketAddress("192.168.150.1", 9090));bind.sync().channel().closeFuture().sync();}
C10K problem
单机连接10k客户端问题,随着单服务端连接客户端越来越多,才逐渐出现Nio、多路复用IO等模型
http://www.kegel.com/c10k.html
public static void main(String[] args) {LinkedList<SocketChannel> clients = new LinkedList<>();InetSocketAddress serverAddr = new InetSocketAddress("192.168.150.11", 9090);//端口号的问题:65535// windowsfor (int i = 10000; i < 65000; i++) {try {SocketChannel client1 = SocketChannel.open();SocketChannel client2 = SocketChannel.open();/*linux中你看到的连接就是:client...port: 10508client...port: 10508*/client1.bind(new InetSocketAddress("192.168.150.1", i));// 192.168.150.1:10000 192.168.150.11:9090client1.connect(serverAddr);clients.add(client1);client2.bind(new InetSocketAddress("192.168.110.100", i));// 192.168.110.100:10000 192.168.150.11:9090client2.connect(serverAddr);clients.add(client2);} catch (IOException e) {e.printStackTrace();}}System.out.println("clients "+ clients.size());try {System.in.read();} catch (IOException e) {e.printStackTrace();}
}
IO数据流向图
EPOLL即为在内核空间中通过链表保存有数据状态变更的fd,直接从链表中获取fd即可

相关文章:
IO知识整理
IO 面向系统IO page cache 程序虚拟内存到物理内存的转换依靠cpu中的mmu映射 物理内存以page(4k)为单位做分配 多个程序访问磁盘上同一个文件,步骤 kernel将文件内容加载到pagecache多个程序读取同一份文件指向的同一个pagecache多个程…...
【正点原子FPGA连载】第十三章QSPI Flash读写测试实验 摘自【正点原子】DFZU2EG_4EV MPSoC之嵌入式Vitis开发指南
1)实验平台:正点原子MPSoC开发板 2)平台购买地址:https://detail.tmall.com/item.htm?id692450874670 3)全套实验源码手册视频下载地址: http://www.openedv.com/thread-340252-1-1.html 第十三章QSPI Fl…...
深入理解mysql的内核查询成本计算
MySql系列整体栏目 内容链接地址【一】深入理解mysql索引本质https://blog.csdn.net/zhenghuishengq/article/details/121027025【二】深入理解mysql索引优化以及explain关键字https://blog.csdn.net/zhenghuishengq/article/details/124552080【三】深入理解mysql的索引分类&a…...
LeetCode 141. 环形链表
原题链接 难度:easy\color{Green}{easy}easy 题目描述 给你一个链表的头节点 headheadhead ,判断链表中是否有环。 如果链表中有某个节点,可以通过连续跟踪 nextnextnext 指针再次到达,则链表中存在环。 为了表示给定链表中的…...
git提交
文章目录关于数据库:桌面/vue-admin/vue_shop_api 的 git 输入 打开 phpStudy ->mySQL管理器 导入文件同时输入密码,和文件名 node app.js 错误区: $ git branch // git branch 查看分支 只有一个main分支不见master解决: gi…...
Java中常见的编码集问题
收录于热门专栏Java基础教程系列(进阶篇) 一、遇到一个问题 1、读取CSV文件 package com.guor.demo.charset;import java.io.BufferedReader; import java.io.FileReader; import java.util.ArrayList; import java.util.HashMap; import java.util.L…...
数据结构与算法(Java版) | 就让我们来看看几个实际编程中遇到的问题吧!
上一讲,我给大家简单介绍了一下数据结构,以及数据结构与算法之间的关系,照理来说,接下来我就应该要给大家详细介绍线性结构和非线性结构了,但是在此之前,我决定还是先带着大家看几个实际编程中遇到的问题&a…...
【C++算法】dfs深度优先搜索(上) ——【全面深度剖析+经典例题展示】
💃🏼 本人简介:男 👶🏼 年龄:18 📕 ps:七八天没更新了欸,这几天刚搞完元宇宙,上午一直练🚗,下午背四级单词和刷题来着,还在忙一些学弟…...
总结高频率Vue面试题
目录 什么是三次握手? 什么是四次挥手?(close触发) 什么是VUEX? 什么是同源----跨域? 什么是Promise? 什么是fexl布局? 数据类型 什么是深浅拷贝? 什么是懒加载&…...
IP协议详解
目录 前言: IP协议 提出问题 解决方案 地址管理 子网掩码 路由选择 小结: 前言: IP协议作为网络层知名协议。当数据经过传输层使用TCP或者UDP对数据进行封装,然后当数据到达网络层,基于TCP或UDP数据包继续进行…...
webpack5 基础配置
在开发中,我们会使用 vue、react、less、scss等语法进行开发项目,但是浏览器只能识别 js、css,或者说在js中使用了es6中的import 导入 这时候也需要打包工具去转换成浏览器可以识别的语句。 一、使用webpack 1.初始化package.json npm i…...
IDEA入门安装使用教程
一、背景 作为一个Java开发者,有非常多编辑工具供我们选择,比如Eclipse、IntelliJ IDEA、NetBeans、Visual Studio Code、Sublime Text等等,这些有免费也有收费的,但是就目前市场占比来说普遍使用Eclipse和IntelliJ IDEA这两款主…...
Lambda表达式使用及详解
一 Lambda表达式的简介 Lambda表达式(闭包):java8的新特性,lambda运行将函数作为一个方法的参数,也就是函数作为参数传递到方法中。使用lambda表达式可以让代码更加简洁。 Lambda表达式的使用场景:用以简…...
JAVA练习52-打家劫舍
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 目录 前言 一、题目-打家劫舍 1.题目描述 2.思路与代码 2.1 思路 2.2 代码 总结 前言 提示:这里可以添加本文要记录的大概内容: 2月16日练习内容 提…...
简单谈一谈幂等测试
1、什么是幂等测试 幂等是一个抽象的概念,在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同,即多次调用方法或者接口不会改变业务状态,可以保证重复调用的结果和单次调用的结果一致。幂等测试,则主…...
typescript复习笔记
数组类型-限定每一项的类型 //写法一 const arrNumber: number[] [1, 2, 3] const arrString: string[] [a, b, c] //写法二 const arrNumber2: Array<number> [1, 2, 3] const arrString2: Array<string> [a, b, c]联合类型 符号是 | //数组可以存放字符串或…...
webstorm开发electron,调试主进程方案
官网教程地址:https://www.electronjs.org/zh/docs/latest/tutorial/debugging-main-process 我只能说官网太看得起人了,整这么简易的教程…… 命令行开关 第一步还是要按要求在我们的package.json里加上端口监听:–inspect5858 我的命令…...
2W字正则表达式基础知识总结,这一篇就够了!!(含前端常用案例,建议收藏)
正则表达式 (Regular Expression,简称 RE 或 regexp ) 是一种文本模式,包括普通字符(例如,a 到 z 之间的字母)和特殊字符(称为"元字符")正则表达式使用单个字符串来描述、匹配一系列匹…...
自学web前端觉得好难,可能你遇到了这些困境
好多人跟我说上学的时候也学过前端,毕业了想从事web前端开发的工作,但自学起来好难,快要放弃了,所以我总结了一些大家遇到的困境,希望对你会有所帮助。 目录 1. 意志是否坚定 2. 没有找到合适自己的老师 3. 为了找…...
ASEMI中低压MOS管18N20参数,18N20封装,18N20尺寸
编辑-Z ASEMI中低压MOS管18N20参数: 型号:18N20 漏极-源极电压(VDS):200V 栅源电压(VGS):30V 漏极电流(ID):18A 功耗(PD&#x…...
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以? 在 Golang 的面试中,map 类型的使用是一个常见的考点,其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...
【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器
——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的一体化测试平台,覆盖应用全生命周期测试需求,主要提供五大核心能力: 测试类型检测目标关键指标功能体验基…...
屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!
5月28日,中天合创屋面分布式光伏发电项目顺利并网发电,该项目位于内蒙古自治区鄂尔多斯市乌审旗,项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站,总装机容量为9.96MWp。 项目投运后,每年可节约标煤3670…...
零基础设计模式——行为型模式 - 责任链模式
第四部分:行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习!行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想:使多个对象都有机会处…...
让AI看见世界:MCP协议与服务器的工作原理
让AI看见世界:MCP协议与服务器的工作原理 MCP(Model Context Protocol)是一种创新的通信协议,旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天,MCP正成为连接AI与现实世界的重要桥梁。…...
JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作
一、上下文切换 即使单核CPU也可以进行多线程执行代码,CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短,所以CPU会不断地切换线程执行,从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...
JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案
JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停 1. 安全点(Safepoint)阻塞 现象:JVM暂停但无GC日志,日志显示No GCs detected。原因:JVM等待所有线程进入安全点(如…...
如何在最短时间内提升打ctf(web)的水平?
刚刚刷完2遍 bugku 的 web 题,前来答题。 每个人对刷题理解是不同,有的人是看了writeup就等于刷了,有的人是收藏了writeup就等于刷了,有的人是跟着writeup做了一遍就等于刷了,还有的人是独立思考做了一遍就等于刷了。…...
【Oracle】分区表
个人主页:Guiat 归属专栏:Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...
springboot整合VUE之在线教育管理系统简介
可以学习到的技能 学会常用技术栈的使用 独立开发项目 学会前端的开发流程 学会后端的开发流程 学会数据库的设计 学会前后端接口调用方式 学会多模块之间的关联 学会数据的处理 适用人群 在校学生,小白用户,想学习知识的 有点基础,想要通过项…...
