Skip to content

Instantly share code, notes, and snippets.

@earlwlkr
Created July 15, 2016 09:44
Show Gist options
  • Select an option

  • Save earlwlkr/f7287e44b9c023de92f440c14e3efbb8 to your computer and use it in GitHub Desktop.

Select an option

Save earlwlkr/f7287e44b9c023de92f440c14e3efbb8 to your computer and use it in GitHub Desktop.
package com.imt.streetlight.observer.modbus;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.imt.streetlight.common.enums.ConnectionProcessStatus;
import com.imt.streetlight.common.enums.MLCStatusEnum;
import com.imt.streetlight.common.utils.ByteUtils;
import com.imt.streetlight.common.utils.ConfigUtils;
import com.imt.streetlight.observer.modbus.request.AbstractReceiveAction;
import com.imt.streetlight.observer.modbus.request.MLCResponse;
import com.imt.streetlight.observer.modbus.request.SLMSRequest;
/**
 * Busy-loop worker that writes outgoing traffic for one {@link ModbusConnection}
 * to its socket output stream.
 *
 * <p>Each loop iteration sleeps {@link #WAIT_BETWEEN_REQUEST_MSECONDS}, then:
 * <ol>
 *   <li>sends at most one pending MLC response,</li>
 *   <li>sends a heartbeat when the connection has been idle for more than two
 *       minutes and no heartbeat is already outstanding,</li>
 *   <li>sends at most one pending SLMS request, throttled by the configured
 *       "time wait between message" value.</li>
 * </ol>
 * When the loop ends (stop requested via {@link #setRun(boolean)} or an
 * exception escapes), the connection is marked offline and all of its pending
 * queues and maps are cleared.
 *
 * <p>{@link #setRun(boolean)} may be called from any thread; every other member
 * is intended to be used from the sending thread only.
 *
 * @author phuongnh
 */
public class SendingThread implements Runnable
{
    /**
     * Logger
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(SendingThread.class);

    /**
     * Sleep time, in milliseconds, between two iterations of the sending loop.
     * Kept public and mutable for backward compatibility with existing callers.
     */
    public static int WAIT_BETWEEN_REQUEST_MSECONDS = 50;

    /**
     * Idle time, in milliseconds, after which a heartbeat is sent (2 minutes).
     */
    private static final long HEARTBEAT_IDLE_THRESHOLD_MSECONDS = 120 * 1000L;

    /**
     * Pause, in milliseconds, taken right after an MLC response has been written.
     */
    private static final long MLC_RESPONSE_PAUSE_MSECONDS = 50L;

    /**
     * Timestamp pattern used in the "Sent: ..." debug log lines.
     */
    private static final String LOG_TIMESTAMP_PATTERN = "dd:MM:yyyy HH:mm:ss";

    /**
     * Time (epoch milliseconds) at which the last SLMS request was taken for sending.
     */
    private long lastTimeSentMessage;

    /**
     * Transaction id sent. Never assigned in this class, so
     * {@link #getCurrentTransactionId()} currently always returns 0.
     */
    private int transactionIdSent;

    /**
     * Map waiting. Not used by this class itself; exposed for external users.
     */
    public final Map<Integer, Integer> mapWaiting = new HashMap<Integer, Integer>();

    /**
     * Loop flag: the sending loop keeps running while this is {@code true}.
     * Volatile because it is written by {@link #setRun(boolean)} from another
     * thread and read by the sending thread on every iteration.
     */
    private volatile boolean run = true;

    /**
     * Output data stream
     */
    private final DataOutputStream outToServer;

    /**
     * Modbus connection attached
     */
    private final ModbusConnection connection;

    /**
     * Creates a sender bound to a connection and its output stream.
     *
     * @param connection  connection whose queues are drained and whose state is updated
     * @param outToServer stream the raw messages are written to
     */
    public SendingThread(ModbusConnection connection, DataOutputStream outToServer)
    {
        this.connection = connection;
        this.outToServer = outToServer;
    }

    /**
     * Requests the sending loop to continue or to stop. The loop observes the
     * new value at the start of its next iteration.
     *
     * @param run {@code false} to make {@link #run()} leave its loop
     */
    public void setRun(boolean run)
    {
        this.run = run;
    }

    /**
     * Formats a timestamp for the debug log lines.
     * A new formatter is created per call because {@link SimpleDateFormat}
     * is not thread-safe and must not be shared.
     *
     * @param time instant to format
     * @return the formatted timestamp
     */
    private static String formatTimestamp(Date time)
    {
        return new SimpleDateFormat(LOG_TIMESTAMP_PATTERN).format(time);
    }

    /**
     * Sends at most one pending SLMS request. With MLC registered, the
     * OBServer can send out messages.
     *
     * <p>On a successful write the request is stamped with its sent time and
     * stored in the connection's request map under its transaction id. On a
     * write failure the request is handed back to the connection through
     * {@code dispatchRequestToTop} and the error is logged.
     *
     * <p>Afterwards, if no SLMS request is pending and the MLC is still
     * {@code BOOTING}, the operation flag is released and the MLC is moved to
     * {@code READY}.
     *
     * @throws IOException          declared for compatibility; write errors are handled here
     * @throws InterruptedException declared for compatibility
     */
    private void processSLMSRequest() throws IOException, InterruptedException
    {
        // poll() alone is enough: it returns null on an empty queue, which avoids
        // the isEmpty()/poll() check-then-act gap.
        SLMSRequest request = this.connection.getPendingSLMSRequest().poll();
        if (request != null)
        {
            lastTimeSentMessage = System.currentTimeMillis();
            try
            {
                this.connection.increaseTotalmessage();
                outToServer.write(request.getRawData());
                // There might be a race between the packet being sent and the ack
                // processing setup below, but the window is very small.
                request.setSentTime(new Date(lastTimeSentMessage));
                this.connection.getMapSLMSRequest().put(
                        request.getHeader().getTransactionId(),
                        request);
                if (LOGGER.isDebugEnabled())
                {
                    LOGGER.debug(
                            String.format(
                                    "Sent: Connection: [{%s}], time: [{%s}], message: [{%s}]",
                                    this.connection.getConnectionId(),
                                    formatTimestamp(request.getSentTime()),
                                    request.toString()));
                    LOGGER.debug(
                            String.format(
                                    "Put request [TransactionId: %d] to List request [Count: %d]",
                                    request.getHeader().getTransactionId(),
                                    this.connection.getMapSLMSRequest().size()));
                }
            }
            catch (IOException ex)
            {
                this.connection.dispatchRequestToTop(request);
                LOGGER.error("processSentData: Write data to Socket error", ex);
            }
        }

        if (this.connection.getPendingSLMSRequest().isEmpty()
                && this.connection.getMlcStatus() == MLCStatusEnum.BOOTING)
        {
            this.connection.releaseOperationFlag();
            this.connection.setMlcStatus(MLCStatusEnum.READY);
        }
    }

    /**
     * Action run when the response to a heartbeat request is received: logs the
     * MLC id carried by the response and clears the connection's
     * "heartbeat in progress" flag.
     */
    private static class HearbeartReceiveAction extends AbstractReceiveAction
    {
        /**
         * Connection whose heartbeat flag is cleared.
         */
        private final ModbusConnection connection;

        /**
         * @param request    heartbeat request this action is attached to
         * @param connection connection to update when the response arrives
         */
        public HearbeartReceiveAction(SLMSRequest request, ModbusConnection connection)
        {
            super(request);
            this.connection = connection;
        }

        /**
         * Reads the MLC id from the response payload (unsigned 32-bit value at
         * offset 1), logs it and marks the heartbeat as answered. Any failure is
         * logged and otherwise ignored.
         */
        @Override
        public void takeAction()
        {
            try
            {
                long mlcId = ByteUtils.toUnsignedLong32(
                        this.getRequest().getResponse().getFuncData().getRawData(), 1);
                LOGGER.debug("Receive keepalive MLC Id: {} ", mlcId);
                this.connection.setInHeartBeart(false);
            }
            catch (Exception ex)
            {
                LOGGER.error("Unexpected error", ex);
            }
        }
    }

    /**
     * Sends a heartbeat, implemented as a "read master light controller id"
     * request obtained from the connection's input register handler.
     *
     * <p>After a successful write the connection is flagged as "in heartbeat",
     * the heartbeat transaction id is recorded, and the request (with its
     * receive action) is stored in the connection's request map. Any failure
     * is logged and not propagated.
     *
     * @throws IOException declared for compatibility; failures are handled here
     */
    private void sendHeartbeatMessage() throws IOException
    {
        SLMSRequest heartbeatRequest =
                this.connection.getInputRegisterHandler().getMasterLightControllerID();
        try
        {
            this.outToServer.write(heartbeatRequest.getRawData());
            this.connection.increaseTotalmessage();
            this.connection.setHeartBeatTransaction(heartbeatRequest.getHeader().getTransactionId());
            this.connection.setInHeartBeart(true);
            heartbeatRequest.setSentTime(new Date());
            heartbeatRequest.setAction(
                    new HearbeartReceiveAction(heartbeatRequest, this.connection));
            this.connection.getMapSLMSRequest().put(
                    heartbeatRequest.getHeader().getTransactionId(),
                    heartbeatRequest);
            if (LOGGER.isDebugEnabled())
            {
                LOGGER.debug(
                        String.format(
                                "Sent: Connection: [{%s}] time:[{%s}], message:[{%s}]",
                                this.connection.getConnectionId(),
                                formatTimestamp(new Date()),
                                heartbeatRequest.toString()));
            }
        }
        catch (Exception e)
        {
            LOGGER.error("Unexpected error", e);
        }
    }

    /**
     * Sends at most one pending MLC response, then pauses briefly.
     *
     * @return {@code true} if a response was taken from the queue (even when
     *         writing it failed), {@code false} if the queue was empty
     * @throws IOException          declared for compatibility; write errors are logged here
     * @throws InterruptedException if interrupted during the pause after the write
     */
    private boolean processMLCResponse() throws IOException, InterruptedException
    {
        // Single poll() instead of isEmpty()+poll(): null means nothing to send.
        MLCResponse mlcResponse = this.connection.getPendingMLCResponse().poll();
        if (mlcResponse == null)
        {
            return false;
        }

        long timeSentMessage = System.currentTimeMillis();
        try
        {
            outToServer.write(mlcResponse.getRawData());
            Thread.sleep(MLC_RESPONSE_PAUSE_MSECONDS);
            mlcResponse.setLastTimeSentMessage(new Date(timeSentMessage));
            if (LOGGER.isDebugEnabled())
            {
                LOGGER.debug(
                        String.format(
                                "Sent: Connection:[{%s}], time:[{%s}], message:[{%s}]",
                                this.connection.getConnectionId(),
                                formatTimestamp(mlcResponse.getLastTimeSentMessage()),
                                mlcResponse.toString()));
            }
        }
        catch (IOException ex)
        {
            LOGGER.error("processMLCResponse: Write data to Socket error", ex);
        }
        return true;
    }

    /**
     * Marks the connection offline and drops everything still queued, so that a
     * new connection starts from a clean state. All offline processing is done
     * here, on the sending thread, to prevent race situations.
     */
    private void resetConnectionState()
    {
        this.connection.changeStatusOfflineDpsListByMlcId(this.connection.getMlcRegisteredId());
        this.connection.setMlcStatus(MLCStatusEnum.OFFLINE);
        this.connection.setConnectionRegistered(false);
        this.connection.setStatus(ConnectionProcessStatus.INIT);
        // Clear pending requests; a new connection starts fresh.
        this.connection.getMapSLMSRequest().clear();
        this.connection.getPendingSLMSRequest().clear();
        this.connection.getPendingMLCResponse().clear();
        this.connection.getMapMLCReResponeUpdate().clear();
    }

    /**
     * Sending loop. Runs until {@link #setRun(boolean) setRun(false)} is called
     * or an exception escapes one of the processing steps; in every case the
     * connection is reset to its offline state before this method returns.
     */
    public void run()
    {
        // Number of loop iterations since the last SLMS send slot; an SLMS
        // request is only attempted when this is 0.
        int count = 0;
        boolean interrupted = false;
        try
        {
            LOGGER.debug("Start send data");
            this.connection.setFLagAllowAllMessage(true);
            while (run)
            {
                // Wait between processing rounds.
                Thread.sleep(WAIT_BETWEEN_REQUEST_MSECONDS);
                processMLCResponse();

                // Idle for more than two minutes and no heartbeat outstanding: send one.
                if ((System.currentTimeMillis() - this.connection.getLastTimeOperation()
                        > HEARTBEAT_IDLE_THRESHOLD_MSECONDS)
                        && !this.connection.isInHeartBeart())
                {
                    sendHeartbeatMessage();
                }

                if (this.connection.isConnectionRegistered() && count == 0)
                {
                    this.processSLMSRequest();
                }

                count++;
                // Re-open the SLMS send slot once the configured wait has elapsed.
                if (count * WAIT_BETWEEN_REQUEST_MSECONDS
                        >= Integer.valueOf(ConfigUtils.getInstance().getTimewaitBetweenMessage()))
                {
                    count = 0;
                }
            }
        }
        catch (IOException e)
        {
            LOGGER.error("failed: IOException", e);
        }
        catch (InterruptedException e)
        {
            // Remember the interrupt; it is restored after the cleanup below so
            // that the cleanup itself is not affected by a pending interrupt.
            interrupted = true;
            LOGGER.error("failed: InterruptedException", e);
        }
        catch (Exception e)
        {
            LOGGER.error("failed: Exception", e);
        }
        finally
        {
            LOGGER.debug("End sending data");
            try
            {
                resetConnectionState();
            }
            finally
            {
                if (interrupted)
                {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * @return the stored transaction id; this class never assigns it, so the
     *         value is always 0
     */
    public int getCurrentTransactionId()
    {
        return transactionIdSent;
    }

    /**
     * Kept for API compatibility; the counter it used to reset no longer
     * exists, so this method does nothing.
     */
    public void resetCountSending()
    {
        // Intentionally empty.
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment