Go to the documentation of this file.
50 #include "debug/DistEthernet.hh"
51 #include "debug/DistEthernetPkt.hh"
69 if (start_tick < nextAt) {
71 inform(
"Next dist synchronisation tick is changed to %lu.\n", nextAt);
75 panic(
"Dist synchronisation interval must be greater than zero");
77 if (repeat_tick < nextRepeat) {
78 nextRepeat = repeat_tick;
79 inform(
"Dist synchronisation interval is changed to %lu.\n",
87 std::unique_lock<std::mutex> sync_lock(
lock);
104 nextAt = std::numeric_limits<Tick>::max();
105 nextRepeat = std::numeric_limits<Tick>::max();
118 nextAt = std::numeric_limits<Tick>::max();
119 nextRepeat = std::numeric_limits<Tick>::max();
126 std::unique_lock<std::mutex> sync_lock(
lock);
129 assert(waitNum == 0);
133 header.msgType = MsgType::cmdSyncReq;
135 header.syncRepeat = nextRepeat;
136 header.needCkpt = needCkpt;
137 header.needStopSync = needStopSync;
139 needCkpt = ReqType::pending;
140 header.needExit = needExit;
142 needExit = ReqType::pending;
144 needStopSync = ReqType::pending;
147 auto lf = [
this]{
return waitNum == 0; };
148 cv.wait(sync_lock, lf);
150 assert(isAbort || !same_tick || (nextAt ==
curTick()));
158 std::unique_lock<std::mutex> sync_lock(
lock);
162 auto lf = [
this]{
return waitNum == 0; };
163 cv.wait(sync_lock, lf);
165 assert(waitNum == 0);
168 assert(!same_tick || (nextAt ==
curTick()));
171 header.msgType = MsgType::cmdSyncAck;
173 header.syncRepeat = nextRepeat;
174 if (doCkpt || numCkptReq == numNodes) {
176 header.needCkpt = ReqType::immediate;
181 if (doExit || numExitReq == numNodes) {
183 header.needExit = ReqType::immediate;
187 if (doStopSync || numStopSyncReq == numNodes) {
190 header.needStopSync = ReqType::immediate;
205 std::unique_lock<std::mutex> sync_lock(
lock);
210 if (send_tick > nextAt)
212 if (nextRepeat > sync_repeat)
213 nextRepeat = sync_repeat;
215 if (need_ckpt == ReqType::collective)
217 else if (need_ckpt == ReqType::immediate)
219 if (need_exit == ReqType::collective)
221 else if (need_exit == ReqType::immediate)
223 if (need_stop_sync == ReqType::collective)
225 else if (need_stop_sync == ReqType::immediate)
246 std::unique_lock<std::mutex> sync_lock(
lock);
251 nextAt = max_send_tick;
252 nextRepeat = next_repeat;
270 std::lock_guard<std::mutex> sync_lock(
lock);
273 warn(
"Ckpt requested multiple times (req:%d)\n",
static_cast<int>(req));
281 std::lock_guard<std::mutex> sync_lock(
lock);
284 warn(
"Exit requested multiple times (req:%d)\n",
static_cast<int>(req));
306 int need_exit =
static_cast<int>(needExit);
315 needExit =
static_cast<ReqType>(need_exit);
342 panic(
"DistIface::SyncEvent::start() aborted\n");
351 assert(!scheduled());
360 inform(
"Dist sync scheduled at %lu and repeats %lu\n", when(),
378 "Distributed sync is hit while draining");
413 for (
auto *tc: primary->sys->threads) {
417 warn_once(
"Tried to wake up thread in dist-gem5, but it "
418 "was already awake!\n");
436 recvDone = recv_done;
437 linkDelay = link_delay;
445 Tick recv_tick = send_tick + send_delay + linkDelay;
447 assert(recv_tick >= prev_recv_tick + send_delay);
448 panic_if(prev_recv_tick + send_delay > recv_tick,
449 "Receive window is smaller than send delay");
451 "Simulators out of sync - missed packet receive by %llu ticks"
452 "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu "
454 curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
473 while (!descQueue.empty()) {
474 Desc d = descQueue.front();
477 d.sendDelay =
d.packet->simLength;
484 if (recvDone->scheduled()) {
485 assert(!descQueue.empty());
486 eventManager->reschedule(recvDone,
curTick());
488 assert(descQueue.empty() &&
v.empty());
500 Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
502 DPRINTF(DistEthernetPkt,
"DistIface::recvScheduler::pushPacket "
503 "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
504 send_tick, send_delay, linkDelay, recv_tick);
506 assert(send_tick > primary->syncEvent->when() -
507 primary->syncEvent->repeat);
509 assert(send_tick + send_delay + linkDelay > primary->syncEvent->when());
518 descQueue.emplace(new_packet, send_tick, send_delay);
519 if (descQueue.size() == 1) {
520 assert(!recvDone->scheduled());
521 eventManager->schedule(recvDone, recv_tick);
523 assert(recvDone->scheduled());
524 panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
525 "Out of order packet received (recv_tick: %lu top(): %lu\n",
526 recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
540 if (descQueue.size() > 0) {
541 Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
542 descQueue.front().sendDelay,
544 eventManager->schedule(recvDone, recv_tick);
555 packet->serialize(
"rxPacket",
cp);
563 packet = std::make_shared<EthPacketData>();
564 packet->unserialize(
"rxPacket",
cp);
572 std::queue<Desc> tmp_queue(descQueue);
573 unsigned n_desc_queue = descQueue.size();
574 assert(tmp_queue.size() == descQueue.size());
576 for (
int i = 0;
i < n_desc_queue;
i++) {
577 tmp_queue.front().serializeSection(
cp,
csprintf(
"rxDesc_%d",
i));
580 assert(tmp_queue.empty());
586 assert(descQueue.size() == 0);
587 assert(!recvDone->scheduled());
588 assert(!ckptRestore);
592 unsigned n_desc_queue;
594 for (
int i = 0;
i < n_desc_queue;
i++) {
597 descQueue.push(recv_desc);
608 bool is_switch,
int num_nodes) :
609 syncStart(sync_start), syncRepeat(sync_repeat),
610 recvThread(nullptr), recvScheduler(
em), syncStartOnPseudoOp(use_pseudo_op),
611 rank(dist_rank), size(dist_size)
613 DPRINTF(DistEthernet,
"DistIface() ctor rank:%d\n",dist_rank);
616 assert(
sync ==
nullptr);
655 header.sendDelay = send_delay;
657 header.dataPacketLength = pkt->length;
658 header.simLength = pkt->simLength;
664 "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
665 pkt->length, send_delay);
721 const_cast<Event *
>(recv_done),
729 DPRINTF(DistEthernet,
"DistIFace::drain() called\n");
738 DPRINTF(DistEthernet,
"DistIFace::drainResume() called\n");
766 unsigned rank_orig, dist_iface_id_orig;
770 panic_if(
rank != rank_orig,
"Rank mismatch at resume (rank=%d, orig=%d)",
773 "at resume (distIfaceId=%d, orig=%d)",
distIfaceId,
799 assert(
sync !=
nullptr);
812 DPRINTF(DistEthernet,
"DistIface::startup() started\n");
816 DPRINTF(DistEthernet,
"DistIface::startup() done\n");
823 DPRINTF(DistEthernet,
"DistIface::readyToCkpt() called, delay:%lu "
824 "period:%lu\n", delay, period);
827 inform(
"m5 checkpoint called with zero delay => triggering collaborative "
831 inform(
"m5 checkpoint called with non-zero delay => triggering immediate "
832 "checkpoint (at the next sync)\n");
836 inform(
"Non-zero period for m5_ckpt is ignored in "
837 "distributed gem5 runs\n");
846 std::lock_guard<std::mutex> sync_lock(
lock);
861 inform(
"Request toggling syncronization off\n");
868 #if THE_ISA != NULL_ISA
875 inform(
"Request toggling syncronization on\n");
882 #if THE_ISA != NULL_ISA
895 DPRINTF(DistEthernet,
"DistIface::readyToExit() called, delay:%lu\n",
903 inform(
"m5 exit called with zero delay => triggering collaborative "
907 inform(
"m5 exit called with non-zero delay => triggering immediate "
908 "exit (at the next sync)\n");
923 warn(
"Dist-rank parameter is queried in single gem5 simulation.");
936 warn(
"Dist-size parameter is queried in single gem5 simulation.");
void unserialize(CheckpointIn &cp) override
Unserialize an object.
static unsigned distIfaceNum
Number of DistIface objects (i.e.
virtual void requestExit(ReqType req)=0
void unserializeSection(CheckpointIn &cp, const char *name)
Unserialize a child object.
static unsigned recvThreadsNum
Number of receiver threads (in this gem5 process)
DistIface(unsigned dist_rank, unsigned dist_size, Tick sync_start, Tick sync_repeat, EventManager *em, bool use_pseudo_op, bool is_switch, int num_nodes)
ctor
The interface class to talk to peer gem5 processes.
#define UNSERIALIZE_SCALAR(scalar)
void quiesce()
Quiesce thread context.
void abort()
Abort processing an on-going sync event (in case of an error, e.g.
bool run(bool same_tick) override
Core method to perform a full dist sync.
void process() override
This is a global event so process() will only be called by exactly one simulation thread.
void recvThreadFunc(Event *recv_done, Tick link_delay)
The function executed by a receiver thread.
static DistIface * primary
The very first DistIface object created becomes the primary interface.
void serialize(CheckpointOut &cp) const override
Serialize an object.
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
DrainState drain() override
Draining is the process of clearing out the states of SimObjects. These are the SimObjects that are pa...
void unserialize(CheckpointIn &cp) override
Unserialize an object.
uint64_t Tick
Tick count type.
void resumeRecvTicks()
Adjust receive ticks for pending packets when restoring from a checkpoint.
static void toggleSync(ThreadContext *tc)
Trigger the primary to start/stop synchronization.
void serialize(CheckpointOut &cp) const override
Serialize an object.
static System * sys
System pointer used to wakeup sleeping threads when stopping sync.
EthPacketPtr popPacket()
Fetch the next packet that is to be received by the simulated network link.
void serialize(CheckpointOut &cp) const override
Serialize an object.
void serializeSection(CheckpointOut &cp, const char *name) const
Serialize an object into a new section.
const FlagsType none
Nothing extra to print.
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
bool run(bool same_tick) override
Core method to perform a full dist sync.
Tick nextRepeat
The repeat value for the next periodic sync.
@ Drained
Buffers drained, ready for serialization/handover.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
DrainState
Object drain/handover states.
void init(const Event *e, Tick link_delay)
unsigned size
The number of gem5 processes comprising this dist simulation.
ReqType needStopSync
Sync stop requested.
The global event to schedule periodic dist sync.
virtual void initTransport()=0
Init hook for the underlying transport.
Received packet descriptor.
void requestStopSync(ReqType req) override
static uint64_t rankParam()
Getter for the dist rank param.
ThreadContext is the external interface to all thread state for anything outside of the CPU.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
virtual bool progress(Tick send_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync)=0
Callback when the receiver thread gets a sync ack message.
static bool isSwitch
Is this node a switch?
void requestCkpt(ReqType req) override
unsigned rank
The rank of this process among the gem5 peers.
Tick calcReceiveTick(Tick send_tick, Tick send_delay, Tick prev_recv_tick)
Calculate the tick to schedule the next receive done event.
void drainResume() override
Resume execution after a successful drain.
virtual void sendCmd(const Header &header)=0
Send out a control command to the remote end.
void exitSimLoop(const std::string &message, int exit_code, Tick when, Tick repeat, bool serialize)
Schedule an event to exit the simulation loop (returning to Python) at the end of the current cycle (...
EventQueue * curEventQueue()
static SyncEvent * syncEvent
The singleton SyncEvent object to schedule periodic dist sync.
void init(Event *recv_done, Tick link_delay)
Initialize network link parameters.
std::thread * recvThread
Receiver thread pointer.
virtual void sendPacket(const Header &header, const EthPacketPtr &packet)=0
Send out a data packet to the remote end.
virtual Status status() const =0
void start()
Schedule the first periodic sync event.
virtual void requestStopSync(ReqType req)=0
void quiesceTick(Tick resume)
Quiesce, suspend, and schedule activate at resume.
#define SERIALIZE_SCALAR(scalar)
static bool readyToExit(Tick delay)
Initiate the exit from the simulation.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
static Sync * sync
The singleton Sync object to perform dist synchronisation.
void lock()
Provide an interface for locking/unlocking the event queue.
bool syncStartOnPseudoOp
Use pseudoOp to start synchronization.
std::shared_ptr< EthPacketData > EthPacketPtr
void requestExit(ReqType req) override
#define panic_if(cond,...)
Conditional panic macro that checks the supplied condition and only panics if the condition is true a...
@ Suspended
Temporarily inactive.
bool doStopSync
Flag is set if sync is to stop upon sync completion.
Overload hash function for BasicBlockRange type.
void init(Tick start, Tick repeat)
Initialize periodic sync params.
void pushPacket(EthPacketPtr new_packet, Tick send_tick, Tick send_delay)
Push a newly arrived packet into the desc queue.
static bool readyToCkpt(Tick delay, Tick period)
Initiate taking a checkpoint.
void spawnRecvThread(const Event *recv_done, Tick link_delay)
Spawn the receiver thread.
unsigned distIfaceId
Unique id for the dist link.
std::ostream CheckpointOut
virtual void recvPacket(const Header &header, EthPacketPtr &packet)=0
Receive a packet from the remote end.
virtual void requestCkpt(ReqType req)=0
void serialize(CheckpointOut &cp) const override
Serialize an object.
virtual bool recvHeader(Header &header)=0
Receive a header (i.e.
std::mutex lock
The lock to protect access to the Sync object.
void packetOut(EthPacketPtr pkt, Tick send_delay)
Send out an Ethernet packet.
RecvScheduler recvScheduler
Meta information about data packets received.
SyncSwitch(int num_nodes)
static uint64_t sizeParam()
Getter for the dist size param.
std::string csprintf(const char *format, const Args &...args)
virtual System * getSystemPtr()=0
Tick syncStart
Tick to schedule the first dist sync event.
#define panic(...)
This implements a cprintf based panic() function.
Tick syncRepeat
Frequency of dist sync events in ticks.
Tick curTick()
The current simulated tick.
void serialize(CheckpointOut &cp) const override
Serialize an object.
Generated on Wed Sep 30 2020 14:02:11 for gem5 by doxygen 1.8.17