// Copyright 2017 The Australian National University
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use heap::*;
use objectmodel::*;
use MY_GC;
use std::sync::atomic::{AtomicIsize, AtomicUsize, AtomicBool, Ordering};
use std::sync::{Arc, Mutex, Condvar, RwLock};

use crossbeam::sync::chase_lev::*;
use std::sync::mpsc;
use std::sync::mpsc::channel;
use std::thread;
use std::sync::atomic;
use std::mem::transmute;

lazy_static! {
    static ref STW_COND : Arc<(Mutex<usize>, Condvar)> = {
        Arc::new((Mutex::new(0), Condvar::new()))
    };

    static ref ROOTS : RwLock<Vec<ObjectReference>> = RwLock::new(vec![]);
}

pub static ENABLE_GC: AtomicBool = AtomicBool::new(false);

static CONTROLLER: AtomicIsize = AtomicIsize::new(0);
const NO_CONTROLLER: isize = -1;

pub fn init(n_gcthreads: usize) {
    CONTROLLER.store(NO_CONTROLLER, Ordering::SeqCst);
    GC_THREADS.store(n_gcthreads, Ordering::SeqCst);
    GC_COUNT.store(0, Ordering::SeqCst);
}

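/// Requests a collection: sets the take-yield flag on every registered
/// mutator, so that each one traps into sync_barrier() at its next
/// yieldpoint.
///
/// A hypothetical usage sketch (gc() panics unless GC is enabled first):
///
/// ```ignore
/// ENABLE_GC.store(true, Ordering::SeqCst);
/// trigger_gc();
/// ```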
pub fn trigger_gc() {
    trace!("Triggering GC...");

    for m in MUTATORS.write().unwrap().iter_mut() {
        if m.is_some() {
            m.as_mut().unwrap().set_take_yield(true);
        }
    }
}

#[cfg(target_arch = "x86_64")]
#[link(name = "gc_clib_x64")]
extern "C" {
    fn immmix_get_stack_ptr() -> Address;
    pub fn set_low_water_mark();
    fn get_low_water_mark() -> Address;
    fn get_registers() -> *const Address;
    fn get_registers_count() -> i32;
}

#[cfg(target_arch = "aarch64")]
#[link(name = "gc_clib_aarch64")]
extern "C" {
    fn immmix_get_stack_ptr() -> Address;
    pub fn set_low_water_mark();
    fn get_low_water_mark() -> Address;
    fn get_registers() -> *const Address;
    fn get_registers_count() -> i32;
}

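/// Conservatively scans the current thread's stack, from the current stack
/// pointer up to the saved low water mark, plus the saved registers, and
/// returns every value that looks like a heap object.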
pub fn stack_scan() -> Vec<ObjectReference> {
    trace!("stack scanning...");
    let stack_ptr: Address = unsafe { immmix_get_stack_ptr() };

    if cfg!(debug_assertions) {
        if !stack_ptr.is_aligned_to(8) {
            use std::process;
            println!(
                "trying to scan the stack, but the current stack pointer is 0x{:x}, \
                 which is not aligned to 8 bytes",
                stack_ptr
            );
            process::exit(102);
        }
    }

    let low_water_mark: Address = unsafe { get_low_water_mark() };

    let mut cursor = stack_ptr;
    let mut ret = vec![];

    let gccontext_guard = MY_GC.read().unwrap();
    let gccontext = gccontext_guard.as_ref().unwrap();

    while cursor < low_water_mark {
        let value: Address = unsafe { cursor.load::<Address>() };

        if gccontext.is_heap_object(value) {
            ret.push(unsafe { value.to_object_reference() });
        }

        cursor = cursor + POINTER_SIZE;
    }

    let roots_from_stack = ret.len();

    let registers_count = unsafe { get_registers_count() };
    let registers = unsafe { get_registers() };

    for i in 0..registers_count {
        let value = unsafe { *registers.offset(i as isize) };

        if gccontext.is_heap_object(value) {
            ret.push(unsafe { value.to_object_reference() });
        }
    }

    let roots_from_registers = ret.len() - roots_from_stack;

    trace!(
        "roots: {} from stack, {} from registers",
        roots_from_stack,
        roots_from_registers
    );

    ret
}

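/// Entered by a mutator once a GC has been triggered. The first mutator to
/// swap itself into CONTROLLER becomes the GC controller: it waits for all
/// other mutators to park, runs the collection, then resumes everyone. Every
/// other mutator contributes its stack roots and blocks until the GC is done.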
#[inline(never)]
pub fn sync_barrier(mutator: &mut Mutator) {
    let controller_id =
        CONTROLLER.compare_and_swap(NO_CONTROLLER, mutator.id() as isize, Ordering::SeqCst);

    trace!(
        "Mutator{} saw the controller is {}",
        mutator.id(),
        controller_id
    );

    // prepare the mutator for gc - return the current block (if it has one)
    mutator.prepare_for_gc();

    // user thread call back to prepare for gc
    //    USER_THREAD_PREPARE_FOR_GC.read().unwrap()();

    if controller_id != NO_CONTROLLER {
        // scan its stack
        {
            let mut thread_roots = stack_scan();
            ROOTS.write().unwrap().append(&mut thread_roots);
        }

        // this thread will block
        block_current_thread(mutator);

        // reset current mutator
        mutator.reset_after_gc();
    } else {
        // this thread is the controller;
        // other threads should block

        // init roots
        {
            // scan its stack
            let mut thread_roots = stack_scan();
            ROOTS.write().unwrap().append(&mut thread_roots);
        }

        // wait for all mutators to be blocked
        let &(ref lock, ref cvar) = &*STW_COND.clone();
        let mut count = 0;

        trace!(
            "expect {} mutators to park",
            *N_MUTATORS.read().unwrap() - 1
        );
        while count < *N_MUTATORS.read().unwrap() - 1 {
            let new_count = { *lock.lock().unwrap() };
            if new_count != count {
                count = new_count;
                trace!("count = {}", count);
            }
        }

        trace!("everyone stopped, gc will start");

        // roots->trace->sweep
        gc();

        // mutators will resume
        CONTROLLER.store(NO_CONTROLLER, Ordering::SeqCst);
        for t in MUTATORS.write().unwrap().iter_mut() {
            if t.is_some() {
                let t_mut = t.as_mut().unwrap();
                t_mut.set_take_yield(false);
                t_mut.set_still_blocked(false);
            }
        }
        // every mutator thread resets itself, so only reset the current mutator here
        mutator.reset_after_gc();

        // resume
        {
            let mut count = lock.lock().unwrap();
            *count = 0;
            cvar.notify_all();
        }
    }
}

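/// Parks the current mutator: increments the stop-the-world counter watched
/// by the controller, then waits on the condvar until the controller clears
/// the still_blocked flag.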
fn block_current_thread(mutator: &mut Mutator) {
    trace!("Mutator{} blocked", mutator.id());

    let &(ref lock, ref cvar) = &*STW_COND.clone();
    let mut count = lock.lock().unwrap();
    *count += 1;

    mutator.global.set_still_blocked(true);

    while mutator.global.is_still_blocked() {
        count = cvar.wait(count).unwrap();
    }

    trace!("Mutator{} unblocked", mutator.id());
}

pub static GC_COUNT: atomic::AtomicUsize = AtomicUsize::new(0);
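
/// The collection itself, run by the controller thread while every mutator is
/// parked: prepare each space, trace from the collected roots, sweep each
/// space, then clear the root set ready for the next collection.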
fn gc() {
    if !ENABLE_GC.load(Ordering::SeqCst) {
        panic!("Triggering GC when GC is disabled");
    }

    GC_COUNT.fetch_add(1, atomic::Ordering::SeqCst);

    // each space prepares for GC
    {
        let mut gccontext_guard = MY_GC.write().unwrap();
        let gccontext = gccontext_guard.as_mut().unwrap();
        gccontext.immix_tiny.prepare_for_gc();
        gccontext.immix_normal.prepare_for_gc();
        gccontext.lo.prepare_for_gc();
    }

    trace!("GC starts");

    // mark & trace
    {
        // collect all roots into a single work list
        let mut roots: &mut Vec<ObjectReference> = &mut ROOTS.write().unwrap();

        let gccontext_guard = MY_GC.read().unwrap();
        let gccontext = gccontext_guard.as_ref().unwrap();
        for obj in gccontext.roots.iter() {
            roots.push(*obj);
        }

        trace!("total roots: {}", roots.len());

        start_trace(&mut roots);
    }

    trace!("trace done");

    // sweep
    {
        let mut gccontext_guard = MY_GC.write().unwrap();
        let gccontext = gccontext_guard.as_mut().unwrap();

        gccontext.immix_tiny.sweep();
        gccontext.immix_normal.sweep();
        gccontext.lo.sweep();
    }

    // clear existing roots (roots from last gc)
    ROOTS.write().unwrap().clear();

    trace!("GC finishes");
}

pub const PUSH_BACK_THRESHOLD: usize = 50;
pub static GC_THREADS: atomic::AtomicUsize = AtomicUsize::new(0);

const TRACE_GC: bool = true;
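
/// Starts parallel tracing: moves the initial work onto a chase-lev deque,
/// spawns GC_THREADS stealing workers, and pushes any overflow work they send
/// back onto the deque, looping until the deque stays empty.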
#[allow(unused_variables)]
#[inline(never)]
pub fn start_trace(work_stack: &mut Vec<ObjectReference>) {
    // creates root deque
    let (worker, stealer) = deque();

    while !work_stack.is_empty() {
        worker.push(work_stack.pop().unwrap());
    }

    loop {
        let (sender, receiver) = channel::<ObjectReference>();

        let mut gc_threads = vec![];
        let n_gcthreads = GC_THREADS.load(atomic::Ordering::SeqCst);
        trace!("launching {} gc threads...", n_gcthreads);
        for _ in 0..n_gcthreads {
            let new_stealer = stealer.clone();
            let new_sender = sender.clone();
            let t = thread::spawn(move || { start_steal_trace(new_stealer, new_sender); });
            gc_threads.push(t);
        }

        // only stealers own senders; when all stealers quit, the following loop finishes
        drop(sender);

        loop {
            let recv = receiver.recv();
            match recv {
                Ok(obj) => worker.push(obj),
                Err(_) => break
            }
        }

        match worker.try_pop() {
            Some(obj_ref) => worker.push(obj_ref),
            None => break
        }
    }
}

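/// The work loop of a single GC thread: prefer objects from the thread-local
/// queue, otherwise steal from the shared deque; return once the deque is
/// empty.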
#[allow(unused_variables)]
fn start_steal_trace(stealer: Stealer<ObjectReference>, job_sender: mpsc::Sender<ObjectReference>) {
    let mut local_queue = vec![];

    loop {
        let work = {
            if !local_queue.is_empty() {
                let ret = local_queue.pop().unwrap();
                trace_if!(TRACE_GC, "got object {} from local queue", ret);
                ret
            } else {
                let work = stealer.steal();
                let ret = match work {
                    Steal::Empty => return,
                    Steal::Abort => continue,
                    Steal::Data(obj) => obj
                };
                trace_if!(TRACE_GC, "got object {} from global queue", ret);
                ret
            }
        };

        steal_trace_object(work, &mut local_queue, &job_sender);
    }
}
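
/// Traces a single object: dispatches on the space the object lives in, marks
/// it as traced, decodes its type information, and scans its fields.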
#[inline(always)]
pub fn steal_trace_object(
    obj: ObjectReference,
    local_queue: &mut Vec<ObjectReference>,
    job_sender: &mpsc::Sender<ObjectReference>
) {
    match SpaceDescriptor::get(obj) {
        SpaceDescriptor::ImmixTiny => {
            let mut space = ImmixSpace::get::<ImmixSpace>(obj.to_address());
            // mark current object traced
            space.mark_object_traced(obj);

            let encode = unsafe {
                space
                    .get_type_byte_slot(space.get_word_index(obj.to_address()))
                    .load::<TinyObjectEncode>()
            };

            trace_tiny_object(obj, &encode, local_queue, job_sender);
        }
        SpaceDescriptor::ImmixNormal => {
            let mut space = ImmixSpace::get::<ImmixSpace>(obj.to_address());
            // mark current object traced
            space.mark_object_traced(obj);

            let type_slot = space.get_type_byte_slot(space.get_word_index(obj.to_address()));
            let encode = unsafe { type_slot.load::<MediumObjectEncode>() };
            let small_encode: &SmallObjectEncode = unsafe { transmute(&encode) };

            if small_encode.is_small() {
                trace_small_object(obj, small_encode, local_queue, job_sender);
            } else {
                trace_medium_object(obj, &encode, local_queue, job_sender);
            }
        }
        SpaceDescriptor::Freelist => {
            let mut space = FreelistSpace::get::<FreelistSpace>(obj.to_address());
            space.mark_object_traced(obj);

            let encode = space.get_type_encode(obj);
            trace_large_object(obj, &encode, local_queue, job_sender);
        }
        SpaceDescriptor::Immortal => {
            let addr = obj.to_address();
            let hdr: &mut ImmortalObjectHeader = unsafe {
                (addr - IMMORTAL_OBJECT_HEADER_SIZE).to_ref_mut::<ImmortalObjectHeader>()
            };
            // mark as traced
            hdr.gc_byte = 1;
            unsafe {
                match hdr.encode {
                    ObjectEncode::Tiny(ref enc) => {
                        trace_tiny_object(obj, enc, local_queue, job_sender)
                    }
                    ObjectEncode::Small(ref enc) => {
                        trace_small_object(obj, enc, local_queue, job_sender)
                    }
                    ObjectEncode::Medium(ref enc) => {
                        trace_medium_object(obj, enc, local_queue, job_sender)
                    }
                    ObjectEncode::Large(ref enc) => {
                        trace_large_object(obj, enc, local_queue, job_sender)
                    }
                }
            }
        }
    }
}
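
/// Scans a tiny object: the TinyObjectEncode carries a word type per field,
/// so each field is traced at its pointer-sized offset.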
#[inline(always)]
fn trace_tiny_object(
    obj: ObjectReference,
    encode: &TinyObjectEncode,
    local_queue: &mut Vec<ObjectReference>,
    job_sender: &mpsc::Sender<ObjectReference>
) {
    trace_if!(TRACE_GC, "  trace tiny obj: {} ({:?})", obj, encode);
    for i in 0..encode.n_fields() {
        trace_word(
            encode.field(i),
            obj,
            i << LOG_POINTER_SIZE,
            local_queue,
            job_sender
        )
    }
}

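/// Scans a small object: the encoding carries a type id and size; the field
/// layout is looked up in the global type table.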
fn trace_small_object(
    obj: ObjectReference,
    encode: &SmallObjectEncode,
    local_queue: &mut Vec<ObjectReference>,
    job_sender: &mpsc::Sender<ObjectReference>
) {
    let type_id = encode.type_id();
    let size = encode.size();
    trace_if!(TRACE_GC, "  trace small obj: {} ({:?})", obj, encode);
    trace_if!(TRACE_GC, "        id {}, size {}", type_id, size);
    trace_short_type_encode(
        obj,
        &GlobalTypeTable::table()[type_id],
        size,
        local_queue,
        job_sender
    );
}

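/// Scans a medium object: same scheme as small objects, but with the wider
/// MediumObjectEncode.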
fn trace_medium_object(
    obj: ObjectReference,
    encode: &MediumObjectEncode,
    local_queue: &mut Vec<ObjectReference>,
    job_sender: &mpsc::Sender<ObjectReference>
) {
    let type_id = encode.type_id();
    let size = encode.size();
    trace_if!(TRACE_GC, "  trace medium obj: {} ({:?})", obj, encode);
    trace_if!(TRACE_GC, "        id {}, size {}", type_id, size);
    trace_short_type_encode(
        obj,
        &GlobalTypeTable::table()[type_id],
        size,
        local_queue,
        job_sender
    );
}

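/// Scans an object described by a ShortTypeEncode: the fixed part first (one
/// word type per word), then the variable part repeated until the end of the
/// object is reached.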
fn trace_short_type_encode(
    obj: ObjectReference,
    type_encode: &ShortTypeEncode,
    type_size: ByteSize,
    local_queue: &mut Vec<ObjectReference>,
    job_sender: &mpsc::Sender<ObjectReference>
) {
    let mut offset: ByteSize = 0;
    trace_if!(TRACE_GC, "  -fix part-");
    for i in 0..type_encode.fix_len() {
        trace_word(type_encode.fix_ty(i), obj, offset, local_queue, job_sender);
        offset += POINTER_SIZE;
    }
    // for variable part
    if type_encode.var_len() != 0 {
        trace_if!(TRACE_GC, "  -var part-");
        while offset < type_size {
            for i in 0..type_encode.var_len() {
                trace_word(type_encode.var_ty(i), obj, offset, local_queue, job_sender);
                offset += POINTER_SIZE;
            }
        }
    }
    trace_if!(TRACE_GC, "  -done-");
}

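/// Scans a large (freelist) object using its full type record: the fixed
/// part once, then the variable part repeated up to the encoded object size.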
fn trace_large_object(
    obj: ObjectReference,
    encode: &LargeObjectEncode,
    local_queue: &mut Vec<ObjectReference>,
    job_sender: &mpsc::Sender<ObjectReference>
) {
    let tyid = encode.type_id();
    let ty = GlobalTypeTable::get_full_type(tyid);

    let mut offset: ByteSize = 0;
    // fix part
    trace_if!(TRACE_GC, "  -fix part-");
    for &word_ty in ty.fix.iter() {
        trace_word(word_ty, obj, offset, local_queue, job_sender);
        offset += POINTER_SIZE;
    }
    let end_of_object = encode.size();
    while offset < end_of_object {
        trace_if!(TRACE_GC, "  -var part-");
        for &word_ty in ty.var.iter() {
            trace_word(word_ty, obj, offset, local_queue, job_sender);
            offset += POINTER_SIZE;
            debug_assert!(offset <= end_of_object);
        }
    }
    trace_if!(TRACE_GC, "  -done-");
}

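/// Processes a single word of an object. For a reference word, the field is
/// loaded; if the referent is non-null and not yet traced, it is queued for
/// tracing. Weak and tagged references are not implemented yet.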
#[inline(always)]
fn trace_word(
    word_ty: WordType,
    obj: ObjectReference,
    offset: ByteSize,
    local_queue: &mut Vec<ObjectReference>,
    job_sender: &mpsc::Sender<ObjectReference>
) {
    trace_if!(
        TRACE_GC,
        "  follow field (offset: {}) of {} with type {:?}",
        offset,
        obj,
        word_ty
    );
    match word_ty {
        WordType::NonRef => {}
        WordType::Ref => {
            let field_addr = obj.to_address() + offset;
            let edge = unsafe { field_addr.load::<ObjectReference>() };

            if edge.to_address().is_zero() {
                return;
            }

            match SpaceDescriptor::get(edge) {
                SpaceDescriptor::ImmixTiny | SpaceDescriptor::ImmixNormal => {
                    let space = ImmixSpace::get::<ImmixSpace>(edge.to_address());
                    if !space.is_object_traced(edge) {
                        steal_process_edge(edge, local_queue, job_sender);
                    }
                }
                SpaceDescriptor::Freelist => {
                    let space = FreelistSpace::get::<FreelistSpace>(edge.to_address());
                    if !space.is_object_traced(edge) {
                        debug!("edge {} is not traced, trace it", edge);
                        steal_process_edge(edge, local_queue, job_sender);
                    } else {
                        debug!("edge {} is traced, skip", edge);
                    }
                }
                SpaceDescriptor::Immortal => unimplemented!()
            }
        }
        WordType::WeakRef | WordType::TaggedRef => {
            use std::process;
            error!("unimplemented");
            process::exit(1);
        }
    }
}

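/// Queues a newly discovered reference: it stays in the thread-local queue
/// while the queue is short, and is otherwise sent back to the controller,
/// which re-pushes it onto the global deque (see start_trace).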
#[inline(always)]
fn steal_process_edge(
    edge: ObjectReference,
    local_queue: &mut Vec<ObjectReference>,
    job_sender: &mpsc::Sender<ObjectReference>
) {
    if local_queue.len() >= PUSH_BACK_THRESHOLD {
        job_sender.send(edge).unwrap();
    } else {
        local_queue.push(edge);
    }
}