`
ZacMa
  • 浏览: 37505 次
  • 来自: 深圳
社区版块
存档分类
最新评论

<7>pg2 分析

pg2 
阅读更多
网上看到erlang的pg2模块似乎没人推荐使用,但是还是有不少使用者,自己也感觉使用跨节点通信时候,使用它来管理各个近点的进程也是不错的选择, 使用过程中也有不少疑问,也感觉接口不够丰富,简单分析下:

pg2为什么允许一个进程加入两次呢?,并且也要退出两次,使用时候可以自己加些判断,
join(Pid, Group) ->         
      ok =  case lists:member(Pid, pg2:get_members(Group)) of
            true -> ok;
               _ -> pg2:join(Group, Pid)
             end;

加接口,
获取某个节点的所有进程
获取某个节点的某个进程,进程id随机
get_node_members(Node, Group) -> [pid()] | {error, {no_such_group, Group}}
get_node_pid(Node, Group) -> pid() | {error, Reason}

get_node_members(Node, Group) ->
    case pg2:get_members(Group) of
        {error, Reason} ->
            {error, Reason};
        AllMembers ->
            Fun = fun(X) ->
                node(X) =:= Node end,
            lists:filter(Fun, AllMembers)
    end.

get_node_pid(Node, Group) ->
    case get_node_members(Node, Group) of
        [Pid] ->
            Pid;
        [] ->
            [];
        Members when is_list(Members) ->
            {_,_,X} = erlang:now(),
            lists:nth((X rem length(Members))+1, Members);
        Else ->
            Else
    end.

网上还看到一种获取最优的pid
get_best_pid(Group) ->
  Members = pg2:get_members(Group),
  Members1 = lists:map(fun(Pid) ->
      [{message_queue_len, Messages}] = erlang:process_info(Pid, [message_queue_len]),
                                                                 {Pid, Messages}
                       end, Members),
case lists:keysort(2, Members1) of
  [{Pid, _} | _] -> Pid;
  [] -> {error, empty_process_group}
end.
不过只能获得本地进程的信息,不能跨节点获取,
Failure: badarg if Pid is not a local process, or if Item is not a valid Item.
所以这种方法只适用于本地进程


那么pg2 是怎么实现节点信息存取的?
其实就是各个节点注册了一个相同名称的pg2, 通过{pg2, Node} ! Msg来实现
pg2.erl

start() ->
    ensure_started().
ensure_started() ->
    case whereis(?MODULE) of
        undefined ->
            C = {pg2, {?MODULE, start_link, []}, permanent,
                 1000, worker, [?MODULE]},
            supervisor:start_child(kernel_safe_sup, C);
        Pg2Pid ->
            {ok, Pg2Pid}
    end.

初始化的时候,给其他可见的node发送消息{new_pg2, node()}, 并且给自己发送{nodeup, Node},   并创建ets表
init([]) ->
    Ns = nodes(),
    net_kernel:monitor_nodes(true),
    lists:foreach(fun(N) ->
                          {?MODULE, N} ! {new_pg2, node()},
                          self() ! {nodeup, N}
                  end, Ns),
    pg2_table = ets:new(pg2_table, [ordered_set, protected, named_table]),
    {ok, #state{}}.

当自己收到nodeup, 向Node发送信息
当收到nodeup或者new_pg2都会返回信息{exchange, node(), all_members()}
all_members 就是所有的group和pid信息

handle_info({nodeup, Node}, S) ->
    gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
    {noreply, S};

handle_info({new_pg2, Node}, S) ->
    gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
    {noreply, S};

all_members() ->
    [[G, group_members(G)] || G <- all_groups()].

group_members(Name) ->
    [P ||
        [P, N] <- ets:match(pg2_table, {{member, Name, '$1'},'$2'}),
        _ <- lists:seq(1, N)].

当收到exchange信息,就会进行存储
handle_cast({exchange, _Node, List}, S) ->
    store(List),
    {noreply, S};

store(List) ->
    _ = [(assure_group(Name)
          andalso
          [join_group(Name, P) || P <- Members -- group_members(Name)]) ||
            [Name, Members] <- List],
    ok.
all_members() ->
    [[G, group_members(G)] || G <- all_groups()].

group_members(Name) ->
    [P || 
        [P, N] <- ets:match(pg2_table, {{member, Name, '$1'},'$2'}),
        _ <- lists:seq(1, N)].

存储就是将其他节点的进程,全部加入本节点,如果本几点没有该进程的话


那么在join时候会发生什么呢?
join(Name, Pid) when is_pid(Pid) ->
    ensure_started(),
    case ets:member(pg2_table, {group, Name}) of
        false ->
            {error, {no_such_group, Name}};
        true ->
            global:trans({{?MODULE, Name}, self()},
                         fun() ->
                                 gen_server:multi_call(?MODULE,
                                                       {join, Name, Pid})
                         end),
            ok 
    end.
就是告诉所有的节点,有一个进程要加入,
在告诉其他节点的时候,会对这个节点加锁 ,利用函数 global:trans/2
Sets a lock on Id (using set_lock/3). If this succeeds, Fun() is evaluated and the result Res is returned. Returns aborted if the lock attempt failed. If Retries is set to infinity, the transaction will not abort.
infinity is the default setting and will be used if no value is given for Retries.
就是说默认是是无限次加锁,如果加锁失败就会重新尝试,直到成功,
再看set_lock/3,
Sets a lock on the specified nodes (or on all nodes if none are specified) on ResourceId for LockRequesterId. If a lock already exists on ResourceId for another requester than LockRequesterId, and Retries is not equal to 0, the process sleeps for a while and will try to execute the action later. When Retries attempts have been made, false is returned, otherwise true. If Retries is infinity, true is eventually returned
This function is completely synchronous.
This function does not address the problem of a deadlock. A deadlock can never occur as long as processes only lock one resource at a time. But if some processes try to lock two or more resources, a deadlock may occur. It is up to the application to detect and rectify a deadlock.
pg2/leave/2和join做了相同的操作,
加锁过程完全同步,直到加锁成功,并且有可能会造成死锁,
如果是global注册的就只有一个锁,而pg2是本地注册的,
,并且有可能造成死锁的危险,看来pg2的代价真的是不小,而如果只采用注册本地进程来发消息,就不用这么多资源的使用, 或许很多人不用pg2也是有原因的吧,但是只要不是频繁的调用函数leave, join还是可以使用, 使用时候,最好用异步来调用leave/2 和join/2函数,
至于pg2可以实现进程挂掉主动从pg2移除,也是用leave实现的
分享到:
评论

相关推荐

    ap6212a0_bb16v3_sina33验证通过BT的功能_wifi部分有问题_20170626_1148没有外层目录.7z

    [ 4.040272] VFP support v0.3: implementor 41 architecture 2 part 30 variant 7 rev 5 [ 4.048780] ThumbEE CPU extension supported. [ 4.053550] Registering SWP/SWPB emulation handler [ 4.059269] [rfkill]...

    !!!!ap6212a0_a33_sc3817r_验证通过_修正wifi的配置文件为nvram_ap6212.txt

    [ 4.040272] VFP support v0.3: implementor 41 architecture 2 part 30 variant 7 rev 5 [ 4.048780] ThumbEE CPU extension supported. [ 4.053550] Registering SWP/SWPB emulation handler [ 4.059269] [rfkill]...

    ap6212a0_a33_sc3817r_服务器验证通过_bt已经通了_wifi需要修改配置_需要再次验证_20170626_1549.7z

    [ 4.040272] VFP support v0.3: implementor 41 architecture 2 part 30 variant 7 rev 5 [ 4.048780] ThumbEE CPU extension supported. [ 4.053550] Registering SWP/SWPB emulation handler [ 4.059269] [rfkill]...

    ap6212a0_a33_sc3817r_神舟验证版本_借用nvram_ap6210这个配置文件_20170626_1834没有外层目录.7z

    [ 4.040272] VFP support v0.3: implementor 41 architecture 2 part 30 variant 7 rev 5 [ 4.048780] ThumbEE CPU extension supported. [ 4.053550] Registering SWP/SWPB emulation handler [ 4.059269] [rfkill]...

    数据分析实战项目--链家租房数据可视化分析.pdf

    内容 内容 本项⽬分为两部分: + 数据可视化分析 (1)爬⾍:⽤包爬取链家租房⽹页内容——BeautifulSoup解析——re正则表达式匹配我们需要的内容——sqlite3保存数据 (2)分析:pandas清洗分析数据——pyecharts...

    GIS视频教程下载

    课时14:2.PostgreSQL索引和PG的GIST注意事项 课时15:3.PostGIS raster 课时16:4.PostGIS开源开发 课时17:5.PostGIS栅格操作 课时18:6.实践操作 课时19:课件资料 课时20:1.PG企业应用 课时21:2.其他开源空间...

    开源GIS视频教程优化版

    2、通过对典型开源GIS项目的分析,重点学习GIS设计的基本内容:项目规划,组织管理,系统设计,编码技能和系统测试与维护 3、通过典型模式分析,掌握设计模式在GIS项目中的使用原则和方法以及技巧,难点是分析设计...

    软件开发文档-开发流程..

    5. 产品优缺点分析 7 6. MAKE-OR-BUY决策 7 7. 项目计划 7 7.1 项目团队 7 7.2 软件硬件资源估计 8 7.3 成本估计 8 7.4 进度表 8 8. 市场营销计划 9 8.1产品盈利模式和销售目标 9 8.2 促销和渗透方式 9 8.3 销售方式...

    桥式起重机毕业设计论文

    1—电动机 2—制动器 3—高速浮动轴 4—联轴器 5—减速器 6—联轴器 7低速浮动轴 8—联轴器 9—车轮 2.2.2 选择车轮与轨道,并验算其强度 按照如图所示的重量分布,计算大车的最大轮压和最小轮压: 满载时的最大轮压...

    bios逆向工程文集

    2.Preliminary Bios Modification Guide 3.Advanced Award Bios v4.51PG Hacking Tutorial 4.Pinczakko's Guide to Award BIOS Reverse Engineering 5.Award Bios "POST Jump Table" Hacking 6.Building a Kernel in...

    DBX260中文说明书

    并提供线路/RTA开关,可让用户将进行实时声频分析话筒直接接到260 DriveRackÔ的输入上,260 DriveRackÔ的2个XLR输 入还有一个脚1浮地开关,当它按下时所选的XLR输入对的地浮起。 忠告:要想正确使用RTA话筒,必须...

    软件项目数据库设计报告.doc

    机构名称,日期 [SPP-PROC-SD] SEPG,系统设计规范,机构名称,日期 0.5 术语与缩写解释 "缩写、术语 "解 释 " "SPP "精简并行过程,Simplified Parallel Process " "SD "系统设计,System Design " " " " " " " "…...

    3-数据库设计报告.doc

    机构名称,日期 [SPP-PROC-SD] SEPG,系统设计规范,机构名称,日期 0.5 术语与缩写解释 "缩写、术语 "解 释 " "SPP "精简并行过程,Simplified Parallel Process " "SD "系统设计,System Design " " " " " " " "…...

    php网络开发完全手册

    第7章 字符的处理与正则表达式 102 7.1 字符类型的特殊性 102 7.2 字符的显示与格式化 102 7.2.1 字符的显示 102 7.2.2 字符的格式化 103 7.3 常见的操作 104 7.3.1 字符串重复操作——str_repeat 104 7.3.2 字符串...

    精通Odoo开发和使用

    2 Odoo 框架简介 7 2.1 python 模块分析 8 2.2 python2 还是 python3 8 3 Odoo 的安装和配置 9 3.1 PostgreSQL 数据库 10 3.2 Ubuntu14.04 下可能缺失的软件包 11 3.3 网页显示 node.js 方面 11 3.4 其他问题 12 3.5...

    进销存系统数据库设计报告.doc

    数据库设计过程中采用Sybase公司的PowerDesigner9.0创 建l了PSS数据库的ER图,使用SQL Server的查询分析器创建了数据库脚本文件PSS.sql。其中SQL Server的登录模式为混和身份验证,超级用户的用户名及密码均为sa,...

    HPLC培训教程.pdf

    我国药典收载高效液相色谱法项目和数量比较表: 方法 项目 数量 1985 年版 1990 年版 1995 年版 2000 年版 HPLC 法 鉴别 9 34 150 检查 12 40 160 含量测定 7 60 117 387 鉴于 HPLC 应用在药品分析中越来越多,因此...

    Reference

    Excel 第2章:VBA 第3章:Python VS代码Github 第4章:Python和熊猫Jupyter笔记本大熊猫第5章:Python,Matplotlib,SciPy和Numpy Matplotlib 科学NumPy 第6章:Python和API 蜜蜂随机模块Timeit模块城市模块日期时间...

Global site tag (gtag.js) - Google Analytics