搞笑
altertable(java:Cassandra入门与实战——下)

ntent="mp" href="https://www.toutiao.com/i6852576380111192587/?group_id=6852576380111192587" rel="noopener noreferrer" target="_blank">Cassandra入门与实战——中

六、JAVA户端操作Cassandra

Cassandra有众多的JAVA客户端,目前比较流程的都是不同公司开源的客户端,如:Netfix的astyanax,datastax的java-driver,hector,以及Spring Data for Apache Cassandra。

6.2 datastax的java-driver

6.2.1 介绍

https://docs.datastax.com/en/landing_page/doc/landing_page/current.html

源码地址:

在页面上可以看到使用java-driver的简单介绍,包含Maven依赖内容,环境兼容要求。

6.2.2 创建Maven工程引入依赖

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <parent>        <artifactId>cassandra-demo</artifactId>        <groupId>com.itheima</groupId>        <version>1.0-SNAPSHOT</version>    </parent>    <modelVersion>4.0.0</modelVersion>    <artifactId>datastax-demo</artifactId>    <dependencies>        <!--cassandra 包-->        <dependency>            <groupId>com.datastax.cassandra</groupId>            <artifactId>cassandra-driver-core</artifactId>            <version>3.9.0</version>        </dependency>        <dependency>            <groupId>com.datastax.cassandra</groupId>            <artifactId>cassandra-driver-mapping</artifactId>            <version>3.9.0</version>        </dependency>        <!--junit 测试-->        <dependency>            <groupId>junit</groupId>            <artifactId>junit</artifactId>            <version>4.12</version>        </dependency>    </dependencies></project>

6.2.3 操作键空间

package com.itheima.test;import com.datastax.oss.driver.api.core.CqlIdentifier;import com.datastax.oss.driver.api.core.CqlSession;import com.datastax.oss.driver.api.core.cql.SimpleStatement;import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;import com.datastax.oss.driver.api.querybuilder.schema.Drop;import org.junit.Before;import org.junit.Test;import java.net.InetAddress;import java.net.InetSocketAddress;import java.net.UnknownHostException;import java.util.Optional;public class TestKeySpace {    CqlSession  session = null;    @Before    public void init(){        //Cassandra服务器地址        String hosts = "192.168.137.131";        //端口        int port = 9042;        try {            session = CqlSession.builder().withLocalDatacenter("datacenter1").build();        } catch (Exception e) {            e.printStackTrace();        }    }        @Test    public void findKeySpace(){        Optional<CqlIdentifier> keyspace = session.getKeyspace();        if(keyspace.isPresent()){            CqlIdentifier cqlIdentifier = keyspace.get();            System.out.println("键空间名:"+cqlIdentifier.asInternal());        }    }        @Test    public void createKeySpace(){        SimpleStatement simpleStatement = SchemaBuilder.                createKeyspace("school").                ifNotExists().                withSimpleStrategy(1).                build();        session.execute(simpleStatement);    }        @Test    public void dropKeySpace(){        SimpleStatement state = SchemaBuilder.dropKeyspace("school").ifExists().build();        session.execute(state);    }}

6.2.4 操作表

package com.itheima.test;import com.datastax.driver.core.*;import com.datastax.driver.core.schemabuilder.SchemaBuilder;import com.datastax.driver.mapping.Mapper;import com.datastax.driver.mapping.MappingManager;import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;import com.itheima.entity.Student;import org.junit.Before;import org.junit.Test;import java.net.InetAddress;import java.net.InetSocketAddress;import java.net.UnknownHostException;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.List;import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;import static com.datastax.driver.core.querybuilder.QueryBuilder.select;public class TestTable {    Session session = null;    Mapper<Student> mapper;        @Before    public void init() {        //Cassandra服务器地址        String hosts = "192.168.137.131";        //端口        int port = 9042;        Cluster cluster = Cluster.builder().                addContactPoint(hosts).                withPort(port).                build();        session = cluster.connect();    }        @Test    public void createTable() {        Statement statement = SchemaBuilder.                createTable("school", "student").                addPartitionKey("id", DataType.bigint()).                addColumn("address", DataType.text()).                addColumn("age", DataType.cint()).                addColumn("education", DataType.map(DataType.text(), DataType.text())).//                addColumn("email", DataType.text()).                addColumn("gender", DataType.cint()).                addColumn("interest", DataType.set(DataType.text())).                addColumn("phone", DataType.list(DataType.text())).                addColumn("name", DataType.text()).                ifNotExists();        ResultSet resultSet = session.execute(statement);        System.out.println(resultSet.getExecutionInfo());    }        @Test    public void updateTable(){//        添加字段        SchemaBuilder.alterTable("school","student")                .addColumn("email").type(DataType.text());//        修改字段        SchemaBuilder.alterTable("school","student")                .alterColumn("email").type(DataType.set(DataType.text()));//        删除字段        SchemaBuilder.alterTable("school","student")                .dropColumn("email");    }        @Test    public void dropTable(){        Statement statement = SchemaBuilder.dropTable("school","student").ifExists();        session.execute(statement);    }        @Test    public void insertByCQL() {        String insertSql = "INSERT INTO school.student (id,address,age,gender,name,interest, phone,education) VALUES (1011,'中山路21号',16,1,'李小仙',{'游泳', '跑步'},['010-88888888','13888888888'],{'小学' : '城市第一小学', '中学' : '城市第一中学'}) ;";        session.execute(insertSql);    }        @Test    public void insertByMapper() {        mapper = new MappingManager(session).mapper(Student.class);        HashMap<String, String> education = new HashMap<>();        education.put("小学", "中心第五小学");        education.put("中学", "中心实验中学");        HashSet<String> interest = new HashSet<>();        interest.add("看书");        interest.add("电影");        List<String> phones = new ArrayList<>();        phones.add("020-66666666");        phones.add("13666666666");//        构造student        Student student = new Student(                1012L,                "北京市朝阳区100号",                20,                education,                "xiaoshuai@123.com",                1,                interest,                phones,                "马小帅");//         数据保存到cassandra服务器        mapper.save(student);    }        @Test    public void queryAll(){        mapper = new MappingManager(session).mapper(Student.class);        ResultSet resultSet = session.execute(select().all().from("school","student"));        List<Student> studentList = mapper.map(resultSet).all();        for (Student student : studentList) {            System.out.println(student);        }    }        @Test    public void queryOne(){        mapper = new MappingManager(session).mapper(Student.class);        ResultSet resultSet = session.execute(select().all().from("school","student"));        Student student = mapper.map(resultSet).one();        System.out.println(student);    }        @Test    public void queryById(){        mapper = new MappingManager(session).mapper(Student.class);        ResultSet resultSet = session.execute(select().all().from("school", "student").where(eq("id", 1012L)));        List<Student> studentList = mapper.map(resultSet).all();        for (Student student : studentList) {            System.out.println(student);        }    }        @Test    public void delete() {        mapper = new MappingManager(session).mapper(Student.class);        Long id = 1011L;        mapper.delete(id);    }}

6.2.5 Prepared statements

https://docs.datastax.com/en/developer/java-driver/3.0/manual/statements/prepared/

基本原理:

代码:

    @Test    public void batchPrepare(){//        先把语句预编译        BatchStatement batch = new BatchStatement();        PreparedStatement ps = session .prepare("INSERT INTO school.student (id,address,age,gender,name,interest, phone,education) VALUES (?,?,?,?,?,?,?,?)");//        循环10次,构造不同的student对象        for (int i = 0; i < 10; i++) {            HashMap<String, String> education = new HashMap<>();            education.put("小学", "中心第"+i+"小学");            education.put("中学", "第"+i+"中学");            HashSet<String> interest = new HashSet<>();            interest.add("看书");            interest.add("电影");            List<String> phones = new ArrayList<>();            phones.add("0"+i+"0-66666666");            phones.add("1"+i+"666666666");//        构造student            Student student = new Student(                    1013L+i,                    "北京市朝阳区10"+i+"号",                    21+i,                    education,                    "xiaoshuai@123.com",                    1,                    interest,                    phones,                    "学生"+i);            BoundStatement bs = ps.bind(student.getId(),                    student.getAddress(),                    student.getAge(),                    student.getGender(),                    student.getName(),                    student.getInterest(),                    student.getPhone(),                    student.getEducation());            batch.add(bs);        }        session.execute(batch);        batch.clear();    }

6.3 Spring Data Cassandra

6.3.1 介绍

https://spring.io/projects/spring-data-cassandra

Spring Data for Apache Cassandra 2.x binaries require JDK level 8.0 and later and Spring framework 5.2.7.RELEASE and later. It requires Cassandra 2.0 or later.

6.3.2 创建Maven工程

引入依赖

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <parent>        <artifactId>cassandra-demo</artifactId>        <groupId>com.itheima</groupId>        <version>1.0-SNAPSHOT</version>    </parent>    <modelVersion>4.0.0</modelVersion>    <artifactId>springCassandra-demo</artifactId>    <dependencies>        <!--使用spring-data-cassandra 2.2.8-->        <dependency>            <groupId>org.springframework.data</groupId>            <artifactId>spring-data-cassandra</artifactId>            <version>2.2.8.RELEASE</version>        </dependency>        <dependency>            <groupId>junit</groupId>            <artifactId>junit</artifactId>            <version>4.12</version>        </dependency>    </dependencies></project>

创建配置文件

springContext.xml 配置文件

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       xmlns:cassandra="http://www.springframework.org/schema/data/cassandra"       xmlns:context="http://www.springframework.org/schema/context"       xsi:schemaLocation="http://www.springframework.org/schema/cql http://www.springframework.org/schema/cql/spring-cql-1.0.xsdhttp://www.springframework.org/schema/data/cassandra http://www.springframework.org/schema/data/cassandra/spring-cassandra-1.0.xsdhttp://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">    <context:property-placeholder location="classpath:cassandra.properties"/>    <!--cassandra的配置-->    <cassandra:cluster contact-points="${cassandra.contactpoints}" port="${cassandra.port}"/>    <cassandra:session keyspace-name="${cassandra.keyspace}" />    <!-- orm映射 -->    <cassandra:mapping />    <!-- 类型转换 -->    <cassandra:converter/>    <!-- cassandra operater -->    <cassandra:template id="cassandraTemplate"/>    <!-- spring data 接口 -->    <cassandra:repositories base-package="com.itheima.springcass.repository" />    <!-- 自动扫描(自动注入) -->    <context:component-scan base-package="com.itheima" /></beans>

6.3.3 编写代码

创建实体类 Student.java

package com.itheima.springcass.entity;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import org.springframework.data.cassandra.core.mapping.PrimaryKey;import org.springframework.data.cassandra.core.mapping.Table;import java.util.List;import java.util.Map;import java.util.Set;@Data@Table@AllArgsConstructor@NoArgsConstructorpublic class Student {    @PrimaryKey    private  Long id;    private  String address;    private  Integer age;    private  Map<String,String> education;    private  String email ;    private  Integer gender;    private  Set<String> interest;    private  List<String> phone ;    private  String name;}

创建StudentRepository.java

package com.itheima.springcass.repository;import com.itheima.springcass.entity.Student;import org.springframework.data.cassandra.repository.CassandraRepository;public interface StudentRepository extends CassandraRepository<Student,Long> {}

创建service

package com.itheima.springcass.service;import com.itheima.springcass.entity.Student;import com.itheima.springcass.repository.StudentRepository;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.List;@Servicepublic class StudentService {        @Autowired    private StudentRepository repository;        public List<Student> queryAllStudent(){        List<Student> studentList = repository.findAll();        return studentList;    }        public Student queryoneStudent(Long id){        return repository.findById(id).get();    }        public void saveStudent(Student student){        repository.save(student);    }        public void updateStudent(){        Student student = this.queryoneStudent(1019L);        student.setGender(0);        repository.save(student);    }        public void deleteStudent(Long id){        repository.deleteById(id);    }}

创建测试代码

package com.itheima.springcass.test;import com.itheima.springcass.entity.Student;import com.itheima.springcass.service.StudentService;import org.junit.Before;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.context.ConfigurableApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;import java.net.InetSocketAddress;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.List;public class TestSpringCassandra {    private StudentService studentService;    @Before    public void init(){        ConfigurableApplicationContext context = new ClassPathXmlApplicationContext("springContext.xml");        studentService = (StudentService)context.getBean("studentService");    }        @Test    public void testQueryAll(){        List<Student> students = studentService.queryAllStudent();        for (Student student : students) {            System.out.println(student);            System.out.println("=============");        }    }    @Test    public void testOne(){        Student student = studentService.queryoneStudent(1018L);        System.out.println(student);    }    @Test    public void insert(){        HashMap<String, String> education = new HashMap<>();        education.put("小学", "中心第五小学");        education.put("中学", "中心实验中学");        HashSet<String> interest = new HashSet<>();        interest.add("看书");        interest.add("电影");        List<String> phones = new ArrayList<>();        phones.add("130-66666666");        phones.add("15766666666");//        构造student        Student student = new Student(                1028L,                "北京市朝阳区800号",                30,                education,                "xiaoxiaoxian@14564e.com",                0,                interest,                phones,                "小小咸");        studentService.saveStudent(student);    }    @Test    public void delete(){        studentService.deleteStudent(1028L);    }}

七、Cassandra集群搭建

7.1 准备

为每台虚拟机设置静态IP

  • 192.168.137.131 (seed)
  • 192.168.137.132 (seed)
  • 192.168.137.133

一个新节点加入集群时,需要通过种子节点来发现集群中其它节点,需要至少一个活跃的种子节点可以连接,一旦节点加入这个集群,知道了集群中的其它节点,这个节点在下次启动的时候就不需要种子节点了。 对于种子节点没有特殊要求,可以设置任何一个节点为种子。

7.2 改配置

cluster_name 集群名字,每个节点都要一样

listen_address 填写当前节点所在机器的IP地址

具体修改如下:

192.168.137.132 机器的修改内容

cluster_name: 'Test Cluster'seed_provider:  - class_name: org.apache.cassandra.locator.SimpleSeedProvider    parameters:         - seeds: "192.168.137.131,192.168.137.132"listen_address: 192.168.137.132rpc_address: 192.168.137.132

修改完成后,启动每个节点。可以在192.168.137.131机器上使用noodtools status 命令进行测试

八、Cassandra的数据存储

这些数据主要分为三种: CommitLog:主要记录客户端提交过来的数据以及操作。这种数据被持久化到磁盘中,方便数据没有被持久化到磁盘时可以用来恢复。 Memtable:用户写的数据在内存中的形式,它的对象结构在后面详细介绍。其实还有另外一种形式是BinaryMemtable 这个格式目前 Cassandra 并没有使用,这里不再介绍了。 SSTable:数据被持久化到磁盘,这又分为 Data、Index 和 Filter 三种数据格式。

8.1 CommitLog 数据格式

当一个Commitlog文件写满以后,会新建一个的文件。当旧的Commitlog文件不再需要时,会自动清除。

8.2 Memtable 内存中数据结构

SSTable是Read Only的,且一般情况下,一个ColumnFamily会对应多个SSTable,当用户检索数据时,Cassandra使用了Bloom Filter,即通过多个hash函数将key映射到一个位图中,来快速判断这个key属于哪个SSTable。

在Cassandra中,compaction主要完成的任务是:

2) 合并SSTable:compaction 将多个 SSTable 合并为一个(合并的文件包括索引文件,数据文件,bloom filter文件),以提高读操作的效率;

Cassandra的集群中每一台机器都是对等的,不存在主、从节点的区分,集群中任何一台机器出现故障是,整个集群系统不会受到影响。

一致哈希

在Cassandra中,每个表有Primary Key外,还有一个叫做Partition Key,Partition Key列的Value会通过Cassandra一致性算法得出一个哈希值,这个哈希值将决定这行数据该放到哪个节点上。

如果简单的使用哈希值,可能会引起数据分布不均匀的问题,为了解决这个问题,一致性哈希提出虚拟节点的概念,简单的理解就是:将某个节点根据一个映射算法,映射出若干个虚拟子节点出来,再把这些节点分布在哈希环上面,保存数据时,如果通过一致性哈希计算落到某个虚拟子节点上,这条记录就会被存在这个虚拟子节点的母节点上。

Range:在Cassandra中,每一个节点负责处理hash环的一段数据,范围是从上一个节点的Token到本节点Token,这就是Range

这里我们使用cassandra官方文档中一张图来说明

java:Cassandra入门与实战——下nerror="javascript:errorimg.call(this);">

Cassandra使用Gossip的协议维护集群的状态,这是个端对端的通信协议。通过Gossip,每个节点都能知道集群中包含哪些节点,以及这些节点的状态,

Gossip进程每秒运行一次,与最多3个其他节点交换信息,这样所有节点可很快了解集群中的其他节点信息。


顶一下()     踩一下()

热门推荐

发表评论
0评