The following example code tries to put some case objects into a DataFrame. The code includes the definition of a case object hierarchy and a case class using this trait:
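A minimal sketch of the kind of definitions described (the names are illustrative, not the question's actual code):

```scala
// Illustrative only: a sealed case object hierarchy behind a common trait,
// plus a case class carrying a trait-typed field.
sealed trait Base extends Serializable

case object A extends Base
case object B extends Base

case class Record(id: Long, value: Base)
```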
When executing the code, I unfortunately encounter the following exception:
Questions
- Is there a possibility to add or define a schema for certain types (here type `Some`)?
- Does another approach exist to represent this kind of enumeration?
  - I tried to use `Enumeration` directly, but also without success (see the sketch below).

Code for `Enumeration`:
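An illustrative sketch of such an `Enumeration` attempt (not the question's actual code):

```scala
// Illustrative only: Spark SQL cannot infer a schema for Enumeration#Value
// any more than it can for the case objects above.
object Status extends Enumeration {
  val Active, Inactive = Value
}

case class Item(id: Long, status: Status.Value)
```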
Thanks in advance. I hope the best approach is not to use strings instead.
asked by Martin Senne
1 Answer
Spark 2.0.0+:

`UserDefinedType` has been made private in Spark 2.0.0 and, as of now, it has no `Dataset`-friendly replacement. See SPARK-14155 (Hide UserDefinedType in Spark 2.0).

Most of the time a statically typed `Dataset` can serve as a replacement. There is a pending JIRA, SPARK-7768, to make the UDT API public again, with target version 2.4. See also: How to store custom objects in Dataset?
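For instance, a statically typed `Dataset` with a Kryo-based `Encoder` can carry such case objects as a single opaque binary column (a minimal sketch, reusing the illustrative names from above):

```scala
import org.apache.spark.sql.{Encoders, SparkSession}

// Illustrative definitions, repeated from the sketch above:
sealed trait Base extends Serializable
case object A extends Base
case class Record(id: Long, value: Base)

val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

// Product encoders cannot derive a schema for the sealed-trait field,
// so fall back to an explicit Kryo encoder:
val recordEncoder = Encoders.kryo[Record]

val ds = spark.createDataset(Seq(Record(1L, A)))(recordEncoder)
ds.map(_.id)(Encoders.scalaLong).show()
```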
Spark < 2.0.0
Is there a possibility to add or define a schema for certain types (here type `Some`)?
I guess the answer depends on how badly you need this. It looks like it is possible to create a `UserDefinedType`, but it requires access to `DeveloperApi` and is not exactly straightforward or well documented. You should probably override `hashCode` and `equals` as well. A PySpark counterpart can be defined analogously.
In Spark < 1.5 a Python UDT requires a paired Scala UDT, but it looks like that is no longer the case in 1.5. A sketch of the Scala side follows below.
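A rough sketch of a Scala UDT in Spark < 2.0 (hypothetical `Point` class; the exact `UserDefinedType` API changed between 1.x releases, so treat this as an approximation):

```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}

// Hypothetical user type, registered against its UDT via the annotation:
@SQLUserDefinedType(udt = classOf[PointUDT])
case class Point(x: Double, y: Double)

class PointUDT extends UserDefinedType[Point] {
  // Internal SQL representation: a fixed-length array of doubles.
  override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)

  override def serialize(obj: Any): Any = obj match {
    case Point(x, y) =>
      val values = new Array[Any](2)
      values(0) = x
      values(1) = y
      new GenericArrayData(values)
  }

  override def deserialize(datum: Any): Point = datum match {
    case values: ArrayData => Point(values.getDouble(0), values.getDouble(1))
  }

  override def userClass: Class[Point] = classOf[Point]
}
```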
For a simple UDT like this you can use simple types (for example, `IntegerType` instead of a whole `StructType`).
answered by zero323
This repository has been archived by the owner. It is now read-only.
Comments
commented Oct 30, 2015
It would be nice to have an option to supply a read schema (in lieu of the embedded schema) when reading avro files via spark-avro. For example, the Python Avro API allows the following: `reader = DataFileReader(data, DatumReader(readers_schema=schema))`. The scenario is this: I have many .avro files, possibly with different schemas (due to schema evolution), and I would like to use a single 'master' schema to ingest all of those avro files into a single Spark DataFrame.
commented Nov 3, 2015
This would be a really good idea. I've just been looking at Avro schema evolution and how I can manage it in Spark.
commented Nov 20, 2015
I'm waiting for this feature as well. Otherwise there is no way of ingesting many avro files (each with their own schemas) with an up-to-date master schema.
commented Nov 23, 2015
I'm also waiting for this feature, as it seems Spark is very slow in generating the dataframe schema when a large number of sequence files are selected. It also seems to create thousands of broadcast variables and use a lot of memory on the driver node.
added a commit to yanxiaole/spark-avro that referenced this issue Dec 26, 2015
referenced this issue Dec 26, 2015
Closed: ISSUE #96: Specifying a read schema #109
added a commit to yanxiaole/spark-avro that referenced this issue Jan 18, 2016
referenced this issue Jan 24, 2016
Closed: Add a new feature to get Schema from explicit Avro class #113
added a commit to yanxiaole/spark-avro that referenced this issue Feb 3, 2016
commented Jul 5, 2016
Hi, is there any progress on this feature? The only reason I'm not using spark-avro (using hadoopRDD instead) is because I need support for schemas that can evolve. I'm sometimes re-processing historical data which may not have some attributes which have been recently added, but the whole lot would be processed in one go ideally.
commented Jul 6, 2016 • edited
In Spark 2.0, you can specify a schema using `spark.read.schema(user_defined_schema).format(..)`.
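A sketch of that with spark-avro (hypothetical paths and master schema; `spark` is a Spark 2.0 session):

```scala
import org.apache.spark.sql.types._

// A hypothetical 'master' schema covering both old and new files;
// fields added later are declared nullable:
val masterSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("added_later", StringType, nullable = true)
))

val df = spark.read
  .schema(masterSchema)
  .format("com.databricks.spark.avro")
  .load("/data/events/*.avro")
```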
commented Jul 12, 2016
That's great, thanks for the reply.
commented Jul 18, 2016
Sorry, another question on this @clockfly. Shouldn't I be able to specify an .avsc file? I tried this and I'm getting `scala.MatchError`. I've already had to specify my input schema in case classes for conversion to a Dataset, and re-specifying the exact same thing as a `StructType` seems unwieldy. The most convenient scenario would be to just plug in my latest .avsc generated from avro-tools.
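One possible way to plug an .avsc file in directly, sketched under the assumption that spark-avro's `SchemaConverters` helper is accessible in the version used (paths are hypothetical):

```scala
import org.apache.avro.Schema
import com.databricks.spark.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// Parse the latest .avsc generated by avro-tools:
val avroSchema = new Schema.Parser().parse(new java.io.File("latest.avsc"))

// Convert it to a Spark SQL StructType and use it as the read schema:
val sqlSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
val df = spark.read.schema(sqlSchema).format("com.databricks.spark.avro").load("/data/*.avro")
```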
commented Jul 18, 2016 • edited
Thanks @clockfly. Do you guys have any trouble loading spark-avro 3.0.0-preview (for Spark 2.0)? The parameter used in spark-shell: `--packages com.databricks:spark-avro_2.10:3.0.0-preview` (I also tried `--packages com.databricks:spark-avro_2.10:2.0.1`). I get an error: `java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.avro. Please use Spark package http://spark-packages.org/package/databricks/spark-avro`. Thanks.
commented Jul 27, 2016 • edited
I've been playing around with this. Should it not be able to support missing fields if they are nullable? I can't really use this feature unless I can specify an all-encompassing master schema which will work for all versions, including earlier files that have new fields missing. In @clockfly's example above, I should be able to specify a schema like this:

```scala
val struct = StructType(
  StructField("a", IntegerType, true) ::
  StructField("b", IntegerType, true) ::
  StructField("c", IntegerType, true) ::
  StructField("d", IntegerType, true) :: Nil)
```

and have the input file read accordingly. I believe the problem is that the StructField nullable property isn't enough. It needs a default setting of null as well, which I can't see a way around :(
referenced this issue Aug 10, 2016
Closed: fix #96, support user-defined schema when reading avro files #155
commented Aug 10, 2016
@tomseddon I have created a PR to solve this problem: #155
commented Aug 23, 2016
Wondering how to make this work in Spark Streaming? `spark.read.format("com.databricks.spark.avro").schema(DataType.fromJson(res6).asInstanceOf[StructType]).load("/tmp/output").show()` Would microbatching and writing to the file system be the way to go?
modified the milestones: 3.0.1, 3.1.0 (Sep 15, 2016)
commented Nov 9, 2016
Can you please let me know if the fix for #96 is included in the 3.0.1 build or scheduled for a future release? I tried the test using 3.0.1, but it does not seem to be working.
added a commit to bdrillard/spark-avro that referenced this issue Nov 29, 2016
I ran into a little problem with my Spark Scala script. Basically I have raw data on which I am doing aggregations, and after grouping and counting etc. I want to save the output to a specific JSON format.
EDIT:
I tried to simplify the question and rewrote it:
When I select data from the source dataframe with an `Array[org.apache.spark.sql.Column]` where the column names have aliases, and then use column names (or indeed indices) as variables when trying to map the rows to a case class, I get a 'Task not serializable' exception.

And the second part of the question: I actually need the final schema to be an array of these `Result` class objects. I still haven't figured out how to do this either. The expected result should have a schema like that:

TL;DR:
- How to map dataframe rows to a case class object when dataframe columns have aliases and variables are used for column names?
- How to add these case class objects to an array?
asked by V. Samma
1 Answer
Serialization issue: the problem here is the `val n = "Name"`: it is used inside an anonymous function passed into an RDD transformation (`dm2.map(..)`), which makes Spark close over that variable and the scope containing it. That scope also includes `cl2`, which has the type `Array[Column]` and hence isn't serializable.

The solution is simple: either inline `n` (to get `dm2.map(row => Result(row.getAs("Name")))`), or place it in a serializable context (an object or a class that doesn't contain any non-serializable members).

answered by Tzach Zohar
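A sketch of the second option (`Result` and `dm2` as in the question; the wrapper object is illustrative):

```scala
// Keep the column name in a context with no non-serializable members.
// Accessing Columns.n compiles to a static call on the object, so the
// surrounding scope (including the Array[Column]) is no longer captured:
object Columns extends Serializable {
  val n = "Name"
}

val mapped = dm2.map(row => Result(row.getAs[String](Columns.n)))
```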
If I wanted to create a `StructType` (i.e. a `DataFrame.schema`) out of a `case class`, is there a way to do it without creating a `DataFrame`? I can easily create the `DataFrame` first and take its schema, but it seems overkill to actually create a `DataFrame` when all I want is the schema.

(If you are curious, the reason behind the question is that I am defining a `UserDefinedAggregateFunction`, and to do so you override a couple of methods that return `StructType`s, and I use case classes.)

asked by David Griffin
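A minimal sketch of that "create a `DataFrame` just for its schema" route (illustrative `TestCase`, Spark 2.x style):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

case class TestCase(id: Long, name: String)  // illustrative

// Build an (empty) DataFrame only to read the schema back off it:
val schema = Seq.empty[TestCase].toDF().schema
```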
4 Answers
You can do it the same way `SQLContext.createDataFrame` does it:
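Roughly, that path goes through Catalyst's `ScalaReflection` helper (a sketch with an illustrative `TestCase`; note that `org.apache.spark.sql.catalyst` is internal API):

```scala
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

case class TestCase(id: Long, name: String)  // illustrative

val schema = ScalaReflection.schemaFor[TestCase].dataType.asInstanceOf[StructType]
```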
answered by Tzach Zohar
I know this question is almost a year old, but I came across it and thought others who do so too might want to know that I have just learned to use this approach:
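Presumably this is the `Encoders`-based route the later answer alludes to; a sketch with an illustrative `TestCase`:

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType

case class TestCase(id: Long, name: String)  // illustrative

val schema: StructType = Encoders.product[TestCase].schema
```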
answered by Kurt
In case someone wants to do this for a custom Java bean:
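A sketch of the idea with an illustrative bean class:

```scala
import scala.beans.BeanProperty
import org.apache.spark.sql.Encoders

// Illustrative JavaBean-style class; Encoders.bean derives the schema
// from its getter/setter pairs:
class Person {
  @BeanProperty var id: Long = 0L
  @BeanProperty var name: String = ""
}

val schema = Encoders.bean(classOf[Person]).schema
```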
answered by Art
Instead of manually reproducing the logic for creating the implicit `Encoder` object that gets passed to `toDF`, one can use that encoder directly (or, more precisely, implicitly, in the same way as `toDF`).

Unfortunately, this actually suffers from the same problem as using `org.apache.spark.sql.catalyst` or `Encoders` as in the other answers: the `Encoder` trait is experimental.

How does this work? The `toDF` method on `Seq` comes from a `DatasetHolder`, which is created via the implicit `localSeqToDatasetHolder` that is imported via `spark.implicits._`. That function takes an implicit `Encoder[T]` argument which, for a `case class`, can be computed via `newProductEncoder` (also imported via `spark.implicits._`). We can reproduce this implicit logic to get an `Encoder` for our case class via the convenience `scala.Predef.implicitly` (in scope by default, because it's from `Predef`), which just returns its requested implicit argument; see the sketch below.

answered by huon
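A sketch of the `implicitly` route described above (illustrative `TestCase`):

```scala
import org.apache.spark.sql.{Encoder, SparkSession}
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._  // brings newProductEncoder into implicit scope

case class TestCase(id: Long, name: String)  // illustrative

// implicitly summons the same Encoder that toDF would pick up...
val encoder: Encoder[TestCase] = implicitly[Encoder[TestCase]]

// ...and the Encoder already carries the schema:
val schema: StructType = encoder.schema
```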