Contents

1. Introduction to Spark SQL
2. SparkSession
3. Reading and Writing Data with Spark SQL
3.1 Reading and Writing TXT Files
3.2 Reading and Writing CSV Files
3.3 Reading and Writing JSON Files
3.4 Reading and Writing Parquet Files
3.5 Reading and Writing ORC Files
3.6 Reading and Writing Data over JDBC (MySQL)
4. Running SQL Queries Programmatically
5. The DataFrame DSL


        Spark SQL is a module of Apache Spark for processing structured data. It provides two programming abstractions, DataFrame and Dataset, and also acts as a distributed SQL query engine.

        A DataFrame is a distributed collection of data, similar to a table in a traditional database, with schema information attached. A DataFrame is stored column-wise; each column can hold a different data type (such as integer or string), and every column has a name.

        Dataset is an interface added in Spark 1.6 that combines the benefits of RDDs (strong typing and the ability to use powerful lambda functions) with the benefits of Spark SQL's optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated with functional transformations (map, flatMap, filter, and so on).
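
        As a minimal sketch (assuming an existing SparkSession named spark and a hypothetical Person case class, neither of which is defined in the original examples), a Dataset can be built from JVM objects and transformed with typed functions:

// hypothetical sketch: build a Dataset from JVM objects and use typed transformations
case class Person(name: String, age: Int)

import spark.implicits._

val ds = Seq(Person("Jorge", 30), Person("Bob", 32)).toDS()
// filter and map operate directly on Person objects
val adults = ds.filter(p => p.age > 30).map(p => p.name)
adults.show()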

        SparkSession is a concept introduced in Spark 2.0; it is an upgraded and extended replacement for SparkContext. SparkSession offers a more unified API (in Spark 1.x, each API needed its own context, such as StreamingContext, SQLContext, or HiveContext), covering SQL queries, data transformations, data reading and writing, data source registration, and configuration management.
        SparkSession is the new entry point of a Spark application; to create a basic SparkSession, just use SparkSession.builder:

  • Java implementation:
package com.yichenkeji.demo.sparkjava;

import org.apache.spark.sql.SparkSession;

public class SparkSqlDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Spark SQL Demo")
//                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        spark.stop();
    }
}
  • Scala implementation:
package com.yichenkeji.demo.sparkscala

import org.apache.spark.sql.SparkSession

object SparkSqlDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL Demo")
//      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    spark.stop
  }

}
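
        Once created, the same SparkSession object also exposes the runtime configuration and the underlying SparkContext, so no separate SQLContext or HiveContext is required (Hive support, if needed, is enabled on the builder with .enableHiveSupport()). A brief sketch, reusing the spark session built above:

// runtime SQL configuration can be read and changed through the session
spark.conf.set("spark.sql.shuffle.partitions", "4")
println(spark.conf.get("spark.sql.shuffle.partitions"))

// the legacy SparkContext is still reachable when RDD APIs are needed
val sc = spark.sparkContext
println(sc.appName)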

        Spark SQL provides spark.read().text("file_name") to read a text file, or a directory of text files, into a Spark DataFrame, and dataframe.write().text("path") to write data out to text files. When reading a text file, each line becomes a row with a single string column named "value" by default. When writing a text file, the DataFrame may likewise contain only a single column; otherwise the write fails with the error "Text data source supports only a single column, and you have 2 columns."

  • Java implementation:
package com.yichenkeji.demo.sparkjava;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSqlTxtDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Spark SQL TXT Demo")
//                .config("spark.some.config.option", "some-value")
                .getOrCreate();
        //read the text file
        Dataset<Row> df1 = spark.read()
//                .option("lineSep", ",")// custom line separator; the defaults are \r, \r\n and \n
                .text("F:\\24\\spark\\data\\people.txt");
        df1.show();
        //another way to read the text file
        Dataset<Row> df2 = spark.read().format("text")
                .load("F:\\24\\spark\\data\\people.txt");
        df2.show();

        //write the data back out as a text file
        df1.write().text("F:\\24\\spark\\data\\people_1.txt");
        //another way to write a text file
        df2.write().format("text").save("F:\\24\\spark\\data\\people_2.txt");
        spark.stop();
    }
}
  • Scala implementation:
package com.yichenkeji.demo.sparkscala

import org.apache.spark.sql.SparkSession

object SparkSqlTxtDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL TXT Demo")
      //      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    //read the text file
    val df1 = spark.read.text("F:\\24\\spark\\data\\people.txt")
    df1.show
    //another way to read the text file
    val df2 = spark.read.format("text").load("F:\\24\\spark\\data\\people.txt")
    df2.show

    //write the data back out as a text file
    df1.write.text("F:\\24\\spark\\data\\people_1.txt")
    //another way to write a text file
    df2.write.format("text").save("F:\\24\\spark\\data\\people_2.txt")
    
    spark.stop
  }

}
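
        If a DataFrame has more than one column, one common workaround for the single-column restriction mentioned above is to concatenate the columns into a single string column before writing. A minimal sketch, assuming a hypothetical multi-column DataFrame df with name and age columns (the output path is also hypothetical):

import org.apache.spark.sql.functions.concat_ws

// collapse the columns into one string column named "value" so the text data source accepts it
val singleCol = df.select(concat_ws(",", df("name"), df("age")).alias("value"))
singleCol.write.text("F:\\24\\spark\\data\\people_as_text")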

        Spark SQL provides spark.read().csv("file_name") to read a CSV file or directory of CSV files into a Spark DataFrame, and dataframe.write().csv("path") to write data out to CSV files.

  • Java implementation:
package com.yichenkeji.demo.sparkjava;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSqlCsvDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Spark SQL CSV Demo")
//                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        //read the CSV file, which has a header row
        Dataset<Row> df1 = spark.read()
                .option("header", "true")//whether the file has a header row
                .option("delimiter", ";") //field delimiter; the default is a comma
                .csv("F:\\24\\spark\\data\\people.csv");
        df1.show();
        //another way to read the CSV file
        Dataset<Row> df2 = spark.read().format("csv")
                .option("header", "true")//whether the file has a header row
                .option("delimiter", ";") //field delimiter; the default is a comma
                .load("F:\\24\\spark\\data\\people.csv");
        df2.show();

        //write a CSV file
        df1.write().csv("F:\\24\\spark\\data\\people_1.csv");
        //another way to write a CSV file
        df2.write().format("csv").save("F:\\24\\spark\\data\\people_2.csv");
        spark.stop();
    }
}
  • Scala implementation:
package com.yichenkeji.demo.sparkscala

import org.apache.spark.sql.SparkSession

object SparkSqlCsvDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL CSV Demo")
      //      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    //read the CSV file
    val df1 = spark.read
      .option("header", "true") //whether the file has a header row
      .option("delimiter", ";") //field delimiter
      .csv("F:\\24\\spark\\data\\people.csv")
    df1.show
    //another way to read the CSV file
    val df2 = spark.read.format("csv")
      .option("header", "true") //whether the file has a header row
      .option("delimiter", ";") //field delimiter
      .load("F:\\24\\spark\\data\\people.csv")
    df2.show

    //write a CSV file
    df1.write.csv("F:\\24\\spark\\data\\people_1.csv")
    //another way to write a CSV file
    df2.write.format("csv").save("F:\\24\\spark\\data\\people_2.csv")

    spark.stop
  }

}
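
        The CSV reader and writer accept further options in the same way. For example, inferSchema can be enabled on read so numeric columns are not left as strings, and a header row and save mode can be set on write. A sketch under those assumptions (the output path people_3.csv is hypothetical):

import org.apache.spark.sql.SaveMode

// re-read with schema inference so `age` is parsed as a number instead of a string
val typed = spark.read
  .option("header", "true")
  .option("delimiter", ";")
  .option("inferSchema", "true")
  .csv("F:\\24\\spark\\data\\people.csv")
typed.printSchema()

// write with a header row, overwriting any existing output directory
typed.write
  .mode(SaveMode.Overwrite)
  .option("header", "true")
  .csv("F:\\24\\spark\\data\\people_3.csv")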

        Spark SQL provides spark.read().json("file_name") to read a JSON file or directory of JSON files into a Spark DataFrame, and dataframe.write().json("path") to write data out to JSON files.

  • Java implementation:
package com.yichenkeji.demo.sparkjava;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSqlJsonDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Spark SQL JSON Demo")
//                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        //read the JSON file
        Dataset<Row> df1 = spark.read()
                .json("F:\\24\\spark\\data\\people.json");
        df1.show();
        //another way to read the JSON file
        Dataset<Row> df2 = spark.read().format("json")
                .load("F:\\24\\spark\\data\\people.json");
        df2.show();

        //write a JSON file
        df1.write().json("F:\\24\\spark\\data\\people_1.json");
        //another way to write a JSON file
        df2.write().format("json").save("F:\\24\\spark\\data\\people_2.json");
        spark.stop();
    }
}
  • Scala implementation:
package com.yichenkeji.demo.sparkscala

import org.apache.spark.sql.SparkSession

object SparkSqlJsonDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL JSON Demo")
      //      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    //read the JSON file
    val df1 = spark.read
      .json("F:\\24\\spark\\data\\people.json")
    df1.show
    //another way to read the JSON file
    val df2 = spark.read.format("json")
      .load("F:\\24\\spark\\data\\people.json")
    df2.show

    //write a JSON file
    df1.write.json("F:\\24\\spark\\data\\people_1.json")
    //another way to write a JSON file
    df2.write.format("json").save("F:\\24\\spark\\data\\people_2.json")

    spark.stop
  }

}
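
        By default the JSON reader expects each line of the input to be a separate, self-contained JSON object. For pretty-printed files that span multiple lines, the multiLine option can be enabled; a short sketch (the input path is hypothetical):

// read a pretty-printed (multi-line) JSON file
val multiDf = spark.read
  .option("multiLine", "true")
  .json("F:\\24\\spark\\data\\people_pretty.json")
multiDf.printSchema()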

        Parquet is a general-purpose columnar storage format. It is widely used across the Hadoop ecosystem (MapReduce, Spark, Hive, Impala) and is a common data format for offline data warehouses. Spark SQL provides spark.read().parquet("file_name") to read a Parquet file or directory into a Spark DataFrame, and dataframe.write().parquet("path") to write data out to Parquet files.

  • Java implementation:
package com.yichenkeji.demo.sparkjava;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSqlParquetDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Spark SQL Parquet Demo")
//                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        //read the Parquet file
        Dataset<Row> df1 = spark.read()
                .parquet("F:\\24\\spark\\data\\users.parquet");
        df1.show();
        //another way to read the Parquet file
        Dataset<Row> df2 = spark.read().format("parquet")
                .load("F:\\24\\spark\\data\\users.parquet");
        df2.show();

        //write a Parquet file
        df1.write().parquet("F:\\24\\spark\\data\\users_1.parquet");
        //another way to write a Parquet file
        df2.write().format("parquet").save("F:\\24\\spark\\data\\users_2.parquet");
        spark.stop();
    }
}
  • Scala implementation:
package com.yichenkeji.demo.sparkscala

import org.apache.spark.sql.SparkSession

object SparkSqlParquetDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL Parquet Demo")
      //      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    //read the Parquet file
    val df1 = spark.read
      .parquet("F:\\24\\spark\\data\\users.parquet")
    df1.show
    //another way to read the Parquet file
    val df2 = spark.read.format("parquet")
      .load("F:\\24\\spark\\data\\users.parquet")
    df2.show

    //write a Parquet file
    df1.write.parquet("F:\\24\\spark\\data\\users_1.parquet")
    //another way to write a Parquet file
    df2.write.format("parquet").save("F:\\24\\spark\\data\\users_2.parquet")

    spark.stop
  }

}
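
        Parquet output is often partitioned by a column and compressed; both are available through the DataFrameWriter. A sketch, assuming the users DataFrame above has a favorite_color column (as in Spark's sample users.parquet; treat the column name and output path as assumptions):

// write Parquet partitioned by favorite_color, with snappy compression
df1.write
  .option("compression", "snappy")
  .partitionBy("favorite_color")
  .parquet("F:\\24\\spark\\data\\users_by_color.parquet")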

        ORC is another columnar storage format. It originated in Apache Hive, where it was introduced to shrink the storage footprint of Hadoop files and speed up Hive queries, and like Parquet it is widely used across the Hadoop ecosystem. Spark SQL provides spark.read().orc("file_name") to read an ORC file or directory into a Spark DataFrame, and dataframe.write().orc("path") to write data out to ORC files.

  • Java implementation:
package com.yichenkeji.demo.sparkjava;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSqlOrcDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Spark SQL ORC Demo")
//                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        //read the ORC file
        Dataset<Row> df1 = spark.read()
                .orc("F:\\24\\spark\\data\\users.orc");
        df1.show();
        //another way to read the ORC file
        Dataset<Row> df2 = spark.read().format("orc")
                .load("F:\\24\\spark\\data\\users.orc");
        df2.show();

        //write an ORC file
        df1.write().orc("F:\\24\\spark\\data\\users_1.orc");
        //another way to write an ORC file
        df2.write().format("orc").save("F:\\24\\spark\\data\\users_2.orc");

        spark.stop();
    }
}
  • Scala implementation:
package com.yichenkeji.demo.sparkscala

import org.apache.spark.sql.SparkSession

object SparkSqlOrcDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL ORC Demo")
      //      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    //read the ORC file
    val df1 = spark.read
      .orc("F:\\24\\spark\\data\\users.orc")
    df1.show
    //another way to read the ORC file
    val df2 = spark.read.format("orc")
      .load("F:\\24\\spark\\data\\users.orc")
    df2.show

    //write an ORC file
    df1.write.orc("F:\\24\\spark\\data\\users_1.orc")
    //another way to write an ORC file
    df2.write.format("orc").save("F:\\24\\spark\\data\\users_2.orc")

    spark.stop
  }

}
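
        The ORC writer follows the same option pattern; for instance, the compression codec can be chosen explicitly. A brief sketch (the output path is hypothetical):

// write ORC with an explicit compression codec
df1.write
  .option("compression", "zlib")
  .orc("F:\\24\\spark\\data\\users_zlib.orc")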

        Spark SQL can also create a DataFrame by reading from a relational database over JDBC; after running a series of computations on the DataFrame, the results can be written back to the relational database.

  • Java implementation:
package com.yichenkeji.demo.sparkjava;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public class SparkSqlMySQLDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Spark SQL MySQL Demo")
//                .config("spark.some.config.option", "some-value")
                .getOrCreate();
        String url = "jdbc:mysql://localhost:3307/demo?useSSL=false";
        String user = "root";
        String password = "wsx-123";
        String driver = "com.mysql.cj.jdbc.Driver";
        String table = "demo.dim_area";

        Properties properties = new Properties();
        properties.setProperty("user",user);
        properties.setProperty("password",password);
        properties.setProperty("driver",driver);
        properties.setProperty("url",url);
        properties.setProperty("useUnicode","true");
        properties.setProperty("characterEncoding","utf-8");

        //read from MySQL
        Dataset<Row> df1 = spark.read()
                .jdbc(url, table, properties);
        df1.show(2);
        //another way to read from MySQL
        Dataset<Row> df2 = spark.read().format("jdbc")
                .option("url", url)
                .option("dbtable", table)
                .option("user", user)
                .option("password", password)
                .load();
        df2.show(3);

        //write to MySQL
        df1.write().mode(SaveMode.Overwrite).jdbc(url, "dim_area_1", properties);
        //another way to write to MySQL
        df2.write().mode(SaveMode.Overwrite).format("jdbc")
                .option("url", url)
                .option("dbtable", "dim_area_2")
                .option("user", user)
                .option("password", password)
                .save();

        spark.stop();
    }
}
  • Scala implementation:
package com.yichenkeji.demo.sparkscala

import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}

import java.util.Properties

object SparkSqlMySQLDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL MySQL Demo")
      //      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    val url = "jdbc:mysql://localhost:3307/demo?useSSL=false"
    val user = "root"
    val password = "wsx-123"
    val driver = "com.mysql.cj.jdbc.Driver"
    val table = "demo.dim_area"

    val properties = new Properties
    properties.setProperty("user", user)
    properties.setProperty("password", password)
    properties.setProperty("driver", driver)
    properties.setProperty("url", url)
    properties.setProperty("useUnicode", "true")
    properties.setProperty("characterEncoding", "utf-8")

    //read from MySQL
    val df1 = spark.read.jdbc(url, table, properties)
    df1.show(2)
    //another way to read from MySQL
    val df2 = spark.read.format("jdbc")
      .option("url", url)
      .option("dbtable", table)
      .option("user", user)
      .option("password", password)
      .load

    df2.show(3)

    //write to MySQL
    df1.write.mode(SaveMode.Overwrite).jdbc(url, "dim_area_1", properties)
    //another way to write to MySQL
    df2.write.mode(SaveMode.Overwrite).format("jdbc")
      .option("url", url)
      .option("dbtable", "dim_area_2")
      .option("user", user)
      .option("password", password)
      .save

    spark.stop
  }

}
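
        For larger tables, the JDBC reader can also split the read across multiple partitions with the partitionColumn, lowerBound, upperBound and numPartitions options, while fetchsize controls how many rows are fetched per round trip. A sketch reusing the connection settings from the Scala example above (the partition column id and its bounds are assumptions):

// partitioned JDBC read; assumes the table has a numeric `id` column
val partitioned = spark.read.format("jdbc")
  .option("url", url)
  .option("dbtable", table)
  .option("user", user)
  .option("password", password)
  .option("driver", driver)
  .option("partitionColumn", "id")
  .option("lowerBound", "1")
  .option("upperBound", "10000")
  .option("numPartitions", "4")
  .option("fetchsize", "1000")
  .load()
println(partitioned.rdd.getNumPartitions)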

        Note: when writing to a MySQL database over JDBC, every field of the DataFrame must exist in the destination table, otherwise the write fails with an error.

        The sql function on a SparkSession lets an application run SQL queries programmatically and returns the result as a DataFrame. Queries issued this way must be backed by a temporary view or a global temporary view.

  • Java implementation:
package com.yichenkeji.demo.sparkjava;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSqlDemo {
    public static void main(String[] args) throws AnalysisException {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Spark SQL Demo")
//                .config("spark.some.config.option", "some-value")
                .getOrCreate();
        //read the CSV file, which has a header row
        Dataset<Row> df1 = spark.read()
                .option("header", "true")//whether the file has a header row
                .option("delimiter", ";") //field delimiter; the default is a comma
                .csv("F:\\24\\spark\\data\\people.csv");
        df1.show();
        //create a temporary view
        df1.createOrReplaceTempView("people");
        //create a global temporary view
        df1.createGlobalTempView("people");
        Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
        sqlDF.show();
        //+-----+---+---------+
        //| name|age|      job|
        //+-----+---+---------+
        //|Jorge| 30|Developer|
        //|  Bob| 32|Developer|
        //+-----+---+---------+
        //global temporary views are stored in the global_temp database and must be referenced with that qualifier, e.g. global_temp.people
        Dataset<Row> sqlDF2 = spark.sql("SELECT name,age,job FROM global_temp.people WHERE age > 30");
        sqlDF2.show();
        //+-----+---+---------+
        //| name|age|      job|
        //+-----+---+---------+
        //|  Bob| 32|Developer|
        //+-----+---+---------+
        spark.stop();
    }
}
  • Scala implementation:
package com.yichenkeji.demo.sparkscala

import org.apache.spark.sql.SparkSession

object SparkSqlDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL Demo")
//      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    //read the CSV file
    val df1 = spark.read
      .option("header", "true") //whether the file has a header row
      .option("delimiter", ";") //field delimiter
      .csv("F:\\24\\spark\\data\\people.csv")
    df1.show
    // create a temporary view
    df1.createOrReplaceTempView("people")
    // create a global temporary view
    df1.createGlobalTempView("people")
    //query the temporary view
    spark.sql("SELECT * FROM people").show()
    //+-----+---+---------+
    //| name|age|      job|
    //+-----+---+---------+
    //|Jorge| 30|Developer|
    //|  Bob| 32|Developer|
    //+-----+---+---------+

    // global temporary views are stored in the global_temp database and must be referenced with that qualifier, e.g. global_temp.people
    spark.sql("SELECT name,age,job FROM global_temp.people where age > 30").show()
    //+-----+---+---------+
    //| name|age|      job|
    //+-----+---+---------+
    //|  Bob| 32|Developer|
    //+-----+---+---------+
    spark.stop
  }

}
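
        Because global temporary views live in the global_temp database rather than in a single session, they remain visible from other sessions of the same Spark application; a one-line sketch, continuing from the example above:

// a global temporary view is still visible from a new session of the same application
spark.newSession().sql("SELECT * FROM global_temp.people").show()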

        DataFrames provide a domain-specific language (DSL) for structured data manipulation in Scala, Java, Python, and R. With the DSL syntax there is no need to create a temporary view.

  • Java implementation:
package com.yichenkeji.demo.sparkjava;

import org.apache.spark.sql.*;

import static org.apache.spark.sql.functions.col;
public class SparkSqlDSLDemo {
    public static void main(String[] args) throws AnalysisException {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Spark SQL DSL")
//                .config("spark.some.config.option", "some-value")
                .getOrCreate();
        //read the CSV file, which has a header row
        Dataset<Row> df1 = spark.read()
                .option("header", "true")//whether the file has a header row
                .option("delimiter", ";") //field delimiter; the default is a comma
                .csv("F:\\24\\spark\\data\\people.csv");
        df1.show();
        // print the schema
        df1.printSchema();
//		root
//		 |-- name: string (nullable = true)
//		 |-- age: string (nullable = true)
//		 |-- job: string (nullable = true)

        //select specific columns
        df1.select("name","age","job").show();
        //+-----+---+---------+
        //| name|age|      job|
        //+-----+---+---------+
        //|Jorge| 30|Developer|
        //|  Bob| 32|Developer|
        //+-----+---+---------+

        //select specific columns, give them aliases, and apply expressions to them
        df1.select(col("name").alias("name")
                ,col("age").plus(1).alias("age")
                ,col("job"))
                .show();
        //+-----+---+---------+
        //| name|age|      job|
        //+-----+---+---------+
        //|Jorge| 31|Developer|
        //|  Bob| 33|Developer|
        //+-----+---+---------+

        //filter rows
        df1.filter(col("age").gt(30)).show();
//        df1.where(col("age").gt(30)).show();
        //+-----+---+---------+
        //| name|age|      job|
        //+-----+---+---------+
        //|  Bob| 32|Developer|
        //+-----+---+---------+

        //group and count
        df1.groupBy("job").count().show();
        //+---------+-----+
        //|      job|count|
        //+---------+-----+
        //|Developer|    2|
        //+---------+-----+
        //more complex aggregation
        df1.groupBy(col("job").alias("post")).agg(
                functions.count(col("name")).as("nums")
                ,functions.sum(col("age")).as("total_age")
                ,functions.avg(col("age")).as("aver_age")
                ).show();
        //+---------+----+---------+--------+
        //|     post|nums|total_age|aver_age|
        //+---------+----+---------+--------+
        //|Developer|   2|     62.0|    31.0|
        //+---------+----+---------+--------+
        spark.stop();
    }
}
  • Scala implementation:
package com.yichenkeji.demo.sparkscala

import org.apache.spark.sql.{SparkSession, functions}
import org.apache.spark.sql.functions.col

object SparkSqlDSLDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL DSL")
//      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    //read the CSV file
    val df1 = spark.read
      .option("header", "true") //whether the file has a header row
      .option("delimiter", ";") //field delimiter
      .csv("F:\\24\\spark\\data\\people.csv")
    df1.show

    //print the schema
    df1.printSchema
    //		root
    //		 |-- name: string (nullable = true)
    //		 |-- age: string (nullable = true)
    //		 |-- job: string (nullable = true)

    //select specific columns
    df1.select("name", "age", "job").show
    //+-----+---+---------+
    //| name|age|      job|
    //+-----+---+---------+
    //|Jorge| 30|Developer|
    //|  Bob| 32|Developer|
    //+-----+---+---------+

    //select specific columns, give them aliases, and apply expressions to them
    import spark.implicits._
    df1.select($"name" as "name", ($"age" + 1) as "age", $"job" as "job").show
    //+-----+---+---------+
    //| name|age|      job|
    //+-----+---+---------+
    //|Jorge| 31|Developer|
    //|  Bob| 33|Developer|
    //+-----+---+---------+

    //filter rows
    df1.filter($"age" > 30).show
//    df1.where($"age" > 30).show
    //+-----+---+---------+
    //| name|age|      job|
    //+-----+---+---------+
    //|  Bob| 32|Developer|
    //+-----+---+---------+

    //group and count
    df1.groupBy("job").count.show
    //+---------+-----+
    //|      job|count|
    //+---------+-----+
    //|Developer|    2|
    //+---------+-----+
    //more complex aggregation
    df1.groupBy(col("job").alias("post")).agg(
      functions.count(col("name")).as("nums")
      , functions.sum(col("age")).as("total_age")
      , functions.avg(col("age")).as("aver_age")
    ).show()
    //+---------+----+---------+--------+
    //|     post|nums|total_age|aver_age|
    //+---------+----+---------+--------+
    //|Developer|   2|     62.0|    31.0|
    //+---------+----+---------+--------+


    spark.stop
  }

}
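
        Other relational operations such as sorting and joins follow the same DSL pattern. A brief sketch, continuing from df1 in the Scala example above (the salaries DataFrame and its columns are hypothetical; spark.implicits._ is already imported there):

// sort by age and join against a small lookup DataFrame
val salaries = Seq(("Developer", 10000)).toDF("job", "salary")
df1.orderBy($"age".desc)
  .join(salaries, Seq("job"), "left")
  .show()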
