Skip to content
代码片段 群组 项目
未验证 提交 dc7426f9 编辑于 作者: Arun Pandian's avatar Arun Pandian 提交者: GitHub
浏览文件

[Dataflow Streaming] fix max thread time calculation (#33686)

上级 da94e20f
No related branches found
No related tags found
无相关合并请求
......@@ -30,6 +30,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurren
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class BoundedQueueExecutor {
private final ThreadPoolExecutor executor;
private final long maximumBytesOutstanding;
......@@ -54,17 +55,17 @@ public class BoundedQueueExecutor {
private long totalTimeMaxActiveThreadsUsed;
public BoundedQueueExecutor(
int maximumPoolSize,
int initialMaximumPoolSize,
long keepAliveTime,
TimeUnit unit,
int maximumElementsOutstanding,
long maximumBytesOutstanding,
ThreadFactory threadFactory) {
this.maximumPoolSize = maximumPoolSize;
this.maximumPoolSize = initialMaximumPoolSize;
executor =
new ThreadPoolExecutor(
maximumPoolSize,
maximumPoolSize,
initialMaximumPoolSize,
initialMaximumPoolSize,
keepAliveTime,
unit,
new LinkedBlockingQueue<>(),
......
......@@ -50,6 +50,7 @@ import org.junit.runners.JUnit4;
// released (2.11.0)
@SuppressWarnings("unused")
public class BoundedQueueExecutorTest {
private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
private static final int DEFAULT_MAX_THREADS = 2;
private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
......@@ -247,7 +248,8 @@ public class BoundedQueueExecutorTest {
}
@Test
public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeUpdated() throws Exception {
public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsIncreased()
throws Exception {
CountDownLatch processStart1 = new CountDownLatch(1);
CountDownLatch processStart2 = new CountDownLatch(1);
CountDownLatch processStart3 = new CountDownLatch(1);
......@@ -287,6 +289,58 @@ public class BoundedQueueExecutorTest {
executor.shutdown();
}
@Test
public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsReduced()
throws Exception {
CountDownLatch processStart1 = new CountDownLatch(1);
CountDownLatch processStop1 = new CountDownLatch(1);
CountDownLatch processStart2 = new CountDownLatch(1);
CountDownLatch processStop2 = new CountDownLatch(1);
CountDownLatch processStart3 = new CountDownLatch(1);
CountDownLatch processStop3 = new CountDownLatch(1);
Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1);
Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2);
Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3);
// Initial state.
assertEquals(0, executor.activeCount());
assertEquals(2, executor.getMaximumPoolSize());
// m1 is accepted.
executor.execute(m1, 1);
processStart1.await();
assertEquals(1, executor.activeCount());
assertEquals(2, executor.getMaximumPoolSize());
assertEquals(0L, executor.allThreadsActiveTime());
processStop1.countDown();
while (executor.activeCount() != 0) {
// Waiting for all threads to be ended.
Thread.sleep(200);
}
// Reduce max pool size to 1
executor.setMaximumPoolSize(1, 105);
assertEquals(0, executor.activeCount());
executor.execute(m2, 1);
processStart2.await();
Thread.sleep(100);
assertEquals(1, executor.activeCount());
assertEquals(1, executor.getMaximumPoolSize());
processStop2.countDown();
while (executor.activeCount() != 0) {
// Waiting for all threads to be ended.
Thread.sleep(200);
}
// allThreadsActiveTime() should be recorded
// since when the second task was running it reached the new max pool size.
assertThat(executor.allThreadsActiveTime(), greaterThan(0L));
executor.shutdown();
}
@Test
public void testRenderSummaryHtml() {
String expectedSummaryHtml =
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册