通俗易懂的讲解Java8 ParallelStream并发安全原理( 二 )

构造内部类Container,该类的作用是一个存放输入的容器,定义了三个方法:

  • accumulate方法对输入数据进行处理并存入本地的结果
  • combine方法将其他容器的结果合并到本地的结果中
  • getResult方法返回本地的结果
Container.java:
class Container { // 定义本地的result public Set<Double> set; public Container() { this.set = new HashSet<>(); } public Container accumulate(int num) { this.set.add(compute.compute(num)); return this; } public Container combine(Container container) { this.set.addAll(container.set); return this; } public Set<Double> getResult() { return this.set; }}在Main.java中编写测试方法:
public static void main(String[] args) { Main main = new Main(); main.run(); System.out.println("原始数据:"); main.nums.forEach(i -> System.out.print(i + " ")); System.out.println("nncollect方法加工后的数据:"); main.result.forEach(i -> System.out.print(i + " "));}输出:
原始数据:0 1 2 3 4 5 6 7 8 9
collect方法加工后的数据:0.0 2.0 4.0 8.0 16.0 18.0 10.0 6.0 12.0 14.0
我们将10个整型数值的list转成了10个double类型的set,至此验证成功~
本程序参考 http://blog.csdn.net/io_field/article/details/54971555 。
一言蔽之
总结就是paralleStream里直接去修改变量是非线程安全的,但是采用collect和reduce操作就是满足线程安全的了 。
java8中parallelStream性能测试及结果分析测试1
@BenchmarkMode(Mode.AverageTime)@OutputTimeUnit(TimeUnit.NANOSECONDS)@Warmup(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)@Measurement(iterations = 20, time = 3, timeUnit = TimeUnit.SECONDS)@Fork(1)@State(Scope.Benchmark)public class StreamBenchTest {List<String> data = https://www.isolves.com/it/cxkf/yy/JAVA/2020-07-27/new ArrayList<>();@Setuppublic void init() {// preparefor(int i=0;i<100;i++){data.add(UUID.randomUUID().toString());}}@TearDownpublic void destory() {// destory}@Benchmarkpublic void benchStream(){data.stream().forEach(e -> {e.getBytes();try {Thread.sleep(10);} catch (InterruptedException e1) {e1.printStackTrace();}});}@Benchmarkpublic void benchParallelStream(){data.parallelStream().forEach(e -> {e.getBytes();try {Thread.sleep(10);} catch (InterruptedException e1) {e1.printStackTrace();}});}public static void main(String[] args) throws RunnerException {Options opt = new OptionsBuilder().include(".*" +StreamBenchTest.class.getSimpleName()+ ".*").forks(1).build();new Runner(opt).run();}}parallelStream线程数
默认是Runtime.getRuntime().availableProcessors() - 1,这里为7
运行结果
# Run complete. Total time: 00:02:44BenchmarkMode CntScoreError UnitsStreamBenchTest.benchParallelStream avgt20155868805.437 ± 1509175.840 ns/opStreamBenchTest.benchStreamavgt20 1147570372.950 ± 6138494.414 ns/op测试2
将数据data改为30,同时sleep改为100
BenchmarkMode CntScoreError UnitsStreamBenchTest.benchParallelStream avgt20414230854.631 ± 725294.455 ns/opStreamBenchTest.benchStreamavgt20 3107250608.500 ± 4805037.628 ns/op可以发现sleep越长,parallelStream优势越明显 。
小结
parallelStream在阻塞场景下优势更明显,其线程池个数默认为Runtime.getRuntime().availableProcessors() - 1,如果需修改则需设置-Djava.util.concurrent.ForkJoinPool.common.parallelism=8
以上就是本次讲述知识点的全部内容,感谢你对码农之家的支持 。
以上就是本次给大家分享的关于java的全部知识点内容总结,感谢大家的阅读和支持 。

【通俗易懂的讲解Java8 ParallelStream并发安全原理】


推荐阅读