请选择 进入手机版 | 继续访问电脑版
搜索
房产
装修
汽车
婚嫁
健康
理财
旅游
美食
跳蚤
二手房
租房
招聘
二手车
教育
茶座
我要买房
买东西
装修家居
交友
职场
生活
网购
亲子
情感
龙城车友
找美食
谈婚论嫁
美女
兴趣
八卦
宠物
手机

Netty+Websocket 聊天、推送(实战)

[复制链接]
查看: 78|回复: 0

2万

主题

2万

帖子

7万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
74933
发表于 2020-9-15 15:08 | 显示全部楼层 |阅读模式
疯狂创客圈 Java 高并发【 亿级流量聊天室实战】实战系列 【博客园总入口
架构师成长+面试必备之 高并发基础书籍 【Netty Zookeeper Redis 高并发实战
什么是Netty ?

Netty是由Jboss提供的一款著名的开源框架,常用于搭建 RPC中的TCP服务器、Websocket服务器,甚至是类似Tomcat的Web服务器,反正就是各种网络服务器,在处理高并发的项目中,有奇用!功能丰富且性能良好,基于Java中NIO的二次封装,具有比原生NIO更好更稳健的体验。
关于Netty 原理,请参见 《Netty Zookeeper Redis 高并发实战》  一书
为什么要使用 Netty 替代 Tomcat?

很多项目,都需要基于  Websocket 协议做在线客服、在线推送、在线聊天,虽然 Tomcat 内置支持  Websocket  协议,但是由于 Tomcat  的吞吐量、连接数都很低,作为测试是可以的。在生产环境,一定需要使用高吞吐量、高连接数的 Netty 服务器进行替代
我的关键词 Netty+Websocket 聊天、推送(实战)  新闻咨询

之所以 Netty 性能高,因为其使用的是 Reactor 反应器模式。关于反应器模式原理,请参见 《Netty Zookeeper Redis 高并发实战》  一书。
Netty+WS 在线聊天(在线推送)功能演示

聊天过程gif 演示:
我的关键词 Netty+Websocket 聊天、推送(实战)  新闻咨询

聊天示意图:
我的关键词 Netty+Websocket 聊天、推送(实战)  新闻咨询 20200421184453380

Springboot+Netty 项目结构

我的关键词 Netty+Websocket 聊天、推送(实战)  新闻咨询 20200421184655623

Netty  服务启动

Netty搭建的服务器基本上都是差不多的写法:
绑定主线程组和工作线程组,这部分对应架构图中的事件循环组。其原理,,请参见 《Netty Zookeeper Redis 高并发实战》  一书。
重点就是ChannelInitializer的配置,以异步的方式启动,最后是结束的时候关闭线程组。
  1.     /**     * 启动即时通讯服务器     */    public void start()    {        final WebSocketServer webSocketServer = new WebSocketServer();        ChannelFuture channelFuture = null;        ServerBootstrap bootstrap = new ServerBootstrap();        bootstrap.group(group)                .channel(NioServerSocketChannel.class)                .childHandler(new ChatServerInitializer());        InetSocketAddress address = new InetSocketAddress(9999);        channelFuture = bootstrap.bind(address);        channelFuture.syncUninterruptibly();        channel = channelFuture.channel();        // 返回与当前Java应用程序关联的运行时对象        Runtime.getRuntime().addShutdownHook(new Thread()        {            @Override            public void run()            {                webSocketServer.stop();            }        });        channelFuture.channel().closeFuture().syncUninterruptibly();    }    /**     * 内部类     */    class ChatServerInitializer extends ChannelInitializer    {        private static final int READ_IDLE_TIME_OUT = 60; // 读超时  s        private static final int WRITE_IDLE_TIME_OUT = 0;// 写超时        private static final int ALL_IDLE_TIME_OUT = 0; // 所有超时        @Override        protected void initChannel(Channel ch) throws Exception        {            ChannelPipeline pipeline = ch.pipeline();            // HTTP请求的解码和编码            pipeline.addLast(new HttpServerCodec());            // 主要用于处理大数据流,比如一个1G大小的文件如果你直接传输肯定会撑暴jvm内存的; 增加之后就不用考虑这个问题了            pipeline.addLast(new ChunkedWriteHandler());            // 把多个消息转换为一个单一的FullHttpRequest或是FullHttpResponse,            // 原因是HTTP解码器会在每个HTTP消息中生成多个消息对象HttpRequest/HttpResponse,HttpContent,LastHttpContent            pipeline.addLast(new HttpObjectAggregator(64 * 1024));            // WebSocket数据压缩            pipeline.addLast(new WebSocketServerCompressionHandler());            // 协议包长度限制            pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 10 * 1024));            // //当连接在60秒内没有接收到消息时,进会触发一个 IdleStateEvent 事件,被 HeartbeatHandler 的 userEventTriggered 方法处理            pipeline.addLast(new IdleStateHandler(READ_IDLE_TIME_OUT, WRITE_IDLE_TIME_OUT, ALL_IDLE_TIME_OUT, TimeUnit.SECONDS));            pipeline.addLast(new TextWebSocketFrameHandler());        }    }
复制代码
报文处理器
  1. /** * Created by 尼恩 @ 疯狂创客圈 *
  2. * WebSocket 帧:WebSocket 以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧 */@Slf4jpublic class TextWebSocketFrameHandler extends SimpleChannelInboundHandler{    @Override    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception    {        //增加消息的引用计数(保留消息),并将他写到 ChannelGroup 中所有已经连接的客户端        ServerSession session = ServerSession.getSession(ctx);        Map result = ChatProcesser.inst().onMessage(msg.text(), session);        if (result != null && null!=result.get("type"))        {            switch (result.get("type"))            {                case "msg":                    SessionMap.inst().sendToOthers(result, session);                    break;                case "init":                    SessionMap.inst().addSession(result, session);                    break;            }        }    }    @Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception    {        //是否握手成功,升级为 Websocket 协议        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE)        {            // 握手成功,移除 HttpRequestHandler,因此将不会接收到任何消息            // 并把握手成功的 Channel 加入到 ChannelGroup 中            ServerSession session = new ServerSession(ctx.channel());            String echo = ChatProcesser.inst().onOpen(session);            SessionMap.inst().sendMsg(ctx, echo);        } else if (evt instanceof IdleStateEvent)        {            IdleStateEvent stateEvent = (IdleStateEvent) evt;            if (stateEvent.state() == IdleState.READER_IDLE)            {                ServerSession session = ServerSession.getSession(ctx);                SessionMap.inst().remove(session);                session.processError(null);            }        } else        {            super.userEventTriggered(ctx, evt);        }    }}
复制代码
业务处理器

下面是用websocket做聊天室的逻辑:

  • 使用 Json  传递实体消息;
  • ServerSession 存储了每个会话,保存对 Channel和 User,使用User 表示连接上来用户
  • 前端要求填入用户和房间(群组)后,模拟登录,并返回用户列表。进入后可以发送群组消息。
  1. package com.crazymaker.websocket.processer;import com.crazymaker.websocket.Model.User;import com.crazymaker.websocket.session.ServerSession;import com.crazymaker.websocket.session.SessionMap;import com.crazymaker.websocket.util.JsonUtil;import com.google.gson.reflect.TypeToken;import lombok.extern.slf4j.Slf4j;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import java.io.IOException;import java.util.HashMap;import java.util.Map;/** * 业务处理器 * Created by 尼恩 @ 疯狂创客圈 */@Slf4jpublic class ChatProcesser{    private static final Logger logger = LoggerFactory.getLogger(ChatProcesser.class);    /**     * 单例     */    private static ChatProcesser singleInstance = new ChatProcesser();    public static ChatProcesser inst()    {        return singleInstance;    }    /**     * 连接建立成功调用的方法     *     * @param s 会话     */    public String onOpen(ServerSession s) throws IOException    {        Map result = new HashMap();        result.put("type", "bing");        result.put("sendUser", "系统消息");        result.put("id", s.getId());        String json = JsonUtil.pojoToJson(result);        return json;    }    /**     * 连接关闭调用的方法     */    public String onClose(ServerSession s)    {        User user = s.getUser();        if (user != null)        {            String nick = user.getNickname();            Map result = new HashMap();            result.put("type", "init");            result.put("msg", nick + "离开房间");            result.put("sendUser", "系统消息");            String json = JsonUtil.pojoToJson(result);            return json;        }        return null;    }    /**     * 收到客户端消息后调用的方法     *     * @param message 消息内容     * @param session 会哈     */    public Map onMessage(String message, ServerSession session)    {        TypeToken typeToken = new TypeToken()        {        };        Map map = JsonUtil.jsonToPojo(message, typeToken);        Map result = new HashMap();        User user = null;        switch (map.get("type"))        {            case "msg":                user = session.getUser();                result.put("type", "msg");                result.put("msg", map.get("msg"));                result.put("sendUser", user.getNickname());                break;            case "init":                String room = map.get("room");                session.setGroup(room);                String nick = map.get("nick");                user = new User(session.getId(), nick);                session.setUser(user);                result.put("type", "init");                result.put("msg", nick + "成功加入房间");                result.put("sendUser", "系统消息");                break;            case "ping":                break;        }        return result;    }    /**     * 连接发生错误时的调用方法     *     * @param session 会话     * @param error   异常     */    public String onError(ServerSession session, Throwable error)    {        //捕捉异常信息        if (null != error)        {            log.error(error.getMessage());        }        User user = session.getUser();        if (user == null)        {            return null;        }        String nick = user.getNickname();        Map result = new HashMap();        result.put("type", "init");        result.put("msg", nick + "离开房间");        result.put("sendUser", "系统消息");        String json = JsonUtil.pojoToJson(result);        return json;    }}
复制代码
源码网址: Java 高并发研习社群博客园 总入口
疯狂创客圈   经典图书 : 《Netty Zookeeper Redis 高并发实战》   面试必备 +  面试必备 + 面试必备
我的关键词 Netty+Websocket 聊天、推送(实战)  新闻咨询


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

技术支持:迪恩网络科技公司  Powered by Discuz! X3.2
快速回复 返回顶部 返回列表