Apache Cassandra is one of the leading NoSQL database systems because it can easily be configured as a scalable cluster in both on-premises and cloud environments. Most of the leading cloud providers now offer it as a managed or serverless service. It is a wide-column NoSQL store and provides very fast write operations.
Cassandra has drivers and frameworks for almost every development platform and programming language; you can check the officially supported drivers page.
In this post, I will show you how to use the DataStax Java driver to process website access log files and push the data into Cassandra.
Problem Statement
You have daily website access logs in a folder generated by the NGINX web server. There is more than one file, and each file has thousands of rows. You need to develop a Java program to process these log files and insert the records into Cassandra.
The program should be multi-threaded, meaning more than one file is processed concurrently.
We just need to pass the folder path containing the log files and the number of threads as command-line arguments to the program.
Here is a sample log line; each file contains one log entry per line:
157.42.230.57 - - [04/Jun/2021:06:25:08 +0000] "GET /medicine/energic-31/903 HTTP/2.0" 200 64127 "https://www.google.com/" "Mozilla/5.0 (Linux; Android 10; RMX2027) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.210 Mobile Safari/537.36"
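The program below parses these lines with a series of `split()` calls. As an alternative sketch, the same fields can be captured with a single regular expression; the pattern and class name here are my own illustration, not part of the original project:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: parse one NGINX combined-format log line with a single regex.
// Pattern and class name are illustrative, not from the original post.
public class LogLineRegexDemo {
    static final Pattern LOG_LINE = Pattern.compile(
            "^(\\S+) - - \\[([^\\]]+)\\] \"(\\S+) (\\S+) [^\"]*\" (\\d{3}) (\\d+|-) \"([^\"]*)\" \"([^\"]*)\"");

    public static void main(String[] args) {
        String line = "157.42.230.57 - - [04/Jun/2021:06:25:08 +0000] "
                + "\"GET /medicine/energic-31/903 HTTP/2.0\" 200 64127 "
                + "\"https://www.google.com/\" \"Mozilla/5.0 (Linux; Android 10; RMX2027)\"";
        Matcher m = LOG_LINE.matcher(line);
        if (m.matches()) {
            System.out.println("ip=" + m.group(1));     // 157.42.230.57
            System.out.println("time=" + m.group(2));   // 04/Jun/2021:06:25:08 +0000
            System.out.println("method=" + m.group(3)); // GET
            System.out.println("url=" + m.group(4));    // /medicine/energic-31/903
            System.out.println("status=" + m.group(5)); // 200
            System.out.println("length=" + m.group(6)); // 64127
        }
    }
}
```

Groups 7 and 8 hold the referrer and user agent; a regex keeps all the field boundaries in one place, at the cost of being harder to debug when a line does not match.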
Java and Cassandra solution
If you do not have a Cassandra setup yet, you can read about it here.
Creating Cassandra Keyspace and Table
To store data in Cassandra we need to define a keyspace and a table with columns. You can think of a Cassandra keyspace as an RDBMS schema, and a table as an RDBMS table with rows but without a fixed set of columns.
Using cqlsh
Create a keyspace named accesslogdb with a replication factor of 1; you can change this to suit your requirements.
CREATE KEYSPACE accesslogdb
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
To store the logs, we define a Cassandra table in this keyspace with the following columns.
CREATE TABLE accesslogdb.access_log
(id uuid PRIMARY KEY, ip varchar, time timestamp, method varchar, url varchar,
status int, length bigint, referral varchar, client varchar );
Now our keyspace and table are ready; you can verify them using cqlsh.
cqlsh> select * from accesslogdb.access_log;
id | client | ip | length | method | referral | status | time | url
----+--------+----+--------+--------+----------+--------+------+-----
(0 rows)
Development of Java Program with Cassandra DB
To connect the Java program to Cassandra I will use the DataStax Java driver. For the IDE I am using IntelliJ IDEA, with Gradle as the build system.
Create a Java Gradle project in IntelliJ; after project creation, add the following dependencies:
dependencies {
// https://mvnrepository.com/artifact/com.datastax.oss/java-driver-core
implementation group: 'com.datastax.oss', name: 'java-driver-core', version: '4.11.1'
// https://mvnrepository.com/artifact/com.datastax.oss/java-driver-query-builder
implementation group: 'com.datastax.oss', name: 'java-driver-query-builder', version: '4.11.1'
// https://mvnrepository.com/artifact/com.datastax.oss/java-driver-mapper-runtime
implementation group: 'com.datastax.oss', name: 'java-driver-mapper-runtime', version: '4.11.1'
annotationProcessor group: 'com.datastax.oss', name: 'java-driver-mapper-processor', version: '4.11.1'
// https://mvnrepository.com/artifact/commons-io/commons-io
implementation group: 'commons-io', name: 'commons-io', version: '2.9.0'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
}
I added the DataStax libraries java-driver-core, java-driver-query-builder, and java-driver-mapper-runtime. You can refer to datastax/java-driver: DataStax Java Driver for Apache Cassandra (github.com) for details of these libraries.
Core driver: The core module handles cluster connectivity and request execution.
Query builder: The query builder is a utility to generate CQL queries programmatically.
Mapper: The mapper generates the boilerplate to execute queries and convert the results into application-level objects.
Now we need to define an entity class for the Cassandra table. I added an id field of type UUID as the partition key in the AccessLog entity class.
@Entity
public class AccessLog {
@PartitionKey
private UUID id;
private String ip;
private Instant time;
private String method;
private String url;
private int status;
private long length;
private String referral;
private String client;
public AccessLog() {
this.id = UUID.randomUUID();
}
public UUID getId() {
return id;
}
public void setId(UUID id) {
this.id = id;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public Instant getTime() {
return time;
}
public void setTime(Instant time) {
this.time = time;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public long getLength() {
return length;
}
public void setLength(long length) {
this.length = length;
}
public String getReferral() {
return referral;
}
public void setReferral(String referral) {
this.referral = referral;
}
public String getClient() {
return client;
}
public void setClient(String client) {
this.client = client;
}
}
Now we need to define a DAO interface for operations with Cassandra. @Dao marks an interface that defines a set of Cassandra query methods.
@Dao
public interface AccessLogDao {
@Select
AccessLog findById(UUID id);
@Insert
void save(AccessLog log);
@Delete
void delete(AccessLog log);
}
Now we need to define a mapper interface annotated with @Mapper:
@Mapper
public interface AccessLogMapper {
@DaoFactory
AccessLogDao accessLogDao(@DaoKeyspace CqlIdentifier keyspace);
}
The mapper annotation processor will generate an implementation and a builder that allows you to create an instance from a CqlSession. By default, the builder's name is the name of the interface with the suffix "Builder", and it resides in the same package. You can also use a custom name with builderName(). The interface should define one or more @DaoFactory methods.
Now I am going to implement a class with a method that puts an AccessLog object into Cassandra.
public interface AccessLogService extends Closeable {
void save(AccessLog log);
}
public class AccessLogCassandraServiceImpl implements AccessLogService {
private final CqlSession session = CqlSession.builder().build();
private final AccessLogMapper accessLogMapper = new AccessLogMapperBuilder(session).build();
private final AccessLogDao dao = accessLogMapper.accessLogDao(CqlIdentifier.fromCql("accesslogdb"));
public AccessLogCassandraServiceImpl() {
}
@Override
public void save(AccessLog log){
this.dao.save(log);
}
@Override
public void close() throws IOException {
this.session.close();
}
}
The above class creates a Cassandra session and DAO object, which are used to perform the DB operations. CqlSession.builder().build() will fetch all connection information from the application.conf file, which should be on the program classpath. In this project, I put this file in the resources folder.
The application.conf file has the following contents; you can provide additional required settings in this file. contact-points lists the addresses of all contact nodes.
datastax-java-driver {
basic {
contact-points = [ "192.168.1.22:9042", "192.168.1.23:9042" ]
load-balancing-policy.local-datacenter = dc1
}
}
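The same file can carry other driver settings. For example, to set a default keyspace, a request timeout, and password authentication (the keys below come from the driver's reference configuration; the values are illustrative):

```
datastax-java-driver {
  basic {
    session-keyspace = accesslogdb
    request.timeout = 5 seconds
  }
  advanced.auth-provider {
    class = PlainTextAuthProvider
    username = cassandra
    password = cassandra
  }
}
```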
You can also provide this information directly in code:
CqlSession session = CqlSession.builder()
.addContactPoint(new InetSocketAddress("192.168.1.22", 9042))
.addContactPoint(new InetSocketAddress("192.168.1.23", 9042))
.withLocalDatacenter("dc1")
.build();
Next is the log processor class, which parses the files using concurrent tasks and stores the records in Cassandra.
public interface AccessLogProcessorService {
void process();
}
public class FileAccessLogProcessorServiceImpl implements AccessLogProcessorService {
private final ExecutorService executorService;
private final List<Path> files;
// Locale.ENGLISH keeps month-name parsing ("Jun") working on non-English default locales.
private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
public FileAccessLogProcessorServiceImpl( List<Path> files, int noOfThreads) {
this.files = files;
this.executorService = Executors.newFixedThreadPool(noOfThreads);
}
@Override
public void process() {
Collection<Future<?>> futures = new LinkedList<>();
for (Path file : this.files) {
ProcessTask processTask = new ProcessTask(file, new AccessLogCassandraServiceImpl());
futures.add(this.executorService.submit(processTask));
}
// Wait for all concurrent tasks to complete.
int i = 0;
for (Future<?> future : futures) {
try {
future.get();
System.out.println("file complete "+ i++);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
this.executorService.shutdownNow();
}
class ProcessTask implements Callable<Boolean> {
final Path file;
final AccessLogService accessLogService;
ProcessTask(final Path file, final AccessLogService accessLogService) {
this.file = file;
this.accessLogService = accessLogService;
}
@Override
public Boolean call() throws Exception {
File file = this.file.toFile();
LineIterator it = FileUtils.lineIterator(file, "UTF-8");
try {
while (it.hasNext()) {
String line = it.nextLine();
AccessLog log = new AccessLog();
String[] parts = line.split(" - - \\[");
if (parts.length == 2) {
log.setIp(parts[0]);
} else {
this.logError(line);
continue;
}
parts = parts[1].split(" \\+0000\\] \"");
if (parts.length == 2) {
LocalDateTime ldt = LocalDateTime.parse(parts[0], dateTimeFormatter);
log.setTime(ldt.toInstant(ZoneOffset.UTC));
} else {
this.logError(line);
continue;
}
// Parse method
parts = parts[1].split(" \\/");
if (parts.length == 2) {
log.setMethod(parts[0]);
} else {
this.logError(line);
continue;
}
parts = parts[1].split("\"" );
if (parts.length == 5) {
// parts[0] holds "<url> <protocol>", e.g. "medicine/energic-31/903 HTTP/2.0";
// keep only the URL part and restore the leading slash.
log.setUrl("/" + parts[0].split(" ")[0]);
log.setReferral(parts[2]);
log.setClient(parts[4]);
} else {
this.logError(line);
continue;
}
parts = parts[1].trim().split(" ");
if(parts.length == 2) {
log.setStatus(Integer.parseInt(parts[0]));
log.setLength(Long.parseLong(parts[1]));
}else{
this.logError(line);
continue;
}
this.accessLogService.save(log);
}
} finally {
it.close();
this.accessLogService.close();
}
return true;
}
private void logError(String line) {
System.out.println("Unable to parse line");
System.out.println(line);
}
}
}
In the above class, I create one executable task per file, each with its own Cassandra service instance holding a Cassandra session used to save the logs into the DB.
A class to get the list of log file paths:
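The AccessLogSourceService interface that the class below implements is not shown in the post; here is a minimal reconstruction consistent with how AccessLogClientApp calls it (the declaration in the repository may differ):

```java
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;

// Reconstructed sketch: matches the calls made in AccessLogClientApp and the
// implementation below; the original declaration may differ.
public interface AccessLogSourceService {
    // Returns the paths of the log files to process,
    // or null if the log folder does not exist.
    List<Path> getFiles() throws IOException;
}
```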
public class FileAccessLogSourceServiceImpl implements AccessLogSourceService {
private final String logFolder;
public FileAccessLogSourceServiceImpl(final String folder) {
this.logFolder = folder;
}
@CheckForNull
@Override
public List<Path> getFiles() throws IOException {
Path folder = Paths.get(this.logFolder);
if (Files.exists(folder)) {
// Close the stream returned by Files.list to release the directory handle.
try (Stream<Path> paths = Files.list(folder)) {
return paths.collect(Collectors.toList());
}
} else {
return null;
}
}
}
The application entry point (main class):
public class AccessLogClientApp {
public static void main(String[] args) throws IOException {
if(args.length == 2) {
String folderPath = args[0];
int noOfThreads = Integer.parseInt(args[1]);
AccessLogSourceService accessLogSourceService = new FileAccessLogSourceServiceImpl(folderPath);
AccessLogProcessorService accessLogParserService =
new FileAccessLogProcessorServiceImpl(accessLogSourceService.getFiles(), noOfThreads);
accessLogParserService.process();
}
}
}
You can find a complete working example on GitHub. I also keep one sample log file inside the resources folder. cassandra-example/accesslog-client at main · sheelprabhakar/cassandra-example (github.com)
In a later post, I will write about querying Cassandra DB.