分享

简述SparkSQL复杂数据类型处理方式

问题导读:
1、sparkSQL支持的数据类型有哪些?
2、greatest方法和Greatest类如何使用?
3、ArrayType处理方法是什么?
4、StructType处理方法是什么?


大家对简单数据类型的比较都很清楚,但是针对array、map、struct这些复杂类型,spark sql是否支持比较呢?都是怎么比较的?我们该怎么利用呢?

2022-01-18_204429.jpg
先给出一个结论:spark sql支持array、struct类型的比较,但不支持map类型的比较(Hive也是如此)。

那是怎么比较的呢?

先来看一下sparksql支持的数据类型

数字类型

  •         TimestampType:代表包含字段年,月,日,时,分,秒的值
  •         DateType:代表包含字段年,月,日的值
  •         ByteType:代表一个字节的整数。范围是-128到127
  •         ShortType:代表两个字节的整数。范围是-32768到32767
  •         IntegerType:代表4个字节的整数。范围是-2147483648到2147483647
  •         LongType:代表8个字节的整数。范围是-9223372036854775808到9223372036854775807
  •         FloatType:代表4字节的单精度浮点数
  •         DoubleType:代表8字节的双精度浮点数
  •         DecimalType:代表任意精度的10进制数据。通过内部的java.math.BigDecimal支持。BigDecimal由一个任意精度的整型非标度值和一个32位整数组成
  •         StringType:代表一个字符串值
  •         BinaryType:代表一个byte序列值
  •         BooleanType:代表boolean值
  •         Datetime类型

复杂类型

  •         StructField(name, dataType, nullable):代表StructType中的一个字段,字段的名字通过name指定,dataType指定field的数据类型,nullable表示字段的值是否有null值。
  •         ArrayType(elementType, containsNull):代表由elementType类型元素组成的序列值。containsNull用来指明ArrayType中的值是否有null值
  •         MapType(keyType, valueType, valueContainsNull):表示包括一组键 - 值对的值。通过keyType表示key数据的类型,通过valueType表示value数据的类型。valueContainsNull用来指明MapType中的值是否有null值
  •         StructType(fields):表示一个拥有StructFields (fields)序列结构的值


源码分析

以max函数为入口来查看:

max.scala-->greatest方法
2022-01-18_204518.jpg

arithmetic.scala-->Greatest类
2022-01-18_204602.jpg

从代码中,我们看到,比较的方法入口是TypeUtils类的getInterpretedOrdering方法。

  1. TypeUtils.getInterpretedOrdering(dataType)
  2. def getInterpretedOrdering(t: DataType): Ordering[Any] = {
  3.     t match {
  4.       //AtomicType 是指一种内部类型,用于表示所有非null、UDT、数组、结构和映射。
  5.       case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
  6.       case a: ArrayType => a.interpretedOrdering.asInstanceOf[Ordering[Any]]
  7.       case s: StructType => s.interpretedOrdering.asInstanceOf[Ordering[Any]]
  8.       case udt: UserDefinedType[_] => getInterpretedOrdering(udt.sqlType)
  9.     }
  10.   }
复制代码



处理四种类型:AtomicType(原子类型:一种内部类型,用于表示所有非null、UDT、数组、结构和映射)、ArrayType(数组的类型)、StructType(struct类型)、UserDefinedType(用户自定义的类型)

从这里可以了解到,没有对map类型的判断方法

ArrayType处理方法

array的比较方法是取最短的数组的长度做为size,从左往右,挨个儿比,直到比出大小。

几种情况:
1、如果两个同位置的元素都为null,则do nothing,接着比下一个
2、如果两个同位置的元素其中有一个为null,则不为null的那个数组大
3、按照从左往右,如果所有同位置的元素都相等,则按长短比,数组元素多的大,如果两个数组长短一样,则说明两个数组相等

  1. @transient
  2. private[sql] lazy val interpretedOrdering: Ordering[ArrayData] = new Ordering[ArrayData] {
  3.   private[this] val elementOrdering: Ordering[Any] = elementType match {
  4.     case dt: AtomicType => dt.ordering.asInstanceOf[Ordering[Any]]
  5.     case a : ArrayType => a.interpretedOrdering.asInstanceOf[Ordering[Any]]
  6.     case s: StructType => s.interpretedOrdering.asInstanceOf[Ordering[Any]]
  7.     case other =>
  8.       throw new IllegalArgumentException(
  9.         s"Type ${other.catalogString} does not support ordered operations")
  10.   }
  11.   def compare(x: ArrayData, y: ArrayData): Int = {
  12.     val leftArray = x
  13.     val rightArray = y
  14.     val minLength = scala.math.min(leftArray.numElements(), rightArray.numElements())
  15.     var i = 0
  16.     while (i < minLength) {
  17.       val isNullLeft = leftArray.isNullAt(i)
  18.       val isNullRight = rightArray.isNullAt(i)
  19.       if (isNullLeft && isNullRight) {
  20.         // Do nothing.
  21.       } else if (isNullLeft) {
  22.         return -1
  23.       } else if (isNullRight) {
  24.         return 1
  25.       } else {
  26.         val comp =
  27.           elementOrdering.compare(
  28.             leftArray.get(i, elementType),
  29.             rightArray.get(i, elementType))
  30.         if (comp != 0) {
  31.           return comp
  32.         }
  33.       }
  34.       i += 1
  35.     }
  36.     if (leftArray.numElements() < rightArray.numElements()) {
  37.       return -1
  38.     } else if (leftArray.numElements() > rightArray.numElements()) {
  39.       return 1
  40.     } else {
  41.       return 0
  42.     }
  43.   }
  44. }
复制代码



StructType处理方法

struct的比较方法和数组类似,因为StructType的fields是以一个数组的结构存储的。


2022-01-18_204646.jpg


StructType中要求元素个数必须是一样的,因此fields数组的长度是一样的。

比较方法也是:从左往右,挨个儿比,直到比出大小。

几种情况:

1、如果两个同位置的元素都为null,则do nothing,接着比下一个
2、如果两个同位置的元素其中有一个为null,则不为null的那个数组大(默认是Ascending,即:NullsFirst)
3、比较同位置元素时,会依据数据类型调用相应类型(AtomicType、ArrayType、StructType-->Struct套Struct的情况)的比较方法


  1. class InterpretedOrdering(ordering: Seq[SortOrder]) extends BaseOrdering {
  2.   def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
  3.     this(bindReferences(ordering, inputSchema))
  4.   override def compare(a: InternalRow, b: InternalRow): Int = {
  5.     var i = 0
  6.     val size = ordering.size
  7.     while (i < size) {
  8.       val order = ordering(i)
  9.       val left = order.child.eval(a)
  10.       val right = order.child.eval(b)
  11.       if (left == null && right == null) {
  12.         // Both null, continue looking.
  13.       } else if (left == null) {
  14.         return if (order.nullOrdering == NullsFirst) -1 else 1
  15.       } else if (right == null) {
  16.         return if (order.nullOrdering == NullsFirst) 1 else -1
  17.       } else {
  18.         val comparison = order.dataType match {
  19.           case dt: AtomicType if order.direction == Ascending =>
  20.             dt.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
  21.           case dt: AtomicType if order.direction == Descending =>
  22.             - dt.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
  23.           case a: ArrayType if order.direction == Ascending =>
  24.             a.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
  25.           case a: ArrayType if order.direction == Descending =>
  26.             - a.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
  27.           case s: StructType if order.direction == Ascending =>
  28.             s.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
  29.           case s: StructType if order.direction == Descending =>
  30.             - s.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
  31.           case other =>
  32.             throw new IllegalArgumentException(s"Type $other does not support ordered operations")
  33.         }
  34.         if (comparison != 0) {
  35.           return comparison
  36.         }
  37.       }
  38.       i += 1
  39.     }
  40.     0
  41.   }
  42. }
复制代码



怎么利用?


比如计算贡献gmv最大的用户id、购买时间最早的用户id:

可以通过构造struct,把gmv和购买时间做为第一个字段。这样在计算max、min的时候就可以按照gmv或者购买时间取最大、最小,且能同时把对应的其他的信息取出来。
  1. select
  2.     max(gmv_struct('gmv', gmv, 'uid', uid)).gmv,
  3.     max(gmv_struct('gmv', gmv, 'uid', uid)).uid as max_gmv_uid,
  4.     min(paytime_struct('pay_time', pay_time, 'uid', uid)).pay_time,
  5.     min(paytime_struct('pay_time', pay_time, 'uid', uid)).uid as earliest_paytime_uid
  6. from
  7.    XXX
  8. where
  9.    XXX
复制代码



作者:小萝卜算子
来源:https://mp.weixin.qq.com/s/9jc0I3JS3rUQG75AjWQ5kw

最新经典文章,欢迎关注公众号

  


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条