Concurrency
Maggie provides Go-style concurrency primitives: lightweight processes backed by goroutines, channels for communication, and synchronization tools like mutexes, wait groups, and semaphores. If you have used Go's goroutines and channels, Maggie's model will feel familiar -- but everything is expressed through message passing rather than keywords.
The core idea is simple: fork a block to run it concurrently, use channels to pass values between processes, and use synchronization primitives to coordinate shared access when needed.
[42] fork wait
[3 + 4] fork wait
Forking Processes
Send fork to a block to run it in a new lightweight process backed by
a Go goroutine. The message returns a Process handle immediately --
the block executes concurrently in the background.
fork is the most common way to create concurrent work. The block
runs independently and its result can be retrieved later with wait.
Use forkWith: to pass a single argument to the block. The block
should take one parameter to receive it.
Non-local returns (^) inside a forked block are caught at the process boundary and treated as local returns. This prevents a ^ from escaping one goroutine into another.
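A hedged sketch of that boundary behavior, assuming the ^ simply becomes the forked block's return value (names and the exact value follow the semantics described above, not a verified run):

```smalltalk
"A ^ inside a forked block returns from the block, not from the caller"
p := [true ifTrue: [^99]. 'unreached'] fork.
p wait "=> 99 (the ^ is caught at the process boundary)"
```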
['hello' , ' world'] fork wait
([:x | x * 10] forkWith: 5) wait
You can also use the class-side method Process fork: which takes
a block argument:
p := Process fork: [100 factorial].
p wait
Waiting and Joining
Send wait to a Process to block the caller until the process finishes.
wait returns the result of the forked block -- the value of its last
expression.
isDone checks whether a process has finished without blocking. result
returns the process's result if it has finished, or nil if it is still
running.
p := [99] fork.
p wait
p := [42] fork.
p wait.
p isDone
"Waiting on multiple processes sequentially"
p1 := [10] fork.
p2 := [20] fork.
p3 := [30] fork.
p1 wait + p2 wait + p3 wait "=> 60"
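The result accessor pairs naturally with wait; a small sketch, assuming result mirrors the value wait returns once the process has finished:

```smalltalk
p := [6 * 7] fork.
p wait.
p result "=> 42 (process has finished, so result is available)"
```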
Channels -- Basics
Channels are typed pipes for sending values between processes. Create
a channel with Channel new (unbuffered) or Channel new: n (buffered
with capacity n).
An unbuffered channel blocks the sender until a receiver is ready, and vice versa. A buffered channel allows up to n values to queue before the sender blocks.
send: -- blocking send
receive -- blocking receive
trySend: -- non-blocking send, returns true/false
tryReceive -- non-blocking receive, returns value or nil
close -- close the channel (no more sends)
For doctests, always prefer buffered channels (new:) to avoid
deadlocks in single-line evaluation.
ch := Channel new: 1.
ch trySend: 42.
ch tryReceive >>> 42
ch := Channel new: 5.
ch trySend: 'hello' >>> true
(Channel new: 1) tryReceive >>> nil
(Channel new: 3) isEmpty >>> true
(Channel new: 5) capacity >>> 5
ch := Channel new: 3.
ch trySend: 'a'.
ch trySend: 'b'.
ch size >>> 2
Channels -- Closing and Draining
Send close to a channel when no more values will be sent. After
closing, trySend: returns false and isClosed returns true.
Receives will drain any remaining buffered values, then return nil.
ch := Channel new: 1.
ch close.
ch isClosed >>> true
ch := Channel new: 3.
ch trySend: 10.
ch trySend: 20.
ch close.
ch tryReceive >>> 10
ch := Channel new: 3.
ch trySend: 10.
ch trySend: 20.
ch close.
ch tryReceive.
ch tryReceive >>> 20
ch := Channel new: 1.
ch trySend: 'a'.
ch close.
ch trySend: 'b' >>> false
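Once a closed channel is fully drained, further receives return nil. A sketch following the drain-then-nil semantics described above:

```smalltalk
ch := Channel new: 2.
ch trySend: 1.
ch close.
ch tryReceive.
"Buffer is now empty and the channel is closed"
ch tryReceive "=> nil (closed and drained)"
```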
Channels -- Blocking Send and Receive
With send: and receive, one process can produce values while
another consumes them. send: blocks until a receiver takes the
value (unbuffered) or until buffer space is available (buffered).
receive blocks until a value is available.
Because blocking operations require concurrent processes, these patterns are shown as examples rather than doctests.
"Send and receive across two processes"
ch := Channel new.
[ch send: 42] fork.
ch receive "=> 42"
"Streaming values through a buffered channel"
ch := Channel new: 10.
[1 to: 5 do: [:i | ch send: i * 10]. ch close] fork.
results := Array new: 5.
0 to: 4 do: [:i | results at: i put: ch receive].
results "=> #(10 20 30 40 50)"
ch := Channel new: 1.
[ch send: 'hello'] fork.
ch receive
Channel Select
Channel select: waits on multiple channels simultaneously and
dispatches to the handler of whichever channel becomes ready first.
Each case is built with onReceive: which creates a channel-handler
association.
Channel select:ifNone: adds a default block that runs if no
channel is immediately ready, making the select non-blocking.
"Wait on two channels, handle whichever is ready first"
ch1 := Channel new: 1.
ch2 := Channel new: 1.
ch1 send: 'from-1'.
result := Channel select: {
ch1 onReceive: [:v | 'Got: ', v].
ch2 onReceive: [:v | 'Got: ', v]
}.
result "=> 'Got: from-1'"
"Non-blocking select with a default"
ch := Channel new: 1.
result := Channel select: {
ch onReceive: [:v | v]
} ifNone: [
'nothing ready'
].
result "=> 'nothing ready'"
Select is the Maggie equivalent of Go's select statement. It is
essential for building event loops, multiplexers, and timeout
patterns.
Mutex
A Mutex (mutual exclusion lock) ensures that only one process at a
time can access a shared resource. The preferred usage is critical:,
which acquires the lock, evaluates a block, and guarantees release
when done.
lock/unlock -- manual acquire and release
tryLock -- non-blocking, returns true if acquired
critical: -- evaluate a block while holding the lock (preferred)
isLocked -- check current state
Mutex new isLocked
Mutex new tryLock
Mutex new critical: [42]
m := Mutex new.
m critical: [3 + 4]
m := Mutex new.
m lock.
m isLocked
m := Mutex new.
m lock.
m tryLock
m := Mutex new.
m critical: ['done'].
m isLocked
"Protecting a shared counter from concurrent modification"
count := 0.
mutex := Mutex new.
wg := WaitGroup new.
1 to: 100 do: [:i |
wg wrap: [mutex critical: [count := count + 1]]
].
wg wait.
count "=> 100"
WaitGroup
A WaitGroup is a synchronization barrier. You add a count before
forking work, decrement with done when each unit finishes, and
wait blocks until the count reaches zero.
add: -- increment the counter
done -- decrement the counter
wait -- block until counter is zero
count -- read current counter value
wrap: -- convenience: add 1, fork block, auto-done
wrap: is the most common pattern. It combines incrementing the
counter, forking, and calling done into a single message.
WaitGroup new count
wg := WaitGroup new.
wg add: 3.
wg count
wg := WaitGroup new.
wg add: 2.
wg done.
wg count
wg := WaitGroup new.
wg wrap: [42].
wg wait.
wg count
"Fork 5 workers using wrap:, then wait for all to finish"
ch := Channel new: 5.
wg := WaitGroup new.
1 to: 5 do: [:i |
wg wrap: [ch send: i * 10]
].
wg wait.
ch close
forkAll: forks every block and waits for all to complete.
forkAllCollect: does the same but collects results in order.
wg := WaitGroup new.
results := wg forkAllCollect: {[1 + 1]. [2 + 2]. [3 + 3]}.
results "=> #(2 4 6)"
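forkAll: is the same shape without collected results -- it blocks until every block has finished. A sketch, assuming the counter reads zero once forkAll: returns:

```smalltalk
wg := WaitGroup new.
wg forkAll: {[Process sleep: 10]. [Process sleep: 20]. [Process sleep: 5]}.
wg count "=> 0 (all workers done)"
```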
Semaphore
A Semaphore manages a fixed number of permits, allowing you to limit
concurrent access to a resource pool. Semaphore new creates a binary
semaphore (1 permit). Semaphore new: n creates one with n permits.
acquire -- blocking, waits for a permit
release -- returns a permit
tryAcquire -- non-blocking, returns true/false
critical: -- acquire, evaluate block, release (preferred)
available -- number of available permits
capacity -- total permits
Semaphore new capacity
(Semaphore new: 3) capacity
(Semaphore new: 3) available
Semaphore new tryAcquire
s := Semaphore new.
s tryAcquire.
s tryAcquire
(Semaphore new: 2) critical: [42]
s := Semaphore new: 1.
s critical: ['done'].
s available
"Rate-limiting: at most 3 concurrent workers"
sem := Semaphore new: 3.
wg := WaitGroup new.
1 to: 20 do: [:i |
wg wrap: [sem critical: [Process sleep: 50]]
].
wg wait
CancellationContext
A CancellationContext wraps Go's context.Context to provide
cooperative cancellation and timeouts for concurrent work.
Create contexts with class-side methods:
CancellationContext background -- never cancelled, base context
CancellationContext withCancel -- cancellable context
CancellationContext withTimeout: -- auto-cancels after N milliseconds
Create child contexts from an existing context:
ctx withCancel -- child that cancels independently
ctx withTimeout: -- child with its own timeout
Cancelling a parent automatically cancels all children.
Query a context:
isCancelled/isDone -- check if cancelled
hasDeadline -- check if timeout is set
cancel -- cancel this context
wait -- block until cancelled
ctx := CancellationContext withCancel.
ctx isCancelled "=> false"
ctx cancel.
ctx isCancelled "=> true"
ctx := CancellationContext withTimeout: 100.
ctx hasDeadline "=> true"
ctx isCancelled "=> false (initially)"
Process sleep: 200.
ctx isCancelled "=> true (timed out)"
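wait turns a deadline into a blocking join. A sketch, assuming wait returns as soon as the timeout cancels the context:

```smalltalk
ctx := CancellationContext withTimeout: 50.
ctx wait.
ctx isCancelled "=> true (wait returned on cancellation)"
```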
"Child context cancelled when parent is cancelled"
parent := CancellationContext withCancel.
child := parent withCancel.
parent cancel.
child isCancelled "=> true"
Forking With Context
Use forkWithContext: to pass a CancellationContext to a forked
process. The context is delivered as the block's first argument.
The process should periodically check isCancelled and exit
gracefully when cancellation is detected.
This pattern lets you build long-running workers that respond to external shutdown signals or timeouts.
ctx := CancellationContext withTimeout: 500.
p := [:c |
count := 0.
[c isCancelled not] whileTrue: [
count := count + 1.
Process sleep: 10
].
count
] forkWithContext: ctx.
result := p wait.
result "=> ~50 (iterations before timeout)"
"Cancelling a worker externally"
ctx := CancellationContext withCancel.
p := [:c |
[c isCancelled not] whileTrue: [Process sleep: 10].
'stopped'
] forkWithContext: ctx.
Process sleep: 100.
ctx cancel.
p wait "=> 'stopped'"
Process Restrictions
Forked processes can be sandboxed by hiding certain global names.
Use forkRestricted: with an Array of class name strings to create
a process that cannot see those globals -- they resolve to nil
instead of signaling an error.
Restrictions are inherited by child forks: if a restricted process forks again, the child inherits all parent restrictions.
Writes to globals from a restricted process go to a process-local overlay and do not affect the shared VM state.
"Restrict a process from seeing File and HTTP"
p := [File] forkRestricted: #('File' 'HTTP').
p wait "=> nil (File is hidden)"
"Class-side equivalent"
p := Process forkWithout: #('File' 'HTTP') do: [File].
p wait "=> nil"
"Restrictions are inherited by child forks"
p := [
inner := [File] fork.
inner wait
] forkRestricted: #('File').
p wait "=> nil (inner fork also restricted)"
Process Mailboxes
Every process has a bounded mailbox for receiving messages from other processes. This is an alternative to channels for actor-style messaging where each process has its own inbox.
Sending messages:
- aProcess send: value -- fire-and-forget, wraps value in a
MailboxMessage with no selector.
- aProcess send: #selector with: value -- sends a MailboxMessage
with the given selector and payload.
Both return true if delivered, false if the process is dead or
its mailbox is full (default capacity: 1024).
Receiving messages:
Process receive -- blocking receive, returns a MailboxMessage.
Process receive: ms -- blocking with timeout, returns nil on timeout.
Process tryReceive -- non-blocking, returns nil if empty.
A MailboxMessage has three accessors: sender (the sending process
or nil), selector (a Symbol or nil), and payload (the value).
"Simple send and receive between two processes"
p := [
msg := Process receive.
msg payload * 2
] fork.
Process sleep: 10.
p send: 21.
p wait "=> 42"
"Selector-based messaging"
p := [
msg := Process receive.
msg selector "=> #greet"
] fork.
Process sleep: 10.
p send: #greet with: 'hello'.
p wait
"Non-blocking receive returns nil when empty"
p := [Process tryReceive] fork.
p wait "=> nil"
"Timeout receive returns nil when no message arrives"
p := [Process receive: 50] fork.
p wait "=> nil"
Registered Process Names
Processes can register themselves under a string name for discovery by other processes. This avoids passing process references through channels.
- aProcess registerAs: 'name' -- register (fails if name taken by
live process)
- Process named: 'name' -- look up by name, returns process or nil
- aProcess unregister -- remove registration
Names are automatically cleaned up when the process dies.
"Register and look up a process by name"
worker := [
Process current registerAs: 'adder'.
msg := Process receive.
msg payload + 10
] fork.
Process sleep: 20.
found := Process named: 'adder'.
found send: 32.
worker wait "=> 42"
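The automatic cleanup can be observed directly. A sketch, assuming the registry entry is purged once the process exits (a short sleep allows for cleanup to run):

```smalltalk
p := [Process current registerAs: 'short-lived'. 1] fork.
p wait.
Process sleep: 10.
Process named: 'short-lived' "=> nil (cleaned up on exit)"
```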
Process Links
Links create bidirectional crash propagation between processes. When a linked process terminates abnormally, the other process is also killed -- unless it is trapping exits.
Normal exits do not propagate through links. Only crashes and explicit kills trigger propagation.
"Link two processes -- if worker crashes, supervisor knows"
worker := [
Process sleep: 100.
1 / 0 "crash!"
] fork.
Process current trapExit: true.
Process current link: worker.
msg := Process receive: 500.
msg isNil
ifTrue: ['No crash detected']
ifFalse: [msg selector] "=> #exit"
trapExit: true converts kill signals into mailbox messages
with selector #exit, allowing a supervisor process to detect
and handle child crashes without dying itself.
Process Monitors
Monitors are unidirectional: the watcher is notified when the
watched process dies, but is not killed. The notification arrives
as a MailboxMessage with selector #processDown: and a
4-element array payload: #(refID processValue reason result).
If the monitored process is already dead when the monitor is established, the DOWN message is delivered immediately.
"Monitor a short-lived worker"
worker := [42] fork.
ref := Process current monitor: worker.
worker wait.
msg := Process tryReceive.
msg isNil
ifTrue: ['No notification']
ifFalse: [msg selector] "=> #processDown:"
Use demonitor: with the ref ID to cancel a monitor before
the watched process dies.
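A hedged sketch of cancelling a monitor, assuming demonitor: takes the ref returned by monitor: and suppresses the DOWN message:

```smalltalk
worker := [Process sleep: 50. 'ok'] fork.
ref := Process current monitor: worker.
Process current demonitor: ref.
worker wait.
Process tryReceive "=> nil (monitor was cancelled, no notification)"
```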
Monitors also work across nodes -- see Guide13 (Distribution) for cross-node monitors with automatic failure detection.
Pattern: Producer-Consumer
The producer-consumer pattern uses a channel as a bounded buffer between a process that generates values and a process that consumes them. The producer sends values to the channel and closes it when done. The consumer receives values until the channel is closed.
This decouples production rate from consumption rate -- the buffered channel absorbs temporary mismatches.
ch := Channel new: 10.
"Producer: generate squares, then close"
[
1 to: 10 do: [:i | ch send: i * i].
ch close
] fork.
"Consumer: collect results until channel closes"
results := Array new: 10.
0 to: 9 do: [:i |
results at: i put: ch receive
].
results "=> #(1 4 9 16 25 36 49 64 81 100)"
"Multi-producer, single consumer"
ch := Channel new: 20.
wg := WaitGroup new.
1 to: 4 do: [:worker |
wg wrap: [
1 to: 5 do: [:i | ch send: worker * 100 + i]
]
].
"Close channel after all producers finish"
[wg wait. ch close] fork.
"Collect all 20 results"
results := Array new: 20.
0 to: 19 do: [:i | results at: i put: ch receive].
results size "=> 20"
Pattern: Fan-Out / Fan-In
Fan-out distributes work across multiple concurrent processes. Fan-in collects results back into a single channel or collection.
The typical structure is:
1. Create a results channel
2. Fork N workers, each sending its result to the channel
3. Use a WaitGroup to know when all workers are done
4. Close the results channel after wait completes
5. Drain the channel to collect results
"Fan-out: distribute work across 5 processes"
results := Channel new: 5.
wg := WaitGroup new.
1 to: 5 do: [:i |
wg wrap: [results send: i * i]
].
wg wait.
results close.
"Fan-in: collect all results"
sum := 0.
[results isClosed not or: [results isEmpty not]]
whileTrue: [
v := results tryReceive.
v isNil ifFalse: [sum := sum + v]
].
sum "=> 55 (1+4+9+16+25)"
A WaitGroup's forkAllCollect: provides a convenient shorthand for
the fan-out/fan-in pattern when you want ordered results:
wg := WaitGroup new.
results := wg forkAllCollect: {
[10 * 10].
[20 * 20].
[30 * 30]
}.
results "=> #(100 400 900)"
Pattern: Timeout with Select
Combine CancellationContext withTimeout: with Channel select: to
implement operations that give up after a deadline. The context's
doneChannel closes when the timeout fires, which can be used as a
case in a select.
"Race a slow operation against a timeout"
ch := Channel new: 1.
ctx := CancellationContext withTimeout: 100.
[Process sleep: 500. ch send: 'result'] fork.
result := Channel select: {
ch onReceive: [:v | v].
ctx doneChannel onReceive: [:v | 'timed out']
}.
result "=> 'timed out'"
"Fast operation beats the timeout"
ch := Channel new: 1.
ctx := CancellationContext withTimeout: 1000.
[ch send: 'fast result'] fork.
result := Channel select: {
ch onReceive: [:v | v].
ctx doneChannel onReceive: [:v | 'timed out']
}.
result "=> 'fast result'"
Putting It Together
Here is a realistic example that combines channels, processes, a wait group, a mutex, and a semaphore to build a rate-limited concurrent pipeline.
"Rate-limited pipeline: fetch URLs with at most 3 concurrent workers"
urls := #('url-1' 'url-2' 'url-3' 'url-4' 'url-5'
'url-6' 'url-7' 'url-8' 'url-9' 'url-10').
results := Channel new: 10.
sem := Semaphore new: 3.
wg := WaitGroup new.
urls do: [:url |
wg wrap: [
sem critical: [
"Simulate work"
Process sleep: 50.
results send: 'done: ', url
]
]
].
"Close results channel after all workers finish"
[wg wait. results close] fork.
"Collect results"
count := 0.
[results isClosed not or: [results isEmpty not]]
whileTrue: [
v := results tryReceive.
v isNil ifFalse: [count := count + 1]
].
count "=> 10"
The key primitives and their roles:
- Process (fork, wait) -- parallel execution
- Channel (send:, receive, select:) -- communication
- Mutex (critical:) -- exclusive access to shared state
- WaitGroup (wrap:, wait) -- barrier synchronization
- Semaphore (critical:) -- rate limiting / resource pools
- CancellationContext (withTimeout:, cancel) -- cooperative shutdown
These primitives compose naturally through message passing, giving you the building blocks for any concurrent architecture.