0%

Java 多线程并行管理工具类

摘要: 123

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ThreadUtil {

/** 线程池 线程数 */
private static final int THREAD_POOL_SIZE = 5;

/**
* 操作并行处理完成返回
*
* @param <T>
* @param name
* @param list
* @param groupSize
* @param helper
*/
public static <T> void execute(String name, List<T> list, int groupSize, IRunHelper<T> helper) {
ExecutorService executorService = null;
try {
List<List<T>> split = splitList(list, groupSize);
int size = split.size();
log.info("Count:{}", size);
// 开始操作多线程,并添加门闩
CountDownLatch countDownLatch = new CountDownLatch(size);
executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);

Iterator<List<T>> iterator = split.iterator();
boolean hasNext = iterator.hasNext();
while (hasNext) {
List<T> next = iterator.next();
executorService.submit(new Runner<T>(name, next, countDownLatch, helper));
hasNext = iterator.hasNext();
}

// 所有门闩打开后继续
countDownLatch.await();
log.info("{} Complete!", name);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
} finally {
if (!(null == executorService || executorService.isShutdown())) {
executorService.shutdown();
}
}

}

private static <T> List<List<T>> splitList(List<T> list, int groupSize) {
int size = list.size();
int limit = (size + groupSize - 1) / groupSize;
return Stream.iterate(0, n -> n + 1).limit(limit).parallel()
.map(a -> list.stream().skip(a * groupSize).limit(groupSize).parallel().collect(Collectors.toList()))
.collect(Collectors.toList());
}

/**
* 业务逻辑执行注入
*
* @author jun.chen
*
* @param <T>
*/
public static interface IRunHelper<T> {

/**
* 具体操作
*
* @return
*/
void operate(String name, List<T> list);
}

static class Runner<T> implements Runnable {

private String name;

private List<T> list;

private CountDownLatch countDownLatch;

private IRunHelper<T> helper;

public Runner(String name, List<T> list, CountDownLatch countDownLatch, IRunHelper<T> helper) {
this.name = name;
this.list = list;
this.countDownLatch = countDownLatch;
this.helper = helper;
}

@Override
public void run() {
try {
Thread currentThread = Thread.currentThread();
String currentName = String.format("%s_%s_%s", this.name, currentThread.getName(),
currentThread.getId());
this.helper.operate(currentName, this.list);
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
// 打开门闩
this.countDownLatch.countDown();
}

}
}

public static void main(String[] args) {
int i = 0;
int groupSize = 4;
while (i < 5) {
List<String> list = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o",
"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O");
ThreadUtil.execute("Test" + "-" + i, list, groupSize, new IRunHelper<String>() {

@Override
public void operate(String name, List<String> list) {
log.info("Thread:{} Size:{} Info:{}", name, list.size(),
String.join(",", list.toArray(new String[0])));
}

});

i++;
}
}

}