/*
 * Copyright (c) 2006-2020 North Concepts Inc.  All rights reserved.
 * Proprietary and Confidential.  Use is subject to license terms.
 *
 * https://northconcepts.com/data-pipeline/licensing/
 */
package com.northconcepts.datapipeline.examples.cookbook;

import java.util.concurrent.TimeUnit;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.group.CloseWindowStrategy;
import com.northconcepts.datapipeline.group.GroupByReader;
import com.northconcepts.datapipeline.job.Job;

/**
 * Cookbook example: reads records from a simulated message queue, groups them by
 * {@code order_id} with a time-limited window, and writes the grouped output to
 * standard out.
 */
public class GroupRecordsByTimePeriodOrCount {

    private static final int MAX_ORDERS = 10;
    private static final long MAX_TRANSACTIONS = 200;
    private static final int RECORD_DELAY_MILLISECONDS = 250;

    /**
     * Wires the fake queue reader into a {@link GroupByReader}, runs the job, and
     * prints the number of records transferred followed by the running time.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        DataReader queueReader =
                new FakeMessageQueueReader(MAX_ORDERS, MAX_TRANSACTIONS, RECORD_DELAY_MILLISECONDS);

        // group records by "order_id" and release them every 10 seconds even if no new records are received
        long windowMillis = TimeUnit.SECONDS.toMillis(10);
        DataReader reader = new GroupByReader(queueReader, "order_id")
                .count("count_of_transactions")
                .sum("amount")
                .setCloseWindowStrategy(CloseWindowStrategy.limitedTime(windowMillis));

        DataWriter writer = StreamWriter.newSystemOutWriter();

        Job job = Job.run(reader, writer);

        String summary = job.getRecordsTransferred() + " - " + job.getRunningTimeAsString();
        System.out.println(summary);
    }

    //==================================================

    /**
     * Reader that fabricates transaction records instead of consuming a real queue.
     * It produces at most {@code maxTransactions} records, optionally pausing before
     * each one, and cycles the order id through {@code maxOrders} distinct values.
     */
    public static class FakeMessageQueueReader extends DataReader {

        private final int maxOrders;
        private final long maxTransactions;
        private long nextTransactionId;
        private final long recordDelay;

        /**
         * @param maxOrders       modulus applied when deriving the {@code order_id} value
         * @param maxTransactions number of records to produce before reporting no more data
         * @param recordDelay     milliseconds to sleep before each record; values of zero
         *                        or less disable the pause
         */
        public FakeMessageQueueReader(int maxOrders, long maxTransactions, long recordDelay) {
            this.maxOrders = maxOrders;
            this.maxTransactions = maxTransactions;
            this.recordDelay = recordDelay;
        }

        /**
         * Builds the next synthetic transaction record.
         *
         * @return a record with {@code transaction_id}, {@code order_id} and
         *         {@code amount} fields, or {@code null} once {@code maxTransactions}
         *         records have been produced (presumably the end-of-data signal for
         *         {@link DataReader} -- confirm against the base class contract)
         * @throws Throwable if the delay sleep is interrupted
         */
        @Override
        protected Record readImpl() throws Throwable {
            boolean exhausted = nextTransactionId >= maxTransactions;
            if (exhausted) {
                return null;
            }

            if (recordDelay > 0) {
                Thread.sleep(recordDelay);
            }

            // The id is consumed first; order_id and amount are derived from the
            // already-incremented counter, exactly as in the original statement order.
            long transactionId = nextTransactionId++;
            long followingId = nextTransactionId;

            Record record = new Record();
            record.setField("transaction_id", transactionId);
            record.setField("order_id", "order" + followingId % maxOrders);
            record.setField("amount", followingId + 0.01);
            return record;
        }
    }
}