SoFunction
Updated on 2025-04-07

Example of JavaScript + TypeScript implementing concurrent queues

1. Preface

This article uses TypeScript and JavaScript. If you have not learned TypeScript, don't worry about being unable to follow along. TypeScript is slightly more verbose because it adds type annotations to variables, but those annotations improve the readability and maintainability of the code, so it should actually be easier to understand, and you should be able to pick it up quickly.

Instructions for installing TypeScript are at the end of the article.

Many things happen at the same time in life: you are typing code, he is typing code, she is typing code, and I am watching all of you type code. This is not concurrency but parallelism.

The biggest difference between concurrency and parallelism is whether multiple things are handled by one person or by several people. If one person handles them, it is concurrency; if several people handle them, it is parallelism. This article is about concurrent execution, and it uses TypeScript and JavaScript to implement a concurrent queue.
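As a rough illustration of "one person handling several things", the sketch below runs two workers on JavaScript's single thread. The names `worker` and `demo` are hypothetical and are not part of the queue built in this article. Each worker yields with `await` between steps, so the two take turns instead of running at the same instant.

```typescript
// Hypothetical helper: a job that does its work in several steps and
// yields to the event loop after every step.
async function worker(name: string, steps: number, log: string[]): Promise<void> {
    for (let step = 1; step <= steps; step++) {
        log.push(`${name} step ${step}`);
        await Promise.resolve(); // give the other worker a turn
    }
}

// Start both workers on the same thread and wait for both to finish.
async function demo(): Promise<string[]> {
    const log: string[] = [];
    await Promise.all([worker("A", 2, log), worker("B", 2, log)]);
    return log;
}

const interleaved = demo();
interleaved.then((log) => console.log(log));
// [ 'A step 1', 'B step 1', 'A step 2', 'B step 2' ]
```

The steps of the two workers alternate, which is concurrency. Parallelism would require a second thread or process.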

Queues like this are everywhere in daily life. For example, at a ticket window people are served one at a time, and those behind can only wait until the person in front has finished buying a ticket. The concurrent queue discussed in this article works on much the same principle.

2. Core code analysis

I will not show all the code at once. Once the core logic is explained clearly, the rest of the code only plays an auxiliary role and is not difficult to understand.

I have divided the core code into the following parts, explained from easy to difficult:

  • Example of usage
  • Add a task
  • Run a task
  • Perform a task
  • Determine whether the execution is over

2.1. Execution Example

The code below defines and adds two tasks, each of which writes to the console after a two-second delay. Because the concurrent queue is created with a maximum concurrency of 1, only one task can execute at a time, and tasks execute in the order they were added (first in, first out). As a result, the first message appears after 2 seconds and the second after 4 seconds.

// Callback function after all tasks are executed
let callback = (result: any) => {
    console.log(result);
};

let concurrencyTask = new ConcurrencyTask(1, callback);

// Add a task
concurrencyTask.addTask((resolve, reject) => {
    setTimeout(() => {
        console.log("Execute in 2 seconds"); // Output after 2 seconds
        resolve();
    }, 2000);
});

concurrencyTask.addTask((resolve, reject) => {
    setTimeout(() => {
        console.log("Execute in 4 seconds"); // Output after 4 seconds
        resolve();
    }, 2000);
});

concurrencyTask.run(false);

2.2. Add a task

The following is the code for adding a task. A task is a function that receives three parameters when executed: resolve, reject, and args. The first two are the resolve and reject functions of a Promise, and args holds the variable arguments the function needs in order to run. A task cannot be added while the task queue is executing.

type Task = (resolve: Function, reject: Function, ...args: Array<any>) => any;

/**
  * Add a task to the task queue, will not be executed
  * @param task
  * @return Whether the addition is successful, if the task is in the execution stage, return false
  */
public addTask(task: Task): boolean {
    // Tasks are only accepted while the queue is idle
    if (!this.getRunning()) {
        this.taskList.push(task);
        return true;
    }
    return false;
}
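To see the "no additions while running" rule on its own, here is a minimal sketch with a hypothetical `GatedQueue` class. It is a simplified stand-in rather than the `ConcurrencyTask` class from this article, and its `start` method only flips the flag without executing anything.

```typescript
// Hypothetical minimal queue, used only to illustrate the gating rule.
class GatedQueue {
    private tasks: Array<() => void> = [];
    private running: boolean = false;

    // Returns false (and ignores the task) once the queue has started.
    public add(task: () => void): boolean {
        if (this.running) {
            return false;
        }
        this.tasks.push(task);
        return true;
    }

    // Marks the queue as running; real execution is omitted in this sketch.
    public start(): void {
        this.running = true;
    }

    public size(): number {
        return this.tasks.length;
    }
}

const gatedQueue = new GatedQueue();
const acceptedBefore = gatedQueue.add(() => {});
gatedQueue.start();
const acceptedAfter = gatedQueue.add(() => {});
console.log(acceptedBefore, acceptedAfter, gatedQueue.size()); // true false 1
```

Returning a boolean rather than throwing lets the caller decide how to react when the queue is busy.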

2.3. Run the task

The canAbort parameter indicates whether the queue can be interrupted during execution. Calling the reject function inside a running task interrupts execution. After the interruption the task queue is reset, which clears the unexecuted tasks and resets the other data.

The following code starts as many tasks as the maximum concurrency allows. If the maximum concurrency is greater than the total number of tasks, the total number of tasks is used instead.

/**
  * Start running the task
  * @param canAbort Is it interruptible
  * @param args Task execution parameters
  */
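public run(canAbort: boolean = false, ...args: Array<any>): void {
    this.canAbort = canAbort;
    this.setRunning(true);
    let length = this.taskList.length;
    // Never start more tasks than there are in the queue
    let maxConcurrency = Math.min(this.getMaxConcurrency(), length);
    for (let index = 0; index < maxConcurrency; index++) {
        // Spread args so each task receives them as individual arguments
        this.executeSingleTask(...args);
    }
}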
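The size of the first batch can be checked in isolation. The helper name `initialBatchSize` below is hypothetical. It only mirrors the `Math.min` calculation described above.

```typescript
// Hypothetical helper: how many tasks are started immediately.
function initialBatchSize(maxConcurrency: number, taskCount: number): number {
    return Math.min(maxConcurrency, taskCount);
}

console.log(initialBatchSize(2, 5)); // 2: the limit applies, 3 tasks wait
console.log(initialBatchSize(4, 3)); // 3: fewer tasks than the limit
```

The remaining tasks are not started here. Each one starts when a running task finishes and frees a slot.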

2.4. Perform a task

Since task execution is asynchronous, we use a Promise to wrap the task and pass resolve and reject to the task function, letting it decide when the task ends.

When a task calls resolve, the queue checks whether all tasks have been executed by running the judgeExecuteEnd function. When a task calls reject, the queue checks whether execution can be interrupted and, if so, resets the task queue. Of course, if you don't want the task queue to be reset, you can change that in the source code. I won't change it here.

The promise for each task is then saved in the taskPromiseList variable, which is an array of Promise objects.

/**
  * Perform a single task
  * @param args function execution parameters
  */
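private executeSingleTask(...args: Array<any>): void {
    let promise = new Promise<void>((resolve, reject) => {
        // Take the next task and let it decide when to call resolve or reject
        let result = this.taskList[this.taskIndex++](resolve, reject, ...args);
        this.handleResult.push(result);
    });
    promise.then(() => {
        this.judgeExecuteEnd(args);
    }).catch((error) => {
        // If the task execution can be interrupted, reset the task queue
        if (this.canAbort) {
            this.reset();
            return;
        }
        console.log(error);
    });
    this.taskPromiseList.push(promise);
}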
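The wrapping step can be tried outside the class. In this sketch, `wrapTask` is a hypothetical standalone function, and the 10 ms timer merely stands in for real asynchronous work.

```typescript
type Task = (resolve: Function, reject: Function, ...args: Array<any>) => any;

// Hypothetical helper: wrap a callback-style task in a Promise and hand
// the Promise's resolve/reject to the task itself.
function wrapTask(task: Task, ...args: Array<any>): Promise<void> {
    return new Promise<void>((resolve, reject) => {
        task(resolve, reject, ...args);
    });
}

const events: string[] = [];

const wrapped = wrapTask((resolve, reject, label) => {
    events.push(`started ${label}`);
    setTimeout(() => {
        events.push(`finished ${label}`);
        resolve(); // the task itself announces that it is done
    }, 10);
}, "demo");

wrapped.then(() => console.log(events));
// [ 'started demo', 'finished demo' ]
```

Because the task owns `resolve` and `reject`, the wrapper needs no knowledge of what the task does or how long it takes.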

2.5. Determine whether the execution is over

In the following code, taskIndex is the index of the current task, and runOver is a flag that records whether execution has ended.

Here we check whether all the promises in taskPromiseList have completed.

/**
  * Determine whether the execution is over
  * @param args function to execute the required parameters
  */
private judgeExecuteEnd(args: Array<any>): void {
    // If the execution is not finished, the next task will be executed
    if (this.taskIndex < this.taskList.length) {
        this.executeSingleTask(...args);
        return;
    }
    // All tasks have been started. Each of the last few concurrent tasks
    // triggers this function when it finishes, but the runOverCallback
    // function should be executed only once, hence the runOver flag.
    if (!this.runOver) {
        this.runOver = true;
        let result = this.handleResult;
        Promise.all(this.taskPromiseList).then(() => {
            this.reset();
            this.runOverCallback && this.runOverCallback(result);
        }).catch((error) => {
            // If interrupts are not allowed, the completion callback still runs
            if (!this.canAbort) {
                this.reset();
                this.runOverCallback && this.runOverCallback(result);
            }
            console.log(error);
        });
    }
}
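The run-once idea behind the runOver flag can be sketched separately. `CompletionGate` is a hypothetical class rather than part of the article's source. It receives several "I am finished" notifications and fires its callback only on the first one that arrives with nothing left to start.

```typescript
// Hypothetical class illustrating a run-once completion flag.
class CompletionGate {
    private fired: boolean = false;

    constructor(private onDone: () => void) {}

    // remaining: number of tasks that have not been started yet
    public notify(remaining: number): void {
        if (remaining > 0 || this.fired) {
            return; // work is left, or the callback has already run
        }
        this.fired = true;
        this.onDone();
    }
}

let callbackCalls = 0;
const gate = new CompletionGate(() => { callbackCalls++; });

gate.notify(1); // a task is still waiting: nothing happens
gate.notify(0); // first finisher with an empty queue: callback runs
gate.notify(0); // second finisher: ignored because of the flag
console.log(callbackCalls); // 1
```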

3. Source code display

Copying the following code directly into a ts file will have no effect, because the browser cannot parse ts code. We need to compile it to js with the ts compiler and then reference the resulting js file. Instructions for installing TypeScript are at the end of the article.

/*
   Function description: Concurrent queue
   Created: December 17, 2023
  */

type Task = (resolve: Function, reject: Function, ...args: Array<any>) => any;
type ResultCallback = (result: Array<any>) => any;

/**
  * Concurrent task queue
  */
class ConcurrencyTask {

    /**
      * Task collection
      */
    private taskList: Array<Task>;

    /**
      * Processing results
      */
    private handleResult: Array<any>;

    /**
      * Whether the task is being performed
      */
    private running: boolean;

    /**
      * Maximum concurrency number
      */
    private maxConcurrency: number;

    /**
      * The default maximum concurrency number
      */
    private static DEFAULT_MAX_CONCURRENCY: number = 2;

    /**
      * Current task index
      */
    private taskIndex: number;

    /**
      * Use promise to wrap the task
      */
    private taskPromiseList: Array<Promise<void>>;

    /**
      * Is it possible to interrupt
      */
    private canAbort: boolean;

    /**
      * End of execution
      */
    private runOver: boolean;

    /**
      * Callback function when all tasks are executed
      */
    private runOverCallback: ResultCallback;

    /**
      * Create a concurrent task queue
      * @param maxConcurrency Maximum concurrency
      * @param runOverCallback Callback after all tasks are executed
      */
    public constructor(maxConcurrency: number = ConcurrencyTask.DEFAULT_MAX_CONCURRENCY, runOverCallback: ResultCallback) {
        this.setRunOverCallback(runOverCallback);
        this.setMaxConcurrency(maxConcurrency);
        this.initial();
    }

    private initial(): void {
        this.canAbort = false;
        this.reset();
    }

    /**
      * Add a task to the task queue, will not be executed
      * @param task
      * @return Whether the addition is successful, if the task is in the execution stage, return false
      */
    public addTask(task: Task): boolean {
        // Tasks are only accepted while the queue is idle
        if (!this.getRunning()) {
            this.taskList.push(task);
            return true;
        }
        return false;
    }

    /**
      * Start running the task
      * @param canAbort Is it interruptible
      * @param args Task execution parameters
      */
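    public run(canAbort: boolean = false, ...args: Array<any>): void {
        this.canAbort = canAbort;
        this.setRunning(true);
        let length = this.taskList.length;
        // Never start more tasks than there are in the queue
        let maxConcurrency = Math.min(this.getMaxConcurrency(), length);
        for (let index = 0; index < maxConcurrency; index++) {
            // Spread args so each task receives them as individual arguments
            this.executeSingleTask(...args);
        }
    }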

    /**
      * Perform a single task
      * @param args function execution parameters
      */
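    private executeSingleTask(...args: Array<any>): void {
        let promise = new Promise<void>((resolve, reject) => {
            // Take the next task and let it decide when to call resolve or reject
            let result = this.taskList[this.taskIndex++](resolve, reject, ...args);
            this.handleResult.push(result);
        });
        promise.then(() => {
            this.judgeExecuteEnd(args);
        }).catch((error) => {
            // If the task execution can be interrupted, reset the task queue
            if (this.canAbort) {
                this.reset();
                return;
            }
            console.log(error);
        });
        this.taskPromiseList.push(promise);
    }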

    /**
      * Determine whether the execution is over
      * @param args function to execute the required parameters
      */
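    private judgeExecuteEnd(args: Array<any>): void {
        // If the execution is not finished, the next task will be executed
        if (this.taskIndex < this.taskList.length) {
            this.executeSingleTask(...args);
            return;
        }
        // All tasks have been started. Each of the last few concurrent tasks
        // triggers this function when it finishes, but the runOverCallback
        // function should be executed only once, hence the runOver flag.
        if (!this.runOver) {
            this.runOver = true;
            let result = this.handleResult;
            Promise.all(this.taskPromiseList).then(() => {
                this.reset();
                this.runOverCallback && this.runOverCallback(result);
            }).catch((error) => {
                // If interrupts are not allowed, the completion callback still runs
                if (!this.canAbort) {
                    this.reset();
                    this.runOverCallback && this.runOverCallback(result);
                }
                console.log(error);
            });
        }
    }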

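    private reset(): void {
        this.taskList = [];
        this.taskIndex = 0;
        this.taskPromiseList = [];
        this.running = false;
        this.runOver = false;
        this.handleResult = [];
    }

    private setRunning(running: boolean): void {
        this.running = running;
    }

    public getRunning(): boolean {
        return this.running;
    }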

    /**
      * Set the callback function after all tasks are executed, and if the queue is executing, it returns false
      * @param runOverCallback callback function
      */
    public setRunOverCallback(runOverCallback: ResultCallback): boolean {
        if (!this.getRunning()) {
            this.runOverCallback = runOverCallback;
            return true;
        }
        return false;
    }

    /**
      * Set the maximum number of concurrency, return false if execution
      * @param maxConcurrency Maximum number of concurrency, use default value when less than or equal to 0
      */
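    public setMaxConcurrency(maxConcurrency: number): boolean {
        // Fall back to the default value when the argument is not positive
        if (maxConcurrency <= 0) {
            maxConcurrency = ConcurrencyTask.DEFAULT_MAX_CONCURRENCY;
        }
        if (!this.getRunning()) {
            this.maxConcurrency = maxConcurrency;
            return true;
        }
        return false;
    }

    public getMaxConcurrency(): number {
        return this.maxConcurrency;
    }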
}

4. Install TypeScript

Since the TypeScript compiler runs on Node.js, Node.js also needs to be installed. You can download it from the Node.js Chinese website.

Only the installation method for TypeScript on Windows is covered here.

First open cmd as an administrator (press Win + R, type cmd, then press Ctrl + Shift + Enter).

Install globally using the following command:

npm i -g typescript

Then create a ts file in any directory, open cmd in that folder, and run tsc on the file. The compiled js file will be generated.
