博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty5 服务端和客户端
阅读量:3727 次
发布时间:2019-05-22

本文共 7026 字,大约阅读时间需要 23 分钟。

Netty5 服务端和客户端

更多干货

概述

netty 5 已经放弃掉了,作为学习netty4和5的差别不大,本例子是基于netty5

线程安全

  • 一个thread + 队列 == 一个单线程线程池。线程安全的,任务是线性串行执行的

  • 线程安全,不会产生阻塞效应 ,使用对象组

  • 线程不安全,会产生阻塞效应, 使用对象池

Figure 1. 对象池
Figure 2. 对象组

writeAndFlush 发送消息线程安全

其中 EventExecutor executor 是 NioEventLoop 线程安全

public final class NioEventLoop extends SingleThreadEventLoop

代码

Server

/** * netty5服务端 * * */public class Server {	public static void main(String[] args) {		//服务类		ServerBootstrap bootstrap = new ServerBootstrap();		//boss和worker		EventLoopGroup boss = new NioEventLoopGroup();		EventLoopGroup worker = new NioEventLoopGroup();		try {			//设置线程池			bootstrap.group(boss, worker);			//设置socket工厂、			bootstrap.channel(NioServerSocketChannel.class);			//设置管道工厂			bootstrap.childHandler(new ChannelInitializer
() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ServerHandler()); } }); //netty3中对应设置如下 //bootstrap.setOption("backlog", 1024); //bootstrap.setOption("tcpNoDelay", true); //bootstrap.setOption("keepAlive", true); //设置参数,TCP参数 bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//serverSocketchannel的设置,链接缓冲池的大小 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的设置,维持链接的活跃,清除死链接 bootstrap.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的设置,关闭延迟发送 //绑定端口 ChannelFuture future = bootstrap.bind(10101); System.out.println("start"); //等待服务端关闭 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally{ //释放资源 boss.shutdownGracefully(); worker.shutdownGracefully(); } }}

ServerHandler

import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;/** * 服务端消息处理 * * */public class ServerHandler extends SimpleChannelInboundHandler
{ @Override protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg); ctx.channel().writeAndFlush("hi"); ctx.writeAndFlush("hi"); } /** * 新客户端接入 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive"); } /** * 客户端断开 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelInactive"); } /** * 异常 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); }}

Client

import java.io.BufferedReader;import java.io.InputStreamReader;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;/** * netty5的客户端 * * */public class Client {	public static void main(String[] args) {		//服务类		Bootstrap bootstrap = new Bootstrap();		//worker		EventLoopGroup worker = new NioEventLoopGroup();		try {			//设置线程池			bootstrap.group(worker);			//设置socket工厂、			bootstrap.channel(NioSocketChannel.class);			//设置管道			bootstrap.handler(new ChannelInitializer
() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ClientHandler()); } }); ChannelFuture connect = bootstrap.connect("127.0.0.1", 10101); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); while(true){ System.out.println("请输入:"); String msg = bufferedReader.readLine(); connect.channel().writeAndFlush(msg); } } catch (Exception e) { e.printStackTrace(); } finally{ worker.shutdownGracefully(); } }}

ClientHandler

import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;/** * 客户端消息处理 * * */public class ClientHandler extends SimpleChannelInboundHandler
{ @Override protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("客户端收到消息:"+msg); }}

多连接客户端

MultClient

  • 其中 synchronized(channel) 可以采用 单任务队列的方式

import java.util.ArrayList;import java.util.List;import java.util.concurrent.atomic.AtomicInteger;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;/** * 多连接客户端 * * */public class MultClient {	/**	 * 服务类	 */	private Bootstrap bootstrap = new Bootstrap();	/**	 * 会话	 */	private List
channels = new ArrayList<>(); /** * 引用计数 */ private final AtomicInteger index = new AtomicInteger(); /** * 初始化 * @param count */ public void init(int count){ //worker EventLoopGroup worker = new NioEventLoopGroup(); //设置线程池 bootstrap.group(worker); //设置socket工厂、 bootstrap.channel(NioSocketChannel.class); //设置管道 bootstrap.handler(new ChannelInitializer
() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ClientHandler()); } }); for(int i=1; i<=count; i++){ ChannelFuture future = bootstrap.connect("192.168.0.103", 10101); channels.add(future.channel()); } } /** * 获取会话 * @return */ public Channel nextChannel(){ return getFirstActiveChannel(0); } private Channel getFirstActiveChannel(int count){ Channel channel = channels.get(Math.abs(index.getAndIncrement() % channels.size())); if(!channel.isActive()){ //重连 reconnect(channel); if(count >= channels.size()){ throw new RuntimeException("no can use channel"); } return getFirstActiveChannel(count + 1); } return channel; } /** * 重连 * @param channel */ private void reconnect(Channel channel){ synchronized(channel){ if(channels.indexOf(channel) == -1){ return ; } Channel newChannel = bootstrap.connect("192.168.0.103", 10101).channel(); channels.set(channels.indexOf(channel), newChannel); } }}

Start

import java.io.BufferedReader;import java.io.InputStreamReader;/** * 启动类 * * */public class Start {	public static void main(String[] args) {		MultClient client = new MultClient();		client.init(5);		BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));		while(true){			try {				System.out.println("请输入:");				String msg = bufferedReader.readLine();				client.nextChannel().writeAndFlush(msg);			} catch (Exception e) {				e.printStackTrace();			}		}	}}

转载地址:http://bclnn.baihongyu.com/

你可能感兴趣的文章
Hadoop之zookeeper
查看>>
Linux shell编程
查看>>
线程--线程池
查看>>
内存存储 -- 大小端问题
查看>>
线程池C++面向对象封装
查看>>
opencv -- 基础
查看>>
1创建数据库
查看>>
狂神SSM数据库
查看>>
实现EasyExcel对Excel进行读操作(测试)
查看>>
课程分类添加功能
查看>>
教育项目课程模块2
查看>>
教育项目课程模块3(课程分类前端实现)
查看>>
课程管理需求+后端接口
查看>>
3搭建service模块
查看>>
4service_edu模块
查看>>
3整合swagger进行接口测试
查看>>
5使用R方法
查看>>
Memcached的安装与使用
查看>>
构建事实表
查看>>
java与javax的区别分析
查看>>