IO流中「线程」模型总结

频道:行业资讯 日期: 浏览:854
一、基础简介

在IO流的网络模型中,以常见的「客户端-服务端」交互场景为例;

客户端与服务端进行通信「交互」,可能是同步或者异步,服务端进行「流」处理时,可能是阻塞或者非阻塞模式,当然也有自定义的业务流程需要执行,从处理逻辑看就是「读取数据-业务执行-应答写数据」的形式;

Java提供「三种」IO网络编程模型,即:「BIO同步阻塞」、「NIO同步非阻塞」、「AIO异步非阻塞」;

二、同步阻塞

1、模型图解

BIO即同步阻塞,服务端收到客户端的请求时,会启动一个线程处理,「交互」会阻塞直到整个流程结束;

这种模式如果在高并发且流程复杂耗时的场景下,客户端的请求响应会存在严重的性能问题,并且占用过多资源;

2、参考案例

【服务端】启动ServerSocket接收客户端的请求,经过一系列逻辑之后,向客户端发送消息,注意这里线程的10秒休眠;

复制

public class SocketServer01 {

public static void main(String[] args) throws Exception {

// 1、创建Socket服务端

ServerSocket serverSocket = new ServerSocket(8080);

// 2、方法阻塞等待,直到有客户端连接

Socket socket = serverSocket.accept();

// 3、输入流,输出流

InputStream inStream = socket.getInputStream();

OutputStream outStream = socket.getOutputStream();

// 4、数据接收和响应

int readLen = 0;

byte[] buf = new byte[1024];

if ((readLen=inStream.read(buf)) != -1){

// 接收数据

String readVar = new String(buf, 0, readLen) ;

System.out.println("readVar======="+readVar);

}

// 响应数据

Thread.sleep(10000);

outStream.write("sever-8080-write;".getBytes());

// 5、资源关闭

IoClose.ioClose(outStream,inStream,socket,serverSocket);

}

}

1.

2.

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

16.

17.

18.

19.

20.

21.

22.

23.

24.

【客户端】Socket连接,先向ServerSocket发送请求,再接收其响应,由于Server端模拟耗时,Client处于长时间阻塞状态;

复制

public class SocketClient01 {

public static void main(String[] args) throws Exception {

// 1、创建Socket客户端

Socket socket = new Socket(InetAddress.getLocalHost(), 8080);

// 2、输入流,输出流

OutputStream outStream = socket.getOutputStream();

InputStream inStream = socket.getInputStream();

// 3、数据发送和响应接收

// 发送数据

outStream.write("client-hello".getBytes());

// 接收数据

int readLen = 0;

byte[] buf = new byte[1024];

if ((readLen=inStream.read(buf)) != -1){

String readVar = new String(buf, 0, readLen) ;

System.out.println("readVar======="+readVar);

}

// 4、资源关闭

IoClose.ioClose(inStream,outStream,socket);

}

}

1.

2.

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

16.

17.

18.

19.

20.

21.

三、同步非阻塞

1、模型图解

NIO即同步非阻塞,服务端可以实现一个线程,处理多个客户端请求连接,服务端的并发能力得到极大的提升;

这种模式下客户端的请求连接都会注册到Selector多路复用器上,多路复用器会进行轮询,对请求连接的IO流进行处理;

2、参考案例

【服务端】单线程可以处理多个客户端请求,通过轮询多路复用器查看是否有IO请求;

复制

public class SocketServer01 {

public static void main(String[] args) throws Exception {

try {

//启动服务开启监听

ServerSocketChannel socketChannel = ServerSocketChannel.open();

socketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 8989));

// 设置非阻塞,接受客户端

socketChannel.configureBlocking(false);

// 打开多路复用器

Selector selector = Selector.open();

// 服务端Socket注册到多路复用器,指定兴趣事件

socketChannel.register(selector, SelectionKey.OP_ACCEPT);

// 多路复用器轮询

ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

while (selector.select() > 0){

Set selectionKeys = selector.selectedKeys();

Iterator selectionKeyIter = selectionKeys.iterator();

while (selectionKeyIter.hasNext()){

SelectionKey selectionKey = selectionKeyIter.next() ;

selectionKeyIter.remove();

if(selectionKey.isAcceptable()) {

// 接受新的连接

SocketChannel client = socketChannel.accept();

// 设置读非阻塞

client.configureBlocking(false);

// 注册到多路复用器

client.register(selector, SelectionKey.OP_READ);

} else if (selectionKey.isReadable()) {

// 通道可读

SocketChannel client = (SocketChannel) selectionKey.channel();

int len = client.read(buffer);

if (len > 0){

buffer.flip();

byte[] readArr = new byte[buffer.limit()];

buffer.get(readArr);

System.out.println(client.socket().getPort() + "端口数据:" + new String(readArr));

buffer.clear();

}

}

}

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

1.

2.

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

16.

17.

18.

19.

20.

21.

22.

23.

24.

25.

26.

27.

28.

29.

30.

31.

32.

33.

34.

35.

36.

37.

38.

39.

40.

41.

42.

43.

44.

45.

46.

【客户端】每隔3秒持续的向通道内写数据,服务端通过轮询多路复用器,持续的读取数据;

复制

public class SocketClient01 {

public static void main(String[] args) throws Exception {

try {

// 连接服务端

SocketChannel socketChannel = SocketChannel.open();

socketChannel.connect(new InetSocketAddress("127.0.0.1", 8989));

ByteBuffer writeBuffer = ByteBuffer.allocate(1024);

String conVar = "client-hello";

writeBuffer.put(conVar.getBytes());

writeBuffer.flip();

// 每隔3S发送一次数据

while (true) {

Thread.sleep(3000);

writeBuffer.rewind();

socketChannel.write(writeBuffer);

writeBuffer.clear();

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

1.

2.

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

16.

17.

18.

19.

20.

21.

22.

四、异步非阻塞

1、模型图解

AIO即异步非阻塞,对于通道内数据的「读」和「写」动作,都是采用异步的模式,对于性能的提升是巨大的;

这与常规的第三方对接模式很相似,本地服务在请求第三方服务时,请求过程耗时很大,会异步执行,第三方第一次回调,确认请求可以被执行;第二次回调则是推送处理结果,这种思想在处理复杂问题时,可以很大程度的提高性能,节省资源:

2、参考案例

【服务端】各种「accept」、「read」、「write」动作是异步,通过Future来获取计算的结果;

复制

public class SocketServer01 {

public static void main(String[] args) throws Exception {

// 启动服务开启监听

AsynchronousServerSocketChannel socketChannel = AsynchronousServerSocketChannel.open() ;

socketChannel.bind(new InetSocketAddress("127.0.0.1", 8989));

// 指定30秒内获取客户端连接,否则超时

Future acceptFuture = socketChannel.accept();

AsynchronousSocketChannel asyChannel = acceptFuture.get(30, TimeUnit.SECONDS);

if (asyChannel != null && asyChannel.isOpen()){

// 读数据

ByteBuffer inBuffer = ByteBuffer.allocate(1024);

Future readResult = asyChannel.read(inBuffer);

readResult.get();

System.out.println("read:"+new String(inBuffer.array()));

// 写数据

inBuffer.flip();

Future writeResult = asyChannel.write(ByteBuffer.wrap("server-hello".getBytes()));

writeResult.get();

}

// 关闭资源

asyChannel.close();

}

}

1.

2.

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

16.

17.

18.

19.

20.

21.

22.

23.

24.

25.

26.

【客户端】相关「connect」、「read」、「write」方法调用是异步的,通过Future来获取计算的结果;

复制

public class SocketClient01 {

public static void main(String[] args) throws Exception {

// 连接服务端

AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();

Future result = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8989));

result.get();

// 写数据

String conVar = "client-hello";

ByteBuffer reqBuffer = ByteBuffer.wrap(conVar.getBytes());

Future writeFuture = socketChannel.write(reqBuffer);

writeFuture.get();

// 读数据

ByteBuffer inBuffer = ByteBuffer.allocate(1024);

Future readFuture = socketChannel.read(inBuffer);

readFuture.get();

System.out.println("read:"+new String(inBuffer.array()));

// 关闭资源

socketChannel.close();

}

}

1.

2.

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

16.

17.

18.

19.

20.

21.

22.

23.

五、Reactor模型

1、模型图解

这部分内容,可以参考「Doug Lea的《IO》」文档,查看更多细节;

1.1 Reactor设计原理

Reactor模式基于事件驱动设计,也称为「反应器」模式或者「分发者」模式;服务端收到多个客户端请求后,会将请求分派给对应的线程处理;

Reactor:负责事件的监听和分发;Handler:负责处理事件,核心逻辑「read读」、「decode解码」、「compute业务计算」、「encode编码」、「send应答数据」;

1.2 单Reactor单线程

【1】Reactor线程通过select监听客户端的请求事件,收到事件后通过Dispatch进行分发;

【2】如果是建立连接请求事件,Acceptor通过「accept」方法获取连接,并创建一个Handler对象来处理后续业务;

【3】如果不是连接请求事件,则Reactor会将该事件交由当前连接的Handler来处理;

【4】在Handler中,会完成相应的业务流程;

这种模式将所有逻辑「连接、读写、业务」放在一个线程中处理,避免多线程的通信,资源竞争等问题,但是存在明显的并发和性能问题;

1.3 单Reactor多线程

【1】Reactor线程通过select监听客户端的请求事件,收到事件后通过Dispatch进行分发;

【2】如果是建立连接请求事件,Acceptor通过「accept」方法获取连接,并创建一个Handler对象来处理后续业务;

【3】如果不是连接请求事件,则Reactor会将该事件交由当前连接的Handler来处理;

【4】在Handler中,只负责事件响应不处理具体业务,将数据发送给Worker线程池来处理;

【5】Worker线程池会分配具体的线程来处理业务,最后把结果返回给Handler做响应;

这种模式将业务从Reactor单线程分离处理,可以让其更专注于事件的分发和调度,Handler使用多线程也充分的利用cpu的处理能力,导致逻辑变的更加复杂,Reactor单线程依旧存在高并发的性能问题;

1.4 主从Reactor多线程

【1】 MainReactor主线程通过select监听客户端的请求事件,收到事件后通过Dispatch进行分发;

【2】如果是建立连接请求事件,Acceptor通过「accept」方法获取连接,之后MainReactor将连接分配给SubReactor;

【3】如果不是连接请求事件,则MainReactor将连接分配给SubReactor,SubReactor调用当前连接的Handler来处理;

【4】在Handler中,只负责事件响应不处理具体业务,将数据发送给Worker线程池来处理;

【5】Worker线程池会分配具体的线程来处理业务,最后把结果返回给Handler做响应;

这种模式Reactor线程分工明确,MainReactor负责接收新的请求连接,SubReactor负责后续的交互业务,适应于高并发的处理场景,是Netty组件通信框架的所采用的模式;

2、参考案例

【服务端】提供两个EventLoopGroup,「ParentGroup」主要是用来接收客户端的请求连接,真正的处理是转交给「ChildGroup」执行,即Reactor多线程模型;

复制

@Slf4j

public class NettyServer {

public static void main(String[] args) {

// EventLoop组,处理事件和IO

EventLoopGroup parentGroup = new NioEventLoopGroup();

EventLoopGroup childGroup = new NioEventLoopGroup();

try {

// 服务端启动引导类

ServerBootstrap serverBootstrap = new ServerBootstrap();

serverBootstrap.group(parentGroup, childGroup)

.channel(NioServerSocketChannel.class).childHandler(new ServerChannelInit());

// 异步IO的结果

ChannelFuture channelFuture = serverBootstrap.bind(8989).sync();

channelFuture.channel().closeFuture().sync();

} catch (Exception e){

e.printStackTrace();

} finally {

parentGroup.shutdownGracefully();

childGroup.shutdownGracefully();

}

}

}

class ServerChannelInit extends ChannelInitializer {

@Override

protected void initChannel(SocketChannel socketChannel) {

// 获取管道

ChannelPipeline pipeline = socketChannel.pipeline();

// 编码、解码器

pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));

pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));

// 添加自定义的handler

pipeline.addLast("serverHandler", new ServerHandler());

}

}

class ServerHandler extends ChannelInboundHandlerAdapter {

/**

* 通道读和写

*/

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

System.out.println("Server-Msg【"+msg+"】");

TimeUnit.MILLISECONDS.sleep(2000);

String nowTime = DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN) ;

ctx.channel().writeAndFlush("hello-client;time:" + nowTime);

ctx.fireChannelActive();

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {

cause.printStackTrace();

ctx.close();

}

}

1.

2.

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

16.

17.

18.

19.

20.

21.

22.

23.

24.

25.

26.

27.

28.

29.

30.

31.

32.

33.

34.

35.

36.

37.

38.

39.

40.

41.

42.

43.

44.

45.

46.

47.

48.

49.

50.

51.

52.

53.

54.

55.

【客户端】通过Bootstrap类,与服务器建立连接,服务端通过ServerBootstrap启动服务,绑定在8989端口,然后服务端和客户端进行通信;

复制

public class NettyClient {

public static void main(String[] args) {

// EventLoop处理事件和IO

NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();

try {

// 客户端通道引导

Bootstrap bootstrap = new Bootstrap();

bootstrap.group(eventLoopGroup)

.channel(NioSocketChannel.class).handler(new ClientChannelInit());

// 异步IO的结果

ChannelFuture channelFuture = bootstrap.connect("localhost", 8989).sync();

channelFuture.channel().closeFuture().sync();

} catch (Exception e){

e.printStackTrace();

} finally {

eventLoopGroup.shutdownGracefully();

}

}

}

class ClientChannelInit extends ChannelInitializer {

@Override

protected void initChannel(SocketChannel socketChannel) {

// 获取管道

ChannelPipeline pipeline = socketChannel.pipeline();

// 编码、解码器

pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));

pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));

// 添加自定义的handler

pipeline.addLast("clientHandler", new ClientHandler());

}

}

class ClientHandler extends ChannelInboundHandlerAdapter {

/**

* 通道读和写

*/

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

System.out.println("Client-Msg【"+msg+"】");

TimeUnit.MILLISECONDS.sleep(2000);

String nowTime = DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN) ;

ctx.channel().writeAndFlush("hello-server;time:" + nowTime);

}

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

ctx.channel().writeAndFlush("channel...active");

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {

cause.printStackTrace();

ctx.close();

}

}

1.

2.

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

16.

17.

18.

19.

20.

21.

22.

23.

24.

25.

26.

27.

28.

29.

30.

31.

32.

33.

34.

35.

36.

37.

38.

39.

40.

41.

42.

43.

44.

45.

46.

47.

48.

49.

50.

51.

52.

53.

54.

55.

六、参考源码

复制

编程文档:

https://gitee.com/cicadasmile/butte-java-note

应用仓库:

https://gitee.com/cicadasmile/butte-flyer-parent

1.

2.

3.

4.

5.

0 留言

评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。