gem5  v20.0.0.0
dist_iface.cc
1 /*
2  * Copyright (c) 2015-2016 ARM Limited
3  * All rights reserved
4  *
5  * The license below extends only to copyright in the software and shall
6  * not be construed as granting a license to any other intellectual
7  * property including but not limited to intellectual property relating
8  * to a hardware implementation of the functionality of the software
9  * licensed hereunder. You may use the software subject to the license
10  * terms below provided that you ensure that this notice is replicated
11  * unmodified and in its entirety in all distributions of the software,
12  * modified or unmodified, in source code or in binary form.
13  *
14  * Redistribution and use in source and binary forms, with or without
15  * modification, are permitted provided that the following conditions are
16  * met: redistributions of source code must retain the above copyright
17  * notice, this list of conditions and the following disclaimer;
18  * redistributions in binary form must reproduce the above copyright
19  * notice, this list of conditions and the following disclaimer in the
20  * documentation and/or other materials provided with the distribution;
21  * neither the name of the copyright holders nor the names of its
22  * contributors may be used to endorse or promote products derived from
23  * this software without specific prior written permission.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36  */
37 
38 /* @file
39  * The interface class for dist-gem5 simulations.
40  */
41 
42 #include "dev/net/dist_iface.hh"
43 
44 #include <queue>
45 #include <thread>
46 
47 #include "base/random.hh"
48 #include "base/trace.hh"
49 #include "cpu/thread_context.hh"
50 #include "debug/DistEthernet.hh"
51 #include "debug/DistEthernetPkt.hh"
52 #include "dev/net/etherpkt.hh"
53 #include "sim/sim_exit.hh"
54 #include "sim/sim_object.hh"
55 #include "sim/system.hh"
56 
57 using namespace std;
58 DistIface::Sync *DistIface::sync = nullptr;
59 System *DistIface::sys = nullptr;
60 DistIface::SyncEvent *DistIface::syncEvent = nullptr;
61 unsigned DistIface::distIfaceNum = 0;
62 unsigned DistIface::recvThreadsNum = 0;
63 DistIface *DistIface::master = nullptr;
64 bool DistIface::isSwitch = false;
65 
66 void
67 DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
68 {
69  if (start_tick < nextAt) {
70  nextAt = start_tick;
71  inform("Next dist synchronisation tick is changed to %lu.\n", nextAt);
72  }
73 
74  if (repeat_tick == 0)
75  panic("Dist synchronisation interval must be greater than zero");
76 
77  if (repeat_tick < nextRepeat) {
78  nextRepeat = repeat_tick;
79  inform("Dist synchronisation interval is changed to %lu.\n",
80  nextRepeat);
81  }
82 }
83 
84 void
85 DistIface::Sync::abort()
86 {
87  std::unique_lock<std::mutex> sync_lock(lock);
88  waitNum = 0;
89  isAbort = true;
90  sync_lock.unlock();
91  cv.notify_one();
92 }
93 
94 DistIface::SyncSwitch::SyncSwitch(int num_nodes)
95 {
96  numNodes = num_nodes;
97  waitNum = num_nodes;
98  numExitReq = 0;
99  numCkptReq = 0;
100  numStopSyncReq = 0;
101  doExit = false;
102  doCkpt = false;
103  doStopSync = false;
104  nextAt = std::numeric_limits<Tick>::max();
105  nextRepeat = std::numeric_limits<Tick>::max();
106  isAbort = false;
107 }
108 
109 DistIface::SyncNode::SyncNode()
110 {
111  waitNum = 0;
112  needExit = ReqType::none;
113  needCkpt = ReqType::none;
114  needStopSync = ReqType::none;
115  doExit = false;
116  doCkpt = false;
117  doStopSync = false;
118  nextAt = std::numeric_limits<Tick>::max();
119  nextRepeat = std::numeric_limits<Tick>::max();
120  isAbort = false;
121 }
122 
123 bool
124 DistIface::SyncNode::run(bool same_tick)
125 {
126  std::unique_lock<std::mutex> sync_lock(lock);
127  Header header;
128 
129  assert(waitNum == 0);
130  assert(!isAbort);
131  waitNum = DistIface::recvThreadsNum;
132  // initiate the global synchronisation
133  header.msgType = MsgType::cmdSyncReq;
134  header.sendTick = curTick();
135  header.syncRepeat = nextRepeat;
136  header.needCkpt = needCkpt;
137  header.needStopSync = needStopSync;
138  if (needCkpt != ReqType::none)
139  needCkpt = ReqType::pending;
140  header.needExit = needExit;
141  if (needExit != ReqType::none)
142  needExit = ReqType::pending;
143  if (needStopSync != ReqType::none)
144  needStopSync = ReqType::pending;
145  DistIface::master->sendCmd(header);
146  // now wait until all receiver threads complete the synchronisation
147  auto lf = [this]{ return waitNum == 0; };
148  cv.wait(sync_lock, lf);
149  // global synchronisation is done.
150  assert(isAbort || !same_tick || (nextAt == curTick()));
151  return !isAbort;
152 }
153 
154 
155 bool
156 DistIface::SyncSwitch::run(bool same_tick)
157 {
158  std::unique_lock<std::mutex> sync_lock(lock);
159  Header header;
160  // Wait for the sync requests from the nodes
161  if (waitNum > 0) {
162  auto lf = [this]{ return waitNum == 0; };
163  cv.wait(sync_lock, lf);
164  }
165  assert(waitNum == 0);
166  if (isAbort) // sync aborted
167  return false;
168  assert(!same_tick || (nextAt == curTick()));
169  waitNum = numNodes;
170  // Complete the global synchronisation
171  header.msgType = MsgType::cmdSyncAck;
172  header.sendTick = nextAt;
173  header.syncRepeat = nextRepeat;
174  if (doCkpt || numCkptReq == numNodes) {
175  doCkpt = true;
176  header.needCkpt = ReqType::immediate;
177  numCkptReq = 0;
178  } else {
179  header.needCkpt = ReqType::none;
180  }
181  if (doExit || numExitReq == numNodes) {
182  doExit = true;
183  header.needExit = ReqType::immediate;
184  } else {
185  header.needExit = ReqType::none;
186  }
187  if (doStopSync || numStopSyncReq == numNodes) {
188  doStopSync = true;
189  numStopSyncReq = 0;
190  header.needStopSync = ReqType::immediate;
191  } else {
192  header.needStopSync = ReqType::none;
193  }
194  DistIface::master->sendCmd(header);
195  return true;
196 }
197 
198 bool
199 DistIface::SyncSwitch::progress(Tick send_tick,
200  Tick sync_repeat,
201  ReqType need_ckpt,
202  ReqType need_exit,
203  ReqType need_stop_sync)
204 {
205  std::unique_lock<std::mutex> sync_lock(lock);
206  if (isAbort) // sync aborted
207  return false;
208  assert(waitNum > 0);
209 
210  if (send_tick > nextAt)
211  nextAt = send_tick;
212  if (nextRepeat > sync_repeat)
213  nextRepeat = sync_repeat;
214 
215  if (need_ckpt == ReqType::collective)
216  numCkptReq++;
217  else if (need_ckpt == ReqType::immediate)
218  doCkpt = true;
219  if (need_exit == ReqType::collective)
220  numExitReq++;
221  else if (need_exit == ReqType::immediate)
222  doExit = true;
223  if (need_stop_sync == ReqType::collective)
224  numStopSyncReq++;
225  else if (need_stop_sync == ReqType::immediate)
226  doStopSync = true;
227 
228  waitNum--;
229  // Notify the simulation thread if the on-going sync is complete
230  if (waitNum == 0) {
231  sync_lock.unlock();
232  cv.notify_one();
233  }
234  // The receive thread must stay alive in the switch until the node
235  // closes the connection. Thus, we always return true here.
236  return true;
237 }
238 
239 bool
240 DistIface::SyncNode::progress(Tick max_send_tick,
241  Tick next_repeat,
242  ReqType do_ckpt,
243  ReqType do_exit,
244  ReqType do_stop_sync)
245 {
246  std::unique_lock<std::mutex> sync_lock(lock);
247  if (isAbort) // sync aborted
248  return false;
249  assert(waitNum > 0);
250 
251  nextAt = max_send_tick;
252  nextRepeat = next_repeat;
253  doCkpt = (do_ckpt != ReqType::none);
254  doExit = (do_exit != ReqType::none);
255  doStopSync = (do_stop_sync != ReqType::none);
256 
257  waitNum--;
258  // Notify the simulation thread if the on-going sync is complete
259  if (waitNum == 0) {
260  sync_lock.unlock();
261  cv.notify_one();
262  }
263  // The receive thread must finish when simulation is about to exit
264  return !doExit;
265 }
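
The run()/progress() pairs above are essentially a countdown barrier: the caller of run() arms waitNum and blocks on the condition variable, while each receiver thread decrements the counter in progress() and wakes the waiter once it reaches zero. The following standalone sketch models that handshake; it is illustrative only, and CountdownBarrier and its members are made-up names, not gem5 API.

#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

// Illustrative stand-in for the waitNum/cv handshake in Sync::run()/progress().
class CountdownBarrier
{
    std::mutex lock;
    std::condition_variable cv;
    unsigned waitNum;

  public:
    explicit CountdownBarrier(unsigned peers) : waitNum(peers) {}

    // Simulation-thread side: block until every peer has checked in.
    void run()
    {
        std::unique_lock<std::mutex> sync_lock(lock);
        cv.wait(sync_lock, [this]{ return waitNum == 0; });
    }

    // Receiver-thread side: one peer checked in; wake the waiter when the
    // last one arrives (mirrors the end of Sync::progress()).
    void progress()
    {
        std::unique_lock<std::mutex> sync_lock(lock);
        if (--waitNum == 0) {
            sync_lock.unlock();
            cv.notify_one();
        }
    }
};

int main()
{
    const unsigned peers = 3;
    CountdownBarrier barrier(peers);
    std::vector<std::thread> receivers;
    for (unsigned i = 0; i < peers; i++)
        receivers.emplace_back([&barrier]{ barrier.progress(); });
    barrier.run();                    // returns once all peers have checked in
    for (auto &t : receivers)
        t.join();
    std::puts("sync complete");
    return 0;
}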
266 
267 void
268 DistIface::SyncNode::requestCkpt(ReqType req)
269 {
270  std::lock_guard<std::mutex> sync_lock(lock);
271  assert(req != ReqType::none);
272  if (needCkpt != ReqType::none)
273  warn("Ckpt requested multiple times (req:%d)\n", static_cast<int>(req));
274  if (needCkpt == ReqType::none || req == ReqType::immediate)
275  needCkpt = req;
276 }
277 
278 void
279 DistIface::SyncNode::requestExit(ReqType req)
280 {
281  std::lock_guard<std::mutex> sync_lock(lock);
282  assert(req != ReqType::none);
283  if (needExit != ReqType::none)
284  warn("Exit requested multiple times (req:%d)\n", static_cast<int>(req));
285  if (needExit == ReqType::none || req == ReqType::immediate)
286  needExit = req;
287 }
288 
289 void
290 DistIface::Sync::drainComplete()
291 {
292  if (doCkpt) {
293  // The first DistIface object called this right before writing the
294  // checkpoint. We need to drain the underlying physical network here.
295  // Note that other gem5 peers may enter this barrier at different
296  // ticks due to draining.
297  run(false);
298  // Only the "first" DistIface object has to perform the sync
299  doCkpt = false;
300  }
301 }
302 
303 void
304 DistIface::SyncNode::serialize(CheckpointOut &cp) const
305 {
306  int need_exit = static_cast<int>(needExit);
307  SERIALIZE_SCALAR(need_exit);
308 }
309 
310 void
311 DistIface::SyncNode::unserialize(CheckpointIn &cp)
312 {
313  int need_exit;
314  UNSERIALIZE_SCALAR(need_exit);
315  needExit = static_cast<ReqType>(need_exit);
316 }
317 
318 void
319 DistIface::SyncSwitch::serialize(CheckpointOut &cp) const
320 {
321  SERIALIZE_SCALAR(numExitReq);
322 }
323 
324 void
325 DistIface::SyncSwitch::unserialize(CheckpointIn &cp)
326 {
327  UNSERIALIZE_SCALAR(numExitReq);
328 }
329 
330 void
331 DistIface::SyncEvent::start()
332 {
333  // Note that this may be called either from startup() or drainResume()
334 
335  // At this point, all DistIface objects have already called Sync::init() so
336  // we have a local minimum of the start tick and repeat for the periodic
337  // sync.
338  repeat = DistIface::sync->nextRepeat;
339  // Do a global barrier to agree on a common repeat value (the smallest
340  // one from all participating nodes).
341  if (!DistIface::sync->run(false))
342  panic("DistIface::SyncEvent::start() aborted\n");
343 
344  assert(!DistIface::sync->doCkpt);
345  assert(!DistIface::sync->doExit);
346  assert(!DistIface::sync->doStopSync);
347  assert(DistIface::sync->nextAt >= curTick());
348  assert(DistIface::sync->nextRepeat <= repeat);
349 
350  if (curTick() == 0)
351  assert(!scheduled());
352 
353  // Use the maximum of the current tick for all participating nodes or a
354  // user provided starting tick.
355  if (scheduled())
356  reschedule(DistIface::sync->nextAt);
357  else
358  schedule(DistIface::sync->nextAt);
359 
360  inform("Dist sync scheduled at %lu and repeats %lu\n", when(),
361  DistIface::sync->nextRepeat);
362 }
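
As the comments in start() note, each node brings its own preferred start tick and repeat interval to the barrier, and the cluster settles on the latest start tick and the smallest repeat interval. A schematic illustration of that merge rule follows; it uses plain C++ with made-up names (MergedSyncParams, three assumed node values), not the gem5 classes above.

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <limits>

using Tick = std::uint64_t;

// Schematic merge rule applied as the switch collects per-node sync requests:
// the latest requested tick wins for nextAt, the smallest interval for repeat.
struct MergedSyncParams
{
    Tick nextAt = 0;
    Tick nextRepeat = std::numeric_limits<Tick>::max();

    void merge(Tick send_tick, Tick sync_repeat)
    {
        nextAt = std::max(nextAt, send_tick);
        nextRepeat = std::min(nextRepeat, sync_repeat);
    }
};

int main()
{
    MergedSyncParams params;
    params.merge(1000, 500000);   // node 0
    params.merge(1200, 250000);   // node 1
    params.merge( 900, 500000);   // node 2
    assert(params.nextAt == 1200);
    assert(params.nextRepeat == 250000);
    return 0;
}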
363 
364 void
365 DistIface::SyncEvent::process()
366 {
367  // We may not start a global periodic sync while draining before taking a
368  // checkpoint. This is due to the possibility that peer gem5 processes
369  // may not hit the same periodic sync before they complete draining and
370  // that would make this periodic sync clash with sync called from
371  // DistIface::serialize() by other gem5 processes.
372  // We would need a 'distributed drain' solution to eliminate this
373  // restriction.
374  // Note that if draining was not triggered by checkpointing then we are
375  // fine since no extra global sync will happen (i.e. all peer gem5
376  // processes will hit this periodic sync eventually).
377  panic_if(_draining && DistIface::sync->doCkpt,
378  "Distributed sync is hit while draining");
379  /*
380  * Note that this is a global event so this process method will be called
381  * by only exactly one thread.
382  */
383  /*
384  * We hold the eventq lock at this point but the receiver thread may
385  * need the lock to schedule new recv events while waiting for the
386  * dist sync to complete.
387  * Note that the other simulation threads also release their eventq
388  * locks while waiting for us due to the global event semantics.
389  */
390  {
391  EventQueue::ScopedRelease sr(curEventQueue());
392  // we do a global sync here that is supposed to happen at the same
393  // tick in all gem5 peers
394  if (!DistIface::sync->run(true))
395  return; // global sync aborted
396  // global sync completed
397  }
398  if (DistIface::sync->doCkpt)
399  exitSimLoop("checkpoint");
400  if (DistIface::sync->doExit) {
401  exitSimLoop("exit request from gem5 peers");
402  return;
403  }
404  if (DistIface::sync->doStopSync) {
405  DistIface::sync->doStopSync = false;
406  inform("synchronization disabled at %lu\n", curTick());
407 
408  // The switch node needs to wait for the next sync immediately.
409  if (DistIface::isSwitch) {
410  start();
411  } else {
412  // Wake up thread contexts on non-switch nodes.
413  for (int i = 0; i < DistIface::master->sys->numContexts(); i++) {
414  ThreadContext *tc =
415  DistIface::master->sys->getThreadContext(i);
416  if (tc->status() == ThreadContext::Suspended)
417  tc->activate();
418  else
419  warn_once("Tried to wake up thread in dist-gem5, but it "
420  "was already awake!\n");
421  }
422  }
423  return;
424  }
425  // schedule the next periodic sync
426  repeat = DistIface::sync->nextRepeat;
427  schedule(curTick() + repeat);
428 }
429 
430 void
431 DistIface::RecvScheduler::init(Event *recv_done, Tick link_delay)
432 {
433  // This is called from the receiver thread when it starts running. The new
434  // receiver thread shares the event queue with the simulation thread
435  // (associated with the simulated Ethernet link).
436  curEventQueue(eventManager->eventQueue());
437 
438  recvDone = recv_done;
439  linkDelay = link_delay;
440 }
441 
442 Tick
443 DistIface::RecvScheduler::calcReceiveTick(Tick send_tick,
444  Tick send_delay,
445  Tick prev_recv_tick)
446 {
447  Tick recv_tick = send_tick + send_delay + linkDelay;
448  // sanity check (we need at least a send-delay-long window)
449  assert(recv_tick >= prev_recv_tick + send_delay);
450  panic_if(prev_recv_tick + send_delay > recv_tick,
451  "Receive window is smaller than send delay");
452  panic_if(recv_tick <= curTick(),
453  "Simulators out of sync - missed packet receive by %llu ticks "
454  "(prev_recv_tick: %lu send_tick: %lu send_delay: %lu "
455  "linkDelay: %lu )",
456  curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
457  linkDelay);
458 
459  return recv_tick;
460 }
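
A worked example of the calculation above, with made-up numbers: assuming linkDelay is 2000 ticks, a packet stamped with send_tick 10000 and send_delay 500 is delivered at tick 12500, and that delivery must fall at least one send_delay after the previous one and strictly in the future.

#include <cassert>
#include <cstdint>

using Tick = std::uint64_t;

int main()
{
    const Tick linkDelay = 2000;          // assumed simulated link latency
    const Tick send_tick = 10000;
    const Tick send_delay = 500;
    const Tick prev_recv_tick = 11800;    // previous delivery on this link

    const Tick recv_tick = send_tick + send_delay + linkDelay;
    assert(recv_tick == 12500);
    // Same window check as calcReceiveTick(): the new delivery may not be
    // closer than send_delay to the previous one.
    assert(recv_tick >= prev_recv_tick + send_delay);
    return 0;
}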
461 
462 void
463 DistIface::RecvScheduler::resumeRecvTicks()
464 {
465  // Schedule pending packets asap in case link speed/delay changed when
466  // restoring from the checkpoint.
467  // This may be done during unserialize except that curTick() is unknown
468  // so we call this during drainResume().
469  // If we are not restoring from a checkpoint then the link latency cannot
470  // have changed, so we just return.
471  if (!ckptRestore)
472  return;
473 
474  std::vector<Desc> v;
475  while (!descQueue.empty()) {
476  Desc d = descQueue.front();
477  descQueue.pop();
478  d.sendTick = curTick();
479  d.sendDelay = d.packet->simLength; // assume 1 tick/byte max link speed
480  v.push_back(d);
481  }
482 
483  for (auto &d : v)
484  descQueue.push(d);
485 
486  if (recvDone->scheduled()) {
487  assert(!descQueue.empty());
488  eventManager->reschedule(recvDone, curTick());
489  } else {
490  assert(descQueue.empty() && v.empty());
491  }
492  ckptRestore = false;
493 }
494 
495 void
496 DistIface::RecvScheduler::pushPacket(EthPacketPtr new_packet,
497  Tick send_tick,
498  Tick send_delay)
499 {
500  // Note : this is called from the receiver thread
501  curEventQueue()->lock();
502  Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
503 
504  DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
505  "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
506  send_tick, send_delay, linkDelay, recv_tick);
507  // Every packet must be sent and arrive in the same quantum
508  assert(send_tick > master->syncEvent->when() -
509  master->syncEvent->repeat);
510  // No packet may be scheduled for receive in the arrival quantum
511  assert(send_tick + send_delay + linkDelay > master->syncEvent->when());
512 
513  // Now we are about to schedule a recvDone event for the new data packet.
514  // We use the same recvDone object for all incoming data packets. Packet
515  // descriptors are saved in the ordered queue. The currently scheduled
516  // packet is always on the top of the queue.
517  // NOTE: we use the event queue lock to protect the receive desc queue,
518  // too, which is accessed both by the receiver thread and the simulation
519  // thread.
520  descQueue.emplace(new_packet, send_tick, send_delay);
521  if (descQueue.size() == 1) {
522  assert(!recvDone->scheduled());
523  eventManager->schedule(recvDone, recv_tick);
524  } else {
525  assert(recvDone->scheduled());
526  panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
527  "Out of order packet received (recv_tick: %lu top(): %lu\n",
528  recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
529  }
530  curEventQueue()->unlock();
531 }
532 
533 EthPacketPtr
534 DistIface::RecvScheduler::popPacket()
535 {
536  // Note : this is called from the simulation thread when a receive done
537  // event is being processed for the link. We assume that the thread holds
538  // the event queue lock when this is called!
539  EthPacketPtr next_packet = descQueue.front().packet;
540  descQueue.pop();
541 
542  if (descQueue.size() > 0) {
543  Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
544  descQueue.front().sendDelay,
545  curTick());
546  eventManager->schedule(recvDone, recv_tick);
547  }
548  prevRecvTick = curTick();
549  return next_packet;
550 }
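
pushPacket() and popPacket() above keep a single recvDone event outstanding: only the descriptor at the head of the ordered queue has a delivery scheduled, and popping the head schedules the next one. The simplified model below captures that policy in plain C++ with no gem5 event queue; RecvQueueModel and scheduledAt are illustrative names.

#include <cassert>
#include <cstdint>
#include <optional>
#include <queue>

using Tick = std::uint64_t;

struct Desc { Tick recvTick; };

// Only the head-of-queue descriptor ever has a "receive done" pending;
// scheduledAt stands in for the single recvDone event.
struct RecvQueueModel
{
    std::queue<Desc> descQueue;
    std::optional<Tick> scheduledAt;

    void push(Tick recv_tick)
    {
        descQueue.push({recv_tick});
        if (descQueue.size() == 1)
            scheduledAt = recv_tick;                    // schedule only the head
    }

    Tick pop()
    {
        const Tick done = descQueue.front().recvTick;
        descQueue.pop();
        if (descQueue.empty())
            scheduledAt.reset();
        else
            scheduledAt = descQueue.front().recvTick;   // reschedule for new head
        return done;
    }
};

int main()
{
    RecvQueueModel model;
    model.push(100);
    model.push(250);                           // head (tick 100) stays scheduled
    assert(model.scheduledAt == Tick(100));
    assert(model.pop() == 100);
    assert(model.scheduledAt == Tick(250));    // next head now scheduled
    assert(model.pop() == 250);
    assert(!model.scheduledAt);
    return 0;
}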
551 
552 void
553 DistIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const
554 {
555  SERIALIZE_SCALAR(sendTick);
556  SERIALIZE_SCALAR(sendDelay);
557  packet->serialize("rxPacket", cp);
558 }
559 
560 void
561 DistIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp)
562 {
563  UNSERIALIZE_SCALAR(sendTick);
564  UNSERIALIZE_SCALAR(sendDelay);
565  packet = std::make_shared<EthPacketData>();
566  packet->unserialize("rxPacket", cp);
567 }
568 
569 void
570 DistIface::RecvScheduler::serialize(CheckpointOut &cp) const
571 {
572  SERIALIZE_SCALAR(prevRecvTick);
573  // serialize the receive desc queue
574  std::queue<Desc> tmp_queue(descQueue);
575  unsigned n_desc_queue = descQueue.size();
576  assert(tmp_queue.size() == descQueue.size());
577  SERIALIZE_SCALAR(n_desc_queue);
578  for (int i = 0; i < n_desc_queue; i++) {
579  tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
580  tmp_queue.pop();
581  }
582  assert(tmp_queue.empty());
583 }
584 
585 void
586 DistIface::RecvScheduler::unserialize(CheckpointIn &cp)
587 {
588  assert(descQueue.size() == 0);
589  assert(!recvDone->scheduled());
590  assert(!ckptRestore);
591 
592  UNSERIALIZE_SCALAR(prevRecvTick);
593  // unserialize the receive desc queue
594  unsigned n_desc_queue;
595  UNSERIALIZE_SCALAR(n_desc_queue);
596  for (int i = 0; i < n_desc_queue; i++) {
597  Desc recv_desc;
598  recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
599  descQueue.push(recv_desc);
600  }
601  ckptRestore = true;
602 }
603 
604 DistIface::DistIface(unsigned dist_rank,
605  unsigned dist_size,
606  Tick sync_start,
607  Tick sync_repeat,
608  EventManager *em,
609  bool use_pseudo_op,
610  bool is_switch, int num_nodes) :
611  syncStart(sync_start), syncRepeat(sync_repeat),
612  recvThread(nullptr), recvScheduler(em), syncStartOnPseudoOp(use_pseudo_op),
613  rank(dist_rank), size(dist_size)
614 {
615  DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n",dist_rank);
616  isMaster = false;
617  if (master == nullptr) {
618  assert(sync == nullptr);
619  assert(syncEvent == nullptr);
620  isSwitch = is_switch;
621  if (is_switch)
622  sync = new SyncSwitch(num_nodes);
623  else
624  sync = new SyncNode();
625  syncEvent = new SyncEvent();
626  master = this;
627  isMaster = true;
628  }
629  distIfaceId = distIfaceNum;
630  distIfaceNum++;
631 }
632 
633 DistIface::~DistIface()
634 {
635  assert(recvThread);
636  recvThread->join();
637  delete recvThread;
638  if (distIfaceNum-- == 0) {
639  assert(syncEvent);
640  delete syncEvent;
641  assert(sync);
642  delete sync;
643  }
644  if (this == master)
645  master = nullptr;
646 }
647 
648 void
649 DistIface::packetOut(EthPacketPtr pkt, Tick send_delay)
650 {
651  Header header;
652 
653  // Prepare a dist header packet for the Ethernet packet we want to
654  // send out.
655  header.msgType = MsgType::dataDescriptor;
656  header.sendTick = curTick();
657  header.sendDelay = send_delay;
658 
659  header.dataPacketLength = pkt->length;
660  header.simLength = pkt->simLength;
661 
662  // Send out the packet and the meta info.
663  sendPacket(header, pkt);
664 
665  DPRINTF(DistEthernetPkt,
666  "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
667  pkt->length, send_delay);
668 }
669 
670 void
671 DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
672 {
673  EthPacketPtr new_packet;
674  DistHeaderPkt::Header header;
675 
676  // Initialize receive scheduler parameters
677  recvScheduler.init(recv_done, link_delay);
678 
679  // Main loop to wait for and process any incoming message.
680  for (;;) {
681  // recvHeader() blocks until the next dist header packet comes in.
682  if (!recvHeader(header)) {
683  // We lost connection to the peer gem5 processes most likely
684  // because one of them called m5 exit. So we stop here.
685  // Grab the eventq lock to stop the simulation thread
686  curEventQueue()->lock();
687  exitSimLoop("connection to gem5 peer got closed");
688  curEventQueue()->unlock();
689  // The simulation thread may be blocked in processing an on-going
690  // global synchronisation. Abort the sync to give the simulation
691  // thread a chance to make progress and process the exit event.
692  sync->abort();
693  // Finish receiver thread
694  break;
695  }
696 
697  // We got a valid dist header packet, let's process it
698  if (header.msgType == MsgType::dataDescriptor) {
699  recvPacket(header, new_packet);
700  recvScheduler.pushPacket(new_packet,
701  header.sendTick,
702  header.sendDelay);
703  } else {
704  // everything else must be synchronisation related command
705  if (!sync->progress(header.sendTick,
706  header.syncRepeat,
707  header.needCkpt,
708  header.needExit,
709  header.needStopSync))
710  // Finish receiver thread if simulation is about to exit
711  break;
712  }
713  }
714 }
715 
716 void
717 DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
718 {
719  assert(recvThread == nullptr);
720 
721  recvThread = new std::thread(&DistIface::recvThreadFunc,
722  this,
723  const_cast<Event *>(recv_done),
724  link_delay);
725  recvThreadsNum++;
726 }
727 
728 DrainState
729 DistIface::drain()
730 {
731  DPRINTF(DistEthernet,"DistIFace::drain() called\n");
732  // This can be called multiple times in the same drain cycle.
733  if (this == master)
734  syncEvent->draining(true);
735  return DrainState::Drained;
736 }
737 
738 void
739 DistIface::drainResume() {
740  DPRINTF(DistEthernet,"DistIFace::drainResume() called\n");
741  if (this == master)
742  syncEvent->draining(false);
743  recvScheduler.resumeRecvTicks();
744 }
745 
746 void
747 DistIface::serialize(CheckpointOut &cp) const
748 {
749  // Drain the dist interface before the checkpoint is taken. We cannot call
750  // this as part of the normal drain cycle because this dist sync has to be
751  // called exactly once after the system is fully drained.
752  sync->drainComplete();
753 
754  unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;
755 
756  SERIALIZE_SCALAR(rank_orig);
757  SERIALIZE_SCALAR(dist_iface_id_orig);
758 
759  recvScheduler.serializeSection(cp, "recvScheduler");
760  if (this == master) {
761  sync->serializeSection(cp, "Sync");
762  }
763 }
764 
765 void
766 DistIface::unserialize(CheckpointIn &cp)
767 {
768  unsigned rank_orig, dist_iface_id_orig;
769  UNSERIALIZE_SCALAR(rank_orig);
770  UNSERIALIZE_SCALAR(dist_iface_id_orig);
771 
772  panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
773  rank, rank_orig);
774  panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
775  "at resume (distIfaceId=%d, orig=%d)", distIfaceId,
776  dist_iface_id_orig);
777 
778  recvScheduler.unserializeSection(cp, "recvScheduler");
779  if (this == master) {
780  sync->unserializeSection(cp, "Sync");
781  }
782 }
783 
784 void
785 DistIface::init(const Event *done_event, Tick link_delay)
786 {
787  // Init hook for the underlying message transport to set up/finalize
788  // communication channels
789  initTransport();
790 
791  // Spawn a new receiver thread that will process messages
792  // coming in from peer gem5 processes.
793  // The receive thread will also schedule a (receive) doneEvent
794  // for each incoming data packet.
795  spawnRecvThread(done_event, link_delay);
796 
797 
798  // Adjust the periodic sync start and interval. Different DistIface objects
799  // might have different requirements. The singleton sync object
800  // will select the minimum values for both params.
801  assert(sync != nullptr);
802  sync->init(syncStart, syncRepeat);
803 
804  // Initialize the seed for the random generator to avoid the same sequence
805  // in all gem5 peer processes
806  assert(master != nullptr);
807  if (this == master)
808  random_mt.init(5489 * (rank+1) + 257);
809 }
810 
811 void
812 DistIface::startup()
813 {
814  DPRINTF(DistEthernet, "DistIface::startup() started\n");
815  // Schedule the sync now unless its start is deferred to a pseudo-op (the switch always starts here).
816  if (this == master && (!syncStartOnPseudoOp || isSwitch))
817  syncEvent->start();
818  DPRINTF(DistEthernet, "DistIface::startup() done\n");
819 }
820 
821 bool
822 DistIface::readyToCkpt(Tick delay, Tick period)
823 {
824  bool ret = true;
825  DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
826  "period:%lu\n", delay, period);
827  if (master) {
828  if (delay == 0) {
829  inform("m5 checkpoint called with zero delay => triggering collaborative "
830  "checkpoint\n");
831  sync->requestCkpt(ReqType::collective);
832  } else {
833  inform("m5 checkpoint called with non-zero delay => triggering immediate "
834  "checkpoint (at the next sync)\n");
835  sync->requestCkpt(ReqType::immediate);
836  }
837  if (period != 0)
838  inform("Non-zero period for m5_ckpt is ignored in "
839  "distributed gem5 runs\n");
840  ret = false;
841  }
842  return ret;
843 }
844 
845 void
846 DistIface::SyncNode::requestStopSync(ReqType req)
847 {
848  std::lock_guard<std::mutex> sync_lock(lock);
849  needStopSync = req;
850 }
851 
852 void
853 DistIface::toggleSync(ThreadContext *tc)
854 {
855  // Unfortunately, we have to populate the system pointer member this way.
856  master->sys = tc->getSystemPtr();
857 
858  // The invariant for both syncing and "unsyncing" is that all threads will
859  // stop executing instructions until the desired sync state has been reached
860  // for all nodes. This is the easiest way to prevent deadlock (in the case
861  // of "unsyncing") and causality errors (in the case of syncing).
862  if (master->syncEvent->scheduled()) {
863  inform("Request toggling synchronization off\n");
864  master->sync->requestStopSync(ReqType::collective);
865 
866  // At this point, we have no clue when everyone will reach the sync
867  // stop point. Suspend execution of all local thread contexts.
868  // Dist-gem5 will reactivate all thread contexts when everyone has
869  // reached the sync stop point.
870 #if THE_ISA != NULL_ISA
871  for (int i = 0; i < master->sys->numContexts(); i++) {
872  ThreadContext *tc = master->sys->getThreadContext(i);
873  if (tc->status() == ThreadContext::Active)
874  tc->quiesce();
875  }
876 #endif
877  } else {
878  inform("Request toggling synchronization on\n");
879  master->syncEvent->start();
880 
881  // We need to suspend all CPUs until the sync point is reached by all
882  // nodes to prevent causality errors. We can also schedule CPU
883  // activation here, since we know exactly when the next sync will
884  // occur.
885 #if THE_ISA != NULL_ISA
886  for (int i = 0; i < master->sys->numContexts(); i++) {
887  ThreadContext *tc = master->sys->getThreadContext(i);
888  if (tc->status() == ThreadContext::Active)
889  tc->quiesceTick(master->syncEvent->when() + 1);
890  }
891 #endif
892  }
893 }
894 
895 bool
896 DistIface::readyToExit(Tick delay)
897 {
898  bool ret = true;
899  DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
900  delay);
901  if (master) {
902  // To successfully coordinate an exit, all nodes must be synchronising
903  if (!master->syncEvent->scheduled())
904  master->syncEvent->start();
905 
906  if (delay == 0) {
907  inform("m5 exit called with zero delay => triggering collaborative "
908  "exit\n");
909  sync->requestExit(ReqType::collective);
910  } else {
911  inform("m5 exit called with non-zero delay => triggering immediate "
912  "exit (at the next sync)\n");
913  sync->requestExit(ReqType::immediate);
914  }
915  ret = false;
916  }
917  return ret;
918 }
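
The delay argument above selects between the two request flavours used throughout this file: a zero delay issues a collective request that only takes effect once every node has asked for it, while a non-zero delay issues an immediate request that fires at the next sync regardless. The schematic sketch below mirrors the decision the switch then makes in SyncSwitch::run(); exitFires and its parameters are illustrative names, not gem5 code.

#include <cassert>

// Mirrors "doExit || numExitReq == numNodes" in SyncSwitch::run():
// fire if any node asked immediately, or if all nodes asked collectively.
static bool
exitFires(bool any_immediate_req, unsigned collective_reqs, unsigned num_nodes)
{
    return any_immediate_req || collective_reqs == num_nodes;
}

int main()
{
    assert(!exitFires(false, 2, 4));  // only 2 of 4 nodes ran a zero-delay m5 exit
    assert(exitFires(false, 4, 4));   // every node requested the collective exit
    assert(exitFires(true, 0, 4));    // one node requested an immediate exit
    return 0;
}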
919 
920 uint64_t
921 DistIface::rankParam()
922 {
923  uint64_t val;
924  if (master) {
925  val = master->rank;
926  } else {
927  warn("Dist-rank parameter is queried in a single gem5 simulation.");
928  val = 0;
929  }
930  return val;
931 }
932 
933 uint64_t
934 DistIface::sizeParam()
935 {
936  uint64_t val;
937  if (master) {
938  val = master->size;
939  } else {
940  warn("Dist-size parameter is queried in a single gem5 simulation.");
941  val = 1;
942  }
943  return val;
944 }