本文共 7026 字,大约阅读时间需要 23 分钟。
Netty 5 已被官方放弃维护;不过就学习而言,Netty 4 和 Netty 5 的差别不大,本文的例子基于 Netty 5。
一个thread + 队列 == 一个单线程线程池。线程安全的,任务是线性串行执行的
如果对象本身是线程安全的,并发使用不会产生阻塞效应,就使用对象组。
如果对象不是线程安全的,并发使用会产生阻塞效应,就使用对象池。
其中 EventExecutor executor 的实际类型是 NioEventLoop,它是线程安全的:
public final class NioEventLoop extends SingleThreadEventLoop
/** * netty5服务端 * * */public class Server { public static void main(String[] args) { //服务类 ServerBootstrap bootstrap = new ServerBootstrap(); //boss和worker EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { //设置线程池 bootstrap.group(boss, worker); //设置socket工厂、 bootstrap.channel(NioServerSocketChannel.class); //设置管道工厂 bootstrap.childHandler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ServerHandler()); } }); //netty3中对应设置如下 //bootstrap.setOption("backlog", 1024); //bootstrap.setOption("tcpNoDelay", true); //bootstrap.setOption("keepAlive", true); //设置参数,TCP参数 bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//serverSocketchannel的设置,链接缓冲池的大小 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的设置,维持链接的活跃,清除死链接 bootstrap.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的设置,关闭延迟发送 //绑定端口 ChannelFuture future = bootstrap.bind(10101); System.out.println("start"); //等待服务端关闭 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally{ //释放资源 boss.shutdownGracefully(); worker.shutdownGracefully(); } }}
import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;/** * 服务端消息处理 * * */public class ServerHandler extends SimpleChannelInboundHandler{ @Override protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg); ctx.channel().writeAndFlush("hi"); ctx.writeAndFlush("hi"); } /** * 新客户端接入 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive"); } /** * 客户端断开 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelInactive"); } /** * 异常 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); }}
import java.io.BufferedReader;import java.io.InputStreamReader;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;/** * netty5的客户端 * * */public class Client { public static void main(String[] args) { //服务类 Bootstrap bootstrap = new Bootstrap(); //worker EventLoopGroup worker = new NioEventLoopGroup(); try { //设置线程池 bootstrap.group(worker); //设置socket工厂、 bootstrap.channel(NioSocketChannel.class); //设置管道 bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ClientHandler()); } }); ChannelFuture connect = bootstrap.connect("127.0.0.1", 10101); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); while(true){ System.out.println("请输入:"); String msg = bufferedReader.readLine(); connect.channel().writeAndFlush(msg); } } catch (Exception e) { e.printStackTrace(); } finally{ worker.shutdownGracefully(); } }}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Client-side message handler.
 *
 * <p>The {@code <String>} type argument is required: without it the
 * {@code messageReceived(ChannelHandlerContext, String)} method below does not
 * override the handler's callback and the class does not compile.
 */
public class ClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints every message received from the server.
     *
     * @param ctx context of this handler
     * @param msg decoded message text
     */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("客户端收到消息:" + msg);
    }
}
MultClient
其中 synchronized(channel) 的同步方式,也可以改用单线程任务队列的方式来实现。
import java.util.ArrayList;import java.util.List;import java.util.concurrent.atomic.AtomicInteger;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;/** * 多连接客户端 * * */public class MultClient { /** * 服务类 */ private Bootstrap bootstrap = new Bootstrap(); /** * 会话 */ private Listchannels = new ArrayList<>(); /** * 引用计数 */ private final AtomicInteger index = new AtomicInteger(); /** * 初始化 * @param count */ public void init(int count){ //worker EventLoopGroup worker = new NioEventLoopGroup(); //设置线程池 bootstrap.group(worker); //设置socket工厂、 bootstrap.channel(NioSocketChannel.class); //设置管道 bootstrap.handler(new ChannelInitializer () { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ClientHandler()); } }); for(int i=1; i<=count; i++){ ChannelFuture future = bootstrap.connect("192.168.0.103", 10101); channels.add(future.channel()); } } /** * 获取会话 * @return */ public Channel nextChannel(){ return getFirstActiveChannel(0); } private Channel getFirstActiveChannel(int count){ Channel channel = channels.get(Math.abs(index.getAndIncrement() % channels.size())); if(!channel.isActive()){ //重连 reconnect(channel); if(count >= channels.size()){ throw new RuntimeException("no can use channel"); } return getFirstActiveChannel(count + 1); } return channel; } /** * 重连 * @param channel */ private void reconnect(Channel channel){ synchronized(channel){ if(channels.indexOf(channel) == -1){ return ; } Channel newChannel = bootstrap.connect("192.168.0.103", 10101).channel(); channels.set(channels.indexOf(channel), newChannel); } }}
Start
import java.io.BufferedReader;
import java.io.InputStreamReader;

/**
 * Launcher for the multi-connection client.
 *
 * <p>Creates a {@code MultClient} with 5 connections and sends every line read
 * from standard input over the next available channel, until standard input
 * reaches end of stream.
 */
public class Start {

    public static void main(String[] args) {
        MultClient client = new MultClient();
        client.init(5);

        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            try {
                System.out.println("请输入:");
                String msg = bufferedReader.readLine();
                if (msg == null) {
                    // End of standard input. Without this check the null message
                    // raises an exception that is caught below, and the loop would
                    // spin forever printing stack traces.
                    break;
                }
                client.nextChannel().writeAndFlush(msg);
            } catch (Exception e) {
                // Keep the prompt loop alive when a single send fails.
                e.printStackTrace();
            }
        }
    }
}
转载地址:http://bclnn.baihongyu.com/