sql >> Database >  >> RDS >> Database

Wat zijn sequentiële versus parallelle streams in Java?

Java kan stream-bewerkingen parallelliseren om multi-coresystemen te benutten. Dit artikel biedt een perspectief en laat zien hoe parallelle stream de prestaties kan verbeteren met geschikte voorbeelden.

Streamen in Java

Een stream in Java is een reeks objecten weergegeven als een kanaal van gegevens. Het heeft meestal een bron waar de gegevens zich bevinden en een bestemming waar het wordt uitgezonden. Merk op dat een stream geen repository is; in plaats daarvan werkt het op een gegevensbron, zoals een array of een verzameling. De tussenliggende bits in de passage worden eigenlijk de stroom genoemd. Tijdens het transmissieproces doorloopt de stream meestal een of meer mogelijke transformaties, zoals filteren of sorteren, of het kan een ander proces zijn dat op de gegevens werkt. Dit past de originele gegevens aan in een andere vorm, meestal volgens de behoefte van de programmeur. Daarom wordt er een nieuwe stream gemaakt op basis van de bewerking die erop wordt toegepast. Wanneer een stream bijvoorbeeld wordt gesorteerd, resulteert dit in een nieuwe stream die een resultaat oplevert dat vervolgens wordt gesorteerd. Dit betekent dat de nieuwe gegevens een getransformeerde kopie van het origineel zijn in plaats van in de oorspronkelijke vorm.

Sequentiële stream

Elke streambewerking in Java, tenzij expliciet gespecificeerd als parallel, wordt sequentieel verwerkt. Het zijn in feite niet-parallelle stromen die een enkele draad gebruiken om hun pijplijn te verwerken. Sequentiële streams maken nooit gebruik van het multicore-systeem, zelfs niet als het onderliggende systeem parallelle uitvoering ondersteunt. Wat gebeurt er bijvoorbeeld als we multithreading toepassen om de stream te verwerken? Zelfs dan werkt het op één kern tegelijk. Het kan echter van de ene kern naar de andere springen, tenzij expliciet vastgemaakt aan een specifieke kern. De verwerking in vier verschillende threads versus vier verschillende kernen is bijvoorbeeld duidelijk anders wanneer de eerste niet overeenkomt met de laatste. Het is heel goed mogelijk om meerdere threads in een enkele kernomgeving uit te voeren, maar parallelle verwerking is een heel ander genre. Een programma moet van de grond af worden ontworpen voor parallel programmeren, behalve dat het wordt uitgevoerd in een omgeving die het ondersteunt. Dit is de reden waarom parallel programmeren een complexe arena is.

Laten we een voorbeeld proberen om het idee verder te illustreren.

package org.mano.example;

import java.util.Arrays;
import java.util.List;

public class Main2 {
   public static oid main(String[] args) {
      List<Integer> list=Arrays.asList(1,2,3,4,5,6,7,8,9);
      list.stream().forEach(System.out::println);
      System.out.println();
      list.parallelStream().forEach(System.out::println);
   }
}

Uitvoer

123456789
685973214

Dit voorbeeld is een illustratie van q sequentiële stroom en q parallelle stroom in werking. De list.stream() werkt in volgorde op een enkele thread met de println() operatie. list.parallelStream() , wordt daarentegen parallel verwerkt, waarbij optimaal gebruik wordt gemaakt van de onderliggende multicore-omgeving. Het interessante aspect zit in de uitvoer van het voorgaande programma. Bij een sequentiële stream wordt de inhoud van de lijst in een geordende volgorde afgedrukt. De uitvoer van de parallelle stroom is daarentegen ongeordend en de volgorde verandert elke keer dat het programma wordt uitgevoerd. Dit betekent in ieder geval één ding:het aanroepen van de list.parallelStream() methode maakt de println statement werken in meerdere threads, iets wat list.stream() doet in een enkele thread.

Parallelle stream

De primaire motivatie achter het gebruik van een parallelle stream is om streamverwerking een onderdeel van de parallelle programmering te maken, zelfs als het hele programma mogelijk niet wordt geparalleliseerd. Parallelle stream maakt gebruik van multicore-processors, wat resulteert in een aanzienlijke prestatieverbetering. In tegenstelling tot alle parallelle programmering, zijn ze complex en foutgevoelig. De Java-streambibliotheek biedt echter de mogelijkheid om dit gemakkelijk en op een betrouwbare manier te doen. Het hele programma mag niet worden geparalleliseerd. maar in ieder geval kan het deel dat de stroom afhandelt, worden geparallelliseerd. Ze zijn eigenlijk vrij eenvoudig in die zin dat we een paar methoden kunnen aanroepen en de rest is geregeld. Er zijn een aantal manieren om het te doen. Eén zo'n manier is om een ​​parallelle stroom te verkrijgen door de parallelStream() aan te roepen methode gedefinieerd door Collectie . Een andere manier is om de parallel() . aan te roepen methode gedefinieerd door BaseStream op een opeenvolgende stroom. De opeenvolgende stroom wordt geparalleliseerd door de aanroep. Merk op dat het onderliggende platform parallel programmeren moet ondersteunen, zoals bij een multicore-systeem. Anders heeft de aanroep geen zin. De stream zou in een dergelijk geval in volgorde worden verwerkt, zelfs als we de aanroep hebben gedaan. Als de aanroep wordt gedaan op een reeds parallelle stream, doet het niets en retourneert het gewoon de stream.

Om ervoor te zorgen dat het resultaat van parallelle verwerking toegepast op stream hetzelfde is als verkregen door sequentiële verwerking, moeten parallelle stromen stateloos, niet-storend en associatief zijn.

Een snel voorbeeld

package org.mano.example;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class Main {

   public static void main(String[] args) {
      List<Employee> employees = Arrays.asList(
         new Employee(1276, "FFF",2000.00),
         new Employee(7865, "AAA",1200.00),
         new Employee(4975, "DDD",3000.00),
         new Employee(4499, "CCC",1500.00),
         new Employee(9937, "GGG",2800.00),
         new Employee(5634, "HHH",1100.00),
         new Employee(9276, "BBB",3200.00),
         new Employee(6852, "EEE",3400.00));

      System.out.println("Original List");
      printList(employees);

      // Using sequential stream
      long start = System.currentTimeMillis();
      List<Employee> sortedItems = employees.stream()
         .sorted(Comparator
            .comparing(Employee::getName))
         .collect(Collectors.toList());
      long end = System.currentTimeMillis();

      System.out.println("sorted using sequential stream");
      printList(sortedItems);
      System.out.println("Total the time taken process :"
         + (end - start) + " milisec.");

      // Using parallel stream
      start = System.currentTimeMillis();
      List<Employee> anotherSortedItems = employees
         .parallelStream().sorted(Comparator
            .comparing(Employee::getName))
         .collect(Collectors.toList());
      end = System.currentTimeMillis();

      System.out.println("sorted using parallel stream");
      printList(anotherSortedItems);
      System.out.println("Total the time taken process :"
         + (end - start) + " milisec.");


      double totsal=employees.parallelStream()
         .map(e->e.getSalary())
         .reduce(0.00,(a1,a2)->a1+a2);
      System.out.println("Total Salary expense: "+totsal);
      Optional<Employee> maxSal=employees.parallelStream()
         .reduce((Employee e1, Employee e2)->
         e1.getSalary()<e2.getSalary()?e2:e1);
      if(maxSal.isPresent())
         System.out.println(maxSal.get().toString());
   }

   public static void printList(List<Employee> list) {
      for (Employee e : list)
         System.out.println(e.toString());
   }
}


package org.mano.example;

public class Employee {
   private int empid;
   private String name;
   private double salary;

   public Employee() {
      super();
   }

   public Employee(int empid, String name,
         double salary) {
      super();
      this.empid = empid;
      this.name = name;
      this.salary = salary;
   }

   public int getEmpid() {
      return empid;
   }

   public void setEmpid(int empid) {
      this.empid = empid;
   }

   public String getName() {
      return name;
   }

   public void setName(String name) {
      this.name = name;
   }

   public double getSalary() {
      return salary;
   }

   public void setSalary(double salary) {
      this.salary = salary;
   }

   @Override
   public String toString() {
      return "Employee [empid=" + empid + ", name="
         + name + ", salary=" + salary + "]";
   }
}

Merk in de vorige code op hoe we sorteren op een stream hebben toegepast door sequentiële uitvoering te gebruiken.

List<Employee> sortedItems = employees.stream()
               .sorted(Comparator
               .comparing(Employee::getName))
               .collect(Collectors.toList());

en parallelle uitvoering wordt bereikt door de code iets te wijzigen.

List<Employee> anotherSortedItems = employees
               .parallelStream().sorted(Comparator
               .comparing(Employee::getName))
               .collect(Collectors.toList());

We vergelijken ook de systeemtijd om een ​​idee te krijgen welk deel van de code meer tijd kost. Parallelle werking begint zodra de parallelle stream expliciet is verkregen door de parallelStream() methode. Er is nog een andere interessante methode, genaamd reduce() . Wanneer we deze methode toepassen op een parallelle stream, kan de bewerking in verschillende threads plaatsvinden.

We kunnen echter altijd schakelen tussen parallel en sequentieel, afhankelijk van de behoefte. Als we de parallelle stream in sequentieel willen veranderen, kunnen we dit doen door de sequentiële() aan te roepen methode gespecificeerd door BaseStream . Zoals we in ons eerste programma zagen, kan de bewerking die op de stream wordt uitgevoerd, worden geordend of ongeordend volgens de volgorde van de elementen. Dit betekent dat de volgorde afhankelijk is van de gegevensbron. Dit is echter niet het geval bij parallelle stromen. Om de prestaties te verbeteren, worden ze parallel verwerkt. Omdat dit gebeurt zonder enige volgorde, waarbij elke partitie van de stream onafhankelijk van de andere partities wordt verwerkt zonder enige coördinatie, is het gevolg onvoorspelbaar ongeordend. Maar als we specifiek een bewerking willen uitvoeren op elk element in de parallelle stroom die moet worden besteld, kunnen we de forEachOrdered() overwegen methode, wat een alternatief is voor de forEach() methode.

Conclusie

De stream-API's maken al heel lang deel uit van Java, maar het toevoegen van de aanpassing van parallelle verwerking is zeer welkom en tegelijkertijd een behoorlijk intrigerende functie. Dit geldt met name omdat moderne machines multicore zijn en er een stigma bestaat dat parallel programmeren een complex ontwerp is. De API's die door Java worden geleverd, bieden de mogelijkheid om een ​​vleugje parallelle programmeeraanpassingen op te nemen in een Java-programma dat het algemene ontwerp van sequentiële uitvoering heeft. Dit is misschien wel het beste deel van deze functie.


  1. Hoe in te voegen in een array in PostgreSQL

  2. Kolomgegevens retourneren van een gekoppelde server in SQL Server (T-SQL-voorbeelden)

  3. Voer een PostgreSQL .sql-bestand uit met behulp van opdrachtregelargumenten

  4. Waarom neemt de automatische verhoging van MySQL toe bij mislukte invoegingen?