六、JAVA客户端操作Cassandra
Cassandra有众多的JAVA客户端,目前比较流行的都是不同公司开源的客户端,如:Netflix的astyanax,datastax的java-driver,hector,以及Spring Data for Apache Cassandra。
6.2 datastax的java-driver
6.2.1 介绍
https://docs.datastax.com/en/landing_page/doc/landing_page/current.html
源码地址:
在页面上可以看到使用java-driver的简单介绍,包含Maven依赖内容,环境兼容要求。
6.2.2 创建Maven工程引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherits from the cassandra-demo parent project -->
    <parent>
        <artifactId>cassandra-demo</artifactId>
        <groupId>com.itheima</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>datastax-demo</artifactId>

    <dependencies>
        <!-- Cassandra driver packages (3.x line: core + object mapper) -->
        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>3.9.0</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-mapping</artifactId>
            <version>3.9.0</version>
        </dependency>

        <!-- JUnit for the test classes -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>
</project>
6.2.3 操作键空间
package com.itheima.test;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.schema.Drop;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Optional;

/**
 * Keyspace operations (inspect, create, drop) against a Cassandra node.
 *
 * <p>This class is written against the {@code com.datastax.oss.driver} API
 * ({@link CqlSession}, {@link SchemaBuilder}), so that driver and its query
 * builder must be on the test classpath.
 */
public class TestKeySpace {

    CqlSession session = null;

    /**
     * Opens a session to the configured Cassandra node before each test.
     *
     * <p>The contact point is passed to the builder explicitly; without it the
     * {@code hosts}/{@code port} values would be ignored. Connection failures are
     * allowed to propagate so the test fails at the real cause instead of hitting
     * a {@code NullPointerException} on a null session later.
     */
    @Before
    public void init() {
        // Cassandra server address
        String hosts = "192.168.137.131";
        // CQL port
        int port = 9042;

        session = CqlSession.builder()
                .addContactPoint(new InetSocketAddress(hosts, port))
                .withLocalDatacenter("datacenter1")
                .build();
    }

    /** Closes the session opened by {@link #init()} so connections are not leaked between tests. */
    @After
    public void close() {
        if (session != null) {
            session.close();
            session = null;
        }
    }

    /** Prints the name of the keyspace reported by {@code session.getKeyspace()}, if present. */
    @Test
    public void findKeySpace() {
        Optional<CqlIdentifier> keyspace = session.getKeyspace();
        keyspace.ifPresent(id -> System.out.println("键空间名:" + id.asInternal()));
    }

    /** Creates the {@code school} keyspace (SimpleStrategy, replication factor 1) if it does not exist. */
    @Test
    public void createKeySpace() {
        SimpleStatement createSchool = SchemaBuilder
                .createKeyspace("school")
                .ifNotExists()
                .withSimpleStrategy(1)
                .build();
        session.execute(createSchool);
    }

    /** Drops the {@code school} keyspace if it exists. */
    @Test
    public void dropKeySpace() {
        SimpleStatement dropSchool = SchemaBuilder.dropKeyspace("school").ifExists().build();
        session.execute(dropSchool);
    }
}
6.2.4 操作表
package com.itheima.test;

import com.datastax.driver.core.*;
import com.datastax.driver.core.schemabuilder.SchemaBuilder;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.itheima.entity.Student;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;

/**
 * Table-level operations on {@code school.student}: schema changes, inserts
 * (raw CQL and object mapper), queries and deletes.
 */
public class TestTable {

    Session session = null;
    Mapper<Student> mapper;

    /** Kept so the cluster (and the session it owns) can be closed after each test. */
    private Cluster cluster;

    /** Connects to the Cassandra node before each test. */
    @Before
    public void init() {
        // Cassandra server address
        String hosts = "192.168.137.131";
        // CQL port
        int port = 9042;

        cluster = Cluster.builder()
                .addContactPoint(hosts)
                .withPort(port)
                .build();
        session = cluster.connect();
    }

    /** Closes the cluster opened by {@link #init()} so connections are not leaked between tests. */
    @After
    public void close() {
        if (cluster != null) {
            cluster.close();
            cluster = null;
        }
    }

    /** Creates {@code school.student} if it does not exist and prints the execution info. */
    @Test
    public void createTable() {
        // The "email" column is deliberately not created here (it was commented out);
        // updateTable() demonstrates adding it afterwards.
        Statement statement = SchemaBuilder
                .createTable("school", "student")
                .addPartitionKey("id", DataType.bigint())
                .addColumn("address", DataType.text())
                .addColumn("age", DataType.cint())
                .addColumn("education", DataType.map(DataType.text(), DataType.text()))
                .addColumn("gender", DataType.cint())
                .addColumn("interest", DataType.set(DataType.text()))
                .addColumn("phone", DataType.list(DataType.text()))
                .addColumn("name", DataType.text())
                .ifNotExists();

        ResultSet resultSet = session.execute(statement);
        System.out.println(resultSet.getExecutionInfo());
    }

    /**
     * Demonstrates ALTER TABLE: add a column, change a column's type, drop a column.
     *
     * <p>The add and drop statements are executed; previously all three statements
     * were built and thrown away, so this test had no effect at all.
     */
    @Test
    public void updateTable() {
        // Add a column
        Statement addEmail = SchemaBuilder.alterTable("school", "student")
                .addColumn("email").type(DataType.text());
        session.execute(addEmail);

        // Change a column's type.
        // NOTE(review): built for illustration only and intentionally not executed —
        // a text -> set<text> change is presumably rejected by the server; confirm
        // against the target Cassandra version before executing it.
        Statement changeEmailType = SchemaBuilder.alterTable("school", "student")
                .alterColumn("email").type(DataType.set(DataType.text()));

        // Drop a column
        Statement dropEmail = SchemaBuilder.alterTable("school", "student")
                .dropColumn("email");
        session.execute(dropEmail);
    }

    /** Drops {@code school.student} if it exists. */
    @Test
    public void dropTable() {
        Statement statement = SchemaBuilder.dropTable("school", "student").ifExists();
        session.execute(statement);
    }

    /** Inserts one student row using a literal CQL string. */
    @Test
    public void insertByCQL() {
        String insertSql = "INSERT INTO school.student (id,address,age,gender,name,interest, phone,education) VALUES (1011,'中山路21号',16,1,'李小仙',{'游泳', '跑步'},['010-88888888','13888888888'],{'小学' : '城市第一小学', '中学' : '城市第一中学'}) ;";
        session.execute(insertSql);
    }

    /** Inserts one student through the object mapper. */
    @Test
    public void insertByMapper() {
        mapper = new MappingManager(session).mapper(Student.class);

        HashMap<String, String> education = new HashMap<>();
        education.put("小学", "中心第五小学");
        education.put("中学", "中心实验中学");

        HashSet<String> interest = new HashSet<>();
        interest.add("看书");
        interest.add("电影");

        List<String> phones = new ArrayList<>();
        phones.add("020-66666666");
        phones.add("13666666666");

        // Build the student
        Student student = new Student(
                1012L,
                "北京市朝阳区100号",
                20,
                education,
                "xiaoshuai@123.com",
                1,
                interest,
                phones,
                "马小帅");

        // Save it to the Cassandra server
        mapper.save(student);
    }

    /** Selects every row of {@code school.student} and prints each mapped student. */
    @Test
    public void queryAll() {
        mapper = new MappingManager(session).mapper(Student.class);
        ResultSet resultSet = session.execute(select().all().from("school", "student"));
        List<Student> studentList = mapper.map(resultSet).all();
        for (Student student : studentList) {
            System.out.println(student);
        }
    }

    /** Selects from {@code school.student} and prints only the first mapped row. */
    @Test
    public void queryOne() {
        mapper = new MappingManager(session).mapper(Student.class);
        ResultSet resultSet = session.execute(select().all().from("school", "student"));
        Student student = mapper.map(resultSet).one();
        System.out.println(student);
    }

    /** Selects the rows whose {@code id} equals 1012 and prints them. */
    @Test
    public void queryById() {
        mapper = new MappingManager(session).mapper(Student.class);
        ResultSet resultSet = session.execute(
                select().all().from("school", "student").where(eq("id", 1012L)));
        List<Student> studentList = mapper.map(resultSet).all();
        for (Student student : studentList) {
            System.out.println(student);
        }
    }

    /** Deletes the student with id 1011 through the object mapper. */
    @Test
    public void delete() {
        mapper = new MappingManager(session).mapper(Student.class);
        Long id = 1011L;
        mapper.delete(id);
    }
}
6.2.5 Prepared statements
https://docs.datastax.com/en/developer/java-driver/3.0/manual/statements/prepared/
基本原理:
代码:
@Test public void batchPrepare(){// 先把语句预编译 BatchStatement batch = new BatchStatement(); PreparedStatement ps = session .prepare("INSERT INTO school.student (id,address,age,gender,name,interest, phone,education) VALUES (?,?,?,?,?,?,?,?)");// 循环10次,构造不同的student对象 for (int i = 0; i < 10; i++) { HashMap<String, String> education = new HashMap<>(); education.put("小学", "中心第"+i+"小学"); education.put("中学", "第"+i+"中学"); HashSet<String> interest = new HashSet<>(); interest.add("看书"); interest.add("电影"); List<String> phones = new ArrayList<>(); phones.add("0"+i+"0-66666666"); phones.add("1"+i+"666666666");// 构造student Student student = new Student( 1013L+i, "北京市朝阳区10"+i+"号", 21+i, education, "xiaoshuai@123.com", 1, interest, phones, "学生"+i); BoundStatement bs = ps.bind(student.getId(), student.getAddress(), student.getAge(), student.getGender(), student.getName(), student.getInterest(), student.getPhone(), student.getEducation()); batch.add(bs); } session.execute(batch); batch.clear(); }6.3 Spring Data Cassandra
6.3.1 介绍
https://spring.io/projects/spring-data-cassandra
Spring Data for Apache Cassandra 2.x binaries require JDK level 8.0 and later and Spring framework 5.2.7.RELEASE and later. It requires Cassandra 2.0 or later.
6.3.2 创建Maven工程
引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherits from the cassandra-demo parent project -->
    <parent>
        <artifactId>cassandra-demo</artifactId>
        <groupId>com.itheima</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>springCassandra-demo</artifactId>

    <dependencies>
        <!-- Uses spring-data-cassandra 2.2.8 -->
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-cassandra</artifactId>
            <version>2.2.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>
</project>
创建配置文件
springContext.xml 配置文件
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring application context for the Spring Data Cassandra demo.
  xsi:schemaLocation is a whitespace-separated list of "namespace location" pairs;
  each pair is on its own line so they are no longer run together.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:cassandra="http://www.springframework.org/schema/data/cassandra"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/cql http://www.springframework.org/schema/cql/spring-cql-1.0.xsd
                           http://www.springframework.org/schema/data/cassandra http://www.springframework.org/schema/data/cassandra/spring-cassandra-1.0.xsd
                           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">

    <!-- Connection settings are read from cassandra.properties on the classpath -->
    <context:property-placeholder location="classpath:cassandra.properties"/>

    <!-- Cassandra cluster and session configuration -->
    <cassandra:cluster contact-points="${cassandra.contactpoints}" port="${cassandra.port}"/>
    <cassandra:session keyspace-name="${cassandra.keyspace}"/>

    <!-- ORM mapping -->
    <cassandra:mapping/>

    <!-- Type conversion -->
    <cassandra:converter/>

    <!-- Cassandra operations template -->
    <cassandra:template id="cassandraTemplate"/>

    <!-- Spring Data repository interfaces -->
    <cassandra:repositories base-package="com.itheima.springcass.repository"/>

    <!-- Component scanning (auto-wiring) -->
    <context:component-scan base-package="com.itheima"/>
</beans>
6.3.3 编写代码
创建实体类 Student.java
package com.itheima.springcass.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.cassandra.core.mapping.PrimaryKey;
import org.springframework.data.cassandra.core.mapping.Table;

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Student entity mapped to a Cassandra table by Spring Data.
 *
 * <p>Field order matters: the all-args constructor takes its parameters in
 * declaration order, and callers construct students positionally.
 */
@Table
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {

    /** Primary key of the row. */
    @PrimaryKey
    private Long id;

    private String address;

    private Integer age;

    /** Education entries as key/value pairs. */
    private Map<String, String> education;

    private String email;

    private Integer gender;

    private Set<String> interest;

    private List<String> phone;

    private String name;
}
创建StudentRepository.java
package com.itheima.springcass.repository;

import com.itheima.springcass.entity.Student;
import org.springframework.data.cassandra.repository.CassandraRepository;

/**
 * Spring Data repository for {@link Student} entities keyed by {@code Long} id.
 * Declares no methods of its own; everything used by the service comes from
 * {@link CassandraRepository}.
 */
public interface StudentRepository extends CassandraRepository<Student, Long> {
}
创建service
package com.itheima.springcass.service;import com.itheima.springcass.entity.Student;import com.itheima.springcass.repository.StudentRepository;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.List;@Servicepublic class StudentService { @Autowired private StudentRepository repository; public List<Student> queryAllStudent(){ List<Student> studentList = repository.findAll(); return studentList; } public Student queryoneStudent(Long id){ return repository.findById(id).get(); } public void saveStudent(Student student){ repository.save(student); } public void updateStudent(){ Student student = this.queryoneStudent(1019L); student.setGender(0); repository.save(student); } public void deleteStudent(Long id){ repository.deleteById(id); }}创建测试代码
package com.itheima.springcass.test;import com.itheima.springcass.entity.Student;import com.itheima.springcass.service.StudentService;import org.junit.Before;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.context.ConfigurableApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;import java.net.InetSocketAddress;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.List;public class TestSpringCassandra { private StudentService studentService; @Before public void init(){ ConfigurableApplicationContext context = new ClassPathXmlApplicationContext("springContext.xml"); studentService = (StudentService)context.getBean("studentService"); } @Test public void testQueryAll(){ List<Student> students = studentService.queryAllStudent(); for (Student student : students) { System.out.println(student); System.out.println("============="); } } @Test public void testOne(){ Student student = studentService.queryoneStudent(1018L); System.out.println(student); } @Test public void insert(){ HashMap<String, String> education = new HashMap<>(); education.put("小学", "中心第五小学"); education.put("中学", "中心实验中学"); HashSet<String> interest = new HashSet<>(); interest.add("看书"); interest.add("电影"); List<String> phones = new ArrayList<>(); phones.add("130-66666666"); phones.add("15766666666");// 构造student Student student = new Student( 1028L, "北京市朝阳区800号", 30, education, "xiaoxiaoxian@14564e.com", 0, interest, phones, "小小咸"); studentService.saveStudent(student); } @Test public void delete(){ studentService.deleteStudent(1028L); }}七、Cassandra集群搭建
7.1 准备
为每台虚拟机设置静态IP
- 192.168.137.131 (seed)
- 192.168.137.132 (seed)
- 192.168.137.133
一个新节点加入集群时,需要通过种子节点来发现集群中其它节点,需要至少一个活跃的种子节点可以连接,一旦节点加入这个集群,知道了集群中的其它节点,这个节点在下次启动的时候就不需要种子节点了。 对于种子节点没有特殊要求,可以设置任何一个节点为种子。
7.2 改配置
cluster_name 集群名字,每个节点都要一样
listen_address 填写当前节点所在机器的IP地址
具体修改如下:
192.168.137.132 机器的修改内容
cluster_name: 'Test Cluster'
seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          - seeds: "192.168.137.131,192.168.137.132"
listen_address: 192.168.137.132
rpc_address: 192.168.137.132
修改完成后,启动每个节点。可以在192.168.137.131机器上使用 nodetool status 命令进行测试
八、Cassandra的数据存储
这些数据主要分为三种: CommitLog:主要记录客户端提交过来的数据以及操作。这种数据被持久化到磁盘中,方便数据没有被持久化到磁盘时可以用来恢复。 Memtable:用户写的数据在内存中的形式,它的对象结构在后面详细介绍。其实还有另外一种形式是BinaryMemtable 这个格式目前 Cassandra 并没有使用,这里不再介绍了。 SSTable:数据被持久化到磁盘,这又分为 Data、Index 和 Filter 三种数据格式。
8.1 CommitLog 数据格式
当一个Commitlog文件写满以后,会新建一个文件。当旧的Commitlog文件不再需要时,会自动清除。
8.2 Memtable 内存中数据结构
SSTable是Read Only的,且一般情况下,一个ColumnFamily会对应多个SSTable,当用户检索数据时,Cassandra使用了Bloom Filter,即通过多个hash函数将key映射到一个位图中,来快速判断这个key属于哪个SSTable。
在Cassandra中,compaction主要完成的任务是:
2) 合并SSTable:compaction 将多个 SSTable 合并为一个(合并的文件包括索引文件,数据文件,bloom filter文件),以提高读操作的效率;
Cassandra的集群中每一台机器都是对等的,不存在主、从节点的区分,集群中任何一台机器出现故障时,整个集群系统不会受到影响。
一致哈希
在Cassandra中,每个表除了有Primary Key外,还有一个叫做Partition Key的概念,Partition Key列的Value会通过Cassandra的一致性哈希算法得出一个哈希值,这个哈希值将决定这行数据该放到哪个节点上。
如果简单的使用哈希值,可能会引起数据分布不均匀的问题,为了解决这个问题,一致性哈希提出虚拟节点的概念,简单的理解就是:将某个节点根据一个映射算法,映射出若干个虚拟子节点出来,再把这些节点分布在哈希环上面,保存数据时,如果通过一致性哈希计算落到某个虚拟子节点上,这条记录就会被存在这个虚拟子节点的母节点上。
Range:在Cassandra中,每一个节点负责处理hash环的一段数据,范围是从上一个节点的Token到本节点Token,这就是Range
这里我们使用cassandra官方文档中一张图来说明
Cassandra使用Gossip协议维护集群的状态,这是个端对端的通信协议。通过Gossip,每个节点都能知道集群中包含哪些节点,以及这些节点的状态。
Gossip进程每秒运行一次,与最多3个其他节点交换信息,这样所有节点可很快了解集群中的其他节点信息。
