50 #include "debug/DistEthernet.hh" 51 #include "debug/DistEthernetPkt.hh" 69 if (start_tick < nextAt) {
71 inform(
"Next dist synchronisation tick is changed to %lu.\n", nextAt);
75 panic(
"Dist synchronisation interval must be greater than zero");
77 if (repeat_tick < nextRepeat) {
78 nextRepeat = repeat_tick;
79 inform(
"Dist synchronisation interval is changed to %lu.\n",
87 std::unique_lock<std::mutex> sync_lock(
lock);
104 nextAt = std::numeric_limits<Tick>::max();
105 nextRepeat = std::numeric_limits<Tick>::max();
118 nextAt = std::numeric_limits<Tick>::max();
119 nextRepeat = std::numeric_limits<Tick>::max();
126 std::unique_lock<std::mutex> sync_lock(
lock);
129 assert(waitNum == 0);
133 header.
msgType = MsgType::cmdSyncReq;
139 needCkpt = ReqType::pending;
142 needExit = ReqType::pending;
144 needStopSync = ReqType::pending;
145 DistIface::master->
sendCmd(header);
147 auto lf = [
this]{
return waitNum == 0; };
148 cv.wait(sync_lock, lf);
150 assert(isAbort || !same_tick || (nextAt ==
curTick()));
158 std::unique_lock<std::mutex> sync_lock(
lock);
162 auto lf = [
this]{
return waitNum == 0; };
163 cv.wait(sync_lock, lf);
165 assert(waitNum == 0);
168 assert(!same_tick || (nextAt ==
curTick()));
171 header.
msgType = MsgType::cmdSyncAck;
174 if (doCkpt || numCkptReq == numNodes) {
176 header.
needCkpt = ReqType::immediate;
181 if (doExit || numExitReq == numNodes) {
183 header.
needExit = ReqType::immediate;
187 if (doStopSync || numStopSyncReq == numNodes) {
194 DistIface::master->
sendCmd(header);
205 std::unique_lock<std::mutex> sync_lock(
lock);
210 if (send_tick > nextAt)
212 if (nextRepeat > sync_repeat)
213 nextRepeat = sync_repeat;
215 if (need_ckpt == ReqType::collective)
217 else if (need_ckpt == ReqType::immediate)
219 if (need_exit == ReqType::collective)
221 else if (need_exit == ReqType::immediate)
223 if (need_stop_sync == ReqType::collective)
225 else if (need_stop_sync == ReqType::immediate)
246 std::unique_lock<std::mutex> sync_lock(
lock);
251 nextAt = max_send_tick;
252 nextRepeat = next_repeat;
270 std::lock_guard<std::mutex> sync_lock(
lock);
273 warn(
"Ckpt requested multiple times (req:%d)\n", static_cast<int>(req));
281 std::lock_guard<std::mutex> sync_lock(
lock);
284 warn(
"Exit requested multiple times (req:%d)\n", static_cast<int>(req));
306 int need_exit =
static_cast<int>(needExit);
315 needExit =
static_cast<ReqType>(need_exit);
341 if (!DistIface::sync->run(
false))
342 panic(
"DistIface::SyncEvent::start() aborted\n");
344 assert(!DistIface::sync->doCkpt);
345 assert(!DistIface::sync->doExit);
346 assert(!DistIface::sync->doStopSync);
347 assert(DistIface::sync->nextAt >=
curTick());
348 assert(DistIface::sync->nextRepeat <= repeat);
351 assert(!scheduled());
356 reschedule(DistIface::sync->nextAt);
358 schedule(DistIface::sync->nextAt);
360 inform(
"Dist sync scheduled at %lu and repeats %lu\n", when(),
361 DistIface::sync->nextRepeat);
377 panic_if(_draining && DistIface::sync->doCkpt,
378 "Distributed sync is hit while draining");
394 if (!DistIface::sync->run(
true))
398 if (DistIface::sync->doCkpt)
400 if (DistIface::sync->doExit) {
404 if (DistIface::sync->doStopSync) {
419 warn_once(
"Tried to wake up thread in dist-gem5, but it " 420 "was already awake!\n");
438 recvDone = recv_done;
439 linkDelay = link_delay;
447 Tick recv_tick = send_tick + send_delay + linkDelay;
449 assert(recv_tick >= prev_recv_tick + send_delay);
450 panic_if(prev_recv_tick + send_delay > recv_tick,
451 "Receive window is smaller than send delay");
453 "Simulators out of sync - missed packet receive by %llu ticks" 454 "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu " 456 curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
475 while (!descQueue.empty()) {
476 Desc d = descQueue.front();
486 if (recvDone->scheduled()) {
487 assert(!descQueue.empty());
488 eventManager->reschedule(recvDone,
curTick());
490 assert(descQueue.empty() && v.empty());
502 Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
504 DPRINTF(DistEthernetPkt,
"DistIface::recvScheduler::pushPacket " 505 "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
506 send_tick, send_delay, linkDelay, recv_tick);
508 assert(send_tick > master->syncEvent->when() -
509 master->syncEvent->repeat);
511 assert(send_tick + send_delay + linkDelay > master->syncEvent->when());
520 descQueue.emplace(new_packet, send_tick, send_delay);
521 if (descQueue.size() == 1) {
522 assert(!recvDone->scheduled());
523 eventManager->schedule(recvDone, recv_tick);
525 assert(recvDone->scheduled());
526 panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
527 "Out of order packet received (recv_tick: %lu top(): %lu\n",
528 recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
542 if (descQueue.size() > 0) {
543 Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
544 descQueue.front().sendDelay,
546 eventManager->schedule(recvDone, recv_tick);
557 packet->serialize(
"rxPacket", cp);
565 packet = std::make_shared<EthPacketData>();
566 packet->unserialize(
"rxPacket", cp);
574 std::queue<Desc> tmp_queue(descQueue);
575 unsigned n_desc_queue = descQueue.size();
576 assert(tmp_queue.size() == descQueue.size());
578 for (
int i = 0;
i < n_desc_queue;
i++) {
579 tmp_queue.front().serializeSection(cp,
csprintf(
"rxDesc_%d",
i));
582 assert(tmp_queue.empty());
588 assert(descQueue.size() == 0);
589 assert(!recvDone->scheduled());
590 assert(!ckptRestore);
594 unsigned n_desc_queue;
596 for (
int i = 0;
i < n_desc_queue;
i++) {
599 descQueue.push(recv_desc);
610 bool is_switch,
int num_nodes) :
611 syncStart(sync_start), syncRepeat(sync_repeat),
612 recvThread(nullptr), recvScheduler(em), syncStartOnPseudoOp(use_pseudo_op),
613 rank(dist_rank), size(dist_size)
615 DPRINTF(DistEthernet,
"DistIface() ctor rank:%d\n",dist_rank);
618 assert(
sync ==
nullptr);
666 "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
667 pkt->length, send_delay);
723 const_cast<Event *>(recv_done),
731 DPRINTF(DistEthernet,
"DistIFace::drain() called\n");
740 DPRINTF(DistEthernet,
"DistIFace::drainResume() called\n");
768 unsigned rank_orig, dist_iface_id_orig;
772 panic_if(
rank != rank_orig,
"Rank mismatch at resume (rank=%d, orig=%d)",
775 "at resume (distIfaceId=%d, orig=%d)",
distIfaceId,
801 assert(
sync !=
nullptr);
806 assert(
master !=
nullptr);
814 DPRINTF(DistEthernet,
"DistIface::startup() started\n");
818 DPRINTF(DistEthernet,
"DistIface::startup() done\n");
825 DPRINTF(DistEthernet,
"DistIface::readyToCkpt() called, delay:%lu " 826 "period:%lu\n", delay, period);
829 inform(
"m5 checkpoint called with zero delay => triggering collaborative " 833 inform(
"m5 checkpoint called with non-zero delay => triggering immediate " 834 "checkpoint (at the next sync)\n");
838 inform(
"Non-zero period for m5_ckpt is ignored in " 839 "distributed gem5 runs\n");
848 std::lock_guard<std::mutex> sync_lock(
lock);
863 inform(
"Request toggling syncronization off\n");
870 #if THE_ISA != NULL_ISA 878 inform(
"Request toggling syncronization on\n");
885 #if THE_ISA != NULL_ISA 899 DPRINTF(DistEthernet,
"DistIface::readyToExit() called, delay:%lu\n",
907 inform(
"m5 exit called with zero delay => triggering collaborative " 911 inform(
"m5 exit called with non-zero delay => triggering immediate " 912 "exit (at the next sync)\n");
927 warn(
"Dist-rank parameter is queried in single gem5 simulation.");
940 warn(
"Dist-size parameter is queried in single gem5 simulation.");
bool run(bool same_tick) override
Core method to perform a full dist sync.
void pushPacket(EthPacketPtr new_packet, Tick send_tick, Tick send_delay)
Push a newly arrived packet into the desc queue.
#define panic(...)
This implements a cprintf based panic() function.
virtual System * getSystemPtr()=0
virtual void sendCmd(const Header &header)=0
Send out a control command to the remote end.
void start()
Schedule the first periodic sync event.
void spawnRecvThread(const Event *recv_done, Tick link_delay)
spawn the receiver thread.
virtual void requestExit(ReqType req)=0
virtual void sendPacket(const Header &header, const EthPacketPtr &packet)=0
Send out a data packet to the remote end.
DrainState drain() override
Notify an object that it needs to drain its state.
static unsigned recvThreadsNum
Number of receiver threads (in this gem5 process)
virtual bool recvHeader(Header &header)=0
Receive a header (i.e.
void abort()
Abort processing an on-going sync event (in case of an error, e.g.
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
bool run(bool same_tick) override
Core method to perform a full dist sync.
void serialize(CheckpointOut &cp) const override
Serialize an object.
bool syncStartOnPseudoOp
Use pseudoOp to start synchronization.
EthPacketPtr popPacket()
Fetch the next packet that is to be received by the simulated network link.
static bool isSwitch
Is this node a switch?
Overload hash function for BasicBlockRange type.
void resumeRecvTicks()
Adjust receive ticks for pending packets when restoring from a checkpoint.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
void drainResume() override
Resume execution after a successful drain.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
void serialize(CheckpointOut &cp) const override
Serialize an object.
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
std::thread * recvThread
Receiver thread pointer.
static void toggleSync(ThreadContext *tc)
Trigger the master to start/stop synchronization.
ThreadContext is the external interface to all thread state for anything outside of the CPU...
void serialize(CheckpointOut &cp) const override
Serialize an object.
DrainState
Object drain/handover states.
virtual bool progress(Tick send_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync)=0
Callback when the receiver thread gets a sync ack message.
void serializeSection(CheckpointOut &cp, const char *name) const
Serialize an object into a new section.
void quiesce()
Quiesce thread context.
virtual void initTransport()=0
Init hook for the underlaying transport.
ThreadContext * getThreadContext(ContextID tid) const
Received packet descriptor.
static DistIface * master
The very first DistIface object created becomes the master.
#define UNSERIALIZE_SCALAR(scalar)
Draining buffers pending serialization/handover.
Tick curTick()
The current simulated tick.
void packetOut(EthPacketPtr pkt, Tick send_delay)
Send out an Ethernet packet.
std::string csprintf(const char *format, const Args &...args)
This class implements global sync operations among gem5 peer processes.
static uint64_t sizeParam()
Getter for the dist size param.
void init(const Event *e, Tick link_delay)
uint64_t Tick
Tick count type.
Tick calcReceiveTick(Tick send_tick, Tick send_delay, Tick prev_recv_tick)
Calculate the tick to schedule the next receive done event.
Tick nextRepeat
The repeat value for the next periodic sync.
Tick syncStart
Tick to schedule the first dist sync event.
void init(Event *recv_done, Tick link_delay)
Initialize network link parameters.
static bool readyToCkpt(Tick delay, Tick period)
Initiate taking a checkpoint.
EventQueue * curEventQueue()
static SyncEvent * syncEvent
The singleton SyncEvent object to schedule periodic dist sync.
unsigned numContexts() const
std::shared_ptr< EthPacketData > EthPacketPtr
DistIface(unsigned dist_rank, unsigned dist_size, Tick sync_start, Tick sync_repeat, EventManager *em, bool use_pseudo_op, bool is_switch, int num_nodes)
ctor
static bool readyToExit(Tick delay)
Initiate the exit from the simulation.
const FlagsType none
Nothing extra to print.
bool doStopSync
Flag is set if sync is to stop upon sync completion.
void requestExit(ReqType req) override
virtual void activate()=0
Set the status to Active.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
The global event to schedule periodic dist sync.
unsigned size
The number of gem5 processes comprising this dist simulation.
void recvThreadFunc(Event *recv_done, Tick link_delay)
The function executed by a receiver thread.
#define SERIALIZE_SCALAR(scalar)
RecvScheduler recvScheduler
Meta information about data packets received.
void process() override
This is a global event so process() will only be called by exactly one simulation thread...
void exitSimLoop(const std::string &message, int exit_code, Tick when, Tick repeat, bool serialize)
Schedule an event to exit the simulation loop (returning to Python) at the end of the current cycle (...
void unserialize(CheckpointIn &cp) override
Unserialize an object.
void requestStopSync(ReqType req) override
Tick syncRepeat
Frequency of dist sync events in ticks.
static System * sys
System pointer used to wakeup sleeping threads when stopping sync.
std::ostream CheckpointOut
static Sync * sync
The singleton Sync object to perform dist synchronisation.
void init(Tick start, Tick repeat)
Initialize periodic sync params.
unsigned distIfaceId
Unique id for the dist link.
static unsigned distIfaceNum
Number of DistIface objects (i.e.
virtual void recvPacket(const Header &header, EthPacketPtr &packet)=0
Receive a packet from the remote end.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
virtual void requestStopSync(ReqType req)=0
void requestCkpt(ReqType req) override
void serialize(CheckpointOut &cp) const override
Serialize an object.
void serialize(CheckpointOut &cp) const override
Serialize an object.
virtual Status status() const =0
void lock()
Provide an interface for locking/unlocking the event queue.
void quiesceTick(Tick resume)
Quiesce, suspend, and schedule activate at resume.
virtual void requestCkpt(ReqType req)=0
Temporarily release the event queue service lock.
The interface class to talk to peer gem5 processes.
static uint64_t rankParam()
Getter for the dist rank param.
#define panic_if(cond,...)
Conditional panic macro that checks the supplied condition and only panics if the condition is true a...
void unserializeSection(CheckpointIn &cp, const char *name)
Unserialize an a child object.
SyncSwitch(int num_nodes)
unsigned rank
The rank of this process among the gem5 peers.