📖 API Reference
Complete reference for all TaskRunna classes and interfaces.
Core Classes
BatchJobProcessor
The main entry point for batch processing jobs.
```kotlin
class BatchJobProcessor<T>(
    iterator: BaseBatchIterator<T>,
    submitJob: (T) -> ListenableFuture<*>,
    onSuccess: (T, Any) -> Unit = { _, _ -> },
    onFailure: (T, Throwable) -> Unit = { _, _ -> },
    logger: KLogger,
    metrics: BatchMetrics = NoOpBatchMetrics,
    jobName: String = "batch_job"
)
```
Parameters
Parameter | Type | Description |
---|---|---|
`iterator` | `BaseBatchIterator<T>` | Iterator that loads data in batches |
`submitJob` | `(T) -> ListenableFuture<*>` | Function that processes each item asynchronously |
`onSuccess` | `(T, Any) -> Unit` | Callback invoked when an item is processed successfully |
`onFailure` | `(T, Throwable) -> Unit` | Callback invoked when an item fails to process |
`logger` | `KLogger` | Logger instance for job execution logging |
`metrics` | `BatchMetrics` | Metrics collector (optional, defaults to no-op) |
`jobName` | `String` | Name for the job (used in logging and metrics) |
Methods
run(): Unit
Executes the batch job, processing all available items.
```kotlin
processor.run()
```
Behavior:
- Loads data in batches using the iterator
- Submits each item for async processing
- Waits for all items in a batch to complete before loading the next batch
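- Invokes `onSuccess` for each item that completes successfully and `onFailure` for each item that fails
- Logs progress and statistics
- Records metrics through the configured `metrics` collector (a no-op unless one is supplied)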
Example:
```kotlin
val processor = BatchJobProcessor(
    iterator = OrderIterator(),
    submitJob = ::processOrder,
    onSuccess = { order, result -> logger.info { "Processed $order" } },
    onFailure = { order, error -> logger.error(error) { "Failed $order" } },
    logger = logger
)
processor.run() // Processes all orders
```
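The behavior listed above can be modeled in a few lines. The sketch below is a simplified, hypothetical model and not TaskRunna's implementation: it uses the JDK's `CompletableFuture` in place of Guava's `ListenableFuture` so it runs without dependencies, and `runBatches` and `RunSummary` are illustrative names. It shows the two properties iterator authors rely on: the cursor for the next batch comes from the last item of the current batch, and an empty batch ends the run.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionException

// Illustrative result type for this sketch only.
data class RunSummary(val batches: Int, val succeeded: Int, val failed: Int)

// Simplified model of the documented run() behavior; not TaskRunna's implementation.
fun <T> runBatches(
    batchSize: Int,
    loadNextBatch: (afterCursor: String, batchSize: Int) -> List<T>,
    extractCursorFrom: (T) -> String,
    submitJob: (T) -> CompletableFuture<*>,
    onSuccess: (T, Any?) -> Unit = { _, _ -> },
    onFailure: (T, Throwable) -> Unit = { _, _ -> }
): RunSummary {
    var cursor = ""  // empty cursor requests the first batch
    var batches = 0
    var succeeded = 0
    var failed = 0
    while (true) {
        val batch = loadNextBatch(cursor, batchSize)
        if (batch.isEmpty()) break  // an empty batch means no more data
        batches++
        // Submit every item in the batch before waiting on any of them.
        val submitted = batch.map { item -> item to submitJob(item) }
        // Wait for each item; the next batch is loaded only after all have settled.
        for ((item, future) in submitted) {
            val result = try {
                future.join()
            } catch (e: CompletionException) {
                onFailure(item, e.cause ?: e)  // unwrap the original exception
                failed++
                continue
            }
            onSuccess(item, result)
            succeeded++
        }
        cursor = extractCursorFrom(batch.last())  // resume after the last item loaded
    }
    return RunSummary(batches, succeeded, failed)
}
```

With items 1 to 7 and a batch size of 3, the loop loads `[1, 2, 3]`, `[4, 5, 6]` and `[7]`, then stops on the empty fourth batch.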
BaseBatchIterator
Abstract base class for implementing batch data iteration with cursor-based pagination.
```kotlin
abstract class BaseBatchIterator<T> {
    abstract fun loadNextBatch(afterCursor: String, batchSize: Int): List<T>
    abstract fun extractCursorFrom(item: T): String
    open val defaultBatchSize: Int = 20
}
```
Abstract Methods
loadNextBatch(afterCursor: String, batchSize: Int): List<T>
Loads the next batch of items for processing.
Parameters:
- `afterCursor`: Cursor indicating where to start loading (empty string for the first batch)
- `batchSize`: Maximum number of items to load

Returns: List of items to process (an empty list indicates no more data)
Example:
```kotlin
override fun loadNextBatch(afterCursor: String, batchSize: Int): List<Order> {
    return orderRepository.findFailedOrders(
        afterId = afterCursor.takeIf { it.isNotEmpty() },
        limit = batchSize
    )
}
```
extractCursorFrom(item: T): String
Extracts a cursor value from an item for pagination.
Parameters:
- `item`: The item to extract the cursor from
Returns: String cursor value (typically an ID or timestamp)
Example:
```kotlin
override fun extractCursorFrom(item: Order): String = item.id
```
Properties
defaultBatchSize: Int
Batch size used when none is specified; the base class default is 20. Override to customize:
```kotlin
override val defaultBatchSize: Int = 50
```
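To make the cursor contract concrete, here is a hypothetical in-memory iterator over orders sorted by numeric ID. The abstract class is repeated from the signature above so the snippet compiles on its own (drop it when using the real class), and `Order` and `InMemoryOrderIterator` are illustrative names. Note that the cursor is parsed back to a number before comparing: comparing string IDs lexicographically would order `"10"` before `"9"` and skip or repeat items.

```kotlin
// Local copy of the documented signature so this snippet compiles standalone.
abstract class BaseBatchIterator<T> {
    abstract fun loadNextBatch(afterCursor: String, batchSize: Int): List<T>
    abstract fun extractCursorFrom(item: T): String
    open val defaultBatchSize: Int = 20
}

// Hypothetical domain type for illustration.
data class Order(val id: Long, val status: String)

class InMemoryOrderIterator(orders: List<Order>) : BaseBatchIterator<Order>() {
    // A stable ordering by ID keeps pagination consistent across calls.
    private val sorted = orders.sortedBy { it.id }

    override val defaultBatchSize: Int = 50

    override fun loadNextBatch(afterCursor: String, batchSize: Int): List<Order> {
        // Empty cursor = first batch; otherwise parse it back to the numeric ID it came from.
        val afterId = if (afterCursor.isEmpty()) Long.MIN_VALUE else afterCursor.toLong()
        return sorted.asSequence()
            .filter { it.id > afterId }
            .take(batchSize)
            .toList()
    }

    override fun extractCursorFrom(item: Order): String = item.id.toString()
}
```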
BatchJobStats
Tracks statistics during batch job execution.
```kotlin
class BatchJobStats(
    private val logger: KLogger,
    private val onComplete: (BatchJobStats) -> Unit = {}
)
```
Properties
Property | Type | Description |
---|---|---|
`tasksSubmitted` | `AtomicInteger` | Number of tasks submitted for processing |
`tasksCompleted` | `AtomicInteger` | Number of tasks completed (success + failure) |
`tasksSucceeded` | `AtomicInteger` | Number of tasks that completed successfully |
`tasksFailed` | `AtomicInteger` | Number of tasks that failed |
`startTime` | `Instant` | When the job started |
`endTime` | `Instant?` | When the job completed (`null` if still running) |
Methods
reportStats(): String
Returns a formatted string with current statistics.
```kotlin
val stats = processor.stats.reportStats()
// Returns: "Completed processing 100 records — 95 succeeded (95.00%) in 2.34 seconds. 5 records failed"
```
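The sample string above can be derived from the counters. The sketch below is a hypothetical reconstruction of that format, not TaskRunna's code (`StatsSketch`, `recordResult` and `report` are illustrative names). It shows how the percentage follows from the counters (95 succeeded out of 100 completed gives 95.00%) and why atomic counters matter, presumably because completion callbacks can run on several threads at once.

```kotlin
import java.time.Duration
import java.time.Instant
import java.util.Locale
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for BatchJobStats; not the library's implementation.
class StatsSketch(val startTime: Instant = Instant.now()) {
    val tasksSubmitted = AtomicInteger()
    val tasksCompleted = AtomicInteger()
    val tasksSucceeded = AtomicInteger()
    val tasksFailed = AtomicInteger()

    // Safe to call from many threads: each counter update is atomic.
    fun recordResult(success: Boolean) {
        if (success) tasksSucceeded.incrementAndGet() else tasksFailed.incrementAndGet()
        tasksCompleted.incrementAndGet()
    }

    fun report(elapsed: Duration): String {
        val completed = tasksCompleted.get()
        val succeeded = tasksSucceeded.get()
        // Guard against division by zero when nothing has completed yet.
        val pct = if (completed == 0) 0.0 else succeeded * 100.0 / completed
        val seconds = elapsed.toMillis() / 1000.0
        // Locale.ROOT keeps "." as the decimal separator regardless of the JVM locale.
        return String.format(
            Locale.ROOT,
            "Completed processing %d records — %d succeeded (%.2f%%) in %.2f seconds. %d records failed",
            completed, succeeded, pct, seconds, tasksFailed.get()
        )
    }
}
```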
Metrics
BatchMetrics
Interface for collecting batch processing metrics.
```kotlin
interface BatchMetrics {
    fun recordJobStarted(jobName: String)
    fun recordJobCompleted(jobName: String, success: Boolean, duration: Duration)
    fun recordTaskSubmitted(jobName: String)
    fun recordTaskCompleted(jobName: String, success: Boolean, duration: Duration, errorType: String?)
    fun recordBatchProcessed(jobName: String, batchSize: Int)
    fun recordItemsProcessed(jobName: String, count: Int)
    fun recordQueueSize(jobName: String, size: Int)
}
```
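Any class implementing this interface can be passed as `metrics`. A minimal sketch, assuming `Duration` is `java.time.Duration` (the reference does not show the import): an in-memory collector that is handy for asserting on metrics in unit tests. The interface is repeated so the snippet compiles standalone; `InMemoryBatchMetrics` and its key naming scheme are hypothetical.

```kotlin
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Copy of the documented interface (Duration assumed to be java.time.Duration).
interface BatchMetrics {
    fun recordJobStarted(jobName: String)
    fun recordJobCompleted(jobName: String, success: Boolean, duration: Duration)
    fun recordTaskSubmitted(jobName: String)
    fun recordTaskCompleted(jobName: String, success: Boolean, duration: Duration, errorType: String?)
    fun recordBatchProcessed(jobName: String, batchSize: Int)
    fun recordItemsProcessed(jobName: String, count: Int)
    fun recordQueueSize(jobName: String, size: Int)
}

// Thread-safe counters keyed by "jobName.metric"; meant for tests, not production.
class InMemoryBatchMetrics : BatchMetrics {
    private val counters = ConcurrentHashMap<String, AtomicLong>()
    private val gauges = ConcurrentHashMap<String, Int>()

    private fun add(key: String, delta: Long = 1) {
        counters.computeIfAbsent(key) { AtomicLong() }.addAndGet(delta)
    }

    fun count(key: String): Long = counters[key]?.get() ?: 0L
    fun gauge(key: String): Int? = gauges[key]

    override fun recordJobStarted(jobName: String) = add("$jobName.jobs_started")
    override fun recordJobCompleted(jobName: String, success: Boolean, duration: Duration) =
        add("$jobName.jobs_completed.${if (success) "success" else "failure"}")
    override fun recordTaskSubmitted(jobName: String) = add("$jobName.tasks_submitted")
    override fun recordTaskCompleted(jobName: String, success: Boolean, duration: Duration, errorType: String?) =
        add("$jobName.tasks_completed.${if (success) "success" else errorType ?: "failure"}")
    override fun recordBatchProcessed(jobName: String, batchSize: Int) = add("$jobName.batches")
    override fun recordItemsProcessed(jobName: String, count: Int) = add("$jobName.items", count.toLong())
    override fun recordQueueSize(jobName: String, size: Int) { gauges["$jobName.queue_size"] = size }
}
```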
MicrometerBatchMetrics
Prometheus metrics implementation using Micrometer.
```kotlin
class MicrometerBatchMetrics(
    private val meterRegistry: MeterRegistry,
    private val prefix: String = "batch"
) : BatchMetrics
```
Usage
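```kotlin
// Micrometer's PrometheusMeterRegistry is constructed from a config object; it has no builder.
// This is Micrometer's PrometheusConfig, not TaskRunna's object of the same name.
val registry = PrometheusMeterRegistry(io.micrometer.prometheus.PrometheusConfig.DEFAULT)
val metrics = MicrometerBatchMetrics(registry, "order_processing")
val processor = BatchJobProcessor(
    iterator = OrderIterator(),
    submitJob = ::processOrder,
    logger = logger, // required: the logger parameter has no default
    metrics = metrics,
    jobName = "order_retry"
)
```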
NoOpBatchMetrics
No-operation metrics implementation (zero overhead).
```kotlin
object NoOpBatchMetrics : BatchMetrics {
    // All methods are empty implementations
}
```
PrometheusConfig
Utility for creating and configuring Prometheus metrics.
```kotlin
object PrometheusConfig {
    fun createBatchMetrics(prefix: String): BatchMetrics
    fun createBatchMetrics(registry: PrometheusMeterRegistry, prefix: String): BatchMetrics
}
```
Methods
createBatchMetrics(prefix: String): BatchMetrics
Auto-detects available dependencies and creates appropriate metrics implementation.
```kotlin
val metrics = PrometheusConfig.createBatchMetrics("order_processor")
// Returns MicrometerBatchMetrics if Prometheus available
// Falls back to NoOpBatchMetrics if not available
```
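The reference does not say how the detection works. One common JVM technique, shown here as a hypothetical sketch rather than TaskRunna's actual logic, is to probe for a class with `Class.forName` and fall back when it cannot be loaded. The candidate names are Micrometer's Prometheus registry in the older `io.micrometer.prometheus` package and in the `io.micrometer.prometheusmetrics` package used from Micrometer 1.13; `isOnClasspath` and `selectMetricsBackend` are illustrative names.

```kotlin
// Hypothetical sketch of classpath-based auto-detection; not TaskRunna's code.

// Returns true if the named class can be loaded (without running its static initializers).
fun isOnClasspath(
    className: String,
    loader: ClassLoader? = Thread.currentThread().contextClassLoader
): Boolean =
    try {
        Class.forName(className, false, loader)
        true
    } catch (e: ClassNotFoundException) {
        false
    } catch (e: LinkageError) {
        // e.g. NoClassDefFoundError when one of the class's own dependencies is missing
        false
    }

// Prometheus registry class names before and after Micrometer 1.13.
val prometheusRegistryClasses = listOf(
    "io.micrometer.prometheus.PrometheusMeterRegistry",
    "io.micrometer.prometheusmetrics.PrometheusMeterRegistry"
)

// Picks a backend label; a real factory would construct the metrics object instead.
fun selectMetricsBackend(candidates: List<String> = prometheusRegistryClasses): String =
    if (candidates.any { isOnClasspath(it) }) "micrometer" else "noop"
```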
Configuration
Thread Pool Configuration
BatchJobProcessor uses an internal thread pool for async execution. The pool is automatically sized based on available processors.
Default Configuration:
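- Core pool size: `Runtime.getRuntime().availableProcessors()`
- Maximum pool size: `Runtime.getRuntime().availableProcessors() * 2`
- Keep-alive time: 60 seconds
- Queue: unbounded `LinkedBlockingQueue`

If the pool is a standard `java.util.concurrent.ThreadPoolExecutor`, the unbounded queue means that once all core threads are busy, new tasks wait in the queue instead of starting extra threads, so in practice the pool does not grow beyond the core size and the maximum has no effect.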
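The defaults above map onto a standard `ThreadPoolExecutor`. The sketch below builds an equivalent pool with the JDK, assuming (the reference does not state it) that TaskRunna's internal pool is a `ThreadPoolExecutor`; `defaultShapedPool` and `demonstrateQueueing` are illustrative names. It shows the unbounded-queue effect directly: while every core thread is blocked, extra tasks queue up and no additional threads are created.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Pool with the documented default shape; sizes are parameters so the demo can use small values.
fun defaultShapedPool(
    core: Int = Runtime.getRuntime().availableProcessors(),
    max: Int = core * 2
): ThreadPoolExecutor =
    ThreadPoolExecutor(core, max, 60, TimeUnit.SECONDS, LinkedBlockingQueue<Runnable>())

// Submits `tasks` blocking tasks and returns (threads in the pool, tasks queued) while they are blocked.
fun demonstrateQueueing(core: Int, max: Int, tasks: Int): Pair<Int, Int> {
    val pool = defaultShapedPool(core, max)
    val release = CountDownLatch(1)
    try {
        repeat(tasks) { pool.execute { release.await() } }
        return pool.poolSize to pool.queue.size
    } finally {
        release.countDown()  // unblock the workers so the pool can drain
        pool.shutdown()
        pool.awaitTermination(10, TimeUnit.SECONDS)
    }
}
```

With `core = 2`, `max = 4` and 10 tasks, the call returns `2 to 8`: two threads run, eight tasks wait, and the pool never reaches its maximum of four.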
Batch Size Configuration
Control batch size through your iterator implementation:
```kotlin
class OrderIterator : BaseBatchIterator<Order>() {
    override val defaultBatchSize = 100 // Process 100 orders per batch

    override fun loadNextBatch(afterCursor: String, batchSize: Int): List<Order> {
        // Use the provided batchSize parameter
        return repository.findOrders(afterCursor, batchSize)
    }
}
```
Error Handling Configuration
Configure error handling through callbacks:
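```kotlin
import java.util.concurrent.TimeoutException

// ValidationException, retryQueue, invalidOrderQueue and deadLetterQueue are application-defined.
// Assumes onFailure receives the original exception rather than an ExecutionException wrapper.
val processor = BatchJobProcessor(
    iterator = OrderIterator(),
    submitJob = ::processOrder,
    logger = logger, // required parameter
    onFailure = { order, error ->
        when (error) {
            is TimeoutException -> {
                logger.warn { "Order ${order.id} timed out, will retry later" }
                retryQueue.add(order)
            }
            is ValidationException -> {
                logger.error { "Order ${order.id} has invalid data: ${error.message}" }
                invalidOrderQueue.add(order)
            }
            else -> {
                logger.error(error) { "Unexpected error processing order ${order.id}" }
                deadLetterQueue.add(order)
            }
        }
    }
)
```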
Production Configuration
Logging Configuration
TaskRunna logs through KotlinLogging, which writes to SLF4J on the JVM, so configure whichever SLF4J backend your application uses. For example, with Logback:
```xml
<!-- logback.xml -->
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <logger name="com.taskrunna" level="INFO"/>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>
```
Metrics Configuration
For production, configure appropriate metric collection:
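The imports below assume Micrometer before 1.13; from 1.13 the Prometheus registry classes live in `io.micrometer.prometheusmetrics`.
```kotlin
import io.micrometer.core.instrument.config.MeterFilter
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry

// Configure tags and filters before any meters are registered.
val registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
registry.config()
    .commonTags("application", "order-service", "environment", "production")
    .meterFilter(MeterFilter.denyNameStartsWith("jvm")) // Exclude JVM metrics
val metrics = MicrometerBatchMetrics(registry, "order_batch")
```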
Resource Monitoring
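To monitor the resources the JVM consumes (including TaskRunna's worker threads), bind Micrometer's JVM and system binders. These publish `jvm.*` meters, so remove the `jvm` deny filter from the metrics configuration above if you use them:
```kotlin
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics
import io.micrometer.core.instrument.binder.system.ProcessorMetrics

JvmMemoryMetrics().bindTo(registry)
JvmGcMetrics().bindTo(registry) // AutoCloseable: close it on shutdown to remove its GC listeners
ProcessorMetrics().bindTo(registry)
JvmThreadMetrics().bindTo(registry)
```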
Best Practices
Iterator Implementation
- Efficient Pagination: Use indexed columns for cursor-based pagination
- Consistent Ordering: Ensure consistent ordering to avoid skipping/duplicating items
- Empty List Termination: Return empty list when no more data available
- Exception Handling: Handle database exceptions gracefully
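```kotlin
// Assumes `orderRepository` and a KLogger named `logger` are in scope, and that Order.id is a Long.
class EfficientOrderIterator : BaseBatchIterator<Order>() {
    override fun loadNextBatch(afterCursor: String, batchSize: Int): List<Order> {
        return try {
            orderRepository.findFailedOrdersAfter(
                afterId = afterCursor.takeIf { it.isNotEmpty() }?.toLong(),
                limit = batchSize
            ).also { orders ->
                logger.debug { "Loaded ${orders.size} orders after cursor '$afterCursor'" }
            }
        } catch (e: Exception) {
            logger.error(e) { "Failed to load orders after cursor '$afterCursor'" }
            // An empty list means "no more data", so this ends the job early without failing it.
            // Rethrow instead if an incomplete run must be detected.
            emptyList()
        }
    }

    // Must produce the same ID that loadNextBatch parses back with toLong().
    override fun extractCursorFrom(item: Order): String = item.id.toString()
}
```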
Job Processing
- Idempotent Operations: Ensure job operations are idempotent
- Timeout Handling: Set appropriate timeouts for async operations
- Resource Cleanup: Clean up resources in failure scenarios
- Graceful Degradation: Handle partial failures appropriately
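```kotlin
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Share one scheduler for all timeouts; creating one per call leaks a thread per order.
// Shut it down when the application stops.
private val timeoutScheduler = Executors.newSingleThreadScheduledExecutor()

fun processOrderWithTimeout(order: Order): ListenableFuture<String> {
    // Start the payment exactly once and keep its future; calling processAsync again would pay twice.
    val payment = paymentService.processAsync(order)
    // Fails with a TimeoutException and cancels `payment` if it is not done within 30 seconds.
    val bounded = Futures.withTimeout(payment, 30, TimeUnit.SECONDS, timeoutScheduler)
    return Futures.transform(
        bounded,
        { result -> "Successfully processed order ${order.id}: $result" },
        MoreExecutors.directExecutor()
    )
}
```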
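For the idempotency practice, a minimal sketch assuming each order has a stable ID: claim the ID before doing side-effecting work and skip IDs already claimed, so a retry or an overlapping batch cannot charge the same order twice. `IdempotencyGuard` and `runOnce` are hypothetical. The in-memory set only deduplicates within a single JVM; a multi-instance deployment would keep the keys in a shared store, such as a unique database column.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper; in-memory, so it deduplicates within one JVM only.
class IdempotencyGuard {
    private val claimed: MutableSet<String> = ConcurrentHashMap.newKeySet<String>()

    // Runs `action` only the first time `key` is seen; returns null for duplicates.
    fun <R> runOnce(key: String, action: () -> R): R? {
        if (!claimed.add(key)) return null  // add() is atomic, so only one caller wins
        return try {
            action()
        } catch (e: Throwable) {
            claimed.remove(key)  // release the key so a later retry can run
            throw e
        }
    }
}
```

Releasing the key on failure is a design choice: a failed attempt can be retried, while a successful one is never repeated.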
Error Handling
- Categorize Errors: Handle different error types appropriately
- Retry Strategies: Implement exponential backoff for transient failures
- Dead Letter Queues: Use DLQs for permanently failed items
- Alerting: Set up appropriate alerts for error rates
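For retry strategies, the delay before retry *n* is typically `base × 2ⁿ`, capped at a maximum, often with random jitter so that many failed items do not retry in lockstep. A minimal sketch of that calculation; `backoffDelay` and its defaults are illustrative and not part of TaskRunna. It uses the "full jitter" variant, which draws the actual delay uniformly below the capped exponential value.

```kotlin
import java.time.Duration
import kotlin.random.Random

// Hypothetical helper: capped exponential backoff with optional "full jitter".
fun backoffDelay(
    attempt: Int,                             // 0 for the first retry
    base: Duration = Duration.ofMillis(200),
    max: Duration = Duration.ofSeconds(30),
    random: Random? = Random.Default          // null disables jitter
): Duration {
    require(attempt >= 0) { "attempt must be non-negative" }
    require(!base.isNegative && !max.isNegative) { "durations must be non-negative" }
    val factor = 1L shl minOf(attempt, 30)    // 2^attempt, shift capped to avoid overflow
    val baseMs = base.toMillis()
    // Saturate instead of overflowing when base * factor exceeds the Long range.
    val exponential = if (baseMs > Long.MAX_VALUE / factor) Long.MAX_VALUE else baseMs * factor
    val capped = minOf(exponential, max.toMillis())
    // Full jitter: pick uniformly in [0, capped).
    val millis = if (random == null) capped else (random.nextDouble() * capped).toLong()
    return Duration.ofMillis(millis)
}
```

Without jitter, retries 0, 3 and 20 wait 200 ms, 1600 ms and the 30-second cap respectively.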
Monitoring
- Set Up Dashboards: Create Grafana dashboards for key metrics
- Configure Alerts: Set up alerts for high error rates, slow processing, etc.
- SLA Monitoring: Track SLAs and SLOs
- Capacity Planning: Monitor resource usage for capacity planning
Migration Guide
From v1.0.0 to v1.1.2
The main change is package structure:
```kotlin
// Before (v1.0.0)
import com.taskrunna.core.BaseBatchIterator
import com.taskrunna.batch.BatchJobProcessor

// After (v1.1.2)
import com.taskrunna.batch.BaseBatchIterator // Moved to batch package
import com.taskrunna.batch.BatchJobProcessor
```
Dependencies were also consolidated into a single artifact:
```kotlin
// Before (v1.0.0)
dependencies {
    implementation("com.taskrunna:taskrunna-core:1.0.0")
    implementation("com.taskrunna:taskrunna-batch:1.0.0")
}

// After (v1.1.2)
dependencies {
    implementation("com.taskrunna:taskrunna:1.1.2") // Single dependency
}
```
All functionality remains the same; only import paths and dependency coordinates changed.
Need more details? Check the source code or open an issue!