ShellBrowser Components Threadpool

Fully managed pool of worker threads

The Worker Threadpool in ShellBrowser Delphi

28.10.2022

Our Worker Threadpool provides a fully managed pool of worker threads that handle tasks (work items) asynchronously. Maintaining a pool of threads instead of starting a new thread for each task has several benefits:

  • Performance increases.
  • Short-lived tasks no longer suffer the latency caused by frequently creating and destroying threads.
  • You do not manage threads manually and can concentrate on the actual tasks.

The number of available threads is tuned automatically to the computing resources available to the program.

We’ve used the thread pool internally for years. After some recent refactoring, we thought that it might be interesting outside ShellBrowser, too.

So, with the following text, we hope to give you enough insight and examples to try it out with our ShellBrowser Delphi Components.

In ShellBrowser, to use our Worker Threadpool or run the code samples below, simply add the unit Jam.Threading to the uses clause.

Basics explained

In ShellBrowser there is always one global threadpool instance, which is accessible via TWorkerPool.GlobalInstance.

However, you can also create new threadpool instances.

Adding work items to any kind of threadpool is accomplished with a call to

procedure AddWorkItem(const pWorkItem: IWorkItem; pPriority: TPriority = TPriority.Normal);

 

Using the pPriority argument, you determine whether the work item is queued at the beginning (TPriority.Highest) or at the end (TPriority.Normal) of the work items that are already waiting to be processed. This option therefore also lets you implement a LIFO or FIFO approach in a thread pool.
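
As a minimal sketch of both queueing modes, the two helpers below wrap AddWorkItem. The helper names QueueFirst and QueueLast are made up for this illustration and are not part of Jam.Threading; only AddWorkItem, TWorkerPool.GlobalInstance and TPriority are taken from the text above.

```delphi
uses
  Jam.Threading;

// Hypothetical helper: puts pItem at the beginning of the waiting
// work items, so it is picked up before older ones (LIFO).
procedure QueueFirst(const pItem: IWorkItem);
begin
  TWorkerPool.GlobalInstance.AddWorkItem(pItem, TPriority.Highest);
end;

// Hypothetical helper: puts pItem at the end of the waiting work items,
// so the order of arrival is kept (FIFO). TPriority.Normal is the default.
procedure QueueLast(const pItem: IWorkItem);
begin
  TWorkerPool.GlobalInstance.AddWorkItem(pItem);
end;
```

Calling QueueFirst for every item gives the newest waiting item the next free thread, while QueueLast processes the waiting items in the order in which they were added.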

In the first argument, you pass an instance implementing the IWorkItem interface. You can implement your own work items but for ease of use there are some predefined WorkItem types. We’ll start out with these.

Making simple method calls and anonymous procedures async

The “TAsyncProcedure” is a simple WorkItem that can take two anonymous procedures: the first one is executed on one of the threads provided by the threadpool. When the work item has finished, the second one is called in the main thread.

This simple example illustrates how to send an email asynchronously using some TMail class:

 

procedure SendMail(pRecipient, pSubject, pBody: string);
var Mail: TMail;
begin
  Mail := TMail.Create();
  Mail.Recipient := pRecipient;
  Mail.Subject := pSubject;
  Mail.Body := pBody;
  TWorkerPool.GlobalInstance.AddWorkItem(TAsyncProcedure.Create(
    Mail.Send,  // executed on a thread of the pool
    nil,        // no ICancellationIndicator
    procedure   // executed in the main thread after Mail.Send has finished
    begin
      UpdateUI();
      Mail.Free();
    end));
end;

 

The mail is sent in parallel via the threadpool, and when it has finished, the UI is updated in the main thread. Note that there is no need to free the TAsyncProcedure instance that is created. Like all work items we’ll introduce, it is ref-counted and will be cleaned up automatically when there is no reference left.

The middle “ICancellationIndicator” parameter, which is passed as nil in the example, will be explained a little later.

Using IFuture / TFuture

A future can be used to calculate a result value asynchronously while doing other work in the current thread. Accessing the result value of a future using its Value property will block until the result has been calculated.

In this example, we will check if a path is accessible. This usually takes only a few milliseconds; however, if the path is a network drive that is unreachable, the underlying Windows API call may block for 10-30 seconds. Doing this in the main thread can freeze the user interface.

 

TPathExists = class(TFuture<Boolean>)
protected
  fPath: string;
  procedure DoWork(); override;
public
  constructor Create(pPath: string);
end;

constructor TPathExists.Create(pPath: string);
begin
  inherited;
  fPath := pPath;
end;

procedure TPathExists.DoWork();
begin
  // Runs on a thread of the pool and stores the result of the future.
  fResult := FileExists(fPath) or DirectoryExists(fPath);
end;

And how to use this class:

procedure LoadFile(pPath: string);
var PathExists: IFuture<Boolean>;
begin
  PathExists := TPathExists.Create(pPath);
  TWorkerPool.GlobalInstance.AddWorkItem(PathExists as IWorkItem);
  // Prepare something else that takes a while ...
  if not PathExists.Value then // blocks until the result is available
  begin
    ShowMessage('The given path does not exist: ' + pPath);
    Exit;
  end;
  // Load the file ...
end;

This time we have created a subclass of TFuture with all needed methods implemented.

Here is a similar example using an anonymous function again.

procedure LoadFile(pPath: string);
var lFuture: IFuture<Boolean>;
    lValid: Boolean;
begin
  lFuture := TFuture<Boolean>.Construct(
    function: Boolean
    begin
      Result := FileExists(pPath) or DirectoryExists(pPath);
    end, nil);
  TWorkerPool.GlobalInstance.AddWorkItem(lFuture as IWorkItem);

  // Wait at most 1000 ms; fall back to False if no result is available in time.
  if lFuture.TryGetValue(1000, lValid) <> TTaskStatus.Completed then
    lValid := False;
  if not lValid then
    ShowMessage('The given path does not exist or could not be determined in time: ' + pPath);
end;

Use the “TFuture<TResult>.Construct” method for anonymous implementations. Also, instead of potentially blocking the GUI thread using the IFuture.GetValue method, a timeout of 1 s is passed to the TryGetValue method. This can be helpful if the program can safely continue with a default value. The function returns a TTaskStatus that indicates whether the calculation has completed.

TFuture and TAsyncProcedure have a shortcut that automatically uses the shared global thread pool. So, instead of the above, you may also write:

  TFuture<Boolean>.CreateAndQueue(
    function: Boolean
    begin
      Result := FileExists(pPath) or DirectoryExists(pPath);
    end, nil);

How to implement an IWorkItem

All tasks or work-items that are added to the Worker Threadpool need to implement the IWorkItem interface. We’ve already seen a custom implementation of TFuture in the preceding example.

If neither the TAsyncProcedure nor a TFuture fits your needs, you can implement your own work item class. The easiest way is to derive from our TBasicWorkItem class. The main difference from a future is that a work item can fire a callback event when its work is finished.

We will now implement the previous task as an IWorkItem that calls a callback at the end.

TPathExists = class(TBasicWorkItem)
protected
  fPath: string;
  procedure DoWork(); override;
public
  FileDoesExist: Boolean;
  constructor Create(pPath: string; pCallBack: TWorkDoneEvent);
end;

constructor TPathExists.Create(pPath: string; pCallBack: TWorkDoneEvent);
begin
  inherited;
  fPath := pPath;
  OnWorkDone := pCallBack; // called when DoWork has finished
end;

procedure TPathExists.DoWork();
begin
  FileDoesExist := FileExists(fPath) or DirectoryExists(fPath);
end;

And how to use this class:

procedure TEditor.LoadFile(pPath: string);
begin
  TWorkerPool.GlobalInstance.AddWorkItem(TPathExists.Create(pPath, LoadExistingFile));
end;

procedure TEditor.LoadExistingFile(pWorkItem: IWorkItem);
begin
  if not (pWorkItem as TPathExists).FileDoesExist then
  begin
    ShowMessage('The file path does not exist: ' + (pWorkItem as TPathExists).fPath);
    exit;
  end;
  // Load the file …
end;

The callback is automatically synchronized with the UI thread. This can be configured, but it is useful in most use cases.

Note that for simple use cases like this one, there is no good reason to prefer a hand-written WorkItem over the anonymous TAsyncProcedure version or the TFuture explained before; however, more complex use cases can be encapsulated nicely in a subclassed WorkItem.

Cancelling work items

In the preceding examples, we have assumed that the process is still running when a work item has finished its asynchronous work, and also, that the result is still welcome when the asynchronous part is done.

This is of course not always true. A cancellation indicator or a CancellationToken can be used to cancel tasks that are still in the queue or that are currently being processed. No methods or callbacks are executed on cancelled work items.

By supplying an ICancellationIndicator, the work of one or many work items can be aborted in a proper way. Only a single Boolean function named IsCancellationRequested has to be implemented:

// TInterfacedObject supplies the IInterface methods (and reference
// counting) that a plain TObject lacks.
TPresenter = class(TInterfacedObject, ICancellationIndicator)
  fAborted: Boolean;
  function IsCancellationRequested(): Boolean;
  procedure Abort();
end;

function TPresenter.IsCancellationRequested(): Boolean;
begin
  exit(fAborted);
end;

procedure TPresenter.Abort();
begin
  fAborted := True;
end;
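
To illustrate how such an indicator might be consumed, here is a minimal sketch that passes it as the middle argument of TAsyncProcedure.Create and additionally polls it inside the work itself. ProcessChunks and StartProcessing are hypothetical names, and Sleep merely stands in for real work. Whether the pool interrupts code that is already running is not covered here, so the polling loop is an assumption about how a cooperative work item could behave.

```delphi
uses
  Winapi.Windows, Jam.Threading;

// Hypothetical long-running job: processes up to pCount chunks, but stops
// as soon as the indicator reports a cancellation request.
// Returns the number of chunks that were actually processed.
function ProcessChunks(const pIndicator: ICancellationIndicator;
  pCount: Integer): Integer;
begin
  Result := 0;
  while (Result < pCount) and not pIndicator.IsCancellationRequested() do
  begin
    Sleep(10); // stands in for real work on one chunk
    Inc(Result);
  end;
end;

// Queues the job on the global pool. The same indicator is handed to the
// work item (middle argument) and polled inside the job itself.
procedure StartProcessing(pIndicator: ICancellationIndicator);
begin
  TWorkerPool.GlobalInstance.AddWorkItem(TAsyncProcedure.Create(
    procedure
    begin
      ProcessChunks(pIndicator, 1000);
    end,
    pIndicator,
    procedure
    begin
      OutputDebugString('Processing finished');
    end));
end;
```

Once the indicator reports True, for example after TPresenter.Abort() has been called, the loop ends after the current chunk.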

Alternatively, you may use our rich record type CancellationToken. It has a Cancel() method that signals cancellation and a Reset() method that resets its state so that it can be reused.

TDataScanner = class(TObject)
  fCancellationToken: CancellationToken;
  destructor Destroy(); override;
  procedure Start();
end;

destructor TDataScanner.Destroy();
begin
  // Cancel pending and running work items before the object goes away.
  fCancellationToken.Cancel();
  inherited;
end;

procedure TDataScanner.Start();
begin
  fCancellationToken.Reset();
  // Do some work and supply fCancellationToken where an ICancellationIndicator is needed
end;
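
The Start method above leaves the actual work open. One way it could be filled in is sketched below. TDataScannerSketch, ScanData and ShowResults are hypothetical names, and the sketch relies on the statement above that a CancellationToken can be supplied wherever an ICancellationIndicator is expected.

```delphi
uses
  Jam.Threading;

type
  // Variant of TDataScanner with a filled-in Start method.
  TDataScannerSketch = class(TObject)
  private
    fCancellationToken: CancellationToken;
    procedure ScanData();    // hypothetical: the background work
    procedure ShowResults(); // hypothetical: runs in the main thread
  public
    destructor Destroy(); override;
    procedure Start();
  end;

destructor TDataScannerSketch.Destroy();
begin
  // Cancel first: according to the text above, no methods or callbacks
  // are executed on cancelled work items.
  fCancellationToken.Cancel();
  inherited;
end;

procedure TDataScannerSketch.ScanData();
begin
  // ... long-running scan ...
end;

procedure TDataScannerSketch.ShowResults();
begin
  // ... update the UI ...
end;

procedure TDataScannerSketch.Start();
begin
  fCancellationToken.Reset(); // make the token usable for a new run
  TWorkerPool.GlobalInstance.AddWorkItem(TAsyncProcedure.Create(
    ScanData,
    fCancellationToken, // supplied where an ICancellationIndicator is expected
    ShowResults));
end;
```

Because the token lives in the object that owns the work, destroying the scanner cancels whatever it has queued without any extra bookkeeping.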

Using your own Worker Threadpool

When to use your own Threadpool

Reasons for using your own threadpool can be:

  • Long-running background work that should be kept separate from other tasks. As an example, the FileList in ShellBrowser uses its own threadpool for searching for files on large drives.
  • Limiting the number of threads used for a certain task, for example because a resource cannot handle more than 1 or 2 consumers or queries anyway. This pattern is applied for the thumbnail view of the ShellList, since delays in Windows API calls could otherwise have exhausted the main threadpool very quickly.
  • You need to know when all work items of a certain type have been processed. There is an “OnAllWorkItemsFinished” event that is fired after processing; however, it is hardly usable if the threadpool is shared by unrelated workloads. This pattern is frequently applied in UltraSearch and TreeSize.

 

How to use your own Threadpool

customThreadPoolConfig := TWorkerPoolConfig.Create();
customThreadPoolConfig.MaxRunningThreads := 2;
customThreadPoolConfig.MinRunningThreads := 2;
customThreadPoolConfig.OnAllWorkItemsFinished :=
  procedure(pWasCancelled: Boolean)
  begin
    OutputDebugString('Processed all workitems');
  end;

fThreadPool := TWorkerPool.CreateInstance(customThreadPoolConfig);

The settings are self-explanatory. Because the minimum and maximum are both set to 2, the custom thread pool we have created will always run exactly two threads that can handle work items assigned to it.
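
Work items are added to a dedicated pool with the same AddWorkItem call as for the global one. The following sketch wraps the configuration from above in a small owner class. TThumbnailLoader and its methods are hypothetical, declaring the field as TWorkerPool is an assumption about the return type of TWorkerPool.CreateInstance, and cleanup of the pool is deliberately left out because it is not covered here.

```delphi
uses
  Jam.Threading;

type
  // Hypothetical owner of a dedicated pool with two threads.
  TThumbnailLoader = class(TObject)
  private
    fThreadPool: TWorkerPool; // assumed type, see the note above
  public
    constructor Create();
    procedure Load(const pItem: IWorkItem);
  end;

constructor TThumbnailLoader.Create();
var
  lConfig: TWorkerPoolConfig;
begin
  inherited Create();
  lConfig := TWorkerPoolConfig.Create();
  lConfig.MaxRunningThreads := 2;
  lConfig.MinRunningThreads := 2;
  fThreadPool := TWorkerPool.CreateInstance(lConfig);
end;

procedure TThumbnailLoader.Load(const pItem: IWorkItem);
begin
  // The item only competes with other items of this loader for the
  // two threads; the global pool is not touched.
  fThreadPool.AddWorkItem(pItem);
end;
```

Keeping the pool behind a class like this makes it easy to see which workload is allowed to occupy which threads.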

 

Conclusion

A lot of the functionality and concepts introduced here will look familiar if you have worked with RAD Studio’s Parallel Programming Library.

In addition, there is support for cancellation tokens that affect queued or running work items; a procedure that runs synchronized in the main thread after a work item has finished; and configuration options for the thread pool, such as MaxCPUUsage or control over the priority of each work item.

Please don’t hesitate to give it a try and share any feedback or suggestions you have. We are happy to consider it in future releases of ShellBrowser.

The Threadpool is available with the ShellBrowser Components Package. Haven’t used ShellBrowser Delphi Components yet? Why not give the 30-day free trial a try!

See our API documentation for further help.

Blog author Sabine
