1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
| import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; @Slf4j public class ThreadUtil { private static final int THREAD_POOL_SIZE = 5;
public static <T> void execute(String name, List<T> list, int groupSize, IRunHelper<T> helper) { ExecutorService executorService = null; try { List<List<T>> split = splitList(list, groupSize); int size = split.size(); log.info("Count:{}", size); CountDownLatch countDownLatch = new CountDownLatch(size); executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE); Iterator<List<T>> iterator = split.iterator(); boolean hasNext = iterator.hasNext(); while (hasNext) { List<T> next = iterator.next(); executorService.submit(new Runner<T>(name, next, countDownLatch, helper)); hasNext = iterator.hasNext(); } countDownLatch.await(); log.info("{} Complete!", name); } catch (InterruptedException e) { log.error(e.getMessage(), e); } finally { if (!(null == executorService || executorService.isShutdown())) { executorService.shutdown(); } } } private static <T> List<List<T>> splitList(List<T> list, int groupSize) { int size = list.size(); int limit = (size + groupSize - 1) / groupSize; return Stream.iterate(0, n -> n + 1).limit(limit).parallel() .map(a -> list.stream().skip(a * groupSize).limit(groupSize).parallel().collect(Collectors.toList())) .collect(Collectors.toList()); }
public static interface IRunHelper<T> {
void operate(String name, List<T> list); } static class Runner<T> implements Runnable { private String name; private List<T> list; private CountDownLatch countDownLatch; private IRunHelper<T> helper; public Runner(String name, List<T> list, CountDownLatch countDownLatch, IRunHelper<T> helper) { this.name = name; this.list = list; this.countDownLatch = countDownLatch; this.helper = helper; } @Override public void run() { try { Thread currentThread = Thread.currentThread(); String currentName = String.format("%s_%s_%s", this.name, currentThread.getName(), currentThread.getId()); this.helper.operate(currentName, this.list); } catch (Exception e) { log.error(e.getMessage(), e); } finally { this.countDownLatch.countDown(); } } } public static void main(String[] args) { int i = 0; int groupSize = 4; while (i < 5) { List<String> list = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O"); ThreadUtil.execute("Test" + "-" + i, list, groupSize, new IRunHelper<String>() { @Override public void operate(String name, List<String> list) { log.info("Thread:{} Size:{} Info:{}", name, list.size(), String.join(",", list.toArray(new String[0]))); } }); i++; } } }
|