gem5  v20.1.0.0
dist_iface.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-2016 ARM Limited
3  * All rights reserved
4  *
5  * The license below extends only to copyright in the software and shall
6  * not be construed as granting a license to any other intellectual
7  * property including but not limited to intellectual property relating
8  * to a hardware implementation of the functionality of the software
9  * licensed hereunder. You may use the software subject to the license
10  * terms below provided that you ensure that this notice is replicated
11  * unmodified and in its entirety in all distributions of the software,
12  * modified or unmodified, in source code or in binary form.
13  *
14  * Redistribution and use in source and binary forms, with or without
15  * modification, are permitted provided that the following conditions are
16  * met: redistributions of source code must retain the above copyright
17  * notice, this list of conditions and the following disclaimer;
18  * redistributions in binary form must reproduce the above copyright
19  * notice, this list of conditions and the following disclaimer in the
20  * documentation and/or other materials provided with the distribution;
21  * neither the name of the copyright holders nor the names of its
22  * contributors may be used to endorse or promote products derived from
23  * this software without specific prior written permission.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36  */
37 
38 /* @file
39  * The interface class for dist-gem5 simulations.
40  */
41 
42 #include "dev/net/dist_iface.hh"
43 
44 #include <queue>
45 #include <thread>
46 
47 #include "base/random.hh"
48 #include "base/trace.hh"
49 #include "cpu/thread_context.hh"
50 #include "debug/DistEthernet.hh"
51 #include "debug/DistEthernetPkt.hh"
52 #include "dev/net/etherpkt.hh"
53 #include "sim/sim_exit.hh"
54 #include "sim/sim_object.hh"
55 #include "sim/system.hh"
56 
57 using namespace std;
59 System *DistIface::sys = nullptr;
61 unsigned DistIface::distIfaceNum = 0;
62 unsigned DistIface::recvThreadsNum = 0;
63 DistIface *DistIface::primary = nullptr;
64 bool DistIface::isSwitch = false;
65 
/**
 * Fold one interface's sync parameters into the shared sync state.
 * nextAt and nextRepeat are only ever lowered here, so after several calls
 * they hold the smallest start tick and the smallest interval passed in.
 *
 * @param start_tick  requested tick of the first periodic sync
 * @param repeat_tick requested sync interval; zero triggers panic()
 */
66 void
67 DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
68 {
69  if (start_tick < nextAt) {
70  nextAt = start_tick;
71  inform("Next dist synchronisation tick is changed to %lu.\n", nextAt);
72  }
73 
74  if (repeat_tick == 0)
75  panic("Dist synchronisation interval must be greater than zero");
76 
77  if (repeat_tick < nextRepeat) {
78  nextRepeat = repeat_tick;
79  inform("Dist synchronisation interval is changed to %lu.\n",
80  nextRepeat);
81  }
82 }
83 
/**
 * Abort an on-going sync: with the sync lock held, clear waitNum and set
 * isAbort; then release the lock and wake one thread waiting on cv.
 */
84 void
86 {
87  std::unique_lock<std::mutex> sync_lock(lock);
88  waitNum = 0;
89  isAbort = true;
    // Unlock before notifying so the woken thread can take the lock at once.
90  sync_lock.unlock();
91  cv.notify_one();
92 }
93 
/*
 * Constructor body that records num_nodes and resets the switch-side sync
 * state. NOTE(review): the declarator line is absent from this listing;
 * presumably SyncSwitch(int num_nodes) -- confirm.
 */
95 {
96  numNodes = num_nodes;
    // One outstanding message is expected from every node.
97  waitNum = num_nodes;
98  numExitReq = 0;
99  numCkptReq = 0;
100  numStopSyncReq = 0;
101  doExit = false;
102  doCkpt = false;
103  doStopSync = false;
    // Start from the largest Tick value.
104  nextAt = std::numeric_limits<Tick>::max();
105  nextRepeat = std::numeric_limits<Tick>::max();
106  isAbort = false;
107 }
108 
/*
 * Constructor body that resets the node-side sync state: nothing is being
 * waited for, no request is pending and no action flag is set.
 * NOTE(review): the declarator line is absent from this listing; presumably
 * the SyncNode default constructor -- confirm.
 */
110 {
111  waitNum = 0;
112  needExit = ReqType::none;
113  needCkpt = ReqType::none;
114  needStopSync = ReqType::none;
115  doExit = false;
116  doCkpt = false;
117  doStopSync = false;
    // Start from the largest Tick value.
118  nextAt = std::numeric_limits<Tick>::max();
119  nextRepeat = std::numeric_limits<Tick>::max();
120  isAbort = false;
121 }
122 
/**
 * Node-side global barrier (the declarator line is absent from this
 * listing). Sets waitNum to the number of receiver threads, fills a
 * cmdSyncReq header with the current tick, the repeat interval and the
 * local requests, then blocks on cv until waitNum is zero again.
 *
 * @param same_tick when true, a sync that was not aborted must finish with
 *        nextAt == curTick() (checked by the final assert)
 * @return false if the sync was aborted, true otherwise
 */
123 bool
125 {
126  std::unique_lock<std::mutex> sync_lock(lock);
127  Header header;
128 
129  assert(waitNum == 0);
130  assert(!isAbort);
131  waitNum = DistIface::recvThreadsNum;
132  // initiate the global synchronisation
133  header.msgType = MsgType::cmdSyncReq;
134  header.sendTick = curTick();
135  header.syncRepeat = nextRepeat;
136  header.needCkpt = needCkpt;
137  header.needStopSync = needStopSync;
    // A request that has been copied into the header is marked pending.
138  if (needCkpt != ReqType::none)
139  needCkpt = ReqType::pending;
140  header.needExit = needExit;
141  if (needExit != ReqType::none)
142  needExit = ReqType::pending;
143  if (needStopSync != ReqType::none)
144  needStopSync = ReqType::pending;
    // NOTE(review): listing line 145 is missing here; presumably it
    // transmits the header -- confirm against the original source.
146  // now wait until all receiver threads complete the synchronisation
147  auto lf = [this]{ return waitNum == 0; };
148  cv.wait(sync_lock, lf);
149  // global synchronisation is done.
150  assert(isAbort || !same_tick || (nextAt == curTick()));
151  return !isAbort;
152 }
153 
154 
/**
 * Switch-side global barrier (the declarator line is absent from this
 * listing). Waits until waitNum is zero, then builds the cmdSyncAck header
 * carrying nextAt, nextRepeat and the checkpoint/exit/stop-sync decisions,
 * and sets waitNum back to numNodes for the next round.
 *
 * @param same_tick when true, nextAt must equal curTick() once the wait is
 *        over (asserted)
 * @return false if the sync was aborted, true otherwise
 */
155 bool
157 {
158  std::unique_lock<std::mutex> sync_lock(lock);
159  Header header;
160  // Wait for the sync requests from the nodes
161  if (waitNum > 0) {
162  auto lf = [this]{ return waitNum == 0; };
163  cv.wait(sync_lock, lf);
164  }
165  assert(waitNum == 0);
166  if (isAbort) // sync aborted
167  return false;
168  assert(!same_tick || (nextAt == curTick()));
169  waitNum = numNodes;
170  // Complete the global synchronisation
171  header.msgType = MsgType::cmdSyncAck;
172  header.sendTick = nextAt;
173  header.syncRepeat = nextRepeat;
    // An action is announced as immediate when it was already decided or
    // when the collective request counter has reached numNodes.
174  if (doCkpt || numCkptReq == numNodes) {
175  doCkpt = true;
176  header.needCkpt = ReqType::immediate;
177  numCkptReq = 0;
178  } else {
179  header.needCkpt = ReqType::none;
180  }
    // Unlike the other two counters, numExitReq is not reset here.
181  if (doExit || numExitReq == numNodes) {
182  doExit = true;
183  header.needExit = ReqType::immediate;
184  } else {
185  header.needExit = ReqType::none;
186  }
187  if (doStopSync || numStopSyncReq == numNodes) {
188  doStopSync = true;
189  numStopSyncReq = 0;
190  header.needStopSync = ReqType::immediate;
191  } else {
192  header.needStopSync = ReqType::none;
193  }
    // NOTE(review): listing line 194 is missing here; presumably it
    // transmits the header -- confirm against the original source.
195  return true;
196 }
197 
/**
 * Switch-side handler for one incoming sync message (the first declarator
 * line, which names the send_tick parameter, is absent from this listing).
 * Raises nextAt to the largest send_tick seen, lowers nextRepeat to the
 * smallest sync_repeat seen, counts collective requests, latches immediate
 * ones, and decrements waitNum; cv is notified when waitNum reaches zero.
 *
 * @return false only if the sync has been aborted; true otherwise
 */
198 bool
200  Tick sync_repeat,
201  ReqType need_ckpt,
202  ReqType need_exit,
203  ReqType need_stop_sync)
204 {
205  std::unique_lock<std::mutex> sync_lock(lock);
206  if (isAbort) // sync aborted
207  return false;
208  assert(waitNum > 0);
209 
210  if (send_tick > nextAt)
211  nextAt = send_tick;
212  if (nextRepeat > sync_repeat)
213  nextRepeat = sync_repeat;
214 
215  if (need_ckpt == ReqType::collective)
216  numCkptReq++;
217  else if (need_ckpt == ReqType::immediate)
218  doCkpt = true;
219  if (need_exit == ReqType::collective)
220  numExitReq++;
221  else if (need_exit == ReqType::immediate)
222  doExit = true;
223  if (need_stop_sync == ReqType::collective)
224  numStopSyncReq++;
225  else if (need_stop_sync == ReqType::immediate)
226  doStopSync = true;
227 
228  waitNum--;
229  // Notify the simulation thread if the on-going sync is complete
230  if (waitNum == 0) {
231  sync_lock.unlock();
232  cv.notify_one();
233  }
234  // The receive thread must stay alive in the switch until the node
235  // closes the connection. Thus, we always return true here.
236  return true;
237 }
238 
/**
 * Node-side handler for one incoming sync message (the first declarator
 * line, which names the max_send_tick parameter, is absent from this
 * listing). Overwrites nextAt and nextRepeat with the received values,
 * derives doCkpt/doExit/doStopSync from the received request types, and
 * decrements waitNum; cv is notified when waitNum reaches zero.
 *
 * @return false if the sync has been aborted or an exit was announced
 */
239 bool
241  Tick next_repeat,
242  ReqType do_ckpt,
243  ReqType do_exit,
244  ReqType do_stop_sync)
245 {
246  std::unique_lock<std::mutex> sync_lock(lock);
247  if (isAbort) // sync aborted
248  return false;
249  assert(waitNum > 0);
250 
251  nextAt = max_send_tick;
252  nextRepeat = next_repeat;
    // Any request type other than none turns the flag on.
253  doCkpt = (do_ckpt != ReqType::none);
254  doExit = (do_exit != ReqType::none);
255  doStopSync = (do_stop_sync != ReqType::none);
256 
257  waitNum--;
258  // Notify the simulation thread if the on-going sync is complete
259  if (waitNum == 0) {
260  sync_lock.unlock();
261  cv.notify_one();
262  }
263  // The receive thread must finish when simulation is about to exit
264  return !doExit;
265 }
266 
/**
 * Record a checkpoint request in needCkpt under the sync lock (the
 * declarator line is absent from this listing; the body reads a ReqType
 * named req). A request that is already recorded is only overwritten by an
 * immediate one; a repeated request is reported with warn().
 */
267 void
269 {
270  std::lock_guard<std::mutex> sync_lock(lock);
271  assert(req != ReqType::none);
272  if (needCkpt != ReqType::none)
273  warn("Ckpt requested multiple times (req:%d)\n", static_cast<int>(req));
274  if (needCkpt == ReqType::none || req == ReqType::immediate)
275  needCkpt = req;
276 }
277 
/**
 * Record an exit request in needExit under the sync lock (the declarator
 * line is absent from this listing; the body reads a ReqType named req).
 * A request that is already recorded is only overwritten by an immediate
 * one; a repeated request is reported with warn().
 */
278 void
280 {
281  std::lock_guard<std::mutex> sync_lock(lock);
282  assert(req != ReqType::none);
283  if (needExit != ReqType::none)
284  warn("Exit requested multiple times (req:%d)\n", static_cast<int>(req));
285  if (needExit == ReqType::none || req == ReqType::immediate)
286  needExit = req;
287 }
288 
/**
 * If a checkpoint is pending (doCkpt set), perform one extra sync with
 * run(false) and clear doCkpt so that later callers do nothing.
 * The value returned by run() is not examined.
 */
289 void
291 {
292  if (doCkpt) {
293  // The first DistIface object called this right before writing the
294  // checkpoint. We need to drain the underlying physical network here.
295  // Note that other gem5 peers may enter this barrier at different
296  // ticks due to draining.
297  run(false);
298  // Only the "first" DistIface object has to perform the sync
299  doCkpt = false;
300  }
301 }
302 
/**
 * Checkpoint the node sync state: needExit is written as an int under the
 * name need_exit. No other member is written by this block.
 */
303 void
305 {
306  int need_exit = static_cast<int>(needExit);
307  SERIALIZE_SCALAR(need_exit);
308 }
309 
/**
 * Restore the node sync state: read the int stored as need_exit and cast
 * it back to ReqType into needExit.
 */
310 void
312 {
313  int need_exit;
314  UNSERIALIZE_SCALAR(need_exit);
315  needExit = static_cast<ReqType>(need_exit);
316 }
317 
/**
 * Checkpoint the switch sync state: only numExitReq is written.
 */
318 void
320 {
321  SERIALIZE_SCALAR(numExitReq);
322 }
323 
/**
 * Restore the switch sync state: only numExitReq is read back.
 */
324 void
326 {
327  UNSERIALIZE_SCALAR(numExitReq);
328 }
329 
/**
 * Start (or restart) the periodic sync event (the declarator line is
 * absent from this listing). Performs one run(false) barrier, panics if it
 * is aborted, checks the resulting state with asserts and then schedules
 * or reschedules this event at DistIface::sync->nextAt.
 */
330 void
332 {
333  // Note that this may be called either from startup() or drainResume()
334 
335  // At this point, all DistIface objects have already called Sync::init() so
336  // we have a local minimum of the start tick and repeat for the periodic
337  // sync.
338  repeat = DistIface::sync->nextRepeat;
339  // Do a global barrier to agree on a common repeat value (the smallest
340  // one from all participating nodes).
341  if (!DistIface::sync->run(false))
342  panic("DistIface::SyncEvent::start() aborted\n");
343 
344  assert(!DistIface::sync->doCkpt);
345  assert(!DistIface::sync->doExit);
346  assert(!DistIface::sync->doStopSync);
347  assert(DistIface::sync->nextAt >= curTick());
348  assert(DistIface::sync->nextRepeat <= repeat);
349 
350  if (curTick() == 0)
351  assert(!scheduled());
352 
353  // Use the maximum of the current tick for all participating nodes or a
354  // user provided starting tick.
355  if (scheduled())
356  reschedule(DistIface::sync->nextAt);
357  else
358  schedule(DistIface::sync->nextAt);
359 
360  inform("Dist sync scheduled at %lu and repeats %lu\n", when(),
361  DistIface::sync->nextRepeat);
362 }
363 
/**
 * Handler of the periodic sync event (the declarator line is absent from
 * this listing). Runs a same-tick global sync and then acts on the outcome:
 * requests a checkpoint exit, requests a simulation exit, stops the
 * periodic sync, or schedules the next sync at curTick() + repeat.
 * Returns early, without rescheduling, if the sync is aborted, on exit and
 * on stop-sync.
 */
364 void
366 {
367  // We may not start a global periodic sync while draining before taking a
368  // checkpoint. This is due to the possibility that peer gem5 processes
369  // may not hit the same periodic sync before they complete draining and
370  // that would make this periodic sync clash with sync called from
371  // DistIface::serialize() by other gem5 processes.
372  // We would need a 'distributed drain' solution to eliminate this
373  // restriction.
374  // Note that if draining was not triggered by checkpointing then we are
375  // fine since no extra global sync will happen (i.e. all peer gem5 will
376  // hit this periodic sync eventually).
377  panic_if(_draining && DistIface::sync->doCkpt,
378  "Distributed sync is hit while draining");
379  /*
380  * Note that this is a global event so this process method will be called
381  * by only exactly one thread.
382  */
383  /*
384  * We hold the eventq lock at this point but the receiver thread may
385  * need the lock to schedule new recv events while waiting for the
386  * dist sync to complete.
387  * Note that the other simulation threads also release their eventq
388  * locks while waiting for us due to the global event semantics.
389  */
390  {
    // NOTE(review): listing line 391 is missing here; judging by the
    // comment above it presumably releases the eventq lock for the
    // duration of this scope -- confirm against the original source.
392  // we do a global sync here that is supposed to happen at the same
393  // tick in all gem5 peers
394  if (!DistIface::sync->run(true))
395  return; // global sync aborted
396  // global sync completed
397  }
    // A checkpoint does not end processing: the exit and stop-sync checks
    // below still run.
398  if (DistIface::sync->doCkpt)
399  exitSimLoop("checkpoint");
400  if (DistIface::sync->doExit) {
401  exitSimLoop("exit request from gem5 peers");
402  return;
403  }
404  if (DistIface::sync->doStopSync) {
405  DistIface::sync->doStopSync = false;
406  inform("synchronization disabled at %lu\n", curTick());
407 
408  // The switch node needs to wait for the next sync immediately.
409  if (DistIface::isSwitch) {
410  start();
411  } else {
412  // Wake up thread contexts on non-switch nodes.
413  for (auto *tc: primary->sys->threads) {
414  if (tc->status() == ThreadContext::Suspended)
415  tc->activate();
416  else
417  warn_once("Tried to wake up thread in dist-gem5, but it "
418  "was already awake!\n");
419  }
420  }
421  return;
422  }
423  // schedule the next periodic sync
424  repeat = DistIface::sync->nextRepeat;
425  schedule(curTick() + repeat);
426 }
427 
/**
 * Per-receiver-thread set-up (the declarator line is absent from this
 * listing; the body reads recv_done and link_delay). Makes the event
 * manager's event queue the current queue of the calling thread and stores
 * the receive-done event and the link delay.
 */
428 void
430 {
431  // This is called from the receiver thread when it starts running. The new
432  // receiver thread shares the event queue with the simulation thread
433  // (associated with the simulated Ethernet link).
434  curEventQueue(eventManager->eventQueue());
435 
436  recvDone = recv_done;
437  linkDelay = link_delay;
438 }
439 
/**
 * Compute the tick at which a packet is received: send_tick + send_delay +
 * linkDelay (the first declarator line, which names the send_tick
 * parameter, is absent from this listing).
 *
 * Panics if the result is earlier than prev_recv_tick + send_delay, or if
 * it is not strictly later than curTick().
 *
 * @param send_delay     serialisation delay of the packet
 * @param prev_recv_tick receive tick of the previous packet
 * @return the receive tick
 */
440 Tick
442  Tick send_delay,
443  Tick prev_recv_tick)
444 {
445  Tick recv_tick = send_tick + send_delay + linkDelay;
446  // sanity check (we need at least a send delay long window)
    // The assert repeats the panic_if condition below.
447  assert(recv_tick >= prev_recv_tick + send_delay);
448  panic_if(prev_recv_tick + send_delay > recv_tick,
449  "Receive window is smaller than send delay");
    // Message fix: added the space that was missing between the two string
    // pieces and spelled the field name prev_recv_tick correctly.
450  panic_if(recv_tick <= curTick(),
451  "Simulators out of sync - missed packet receive by %llu ticks "
452  "(prev_recv_tick: %lu send_tick: %lu send_delay: %lu "
453  "linkDelay: %lu )",
454  curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
455  linkDelay);
456 
457  return recv_tick;
458 }
459 
/**
 * After a checkpoint restore, restamp every queued descriptor with
 * sendTick = curTick() and sendDelay = packet->simLength, keeping the queue
 * order, and reschedule recvDone at curTick() if it is scheduled.
 * Does nothing unless ckptRestore is set; clears ckptRestore when done.
 */
460 void
462 {
463  // Schedule pending packets asap in case link speed/delay changed when
464  // restoring from the checkpoint.
465  // This may be done during unserialize except that curTick() is unknown
466  // so we call this during drainResume().
467  // If we are not restoring from a checkpoint then link latency could not
468  // change so we just return.
469  if (!ckptRestore)
470  return;
471 
    // NOTE(review): listing line 472 is missing here; presumably it
    // declares the container v used below -- confirm.
473  while (!descQueue.empty()) {
474  Desc d = descQueue.front();
475  descQueue.pop();
476  d.sendTick = curTick();
477  d.sendDelay = d.packet->simLength; // assume 1 tick/byte max link speed
478  v.push_back(d);
479  }
480 
    // Refill the queue in the original order.
481  for (auto &d : v)
482  descQueue.push(d);
483 
484  if (recvDone->scheduled()) {
485  assert(!descQueue.empty());
486  eventManager->reschedule(recvDone, curTick());
487  } else {
488  assert(descQueue.empty() && v.empty());
489  }
490  ckptRestore = false;
491 }
492 
/**
 * Queue a newly received packet and make sure recvDone is scheduled for the
 * packet at the front of the queue (the first declarator line, which names
 * the new_packet parameter, is absent from this listing).
 * The event queue lock is taken on entry and released on exit.
 *
 * @param send_tick  tick at which the peer sent the packet
 * @param send_delay serialisation delay of the packet
 */
493 void
495  Tick send_tick,
496  Tick send_delay)
497 {
498  // Note : this is called from the receiver thread
499  curEventQueue()->lock();
500  Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
501 
502  DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
503  "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
504  send_tick, send_delay, linkDelay, recv_tick);
505  // Every packet must be sent and arrive in the same quantum
506  assert(send_tick > primary->syncEvent->when() -
507  primary->syncEvent->repeat);
508  // No packet may be scheduled for receive in the arrival quantum
509  assert(send_tick + send_delay + linkDelay > primary->syncEvent->when());
510 
511  // Now we are about to schedule a recvDone event for the new data packet.
512  // We use the same recvDone object for all incoming data packets. Packet
513  // descriptors are saved in the ordered queue. The currently scheduled
514  // packet is always on the top of the queue.
515  // NOTE: we use the event queue lock to protect the receive desc queue,
516  // too, which is accessed both by the receiver thread and the simulation
517  // thread.
518  descQueue.emplace(new_packet, send_tick, send_delay);
    // Only the first queued packet schedules recvDone here; for later
    // packets popPacket() schedules it when their turn comes.
519  if (descQueue.size() == 1) {
520  assert(!recvDone->scheduled());
521  eventManager->schedule(recvDone, recv_tick);
522  } else {
523  assert(recvDone->scheduled());
524  panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
525  "Out of order packet received (recv_tick: %lu top(): %lu\n",
526  recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
527  }
528  curEventQueue()->unlock();
529 }
530 
/*
 * Body of popPacket() (return type and declarator lines are absent from
 * this listing). Removes the front descriptor and returns its packet; if
 * more descriptors remain, schedules recvDone for the new front using
 * curTick() as the previous receive tick. prevRecvTick is set to curTick().
 * The queue is not checked for emptiness before front() is read.
 */
533 {
534  // Note : this is called from the simulation thread when a receive done
535  // event is being processed for the link. We assume that the thread holds
536  // the event queue lock when this is called!
537  EthPacketPtr next_packet = descQueue.front().packet;
538  descQueue.pop();
539 
540  if (descQueue.size() > 0) {
541  Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
542  descQueue.front().sendDelay,
543  curTick());
544  eventManager->schedule(recvDone, recv_tick);
545  }
546  prevRecvTick = curTick();
547  return next_packet;
548 }
549 
/**
 * Checkpoint one receive descriptor: sendTick, sendDelay and the packet
 * (written under the name "rxPacket").
 */
550 void
552 {
553  SERIALIZE_SCALAR(sendTick);
554  SERIALIZE_SCALAR(sendDelay);
555  packet->serialize("rxPacket", cp);
556 }
557 
/**
 * Restore one receive descriptor: sendTick, sendDelay, and a freshly
 * allocated EthPacketData that is filled from "rxPacket".
 */
558 void
560 {
561  UNSERIALIZE_SCALAR(sendTick);
562  UNSERIALIZE_SCALAR(sendDelay);
563  packet = std::make_shared<EthPacketData>();
564  packet->unserialize("rxPacket", cp);
565 }
566 
/**
 * Checkpoint the receive scheduler (the declarator line is absent from this
 * listing): prevRecvTick, the number of queued descriptors, and each
 * descriptor in queue order as section "rxDesc_<index>".
 * Works on a copy of descQueue, so the live queue is left untouched.
 */
567 void
569 {
570  SERIALIZE_SCALAR(prevRecvTick);
571  // serialize the receive desc queue
572  std::queue<Desc> tmp_queue(descQueue);
573  unsigned n_desc_queue = descQueue.size();
574  assert(tmp_queue.size() == descQueue.size());
575  SERIALIZE_SCALAR(n_desc_queue);
    // The index has the same unsigned type as the bound it is compared to.
576  for (unsigned i = 0; i < n_desc_queue; i++) {
577  tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
578  tmp_queue.pop();
579  }
580  assert(tmp_queue.empty());
581 }
582 
/**
 * Restore the receive scheduler (the declarator line is absent from this
 * listing): prevRecvTick and the queued descriptors, read back from
 * sections "rxDesc_<index>" in order. Expects an empty queue, an
 * unscheduled recvDone and a clear ckptRestore flag on entry, and sets
 * ckptRestore when done.
 */
583 void
585 {
586  assert(descQueue.size() == 0);
587  assert(!recvDone->scheduled());
588  assert(!ckptRestore);
589 
590  UNSERIALIZE_SCALAR(prevRecvTick);
591  // unserialize the receive desc queue
592  unsigned n_desc_queue;
593  UNSERIALIZE_SCALAR(n_desc_queue);
    // The index has the same unsigned type as the bound it is compared to.
594  for (unsigned i = 0; i < n_desc_queue; i++) {
595  Desc recv_desc;
596  recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
597  descQueue.push(recv_desc);
598  }
599  ckptRestore = true;
600 }
601 
/**
 * Construct a dist interface. The first object created (primary still
 * null) becomes the primary: it records is_switch in isSwitch and
 * allocates the shared sync object (SyncSwitch when is_switch, SyncNode
 * otherwise) and the shared SyncEvent. Every object increments
 * distIfaceNum.
 *
 * @param dist_rank     stored in rank
 * @param dist_size     stored in size
 * @param sync_start    stored in syncStart
 * @param sync_repeat   stored in syncRepeat
 * @param em            event manager handed to recvScheduler
 * @param use_pseudo_op stored in syncStartOnPseudoOp
 * @param is_switch     selects the sync implementation (primary only)
 * @param num_nodes     passed to the SyncSwitch constructor
 */
602 DistIface::DistIface(unsigned dist_rank,
603  unsigned dist_size,
604  Tick sync_start,
605  Tick sync_repeat,
606  EventManager *em,
607  bool use_pseudo_op,
608  bool is_switch, int num_nodes) :
609  syncStart(sync_start), syncRepeat(sync_repeat),
610  recvThread(nullptr), recvScheduler(em), syncStartOnPseudoOp(use_pseudo_op),
611  rank(dist_rank), size(dist_size)
612 {
613  DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n",dist_rank);
614  isPrimary = false;
615  if (primary == nullptr) {
616  assert(sync == nullptr);
617  assert(syncEvent == nullptr);
618  isSwitch = is_switch;
619  if (is_switch)
620  sync = new SyncSwitch(num_nodes);
621  else
622  sync = new SyncNode();
623  syncEvent = new SyncEvent();
624  primary = this;
625  isPrimary = true;
626  }
    // NOTE(review): listing line 627 is missing here; presumably it assigns
    // distIfaceId from distIfaceNum -- confirm against the original source.
628  distIfaceNum++;
629 }
630 
/*
 * Destructor body (the declarator line is absent from this listing).
 * Joins and frees the receiver thread, then, when the last DistIface
 * object goes away, frees the shared syncEvent and sync objects. Clears
 * primary if this object was the primary.
 */
632 {
633  assert(recvThread);
634  recvThread->join();
635  delete recvThread;
    // Pre-decrement: the old post-decrement compared the count before it
    // was lowered, which is never 0 for a live object, so the shared
    // objects were never freed.
    // NOTE(review): this makes the two deletes below reachable for the
    // first time -- confirm that syncEvent may be deleted at this point.
636  if (--distIfaceNum == 0) {
637  assert(syncEvent);
638  delete syncEvent;
639  assert(sync);
640  delete sync;
641  }
642  if (this == primary)
643  primary = nullptr;
644 }
645 
/**
 * Send one Ethernet packet to the peers (the declarator line is absent from
 * this listing; the body reads pkt and send_delay). Fills a header with the
 * current tick, the send delay and the packet's length and simLength, then
 * passes header and packet to sendPacket().
 */
646 void
648 {
649  Header header;
650 
651  // Prepare a dist header packet for the Ethernet packet we want to
652  // send out.
    // NOTE(review): listing line 653 is missing here; presumably it sets
    // header.msgType -- confirm against the original source.
654  header.sendTick = curTick();
655  header.sendDelay = send_delay;
656 
657  header.dataPacketLength = pkt->length;
658  header.simLength = pkt->simLength;
659 
660  // Send out the packet and the meta info.
661  sendPacket(header, pkt);
662 
663  DPRINTF(DistEthernetPkt,
664  "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
665  pkt->length, send_delay);
666 }
667 
/**
 * Body of a receiver thread. Initialises the receive scheduler and then
 * loops on recvHeader(): data descriptors are fetched with recvPacket() and
 * queued with pushPacket(); every other message is passed to
 * sync->progress(). The loop ends when recvHeader() fails (an exit is
 * requested and the sync is aborted) or when progress() returns false.
 *
 * @param recv_done  event scheduled for each received data packet
 * @param link_delay link delay handed to the receive scheduler
 */
668 void
669 DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
670 {
671  EthPacketPtr new_packet;
    // NOTE(review): listing line 672 is missing here; presumably it
    // declares the header object used below -- confirm.
673 
674  // Initialize receive scheduler parameters
675  recvScheduler.init(recv_done, link_delay);
676 
677  // Main loop to wait for and process any incoming message.
678  for (;;) {
679  // recvHeader() blocks until the next dist header packet comes in.
680  if (!recvHeader(header)) {
681  // We lost connection to the peer gem5 processes most likely
682  // because one of them called m5 exit. So we stop here.
683  // Grab the eventq lock to stop the simulation thread
684  curEventQueue()->lock();
685  exitSimLoop("connection to gem5 peer got closed");
686  curEventQueue()->unlock();
687  // The simulation thread may be blocked in processing an on-going
688  // global synchronisation. Abort the sync to give the simulation
689  // thread a chance to make progress and process the exit event.
690  sync->abort();
691  // Finish receiver thread
692  break;
693  }
694 
695  // We got a valid dist header packet, let's process it
696  if (header.msgType == MsgType::dataDescriptor) {
697  recvPacket(header, new_packet);
698  recvScheduler.pushPacket(new_packet,
699  header.sendTick,
700  header.sendDelay);
701  } else {
702  // everything else must be synchronisation related command
703  if (!sync->progress(header.sendTick,
704  header.syncRepeat,
705  header.needCkpt,
706  header.needExit,
707  header.needStopSync))
708  // Finish receiver thread if simulation is about to exit
709  break;
710  }
711  }
712 }
713 
/**
 * Start the receiver thread for this interface, running recvThreadFunc()
 * on this object, and increment recvThreadsNum. Must be called at most once
 * per object (recvThread has to be null on entry).
 *
 * @param recv_done  receive-done event; constness is cast away because
 *                   recvThreadFunc() takes a non-const Event pointer
 * @param link_delay link delay forwarded to the thread function
 */
714 void
715 DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
716 {
717  assert(recvThread == nullptr);
718 
719  recvThread = new std::thread(&DistIface::recvThreadFunc,
720  this,
721  const_cast<Event *>(recv_done),
722  link_delay);
723  recvThreadsNum++;
724 }
725 
/*
 * Body of drain() (return type and declarator lines are absent from this
 * listing). The primary interface marks the sync event as draining; every
 * interface reports DrainState::Drained straight away.
 */
728 {
729  DPRINTF(DistEthernet,"DistIFace::drain() called\n");
730  // This can be called multiple times in the same drain cycle.
731  if (this == primary)
732  syncEvent->draining(true);
733  return DrainState::Drained;
734 }
735 
/**
 * Leave the drained state: the primary interface clears the draining flag
 * of the sync event.
 * NOTE(review): listing lines 737 (declarator and opening brace) and 741
 * are missing from this listing; line 741 presumably resumes the receive
 * scheduler -- confirm against the original source.
 */
736 void
738  DPRINTF(DistEthernet,"DistIFace::drainResume() called\n");
739  if (this == primary)
740  syncEvent->draining(false);
742 }
743 
/**
 * Checkpoint this interface (the declarator line is absent from this
 * listing). Calls sync->drainComplete() first, then writes rank and
 * distIfaceId (as rank_orig / dist_iface_id_orig), the receive scheduler
 * section and, on the primary only, the "Sync" section.
 */
744 void
746 {
747  // Drain the dist interface before the checkpoint is taken. We cannot call
748  // this as part of the normal drain cycle because this dist sync has to be
749  // called exactly once after the system is fully drained.
750  sync->drainComplete();
751 
752  unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;
753 
754  SERIALIZE_SCALAR(rank_orig);
755  SERIALIZE_SCALAR(dist_iface_id_orig);
756 
757  recvScheduler.serializeSection(cp, "recvScheduler");
758  if (this == primary) {
759  sync->serializeSection(cp, "Sync");
760  }
761 }
762 
/**
 * Restore this interface from a checkpoint. Panics if the stored rank or
 * interface id differs from the current one, then restores the receive
 * scheduler section and, on the primary only, the "Sync" section.
 */
763 void
765 {
766  unsigned rank_orig, dist_iface_id_orig;
767  UNSERIALIZE_SCALAR(rank_orig);
768  UNSERIALIZE_SCALAR(dist_iface_id_orig);
769 
770  panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
771  rank, rank_orig);
772  panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
773  "at resume (distIfaceId=%d, orig=%d)", distIfaceId,
774  dist_iface_id_orig);
775 
776  recvScheduler.unserializeSection(cp, "recvScheduler");
777  if (this == primary) {
778  sync->unserializeSection(cp, "Sync");
779  }
780 }
781 
/**
 * Bring up the interface: initialise the transport, spawn the receiver
 * thread and, on the primary, seed random_mt with a rank-dependent value.
 *
 * @param done_event receive-done event passed to the receiver thread
 * @param link_delay link delay passed to the receiver thread
 */
782 void
783 DistIface::init(const Event *done_event, Tick link_delay)
784 {
785  // Init hook for the underlying message transport to setup/finalize
786  // communication channels
787  initTransport();
788 
789  // Spawn a new receiver thread that will process messages
790  // coming in from peer gem5 processes.
791  // The receive thread will also schedule a (receive) doneEvent
792  // for each incoming data packet.
793  spawnRecvThread(done_event, link_delay);
794 
795 
796  // Adjust the periodic sync start and interval. Different DistIface
797  // might have different requirements. The singleton sync object
798  // will select the minimum values for both params.
799  assert(sync != nullptr);
    // NOTE(review): listing line 800 is missing here; presumably it calls
    // sync->init() with syncStart and syncRepeat -- confirm.
801 
802  // Initialize the seed for random generator to avoid the same sequence
803  // in all gem5 peer processes
804  assert(primary != nullptr);
805  if (this == primary)
806  random_mt.init(5489 * (rank+1) + 257);
807 }
808 
/**
 * Start-up hook (the declarator line is absent from this listing). The
 * primary interface starts the periodic sync event here, except on a
 * non-switch node that is configured to start syncing on a pseudo-op.
 */
809 void
811 {
812  DPRINTF(DistEthernet, "DistIface::startup() started\n");
813  // Schedule synchronization unless we are not a switch in pseudo_op mode.
814  if (this == primary && (!syncStartOnPseudoOp || isSwitch))
815  syncEvent->start();
816  DPRINTF(DistEthernet, "DistIface::startup() done\n");
817 }
818 
/**
 * Hook for an m5 checkpoint request (the declarator line is absent from
 * this listing; the body reads delay and period). In a dist run (primary
 * set) it logs which kind of checkpoint is triggered and returns false; a
 * non-zero period is reported as ignored. Without a primary it returns
 * true.
 */
819 bool
821 {
822  bool ret = true;
823  DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
824  "period:%lu\n", delay, period);
825  if (primary) {
826  if (delay == 0) {
827  inform("m5 checkpoint called with zero delay => triggering collaborative "
828  "checkpoint\n");
    // NOTE(review): listing line 829 is missing here; presumably it files
    // a collective checkpoint request with the sync object -- confirm.
830  } else {
831  inform("m5 checkpoint called with non-zero delay => triggering immediate "
832  "checkpoint (at the next sync)\n");
    // NOTE(review): listing line 833 is missing here; presumably it files
    // an immediate checkpoint request with the sync object -- confirm.
834  }
835  if (period != 0)
836  inform("Non-zero period for m5_ckpt is ignored in "
837  "distributed gem5 runs\n");
838  ret = false;
839  }
840  return ret;
841 }
842 
/**
 * Record a stop-sync request: needStopSync is overwritten with req under
 * the sync lock (the declarator line is absent from this listing). Unlike
 * the checkpoint and exit requests, an earlier value is not examined.
 */
843 void
845 {
846  std::lock_guard<std::mutex> sync_lock(lock);
847  needStopSync = req;
848 }
849 
/**
 * Toggle the periodic sync on or off (the declarator line is absent from
 * this listing; the body reads a ThreadContext pointer named tc).
 * If the sync event is scheduled, sync is being turned off and all Active
 * thread contexts are quiesced. Otherwise the sync event is started and
 * all Active thread contexts are quiesced until one tick after the next
 * sync. primary is dereferenced without a null check.
 */
850 void
852 {
853  // Unfortunate that we have to populate the system pointer member this way.
854  primary->sys = tc->getSystemPtr();
855 
856  // The invariant for both syncing and "unsyncing" is that all threads will
857  // stop executing instructions until the desired sync state has been reached
858  // for all nodes. This is the easiest way to prevent deadlock (in the case
859  // of "unsyncing") and causality errors (in the case of syncing).
860  if (primary->syncEvent->scheduled()) {
861  inform("Request toggling syncronization off\n");
    // NOTE(review): listing line 862 is missing here; presumably it files
    // the stop-sync request with the sync object -- confirm.
863 
864  // At this point, we have no clue when everyone will reach the sync
865  // stop point. Suspend execution of all local thread contexts.
866  // Dist-gem5 will reactivate all thread contexts when everyone has
867  // reached the sync stop point.
868 #if THE_ISA != NULL_ISA
    // The loop variable tc shadows the tc read at the top of the function.
869  for (auto *tc: primary->sys->threads) {
870  if (tc->status() == ThreadContext::Active)
871  tc->quiesce();
872  }
873 #endif
874  } else {
875  inform("Request toggling syncronization on\n");
876  primary->syncEvent->start();
877 
878  // We need to suspend all CPUs until the sync point is reached by all
879  // nodes to prevent causality errors. We can also schedule CPU
880  // activation here, since we know exactly when the next sync will
881  // occur.
882 #if THE_ISA != NULL_ISA
883  for (auto *tc: primary->sys->threads) {
884  if (tc->status() == ThreadContext::Active)
885  tc->quiesceTick(primary->syncEvent->when() + 1);
886  }
887 #endif
888  }
889 }
890 
/**
 * Hook for an m5 exit request (the declarator line is absent from this
 * listing; the body reads delay). In a dist run (primary set) it starts
 * the periodic sync if it is not scheduled, logs which kind of exit is
 * triggered and returns false. Without a primary it returns true.
 */
891 bool
893 {
894  bool ret = true;
895  DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
896  delay);
897  if (primary) {
898  // To successfully coordinate an exit, all nodes must be synchronising
899  if (!primary->syncEvent->scheduled())
900  primary->syncEvent->start();
901 
902  if (delay == 0) {
903  inform("m5 exit called with zero delay => triggering collaborative "
904  "exit\n");
    // NOTE(review): listing line 905 is missing here; presumably it files
    // a collective exit request with the sync object -- confirm.
906  } else {
907  inform("m5 exit called with non-zero delay => triggering immediate "
908  "exit (at the next sync)\n");
    // NOTE(review): listing line 909 is missing here; presumably it files
    // an immediate exit request with the sync object -- confirm.
910  }
911  ret = false;
912  }
913  return ret;
914 }
915 
/**
 * Return the rank of the primary interface, or 0 (with a warning) when
 * there is no primary, i.e. in a single gem5 simulation. The declarator
 * line is absent from this listing.
 */
916 uint64_t
918 {
919  uint64_t val;
920  if (primary) {
921  val = primary->rank;
922  } else {
923  warn("Dist-rank parameter is queried in single gem5 simulation.");
924  val = 0;
925  }
926  return val;
927 }
928 
/**
 * Return the size recorded by the primary interface, or 1 (with a warning)
 * when there is no primary, i.e. in a single gem5 simulation. The
 * declarator line is absent from this listing.
 */
929 uint64_t
931 {
932  uint64_t val;
933  if (primary) {
934  val = primary->size;
935  } else {
936  warn("Dist-size parameter is queried in single gem5 simulation.");
937  val = 1;
938  }
939  return val;
940 }
DistIface::unserialize
void unserialize(CheckpointIn &cp) override
Unserialize an object.
Definition: dist_iface.cc:764
DistIface::distIfaceNum
static unsigned distIfaceNum
Number of DistIface objects (i.e.
Definition: dist_iface.hh:488
DistIface::Sync::drainComplete
void drainComplete()
Definition: dist_iface.cc:290
DistIface::Sync::requestExit
virtual void requestExit(ReqType req)=0
Serializable::unserializeSection
void unserializeSection(CheckpointIn &cp, const char *name)
Unserialize a child object.
Definition: serialize.cc:178
DistIface::recvThreadsNum
static unsigned recvThreadsNum
Number of receiver threads (in this gem5 process)
Definition: dist_iface.hh:500
DistIface::DistIface
DistIface(unsigned dist_rank, unsigned dist_size, Tick sync_start, Tick sync_repeat, EventManager *em, bool use_pseudo_op, bool is_switch, int num_nodes)
ctor
Definition: dist_iface.cc:602
warn
#define warn(...)
Definition: logging.hh:239
DistIface
The interface class to talk to peer gem5 processes.
Definition: dist_iface.hh:99
EventQueue::unlock
void unlock()
Definition: eventq.hh:948
system.hh
DistIface::Sync
Definition: dist_iface.hh:116
UNSERIALIZE_SCALAR
#define UNSERIALIZE_SCALAR(scalar)
Definition: serialize.hh:797
ThreadContext::quiesce
void quiesce()
Quiesce thread context.
Definition: thread_context.cc:129
DistIface::Sync::abort
void abort()
Abort processing an on-going sync event (in case of an error, e.g.
Definition: dist_iface.cc:85
DistIface::SyncNode::run
bool run(bool same_tick) override
Core method to perform a full dist sync.
Definition: dist_iface.cc:124
warn_once
#define warn_once(...)
Definition: logging.hh:243
ArmISA::i
Bitfield< 7 > i
Definition: miscregs_types.hh:63
DistIface::SyncEvent::process
void process() override
This is a global event so process() will only be called by exactly one simulation thread.
Definition: dist_iface.cc:365
DistIface::recvThreadFunc
void recvThreadFunc(Event *recv_done, Tick link_delay)
The function executed by a receiver thread.
Definition: dist_iface.cc:669
DistIface::primary
static DistIface * primary
The very first DistIface object created becomes the primary interface.
Definition: dist_iface.hh:513
DistIface::SyncNode::serialize
void serialize(CheckpointOut &cp) const override
Serialize an object.
Definition: dist_iface.cc:304
DistIface::SyncNode::progress
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
Definition: dist_iface.cc:240
DistIface::drain
DrainState drain() override
Draining is the process of clearing out the states of SimObjects. These are the SimObjects that are pa...
Definition: dist_iface.cc:727
random.hh
X86ISA::lock
Bitfield< 5 > lock
Definition: types.hh:77
DistIface::RecvScheduler::Desc::unserialize
void unserialize(CheckpointIn &cp) override
Unserialize an object.
Definition: dist_iface.cc:559
Tick
uint64_t Tick
Tick count type.
Definition: types.hh:63
DistIface::RecvScheduler::resumeRecvTicks
void resumeRecvTicks()
Adjust receive ticks for pending packets when restoring from a checkpoint.
Definition: dist_iface.cc:461
DistIface::toggleSync
static void toggleSync(ThreadContext *tc)
Trigger the primary to start/stop synchronization.
Definition: dist_iface.cc:851
DistIface::RecvScheduler::Desc::serialize
void serialize(CheckpointOut &cp) const override
Serialize an object.
Definition: dist_iface.cc:551
DistIface::sys
static System * sys
System pointer used to wakeup sleeping threads when stopping sync.
Definition: dist_iface.hh:517
header
output header
Definition: nop.cc:36
std::vector
STL vector class.
Definition: stl.hh:37
DistIface::RecvScheduler::popPacket
EthPacketPtr popPacket()
Fetch the next packet that is to be received by the simulated network link.
Definition: dist_iface.cc:532
DistIface::SyncSwitch::serialize
void serialize(CheckpointOut &cp) const override
Serialize an object.
Definition: dist_iface.cc:319
DistIface::SyncNode
Definition: dist_iface.hh:203
Serializable::serializeSection
void serializeSection(CheckpointOut &cp, const char *name) const
Serialize an object into a new section.
Definition: serialize.cc:171
sim_exit.hh
Stats::none
const FlagsType none
Nothing extra to print.
Definition: info.hh:43
DistHeaderPkt::ReqType
ReqType
Definition: dist_packet.hh:65
DistHeaderPkt::Header
Definition: dist_packet.hh:77
EventQueue::ScopedRelease
Definition: eventq.hh:709
DistIface::SyncSwitch::progress
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
Definition: dist_iface.cc:199
DistIface::SyncSwitch::run
bool run(bool same_tick) override
Core method to perform a full dist sync.
Definition: dist_iface.cc:156
DistIface::Sync::nextRepeat
Tick nextRepeat
The repeat value for the next periodic sync.
Definition: dist_iface.hh:149
DistHeaderPkt::MsgType::dataDescriptor
@ dataDescriptor
DrainState::Drained
@ Drained
Buffers drained, ready for serialization/handover.
DistIface::RecvScheduler::unserialize
void unserialize(CheckpointIn &cp) override
Unserialize an object.
Definition: dist_iface.cc:584
DrainState
DrainState
Object drain/handover states.
Definition: drain.hh:71
random_mt
Random random_mt
Definition: random.cc:96
DistIface::init
void init(const Event *e, Tick link_delay)
Definition: dist_iface.cc:783
DistIface::size
unsigned size
The number of gem5 processes comprising this dist simulation.
Definition: dist_iface.hh:484
BaseGlobalEvent::when
Tick when() const
Definition: global_event.hh:136
DistIface::SyncNode::needStopSync
ReqType needStopSync
Sync stop requested.
Definition: dist_iface.hh:217
DistIface::SyncEvent
The global event to schedule periodic dist sync.
Definition: dist_iface.hh:295
DistIface::initTransport
virtual void initTransport()=0
Init hook for the underlying transport.
cp
Definition: cprintf.cc:40
DistIface::RecvScheduler::Desc
Received packet descriptor.
Definition: dist_iface.hh:339
DistIface::SyncNode::requestStopSync
void requestStopSync(ReqType req) override
Definition: dist_iface.cc:844
DistIface::rankParam
static uint64_t rankParam()
Getter for the dist rank param.
Definition: dist_iface.cc:917
ThreadContext
ThreadContext is the external interface to all thread state for anything outside of the CPU.
Definition: thread_context.hh:88
Random::init
void init(uint32_t s)
Definition: random.cc:64
DistHeaderPkt::ReqType::immediate
@ immediate
Event
Definition: eventq.hh:246
sim_object.hh
System
Definition: system.hh:73
DPRINTF
#define DPRINTF(x,...)
Definition: trace.hh:234
DistIface::SyncNode::unserialize
void unserialize(CheckpointIn &cp) override
Unserialize an object.
Definition: dist_iface.cc:311
DistIface::Sync::progress
virtual bool progress(Tick send_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync)=0
Callback when the receiver thread gets a sync ack message.
ArmISA::d
Bitfield< 9 > d
Definition: miscregs_types.hh:60
DistIface::isSwitch
static bool isSwitch
Is this node a switch?
Definition: dist_iface.hh:521
DistHeaderPkt::ReqType::collective
@ collective
MipsISA::sr
Bitfield< 20 > sr
Definition: pra_constants.hh:116
DistIface::SyncNode::requestCkpt
void requestCkpt(ReqType req) override
Definition: dist_iface.cc:268
DistIface::rank
unsigned rank
The rank of this process among the gem5 peers.
Definition: dist_iface.hh:480
dist_iface.hh
DistIface::RecvScheduler::calcReceiveTick
Tick calcReceiveTick(Tick send_tick, Tick send_delay, Tick prev_recv_tick)
Calculate the tick to schedule the next receive done event.
Definition: dist_iface.cc:441
DistIface::drainResume
void drainResume() override
Resume execution after a successful drain.
Definition: dist_iface.cc:737
DistIface::~DistIface
virtual ~DistIface()
Definition: dist_iface.cc:631
DistIface::sendCmd
virtual void sendCmd(const Header &header)=0
Send out a control command to the remote end.
exitSimLoop
void exitSimLoop(const std::string &message, int exit_code, Tick when, Tick repeat, bool serialize)
Schedule an event to exit the simulation loop (returning to Python) at the end of the current cycle (...
Definition: sim_events.cc:88
X86ISA::em
Bitfield< 2 > em
Definition: misc.hh:602
curEventQueue
EventQueue * curEventQueue()
Definition: eventq.hh:83
DistIface::isPrimary
bool isPrimary
Definition: dist_iface.hh:494
DistIface::SyncNode::SyncNode
SyncNode()
Definition: dist_iface.cc:109
DistIface::syncEvent
static SyncEvent * syncEvent
The singleton SyncEvent object to schedule periodic dist sync.
Definition: dist_iface.hh:508
DistIface::RecvScheduler::init
void init(Event *recv_done, Tick link_delay)
Initialize network link parameters.
Definition: dist_iface.cc:429
DistIface::recvThread
std::thread * recvThread
Receiver thread pointer.
Definition: dist_iface.hh:466
X86ISA::val
Bitfield< 63 > val
Definition: misc.hh:769
DistIface::sendPacket
virtual void sendPacket(const Header &header, const EthPacketPtr &packet)=0
Send out a data packet to the remote end.
ThreadContext::status
virtual Status status() const =0
DistIface::SyncEvent::start
void start()
Schedule the first periodic sync event.
Definition: dist_iface.cc:331
DistIface::Sync::requestStopSync
virtual void requestStopSync(ReqType req)=0
ThreadContext::quiesceTick
void quiesceTick(Tick resume)
Quiesce, suspend, and schedule activate at resume.
Definition: thread_context.cc:136
SERIALIZE_SCALAR
#define SERIALIZE_SCALAR(scalar)
Definition: serialize.hh:790
DistIface::readyToExit
static bool readyToExit(Tick delay)
Initiate the exit from the simulation.
Definition: dist_iface.cc:892
DistIface::SyncSwitch::unserialize
void unserialize(CheckpointIn &cp) override
Unserialize an object.
Definition: dist_iface.cc:325
DistIface::sync
static Sync * sync
The singleton Sync object to perform dist synchronisation.
Definition: dist_iface.hh:504
EventQueue::lock
void lock()
Provide an interface for locking/unlocking the event queue.
Definition: eventq.hh:947
DistIface::syncStartOnPseudoOp
bool syncStartOnPseudoOp
Use pseudoOp to start synchronization.
Definition: dist_iface.hh:474
EthPacketPtr
std::shared_ptr< EthPacketData > EthPacketPtr
Definition: etherpkt.hh:87
DistIface::SyncNode::requestExit
void requestExit(ReqType req) override
Definition: dist_iface.cc:279
panic_if
#define panic_if(cond,...)
Conditional panic macro that checks the supplied condition and only panics if the condition is true a...
Definition: logging.hh:197
System::threads
Threads threads
Definition: system.hh:309
ThreadContext::Suspended
@ Suspended
Temporarily inactive.
Definition: thread_context.hh:106
inform
#define inform(...)
Definition: logging.hh:240
BaseGlobalEvent::scheduled
bool scheduled() const
Definition: global_event.hh:126
DistIface::Sync::doStopSync
bool doStopSync
Flag is set if sync is to stop upon sync completion.
Definition: dist_iface.hh:145
std
Overload hash function for BasicBlockRange type.
Definition: vec_reg.hh:587
DistIface::Sync::init
void init(Tick start, Tick repeat)
Initialize periodic sync params.
Definition: dist_iface.cc:67
DistIface::RecvScheduler::pushPacket
void pushPacket(EthPacketPtr new_packet, Tick send_tick, Tick send_delay)
Push a newly arrived packet into the desc queue.
Definition: dist_iface.cc:494
DistIface::readyToCkpt
static bool readyToCkpt(Tick delay, Tick period)
Initiate taking a checkpoint.
Definition: dist_iface.cc:820
DistIface::SyncSwitch
Definition: dist_iface.hh:238
ThreadContext::Active
@ Active
Running.
Definition: thread_context.hh:102
etherpkt.hh
DistIface::spawnRecvThread
void spawnRecvThread(const Event *recv_done, Tick link_delay)
Spawn the receiver thread.
Definition: dist_iface.cc:715
DistIface::startup
void startup()
Definition: dist_iface.cc:810
DistIface::distIfaceId
unsigned distIfaceId
Unique id for the dist link.
Definition: dist_iface.hh:492
CheckpointOut
std::ostream CheckpointOut
Definition: serialize.hh:63
DistIface::recvPacket
virtual void recvPacket(const Header &header, EthPacketPtr &packet)=0
Receive a packet from the remote end.
DistIface::Sync::requestCkpt
virtual void requestCkpt(ReqType req)=0
EventManager
Definition: eventq.hh:973
trace.hh
DistIface::SyncEvent::draining
bool draining() const
Definition: dist_iface.hh:321
DistIface::serialize
void serialize(CheckpointOut &cp) const override
Serialize an object.
Definition: dist_iface.cc:745
DistIface::recvHeader
virtual bool recvHeader(Header &header)=0
Receive a header (i.e. meta info describing a data packet or a sync command) from the remote end.
DistIface::Sync::lock
std::mutex lock
The lock to protect access to the Sync object.
Definition: dist_iface.hh:122
CheckpointIn
Definition: serialize.hh:67
DistIface::packetOut
void packetOut(EthPacketPtr pkt, Tick send_delay)
Send out an Ethernet packet.
Definition: dist_iface.cc:647
DistIface::recvScheduler
RecvScheduler recvScheduler
Meta information about data packets received.
Definition: dist_iface.hh:470
DistIface::SyncSwitch::SyncSwitch
SyncSwitch(int num_nodes)
Definition: dist_iface.cc:94
DistIface::sizeParam
static uint64_t sizeParam()
Getter for the dist size param.
Definition: dist_iface.cc:930
csprintf
std::string csprintf(const char *format, const Args &...args)
Definition: cprintf.hh:158
ArmISA::v
Bitfield< 28 > v
Definition: miscregs_types.hh:51
thread_context.hh
ThreadContext::getSystemPtr
virtual System * getSystemPtr()=0
DistIface::syncStart
Tick syncStart
Tick to schedule the first dist sync event.
Definition: dist_iface.hh:457
panic
#define panic(...)
This implements a cprintf based panic() function.
Definition: logging.hh:171
DistIface::syncRepeat
Tick syncRepeat
Frequency of dist sync events in ticks.
Definition: dist_iface.hh:461
curTick
Tick curTick()
The current simulated tick.
Definition: core.hh:45
DistIface::RecvScheduler::serialize
void serialize(CheckpointOut &cp) const override
Serialize an object.
Definition: dist_iface.cc:568

Generated on Wed Sep 30 2020 14:02:11 for gem5 by doxygen 1.8.17