java 8 stream tutorial



Java 8 Stream, obtenir la tête et la queue (6)

Java 8 a introduit une classe Stream qui ressemble à celle de Scala, une puissante construction paresseuse permettant de faire quelque chose de très concis:

def from(n: Int): Stream[Int] = n #:: from(n+1)

def sieve(s: Stream[Int]): Stream[Int] = {
  s.head #:: sieve(s.tail filter (_ % s.head != 0))
}

val primes = sieve(from(2))

primes takeWhile(_ < 1000) print  // prints all primes less than 1000

Je me demandais s'il était possible de faire cela dans Java 8, alors j'ai écrit quelque chose comme ceci:

IntStream from(int n) {
    return IntStream.iterate(n, m -> m + 1);
}

IntStream sieve(IntStream s) {
    int head = s.findFirst().getAsInt();
    return IntStream.concat(IntStream.of(head), sieve(s.skip(1).filter(n -> n % head != 0)));
}

IntStream primes = sieve(from(2));

Assez simple, mais cela produit java.lang.IllegalStateException: stream has already been operated upon or closed car findFirst() et skip() sont des opérations de terminal sur Stream qui ne peuvent être effectuées qu'une seule fois.

Je n'ai pas vraiment besoin d'utiliser le flux deux fois car tout ce dont j'ai besoin est le premier numéro du flux et le reste comme un autre flux, c'est-à-dire l'équivalent de Stream.head et de Stream.tail de Scala. Existe-t-il une méthode dans Java 8 Stream que je peux utiliser pour y parvenir?

Merci.

https://src-bin.com


Answer #1

La solution ci-dessous ne fait pas de mutations d'état, à l'exception de la déconstruction tête / queue du flux.

Le lazyness est obtenu en utilisant IntStream.iterate. La classe Prime est utilisée pour conserver l'état du générateur

    import java.util.PrimitiveIterator;
    import java.util.stream.IntStream;
    import java.util.stream.Stream;

    public class Prime {
        private final IntStream candidates;
        private final int current;

        private Prime(int current, IntStream candidates)
        {
            this.current = current;
            this.candidates = candidates;
        }

        private Prime next()
        {
            PrimitiveIterator.OfInt it = candidates.filter(n -> n % current != 0).iterator();

            int head = it.next();
            IntStream tail = IntStream.generate(it::next);

            return new Prime(head, tail);
        }

        public static Stream<Integer> stream() {
            IntStream possiblePrimes = IntStream.iterate(3, i -> i + 1);

            return Stream.iterate(new Prime(2, possiblePrimes), Prime::next)
                         .map(p -> p.current);
        }
    }

L’utilisation serait la suivante:

Stream<Integer> first10Primes = Prime.stream().limit(10)

Answer #2

Même si vous ne rencontrez pas le problème que vous ne pouvez pas diviser un IntStream , votre code ne fonctionne pas car vous appelez votre méthode sieve manière récursive plutôt que paresseuse. Vous avez donc eu une récursivité infinie avant de pouvoir interroger votre flux résultant pour la première valeur.

Diviser un IntStream s en une tête et une queue IntStream (qui n'a pas encore été consommée) est possible:

PrimitiveIterator.OfInt it = s.iterator();
int head = it.nextInt();
IntStream tail = IntStream.generate(it::next).filter(i -> i % head != 0);

A cet endroit, vous avez besoin d'une construction de invoquer le sieve sur la queue paresseusement. Stream ne fournit pas cela; concat attend des instances de flux existantes comme arguments et vous ne pouvez pas construire un flux invoquant sieve paresseusement avec une expression lambda car la création paresseuse ne fonctionne qu'avec un état mutable que les expressions lambda ne prennent pas en charge. Si vous ne disposez pas d'une implémentation de bibliothèque masquant l'état mutable, vous devez utiliser un objet mutable. Mais une fois que vous acceptez la condition d'état mutable, la solution peut être encore plus simple que votre première approche:

IntStream primes = from(2).filter(i -> p.test(i)).peek(i -> p = p.and(v -> v % i != 0));

IntPredicate p = x -> true;

IntStream from(int n)
{
  return IntStream.iterate(n, m -> m + 1);
}

Cela créera récursivement un filtre mais au final, peu importe si vous créez une arborescence d' IntPredicate s ou une arborescence d' IntStream s (comme avec votre approche IntStream.concat si cela fonctionnait). Si vous n'aimez pas le champ d'instance mutable pour le filtre, vous pouvez le cacher dans une classe interne (mais pas dans une expression lambda ...).


Answer #3

Pour avoir la tête à tête, il vous faut une implémentation de Lazy Stream. Les flux Java 8 ou RxJava ne sont pas adaptés.

Vous pouvez utiliser par exemple LazySeq comme suit.

La séquence paresseuse est toujours parcourue depuis le début en utilisant une décomposition premier / reste très bon marché (tête () et queue ())

LazySeq implémente l'interface java.util.List et peut donc être utilisé à différents endroits. En outre, il implémente également des améliorations de Java 8 pour les collections, à savoir les flux et les collecteurs.

package com.company;

import com.nurkiewicz.lazyseq.LazySeq;

public class Main {

    public static void main(String[] args) {

        LazySeq<Integer> ints = integers(2);
        LazySeq primes = sieve(ints);
        primes.take(10).forEach(p -> System.out.println(p));

    }

    private static LazySeq<Integer> sieve(LazySeq<Integer> s) {
        return LazySeq.cons(s.head(), () -> sieve(s.filter(x -> x % s.head() != 0)));
    }

    private static LazySeq<Integer> integers(int from) {
        return LazySeq.cons(from, () -> integers(from + 1));
    }

}

Answer #4

Si cela ne vous dérange pas d'utiliser des bibliothèques tierces, cyclops-streams , la bibliothèque que j'ai écrite contient un certain nombre de solutions potentielles.

La classe StreamUtils possède un grand nombre de méthodes statiques permettant de travailler directement avec java.util.stream.Streams y compris headAndTail .

HeadAndTail<Integer> headAndTail = StreamUtils.headAndTail(Stream.of(1,2,3,4));
int head = headAndTail.head(); //1
Stream<Integer> tail = headAndTail.tail(); //Stream[2,3,4]

La classe Streamable représente un Stream rejouable et fonctionne en construisant une structure de données intermédiaire de mise en cache paresseuse. Parce qu'il est mis en cache et remboursable - la tête et la queue peuvent être implémentées directement et séparément.

Streamable<Integer> replayable=  Streamable.fromStream(Stream.of(1,2,3,4));
int head = repayable.head(); //1
Stream<Integer> tail = replayable.tail(); //Stream[2,3,4]

cyclops-streams fournit également une extension de Stream séquentielle qui, à son tour, étend jOOλ et possède à la fois des solutions basées sur Tuple (à partir de jOOλ) et des solutions de domaine (HeadAndTail) pour l'extraction de la tête et de la queue.

SequenceM.of(1,2,3,4)
         .splitAtHead(); //Tuple[1,SequenceM[2,3,4]

SequenceM.of(1,2,3,4)
         .headAndTail();

Mise à jour par requête de Tagir -> Une version Java du tamis Scala utilisant SequenceM

public void sieveTest(){
    sieve(SequenceM.range(2, 1_000)).forEach(System.out::println);
}

SequenceM<Integer> sieve(SequenceM<Integer> s){

    return s.headAndTailOptional().map(ht ->SequenceM.of(ht.head())
                            .appendStream(sieve(ht.tail().filter(n -> n % ht.head() != 0))))
                    .orElse(SequenceM.of());
}

Et une autre version via Streamable

public void sieveTest2(){
    sieve(Streamable.range(2, 1_000)).forEach(System.out::println);
}

Streamable<Integer> sieve(Streamable<Integer> s){

    return s.size()==0? Streamable.of() : Streamable.of(s.head())
                                                    .appendStreamable(sieve(s.tail()
                                                                    .filter(n -> n % s.head() != 0)));
}

Note - ni Streamable de SequenceM ont une implémentation vide - par conséquent la vérification de taille pour Streamable et l'utilisation de headAndTailOptional .

Enfin une version utilisant plain java.util.stream.Stream

import static com.aol.cyclops.streams.StreamUtils.headAndTailOptional;

public void sieveTest(){
    sieve(IntStream.range(2, 1_000).boxed()).forEach(System.out::println);
}

Stream<Integer> sieve(Stream<Integer> s){

    return headAndTailOptional(s).map(ht ->Stream.concat(Stream.of(ht.head())
                            ,sieve(ht.tail().filter(n -> n % ht.head() != 0))))
                    .orElse(Stream.of());
}

Une autre mise à jour - un itératif paresseux basé sur la version de @ Holger utilisant des objets plutôt que des primitives (notez qu'une version primitive est également possible)

  final Mutable<Predicate<Integer>> predicate = Mutable.of(x->true);
  SequenceM.iterate(2, n->n+1)
           .filter(i->predicate.get().test(i))
           .peek(i->predicate.mutate(p-> p.and(v -> v%i!=0)))
           .limit(100000)
           .forEach(System.out::println);

Answer #5

Voici une autre recette utilisant la manière suggérée par Holger. Il utilise RxJava juste pour ajouter la possibilité d'utiliser la méthode take (int) et bien d'autres.

package com.company;

import rx.Observable;

import java.util.function.IntPredicate;
import java.util.stream.IntStream;

public class Main {

    public static void main(String[] args) {

        final IntPredicate[] p={(x)->true};
        IntStream primesStream=IntStream.iterate(2,n->n+1).filter(i -> p[0].test(i)).peek(i->p[0]=p[0].and(v->v%i!=0)   );

        Observable primes = Observable.from(()->primesStream.iterator());

        primes.take(10).forEach((x) -> System.out.println(x.toString()));


    }

}

Answer #6

Vous pouvez essentiellement l'implémenter comme ceci:

static <T> Tuple2<Optional<T>, Seq<T>> splitAtHead(Stream<T> stream) {
    Iterator<T> it = stream.iterator();
    return tuple(it.hasNext() ? Optional.of(it.next()) : Optional.empty(), seq(it));
}

Dans l'exemple ci-dessus, Tuple2 et Seq sont des types empruntés à jOOλ , une bibliothèque que nous avons développée pour les tests d'intégration jOOQ . Si vous ne voulez pas de dépendances supplémentaires, vous pouvez aussi les implémenter vous-même:

class Tuple2<T1, T2> {
    final T1 v1;
    final T2 v2;

    Tuple2(T1 v1, T2 v2) {
        this.v1 = v1;
        this.v2 = v2;
    }

    static <T1, T2> Tuple2<T1, T2> tuple(T1 v1, T2 v2) {
        return new Tuple<>(v1, v2);
    }
}

static <T> Tuple2<Optional<T>, Stream<T>> splitAtHead(Stream<T> stream) {
    Iterator<T> it = stream.iterator();
    return tuple(
        it.hasNext() ? Optional.of(it.next()) : Optional.empty,
        StreamSupport.stream(Spliterators.spliteratorUnknownSize(
            it, Spliterator.ORDERED
        ), false)
    );
}




java-stream