Split-Job 0.92

Here is an update to the Split-Job function. Based in part on some of the comments on the previous version, I made the following changes:

  1. The format for the scriptblock has changed; this was done to make it more straightforward to specify parameters for those commands/scripts that accept pipeline input. If you need a foreach (%) you will have to include that in the command line. Examples:

    "Server1","Server2","Server3" | Split-Job { c:test.ps1 -Force }

    "Server1","Server2","Server3" | Split-Job { % {Get-WmiObject Win32_ComputerSystem -ComputerName $_}}

  2. You can now import your profile, variables and/or aliases into the runspaces. This is somewhat of an experiment; I am not convinced this is even a good idea. Please give me your feedback if you think this is useful.
  3. Each runspace will have its current directory ($PWD) set to that of the main runspace.
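
As a rough illustration of items 2 and 3, the sketch below copies a variable and the current directory into a freshly created runspace and reads both back. It is not the Split-Job code itself, only the same runspace calls used in isolation; the names $Greeting and $StartDir are made up for the example, and -LiteralPath assumes PowerShell 2.0 or later.

```powershell
# Values we want to be visible inside the new runspace (hypothetical names)
$Greeting = 'Hello from the caller'
$StartDir = (Get-Location).Path

$Runspace = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace()
$Runspace.Open()

# Copy the variables into the runspace by name
$Runspace.SessionStateProxy.SetVariable('Greeting', $Greeting)
$Runspace.SessionStateProxy.SetVariable('StartDir', $StartDir)

# Move the runspace to the caller's directory, then emit what it sees
$Pipeline = $Runspace.CreatePipeline('Set-Location -LiteralPath $StartDir; $Greeting; (Get-Location).Path')
$Result = $Pipeline.Invoke()

# Clean up the pipeline and the runspace
$Pipeline.Dispose()
$Runspace.Close()

# First item: the imported variable; second item: the runspace's directory
$Result
```

A new runspace starts with none of the caller's variables, which is why anything the script block needs has to be copied in explicitly like this.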

There is also some error handling code to make the script more robust.

Enjoy!

Arnoud

#requires -version 1.0
################################################################################
## Run commands in multiple concurrent pipelines
##   by Arnoud Jansveld - www.jansveld.net/powershell
## Version History
## 0.92   Add UseProfile switch: imports the PS profile into each runspace
##        Add Variable parameter: imports variable(s) into each runspace
##        Add Alias parameter: imports alias(es)
##        Restart pipeline if it stops due to an error
##        Set the current path in each runspace to that of the calling process
## 0.91   Revert to v 0.8 input syntax for the script block
##        Add error handling for empty input queue
## 0.9    Add logic to distinguish between scriptblocks and cmdlets or scripts:
##        if a ScriptBlock is specified, a foreach {} wrapper is added
## 0.8    Adds a progress bar
## 0.7    Stop adding runspaces if the queue is already empty
## 0.6    First version. Inspired by Gaurhoth's New-TaskPool script
################################################################################

function Split-Job (
    $Scriptblock = $(throw 'You must specify a command or script block!'),
    [int]$MaxPipelines=10,
    [switch]$UseProfile,
    [string[]]$Variable,
    [string[]]$Alias

) {
    # Create the shared thread-safe queue and fill it with the input objects
    $Queue = [Collections.Queue]::Synchronized([Collections.Queue]@($Input))
    $QueueLength = $Queue.Count
    if ($MaxPipelines -gt $QueueLength) {$MaxPipelines = $QueueLength}
    # Set up the script to be run by each runspace
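    # Double any apostrophes so a path such as C:\Bob's Files does not break the quoted string
    $Script  = "Set-Location '$($PWD.Path -replace "'", "''")'; "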
    $Script += '$Queue = $($Input); '
    $Script += '& {trap {continue}; while ($Queue.Count) {$Queue.Dequeue()}} |'
    $Script += $Scriptblock

    # Create an array to keep track of the set of pipelines
    $Pipelines = New-Object System.Collections.ArrayList

    function Add-Pipeline {
        # This creates a new runspace and starts an asynchronous pipeline with our script.
        # It will automatically start processing objects from the shared queue.
        $Runspace = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace($Host)
        $Runspace.Open()
        # Optionally import profile, variables and aliases from the main runspace
        if ($UseProfile) {
            $Pipeline = $Runspace.CreatePipeline(". '$PROFILE'")
            $Pipeline.Invoke()
            $Pipeline.Dispose()
        }
        if ($Variable) {
            Get-Variable $Variable -Scope 2 | foreach {
                trap {continue}
                $Runspace.SessionStateProxy.SetVariable($_.Name, $_.Value)
            }
        }
        if ($Alias) {
            $Pipeline = $Runspace.CreatePipeline({$Input | Set-Alias -value {$_.Definition}})
            $Null = $Pipeline.Input.Write((Get-Alias $Alias -Scope 2), $True)
            $Pipeline.Input.Close()
            $Pipeline.Invoke()
            $Pipeline.Dispose()
        }
        $Pipeline = $Runspace.CreatePipeline($Script)
        $Null = $Pipeline.Input.Write($Queue)
        $Pipeline.Input.Close()
        $Pipeline.InvokeAsync()
        $Null = $Pipelines.Add($Pipeline)
    }

    function Remove-Pipeline ($Pipeline) {
        # Remove a pipeline and runspace when it is done
        $Pipeline.RunSpace.Close()
        $Pipeline.Dispose()
        $Pipelines.Remove($Pipeline)
    }

    # Start the pipelines
    while ($Pipelines.Count -lt $MaxPipelines -and $Queue.Count) {Add-Pipeline} 

    # Loop through the pipelines and pass their output to the pipeline until they are finished
    while ($Pipelines.Count) {
        Write-Progress 'Split-Job' "Queues: $($Pipelines.Count)" `
            -PercentComplete (100 - [Int]($Queue.Count)/$QueueLength*100)
        foreach ($Pipeline in (New-Object System.Collections.ArrayList(,$Pipelines))) {
            if ( -not $Pipeline.Output.EndOfPipeline -or -not $Pipeline.Error.EndOfPipeline ) {
                $Pipeline.Output.NonBlockingRead()
                $Pipeline.Error.NonBlockingRead() | Write-Error
            } else {
                if ($Pipeline.PipelineStateInfo.State -eq 'Failed') {
                    Write-Error $Pipeline.PipelineStateInfo.Reason
                    # Start a new runspace, unless there was a syntax error in the scriptblock
                    if ($Queue.Count -lt $QueueLength) {Add-Pipeline}
                }
                Remove-Pipeline $Pipeline
            }
        }
        Start-Sleep -Milliseconds 100
    }
}
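
The core idea of the function, several runspaces draining one synchronized queue, can be tried in isolation. The sketch below is a simplified stand-in rather than a replacement for Split-Job: it assumes two workers and a toy workload that doubles each number. It also hands the queue to each runspace with SetVariable instead of writing it to the pipeline input as the function does, and it waits for each worker with a blocking ReadToEnd() instead of polling.

```powershell
# Shared, thread-safe queue holding the work items
$Queue = [Collections.Queue]::Synchronized([Collections.Queue]@(1..20))

# Each worker dequeues until the queue is empty and pipes the items onward.
# The trap swallows the error raised when another worker takes the last item
# between the Count check and the Dequeue call.
$WorkerScript = '& {trap {continue}; while ($Queue.Count) {$Queue.Dequeue()}} | foreach {$_ * 2}'

$Pipelines = @()
foreach ($i in 1..2) {
    $Runspace = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace()
    $Runspace.Open()
    # Every runspace receives a reference to the same queue object
    $Runspace.SessionStateProxy.SetVariable('Queue', $Queue)
    $Pipeline = $Runspace.CreatePipeline($WorkerScript)
    $Pipeline.Input.Close()
    $Pipeline.InvokeAsync()
    $Pipelines += $Pipeline
}

# Collect the output of each worker, then release its resources
$Results = foreach ($Pipeline in $Pipelines) {
    $Pipeline.Output.ReadToEnd()
    $Pipeline.Runspace.Close()
    $Pipeline.Dispose()
}

# Order depends on scheduling, so sort before displaying
$Results | Sort-Object
```

Because both workers pull from the same queue, a slow item in one runspace does not hold up the rest; the other worker simply keeps dequeuing.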

5 thoughts on “Split-Job 0.92”

  1. Very nice update. It fixed 3 problems I had (empty queues was one of them). I’ll look at the other stuff and see if I like it.

    I made one other change to give a better indication of progress. I changed the Write-Progress part to include a little more information. It gives me a better idea of the number of items in the queue. That way you can see whether it's slow because the scriptblock is slow or because it has 2000 objects to process.

    Write-Progress 'Split-Job' `
    "Queues: $($Pipelines.Count) QueueLength: $($QueueLength) Completed: $($QueueLength-$Queue.Count- $Pipelines.count) Pending: $($QueueLength- ($QueueLength-$Queue.Count))" `
    -PercentComplete (100 - [Int]($Queue.Count)/$QueueLength*100)

    Thanks for the work you’ve done on this. It makes Powershell much faster when you have a lot of systems to work on.

  2. Thanks for putting this thing together. It’s saving me time and makes Powershell more enjoyable.

    It turns out I don’t really use the UseProfile option, but I do use the Variable option. I bumped into some other things that made it so I had to add things to my scriptblock that were already in my environment.

    One of them was snapins. I was trying to use some cmdlets from the Powershell Community Extensions and I had to add the snapin manually since it didn’t exist in the runspace by default.

    The other was a function. I was changing a script to use Split-Job and it had a function declared in the script. Since there was no way of accessing that function inside the Split-Job scriptblock, I had to create the function in the scriptblock.

    I think it would be great if it were possible to add more functionality like the $Variable parameter for the other environmental objects. One for Function, Snapin, and Environment in addition to the 2 you already have (Variable and Alias). This would almost totally eliminate the need for $UseProfile and still have it fast to load if you don’t need anything special.

  3. Hi, I am trying to do this:
    $param2 = "Something"
    "Server1","Server2","Server3" | Split-Job { MyFunction -Param2 $param2}
    Here Param1 in MyFunction is the server name, which it accepts from the pipeline, but $param2 is not being passed to MyFunction.
    How can I solve this?
