gem5  v22.1.0.0
dist_iface.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-2016 ARM Limited
3  * All rights reserved
4  *
5  * The license below extends only to copyright in the software and shall
6  * not be construed as granting a license to any other intellectual
7  * property including but not limited to intellectual property relating
8  * to a hardware implementation of the functionality of the software
9  * licensed hereunder. You may use the software subject to the license
10  * terms below provided that you ensure that this notice is replicated
11  * unmodified and in its entirety in all distributions of the software,
12  * modified or unmodified, in source code or in binary form.
13  *
14  * Redistribution and use in source and binary forms, with or without
15  * modification, are permitted provided that the following conditions are
16  * met: redistributions of source code must retain the above copyright
17  * notice, this list of conditions and the following disclaimer;
18  * redistributions in binary form must reproduce the above copyright
19  * notice, this list of conditions and the following disclaimer in the
20  * documentation and/or other materials provided with the distribution;
21  * neither the name of the copyright holders nor the names of its
22  * contributors may be used to endorse or promote products derived from
23  * this software without specific prior written permission.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36  */
37 
38 /* @file
39  * The interface class for dist-gem5 simulations.
40  */
41 
42 #include "dev/net/dist_iface.hh"
43 
44 #include <queue>
45 #include <thread>
46 
47 #include "base/random.hh"
48 #include "base/trace.hh"
49 #include "cpu/thread_context.hh"
50 #include "debug/DistEthernet.hh"
51 #include "debug/DistEthernetPkt.hh"
52 #include "dev/net/etherpkt.hh"
53 #include "sim/cur_tick.hh"
54 #include "sim/sim_exit.hh"
55 #include "sim/sim_object.hh"
56 #include "sim/system.hh"
57 
58 namespace gem5
59 {
60 
61 DistIface::Sync *DistIface::sync = nullptr;
62 System *DistIface::sys = nullptr;
63 DistIface::SyncEvent *DistIface::syncEvent = nullptr;
64 unsigned DistIface::distIfaceNum = 0;
65 unsigned DistIface::recvThreadsNum = 0;
66 DistIface *DistIface::primary = nullptr;
67 bool DistIface::isSwitch = false;
68 
69 void
70 DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
71 {
72  if (start_tick < nextAt) {
73  nextAt = start_tick;
74  inform("Next dist synchronisation tick is changed to %lu.\n", nextAt);
75  }
76 
77  if (repeat_tick == 0)
78  panic("Dist synchronisation interval must be greater than zero");
79 
80  if (repeat_tick < nextRepeat) {
81  nextRepeat = repeat_tick;
82  inform("Dist synchronisation interval is changed to %lu.\n",
83  nextRepeat);
84  }
85 }
86 
87 void
89 {
90  std::unique_lock<std::mutex> sync_lock(lock);
91  waitNum = 0;
92  isAbort = true;
93  sync_lock.unlock();
94  cv.notify_one();
95 }
96 
98 {
99  numNodes = num_nodes;
100  waitNum = num_nodes;
101  numExitReq = 0;
102  numCkptReq = 0;
103  numStopSyncReq = 0;
104  doExit = false;
105  doCkpt = false;
106  doStopSync = false;
107  nextAt = std::numeric_limits<Tick>::max();
108  nextRepeat = std::numeric_limits<Tick>::max();
109  isAbort = false;
110 }
111 
113 {
114  waitNum = 0;
115  needExit = ReqType::none;
116  needCkpt = ReqType::none;
117  needStopSync = ReqType::none;
118  doExit = false;
119  doCkpt = false;
120  doStopSync = false;
121  nextAt = std::numeric_limits<Tick>::max();
122  nextRepeat = std::numeric_limits<Tick>::max();
123  isAbort = false;
124 }
125 
126 bool
128 {
129  std::unique_lock<std::mutex> sync_lock(lock);
130  Header header;
131 
132  assert(waitNum == 0);
133  assert(!isAbort);
134  waitNum = DistIface::recvThreadsNum;
135  // initiate the global synchronisation
136  header.msgType = MsgType::cmdSyncReq;
137  header.sendTick = curTick();
138  header.syncRepeat = nextRepeat;
139  header.needCkpt = needCkpt;
140  header.needStopSync = needStopSync;
141  if (needCkpt != ReqType::none)
142  needCkpt = ReqType::pending;
143  header.needExit = needExit;
144  if (needExit != ReqType::none)
145  needExit = ReqType::pending;
146  if (needStopSync != ReqType::none)
147  needStopSync = ReqType::pending;
149  // now wait until all receiver threads complete the synchronisation
150  auto lf = [this]{ return waitNum == 0; };
151  cv.wait(sync_lock, lf);
152  // global synchronisation is done.
153  assert(isAbort || !same_tick || (nextAt == curTick()));
154  return !isAbort;
155 }
156 
157 
158 bool
160 {
161  std::unique_lock<std::mutex> sync_lock(lock);
162  Header header;
163  // Wait for the sync requests from the nodes
164  if (waitNum > 0) {
165  auto lf = [this]{ return waitNum == 0; };
166  cv.wait(sync_lock, lf);
167  }
168  assert(waitNum == 0);
169  if (isAbort) // sync aborted
170  return false;
171  assert(!same_tick || (nextAt == curTick()));
172  waitNum = numNodes;
173  // Complete the global synchronisation
174  header.msgType = MsgType::cmdSyncAck;
175  header.sendTick = nextAt;
176  header.syncRepeat = nextRepeat;
177  if (doCkpt || numCkptReq == numNodes) {
178  doCkpt = true;
179  header.needCkpt = ReqType::immediate;
180  numCkptReq = 0;
181  } else {
182  header.needCkpt = ReqType::none;
183  }
184  if (doExit || numExitReq == numNodes) {
185  doExit = true;
186  header.needExit = ReqType::immediate;
187  } else {
188  header.needExit = ReqType::none;
189  }
190  if (doStopSync || numStopSyncReq == numNodes) {
191  doStopSync = true;
192  numStopSyncReq = 0;
193  header.needStopSync = ReqType::immediate;
194  } else {
195  header.needStopSync = ReqType::none;
196  }
198  return true;
199 }
200 
201 bool
203  Tick sync_repeat,
204  ReqType need_ckpt,
205  ReqType need_exit,
206  ReqType need_stop_sync)
207 {
208  std::unique_lock<std::mutex> sync_lock(lock);
209  if (isAbort) // sync aborted
210  return false;
211  assert(waitNum > 0);
212 
213  if (send_tick > nextAt)
214  nextAt = send_tick;
215  if (nextRepeat > sync_repeat)
216  nextRepeat = sync_repeat;
217 
218  if (need_ckpt == ReqType::collective)
219  numCkptReq++;
220  else if (need_ckpt == ReqType::immediate)
221  doCkpt = true;
222  if (need_exit == ReqType::collective)
223  numExitReq++;
224  else if (need_exit == ReqType::immediate)
225  doExit = true;
226  if (need_stop_sync == ReqType::collective)
227  numStopSyncReq++;
228  else if (need_stop_sync == ReqType::immediate)
229  doStopSync = true;
230 
231  waitNum--;
232  // Notify the simulation thread if the on-going sync is complete
233  if (waitNum == 0) {
234  sync_lock.unlock();
235  cv.notify_one();
236  }
237  // The receive thread must keep alive in the switch until the node
238  // closes the connection. Thus, we always return true here.
239  return true;
240 }
241 
242 bool
244  Tick next_repeat,
245  ReqType do_ckpt,
246  ReqType do_exit,
247  ReqType do_stop_sync)
248 {
249  std::unique_lock<std::mutex> sync_lock(lock);
250  if (isAbort) // sync aborted
251  return false;
252  assert(waitNum > 0);
253 
254  nextAt = max_send_tick;
255  nextRepeat = next_repeat;
256  doCkpt = (do_ckpt != ReqType::none);
257  doExit = (do_exit != ReqType::none);
258  doStopSync = (do_stop_sync != ReqType::none);
259 
260  waitNum--;
261  // Notify the simulation thread if the on-going sync is complete
262  if (waitNum == 0) {
263  sync_lock.unlock();
264  cv.notify_one();
265  }
266  // The receive thread must finish when simulation is about to exit
267  return !doExit;
268 }
269 
270 void
272 {
273  std::lock_guard<std::mutex> sync_lock(lock);
274  assert(req != ReqType::none);
275  if (needCkpt != ReqType::none)
276  warn("Ckpt requested multiple times (req:%d)\n", static_cast<int>(req));
277  if (needCkpt == ReqType::none || req == ReqType::immediate)
278  needCkpt = req;
279 }
280 
281 void
283 {
284  std::lock_guard<std::mutex> sync_lock(lock);
285  assert(req != ReqType::none);
286  if (needExit != ReqType::none)
287  warn("Exit requested multiple times (req:%d)\n", static_cast<int>(req));
288  if (needExit == ReqType::none || req == ReqType::immediate)
289  needExit = req;
290 }
291 
292 void
294 {
295  if (doCkpt) {
296  // The first DistIface object called this right before writing the
297  // checkpoint. We need to drain the underlying physical network here.
298  // Note that other gem5 peers may enter this barrier at different
299  // ticks due to draining.
300  run(false);
301  // Only the "first" DistIface object has to perform the sync
302  doCkpt = false;
303  }
304 }
305 
306 void
308 {
309  int need_exit = static_cast<int>(needExit);
310  SERIALIZE_SCALAR(need_exit);
311 }
312 
313 void
315 {
316  int need_exit;
317  UNSERIALIZE_SCALAR(need_exit);
318  needExit = static_cast<ReqType>(need_exit);
319 }
320 
321 void
323 {
324  SERIALIZE_SCALAR(numExitReq);
325 }
326 
327 void
329 {
330  UNSERIALIZE_SCALAR(numExitReq);
331 }
332 
333 void
335 {
336  // Note that this may be called either from startup() or drainResume()
337 
338  // At this point, all DistIface objects have already called Sync::init() so
339  // we have a local minimum of the start tick and repeat for the periodic
340  // sync.
341  repeat = DistIface::sync->nextRepeat;
342  // Do a global barrier to agree on a common repeat value (the smallest
343  // one from all participating nodes).
344  if (!DistIface::sync->run(false))
345  panic("DistIface::SyncEvent::start() aborted\n");
346 
347  assert(!DistIface::sync->doCkpt);
348  assert(!DistIface::sync->doExit);
349  assert(!DistIface::sync->doStopSync);
350  assert(DistIface::sync->nextAt >= curTick());
351  assert(DistIface::sync->nextRepeat <= repeat);
352 
353  if (curTick() == 0)
354  assert(!scheduled());
355 
356  // Use the maximum of the current tick for all participating nodes or a
357  // user provided starting tick.
358  if (scheduled())
359  reschedule(DistIface::sync->nextAt);
360  else
361  schedule(DistIface::sync->nextAt);
362 
363  inform("Dist sync scheduled at %lu and repeats %lu\n", when(),
364  DistIface::sync->nextRepeat);
365 }
366 
367 void
369 {
370  // We may not start a global periodic sync while draining before taking a
371  // checkpoint. This is due to the possibility that peer gem5 processes
372  // may not hit the same periodic sync before they complete draining and
373  // that would make this periodic sync clash with sync called from
374  // DistIface::serialize() by other gem5 processes.
375  // We would need a 'distributed drain' solution to eliminate this
376  // restriction.
377  // Note that if draining was not triggered by checkpointing then we are
378  // fine since no extra global sync will happen (i.e. all peer gem5 will
379  // hit this periodic sync eventually).
380  panic_if(_draining && DistIface::sync->doCkpt,
381  "Distributed sync is hit while draining");
382  /*
383  * Note that this is a global event so this process method will be called
384  * by only exactly one thread.
385  */
386  /*
387  * We hold the eventq lock at this point but the receiver thread may
388  * need the lock to schedule new recv events while waiting for the
389  * dist sync to complete.
390  * Note that the other simulation threads also release their eventq
391  * locks while waiting for us due to the global event semantics.
392  */
393  {
395  // we do a global sync here that is supposed to happen at the same
396  // tick in all gem5 peers
397  if (!DistIface::sync->run(true))
398  return; // global sync aborted
399  // global sync completed
400  }
401  if (DistIface::sync->doCkpt)
402  exitSimLoop("checkpoint");
403  if (DistIface::sync->doExit) {
404  exitSimLoop("exit request from gem5 peers");
405  return;
406  }
407  if (DistIface::sync->doStopSync) {
408  DistIface::sync->doStopSync = false;
409  inform("synchronization disabled at %lu\n", curTick());
410 
411  // The switch node needs to wait for the next sync immediately.
412  if (DistIface::isSwitch) {
413  start();
414  } else {
415  // Wake up thread contexts on non-switch nodes.
416  for (auto *tc: primary->sys->threads) {
417  if (tc->status() == ThreadContext::Suspended)
418  tc->activate();
419  else
420  warn_once("Tried to wake up thread in dist-gem5, but it "
421  "was already awake!\n");
422  }
423  }
424  return;
425  }
426  // schedule the next periodic sync
427  repeat = DistIface::sync->nextRepeat;
428  schedule(curTick() + repeat);
429 }
430 
431 void
433 {
434  // This is called from the receiver thread when it starts running. The new
435  // receiver thread shares the event queue with the simulation thread
436  // (associated with the simulated Ethernet link).
437  curEventQueue(eventManager->eventQueue());
438 
439  recvDone = recv_done;
440  linkDelay = link_delay;
441 }
442 
443 Tick
445  Tick send_delay,
446  Tick prev_recv_tick)
447 {
448  Tick recv_tick = send_tick + send_delay + linkDelay;
449  // sanity check (we need atleast a send delay long window)
450  assert(recv_tick >= prev_recv_tick + send_delay);
451  panic_if(prev_recv_tick + send_delay > recv_tick,
452  "Receive window is smaller than send delay");
453  panic_if(recv_tick <= curTick(),
454  "Simulators out of sync - missed packet receive by %llu ticks"
455  "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu "
456  "linkDelay: %lu )",
457  curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
458  linkDelay);
459 
460  return recv_tick;
461 }
462 
463 void
465 {
466  // Schedule pending packets asap in case link speed/delay changed when
467  // restoring from the checkpoint.
468  // This may be done during unserialize except that curTick() is unknown
469  // so we call this during drainResume().
470  // If we are not restoring from a checkpoint then link latency could not
471  // change so we just return.
472  if (!ckptRestore)
473  return;
474 
476  while (!descQueue.empty()) {
477  Desc d = descQueue.front();
478  descQueue.pop();
479  d.sendTick = curTick();
480  d.sendDelay = d.packet->simLength; // assume 1 tick/byte max link speed
481  v.push_back(d);
482  }
483 
484  for (auto &d : v)
485  descQueue.push(d);
486 
487  if (recvDone->scheduled()) {
488  assert(!descQueue.empty());
489  eventManager->reschedule(recvDone, curTick());
490  } else {
491  assert(descQueue.empty() && v.empty());
492  }
493  ckptRestore = false;
494 }
495 
496 void
498  Tick send_tick,
499  Tick send_delay)
500 {
501  // Note : this is called from the receiver thread
502  curEventQueue()->lock();
503  Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
504 
505  DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
506  "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
507  send_tick, send_delay, linkDelay, recv_tick);
508  // Every packet must be sent and arrive in the same quantum
509  assert(send_tick > primary->syncEvent->when() -
511  // No packet may be scheduled for receive in the arrival quantum
512  assert(send_tick + send_delay + linkDelay > primary->syncEvent->when());
513 
514  // Now we are about to schedule a recvDone event for the new data packet.
515  // We use the same recvDone object for all incoming data packets. Packet
516  // descriptors are saved in the ordered queue. The currently scheduled
517  // packet is always on the top of the queue.
518  // NOTE: we use the event queue lock to protect the receive desc queue,
519  // too, which is accessed both by the receiver thread and the simulation
520  // thread.
521  descQueue.emplace(new_packet, send_tick, send_delay);
522  if (descQueue.size() == 1) {
523  assert(!recvDone->scheduled());
524  eventManager->schedule(recvDone, recv_tick);
525  } else {
526  assert(recvDone->scheduled());
527  panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
528  "Out of order packet received (recv_tick: %lu top(): %lu\n",
529  recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
530  }
531  curEventQueue()->unlock();
532 }
533 
536 {
537  // Note : this is called from the simulation thread when a receive done
538  // event is being processed for the link. We assume that the thread holds
539  // the event queue lock when this is called!
540  EthPacketPtr next_packet = descQueue.front().packet;
541  descQueue.pop();
542 
543  if (descQueue.size() > 0) {
544  Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
545  descQueue.front().sendDelay,
546  curTick());
547  eventManager->schedule(recvDone, recv_tick);
548  }
549  prevRecvTick = curTick();
550  return next_packet;
551 }
552 
553 void
555 {
556  SERIALIZE_SCALAR(sendTick);
557  SERIALIZE_SCALAR(sendDelay);
558  packet->serialize("rxPacket", cp);
559 }
560 
561 void
563 {
564  UNSERIALIZE_SCALAR(sendTick);
565  UNSERIALIZE_SCALAR(sendDelay);
566  packet = std::make_shared<EthPacketData>();
567  packet->unserialize("rxPacket", cp);
568 }
569 
570 void
572 {
573  SERIALIZE_SCALAR(prevRecvTick);
574  // serialize the receive desc queue
575  std::queue<Desc> tmp_queue(descQueue);
576  unsigned n_desc_queue = descQueue.size();
577  assert(tmp_queue.size() == descQueue.size());
578  SERIALIZE_SCALAR(n_desc_queue);
579  for (int i = 0; i < n_desc_queue; i++) {
580  tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
581  tmp_queue.pop();
582  }
583  assert(tmp_queue.empty());
584 }
585 
586 void
588 {
589  assert(descQueue.size() == 0);
590  assert(!recvDone->scheduled());
591  assert(!ckptRestore);
592 
593  UNSERIALIZE_SCALAR(prevRecvTick);
594  // unserialize the receive desc queue
595  unsigned n_desc_queue;
596  UNSERIALIZE_SCALAR(n_desc_queue);
597  for (int i = 0; i < n_desc_queue; i++) {
598  Desc recv_desc;
599  recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
600  descQueue.push(recv_desc);
601  }
602  ckptRestore = true;
603 }
604 
605 DistIface::DistIface(unsigned dist_rank,
606  unsigned dist_size,
607  Tick sync_start,
608  Tick sync_repeat,
609  EventManager *em,
610  bool use_pseudo_op,
611  bool is_switch, int num_nodes) :
612  syncStart(sync_start), syncRepeat(sync_repeat),
613  recvThread(nullptr), recvScheduler(em), syncStartOnPseudoOp(use_pseudo_op),
614  rank(dist_rank), size(dist_size)
615 {
616  DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n",dist_rank);
617  isPrimary = false;
618  if (primary == nullptr) {
619  assert(sync == nullptr);
620  assert(syncEvent == nullptr);
621  isSwitch = is_switch;
622  if (is_switch)
623  sync = new SyncSwitch(num_nodes);
624  else
625  sync = new SyncNode();
626  syncEvent = new SyncEvent();
627  primary = this;
628  isPrimary = true;
629  }
631  distIfaceNum++;
632 }
633 
635 {
636  assert(recvThread);
637  recvThread->join();
638  delete recvThread;
639  if (distIfaceNum-- == 0) {
640  assert(syncEvent);
641  delete syncEvent;
642  assert(sync);
643  delete sync;
644  }
645  if (this == primary)
646  primary = nullptr;
647 }
648 
649 void
651 {
652  Header header;
653 
654  // Prepare a dist header packet for the Ethernet packet we want to
655  // send out.
657  header.sendTick = curTick();
658  header.sendDelay = send_delay;
659 
660  header.dataPacketLength = pkt->length;
661  header.simLength = pkt->simLength;
662 
663  // Send out the packet and the meta info.
664  sendPacket(header, pkt);
665 
666  DPRINTF(DistEthernetPkt,
667  "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
668  pkt->length, send_delay);
669 }
670 
671 void
672 DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
673 {
674  EthPacketPtr new_packet;
676 
677  // Initialize receive scheduler parameters
678  recvScheduler.init(recv_done, link_delay);
679 
680  // Main loop to wait for and process any incoming message.
681  for (;;) {
682  // recvHeader() blocks until the next dist header packet comes in.
683  if (!recvHeader(header)) {
684  // We lost connection to the peer gem5 processes most likely
685  // because one of them called m5 exit. So we stop here.
686  // Grab the eventq lock to stop the simulation thread
687  curEventQueue()->lock();
688  exitSimLoop("connection to gem5 peer got closed");
689  curEventQueue()->unlock();
690  // The simulation thread may be blocked in processing an on-going
691  // global synchronisation. Abort the sync to give the simulation
692  // thread a chance to make progress and process the exit event.
693  sync->abort();
694  // Finish receiver thread
695  break;
696  }
697 
698  // We got a valid dist header packet, let's process it
699  if (header.msgType == MsgType::dataDescriptor) {
700  recvPacket(header, new_packet);
701  recvScheduler.pushPacket(new_packet,
702  header.sendTick,
703  header.sendDelay);
704  } else {
705  // everything else must be synchronisation related command
706  if (!sync->progress(header.sendTick,
707  header.syncRepeat,
708  header.needCkpt,
709  header.needExit,
710  header.needStopSync))
711  // Finish receiver thread if simulation is about to exit
712  break;
713  }
714  }
715 }
716 
717 void
718 DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
719 {
720  assert(recvThread == nullptr);
721 
722  recvThread = new std::thread(&DistIface::recvThreadFunc,
723  this,
724  const_cast<Event *>(recv_done),
725  link_delay);
726  recvThreadsNum++;
727 }
728 
731 {
732  DPRINTF(DistEthernet,"DistIFace::drain() called\n");
733  // This can be called multiple times in the same drain cycle.
734  if (this == primary)
735  syncEvent->draining(true);
736  return DrainState::Drained;
737 }
738 
739 void
741  DPRINTF(DistEthernet,"DistIFace::drainResume() called\n");
742  if (this == primary)
743  syncEvent->draining(false);
745 }
746 
747 void
749 {
750  // Drain the dist interface before the checkpoint is taken. We cannot call
751  // this as part of the normal drain cycle because this dist sync has to be
752  // called exactly once after the system is fully drained.
753  sync->drainComplete();
754 
755  unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;
756 
757  SERIALIZE_SCALAR(rank_orig);
758  SERIALIZE_SCALAR(dist_iface_id_orig);
759 
760  recvScheduler.serializeSection(cp, "recvScheduler");
761  if (this == primary) {
762  sync->serializeSection(cp, "Sync");
763  }
764 }
765 
766 void
768 {
769  unsigned rank_orig, dist_iface_id_orig;
770  UNSERIALIZE_SCALAR(rank_orig);
771  UNSERIALIZE_SCALAR(dist_iface_id_orig);
772 
773  panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
774  rank, rank_orig);
775  panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
776  "at resume (distIfaceId=%d, orig=%d)", distIfaceId,
777  dist_iface_id_orig);
778 
779  recvScheduler.unserializeSection(cp, "recvScheduler");
780  if (this == primary) {
781  sync->unserializeSection(cp, "Sync");
782  }
783 }
784 
785 void
786 DistIface::init(const Event *done_event, Tick link_delay)
787 {
788  // Init hook for the underlying message transport to setup/finalize
789  // communication channels
790  initTransport();
791 
792  // Spawn a new receiver thread that will process messages
793  // coming in from peer gem5 processes.
794  // The receive thread will also schedule a (receive) doneEvent
795  // for each incoming data packet.
796  spawnRecvThread(done_event, link_delay);
797 
798 
799  // Adjust the periodic sync start and interval. Different DistIface
800  // might have different requirements. The singleton sync object
801  // will select the minimum values for both params.
802  assert(sync != nullptr);
804 
805  // Initialize the seed for random generator to avoid the same sequence
806  // in all gem5 peer processes
807  assert(primary != nullptr);
808  if (this == primary)
809  random_mt.init(5489 * (rank+1) + 257);
810 }
811 
812 void
814 {
815  DPRINTF(DistEthernet, "DistIface::startup() started\n");
816  // Schedule synchronization unless this is a non-switch node in pseudo_op mode.
817  if (this == primary && (!syncStartOnPseudoOp || isSwitch))
818  syncEvent->start();
819  DPRINTF(DistEthernet, "DistIface::startup() done\n");
820 }
821 
822 bool
824 {
825  bool ret = true;
826  DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
827  "period:%lu\n", delay, period);
828  if (primary) {
829  if (delay == 0) {
830  inform("m5 checkpoint called with zero delay => triggering collaborative "
831  "checkpoint\n");
833  } else {
834  inform("m5 checkpoint called with non-zero delay => triggering immediate "
835  "checkpoint (at the next sync)\n");
837  }
838  if (period != 0)
839  inform("Non-zero period for m5_ckpt is ignored in "
840  "distributed gem5 runs\n");
841  ret = false;
842  }
843  return ret;
844 }
845 
846 void
848 {
849  std::lock_guard<std::mutex> sync_lock(lock);
850  needStopSync = req;
851 }
852 
853 void
855 {
856  // Unfortunate that we have to populate the system pointer member this way.
857  primary->sys = tc->getSystemPtr();
858 
859  // The invariant for both syncing and "unsyncing" is that all threads will
860  // stop executing instructions until the desired sync state has been reached
861  // for all nodes. This is the easiest way to prevent deadlock (in the case
862  // of "unsyncing") and causality errors (in the case of syncing).
863  if (primary->syncEvent->scheduled()) {
864  inform("Request toggling syncronization off\n");
866 
867  // At this point, we have no clue when everyone will reach the sync
868  // stop point. Suspend execution of all local thread contexts.
869  // Dist-gem5 will reactivate all thread contexts when everyone has
870  // reached the sync stop point.
871  for (auto *tc: primary->sys->threads) {
872  if (tc->status() == ThreadContext::Active)
873  tc->quiesce();
874  }
875  } else {
876  inform("Request toggling syncronization on\n");
877  primary->syncEvent->start();
878 
879  // We need to suspend all CPUs until the sync point is reached by all
880  // nodes to prevent causality errors. We can also schedule CPU
881  // activation here, since we know exactly when the next sync will
882  // occur.
883  for (auto *tc: primary->sys->threads) {
884  if (tc->status() == ThreadContext::Active)
885  tc->quiesceTick(primary->syncEvent->when() + 1);
886  }
887  }
888 }
889 
890 bool
892 {
893  bool ret = true;
894  DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
895  delay);
896  if (primary) {
897  // To successfully coordinate an exit, all nodes must be synchronising
898  if (!primary->syncEvent->scheduled())
899  primary->syncEvent->start();
900 
901  if (delay == 0) {
902  inform("m5 exit called with zero delay => triggering collaborative "
903  "exit\n");
905  } else {
906  inform("m5 exit called with non-zero delay => triggering immediate "
907  "exit (at the next sync)\n");
909  }
910  ret = false;
911  }
912  return ret;
913 }
914 
915 uint64_t
917 {
918  uint64_t val;
919  if (primary) {
920  val = primary->rank;
921  } else {
922  warn("Dist-rank parameter is queried in single gem5 simulation.");
923  val = 0;
924  }
925  return val;
926 }
927 
928 uint64_t
930 {
931  uint64_t val;
932  if (primary) {
933  val = primary->size;
934  } else {
935  warn("Dist-size parameter is queried in single gem5 simulation.");
936  val = 1;
937  }
938  return val;
939 }
940 
941 } // namespace gem5
#define DPRINTF(x,...)
Definition: trace.hh:186
bool scheduled() const
void unserialize(CheckpointIn &cp) override
Unserialize an object.
Definition: dist_iface.cc:587
void serialize(CheckpointOut &cp) const override
Serialize an object.
Definition: dist_iface.cc:571
void init(Event *recv_done, Tick link_delay)
Initialize network link parameters.
Definition: dist_iface.cc:432
void resumeRecvTicks()
Adjust receive ticks for pending packets when restoring from a checkpoint.
Definition: dist_iface.cc:464
EthPacketPtr popPacket()
Fetch the next packet that is to be received by the simulated network link.
Definition: dist_iface.cc:535
void pushPacket(EthPacketPtr new_packet, Tick send_tick, Tick send_delay)
Push a newly arrived packet into the desc queue.
Definition: dist_iface.cc:497
Tick calcReceiveTick(Tick send_tick, Tick send_delay, Tick prev_recv_tick)
Calculate the tick to schedule the next receive done event.
Definition: dist_iface.cc:444
The global event to schedule periodic dist sync.
Definition: dist_iface.hh:298
void start()
Schedule the first periodic sync event.
Definition: dist_iface.cc:334
void process() override
This is a global event so process() will only be called by exactly one simulation thread.
Definition: dist_iface.cc:368
void requestExit(ReqType req) override
Definition: dist_iface.cc:282
void serialize(CheckpointOut &cp) const override
Serialize an object.
Definition: dist_iface.cc:307
ReqType needStopSync
Sync stop requested.
Definition: dist_iface.hh:219
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
Definition: dist_iface.cc:243
void requestCkpt(ReqType req) override
Definition: dist_iface.cc:271
bool run(bool same_tick) override
Core method to perform a full dist sync.
Definition: dist_iface.cc:127
void unserialize(CheckpointIn &cp) override
Unserialize an object.
Definition: dist_iface.cc:314
void requestStopSync(ReqType req) override
Definition: dist_iface.cc:847
bool run(bool same_tick) override
Core method to perform a full dist sync.
Definition: dist_iface.cc:159
SyncSwitch(int num_nodes)
Definition: dist_iface.cc:97
void serialize(CheckpointOut &cp) const override
Serialize an object.
Definition: dist_iface.cc:322
void unserialize(CheckpointIn &cp) override
Unserialize an object.
Definition: dist_iface.cc:328
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
Definition: dist_iface.cc:202
Tick nextRepeat
The repeat value for the next periodic sync.
Definition: dist_iface.hh:151
bool doStopSync
Flag is set if sync is to stop upon sync completion.
Definition: dist_iface.hh:147
void init(Tick start, Tick repeat)
Initialize periodic sync params.
Definition: dist_iface.cc:70
Tick nextAt
Tick for the next periodic sync (if the event is not scheduled yet)
Definition: dist_iface.hh:155
virtual void requestStopSync(ReqType req)=0
virtual void requestExit(ReqType req)=0
virtual void requestCkpt(ReqType req)=0
std::mutex lock
The lock to protect access to the Sync object.
Definition: dist_iface.hh:124
virtual bool progress(Tick send_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync)=0
Callback when the receiver thread gets a sync ack message.
void abort()
Abort processing an on-going sync event (in case of an error, e.g.
Definition: dist_iface.cc:88
static uint64_t rankParam()
Getter for the dist rank param.
Definition: dist_iface.cc:916
virtual void initTransport()=0
Init hook for the underlaying transport.
RecvScheduler recvScheduler
Meta information about data packets received.
Definition: dist_iface.hh:472
static bool readyToExit(Tick delay)
Initiate the exit from the simulation.
Definition: dist_iface.cc:891
bool syncStartOnPseudoOp
Use pseudoOp to start synchronization.
Definition: dist_iface.hh:476
static void toggleSync(ThreadContext *tc)
Trigger the primary to start/stop synchronization.
Definition: dist_iface.cc:854
static bool readyToCkpt(Tick delay, Tick period)
Initiate taking a checkpoint.
Definition: dist_iface.cc:823
void init(const Event *e, Tick link_delay)
Definition: dist_iface.cc:786
unsigned rank
The rank of this process among the gem5 peers.
Definition: dist_iface.hh:482
unsigned distIfaceId
Unique id for the dist link.
Definition: dist_iface.hh:494
static DistIface * primary
The very first DistIface object created becomes the primary interface.
Definition: dist_iface.hh:515
static uint64_t sizeParam()
Getter for the dist size param.
Definition: dist_iface.cc:929
DrainState drain() override
Draining is the process of clearing out the states of SimObjects. These are the SimObjects that are pa...
Definition: dist_iface.cc:730
std::thread * recvThread
Receiver thread pointer.
Definition: dist_iface.hh:468
static SyncEvent * syncEvent
The singleton SyncEvent object to schedule periodic dist sync.
Definition: dist_iface.hh:510
void unserialize(CheckpointIn &cp) override
Unserialize an object.
Definition: dist_iface.cc:767
static System * sys
System pointer used to wakeup sleeping threads when stopping sync.
Definition: dist_iface.hh:519
void recvThreadFunc(Event *recv_done, Tick link_delay)
The function executed by a receiver thread.
Definition: dist_iface.cc:672
virtual void sendCmd(const Header &header)=0
Send out a control command to the remote end.
virtual bool recvHeader(Header &header)=0
Receive a header.
static Sync * sync
The singleton Sync object to perform dist synchronisation.
Definition: dist_iface.hh:506
virtual void sendPacket(const Header &header, const EthPacketPtr &packet)=0
Send out a data packet to the remote end.
static unsigned distIfaceNum
Number of DistIface objects.
Definition: dist_iface.hh:490
void drainResume() override
Resume execution after a successful drain.
Definition: dist_iface.cc:740
virtual ~DistIface()
Definition: dist_iface.cc:634
void spawnRecvThread(const Event *recv_done, Tick link_delay)
Spawn the receiver thread.
Definition: dist_iface.cc:718
static bool isSwitch
Is this node a switch?
Definition: dist_iface.hh:523
void serialize(CheckpointOut &cp) const override
Serialize an object.
Definition: dist_iface.cc:748
virtual void recvPacket(const Header &header, EthPacketPtr &packet)=0
Receive a packet from the remote end.
Tick syncRepeat
Frequency of dist sync events in ticks.
Definition: dist_iface.hh:463
Tick syncStart
Tick to schedule the first dist sync event.
Definition: dist_iface.hh:459
static unsigned recvThreadsNum
Number of receiver threads (in this gem5 process)
Definition: dist_iface.hh:502
DistIface(unsigned dist_rank, unsigned dist_size, Tick sync_start, Tick sync_repeat, EventManager *em, bool use_pseudo_op, bool is_switch, int num_nodes)
ctor
Definition: dist_iface.cc:605
void packetOut(EthPacketPtr pkt, Tick send_delay)
Send out an Ethernet packet.
Definition: dist_iface.cc:650
unsigned size
The number of gem5 processes comprising this dist simulation.
Definition: dist_iface.hh:486
void unlock()
Definition: eventq.hh:955
void lock()
Provide an interface for locking/unlocking the event queue.
Definition: eventq.hh:954
void init(uint32_t s)
Definition: random.cc:67
Threads threads
Definition: system.hh:313
ThreadContext is the external interface to all thread state for anything outside of the CPU.
virtual System * getSystemPtr()=0
void quiesceTick(Tick resume)
Quiesce, suspend, and schedule activate at resume.
void quiesce()
Quiesce thread context.
@ Suspended
Temporarily inactive.
virtual Status status() const =0
STL vector class.
Definition: stl.hh:37
Random random_mt
Definition: random.cc:99
DrainState
Object drain/handover states.
Definition: drain.hh:75
@ Drained
Buffers drained, ready for serialization/handover.
#define panic(...)
This implements a cprintf based panic() function.
Definition: logging.hh:178
#define panic_if(cond,...)
Conditional panic macro that checks the supplied condition and only panics if the condition is true a...
Definition: logging.hh:204
void serializeSection(CheckpointOut &cp, const char *name) const
Serialize an object into a new section.
Definition: serialize.cc:74
void unserializeSection(CheckpointIn &cp, const char *name)
Unserialize a child object.
Definition: serialize.cc:81
#define warn(...)
Definition: logging.hh:246
#define warn_once(...)
Definition: logging.hh:250
#define inform(...)
Definition: logging.hh:247
Bitfield< 7 > i
Definition: misc_types.hh:67
Bitfield< 9 > d
Definition: misc_types.hh:64
Bitfield< 20 > sr
Bitfield< 0 > v
Definition: pagetable.hh:65
Bitfield< 63 > val
Definition: misc.hh:776
Bitfield< 2 > em
Definition: misc.hh:607
Bitfield< 5 > lock
Definition: types.hh:82
Reference material can be found at the JEDEC website: UFS standard http://www.jedec....
Tick curTick()
The universal simulation clock.
Definition: cur_tick.hh:46
std::ostream CheckpointOut
Definition: serialize.hh:66
uint64_t Tick
Tick count type.
Definition: types.hh:58
void exitSimLoop(const std::string &message, int exit_code, Tick when, Tick repeat, bool serialize)
Schedule an event to exit the simulation loop (returning to Python) at the end of the current cycle (...
Definition: sim_events.cc:88
EventQueue * curEventQueue()
Definition: eventq.hh:88
std::string csprintf(const char *format, const Args &...args)
Definition: cprintf.hh:161
std::shared_ptr< EthPacketData > EthPacketPtr
Definition: etherpkt.hh:90
output header
Definition: nop.cc:36
#define UNSERIALIZE_SCALAR(scalar)
Definition: serialize.hh:575
#define SERIALIZE_SCALAR(scalar)
Definition: serialize.hh:568
Received packet descriptor.
Definition: dist_iface.hh:342
void serialize(CheckpointOut &cp) const override
Serialize an object.
Definition: dist_iface.cc:554
void unserialize(CheckpointIn &cp) override
Unserialize an object.
Definition: dist_iface.cc:562

Generated on Wed Dec 21 2022 10:22:34 for gem5 by doxygen 1.9.1