Friday, November 30, 2018

In JDK 9 (and, with a little effort, JDK 8) and above, everything can be a stream.

In JDK 8 we finally got streams, and everything was good apart from the times when the API you were using couldn't produce one. Then you ended up writing a wrapper method to convert an Iterator into a Stream, because you missed streams.

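import java.util.*;
import java.util.stream.*;

// Wrap an Iterator as a sequential Stream of unknown size
public static <T> Stream<T> asStream(Iterator<T> it) {
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it,
        Spliterator.IMMUTABLE | Spliterator.ORDERED), false); // false = not parallel
}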

There are methods to generate a stream programmatically, namely Stream.iterate and Stream.generate, but in JDK 8 both of them produce an infinite stream, whereas in most cases you really wanted to adapt an existing interface into a finite Stream.
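To make that limitation concrete, here is a minimal sketch of the JDK 8 forms of Stream.iterate and Stream.generate. The class InfiniteIterate and its methods are hypothetical names used only for illustration. Neither stream ends by itself, so the only built-in way to stop one is limit(), which means knowing the number of elements up front:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class: the JDK 8 iterate and generate are both infinite
public class InfiniteIterate {

    // Powers of two: 1, 2, 4, 8, ...; without limit() this would never end
    static List<Integer> firstPowersOfTwo(int count) {
        return Stream.iterate(1, n -> n * 2)
                .limit(count) // the count has to be known in advance
                .collect(Collectors.toList());
    }

    // generate() repeats the supplier forever, so it needs limit() too
    static List<String> repeated(String s, int count) {
        return Stream.generate(() -> s)
                .limit(count)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstPowersOfTwo(5)); // [1, 2, 4, 8, 16]
        System.out.println(repeated("x", 3));    // [x, x, x]
    }
}
```

That works for counting, but not for adapting an API whose end is signalled by a null or an empty result, which is the common case.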

JDK 9 resolved this nicely by adding a new overload, Stream.iterate(seed, hasNext, next), which takes a predicate that signals the end of the stream.
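The Javadoc for the new overload describes it as producing the same sequence of elements as the corresponding for loop, with the predicate tested before each element, seed included. Here is a small sketch comparing the two. BoundedIterate and its methods are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo: iterate(seed, hasNext, next) reads like a for loop
public class BoundedIterate {

    // Classic loop: for (int i = 1; i <= 5; i++)
    static List<Integer> withLoop() {
        List<Integer> out = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            out.add(i);
        }
        return out;
    }

    // Same sequence as a stream: seed, continue condition, step
    static List<Integer> withStream() {
        return Stream.iterate(1, i -> i <= 5, i -> i + 1)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(withLoop());   // [1, 2, 3, 4, 5]
        System.out.println(withStream()); // [1, 2, 3, 4, 5]
    }
}
```

One consequence of the loop semantics is that if the seed itself fails the predicate, the stream is empty.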

In the examples below I use a predicate that continues until the next value is null; I will leave it to the reader to come up with more imaginative uses for the predicate. This first example uses the getCause method of Throwable to walk along a linked list of exceptions. Note how little code this takes compared with a pre-stream version.

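import java.util.Objects;
import java.util.stream.Stream;

// Simple linked list of exceptions: e3 -> e2 -> e
Exception e = new Exception("one");
Exception e2 = new Exception("two", e);
Exception e3 = new Exception("three", e2);

// Follow getCause() until it returns null
Stream.iterate(e3, Objects::nonNull, Throwable::getCause)
    // Output the messages in turn: three, two, one
    .map(Throwable::getMessage)
    .forEach(System.out::println);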
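For comparison, here is a sketch of the pre-stream loop next to the stream version. Both collect the messages into a list rather than printing them, so the two can be checked against each other. The class CauseChain and its methods are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical comparison class: cause-chain walk with and without streams
public class CauseChain {

    // Pre-stream version: an explicit loop over getCause()
    static List<String> messagesWithLoop(Throwable top) {
        List<String> messages = new ArrayList<>();
        for (Throwable t = top; t != null; t = t.getCause()) {
            messages.add(t.getMessage());
        }
        return messages;
    }

    // Stream version (JDK 9+), collecting instead of printing
    static List<String> messagesWithStream(Throwable top) {
        return Stream.iterate(top, Objects::nonNull, Throwable::getCause)
                .map(Throwable::getMessage)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Exception e3 = new Exception("three", new Exception("two", new Exception("one")));
        System.out.println(messagesWithLoop(e3));   // [three, two, one]
        System.out.println(messagesWithStream(e3)); // [three, two, one]
    }
}
```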

The second example converts a ReferenceQueue into a Stream so that we can easily drain its contents for processing as required. This code is a little different because the container is separate from the objects being worked on. So we supply both the seed and each next value by calling the same method, queue.poll(), which returns null once the queue is empty.

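import java.lang.ref.*;
import java.util.Objects;
import java.util.stream.Stream;

// Thing is any class of your own; the referents below are only weakly reachable
ReferenceQueue<Thing> queue = new ReferenceQueue<>();

// Make some things and then collect them
WeakReference<Thing> one = new WeakReference<>(new Thing(), queue);
WeakReference<Thing> two = new WeakReference<>(new Thing(), queue);
// System.gc() is only a hint, so the queue may not be populated straight away
System.gc(); System.gc(); System.gc(); System.gc(); System.gc();

// Drain the queue: poll() supplies the seed and every next value, null ends the stream
Stream.<Reference<? extends Thing>>iterate(
    queue.poll(), Objects::nonNull, v -> queue.poll())
    .forEach(System.out::println);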
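Garbage collection is not deterministic, so the ReferenceQueue example can be awkward to try out. The same poll-until-null pattern applies to any source whose poll() returns null when it is empty. As an assumption for illustration, the sketch below swaps in a java.util.ArrayDeque instead. ArrayDeque rejects null elements, so a null can only mean "empty". DrainQueue is a hypothetical name:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo: drain any queue whose poll() returns null when empty
public class DrainQueue {

    static <T> List<T> drain(Queue<T> queue) {
        // poll() supplies both the seed and every next value; null ends the stream
        return Stream.iterate(queue.poll(), Objects::nonNull, v -> queue.poll())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(List.of("a", "b", "c"));
        System.out.println(drain(queue));    // [a, b, c]
        System.out.println(queue.isEmpty()); // true
    }
}
```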

The third example walks a DOM Node tree in document order. Note the nested stream. When we reach a node with no children and no next sibling, it walks back up through the parent nodes to find the first ancestor that does have a next sibling.

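// doc is an org.w3c.dom.Document, for example from DocumentBuilder.parse(...)
Node root = doc.getDocumentElement();

Stream.iterate(
    root,
    Objects::nonNull,
    v -> {
        // Descend to the first child if there is one
        if (v.getFirstChild() != null) {
            return v.getFirstChild();
        }

        // Otherwise move across to the next sibling (but never beyond root)
        if (v != root && v.getNextSibling() != null) {
            return v.getNextSibling();
        }

        // Otherwise climb towards root until an ancestor has a next sibling;
        // stopping at root keeps the walk inside root's subtree
        return Stream.iterate(v, n -> n != null && n != root, Node::getParentNode)
            .filter(node -> node.getNextSibling() != null)
            .map(Node::getNextSibling).findFirst().orElse(null);
    })

    .map(Node::getNodeName)
    .forEach(System.out::println);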
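To try the walk without an existing document, here is a self-contained sketch. It parses a small inline XML string with the JDK's DocumentBuilderFactory and collects the node names. It is a variant rather than the exact code above. Because the nested iterate starts from v itself, checking each node's next sibling on the way up also covers the separate sibling step. The climb also stops at root, so the walk cannot leave root's subtree. DomWalk and nodeNames are hypothetical names:

```java
import java.io.StringReader;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.xml.sax.InputSource;

// Hypothetical runnable variant of the DOM walk, using an inline XML string
public class DomWalk {

    static List<String> nodeNames(String xml) {
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
        } catch (Exception ex) {
            throw new IllegalArgumentException("Could not parse XML", ex);
        }
        Node root = doc.getDocumentElement();

        return Stream.iterate(root, Objects::nonNull, v -> {
                    // 1. Descend to the first child if there is one
                    if (v.getFirstChild() != null) {
                        return v.getFirstChild();
                    }
                    // 2. Otherwise climb from v (inclusive) towards root and take the
                    //    first next sibling found; stopping at root keeps us in its subtree
                    return Stream.iterate(v, n -> n != null && n != root, Node::getParentNode)
                            .map(Node::getNextSibling)
                            .filter(Objects::nonNull)
                            .findFirst().orElse(null);
                })
                .map(Node::getNodeName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // No whitespace between tags, so there are no #text nodes in the output
        System.out.println(nodeNames("<a><b><c/></b><d/></a>")); // [a, b, c, d]
    }
}
```

With whitespace between tags the parser also produces text nodes, which would appear as #text in the output.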


So with a little mental gymnastics it is possible to turn most legacy APIs into a nice clean Stream, and leave those nasty old-fashioned for loops behind. If you are stuck on JDK 8, it is quite easy to put together a similar function using the asStream method from earlier:

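import java.util.*;
import java.util.function.*;
import java.util.stream.*;

public static <T> Stream<T> iterateFinite(T seed, Predicate<? super T> hasNext, UnaryOperator<T> next) {

    // Explicit <T>: the diamond operator on anonymous classes needs JDK 9
    return asStream(new Iterator<T>() {

        T current = seed;

        @Override
        public boolean hasNext() {
            return hasNext.test(current);
        }

        @Override
        public T next() {
            // The predicate decides where the stream ends, not a null check
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            try {
                return current;
            } finally {
                // Advance after handing out the current value
                current = next.apply(current);
            }
        }
    });
}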
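Here is a sketch of how the JDK 8 pieces might fit together in one compilable class. FiniteStreams is a hypothetical name. The usage repeats the cause-chain walk from the first example. The class uses an explicit Iterator<T> type argument, and next() checks the predicate rather than assuming that null marks the end:

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical utility class bundling asStream and iterateFinite for JDK 8
public class FiniteStreams {

    public static <T> Stream<T> asStream(Iterator<T> it) {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it,
                Spliterator.IMMUTABLE | Spliterator.ORDERED), false);
    }

    public static <T> Stream<T> iterateFinite(T seed, Predicate<? super T> hasNext,
                                              UnaryOperator<T> next) {
        return asStream(new Iterator<T>() {
            T current = seed;

            @Override
            public boolean hasNext() {
                return hasNext.test(current);
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T result = current;
                current = next.apply(current); // advance eagerly, as in the original
                return result;
            }
        });
    }

    public static void main(String[] args) {
        // Cause-chain walk without the JDK 9 overload of Stream.iterate
        Exception e3 = new Exception("three", new Exception("two", new Exception("one")));
        List<String> messages = iterateFinite((Throwable) e3, t -> t != null, Throwable::getCause)
                .map(Throwable::getMessage)
                .collect(Collectors.toList());
        System.out.println(messages); // [three, two, one]
    }
}
```

The JDK 9 Stream.iterate computes the next value only when another element is requested. This iterator applies next eagerly after handing out each element. For a source such as queue.poll(), that means one extra value may be consumed if the stream is short-circuited, for example by findFirst().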


Tuesday, November 27, 2018

await in turn by mistake

I really enjoy working with the new async functions, but it is easy to set up a situation where code that could run concurrently is forced to run in sequence. Consider this simple invocation of a JavaScript function that takes two values returned from other async functions:

   const combination = await combine(await value(1), await value(2));

The problem here is that the arguments are evaluated left to right, and each await suspends the function until its promise settles. So value(2) is not even called until value(1) has finished, and the two actions run in series, one after the other.

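// Simulate a slow task that completes after 5 seconds
function action() {
   return new Promise(resolve => {
       setTimeout(resolve, 5000)
   })
}

// Log when the work starts and finishes, then resolve to "ok <v>"
async function value(v) {
   console.log('started ' + v);
   return action().then(() => {
      console.log("finished " + v);
      return "ok " + v
   })
}

async function test() {
  // value('2') is not called until value('1') has resolved
  console.log(await value('1') + " " + await value('2'));
}
test();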

This results in output something like the following, and takes about ten seconds:

started 1
finished 1
started 2
finished 2
ok 1 ok 2

There are a number of ways to rewrite this code so that both operations are started at the same time. I think the second is the best form; I just wish there were some syntactic sugar so we could do away with the Promise.all references.

async function test() {
  // Worst
  const three = value('3');
  const four = value('4');
  console.log(await three + " " + await four);

  // Better
  const [five, six] = await Promise.all([value('5'), value('6')]);
  console.log(five + " " + six);

  // Better?
  console.log(...await Promise.all([value('7'), value('8')]));
}
test();

So you would expect the test output to be similar to this, with each block taking about the minimum 5 seconds:

started 3
started 4
finished 3
finished 4
ok 3 ok 4
started 5
started 6
finished 5
finished 6
ok 5 ok 6
started 7
started 8
finished 7
finished 8
ok 7 ok 8

It is just as easy to make the same mistake in a for loop, for example by awaiting each item in turn, which takes around 15 seconds for three items:

async function test() {

  const list = ['1', '2', '3'];
  const result = [];
  for (const item of list) {
     result.push(await value(item));
  }

  console.log(result);
}

For most operations that combine promises with a loop, you will normally need map and Promise.all at some point. This version should complete in around the minimum 5 seconds:

async function test() {

  const list = ['1', '2', '3'];
  const result = await Promise.all(list.map(value));

  console.log(result);
}