Go to the documentation of this file.
50 #include "debug/DistEthernet.hh"
51 #include "debug/DistEthernetPkt.hh"
74 inform(
"Next dist synchronisation tick is changed to %lu.\n",
nextAt);
78 panic(
"Dist synchronisation interval must be greater than zero");
82 inform(
"Dist synchronisation interval is changed to %lu.\n",
90 std::unique_lock<std::mutex> sync_lock(
lock);
107 nextAt = std::numeric_limits<Tick>::max();
108 nextRepeat = std::numeric_limits<Tick>::max();
121 nextAt = std::numeric_limits<Tick>::max();
122 nextRepeat = std::numeric_limits<Tick>::max();
129 std::unique_lock<std::mutex> sync_lock(
lock);
132 assert(waitNum == 0);
138 header.syncRepeat = nextRepeat;
139 header.needCkpt = needCkpt;
140 header.needStopSync = needStopSync;
143 header.needExit = needExit;
150 auto lf = [
this]{
return waitNum == 0; };
151 cv.wait(sync_lock, lf);
153 assert(isAbort || !same_tick || (nextAt ==
curTick()));
161 std::unique_lock<std::mutex> sync_lock(
lock);
165 auto lf = [
this]{
return waitNum == 0; };
166 cv.wait(sync_lock, lf);
168 assert(waitNum == 0);
171 assert(!same_tick || (nextAt ==
curTick()));
176 header.syncRepeat = nextRepeat;
177 if (doCkpt || numCkptReq == numNodes) {
184 if (doExit || numExitReq == numNodes) {
190 if (doStopSync || numStopSyncReq == numNodes) {
208 std::unique_lock<std::mutex> sync_lock(
lock);
213 if (send_tick > nextAt)
215 if (nextRepeat > sync_repeat)
216 nextRepeat = sync_repeat;
249 std::unique_lock<std::mutex> sync_lock(
lock);
254 nextAt = max_send_tick;
255 nextRepeat = next_repeat;
273 std::lock_guard<std::mutex> sync_lock(
lock);
276 warn(
"Ckpt requested multiple times (req:%d)\n",
static_cast<int>(req));
284 std::lock_guard<std::mutex> sync_lock(
lock);
287 warn(
"Exit requested multiple times (req:%d)\n",
static_cast<int>(req));
309 int need_exit =
static_cast<int>(needExit);
318 needExit =
static_cast<ReqType>(need_exit);
345 panic(
"DistIface::SyncEvent::start() aborted\n");
354 assert(!scheduled());
363 inform(
"Dist sync scheduled at %lu and repeats %lu\n", when(),
381 "Distributed sync is hit while draining");
420 warn_once(
"Tried to wake up thread in dist-gem5, but it "
421 "was already awake!\n");
439 recvDone = recv_done;
440 linkDelay = link_delay;
448 Tick recv_tick = send_tick + send_delay + linkDelay;
450 assert(recv_tick >= prev_recv_tick + send_delay);
451 panic_if(prev_recv_tick + send_delay > recv_tick,
452 "Receive window is smaller than send delay");
454 "Simulators out of sync - missed packet receive by %llu ticks"
455 "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu "
457 curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
476 while (!descQueue.empty()) {
477 Desc d = descQueue.front();
480 d.sendDelay =
d.packet->simLength;
487 if (recvDone->scheduled()) {
488 assert(!descQueue.empty());
489 eventManager->reschedule(recvDone,
curTick());
491 assert(descQueue.empty() &&
v.empty());
503 Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
505 DPRINTF(DistEthernetPkt,
"DistIface::recvScheduler::pushPacket "
506 "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
507 send_tick, send_delay, linkDelay, recv_tick);
521 descQueue.emplace(new_packet, send_tick, send_delay);
522 if (descQueue.size() == 1) {
523 assert(!recvDone->scheduled());
524 eventManager->schedule(recvDone, recv_tick);
526 assert(recvDone->scheduled());
527 panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
528 "Out of order packet received (recv_tick: %lu top(): %lu\n",
529 recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
543 if (descQueue.size() > 0) {
544 Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
545 descQueue.front().sendDelay,
547 eventManager->schedule(recvDone, recv_tick);
558 packet->serialize(
"rxPacket", cp);
566 packet = std::make_shared<EthPacketData>();
567 packet->unserialize(
"rxPacket", cp);
575 std::queue<Desc> tmp_queue(descQueue);
576 unsigned n_desc_queue = descQueue.size();
577 assert(tmp_queue.size() == descQueue.size());
579 for (
int i = 0;
i < n_desc_queue;
i++) {
580 tmp_queue.front().serializeSection(cp,
csprintf(
"rxDesc_%d",
i));
583 assert(tmp_queue.empty());
589 assert(descQueue.size() == 0);
590 assert(!recvDone->scheduled());
591 assert(!ckptRestore);
595 unsigned n_desc_queue;
597 for (
int i = 0;
i < n_desc_queue;
i++) {
600 descQueue.push(recv_desc);
611 bool is_switch,
int num_nodes) :
616 DPRINTF(DistEthernet,
"DistIface() ctor rank:%d\n",dist_rank);
619 assert(
sync ==
nullptr);
658 header.sendDelay = send_delay;
660 header.dataPacketLength = pkt->length;
661 header.simLength = pkt->simLength;
667 "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
668 pkt->length, send_delay);
724 const_cast<Event *
>(recv_done),
732 DPRINTF(DistEthernet,
"DistIFace::drain() called\n");
741 DPRINTF(DistEthernet,
"DistIFace::drainResume() called\n");
769 unsigned rank_orig, dist_iface_id_orig;
773 panic_if(
rank != rank_orig,
"Rank mismatch at resume (rank=%d, orig=%d)",
776 "at resume (distIfaceId=%d, orig=%d)",
distIfaceId,
802 assert(
sync !=
nullptr);
815 DPRINTF(DistEthernet,
"DistIface::startup() started\n");
819 DPRINTF(DistEthernet,
"DistIface::startup() done\n");
826 DPRINTF(DistEthernet,
"DistIface::readyToCkpt() called, delay:%lu "
827 "period:%lu\n", delay, period);
830 inform(
"m5 checkpoint called with zero delay => triggering collaborative "
834 inform(
"m5 checkpoint called with non-zero delay => triggering immediate "
835 "checkpoint (at the next sync)\n");
839 inform(
"Non-zero period for m5_ckpt is ignored in "
840 "distributed gem5 runs\n");
849 std::lock_guard<std::mutex> sync_lock(
lock);
864 inform(
"Request toggling syncronization off\n");
878 inform(
"Request toggling syncronization on\n");
898 DPRINTF(DistEthernet,
"DistIface::readyToExit() called, delay:%lu\n",
906 inform(
"m5 exit called with zero delay => triggering collaborative "
910 inform(
"m5 exit called with non-zero delay => triggering immediate "
911 "exit (at the next sync)\n");
926 warn(
"Dist-rank parameter is queried in single gem5 simulation.");
939 warn(
"Dist-size parameter is queried in single gem5 simulation.");
Tick curTick()
The universal simulation clock.
static bool readyToExit(Tick delay)
Initiate the exit from the simulation.
virtual void initTransport()=0
Init hook for the underlying transport.
void pushPacket(EthPacketPtr new_packet, Tick send_tick, Tick send_delay)
Push a newly arrived packet into the desc queue.
virtual System * getSystemPtr()=0
unsigned rank
The rank of this process among the gem5 peers.
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
#define UNSERIALIZE_SCALAR(scalar)
void serialize(CheckpointOut &cp) const override
Serialize an object.
void init(Tick start, Tick repeat)
Initialize periodic sync params.
Tick calcReceiveTick(Tick send_tick, Tick send_delay, Tick prev_recv_tick)
Calculate the tick to schedule the next receive done event.
ReqType needStopSync
Sync stop requested.
void serializeSection(CheckpointOut &cp, const char *name) const
Serialize an object into a new section.
bool doStopSync
Flag is set if sync is to stop upon sync completion.
void lock()
Provide an interface for locking/unlocking the event queue.
void serialize(CheckpointOut &cp) const override
Serialize an object.
Received packet descriptor.
void packetOut(EthPacketPtr pkt, Tick send_delay)
Send out an Ethernet packet.
std::string csprintf(const char *format, const Args &...args)
EthPacketPtr popPacket()
Fetch the next packet that is to be received by the simulated network link.
unsigned size
The number of gem5 processes comprising this dist simulation.
virtual void requestStopSync(ReqType req)=0
void unserialize(CheckpointIn &cp) override
Unserialize an object.
void exitSimLoop(const std::string &message, int exit_code, Tick when, Tick repeat, bool serialize)
Schedule an event to exit the simulation loop (returning to Python) at the end of the current cycle (...
virtual Status status() const =0
void recvThreadFunc(Event *recv_done, Tick link_delay)
The function executed by a receiver thread.
void resumeRecvTicks()
Adjust receive ticks for pending packets when restoring from a checkpoint.
virtual bool progress(Tick send_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync)=0
Callback when the receiver thread gets a sync ack message.
Tick syncRepeat
Frequency of dist sync events in ticks.
static unsigned recvThreadsNum
Number of receiver threads (in this gem5 process)
void serialize(CheckpointOut &cp) const override
Serialize an object.
DrainState
Object drain/handover states.
unsigned distIfaceId
Unique id for the dist link.
void quiesceTick(Tick resume)
Quiesce, suspend, and schedule activate at resume.
bool syncStartOnPseudoOp
Use pseudoOp to start synchronization.
ThreadContext is the external interface to all thread state for anything outside of the CPU.
void quiesce()
Quiesce thread context.
void unserializeSection(CheckpointIn &cp, const char *name)
Unserialize a child object.
void requestStopSync(ReqType req) override
std::shared_ptr< EthPacketData > EthPacketPtr
static bool isSwitch
Is this node a switch?
void unserialize(CheckpointIn &cp) override
Unserialize an object.
@ Suspended
Temporarily inactive.
static Sync * sync
The singleton Sync object to perform dist synchronisation.
uint64_t Tick
Tick count type.
void spawnRecvThread(const Event *recv_done, Tick link_delay)
Spawn the receiver thread.
void requestExit(ReqType req) override
void requestCkpt(ReqType req) override
std::thread * recvThread
Receiver thread pointer.
void drainResume() override
Resume execution after a successful drain.
static unsigned distIfaceNum
Number of DistIface objects (i.e. dist links in this gem5 process).
void init(const Event *e, Tick link_delay)
The global event to schedule periodic dist sync.
Tick nextAt
Tick for the next periodic sync (if the event is not scheduled yet)
@ Drained
Buffers drained, ready for serialization/handover.
void init(Event *recv_done, Tick link_delay)
Initialize network link parameters.
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
DrainState drain() override
Draining is the process of clearing out the states of SimObjects. These are the SimObjects that are pa...
std::mutex lock
The lock to protect access to the Sync object.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
#define SERIALIZE_SCALAR(scalar)
virtual void requestCkpt(ReqType req)=0
bool run(bool same_tick) override
Core method to perform a full dist sync.
virtual void sendCmd(const Header &header)=0
Send out a control command to the remote end.
void start()
Schedule the first periodic sync event.
DistIface(unsigned dist_rank, unsigned dist_size, Tick sync_start, Tick sync_repeat, EventManager *em, bool use_pseudo_op, bool is_switch, int num_nodes)
ctor
virtual bool recvHeader(Header &header)=0
Receive a header (i.e.
virtual void recvPacket(const Header &header, EthPacketPtr &packet)=0
Receive a packet from the remote end.
#define panic_if(cond,...)
Conditional panic macro that checks the supplied condition and only panics if the condition is true a...
void process() override
This is a global event so process() will only be called by exactly one simulation thread.
static uint64_t sizeParam()
Getter for the dist size param.
void serialize(CheckpointOut &cp) const override
Serialize an object.
void abort()
Abort processing an on-going sync event (in case of an error, e.g.
static DistIface * primary
The very first DistIface object created becomes the primary interface.
EventQueue * curEventQueue()
static bool readyToCkpt(Tick delay, Tick period)
Initiate taking a checkpoint.
Tick nextRepeat
The repeat value for the next periodic sync.
SyncSwitch(int num_nodes)
virtual void requestExit(ReqType req)=0
std::ostream CheckpointOut
static void toggleSync(ThreadContext *tc)
Trigger the primary to start/stop synchronization.
Tick syncStart
Tick to schedule the first dist sync event.
static uint64_t rankParam()
Getter for the dist rank param.
bool run(bool same_tick) override
Core method to perform a full dist sync.
Reference material can be found at the JEDEC website: UFS standard http://www.jedec....
static SyncEvent * syncEvent
The singleton SyncEvent object to schedule periodic dist sync.
static System * sys
System pointer used to wakeup sleeping threads when stopping sync.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
RecvScheduler recvScheduler
Meta information about data packets received.
#define panic(...)
This implements a cprintf based panic() function.
void serialize(CheckpointOut &cp) const override
Serialize an object.
virtual void sendPacket(const Header &header, const EthPacketPtr &packet)=0
Send out a data packet to the remote end.
Generated on Tue Dec 21 2021 11:34:29 for gem5 by doxygen 1.8.17