Provide schema while reading csv file as a datafra

2019-02-01 08:40发布

I am trying to read a csv file into a dataframe. I know what the schema of my dataframe should be since I know my csv file. Also I am using spark csv package to read the file. I trying to specify the schema like below.

val pagecount = sqlContext.read.format("csv")
            .option("delimiter"," ").option("quote","")
            .option("schema","project: string ,article: string ,requests: integer ,bytes_served: long")
            .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

But when I check the schema of the data frame I created, it seems to have taken its own schema. Am I doing anything wrong ? how to make spark to pick up the schema I mentioned ?

> pagecount.printSchema
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)

5条回答
啃猪蹄的小仙女
2楼-- · 2019-02-01 09:09

Thanks to the answer by @Nulu, it works for pyspark with minimal tweaking

from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType

customSchema = StructType(Array(
    StructField("project", StringType, true),
    StructField("article", StringType, true),
    StructField("requests", IntegerType, true),
    StructField("bytes_served", DoubleType, true)))

pagecount = sc.read.format("com.databricks.spark.csv")
         .option("delimiter"," ")
         .option("quote","")
         .option("header", "false")
         .schema(customSchema)
         .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
查看更多
Luminary・发光体
3楼-- · 2019-02-01 09:17

Try below , you need not specify the schema. when you give inferSchema as true it should take it from your csv file.

val pagecount = sqlContext.read.format("csv")
     .option("delimiter"," ").option("quote","")
     .option("header", "true")
     .option("inferSchema", "true")
     .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

if you want to manually specify the schema , you need to do as below

import org.apache.spark.sql.types._

val customSchema = StructType(Array(
        StructField("project", StringType, true),
        StructField("article", StringType, true),
        StructField("requests", IntegerType, true),
        StructField("bytes_served", DoubleType, true)))

     val pagecount = sqlContext.read.format("csv")
             .option("delimiter"," ").option("quote","")
             .option("header", "true")
             .schema(customSchema)
             .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
查看更多
一夜七次
4楼-- · 2019-02-01 09:18

I'm using the solution provided by Arunakiran Nulu in my analysis (see the code). Despite it is able to assign the correct types to the columns, all the values returned are null. Previously, I've tried to the option .option("inferSchema", "true") and it returns the correct values in the dataframe (although different type).

val customSchema = StructType(Array(
    StructField("numicu", StringType, true),
    StructField("fecha_solicitud", TimestampType, true),
    StructField("codtecnica", StringType, true),
    StructField("tecnica", StringType, true),
    StructField("finexploracion", TimestampType, true),
    StructField("ultimavalidacioninforme", TimestampType, true),
    StructField("validador", StringType, true)))

val df_explo = spark.read
        .format("csv")
        .option("header", "true")
        .option("delimiter", "\t")
        .option("timestampFormat", "yyyy/MM/dd HH:mm:ss") 
        .schema(customSchema)
        .load(filename)

Result

root


|-- numicu: string (nullable = true)
 |-- fecha_solicitud: timestamp (nullable = true)
 |-- codtecnica: string (nullable = true)
 |-- tecnica: string (nullable = true)
 |-- finexploracion: timestamp (nullable = true)
 |-- ultimavalidacioninforme: timestamp (nullable = true)
 |-- validador: string (nullable = true)

and the table is:

|numicu|fecha_solicitud|codtecnica|tecnica|finexploracion|ultimavalidacioninforme|validador|
+------+---------------+----------+-------+--------------+-----------------------+---------+
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
查看更多
贼婆χ
5楼-- · 2019-02-01 09:19

Here's how you can work with a custom schema, a complete demo:

$> shell code,

echo "
Slingo, iOS 
Slingo, Android
" > game.csv

Scala code:

import org.apache.spark.sql.types._

val customSchema = StructType(Array(
  StructField("game_id", StringType, true),
  StructField("os_id", StringType, true)
))

val csv_df = spark.read.format("csv").schema(customSchema).load("game.csv")
csv_df.show 

csv_df.orderBy(asc("game_id"), desc("os_id")).show
csv_df.createOrReplaceTempView("game_view")
val sort_df = sql("select * from game_view order by game_id, os_id desc")
sort_df.show 
查看更多
Ridiculous、
6楼-- · 2019-02-01 09:22

For those interested in doing this in Python here is a working version.

customSchema = StructType([
    StructField("IDGC", StringType(), True),        
    StructField("SEARCHNAME", StringType(), True),
    StructField("PRICE", DoubleType(), True)
])
productDF = spark.read.load('/home/ForTesting/testProduct.csv', format="csv", header="true", sep='|', schema=customSchema)

testProduct.csv
ID|SEARCHNAME|PRICE
6607|EFKTON75LIN|890.88
6612|EFKTON100HEN|55.66

Hope this helps.

查看更多
登录 后发表回答