package com.warewms.hailiang.connect; import cn.hutool.extra.spring.SpringUtil; import com.github.rholder.retry.*; import com.warewms.hailiang.connect.base.TCPConnectBase; import com.warewms.hailiang.domain.Device; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** * Created with IntelliJ IDEA. * * @author: liuzhifei * Date: 2023/8/9 * Time: 15:45 * To change this template use File | Settings | File Templates. * Description:倒角读码器连接 **/ @Slf4j public class CodeReader14Connect implements TCPConnectBase { private final String IP_ADDR = "172.20.27.14"; private final int PORT = 51236; private final String deviceName ="CodeReader14"; private boolean enable = false; private ChannelFuture future; private Bootstrap bootstrap; private EventLoopGroup group; private ChannelPipeline pipeline; @Override public void init() throws InterruptedException { if (enable) { log.info("ip:{},deviceName:{}正在进行连接", IP_ADDR, deviceName); group = new NioEventLoopGroup(); try { bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { pipeline = socketChannel.pipeline(); pipeline.addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelInactive(ChannelHandlerContext ctx) { SpringUtil.getApplicationContext().publishEvent(new Device(deviceName, "2")); log.error("设备:{}连接已断开!",deviceName); retry(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = (ByteBuf) msg; processMessages(String.valueOf(byteBuf.toString(CharsetUtil.UTF_8))); } }); } }); future = bootstrap.connect(IP_ADDR, PORT).sync(); future.addListener((channelFuture) -> { if (channelFuture.isSuccess()) { SpringUtil.getApplicationContext().publishEvent(new Device(deviceName, "1")); log.info("ip:{},deviceName:{}连接成功", IP_ADDR, deviceName); } else { log.info("ip:{},deviceName:{}连接失败等待重试!", IP_ADDR, deviceName); retry(); } }); } catch (Exception e) { log.error("ip:{},deviceName:{}连接异常,{},准备重试", IP_ADDR, deviceName, e.getMessage()); retry(); } } } @Override public void retry() { Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder() .retryIfResult(Boolean.FALSE::equals) .retryIfExceptionOfType(Exception.class) .withStopStrategy(StopStrategies.neverStop()) .withWaitStrategy(WaitStrategies.fixedWait(5, TimeUnit.SECONDS)) .build(); try { retryer.call(new Callable<Boolean>() { AtomicBoolean isSuccess = new AtomicBoolean(false); @Override public Boolean call() throws Exception { log.info("ip:{},deviceName:{}连接重试",IP_ADDR,deviceName); future = bootstrap.connect(IP_ADDR, PORT).sync(); future.addListener((channelFuture) -> { isSuccess.set(channelFuture.isSuccess()); if (channelFuture.isSuccess()) { SpringUtil.getApplicationContext().publishEvent(new Device(deviceName,"1")); log.info("ip:{},deviceName:{}连接成功",IP_ADDR,deviceName); }else { log.info("ip:{},deviceName:{}连接失败等待重试!",IP_ADDR,deviceName); } }); Thread.sleep(3000); return isSuccess.get(); } }); } catch (RetryException | ExecutionException e) { e.printStackTrace(); } } @Override public void close() { group.shutdownGracefully(); } @Override public String getDeviceName() { return deviceName; } @Override public ChannelPipeline getChannel() { return null; } @Override public void processMessages(String message) { } }