TypeError:在 pandas DataFrame 上使用 Dask 时出现 can't pickle _thread._local objects 错误

2023-09-03 08:04:20 作者:十字当头的我们说什么永远

我有一个巨大的 DataFrame,为了节省时间,我想用 Dask 来处理它。问题是,代码一开始运行就抛出 TypeError: can't pickle _thread._local objects 错误。有人能帮帮我吗?

我编写了一个函数 run,它逐行处理 df 中存储的数据,调用方式如下:

# Sequential baseline: applies run() to every row (axis=1) in a single process.
# progress_apply is presumably registered by tqdm (tqdm.pandas()) — not shown here.
out = df_query.progress_apply(lambda row: run(row), axis=1)

这样可以正常运行。

由于这样耗时太长,我改用了 Dask:

ddata = dd.from_pandas(df_query, npartitions=3)
# With scheduler='processes', every task (the lambdas plus, via cloudpickle,
# whatever run() references, such as module-level globals) is serialized to
# be sent to a worker process. The traceback below shows the failure occurs
# in that serialization step (dask/multiprocessing.py _dumps -> cloudpickle).
out = ddata.map_partitions(lambda df: df.apply((lambda row: run(row)), axis=1)).compute(scheduler='processes')

问题是,处理一开始就会抛出这个错误(完整的长回溯见下文):TypeError: can't pickle _thread._local objects

run(...)函数执行一些数据操作,包括查询数据库。

以下是完整的回溯:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-14-aefae1f00437> in <module>
----> 1 out = ddata.map_partitions(lambda df: df.apply((lambda row: run(row)), axis=1)).compute(scheduler='processes')

~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 

~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, pool, **kwargs)
    190                            get_id=_process_get_id, dumps=dumps, loads=loads,
    191                            pack_exception=pack_exception,
--> 192                            raise_exception=reraise, **kwargs)
    193     finally:
    194         if cleanup:

~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    447             # Seed initial tasks into the thread pool
    448             while state['ready'] and len(state['running']) < num_workers:
--> 449                 fire_task()
    450 
    451             # Main loop, wait on tasks to finish, insert new ones

~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/local.py in fire_task()
    441                 # Submit
    442                 apply_async(execute_task,
--> 443                             args=(key, dumps((dsk[key], data)),
    444                                   dumps, loads, get_id, pack_exception),
    445                             callback=queue.put)

~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/multiprocessing.py in _dumps(x)
     24 
     25 def _dumps(x):
---> 26     return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     27 
     28 

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
    950     try:
    951         cp = CloudPickler(file, protocol=protocol)
--> 952         cp.dump(obj)
    953         return file.getvalue()
    954     finally:

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
    265         self.inject_addons()
    266         try:
--> 267             return Pickler.dump(self, obj)
    268         except RuntimeError as e:
    269             if 'recursion' in e.args[0]:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in dump(self, obj)
    435         if self.proto >= 4:
    436             self.framer.start_framing()
--> 437         self.save(obj)
    438         self.write(STOP)
    439         self.framer.end_framing()

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
    769         if n <= 3 and self.proto >= 2:
    770             for element in obj:
--> 771                 save(element)
    772             # Subtle.  Same as in the big comment below.
    773             if id(obj) in memo:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
    769         if n <= 3 and self.proto >= 2:
    770             for element in obj:
--> 771                 save(element)
    772             # Subtle.  Same as in the big comment below.
    773             if id(obj) in memo:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    636         else:
    637             save(func)
--> 638             save(args)
    639             write(REDUCE)
    640 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
    784         write(MARK)
    785         for element in obj:
--> 786             save(element)
    787 
    788         if id(obj) in memo:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
    885                 k, v = tmp[0]
    886                 save(k)
--> 887                 save(v)
    888                 write(SETITEM)
    889             # else tmp is empty, and we're done

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
    784         write(MARK)
    785         for element in obj:
--> 786             save(element)
    787 
    788         if id(obj) in memo:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
    769         if n <= 3 and self.proto >= 2:
    770             for element in obj:
--> 771                 save(element)
    772             # Subtle.  Same as in the big comment below.
    773             if id(obj) in memo:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_list(self, obj)
    814 
    815         self.memoize(obj)
--> 816         self._batch_appends(obj)
    817 
    818     dispatch[list] = save_list

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_appends(self, items)
    838                 write(MARK)
    839                 for x in tmp:
--> 840                     save(x)
    841                 write(APPENDS)
    842             elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_list(self, obj)
    814 
    815         self.memoize(obj)
--> 816         self._batch_appends(obj)
    817 
    818     dispatch[list] = save_list

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_appends(self, items)
    838                 write(MARK)
    839                 for x in tmp:
--> 840                     save(x)
    841                 write(APPENDS)
    842             elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
    393                 or getattr(obj.__code__, 'co_filename', None) == '<stdin>'
    394                 or themodule is None):
--> 395             self.save_function_tuple(obj)
    396             return
    397         else:

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
    592         if hasattr(func, '__qualname__'):
    593             state['qualname'] = func.__qualname__
--> 594         save(state)
    595         write(pickle.TUPLE)
    596         write(pickle.REDUCE)  # applies _fill_function on the tuple

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
    885                 k, v = tmp[0]
    886                 save(k)
--> 887                 save(v)
    888                 write(SETITEM)
    889             # else tmp is empty, and we're done

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
    393                 or getattr(obj.__code__, 'co_filename', None) == '<stdin>'
    394                 or themodule is None):
--> 395             self.save_function_tuple(obj)
    396             return
    397         else:

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
    592         if hasattr(func, '__qualname__'):
    593             state['qualname'] = func.__qualname__
--> 594         save(state)
    595         write(pickle.TUPLE)
    596         write(pickle.REDUCE)  # applies _fill_function on the tuple

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
    393                 or getattr(obj.__code__, 'co_filename', None) == '<stdin>'
    394                 or themodule is None):
--> 395             self.save_function_tuple(obj)
    396             return
    397         else:

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
    592         if hasattr(func, '__qualname__'):
    593             state['qualname'] = func.__qualname__
--> 594         save(state)
    595         write(pickle.TUPLE)
    596         write(pickle.REDUCE)  # applies _fill_function on the tuple

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    660 
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)
    664 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    660 
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)
    664 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    522             reduce = getattr(obj, "__reduce_ex__", None)
    523             if reduce is not None:
--> 524                 rv = reduce(self.proto)
    525             else:
    526                 reduce = getattr(obj, "__reduce__", None)

TypeError: can't pickle _thread._local objects

推荐答案

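您的 run 函数很可能引用了其作用域之外的变量(例如模块级的数据库连接),这些变量会随函数一起被捕获。使用 scheduler='processes' 时,Dask 需要用 cloudpickle 序列化任务及其引用的所有对象,才能把它们发送到工作进程。而数据库连接、文件句柄这类对象内部往往含有 _thread._local 等无法 pickle 的状态,于是就会报这个错误。因此,请确保在函数内部创建所有文件句柄或数据库连接: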

错误示例:

# Anti-pattern: the connection is created once at module level.
# run() refers to it as a global, so when Dask pickles run() for a worker
# process, cloudpickle also tries to pickle `conn`. If the connection object
# holds thread-local state (as the traceback suggests), that pickling fails.
conn = DBConn(...)
def run(row):
    return conn.do_stuff(row)

正确示例:

def run(row):
    # The connection is created inside the function, i.e. in the worker
    # process at call time, so no connection object has to be pickled
    # along with run().
    # NOTE(review): this opens a new connection for every row and never closes
    # it explicitly. Consider opening one connection per partition inside the
    # map_partitions callback and closing it when done (DBConn's API is not
    # shown here — confirm how it is closed).
    conn = DBConn(...)
    return conn.do_stuff(row)