Async concurrency utilities in @fuzdev/fuz_util/async.js and
@fuzdev/fuz_util/dag.js. Controlled concurrency for file I/O, network
requests, task execution, and DAG scheduling.
Lifecycle type for tracking async operations in UI:
type AsyncStatus = 'initial' | 'pending' | 'success' | 'failure';await wait(500); // wait 500ms
await wait(); // wait 0ms (next macrotask via setTimeout)Type guard for Promise/thenable detection:
if (is_promise(value)) {
const result = await value;
}Separates promise creation from resolution — external control over when and how a promise resolves.
interface Deferred<T> {
promise: Promise<T>;
resolve: (value: T) => void;
reject: (reason: any) => void;
}Create with create_deferred():
const deferred = create_deferred<string>();
// Pass the promise to a consumer
some_async_consumer(deferred.promise);
// Resolve later from the producer
deferred.resolve('done');- Coordinating between independent async flows (e.g., DAG node dependencies) - Bridging callback-based APIs with promise-based code - Signaling completion from one context to waiters in another
Used internally by run_dag() and throttle.
Three functions for bounded concurrency over iterables. All require
concurrency >= 1 and accept an optional AbortSignal.
| Function | Returns results | Fail behavior | Use when |
| ------------------------ | --------------- | ----------------------- | ---------------------- |
| each_concurrent | No | Fail-fast | Side effects only |
| map_concurrent | Yes (ordered) | Fail-fast | Transform + collect |
| map_concurrent_settled | Yes (settled) | Collects all (no throw) | Best-effort collection |
Side effects only, no result collection:
const each_concurrent: <T>(
items: Iterable<T>,
concurrency: number,
fn: (item: T, index: number) => Promise<void> | void,
signal?: AbortSignal,
) => Promise<void>;await each_concurrent(
file_paths,
5, // max 5 concurrent deletions
async (path) => {
await unlink(path);
},
);Fail-fast: On first rejection, stops spawning new workers and rejects.
With signal, aborts immediately.
Like each_concurrent but collects results in input order:
const map_concurrent: <T, R>(
items: Iterable<T>,
concurrency: number,
fn: (item: T, index: number) => Promise<R> | R,
signal?: AbortSignal,
) => Promise<Array<R>>;const results = await map_concurrent(
file_paths,
5, // max 5 concurrent reads
async (path) => readFile(path, 'utf8'),
);
// results[i] corresponds to file_paths[i]Fail-fast: On first rejection, stops spawning and rejects. Partial results are lost.
Follows Promise.allSettled pattern — never rejects the outer promise:
const map_concurrent_settled: <T, R>(
items: Iterable<T>,
concurrency: number,
fn: (item: T, index: number) => Promise<R> | R,
signal?: AbortSignal,
) => Promise<Array<PromiseSettledResult<R>>>;const results = await map_concurrent_settled(urls, 5, fetch);
for (const [i, result] of results.entries()) {
if (result.status === 'fulfilled') {
console.log(`${urls[i]}: ${result.value.status}`);
} else {
console.error(`${urls[i]}: ${result.reason}`);
}
}Abort behavior: On abort, resolves with partial results — completed items keep their real settlements, in-flight items are rejected with abort reason. Items never pulled from the iterator are absent from the results array.
All three use the same internal pattern:
1. Maintain active_count and next_index counters
2. Spawn workers up to concurrency limit
3. On completion, decrement active_count and call run_next()
4. run_next() spawns more if slots are available
Empty iterables resolve immediately. The fn callback receives both item
and index, and may return synchronously.
Class-based concurrency limiter for more flexible control than concurrent map/each:
const semaphore = new AsyncSemaphore(3); // max 3 concurrent
async function do_work(item: string): Promise<void> {
await semaphore.acquire(); // blocks if 3 already active
try {
await process(item);
} finally {
semaphore.release(); // free the slot
}
}Constructor requires permits >= 0.
new AsyncSemaphore(Infinity) — acquire() always resolves immediately.
Useful for disabling concurrency limits without changing call sites.
- acquire(): If permits > 0, decrements and resolves. Otherwise queues.
- release(): If waiters queued, resolves next. Otherwise increments permits.
Used by run_dag() for controlling node execution concurrency.
run_dag() in @fuzdev/fuz_util/dag.js executes nodes in a dependency graph
concurrently. Nodes declare dependencies via depends_on; independent nodes
run in parallel up to max_concurrency. Uses AsyncSemaphore for concurrency
and Deferred for dependency signaling.
import {run_dag, type DagNode} from '@fuzdev/fuz_util/dag.js';
interface BuildStep extends DagNode {
command: string;
}
const result = await run_dag<BuildStep>({
nodes,
execute: async (node) => {
await run_command(node.command);
},
max_concurrency: 4,
stop_on_failure: true, // default
});
if (!result.success) {
console.error(result.error); // e.g., "2 node(s) failed"
}Sortable is from @fuzdev/fuz_util/sort.js (topological sort validation).
interface DagNode extends Sortable {
id: string;
depends_on?: Array<string>;
}interface DagOptions<T extends DagNode> {
nodes: Array<T>;
execute: (node: T) => Promise<void>;
on_error?: (node: T, error: Error) => Promise<void>;
on_skip?: (node: T, reason: string) => Promise<void>;
should_skip?: (node: T) => boolean;
max_concurrency?: number; // default: Infinity
stop_on_failure?: boolean; // default: true
skip_validation?: boolean; // default: false
}interface DagResult {
success: boolean;
results: Map<string, DagNodeResult>;
completed: number;
failed: number;
skipped: number;
duration_ms: number;
error?: string;
}interface DagNodeResult {
id: string;
status: 'completed' | 'failed' | 'skipped';
error?: string;
duration_ms: number;
}Failed dependency nodes cascade — dependents are skipped with reason
'dependency failed'.
Used by tx for pipeline execution and resource detection.
| Export | Module | Type | Purpose |
| ------------------------ | ---------- | --------- | ---------------------------------------------- |
| AsyncStatus | async.js | Type | Lifecycle status for async operations |
| wait | async.js | Function | Promise-based delay |
| is_promise | async.js | Function | Type guard for Promise/thenable |
| Deferred<T> | async.js | Interface | Promise with external resolve/reject |
| create_deferred | async.js | Function | Creates a Deferred |
| each_concurrent | async.js | Function | Concurrent side effects, fail-fast |
| map_concurrent | async.js | Function | Concurrent map with ordered results, fail-fast |
| map_concurrent_settled | async.js | Function | Concurrent map, allSettled pattern |
| AsyncSemaphore | async.js | Class | Concurrency limiter with acquire/release |
| run_dag | dag.js | Function | Concurrent DAG executor |
| DagNode | dag.js | Interface | Minimum shape for a DAG node |
| DagOptions | dag.js | Interface | Options for run_dag |
| DagResult | dag.js | Interface | Aggregated DAG execution result |
| DagNodeResult | dag.js | Interface | Per-node execution result |