guile-asyncs-batch-processing


Guile's asyncs for batch processing

by uriel.

While the code presented here can be used for real, the purpose of this page is to show what asyncs are and how they can be used to synchronise threads.

What is an async

An async is a thunk (a function that takes no arguments) that is created in a thread and evaluated in another thread. With asyncs the return value of the thunks is ignored.

System thunks are enqueued in a thread-specific sequence and evaluated when Guile thinks that "it is fine to do it". This means that, officially, it is not known when a system thunk will be evaluated.

User thunks are enqueued in a special Guile value and it is our responsibility to invoke a procedure to run them.

Here we use only system asyncs.


Let's create two threads, then from one enqueue a thunk to be evaluated in the other:

 (define 1st (begin-thread 
              (format #t "running 1st thread ~A~%" (current-thread)) 
              (usleep 50000))) 
  
 (define 2nd (begin-thread 
              (format #t "running 2nd thread ~A~%" (current-thread)) 
              (system-async-mark 
               (lambda () 
                 (format #t "from thread ~A to thread ~A~%" 2nd 1st)) 
               1st) 
              (usleep 50000))) 
  
 (sleep 1) 

It is that simple: system-async-mark takes two arguments, the thunk and an optional thread identifier that defaults to (current-thread).


We can make two threads talk to each other.

 (use-modules (ice-9 threads)) 
  
 (define 1st-incoming #f) 
 (define 2nd-incoming #f) 
  
 (define-macro (recv varname) 
   `(begin 
      (while (not ,varname) 
        (usleep 1000)) 
      (format #t ,varname) 
      (set! ,varname #f))) 
  
 (define-macro (send varname other-thread) 
   `(system-async-mark (lambda () 
                         (set! ,varname string)) 
                       ,other-thread)) 
  
 (define (1st-recv) 
   (recv 1st-incoming)) 
  
 (define (2nd-recv) 
   (recv 2nd-incoming)) 
  
 (define (1st-send string) 
   (send 2nd-incoming 2nd)) 
  
 (define (2nd-send string) 
   (send 1st-incoming 1st)) 
  
 (define 1st (begin-thread 
              (1st-recv) 
              (1st-send "ciao 2nd thread~%") 
              (1st-recv) 
              (1st-send "fine thanks!~%") 
              (1st-recv) 
              (1st-send "ya, too~%"))) 
  
 (define 2nd (begin-thread 
              (2nd-send "ciao 1st thread~%") 
              (2nd-recv) 
              (2nd-send "how are you?~%") 
              (2nd-recv) 
              (2nd-send "see ya~%") 
              (2nd-recv))) 
  
 (sleep 1) 

A batch processing loop

Batch processing means to enqueue a list of jobs and process them asynchronously with respect to the enqueuing entity. To do this we create a background thread that loops over a condition; we need a way to start the loop and to stop it:

 (use-modules (ice-9 threads)) 
  
 (define bg-run #f) 
 (define bg-thread #f) 
  
 (define (bg-start) 
   (set! bg-run #t) 
   (set! bg-thread (begin-thread 
                    (while bg-run 
                      (format #t "loop ") 
                      (usleep 1000))))) 
  
 (define (bg-stop) 
   (system-async-mark (lambda () 
                        (set! bg-run #f)) bg-thread)) 
  
 (bg-start) 
 (sleep 1) 
 (bg-stop) 

The only thing that is missing is a function to enqueue a thunk; it does the same thing of bg-stop. Here is the full code:

 (use-modules (ice-9 threads)) 
  
 (define bg-run #f) 
 (define bg-thread #f) 
  
 (define (bg-start) 
   (set! bg-run #t) 
   (set! bg-thread (begin-thread 
                    (while bg-run 
                      (usleep 1000))))) 
  
 (define (bg-enqueue thunk) 
   (system-async-mark thunk bg-thread)) 
  
 (define (bg-stop) 
   (bg-enqueue (lambda () 
                 (set! bg-run #f)))) 
  
 ;; ------------------------------------------------------------ 
 ;; test 
  
 (bg-start) 
 (bg-enqueue (lambda () 
               (usleep 1000) 
               (format #t "start job 1 ") 
               (usleep 5000) 
               (format #t "end job 1~%"))) 
 (bg-enqueue (lambda () 
               (format #t "start job 2 ") 
               (usleep 1000) 
               (format #t "end job 2~%"))) 
 (bg-enqueue (lambda () 
               (format #t "start job 3 ") 
               (usleep 3000) 
               (format #t "end job 3~%"))) 
 (format #t "waiting for jobs to finish~%") 
 (sleep 1) 
 (bg-stop) 

Unofficially, one of the points in which Guile runs asyncs is when we invoke, in a way or the other, the select function; this happens when we invoke the usleee procedure. So the background thread evaluates asyncs each 1000 microseconds.


category-guile