50 #include "debug/DistEthernet.hh" 
   51 #include "debug/DistEthernetPkt.hh" 
   74         inform(
"Next dist synchronisation tick is changed to %lu.\n", 
nextAt);
 
   78         panic(
"Dist synchronisation interval must be greater than zero");
 
   82         inform(
"Dist synchronisation interval is changed to %lu.\n",
 
   90     std::unique_lock<std::mutex> sync_lock(
lock);
 
  107     nextAt = std::numeric_limits<Tick>::max();
 
  108     nextRepeat = std::numeric_limits<Tick>::max();
 
  121     nextAt = std::numeric_limits<Tick>::max();
 
  122     nextRepeat = std::numeric_limits<Tick>::max();
 
  129     std::unique_lock<std::mutex> sync_lock(
lock);
 
  132     assert(waitNum == 0);
 
  138     header.syncRepeat = nextRepeat;
 
  139     header.needCkpt = needCkpt;
 
  140     header.needStopSync = needStopSync;
 
  143     header.needExit = needExit;
 
  150     auto lf = [
this]{ 
return waitNum == 0; };
 
  151     cv.wait(sync_lock, lf);
 
  153     assert(isAbort || !same_tick || (nextAt == 
curTick()));
 
  161     std::unique_lock<std::mutex> sync_lock(
lock);
 
  165         auto lf = [
this]{ 
return waitNum == 0; };
 
  166         cv.wait(sync_lock, lf);
 
  168     assert(waitNum == 0);
 
  171     assert(!same_tick || (nextAt == 
curTick()));
 
  176     header.syncRepeat = nextRepeat;
 
  177     if (doCkpt || numCkptReq == numNodes) {
 
  184     if (doExit || numExitReq == numNodes) {
 
  190     if (doStopSync || numStopSyncReq == numNodes) {
 
  208     std::unique_lock<std::mutex> sync_lock(
lock);
 
  213     if (send_tick > nextAt)
 
  215     if (nextRepeat > sync_repeat)
 
  216         nextRepeat = sync_repeat;
 
  249     std::unique_lock<std::mutex> sync_lock(
lock);
 
  254     nextAt = max_send_tick;
 
  255     nextRepeat = next_repeat;
 
  273    std::lock_guard<std::mutex> sync_lock(
lock);
 
  276        warn(
"Ckpt requested multiple times (req:%d)\n", 
static_cast<int>(req));
 
  284    std::lock_guard<std::mutex> sync_lock(
lock);
 
  287        warn(
"Exit requested multiple times (req:%d)\n", 
static_cast<int>(req));
 
  309     int need_exit = 
static_cast<int>(needExit);
 
  318     needExit = 
static_cast<ReqType>(need_exit);
 
  345         panic(
"DistIface::SyncEvent::start() aborted\n");
 
  354         assert(!scheduled());
 
  363     inform(
"Dist sync scheduled at %lu and repeats %lu\n",  when(),
 
  381              "Distributed sync is hit while draining");
 
  420                     warn_once(
"Tried to wake up thread in dist-gem5, but it " 
  421                               "was already awake!\n");
 
  439     recvDone = recv_done;
 
  440     linkDelay = link_delay;
 
  448     Tick recv_tick = send_tick + send_delay + linkDelay;
 
  450     assert(recv_tick >= prev_recv_tick + send_delay);
 
  451     panic_if(prev_recv_tick + send_delay > recv_tick,
 
  452              "Receive window is smaller than send delay");
 
  454              "Simulators out of sync - missed packet receive by %llu ticks" 
  455              "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu " 
  457              curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
 
  476     while (!descQueue.empty()) {
 
  477         Desc d = descQueue.front();
 
  480         d.sendDelay = 
d.packet->simLength; 
 
  487     if (recvDone->scheduled()) {
 
  488         assert(!descQueue.empty());
 
  489         eventManager->reschedule(recvDone, 
curTick());
 
  491         assert(descQueue.empty() && 
v.empty());
 
  503     Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
 
  505     DPRINTF(DistEthernetPkt, 
"DistIface::recvScheduler::pushPacket " 
  506             "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
 
  507             send_tick, send_delay, linkDelay, recv_tick);
 
  521     descQueue.emplace(new_packet, send_tick, send_delay);
 
  522     if (descQueue.size() == 1) {
 
  523         assert(!recvDone->scheduled());
 
  524         eventManager->schedule(recvDone, recv_tick);
 
  526         assert(recvDone->scheduled());
 
  527         panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
 
  528                  "Out of order packet received (recv_tick: %lu top(): %lu\n",
 
  529                  recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
 
  543     if (descQueue.size() > 0) {
 
  544         Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
 
  545                                          descQueue.front().sendDelay,
 
  547         eventManager->schedule(recvDone, recv_tick);
 
  558         packet->serialize(
"rxPacket", cp);
 
  566         packet = std::make_shared<EthPacketData>();
 
  567         packet->unserialize(
"rxPacket", cp);
 
  575     std::queue<Desc> tmp_queue(descQueue);
 
  576     unsigned n_desc_queue = descQueue.size();
 
  577     assert(tmp_queue.size() == descQueue.size());
 
  579     for (
int i = 0; 
i < n_desc_queue; 
i++) {
 
  580         tmp_queue.front().serializeSection(cp, 
csprintf(
"rxDesc_%d", 
i));
 
  583     assert(tmp_queue.empty());
 
  589     assert(descQueue.size() == 0);
 
  590     assert(!recvDone->scheduled());
 
  591     assert(!ckptRestore);
 
  595     unsigned n_desc_queue;
 
  597     for (
int i = 0; 
i < n_desc_queue; 
i++) {
 
  600         descQueue.push(recv_desc);
 
  611                      bool is_switch, 
int num_nodes) :
 
  616     DPRINTF(DistEthernet, 
"DistIface() ctor rank:%d\n",dist_rank);
 
  619         assert(
sync == 
nullptr);
 
  658     header.sendDelay = send_delay;
 
  660     header.dataPacketLength = pkt->length;
 
  661     header.simLength = pkt->simLength;
 
  667             "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
 
  668             pkt->length, send_delay);
 
  724                                  const_cast<Event *
>(recv_done),
 
  732     DPRINTF(DistEthernet,
"DistIFace::drain() called\n");
 
  741     DPRINTF(DistEthernet,
"DistIFace::drainResume() called\n");
 
  769     unsigned rank_orig, dist_iface_id_orig;
 
  773     panic_if(
rank != rank_orig, 
"Rank mismatch at resume (rank=%d, orig=%d)",
 
  776              "at resume (distIfaceId=%d, orig=%d)", 
distIfaceId,
 
  802     assert(
sync != 
nullptr);
 
  815     DPRINTF(DistEthernet, 
"DistIface::startup() started\n");
 
  819     DPRINTF(DistEthernet, 
"DistIface::startup() done\n");
 
  826     DPRINTF(DistEthernet, 
"DistIface::readyToCkpt() called, delay:%lu " 
  827             "period:%lu\n", delay, period);
 
  830             inform(
"m5 checkpoint called with zero delay => triggering collaborative " 
  834             inform(
"m5 checkpoint called with non-zero delay => triggering immediate " 
  835                    "checkpoint (at the next sync)\n");
 
  839             inform(
"Non-zero period for m5_ckpt is ignored in " 
  840                    "distributed gem5 runs\n");
 
  849    std::lock_guard<std::mutex> sync_lock(
lock);
 
  864         inform(
"Request toggling syncronization off\n");
 
  876         inform(
"Request toggling syncronization on\n");
 
  894     DPRINTF(DistEthernet, 
"DistIface::readyToExit() called, delay:%lu\n",
 
  902             inform(
"m5 exit called with zero delay => triggering collaborative " 
  906             inform(
"m5 exit called with non-zero delay => triggering immediate " 
  907                    "exit (at the next sync)\n");
 
  922         warn(
"Dist-rank parameter is queried in single gem5 simulation.");
 
  935         warn(
"Dist-size parameter is queried in single gem5 simulation.");
 
void unserialize(CheckpointIn &cp) override
Unserialize an object.
void serialize(CheckpointOut &cp) const override
Serialize an object.
void init(Event *recv_done, Tick link_delay)
Initialize network link parameters.
void resumeRecvTicks()
Adjust receive ticks for pending packets when restoring from a checkpoint.
EthPacketPtr popPacket()
Fetch the next packet that is to be received by the simulated network link.
void pushPacket(EthPacketPtr new_packet, Tick send_tick, Tick send_delay)
Push a newly arrived packet into the desc queue.
Tick calcReceiveTick(Tick send_tick, Tick send_delay, Tick prev_recv_tick)
Calculate the tick to schedule the next receive done event.
The global event to schedule periodic dist sync.
void start()
Schedule the first periodic sync event.
void process() override
This is a global event so process() will only be called by exactly one simulation thread.
void requestExit(ReqType req) override
void serialize(CheckpointOut &cp) const override
Serialize an object.
ReqType needStopSync
Sync stop requested.
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
void requestCkpt(ReqType req) override
bool run(bool same_tick) override
Core method to perform a full dist sync.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
void requestStopSync(ReqType req) override
bool run(bool same_tick) override
Core method to perform a full dist sync.
SyncSwitch(int num_nodes)
void serialize(CheckpointOut &cp) const override
Serialize an object.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
Tick nextRepeat
The repeat value for the next periodic sync.
bool doStopSync
Flag is set if sync is to stop upon sync completion.
void init(Tick start, Tick repeat)
Initialize periodic sync params.
Tick nextAt
Tick for the next periodic sync (if the event is not scheduled yet)
virtual void requestStopSync(ReqType req)=0
virtual void requestExit(ReqType req)=0
virtual void requestCkpt(ReqType req)=0
std::mutex lock
The lock to protect access to the Sync object.
virtual bool progress(Tick send_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync)=0
Callback when the receiver thread gets a sync ack message.
void abort()
Abort processing an on-going sync event (in case of an error, e.g.
static uint64_t rankParam()
Getter for the dist rank param.
virtual void initTransport()=0
Init hook for the underlaying transport.
RecvScheduler recvScheduler
Meta information about data packets received.
static bool readyToExit(Tick delay)
Initiate the exit from the simulation.
bool syncStartOnPseudoOp
Use pseudoOp to start synchronization.
static void toggleSync(ThreadContext *tc)
Trigger the primary to start/stop synchronization.
static bool readyToCkpt(Tick delay, Tick period)
Initiate taking a checkpoint.
void init(const Event *e, Tick link_delay)
unsigned rank
The rank of this process among the gem5 peers.
unsigned distIfaceId
Unique id for the dist link.
static DistIface * primary
The very first DistIface object created becomes the primary interface.
static uint64_t sizeParam()
Getter for the dist size param.
DrainState drain() override
Draining is the process of clearing out the states of SimObjects.These are the SimObjects that are pa...
std::thread * recvThread
Receiver thread pointer.
static SyncEvent * syncEvent
The singleton SyncEvent object to schedule periodic dist sync.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
static System * sys
System pointer used to wakeup sleeping threads when stopping sync.
void recvThreadFunc(Event *recv_done, Tick link_delay)
The function executed by a receiver thread.
virtual void sendCmd(const Header &header)=0
Send out a control command to the remote end.
virtual bool recvHeader(Header &header)=0
Receive a header (i.e.
static Sync * sync
The singleton Sync object to perform dist synchronisation.
virtual void sendPacket(const Header &header, const EthPacketPtr &packet)=0
Send out a data packet to the remote end.
static unsigned distIfaceNum
Number of DistIface objects (i.e.
void drainResume() override
Resume execution after a successful drain.
void spawnRecvThread(const Event *recv_done, Tick link_delay)
spawn the receiver thread.
static bool isSwitch
Is this node a switch?
void serialize(CheckpointOut &cp) const override
Serialize an object.
virtual void recvPacket(const Header &header, EthPacketPtr &packet)=0
Receive a packet from the remote end.
Tick syncRepeat
Frequency of dist sync events in ticks.
Tick syncStart
Tick to schedule the first dist sync event.
static unsigned recvThreadsNum
Number of receiver threads (in this gem5 process)
DistIface(unsigned dist_rank, unsigned dist_size, Tick sync_start, Tick sync_repeat, EventManager *em, bool use_pseudo_op, bool is_switch, int num_nodes)
ctor
void packetOut(EthPacketPtr pkt, Tick send_delay)
Send out an Ethernet packet.
unsigned size
The number of gem5 processes comprising this dist simulation.
void lock()
Provide an interface for locking/unlocking the event queue.
ThreadContext is the external interface to all thread state for anything outside of the CPU.
virtual System * getSystemPtr()=0
void quiesceTick(Tick resume)
Quiesce, suspend, and schedule activate at resume.
void quiesce()
Quiesce thread context.
@ Suspended
Temporarily inactive.
virtual Status status() const =0
DrainState
Object drain/handover states.
@ Drained
Buffers drained, ready for serialization/handover.
#define panic(...)
This implements a cprintf based panic() function.
#define panic_if(cond,...)
Conditional panic macro that checks the supplied condition and only panics if the condition is true a...
void serializeSection(CheckpointOut &cp, const char *name) const
Serialize an object into a new section.
void unserializeSection(CheckpointIn &cp, const char *name)
Unserialize an a child object.
Reference material can be found at the JEDEC website: UFS standard http://www.jedec....
Tick curTick()
The universal simulation clock.
std::ostream CheckpointOut
uint64_t Tick
Tick count type.
void exitSimLoop(const std::string &message, int exit_code, Tick when, Tick repeat, bool serialize)
Schedule an event to exit the simulation loop (returning to Python) at the end of the current cycle (...
EventQueue * curEventQueue()
std::string csprintf(const char *format, const Args &...args)
std::shared_ptr< EthPacketData > EthPacketPtr
#define UNSERIALIZE_SCALAR(scalar)
#define SERIALIZE_SCALAR(scalar)
Received packet descriptor.
void serialize(CheckpointOut &cp) const override
Serialize an object.
void unserialize(CheckpointIn &cp) override
Unserialize an object.