root/trunk/plagger/lib/Plagger/Plugin/Aggregator/Xango.pm

Revision 479 (checked in by miyagawa, 15 years ago)

Xango: add VERSION number so that PAUSE can handle it

  • Property svn:keywords set to Id
Line 
1 # $Id$
2 #
3 # Copyright (c) 2006 Daisuke Maki <dmaki@cpan.org>
4 # All rights reserved.
5
6 package Plagger::Plugin::Aggregator::Xango;
7 use strict;
8 use base qw( Plagger::Plugin::Aggregator::Simple );
9 use POE;
10 use Xango::Broker::Push;
11 # BEGIN { sub Xango::DEBUG { 1 } } # uncomment to get Xango debug messages
12
13 our $VERSION = '0.1';
14
# Plugin entry point, called with the Plagger context.
#
# Spawns the crawler session, spawns the Xango push broker, and registers
# this plugin for the 'customfeed.handle' and 'aggregator.finalize' hooks.
#
# Configuration keys read from $self->conf:
#   timeout      - HTTP timeout handed to the broker (10 when false/absent)
#   use_cache    - handed to the crawler as UseCache (1 when the key is absent)
#   max_redirect - handed to the crawler as MaxRedirect (3 when false/absent)
#   xango_args   - hash ref merged over the default broker arguments
sub register {
    my ($self, $context) = @_;

    my $timeout   = $self->conf->{timeout} || 10;
    my $overrides = $self->conf->{xango_args} || {};

    # Defaults first, so that anything in xango_args wins.
    my %xango_args = (
        Alias        => 'xgbroker',
        HandlerAlias => 'xghandler',
        HttpCompArgs => [
            Agent   => "Plagger/$Plagger::VERSION (http://plagger.org/)",
            Timeout => $timeout,
        ],
        %$overrides,
    );

    # aggregate() posts jobs to the broker through this alias.
    my $broker_alias = $xango_args{Alias};
    $self->{xango_alias} = $broker_alias;

    # Caching is on unless the configuration mentions use_cache at all.
    my $use_cache = 1;
    $use_cache = $self->conf->{use_cache} if exists $self->conf->{use_cache};

    my $max_redirect = $self->conf->{max_redirect} || 3;

    Plagger::Plugin::Aggregator::Xango::Crawler->spawn(
        Plugin      => $self,
        UseCache    => $use_cache,
        BrokerAlias => $broker_alias,
        MaxRedirect => $max_redirect,
    );
    Xango::Broker::Push->spawn(%xango_args);

    $context->register_hook(
        $self,
        'customfeed.handle'   => \&aggregate,
        'aggregator.finalize' => \&finalize,
    );
}
39
# Hook for 'customfeed.handle'.
#
# Takes the URL of $args->{feed}; anything that is not http(s) is ignored.
# Otherwise the URL is logged and a Xango job (redirect counter 0) is posted
# to the broker alias remembered by register().
sub aggregate {
    my ($self, $context, $args) = @_;

    my $url = $args->{feed}->url;
    if ($url !~ m!^https?://!i) {
        return;
    }

    $context->log(info => "Fetch $url");

    my $job = Xango::Job->new(uri => URI->new($url), redirect => 0);
    return POE::Kernel->post($self->{xango_alias}, 'enqueue_job', $job);
}
48
# Hook for 'aggregator.finalize': starts the POE kernel so that the jobs
# queued by aggregate() are processed. The context and hook arguments are
# not used.
sub finalize {
    my ($self) = @_;
    return POE::Kernel->run;
}
53
54 package Plagger::Plugin::Aggregator::Xango::Crawler;
55 use strict;
56 use Feed::Find;
57 use POE;
58 use Storable qw(freeze thaw);
59 use XML::Feed;
60
# State handler registered on the crawler session (see spawn); always
# returns 1.
# NOTE(review): presumably a true value tells the Xango broker that the job
# may be fetched -- confirm against the Xango documentation.
sub apply_policy {
    return 1;
}
# Create the POE session whose states are this package's handler subs.
#
# Named arguments:
#   Plugin      - stored on the heap as PLUGIN
#   UseCache    - stored on the heap as USE_CACHE
#   BrokerAlias - stored on the heap as BROKER_ALIAS
#   MaxRedirect - stored on the heap as MaxRedirect
#
# Returns whatever POE::Session->create returns.
sub spawn {
    my ($class, %args) = @_;

    my %heap = (
        PLUGIN       => $args{Plugin},
        USE_CACHE    => $args{UseCache},
        BROKER_ALIAS => $args{BrokerAlias},
        MaxRedirect  => $args{MaxRedirect},
    );
    my @states = qw(_start _stop apply_policy prep_request handle_response);

    return POE::Session->create(
        heap           => \%heap,
        package_states => [ $class => \@states ],
    );
}
77
# POE _start handler: registers the alias 'xghandler' for this session.
sub _start {
    my $kernel = $_[KERNEL];
    return $kernel->alias_set('xghandler');
}
# POE _stop handler: intentionally does nothing.
sub _stop {
    return;
}
# 'prep_request' state handler: turn the outgoing request into a conditional
# GET using validators remembered from a previous fetch.
#
#   ARG0 - the job; its ->uri is the cache key
#   ARG1 - the request object; headers are added to it in place
#
# Does nothing when caching is disabled on the heap or when there is no cache
# entry for the URI.
sub prep_request {
    return unless $_[HEAP]->{USE_CACHE};

    my ($job, $req) = @_[ARG0, ARG1];
    my $plugin = $_[HEAP]->{PLUGIN};

    my $cached = $plugin->cache->get($job->uri);
    return unless $cached;

    # handle_response caches the Last-Modified header exactly as the server
    # sent it (an HTTP date string). The if_modified_since() convenience
    # method expects epoch seconds and would turn that string into
    # "Thu, 01 Jan 1970 ...", so echo the stored string back verbatim.
    $req->header('If-Modified-Since', $cached->{LastModified})
        if $cached->{LastModified};
    $req->header('If-None-Match', $cached->{ETag})
        if $cached->{ETag};

    return;
}
95
# 'handle_response' state handler: act on the HTTP response of a finished job.
#
#   ARG0 - the job; notes 'http_response' and 'redirect' are read from it
#
# Behaviour:
#   * 301/302         - enqueue a new job for the Location header
#   * other non-2xx   - ignored
#   * feed content    - passed to the plugin's handle_feed(); the response's
#                       ETag / Last-Modified are cached when caching is on
#   * anything else   - treated as HTML; the first auto-discovered feed URL,
#                       if any, is enqueued as a new job
#
# Follow-up jobs are only enqueued for http(s) URLs and only while the hop
# counter stays within the heap's MaxRedirect.
sub handle_response {
    my ($kernel, $heap, $job) = @_[KERNEL, HEAP, ARG0];
    my $plugin = $heap->{PLUGIN};

    my $r   = $job->notes('http_response');
    my $url = $job->uri;

    # Hop counter that a follow-up job would carry.
    my $redirect = $job->notes('redirect') + 1;

    # Enqueue a follow-up fetch. The hop limit is enforced here, at enqueue
    # time, rather than on arrival of the response: checking on arrival made
    # the handler schedule one last fetch whose response it then always threw
    # away, even when that response was the feed itself.
    my $follow = sub {
        my ($target) = @_;
        return unless defined $target && $target =~ m!^https?://!i;
        return if $redirect > $heap->{MaxRedirect};
        $kernel->post(
            $heap->{BROKER_ALIAS}, 'enqueue_job',
            Xango::Job->new(uri => URI->new($target), redirect => $redirect),
        );
        return;
    };

    if ($r->code =~ /^30[12]$/) {
        $follow->(scalar $r->header('location'));
        return;
    }

    return unless $r->is_success;

    my $ct = $r->content_type;
    unless ($Feed::Find::IsFeed{$ct}) {
        # Not a feed: try feed auto-discovery and follow the first hit.
        my @feeds = Feed::Find->find_in_html($r->content_ref, $url);
        $follow->($feeds[0]) if @feeds;
        return;
    }

    $plugin->handle_feed($url, $r->content_ref);

    if ($heap->{USE_CACHE}) {
        # header() is context sensitive: in list context a missing header
        # yields an empty list, which would shift the key/value pairs below
        # and corrupt the cached record. Force scalar context.
        $plugin->cache->set(
            $job->uri,
            {
                ETag         => scalar $r->header('ETag'),
                LastModified => scalar $r->header('Last-Modified'),
            },
        );
    }

    return;
}
135
136 1;
137
Note: See TracBrowser for help on using the browser.