¿Cuál es el marco para el servidor de programación de redes de alta concurrencia con sockets?
netty;
PayServer.java
paquete com.miri.pay.scoket;
importar io.netty.bootstrap.ServerBootstrap ;
importar io.netty.channel.ChannelFuture;
importar io.netty.channel.ChannelInitializer;
importar io.netty.channel.ChannelOption;
importar io.netty.channel.ChannelPipeline;
importar io.netty.channel.EventLoopGroup;
importar io.netty.channel.nio.NioEventLoopGroup;
importar io.netty.channel.socket.SocketChannel;
importar io.netty.channel.socket.nio.NioServerSocketChannel;
importar org.slf4j.Logger ;
importar org.slf4j.LoggerFactory;
la clase pública PayServer implementa Runnable
{
Logger final estático privado DLOG = LoggerFactory. getLogger(PayServer.class);
puerto int final privado;
PayServer público(puerto int)
{
this.port = puerto;
}
/**
* Vincular el puerto especificado a ServerBootstrap
*/
public void run()
{
// Se utiliza para recibir solicitudes de conexión entrantes
final EventLoopGroup bossGroup = new NioEventLoopGroup();
// Se utiliza para procesar la información en la conexión aceptada por el jefe y registrada para el trabajador
final EventLoopGroup trabajadorGroup = new NioEventLoopGroup();
intentar
{
// Configurar el servidor
final ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, WorkersGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.opción
(ChannelOption.SO_BACKLOG, 128);
// Desactiva Nagle a través de NoDelay para que el mensaje se envíe inmediatamente sin esperar una cierta cantidad de datos.
bootstrap.option(ChannelOption . TCP_NODELAY, true);
// Mantiene el estado de conexión larga
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
// CustomChannelInitializer es un controlador especial , utilizado para configurar cómodamente la implementación del controlador definido por el usuario
bootstrap.childHandler(new CustomChannelInitializer());
// Vincula y comienza a aceptar conexiones entrantes
final ChannelFuture futuro = bootstrap.bind(this.port).sync();
if (future.isSuccess())
{
PayServer.DLOG. info("Iniciar el servidor de socket {} éxito", this.port);
}
else
{
p>
PayServer.DLOG.info("Error al iniciar el servidor de socket {}, ¡salida del sistema!", this.port);
throw new RuntimeException("El servidor de socket no pudo iniciarse"); p>
}
// Espera a que se cierre el socket del servidor
// Cierra el servidor
Future.channel().closeFuture() .sync();
}
catch (final InterruptedException e)
{
PayServer.DLOG.error("Cerrar el Se produce una excepción en el servidor de socket, ¡salida del sistema!", e);
lanza una nueva RuntimeException ("Error al cerrar el servidor de socket");
}
finalmente
{
// Cerrar todos los subprocesos de terminación del bucle de eventos
bossGroup.shutdownGracefully();
trabajadorGroup.shutdownGracefully();
p>
}
}
/**
* Clase interna especial
* lt ;pgt;
p>* es un controlador especial, que se utiliza para configurar convenientemente implementaciones de controladores definidos por el usuario
* @author xulonghui
*/
clase estática
s CustomChannelInitializer extiende ChannelInitializerlt;
{
@Override
protected void initChannel(SocketChannel ch) lanza una excepción
{ p>
final ChannelPipeline p = ch.pipeline();
p.addLast(new PayMessageEncoder());
p.addLast(new PayMessageDecoder());
p.addLast(new PayServerHandler());
}
}
}
PayMessageEncoder.java
paquete com.miri.pay.scoket;
importar io.netty.buffer.ByteBuf;
importar io.netty.channel.ChannelHandlerContext; p>
p>
importar io.netty.handler.codec.MessageToByteEncoder;
importar io.netty.util.CharsetUtil;
importar com.miri.pay .model.CommonResponse;
importar com.miri.pay.utils.JsonUtil;
/**
*Codificador de mensajes
*lt;pgt ;
* Codificar mensajes enviados desde el servidor
*/
la clase pública PayMessageEncoder extiende MessageToByteEncoderlt CommonResponsegt;
{
@Override
codificación vacía protegida (ChannelHandlerContext ctx, CommonResponse rsp, ByteBuf out) arroja una excepción
{
if (rsp != null)
{
objeto final msgContent = rsp.getMsgContent();
// La longitud total del ID del mensaje, el ID de secuencia y el ID de entidad es 12
int msgLen = 12;
byte[] contentbs = nuevo byte[] {}
if (msgContent != null)
{
contenido de cadena final = JsonUtil.bean2json(msgContent);
contentbs = content.getBytes(CharsetUtil);
.UTF_8);
final int cl = contentbs.length;
msgLen = cl
}
out.writeInt(msgLen); ); // Escribe la longitud total del mensaje actual
out.writeInt(rsp.getMsgId() // Escribe el ID del mensaje actual
out.writeInt); (rsp .getSequenceId()); // Escribe el SequenceId del mensaje actual
out.writeInt(rsp.getEntityId()); // Escribe el EntityId del mensaje actual
// Escribir Ingresa el contenido del cuerpo del mensaje
if (contentbs.length gt; 0)
{
out.writeBytes(contentbs);
}
}
}
}
PayMessageDecoder.java
paquete com.miri .pay.scoket;
importar io.netty.buffer.ByteBuf;
importar io.netty.channel.ChannelHandlerContext;
importar io.netty.handler .codec.ByteToMessageDecoder;
importar io.netty.util.CharsetUtil;
importar java.util.List;
importar org.slf4j.Logger;
importar org.slf4j.LoggerFactory;
importar com.miri.pay.constants.Constants;
importar com.miri.pay.model.CommonRequest;
importar com.miri.pay.utils.ByteUtil;
/**
* Decodificador de mensajes
* lt;pgt;
* Decodificar el mensaje solicitado al cliente
*/
clase pública PayMessageDecoder extiende ByteToMessageDecoder
{
registrador final estático privado DLOG = LoggerFactory.getLogger(PayMessageDecoder.class);
/**
* El número de bytes que indica la longitud del encabezado
*/
private static final int HEAD_LENGTH = 4;
/**
* El número de bytes al que pertenecen todas las cadenas de ID
*/
int final estático privado ID_STR_LENGTH = 12
/**
*;
Número de bytes que pertenecen a un único ID
*/
private static final int SINGLE_ID_LENGTH = 4
@Override
protected void; decodificar (ChannelHandlerContext ctx, ByteBuf in, Listlt; Objectgt; out) arroja una excepción
{
int readable = in.readableBytes();
if (legible lt ; PayMessageDecoder.HEAD_LENGTH)
{
return
}
in.markReaderIndex() // Marquemos el actual La posición de readIndex
final int dataLength = in.readInt() // Lee la longitud del mensaje transmitido. El método readInt() de ByteBuf aumentará su readIndex en 4
if (dataLength lt; 0)
{
// El cuerpo del mensaje que leemos La longitud es 0. Esta es una situación que no debería ocurrir. Si esto sucede aquí, cierre la conexión.
ctx.close();
}
legible = in.readableBytes();
if (legible lt; longitud de datos)
{
// Si la longitud del cuerpo del mensaje leído es menor que la longitud del mensaje que enviamos, resetReaderIndex Esto se usa junto con markReaderIndex.
Restablecer readIndex para marcar
in.resetReaderIndex();
return
}
byte final[] cuerpo = nuevo byte[; dataLength];
in.readBytes(body);
// Determinar si el contenido ha sido leído
final int length = body.length ; p>
if (length == 0)
{
return; // Si no se lee ningún contenido, ignórelo
}
out.add(this.byte2req(body));
}
/**
* Leerá Convertir los datos del byte en el objeto de solicitud
* @param body
* @return
* @throws Excepción
*/ p>
private CommonRequest byte2req(byte[] cuerpo) lanza una excepción
{
final CommonRequest req = new CommonRequest(Constants.INVALID_MSGID);
final int length = cuerpo .length;
// Si la longitud de la matriz de contenido es menor o igual a 12, significa que el contenido del cuerpo del mensaje está vacío y se devuelve directamente un mensaje no válido
if (length lt; PayMessageDecoder.ID_STR_LENGTH)
{
PayMessageDecoder.DLOG
.info("El cliente envía el mensaje con una longitud de: {}, es un mensaje no válido, devuelve directamente un msgId = {} requestentity",
length, Constants.INVALID_MSGID);
return req;
}
// Obtener ID del mensaje
byte final[] mbs = nuevo byte[PayMessageDecoder.SINGLE_ID_LENGTH];
System.arraycopy(body, 0, mbs, 0, PayMessageDecoder .SINGLE_ID_LENGTH );
final int msgId = ByteUtil.byte4toint(mbs);
req.setMsgId(msgId);
// Obtener secuenciaId
byte final[] sbs = nuevo byte[PayMessageDecoder.SINGLE_ID_LENGTH];
System.arraycopy(body, 4, sbs, 0,
PayMessageDecoder.SINGLE_ID_LENGTH);
final int secuenciaId = ByteUtil.byte4toint(sbs);
req.setSequenceId(sequenceId);
// Obtener entidadId p>
byte final[] ebs = nuevo byte[PayMessageDecoder.SINGLE_ID_LENGTH];
System.arraycopy(body, 8, ebs, 0, PayMessageDecoder.SINGLE_ID_LENGTH);
final int entidadId = ByteUtil.byte4toint(ebs);
req.setEntityId(entityId);
// Obtener el contenido del cuerpo del mensaje
if (length gt ; PayMessageDecoder.ID_STR_LENGTH)
{
final int contentLen = longitud - PayMessageDecoder.ID_STR_LENGTH;
final byte[] contentbs = nuevo byte[contentLen];
p>
System.arraycopy(body, 12, contentbs, 0, contentLen
contenido final de cadena = new String(contentbs, CharsetUtil.UTF_8); >
req.setMsgContent(contenido);
}
requisito de devolución;
}
}
PayServerHandler.java
paquete com.miri.pay.scoket;
importar io.netty.channel.Channel;
importar io.netty.channel .ChannelHandlerContext;
importar io.netty.channel.ChannelInboundHandlerAdapter;
importar io.netty.util.ReferenceCountUtil;
importar java.util.HashMap;
importar java.util.Map;
importar org.slf4j.Logger;
importar org.slf4j.LoggerFactory;
importar com .miri.pay.MessageQueue ;
importar com.miri.pay.model.CommonRequest;
importar com.miri.pay.model.PendingBean;
/**
* Procesador de servidor de socket
*/<
/p>
La clase pública PayServerHandler extiende ChannelInboundHandlerAdapter
{
Registrador final estático privado DLOG = LoggerFactory.getLogger(PayServerHandler.class);
/* *
* Número de pedido externo-canal
*/
maplt final estático público; String, Channelgt; new HashMaplt; ;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) arroja una excepción
{
intentar
{
PayServerHandler.DLOG.info("El mensaje enviado por el cliente es: {}", msg
solicitud final de CommonRequest = (CommonRequest) msg;
bean PendingBean final = nuevo PendingBean(ctx.channel(), request
MessageQueue.offer(bean);
}
finalmente
{
ReferenceCountUtil.release(msg
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) arroja una excepción
{
ctx.flush();
}
@Override
public void channelActive(ChannelHandlerContext ctx) arroja una excepción
{
super.channelActive(ctx);
canal final canal = ctx .channel();
PayServerHandler.DLOG.info("Formulario activo del cliente {}", canal.remoteAddress());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) arroja una excepción
{
super.channelInactive(ctx);
canal final canal = ctx .ch
canal();
PayServerHandler.DLOG.info("Cliente inactivo formulario {}", canal.remoteAddress());
}
@Override
excepción public voidCaught(ChannelHandlerContext ctx, causa arrojable) lanza una excepción
{
PayServerHandler.DLOG.error("Excepción del sistema", causa);
ctx.close();
}
}