/**
 * Jeff Rupp
 * Master's Thesis
 * 2005
 *
 * Threaded class that takes care of scheduling events.
 *
 * Singleton class, as there is only 1 scheduler.
 *
 * Has a concept of parallel events, such that the simulation clock does not
 * get incremented if the next event in the queue has a start time that has already passed
 * and if that event's duration will also keep it in the past.
 *
 * Uses a pool of threads for the actual execution of events.  The servicing of the queue
 * of events is a single thread which does only that.
 *
 * Locking: when both are needed, m_scheduleLock is always taken before m_evtIfMapLock.
 */
package jdr.mobisim;

import java.util.*;
import org.apache.log4j.*;

public class Scheduler extends Thread
{
    private static Logger m_logger = Logger.getLogger(Scheduler.class);
    private static final boolean s_DO_SCHEDULER_LOGS = false;
    private static Scheduler s_instance;

    public static final long s_ticsPerSecond = 1000000;

    private java.lang.Object m_scheduleLock = new java.lang.Object();
    // the schedule queue is a map from Long times to Vectors of events at that time.
    // This was done due to the long time to insert new events.  Experimentally I've seen
    // that when the queue has 180,000 elements, there are only 4,500 unique times.
    // With luck this will speed the simulation significantly
    private TreeMap m_scheduleQueue = new TreeMap();

    // map from EventCallbackIF to a TreeSet of Longs, used to get the next time an EventCallbackIF
    // is scheduled to be called (so that the EventCallbackIF isn't jumping ahead in time)
    private java.lang.Object m_evtIfMapLock = new java.lang.Object();
    private HashMap m_evtIfToTimesMap = new HashMap();

    // The flags and limits below are written by caller threads and read by the scheduler
    // thread; volatile makes the writes visible and keeps long/double accesses atomic.
    private volatile boolean m_okToKeepRunning = true;
    private volatile boolean m_okToContinueSimulating = false;
    private volatile long m_whenToStopSimulation = Long.MAX_VALUE;

    private volatile double m_speedPercent = 1.0;
    private int m_maxTicsBetweenDelays = 100000;
    private volatile double m_currentTicsBetweenDelays = m_maxTicsBetweenDelays;

    private long m_simulationTime = 0;
    private java.lang.Object m_clockLock = new java.lang.Object();

    private int m_delaySleepDurationMsec = 100;
    long m_ticsSinceLastDelay = 0;
    long m_simTicsWhenLastDelayInjected = 0;

    private long m_msecAddedLastEvent = 0;
    private java.lang.Object m_msecAddedLastEventLock = new java.lang.Object();

    // semaphore to block running
    private EDU.oswego.cs.dl.util.concurrent.Semaphore m_sem =
        new EDU.oswego.cs.dl.util.concurrent.Semaphore(1);

    private static final int s_THREAD_POOL_MAX_THREADS = 50;
    private EDU.oswego.cs.dl.util.concurrent.PooledExecutor m_threadPool = null;

    /**
     * Builds the scheduler thread and its worker pool.  The thread is not started here;
     * getInstance() starts the singleton.
     */
    protected Scheduler()
    {
        super("Scheduler");
        m_threadPool = new EDU.oswego.cs.dl.util.concurrent.PooledExecutor(
            new EDU.oswego.cs.dl.util.concurrent.BoundedBuffer(10), s_THREAD_POOL_MAX_THREADS);
        m_threadPool.setMinimumPoolSize(10); // minimum thread pool size to always have around
        m_threadPool.setKeepAliveTime(1000 * 60 * 5); // keep threads around 5 minutes after last use
        m_threadPool.createThreads(20); // start out with 20 right away (grows to max very quickly)
        m_threadPool.waitWhenBlocked();
    }

    /**
     * Schedule a callback for an event.
     * Enqueues with a desired execution time of now.
     *
     * @param event the event to schedule
     * @return always true
     */
    public boolean ScheduleEvent(EventCallbackIF event)
    {
        return ScheduleEvent(event, GetSimulationTime());
    }

    /**
     * Schedule a callback for an event at the current simulation time.
     *
     * @param event the event to schedule
     * @param doLog true to write a debug log line describing the enqueue
     * @return always true
     */
    public boolean ScheduleEvent(EventCallbackIF event, boolean doLog)
    {
        return ScheduleEvent(event, GetSimulationTime(), doLog);
    }

    /**
     * Schedule a callback for an event.
     * Enqueues with the specified execution time.
     *
     * @param event the event to schedule
     * @param at the sim tic time to execute the event, used to ensure ordering
     * @return always true
     */
    public boolean ScheduleEvent(EventCallbackIF event, long at)
    {
        return ScheduleEvent(event, at, false);
    }

    /**
     * Schedule a callback for an event at the given time and wake the scheduler thread
     * if it is waiting on the semaphore.
     *
     * @param event the event to schedule
     * @param at the sim tic time to execute the event, used to ensure ordering
     * @param doLog true to write a debug log line describing the enqueue
     * @return always true
     */
    public boolean ScheduleEvent(EventCallbackIF event, long at, boolean doLog)
    {
        AddEvent(at, event);
        if(doLog)
        {
            m_logger.debug("added event to execute at time: "+at+
                    " Num event bins in queue: "+m_scheduleQueue.size() +
                    " event source: " + event.toString());
        }
        // only release the semaphore if it is not currently available
        if(m_sem.permits() == 0)
        {
            m_sem.release();
        }
        return true;
    }

    /**
     * Returns the single scheduler, creating and starting it on first use.
     * Synchronized so that concurrent first calls cannot create (and start) two schedulers.
     */
    public static synchronized Scheduler getInstance()
    {
        if(s_instance == null)
        {
            s_instance = new Scheduler();
            s_instance.start();
        }
        return s_instance;
    }

    /**
     * Drops every queued event together with the per-callback record of scheduled times.
     */
    public void ClearSchedule()
    {
        synchronized(m_scheduleLock)
        {
            m_scheduleQueue.clear();
            // the times map is guarded by its own lock, not by m_scheduleLock
            synchronized(m_evtIfMapLock)
            {
                m_evtIfToTimesMap.clear();
            }
        }
    }

    /**
     * Scheduler thread body.  While the simulation is enabled it repeatedly takes the
     * earliest queued event, advances the clock up to the event's time when the clock is
     * behind, and hands the event to the thread pool.  When the queue runs dry, or the
     * simulation is paused, it waits on the semaphore until ScheduleEvent,
     * StartSimulation, RestartSimulation or StopThread releases it.
     */
    public void run()
    {
        m_simTicsWhenLastDelayInjected = GetSimulationTime();
        m_logger.debug("Scheduler thread started");
        while(m_okToKeepRunning)
        {
            try
            {
                while(m_okToContinueSimulating && m_okToKeepRunning)
                {
                    // delay based on the speed percentage
                    throttleIfNeeded();

                    // pop the head of the event queue and let it process
                    ScheduledEvent next = pollNextEvent();
                    if(next == null)
                    {
                        // out of events to process, break to the sem acquire
                        break;
                    }
                    if(next.m_event == null)
                    {
                        // nothing to execute for a null callback; skip it rather than
                        // mistaking it for an empty queue
                        continue;
                    }
                    long eventTime = next.m_time;
                    if(s_DO_SCHEDULER_LOGS)
                    {
                        m_logger.debug("scheduling event, event time: "+eventTime+
                                " current sim time: "+GetSimulationTime());
                    }
                    if(eventTime > m_whenToStopSimulation)
                    {
                        StopSimulation();
                        ClearSchedule();
                        m_logger.debug("Stopping simulation, run tics expired: " + m_whenToStopSimulation);
                        break;
                    }
                    if(eventTime > GetSimulationTime())
                    {
                        // note that we have idle time here, the inc will take care of that
                        IncrementSimulationTics(eventTime, 0);
                    }
                    // hand the event off to the thread pool for execution
                    m_threadPool.execute(new ExecuteEventRunnable(next.m_event, eventTime));
                }
                if(!m_okToKeepRunning)
                {
                    break;
                }
                if(s_DO_SCHEDULER_LOGS)
                {
                    m_logger.debug("acquire semaphore, halts event processing");
                }
                m_sem.acquire();
                if(s_DO_SCHEDULER_LOGS)
                {
                    m_logger.debug("semaphore acquired, will process more events now");
                }
            }
            catch(InterruptedException iex)
            {
                m_logger.error("Scheduler thread interrupted");
            }
        }
        m_logger.debug("Scheduler thread halted");
    }

    /**
     * Sleeps the calling thread for m_delaySleepDurationMsec when the simulation is
     * throttled (speed below 1.0) and more than m_currentTicsBetweenDelays tics have
     * passed since the last injected delay.
     */
    private void throttleIfNeeded() throws InterruptedException
    {
        if(m_speedPercent < 1.0)
        {
            m_ticsSinceLastDelay = GetSimulationTime() - m_simTicsWhenLastDelayInjected;
            if(m_ticsSinceLastDelay > m_currentTicsBetweenDelays)
            {
                m_simTicsWhenLastDelayInjected = GetSimulationTime();
                sleep(m_delaySleepDurationMsec);
            }
        }
    }

    /**
     * Removes and returns the earliest queued event, discarding empty bins on the way.
     *
     * @return the next event, or null when the queue holds no events
     */
    private ScheduledEvent pollNextEvent()
    {
        synchronized(m_scheduleLock)
        {
            while(!m_scheduleQueue.isEmpty())
            {
                Object firstKey = m_scheduleQueue.firstKey();
                Vector bin = (Vector)m_scheduleQueue.get(firstKey);
                if((bin == null) || bin.isEmpty())
                {
                    m_scheduleQueue.remove(firstKey);
                    continue;
                }
                ScheduledEvent evt = (ScheduledEvent)bin.remove(0);
                if(bin.isEmpty())
                {
                    m_scheduleQueue.remove(firstKey);
                }
                forgetScheduledTime(evt, bin);
                return evt;
            }
            return null;
        }
    }

    /**
     * Drops the dequeued event's time from its callback's set of scheduled times, so that
     * getNextScheduledEvent only reports events that are still queued.
     * Must be called with m_scheduleLock held.
     *
     * @param popped the event that was just removed from the queue
     * @param remainingBin what is left of the bin the event came from
     */
    private void forgetScheduledTime(ScheduledEvent popped, Vector remainingBin)
    {
        // the times are kept in a set, so a second queued entry for the same callback at
        // this same time shares the one Long; keep it until the last such entry is gone
        int remaining = remainingBin.size();
        for(int i = 0; i < remaining; ++i)
        {
            EventCallbackIF other = ((ScheduledEvent)remainingBin.get(i)).m_event;
            boolean sameCallback = (other == null) ? (popped.m_event == null)
                                                   : other.equals(popped.m_event);
            if(sameCallback)
            {
                return;
            }
        }
        synchronized(m_evtIfMapLock)
        {
            TreeSet times = (TreeSet)(m_evtIfToTimesMap.get(popped.m_event));
            if(times != null)
            {
                times.remove(new Long(popped.m_time));
                if(times.isEmpty())
                {
                    m_evtIfToTimesMap.remove(popped.m_event);
                }
            }
        }
    }

    /**
     * Asks the scheduler thread to exit.  The semaphore is released so that a scheduler
     * waiting for work wakes up and notices the request.
     */
    public void StopThread()
    {
        m_okToKeepRunning = false;
        m_sem.release();
    }

    /**
     * Pauses event processing; queued events stay in the queue.
     */
    public void StopSimulation()
    {
        m_okToContinueSimulating = false;
    }

    /**
     * Lets any events in the queue scheduled at a time <= whenToStop run, then stops.
     *
     * @param whenToStop the sim tic time after which the simulation is stopped and the
     *                   schedule cleared
     */
    public void StopSimulation(long whenToStop)
    {
        m_whenToStopSimulation = whenToStop;
        m_logger.debug("Setting time to stop simulation to tics: " + m_whenToStopSimulation + " now: "+GetSimulationTime());
    }

    /**
     * Resumes a paused simulation, or starts a new one if none is running.  Removes any
     * pending stop time and wakes the scheduler thread.
     */
    public void StartSimulation()
    {
        m_logger.debug("StartSimulation (resume a paused simulation, or start a new one if none running)",new Throwable());
        m_okToContinueSimulating = true;
        m_whenToStopSimulation = Long.MAX_VALUE;
        m_sem.release();
    }

    /**
     * Starts the simulation again.
     * NOTE(review): despite the log message, no simulation values (clock, queue) are
     * reset here; the body currently matches StartSimulation apart from the log text.
     */
    public void RestartSimulation()
    {
        // ??? initialize sim values
        m_logger.debug("RestartSimulation (re-set sim values and start at begining)",new Throwable());
        m_okToContinueSimulating = true;
        m_whenToStopSimulation = Long.MAX_VALUE;
        m_sem.release();
    }

    /**
     * Sets the percentage of max possible speed the scheduler allows events to be run.
     *
     * @param percentage fraction of full speed; values below 1.0 enable the injected delays
     */
    public void SetSimulationSpeedPercentage(double percentage)
    {
        m_speedPercent = percentage;
        m_currentTicsBetweenDelays = m_maxTicsBetweenDelays * m_speedPercent;
        m_logger.debug("sim percent: "+ percentage + " tics between "+m_delaySleepDurationMsec+" mSec delays: " + m_currentTicsBetweenDelays);
    }

    /**
     * Gets the current simulation tics.
     */
    public long GetSimulationTime()
    {
        synchronized (m_clockLock)
        {
            return m_simulationTime;
        }
    }

    /**
     * Sets the current simulation tics.
     *
     * @param time the new value of the simulation clock
     */
    public void SetSimulationTime(long time)
    {
        synchronized (m_clockLock)
        {
            m_simulationTime = time;
        }
    }

    /**
     * Increments the simulation clock.
     * To allow parallel processing, any increment needs to give the time it thinks it is
     * and the amount to increment that time.  This will allow multiple things to
     * increment the sim clock, and have the appearance of running in parallel.
     *
     * @param simTime the time of the sim clock to add the increment value to
     * @param inc the amount to increment the sim clock by
     * @return the simulation clock after the increment
     */
    public long IncrementSimulationTics(long simTime, int inc)
    {
        synchronized (m_clockLock)
        {
            // All arithmetic is done in long: the gap between the caller's time and the
            // clock can exceed the int range when the queue jumps far ahead.
            long clockAhead = m_simulationTime - simTime; // > 0: clock already past simTime
            long actualInc = inc - clockAhead;
            long idleTics = -clockAhead;                  // > 0: caller is ahead of the clock
            while(idleTics > 0)
            {
                // the statistics call is fed int-sized pieces
                int chunk = (int)Math.min(idleTics, (long)Integer.MAX_VALUE);
                Statistics.getInstance().IncIdleTics(chunk);
                idleTics -= chunk;
            }
            if(actualInc > 0)
            {
                // busy tics are always the requested increment (actualInc less the idle part);
                // when the clock was already ahead, the busy tics can become greater than
                // total tics, giving an indication of the parallel processing going on
                Statistics.getInstance().IncBusyTics(inc);
                m_simulationTime += actualInc;
            }

            // delay during the increment, that should serve to slow things down
            // NOTE(review): the sleep happens while m_clockLock is held, so other threads
            // reading or advancing the clock wait as well -- confirm this is intended.
            try
            {
                throttleIfNeeded();
            }
            catch(InterruptedException iex)
            {
                // keep the interrupt visible to the caller instead of swallowing it
                Thread.currentThread().interrupt();
            }
            return m_simulationTime;
        }
    }

    /**
     * Adds events in time ascending order, allows duplicates, latest duplicate goes
     * to end of list of same time.
     *
     * @param at the sim tic time the event is scheduled for
     * @param evt the callback to run at that time
     */
    protected void AddEvent(long at, EventCallbackIF evt)
    {
        Long timeLong = new Long(at);
        synchronized(m_scheduleLock)
        {
            // find correct bin of times, if it doesn't exist create a new bin
            Vector bin = (Vector)m_scheduleQueue.get(timeLong);
            if(bin == null)
            {
                bin = new Vector();
                m_scheduleQueue.put(timeLong, bin);
            }
            bin.addElement(new ScheduledEvent(at, evt));

            // record the time against the callback while still holding the schedule lock,
            // so the scheduler thread cannot dequeue the event before its time is recorded
            synchronized(m_evtIfMapLock)
            {
                TreeSet times = (TreeSet)(m_evtIfToTimesMap.get(evt));
                if(times == null)
                {
                    times = new TreeSet();
                    m_evtIfToTimesMap.put(evt, times);
                }
                // the tree set keeps its elements sorted
                times.add(timeLong);
            }
        }
        synchronized(m_msecAddedLastEventLock)
        {
            m_msecAddedLastEvent = System.currentTimeMillis();
        }
    }

    /**
     * Returns the next time that the given EventCallbackIF is scheduled for.
     *
     * @param evt the EventCallbackIF to check in the schedule queue for
     * @return -1 if no event scheduled, or the time that will be passed
     */
    public long getNextScheduledEvent(EventCallbackIF evt)
    {
        long when = -1;
        // search for the evt in the m_evtIfToTimesMap
        synchronized(m_evtIfMapLock)
        {
            TreeSet times = (TreeSet)(m_evtIfToTimesMap.get(evt));
            if((times != null) && !times.isEmpty())
            {
                when = ((Long)(times.first())).longValue();
            }
        }
        return when;
    }

    /**
     * Returns the wall-clock time (System.currentTimeMillis) at which the most recent
     * event was added, or 0 if none has been added yet.
     */
    public long getMsecAddedLastEvent()
    {
        synchronized(m_msecAddedLastEventLock)
        {
            return m_msecAddedLastEvent;
        }
    }

    /**
     * Sets the maximum number of threads in the event execution pool.
     *
     * @param numThreads the new maximum; the fractional part is dropped
     */
    public void setThreadPoolSize(double numThreads)
    {
        m_threadPool.setMaximumPoolSize((int)numThreads);
    }

    /**
     * Reports whether more than msec wall-clock milliseconds have passed since an event
     * was last added.
     *
     * @param msec the quiet period to test for, in milliseconds
     * @return true if no event has been added within the last msec milliseconds
     */
    public boolean QueueHasBeenEmpty(long msec)
    {
        synchronized(m_msecAddedLastEventLock)
        {
            return ((System.currentTimeMillis() - m_msecAddedLastEvent) > msec);
        }
    }

    /**
     * Logs a summary of the scheduler state, logs every queued event when the logger has
     * appenders, and prints one summary line to standard output.
     */
    public void DumpStatus()
    {
        int queuedBins;
        synchronized(m_scheduleLock)
        {
            queuedBins = m_scheduleQueue.size();
        }
        m_logger.debug("queue size (num bins): " + queuedBins + " at sim time: " + GetSimulationTime());
        m_logger.debug("Requested stop time: " + m_whenToStopSimulation);
        m_logger.debug("Thread pool size: "+m_threadPool.getPoolSize());
        boolean loggerHasAppenders = m_logger.getAllAppenders().hasMoreElements();
        int numEvents = 0;
        int numBins = 0;
        int maxBinSize = -1;
        synchronized(m_scheduleLock)
        {
            Iterator iter = m_scheduleQueue.values().iterator();
            while(iter.hasNext())
            {
                ++numBins;
                Vector vect = (Vector)iter.next();
                int vectSize = vect.size();
                for(int j = 0; j < vectSize; ++j)
                {
                    ++numEvents;
                    if(loggerHasAppenders)
                    {
                        m_logger.info("Event #:" + numEvents + " " + vect.get(j).toString());
                    }
                }
                if(vectSize > maxBinSize)
                {
                    maxBinSize = vectSize;
                }
            }
        }
        System.out.println("real time: "+System.currentTimeMillis()+
                " sim time: " + GetSimulationTime() +
                " num bins: " + numBins +
                " max bin size: " + maxBinSize +
                " num events: " + numEvents);
    }

    //==================================================================
    // inner classes
    //==================================================================

    /**
     * A callback paired with the sim tic time it is queued for.
     */
    protected class ScheduledEvent implements Comparable
    {
        public EventCallbackIF m_event;
        public long m_time;

        public ScheduledEvent(long time, EventCallbackIF evt)
        {
            m_event = evt;
            m_time = time;
        }

        /**
         * Compares by time.  The times are compared directly rather than subtracted, so
         * widely separated values cannot overflow.
         * NOTE(review): the ordering is reversed relative to the times (an event with a
         * later time compares as smaller); kept as-is -- confirm this is intended.
         *
         * @return -1 if o is scheduled earlier than this event, 1 if later, 0 if at the
         *         same time or if o is not a ScheduledEvent
         */
        public int compareTo(Object o)
        {
            if(!(o instanceof ScheduledEvent))
            {
                return 0;
            }
            long otherTime = ((ScheduledEvent)o).m_time;
            if(otherTime < m_time)
            {
                return -1;
            }
            if(otherTime > m_time)
            {
                return 1;
            }
            return 0;
        }

        /**
         * Describes the event time and the packet reported by the callback's
         * GetNextPacket(), or " none" when there is no packet (or no callback).
         */
        public String toString()
        {
            String str = "event time: " + m_time + " packet: ";
            PacketIF pckt = (m_event != null) ? m_event.GetNextPacket() : null;
            if(pckt != null)
            {
                str += pckt.toString();
            }
            else
            {
                str += " none";
            }
            return str;
        }
    }

    /**
     * Pool task that executes one event and re-queues it when execution fails.
     */
    protected class ExecuteEventRunnable implements Runnable
    {
        protected EventCallbackIF m_event = null;
        protected long m_timeNow = 0;

        public ExecuteEventRunnable(EventCallbackIF evt, long timeNow)
        {
            m_event = evt;
            m_timeNow = timeNow;
        }

        public void run()
        {
            if(m_event != null)
            {
                if(!m_event.ExecuteEvent(m_timeNow))
                {
                    // ExecuteEvent failed, put this event back on the queue of the
                    // scheduler that dispatched it
                    Scheduler.this.ScheduleEvent(m_event, m_timeNow, false);
                }
            }
        }
    }

} // end class definition