/** * Jeff Rupp * Master's Thesis * 2005 * * The crossbow specs for mote transmit power consumption are: * for 433 MHz * Pout = -20 dBm I = 5.3 mA * Pout = -5 dBm I = 8.9 mA * Pout = 0 dBm I = 10.4 mA * Pout = 5 dBm I = 14.8 mA * Pout = 10 dBm I = 26.7 mA * * Conversion from watts to dBm: * dBm = 10 log [Signal (mW)/1mW] * * 2 AA alkalines are worth about 4.5 watt-hours * http://www.powerstream.com/Compare.htm * watch batteries have approx. 10 joule 1 watt-hour == 3600 Joule., so 0.0028 watt-hour * */ package jdr.mobisim; import java.util.*; import org.apache.log4j.*; import jdr.utils.*; import java.awt.*; public class MobileNode implements EventCallbackIF, NodeIF { private static final boolean DO_LOADS_OF_LOGS = false; private static final boolean DO_SCHEDULER_LOGS = false; private static final boolean DO_PROCESS_PACKET_LOGS = false; private static final boolean DO_CLUSTER_FORM_LOGS = false; private static final boolean DO_SENSOR_DATA_LOGS = false; private static final boolean DO_MESSAGE_PROPAGATE_LOGS = false; private static final boolean DO_CHANGE_CLUSTER_HEAD_LOGS = false; private static Logger m_logger = Logger.getLogger(MobileNode.class); public EDU.oswego.cs.dl.util.concurrent.Mutex m_busyLock = new EDU.oswego.cs.dl.util.concurrent.Mutex(); public static final double MIN_TRANSMIT_POWER_DBM = -25.0; public static final double MAX_TRANSMIT_POWER_DBM = 10.0; private static final int s_ticsConsumedByHello = 5; private static final int s_ticsConsumedTellHead = 2; private static final int s_ticsConsumedRejectHead = 5; private static final int s_ticsConsumedByChooseHead = 15; private static final int s_ticsConsumedRecordHelloPacket = 2; private static final int s_ticsConsumedBySendData = 25; private static final int s_ticsConsumedByCheckTired = 40; private static final int s_ticsConsumedBySendTired = 40; private static final int s_ticsConsumedBySendChildPower = 20; private static final int s_ticsConsumedBySelectNewHead = 60; private static final int s_ticsConsumedByPropagate = 5; private static final int s_ticsConsumedByJoinCluster = 14; private static final int s_RETRY_TRANSMIT_SLEEP_TICS = 35; private static final int s_ticsDelayBetweenTellFormOwnCluster = 500; // set a time at which we'll try to form our own cluster, > 0 means want to form private long m_whenToFormOwnCluster = -1; private double m_currentPowerdBm; // the location is used in sending data via the propagation model, so that it knows // where the message came from and thus what other nodes can hear it private FloatPoint m_location = new FloatPoint(0,0); private static int s_nodeCount = 0; private int m_myNodeNumber = 0; private boolean m_saidHello = false; private boolean m_amClustered = false; private int m_myClusterHead = -1; private double m_numNodesPerCluster = 10; // note that the power consumed is actually kept track in the protocol, since that // is where the transmits are taking place private double m_powerReserveWattHours = 4.5; // about how many watt-hours 2 AA alkalines are worth private double m_dataRateBps = 100000.0; private double m_maxPowerReserveWattHours = m_powerReserveWattHours; private double m_tiredLevelWatts = 0.001; // set to % of remaining power when made clusterhead private double m_tiredRatio = 0.4; private double m_currentTransmitPowerLeveldBm = MAX_TRANSMIT_POWER_DBM; private double m_lastTransmitPowerLeveldBm = MIN_TRANSMIT_POWER_DBM; private double m_helloTransmitIncrement = MAX_TRANSMIT_POWER_DBM - MIN_TRANSMIT_POWER_DBM; private HashMap m_levelsToNodesHeardMap = new HashMap(); private long m_simTimeAtExecute = 0; private byte m_numberOfHelloTransmitLevels = 3; private double m_rehelloPercent = 0.3; private byte m_currentHelloTransmitLevel = 0; private boolean m_doingNormalComm = false; private long m_whenToSendSensorData = 0; private byte m_sensorDataSizeBytes = 10; private int m_sensorDataHeaderSizeBytes = 16; private int m_sensorDataTicsBetweenTransmits = 1000; private java.util.Random m_rand = new java.util.Random(12344123); private Vector m_nodesHeard = new Vector(); private Vector m_clusterMembers = new Vector(); private Vector m_rejectedHeadNodes = new Vector(); private Vector m_childPowerLevels = new Vector(); private long m_timeWhenSentTiredMessage = 0; private static final int s_childPowerResponseTimeoutTics = 1000; private boolean m_doingHeadTransfer = false; private java.lang.Object m_propagatedMessageNumbersLock = new java.lang.Object(); private Vector m_propagatedMessageNumbers = new Vector(); private ProtocolIF m_protocol = null; // flag to set if this node should highlite itself private boolean m_highlite = false; private boolean m_drawTransmitRings = false; private java.text.DecimalFormat m_percentFormat = new java.text.DecimalFormat("##.00"); private int m_ticsPassedDuringThisExecute = 0; // state of the node, determines what we do when ExecuteEvent is called private int m_nodeState = -1; private static final int SEND_HELLO_MESSAGE = 0; private static final int AWAITING_RTS = 10; private static final int AWAITING_CTS = 20; private static final int AWAITING_PACKET = 30; private static final int AWAITING_ACK = 40; private static final int SEND_SENSOR_DATA_MESSAGE = 50; private static final int SEND_CLUSTERED_DATA_MESSAGE = 60; private static final int SEND_CLUSTER_HEAD_GETTING_TIRED_MESSAGE = 70; private static final int TRANSMITTING = 80; /** * create a MobileNode, the first node created is the one to send the initial * Hello message */ public MobileNode() { m_myNodeNumber = s_nodeCount++; // get a unique instance of the ProtocolIF, one per node to handle the protocol m_protocol = ProtocolIF.getNewInstance(); m_protocol.setMyNodeNumber(m_myNodeNumber); m_protocol.setNodeEventCallbackIf(this); m_protocol.SetDataRate(m_dataRateBps); m_nodeState = AWAITING_RTS; // the first node starts the 'hello' protocol if(m_myNodeNumber == 0) { m_nodeState = SEND_HELLO_MESSAGE; m_saidHello = true; Scheduler.getInstance().ScheduleEvent(this, DO_SCHEDULER_LOGS); } } public static void ResetNodeCount() { s_nodeCount = 0; } public String GetAlgorithmName() { return "JDR Algorithm"; } /** * enqueue a packet to be processed, and schedule ourselves to process the packet * if we aren't busy with another packet already (Except if this packe is * a CTS or ACK, then we need to process it) * * @param packet the packet to enqueue */ public void ReceivePacket(PacketIF packet) { // determine if we are the destination for this packet int sourceNum = packet.extractInt(packet.getData(), MobilePacket.SOURCE_NODE_BYTE_OFFSET); //m_logger.debug("node: " + m_myNodeNumber + " received packet from node: "+sourceNum); if(sourceNum != m_myNodeNumber) { byte [] data = packet.getData(); int destNum = packet.extractInt(data, MobilePacket.DESTINATION_NODE_BYTE_OFFSET); if((destNum == -1) || (destNum == m_myNodeNumber)) { // packet is for us, enqueue and schedule ourselves to process it m_protocol.AddPacket(packet); } } } /** * do whatever we were waiting to be allowed to do * @return true if node isn't already occupied doing something else, * false if the node is already busy (will leave packet on sceduler queue) */ public boolean ExecuteEvent(long timeNow) { // If we're busy we return false and leave them on the queue and will process them when we've // finished what we are doing currently try { if(!m_busyLock.attempt(0)) { return false; } } catch(InterruptedException iex) { m_logger.error("node: "+m_myNodeNumber+" ExecuteEvent, state: "+GetNodeStateString(m_nodeState)+ " at time of: " + timeNow+" caught interrupted exc, returned false"); if(m_protocol.GetPacketQueueSize() > 0) { // request time from scheduler to deal with the next packet Scheduler.getInstance().ScheduleEvent(this, DO_SCHEDULER_LOGS); } m_busyLock.release(); return false; } m_ticsPassedDuringThisExecute = 0; m_simTimeAtExecute = timeNow; if(DO_LOADS_OF_LOGS) { m_logger.debug("node: "+m_myNodeNumber+" ExecuteEvent, state: "+GetNodeStateString(m_nodeState)+ " at time of: " + timeNow); } // note it is always OK to receive a packet in any of the states, // will have to determine what to do with it based on the node's state switch(m_nodeState) { case SEND_HELLO_MESSAGE: SendHello(); break; case AWAITING_RTS: break; case AWAITING_PACKET: break; case SEND_SENSOR_DATA_MESSAGE: m_whenToSendSensorData = timeNow; m_nodeState = AWAITING_RTS; break; case SEND_CLUSTERED_DATA_MESSAGE: SendClusterheadPackedData(); m_nodeState = AWAITING_RTS; break; case SEND_CLUSTER_HEAD_GETTING_TIRED_MESSAGE: //SendClusterheadTired(timeNow); m_nodeState = AWAITING_RTS; break; default: // we weren't expecting to do something in particular, so assume // somebody sent a packet break; } // we'll always call ProcessPacket to ensure that we take care of our queue ProcessPacket(); // check if we want to form a cluster of our own if(m_whenToFormOwnCluster > 0) { if(m_whenToFormOwnCluster <= Scheduler.getInstance().GetSimulationTime()) { // send stored form event packet ChooseClusterHead(null); } } // check if we're sending our periodic messages if(m_doingNormalComm && (timeNow >= m_whenToSendSensorData)) { SendSensorData(timeNow); } // check if this cluster head is getting tired if(!m_doingHeadTransfer) { CheckIfTired(timeNow); } else { if((m_timeWhenSentTiredMessage + s_childPowerResponseTimeoutTics) <= timeNow) { // we've timed out for child response, assume other // children have expired SelectNewClusterHead(timeNow); } else { // schedule ourselves for another check } } m_busyLock.release(); if(m_protocol.GetPacketQueueSize() > 0) { // request time from scheduler to deal with the next packet Scheduler.getInstance().ScheduleEvent(this, timeNow, DO_SCHEDULER_LOGS); } return true; } private void ProcessPacket() { if(DO_PROCESS_PACKET_LOGS) { m_logger.debug("ProcessPacket was called"); } PacketIF packet = m_protocol.PopNextPacket(); if(packet != null) { packet.setPacketArrived(); // ??? consume power to rcv packet if(GetRemainingPower() <= 0) { // use up some time just to keep the clock ticking Scheduler.getInstance().IncrementSimulationTics(m_simTimeAtExecute, 5); return; } // ??? update statistics with data about this packet // get the data (the data is the only thing a real node would have available) byte [] data = packet.getData(); int sourceNode = MobilePacket.extractInt(data, MobilePacket.SOURCE_NODE_BYTE_OFFSET); int destinationNode = MobilePacket.extractInt(data, MobilePacket.DESTINATION_NODE_BYTE_OFFSET); int lastNode = MobilePacket.extractInt(data, MobilePacket.LAST_NODE_BYTE_OFFSET); int nextNode = MobilePacket.extractInt(data, MobilePacket.NEXT_NODE_BYTE_OFFSET); int ackNumber = MobilePacket.extractInt(data, MobilePacket.ACK_SEQUENCE_NUMBER_BYTE_OFFSET); int seqNumber = MobilePacket.extractInt(data, MobilePacket.SEQUENCE_NUMBER_BYTE_OFFSET); byte messageType = data[MobilePacket.MESSAGE_TYPE_BYTE_OFFSET]; byte messageLen = data[MobilePacket.MESSAGE_LENGTH_BYTE_OFFSET]; if(DO_PROCESS_PACKET_LOGS) { if((destinationNode == m_myNodeNumber) || (destinationNode == -1)) { m_logger.info("node: "+m_myNodeNumber+ " received a packet of type: "+ MobilePacket.GetMessageTypeString(messageType)+ " from node: " + sourceNode+ " seq num: "+seqNumber+ " ack num: "+ackNumber ); } } switch(messageType) { case MobilePacket.INSIDE_CLUSTER: if(m_myClusterHead == m_myNodeNumber) { // ??? cache the sensor data to be forwarded on during // the cluster head comm phase // for now we just ignore since the cluster head message is assumed // to be a summary, and the same size the other node data messages } break; case MobilePacket.BETWEEN_CLUSTER: if(m_myClusterHead == m_myNodeNumber) { // if we are a cluster head, then re-transmit the message PropagateMessage(packet); } break; case MobilePacket.HELLO_MESSAGE: { byte xmitLevel = data[MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET]; synchronized(m_nodesHeard) { NodeHeard hrd = new NodeHeard(sourceNode, xmitLevel); if(!m_nodesHeard.contains(hrd)) { // do an in-order add, so the vector is always in ascending // transmit power level order int currentNumNodes = m_nodesHeard.size(); boolean added = false; for(int i = 0; i < currentNumNodes; ++i) { NodeHeard tstNode = (NodeHeard)m_nodesHeard.get(i); if(tstNode.m_lowestLevelHeard >= hrd.m_lowestLevelHeard) { m_nodesHeard.add(i, hrd); added = true; break; } } if(!added) { m_nodesHeard.addElement(hrd); } // add to the protocol too m_protocol.AddNodeHeard(hrd); } } // assume that the protocol takes care of collsions byte rehelloLevel = (byte)((1 - m_rehelloPercent) * m_numberOfHelloTransmitLevels); if(rehelloLevel == m_numberOfHelloTransmitLevels) { rehelloLevel = (byte)(m_numberOfHelloTransmitLevels - (byte)1); } if(!m_saidHello && (xmitLevel >= rehelloLevel)) { m_saidHello = true; m_nodeState = SEND_HELLO_MESSAGE; // consume a few tics to do our work Scheduler.getInstance().IncrementSimulationTics(m_simTimeAtExecute, s_ticsConsumedRecordHelloPacket); m_ticsPassedDuringThisExecute += s_ticsConsumedRecordHelloPacket; Scheduler.getInstance().ScheduleEvent(this, Scheduler.getInstance().GetSimulationTime()+s_ticsConsumedByHello*2, DO_SCHEDULER_LOGS); } } break; case MobilePacket.CHOOSE_HEADS: ChooseClusterHead(packet); m_nodeState = AWAITING_RTS; break; case MobilePacket.TELL_HEAD: if(destinationNode != m_myNodeNumber) { break; } if(!m_amClustered) { m_logger.debug("Node: "+m_myNodeNumber+" setting cluster head to node: "+ sourceNode); m_myClusterHead = sourceNode; m_amClustered = true; m_nodeState = AWAITING_RTS; // consume a few tics to do our work Scheduler.getInstance().IncrementSimulationTics(m_simTimeAtExecute, s_ticsConsumedTellHead); m_ticsPassedDuringThisExecute += s_ticsConsumedTellHead; } else { RejectTellHead(packet); } break; case MobilePacket.REJECT_HEAD: if(destinationNode != m_myNodeNumber) { break; } m_logger.debug("Node: "+sourceNode+" rejected node:"+m_myNodeNumber+ " as its cluster head"); // remove the source node from our list of clustered nodes synchronized(m_clusterMembers) { NodeHeard nodeHrd = new NodeHeard(sourceNode, (byte)-1); boolean removeOk = m_clusterMembers.removeElement(nodeHrd); m_logger.debug("clusterHead node num: " + m_myNodeNumber+ " removed from cluster node num: " + sourceNode+ " status: "+removeOk); } // consume a few tics to do our work Scheduler.getInstance().IncrementSimulationTics(m_simTimeAtExecute, s_ticsConsumedTellHead); m_ticsPassedDuringThisExecute += s_ticsConsumedTellHead; break; case MobilePacket.RTS: case MobilePacket.CTS: case MobilePacket.ACK: m_logger.warn("Node got message that should have been handled by the protocol layer"); break; case MobilePacket.OK_TO_DO_NORMAL_COMM: // take this opportunity to check if we are a lonely head, if so // join one of the heads we rejected. if(m_myNodeNumber == m_myClusterHead) { int numClusterChildren = 0; synchronized(m_clusterMembers) { numClusterChildren = m_clusterMembers.size(); } if(numClusterChildren == 0) { JoinOtherCluster(); } } // set flag so that we begin our 'normal comm' routine m_doingNormalComm = true; m_whenToSendSensorData = Scheduler.getInstance().GetSimulationTime() + m_sensorDataTicsBetweenTransmits; // schedule event at time in future for normal comm msg Scheduler.getInstance().ScheduleEvent(this, m_whenToSendSensorData, DO_SCHEDULER_LOGS); // propagate the OK_TO_DO_NORMAL_COMM message PropagateMessage(packet); m_nodeState = AWAITING_RTS; break; case MobilePacket.OVERRIDE_HEAD: if(destinationNode != m_myNodeNumber) { break; } int newHead = MobilePacket.extractInt(data, MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET); m_myClusterHead = newHead; if(m_myNodeNumber == m_myClusterHead) { m_tiredLevelWatts = m_tiredRatio * GetRemainingPower(); } m_logger.debug("node: "+m_myNodeNumber + " changed cluster head to: "+ m_myClusterHead+" tiredLevelWatts: "+m_tiredLevelWatts); break; case MobilePacket.JOIN_CLUSTER: if(destinationNode != m_myNodeNumber) { break; } // allow the other node to join if we are a clusterhead, or if we // had selected the node that is joining as our cluster head if((m_myClusterHead == m_myNodeNumber) || (sourceNode == m_myClusterHead)) { if(sourceNode == m_myClusterHead) { m_myClusterHead = m_myNodeNumber; } m_logger.debug("node: "+sourceNode+" joined cluster of node: "+m_myNodeNumber); // find this node in our list of nodes heard NodeHeard newNode = new NodeHeard(sourceNode, GetMinTransmitLevel(sourceNode)); synchronized(m_clusterMembers) { m_clusterMembers.addElement(newNode); } } else { // we are part of another cluster, push back to the node to // assign its head as our head SetOtherNodeHead(sourceNode, m_myClusterHead); } break; case MobilePacket.HEAD_TIRED: if(m_myClusterHead == sourceNode) { SendChildPower(sourceNode, m_simTimeAtExecute + m_ticsPassedDuringThisExecute); } break; case MobilePacket.CHILD_POWER: if(destinationNode != m_myNodeNumber) { break; } if(m_myClusterHead == m_myNodeNumber) { // record this child's power synchronized(m_childPowerLevels) { double pwr = 0.0; // extract the remaining power percentage digits int numDigits = data[MobilePacket.MESSAGE_LENGTH_BYTE_OFFSET]; byte thisDigit = (byte)0; for(int digits = 0; digits < numDigits; ++digits) { thisDigit = data[MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET+digits]; pwr += (double)thisDigit / Math.pow(10, ((double)digits+1.0)); if(DO_CHANGE_CLUSTER_HEAD_LOGS) { m_logger.debug("node: "+m_myNodeNumber+ " received child power digit#: "+digits+ " as value: "+ thisDigit+" power now: "+ pwr+ " from node: "+sourceNode); } } if(DO_CHANGE_CLUSTER_HEAD_LOGS) { m_logger.debug("node: "+m_myNodeNumber+" received child power as: "+ pwr+ " from node: "+sourceNode); } m_childPowerLevels.addElement(new ChildPower(sourceNode, pwr)); synchronized(m_clusterMembers) { if(m_clusterMembers.size() == m_childPowerLevels.size()) { // all children have reported SelectNewClusterHead(m_simTimeAtExecute); } } } } break; case MobilePacket.ASSIGN_CHILD_NODES: if(destinationNode != m_myNodeNumber) { break; } if(m_myNodeNumber != m_myClusterHead) { m_logger.error("node: "+m_myNodeNumber+" received assign child nodes"+ " message, but isn't a cluster head"); } synchronized(m_clusterMembers) { m_clusterMembers.removeAllElements(); // extract all the node numbers int numChildren = data[MobilePacket.MESSAGE_LENGTH_BYTE_OFFSET] / 4; for(int i = 0; i < numChildren; ++i) { int child = packet.extractInt(data, MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET+ (i*4)); m_clusterMembers.addElement(new NodeHeard(child, GetMinTransmitLevel(child))); } } break; default: m_logger.warn("received a packet with an unknown type: "+ messageType); } } else { if(DO_PROCESS_PACKET_LOGS) { m_logger.error("ProcessPacket called, but no packets on queue"); } } } private void SendHello() { PacketIF packet = PacketIF.getNewInstance(); if(packet != null) { // the hello is a broadcast, so no need to do the whole RTS/CTS/Send/ACK sequence byte [] data = new byte[MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET + 1 + PacketIF.CRC_SIZE_BYTES]; packet.setData(data); // don't care about sequence number when broadcasting packet.insertInt(data, MobilePacket.SEQUENCE_NUMBER_BYTE_OFFSET, -1); packet.insertInt(data, MobilePacket.ACK_SEQUENCE_NUMBER_BYTE_OFFSET, -1); packet.insertInt(data, MobilePacket.SOURCE_NODE_BYTE_OFFSET, m_myNodeNumber); packet.insertInt(data, MobilePacket.DESTINATION_NODE_BYTE_OFFSET, -1); // no particular destination, anybody who hears packet.insertInt(data, MobilePacket.LAST_NODE_BYTE_OFFSET, -1); packet.insertInt(data, MobilePacket.NEXT_NODE_BYTE_OFFSET, m_myNodeNumber); packet.insertInt(data, MobilePacket.MESSAGE_NUMBER_BYTE_OFFSET, packet.GetNextMessageNumber()); data[MobilePacket.MESSAGE_TYPE_BYTE_OFFSET] = MobilePacket.HELLO_MESSAGE; data[MobilePacket.MESSAGE_LENGTH_BYTE_OFFSET] = 1; data[MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET] = m_currentHelloTransmitLevel; if(DO_LOADS_OF_LOGS) { m_logger.debug("node: "+m_myNodeNumber+" sending hello at power level: "+ m_currentHelloTransmitLevel+" which is: "+ m_currentTransmitPowerLeveldBm+" dBm"); } // consume a few tics to do our work Scheduler.getInstance().IncrementSimulationTics(m_simTimeAtExecute, s_ticsConsumedByHello); m_ticsPassedDuringThisExecute += s_ticsConsumedByHello; // return after increment, so time will continue to pass if(GetRemainingPower() <= 0) { return; } PropagationIF prop = PropagationIF.getInstance(); if(prop != null) { double xmitWatts = (Math.pow(10.0, (m_currentTransmitPowerLeveldBm / 10))) / 1000; // next calculate what portion of an hour based on the packet size and // our data rate. xmitWatts *= (packet.getTransmitedBitCount() / m_dataRateBps) / ProtocolIF.s_SECONDS_PER_HOUR; // last thing to do to the packet before sending is insert the CRC PacketIF.AddCrc(data); int prevState = m_nodeState; m_nodeState = TRANSMITTING; m_lastTransmitPowerLeveldBm = m_currentTransmitPowerLeveldBm; prop.TransmitData(m_simTimeAtExecute + s_ticsConsumedByHello, data.length * 8, packet, m_location, m_currentTransmitPowerLeveldBm, DO_LOADS_OF_LOGS); packet.IncrementPowerConsumedInTransit(xmitWatts); m_powerReserveWattHours -= xmitWatts; m_nodeState = prevState; } ++m_currentHelloTransmitLevel; if(m_currentHelloTransmitLevel < m_numberOfHelloTransmitLevels) { m_currentTransmitPowerLeveldBm += m_helloTransmitIncrement; m_nodeState = SEND_HELLO_MESSAGE; // schedule another hello, at the next power level in 1/10 second from 'now' Scheduler.getInstance().ScheduleEvent(this, m_simTimeAtExecute + Scheduler.s_ticsPerSecond / 10, DO_SCHEDULER_LOGS); } else { m_nodeState = AWAITING_RTS; } } } private void SendSensorData(long timeNow) { // send sensor data // outside packet != null block for logging int sensorData = m_rand.nextInt(); PacketIF packet = PacketIF.getNewInstance(); if(packet != null) { byte [] data = new byte[MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET + m_sensorDataSizeBytes + m_sensorDataHeaderSizeBytes + PacketIF.CRC_SIZE_BYTES]; packet.setData(data); int seqNum = PacketIF.GetNextSequenceNumber(); packet.insertInt(data, MobilePacket.SEQUENCE_NUMBER_BYTE_OFFSET, seqNum); packet.insertInt(data, MobilePacket.ACK_SEQUENCE_NUMBER_BYTE_OFFSET, 0); packet.insertInt(data, MobilePacket.SOURCE_NODE_BYTE_OFFSET, m_myNodeNumber); packet.insertInt(data, MobilePacket.MESSAGE_NUMBER_BYTE_OFFSET, packet.GetNextMessageNumber()); int destNode = m_myClusterHead; double xmitPwr = MIN_TRANSMIT_POWER_DBM; if(m_amClustered && (m_myClusterHead == m_myNodeNumber)) { // we're the cluster head, need to send summary message along ??? // for now the summary message is the same size as the normal message destNode = -1; xmitPwr = MAX_TRANSMIT_POWER_DBM; data[MobilePacket.MESSAGE_TYPE_BYTE_OFFSET] = MobilePacket.BETWEEN_CLUSTER; } else { xmitPwr = GetMinTransmitDbm(destNode); data[MobilePacket.MESSAGE_TYPE_BYTE_OFFSET] = MobilePacket.INSIDE_CLUSTER; } packet.insertInt(data, MobilePacket.DESTINATION_NODE_BYTE_OFFSET, destNode); packet.insertInt(data, MobilePacket.LAST_NODE_BYTE_OFFSET, m_myNodeNumber); packet.insertInt(data, MobilePacket.NEXT_NODE_BYTE_OFFSET, destNode); data[MobilePacket.MESSAGE_LENGTH_BYTE_OFFSET] = m_sensorDataSizeBytes; packet.insertInt(data, MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET + m_sensorDataHeaderSizeBytes, sensorData); // consume some tics for sending data Scheduler.getInstance().IncrementSimulationTics(timeNow, s_ticsConsumedBySendData); m_ticsPassedDuringThisExecute += s_ticsConsumedBySendData; // return after increment, so time will continue to pass if(GetRemainingPower() <= 0) { return; } boolean rc = false; rc = m_protocol.TransmitData(timeNow, data.length * 8, packet, m_location, xmitPwr, DO_LOADS_OF_LOGS); } // schedule for next periodic transmit m_whenToSendSensorData = timeNow + m_sensorDataTicsBetweenTransmits; if(DO_SENSOR_DATA_LOGS) { m_logger.debug("node: "+m_myNodeNumber+" sending periodic data: " + sensorData+ " at time: " + timeNow + " will send next at time: "+ m_whenToSendSensorData); } // schedule event at time in future for normal comm msg Scheduler.getInstance().ScheduleEvent(this, m_whenToSendSensorData, DO_SCHEDULER_LOGS); } private void SendClusterheadPackedData() { if(GetRemainingPower() <= 0) { // use up some time just to keep the clock ticking Scheduler.getInstance().IncrementSimulationTics(m_simTimeAtExecute, 5); return; } // ??? send sensor cluster head packed data // not sending specific data, just sending a normal data packet, but // with the 'forward' flag on } /** * sends a message to this cluster's children to determine which child to pass * the clusterhead duties to * Message contains this node's current ratio of power left. Message format is * to provide 4 bytes, which represent the first 4 digits after the decimal of the ratio * Expect to get return messages from all the children stating their ratio power left */ private void SendClusterheadTired(long timeNow) { // broadcast the I'm tired message, then will get a response from all the // children containing their remaining power, then pick the strongest synchronized(m_childPowerLevels) { m_childPowerLevels.removeAllElements(); } if(DO_CHANGE_CLUSTER_HEAD_LOGS) { double pwrLeft = GetRemainingPower() / m_maxPowerReserveWattHours; m_logger.debug("node: "+m_myNodeNumber+" sending the cluster head tired message "+ " at power remaining ratio: "+pwrLeft); } // find the one child requiring the highest power level int farNode = GetFarthestChildNode(); PacketIF packet = PacketIF.getNewInstance(); if(packet != null) { byte [] data = new byte[MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET + PacketIF.CRC_SIZE_BYTES]; packet.setData(data); int seqNum = PacketIF.GetNextSequenceNumber(); packet.insertInt(data, MobilePacket.SEQUENCE_NUMBER_BYTE_OFFSET, seqNum); packet.insertInt(data, MobilePacket.ACK_SEQUENCE_NUMBER_BYTE_OFFSET, 0); packet.insertInt(data, MobilePacket.SOURCE_NODE_BYTE_OFFSET, m_myNodeNumber); packet.insertInt(data, MobilePacket.DESTINATION_NODE_BYTE_OFFSET, -1); packet.insertInt(data, MobilePacket.LAST_NODE_BYTE_OFFSET, m_myNodeNumber); packet.insertInt(data, MobilePacket.NEXT_NODE_BYTE_OFFSET, -1); packet.insertInt(data, MobilePacket.MESSAGE_NUMBER_BYTE_OFFSET, packet.GetNextMessageNumber()); data[MobilePacket.MESSAGE_TYPE_BYTE_OFFSET] = MobilePacket.HEAD_TIRED; data[MobilePacket.MESSAGE_LENGTH_BYTE_OFFSET] = 0; // consume some tics for sending message Scheduler.getInstance().IncrementSimulationTics(timeNow, s_ticsConsumedBySendTired); m_ticsPassedDuringThisExecute += s_ticsConsumedBySendTired; // return after increment, so time will continue to pass if(GetRemainingPower() <= 0) { return; } double xmitPwr = MIN_TRANSMIT_POWER_DBM; xmitPwr = GetMinTransmitDbm(farNode); boolean rc = false; rc = m_protocol.TransmitData(timeNow, data.length * 8, packet, m_location, xmitPwr, DO_LOADS_OF_LOGS); } } private void CheckIfTired(long timeNow) { if(m_myClusterHead != m_myNodeNumber) { // we're not the clusterhead, so we don't care. return; } if(GetRemainingPower() < m_tiredLevelWatts) { if(DO_CHANGE_CLUSTER_HEAD_LOGS) { m_logger.debug("node: "+m_myNodeNumber+" getting tired as cluster head. "+ " tired level: "+m_tiredLevelWatts); } m_doingHeadTransfer = true; m_timeWhenSentTiredMessage = timeNow; Scheduler.getInstance().IncrementSimulationTics(timeNow, s_ticsConsumedByCheckTired); m_ticsPassedDuringThisExecute += s_ticsConsumedByCheckTired; SendClusterheadTired(timeNow); } } private void SendChildPower(int destnode, long timeNow) { PacketIF packet = PacketIF.getNewInstance(); if(packet != null) { byte numDecimals = (byte)4; byte [] data = new byte[MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET + PacketIF.CRC_SIZE_BYTES + numDecimals]; packet.setData(data); int seqNum = PacketIF.GetNextSequenceNumber(); packet.insertInt(data, MobilePacket.SEQUENCE_NUMBER_BYTE_OFFSET, seqNum); packet.insertInt(data, MobilePacket.ACK_SEQUENCE_NUMBER_BYTE_OFFSET, 0); packet.insertInt(data, MobilePacket.SOURCE_NODE_BYTE_OFFSET, m_myNodeNumber); packet.insertInt(data, MobilePacket.DESTINATION_NODE_BYTE_OFFSET, destnode); packet.insertInt(data, MobilePacket.LAST_NODE_BYTE_OFFSET, m_myNodeNumber); packet.insertInt(data, MobilePacket.NEXT_NODE_BYTE_OFFSET, destnode); packet.insertInt(data, MobilePacket.MESSAGE_NUMBER_BYTE_OFFSET, packet.GetNextMessageNumber()); data[MobilePacket.MESSAGE_TYPE_BYTE_OFFSET] = MobilePacket.CHILD_POWER; // get our remaining power, which is less than one double pwrLeft = GetRemainingPower() / m_maxPowerReserveWattHours; if(pwrLeft < 0.0001) { pwrLeft = 0; } else if(pwrLeft > 0.9999) { pwrLeft = 0.9999; } double pwrLeftLog = pwrLeft; byte digit = (byte)0; data[MobilePacket.MESSAGE_LENGTH_BYTE_OFFSET] = numDecimals; // digits after decimal String digitsAsString = ""; for(int decimals = 0; decimals < numDecimals; ++decimals) { pwrLeft *= 10; digit = (byte)((int)pwrLeft); data[MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET + decimals] = digit; pwrLeft -= digit; digitsAsString = digitsAsString + digit; } if(DO_CHANGE_CLUSTER_HEAD_LOGS) { m_logger.debug("node: "+m_myNodeNumber+" sending child power as: 0."+ digitsAsString+" based on actual power left: "+pwrLeftLog); } // consume some tics for sending message Scheduler.getInstance().IncrementSimulationTics(timeNow, s_ticsConsumedBySendChildPower); m_ticsPassedDuringThisExecute += s_ticsConsumedBySendChildPower; // return after increment, so time will continue to pass if(GetRemainingPower() <= 0) { return; } double xmitPwr = GetMinTransmitDbm(destnode); boolean rc = false; rc = m_protocol.TransmitData(timeNow, data.length * 8, packet, m_location, xmitPwr, DO_LOADS_OF_LOGS); } } private int GetFarthestChildNode() { int farNodeNum = -1; byte level = Byte.MIN_VALUE; synchronized(m_nodesHeard) { int numHrd = m_nodesHeard.size(); for(int i = 0; i < numHrd; ++i) { NodeHeard testHrd = ((NodeHeard)m_nodesHeard.get(i)); if(testHrd.m_lowestLevelHeard > level) { level = testHrd.m_lowestLevelHeard; farNodeNum = testHrd.m_nodeNumber; } } } return farNodeNum; } /** * decide who this node thinks would make a good head * since the head choosing is initiated by the sink node, the * first node to hear is closest, so it should start out as the * head, then pick the nodes it heard at a lower level, and inform them * that this node is their head, then tell the nodes it heard at a higher level * that they need to pick a head */ protected void ChooseClusterHead(PacketIF headStartPacket) { if(m_amClustered) { // this node is already part of a cluster, ignore return; } int delayBeforeStartForming = 0; if(headStartPacket != null) { byte[] headStartData = headStartPacket.getData(); if(headStartData.length >= (MobilePacket.MESSAGE_LENGTH_BYTE_OFFSET + 4)) // 4 bytes for int { boolean hasDelay = (headStartData[MobilePacket.MESSAGE_LENGTH_BYTE_OFFSET] > 0); if(hasDelay) { delayBeforeStartForming = headStartPacket.extractInt(headStartData, MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET); if(delayBeforeStartForming > 0) { m_whenToFormOwnCluster = Scheduler.getInstance().GetSimulationTime() + (delayBeforeStartForming); if(DO_CLUSTER_FORM_LOGS) { m_logger.debug("node: "+m_myNodeNumber+" delaying forming our own cluster until: "+ m_whenToFormOwnCluster); } // schedule event for us to do our own cluster formation Scheduler.getInstance().ScheduleEvent(this, m_whenToFormOwnCluster, DO_CLUSTER_FORM_LOGS); return; } } } } int numNodes = m_nodesHeard.size(); m_logger.debug("Node: "+m_myNodeNumber+" ChooseClusterHead Total num other nodes heard: "+ numNodes); m_amClustered = true; m_myClusterHead = m_myNodeNumber; // record the level at which we'll decide we're tired: m_tiredLevelWatts = m_tiredRatio * GetRemainingPower(); byte rehelloLevel = (byte)((1 - m_rehelloPercent) * m_numberOfHelloTransmitLevels); boolean xmitMsg = false; // for each node we tell to form its own cluster, add a delay for when it should start int numFormOwn = 0; // loop through twice, once for nodes to add, once for nodes to tell to // form their own cluster for(int twice = 0; twice < 2; ++twice) { int i = 0; while(i < numNodes) { xmitMsg = false; NodeHeard nodeHrd = null; synchronized(m_nodesHeard) { nodeHrd = (NodeHeard)m_nodesHeard.get(i); ++i; numNodes = m_nodesHeard.size(); } if(nodeHrd == null) { continue; } // tell node it has to pick a cluster head PacketIF packet = PacketIF.getNewInstance(); if(packet == null) { m_logger.error("unable to get a new packet"); return; } byte [] data = new byte[MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET + 4 + PacketIF.CRC_SIZE_BYTES]; packet.setData(data); int seqNum = PacketIF.GetNextSequenceNumber(); packet.insertInt(data, MobilePacket.SEQUENCE_NUMBER_BYTE_OFFSET, seqNum); packet.insertInt(data, MobilePacket.ACK_SEQUENCE_NUMBER_BYTE_OFFSET, 0); packet.insertInt(data, MobilePacket.SOURCE_NODE_BYTE_OFFSET, m_myNodeNumber); packet.insertInt(data, MobilePacket.DESTINATION_NODE_BYTE_OFFSET, nodeHrd.m_nodeNumber); packet.insertInt(data, MobilePacket.LAST_NODE_BYTE_OFFSET, m_myNodeNumber); packet.insertInt(data, MobilePacket.NEXT_NODE_BYTE_OFFSET, nodeHrd.m_nodeNumber); packet.insertInt(data, MobilePacket.MESSAGE_NUMBER_BYTE_OFFSET, packet.GetNextMessageNumber()); if(twice == 0) { if(nodeHrd.m_lowestLevelHeard < rehelloLevel) { int numberOfClusterMembers = 0; synchronized(m_clusterMembers) { numberOfClusterMembers = m_clusterMembers.size(); } xmitMsg = true; if(numberOfClusterMembers < m_numNodesPerCluster) { // make node part of the cluster synchronized(m_clusterMembers) { m_clusterMembers.addElement(nodeHrd); } data[MobilePacket.MESSAGE_LENGTH_BYTE_OFFSET] = 0; data[MobilePacket.MESSAGE_TYPE_BYTE_OFFSET] = MobilePacket.TELL_HEAD; m_logger.debug("node: " + m_myNodeNumber + " telling node: "+nodeHrd.m_nodeNumber + " to use this node as cluster head"); } else { data[MobilePacket.MESSAGE_TYPE_BYTE_OFFSET] = MobilePacket.CHOOSE_HEADS; // add more info to the packet, the time to delay data[MobilePacket.MESSAGE_LENGTH_BYTE_OFFSET] = 4; packet.insertInt(data, MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET, (numFormOwn * s_ticsDelayBetweenTellFormOwnCluster)); ++numFormOwn; m_logger.debug("node: " + m_myNodeNumber + " telling node: "+nodeHrd.m_nodeNumber + " to form its own cluster, "+ "power level: "+ nodeHrd.m_lowestLevelHeard); } } } else { // only tell the nodes at the first level that is not included in the // cluster to form their own cluster, assumes that the nodes at the // higher un-used levels will be able to hear the nodes at the lower levels // ??? if we only do == then we don't get the message out if(nodeHrd.m_lowestLevelHeard >= rehelloLevel) { xmitMsg = true; data[MobilePacket.MESSAGE_TYPE_BYTE_OFFSET] = MobilePacket.CHOOSE_HEADS; // add more info to the packet, the time to delay data[MobilePacket.MESSAGE_LENGTH_BYTE_OFFSET] = 4; packet.insertInt(data, MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET, (numFormOwn * s_ticsDelayBetweenTellFormOwnCluster)); ++numFormOwn; m_logger.debug("node: " + m_myNodeNumber + " telling node: "+nodeHrd.m_nodeNumber + " to form its own cluster, "+ "power level: "+ nodeHrd.m_lowestLevelHeard); } } // consume a few tics to do our work Scheduler.getInstance().IncrementSimulationTics(m_simTimeAtExecute, s_ticsConsumedByChooseHead); m_ticsPassedDuringThisExecute += s_ticsConsumedByChooseHead; // return after increment, so time will continue to pass if(GetRemainingPower() <= 0) { return; } // only transmit when the node tested met the close enough or far enough criteria if(xmitMsg) { double xmitPwr = GetMinTransmitDbm(nodeHrd.m_nodeNumber); // last thing to do to the packet before sending is insert the CRC PacketIF.AddCrc(data); int prevState = m_nodeState; m_nodeState = TRANSMITTING; m_lastTransmitPowerLeveldBm = xmitPwr; boolean rc = false; rc = m_protocol.TransmitData(m_simTimeAtExecute + s_ticsConsumedByChooseHead, data.length * 8, packet, m_location, xmitPwr, DO_CLUSTER_FORM_LOGS); m_nodeState = prevState; } } } } /** * uses the m_childPowerLevels vector to choose a replacement cluster head */ protected void SelectNewClusterHead(long timeNow) { m_doingHeadTransfer = false; double mostPowerLeft = GetRemainingPower() / m_maxPowerReserveWattHours; int nodeNumMostPwrLeft = m_myNodeNumber; synchronized(m_childPowerLevels) { int numChildren = m_childPowerLevels.size(); for(int i = 0; i < numChildren; ++i) { ChildPower cpwr = (ChildPower)m_childPowerLevels.get(i); if(cpwr.m_level > mostPowerLeft) { nodeNumMostPwrLeft = cpwr.m_nodeNumber; } } } if(nodeNumMostPwrLeft != m_myNodeNumber) { m_myClusterHead = nodeNumMostPwrLeft; // tell all nodes to use node with the most power remaining as become cluster head int numClusterMembers = 0; Vector clusterNodeNums = new Vector(); clusterNodeNums.addElement(new Integer(m_myNodeNumber)); synchronized(m_clusterMembers) { numClusterMembers = m_clusterMembers.size(); for(int i = 0; i < numClusterMembers; ++i) { int destNode = ((NodeHeard)m_clusterMembers.get(i)).m_nodeNumber; SetOtherNodeHead(destNode, nodeNumMostPwrLeft); if(destNode == m_myClusterHead) { // don't add the new clusterhead as a member of its cluster continue; } clusterNodeNums.addElement(new Integer(destNode)); } } if(DO_CHANGE_CLUSTER_HEAD_LOGS) { m_logger.debug("node: "+m_myNodeNumber+" chose new cluster head node: "+ nodeNumMostPwrLeft); } // tell the new cluster head all the child nodes PacketIF packet = PacketIF.getNewInstance(); if(packet != null) { byte [] data = new byte[MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET + PacketIF.CRC_SIZE_BYTES + numClusterMembers*4]; packet.setData(data); int seqNum = PacketIF.GetNextSequenceNumber(); packet.insertInt(data, MobilePacket.SEQUENCE_NUMBER_BYTE_OFFSET, seqNum); packet.insertInt(data, MobilePacket.ACK_SEQUENCE_NUMBER_BYTE_OFFSET, 0); packet.insertInt(data, MobilePacket.SOURCE_NODE_BYTE_OFFSET, m_myNodeNumber); packet.insertInt(data, MobilePacket.DESTINATION_NODE_BYTE_OFFSET, nodeNumMostPwrLeft); packet.insertInt(data, MobilePacket.LAST_NODE_BYTE_OFFSET, m_myNodeNumber); packet.insertInt(data, MobilePacket.NEXT_NODE_BYTE_OFFSET, nodeNumMostPwrLeft); packet.insertInt(data, MobilePacket.MESSAGE_NUMBER_BYTE_OFFSET, packet.GetNextMessageNumber()); data[MobilePacket.MESSAGE_TYPE_BYTE_OFFSET] = MobilePacket.ASSIGN_CHILD_NODES; data[MobilePacket.MESSAGE_LENGTH_BYTE_OFFSET] = (byte)(numClusterMembers*4); for(int i = 0; i < numClusterMembers; ++i) { int member = ((Integer)clusterNodeNums.get(i)).intValue(); packet.insertInt(data, MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET + (4*i), member); } // consume some tics for sending message Scheduler.getInstance().IncrementSimulationTics(timeNow, s_ticsConsumedBySelectNewHead); m_ticsPassedDuringThisExecute += s_ticsConsumedBySelectNewHead; // return after increment, so time will continue to pass if(GetRemainingPower() <= 0) { return; } double xmitPwr = GetMinTransmitDbm(nodeNumMostPwrLeft); boolean rc = false; rc = m_protocol.TransmitData(timeNow, data.length * 8, packet, m_location, xmitPwr, DO_LOADS_OF_LOGS); } } } protected void RejectTellHead(PacketIF packet) { byte [] data = packet.getData(); int sourceNode = MobilePacket.extractInt(data, MobilePacket.SOURCE_NODE_BYTE_OFFSET); int destinationNode = MobilePacket.extractInt(data, MobilePacket.DESTINATION_NODE_BYTE_OFFSET); int lastNode = MobilePacket.extractInt(data, MobilePacket.LAST_NODE_BYTE_OFFSET); int nextNode = MobilePacket.extractInt(data, MobilePacket.NEXT_NODE_BYTE_OFFSET); byte messageType = data[MobilePacket.MESSAGE_TYPE_BYTE_OFFSET]; byte messageLen = data[MobilePacket.MESSAGE_LENGTH_BYTE_OFFSET]; synchronized(m_rejectedHeadNodes) { m_rejectedHeadNodes.addElement(new Integer(sourceNode)); } m_logger.debug("Node: "+m_myNodeNumber+" rejecting node:"+sourceNode+ " as cluster head"); // tell the calling node that this node is already clustered PacketIF rejPacket = PacketIF.getNewInstance(); if(packet == null) { m_logger.error("unable to get a new packet"); return; } byte [] rejdata = new byte[MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET + PacketIF.CRC_SIZE_BYTES]; rejPacket.setData(rejdata); int seqNum = PacketIF.GetNextSequenceNumber(); rejPacket.insertInt(rejdata, MobilePacket.SEQUENCE_NUMBER_BYTE_OFFSET, seqNum); rejPacket.insertInt(rejdata, MobilePacket.ACK_SEQUENCE_NUMBER_BYTE_OFFSET, 0); rejPacket.insertInt(rejdata, MobilePacket.SOURCE_NODE_BYTE_OFFSET, m_myNodeNumber); rejPacket.insertInt(rejdata, MobilePacket.DESTINATION_NODE_BYTE_OFFSET, sourceNode); rejPacket.insertInt(rejdata, MobilePacket.LAST_NODE_BYTE_OFFSET, m_myNodeNumber); rejPacket.insertInt(rejdata, MobilePacket.NEXT_NODE_BYTE_OFFSET, sourceNode); rejPacket.insertInt(data, MobilePacket.MESSAGE_NUMBER_BYTE_OFFSET, packet.GetNextMessageNumber()); rejdata[MobilePacket.MESSAGE_LENGTH_BYTE_OFFSET] = 0; rejdata[MobilePacket.MESSAGE_TYPE_BYTE_OFFSET] = MobilePacket.REJECT_HEAD; double xmitPwr = GetMinTransmitDbm(sourceNode); // last thing to do to the packet before sending is insert the CRC PacketIF.AddCrc(rejdata); int prevState = m_nodeState; m_nodeState = TRANSMITTING; m_lastTransmitPowerLeveldBm = xmitPwr; // consume a few tics to do our work Scheduler.getInstance().IncrementSimulationTics(m_simTimeAtExecute, s_ticsConsumedRejectHead); m_ticsPassedDuringThisExecute += s_ticsConsumedRejectHead; // return after increment, so time will continue to pass if(GetRemainingPower() <= 0) { return; } boolean rc = false; rc = m_protocol.TransmitData(m_simTimeAtExecute + s_ticsConsumedRejectHead, rejdata.length * 8, rejPacket, m_location, xmitPwr, DO_LOADS_OF_LOGS); m_nodeState = prevState; } /** * we've determined that we are a lonely cluster head, so join another cluster */ protected void JoinOtherCluster() { int clusterHead = Integer.MIN_VALUE; // find the cluster head we rejected with the lowest xmit level synchronized(m_rejectedHeadNodes) { int minLvl = Integer.MAX_VALUE; Iterator iter = m_rejectedHeadNodes.iterator(); while(iter.hasNext()) { int nodeNum = ((Integer)iter.next()).intValue(); int lvl = GetMinTransmitLevel(nodeNum); if(lvl < minLvl) { clusterHead = nodeNum; minLvl = lvl; } } } if(clusterHead == Integer.MIN_VALUE) { return; } m_myClusterHead = clusterHead; PacketIF packet = PacketIF.getNewInstance(); byte [] data = new byte[MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET + PacketIF.CRC_SIZE_BYTES]; packet.setData(data); int seqNum = PacketIF.GetNextSequenceNumber(); packet.insertInt(data, MobilePacket.SEQUENCE_NUMBER_BYTE_OFFSET, seqNum); packet.insertInt(data, MobilePacket.ACK_SEQUENCE_NUMBER_BYTE_OFFSET, 0); packet.insertInt(data, MobilePacket.SOURCE_NODE_BYTE_OFFSET, m_myNodeNumber); packet.insertInt(data, MobilePacket.DESTINATION_NODE_BYTE_OFFSET, clusterHead); packet.insertInt(data, MobilePacket.LAST_NODE_BYTE_OFFSET, m_myNodeNumber); packet.insertInt(data, MobilePacket.NEXT_NODE_BYTE_OFFSET, clusterHead); packet.insertInt(data, MobilePacket.MESSAGE_NUMBER_BYTE_OFFSET, packet.GetNextMessageNumber()); data[MobilePacket.MESSAGE_LENGTH_BYTE_OFFSET] = 0; data[MobilePacket.MESSAGE_TYPE_BYTE_OFFSET] = MobilePacket.JOIN_CLUSTER; double xmitPwr = GetMinTransmitDbm(clusterHead); // last thing to do to the packet before sending is insert the CRC PacketIF.AddCrc(data); int prevState = m_nodeState; m_nodeState = TRANSMITTING; m_lastTransmitPowerLeveldBm = xmitPwr; // consume a few tics to do our work Scheduler.getInstance().IncrementSimulationTics(m_simTimeAtExecute, s_ticsConsumedByJoinCluster); m_ticsPassedDuringThisExecute += s_ticsConsumedByJoinCluster; // return after increment, so time will continue to pass if(GetRemainingPower() <= 0) { return; } boolean rc = false; rc = m_protocol.TransmitData(m_simTimeAtExecute + s_ticsConsumedRejectHead, data.length * 8, packet, m_location, xmitPwr, DO_LOADS_OF_LOGS); m_nodeState = prevState; } protected void SetOtherNodeHead(int otherNode, int headToAssign) { PacketIF packet = PacketIF.getNewInstance(); byte [] data = new byte[MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET + PacketIF.CRC_SIZE_BYTES + 4]; packet.setData(data); int seqNum = PacketIF.GetNextSequenceNumber(); packet.insertInt(data, MobilePacket.SEQUENCE_NUMBER_BYTE_OFFSET, seqNum); packet.insertInt(data, MobilePacket.ACK_SEQUENCE_NUMBER_BYTE_OFFSET, 0); packet.insertInt(data, MobilePacket.SOURCE_NODE_BYTE_OFFSET, m_myNodeNumber); packet.insertInt(data, MobilePacket.DESTINATION_NODE_BYTE_OFFSET, otherNode); packet.insertInt(data, MobilePacket.LAST_NODE_BYTE_OFFSET, m_myNodeNumber); packet.insertInt(data, MobilePacket.NEXT_NODE_BYTE_OFFSET, otherNode); packet.insertInt(data, MobilePacket.MESSAGE_NUMBER_BYTE_OFFSET, packet.GetNextMessageNumber()); data[MobilePacket.MESSAGE_TYPE_BYTE_OFFSET] = MobilePacket.OVERRIDE_HEAD; data[MobilePacket.MESSAGE_LENGTH_BYTE_OFFSET] = 4; packet.insertInt(data, MobilePacket.BEGIN_GENERIC_DATA_BYTE_OFFSET, headToAssign); double xmitPwr = GetMinTransmitDbm(otherNode); // last thing to do to the packet before sending is insert the CRC PacketIF.AddCrc(data); int prevState = m_nodeState; m_nodeState = TRANSMITTING; m_lastTransmitPowerLeveldBm = xmitPwr; // consume a few tics to do our work Scheduler.getInstance().IncrementSimulationTics(m_simTimeAtExecute, s_ticsConsumedByJoinCluster); m_ticsPassedDuringThisExecute += s_ticsConsumedByJoinCluster; // return after increment, so time will continue to pass if(GetRemainingPower() <= 0) { return; } boolean rc = false; rc = m_protocol.TransmitData(m_simTimeAtExecute + s_ticsConsumedByJoinCluster, data.length * 8, packet, m_location, xmitPwr, DO_LOADS_OF_LOGS); m_nodeState = prevState; } /** * propagates a packet by sending it out again at a broadcast level appropriate to the * destination node (if this node didn't hear the node then send message at max power */ protected void PropagateMessage(PacketIF packet) { byte [] data = packet.getData(); int sourceNode = MobilePacket.extractInt(data, MobilePacket.SOURCE_NODE_BYTE_OFFSET); int destinationNode = MobilePacket.extractInt(data, MobilePacket.DESTINATION_NODE_BYTE_OFFSET); int msgNumber = MobilePacket.extractInt(data, MobilePacket.MESSAGE_NUMBER_BYTE_OFFSET); // check the sequence number to see if we've forwarded this message before Integer newInteger = new Integer(msgNumber); synchronized(m_propagatedMessageNumbersLock) { if(m_propagatedMessageNumbers.contains(newInteger)) { // already propagated this message, just return return; } else { // add to the list of propagated sequence numbers m_propagatedMessageNumbers.addElement(newInteger); } } // increment the hop count packet.IncrementHopCount(1); double xmitPwr = GetMinTransmitDbm(destinationNode); long timeNow = Scheduler.getInstance().GetSimulationTime(); boolean rc; if(DO_MESSAGE_PROPAGATE_LOGS) { m_logger.debug("propagating packet: "+ packet.toString()); } // consume a few tics to do our work Scheduler.getInstance().IncrementSimulationTics(m_simTimeAtExecute, s_ticsConsumedByPropagate); m_ticsPassedDuringThisExecute += s_ticsConsumedByPropagate; // return after increment, so time will continue to pass if(GetRemainingPower() <= 0) { return; } // propagate the message rc = m_protocol.TransmitData(timeNow, data.length * 8, packet, m_location, xmitPwr); } protected double GetMinTransmitDbm(int destNode) { return (MIN_TRANSMIT_POWER_DBM + (m_helloTransmitIncrement * (GetMinTransmitLevel(destNode)+1))); } protected byte GetMinTransmitLevel(int destNode) { byte xmitLevel = (byte)(m_numberOfHelloTransmitLevels - 1); // determine the transmit level needed to communicate with the // node we want to communicate with synchronized(m_nodesHeard) { int numHrd = m_nodesHeard.size(); for(int i = 0; i < numHrd; ++i) { NodeHeard testHrd = ((NodeHeard)m_nodesHeard.get(i)); if(testHrd.m_nodeNumber == destNode) { xmitLevel = testHrd.m_lowestLevelHeard; break; } } } if(DO_LOADS_OF_LOGS) { m_logger.debug("node: " + m_myNodeNumber +" heard node: "+destNode + " at level: "+xmitLevel); } return xmitLevel; } public void setNumberOfHelloTransmitLevels(byte numLevels) { m_numberOfHelloTransmitLevels = numLevels; if(numLevels > 1) { m_currentTransmitPowerLeveldBm = MIN_TRANSMIT_POWER_DBM; m_helloTransmitIncrement = (MAX_TRANSMIT_POWER_DBM - MIN_TRANSMIT_POWER_DBM) / (numLevels-1); m_logger.debug("node: "+m_myNodeNumber+" number of transmit levels: "+numLevels + " increment size: " + m_helloTransmitIncrement); } else { m_helloTransmitIncrement = MobileNode.MAX_TRANSMIT_POWER_DBM - MobileNode.MIN_TRANSMIT_POWER_DBM; } } /** * determines how many nodes will execute the hello sequence * as a percentage of the total number of transmit levels (e.g. 30% of 10 levels * means that the 3 highest power levels will execute the hello sequence) * Note that the 0..100.0 percentage is converted to 0..1.0 * * @param percentRehello (0..100.0) the percentage of total levels which will do the hello */ public void setRehelloPercent(double percentRehello) { m_rehelloPercent = percentRehello / 100.0; } public double GetRemainingPower() { return m_powerReserveWattHours - m_protocol.GetWattsConsumed(); } public double GetMaxPower() { return m_maxPowerReserveWattHours; } public void SetWattHours(double wattHours) { m_powerReserveWattHours = wattHours; m_maxPowerReserveWattHours = m_powerReserveWattHours; m_tiredLevelWatts = m_tiredRatio * m_maxPowerReserveWattHours; } public void setSensorDataSizeBytes(int numBytes) { m_sensorDataSizeBytes = (byte)numBytes; } public void setSensorDataHeaderSizeBytes(int numBytes) { m_sensorDataHeaderSizeBytes = numBytes; } public void setSensorDataTicsBetweenTransmits(int ticsBetweenTransmits) { m_sensorDataTicsBetweenTransmits = ticsBetweenTransmits; } /** * To avoid losing track of the location, only the AllNodes should call this * @param pt new location */ public void setLocation(jdr.utils.FloatPoint pt) { m_location = pt; m_protocol.setMyNodeLocation(m_location); } public jdr.utils.FloatPoint getLocation() { return m_location; } public int getNodeNumber() { return m_myNodeNumber; } public void SetNumNodesPerCluster(double nodesPerCluster) { m_numNodesPerCluster = nodesPerCluster; } public void SetDataRate(double dataRateBps) { m_dataRateBps = dataRateBps; m_protocol.SetDataRate(m_dataRateBps); } public PacketIF GetNextPacket() { return m_protocol.PeekNextPacket(); } public void SetHighliteCluster(boolean highlite) { SetHighliteCluster(highlite, true); } public void SetHighliteCluster(boolean highlite, boolean propagate) { if(m_highlite == highlite) { return; } m_highlite = highlite; if(!propagate) { return; } // need to get our cluster head, and propagate the message to the rest of the cluster // Note this message is not passed through the normal node comm channel as it has nothing to do // with normal node communication, it is strictly an artifact of the simulator // if this node is the cluster head, let it tell its children, if this node is a child, let it // tell its cluster head if(m_myClusterHead == m_myNodeNumber) { synchronized(m_clusterMembers) { int numNodes = m_clusterMembers.size(); for(int i = 0; i < numNodes; ++i) { NodeHeard nodeHrd = (NodeHeard)m_clusterMembers.get(i); NodeIF node = AllNodes.Instance().GetNodeByNumber(nodeHrd.m_nodeNumber); m_logger.debug("SetHighliteCluster: "+ highlite +" my Node num: " + m_myNodeNumber + " cluster child node num: " + nodeHrd.m_nodeNumber); if(node instanceof MobileNode) { ((MobileNode)node).SetHighliteCluster(highlite, false); } } } } else if(propagate) // we were the first node set { // pass the message along to our clusterhead NodeIF clusterHead = AllNodes.Instance().GetNodeByNumber(m_myClusterHead); if(clusterHead instanceof MobileNode) { ((MobileNode)clusterHead).SetHighliteCluster(highlite, true); } } } public void SetHighliteClusterMembers(boolean highlite, int clusterHead) { if(m_myClusterHead == clusterHead) { SetHighliteCluster(highlite, false); } } public void SetTransmitRingsOn(boolean ringsOn) { m_drawTransmitRings = ringsOn; } public void Draw(Graphics2D graphics, Rectangle bounds, double xScale, double yScale) { // draw the node, always show cluster heads in a different color Color origColor = graphics.getColor(); if(GetRemainingPower() <= 0) { m_nodeState = AWAITING_RTS; } Color nodeStateColor = Color.red; switch(m_nodeState) { case SEND_HELLO_MESSAGE: nodeStateColor = Color.white; break; case AWAITING_RTS: nodeStateColor = Color.green; break; case AWAITING_CTS: nodeStateColor = Color.magenta; break; case AWAITING_PACKET: nodeStateColor = Color.blue; break; case AWAITING_ACK: nodeStateColor = Color.pink; break; case SEND_SENSOR_DATA_MESSAGE: nodeStateColor = Color.lightGray; break; case SEND_CLUSTERED_DATA_MESSAGE: nodeStateColor = Color.gray; break; case SEND_CLUSTER_HEAD_GETTING_TIRED_MESSAGE: nodeStateColor = Color.orange; break; case TRANSMITTING: nodeStateColor = Color.yellow; break; default: nodeStateColor = Color.red; break; } graphics.setColor(nodeStateColor); graphics.fillOval((int)(m_location.getX() / xScale), (int)(m_location.getY() / yScale), 6, 6); if(m_myClusterHead == m_myNodeNumber) { graphics.setColor(Color.cyan); graphics.fillRect((int)(m_location.getX() / xScale) -2, (int)(m_location.getY() / yScale) -2, 4, 4); } // if the m_highlite flag is set, add a circle around cluster-mates if(m_highlite) { graphics.setColor(Color.white); graphics.drawOval((int)(m_location.getX() / xScale) - 3, (int)(m_location.getY() / yScale) - 3, 12, 12); //m_logger.debug("Highliting node num: "+m_myNodeNumber); } if(m_drawTransmitRings) { graphics.setColor(Color.yellow); Stroke currentStroke = graphics.getStroke(); float [] dashInfo = new float[2]; dashInfo[0] = (float)2.0; dashInfo[1] = (float)4.0; graphics.setStroke(new BasicStroke((float)1.0, BasicStroke.CAP_SQUARE, BasicStroke.JOIN_ROUND, (float)1.0, dashInfo, (float)1.0)); PropagationIF propagation = PropagationIF.getInstance(); for(double pwr = MIN_TRANSMIT_POWER_DBM; pwr <= MAX_TRANSMIT_POWER_DBM; pwr += m_helloTransmitIncrement) { double distAtPwr = propagation.GetTransmitDistance(pwr); graphics.drawOval((int)((m_location.getX() / xScale) - ((distAtPwr / xScale) - 3)), (int)((m_location.getY() / yScale) - ((distAtPwr / yScale) - 3)), (int)(((distAtPwr / xScale) * 2)), (int)(((distAtPwr / yScale) * 2))); } graphics.setStroke(currentStroke); } // draw a circle representing the transmit distance if(m_nodeState == TRANSMITTING) { // get the propagation model so we can get the transmit distance PropagationIF propagation = PropagationIF.getInstance(); double distAtPwr = propagation.GetTransmitDistance(m_lastTransmitPowerLeveldBm); graphics.setColor(Color.yellow); Stroke currentStroke = graphics.getStroke(); float [] dashInfo = new float[2]; dashInfo[0] = (float)2.0; dashInfo[1] = (float)4.0; graphics.setStroke(new BasicStroke((float)1.0, BasicStroke.CAP_SQUARE, BasicStroke.JOIN_ROUND, (float)1.0, dashInfo, (float)1.0)); graphics.drawOval((int)((m_location.getX() / xScale) - ((distAtPwr / xScale) - 3)), (int)((m_location.getY() / yScale) - ((distAtPwr / yScale) - 3)), (int)(((distAtPwr / xScale) * 2)), (int)(((distAtPwr / yScale) * 2))); graphics.setStroke(currentStroke); } // draw the 'fuel guage' bar double powerReserve = GetRemainingPower(); int guageAngle = (int)(-180 * (powerReserve / m_maxPowerReserveWattHours)); if(Math.abs(guageAngle) < 15) { guageAngle = -15; // enough to stay slightly visible } if(powerReserve <= 0) { guageAngle = -180; // all magenta indicates dead graphics.setColor(Color.magenta); } else if(powerReserve <= (m_maxPowerReserveWattHours / 10)) { graphics.setColor(Color.red); } else if(powerReserve < (m_maxPowerReserveWattHours / 2)) { graphics.setColor(Color.yellow); } else { graphics.setColor(Color.green); } graphics.fillArc((int)(m_location.getX() / xScale) + 6, (int)(m_location.getY() / yScale) + 6, 10, 10, 180, guageAngle); graphics.setColor(origColor); m_protocol.Draw(graphics, bounds, xScale, yScale); } public String toString() { return "MobileNode node: " + m_myNodeNumber; } /** * dumps the nodes this node has heard during the 'hello' sequence to the logger */ public void DumpNodesHeard() { String nodesHrd = ""; synchronized(m_nodesHeard) { int numNodes = m_nodesHeard.size(); for(int i = 0; i < numNodes; ++i) { nodesHrd += " " + m_nodesHeard.get(i).toString() + "\n"; } if(numNodes == 0) { nodesHrd += " None\n"; } } m_logger.debug("Nodes heard by node: " + m_myNodeNumber + " total ("+m_nodesHeard.size()+")" + " :\n" + nodesHrd); } public void DumpNodePacketQueue() { int numPacketsInQueue = 0; boolean busy = true; try { busy = !(m_busyLock.attempt(1)); if(!busy) { // we weren't busy, so release the mutex m_busyLock.release(); } } catch(InterruptedException ie) { busy = false; } String logStr = "node: "+m_myNodeNumber+ " busy status: "+ busy + " node state: "+GetNodeStateString(m_nodeState); logStr += m_protocol.DumpPacketQueue(); m_logger.debug(logStr); } /** * dumps the cluster information: * who is head for this node, or if this node is a head, the nodes it is * head for */ public void DumpClusterInfo() { String msg = GetClusterInfo(); m_logger.debug(msg); } public String GetClusterInfo() { String msg = ""; if(m_myClusterHead == m_myNodeNumber) { String clusterMembers = ""; synchronized(m_clusterMembers) { int numNodes = m_clusterMembers.size(); for(int i = 0; i < numNodes; ++i) { clusterMembers += " " + m_clusterMembers.get(i).toString() + "\n"; } if(numNodes == 0) { clusterMembers += " None\n"; } } msg = "Cluster head node: "+ m_myNodeNumber + "\n Cluster members " + " total ("+m_clusterMembers.size()+")" + " :\n" + clusterMembers; } else { msg = "This node: "+ m_myNodeNumber + " has cluster head: "+ m_myClusterHead; } msg = msg + " highlite set to: "+m_highlite; msg = msg + " \r\n location of this node: "+m_location.toString(); double pwrRes = GetRemainingPower(); msg = msg + "\r\n Power reserve Watts: " + pwrRes + " ("+m_percentFormat.format(100*(pwrRes / m_maxPowerReserveWattHours))+"%)"; return msg; } public String GetNodeStateString() { return GetNodeStateString(m_nodeState); } public String GetNodeStateString(int state) { String stateStr = "unknown"; switch(state) { case SEND_HELLO_MESSAGE: stateStr = "SEND_HELLO_MESSAGE"; break; case AWAITING_RTS: stateStr = "AWAITING_RTS"; break; case AWAITING_CTS: stateStr = "AWAITING_CTS"; break; case AWAITING_PACKET: stateStr = "AWAITING_PACKET"; break; case AWAITING_ACK: stateStr = "AWAITING_ACK"; break; case SEND_SENSOR_DATA_MESSAGE: stateStr = "SEND_SENSOR_DATA_MESSAGE"; break; case SEND_CLUSTERED_DATA_MESSAGE: stateStr = "SEND_CLUSTERED_DATA_MESSAGE"; break; case SEND_CLUSTER_HEAD_GETTING_TIRED_MESSAGE: stateStr = "SEND_CLUSTER_HEAD_GETTING_TIRED_MESSAGE"; break; case TRANSMITTING: stateStr = "TRANSMITTING"; break; default: break; } return stateStr; } //================================================================== // inner classes //================================================================== public class ChildPower { public int m_nodeNumber = -1; public double m_level = -1; public ChildPower() { } public ChildPower(int node, double lvl) { m_nodeNumber = node; m_level = lvl; } public boolean equals(Object obj) { boolean rc = false; if(obj instanceof NodeHeard) { rc = (((NodeHeard)obj).m_nodeNumber == m_nodeNumber); } return rc; } public String toString() { return "Node: " + m_nodeNumber + " level: " + m_level; } } } // end class definition