Go to the documentation of this file.
50 #include "debug/DistEthernet.hh"
51 #include "debug/DistEthernetPkt.hh"
70 inform(
"Next dist synchronisation tick is changed to %lu.\n",
nextAt);
74 panic(
"Dist synchronisation interval must be greater than zero");
78 inform(
"Dist synchronisation interval is changed to %lu.\n",
86 std::unique_lock<std::mutex> sync_lock(
lock);
103 nextAt = std::numeric_limits<Tick>::max();
104 nextRepeat = std::numeric_limits<Tick>::max();
117 nextAt = std::numeric_limits<Tick>::max();
118 nextRepeat = std::numeric_limits<Tick>::max();
125 std::unique_lock<std::mutex> sync_lock(
lock);
128 assert(waitNum == 0);
134 header.syncRepeat = nextRepeat;
135 header.needCkpt = needCkpt;
136 header.needStopSync = needStopSync;
139 header.needExit = needExit;
146 auto lf = [
this]{
return waitNum == 0; };
147 cv.wait(sync_lock, lf);
149 assert(isAbort || !same_tick || (nextAt ==
curTick()));
157 std::unique_lock<std::mutex> sync_lock(
lock);
161 auto lf = [
this]{
return waitNum == 0; };
162 cv.wait(sync_lock, lf);
164 assert(waitNum == 0);
167 assert(!same_tick || (nextAt ==
curTick()));
172 header.syncRepeat = nextRepeat;
173 if (doCkpt || numCkptReq == numNodes) {
180 if (doExit || numExitReq == numNodes) {
186 if (doStopSync || numStopSyncReq == numNodes) {
204 std::unique_lock<std::mutex> sync_lock(
lock);
209 if (send_tick > nextAt)
211 if (nextRepeat > sync_repeat)
212 nextRepeat = sync_repeat;
245 std::unique_lock<std::mutex> sync_lock(
lock);
250 nextAt = max_send_tick;
251 nextRepeat = next_repeat;
269 std::lock_guard<std::mutex> sync_lock(
lock);
272 warn(
"Ckpt requested multiple times (req:%d)\n",
static_cast<int>(req));
280 std::lock_guard<std::mutex> sync_lock(
lock);
283 warn(
"Exit requested multiple times (req:%d)\n",
static_cast<int>(req));
305 int need_exit =
static_cast<int>(needExit);
314 needExit =
static_cast<ReqType>(need_exit);
341 panic(
"DistIface::SyncEvent::start() aborted\n");
350 assert(!scheduled());
359 inform(
"Dist sync scheduled at %lu and repeats %lu\n", when(),
377 "Distributed sync is hit while draining");
416 warn_once(
"Tried to wake up thread in dist-gem5, but it "
417 "was already awake!\n");
435 recvDone = recv_done;
436 linkDelay = link_delay;
444 Tick recv_tick = send_tick + send_delay + linkDelay;
446 assert(recv_tick >= prev_recv_tick + send_delay);
447 panic_if(prev_recv_tick + send_delay > recv_tick,
448 "Receive window is smaller than send delay");
450 "Simulators out of sync - missed packet receive by %llu ticks"
451 "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu "
453 curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
472 while (!descQueue.empty()) {
473 Desc d = descQueue.front();
476 d.sendDelay =
d.packet->simLength;
483 if (recvDone->scheduled()) {
484 assert(!descQueue.empty());
485 eventManager->reschedule(recvDone,
curTick());
487 assert(descQueue.empty() &&
v.empty());
499 Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
501 DPRINTF(DistEthernetPkt,
"DistIface::recvScheduler::pushPacket "
502 "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
503 send_tick, send_delay, linkDelay, recv_tick);
517 descQueue.emplace(new_packet, send_tick, send_delay);
518 if (descQueue.size() == 1) {
519 assert(!recvDone->scheduled());
520 eventManager->schedule(recvDone, recv_tick);
522 assert(recvDone->scheduled());
523 panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
524 "Out of order packet received (recv_tick: %lu top(): %lu\n",
525 recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
539 if (descQueue.size() > 0) {
540 Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
541 descQueue.front().sendDelay,
543 eventManager->schedule(recvDone, recv_tick);
554 packet->serialize(
"rxPacket",
cp);
562 packet = std::make_shared<EthPacketData>();
563 packet->unserialize(
"rxPacket",
cp);
571 std::queue<Desc> tmp_queue(descQueue);
572 unsigned n_desc_queue = descQueue.size();
573 assert(tmp_queue.size() == descQueue.size());
575 for (
int i = 0;
i < n_desc_queue;
i++) {
576 tmp_queue.front().serializeSection(
cp,
csprintf(
"rxDesc_%d",
i));
579 assert(tmp_queue.empty());
585 assert(descQueue.size() == 0);
586 assert(!recvDone->scheduled());
587 assert(!ckptRestore);
591 unsigned n_desc_queue;
593 for (
int i = 0;
i < n_desc_queue;
i++) {
596 descQueue.push(recv_desc);
607 bool is_switch,
int num_nodes) :
612 DPRINTF(DistEthernet,
"DistIface() ctor rank:%d\n",dist_rank);
615 assert(
sync ==
nullptr);
654 header.sendDelay = send_delay;
656 header.dataPacketLength = pkt->length;
657 header.simLength = pkt->simLength;
663 "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
664 pkt->length, send_delay);
720 const_cast<Event *
>(recv_done),
728 DPRINTF(DistEthernet,
"DistIFace::drain() called\n");
737 DPRINTF(DistEthernet,
"DistIFace::drainResume() called\n");
765 unsigned rank_orig, dist_iface_id_orig;
769 panic_if(
rank != rank_orig,
"Rank mismatch at resume (rank=%d, orig=%d)",
772 "at resume (distIfaceId=%d, orig=%d)",
distIfaceId,
798 assert(
sync !=
nullptr);
811 DPRINTF(DistEthernet,
"DistIface::startup() started\n");
815 DPRINTF(DistEthernet,
"DistIface::startup() done\n");
822 DPRINTF(DistEthernet,
"DistIface::readyToCkpt() called, delay:%lu "
823 "period:%lu\n", delay, period);
826 inform(
"m5 checkpoint called with zero delay => triggering collaborative "
830 inform(
"m5 checkpoint called with non-zero delay => triggering immediate "
831 "checkpoint (at the next sync)\n");
835 inform(
"Non-zero period for m5_ckpt is ignored in "
836 "distributed gem5 runs\n");
845 std::lock_guard<std::mutex> sync_lock(
lock);
860 inform(
"Request toggling syncronization off\n");
867 #if THE_ISA != NULL_ISA
874 inform(
"Request toggling syncronization on\n");
881 #if THE_ISA != NULL_ISA
894 DPRINTF(DistEthernet,
"DistIface::readyToExit() called, delay:%lu\n",
902 inform(
"m5 exit called with zero delay => triggering collaborative "
906 inform(
"m5 exit called with non-zero delay => triggering immediate "
907 "exit (at the next sync)\n");
922 warn(
"Dist-rank parameter is queried in single gem5 simulation.");
935 warn(
"Dist-size parameter is queried in single gem5 simulation.");
void unserialize(CheckpointIn &cp) override
Unserialize an object.
static unsigned distIfaceNum
Number of DistIface objects (i.e.
virtual void requestExit(ReqType req)=0
void unserializeSection(CheckpointIn &cp, const char *name)
Unserialize a child object.
static unsigned recvThreadsNum
Number of receiver threads (in this gem5 process)
DistIface(unsigned dist_rank, unsigned dist_size, Tick sync_start, Tick sync_repeat, EventManager *em, bool use_pseudo_op, bool is_switch, int num_nodes)
ctor
The interface class to talk to peer gem5 processes.
#define UNSERIALIZE_SCALAR(scalar)
void quiesce()
Quiesce thread context.
void abort()
Abort processing an on-going sync event (in case of an error, e.g.
bool run(bool same_tick) override
Core method to perform a full dist sync.
void process() override
This is a global event so process() will only be called by exactly one simulation thread.
void recvThreadFunc(Event *recv_done, Tick link_delay)
The function executed by a receiver thread.
static DistIface * primary
The very first DistIface object created becomes the primary interface.
void serialize(CheckpointOut &cp) const override
Serialize an object.
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
DrainState drain() override
Draining is the process of clearing out the states of SimObjects. These are the SimObjects that are pa...
void unserialize(CheckpointIn &cp) override
Unserialize an object.
uint64_t Tick
Tick count type.
void resumeRecvTicks()
Adjust receive ticks for pending packets when restoring from a checkpoint.
static void toggleSync(ThreadContext *tc)
Trigger the primary to start/stop synchronization.
void serialize(CheckpointOut &cp) const override
Serialize an object.
static System * sys
System pointer used to wakeup sleeping threads when stopping sync.
EthPacketPtr popPacket()
Fetch the next packet that is to be received by the simulated network link.
void serialize(CheckpointOut &cp) const override
Serialize an object.
void serializeSection(CheckpointOut &cp, const char *name) const
Serialize an object into a new section.
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
bool run(bool same_tick) override
Core method to perform a full dist sync.
Tick nextRepeat
The repeat value for the next periodic sync.
@ Drained
Buffers drained, ready for serialization/handover.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
DrainState
Object drain/handover states.
void init(const Event *e, Tick link_delay)
unsigned size
The number of gem5 processes comprising this dist simulation.
ReqType needStopSync
Sync stop requested.
The global event to schedule periodic dist sync.
virtual void initTransport()=0
Init hook for the underlying transport.
Received packet descriptor.
void requestStopSync(ReqType req) override
static uint64_t rankParam()
Getter for the dist rank param.
ThreadContext is the external interface to all thread state for anything outside of the CPU.
Tick nextAt
Tick for the next periodic sync (if the event is not scheduled yet)
void unserialize(CheckpointIn &cp) override
Unserialize an object.
virtual bool progress(Tick send_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync)=0
Callback when the receiver thread gets a sync ack message.
static bool isSwitch
Is this node a switch?
void requestCkpt(ReqType req) override
unsigned rank
The rank of this process among the gem5 peers.
Tick calcReceiveTick(Tick send_tick, Tick send_delay, Tick prev_recv_tick)
Calculate the tick to schedule the next receive done event.
void drainResume() override
Resume execution after a successful drain.
virtual void sendCmd(const Header &header)=0
Send out a control command to the remote end.
void exitSimLoop(const std::string &message, int exit_code, Tick when, Tick repeat, bool serialize)
Schedule an event to exit the simulation loop (returning to Python) at the end of the current cycle (...
EventQueue * curEventQueue()
static SyncEvent * syncEvent
The singleton SyncEvent object to schedule periodic dist sync.
void init(Event *recv_done, Tick link_delay)
Initialize network link parameters.
std::thread * recvThread
Receiver thread pointer.
virtual void sendPacket(const Header &header, const EthPacketPtr &packet)=0
Send out a data packet to the remote end.
virtual Status status() const =0
void start()
Schedule the first periodic sync event.
virtual void requestStopSync(ReqType req)=0
void quiesceTick(Tick resume)
Quiesce, suspend, and schedule activate at resume.
#define SERIALIZE_SCALAR(scalar)
static bool readyToExit(Tick delay)
Initiate the exit from the simulation.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
static Sync * sync
The singleton Sync object to perform dist synchronisation.
void lock()
Provide an interface for locking/unlocking the event queue.
bool syncStartOnPseudoOp
Use pseudoOp to start synchronization.
std::shared_ptr< EthPacketData > EthPacketPtr
void requestExit(ReqType req) override
#define panic_if(cond,...)
Conditional panic macro that checks the supplied condition and only panics if the condition is true a...
@ Suspended
Temporarily inactive.
bool doStopSync
Flag is set if sync is to stop upon sync completion.
void init(Tick start, Tick repeat)
Initialize periodic sync params.
void pushPacket(EthPacketPtr new_packet, Tick send_tick, Tick send_delay)
Push a newly arrived packet into the desc queue.
static bool readyToCkpt(Tick delay, Tick period)
Initiate taking a checkpoint.
void spawnRecvThread(const Event *recv_done, Tick link_delay)
Spawn the receiver thread.
unsigned distIfaceId
Unique id for the dist link.
std::ostream CheckpointOut
virtual void recvPacket(const Header &header, EthPacketPtr &packet)=0
Receive a packet from the remote end.
virtual void requestCkpt(ReqType req)=0
Tick curTick()
The universal simulation clock.
void serialize(CheckpointOut &cp) const override
Serialize an object.
virtual bool recvHeader(Header &header)=0
Receive a header (i.e.
std::mutex lock
The lock to protect access to the Sync object.
void packetOut(EthPacketPtr pkt, Tick send_delay)
Send out an Ethernet packet.
RecvScheduler recvScheduler
Meta information about data packets received.
SyncSwitch(int num_nodes)
static uint64_t sizeParam()
Getter for the dist size param.
std::string csprintf(const char *format, const Args &...args)
virtual System * getSystemPtr()=0
Tick syncStart
Tick to schedule the first dist sync event.
#define panic(...)
This implements a cprintf based panic() function.
Tick syncRepeat
Frequency of dist sync events in ticks.
void serialize(CheckpointOut &cp) const override
Serialize an object.
Generated on Tue Mar 23 2021 19:41:26 for gem5 by doxygen 1.8.17