Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Native pipe #2450

Merged
merged 20 commits into from
Nov 16, 2016
Merged

Native pipe #2450

merged 20 commits into from
Nov 16, 2016

Conversation

vors
Copy link
Collaborator

@vors vors commented Oct 11, 2016

Fix #559
Improve pipeline for native commands

  • Start native process in Prepare() instead of Complete()
    in NativeCommandProcessor.
  • Remove unneeded input, output, error threads. Replaced by
    simpler primitives.

Motivation example:

ping | %{powershell-code} | grep now works in real-time.
See the test

TODO before merging:

@msftclas
Copy link

Hi @vors, I'm your friendly neighborhood Microsoft Pull Request Bot (You can call me MSBOT). Thanks for your contribution!


It looks like you're a Microsoft contributor (Sergei Vorobev). If you're full-time, we DON'T require a Contribution License Agreement. If you are a vendor, please DO sign the electronic Contribution License Agreement. It will take 2 minutes and there's no faxing! https://cla.microsoft.com.

TTYL, MSBOT;

@daxian-dbw
Copy link
Member

@vors could you please resolve the Travis CI failures?

@vors
Copy link
Collaborator Author

vors commented Oct 19, 2016

@daxian-dbw working on it.
There are two parts: PSDSC.psm1 need to be updated: it uses sort instead of sort-object in one place and worked by accident #2520. The second part is about handing native executable failures because it still should not produce broken pipe error.

@vors vors force-pushed the native-pipe branch 2 times, most recently from 4eb4d29 to c1f90e8 Compare October 22, 2016 02:11
@vors
Copy link
Collaborator Author

vors commented Oct 22, 2016

🎉 @daxian-dbw ready for review

@@ -140,6 +142,8 @@ internal ProcessOutputObject(object data, MinishellStream stream)
/// </summary>
internal class NativeCommandProcessor : CommandProcessorBase
{
private static string XmlCliTag = "#< CLIXML";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to make it a const variable.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

_exceptionToRethrow = e;

} // try
catch (PipelineStoppedException)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we shouldn't catch this exception since it's just re-thrown.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By design #Closed

{
bool isFirstOutput = true;
bool isXmlCliOutput = false;
_nativeProcess.OutputDataReceived += new DataReceivedEventHandler((sender, e) =>
Copy link
Member

@daxian-dbw daxian-dbw Oct 26, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By using DataReceivedEventHandler, you won't be able to pipe byte streams between native commands, as e.Data is already a string encoded with the default encoding.
In order to pipe streams between native commands, we need to have a separate thread reading from the redirected stream of the upstream command and writing to the downstream command -- something similar to the current ProcessOutputReader, but with the ability to read/write bytes directly.

Here is my 2 cents: keep the ProcessOutputReader and ProcessStreamReader, but re-design them to make it also able to transmit byte stream, and also get rid of the PipelineWriter (if a concurrent queue is sufficient enough I think BlockingCollection<T> is a better choice).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you are describing is #1908 and it's outside of the scope of this PR. I'd like to solve it with creating already piped processes and avoid expensive manual marshaling from stdout of one to stdin of another.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I know #1908 is not covered by this PR. My point is that we would still need a thread worker to transfer byte streams between native commands, so probably we should keep the ProcessOutputReader implementation there for now just in case that we need to reuse some of that code.
And for the same reason, using OutputDataReceived and ErrorDataReceived to collect process output/error may not be appropriate since we probably have to change it again using a thread worker when addressing #1908

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vors found that Stream.CopyToAsync is a perfect solution for this scenario - the async mechanism in CLR saves use the effort to own the threading implementation. Given that, the ProcessOutputReader will no longer be needed. #Closed

/// </returns>
private bool ConsumeAvailableNativeProcessOutput()
{
bool isNewData = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it should be hasNewData?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this var

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

/// <returns>
/// True if there was any new input available, otherwise false.
/// </returns>
private bool ConsumeAvailableNativeProcessOutput()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the point of returning bool from this method?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a left-over from previous experiments.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

if (this.Command.Context.CurrentPipelineStopping)
{
this.StopProcessing();
return;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the return doesn't seem necessary.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. I moved it to another place.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

@@ -1104,7 +1364,7 @@ private bool IsDownstreamOutDefault(Pipe downstreamPipe)
/// <param name="redirectInput"></param>
private void CalculateIORedirection(out bool redirectOutput, out bool redirectError, out bool redirectInput)
{
redirectInput = true;
redirectInput = this.Command.MyInvocation.PipelineLength > 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The native command may be the first command in the pipeline, and it doesn't need input in that case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, it should be 'PipelinePosition'

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

@@ -1352,31 +1609,46 @@ internal ProcessInputWriter(InternalCommand command)
{
Dbg.Assert(command != null, "Caller should validate the parameter");
_command = command;

_pipeline = ScriptBlock.Create("Out-String -Stream").GetSteppablePipeline();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will -Stream be a breaking change?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why? Out-String returns a single string, while Out-String -Stream returns an string array.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm. Ok, so I would need to double check this. Here is my original thinking:

Before we had all data upfront, so we can use just Out-String. Now we get new data sporadically and should still output something, even when data is incomplete. -Stream seemed like a right switch.

After your comments, I revisited this thinking and it could be indeed incorrect. I think data would still be available, there would be no aggregation. For example, lets take a look at

$h = @{1 = 2}; $h; sleep 1; $h | Out-String | %{$_}

When I run it, I get the first $h immediately.

Also -Stream adds overhead, because there are more objects. So I think you are correct. Thank you for spotting the problem!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UPD: wait, I just realised my example was wrong :)

Here is the correct one:

$h = @{1 = 2}; 1..2 | % {$h; sleep 1} | Out-String

Out-String waits for all output before outputing anything.
Where

$h = @{1 = 2}; 1..2 | % {$h; sleep 1} | Out-String -Stream

Output everything as soon as available.
For the line-ends, I would not worry. I will double check that behaviour is the same on win and unix.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. #Closed

{
return _inputList.Count;
string line = PSObject.ToStringParser(_command.Context, item);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks you are not differentiating Text input from Xml input, but you are differentiating them when processing the output of native command. Is this a oversight?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible, but I found that serizliation is handled by the code in src\Microsoft.PowerShell.ConsoleHost\host\msh\Serialization.cs. I think serialization path here was not used at all. I'd love to see proven wrong, if you can find a case, where it was used.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know whether it's used or not. But I would like to see some analysis data if you believe it's an affordable breaking change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

XmlInput has been brought back in the new iteration.
#Closed

@@ -1352,31 +1609,46 @@ internal ProcessInputWriter(InternalCommand command)
{
Dbg.Assert(command != null, "Caller should validate the parameter");
_command = command;

_pipeline = ScriptBlock.Create("Out-String -Stream").GetSteppablePipeline();
_pipeline.Begin(true);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With your change to ProcessInputWriter, we won't be able to use the same InputWriter type to handle byte stream transfers between native commands -- we will need a thread to read byte output from upstream native command and write to the downstream command.
My opinion is to keep the "use-separate-thread" module of ProcessInputWriter, but again use a BlockingCollection between Add and the thread that does the actual write operation. It's essentially a consumer/provider problem.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing approach was spinning a thread and managing its lifecycle. It's pretty wasteful and unnecessary complicated. One of the my goal is to avoid a "medium" between two native processes, while implementing #1908. It would be a simple redirection without marshaling. So this would be a completely different code path. If we will try to reuse existing approach with ProcessInputWriter and enhance it for bytes, powershell would never be able to get closer to bash in native | native case by it's performance. We need to strip away the overhead of marshaling objects and just do the stream redirection.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With Stream.CopyToAsync to relay outputs between processes, we won't need our own threading implementation. #Closed

@vors
Copy link
Collaborator Author

vors commented Oct 27, 2016

I will update PR with the notify/wait code

}
_inputThread = new Thread(new ThreadStart(this.WriterThreadProc));
_inputThread.Start();
_inputFormat = inputFormat;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_inputFormat was used for processing Text or Xml input. If it's proven that we can ignore Xml input, then we need to clean up ProcessInputWriter to remove this member, including the parameter of Start method.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found the case, when we use this code:

1,2,3 | powershell -noprofile -command { $input }

Resurrecting it now

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

// we allow call Done() multiply times.
// For example one time from Process() code path,
// when we detect that process already finished
// and once from End() code path.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Process() and End() is kinda confusing at first glance. Can you please change Process() to ProcessRecord() and End() to Complete()?
And please add one more comment explains that "even though Done could be called multiple times, the calls are on the same thread, so there is no race condition here".

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

WriteXmlInput();
// on unix, if process is already finished attempt to dispose it will
// lead to "Broken pipe" exception.
// we are ignoring it here
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: may be we can avoid this exception by using the overload StreamWriter(Stream stream, Encoding encoding, int bufferSize, bool leaveOpen);. The writer will not try to dispose the stream when leaveOpen = true. Let the process object handle closing the stream.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dig into that and as far as I see, that will hang the process: there is no way to tell it that all input was present and close the stream, if process expects input.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. #Closed

if (isFirstOutput)
{
isFirstOutput = false;
if (e.Data == XmlCliTag)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's recommended to a culture overload Equals method to compare strings.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

{
lineContents.Append(bufferContents[row, column].Character);
if (e.Data == XmlCliTag)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

{
get
Array formattedObjects = _pipeline.Process(input);
foreach (var item in formattedObjects)
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we add if (_stopping) { return } in this loop?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably should not. That would increase code complexity (and time) for common path and save some time for uncommon path. Once all objects are formatted (in most cases there is only 1), it's pretty fast to write them to the process. We should not optimize uncommon paths, if it hurst common path perf.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

@@ -0,0 +1,50 @@
# Minishell is a powershell concept.
# It's primare use-case is when somebody executes a scriptblock in the new powershell process.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean Its primary?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You missed the typo primare.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

{
bool isFirstOutput = true;
bool isXmlCliOutput = false;
_nativeProcess.OutputDataReceived += new DataReceivedEventHandler((sender, e) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we encapsulate the event handlers and related variables/methods/objects into a class? I'm thinking about something like the following:

...
var handler = new ProcessOutputHandler(true, true);
_nativeProcess.OutputDataReceived += new DataReceivedEventHandler(handler.OutputHandler);
_nativeProcess.ErrorDataReceived += new DataReceivedEventHandler(handler.ErrorHandler);
...

public class ProcessOutputHandler
{
    public ProcessOutputHandler (bool redirectOutput, bool redirectError)
    {
        Debug.Assert(redirectOutput || redirectError, "Caller should redirect at least one stream");
        _blockingQueue = new BlockingCollection<Tuple<string, int>>(new ConcurrentQueue<Tuple<string, int>>());

        if (redirectOutput) { _refCount ++; }
        if (redirectError) { _refCount ++; }
    }

    private int _refCount = 0;
    private bool _isFirstOutput = true, _isFirstError = true;
    private bool _isOutputXml = false, _isErrorXml = false;
    private BlockingCollection<Tuple<string, int>> _blockingQueue;

    public BlockingCollection<Tuple<string, int>> BlockingQueue
    {
        get { return _blockingQueue; }
    }

    private List<ProcessOutputObject> DeserializeCliXmlObject(string xml, bool isOutput)
    {
        // ...
    }

    public void OutputHandler(object sendingProcess, DataReceivedEventArgs outLine)
    {
        if (outLine.Data == null)
        {
            // StandardOutput is closed
            if (Interlocked.Decrement(ref _refCount) == 0)
            {
                _blockingQueue.CompleteAdding();
            }
        }
        else 
        {
            // Process output and add to _blockingQueue ...
        }
    }

    public void ErrorHandler(object sendingProcess, DataReceivedEventArgs errorLine)
    {
        if (errorLine.Data == null)
        {
            // ErrorOutput is closed
            if (Interlocked.Decrement(ref _refCount) == 0)
            {
                _blockingQueue.CompleteAdding();
            }
        }
        else
        {
            // Process error and add to _blockingQueue ...
        }
    }
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

@vors
Copy link
Collaborator Author

vors commented Nov 8, 2016

@daxian-dbw I removed spin-lock and employed BlockedCollection as you suggested.

}
}

private class ProcessOutputHandler
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please move this class outside of NativeCommandProcessor? My arguments:

  1. Nested class usually worsens readability of the code.
  2. It's essentially the counterpart of ProcessInputWriter, and thus I think it should be a non-nested internal class similar as ProcessInputWriter

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

}
}

private void OutputHandler(object sender, DataReceivedEventArgs e)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comment: I always encourage people to use descriptive/meaningful variable names. For example, the parameter e here and in ErrorHandler below could be a bit more descriptive, like outputReceived and errorReceived.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

private bool _isFirstError;
private bool _isXmlCliOutput;
private bool _isXmlCliError;
private string _path;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comment: again about variable name. _processFileName would be better I think 😄

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

//Wait for input writer to finish.
_inputWriter.Done();
// collection is empty or exception been raised
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if (blocking) block is basically the same as following:

if (blocking)
{
    // if adding was completed and the queue is empty, there is no need to do a blocking Take(),
    if (!_nativeProcessOutputQueue.IsCompleted)
    {
         try
         {
             return _nativeProcessOutputQueue.Take();
         }
         catch (InvalidOperationException)
         {
             // It's a normal situation: another thread can mark collection as CompleteAdding
             // in a concurrent way and we will rise an exception in Take().
             // Although it's a normal situation it's not the most common path
             // and will be executed only on the race condtion case.
         }
    }
    // collection is empty and adding completed, or exception been raised
    return null;
}

Copy link
Collaborator Author

@vors vors Nov 11, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, cool. I was lazy to read about IsCompleted :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had a conversation with @lzybkr about perf here.
Leaving a common path outside of try {} will have a slight positive upside. I think it's worth to leave it as is, I will add a comment with explanation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bear in mind that this has nothing to do with try/catch blocks: you only incur the cost when the actual exception is thrown. You can use as many try/catch blocks as you want. Using exceptions gratuitously is where you lose performance.

This is from a MSDN article Performance Tips and Tricks in .NET Applications. It indicates that try/catch block itself doesn't incur any performance overhead, as long as no exception is thrown.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

//Wait for the process to exit and consume available output
while (!_nativeProcess.HasExited)
{
ConsumeAvailableNativeProcessOutput(blocking: true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be within while (!_nativeProcess.HasExited). It's possible that the process quickly writes a lot output and exits, but the output has been captured in our _nativeProcessOutputQueue, and we can still read all those output and process them even if the process is gone. Otherwise, we might miss some some output/error because we stop reading the queue when the process is gone.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! It's a left-over from the previous spin-lock approach.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

StringBuilder bufferText = new StringBuilder();
// read all the available output one more time
_nativeProcess.WaitForExit();
ConsumeAvailableNativeProcessOutput(blocking: true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ConsumeAvailableNativeProcessOutput(blocking: true); won't be necessary. One blocking reading of the queue is guaranteed to read all output/error out of it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeap

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

Sergei Vorobev added 5 commits November 15, 2016 14:49
@vors
Copy link
Collaborator Author

vors commented Nov 16, 2016

Ready for review!

// nothing to do
}
else if (line.Equals("#< CLIXML", StringComparison.Ordinal) == false)
// we allow call Done() multiply times.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done() should be Dispose() here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

_processOutput = new ObjectStream(128);
if (_stopping || _streamWriter == null)
{
// if _streamWriter is already null, then we already called Done()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done() should be Dispose() here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

}
AddObjectToWriter(obj, stream);
_xmlSerializer.Done();
Dispose();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Dispose() is unnecessary as it's called in the end of this method.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, good catch! I didn't remove it from here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

// if the downstream process already exited.
// to Prevent it, we first finalize the pipeline and set it to null,
// then calling the result processing for the last time
AddTextInputFromFormattedArray(finalResults);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

@daxian-dbw
Copy link
Member

daxian-dbw commented Nov 16, 2016

I don't see your updates on the duplicate _nativeProcessOutputQueue.Take(). I still think it's unnecessary duplication of code. #Closed

{
if (blocking)
{
if (!_nativeProcessOutputQueue.IsCompleted)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please keep the old comment // if adding was completed and the queue is empty, there is no need to do a blocking Take()? It will help reader to understand why we check IsCompleted which is a great optimization to avoid unnecessary exception from Take().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#Closed

Copy link
Member

@daxian-dbw daxian-dbw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thank you for getting this work done!

@daxian-dbw daxian-dbw merged commit 8010117 into PowerShell:master Nov 16, 2016
@iSazonov
Copy link
Collaborator

Great! 🌟

@joeyaiello joeyaiello mentioned this pull request Jan 25, 2017
@vors vors deleted the native-pipe branch February 9, 2017 05:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants