注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

流星永恒的博客

JSF,Facelets,Rich(Prime)Faces,和java的笔记

 
 
 

日志

 
 

Implementing a heartbeat ping with Netty 4  

2012-11-29 16:32:41|  分类: java |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
Server part:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 
 * @author Daniel.Yang
 */
public class Server {
    public static void main(String[] args) throws InterruptedException {
        int port = 8443;
        ServerBootstrap b = new ServerBootstrap();
        try {
            b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
                    .channel(NioServerSocketChannel.class)
                    .localAddress(port)
                    .childHandler(new DiscardServerInitializer());
            b.bind().sync().channel().closeFuture().sync();
        } finally {
            b.shutdown();
        }
    }
}

ServerInitializer

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;

/**
 * Sets up the pipeline of each channel it initializes: idle detection, line-delimited framing,
 * string decoding/encoding and finally the {@link ServerHandler} business handler.
 *
 * @author Daniel.Yang
 */
public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    /** Creates an initializer; it carries no state of its own. */
    public ServerInitializer() {
    }

    /**
     * Populates the pipeline of a newly registered channel.
     *
     * @param ch the channel whose pipeline is being configured
     * @throws Exception declared by the overridden method; nothing here throws explicitly
     */
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Idle detection: 5 s reader-idle, 5 s writer-idle, 8 s all-idle. The resulting
        // IdleStateEvents are what the business handler's userEventTriggered reacts to.
        pipeline.addLast("ping", new IdleStateHandler(5, 5, 8, TimeUnit.SECONDS));
        // Text line protocol: split on line delimiters (max frame 8192 bytes), then
        // convert frames to/from String.
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
                8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        // Business logic. This must be ServerHandler (the heartbeat handler); the previous
        // reference to DiscardServerHandler named a class that is not part of this example.
        pipeline.addLast("handler", new ServerHandler());
    }
}

ServerHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.net.InetAddress;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Server-side handler that performs the heartbeat check.
 *
 * <p>On an all-idle event it writes {@code "ping\n"} to the channel and remembers when the first
 * unanswered ping was sent. A received {@code "OK"} line clears that timestamp. If more than
 * three whole seconds have elapsed since the timestamp when a reader-idle event or any message
 * arrives, the channel is closed.
 *
 * @author Daniel.Yang
 */
public class ServerHandler extends ChannelInboundMessageHandlerAdapter<String> {

    private static final Logger LOG = Logger.getLogger(ServerHandler.class.getName());

    /** All channels that became active through this handler class. */
    static final ChannelGroup channels = new DefaultChannelGroup();

    /** Wall-clock millis of the oldest unanswered ping, or 0 when no ping is outstanding. */
    private long pingSentAt = 0;

    /**
     * Greets the peer, registers the channel in {@link #channels} and starts a
     * {@code DataServer} thread that writes to this channel.
     */
    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        final String greeting =
                "Welcome to " + InetAddress.getLocalHost().getHostName() + " Server!\n";
        ctx.write(greeting);
        channels.add(ctx.channel());
        new Thread(new DataServer(ctx.channel())).start();
    }

    /**
     * Dispatches idle-state events to the matching helper and then forwards every event to the
     * superclass implementation.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            final IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                onReaderIdle(ctx);
            } else if (state == IdleState.WRITER_IDLE) {
                System.out.println("WRITER_IDLE");
            } else if (state == IdleState.ALL_IDLE) {
                onAllIdle(ctx);
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    /**
     * Prints each received line. While a ping is outstanding, closes the channel if it is
     * overdue, otherwise clears the outstanding ping when the line is {@code "OK"}.
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, String request) throws Exception {
        System.out.println(request);
        if (pingSentAt == 0L) {
            return; // no ping outstanding, nothing to check
        }
        final long elapsed = secondsSincePing();
        System.out.println("time : " + elapsed);
        if (elapsed > 3) {
            // The answer (or any other traffic) came too late.
            ctx.close();
            return;
        }
        if ("OK".equals(request)) {
            pingSentAt = 0L;
        }
    }

    /** Logs the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOG.log(Level.WARNING, "Unexpected exception from downstream.", cause);
        ctx.close();
    }

    /** Closes the channel when an outstanding ping has gone unanswered for too long. */
    private void onReaderIdle(ChannelHandlerContext ctx) {
        System.out.println("READER_IDLE");
        if (pingSentAt == 0L) {
            return;
        }
        final long elapsed = secondsSincePing();
        System.out.println("Time : " + elapsed);
        if (elapsed > 3) {
            System.err.println("No heart beat received in 3 seconds, close channel.");
            channels.remove(ctx.channel());
            ctx.close();
        }
    }

    /** Sends a ping, recording the send time only if no earlier ping is still outstanding. */
    private void onAllIdle(ChannelHandlerContext ctx) {
        System.out.println("ALL_IDLE");
        if (pingSentAt == 0L) {
            pingSentAt = System.currentTimeMillis();
        }
        ctx.channel().write("ping\n");
    }

    /** Whole seconds (truncated) elapsed since {@link #pingSentAt}. */
    private long secondsSincePing() {
        return (System.currentTimeMillis() - pingSentAt) / 1000;
    }
}

DataServer

import io.netty.channel.Channel;

/**
 * Background task that keeps writing a fixed text line to a channel at random intervals until
 * the channel is no longer open or an error occurs.
 *
 * @author Daniel.Yang
 */
public class DataServer implements Runnable {

    private final Channel channel;
    private boolean stop = false;

    /**
     * @param channel the channel this task writes to
     */
    public DataServer(Channel channel) {
        this.channel = channel;
    }

    /**
     * Writes {@code "Data transfering!!\n"} to the channel, sleeps for a random whole number of
     * seconds between 0 and 20, and repeats. The loop ends when the channel is not open, when
     * the thread is interrupted, or when any other exception is thrown.
     */
    @Override
    public void run() {
        System.out.println("Data Server started...");
        while (!stop) {
            try {
                if (!channel.isOpen()) {
                    System.out.println("Channel is not actived, close the Data Server.");
                    stop = true;
                } else {
                    System.out.println("Data server writing...");
                    channel.write("Data transfering!!\n");
                    Thread.sleep(Math.round(Math.random() * 20) * 1000); // sleeping some time
                }
            } catch (InterruptedException e) {
                // Interruption is a request to stop, not an error: restore the interrupt
                // status (Thread.sleep cleared it) so it stays observable, and leave the loop.
                Thread.currentThread().interrupt();
                stop = true;
            } catch (Exception e) {
                stop = true;
                e.printStackTrace();
            }
        }
    }
}

Client part:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Entry point of the heartbeat demo client.
 *
 * <p>Connects to {@code localhost:8443} with a line-delimited string pipeline ending in
 * {@link ClientHandler}, then blocks until the connection is closed.
 *
 * @author Daniel.Yang
 */
public class Client {

    /**
     * Connects to the server and waits for the channel to close.
     *
     * @param args command-line arguments; not used
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *     connect to complete or for the channel to close
     */
    public static void main(String[] args) throws InterruptedException {
        final String host = "localhost";
        final int port = 8443;

        // Pipeline: line framing (max 8192 bytes) -> String decode/encode -> business handler.
        final ChannelInitializer<SocketChannel> pipelineSetup =
                new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast("framer",
                                new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
                        p.addLast("decoder", new StringDecoder());
                        p.addLast("encoder", new StringEncoder());
                        p.addLast("handler", new ClientHandler());
                    }
                };

        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(new NioEventLoopGroup())
                    .channel(NioSocketChannel.class)
                    .remoteAddress(host, port)
                    .handler(pipelineSetup);
            // Wait for the connect to finish, then for the connection to be closed.
            bootstrap.connect().sync().channel().closeFuture().sync();
        } finally {
            // Release the bootstrap's resources whether or not the connect succeeded.
            bootstrap.shutdown();
        }
    }
}

ClientHandler

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Client-side handler: prints every received line to standard error and answers each
 * {@code "ping"} line with {@code "OK\n"}.
 *
 * @author Daniel.Yang
 */
@ChannelHandler.Sharable
public class ClientHandler extends ChannelInboundMessageHandlerAdapter<String> {

    private static final Logger LOG = Logger.getLogger(ClientHandler.class.getName());

    /**
     * Prints the line and, if it is exactly {@code "ping"}, writes the heartbeat reply.
     *
     * @param ctx the handler context of the receiving channel
     * @param msg one decoded text line
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        System.err.println(msg);
        final boolean heartbeatRequest = "ping".equals(msg);
        if (!heartbeatRequest) {
            return;
        }
        ctx.channel().write("OK\n");
    }

    /** Delegates to the superclass, then reports the disconnect on standard output. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        System.out.println("Disconnected!!!");
    }

    /** Logs the failure at WARNING level and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOG.log(Level.WARNING, "Unexpected exception from downstream.", cause);
        ctx.close();
    }
}


  评论这张
 
阅读(4421)| 评论(1)
推荐 转载

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017